Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp-wrapper

Package Overview
Dependencies
Maintainers
1
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-wrapper - npm Package Compare versions

Comparing version 4.1.3 to 5.0.0

test/config.js

36

amqp.js
'use strict';
var amqp = require('amqplib/callback_api');
var _ = require('lodash');
var async = require('async');
var stringifysafe = require('json-stringify-safe');

@@ -50,14 +48,7 @@ var queueSetup = require('./lib/queue-setup');

}
var tasks = [];
if (config.queues.publish && config.queues.publish instanceof Array) {
tasks.push(function(callback) {
queueSetup.setupForPublish(channel, config, callback);
});
if (config.queue && config.queue.name) {
queueSetup.setupForConsume(channel, config, cb);
} else {
cb();
}
if (config.queues.consume && config.queues.consume.name) {
tasks.push(function(callback) {
queueSetup.setupForConsume(channel, config, callback);
});
}
async.series(tasks, cb);
}

@@ -67,17 +58,2 @@ },

/**
* Publish a message to one of the AMQP queues specified on connect.
* @param {string} name The name of the queue to use.
* @param {string} message The message to publish.
* @param {Function(err)} callback The callback to call when done.
*/
publishToQueue: function(name, message, callback) {
if (typeof message === 'object') {
message = stringifysafe(message);
}
var publishQueue = _.find(config.queues.publish, {'name': name});
channel.publish(config.exchange, publishQueue.routingKey,
new Buffer(message), {}, callback);
},
/**
* Publish a message using the specified routing key.

@@ -138,3 +114,3 @@ * @param {string} routingKey The name of the queue to use.

channel.consume(config.queues.consume.name, callback, {noAck: false});
channel.consume(config.queue.name, callback, {noAck: false});
}

@@ -145,1 +121,3 @@ };

};
// vim: set et sw=2:

@@ -28,22 +28,19 @@ 'use strict';

*/
exports.setupForPublish = function(channel, params, callback) {
async.each(
params.queues.publish,
function(queue, cb) {
debug('setupForPublish()', queue);
async.series([
async.apply(maybeDeclareDeadLetters, channel, queue),
channel.assertQueue.bind(channel, queue.name, queue.options),
channel.bindQueue.bind(channel,
queue.name, params.exchange, queue.routingKey, {})
], cb);
}, callback);
exports.setupForConsume = function(channel, params, callback) {
var queue = params.queue;
debug('setupForConsume()', queue);
async.series([
maybeDeclareDeadLetters.bind(undefined, channel, queue),
channel.assertQueue.bind(channel, queue.name, queue.options),
function(callback) {
if (queue.routingKey) {
channel.bindQueue(queue.name, params.exchange, queue.routingKey, {},
callback);
} else {
callback();
}
}
], callback);
};
// For consuming, we only assert the queue is there.
exports.setupForConsume = function(channel, params, callback) {
debug('setupForConsume()');
channel.prefetch(params.prefetch);
channel.assertQueue(params.queues.consume.name,
params.queues.consume.options, callback);
};
// vim: set sw=2 et:
{
"name": "amqp-wrapper",
"version": "4.1.3",
"version": "5.0.0",
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.",

@@ -29,3 +29,2 @@ "main": "amqp.js",

"debug": "^1.0.4",
"lodash": "^2.4.1",
"json-stringify-safe": "^5.0.0"

@@ -32,0 +31,0 @@ },

@@ -9,4 +9,5 @@ amqp-wrapper

- All queues will be declared (made to exist).
- If you specify a routing key for a publish queue, then a binding will be set up
- You can specify a queue which will be declared (made to exist). This will be
the queue from which you will consume.
- If you specify a routing key for the queue, then a binding will be set up
(I.e. a mapping that tells AMQP to route message with that routing key to that

@@ -33,16 +34,6 @@ queue on the exchange you have specified).

exchange: process.env.AMQP_EXCHANGE,
queues: {
consume: {
name: process.env.AMQP_CONSUME,
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib.
},
publish: [
{
name: process.env.AMQP_RESPONSE,
routingKey: process.env.AMQP_RESPONSE_ROUTING_KEY,
options: {deadLetterExchange: process.env.AMQP_DEAD_LETTER_EXCHANGE}
},
{ // ...
}
]
queue: {
name: process.env.AMQP_CONSUME,
routingKey: process.env.AMQP_ROUTING_KEY, // If supplied, queue is bound to this key on the exchange.
options: {/* ... */} // options passed to ch.assertQueue() in wrapped lib.
},

@@ -72,5 +63,2 @@ // Set the QOS/prefetch.

// Publishing to one of the queues declared on connect.
amqp.publishToQueue(name, payload, done);
// Publishing to arbitrary routing key.

@@ -77,0 +65,0 @@ amqp.publish(routingKey, payload, options, done);

'use strict';
var exec = require('child_process').exec;
var SandboxedModule = require('sandboxed-module');

@@ -9,2 +10,18 @@ var AMQP = require('../../amqp');

describe('AMQP', function() {
// Set up a vhost for testing purposes so we don't pollute /.
before(function(done) {
exec('curl -u guest:guest -H "content-type:application/json" ' +
'-XPUT http://localhost:15672/api/vhosts/amqp-wrapper-testing', next);
function next(err) {
if (err) {
return done(err);
}
exec('curl -u guest:guest -H "content-type:application/json" ' +
'-XPUT http://localhost:15672/' +
'api/permissions/amqp-wrapper-testing/guest ' +
'-d \'{"configure":".*","write":".*","read":".*"}\'',
done);
}
});
var config = require('../config');

@@ -48,3 +65,3 @@ describe('#constructor', function() {

});
it('should setup for publishing and consuming', function(done) {
it('should declare your queue, and bind it', function(done) {
var amqpLibMock = require('./amqplibmock')();

@@ -62,5 +79,5 @@ var mockedAMQP = SandboxedModule.require('../../amqp', {

// two queues, one of which is dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(3);
// Bind the publishing queue, and its dead letter queue.
// one queue, dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2);
// Bind the consume queue, and its dead letter queue.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(2);

@@ -70,11 +87,21 @@ done();

});
});
describe('#publishToQueue', function() {
it('should call the callback successfully', function(done) {
var amqp = AMQP(config);
amqp.connect(function(err) {
it('should just declare if you don\'t specify routing key', function(done) {
var amqpLibMock = require('./amqplibmock')();
var config = require('../configNoKey');
var mockedAMQP = SandboxedModule.require('../../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config);
mockedAMQP.connect(function(err) {
if (err) {
return done(err);
}
amqp.publishToQueue('mypublishqueue', 'test', done);
// one queue, not dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(1);
// No binding.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(0);
done();
});

@@ -187,1 +214,3 @@ });

});
// vim: set et sw=2 colorcolumn=80:

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc