You're Invited:Meet the Socket Team at BlackHat and DEF CON in Las Vegas, Aug 7-8.RSVP
Socket
Socket
Sign inDemoInstall

amqp-connection-manager

Package Overview
Dependencies
Maintainers
1
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.6.0 to 3.7.0

34

dist/cjs/AmqpConnectionManager.js

@@ -75,3 +75,37 @@ "use strict";

this._findServers = options.findServers || (() => Promise.resolve(urls));
}
/**
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the
* reconnect attempts will continue in the background.
* @param [options={}] -
* @param [options.timeout] - Time to wait for initial connect
*/
async connect({ timeout } = {}) {
this._connect();
let reject;
const onDisconnect = ({ err }) => {
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials.
if (err.isOperational) {
reject(err);
}
};
try {
await Promise.race([
(0, events_1.once)(this, 'connect'),
new Promise((_resolve, innerReject) => {
reject = innerReject;
this.on('disconnect', onDisconnect);
}),
...(timeout
? [
(0, helpers_js_1.wait)(timeout).promise.then(() => {
throw new Error('amqp-connection-manager: connect timeout');
}),
]
: []),
]);
}
finally {
this.removeListener('disconnect', onDisconnect);
}
}

@@ -78,0 +112,0 @@ // `options` here are any options that can be passed to ChannelWrapper.

64

dist/cjs/ChannelWrapper.js

@@ -81,2 +81,3 @@ "use strict";

this.name = options.name;
this._publishTimeout = options.publishTimeout;
this._json = (_a = options.json) !== null && _a !== void 0 ? _a : false;

@@ -184,3 +185,4 @@ // Array of setup functions to call.

return promise_breaker_1.default.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
const { timeout, ...opts } = options || {};
this._enqueueMessage({
type: 'publish',

@@ -190,6 +192,7 @@ exchange,

content,
options,
resolve,
reject,
});
options: opts,
isTimedout: false,
}, timeout || this._publishTimeout);
this._startWorker();

@@ -209,13 +212,34 @@ }));

return promise_breaker_1.default.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
const { timeout, ...opts } = options || {};
this._enqueueMessage({
type: 'sendToQueue',
queue,
content,
options,
resolve,
reject,
});
return this._startWorker();
options: opts,
isTimedout: false,
}, timeout || this._publishTimeout);
this._startWorker();
}));
}
_enqueueMessage(message, timeout) {
if (timeout) {
message.timeout = setTimeout(() => {
let idx = this._messages.indexOf(message);
if (idx !== -1) {
this._messages.splice(idx, 1);
}
else {
idx = this._unconfirmedMessages.indexOf(message);
if (idx !== -1) {
this._unconfirmedMessages.splice(idx, 1);
}
}
message.isTimedout = true;
message.reject(new Error('timeout'));
}, timeout);
}
this._messages.push(message);
}
// Called whenever we connect to the broker.

@@ -294,7 +318,17 @@ async _onConnect({ connection }) {

// Reject any unsent messages.
this._messages.forEach((message) => message.reject(new Error('Channel closed')));
this._messages.forEach((message) => {
if (message.timeout) {
clearTimeout(message.timeout);
}
message.reject(new Error('Channel closed'));
});
}
if (this._unconfirmedMessages.length !== 0) {
// Reject any unconfirmed messages.
this._unconfirmedMessages.forEach((message) => message.reject(new Error('Channel closed')));
this._unconfirmedMessages.forEach((message) => {
if (message.timeout) {
clearTimeout(message.timeout);
}
message.reject(new Error('Channel closed'));
});
}

@@ -387,2 +421,8 @@ this._connectionManager.removeListener('connect', this._onConnect);

thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {

@@ -400,2 +440,8 @@ this._messageRejected(message, err);

thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {

@@ -402,0 +448,0 @@ this._messageRejected(message, err);

9

dist/cjs/index.js

@@ -6,6 +6,11 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.connect = void 0;
exports.AmqpConnectionManagerClass = exports.connect = void 0;
const AmqpConnectionManager_js_1 = __importDefault(require("./AmqpConnectionManager.js"));
exports.AmqpConnectionManagerClass = AmqpConnectionManager_js_1.default;
function connect(urls, options) {
return new AmqpConnectionManager_js_1.default(urls, options);
const conn = new AmqpConnectionManager_js_1.default(urls, options);
conn.connect().catch(() => {
/* noop */
});
return conn;
}

@@ -12,0 +17,0 @@ exports.connect = connect;

@@ -62,2 +62,3 @@ /// <reference types="node" />

}) => void): this;
listeners(eventName: string | symbol): Function[];
on(event: string, listener: (...args: any[]) => void): this;

@@ -100,2 +101,6 @@ on(event: 'connect', listener: ConnectListener): this;

removeListener(event: string, listener: (...args: any[]) => void): this;
connect(options?: {
timeout?: number;
}): Promise<void>;
reconnect(): void;
createChannel(options?: CreateChannelOpts): ChannelWrapper;

@@ -105,5 +110,5 @@ close(): Promise<void>;

/** The current connection. */
get connection(): Connection | undefined;
readonly connection: Connection | undefined;
/** Returns the number of registered channels. */
get channelCount(): number;
readonly channelCount: number;
}

@@ -154,2 +159,11 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqpConnectionManager {

constructor(urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions);
/**
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the
* reconnect attempts will continue in the background.
* @param [options={}] -
* @param [options.timeout] - Time to wait for initial connect
*/
connect({ timeout }?: {
timeout?: number;
}): Promise<void>;
createChannel(options?: CreateChannelOpts): ChannelWrapper;

@@ -156,0 +170,0 @@ close(): Promise<void>;

import amqp from 'amqplib';
import { EventEmitter } from 'events';
import { EventEmitter, once } from 'events';
import pb from 'promise-breaker';

@@ -70,3 +70,37 @@ import { URL } from 'url';

this._findServers = options.findServers || (() => Promise.resolve(urls));
}
/**
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the
* reconnect attempts will continue in the background.
* @param [options={}] -
* @param [options.timeout] - Time to wait for initial connect
*/
async connect({ timeout } = {}) {
this._connect();
let reject;
const onDisconnect = ({ err }) => {
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials.
if (err.isOperational) {
reject(err);
}
};
try {
await Promise.race([
once(this, 'connect'),
new Promise((_resolve, innerReject) => {
reject = innerReject;
this.on('disconnect', onDisconnect);
}),
...(timeout
? [
wait(timeout).promise.then(() => {
throw new Error('amqp-connection-manager: connect timeout');
}),
]
: []),
]);
}
finally {
this.removeListener('disconnect', onDisconnect);
}
}

@@ -73,0 +107,0 @@ // `options` here are any options that can be passed to ChannelWrapper.

@@ -21,3 +21,11 @@ /// <reference types="node" />

json?: boolean;
/**
* Default publish timeout in ms. Messages not published within the given time are rejected with a timeout error.
*/
publishTimeout?: number;
}
interface PublishOptions extends Options.Publish {
/** Message will be rejected after timeout ms */
timeout?: number;
}
interface ConsumerOptions extends amqplib.Options.Consume {

@@ -81,2 +89,6 @@ prefetch?: number;

private _channelHasRoom;
/**
* Default publish timeout
*/
private _publishTimeout?;
name?: string;

@@ -145,3 +157,3 @@ addListener(event: string, listener: (...args: any[]) => void): this;

*/
removeSetup(setup: SetupFunc, teardown?: pb.Callback<void>, done?: pb.Callback<void>): Promise<void>;
removeSetup(setup: SetupFunc, teardown?: SetupFunc, done?: pb.Callback<void>): Promise<void>;
/**

@@ -155,4 +167,5 @@ * Returns a Promise which resolves when this channel next connects.

waitForConnect(done?: pb.Callback<void>): Promise<void>;
publish(exchange: string, routingKey: string, content: Buffer | string | unknown, options?: amqplib.Options.Publish, done?: pb.Callback<boolean>): Promise<boolean>;
sendToQueue(queue: string, content: Buffer | string | unknown, options?: Options.Publish, done?: pb.Callback<boolean>): Promise<boolean>;
publish(exchange: string, routingKey: string, content: Buffer | string | unknown, options?: PublishOptions, done?: pb.Callback<boolean>): Promise<boolean>;
sendToQueue(queue: string, content: Buffer | string | unknown, options?: PublishOptions, done?: pb.Callback<boolean>): Promise<boolean>;
private _enqueueMessage;
/**

@@ -159,0 +172,0 @@ * Create a new ChannelWrapper.

@@ -76,2 +76,3 @@ import { EventEmitter } from 'events';

this.name = options.name;
this._publishTimeout = options.publishTimeout;
this._json = (_a = options.json) !== null && _a !== void 0 ? _a : false;

@@ -179,3 +180,4 @@ // Array of setup functions to call.

return pb.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
const { timeout, ...opts } = options || {};
this._enqueueMessage({
type: 'publish',

@@ -185,6 +187,7 @@ exchange,

content,
options,
resolve,
reject,
});
options: opts,
isTimedout: false,
}, timeout || this._publishTimeout);
this._startWorker();

@@ -204,13 +207,34 @@ }));

return pb.addCallback(done, new Promise((resolve, reject) => {
this._messages.push({
const { timeout, ...opts } = options || {};
this._enqueueMessage({
type: 'sendToQueue',
queue,
content,
options,
resolve,
reject,
});
return this._startWorker();
options: opts,
isTimedout: false,
}, timeout || this._publishTimeout);
this._startWorker();
}));
}
_enqueueMessage(message, timeout) {
if (timeout) {
message.timeout = setTimeout(() => {
let idx = this._messages.indexOf(message);
if (idx !== -1) {
this._messages.splice(idx, 1);
}
else {
idx = this._unconfirmedMessages.indexOf(message);
if (idx !== -1) {
this._unconfirmedMessages.splice(idx, 1);
}
}
message.isTimedout = true;
message.reject(new Error('timeout'));
}, timeout);
}
this._messages.push(message);
}
// Called whenever we connect to the broker.

@@ -289,7 +313,17 @@ async _onConnect({ connection }) {

// Reject any unsent messages.
this._messages.forEach((message) => message.reject(new Error('Channel closed')));
this._messages.forEach((message) => {
if (message.timeout) {
clearTimeout(message.timeout);
}
message.reject(new Error('Channel closed'));
});
}
if (this._unconfirmedMessages.length !== 0) {
// Reject any unconfirmed messages.
this._unconfirmedMessages.forEach((message) => message.reject(new Error('Channel closed')));
this._unconfirmedMessages.forEach((message) => {
if (message.timeout) {
clearTimeout(message.timeout);
}
message.reject(new Error('Channel closed'));
});
}

@@ -382,2 +416,8 @@ this._connectionManager.removeListener('connect', this._onConnect);

thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {

@@ -395,2 +435,8 @@ this._messageRejected(message, err);

thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {

@@ -397,0 +443,0 @@ this._messageRejected(message, err);

@@ -1,5 +0,6 @@

import { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager } from './AmqpConnectionManager.js';
import AmqpConnectionManager, { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager } from './AmqpConnectionManager.js';
export type { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager as AmqpConnectionManager, } from './AmqpConnectionManager.js';
export type { CreateChannelOpts, default as ChannelWrapper, SetupFunc } from './ChannelWrapper.js';
export declare function connect(urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions): IAmqpConnectionManager;
export { AmqpConnectionManager as AmqpConnectionManagerClass };
declare const amqp: {

@@ -6,0 +7,0 @@ connect: typeof connect;

import AmqpConnectionManager from './AmqpConnectionManager.js';
export function connect(urls, options) {
return new AmqpConnectionManager(urls, options);
const conn = new AmqpConnectionManager(urls, options);
conn.connect().catch(() => {
/* noop */
});
return conn;
}
export { AmqpConnectionManager as AmqpConnectionManagerClass };
const amqp = { connect };
export default amqp;
//# sourceMappingURL=index.js.map
{
"name": "amqp-connection-manager",
"version": "3.6.0",
"version": "3.7.0",
"description": "Auto-reconnect and round robin support for amqplib.",

@@ -19,5 +19,3 @@ "module": "./dist/esm/index.js",

"dependencies": {
"promise-breaker": "^5.0.0",
"ts-node": "^10.2.1",
"typescript": "^4.3.5"
"promise-breaker": "^5.0.0"
},

@@ -63,3 +61,5 @@ "peerDependencies": {

"semantic-release": "^17.1.1",
"ts-jest": "^27.0.5"
"ts-jest": "^27.0.5",
"ts-node": "^10.2.1",
"typescript": "^4.3.5"
},

@@ -66,0 +66,0 @@ "engines": {

@@ -5,9 +5,4 @@ # amqp-connection-manager

![Build Status](https://github.com/jwalton/node-amqp-connection-manager/workflows/GitHub%20CI/badge.svg)
[![Coverage Status](https://coveralls.io/repos/jwalton/node-amqp-connection-manager/badge.svg?branch=master&service=github)](https://coveralls.io/github/jwalton/node-amqp-connection-manager?branch=master)
[![semantic-release](https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg)](https://github.com/semantic-release/semantic-release)
[![Dependency Status](https://david-dm.org/jwalton/node-amqp-connection-manager.svg)](https://david-dm.org/jwalton/node-amqp-connection-manager)
[![devDependency Status](https://david-dm.org/jwalton/node-amqp-connection-manager/dev-status.svg)](https://david-dm.org/jwalton/node-amqp-connection-manager#info=devDependencies)
[![peerDependency Status](https://david-dm.org/jwalton/node-amqp-connection-manager/peer-status.svg)](https://david-dm.org/jwalton/node-amqp-connection-manager#info=peerDependencies)
Connection management for amqplib. This is a wrapper around [amqplib](http://www.squaremobius.net/amqp.node/) which provides automatic reconnects.

@@ -143,2 +138,3 @@

are plain JSON objects. These will be encoded automatically before being sent.
- `options.publishTimeout` - a default timeout for messages published to this channel.

@@ -188,2 +184,8 @@ ### AmqpConnectionManager#isConnected()

Both of these functions take an additional option when passing options:
- `timeout` - If specified, if a messages is not acked by the amqp broker within the specified number of milliseconds,
the message will be rejected. Note that the message _may_ still end up getting delivered after the timeout, as we
have no way to cancel the in-flight request.
### ChannelWrapper#ack and ChannelWrapper#nack

@@ -190,0 +192,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc