You're Invited:Meet the Socket Team at BlackHat and DEF CON in Las Vegas, Aug 7-8.RSVP
Socket
Socket
Sign inDemoInstall

amqp-connection-manager

Package Overview
Dependencies
Maintainers
1
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.5.2 to 3.6.0

6

dist/cjs/AmqpConnectionManager.js

@@ -224,3 +224,3 @@ "use strict";

this.emit('disconnect', { err });
const handle = helpers_js_1.wait(this.reconnectTimeInSeconds * 1000);
const handle = (0, helpers_js_1.wait)(this.reconnectTimeInSeconds * 1000);
this._cancelRetriesHandler = handle.cancel;

@@ -249,6 +249,6 @@ handle.promise

if (err.name === 'OperationalError' && err.message === 'connect ETIMEDOUT') {
handle = helpers_js_1.wait(0);
handle = (0, helpers_js_1.wait)(0);
}
else {
handle = helpers_js_1.wait(this.reconnectTimeInSeconds * 1000);
handle = (0, helpers_js_1.wait)(this.reconnectTimeInSeconds * 1000);
}

@@ -255,0 +255,0 @@ this._cancelRetriesHandler = handle.cancel;

@@ -60,2 +60,4 @@ "use strict";

this._unconfirmedMessages = [];
/** Consumers which will be reconnected on channel errors etc. */
this._consumers = [];
/**

@@ -83,2 +85,3 @@ * True if the "worker" is busy sending messages. False if we need to

this._setups = [];
this._consumers = [];
if (options.setup) {

@@ -233,3 +236,7 @@ this._setups.push(options.setup);

this.emit('error', err, { name: this.name });
}))).then(() => {
})))
.then(() => {
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c)));
})
.then(() => {
this._settingUp = undefined;

@@ -416,2 +423,68 @@ });

}
/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
async consume(queue, onMessage, options = {}) {
const consumer = {
consumerTag: null,
queue,
onMessage,
options,
};
this._consumers.push(consumer);
await this._consume(consumer);
}
async _consume(consumer) {
if (!this._channel) {
return;
}
const { prefetch, ...options } = consumer.options;
if (typeof prefetch === 'number') {
this._channel.prefetch(prefetch, false);
}
const { consumerTag } = await this._channel.consume(consumer.queue, (msg) => {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
// Ignore errors caused by queue not declared. In
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
});
return;
}
consumer.onMessage(msg);
}, options);
consumer.consumerTag = consumerTag;
}
async _reconnectConsumer(consumer) {
if (!this._consumers.includes(consumer)) {
// Intentionally canceled
return;
}
await this._consume(consumer);
}
/**
* Cancel all consumers
*/
async cancelAll() {
const consumers = this._consumers;
this._consumers = [];
if (!this._channel) {
return;
}
const channel = this._channel;
await Promise.all(consumers.reduce((acc, consumer) => {
if (consumer.consumerTag) {
acc.push(channel.cancel(consumer.consumerTag));
}
return acc;
}, []));
}
/** Send an `ack` to the underlying channel. */

@@ -418,0 +491,0 @@ ack(message, allUpTo) {

@@ -22,2 +22,11 @@ /// <reference types="node" />

}
interface ConsumerOptions extends amqplib.Options.Consume {
prefetch?: number;
}
interface Consumer {
consumerTag: string | null;
queue: string;
onMessage: (msg: amqplib.ConsumeMessage) => void;
options: ConsumerOptions;
}
/**

@@ -49,2 +58,4 @@ * Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and

private _irrecoverableCode;
/** Consumers which will be reconnected on channel errors etc. */
private _consumers;
/**

@@ -174,2 +185,13 @@ * The currently connected channel. Note that not all setup functions

private _publishQueuedMessages;
/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
consume(queue: string, onMessage: Consumer['onMessage'], options?: ConsumerOptions): Promise<void>;
private _consume;
private _reconnectConsumer;
/**
* Cancel all consumers
*/
cancelAll(): Promise<void>;
/** Send an `ack` to the underlying channel. */

@@ -198,1 +220,2 @@ ack(message: amqplib.Message, allUpTo?: boolean): void;

}
export {};

@@ -55,2 +55,4 @@ import { EventEmitter } from 'events';

this._unconfirmedMessages = [];
/** Consumers which will be reconnected on channel errors etc. */
this._consumers = [];
/**

@@ -78,2 +80,3 @@ * True if the "worker" is busy sending messages. False if we need to

this._setups = [];
this._consumers = [];
if (options.setup) {

@@ -228,3 +231,7 @@ this._setups.push(options.setup);

this.emit('error', err, { name: this.name });
}))).then(() => {
})))
.then(() => {
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c)));
})
.then(() => {
this._settingUp = undefined;

@@ -411,2 +418,68 @@ });

}
/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
async consume(queue, onMessage, options = {}) {
const consumer = {
consumerTag: null,
queue,
onMessage,
options,
};
this._consumers.push(consumer);
await this._consume(consumer);
}
async _consume(consumer) {
if (!this._channel) {
return;
}
const { prefetch, ...options } = consumer.options;
if (typeof prefetch === 'number') {
this._channel.prefetch(prefetch, false);
}
const { consumerTag } = await this._channel.consume(consumer.queue, (msg) => {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
// Ignore errors caused by queue not declared. In
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
});
return;
}
consumer.onMessage(msg);
}, options);
consumer.consumerTag = consumerTag;
}
async _reconnectConsumer(consumer) {
if (!this._consumers.includes(consumer)) {
// Intentionally canceled
return;
}
await this._consume(consumer);
}
/**
* Cancel all consumers
*/
async cancelAll() {
const consumers = this._consumers;
this._consumers = [];
if (!this._channel) {
return;
}
const channel = this._channel;
await Promise.all(consumers.reduce((acc, consumer) => {
if (consumer.consumerTag) {
acc.push(channel.cancel(consumer.consumerTag));
}
return acc;
}, []));
}
/** Send an `ack` to the underlying channel. */

@@ -413,0 +486,0 @@ ack(message, allUpTo) {

{
"name": "amqp-connection-manager",
"version": "3.5.2",
"version": "3.6.0",
"description": "Auto-reconnect and round robin support for amqplib.",

@@ -5,0 +5,0 @@ "module": "./dist/esm/index.js",

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc