Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqplib-easy

Package Overview
Dependencies
Maintainers
36
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqplib-easy - npm Package Compare versions

Comparing version 4.5.0 to 5.0.0-node-upgrade.0

18

API.md

@@ -67,3 +67,19 @@ - [`Create(amqpUrl)`](#createamqpurl-socketoptions---amqp)

Defaults to an empty object
- `parse`: (function) parse string content of message. Defaults to `JSON.parse`
- `parse`: (function) a function which accepts raw message as an argument and returns decoded message content.
Defaults to `jsonDecoder` which simply converts json encoded message content to `Object` by calling `JSON.parse`.
The raw message passed as an argument have the following properties:
```javascript
{
content: Buffer,
fields: Object,
properties: Object
}
```
See
[amqplib.channelConsume](http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume)
for more information.
- `prefetch`: (number) of messages to fetch when consuming. Defaults to `1`

@@ -70,0 +86,0 @@ - `arguments`: (object) containing any binding arguments for the queue. Defaults to `{}`

51

index.js
'use strict'
var defaults = require('lodash.defaults')
var Promise = require('bluebird')
var amqp = require('amqplib')
var retry = require('amqplib-retry')
var diehard = require('diehard')
var connections = {}
var sendChannels = {}
const defaults = require('lodash.defaults')
const Promise = require('bluebird')
const amqp = require('amqplib')
const retry = require('amqplib-retry')
const diehard = require('diehard')
const connections = {}
const sendChannels = {}

@@ -34,5 +34,10 @@ function closeConnection (connectionUrl) {

}
return new Buffer(JSON.stringify(obj))
return Buffer.from(JSON.stringify(obj))
}
function jsonDecoder (message) {
if (!(message && message.content)) return null
return JSON.parse(message.content.toString())
}
diehard.register(cleanup)

@@ -79,7 +84,7 @@

function consume (queueConfig, handler) {
var options = defaults({}, queueConfig || {}, {
const options = defaults({}, queueConfig || {}, {
exchangeType: 'topic',
exchangeOptions: {durable: true},
parse: JSON.parse,
queueOptions: {durable: true},
exchangeOptions: { durable: true },
parse: jsonDecoder,
queueOptions: { durable: true },
prefetch: 1,

@@ -125,3 +130,3 @@ arguments: {}

try {
msg.json = options.parse(msg.content.toString())
msg.payload = msg.json = options.parse(msg)
return handler(msg, ch)

@@ -182,4 +187,4 @@ } catch (err) {

.all([
ch.assertExchange(queueConfig.exchange, queueConfig.exchangeType || 'topic', queueConfig.exchangeOptions || {durable: true}),
queueConfig.queue ? ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true}) : Promise.resolve()
ch.assertExchange(queueConfig.exchange, queueConfig.exchangeType || 'topic', queueConfig.exchangeOptions || { durable: true }),
queueConfig.queue ? ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || { durable: true }) : Promise.resolve()
])

@@ -191,3 +196,3 @@ .then(function () {

toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true},
messageOptions || queueConfig.messageOptions || { persistent: true },
function (err) {

@@ -211,3 +216,3 @@ if (err) {

function (ch) {
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true})
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || { durable: true })
.then(function () {

@@ -218,3 +223,3 @@ return new Promise(function (resolve, reject) {

toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true},
messageOptions || queueConfig.messageOptions || { persistent: true },
function (err) {

@@ -235,7 +240,7 @@ if (err) {

return {
connect: connect,
close: close,
consume: consume,
publish: publish,
sendToQueue: sendToQueue
connect,
close,
consume,
publish,
sendToQueue
}

@@ -242,0 +247,0 @@ }

{
"name": "amqplib-easy",
"version": "4.5.0",
"version": "5.0.0-node-upgrade.0",
"description": "Simplified API for interacting with AMQP",

@@ -8,3 +8,3 @@ "main": "index.js",

"lint": "standard",
"test": "npm run lint && mocha"
"test": "c8 --clean true -r text -r lcov mocha --timeout 1000 --recursive --exit"
},

@@ -39,13 +39,14 @@ "standard": {

"devDependencies": {
"mocha": "^2.1.0",
"should": "^8.2.2",
"standard": "^6"
"c8": "^7.12.0",
"mocha": "^10.2.0",
"should": "^13.2.3",
"standard": "^17.0.0"
},
"dependencies": {
"amqplib": "^0.4.0",
"amqplib-retry": "^1.0.2",
"bluebird": "^2.6.2",
"diehard": "^1.3.0",
"lodash.defaults": "^2.4.1"
"amqplib": "^0.10.3",
"amqplib-retry": "1.1.8-0-node-upgrade.0",
"bluebird": "^3.7.2",
"diehard": "^1.5.2",
"lodash.defaults": "^4.2.0"
}
}
/* globals it:false */
'use strict'
var amqpUrl = 'amqp://guest:guest@localhost:5672'
var childProcess = require('child_process')
var BPromise = require('bluebird')
var amqp = require('../index')(amqpUrl)
require('should')
const amqpUrl = 'amqp://guest:guest@localhost:5672'
const childProcess = require('child_process')
const BPromise = require('bluebird')
const amqp = require('../index')(amqpUrl)

@@ -28,3 +29,3 @@ describe('amqplib-easy', function () {

// the queue doesn't exist, so w/e
return
})

@@ -36,3 +37,3 @@ })

describe('consumer', function () {
var cancel
let cancel

@@ -50,3 +51,3 @@ afterEach(function () {

parse: function () {
return {name: 'Fred'}
return { name: 'Fred' }
},

@@ -57,5 +58,8 @@ queue: 'found_cats',

function (cat) {
var name = cat.json.name
const name = cat.json.name
const payload = cat.payload
try {
cat.should.have.properties(['content', 'fields', 'properties'])
name.should.equal('Fred')
payload.should.be.deepEqual(cat.json)
done()

@@ -70,3 +74,3 @@ } catch (err) {

return BPromise.all([
amqp.sendToQueue({queue: 'found_cats'}, new Buffer('dsadasd'))
amqp.sendToQueue({ queue: 'found_cats' }, Buffer.from('dsadasd'))
])

@@ -78,3 +82,3 @@ })

it('should handle buffers reasonably', function (done) {
var catCount = 0
let catCount = 0
amqp.consume(

@@ -87,3 +91,3 @@ {

function (cat) {
var name = cat.json.name
const name = cat.json.name
try {

@@ -102,4 +106,4 @@ (name === 'Sally' || name === 'Fred').should.be.ok()

return BPromise.all([
amqp.publish({exchange: 'cat'}, 'found.tawny', new Buffer('{ "name": "Sally" }')),
amqp.sendToQueue({queue: 'found_cats'}, new Buffer('{ "name": "Fred" }'))
amqp.publish({ exchange: 'cat' }, 'found.tawny', Buffer.from('{ "name": "Sally" }')),
amqp.sendToQueue({ queue: 'found_cats' }, Buffer.from('{ "name": "Fred" }'))
])

@@ -111,3 +115,3 @@ })

it('should publish, sendToQueue and receive', function (done) {
var catCount = 0
let catCount = 0
amqp.consume(

@@ -120,3 +124,3 @@ {

function (cat) {
var name = cat.json.name
const name = cat.json.name
try {

@@ -135,4 +139,4 @@ (name === 'Sally' || name === 'Fred').should.be.ok()

return BPromise.all([
amqp.publish({exchange: 'cat'}, 'found.tawny', {name: 'Sally'}),
amqp.sendToQueue({queue: 'found_cats'}, {name: 'Fred'})
amqp.publish({ exchange: 'cat' }, 'found.tawny', { name: 'Sally' }),
amqp.sendToQueue({ queue: 'found_cats' }, { name: 'Fred' })
])

@@ -156,5 +160,5 @@ })

cancel = c
return amqp.publish({exchange: 'cat', exchangeType: 'direct'}, 'found.tawny', {name: 'Sally'})
return amqp.publish({ exchange: 'cat', exchangeType: 'direct' }, 'found.tawny', { name: 'Sally' })
.catch(function () {
return amqp.publish({exchange: 'cat', exchangeType: 'topic'}, 'found.tawny', {name: 'Sally'})
return amqp.publish({ exchange: 'cat', exchangeType: 'topic' }, 'found.tawny', { name: 'Sally' })
})

@@ -178,3 +182,3 @@ })

}
)
)
})

@@ -195,3 +199,3 @@ })

function (cat) {
var name = cat.json.name
const name = cat.json.name
try {

@@ -208,5 +212,5 @@ name.should.equal('Sally')

return amqp.publish(
{exchange: 'cat', exchangeType: 'fanout'},
{ exchange: 'cat', exchangeType: 'fanout' },
'found.tawny',
{name: 'Sally'}
{ name: 'Sally' }
)

@@ -231,3 +235,3 @@ })

}
)
)
})

@@ -245,3 +249,3 @@ })

arguments: {
'color': 'blue'
color: 'blue'
},

@@ -251,3 +255,3 @@ queue: 'found_cats'

function (cat) {
var name = cat.json.name
const name = cat.json.name
try {

@@ -264,6 +268,6 @@ name.should.equal('Sally')

return amqp.publish(
{exchange: 'cat', exchangeType: 'headers'},
{ exchange: 'cat', exchangeType: 'headers' },
'found.tawny',
{name: 'Sally'},
{headers: {color: 'blue'}}
{ name: 'Sally' },
{ headers: { color: 'blue' } }
)

@@ -295,3 +299,3 @@ })

.then(function () {
return amqp.sendToQueue({queue: 'found_cats'}, {name: 'Carl'})
return amqp.sendToQueue({ queue: 'found_cats' }, { name: 'Carl' })
})

@@ -333,7 +337,7 @@ .then(function () {

}).catch(done)
.then(function () {
amqp.connect().then(function () {
done()
.then(function () {
amqp.connect().then(function () {
done()
})
})
})
})

@@ -359,3 +363,3 @@ })

