You're Invited:Meet the Socket Team at BlackHat and DEF CON in Las Vegas, Aug 7-8.RSVP
Socket
Socket
Sign inDemoInstall

amqp-connection-manager

Package Overview
Dependencies
Maintainers
1
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.5.1 to 3.5.2

186

dist/cjs/ChannelWrapper.js

@@ -8,2 +8,3 @@ "use strict";

const promise_breaker_1 = __importDefault(require("promise-breaker"));
const MAX_MESSAGES_PER_BATCH = 1000;
const IRRECOVERABLE_ERRORS = [

@@ -71,2 +72,6 @@ 403,

this._workerNumber = 0;
/**
* True if the underlying channel has room for more messages.
*/
this._channelHasRoom = true;
this._onConnect = this._onConnect.bind(this);

@@ -217,3 +222,5 @@ this._onDisconnect = this._onDisconnect.bind(this);

this._channel = channel;
this._channelHasRoom = true;
channel.on('close', () => this._onChannelClose(channel));
channel.on('drain', () => this._onChannelDrain());
this._settingUp = Promise.all(this._setups.map((setupFn) =>

@@ -235,9 +242,2 @@ // TODO: Use a timeout here to guard against setupFns that never resolve?

}
if (this._unconfirmedMessages.length > 0) {
// requeue any messages that were left unconfirmed when connection was lost
let message;
while ((message = this._unconfirmedMessages.shift())) {
this._messages.push(message);
}
}
// Since we just connected, publish any queued messages

@@ -260,2 +260,7 @@ this._startWorker();

}
/** Called whenever the channel drains. */
_onChannelDrain() {
this._channelHasRoom = true;
this._startWorker();
}
// Called whenever we disconnect from the AMQP server.

@@ -298,3 +303,3 @@ _onDisconnect(ex) {

_shouldPublish() {
return this._messages.length > 0 && !this._settingUp && !!this._channel;
return (this._messages.length > 0 && !this._settingUp && !!this._channel && this._channelHasRoom);
}

@@ -313,2 +318,40 @@ // Start publishing queued messages, if there isn't already a worker doing this.

}
_messageResolved(message, result) {
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.resolve(result);
}
_messageRejected(message, err) {
if (!this._channel && this._canWaitReconnection()) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when
// we reconnect.
removeUnconfirmedMessage(this._unconfirmedMessages, message);
this._messages.push(message);
}
else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be
// the broker rejected the message. Either way, reject it back
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.reject(err);
}
}
_getEncodedMessage(content) {
let encodedMessage;
if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(content));
}
else if (typeof content === 'string') {
encodedMessage = Buffer.from(content);
}
else if (content instanceof Buffer) {
encodedMessage = content;
}
else if (typeof content === 'object' && typeof content.toString === 'function') {
encodedMessage = Buffer.from(content.toString());
}
else {
console.warn('amqp-connection-manager: Sending JSON message, but json option not speicifed');
encodedMessage = Buffer.from(JSON.stringify(content));
}
return encodedMessage;
}
_publishQueuedMessages(workerNumber) {

@@ -324,85 +367,54 @@ const channel = this._channel;

}
const message = this._messages.shift();
if (message) {
this._unconfirmedMessages.push(message);
Promise.resolve()
.then(() => {
let encodedMessage;
if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(message.content));
try {
// Send messages in batches of 1000 - don't want to starve the event loop.
let sendsLeft = MAX_MESSAGES_PER_BATCH;
while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) {
sendsLeft--;
const message = this._messages.shift();
if (!message) {
break;
}
else if (typeof message.content === 'string') {
encodedMessage = Buffer.from(message.content);
}
else if (message.content instanceof Buffer) {
encodedMessage = message.content;
}
else if (typeof message.content === 'object' &&
typeof message.content.toString === 'function') {
encodedMessage = Buffer.from(message.content.toString());
}
else {
throw new Error('Invalid message content');
}
let result = true;
const sendPromise = (() => {
switch (message.type) {
case 'publish':
return new Promise(function (resolve, reject) {
result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => {
if (err) {
reject(err);
}
else {
resolve(result);
}
});
});
case 'sendToQueue':
return new Promise(function (resolve, reject) {
result = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => {
if (err) {
reject(err);
}
else {
resolve(result);
}
});
});
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${message.type}`);
this._unconfirmedMessages.push(message);
const encodedMessage = this._getEncodedMessage(message.content);
switch (message.type) {
case 'publish': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => {
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
break;
}
})();
if (result) {
this._publishQueuedMessages(workerNumber);
case 'sendToQueue': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => {
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
break;
}
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${message.type}`);
}
else {
channel.once('drain', () => this._publishQueuedMessages(workerNumber));
}
return sendPromise;
})
.then((result) => {
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.resolve(result);
}, (err) => {
if (!this._channel && this._canWaitReconnection()) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when
// we reconnect.
removeUnconfirmedMessage(this._unconfirmedMessages, message);
this._messages.push(message);
}
else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be
// the broker rejected the message. Either way, reject it back
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.reject(err);
}
})
.catch(
/* istanbul ignore next */ (err) => {
this.emit('error', err);
this._working = false;
});
}
// If we didn't send all the messages, send some more...
if (this._channelHasRoom && this._messages.length > 0) {
setImmediate(() => this._publishQueuedMessages(workerNumber));
}
this._working = false;
/* istanbul ignore next */
}
catch (err) {
this._working = false;
this.emit('error', err);
}
}

@@ -409,0 +421,0 @@ /** Send an `ack` to the underlying channel. */

@@ -65,2 +65,6 @@ /// <reference types="node" />

private _workerNumber;
/**
* True if the underlying channel has room for more messages.
*/
private _channelHasRoom;
name?: string;

@@ -157,2 +161,4 @@ addListener(event: string, listener: (...args: any[]) => void): this;

private _onChannelClose;
/** Called whenever the channel drains. */
private _onChannelDrain;
private _onDisconnect;

@@ -164,2 +170,5 @@ queueLength(): number;

private _canWaitReconnection;
private _messageResolved;
private _messageRejected;
private _getEncodedMessage;
private _publishQueuedMessages;

@@ -166,0 +175,0 @@ /** Send an `ack` to the underlying channel. */

import { EventEmitter } from 'events';
import pb from 'promise-breaker';
const MAX_MESSAGES_PER_BATCH = 1000;
const IRRECOVERABLE_ERRORS = [

@@ -65,2 +66,6 @@ 403,

this._workerNumber = 0;
/**
* True if the underlying channel has room for more messages.
*/
this._channelHasRoom = true;
this._onConnect = this._onConnect.bind(this);

@@ -211,3 +216,5 @@ this._onDisconnect = this._onDisconnect.bind(this);

this._channel = channel;
this._channelHasRoom = true;
channel.on('close', () => this._onChannelClose(channel));
channel.on('drain', () => this._onChannelDrain());
this._settingUp = Promise.all(this._setups.map((setupFn) =>

@@ -229,9 +236,2 @@ // TODO: Use a timeout here to guard against setupFns that never resolve?

}
if (this._unconfirmedMessages.length > 0) {
// requeue any messages that were left unconfirmed when connection was lost
let message;
while ((message = this._unconfirmedMessages.shift())) {
this._messages.push(message);
}
}
// Since we just connected, publish any queued messages

@@ -254,2 +254,7 @@ this._startWorker();

}
/** Called whenever the channel drains. */
_onChannelDrain() {
this._channelHasRoom = true;
this._startWorker();
}
// Called whenever we disconnect from the AMQP server.

@@ -292,3 +297,3 @@ _onDisconnect(ex) {

_shouldPublish() {
return this._messages.length > 0 && !this._settingUp && !!this._channel;
return (this._messages.length > 0 && !this._settingUp && !!this._channel && this._channelHasRoom);
}

@@ -307,2 +312,40 @@ // Start publishing queued messages, if there isn't already a worker doing this.

}
_messageResolved(message, result) {
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.resolve(result);
}
_messageRejected(message, err) {
if (!this._channel && this._canWaitReconnection()) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when
// we reconnect.
removeUnconfirmedMessage(this._unconfirmedMessages, message);
this._messages.push(message);
}
else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be
// the broker rejected the message. Either way, reject it back
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.reject(err);
}
}
_getEncodedMessage(content) {
let encodedMessage;
if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(content));
}
else if (typeof content === 'string') {
encodedMessage = Buffer.from(content);
}
else if (content instanceof Buffer) {
encodedMessage = content;
}
else if (typeof content === 'object' && typeof content.toString === 'function') {
encodedMessage = Buffer.from(content.toString());
}
else {
console.warn('amqp-connection-manager: Sending JSON message, but json option not speicifed');
encodedMessage = Buffer.from(JSON.stringify(content));
}
return encodedMessage;
}
_publishQueuedMessages(workerNumber) {

@@ -318,85 +361,54 @@ const channel = this._channel;

}
const message = this._messages.shift();
if (message) {
this._unconfirmedMessages.push(message);
Promise.resolve()
.then(() => {
let encodedMessage;
if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(message.content));
try {
// Send messages in batches of 1000 - don't want to starve the event loop.
let sendsLeft = MAX_MESSAGES_PER_BATCH;
while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) {
sendsLeft--;
const message = this._messages.shift();
if (!message) {
break;
}
else if (typeof message.content === 'string') {
encodedMessage = Buffer.from(message.content);
}
else if (message.content instanceof Buffer) {
encodedMessage = message.content;
}
else if (typeof message.content === 'object' &&
typeof message.content.toString === 'function') {
encodedMessage = Buffer.from(message.content.toString());
}
else {
throw new Error('Invalid message content');
}
let result = true;
const sendPromise = (() => {
switch (message.type) {
case 'publish':
return new Promise(function (resolve, reject) {
result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => {
if (err) {
reject(err);
}
else {
resolve(result);
}
});
});
case 'sendToQueue':
return new Promise(function (resolve, reject) {
result = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => {
if (err) {
reject(err);
}
else {
resolve(result);
}
});
});
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${message.type}`);
this._unconfirmedMessages.push(message);
const encodedMessage = this._getEncodedMessage(message.content);
switch (message.type) {
case 'publish': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => {
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
break;
}
})();
if (result) {
this._publishQueuedMessages(workerNumber);
case 'sendToQueue': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => {
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
break;
}
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${message.type}`);
}
else {
channel.once('drain', () => this._publishQueuedMessages(workerNumber));
}
return sendPromise;
})
.then((result) => {
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.resolve(result);
}, (err) => {
if (!this._channel && this._canWaitReconnection()) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when
// we reconnect.
removeUnconfirmedMessage(this._unconfirmedMessages, message);
this._messages.push(message);
}
else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be
// the broker rejected the message. Either way, reject it back
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.reject(err);
}
})
.catch(
/* istanbul ignore next */ (err) => {
this.emit('error', err);
this._working = false;
});
}
// If we didn't send all the messages, send some more...
if (this._channelHasRoom && this._messages.length > 0) {
setImmediate(() => this._publishQueuedMessages(workerNumber));
}
this._working = false;
/* istanbul ignore next */
}
catch (err) {
this._working = false;
this.emit('error', err);
}
}

@@ -403,0 +415,0 @@ /** Send an `ack` to the underlying channel. */

{
"name": "amqp-connection-manager",
"version": "3.5.1",
"version": "3.5.2",
"description": "Auto-reconnect and round robin support for amqplib.",

@@ -5,0 +5,0 @@ "module": "./dist/esm/index.js",

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc