
RabbitMQ Client
Node.js client library for RabbitMQ. Publish
messages, declare rules for routing those messages into queues, consume messages from queues.
Why not amqplib?
- No dependencies
- Automatically re-connect, re-subscribe, or retry publishing
- Optional higher-level Consumer/Publisher API for even more robustness
- Written in typescript and published with heavily commented type definitions
- See here for full API documentation
- Intuitive API with named parameters instead of positional
- "x-arguments" like "x-message-ttl" don't have camelCase aliases
RabbitMQ Compatibility
- To connect to RabbitMQ version 4.1.x or higher, you must use version 5.0.3 or higher of this library. See #75 and the CHANGELOG for details
Performance
Performance is comparable to amqplib (see ./benchmark.ts).
| rabbitmq-client publish-confirm (null route) | 2,611 | 382919 | ±3.69% | 1306 |
| amqplib publish-confirm (null route) | 2,315 | 431880 | ±4.89% | 1158 |
| rabbitmq-client publish-confirm (transient queue) | 961 | 1039884 | ±1.07% | 481 |
| amqplib publish-confirm (transient queue) | 1,059 | 943706 | ±1.34% | 530 |
Quick start
In addition to the lower-level RabbitMQ methods, this library exposes two main
interfaces, a Consumer and a Publisher (which should cover 90% of uses
cases), as well as a third RPCClient for request-response communication.
import {Connection} from 'rabbitmq-client'
const rabbit = new Connection('amqp://guest:guest@localhost:5672')
rabbit.on('error', (err) => {
console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
console.log('Connection successfully (re)established')
})
const sub = rabbit.createConsumer({
queue: 'user-events',
queueOptions: {durable: true},
qos: {prefetchCount: 2},
exchanges: [{exchange: 'my-events', type: 'topic'}],
queueBindings: [{exchange: 'my-events', routingKey: 'users.*'}],
}, async (msg) => {
console.log('received message (user-events)', msg)
})
sub.on('error', (err) => {
console.log('consumer error (user-events)', err)
})
const pub = rabbit.createPublisher({
confirm: true,
maxAttempts: 2,
exchanges: [{exchange: 'my-events', type: 'topic'}]
})
await pub.send(
{exchange: 'my-events', routingKey: 'users.visit'},
{id: 1, name: 'Alan Turing'})
await pub.send('user-events', {id: 1, name: 'Alan Turing'})
async function onShutdown() {
await pub.close()
await sub.close()
await rabbit.close()
}
process.on('SIGINT', onShutdown)
process.on('SIGTERM', onShutdown)
Connection.createConsumer() vs Channel.basicConsume()
The above Consumer & Publisher interfaces are recommended for most cases.
These combine a few of the lower level RabbitMQ methods (exposed on the
Channel interface) and and are much safer to use since they can recover after
connection loss, or after a number of other edge-cases you may not have
imagined. Consider the following list of scenarios (not exhaustive):
- Connection lost due to a server restart, missed heartbeats (timeout), forced
by the management UI, etc.
- Channel closed as a result of publishing to an exchange which does not exist
(or was deleted), or attempting to acknowledge an invalid deliveryTag
- Consumer closed from the management UI, or because the queue was deleted, or
because basicCancel() was called
In all of these cases you would need to create a new channel and re-declare any
queues/exchanges/bindings before you can start publishing/consuming messages
again. And you're probably publishing many messages, concurrently, so you'd
want to make sure this setup only runs once per connection. If a consumer is
cancelled then you may be able to reuse the channel but you still need to check
the queue and so on...
The Consumer & Publisher interfaces abstract all of that away by running
the necessary setup as needed and handling all the edge-cases for you.
Managing queues & exchanges
A number of management methods are available on the Connection interface; you
can create/delete queues, exchanges, or bindings between them. It's generally
safer to do this declaratively with a Consumer or Publisher. But maybe you
just want to do something once.
const rabbit = new Connection()
await rabbit.queueDeclare({queue: 'my-queue', exclusive: true})
await rabbit.exchangeDeclare({queue: 'my-queue', exchange: 'my-exchange', type: 'topic'})
await rabbit.queueBind({queue: 'my-queue', exchange: 'my-exchange'})
const {messageCount} = await rabbit.queueDeclare({queue: 'my-queue', passive: true})
And if you really want to, you can acquire a raw AMQP Channel but this should
be a last resort.
const ch = await rabbit.acquire()
ch.on('close', () => {
console.log('channel was closed')
})
const msg = ch.basicGet('my-queue')
console.log(msg)
await ch.close()
RPCClient: request-response communication between services
This will create a single "client" Channel on which you may publish messages
and listen for direct responses. This can allow, for example, two
micro-services to communicate with each other using RabbitMQ as the middleman
instead of directly via HTTP.
const rabbit = new Connection()
const rpcServer = rabbit.createConsumer({
queue: 'my-rpc-queue'
}, async (req, reply) => {
console.log('request:', req.body)
await reply('pong')
})
process.on('SIGINT', async () => {
await rpcServer.close()
await rabbit.close()
})
const rabbit = new Connection()
const rpcClient = rabbit.createRPCClient({confirm: true})
const res = await rpcClient.send('my-rpc-queue', 'ping')
console.log('response:', res.body)
await rpcClient.close()
await rabbit.close()