amqp-connection-manager
Advanced tools
Comparing version 3.5.2 to 3.6.0
@@ -224,3 +224,3 @@ "use strict"; | ||
this.emit('disconnect', { err }); | ||
const handle = helpers_js_1.wait(this.reconnectTimeInSeconds * 1000); | ||
const handle = (0, helpers_js_1.wait)(this.reconnectTimeInSeconds * 1000); | ||
this._cancelRetriesHandler = handle.cancel; | ||
@@ -249,6 +249,6 @@ handle.promise | ||
if (err.name === 'OperationalError' && err.message === 'connect ETIMEDOUT') { | ||
handle = helpers_js_1.wait(0); | ||
handle = (0, helpers_js_1.wait)(0); | ||
} | ||
else { | ||
handle = helpers_js_1.wait(this.reconnectTimeInSeconds * 1000); | ||
handle = (0, helpers_js_1.wait)(this.reconnectTimeInSeconds * 1000); | ||
} | ||
@@ -255,0 +255,0 @@ this._cancelRetriesHandler = handle.cancel; |
@@ -60,2 +60,4 @@ "use strict"; | ||
this._unconfirmedMessages = []; | ||
/** Consumers which will be reconnected on channel errors etc. */ | ||
this._consumers = []; | ||
/** | ||
@@ -83,2 +85,3 @@ * True if the "worker" is busy sending messages. False if we need to | ||
this._setups = []; | ||
this._consumers = []; | ||
if (options.setup) { | ||
@@ -233,3 +236,7 @@ this._setups.push(options.setup); | ||
this.emit('error', err, { name: this.name }); | ||
}))).then(() => { | ||
}))) | ||
.then(() => { | ||
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c))); | ||
}) | ||
.then(() => { | ||
this._settingUp = undefined; | ||
@@ -416,2 +423,68 @@ }); | ||
} | ||
/** | ||
* Setup a consumer | ||
* This consumer will be reconnected on cancellation and channel errors. | ||
*/ | ||
async consume(queue, onMessage, options = {}) { | ||
const consumer = { | ||
consumerTag: null, | ||
queue, | ||
onMessage, | ||
options, | ||
}; | ||
this._consumers.push(consumer); | ||
await this._consume(consumer); | ||
} | ||
async _consume(consumer) { | ||
if (!this._channel) { | ||
return; | ||
} | ||
const { prefetch, ...options } = consumer.options; | ||
if (typeof prefetch === 'number') { | ||
this._channel.prefetch(prefetch, false); | ||
} | ||
const { consumerTag } = await this._channel.consume(consumer.queue, (msg) => { | ||
if (!msg) { | ||
consumer.consumerTag = null; | ||
this._reconnectConsumer(consumer).catch((err) => { | ||
if (err.isOperational && err.message.includes('BasicConsume; 404')) { | ||
// Ignore errors caused by queue not declared. In | ||
// those cases the connection will reconnect and | ||
// then consumers reestablished. The full reconnect | ||
// might be avoided if we assert the queue again | ||
// before starting to consume. | ||
return; | ||
} | ||
throw err; | ||
}); | ||
return; | ||
} | ||
consumer.onMessage(msg); | ||
}, options); | ||
consumer.consumerTag = consumerTag; | ||
} | ||
async _reconnectConsumer(consumer) { | ||
if (!this._consumers.includes(consumer)) { | ||
// Intentionally canceled | ||
return; | ||
} | ||
await this._consume(consumer); | ||
} | ||
/** | ||
* Cancel all consumers | ||
*/ | ||
async cancelAll() { | ||
const consumers = this._consumers; | ||
this._consumers = []; | ||
if (!this._channel) { | ||
return; | ||
} | ||
const channel = this._channel; | ||
await Promise.all(consumers.reduce((acc, consumer) => { | ||
if (consumer.consumerTag) { | ||
acc.push(channel.cancel(consumer.consumerTag)); | ||
} | ||
return acc; | ||
}, [])); | ||
} | ||
/** Send an `ack` to the underlying channel. */ | ||
@@ -418,0 +491,0 @@ ack(message, allUpTo) { |
@@ -22,2 +22,11 @@ /// <reference types="node" /> | ||
} | ||
interface ConsumerOptions extends amqplib.Options.Consume { | ||
prefetch?: number; | ||
} | ||
interface Consumer { | ||
consumerTag: string | null; | ||
queue: string; | ||
onMessage: (msg: amqplib.ConsumeMessage) => void; | ||
options: ConsumerOptions; | ||
} | ||
/** | ||
@@ -49,2 +58,4 @@ * Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and | ||
private _irrecoverableCode; | ||
/** Consumers which will be reconnected on channel errors etc. */ | ||
private _consumers; | ||
/** | ||
@@ -174,2 +185,13 @@ * The currently connected channel. Note that not all setup functions | ||
private _publishQueuedMessages; | ||
/** | ||
* Setup a consumer | ||
* This consumer will be reconnected on cancellation and channel errors. | ||
*/ | ||
consume(queue: string, onMessage: Consumer['onMessage'], options?: ConsumerOptions): Promise<void>; | ||
private _consume; | ||
private _reconnectConsumer; | ||
/** | ||
* Cancel all consumers | ||
*/ | ||
cancelAll(): Promise<void>; | ||
/** Send an `ack` to the underlying channel. */ | ||
@@ -198,1 +220,2 @@ ack(message: amqplib.Message, allUpTo?: boolean): void; | ||
} | ||
export {}; |
@@ -55,2 +55,4 @@ import { EventEmitter } from 'events'; | ||
this._unconfirmedMessages = []; | ||
/** Consumers which will be reconnected on channel errors etc. */ | ||
this._consumers = []; | ||
/** | ||
@@ -78,2 +80,3 @@ * True if the "worker" is busy sending messages. False if we need to | ||
this._setups = []; | ||
this._consumers = []; | ||
if (options.setup) { | ||
@@ -228,3 +231,7 @@ this._setups.push(options.setup); | ||
this.emit('error', err, { name: this.name }); | ||
}))).then(() => { | ||
}))) | ||
.then(() => { | ||
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c))); | ||
}) | ||
.then(() => { | ||
this._settingUp = undefined; | ||
@@ -411,2 +418,68 @@ }); | ||
} | ||
/** | ||
* Setup a consumer | ||
* This consumer will be reconnected on cancellation and channel errors. | ||
*/ | ||
async consume(queue, onMessage, options = {}) { | ||
const consumer = { | ||
consumerTag: null, | ||
queue, | ||
onMessage, | ||
options, | ||
}; | ||
this._consumers.push(consumer); | ||
await this._consume(consumer); | ||
} | ||
async _consume(consumer) { | ||
if (!this._channel) { | ||
return; | ||
} | ||
const { prefetch, ...options } = consumer.options; | ||
if (typeof prefetch === 'number') { | ||
this._channel.prefetch(prefetch, false); | ||
} | ||
const { consumerTag } = await this._channel.consume(consumer.queue, (msg) => { | ||
if (!msg) { | ||
consumer.consumerTag = null; | ||
this._reconnectConsumer(consumer).catch((err) => { | ||
if (err.isOperational && err.message.includes('BasicConsume; 404')) { | ||
// Ignore errors caused by queue not declared. In | ||
// those cases the connection will reconnect and | ||
// then consumers reestablished. The full reconnect | ||
// might be avoided if we assert the queue again | ||
// before starting to consume. | ||
return; | ||
} | ||
throw err; | ||
}); | ||
return; | ||
} | ||
consumer.onMessage(msg); | ||
}, options); | ||
consumer.consumerTag = consumerTag; | ||
} | ||
async _reconnectConsumer(consumer) { | ||
if (!this._consumers.includes(consumer)) { | ||
// Intentionally canceled | ||
return; | ||
} | ||
await this._consume(consumer); | ||
} | ||
/** | ||
* Cancel all consumers | ||
*/ | ||
async cancelAll() { | ||
const consumers = this._consumers; | ||
this._consumers = []; | ||
if (!this._channel) { | ||
return; | ||
} | ||
const channel = this._channel; | ||
await Promise.all(consumers.reduce((acc, consumer) => { | ||
if (consumer.consumerTag) { | ||
acc.push(channel.cancel(consumer.consumerTag)); | ||
} | ||
return acc; | ||
}, [])); | ||
} | ||
/** Send an `ack` to the underlying channel. */ | ||
@@ -413,0 +486,0 @@ ack(message, allUpTo) { |
{ | ||
"name": "amqp-connection-manager", | ||
"version": "3.5.2", | ||
"version": "3.6.0", | ||
"description": "Auto-reconnect and round robin support for amqplib.", | ||
@@ -5,0 +5,0 @@ "module": "./dist/esm/index.js", |
Sorry, the diff of this file is not supported yet
120643
2081