amqp-wrapper
Advanced tools
Comparing version 4.1.3 to 5.0.0
36
amqp.js
'use strict'; | ||
var amqp = require('amqplib/callback_api'); | ||
var _ = require('lodash'); | ||
var async = require('async'); | ||
var stringifysafe = require('json-stringify-safe'); | ||
@@ -50,14 +48,7 @@ var queueSetup = require('./lib/queue-setup'); | ||
} | ||
var tasks = []; | ||
if (config.queues.publish && config.queues.publish instanceof Array) { | ||
tasks.push(function(callback) { | ||
queueSetup.setupForPublish(channel, config, callback); | ||
}); | ||
if (config.queue && config.queue.name) { | ||
queueSetup.setupForConsume(channel, config, cb); | ||
} else { | ||
cb(); | ||
} | ||
if (config.queues.consume && config.queues.consume.name) { | ||
tasks.push(function(callback) { | ||
queueSetup.setupForConsume(channel, config, callback); | ||
}); | ||
} | ||
async.series(tasks, cb); | ||
} | ||
@@ -67,17 +58,2 @@ }, | ||
/** | ||
* Publish a message to one of the AMQP queues specified on connect. | ||
* @param {string} name The name of the queue to use. | ||
* @param {string} message The message to publish. | ||
* @param {Function(err)} callback The callback to call when done. | ||
*/ | ||
publishToQueue: function(name, message, callback) { | ||
if (typeof message === 'object') { | ||
message = stringifysafe(message); | ||
} | ||
var publishQueue = _.find(config.queues.publish, {'name': name}); | ||
channel.publish(config.exchange, publishQueue.routingKey, | ||
new Buffer(message), {}, callback); | ||
}, | ||
/** | ||
* Publish a message using the specified routing key. | ||
@@ -138,3 +114,3 @@ * @param {string} routingKey The name of the queue to use. | ||
channel.consume(config.queues.consume.name, callback, {noAck: false}); | ||
channel.consume(config.queue.name, callback, {noAck: false}); | ||
} | ||
@@ -145,1 +121,3 @@ }; | ||
}; | ||
// vim: set et sw=2: |
@@ -28,22 +28,19 @@ 'use strict'; | ||
*/ | ||
exports.setupForPublish = function(channel, params, callback) { | ||
async.each( | ||
params.queues.publish, | ||
function(queue, cb) { | ||
debug('setupForPublish()', queue); | ||
async.series([ | ||
async.apply(maybeDeclareDeadLetters, channel, queue), | ||
channel.assertQueue.bind(channel, queue.name, queue.options), | ||
channel.bindQueue.bind(channel, | ||
queue.name, params.exchange, queue.routingKey, {}) | ||
], cb); | ||
}, callback); | ||
exports.setupForConsume = function(channel, params, callback) { | ||
var queue = params.queue; | ||
debug('setupForConsume()', queue); | ||
async.series([ | ||
maybeDeclareDeadLetters.bind(undefined, channel, queue), | ||
channel.assertQueue.bind(channel, queue.name, queue.options), | ||
function(callback) { | ||
if (queue.routingKey) { | ||
channel.bindQueue(queue.name, params.exchange, queue.routingKey, {}, | ||
callback); | ||
} else { | ||
callback(); | ||
} | ||
} | ||
], callback); | ||
}; | ||
// For consuming, we only assert the queue is there. | ||
exports.setupForConsume = function(channel, params, callback) { | ||
debug('setupForConsume()'); | ||
channel.prefetch(params.prefetch); | ||
channel.assertQueue(params.queues.consume.name, | ||
params.queues.consume.options, callback); | ||
}; | ||
// vim: set sw=2 et: |
{ | ||
"name": "amqp-wrapper", | ||
"version": "4.1.3", | ||
"version": "5.0.0", | ||
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.", | ||
@@ -29,3 +29,2 @@ "main": "amqp.js", | ||
"debug": "^1.0.4", | ||
"lodash": "^2.4.1", | ||
"json-stringify-safe": "^5.0.0" | ||
@@ -32,0 +31,0 @@ }, |
@@ -9,4 +9,5 @@ amqp-wrapper | ||
- All queues will be declared (made to exist). | ||
- If you specify a routing key for a publish queue, then a binding will be set up | ||
- You can specify a queue which will be declared (made to exist). This will be | ||
the queue from which you will consume. | ||
- If you specify a routing key for the queue, then a binding will be set up | ||
(I.e. a mapping that tells AMQP to route message with that routing key to that | ||
@@ -33,16 +34,6 @@ queue on the exchange you have specified). | ||
exchange: process.env.AMQP_EXCHANGE, | ||
queues: { | ||
consume: { | ||
name: process.env.AMQP_CONSUME, | ||
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib. | ||
}, | ||
publish: [ | ||
{ | ||
name: process.env.AMQP_RESPONSE, | ||
routingKey: process.env.AMQP_RESPONSE_ROUTING_KEY, | ||
options: {deadLetterExchange: process.env.AMQP_DEAD_LETTER_EXCHANGE} | ||
}, | ||
{ // ... | ||
} | ||
] | ||
queue: { | ||
name: process.env.AMQP_CONSUME, | ||
routingKey: process.env.AMQP_ROUTING_KEY, // If supplied, queue is bound to this key on the exchange. | ||
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib. | ||
}, | ||
@@ -72,5 +63,2 @@ // Set the QOS/prefetch. | ||
// Publishing to one of the queues declared on connect. | ||
amqp.publishToQueue(name, payload, done); | ||
// Publishing to arbitrary routing key. | ||
@@ -77,0 +65,0 @@ amqp.publish(routingKey, payload, options, done); |
'use strict'; | ||
var exec = require('child_process').exec; | ||
var SandboxedModule = require('sandboxed-module'); | ||
@@ -9,2 +10,18 @@ var AMQP = require('../../amqp'); | ||
describe('AMQP', function() { | ||
// Set up a vhost for testing purposes so we don't pollute /. | ||
before(function(done) { | ||
exec('curl -u guest:guest -H "content-type:application/json" ' + | ||
'-XPUT http://localhost:15672/api/vhosts/amqp-wrapper-testing', next); | ||
function next(err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
exec('curl -u guest:guest -H "content-type:application/json" ' + | ||
'-XPUT http://localhost:15672/' + | ||
'api/permissions/amqp-wrapper-testing/guest ' + | ||
'-d \'{"configure":".*","write":".*","read":".*"}\'', | ||
done); | ||
} | ||
}); | ||
var config = require('../config'); | ||
@@ -48,3 +65,3 @@ describe('#constructor', function() { | ||
}); | ||
it('should setup for publishing and consuming', function(done) { | ||
it('should declare your queue, and bind it', function(done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
@@ -62,5 +79,5 @@ var mockedAMQP = SandboxedModule.require('../../amqp', { | ||
// two queues, one of which is dead lettered | ||
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(3); | ||
// Bind the publishing queue, and its dead letter queue. | ||
// one queue, dead lettered | ||
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2); | ||
// Bind the consume queue, and its dead letter queue. | ||
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(2); | ||
@@ -70,11 +87,21 @@ done(); | ||
}); | ||
}); | ||
describe('#publishToQueue', function() { | ||
it('should call the callback successfully', function(done) { | ||
var amqp = AMQP(config); | ||
amqp.connect(function(err) { | ||
it('should just declare if you don\'t specify routing key', function(done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
var config = require('../configNoKey'); | ||
var mockedAMQP = SandboxedModule.require('../../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config); | ||
mockedAMQP.connect(function(err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
amqp.publishToQueue('mypublishqueue', 'test', done); | ||
// one queue, not dead lettered | ||
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(1); | ||
// No binding. | ||
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(0); | ||
done(); | ||
}); | ||
@@ -187,1 +214,3 @@ }); | ||
}); | ||
// vim: set et sw=2 colorcolumn=80: |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
4
11
17552
384
91
1
- Removedlodash@^2.4.1
- Removedlodash@2.4.2(transitive)