@pager/jackrabbit
Advanced tools
Comparing version 5.3.1 to 5.4.0
@@ -0,1 +1,8 @@ | ||
# [5.4.0](https://github.com/pagerinc/jackrabbit/compare/v5.3.1...v5.4.0) (2022-04-19) | ||
### Features | ||
* ENG-4019 Gracefully reconnect to rabbit ([#861](https://github.com/pagerinc/jackrabbit/issues/861)) ([62d025c](https://github.com/pagerinc/jackrabbit/commit/62d025ccf05e08d1d512aa87920a4266f69a0916)) | ||
## [5.3.1](https://github.com/pagerinc/jackrabbit/compare/v5.3.0...v5.3.1) (2021-05-28) | ||
@@ -2,0 +9,0 @@ |
@@ -67,7 +67,10 @@ 'use strict'; | ||
let blocked = false; | ||
let connecting = false; | ||
let channel; | ||
let connection; | ||
let publishing = 0; | ||
let replyQueueConfigured = false; | ||
const options = Extend({}, DEFAULT_EXCHANGE_OPTIONS, exchangeOptions); | ||
const replyQueue = options.noReply ? null : Queue({ exclusive: true }); | ||
const activeQueues = []; | ||
const pendingReplies = {}; | ||
@@ -118,4 +121,15 @@ | ||
const doWhenReady = (readyFlag, fn) => { | ||
if (readyFlag) { | ||
fn(); | ||
} | ||
else { | ||
emitter.once('ready', fn); | ||
} | ||
}; | ||
const connect = (con) => { | ||
connecting = true; | ||
connection = con; | ||
@@ -127,6 +141,19 @@ connection.createChannel(onChannel); | ||
if (replyQueue) { | ||
replyQueue.on('close', bail.bind(this)); | ||
replyQueue.consume(onReply, { noAck: true }); | ||
replyQueue.once('close', bail.bind(this)); | ||
if (!replyQueueConfigured) { // This should only happen once | ||
doWhenReady(ready, () => { | ||
replyQueue.consume(onReply, { noAck: true }); | ||
replyQueueConfigured = true; | ||
}); | ||
} | ||
} | ||
doWhenReady(ready, () => { | ||
activeQueues.forEach((queue) => queue.connect(connection)); | ||
connecting = false; | ||
}); | ||
return emitter; | ||
@@ -145,3 +172,3 @@ }; | ||
channel.bindQueue(newQueue.name, emitter.name, key, {}, (err, ok) => { | ||
channel.bindQueue(newQueue.amqLabel, emitter.name, key, {}, (err, ok) => { | ||
@@ -161,3 +188,3 @@ if (err) { | ||
const newQueue = Queue(queueOptions); | ||
newQueue.on('close', bail.bind(this)); | ||
newQueue.once('close', bail.bind(this)); | ||
newQueue.once('ready', () => { | ||
@@ -176,12 +203,7 @@ // the default exchange has implicit bindings to all queues | ||
if (connection) { | ||
activeQueues.push(newQueue); | ||
if (connection && ready && !connecting) { | ||
newQueue.connect(connection); | ||
} | ||
else { | ||
emitter.once('ready', () => { | ||
newQueue.connect(connection); | ||
}); | ||
} | ||
return newQueue; | ||
@@ -232,12 +254,4 @@ }; | ||
if (ready) { | ||
sendMessageRef(message, publishOptions); | ||
} | ||
else { | ||
emitter.once('ready', () => { | ||
doWhenReady(ready, () => sendMessageRef(message, publishOptions)); | ||
sendMessageRef(message, publishOptions); | ||
}); | ||
} | ||
return emitter; | ||
@@ -392,3 +406,3 @@ }; | ||
channel = chan; | ||
channel.on('close', bail.bind(this, new Error('channel closed'))); | ||
channel.once('close', bail.bind(this, new Error('channel closed'))); | ||
channel.on('drain', onDrain); | ||
@@ -395,0 +409,0 @@ emitter.emit('connected'); |
@@ -8,3 +8,3 @@ 'use strict'; | ||
const jackrabbit = (url) => { | ||
const jackrabbit = (url, logger, options = {}) => { | ||
@@ -15,4 +15,13 @@ if (!url) { | ||
options.reconnectionTimeout = options.reconnectionTimeout || +process.env.RABBIT_RECONNECTION_TIMEOUT || 2000; | ||
options.maxRetries = options.maxRetries || +process.env.RABBIT_RECONNECTION_RETRIES || 20; | ||
if (process.env.RABBIT_RECONNECTION_EXACT_TIMEOUT !== 'true') { | ||
options.reconnectionTimeout = Math.floor(options.reconnectionTimeout * (1 + Math.random() * 0.1)); | ||
} | ||
// state | ||
let connection; | ||
let connectionAttempts = 0; | ||
const exchanges = []; | ||
const pendingExchangesForConnection = []; | ||
@@ -72,6 +81,8 @@ | ||
return (type, name, options) => { | ||
return (type, name, exchangeOptions) => { | ||
const newExchange = Exchange(name, type, options); | ||
const newExchange = Exchange(name, type, exchangeOptions); | ||
exchanges.push(newExchange); | ||
if (connection) { | ||
connection.setMaxListeners(exchanges.length + 10); | ||
newExchange.connect(connection); | ||
@@ -93,7 +104,58 @@ } | ||
connection = undefined; | ||
if (err) { | ||
if (err && !tryReconnect(err)) { | ||
rabbit.emit('error', err); | ||
doLog('fatal', 'Rabbit connection error!'); | ||
process.exit(1); | ||
} | ||
}; | ||
const isReconnectionError = (err) => { | ||
return err.code === 320 || err.message === 'Socket closed abruptly during opening handshake' || err.message.includes('ECONNREFUSED'); | ||
}; | ||
const doLog = (level, message) => { | ||
if (typeof logger?.[level] === 'function') { | ||
logger[level](message); | ||
} | ||
else if (typeof logger?.log === 'function') { | ||
logger.log(level, message); | ||
} | ||
else { | ||
rabbit.emit(level, message); | ||
} | ||
}; | ||
const tryReconnect = (err) => { | ||
if (!isReconnectionError(err)) { | ||
return false; | ||
} | ||
if (connectionAttempts >= options.maxRetries) { | ||
err.meta = 'Error connecting to RabbitMQ'; | ||
return false; | ||
} | ||
const doReconnect = () => { | ||
++connectionAttempts; | ||
rabbit.emit('reconnecting'); | ||
doLog('info', `Reconnecting to RabbitMQ (${connectionAttempts}/${options.maxRetries})...`); | ||
Amqp.connect(url, onConnection); | ||
}; | ||
if (connectionAttempts === 0) { | ||
doLog('warn', `Lost connection to RabbitMQ! Reconnecting in ${options.reconnectionTimeout}ms...`); | ||
doReconnect(); | ||
} | ||
else { | ||
setTimeout(doReconnect, options.reconnectionTimeout); | ||
} | ||
return true; | ||
}; | ||
const onConnection = (err, conn) => { | ||
@@ -106,10 +168,36 @@ | ||
connection = conn; | ||
connection.on('close', bail.bind(this)); | ||
connection.setMaxListeners(exchanges.length + 10); | ||
connection.once('close', bail.bind(this)); | ||
connection.on('blocked', (cause) => rabbit.emit('blocked', cause)); | ||
connection.on('unblocked', () => rabbit.emit('unblocked')); | ||
pendingExchangesForConnection.forEach((exchange) => { | ||
exchange.connect(connection); | ||
}); | ||
rabbit.emit('connected'); | ||
const notifyReady = () => { | ||
rabbit.emit(connectionAttempts > 0 ? 'reconnected' : 'connected'); | ||
if (connectionAttempts > 0) { | ||
doLog('info', 'Reconnected to RabbitMQ'); | ||
connectionAttempts = 0; | ||
} | ||
}; | ||
const pendingExchanges = connectionAttempts > 0 ? exchanges : pendingExchangesForConnection; | ||
if (pendingExchanges.length === 0) { | ||
notifyReady(); | ||
} | ||
else { | ||
let readyCount = 0; | ||
pendingExchanges.forEach((exchange) => { | ||
exchange.connect(connection); | ||
exchange.once('ready', () => { | ||
++readyCount; | ||
if (readyCount === pendingExchanges.length) { | ||
notifyReady(); | ||
} | ||
}); | ||
}); | ||
} | ||
}; | ||
@@ -116,0 +204,0 @@ |
@@ -23,2 +23,4 @@ 'use strict'; | ||
const consumers = []; | ||
const connect = (connection) => { | ||
@@ -67,12 +69,9 @@ | ||
const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
channel.consume(emitter.amqLabel, onMessage, opts, onConsume); | ||
consumers.push({ onMessage, opts }); | ||
return; | ||
} | ||
emitter.once('ready', () => { | ||
const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions); | ||
channel.consume(emitter.name, onMessage, opts, onConsume); | ||
}); | ||
const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions); | ||
consumers.push({ onMessage, opts }); | ||
}; | ||
@@ -105,3 +104,3 @@ | ||
if (channel) { | ||
channel.purgeQueue(emitter.name, onPurged); | ||
channel.purgeQueue(emitter.amqLabel, onPurged); | ||
} | ||
@@ -111,3 +110,3 @@ else { | ||
channel.purgeQueue(emitter.name, onPurged); | ||
channel.purgeQueue(emitter.amqLabel, onPurged); | ||
}); | ||
@@ -156,3 +155,3 @@ } | ||
channel = undefined; | ||
emitter.name = undefined; | ||
emitter.amqLabel = undefined; | ||
consumerTag = undefined; | ||
@@ -170,5 +169,10 @@ emitter.emit('close', err); | ||
channel.prefetch(emitter.options.prefetch); | ||
channel.on('close', bail.bind(this, new Error('channel closed'))); | ||
channel.once('close', bail.bind(this, new Error('channel closed'))); | ||
emitter.emit('connected'); | ||
channel.assertQueue(emitter.name, emitter.options, onQueue); | ||
emitter.once('ready', () => { | ||
consumers.forEach((consumer) => channel.consume(emitter.amqLabel, consumer.onMessage, consumer.opts, onConsume)); | ||
}); | ||
}; | ||
@@ -182,3 +186,3 @@ | ||
emitter.name = info.queue; | ||
emitter.amqLabel = info.queue; | ||
ready = true; | ||
@@ -192,2 +196,3 @@ emitter.emit('ready'); | ||
name: options.name, | ||
amqLabel: undefined, // Holds the current connection's name | ||
options: Extend({}, DEFAULT_QUEUE_OPTIONS, options), | ||
@@ -194,0 +199,0 @@ connect, |
{ | ||
"name": "@pager/jackrabbit", | ||
"version": "5.3.1", | ||
"version": "5.4.0", | ||
"description": "Easy RabbitMQ for node", | ||
@@ -43,3 +43,3 @@ "keywords": [ | ||
"dependencies": { | ||
"amqplib": "0.7.x", | ||
"amqplib": "0.8.x", | ||
"lodash.assignin": "4.x", | ||
@@ -51,7 +51,8 @@ "uuid": "8.x" | ||
"chai": "4.x", | ||
"dotenv": "16.0.0", | ||
"eslint": "7.x", | ||
"eslint-config-hapi": "12.x", | ||
"eslint-plugin-hapi": "4.x", | ||
"mocha": "8.x", | ||
"semantic-release": "17.x", | ||
"mocha": "9.x", | ||
"semantic-release": "19.x", | ||
"sinon": "11.x" | ||
@@ -58,0 +59,0 @@ }, |
# Jackrabbit | ||
This is an actively maintained fork of [hunterloftis/jackrabbit], a library for | ||
RabbitMQ in Node.js without hating life. | ||
This is a fork of [hunterloftis/jackrabbit]. | ||
@@ -85,1 +84,16 @@ [![CircleCI](https://circleci.com/gh/pagerinc/jackrabbit.svg?style=svg)](https://circleci.com/gh/pagerinc/jackrabbit) | ||
[hunterloftis/jackrabbit]: https://github.com/hunterloftis/jackrabbit | ||
## Reconnection | ||
> Jackrabbit is a wrapper for [ampqlib](https://github.com/amqp-node/amqplib), ampqlib does NOT support reconnection. | ||
This project will try to recover a lost connection gracefully, if it fails to do so, we will throw an `error` event and then exit the current process with code `1`. | ||
Our approach to reconnection is recording all the exchanges and queues created through jackrabbit. Once a connection is lost, we will try to create a new one, update the existing exchange and queue references, initialize a new channel for each queue, and bind each queue's consumers to their new channel. This should be transparent to any users of this lib. | ||
You can configure some basic parameters of the reconnection process with some env vars: | ||
Name|Default|Description | ||
-|-|- | ||
`RABBIT_RECONNECTION_TIMEOUT`| 2000 | ms between each reconnection attempt. The first attempt will always be immediate. | ||
`RABBIT_RECONNECTION_RETRIES`| 20 | Amount of retries before erroring out and killing the node process. | ||
`RABBIT_RECONNECTION_EXACT_TIMEOUT` | false | To prevent total outages on HA services, we're adding a random overhead of 0-10% to the reconnection timeout by default. You can disable this behaviour by setting this option to `true`. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 3 instances in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
30688
648
99
9
3
+ Addedamqplib@0.8.0(transitive)
- Removedamqplib@0.7.1(transitive)
Updatedamqplib@0.8.x