servicebus
Advanced tools
Comparing version 2.0.9 to 2.0.10
@@ -25,3 +25,3 @@ var amqp = require('amqplib'), | ||
this.channels = []; | ||
this.correlator = new Correlator(options); | ||
this.correlator = options.correlator || new Correlator(options); | ||
this.delayOnStartup = options.delayOnStartup || 10; | ||
@@ -109,4 +109,6 @@ this.exchangeName = options.exchangeName; | ||
this.log('listen on queue %s', queueName); | ||
var self = this; | ||
this.log('listen on queue %j', queueName); | ||
if (typeof options === "function") { | ||
@@ -125,3 +127,7 @@ callback = options; | ||
this.log('creating queue %s', options.queueName); | ||
this.queues[options.queueName] = new Queue(options); | ||
var queue = new Queue(options); | ||
queue.on('listening', function () { | ||
self.emit('listening', queue); | ||
}); | ||
this.queues[options.queueName] = queue; | ||
} | ||
@@ -220,2 +226,4 @@ | ||
this.log('subscribe on queue %j', queueName); | ||
var handle = null; | ||
@@ -234,3 +242,7 @@ function _unsubscribe (options) { | ||
this.log('creating pusubqueue %s', options.queueName); | ||
this.pubsubqueues[options.queueName] = new PubSubQueue(options); | ||
var pubSubQueue = new PubSubQueue(options); | ||
pubSubQueue.on('subscribed', function () { | ||
self.emit('subscribed', pubSubQueue); | ||
}); | ||
this.pubsubqueues[options.queueName] = pubSubQueue; | ||
} | ||
@@ -237,0 +249,0 @@ |
@@ -72,7 +72,9 @@ var events = require('events'); | ||
var self = this; | ||
var listening = false; | ||
var subscribed = false; | ||
var subscription = null; | ||
this.log('subscribing to queue %j with routingKey %j', this.queueName, this.routingKey); | ||
function _unsubscribe (cb) { | ||
if (listening) { | ||
if (subscribed) { | ||
// should we prevent multiple cancel calls? | ||
@@ -88,3 +90,3 @@ self.listenChannel | ||
} else { | ||
self.on('listening', _unsubscribe.bind(this, cb)); | ||
self.on('subscribed', _unsubscribe.bind(this, cb)); | ||
} | ||
@@ -124,5 +126,5 @@ } | ||
.then(function (ok) { | ||
listening = true; | ||
subscribed = true; | ||
subscription = { consumerTag: ok.consumerTag }; | ||
self.emit('listening'); | ||
self.emit('subscribed'); | ||
}); | ||
@@ -129,0 +131,0 @@ } |
@@ -77,3 +77,3 @@ var EventEmitter = require('events').EventEmitter; | ||
this.log('listening to queue %s', this.queueName); | ||
this.log('listening to queue %j', this.queueName); | ||
@@ -80,0 +80,0 @@ if ( ! this.initialized) { |
@@ -10,3 +10,3 @@ { | ||
"description": "Simple service bus for sending events between processes using amqp.", | ||
"version": "2.0.9", | ||
"version": "2.0.10", | ||
"homepage": "https://github.com/mateodelnorte/servicebus", | ||
@@ -13,0 +13,0 @@ "repository": { |
@@ -12,16 +12,16 @@ # servicebus | ||
Process A: | ||
var bus = require('servicebus').bus(); | ||
bus.listen('my.event', function (event) { | ||
console.log(event); | ||
}); | ||
```js | ||
var bus = require('servicebus').bus(); | ||
bus.listen('my.event', function (event) { | ||
console.log(event); | ||
}); | ||
``` | ||
Process B: | ||
```js | ||
var bus = require('servicebus').bus(); | ||
Process B: | ||
var bus = require('servicebus').bus(); | ||
setInterval(function () { | ||
bus.send('my.event', { my: 'event' }); | ||
}, 1000); | ||
setInterval(function () { | ||
bus.send('my.event', { my: 'event' }); | ||
}, 1000); | ||
``` | ||
## Round-Robin Load Distribution | ||
@@ -37,7 +37,9 @@ | ||
bus.listen('my.event', { ack: true }, function (event) { | ||
event.handle.acknowledge(); // acknowledge a message | ||
event.handle.ack(); // short hand is also available | ||
event.handle.reject(); // reject a message | ||
}); | ||
```js | ||
bus.listen('my.event', { ack: true }, function (event) { | ||
event.handle.acknowledge(); // acknowledge a message | ||
event.handle.ack(); // short hand is also available | ||
event.handle.reject(); // reject a message | ||
}); | ||
``` | ||
@@ -51,16 +53,16 @@ Message acknowledgement is suited for use in load distribution scenarios. | ||
Process A (can be run any number of times, all will receive the event): | ||
var bus = require('servicebus').bus(); | ||
bus.subscribe('my.event', function (event) { | ||
console.log(event); | ||
}); | ||
```js | ||
var bus = require('servicebus').bus(); | ||
bus.subscribe('my.event', function (event) { | ||
console.log(event); | ||
}); | ||
``` | ||
Process B: | ||
```js | ||
var bus = require('servicebus').bus(); | ||
Process B: | ||
var bus = require('servicebus').bus(); | ||
setInterval(function () { | ||
bus.publish('my.event', { my: 'event' }); | ||
}, 1000); | ||
setInterval(function () { | ||
bus.publish('my.event', { my: 'event' }); | ||
}, 1000); | ||
``` | ||
# Topic Routing | ||
@@ -70,3 +72,3 @@ | ||
``` | ||
```js | ||
bus.publish('event.one', { event: 'one' }); | ||
@@ -76,3 +78,3 @@ bus.publish('event.two', { event: 'two' }); | ||
and for the listener... | ||
``` | ||
```js | ||
bus.subscribe('event.*', function (msg) ... | ||
@@ -85,3 +87,3 @@ ``` | ||
``` | ||
```js | ||
if ( ! process.env.RABBITMQ_URL) | ||
@@ -103,3 +105,3 @@ throw new Error('Tests require a RABBITMQ_URL environment variable to be set, pointing to the RabbiqMQ instance you wish to use.'); | ||
``` | ||
```js | ||
... | ||
@@ -139,3 +141,3 @@ | ||
``` | ||
```js | ||
// bus.publish('my:event', { my: 'event' }); | ||
@@ -147,3 +149,3 @@ { | ||
becomes | ||
``` | ||
```js | ||
{ | ||
@@ -150,0 +152,0 @@ data: { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
62756
1525
156