Comparing version 0.0.6 to 0.1.0
142
index.js
@@ -8,43 +8,18 @@ 'use strict'; | ||
var uuid = require('node-uuid'); | ||
var net = require('net'); | ||
var freeport = require('./lib/freeport'); | ||
// var inspect = require('./lib/inspect'); | ||
var async = require('async'); | ||
var util = require('util'); | ||
function inspect(obj, depth, multiLine) { | ||
var res = util.inspect(obj, false, depth || 10, true); | ||
if (!multiLine) { | ||
res = res.replace(/(\r\n|\n|\r)/gm, ' '); | ||
} | ||
return res.replace(/\s+/g, ' '); | ||
} | ||
function freeport(cb) { | ||
var server = net.createServer(); | ||
var port; | ||
server.on('listening', function () { | ||
port = server.address().port; | ||
server.close(); | ||
}); | ||
server.on('close', function () { | ||
cb(null, port); | ||
}); | ||
server.listen(0); | ||
} | ||
// ------------------------------ CONSTRUCTOR ---------------------------------- | ||
var Node = function (opt) { | ||
var One = function (opt) { | ||
opt = opt || {}; | ||
// the id of the service that the node will provide | ||
this._service = opt.service || 'indigo1cluster'; | ||
this._service = opt.service || 'unnamedService'; | ||
// cluster which the node belongs to | ||
this._cluster = opt.cluster || 'default'; | ||
this._cluster = opt.cluster || 'defaultCluster'; | ||
@@ -83,23 +58,23 @@ // id of the node | ||
Node.prototype.getId = function () { | ||
One.prototype.getId = function () { | ||
return this._id; | ||
}; | ||
Node.prototype.getCluster = function () { | ||
One.prototype.getCluster = function () { | ||
return this._cluster; | ||
}; | ||
Node.prototype.getClusterTopology = function () { | ||
One.prototype.getClusterTopology = function () { | ||
return this._clusterTopology; | ||
}; | ||
Node.prototype.inCluster = function () { | ||
One.prototype.inCluster = function () { | ||
return this._inCluster; | ||
}; | ||
Node.prototype.advertising = function () { | ||
One.prototype.advertising = function () { | ||
return this._advertising; | ||
}; | ||
Node.prototype.join = function (callback) { | ||
One.prototype.join = function (callback) { | ||
this._pub = zmq.socket('pub'); | ||
@@ -155,3 +130,3 @@ this._sub = zmq.socket('sub'); | ||
this._emitter.emit('join', this._cluster); | ||
if (typeof(callback) === 'function') process.nextTick(function () { callback(null, this._cluster); }.bind(this)); | ||
callback && process.nextTick(callback.bind(null, null, this._cluster)); | ||
@@ -164,6 +139,6 @@ }.bind(this)); | ||
Node.prototype.leave = function (callback) { | ||
One.prototype.leave = function (callback) { | ||
var that = this; | ||
async.series([ | ||
async.waterfal([ | ||
function (next) { | ||
@@ -196,7 +171,7 @@ that._stopDiscovery(next); | ||
that._clusterTop = {}; | ||
that._clusterTopology = {}; | ||
// callback + emit | ||
that._emitter.emit('leave', that._cluster); | ||
if (typeof(callback) === 'function') process.nextTick(function () { callback(null, that._cluster); }.bind(that)); | ||
callback && process.nextTick(callback.bind(null, null, that._cluster)); | ||
}); | ||
@@ -207,3 +182,3 @@ | ||
Node.prototype.startAdvertise = function (details, callback) { | ||
One.prototype.advertise = function (details, callback) { | ||
// fix params in case user does not provide details | ||
@@ -218,3 +193,3 @@ if (typeof(details) === 'function') { | ||
var banner = { | ||
name: this._id, | ||
id: this._id, | ||
txtRecord: { | ||
@@ -243,3 +218,3 @@ cluster: this._cluster | ||
this._emitter.emit('advertise_start', this._adInfo); | ||
if (typeof(callback) === 'function') process.nextTick(function () { callback(null, this._adInfo); }.bind(this)); | ||
callback && process.nextTick(callback.bind(null, null, this._adInfo)); | ||
@@ -253,3 +228,3 @@ }.bind(this)); | ||
Node.prototype.stopAdvertise = function (callback) { | ||
One.prototype.stopAdvertise = function (callback) { | ||
this._ad.stop(); | ||
@@ -261,3 +236,3 @@ | ||
this._emitter.emit('advertise_stop', this._adInfo); | ||
if (typeof(callback) === 'function') process.nextTick(function () { callback(null, this._adInfo); }.bind(this)); | ||
callback && process.nextTick(callback.bind(null, null, this._adInfo)); | ||
@@ -267,3 +242,3 @@ return this; | ||
Node.prototype.subscribe = function (channel, callback) { | ||
One.prototype.subscribe = function (channel, callback) { | ||
if (!this._inCluster) { | ||
@@ -275,6 +250,6 @@ return this._error(new Error('Can\'t subscribe while not in cluster'), callback); | ||
this._sub.subscribe(channel); | ||
this._sub.subscribe(channel + ':'); // ":" added for separating chan from msg | ||
this._emitter.emit('subscribe', channel); | ||
if (typeof(callback) === 'function') process.nextTick(function () { callback(null, channel); }.bind(this)); | ||
callback && process.nextTick(callback.bind(null, null, channel)); | ||
@@ -284,3 +259,3 @@ return this; | ||
Node.prototype.unsubscribe = function (channel, callback) { | ||
One.prototype.unsubscribe = function (channel, callback) { | ||
if (!this._inCluster) { | ||
@@ -290,6 +265,6 @@ return this._error(new Error('Can\'t unsubscribe while not in cluster'), callback); | ||
this._sub.unsubscribe(channel); | ||
this._sub.unsubscribe(channel + ':'); // ":" added for separating chan from msg | ||
this._emitter.emit('unsubscribe', channel); | ||
if (typeof(callback) === 'function') process.nextTick(function () { callback(null, channel); }.bind(this)); | ||
callback && process.nextTick(callback.bind(null, null, channel)); | ||
@@ -299,3 +274,3 @@ return this; | ||
Node.prototype.publish = function (channel, payload) { | ||
One.prototype.publish = function (channel, payload) { | ||
if (!this._inCluster) { | ||
@@ -314,31 +289,31 @@ return this._error(new Error('Can\'t publish while not in cluster')); | ||
Node.prototype.addListener = function () { | ||
One.prototype.addListener = function () { | ||
return this._emitter.addListener.apply(this._emitter, arguments); | ||
}; | ||
Node.prototype.on = function () { | ||
One.prototype.on = function () { | ||
return this._emitter.on.apply(this._emitter, arguments); | ||
}; | ||
Node.prototype.once = function () { | ||
One.prototype.once = function () { | ||
return this._emitter.once.apply(this._emitter, arguments); | ||
}; | ||
Node.prototype.removeListener = function () { | ||
One.prototype.removeListener = function () { | ||
return this._emitter.removeListener.apply(this._emitter, arguments); | ||
}; | ||
Node.prototype.removeAllListeners = function () { | ||
One.prototype.removeAllListeners = function () { | ||
return this._emitter.removeAllListeners.apply(this._emitter, arguments); | ||
}; | ||
Node.prototype.setMaxListeners = function () { | ||
One.prototype.setMaxListeners = function () { | ||
return this._emitter.setMaxListeners.apply(this._emitter, arguments); | ||
}; | ||
Node.prototype.listeners = function () { | ||
One.prototype.listeners = function () { | ||
return this._emitter.listeners.apply(this._emitter, arguments); | ||
}; | ||
Node.prototype.emit = function () { | ||
One.prototype.emit = function () { | ||
return this._emitter.emit.apply(this._emitter, arguments); | ||
@@ -349,3 +324,3 @@ }; | ||
Node.prototype._startDiscovery = function (callback) { | ||
One.prototype._startDiscovery = function (callback) { | ||
this._browser = mdns.createBrowser(mdns.tcp(this._service), { | ||
@@ -371,3 +346,3 @@ resolverSequence: [ | ||
Node.prototype._stopDiscovery = function (callback) { | ||
One.prototype._stopDiscovery = function (callback) { | ||
this._browser.stop(); | ||
@@ -385,19 +360,19 @@ | ||
Node.prototype._handleNodeUp = function (service) { | ||
One.prototype._handleNodeUp = function (banner) { | ||
// if node already in cluster or belongs to other cluster, ignore | ||
if (!mout.lang.isObject(this._clusterTopology[service.name]) && | ||
service.txtRecord.cluster === this._cluster) { | ||
if (!mout.lang.isObject(this._clusterTopology[banner.id]) && | ||
banner.txtRecord.cluster === this._cluster) { | ||
// add node to this node's perception of the cluster | ||
var info = { | ||
id: service.name, | ||
id: banner.id, | ||
timestamp: (new Date()).toJSON(), | ||
address: service.addresses[0], | ||
port: service.port | ||
address: banner.addresses[0], | ||
port: banner.port | ||
}; | ||
info.details = mout.lang.deepClone(service.txtRecord); | ||
info.details = mout.lang.deepClone(banner.txtRecord); | ||
delete info.details.cluster; | ||
this._clusterTopology[service.name] = info; | ||
this._clusterTopology[banner.name] = info; | ||
@@ -411,7 +386,7 @@ // connect to its pub socket | ||
Node.prototype._handleNodeDown = function (service) { | ||
One.prototype._handleNodeDown = function (banner) { | ||
// if node was in this cluster's perception of the cluster, remove it | ||
if (mout.lang.isObject(this._clusterTopology[service.name])) { | ||
var info = mout.lang.deepClone(this._clusterTopology[service.name]); | ||
delete this._clusterTopology[service.name]; | ||
if (mout.lang.isObject(this._clusterTopology[banner.name])) { | ||
var info = mout.lang.deepClone(this._clusterTopology[banner.name]); | ||
delete this._clusterTopology[banner.name]; | ||
@@ -422,7 +397,8 @@ this._emitter.emit('node_down', info); | ||
Node.prototype._handleMessage = function (data) { | ||
One.prototype._handleMessage = function (data) { | ||
data = data.toString(); | ||
var sepPos = data.indexOf(':'); | ||
var chan = data.substr(0, sepPos); | ||
var payload = data.substr(sepPos + 1); | ||
var sepPos = data.indexOf(':'), | ||
chan = data.substr(0, sepPos), | ||
payload = data.substr(sepPos + 1) | ||
; | ||
@@ -432,7 +408,7 @@ this._emitter.emit('message', chan, payload); | ||
Node.prototype._getBind = function (addr, port) { | ||
One.prototype._getBind = function (addr, port) { | ||
return 'tcp://' + addr + ':' + port; | ||
}; | ||
Node.prototype._assertChanValid = function (channel, callback) { | ||
One.prototype._assertChanValid = function (channel, callback) { | ||
if (channel.indexOf(':') > -1) { | ||
@@ -443,3 +419,3 @@ this._error(new Error('Can\'t use commas in channel names'), callback); | ||
Node.prototype._error = function (err, callback) { | ||
One.prototype._error = function (err, callback) { | ||
// note that the error event is only thrown if a callback was not provided | ||
@@ -457,2 +433,2 @@ if (typeof(callback) === 'function') { | ||
module.exports = Node; | ||
module.exports = One; |
{ | ||
"name": "1", | ||
"version": "0.0.6", | ||
"description": "distributed message queue based on ØMQ", | ||
"version": "0.1.0", | ||
"description": "Distributed pub/sub based on ØMQ", | ||
"main": "index.js", | ||
@@ -6,0 +6,0 @@ "scripts": { |
177
README.md
@@ -5,14 +5,5 @@ # 1 ( *One* ) | ||
*1* (pronounced One) is a sort of magnet module, gluing together all the nodes that you launch in a network, and providing a simple pub/sub. It allows you to separate several services in the same network by means of | ||
## Introduction | ||
*1* is a sort of magnet module, gluing together all the nodes that you launch | ||
in a network, and providing a channel based pub/sub. | ||
## Getting started | ||
Take a look at `bin/one.js`. | ||
## Installation | ||
@@ -55,4 +46,170 @@ | ||
## Getting started | ||
``` | ||
var One = require('1'); | ||
var one = new One(); | ||
// Let's do something when we receive messages. | ||
one.on('message', function (chan, msg) { | ||
console.info(chan + '>', msg); | ||
}); | ||
// Join the cluster. | ||
one.join(function (err, cluster) { | ||
err && throw new Error('Unable to join cluster: ' + err); | ||
// Advertise the service. | ||
one.advertise(function (err, adInfo) { | ||
err && throw new Error('Unable to advertise service: ' + err); | ||
// Subscribe a channel | ||
one.subscribe('some_channel', function (err, chan) { | ||
err && throw new Error('Unable to subscribe channel: ' + err); | ||
// Let's send a message to the channel periodically | ||
setTimeout(function () { | ||
one.publish('some_channel', 'You will be notified of this message'); | ||
one.publish('some_channel_you_did_not_subscribe', 'You will not get this message'); | ||
}, 500); | ||
}); | ||
}); | ||
}); | ||
``` | ||
Here's a more elaborate way of instantiating One, with a few extra options: | ||
``` | ||
// You can pass a few options when instantiating One. | ||
// Note that these are all optional, and you can instantiate without any option. | ||
// The example below shows all the default options. | ||
var one = new One({ | ||
// Id of the service you will be providing. | ||
service: 'unnamedService', | ||
// Cluster which this node belongs to. | ||
cluster: 'defaultCluster', | ||
// Id of this node. If null, a random id is generated. | ||
id: null, | ||
// Port used for publishing messages. If null, a free random port is used. | ||
port: null, | ||
// Interface in which the node will bind. | ||
address: '0.0.0.0' | ||
}); | ||
``` | ||
## Reference | ||
### Introduction | ||
This module can be used to easily create auto discoverable services that communicate through means of a distributed pub/sub. Unlike solutions based on Redis or some message queueing software, this module is based on 0MQ, enabling you to create a pub/sub without a single point of failure or bottleneck. | ||
### Advertising service | ||
Upon instantiation of *One*, you can specify the `service` which you are providing. This acts as an immediate identifier in case you create multiple service types that you don't want talking to each other. Only after you start advertising other nodes in the cluster will realise you have joined and listen to you. Until that moment, you are a silent node, which is only capable of listening. | ||
Usage: | ||
``` | ||
var one = new One({ | ||
service: 'myStorageService' | ||
}); | ||
// ... | ||
// Advertising service | ||
one.advertise(function (err, adInfo) { | ||
!err && console.log('Advertising', adInfo); | ||
}); | ||
// ... | ||
// Stopping advertisement | ||
one.stopAdvertise(function (err, adInfo) { | ||
!err && console.log('Stopped advertising', adInfo); | ||
}); | ||
``` | ||
### Clustering | ||
Unlike `service`, clustering allows you to partition multiple nodes of the same service in the same network. Basically, only nodes belonging to the same `cluster` will talk to each other. | ||
Usage: | ||
``` | ||
var one = new One({ | ||
service: 'myStorageService', | ||
cluster: 'cluster1' | ||
}); | ||
// Joining cluster | ||
one.join(function (err, cluster) { | ||
!err && console.log('Joined', cluster); | ||
}); | ||
// ... | ||
// Leaving cluster | ||
one.leave(function (err, cluster) { | ||
!err && console.log('Left', cluster); | ||
}); | ||
``` | ||
### Events | ||
Here's a complete list of the available events that you can listen to: | ||
``` | ||
one.on('join', function (cluster) { | ||
console.log('joined cluster:', cluster); | ||
}); | ||
one.on('leave', function (cluster) { | ||
console.log('left cluster:', cluster); | ||
}); | ||
one.on('advertise_start', function (adInfo) { | ||
console.log('started advertising:', adInfo); | ||
}); | ||
one.on('advertise_stop', function (adInfo) { | ||
console.log('stopped advertising:', adInfo); | ||
}); | ||
one.on('subscribe', function (channel) { | ||
console.log('subscribed:', channel); | ||
}); | ||
one.on('unsubscribe', function (channel) { | ||
console.log('unsubscribed:', channel); | ||
}); | ||
one.on('node_up', function (node) { | ||
console.log('node up:', node); | ||
}); | ||
one.on('node_down', function (node) { | ||
console.log('node down:', node); | ||
}); | ||
one.on('message', function (chan, payload) { | ||
console.log(chan + '>', payload); | ||
}); | ||
// Note that the error event is only emitted if you do not specify a callback to | ||
// a method that throws an error. | ||
one.on('error', function (err) { | ||
console.error('ERROR: ', err); | ||
}); | ||
``` | ||
## License | ||
Released under the [MIT License](http://www.opensource.org/licenses/mit-license.php). |
@@ -1,4 +0,6 @@ | ||
TODO | ||
==== | ||
# TODO | ||
- prevent from using channels with ':' | ||
- unit tests!!! | ||
- prevent from using channels with ':' | ||
- use promises instead of async.waterfal | ||
- review which information I actually eed to return on events/callbacks |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
22869
13
214
373