// Spin up a process to kill
var testProcess = childProcess.fork('./test/resources/death.js', {silent: false})
const testProcess = childProcess.fork('./test/resources/death.js', { silent: false })

@@ -376,3 +380,3 @@ testProcess.on('message', function (message) {

describe('x-delayed-message', function () {
var plugin = false
let plugin = false

@@ -403,3 +407,3 @@ after(function () {

// the queue doesn't exist, so w/e
return
})

@@ -416,3 +420,3 @@ })

.then(function (channel) {
return channel.assertExchange('cat', 'x-delayed-message', {arguments: {'x-delayed-type': 'fanout'}})
return channel.assertExchange('cat', 'x-delayed-message', { arguments: { 'x-delayed-type': 'fanout' } })
})

@@ -440,29 +444,29 @@ .then(function () {

amqp
.consume({
exchange: 'cat',
exchangeType: 'x-delayed-message',
exchangeOptions: {arguments: {'x-delayed-type': 'fanout'}},
queue: 'found_cats'
}, function (cat) {
var name = cat.json.name
// There may be some delay, use 2.9 sec to test
var time = cat.json.time + 2900
.consume({
exchange: 'cat',
exchangeType: 'x-delayed-message',
exchangeOptions: { arguments: { 'x-delayed-type': 'fanout' } },
queue: 'found_cats'
}, function (cat) {
const name = cat.json.name
// There may be some delay, use 2.9 sec to test
const time = cat.json.time + 2900
try {
name.should.equal('Sally')
time.should.be.below(new Date().getTime())
done()
} catch (err) {
done(err)
}
})
.then(function () {
return amqp.publish(
{exchange: 'cat', exchangeType: 'x-delayed-message'},
'found.tawny',
{name: 'Sally', time: new Date().getTime()},
{headers: {'x-delay': 3000}}
)
})
try {
name.should.equal('Sally')
time.should.be.below(new Date().getTime())
done()
} catch (err) {
done(err)
}
})
.then(function () {
return amqp.publish(
{ exchange: 'cat', exchangeType: 'x-delayed-message' },
'found.tawny',
{ name: 'Sally', time: new Date().getTime() },
{ headers: { 'x-delay': 3000 } }
)
})
})
})
'use strict'
var amqp = require('../../index')('amqp://guest:guest@localhost:5672')
var diehard = require('diehard')
const amqp = require('../../index')('amqp://guest:guest@localhost:5672')
const diehard = require('diehard')

@@ -6,0 +6,0 @@ amqp.connect()

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc