RMQBroker
This module abstracts the two different ways messages can be exchanged using queues.
One way is to send messages in real time, accepting that a message is lost if nobody is listening for it when it is published.
You can think of this as a stream of messages, always flowing.
This is also known as the pub/sub pattern described here
The other way is to store messages so they can be read at a later point in time, guaranteeing that every message produced is delivered to a consumer and none is lost on the way.
This is also known as the Work Queues pattern described here
So to sum up, we have four service objects:
- a DirectPublisher: sends messages directly to a queue, where they are stored until they are consumed.
- a FanoutPublisher: publishes messages to an exchange. No queue is bound to this exchange, so if there are no subscribers the messages are lost.
- a Consumer: binds the queue to the exchange and starts listening for incoming messages.
- a Subscriber: creates an unnamed queue, binds it to the exchange and starts listening for incoming messages. The queue is removed automatically when the Subscriber disconnects.
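The difference between the two delivery modes can be illustrated without a broker. The following is a minimal in-memory sketch, not the rmqbroker API: `FanoutExchangeSketch` and `WorkQueueSketch` are hypothetical names invented for this illustration, and real RabbitMQ behaviour (acknowledgements, persistence, networking) is deliberately left out.

```javascript
// Hypothetical in-memory stand-ins; they are NOT part of rmqbroker.

// Fan-out: a message only reaches the handlers subscribed at publish time.
class FanoutExchangeSketch {
  constructor () {
    this.subscribers = new Set()
  }

  subscribe (handler) {
    this.subscribers.add(handler)
  }

  // Returns how many subscribers were reached; 0 means the message was dropped.
  publish (message) {
    for (const handler of this.subscribers) handler(message)
    return this.subscribers.size
  }
}

// Work queue: messages are stored until a consumer is attached.
class WorkQueueSketch {
  constructor () {
    this.pending = []
    this.consumer = null
  }

  publish (message) {
    if (this.consumer) this.consumer(message)
    else this.pending.push(message)
  }

  consume (handler) {
    this.consumer = handler
    // Deliver everything that was stored while nobody was consuming.
    while (this.pending.length > 0) handler(this.pending.shift())
  }
}

const fanout = new FanoutExchangeSketch()
const reached = fanout.publish('early') // nobody is listening yet: dropped
const streamReceived = []
fanout.subscribe((message) => streamReceived.push(message))
fanout.publish('late')

const queue = new WorkQueueSketch()
queue.publish('job-1') // nobody is consuming yet: stored
const queueReceived = []
queue.consume((message) => queueReceived.push(message))
queue.publish('job-2')

console.log(reached, streamReceived, queueReceived)
```

In this sketch the fan-out subscriber never sees `'early'`, while the work queue consumer receives both jobs, including the one published before it started consuming.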
Install
npm i --save rmqbroker
Usage
All of the service objects mentioned above are created using the static methods of the RMQBuilder object. This is the only object exported by the module.
Exchange real time messages
To exchange real time messages we need a FanoutPublisher and a Subscriber.
Publisher
const { RMQBuilder } = require('rmqbroker')
const publisher = RMQBuilder.createFanoutPublisher('amqp://localhost', 'ex-demo-01', { durable: false })
// `message` is the payload to publish, defined elsewhere
publisher.publishMessage(message)
  .then(() => {
    console.log('message sent!')
    return publisher.close()
  })
  .then(() => {
    console.log('Goodbye')
  })
  .catch(console.error)
Subscriber
const { RMQBuilder } = require('rmqbroker')
const subscriber = RMQBuilder.createSubscriber('amqp://localhost', 'ex-demo-01', { durable: false })
subscriber.on('message', (message) => {
  console.log(`New message: ${message}`)
})
subscriber.subscribe()
  .then(() => {
    console.log('Waiting for incoming messages...')
  })
  .catch(console.error)
Note that we only need to declare the same exchange name on the publisher and the subscriber.
The last argument holds the exchange options; the durable property is set to false by default.
There can be more than one subscriber per exchange. Each subscriber receives a copy of the same message.
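The "every subscriber gets its own copy" behaviour can be sketched with Node's built-in `EventEmitter`. This is only an in-memory analogy under the assumption that an exchange behaves like an event source; the `exchange` object and the two inboxes are hypothetical and do not involve rmqbroker or RabbitMQ.

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical in-memory exchange; it only mimics the fan-out behaviour.
const exchange = new EventEmitter()

// Two independent subscribers, each collecting what it receives.
const inboxA = []
const inboxB = []
exchange.on('message', (message) => inboxA.push(message))
exchange.on('message', (message) => inboxB.push(message))

// Every published message is handed to every registered subscriber.
exchange.emit('message', 'hello')
exchange.emit('message', 'world')

console.log(inboxA, inboxB)
```

Both inboxes end up with the same two messages, in publication order.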
Work queue
If we cannot afford to lose messages we need a DirectPublisher and a Consumer.
Publisher
const { RMQBuilder } = require('rmqbroker')
const publisher = RMQBuilder.createDirectPublisher('amqp://localhost', 'ex-demo-02', 'cool-queue', { durable: true }, { durable: true })
// `message` is the payload to publish, defined elsewhere
publisher.publishMessage(message)
  .then(() => {
    console.log('message sent!')
    return publisher.close()
  })
  .then(() => {
    console.log('Goodbye')
  })
  .catch(console.error)
Consumer
const { RMQBuilder } = require('rmqbroker')
const consumer = RMQBuilder.createConsumer('amqp://localhost', 'ex-demo-02', 'cool-queue', { durable: true }, { durable: true })
consumer.startConsuming((message) => {
  console.log(message)
})
  .then(() => {
    console.log('Callback registered to consumer - waiting for messages')
  })
  .catch(console.error)
Note that alongside the exchange we must declare a queue name, which must be the same on the publisher and the consumer.
Also, the durable option for both the exchange and the queue is set to true by default.
There can be more than one consumer per queue; in this case messages are distributed among the consumers in round-robin fashion.
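Round-robin distribution means each message goes to exactly one consumer, and consumers take turns. The following is a minimal sketch of that idea only; `createRoundRobinDispatcher` is a hypothetical helper written for this illustration, not something rmqbroker exposes, and the actual dispatching is done by the broker.

```javascript
// Hypothetical helper, not part of rmqbroker: hands each message to the
// next consumer in turn, wrapping around at the end of the list.
function createRoundRobinDispatcher (consumers) {
  let next = 0
  return function dispatch (message) {
    const consumer = consumers[next]
    next = (next + 1) % consumers.length
    consumer(message)
  }
}

// Two consumers attached to the same (imaginary) queue.
const handledByA = []
const handledByB = []
const dispatch = createRoundRobinDispatcher([
  (message) => handledByA.push(message),
  (message) => handledByB.push(message)
])

for (const job of ['job-1', 'job-2', 'job-3', 'job-4']) dispatch(job)

console.log(handledByA, handledByB)
```

With two consumers and four jobs, the first consumer handles `job-1` and `job-3` and the second handles `job-2` and `job-4`, so no job is processed twice.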
Example
You can find some examples in the example/ directory.