Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cluster2

Package Overview
Dependencies
Maintainers
6
Versions
70
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cluster2 - npm Package Compare versions

Comparing version 0.4.2 to 0.4.3

pids/worker.43182.pid

182

lib/component-status.js

@@ -31,4 +31,11 @@ var _ = require('underscore'),

function FIRST_REDUCER(memoize, element) {
return {
value : memoize ? memoize.value : element
};
}
//componentStatus module allows application to #register their component view handlers
//each handler should take a simple JSON object {"component":"", "view":"[JSON|HTML]"} and produce a result
//each handler should take a simple JSON object {'component':', 'view':'[JSON|HTML]'} and produce a result
//which has same keys, except for that view should contain the actual view content, as string to be displayed

@@ -40,3 +47,3 @@ var ComponentStatus = exports.ComponentStatus = function(emitter){

emitter.on("new-component-status", function(component){
emitter.on('new-component-status', function(component){

@@ -54,9 +61,10 @@ //console.log('[cluster2] master component-status:' + JSON.stringify(component));

.reducer('avg', AVERAGE_REDUCER)
.reducer('array', ARRAY_REDUCER);
.reducer('array', ARRAY_REDUCER)
.reducer('first', FIRST_REDUCER);
};
//worker
ComponentStatus.prototype.register = function(name, handler, reducer){
ComponentStatus.prototype.register = function(name, handler, reducer, updater){
emitter.emit("new-component-status", {
emitter.emit('new-component-status', {
name : name,

@@ -66,3 +74,3 @@ reducer: reducer

emitter.on("get-component-status", function(component, now, options){
emitter.on('get-component-status', function(component, now, options){
if(_.isEqual(name, component)){

@@ -75,2 +83,16 @@ emitter.emit(

if(updater){
emitter.on('update-component-status', function(component, now, options, value){
emitter.emit(
util.format('update-component-status-%s-%d-%s', component, now, options.worker ? process.pid : ''),
updater(options.params, value));
});
}
var components = this.components = this.components || {};
components[name] = {
'reducer': reducer,
'count': components[name] ? components[name].count + 1 : 1
};
return this;

@@ -127,5 +149,63 @@ };

ComponentStatus.prototype.setStatus = function(component, options, value){
options = options || {};
var params = options.params || [],
worker = options.worker,
done = options.done,
expects = worker ? 1 : this.components[component].count,
reducer = this.reducers[this.components[component].reducer] || DEFAULT_REDUCER,
now = Date.now(),
event = util.format('update-component-status-%s-%d-%s', component, now, worker || ''),
all = [],
collect = function(status){
all.push(status);
if((expects -= 1) === 0){
clearTimeout(timeOut);
emitter.removeListener(event, collect);
done(_.reduce(all, reducer, null).value);
}
},
timeOut = setTimeout(function(){
emitter.removeListener(event, collect);
var partial = _.reduce(all, reducer, null);
done(partial ? partial.value : null);
}, 3000);
//console.log('[cluster2] getStatus:' + component + ';event:' + event + ';expects:' + expects);
emitter.on(event, collect);
//console.log('[cluster2] expects:' + event);
emitter.emit('update-component-status', component, now, params, value);
return this;
};
//default single process emitter is the process itself
var emitter = process;
var emitter = {
'handlers': {},
'emit': function(){
process.emit.apply(process, arguments);
},
'on': function(event, handler){
if(!emitter.handlers[event]){
emitter.handlers[event] = [];
process.on(event, function(){
var params = arguments;
_.each(emitter.handlers[event], function(handler){
handler.apply(null, params);
});
});
}
emitter.handlers[event].push(handler);
},
'removeListener': function(event, handler){
emitter.handlers[event] = _.without(emitter.handlers[event] || [], handler);
}
};
//overwrite the emitter in case the process is the master of a cluster

@@ -135,7 +215,7 @@ if(process.cluster && process.cluster.clustered){

emitter = {
events : [],
handlers : [],
actuals : [],
'handlers': {
emit: function(){
},
'emit': function(){
var args = _.toArray(arguments),

@@ -159,29 +239,28 @@ event = args.shift();

},
on: function(event, handler){
emitter.events.push(event);
emitter.handlers.push(handler);
emitter.actuals.push(function(message){
//console.log('[cluster2] master handle:' + event + ':' + message.type + ':' + JSON.stringify(message));
if(_.isEqual(message.type, event)){
handler.apply(null, message.params);
'on': function(event, handler){
emitter.handlers[event] = emitter.handlers[event] || [];
emitter.handlers[event].push(handler);
_.each(_.values(workers), function(worker){
if(!worker.clusterEventHandler){
worker.clusterEventHandler = function(message){
_.invoke(emitter.handlers[message.type] || [], 'apply', null, message.params);
};
worker.on('message', worker.clusterEventHandler);
}
});
_.each(_.values(workers), function(worker){
worker.on("message", _.last(emitter.actuals));
});
process.on(event, handler);
if(!process.clusterEventHandler){
process.clusterEventHandler = function(){
_.invoke(emitter.handlers[message.type] || [], 'apply', null, arguments);
}
}
},
removeListener: function(event, handler){
var index = _.indexOf(emitter.handlers, handler);
_.each(_.values(workers), function(worker){
worker.removeListener("message", emitter.actuals[index]);
});
'removeListener': function(event, handler){
emitter.handlers = emitter.handlers.splice(index, 1);
emitter.actuals = emitter.actuals.splice(index, 1);
process.removeListener(event, handler);
emitter.handlers[event] = _.without(emitter.handlers[event] || [], handler);
}

@@ -193,13 +272,17 @@ };

var worker = workers[pid];
_.each(emitter.handlers, function(handler, index){
if(!worker.clusterEventHandler){
worker.clusterEventHandler = function(message){
_.invoke(emitter.handlers[message.type] || [], 'apply', null, message.params);
};
var event = emitter.events[index],
actual = emitter.actuals[index];
worker.on('message', actual);
});
worker.on('message', worker.clusterEventHandler);
}
});
}
if(cluster.isWorker){
emitter = {
emit: function(){
'handlers': {},
'emit': function(){

@@ -214,13 +297,20 @@ var args = _.toArray(arguments),

},
on: function(event, handler){
process.on("message", function(message){
if(_.isEqual(message.type, event)){
handler.apply(null, message.params);
}
});
'on': function(event, handler){
this.handlers[event] = this.handlers[event] || [];
this.handlers[event].push(handler);
},
'removeListener': function(event, handler){
this.handlers[event] = _.without(this.handlers[event] || [], handler);
}
}
};
process.on('message', function(message){
_.each(emitter.handlers[message.type] || [], function(h){
h.apply(null, message.params);
});
});
}
exports.componentStatus = new ComponentStatus(emitter);

@@ -26,3 +26,4 @@ /*

net = require('net'),
events = require('events');
events = require('events'),
Q = require('q');

@@ -64,26 +65,29 @@ // Trap all uncaught exception here.

Cluster.prototype.listen = function(createApp, cb) {
var self = this;
var self = this,
options = self.options;
assert.ok(_.isFunction(createApp), 'createApp must be a function');
if(self.options.cluster) {
if(options.cluster) {
var master = new Process({
pids: process.cwd() + '/pids',
logs: process.cwd() + '/logs',
port: self.options.port,
host: self.options.host || '0.0.0.0',
monPort: self.options.monPort,
monHost: self.options.monHost || '0.0.0.0',
monPath: self.options.monPath,
ecv: self.options.ecv,
noWorkers: self.options.noWorkers,
timeout: self.options.timeout || 30 * 1000, // idle socket timeout
connThreshold: self.options.connThreshold || 10000, // recycle workers after this many connections
heartbeatInterval: self.options.heartbeatInterval,
port: options.port,
host: options.host || '0.0.0.0',
monPort: options.monPort,
monHost: options.monHost || '0.0.0.0',
monPath: options.monPath,
ecv: options.ecv,
noWorkers: options.noWorkers,
timeout: options.timeout || 30 * 1000, // idle socket timeout
connThreshold: options.connThreshold || 10000, // recycle workers after this many connections
heartbeatInterval: options.heartbeatInterval,
emitter: self
});
if(self.options.stop) {
if(options.stop) {
master.stop()
}
else if(self.options.shutdown) {
else if(options.shutdown) {
master.shutdown();

@@ -94,4 +98,4 @@ }

master.listen(app, monApp, function () {
if(self.options.ecv) {
ecv.enable(app, self.options, self, function (data) {
if(options.ecv) {
ecv.enable(app, options, self, function (data) {
return true;

@@ -109,3 +113,3 @@ });

// Temp Fix to unblock tech talk demo
var ports = _.isArray(self.options.port) ? self.options.port : [self.options.port];
var ports = _.isArray(options.port) ? options.port : [options.port];
if (ports.length > 1) {

@@ -115,7 +119,20 @@ console.log('Provide a single port for non-cluster mode. Exiting.');

}
var host = self.options.host;
createApp.call(null, function (app) {
app.listen(ports[0], host, function () {
if (self.options.ecv) {
ecv.enable(app, self.options, self, function (data) {
createApp.call(null, function (app, monApp) {
//adding monApp to none-cluster mode
var Monitor = require('./monitor.js'),
monitor = new Monitor({
monitor: options.monitor || monApp,
stats: self.stats,
host: options.monHost,
port: options.monPort,
path: options.monPath
});
monitor.listen(options.monPort, options.host);
app.listen(ports[0], options.host, function () {
if (options.ecv) {
//bugfix by huzhou@ebay.com, in cluster=false mode, ecv failed because of wrong params, should use array of 'app':app object
ecv.enable([{'app':app}], options, self, function (data) {
return true;

@@ -125,5 +142,13 @@ });

if (cb) {
cb(app);
cb(app, monitor);
}
});
//register the master worker itself, as it doesn't go through master process creation
var componentStatus = self.componentStatusResolved = require('./component-status.js').componentStatus;
componentStatus.register('worker', function(){
return 'm' + process.pid;
}, 'array');
self.emit('component-status-initialized', componentStatus);
});

@@ -136,11 +161,14 @@ }

// masters hanging around together
var ports = _.isArray(app) ? _.reduce(app, function(arr, anApp){
return arr.concat(anApp.port && anApp.app ?
_.isArray(anApp.port) ? anApp.port : [anApp.port] : []);
},[])
:_.isArray(self.options.port) ? self.options.port : [self.options.port];
var host = self.options.host;
var ports = _.isArray(app)
? _.reduce(app,
function(ports, anApp){
return ports.concat(anApp.port && anApp.app
? _.isArray(anApp.port) ? anApp.port : [anApp.port]
: []);
},
[])
: _.isArray(options.port) ? options.port : [options.port];
exitIfBusyPort(host, ports, ports.length - 1, function(){
cb(_.filter(_.isArray(app) ? app : [{app: app, port: self.options.port}],
exitIfBusyPort(options.host, ports, ports.length - 1, function(){
cb(_.filter(_.isArray(app) ? app : [{app: app, port: options.port}],
function(app){

@@ -171,5 +199,26 @@ return app.app && app.port;

}
}
return self;
};
Cluster.prototype.componentStatus = function(){
if(!this.componentStatusPromise){
var componentStatusDeferred = Q.defer();
this.componentStatusPromise = componentStatusDeferred.promise;
if(!this.componentStatusResolved){
this.once('component-status-initialized', function(componentStatus){
componentStatusDeferred.resolve(componentStatus);
});
}
else{
componentStatusDeferred.resolve(this.componentStatusResolved);
}
}
return this.componentStatusPromise;
};
Cluster.prototype.stop = function () {

@@ -180,3 +229,3 @@ var master = new Process({

master.stop();
}
};

@@ -188,2 +237,2 @@ Cluster.prototype.shutdown = function () {

master.shutdown();
}
};

@@ -27,3 +27,5 @@ /*

Q = require('q'),
util = require('util');
util = require('util'),
getrusage = require('getrusage'),
memwatch = require('memwatch');

@@ -150,22 +152,23 @@ var debug = process.env['cluster2'];

if(message.type === "heartbeat"){
if(message.pid != process.pid){
self._heartbeats.push(message);//must append to the tail
}
self.lastTime = self.lastTime = Date.now();
self._heartbeatScheduler = self._heartbeatScheduler || setInterval(function () {
var count = self._heartbeats.length;
var aggr = _.reduce(self._heartbeats, function(memoize, heartbeat){
return {
pid: process.pid,//using master's pid
uptime: (memoize.uptime || 0) + heartbeat.uptime,//sum
totalmem: (memoize.totalmem || 0) + heartbeat.totalmem,//sum
freemem: (memoize.freemem || 0) + heartbeat.freemem,//sum
totalConnections: (memoize.totalConnections || 0) + heartbeat.totalConnections,//sum
pendingConnections: (memoize.pendingConnections || 0) + heartbeat.pendingConnections,//sum
timedoutConnections: (memoize.timedoutConnections || 0) + heartbeat.timedoutConnections//sum
};
}, {});
//delete aggregated heartbeats from self
var count = self._heartbeats.length,
threads = {},
aggr = {};
_.each(_.range(0, count), function(aggregated){
self._heartbeats.shift();
var heartbeat = self._heartbeats.shift();
_.each(heartbeat, function(val, key){
aggr[key] = (aggr[key] || 0) + val;
});
threads[heartbeat.pid] = heartbeat.pid;
});

@@ -175,11 +178,22 @@

self.emitter.emit('heartbeat', {
pid: process.pid,
uptime: aggr.uptime / count,//avg
totalmem: aggr.totalmem / count,//avg
freemem: aggr.freemem / count,//avg
totalConnections: aggr.totalConnections,//total
pendingConnections: aggr.pendingConnections,//total
timedoutConnections: aggr.timedoutConnections//total
'pid': process.pid,
'usertime': aggr.usertime / count,//avg
'systime': aggr.systime / count,//avg
'uptime': aggr.uptime / count,//avg
'totalmem': aggr.totalmem / count,//avg
'freemem': aggr.freemem / count,//avg
'totalConnections': aggr.totalConnections,//total
'pendingConnections': aggr.pendingConnections,//total
'timedoutConnections': aggr.timedoutConnections,//total
'fullGCs': aggr.fullGCs,//total
'incrementalGCs': aggr.incrementalGCs,//total
'heapCompactions': aggr.heapCompactions,//total
'totalTransactions': aggr.totalTransactions,//total
'totalDuration': aggr.totalDuration,//total
'threads': _.keys(threads).length,
'interval': Date.now() - self.lastTime
});
self.lastTime = Date.now();
}, self.options.heartbeatInterval || 60000);

@@ -313,2 +327,3 @@ }

if(cluster.isMaster) {
this.stats.pid = process.pid;

@@ -327,6 +342,9 @@ this.stats.start = new Date();

//register the master worker itself
require('./component-status.js').componentStatus.register('worker', function(){
var componentStatus = self.componentStatusResolved = require('./component-status.js').componentStatus;
componentStatus.register('worker', function(){
return 'm' + process.pid;
}, 'array');
self.emitter.emit('component-status-initialized', componentStatus);
// Monitor to serve log files and other stats - typically on an internal port

@@ -416,2 +434,3 @@ var monitor = new Monitor({

else {
var listening = false, conns = 0, totalConns = 0, timedoutConns = 0, noAppClosed = 0;

@@ -469,2 +488,12 @@ process.on('SIGINT', function() {

_.each(apps, function(app){
var notifyInitialized = _.once(function(){
var componentStatus = self.componentStatusResolved = require('./component-status.js').componentStatus;
componentStatus.register('worker', function(){
return process.pid;
}, 'array');
self.emitter.emit('component-status-initialized', componentStatus);
});
app.app.once('listening', function() {

@@ -478,5 +507,3 @@ app.listening = true;

require('./component-status.js').componentStatus.register('worker', function(){
return process.pid;
}, 'array');
notifyInitialized();
});

@@ -535,12 +562,39 @@

var memStats = {
'num_full_gc': 0,
'num_inc_gc': 0,
'heap_compactions': 0
};
memwatch.on('stats', function(stats){
_.extend(memStats, stats || {});
});
var txStats = {
'count': 0,
'totalDuration': 0
}
self.emitter.on('rootTransaction', function(tx){
txStats.count += 1;
txStats.totalDuration += tx.duration;
});
// Heartbeat - make sure to clear this on 'close'
var heartbeat = setInterval(function () {
var heartbeat = {
pid: process.pid,
uptime: Math.round(process.uptime()),
totalmem: os.totalmem(),
freemem: os.freemem(),
totalConnections: totalConns,
pendingConnections: conns,
timedoutConnections: timedoutConns
'pid': process.pid,
'usertime': getrusage.getcputime(),
'systime': getrusage.getsystemtime(),
'uptime': Math.round(process.uptime()),
'totalmem': os.totalmem(),
'freemem': os.freemem(),
'totalConnections': totalConns,
'pendingConnections': conns,
'timedoutConnections': timedoutConns,
'fullGCs': memStats['num_full_gc'],
'incrementalGCs': memStats['num_inc_gc'],
'heapCompactions': memStats['heap_compactions'],
'totalTransactions': txStats.count,
'totalDuration': txStats.totalDuration
};

@@ -555,2 +609,9 @@

process.send(toMaster);
memStats.num_full_gc = 0;
memStats.num_inc_gc = 0;
memStats.heap_compactions = 0;
txStats.count = 0;
txStats.totalDuration = 0;
}, self.options.heartbeatInterval || 60000);

@@ -557,0 +618,0 @@

@@ -8,3 +8,3 @@ {

"name": "cluster2",
"version": "0.4.2",
"version": "0.4.3",
"repository": {

@@ -19,15 +19,16 @@ "type": "git",

"dependencies": {
"underscore": "",
"express": "2.5.11",
"ejs": "",
"npm": "",
"q": ""
"underscore": "~1.4.4",
"express": "~2.5.11",
"ejs": "~0.8.4",
"npm": "~1.3.0",
"q": "~0.9.6",
"memwatch": "~0.2.2",
"getrusage": "~0.3.3"
},
"devDependencies": {
"express": "2.5.11",
"websocket": "",
"nodeunit": "",
"request": "",
"mocha": "",
"should": ""
"websocket": "~1.0.8",
"nodeunit": "~0.8.0",
"request": "~2.21.0",
"mocha": "~1.11.0",
"should": "~1.2.2"
},

@@ -34,0 +35,0 @@ "scripts": {

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc