servicebus
Advanced tools
Comparing version
@@ -33,2 +33,3 @@ var util = require('util'), | ||
this.connection.on('ready', function () { | ||
self.initialized = true; | ||
self.log.debug("rabbitmq connected to " + self.connection.serverProperties.product); | ||
@@ -51,3 +52,3 @@ }); | ||
var self = this; | ||
this.log.debug('calling listen dog: ', queueName); | ||
if (typeof options === "function") { | ||
@@ -65,14 +66,8 @@ callback = options; | ||
} else { | ||
var relisten = function() { | ||
self.initialized = true; | ||
self.listen(queueName, options, callback); | ||
}; | ||
var timeout = function(){ | ||
self.connection.removeListener('ready', relisten); | ||
process.nextTick(relisten); | ||
}; | ||
var timeoutId = setTimeout(timeout, self.delayOnStartup); | ||
self.connection.on('ready', function() { | ||
clearTimeout(timeoutId); | ||
process.nextTick(relisten); | ||
self.log.debug('penis'); | ||
process.nextTick(function() { | ||
self.initialized = true; | ||
self.listen(queueName, options, callback); | ||
}); | ||
}); | ||
@@ -125,14 +120,7 @@ } | ||
} else { | ||
var resubscribe = function() { | ||
self.initialized = true; | ||
self.subscribe(queueName, options, callback); | ||
}; | ||
var timeout = function() { | ||
self.connection.removeListener('ready', resubscribe); | ||
process.nextTick(resubscribe); | ||
}; | ||
var timeoutId = setTimeout(timeout, 1000); | ||
self.connection.on('ready', function() { | ||
clearTimeout(timeoutId); | ||
process.nextTick(resubscribe); | ||
process.nextTick(function() { | ||
self.initialized = true; | ||
self.subscribe(queueName, options, callback); | ||
}); | ||
}); | ||
@@ -139,0 +127,0 @@ } |
@@ -47,3 +47,3 @@ var events = require('events'), | ||
} | ||
} | ||
}; | ||
@@ -54,16 +54,24 @@ function PubSubQueue (connection, queueName, options) { | ||
this.errorQueueName = queueName + '.error'; | ||
this.exchange = connection.exchange('amq.topic', { type: 'topic', durable: true, autoDelete: false }); | ||
this.log = options.log; | ||
this.maxRetries = options.maxRetries || 3; | ||
this.queueName = queueName; | ||
this.rejected = {}; | ||
this.rejected = {}; | ||
var self = this; | ||
} | ||
connection.exchange('amq.topic', { type: 'topic', durable: true, autoDelete: false }, function (exchange) { | ||
self.exchange = exchange; | ||
}); | ||
}; | ||
PubSubQueue.prototype.publish = function publish (event) { | ||
var self = this; | ||
this.log.debug('publishing to exchange ' + self.exchange.name + ' ' + self.queueName + ' event ' + util.inspect(event)); | ||
self.exchange.publish(self.queueName, event, { contentType: 'application/json', deliveryMode: 2 }); | ||
if ( ! this.exchange) { | ||
this.connection.on('ready', function () { | ||
self.publish(event); | ||
}); | ||
} else { | ||
this.log.debug('publishing to exchange ' + self.exchange.name + ' ' + self.queueName + ' event ' + util.inspect(event)); | ||
process.nextTick(function () { | ||
self.exchange.publish(self.queueName, event, { contentType: 'application/json', deliveryMode: 2 }); | ||
}); | ||
} | ||
}; | ||
@@ -70,0 +78,0 @@ |
@@ -71,6 +71,9 @@ var events = require('events'), | ||
Queue.prototype.send = function send (event) { | ||
var self = this; | ||
this.log.debug('sending to queue ' + this.queueName + ' event ' + util.inspect(event)); | ||
this.connection.publish(this.queueName, event, { contentType: 'application/json', deliveryMode: 2 }); | ||
process.nextTick(function () { | ||
self.connection.publish(self.queueName, event, { contentType: 'application/json', deliveryMode: 2 }); | ||
}); | ||
}; | ||
module.exports = Queue; |
@@ -7,3 +7,3 @@ { | ||
"description": "Simple service bus for sending events between processes using amqp.", | ||
"version": "0.0.3", | ||
"version": "0.0.4", | ||
"homepage": "https://github.com/mateodelnorte/servicebus", | ||
@@ -10,0 +10,0 @@ "repository": { |
@@ -7,2 +7,2 @@ var noop = function () {}; | ||
bus.publish('event.21', { event: 1 }); | ||
}, 100); | ||
}, 1000); |
@@ -7,2 +7,2 @@ var noop = function () {}; | ||
bus.send('event.22', { event: 1 }); | ||
}, 100); | ||
}, 5000); |
var noop = function () {}; | ||
var log = { debug: noop, info: noop, warn: noop, error: noop }; | ||
var bus = require('../bus/bus').bus({ log: log }); | ||
var bus = require('../bus/bus').bus({ log : log }); | ||
@@ -79,6 +79,9 @@ describe('servicebus', function(){ | ||
setTimeout(function () { | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
//process.nextTick(function () { | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
bus.send('my.event.4', { my: 'event' }); | ||
//}); | ||
}, 10); | ||
@@ -85,0 +88,0 @@ }); |
Sorry, the diff of this file is not supported yet
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
705
0.57%1
-50%26646
-0.09%