Socket
Socket
Sign inDemoInstall

amqp-connection-manager

Package Overview
Dependencies
16
Maintainers
1
Versions
67
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.7.0 to 3.8.0

91

dist/cjs/ChannelWrapper.js

@@ -50,3 +50,3 @@ "use strict";

constructor(connectionManager, options = {}) {
var _a;
var _a, _b;
super();

@@ -64,2 +64,6 @@ /** If we're in the process of creating a channel, this is a Promise which

/**
* True to create a ConfirmChannel. False to create a regular Channel.
*/
this._confirm = true;
/**
* True if the "worker" is busy sending messages. False if we need to

@@ -82,5 +86,6 @@ * start the worker to get stuff done.

this._connectionManager = connectionManager;
this._confirm = (_a = options.confirm) !== null && _a !== void 0 ? _a : true;
this.name = options.name;
this._publishTimeout = options.publishTimeout;
this._json = (_a = options.json) !== null && _a !== void 0 ? _a : false;
this._json = (_b = options.json) !== null && _b !== void 0 ? _b : false;
// Array of setup functions to call.

@@ -192,3 +197,3 @@ this._setups = [];

routingKey,
content,
content: this._getEncodedMessage(content),
resolve,

@@ -212,2 +217,3 @@ reject,

sendToQueue(queue, content, options, done) {
const encodedContent = this._getEncodedMessage(content);
return promise_breaker_1.default.addCallback(done, new Promise((resolve, reject) => {

@@ -218,3 +224,3 @@ const { timeout, ...opts } = options || {};

queue,
content,
content: encodedContent,
resolve,

@@ -251,3 +257,9 @@ reject,

try {
const channel = await connection.createConfirmChannel();
let channel;
if (this._confirm) {
channel = await connection.createConfirmChannel();
}
else {
channel = await connection.createChannel();
}
this._channel = channel;

@@ -417,39 +429,56 @@ this._channelHasRoom = true;

}
this._unconfirmedMessages.push(message);
const encodedMessage = this._getEncodedMessage(message.content);
let thisCanSend = true;
switch (message.type) {
case 'publish': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (this._confirm) {
this._unconfirmedMessages.push(message);
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, message.content, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
}
else {
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, message.content, message.options);
message.resolve(thisCanSend);
}
break;
}
case 'sendToQueue': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (this._confirm) {
this._unconfirmedMessages.push(message);
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, message.content, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
}
else {
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, message.content, message.options);
message.resolve(thisCanSend);
}
break;

@@ -456,0 +485,0 @@ }

/// <reference types="node" />
import * as amqplib from 'amqplib';
import { ConfirmChannel, Options } from 'amqplib';
import type * as amqplib from 'amqplib';
import { Options } from 'amqplib';
import { EventEmitter } from 'events';
import pb from 'promise-breaker';
import { IAmqpConnectionManager } from './AmqpConnectionManager.js';
export declare type SetupFunc = ((channel: ConfirmChannel, callback: (error?: Error) => void) => void) | ((channel: ConfirmChannel) => Promise<void>);
export declare type Channel = amqplib.ConfirmChannel | amqplib.Channel;
export declare type SetupFunc = ((channel: Channel, callback: (error?: Error) => void) => void) | ((channel: Channel) => Promise<void>) | ((channel: amqplib.ConfirmChannel, callback: (error?: Error) => void) => void) | ((channel: amqplib.ConfirmChannel) => Promise<void>);
export interface CreateChannelOpts {

@@ -17,2 +18,6 @@ /** Name for this channel. Used for debugging. */

/**
* True to create a ConfirmChannel (default). False to create a regular Channel.
*/
confirm?: boolean;
/**
* if true, then ChannelWrapper assumes all messages passed to publish() and sendToQueue() are plain JSON objects.

@@ -75,2 +80,6 @@ * These will be encoded automatically before being sent.

/**
* True to create a ConfirmChannel. False to create a regular Channel.
*/
private _confirm;
/**
* True if the "worker" is busy sending messages. False if we need to

@@ -77,0 +86,0 @@ * start the worker to get stuff done.

@@ -45,3 +45,3 @@ import { EventEmitter } from 'events';

constructor(connectionManager, options = {}) {
var _a;
var _a, _b;
super();

@@ -59,2 +59,6 @@ /** If we're in the process of creating a channel, this is a Promise which

/**
* True to create a ConfirmChannel. False to create a regular Channel.
*/
this._confirm = true;
/**
* True if the "worker" is busy sending messages. False if we need to

@@ -77,5 +81,6 @@ * start the worker to get stuff done.

this._connectionManager = connectionManager;
this._confirm = (_a = options.confirm) !== null && _a !== void 0 ? _a : true;
this.name = options.name;
this._publishTimeout = options.publishTimeout;
this._json = (_a = options.json) !== null && _a !== void 0 ? _a : false;
this._json = (_b = options.json) !== null && _b !== void 0 ? _b : false;
// Array of setup functions to call.

@@ -187,3 +192,3 @@ this._setups = [];

routingKey,
content,
content: this._getEncodedMessage(content),
resolve,

@@ -207,2 +212,3 @@ reject,

sendToQueue(queue, content, options, done) {
const encodedContent = this._getEncodedMessage(content);
return pb.addCallback(done, new Promise((resolve, reject) => {

@@ -213,3 +219,3 @@ const { timeout, ...opts } = options || {};

queue,
content,
content: encodedContent,
resolve,

@@ -246,3 +252,9 @@ reject,

try {
const channel = await connection.createConfirmChannel();
let channel;
if (this._confirm) {
channel = await connection.createConfirmChannel();
}
else {
channel = await connection.createChannel();
}
this._channel = channel;

@@ -412,39 +424,56 @@ this._channelHasRoom = true;

}
this._unconfirmedMessages.push(message);
const encodedMessage = this._getEncodedMessage(message.content);
let thisCanSend = true;
switch (message.type) {
case 'publish': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (this._confirm) {
this._unconfirmedMessages.push(message);
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, message.content, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
}
else {
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, message.content, message.options);
message.resolve(thisCanSend);
}
break;
}
case 'sendToQueue': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (this._confirm) {
this._unconfirmedMessages.push(message);
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, message.content, message.options, (err) => {
if (message.isTimedout) {
return;
}
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
}
else {
if (message.timeout) {
clearTimeout(message.timeout);
}
if (err) {
this._messageRejected(message, err);
}
else {
this._messageResolved(message, thisCanSend);
}
});
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, message.content, message.options);
message.resolve(thisCanSend);
}
break;

@@ -451,0 +480,0 @@ }

import AmqpConnectionManager, { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager } from './AmqpConnectionManager.js';
export type { AmqpConnectionManagerOptions, ConnectionUrl, IAmqpConnectionManager as AmqpConnectionManager, } from './AmqpConnectionManager.js';
export type { CreateChannelOpts, default as ChannelWrapper, SetupFunc } from './ChannelWrapper.js';
export type { CreateChannelOpts, default as ChannelWrapper, SetupFunc, Channel, } from './ChannelWrapper.js';
export declare function connect(urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions): IAmqpConnectionManager;

@@ -5,0 +5,0 @@ export { AmqpConnectionManager as AmqpConnectionManagerClass };

{
"name": "amqp-connection-manager",
"version": "3.7.0",
"version": "3.8.0",
"description": "Auto-reconnect and round robin support for amqplib.",

@@ -5,0 +5,0 @@ "module": "./dist/esm/index.js",

@@ -135,4 +135,5 @@ # amqp-connection-manager

arbitrary data in.
- `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()`
- `options.json` - if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()`
are plain JSON objects. These will be encoded automatically before being sent.
- `options.confirm` - if true (default), the created channel will be a ConfirmChannel
- `options.publishTimeout` - a default timeout for messages published to this channel.

@@ -139,0 +140,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc