amqp-connection-manager
Advanced tools
Comparing version 3.5.1 to 3.5.2
@@ -8,2 +8,3 @@ "use strict"; | ||
const promise_breaker_1 = __importDefault(require("promise-breaker")); | ||
const MAX_MESSAGES_PER_BATCH = 1000; | ||
const IRRECOVERABLE_ERRORS = [ | ||
@@ -71,2 +72,6 @@ 403, | ||
this._workerNumber = 0; | ||
/** | ||
* True if the underlying channel has room for more messages. | ||
*/ | ||
this._channelHasRoom = true; | ||
this._onConnect = this._onConnect.bind(this); | ||
@@ -217,3 +222,5 @@ this._onDisconnect = this._onDisconnect.bind(this); | ||
this._channel = channel; | ||
this._channelHasRoom = true; | ||
channel.on('close', () => this._onChannelClose(channel)); | ||
channel.on('drain', () => this._onChannelDrain()); | ||
this._settingUp = Promise.all(this._setups.map((setupFn) => | ||
@@ -235,9 +242,2 @@ // TODO: Use a timeout here to guard against setupFns that never resolve? | ||
} | ||
if (this._unconfirmedMessages.length > 0) { | ||
// requeue any messages that were left unconfirmed when connection was lost | ||
let message; | ||
while ((message = this._unconfirmedMessages.shift())) { | ||
this._messages.push(message); | ||
} | ||
} | ||
// Since we just connected, publish any queued messages | ||
@@ -260,2 +260,7 @@ this._startWorker(); | ||
} | ||
/** Called whenever the channel drains. */ | ||
_onChannelDrain() { | ||
this._channelHasRoom = true; | ||
this._startWorker(); | ||
} | ||
// Called whenever we disconnect from the AMQP server. | ||
@@ -298,3 +303,3 @@ _onDisconnect(ex) { | ||
_shouldPublish() { | ||
return this._messages.length > 0 && !this._settingUp && !!this._channel; | ||
return (this._messages.length > 0 && !this._settingUp && !!this._channel && this._channelHasRoom); | ||
} | ||
@@ -313,2 +318,40 @@ // Start publishing queued messages, if there isn't already a worker doing this. | ||
} | ||
_messageResolved(message, result) { | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
message.resolve(result); | ||
} | ||
_messageRejected(message, err) { | ||
if (!this._channel && this._canWaitReconnection()) { | ||
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when | ||
// we reconnect. | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
this._messages.push(message); | ||
} | ||
else { | ||
// Something went wrong trying to send this message - could be JSON.stringify failed, could be | ||
// the broker rejected the message. Either way, reject it back | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
message.reject(err); | ||
} | ||
} | ||
_getEncodedMessage(content) { | ||
let encodedMessage; | ||
if (this._json) { | ||
encodedMessage = Buffer.from(JSON.stringify(content)); | ||
} | ||
else if (typeof content === 'string') { | ||
encodedMessage = Buffer.from(content); | ||
} | ||
else if (content instanceof Buffer) { | ||
encodedMessage = content; | ||
} | ||
else if (typeof content === 'object' && typeof content.toString === 'function') { | ||
encodedMessage = Buffer.from(content.toString()); | ||
} | ||
else { | ||
console.warn('amqp-connection-manager: Sending JSON message, but json option not speicifed'); | ||
encodedMessage = Buffer.from(JSON.stringify(content)); | ||
} | ||
return encodedMessage; | ||
} | ||
_publishQueuedMessages(workerNumber) { | ||
@@ -324,85 +367,54 @@ const channel = this._channel; | ||
} | ||
const message = this._messages.shift(); | ||
if (message) { | ||
this._unconfirmedMessages.push(message); | ||
Promise.resolve() | ||
.then(() => { | ||
let encodedMessage; | ||
if (this._json) { | ||
encodedMessage = Buffer.from(JSON.stringify(message.content)); | ||
try { | ||
// Send messages in batches of 1000 - don't want to starve the event loop. | ||
let sendsLeft = MAX_MESSAGES_PER_BATCH; | ||
while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) { | ||
sendsLeft--; | ||
const message = this._messages.shift(); | ||
if (!message) { | ||
break; | ||
} | ||
else if (typeof message.content === 'string') { | ||
encodedMessage = Buffer.from(message.content); | ||
} | ||
else if (message.content instanceof Buffer) { | ||
encodedMessage = message.content; | ||
} | ||
else if (typeof message.content === 'object' && | ||
typeof message.content.toString === 'function') { | ||
encodedMessage = Buffer.from(message.content.toString()); | ||
} | ||
else { | ||
throw new Error('Invalid message content'); | ||
} | ||
let result = true; | ||
const sendPromise = (() => { | ||
switch (message.type) { | ||
case 'publish': | ||
return new Promise(function (resolve, reject) { | ||
result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => { | ||
if (err) { | ||
reject(err); | ||
} | ||
else { | ||
resolve(result); | ||
} | ||
}); | ||
}); | ||
case 'sendToQueue': | ||
return new Promise(function (resolve, reject) { | ||
result = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => { | ||
if (err) { | ||
reject(err); | ||
} | ||
else { | ||
resolve(result); | ||
} | ||
}); | ||
}); | ||
/* istanbul ignore next */ | ||
default: | ||
throw new Error(`Unhandled message type ${message.type}`); | ||
this._unconfirmedMessages.push(message); | ||
const encodedMessage = this._getEncodedMessage(message.content); | ||
switch (message.type) { | ||
case 'publish': { | ||
let thisCanSend = true; | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => { | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
break; | ||
} | ||
})(); | ||
if (result) { | ||
this._publishQueuedMessages(workerNumber); | ||
case 'sendToQueue': { | ||
let thisCanSend = true; | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => { | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
break; | ||
} | ||
/* istanbul ignore next */ | ||
default: | ||
throw new Error(`Unhandled message type ${message.type}`); | ||
} | ||
else { | ||
channel.once('drain', () => this._publishQueuedMessages(workerNumber)); | ||
} | ||
return sendPromise; | ||
}) | ||
.then((result) => { | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
message.resolve(result); | ||
}, (err) => { | ||
if (!this._channel && this._canWaitReconnection()) { | ||
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when | ||
// we reconnect. | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
this._messages.push(message); | ||
} | ||
else { | ||
// Something went wrong trying to send this message - could be JSON.stringify failed, could be | ||
// the broker rejected the message. Either way, reject it back | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
message.reject(err); | ||
} | ||
}) | ||
.catch( | ||
/* istanbul ignore next */ (err) => { | ||
this.emit('error', err); | ||
this._working = false; | ||
}); | ||
} | ||
// If we didn't send all the messages, send some more... | ||
if (this._channelHasRoom && this._messages.length > 0) { | ||
setImmediate(() => this._publishQueuedMessages(workerNumber)); | ||
} | ||
this._working = false; | ||
/* istanbul ignore next */ | ||
} | ||
catch (err) { | ||
this._working = false; | ||
this.emit('error', err); | ||
} | ||
} | ||
@@ -409,0 +421,0 @@ /** Send an `ack` to the underlying channel. */ |
@@ -65,2 +65,6 @@ /// <reference types="node" /> | ||
private _workerNumber; | ||
/** | ||
* True if the underlying channel has room for more messages. | ||
*/ | ||
private _channelHasRoom; | ||
name?: string; | ||
@@ -157,2 +161,4 @@ addListener(event: string, listener: (...args: any[]) => void): this; | ||
private _onChannelClose; | ||
/** Called whenever the channel drains. */ | ||
private _onChannelDrain; | ||
private _onDisconnect; | ||
@@ -164,2 +170,5 @@ queueLength(): number; | ||
private _canWaitReconnection; | ||
private _messageResolved; | ||
private _messageRejected; | ||
private _getEncodedMessage; | ||
private _publishQueuedMessages; | ||
@@ -166,0 +175,0 @@ /** Send an `ack` to the underlying channel. */ |
import { EventEmitter } from 'events'; | ||
import pb from 'promise-breaker'; | ||
const MAX_MESSAGES_PER_BATCH = 1000; | ||
const IRRECOVERABLE_ERRORS = [ | ||
@@ -65,2 +66,6 @@ 403, | ||
this._workerNumber = 0; | ||
/** | ||
* True if the underlying channel has room for more messages. | ||
*/ | ||
this._channelHasRoom = true; | ||
this._onConnect = this._onConnect.bind(this); | ||
@@ -211,3 +216,5 @@ this._onDisconnect = this._onDisconnect.bind(this); | ||
this._channel = channel; | ||
this._channelHasRoom = true; | ||
channel.on('close', () => this._onChannelClose(channel)); | ||
channel.on('drain', () => this._onChannelDrain()); | ||
this._settingUp = Promise.all(this._setups.map((setupFn) => | ||
@@ -229,9 +236,2 @@ // TODO: Use a timeout here to guard against setupFns that never resolve? | ||
} | ||
if (this._unconfirmedMessages.length > 0) { | ||
// requeue any messages that were left unconfirmed when connection was lost | ||
let message; | ||
while ((message = this._unconfirmedMessages.shift())) { | ||
this._messages.push(message); | ||
} | ||
} | ||
// Since we just connected, publish any queued messages | ||
@@ -254,2 +254,7 @@ this._startWorker(); | ||
} | ||
/** Called whenever the channel drains. */ | ||
_onChannelDrain() { | ||
this._channelHasRoom = true; | ||
this._startWorker(); | ||
} | ||
// Called whenever we disconnect from the AMQP server. | ||
@@ -292,3 +297,3 @@ _onDisconnect(ex) { | ||
_shouldPublish() { | ||
return this._messages.length > 0 && !this._settingUp && !!this._channel; | ||
return (this._messages.length > 0 && !this._settingUp && !!this._channel && this._channelHasRoom); | ||
} | ||
@@ -307,2 +312,40 @@ // Start publishing queued messages, if there isn't already a worker doing this. | ||
} | ||
_messageResolved(message, result) { | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
message.resolve(result); | ||
} | ||
_messageRejected(message, err) { | ||
if (!this._channel && this._canWaitReconnection()) { | ||
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when | ||
// we reconnect. | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
this._messages.push(message); | ||
} | ||
else { | ||
// Something went wrong trying to send this message - could be JSON.stringify failed, could be | ||
// the broker rejected the message. Either way, reject it back | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
message.reject(err); | ||
} | ||
} | ||
_getEncodedMessage(content) { | ||
let encodedMessage; | ||
if (this._json) { | ||
encodedMessage = Buffer.from(JSON.stringify(content)); | ||
} | ||
else if (typeof content === 'string') { | ||
encodedMessage = Buffer.from(content); | ||
} | ||
else if (content instanceof Buffer) { | ||
encodedMessage = content; | ||
} | ||
else if (typeof content === 'object' && typeof content.toString === 'function') { | ||
encodedMessage = Buffer.from(content.toString()); | ||
} | ||
else { | ||
console.warn('amqp-connection-manager: Sending JSON message, but json option not speicifed'); | ||
encodedMessage = Buffer.from(JSON.stringify(content)); | ||
} | ||
return encodedMessage; | ||
} | ||
_publishQueuedMessages(workerNumber) { | ||
@@ -318,85 +361,54 @@ const channel = this._channel; | ||
} | ||
const message = this._messages.shift(); | ||
if (message) { | ||
this._unconfirmedMessages.push(message); | ||
Promise.resolve() | ||
.then(() => { | ||
let encodedMessage; | ||
if (this._json) { | ||
encodedMessage = Buffer.from(JSON.stringify(message.content)); | ||
try { | ||
// Send messages in batches of 1000 - don't want to starve the event loop. | ||
let sendsLeft = MAX_MESSAGES_PER_BATCH; | ||
while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) { | ||
sendsLeft--; | ||
const message = this._messages.shift(); | ||
if (!message) { | ||
break; | ||
} | ||
else if (typeof message.content === 'string') { | ||
encodedMessage = Buffer.from(message.content); | ||
} | ||
else if (message.content instanceof Buffer) { | ||
encodedMessage = message.content; | ||
} | ||
else if (typeof message.content === 'object' && | ||
typeof message.content.toString === 'function') { | ||
encodedMessage = Buffer.from(message.content.toString()); | ||
} | ||
else { | ||
throw new Error('Invalid message content'); | ||
} | ||
let result = true; | ||
const sendPromise = (() => { | ||
switch (message.type) { | ||
case 'publish': | ||
return new Promise(function (resolve, reject) { | ||
result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => { | ||
if (err) { | ||
reject(err); | ||
} | ||
else { | ||
resolve(result); | ||
} | ||
}); | ||
}); | ||
case 'sendToQueue': | ||
return new Promise(function (resolve, reject) { | ||
result = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => { | ||
if (err) { | ||
reject(err); | ||
} | ||
else { | ||
resolve(result); | ||
} | ||
}); | ||
}); | ||
/* istanbul ignore next */ | ||
default: | ||
throw new Error(`Unhandled message type ${message.type}`); | ||
this._unconfirmedMessages.push(message); | ||
const encodedMessage = this._getEncodedMessage(message.content); | ||
switch (message.type) { | ||
case 'publish': { | ||
let thisCanSend = true; | ||
thisCanSend = this._channelHasRoom = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, (err) => { | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
break; | ||
} | ||
})(); | ||
if (result) { | ||
this._publishQueuedMessages(workerNumber); | ||
case 'sendToQueue': { | ||
let thisCanSend = true; | ||
thisCanSend = this._channelHasRoom = channel.sendToQueue(message.queue, encodedMessage, message.options, (err) => { | ||
if (err) { | ||
this._messageRejected(message, err); | ||
} | ||
else { | ||
this._messageResolved(message, thisCanSend); | ||
} | ||
}); | ||
break; | ||
} | ||
/* istanbul ignore next */ | ||
default: | ||
throw new Error(`Unhandled message type ${message.type}`); | ||
} | ||
else { | ||
channel.once('drain', () => this._publishQueuedMessages(workerNumber)); | ||
} | ||
return sendPromise; | ||
}) | ||
.then((result) => { | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
message.resolve(result); | ||
}, (err) => { | ||
if (!this._channel && this._canWaitReconnection()) { | ||
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when | ||
// we reconnect. | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
this._messages.push(message); | ||
} | ||
else { | ||
// Something went wrong trying to send this message - could be JSON.stringify failed, could be | ||
// the broker rejected the message. Either way, reject it back | ||
removeUnconfirmedMessage(this._unconfirmedMessages, message); | ||
message.reject(err); | ||
} | ||
}) | ||
.catch( | ||
/* istanbul ignore next */ (err) => { | ||
this.emit('error', err); | ||
this._working = false; | ||
}); | ||
} | ||
// If we didn't send all the messages, send some more... | ||
if (this._channelHasRoom && this._messages.length > 0) { | ||
setImmediate(() => this._publishQueuedMessages(workerNumber)); | ||
} | ||
this._working = false; | ||
/* istanbul ignore next */ | ||
} | ||
catch (err) { | ||
this._working = false; | ||
this.emit('error', err); | ||
} | ||
} | ||
@@ -403,0 +415,0 @@ /** Send an `ack` to the underlying channel. */ |
{ | ||
"name": "amqp-connection-manager", | ||
"version": "3.5.1", | ||
"version": "3.5.2", | ||
"description": "Auto-reconnect and round robin support for amqplib.", | ||
@@ -5,0 +5,0 @@ "module": "./dist/esm/index.js", |
Sorry, the diff of this file is not supported yet
112906
1912