Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

proteus-cluster

Package Overview
Dependencies
Maintainers
3
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

proteus-cluster - npm Package Compare versions

Comparing version 1.1.0 to 1.1.2

469

lib/cluster.js

@@ -8,7 +8,7 @@ /**

var fs = require('fs')
, os = require('os')
, cluster = require('cluster')
, loggers = require('proteus-logger')
, logger = loggers.get('cluster')
;
, os = require('os')
, cluster = require('cluster')
, loggers = require('proteus-logger')
, logger = loggers.get('cluster')
;

@@ -26,16 +26,16 @@ // exports

var DEFAULT_DISCONNECT_TIMEOUT = 10000
, DEFAULT_MAX_FORK_COUNT = 100
;
, DEFAULT_MAX_FORK_COUNT = 100
;
// variables
var _pidfile
, _messageListeners = {}
, _expiredQueue = []
, _disconnectTimeout
, _disconnectTimer = {}
, _maxForkCount = DEFAULT_MAX_FORK_COUNT
, _forkedCount = 0
, _isShutting = false
, _isRestarting = false
;
, _messageListeners = {}
, _expiredQueue = []
, _disconnectTimeout
, _disconnectTimer = {}
, _maxForkCount = DEFAULT_MAX_FORK_COUNT
, _forkedCount = 0
, _isShutting = false
, _isRestarting = false
;

@@ -48,69 +48,69 @@ /**

// set worker settings
var setup = {
exec : conf.exec,
args : conf.args
};
logger.info('setup master', JSON.stringify(setup));
// set worker settings
var setup = {
exec : conf.exec,
args : conf.args
};
logger.info('setup master', JSON.stringify(setup));
cluster.setupMaster(setup);
cluster.setupMaster(setup);
logger.debug('starting master ' + process.pid);
logger.debug('starting master ' + process.pid);
// setup event will be called only once
_disconnectTimeout = conf.disconnectTimeout || DEFAULT_DISCONNECT_TIMEOUT;
_maxForkCount = conf.maxForkCount || DEFAULT_MAX_FORK_COUNT;
// setup event will be called only once
_disconnectTimeout = conf.disconnectTimeout || DEFAULT_DISCONNECT_TIMEOUT;
_maxForkCount = conf.maxForkCount || DEFAULT_MAX_FORK_COUNT;
logger.info('disconnect timeout : ' + _disconnectTimeout);
logger.info('max fork count : ' + _maxForkCount);
logger.info('disconnect timeout : ' + _disconnectTimeout);
logger.info('max fork count : ' + _maxForkCount);
// start all workers
var workerNum = conf.worker || os.cpus().length;
for (var i = 0; i < workerNum; i++) {
fork();
}
// start all workers
var workerNum = conf.worker || os.cpus().length;
for (var i = 0; i < workerNum; i++) {
fork();
}
// create pid file
if (conf.pid) {
_pidfile = conf.pid;
logger.info('create pid file : ' + _pidfile);
// create pid file
if (conf.pid) {
_pidfile = conf.pid;
logger.info('create pid file : ' + _pidfile);
fs.writeFileSync(_pidfile, String(process.pid), 'utf8');
}
fs.writeFileSync(_pidfile, String(process.pid), 'utf8');
}
// initialize API listener
if ('api' in conf) {
setupAPI(conf.api);
}
// initialize API listener
if ('api' in conf) {
setupAPI(conf.api);
}
// when worker has been exited
cluster.on('exit', function(worker, code, signal) {
// when worker has been exited
cluster.on('exit', function(worker, code, signal) {
logger.debug('worker exited', { pid: worker.process.pid, id: worker.id });
logger.debug('worker exited', { pid: worker.process.pid, id: worker.id });
// clear disconnect timer if exists
if (_disconnectTimer[worker.id]) {
clearTimeout(_disconnectTimer[worker.id]);
delete _disconnectTimer[worker.id];
}
// clear disconnect timer if exists
if (_disconnectTimer[worker.id]) {
clearTimeout(_disconnectTimer[worker.id]);
delete _disconnectTimer[worker.id];
}
// if worker was set as restart, or killed accidentally fork/shutdown
if (worker.restart || !worker.suicide) {
// if worker was set as restart, or killed accidentally fork/shutdown
if (worker.restart || !worker.suicide) {
// if worker was forked less than max count, fork again
if (_forkedCount < _maxForkCount) {
return fork();
}
logger.info('too many forked workers. going to shutdown.');
return shutdown();
}
// if worker was forked less than max count, fork again
if (_forkedCount < _maxForkCount) {
return fork();
}
logger.info('too many forked workers. going to shutdown.');
return shutdown();
}
// if worker was killed and not set as restart, do nothing
});
// if worker was killed and not set as restart, do nothing
});
// attache signals
process.on('SIGUSR2', restart); // restart workers
process.on('SIGHUP', function() {}); // ignore SIGHUP for background processing
process.on('SIGINT', shutdown); // exit with signal 0
process.on('SIGTERM', shutdown); // exit with signal 0
// attache signals
process.on('SIGUSR2', restart); // restart workers
process.on('SIGHUP', function() {}); // ignore SIGHUP for background processing
process.on('SIGINT', shutdown); // exit with signal 0
process.on('SIGTERM', shutdown); // exit with signal 0

@@ -120,42 +120,97 @@ }

function setupAPI(api) {
var listen = api.listen;
var port = api.port || 8111;
var listen = api.listen;
var port = api.port || 8111;
var workerTimeout = api.workerTimeout || 5000;
var http = require('http');
var http = require('http');
// create server
var server = http.createServer(function(req, res) {
logger.debug('HTTP command received', {method:req.method, url:req.url});
if (req.method === 'POST' && req.url === '/send') {
var body = '';
req.setEncoding('utf8');
req
.on('data', function(data) {
body += data;
})
.on('end', function() {
var message = JSON.parse(body);
sendMessage(message);
res.statusCode = 200;
res.end('OK');
});
// restart workers
} else if (req.method === 'GET' && req.url === '/restart') {
restart();
res.statusCode = 200;
res.end('OK');
} else {
res.statusCode = 404;
res.end('NOT FOUND');
}
});
if (listen) {
server.listen(port, listen, function() {
logger.info('Cluster HTTP-API is listening on ' + listen + ':' + port);
});
} else {
server.listen(port, function() {
logger.info('Cluster HTTP-API is listening on 0.0.0.0:' + port);
});
}
// create server
var server = http.createServer(function(req, res) {
logger.debug('HTTP command received', {method:req.method, url:req.url});
var body = '';
var message;
if (req.method === 'POST' && req.url === '/send') {
req.setEncoding('utf8');
req
.on('data', function(data) {
body += data;
})
.on('end', function() {
try {
message = JSON.parse(body);
} catch (e) {
res.statusCode = 400;
return res.end('request body parse failure.');
}
sendMessage(message);
res.statusCode = 200;
res.end('OK');
});
} else if (req.method === 'POST' && req.url === '/sync_send') {
req.setEncoding('utf8');
req
.on('data', function(data) {
body += data;
})
.on('end', function() {
try {
message = JSON.parse(body);
} catch (e) {
res.statusCode = 400;
return res.end('request body parse failure.');
}
var cmd = message && message.cmd;
if (!cmd) {
sendMessage(message);
res.statusCode = 200;
return res.end('OK');
}
if (typeof _messageListeners[cmd] === 'function') {
res.statusCode = 500;
return res.end('server too busy.');
}
var workerIds = Object.keys(cluster.workers);
addMessageListener(cmd, function(msg, worker) {
workerIds = workerIds.filter(function(v) {
return v !== '' + worker.id;
});
if (workerIds.length <= 0) {
clearTimeout(timer);
removeMessageListener(cmd);
res.statusCode = 200;
res.end('OK');
}
});
sendMessage(message);
var timer = setTimeout(function() {
removeMessageListener(cmd);
res.statusCode = 500;
res.end('worker timeout.');
}, message.timeout || workerTimeout);
});
} else if (req.method === 'GET' && req.url === '/restart') {
// restart workers
restart();
res.statusCode = 200;
res.end('OK');
} else {
res.statusCode = 404;
res.end('NOT FOUND');
}
});
if (listen) {
server.listen(port, listen, function() {
logger.info('Cluster HTTP-API is listening on ' + listen + ':' + port);
});
} else {
server.listen(port, function() {
logger.info('Cluster HTTP-API is listening on 0.0.0.0:' + port);
});
}
}

@@ -168,29 +223,29 @@

// start worker
var worker = cluster.fork();
_forkedCount++;
// start worker
var worker = cluster.fork();
_forkedCount++;
logger.debug('fork new worker', { pid:worker.process.pid, id:worker.id });
logger.debug('fork new worker', { pid:worker.process.pid, id:worker.id });
// when worker has been disconnected
worker.on('disconnect', function() {
logger.debug('worker disconnected', { pid: worker.process.pid, id: worker.id });
});
// when worker has been disconnected
worker.on('disconnect', function() {
logger.debug('worker disconnected', { pid: worker.process.pid, id: worker.id });
});
// when worker becomes active
worker.on('online', function() {
logger.info('worker becomes active', { pid: worker.process.pid, id: worker.id });
// disconnect expired worker
disconnectExpiredWorker();
});
// when worker becomes active
worker.on('online', function() {
logger.info('worker becomes active', { pid: worker.process.pid, id: worker.id });
// disconnect expired worker
disconnectExpiredWorker();
});
// when worker sends message
worker.on('message', function(msg) {
if (msg && 'cmd' in msg) {
// if message listener is registered
if (_messageListeners[msg.cmd]) {
_messageListeners[msg.cmd](msg);
}
}
});
// when worker sends message
worker.on('message', function(msg) {
if (msg && 'cmd' in msg) {
// if message listener is registered
if (_messageListeners[msg.cmd]) {
_messageListeners[msg.cmd](msg, worker);
}
}
});
}

@@ -203,35 +258,35 @@

if (_isShutting) return;
_isShutting = true;
if (_isShutting) return;
_isShutting = true;
logger.info('shutdown cluster gracefully');
logger.info('shutdown cluster gracefully');
// disconnect all workers
for (var id in cluster.workers) {
var worker = cluster.workers[id];
// disconnect all workers
for (var id in cluster.workers) {
var worker = cluster.workers[id];
logger.debug('going to disconnect worker', { pid: worker.process.pid, id: worker.id });
logger.debug('going to disconnect worker', { pid: worker.process.pid, id: worker.id });
// mark don't restart
worker.restart = false;
// mark don't restart
worker.restart = false;
// set disconnect timer
_disconnectTimer[id] = setTimeout(forceDisconnect(worker), _disconnectTimeout);
}
// set disconnect timer
_disconnectTimer[id] = setTimeout(forceDisconnect(worker), _disconnectTimeout);
}
// if all worker has been disconnected
cluster.disconnect(function(){
logger.debug('all workers has been disconnected');
if (_pidfile) {
try {
fs.unlinkSync(_pidfile);
} catch (e) {
}
}
// in case worker processes are still alive
process.nextTick(function(){
logger.debug('process exit with code 0');
process.exit(0);
});
});
// if all worker has been disconnected
cluster.disconnect(function(){
logger.debug('all workers has been disconnected');
if (_pidfile) {
try {
fs.unlinkSync(_pidfile);
} catch (e) {
}
}
// in case worker processes are still alive
process.nextTick(function(){
logger.debug('process exit with code 0');
process.exit(0);
});
});
}

@@ -244,24 +299,24 @@

if (_isRestarting || _isShutting) return;
_isRestarting = true;
if (_isRestarting || _isShutting) return;
_isRestarting = true;
logger.info('restarting workers');
logger.info('restarting workers');
// add all running workers into expired queue
for (var id in cluster.workers) {
_expiredQueue.push(id);
}
// disconnect expired worker
disconnectExpiredWorker();
// add all running workers into expired queue
for (var id in cluster.workers) {
_expiredQueue.push(id);
}
// disconnect expired worker
disconnectExpiredWorker();
}
function forceDisconnect(worker) {
return function() {
logger.debug('disconnect timeout. destroying worker', { pid: worker.process.pid, id: worker.id });
if (worker.kill) {
worker.kill('SIGKILL');
} else {
worker.destroy('SIGKILL');
}
};
return function() {
logger.debug('disconnect timeout. destroying worker', { pid: worker.process.pid, id: worker.id });
if (worker.kill) {
worker.kill('SIGKILL');
} else {
worker.destroy('SIGKILL');
}
};
}

@@ -273,21 +328,21 @@

function disconnectExpiredWorker() {
if (_expiredQueue.length > 0) {
var expiredWorkerId = _expiredQueue.shift();
var expiredWorker = cluster.workers[expiredWorkerId];
if (!expiredWorker) {
// check if expired worker still exists
return disconnectExpiredWorker();
}
if (_expiredQueue.length > 0) {
var expiredWorkerId = _expiredQueue.shift();
var expiredWorker = cluster.workers[expiredWorkerId];
if (!expiredWorker) {
// check if expired worker still exists
return disconnectExpiredWorker();
}
logger.debug('going to disconnect worker', { pid: expiredWorker.process.pid, id: expiredWorker.id });
logger.debug('going to disconnect worker', { pid: expiredWorker.process.pid, id: expiredWorker.id });
// set disconnect timer
_disconnectTimer[expiredWorkerId] = setTimeout(forceDisconnect(expiredWorker), _disconnectTimeout);
// set disconnect timer
_disconnectTimer[expiredWorkerId] = setTimeout(forceDisconnect(expiredWorker), _disconnectTimeout);
// mark restart
expiredWorker.restart = true;
expiredWorker.disconnect();
} else {
_isRestarting = false;
}
// mark restart
expiredWorker.restart = true;
expiredWorker.disconnect();
} else {
_isRestarting = false;
}
}

@@ -300,14 +355,14 @@

logger.info('shutdown cluster forcefully');
logger.info('shutdown cluster forcefully');
for (var id in cluster.workers) {
cluster.workers[id].process.kill('SIGKILL');
}
if (_pidfile) {
try {
fs.unlinkSync(_pidfile);
} catch (e) {
}
}
process.exit(1);
for (var id in cluster.workers) {
cluster.workers[id].process.kill('SIGKILL');
}
if (_pidfile) {
try {
fs.unlinkSync(_pidfile);
} catch (e) {
}
}
process.exit(1);
}

@@ -322,3 +377,3 @@

function addMessageListener(cmd, fn) {
_messageListeners[cmd] = fn;
_messageListeners[cmd] = fn;
}

@@ -331,3 +386,3 @@

function removeMessageListener(cmd) {
delete _messageListeners[cmd];
delete _messageListeners[cmd];
}

@@ -340,7 +395,7 @@

function sendMessage(obj) {
if (cluster.isMaster) {
for (var id in cluster.workers) {
cluster.workers[id].send(obj);
}
}
if (cluster.isMaster) {
for (var id in cluster.workers) {
cluster.workers[id].send(obj);
}
}
}
{
"name": "proteus-cluster",
"version": "1.1.0",
"version": "1.1.2",
"scripts": {

@@ -5,0 +5,0 @@ "start": "node app",

@@ -93,3 +93,96 @@ 'use strict';

});
describe('send_sync', function() {
it('send sync message from master using HTTP API', function(done) {
logger.debug('[master] going to send sync message from master using HTTP API');
var data = JSON.stringify({ cmd: 'sync_ping', msg: 'sync message from master using HTTP API' });
var post = http.request({
host: 'localhost',
port: 8881,
path: '/sync_send',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': data.length
}
}, function(res) {
expect(200).to.eql(res.statusCode);
res.setEncoding('utf-8');
res.on('data', function(chunk) {
expect('OK').to.eql(chunk);
});
res.on('end', function() {
done();
});
});
post.write(data);
post.end();
});
it('should be worker timeout', function(done) {
logger.debug('[master] going to send sync message from master using HTTP API');
var data = JSON.stringify({ cmd: 'sync_ping', msg: 'sync message from master using HTTP API', timeout: 10 });
var post = http.request({
host: 'localhost',
port: 8881,
path: '/sync_send',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': data.length
}
}, function(res) {
expect(500).to.eql(res.statusCode);
res.setEncoding('utf-8');
res.on('data', function(chunk) {
expect('worker timeout.').to.eql(chunk);
});
res.on('end', function() {
done();
});
});
post.write(data);
post.end();
});
it('should be server too busy', function(done) {
logger.debug('[master] going to send sync message from master using HTTP API');
var data = JSON.stringify({ cmd: 'sync_ping', msg: 'sync message from master using HTTP API' });
var options = {
host: 'localhost',
port: 8881,
path: '/sync_send',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': data.length
}
};
var post1 = http.request(options, function(res) {
expect(200).to.eql(res.statusCode);
});
post1.write(data);
post1.end();
setTimeout(function() {
var post2 = http.request(options, function(res) {
expect(500).to.eql(res.statusCode);
res.setEncoding('utf-8');
res.on('data', function(chunk) {
expect('server too busy.').to.eql(chunk);
});
res.on('end', function() {
done();
});
});
post2.write(data);
post2.end();
}, 10);
});
});
});
});

@@ -37,2 +37,15 @@ 'use strict';

function sendSyncPing() {
try {
logger.debug('[worker] going to send message from worker');
setTimeout(function() {
process.send({cmd: 'sync_ping', msg: 'message from worker ' + process.pid});
process.exit(0);
}, 100);
} catch(e){
process.exit(0);
}
}
process.on('message', function(obj) {

@@ -42,3 +55,5 @@ logger.debug('[worker] message received : ' + JSON.stringify(obj));

sendPong();
} else if (obj.cmd === 'sync_ping') {
sendSyncPing();
}
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc