Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqplib-easy

Package Overview
Dependencies
Maintainers
1
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqplib-easy - npm Package Compare versions

Comparing version 4.2.0 to 4.2.2

6

API.md

@@ -67,4 +67,4 @@ - [`Create(amqpUrl)`](#createamqpurl-socketoptions---amqp)

- `retry`: (boolean or object) if false, disable retry via
[amqplib-retry](https://www.npmjs.com/package/amqplib-retry). An object with
`failQueue` can also be specified to override the fail queue. Defaults to
true.
[amqplib-retry](https://www.npmjs.com/package/amqplib-retry). Defaults to true, but an object with properties that follow may be specified to customize the behavior of the retry.
- `failQueue` can also be specified to override the name of the queue used to hold failed messages.
- `delay` a function which accepts the number of attempts as an argument and returns a number of milliseconds to indicate how long to wait before retrying. If `-1` is returned, the message will be put into the failure queue.

@@ -1,13 +0,13 @@

'use strict';
'use strict'
var defaults = require('lodash.defaults'),
BPromise = require('bluebird'),
amqp = require('amqplib'),
retry = require('amqplib-retry'),
diehard = require('diehard'),
connections = {},
sendChannels = {};
var defaults = require('lodash.defaults')
var Promise = require('bluebird')
var amqp = require('amqplib')
var retry = require('amqplib-retry')
var diehard = require('diehard')
var connections = {}
var sendChannels = {}
function cleanup(done) {
BPromise.map(
function cleanup (done) {
return Promise.map(
Object.keys(connections),

@@ -17,29 +17,33 @@ function (connectionUrl) {

.then(function (connection) {
return connection.close();
});
}
).nodeify(done);
return connection.close()
})
})
.then(function () {
connections = {}
sendChannels = {}
})
.nodeify(done)
}
function toBuffer(obj) {
function toBuffer (obj) {
if (obj instanceof Buffer) {
return obj;
return obj
}
return new Buffer(JSON.stringify(obj));
return new Buffer(JSON.stringify(obj))
}
diehard.register(cleanup);
diehard.register(cleanup)
module.exports = function (amqpUrl, socketOptions) {
function connect() {
function connect () {
if (!connections[amqpUrl]) {
socketOptions = defaults({}, socketOptions || {}, {
channelMax: 100
});
connections[amqpUrl] = BPromise.resolve(amqp.connect(amqpUrl, socketOptions));
})
connections[amqpUrl] = Promise.resolve(amqp.connect(amqpUrl, socketOptions))
}
return connections[amqpUrl];
return connections[amqpUrl]
}
function sendChannel() {
function sendChannel () {
if (!sendChannels[amqpUrl]) {

@@ -51,19 +55,19 @@ sendChannels[amqpUrl] = connect()

channel.on('error', function () {
sendChannels[amqpUrl] = null;
sendChannels[amqpUrl] = null
// clear out the channel since it's in a bad state
});
return channel;
});
});
})
return channel
})
})
}
return sendChannels[amqpUrl];
return sendChannels[amqpUrl]
}
function createChannel() {
function createChannel () {
return connect().then(function (connection) {
return BPromise.resolve(connection.createChannel());
});
return Promise.resolve(connection.createChannel())
})
}
function consume(queueConfig, handler) {
function consume (queueConfig, handler) {
var options = defaults({}, queueConfig || {}, {

@@ -76,7 +80,7 @@ exchangeType: 'topic',

arguments: {}
});
})
// automatically enable retry unless it is specifically disabled
if ((options.retry !== false && !options.retry) || options.retry === true) {
options.retry = {};
options.retry = {}
}

@@ -88,3 +92,3 @@

failQueue: options.queue + '.failure'
});
})
}

@@ -94,31 +98,32 @@

.then(function (ch) {
ch.prefetch(options.prefetch);
return BPromise.resolve()
ch.prefetch(options.prefetch)
return Promise.resolve()
.then(function () {
return BPromise.all([
options.exchange ? ch.assertExchange(options.exchange, options.exchangeType, options.exchangeOptions) : BPromise.resolve(),
return Promise.all([
options.exchange ? ch.assertExchange(options.exchange, options.exchangeType, options.exchangeOptions) : Promise.resolve(),
ch.assertQueue(options.queue, options.queueOptions),
options.retry && options.retry.failQueue ? ch.assertQueue(options.retry.failQueue, options.queueOptions) : BPromise.resolve()
]);
options.retry && options.retry.failQueue ? ch.assertQueue(options.retry.failQueue, options.queueOptions) : Promise.resolve()
])
})
.then(function () {
if (options.topics && options.topics.length) {
return BPromise.map(options.topics, function (topic) {
return ch.bindQueue(options.queue, options.exchange, topic, options.arguments);
});
return Promise.map(options.topics, function (topic) {
return ch.bindQueue(options.queue, options.exchange, topic, options.arguments)
})
} else if (options.exchangeType === 'fanout') {
return ch.bindQueue(options.queue, options.exchange, '', options.arguments);
return ch.bindQueue(options.queue, options.exchange, '', options.arguments)
}
})
.then(function () {
function parse(msg) {
function parse (msg) {
return function () {
try {
msg.json = options.parse(msg.content.toString());
return handler(msg, ch);
msg.json = options.parse(msg.content.toString())
return handler(msg, ch)
} catch (err) {
console.error('Error deserializing AMQP message content.', err);
console.error('Error deserializing AMQP message content.', err)
}
};
}
}
if (options.retry) {

@@ -129,21 +134,26 @@ return ch.consume(options.queue, retry({

failureQueue: options.retry.failQueue,
delay: options.retry.delay,
handler: function (msg) {
if (!msg) { return; }
return BPromise.resolve()
.then(parse(msg));
if (!msg) {
return
}
return Promise.resolve()
.then(parse(msg))
}
}));
}))
} else {
return ch.consume(options.queue, function (msg) {
if (!msg) { return; }
return BPromise.resolve()
if (!msg) {
return
}
return Promise.resolve()
.then(parse(msg))
.then(function () {
return ch.ack(msg);
return ch.ack(msg)
})
.catch(function (err) {
ch.nack(msg);
throw err;
});
});
ch.nack(msg)
throw err
})
})
}

@@ -153,60 +163,63 @@ })

return function () {
return BPromise.resolve(ch.cancel(consumerInfo.consumerTag));
};
});
});
return Promise.resolve(ch.cancel(consumerInfo.consumerTag))
}
})
})
}
function publish(queueConfig, key, json, messageOptions) {
function publish (queueConfig, key, json, messageOptions) {
return sendChannel()
.then(function (ch) {
if (queueConfig.exchange === null || queueConfig.exchange === undefined) {
throw new Error('Client tries to publish to an exchange while exchange name is not undefined.');
.then(
function (ch) {
if (queueConfig.exchange === null || queueConfig.exchange === undefined) {
throw new Error('Client tries to publish to an exchange while exchange name is not undefined.')
}
return Promise
.all([
ch.assertExchange(queueConfig.exchange, queueConfig.exchangeType || 'topic', queueConfig.exchangeOptions || {durable: true}),
queueConfig.queue ? ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true}) : Promise.resolve()
])
.then(function () {
return new Promise(function (resolve, reject) {
ch.publish(queueConfig.exchange,
key,
toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true},
function (err) {
if (err) {
reject(err)
} else {
resolve()
}
}
)
})
})
}
return BPromise.all([
ch.assertExchange(queueConfig.exchange, queueConfig.exchangeType || 'topic', queueConfig.exchangeOptions || {durable: true}),
queueConfig.queue ? ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true}) : BPromise.resolve()
])
.then(function () {
return new BPromise(function (resolve, reject) {
ch.publish(queueConfig.exchange,
key,
toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true},
function (err) {
if (err) {
reject(err);
} else {
resolve();
}
}
);
});
});
}
);
)
}
function sendToQueue(queueConfig, json, messageOptions) {
function sendToQueue (queueConfig, json, messageOptions) {
return sendChannel()
.then(function (ch) {
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true})
.then(function () {
return new BPromise(function (resolve, reject) {
ch.sendToQueue(
queueConfig.queue,
toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true},
function (err) {
if (err) {
reject(err);
} else {
resolve();
.then(
function (ch) {
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true})
.then(function () {
return new Promise(function (resolve, reject) {
ch.sendToQueue(
queueConfig.queue,
toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true},
function (err) {
if (err) {
reject(err)
} else {
resolve()
}
}
}
);
});
});
}
);
)
})
})
}
)
}

@@ -219,5 +232,5 @@

sendToQueue: sendToQueue
};
};
}
}
module.exports.close = cleanup;
module.exports.close = cleanup
{
"name": "amqplib-easy",
"version": "4.2.0",
"version": "4.2.2",
"description": "Simplified API for interacting with AMQP",
"main": "index.js",
"scripts": {
"lint": "eslint --ignore-path .gitignore .",
"lint": "standard",
"test": "npm run lint && mocha"
},
"standard": {
"globals": [
"describe",
"context",
"before",
"beforeEach",
"after",
"afterEach",
"it",
"expect"
]
},
"repository": {

@@ -26,6 +38,5 @@ "type": "git",

"devDependencies": {
"eslint": "^0.11.0",
"eslint-plugin-nodeca": "^1.0.3",
"mocha": "^2.1.0",
"should": "^4.5.0"
"should": "^8.2.2",
"standard": "^6"
},

@@ -32,0 +43,0 @@ "dependencies": {

/*globals it:false*/
'use strict';
'use strict'
var amqpUrl = 'amqp://guest:guest@localhost:5672',
childProcess = require('child_process'),
BPromise = require('bluebird'),
amqp = require('../index')(amqpUrl);
var amqpUrl = 'amqp://guest:guest@localhost:5672'
var childProcess = require('child_process')
var BPromise = require('bluebird')
var amqp = require('../index')(amqpUrl)

@@ -15,28 +15,29 @@ describe('amqplib-easy', function () {

.then(function (channel) {
return BPromise.all([
channel.checkQueue('found_cats')
.then(function () {
return channel.deleteQueue('found_cats');
}),
channel.checkQueue('found_cats.failure')
.then(function () {
return channel.deleteQueue('found_cats.failure');
})
])
return BPromise.all(
[
channel.checkQueue('found_cats')
.then(function () {
return channel.deleteQueue('found_cats')
}),
channel.checkQueue('found_cats.failure')
.then(function () {
return channel.deleteQueue('found_cats.failure')
})
])
.catch(function () {
//the queue doesn't exist, so w/e
return;
});
});
});
});
// the queue doesn't exist, so w/e
return
})
})
})
})
describe('consumer', function () {
var cancel;
var cancel
afterEach(function () {
if (cancel) {
return cancel();
return cancel()
}
});
})

@@ -47,25 +48,29 @@ it('should accept alternate parser', function (done) {

exchange: 'cat',
parse: function () { return { name: 'Fred' }; },
parse: function () {
return {name: 'Fred'}
},
queue: 'found_cats',
topics: [ 'found.*' ]
topics: ['found.*']
},
function (cat) {
var name = cat.json.name;
var name = cat.json.name
try {
name.should.equal('Fred');
done();
} catch (err) { done(err); }
name.should.equal('Fred')
done()
} catch (err) {
done(err)
}
}
)
.then(function (c) {
cancel = c;
cancel = c
return BPromise.all([
amqp.sendToQueue({ queue: 'found_cats' }, new Buffer('dsadasd'))
]);
amqp.sendToQueue({queue: 'found_cats'}, new Buffer('dsadasd'))
])
})
.catch(done);
});
.catch(done)
})
it('should handle buffers reasonably', function (done) {
var catCount = 0;
var catCount = 0
amqp.consume(

@@ -75,28 +80,28 @@ {

queue: 'found_cats',
topics: [ 'found.*' ]
topics: ['found.*']
},
function (cat) {
var name = cat.json.name;
var name = cat.json.name
try {
/*eslint-disable no-unused-expressions*/
(name === 'Sally' || name === 'Fred').should.be.ok;
/*eslint-enable no-unused-expressions*/
(name === 'Sally' || name === 'Fred').should.be.ok()
if (++catCount === 2) {
done();
done()
}
} catch (err) { done(err); }
} catch (err) {
done(err)
}
}
)
.then(function (c) {
cancel = c;
cancel = c
return BPromise.all([
amqp.publish({ exchange: 'cat' }, 'found.tawny', new Buffer('{ "name": "Sally" }')),
amqp.sendToQueue({ queue: 'found_cats' }, new Buffer('{ "name": "Fred" }'))
]);
amqp.publish({exchange: 'cat'}, 'found.tawny', new Buffer('{ "name": "Sally" }')),
amqp.sendToQueue({queue: 'found_cats'}, new Buffer('{ "name": "Fred" }'))
])
})
.catch(done);
});
.catch(done)
})
it('should publish, sendToQueue and receive', function (done) {
var catCount = 0;
var catCount = 0
amqp.consume(

@@ -106,27 +111,26 @@ {

queue: 'found_cats',
topics: [ 'found.*' ]
topics: ['found.*']
},
function (cat) {
var name = cat.json.name;
var name = cat.json.name
try {
/*eslint-disable no-unused-expressions*/
(name === 'Sally' || name === 'Fred').should.be.ok;
/*eslint-enable no-unused-expressions*/
(name === 'Sally' || name === 'Fred').should.be.ok()
if (++catCount === 2) {
done();
done()
}
} catch (err) { done(err); }
} catch (err) {
done(err)
}
}
)
.then(function (c) {
cancel = c;
cancel = c
return BPromise.all([
amqp.publish({ exchange: 'cat' }, 'found.tawny', { name: 'Sally' }),
amqp.sendToQueue({ queue: 'found_cats' }, { name: 'Fred' })
]);
amqp.publish({exchange: 'cat'}, 'found.tawny', {name: 'Sally'}),
amqp.sendToQueue({queue: 'found_cats'}, {name: 'Fred'})
])
})
.catch(done);
.catch(done)
})
});
it('should publish even if something causes the channel to die', function (done) {

@@ -137,21 +141,20 @@ amqp.consume(

queue: 'found_cats',
topics: [ 'found.*' ]
topics: ['found.*']
},
function () {
done();
done()
}
)
.then(function (c) {
cancel = c;
return amqp.publish({ exchange: 'cat', exchangeType: 'direct' }, 'found.tawny', { name: 'Sally' })
cancel = c
return amqp.publish({exchange: 'cat', exchangeType: 'direct'}, 'found.tawny', {name: 'Sally'})
.catch(function () {
return amqp.publish({ exchange: 'cat', exchangeType: 'topic' }, 'found.tawny', { name: 'Sally' });
});
return amqp.publish({exchange: 'cat', exchangeType: 'topic'}, 'found.tawny', {name: 'Sally'})
})
})
.catch(done);
});
.catch(done)
})
describe('fanout exchange', function () {
function deleteCat() {
function deleteCat () {
return amqp.connect()

@@ -162,14 +165,15 @@ .then(function (connection) {

channel.checkExchange('cat')
.then(
function () {
channel.deleteExchange('cat');
},
function () { /* NBD it doesn't exist */ }
);
});
});
.then(
function () {
channel.deleteExchange('cat')
},
function () { /* NBD it doesn't exist */
}
)
})
})
}
beforeEach(deleteCat);
afterEach(deleteCat);
beforeEach(deleteCat)
afterEach(deleteCat)

@@ -184,25 +188,25 @@ it('should publish via fanout', function (done) {

function (cat) {
var name = cat.json.name;
var name = cat.json.name
try {
name.should.equal('Sally');
done();
} catch (err) { done(err); }
name.should.equal('Sally')
done()
} catch (err) {
done(err)
}
}
)
.then(function (c) {
cancel = c;
cancel = c
return amqp.publish(
{ exchange: 'cat', exchangeType: 'fanout' },
{exchange: 'cat', exchangeType: 'fanout'},
'found.tawny',
{ name: 'Sally' }
);
{name: 'Sally'}
)
})
.catch(done);
});
});
});
.catch(done)
})
})
})
it('should cancel consumer', function (done) {
amqp.consume(

@@ -212,29 +216,27 @@ {

queue: 'found_cats',
topics: [ 'found.*' ]
topics: ['found.*']
},
function () {
done('Got a cat');
done('Got a cat')
}
)
.then(function (cancel) {
return cancel();
return cancel()
})
.then(function () {
//add some delay so that the cancel takes affect
return BPromise.delay(200);
// add some delay so that the cancel takes affect
return BPromise.delay(200)
})
.then(function () {
return amqp.sendToQueue({ queue: 'found_cats' }, { name: 'Carl' });
return amqp.sendToQueue({queue: 'found_cats'}, {name: 'Carl'})
})
.then(function () {
//add some delay so that we get the sent message if it were sent
return BPromise.delay(200);
// add some delay so that we get the sent message if it were sent
return BPromise.delay(200)
})
.nodeify(done);
.nodeify(done)
})
})
});
});
describe('Connection managment', function () {
it('should reuse the existing connection', function (done) {

@@ -245,13 +247,13 @@ amqp.connect()

.then(function (connection2) {
connection1.should.equal(connection2);
done();
});
connection1.should.equal(connection2)
done()
})
})
.catch(done);
});
.catch(done)
})
it('should close the connection upon death', function (done) {
this.timeout(3000);
this.timeout(3000)
// Spin up a process to kill
var testProcess = childProcess.fork('./test/resources/death.js', { silent: false });
var testProcess = childProcess.fork('./test/resources/death.js', {silent: false})

@@ -261,10 +263,10 @@ testProcess.on('message', function (message) {

case 'ok':
return done();
return done()
case 'ready':
return testProcess.kill('SIGTERM');
return testProcess.kill('SIGTERM')
default:
return done(new Error('Unknown message ' + message));
return done(new Error('Unknown message ' + message))
}
});
});
});
})
})
})

@@ -1,5 +0,5 @@

'use strict';
'use strict'
var amqp = require('../../index')('amqp://guest:guest@localhost:5672'),
diehard = require('diehard');
var amqp = require('../../index')('amqp://guest:guest@localhost:5672')
var diehard = require('diehard')

@@ -9,9 +9,9 @@ amqp.connect()

connection.on('close', function () {
process.send('ok');
});
process.send('ok')
})
})
.then(function () {
process.send('ready');
});
process.send('ready')
})
diehard.listen();
diehard.listen()
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc