amqp-connection-manager
Advanced tools
Comparing version 4.0.1 to 4.1.0
@@ -6,5 +6,8 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const crypto_1 = __importDefault(require("crypto")); | ||
const events_1 = require("events"); | ||
const promise_breaker_1 = __importDefault(require("promise-breaker")); | ||
const util_1 = require("util"); | ||
const MAX_MESSAGES_PER_BATCH = 1000; | ||
const randomBytes = (0, util_1.promisify)(crypto_1.default.randomBytes); | ||
const IRRECOVERABLE_ERRORS = [ | ||
@@ -503,2 +506,3 @@ 403, | ||
async consume(queue, onMessage, options = {}) { | ||
const consumerTag = options.consumerTag || (await randomBytes(16)).toString('hex'); | ||
const consumer = { | ||
@@ -508,6 +512,10 @@ consumerTag: null, | ||
onMessage, | ||
options, | ||
options: { | ||
...options, | ||
consumerTag, | ||
}, | ||
}; | ||
this._consumers.push(consumer); | ||
await this._consume(consumer); | ||
return { consumerTag }; | ||
} | ||
@@ -566,2 +574,13 @@ async _consume(consumer) { | ||
} | ||
async cancel(consumerTag) { | ||
const idx = this._consumers.findIndex((x) => x.options.consumerTag === consumerTag); | ||
if (idx === -1) { | ||
return; | ||
} | ||
const consumer = this._consumers[idx]; | ||
this._consumers.splice(idx, 1); | ||
if (this._channel && consumer.consumerTag) { | ||
await this._channel.cancel(consumer.consumerTag); | ||
} | ||
} | ||
/** Send an `ack` to the underlying channel. */ | ||
@@ -568,0 +587,0 @@ ack(message, allUpTo) { |
@@ -209,3 +209,3 @@ /// <reference types="node" /> | ||
*/ | ||
consume(queue: string, onMessage: Consumer['onMessage'], options?: ConsumerOptions): Promise<void>; | ||
consume(queue: string, onMessage: Consumer['onMessage'], options?: ConsumerOptions): Promise<amqplib.Replies.Consume>; | ||
private _consume; | ||
@@ -217,2 +217,3 @@ private _reconnectConsumer; | ||
cancelAll(): Promise<void>; | ||
cancel(consumerTag: string): Promise<void>; | ||
/** Send an `ack` to the underlying channel. */ | ||
@@ -219,0 +220,0 @@ ack(message: amqplib.Message, allUpTo?: boolean): void; |
@@ -0,4 +1,7 @@ | ||
import crypto from 'crypto'; | ||
import { EventEmitter } from 'events'; | ||
import pb from 'promise-breaker'; | ||
import { promisify } from 'util'; | ||
const MAX_MESSAGES_PER_BATCH = 1000; | ||
const randomBytes = promisify(crypto.randomBytes); | ||
const IRRECOVERABLE_ERRORS = [ | ||
@@ -497,2 +500,3 @@ 403, | ||
async consume(queue, onMessage, options = {}) { | ||
const consumerTag = options.consumerTag || (await randomBytes(16)).toString('hex'); | ||
const consumer = { | ||
@@ -502,6 +506,10 @@ consumerTag: null, | ||
onMessage, | ||
options, | ||
options: { | ||
...options, | ||
consumerTag, | ||
}, | ||
}; | ||
this._consumers.push(consumer); | ||
await this._consume(consumer); | ||
return { consumerTag }; | ||
} | ||
@@ -560,2 +568,13 @@ async _consume(consumer) { | ||
} | ||
async cancel(consumerTag) { | ||
const idx = this._consumers.findIndex((x) => x.options.consumerTag === consumerTag); | ||
if (idx === -1) { | ||
return; | ||
} | ||
const consumer = this._consumers[idx]; | ||
this._consumers.splice(idx, 1); | ||
if (this._channel && consumer.consumerTag) { | ||
await this._channel.cancel(consumer.consumerTag); | ||
} | ||
} | ||
/** Send an `ack` to the underlying channel. */ | ||
@@ -562,0 +581,0 @@ ack(message, allUpTo) { |
{ | ||
"name": "amqp-connection-manager", | ||
"version": "4.0.1", | ||
"version": "4.1.0", | ||
"description": "Auto-reconnect and round robin support for amqplib.", | ||
@@ -56,3 +56,3 @@ "module": "./dist/esm/index.js", | ||
"promise-tools": "^2.1.0", | ||
"semantic-release": "^18.0.1", | ||
"semantic-release": "^19.0.2", | ||
"ts-jest": "^27.0.5", | ||
@@ -59,0 +59,0 @@ "ts-node": "^10.2.1", |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
140542
2457