Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cluster2

Package Overview
Dependencies
Maintainers
11
Versions
70
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cluster2 - npm Package Compare versions

Comparing version 0.4.23 to 0.4.24

pids/master.10855.pdf

150

lib/misc.js

@@ -17,8 +17,6 @@ /*

var _ = require('underscore'),
fs = require('fs'),
util = require('util'),
var fs = require('fs'),
assert = require('assert'),
when = require('when'),
timeout = require('when/timeout'),
assert = require('assert');
timeout = require('when/timeout');

@@ -59,3 +57,3 @@ // Utility to ensure that certain directories exist

/*var tillPrevDeath = null;
var tillPrevDeath = null;

@@ -112,142 +110,2 @@ exports.deathQueue = function deathQueue(worker, emitter, success){

}
};*/
exports.safeKill = function(pid, signal, logger){
try{
process.kill(pid, signal);
return false;
}
catch(e){
//verify error is Error: ESRCH
logger.debug('[shutdown] safeKill received:%j', e);
return e.errno === 'ESRCH'; //no such process
}
};
exports.deathQueueGenerator = function(options){
options = options || {};
var tillPrevDeath = null,
queue = options.queue || [],
wait = options.timeout || 60000,
retry = options.retry || 3,
logger = options.logger || {
'debug' : function(){
console.log.apply(console, arguments);
}
};
return function deathQueue(worker, emitter, success){
assert.ok(worker);
assert.ok(emitter);
assert.ok(success);
var pid = worker.pid,
death = util.format('worker-%d-died', pid);
if(!_.contains(queue, pid)){
queue.push(pid);
var tillDeath = when.defer(),
afterDeath = null,
die = function die(retry){
if(!retry){
if(tillPrevDeath){
tillPrevDeath.reject(new Error('[deathQueue] failed after retries'));
}
tillPrevDeath = null;//reset
}
var successor = success(),
successorPid = successor.pid,
successorGuard = setTimeout(function onSuccessorTimeout(){
//handle error case of successor not 'listening' after started
exports.safeKill(pid, 'SIGTERM', logger);
logger.debug('[deathQueue] successor:%d did not start listening, kill by SIGTERM', successorPid);
//cancel onListening event handler of the dead successor
emitter.removeListener('listening', onSuccessorListening);
//retry of the 'die' process
die(retry - 1);
}, wait);
//when successor is in place, the old worker could be discontinued finally
emitter.on('listening', function onSuccessorListening(onboard){
if(successorPid !== onboard){
return; //noop
}
else{
emitter.removeListener('listening', onSuccessorListening);
}
clearTimeout(successorGuard);
logger.debug('[deathQueue] successor:%d of %d is ready, wait for %s and timeout in:%dms', successorPid, pid, death, wait);
var deathGuard = setTimeout(function(){
if(!exports.safeKill(pid, 'SIGTERM', logger)){
//worker still there, should emit 'exit' eventually
logger.debug('[deathQueue] worker:%d did not report death by:%d, kill by SIGTERM', pid, wait);
//remove the redundant exit listener
emitter.removeListener('died', onDeath);
}
else{//suicide or accident already happended, process has run away
//we emit this from master on behalf of the run away process.
logger.debug('[deathQueue] worker:%d probably ran away, emit:%s on behalf', death);
//immediately report death to the master
emitter.emit('died', pid);
}
}, wait);
worker.kill('SIGINT');
emitter.on('died', function onDeath(dismiss){
if(pid !== dismiss){
return;
}
else{
emitter.removeListener('died', onDeath);
}
logger.debug('[deathQueue] %d died', pid);
clearTimeout(deathGuard);//release the deathGuard
tillDeath.resolve(pid);
if(tillPrevDeath === afterDeath){//last of dyingQueue resolved, clean up the dyingQueue
logger.debug('[deathQueue] death queue cleaned up');
tillPrevDeath = null;
queue.splice(0, queue.length);
}
});
});
};
if(!tillPrevDeath){//1st in the dying queue,
afterDeath = tillPrevDeath = tillDeath.promise;//1 min
die(retry);
}
else{
afterDeath = tillPrevDeath = tillPrevDeath.ensure(_.bind(die, null, retry));
}
}
};
};
exports.deathQueue = exports.deathQueueGenerator({
'timeout': 60000
});

@@ -29,4 +29,5 @@ /*

util = require('util'),
usage = require('usage'),
memwatch = require('memwatch');
BigNumber = require('bignumber.js'),
usage = require('usage');
// uvmon = require('nodefly-uvmon');

@@ -59,2 +60,3 @@ var debug = process.env['cluster2'];

log('killall called with signal ', signal);
// uvmon.stop();
var that = this, fullname;

@@ -87,3 +89,3 @@ fs.readdir(that.options.pids, function(err, paths) {

fs.readFile(fullname, 'ascii', function(err, data) {
var pid = parseInt(data);
var pid = parseInt(data);
if(pid === process.pid) {

@@ -142,3 +144,2 @@ log('Unlinking ', fullname);

self.emitter.emit('forked', worker.pid);
// Collect counters from workers

@@ -160,3 +161,20 @@ worker.on('message', function (message) {

if(message.pid != process.pid){
self._heartbeats.push(message);//must append to the tail
self._heartbeats.push({
'pid': message.pid,
'usertime': message.cpu/self.options.noWorkers,
'systime': message.cpu/self.options.noWorkers,
'uptime': message.uptime,
'totalmem': Math.pow(2, 31) - 1,
'freemem': Math.pow(2, 31) - 1 - message.memory,
'totalConnections': message.totalConnections,
'pendingConnections': message.aliveConnections,
'timedoutConnections': 0,
'fullGCs': message.gc.full,
'incrementalGCs': message.gc.incremental,
'pauseMS': message.gc.pauseMS,
'totalTransactions': message.transactions,
'totalDurations': message.durations,
'errors': message.error.count,
'interval': message.cycle
});//must append to the tail
// update the last heartbeat time for the worker

@@ -170,38 +188,62 @@ var workerStats = self.stats.workers[message.pid];

var count = self._heartbeats.length,
threads = {},
aggr = {};
var heartbeatsOfThisCycle = _.clone(self._heartbeats);
//clean up immediately
self._heartbeats = [];
_.each(_.range(0, count), function(aggregated){
var heartbeat = self._heartbeats.shift();
var groupedByWorkers = _.reduce(heartbeatsOfThisCycle, function(memoize, aHeartbeat){
_.each(heartbeat, function(val, key){
aggr[key] = (aggr[key] || 0) + val;
});
memoize[aHeartbeat.pid] = (memoize[aHeartbeat.pid] || []).concat(aHeartbeat);
return memoize;
}, {}),
threads[heartbeat.pid] = heartbeat.pid;
});
avgOfWorkers = _.map(groupedByWorkers, function(heartbeatsOfThisWorker, pid){
//emit the aggregated heartbeat message
self.emitter.emit('heartbeat', {
var aggrOfThisWorker = {},
numOfHeartbeats = heartbeatsOfThisWorker.length;
_.each(heartbeatsOfThisWorker, function(aHeartbeatOfThisWorker){
_.each(aHeartbeatOfThisWorker, function(val, key){
aggrOfThisWorker[key] = (aggrOfThisWorker[key] || 0) + val;
});
});
return {
'pid': process.pid,
'usertime': aggrOfThisWorker.usertime / numOfHeartbeats,//avg
'systime': aggrOfThisWorker.systime / numOfHeartbeats,//avg
'uptime': aggrOfThisWorker.uptime / numOfHeartbeats,//avg
'totalmem': aggrOfThisWorker.totalmem / numOfHeartbeats,//avg
'freemem': aggrOfThisWorker.freemem / numOfHeartbeats,//avg
'totalConnections': aggrOfThisWorker.totalConnections / numOfHeartbeats,//avg
'pendingConnections': aggrOfThisWorker.pendingConnections / numOfHeartbeats,//avg
'timedoutConnections': aggrOfThisWorker.timedoutConnections / numOfHeartbeats,//avg
'fullGCs': aggrOfThisWorker.fullGCs / numOfHeartbeats,//avg
'incrementalGCs': aggrOfThisWorker.incrementalGCs / numOfHeartbeats,//avg
'pauseMS': aggrOfThisWorker.pauseMS / numOfHeartbeats,//avg
'totalTransactions': aggrOfThisWorker.totalTransactions / numOfHeartbeats,//avg
'totalDuration': aggrOfThisWorker.totalDurations / numOfHeartbeats,//avg
'errors': aggrOfThisWorker.errors / numOfHeartbeats,//avg
'interval': aggrOfThisWorker.interval / numOfHeartbeats//avg
};
}),
aggrOfWorkers = _.reduce(avgOfWorkers, function(memoize, avgOfWorker){
_.each(avgOfWorker, function(val, key){
memoize[key] = (memoize[key] || 0) + val;
});
return memoize;
}, {});
_.extend(aggrOfWorkers, {
'pid': process.pid,
'usertime': aggr.usertime / count,//avg
'systime': aggr.systime / count,//avg
'uptime': aggr.uptime / count,//avg
'totalmem': aggr.totalmem / count,//avg
'freemem': aggr.freemem / count,//avg
'totalConnections': aggr.totalConnections,//total
'pendingConnections': aggr.pendingConnections,//total
'timedoutConnections': aggr.timedoutConnections,//total
'fullGCs': aggr.fullGCs,//total
'incrementalGCs': aggr.incrementalGCs,//total
'heapCompactions': aggr.heapCompactions,//total
'totalTransactions': aggr.totalTransactions,//total
'totalDuration': aggr.totalDuration,//total
'errors': aggr.errors,//total
'threads': _.keys(threads).length,
'interval': Date.now() - self.lastTime
'threads': avgOfWorkers.length,
'uptime': aggrOfWorkers.uptime / avgOfWorkers.length, //uptime is the only value we don't want total..
'interval': aggrOfWorkers.interval / avgOfWorkers.length
});
//emit the aggregated heartbeat message
self.emitter.emit('heartbeat', aggrOfWorkers);
self.lastTime = Date.now();

@@ -364,2 +406,10 @@

if(cluster.isMaster) {
if(!_.contains(process.argv, '--nouse-idle-notification')){
//the ugly way to force --nouse-idle-notification is actually to modify process.argv directly in master
var argv = _.toArray(process.argv),
node = argv.shift();
argv.unshift('--nouse-idle-notification');
argv.unshift(node);
process.argv = argv;
}

@@ -476,2 +526,4 @@ this.stats.pid = process.pid;

else {
var totalTransactions = new BigNumber(0);
var totalDurations = new BigNumber(0);

@@ -607,8 +659,16 @@ var listening = false, conns = 0, totalConns = 0, timedoutConns = 0, noAppClosed = 0, graceful = _.once(function graceful(signal, code){

var memStats = {
'num_full_gc': 0,
'num_inc_gc': 0,
'heap_compactions': 0
gc: {
'incremental': 0,
'full': 0,
'pauseMS': 0
}
};
memwatch.on('stats', function(stats){
_.extend(memStats, stats || {});
var bin = require('gc-stats/build/Release/gcstats');
bin.afterGC(function(stats) {
//cannot tell if it's incremental or full, just to check if pauseMS is too long
//logger.debug('[worker] gc usage:%d, type:%s', usage, type);
memStats.gc[stats.pauseMS < 500 ? 'incremental' : 'full'] += 1;
memStats.gc.pauseMS += stats.pauseMS;
});

@@ -618,6 +678,7 @@

'count': 0,
'totalDuration': 0
'totalDuration': 0,
'start': Date.now()
}
self.emitter.on('rootTransaction', function(tx){
txStats.count += 1;
txStats.count += 1;
txStats.totalDuration += tx.duration;

@@ -632,27 +693,33 @@ });

var heartbeatInterval = self.options.heartbeatInterval || 60000;
// Heartbeat - make sure to clear this on 'close'
var heartbeat = setInterval(function () {
totalTransactions = totalTransactions.plus(txStats.count);
totalDurations = totalDurations.plus(txStats.totalDuration);
usage.lookup(process.pid, function(err, result) {
if(!err){
if(!err){
var memTotal = Math.pow(2, 31) - 1,//should be 4g full space, but that exceeds MAX_INT, reduce it to MAX_INT
heartbeat = {
var heartbeat = {
'pid': process.pid,
'usertime': result.cpu,
'systime': result.cpu,
'uptime': Math.round(process.uptime()),
'totalmem': memTotal,
'freemem': memTotal - result.memory,
'uptime': process.uptime(),
'cpu': result.cpu,
'memory': result.memory,
'aliveConnections': conns,
'totalConnections': totalConns,
'pendingConnections': conns,
'timedoutConnections': timedoutConns,
'fullGCs': memStats['num_full_gc'],
'incrementalGCs': memStats['num_inc_gc'],
'heapCompactions': memStats['heap_compactions'],
'totalTransactions': txStats.count,
'totalDuration': txStats.totalDuration,
'errors': errors
'transactions': txStats.count,
'durations': txStats.totalDuration,
'totalTransactions': totalTransactions,
'totalDurations': totalDurations,
'gc': memStats.gc,
// 'uv': uvmon.getData(), //added uv monitor data for heartbeat
'error': {
count: errors
},
'cycle': (Date.now() - txStats.start)
};
self.emitter.emit('heartbeat', heartbeat);

@@ -665,9 +732,13 @@ var toMaster = {

process.send(toMaster);
//reset
memStats.num_full_gc = 0;
memStats.num_inc_gc = 0;
memStats.heap_compactions = 0;
memStats = {
gc: {
'incremental': 0,
'full': 0,
'pauseMS': 0
}
};
txStats.count = 0;
txStats.totalDuration = 0;
txStats.start = Date.now();
errors = 0;

@@ -677,3 +748,3 @@ }

}, self.options.heartbeatInterval || 60000);
}, heartbeatInterval);
// put the heartbeat interval id in the process context

@@ -680,0 +751,0 @@ process.heartbeat = heartbeat;

{
"author": "ql.io",
"contributors": [{
"name": "Subbu Allamaraju",
"email": "subbu@ebaysf.com"
},
"author": "ql.io",
"contributors": [
{
"name": "Roy Zhou",
"email": "huzhou@ebay.com"
}],
"name": "cluster2",
"version": "0.4.23",
"repository": {
"type": "git",
"url": "https://github.com/cubejs/cluster2"
},
"engines": {
"node": ">= 0.8.0"
},
"main": "lib/index.js",
"dependencies": {
"underscore": "~1.4.4",
"express": "~2.5.11",
"ejs": "~0.8.4",
"npm": "~1.3.0",
"when": "~2.4.0",
"memwatch": "~0.2.2",
"usage": "~0.3.8"
},
"devDependencies": {
"websocket": "~1.0.8",
"nodeunit": "~0.8.0",
"request": "~2.21.0",
"mocha": "~1.11.0",
"should": "~1.2.2",
"harbor": "~0.2.0"
},
"scripts": {
"test": "nodeunit test"
},
"optionalDependencies": {},
"publishConfig": {
"registry": "https://registry.npmjs.org/"
"name": "Subbu Allamaraju",
"email": "subbu@ebaysf.com"
}
],
"name": "cluster2",
"version": "0.4.24",
"repository": {
"type": "git",
"url": "https://github.com/cubejs/cluster2"
},
"engines": {
"node": ">= 0.8.0"
},
"main": "lib/index.js",
"dependencies": {
"underscore": "~1.4.4",
"express": "~2.5.11",
"ejs": "~0.8.4",
"npm": "~1.3.0",
"when": "~2.4.0",
"usage": "~0.3.8",
"bignumber.js": "~1.1.1",
"gc-stats": "~0.0.1"
},
"devDependencies": {
"websocket": "~1.0.8",
"nodeunit": "~0.8.0",
"request": "~2.21.0",
"mocha": "~1.11.0",
"should": "~1.2.2",
"harbor": "~0.2.0"
},
"scripts": {
"test": "nodeunit test"
},
"optionalDependencies": {},
"publishConfig": {
"registry": "http://registry.npmjs.org/"
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc