Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

servicebus

Package Overview
Dependencies
Maintainers
2
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

servicebus - npm Package Compare versions

Comparing version 1.0.1 to 1.0.6

29

bus/rabbitmq/bus.js

@@ -160,6 +160,4 @@ var amqp = require('amqplib'),

this.initialized.done(function() {
function _send (queueName, message, options) {
self.setOptions(queueName, options);
if (self.queues[options.queueName] === undefined) {

@@ -171,4 +169,13 @@ self.queues[options.queueName] = new Queue(options);

});
}
});
if ( ! this.initialized.isFulfilled()) {
self.initialized.done(function() {
_send(queueName, message, options);
});
} else {
_send(queueName, message, options);
}
};

@@ -208,6 +215,4 @@

this.initialized.done(function() {
function _publish (queueName, message, options) {
self.setOptions(queueName, options);
if (self.pubsubqueues[options.queueName] === undefined) {

@@ -221,7 +226,13 @@ self.log('creating pubsub queue ' + options.queueName);

});
}
});
if ( ! this.initialized.isFulfilled()) {
this.initialized.done(function() {
_publish(queueName, message, options);
});
} else {
_publish(queueName, message, options);
}
};
module.exports.Bus = RabbitMQBus;
var events = require('events');
var extend = require('extend');
var newId = require('node-uuid').v4;
var Promise = require('bluebird');
var util = require('util');

@@ -69,42 +70,55 @@

this.correlator.queueName(options, function (err, uniqueName) {
if (err) throw err;
self.listenChannel.assertQueue(uniqueName, self.queueOptions)
.then(function (qok) {
return self.listenChannel.bindQueue(uniqueName, self.exchangeName, self.routingKey || self.queueName);
}).then(function (queue) {
self.listenChannel.consume(uniqueName, function (message) {
/*
Note from http://www.squaremobius.net/amqp.node/doc/channel_api.html
& http://www.rabbitmq.com/consumer-cancel.html:
var initialized = new Promise(function (resolve, reject) {
self.correlator.queueName(options, function (err, uniqueName) {
if (err) throw err;
self.listenChannel.assertQueue(uniqueName, self.queueOptions)
.then(function (qok) {
return self.listenChannel.bindQueue(uniqueName, self.exchangeName, self.routingKey || self.queueName);
}).then(function () {
if (self.ack) {
self.log('asserting error queue ' + self.errorQueueName);
self.listenChannel.assertQueue(self.errorQueueName, self.queueOptions)
.then(function (_qok) {
resolve(uniqueName);
});
} else {
resolve(uniqueName);
}
});
});
});
If the consumer is cancelled by RabbitMQ, the message callback will be invoked with null.
*/
if (message === null) {
return;
initialized.then(function (uniqueName) {
self.listenChannel.consume(uniqueName, function (message) {
/*
Note from http://www.squaremobius.net/amqp.node/doc/channel_api.html
& http://www.rabbitmq.com/consumer-cancel.html:
If the consumer is cancelled by RabbitMQ, the message callback will be invoked with null.
*/
if (message === null) {
return;
}
// todo: map contentType to default formatters
message.content = options.formatter.deserialize(message.content);
options.queueType = 'pubsubqueue';
self.bus.handleIncoming(self.listenChannel, message, options, function (channel, message, options) {
// amqplib intercepts errors and closes connections before bubbling up
// to domain error handlers when they occur non-asynchronously within
// callback. Therefore, if there is a process domain, we try-catch to
// redirect the error, assuming the domain creator's intentions.
try {
callback(message.content, message);
} catch (err) {
if (process.domain && process.domain.listeners('error')) {
process.domain.emit('error', err);
} else {
self.emit('error', err);
}
// todo: map contentType to default formatters
message.content = options.formatter.deserialize(message.content);
options.queueType = 'pubsubqueue';
self.bus.handleIncoming(self.listenChannel, message, options, function (channel, message, options) {
// amqplib intercepts errors and closes connections before bubbling up
// to domain error handlers when they occur non-asynchronously within
// callback. Therefore, if there is a process domain, we try-catch to
// redirect the error, assuming the domain creator's intentions.
try {
callback(message.content, message);
} catch (err) {
if (process.domain && process.domain.listeners('error')) {
process.domain.emit('error', err);
} else {
self.emit('error', err);
}
}
});
}, { noAck: ! self.ack })
.then(function (ok) {
self.subscription = { consumerTag: ok.consumerTag };
});
}
});
}, { noAck: ! self.ack })
.then(function (ok) {
self.subscription = { consumerTag: ok.consumerTag };
});
});

@@ -111,0 +125,0 @@

@@ -10,3 +10,3 @@ {

"description": "Simple service bus for sending events between processes using amqp.",
"version": "1.0.1",
"version": "1.0.6",
"homepage": "https://github.com/mateodelnorte/servicebus",

@@ -13,0 +13,0 @@ "repository": {

@@ -102,3 +102,3 @@ var noop = function () {};

setTimeout(function () {
bus.publish('my.event.15', { data: Math.random() }, { ack: true });
bus.publish('my.event.15', { data: Math.random() });
}, 1000);

@@ -105,0 +105,0 @@ });

@@ -147,7 +147,11 @@ var noop = function () {};

bus.listen('my.event.18', { ack: true }, function (event) {
done(new Error('should not receive events after destroy'));
event.handle.ack();
// commence ugliest hack ever to get around rabbitmq queue consumer rules
setTimeout(function () {
done(new Error('should not receive events after destroy'));
}, 500);
});
setTimeout(function () {
bus.destroyListener('my.event.18').on('success', function () {
bus.send('my.event.18', { test: 'data'});
bus.send('my.event.18', { test: 'data'}, { ack: true, expiration: 100 });
setTimeout(done, 100);

@@ -154,0 +158,0 @@ });

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc