amqplib-easy
Advanced tools
Comparing version 4.5.0 to 5.0.0-node-upgrade.0
18
API.md
@@ -67,3 +67,19 @@ - [`Create(amqpUrl)`](#createamqpurl-socketoptions---amqp) | ||
Defaults to an empty object | ||
- `parse`: (function) parse string content of message. Defaults to `JSON.parse` | ||
- `parse`: (function) a function which accepts raw message as an argument and returns decoded message content. | ||
Defaults to `jsonDecoder` which simply converts json encoded message content to `Object` by calling `JSON.parse`. | ||
The raw message passed as an argument have the following properties: | ||
```javascript | ||
{ | ||
content: Buffer, | ||
fields: Object, | ||
properties: Object | ||
} | ||
``` | ||
See | ||
[amqplib.channelConsume](http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume) | ||
for more information. | ||
- `prefetch`: (number) of messages to fetch when consuming. Defaults to `1` | ||
@@ -70,0 +86,0 @@ - `arguments`: (object) containing any binding arguments for the queue. Defaults to `{}` |
51
index.js
'use strict' | ||
var defaults = require('lodash.defaults') | ||
var Promise = require('bluebird') | ||
var amqp = require('amqplib') | ||
var retry = require('amqplib-retry') | ||
var diehard = require('diehard') | ||
var connections = {} | ||
var sendChannels = {} | ||
const defaults = require('lodash.defaults') | ||
const Promise = require('bluebird') | ||
const amqp = require('amqplib') | ||
const retry = require('amqplib-retry') | ||
const diehard = require('diehard') | ||
const connections = {} | ||
const sendChannels = {} | ||
@@ -34,5 +34,10 @@ function closeConnection (connectionUrl) { | ||
} | ||
return new Buffer(JSON.stringify(obj)) | ||
return Buffer.from(JSON.stringify(obj)) | ||
} | ||
function jsonDecoder (message) { | ||
if (!(message && message.content)) return null | ||
return JSON.parse(message.content.toString()) | ||
} | ||
diehard.register(cleanup) | ||
@@ -79,7 +84,7 @@ | ||
function consume (queueConfig, handler) { | ||
var options = defaults({}, queueConfig || {}, { | ||
const options = defaults({}, queueConfig || {}, { | ||
exchangeType: 'topic', | ||
exchangeOptions: {durable: true}, | ||
parse: JSON.parse, | ||
queueOptions: {durable: true}, | ||
exchangeOptions: { durable: true }, | ||
parse: jsonDecoder, | ||
queueOptions: { durable: true }, | ||
prefetch: 1, | ||
@@ -125,3 +130,3 @@ arguments: {} | ||
try { | ||
msg.json = options.parse(msg.content.toString()) | ||
msg.payload = msg.json = options.parse(msg) | ||
return handler(msg, ch) | ||
@@ -182,4 +187,4 @@ } catch (err) { | ||
.all([ | ||
ch.assertExchange(queueConfig.exchange, queueConfig.exchangeType || 'topic', queueConfig.exchangeOptions || {durable: true}), | ||
queueConfig.queue ? ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true}) : Promise.resolve() | ||
ch.assertExchange(queueConfig.exchange, queueConfig.exchangeType || 'topic', queueConfig.exchangeOptions || { durable: true }), | ||
queueConfig.queue ? ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || { durable: true }) : Promise.resolve() | ||
]) | ||
@@ -191,3 +196,3 @@ .then(function () { | ||
toBuffer(json), | ||
messageOptions || queueConfig.messageOptions || {persistent: true}, | ||
messageOptions || queueConfig.messageOptions || { persistent: true }, | ||
function (err) { | ||
@@ -211,3 +216,3 @@ if (err) { | ||
function (ch) { | ||
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true}) | ||
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || { durable: true }) | ||
.then(function () { | ||
@@ -218,3 +223,3 @@ return new Promise(function (resolve, reject) { | ||
toBuffer(json), | ||
messageOptions || queueConfig.messageOptions || {persistent: true}, | ||
messageOptions || queueConfig.messageOptions || { persistent: true }, | ||
function (err) { | ||
@@ -235,7 +240,7 @@ if (err) { | ||
return { | ||
connect: connect, | ||
close: close, | ||
consume: consume, | ||
publish: publish, | ||
sendToQueue: sendToQueue | ||
connect, | ||
close, | ||
consume, | ||
publish, | ||
sendToQueue | ||
} | ||
@@ -242,0 +247,0 @@ } |
{ | ||
"name": "amqplib-easy", | ||
"version": "4.5.0", | ||
"version": "5.0.0-node-upgrade.0", | ||
"description": "Simplified API for interacting with AMQP", | ||
@@ -8,3 +8,3 @@ "main": "index.js", | ||
"lint": "standard", | ||
"test": "npm run lint && mocha" | ||
"test": "c8 --clean true -r text -r lcov mocha --timeout 1000 --recursive --exit" | ||
}, | ||
@@ -39,13 +39,14 @@ "standard": { | ||
"devDependencies": { | ||
"mocha": "^2.1.0", | ||
"should": "^8.2.2", | ||
"standard": "^6" | ||
"c8": "^7.12.0", | ||
"mocha": "^10.2.0", | ||
"should": "^13.2.3", | ||
"standard": "^17.0.0" | ||
}, | ||
"dependencies": { | ||
"amqplib": "^0.4.0", | ||
"amqplib-retry": "^1.0.2", | ||
"bluebird": "^2.6.2", | ||
"diehard": "^1.3.0", | ||
"lodash.defaults": "^2.4.1" | ||
"amqplib": "^0.10.3", | ||
"amqplib-retry": "1.1.8-0-node-upgrade.0", | ||
"bluebird": "^3.7.2", | ||
"diehard": "^1.5.2", | ||
"lodash.defaults": "^4.2.0" | ||
} | ||
} |
/* globals it:false */ | ||
'use strict' | ||
var amqpUrl = 'amqp://guest:guest@localhost:5672' | ||
var childProcess = require('child_process') | ||
var BPromise = require('bluebird') | ||
var amqp = require('../index')(amqpUrl) | ||
require('should') | ||
const amqpUrl = 'amqp://guest:guest@localhost:5672' | ||
const childProcess = require('child_process') | ||
const BPromise = require('bluebird') | ||
const amqp = require('../index')(amqpUrl) | ||
@@ -28,3 +29,3 @@ describe('amqplib-easy', function () { | ||
// the queue doesn't exist, so w/e | ||
return | ||
}) | ||
@@ -36,3 +37,3 @@ }) | ||
describe('consumer', function () { | ||
var cancel | ||
let cancel | ||
@@ -50,3 +51,3 @@ afterEach(function () { | ||
parse: function () { | ||
return {name: 'Fred'} | ||
return { name: 'Fred' } | ||
}, | ||
@@ -57,5 +58,8 @@ queue: 'found_cats', | ||
function (cat) { | ||
var name = cat.json.name | ||
const name = cat.json.name | ||
const payload = cat.payload | ||
try { | ||
cat.should.have.properties(['content', 'fields', 'properties']) | ||
name.should.equal('Fred') | ||
payload.should.be.deepEqual(cat.json) | ||
done() | ||
@@ -70,3 +74,3 @@ } catch (err) { | ||
return BPromise.all([ | ||
amqp.sendToQueue({queue: 'found_cats'}, new Buffer('dsadasd')) | ||
amqp.sendToQueue({ queue: 'found_cats' }, Buffer.from('dsadasd')) | ||
]) | ||
@@ -78,3 +82,3 @@ }) | ||
it('should handle buffers reasonably', function (done) { | ||
var catCount = 0 | ||
let catCount = 0 | ||
amqp.consume( | ||
@@ -87,3 +91,3 @@ { | ||
function (cat) { | ||
var name = cat.json.name | ||
const name = cat.json.name | ||
try { | ||
@@ -102,4 +106,4 @@ (name === 'Sally' || name === 'Fred').should.be.ok() | ||
return BPromise.all([ | ||
amqp.publish({exchange: 'cat'}, 'found.tawny', new Buffer('{ "name": "Sally" }')), | ||
amqp.sendToQueue({queue: 'found_cats'}, new Buffer('{ "name": "Fred" }')) | ||
amqp.publish({ exchange: 'cat' }, 'found.tawny', Buffer.from('{ "name": "Sally" }')), | ||
amqp.sendToQueue({ queue: 'found_cats' }, Buffer.from('{ "name": "Fred" }')) | ||
]) | ||
@@ -111,3 +115,3 @@ }) | ||
it('should publish, sendToQueue and receive', function (done) { | ||
var catCount = 0 | ||
let catCount = 0 | ||
amqp.consume( | ||
@@ -120,3 +124,3 @@ { | ||
function (cat) { | ||
var name = cat.json.name | ||
const name = cat.json.name | ||
try { | ||
@@ -135,4 +139,4 @@ (name === 'Sally' || name === 'Fred').should.be.ok() | ||
return BPromise.all([ | ||
amqp.publish({exchange: 'cat'}, 'found.tawny', {name: 'Sally'}), | ||
amqp.sendToQueue({queue: 'found_cats'}, {name: 'Fred'}) | ||
amqp.publish({ exchange: 'cat' }, 'found.tawny', { name: 'Sally' }), | ||
amqp.sendToQueue({ queue: 'found_cats' }, { name: 'Fred' }) | ||
]) | ||
@@ -156,5 +160,5 @@ }) | ||
cancel = c | ||
return amqp.publish({exchange: 'cat', exchangeType: 'direct'}, 'found.tawny', {name: 'Sally'}) | ||
return amqp.publish({ exchange: 'cat', exchangeType: 'direct' }, 'found.tawny', { name: 'Sally' }) | ||
.catch(function () { | ||
return amqp.publish({exchange: 'cat', exchangeType: 'topic'}, 'found.tawny', {name: 'Sally'}) | ||
return amqp.publish({ exchange: 'cat', exchangeType: 'topic' }, 'found.tawny', { name: 'Sally' }) | ||
}) | ||
@@ -178,3 +182,3 @@ }) | ||
} | ||
) | ||
) | ||
}) | ||
@@ -195,3 +199,3 @@ }) | ||
function (cat) { | ||
var name = cat.json.name | ||
const name = cat.json.name | ||
try { | ||
@@ -208,5 +212,5 @@ name.should.equal('Sally') | ||
return amqp.publish( | ||
{exchange: 'cat', exchangeType: 'fanout'}, | ||
{ exchange: 'cat', exchangeType: 'fanout' }, | ||
'found.tawny', | ||
{name: 'Sally'} | ||
{ name: 'Sally' } | ||
) | ||
@@ -231,3 +235,3 @@ }) | ||
} | ||
) | ||
) | ||
}) | ||
@@ -245,3 +249,3 @@ }) | ||
arguments: { | ||
'color': 'blue' | ||
color: 'blue' | ||
}, | ||
@@ -251,3 +255,3 @@ queue: 'found_cats' | ||
function (cat) { | ||
var name = cat.json.name | ||
const name = cat.json.name | ||
try { | ||
@@ -264,6 +268,6 @@ name.should.equal('Sally') | ||
return amqp.publish( | ||
{exchange: 'cat', exchangeType: 'headers'}, | ||
{ exchange: 'cat', exchangeType: 'headers' }, | ||
'found.tawny', | ||
{name: 'Sally'}, | ||
{headers: {color: 'blue'}} | ||
{ name: 'Sally' }, | ||
{ headers: { color: 'blue' } } | ||
) | ||
@@ -295,3 +299,3 @@ }) | ||
.then(function () { | ||
return amqp.sendToQueue({queue: 'found_cats'}, {name: 'Carl'}) | ||
return amqp.sendToQueue({ queue: 'found_cats' }, { name: 'Carl' }) | ||
}) | ||
@@ -333,7 +337,7 @@ .then(function () { | ||
}).catch(done) | ||
.then(function () { | ||
amqp.connect().then(function () { | ||
done() | ||
.then(function () { | ||
amqp.connect().then(function () { | ||
done() | ||
}) | ||
}) | ||
}) | ||
}) | ||
@@ -359,3 +363,3 @@ }) | ||
// Spin up a process to kill | ||
var testProcess = childProcess.fork('./test/resources/death.js', {silent: false}) | ||
const testProcess = childProcess.fork('./test/resources/death.js', { silent: false }) | ||
@@ -376,3 +380,3 @@ testProcess.on('message', function (message) { | ||
describe('x-delayed-message', function () { | ||
var plugin = false | ||
let plugin = false | ||
@@ -403,3 +407,3 @@ after(function () { | ||
// the queue doesn't exist, so w/e | ||
return | ||
}) | ||
@@ -416,3 +420,3 @@ }) | ||
.then(function (channel) { | ||
return channel.assertExchange('cat', 'x-delayed-message', {arguments: {'x-delayed-type': 'fanout'}}) | ||
return channel.assertExchange('cat', 'x-delayed-message', { arguments: { 'x-delayed-type': 'fanout' } }) | ||
}) | ||
@@ -440,29 +444,29 @@ .then(function () { | ||
amqp | ||
.consume({ | ||
exchange: 'cat', | ||
exchangeType: 'x-delayed-message', | ||
exchangeOptions: {arguments: {'x-delayed-type': 'fanout'}}, | ||
queue: 'found_cats' | ||
}, function (cat) { | ||
var name = cat.json.name | ||
// There may be some delay, use 2.9 sec to test | ||
var time = cat.json.time + 2900 | ||
.consume({ | ||
exchange: 'cat', | ||
exchangeType: 'x-delayed-message', | ||
exchangeOptions: { arguments: { 'x-delayed-type': 'fanout' } }, | ||
queue: 'found_cats' | ||
}, function (cat) { | ||
const name = cat.json.name | ||
// There may be some delay, use 2.9 sec to test | ||
const time = cat.json.time + 2900 | ||
try { | ||
name.should.equal('Sally') | ||
time.should.be.below(new Date().getTime()) | ||
done() | ||
} catch (err) { | ||
done(err) | ||
} | ||
}) | ||
.then(function () { | ||
return amqp.publish( | ||
{exchange: 'cat', exchangeType: 'x-delayed-message'}, | ||
'found.tawny', | ||
{name: 'Sally', time: new Date().getTime()}, | ||
{headers: {'x-delay': 3000}} | ||
) | ||
}) | ||
try { | ||
name.should.equal('Sally') | ||
time.should.be.below(new Date().getTime()) | ||
done() | ||
} catch (err) { | ||
done(err) | ||
} | ||
}) | ||
.then(function () { | ||
return amqp.publish( | ||
{ exchange: 'cat', exchangeType: 'x-delayed-message' }, | ||
'found.tawny', | ||
{ name: 'Sally', time: new Date().getTime() }, | ||
{ headers: { 'x-delay': 3000 } } | ||
) | ||
}) | ||
}) | ||
}) |
'use strict' | ||
var amqp = require('../../index')('amqp://guest:guest@localhost:5672') | ||
var diehard = require('diehard') | ||
const amqp = require('../../index')('amqp://guest:guest@localhost:5672') | ||
const diehard = require('diehard') | ||
@@ -6,0 +6,0 @@ amqp.connect() |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
28596
651
4
11
2
2
+ Added@acuminous/bitsyntax@0.1.2(transitive)
+ Addedamqplib@0.10.4(transitive)
+ Addedamqplib-retry@1.1.8-0-node-upgrade.0(transitive)
+ Addedbuffer-more-ints@1.0.0(transitive)
+ Addeddebug@4.3.7(transitive)
+ Addedlodash.defaults@4.2.0(transitive)
+ Addedquerystringify@2.2.0(transitive)
+ Addedrequires-port@1.0.0(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedurl-parse@1.5.10(transitive)
- Removedamqplib@0.4.2(transitive)
- Removedamqplib-retry@1.1.7(transitive)
- Removedbitsyntax@0.0.4(transitive)
- Removedbluebird@2.11.0(transitive)
- Removedbuffer-more-ints@0.0.2(transitive)
- Removedlodash._isnative@2.4.1(transitive)
- Removedlodash._objecttypes@2.4.1(transitive)
- Removedlodash._shimkeys@2.4.1(transitive)
- Removedlodash.defaults@2.4.1(transitive)
- Removedlodash.isobject@2.4.1(transitive)
- Removedlodash.keys@2.4.1(transitive)
- Removedwhen@3.6.4(transitive)
Updatedamqplib@^0.10.3
Updatedbluebird@^3.7.2
Updateddiehard@^1.5.2
Updatedlodash.defaults@^4.2.0