Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@pager/jackrabbit

Package Overview
Dependencies
Maintainers
35
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@pager/jackrabbit - npm Package Compare versions

Comparing version 5.3.1 to 5.4.0

7

CHANGELOG.md

@@ -0,1 +1,8 @@

# [5.4.0](https://github.com/pagerinc/jackrabbit/compare/v5.3.1...v5.4.0) (2022-04-19)
### Features
* ENG-4019 Gracefully reconnect to rabbit ([#861](https://github.com/pagerinc/jackrabbit/issues/861)) ([62d025c](https://github.com/pagerinc/jackrabbit/commit/62d025ccf05e08d1d512aa87920a4266f69a0916))
## [5.3.1](https://github.com/pagerinc/jackrabbit/compare/v5.3.0...v5.3.1) (2021-05-28)

@@ -2,0 +9,0 @@

56

lib/exchange.js

@@ -67,7 +67,10 @@ 'use strict';

let blocked = false;
let connecting = false;
let channel;
let connection;
let publishing = 0;
let replyQueueConfigured = false;
const options = Extend({}, DEFAULT_EXCHANGE_OPTIONS, exchangeOptions);
const replyQueue = options.noReply ? null : Queue({ exclusive: true });
const activeQueues = [];
const pendingReplies = {};

@@ -118,4 +121,15 @@

const doWhenReady = (readyFlag, fn) => {
if (readyFlag) {
fn();
}
else {
emitter.once('ready', fn);
}
};
const connect = (con) => {
connecting = true;
connection = con;

@@ -127,6 +141,19 @@ connection.createChannel(onChannel);

if (replyQueue) {
replyQueue.on('close', bail.bind(this));
replyQueue.consume(onReply, { noAck: true });
replyQueue.once('close', bail.bind(this));
if (!replyQueueConfigured) { // This should only happen once
doWhenReady(ready, () => {
replyQueue.consume(onReply, { noAck: true });
replyQueueConfigured = true;
});
}
}
doWhenReady(ready, () => {
activeQueues.forEach((queue) => queue.connect(connection));
connecting = false;
});
return emitter;

@@ -145,3 +172,3 @@ };

channel.bindQueue(newQueue.name, emitter.name, key, {}, (err, ok) => {
channel.bindQueue(newQueue.amqLabel, emitter.name, key, {}, (err, ok) => {

@@ -161,3 +188,3 @@ if (err) {

const newQueue = Queue(queueOptions);
newQueue.on('close', bail.bind(this));
newQueue.once('close', bail.bind(this));
newQueue.once('ready', () => {

@@ -176,12 +203,7 @@ // the default exchange has implicit bindings to all queues

if (connection) {
activeQueues.push(newQueue);
if (connection && ready && !connecting) {
newQueue.connect(connection);
}
else {
emitter.once('ready', () => {
newQueue.connect(connection);
});
}
return newQueue;

@@ -232,12 +254,4 @@ };

if (ready) {
sendMessageRef(message, publishOptions);
}
else {
emitter.once('ready', () => {
doWhenReady(ready, () => sendMessageRef(message, publishOptions));
sendMessageRef(message, publishOptions);
});
}
return emitter;

@@ -392,3 +406,3 @@ };

channel = chan;
channel.on('close', bail.bind(this, new Error('channel closed')));
channel.once('close', bail.bind(this, new Error('channel closed')));
channel.on('drain', onDrain);

@@ -395,0 +409,0 @@ emitter.emit('connected');

@@ -8,3 +8,3 @@ 'use strict';

const jackrabbit = (url) => {
const jackrabbit = (url, logger, options = {}) => {

@@ -15,4 +15,13 @@ if (!url) {

options.reconnectionTimeout = options.reconnectionTimeout || +process.env.RABBIT_RECONNECTION_TIMEOUT || 2000;
options.maxRetries = options.maxRetries || +process.env.RABBIT_RECONNECTION_RETRIES || 20;
if (process.env.RABBIT_RECONNECTION_EXACT_TIMEOUT !== 'true') {
options.reconnectionTimeout = Math.floor(options.reconnectionTimeout * (1 + Math.random() * 0.1));
}
// state
let connection;
let connectionAttempts = 0;
const exchanges = [];
const pendingExchangesForConnection = [];

@@ -72,6 +81,8 @@

return (type, name, options) => {
return (type, name, exchangeOptions) => {
const newExchange = Exchange(name, type, options);
const newExchange = Exchange(name, type, exchangeOptions);
exchanges.push(newExchange);
if (connection) {
connection.setMaxListeners(exchanges.length + 10);
newExchange.connect(connection);

@@ -93,7 +104,58 @@ }

connection = undefined;
if (err) {
if (err && !tryReconnect(err)) {
rabbit.emit('error', err);
doLog('fatal', 'Rabbit connection error!');
process.exit(1);
}
};
const isReconnectionError = (err) => {
return err.code === 320 || err.message === 'Socket closed abruptly during opening handshake' || err.message.includes('ECONNREFUSED');
};
const doLog = (level, message) => {
if (typeof logger?.[level] === 'function') {
logger[level](message);
}
else if (typeof logger?.log === 'function') {
logger.log(level, message);
}
else {
rabbit.emit(level, message);
}
};
const tryReconnect = (err) => {
if (!isReconnectionError(err)) {
return false;
}
if (connectionAttempts >= options.maxRetries) {
err.meta = 'Error connecting to RabbitMQ';
return false;
}
const doReconnect = () => {
++connectionAttempts;
rabbit.emit('reconnecting');
doLog('info', `Reconnecting to RabbitMQ (${connectionAttempts}/${options.maxRetries})...`);
Amqp.connect(url, onConnection);
};
if (connectionAttempts === 0) {
doLog('warn', `Lost connection to RabbitMQ! Reconnecting in ${options.reconnectionTimeout}ms...`);
doReconnect();
}
else {
setTimeout(doReconnect, options.reconnectionTimeout);
}
return true;
};
const onConnection = (err, conn) => {

@@ -106,10 +168,36 @@

connection = conn;
connection.on('close', bail.bind(this));
connection.setMaxListeners(exchanges.length + 10);
connection.once('close', bail.bind(this));
connection.on('blocked', (cause) => rabbit.emit('blocked', cause));
connection.on('unblocked', () => rabbit.emit('unblocked'));
pendingExchangesForConnection.forEach((exchange) => {
exchange.connect(connection);
});
rabbit.emit('connected');
const notifyReady = () => {
rabbit.emit(connectionAttempts > 0 ? 'reconnected' : 'connected');
if (connectionAttempts > 0) {
doLog('info', 'Reconnected to RabbitMQ');
connectionAttempts = 0;
}
};
const pendingExchanges = connectionAttempts > 0 ? exchanges : pendingExchangesForConnection;
if (pendingExchanges.length === 0) {
notifyReady();
}
else {
let readyCount = 0;
pendingExchanges.forEach((exchange) => {
exchange.connect(connection);
exchange.once('ready', () => {
++readyCount;
if (readyCount === pendingExchanges.length) {
notifyReady();
}
});
});
}
};

@@ -116,0 +204,0 @@

@@ -23,2 +23,4 @@ 'use strict';

const consumers = [];
const connect = (connection) => {

@@ -67,12 +69,9 @@

const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions);
channel.consume(emitter.name, onMessage, opts, onConsume);
channel.consume(emitter.amqLabel, onMessage, opts, onConsume);
consumers.push({ onMessage, opts });
return;
}
emitter.once('ready', () => {
const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions);
channel.consume(emitter.name, onMessage, opts, onConsume);
});
const opts = Extend({}, DEFAULT_CONSUME_OPTIONS, consumeOptions);
consumers.push({ onMessage, opts });
};

@@ -105,3 +104,3 @@

if (channel) {
channel.purgeQueue(emitter.name, onPurged);
channel.purgeQueue(emitter.amqLabel, onPurged);
}

@@ -111,3 +110,3 @@ else {

channel.purgeQueue(emitter.name, onPurged);
channel.purgeQueue(emitter.amqLabel, onPurged);
});

@@ -156,3 +155,3 @@ }

channel = undefined;
emitter.name = undefined;
emitter.amqLabel = undefined;
consumerTag = undefined;

@@ -170,5 +169,10 @@ emitter.emit('close', err);

channel.prefetch(emitter.options.prefetch);
channel.on('close', bail.bind(this, new Error('channel closed')));
channel.once('close', bail.bind(this, new Error('channel closed')));
emitter.emit('connected');
channel.assertQueue(emitter.name, emitter.options, onQueue);
emitter.once('ready', () => {
consumers.forEach((consumer) => channel.consume(emitter.amqLabel, consumer.onMessage, consumer.opts, onConsume));
});
};

@@ -182,3 +186,3 @@

emitter.name = info.queue;
emitter.amqLabel = info.queue;
ready = true;

@@ -192,2 +196,3 @@ emitter.emit('ready');

name: options.name,
amqLabel: undefined, // Holds the current connection's name
options: Extend({}, DEFAULT_QUEUE_OPTIONS, options),

@@ -194,0 +199,0 @@ connect,

{
"name": "@pager/jackrabbit",
"version": "5.3.1",
"version": "5.4.0",
"description": "Easy RabbitMQ for node",

@@ -43,3 +43,3 @@ "keywords": [

"dependencies": {
"amqplib": "0.7.x",
"amqplib": "0.8.x",
"lodash.assignin": "4.x",

@@ -51,7 +51,8 @@ "uuid": "8.x"

"chai": "4.x",
"dotenv": "16.0.0",
"eslint": "7.x",
"eslint-config-hapi": "12.x",
"eslint-plugin-hapi": "4.x",
"mocha": "8.x",
"semantic-release": "17.x",
"mocha": "9.x",
"semantic-release": "19.x",
"sinon": "11.x"

@@ -58,0 +59,0 @@ },

# Jackrabbit
This is an actively maintained fork of [hunterloftis/jackrabbit], a library for
RabbitMQ in Node.js without hating life.
This is a fork of [hunterloftis/jackrabbit].

@@ -85,1 +84,16 @@ [![CircleCI](https://circleci.com/gh/pagerinc/jackrabbit.svg?style=svg)](https://circleci.com/gh/pagerinc/jackrabbit)

[hunterloftis/jackrabbit]: https://github.com/hunterloftis/jackrabbit
## Reconnection
> Jackrabbit is a wrapper for [ampqlib](https://github.com/amqp-node/amqplib), ampqlib does NOT support reconnection.
This project will try to recover a lost connection gracefully, if it fails to do so, we will throw an `error` event and then exit the current process with code `1`.
Our approach to reconnection is recording all the exchanges and queues created through jackrabbit. Once a connection is lost, we will try to create a new one, update the existing exchange and queue references, initialize a new channel for each queue, and bind each queue's consumers to their new channel. This should be transparent to any users of this lib.
You can configure some basic parameters of the reconnection process with some env vars:
Name|Default|Description
-|-|-
`RABBIT_RECONNECTION_TIMEOUT`| 2000 | ms between each reconnection attempt. The first attempt will always be immediate.
`RABBIT_RECONNECTION_RETRIES`| 20 | Amount of retries before erroring out and killing the node process.
`RABBIT_RECONNECTION_EXACT_TIMEOUT` | false | To prevent total outages on HA services, we're adding a random overhead of 0-10% to the reconnection timeout by default. You can disable this behaviour by setting this option to `true`.
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc