Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

disturbed

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

disturbed - npm Package Compare versions

Comparing version 1.0.2 to 1.0.3

142

index.js
/**
* Distributed events library
*
*/
var uuid = require('node-uuid');
var util = require('util');
var EventEmitter = require('events');
module.exports = function(pubClient, subClient){
var channels = {};
var Emitter = function(pubClient, subClient){
var _this = this;
var _uuid = uuid();
EventEmitter.call(this);
this.on = function(evt, handler){
channels[evt] = channels[evt] || [];
channels[evt].push(handler);
}
this.uuid = uuid();
this.pubClient = pubClient;
this.subClient = subClient;
this.once = function(evt, handler){
function wrapper() {
_this.off(evt, wrapper);
handler.apply(this, arguments);
}
wrapper.__handler = handler;
return _this.on(evt, wrapper);
}
subClient.on('message', function(channel, msg){
this.off = function(evt, handler){
if(channels[evt]){
var handlers = channels[evt];
var index = findHandler(handlers, handler);
if(index !== -1){
handlers.splice(index, 1);
var count = _this.listenerCount(channel);
if(count){
var args;
try{
args = JSON.parse(msg);
}catch(err){
console.error('Parsing event message', err);
}
if(handlers.length === 0){
delete channels[evt];
subClient.unsubscribe(evt);
if(args[0] !== _this.uuid){
args[0] = channel;
_this.emit.apply(_this, args);
}
}
}
});
}
this.emit = function(evt){
var args = Array.prototype.slice.call(arguments);
args[0] = _uuid;
util.inherits(Emitter, EventEmitter);
// Emit to this one
var handlers = channels[evt];
if(handlers){
args.shift();
fireEvent(handlers, args);
}
Emitter.prototype.on = function(){
var _this = this;
var args = Array.prototype.slice.call(arguments);
EventEmitter.prototype.on.apply(this, args);
// Emit to other nodes
return new Promise(function(resolve, reject){
pubClient.publish(evt, JSON.stringify(args), function(err){
if(err){
reject(err);
}else{
resolve();
}
});
return new Promise(function(resolve, reject){
_this.subClient.subscribe(args[0], function(err){
if(err){
reject(err);
}else{
resolve();
}
});
}
})
}
function findHandler(handlers, handler){
for(var i=0; i<handlers.length; i++){
var _handler = handlers[i];
if( (_handler === handler) || (_handler.__handler == handler)){
return i;
Emitter.prototype.distEmit = function(evt){
var _this = this;
var args = Array.prototype.slice.call(arguments);
this.emit.apply(this, args);
args[0] = this.uuid;
// Emit to other nodes
return new Promise(function(resolve, reject){
_this.pubClient.publish(evt, JSON.stringify(args), function(err){
if(err){
reject(err);
}else{
resolve();
}
}
return -1;
}
});
});
}
function fireEvent(handlers, args){
var _handlers = [], i, len = handlers.length;
Emitter.prototype.off = Emitter.prototype.removeListener = function(evt){
var _this = this;
var args = Array.prototype.slice.call(arguments);
EventEmitter.prototype.removeListener.apply(this, args);
//
// Copy the handlers since we could remove listeners when firing
// events.
//
for(i=0; i<len; i++){
_handlers[i] = handlers[i];
}
for(i=0; i < len; i++) {
_handlers[i].apply(this, args);
}
if(!_this.listenerCount(evt)){
_this.subClient.unsubscribe(evt);
}
}
subClient.on('message', function(channel, msg){
var handlers = channels[channel];
if(handlers){
var args;
try{
args = JSON.parse(msg);
}catch(err){
console.error('Parsing event message', err);
}
module.exports = Emitter;
if(args[0] !== _uuid){
fireEvent(handlers, args);
}
}
});
return this;
}
{
"name": "disturbed",
"version": "1.0.2",
"version": "1.0.3",
"description": "A distributed event emitter, both for client and nodejs servers",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -8,7 +8,7 @@

describe('Disturbed', function(){
describe('Disturbed', function () {
var disturbed;
var pubClient, subClient;
beforeEach(function(){
beforeEach(function () {
disturbed = require('../index.js');

@@ -21,3 +21,3 @@ return Promise.join(

afterEach(function(){
afterEach(function () {
return Promise.join(

@@ -29,7 +29,7 @@ pubClient.quitAsync(),

it('should emit to a local listener', function(done){
var eventEmitter = disturbed(pubClient, subClient)
it('should emit to a local listener', function (done) {
var eventEmitter = new disturbed(pubClient, subClient)
var counter = 0;
eventEmitter.on('test', function(a, b, c){
eventEmitter.on('test', function (a, b, c) {
counter++;

@@ -43,38 +43,40 @@ expect(a).equal(1);

eventEmitter.emit('test', 1, 2, 3);
eventEmitter.distEmit('test', 1, 2, 3);
});
it('should emit to a remote and local listener', function(done){
var eventEmitter1 = disturbed(pubClient, subClient)
var eventEmitter2 = disturbed(pubClient, subClient)
it('should emit to a remote and local listener', function (done) {
var eventEmitter1 = new disturbed(pubClient, subClient)
var eventEmitter2 = new disturbed(pubClient, subClient)
var counter1 = 0;
var counter2 = 0;
eventEmitter1.on('test', function(a, b, c){
counter1++;
expect(a).equal(1);
expect(b).equal(2);
expect(c).equal(3);
expect(counter1).equal(1);
Promise.join(
eventEmitter1.on('test', function (a, b, c) {
counter1++;
expect(a).equal(1);
expect(b).equal(2);
expect(c).equal(3);
expect(counter1).equal(1);
}),
eventEmitter2.on('test', function (a, b, c) {
counter2++;
expect(a).equal(1);
expect(b).equal(2);
expect(c).equal(3);
expect(counter2).equal(1);
if (counter1 == 1) {
done();
}
})
).then(function () {
eventEmitter1.distEmit('test', 1, 2, 3);
});
eventEmitter2.on('test', function(a, b, c){
counter2++;
expect(a).equal(1);
expect(b).equal(2);
expect(c).equal(3);
expect(counter2).equal(1);
if(counter1 == 1){
done();
}
});
eventEmitter1.emit('test', 1, 2, 3);
});
it('should stop listen to events', function(done){
var eventEmitter1 = disturbed(pubClient, subClient)
var eventEmitter2 = disturbed(pubClient, subClient)
it('should stop listen to events', function (done) {
var eventEmitter1 = new disturbed(pubClient, subClient)
var eventEmitter2 = new disturbed(pubClient, subClient)
var counter1 = 0;
var counter2 = 0;
var listener = function(a, b, c){
var listener = function (a, b, c) {
expect(true).equal(false);

@@ -89,3 +91,3 @@ }

eventEmitter1.emit('test', 1, 2, 3);
eventEmitter1.distEmit('test', 1, 2, 3);

@@ -92,0 +94,0 @@ done();

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc