multicast-eventemitter
Advanced tools
Comparing version 0.0.8 to 0.10.0
@@ -9,12 +9,13 @@ // Copyright 2011 Thorcom Systems Ltd. All Rights Reserved. | ||
setInterval(function() { | ||
console.log('emitting channelA'); | ||
emitter.emit('channelA', 'this is channel A', new Date().getTime()); | ||
console.log('emitting channelB'); | ||
emitter.emit('channelB', 'this is channel B', new Date().getTime()); | ||
var now = new Date().getTime(); | ||
console.log('emitting eventA', now); | ||
emitter.emit('eventA', 'this is eventA', now); | ||
console.log('emitting eventB', now); | ||
emitter.emit('eventB', 'this is eventB', now); | ||
}, 1000); | ||
// subscribe to channelA events | ||
emitter.on('channelA', function(text, time) { | ||
console.log('message received on channelA:', text, time); | ||
// subscribe to eventB events | ||
emitter.on('eventB', function(text, time) { | ||
console.log('eventB received...', 'text:', text, 'time:', time); | ||
}); | ||
@@ -7,5 +7,5 @@ // Copyright 2011 Thorcom Systems Ltd. All Rights Reserved. | ||
// subscribe to channelA events | ||
emitter.on('channelA', function(text, time) { | ||
console.log('message received on channelA:', text, time); | ||
// subscribe to eventA events | ||
emitter.on('eventA', function(text, time) { | ||
console.log('eventA received...', 'text:', text, 'time:', time); | ||
}); |
@@ -9,6 +9,6 @@ // Copyright 2011 Thorcom Systems Ltd. All Rights Reserved. | ||
setInterval(function() { | ||
console.log('emitting channelA'); | ||
emitter.emit('channelA', 'this is channel A', new Date().getTime()); | ||
console.log('emitting channelB'); | ||
emitter.emit('channelB', 'this is channel B', new Date().getTime()); | ||
console.log('emitting eventA'); | ||
emitter.emit('eventA', 'this is eventA', new Date().getTime()); | ||
console.log('emitting eventB'); | ||
emitter.emit('eventB', 'this is eventB', new Date().getTime()); | ||
}, 1000); |
// Copyright 2011 Thorcom Systems Ltd. All Rights Reserved. | ||
var VERSION_0_8_X = "v0.8.x"; | ||
var VERSION_0_10_X = "v0.10.x"; | ||
var version = nodeVersion(); | ||
/* | ||
* This is required because they changed the way that bind works between 0.8.x and 0.10.x | ||
* I have not yet found a way of writing code which works on both, without this switch. | ||
*/ | ||
function nodeVersion() { | ||
if (process.version.match(/^v0\.8\./)) { | ||
return VERSION_0_8_X; | ||
} else if (process.version.match(/^v0\.10\./)) { | ||
return VERSION_0_10_X; | ||
} else { | ||
throw 'multicast-eventemitter can only work with NodeJS v0.8.x and v0.10.x'; | ||
} | ||
} | ||
var dgram = require('dgram') | ||
@@ -11,3 +29,3 @@ , util = require('util') | ||
'transport': 'multicast' // 'pgm' (zeromq) to follow... | ||
, 'multicastInteface': undefined | ||
, 'multicastInterface': undefined | ||
, 'ttl': 64 | ||
@@ -19,2 +37,6 @@ , 'overrides' : {} // should be of form "event_name: { address: 'address', port: port }" | ||
/* | ||
* A function to get or create the singleton emitter. | ||
*/ | ||
exports.getEmitter = getEmitter; | ||
@@ -28,11 +50,24 @@ function getEmitter() { | ||
/* | ||
* The constructor. Perhaps this shouldn't be exported? | ||
*/ | ||
exports.MulticastEventEmitter = MulticastEventEmitter; | ||
function MulticastEventEmitter() { | ||
this.src = getIPv4Address() + '/' + process.pid; | ||
this.listenersByEvent = {}; // a hash of arrays of functions | ||
this.serversByEvent = {}; // a hash of bound datagram servers | ||
this.lastSeq = {}; // a hash of last heard sequence number, by src | ||
this.src = Math.random(); // unique id of this sender | ||
this.seq = 0; | ||
this.serversByEvent = {}; // a hash of bound datagram rx servers | ||
this.lastSeq = {}; // a hash of last heard sequence number, by src and event | ||
this.seqByEvent = {}; // a hash of last sent sequence numbers, by event | ||
this.sender = dgram.createSocket('udp4'); | ||
var that = this; | ||
this.sender.bind(function() { | ||
that.sender.setBroadcast(true); | ||
that.sender.setMulticastTTL(options.ttl); | ||
that.sender.setMulticastLoopback(true); // needed from inter-process intra-box comms | ||
}); | ||
} | ||
/* | ||
* This adds a listener. | ||
*/ | ||
MulticastEventEmitter.prototype.addListener = function(event, listener) { | ||
@@ -51,9 +86,20 @@ //console.log('addListener', event, listener); | ||
// create and keep track of udp multicast listening server | ||
var server = this.serversByEvent = dgram.createSocket('udp4'); | ||
server.addMembership(hash.address, options.multicastInterface); | ||
server.setMulticastTTL(options.ttl); | ||
server.setMulticastLoopback(true); // needed from inter-process intra-box comms | ||
var that = this; | ||
server.on('message', function(msg, rinfo) { that.handleMessage(event, msg, rinfo); }); | ||
server.bind(hash.port, hash.address); | ||
var server = this.serversByEvent[event] = dgram.createSocket('udp4'); | ||
// work around lack of backwards compatibility of NodeJS v0.10.X re: multicast | ||
if (version === VERSION_0_10_X) { | ||
server.bind(hash.port, hash.address, function() { | ||
server.setMulticastTTL(options.ttl); | ||
server.addMembership(hash.address, options.multicastInterface); | ||
server.setMulticastLoopback(true); // needed from inter-process intra-box comms | ||
server.on('message', function(msg, rinfo) { that.handleMessage(event, msg, rinfo); }); | ||
}); | ||
} else { // VERSION_0_8_X | ||
server.bind(hash.port, hash.address); | ||
server.setMulticastTTL(options.ttl); | ||
server.addMembership(hash.address, options.multicastInterface); | ||
server.setMulticastLoopback(true); // needed from inter-process intra-box comms | ||
server.on('message', function(msg, rinfo) { that.handleMessage(event, msg, rinfo); }); | ||
} | ||
} | ||
@@ -63,2 +109,5 @@ this.listenersByEvent[event].push(listener); | ||
/* | ||
* This handles the incoming messages. | ||
*/ | ||
MulticastEventEmitter.prototype.handleMessage = function(event, msg, rinfo) { | ||
@@ -70,3 +119,11 @@ //console.log('handleMessage', event, msg, rinfo); | ||
if (message.seq) { | ||
this.lastSeq[message.src] = message.seq; | ||
var key = message.src + '/' + event; | ||
if (!this.lastSeq[key]) this.lastSeq[key] = 0; | ||
var missed = message.seq - (this.lastSeq[key] + 1); | ||
if (missed) { | ||
console.warn(missed, 'messages missed', this.lastSeq[key]); | ||
} else { | ||
//console.info(missed, 'messages missed', this.lastSeq[key], key); | ||
} | ||
this.lastSeq[key] = message.seq; | ||
} | ||
@@ -94,4 +151,8 @@ } catch(e) { | ||
/* | ||
* Emit an event. | ||
*/ | ||
MulticastEventEmitter.prototype.emit = function(event) { // varargs... | ||
var args = Array.prototype.slice.call(arguments); | ||
args.shift(); // don't send the event name as one of the arguments | ||
//console.log('emit', args); | ||
@@ -104,11 +165,26 @@ var hash; | ||
} | ||
var message = new Buffer(JSON.stringify({ event: event, args: args, src: this.src, seq: this.seq++ })); | ||
var socket = dgram.createSocket('udp4'); | ||
socket.addMembership(hash.address, options.multicastInterface); | ||
socket.setMulticastTTL(options.ttl); | ||
socket.setMulticastLoopback(true); // needed from inter-process intra-box comms | ||
socket.send(message, 0, message.length, hash.port, hash.address); | ||
socket.close(); // TODO: cache open sockets for reuse? | ||
if (!this.seqByEvent[event]) this.seqByEvent[event] = 0; | ||
var message = new Buffer(JSON.stringify({ event: event, args: args, src: this.src, seq: this.seqByEvent[event]++ })); | ||
//console.log("this.sender.send(" + message + ", " + 0 + ", " + message.length + ", " + hash.port + ", " + hash.address + ");"); | ||
this.sender.send(message, 0, message.length, hash.port, hash.address); | ||
} | ||
/* | ||
* Get the first non-localhost IPv4 address. | ||
* This is only used for unique identification (with pid), not comms. | ||
*/ | ||
function getIPv4Address() { | ||
var interfaces = require('os').networkInterfaces(); | ||
for (var devName in interfaces) { | ||
var iface = interfaces[devName]; | ||
for (var i = 0; i < iface.length; i++) { | ||
var alias = iface[i]; | ||
if (alias.family === 'IPv4' && alias.address !== '127.0.0.1' && !alias.internal) | ||
return alias.address; | ||
} | ||
} | ||
return '0.0.0.0'; | ||
} |
{ "name" : "multicast-eventemitter" | ||
, "description" : "LAN wide eventemitter, using multicast." | ||
, "version" : "0.0.8" | ||
, "version" : "0.10.0" | ||
, "maintainers" : | ||
@@ -17,6 +17,8 @@ [ { "name": "Chris Dew" | ||
, "main" : "./lib/multicast-eventemitter.js" | ||
, "engines" : { "node" : ">=0.4.7" } | ||
, "dependencies" : { "vows" : ">=0.5.2" | ||
, "docco" : ">=0.3.0" | ||
} | ||
, "engines" : { | ||
"node" : "~0.8.0 ~0.10.0" | ||
} | ||
, "devDependencies" : { "vows" : ">=0.5.2" | ||
, "docco" : ">=0.3.0" | ||
} | ||
} |
multicast-eventemitter | ||
---------------------- | ||
*Under heavy development - DO NOT USE IN PRODUCTION* | ||
Status: FIXED, now works with NodeJS v0.8.x and v0.10.x. | ||
Please see https://github.com/marak/hook.io for a mature alternative. | ||
This package provides a cluster-wide event emitter. Events sent from any process on any machine on a LAN can be subscribed to by any other process on any other machine. | ||
This package provides a cluster-wide event emitter. Message sent from any process on any machine on a LAN can be subscibed to by any other process on any other machine. | ||
Multicast is more efficient than broadcast. If messages were broadcast, then each subscriber would need to discard the the events it wasnt't interested in. With multicast, this is done by the NIC. | ||
@@ -14,6 +14,5 @@ It works efficiently by hashing event names into 24 bits of address space and 15 bits of port number - thus a 1 in half-a-trillion chance of event name collision. | ||
1. Messages are limited to about 1.5KB in length - exceeding this will produce a parsing error on receivers. | ||
1. Messages are limited to about 1.5KB in length - exceeding this *may* produce a parsing error on receivers. | ||
2. It is unreliable (though we receive 100% of sent messages on our LAN - YMMV) | ||
There are known and unknown bugs - see the FIXMEs and TODOs in the code. |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Non-existent author
Supply chain riskThe package was published by an npm account that no longer exists.
Found 1 instance in 1 package
11249
0
264
0
2
9
18
2
- Removeddocco@>=0.3.0
- Removedvows@>=0.5.2
- Removedbalanced-match@1.0.2(transitive)
- Removedbrace-expansion@1.1.11(transitive)
- Removedcommander@8.3.0(transitive)
- Removedconcat-map@0.0.1(transitive)
- Removeddiff@4.0.2(transitive)
- Removeddocco@0.9.1(transitive)
- Removedeyes@0.1.8(transitive)
- Removedfs-extra@10.0.1(transitive)
- Removedfs.realpath@1.0.0(transitive)
- Removedglob@7.2.3(transitive)
- Removedgraceful-fs@4.2.11(transitive)
- Removedhighlight.js@11.3.1(transitive)
- Removedinflight@1.0.6(transitive)
- Removedinherits@2.0.4(transitive)
- Removedjsonfile@6.1.0(transitive)
- Removedmarked@4.0.19(transitive)
- Removedminimatch@3.1.2(transitive)
- Removedonce@1.4.0(transitive)
- Removedpath-is-absolute@1.0.1(transitive)
- Removedunderscore@1.13.7(transitive)
- Removeduniversalify@2.0.1(transitive)
- Removedvows@0.8.3(transitive)
- Removedwrappy@1.0.2(transitive)