node-message-bus
![GitHub](https://img.shields.io/github/stars/zitros/node-message-bus.svg?style=social&label=Star)
Declarative AMQP (RabbitMQ / LavinMQ / ...) support for your NodeJS microservices. A convenient wrapper around ampqlib
for RabbitMQ, bringing the most critical features to build with message bus pattern.
Requirements
- NodeJS 18+ (required for native
fetch
when NODE_ENV=test
)
Features
- Declarative interface, simple in use.
- Built-in, no-external-dependencies support of exponential backoff.
- No-dependencies, out-of-the-box testing with dynamic instances provided by CloudAMQP.
- Simple yet flexible interfaces.
- Pluggable logging library.
Table of Contents
- Installation
- Configuration
- Examples
- Publisher
- Consumer
- Usage
- Initialize
- Disconnect
- Publish messages
- Consume messages
- Using message types
- Other functions
- Features
- Built-in exponential backoff
- Spot RabbitMQ instances for testing
- License
Installation
Install this NPM module:
npm install --save node-message-bus
Configuration
This module can be configured with the following environment variables.
NODE_MESSAGE_BUS_CONNECTION_URL=http://admin:admin@rabbitmq
NODE_MESSAGE_BUS_DEFAULT_EXCHANGE_NAME=amq.topic
NODE_MESSAGE_BUS_DEFAULT_EXCHANGE_TYPE=topic
NODE_ENV=test
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY=faf83b09-352f-add3-c2e3-c83212a32344
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_INSTANCE_LIFETIME=3600000
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_PREFERRED_REGIONS=eu-central,europe-west
Examples
Below you will find simple copy-paste examples of publisher and consumer.
Publisher
import { initMessageBus, publishMessage } from 'node-message-bus';
await initMessageBus();
await publishMessage({
key: 'worker.test',
body: 'Hello',
});
Consumer
Application init file:
import { initMessageBus } from 'node-message-bus';
await initMessageBus({
});
Specific consumer handler file:
import { configureMessageBus, consumeMessages } from 'node-message-bus';
await configureMessageBus({
queues: ['test-queue-1'],
bindings: [{ toQueue: 'test-queue-1', routingKey: 'worker.#' }],
});
await consumeMessages('test-queue-1', async ({ body, key }) => {
console.log(`Consumed message with routingKey=${key}:`, body);
});
or
await consumeMessages(
{
queues: ['test-queue-1'],
bindings: [{ toQueue: 'test-queue-1', routingKey: 'worker.#' }],
},
async ({ body, key }) => {
console.log(`Consumed message with routingKey=${key}:`, body);
}
);
Usage
Initialize
In JavaScript, you have to call initMessageBus
function to initialize the library with
what this service will listen or push to, before doing anything else. You won't get errors
if you, for instance, publish messages before calling this functions, but publishing will
hang and wait until initMessageBus is called.
import { initMessageBus } from 'node-message-bus';
await initMessageBus({
logger: (logType, message) => console[logType](message),
exchanges: [
{
name: 'amq.topic',
type: 'topic',
},
],
queues: [
{
name: 'test-queue-1',
},
{
name: 'test-queue-with-dead-letter-and-ttl',
options: {
deadLetterExchange: '',
deadLetterRoutingKey: 'something',
messageTtl: 2000,
},
},
{
name: 'test-queue-dead-letter-handler',
},
],
bindings: [
{
toQueue: 'test-queue-1',
routingKey: 'routing-key',
},
{
toQueue: 'test-queue-dead-letter-handler',
routingKey: 'something',
},
],
amqpConfig: {
},
});
If you need to define any queues in runtime (which should only be used for testing), use configureMessageBus
,
which has the same API as initMessageBus
.
import { configureMessageBus } from 'node-message-bus';
await configureMessageBus({
});
Disconnect
To properly disconnect from a message bus ensuring all messages are ack'ed, use this function.
We demonstrate the usage of this function along with node-graceful-shutdown
NPM module.
import { onShutdown } from 'node-graceful-shutdown';
import { closeMessageBus } from 'node-message-bus';
onShutdown(closeMessageBus);
Publish messages
Mind to call initMessageBus
for microservices which are publishing only.
import { publishMessage } from 'node-message-bus';
await publishMessage({
key: 'key-1',
body: {
info: 'This will be serialized to JSON,',
or: "you can pass a primitive value to 'data'.",
},
});
You can also publish messages to a single queue (which is not recommended under normal circumstances):
import { publishMessageToQueue } from 'node-message-bus';
await publishMessageToQueue({
queueName: 'queue-1',
body: 'Made in 🇺🇦',
});
In testing scenarios, you can also access a few extra functions that will allow for easier assertions:
import { startApp, stopApp, myAwesomeFunc } from 'build';
import {
getLastPublishedMessages,
getLastRejectedMessages,
getLastConsumedMessages,
clearLastMessages,
} from 'node-message-bus';
before(async () => {
await startApp();
});
after(async () => {
await stopApp();
});
describe('Dummy test', () => {
beforeEach(async () => {
await clearLastMessages();
});
it('tests something', async () => {
await myAwesomeFunc();
expect(getLastPublishedMessages()).to.have.length(2);
expect(getLastConsumedMessages()).to.have.length(2);
expect(getLastRejectedMessages()).to.have.length(0);
});
});
Consume messages
Consumers are typically defined once, globally, per-microservice.
import { consumeMessages } from 'node-message-bus';
await consumeMessages(
'test-queue-1',
async ({ body, key, failThisMessage }) => {
await failThisMessage(
new Error(
'"soft fail" this message, which sends it to the backoff queue.'
)
);
}
);
Sequential AMQP message consumption
If you want a worker (instance!) to consume a maximum of one message at a time from the queue, you can limit it
with a prefetchCount
option for the specific queue (consumer):
import { consumeMessages } from 'node-message-bus';
await consumeMessages(
{
queues: ['test-queue-1'],
prefetchCount: 1,
},
async ({ body }) => {
}
);
Using message types
It is recommended to use node-message-bus
with typed messages, to ensure data integrity during compilation.
You can do it purely with TypeScript:
import { publishMessage, consumeMessages, IMessage } from 'node-message-bus';
interface MessageWorkerTaskA
extends IMessage<'worker.task-a', { typed: string }> {}
type MessageWorkerTaskB = IMessage<'worker.task-b', { myData: number }>;
export type Message = MessageWorkerTaskA | MessageWorkerTaskB;
Then, use the following example to publish and consume typed messages. Both key and body will correspond only to
a specific message.
import { Message } from '@your-company/types';
await publishMessage<Message>({
key: 'worker.task-a',
body: {
typed: 'Hello, world!',
},
});
await consumeMessages<Message>(
'my-queue',
async ({ key, body, failThisMessage }) => {
if (key === 'worker.task-a') {
} else if (key === 'worker.task-b') {
} else {
await failThisMessage();
}
}
);
Other functions
purgeQueue / purgeAllQueues
purgeAllQueues()
is a convenient way to cleanup every defined (in this process) queue before each new test you run.
import { purgeQueue, purgeAllQueues } from 'node-message-bus';
await purgeQueue('test-queue-1');
await purgeAllQueues();
deleteQueue
deleteQueue()
lets you delete a defined queue by name.
import { deleteQueue } from 'node-message-bus';
await deleteQueue('test-queue-1');
Features
Built-in exponential backoff
node-message-bus
features exponential backoff when consuming messages out of the box, with no dependencies.
When consuming a message errors (or you nack
it yourself), the message which caused an error will be ack
nowledged
but sent back to the automatic exponential backoff queue. This queue will be created automatically on failures.
If creating a backoff queue or sending a message to it fails too, the message will be nack
'ed and hence consuming
this message will be retried.
Assuming there's a queue queue-name
, with incoming message routing-key
, automatic exponential backoff queues use
the following naming pattern:
queue-name
(original queue)queue-name.backoff-1s+routing-key
(delayed by 1 seconds)queue-name.backoff-4s+routing-key
(delayed by 4 seconds)queue-name.backoff-16s+routing-key
(delayed by 16 seconds)queue-name.backoff-64s+routing-key
(delayed by 64 seconds)queue-name.backoff-256s+routing-key
(delayed by 256 seconds)queue-name.backoff-1024s+routing-key
(delayed by 1024 seconds)
These queues have TTL and push the message back to the exchange with the same routing key and properties they had
before. These queues are not auto-deleted.
Dynamic RabbitMQ instances for testing
If you set the following environment variables,
NODE_ENV=test
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY=<your_api_key>
node-message-bus
will switch to a testing mode and will be using cloudamqp.com to
create hot RabbitMQ instances on initMessageBus
, under the hood. Thus, tests you run will be connected to
a clean RabbitMQ instance.
node-message-bus
create a free plan instances, which should be enough for various testing purposes:
Specifically, when in testing mode, initMessageBus
will perform the following sequence of actions:
- Connect to cloudamqp.com using the provided API key.
- Delete all instances with names
node-message-bus-delete-after-*
, where *
is past the current date (as deleting instances in step 5.
can fail). - Create a new instance named
node-message-bus-delete-after-*
, where *
is the current date plus 1 hour (controlled by NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_INSTANCE_LIFETIME
). - Run a test connecting to this instance.
- Delete the created instance during this run
node-message-bus-delete-after-*
.
In case cloudamqp.com is down, tests will fail. But you see yourself that this is
a highly available service here.
License
MIT © Nikita Savchenko