Haredo
Haredo version 3 introduces breaking changes. See 3.0 Changes

RabbitMQ client for Node.js with a focus on simplicity and type safety.
Table of Contents
Features
Usage
Working examples are available on GitHub.
Initializing
import { Haredo } from 'haredo';
const haredo = Haredo({
    url: 'amqp://localhost:5672/'
});
Listening for messages
example on GitHub
haredo.queue('my-queue')
    .bindExchange('testExchange', '#', 'topic', { durable: false })
    .subscribe(async (message) => {
        console.log(message.data);
    });
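The binding above uses the pattern `#` on a topic exchange. As a rough illustration of how AMQP topic patterns behave (`*` stands for exactly one dot-separated word, `#` for zero or more), here is a minimal sketch. `topicMatches` is a hypothetical helper written for this example; it is not part of Haredo, and the real matching is done by the RabbitMQ broker.

```typescript
// Hypothetical helper, not part of Haredo: the broker does the real matching.
// Checks an AMQP topic binding pattern against a routing key.
function topicMatches(pattern: string, routingKey: string): boolean {
    const p = pattern.split('.');
    const k = routingKey.split('.');
    const match = (i: number, j: number): boolean => {
        // Pattern exhausted: it matches only if the key is exhausted too.
        if (i === p.length) return j === k.length;
        if (p[i] === '#') {
            // '#' may absorb zero or more words of the routing key.
            for (let skip = j; skip <= k.length; skip++) {
                if (match(i + 1, skip)) return true;
            }
            return false;
        }
        if (j === k.length) return false;
        // '*' matches exactly one word; anything else must match literally.
        if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
        return false;
    };
    return match(0, 0);
}
```

With this sketch, `topicMatches('#', 'item.created')` and `topicMatches('item.*', 'item.created')` are both true, while `topicMatches('item.*', 'item.created.eu')` is false.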
Publishing to an exchange
example on GitHub
haredo.exchange('my-exchange').publish({ id: 5, status: 'active' }, 'item.created');
Publishing to a queue
example on GitHub
haredo.queue('my-queue').publish({ id: 5, status: 'inactive' });
Limit concurrency
haredo.queue('my-queue')
    .prefetch(5)
    .subscribe(async (message) => {
        console.log(message);
    });
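In RabbitMQ, a prefetch count caps how many unacknowledged messages a consumer may hold at once. The sketch below models that window in isolation, assuming `.prefetch(5)` maps onto this broker-side limit. `PrefetchWindow` is a hypothetical class for illustration only and is not a Haredo API.

```typescript
// Hypothetical model of a prefetch window; not part of Haredo.
class PrefetchWindow {
    private unacked = 0;

    constructor(private readonly limit: number) {}

    // The broker only delivers while the number of unacked messages is below the limit.
    deliver(): boolean {
        if (this.unacked >= this.limit) return false;
        this.unacked += 1;
        return true;
    }

    // Acking (or nacking) a message frees one slot in the window.
    ack(): void {
        if (this.unacked > 0) this.unacked -= 1;
    }

    get inFlight(): number {
        return this.unacked;
    }
}
```

With a limit of 5, the sixth `deliver()` call returns false until one of the first five messages has been acknowledged.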
Delayed messages
Note: this requires the RabbitMQ Delayed Message Plugin to be installed and enabled on the server.
example on GitHub
interface Message {
    id: number;
}
const delayedExchange = Exchange<Message>('my-delayed-exchange', 'x-delayed-message').delayed('topic');
await haredo.queue('my-queue')
    .bindExchange(delayedExchange, '#')
    .subscribe((data, { timestamp }) => {
        console.log(`Received message in ${ Date.now() - timestamp }ms id:${ data.id } `);
    });
let id = 0;
while (true) {
    id += 1;
    console.log('Publishing message', id);
    const msg = delayedMessage.json({ id }).timestamp(Date.now());
    await haredo
        .exchange(delayedExchange)
        .delay(1000)
        .publish(msg);
    await delay(2000);
}
Quorum queues with delivery limits
Note: requires RabbitMQ 3.8.0 or higher, see Quorum Queues Overview for more information.
example on GitHub
Message throttling
example on GitHub
await haredo.queue('my-queue')
    .backoff(standardBackoff({
        failThreshold: 3,
        failSpan: 5000,
        failTimeout: 5000
    }))
    .subscribe(() => {
        throw new Error('Nack this message for me');
    });
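One plausible reading of the three options above is: once `failThreshold` failures occur within `failSpan` milliseconds, consumption pauses for `failTimeout` milliseconds. The sketch below implements that reading only. It is an assumption about the semantics, not Haredo's actual `standardBackoff` implementation, and `FailureWindow` is a hypothetical name.

```typescript
// Hypothetical sketch of a failure-window backoff; the semantics are assumed,
// not taken from Haredo's source.
interface BackoffOptions {
    failThreshold: number; // failures needed to trigger a pause
    failSpan: number;      // window in ms in which failures are counted
    failTimeout: number;   // pause length in ms once the threshold is hit
}

class FailureWindow {
    private failures: number[] = [];
    private pausedUntil = 0;

    constructor(private readonly opts: BackoffOptions) {}

    // Record a failed (nacked) message at time `now` (ms).
    recordFailure(now: number): void {
        // Drop failures that have fallen out of the counting window.
        this.failures = this.failures.filter((t) => now - t < this.opts.failSpan);
        this.failures.push(now);
        if (this.failures.length >= this.opts.failThreshold) {
            this.pausedUntil = now + this.opts.failTimeout;
            this.failures = [];
        }
    }

    // True while the consumer should hold off on taking new messages.
    isPaused(now: number): boolean {
        return now < this.pausedUntil;
    }
}
```

Passing the clock in as `now` keeps the sketch deterministic; a real consumer would use `Date.now()`.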
Dead letter
View on GitHub
Middleware
example on GitHub
import { Middleware } from 'haredo';
// The middleware must be async because it awaits next()
const timeMessage: Middleware = async ({ queue }, next) => {
    const start = Date.now();
    await next();
    console.log(`Message took ${ Date.now() - start }ms`);
};
await haredo.queue('my-queue')
    .use(timeMessage)
    .subscribe(() => {
        throw new Error('Nack this message for me');
    });
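To make the role of `next` concrete, here is a minimal sketch of how a middleware chain of this shape can be composed. Each middleware wraps everything after it, so code after `await next()` runs once the handler has finished. The types and the `runChain` function are hypothetical and written for this example; they are not Haredo's internals.

```typescript
// Hypothetical types for illustration; these are not Haredo's own definitions.
type Next = () => Promise<void>;
type SketchMiddleware<M> = (message: M, next: Next) => Promise<void> | void;
type Handler<M> = (message: M) => Promise<void> | void;

// Runs the middlewares in order, then the handler, unwinding in reverse order.
async function runChain<M>(
    message: M,
    middlewares: SketchMiddleware<M>[],
    handler: Handler<M>
): Promise<void> {
    const dispatch = async (index: number): Promise<void> => {
        if (index === middlewares.length) {
            // End of the chain: invoke the subscriber callback.
            await handler(message);
            return;
        }
        // Hand each middleware a `next` that continues with the rest of the chain.
        await middlewares[index](message, () => dispatch(index + 1));
    };
    await dispatch(0);
}
```

In this sketch, a middleware that never calls `next` stops the chain before the handler runs.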
Global middleware
Add a middleware that is called for every message in every subscriber.
example on GitHub
declare module 'haredo/types' {
    interface HaredoMessage<T> {
        cid?: string;
    }
}
const haredo = Haredo({
    url: 'amqp://localhost:5672/',
    globalMiddleware: [
        (message) => {
            // Copy the correlation id from the headers onto the message
            message.cid = message.headers?.['x-cid'] as string;
        }
    ]
});
Graceful shutdown
Calling consumer.cancel() sends a cancel to the channel and waits for messages that are already being handled to finish before the returned promise resolves.
Calling haredoInstance.close() gracefully closes all of its consumers.
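The drain-then-resolve behavior described above can be sketched as follows: stop accepting new deliveries, then wait for every in-flight handler to settle. `DrainingConsumer` is a hypothetical class for illustration and makes no claim about how Haredo implements `cancel()`.

```typescript
// Hypothetical sketch of drain-on-cancel; not Haredo's implementation.
class DrainingConsumer {
    private inFlight = new Set<Promise<void>>();
    private cancelled = false;

    constructor(private readonly handler: (msg: string) => Promise<void>) {}

    // Returns false once cancelled, so no new work is accepted.
    deliver(msg: string): boolean {
        if (this.cancelled) return false;
        const tracked: Promise<void> = this.handler(msg)
            .catch(() => undefined) // a failing handler must not block shutdown
            .then(() => {
                this.inFlight.delete(tracked);
            });
        this.inFlight.add(tracked);
        return true;
    }

    // Resolves only after all handlers that were already running have settled.
    async cancel(): Promise<void> {
        this.cancelled = true;
        await Promise.all(Array.from(this.inFlight));
    }
}
```

A caller would typically `await consumer.cancel()` in a signal handler before closing the connection.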
Automatic setup
By default, Haredo automatically asserts the queues and exchanges and binds them
to each other each time publish/subscribe is called. This can be disabled by calling .skipSetup().
await haredo.queue('my-queue')
    .skipSetup()
    .subscribe(() => {
        throw new Error('Nack this message for me');
    });
await haredo.queue('my-queue')
    .bindExchange('testExchange', '#', 'topic', { durable: false })
    .skipSetup({ skipBoundExchanges: true, skipBindings: true, skipCreate: false });
Reestablish
By default, Haredo attempts to restart the consumer when the connection is lost.
This behavior can be disabled by calling .reestablish(false).
const consumer = await haredo.queue('my-queue')
    .reestablish(false)
    .subscribe(() => {});
consumer.emitter.on('finish', () => {
    console.log('Consumer has exited');
});
Extending Haredo
Extensions add new methods to the Haredo instance. They are only available for publish chains.
An extension receives the current chain state and must return the modified state.
example on GitHub
interface Extension {
    queue: {
        cid<T>(cid: string): QueueChain<T>;
    };
}
const haredo = Haredo<Extension>({
    url: 'amqp://localhost:5672/',
    extensions: [
        {
            name: 'cid',
            queue: (state) => {
                // Return a new state with the x-cid header added
                return (cid: string) => ({
                    ...state,
                    headers: {
                        ...state.headers,
                        'x-cid': cid
                    }
                });
            }
        }
    ]
});
await haredo.queue('my-queue')
    .cid('123')
    .publish({ id: 5, status: 'inactive' });