amqp-wrapper
Advanced tools
Comparing version 5.5.1 to 5.6.0-requeue.0
213
amqp.js
'use strict'; | ||
var amqp = require('amqplib/callback_api'), | ||
stringifysafe = require('json-stringify-safe'), | ||
queueSetup = require('./lib/queue-setup'), | ||
debug = require('debug')('amqp-wrapper'), | ||
Deferred = require('deferential'); | ||
const amqp = require('amqplib/callback_api'); | ||
const stringifysafe = require('json-stringify-safe'); | ||
const queueSetup = require('./lib/queue-setup'); | ||
const debug = require('debug')('amqp-wrapper'); | ||
const Deferred = require('deferential'); | ||
module.exports = function(config) { | ||
module.exports = function (config) { | ||
if (!config || !config.url || !config.exchange) { | ||
@@ -14,122 +14,119 @@ throw new Error('amqp-wrapper: Invalid config'); | ||
var connection, channel; | ||
let connection; | ||
let channel; | ||
let consumerTag; | ||
var prefetch = config.prefetch || 10; | ||
var ret = { | ||
/** | ||
* Connects and remembers the channel. | ||
*/ | ||
connect: function(cb) { | ||
var d = Deferred(); | ||
amqp.connect(config.url, createChannel); | ||
/** | ||
* Connects and remembers the channel. | ||
*/ | ||
function connect (cb) { | ||
var d = Deferred(); | ||
amqp.connect(config.url, createChannel); | ||
function createChannel(err, conn) { | ||
debug('createChannel()'); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
connection = conn; | ||
function createChannel (err, conn) { | ||
if (err) return d.reject(err); | ||
connection = conn; | ||
conn.createConfirmChannel(assertExchange); | ||
} | ||
conn.createConfirmChannel(assertExchange); | ||
} | ||
function assertExchange (err, ch) { | ||
if (err) return d.reject(err); | ||
channel = ch; | ||
channel.prefetch(prefetch); | ||
channel.assertExchange(config.exchange, 'topic', {}, assertQueues); | ||
} | ||
function assertExchange(err, ch) { | ||
debug('assertExchange()', ch); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
channel = ch; | ||
function assertQueues (err) { | ||
if (err) return d.reject(err); | ||
if (!config.queue || !config.queue.name) return d.resolve(); | ||
queueSetup.setupForConsume(channel, config, d.resolver(cb)); | ||
} | ||
return d.nodeify(cb); | ||
} | ||
channel.prefetch(prefetch); | ||
channel.assertExchange(config.exchange, 'topic', {}, assertQueues); | ||
} | ||
function requeueAll () { | ||
channel.cancel(consumerTag); | ||
channel.nackAll(true); | ||
} | ||
function assertQueues(err) { | ||
debug('assertQueues()'); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
if (config.queue && config.queue.name) { | ||
queueSetup.setupForConsume(channel, config, d.resolver(cb)); | ||
} else { | ||
d.resolve(); | ||
} | ||
} | ||
return d.nodeify(cb); | ||
}, | ||
function close (cb) { | ||
if (!connection) return cb(); | ||
return connection.close(cb); | ||
} | ||
close: function(cb) { | ||
if (connection) { | ||
return connection.close(cb); | ||
/** | ||
* Publish a message using the specified routing key. | ||
* @param {string} routingKey The name of the queue to use. | ||
* @param {string} message The message to publish. | ||
* @param {Object} options Any options to pass through to the underlying | ||
* publish. | ||
* @param {Function(err)} callback The callback to call when done. | ||
*/ | ||
function publish (routingKey, message, options, cb) { | ||
debug('publish()'); | ||
var d = Deferred(); | ||
if (typeof message === 'object') { | ||
message = stringifysafe(message); | ||
} | ||
channel.publish(config.exchange, routingKey, Buffer.from(message), | ||
options, d.resolver(cb)); | ||
return d.nodeify(cb); | ||
} | ||
/** | ||
* handleMessage() is expected to be of the form: | ||
* handleMessage(parsedMessage, callback). | ||
* If callback is called with a non-null error, then the message will be | ||
* nacked. You can call it like: | ||
* callback(err, requeue) in order | ||
* to instruct rabbit whether to requeue the message | ||
* (or discard/dead letter). | ||
* | ||
* If not given, requeue is assumed to be false. | ||
* | ||
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34 | ||
*/ | ||
function consume (handleMessage, options, cb) { | ||
const d = Deferred(); | ||
debug('consume()'); | ||
function onMessage (message) { | ||
function done (err, requeue) { | ||
if (err) return channel.nack(message, false, requeue || false); | ||
channel.ack(message); | ||
} | ||
cb(); | ||
}, | ||
/** | ||
* Publish a message using the specified routing key. | ||
* @param {string} routingKey The name of the queue to use. | ||
* @param {string} message The message to publish. | ||
* @param {Object} options Any options to pass through to the underlying | ||
* publish. | ||
* @param {Function(err)} callback The callback to call when done. | ||
*/ | ||
publish: function(routingKey, message, options, cb) { | ||
debug('publish()'); | ||
var d = Deferred(); | ||
if (typeof message === 'object') { | ||
message = stringifysafe(message); | ||
try { | ||
var messagePayload = message.content.toString(); | ||
var parsedPayload = JSON.parse(messagePayload); | ||
handleMessage(parsedPayload, done); | ||
} catch (error) { | ||
console.error(error); | ||
// Do not requeue on exception - it means something unexpected | ||
// (and prob. non-transitory) happened. | ||
done(error, false); | ||
} | ||
channel.publish(config.exchange, routingKey, new Buffer(message), | ||
options, d.resolver(cb)); | ||
} | ||
return d.nodeify(cb); | ||
}, | ||
channel.consume(config.queue.name, onMessage, options, consumeCb); | ||
/** | ||
* handleMessage() is expected to be of the form: | ||
* handleMessage(parsedMessage, callback). | ||
* If callback is called with a non-null error, then the message will be | ||
* nacked. You can call it like: | ||
* callback(err, requeue) in order | ||
* to instruct rabbit whether to requeue the message | ||
* (or discard/dead letter). | ||
* | ||
* If not given, requeue is assumed to be false. | ||
* | ||
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34 | ||
*/ | ||
consume: function(handleMessage, options) { | ||
debug('consume()'); | ||
function callback(message) { | ||
function done(err, requeue) { | ||
if (requeue === undefined) { | ||
requeue = false; | ||
} | ||
if (err) { | ||
return channel.nack(message, false, requeue); | ||
} | ||
channel.ack(message); | ||
} | ||
function consumeCb (err, ok) { | ||
if (err) return d.reject(err); | ||
consumerTag = ok.consumerTag; | ||
return d.resolve(); | ||
} | ||
return d.nodeify(cb); | ||
} | ||
try { | ||
var messagePayload = message.content.toString(); | ||
var parsedPayload = JSON.parse(messagePayload); | ||
handleMessage(parsedPayload, done); | ||
} | ||
catch (error) { | ||
console.log(error); | ||
// Do not requeue on exception - it means something unexpected | ||
// (and prob. non-transitory) happened. | ||
done(error, false); | ||
} | ||
} | ||
channel.consume(config.queue.name, callback, options); | ||
} | ||
return { | ||
connect, | ||
close, | ||
publish, | ||
consume, | ||
requeueAll | ||
}; | ||
return ret; | ||
}; | ||
// vim: set et sw=2: |
@@ -6,3 +6,3 @@ 'use strict'; | ||
function bindRoutingKeys(channel, exchange, queueName, keys, done) { | ||
function bindRoutingKeys (channel, exchange, queueName, keys, done) { | ||
var routingKeys; | ||
@@ -16,6 +16,6 @@ if (typeof keys === 'string') { | ||
async.map(routingKeys, | ||
function(key, callback) { | ||
channel.bindQueue(queueName, exchange, key, {}, callback); | ||
}, | ||
done); | ||
function (key, callback) { | ||
channel.bindQueue(queueName, exchange, key, {}, callback); | ||
}, | ||
done); | ||
} else { | ||
@@ -26,3 +26,3 @@ done(); | ||
function maybeDeclareDeadLetters(channel, queue, callback) { | ||
function maybeDeclareDeadLetters (channel, queue, callback) { | ||
if (!queue.options || !queue.options.deadLetterExchange) { | ||
@@ -40,3 +40,3 @@ return callback(); | ||
queue.options.deadLetterExchangeRoutingKey || queue.routingKey) | ||
], callback); | ||
], callback); | ||
} | ||
@@ -48,7 +48,7 @@ | ||
*/ | ||
exports.setupForConsume = function(channel, params, callback) { | ||
exports.setupForConsume = function (channel, params, callback) { | ||
var queue = params.queue; | ||
debug('setupForConsume()', queue); | ||
console.log('queue'); | ||
console.log(queue); | ||
console.log('queue'); | ||
console.log(queue); | ||
async.series([ | ||
@@ -55,0 +55,0 @@ maybeDeclareDeadLetters.bind(undefined, channel, queue), |
{ | ||
"name": "amqp-wrapper", | ||
"version": "5.5.1", | ||
"version": "5.6.0-requeue.0", | ||
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.", | ||
"main": "amqp.js", | ||
"scripts": { | ||
"test": "NODE_ENV=test mocha test --recursive", | ||
"test": "semistandard && NODE_ENV=test mocha test --recursive", | ||
"jshint": "jshint -c .jshintrc --exclude-path .gitignore .", | ||
"codestyle": "jscs -p google lib/ test/" | ||
"codestyle": "jscs -p google lib/ test/", | ||
"coverage": "nyc -a -c -r html -r text -r lcov npm test" | ||
}, | ||
@@ -39,5 +40,12 @@ "repository": { | ||
"mocha": "^1.21.4", | ||
"nyc": "^11.4.1", | ||
"sandboxed-module": "^0.3.0", | ||
"semistandard": "^12.0.1", | ||
"sinon": "^1.10.3" | ||
}, | ||
"semistandard": { | ||
"env": [ | ||
"mocha" | ||
] | ||
} | ||
} |
'use strict'; | ||
var AMQP = require('../amqp'), | ||
config = require('./config').good; | ||
const AMQP = require('../amqp'); | ||
const config = require('./config').good; | ||
describe('AMQP', function() { | ||
describe('#connect', function() { | ||
it('should return a promise', function(done) { | ||
describe('AMQP', function () { | ||
describe('#connect', function () { | ||
it('should return a promise', function (done) { | ||
var amqp = AMQP(config); | ||
return amqp.connect().then(function() { | ||
return amqp.connect().then(function () { | ||
done(); | ||
@@ -12,0 +12,0 @@ }, done); |
227
test/amqp.js
'use strict'; | ||
var SandboxedModule = require('sandboxed-module'), | ||
expect = require('expect.js'), | ||
AMQP = require('../amqp'), | ||
config = require('./config'); | ||
const SandboxedModule = require('sandboxed-module'); | ||
const expect = require('expect.js'); | ||
const AMQP = require('../amqp'); | ||
const config = require('./config'); | ||
describe('AMQP', function() { | ||
describe('#constructor', function() { | ||
it('should throw with empty constructor', function(done) { | ||
expect(function() { AMQP(); }).to | ||
describe('AMQP', function () { | ||
describe('#constructor', function () { | ||
it('should throw with empty constructor', function (done) { | ||
expect(function () { AMQP(); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
done(); | ||
}); | ||
it('should throw with no url or exchange', function(done) { | ||
expect(function() { AMQP({}); }).to | ||
it('should throw with no url or exchange', function (done) { | ||
expect(function () { AMQP({}); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
done(); | ||
}); | ||
it('should throw with no url', function(done) { | ||
expect(function() { AMQP({exchange: ''}); }).to | ||
it('should throw with no url', function (done) { | ||
expect(function () { AMQP({exchange: ''}); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
done(); | ||
}); | ||
it('should throw with no exchange', function(done) { | ||
expect(function() { AMQP({url: ''}); }).to | ||
it('should throw with no exchange', function (done) { | ||
expect(function () { AMQP({url: ''}); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
@@ -31,4 +31,4 @@ done(); | ||
}); | ||
describe('#connect', function() { | ||
it('should should fail to connect to bad endpoint', function(done) { | ||
describe('#connect', function () { | ||
it('should should fail to connect to bad endpoint', function (done) { | ||
var amqp = AMQP({ | ||
@@ -38,3 +38,3 @@ url: 'amqp://guest:guest@localhost:6767', | ||
}); | ||
amqp.connect(function(err) { | ||
amqp.connect(function (err) { | ||
expect(err.code).to.equal('ECONNREFUSED'); | ||
@@ -44,7 +44,7 @@ done(); | ||
}); | ||
it('should call the callback successfully', function(done) { | ||
it('should call the callback successfully', function (done) { | ||
var amqp = AMQP(config.good); | ||
amqp.connect(done); | ||
}); | ||
it('should declare your queue, and bind it', function(done) { | ||
it('should declare your queue, and bind it', function (done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
@@ -57,3 +57,3 @@ var mockedAMQP = SandboxedModule.require('../amqp', { | ||
mockedAMQP.connect(function(err) { | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
@@ -71,25 +71,24 @@ return done(err); | ||
it('allows you to specify an array for routingKey and binds each given', | ||
function(done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config.routingKeyArray); | ||
function (done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config.routingKeyArray); | ||
mockedAMQP.connect(function(err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
// one queue, dead lettered | ||
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2); | ||
// Bind the consume queue with its two routing keys, and its dead | ||
// letter queue. | ||
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(4); | ||
done(); | ||
// one queue, dead lettered | ||
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2); | ||
// Bind the consume queue with its two routing keys, and its dead | ||
// letter queue. | ||
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(4); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('should just declare if you don\'t specify routing key', function(done) { | ||
it('should just declare if you don\'t specify routing key', function (done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
@@ -102,3 +101,3 @@ var mockedAMQP = SandboxedModule.require('../amqp', { | ||
mockedAMQP.connect(function(err) { | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
@@ -116,6 +115,6 @@ return done(err); | ||
}); | ||
describe('#publish', function() { | ||
it('should call the callback successfully', function(done) { | ||
describe('#publish', function () { | ||
it('should call the callback successfully', function (done) { | ||
var amqp = AMQP(config.good); | ||
amqp.connect(function(err) { | ||
amqp.connect(function (err) { | ||
if (err) { | ||
@@ -127,5 +126,5 @@ return done(err); | ||
}); | ||
it('should accept objects', function(done) { | ||
it('should accept objects', function (done) { | ||
var amqp = AMQP(config.good); | ||
amqp.connect(function(err) { | ||
amqp.connect(function (err) { | ||
if (err) { | ||
@@ -138,82 +137,102 @@ return done(err); | ||
}); | ||
describe('#consume', function() { | ||
describe('#consume', function () { | ||
it('if done(err) is called with err === null, calls ack().', | ||
function(done) { | ||
var ack = function() { | ||
done(); | ||
}; | ||
function (done) { | ||
var ack = function () { | ||
done(); | ||
}; | ||
var amqpLibMock = require('./amqplibmock')({overrides: {ack: ack}}); | ||
var amqpLibMock = require('./amqplibmock')({overrides: {ack: ack}}); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config.good); | ||
function myMessageHandler (parsedMsg, cb) { | ||
cb(); | ||
} | ||
})(config.good); | ||
function myMessageHandler(parsedMsg, cb) { | ||
cb(); | ||
} | ||
mockedAMQP.connect(function(err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.consume(myMessageHandler); | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}); | ||
}); | ||
it('if json unparsable, calls nack() with requeue of false.', | ||
function(done) { | ||
var nack = function(message, upTo, requeue) { | ||
expect(requeue).to.equal(false); | ||
done(); | ||
}; | ||
function (done) { | ||
var nack = function (message, upTo, requeue) { | ||
expect(requeue).to.equal(false); | ||
done(); | ||
}; | ||
var amqpLibMock = require('./amqplibmock')({ | ||
messageToDeliver: 'nonvalidjson', | ||
overrides: {nack: nack} | ||
}); | ||
var amqpLibMock = require('./amqplibmock')({ | ||
messageToDeliver: 'nonvalidjson', | ||
overrides: {nack: nack} | ||
}); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config.good); | ||
function myMessageHandler (parsedMsg, cb) { | ||
cb(); | ||
} | ||
})(config.good); | ||
function myMessageHandler(parsedMsg, cb) { | ||
cb(); | ||
} | ||
mockedAMQP.connect(function(err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.consume(myMessageHandler); | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}); | ||
}); | ||
it('if json callback called with err, calls nack() with requeue as given.', | ||
function(done) { | ||
var nack = function(message, upTo, requeue) { | ||
expect(requeue).to.equal('requeue'); | ||
done(); | ||
}; | ||
function (done) { | ||
var nack = function (message, upTo, requeue) { | ||
expect(requeue).to.equal('requeue'); | ||
done(); | ||
}; | ||
var amqpLibMock = require('./amqplibmock')({overrides: {nack: nack}}); | ||
var amqpLibMock = require('./amqplibmock')({overrides: {nack: nack}}); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config.good); | ||
function myMessageHandler (parsedMsg, cb) { | ||
cb(new Error('got it bad'), 'requeue'); | ||
} | ||
})(config.good); | ||
function myMessageHandler(parsedMsg, cb) { | ||
cb(new Error('got it bad'), 'requeue'); | ||
} | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}); | ||
}); | ||
mockedAMQP.connect(function(err) { | ||
describe('#requeueAll', function () { | ||
it('returns messages to the queue', function (done) { | ||
var amqp = AMQP(config.good); | ||
amqp.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.consume(myMessageHandler); | ||
amqp.publish('myqueue', 'test', { hi: 'there' }, done); | ||
amqp.consume((msg) => { | ||
expect(msg.hi).to.equal('there'); | ||
}).then(() => { | ||
amqp.requeueAll(); | ||
amqp.consume((msg) => { | ||
expect(msg.hi).to.equal('there'); | ||
}); | ||
}); | ||
}); | ||
@@ -220,0 +239,0 @@ }); |
@@ -8,3 +8,3 @@ 'use strict'; | ||
module.exports = function(config) { | ||
module.exports = function (config) { | ||
var overrides = (config && config.overrides) || {}; | ||
@@ -16,3 +16,3 @@ var messageToDeliver = (config && config.messageToDeliver) || '{}'; | ||
content: { | ||
toString: function() {return messageToDeliver;} | ||
toString: function () { return messageToDeliver; } | ||
} | ||
@@ -19,0 +19,0 @@ }), |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Deprecated
MaintenanceThe maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package
21117
452
0
9
13
2