rabbitmq-eventemitter
Advanced tools
Comparing version 1.0.3 to 1.0.4
64
index.js
@@ -34,2 +34,3 @@ var util = require('util'); | ||
var getChannel = function(callback) { | ||
@@ -48,2 +49,3 @@ self._getConnection(function(err, connection) { | ||
this._waitQueues = {}; | ||
this._onerror = onerror; | ||
@@ -89,27 +91,4 @@ this._getConsumeChannel = thunky(getChannel); | ||
if(delay) { | ||
var waitExchangeName = self._waitExchangeName(delay); | ||
var waitQueueName = self._waitQueueName(pattern, delay); | ||
var next = afterAll(function(err) { | ||
publish(err, waitExchangeName); | ||
}); | ||
channel.assertExchange(waitExchangeName, 'direct', { | ||
durable: true, | ||
autoDelete: false | ||
}, next()); | ||
channel.assertQueue(waitQueueName, { | ||
durable: true, | ||
autoDelete: false, | ||
arguments: { | ||
'x-message-ttl': delay, | ||
'x-dead-letter-exchange': EXCHANGE | ||
} | ||
}, next()); | ||
return channel.bindQueue(waitQueueName, waitExchangeName, pattern, null, next()); | ||
} | ||
publish(null, EXCHANGE); | ||
if(delay) self._assertWaitQueue(channel, pattern, delay, publish); | ||
else publish(null, EXCHANGE); | ||
}); | ||
@@ -170,2 +149,37 @@ }; | ||
Queue.prototype._assertWaitQueue = function(channel, pattern, delay, callback) { | ||
var waitExchangeName = this._waitExchangeName(delay); | ||
var waitQueueName = this._waitQueueName(pattern, delay); | ||
var fn = this._waitQueues[waitQueueName]; | ||
if(!fn) { | ||
fn = thunky(function(cb) { | ||
var next = afterAll(function(err) { | ||
cb(err, waitExchangeName); | ||
}); | ||
channel.assertExchange(waitExchangeName, 'direct', { | ||
durable: true, | ||
autoDelete: false | ||
}, next()); | ||
channel.assertQueue(waitQueueName, { | ||
durable: true, | ||
autoDelete: false, | ||
arguments: { | ||
'x-message-ttl': delay, | ||
'x-dead-letter-exchange': EXCHANGE | ||
} | ||
}, next()); | ||
channel.bindQueue(waitQueueName, waitExchangeName, pattern, null, next()); | ||
}); | ||
this._waitQueues[waitQueueName] = fn; | ||
} | ||
fn(callback); | ||
}; | ||
Queue.prototype._queueName = function(pattern) { | ||
@@ -172,0 +186,0 @@ return this._queueOptions.namespace + '.' + pattern; |
{ | ||
"name": "rabbitmq-eventemitter", | ||
"version": "1.0.3", | ||
"version": "1.0.4", | ||
"description": "Simplified rabbitmq events", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
7884
148