clusterhub
Advanced tools
Comparing version 0.1.2 to 0.2.0
566
lib/index.js
@@ -1,559 +0,13 @@ | ||
var cluster = require('cluster') | ||
, EventVat = require('eventvat') | ||
var cluster = require('cluster') | ||
, Hub = require('./hub') | ||
; | ||
// command constants | ||
var EVENT = 0 | ||
, ON = 1 | ||
, OFF = 2 | ||
, OFFALL = 3 | ||
, READY = 4 | ||
var isMaster = cluster.isMaster; | ||
var isWorker = cluster.isWorker; | ||
/** | ||
* Shortcut to check if a property exists in an object. | ||
* @param (Object) obj | ||
* @param (string) key | ||
* @return (boolean) | ||
* Require fork.js to overwrite cluster.fork() | ||
*/ | ||
function has(obj, key) { | ||
return Object.hasOwnProperty.call(obj, key); | ||
} | ||
require('./fork'); | ||
/** | ||
* Keep track of hubs and workers | ||
*/ | ||
var hubs = {}; | ||
var workers = []; | ||
var queue = []; | ||
function queuemsg(fn, msg) { | ||
if (isReady()) { | ||
fn(msg); | ||
} else { | ||
queue.push({ fn: fn, msg: msg }); | ||
} | ||
} | ||
/** | ||
* Returns true if all workers are online and ready | ||
*/ | ||
function isReady() { | ||
var online = workers.filter(function(obj) { return obj.ready; }); | ||
return online.length === workers.length; | ||
} | ||
/** | ||
* When all workers are online, this tells all hubs in the current process | ||
*/ | ||
function ready() { | ||
var key, hub, listener; | ||
for (key in hubs) { | ||
if (has(hubs, key)) { | ||
hub = hubs[key]; | ||
if (hub.isready) continue; | ||
hub.isready = true; | ||
while (listener = hub._onready.shift()) listener(); | ||
} | ||
} | ||
} | ||
/** | ||
* Filter for only online workers and calls fn on each. | ||
* @param (Function) fn | ||
*/ | ||
function forWorkers(fn) { | ||
workers = workers.filter(function(child) { | ||
return !!child.worker._channel; | ||
}); | ||
workers.forEach(fn); | ||
} | ||
/** | ||
* Broadcasts an event to all workers | ||
* @param (string) id Hub id | ||
* @param (string) event | ||
* @param (Object) args | ||
*/ | ||
function broadcast(id, event, args, origin) { | ||
forWorkers(function(child) { | ||
if (origin && child.worker === origin) return; | ||
if (!has(child.events, event)) return; | ||
child.worker.send({ | ||
dir : __dirname | ||
, hub : id | ||
, event : event | ||
, args : args | ||
}); | ||
}); | ||
} | ||
/** | ||
* Emit events for a hub. | ||
* @param (string) id Hub id | ||
* @param (string) event | ||
* @param (Object) args | ||
*/ | ||
function emit(id, event, args) { | ||
var hub = hubs[id]; | ||
// check if there are listeners for this event | ||
if (!has(hub._listeners, event)) return; | ||
hub._listeners[event].forEach(function(listener) { | ||
listener.apply({ local: false, remote: true }, args); | ||
}); | ||
} | ||
/** | ||
* Listen for events from master | ||
*/ | ||
process.on('message', function(msg) { | ||
if (msg.cmd === READY) { | ||
return ready(); | ||
} | ||
// check if hub exists | ||
if (msg.dir !== __dirname) return; | ||
if (msg.hub === undefined || !has(hubs, msg.hub)) return; | ||
var hub = hubs[msg.hub]; | ||
if (msg.event) { | ||
emit(msg.hub, msg.event, msg.args); | ||
} else { | ||
// it can be a response to another command too | ||
hub._cb[msg.key].apply(null, msg.args); | ||
delete hub._cb[msg.key]; | ||
} | ||
}); | ||
/** | ||
* Forks a worker and keeps a reference to it. Listens for any emitted | ||
* events so it can distribute them amongst any worker listeners. | ||
* @return (child) | ||
*/ | ||
var origFork = cluster.fork; | ||
cluster.fork = function() { | ||
var worker = origFork(); | ||
var events = {}; | ||
var obj = { worker: worker, events: events, ready: false }; | ||
workers.push(obj); | ||
function onmessage(msg) { | ||
if (msg.cmd === 'online') { | ||
worker.emit('online'); | ||
obj.ready = true; | ||
for (var key in hubs) { | ||
if (has(hubs, key)) hubs[key].emit('online', msg._workerId); | ||
} | ||
// check if all workers are online and ready | ||
if (!isReady()) return; | ||
// process any messages that were buffered while the hub | ||
// was not ready | ||
while (msg = queue.shift()) msg.fn(msg.msg); | ||
// tell all workers hub is ready | ||
forWorkers(function(child) { | ||
child.worker.send({ cmd: READY }); | ||
}); | ||
ready(); | ||
return; | ||
} | ||
if (msg.hub === undefined || msg.cmd === undefined) return; | ||
if (msg.dir !== __dirname) return; | ||
if (!has(hubs, msg.hub)) { | ||
new Hub(msg.hub); | ||
} | ||
switch (msg.cmd) { | ||
// if this is an emitted event, distribute it amongst all workers | ||
// who are listening for the event. Except the one who sent it | ||
case EVENT: | ||
queuemsg(onevent, msg); | ||
break; | ||
// if it's on/off, add/remove counters to know if this worker should | ||
// get notified of any events or not | ||
case ON: | ||
onon(msg); | ||
break; | ||
case OFF: | ||
onoff(msg); | ||
break; | ||
case OFFALL: | ||
onoffall(msg); | ||
break; | ||
// can be a EventVat command | ||
// in that case, execute it on the EventVat instance for this hub | ||
default: | ||
queuemsg(oncmd, msg); | ||
} | ||
} | ||
function onevent(msg) { | ||
broadcast(msg.hub, msg.event, msg.args, worker); | ||
emit(msg.hub, msg.event, msg.args); | ||
} | ||
function onon(msg) { | ||
if (has(events, msg.event)) { | ||
events[msg.event]++; | ||
} else { | ||
events[msg.event] = 1; | ||
} | ||
} | ||
function onoff(msg) { | ||
if (has(events, msg.event) && --events[msg.event] === 0) { | ||
delete events[msg.event]; | ||
} | ||
} | ||
function onoffall(msg) { | ||
if (msg.event) { | ||
if (has(events, msg.event)) { | ||
delete events[msg.event]; | ||
} | ||
} else { | ||
obj.events = events = {}; | ||
} | ||
} | ||
function oncmd(msg) { | ||
var db = hubs[msg.hub]._db; | ||
var result = db[msg.cmd].apply(db, msg.args); | ||
// if key is given, then a callback is waiting for the result | ||
if (msg.key) { | ||
worker.send({ | ||
dir : __dirname | ||
, hub : msg.hub | ||
, key : msg.key | ||
, args : [result] | ||
}); | ||
} | ||
} | ||
worker.on('message', onmessage); | ||
return worker; | ||
}; | ||
/** | ||
* Remove workers on death | ||
*/ | ||
cluster.on('death', function(worker) { | ||
workers.forEach(function(obj, i) { | ||
if (obj.worker === worker) { | ||
workers.splice(i, 1); | ||
return false; | ||
} | ||
}); | ||
}); | ||
/** | ||
* @constructor | ||
*/ | ||
function Hub(id) { | ||
this._id = id || ''; | ||
if (has(hubs, this._id)) return hubs[this._id]; | ||
hubs[this._id] = this; | ||
this._listeners = {}; | ||
this.isready = false; | ||
this._onready = []; | ||
if (isMaster) { | ||
this._db = new EventVat(); | ||
var self = this; | ||
self._db.onAny(function() { | ||
var args = Array.prototype.slice.call(arguments); | ||
if (has(self._listeners, this.event)) { | ||
self._listeners[this.event].forEach(function(listener) { | ||
listener.apply(null, args); | ||
}); | ||
} | ||
broadcast(self._id, this.event, args); | ||
}); | ||
} else { | ||
this._cb = {}; | ||
} | ||
}; | ||
/** | ||
* Attach all commands from EventVat to Hub. This sends a command to the | ||
* master process to do with hub data. | ||
*/ | ||
Object.keys(EventVat.prototype).forEach(function(cmd) { | ||
Hub.prototype[cmd] = function() { | ||
var self = this | ||
, args = Array.prototype.slice.call(arguments) | ||
, cb | ||
if (typeof args[args.length - 1] === 'function') { | ||
cb = args.pop(); | ||
} | ||
if (isMaster) { | ||
var rs = self._db[cmd].apply(self._db, args) | ||
if (cb) process.nextTick(function() { cb(rs); }); | ||
return rs; | ||
} else { | ||
// if this is a worker, generate a random number so we know what | ||
// callback to call when the master responds | ||
if (cb) { | ||
var key; | ||
while (has(self._cb, (key = Math.floor(Math.random() * 20000)))); | ||
self._cb[key] = cb; | ||
} | ||
self._send({ | ||
cmd: cmd | ||
, args: args | ||
, key: key | ||
}); | ||
} | ||
}; | ||
}); | ||
/** | ||
* Sends message to master/worker | ||
* @param (Object) message | ||
*/ | ||
Hub.prototype._send = function(message) { | ||
message.dir = __dirname; | ||
message.hub = this._id; | ||
// check if channel is open | ||
if (!process._channel) { | ||
this.emitLocal('error', new Error('Master channel closed')); | ||
return; | ||
} | ||
process.send(message); | ||
}; | ||
/** | ||
* Emits event to all workers and the master in the hub. | ||
* @param (string) event | ||
* @param (Object) args... | ||
*/ | ||
Hub.prototype.emit = function() { | ||
this.emitRemote.apply(this, arguments); | ||
this.emitLocal.apply(this, arguments); | ||
}; | ||
/** | ||
* @alias for emit | ||
*/ | ||
Hub.prototype.publish = Hub.prototype.emit; | ||
/** | ||
* Emits an event only to the current process. | ||
* @param (string) event | ||
* @param (Object) args... | ||
*/ | ||
Hub.prototype.emitLocal = function(event) { | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
emit(this._id, event, args); | ||
}; | ||
/** | ||
* Emits an event only to all other workes in the hub including master. | ||
* @param (string) event | ||
* @param (Object) args... | ||
*/ | ||
Hub.prototype.emitRemote = function(event) { | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
if (isWorker) { | ||
this._send({ | ||
cmd: EVENT | ||
, event: event | ||
, args: args | ||
}); | ||
} else { | ||
broadcast(this._id, event, args); | ||
} | ||
}; | ||
/** | ||
* @alias for emitRemote | ||
*/ | ||
Hub.prototype.broadcast = Hub.prototype.emitRemote; | ||
/** | ||
* Starts listening to an event within the hub. | ||
* @param (string) event The event to listen for. | ||
* @param (function(args...)) listener The function that gets called | ||
* when one of the workers emits it. | ||
*/ | ||
Hub.prototype.on = function(event, listener) { | ||
if (!has(this._listeners, event)) this._listeners[event] = []; | ||
this._listeners[event].push(listener); | ||
if (isWorker) { | ||
this._send({ | ||
cmd: ON | ||
, event: event | ||
}); | ||
} | ||
}; | ||
/** | ||
* @alias for on | ||
*/ | ||
Hub.prototype.addListener = Hub.prototype.on; | ||
Hub.prototype.subscribe = Hub.prototype.on; | ||
/** | ||
* Removes a listener from listening to an event. | ||
* @param (string) event | ||
* @param (function) listener | ||
*/ | ||
Hub.prototype.off = function(event, listener) { | ||
if (!has(this._listeners, event)) return; | ||
// remove local listener | ||
var listeners = this._listeners[event]; | ||
var found = false; | ||
for (var i = 0, l = listeners.length; i < l; i++) { | ||
var liss = listeners[i]; | ||
if (liss === listener || liss.listener === listener) { | ||
listeners.splice(i, 1); | ||
found = true; | ||
break; | ||
} | ||
} | ||
// tell master there is one less listener for this event | ||
if (found && isWorker) { | ||
this._send({ | ||
cmd: OFF | ||
, event: event | ||
}); | ||
} | ||
}; | ||
/** | ||
* @alias | ||
*/ | ||
Hub.prototype.removeListener = Hub.prototype.off; | ||
Hub.prototype.unsubscribe = Hub.prototype.off; | ||
/** | ||
* Listens for n number of the event and then stops listening. | ||
* @param (number) n | ||
* @param (string) event | ||
* @param (function(args...)) listener | ||
*/ | ||
Hub.prototype.many = function(n, event, listener) { | ||
var self = this; | ||
function wrapper() { | ||
if (--n === 0) self.off(event, listener); | ||
listener.apply(this, arguments); | ||
} | ||
wrapper.listener = listener; | ||
self.on(event, wrapper); | ||
}; | ||
/** | ||
* Shortcut for `many(1, event, listener)` | ||
* @param (string) event | ||
* @param (function(args...)) listener | ||
*/ | ||
Hub.prototype.once = function(event, listener) { | ||
this.many(1, event, listener); | ||
}; | ||
/** | ||
* Removes all listeners for the event. | ||
* @param (string) event | ||
*/ | ||
Hub.prototype.removeAllListeners = function(event) { | ||
if (event) { | ||
if (has(this._listeners, event)) { | ||
delete this._listeners[event]; | ||
} | ||
} else { | ||
this._listeners = {}; | ||
} | ||
if (isWorker) { | ||
this._send({ | ||
cmd: OFFALL | ||
, event: event | ||
}); | ||
} | ||
}; | ||
/** | ||
* Tells fn when all children in the process are online and ready | ||
*/ | ||
Hub.prototype.ready = function(fn) { | ||
if (this.isready) { | ||
process.nextTick(fn); | ||
} else { | ||
this._onready.push(fn); | ||
} | ||
}; | ||
/** | ||
* Removes Hub instance from memory | ||
*/ | ||
Hub.prototype.destroy = function() { | ||
this._db.die(); | ||
delete hubs[this._id]; | ||
}; | ||
/** | ||
* Export an intance of a hub. This can be used if the clusterhub user | ||
@@ -569,8 +23,2 @@ * doesn't wanna bother creating a new hub. It will be considered the global | ||
globalHub.getWorkers = function() { | ||
return workers.map(function(child) { | ||
return child.worker; | ||
}); | ||
}; | ||
module.exports = globalHub; | ||
@@ -580,4 +28,4 @@ | ||
// expose cluster | ||
globalHub.isMaster = isMaster; | ||
globalHub.isWorker = isWorker; | ||
globalHub.isMaster = cluster.isMaster; | ||
globalHub.isWorker = cluster.isWorker; | ||
globalHub.fork = cluster.fork; |
@@ -5,3 +5,3 @@ { | ||
"keywords": ["cluster", "load balance", "database", "multi process", "sync"], | ||
"version": "0.1.2", | ||
"version": "0.2.0", | ||
"repository": { | ||
@@ -14,3 +14,3 @@ "type": "git", | ||
"scripts": { | ||
"test": "mocha test/masterworker-test.js && mocha test/workerworker-test.js && mocha test/master-test.js && mocha test/kill-worker-test.js && mocha test/kill-master-test.js" | ||
"test": "mocha test/masterworker-test.js && mocha test/workerworker-test.js && mocha test/master-test.js && mocha test/kill-worker-test.js" | ||
@@ -22,9 +22,9 @@ }, | ||
"engines": { | ||
"node": ">=0.6" | ||
"node": ">=0.8" | ||
}, | ||
"dependencies": { | ||
"eventvat": "0.1.x" | ||
"eventvat": "0.2.x" | ||
}, | ||
"devDependencies": { | ||
"mocha": "0.10.x" | ||
"mocha": "x" | ||
}, | ||
@@ -31,0 +31,0 @@ "licenses": [ { |
@@ -30,2 +30,3 @@ # clusterhub [![Build Status](https://secure.travis-ci.org/fent/clusterhub.png)](http://travis-ci.org/fent/clusterhub) | ||
# Features | ||
* Efficient event emitter system. Clusterhub will waste no time sending an event to a process that isn't listening for it. Events from the same process of a listener will be immediately emitted. | ||
@@ -36,2 +37,3 @@ * In process database. Each hub has its own instance of a redis-like database powered by [EventVat](https://github.com/hij1nx/EventVat). | ||
# Motive | ||
Node.js is a perfect candidate to developing [Date Intensive Real-time Applications](http://video.nextconf.eu/video/1914374/nodejs-digs-dirt-about). Load balancing in these applications can become complicated when having to share data between processes. | ||
@@ -56,5 +58,2 @@ | ||
### Hub#getWorkers() | ||
Returns an array of all workers (child processes) spawned with `cluster.fork()`. | ||
Additionally, all functions from the regular [EventEmitter](http://nodejs.org/docs/latest/api/events.html#events.EventEmitter) are included. Plus a couple of extras. | ||
@@ -76,3 +75,3 @@ | ||
All functions from [EventVat](https://github.com/hij1nx/EventVat) are included as well. Their returned value can be accessed by providing a callback as the last argument. Or directly if used by the master. | ||
All functions from [EventVat](https://github.com/hij1nx/EventVat) are included as well. Their returned value can be accessed by providing a callback as the last argument. Or optionally by its returned value if called by the master. | ||
@@ -91,5 +90,7 @@ #### worker process | ||
var returnedVal = hub.incr('foo', function(val) { | ||
// can be given a callback for consistency | ||
console.log(val === 1); // true | ||
}); | ||
// but since it's the master process it has direct access to the database | ||
console.log(returnedVal === 1); // true | ||
@@ -103,3 +104,7 @@ ``` | ||
To use with node v0.6.x look at the v0.1.x tag. | ||
npm install clusterhub@0.1.x | ||
# Tests | ||
@@ -106,0 +111,0 @@ Tests are written with [mocha](http://visionmedia.github.com/mocha/) |
@@ -22,3 +22,3 @@ var hub = require('..') | ||
it('Waits then kill a random worker', function(done) { | ||
it('Waits then destroy a random worker', function(done) { | ||
hub.on('done', done); | ||
@@ -28,7 +28,8 @@ | ||
var worker = workers[Math.floor(Math.random() * WORKERS)]; | ||
worker.kill(); | ||
cluster.on('death', function(worker) { | ||
cluster.on('exit', function(worker) { | ||
cluster.fork(); | ||
}); | ||
worker.destroy(); | ||
}); | ||
@@ -48,6 +49,2 @@ | ||
hub.ready(function() { | ||
setInterval(function() { | ||
hub.incr('foo'); | ||
}, Math.floor(Math.random() * 100)); | ||
setTimeout(function() { | ||
@@ -54,0 +51,0 @@ hub.emit('hello'); |
@@ -53,3 +53,3 @@ var hub = require('..') | ||
cluster.on('death', exit); | ||
cluster.on('exit', exit); | ||
}); | ||
@@ -56,0 +56,0 @@ |
@@ -26,3 +26,3 @@ var hub = require('..') | ||
cluster.on('death', exit); | ||
cluster.on('exit', exit); | ||
}); | ||
@@ -29,0 +29,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Non-existent author
Supply chain riskThe package was published by an npm account that no longer exists.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
14
113
25192
762
1
1
+ Addedeventvat@0.2.1(transitive)
- Removedeventvat@0.1.9(transitive)
Updatedeventvat@0.2.x