servicebus
Advanced tools
Comparing version 0.4.1 to 0.4.3
@@ -0,6 +1,12 @@ | ||
var EventEmitter = require('events').EventEmitter; | ||
var util = require('util'); | ||
function Bus () { | ||
this.incomingMiddleware = []; | ||
this.outgoingMiddleware = []; | ||
EventEmitter.call(this); | ||
} | ||
util.inherits(Bus, EventEmitter); | ||
Bus.prototype.use = function (middleware) { | ||
@@ -17,2 +23,4 @@ if (middleware.handleIncoming) this.incomingMiddleware.push(middleware.handleIncoming); | ||
function next (err) { | ||
if (err) throw err; // at this point we don't have a mechanism for providing an error-aware callback to sends and publishes, | ||
// so we'll throw. in the future we can check for the presense of one and throw if it's not provided | ||
@@ -46,2 +54,4 @@ var layer; | ||
function next (err) { | ||
if (err) throw err; // at this point we don't have a mechanism for providing a callback to sends and publishes, | ||
// so we'll throw. in the future we can check for the presense of one and throw if it's not provided | ||
@@ -48,0 +58,0 @@ var layer; |
var amqp = require('amqp'), | ||
Bus = require('../bus'), | ||
Correlator = require('./correlator'), | ||
log = require('debug')('servicebus'), | ||
@@ -18,3 +19,4 @@ events = require('events'), | ||
options.vhost = options.vhost || process.env.RABBITMQ_VHOST || '/'; | ||
this.correlator = new Correlator(options); | ||
this.delayOnStartup = options.delayOnStartup || 10; | ||
@@ -24,2 +26,3 @@ this.log = options.log || log; | ||
this.queues = {}; | ||
this.queuesFile = options.queuesFile; | ||
@@ -34,4 +37,11 @@ log('connecting to rabbitmq on ' + options.url); | ||
self.log('rabbitmq connection closed.'); | ||
self.emit('close'); | ||
}); | ||
this.connection.on('error', function (err) { | ||
// if you don't want servicebus to crash on error, you'll need to listen on the | ||
// bus' error event, log or do whatever you like. | ||
self.emit('error', err); | ||
}); | ||
this.initialized = new Promise(function (resolve, reject) { | ||
@@ -43,2 +53,3 @@ | ||
self.log("rabbitmq connected to " + self.connection.serverProperties.product); | ||
self.emit('ready'); | ||
resolve(); | ||
@@ -49,3 +60,3 @@ }); | ||
self.log('Error connecting to rabbitmq at ' + options.url + ' error: ' + err.toString()); | ||
throw err; | ||
self.emit('error', err); | ||
}); | ||
@@ -69,9 +80,12 @@ | ||
this.setOptions(queueName, options); | ||
this.initialized.done(function() { | ||
if (self.queues[queueName] === undefined) { | ||
log('creating queue ' + queueName); | ||
self.queues[queueName] = new Queue({ bus: self, connection: self.connection, queueName: queueName, log: self.log }); | ||
if (self.queues[options.queueName] === undefined) { | ||
log('creating queue ' + options.queueName); | ||
self.queues[options.queueName] = new Queue(options); | ||
} | ||
self.queues[queueName].listen(callback, options); | ||
self.queues[options.queueName].listen(callback, options); | ||
}); | ||
@@ -97,2 +111,20 @@ | ||
RabbitMQBus.prototype.setOptions = function (queueName, options) { | ||
if (typeof queueName === 'object') { | ||
options.queueName = queueName.queueName; | ||
options.routingKey = queueName.routingKey; | ||
queueName = queueName.queueName; | ||
} else { | ||
options.queueName = queueName; | ||
} | ||
extend(options, { | ||
bus: this, | ||
connection: this.connection, | ||
correlator: this.correlator, | ||
log: this.log, | ||
queuesFile: this.queuesFile | ||
}); | ||
} | ||
RabbitMQBus.prototype.send = function send (queueName, message, options) { | ||
@@ -102,8 +134,9 @@ var self = this; | ||
this.setOptions(queueName, options); | ||
this.initialized.done(function() { | ||
if (self.queues[queueName] === undefined) { | ||
extend(options, { bus: self, connection: self.connection, queueName: queueName, log: self.log }); | ||
self.queues[queueName] = new Queue(options); | ||
if (self.queues[options.queueName] === undefined) { | ||
self.queues[options.queueName] = new Queue(options); | ||
} | ||
self.handleOutgoing(queueName, message, function (queueName, message) { | ||
self.handleOutgoing(options.queueName, message, function (queueName, message) { | ||
log('sending to queue ' + queueName + ' event ' + util.inspect(message)); | ||
@@ -124,2 +157,4 @@ self.queues[queueName].send(message); | ||
this.setOptions(queueName, options); | ||
var handle = null; | ||
@@ -131,6 +166,6 @@ function _unsubscribe (options) { | ||
this.initialized.done(function() { | ||
if (self.pubsubqueues[queueName] === undefined) { | ||
self.pubsubqueues[queueName] = new PubSubQueue({ bus: self, connection: self.connection, queueName: queueName, log: self.log }); | ||
if (self.pubsubqueues[options.queueName] === undefined) { | ||
self.pubsubqueues[options.queueName] = new PubSubQueue(options); | ||
} | ||
handle = self.pubsubqueues[queueName].subscribe(callback, options); | ||
handle = self.pubsubqueues[options.queueName].subscribe(options, callback); | ||
}); | ||
@@ -144,12 +179,15 @@ | ||
RabbitMQBus.prototype.publish = function publish (queueName, message) { | ||
RabbitMQBus.prototype.publish = function publish (queueName, message, options) { | ||
var self = this; | ||
options = options || {}; | ||
this.setOptions(queueName, options); | ||
this.initialized.done(function() { | ||
if (self.pubsubqueues[queueName] === undefined) { | ||
log('creating pubsub queue ' + queueName); | ||
self.pubsubqueues[queueName] = new PubSubQueue({ bus: self, connection: self.connection, queueName: queueName, log: self.log }); | ||
if (self.pubsubqueues[options.queueName] === undefined) { | ||
log('creating pubsub queue ' + options.queueName); | ||
self.pubsubqueues[options.queueName] = new PubSubQueue(options); | ||
} | ||
self.handleOutgoing(queueName, message, function (queueName, message) { | ||
self.handleOutgoing(options.queueName, message, function (queueName, message) { | ||
log('sending to queue ' + queueName + ' event ' + util.inspect(message)); | ||
@@ -156,0 +194,0 @@ self.pubsubqueues[queueName].publish(message); |
var events = require('events'), | ||
fs = require('fs'), | ||
newId = require('node-uuid'), | ||
path = require('path'), | ||
Promise = require('bluebird'), | ||
QueueRegistry = require('./queueregistry'), | ||
util = require('util'); | ||
function Correlator () { | ||
events.EventEmitter.call(this); | ||
var queues = {}; | ||
function Correlator (options) { | ||
var self = this; | ||
this.ready = false; | ||
this.registry = new QueueRegistry(); | ||
this.registry.getCurrentQueues(function (err, queues) { | ||
self.ready = true; | ||
self.emit('ready'); | ||
this.filename = (options && options.queuesFile) ? path.join(process.cwd(), options.queuesFile) : path.join(process.cwd(), '.queues'); | ||
this.loading = new Promise(function (resolve, reject) { | ||
var result; | ||
fs.readFile(self.filename, function (err, buf) { | ||
if (err) { | ||
return resolve({}); | ||
} | ||
try { | ||
result = JSON.parse(buf.toString()); | ||
} catch (err) { | ||
result = {}; | ||
} finally { | ||
resolve(result); | ||
} | ||
}); | ||
}); | ||
events.EventEmitter.call(this); | ||
} | ||
@@ -19,33 +35,25 @@ | ||
Correlator.prototype.getUniqueId = function getUniqueId (queueName, subscriptionId, callback) { | ||
var id, | ||
self = this; | ||
if ( ! subscriptionId) { | ||
id = queueName + '.' + newId(); | ||
return callback(null, id); | ||
} | ||
function getIdFromQueueData () { | ||
if( ! self.registry.queues[subscriptionId]) { | ||
id = queueName + '.' + newId(); | ||
self.registry.queues[subscriptionId] = id; | ||
self.registry.setCurrentQueues(self.registry.queues, function (err) { | ||
if (err) callback(err); | ||
else callback(null, id); | ||
}); | ||
Correlator.prototype.queueName = function queueName (options, callback) { | ||
var self = this; | ||
this.loading.done(function (result) { | ||
queues = result; | ||
var queueName; | ||
if (queues.hasOwnProperty(options.queueName)) { | ||
queueName = queues[options.queueName]; | ||
} else { | ||
id = self.registry.queues[subscriptionId]; | ||
callback(null, id); | ||
queueName = util.format('%s.%s', options.queueName, newId()); | ||
queues[options.queueName] = queueName; | ||
} | ||
} | ||
if (this.ready) { | ||
getIdFromQueueData(); | ||
} else { | ||
this.on('ready', function () { | ||
getIdFromQueueData(); | ||
self.persistQueueFile(function (err) { | ||
if (err) return callback(err); | ||
callback(null, queueName); | ||
}); | ||
} | ||
}); | ||
}; | ||
Correlator.prototype.persistQueueFile = function (callback) { | ||
var contents = JSON.stringify(queues); | ||
fs.writeFile(this.filename, contents, callback); | ||
} | ||
module.exports = Correlator; |
@@ -1,3 +0,2 @@ | ||
var Correlator = require('./correlator'), | ||
events = require('events'), | ||
var events = require('events'), | ||
newId = require('node-uuid').v4, | ||
@@ -10,4 +9,4 @@ Serializer = require('./serializer'), | ||
this.connection = options.connection; | ||
this.correlator = new Correlator(); | ||
// this.errorQueueName = options.queueName + '.error'; | ||
this.correlator = options.correlator; | ||
this.errorQueueName = options.queueName + '.error'; | ||
this.log = options.log; | ||
@@ -32,3 +31,3 @@ this.maxRetries = options.maxRetries || 3; | ||
this.log('publishing to exchange ' + self.exchange.name + ' ' + self.queueName + ' event ' + util.inspect(event)); | ||
process.nextTick(function () { | ||
setImmediate(function () { | ||
self.exchange.publish(self.queueName, event, { contentType: 'application/json', deliveryMode: 2 }); | ||
@@ -39,3 +38,3 @@ }); | ||
PubSubQueue.prototype.subscribe = function subscribe (callback, options) { | ||
PubSubQueue.prototype.subscribe = function subscribe (options, callback) { | ||
var self = this, | ||
@@ -47,12 +46,12 @@ uniqueName, | ||
// if (options && options.ack) { | ||
// queueOptions.durable = true; | ||
// queueOptions.autoDelete = false; | ||
// self.connection.queue(self.errorQueueName, queueOptions, function (q) { | ||
// q.bind(self.exchange, self.errorQueueName); | ||
// q.on('queueBindOk', function() { | ||
// self.log('bound to ' + self.errorQueueName); | ||
// }); | ||
// }); | ||
// } | ||
if (options && options.ack) { | ||
queueOptions.durable = true; | ||
queueOptions.autoDelete = false; | ||
self.connection.queue(self.errorQueueName, queueOptions, function (q) { | ||
q.bind(self.exchange, self.errorQueueName); | ||
q.on('queueBindOk', function() { | ||
self.log('bound to ' + self.errorQueueName); | ||
}); | ||
}); | ||
} | ||
@@ -63,6 +62,4 @@ var queue; | ||
} | ||
this.correlator.getUniqueId(self.queueName, options.subscriptionId, function (err, _id) { | ||
this.correlator.queueName(options, function (err, uniqueName) { | ||
if (err) throw err; | ||
uniqueName = _id; | ||
self.connection.queue(uniqueName, queueOptions, function (q) { | ||
@@ -69,0 +66,0 @@ queue = q; |
@@ -1,5 +0,15 @@ | ||
var events = require('events'), | ||
util = require('util'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var extend = require('extend'); | ||
var Promise = require('bluebird'); | ||
var util = require('util'); | ||
function Queue(options) { | ||
function Queue (options) { | ||
var options = options || {}; | ||
var queueOptions = options.queueOptions || {}; | ||
extend(queueOptions, { | ||
autoDelete: ! (options.ack || options.acknowledge), | ||
durable: options.ack || options.acknowledge | ||
}); | ||
this.bus = options.bus; | ||
@@ -13,9 +23,49 @@ this.connection = options.connection; | ||
this.rejected = {}; | ||
this.routingKey = options.routingKey; | ||
this.contentType = options.contentType || 'application/json'; | ||
this.deliveryMode = (options.ack || options.acknowledge || options.persistent) ? 2 : 1; // default to non-persistent messages | ||
this.deliveryMode = (options.ack || options.acknowledge || options.persistent) | ||
? 2 | ||
: 1; // default to non-persistent messages | ||
this.ack = (options.ack || options.acknowledge); | ||
events.EventEmitter.call(this); | ||
EventEmitter.call(this); | ||
var self = this; | ||
this.initialized = Promise.all([ | ||
// we're initialized when our queues are bound | ||
new Promise(function (resolve, reject) { | ||
self.log('connecting to queue ' + self.queueName); | ||
self.queue = self.connection.queue(self.queueName, queueOptions, function () { | ||
self.log('binding to routingKey ' + self.routingKey || self.queueName); | ||
self.queue.bind(self.routingKey || self.queueName); | ||
self.queue.on('queueBindOk', function() { | ||
self.log('bound to queue ' + self.queueName); | ||
resolve(); | ||
}); | ||
}); | ||
}), | ||
new Promise(function (resolve, reject) { | ||
if (self.ack) { | ||
queueOptions.durable = true; | ||
queueOptions.autoDelete = false; | ||
self.errorQueue = self.connection.queue(self.queueName + '.error', queueOptions, function (eq) { | ||
eq.bind(self.errorQueueName); | ||
eq.on('queueBindOk', function () { | ||
self.log('bound to ' + self.errorQueueName); | ||
resolve(); | ||
}); | ||
}); | ||
} else { | ||
resolve(); | ||
} | ||
}) | ||
]).catch(function (err) { | ||
self.log('error connecting to queue ', options.queueName, '. error: ' + err.toString()); | ||
self.emit('error', err); | ||
}); | ||
} | ||
util.inherits(Queue, events.EventEmitter); | ||
util.inherits(Queue, EventEmitter); | ||
@@ -32,27 +82,13 @@ Queue.prototype.error = function error (event) { | ||
var self = this; | ||
this.log('listening to queue ' + this.queueName + ' with options ' + util.inspect(options)); | ||
if (options && options.ack) { | ||
queueOptions.durable = true; | ||
queueOptions.autoDelete = false; | ||
self.errorQueue = self.connection.queue(self.queueName + '.error', queueOptions, function(eq) { | ||
eq.bind(self.errorQueueName); | ||
eq.on('queueBindOk', function() { | ||
self.log('bound to ' + self.errorQueueName); | ||
this.initialized.done(function () { | ||
self.queue.subscribe(options, function (message, headers, deliveryInfo, messageHandle) { | ||
self.bus.handleIncoming(message, headers, deliveryInfo, messageHandle, options, function (message, headers, deliveryInfo, messageHandle, options) { | ||
callback(message, headers, deliveryInfo, messageHandle, options); | ||
}); | ||
}).on('success', function (subscription) { | ||
self.subscription = subscription; | ||
}); | ||
} | ||
self.queue = this.connection.queue(this.queueName, queueOptions, function() { | ||
self.queue.bind(self.queueName); | ||
self.queue.on('queueBindOk', function() { | ||
self.log('listening to queue ' + self.queueName + ' with options ' + util.inspect(options)); | ||
self.queue.subscribe(options, function (message, headers, deliveryInfo, messageHandle) { | ||
self.bus.handleIncoming(message, headers, deliveryInfo, messageHandle, options, function (message, headers, deliveryInfo, messageHandle, options) { | ||
callback(message, headers, deliveryInfo, messageHandle, options); | ||
}); | ||
}).on('success', function (subscription) { | ||
self.subscription = subscription; | ||
}); | ||
self.initialized = true; | ||
}); | ||
}); | ||
@@ -77,4 +113,4 @@ }; | ||
var self = this; | ||
process.nextTick(function () { | ||
self.connection.publish(self.queueName, event, { | ||
this.initialized.done(function () { | ||
self.connection.publish(self.routingKey || self.queueName, event, { | ||
contentType: self.contentType, | ||
@@ -81,0 +117,0 @@ deliveryMode: self.deliveryMode |
var events = require('events'), | ||
fs = require('fs'), | ||
path = require('path'), | ||
util = require('util'); | ||
@@ -8,3 +9,3 @@ | ||
function Serializer () { | ||
this.filename = process.cwd() + '/.queues'; | ||
this.filename = path.join(process.cwd(), '.queues'); | ||
events.EventEmitter.call(this); | ||
@@ -11,0 +12,0 @@ } |
@@ -10,3 +10,3 @@ { | ||
"description": "Simple service bus for sending events between processes using amqp.", | ||
"version": "0.4.1", | ||
"version": "0.4.3", | ||
"homepage": "https://github.com/mateodelnorte/servicebus", | ||
@@ -13,0 +13,0 @@ "repository": { |
@@ -9,3 +9,3 @@ var noop = function () {}; | ||
it('should cause message to be received by subscribe', function(done){ | ||
it('should cause message to be received by subscribe', function (done){ | ||
bus.subscribe('my.event.11', function (event) { | ||
@@ -19,3 +19,3 @@ done(); | ||
it('should fan out to when multiple listening', function(done){ | ||
it('should fan out to when multiple listening', function (done){ | ||
var count = 0; | ||
@@ -49,3 +49,4 @@ function tryDone(){ | ||
it('can handle high event throughput', function(done){ | ||
it('can handle high event throughput', function (done){ | ||
this.timeout(30000); | ||
var count = 0, endCount = 5000; | ||
@@ -93,4 +94,8 @@ function tryDone(){ | ||
}); | ||
// it('should allow for a mixture of ack:true and ack:false subscriptions', function () { | ||
// }); | ||
}); | ||
}); |
@@ -10,3 +10,3 @@ var noop = function () {}; | ||
it('should cause message to be received by listen', function(done){ | ||
it('should cause message to be received by listen', function (done){ | ||
bus.listen('my.event.1', function (event) { | ||
@@ -20,3 +20,3 @@ done(); | ||
it('should distribute out to subsequent listeners when multiple listening', function(done){ | ||
it('should distribute out to subsequent listeners when multiple listening', function (done){ | ||
var count = 0; | ||
@@ -49,3 +49,4 @@ function tryDone(){ | ||
it('can handle high event throughput', function(done){ | ||
it('can handle high event throughput', function (done){ | ||
this.timeout(30000); | ||
var count = 0, endCount = 5000; | ||
@@ -68,13 +69,11 @@ function tryDone(){ | ||
it('sends subsequent messages only after previous messages are acknowledged', function(done){ | ||
it('sends subsequent messages only after previous messages are acknowledged', function (done){ | ||
var count = 0; | ||
var interval = setInterval(function checkDone () { | ||
if (count === 4) { | ||
clearInterval(interval); | ||
bus.destroyListener('my.event.4').on('success', function () { | ||
clearInterval(interval); | ||
done(); | ||
}); | ||
} else { | ||
// log('not done yet!'); | ||
} | ||
} | ||
}, 10); | ||
@@ -86,11 +85,42 @@ bus.listen('my.event.4', { ack: true }, function (event) { | ||
setTimeout(function () { | ||
//process.nextTick(function () { | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
//}); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
}, 10); | ||
}); | ||
// it('allows routing based on routingKey, on same named queue', function (done){ | ||
// var count = 0; | ||
// var interval = setInterval(function checkDone () { | ||
// if (count === 4) { | ||
// clearInterval(interval); | ||
// bus.destroyListener('my.event.routingKey').on('success', function () { | ||
// done(); | ||
// }); | ||
// } | ||
// }, 10); | ||
// bus.listen('my.event.routingKey.1', { ack: true }, function (event) { | ||
// count++; | ||
// event.handle.ack(); | ||
// }); | ||
// bus.listen('my.event.routingKey.2', { ack: true }, function (event) { | ||
// count++; | ||
// event.handle.ack(); | ||
// }); | ||
// bus.listen('my.event.routingKey.3', { ack: true }, function (event) { | ||
// count++; | ||
// event.handle.ack(); | ||
// }); | ||
// bus.listen('my.event.routingKey.4', { ack: true }, function (event) { | ||
// count++; | ||
// event.handle.ack(); | ||
// }); | ||
// setTimeout(function () { | ||
// bus.send({ queueName: 'my.event.routingKey', routingKey: 'my.event.routingKey.1' }, { my: 'event1' }); | ||
// bus.send({ queueName: 'my.event.routingKey', routingKey: 'my.event.routingKey.2' }, { my: 'event2' }); | ||
// bus.send({ queueName: 'my.event.routingKey', routingKey: 'my.event.routingKey.3' }, { my: 'event3' }); | ||
// bus.send({ queueName: 'my.event.routingKey', routingKey: 'my.event.routingKey.4' }, { my: 'event4' }); | ||
// }, 10); | ||
// }); | ||
@@ -112,3 +142,3 @@ }); | ||
}); | ||
}, 2000); | ||
}, 1500); | ||
}); | ||
@@ -129,3 +159,3 @@ | ||
}); | ||
}, 2000); | ||
}, 1500); | ||
}); | ||
@@ -132,0 +162,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
40284
936
7