Comparing version 0.4.23 to 0.4.24
lib/misc.js
@@ -17,8 +17,6 @@ /*
 var _ = require('underscore'),
+    fs = require('fs'),
     util = require('util'),
-var fs = require('fs'),
+    assert = require('assert'),
     when = require('when'),
-    timeout = require('when/timeout'),
-    assert = require('assert');
+    timeout = require('when/timeout');
@@ -59,3 +57,3 @@ // Utility to ensure that certain directories exist
-var tillPrevDeath = null;
+/*var tillPrevDeath = null;
@@ -112,142 +110,2 @@ exports.deathQueue = function deathQueue(worker, emitter, success){
     }
+};*/
+exports.safeKill = function(pid, signal, logger){
+    try{
+        process.kill(pid, signal);
+        return false;
+    }
+    catch(e){
+        //verify error is Error: ESRCH
+        logger.debug('[shutdown] safeKill received:%j', e);
+        return e.errno === 'ESRCH'; //no such process
+    }
+};
+exports.deathQueueGenerator = function(options){
+    options = options || {};
+    var tillPrevDeath = null,
+        queue = options.queue || [],
+        wait = options.timeout || 60000,
+        retry = options.retry || 3,
+        logger = options.logger || {
+            'debug' : function(){
+                console.log.apply(console, arguments);
+            }
+        };
+    return function deathQueue(worker, emitter, success){
+        assert.ok(worker);
+        assert.ok(emitter);
+        assert.ok(success);
+        var pid = worker.pid,
+            death = util.format('worker-%d-died', pid);
+        if(!_.contains(queue, pid)){
+            queue.push(pid);
+            var tillDeath = when.defer(),
+                afterDeath = null,
+                die = function die(retry){
+                    if(!retry){
+                        if(tillPrevDeath){
+                            tillPrevDeath.reject(new Error('[deathQueue] failed after retries'));
+                        }
+                        tillPrevDeath = null;//reset
+                    }
+                    var successor = success(),
+                        successorPid = successor.pid,
+                        successorGuard = setTimeout(function onSuccessorTimeout(){
+                            //handle error case of successor not 'listening' after started
+                            exports.safeKill(pid, 'SIGTERM', logger);
+                            logger.debug('[deathQueue] successor:%d did not start listening, kill by SIGTERM', successorPid);
+                            //cancel onListening event handler of the dead successor
+                            emitter.removeListener('listening', onSuccessorListening);
+                            //retry of the 'die' process
+                            die(retry - 1);
+                        }, wait);
+                    //when successor is in place, the old worker could be discontinued finally
+                    emitter.on('listening', function onSuccessorListening(onboard){
+                        if(successorPid !== onboard){
+                            return; //noop
+                        }
+                        else{
+                            emitter.removeListener('listening', onSuccessorListening);
+                        }
+                        clearTimeout(successorGuard);
+                        logger.debug('[deathQueue] successor:%d of %d is ready, wait for %s and timeout in:%dms', successorPid, pid, death, wait);
+                        var deathGuard = setTimeout(function(){
+                            if(!exports.safeKill(pid, 'SIGTERM', logger)){
+                                //worker still there, should emit 'exit' eventually
+                                logger.debug('[deathQueue] worker:%d did not report death by:%d, kill by SIGTERM', pid, wait);
+                                //remove the redundant exit listener
+                                emitter.removeListener('died', onDeath);
+                            }
+                            else{//suicide or accident already happended, process has run away
+                                //we emit this from master on behalf of the run away process.
+                                logger.debug('[deathQueue] worker:%d probably ran away, emit:%s on behalf', death);
+                                //immediately report death to the master
+                                emitter.emit('died', pid);
+                            }
+                        }, wait);
+                        worker.kill('SIGINT');
+                        emitter.on('died', function onDeath(dismiss){
+                            if(pid !== dismiss){
+                                return;
+                            }
+                            else{
+                                emitter.removeListener('died', onDeath);
+                            }
+                            logger.debug('[deathQueue] %d died', pid);
+                            clearTimeout(deathGuard);//release the deathGuard
+                            tillDeath.resolve(pid);
+                            if(tillPrevDeath === afterDeath){//last of dyingQueue resolved, clean up the dyingQueue
+                                logger.debug('[deathQueue] death queue cleaned up');
+                                tillPrevDeath = null;
+                                queue.splice(0, queue.length);
+                            }
+                        });
+                    });
+                };
+            if(!tillPrevDeath){//1st in the dying queue,
+                afterDeath = tillPrevDeath = tillDeath.promise;//1 min
+                die(retry);
+            }
+            else{
+                afterDeath = tillPrevDeath = tillPrevDeath.ensure(_.bind(die, null, retry));
+            }
+        }
+    };
+};
+exports.deathQueue = exports.deathQueueGenerator({
+    'timeout': 60000
+});
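Aside (not part of the diff; illustration only): safeKill probes a pid with process.kill and treats an ESRCH error as proof the process is already gone, while deathQueueGenerator turns the old singleton deathQueue into a factory whose pid queue, timeout, retry count and logger are injectable; the exported deathQueue keeps the previous one-minute default. A minimal usage sketch, assuming the module path and an emitter that relays the 'listening' and 'died' events:

var EventEmitter = require('events').EventEmitter,
    misc = require('./lib/misc'); //assumed path to the module above

var emitter = new EventEmitter(),
    deathQueue = misc.deathQueueGenerator({
        'timeout': 5000, //5s for the successor to report 'listening' and for the old worker to exit
        'retry': 2
    });

//retire a worker once its successor can be forked, e.g.:
//deathQueue(worker, emitter, function(){ return cluster.fork(); });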
@@ -29,4 +29,5 @@ /*
     util = require('util'),
-    usage = require('usage'),
-    memwatch = require('memwatch');
+    BigNumber = require('bignumber.js'),
+    usage = require('usage');
+    // uvmon = require('nodefly-uvmon');
@@ -59,2 +60,3 @@ var debug = process.env['cluster2'];
     log('killall called with signal ', signal);
+    // uvmon.stop();
     var that = this, fullname;
@@ -87,3 +89,3 @@ fs.readdir(that.options.pids, function(err, paths) {
     fs.readFile(fullname, 'ascii', function(err, data) {
-        var pid = parseInt(data);
+        var pid = parseInt(data);
         if(pid === process.pid) {
@@ -142,3 +144,2 @@ log('Unlinking ', fullname);
     self.emitter.emit('forked', worker.pid);
     // Collect counters from workers
@@ -160,3 +161,20 @@ worker.on('message', function (message) {
     if(message.pid != process.pid){
-        self._heartbeats.push(message);//must append to the tail
+        self._heartbeats.push({
+            'pid': message.pid,
+            'usertime': message.cpu/self.options.noWorkers,
+            'systime': message.cpu/self.options.noWorkers,
+            'uptime': message.uptime,
+            'totalmem': Math.pow(2, 31) - 1,
+            'freemem': Math.pow(2, 31) - 1 - message.memory,
+            'totalConnections': message.totalConnections,
+            'pendingConnections': message.aliveConnections,
+            'timedoutConnections': 0,
+            'fullGCs': message.gc.full,
+            'incrementalGCs': message.gc.incremental,
+            'pauseMS': message.gc.pauseMS,
+            'totalTransactions': message.transactions,
+            'totalDurations': message.durations,
+            'errors': message.error.count,
+            'interval': message.cycle
+        });//must append to the tail
     // update the last heartbeat time for the worker
@@ -170,38 +188,62 @@ var workerStats = self.stats.workers[message.pid];
-    var count = self._heartbeats.length,
-        threads = {},
-        aggr = {};
+    var heartbeatsOfThisCycle = _.clone(self._heartbeats);
+    //clean up immediately
+    self._heartbeats = [];
-    _.each(_.range(0, count), function(aggregated){
-        var heartbeat = self._heartbeats.shift();
+    var groupedByWorkers = _.reduce(heartbeatsOfThisCycle, function(memoize, aHeartbeat){
-        _.each(heartbeat, function(val, key){
-            aggr[key] = (aggr[key] || 0) + val;
-        });
+            memoize[aHeartbeat.pid] = (memoize[aHeartbeat.pid] || []).concat(aHeartbeat);
+            return memoize;
+        }, {}),
-        threads[heartbeat.pid] = heartbeat.pid;
-    });
+        avgOfWorkers = _.map(groupedByWorkers, function(heartbeatsOfThisWorker, pid){
-    //emit the aggregated heartbeat message
-    self.emitter.emit('heartbeat', {
+            var aggrOfThisWorker = {},
+                numOfHeartbeats = heartbeatsOfThisWorker.length;
+            _.each(heartbeatsOfThisWorker, function(aHeartbeatOfThisWorker){
+                _.each(aHeartbeatOfThisWorker, function(val, key){
+                    aggrOfThisWorker[key] = (aggrOfThisWorker[key] || 0) + val;
+                });
+            });
+            return {
+                'pid': process.pid,
+                'usertime': aggrOfThisWorker.usertime / numOfHeartbeats,//avg
+                'systime': aggrOfThisWorker.systime / numOfHeartbeats,//avg
+                'uptime': aggrOfThisWorker.uptime / numOfHeartbeats,//avg
+                'totalmem': aggrOfThisWorker.totalmem / numOfHeartbeats,//avg
+                'freemem': aggrOfThisWorker.freemem / numOfHeartbeats,//avg
+                'totalConnections': aggrOfThisWorker.totalConnections / numOfHeartbeats,//avg
+                'pendingConnections': aggrOfThisWorker.pendingConnections / numOfHeartbeats,//avg
+                'timedoutConnections': aggrOfThisWorker.timedoutConnections / numOfHeartbeats,//avg
+                'fullGCs': aggrOfThisWorker.fullGCs / numOfHeartbeats,//avg
+                'incrementalGCs': aggrOfThisWorker.incrementalGCs / numOfHeartbeats,//avg
+                'pauseMS': aggrOfThisWorker.pauseMS / numOfHeartbeats,//avg
+                'totalTransactions': aggrOfThisWorker.totalTransactions / numOfHeartbeats,//avg
+                'totalDuration': aggrOfThisWorker.totalDurations / numOfHeartbeats,//avg
+                'errors': aggrOfThisWorker.errors / numOfHeartbeats,//avg
+                'interval': aggrOfThisWorker.interval / numOfHeartbeats//avg
+            };
+        }),
+        aggrOfWorkers = _.reduce(avgOfWorkers, function(memoize, avgOfWorker){
+            _.each(avgOfWorker, function(val, key){
+                memoize[key] = (memoize[key] || 0) + val;
+            });
+            return memoize;
+        }, {});
+    _.extend(aggrOfWorkers, {
         'pid': process.pid,
-        'usertime': aggr.usertime / count,//avg
-        'systime': aggr.systime / count,//avg
-        'uptime': aggr.uptime / count,//avg
-        'totalmem': aggr.totalmem / count,//avg
-        'freemem': aggr.freemem / count,//avg
-        'totalConnections': aggr.totalConnections,//total
-        'pendingConnections': aggr.pendingConnections,//total
-        'timedoutConnections': aggr.timedoutConnections,//total
-        'fullGCs': aggr.fullGCs,//total
-        'incrementalGCs': aggr.incrementalGCs,//total
-        'heapCompactions': aggr.heapCompactions,//total
-        'totalTransactions': aggr.totalTransactions,//total
-        'totalDuration': aggr.totalDuration,//total
-        'errors': aggr.errors,//total
-        'threads': _.keys(threads).length,
-        'interval': Date.now() - self.lastTime
+        'threads': avgOfWorkers.length,
+        'uptime': aggrOfWorkers.uptime / avgOfWorkers.length, //uptime is the only value we don't want total..
+        'interval': aggrOfWorkers.interval / avgOfWorkers.length
     });
+    //emit the aggregated heartbeat message
+    self.emitter.emit('heartbeat', aggrOfWorkers);
     self.lastTime = Date.now();
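Aside (not part of the diff; illustration only): the rewrite above aggregates in two steps, first averaging each worker's heartbeats for the cycle, then combining those averages across workers, so a worker that happened to report more often no longer skews the totals. A standalone sketch of the same shape over plain numeric records:

var _ = require('underscore');

//heartbeats: an array of plain numeric records, e.g. {pid: 1, usertime: 10}
function aggregate(heartbeats){
    var byWorker = _.groupBy(heartbeats, 'pid'),
        perWorkerAvg = _.map(byWorker, function(list){
            var avg = {};
            _.each(list, function(hb){
                _.each(hb, function(val, key){
                    avg[key] = (avg[key] || 0) + val / list.length;
                });
            });
            return avg;
        });
    //sum the per-worker averages across workers
    return _.reduce(perWorkerAvg, function(total, avg){
        _.each(avg, function(val, key){
            total[key] = (total[key] || 0) + val;
        });
        return total;
    }, {});
}

//worker 1 averages to 15, worker 2 contributes 30, so usertime aggregates to 45
console.log(aggregate([{pid: 1, usertime: 10}, {pid: 1, usertime: 20}, {pid: 2, usertime: 30}]));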
@@ -364,2 +406,10 @@
 if(cluster.isMaster) {
+    if(!_.contains(process.argv, '--nouse-idle-notification')){
+        //the ugly way to force --nouse-idle-notification is actually to modify process.argv directly in master
+        var argv = _.toArray(process.argv),
+            node = argv.shift();
+        argv.unshift('--nouse-idle-notification');
+        argv.unshift(node);
+        process.argv = argv;
+    }
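Aside (not part of the diff; illustration only): splicing the flag into process.argv relies on cluster.fork() deriving the worker command line from the master's argv. On later Node versions the same effect is available without mutating argv; the sketch below assumes Node >= 0.12 (where cluster.settings.execArgv exists) and a V8 build that still accepts --nouse-idle-notification:

var cluster = require('cluster');

if(cluster.isMaster && process.execArgv.indexOf('--nouse-idle-notification') < 0){
    //workers forked after this call inherit the extra V8 flag
    cluster.setupMaster({
        'execArgv': process.execArgv.concat(['--nouse-idle-notification'])
    });
}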
@@ -476,2 +526,4 @@ this.stats.pid = process.pid;
 else {
+    var totalTransactions = new BigNumber(0);
+    var totalDurations = new BigNumber(0);
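Aside (not part of the diff; illustration only): txStats.count and txStats.totalDuration are reset every heartbeat cycle, while these two counters keep lifetime totals; bignumber.js avoids the silent precision loss a plain double hits past 2^53. For instance:

var BigNumber = require('bignumber.js');

var total = new BigNumber(0);
total = total.plus(12345); //a per-cycle transaction count
console.log(total.toString()); //'12345'

//a plain double stops counting reliably past 2^53:
console.log(9007199254740992 + 1); //9007199254740992
console.log(new BigNumber('9007199254740992').plus(1).toString()); //'9007199254740993'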
@@ -607,8 +659,16 @@ var listening = false, conns = 0, totalConns = 0, timedoutConns = 0, noAppClosed = 0, graceful = _.once(function graceful(signal, code){
     var memStats = {
-        'num_full_gc': 0,
-        'num_inc_gc': 0,
-        'heap_compactions': 0
+        gc: {
+            'incremental': 0,
+            'full': 0,
+            'pauseMS': 0
+        }
     };
-    memwatch.on('stats', function(stats){
-        _.extend(memStats, stats || {});
+    var bin = require('gc-stats/build/Release/gcstats');
+    bin.afterGC(function(stats) {
+        //cannot tell if it's incremental or full, just to check if pauseMS is too long
+        //logger.debug('[worker] gc usage:%d, type:%s', usage, type);
+        memStats.gc[stats.pauseMS < 500 ? 'incremental' : 'full'] += 1;
+        memStats.gc.pauseMS += stats.pauseMS;
     });
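Aside (not part of the diff; illustration only): memwatch reported full GCs, incremental GCs and heap compactions separately, while gc-stats 0.0.x only reports the pause, hence the 500ms heuristic above for calling a collection full. Later gc-stats releases (an assumption, based on the 1.x-style API) expose the same numbers through an EventEmitter and include a gctype, which would remove the guesswork:

var gcStats = require('gc-stats')(); //assumed 1.x-style API

gcStats.on('stats', function(stats){
    //stats.gctype (assumption): 1 = scavenge, 2 = mark/sweep/compact, 4 = incremental marking
    console.log('gc paused %dms, type:%d', stats.pauseMS, stats.gctype);
});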
@@ -618,6 +678,7 @@
     'count': 0,
-    'totalDuration': 0
+    'totalDuration': 0,
+    'start': Date.now()
 }
 self.emitter.on('rootTransaction', function(tx){
     txStats.count += 1;
     txStats.totalDuration += tx.duration;
@@ -632,27 +693,33 @@ });
+    var heartbeatInterval = self.options.heartbeatInterval || 60000;
     // Heartbeat - make sure to clear this on 'close'
     var heartbeat = setInterval(function () {
+        totalTransactions = totalTransactions.plus(txStats.count);
+        totalDurations = totalDurations.plus(txStats.totalDuration);
         usage.lookup(process.pid, function(err, result) {
             if(!err){
-                var memTotal = Math.pow(2, 31) - 1,//should be 4g full space, but that exceeds MAX_INT, reduce it to MAX_INT
-                    heartbeat = {
+                var heartbeat = {
                     'pid': process.pid,
-                    'usertime': result.cpu,
-                    'systime': result.cpu,
-                    'uptime': Math.round(process.uptime()),
-                    'totalmem': memTotal,
-                    'freemem': memTotal - result.memory,
+                    'uptime': process.uptime(),
+                    'cpu': result.cpu,
+                    'memory': result.memory,
+                    'aliveConnections': conns,
                     'totalConnections': totalConns,
-                    'pendingConnections': conns,
                     'timedoutConnections': timedoutConns,
-                    'fullGCs': memStats['num_full_gc'],
-                    'incrementalGCs': memStats['num_inc_gc'],
-                    'heapCompactions': memStats['heap_compactions'],
-                    'totalTransactions': txStats.count,
-                    'totalDuration': txStats.totalDuration,
-                    'errors': errors
+                    'transactions': txStats.count,
+                    'durations': txStats.totalDuration,
+                    'totalTransactions': totalTransactions,
+                    'totalDurations': totalDurations,
+                    'gc': memStats.gc,
+                    // 'uv': uvmon.getData(), //added uv monitor data for heartbeat
+                    'error': {
+                        count: errors
+                    },
+                    'cycle': (Date.now() - txStats.start)
                 };
                 self.emitter.emit('heartbeat', heartbeat);
@@ -665,9 +732,13 @@ var toMaster = {
     process.send(toMaster);
     //reset
-    memStats.num_full_gc = 0;
-    memStats.num_inc_gc = 0;
-    memStats.heap_compactions = 0;
+    memStats = {
+        gc: {
+            'incremental': 0,
+            'full': 0,
+            'pauseMS': 0
+        }
+    };
     txStats.count = 0;
     txStats.totalDuration = 0;
+    txStats.start = Date.now();
     errors = 0;
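Aside (not part of the diff; illustration only): replacing the whole memStats object here, rather than zeroing its fields as 0.4.23 did, is safe because the afterGC callback closes over the memStats variable, not the original object. A tiny demonstration:

var state = { 'gc': { 'full': 0 } },
    bump = function(){ state.gc.full += 1; }; //closes over the variable `state`

bump();
state = { 'gc': { 'full': 0 } }; //reset by replacement
bump();
console.log(state.gc.full); //1: the callback wrote into the new object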
@@ -677,3 +748,3 @@ }
-    }, self.options.heartbeatInterval || 60000);
+    }, heartbeatInterval);
     // put the heartbeat interval id in the process context
@@ -680,0 +751,0 @@ process.heartbeat = heartbeat;
package.json
 {
     "author": "ql.io",
-    "contributors": [{
-        "name": "Subbu Allamaraju",
-        "email": "subbu@ebaysf.com"
-    }],
+    "contributors": [
+        {
+            "name": "Roy Zhou",
+            "email": "huzhou@ebay.com"
+        },
+        {
+            "name": "Subbu Allamaraju",
+            "email": "subbu@ebaysf.com"
+        }
+    ],
     "name": "cluster2",
-    "version": "0.4.23",
+    "version": "0.4.24",
     "repository": {
         "type": "git",
         "url": "https://github.com/cubejs/cluster2"
     },
     "engines": {
         "node": ">= 0.8.0"
     },
     "main": "lib/index.js",
     "dependencies": {
         "underscore": "~1.4.4",
         "express": "~2.5.11",
         "ejs": "~0.8.4",
         "npm": "~1.3.0",
         "when": "~2.4.0",
-        "memwatch": "~0.2.2",
-        "usage": "~0.3.8"
+        "usage": "~0.3.8",
+        "bignumber.js": "~1.1.1",
+        "gc-stats": "~0.0.1"
     },
     "devDependencies": {
         "websocket": "~1.0.8",
         "nodeunit": "~0.8.0",
         "request": "~2.21.0",
         "mocha": "~1.11.0",
         "should": "~1.2.2",
         "harbor": "~0.2.0"
     },
     "scripts": {
         "test": "nodeunit test"
     },
     "optionalDependencies": {},
     "publishConfig": {
-        "registry": "https://registry.npmjs.org/"
+        "registry": "http://registry.npmjs.org/"
     }
 }
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package

New author
Supply chain risk: A new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
+ Added bignumber.js@~1.1.1
+ Added gc-stats@~0.0.1
+ Added bignumber.js@1.1.1 (transitive)
+ Added gc-stats@0.0.6 (transitive)
+ Added nan@2.22.0 (transitive)
- Removed memwatch@~0.2.2
- Removed memwatch@0.2.2 (transitive)