Comparing version 0.3.8 to 0.3.9
@@ -35,2 +35,12 @@ /* | ||
exports.version = require('../package.json').version; | ||
exports.defaultOptions = { | ||
cluster: true, | ||
port: 3000, | ||
monPort: 3001, | ||
ecv: { | ||
path: '/ecv' | ||
}, | ||
monPath: '/', | ||
noWorkers: os.cpus().length | ||
}; | ||
@@ -41,10 +51,7 @@ var Cluster = module.exports = function Cluster(options) { | ||
this.options = options || {}; | ||
this.options.port = this.options.port || 3000; | ||
this.options.monPort = this.options.monPort || 3001; | ||
this.options.ecv = this.options.ecv || { | ||
path: '/ecv' | ||
} | ||
this.options.monPath = this.options.monPath || '/'; | ||
this.options.noWorkers = this.options.noWorkers || os.cpus().length; | ||
this.options = {}; | ||
_.extend(this.options, exports.defaultOptions); | ||
_.extend(this.options, options); | ||
assert.notEqual(this.options.port, this.options.monPort, "monitor port & application port cannot use the same!"); | ||
} | ||
@@ -61,3 +68,3 @@ | ||
if(!self.options.hasOwnProperty('cluster') || self.options.cluster === true) { | ||
if(self.options.cluster) { | ||
var master = new Process({ | ||
@@ -64,0 +71,0 @@ pids: process.cwd() + '/pids', |
@@ -25,3 +25,5 @@ /* | ||
os = require('os'), | ||
fs = require('fs'); | ||
fs = require('fs'), | ||
Q = require('q'), | ||
util = require('util'); | ||
@@ -31,3 +33,3 @@ var debug = process.env['cluster2']; | ||
if(debug) { | ||
console.log.apply(null, arguments); | ||
console.log.apply(null, (arguments || []).join('')); | ||
} | ||
@@ -49,4 +51,6 @@ } | ||
this._heartbeats = []; | ||
this.killall = function(signal) { | ||
log('killall called with signal ' + signal); | ||
log('killall called with signal ', signal); | ||
var that = this, fullname; | ||
@@ -67,3 +71,3 @@ fs.readdir(that.options.pids, function(err, paths) { | ||
if(count === 1 && mf) { | ||
log('Sending ' + signal + ' to the master'); | ||
log('Sending ', signal, ' to the master'); | ||
that.kill(that.options.pids + '/' + mf, signal); | ||
@@ -78,7 +82,7 @@ } | ||
this.kill = function(fullname, signal, f) { | ||
log('sending ' + signal + ' to ' + fullname); | ||
log('sending ', signal, ' to ', fullname); | ||
fs.readFile(fullname, 'ascii', function(err, data) { | ||
var pid = parseInt(data); | ||
if(pid === process.pid) { | ||
log('Unlinking ' + fullname); | ||
log('Unlinking ', fullname); | ||
fs.unlinkSync(fullname); | ||
@@ -96,3 +100,3 @@ process.exit(0); | ||
fs.unlink(fullname, function(err) { | ||
log('Unlinking ' + fullname); | ||
log('Unlinking ', fullname); | ||
if(err) { | ||
@@ -122,2 +126,3 @@ console.error('Unable to delete ' + fullname); | ||
self.killall('SIGKILL'); | ||
clearInterval(self._heartbeatScheduler); | ||
} | ||
@@ -129,4 +134,5 @@ }); | ||
var self = this; | ||
fs.writeFileSync(this.options.pids + '/worker.' + worker.pid + '.pid', worker.pid); | ||
fs.writeFileSync(util.format('%s/worker.%d.pid', this.options.pids, worker.pid), worker.pid); | ||
self.emitter.emit('forked', worker.pid); | ||
@@ -136,16 +142,81 @@ | ||
worker.on('message', function (message) { | ||
if(message.type === 'counter') { | ||
var name = message.name; | ||
if(!self.stats.workers[message.pid]) { | ||
self.stats.workers[message.pid] = {}; | ||
} | ||
var pidStats = self.stats.workers[message.pid]; | ||
if(!pidStats[name]) { | ||
pidStats[name] = 0 | ||
} | ||
pidStats[name]++; | ||
if(message.type === 'counter') { | ||
var name = message.name; | ||
if(!self.stats.workers[message.pid]) { | ||
self.stats.workers[message.pid] = {}; | ||
} | ||
var pidStats = self.stats.workers[message.pid]; | ||
if(!pidStats[name]) { | ||
pidStats[name] = 0 | ||
} | ||
pidStats[name]++; | ||
self.emitter.emit("listening", message.pid); | ||
} | ||
); | ||
if(message.type === "heartbeat"){ | ||
if(message.pid != process.pid){ | ||
self._heartbeats.push(message);//must append to the tail | ||
} | ||
self._heartbeatScheduler = self._heartbeatScheduler || setInterval(function () { | ||
var count = self._heartbeats.length; | ||
var aggr = _.reduce(self._heartbeats, function(memoize, heartbeat){ | ||
return { | ||
pid: process.pid,//using master's pid | ||
uptime: (memoize.uptime || 0) + heartbeat.uptime,//sum | ||
freemem: (memoize.freemem || 0) + heartbeat.freemem,//sum | ||
totalConnections: (memoize.totalConnections || 0) + heartbeat.totalConnections,//sum | ||
pendingConnections: (memoize.pendingConnections || 0) + heartbeat.pendingConnections,//sum | ||
timedoutConnections: (memoize.timedoutConnections || 0) + heartbeat.timedoutConnections//sum | ||
}; | ||
}, {}); | ||
//delete aggregated heartbeats from self | ||
_.each(_.range(0, count), function(aggregated){ | ||
self._heartbeats.shift(); | ||
}); | ||
//emit the aggregated heartbeat message | ||
self.emitter.emit('heartbeat', { | ||
pid: process.pid, | ||
uptime: aggr.uptime / count,//avg | ||
freemem: aggr.freemem / count,//avg | ||
totalConnections: aggr.totalConnections,//total | ||
pendingConnections: aggr.pendingConnections,//total | ||
timedoutConnections: aggr.timedoutConnections//total | ||
}); | ||
}, 60000); | ||
} | ||
else if(message.type === "read-config"){ | ||
var sameOrigin = function(origin, message){ | ||
return _.isEqual(_.pick(origin, "domain", "target", "project", "config", "version"), | ||
_.pick(message, "domain", "target", "project", "config", "version")); | ||
}, | ||
origin = message, | ||
deferred = Q.defer(); | ||
self.emitter.on("config-read", function(message){ | ||
if(sameOrigin(origin, message)){ | ||
deferred.resolve(message); | ||
} | ||
}); | ||
var timeOut = setTimeout(function(){ | ||
deferred.reject(new Error("timeout")); | ||
}, 10000);//timeout after 10 seconds | ||
//let either raptor-config or node-config module to handle the "read-config" message | ||
//simple protocol is to wait for "config-read" response back within 10 secs; | ||
self.emitter.emit("read-config", message); | ||
deferred.promise | ||
.then(function(message){ | ||
clearTimeout(timeOut); | ||
self.notifyWorkers(message); | ||
}) | ||
.fail(function(error){ | ||
message.error = error; | ||
self.notifyWorkers(message); | ||
}) | ||
.done(); | ||
} | ||
}); | ||
this.stats.noWorkers++; | ||
@@ -201,4 +272,4 @@ | ||
fs.writeFileSync(self.options.pids + '/master.' + self.stats.pid + '.pid', self.stats.pid); | ||
console.log('Master ' + process.pid + ' started'); | ||
fs.writeFileSync(util.format('%s/master.%d.pdf', self.options.pids, self.stats.pid), self.stats.pid); | ||
log('Master ', process.pid, ' started'); | ||
@@ -211,4 +282,4 @@ // Fork workers | ||
var deathWatcher = function (worker, code, signal) { | ||
if(code === 0) { | ||
var deathWatcher = function (worker, code, signal) { | ||
if(code === 0) { | ||
self.stats.noWorkers--; | ||
@@ -220,4 +291,5 @@ return; | ||
self.stats.noWorkers--; | ||
var worker = self.createWorker(); | ||
self.workers[worker.pid + ''] = worker; | ||
//bugfix by huzhou@ebay.com, worker & replacement name collision | ||
var replacement = self.createWorker(); | ||
self.workers[replacement.pid + ''] = replacement; | ||
delete self.workers[worker.pid + '']; | ||
@@ -234,3 +306,3 @@ delete self.stats.workers[worker.pid]; | ||
process.on('SIGTERM', function() { | ||
log(process.pid + ' got SIGTERM'); | ||
log(process.pid, ' got SIGTERM'); | ||
self.emitter.emit('SIGTERM', { | ||
@@ -240,5 +312,5 @@ pid: process.pid, | ||
}); | ||
var internal = setInterval(function() { | ||
var interval = setInterval(function() { | ||
if(self.stats.noWorkers === 0) { | ||
clearInterval(internal); | ||
clearInterval(interval); | ||
process.exit(0); | ||
@@ -321,3 +393,8 @@ } | ||
app.app.on('listening', function() { | ||
apps.listening = true; | ||
app.listening = true; | ||
process.send({ | ||
type:"counter", | ||
name:process.pid, | ||
pid:process.pid | ||
}); | ||
}); | ||
@@ -330,3 +407,3 @@ | ||
app.app.listen(port, host, function() { | ||
console.log('Worker ' + process.pid + ' listening on ' + port); | ||
log('Worker ', process.pid, ' listening on ', port); | ||
if(self.options.ecv) { | ||
@@ -374,3 +451,3 @@ ecv.enable(apps, self.options, self.emitter, function(data) { | ||
var heartbeat = setInterval(function () { | ||
self.emitter.emit('heartbeat', { | ||
var heartbeat = { | ||
pid: process.pid, | ||
@@ -382,3 +459,10 @@ uptime: Math.round(process.uptime()), | ||
timedoutConnections: timedoutConns | ||
}); | ||
}; | ||
self.emitter.emit('heartbeat', heartbeat); | ||
var toMaster = { | ||
type:"heartbeat" | ||
}; | ||
_.extend(toMaster, heartbeat); | ||
process.send(toMaster); | ||
}, 60000); | ||
@@ -398,3 +482,3 @@ | ||
process.on('exit', function () { | ||
log(process.pid + ' is about to exit.'); | ||
log(process.pid, ' is about to exit.'); | ||
}); | ||
@@ -401,0 +485,0 @@ }; |
@@ -8,3 +8,3 @@ { | ||
"name": "cluster2", | ||
"version": "0.3.8", | ||
"version": "0.3.9", | ||
"repository": { | ||
@@ -22,3 +22,4 @@ "type": "git", | ||
"ejs": "", | ||
"npm": "" | ||
"npm": "", | ||
"q": "" | ||
}, | ||
@@ -25,0 +26,0 @@ "devDependencies": { |
@@ -28,2 +28,3 @@ ## What is cluster2 | ||
npm install cluster2 | ||
@@ -30,0 +31,0 @@ ### Start a TCP Server |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Wildcard dependency
QualityPackage has a dependency with a floating version range. This can cause issues if the dependency publishes a new major version.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
84888
23
1297
245
5
5
3
+ Addedq@
+ Addedasap@2.0.6(transitive)
+ Addedpop-iterate@1.0.1(transitive)
+ Addedq@2.0.3(transitive)
+ Addedweak-map@1.0.8(transitive)