Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@nohost/router

Package Overview
Dependencies
Maintainers
2
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@nohost/router - npm Package Compare versions

Comparing version 0.5.6 to 0.6.0

227

lib/connect.js

@@ -1,216 +0,29 @@

const http = require('http');
const net = require('net');
const { getRawHeaders, getRawHeaderNames, formatHeaders } = require('hparser');
const { isLocalAddress } = require('./address');
const { request, tunnel, upgrade, getRawHeaders, onClose } = require('@nohost/connect');
const noop = () => {};
const LOCALHOST = '127.0.0.1';
const XFF = 'x-forwarded-for';
const XWCP = 'x-whistle-client-port';
const WS_RE = /websocket/i;
const CLOSED_ERR = new Error('Closed');
const TIMEOUT_ERR = new Error('Timeout');
const TIMEOUT = 3600;
const ERROR_HEADERS = { 'x-server': 'nohost/connect' };
const isUpgrade = ({ headers }) => /\bupgrade\b/i.test(headers.connection);
const connect = function(options, callback) {
let socket;
let timer;
let done;
let retry;
const execCallback = function(err) {
clearTimeout(timer);
timer = null;
if (!done) {
done = true;
callback(err, socket);
}
};
const handleError = function(err) {
if (done) {
return;
}
socket.removeAllListeners();
socket.on('error', noop);
socket.destroy(err);
clearTimeout(timer);
if (retry) {
return execCallback(err);
}
retry = true;
timer = setTimeout(() => handleError(TIMEOUT_ERR), TIMEOUT);
try {
socket = net.connect(options, execCallback);
} catch (e) {
return execCallback(e);
}
socket.on('error', handleError);
socket.on('close', (e) => {
if (!done) {
execCallback(e || new Error('closed'));
}
});
};
timer = setTimeout(() => handleError(TIMEOUT_ERR), TIMEOUT);
try {
socket = net.connect(options, execCallback);
} catch (e) {
return execCallback(e);
module.exports = async (options, req, res) => {
if (!res) {
return request(req, options);
}
socket.on('error', handleError);
};
const getClientPort = (req) => {
return req.socket.remotePort || 0;
};
const removeIPV6Prefix = (ip) => {
if (typeof ip !== 'string') {
return '';
}
return ip.indexOf('::ffff:') === 0 ? ip.substring(7) : ip;
};
const getClientIp = (req) => {
let ip = req.headers[XFF];
if (!net.isIP(ip) || isLocalAddress(ip)) {
ip = req.socket.remoteAddress;
}
ip = removeIPV6Prefix(ip);
return isLocalAddress(ip) ? LOCALHOST : ip;
};
const restoreHeaders = (req, isRes) => {
const { headers, rawHeaders } = req;
if (!isRes) {
headers[XFF] = getClientIp(req);
headers[XWCP] = getClientPort(req);
}
return formatHeaders(headers, rawHeaders && getRawHeaderNames(rawHeaders));
};
const destroy = (req) => {
if (req) {
if (req.destroy) {
req.destroy();
} else if (req.abort) {
req.abort();
if (!res.writeHead) {
if (isUpgrade(req)) {
upgrade(req, options);
} else {
tunnel(req, options);
}
return;
}
};
exports.destroy = destroy;
const getOptions = (req) => {
return {
path: req.url || '/',
method: req.method,
agent: false,
headers: restoreHeaders(req),
};
};
const addCloseEvent = (req, cb) => {
req.on('error', cb);
req.once('close', cb);
};
const onClose = (req, res, cb) => {
const execCb = (err) => {
if (req._hasError) {
return;
}
req._hasError = true;
cb(err || CLOSED_ERR);
};
if (typeof res === 'function') {
cb = res;
} else if (res) {
addCloseEvent(res, execCb);
onClose(res, (err) => req.emit('close', err));
try {
const svrRes = await request(req, options);
res.writeHead(svrRes.statusCode, getRawHeaders(svrRes));
svrRes.pipe(res);
} catch (err) {
res.writeHead(500, ERROR_HEADERS);
res.end(err.stack);
}
addCloseEvent(req, execCb);
};
const connectNohost = (options, req, reqSock, callback) => {
connect(options, (err, socket) => {
// 确保销毁所有连接
if (err) {
if (!req._hasError) {
destroy(req);
destroy(reqSock);
}
return callback(err);
}
if (req._hasError) {
socket.destroy();
return callback(CLOSED_ERR);
}
onClose(reqSock || req, () => socket.destroy());
callback(null, socket);
});
};
const proxyConnect = (options, req, reqSock) => {
connectNohost(options, req, reqSock, (err, socket) => {
if (err) {
return;
}
socket.write([
`${WS_RE.test(req.headers.upgrade) ? 'GET' : 'CONNECT'} ${req.url} HTTP/1.1`,
getRawHeaders(restoreHeaders(req)),
'\r\n',
].join('\r\n'));
reqSock.pipe(socket).pipe(reqSock);
});
};
const proxyRequest = (options, req, callback) => {
connectNohost(options, req, null, (err, socket) => {
if (err) {
return callback(err);
}
let done;
const execCb = (e, res) => {
if (e) {
destroy(client); // eslint-disable-line
destroy(req);
}
if (!done) {
done = true;
callback(e, res);
}
};
const opts = getOptions(req);
opts.createConnection = () => socket;
opts.agent = null;
const client = http.request(opts, res => {
res.on('error', execCb);
execCb(null, res);
});
onClose(req, execCb);
client.on('error', execCb);
req.pipe(client);
});
};
exports.proxyToNohost = (options, req, res) => {
onClose(req, res, () => {
destroy(req);
destroy(res);
});
if (res && !res.writeHead) {
return proxyConnect(options, req, res);
}
return new Promise((resolve) => {
proxyRequest(options, req, (err, svrRes) => {
if (err) {
destroy(req);
return destroy(res);
}
if (!res) {
return resolve(svrRes);
}
res.writeHead(svrRes.statusCode, restoreHeaders(svrRes, true));
svrRes.pipe(res);
});
});
};

140

lib/index.js

@@ -1,7 +0,8 @@

const crc32 = require('crc32');
const url = require('url');
const { getWorkers } = require('./util');
const { proxyToNohost, destroy } = require('./connect');
const { parse: parseUrl } = require('url');
const { getServers, isFinished, getJSON } = require('./util');
const connect = require('./connect');
const INTERVAL = 12000;
const INTERVAL = 10000;
const ONE_MINU = 1000 * 60;
const SPACE_NAME = 'x-whistle-nohost-space-name';

@@ -11,13 +12,64 @@ const GROUP_NAME = 'x-whistle-nohost-group-name';

const ENV_HEAD = 'x-whistle-nohost-env';
let index = 0;
class Router {
constructor(servers) {
if (!Array.isArray(servers)) {
this._nohostAddress = {
host: servers.host,
port: servers.port,
};
return;
}
this._index = 0;
this._statusCache = {};
let curMinute = Math.floor(Date.now() / ONE_MINU);
setInterval(() => {
const minute = Math.floor(Date.now() / ONE_MINU);
if (minute === curMinute) {
return;
}
curMinute = minute;
const cache = this._statusCache;
Object.keys(cache).forEach((key) => {
if (cache[key].initTime !== minute) {
delete cache[key];
}
});
}, 1000);
this.update(servers);
}
getWorkers() {
return this._workers == null ? this._pending : this._workers;
_connectDefault(req, res, callback) {
const { servers } = this._result;
const i = this._index++ % servers.length;
if (this._index >= Number.MAX_SAFE_INTEGER) {
this._index = 0;
}
req.headers[ENV_HEAD] = '$';
const server = servers[i];
if (typeof callback === 'function') {
callback(server);
}
return connect(server, req, res);
}
_getStatus(space, group, env) {
const { base64 } = this._result;
if (!this._base64 || this._base64 !== base64) {
this._statusCache = {};
this._base64 = base64;
}
const query = `?space=${space}&group=${group}&env=${env || ''}`;
// 考虑到实际使用场景不会有那么多在线的环境,这里不使用 LRU cache
let status = this._statusCache[query];
if (!status) {
const options = parseUrl(`${this._servers[0].statusUrl}${query}`);
options.headers = { 'x-nohost-servers': base64 };
status = getJSON(options);
status.initTime = Math.floor(Date.now() / ONE_MINU);
this._statusCache[query] = status;
}
return status;
}
update(servers) {

@@ -30,5 +82,5 @@ this._servers = servers || [];

clearTimeout(this._timer);
this._pending = getWorkers(this._servers);
this._pending.then((workers) => {
this._workers = workers;
this._pending = getServers(this._servers);
this._pending.then((result) => {
this._result = result || '';
this._pending = null;

@@ -45,26 +97,46 @@ if (this._waiting) {

async proxy(req, res) {
const space = req.headers[SPACE_NAME];
const group = req.headers[GROUP_NAME];
const name = req.headers[ENV_NAME];
const workers = await this.getWorkers();
if (!workers.totalLen || req.socket.destroyed) {
destroy(req);
destroy(res);
async proxy(req, res, callback) {
if (typeof res === 'function') {
callback = res;
res = null;
}
if (this._nohostAddress) {
if (req.isUIRequest) {
req.headers['x-whistle-nohost-ui'] = 1;
}
if (typeof callback === 'function') {
callback(this._nohostAddress);
}
return connect(this._nohostAddress, req, res);
}
let result = this._result;
if (result == null) {
result = await this._pending;
}
if (!result || isFinished(req)) {
if (!res) {
throw new Error(result ? 'request is finished.' : 'not found nohost server.');
}
req.destroy();
return;
}
const { headers } = req;
const space = headers[SPACE_NAME];
const group = headers[GROUP_NAME];
const name = headers[ENV_NAME];
if (!space || !group) {
if (req.isUIRequest) {
return destroy(req);
if (!res) {
throw new Error('space & group is required.');
}
req.destroy();
return;
}
const servers = workers._servers;
const i = index++ % servers.totalLen;
headers[ENV_HEAD] = `${i}`;
return proxyToNohost(servers[i], req, res);
return this._connectDefault(req, res, callback);
}
const hash = parseInt(crc32(`${space}/${group}/${name || ''}`), 16) % workers.totalLen;
const env = `$${hash}`;
headers[ENV_HEAD] = env;
const status = await this._getStatus(space, group, name);
if (!status.host) {
return this._connectDefault(req, res, callback);
}
const env = `$${status.index}`;
if (req.isUIRequest) {

@@ -80,8 +152,12 @@ headers['x-whistle-nohost-ui'] = 1;

}
return proxyToNohost(workers[hash], req, res);
headers[ENV_HEAD] = env;
if (typeof callback === 'function') {
callback(status);
}
return connect(status, req, res);
}
proxyUI(req, res) {
proxyUI(req, res, callback) {
req.isUIRequest = true;
return this.proxy(req, res);
return this.proxy(req, res, callback);
}

@@ -93,6 +169,6 @@ }

Router.ENV_NAME = ENV_NAME;
Router.NOHOST_RULE = 'x-whistle-nohost-rule';
Router.NOHOST_VALUE = 'x-whistle-nohost-value';
Router.NOHOST_RULE = 'x-whistle-rule-value';
Router.NOHOST_VALUE = 'x-whistle-key-value';
Router.CLIENT_ID = 'x-whistle-client-id';
Router.CLIENT_ID_FILTER = 'x-whistle-filter-client-id';
module.exports = Router;

@@ -6,3 +6,2 @@ const http = require('http');

const TIMEOUT = 2000;
let workersCache = {};

@@ -13,7 +12,6 @@ const getBody = (url) => {

const timer = setTimeout(() => {
client.abort(); // eslint-disable-line
client.destroy(); // eslint-disable-line
}, TIMEOUT);
const client = http.get(url, (res) => {
clearTimeout(timer);
res.on('error', reject);
if (res.statusCode !== 200) {

@@ -26,5 +24,3 @@ return reject(new Error(`Response status code: ${res.statusCode}`));

});
res.on('end', () => {
resolve(body);
});
res.on('end', () => resolve(body));
});

@@ -48,2 +44,4 @@ client.on('error', reject);

exports.getJSON = (options) => getJSON(options, RETRY_COUNT);
const getWorkerNum = async ({ statusUrl }) => {

@@ -78,2 +76,31 @@ try {

exports.isFinished = (req) => {
if (req.finished) {
return true;
}
const { socket } = req;
return socket && (socket.destroyed || !socket.writable);
};
exports.getServers = async (servers) => {
servers = parseServers(servers);
if (!servers.length) {
return {};
}
const usableServers = [];
const list = [];
await Promise.all(servers.map(async (server) => {
const workerNum = await getWorkerNum(server);
if (workerNum) {
server.workerNum = workerNum;
usableServers.push(server);
list.push(`${server.host}:${server.port}/${workerNum}`);
}
}));
return list.length && {
servers: usableServers,
base64: Buffer.from(list.join()).toString('base64'),
};
};
exports.getWorkers = async (servers) => {

@@ -84,19 +111,19 @@ servers = parseServers(servers);

}
const result = await Promise.all(servers.map(getWorkerNum));
const workers = [];
const usableServers = [];
const cache = {};
servers.forEach(({ host, port }, i) => {
const key = `${host}:${port}`;
const workerNum = result[i] || workersCache[key];
const base64 = [];
await Promise.all(servers.map(async (server) => {
const workerNum = await getWorkerNum(server);
if (workerNum) {
cache[key] = workerNum;
server.workerNum = workerNum;
const { host, port } = server;
const option = { host, port };
usableServers.push(option);
for (i = 0; i < workerNum; i++) {
base64.push(`${host}:${port}/${workerNum}`);
for (let i = 0; i < workerNum; i++) {
workers.push(option);
}
}
});
workersCache = cache;
}));
usableServers._base64 = Buffer.from(base64.join()).toString('base64');
workers.totalLen = workers.length;

@@ -103,0 +130,0 @@ usableServers.totalLen = usableServers.length;

{
"name": "@nohost/router",
"version": "0.5.6",
"version": "0.6.0",
"description": "Nohost cluster router",

@@ -35,2 +35,3 @@ "main": "lib/",

"dependencies": {
"@nohost/connect": "^1.2.1",
"crc32": "^0.2.2",

@@ -37,0 +38,0 @@ "hparser": "^0.3.0"

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc