Rabbit Queue
,---. | | o| ,---.
|---',---.|---.|---..|--- | |. .,---.. .,---.
| \ ,---|| || ||| | || ||---'| ||---'
` ``---^`---'`---'``---' `---\`---'`---'`---'`---'
About
This module is based on WRabbit
and Jackrabbit.
It makes RabbitMQ management and integration easy by providing an abstraction layer
above amqplib.
It is written in TypeScript and requires Node.js v6.0.0 or higher.
Connecting to RabbitMQ
const {Rabbit} = require('rabbit-queue');

// NOTE(review): `log4js` is used below but is not required in this snippet —
// presumably the application already has it in scope; confirm.
const connectionUrl = process.env.RABBIT_URL || 'amqp://localhost';
const connectionOptions = {
  prefetch: 1,
  replyPattern: true,
  logger: log4js.getLogger(`Rabbit-queue`),
  prefix: ''
};
const rabbit = new Rabbit(connectionUrl, connectionOptions);

// Placeholder: nothing is done on 'connected' in this example.
const onConnected = () => {};

// Log the reason (a default Error is used when none is supplied), then ask the
// client to reconnect 100 ms later.
const onDisconnected = (err = new Error('Rabbitmq Disconnected')) => {
  console.error(err);
  setTimeout(() => rabbit.reconnect(), 100);
};

rabbit.on('connected', onConnected);
rabbit.on('disconnected', onDisconnected);
Usage
const {Rabbit} = require('rabbit-queue');

const rabbit = new Rabbit('amqp://localhost');

// Consumer for 'queueName': print the message body as text, then call the
// ack callback with 'response'.
const onMessage = (message, acknowledge) => {
  console.log(message.content.toString());
  acknowledge('response');
};

// Shared logger for both getReply calls below.
const logReply = (reply) => console.log('received reply', reply);

rabbit
  .createQueue('queueName', { durable: false }, onMessage)
  .then(() => console.log('queue created'));

rabbit
  .publish('queueName', { test: 'data' }, { correlationId: '1' })
  .then(() => console.log('message published'));

rabbit
  .getReply('queueName', { test: 'data' }, { correlationId: '1' })
  .then(logReply);

// Same request with two extra arguments ('' and 100).
// NOTE(review): 100 is presumably the timeout in milliseconds, going by the
// message logged on rejection — confirm.
rabbit
  .getReply('queueName', { test: 'data' }, { correlationId: '1' }, '', 100)
  .then(logReply)
  .catch((error) => console.log('Timed out after 100ms'));
Binding to topics
const {Rabbit} = require('rabbit-queue');

const rabbit = new Rabbit('amqp://localhost');

// Consumer for 'queueName': print the message body as text, then call the
// ack callback with 'response'.
const consume = (message, done) => {
  console.log(message.content.toString());
  done('response');
};

rabbit
  .createQueue('queueName', { durable: false }, consume)
  .then(() => console.log('queue created'));

// Bind 'queueName' to the 'amq.topic' exchange using 'routingKey'.
rabbit.bindToExchange('queueName', 'amq.topic', 'routingKey');

// Publish to the same exchange with the same routing key.
rabbit
  .publishExchange('amq.topic', 'routingKey', { test: 'data' }, { correlationId: '1' })
  .then(() => console.log('message published'));
Advanced Usage with BaseQueueHandler (messages are added to the dead-letter queue (DLQ) after 3 failed retries)
// Make the snippet self-contained: bring in the classes it uses.
// NOTE(review): `log4js` is used below but is not required here — presumably
// the application already has it in scope; confirm.
const {Rabbit, BaseQueueHandler} = require('rabbit-queue');

const rabbit = new Rabbit('amqp://localhost');

/**
 * Example queue handler that logs each event it receives.
 */
class DemoHandler extends BaseQueueHandler {
  /**
   * Receives a single object; `msg` and `startTime` are destructured to show
   * they are available, but only `event` and `correlationId` are used here.
   */
  handle({msg, event, correlationId, startTime}) {
    console.log('Received: ', event);
    console.log(`With correlation id: ${correlationId}`);
  }

  /**
   * DLQ hook; intentionally left empty in this example.
   */
  afterDlq({msg, event}) {
  }
}

// Attach the handler to 'demoQueue' with retry and logging options.
new DemoHandler('demoQueue', rabbit, {
  retries: 3,
  retryDelay: 1000,
  logEnabled: true,
  logger: log4js.getLogger('[demoQueue]')
});

rabbit.publish('demoQueue', { test: 'data' }, { correlationId: '4' });
Get reply pattern implemented with a QueueHandler
// Make the snippet self-contained: bring in the classes it uses.
// NOTE(review): `log4js` is used below but is not required here — presumably
// the application already has it in scope; confirm.
const {Rabbit, BaseQueueHandler} = require('rabbit-queue');

const rabbit = new Rabbit(process.env.RABBIT_URL || 'amqp://localhost');

/**
 * Example queue handler whose handle() resolves with a reply value.
 */
class DemoHandler extends BaseQueueHandler {
  /**
   * Logs the event and its correlation id, then resolves with 'reply'.
   * `msg` and `startTime` are available on the argument but unused here.
   *
   * @returns {Promise<string>} a promise resolved with 'reply'
   */
  handle({msg, event, correlationId, startTime}) {
    console.log('Received: ', event);
    console.log(`With correlation id: ${correlationId}`);
    return Promise.resolve('reply');
  }

  /**
   * DLQ hook; intentionally left empty in this example.
   */
  afterDlq({msg, event}) {
  }
}

// Attach the handler to 'demoQueue' with retry and logging options.
new DemoHandler('demoQueue', rabbit, {
  retries: 3,
  retryDelay: 1000,
  logEnabled: true,
  logger: log4js.getLogger('[demoQueue]')
});

// Send a request to 'demoQueue' and log whatever reply comes back.
rabbit.getReply('demoQueue', { test: 'data' }, { correlationId: '5' })
  .then((reply) => console.log('received reply', reply));
Example where handle throws an error
// Make the snippet self-contained: bring in the classes it uses.
// NOTE(review): `log4js` is used below but is not required here — presumably
// the application already has it in scope; confirm.
const {Rabbit, BaseQueueHandler} = require('rabbit-queue');

const rabbit = new Rabbit(process.env.RABBIT_URL || 'amqp://localhost');

/**
 * Example queue handler whose handle() always fails.
 */
class DemoHandler extends BaseQueueHandler {
  /**
   * Always rejects with Error('test Error').
   *
   * @returns {Promise<never>} a rejected promise
   */
  handle({msg, event, correlationId, startTime}) {
    return Promise.reject(new Error('test Error'));
  }

  /**
   * DLQ hook; logs a marker line so the example shows when it runs.
   */
  afterDlq({msg, event}) {
    console.log('added to dlq');
  }
}

// Attach the handler to 'demoQueue' with retry and logging options.
new DemoHandler('demoQueue', rabbit, {
  retries: 3,
  retryDelay: 1,
  logEnabled: true,
  logger: log4js.getLogger('[demoQueue]')
});

rabbit.getReply('demoQueue', { test: 'data' }, { correlationId: '5' })
  .then((reply) => console.log('received reply', reply))
  // handle() always rejects in this example, so the chain must not be left
  // without a rejection handler.
  // NOTE(review): presumably the handler failure surfaces here as a
  // rejection — confirm.
  .catch((error) => console.error(error));
Tests
The tests are set up with Docker and Docker Compose,
so you don't need to install RabbitMQ (or even Node.js) to run them:
npm run test-docker