@nohost/router
Advanced tools
Comparing version 0.5.6 to 0.6.0
@@ -1,216 +0,29 @@ | ||
const http = require('http'); | ||
const net = require('net'); | ||
const { getRawHeaders, getRawHeaderNames, formatHeaders } = require('hparser'); | ||
const { isLocalAddress } = require('./address'); | ||
const { request, tunnel, upgrade, getRawHeaders, onClose } = require('@nohost/connect'); | ||
const noop = () => {}; | ||
const LOCALHOST = '127.0.0.1'; | ||
const XFF = 'x-forwarded-for'; | ||
const XWCP = 'x-whistle-client-port'; | ||
const WS_RE = /websocket/i; | ||
const CLOSED_ERR = new Error('Closed'); | ||
const TIMEOUT_ERR = new Error('Timeout'); | ||
const TIMEOUT = 3600; | ||
const ERROR_HEADERS = { 'x-server': 'nohost/connect' }; | ||
const isUpgrade = ({ headers }) => /\bupgrade\b/i.test(headers.connection); | ||
const connect = function(options, callback) { | ||
let socket; | ||
let timer; | ||
let done; | ||
let retry; | ||
const execCallback = function(err) { | ||
clearTimeout(timer); | ||
timer = null; | ||
if (!done) { | ||
done = true; | ||
callback(err, socket); | ||
} | ||
}; | ||
const handleError = function(err) { | ||
if (done) { | ||
return; | ||
} | ||
socket.removeAllListeners(); | ||
socket.on('error', noop); | ||
socket.destroy(err); | ||
clearTimeout(timer); | ||
if (retry) { | ||
return execCallback(err); | ||
} | ||
retry = true; | ||
timer = setTimeout(() => handleError(TIMEOUT_ERR), TIMEOUT); | ||
try { | ||
socket = net.connect(options, execCallback); | ||
} catch (e) { | ||
return execCallback(e); | ||
} | ||
socket.on('error', handleError); | ||
socket.on('close', (e) => { | ||
if (!done) { | ||
execCallback(e || new Error('closed')); | ||
} | ||
}); | ||
}; | ||
timer = setTimeout(() => handleError(TIMEOUT_ERR), TIMEOUT); | ||
try { | ||
socket = net.connect(options, execCallback); | ||
} catch (e) { | ||
return execCallback(e); | ||
module.exports = async (options, req, res) => { | ||
if (!res) { | ||
return request(req, options); | ||
} | ||
socket.on('error', handleError); | ||
}; | ||
const getClientPort = (req) => { | ||
return req.socket.remotePort || 0; | ||
}; | ||
const removeIPV6Prefix = (ip) => { | ||
if (typeof ip !== 'string') { | ||
return ''; | ||
} | ||
return ip.indexOf('::ffff:') === 0 ? ip.substring(7) : ip; | ||
}; | ||
const getClientIp = (req) => { | ||
let ip = req.headers[XFF]; | ||
if (!net.isIP(ip) || isLocalAddress(ip)) { | ||
ip = req.socket.remoteAddress; | ||
} | ||
ip = removeIPV6Prefix(ip); | ||
return isLocalAddress(ip) ? LOCALHOST : ip; | ||
}; | ||
const restoreHeaders = (req, isRes) => { | ||
const { headers, rawHeaders } = req; | ||
if (!isRes) { | ||
headers[XFF] = getClientIp(req); | ||
headers[XWCP] = getClientPort(req); | ||
} | ||
return formatHeaders(headers, rawHeaders && getRawHeaderNames(rawHeaders)); | ||
}; | ||
const destroy = (req) => { | ||
if (req) { | ||
if (req.destroy) { | ||
req.destroy(); | ||
} else if (req.abort) { | ||
req.abort(); | ||
if (!res.writeHead) { | ||
if (isUpgrade(req)) { | ||
upgrade(req, options); | ||
} else { | ||
tunnel(req, options); | ||
} | ||
return; | ||
} | ||
}; | ||
exports.destroy = destroy; | ||
const getOptions = (req) => { | ||
return { | ||
path: req.url || '/', | ||
method: req.method, | ||
agent: false, | ||
headers: restoreHeaders(req), | ||
}; | ||
}; | ||
const addCloseEvent = (req, cb) => { | ||
req.on('error', cb); | ||
req.once('close', cb); | ||
}; | ||
const onClose = (req, res, cb) => { | ||
const execCb = (err) => { | ||
if (req._hasError) { | ||
return; | ||
} | ||
req._hasError = true; | ||
cb(err || CLOSED_ERR); | ||
}; | ||
if (typeof res === 'function') { | ||
cb = res; | ||
} else if (res) { | ||
addCloseEvent(res, execCb); | ||
onClose(res, (err) => req.emit('close', err)); | ||
try { | ||
const svrRes = await request(req, options); | ||
res.writeHead(svrRes.statusCode, getRawHeaders(svrRes)); | ||
svrRes.pipe(res); | ||
} catch (err) { | ||
res.writeHead(500, ERROR_HEADERS); | ||
res.end(err.stack); | ||
} | ||
addCloseEvent(req, execCb); | ||
}; | ||
const connectNohost = (options, req, reqSock, callback) => { | ||
connect(options, (err, socket) => { | ||
// 确保销毁所有连接 | ||
if (err) { | ||
if (!req._hasError) { | ||
destroy(req); | ||
destroy(reqSock); | ||
} | ||
return callback(err); | ||
} | ||
if (req._hasError) { | ||
socket.destroy(); | ||
return callback(CLOSED_ERR); | ||
} | ||
onClose(reqSock || req, () => socket.destroy()); | ||
callback(null, socket); | ||
}); | ||
}; | ||
const proxyConnect = (options, req, reqSock) => { | ||
connectNohost(options, req, reqSock, (err, socket) => { | ||
if (err) { | ||
return; | ||
} | ||
socket.write([ | ||
`${WS_RE.test(req.headers.upgrade) ? 'GET' : 'CONNECT'} ${req.url} HTTP/1.1`, | ||
getRawHeaders(restoreHeaders(req)), | ||
'\r\n', | ||
].join('\r\n')); | ||
reqSock.pipe(socket).pipe(reqSock); | ||
}); | ||
}; | ||
const proxyRequest = (options, req, callback) => { | ||
connectNohost(options, req, null, (err, socket) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
let done; | ||
const execCb = (e, res) => { | ||
if (e) { | ||
destroy(client); // eslint-disable-line | ||
destroy(req); | ||
} | ||
if (!done) { | ||
done = true; | ||
callback(e, res); | ||
} | ||
}; | ||
const opts = getOptions(req); | ||
opts.createConnection = () => socket; | ||
opts.agent = null; | ||
const client = http.request(opts, res => { | ||
res.on('error', execCb); | ||
execCb(null, res); | ||
}); | ||
onClose(req, execCb); | ||
client.on('error', execCb); | ||
req.pipe(client); | ||
}); | ||
}; | ||
exports.proxyToNohost = (options, req, res) => { | ||
onClose(req, res, () => { | ||
destroy(req); | ||
destroy(res); | ||
}); | ||
if (res && !res.writeHead) { | ||
return proxyConnect(options, req, res); | ||
} | ||
return new Promise((resolve) => { | ||
proxyRequest(options, req, (err, svrRes) => { | ||
if (err) { | ||
destroy(req); | ||
return destroy(res); | ||
} | ||
if (!res) { | ||
return resolve(svrRes); | ||
} | ||
res.writeHead(svrRes.statusCode, restoreHeaders(svrRes, true)); | ||
svrRes.pipe(res); | ||
}); | ||
}); | ||
}; |
140
lib/index.js
@@ -1,7 +0,8 @@ | ||
const crc32 = require('crc32'); | ||
const url = require('url'); | ||
const { getWorkers } = require('./util'); | ||
const { proxyToNohost, destroy } = require('./connect'); | ||
const { parse: parseUrl } = require('url'); | ||
const { getServers, isFinished, getJSON } = require('./util'); | ||
const connect = require('./connect'); | ||
const INTERVAL = 12000; | ||
const INTERVAL = 10000; | ||
const ONE_MINU = 1000 * 60; | ||
const SPACE_NAME = 'x-whistle-nohost-space-name'; | ||
@@ -11,13 +12,64 @@ const GROUP_NAME = 'x-whistle-nohost-group-name'; | ||
const ENV_HEAD = 'x-whistle-nohost-env'; | ||
let index = 0; | ||
class Router { | ||
constructor(servers) { | ||
if (!Array.isArray(servers)) { | ||
this._nohostAddress = { | ||
host: servers.host, | ||
port: servers.port, | ||
}; | ||
return; | ||
} | ||
this._index = 0; | ||
this._statusCache = {}; | ||
let curMinute = Math.floor(Date.now() / ONE_MINU); | ||
setInterval(() => { | ||
const minute = Math.floor(Date.now() / ONE_MINU); | ||
if (minute === curMinute) { | ||
return; | ||
} | ||
curMinute = minute; | ||
const cache = this._statusCache; | ||
Object.keys(cache).forEach((key) => { | ||
if (cache[key].initTime !== minute) { | ||
delete cache[key]; | ||
} | ||
}); | ||
}, 1000); | ||
this.update(servers); | ||
} | ||
getWorkers() { | ||
return this._workers == null ? this._pending : this._workers; | ||
_connectDefault(req, res, callback) { | ||
const { servers } = this._result; | ||
const i = this._index++ % servers.length; | ||
if (this._index >= Number.MAX_SAFE_INTEGER) { | ||
this._index = 0; | ||
} | ||
req.headers[ENV_HEAD] = '$'; | ||
const server = servers[i]; | ||
if (typeof callback === 'function') { | ||
callback(server); | ||
} | ||
return connect(server, req, res); | ||
} | ||
_getStatus(space, group, env) { | ||
const { base64 } = this._result; | ||
if (!this._base64 || this._base64 !== base64) { | ||
this._statusCache = {}; | ||
this._base64 = base64; | ||
} | ||
const query = `?space=${space}&group=${group}&env=${env || ''}`; | ||
// 考虑到实际使用场景不会有那么多在线的环境,这里不使用 LRU cache | ||
let status = this._statusCache[query]; | ||
if (!status) { | ||
const options = parseUrl(`${this._servers[0].statusUrl}${query}`); | ||
options.headers = { 'x-nohost-servers': base64 }; | ||
status = getJSON(options); | ||
status.initTime = Math.floor(Date.now() / ONE_MINU); | ||
this._statusCache[query] = status; | ||
} | ||
return status; | ||
} | ||
update(servers) { | ||
@@ -30,5 +82,5 @@ this._servers = servers || []; | ||
clearTimeout(this._timer); | ||
this._pending = getWorkers(this._servers); | ||
this._pending.then((workers) => { | ||
this._workers = workers; | ||
this._pending = getServers(this._servers); | ||
this._pending.then((result) => { | ||
this._result = result || ''; | ||
this._pending = null; | ||
@@ -45,26 +97,46 @@ if (this._waiting) { | ||
async proxy(req, res) { | ||
const space = req.headers[SPACE_NAME]; | ||
const group = req.headers[GROUP_NAME]; | ||
const name = req.headers[ENV_NAME]; | ||
const workers = await this.getWorkers(); | ||
if (!workers.totalLen || req.socket.destroyed) { | ||
destroy(req); | ||
destroy(res); | ||
async proxy(req, res, callback) { | ||
if (typeof res === 'function') { | ||
callback = res; | ||
res = null; | ||
} | ||
if (this._nohostAddress) { | ||
if (req.isUIRequest) { | ||
req.headers['x-whistle-nohost-ui'] = 1; | ||
} | ||
if (typeof callback === 'function') { | ||
callback(this._nohostAddress); | ||
} | ||
return connect(this._nohostAddress, req, res); | ||
} | ||
let result = this._result; | ||
if (result == null) { | ||
result = await this._pending; | ||
} | ||
if (!result || isFinished(req)) { | ||
if (!res) { | ||
throw new Error(result ? 'request is finished.' : 'not found nohost server.'); | ||
} | ||
req.destroy(); | ||
return; | ||
} | ||
const { headers } = req; | ||
const space = headers[SPACE_NAME]; | ||
const group = headers[GROUP_NAME]; | ||
const name = headers[ENV_NAME]; | ||
if (!space || !group) { | ||
if (req.isUIRequest) { | ||
return destroy(req); | ||
if (!res) { | ||
throw new Error('space & group is required.'); | ||
} | ||
req.destroy(); | ||
return; | ||
} | ||
const servers = workers._servers; | ||
const i = index++ % servers.totalLen; | ||
headers[ENV_HEAD] = `${i}`; | ||
return proxyToNohost(servers[i], req, res); | ||
return this._connectDefault(req, res, callback); | ||
} | ||
const hash = parseInt(crc32(`${space}/${group}/${name || ''}`), 16) % workers.totalLen; | ||
const env = `$${hash}`; | ||
headers[ENV_HEAD] = env; | ||
const status = await this._getStatus(space, group, name); | ||
if (!status.host) { | ||
return this._connectDefault(req, res, callback); | ||
} | ||
const env = `$${status.index}`; | ||
if (req.isUIRequest) { | ||
@@ -80,8 +152,12 @@ headers['x-whistle-nohost-ui'] = 1; | ||
} | ||
return proxyToNohost(workers[hash], req, res); | ||
headers[ENV_HEAD] = env; | ||
if (typeof callback === 'function') { | ||
callback(status); | ||
} | ||
return connect(status, req, res); | ||
} | ||
proxyUI(req, res) { | ||
proxyUI(req, res, callback) { | ||
req.isUIRequest = true; | ||
return this.proxy(req, res); | ||
return this.proxy(req, res, callback); | ||
} | ||
@@ -93,6 +169,6 @@ } | ||
Router.ENV_NAME = ENV_NAME; | ||
Router.NOHOST_RULE = 'x-whistle-nohost-rule'; | ||
Router.NOHOST_VALUE = 'x-whistle-nohost-value'; | ||
Router.NOHOST_RULE = 'x-whistle-rule-value'; | ||
Router.NOHOST_VALUE = 'x-whistle-key-value'; | ||
Router.CLIENT_ID = 'x-whistle-client-id'; | ||
Router.CLIENT_ID_FILTER = 'x-whistle-filter-client-id'; | ||
module.exports = Router; |
@@ -6,3 +6,2 @@ const http = require('http'); | ||
const TIMEOUT = 2000; | ||
let workersCache = {}; | ||
@@ -13,7 +12,6 @@ const getBody = (url) => { | ||
const timer = setTimeout(() => { | ||
client.abort(); // eslint-disable-line | ||
client.destroy(); // eslint-disable-line | ||
}, TIMEOUT); | ||
const client = http.get(url, (res) => { | ||
clearTimeout(timer); | ||
res.on('error', reject); | ||
if (res.statusCode !== 200) { | ||
@@ -26,5 +24,3 @@ return reject(new Error(`Response status code: ${res.statusCode}`)); | ||
}); | ||
res.on('end', () => { | ||
resolve(body); | ||
}); | ||
res.on('end', () => resolve(body)); | ||
}); | ||
@@ -48,2 +44,4 @@ client.on('error', reject); | ||
exports.getJSON = (options) => getJSON(options, RETRY_COUNT); | ||
const getWorkerNum = async ({ statusUrl }) => { | ||
@@ -78,2 +76,31 @@ try { | ||
exports.isFinished = (req) => { | ||
if (req.finished) { | ||
return true; | ||
} | ||
const { socket } = req; | ||
return socket && (socket.destroyed || !socket.writable); | ||
}; | ||
exports.getServers = async (servers) => { | ||
servers = parseServers(servers); | ||
if (!servers.length) { | ||
return {}; | ||
} | ||
const usableServers = []; | ||
const list = []; | ||
await Promise.all(servers.map(async (server) => { | ||
const workerNum = await getWorkerNum(server); | ||
if (workerNum) { | ||
server.workerNum = workerNum; | ||
usableServers.push(server); | ||
list.push(`${server.host}:${server.port}/${workerNum}`); | ||
} | ||
})); | ||
return list.length && { | ||
servers: usableServers, | ||
base64: Buffer.from(list.join()).toString('base64'), | ||
}; | ||
}; | ||
exports.getWorkers = async (servers) => { | ||
@@ -84,19 +111,19 @@ servers = parseServers(servers); | ||
} | ||
const result = await Promise.all(servers.map(getWorkerNum)); | ||
const workers = []; | ||
const usableServers = []; | ||
const cache = {}; | ||
servers.forEach(({ host, port }, i) => { | ||
const key = `${host}:${port}`; | ||
const workerNum = result[i] || workersCache[key]; | ||
const base64 = []; | ||
await Promise.all(servers.map(async (server) => { | ||
const workerNum = await getWorkerNum(server); | ||
if (workerNum) { | ||
cache[key] = workerNum; | ||
server.workerNum = workerNum; | ||
const { host, port } = server; | ||
const option = { host, port }; | ||
usableServers.push(option); | ||
for (i = 0; i < workerNum; i++) { | ||
base64.push(`${host}:${port}/${workerNum}`); | ||
for (let i = 0; i < workerNum; i++) { | ||
workers.push(option); | ||
} | ||
} | ||
}); | ||
workersCache = cache; | ||
})); | ||
usableServers._base64 = Buffer.from(base64.join()).toString('base64'); | ||
workers.totalLen = workers.length; | ||
@@ -103,0 +130,0 @@ usableServers.totalLen = usableServers.length; |
{ | ||
"name": "@nohost/router", | ||
"version": "0.5.6", | ||
"version": "0.6.0", | ||
"description": "Nohost cluster router", | ||
@@ -35,2 +35,3 @@ "main": "lib/", | ||
"dependencies": { | ||
"@nohost/connect": "^1.2.1", | ||
"crc32": "^0.2.2", | ||
@@ -37,0 +38,0 @@ "hparser": "^0.3.0" |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
2
13940
3
346
+ Added@nohost/connect@^1.2.1
+ Added@nohost/connect@1.4.0(transitive)