servicebus
Advanced tools
Comparing version 0.1.0 to 0.2.0
199
bus/bus.js
@@ -1,163 +0,78 @@ | ||
var util = require('util'), | ||
amqp = require('amqp'), | ||
events = require('events'), | ||
PubSubQueue = require('./pubsubqueue'), | ||
Queue = require('./queue'), | ||
newId = require('node-uuid'); | ||
function Bus () { | ||
this.middleware = []; | ||
} | ||
function Bus(options, implOpts) { | ||
var noop = function () {}; | ||
options = options || {}, implOpts, self = this; | ||
options.url = options.url || process.env.RABBITMQ_URL || 'amqp://localhost'; | ||
implOpts = implOpts || { defaultExchangeName: 'amq.topic' }; | ||
this.log = options.log || { debug: noop, info: noop, warn: noop, error: noop }; | ||
this.delayOnStartup = options.delayOnStartup || 10; | ||
this.initialized = false; | ||
this.log.debug('connecting to rabbitmq on ' + options.url); | ||
this.connection = amqp.createConnection(options, implOpts); | ||
this.pubsubqueues = {}; | ||
this.queues = {}; | ||
Bus.prototype.use = function (middleware) { | ||
this.middleware.push(middleware); | ||
return this; | ||
} | ||
var self = this; | ||
this.connection.on('error', function (err) { | ||
self.log.error('Error connecting to rabbitmq at ' + options.url + ' error: ' + err.toString()); | ||
throw err; | ||
}); | ||
Bus.prototype.handleIncoming = function (message, headers, deliveryInfo, messageHandle, options, callback) { | ||
var stack = this.middleware, index = this.middleware.length - 1; | ||
this.connection.on('close', function (err) { | ||
self.log.debug('rabbitmq connection closed.'); | ||
}); | ||
function next (err) { | ||
this.connection.on('ready', function () { | ||
self.initialized = true; | ||
self.log.debug("rabbitmq connected to " + self.connection.serverProperties.product); | ||
}); | ||
} | ||
var layer; | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
function packageEvent(queueName, message, cid) { | ||
var data = message; | ||
var event = { | ||
cid: cid || message.cid || newId() | ||
, data: data | ||
, datetime: message.datetime || new Date().toUTCString() | ||
, type: message.type || queueName | ||
}; | ||
return event; | ||
} | ||
message = (args.length > 1) ? args[0] : message; | ||
headers = (args.length > 1) ? args[1] : headers; | ||
deliveryInfo = (args.length > 1) ? args[2] : deliveryInfo; | ||
messageHandle = (args.length > 1) ? args[3] : messageHandle; | ||
options = (args.length > 1) ? args[3] : options; | ||
Bus.prototype.listen = function listen(queueName, options, callback) { | ||
var self = this; | ||
this.log.debug('calling listen dog: ', queueName); | ||
if (typeof options === "function") { | ||
callback = options; | ||
options = {}; | ||
} | ||
layer = stack[index--]; | ||
if (self.initialized) { | ||
if (self.queues[queueName] === undefined) { | ||
this.log.debug('creating queue ' + queueName); | ||
self.queues[queueName] = new Queue(self.connection, queueName, { log: self.log }); | ||
if ( ! layer) { | ||
return callback(message, headers, deliveryInfo, messageHandle, options); | ||
} | ||
self.queues[queueName].listen(callback, options); | ||
} else { | ||
self.connection.on('ready', function() { | ||
self.log.debug('penis'); | ||
process.nextTick(function() { | ||
self.initialized = true; | ||
self.listen(queueName, options, callback); | ||
}); | ||
}); | ||
} | ||
}; | ||
Bus.prototype.send = function send(queueName, message, cid) { | ||
var event = packageEvent(queueName, message, cid); | ||
this._send(queueName, event); | ||
}; | ||
if (layer.handleIncoming) { | ||
layer.handleIncoming(message, headers, deliveryInfo, messageHandle, options, next); | ||
} else { | ||
next(null, message, headers, deliveryInfo, messageHandle, options); | ||
} | ||
Bus.prototype._send = function send(queueName, message) { | ||
var self = this; | ||
if (self.initialized) { | ||
if (self.queues[queueName] === undefined) { | ||
self.queues[queueName] = new Queue(self.connection, queueName, { log: self.log }); | ||
} | ||
self.queues[queueName].send(message); | ||
} else { | ||
var resend = function() { | ||
self.initialized = true; | ||
self._send(queueName, message); | ||
}; | ||
var timeout = function(){ | ||
self.log.debug('timout triggered'); | ||
self.connection.removeListener('ready', resend); | ||
process.nextTick(resend); | ||
}; | ||
var timeoutId = setTimeout(timeout, self.delayOnStartup); | ||
self.connection.on('ready', function() { | ||
clearTimeout(timeoutId); | ||
process.nextTick(resend); | ||
}); | ||
} | ||
}; | ||
Bus.prototype.subscribe = function subscribe(queueName, options, callback) { | ||
var self = this; | ||
next(); | ||
} | ||
if (typeof options === "function") { | ||
callback = options; | ||
options = {}; | ||
} | ||
if (self.initialized) { | ||
if (self.pubsubqueues[queueName] === undefined) { | ||
self.pubsubqueues[queueName] = new PubSubQueue(self.connection, queueName, { log: self.log }); | ||
} | ||
self.pubsubqueues[queueName].subscribe(callback, options); | ||
} else { | ||
self.connection.on('ready', function() { | ||
process.nextTick(function() { | ||
self.initialized = true; | ||
self.subscribe(queueName, options, callback); | ||
}); | ||
}); | ||
} | ||
}; | ||
Bus.prototype.publish = function publish(queueName, message, cid) { | ||
var event = packageEvent(queueName, message, cid); | ||
this._publish(queueName, event); | ||
}; | ||
Bus.prototype.handleOutgoing = function (queueName, message, callback) { | ||
var stack = this.middleware, index = 0; | ||
Bus.prototype._publish = function _publish(queueName, message, cid) { | ||
var self = this; | ||
if (self.initialized) { | ||
if (self.pubsubqueues[queueName] === undefined) { | ||
this.log.debug('creating pubsub queue ' + queueName); | ||
self.pubsubqueues[queueName] = new PubSubQueue(self.connection, queueName, { log: self.log }); | ||
function next (err) { | ||
var layer; | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
queueName = (args.length > 1) ? args[0] : queueName; | ||
message = (args.length > 1) ? args[1] : message; | ||
layer = stack[index]; | ||
index++; | ||
if ( ! layer) { | ||
return callback(queueName, message); | ||
} | ||
self.pubsubqueues[queueName].publish(message); | ||
} else { | ||
var republish = function() { | ||
self.initialized = true; | ||
self._publish(queueName, message); | ||
}; | ||
self.connection.on('ready', function() { | ||
process.nextTick(republish); | ||
}); | ||
if (layer.handleOutgoing) { | ||
layer.handleOutgoing(queueName, message, next); | ||
} else { | ||
next(null, queueName, message); | ||
} | ||
} | ||
}; | ||
module.exports.bus = function bus (options, implOpts) { | ||
return new Bus(options, implOpts); | ||
}; | ||
next(null, queueName, message); | ||
} | ||
var namedBuses = {}; | ||
Bus.prototype.correlate = require('./middleware/correlate'); | ||
Bus.prototype.log = require('./middleware/log'); | ||
Bus.prototype.package = require('./middleware/package'); | ||
Bus.prototype.retry = require('./middleware/retry'); | ||
module.exports.namedBus = function namedBus(name, options, implOpts) { | ||
var bus = namedBuses[name]; | ||
if ( ! bus) { | ||
bus = namedBuses[name] = new Bus(options, implOpts); | ||
} | ||
return bus; | ||
} | ||
module.exports = Bus; |
@@ -1,1 +0,1 @@ | ||
module.exports = require('./bus/bus'); | ||
module.exports = require('./bus'); |
{ | ||
"author": "Matt Walters <mattwalters5@gmail.com>", | ||
"contributors": | ||
[ "mattwalters5@gmail.com", "timisbusy@gmail.com" ], | ||
"contributors": [ | ||
"mattwalters5@gmail.com", | ||
"timisbusy@gmail.com" | ||
], | ||
"name": "servicebus", | ||
"description": "Simple service bus for sending events between processes using amqp.", | ||
"version": "0.1.0", | ||
"version": "0.2.0", | ||
"homepage": "https://github.com/mateodelnorte/servicebus", | ||
@@ -14,13 +16,15 @@ "repository": { | ||
"engines": { | ||
"node": "~0.6.x" | ||
"node": "0.6 || 0.8 || 0.9 || 0.10" | ||
}, | ||
"dependencies": { | ||
"amqp": "0.1.3", | ||
"node-uuid": "1.3.3" | ||
"amqp": "0.1.7", | ||
"node-uuid": "1.4.0", | ||
"debug": "~0.7.2" | ||
}, | ||
"devDependencies": { | ||
"mocha": ">=1.2.1", | ||
"should": "0.6.3" | ||
"longjohn": "~0.2.0", | ||
"mocha": ">=1.2.1", | ||
"should": "0.6.3" | ||
}, | ||
"optionalDependencies": {} | ||
} |
@@ -59,2 +59,80 @@ # servicebus | ||
bus.publish('my.event', { my: 'event' }); | ||
}, 1000); | ||
}, 1000); | ||
# Middleware | ||
Servicebus allows for middleware packages to enact behavior at the time a message is sent or received. They are very similar to connect middleware in their usage: | ||
``` | ||
if ( ! process.env.RABBITMQ_URL) | ||
throw new Error('Tests require a RABBITMQ_URL environment variable to be set, pointing to the RabbiqMQ instance you wish to use.'); | ||
var busUrl = process.env.RABBITMQ_URL | ||
var bus = require('../').bus({ url: busUrl }); | ||
bus.use(bus.package()); | ||
bus.use(bus.correlate()); | ||
bus.use(bus.log()); | ||
bus.use(bus.retry()); | ||
module.exports.bus = bus; | ||
``` | ||
Middleware may defined one or two functions to modify incoming or outgoing messages: | ||
``` | ||
... | ||
function logIncoming (queueName, message, next) { | ||
log('received ' + util.inspect(message)); | ||
next(null, queueName, message); | ||
} | ||
function logOutgoing (message, headers, deliveryInfo, messageHandle, options, next) { | ||
log('sending ' + util.inspect(message)); | ||
next(null, message, headers, deliveryInfo, messageHandle, options); | ||
} | ||
return { | ||
handleIncoming: logIncoming, | ||
handleOutgoing: logOutgoing | ||
}; | ||
``` | ||
handleIncoming pipelines behavior to be enacted on an incoming message. handleOutgoing pipelines behavior to be enacted on an outgoing message. To say that the behavior is pipelined is to say that each middleware is called in succession, allowing each to enact its behavior before the next. (in from protocol->servicebus->middleware 1->middleware 2->servicebus->user code) | ||
## Included Middleware | ||
### Correlate | ||
Correlate simply adds a .cid (Correlation Identity) property to any outgoing message that doesn't already have one. This is useful for following messages in logs across services. | ||
### Log | ||
Log ensures that incoming and outgoing messages are logged to stdout via the debug module. | ||
### Package | ||
Package repackages outgoing messages, encapsulating the original message as a .data property and adding additional properties for information like message type and datetime sent: | ||
``` | ||
// bus.publish('my:event', { my: 'event' }); | ||
{ | ||
my: 'event' | ||
}; | ||
``` | ||
becomes | ||
``` | ||
{ | ||
data: { | ||
my: 'event' | ||
} | ||
, datetime: 'Wed, 04 Sep 2013 19:31:11 GMT' | ||
, type: 'my:event' | ||
}; | ||
``` | ||
### Retry | ||
Retry provides ability to specify a max number of times an erroring message will be retried before being placed on an error queue. The retry middleware requires the correlate middleware. |
var noop = function () {}; | ||
var log = { debug: noop, info: noop, warn: noop, error: noop }; | ||
var bus = require('../bus/bus').bus({ log: log }); | ||
var log = require('debug')('servicebus:test') | ||
var bus = require('./bus-shim').bus; | ||
@@ -15,3 +15,3 @@ describe('servicebus', function(){ | ||
bus.publish('my.event.11', { my: 'event' }); | ||
}, 10); | ||
}, 100); | ||
}); | ||
@@ -23,2 +23,3 @@ | ||
count++; | ||
log('received my.event.12 ' + count + ' times'); | ||
if (count === 4) { | ||
@@ -45,7 +46,7 @@ done(); | ||
bus.publish('my.event.12', { my: 'event' }); | ||
}, 10); | ||
}, 100); | ||
}); | ||
it('can handle high event throughput', function(done){ | ||
var count = 0, endCount = 10000; | ||
var count = 0, endCount = 5000; | ||
function tryDone(){ | ||
@@ -58,2 +59,3 @@ count++; | ||
bus.subscribe('my.event.13', function (event) { | ||
log('received my.event.13 ' + count + ' times'); | ||
tryDone(); | ||
@@ -65,6 +67,6 @@ }); | ||
}; | ||
}, 10); | ||
}, 100); | ||
}); | ||
it('sends subsequent messages only after previous messages are acknowledged', function(done){ | ||
it('sends subsequent messages only after previous messages are acknowledged', function (done){ | ||
var count = 0; | ||
@@ -75,9 +77,11 @@ var interval = setInterval(function checkDone () { | ||
clearInterval(interval); | ||
} else { | ||
console.log('not done yet!'); | ||
} | ||
}, 10); | ||
bus.subscribe('my.event.14', { ack: true }, function (event, handle) { | ||
} | ||
//else { | ||
// console.log('not done yet!'); | ||
// } | ||
}, 100); | ||
bus.subscribe('my.event.14', { ack: true }, function (event) { | ||
count++; | ||
handle.ack(); | ||
log('received my.event.14 ' + count + ' times'); | ||
event.handle.ack(); | ||
}); | ||
@@ -89,25 +93,6 @@ setTimeout(function () { | ||
bus.publish('my.event.14', { my: 'event' }); | ||
}, 10); | ||
}, 100); | ||
}); | ||
it('rejected messages should retry until max retries', function(done){ | ||
var count = 0; | ||
var interval = setInterval(function checkDone () { | ||
if (count === 4) { | ||
done(); | ||
clearInterval(interval); | ||
} else { | ||
console.log('not done yet!'); | ||
} | ||
}, 10); | ||
bus.subscribe('my.event.15', { ack: true }, function (event, handle) { | ||
count++; | ||
handle.reject(); | ||
}); | ||
setTimeout(function () { | ||
bus.publish('my.event.15', { my: 'event' }); | ||
}, 10); | ||
}); | ||
}); | ||
}); |
var cp = require('child_process'); | ||
var noop = function () {}; | ||
var log = { debug: noop, info: noop, warn: noop, error: noop }; | ||
var bus = require('../bus/bus').bus({ log: log }); | ||
var bus = require('./bus-shim').bus; | ||
var should = require('should'); | ||
@@ -11,3 +10,3 @@ | ||
it('should cause message to be received by listen', function(done){ | ||
it('should cause message to be received by listen', function (done){ | ||
var count = 0; | ||
@@ -22,4 +21,2 @@ function tryDone(){ | ||
var sender = cp.fork(__dirname + '/child_processes/sender.js'); | ||
bus.listen('event.22', function (event) { | ||
@@ -29,92 +26,7 @@ tryDone(); | ||
var sender = cp.fork(__dirname + '/child_processes/.sender.js'); | ||
}); | ||
// it('should distribute out to subsequent listeners when multiple listening', function(done){ | ||
// var count = 0; | ||
// function tryDone(){ | ||
// count++; | ||
// if (count === 4) { | ||
// done(); | ||
// } | ||
// } | ||
// bus.listen('my.event.2', function (event) { | ||
// tryDone(); | ||
// }); | ||
// bus.listen('my.event.2', function (event) { | ||
// tryDone(); | ||
// }); | ||
// bus.listen('my.event.2', function (event) { | ||
// tryDone(); | ||
// }); | ||
// bus.listen('my.event.2', function (event) { | ||
// tryDone(); | ||
// }); | ||
// setTimeout(function () { | ||
// bus.send('my.event.2', { my: 'event' }); | ||
// bus.send('my.event.2', { my: 'event' }); | ||
// bus.send('my.event.2', { my: 'event' }); | ||
// bus.send('my.event.2', { my: 'event' }); | ||
// }, 10); | ||
// }); | ||
// it('can handle high event throughput', function(done){ | ||
// var count = 0, endCount = 10000; | ||
// function tryDone(){ | ||
// count++; | ||
// if (count > endCount) { | ||
// done(); | ||
// } | ||
// } | ||
// bus.listen('my.event.3', function (event) { | ||
// tryDone(); | ||
// }); | ||
// setTimeout(function () { | ||
// for(var i = 0; i <= endCount; ++i){ | ||
// bus.send('my.event.3', { my: 'event' }); | ||
// }; | ||
// }, 10); | ||
// }); | ||
// it('sends subsequent messages only after previous messages are acknowledged', function(done){ | ||
// var count = 0; | ||
// var interval = setInterval(function checkDone () { | ||
// if (count === 4) { | ||
// done(); | ||
// clearInterval(interval); | ||
// } else { | ||
// console.log('not done yet!'); | ||
// } | ||
// }, 10); | ||
// bus.listen('my.event.4', { ack: true }, function (event, handle) { | ||
// count++; | ||
// handle.ack(); | ||
// }); | ||
// setTimeout(function () { | ||
// bus.send('my.event.4', { my: 'event' }); | ||
// bus.send('my.event.4', { my: 'event' }); | ||
// bus.send('my.event.4', { my: 'event' }); | ||
// bus.send('my.event.4', { my: 'event' }); | ||
// }, 10); | ||
// }); | ||
// it('rejected messages should retry until max retries', function(done){ | ||
// var count = 0; | ||
// var interval = setInterval(function checkDone () { | ||
// if (count === 4) { | ||
// done(); | ||
// clearInterval(interval); | ||
// } else { | ||
// console.log('not done yet!'); | ||
// } | ||
// }, 10); | ||
// bus.listen('my.event.5', { ack: true }, function (event, handle) { | ||
// count++; | ||
// handle.reject(); | ||
// }); | ||
// setTimeout(function () { | ||
// bus.send('my.event.5', { my: 'event' }); | ||
// }, 10); | ||
// }); | ||
}); | ||
}) |
var noop = function () {}; | ||
var log = { debug: noop, info: noop, warn: noop, error: noop }; | ||
var bus = require('../bus/bus').bus({ log : log }); | ||
var log = require('debug')('servicebus:test'); | ||
var bus = require('./bus-shim').bus; | ||
var util = require('util'); | ||
describe('servicebus', function(){ | ||
describe('#send & #listen', function(){ | ||
describe('#send & #listen', function() { | ||
@@ -47,3 +48,3 @@ it('should cause message to be received by listen', function(done){ | ||
it('can handle high event throughput', function(done){ | ||
var count = 0, endCount = 10000; | ||
var count = 0, endCount = 5000; | ||
function tryDone(){ | ||
@@ -62,3 +63,3 @@ count++; | ||
}; | ||
}, 10); | ||
}, 100); | ||
}); | ||
@@ -73,8 +74,8 @@ | ||
} else { | ||
console.log('not done yet!'); | ||
// log('not done yet!'); | ||
} | ||
}, 10); | ||
bus.listen('my.event.4', { ack: true }, function (event, handle) { | ||
bus.listen('my.event.4', { ack: true }, function (event) { | ||
count++; | ||
handle.ack(); | ||
event.handle.ack(); | ||
}); | ||
@@ -92,22 +93,3 @@ setTimeout(function () { | ||
it('rejected messages should retry until max retries', function(done){ | ||
var count = 0; | ||
var interval = setInterval(function checkDone () { | ||
if (count === 4) { | ||
done(); | ||
clearInterval(interval); | ||
} else { | ||
console.log('not done yet!'); | ||
} | ||
}, 10); | ||
bus.listen('my.event.5', { ack: true }, function (event, handle) { | ||
count++; | ||
handle.reject(); | ||
}); | ||
setTimeout(function () { | ||
bus.send('my.event.5', { my: 'event' }); | ||
}, 10); | ||
}); | ||
}); | ||
}) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
33313
27
769
137
3
3
5
2
+ Addeddebug@~0.7.2
+ Addedamqp@0.1.7(transitive)
+ Addeddebug@0.7.4(transitive)
+ Addednode-uuid@1.4.0(transitive)
- Removedamqp@0.1.3(transitive)
- Removednode-uuid@1.3.3(transitive)
Updatedamqp@0.1.7
Updatednode-uuid@1.4.0