amqp-wrapper
Advanced tools
Comparing version 3.1.0 to 4.0.0
223
amqp.js
var amqp = require('amqplib'), | ||
Q = require('q'), | ||
_ = require('lodash'); | ||
var exchange, | ||
queueParams = {}; | ||
var PREFETCH = 10; | ||
_ = require('lodash'), | ||
queueSetup = require('./queue-setup'); | ||
// When we connect, we will remember the channel here: | ||
var channel; | ||
module.exports = function(config) { | ||
if (!config || !config.url || !config.exchange) { | ||
throw new Error('amqp-wrapper: Invalid config'); | ||
} | ||
/** | ||
* For publishing, we assert the queue is there and bind it to the routing | ||
* key we are going to use. | ||
*/ | ||
var setupForPublish = function(channel, queueParams) { | ||
var setupPublishes = queueParams.publish.map(function(queue) { | ||
return channel.assertQueue(queue.name, queue.options) | ||
.then(function() { | ||
return channel.bindQueue(queue.name, exchange, queue.routingKey); | ||
}); | ||
}); | ||
return Q.all(setupPublishes); | ||
}; | ||
var channel; | ||
// For consuming, we only assert the queue is there. | ||
var setupForConsume = function(channel, queueParams) { | ||
channel.prefetch(PREFETCH); | ||
return channel.assertQueue(queueParams.consume.name, | ||
queueParams.consume.options); | ||
}; | ||
config.prefetch = config.prefetch || 10; | ||
if (process.env.NODE_ENV === 'test') { | ||
exports.replaceSetupFuncs = function(consume, publish) { | ||
setupForPublish = publish; | ||
setupForConsume = consume; | ||
}; | ||
exports.getSetupFuncs = function() { | ||
return {publish: setupForPublish, consume: setupForConsume}; | ||
}; | ||
} | ||
var ret = { | ||
/** | ||
* Passes the AMQP channel created to the callback. | ||
*/ | ||
connect: function(cb) { | ||
// amqp.connect throws on some error conditions, rather than resolving the | ||
// promise. Hence the need for the try/catch. | ||
try { | ||
Q(amqp.connect(config.url)) | ||
.then(function(conn) { | ||
return conn.createConfirmChannel(); | ||
}) | ||
.then(function(ch) { | ||
channel = ch; | ||
/** | ||
* Passes the AMQP channel created to the callback. | ||
*/ | ||
exports.connect = function(uri, exch, _queueParams, cb) { | ||
queueParams = _queueParams; | ||
exchange = exch; | ||
// amqp.connect throws on some error conditions, rather than resolving the | ||
// promise. Hence the need for the try/catch. | ||
try { | ||
Q(amqp.connect(uri)) | ||
.then(function(conn) { | ||
return conn.createConfirmChannel(); | ||
}) | ||
.then(function(ch) { | ||
channel = ch; | ||
var assert_exchange = ch.assertExchange(exchange, 'topic'); | ||
var todo = assert_exchange; | ||
if (queueParams.publish && queueParams.publish instanceof Array) { | ||
todo = todo.then(setupForPublish(ch, queueParams)); | ||
var promise = ch.assertExchange(config.exchange, 'topic'); | ||
if (config.queues.publish && config.queues.publish instanceof Array) { | ||
promise = promise.then(queueSetup.setupForPublish(ch, config)); | ||
} | ||
if (config.queues.consume && config.queues.consume.name) { | ||
promise = promise.then(queueSetup.setupForConsume(ch, config)); | ||
} | ||
return promise; | ||
}).nodeify(cb); | ||
} | ||
if (queueParams.consume && queueParams.consume.name) { | ||
todo = todo.then(setupForConsume(ch, queueParams)); | ||
catch (e) { | ||
console.log('Exception thrown in AMQP connection and setup.'); | ||
cb(e); | ||
} | ||
return todo; | ||
}).nodeify(cb); | ||
} | ||
catch (e) { | ||
console.log('Exception thrown in AMQP connection and setup.'); | ||
cb(e); | ||
} | ||
}; | ||
}, | ||
/** | ||
* Publish a message to one of the AMQP queues specified on connect. | ||
* @param {string} name The name of the queue to use. | ||
* @param {string} The message to publish. | ||
* @param {Function(err)} The callback to call when done. | ||
*/ | ||
exports.publishToQueue = function(name, message, callback) { | ||
if (typeof message === 'object') message = JSON.stringify(message); | ||
var publishQueue = _.find(queueParams.publish, {'name': name}); | ||
channel.publish(exchange, publishQueue.routingKey, new Buffer(message), | ||
{}, callback); | ||
}; | ||
/** | ||
* Publish a message to one of the AMQP queues specified on connect. | ||
* @param {string} name The name of the queue to use. | ||
* @param {string} The message to publish. | ||
* @param {Function(err)} The callback to call when done. | ||
*/ | ||
publishToQueue: function(name, message, callback) { | ||
if (typeof message === 'object') message = JSON.stringify(message); | ||
var publishQueue = _.find(config.queues.publish, {'name': name}); | ||
channel.publish(config.exchange, publishQueue.routingKey, new Buffer(message), | ||
{}, callback); | ||
}, | ||
/** | ||
* Publish a message using the specified routing key. | ||
* @param {string} name The name of the queue to use. | ||
* @param {string} The message to publish. | ||
* @param {Object} options Any options to pass through to the underlying | ||
* publish. | ||
* @param {Function(err)} The callback to call when done. | ||
*/ | ||
exports.publish = function(routingKey, message, options, callback) { | ||
if (typeof message === 'object') message = JSON.stringify(message); | ||
channel.publish(exchange, routingKey, new Buffer(message), options, callback); | ||
}; | ||
/** | ||
* Publish a message using the specified routing key. | ||
* @param {string} name The name of the queue to use. | ||
* @param {string} The message to publish. | ||
* @param {Object} options Any options to pass through to the underlying | ||
* publish. | ||
* @param {Function(err)} The callback to call when done. | ||
*/ | ||
publish: function(routingKey, message, options, callback) { | ||
if (typeof message === 'object') message = JSON.stringify(message); | ||
channel.publish(config.exchange, routingKey, new Buffer(message), options, callback); | ||
}, | ||
/** | ||
* handleMessage() is expected to be of the form: | ||
* handleMessage(parsedMessage, callback). | ||
* If callback is called with a non-null error, then the message will be | ||
* nacked. You can call it like: | ||
* callback(err, requeue) in order | ||
* to instruct rabbit whether to requeue the message (or discard/dead letter). | ||
* | ||
* If not given, requeue is assumed to be false. | ||
* | ||
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34 | ||
*/ | ||
exports.consume = function(handleMessage) { | ||
function callback(message) { | ||
function done(err, requeue) { | ||
if (requeue === undefined) requeue = false; | ||
if (err) return channel.nack(message, false, requeue); | ||
channel.ack(message); | ||
} | ||
/** | ||
* handleMessage() is expected to be of the form: | ||
* handleMessage(parsedMessage, callback). | ||
* If callback is called with a non-null error, then the message will be | ||
* nacked. You can call it like: | ||
* callback(err, requeue) in order | ||
* to instruct rabbit whether to requeue the message (or discard/dead letter). | ||
* | ||
* If not given, requeue is assumed to be false. | ||
* | ||
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34 | ||
*/ | ||
consume: function(handleMessage) { | ||
function callback(message) { | ||
function done(err, requeue) { | ||
if (requeue === undefined) requeue = false; | ||
if (err) return channel.nack(message, false, requeue); | ||
channel.ack(message); | ||
} | ||
try { | ||
var messagePayload = message.content.toString(); | ||
var parsedPayload = JSON.parse(messagePayload); | ||
handleMessage(parsedPayload, done); | ||
try { | ||
var messagePayload = message.content.toString(); | ||
var parsedPayload = JSON.parse(messagePayload); | ||
handleMessage(parsedPayload, done); | ||
} | ||
catch (error) { | ||
console.log(error); | ||
// Do not requeue on exception - it means something unexpected (and prob. | ||
// non-transitory) happened. | ||
done(error, false); | ||
} | ||
} | ||
channel.consume(config.queues.consume.name, callback, {noAck: false}); | ||
} | ||
catch (error) { | ||
console.log(error); | ||
// Do not requeue on exception - it means something unexpected (and prob. | ||
// non-transitory) happened. | ||
done(error, false); | ||
} | ||
} | ||
}; | ||
channel.consume(queueParams.consume.name, callback, {noAck: false}); | ||
return ret; | ||
}; | ||
exports.prefetch = function(value) { | ||
channel.prefetch(value); | ||
}; | ||
if (process.env.NODE_ENV == 'test') { | ||
exports.queueParams = function(_queueParams) { | ||
queueParams = _queueParams; | ||
}; | ||
} | ||
// vim: set et sw=2 ts=2 colorcolumn=80: |
{ | ||
"name": "amqp-wrapper", | ||
"version": "3.1.0", | ||
"version": "4.0.0", | ||
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.", | ||
"main": "amqp.js", | ||
"scripts": { | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
"test": "grunt test" | ||
}, | ||
@@ -29,10 +29,12 @@ "repository": { | ||
"devDependencies": { | ||
"expect.js": "^0.3.1", | ||
"grunt": "~0.4.2", | ||
"grunt-exec": "~0.4.2", | ||
"grunt-env": "~0.4.0", | ||
"grunt-contrib-jshint": "~0.7.2", | ||
"grunt-contrib-watch": "~0.5.3", | ||
"grunt-env": "~0.4.0", | ||
"grunt-exec": "~0.4.2", | ||
"grunt-mocha-cli": "~1.4.0", | ||
"lodash": "^2.4.1", | ||
"mocha-unfunk-reporter": "~0.3.7", | ||
"chai": "~1.8.1", | ||
"q": "^0.9.7", | ||
"sandboxed-module": "~0.3.0", | ||
@@ -39,0 +41,0 @@ "sinon": "~1.8.1" |
@@ -15,23 +15,28 @@ amqp-wrapper | ||
var queues = { | ||
consume: { | ||
name: process.env.AMQP_CONSUME, | ||
options: {deadLetterExchange: process.env.AMQP_DEAD_LETTER_EXCHANGE} | ||
var config = { | ||
url: process.env.AMQP_URL, | ||
exchange: process.env.AMQP_EXCHANGE, | ||
queues: { | ||
consume: { | ||
name: process.env.AMQP_CONSUME, | ||
options: {deadLetterExchange: process.env.AMQP_DEAD_LETTER_EXCHANGE} | ||
}, | ||
publish: [ | ||
{ | ||
name: process.env.AMQP_RESPONSE, | ||
routingKey: process.env.AMQP_RESPONSE_ROUTING_KEY, | ||
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib. | ||
}, | ||
{ // ... | ||
} | ||
] | ||
}, | ||
publish: [ | ||
{ | ||
name: process.env.AMQP_RESPONSE, | ||
routingKey: process.env.AMQP_RESPONSE_ROUTING_KEY, | ||
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib. | ||
}, | ||
{ // ... | ||
} | ||
] | ||
// Set the QOS/prefetch. | ||
prefetch: 100 | ||
}; | ||
AMQP.connect(process.env.AMQP_URL, process.env.AMQP_EXCHANGE, | ||
queues, amqpConnectDone); | ||
var amqp = AMQP(config); | ||
// Set the QOS/prefetch. | ||
AMQP.prefetch(100); | ||
// Must call this before you consume/publish/etc... | ||
amqp.connect(amqpConnectDone); | ||
@@ -50,9 +55,9 @@ // Consuming | ||
// Start consuming: | ||
AMQP.consume(handleMessage); | ||
amqp.consume(handleMessage); | ||
// Publishing to one of the queues declared on connect. | ||
AMQP.publishToQueue(name, payload, done); | ||
amqp.publishToQueue(name, payload, done); | ||
// Publishing to arbitrary routing key. | ||
AMQP.publish(routingKey, payload, done); | ||
amqp.publish(routingKey, payload, options, done); | ||
@@ -59,0 +64,0 @@ If `payload` is an object, it will be turned into JSON. |
140
test/amqp.js
var SandboxedModule = require('sandboxed-module'), | ||
expect = require('chai').expect, | ||
sinon = require('sinon'), | ||
$ = require('lodash'); | ||
expect = require('expect.js'), | ||
Sinon = require('sinon'), | ||
AMQP = require('../amqp'); | ||
describe('AMQP', function() { | ||
var AMQP; | ||
beforeEach(function() { | ||
AMQP = require('../amqp'); | ||
}); | ||
var config = { | ||
url: 'amqp://guest:guest@localhost', | ||
exchange: 'mytestexchange', | ||
queues: { | ||
consume: { | ||
name: 'myconsumequeue' | ||
}, | ||
publish: [ | ||
{ | ||
name: 'mypublishqueue', | ||
routingKey: 'mypublishqueuerk' | ||
} | ||
] | ||
} | ||
}; | ||
describe('#connect', function() { | ||
it('should call the callback successfully', function(done) { | ||
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', { | ||
consume: { | ||
name: 'myconsumequeue' | ||
}, | ||
publish: [ | ||
{ | ||
name: 'mypublishqueue', | ||
routingKey: 'mypublishqueuerk' | ||
} | ||
] | ||
}, done); | ||
var amqp = AMQP(config); | ||
amqp.connect(done); | ||
}); | ||
it('should setup for publishing and consuming', function(done) { | ||
AMQP.replaceSetupFuncs(sinon.spy(), sinon.spy()); | ||
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', { | ||
consume: {name: 'myconsumequeue2'}, | ||
publish: [ | ||
{ | ||
name: 'mypublishqueue2', | ||
routingKey: 'mypublishqueue2' | ||
} | ||
] | ||
}, function(err) { | ||
var queueSetup = require('../queue-setup'); | ||
var amqp = AMQP(config); | ||
Sinon.stub(queueSetup, 'setupForConsume'); | ||
Sinon.stub(queueSetup, 'setupForPublish'); | ||
amqp.connect(function(err) { | ||
if (err) return done(err); | ||
expect(AMQP.getSetupFuncs().consume.calledOnce, 'setupForConsume()').to. | ||
equal(true); | ||
expect(AMQP.getSetupFuncs().publish.calledOnce, 'setupForPublish()').to. | ||
equal(true); | ||
expect(queueSetup.setupForConsume.calledOnce).to.be(true); | ||
expect(queueSetup.setupForPublish.calledOnce).to.be(true); | ||
done(); | ||
@@ -47,12 +42,6 @@ }); | ||
it('should call the callback successfully', function(done) { | ||
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', { | ||
publish: [ | ||
{ | ||
name: 'myqueue', | ||
routingKey: 'myqueuekey' | ||
} | ||
] | ||
}, function(err, res) { | ||
var amqp = AMQP(config); | ||
amqp.connect(function(err) { | ||
if (err) return done(err); | ||
AMQP.publishToQueue('myqueue', 'test', done); | ||
amqp.publishToQueue('mypublishqueue', 'test', done); | ||
}); | ||
@@ -63,13 +52,13 @@ }); | ||
it('should call the callback successfully', function(done) { | ||
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', {}, | ||
function(err, res) { | ||
var amqp = AMQP(config); | ||
amqp.connect(function(err) { | ||
if (err) return done(err); | ||
AMQP.publish('myqueue', 'test', {}, done); | ||
amqp.publish('myqueue', 'test', {}, done); | ||
}); | ||
}); | ||
it('should accept objects', function(done) { | ||
AMQP.connect('amqp://guest:guest@localhost', 'mytestexchange', {}, | ||
function(err, res) { | ||
var amqp = AMQP(config); | ||
amqp.connect(function(err) { | ||
if (err) return done(err); | ||
AMQP.publish('myqueue', {woo: 'test'}, {}, done); | ||
amqp.publish('myqueue', {woo: 'test'}, {}, done); | ||
}); | ||
@@ -79,23 +68,4 @@ }); | ||
describe('#consume', function() { | ||
function createMockedModuleObject(messageToDeliver, additionals) { | ||
var channelMock = { | ||
consume: function (a, handleMessage, b) { | ||
handleMessage({ | ||
content: { | ||
toString: function() {return messageToDeliver;} | ||
} | ||
}); | ||
} | ||
}; | ||
return { | ||
locals: { | ||
channel: $.extend(channelMock, additionals) | ||
} | ||
}; | ||
} | ||
it('if done(err) is called with err === null, calls ack().', | ||
function(done) { | ||
var ackSpy = sinon.spy(function(message) { | ||
it('if done(err) is called with err === null, calls ack().', function(done) { | ||
var ackSpy = Sinon.spy(function(message) { | ||
done(); | ||
@@ -105,4 +75,4 @@ }); | ||
// message will be {}. Mock out 'ack' method. | ||
createMockedModuleObject('{}', {ack: ackSpy})); | ||
mockedAMQP.queueParams({consume: {}}); | ||
require('./amqplibmock')('{}', {ack: ackSpy}) | ||
)(config); | ||
@@ -113,3 +83,6 @@ function myMessageHandler(parsedMsg, cb) { | ||
mockedAMQP.consume(myMessageHandler); | ||
mockedAMQP.connect(function(err) { | ||
if(err) return done(err); | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}); | ||
@@ -119,3 +92,3 @@ | ||
function(done) { | ||
var nackSpy = sinon.spy(function(message, upTo, requeue) { | ||
var nackSpy = Sinon.spy(function(message, upTo, requeue) { | ||
expect(requeue).to.equal(false); | ||
@@ -127,4 +100,4 @@ done(); | ||
// message will be invalid json. Mock out 'nack' method. | ||
createMockedModuleObject('nonvalidjson', {nack: nackSpy})); | ||
mockedAMQP.queueParams({consume: {}}); | ||
require('./amqplibmock')('nonvalidjson', {nack: nackSpy}) | ||
)(config); | ||
@@ -135,16 +108,18 @@ function myMessageHandler(parsedMsg, cb) { | ||
mockedAMQP.consume(myMessageHandler); | ||
mockedAMQP.connect(function(err) { | ||
if(err) return done(err); | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}); | ||
it('if json callback called with err, calls nack() with requeue as given.', | ||
function(done) { | ||
var nackSpy = sinon.spy(function(message, upTo, requeue) { | ||
var nackSpy = Sinon.spy(function(message, upTo, requeue) { | ||
expect(requeue).to.equal('requeue'); | ||
done(); | ||
}); | ||
var mockedAMQP = SandboxedModule.require('../amqp', | ||
// message will be {}. Mock out 'nack' method. | ||
createMockedModuleObject('{}', {nack: nackSpy})); | ||
mockedAMQP.queueParams({consume: {}}); | ||
require('./amqplibmock')('{}', {nack: nackSpy}) | ||
)(config); | ||
function myMessageHandler(parsedMsg, cb) { | ||
@@ -154,3 +129,6 @@ cb(new Error('got it bad'), 'requeue'); | ||
mockedAMQP.consume(myMessageHandler); | ||
mockedAMQP.connect(function(err) { | ||
if(err) return done(err); | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}); | ||
@@ -157,0 +135,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
No tests
QualityPackage does not have any tests. This is a strong signal of a poorly maintained or low quality package.
Found 1 instance in 1 package
14446
9
323
1
89
0
12