servicebus
Advanced tools
Comparing version 2.0.4 to 2.0.5
@@ -72,8 +72,10 @@ var events = require('events'); | ||
var self = this; | ||
var listening = false; | ||
var subscription = null; | ||
function _unsubscribe (cb) { | ||
if (self.listening) { | ||
// cancel misses callback version so we have to use the promise | ||
if (listening) { | ||
// should we prevent multiple cancel calls? | ||
self.listenChannel | ||
.cancel(self.subscription.consumerTag) | ||
.cancel(subscription.consumerTag) | ||
.then(function () { | ||
@@ -121,4 +123,4 @@ self.emit('unlistened'); | ||
.then(function (ok) { | ||
self.listening = true; | ||
self.subscription = { consumerTag: ok.consumerTag }; | ||
listening = true; | ||
subscription = { consumerTag: ok.consumerTag }; | ||
self.emit('listening'); | ||
@@ -125,0 +127,0 @@ }); |
@@ -10,3 +10,3 @@ { | ||
"description": "Simple service bus for sending events between processes using amqp.", | ||
"version": "2.0.4", | ||
"version": "2.0.5", | ||
"homepage": "https://github.com/mateodelnorte/servicebus", | ||
@@ -13,0 +13,0 @@ "repository": { |
@@ -10,2 +10,11 @@ var noop = function () {}; | ||
before(function (done) { | ||
// wait until bus is fully initialized | ||
if (!bus.initialized) { | ||
bus.on('ready', done); | ||
} else { | ||
done(); | ||
} | ||
}); | ||
describe('#publish & #subscribe', function(){ | ||
@@ -85,3 +94,3 @@ | ||
}); | ||
} | ||
} | ||
}, 100); | ||
@@ -144,4 +153,30 @@ var subscription = bus.subscribe('my.event.14', { ack: true }, function (event) { | ||
// }); | ||
it('should not receive events after successful unsubscribe', function (done) { | ||
var subscribed = false; | ||
var subscription = null; | ||
const unsubscribe = function () { | ||
subscription.unsubscribe(function () { | ||
subscribed = false; | ||
bus.publish('my.event.17', { my: 'event' }); | ||
setTimeout(done, 500); | ||
}); | ||
}; | ||
const handler = function (event) { | ||
if (subscribed) { | ||
unsubscribe(); | ||
} else { | ||
throw new Error('unexpected invocation'); | ||
} | ||
}; | ||
subscription = bus.subscribe('my.event.17', handler); | ||
setTimeout(function () { | ||
subscribed = true; | ||
bus.publish('my.event.17', { my: 'event' }); | ||
}, 100); | ||
}); | ||
}); | ||
}); |
@@ -19,18 +19,125 @@ 'use strict'; | ||
describe('#subscribe & #unsubscribe', function () { | ||
it('should complete subscribe/unsubscribe cycle', function (done) { | ||
const subscription = bus.subscribe('my.event.11', function (event) {}); | ||
describe('on temporary queues', function () { | ||
it('should complete subscribe/unsubscribe cycle', function (done) { | ||
const subscription = bus.subscribe('test.subunsub.1', function (event) {}); | ||
subscription | ||
.should.not.equal(bus) | ||
.and.be.a.Object() | ||
.and.have.ownProperty('unsubscribe'); | ||
subscription.unsubscribe | ||
.should.be.a.Function(); | ||
subscription | ||
.should.not.equal(bus) | ||
.and.be.a.Object() | ||
.and.have.ownProperty('unsubscribe'); | ||
subscription.unsubscribe | ||
.should.be.a.Function(); | ||
subscription.unsubscribe(function () { | ||
done(); | ||
subscription.unsubscribe(function () { | ||
done(); | ||
}); | ||
}); | ||
it('should not receive events after unsubscribe', function (done) { | ||
const subscription = bus.subscribe('test.subunsub.2', function (event) { | ||
throw new Error('unexpected event'); | ||
}); | ||
subscription.unsubscribe(function () { | ||
bus.publish('test.subunsub.2', {test: 2}); | ||
setTimeout(function () { | ||
done(); | ||
}, 100); | ||
}); | ||
}); | ||
it('should not receive events after multiple subscribes/unsubscribes', function (done) { | ||
var receivedEvents = 0; | ||
const subscription1 = bus.subscribe('test.subunsub.3', function (event) { | ||
receivedEvents += 1; | ||
}); | ||
const subscription2 = bus.subscribe('test.subunsub.3', function (event) { | ||
receivedEvents += 1; | ||
}); | ||
var subscriptions = 2; | ||
const onUnsubscribed = function () { | ||
subscriptions -= 1; | ||
if (subscriptions == 0) { | ||
bus.publish('test.subunsub.3', {test: 2}); | ||
setTimeout(function () { | ||
receivedEvents.should.be.equal(0); | ||
done(); | ||
}, 100); | ||
} | ||
}; | ||
subscription1.unsubscribe(onUnsubscribed()); | ||
subscription2.unsubscribe(onUnsubscribed()); | ||
}); | ||
}); | ||
describe('on persistent queues', function () { | ||
const testQueue = 'test.subunsub.persistent'; | ||
function _subscribe(eventHandler) { | ||
return bus.subscribe(testQueue, {ack: true}, eventHandler); | ||
} | ||
afterEach(function (done) { | ||
// drain queue | ||
var drainedEvents = 0; | ||
const subscription = _subscribe(function (event) { | ||
drainedEvents += 1; | ||
event.handle.ack(); | ||
}); | ||
setTimeout(function () { | ||
subscription.unsubscribe(); | ||
done(); | ||
}, 200); | ||
}); | ||
it('should not receive events after unsubscribe', function (done) { | ||
const ts = Date.now(); | ||
var receivedEvents = 0; | ||
const subscription = _subscribe(function (event) { | ||
console.log(ts, event); | ||
receivedEvents += 1; | ||
event.handle.ack(); | ||
}); | ||
setTimeout(function () { | ||
subscription.unsubscribe(function () { | ||
bus.publish(testQueue, {test: 3, ts: ts}, {ack: true}); | ||
setTimeout(function () { | ||
receivedEvents.should.be.equal(0); | ||
done(); | ||
}, 100); | ||
}); | ||
}, 100); | ||
}); | ||
it('should not receive events after multiple subscribes/unsubscribes', function (done) { | ||
var receivedEvents = 0; | ||
const subscription1 = _subscribe(function (event) { | ||
receivedEvents += 1; | ||
event.handle.ack(); | ||
}); | ||
const subscription2 = _subscribe(function (event) { | ||
receivedEvents += 1; | ||
event.handle.ack(); | ||
}); | ||
var subscriptions = 2; | ||
const onUnsubscribed = function () { | ||
subscriptions -= 1; | ||
if (subscriptions == 0) { | ||
bus.publish(testQueue, {test: 4}, {ack: true}); | ||
setTimeout(function () { | ||
receivedEvents.should.be.equal(0); | ||
done(); | ||
}, 250); | ||
} | ||
}; | ||
setTimeout(function () { | ||
subscription1.unsubscribe(onUnsubscribed); | ||
subscription2.unsubscribe(onUnsubscribed); | ||
}, 100); | ||
}); | ||
}); | ||
}); | ||
}); | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
61083
1475