AmqpCacoon
Overview
This is a basic library to provide amqp support. This library is a wrapper around node-amqp-connection-manager which wraps
amqplib.
Features
AMQP Cacoon, in combination with AMQP Connection Manager:
- provides support for behind-the-scenes retries on network failure:
- automatically handles reconnect if AMQP connection is lost and re-established.
- caches published messages if they are published while AMQP is disconnected.
- guarantees receipt of published messages and provides wrappers around potentially non-persistent channels.
- allows consuming single or a batch of messages.
Dependencies
This version of AMQP Cacoon has been tested with:
It is possible the package functions correctly with older versions of node and other dependencies, though these might
be untested.
Simple Usage
Connect
This allows you to connect to an amqp server
const config = {
messageBus: {
protocol: 'amqp',
username: 'valtech',
password: 'iscool',
host: 'localhost',
port: 5672,
testQueue: 'test-queue',
},
};
let amqpCacoon = new AmqpCacoon({
protocol: config.messageBus.protocol,
username: config.messageBus.username,
password: config.messageBus.password,
host: config.messageBus.host,
port: config.messageBus.port,
amqp_opts: {
heartbeatIntervalInSeconds: 5,
reconnectTimeInSeconds: 5,
connectionOptions: {
ca:
config.messageBus.port === 'amqps'
? [
fs.readFileSync(
__dirname + '/' + secrets.amqpCACertName || 'ca_certificate.pem'
),
]
: null,
},
},
providers: {
logger: logger,
},
});
Note: See the RabbitMQ Setup section below for how to set up Exchanges and Queues once your
connection is established.
Publishing Messages
Once you have an active amqpCaccon object, here is an example of how to publish a message.
let channel = await amqpCacoon.getPublishChannel();
await amqpCacoon.publish(
'',
config.messageBus.testQueue,
Buffer.from('TestMessage')
);
Note:
- This is the exchange name to publish into. Leaving this blank publishes to the AMQP Default Exchange.
- This is the routing key for the message. If you publish to the AMQP Default Exchange, then the Routing Key
needs to be the name of a Queue which the default Exchange will route the message into.
If you're publishing to a named Exchange (which is more flexible) then you can still pass a routing key
but be sure to BIND that exchange to one or more queues based on the routing key so that the exchnage knows
how to route your messages! Learn more about routing here: https://www.rabbitmq.com/tutorials/tutorial-four-javascript.html
- This is the message to publish. Notice it must be a Buffer.
Note: Please see ./examples/example-amqp-publish.js for a complete example.
Consume Single Message
This is an example of how to consume a single message.
let channel = await amqpCacoon.getConsumerChannel();
channel.assertQueue(config.messageBus.testQueue);
amqpCacoon.registerConsumer(
config.messageBus.testQueue,
async (channel: Channel, msg: ConsumeMessage) => {
try {
console.log('Messsage content', msg.content.toString());
channel.ack(msg)
} catch (e) {
channel.nack(msg)
}
}
);
Note: Please see ./examples/example-amqp-consumer.js for a complete example.
Consume Message Batch
When consuming messages, instead of one at a time, you can choose to have your callback passed in a batch. This feature
allows you to wait until either some time has elapsed or a specified message data size has been exceeded before the
messages are turned over to your callback.
amqpCacoon.registerConsumerBatch(
config.messageBus.testQueue,
async (channel: Channel, batch: ConsumeBatchMessages) => {
try {
console.log(`Messsages in the bactch: ${batch.messages.length}`);
batch.ackAll()
} catch (e) {
batch.nackAll()
}
},
{
batching: {
maxTimeMs: 60000,
maxSizeBytes: 1024 * 1024 * 2,
}
}
);
Note: Please see ./examples/example-amqp-consumer-batch.js for a complete example.
Note that in practice, all messages are fetched by AMQP Cacoon from the broker as soon as they're released by the broker.
However, AMQP Cacoon batches the contents until the time or data limit is reached, then your callback is handed over
the messages for processing. Note that AMQP Cacoon will not ACK nor NACK the messages until your callback decides! So,
you'll see the messages accumulate in RabbitMQ until your callback is called, then batches will be removed from RMQ
as you ACK them. The best practice is to write your application to either ACK or NACK all the messages in a batch,
however, it is possible to ACK, NACK individual messages (though that is not demonstrated here.)
Dealing With Channels via ChannelWrapper
This library exposes node-amqp-connection-manager's ChannelWrapper when you call either getConsumerChannel
or
getPublishChannel
. Instead of exposing the Amqp Channel directly (which may or may not be valid depending on the
network status), AmqpConnectionManager provides a ChannelWrapper class as an interface to interacting with the
underlying channel. Most functions that can be performed on an AmqpLib Channel
can be performed on the
ChannelWrapper
, including ackAll
, nackAll
, etc. though they are Promise-based.
See AMQPConnectionManager's documentation for more info,
as well as the underlying amqplib docs.
Note:
- Remember to ack or nack on all messages. This is a standard message bus pattern.
- An alternative is to pass an option into the
registerConsumer
to not require an ack (noAck). The problem with this
is that if your application is reset or errors out, you may lose the message or messages.
RabbitMQ Setup
AMQP Caccon exposes the underlying Amqp Connection Manager setup function which allows a setup callback to be passed
in the configuration, or added to a ChannelWrapper at any point. This function can be used with callbacks or Promises
and directly exposes the underlying AMQP channel as a ConfirmChannel (an extension of Channel) (since we know it is
valid at that point). The setup function is useful for asserting queues and performing other necessary tasks that must
be completed once a valid connection to amqp is made. See AMQPConnectionManager's documentation for more details.
Here are typescript and javascript examples of this in practice:
let amqpCacoonConfig: IAmqpCacoonConfig = {
protocol: config.messageBus.protocol,
username: config.messageBus.username,
password: config.messageBus.password,
host: config.messageBus.host,
port: config.messageBus.port,
connectionString: config.messageBus.connectionString,
amqp_opts: {},
providers: {
logger: logger,
},
onChannelConnect: async function (channel: ConfirmChannel) {
if (channel) {
await channel.assertQueue(config.messageBus.testQueue);
}
},
};
let amqpCacoonConfig = {
protocol: config.messageBus.protocol,
username: config.messageBus.username,
password: config.messageBus.password,
host: config.messageBus.host,
port: config.messageBus.port,
connectionString: config.messageBus.connectionString,
amqp_opts: {},
providers: {
logger: logger,
},
onChannelConnect: async function (channel) {
if (channel) {
await channel.assertQueue(config.messageBus.testQueue);
}
},
};
Note that in both examples:
- You add a callback in your amqpCacoonConfig which will be called once the connection to RabbitMQ is established passing
in a channel into the callback.
- You then use that channel to perform
assertQueue()
, assertExhange()
, bindQueue()
and other setup
operations.
Logger
You can pass a logger (optional) as the examples above. For a logger to work, it should have at least the following methods:
Some loggers you may use:
The examples in the directory ./examples/src use log4js.
And for a typescript project, using tslog is quite straightforward:
const logger = new Logger({
displayLoggerName: true,
minLevel: 'silly',
name: 'amqp-cacoon logger',
type: 'pretty'
});
Run the Examples
The directory ./examples/src contains several examples demonstrating the features of AMQP Cacoon.
To run the examples:
- Install node modules
cd ./examples
npm i
-
Make sure that you have an AMQP broker like RabbitMQ running, as noted in the comments at the top of the example files in examples/src
.
-
Run any one of the specific examples
node ./example-amqp-publish.js
Running Tests for this Repo
Note that all the tests for this REPO are UNIT TESTS that do not require an actual AMQP host to be
setup. Consequently, the tests verify that the AMQP Cacoon wrappers are properly calling the underlying
AMQPLIP and NODE AMQP MANAGER libraries. For a more "real world" test, see Run the Examples.
-
Install node modules (This also loads local modules from our own repositories)
npm install
-
Run tests
npm run test
Roadmap
- TODO: Add an example of ConsumeBatch where individual messages are ACK/NACK.
- PENDING: Timeout if drain event does not occur after some amount of time when channel is not ready to receive a
publish. As of 09/2020, the publish-on-drain functionality has been removed, as
node-amqp-manager
does not support
it at this time (pending a bugfix?). This requires further research and testing. See https://github.com/valtech-sd/amqp-cacoon/issues/20.