Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

clusterhub

Package Overview
Dependencies
Maintainers
1
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

clusterhub - npm Package Compare versions

Comparing version 0.1.2 to 0.2.0

lib/fork.js

566

lib/index.js

@@ -1,559 +0,13 @@

var cluster = require('cluster')
, EventVat = require('eventvat')
var cluster = require('cluster')
, Hub = require('./hub')
;
// command constants
var EVENT = 0
, ON = 1
, OFF = 2
, OFFALL = 3
, READY = 4
var isMaster = cluster.isMaster;
var isWorker = cluster.isWorker;
/**
* Shortcut to check if a property exists in an object.
* @param (Object) obj
* @param (string) key
* @return (boolean)
* Require fork.js to overwrite cluster.fork()
*/
function has(obj, key) {
return Object.hasOwnProperty.call(obj, key);
}
require('./fork');
/**
* Keep track of hubs and workers
*/
var hubs = {};
var workers = [];
var queue = [];
function queuemsg(fn, msg) {
if (isReady()) {
fn(msg);
} else {
queue.push({ fn: fn, msg: msg });
}
}
/**
* Returns true if all workers are online and ready
*/
function isReady() {
var online = workers.filter(function(obj) { return obj.ready; });
return online.length === workers.length;
}
/**
* When all workers are online, this tells all hubs in the current process
*/
function ready() {
var key, hub, listener;
for (key in hubs) {
if (has(hubs, key)) {
hub = hubs[key];
if (hub.isready) continue;
hub.isready = true;
while (listener = hub._onready.shift()) listener();
}
}
}
/**
* Filter for only online workers and calls fn on each.
* @param (Function) fn
*/
function forWorkers(fn) {
workers = workers.filter(function(child) {
return !!child.worker._channel;
});
workers.forEach(fn);
}
/**
* Broadcasts an event to all workers
* @param (string) id Hub id
* @param (string) event
* @param (Object) args
*/
function broadcast(id, event, args, origin) {
forWorkers(function(child) {
if (origin && child.worker === origin) return;
if (!has(child.events, event)) return;
child.worker.send({
dir : __dirname
, hub : id
, event : event
, args : args
});
});
}
/**
* Emit events for a hub.
* @param (string) id Hub id
* @param (string) event
* @param (Object) args
*/
function emit(id, event, args) {
var hub = hubs[id];
// check if there are listeners for this event
if (!has(hub._listeners, event)) return;
hub._listeners[event].forEach(function(listener) {
listener.apply({ local: false, remote: true }, args);
});
}
/**
* Listen for events from master
*/
process.on('message', function(msg) {
if (msg.cmd === READY) {
return ready();
}
// check if hub exists
if (msg.dir !== __dirname) return;
if (msg.hub === undefined || !has(hubs, msg.hub)) return;
var hub = hubs[msg.hub];
if (msg.event) {
emit(msg.hub, msg.event, msg.args);
} else {
// it can be a response to another command too
hub._cb[msg.key].apply(null, msg.args);
delete hub._cb[msg.key];
}
});
/**
* Forks a worker and keeps a reference to it. Listens for any emitted
* events so it can distribute them amongst any worker listeners.
* @return (child)
*/
var origFork = cluster.fork;
cluster.fork = function() {
var worker = origFork();
var events = {};
var obj = { worker: worker, events: events, ready: false };
workers.push(obj);
function onmessage(msg) {
if (msg.cmd === 'online') {
worker.emit('online');
obj.ready = true;
for (var key in hubs) {
if (has(hubs, key)) hubs[key].emit('online', msg._workerId);
}
// check if all workers are online and ready
if (!isReady()) return;
// process any messages that were buffered while the hub
// was not ready
while (msg = queue.shift()) msg.fn(msg.msg);
// tell all workers hub is ready
forWorkers(function(child) {
child.worker.send({ cmd: READY });
});
ready();
return;
}
if (msg.hub === undefined || msg.cmd === undefined) return;
if (msg.dir !== __dirname) return;
if (!has(hubs, msg.hub)) {
new Hub(msg.hub);
}
switch (msg.cmd) {
// if this is an emitted event, distribute it amongst all workers
// who are listening for the event. Except the one who sent it
case EVENT:
queuemsg(onevent, msg);
break;
// if it's on/off, add/remove counters to know if this worker should
// get notified of any events or not
case ON:
onon(msg);
break;
case OFF:
onoff(msg);
break;
case OFFALL:
onoffall(msg);
break;
// can be a EventVat command
// in that case, execute it on the EventVat instance for this hub
default:
queuemsg(oncmd, msg);
}
}
function onevent(msg) {
broadcast(msg.hub, msg.event, msg.args, worker);
emit(msg.hub, msg.event, msg.args);
}
function onon(msg) {
if (has(events, msg.event)) {
events[msg.event]++;
} else {
events[msg.event] = 1;
}
}
function onoff(msg) {
if (has(events, msg.event) && --events[msg.event] === 0) {
delete events[msg.event];
}
}
function onoffall(msg) {
if (msg.event) {
if (has(events, msg.event)) {
delete events[msg.event];
}
} else {
obj.events = events = {};
}
}
function oncmd(msg) {
var db = hubs[msg.hub]._db;
var result = db[msg.cmd].apply(db, msg.args);
// if key is given, then a callback is waiting for the result
if (msg.key) {
worker.send({
dir : __dirname
, hub : msg.hub
, key : msg.key
, args : [result]
});
}
}
worker.on('message', onmessage);
return worker;
};
/**
* Remove workers on death
*/
cluster.on('death', function(worker) {
workers.forEach(function(obj, i) {
if (obj.worker === worker) {
workers.splice(i, 1);
return false;
}
});
});
/**
* @constructor
*/
function Hub(id) {
this._id = id || '';
if (has(hubs, this._id)) return hubs[this._id];
hubs[this._id] = this;
this._listeners = {};
this.isready = false;
this._onready = [];
if (isMaster) {
this._db = new EventVat();
var self = this;
self._db.onAny(function() {
var args = Array.prototype.slice.call(arguments);
if (has(self._listeners, this.event)) {
self._listeners[this.event].forEach(function(listener) {
listener.apply(null, args);
});
}
broadcast(self._id, this.event, args);
});
} else {
this._cb = {};
}
};
/**
* Attach all commands from EventVat to Hub. This sends a command to the
* master process to do with hub data.
*/
Object.keys(EventVat.prototype).forEach(function(cmd) {
Hub.prototype[cmd] = function() {
var self = this
, args = Array.prototype.slice.call(arguments)
, cb
if (typeof args[args.length - 1] === 'function') {
cb = args.pop();
}
if (isMaster) {
var rs = self._db[cmd].apply(self._db, args)
if (cb) process.nextTick(function() { cb(rs); });
return rs;
} else {
// if this is a worker, generate a random number so we know what
// callback to call when the master responds
if (cb) {
var key;
while (has(self._cb, (key = Math.floor(Math.random() * 20000))));
self._cb[key] = cb;
}
self._send({
cmd: cmd
, args: args
, key: key
});
}
};
});
/**
* Sends message to master/worker
* @param (Object) message
*/
Hub.prototype._send = function(message) {
message.dir = __dirname;
message.hub = this._id;
// check if channel is open
if (!process._channel) {
this.emitLocal('error', new Error('Master channel closed'));
return;
}
process.send(message);
};
/**
* Emits event to all workers and the master in the hub.
* @param (string) event
* @param (Object) args...
*/
Hub.prototype.emit = function() {
this.emitRemote.apply(this, arguments);
this.emitLocal.apply(this, arguments);
};
/**
* @alias for emit
*/
Hub.prototype.publish = Hub.prototype.emit;
/**
* Emits an event only to the current process.
* @param (string) event
* @param (Object) args...
*/
Hub.prototype.emitLocal = function(event) {
var args = Array.prototype.slice.call(arguments, 1);
emit(this._id, event, args);
};
/**
* Emits an event only to all other workes in the hub including master.
* @param (string) event
* @param (Object) args...
*/
Hub.prototype.emitRemote = function(event) {
var args = Array.prototype.slice.call(arguments, 1);
if (isWorker) {
this._send({
cmd: EVENT
, event: event
, args: args
});
} else {
broadcast(this._id, event, args);
}
};
/**
* @alias for emitRemote
*/
Hub.prototype.broadcast = Hub.prototype.emitRemote;
/**
* Starts listening to an event within the hub.
* @param (string) event The event to listen for.
* @param (function(args...)) listener The function that gets called
* when one of the workers emits it.
*/
Hub.prototype.on = function(event, listener) {
if (!has(this._listeners, event)) this._listeners[event] = [];
this._listeners[event].push(listener);
if (isWorker) {
this._send({
cmd: ON
, event: event
});
}
};
/**
* @alias for on
*/
Hub.prototype.addListener = Hub.prototype.on;
Hub.prototype.subscribe = Hub.prototype.on;
/**
* Removes a listener from listening to an event.
* @param (string) event
* @param (function) listener
*/
Hub.prototype.off = function(event, listener) {
if (!has(this._listeners, event)) return;
// remove local listener
var listeners = this._listeners[event];
var found = false;
for (var i = 0, l = listeners.length; i < l; i++) {
var liss = listeners[i];
if (liss === listener || liss.listener === listener) {
listeners.splice(i, 1);
found = true;
break;
}
}
// tell master there is one less listener for this event
if (found && isWorker) {
this._send({
cmd: OFF
, event: event
});
}
};
/**
* @alias
*/
Hub.prototype.removeListener = Hub.prototype.off;
Hub.prototype.unsubscribe = Hub.prototype.off;
/**
* Listens for n number of the event and then stops listening.
* @param (number) n
* @param (string) event
* @param (function(args...)) listener
*/
Hub.prototype.many = function(n, event, listener) {
var self = this;
function wrapper() {
if (--n === 0) self.off(event, listener);
listener.apply(this, arguments);
}
wrapper.listener = listener;
self.on(event, wrapper);
};
/**
* Shortcut for `many(1, event, listener)`
* @param (string) event
* @param (function(args...)) listener
*/
Hub.prototype.once = function(event, listener) {
this.many(1, event, listener);
};
/**
* Removes all listeners for the event.
* @param (string) event
*/
Hub.prototype.removeAllListeners = function(event) {
if (event) {
if (has(this._listeners, event)) {
delete this._listeners[event];
}
} else {
this._listeners = {};
}
if (isWorker) {
this._send({
cmd: OFFALL
, event: event
});
}
};
/**
* Tells fn when all children in the process are online and ready
*/
Hub.prototype.ready = function(fn) {
if (this.isready) {
process.nextTick(fn);
} else {
this._onready.push(fn);
}
};
/**
* Removes Hub instance from memory
*/
Hub.prototype.destroy = function() {
this._db.die();
delete hubs[this._id];
};
/**
* Export an intance of a hub. This can be used if the clusterhub user

@@ -569,8 +23,2 @@ * doesn't wanna bother creating a new hub. It will be considered the global

globalHub.getWorkers = function() {
return workers.map(function(child) {
return child.worker;
});
};
module.exports = globalHub;

@@ -580,4 +28,4 @@

// expose cluster
globalHub.isMaster = isMaster;
globalHub.isWorker = isWorker;
globalHub.isMaster = cluster.isMaster;
globalHub.isWorker = cluster.isWorker;
globalHub.fork = cluster.fork;

@@ -5,3 +5,3 @@ {

"keywords": ["cluster", "load balance", "database", "multi process", "sync"],
"version": "0.1.2",
"version": "0.2.0",
"repository": {

@@ -14,3 +14,3 @@ "type": "git",

"scripts": {
"test": "mocha test/masterworker-test.js && mocha test/workerworker-test.js && mocha test/master-test.js && mocha test/kill-worker-test.js && mocha test/kill-master-test.js"
"test": "mocha test/masterworker-test.js && mocha test/workerworker-test.js && mocha test/master-test.js && mocha test/kill-worker-test.js"

@@ -22,9 +22,9 @@ },

"engines": {
"node": ">=0.6"
"node": ">=0.8"
},
"dependencies": {
"eventvat": "0.1.x"
"eventvat": "0.2.x"
},
"devDependencies": {
"mocha": "0.10.x"
"mocha": "x"
},

@@ -31,0 +31,0 @@ "licenses": [ {

@@ -30,2 +30,3 @@ # clusterhub [![Build Status](https://secure.travis-ci.org/fent/clusterhub.png)](http://travis-ci.org/fent/clusterhub)

# Features
* Efficient event emitter system. Clusterhub will waste no time sending an event to a process that isn't listening for it. Events from the same process of a listener will be immediately emitted.

@@ -36,2 +37,3 @@ * In process database. Each hub has its own instance of a redis-like database powered by [EventVat](https://github.com/hij1nx/EventVat).

# Motive
Node.js is a perfect candidate to developing [Date Intensive Real-time Applications](http://video.nextconf.eu/video/1914374/nodejs-digs-dirt-about). Load balancing in these applications can become complicated when having to share data between processes.

@@ -56,5 +58,2 @@

### Hub#getWorkers()
Returns an array of all workers (child processes) spawned with `cluster.fork()`.
Additionally, all functions from the regular [EventEmitter](http://nodejs.org/docs/latest/api/events.html#events.EventEmitter) are included. Plus a couple of extras.

@@ -76,3 +75,3 @@

All functions from [EventVat](https://github.com/hij1nx/EventVat) are included as well. Their returned value can be accessed by providing a callback as the last argument. Or directly if used by the master.
All functions from [EventVat](https://github.com/hij1nx/EventVat) are included as well. Their returned value can be accessed by providing a callback as the last argument. Or optionally by its returned value if called by the master.

@@ -91,5 +90,7 @@ #### worker process

var returnedVal = hub.incr('foo', function(val) {
// can be given a callback for consistency
console.log(val === 1); // true
});
// but since it's the master process it has direct access to the database
console.log(returnedVal === 1); // true

@@ -103,3 +104,7 @@ ```

To use with node v0.6.x look at the v0.1.x tag.
npm install clusterhub@0.1.x
# Tests

@@ -106,0 +111,0 @@ Tests are written with [mocha](http://visionmedia.github.com/mocha/)

@@ -22,3 +22,3 @@ var hub = require('..')

it('Waits then kill a random worker', function(done) {
it('Waits then destroy a random worker', function(done) {
hub.on('done', done);

@@ -28,7 +28,8 @@

var worker = workers[Math.floor(Math.random() * WORKERS)];
worker.kill();
cluster.on('death', function(worker) {
cluster.on('exit', function(worker) {
cluster.fork();
});
worker.destroy();
});

@@ -48,6 +49,2 @@

hub.ready(function() {
setInterval(function() {
hub.incr('foo');
}, Math.floor(Math.random() * 100));
setTimeout(function() {

@@ -54,0 +51,0 @@ hub.emit('hello');

@@ -53,3 +53,3 @@ var hub = require('..')

cluster.on('death', exit);
cluster.on('exit', exit);
});

@@ -56,0 +56,0 @@

@@ -26,3 +26,3 @@ var hub = require('..')

cluster.on('death', exit);
cluster.on('exit', exit);
});

@@ -29,0 +29,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc