Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

multicast-eventemitter

Package Overview
Dependencies
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

multicast-eventemitter - npm Package Compare versions

Comparing version 0.0.8 to 0.10.0

15

examples/loopback.js

@@ -9,12 +9,13 @@ // Copyright 2011 Thorcom Systems Ltd. All Rights Reserved.

setInterval(function() {
console.log('emitting channelA');
emitter.emit('channelA', 'this is channel A', new Date().getTime());
console.log('emitting channelB');
emitter.emit('channelB', 'this is channel B', new Date().getTime());
var now = new Date().getTime();
console.log('emitting eventA', now);
emitter.emit('eventA', 'this is eventA', now);
console.log('emitting eventB', now);
emitter.emit('eventB', 'this is eventB', now);
}, 1000);
// subscribe to channelA events
emitter.on('channelA', function(text, time) {
console.log('message received on channelA:', text, time);
// subscribe to eventB events
emitter.on('eventB', function(text, time) {
console.log('eventB received...', 'text:', text, 'time:', time);
});

@@ -7,5 +7,5 @@ // Copyright 2011 Thorcom Systems Ltd. All Rights Reserved.

// subscribe to channelA events
emitter.on('channelA', function(text, time) {
console.log('message received on channelA:', text, time);
// subscribe to eventA events
emitter.on('eventA', function(text, time) {
console.log('eventA received...', 'text:', text, 'time:', time);
});

@@ -9,6 +9,6 @@ // Copyright 2011 Thorcom Systems Ltd. All Rights Reserved.

setInterval(function() {
console.log('emitting channelA');
emitter.emit('channelA', 'this is channel A', new Date().getTime());
console.log('emitting channelB');
emitter.emit('channelB', 'this is channel B', new Date().getTime());
console.log('emitting eventA');
emitter.emit('eventA', 'this is eventA', new Date().getTime());
console.log('emitting eventB');
emitter.emit('eventB', 'this is eventB', new Date().getTime());
}, 1000);
// Copyright 2011 Thorcom Systems Ltd. All Rights Reserved.
var VERSION_0_8_X = "v0.8.x";
var VERSION_0_10_X = "v0.10.x";
var version = nodeVersion();
/*
* This is required because they changed the way that bind works between 0.8.x and 0.10.x
* I have not yet found a way of writing code which works on both, without this switch.
*/
function nodeVersion() {
if (process.version.match(/^v0\.8\./)) {
return VERSION_0_8_X;
} else if (process.version.match(/^v0\.10\./)) {
return VERSION_0_10_X;
} else {
throw 'multicast-eventemitter can only work with NodeJS v0.8.x and v0.10.x';
}
}
var dgram = require('dgram')

@@ -11,3 +29,3 @@ , util = require('util')

'transport': 'multicast' // 'pgm' (zeromq) to follow...
, 'multicastInteface': undefined
, 'multicastInterface': undefined
, 'ttl': 64

@@ -19,2 +37,6 @@ , 'overrides' : {} // should be of form "event_name: { address: 'address', port: port }"

/*
* A function to get or create the singleton emitter.
*/
exports.getEmitter = getEmitter;

@@ -28,11 +50,24 @@ function getEmitter() {

/*
* The constructor. Perhaps this shouldn't be exported?
*/
exports.MulticastEventEmitter = MulticastEventEmitter;
function MulticastEventEmitter() {
this.src = getIPv4Address() + '/' + process.pid;
this.listenersByEvent = {}; // a hash of arrays of functions
this.serversByEvent = {}; // a hash of bound datagram servers
this.lastSeq = {}; // a hash of last heard sequence number, by src
this.src = Math.random(); // unique id of this sender
this.seq = 0;
this.serversByEvent = {}; // a hash of bound datagram rx servers
this.lastSeq = {}; // a hash of last heard sequence number, by src and event
this.seqByEvent = {}; // a hash of last sent sequence numbers, by event
this.sender = dgram.createSocket('udp4');
var that = this;
this.sender.bind(function() {
that.sender.setBroadcast(true);
that.sender.setMulticastTTL(options.ttl);
that.sender.setMulticastLoopback(true); // needed from inter-process intra-box comms
});
}
/*
* This adds a listener.
*/
MulticastEventEmitter.prototype.addListener = function(event, listener) {

@@ -51,9 +86,20 @@ //console.log('addListener', event, listener);

// create and keep track of udp multicast listening server
var server = this.serversByEvent = dgram.createSocket('udp4');
server.addMembership(hash.address, options.multicastInterface);
server.setMulticastTTL(options.ttl);
server.setMulticastLoopback(true); // needed from inter-process intra-box comms
var that = this;
server.on('message', function(msg, rinfo) { that.handleMessage(event, msg, rinfo); });
server.bind(hash.port, hash.address);
var server = this.serversByEvent[event] = dgram.createSocket('udp4');
// work around lack of backwards compatibility of NodeJS v0.10.X re: multicast
if (version === VERSION_0_10_X) {
server.bind(hash.port, hash.address, function() {
server.setMulticastTTL(options.ttl);
server.addMembership(hash.address, options.multicastInterface);
server.setMulticastLoopback(true); // needed from inter-process intra-box comms
server.on('message', function(msg, rinfo) { that.handleMessage(event, msg, rinfo); });
});
} else { // VERSION_0_8_X
server.bind(hash.port, hash.address);
server.setMulticastTTL(options.ttl);
server.addMembership(hash.address, options.multicastInterface);
server.setMulticastLoopback(true); // needed from inter-process intra-box comms
server.on('message', function(msg, rinfo) { that.handleMessage(event, msg, rinfo); });
}
}

@@ -63,2 +109,5 @@ this.listenersByEvent[event].push(listener);

/*
* This handles the incoming messages.
*/
MulticastEventEmitter.prototype.handleMessage = function(event, msg, rinfo) {

@@ -70,3 +119,11 @@ //console.log('handleMessage', event, msg, rinfo);

if (message.seq) {
this.lastSeq[message.src] = message.seq;
var key = message.src + '/' + event;
if (!this.lastSeq[key]) this.lastSeq[key] = 0;
var missed = message.seq - (this.lastSeq[key] + 1);
if (missed) {
console.warn(missed, 'messages missed', this.lastSeq[key]);
} else {
//console.info(missed, 'messages missed', this.lastSeq[key], key);
}
this.lastSeq[key] = message.seq;
}

@@ -94,4 +151,8 @@ } catch(e) {

/*
* Emit an event.
*/
MulticastEventEmitter.prototype.emit = function(event) { // varargs...
var args = Array.prototype.slice.call(arguments);
args.shift(); // don't send the event name as one of the arguments
//console.log('emit', args);

@@ -104,11 +165,26 @@ var hash;

}
var message = new Buffer(JSON.stringify({ event: event, args: args, src: this.src, seq: this.seq++ }));
var socket = dgram.createSocket('udp4');
socket.addMembership(hash.address, options.multicastInterface);
socket.setMulticastTTL(options.ttl);
socket.setMulticastLoopback(true); // needed from inter-process intra-box comms
socket.send(message, 0, message.length, hash.port, hash.address);
socket.close(); // TODO: cache open sockets for reuse?
if (!this.seqByEvent[event]) this.seqByEvent[event] = 0;
var message = new Buffer(JSON.stringify({ event: event, args: args, src: this.src, seq: this.seqByEvent[event]++ }));
//console.log("this.sender.send(" + message + ", " + 0 + ", " + message.length + ", " + hash.port + ", " + hash.address + ");");
this.sender.send(message, 0, message.length, hash.port, hash.address);
}
/*
* Get the first non-localhost IPv4 address.
* This is only used for unique identification (with pid), not comms.
*/
function getIPv4Address() {
var interfaces = require('os').networkInterfaces();
for (var devName in interfaces) {
var iface = interfaces[devName];
for (var i = 0; i < iface.length; i++) {
var alias = iface[i];
if (alias.family === 'IPv4' && alias.address !== '127.0.0.1' && !alias.internal)
return alias.address;
}
}
return '0.0.0.0';
}
{ "name" : "multicast-eventemitter"
, "description" : "LAN wide eventemitter, using multicast."
, "version" : "0.0.8"
, "version" : "0.10.0"
, "maintainers" :

@@ -17,6 +17,8 @@ [ { "name": "Chris Dew"

, "main" : "./lib/multicast-eventemitter.js"
, "engines" : { "node" : ">=0.4.7" }
, "dependencies" : { "vows" : ">=0.5.2"
, "docco" : ">=0.3.0"
}
, "engines" : {
"node" : "~0.8.0 ~0.10.0"
}
, "devDependencies" : { "vows" : ">=0.5.2"
, "docco" : ">=0.3.0"
}
}
multicast-eventemitter
----------------------
*Under heavy development - DO NOT USE IN PRODUCTION*
Status: FIXED, now works with NodeJS v0.8.x and v0.10.x.
Please see https://github.com/marak/hook.io for a mature alternative.
This package provides a cluster-wide event emitter. Events sent from any process on any machine on a LAN can be subscribed to by any other process on any other machine.
This package provides a cluster-wide event emitter. Message sent from any process on any machine on a LAN can be subscibed to by any other process on any other machine.
Multicast is more efficient than broadcast. If messages were broadcast, then each subscriber would need to discard the the events it wasnt't interested in. With multicast, this is done by the NIC.

@@ -14,6 +14,5 @@ It works efficiently by hashing event names into 24 bits of address space and 15 bits of port number - thus a 1 in half-a-trillion chance of event name collision.

1. Messages are limited to about 1.5KB in length - exceeding this will produce a parsing error on receivers.
1. Messages are limited to about 1.5KB in length - exceeding this *may* produce a parsing error on receivers.
2. It is unreliable (though we receive 100% of sent messages on our LAN - YMMV)
There are known and unknown bugs - see the FIXMEs and TODOs in the code.
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc