Simple amqp library

Features
- Hides underlying amqp implementation details
- Publish
- Subscribe
- Optionally kill process on errors
- Reuse connection
- Dead letter exchange handling
API
A single function is exported:
var amqpConn = require("exp-amqp-connection");
amqpConn({host: "amqphost"}, {reuse: "myKey", exchange: "myExchange"}, function (err, conn) {
if (err) return console.error(err);
...
});
The first arg is amqp connection options. See https://github.com/postwait/node-amqp#connection-options-and-url.
The second arg defines various behaviour options:
var behaviourOpts = {
dieOnError: "...",
exchange: "...",
reuse: "...",
logger: "...",
deadLetterExchangeName: "...",
exchangeOptions: "...",
queueOptions: "...",
subscribeOptions: "...",
consumerCancelNotification: "..."
};
More info on consumer cancel notifications here: http://www.rabbitmq.com/consumer-cancel.html
Default values for the options are shown below; any values you provide are merged on top of them:
var defaultExchangeOptions = {
durable: true,
autoDelete: false,
confirm: true
};
var defaultQueueOptions = {
autoDelete: true
};
var defaultSubscribeOptions = {};
Examples
Publish
var amqpConn = require("exp-amqp-connection");
amqpConn({host: "amqpHost"}, {exchange: "myExchange"}, function (err, conn) {
if (err) return console.error(err);
conn.publish("myRoutingKey", "a message");
});
Subscribe
NOTE: it is highly recommended to enable both dieOnError and consumerCancelNotification
when subscribing, to ensure a restart/reconnect in all scenarios where the subscription fails.
var amqpConn = require("exp-amqp-connection");
var behaviour = {exchange: "myExchange", dieOnError: true, consumerCancelNotification: true};
amqpConn({host: "amqpHost"}, behaviour, function (err, conn) {
if (err) return console.error(err);
conn.subscribe("myRoutingKey", "myQueueName", function (message, headers, deliveryInfo, messageObject) {
console.log("Got message", message);
});
});
Reuse connection
All calls providing the same reuse key will get the same connection returned. If no
reuse key is provided, a new connection is returned each time.
The following will yield a single connection to rabbit instead of 5000:
var amqpConn = require("exp-amqp-connection");
for(var i = 0; i < 5000; i++) {
amqpConn({host: "amqpHost"}, {reuse: "someKey"}, function (err, conn) {
if (err) return console.error(err);
conn.publish("myRoutingKey", "a message");
});
}
Die on error
In certain cases you want to crash the entire node process when there is a problem
with the amqp connection. For example durable subscriptions have problems recovering
in certain corner cases, so in order to not risk getting into a deadlocked state it
is better to crash and let the process restart.
var amqpConn = require("exp-amqp-connection");
amqpConn({host: "amqphost"}, {dieOnError: true}, function (err, conn) {
if (err) return console.error(err);
...
});
Dead letter exchange
Messages from a queue can be 'dead-lettered'; that is, republished to another exchange.
For more information: https://www.rabbitmq.com/dlx.html
Setting deadLetterExchangeName creates a dead letter queue named deadLetterExchangeName + ".deadLetterQueue":
var amqpConn = require("exp-amqp-connection");
var options = {
exchange: "myExchange",
deadLetterExchangeName: "myExchange.dead",
subscribeOptions: {
ack: true
}
}
amqpConn({host: "amqpHost"}, options, function (err, conn) {
if (err) return console.error(err);
conn.subscribe("myRoutingKey", "myQueueName", function (message, headers, deliveryInfo, messageObject) {
if (message) {
messageObject.acknowledge();
} else {
messageObject.reject(false);
}
console.log("Got message", message);
});
});