You're Invited: Meet the Socket team at BSidesSF and RSAC - April 27 - May 1.RSVP
Socket
Sign inDemoInstall
Socket

servicebus

Package Overview
Dependencies
Maintainers
2
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

servicebus - npm Package Compare versions

Comparing version

to
0.0.4

34

bus/bus.js

@@ -33,2 +33,3 @@ var util = require('util'),

this.connection.on('ready', function () {
self.initialized = true;
self.log.debug("rabbitmq connected to " + self.connection.serverProperties.product);

@@ -51,3 +52,3 @@ });

var self = this;
this.log.debug('calling listen dog: ', queueName);
if (typeof options === "function") {

@@ -65,14 +66,8 @@ callback = options;

} else {
var relisten = function() {
self.initialized = true;
self.listen(queueName, options, callback);
};
var timeout = function(){
self.connection.removeListener('ready', relisten);
process.nextTick(relisten);
};
var timeoutId = setTimeout(timeout, self.delayOnStartup);
self.connection.on('ready', function() {
clearTimeout(timeoutId);
process.nextTick(relisten);
self.log.debug('penis');
process.nextTick(function() {
self.initialized = true;
self.listen(queueName, options, callback);
});
});

@@ -125,14 +120,7 @@ }

} else {
var resubscribe = function() {
self.initialized = true;
self.subscribe(queueName, options, callback);
};
var timeout = function() {
self.connection.removeListener('ready', resubscribe);
process.nextTick(resubscribe);
};
var timeoutId = setTimeout(timeout, 1000);
self.connection.on('ready', function() {
clearTimeout(timeoutId);
process.nextTick(resubscribe);
process.nextTick(function() {
self.initialized = true;
self.subscribe(queueName, options, callback);
});
});

@@ -139,0 +127,0 @@ }

@@ -47,3 +47,3 @@ var events = require('events'),

}
}
};

@@ -54,16 +54,24 @@ function PubSubQueue (connection, queueName, options) {

this.errorQueueName = queueName + '.error';
this.exchange = connection.exchange('amq.topic', { type: 'topic', durable: true, autoDelete: false });
this.log = options.log;
this.maxRetries = options.maxRetries || 3;
this.queueName = queueName;
this.rejected = {};
this.rejected = {};
var self = this;
}
connection.exchange('amq.topic', { type: 'topic', durable: true, autoDelete: false }, function (exchange) {
self.exchange = exchange;
});
};
PubSubQueue.prototype.publish = function publish (event) {
var self = this;
this.log.debug('publishing to exchange ' + self.exchange.name + ' ' + self.queueName + ' event ' + util.inspect(event));
self.exchange.publish(self.queueName, event, { contentType: 'application/json', deliveryMode: 2 });
if ( ! this.exchange) {
this.connection.on('ready', function () {
self.publish(event);
});
} else {
this.log.debug('publishing to exchange ' + self.exchange.name + ' ' + self.queueName + ' event ' + util.inspect(event));
process.nextTick(function () {
self.exchange.publish(self.queueName, event, { contentType: 'application/json', deliveryMode: 2 });
});
}
};

@@ -70,0 +78,0 @@

@@ -71,6 +71,9 @@ var events = require('events'),

Queue.prototype.send = function send (event) {
var self = this;
this.log.debug('sending to queue ' + this.queueName + ' event ' + util.inspect(event));
this.connection.publish(this.queueName, event, { contentType: 'application/json', deliveryMode: 2 });
process.nextTick(function () {
self.connection.publish(self.queueName, event, { contentType: 'application/json', deliveryMode: 2 });
});
};
module.exports = Queue;

@@ -7,3 +7,3 @@ {

"description": "Simple service bus for sending events between processes using amqp.",
"version": "0.0.3",
"version": "0.0.4",
"homepage": "https://github.com/mateodelnorte/servicebus",

@@ -10,0 +10,0 @@ "repository": {

@@ -7,2 +7,2 @@ var noop = function () {};

bus.publish('event.21', { event: 1 });
}, 100);
}, 1000);

@@ -7,2 +7,2 @@ var noop = function () {};

bus.send('event.22', { event: 1 });
}, 100);
}, 5000);
var noop = function () {};
var log = { debug: noop, info: noop, warn: noop, error: noop };
var bus = require('../bus/bus').bus({ log: log });
var bus = require('../bus/bus').bus({ log : log });

@@ -79,6 +79,9 @@ describe('servicebus', function(){

setTimeout(function () {
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
//process.nextTick(function () {
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
bus.send('my.event.4', { my: 'event' });
//});
}, 10);

@@ -85,0 +88,0 @@ });

Sorry, the diff of this file is not supported yet