Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

servicebus

Package Overview
Dependencies
Maintainers
2
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

servicebus - npm Package Compare versions

Comparing version 0.4.1 to 0.4.3

10

bus/bus.js

@@ -0,6 +1,12 @@

var EventEmitter = require('events').EventEmitter;
var util = require('util');
function Bus () {
this.incomingMiddleware = [];
this.outgoingMiddleware = [];
EventEmitter.call(this);
}
util.inherits(Bus, EventEmitter);
Bus.prototype.use = function (middleware) {

@@ -17,2 +23,4 @@ if (middleware.handleIncoming) this.incomingMiddleware.push(middleware.handleIncoming);

function next (err) {
if (err) throw err; // at this point we don't have a mechanism for providing an error-aware callback to sends and publishes,
// so we'll throw. in the future we can check for the presense of one and throw if it's not provided

@@ -46,2 +54,4 @@ var layer;

function next (err) {
if (err) throw err; // at this point we don't have a mechanism for providing a callback to sends and publishes,
// so we'll throw. in the future we can check for the presense of one and throw if it's not provided

@@ -48,0 +58,0 @@ var layer;

74

bus/rabbitmq/bus.js
var amqp = require('amqp'),
Bus = require('../bus'),
Correlator = require('./correlator'),
log = require('debug')('servicebus'),

@@ -18,3 +19,4 @@ events = require('events'),

options.vhost = options.vhost || process.env.RABBITMQ_VHOST || '/';
this.correlator = new Correlator(options);
this.delayOnStartup = options.delayOnStartup || 10;

@@ -24,2 +26,3 @@ this.log = options.log || log;

this.queues = {};
this.queuesFile = options.queuesFile;

@@ -34,4 +37,11 @@ log('connecting to rabbitmq on ' + options.url);

self.log('rabbitmq connection closed.');
self.emit('close');
});
this.connection.on('error', function (err) {
// if you don't want servicebus to crash on error, you'll need to listen on the
// bus' error event, log or do whatever you like.
self.emit('error', err);
});
this.initialized = new Promise(function (resolve, reject) {

@@ -43,2 +53,3 @@

self.log("rabbitmq connected to " + self.connection.serverProperties.product);
self.emit('ready');
resolve();

@@ -49,3 +60,3 @@ });

self.log('Error connecting to rabbitmq at ' + options.url + ' error: ' + err.toString());
throw err;
self.emit('error', err);
});

@@ -69,9 +80,12 @@

this.setOptions(queueName, options);
this.initialized.done(function() {
if (self.queues[queueName] === undefined) {
log('creating queue ' + queueName);
self.queues[queueName] = new Queue({ bus: self, connection: self.connection, queueName: queueName, log: self.log });
if (self.queues[options.queueName] === undefined) {
log('creating queue ' + options.queueName);
self.queues[options.queueName] = new Queue(options);
}
self.queues[queueName].listen(callback, options);
self.queues[options.queueName].listen(callback, options);
});

@@ -97,2 +111,20 @@

RabbitMQBus.prototype.setOptions = function (queueName, options) {
if (typeof queueName === 'object') {
options.queueName = queueName.queueName;
options.routingKey = queueName.routingKey;
queueName = queueName.queueName;
} else {
options.queueName = queueName;
}
extend(options, {
bus: this,
connection: this.connection,
correlator: this.correlator,
log: this.log,
queuesFile: this.queuesFile
});
}
RabbitMQBus.prototype.send = function send (queueName, message, options) {

@@ -102,8 +134,9 @@ var self = this;

this.setOptions(queueName, options);
this.initialized.done(function() {
if (self.queues[queueName] === undefined) {
extend(options, { bus: self, connection: self.connection, queueName: queueName, log: self.log });
self.queues[queueName] = new Queue(options);
if (self.queues[options.queueName] === undefined) {
self.queues[options.queueName] = new Queue(options);
}
self.handleOutgoing(queueName, message, function (queueName, message) {
self.handleOutgoing(options.queueName, message, function (queueName, message) {
log('sending to queue ' + queueName + ' event ' + util.inspect(message));

@@ -124,2 +157,4 @@ self.queues[queueName].send(message);

this.setOptions(queueName, options);
var handle = null;

@@ -131,6 +166,6 @@ function _unsubscribe (options) {

this.initialized.done(function() {
if (self.pubsubqueues[queueName] === undefined) {
self.pubsubqueues[queueName] = new PubSubQueue({ bus: self, connection: self.connection, queueName: queueName, log: self.log });
if (self.pubsubqueues[options.queueName] === undefined) {
self.pubsubqueues[options.queueName] = new PubSubQueue(options);
}
handle = self.pubsubqueues[queueName].subscribe(callback, options);
handle = self.pubsubqueues[options.queueName].subscribe(options, callback);
});

@@ -144,12 +179,15 @@

RabbitMQBus.prototype.publish = function publish (queueName, message) {
RabbitMQBus.prototype.publish = function publish (queueName, message, options) {
var self = this;
options = options || {};
this.setOptions(queueName, options);
this.initialized.done(function() {
if (self.pubsubqueues[queueName] === undefined) {
log('creating pubsub queue ' + queueName);
self.pubsubqueues[queueName] = new PubSubQueue({ bus: self, connection: self.connection, queueName: queueName, log: self.log });
if (self.pubsubqueues[options.queueName] === undefined) {
log('creating pubsub queue ' + options.queueName);
self.pubsubqueues[options.queueName] = new PubSubQueue(options);
}
self.handleOutgoing(queueName, message, function (queueName, message) {
self.handleOutgoing(options.queueName, message, function (queueName, message) {
log('sending to queue ' + queueName + ' event ' + util.inspect(message));

@@ -156,0 +194,0 @@ self.pubsubqueues[queueName].publish(message);

var events = require('events'),
fs = require('fs'),
newId = require('node-uuid'),
path = require('path'),
Promise = require('bluebird'),
QueueRegistry = require('./queueregistry'),
util = require('util');
function Correlator () {
events.EventEmitter.call(this);
var queues = {};
function Correlator (options) {
var self = this;
this.ready = false;
this.registry = new QueueRegistry();
this.registry.getCurrentQueues(function (err, queues) {
self.ready = true;
self.emit('ready');
this.filename = (options && options.queuesFile) ? path.join(process.cwd(), options.queuesFile) : path.join(process.cwd(), '.queues');
this.loading = new Promise(function (resolve, reject) {
var result;
fs.readFile(self.filename, function (err, buf) {
if (err) {
return resolve({});
}
try {
result = JSON.parse(buf.toString());
} catch (err) {
result = {};
} finally {
resolve(result);
}
});
});
events.EventEmitter.call(this);
}

@@ -19,33 +35,25 @@

Correlator.prototype.getUniqueId = function getUniqueId (queueName, subscriptionId, callback) {
var id,
self = this;
if ( ! subscriptionId) {
id = queueName + '.' + newId();
return callback(null, id);
}
function getIdFromQueueData () {
if( ! self.registry.queues[subscriptionId]) {
id = queueName + '.' + newId();
self.registry.queues[subscriptionId] = id;
self.registry.setCurrentQueues(self.registry.queues, function (err) {
if (err) callback(err);
else callback(null, id);
});
Correlator.prototype.queueName = function queueName (options, callback) {
var self = this;
this.loading.done(function (result) {
queues = result;
var queueName;
if (queues.hasOwnProperty(options.queueName)) {
queueName = queues[options.queueName];
} else {
id = self.registry.queues[subscriptionId];
callback(null, id);
queueName = util.format('%s.%s', options.queueName, newId());
queues[options.queueName] = queueName;
}
}
if (this.ready) {
getIdFromQueueData();
} else {
this.on('ready', function () {
getIdFromQueueData();
self.persistQueueFile(function (err) {
if (err) return callback(err);
callback(null, queueName);
});
}
});
};
Correlator.prototype.persistQueueFile = function (callback) {
var contents = JSON.stringify(queues);
fs.writeFile(this.filename, contents, callback);
}
module.exports = Correlator;

@@ -1,3 +0,2 @@

var Correlator = require('./correlator'),
events = require('events'),
var events = require('events'),
newId = require('node-uuid').v4,

@@ -10,4 +9,4 @@ Serializer = require('./serializer'),

this.connection = options.connection;
this.correlator = new Correlator();
// this.errorQueueName = options.queueName + '.error';
this.correlator = options.correlator;
this.errorQueueName = options.queueName + '.error';
this.log = options.log;

@@ -32,3 +31,3 @@ this.maxRetries = options.maxRetries || 3;

this.log('publishing to exchange ' + self.exchange.name + ' ' + self.queueName + ' event ' + util.inspect(event));
process.nextTick(function () {
setImmediate(function () {
self.exchange.publish(self.queueName, event, { contentType: 'application/json', deliveryMode: 2 });

@@ -39,3 +38,3 @@ });

PubSubQueue.prototype.subscribe = function subscribe (callback, options) {
PubSubQueue.prototype.subscribe = function subscribe (options, callback) {
var self = this,

@@ -47,12 +46,12 @@ uniqueName,

// if (options && options.ack) {
// queueOptions.durable = true;
// queueOptions.autoDelete = false;
// self.connection.queue(self.errorQueueName, queueOptions, function (q) {
// q.bind(self.exchange, self.errorQueueName);
// q.on('queueBindOk', function() {
// self.log('bound to ' + self.errorQueueName);
// });
// });
// }
if (options && options.ack) {
queueOptions.durable = true;
queueOptions.autoDelete = false;
self.connection.queue(self.errorQueueName, queueOptions, function (q) {
q.bind(self.exchange, self.errorQueueName);
q.on('queueBindOk', function() {
self.log('bound to ' + self.errorQueueName);
});
});
}

@@ -63,6 +62,4 @@ var queue;

}
this.correlator.getUniqueId(self.queueName, options.subscriptionId, function (err, _id) {
this.correlator.queueName(options, function (err, uniqueName) {
if (err) throw err;
uniqueName = _id;
self.connection.queue(uniqueName, queueOptions, function (q) {

@@ -69,0 +66,0 @@ queue = q;

@@ -1,5 +0,15 @@

var events = require('events'),
util = require('util');
var EventEmitter = require('events').EventEmitter;
var extend = require('extend');
var Promise = require('bluebird');
var util = require('util');
function Queue(options) {
function Queue (options) {
var options = options || {};
var queueOptions = options.queueOptions || {};
extend(queueOptions, {
autoDelete: ! (options.ack || options.acknowledge),
durable: options.ack || options.acknowledge
});
this.bus = options.bus;

@@ -13,9 +23,49 @@ this.connection = options.connection;

this.rejected = {};
this.routingKey = options.routingKey;
this.contentType = options.contentType || 'application/json';
this.deliveryMode = (options.ack || options.acknowledge || options.persistent) ? 2 : 1; // default to non-persistent messages
this.deliveryMode = (options.ack || options.acknowledge || options.persistent)
? 2
: 1; // default to non-persistent messages
this.ack = (options.ack || options.acknowledge);
events.EventEmitter.call(this);
EventEmitter.call(this);
var self = this;
this.initialized = Promise.all([
// we're initialized when our queues are bound
new Promise(function (resolve, reject) {
self.log('connecting to queue ' + self.queueName);
self.queue = self.connection.queue(self.queueName, queueOptions, function () {
self.log('binding to routingKey ' + self.routingKey || self.queueName);
self.queue.bind(self.routingKey || self.queueName);
self.queue.on('queueBindOk', function() {
self.log('bound to queue ' + self.queueName);
resolve();
});
});
}),
new Promise(function (resolve, reject) {
if (self.ack) {
queueOptions.durable = true;
queueOptions.autoDelete = false;
self.errorQueue = self.connection.queue(self.queueName + '.error', queueOptions, function (eq) {
eq.bind(self.errorQueueName);
eq.on('queueBindOk', function () {
self.log('bound to ' + self.errorQueueName);
resolve();
});
});
} else {
resolve();
}
})
]).catch(function (err) {
self.log('error connecting to queue ', options.queueName, '. error: ' + err.toString());
self.emit('error', err);
});
}
util.inherits(Queue, events.EventEmitter);
util.inherits(Queue, EventEmitter);

@@ -32,27 +82,13 @@ Queue.prototype.error = function error (event) {

var self = this;
this.log('listening to queue ' + this.queueName + ' with options ' + util.inspect(options));
if (options && options.ack) {
queueOptions.durable = true;
queueOptions.autoDelete = false;
self.errorQueue = self.connection.queue(self.queueName + '.error', queueOptions, function(eq) {
eq.bind(self.errorQueueName);
eq.on('queueBindOk', function() {
self.log('bound to ' + self.errorQueueName);
this.initialized.done(function () {
self.queue.subscribe(options, function (message, headers, deliveryInfo, messageHandle) {
self.bus.handleIncoming(message, headers, deliveryInfo, messageHandle, options, function (message, headers, deliveryInfo, messageHandle, options) {
callback(message, headers, deliveryInfo, messageHandle, options);
});
}).on('success', function (subscription) {
self.subscription = subscription;
});
}
self.queue = this.connection.queue(this.queueName, queueOptions, function() {
self.queue.bind(self.queueName);
self.queue.on('queueBindOk', function() {
self.log('listening to queue ' + self.queueName + ' with options ' + util.inspect(options));
self.queue.subscribe(options, function (message, headers, deliveryInfo, messageHandle) {
self.bus.handleIncoming(message, headers, deliveryInfo, messageHandle, options, function (message, headers, deliveryInfo, messageHandle, options) {
callback(message, headers, deliveryInfo, messageHandle, options);
});
}).on('success', function (subscription) {
self.subscription = subscription;
});
self.initialized = true;
});
});

@@ -77,4 +113,4 @@ };

var self = this;
process.nextTick(function () {
self.connection.publish(self.queueName, event, {
this.initialized.done(function () {
self.connection.publish(self.routingKey || self.queueName, event, {
contentType: self.contentType,

@@ -81,0 +117,0 @@ deliveryMode: self.deliveryMode

var events = require('events'),
fs = require('fs'),
path = require('path'),
util = require('util');

@@ -8,3 +9,3 @@

function Serializer () {
this.filename = process.cwd() + '/.queues';
this.filename = path.join(process.cwd(), '.queues');
events.EventEmitter.call(this);

@@ -11,0 +12,0 @@ }

@@ -10,3 +10,3 @@ {

"description": "Simple service bus for sending events between processes using amqp.",
"version": "0.4.1",
"version": "0.4.3",
"homepage": "https://github.com/mateodelnorte/servicebus",

@@ -13,0 +13,0 @@ "repository": {

@@ -9,3 +9,3 @@ var noop = function () {};

it('should cause message to be received by subscribe', function(done){
it('should cause message to be received by subscribe', function (done){
bus.subscribe('my.event.11', function (event) {

@@ -19,3 +19,3 @@ done();

it('should fan out to when multiple listening', function(done){
it('should fan out to when multiple listening', function (done){
var count = 0;

@@ -49,3 +49,4 @@ function tryDone(){

it('can handle high event throughput', function(done){
it('can handle high event throughput', function (done){
this.timeout(30000);
var count = 0, endCount = 5000;

@@ -93,4 +94,8 @@ function tryDone(){

});
// it('should allow for a mixture of ack:true and ack:false subscriptions', function () {
// });
});
});

@@ -10,3 +10,3 @@ var noop = function () {};

it('should cause message to be received by listen', function(done){
it('should cause message to be received by listen', function (done){
bus.listen('my.event.1', function (event) {

@@ -20,3 +20,3 @@ done();

it('should distribute out to subsequent listeners when multiple listening', function(done){
it('should distribute out to subsequent listeners when multiple listening', function (done){
var count = 0;

@@ -49,3 +49,4 @@ function tryDone(){

it('can handle high event throughput', function(done){
it('can handle high event throughput', function (done){
this.timeout(30000);
var count = 0, endCount = 5000;

@@ -68,13 +69,11 @@ function tryDone(){

it('sends subsequent messages only after previous messages are acknowledged', function(done){
it('sends subsequent messages only after previous messages are acknowledged', function (done){
var count = 0;
var interval = setInterval(function checkDone () {
if (count === 4) {
clearInterval(interval);
bus.destroyListener('my.event.4').on('success', function () {
clearInterval(interval);
done();
});
} else {
// log('not done yet!');
}
}
}, 10);

@@ -86,11 +85,42 @@ bus.listen('my.event.4', { ack: true }, function (event) {

setTimeout(function () {
//process.nextTick(function () {
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
//});
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
}, 10);
});
// it('allows routing based on routingKey, on same named queue', function (done){
// var count = 0;
// var interval = setInterval(function checkDone () {
// if (count === 4) {
// clearInterval(interval);
// bus.destroyListener('my.event.routingKey').on('success', function () {
// done();
// });
// }
// }, 10);
// bus.listen('my.event.routingKey.1', { ack: true }, function (event) {
// count++;
// event.handle.ack();
// });
// bus.listen('my.event.routingKey.2', { ack: true }, function (event) {
// count++;
// event.handle.ack();
// });
// bus.listen('my.event.routingKey.3', { ack: true }, function (event) {
// count++;
// event.handle.ack();
// });
// bus.listen('my.event.routingKey.4', { ack: true }, function (event) {
// count++;
// event.handle.ack();
// });
// setTimeout(function () {
// bus.send({ queueName: 'my.event.routingKey', routingKey: 'my.event.routingKey.1' }, { my: 'event1' });
// bus.send({ queueName: 'my.event.routingKey', routingKey: 'my.event.routingKey.2' }, { my: 'event2' });
// bus.send({ queueName: 'my.event.routingKey', routingKey: 'my.event.routingKey.3' }, { my: 'event3' });
// bus.send({ queueName: 'my.event.routingKey', routingKey: 'my.event.routingKey.4' }, { my: 'event4' });
// }, 10);
// });

@@ -112,3 +142,3 @@ });

});
}, 2000);
}, 1500);
});

@@ -129,3 +159,3 @@

});
}, 2000);
}, 1500);
});

@@ -132,0 +162,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc