Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp-wrapper

Package Overview
Dependencies
Maintainers
1
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-wrapper - npm Package Compare versions

Comparing version 5.5.1 to 5.6.0-requeue.0

213

amqp.js
'use strict';
var amqp = require('amqplib/callback_api'),
stringifysafe = require('json-stringify-safe'),
queueSetup = require('./lib/queue-setup'),
debug = require('debug')('amqp-wrapper'),
Deferred = require('deferential');
const amqp = require('amqplib/callback_api');
const stringifysafe = require('json-stringify-safe');
const queueSetup = require('./lib/queue-setup');
const debug = require('debug')('amqp-wrapper');
const Deferred = require('deferential');
module.exports = function(config) {
module.exports = function (config) {
if (!config || !config.url || !config.exchange) {

@@ -14,122 +14,119 @@ throw new Error('amqp-wrapper: Invalid config');

var connection, channel;
let connection;
let channel;
let consumerTag;
var prefetch = config.prefetch || 10;
var ret = {
/**
* Connects and remembers the channel.
*/
connect: function(cb) {
var d = Deferred();
amqp.connect(config.url, createChannel);
/**
* Connects and remembers the channel.
*/
function connect (cb) {
var d = Deferred();
amqp.connect(config.url, createChannel);
function createChannel(err, conn) {
debug('createChannel()');
if (err) {
return d.reject(err);
}
connection = conn;
function createChannel (err, conn) {
if (err) return d.reject(err);
connection = conn;
conn.createConfirmChannel(assertExchange);
}
conn.createConfirmChannel(assertExchange);
}
function assertExchange (err, ch) {
if (err) return d.reject(err);
channel = ch;
channel.prefetch(prefetch);
channel.assertExchange(config.exchange, 'topic', {}, assertQueues);
}
function assertExchange(err, ch) {
debug('assertExchange()', ch);
if (err) {
return d.reject(err);
}
channel = ch;
function assertQueues (err) {
if (err) return d.reject(err);
if (!config.queue || !config.queue.name) return d.resolve();
queueSetup.setupForConsume(channel, config, d.resolver(cb));
}
return d.nodeify(cb);
}
channel.prefetch(prefetch);
channel.assertExchange(config.exchange, 'topic', {}, assertQueues);
}
function requeueAll () {
channel.cancel(consumerTag);
channel.nackAll(true);
}
function assertQueues(err) {
debug('assertQueues()');
if (err) {
return d.reject(err);
}
if (config.queue && config.queue.name) {
queueSetup.setupForConsume(channel, config, d.resolver(cb));
} else {
d.resolve();
}
}
return d.nodeify(cb);
},
function close (cb) {
if (!connection) return cb();
return connection.close(cb);
}
close: function(cb) {
if (connection) {
return connection.close(cb);
/**
* Publish a message using the specified routing key.
* @param {string} routingKey The name of the queue to use.
* @param {string} message The message to publish.
* @param {Object} options Any options to pass through to the underlying
* publish.
* @param {Function(err)} callback The callback to call when done.
*/
function publish (routingKey, message, options, cb) {
debug('publish()');
var d = Deferred();
if (typeof message === 'object') {
message = stringifysafe(message);
}
channel.publish(config.exchange, routingKey, Buffer.from(message),
options, d.resolver(cb));
return d.nodeify(cb);
}
/**
* handleMessage() is expected to be of the form:
* handleMessage(parsedMessage, callback).
* If callback is called with a non-null error, then the message will be
* nacked. You can call it like:
* callback(err, requeue) in order
* to instruct rabbit whether to requeue the message
* (or discard/dead letter).
*
* If not given, requeue is assumed to be false.
*
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34
*/
function consume (handleMessage, options, cb) {
const d = Deferred();
debug('consume()');
function onMessage (message) {
function done (err, requeue) {
if (err) return channel.nack(message, false, requeue || false);
channel.ack(message);
}
cb();
},
/**
* Publish a message using the specified routing key.
* @param {string} routingKey The name of the queue to use.
* @param {string} message The message to publish.
* @param {Object} options Any options to pass through to the underlying
* publish.
* @param {Function(err)} callback The callback to call when done.
*/
publish: function(routingKey, message, options, cb) {
debug('publish()');
var d = Deferred();
if (typeof message === 'object') {
message = stringifysafe(message);
try {
var messagePayload = message.content.toString();
var parsedPayload = JSON.parse(messagePayload);
handleMessage(parsedPayload, done);
} catch (error) {
console.error(error);
// Do not requeue on exception - it means something unexpected
// (and prob. non-transitory) happened.
done(error, false);
}
channel.publish(config.exchange, routingKey, new Buffer(message),
options, d.resolver(cb));
}
return d.nodeify(cb);
},
channel.consume(config.queue.name, onMessage, options, consumeCb);
/**
* handleMessage() is expected to be of the form:
* handleMessage(parsedMessage, callback).
* If callback is called with a non-null error, then the message will be
* nacked. You can call it like:
* callback(err, requeue) in order
* to instruct rabbit whether to requeue the message
* (or discard/dead letter).
*
* If not given, requeue is assumed to be false.
*
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34
*/
consume: function(handleMessage, options) {
debug('consume()');
function callback(message) {
function done(err, requeue) {
if (requeue === undefined) {
requeue = false;
}
if (err) {
return channel.nack(message, false, requeue);
}
channel.ack(message);
}
function consumeCb (err, ok) {
if (err) return d.reject(err);
consumerTag = ok.consumerTag;
return d.resolve();
}
return d.nodeify(cb);
}
try {
var messagePayload = message.content.toString();
var parsedPayload = JSON.parse(messagePayload);
handleMessage(parsedPayload, done);
}
catch (error) {
console.log(error);
// Do not requeue on exception - it means something unexpected
// (and prob. non-transitory) happened.
done(error, false);
}
}
channel.consume(config.queue.name, callback, options);
}
return {
connect,
close,
publish,
consume,
requeueAll
};
return ret;
};
// vim: set et sw=2:

@@ -6,3 +6,3 @@ 'use strict';

function bindRoutingKeys(channel, exchange, queueName, keys, done) {
function bindRoutingKeys (channel, exchange, queueName, keys, done) {
var routingKeys;

@@ -16,6 +16,6 @@ if (typeof keys === 'string') {

async.map(routingKeys,
function(key, callback) {
channel.bindQueue(queueName, exchange, key, {}, callback);
},
done);
function (key, callback) {
channel.bindQueue(queueName, exchange, key, {}, callback);
},
done);
} else {

@@ -26,3 +26,3 @@ done();

function maybeDeclareDeadLetters(channel, queue, callback) {
function maybeDeclareDeadLetters (channel, queue, callback) {
if (!queue.options || !queue.options.deadLetterExchange) {

@@ -40,3 +40,3 @@ return callback();

queue.options.deadLetterExchangeRoutingKey || queue.routingKey)
], callback);
], callback);
}

@@ -48,7 +48,7 @@

*/
exports.setupForConsume = function(channel, params, callback) {
exports.setupForConsume = function (channel, params, callback) {
var queue = params.queue;
debug('setupForConsume()', queue);
console.log('queue');
console.log(queue);
console.log('queue');
console.log(queue);
async.series([

@@ -55,0 +55,0 @@ maybeDeclareDeadLetters.bind(undefined, channel, queue),

{
"name": "amqp-wrapper",
"version": "5.5.1",
"version": "5.6.0-requeue.0",
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.",
"main": "amqp.js",
"scripts": {
"test": "NODE_ENV=test mocha test --recursive",
"test": "semistandard && NODE_ENV=test mocha test --recursive",
"jshint": "jshint -c .jshintrc --exclude-path .gitignore .",
"codestyle": "jscs -p google lib/ test/"
"codestyle": "jscs -p google lib/ test/",
"coverage": "nyc -a -c -r html -r text -r lcov npm test"
},

@@ -39,5 +40,12 @@ "repository": {

"mocha": "^1.21.4",
"nyc": "^11.4.1",
"sandboxed-module": "^0.3.0",
"semistandard": "^12.0.1",
"sinon": "^1.10.3"
},
"semistandard": {
"env": [
"mocha"
]
}
}
'use strict';
var AMQP = require('../amqp'),
config = require('./config').good;
const AMQP = require('../amqp');
const config = require('./config').good;
describe('AMQP', function() {
describe('#connect', function() {
it('should return a promise', function(done) {
describe('AMQP', function () {
describe('#connect', function () {
it('should return a promise', function (done) {
var amqp = AMQP(config);
return amqp.connect().then(function() {
return amqp.connect().then(function () {
done();

@@ -12,0 +12,0 @@ }, done);

'use strict';
var SandboxedModule = require('sandboxed-module'),
expect = require('expect.js'),
AMQP = require('../amqp'),
config = require('./config');
const SandboxedModule = require('sandboxed-module');
const expect = require('expect.js');
const AMQP = require('../amqp');
const config = require('./config');
describe('AMQP', function() {
describe('#constructor', function() {
it('should throw with empty constructor', function(done) {
expect(function() { AMQP(); }).to
describe('AMQP', function () {
describe('#constructor', function () {
it('should throw with empty constructor', function (done) {
expect(function () { AMQP(); }).to
.throwError('amqp-wrapper: Invalid config');
done();
});
it('should throw with no url or exchange', function(done) {
expect(function() { AMQP({}); }).to
it('should throw with no url or exchange', function (done) {
expect(function () { AMQP({}); }).to
.throwError('amqp-wrapper: Invalid config');
done();
});
it('should throw with no url', function(done) {
expect(function() { AMQP({exchange: ''}); }).to
it('should throw with no url', function (done) {
expect(function () { AMQP({exchange: ''}); }).to
.throwError('amqp-wrapper: Invalid config');
done();
});
it('should throw with no exchange', function(done) {
expect(function() { AMQP({url: ''}); }).to
it('should throw with no exchange', function (done) {
expect(function () { AMQP({url: ''}); }).to
.throwError('amqp-wrapper: Invalid config');

@@ -31,4 +31,4 @@ done();

});
describe('#connect', function() {
it('should should fail to connect to bad endpoint', function(done) {
describe('#connect', function () {
it('should should fail to connect to bad endpoint', function (done) {
var amqp = AMQP({

@@ -38,3 +38,3 @@ url: 'amqp://guest:guest@localhost:6767',

});
amqp.connect(function(err) {
amqp.connect(function (err) {
expect(err.code).to.equal('ECONNREFUSED');

@@ -44,7 +44,7 @@ done();

});
it('should call the callback successfully', function(done) {
it('should call the callback successfully', function (done) {
var amqp = AMQP(config.good);
amqp.connect(done);
});
it('should declare your queue, and bind it', function(done) {
it('should declare your queue, and bind it', function (done) {
var amqpLibMock = require('./amqplibmock')();

@@ -57,3 +57,3 @@ var mockedAMQP = SandboxedModule.require('../amqp', {

mockedAMQP.connect(function(err) {
mockedAMQP.connect(function (err) {
if (err) {

@@ -71,25 +71,24 @@ return done(err);

it('allows you to specify an array for routingKey and binds each given',
function(done) {
var amqpLibMock = require('./amqplibmock')();
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config.routingKeyArray);
function (done) {
var amqpLibMock = require('./amqplibmock')();
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config.routingKeyArray);
mockedAMQP.connect(function(err) {
if (err) {
return done(err);
}
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
// one queue, dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2);
// Bind the consume queue with its two routing keys, and its dead
// letter queue.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(4);
done();
// one queue, dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2);
// Bind the consume queue with its two routing keys, and its dead
// letter queue.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(4);
done();
});
});
});
it('should just declare if you don\'t specify routing key', function(done) {
it('should just declare if you don\'t specify routing key', function (done) {
var amqpLibMock = require('./amqplibmock')();

@@ -102,3 +101,3 @@ var mockedAMQP = SandboxedModule.require('../amqp', {

mockedAMQP.connect(function(err) {
mockedAMQP.connect(function (err) {
if (err) {

@@ -116,6 +115,6 @@ return done(err);

});
describe('#publish', function() {
it('should call the callback successfully', function(done) {
describe('#publish', function () {
it('should call the callback successfully', function (done) {
var amqp = AMQP(config.good);
amqp.connect(function(err) {
amqp.connect(function (err) {
if (err) {

@@ -127,5 +126,5 @@ return done(err);

});
it('should accept objects', function(done) {
it('should accept objects', function (done) {
var amqp = AMQP(config.good);
amqp.connect(function(err) {
amqp.connect(function (err) {
if (err) {

@@ -138,82 +137,102 @@ return done(err);

});
describe('#consume', function() {
describe('#consume', function () {
it('if done(err) is called with err === null, calls ack().',
function(done) {
var ack = function() {
done();
};
function (done) {
var ack = function () {
done();
};
var amqpLibMock = require('./amqplibmock')({overrides: {ack: ack}});
var amqpLibMock = require('./amqplibmock')({overrides: {ack: ack}});
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config.good);
function myMessageHandler (parsedMsg, cb) {
cb();
}
})(config.good);
function myMessageHandler(parsedMsg, cb) {
cb();
}
mockedAMQP.connect(function(err) {
if (err) {
return done(err);
}
mockedAMQP.consume(myMessageHandler);
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
mockedAMQP.consume(myMessageHandler);
});
});
});
it('if json unparsable, calls nack() with requeue of false.',
function(done) {
var nack = function(message, upTo, requeue) {
expect(requeue).to.equal(false);
done();
};
function (done) {
var nack = function (message, upTo, requeue) {
expect(requeue).to.equal(false);
done();
};
var amqpLibMock = require('./amqplibmock')({
messageToDeliver: 'nonvalidjson',
overrides: {nack: nack}
});
var amqpLibMock = require('./amqplibmock')({
messageToDeliver: 'nonvalidjson',
overrides: {nack: nack}
});
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config.good);
function myMessageHandler (parsedMsg, cb) {
cb();
}
})(config.good);
function myMessageHandler(parsedMsg, cb) {
cb();
}
mockedAMQP.connect(function(err) {
if (err) {
return done(err);
}
mockedAMQP.consume(myMessageHandler);
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
mockedAMQP.consume(myMessageHandler);
});
});
});
it('if json callback called with err, calls nack() with requeue as given.',
function(done) {
var nack = function(message, upTo, requeue) {
expect(requeue).to.equal('requeue');
done();
};
function (done) {
var nack = function (message, upTo, requeue) {
expect(requeue).to.equal('requeue');
done();
};
var amqpLibMock = require('./amqplibmock')({overrides: {nack: nack}});
var amqpLibMock = require('./amqplibmock')({overrides: {nack: nack}});
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config.good);
function myMessageHandler (parsedMsg, cb) {
cb(new Error('got it bad'), 'requeue');
}
})(config.good);
function myMessageHandler(parsedMsg, cb) {
cb(new Error('got it bad'), 'requeue');
}
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
mockedAMQP.consume(myMessageHandler);
});
});
});
mockedAMQP.connect(function(err) {
describe('#requeueAll', function () {
it('returns messages to the queue', function (done) {
var amqp = AMQP(config.good);
amqp.connect(function (err) {
if (err) {
return done(err);
}
mockedAMQP.consume(myMessageHandler);
amqp.publish('myqueue', 'test', { hi: 'there' }, done);
amqp.consume((msg) => {
expect(msg.hi).to.equal('there');
}).then(() => {
amqp.requeueAll();
amqp.consume((msg) => {
expect(msg.hi).to.equal('there');
});
});
});

@@ -220,0 +239,0 @@ });

@@ -8,3 +8,3 @@ 'use strict';

module.exports = function(config) {
module.exports = function (config) {
var overrides = (config && config.overrides) || {};

@@ -16,3 +16,3 @@ var messageToDeliver = (config && config.messageToDeliver) || '{}';

content: {
toString: function() {return messageToDeliver;}
toString: function () { return messageToDeliver; }
}

@@ -19,0 +19,0 @@ }),

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc