amqp-connection-manager
Advanced tools
Comparing version 1.4.2 to 2.0.0
@@ -0,1 +1,14 @@ | ||
<a name="2.0.0"></a> | ||
# [2.0.0](https://github.com/benbria/node-amqp-connection-manager/compare/v1.4.2...v2.0.0) (2018-05-05) | ||
### Code Refactoring | ||
* Rewrite all source in javascript. ([377d01d](https://github.com/benbria/node-amqp-connection-manager/commit/377d01d)) | ||
### BREAKING CHANGES | ||
* Officially dropping support for node v4.x.x. | ||
1.4.0 | ||
@@ -2,0 +15,0 @@ ----- |
@@ -1,58 +0,84 @@ | ||
// Generated by CoffeeScript 2.2.4 | ||
(function() { | ||
var AmqpConnectionManager, ChannelWrapper, EventEmitter, HEARTBEAT_IN_SECONDS, _, amqp, pb, urlUtils, wait; | ||
'use strict'; | ||
({EventEmitter} = require('events')); | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
amqp = require('amqplib'); | ||
var _events = require('events'); | ||
urlUtils = require('url'); | ||
var _amqplib = require('amqplib'); | ||
_ = require('lodash'); | ||
var _amqplib2 = _interopRequireDefault(_amqplib); | ||
ChannelWrapper = require('./ChannelWrapper'); | ||
var _url = require('url'); | ||
({wait} = require('./helpers')); | ||
var _url2 = _interopRequireDefault(_url); | ||
pb = require('promise-breaker'); | ||
var _ChannelWrapper = require('./ChannelWrapper'); | ||
// Default heartbeat time. | ||
HEARTBEAT_IN_SECONDS = 5; | ||
var _ChannelWrapper2 = _interopRequireDefault(_ChannelWrapper); | ||
// Events: | ||
// * `connect({connection, url})` - Emitted whenever we connect to a broker. | ||
// * `disconnect({err})` - Emitted whenever we disconnect from a broker. | ||
var _helpers = require('./helpers'); | ||
AmqpConnectionManager = class AmqpConnectionManager extends EventEmitter { | ||
// Create a new AmqplibConnectionManager. | ||
var _promiseBreaker = require('promise-breaker'); | ||
// * `urls` is an array of brokers to connect to. AmqplibConnectionManager will round-robin between them | ||
// whenever it needs to create a new connection. | ||
// * `options.heartbeatIntervalInSeconds` is the interval, in seconds, to send heartbeats. Defaults to 5 seconds. | ||
// * `options.reconnectTimeInSeconds` is the time to wait before trying to reconnect. If not specified, | ||
// defaults to `heartbeatIntervalInSeconds`. | ||
// * `options.connectionOptions` is passed to the amqplib connect method. | ||
// * `options.findServers(callback)` is a function which returns one or more servers to connect to. This should | ||
// return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism | ||
// such as Consul or etcd. Instead of taking a `callback`, this can also return a Promise. Note that if this | ||
// is supplied, then `urls` is ignored. | ||
var _promiseBreaker2 = _interopRequireDefault(_promiseBreaker); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
// Default heartbeat time. | ||
const HEARTBEAT_IN_SECONDS = 5; | ||
/* istanbul ignore next */ | ||
function neverThrows() { | ||
return err => setImmediate(() => { | ||
throw new Error(`AmqpConnectionManager - should never get here: ${err.message}\n` + err.stack); | ||
}); | ||
} | ||
// | ||
// Events: | ||
// * `connect({connection, url})` - Emitted whenever we connect to a broker. | ||
// * `disconnect({err})` - Emitted whenever we disconnect from a broker. | ||
// | ||
class AmqpConnectionManager extends _events.EventEmitter { | ||
/** | ||
* Create a new AmqplibConnectionManager. | ||
* | ||
* @param {string[]} urls - An array of brokers to connect to. | ||
* AmqplibConnectionManager will round-robin between them whenever it | ||
* needs to create a new connection. | ||
* @param {Object} [options={}] - | ||
* @param {number} [options.heartbeatIntervalInSeconds=5] - The interval, | ||
* in seconds, to send heartbeats. | ||
* @param {number} [options.reconnectTimeInSeconds] - The time to wait | ||
* before trying to reconnect. If not specified, defaults to | ||
* `heartbeatIntervalInSeconds`. | ||
* @param {Object} [options.connectionOptions] - Passed to the amqplib | ||
* connect method. | ||
* @param {function} [options.findServers] - A `fn(callback)` or a `fn()` | ||
* which returns a Promise. This should resolve to one or more servers | ||
* to connect to, either a single URL or an array of URLs. This is handy | ||
* when you're using a service discovery mechanism such as Consul or etcd. | ||
* Note that if this is supplied, then `urls` is ignored. | ||
*/ | ||
constructor(urls, options = {}) { | ||
var ref, ref1, ref2; | ||
super(); | ||
if (!urls && !options.findServers) { | ||
throw new Error("Must supply either `urls` or `findServers`"); | ||
} | ||
this._channels = []; | ||
this._currentUrl = 0; | ||
this.connectionOptions = options.connectionOptions; | ||
this.heartbeatIntervalInSeconds = (ref = options.heartbeatIntervalInSeconds) != null ? ref : HEARTBEAT_IN_SECONDS; | ||
this.reconnectTimeInSeconds = (ref1 = options.reconnectTimeInSeconds) != null ? ref1 : this.heartbeatIntervalInSeconds; | ||
// There will be one listener per channel, and there could be a lot of channels, so disable warnings from node. | ||
this.setMaxListeners(0); | ||
this._findServers = (ref2 = options.findServers) != null ? ref2 : (function() { | ||
return Promise.resolve(urls); | ||
}); | ||
this._connect(); | ||
super(); | ||
if (!urls && !options.findServers) { | ||
throw new Error("Must supply either `urls` or `findServers`"); | ||
} | ||
this._channels = []; | ||
this._currentUrl = 0; | ||
this.connectionOptions = options.connectionOptions; | ||
this.heartbeatIntervalInSeconds = options.heartbeatIntervalInSeconds || HEARTBEAT_IN_SECONDS; | ||
this.reconnectTimeInSeconds = options.reconnectTimeInSeconds || this.heartbeatIntervalInSeconds; | ||
// There will be one listener per channel, and there could be a lot of channels, so disable warnings from node. | ||
this.setMaxListeners(0); | ||
this._findServers = options.findServers || (() => Promise.resolve(urls)); | ||
this._connect(); | ||
} | ||
@@ -62,114 +88,113 @@ | ||
createChannel(options = {}) { | ||
var channel; | ||
channel = new ChannelWrapper(this, options); | ||
this._channels.push(channel); | ||
channel.once('close', () => { | ||
return this._channels = _.without(this._channels, channel); | ||
}); | ||
return channel; | ||
const channel = new _ChannelWrapper2.default(this, options); | ||
this._channels.push(channel); | ||
channel.once('close', () => { | ||
this._channels = this._channels.filter(c => c !== channel); | ||
}); | ||
return channel; | ||
} | ||
close() { | ||
if (this._closed) { | ||
return; | ||
} | ||
this._closed = true; | ||
return Promise.all(this._channels.map(function(channel) { | ||
return channel.close(); | ||
// Ignore errors closing channels. | ||
})).catch(function() {}).then(() => { | ||
var ref; | ||
this._channels = []; | ||
if ((ref = this._currentConnection) != null) { | ||
ref.close(); | ||
if (this._closed) { | ||
return Promise.resolve(); | ||
} | ||
return this._currentConnection = null; | ||
}); | ||
this._closed = true; | ||
return Promise.all(this._channels.map(channel => channel.close())).catch(function () { | ||
// Ignore errors closing channels. | ||
}).then(() => { | ||
this._channels = []; | ||
if (this._currentConnection) { | ||
this._currentConnection.close(); | ||
} | ||
this._currentConnection = null; | ||
}); | ||
} | ||
isConnected() { | ||
return this._currentConnection != null; | ||
return !!this._currentConnection; | ||
} | ||
_connect() { | ||
if (this._closed || this._connecting || this.isConnected()) { | ||
return Promise.resolve(); | ||
} | ||
this._connecting = true; | ||
return Promise.resolve().then(() => { | ||
if (!this._urls || (this._currentUrl >= this._urls.length)) { | ||
this._currentUrl = 0; | ||
return pb.callFn(this._findServers, 0, null); | ||
} else { | ||
return this._urls; | ||
if (this._closed || this._connecting || this.isConnected()) { | ||
return Promise.resolve(); | ||
} | ||
}).then((urls) => { | ||
var amqpUrl, url; | ||
if ((urls != null) && !_.isArray(urls)) { | ||
urls = [urls]; | ||
} | ||
this._urls = urls; | ||
if (!urls || urls.length === 0) { | ||
throw new Error('amqp-connection-manager: No servers found'); | ||
} | ||
// Round robin between brokers | ||
url = urls[this._currentUrl]; | ||
this._currentUrl++; | ||
amqpUrl = urlUtils.parse(url); | ||
if (amqpUrl.search != null) { | ||
amqpUrl.search += `&heartbeat=${this.heartbeatIntervalInSeconds}`; | ||
} else { | ||
amqpUrl.search = `?heartbeat=${this.heartbeatIntervalInSeconds}`; | ||
} | ||
return amqp.connect(urlUtils.format(amqpUrl), this.connectionOptions).then((connection) => { | ||
this._currentConnection = connection; | ||
//emit 'blocked' when RabbitMQ server decides to block the connection (resources running low) | ||
connection.on('blocked', (reason) => { | ||
return this.emit('blocked', {reason}); | ||
}); | ||
connection.on('unblocked', () => { | ||
return this.emit('unblocked'); | ||
}); | ||
// Reconnect if the broker goes away. | ||
connection.on('error', (err) => { | ||
return Promise.resolve().then(function() { | ||
return this._currentConnection.close(); | ||
}).catch(function(err) {}).then(() => { // Ignore | ||
this._currentConnection = null; | ||
this.emit('disconnect', {err}); | ||
return this._connect(); | ||
}).catch(function(err) { | ||
/* !pragma coverage-skip-block */ | ||
// `_connect()` should never throw. | ||
return console.error("amqp-connection-manager: AmqpConnectionManager:_connect()" + " - How did you get here?", err.stack); | ||
this._connecting = true; | ||
return Promise.resolve().then(() => { | ||
if (!this._urls || this._currentUrl >= this._urls.length) { | ||
this._currentUrl = 0; | ||
return _promiseBreaker2.default.callFn(this._findServers, 0, null); | ||
} else { | ||
return this._urls; | ||
} | ||
}).then(urls => { | ||
if (urls && !Array.isArray(urls)) { | ||
urls = [urls]; | ||
} | ||
this._urls = urls; | ||
if (!urls || urls.length === 0) { | ||
throw new Error('amqp-connection-manager: No servers found'); | ||
} | ||
// Round robin between brokers | ||
const url = urls[this._currentUrl]; | ||
this._currentUrl++; | ||
const amqpUrl = _url2.default.parse(url); | ||
if (amqpUrl.search) { | ||
amqpUrl.search += `&heartbeat=${this.heartbeatIntervalInSeconds}`; | ||
} else { | ||
amqpUrl.search = `?heartbeat=${this.heartbeatIntervalInSeconds}`; | ||
} | ||
return _amqplib2.default.connect(_url2.default.format(amqpUrl), this.connectionOptions).then(connection => { | ||
this._currentConnection = connection; | ||
//emit 'blocked' when RabbitMQ server decides to block the connection (resources running low) | ||
connection.on('blocked', reason => this.emit('blocked', { reason })); | ||
connection.on('unblocked', () => this.emit('unblocked')); | ||
// Reconnect if the broker goes away. | ||
connection.on('error', err => Promise.resolve().then(() => this._currentConnection.close()).catch(() => {/* Ignore */}).then(() => { | ||
this._currentConnection = null; | ||
this.emit('disconnect', { err }); | ||
return this._connect(); | ||
}) | ||
// `_connect()` should never throw. | ||
.catch(neverThrows)); | ||
// Reconnect if the connection closes gracefully | ||
connection.on('close', err => { | ||
this._currentConnection = null; | ||
this.emit('disconnect', { err }); | ||
(0, _helpers.wait)(this.reconnectTimeInSeconds * 1000).then(() => this._connect()) | ||
// `_connect()` should never throw. | ||
.catch(neverThrows); | ||
}); | ||
this._connecting = false; | ||
this.emit('connect', { connection, url }); | ||
return null; | ||
}); | ||
}); | ||
// Reconnect if the connection closes gracefully | ||
connection.on('close', (err) => { | ||
}).catch(err => { | ||
this.emit('disconnect', { err }); | ||
// Connection failed... | ||
this._currentConnection = null; | ||
this.emit('disconnect', {err}); | ||
return wait(this.reconnectTimeInSeconds * 1000).then(() => { | ||
return this._connect(); | ||
// TODO: Probably want to try right away here, especially if there are multiple brokers to try... | ||
return (0, _helpers.wait)(this.reconnectTimeInSeconds * 1000).then(() => { | ||
this._connecting = false; | ||
return this._connect(); | ||
}); | ||
}); | ||
this._connecting = false; | ||
this.emit('connect', {connection, url}); | ||
return null; | ||
}); | ||
}).catch((err) => { | ||
this.emit('disconnect', {err}); | ||
// Connection failed... | ||
this._currentConnection = null; | ||
// TODO: Probably want to try right away here, especially if there are multiple brokers to try... | ||
return wait(this.reconnectTimeInSeconds * 1000).then(() => { | ||
this._connecting = false; | ||
return this._connect(); | ||
}); | ||
}); | ||
} | ||
}; | ||
module.exports = AmqpConnectionManager; | ||
}).call(this); | ||
} | ||
exports.default = AmqpConnectionManager; | ||
//# sourceMappingURL=AmqpConnectionManager.js.map |
@@ -1,352 +0,375 @@ | ||
// Generated by CoffeeScript 2.2.4 | ||
(function() { | ||
var ChannelWrapper, EventEmitter, Promise, _, pb, ref, | ||
boundMethodCheck = function(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new Error('Bound instance method accessed before binding'); } }; | ||
'use strict'; | ||
Promise = (ref = global.Promise) != null ? ref : require('es6-promise').Promise; | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
({EventEmitter} = require('events')); | ||
var _events = require('events'); | ||
_ = require('lodash'); | ||
var _promiseBreaker = require('promise-breaker'); | ||
pb = require('promise-breaker'); | ||
var _promiseBreaker2 = _interopRequireDefault(_promiseBreaker); | ||
ChannelWrapper = (function() { | ||
// Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and | ||
// are guaranteed to be delivered. If the underlying connection drops, ChannelWrapper will wait for a new | ||
// connection and continue. | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
// Events: | ||
// * `connect` - emitted every time this channel connects or reconnects. | ||
// * `error(err, {name})` - emitted if an error occurs setting up the channel. | ||
// * `drop({message, err})` - called when a JSON message was dropped because it could not be encoded. | ||
// * `close` - emitted when this channel closes via a call to `close()` | ||
/** | ||
* Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and | ||
* are guaranteed to be delivered. If the underlying connection drops, ChannelWrapper will wait for a new | ||
* connection and continue. | ||
* | ||
* Events: | ||
* * `connect` - emitted every time this channel connects or reconnects. | ||
* * `error(err, {name})` - emitted if an error occurs setting up the channel. | ||
* * `drop({message, err})` - called when a JSON message was dropped because it could not be encoded. | ||
* * `close` - emitted when this channel closes via a call to `close()` | ||
* | ||
*/ | ||
class ChannelWrapper extends _events.EventEmitter { | ||
/** | ||
* Adds a new 'setup handler'. | ||
* | ||
* `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting | ||
* exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib. | ||
* The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until | ||
* this Promise resolves. | ||
* | ||
* If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve | ||
* until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will | ||
* be emitted, since you can just handle the error here (although the `setup` will still be added for future | ||
* reconnects, even if it throws an error.) | ||
* | ||
* Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error' | ||
* event. | ||
* | ||
* @param {function} setup - setup function. | ||
* @param {function} [done] - callback. | ||
* @returns {void | Promise} - Resolves when complete. | ||
*/ | ||
addSetup(setup, done = null) { | ||
return _promiseBreaker2.default.addCallback(done, (this._settingUp || Promise.resolve()).then(() => { | ||
this._setups.push(setup); | ||
if (this._channel) { | ||
return _promiseBreaker2.default.call(setup, null, this._channel); | ||
} else { | ||
return undefined; | ||
} | ||
})); | ||
} | ||
class ChannelWrapper extends EventEmitter { | ||
// Create a new ChannelWrapper. | ||
/** | ||
* Remove a setup function added with `addSetup`. If there is currently a | ||
* connection, `teardown(channel, [cb])` will be run immediately, and the | ||
* returned Promise will not resolve until it completes. | ||
* | ||
* @param {function} setup - the setup function to remove. | ||
* @param {function} [teardown] - `function(channel, [cb])` to run to tear | ||
* down the chanel. | ||
* @param {function} [done] - Optional callback. | ||
* @returns {void | Promise} - Resolves when complete. | ||
*/ | ||
removeSetup(setup, teardown = null, done = null) { | ||
return _promiseBreaker2.default.addCallback(done, () => { | ||
this._setups = this._setups.filter(s => s !== setup); | ||
// * `options.name` is a name for this channel. Handy for debugging. | ||
// * `options.setup` is a default setup function to call. See `addSetup` for details. | ||
// * `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()` | ||
// are plain JSON objects. These will be encoded automatically before being sent. | ||
return (this._settingUp || Promise.resolve()).then(() => this._channel ? _promiseBreaker2.default.call(teardown, null, this._channel) : undefined); | ||
}); | ||
} | ||
constructor(connectionManager, options = {}) { | ||
var ref1; | ||
/** | ||
* Returns a Promise which resolves when this channel next connects. | ||
* (Mainly here for unit testing...) | ||
* | ||
* @param {function} [done] - Optional callback. | ||
* @returns {void | Promise} - Resolves when connected. | ||
*/ | ||
waitForConnect(done = null) { | ||
return _promiseBreaker2.default.addCallback(done, this._channel && !this._settingUp ? Promise.resolve() : new Promise(resolve => this.once('connect', resolve))); | ||
} | ||
/* | ||
* Publish a message to the channel. | ||
* | ||
* This works just like amqplib's `publish()`, except if the channel is not | ||
* connected, this will wait until the channel is connected. Returns a | ||
* Promise which will only resolve when the message has been succesfully sent. | ||
* The returned promise will be rejected if `close()` is called on this | ||
* channel before it can be sent, if `options.json` is set and the message | ||
* can't be encoded, or if the broker rejects the message for some reason. | ||
* | ||
*/ | ||
publish(exchange, routingKey, content, options, done = null) { | ||
return _promiseBreaker2.default.addCallback(done, new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
type: 'publish', | ||
exchange, | ||
routingKey, | ||
content, | ||
options, | ||
resolve, | ||
reject | ||
}); | ||
this._startWorker(); | ||
})); | ||
} | ||
/* | ||
* Send a message to a queue. | ||
* | ||
* This works just like amqplib's `sendToQueue`, except if the channel is not connected, this will wait until the | ||
* channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent. | ||
* The returned promise will be rejected only if `close()` is called on this channel before it can be sent. | ||
* | ||
* `message` here should be a JSON-able object. | ||
*/ | ||
sendToQueue(queue, content, options, done = null) { | ||
return _promiseBreaker2.default.addCallback(done, new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
type: 'sendToQueue', | ||
queue, | ||
content, | ||
options, | ||
resolve, | ||
reject | ||
}); | ||
return this._startWorker(); | ||
})); | ||
} | ||
/** | ||
* Create a new ChannelWrapper. | ||
* | ||
* @param {AmqpConnectionManager} connectionManager - connection manager which | ||
* created this channel. | ||
* @param {Object} [options] - | ||
* @param {string} [options.name] - A name for this channel. Handy for debugging. | ||
* @param {function} [options.setup] - A default setup function to call. See | ||
* `addSetup` for details. | ||
* @param {boolean} [options.json] - if true, then ChannelWrapper assumes all | ||
* messages passed to `publish()` and `sendToQueue()` are plain JSON objects. | ||
* These will be encoded automatically before being sent. | ||
* | ||
*/ | ||
constructor(connectionManager, options = {}) { | ||
super(); | ||
// Called whenever we connect to the broker. | ||
this._onConnect = this._onConnect.bind(this); | ||
// Wait for another reconnect to create a new channel. | ||
// Called whenever we disconnect from the AMQP server. | ||
this._onDisconnect = this._onDisconnect.bind(this); | ||
this._connectionManager = connectionManager; | ||
this.name = options.name; | ||
this._json = (ref1 = options.json) != null ? ref1 : false; | ||
this._json = 'json' in options ? options.json : false; | ||
// Place to store queued messages. | ||
this._messages = []; | ||
// True if the "worker" is busy sending messages. False if we need to start the worker to get stuff done. | ||
// True if the "worker" is busy sending messages. False if we need to | ||
// start the worker to get stuff done. | ||
this._working = false; | ||
// If we're in the process of creating a channel, this is a Promise which will resolve when the channel is | ||
// set up. Otherwise, this is `null`. | ||
// If we're in the process of creating a channel, this is a Promise which | ||
// will resolve when the channel is set up. Otherwise, this is `null`. | ||
this._settingUp = null; | ||
// The currently connected channel. Note that not all setup functions have been run on this channel until | ||
// `@_settingUp` is either null or resolved. | ||
// The currently connected channel. Note that not all setup functions | ||
// have been run on this channel until `@_settingUp` is either null or | ||
// resolved. | ||
this._channel = null; | ||
// We kill off workers when we disconnect. Whenever we start a new worker, we bump up the `_workerNumber` - | ||
// this makes it so if stale workers ever do wake up, they'll know to stop working. | ||
// We kill off workers when we disconnect. Whenever we start a new | ||
// worker, we bump up the `_workerNumber` - this makes it so if stale | ||
// workers ever do wake up, they'll know to stop working. | ||
this._workerNumber = 0; | ||
// Array of setup functions to call. | ||
this._setups = []; | ||
if (options.setup != null) { | ||
this._setups.push(options.setup); | ||
if (options.setup) { | ||
this._setups.push(options.setup); | ||
} | ||
if (connectionManager.isConnected()) { | ||
this._onConnect({ | ||
connection: this._connectionManager._currentConnection | ||
}); | ||
this._onConnect({ | ||
connection: this._connectionManager._currentConnection | ||
}); | ||
} | ||
connectionManager.on('connect', this._onConnect); | ||
connectionManager.on('disconnect', this._onDisconnect); | ||
} | ||
} | ||
_onConnect({connection}) { | ||
boundMethodCheck(this, ChannelWrapper); | ||
// Called whenever we connect to the broker. | ||
_onConnect({ connection }) { | ||
this._connection = connection; | ||
return connection.createConfirmChannel().then((channel) => { | ||
this._channel = channel; | ||
channel.on('close', () => { | ||
return this._onChannelClose(channel); | ||
}); | ||
return this._settingUp = Promise.all(this._setups.map((setupFn) => { | ||
return connection.createConfirmChannel().then(channel => { | ||
this._channel = channel; | ||
channel.on('close', () => this._onChannelClose(channel)); | ||
this._settingUp = Promise.all(this._setups.map(setupFn => | ||
// TODO: Use a timeout here to guard against setupFns that never resolve? | ||
return pb.call(setupFn, null, channel).catch((err) => { | ||
if (this._channel) { | ||
return this.emit('error', err, { | ||
name: this.name | ||
}); | ||
} else { | ||
_promiseBreaker2.default.call(setupFn, null, channel).catch(err => { | ||
if (this._channel) { | ||
this.emit('error', err, { name: this.name }); | ||
} else { | ||
// Don't emit an error if setups failed because the channel was closing. | ||
} | ||
}))).then(() => { | ||
this._settingUp = null; | ||
return this._channel; | ||
}); | ||
} | ||
}); | ||
// Don't emit an error if setups failed because the channel was closing. | ||
})).then(() => { | ||
return this._settingUp; | ||
}).then(() => { | ||
if (!this._channel) { | ||
// Can happen if channel closes while we're setting up. | ||
return; | ||
} | ||
// Since we just connected, publish any queued messages | ||
this._startWorker(); | ||
this.emit('connect'); | ||
}).catch(err => { | ||
this.emit('error', err, { name: this.name }); | ||
this._settingUp = null; | ||
return this._channel; | ||
}); | ||
}).then(() => { | ||
if (this._channel == null) { // Can happen if channel closes while we're setting up. | ||
return; | ||
} | ||
// Since we just connected, publish any queued messages | ||
this._startWorker(); | ||
return this.emit('connect'); | ||
}).catch((err) => { | ||
this.emit('error', err, { | ||
name: this.name | ||
}); | ||
this._settingUp = null; | ||
return this._channel = null; | ||
this._channel = null; | ||
}); | ||
} | ||
} | ||
// Called whenever the channel closes. | ||
_onChannelClose(channel) { | ||
// Called whenever the channel closes. | ||
_onChannelClose(channel) { | ||
if (this._channel === channel) { | ||
return this._channel = null; | ||
this._channel = null; | ||
} | ||
} | ||
} | ||
// Wait for another reconnect to create a new channel. | ||
_onDisconnect() { | ||
boundMethodCheck(this, ChannelWrapper); | ||
// Called whenever we disconnect from the AMQP server. | ||
_onDisconnect() { | ||
this._channel = null; | ||
this._settingUp = null; | ||
// Kill off the current worker. We never get any kind of error for messages in flight - see | ||
// https://github.com/squaremo/amqp.node/issues/191. | ||
return this._working = false; | ||
} | ||
this._working = false; | ||
} | ||
// Returns the number of unsent messages queued on this channel. | ||
queueLength() { | ||
// Returns the number of unsent messages queued on this channel. | ||
queueLength() { | ||
return this._messages.length; | ||
} | ||
} | ||
// Destroy this channel. | ||
// Any unsent messages will have their associated Promises rejected. | ||
close() { | ||
// Destroy this channel. | ||
// | ||
// Any unsent messages will have their associated Promises rejected. | ||
// | ||
close() { | ||
return Promise.resolve().then(() => { | ||
var answer, ref1, ref2; | ||
this._working = false; | ||
if (this._messages.length !== 0) { | ||
// Reject any unsent messages. | ||
this._messages.forEach(function(message) { | ||
return message.reject(new Error('Channel closed')); | ||
}); | ||
} | ||
this._connectionManager.removeListener('connect', this._onConnect); | ||
this._connectionManager.removeListener('disconnect', this._onDisconnect); | ||
answer = (ref1 = (ref2 = this._channel) != null ? ref2.close() : void 0) != null ? ref1 : Promise.resolve(); | ||
this._channel = null; | ||
this.emit('close'); | ||
return answer; | ||
}); | ||
} | ||
this._working = false; | ||
if (this._messages.length !== 0) { | ||
// Reject any unsent messages. | ||
this._messages.forEach(message => message.reject(new Error('Channel closed'))); | ||
} | ||
_shouldPublish() { | ||
return (this._messages.length > 0) && !this._settingUp && this._channel; | ||
} | ||
this._connectionManager.removeListener('connect', this._onConnect); | ||
this._connectionManager.removeListener('disconnect', this._onDisconnect); | ||
const answer = this._channel && this._channel.close() || undefined; | ||
this._channel = null; | ||
// Start publishing queued messages, if there isn't already a worker doing this. | ||
_startWorker() { | ||
if (!this._working && this._shouldPublish()) { | ||
this._working = true; | ||
this._workerNumber++; | ||
return this._publishQueuedMessages(this._workerNumber); | ||
} | ||
} | ||
this.emit('close'); | ||
_publishQueuedMessages(workerNumber) { | ||
var channel, message; | ||
if (!this._shouldPublish() || !this._working || (workerNumber !== this._workerNumber)) { | ||
// Can't publish anything right now... | ||
this._working = false; | ||
return Promise.resolve(); | ||
} | ||
channel = this._channel; | ||
message = this._messages[0]; | ||
Promise.resolve().then(() => { | ||
var encodedMessage, sendPromise; | ||
encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content; | ||
sendPromise = (function() { | ||
switch (message.type) { | ||
case 'publish': | ||
return new Promise(function(resolve, reject) { | ||
var result; | ||
return result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, function(err) { | ||
if (err) { | ||
return reject(err); | ||
} | ||
return setImmediate(function() { | ||
return resolve(result); | ||
}); | ||
}); | ||
}); | ||
case 'sendToQueue': | ||
return new Promise(function(resolve, reject) { | ||
var result; | ||
return result = channel.sendToQueue(message.queue, encodedMessage, message.options, function(err) { | ||
if (err) { | ||
return reject(err); | ||
} | ||
return setImmediate(function() { | ||
return resolve(result); | ||
}); | ||
}); | ||
}); | ||
default: | ||
/* !pragma coverage-skip-block */ | ||
throw new Error(`Unhandled message type ${message.type}`); | ||
} | ||
})(); | ||
return sendPromise; | ||
}).then((result) => { | ||
this._messages.shift(); | ||
message.resolve(result); | ||
// Send some more! | ||
return this._publishQueuedMessages(workerNumber); | ||
}, (err) => { | ||
if (!this._channel) { | ||
} else { | ||
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the | ||
// broker rejected the message. Either way, reject it back | ||
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we | ||
// reconnect. | ||
this._messages.shift(); | ||
message.reject(err); | ||
// Send some more! | ||
return this._publishQueuedMessages(workerNumber); | ||
} | ||
}).catch((err) => { | ||
/* !pragma coverage-skip-block */ | ||
console.error("amqp-connection-manager: ChannelWrapper:_publishQueuedMessages() - How did you get here?", err.stack); | ||
this.emit('error', err); | ||
return this._working = false; | ||
return answer; | ||
}); | ||
return null; | ||
} | ||
} | ||
// Send an `ack` to the underlying channel. | ||
ack(...args) { | ||
var ref1; | ||
return (ref1 = this._channel) != null ? ref1.ack(...args) : void 0; | ||
} | ||
_shouldPublish() { | ||
return this._messages.length > 0 && !this._settingUp && this._channel; | ||
} | ||
// Send a `nack` to the underlying channel. | ||
nack(...args) { | ||
var ref1; | ||
return (ref1 = this._channel) != null ? ref1.nack(...args) : void 0; | ||
} | ||
}; | ||
// Adds a new 'setup handler'. | ||
// `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting | ||
// exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib. | ||
// The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until | ||
// this Promise resolves. | ||
// If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve | ||
// until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will | ||
// be emitted, since you can just handle the error here (although the `setup` will still be added for future | ||
// reconnects, even if it throws an error.) | ||
// Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error' | ||
// event. | ||
ChannelWrapper.prototype.addSetup = pb.break(function(setup) { | ||
return (this._settingUp || Promise.resolve()).then(() => { | ||
this._setups.push(setup); | ||
if (this._channel) { | ||
return pb.call(setup, null, this._channel); | ||
// Start publishing queued messages, if there isn't already a worker doing this. | ||
_startWorker() { | ||
if (!this._working && this._shouldPublish()) { | ||
this._working = true; | ||
this._workerNumber++; | ||
this._publishQueuedMessages(this._workerNumber); | ||
} | ||
}); | ||
}); | ||
} | ||
// Remove a setup function added with `addSetup`. If there is currently connection, `teardown(channel, [cb])` will | ||
// be run immediately, and the returned Promise will not resolve until it completes. | ||
ChannelWrapper.prototype.removeSetup = pb.break(function(setup, teardown) { | ||
this._setups = _.without(this._setups, setup); | ||
return (this._settingUp || Promise.resolve()).then(() => { | ||
if (this._channel) { | ||
return pb.call(teardown, null, this._channel); | ||
_publishQueuedMessages(workerNumber) { | ||
if (!this._shouldPublish() || !this._working || workerNumber !== this._workerNumber) { | ||
// Can't publish anything right now... | ||
this._working = false; | ||
return Promise.resolve(); | ||
} | ||
}); | ||
}); | ||
// Returns a Promise which resolves when this channel next connects. | ||
// (Mainly here for unit testing...) | ||
ChannelWrapper.prototype.waitForConnect = pb.break(function() { | ||
if (this._channel && !this._settingUp) { | ||
return Promise.resolve(); | ||
} else { | ||
return new Promise((resolve) => { | ||
return this.once('connect', resolve); | ||
}); | ||
} | ||
}); | ||
const channel = this._channel; | ||
const message = this._messages[0]; | ||
// Publish a message to the channel. | ||
Promise.resolve().then(() => { | ||
const encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content; | ||
// This works just like amqplib's `publish()`, except if the channel is not connected, this will wait until the | ||
// channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent. | ||
// The returned promise will be rejected if `close()` is called on this channel before it can be sent, if | ||
// `options.json` is set and the message can't be encoded, or if the broker rejects the message for some reason. | ||
const sendPromise = (() => { | ||
switch (message.type) { | ||
case 'publish': | ||
return new Promise(function (resolve, reject) { | ||
const result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, err => { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
setImmediate(() => resolve(result)); | ||
} | ||
}); | ||
}); | ||
case 'sendToQueue': | ||
return new Promise(function (resolve, reject) { | ||
const result = channel.sendToQueue(message.queue, encodedMessage, message.options, err => { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
setImmediate(() => resolve(result)); | ||
} | ||
}); | ||
}); | ||
ChannelWrapper.prototype.publish = pb.break(function(exchange, routingKey, content, options) { | ||
return new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
type: 'publish', | ||
exchange, | ||
routingKey, | ||
content, | ||
options, | ||
resolve, | ||
reject | ||
}); | ||
return this._startWorker(); | ||
}); | ||
}); | ||
/* istanbul ignore next */ | ||
default: | ||
throw new Error(`Unhandled message type ${message.type}`); | ||
} | ||
})(); | ||
// Send a message to a queue. | ||
return sendPromise; | ||
}).then(result => { | ||
this._messages.shift(); | ||
message.resolve(result); | ||
// This works just like amqplib's `sendToQueue`, except if the channel is not connected, this will wait until the | ||
// channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent. | ||
// The returned promise will be rejected only if `close()` is called on this channel before it can be sent. | ||
// Send some more! | ||
this._publishQueuedMessages(workerNumber); | ||
}, err => { | ||
if (!this._channel) { | ||
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we | ||
// reconnect. | ||
} else { | ||
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the | ||
// broker rejected the message. Either way, reject it back | ||
this._messages.shift(); | ||
message.reject(err); | ||
// `message` here should be a JSON-able object. | ||
ChannelWrapper.prototype.sendToQueue = pb.break(function(queue, content, options) { | ||
return new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
type: 'sendToQueue', | ||
queue, | ||
content, | ||
options, | ||
resolve, | ||
reject | ||
// Send some more! | ||
this._publishQueuedMessages(workerNumber); | ||
} | ||
}).catch( /* istanbul ignore next */err => { | ||
this.emit('error', err); | ||
this._working = false; | ||
}); | ||
return this._startWorker(); | ||
}); | ||
}); | ||
return ChannelWrapper; | ||
return null; | ||
} | ||
}).call(this); | ||
// Send an `ack` to the underlying channel. | ||
ack() { | ||
return this._channel && this._channel.ack.apply(this._channel, arguments); | ||
} | ||
module.exports = ChannelWrapper; | ||
}).call(this); | ||
// Send a `nack` to the underlying channel. | ||
nack() { | ||
return this._channel && this._channel.nack.apply(this._channel, arguments); | ||
} | ||
} | ||
exports.default = ChannelWrapper; | ||
//# sourceMappingURL=ChannelWrapper.js.map |
@@ -1,13 +0,12 @@ | ||
// Generated by CoffeeScript 2.2.4 | ||
(function() { | ||
var Promise, ref; | ||
"use strict"; | ||
Promise = (ref = global.Promise) != null ? ref : require('es6-promise').Promise; | ||
exports.wait = function(timeInMs) { | ||
return new Promise(function(resolve, reject) { | ||
return setTimeout(resolve, timeInMs); | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
exports.wait = wait; | ||
function wait(timeInMs) { | ||
return new Promise(function (resolve) { | ||
return setTimeout(resolve, timeInMs); | ||
}); | ||
}; | ||
}).call(this); | ||
} | ||
//# sourceMappingURL=helpers.js.map |
@@ -1,11 +0,17 @@ | ||
// Generated by CoffeeScript 2.2.4 | ||
(function() { | ||
var AmqpConnectionManager; | ||
'use strict'; | ||
AmqpConnectionManager = require('./AmqpConnectionManager'); | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
exports.connect = connect; | ||
exports.connect = function(urls, options) { | ||
return new AmqpConnectionManager(urls, options); | ||
}; | ||
var _AmqpConnectionManager = require('./AmqpConnectionManager'); | ||
}).call(this); | ||
var _AmqpConnectionManager2 = _interopRequireDefault(_AmqpConnectionManager); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
function connect(urls, options) { | ||
return new _AmqpConnectionManager2.default(urls, options); | ||
} | ||
//# sourceMappingURL=index.js.map |
{ | ||
"name": "amqp-connection-manager", | ||
"version": "1.4.2", | ||
"version": "2.0.0", | ||
"description": "Auto-reconnect and round robin support for amqplib.", | ||
"main": "lib/index.js", | ||
"dependencies": { | ||
"es6-promise": "^4.1.1", | ||
"lodash": "^4.15.0", | ||
"promise-breaker": "^4.1.2", | ||
"when": "^3.7.3" | ||
"promise-breaker": "^4.1.2" | ||
}, | ||
@@ -16,22 +13,50 @@ "peerDependencies": { | ||
"devDependencies": { | ||
"@semantic-release/changelog": "^2.0.2", | ||
"@semantic-release/git": "^4.0.3", | ||
"amqplib": "^0.5.1", | ||
"babel-cli": "^6.26.0", | ||
"babel-core": "^6.26.3", | ||
"babel-env": "^2.4.1", | ||
"babel-plugin-istanbul": "^4.1.6", | ||
"babel-register": "^6.26.0", | ||
"chai": "^4.1.2", | ||
"chai-as-promised": "^7.1.1", | ||
"chai-string": "^1.1.2", | ||
"coffee-coverage": "^3.0.0", | ||
"coffeescript": "^2.2.4", | ||
"commitlint": "^6.2.0", | ||
"coveralls": "^3.0.0", | ||
"eslint": "^4.19.1", | ||
"eslint-config-benbria": "^3.0.2", | ||
"eslint-plugin-import": "^2.11.0", | ||
"eslint-plugin-promise": "^3.7.0", | ||
"greenkeeper-lockfile": "^1.14.0", | ||
"husky": "^1.0.0-rc.2", | ||
"istanbul": "^0.4.0", | ||
"mocha": "^5.1.1", | ||
"nyc": "^11.7.1", | ||
"promise-tools": "^1.1.0", | ||
"proxyquire": "^2.0.1", | ||
"semantic-release": "^15.2.0", | ||
"sinon": "^5.0.1" | ||
}, | ||
"engines": { | ||
"node": ">=6.0.0", | ||
"npm": ">5.0.0" | ||
}, | ||
"scripts": { | ||
"prepublish": "npm run build && npm test", | ||
"build": "coffee -c -o lib src", | ||
"prepare": "npm run build", | ||
"prepublishOnly": "npm test", | ||
"build": "babel -s -d lib src", | ||
"clean": "rm -rf lib coverage", | ||
"distclean": "npm run clean && rm -rf node_modules", | ||
"test": "mocha \"test/**/*.{js,coffee}\" && istanbul report text-summary lcov" | ||
"test": "npm run test:lint && npm run test:unittest", | ||
"test:unittest": "NODE_ENV=test nyc mocha test", | ||
"test:lint": "eslint src test", | ||
"precommit:unittest": "NODE_ENV=test nyc mocha test --reporter progress", | ||
"semantic-release": "semantic-release" | ||
}, | ||
"husky": { | ||
"hooks": { | ||
"commit-msg": "commitlint -e $GIT_PARAMS", | ||
"pre-commit": "npm run test:lint && npm run precommit:unittest" | ||
} | ||
}, | ||
"repository": { | ||
@@ -38,0 +63,0 @@ "type": "git", |
[![Build Status](https://travis-ci.org/benbria/node-amqp-connection-manager.svg?branch=master)](https://travis-ci.org/benbria/node-amqp-connection-manager) | ||
[![Coverage Status](https://coveralls.io/repos/benbria/node-amqp-connection-manager/badge.svg?branch=master&service=github)](https://coveralls.io/github/benbria/node-amqp-connection-manager?branch=master) | ||
[![Greenkeeper badge](https://badges.greenkeeper.io/benbria/node-amqp-connection-manager.svg)](https://greenkeeper.io/) | ||
[![semantic-release](https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg)](https://github.com/semantic-release/semantic-release) | ||
@@ -5,0 +6,0 @@ [![Dependency Status](https://david-dm.org/benbria/node-amqp-connection-manager.svg)](https://david-dm.org/benbria/node-amqp-connection-manager) |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
154349
2
20
518
182
26
2
- Removedes6-promise@^4.1.1
- Removedlodash@^4.15.0
- Removedwhen@^3.7.3
- Removedes6-promise@4.2.8(transitive)
- Removedlodash@4.17.21(transitive)
- Removedwhen@3.7.8(transitive)