Comparing version 0.4.2 to 0.4.3
@@ -31,4 +31,11 @@ var _ = require('underscore'), | ||
function FIRST_REDUCER(memoize, element) { | ||
return { | ||
value : memoize ? memoize.value : element | ||
}; | ||
} | ||
//componentStatus module allows application to #register their component view handlers | ||
//each handler should take a simple JSON object {"component":"", "view":"[JSON|HTML]"} and produce a result | ||
//each handler should take a simple JSON object {'component':', 'view':'[JSON|HTML]'} and produce a result | ||
//which has same keys, except for that view should contain the actual view content, as string to be displayed | ||
@@ -40,3 +47,3 @@ var ComponentStatus = exports.ComponentStatus = function(emitter){ | ||
emitter.on("new-component-status", function(component){ | ||
emitter.on('new-component-status', function(component){ | ||
@@ -54,9 +61,10 @@ //console.log('[cluster2] master component-status:' + JSON.stringify(component)); | ||
.reducer('avg', AVERAGE_REDUCER) | ||
.reducer('array', ARRAY_REDUCER); | ||
.reducer('array', ARRAY_REDUCER) | ||
.reducer('first', FIRST_REDUCER); | ||
}; | ||
//worker | ||
ComponentStatus.prototype.register = function(name, handler, reducer){ | ||
ComponentStatus.prototype.register = function(name, handler, reducer, updater){ | ||
emitter.emit("new-component-status", { | ||
emitter.emit('new-component-status', { | ||
name : name, | ||
@@ -66,3 +74,3 @@ reducer: reducer | ||
emitter.on("get-component-status", function(component, now, options){ | ||
emitter.on('get-component-status', function(component, now, options){ | ||
if(_.isEqual(name, component)){ | ||
@@ -75,2 +83,16 @@ emitter.emit( | ||
if(updater){ | ||
emitter.on('update-component-status', function(component, now, options, value){ | ||
emitter.emit( | ||
util.format('update-component-status-%s-%d-%s', component, now, options.worker ? process.pid : ''), | ||
updater(options.params, value)); | ||
}); | ||
} | ||
var components = this.components = this.components || {}; | ||
components[name] = { | ||
'reducer': reducer, | ||
'count': components[name] ? components[name].count + 1 : 1 | ||
}; | ||
return this; | ||
@@ -127,5 +149,63 @@ }; | ||
ComponentStatus.prototype.setStatus = function(component, options, value){ | ||
options = options || {}; | ||
var params = options.params || [], | ||
worker = options.worker, | ||
done = options.done, | ||
expects = worker ? 1 : this.components[component].count, | ||
reducer = this.reducers[this.components[component].reducer] || DEFAULT_REDUCER, | ||
now = Date.now(), | ||
event = util.format('update-component-status-%s-%d-%s', component, now, worker || ''), | ||
all = [], | ||
collect = function(status){ | ||
all.push(status); | ||
if((expects -= 1) === 0){ | ||
clearTimeout(timeOut); | ||
emitter.removeListener(event, collect); | ||
done(_.reduce(all, reducer, null).value); | ||
} | ||
}, | ||
timeOut = setTimeout(function(){ | ||
emitter.removeListener(event, collect); | ||
var partial = _.reduce(all, reducer, null); | ||
done(partial ? partial.value : null); | ||
}, 3000); | ||
//console.log('[cluster2] getStatus:' + component + ';event:' + event + ';expects:' + expects); | ||
emitter.on(event, collect); | ||
//console.log('[cluster2] expects:' + event); | ||
emitter.emit('update-component-status', component, now, params, value); | ||
return this; | ||
}; | ||
//default single process emitter is the process itself | ||
var emitter = process; | ||
var emitter = { | ||
'handlers': {}, | ||
'emit': function(){ | ||
process.emit.apply(process, arguments); | ||
}, | ||
'on': function(event, handler){ | ||
if(!emitter.handlers[event]){ | ||
emitter.handlers[event] = []; | ||
process.on(event, function(){ | ||
var params = arguments; | ||
_.each(emitter.handlers[event], function(handler){ | ||
handler.apply(null, params); | ||
}); | ||
}); | ||
} | ||
emitter.handlers[event].push(handler); | ||
}, | ||
'removeListener': function(event, handler){ | ||
emitter.handlers[event] = _.without(emitter.handlers[event] || [], handler); | ||
} | ||
}; | ||
//overwrite the emitter in case the process is the master of a cluster | ||
@@ -135,7 +215,7 @@ if(process.cluster && process.cluster.clustered){ | ||
emitter = { | ||
events : [], | ||
handlers : [], | ||
actuals : [], | ||
'handlers': { | ||
emit: function(){ | ||
}, | ||
'emit': function(){ | ||
var args = _.toArray(arguments), | ||
@@ -159,29 +239,28 @@ event = args.shift(); | ||
}, | ||
on: function(event, handler){ | ||
emitter.events.push(event); | ||
emitter.handlers.push(handler); | ||
emitter.actuals.push(function(message){ | ||
//console.log('[cluster2] master handle:' + event + ':' + message.type + ':' + JSON.stringify(message)); | ||
if(_.isEqual(message.type, event)){ | ||
handler.apply(null, message.params); | ||
'on': function(event, handler){ | ||
emitter.handlers[event] = emitter.handlers[event] || []; | ||
emitter.handlers[event].push(handler); | ||
_.each(_.values(workers), function(worker){ | ||
if(!worker.clusterEventHandler){ | ||
worker.clusterEventHandler = function(message){ | ||
_.invoke(emitter.handlers[message.type] || [], 'apply', null, message.params); | ||
}; | ||
worker.on('message', worker.clusterEventHandler); | ||
} | ||
}); | ||
_.each(_.values(workers), function(worker){ | ||
worker.on("message", _.last(emitter.actuals)); | ||
}); | ||
process.on(event, handler); | ||
if(!process.clusterEventHandler){ | ||
process.clusterEventHandler = function(){ | ||
_.invoke(emitter.handlers[message.type] || [], 'apply', null, arguments); | ||
} | ||
} | ||
}, | ||
removeListener: function(event, handler){ | ||
var index = _.indexOf(emitter.handlers, handler); | ||
_.each(_.values(workers), function(worker){ | ||
worker.removeListener("message", emitter.actuals[index]); | ||
}); | ||
'removeListener': function(event, handler){ | ||
emitter.handlers = emitter.handlers.splice(index, 1); | ||
emitter.actuals = emitter.actuals.splice(index, 1); | ||
process.removeListener(event, handler); | ||
emitter.handlers[event] = _.without(emitter.handlers[event] || [], handler); | ||
} | ||
@@ -193,13 +272,17 @@ }; | ||
var worker = workers[pid]; | ||
_.each(emitter.handlers, function(handler, index){ | ||
if(!worker.clusterEventHandler){ | ||
worker.clusterEventHandler = function(message){ | ||
_.invoke(emitter.handlers[message.type] || [], 'apply', null, message.params); | ||
}; | ||
var event = emitter.events[index], | ||
actual = emitter.actuals[index]; | ||
worker.on('message', actual); | ||
}); | ||
worker.on('message', worker.clusterEventHandler); | ||
} | ||
}); | ||
} | ||
if(cluster.isWorker){ | ||
emitter = { | ||
emit: function(){ | ||
'handlers': {}, | ||
'emit': function(){ | ||
@@ -214,13 +297,20 @@ var args = _.toArray(arguments), | ||
}, | ||
on: function(event, handler){ | ||
process.on("message", function(message){ | ||
if(_.isEqual(message.type, event)){ | ||
handler.apply(null, message.params); | ||
} | ||
}); | ||
'on': function(event, handler){ | ||
this.handlers[event] = this.handlers[event] || []; | ||
this.handlers[event].push(handler); | ||
}, | ||
'removeListener': function(event, handler){ | ||
this.handlers[event] = _.without(this.handlers[event] || [], handler); | ||
} | ||
} | ||
}; | ||
process.on('message', function(message){ | ||
_.each(emitter.handlers[message.type] || [], function(h){ | ||
h.apply(null, message.params); | ||
}); | ||
}); | ||
} | ||
exports.componentStatus = new ComponentStatus(emitter); |
119
lib/index.js
@@ -26,3 +26,4 @@ /* | ||
net = require('net'), | ||
events = require('events'); | ||
events = require('events'), | ||
Q = require('q'); | ||
@@ -64,26 +65,29 @@ // Trap all uncaught exception here. | ||
Cluster.prototype.listen = function(createApp, cb) { | ||
var self = this; | ||
var self = this, | ||
options = self.options; | ||
assert.ok(_.isFunction(createApp), 'createApp must be a function'); | ||
if(self.options.cluster) { | ||
if(options.cluster) { | ||
var master = new Process({ | ||
pids: process.cwd() + '/pids', | ||
logs: process.cwd() + '/logs', | ||
port: self.options.port, | ||
host: self.options.host || '0.0.0.0', | ||
monPort: self.options.monPort, | ||
monHost: self.options.monHost || '0.0.0.0', | ||
monPath: self.options.monPath, | ||
ecv: self.options.ecv, | ||
noWorkers: self.options.noWorkers, | ||
timeout: self.options.timeout || 30 * 1000, // idle socket timeout | ||
connThreshold: self.options.connThreshold || 10000, // recycle workers after this many connections | ||
heartbeatInterval: self.options.heartbeatInterval, | ||
port: options.port, | ||
host: options.host || '0.0.0.0', | ||
monPort: options.monPort, | ||
monHost: options.monHost || '0.0.0.0', | ||
monPath: options.monPath, | ||
ecv: options.ecv, | ||
noWorkers: options.noWorkers, | ||
timeout: options.timeout || 30 * 1000, // idle socket timeout | ||
connThreshold: options.connThreshold || 10000, // recycle workers after this many connections | ||
heartbeatInterval: options.heartbeatInterval, | ||
emitter: self | ||
}); | ||
if(self.options.stop) { | ||
if(options.stop) { | ||
master.stop() | ||
} | ||
else if(self.options.shutdown) { | ||
else if(options.shutdown) { | ||
master.shutdown(); | ||
@@ -94,4 +98,4 @@ } | ||
master.listen(app, monApp, function () { | ||
if(self.options.ecv) { | ||
ecv.enable(app, self.options, self, function (data) { | ||
if(options.ecv) { | ||
ecv.enable(app, options, self, function (data) { | ||
return true; | ||
@@ -109,3 +113,3 @@ }); | ||
// Temp Fix to unblock tech talk demo | ||
var ports = _.isArray(self.options.port) ? self.options.port : [self.options.port]; | ||
var ports = _.isArray(options.port) ? options.port : [options.port]; | ||
if (ports.length > 1) { | ||
@@ -115,7 +119,20 @@ console.log('Provide a single port for non-cluster mode. Exiting.'); | ||
} | ||
var host = self.options.host; | ||
createApp.call(null, function (app) { | ||
app.listen(ports[0], host, function () { | ||
if (self.options.ecv) { | ||
ecv.enable(app, self.options, self, function (data) { | ||
createApp.call(null, function (app, monApp) { | ||
//adding monApp to none-cluster mode | ||
var Monitor = require('./monitor.js'), | ||
monitor = new Monitor({ | ||
monitor: options.monitor || monApp, | ||
stats: self.stats, | ||
host: options.monHost, | ||
port: options.monPort, | ||
path: options.monPath | ||
}); | ||
monitor.listen(options.monPort, options.host); | ||
app.listen(ports[0], options.host, function () { | ||
if (options.ecv) { | ||
//bugfix by huzhou@ebay.com, in cluster=false mode, ecv failed because of wrong params, should use array of 'app':app object | ||
ecv.enable([{'app':app}], options, self, function (data) { | ||
return true; | ||
@@ -125,5 +142,13 @@ }); | ||
if (cb) { | ||
cb(app); | ||
cb(app, monitor); | ||
} | ||
}); | ||
//register the master worker itself, as it doesn't go through master process creation | ||
var componentStatus = self.componentStatusResolved = require('./component-status.js').componentStatus; | ||
componentStatus.register('worker', function(){ | ||
return 'm' + process.pid; | ||
}, 'array'); | ||
self.emit('component-status-initialized', componentStatus); | ||
}); | ||
@@ -136,11 +161,14 @@ } | ||
// masters hanging around together | ||
var ports = _.isArray(app) ? _.reduce(app, function(arr, anApp){ | ||
return arr.concat(anApp.port && anApp.app ? | ||
_.isArray(anApp.port) ? anApp.port : [anApp.port] : []); | ||
},[]) | ||
:_.isArray(self.options.port) ? self.options.port : [self.options.port]; | ||
var host = self.options.host; | ||
var ports = _.isArray(app) | ||
? _.reduce(app, | ||
function(ports, anApp){ | ||
return ports.concat(anApp.port && anApp.app | ||
? _.isArray(anApp.port) ? anApp.port : [anApp.port] | ||
: []); | ||
}, | ||
[]) | ||
: _.isArray(options.port) ? options.port : [options.port]; | ||
exitIfBusyPort(host, ports, ports.length - 1, function(){ | ||
cb(_.filter(_.isArray(app) ? app : [{app: app, port: self.options.port}], | ||
exitIfBusyPort(options.host, ports, ports.length - 1, function(){ | ||
cb(_.filter(_.isArray(app) ? app : [{app: app, port: options.port}], | ||
function(app){ | ||
@@ -171,5 +199,26 @@ return app.app && app.port; | ||
} | ||
} | ||
return self; | ||
}; | ||
Cluster.prototype.componentStatus = function(){ | ||
if(!this.componentStatusPromise){ | ||
var componentStatusDeferred = Q.defer(); | ||
this.componentStatusPromise = componentStatusDeferred.promise; | ||
if(!this.componentStatusResolved){ | ||
this.once('component-status-initialized', function(componentStatus){ | ||
componentStatusDeferred.resolve(componentStatus); | ||
}); | ||
} | ||
else{ | ||
componentStatusDeferred.resolve(this.componentStatusResolved); | ||
} | ||
} | ||
return this.componentStatusPromise; | ||
}; | ||
Cluster.prototype.stop = function () { | ||
@@ -180,3 +229,3 @@ var master = new Process({ | ||
master.stop(); | ||
} | ||
}; | ||
@@ -188,2 +237,2 @@ Cluster.prototype.shutdown = function () { | ||
master.shutdown(); | ||
} | ||
}; |
@@ -27,3 +27,5 @@ /* | ||
Q = require('q'), | ||
util = require('util'); | ||
util = require('util'), | ||
getrusage = require('getrusage'), | ||
memwatch = require('memwatch'); | ||
@@ -150,22 +152,23 @@ var debug = process.env['cluster2']; | ||
if(message.type === "heartbeat"){ | ||
if(message.pid != process.pid){ | ||
self._heartbeats.push(message);//must append to the tail | ||
} | ||
self.lastTime = self.lastTime = Date.now(); | ||
self._heartbeatScheduler = self._heartbeatScheduler || setInterval(function () { | ||
var count = self._heartbeats.length; | ||
var aggr = _.reduce(self._heartbeats, function(memoize, heartbeat){ | ||
return { | ||
pid: process.pid,//using master's pid | ||
uptime: (memoize.uptime || 0) + heartbeat.uptime,//sum | ||
totalmem: (memoize.totalmem || 0) + heartbeat.totalmem,//sum | ||
freemem: (memoize.freemem || 0) + heartbeat.freemem,//sum | ||
totalConnections: (memoize.totalConnections || 0) + heartbeat.totalConnections,//sum | ||
pendingConnections: (memoize.pendingConnections || 0) + heartbeat.pendingConnections,//sum | ||
timedoutConnections: (memoize.timedoutConnections || 0) + heartbeat.timedoutConnections//sum | ||
}; | ||
}, {}); | ||
//delete aggregated heartbeats from self | ||
var count = self._heartbeats.length, | ||
threads = {}, | ||
aggr = {}; | ||
_.each(_.range(0, count), function(aggregated){ | ||
self._heartbeats.shift(); | ||
var heartbeat = self._heartbeats.shift(); | ||
_.each(heartbeat, function(val, key){ | ||
aggr[key] = (aggr[key] || 0) + val; | ||
}); | ||
threads[heartbeat.pid] = heartbeat.pid; | ||
}); | ||
@@ -175,11 +178,22 @@ | ||
self.emitter.emit('heartbeat', { | ||
pid: process.pid, | ||
uptime: aggr.uptime / count,//avg | ||
totalmem: aggr.totalmem / count,//avg | ||
freemem: aggr.freemem / count,//avg | ||
totalConnections: aggr.totalConnections,//total | ||
pendingConnections: aggr.pendingConnections,//total | ||
timedoutConnections: aggr.timedoutConnections//total | ||
'pid': process.pid, | ||
'usertime': aggr.usertime / count,//avg | ||
'systime': aggr.systime / count,//avg | ||
'uptime': aggr.uptime / count,//avg | ||
'totalmem': aggr.totalmem / count,//avg | ||
'freemem': aggr.freemem / count,//avg | ||
'totalConnections': aggr.totalConnections,//total | ||
'pendingConnections': aggr.pendingConnections,//total | ||
'timedoutConnections': aggr.timedoutConnections,//total | ||
'fullGCs': aggr.fullGCs,//total | ||
'incrementalGCs': aggr.incrementalGCs,//total | ||
'heapCompactions': aggr.heapCompactions,//total | ||
'totalTransactions': aggr.totalTransactions,//total | ||
'totalDuration': aggr.totalDuration,//total | ||
'threads': _.keys(threads).length, | ||
'interval': Date.now() - self.lastTime | ||
}); | ||
self.lastTime = Date.now(); | ||
}, self.options.heartbeatInterval || 60000); | ||
@@ -313,2 +327,3 @@ } | ||
if(cluster.isMaster) { | ||
this.stats.pid = process.pid; | ||
@@ -327,6 +342,9 @@ this.stats.start = new Date(); | ||
//register the master worker itself | ||
require('./component-status.js').componentStatus.register('worker', function(){ | ||
var componentStatus = self.componentStatusResolved = require('./component-status.js').componentStatus; | ||
componentStatus.register('worker', function(){ | ||
return 'm' + process.pid; | ||
}, 'array'); | ||
self.emitter.emit('component-status-initialized', componentStatus); | ||
// Monitor to serve log files and other stats - typically on an internal port | ||
@@ -416,2 +434,3 @@ var monitor = new Monitor({ | ||
else { | ||
var listening = false, conns = 0, totalConns = 0, timedoutConns = 0, noAppClosed = 0; | ||
@@ -469,2 +488,12 @@ process.on('SIGINT', function() { | ||
_.each(apps, function(app){ | ||
var notifyInitialized = _.once(function(){ | ||
var componentStatus = self.componentStatusResolved = require('./component-status.js').componentStatus; | ||
componentStatus.register('worker', function(){ | ||
return process.pid; | ||
}, 'array'); | ||
self.emitter.emit('component-status-initialized', componentStatus); | ||
}); | ||
app.app.once('listening', function() { | ||
@@ -478,5 +507,3 @@ app.listening = true; | ||
require('./component-status.js').componentStatus.register('worker', function(){ | ||
return process.pid; | ||
}, 'array'); | ||
notifyInitialized(); | ||
}); | ||
@@ -535,12 +562,39 @@ | ||
var memStats = { | ||
'num_full_gc': 0, | ||
'num_inc_gc': 0, | ||
'heap_compactions': 0 | ||
}; | ||
memwatch.on('stats', function(stats){ | ||
_.extend(memStats, stats || {}); | ||
}); | ||
var txStats = { | ||
'count': 0, | ||
'totalDuration': 0 | ||
} | ||
self.emitter.on('rootTransaction', function(tx){ | ||
txStats.count += 1; | ||
txStats.totalDuration += tx.duration; | ||
}); | ||
// Heartbeat - make sure to clear this on 'close' | ||
var heartbeat = setInterval(function () { | ||
var heartbeat = { | ||
pid: process.pid, | ||
uptime: Math.round(process.uptime()), | ||
totalmem: os.totalmem(), | ||
freemem: os.freemem(), | ||
totalConnections: totalConns, | ||
pendingConnections: conns, | ||
timedoutConnections: timedoutConns | ||
'pid': process.pid, | ||
'usertime': getrusage.getcputime(), | ||
'systime': getrusage.getsystemtime(), | ||
'uptime': Math.round(process.uptime()), | ||
'totalmem': os.totalmem(), | ||
'freemem': os.freemem(), | ||
'totalConnections': totalConns, | ||
'pendingConnections': conns, | ||
'timedoutConnections': timedoutConns, | ||
'fullGCs': memStats['num_full_gc'], | ||
'incrementalGCs': memStats['num_inc_gc'], | ||
'heapCompactions': memStats['heap_compactions'], | ||
'totalTransactions': txStats.count, | ||
'totalDuration': txStats.totalDuration | ||
}; | ||
@@ -555,2 +609,9 @@ | ||
process.send(toMaster); | ||
memStats.num_full_gc = 0; | ||
memStats.num_inc_gc = 0; | ||
memStats.heap_compactions = 0; | ||
txStats.count = 0; | ||
txStats.totalDuration = 0; | ||
}, self.options.heartbeatInterval || 60000); | ||
@@ -557,0 +618,0 @@ |
@@ -8,3 +8,3 @@ { | ||
"name": "cluster2", | ||
"version": "0.4.2", | ||
"version": "0.4.3", | ||
"repository": { | ||
@@ -19,15 +19,16 @@ "type": "git", | ||
"dependencies": { | ||
"underscore": "", | ||
"express": "2.5.11", | ||
"ejs": "", | ||
"npm": "", | ||
"q": "" | ||
"underscore": "~1.4.4", | ||
"express": "~2.5.11", | ||
"ejs": "~0.8.4", | ||
"npm": "~1.3.0", | ||
"q": "~0.9.6", | ||
"memwatch": "~0.2.2", | ||
"getrusage": "~0.3.3" | ||
}, | ||
"devDependencies": { | ||
"express": "2.5.11", | ||
"websocket": "", | ||
"nodeunit": "", | ||
"request": "", | ||
"mocha": "", | ||
"should": "" | ||
"websocket": "~1.0.8", | ||
"nodeunit": "~0.8.0", | ||
"request": "~2.21.0", | ||
"mocha": "~1.11.0", | ||
"should": "~1.2.2" | ||
}, | ||
@@ -34,0 +35,0 @@ "scripts": { |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Wildcard dependency
QualityPackage has a dependency with a floating version range. This can cause issues if the dependency publishes a new major version.
Found 4 instances in 1 package
103975
5
25
1732
1
7
+ Addedgetrusage@~0.3.3
+ Addedmemwatch@~0.2.2
+ Addedbindings@1.5.0(transitive)
+ Addedejs@0.8.8(transitive)
+ Addedfile-uri-to-path@1.0.0(transitive)
+ Addedgetrusage@0.3.3(transitive)
+ Addedmemwatch@0.2.2(transitive)
+ Addednpm@1.3.26(transitive)
+ Addedq@0.9.7(transitive)
+ Addedunderscore@1.4.4(transitive)
- Removedansi-styles@4.3.0(transitive)
- Removedasap@2.0.6(transitive)
- Removedasync@3.2.6(transitive)
- Removedbalanced-match@1.0.2(transitive)
- Removedbrace-expansion@1.1.112.0.1(transitive)
- Removedchalk@4.1.2(transitive)
- Removedcolor-convert@2.0.1(transitive)
- Removedcolor-name@1.1.4(transitive)
- Removedconcat-map@0.0.1(transitive)
- Removedejs@3.1.10(transitive)
- Removedfilelist@1.0.4(transitive)
- Removedhas-flag@4.0.0(transitive)
- Removedjake@10.9.2(transitive)
- Removedminimatch@3.1.25.1.6(transitive)
- Removednpm@10.9.1(transitive)
- Removedpop-iterate@1.0.1(transitive)
- Removedq@2.0.3(transitive)
- Removedsupports-color@7.2.0(transitive)
- Removedunderscore@1.13.7(transitive)
- Removedweak-map@1.0.8(transitive)
Updatedejs@~0.8.4
Updatedexpress@~2.5.11
Updatednpm@~1.3.0
Updatedq@~0.9.6
Updatedunderscore@~1.4.4