cluster2
This is a completely overhaul, not expected to be backward compatible, but the features should cover the most popular while some changes are on their way:
simplification
You'll see that we've simplified the api a great deal, no more cluster class, instance to worry, a single #listen method to take all dancing parts.
And those configurable pieces mostly have reasonable defaults, and could be easily set from command line arguments. For example, --port=8080
, --cache.enable
etc.
Also we've adopted Promise A+ (when.js). style to replace the callbacks, we like it for the fewer level of nested code, a lot.
You'll also find some redundant features like: multiple app/port support, ecv on workers, none cluster mode, all removed to keep code compact.
var listen = require('cluster2').listen;
listen({
'noWorkers': 1,
'createServer': require('http').createServer,
'app': app,
'port': 9090,
'configureApp': function(app){
return app;
},
'warmUp': function(app){
return app;
},
'debug': {
'webPort': 9092,
'saveLiveEdit': true
},
'ecv': {
'mode': 'control',
'root': '/ecv'
},
'cache': {
'enable': true,
'mode': 'standalone'
},
'gc': {
'monitor': true,
'idle-noitification': false,
'explicit': false
},
'monCreateServer': require('http').createServer,
'monConfigureApp': function(monApp){
return monApp;
},
'monApp': monApp,
'monPort': 9091,
'maxAge': 3600,
'heartbeatInterval': 5000
})
.then(function(resolved){
})
.otherwise(function(error){
});
application flow
For this cluster2 to work perfect, you might need to accept some of the assumption we made of your application flow. It exists to make your life easier, so as for our middleware registration to work as expected.
The flow is as the following:
- master starts
listen
- master configures
monApp
with given monConfigureApp
- master starts
caching
service if enabled - master creats server using
monCreateServer
and takes in configured monApp
- master starts server on the
monPort
and wait for listening
event - master starts forking workers
- worker starts
listen
- worker configures
app
with given configureApp
- worker creates server using
createServer
and takes in configured app
- worker starts server on
port
and wait for listening
event - worker receives
listening
event and starts warmup
- worker waits for
warmup
to complete and notify master that it's ready to serve traffic - worker resolves the
promise
returned by listen
- master receives notifications from all workers then mark up
ecv
- master then resolves the
promise
returned by listen
A few key points:
- The abstract pattern is the same for master & worker (different in what's done in each step): listen -> configure app -> create server -> server listen -> warmup -> resolve promise
- Caching service starts early, so that you could start using cache whether in master or worker, after configure app
- WarmUp is added as an explicit step to allow application to be performant when the traffic is on.
- Configure app, and warmup could return value (app) or promise which resolves to the app.
A clear flow as above allows users to inject their middleware, routes, warm up their application in a deterministic manner.
And we could leverage this, so that we could safely register middleware like tps collection in front of users'. This makes testing much easier too,
as the promise won't be resolved till the server actually starts, no more timed waiting, event emitting etc. You're good to request anything by then.
emitter
Cluster used to be an emitter itself, which isn't very much helpful, and forced event register/emit to be delayed till the cluster instance is created.
Even if it's created, accessing the instance from different modules require the instance to be passed down, or global, neither looks appealing.
The new cluster-emitter is designed to work with cluster not cluster2 instance at all (in fact, we eliminated the cluster2 instance as you see the api above)
The emitter also makes communications between worker & master (or reverse) as simple as a normal EventEmitter.
var emitter = require('cluster2/emitter');
emitter.on('event', function callback(){
});
emitter.once('event', function callbackOnce(){
});
emitter.removeListener('event', callback);
emitter.removeListener('event', callbackOnce);
emitter.removeAllListeners('event');
emitter.emit('event', 'arg0', 'arg1');
emitter.emitTo(['self'].concat(_.map(cluster.workers, function(w){return w.process.pid;})), ['event', 'arg0', 'arg1']);
emitter.emitTo(['self', 'master'], ['event', 'arg0', 'arg1']);
emitter.to(['master']).emit('event', 'arg0', 'arg1');
ecv
ECV is a preserved feature, but we've simplified that too. Most of the use cases we've seen doesn't really need an ECV for each worker process, in fact
that could be very confusing. To let tools view the cluster as an entirety, ECV is to run only in master runtime, it still supports the 'monitor' vs. 'control' mode.
var enable = require('cluster2/ecv').enable;
enable(app);
var listen = require('cluster2').listen;
listen({
'noWorkers': 1,
'createServer': require('http').createServer,
'app': app,
'port': 9090,
'monPort': 9091,
'debug': {
'webPort': 9092,
'saveLiveEdit': true
},
'ecv': {
'mode': 'control',
'root': '/ecv',
'markUp': '/ecv/markUp',
'markDown': '/ecv/markDown'
},
'heartbeatInterval': 5000
});
listen({
'noWorkers': 1,
'createServer': require('http').createServer,
'app': app,
'port': 9090,
'monPort': 9091,
'debug': {
'webPort': 9092,
'saveLiveEdit': true
},
'ecv': {
'mode': 'monitor',
'root': '/ecv',
'monitor': '/myapplication/route1',
'validator': function(err, response, body){
return true;
}
},
'heartbeatInterval': 5000
});
debug
Ever imagined debugging to be simpler? Here's the good news, we've carefully designed the debugging process from the ground up of the new cluster.
With integration with ECV, worker lifecycle management, node-inspector, and bootstrap + websocket debug app (middleware to be exact). You're now
able to debug any running worker a few clicks away, same applies for a newly forked one.
http://localhost:9091/debug
(change host, port to your configured values) debug
route is what we added as a middleware to the monitor app given. It presents an insight of the running workers, their health; in addition, the cluster cache status. You could hover on a worker pid to request a node-inspector based debug, the control flow is described at __dirname/lib/public/images/live-debugging.png
.
The experience is designed to be the same across different environments, whether dev, qa, or even production, the same debugging flow and mechanism would make diagnostics much more effective.
deps
This is a preserved feature of cluster2, it simply list the npm ls result and give it under http://localhost:9091/deps
route, which looks like the following.
{
"name": "cluster2",
"version": "0.5.0",
"dependencies": {
"underscore": {
"version": "1.4.4",
"from": "underscore@~1.4.4"
},
"usage": {
"version": "0.3.8",
"from": "usage@~0.3.8",
"dependencies": {
"bindings": {
"version": "1.1.1",
"from": "bindings@1.x.x"
}
}
},
"when": {
"version": "2.3.0",
"from": "when@~2.3.0"
},
"graceful-fs": {
"version": "2.0.1",
"from": "graceful-fs@~2.0.0"
},
"gc-stats": {
"version": "0.0.1",
"from": "gc-stats@~0.0.1",
"resolved": "https://registry.npmjs.org/gc-stats/-/gc-stats-0.0.1.tgz"
},
"bignumber.js": {
"version": "1.1.1",
"from": "bignumber.js@~1.1.1"
}
}
}
robustness
This is sth we learned given the real experience of a node.js application, workers do get slower, whether that's memory leak, or GC becomes worse, it's easier to prepare
than to avoid. So as a step forward from the previous 'death watch', we're now proactively collecting performance statistics and to decide it a worker could be ended
before it gets slow. You could see the simple heurstic we put at __dirname/lib/utils.js
# assertOld
function. You can always overwrite this based on your application's characteristics, but this gives a good starting point based on heartbeat collected stats.
exports.assertOld = function assertOld(maxAge){
maxAge = maxAge || 3600 * 24 * 3;
return function(heartbeat){
return heartbeat.uptime >= maxAge;
};
};
exports.assertBadGC = function assertBadGC(){
var peaks = {};
return function(heartbeat){
var pid = heartbeat.pid,
uptime = heartbeat.uptime,
currTPS = heartbeat.tps || (heartbeat.transactions * 1000 / heartbeat.cycle);
if(currTPS <= 2){
return false;
}
var peak = peaks[pid] = peaks[pid] || {
'tps': currTPS,
'cpu': heartbeat.cpu,
'memory': heartbeat.memory,
'gc': {
'incremental': heartbeat.gc.incremental,
'full': heartbeat.gc.full
}
};
if(currTPS >= peak.tps){
peak.tps = Math.max(heartbeat.tps, peak.tps);
peak.cpu = Math.max(heartbeat.cpu, peak.cpu);
peak.memory = Math.max(heartbeat.memory, peak.memory);
peak.gc.incremental = Math.max(heartbeat.gc.incremental, peak.gc.incremental);
peak.gc.full = Math.max(heartbeat.gc.full, peak.gc.full);
}
else if(currTPS < peak.tps * 0.9
&& heartbeat.cpu > peak.cpu
&& heartbeat.memory > peak.memory
&& heartbeat.gc.incremental > peak.gc.incremental
&& heartbeat.gc.full >= peak.gc.full){
return true;
}
return false;
}
};
{
'shouldKill': options.shouldKill || (function(){
var assertions = [assertOld(_this.maxAge), assertBadGC()];
return function(heartbeat){
return _.some(assertions, function(a){
return a(heartbeat);
});
};
})()
}
Apart from the above mentioned proactive collection, we noticed another subtle issue in practice. When a worker is dead, its load will be distributed to the rest of alives certainly, that adds some stress to the alives, but when more than one worker died at the same time, the stress could become problem.
Therefore, to prevent such from happening when worker is marked to be replaced, we made it a FIFO, further explained in __dirname/lib/utils
# deathQueue
function. Its purpose is to guarantee that no more than one worker could commit suicide and be replaced at the same time.
exports.deathQueue = (function(){
var tillPrevDeath = null,
queued = [];
return function deathQueue(pid, emitter, success, options){
options = options || {};
assert.ok(pid);
assert.ok(emitter);
assert.ok(success);
var wait = options.timeout || 60000,
death = util.format('worker-%d-died', pid),
logger = options.logger || {
'debug' : function(){
console.log.apply(console, arguments);
}
};
if(!_.contains(queued, pid)){
queued.push(pid);
var tillDeath = when.defer(),
afterDeath = null,
die = function(){
var successor = success();
emitter.once(util.format('worker-%d-warmup', successor.process.pid), function(){
logger.debug('[deathQueue] successor:%d of %d warmup', successor.process.pid, pid);
emitter.to(['master', pid]).emit('disconnect', pid);
emitter.once(death, function(){
logger.debug('[deathQueue] %d died', pid);
tillDeath.resolve(pid);
if(tillPrevDeath === afterDeath){
logger.debug('[deathQueue] death queue cleaned up');
tillPrevDeath = null;
}
});
setTimeout(function(){
if(!exports.safeKill(pid, 'SIGTERM', logger)){
logger.debug('[deathQueue] worker:%d did not report death by:%d, kill by SIGTERM', pid, wait);
}
else{
logger.debug('[deathQueue] worker:%d probably ran away, emit:%s on behalf', death);
emitter.to(['master']).emit(death);
}
}, wait);
});
};
if(!tillPrevDeath){
afterDeath = tillPrevDeath = tillDeath.promise;
die();
}
else{
afterDeath = tillPrevDeath = tillPrevDeath.ensure(die);
}
}
};
})();
Oh, one more thing, much as we hope that all workers will behave well, let us know when it's going to give up, in reality, they might not.
For an additional level of protection, we added a simple nanny
monitor to our master, which simply collects each workers' last heartbeat
event and check if any possible runaway happened.
Once detected, it will be treated the same as a suicide event, using the above deathQueue
. This will ensure you won't have a cluster running fewer and fewer workers.
exports.nanny = function nanny(puppets, emitter, success, options){
assert.ok(puppets);
assert.ok(emitter);
assert.ok(success);
options = options || {};
var tolerance = options.tolerance,
now = Date.now();
_.each(puppets, function(p){
if(now - p.lastHeartbeat > tolerance){
exports.deathQueue(p.pid, emitter, success, options);
}
});
};
caching
This is as exciting as debugging, it allows workers to share computation results, watch over changes, in a fast and reliable manner.
We tried work delegation to master once, and found it error-prone and difficult to code against, caching makes things so much simpler, using domain socket, so much faster.
The atomic getOrLoad syntax makes sharing efficient, running cache manager as another worker and persistence support make it disaster recoverable.
It's like having a memcached process, only this is node, and you can debug it too.
var cache = require('cluster2/cache').use('cache-name', {
'persist': true,
'expire': 60000
});
var cache;
cache.keys({
'wait': 100
})
.then(function(keys){
});
listen({
'noWorkers': 1,
'createServer': require('http').createServer,
'app': app,
'port': 9090,
'monPort': 9091,
'debug': {
'webPort': 9092,
'saveLiveEdit': true
},
'ecv': {
'mode': 'control',
'root': '/ecv'
},
'cache': {
'enable': true,
'mode': 'standalone'
},
'heartbeatInterval': 5000
})
Note that, we allow you to use caching w/o cluster2, if you want to enable caching from none cluster2 runtime, the feature could be enabled via:
require('cluster2/cache').enable({
'enable': true
});
get
- with the loader, if concurrent
get
happens across the workers in a cluster, only one will be allowed to load while the rest will be in fact watch
till that one finishes loading. - this will reduce the stress upon the backend services which loads exact same data nicely
var cache;
cache.get('cache-key-1',
function(){
return 'cache-value-loaded-1';
},
{
'wait': 100
})
.then(function(value){
})
.otherwise(function(error){
});
var cache;
cache.set('cache-key-1',
'cache-value-loaded-1',
{
'leaveIfNotNull': false,
'wait': 100
})
.then(function(happens){
})
.otherwise(function(error){
});
var cache;
cache.del('cache-key-1',
{
'wait': 100
})
.then(function(value){
});
var cache;
cache.watch('cache-key-1',
function watching(value, key){
});
var cache;
cache.unwatch('cache-key-1', watching);
status
This is a helpful piece evolved from the current cluster2, which is to allow applications to easily register status of any interest.
It allows each worker to register its own state, master would automatically aggregate all states from active workers.
It works nicely with our monitor capability (via debug middleware)
require('cluster2/status')
.register('status-name',
function(){
return 'view';
},
function(value){
});
require('cluster2/status')
.statuses();
require('cluster2/status')
.getStatus('status-name')
.then(function(status){
})
.otherwise(function(error){
});
require('cluster2/status')
.setStatus('status-name',
'value')
.then(function(set){
})
.otherwise(function(error){
});