Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp-wrapper

Package Overview
Dependencies
Maintainers
1
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-wrapper - npm Package Compare versions

Comparing version 3.1.0 to 4.0.0

.travis.yml

223

amqp.js
var amqp = require('amqplib'),
Q = require('q'),
_ = require('lodash');
var exchange,
queueParams = {};
var PREFETCH = 10;
_ = require('lodash'),
queueSetup = require('./queue-setup');
// When we connect, we will remember the channel here:
var channel;
module.exports = function(config) {
if (!config || !config.url || !config.exchange) {
throw new Error('amqp-wrapper: Invalid config');
}
/**
* For publishing, we assert the queue is there and bind it to the routing
* key we are going to use.
*/
var setupForPublish = function(channel, queueParams) {
var setupPublishes = queueParams.publish.map(function(queue) {
return channel.assertQueue(queue.name, queue.options)
.then(function() {
return channel.bindQueue(queue.name, exchange, queue.routingKey);
});
});
return Q.all(setupPublishes);
};
var channel;
// For consuming, we only assert the queue is there.
var setupForConsume = function(channel, queueParams) {
channel.prefetch(PREFETCH);
return channel.assertQueue(queueParams.consume.name,
queueParams.consume.options);
};
config.prefetch = config.prefetch || 10;
if (process.env.NODE_ENV === 'test') {
exports.replaceSetupFuncs = function(consume, publish) {
setupForPublish = publish;
setupForConsume = consume;
};
exports.getSetupFuncs = function() {
return {publish: setupForPublish, consume: setupForConsume};
};
}
var ret = {
/**
* Passes the AMQP channel created to the callback.
*/
connect: function(cb) {
// amqp.connect throws on some error conditions, rather than resolving the
// promise. Hence the need for the try/catch.
try {
Q(amqp.connect(config.url))
.then(function(conn) {
return conn.createConfirmChannel();
})
.then(function(ch) {
channel = ch;
/**
* Passes the AMQP channel created to the callback.
*/
exports.connect = function(uri, exch, _queueParams, cb) {
queueParams = _queueParams;
exchange = exch;
// amqp.connect throws on some error conditions, rather than resolving the
// promise. Hence the need for the try/catch.
try {
Q(amqp.connect(uri))
.then(function(conn) {
return conn.createConfirmChannel();
})
.then(function(ch) {
channel = ch;
var assert_exchange = ch.assertExchange(exchange, 'topic');
var todo = assert_exchange;
if (queueParams.publish && queueParams.publish instanceof Array) {
todo = todo.then(setupForPublish(ch, queueParams));
var promise = ch.assertExchange(config.exchange, 'topic');
if (config.queues.publish && config.queues.publish instanceof Array) {
promise = promise.then(queueSetup.setupForPublish(ch, config));
}
if (config.queues.consume && config.queues.consume.name) {
promise = promise.then(queueSetup.setupForConsume(ch, config));
}
return promise;
}).nodeify(cb);
}
if (queueParams.consume && queueParams.consume.name) {
todo = todo.then(setupForConsume(ch, queueParams));
catch (e) {
console.log('Exception thrown in AMQP connection and setup.');
cb(e);
}
return todo;
}).nodeify(cb);
}
catch (e) {
console.log('Exception thrown in AMQP connection and setup.');
cb(e);
}
};
},
/**
* Publish a message to one of the AMQP queues specified on connect.
* @param {string} name The name of the queue to use.
* @param {string} The message to publish.
* @param {Function(err)} The callback to call when done.
*/
exports.publishToQueue = function(name, message, callback) {
if (typeof message === 'object') message = JSON.stringify(message);
var publishQueue = _.find(queueParams.publish, {'name': name});
channel.publish(exchange, publishQueue.routingKey, new Buffer(message),
{}, callback);
};
/**
* Publish a message to one of the AMQP queues specified on connect.
* @param {string} name The name of the queue to use.
* @param {string} The message to publish.
* @param {Function(err)} The callback to call when done.
*/
publishToQueue: function(name, message, callback) {
if (typeof message === 'object') message = JSON.stringify(message);
var publishQueue = _.find(config.queues.publish, {'name': name});
channel.publish(config.exchange, publishQueue.routingKey, new Buffer(message),
{}, callback);
},
/**
* Publish a message using the specified routing key.
* @param {string} name The name of the queue to use.
* @param {string} The message to publish.
* @param {Object} options Any options to pass through to the underlying
* publish.
* @param {Function(err)} The callback to call when done.
*/
exports.publish = function(routingKey, message, options, callback) {
if (typeof message === 'object') message = JSON.stringify(message);
channel.publish(exchange, routingKey, new Buffer(message), options, callback);
};
/**
* Publish a message using the specified routing key.
* @param {string} name The name of the queue to use.
* @param {string} The message to publish.
* @param {Object} options Any options to pass through to the underlying
* publish.
* @param {Function(err)} The callback to call when done.
*/
publish: function(routingKey, message, options, callback) {
if (typeof message === 'object') message = JSON.stringify(message);
channel.publish(config.exchange, routingKey, new Buffer(message), options, callback);
},
/**
* handleMessage() is expected to be of the form:
* handleMessage(parsedMessage, callback).
* If callback is called with a non-null error, then the message will be
* nacked. You can call it like:
* callback(err, requeue) in order
* to instruct rabbit whether to requeue the message (or discard/dead letter).
*
* If not given, requeue is assumed to be false.
*
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34
*/
exports.consume = function(handleMessage) {
function callback(message) {
function done(err, requeue) {
if (requeue === undefined) requeue = false;
if (err) return channel.nack(message, false, requeue);
channel.ack(message);
}
/**
* handleMessage() is expected to be of the form:
* handleMessage(parsedMessage, callback).
* If callback is called with a non-null error, then the message will be
* nacked. You can call it like:
* callback(err, requeue) in order
* to instruct rabbit whether to requeue the message (or discard/dead letter).
*
* If not given, requeue is assumed to be false.
*
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34
*/
consume: function(handleMessage) {
function callback(message) {
function done(err, requeue) {
if (requeue === undefined) requeue = false;
if (err) return channel.nack(message, false, requeue);
channel.ack(message);
}
try {
var messagePayload = message.content.toString();
var parsedPayload = JSON.parse(messagePayload);
handleMessage(parsedPayload, done);
try {
var messagePayload = message.content.toString();
var parsedPayload = JSON.parse(messagePayload);
handleMessage(parsedPayload, done);
}
catch (error) {
console.log(error);
// Do not requeue on exception - it means something unexpected (and prob.
// non-transitory) happened.
done(error, false);
}
}
channel.consume(config.queues.consume.name, callback, {noAck: false});
}
catch (error) {
console.log(error);
// Do not requeue on exception - it means something unexpected (and prob.
// non-transitory) happened.
done(error, false);
}
}
};
channel.consume(queueParams.consume.name, callback, {noAck: false});
return ret;
};
exports.prefetch = function(value) {
channel.prefetch(value);
};
if (process.env.NODE_ENV == 'test') {
exports.queueParams = function(_queueParams) {
queueParams = _queueParams;
};
}
// vim: set et sw=2 ts=2 colorcolumn=80:
{
"name": "amqp-wrapper",
"version": "3.1.0",
"version": "4.0.0",
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.",
"main": "amqp.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "grunt test"
},

@@ -29,10 +29,12 @@ "repository": {

"devDependencies": {
"expect.js": "^0.3.1",
"grunt": "~0.4.2",
"grunt-exec": "~0.4.2",
"grunt-env": "~0.4.0",
"grunt-contrib-jshint": "~0.7.2",
"grunt-contrib-watch": "~0.5.3",
"grunt-env": "~0.4.0",
"grunt-exec": "~0.4.2",
"grunt-mocha-cli": "~1.4.0",
"lodash": "^2.4.1",
"mocha-unfunk-reporter": "~0.3.7",
"chai": "~1.8.1",
"q": "^0.9.7",
"sandboxed-module": "~0.3.0",

@@ -39,0 +41,0 @@ "sinon": "~1.8.1"

@@ -15,23 +15,28 @@ amqp-wrapper

var queues = {
consume: {
name: process.env.AMQP_CONSUME,
options: {deadLetterExchange: process.env.AMQP_DEAD_LETTER_EXCHANGE}
var config = {
url: process.env.AMQP_URL,
exchange: process.env.AMQP_EXCHANGE,
queues: {
consume: {
name: process.env.AMQP_CONSUME,
options: {deadLetterExchange: process.env.AMQP_DEAD_LETTER_EXCHANGE}
},
publish: [
{
name: process.env.AMQP_RESPONSE,
routingKey: process.env.AMQP_RESPONSE_ROUTING_KEY,
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib.
},
{ // ...
}
]
},
publish: [
{
name: process.env.AMQP_RESPONSE,
routingKey: process.env.AMQP_RESPONSE_ROUTING_KEY,
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib.
},
{ // ...
}
]
// Set the QOS/prefetch.
prefetch: 100
};
AMQP.connect(process.env.AMQP_URL, process.env.AMQP_EXCHANGE,
queues, amqpConnectDone);
var amqp = AMQP(config);
// Set the QOS/prefetch.
AMQP.prefetch(100);
// Must call this before you consume/publish/etc...
amqp.connect(amqpConnectDone);

@@ -50,9 +55,9 @@ // Consuming

// Start consuming:
AMQP.consume(handleMessage);
amqp.consume(handleMessage);
// Publishing to one of the queues declared on connect.
AMQP.publishToQueue(name, payload, done);
amqp.publishToQueue(name, payload, done);
// Publishing to arbitrary routing key.
AMQP.publish(routingKey, payload, done);
amqp.publish(routingKey, payload, options, done);

@@ -59,0 +64,0 @@ If `payload` is an object, it will be turned into JSON.

var SandboxedModule = require('sandboxed-module'),
expect = require('chai').expect,
sinon = require('sinon'),
$ = require('lodash');
expect = require('expect.js'),
Sinon = require('sinon'),
AMQP = require('../amqp');
describe('AMQP', function() {
var AMQP;
beforeEach(function() {
AMQP = require('../amqp');
});
var config = {
url: 'amqp://guest:guest@localhost',
exchange: 'mytestexchange',
queues: {
consume: {
name: 'myconsumequeue'
},
publish: [
{
name: 'mypublishqueue',
routingKey: 'mypublishqueuerk'
}
]
}
};
describe('#connect', function() {
it('should call the callback successfully', function(done) {
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', {
consume: {
name: 'myconsumequeue'
},
publish: [
{
name: 'mypublishqueue',
routingKey: 'mypublishqueuerk'
}
]
}, done);
var amqp = AMQP(config);
amqp.connect(done);
});
it('should setup for publishing and consuming', function(done) {
AMQP.replaceSetupFuncs(sinon.spy(), sinon.spy());
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', {
consume: {name: 'myconsumequeue2'},
publish: [
{
name: 'mypublishqueue2',
routingKey: 'mypublishqueue2'
}
]
}, function(err) {
var queueSetup = require('../queue-setup');
var amqp = AMQP(config);
Sinon.stub(queueSetup, 'setupForConsume');
Sinon.stub(queueSetup, 'setupForPublish');
amqp.connect(function(err) {
if (err) return done(err);
expect(AMQP.getSetupFuncs().consume.calledOnce, 'setupForConsume()').to.
equal(true);
expect(AMQP.getSetupFuncs().publish.calledOnce, 'setupForPublish()').to.
equal(true);
expect(queueSetup.setupForConsume.calledOnce).to.be(true);
expect(queueSetup.setupForPublish.calledOnce).to.be(true);
done();

@@ -47,12 +42,6 @@ });

it('should call the callback successfully', function(done) {
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', {
publish: [
{
name: 'myqueue',
routingKey: 'myqueuekey'
}
]
}, function(err, res) {
var amqp = AMQP(config);
amqp.connect(function(err) {
if (err) return done(err);
AMQP.publishToQueue('myqueue', 'test', done);
amqp.publishToQueue('mypublishqueue', 'test', done);
});

@@ -63,13 +52,13 @@ });

it('should call the callback successfully', function(done) {
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', {},
function(err, res) {
var amqp = AMQP(config);
amqp.connect(function(err) {
if (err) return done(err);
AMQP.publish('myqueue', 'test', {}, done);
amqp.publish('myqueue', 'test', {}, done);
});
});
it('should accept objects', function(done) {
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', {},
function(err, res) {
var amqp = AMQP(config);
amqp.connect(function(err) {
if (err) return done(err);
AMQP.publish('myqueue', {woo: 'test'}, {}, done);
amqp.publish('myqueue', {woo: 'test'}, {}, done);
});

@@ -79,23 +68,4 @@ });

describe('#consume', function() {
function createMockedModuleObject(messageToDeliver, additionals) {
var channelMock = {
consume: function (a, handleMessage, b) {
handleMessage({
content: {
toString: function() {return messageToDeliver;}
}
});
}
};
return {
locals: {
channel: $.extend(channelMock, additionals)
}
};
}
it('if done(err) is called with err === null, calls ack().',
function(done) {
var ackSpy = sinon.spy(function(message) {
it('if done(err) is called with err === null, calls ack().', function(done) {
var ackSpy = Sinon.spy(function(message) {
done();

@@ -105,4 +75,4 @@ });

// message will be {}. Mock out 'ack' method.
createMockedModuleObject('{}', {ack: ackSpy}));
mockedAMQP.queueParams({consume: {}});
require('./amqplibmock')('{}', {ack: ackSpy})
)(config);

@@ -113,3 +83,6 @@ function myMessageHandler(parsedMsg, cb) {

mockedAMQP.consume(myMessageHandler);
mockedAMQP.connect(function(err) {
if(err) return done(err);
mockedAMQP.consume(myMessageHandler);
});
});

@@ -119,3 +92,3 @@

function(done) {
var nackSpy = sinon.spy(function(message, upTo, requeue) {
var nackSpy = Sinon.spy(function(message, upTo, requeue) {
expect(requeue).to.equal(false);

@@ -127,4 +100,4 @@ done();

// message will be invalid json. Mock out 'nack' method.
createMockedModuleObject('nonvalidjson', {nack: nackSpy}));
mockedAMQP.queueParams({consume: {}});
require('./amqplibmock')('nonvalidjson', {nack: nackSpy})
)(config);

@@ -135,16 +108,18 @@ function myMessageHandler(parsedMsg, cb) {

mockedAMQP.consume(myMessageHandler);
mockedAMQP.connect(function(err) {
if(err) return done(err);
mockedAMQP.consume(myMessageHandler);
});
});
it('if json callback called with err, calls nack() with requeue as given.',
function(done) {
var nackSpy = sinon.spy(function(message, upTo, requeue) {
var nackSpy = Sinon.spy(function(message, upTo, requeue) {
expect(requeue).to.equal('requeue');
done();
});
var mockedAMQP = SandboxedModule.require('../amqp',
// message will be {}. Mock out 'nack' method.
createMockedModuleObject('{}', {nack: nackSpy}));
mockedAMQP.queueParams({consume: {}});
require('./amqplibmock')('{}', {nack: nackSpy})
)(config);
function myMessageHandler(parsedMsg, cb) {

@@ -154,3 +129,6 @@ cb(new Error('got it bad'), 'requeue');

mockedAMQP.consume(myMessageHandler);
mockedAMQP.connect(function(err) {
if(err) return done(err);
mockedAMQP.consume(myMessageHandler);
});
});

@@ -157,0 +135,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc