Socket
Socket
Sign inDemoInstall

amqp-connection-manager

Package Overview
Dependencies
19
Maintainers
12
Versions
67
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.4.0 to 1.4.1

234

lib/AmqpConnectionManager.js

@@ -1,8 +0,6 @@

// Generated by CoffeeScript 1.12.7
// Generated by CoffeeScript 2.2.4
(function() {
var AmqpConnectionManager, ChannelWrapper, EventEmitter, HEARTBEAT_IN_SECONDS, _, amqp, pb, urlUtils, wait,
extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; },
hasProp = {}.hasOwnProperty;
var AmqpConnectionManager, ChannelWrapper, EventEmitter, HEARTBEAT_IN_SECONDS, _, amqp, pb, urlUtils, wait;
EventEmitter = require('events').EventEmitter;
({EventEmitter} = require('events'));

@@ -17,16 +15,31 @@ amqp = require('amqplib');

wait = require('./helpers').wait;
({wait} = require('./helpers'));
pb = require('promise-breaker');
// Default heartbeat time.
HEARTBEAT_IN_SECONDS = 5;
AmqpConnectionManager = (function(superClass) {
extend(AmqpConnectionManager, superClass);
// Events:
// * `connect({connection, url})` - Emitted whenever we connect to a broker.
// * `disconnect({err})` - Emitted whenever we disconnect from a broker.
function AmqpConnectionManager(urls, options) {
AmqpConnectionManager = class AmqpConnectionManager extends EventEmitter {
// Create a new AmqplibConnectionManager.
// * `urls` is an array of brokers to connect to. AmqplibConnectionManager will round-robin between them
// whenever it needs to create a new connection.
// * `options.heartbeatIntervalInSeconds` is the interval, in seconds, to send heartbeats. Defaults to 5 seconds.
// * `options.reconnectTimeInSeconds` is the time to wait before trying to reconnect. If not specified,
// defaults to `heartbeatIntervalInSeconds`.
// * `options.connectionOptions` is passed to the amqplib connect method.
// * `options.findServers(callback)` is a function which returns one or more servers to connect to. This should
// return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism
// such as Consul or etcd. Instead of taking a `callback`, this can also return a Promise. Note that if this
// is supplied, then `urls` is ignored.
constructor(urls, options = {}) {
var ref, ref1, ref2;
if (options == null) {
options = {};
}
super();
if (!urls && !options.findServers) {

@@ -40,2 +53,3 @@ throw new Error("Must supply either `urls` or `findServers`");

this.reconnectTimeInSeconds = (ref1 = options.reconnectTimeInSeconds) != null ? ref1 : this.heartbeatIntervalInSeconds;
// There will be one listener per channel, and there could be a lot of channels, so disable warnings from node.
this.setMaxListeners(0);

@@ -48,18 +62,14 @@ this._findServers = (ref2 = options.findServers) != null ? ref2 : (function() {

AmqpConnectionManager.prototype.createChannel = function(options) {
// `options` here are any options that can be passed to ChannelWrapper.
createChannel(options = {}) {
var channel;
if (options == null) {
options = {};
}
channel = new ChannelWrapper(this, options);
this._channels.push(channel);
channel.once('close', (function(_this) {
return function() {
return _this._channels = _.without(_this._channels, channel);
};
})(this));
channel.once('close', () => {
return this._channels = _.without(this._channels, channel);
});
return channel;
};
}
AmqpConnectionManager.prototype.close = function() {
close() {
if (this._closed) {

@@ -71,19 +81,18 @@ return;

return channel.close();
}))["catch"](function() {}).then((function(_this) {
return function() {
var ref;
_this._channels = [];
if ((ref = _this._currentConnection) != null) {
ref.close();
}
return _this._currentConnection = null;
};
})(this));
};
// Ignore errors closing channels.
})).catch(function() {}).then(() => {
var ref;
this._channels = [];
if ((ref = this._currentConnection) != null) {
ref.close();
}
return this._currentConnection = null;
});
}
AmqpConnectionManager.prototype.isConnected = function() {
isConnected() {
return this._currentConnection != null;
};
}
AmqpConnectionManager.prototype._connect = function() {
_connect() {
if (this._closed || this._connecting || this.isConnected()) {

@@ -93,91 +102,78 @@ return Promise.resolve();

this._connecting = true;
return Promise.resolve().then((function(_this) {
return function() {
if (!_this._urls || (_this._currentUrl >= _this._urls.length)) {
_this._currentUrl = 0;
return pb.callFn(_this._findServers, 0, null);
} else {
return _this._urls;
}
};
})(this)).then((function(_this) {
return function(urls) {
var amqpUrl, url;
if ((urls != null) && !_.isArray(urls)) {
urls = [urls];
}
_this._urls = urls;
if (!urls || urls.length === 0) {
throw new Error('amqp-connection-manager: No servers found');
}
url = urls[_this._currentUrl];
_this._currentUrl++;
amqpUrl = urlUtils.parse(url);
if (amqpUrl.search != null) {
amqpUrl.search += "&heartbeat=" + _this.heartbeatIntervalInSeconds;
} else {
amqpUrl.search = "?heartbeat=" + _this.heartbeatIntervalInSeconds;
}
return amqp.connect(urlUtils.format(amqpUrl), _this.connectionOptions).then(function(connection) {
_this._currentConnection = connection;
connection.on('blocked', function(reason) {
return _this.emit('blocked', {
reason: reason
});
return Promise.resolve().then(() => {
if (!this._urls || (this._currentUrl >= this._urls.length)) {
this._currentUrl = 0;
return pb.callFn(this._findServers, 0, null);
} else {
return this._urls;
}
}).then((urls) => {
var amqpUrl, url;
if ((urls != null) && !_.isArray(urls)) {
urls = [urls];
}
this._urls = urls;
if (!urls || urls.length === 0) {
throw new Error('amqp-connection-manager: No servers found');
}
// Round robin between brokers
url = urls[this._currentUrl];
this._currentUrl++;
amqpUrl = urlUtils.parse(url);
if (amqpUrl.search != null) {
amqpUrl.search += `&heartbeat=${this.heartbeatIntervalInSeconds}`;
} else {
amqpUrl.search = `?heartbeat=${this.heartbeatIntervalInSeconds}`;
}
return amqp.connect(urlUtils.format(amqpUrl), this.connectionOptions).then((connection) => {
this._currentConnection = connection;
//emit 'blocked' when RabbitMQ server decides to block the connection (resources running low)
connection.on('blocked', (reason) => {
return this.emit('blocked', {reason});
});
connection.on('unblocked', () => {
return this.emit('unblocked');
});
// Reconnect if the broker goes away.
connection.on('error', (err) => {
return Promise.resolve().then(function() {
return this._currentConnection.close();
}).catch(function(err) {}).then(() => { // Ignore
this._currentConnection = null;
this.emit('disconnect', {err});
return this._connect();
}).catch(function(err) {
/* !pragma coverage-skip-block */
// `_connect()` should never throw.
return console.error("amqp-connection-manager: AmqpConnectionManager:_connect()" + " - How did you get here?", err.stack);
});
connection.on('unblocked', function() {
return _this.emit('unblocked');
});
// Reconnect if the connection closes gracefully
connection.on('close', (err) => {
this._currentConnection = null;
this.emit('disconnect', {err});
return wait(this.reconnectTimeInSeconds * 1000).then(() => {
return this._connect();
});
connection.on('error', function(err) {
return Promise.resolve().then(function() {
return this._currentConnection.close();
})["catch"](function(err) {}).then(function() {
_this._currentConnection = null;
_this.emit('disconnect', {
err: err
});
return _this._connect();
})["catch"](function(err) {
/* !pragma coverage-skip-block */
return console.error("amqp-connection-manager: AmqpConnectionManager:_connect()" + " - How did you get here?", err.stack);
});
});
connection.on('close', function(err) {
_this._currentConnection = null;
_this.emit('disconnect', {
err: err
});
return wait(_this.reconnectTimeInSeconds * 1000).then(function() {
return _this._connect();
});
});
_this._connecting = false;
_this.emit('connect', {
connection: connection,
url: url
});
return null;
});
};
})(this))["catch"]((function(_this) {
return function(err) {
_this.emit('disconnect', {
err: err
});
_this._currentConnection = null;
return wait(_this.reconnectTimeInSeconds * 1000).then(function() {
_this._connecting = false;
return _this._connect();
});
};
})(this));
};
this._connecting = false;
this.emit('connect', {connection, url});
return null;
});
}).catch((err) => {
this.emit('disconnect', {err});
// Connection failed...
this._currentConnection = null;
// TODO: Probably want to try right away here, especially if there are multiple brokers to try...
return wait(this.reconnectTimeInSeconds * 1000).then(() => {
this._connecting = false;
return this._connect();
});
});
}
return AmqpConnectionManager;
};
})(EventEmitter);
module.exports = AmqpConnectionManager;
}).call(this);

@@ -1,12 +0,9 @@

// Generated by CoffeeScript 1.12.7
// Generated by CoffeeScript 2.2.4
(function() {
var ChannelWrapper, EventEmitter, Promise, _, pb, ref,
bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; },
extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; },
hasProp = {}.hasOwnProperty,
slice = [].slice;
boundMethodCheck = function(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new Error('Bound instance method accessed before binding'); } };
Promise = (ref = global.Promise) != null ? ref : require('es6-promise').Promise;
EventEmitter = require('events').EventEmitter;
({EventEmitter} = require('events'));

@@ -17,48 +14,74 @@ _ = require('lodash');

ChannelWrapper = (function(superClass) {
extend(ChannelWrapper, superClass);
ChannelWrapper = (function() {
// Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and
// are guaranteed to be delivered. If the underlying connection drops, ChannelWrapper will wait for a new
// connection and continue.
function ChannelWrapper(connectionManager, options) {
var ref1;
if (options == null) {
options = {};
// Events:
// * `connect` - emitted every time this channel connects or reconnects.
// * `error(err, {name})` - emitted if an error occurs setting up the channel.
// * `drop({message, err})` - called when a JSON message was dropped because it could not be encoded.
// * `close` - emitted when this channel closes via a call to `close()`
class ChannelWrapper extends EventEmitter {
// Create a new ChannelWrapper.
// * `options.name` is a name for this channel. Handy for debugging.
// * `options.setup` is a default setup function to call. See `addSetup` for details.
// * `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()`
// are plain JSON objects. These will be encoded automatically before being sent.
constructor(connectionManager, options = {}) {
var ref1;
super();
// Called whenever we connect to the broker.
this._onConnect = this._onConnect.bind(this);
// Wait for another reconnect to create a new channel.
// Called whenever we disconnect from the AMQP server.
this._onDisconnect = this._onDisconnect.bind(this);
this._connectionManager = connectionManager;
this.name = options.name;
this._json = (ref1 = options.json) != null ? ref1 : false;
// Place to store queued messages.
this._messages = [];
// True if the "worker" is busy sending messages. False if we need to start the worker to get stuff done.
this._working = false;
// If we're in the process of creating a channel, this is a Promise which will resolve when the channel is
// set up. Otherwise, this is `null`.
this._settingUp = null;
// The currently connected channel. Note that not all setup functions have been run on this channel until
// `@_settingUp` is either null or resolved.
this._channel = null;
// We kill off workers when we disconnect. Whenever we start a new worker, we bump up the `_workerNumber` -
// this makes it so if stale workers ever do wake up, they'll know to stop working.
this._workerNumber = 0;
// Array of setup functions to call.
this._setups = [];
if (options.setup != null) {
this._setups.push(options.setup);
}
if (connectionManager.isConnected()) {
this._onConnect({
connection: this._connectionManager._currentConnection
});
}
connectionManager.on('connect', this._onConnect);
connectionManager.on('disconnect', this._onDisconnect);
}
this._onDisconnect = bind(this._onDisconnect, this);
this._onConnect = bind(this._onConnect, this);
this._connectionManager = connectionManager;
this.name = options.name;
this._json = (ref1 = options.json) != null ? ref1 : false;
this._messages = [];
this._working = false;
this._settingUp = null;
this._channel = null;
this._workerNumber = 0;
this._setups = [];
if (options.setup != null) {
this._setups.push(options.setup);
}
if (connectionManager.isConnected()) {
this._onConnect({
connection: this._connectionManager._currentConnection
});
}
connectionManager.on('connect', this._onConnect);
connectionManager.on('disconnect', this._onDisconnect);
}
ChannelWrapper.prototype._onConnect = function(arg) {
var connection;
connection = arg.connection;
this._connection = connection;
return connection.createConfirmChannel().then((function(_this) {
return function(channel) {
_this._channel = channel;
channel.on('close', function() {
return _this._onChannelClose(channel);
_onConnect({connection}) {
boundMethodCheck(this, ChannelWrapper);
this._connection = connection;
return connection.createConfirmChannel().then((channel) => {
this._channel = channel;
channel.on('close', () => {
return this._onChannelClose(channel);
});
return _this._settingUp = Promise.all(_this._setups.map(function(setupFn) {
return pb.call(setupFn, null, channel)["catch"](function(err) {
if (_this._channel) {
return _this.emit('error', err, {
name: _this.name
return this._settingUp = Promise.all(this._setups.map((setupFn) => {
// TODO: Use a timeout here to guard against setupFns that never resolve?
return pb.call(setupFn, null, channel).catch((err) => {
if (this._channel) {
return this.emit('error', err, {
name: this.name
});

@@ -69,120 +92,93 @@ } else {

});
})).then(function() {
_this._settingUp = null;
return _this._channel;
// Don't emit an error if setups failed because the channel was closing.
})).then(() => {
this._settingUp = null;
return this._channel;
});
};
})(this)).then((function(_this) {
return function() {
if (_this._channel == null) {
}).then(() => {
if (this._channel == null) { // Can happen if channel closes while we're setting up.
return;
}
_this._startWorker();
return _this.emit('connect');
};
})(this))["catch"]((function(_this) {
return function(err) {
_this.emit('error', err, {
name: _this.name
// Since we just connected, publish any queued messages
this._startWorker();
return this.emit('connect');
}).catch((err) => {
this.emit('error', err, {
name: this.name
});
_this._settingUp = null;
return _this._channel = null;
};
})(this));
};
this._settingUp = null;
return this._channel = null;
});
}
ChannelWrapper.prototype._onChannelClose = function(channel) {
if (this._channel === channel) {
return this._channel = null;
// Called whenever the channel closes.
_onChannelClose(channel) {
if (this._channel === channel) {
return this._channel = null;
}
}
};
ChannelWrapper.prototype._onDisconnect = function() {
this._channel = null;
this._settingUp = null;
return this._working = false;
};
_onDisconnect() {
boundMethodCheck(this, ChannelWrapper);
this._channel = null;
this._settingUp = null;
// Kill off the current worker. We never get any kind of error for messages in flight - see
// https://github.com/squaremo/amqp.node/issues/191.
return this._working = false;
}
ChannelWrapper.prototype.addSetup = pb["break"](function(setup) {
return (this._settingUp || Promise.resolve()).then((function(_this) {
return function() {
_this._setups.push(setup);
if (_this._channel) {
return pb.call(setup, null, _this._channel);
}
};
})(this));
});
// Returns the number of unsent messages queued on this channel.
queueLength() {
return this._messages.length;
}
ChannelWrapper.prototype.removeSetup = pb["break"](function(setup, teardown) {
this._setups = _.without(this._setups, setup);
return (this._settingUp || Promise.resolve()).then((function(_this) {
return function() {
if (_this._channel) {
return pb.call(teardown, null, _this._channel);
}
};
})(this));
});
// Destroy this channel.
ChannelWrapper.prototype.queueLength = function() {
return this._messages.length;
};
// Any unsent messages will have their associated Promises rejected.
ChannelWrapper.prototype.close = function() {
return Promise.resolve().then((function(_this) {
return function() {
close() {
return Promise.resolve().then(() => {
var answer, ref1, ref2;
_this._working = false;
if (_this._messages.length !== 0) {
_this._messages.forEach(function(message) {
this._working = false;
if (this._messages.length !== 0) {
// Reject any unsent messages.
this._messages.forEach(function(message) {
return message.reject(new Error('Channel closed'));
});
}
_this._connectionManager.removeListener('connect', _this._onConnect);
_this._connectionManager.removeListener('disconnect', _this._onDisconnect);
answer = (ref1 = (ref2 = _this._channel) != null ? ref2.close() : void 0) != null ? ref1 : Promise.resolve();
_this._channel = null;
_this.emit('close');
this._connectionManager.removeListener('connect', this._onConnect);
this._connectionManager.removeListener('disconnect', this._onDisconnect);
answer = (ref1 = (ref2 = this._channel) != null ? ref2.close() : void 0) != null ? ref1 : Promise.resolve();
this._channel = null;
this.emit('close');
return answer;
};
})(this));
};
});
}
ChannelWrapper.prototype.waitForConnect = pb["break"](function() {
if (this._channel && !this._settingUp) {
return Promise.resolve();
} else {
return new Promise((function(_this) {
return function(resolve) {
return _this.once('connect', resolve);
};
})(this));
_shouldPublish() {
return (this._messages.length > 0) && !this._settingUp && this._channel;
}
});
ChannelWrapper.prototype._shouldPublish = function() {
return (this._messages.length > 0) && !this._settingUp && this._channel;
};
ChannelWrapper.prototype._startWorker = function() {
if (!this._working && this._shouldPublish()) {
this._working = true;
this._workerNumber++;
return this._publishQueuedMessages(this._workerNumber);
// Start publishing queued messages, if there isn't already a worker doing this.
_startWorker() {
if (!this._working && this._shouldPublish()) {
this._working = true;
this._workerNumber++;
return this._publishQueuedMessages(this._workerNumber);
}
}
};
ChannelWrapper.prototype._publishQueuedMessages = function(workerNumber) {
var channel, message;
if (!this._shouldPublish() || !this._working || (workerNumber !== this._workerNumber)) {
this._working = false;
return Promise.resolve();
}
channel = this._channel;
message = this._messages[0];
Promise.resolve().then((function(_this) {
return function() {
_publishQueuedMessages(workerNumber) {
var channel, message;
if (!this._shouldPublish() || !this._working || (workerNumber !== this._workerNumber)) {
// Can't publish anything right now...
this._working = false;
return Promise.resolve();
}
channel = this._channel;
message = this._messages[0];
Promise.resolve().then(() => {
var encodedMessage, sendPromise;
encodedMessage = _this._json ? new Buffer(JSON.stringify(message.content)) : message.content;
encodedMessage = this._json ? new Buffer(JSON.stringify(message.content)) : message.content;
sendPromise = (function() {

@@ -215,85 +211,142 @@ switch (message.type) {

default:
/* !pragma coverage-skip-block */
throw new Error("Unhandled message type " + message.type);
throw new Error(`Unhandled message type ${message.type}`);
}
})();
return sendPromise;
};
})(this)).then((function(_this) {
return function(result) {
_this._messages.shift();
}).then((result) => {
this._messages.shift();
message.resolve(result);
return _this._publishQueuedMessages(workerNumber);
};
})(this), (function(_this) {
return function(err) {
if (!_this._channel) {
// Send some more!
return this._publishQueuedMessages(workerNumber);
}, (err) => {
if (!this._channel) {
} else {
_this._messages.shift();
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the
// broker rejected the message. Either way, reject it back
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we
// reconnect.
this._messages.shift();
message.reject(err);
return _this._publishQueuedMessages(workerNumber);
// Send some more!
return this._publishQueuedMessages(workerNumber);
}
};
})(this))["catch"]((function(_this) {
return function(err) {
}).catch((err) => {
/* !pragma coverage-skip-block */
console.error("amqp-connection-manager: ChannelWrapper:_publishQueuedMessages() - How did you get here?", err.stack);
_this.emit('error', err);
return _this._working = false;
};
})(this));
return null;
};
this.emit('error', err);
return this._working = false;
});
return null;
}
ChannelWrapper.prototype.ack = function() {
var args, ref1;
args = 1 <= arguments.length ? slice.call(arguments, 0) : [];
return (ref1 = this._channel) != null ? ref1.ack.apply(ref1, args) : void 0;
};
// Send an `ack` to the underlying channel.
ack(...args) {
var ref1;
return (ref1 = this._channel) != null ? ref1.ack(...args) : void 0;
}
ChannelWrapper.prototype.nack = function() {
var args, ref1;
args = 1 <= arguments.length ? slice.call(arguments, 0) : [];
return (ref1 = this._channel) != null ? ref1.nack.apply(ref1, args) : void 0;
// Send a `nack` to the underlying channel.
nack(...args) {
var ref1;
return (ref1 = this._channel) != null ? ref1.nack(...args) : void 0;
}
};
ChannelWrapper.prototype.publish = pb["break"](function(exchange, routingKey, content, options) {
return new Promise((function(_this) {
return function(resolve, reject) {
_this._messages.push({
type: 'publish',
exchange: exchange,
routingKey: routingKey,
content: content,
options: options,
resolve: resolve,
reject: reject
});
return _this._startWorker();
};
})(this));
// Adds a new 'setup handler'.
// `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting
// exchanges and queues exists, and whatnot. The `channel` object here is a ConfigChannel from amqplib.
// The `setup` function should return a Promise (or optionally take a callback) - no messages will be sent until
// this Promise resolves.
// If there is a connection, `setup()` will be run immediately, and the addSetup Promise/callback won't resolve
// until `setup` is complete. Note that in this case, if the setup throws an error, no 'error' event will
// be emitted, since you can just handle the error here (although the `setup` will still be added for future
// reconnects, even if it throws an error.)
// Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error'
// event.
ChannelWrapper.prototype.addSetup = pb.break(function(setup) {
return (this._settingUp || Promise.resolve()).then(() => {
this._setups.push(setup);
if (this._channel) {
return pb.call(setup, null, this._channel);
}
});
});
ChannelWrapper.prototype.sendToQueue = pb["break"](function(queue, content, options) {
return new Promise((function(_this) {
return function(resolve, reject) {
_this._messages.push({
type: 'sendToQueue',
queue: queue,
content: content,
options: options,
resolve: resolve,
reject: reject
});
return _this._startWorker();
};
})(this));
// Remove a setup function added with `addSetup`. If there is currently connection, `teardown(channel, [cb])` will
// be run immediately, and the returned Promise will not resolve until it completes.
ChannelWrapper.prototype.removeSetup = pb.break(function(setup, teardown) {
this._setups = _.without(this._setups, setup);
return (this._settingUp || Promise.resolve()).then(() => {
if (this._channel) {
return pb.call(teardown, null, this._channel);
}
});
});
// Returns a Promise which resolves when this channel next connects.
// (Mainly here for unit testing...)
ChannelWrapper.prototype.waitForConnect = pb.break(function() {
if (this._channel && !this._settingUp) {
return Promise.resolve();
} else {
return new Promise((resolve) => {
return this.once('connect', resolve);
});
}
});
// Publish a message to the channel.
// This works just like amqplib's `publish()`, except if the channel is not connected, this will wait until the
// channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent.
// The returned promise will be rejected if `close()` is called on this channel before it can be sent, if
// `options.json` is set and the message can't be encoded, or if the broker rejects the message for some reason.
ChannelWrapper.prototype.publish = pb.break(function(exchange, routingKey, content, options) {
return new Promise((resolve, reject) => {
this._messages.push({
type: 'publish',
exchange,
routingKey,
content,
options,
resolve,
reject
});
return this._startWorker();
});
});
// Send a message to a queue.
// This works just like amqplib's `sendToQueue`, except if the channel is not connected, this will wait until the
// channel is connected. Returns a Promise which will only resolve when the message has been succesfully sent.
// The returned promise will be rejected only if `close()` is called on this channel before it can be sent.
// `message` here should be a JSON-able object.
ChannelWrapper.prototype.sendToQueue = pb.break(function(queue, content, options) {
return new Promise((resolve, reject) => {
this._messages.push({
type: 'sendToQueue',
queue,
content,
options,
resolve,
reject
});
return this._startWorker();
});
});
return ChannelWrapper;
})(EventEmitter);
}).call(this);

@@ -300,0 +353,0 @@ module.exports = ChannelWrapper;

@@ -1,2 +0,2 @@

// Generated by CoffeeScript 1.12.7
// Generated by CoffeeScript 2.2.4
(function() {

@@ -3,0 +3,0 @@ var Promise, ref;

@@ -1,2 +0,2 @@

// Generated by CoffeeScript 1.12.7
// Generated by CoffeeScript 2.2.4
(function() {

@@ -3,0 +3,0 @@ var AmqpConnectionManager;

{
"name": "amqp-connection-manager",
"version": "1.4.0",
"version": "1.4.1",
"description": "Auto-reconnect and round robin support for amqplib.",

@@ -20,8 +20,9 @@ "main": "lib/index.js",

"chai-string": "^1.1.2",
"coffee-coverage": "^2.0.1",
"coffee-script": "^1.9.3",
"coffee-coverage": "^3.0.0",
"coffeescript": "^2.2.4",
"coveralls": "^3.0.0",
"greenkeeper-lockfile": "^1.14.0",
"istanbul": "^0.4.0",
"mocha": "^4.0.1",
"proxyquire": "^1.7.0",
"mocha": "^5.1.1",
"proxyquire": "^2.0.1",
"sinon": "^4.0.1"

@@ -28,0 +29,0 @@ },

[![Build Status](https://travis-ci.org/benbria/node-amqp-connection-manager.svg?branch=master)](https://travis-ci.org/benbria/node-amqp-connection-manager)
[![Coverage Status](https://coveralls.io/repos/benbria/node-amqp-connection-manager/badge.svg?branch=master&service=github)](https://coveralls.io/github/benbria/node-amqp-connection-manager?branch=master)
[![Greenkeeper badge](https://badges.greenkeeper.io/benbria/node-amqp-connection-manager.svg)](https://greenkeeper.io/)

@@ -8,2 +9,3 @@ [![Dependency Status](https://david-dm.org/benbria/node-amqp-connection-manager.svg)](https://david-dm.org/benbria/node-amqp-connection-manager)

Connection management for amqplib.

@@ -10,0 +12,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc