Colvin nodejs rabbitmq utils
A RabbitMQ package with some utilities, such as retryables, to use across different projects.
Usage
Consumer/worker example
import { amqpConnect, retryable, worker, deadLetter } from '@thecolvinco/nodejs-amqplib';
// Connection URL of the RabbitMQ broker used by this example.
const connectionString = 'amqp://rabbitmq:rabbitmq@localhost:5672';

// Positional command-line arguments (everything after `node <script>`).
const args = process.argv.slice(2);

// Picks the CLI argument at `position`; a missing or empty value yields `fallback`.
const argOrDefault = (position, fallback) => args[position] || fallback;

// Names of the queue, binding key and dead-letter resources, overridable from the CLI
// in this order: queue, binding key, DLX exchange, DLX routing key, DLX queue.
const queueName = argOrDefault(0, 'send-invoice-on-shopify-order-created');
const bindingKey = argOrDefault(1, 'blom.superapp.1.event.shopify.order-created');
const dlxExchange = argOrDefault(2, 'blom.superapp.dlx.direct');
const dlxRoutingKey = argOrDefault(3, 'blom.superapp.dlx');
const dlxQueueName = argOrDefault(4, 'blom-superapp-dlx-queue');
/**
 * Builds the message handler used by the consumer.
 *
 * The returned handler logs the message body, passes the message to
 * `retryable` when the body is exactly 'retry', and acknowledges it on the
 * given channel otherwise.
 *
 * @param {object} channel - Channel used to ack messages; also forwarded to `retryable`.
 * @returns {function(?object): Promise<void>} Async handler for a single delivery.
 */
const workable = (channel) => {
  return async (msg) => {
    // NOTE(review): presumably `worker` hands this callback to amqplib's
    // `channel.consume`, which calls it with `null` when the broker cancels
    // the consumer. There is nothing to decode or ack in that case.
    if (msg == null) {
      console.warn('Consumer cancelled: received an empty delivery');
      return;
    }

    // Decode once; interpolating `msg` itself would log "[object Object]".
    const body = msg.content.toString();
    console.info(`Message received ${body}`);

    if (body === 'retry') {
      await retryable({
        channel,
        message: msg,
        queue: {
          name: queueName,
        },
        retryExchange: {
          name: 'retries.exchange',
        },
        maxRetries: 3,
        delay: 1000,
      });
      return;
    }

    channel.ack(msg);
  };
};
// Consumer entry point: connects to the broker, opens a channel and passes the
// exchange, queue (with its dead-letter settings) and consumer definitions to `worker`.
amqpConnect(connectionString)
  .then(async (conn) => {
    const channel = await conn.createChannel();

    const exchangeData = {
      name: 'blom.exchange.topic',
      type: 'topic',
    };

    const queueData = {
      name: queueName,
      bindingKey,
      options: {
        durable: true,
        deadLetterExchange: dlxExchange,
        deadLetterRoutingKey: dlxRoutingKey,
      },
      // Dead-letter setup: `deadLetter` is supplied together with the parameters
      // describing the DLX queue and exchange.
      dlx: {
        func: deadLetter,
        params: {
          channel,
          dlxQueue: {
            name: dlxQueueName,
          },
          dlxExchange: {
            name: dlxExchange,
            type: 'direct',
          },
        },
      },
    };

    const consumerData = {
      onMessage: workable(channel),
      options: {
        // Manual acknowledgements: the handler acks (or retries) each message itself.
        noAck: false,
      },
    };

    await worker({
      channel,
      exchange: exchangeData,
      queue: queueData,
      consumer: consumerData,
    });

    console.log('Waiting for messages....');
  })
  // Single failure path for connection, channel and worker errors; report on stderr.
  .catch(console.error);
Producer example
import { amqpConnect, producer } from '@thecolvinco/nodejs-amqplib';
// Connection URL of the RabbitMQ broker used by this example.
const connectionString = 'amqp://rabbitmq:rabbitmq@localhost:5672';

// Positional command-line arguments: [message, routing key].
const args = process.argv.slice(2);

// Producer entry point: connects, publishes one message through `producer`,
// then closes the connection whether or not publishing succeeded.
amqpConnect(connectionString)
  .then(async (connection) => {
    try {
      const channel = await connection.createChannel();
      const message = args[0] || 'Hello world';
      const key = args[1] || 'blom.superapp.1.event.shopify.order-created';
      const exchange = {
        name: 'blom.exchange.topic',
        type: 'topic',
      };

      await producer({
        channel,
        message,
        key,
        exchange,
      });

      console.log(`[x] Sent ${message} to ${exchange.name} with key ${key}`);
    } finally {
      // Await the close so it is not left as a floating promise; a failure to
      // close is then reported by the catch below instead of going unhandled.
      await connection.close();
    }
  })
  .catch(console.error);
About
This package is maintained by TheColvinCo
LICENSE
Code is licensed under the MIT License.