Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp-wrapper

Package Overview
Dependencies
Maintainers
7
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-wrapper - npm Package Compare versions

Comparing version 6.0.0 to 7.0.0-es6.0

141

amqp.js

@@ -1,65 +0,57 @@

'use strict';
const amqp = require('amqplib/callback_api');
const amqp = require('amqplib');
const stringifysafe = require('json-stringify-safe');
const queueSetup = require('./lib/queue-setup');
const debug = require('debug')('amqp-wrapper');
const Deferred = require('deferential');
module.exports = function (config) {
if (!config || !config.url || !config.exchange) {
throw new Error('amqp-wrapper: Invalid config');
/**
* Class to contain an instantiated connection/channel to AMQP with a given
* config.
*/
class AMQPWrapper {
/**
* Instantiate an AMQP wrapper with a given config.
* @param {object} config
* @param {string} config.url
* @param {string} config.exchange
* @param {object} config.queue
* @param {string} config.queue.name
* @param {Array<string>|string} config.queue.routingKey
* @param {object} config.queue.options
*/
constructor (config) {
if (!config || !config.url || !config.exchange) {
throw new Error('amqp-wrapper: Invalid config');
}
this.config = config;
this.connection = null;
this.channel = null;
this.prefetch = config.prefetch || 10;
}
var connection, channel;
var prefetch = config.prefetch || 10;
/**
* Connects and remembers the channel.
* @async
* @description Connects, establishes a channel, sets up exchange/queues/bindings/dead
* lettering.
* @returns {Promise}
*/
function connect (cb) {
var d = Deferred();
amqp.connect(config.url, createChannel);
function createChannel (err, conn) {
debug('createChannel()');
if (err) {
return d.reject(err);
}
connection = conn;
conn.createConfirmChannel(assertExchange);
async connect () {
const { config } = this;
const conn = await amqp.connect(config.url);
this.channel = await conn.createConfirmChannel();
this.channel.prefetch(this.prefetch);
await this.channel.assertExchange(config.exchange, 'topic', {});
if (config.queue && config.queue.name) {
return queueSetup.setupForConsume(this.channel, config);
}
function assertExchange (err, ch) {
debug('assertExchange()', ch);
if (err) {
return d.reject(err);
}
channel = ch;
channel.prefetch(prefetch);
channel.assertExchange(config.exchange, 'topic', {}, assertQueues);
}
function assertQueues (err) {
debug('assertQueues()');
if (err) {
return d.reject(err);
}
if (config.queue && config.queue.name) {
queueSetup.setupForConsume(channel, config, d.resolver(cb));
} else {
d.resolve();
}
}
return d.nodeify(cb);
}
function close (cb) {
if (connection) {
return connection.close(cb);
/**
* @async
* @description
* Closes connection.
* @returns {Promise}
*/
async close () {
if (this.connection) {
return this.connection.close();
}
cb();
}

@@ -75,15 +67,26 @@

*/
function publish (routingKey, message, options, cb) {
debug('publish()');
var d = Deferred();
/**
* @async
* @description
* Publish a message to the given routing key, with given options.
* @param {string} routingKey
* @param {object|string} message
* @param {object} options
* @returns {Promise}
*/
async publish (routingKey, message, options) {
if (typeof message === 'object') {
message = stringifysafe(message);
}
channel.publish(config.exchange, routingKey, Buffer.from(message),
options, d.resolver(cb));
return d.nodeify(cb);
return this.channel.publish(this.config.exchange, routingKey, Buffer.from(message), options);
}
/**
* @async
* Start consuming on the queue specified in the config you passed on
* instantiation, using the given message handler callback.
* @param {function} handleMessage
* @param {object} options
* @description
* handleMessage() is expected to be of the form:

@@ -100,5 +103,6 @@ * handleMessage(parsedMessage, callback).

* cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34
* @returns {Promise}
*/
function consume (handleMessage, options) {
debug('consume()');
async consume (handleMessage, options) {
const { channel } = this;
function callback (message) {

@@ -127,13 +131,6 @@ function done (err, requeue) {

channel.consume(config.queue.name, callback, options);
return channel.consume(this.config.queue.name, callback, options);
}
}
return {
connect,
close,
publish,
consume
};
};
// vim: set et sw=2:
module.exports = AMQPWrapper;

@@ -1,38 +0,17 @@

'use strict';
var async = require('async');
var debug = require('debug')('amqp-wrapper');
function bindRoutingKeys (channel, exchange, queueName, keys, done) {
var routingKeys;
if (typeof keys === 'string') {
routingKeys = [keys];
} else {
routingKeys = keys;
async function bindRoutingKeys (channel, exchange, queueName, keys = []) {
const routingKeys = (typeof keys === 'string')
? [keys]
: keys;
for (let routingKey in routingKeys) {
await channel.bindQueue(queueName, exchange, routingKey);
}
if (routingKeys) {
async.map(routingKeys,
function (key, callback) {
channel.bindQueue(queueName, exchange, key, {}, callback);
},
done);
} else {
done();
}
}
function maybeDeclareDeadLetters (channel, queue, callback) {
if (!queue.options || !queue.options.deadLetterExchange) {
return callback();
}
async function maybeDeclareDeadLetters (channel, queue) {
if (!queue.options || !queue.options.deadLetterExchange) return;
var qName = queue.name +
(queue.options.deadLetterQueueSuffix || '-dead-letter');
async.series([
channel.assertExchange.bind(channel,
queue.options.deadLetterExchange, 'topic', {}),
channel.assertQueue.bind(channel, qName, {}),
bindRoutingKeys.bind(undefined, channel, queue.options.deadLetterExchange, qName,
queue.options.deadLetterExchangeRoutingKey || queue.routingKey)
], callback);
var qName = queue.name + (queue.options.deadLetterQueueSuffix || '-dead-letter');
await channel.assertExchange(queue.options.deadLetterExchange, 'topic', {});
await channel.assertQueue(qName, {});
await bindRoutingKeys(channel, queue.options.deadLetterExchange, qName, queue.options.deadLetterExchangeRoutingKey || queue.routingKey);
}

@@ -44,15 +23,7 @@

*/
exports.setupForConsume = function (channel, params, callback) {
var queue = params.queue;
debug('setupForConsume()', queue);
console.log('queue');
console.log(queue);
async.series([
maybeDeclareDeadLetters.bind(undefined, channel, queue),
channel.assertQueue.bind(channel, queue.name, queue.options),
bindRoutingKeys.bind(undefined, channel, params.exchange, queue.name,
queue.routingKey)
], callback);
exports.setupForConsume = async function (channel, params) {
var { queue } = params;
await maybeDeclareDeadLetters(channel, queue);
await channel.assertQueue(queue.name, queue.options);
await bindRoutingKeys(channel, params.exchange, queue.name, queue.routingKey);
};
// vim: set sw=2 et:
{
"name": "amqp-wrapper",
"version": "6.0.0",
"version": "7.0.0-es6.0",
"engines": {
"node": ">= 8"
},
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.",
"main": "amqp.js",
"scripts": {
"test": "semistandard && NODE_ENV=test mocha test --recursive --exit",
"coverage": "nyc -a -c -r html -r text -r lcov npm test"
"coverage": "nyc -a -c -r html -r text -r lcov npm test",
"docs": "jsdoc2md amqp.js",
"test": "semistandard && NODE_ENV=test mocha test --recursive --exit"
},

@@ -26,10 +30,10 @@ "repository": {

"amqplib": "^0.5.5",
"async": "^3.1.0",
"debug": "^2.6.8",
"deferential": "^1.0.0",
"json-stringify-safe": "^5.0.0",
"q": "^1.5.1"
"json-stringify-safe": "^5.0.0"
},
"devDependencies": {
"expect.js": "^0.3.1",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"dirty-chai": "^2.0.1",
"jsdoc-to-markdown": "^5.0.0",
"mocha": "^6.2.0",

@@ -39,3 +43,3 @@ "nyc": "^11.4.1",

"semistandard": "^12.0.1",
"sinon": "^1.10.3"
"sinon": "^7.3.2"
},

@@ -42,0 +46,0 @@ "semistandard": {

@@ -46,21 +46,23 @@ amqp-wrapper

// Must call this before you consume/publish/etc...
amqp.connect(amqpConnectDone);
async function main () {
// Must call this before you consume/publish/etc...
await amqp.connect();
// Consuming
var handleMessage = function(message, callback) {
//...
};
// You must call:
callback(err, requeue)
// in your handleMessage. If `err` !== `null` then the message will be `nack`ed.
// Requeueing will be requeue iff `requeue` is `true`.
// If `err` is `null` then the message is `ack`ed.
// If an exception occurs in handleMessage, then the message is `nack`ed and not requeued.
// Consuming
var handleMessage = function(message, callback) {
//...
};
// You must call:
callback(err, requeue)
// in your handleMessage. If `err` !== `null` then the message will be `nack`ed.
// Requeueing will be requeue iff `requeue` is `true`.
// If `err` is `null` then the message is `ack`ed.
// If an exception occurs in handleMessage, then the message is `nack`ed and not requeued.
// Start consuming:
amqp.consume(handleMessage);
// Start consuming:
amqp.consume(handleMessage);
// Publishing to arbitrary routing key.
amqp.publish(routingKey, payload, options, done);
// Publishing to arbitrary routing key.
await amqp.publish(routingKey, payload, options);
}
```

@@ -74,5 +76,5 @@

```bash
docker run --rm -p 5672:5672 dockerfile/rabbitmq
docker run -d --rm -p 5672:5672 rabbitmq
```
Then:
Wait for it to finish starting up, then:
```

@@ -86,7 +88,86 @@ npm test

# Promises
# API
`connect()` and `publish()` support promises too. If you don't give a callback, they'll return a promise!
> amqp-wrapper@6.0.0 docs /Users/tim/git/node-amqp-wrapper
> jsdoc2md amqp.js
<a name="AMQPWrapper"></a>
## AMQPWrapper
Class to contain an instantiated connection/channel to AMQP with a given
config.
**Kind**: global class
* [AMQPWrapper](#AMQPWrapper)
* [new AMQPWrapper(config)](#new_AMQPWrapper_new)
* [.connect()](#AMQPWrapper+connect) ⇒ <code>Promise</code>
* [.close()](#AMQPWrapper+close) ⇒ <code>Promise</code>
* [.publish(routingKey, message, options)](#AMQPWrapper+publish) ⇒ <code>Promise</code>
* [.consume(handleMessage, options)](#AMQPWrapper+consume) ⇒ <code>Promise</code>
<a name="new_AMQPWrapper_new"></a>
### new AMQPWrapper(config)
Instantiate an AMQP wrapper with a given config.
| Param | Type |
| --- | --- |
| config | <code>object</code> |
| config.url | <code>string</code> |
| config.exchange | <code>string</code> |
| config.queue | <code>object</code> |
| config.queue.name | <code>string</code> |
| config.queue.routingKey | <code>Array.&lt;string&gt;</code> \| <code>string</code> |
| config.queue.options | <code>object</code> |
<a name="AMQPWrapper+connect"></a>
### amqpWrapper.connect() ⇒ <code>Promise</code>
Connects, establishes a channel, sets up exchange/queues/bindings/dead
lettering.
**Kind**: instance method of [<code>AMQPWrapper</code>](#AMQPWrapper)
<a name="AMQPWrapper+close"></a>
### amqpWrapper.close() ⇒ <code>Promise</code>
Closes connection.
**Kind**: instance method of [<code>AMQPWrapper</code>](#AMQPWrapper)
<a name="AMQPWrapper+publish"></a>
### amqpWrapper.publish(routingKey, message, options) ⇒ <code>Promise</code>
Publish a message to the given routing key, with given options.
**Kind**: instance method of [<code>AMQPWrapper</code>](#AMQPWrapper)
| Param | Type |
| --- | --- |
| routingKey | <code>string</code> |
| message | <code>object</code> \| <code>string</code> |
| options | <code>object</code> |
<a name="AMQPWrapper+consume"></a>
### amqpWrapper.consume(handleMessage, options) ⇒ <code>Promise</code>
handleMessage() is expected to be of the form:
handleMessage(parsedMessage, callback).
If callback is called with a non-null error, then the message will be
nacked. You can call it like:
callback(err, requeue) in order
to instruct rabbit whether to requeue the message
(or discard/dead letter).
If not given, requeue is assumed to be false.
cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34
**Kind**: instance method of [<code>AMQPWrapper</code>](#AMQPWrapper)
| Param | Type |
| --- | --- |
| handleMessage | <code>function</code> |
| options | <code>object</code> |
# License

@@ -93,0 +174,0 @@

@@ -7,10 +7,12 @@ 'use strict';

describe('AMQP', function () {
describe('#connect', function () {
describe('#consume', function () {
it('should return a promise', function (done) {
var amqp = AMQP(config);
var amqp = new AMQP(config);
amqp.connect().then(function () {
done();
}, done);
return amqp.consume().then(function () {
done();
}, done);
});
});
});
});

@@ -1,5 +0,5 @@

'use strict';
const SandboxedModule = require('sandboxed-module');
const expect = require('expect.js');
const expect = require('chai').expect;
require('chai').use(require('chai-as-promised'));
require('chai').use(require('dirty-chai'));
const AMQP = require('../amqp');

@@ -10,21 +10,13 @@ const config = require('./config');

describe('#constructor', function () {
it('should throw with empty constructor', function (done) {
expect(function () { AMQP(); }).to
.throwError('amqp-wrapper: Invalid config');
done();
it('should throw with empty constructor', function () {
expect(() => new AMQP()).to.throw('amqp-wrapper: Invalid config');
});
it('should throw with no url or exchange', function (done) {
expect(function () { AMQP({}); }).to
.throwError('amqp-wrapper: Invalid config');
done();
it('should throw with no url or exchange', function () {
expect(() => new AMQP({})).to.throw('amqp-wrapper: Invalid config');
});
it('should throw with no url', function (done) {
expect(function () { AMQP({ exchange: '' }); }).to
.throwError('amqp-wrapper: Invalid config');
done();
it('should throw with no url', function () {
expect(() => new AMQP({ exchange: '' })).to.throw('amqp-wrapper: Invalid config');
});
it('should throw with no exchange', function (done) {
expect(function () { AMQP({ url: '' }); }).to
.throwError('amqp-wrapper: Invalid config');
done();
it('should throw with no exchange', function () {
expect(() => new AMQP({ url: '' })).to.throw('amqp-wrapper: Invalid config');
});

@@ -34,7 +26,7 @@ });

it('should should fail to connect to bad endpoint', function (done) {
var amqp = AMQP({
var amqp = new AMQP({
url: 'amqp://guest:guest@localhost:6767',
exchange: 'FOO'
});
amqp.connect(function (err) {
amqp.connect().catch(function (err) {
expect(err.code).to.equal('ECONNREFUSED');

@@ -45,60 +37,48 @@ done();

it('should call the callback successfully', function (done) {
var amqp = AMQP(config.good);
amqp.connect(done);
var amqp = new AMQP(config.good);
amqp.connect().then(() => done());
});
it('should declare your queue, and bind it', function (done) {
it('should declare your queue, and bind it', async function () {
var amqpLibMock = require('./amqplibmock')();
var mockedAMQP = SandboxedModule.require('../amqp', {
var MockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
'amqplib': amqpLibMock.mock
}
})(config.good);
});
const mockedAMQP = new MockedAMQP(config.good);
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
await mockedAMQP.connect();
// one queue, dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2);
// Bind the consume queue, and its dead letter queue.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(2);
});
it('allows you to specify an array for routingKey and binds each given', function (done) {
var amqpLibMock = require('./amqplibmock')();
var MockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib': amqpLibMock.mock
}
});
const mockedAMQP = new MockedAMQP(config.routingKeyArray);
mockedAMQP.connect().then(function () {
// one queue, dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2);
// Bind the consume queue, and its dead letter queue.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(2);
// Bind the consume queue with its two routing keys, and its dead
// letter queue.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(4);
done();
});
}).catch(done);
});
it('allows you to specify an array for routingKey and binds each given',
function (done) {
var amqpLibMock = require('./amqplibmock')();
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config.routingKeyArray);
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
// one queue, dead lettered
expect(amqpLibMock.assertQueueSpy.callCount).to.equal(2);
// Bind the consume queue with its two routing keys, and its dead
// letter queue.
expect(amqpLibMock.bindQueueSpy.callCount).to.equal(4);
done();
});
});
it('should just declare if you don\'t specify routing key', function (done) {
var amqpLibMock = require('./amqplibmock')();
var mockedAMQP = SandboxedModule.require('../amqp', {
var MockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
'amqplib': amqpLibMock.mock
}
})(config.noRoutingKey);
});
const mockedAMQP = new MockedAMQP(config.noRoutingKey);
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
mockedAMQP.connect().then(function () {
// one queue, not dead lettered

@@ -109,81 +89,66 @@ expect(amqpLibMock.assertQueueSpy.callCount).to.equal(1);

done();
});
}).catch(done);
});
});
describe('#publish', function () {
it('should call the callback successfully', function (done) {
var amqp = AMQP(config.good);
amqp.connect(function (err) {
if (err) {
return done(err);
}
amqp.publish('myqueue', 'test', {}, done);
});
it('should resolve successfully', async function () {
var amqp = new AMQP(config.good);
await amqp.connect();
await expect(amqp.publish('myqueue', 'test', {})).to.eventually.be.fulfilled();
});
it('should accept objects', function (done) {
var amqp = AMQP(config.good);
amqp.connect(function (err) {
if (err) {
return done(err);
}
amqp.publish('myqueue', { woo: 'test' }, {}, done);
});
it('should accept objects', async function () {
var amqp = new AMQP(config.good);
await amqp.connect();
await expect(amqp.publish('myqueue', { woo: 'test' }, {})).to.eventually.be.fulfilled();
});
});
describe('#consume', function () {
it('if done(err) is called with err === null, calls ack().',
function (done) {
var ack = function () {
done();
};
describe('#consume', async function () {
it('if done(err) is called with err === null, calls ack().', function (done) {
var ack = function () {
done();
};
var amqpLibMock = require('./amqplibmock')({ overrides: { ack: ack } });
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config.good);
function myMessageHandler (parsedMsg, cb) {
cb();
var amqpLibMock = require('./amqplibmock')({ overrides: { ack: ack } });
var MockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib': amqpLibMock.mock
}
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
mockedAMQP.consume(myMessageHandler);
});
});
const mockedAMQP = new MockedAMQP(config.good);
it('if json unparsable, calls nack() with requeue of false.',
function (done) {
var nack = function (message, upTo, requeue) {
expect(requeue).to.equal(false);
done();
};
function myMessageHandler (parsedMsg, cb) {
cb();
}
var amqpLibMock = require('./amqplibmock')({
messageToDeliver: 'nonvalidjson',
overrides: { nack: nack }
});
mockedAMQP.connect().then(function () {
mockedAMQP.consume(myMessageHandler);
}).catch(done);
});
var mockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
}
})(config.good);
it('if json unparsable, calls nack() with requeue of false.', function (done) {
var nack = function (message, upTo, requeue) {
expect(requeue).to.equal(false);
done();
};
function myMessageHandler (parsedMsg, cb) {
cb();
var amqpLibMock = require('./amqplibmock')({
messageToDeliver: 'nonvalidjson',
overrides: { nack: nack }
});
var MockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib': amqpLibMock.mock
}
});
const mockedAMQP = new MockedAMQP(config.good);
mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
mockedAMQP.consume(myMessageHandler);
});
});
function myMessageHandler (parsedMsg, cb) {
cb();
}
mockedAMQP.connect().then(function () {
mockedAMQP.consume(myMessageHandler);
}).catch(done);
});
it('if json callback called with err, calls nack() with requeue as given.',

@@ -198,7 +163,8 @@ function (done) {

var mockedAMQP = SandboxedModule.require('../amqp', {
var MockedAMQP = SandboxedModule.require('../amqp', {
requires: {
'amqplib/callback_api': amqpLibMock.mock
'amqplib': amqpLibMock.mock
}
})(config.good);
});
const mockedAMQP = new MockedAMQP(config.good);

@@ -209,8 +175,5 @@ function myMessageHandler (parsedMsg, cb) {

mockedAMQP.connect(function (err) {
if (err) {
return done(err);
}
mockedAMQP.connect().then(function () {
mockedAMQP.consume(myMessageHandler);
});
}).catch(done);
});

@@ -217,0 +180,0 @@ });

@@ -1,3 +0,1 @@

'use strict';
/**

@@ -13,3 +11,3 @@ * This is a mock for the underlying amqplib library that we are wrapping.

var channelMock = {
consume: Sinon.stub().yields({
consume: Sinon.stub().callsArgWith(1, {
content: {

@@ -19,5 +17,5 @@ toString: function () { return messageToDeliver; }

}),
assertExchange: Sinon.stub().callsArg(3),
assertQueue: Sinon.stub().callsArg(2),
bindQueue: Sinon.stub().callsArg(4),
assertExchange: Sinon.stub().resolves(),
assertQueue: Sinon.stub().resolves(),
bindQueue: Sinon.stub().resolves(),
prefetch: Sinon.spy(),

@@ -29,4 +27,4 @@ ack: overrides.ack || Sinon.spy(),

var amqpLibMock = {
connect: Sinon.stub().yields(null, {
createConfirmChannel: Sinon.stub().yields(null, channelMock)
connect: Sinon.stub().resolves({
createConfirmChannel: Sinon.stub().resolves(channelMock)
})

@@ -33,0 +31,0 @@ };

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc