exp-amqp-connection
Advanced tools
Comparing version
37
index.js
@@ -77,2 +77,7 @@ "use strict"; | ||
setImmediate(function () { | ||
if (behaviour.deadLetterExchangeName) { | ||
return setupDeadLetterExchange(behaviour.deadLetterExchangeName, function () { | ||
callback(exch); | ||
}); | ||
} | ||
callback(exch); | ||
@@ -83,2 +88,28 @@ }); | ||
function setupDeadLetterExchange(deadLetterExchangeName, callback) { | ||
var options = { | ||
durable: true, | ||
autoDelete: false, | ||
type: "topic" | ||
}; | ||
conn.exchange(deadLetterExchangeName, options, function (ex) { | ||
var options = {durable: true, autoDelete: false}; | ||
conn.queue(deadLetterExchangeName + ".deadLetterQueue", options, function (q) { | ||
q.bind(ex, "#"); | ||
callback(); | ||
}); | ||
}); | ||
} | ||
function getQueueOptions() { | ||
if (behaviour.deadLetterExchangeName) { | ||
queueOptions["arguments"] = queueOptions["arguments"] || {}; | ||
queueOptions["arguments"]["x-dead-letter-exchange"] = behaviour.deadLetterExchangeName; | ||
if (!subscribeOptions.ack) { | ||
throw new Error("Ack needs to be enabled in subscribeOptions for dead letter exchange to work"); | ||
} | ||
} | ||
return queueOptions; | ||
} | ||
function publish(routingKey, message, publishCallback) { | ||
@@ -100,3 +131,3 @@ var actualPublishCallback = publishCallback || function () {}; | ||
logger.debug("Attempting to connect to queue", id); | ||
conn.queue(queueName, queueOptions, function (queue) { | ||
conn.queue(queueName, getQueueOptions(), function (queue) { | ||
routingPatterns.forEach(function (routingPattern) { | ||
@@ -122,2 +153,3 @@ queue.bind(behaviour.exchange, routingPattern); | ||
} | ||
attemptExclusiveSubscribe(1); | ||
@@ -128,3 +160,4 @@ } | ||
var actualSubscribeCallback = subscribeCallback || function () {}; | ||
conn.queue(queueName, queueOptions, function (queue) { | ||
conn.queue(queueName, getQueueOptions(), function (queue) { | ||
queue.on("error", function (queueError) { | ||
@@ -131,0 +164,0 @@ return actualSubscribeCallback(queueError); |
@@ -9,3 +9,3 @@ { | ||
], | ||
"version": "1.2.2", | ||
"version": "1.2.4", | ||
"scripts": { | ||
@@ -12,0 +12,0 @@ "test": "NODE_ENV=test mocha -t 5000 && jshint ." |
@@ -12,2 +12,3 @@ # Simple amqp library | ||
* Reuse connection | ||
* Dead letter exchange handling | ||
@@ -37,5 +38,6 @@ ## API | ||
logger: "..." // one-arg-function used for logging errors. Defaults to console.log | ||
deadLetterExchangeName: "...", // Enable dead letter exchange by setting a name for it. | ||
exchangeOptions: "...", // Options to pass to the exchange | ||
queueOptions: "...", // Options to pass to the queue | ||
subscribeOptions: "...", // Options to use for subscribing | ||
subscribeOptions: "...", // Options to use for subscribing, | ||
}; | ||
@@ -79,3 +81,3 @@ ``` | ||
if (err) return console.err(err); | ||
conn.subscribe("myRoutingKey", "myQueueName", function (message) { | ||
conn.subscribe("myRoutingKey", "myQueueName", function (message, headers, deliveryInfo, messageObject) { | ||
console.log("Got message", message); | ||
@@ -118,1 +120,32 @@ }); | ||
``` | ||
### Dead letter exchange | ||
Messages from a queue can be 'dead-lettered'; that is, republished to another exchange. | ||
For more information: https://www.rabbitmq.com/dlx.html | ||
This option will create a dead letter queue with the name `deadLetterExchangeName + ".deadLetterQueue"` | ||
```js | ||
var amqpConn = require("exp-amqp-connection"); | ||
var options = { | ||
exchange: "myExchange", | ||
deadLetterExchangeName: "myExchange.dead", | ||
subscribeOptions: { | ||
ack: true // Ack must be enabled for dead lettering to work | ||
} | ||
} | ||
amqpConn({host: "amqpHost"}, options, function (err, conn) { | ||
if (err) return console.err(err); | ||
conn.subscribe("myRoutingKey", "myQueueName", function (message, headers, deliveryInfo, messageObject) { | ||
if (message) { | ||
messageObject.acknowledge(); | ||
} else { | ||
messageObject.reject(false); // reject=true, requeue=false causes dead-lettering | ||
} | ||
console.log("Got message", message); | ||
}); | ||
}); | ||
``` |
@@ -8,2 +8,3 @@ "use strict"; | ||
var async = require("async"); | ||
var extend = require("../extend"); | ||
@@ -66,2 +67,44 @@ var defaultBehaviour = {exchange: "e1", errorLogger: console.log}; | ||
Feature("Dead letter exchange", function () { | ||
Scenario("Publishing failed messages on dead letter exchange", function () { | ||
var message; | ||
var deadLetterExchangeName = "e1.dead"; | ||
var behaviour = extend(defaultBehaviour, { | ||
deadLetterExchangeName: deadLetterExchangeName, | ||
subscribeOptions: { | ||
ack: true | ||
} | ||
}); | ||
after(disconnect); | ||
And("We have a connection with a dead letter exchange", function (done) { | ||
connect(defaultConnOpts, behaviour, ignoreErrors(done)); | ||
}); | ||
And("We are listing to the dead letter exchange", function (done) { | ||
amqp(defaultConnOpts, {exchange: deadLetterExchangeName}, function (err, conn) { | ||
if (err) return done(err); | ||
conn.subscribe("#", "deadQ", function (msg) { | ||
message = msg; | ||
}, done); | ||
}); | ||
}); | ||
And("We reject all messages", function (done) { | ||
connection.subscribe("testRoutingKey", "testQ", function (msg, headers, deliveryInfo, ack) { | ||
ack.reject(false); | ||
}, done); | ||
}); | ||
When("We publish a message", function (done) { | ||
connection.publish("testRoutingKey", {testData: "hello"}, done); | ||
}); | ||
Then("The message should be in the dead letter queue", function (done) { | ||
setTimeout(function () { | ||
assert.equal(message.testData, "hello"); | ||
done(); | ||
}, 50); | ||
}); | ||
}); | ||
}); | ||
function testConnection(done) { | ||
@@ -109,5 +152,5 @@ var randomRoutingKey = "RK" + crypto.randomBytes(6).toString("hex"); | ||
request.get("http://guest:guest@localhost:15672/api/connections", | ||
function (err, resp, connections) { | ||
callback(err, JSON.parse(connections)); | ||
}); | ||
function (err, resp, connections) { | ||
callback(err, JSON.parse(connections)); | ||
}); | ||
} | ||
@@ -114,0 +157,0 @@ |
79729
401.47%29
163.64%405
20.18%148
28.7%