Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

proteus-cluster

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

proteus-cluster - npm Package Compare versions

Comparing version 1.0.4 to 1.0.5

347

lib/cluster.js

@@ -8,7 +8,7 @@ /**

var fs = require('fs')
, os = require('os')
, cluster = require('cluster')
, loggers = require('proteus-logger')
, logger = loggers.get('cluster')
;
, os = require('os')
, cluster = require('cluster')
, loggers = require('proteus-logger')
, logger = loggers.get('cluster')
;

@@ -25,6 +25,3 @@ // exports

// const
var CLUSTER_MODE_RUNNING = 0x01
, CLUSTER_MODE_RESTARTING = 0x02
, CLUSTER_MODE_SHUTTING = 0x03
, DEFAULT_DISCONNECT_TIMEOUT = 10000
var DEFAULT_DISCONNECT_TIMEOUT = 10000
, DEFAULT_MAX_FORK_COUNT = 100

@@ -34,10 +31,11 @@ ;

// variables
var _expiredQueue
, _clusterStatus = CLUSTER_MODE_SHUTTING
var _pidfile
, _messageListeners = {}
, _expiredQueue = []
, _disconnectTimeout
, _disconnectTimer
, _messageListeners = {}
, _disconnectTimer = {}
, _maxForkCount = DEFAULT_MAX_FORK_COUNT
, _forkedCount
, _pidfile
, _forkedCount = 0
, _isShutting = false
, _isRestarting = false
;

@@ -50,55 +48,90 @@

function createCluster(conf) {
// only for master process
if (!cluster.isMaster) {
throw new Error('cluster module must be executed from master process.');
}
// if not in CLUSTER_MODE_SHUTTING mode
if (_clusterStatus !== CLUSTER_MODE_SHUTTING) {
return;
}
logger.debug('starting master ' + process.pid);
logger.info('starting master ' + process.pid);
// setup event will be called only once
cluster.on('setup', function(){
// manage expired workers
_clusterStatus = CLUSTER_MODE_RUNNING;
_disconnectTimeout = conf.disconnectTimeout || DEFAULT_DISCONNECT_TIMEOUT;
_expiredQueue = [];
_disconnectTimer = {};
_maxForkCount = conf.maxForkCount || DEFAULT_MAX_FORK_COUNT;
_forkedCount = 0;
_disconnectTimeout = conf.disconnectTimeout || DEFAULT_DISCONNECT_TIMEOUT;
_maxForkCount = conf.maxForkCount || DEFAULT_MAX_FORK_COUNT;
logger.info('disconnect timeout : ' + _disconnectTimeout);
logger.info('max fork count : ' + _maxForkCount);
// start all workers
var workerNum = conf.worker || os.cpus().length;
for (var i = 0; i < workerNum; i++) {
fork();
}
// create pid file
if (conf.pid) {
_pidfile = conf.pid;
logger.info('create pid file : ' + _pidfile);
fs.writeFileSync(_pidfile, String(process.pid), 'utf8');
}
// initialize API listener
if ('api' in conf) {
setupAPI(conf.api);
}
// when worker has been exited
cluster.on('exit', function(worker, code, signal) {
logger.debug('worker exited', { pid: worker.process.pid, id: worker.id });
// clear disconnect timer if exists
if (_disconnectTimer[worker.id]) {
clearTimeout(_disconnectTimer[worker.id]);
delete _disconnectTimer[worker.id];
}
// if worker was set as restart, or killed accidentally fork/shutdown
if (worker.restart || !worker.suicide) {
// if worker was forked less than max count, fork again
if (_forkedCount < _maxForkCount) {
return fork();
}
logger.info('too many forked workers. going to shutdown.');
return shutdown();
}
// if worker was killed and not set as restart, do nothing
});
// attache signals
process.on('SIGUSR1', forceShutdown); // restart workers
process.on('SIGUSR2', restart); // restart workers
process.on('SIGHUP', function() {}); // ignore SIGHUP for background processing
process.on('SIGINT', shutdown); // exit with signal 0
process.on('SIGTERM', shutdown); // exit with signal 0
});
// set worker settings
cluster.setupMaster({
var setup = {
exec : conf.exec,
args : conf.args
});
};
logger.info('setup master', JSON.stringify(setup));
// start all workers
var workerNum = conf.worker || os.cpus().length;
for (var i = 0; i < workerNum; i++) {
fork();
}
cluster.setupMaster(setup);
// create pid file
if (conf.pid) {
_pidfile = conf.pid;
fs.writeFileSync(_pidfile, String(process.pid), 'utf8');
}
}
// initialize API listener
if ('api' in conf) {
function setupAPI(api) {
var listen = api.listen;
var port = api.port || 8111;
var listen = conf.api.listen;
var port = conf.api.port || 8111;
var http = require('http');
var http = require('http');
// create server
var server = http.createServer(function(req, res) {
logger.info('HTTP command received', {method:req.method, url:req.url});
if (req.method === 'POST' && req.url === '/send') {
var body = '';
req.setEncoding('utf8');
req
// create server
var server = http.createServer(function(req, res) {
logger.debug('HTTP command received', {method:req.method, url:req.url});
if (req.method === 'POST' && req.url === '/send') {
var body = '';
req.setEncoding('utf8');
req
.on('data', function(data) {

@@ -114,56 +147,19 @@ body += data;

// restart workers
} else if (req.method === 'GET' && req.url === '/restart') {
restart();
res.statusCode = 200;
res.end('OK');
} else {
res.statusCode = 404;
res.end('NOT FOUND');
}
});
if (listen) {
server.listen(port, listen, function() {
logger.info('Cluster HTTP-API is listening on ' + listen + ':' + port);
});
} else if (req.method === 'GET' && req.url === '/restart') {
restart();
res.statusCode = 200;
res.end('OK');
} else {
server.listen(port, function() {
logger.info('Cluster HTTP-API is listening on 0.0.0.0:' + listen);
});
res.statusCode = 404;
res.end('NOT FOUND');
}
}
// attache signals
process.on('SIGUSR2', restart); // restart workers
process.on('SIGHUP', function() {}); // ignore SIGHUP for background processing
process.on('SIGINT', shutdown); // exit with signal 0
process.on('SIGTERM', shutdown); // exit with signal 0
// TODO can't capture SIGKILL event
// process.on('SIGKILL', forceShutdown); // exit with signal 1
}
/**
* disconnect single expired worker dequeued from expiredQueue
*/
function disconnectExpiredWorker() {
if (_expiredQueue.length > 0) {
var expiredWorkerId = _expiredQueue.shift();
var expiredWorker = cluster.workers[expiredWorkerId];
if (!expiredWorker) {
// check if expired worker still exists
return disconnectExpiredWorker();
}
// set disconnect timer
_disconnectTimer[expiredWorkerId] = setTimeout(function() {
logger.info('disconnect timeout. destroying worker', { pid: expiredWorker.process.pid, id: expiredWorker.id });
(expiredWorker.kill) ? expiredWorker.kill() : expiredWorker.destroy();
}, _disconnectTimeout);
logger.info('going to disconnect worker', { pid: expiredWorker.process.pid, id: expiredWorker.id });
// mark restart
expiredWorker.restart = true;
expiredWorker.disconnect();
});
if (listen) {
server.listen(port, listen, function() {
logger.info('Cluster HTTP-API is listening on ' + listen + ':' + port);
});
} else {
// set to CLUSTER_MODE_RUNNING mode if all expired workers has been disconnected
_clusterStatus = CLUSTER_MODE_RUNNING;
server.listen(port, function() {
logger.info('Cluster HTTP-API is listening on 0.0.0.0:' + listen);
});
}

@@ -177,29 +173,11 @@ }

// start worker from this file
// start worker
var worker = cluster.fork();
logger.info('add worker', { pid:worker.process.pid, id:worker.id });
_forkedCount++;
// when worker disconnected
logger.debug('fork new worker', { pid:worker.process.pid, id:worker.id });
// when worker has been disconnected
worker.on('disconnect', function() {
logger.info('worker disconnect', { pid: worker.process.pid, id: worker.id });
// clear disconnect timer if exists
if (_disconnectTimer[worker.id]) {
clearTimeout(_disconnectTimer[worker.id]);
delete _disconnectTimer[worker.id];
}
// fork new worker if server is still in CLUSTER_MODE_RUNNING or CLUSTER_MODE_RESTARTING mode
if (_clusterStatus === CLUSTER_MODE_RUNNING || _clusterStatus === CLUSTER_MODE_RESTARTING) {
// fork is called too many times. shtudown cluster.
if (_forkedCount > _maxForkCount) {
logger.info('too many forked workers. going to shutdown.');
shutdown();
} else if (worker.suicide === true && !worker.restart) {
logger.info('worker suicided', { id: worker.id });
} else {
fork();
}
}
logger.debug('worker disconnected', { pid: worker.process.pid, id: worker.id });
});

@@ -226,11 +204,46 @@

/**
* shutdown cluster gracefully
*/
function shutdown() {
if (_isShutting) return;
_isShutting = true;
logger.info('shutdown cluster gracefully');
// disconnect all workers
for (var id in cluster.workers) {
var worker = cluster.workers[id];
logger.debug('going to disconnect worker', { pid: worker.process.pid, id: worker.id });
// mark don't restart
worker.restart = false;
// set disconnect timer
_disconnectTimer[id] = setTimeout(forceDisconnect(worker), _disconnectTimeout);
}
// if all worker has been disconnected
cluster.disconnect(function(){
logger.debug('all workers has been disconnected');
if (_pidfile) {
try {
fs.unlinkSync(_pidfile);
} catch (e) {
}
}
logger.debug('process exit with code 0');
process.exit(0);
});
}
/**
* restart all workers
*/
function restart() {
// if not in CLUSTER_MODE_RUNNING mode
if (_clusterStatus !== CLUSTER_MODE_RUNNING) {
return;
}
_clusterStatus = CLUSTER_MODE_RESTARTING;
if (_isRestarting || _isShutting) return;
_isRestarting = true;
logger.info('restarting workers');

@@ -246,41 +259,36 @@

function forceDisconnect(worker) {
return function() {
logger.debug('disconnect timeout. destroying worker', { pid: worker.process.pid, id: worker.id });
if (worker.kill) {
worker.kill('SIGKILL');
} else {
worker.destroy('SIGKILL');
}
};
}
/**
* shutdown cluster gracefully
* disconnect single expired worker dequeued from expiredQueue
*/
function shutdown() {
// if not in CLUSTER_MODE_RUNNING mode
if (_clusterStatus !== CLUSTER_MODE_RUNNING && _clusterStatus !== CLUSTER_MODE_RESTARTING) {
return;
}
_clusterStatus = CLUSTER_MODE_SHUTTING;
function disconnectExpiredWorker() {
if (_expiredQueue.length > 0) {
var expiredWorkerId = _expiredQueue.shift();
var expiredWorker = cluster.workers[expiredWorkerId];
if (!expiredWorker) {
// check if expired worker still exists
return disconnectExpiredWorker();
}
logger.info('shutdown cluster gracefully');
logger.debug('going to disconnect worker', { pid: expiredWorker.process.pid, id: expiredWorker.id });
// when worker has been disconnected
cluster.on('disconnect', function(worker) {
if (_expiredQueue.length > 0) {
// disconnect expired worker if still exists
disconnectExpiredWorker();
return;
}
// disconnect master if all workers has been disconnected
cluster.disconnect(function() {
logger.info('all workers has been disconnected');
if (_pidfile) {
try {
fs.unlinkSync(_pidfile);
} catch (e) {
}
}
logger.info('process exit with code 0');
process.exit(0);
})
});
// set disconnect timer
_disconnectTimer[expiredWorkerId] = setTimeout(forceDisconnect(expiredWorker), _disconnectTimeout);
// add all running workers into expired queue
for (var id in cluster.workers) {
_expiredQueue.push(id);
// mark restart
expiredWorker.restart = true;
expiredWorker.disconnect();
} else {
_isRestarting = false;
}
// disconnect expired worker
disconnectExpiredWorker();
}

@@ -292,3 +300,2 @@

function forceShutdown() {
_clusterStatus = CLUSTER_MODE_SHUTTING;

@@ -338,3 +345,1 @@ logger.info('shutdown cluster forcefully');

}
{
"name": "proteus-cluster",
"version": "1.0.4",
"version": "1.0.5",
"scripts": {

@@ -5,0 +5,0 @@ "start": "node app",

@@ -36,11 +36,16 @@ proteus-cluster

- pid
- process id file [/tmp/proteus-cluster.pid]
- process id file
- exec
- startup JS file for workers [__filename]
- args
- arguments to be sent when calling startup JS file (array)
- disconnectTimeout
- timeout milliseconds for worker to wait for graceful shutdown [120000]
- timeout milliseconds for worker to wait for graceful shutdown [10000]
- maxForkCount
- max fork count for worker (in case of endless restart) [100]
- args
- arguments for worker (array)
- api
- listen
- hostname to accept connections for web-based API
- port
- port to accept connections for web-based API [8111]

@@ -55,6 +60,12 @@

var conf = {};
conf.worker = 4;
conf.pid = '/tmp/proteus.pid';
conf.exec = 'worker.js';
conf.worker = 4;
conf.pid = '/tmp/proteus.pid';
conf.exec = 'worker.js';
conf.args = ['-c', 'config.json'];
conf.disconnectTimeout = 5000;
conf.maxForkCount = 30;
conf.api = {
listen : '0.0.0.0',
port : 8111
});
cluster(conf);

@@ -91,3 +102,3 @@ ```

run by API
run from program

@@ -98,3 +109,9 @@ ```js

run by calling API
```shell
curl http://localhost:8111/restart
```
## graceful shutdown

@@ -113,3 +130,3 @@

run from API
run from program

@@ -120,3 +137,22 @@ ```js

## force shutdown
run from shell
```shell
$ node cluster.js
$ ps ax | grep node | grep -v 'grep'
2051 s000 S+ 0:00.09 node cluster.js
2052 s000 R+ 0:00.42 node /proteus-cluster/test/worker.js
2053 s000 R+ 0:00.42 node /proteus-cluster/test/worker.js
$ kill -SIGUSR1 2051
```
run from program
```js
cluster.forceShutdown();
```
## send messages from worker to master

@@ -184,13 +220,17 @@

- pid
- プロセスID [/tmp/proteus-cluster.pid]
- プロセスID
- exec
- workerの起動JSファイル [__filename]
- args
- workerの起動JSファイル呼び出し時に渡す引数
- disconnectTimeout
- workerを安全に停止するためのタイムアウト時間。時間を過ぎると強制停止される。 [120000]
- workerを安全に停止するためのタイムアウト時間。時間を過ぎると強制停止される。 [10000]
- maxForkCount
- workerをforkする回数の上限値(永久に再起動を繰り返さないための対応) [100]
- args
- workerに渡す引数(配列)
- api
- listen
- WebベースのAPIを起動する際のホスト名
- port
- WebベースのAPIを起動する際のポート番号 [8111]
## cluster起動

@@ -203,6 +243,12 @@

var conf = {};
conf.worker = 4;
conf.pid = '/tmp/proteus.pid';
conf.exec = 'worker.js';
conf.worker = 4;
conf.pid = '/tmp/proteus.pid';
conf.exec = 'worker.js';
conf.args = ['-c', 'config.json'];
conf.disconnectTimeout = 5000;
conf.maxForkCount = 30;
conf.api = {
listen : '0.0.0.0',
port : 8111
});
cluster(conf);

@@ -240,3 +286,3 @@ ```

API実行
プログラムから実行

@@ -247,3 +293,8 @@ ```js

APIを呼び出して実行
```shell
curl http://localhost:8111/restart
```
## graceful shutdown

@@ -262,3 +313,3 @@

API実行
プログラムから実行

@@ -269,3 +320,22 @@ ```js

## force shutdown
シェルから実行
```shell
$ node cluster.js
$ ps ax | grep node | grep -v 'grep'
2051 s000 S+ 0:00.09 node cluster.js
2052 s000 R+ 0:00.42 node /proteus-cluster/test/worker.js
2053 s000 R+ 0:00.42 node /proteus-cluster/test/worker.js
$ kill -SIGUSR1 2051
```
プログラムから実行
```js
cluster.forceShutdown();
```
## workerからmasterへのメッセージ送信

@@ -272,0 +342,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc