Socket
Socket
Sign inDemoInstall

amqp-connection-manager

Package Overview
Dependencies
Maintainers
9
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-connection-manager - npm Package Compare versions

Comparing version 2.3.0 to 2.3.1

.mocharc.js

7

CHANGELOG.md

@@ -0,1 +1,8 @@

## [2.3.1](https://github.com/benbria/node-amqp-connection-manager/compare/v2.3.0...v2.3.1) (2019-04-01)
### Bug Fixes
* prevent too many connection attempts on error ([2760ce5](https://github.com/benbria/node-amqp-connection-manager/commit/2760ce5)), closes [#77](https://github.com/benbria/node-amqp-connection-manager/issues/77)
# [2.3.0](https://github.com/benbria/node-amqp-connection-manager/compare/v2.2.0...v2.3.0) (2018-11-20)

@@ -2,0 +9,0 @@

306

lib/AmqpConnectionManager.js

@@ -1,27 +0,20 @@

'use strict';
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
value: true
});
exports.default = void 0;
var _events = require('events');
var _events = require("events");
var _amqplib = require('amqplib');
var _amqplib = _interopRequireDefault(require("amqplib"));
var _amqplib2 = _interopRequireDefault(_amqplib);
var _url = _interopRequireDefault(require("url"));
var _url = require('url');
var _ChannelWrapper = _interopRequireDefault(require("./ChannelWrapper"));
var _url2 = _interopRequireDefault(_url);
var _helpers = require("./helpers");
var _ChannelWrapper = require('./ChannelWrapper');
var _promiseBreaker = _interopRequireDefault(require("promise-breaker"));
var _ChannelWrapper2 = _interopRequireDefault(_ChannelWrapper);
var _helpers = require('./helpers');
var _promiseBreaker = require('promise-breaker');
var _promiseBreaker2 = _interopRequireDefault(_promiseBreaker);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

@@ -31,11 +24,9 @@

const HEARTBEAT_IN_SECONDS = 5;
/* istanbul ignore next */
/* istanbul ignore next */
function neverThrows() {
return err => setImmediate(() => {
throw new Error(`AmqpConnectionManager - should never get here: ${err.message}\n` + err.stack);
});
}
//
return err => setImmediate(() => {
throw new Error(`AmqpConnectionManager - should never get here: ${err.message}\n` + err.stack);
});
} //
// Events:

@@ -45,165 +36,170 @@ // * `connect({connection, url})` - Emitted whenever we connect to a broker.

//
class AmqpConnectionManager extends _events.EventEmitter {
/**
* Create a new AmqplibConnectionManager.
*
* @param {(string|Object)[]} urls - An array of brokers to connect to.
* Takes url strings or objects {url: string, connectionOptions?: object}
* If present, a broker's [connectionOptions] will be used instead
* of [options.connectionOptions] when passed to the amqplib connect method.
* AmqplibConnectionManager will round-robin between them whenever it
* needs to create a new connection.
* @param {Object} [options={}] -
* @param {number} [options.heartbeatIntervalInSeconds=5] - The interval,
* in seconds, to send heartbeats.
* @param {number} [options.reconnectTimeInSeconds] - The time to wait
* before trying to reconnect. If not specified, defaults to
* `heartbeatIntervalInSeconds`.
* @param {Object} [options.connectionOptions] - Passed to the amqplib
* connect method.
* @param {function} [options.findServers] - A `fn(callback)` or a `fn()`
* which returns a Promise. This should resolve to one or more servers
* to connect to, either a single URL or an array of URLs. This is handy
* when you're using a service discovery mechanism such as Consul or etcd.
* Note that if this is supplied, then `urls` is ignored.
*/
constructor(urls, options = {}) {
super();
if (!urls && !options.findServers) {
throw new Error("Must supply either `urls` or `findServers`");
}
this._channels = [];
/**
* Create a new AmqplibConnectionManager.
*
* @param {(string|Object)[]} urls - An array of brokers to connect to.
* Takes url strings or objects {url: string, connectionOptions?: object}
* If present, a broker's [connectionOptions] will be used instead
* of [options.connectionOptions] when passed to the amqplib connect method.
* AmqplibConnectionManager will round-robin between them whenever it
* needs to create a new connection.
* @param {Object} [options={}] -
* @param {number} [options.heartbeatIntervalInSeconds=5] - The interval,
* in seconds, to send heartbeats.
* @param {number} [options.reconnectTimeInSeconds] - The time to wait
* before trying to reconnect. If not specified, defaults to
* `heartbeatIntervalInSeconds`.
* @param {Object} [options.connectionOptions] - Passed to the amqplib
* connect method.
* @param {function} [options.findServers] - A `fn(callback)` or a `fn()`
* which returns a Promise. This should resolve to one or more servers
* to connect to, either a single URL or an array of URLs. This is handy
* when you're using a service discovery mechanism such as Consul or etcd.
* Note that if this is supplied, then `urls` is ignored.
*/
constructor(urls, options = {}) {
super();
this._currentUrl = 0;
this.connectionOptions = options.connectionOptions;
if (!urls && !options.findServers) {
throw new Error("Must supply either `urls` or `findServers`");
}
this.heartbeatIntervalInSeconds = options.heartbeatIntervalInSeconds || HEARTBEAT_IN_SECONDS;
this.reconnectTimeInSeconds = options.reconnectTimeInSeconds || this.heartbeatIntervalInSeconds;
this._channels = [];
this._currentUrl = 0;
this.connectionOptions = options.connectionOptions;
this.heartbeatIntervalInSeconds = options.heartbeatIntervalInSeconds || HEARTBEAT_IN_SECONDS;
this.reconnectTimeInSeconds = options.reconnectTimeInSeconds || this.heartbeatIntervalInSeconds; // There will be one listener per channel, and there could be a lot of channels, so disable warnings from node.
// There will be one listener per channel, and there could be a lot of channels, so disable warnings from node.
this.setMaxListeners(0);
this.setMaxListeners(0);
this._findServers = options.findServers || (() => Promise.resolve(urls));
this._findServers = options.findServers || (() => Promise.resolve(urls));
this._connect();
}
this._connect();
} // `options` here are any options that can be passed to ChannelWrapper.
// `options` here are any options that can be passed to ChannelWrapper.
createChannel(options = {}) {
const channel = new _ChannelWrapper2.default(this, options);
this._channels.push(channel);
channel.once('close', () => {
this._channels = this._channels.filter(c => c !== channel);
});
return channel;
}
close() {
if (this._closed) {
return Promise.resolve();
}
this._closed = true;
createChannel(options = {}) {
const channel = new _ChannelWrapper.default(this, options);
return Promise.all(this._channels.map(channel => channel.close())).catch(function () {
// Ignore errors closing channels.
}).then(() => {
this._channels = [];
if (this._currentConnection) {
this._currentConnection.removeAllListeners('close');
this._currentConnection.close();
}
this._currentConnection = null;
});
}
this._channels.push(channel);
isConnected() {
return !!this._currentConnection;
channel.once('close', () => {
this._channels = this._channels.filter(c => c !== channel);
});
return channel;
}
close() {
if (this._closed) {
return Promise.resolve();
}
_connect() {
if (this._closed || this._connecting || this.isConnected()) {
return Promise.resolve();
}
this._closed = true;
return Promise.all(this._channels.map(channel => channel.close())).catch(function () {// Ignore errors closing channels.
}).then(() => {
this._channels = [];
this._connecting = true;
if (this._currentConnection) {
this._currentConnection.removeAllListeners('close');
return Promise.resolve().then(() => {
if (!this._urls || this._currentUrl >= this._urls.length) {
this._currentUrl = 0;
return _promiseBreaker2.default.callFn(this._findServers, 0, null);
} else {
return this._urls;
}
}).then(urls => {
if (urls && !Array.isArray(urls)) {
urls = [urls];
}
this._urls = urls;
this._currentConnection.close();
}
if (!urls || urls.length === 0) {
throw new Error('amqp-connection-manager: No servers found');
}
this._currentConnection = null;
});
}
// Round robin between brokers
const url = urls[this._currentUrl];
this._currentUrl++;
isConnected() {
return !!this._currentConnection;
}
// url can be a string or object {url: string, connectionOptions?: object}
const urlString = url.url || url;
const connectionOptions = url.connectionOptions || this.connectionOptions;
_connect() {
if (this._closed || this._connecting || this.isConnected()) {
return Promise.resolve();
}
const amqpUrl = _url2.default.parse(urlString);
if (amqpUrl.search) {
amqpUrl.search += `&heartbeat=${this.heartbeatIntervalInSeconds}`;
} else {
amqpUrl.search = `?heartbeat=${this.heartbeatIntervalInSeconds}`;
}
this._connecting = true;
return Promise.resolve().then(() => {
if (!this._urls || this._currentUrl >= this._urls.length) {
this._currentUrl = 0;
return _promiseBreaker.default.callFn(this._findServers, 0, null);
} else {
return this._urls;
}
}).then(urls => {
if (urls && !Array.isArray(urls)) {
urls = [urls];
}
return _amqplib2.default.connect(_url2.default.format(amqpUrl), connectionOptions).then(connection => {
this._currentConnection = connection;
this._urls = urls;
//emit 'blocked' when RabbitMQ server decides to block the connection (resources running low)
connection.on('blocked', reason => this.emit('blocked', { reason }));
if (!urls || urls.length === 0) {
throw new Error('amqp-connection-manager: No servers found');
} // Round robin between brokers
connection.on('unblocked', () => this.emit('unblocked'));
// Reconnect if the broker goes away.
connection.on('error', err => Promise.resolve().then(() => this._currentConnection.close()).catch(() => {/* Ignore */}).then(() => {
this._currentConnection = null;
this.emit('disconnect', { err });
return this._connect();
})
// `_connect()` should never throw.
.catch(neverThrows));
const url = urls[this._currentUrl];
this._currentUrl++; // url can be a string or object {url: string, connectionOptions?: object}
// Reconnect if the connection closes gracefully
connection.on('close', err => {
this._currentConnection = null;
this.emit('disconnect', { err });
const urlString = url.url || url;
const connectionOptions = url.connectionOptions || this.connectionOptions;
(0, _helpers.wait)(this.reconnectTimeInSeconds * 1000).then(() => this._connect())
// `_connect()` should never throw.
.catch(neverThrows);
});
const amqpUrl = _url.default.parse(urlString);
this._connecting = false;
this.emit('connect', { connection, url: urlString });
if (amqpUrl.search) {
amqpUrl.search += `&heartbeat=${this.heartbeatIntervalInSeconds}`;
} else {
amqpUrl.search = `?heartbeat=${this.heartbeatIntervalInSeconds}`;
}
return null;
});
}).catch(err => {
this.emit('disconnect', { err });
return _amqplib.default.connect(_url.default.format(amqpUrl), connectionOptions).then(connection => {
this._currentConnection = connection; //emit 'blocked' when RabbitMQ server decides to block the connection (resources running low)
// Connection failed...
this._currentConnection = null;
connection.on('blocked', reason => this.emit('blocked', {
reason
}));
connection.on('unblocked', () => this.emit('unblocked'));
connection.on('error', () =>
/* err */
{// if this event was emitted, then the connection was already closed,
// so no need to call #close here
// also, 'close' is emitted after 'error',
// so no need for work already done in 'close' handler
}); // Reconnect if the connection closes
// TODO: Probably want to try right away here, especially if there are multiple brokers to try...
return (0, _helpers.wait)(this.reconnectTimeInSeconds * 1000).then(() => {
this._connecting = false;
return this._connect();
});
connection.on('close', err => {
this._currentConnection = null;
this.emit('disconnect', {
err
});
(0, _helpers.wait)(this.reconnectTimeInSeconds * 1000).then(() => this._connect()) // `_connect()` should never throw.
.catch(neverThrows);
});
}
this._connecting = false;
this.emit('connect', {
connection,
url: urlString
});
return null;
});
}).catch(err => {
this.emit('disconnect', {
err
}); // Connection failed...
this._currentConnection = null; // TODO: Probably want to try right away here, especially if there are multiple brokers to try...
return (0, _helpers.wait)(this.reconnectTimeInSeconds * 1000).then(() => {
this._connecting = false;
return this._connect();
});
});
}
}
exports.default = AmqpConnectionManager;
//# sourceMappingURL=AmqpConnectionManager.js.map

@@ -1,13 +0,12 @@

'use strict';
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
value: true
});
exports.default = void 0;
var _events = require('events');
var _events = require("events");
var _promiseBreaker = require('promise-breaker');
var _promiseBreaker = _interopRequireDefault(require("promise-breaker"));
var _promiseBreaker2 = _interopRequireDefault(_promiseBreaker);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

@@ -28,361 +27,368 @@

class ChannelWrapper extends _events.EventEmitter {
/**
* Adds a new 'setup handler'.
*
* `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting
* exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib.
* The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until
* this Promise resolves.
*
* If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve
* until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will
* be emitted, since you can just handle the error here (although the `setup` will still be added for future
* reconnects, even if it throws an error.)
*
* Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error'
* event.
*
* @param {function} setup - setup function.
* @param {function} [done] - callback.
* @returns {void | Promise} - Resolves when complete.
*/
addSetup(setup, done = null) {
return _promiseBreaker2.default.addCallback(done, (this._settingUp || Promise.resolve()).then(() => {
this._setups.push(setup);
if (this._channel) {
return _promiseBreaker2.default.call(setup, this, this._channel);
} else {
return undefined;
}
}));
}
/**
* Adds a new 'setup handler'.
*
* `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting
* exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib.
* The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until
* this Promise resolves.
*
* If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve
* until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will
* be emitted, since you can just handle the error here (although the `setup` will still be added for future
* reconnects, even if it throws an error.)
*
* Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error'
* event.
*
* @param {function} setup - setup function.
* @param {function} [done] - callback.
* @returns {void | Promise} - Resolves when complete.
*/
addSetup(setup, done = null) {
return _promiseBreaker.default.addCallback(done, (this._settingUp || Promise.resolve()).then(() => {
this._setups.push(setup);
/**
* Remove a setup function added with `addSetup`. If there is currently a
* connection, `teardown(channel, [cb])` will be run immediately, and the
* returned Promise will not resolve until it completes.
*
* @param {function} setup - the setup function to remove.
* @param {function} [teardown] - `function(channel, [cb])` to run to tear
* down the chanel.
* @param {function} [done] - Optional callback.
* @returns {void | Promise} - Resolves when complete.
*/
removeSetup(setup, teardown = null, done = null) {
return _promiseBreaker2.default.addCallback(done, () => {
this._setups = this._setups.filter(s => s !== setup);
if (this._channel) {
return _promiseBreaker.default.call(setup, this, this._channel);
} else {
return undefined;
}
}));
}
/**
* Remove a setup function added with `addSetup`. If there is currently a
* connection, `teardown(channel, [cb])` will be run immediately, and the
* returned Promise will not resolve until it completes.
*
* @param {function} setup - the setup function to remove.
* @param {function} [teardown] - `function(channel, [cb])` to run to tear
* down the chanel.
* @param {function} [done] - Optional callback.
* @returns {void | Promise} - Resolves when complete.
*/
return (this._settingUp || Promise.resolve()).then(() => this._channel ? _promiseBreaker2.default.call(teardown, this, this._channel) : undefined);
});
}
/**
* Returns a Promise which resolves when this channel next connects.
* (Mainly here for unit testing...)
*
* @param {function} [done] - Optional callback.
* @returns {void | Promise} - Resolves when connected.
*/
waitForConnect(done = null) {
return _promiseBreaker2.default.addCallback(done, this._channel && !this._settingUp ? Promise.resolve() : new Promise(resolve => this.once('connect', resolve)));
}
removeSetup(setup, teardown = null, done = null) {
return _promiseBreaker.default.addCallback(done, () => {
this._setups = this._setups.filter(s => s !== setup);
return (this._settingUp || Promise.resolve()).then(() => this._channel ? _promiseBreaker.default.call(teardown, this, this._channel) : undefined);
});
}
/**
* Returns a Promise which resolves when this channel next connects.
* (Mainly here for unit testing...)
*
* @param {function} [done] - Optional callback.
* @returns {void | Promise} - Resolves when connected.
*/
/*
* Publish a message to the channel.
*
* This works just like amqplib's `publish()`, except if the channel is not
* connected, this will wait until the channel is connected. Returns a
* Promise which will only resolve when the message has been succesfully sent.
* The returned promise will be rejected if `close()` is called on this
* channel before it can be sent, if `options.json` is set and the message
* can't be encoded, or if the broker rejects the message for some reason.
*
*/
publish(exchange, routingKey, content, options, done = null) {
return _promiseBreaker2.default.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
type: 'publish',
exchange,
routingKey,
content,
options,
resolve,
reject
});
this._startWorker();
}));
}
/*
* Send a message to a queue.
*
* This works just like amqplib's `sendToQueue`, except if the channel is not connected, this will wait until the
* channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent.
* The returned promise will be rejected only if `close()` is called on this channel before it can be sent.
*
* `message` here should be a JSON-able object.
*/
sendToQueue(queue, content, options, done = null) {
return _promiseBreaker2.default.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
type: 'sendToQueue',
queue,
content,
options,
resolve,
reject
});
return this._startWorker();
}));
}
waitForConnect(done = null) {
return _promiseBreaker.default.addCallback(done, this._channel && !this._settingUp ? Promise.resolve() : new Promise(resolve => this.once('connect', resolve)));
}
/*
* Publish a message to the channel.
*
* This works just like amqplib's `publish()`, except if the channel is not
* connected, this will wait until the channel is connected. Returns a
* Promise which will only resolve when the message has been succesfully sent.
* The returned promise will be rejected if `close()` is called on this
* channel before it can be sent, if `options.json` is set and the message
* can't be encoded, or if the broker rejects the message for some reason.
*
*/
/**
* Create a new ChannelWrapper.
*
* @param {AmqpConnectionManager} connectionManager - connection manager which
* created this channel.
* @param {Object} [options] -
* @param {string} [options.name] - A name for this channel. Handy for debugging.
* @param {function} [options.setup] - A default setup function to call. See
* `addSetup` for details.
* @param {boolean} [options.json] - if true, then ChannelWrapper assumes all
* messages passed to `publish()` and `sendToQueue()` are plain JSON objects.
* These will be encoded automatically before being sent.
*
*/
constructor(connectionManager, options = {}) {
super();
this._onConnect = this._onConnect.bind(this);
this._onDisconnect = this._onDisconnect.bind(this);
this._connectionManager = connectionManager;
this.name = options.name;
this.context = {};
publish(exchange, routingKey, content, options, done = null) {
return _promiseBreaker.default.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
type: 'publish',
exchange,
routingKey,
content,
options,
resolve,
reject
});
this._json = 'json' in options ? options.json : false;
this._startWorker();
}));
}
/*
* Send a message to a queue.
*
* This works just like amqplib's `sendToQueue`, except if the channel is not connected, this will wait until the
* channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent.
* The returned promise will be rejected only if `close()` is called on this channel before it can be sent.
*
* `message` here should be a JSON-able object.
*/
// Place to store queued messages.
this._messages = [];
// True if the "worker" is busy sending messages. False if we need to
// start the worker to get stuff done.
this._working = false;
sendToQueue(queue, content, options, done = null) {
return _promiseBreaker.default.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
type: 'sendToQueue',
queue,
content,
options,
resolve,
reject
});
// If we're in the process of creating a channel, this is a Promise which
// will resolve when the channel is set up. Otherwise, this is `null`.
this._settingUp = null;
return this._startWorker();
}));
}
/**
* Create a new ChannelWrapper.
*
* @param {AmqpConnectionManager} connectionManager - connection manager which
* created this channel.
* @param {Object} [options] -
* @param {string} [options.name] - A name for this channel. Handy for debugging.
* @param {function} [options.setup] - A default setup function to call. See
* `addSetup` for details.
* @param {boolean} [options.json] - if true, then ChannelWrapper assumes all
* messages passed to `publish()` and `sendToQueue()` are plain JSON objects.
* These will be encoded automatically before being sent.
*
*/
// The currently connected channel. Note that not all setup functions
// have been run on this channel until `@_settingUp` is either null or
// resolved.
this._channel = null;
// We kill off workers when we disconnect. Whenever we start a new
// worker, we bump up the `_workerNumber` - this makes it so if stale
// workers ever do wake up, they'll know to stop working.
this._workerNumber = 0;
constructor(connectionManager, options = {}) {
super();
this._onConnect = this._onConnect.bind(this);
this._onDisconnect = this._onDisconnect.bind(this);
this._connectionManager = connectionManager;
this.name = options.name;
this.context = {};
this._json = 'json' in options ? options.json : false; // Place to store queued messages.
// Array of setup functions to call.
this._setups = [];
if (options.setup) {
this._setups.push(options.setup);
}
this._messages = []; // True if the "worker" is busy sending messages. False if we need to
// start the worker to get stuff done.
if (connectionManager.isConnected()) {
this._onConnect({
connection: this._connectionManager._currentConnection
});
}
connectionManager.on('connect', this._onConnect);
connectionManager.on('disconnect', this._onDisconnect);
}
this._working = false; // If we're in the process of creating a channel, this is a Promise which
// will resolve when the channel is set up. Otherwise, this is `null`.
// Called whenever we connect to the broker.
_onConnect({ connection }) {
this._connection = connection;
this._settingUp = null; // The currently connected channel. Note that not all setup functions
// have been run on this channel until `@_settingUp` is either null or
// resolved.
return connection.createConfirmChannel().then(channel => {
this._channel = channel;
channel.on('close', () => this._onChannelClose(channel));
this._channel = null; // We kill off workers when we disconnect. Whenever we start a new
// worker, we bump up the `_workerNumber` - this makes it so if stale
// workers ever do wake up, they'll know to stop working.
this._settingUp = Promise.all(this._setups.map(setupFn =>
// TODO: Use a timeout here to guard against setupFns that never resolve?
_promiseBreaker2.default.call(setupFn, this, channel).catch(err => {
if (this._channel) {
this.emit('error', err, { name: this.name });
} else {
// Don't emit an error if setups failed because the channel was closing.
}
}))).then(() => {
this._settingUp = null;
return this._channel;
});
this._workerNumber = 0; // Array of setup functions to call.
return this._settingUp;
}).then(() => {
if (!this._channel) {
// Can happen if channel closes while we're setting up.
return;
}
this._setups = [];
// Since we just connected, publish any queued messages
this._startWorker();
this.emit('connect');
}).catch(err => {
this.emit('error', err, { name: this.name });
this._settingUp = null;
this._channel = null;
});
if (options.setup) {
this._setups.push(options.setup);
}
// Called whenever the channel closes.
_onChannelClose(channel) {
if (this._channel === channel) {
this._channel = null;
}
if (connectionManager.isConnected()) {
this._onConnect({
connection: this._connectionManager._currentConnection
});
}
// Wait for another reconnect to create a new channel.
// Called whenever we disconnect from the AMQP server.
_onDisconnect() {
this._channel = null;
connectionManager.on('connect', this._onConnect);
connectionManager.on('disconnect', this._onDisconnect);
} // Called whenever we connect to the broker.
_onConnect({
connection
}) {
this._connection = connection;
return connection.createConfirmChannel().then(channel => {
this._channel = channel;
channel.on('close', () => this._onChannelClose(channel));
this._settingUp = Promise.all(this._setups.map(setupFn => // TODO: Use a timeout here to guard against setupFns that never resolve?
_promiseBreaker.default.call(setupFn, this, channel).catch(err => {
if (this._channel) {
this.emit('error', err, {
name: this.name
});
} else {// Don't emit an error if setups failed because the channel was closing.
}
}))).then(() => {
this._settingUp = null;
return this._channel;
});
return this._settingUp;
}).then(() => {
if (!this._channel) {
// Can happen if channel closes while we're setting up.
return;
} // Since we just connected, publish any queued messages
// Kill off the current worker. We never get any kind of error for messages in flight - see
// https://github.com/squaremo/amqp.node/issues/191.
this._working = false;
}
// Returns the number of unsent messages queued on this channel.
queueLength() {
return this._messages.length;
this._startWorker();
this.emit('connect');
}).catch(err => {
this.emit('error', err, {
name: this.name
});
this._settingUp = null;
this._channel = null;
});
} // Called whenever the channel closes.
_onChannelClose(channel) {
if (this._channel === channel) {
this._channel = null;
}
} // Wait for another reconnect to create a new channel.
// Called whenever we disconnect from the AMQP server.
// Destroy this channel.
//
// Any unsent messages will have their associated Promises rejected.
//
close() {
return Promise.resolve().then(() => {
this._working = false;
if (this._messages.length !== 0) {
// Reject any unsent messages.
this._messages.forEach(message => message.reject(new Error('Channel closed')));
}
this._connectionManager.removeListener('connect', this._onConnect);
this._connectionManager.removeListener('disconnect', this._onDisconnect);
const answer = this._channel && this._channel.close() || undefined;
this._channel = null;
_onDisconnect() {
this._channel = null;
this._settingUp = null; // Kill off the current worker. We never get any kind of error for messages in flight - see
// https://github.com/squaremo/amqp.node/issues/191.
this.emit('close');
this._working = false;
} // Returns the number of unsent messages queued on this channel.
return answer;
});
}
_shouldPublish() {
return this._messages.length > 0 && !this._settingUp && this._channel;
queueLength() {
return this._messages.length;
} // Destroy this channel.
//
// Any unsent messages will have their associated Promises rejected.
//
close() {
return Promise.resolve().then(() => {
this._working = false;
if (this._messages.length !== 0) {
// Reject any unsent messages.
this._messages.forEach(message => message.reject(new Error('Channel closed')));
}
this._connectionManager.removeListener('connect', this._onConnect);
this._connectionManager.removeListener('disconnect', this._onDisconnect);
const answer = this._channel && this._channel.close() || undefined;
this._channel = null;
this.emit('close');
return answer;
});
}
_shouldPublish() {
return this._messages.length > 0 && !this._settingUp && this._channel;
} // Start publishing queued messages, if there isn't already a worker doing this.
_startWorker() {
if (!this._working && this._shouldPublish()) {
this._working = true;
this._workerNumber++;
this._publishQueuedMessages(this._workerNumber);
}
}
// Start publishing queued messages, if there isn't already a worker doing this.
_startWorker() {
if (!this._working && this._shouldPublish()) {
this._working = true;
this._workerNumber++;
this._publishQueuedMessages(this._workerNumber);
}
_publishQueuedMessages(workerNumber) {
if (!this._shouldPublish() || !this._working || workerNumber !== this._workerNumber) {
// Can't publish anything right now...
this._working = false;
return Promise.resolve();
}
_publishQueuedMessages(workerNumber) {
if (!this._shouldPublish() || !this._working || workerNumber !== this._workerNumber) {
// Can't publish anything right now...
this._working = false;
return Promise.resolve();
const channel = this._channel;
const message = this._messages[0];
Promise.resolve().then(() => {
const encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content;
const sendPromise = (() => {
switch (message.type) {
case 'publish':
return new Promise(function (resolve, reject) {
const result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, err => {
if (err) {
reject(err);
} else {
setImmediate(() => resolve(result));
}
});
});
case 'sendToQueue':
return new Promise(function (resolve, reject) {
const result = channel.sendToQueue(message.queue, encodedMessage, message.options, err => {
if (err) {
reject(err);
} else {
setImmediate(() => resolve(result));
}
});
});
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${message.type}`);
}
})();
const channel = this._channel;
const message = this._messages[0];
return sendPromise;
}).then(result => {
this._messages.shift();
Promise.resolve().then(() => {
const encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content;
message.resolve(result); // Send some more!
const sendPromise = (() => {
switch (message.type) {
case 'publish':
return new Promise(function (resolve, reject) {
const result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, err => {
if (err) {
reject(err);
} else {
setImmediate(() => resolve(result));
}
});
});
case 'sendToQueue':
return new Promise(function (resolve, reject) {
const result = channel.sendToQueue(message.queue, encodedMessage, message.options, err => {
if (err) {
reject(err);
} else {
setImmediate(() => resolve(result));
}
});
});
this._publishQueuedMessages(workerNumber);
}, err => {
if (!this._channel) {// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we
// reconnect.
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the
// broker rejected the message. Either way, reject it back
this._messages.shift();
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${message.type}`);
}
})();
message.reject(err); // Send some more!
return sendPromise;
}).then(result => {
this._messages.shift();
message.resolve(result);
this._publishQueuedMessages(workerNumber);
}
}).catch(
/* istanbul ignore next */
err => {
this.emit('error', err);
this._working = false;
});
return null;
} // Send an `ack` to the underlying channel.
// Send some more!
this._publishQueuedMessages(workerNumber);
}, err => {
if (!this._channel) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we
// reconnect.
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the
// broker rejected the message. Either way, reject it back
this._messages.shift();
message.reject(err);
// Send some more!
this._publishQueuedMessages(workerNumber);
}
}).catch( /* istanbul ignore next */err => {
this.emit('error', err);
this._working = false;
});
ack() {
return this._channel && this._channel.ack.apply(this._channel, arguments);
} // Send an `ackAll` to the underlying channel.
return null;
}
// Send an `ack` to the underlying channel.
ack() {
return this._channel && this._channel.ack.apply(this._channel, arguments);
}
ackAll() {
return this._channel && this._channel.ackAll.apply(this._channel, arguments);
} // Send a `nack` to the underlying channel.
// Send an `ackAll` to the underlying channel.
ackAll() {
return this._channel && this._channel.ackAll.apply(this._channel, arguments);
}
// Send a `nack` to the underlying channel.
nack() {
return this._channel && this._channel.nack.apply(this._channel, arguments);
}
nack() {
return this._channel && this._channel.nack.apply(this._channel, arguments);
} // Send a `nackAll` to the underlying channel.
// Send a `nackAll` to the underlying channel.
nackAll() {
return this._channel && this._channel.nackAll.apply(this._channel, arguments);
}
nackAll() {
return this._channel && this._channel.nackAll.apply(this._channel, arguments);
}
}
exports.default = ChannelWrapper;
//# sourceMappingURL=ChannelWrapper.js.map
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
value: true
});
exports.wait = wait;
function wait(timeInMs) {
return new Promise(function (resolve) {
return setTimeout(resolve, timeInMs);
});
return new Promise(function (resolve) {
return setTimeout(resolve, timeInMs);
});
}
//# sourceMappingURL=helpers.js.map

@@ -1,23 +0,22 @@

'use strict';
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
value: true
});
exports.connect = connect;
exports.default = void 0;
var _AmqpConnectionManager = require('./AmqpConnectionManager');
var _AmqpConnectionManager = _interopRequireDefault(require("./AmqpConnectionManager"));
var _AmqpConnectionManager2 = _interopRequireDefault(_AmqpConnectionManager);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function connect(urls, options) {
return new _AmqpConnectionManager2.default(urls, options);
return new _AmqpConnectionManager.default(urls, options);
}
const amqp = {
connect
connect
};
exports.default = amqp;
var _default = amqp;
exports.default = _default;
//# sourceMappingURL=index.js.map
{
"name": "amqp-connection-manager",
"version": "2.3.0",
"version": "2.3.1",
"description": "Auto-reconnect and round robin support for amqplib.",

@@ -13,2 +13,6 @@ "main": "lib/index.js",

"devDependencies": {
"@babel/cli": "^7.2.3",
"@babel/core": "^7.3.3",
"@babel/preset-env": "^7.3.1",
"@babel/register": "^7.0.0",
"@jwalton/semantic-release-config": "^1.0.0",

@@ -18,7 +22,3 @@ "@semantic-release/changelog": "^3.0.1",

"amqplib": "^0.5.1",
"babel-cli": "^6.26.0",
"babel-core": "^6.26.3",
"babel-plugin-istanbul": "^4.1.6",
"babel-preset-env": "^1.6.1",
"babel-register": "^6.26.0",
"babel-plugin-istanbul": "^5.1.1",
"chai": "^4.1.2",

@@ -28,3 +28,4 @@ "chai-as-promised": "^7.1.1",

"coveralls": "^3.0.0",
"eslint": "^5.0.0",
"cross-env": "^5.2.0",
"eslint": "^5.14.1",
"eslint-config-benbria": "^4.0.0",

@@ -36,8 +37,8 @@ "eslint-plugin-import": "^2.11.0",

"istanbul": "^0.4.0",
"mocha": "^5.1.1",
"nyc": "^12.0.1",
"mocha": "^6.0.2",
"nyc": "^13.3.0",
"promise-tools": "^2.0.0",
"proxyquire": "^2.0.1",
"semantic-release": "^15.9.6",
"sinon": "^6.0.1"
"semantic-release": "^15.13.0",
"sinon": "^7.2.4"
},

@@ -54,5 +55,5 @@ "engines": {

"test": "npm run test:lint && npm run test:unittest",
"test:unittest": "NODE_ENV=test nyc mocha test",
"test:unittest": "cross-env NODE_ENV=test nyc mocha",
"test:lint": "eslint src test",
"precommit:unittest": "BABEL_DISABLE_CACHE=1 NODE_ENV=test nyc mocha test --reporter progress",
"precommit:unittest": "cross-env BABEL_DISABLE_CACHE=1 cross-env NODE_ENV=test nyc mocha --reporter progress",
"semantic-release": "semantic-release"

@@ -59,0 +60,0 @@ },

@@ -58,3 +58,3 @@ [![Build Status](https://travis-ci.org/benbria/node-amqp-connection-manager.svg?branch=master)](https://travis-ci.org/benbria/node-amqp-connection-manager)

// Note that `this` here is the channelWrapper instance.
return channel.assertQueue('rxQueueName', {durable: true}),
return channel.assertQueue('rxQueueName', {durable: true});
}

@@ -61,0 +61,0 @@ });

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc