amqp-wrapper
Advanced tools
Comparing version 6.0.0 to 7.0.0-es6.0
141
amqp.js
@@ -1,65 +0,57 @@ | ||
'use strict'; | ||
const amqp = require('amqplib/callback_api'); | ||
const amqp = require('amqplib'); | ||
const stringifysafe = require('json-stringify-safe'); | ||
const queueSetup = require('./lib/queue-setup'); | ||
const debug = require('debug')('amqp-wrapper'); | ||
const Deferred = require('deferential'); | ||
module.exports = function (config) { | ||
if (!config || !config.url || !config.exchange) { | ||
throw new Error('amqp-wrapper: Invalid config'); | ||
/** | ||
* Class to contain an instantiated connection/channel to AMQP with a given | ||
* config. | ||
*/ | ||
class AMQPWrapper { | ||
/** | ||
* Instantiate an AMQP wrapper with a given config. | ||
* @param {object} config | ||
* @param {string} config.url | ||
* @param {string} config.exchange | ||
* @param {object} config.queue | ||
* @param {string} config.queue.name | ||
* @param {Array<string>|string} config.queue.routingKey | ||
* @param {object} config.queue.options | ||
*/ | ||
constructor (config) { | ||
if (!config || !config.url || !config.exchange) { | ||
throw new Error('amqp-wrapper: Invalid config'); | ||
} | ||
this.config = config; | ||
this.connection = null; | ||
this.channel = null; | ||
this.prefetch = config.prefetch || 10; | ||
} | ||
var connection, channel; | ||
var prefetch = config.prefetch || 10; | ||
/** | ||
* Connects and remembers the channel. | ||
* @async | ||
* @description Connects, establishes a channel, sets up exchange/queues/bindings/dead | ||
* lettering. | ||
* @returns {Promise} | ||
*/ | ||
function connect (cb) { | ||
var d = Deferred(); | ||
amqp.connect(config.url, createChannel); | ||
function createChannel (err, conn) { | ||
debug('createChannel()'); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
connection = conn; | ||
conn.createConfirmChannel(assertExchange); | ||
async connect () { | ||
const { config } = this; | ||
const conn = await amqp.connect(config.url); | ||
this.channel = await conn.createConfirmChannel(); | ||
this.channel.prefetch(this.prefetch); | ||
await this.channel.assertExchange(config.exchange, 'topic', {}); | ||
if (config.queue && config.queue.name) { | ||
return queueSetup.setupForConsume(this.channel, config); | ||
} | ||
function assertExchange (err, ch) { | ||
debug('assertExchange()', ch); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
channel = ch; | ||
channel.prefetch(prefetch); | ||
channel.assertExchange(config.exchange, 'topic', {}, assertQueues); | ||
} | ||
function assertQueues (err) { | ||
debug('assertQueues()'); | ||
if (err) { | ||
return d.reject(err); | ||
} | ||
if (config.queue && config.queue.name) { | ||
queueSetup.setupForConsume(channel, config, d.resolver(cb)); | ||
} else { | ||
d.resolve(); | ||
} | ||
} | ||
return d.nodeify(cb); | ||
} | ||
function close (cb) { | ||
if (connection) { | ||
return connection.close(cb); | ||
/** | ||
* @async | ||
* @description | ||
* Closes connection. | ||
* @returns {Promise} | ||
*/ | ||
async close () { | ||
if (this.connection) { | ||
return this.connection.close(); | ||
} | ||
cb(); | ||
} | ||
@@ -75,15 +67,26 @@ | ||
*/ | ||
function publish (routingKey, message, options, cb) { | ||
debug('publish()'); | ||
var d = Deferred(); | ||
/** | ||
* @async | ||
* @description | ||
* Publish a message to the given routing key, with given options. | ||
* @param {string} routingKey | ||
* @param {object|string} message | ||
* @param {object} options | ||
* @returns {Promise} | ||
*/ | ||
async publish (routingKey, message, options) { | ||
if (typeof message === 'object') { | ||
message = stringifysafe(message); | ||
} | ||
channel.publish(config.exchange, routingKey, Buffer.from(message), | ||
options, d.resolver(cb)); | ||
return d.nodeify(cb); | ||
return this.channel.publish(this.config.exchange, routingKey, Buffer.from(message), options); | ||
} | ||
/** | ||
* @async | ||
* Start consuming on the queue specified in the config you passed on | ||
* instantiation, using the given message handler callback. | ||
* @param {function} handleMessage | ||
* @param {object} options | ||
* @description | ||
* handleMessage() is expected to be of the form: | ||
@@ -100,5 +103,6 @@ * handleMessage(parsedMessage, callback). | ||
* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34 | ||
* @returns {Promise} | ||
*/ | ||
function consume (handleMessage, options) { | ||
debug('consume()'); | ||
async consume (handleMessage, options) { | ||
const { channel } = this; | ||
function callback (message) { | ||
@@ -127,13 +131,6 @@ function done (err, requeue) { | ||
channel.consume(config.queue.name, callback, options); | ||
return channel.consume(this.config.queue.name, callback, options); | ||
} | ||
} | ||
return { | ||
connect, | ||
close, | ||
publish, | ||
consume | ||
}; | ||
}; | ||
// vim: set et sw=2: | ||
module.exports = AMQPWrapper; |
@@ -1,38 +0,17 @@ | ||
'use strict'; | ||
var async = require('async'); | ||
var debug = require('debug')('amqp-wrapper'); | ||
function bindRoutingKeys (channel, exchange, queueName, keys, done) { | ||
var routingKeys; | ||
if (typeof keys === 'string') { | ||
routingKeys = [keys]; | ||
} else { | ||
routingKeys = keys; | ||
async function bindRoutingKeys (channel, exchange, queueName, keys = []) { | ||
const routingKeys = (typeof keys === 'string') | ||
? [keys] | ||
: keys; | ||
for (let routingKey in routingKeys) { | ||
await channel.bindQueue(queueName, exchange, routingKey); | ||
} | ||
if (routingKeys) { | ||
async.map(routingKeys, | ||
function (key, callback) { | ||
channel.bindQueue(queueName, exchange, key, {}, callback); | ||
}, | ||
done); | ||
} else { | ||
done(); | ||
} | ||
} | ||
function maybeDeclareDeadLetters (channel, queue, callback) { | ||
if (!queue.options || !queue.options.deadLetterExchange) { | ||
return callback(); | ||
} | ||
async function maybeDeclareDeadLetters (channel, queue) { | ||
if (!queue.options || !queue.options.deadLetterExchange) return; | ||
var qName = queue.name + | ||
(queue.options.deadLetterQueueSuffix || '-dead-letter'); | ||
async.series([ | ||
channel.assertExchange.bind(channel, | ||
queue.options.deadLetterExchange, 'topic', {}), | ||
channel.assertQueue.bind(channel, qName, {}), | ||
bindRoutingKeys.bind(undefined, channel, queue.options.deadLetterExchange, qName, | ||
queue.options.deadLetterExchangeRoutingKey || queue.routingKey) | ||
], callback); | ||
var qName = queue.name + (queue.options.deadLetterQueueSuffix || '-dead-letter'); | ||
await channel.assertExchange(queue.options.deadLetterExchange, 'topic', {}); | ||
await channel.assertQueue(qName, {}); | ||
await bindRoutingKeys(channel, queue.options.deadLetterExchange, qName, queue.options.deadLetterExchangeRoutingKey || queue.routingKey); | ||
} | ||
@@ -44,15 +23,7 @@ | ||
*/ | ||
exports.setupForConsume = function (channel, params, callback) { | ||
var queue = params.queue; | ||
debug('setupForConsume()', queue); | ||
console.log('queue'); | ||
console.log(queue); | ||
async.series([ | ||
maybeDeclareDeadLetters.bind(undefined, channel, queue), | ||
channel.assertQueue.bind(channel, queue.name, queue.options), | ||
bindRoutingKeys.bind(undefined, channel, params.exchange, queue.name, | ||
queue.routingKey) | ||
], callback); | ||
exports.setupForConsume = async function (channel, params) { | ||
var { queue } = params; | ||
await maybeDeclareDeadLetters(channel, queue); | ||
await channel.assertQueue(queue.name, queue.options); | ||
await bindRoutingKeys(channel, params.exchange, queue.name, queue.routingKey); | ||
}; | ||
// vim: set sw=2 et: |
{ | ||
"name": "amqp-wrapper", | ||
"version": "6.0.0", | ||
"version": "7.0.0-es6.0", | ||
"engines": { | ||
"node": ">= 8" | ||
}, | ||
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.", | ||
"main": "amqp.js", | ||
"scripts": { | ||
"test": "semistandard && NODE_ENV=test mocha test --recursive --exit", | ||
"coverage": "nyc -a -c -r html -r text -r lcov npm test" | ||
"coverage": "nyc -a -c -r html -r text -r lcov npm test", | ||
"docs": "jsdoc2md amqp.js", | ||
"test": "semistandard && NODE_ENV=test mocha test --recursive --exit" | ||
}, | ||
@@ -26,10 +30,10 @@ "repository": { | ||
"amqplib": "^0.5.5", | ||
"async": "^3.1.0", | ||
"debug": "^2.6.8", | ||
"deferential": "^1.0.0", | ||
"json-stringify-safe": "^5.0.0", | ||
"q": "^1.5.1" | ||
"json-stringify-safe": "^5.0.0" | ||
}, | ||
"devDependencies": { | ||
"expect.js": "^0.3.1", | ||
"chai": "^4.2.0", | ||
"chai-as-promised": "^7.1.1", | ||
"dirty-chai": "^2.0.1", | ||
"jsdoc-to-markdown": "^5.0.0", | ||
"mocha": "^6.2.0", | ||
@@ -39,3 +43,3 @@ "nyc": "^11.4.1", | ||
"semistandard": "^12.0.1", | ||
"sinon": "^1.10.3" | ||
"sinon": "^7.3.2" | ||
}, | ||
@@ -42,0 +46,0 @@ "semistandard": { |
121
README.md
@@ -46,21 +46,23 @@ amqp-wrapper | ||
// Must call this before you consume/publish/etc... | ||
amqp.connect(amqpConnectDone); | ||
async function main () { | ||
// Must call this before you consume/publish/etc... | ||
await amqp.connect(); | ||
// Consuming | ||
var handleMessage = function(message, callback) { | ||
//... | ||
}; | ||
// You must call: | ||
callback(err, requeue) | ||
// in your handleMessage. If `err` !== `null` then the message will be `nack`ed. | ||
// Requeueing will be requeue iff `requeue` is `true`. | ||
// If `err` is `null` then the message is `ack`ed. | ||
// If an exception occurs in handleMessage, then the message is `nack`ed and not requeued. | ||
// Consuming | ||
var handleMessage = function(message, callback) { | ||
//... | ||
}; | ||
// You must call: | ||
callback(err, requeue) | ||
// in your handleMessage. If `err` !== `null` then the message will be `nack`ed. | ||
// Requeueing will be requeue iff `requeue` is `true`. | ||
// If `err` is `null` then the message is `ack`ed. | ||
// If an exception occurs in handleMessage, then the message is `nack`ed and not requeued. | ||
// Start consuming: | ||
amqp.consume(handleMessage); | ||
// Start consuming: | ||
amqp.consume(handleMessage); | ||
// Publishing to arbitrary routing key. | ||
amqp.publish(routingKey, payload, options, done); | ||
// Publishing to arbitrary routing key. | ||
await amqp.publish(routingKey, payload, options); | ||
} | ||
``` | ||
@@ -74,5 +76,5 @@ | ||
```bash | ||
docker run --rm -p 5672:5672 dockerfile/rabbitmq | ||
docker run -d --rm -p 5672:5672 rabbitmq | ||
``` | ||
Then: | ||
Wait for it to finish starting up, then: | ||
``` | ||
@@ -86,7 +88,86 @@ npm test | ||
# Promises | ||
# API | ||
`connect()` and `publish()` support promises too. If you don't give a callback, they'll return a promise! | ||
> amqp-wrapper@6.0.0 docs /Users/tim/git/node-amqp-wrapper | ||
> jsdoc2md amqp.js | ||
<a name="AMQPWrapper"></a> | ||
## AMQPWrapper | ||
Class to contain an instantiated connection/channel to AMQP with a given | ||
config. | ||
**Kind**: global class | ||
* [AMQPWrapper](#AMQPWrapper) | ||
* [new AMQPWrapper(config)](#new_AMQPWrapper_new) | ||
* [.connect()](#AMQPWrapper+connect) ⇒ <code>Promise</code> | ||
* [.close()](#AMQPWrapper+close) ⇒ <code>Promise</code> | ||
* [.publish(routingKey, message, options)](#AMQPWrapper+publish) ⇒ <code>Promise</code> | ||
* [.consume(handleMessage, options)](#AMQPWrapper+consume) ⇒ <code>Promise</code> | ||
<a name="new_AMQPWrapper_new"></a> | ||
### new AMQPWrapper(config) | ||
Instantiate an AMQP wrapper with a given config. | ||
| Param | Type | | ||
| --- | --- | | ||
| config | <code>object</code> | | ||
| config.url | <code>string</code> | | ||
| config.exchange | <code>string</code> | | ||
| config.queue | <code>object</code> | | ||
| config.queue.name | <code>string</code> | | ||
| config.queue.routingKey | <code>Array.<string></code> \| <code>string</code> | | ||
| config.queue.options | <code>object</code> | | ||
<a name="AMQPWrapper+connect"></a> | ||
### amqpWrapper.connect() ⇒ <code>Promise</code> | ||
Connects, establishes a channel, sets up exchange/queues/bindings/dead | ||
lettering. | ||
**Kind**: instance method of [<code>AMQPWrapper</code>](#AMQPWrapper) | ||
<a name="AMQPWrapper+close"></a> | ||
### amqpWrapper.close() ⇒ <code>Promise</code> | ||
Closes connection. | ||
**Kind**: instance method of [<code>AMQPWrapper</code>](#AMQPWrapper) | ||
<a name="AMQPWrapper+publish"></a> | ||
### amqpWrapper.publish(routingKey, message, options) ⇒ <code>Promise</code> | ||
Publish a message to the given routing key, with given options. | ||
**Kind**: instance method of [<code>AMQPWrapper</code>](#AMQPWrapper) | ||
| Param | Type | | ||
| --- | --- | | ||
| routingKey | <code>string</code> | | ||
| message | <code>object</code> \| <code>string</code> | | ||
| options | <code>object</code> | | ||
<a name="AMQPWrapper+consume"></a> | ||
### amqpWrapper.consume(handleMessage, options) ⇒ <code>Promise</code> | ||
handleMessage() is expected to be of the form: | ||
handleMessage(parsedMessage, callback). | ||
If callback is called with a non-null error, then the message will be | ||
nacked. You can call it like: | ||
callback(err, requeue) in order | ||
to instruct rabbit whether to requeue the message | ||
(or discard/dead letter). | ||
If not given, requeue is assumed to be false. | ||
cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34 | ||
**Kind**: instance method of [<code>AMQPWrapper</code>](#AMQPWrapper) | ||
| Param | Type | | ||
| --- | --- | | ||
| handleMessage | <code>function</code> | | ||
| options | <code>object</code> | | ||
# License | ||
@@ -93,0 +174,0 @@ |
@@ -7,10 +7,12 @@ 'use strict'; | ||
describe('AMQP', function () { | ||
describe('#connect', function () { | ||
describe('#consume', function () { | ||
it('should return a promise', function (done) { | ||
var amqp = AMQP(config); | ||
var amqp = new AMQP(config); | ||
amqp.connect().then(function () { | ||
done(); | ||
}, done); | ||
return amqp.consume().then(function () { | ||
done(); | ||
}, done); | ||
}); | ||
}); | ||
}); | ||
}); |
233
test/amqp.js
@@ -1,5 +0,5 @@ | ||
'use strict'; | ||
const SandboxedModule = require('sandboxed-module'); | ||
const expect = require('expect.js'); | ||
const expect = require('chai').expect; | ||
require('chai').use(require('chai-as-promised')); | ||
require('chai').use(require('dirty-chai')); | ||
const AMQP = require('../amqp'); | ||
@@ -10,21 +10,13 @@ const config = require('./config'); | ||
describe('#constructor', function () { | ||
it('should throw with empty constructor', function (done) { | ||
expect(function () { AMQP(); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
done(); | ||
it('should throw with empty constructor', function () { | ||
expect(() => new AMQP()).to.throw('amqp-wrapper: Invalid config'); | ||
}); | ||
it('should throw with no url or exchange', function (done) { | ||
expect(function () { AMQP({}); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
done(); | ||
it('should throw with no url or exchange', function () { | ||
expect(() => new AMQP({})).to.throw('amqp-wrapper: Invalid config'); | ||
}); | ||
it('should throw with no url', function (done) { | ||
expect(function () { AMQP({ exchange: '' }); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
done(); | ||
it('should throw with no url', function () { | ||
expect(() => new AMQP({ exchange: '' })).to.throw('amqp-wrapper: Invalid config'); | ||
}); | ||
it('should throw with no exchange', function (done) { | ||
expect(function () { AMQP({ url: '' }); }).to | ||
.throwError('amqp-wrapper: Invalid config'); | ||
done(); | ||
it('should throw with no exchange', function () { | ||
expect(() => new AMQP({ url: '' })).to.throw('amqp-wrapper: Invalid config'); | ||
}); | ||
@@ -34,7 +26,7 @@ }); | ||
it('should should fail to connect to bad endpoint', function (done) { | ||
var amqp = AMQP({ | ||
var amqp = new AMQP({ | ||
url: 'amqp://guest:guest@localhost:6767', | ||
exchange: 'FOO' | ||
}); | ||
amqp.connect(function (err) { | ||
amqp.connect().catch(function (err) { | ||
expect(err.code).to.equal('ECONNREFUSED'); | ||
@@ -45,60 +37,48 @@ done(); | ||
it('should call the callback successfully', function (done) { | ||
var amqp = AMQP(config.good); | ||
amqp.connect(done); | ||
var amqp = new AMQP(config.good); | ||
amqp.connect().then(() => done()); | ||
}); | ||
it('should declare your queue, and bind it', function (done) { | ||
it('should declare your queue, and bind it', async function () { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
var MockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
'amqplib': amqpLibMock.mock | ||
} | ||
})(config.good); | ||
}); | ||
const mockedAMQP = new MockedAMQP(config.good); | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
await mockedAMQP.connect(); | ||
// one queue, dead lettered | ||
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2); | ||
// Bind the consume queue, and its dead letter queue. | ||
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(2); | ||
}); | ||
it('allows you to specify an array for routingKey and binds each given', function (done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
var MockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib': amqpLibMock.mock | ||
} | ||
}); | ||
const mockedAMQP = new MockedAMQP(config.routingKeyArray); | ||
mockedAMQP.connect().then(function () { | ||
// one queue, dead lettered | ||
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2); | ||
// Bind the consume queue, and its dead letter queue. | ||
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(2); | ||
// Bind the consume queue with its two routing keys, and its dead | ||
// letter queue. | ||
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(4); | ||
done(); | ||
}); | ||
}).catch(done); | ||
}); | ||
it('allows you to specify an array for routingKey and binds each given', | ||
function (done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config.routingKeyArray); | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
// one queue, dead lettered | ||
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2); | ||
// Bind the consume queue with its two routing keys, and its dead | ||
// letter queue. | ||
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(4); | ||
done(); | ||
}); | ||
}); | ||
it('should just declare if you don\'t specify routing key', function (done) { | ||
var amqpLibMock = require('./amqplibmock')(); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
var MockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
'amqplib': amqpLibMock.mock | ||
} | ||
})(config.noRoutingKey); | ||
}); | ||
const mockedAMQP = new MockedAMQP(config.noRoutingKey); | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.connect().then(function () { | ||
// one queue, not dead lettered | ||
@@ -109,81 +89,66 @@ expect(amqpLibMock.assertQueueSpy.callCount).to.equal(1); | ||
done(); | ||
}); | ||
}).catch(done); | ||
}); | ||
}); | ||
describe('#publish', function () { | ||
it('should call the callback successfully', function (done) { | ||
var amqp = AMQP(config.good); | ||
amqp.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
amqp.publish('myqueue', 'test', {}, done); | ||
}); | ||
it('should resolve successfully', async function () { | ||
var amqp = new AMQP(config.good); | ||
await amqp.connect(); | ||
await expect(amqp.publish('myqueue', 'test', {})).to.eventually.be.fulfilled(); | ||
}); | ||
it('should accept objects', function (done) { | ||
var amqp = AMQP(config.good); | ||
amqp.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
amqp.publish('myqueue', { woo: 'test' }, {}, done); | ||
}); | ||
it('should accept objects', async function () { | ||
var amqp = new AMQP(config.good); | ||
await amqp.connect(); | ||
await expect(amqp.publish('myqueue', { woo: 'test' }, {})).to.eventually.be.fulfilled(); | ||
}); | ||
}); | ||
describe('#consume', function () { | ||
it('if done(err) is called with err === null, calls ack().', | ||
function (done) { | ||
var ack = function () { | ||
done(); | ||
}; | ||
describe('#consume', async function () { | ||
it('if done(err) is called with err === null, calls ack().', function (done) { | ||
var ack = function () { | ||
done(); | ||
}; | ||
var amqpLibMock = require('./amqplibmock')({ overrides: { ack: ack } }); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config.good); | ||
function myMessageHandler (parsedMsg, cb) { | ||
cb(); | ||
var amqpLibMock = require('./amqplibmock')({ overrides: { ack: ack } }); | ||
var MockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib': amqpLibMock.mock | ||
} | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}); | ||
const mockedAMQP = new MockedAMQP(config.good); | ||
it('if json unparsable, calls nack() with requeue of false.', | ||
function (done) { | ||
var nack = function (message, upTo, requeue) { | ||
expect(requeue).to.equal(false); | ||
done(); | ||
}; | ||
function myMessageHandler (parsedMsg, cb) { | ||
cb(); | ||
} | ||
var amqpLibMock = require('./amqplibmock')({ | ||
messageToDeliver: 'nonvalidjson', | ||
overrides: { nack: nack } | ||
}); | ||
mockedAMQP.connect().then(function () { | ||
mockedAMQP.consume(myMessageHandler); | ||
}).catch(done); | ||
}); | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
} | ||
})(config.good); | ||
it('if json unparsable, calls nack() with requeue of false.', function (done) { | ||
var nack = function (message, upTo, requeue) { | ||
expect(requeue).to.equal(false); | ||
done(); | ||
}; | ||
function myMessageHandler (parsedMsg, cb) { | ||
cb(); | ||
var amqpLibMock = require('./amqplibmock')({ | ||
messageToDeliver: 'nonvalidjson', | ||
overrides: { nack: nack } | ||
}); | ||
var MockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib': amqpLibMock.mock | ||
} | ||
}); | ||
const mockedAMQP = new MockedAMQP(config.good); | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}); | ||
function myMessageHandler (parsedMsg, cb) { | ||
cb(); | ||
} | ||
mockedAMQP.connect().then(function () { | ||
mockedAMQP.consume(myMessageHandler); | ||
}).catch(done); | ||
}); | ||
it('if json callback called with err, calls nack() with requeue as given.', | ||
@@ -198,7 +163,8 @@ function (done) { | ||
var mockedAMQP = SandboxedModule.require('../amqp', { | ||
var MockedAMQP = SandboxedModule.require('../amqp', { | ||
requires: { | ||
'amqplib/callback_api': amqpLibMock.mock | ||
'amqplib': amqpLibMock.mock | ||
} | ||
})(config.good); | ||
}); | ||
const mockedAMQP = new MockedAMQP(config.good); | ||
@@ -209,8 +175,5 @@ function myMessageHandler (parsedMsg, cb) { | ||
mockedAMQP.connect(function (err) { | ||
if (err) { | ||
return done(err); | ||
} | ||
mockedAMQP.connect().then(function () { | ||
mockedAMQP.consume(myMessageHandler); | ||
}); | ||
}).catch(done); | ||
}); | ||
@@ -217,0 +180,0 @@ }); |
@@ -1,3 +0,1 @@ | ||
'use strict'; | ||
/** | ||
@@ -13,3 +11,3 @@ * This is a mock for the underlying amqplib library that we are wrapping. | ||
var channelMock = { | ||
consume: Sinon.stub().yields({ | ||
consume: Sinon.stub().callsArgWith(1, { | ||
content: { | ||
@@ -19,5 +17,5 @@ toString: function () { return messageToDeliver; } | ||
}), | ||
assertExchange: Sinon.stub().callsArg(3), | ||
assertQueue: Sinon.stub().callsArg(2), | ||
bindQueue: Sinon.stub().callsArg(4), | ||
assertExchange: Sinon.stub().resolves(), | ||
assertQueue: Sinon.stub().resolves(), | ||
bindQueue: Sinon.stub().resolves(), | ||
prefetch: Sinon.spy(), | ||
@@ -29,4 +27,4 @@ ack: overrides.ack || Sinon.spy(), | ||
var amqpLibMock = { | ||
connect: Sinon.stub().yields(null, { | ||
createConfirmChannel: Sinon.stub().yields(null, channelMock) | ||
connect: Sinon.stub().resolves({ | ||
createConfirmChannel: Sinon.stub().resolves(channelMock) | ||
}) | ||
@@ -33,0 +31,0 @@ }; |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Deprecated
MaintenanceThe maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package
22578
3
0
194
9
387
2
- Removedasync@^3.1.0
- Removeddeferential@^1.0.0
- Removedq@^1.5.1
- Removedasync@3.2.6(transitive)
- Removeddeferential@1.0.0(transitive)
- Removednative-promise-only@0.8.1(transitive)
- Removedq@1.5.1(transitive)