amqp-connection-manager
Advanced tools
Comparing version 1.4.0 to 1.4.1
@@ -1,8 +0,6 @@ | ||
// Generated by CoffeeScript 1.12.7 | ||
// Generated by CoffeeScript 2.2.4 | ||
(function() { | ||
var AmqpConnectionManager, ChannelWrapper, EventEmitter, HEARTBEAT_IN_SECONDS, _, amqp, pb, urlUtils, wait, | ||
extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }, | ||
hasProp = {}.hasOwnProperty; | ||
var AmqpConnectionManager, ChannelWrapper, EventEmitter, HEARTBEAT_IN_SECONDS, _, amqp, pb, urlUtils, wait; | ||
EventEmitter = require('events').EventEmitter; | ||
({EventEmitter} = require('events')); | ||
@@ -17,16 +15,31 @@ amqp = require('amqplib'); | ||
wait = require('./helpers').wait; | ||
({wait} = require('./helpers')); | ||
pb = require('promise-breaker'); | ||
// Default heartbeat time. | ||
HEARTBEAT_IN_SECONDS = 5; | ||
AmqpConnectionManager = (function(superClass) { | ||
extend(AmqpConnectionManager, superClass); | ||
// Events: | ||
// * `connect({connection, url})` - Emitted whenever we connect to a broker. | ||
// * `disconnect({err})` - Emitted whenever we disconnect from a broker. | ||
function AmqpConnectionManager(urls, options) { | ||
AmqpConnectionManager = class AmqpConnectionManager extends EventEmitter { | ||
// Create a new AmqplibConnectionManager. | ||
// * `urls` is an array of brokers to connect to. AmqplibConnectionManager will round-robin between them | ||
// whenever it needs to create a new connection. | ||
// * `options.heartbeatIntervalInSeconds` is the interval, in seconds, to send heartbeats. Defaults to 5 seconds. | ||
// * `options.reconnectTimeInSeconds` is the time to wait before trying to reconnect. If not specified, | ||
// defaults to `heartbeatIntervalInSeconds`. | ||
// * `options.connectionOptions` is passed to the amqplib connect method. | ||
// * `options.findServers(callback)` is a function which returns one or more servers to connect to. This should | ||
// return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism | ||
// such as Consul or etcd. Instead of taking a `callback`, this can also return a Promise. Note that if this | ||
// is supplied, then `urls` is ignored. | ||
constructor(urls, options = {}) { | ||
var ref, ref1, ref2; | ||
if (options == null) { | ||
options = {}; | ||
} | ||
super(); | ||
if (!urls && !options.findServers) { | ||
@@ -40,2 +53,3 @@ throw new Error("Must supply either `urls` or `findServers`"); | ||
this.reconnectTimeInSeconds = (ref1 = options.reconnectTimeInSeconds) != null ? ref1 : this.heartbeatIntervalInSeconds; | ||
// There will be one listener per channel, and there could be a lot of channels, so disable warnings from node. | ||
this.setMaxListeners(0); | ||
@@ -48,18 +62,14 @@ this._findServers = (ref2 = options.findServers) != null ? ref2 : (function() { | ||
AmqpConnectionManager.prototype.createChannel = function(options) { | ||
// `options` here are any options that can be passed to ChannelWrapper. | ||
createChannel(options = {}) { | ||
var channel; | ||
if (options == null) { | ||
options = {}; | ||
} | ||
channel = new ChannelWrapper(this, options); | ||
this._channels.push(channel); | ||
channel.once('close', (function(_this) { | ||
return function() { | ||
return _this._channels = _.without(_this._channels, channel); | ||
}; | ||
})(this)); | ||
channel.once('close', () => { | ||
return this._channels = _.without(this._channels, channel); | ||
}); | ||
return channel; | ||
}; | ||
} | ||
AmqpConnectionManager.prototype.close = function() { | ||
close() { | ||
if (this._closed) { | ||
@@ -71,19 +81,18 @@ return; | ||
return channel.close(); | ||
}))["catch"](function() {}).then((function(_this) { | ||
return function() { | ||
var ref; | ||
_this._channels = []; | ||
if ((ref = _this._currentConnection) != null) { | ||
ref.close(); | ||
} | ||
return _this._currentConnection = null; | ||
}; | ||
})(this)); | ||
}; | ||
// Ignore errors closing channels. | ||
})).catch(function() {}).then(() => { | ||
var ref; | ||
this._channels = []; | ||
if ((ref = this._currentConnection) != null) { | ||
ref.close(); | ||
} | ||
return this._currentConnection = null; | ||
}); | ||
} | ||
AmqpConnectionManager.prototype.isConnected = function() { | ||
isConnected() { | ||
return this._currentConnection != null; | ||
}; | ||
} | ||
AmqpConnectionManager.prototype._connect = function() { | ||
_connect() { | ||
if (this._closed || this._connecting || this.isConnected()) { | ||
@@ -93,91 +102,78 @@ return Promise.resolve(); | ||
this._connecting = true; | ||
return Promise.resolve().then((function(_this) { | ||
return function() { | ||
if (!_this._urls || (_this._currentUrl >= _this._urls.length)) { | ||
_this._currentUrl = 0; | ||
return pb.callFn(_this._findServers, 0, null); | ||
} else { | ||
return _this._urls; | ||
} | ||
}; | ||
})(this)).then((function(_this) { | ||
return function(urls) { | ||
var amqpUrl, url; | ||
if ((urls != null) && !_.isArray(urls)) { | ||
urls = [urls]; | ||
} | ||
_this._urls = urls; | ||
if (!urls || urls.length === 0) { | ||
throw new Error('amqp-connection-manager: No servers found'); | ||
} | ||
url = urls[_this._currentUrl]; | ||
_this._currentUrl++; | ||
amqpUrl = urlUtils.parse(url); | ||
if (amqpUrl.search != null) { | ||
amqpUrl.search += "&heartbeat=" + _this.heartbeatIntervalInSeconds; | ||
} else { | ||
amqpUrl.search = "?heartbeat=" + _this.heartbeatIntervalInSeconds; | ||
} | ||
return amqp.connect(urlUtils.format(amqpUrl), _this.connectionOptions).then(function(connection) { | ||
_this._currentConnection = connection; | ||
connection.on('blocked', function(reason) { | ||
return _this.emit('blocked', { | ||
reason: reason | ||
}); | ||
return Promise.resolve().then(() => { | ||
if (!this._urls || (this._currentUrl >= this._urls.length)) { | ||
this._currentUrl = 0; | ||
return pb.callFn(this._findServers, 0, null); | ||
} else { | ||
return this._urls; | ||
} | ||
}).then((urls) => { | ||
var amqpUrl, url; | ||
if ((urls != null) && !_.isArray(urls)) { | ||
urls = [urls]; | ||
} | ||
this._urls = urls; | ||
if (!urls || urls.length === 0) { | ||
throw new Error('amqp-connection-manager: No servers found'); | ||
} | ||
// Round robin between brokers | ||
url = urls[this._currentUrl]; | ||
this._currentUrl++; | ||
amqpUrl = urlUtils.parse(url); | ||
if (amqpUrl.search != null) { | ||
amqpUrl.search += `&heartbeat=${this.heartbeatIntervalInSeconds}`; | ||
} else { | ||
amqpUrl.search = `?heartbeat=${this.heartbeatIntervalInSeconds}`; | ||
} | ||
return amqp.connect(urlUtils.format(amqpUrl), this.connectionOptions).then((connection) => { | ||
this._currentConnection = connection; | ||
//emit 'blocked' when RabbitMQ server decides to block the connection (resources running low) | ||
connection.on('blocked', (reason) => { | ||
return this.emit('blocked', {reason}); | ||
}); | ||
connection.on('unblocked', () => { | ||
return this.emit('unblocked'); | ||
}); | ||
// Reconnect if the broker goes away. | ||
connection.on('error', (err) => { | ||
return Promise.resolve().then(function() { | ||
return this._currentConnection.close(); | ||
}).catch(function(err) {}).then(() => { // Ignore | ||
this._currentConnection = null; | ||
this.emit('disconnect', {err}); | ||
return this._connect(); | ||
}).catch(function(err) { | ||
/* !pragma coverage-skip-block */ | ||
// `_connect()` should never throw. | ||
return console.error("amqp-connection-manager: AmqpConnectionManager:_connect()" + " - How did you get here?", err.stack); | ||
}); | ||
connection.on('unblocked', function() { | ||
return _this.emit('unblocked'); | ||
}); | ||
// Reconnect if the connection closes gracefully | ||
connection.on('close', (err) => { | ||
this._currentConnection = null; | ||
this.emit('disconnect', {err}); | ||
return wait(this.reconnectTimeInSeconds * 1000).then(() => { | ||
return this._connect(); | ||
}); | ||
connection.on('error', function(err) { | ||
return Promise.resolve().then(function() { | ||
return this._currentConnection.close(); | ||
})["catch"](function(err) {}).then(function() { | ||
_this._currentConnection = null; | ||
_this.emit('disconnect', { | ||
err: err | ||
}); | ||
return _this._connect(); | ||
})["catch"](function(err) { | ||
/* !pragma coverage-skip-block */ | ||
return console.error("amqp-connection-manager: AmqpConnectionManager:_connect()" + " - How did you get here?", err.stack); | ||
}); | ||
}); | ||
connection.on('close', function(err) { | ||
_this._currentConnection = null; | ||
_this.emit('disconnect', { | ||
err: err | ||
}); | ||
return wait(_this.reconnectTimeInSeconds * 1000).then(function() { | ||
return _this._connect(); | ||
}); | ||
}); | ||
_this._connecting = false; | ||
_this.emit('connect', { | ||
connection: connection, | ||
url: url | ||
}); | ||
return null; | ||
}); | ||
}; | ||
})(this))["catch"]((function(_this) { | ||
return function(err) { | ||
_this.emit('disconnect', { | ||
err: err | ||
}); | ||
_this._currentConnection = null; | ||
return wait(_this.reconnectTimeInSeconds * 1000).then(function() { | ||
_this._connecting = false; | ||
return _this._connect(); | ||
}); | ||
}; | ||
})(this)); | ||
}; | ||
this._connecting = false; | ||
this.emit('connect', {connection, url}); | ||
return null; | ||
}); | ||
}).catch((err) => { | ||
this.emit('disconnect', {err}); | ||
// Connection failed... | ||
this._currentConnection = null; | ||
// TODO: Probably want to try right away here, especially if there are multiple brokers to try... | ||
return wait(this.reconnectTimeInSeconds * 1000).then(() => { | ||
this._connecting = false; | ||
return this._connect(); | ||
}); | ||
}); | ||
} | ||
return AmqpConnectionManager; | ||
}; | ||
})(EventEmitter); | ||
module.exports = AmqpConnectionManager; | ||
}).call(this); |
@@ -1,12 +0,9 @@ | ||
// Generated by CoffeeScript 1.12.7 | ||
// Generated by CoffeeScript 2.2.4 | ||
(function() { | ||
var ChannelWrapper, EventEmitter, Promise, _, pb, ref, | ||
bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; }, | ||
extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }, | ||
hasProp = {}.hasOwnProperty, | ||
slice = [].slice; | ||
boundMethodCheck = function(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new Error('Bound instance method accessed before binding'); } }; | ||
Promise = (ref = global.Promise) != null ? ref : require('es6-promise').Promise; | ||
EventEmitter = require('events').EventEmitter; | ||
({EventEmitter} = require('events')); | ||
@@ -17,48 +14,74 @@ _ = require('lodash'); | ||
ChannelWrapper = (function(superClass) { | ||
extend(ChannelWrapper, superClass); | ||
ChannelWrapper = (function() { | ||
// Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and | ||
// are guaranteed to be delivered. If the underlying connection drops, ChannelWrapper will wait for a new | ||
// connection and continue. | ||
function ChannelWrapper(connectionManager, options) { | ||
var ref1; | ||
if (options == null) { | ||
options = {}; | ||
// Events: | ||
// * `connect` - emitted every time this channel connects or reconnects. | ||
// * `error(err, {name})` - emitted if an error occurs setting up the channel. | ||
// * `drop({message, err})` - called when a JSON message was dropped because it could not be encoded. | ||
// * `close` - emitted when this channel closes via a call to `close()` | ||
class ChannelWrapper extends EventEmitter { | ||
// Create a new ChannelWrapper. | ||
// * `options.name` is a name for this channel. Handy for debugging. | ||
// * `options.setup` is a default setup function to call. See `addSetup` for details. | ||
// * `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()` | ||
// are plain JSON objects. These will be encoded automatically before being sent. | ||
constructor(connectionManager, options = {}) { | ||
var ref1; | ||
super(); | ||
// Called whenever we connect to the broker. | ||
this._onConnect = this._onConnect.bind(this); | ||
// Wait for another reconnect to create a new channel. | ||
// Called whenever we disconnect from the AMQP server. | ||
this._onDisconnect = this._onDisconnect.bind(this); | ||
this._connectionManager = connectionManager; | ||
this.name = options.name; | ||
this._json = (ref1 = options.json) != null ? ref1 : false; | ||
// Place to store queued messages. | ||
this._messages = []; | ||
// True if the "worker" is busy sending messages. False if we need to start the worker to get stuff done. | ||
this._working = false; | ||
// If we're in the process of creating a channel, this is a Promise which will resolve when the channel is | ||
// set up. Otherwise, this is `null`. | ||
this._settingUp = null; | ||
// The currently connected channel. Note that not all setup functions have been run on this channel until | ||
// `@_settingUp` is either null or resolved. | ||
this._channel = null; | ||
// We kill off workers when we disconnect. Whenever we start a new worker, we bump up the `_workerNumber` - | ||
// this makes it so if stale workers ever do wake up, they'll know to stop working. | ||
this._workerNumber = 0; | ||
// Array of setup functions to call. | ||
this._setups = []; | ||
if (options.setup != null) { | ||
this._setups.push(options.setup); | ||
} | ||
if (connectionManager.isConnected()) { | ||
this._onConnect({ | ||
connection: this._connectionManager._currentConnection | ||
}); | ||
} | ||
connectionManager.on('connect', this._onConnect); | ||
connectionManager.on('disconnect', this._onDisconnect); | ||
} | ||
this._onDisconnect = bind(this._onDisconnect, this); | ||
this._onConnect = bind(this._onConnect, this); | ||
this._connectionManager = connectionManager; | ||
this.name = options.name; | ||
this._json = (ref1 = options.json) != null ? ref1 : false; | ||
this._messages = []; | ||
this._working = false; | ||
this._settingUp = null; | ||
this._channel = null; | ||
this._workerNumber = 0; | ||
this._setups = []; | ||
if (options.setup != null) { | ||
this._setups.push(options.setup); | ||
} | ||
if (connectionManager.isConnected()) { | ||
this._onConnect({ | ||
connection: this._connectionManager._currentConnection | ||
}); | ||
} | ||
connectionManager.on('connect', this._onConnect); | ||
connectionManager.on('disconnect', this._onDisconnect); | ||
} | ||
ChannelWrapper.prototype._onConnect = function(arg) { | ||
var connection; | ||
connection = arg.connection; | ||
this._connection = connection; | ||
return connection.createConfirmChannel().then((function(_this) { | ||
return function(channel) { | ||
_this._channel = channel; | ||
channel.on('close', function() { | ||
return _this._onChannelClose(channel); | ||
_onConnect({connection}) { | ||
boundMethodCheck(this, ChannelWrapper); | ||
this._connection = connection; | ||
return connection.createConfirmChannel().then((channel) => { | ||
this._channel = channel; | ||
channel.on('close', () => { | ||
return this._onChannelClose(channel); | ||
}); | ||
return _this._settingUp = Promise.all(_this._setups.map(function(setupFn) { | ||
return pb.call(setupFn, null, channel)["catch"](function(err) { | ||
if (_this._channel) { | ||
return _this.emit('error', err, { | ||
name: _this.name | ||
return this._settingUp = Promise.all(this._setups.map((setupFn) => { | ||
// TODO: Use a timeout here to guard against setupFns that never resolve? | ||
return pb.call(setupFn, null, channel).catch((err) => { | ||
if (this._channel) { | ||
return this.emit('error', err, { | ||
name: this.name | ||
}); | ||
@@ -69,120 +92,93 @@ } else { | ||
}); | ||
})).then(function() { | ||
_this._settingUp = null; | ||
return _this._channel; | ||
// Don't emit an error if setups failed because the channel was closing. | ||
})).then(() => { | ||
this._settingUp = null; | ||
return this._channel; | ||
}); | ||
}; | ||
})(this)).then((function(_this) { | ||
return function() { | ||
if (_this._channel == null) { | ||
}).then(() => { | ||
if (this._channel == null) { // Can happen if channel closes while we're setting up. | ||
return; | ||
} | ||
_this._startWorker(); | ||
return _this.emit('connect'); | ||
}; | ||
})(this))["catch"]((function(_this) { | ||
return function(err) { | ||
_this.emit('error', err, { | ||
name: _this.name | ||
// Since we just connected, publish any queued messages | ||
this._startWorker(); | ||
return this.emit('connect'); | ||
}).catch((err) => { | ||
this.emit('error', err, { | ||
name: this.name | ||
}); | ||
_this._settingUp = null; | ||
return _this._channel = null; | ||
}; | ||
})(this)); | ||
}; | ||
this._settingUp = null; | ||
return this._channel = null; | ||
}); | ||
} | ||
ChannelWrapper.prototype._onChannelClose = function(channel) { | ||
if (this._channel === channel) { | ||
return this._channel = null; | ||
// Called whenever the channel closes. | ||
_onChannelClose(channel) { | ||
if (this._channel === channel) { | ||
return this._channel = null; | ||
} | ||
} | ||
}; | ||
ChannelWrapper.prototype._onDisconnect = function() { | ||
this._channel = null; | ||
this._settingUp = null; | ||
return this._working = false; | ||
}; | ||
_onDisconnect() { | ||
boundMethodCheck(this, ChannelWrapper); | ||
this._channel = null; | ||
this._settingUp = null; | ||
// Kill off the current worker. We never get any kind of error for messages in flight - see | ||
// https://github.com/squaremo/amqp.node/issues/191. | ||
return this._working = false; | ||
} | ||
ChannelWrapper.prototype.addSetup = pb["break"](function(setup) { | ||
return (this._settingUp || Promise.resolve()).then((function(_this) { | ||
return function() { | ||
_this._setups.push(setup); | ||
if (_this._channel) { | ||
return pb.call(setup, null, _this._channel); | ||
} | ||
}; | ||
})(this)); | ||
}); | ||
// Returns the number of unsent messages queued on this channel. | ||
queueLength() { | ||
return this._messages.length; | ||
} | ||
ChannelWrapper.prototype.removeSetup = pb["break"](function(setup, teardown) { | ||
this._setups = _.without(this._setups, setup); | ||
return (this._settingUp || Promise.resolve()).then((function(_this) { | ||
return function() { | ||
if (_this._channel) { | ||
return pb.call(teardown, null, _this._channel); | ||
} | ||
}; | ||
})(this)); | ||
}); | ||
// Destroy this channel. | ||
ChannelWrapper.prototype.queueLength = function() { | ||
return this._messages.length; | ||
}; | ||
// Any unsent messages will have their associated Promises rejected. | ||
ChannelWrapper.prototype.close = function() { | ||
return Promise.resolve().then((function(_this) { | ||
return function() { | ||
close() { | ||
return Promise.resolve().then(() => { | ||
var answer, ref1, ref2; | ||
_this._working = false; | ||
if (_this._messages.length !== 0) { | ||
_this._messages.forEach(function(message) { | ||
this._working = false; | ||
if (this._messages.length !== 0) { | ||
// Reject any unsent messages. | ||
this._messages.forEach(function(message) { | ||
return message.reject(new Error('Channel closed')); | ||
}); | ||
} | ||
_this._connectionManager.removeListener('connect', _this._onConnect); | ||
_this._connectionManager.removeListener('disconnect', _this._onDisconnect); | ||
answer = (ref1 = (ref2 = _this._channel) != null ? ref2.close() : void 0) != null ? ref1 : Promise.resolve(); | ||
_this._channel = null; | ||
_this.emit('close'); | ||
this._connectionManager.removeListener('connect', this._onConnect); | ||
this._connectionManager.removeListener('disconnect', this._onDisconnect); | ||
answer = (ref1 = (ref2 = this._channel) != null ? ref2.close() : void 0) != null ? ref1 : Promise.resolve(); | ||
this._channel = null; | ||
this.emit('close'); | ||
return answer; | ||
}; | ||
})(this)); | ||
}; | ||
}); | ||
} | ||
ChannelWrapper.prototype.waitForConnect = pb["break"](function() { | ||
if (this._channel && !this._settingUp) { | ||
return Promise.resolve(); | ||
} else { | ||
return new Promise((function(_this) { | ||
return function(resolve) { | ||
return _this.once('connect', resolve); | ||
}; | ||
})(this)); | ||
_shouldPublish() { | ||
return (this._messages.length > 0) && !this._settingUp && this._channel; | ||
} | ||
}); | ||
ChannelWrapper.prototype._shouldPublish = function() { | ||
return (this._messages.length > 0) && !this._settingUp && this._channel; | ||
}; | ||
ChannelWrapper.prototype._startWorker = function() { | ||
if (!this._working && this._shouldPublish()) { | ||
this._working = true; | ||
this._workerNumber++; | ||
return this._publishQueuedMessages(this._workerNumber); | ||
// Start publishing queued messages, if there isn't already a worker doing this. | ||
_startWorker() { | ||
if (!this._working && this._shouldPublish()) { | ||
this._working = true; | ||
this._workerNumber++; | ||
return this._publishQueuedMessages(this._workerNumber); | ||
} | ||
} | ||
}; | ||
ChannelWrapper.prototype._publishQueuedMessages = function(workerNumber) { | ||
var channel, message; | ||
if (!this._shouldPublish() || !this._working || (workerNumber !== this._workerNumber)) { | ||
this._working = false; | ||
return Promise.resolve(); | ||
} | ||
channel = this._channel; | ||
message = this._messages[0]; | ||
Promise.resolve().then((function(_this) { | ||
return function() { | ||
_publishQueuedMessages(workerNumber) { | ||
var channel, message; | ||
if (!this._shouldPublish() || !this._working || (workerNumber !== this._workerNumber)) { | ||
// Can't publish anything right now... | ||
this._working = false; | ||
return Promise.resolve(); | ||
} | ||
channel = this._channel; | ||
message = this._messages[0]; | ||
Promise.resolve().then(() => { | ||
var encodedMessage, sendPromise; | ||
encodedMessage = _this._json ? new Buffer(JSON.stringify(message.content)) : message.content; | ||
encodedMessage = this._json ? new Buffer(JSON.stringify(message.content)) : message.content; | ||
sendPromise = (function() { | ||
@@ -215,85 +211,142 @@ switch (message.type) { | ||
default: | ||
/* !pragma coverage-skip-block */ | ||
throw new Error("Unhandled message type " + message.type); | ||
throw new Error(`Unhandled message type ${message.type}`); | ||
} | ||
})(); | ||
return sendPromise; | ||
}; | ||
})(this)).then((function(_this) { | ||
return function(result) { | ||
_this._messages.shift(); | ||
}).then((result) => { | ||
this._messages.shift(); | ||
message.resolve(result); | ||
return _this._publishQueuedMessages(workerNumber); | ||
}; | ||
})(this), (function(_this) { | ||
return function(err) { | ||
if (!_this._channel) { | ||
// Send some more! | ||
return this._publishQueuedMessages(workerNumber); | ||
}, (err) => { | ||
if (!this._channel) { | ||
} else { | ||
_this._messages.shift(); | ||
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the | ||
// broker rejected the message. Either way, reject it back | ||
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we | ||
// reconnect. | ||
this._messages.shift(); | ||
message.reject(err); | ||
return _this._publishQueuedMessages(workerNumber); | ||
// Send some more! | ||
return this._publishQueuedMessages(workerNumber); | ||
} | ||
}; | ||
})(this))["catch"]((function(_this) { | ||
return function(err) { | ||
}).catch((err) => { | ||
/* !pragma coverage-skip-block */ | ||
console.error("amqp-connection-manager: ChannelWrapper:_publishQueuedMessages() - How did you get here?", err.stack); | ||
_this.emit('error', err); | ||
return _this._working = false; | ||
}; | ||
})(this)); | ||
return null; | ||
}; | ||
this.emit('error', err); | ||
return this._working = false; | ||
}); | ||
return null; | ||
} | ||
ChannelWrapper.prototype.ack = function() { | ||
var args, ref1; | ||
args = 1 <= arguments.length ? slice.call(arguments, 0) : []; | ||
return (ref1 = this._channel) != null ? ref1.ack.apply(ref1, args) : void 0; | ||
}; | ||
// Send an `ack` to the underlying channel. | ||
ack(...args) { | ||
var ref1; | ||
return (ref1 = this._channel) != null ? ref1.ack(...args) : void 0; | ||
} | ||
ChannelWrapper.prototype.nack = function() { | ||
var args, ref1; | ||
args = 1 <= arguments.length ? slice.call(arguments, 0) : []; | ||
return (ref1 = this._channel) != null ? ref1.nack.apply(ref1, args) : void 0; | ||
// Send a `nack` to the underlying channel. | ||
nack(...args) { | ||
var ref1; | ||
return (ref1 = this._channel) != null ? ref1.nack(...args) : void 0; | ||
} | ||
}; | ||
ChannelWrapper.prototype.publish = pb["break"](function(exchange, routingKey, content, options) { | ||
return new Promise((function(_this) { | ||
return function(resolve, reject) { | ||
_this._messages.push({ | ||
type: 'publish', | ||
exchange: exchange, | ||
routingKey: routingKey, | ||
content: content, | ||
options: options, | ||
resolve: resolve, | ||
reject: reject | ||
}); | ||
return _this._startWorker(); | ||
}; | ||
})(this)); | ||
// Adds a new 'setup handler'. | ||
// `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting | ||
// exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib. | ||
// The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until | ||
// this Promise resolves. | ||
// If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve | ||
// until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will | ||
// be emitted, since you can just handle the error here (although the `setup` will still be added for future | ||
// reconnects, even if it throws an error.) | ||
// Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error' | ||
// event. | ||
ChannelWrapper.prototype.addSetup = pb.break(function(setup) { | ||
return (this._settingUp || Promise.resolve()).then(() => { | ||
this._setups.push(setup); | ||
if (this._channel) { | ||
return pb.call(setup, null, this._channel); | ||
} | ||
}); | ||
}); | ||
ChannelWrapper.prototype.sendToQueue = pb["break"](function(queue, content, options) { | ||
return new Promise((function(_this) { | ||
return function(resolve, reject) { | ||
_this._messages.push({ | ||
type: 'sendToQueue', | ||
queue: queue, | ||
content: content, | ||
options: options, | ||
resolve: resolve, | ||
reject: reject | ||
}); | ||
return _this._startWorker(); | ||
}; | ||
})(this)); | ||
// Remove a setup function added with `addSetup`. If there is currently connection, `teardown(channel, [cb])` will | ||
// be run immediately, and the returned Promise will not resolve until it completes. | ||
ChannelWrapper.prototype.removeSetup = pb.break(function(setup, teardown) { | ||
this._setups = _.without(this._setups, setup); | ||
return (this._settingUp || Promise.resolve()).then(() => { | ||
if (this._channel) { | ||
return pb.call(teardown, null, this._channel); | ||
} | ||
}); | ||
}); | ||
// Returns a Promise which resolves when this channel next connects. | ||
// (Mainly here for unit testing...) | ||
ChannelWrapper.prototype.waitForConnect = pb.break(function() { | ||
if (this._channel && !this._settingUp) { | ||
return Promise.resolve(); | ||
} else { | ||
return new Promise((resolve) => { | ||
return this.once('connect', resolve); | ||
}); | ||
} | ||
}); | ||
// Publish a message to the channel. | ||
// This works just like amqplib's `publish()`, except if the channel is not connected, this will wait until the | ||
// channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent. | ||
// The returned promise will be rejected if `close()` is called on this channel before it can be sent, if | ||
// `options.json` is set and the message can't be encoded, or if the broker rejects the message for some reason. | ||
ChannelWrapper.prototype.publish = pb.break(function(exchange, routingKey, content, options) { | ||
return new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
type: 'publish', | ||
exchange, | ||
routingKey, | ||
content, | ||
options, | ||
resolve, | ||
reject | ||
}); | ||
return this._startWorker(); | ||
}); | ||
}); | ||
// Send a message to a queue. | ||
// This works just like amqplib's `sendToQueue`, except if the channel is not connected, this will wait until the | ||
// channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent. | ||
// The returned promise will be rejected only if `close()` is called on this channel before it can be sent. | ||
// `message` here should be a JSON-able object. | ||
ChannelWrapper.prototype.sendToQueue = pb.break(function(queue, content, options) { | ||
return new Promise((resolve, reject) => { | ||
this._messages.push({ | ||
type: 'sendToQueue', | ||
queue, | ||
content, | ||
options, | ||
resolve, | ||
reject | ||
}); | ||
return this._startWorker(); | ||
}); | ||
}); | ||
return ChannelWrapper; | ||
})(EventEmitter); | ||
}).call(this); | ||
@@ -300,0 +353,0 @@ module.exports = ChannelWrapper; |
@@ -1,2 +0,2 @@ | ||
// Generated by CoffeeScript 1.12.7 | ||
// Generated by CoffeeScript 2.2.4 | ||
(function() { | ||
@@ -3,0 +3,0 @@ var Promise, ref; |
@@ -1,2 +0,2 @@ | ||
// Generated by CoffeeScript 1.12.7 | ||
// Generated by CoffeeScript 2.2.4 | ||
(function() { | ||
@@ -3,0 +3,0 @@ var AmqpConnectionManager; |
{ | ||
"name": "amqp-connection-manager", | ||
"version": "1.4.0", | ||
"version": "1.4.1", | ||
"description": "Auto-reconnect and round robin support for amqplib.", | ||
@@ -20,8 +20,9 @@ "main": "lib/index.js", | ||
"chai-string": "^1.1.2", | ||
"coffee-coverage": "^2.0.1", | ||
"coffee-script": "^1.9.3", | ||
"coffee-coverage": "^3.0.0", | ||
"coffeescript": "^2.2.4", | ||
"coveralls": "^3.0.0", | ||
"greenkeeper-lockfile": "^1.14.0", | ||
"istanbul": "^0.4.0", | ||
"mocha": "^4.0.1", | ||
"proxyquire": "^1.7.0", | ||
"mocha": "^5.1.1", | ||
"proxyquire": "^2.0.1", | ||
"sinon": "^4.0.1" | ||
@@ -28,0 +29,0 @@ }, |
[![Build Status](https://travis-ci.org/benbria/node-amqp-connection-manager.svg?branch=master)](https://travis-ci.org/benbria/node-amqp-connection-manager) | ||
[![Coverage Status](https://coveralls.io/repos/benbria/node-amqp-connection-manager/badge.svg?branch=master&service=github)](https://coveralls.io/github/benbria/node-amqp-connection-manager?branch=master) | ||
[![Greenkeeper badge](https://badges.greenkeeper.io/benbria/node-amqp-connection-manager.svg)](https://greenkeeper.io/) | ||
@@ -8,2 +9,3 @@ [![Dependency Status](https://david-dm.org/benbria/node-amqp-connection-manager.svg)](https://david-dm.org/benbria/node-amqp-connection-manager) | ||
Connection management for amqplib. | ||
@@ -10,0 +12,0 @@ |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
SPDX disjunction
LicenseSPDX disjunction for an artifact's license information
Found 1 instance in 1 package
SPDX disjunction
LicenseSPDX disjunction for an artifact's license information
Found 1 instance in 1 package
32161
480
181
12
320546
1