Socket
Socket
Sign inDemoInstall

amqp-connection-manager

Package Overview
Dependencies
Maintainers
13
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-connection-manager - npm Package Compare versions

Comparing version 1.4.2 to 2.0.0

.babelrc

13

CHANGELOG.md

@@ -0,1 +1,14 @@

<a name="2.0.0"></a>
# [2.0.0](https://github.com/benbria/node-amqp-connection-manager/compare/v1.4.2...v2.0.0) (2018-05-05)
### Code Refactoring
* Rewrite all source in javascript. ([377d01d](https://github.com/benbria/node-amqp-connection-manager/commit/377d01d))
### BREAKING CHANGES
* Officially dropping support for node v4.x.x.
1.4.0

@@ -2,0 +15,0 @@ -----

307

lib/AmqpConnectionManager.js

@@ -1,58 +0,84 @@

// Generated by CoffeeScript 2.2.4
(function() {
var AmqpConnectionManager, ChannelWrapper, EventEmitter, HEARTBEAT_IN_SECONDS, _, amqp, pb, urlUtils, wait;
'use strict';
({EventEmitter} = require('events'));
Object.defineProperty(exports, "__esModule", {
value: true
});
amqp = require('amqplib');
var _events = require('events');
urlUtils = require('url');
var _amqplib = require('amqplib');
_ = require('lodash');
var _amqplib2 = _interopRequireDefault(_amqplib);
ChannelWrapper = require('./ChannelWrapper');
var _url = require('url');
({wait} = require('./helpers'));
var _url2 = _interopRequireDefault(_url);
pb = require('promise-breaker');
var _ChannelWrapper = require('./ChannelWrapper');
// Default heartbeat time.
HEARTBEAT_IN_SECONDS = 5;
var _ChannelWrapper2 = _interopRequireDefault(_ChannelWrapper);
// Events:
// * `connect({connection, url})` - Emitted whenever we connect to a broker.
// * `disconnect({err})` - Emitted whenever we disconnect from a broker.
var _helpers = require('./helpers');
AmqpConnectionManager = class AmqpConnectionManager extends EventEmitter {
// Create a new AmqplibConnectionManager.
var _promiseBreaker = require('promise-breaker');
// * `urls` is an array of brokers to connect to. AmqplibConnectionManager will round-robin between them
// whenever it needs to create a new connection.
// * `options.heartbeatIntervalInSeconds` is the interval, in seconds, to send heartbeats. Defaults to 5 seconds.
// * `options.reconnectTimeInSeconds` is the time to wait before trying to reconnect. If not specified,
// defaults to `heartbeatIntervalInSeconds`.
// * `options.connectionOptions` is passed to the amqplib connect method.
// * `options.findServers(callback)` is a function which returns one or more servers to connect to. This should
// return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism
// such as Consul or etcd. Instead of taking a `callback`, this can also return a Promise. Note that if this
// is supplied, then `urls` is ignored.
var _promiseBreaker2 = _interopRequireDefault(_promiseBreaker);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
// Default heartbeat time.
const HEARTBEAT_IN_SECONDS = 5;
/* istanbul ignore next */
function neverThrows() {
return err => setImmediate(() => {
throw new Error(`AmqpConnectionManager - should never get here: ${err.message}\n` + err.stack);
});
}
//
// Events:
// * `connect({connection, url})` - Emitted whenever we connect to a broker.
// * `disconnect({err})` - Emitted whenever we disconnect from a broker.
//
class AmqpConnectionManager extends _events.EventEmitter {
/**
* Create a new AmqplibConnectionManager.
*
* @param {string[]} urls - An array of brokers to connect to.
* AmqplibConnectionManager will round-robin between them whenever it
* needs to create a new connection.
* @param {Object} [options={}] -
* @param {number} [options.heartbeatIntervalInSeconds=5] - The interval,
* in seconds, to send heartbeats.
* @param {number} [options.reconnectTimeInSeconds] - The time to wait
* before trying to reconnect. If not specified, defaults to
* `heartbeatIntervalInSeconds`.
* @param {Object} [options.connectionOptions] - Passed to the amqplib
* connect method.
* @param {function} [options.findServers] - A `fn(callback)` or a `fn()`
* which returns a Promise. This should resolve to one or more servers
* to connect to, either a single URL or an array of URLs. This is handy
* when you're using a service discovery mechanism such as Consul or etcd.
* Note that if this is supplied, then `urls` is ignored.
*/
constructor(urls, options = {}) {
var ref, ref1, ref2;
super();
if (!urls && !options.findServers) {
throw new Error("Must supply either `urls` or `findServers`");
}
this._channels = [];
this._currentUrl = 0;
this.connectionOptions = options.connectionOptions;
this.heartbeatIntervalInSeconds = (ref = options.heartbeatIntervalInSeconds) != null ? ref : HEARTBEAT_IN_SECONDS;
this.reconnectTimeInSeconds = (ref1 = options.reconnectTimeInSeconds) != null ? ref1 : this.heartbeatIntervalInSeconds;
// There will be one listener per channel, and there could be a lot of channels, so disable warnings from node.
this.setMaxListeners(0);
this._findServers = (ref2 = options.findServers) != null ? ref2 : (function() {
return Promise.resolve(urls);
});
this._connect();
super();
if (!urls && !options.findServers) {
throw new Error("Must supply either `urls` or `findServers`");
}
this._channels = [];
this._currentUrl = 0;
this.connectionOptions = options.connectionOptions;
this.heartbeatIntervalInSeconds = options.heartbeatIntervalInSeconds || HEARTBEAT_IN_SECONDS;
this.reconnectTimeInSeconds = options.reconnectTimeInSeconds || this.heartbeatIntervalInSeconds;
// There will be one listener per channel, and there could be a lot of channels, so disable warnings from node.
this.setMaxListeners(0);
this._findServers = options.findServers || (() => Promise.resolve(urls));
this._connect();
}

@@ -62,114 +88,113 @@

createChannel(options = {}) {
var channel;
channel = new ChannelWrapper(this, options);
this._channels.push(channel);
channel.once('close', () => {
return this._channels = _.without(this._channels, channel);
});
return channel;
const channel = new _ChannelWrapper2.default(this, options);
this._channels.push(channel);
channel.once('close', () => {
this._channels = this._channels.filter(c => c !== channel);
});
return channel;
}
close() {
if (this._closed) {
return;
}
this._closed = true;
return Promise.all(this._channels.map(function(channel) {
return channel.close();
// Ignore errors closing channels.
})).catch(function() {}).then(() => {
var ref;
this._channels = [];
if ((ref = this._currentConnection) != null) {
ref.close();
if (this._closed) {
return Promise.resolve();
}
return this._currentConnection = null;
});
this._closed = true;
return Promise.all(this._channels.map(channel => channel.close())).catch(function () {
// Ignore errors closing channels.
}).then(() => {
this._channels = [];
if (this._currentConnection) {
this._currentConnection.close();
}
this._currentConnection = null;
});
}
isConnected() {
return this._currentConnection != null;
return !!this._currentConnection;
}
_connect() {
if (this._closed || this._connecting || this.isConnected()) {
return Promise.resolve();
}
this._connecting = true;
return Promise.resolve().then(() => {
if (!this._urls || (this._currentUrl >= this._urls.length)) {
this._currentUrl = 0;
return pb.callFn(this._findServers, 0, null);
} else {
return this._urls;
if (this._closed || this._connecting || this.isConnected()) {
return Promise.resolve();
}
}).then((urls) => {
var amqpUrl, url;
if ((urls != null) && !_.isArray(urls)) {
urls = [urls];
}
this._urls = urls;
if (!urls || urls.length === 0) {
throw new Error('amqp-connection-manager: No servers found');
}
// Round robin between brokers
url = urls[this._currentUrl];
this._currentUrl++;
amqpUrl = urlUtils.parse(url);
if (amqpUrl.search != null) {
amqpUrl.search += `&heartbeat=${this.heartbeatIntervalInSeconds}`;
} else {
amqpUrl.search = `?heartbeat=${this.heartbeatIntervalInSeconds}`;
}
return amqp.connect(urlUtils.format(amqpUrl), this.connectionOptions).then((connection) => {
this._currentConnection = connection;
//emit 'blocked' when RabbitMQ server decides to block the connection (resources running low)
connection.on('blocked', (reason) => {
return this.emit('blocked', {reason});
});
connection.on('unblocked', () => {
return this.emit('unblocked');
});
// Reconnect if the broker goes away.
connection.on('error', (err) => {
return Promise.resolve().then(function() {
return this._currentConnection.close();
}).catch(function(err) {}).then(() => { // Ignore
this._currentConnection = null;
this.emit('disconnect', {err});
return this._connect();
}).catch(function(err) {
/* !pragma coverage-skip-block */
// `_connect()` should never throw.
return console.error("amqp-connection-manager: AmqpConnectionManager:_connect()" + " - How did you get here?", err.stack);
this._connecting = true;
return Promise.resolve().then(() => {
if (!this._urls || this._currentUrl >= this._urls.length) {
this._currentUrl = 0;
return _promiseBreaker2.default.callFn(this._findServers, 0, null);
} else {
return this._urls;
}
}).then(urls => {
if (urls && !Array.isArray(urls)) {
urls = [urls];
}
this._urls = urls;
if (!urls || urls.length === 0) {
throw new Error('amqp-connection-manager: No servers found');
}
// Round robin between brokers
const url = urls[this._currentUrl];
this._currentUrl++;
const amqpUrl = _url2.default.parse(url);
if (amqpUrl.search) {
amqpUrl.search += `&heartbeat=${this.heartbeatIntervalInSeconds}`;
} else {
amqpUrl.search = `?heartbeat=${this.heartbeatIntervalInSeconds}`;
}
return _amqplib2.default.connect(_url2.default.format(amqpUrl), this.connectionOptions).then(connection => {
this._currentConnection = connection;
//emit 'blocked' when RabbitMQ server decides to block the connection (resources running low)
connection.on('blocked', reason => this.emit('blocked', { reason }));
connection.on('unblocked', () => this.emit('unblocked'));
// Reconnect if the broker goes away.
connection.on('error', err => Promise.resolve().then(() => this._currentConnection.close()).catch(() => {/* Ignore */}).then(() => {
this._currentConnection = null;
this.emit('disconnect', { err });
return this._connect();
})
// `_connect()` should never throw.
.catch(neverThrows));
// Reconnect if the connection closes gracefully
connection.on('close', err => {
this._currentConnection = null;
this.emit('disconnect', { err });
(0, _helpers.wait)(this.reconnectTimeInSeconds * 1000).then(() => this._connect())
// `_connect()` should never throw.
.catch(neverThrows);
});
this._connecting = false;
this.emit('connect', { connection, url });
return null;
});
});
// Reconnect if the connection closes gracefully
connection.on('close', (err) => {
}).catch(err => {
this.emit('disconnect', { err });
// Connection failed...
this._currentConnection = null;
this.emit('disconnect', {err});
return wait(this.reconnectTimeInSeconds * 1000).then(() => {
return this._connect();
// TODO: Probably want to try right away here, especially if there are multiple brokers to try...
return (0, _helpers.wait)(this.reconnectTimeInSeconds * 1000).then(() => {
this._connecting = false;
return this._connect();
});
});
this._connecting = false;
this.emit('connect', {connection, url});
return null;
});
}).catch((err) => {
this.emit('disconnect', {err});
// Connection failed...
this._currentConnection = null;
// TODO: Probably want to try right away here, especially if there are multiple brokers to try...
return wait(this.reconnectTimeInSeconds * 1000).then(() => {
this._connecting = false;
return this._connect();
});
});
}
};
module.exports = AmqpConnectionManager;
}).call(this);
}
exports.default = AmqpConnectionManager;
//# sourceMappingURL=AmqpConnectionManager.js.map

@@ -1,352 +0,375 @@

// Generated by CoffeeScript 2.2.4
(function() {
var ChannelWrapper, EventEmitter, Promise, _, pb, ref,
boundMethodCheck = function(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new Error('Bound instance method accessed before binding'); } };
'use strict';
Promise = (ref = global.Promise) != null ? ref : require('es6-promise').Promise;
Object.defineProperty(exports, "__esModule", {
value: true
});
({EventEmitter} = require('events'));
var _events = require('events');
_ = require('lodash');
var _promiseBreaker = require('promise-breaker');
pb = require('promise-breaker');
var _promiseBreaker2 = _interopRequireDefault(_promiseBreaker);
ChannelWrapper = (function() {
// Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and
// are guaranteed to be delivered. If the underlying connection drops, ChannelWrapper will wait for a new
// connection and continue.
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
// Events:
// * `connect` - emitted every time this channel connects or reconnects.
// * `error(err, {name})` - emitted if an error occurs setting up the channel.
// * `drop({message, err})` - called when a JSON message was dropped because it could not be encoded.
// * `close` - emitted when this channel closes via a call to `close()`
/**
* Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and
* are guaranteed to be delivered. If the underlying connection drops, ChannelWrapper will wait for a new
* connection and continue.
*
* Events:
* * `connect` - emitted every time this channel connects or reconnects.
* * `error(err, {name})` - emitted if an error occurs setting up the channel.
* * `drop({message, err})` - called when a JSON message was dropped because it could not be encoded.
* * `close` - emitted when this channel closes via a call to `close()`
*
*/
class ChannelWrapper extends _events.EventEmitter {
/**
* Adds a new 'setup handler'.
*
* `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting
* exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib.
* The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until
* this Promise resolves.
*
* If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve
* until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will
* be emitted, since you can just handle the error here (although the `setup` will still be added for future
* reconnects, even if it throws an error.)
*
* Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error'
* event.
*
* @param {function} setup - setup function.
* @param {function} [done] - callback.
* @returns {void | Promise} - Resolves when complete.
*/
addSetup(setup, done = null) {
return _promiseBreaker2.default.addCallback(done, (this._settingUp || Promise.resolve()).then(() => {
this._setups.push(setup);
if (this._channel) {
return _promiseBreaker2.default.call(setup, null, this._channel);
} else {
return undefined;
}
}));
}
class ChannelWrapper extends EventEmitter {
// Create a new ChannelWrapper.
/**
* Remove a setup function added with `addSetup`. If there is currently a
* connection, `teardown(channel, [cb])` will be run immediately, and the
* returned Promise will not resolve until it completes.
*
* @param {function} setup - the setup function to remove.
* @param {function} [teardown] - `function(channel, [cb])` to run to tear
* down the chanel.
* @param {function} [done] - Optional callback.
* @returns {void | Promise} - Resolves when complete.
*/
removeSetup(setup, teardown = null, done = null) {
return _promiseBreaker2.default.addCallback(done, () => {
this._setups = this._setups.filter(s => s !== setup);
// * `options.name` is a name for this channel. Handy for debugging.
// * `options.setup` is a default setup function to call. See `addSetup` for details.
// * `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()`
// are plain JSON objects. These will be encoded automatically before being sent.
return (this._settingUp || Promise.resolve()).then(() => this._channel ? _promiseBreaker2.default.call(teardown, null, this._channel) : undefined);
});
}
constructor(connectionManager, options = {}) {
var ref1;
/**
* Returns a Promise which resolves when this channel next connects.
* (Mainly here for unit testing...)
*
* @param {function} [done] - Optional callback.
* @returns {void | Promise} - Resolves when connected.
*/
waitForConnect(done = null) {
return _promiseBreaker2.default.addCallback(done, this._channel && !this._settingUp ? Promise.resolve() : new Promise(resolve => this.once('connect', resolve)));
}
/*
* Publish a message to the channel.
*
* This works just like amqplib's `publish()`, except if the channel is not
* connected, this will wait until the channel is connected. Returns a
* Promise which will only resolve when the message has been succesfully sent.
* The returned promise will be rejected if `close()` is called on this
* channel before it can be sent, if `options.json` is set and the message
* can't be encoded, or if the broker rejects the message for some reason.
*
*/
publish(exchange, routingKey, content, options, done = null) {
return _promiseBreaker2.default.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
type: 'publish',
exchange,
routingKey,
content,
options,
resolve,
reject
});
this._startWorker();
}));
}
/*
* Send a message to a queue.
*
* This works just like amqplib's `sendToQueue`, except if the channel is not connected, this will wait until the
* channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent.
* The returned promise will be rejected only if `close()` is called on this channel before it can be sent.
*
* `message` here should be a JSON-able object.
*/
sendToQueue(queue, content, options, done = null) {
return _promiseBreaker2.default.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
type: 'sendToQueue',
queue,
content,
options,
resolve,
reject
});
return this._startWorker();
}));
}
/**
* Create a new ChannelWrapper.
*
* @param {AmqpConnectionManager} connectionManager - connection manager which
* created this channel.
* @param {Object} [options] -
* @param {string} [options.name] - A name for this channel. Handy for debugging.
* @param {function} [options.setup] - A default setup function to call. See
* `addSetup` for details.
* @param {boolean} [options.json] - if true, then ChannelWrapper assumes all
* messages passed to `publish()` and `sendToQueue()` are plain JSON objects.
* These will be encoded automatically before being sent.
*
*/
constructor(connectionManager, options = {}) {
super();
// Called whenever we connect to the broker.
this._onConnect = this._onConnect.bind(this);
// Wait for another reconnect to create a new channel.
// Called whenever we disconnect from the AMQP server.
this._onDisconnect = this._onDisconnect.bind(this);
this._connectionManager = connectionManager;
this.name = options.name;
this._json = (ref1 = options.json) != null ? ref1 : false;
this._json = 'json' in options ? options.json : false;
// Place to store queued messages.
this._messages = [];
// True if the "worker" is busy sending messages. False if we need to start the worker to get stuff done.
// True if the "worker" is busy sending messages. False if we need to
// start the worker to get stuff done.
this._working = false;
// If we're in the process of creating a channel, this is a Promise which will resolve when the channel is
// set up. Otherwise, this is `null`.
// If we're in the process of creating a channel, this is a Promise which
// will resolve when the channel is set up. Otherwise, this is `null`.
this._settingUp = null;
// The currently connected channel. Note that not all setup functions have been run on this channel until
// `@_settingUp` is either null or resolved.
// The currently connected channel. Note that not all setup functions
// have been run on this channel until `@_settingUp` is either null or
// resolved.
this._channel = null;
// We kill off workers when we disconnect. Whenever we start a new worker, we bump up the `_workerNumber` -
// this makes it so if stale workers ever do wake up, they'll know to stop working.
// We kill off workers when we disconnect. Whenever we start a new
// worker, we bump up the `_workerNumber` - this makes it so if stale
// workers ever do wake up, they'll know to stop working.
this._workerNumber = 0;
// Array of setup functions to call.
this._setups = [];
if (options.setup != null) {
this._setups.push(options.setup);
if (options.setup) {
this._setups.push(options.setup);
}
if (connectionManager.isConnected()) {
this._onConnect({
connection: this._connectionManager._currentConnection
});
this._onConnect({
connection: this._connectionManager._currentConnection
});
}
connectionManager.on('connect', this._onConnect);
connectionManager.on('disconnect', this._onDisconnect);
}
}
_onConnect({connection}) {
boundMethodCheck(this, ChannelWrapper);
// Called whenever we connect to the broker.
_onConnect({ connection }) {
this._connection = connection;
return connection.createConfirmChannel().then((channel) => {
this._channel = channel;
channel.on('close', () => {
return this._onChannelClose(channel);
});
return this._settingUp = Promise.all(this._setups.map((setupFn) => {
return connection.createConfirmChannel().then(channel => {
this._channel = channel;
channel.on('close', () => this._onChannelClose(channel));
this._settingUp = Promise.all(this._setups.map(setupFn =>
// TODO: Use a timeout here to guard against setupFns that never resolve?
return pb.call(setupFn, null, channel).catch((err) => {
if (this._channel) {
return this.emit('error', err, {
name: this.name
});
} else {
_promiseBreaker2.default.call(setupFn, null, channel).catch(err => {
if (this._channel) {
this.emit('error', err, { name: this.name });
} else {
// Don't emit an error if setups failed because the channel was closing.
}
}))).then(() => {
this._settingUp = null;
return this._channel;
});
}
});
// Don't emit an error if setups failed because the channel was closing.
})).then(() => {
return this._settingUp;
}).then(() => {
if (!this._channel) {
// Can happen if channel closes while we're setting up.
return;
}
// Since we just connected, publish any queued messages
this._startWorker();
this.emit('connect');
}).catch(err => {
this.emit('error', err, { name: this.name });
this._settingUp = null;
return this._channel;
});
}).then(() => {
if (this._channel == null) { // Can happen if channel closes while we're setting up.
return;
}
// Since we just connected, publish any queued messages
this._startWorker();
return this.emit('connect');
}).catch((err) => {
this.emit('error', err, {
name: this.name
});
this._settingUp = null;
return this._channel = null;
this._channel = null;
});
}
}
// Called whenever the channel closes.
_onChannelClose(channel) {
// Called whenever the channel closes.
_onChannelClose(channel) {
if (this._channel === channel) {
return this._channel = null;
this._channel = null;
}
}
}
// Wait for another reconnect to create a new channel.
_onDisconnect() {
boundMethodCheck(this, ChannelWrapper);
// Called whenever we disconnect from the AMQP server.
_onDisconnect() {
this._channel = null;
this._settingUp = null;
// Kill off the current worker. We never get any kind of error for messages in flight - see
// https://github.com/squaremo/amqp.node/issues/191.
return this._working = false;
}
this._working = false;
}
// Returns the number of unsent messages queued on this channel.
queueLength() {
// Returns the number of unsent messages queued on this channel.
queueLength() {
return this._messages.length;
}
}
// Destroy this channel.
// Any unsent messages will have their associated Promises rejected.
close() {
// Destroy this channel.
//
// Any unsent messages will have their associated Promises rejected.
//
close() {
return Promise.resolve().then(() => {
var answer, ref1, ref2;
this._working = false;
if (this._messages.length !== 0) {
// Reject any unsent messages.
this._messages.forEach(function(message) {
return message.reject(new Error('Channel closed'));
});
}
this._connectionManager.removeListener('connect', this._onConnect);
this._connectionManager.removeListener('disconnect', this._onDisconnect);
answer = (ref1 = (ref2 = this._channel) != null ? ref2.close() : void 0) != null ? ref1 : Promise.resolve();
this._channel = null;
this.emit('close');
return answer;
});
}
this._working = false;
if (this._messages.length !== 0) {
// Reject any unsent messages.
this._messages.forEach(message => message.reject(new Error('Channel closed')));
}
_shouldPublish() {
return (this._messages.length > 0) && !this._settingUp && this._channel;
}
this._connectionManager.removeListener('connect', this._onConnect);
this._connectionManager.removeListener('disconnect', this._onDisconnect);
const answer = this._channel && this._channel.close() || undefined;
this._channel = null;
// Start publishing queued messages, if there isn't already a worker doing this.
_startWorker() {
if (!this._working && this._shouldPublish()) {
this._working = true;
this._workerNumber++;
return this._publishQueuedMessages(this._workerNumber);
}
}
this.emit('close');
_publishQueuedMessages(workerNumber) {
var channel, message;
if (!this._shouldPublish() || !this._working || (workerNumber !== this._workerNumber)) {
// Can't publish anything right now...
this._working = false;
return Promise.resolve();
}
channel = this._channel;
message = this._messages[0];
Promise.resolve().then(() => {
var encodedMessage, sendPromise;
encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content;
sendPromise = (function() {
switch (message.type) {
case 'publish':
return new Promise(function(resolve, reject) {
var result;
return result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, function(err) {
if (err) {
return reject(err);
}
return setImmediate(function() {
return resolve(result);
});
});
});
case 'sendToQueue':
return new Promise(function(resolve, reject) {
var result;
return result = channel.sendToQueue(message.queue, encodedMessage, message.options, function(err) {
if (err) {
return reject(err);
}
return setImmediate(function() {
return resolve(result);
});
});
});
default:
/* !pragma coverage-skip-block */
throw new Error(`Unhandled message type ${message.type}`);
}
})();
return sendPromise;
}).then((result) => {
this._messages.shift();
message.resolve(result);
// Send some more!
return this._publishQueuedMessages(workerNumber);
}, (err) => {
if (!this._channel) {
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the
// broker rejected the message. Either way, reject it back
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we
// reconnect.
this._messages.shift();
message.reject(err);
// Send some more!
return this._publishQueuedMessages(workerNumber);
}
}).catch((err) => {
/* !pragma coverage-skip-block */
console.error("amqp-connection-manager: ChannelWrapper:_publishQueuedMessages() - How did you get here?", err.stack);
this.emit('error', err);
return this._working = false;
return answer;
});
return null;
}
}
// Send an `ack` to the underlying channel.
ack(...args) {
var ref1;
return (ref1 = this._channel) != null ? ref1.ack(...args) : void 0;
}
_shouldPublish() {
return this._messages.length > 0 && !this._settingUp && this._channel;
}
// Send a `nack` to the underlying channel.
nack(...args) {
var ref1;
return (ref1 = this._channel) != null ? ref1.nack(...args) : void 0;
}
};
// Adds a new 'setup handler'.
// `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting
// exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib.
// The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until
// this Promise resolves.
// If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve
// until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will
// be emitted, since you can just handle the error here (although the `setup` will still be added for future
// reconnects, even if it throws an error.)
// Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error'
// event.
ChannelWrapper.prototype.addSetup = pb.break(function(setup) {
return (this._settingUp || Promise.resolve()).then(() => {
this._setups.push(setup);
if (this._channel) {
return pb.call(setup, null, this._channel);
// Start publishing queued messages, if there isn't already a worker doing this.
_startWorker() {
if (!this._working && this._shouldPublish()) {
this._working = true;
this._workerNumber++;
this._publishQueuedMessages(this._workerNumber);
}
});
});
}
// Remove a setup function added with `addSetup`. If there is currently connection, `teardown(channel, [cb])` will
// be run immediately, and the returned Promise will not resolve until it completes.
ChannelWrapper.prototype.removeSetup = pb.break(function(setup, teardown) {
this._setups = _.without(this._setups, setup);
return (this._settingUp || Promise.resolve()).then(() => {
if (this._channel) {
return pb.call(teardown, null, this._channel);
_publishQueuedMessages(workerNumber) {
if (!this._shouldPublish() || !this._working || workerNumber !== this._workerNumber) {
// Can't publish anything right now...
this._working = false;
return Promise.resolve();
}
});
});
// Returns a Promise which resolves when this channel next connects.
// (Mainly here for unit testing...)
ChannelWrapper.prototype.waitForConnect = pb.break(function() {
if (this._channel && !this._settingUp) {
return Promise.resolve();
} else {
return new Promise((resolve) => {
return this.once('connect', resolve);
});
}
});
const channel = this._channel;
const message = this._messages[0];
// Publish a message to the channel.
Promise.resolve().then(() => {
const encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content;
// This works just like amqplib's `publish()`, except if the channel is not connected, this will wait until the
// channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent.
// The returned promise will be rejected if `close()` is called on this channel before it can be sent, if
// `options.json` is set and the message can't be encoded, or if the broker rejects the message for some reason.
const sendPromise = (() => {
switch (message.type) {
case 'publish':
return new Promise(function (resolve, reject) {
const result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, err => {
if (err) {
reject(err);
} else {
setImmediate(() => resolve(result));
}
});
});
case 'sendToQueue':
return new Promise(function (resolve, reject) {
const result = channel.sendToQueue(message.queue, encodedMessage, message.options, err => {
if (err) {
reject(err);
} else {
setImmediate(() => resolve(result));
}
});
});
ChannelWrapper.prototype.publish = pb.break(function(exchange, routingKey, content, options) {
return new Promise((resolve, reject) => {
this._messages.push({
type: 'publish',
exchange,
routingKey,
content,
options,
resolve,
reject
});
return this._startWorker();
});
});
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${message.type}`);
}
})();
// Send a message to a queue.
return sendPromise;
}).then(result => {
this._messages.shift();
message.resolve(result);
// This works just like amqplib's `sendToQueue`, except if the channel is not connected, this will wait until the
// channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent.
// The returned promise will be rejected only if `close()` is called on this channel before it can be sent.
// Send some more!
this._publishQueuedMessages(workerNumber);
}, err => {
if (!this._channel) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we
// reconnect.
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the
// broker rejected the message. Either way, reject it back
this._messages.shift();
message.reject(err);
// `message` here should be a JSON-able object.
ChannelWrapper.prototype.sendToQueue = pb.break(function(queue, content, options) {
return new Promise((resolve, reject) => {
this._messages.push({
type: 'sendToQueue',
queue,
content,
options,
resolve,
reject
// Send some more!
this._publishQueuedMessages(workerNumber);
}
}).catch( /* istanbul ignore next */err => {
this.emit('error', err);
this._working = false;
});
return this._startWorker();
});
});
return ChannelWrapper;
return null;
}
}).call(this);
// Send an `ack` to the underlying channel.
ack() {
return this._channel && this._channel.ack.apply(this._channel, arguments);
}
module.exports = ChannelWrapper;
}).call(this);
// Send a `nack` to the underlying channel.
nack() {
return this._channel && this._channel.nack.apply(this._channel, arguments);
}
}
exports.default = ChannelWrapper;
//# sourceMappingURL=ChannelWrapper.js.map

@@ -1,13 +0,12 @@

// Generated by CoffeeScript 2.2.4
(function() {
var Promise, ref;
"use strict";
Promise = (ref = global.Promise) != null ? ref : require('es6-promise').Promise;
exports.wait = function(timeInMs) {
return new Promise(function(resolve, reject) {
return setTimeout(resolve, timeInMs);
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.wait = wait;
function wait(timeInMs) {
return new Promise(function (resolve) {
return setTimeout(resolve, timeInMs);
});
};
}).call(this);
}
//# sourceMappingURL=helpers.js.map

@@ -1,11 +0,17 @@

// Generated by CoffeeScript 2.2.4
(function() {
var AmqpConnectionManager;
'use strict';
AmqpConnectionManager = require('./AmqpConnectionManager');
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.connect = connect;
exports.connect = function(urls, options) {
return new AmqpConnectionManager(urls, options);
};
var _AmqpConnectionManager = require('./AmqpConnectionManager');
}).call(this);
var _AmqpConnectionManager2 = _interopRequireDefault(_AmqpConnectionManager);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function connect(urls, options) {
return new _AmqpConnectionManager2.default(urls, options);
}
//# sourceMappingURL=index.js.map
{
"name": "amqp-connection-manager",
"version": "1.4.2",
"version": "2.0.0",
"description": "Auto-reconnect and round robin support for amqplib.",
"main": "lib/index.js",
"dependencies": {
"es6-promise": "^4.1.1",
"lodash": "^4.15.0",
"promise-breaker": "^4.1.2",
"when": "^3.7.3"
"promise-breaker": "^4.1.2"
},

@@ -16,22 +13,50 @@ "peerDependencies": {

"devDependencies": {
"@semantic-release/changelog": "^2.0.2",
"@semantic-release/git": "^4.0.3",
"amqplib": "^0.5.1",
"babel-cli": "^6.26.0",
"babel-core": "^6.26.3",
"babel-env": "^2.4.1",
"babel-plugin-istanbul": "^4.1.6",
"babel-register": "^6.26.0",
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
"chai-string": "^1.1.2",
"coffee-coverage": "^3.0.0",
"coffeescript": "^2.2.4",
"commitlint": "^6.2.0",
"coveralls": "^3.0.0",
"eslint": "^4.19.1",
"eslint-config-benbria": "^3.0.2",
"eslint-plugin-import": "^2.11.0",
"eslint-plugin-promise": "^3.7.0",
"greenkeeper-lockfile": "^1.14.0",
"husky": "^1.0.0-rc.2",
"istanbul": "^0.4.0",
"mocha": "^5.1.1",
"nyc": "^11.7.1",
"promise-tools": "^1.1.0",
"proxyquire": "^2.0.1",
"semantic-release": "^15.2.0",
"sinon": "^5.0.1"
},
"engines": {
"node": ">=6.0.0",
"npm": ">5.0.0"
},
"scripts": {
"prepublish": "npm run build && npm test",
"build": "coffee -c -o lib src",
"prepare": "npm run build",
"prepublishOnly": "npm test",
"build": "babel -s -d lib src",
"clean": "rm -rf lib coverage",
"distclean": "npm run clean && rm -rf node_modules",
"test": "mocha \"test/**/*.{js,coffee}\" && istanbul report text-summary lcov"
"test": "npm run test:lint && npm run test:unittest",
"test:unittest": "NODE_ENV=test nyc mocha test",
"test:lint": "eslint src test",
"precommit:unittest": "NODE_ENV=test nyc mocha test --reporter progress",
"semantic-release": "semantic-release"
},
"husky": {
"hooks": {
"commit-msg": "commitlint -e $GIT_PARAMS",
"pre-commit": "npm run test:lint && npm run precommit:unittest"
}
},
"repository": {

@@ -38,0 +63,0 @@ "type": "git",

[![Build Status](https://travis-ci.org/benbria/node-amqp-connection-manager.svg?branch=master)](https://travis-ci.org/benbria/node-amqp-connection-manager)
[![Coverage Status](https://coveralls.io/repos/benbria/node-amqp-connection-manager/badge.svg?branch=master&service=github)](https://coveralls.io/github/benbria/node-amqp-connection-manager?branch=master)
[![Greenkeeper badge](https://badges.greenkeeper.io/benbria/node-amqp-connection-manager.svg)](https://greenkeeper.io/)
[![semantic-release](https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg)](https://github.com/semantic-release/semantic-release)

@@ -5,0 +6,0 @@ [![Dependency Status](https://david-dm.org/benbria/node-amqp-connection-manager.svg)](https://david-dm.org/benbria/node-amqp-connection-manager)

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc