servicebus
Advanced tools
Comparing version 1.0.1 to 1.0.6
@@ -160,6 +160,4 @@ var amqp = require('amqplib'), | ||
this.initialized.done(function() { | ||
function _send (queueName, message, options) { | ||
self.setOptions(queueName, options); | ||
if (self.queues[options.queueName] === undefined) { | ||
@@ -171,4 +169,13 @@ self.queues[options.queueName] = new Queue(options); | ||
}); | ||
} | ||
}); | ||
if ( ! this.initialized.isFulfilled()) { | ||
self.initialized.done(function() { | ||
_send(queueName, message, options); | ||
}); | ||
} else { | ||
_send(queueName, message, options); | ||
} | ||
}; | ||
@@ -208,6 +215,4 @@ | ||
this.initialized.done(function() { | ||
function _publish (queueName, message, options) { | ||
self.setOptions(queueName, options); | ||
if (self.pubsubqueues[options.queueName] === undefined) { | ||
@@ -221,7 +226,13 @@ self.log('creating pubsub queue ' + options.queueName); | ||
}); | ||
} | ||
}); | ||
if ( ! this.initialized.isFulfilled()) { | ||
this.initialized.done(function() { | ||
_publish(queueName, message, options); | ||
}); | ||
} else { | ||
_publish(queueName, message, options); | ||
} | ||
}; | ||
module.exports.Bus = RabbitMQBus; |
var events = require('events'); | ||
var extend = require('extend'); | ||
var newId = require('node-uuid').v4; | ||
var Promise = require('bluebird'); | ||
var util = require('util'); | ||
@@ -69,42 +70,55 @@ | ||
this.correlator.queueName(options, function (err, uniqueName) { | ||
if (err) throw err; | ||
self.listenChannel.assertQueue(uniqueName, self.queueOptions) | ||
.then(function (qok) { | ||
return self.listenChannel.bindQueue(uniqueName, self.exchangeName, self.routingKey || self.queueName); | ||
}).then(function (queue) { | ||
self.listenChannel.consume(uniqueName, function (message) { | ||
/* | ||
Note from http://www.squaremobius.net/amqp.node/doc/channel_api.html | ||
& http://www.rabbitmq.com/consumer-cancel.html: | ||
var initialized = new Promise(function (resolve, reject) { | ||
self.correlator.queueName(options, function (err, uniqueName) { | ||
if (err) throw err; | ||
self.listenChannel.assertQueue(uniqueName, self.queueOptions) | ||
.then(function (qok) { | ||
return self.listenChannel.bindQueue(uniqueName, self.exchangeName, self.routingKey || self.queueName); | ||
}).then(function () { | ||
if (self.ack) { | ||
self.log('asserting error queue ' + self.errorQueueName); | ||
self.listenChannel.assertQueue(self.errorQueueName, self.queueOptions) | ||
.then(function (_qok) { | ||
resolve(uniqueName); | ||
}); | ||
} else { | ||
resolve(uniqueName); | ||
} | ||
}); | ||
}); | ||
}); | ||
If the consumer is cancelled by RabbitMQ, the message callback will be invoked with null. | ||
*/ | ||
if (message === null) { | ||
return; | ||
initialized.then(function (uniqueName) { | ||
self.listenChannel.consume(uniqueName, function (message) { | ||
/* | ||
Note from http://www.squaremobius.net/amqp.node/doc/channel_api.html | ||
& http://www.rabbitmq.com/consumer-cancel.html: | ||
If the consumer is cancelled by RabbitMQ, the message callback will be invoked with null. | ||
*/ | ||
if (message === null) { | ||
return; | ||
} | ||
// todo: map contentType to default formatters | ||
message.content = options.formatter.deserialize(message.content); | ||
options.queueType = 'pubsubqueue'; | ||
self.bus.handleIncoming(self.listenChannel, message, options, function (channel, message, options) { | ||
// amqplib intercepts errors and closes connections before bubbling up | ||
// to domain error handlers when they occur non-asynchronously within | ||
// callback. Therefore, if there is a process domain, we try-catch to | ||
// redirect the error, assuming the domain creator's intentions. | ||
try { | ||
callback(message.content, message); | ||
} catch (err) { | ||
if (process.domain && process.domain.listeners('error')) { | ||
process.domain.emit('error', err); | ||
} else { | ||
self.emit('error', err); | ||
} | ||
// todo: map contentType to default formatters | ||
message.content = options.formatter.deserialize(message.content); | ||
options.queueType = 'pubsubqueue'; | ||
self.bus.handleIncoming(self.listenChannel, message, options, function (channel, message, options) { | ||
// amqplib intercepts errors and closes connections before bubbling up | ||
// to domain error handlers when they occur non-asynchronously within | ||
// callback. Therefore, if there is a process domain, we try-catch to | ||
// redirect the error, assuming the domain creator's intentions. | ||
try { | ||
callback(message.content, message); | ||
} catch (err) { | ||
if (process.domain && process.domain.listeners('error')) { | ||
process.domain.emit('error', err); | ||
} else { | ||
self.emit('error', err); | ||
} | ||
} | ||
}); | ||
}, { noAck: ! self.ack }) | ||
.then(function (ok) { | ||
self.subscription = { consumerTag: ok.consumerTag }; | ||
}); | ||
} | ||
}); | ||
}, { noAck: ! self.ack }) | ||
.then(function (ok) { | ||
self.subscription = { consumerTag: ok.consumerTag }; | ||
}); | ||
}); | ||
@@ -111,0 +125,0 @@ |
@@ -10,3 +10,3 @@ { | ||
"description": "Simple service bus for sending events between processes using amqp.", | ||
"version": "1.0.1", | ||
"version": "1.0.6", | ||
"homepage": "https://github.com/mateodelnorte/servicebus", | ||
@@ -13,0 +13,0 @@ "repository": { |
@@ -102,3 +102,3 @@ var noop = function () {}; | ||
setTimeout(function () { | ||
bus.publish('my.event.15', { data: Math.random() }, { ack: true }); | ||
bus.publish('my.event.15', { data: Math.random() }); | ||
}, 1000); | ||
@@ -105,0 +105,0 @@ }); |
@@ -147,7 +147,11 @@ var noop = function () {}; | ||
bus.listen('my.event.18', { ack: true }, function (event) { | ||
done(new Error('should not receive events after destroy')); | ||
event.handle.ack(); | ||
// commence ugliest hack ever to get around rabbitmq queue consumer rules | ||
setTimeout(function () { | ||
done(new Error('should not receive events after destroy')); | ||
}, 500); | ||
}); | ||
setTimeout(function () { | ||
bus.destroyListener('my.event.18').on('success', function () { | ||
bus.send('my.event.18', { test: 'data'}); | ||
bus.send('my.event.18', { test: 'data'}, { ack: true, expiration: 100 }); | ||
setTimeout(done, 100); | ||
@@ -154,0 +158,0 @@ }); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
54331
1313