Socket
Socket
Sign inDemoInstall

amqplib-easy

Package Overview
Dependencies
Maintainers
2
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqplib-easy - npm Package Compare versions

Comparing version 3.4.0 to 4.0.0

7

API.md

@@ -1,2 +0,2 @@

- [`Create(amqpUrl)`](#createamqpurl---amqp)
- [`Create(amqpUrl)`](#createamqpurl-socketoptions---amqp)
- [`AMQP.consume(config, handler)`](#amqpconsumeconfig-handler---cancellationpromise)

@@ -8,3 +8,3 @@ - [`AMQP.publish(config, key, message, [options])`](#amqppublishconfig-key-message-options---promise)

### `Create(amqpUrl)` -> `AMQP`
### `Create(amqpUrl, [socketOptions])` -> `AMQP`
Create an `AMQP` which connects to `amqpUrl`. E.g.,

@@ -15,2 +15,5 @@ ```javascript

[`socketOptions`](http://www.squaremobius.net/amqp.node/channel_api.html#connect)
default to `maxChannels: 100`.
### `AMQP.consume(config, handler)` -> `CancellationPromise`

@@ -17,0 +20,0 @@ Asserts queue and exchange specified in [`config`](#config) and binds them

@@ -8,3 +8,4 @@ 'use strict';

diehard = require('diehard'),
connections = {};
connections = {},
sendChannels = {};

@@ -32,6 +33,9 @@ function cleanup(done) {

module.exports = function (amqpUrl) {
module.exports = function (amqpUrl, socketOptions) {
function connect() {
if (!connections[amqpUrl]) {
connections[amqpUrl] = BPromise.resolve(amqp.connect(amqpUrl));
socketOptions = defaults({}, socketOptions || {}, {
channelMax: 100
});
connections[amqpUrl] = BPromise.resolve(amqp.connect(amqpUrl, socketOptions));
}

@@ -41,2 +45,19 @@ return connections[amqpUrl];

function sendChannel() {
if (!sendChannels[amqpUrl]) {
sendChannels[amqpUrl] = connect()
.then(function (connection) {
return connection.createConfirmChannel()
.then(function (channel) {
channel.on('error', function () {
sendChannels[amqpUrl] = null;
// clear out the channel since it's in a bad state
});
return channel;
});
});
}
return sendChannels[amqpUrl];
}
function createChannel() {

@@ -48,9 +69,2 @@ return connect().then(function (connection) {

function createChannelDisposer() {
return createChannel()
.disposer(function (channel) {
return channel.close();
});
}
function consume(queueConfig, handler) {

@@ -142,5 +156,4 @@ var options = defaults({}, queueConfig || {}, {

function publish(queueConfig, key, json, messageOptions) {
return BPromise.using(
createChannelDisposer(),
function (ch) {
return sendChannel()
.then(function (ch) {
if (queueConfig.exchange === null || queueConfig.exchange === undefined) {

@@ -154,6 +167,16 @@ throw new Error('Client tries to publish to an exchange while exchange name is not undefined.');

.then(function () {
return ch.publish(queueConfig.exchange,
key,
toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true});
return new BPromise(function (resolve, reject) {
ch.publish(queueConfig.exchange,
key,
toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true},
function (err) {
if (err) {
reject(err);
} else {
resolve();
}
}
);
});
});

@@ -165,12 +188,20 @@ }

function sendToQueue(queueConfig, json, messageOptions) {
return BPromise.using(
createChannelDisposer(),
function (ch) {
return sendChannel()
.then(function (ch) {
return ch.assertQueue(queueConfig.queue, queueConfig.queueOptions || {durable: true})
.then(function () {
return ch.sendToQueue(
queueConfig.queue,
toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true}
);
return new BPromise(function (resolve, reject) {
ch.sendToQueue(
queueConfig.queue,
toBuffer(json),
messageOptions || queueConfig.messageOptions || {persistent: true},
function (err) {
if (err) {
reject(err);
} else {
resolve();
}
}
);
});
});

@@ -177,0 +208,0 @@ }

{
"name": "amqplib-easy",
"version": "3.4.0",
"version": "4.0.0",
"description": "Simplified API for interacting with AMQP",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -34,3 +34,3 @@ /*globals it:false*/

describe('', function () {
describe('consumer', function () {
var cancel;

@@ -129,2 +129,23 @@

});
it('should publish even if something causes the channel to die', function (done) {
amqp.consume(
{
exchange: 'cat',
queue: 'found_cats',
topics: [ 'found.*' ]
},
function () {
done();
}
)
.then(function (c) {
cancel = c;
return amqp.publish({ exchange: 'cat', exchangeType: 'direct' }, 'found.tawny', { name: 'Sally' })
.catch(function () {
return amqp.publish({ exchange: 'cat', exchangeType: 'topic' }, 'found.tawny', { name: 'Sally' });
});
})
.catch(done);
});
});

@@ -131,0 +152,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc