proteus-cluster
Advanced tools
Comparing version 1.1.0 to 1.1.2
@@ -8,7 +8,7 @@ /** | ||
var fs = require('fs') | ||
, os = require('os') | ||
, cluster = require('cluster') | ||
, loggers = require('proteus-logger') | ||
, logger = loggers.get('cluster') | ||
; | ||
, os = require('os') | ||
, cluster = require('cluster') | ||
, loggers = require('proteus-logger') | ||
, logger = loggers.get('cluster') | ||
; | ||
@@ -26,16 +26,16 @@ // exports | ||
var DEFAULT_DISCONNECT_TIMEOUT = 10000 | ||
, DEFAULT_MAX_FORK_COUNT = 100 | ||
; | ||
, DEFAULT_MAX_FORK_COUNT = 100 | ||
; | ||
// variables | ||
var _pidfile | ||
, _messageListeners = {} | ||
, _expiredQueue = [] | ||
, _disconnectTimeout | ||
, _disconnectTimer = {} | ||
, _maxForkCount = DEFAULT_MAX_FORK_COUNT | ||
, _forkedCount = 0 | ||
, _isShutting = false | ||
, _isRestarting = false | ||
; | ||
, _messageListeners = {} | ||
, _expiredQueue = [] | ||
, _disconnectTimeout | ||
, _disconnectTimer = {} | ||
, _maxForkCount = DEFAULT_MAX_FORK_COUNT | ||
, _forkedCount = 0 | ||
, _isShutting = false | ||
, _isRestarting = false | ||
; | ||
@@ -48,69 +48,69 @@ /** | ||
// set worker settings | ||
var setup = { | ||
exec : conf.exec, | ||
args : conf.args | ||
}; | ||
logger.info('setup master', JSON.stringify(setup)); | ||
// set worker settings | ||
var setup = { | ||
exec : conf.exec, | ||
args : conf.args | ||
}; | ||
logger.info('setup master', JSON.stringify(setup)); | ||
cluster.setupMaster(setup); | ||
cluster.setupMaster(setup); | ||
logger.debug('starting master ' + process.pid); | ||
logger.debug('starting master ' + process.pid); | ||
// setup event will be called only once | ||
_disconnectTimeout = conf.disconnectTimeout || DEFAULT_DISCONNECT_TIMEOUT; | ||
_maxForkCount = conf.maxForkCount || DEFAULT_MAX_FORK_COUNT; | ||
// setup event will be called only once | ||
_disconnectTimeout = conf.disconnectTimeout || DEFAULT_DISCONNECT_TIMEOUT; | ||
_maxForkCount = conf.maxForkCount || DEFAULT_MAX_FORK_COUNT; | ||
logger.info('disconnect timeout : ' + _disconnectTimeout); | ||
logger.info('max fork count : ' + _maxForkCount); | ||
logger.info('disconnect timeout : ' + _disconnectTimeout); | ||
logger.info('max fork count : ' + _maxForkCount); | ||
// start all workers | ||
var workerNum = conf.worker || os.cpus().length; | ||
for (var i = 0; i < workerNum; i++) { | ||
fork(); | ||
} | ||
// start all workers | ||
var workerNum = conf.worker || os.cpus().length; | ||
for (var i = 0; i < workerNum; i++) { | ||
fork(); | ||
} | ||
// create pid file | ||
if (conf.pid) { | ||
_pidfile = conf.pid; | ||
logger.info('create pid file : ' + _pidfile); | ||
// create pid file | ||
if (conf.pid) { | ||
_pidfile = conf.pid; | ||
logger.info('create pid file : ' + _pidfile); | ||
fs.writeFileSync(_pidfile, String(process.pid), 'utf8'); | ||
} | ||
fs.writeFileSync(_pidfile, String(process.pid), 'utf8'); | ||
} | ||
// initialize API listener | ||
if ('api' in conf) { | ||
setupAPI(conf.api); | ||
} | ||
// initialize API listener | ||
if ('api' in conf) { | ||
setupAPI(conf.api); | ||
} | ||
// when worker has been exited | ||
cluster.on('exit', function(worker, code, signal) { | ||
// when worker has been exited | ||
cluster.on('exit', function(worker, code, signal) { | ||
logger.debug('worker exited', { pid: worker.process.pid, id: worker.id }); | ||
logger.debug('worker exited', { pid: worker.process.pid, id: worker.id }); | ||
// clear disconnect timer if exists | ||
if (_disconnectTimer[worker.id]) { | ||
clearTimeout(_disconnectTimer[worker.id]); | ||
delete _disconnectTimer[worker.id]; | ||
} | ||
// clear disconnect timer if exists | ||
if (_disconnectTimer[worker.id]) { | ||
clearTimeout(_disconnectTimer[worker.id]); | ||
delete _disconnectTimer[worker.id]; | ||
} | ||
// if worker was set as restart, or killed accidentally fork/shutdown | ||
if (worker.restart || !worker.suicide) { | ||
// if worker was set as restart, or killed accidentally fork/shutdown | ||
if (worker.restart || !worker.suicide) { | ||
// if worker was forked less than max count, fork again | ||
if (_forkedCount < _maxForkCount) { | ||
return fork(); | ||
} | ||
logger.info('too many forked workers. going to shutdown.'); | ||
return shutdown(); | ||
} | ||
// if worker was forked less than max count, fork again | ||
if (_forkedCount < _maxForkCount) { | ||
return fork(); | ||
} | ||
logger.info('too many forked workers. going to shutdown.'); | ||
return shutdown(); | ||
} | ||
// if worker was killed and not set as restart, do nothing | ||
}); | ||
// if worker was killed and not set as restart, do nothing | ||
}); | ||
// attache signals | ||
process.on('SIGUSR2', restart); // restart workers | ||
process.on('SIGHUP', function() {}); // ignore SIGHUP for background processing | ||
process.on('SIGINT', shutdown); // exit with signal 0 | ||
process.on('SIGTERM', shutdown); // exit with signal 0 | ||
// attache signals | ||
process.on('SIGUSR2', restart); // restart workers | ||
process.on('SIGHUP', function() {}); // ignore SIGHUP for background processing | ||
process.on('SIGINT', shutdown); // exit with signal 0 | ||
process.on('SIGTERM', shutdown); // exit with signal 0 | ||
@@ -120,42 +120,97 @@ } | ||
function setupAPI(api) { | ||
var listen = api.listen; | ||
var port = api.port || 8111; | ||
var listen = api.listen; | ||
var port = api.port || 8111; | ||
var workerTimeout = api.workerTimeout || 5000; | ||
var http = require('http'); | ||
var http = require('http'); | ||
// create server | ||
var server = http.createServer(function(req, res) { | ||
logger.debug('HTTP command received', {method:req.method, url:req.url}); | ||
if (req.method === 'POST' && req.url === '/send') { | ||
var body = ''; | ||
req.setEncoding('utf8'); | ||
req | ||
.on('data', function(data) { | ||
body += data; | ||
}) | ||
.on('end', function() { | ||
var message = JSON.parse(body); | ||
sendMessage(message); | ||
res.statusCode = 200; | ||
res.end('OK'); | ||
}); | ||
// restart workers | ||
} else if (req.method === 'GET' && req.url === '/restart') { | ||
restart(); | ||
res.statusCode = 200; | ||
res.end('OK'); | ||
} else { | ||
res.statusCode = 404; | ||
res.end('NOT FOUND'); | ||
} | ||
}); | ||
if (listen) { | ||
server.listen(port, listen, function() { | ||
logger.info('Cluster HTTP-API is listening on ' + listen + ':' + port); | ||
}); | ||
} else { | ||
server.listen(port, function() { | ||
logger.info('Cluster HTTP-API is listening on 0.0.0.0:' + port); | ||
}); | ||
} | ||
// create server | ||
var server = http.createServer(function(req, res) { | ||
logger.debug('HTTP command received', {method:req.method, url:req.url}); | ||
var body = ''; | ||
var message; | ||
if (req.method === 'POST' && req.url === '/send') { | ||
req.setEncoding('utf8'); | ||
req | ||
.on('data', function(data) { | ||
body += data; | ||
}) | ||
.on('end', function() { | ||
try { | ||
message = JSON.parse(body); | ||
} catch (e) { | ||
res.statusCode = 400; | ||
return res.end('request body parse failure.'); | ||
} | ||
sendMessage(message); | ||
res.statusCode = 200; | ||
res.end('OK'); | ||
}); | ||
} else if (req.method === 'POST' && req.url === '/sync_send') { | ||
req.setEncoding('utf8'); | ||
req | ||
.on('data', function(data) { | ||
body += data; | ||
}) | ||
.on('end', function() { | ||
try { | ||
message = JSON.parse(body); | ||
} catch (e) { | ||
res.statusCode = 400; | ||
return res.end('request body parse failure.'); | ||
} | ||
var cmd = message && message.cmd; | ||
if (!cmd) { | ||
sendMessage(message); | ||
res.statusCode = 200; | ||
return res.end('OK'); | ||
} | ||
if (typeof _messageListeners[cmd] === 'function') { | ||
res.statusCode = 500; | ||
return res.end('server too busy.'); | ||
} | ||
var workerIds = Object.keys(cluster.workers); | ||
addMessageListener(cmd, function(msg, worker) { | ||
workerIds = workerIds.filter(function(v) { | ||
return v !== '' + worker.id; | ||
}); | ||
if (workerIds.length <= 0) { | ||
clearTimeout(timer); | ||
removeMessageListener(cmd); | ||
res.statusCode = 200; | ||
res.end('OK'); | ||
} | ||
}); | ||
sendMessage(message); | ||
var timer = setTimeout(function() { | ||
removeMessageListener(cmd); | ||
res.statusCode = 500; | ||
res.end('worker timeout.'); | ||
}, message.timeout || workerTimeout); | ||
}); | ||
} else if (req.method === 'GET' && req.url === '/restart') { | ||
// restart workers | ||
restart(); | ||
res.statusCode = 200; | ||
res.end('OK'); | ||
} else { | ||
res.statusCode = 404; | ||
res.end('NOT FOUND'); | ||
} | ||
}); | ||
if (listen) { | ||
server.listen(port, listen, function() { | ||
logger.info('Cluster HTTP-API is listening on ' + listen + ':' + port); | ||
}); | ||
} else { | ||
server.listen(port, function() { | ||
logger.info('Cluster HTTP-API is listening on 0.0.0.0:' + port); | ||
}); | ||
} | ||
} | ||
@@ -168,29 +223,29 @@ | ||
// start worker | ||
var worker = cluster.fork(); | ||
_forkedCount++; | ||
// start worker | ||
var worker = cluster.fork(); | ||
_forkedCount++; | ||
logger.debug('fork new worker', { pid:worker.process.pid, id:worker.id }); | ||
logger.debug('fork new worker', { pid:worker.process.pid, id:worker.id }); | ||
// when worker has been disconnected | ||
worker.on('disconnect', function() { | ||
logger.debug('worker disconnected', { pid: worker.process.pid, id: worker.id }); | ||
}); | ||
// when worker has been disconnected | ||
worker.on('disconnect', function() { | ||
logger.debug('worker disconnected', { pid: worker.process.pid, id: worker.id }); | ||
}); | ||
// when worker becomes active | ||
worker.on('online', function() { | ||
logger.info('worker becomes active', { pid: worker.process.pid, id: worker.id }); | ||
// disconnect expired worker | ||
disconnectExpiredWorker(); | ||
}); | ||
// when worker becomes active | ||
worker.on('online', function() { | ||
logger.info('worker becomes active', { pid: worker.process.pid, id: worker.id }); | ||
// disconnect expired worker | ||
disconnectExpiredWorker(); | ||
}); | ||
// when worker sends message | ||
worker.on('message', function(msg) { | ||
if (msg && 'cmd' in msg) { | ||
// if message listener is registered | ||
if (_messageListeners[msg.cmd]) { | ||
_messageListeners[msg.cmd](msg); | ||
} | ||
} | ||
}); | ||
// when worker sends message | ||
worker.on('message', function(msg) { | ||
if (msg && 'cmd' in msg) { | ||
// if message listener is registered | ||
if (_messageListeners[msg.cmd]) { | ||
_messageListeners[msg.cmd](msg, worker); | ||
} | ||
} | ||
}); | ||
} | ||
@@ -203,35 +258,35 @@ | ||
if (_isShutting) return; | ||
_isShutting = true; | ||
if (_isShutting) return; | ||
_isShutting = true; | ||
logger.info('shutdown cluster gracefully'); | ||
logger.info('shutdown cluster gracefully'); | ||
// disconnect all workers | ||
for (var id in cluster.workers) { | ||
var worker = cluster.workers[id]; | ||
// disconnect all workers | ||
for (var id in cluster.workers) { | ||
var worker = cluster.workers[id]; | ||
logger.debug('going to disconnect worker', { pid: worker.process.pid, id: worker.id }); | ||
logger.debug('going to disconnect worker', { pid: worker.process.pid, id: worker.id }); | ||
// mark don't restart | ||
worker.restart = false; | ||
// mark don't restart | ||
worker.restart = false; | ||
// set disconnect timer | ||
_disconnectTimer[id] = setTimeout(forceDisconnect(worker), _disconnectTimeout); | ||
} | ||
// set disconnect timer | ||
_disconnectTimer[id] = setTimeout(forceDisconnect(worker), _disconnectTimeout); | ||
} | ||
// if all worker has been disconnected | ||
cluster.disconnect(function(){ | ||
logger.debug('all workers has been disconnected'); | ||
if (_pidfile) { | ||
try { | ||
fs.unlinkSync(_pidfile); | ||
} catch (e) { | ||
} | ||
} | ||
// in case worker processes are still alive | ||
process.nextTick(function(){ | ||
logger.debug('process exit with code 0'); | ||
process.exit(0); | ||
}); | ||
}); | ||
// if all worker has been disconnected | ||
cluster.disconnect(function(){ | ||
logger.debug('all workers has been disconnected'); | ||
if (_pidfile) { | ||
try { | ||
fs.unlinkSync(_pidfile); | ||
} catch (e) { | ||
} | ||
} | ||
// in case worker processes are still alive | ||
process.nextTick(function(){ | ||
logger.debug('process exit with code 0'); | ||
process.exit(0); | ||
}); | ||
}); | ||
} | ||
@@ -244,24 +299,24 @@ | ||
if (_isRestarting || _isShutting) return; | ||
_isRestarting = true; | ||
if (_isRestarting || _isShutting) return; | ||
_isRestarting = true; | ||
logger.info('restarting workers'); | ||
logger.info('restarting workers'); | ||
// add all running workers into expired queue | ||
for (var id in cluster.workers) { | ||
_expiredQueue.push(id); | ||
} | ||
// disconnect expired worker | ||
disconnectExpiredWorker(); | ||
// add all running workers into expired queue | ||
for (var id in cluster.workers) { | ||
_expiredQueue.push(id); | ||
} | ||
// disconnect expired worker | ||
disconnectExpiredWorker(); | ||
} | ||
function forceDisconnect(worker) { | ||
return function() { | ||
logger.debug('disconnect timeout. destroying worker', { pid: worker.process.pid, id: worker.id }); | ||
if (worker.kill) { | ||
worker.kill('SIGKILL'); | ||
} else { | ||
worker.destroy('SIGKILL'); | ||
} | ||
}; | ||
return function() { | ||
logger.debug('disconnect timeout. destroying worker', { pid: worker.process.pid, id: worker.id }); | ||
if (worker.kill) { | ||
worker.kill('SIGKILL'); | ||
} else { | ||
worker.destroy('SIGKILL'); | ||
} | ||
}; | ||
} | ||
@@ -273,21 +328,21 @@ | ||
function disconnectExpiredWorker() { | ||
if (_expiredQueue.length > 0) { | ||
var expiredWorkerId = _expiredQueue.shift(); | ||
var expiredWorker = cluster.workers[expiredWorkerId]; | ||
if (!expiredWorker) { | ||
// check if expired worker still exists | ||
return disconnectExpiredWorker(); | ||
} | ||
if (_expiredQueue.length > 0) { | ||
var expiredWorkerId = _expiredQueue.shift(); | ||
var expiredWorker = cluster.workers[expiredWorkerId]; | ||
if (!expiredWorker) { | ||
// check if expired worker still exists | ||
return disconnectExpiredWorker(); | ||
} | ||
logger.debug('going to disconnect worker', { pid: expiredWorker.process.pid, id: expiredWorker.id }); | ||
logger.debug('going to disconnect worker', { pid: expiredWorker.process.pid, id: expiredWorker.id }); | ||
// set disconnect timer | ||
_disconnectTimer[expiredWorkerId] = setTimeout(forceDisconnect(expiredWorker), _disconnectTimeout); | ||
// set disconnect timer | ||
_disconnectTimer[expiredWorkerId] = setTimeout(forceDisconnect(expiredWorker), _disconnectTimeout); | ||
// mark restart | ||
expiredWorker.restart = true; | ||
expiredWorker.disconnect(); | ||
} else { | ||
_isRestarting = false; | ||
} | ||
// mark restart | ||
expiredWorker.restart = true; | ||
expiredWorker.disconnect(); | ||
} else { | ||
_isRestarting = false; | ||
} | ||
} | ||
@@ -300,14 +355,14 @@ | ||
logger.info('shutdown cluster forcefully'); | ||
logger.info('shutdown cluster forcefully'); | ||
for (var id in cluster.workers) { | ||
cluster.workers[id].process.kill('SIGKILL'); | ||
} | ||
if (_pidfile) { | ||
try { | ||
fs.unlinkSync(_pidfile); | ||
} catch (e) { | ||
} | ||
} | ||
process.exit(1); | ||
for (var id in cluster.workers) { | ||
cluster.workers[id].process.kill('SIGKILL'); | ||
} | ||
if (_pidfile) { | ||
try { | ||
fs.unlinkSync(_pidfile); | ||
} catch (e) { | ||
} | ||
} | ||
process.exit(1); | ||
} | ||
@@ -322,3 +377,3 @@ | ||
function addMessageListener(cmd, fn) { | ||
_messageListeners[cmd] = fn; | ||
_messageListeners[cmd] = fn; | ||
} | ||
@@ -331,3 +386,3 @@ | ||
function removeMessageListener(cmd) { | ||
delete _messageListeners[cmd]; | ||
delete _messageListeners[cmd]; | ||
} | ||
@@ -340,7 +395,7 @@ | ||
function sendMessage(obj) { | ||
if (cluster.isMaster) { | ||
for (var id in cluster.workers) { | ||
cluster.workers[id].send(obj); | ||
} | ||
} | ||
if (cluster.isMaster) { | ||
for (var id in cluster.workers) { | ||
cluster.workers[id].send(obj); | ||
} | ||
} | ||
} |
{ | ||
"name": "proteus-cluster", | ||
"version": "1.1.0", | ||
"version": "1.1.2", | ||
"scripts": { | ||
@@ -5,0 +5,0 @@ "start": "node app", |
@@ -93,3 +93,96 @@ 'use strict'; | ||
}); | ||
describe('send_sync', function() { | ||
it('send sync message from master using HTTP API', function(done) { | ||
logger.debug('[master] going to send sync message from master using HTTP API'); | ||
var data = JSON.stringify({ cmd: 'sync_ping', msg: 'sync message from master using HTTP API' }); | ||
var post = http.request({ | ||
host: 'localhost', | ||
port: 8881, | ||
path: '/sync_send', | ||
method: 'POST', | ||
headers: { | ||
'Content-Type': 'application/json', | ||
'Content-Length': data.length | ||
} | ||
}, function(res) { | ||
expect(200).to.eql(res.statusCode); | ||
res.setEncoding('utf-8'); | ||
res.on('data', function(chunk) { | ||
expect('OK').to.eql(chunk); | ||
}); | ||
res.on('end', function() { | ||
done(); | ||
}); | ||
}); | ||
post.write(data); | ||
post.end(); | ||
}); | ||
it('should be worker timeout', function(done) { | ||
logger.debug('[master] going to send sync message from master using HTTP API'); | ||
var data = JSON.stringify({ cmd: 'sync_ping', msg: 'sync message from master using HTTP API', timeout: 10 }); | ||
var post = http.request({ | ||
host: 'localhost', | ||
port: 8881, | ||
path: '/sync_send', | ||
method: 'POST', | ||
headers: { | ||
'Content-Type': 'application/json', | ||
'Content-Length': data.length | ||
} | ||
}, function(res) { | ||
expect(500).to.eql(res.statusCode); | ||
res.setEncoding('utf-8'); | ||
res.on('data', function(chunk) { | ||
expect('worker timeout.').to.eql(chunk); | ||
}); | ||
res.on('end', function() { | ||
done(); | ||
}); | ||
}); | ||
post.write(data); | ||
post.end(); | ||
}); | ||
it('should be server too busy', function(done) { | ||
logger.debug('[master] going to send sync message from master using HTTP API'); | ||
var data = JSON.stringify({ cmd: 'sync_ping', msg: 'sync message from master using HTTP API' }); | ||
var options = { | ||
host: 'localhost', | ||
port: 8881, | ||
path: '/sync_send', | ||
method: 'POST', | ||
headers: { | ||
'Content-Type': 'application/json', | ||
'Content-Length': data.length | ||
} | ||
}; | ||
var post1 = http.request(options, function(res) { | ||
expect(200).to.eql(res.statusCode); | ||
}); | ||
post1.write(data); | ||
post1.end(); | ||
setTimeout(function() { | ||
var post2 = http.request(options, function(res) { | ||
expect(500).to.eql(res.statusCode); | ||
res.setEncoding('utf-8'); | ||
res.on('data', function(chunk) { | ||
expect('server too busy.').to.eql(chunk); | ||
}); | ||
res.on('end', function() { | ||
done(); | ||
}); | ||
}); | ||
post2.write(data); | ||
post2.end(); | ||
}, 10); | ||
}); | ||
}); | ||
}); | ||
}); |
@@ -37,2 +37,15 @@ 'use strict'; | ||
function sendSyncPing() { | ||
try { | ||
logger.debug('[worker] going to send message from worker'); | ||
setTimeout(function() { | ||
process.send({cmd: 'sync_ping', msg: 'message from worker ' + process.pid}); | ||
process.exit(0); | ||
}, 100); | ||
} catch(e){ | ||
process.exit(0); | ||
} | ||
} | ||
process.on('message', function(obj) { | ||
@@ -42,3 +55,5 @@ logger.debug('[worker] message received : ' + JSON.stringify(obj)); | ||
sendPong(); | ||
} else if (obj.cmd === 'sync_ping') { | ||
sendSyncPing(); | ||
} | ||
}); |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
24908
554
2