Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

servicebus

Package Overview
Dependencies
Maintainers
2
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

servicebus - npm Package Compare versions

Comparing version 2.0.4 to 2.0.5

12

bus/rabbitmq/pubsubqueue.js

@@ -72,8 +72,10 @@ var events = require('events');

var self = this;
var listening = false;
var subscription = null;
function _unsubscribe (cb) {
if (self.listening) {
// cancel misses callback version so we have to use the promise
if (listening) {
// should we prevent multiple cancel calls?
self.listenChannel
.cancel(self.subscription.consumerTag)
.cancel(subscription.consumerTag)
.then(function () {

@@ -121,4 +123,4 @@ self.emit('unlistened');

.then(function (ok) {
self.listening = true;
self.subscription = { consumerTag: ok.consumerTag };
listening = true;
subscription = { consumerTag: ok.consumerTag };
self.emit('listening');

@@ -125,0 +127,0 @@ });

@@ -10,3 +10,3 @@ {

"description": "Simple service bus for sending events between processes using amqp.",
"version": "2.0.4",
"version": "2.0.5",
"homepage": "https://github.com/mateodelnorte/servicebus",

@@ -13,0 +13,0 @@ "repository": {

@@ -10,2 +10,11 @@ var noop = function () {};

before(function (done) {
// wait until bus is fully initialized
if (!bus.initialized) {
bus.on('ready', done);
} else {
done();
}
});
describe('#publish & #subscribe', function(){

@@ -85,3 +94,3 @@

});
}
}
}, 100);

@@ -144,4 +153,30 @@ var subscription = bus.subscribe('my.event.14', { ack: true }, function (event) {

// });
it('should not receive events after successful unsubscribe', function (done) {
var subscribed = false;
var subscription = null;
const unsubscribe = function () {
subscription.unsubscribe(function () {
subscribed = false;
bus.publish('my.event.17', { my: 'event' });
setTimeout(done, 500);
});
};
const handler = function (event) {
if (subscribed) {
unsubscribe();
} else {
throw new Error('unexpected invocation');
}
};
subscription = bus.subscribe('my.event.17', handler);
setTimeout(function () {
subscribed = true;
bus.publish('my.event.17', { my: 'event' });
}, 100);
});
});
});

@@ -19,18 +19,125 @@ 'use strict';

describe('#subscribe & #unsubscribe', function () {
it('should complete subscribe/unsubscribe cycle', function (done) {
const subscription = bus.subscribe('my.event.11', function (event) {});
describe('on temporary queues', function () {
it('should complete subscribe/unsubscribe cycle', function (done) {
const subscription = bus.subscribe('test.subunsub.1', function (event) {});
subscription
.should.not.equal(bus)
.and.be.a.Object()
.and.have.ownProperty('unsubscribe');
subscription.unsubscribe
.should.be.a.Function();
subscription
.should.not.equal(bus)
.and.be.a.Object()
.and.have.ownProperty('unsubscribe');
subscription.unsubscribe
.should.be.a.Function();
subscription.unsubscribe(function () {
done();
subscription.unsubscribe(function () {
done();
});
});
it('should not receive events after unsubscribe', function (done) {
const subscription = bus.subscribe('test.subunsub.2', function (event) {
throw new Error('unexpected event');
});
subscription.unsubscribe(function () {
bus.publish('test.subunsub.2', {test: 2});
setTimeout(function () {
done();
}, 100);
});
});
it('should not receive events after multiple subscribes/unsubscribes', function (done) {
var receivedEvents = 0;
const subscription1 = bus.subscribe('test.subunsub.3', function (event) {
receivedEvents += 1;
});
const subscription2 = bus.subscribe('test.subunsub.3', function (event) {
receivedEvents += 1;
});
var subscriptions = 2;
const onUnsubscribed = function () {
subscriptions -= 1;
if (subscriptions == 0) {
bus.publish('test.subunsub.3', {test: 2});
setTimeout(function () {
receivedEvents.should.be.equal(0);
done();
}, 100);
}
};
subscription1.unsubscribe(onUnsubscribed());
subscription2.unsubscribe(onUnsubscribed());
});
});
describe('on persistent queues', function () {
const testQueue = 'test.subunsub.persistent';
function _subscribe(eventHandler) {
return bus.subscribe(testQueue, {ack: true}, eventHandler);
}
afterEach(function (done) {
// drain queue
var drainedEvents = 0;
const subscription = _subscribe(function (event) {
drainedEvents += 1;
event.handle.ack();
});
setTimeout(function () {
subscription.unsubscribe();
done();
}, 200);
});
it('should not receive events after unsubscribe', function (done) {
const ts = Date.now();
var receivedEvents = 0;
const subscription = _subscribe(function (event) {
console.log(ts, event);
receivedEvents += 1;
event.handle.ack();
});
setTimeout(function () {
subscription.unsubscribe(function () {
bus.publish(testQueue, {test: 3, ts: ts}, {ack: true});
setTimeout(function () {
receivedEvents.should.be.equal(0);
done();
}, 100);
});
}, 100);
});
it('should not receive events after multiple subscribes/unsubscribes', function (done) {
var receivedEvents = 0;
const subscription1 = _subscribe(function (event) {
receivedEvents += 1;
event.handle.ack();
});
const subscription2 = _subscribe(function (event) {
receivedEvents += 1;
event.handle.ack();
});
var subscriptions = 2;
const onUnsubscribed = function () {
subscriptions -= 1;
if (subscriptions == 0) {
bus.publish(testQueue, {test: 4}, {ack: true});
setTimeout(function () {
receivedEvents.should.be.equal(0);
done();
}, 250);
}
};
setTimeout(function () {
subscription1.unsubscribe(onUnsubscribed);
subscription2.unsubscribe(onUnsubscribed);
}, 100);
});
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc