New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

servicebus

Package Overview
Dependencies
Maintainers
2
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

servicebus - npm Package Compare versions

Comparing version 0.1.0 to 0.2.0

bus/index.js

199

bus/bus.js

@@ -1,163 +0,78 @@

var util = require('util'),
amqp = require('amqp'),
events = require('events'),
PubSubQueue = require('./pubsubqueue'),
Queue = require('./queue'),
newId = require('node-uuid');
function Bus () {
this.middleware = [];
}
function Bus(options, implOpts) {
var noop = function () {};
options = options || {}, implOpts, self = this;
options.url = options.url || process.env.RABBITMQ_URL || 'amqp://localhost';
implOpts = implOpts || { defaultExchangeName: 'amq.topic' };
this.log = options.log || { debug: noop, info: noop, warn: noop, error: noop };
this.delayOnStartup = options.delayOnStartup || 10;
this.initialized = false;
this.log.debug('connecting to rabbitmq on ' + options.url);
this.connection = amqp.createConnection(options, implOpts);
this.pubsubqueues = {};
this.queues = {};
Bus.prototype.use = function (middleware) {
this.middleware.push(middleware);
return this;
}
var self = this;
this.connection.on('error', function (err) {
self.log.error('Error connecting to rabbitmq at ' + options.url + ' error: ' + err.toString());
throw err;
});
Bus.prototype.handleIncoming = function (message, headers, deliveryInfo, messageHandle, options, callback) {
var stack = this.middleware, index = this.middleware.length - 1;
this.connection.on('close', function (err) {
self.log.debug('rabbitmq connection closed.');
});
function next (err) {
this.connection.on('ready', function () {
self.initialized = true;
self.log.debug("rabbitmq connected to " + self.connection.serverProperties.product);
});
}
var layer;
var args = Array.prototype.slice.call(arguments, 1);
function packageEvent(queueName, message, cid) {
var data = message;
var event = {
cid: cid || message.cid || newId()
, data: data
, datetime: message.datetime || new Date().toUTCString()
, type: message.type || queueName
};
return event;
}
message = (args.length > 1) ? args[0] : message;
headers = (args.length > 1) ? args[1] : headers;
deliveryInfo = (args.length > 1) ? args[2] : deliveryInfo;
messageHandle = (args.length > 1) ? args[3] : messageHandle;
options = (args.length > 1) ? args[3] : options;
Bus.prototype.listen = function listen(queueName, options, callback) {
var self = this;
this.log.debug('calling listen dog: ', queueName);
if (typeof options === "function") {
callback = options;
options = {};
}
layer = stack[index--];
if (self.initialized) {
if (self.queues[queueName] === undefined) {
this.log.debug('creating queue ' + queueName);
self.queues[queueName] = new Queue(self.connection, queueName, { log: self.log });
if ( ! layer) {
return callback(message, headers, deliveryInfo, messageHandle, options);
}
self.queues[queueName].listen(callback, options);
} else {
self.connection.on('ready', function() {
self.log.debug('penis');
process.nextTick(function() {
self.initialized = true;
self.listen(queueName, options, callback);
});
});
}
};
Bus.prototype.send = function send(queueName, message, cid) {
var event = packageEvent(queueName, message, cid);
this._send(queueName, event);
};
if (layer.handleIncoming) {
layer.handleIncoming(message, headers, deliveryInfo, messageHandle, options, next);
} else {
next(null, message, headers, deliveryInfo, messageHandle, options);
}
Bus.prototype._send = function send(queueName, message) {
var self = this;
if (self.initialized) {
if (self.queues[queueName] === undefined) {
self.queues[queueName] = new Queue(self.connection, queueName, { log: self.log });
}
self.queues[queueName].send(message);
} else {
var resend = function() {
self.initialized = true;
self._send(queueName, message);
};
var timeout = function(){
self.log.debug('timout triggered');
self.connection.removeListener('ready', resend);
process.nextTick(resend);
};
var timeoutId = setTimeout(timeout, self.delayOnStartup);
self.connection.on('ready', function() {
clearTimeout(timeoutId);
process.nextTick(resend);
});
}
};
Bus.prototype.subscribe = function subscribe(queueName, options, callback) {
var self = this;
next();
}
if (typeof options === "function") {
callback = options;
options = {};
}
if (self.initialized) {
if (self.pubsubqueues[queueName] === undefined) {
self.pubsubqueues[queueName] = new PubSubQueue(self.connection, queueName, { log: self.log });
}
self.pubsubqueues[queueName].subscribe(callback, options);
} else {
self.connection.on('ready', function() {
process.nextTick(function() {
self.initialized = true;
self.subscribe(queueName, options, callback);
});
});
}
};
Bus.prototype.publish = function publish(queueName, message, cid) {
var event = packageEvent(queueName, message, cid);
this._publish(queueName, event);
};
Bus.prototype.handleOutgoing = function (queueName, message, callback) {
var stack = this.middleware, index = 0;
Bus.prototype._publish = function _publish(queueName, message, cid) {
var self = this;
if (self.initialized) {
if (self.pubsubqueues[queueName] === undefined) {
this.log.debug('creating pubsub queue ' + queueName);
self.pubsubqueues[queueName] = new PubSubQueue(self.connection, queueName, { log: self.log });
function next (err) {
var layer;
var args = Array.prototype.slice.call(arguments, 1);
queueName = (args.length > 1) ? args[0] : queueName;
message = (args.length > 1) ? args[1] : message;
layer = stack[index];
index++;
if ( ! layer) {
return callback(queueName, message);
}
self.pubsubqueues[queueName].publish(message);
} else {
var republish = function() {
self.initialized = true;
self._publish(queueName, message);
};
self.connection.on('ready', function() {
process.nextTick(republish);
});
if (layer.handleOutgoing) {
layer.handleOutgoing(queueName, message, next);
} else {
next(null, queueName, message);
}
}
};
module.exports.bus = function bus (options, implOpts) {
return new Bus(options, implOpts);
};
next(null, queueName, message);
}
var namedBuses = {};
Bus.prototype.correlate = require('./middleware/correlate');
Bus.prototype.log = require('./middleware/log');
Bus.prototype.package = require('./middleware/package');
Bus.prototype.retry = require('./middleware/retry');
module.exports.namedBus = function namedBus(name, options, implOpts) {
var bus = namedBuses[name];
if ( ! bus) {
bus = namedBuses[name] = new Bus(options, implOpts);
}
return bus;
}
module.exports = Bus;

@@ -1,1 +0,1 @@

module.exports = require('./bus/bus');
module.exports = require('./bus');
{
"author": "Matt Walters <mattwalters5@gmail.com>",
"contributors":
[ "mattwalters5@gmail.com", "timisbusy@gmail.com" ],
"contributors": [
"mattwalters5@gmail.com",
"timisbusy@gmail.com"
],
"name": "servicebus",
"description": "Simple service bus for sending events between processes using amqp.",
"version": "0.1.0",
"version": "0.2.0",
"homepage": "https://github.com/mateodelnorte/servicebus",

@@ -14,13 +16,15 @@ "repository": {

"engines": {
"node": "~0.6.x"
"node": "0.6 || 0.8 || 0.9 || 0.10"
},
"dependencies": {
"amqp": "0.1.3",
"node-uuid": "1.3.3"
"amqp": "0.1.7",
"node-uuid": "1.4.0",
"debug": "~0.7.2"
},
"devDependencies": {
"mocha": ">=1.2.1",
"should": "0.6.3"
"longjohn": "~0.2.0",
"mocha": ">=1.2.1",
"should": "0.6.3"
},
"optionalDependencies": {}
}

@@ -59,2 +59,80 @@ # servicebus

bus.publish('my.event', { my: 'event' });
}, 1000);
}, 1000);
# Middleware
Servicebus allows for middleware packages to enact behavior at the time a message is sent or received. They are very similar to connect middleware in their usage:
```
if ( ! process.env.RABBITMQ_URL)
throw new Error('Tests require a RABBITMQ_URL environment variable to be set, pointing to the RabbiqMQ instance you wish to use.');
var busUrl = process.env.RABBITMQ_URL
var bus = require('../').bus({ url: busUrl });
bus.use(bus.package());
bus.use(bus.correlate());
bus.use(bus.log());
bus.use(bus.retry());
module.exports.bus = bus;
```
Middleware may defined one or two functions to modify incoming or outgoing messages:
```
...
function logIncoming (queueName, message, next) {
log('received ' + util.inspect(message));
next(null, queueName, message);
}
function logOutgoing (message, headers, deliveryInfo, messageHandle, options, next) {
log('sending ' + util.inspect(message));
next(null, message, headers, deliveryInfo, messageHandle, options);
}
return {
handleIncoming: logIncoming,
handleOutgoing: logOutgoing
};
```
handleIncoming pipelines behavior to be enacted on an incoming message. handleOutgoing pipelines behavior to be enacted on an outgoing message. To say that the behavior is pipelined is to say that each middleware is called in succession, allowing each to enact its behavior before the next. (in from protocol->servicebus->middleware 1->middleware 2->servicebus->user code)
## Included Middleware
### Correlate
Correlate simply adds a .cid (Correlation Identity) property to any outgoing message that doesn't already have one. This is useful for following messages in logs across services.
### Log
Log ensures that incoming and outgoing messages are logged to stdout via the debug module.
### Package
Package repackages outgoing messages, encapsulating the original message as a .data property and adding additional properties for information like message type and datetime sent:
```
// bus.publish('my:event', { my: 'event' });
{
my: 'event'
};
```
becomes
```
{
data: {
my: 'event'
}
, datetime: 'Wed, 04 Sep 2013 19:31:11 GMT'
, type: 'my:event'
};
```
### Retry
Retry provides ability to specify a max number of times an erroring message will be retried before being placed on an error queue. The retry middleware requires the correlate middleware.
var noop = function () {};
var log = { debug: noop, info: noop, warn: noop, error: noop };
var bus = require('../bus/bus').bus({ log: log });
var log = require('debug')('servicebus:test')
var bus = require('./bus-shim').bus;

@@ -15,3 +15,3 @@ describe('servicebus', function(){

bus.publish('my.event.11', { my: 'event' });
}, 10);
}, 100);
});

@@ -23,2 +23,3 @@

count++;
log('received my.event.12 ' + count + ' times');
if (count === 4) {

@@ -45,7 +46,7 @@ done();

bus.publish('my.event.12', { my: 'event' });
}, 10);
}, 100);
});
it('can handle high event throughput', function(done){
var count = 0, endCount = 10000;
var count = 0, endCount = 5000;
function tryDone(){

@@ -58,2 +59,3 @@ count++;

bus.subscribe('my.event.13', function (event) {
log('received my.event.13 ' + count + ' times');
tryDone();

@@ -65,6 +67,6 @@ });

};
}, 10);
}, 100);
});
it('sends subsequent messages only after previous messages are acknowledged', function(done){
it('sends subsequent messages only after previous messages are acknowledged', function (done){
var count = 0;

@@ -75,9 +77,11 @@ var interval = setInterval(function checkDone () {

clearInterval(interval);
} else {
console.log('not done yet!');
}
}, 10);
bus.subscribe('my.event.14', { ack: true }, function (event, handle) {
}
//else {
// console.log('not done yet!');
// }
}, 100);
bus.subscribe('my.event.14', { ack: true }, function (event) {
count++;
handle.ack();
log('received my.event.14 ' + count + ' times');
event.handle.ack();
});

@@ -89,25 +93,6 @@ setTimeout(function () {

bus.publish('my.event.14', { my: 'event' });
}, 10);
}, 100);
});
it('rejected messages should retry until max retries', function(done){
var count = 0;
var interval = setInterval(function checkDone () {
if (count === 4) {
done();
clearInterval(interval);
} else {
console.log('not done yet!');
}
}, 10);
bus.subscribe('my.event.15', { ack: true }, function (event, handle) {
count++;
handle.reject();
});
setTimeout(function () {
bus.publish('my.event.15', { my: 'event' });
}, 10);
});
});
});
var cp = require('child_process');
var noop = function () {};
var log = { debug: noop, info: noop, warn: noop, error: noop };
var bus = require('../bus/bus').bus({ log: log });
var bus = require('./bus-shim').bus;
var should = require('should');

@@ -11,3 +10,3 @@

it('should cause message to be received by listen', function(done){
it('should cause message to be received by listen', function (done){
var count = 0;

@@ -22,4 +21,2 @@ function tryDone(){

var sender = cp.fork(__dirname + '/child_processes/sender.js');
bus.listen('event.22', function (event) {

@@ -29,92 +26,7 @@ tryDone();

var sender = cp.fork(__dirname + '/child_processes/.sender.js');
});
// it('should distribute out to subsequent listeners when multiple listening', function(done){
// var count = 0;
// function tryDone(){
// count++;
// if (count === 4) {
// done();
// }
// }
// bus.listen('my.event.2', function (event) {
// tryDone();
// });
// bus.listen('my.event.2', function (event) {
// tryDone();
// });
// bus.listen('my.event.2', function (event) {
// tryDone();
// });
// bus.listen('my.event.2', function (event) {
// tryDone();
// });
// setTimeout(function () {
// bus.send('my.event.2', { my: 'event' });
// bus.send('my.event.2', { my: 'event' });
// bus.send('my.event.2', { my: 'event' });
// bus.send('my.event.2', { my: 'event' });
// }, 10);
// });
// it('can handle high event throughput', function(done){
// var count = 0, endCount = 10000;
// function tryDone(){
// count++;
// if (count > endCount) {
// done();
// }
// }
// bus.listen('my.event.3', function (event) {
// tryDone();
// });
// setTimeout(function () {
// for(var i = 0; i <= endCount; ++i){
// bus.send('my.event.3', { my: 'event' });
// };
// }, 10);
// });
// it('sends subsequent messages only after previous messages are acknowledged', function(done){
// var count = 0;
// var interval = setInterval(function checkDone () {
// if (count === 4) {
// done();
// clearInterval(interval);
// } else {
// console.log('not done yet!');
// }
// }, 10);
// bus.listen('my.event.4', { ack: true }, function (event, handle) {
// count++;
// handle.ack();
// });
// setTimeout(function () {
// bus.send('my.event.4', { my: 'event' });
// bus.send('my.event.4', { my: 'event' });
// bus.send('my.event.4', { my: 'event' });
// bus.send('my.event.4', { my: 'event' });
// }, 10);
// });
// it('rejected messages should retry until max retries', function(done){
// var count = 0;
// var interval = setInterval(function checkDone () {
// if (count === 4) {
// done();
// clearInterval(interval);
// } else {
// console.log('not done yet!');
// }
// }, 10);
// bus.listen('my.event.5', { ack: true }, function (event, handle) {
// count++;
// handle.reject();
// });
// setTimeout(function () {
// bus.send('my.event.5', { my: 'event' });
// }, 10);
// });
});
})
var noop = function () {};
var log = { debug: noop, info: noop, warn: noop, error: noop };
var bus = require('../bus/bus').bus({ log : log });
var log = require('debug')('servicebus:test');
var bus = require('./bus-shim').bus;
var util = require('util');
describe('servicebus', function(){
describe('#send & #listen', function(){
describe('#send & #listen', function() {

@@ -47,3 +48,3 @@ it('should cause message to be received by listen', function(done){

it('can handle high event throughput', function(done){
var count = 0, endCount = 10000;
var count = 0, endCount = 5000;
function tryDone(){

@@ -62,3 +63,3 @@ count++;

};
}, 10);
}, 100);
});

@@ -73,8 +74,8 @@

} else {
console.log('not done yet!');
// log('not done yet!');
}
}, 10);
bus.listen('my.event.4', { ack: true }, function (event, handle) {
bus.listen('my.event.4', { ack: true }, function (event) {
count++;
handle.ack();
event.handle.ack();
});

@@ -92,22 +93,3 @@ setTimeout(function () {

it('rejected messages should retry until max retries', function(done){
var count = 0;
var interval = setInterval(function checkDone () {
if (count === 4) {
done();
clearInterval(interval);
} else {
console.log('not done yet!');
}
}, 10);
bus.listen('my.event.5', { ack: true }, function (event, handle) {
count++;
handle.reject();
});
setTimeout(function () {
bus.send('my.event.5', { my: 'event' });
}, 10);
});
});
})

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc