exp-amqp-connection
Advanced tools
Comparing version
18
index.js
@@ -122,3 +122,3 @@ "use strict"; | ||
if (!subscribeOptions.ack) { | ||
throw new Error("Ack needs to be enabled in subscribeOptions for dead letter exchange to work"); | ||
throw new Error("Ack must be enabled in subscribeOptions for dead letter exchange to work"); | ||
} | ||
@@ -137,4 +137,4 @@ } | ||
function subscribeExclusive(routingKey, queueName, handler, subscribeCallback, queueIsTakenCallback) { | ||
queueIsTakenCallback = queueIsTakenCallback || function () {}; | ||
function subscribeExclusive(routingKey, queueName, handler, subscribeCallback, waitCallback) { | ||
waitCallback = waitCallback || function () {}; | ||
var onExclusiveCallback = subscribeCallback || function () {}; | ||
@@ -156,3 +156,3 @@ var internalSubscribeOptions = extend(subscribeOptions, {exclusive: true}); | ||
logger.info("Someone else is using the queue, we'll try again", id); | ||
queueIsTakenCallback(err, id); | ||
waitCallback(err, id); | ||
setTimeout(attemptExclusiveSubscribe.bind(null, ++id), 5000); | ||
@@ -167,3 +167,3 @@ } else { | ||
onExclusiveCallback(); | ||
logger.info("Exclusively subscribing to '" + queueName + "'. Other instances will have to wait.", id); | ||
logger.info("Exclusively subscribing to '" + queueName + "'", id); | ||
}); | ||
@@ -176,5 +176,5 @@ }); | ||
function subscribe(routingKey, queueName, handler, subscribeCallback) { | ||
function subscribe(routingKeyOrKeys, queueName, handler, subscribeCallback) { | ||
var actualSubscribeCallback = subscribeCallback || function () {}; | ||
var routingKeys = Array.isArray(routingKeyOrKeys) ? routingKeyOrKeys : [routingKeyOrKeys]; | ||
conn.queue(queueName, getQueueOptions(), function (queue) { | ||
@@ -188,3 +188,5 @@ queue.on("error", function (queueError) { | ||
}); | ||
queue.bind(behaviour.exchange, routingKey); | ||
routingKeys.forEach(function (routingKey) { | ||
queue.bind(behaviour.exchange, routingKey); | ||
}); | ||
queue.subscribe(subscribeOptions, handler); | ||
@@ -191,0 +193,0 @@ }); |
@@ -10,3 +10,3 @@ { | ||
], | ||
"version": "1.3.2", | ||
"version": "1.3.3", | ||
"scripts": { | ||
@@ -13,0 +13,0 @@ "test": "NODE_ENV=test mocha -t 5000 && jshint ." |
@@ -27,3 +27,4 @@ # Simple amqp library | ||
The first arg is amqp connection options. See https://github.com/postwait/node-amqp#connection-options-and-url. | ||
The first arg is amqp connection options. | ||
See https://github.com/postwait/node-amqp#connection-options-and-url. | ||
@@ -42,3 +43,5 @@ The second arg defines various behaviour options: | ||
subscribeOptions: "...", // Options to use for subscribing, | ||
consumerCancelNotification: "..." // If true, enable rabbit consumner cancel notifications. Causes exit of dieOnError is set, otherwise the notification will just be logged | ||
consumerCancelNotification: "..." // If true, enable rabbit consumner cancel notifications. | ||
// Causes exit of dieOnError is set, otherwise the notification | ||
// will just be logged | ||
}; | ||
@@ -79,4 +82,5 @@ ``` | ||
NOTE: it is highly recommended to enable both ``dieOnError`` as well as ``consumerCancelNotification`` when subscribing | ||
to ensure a restart/reconnect in all scenarios where the subscription fails. | ||
NOTE: it is highly recommended to enable both ``dieOnError`` as well as | ||
``consumerCancelNotification`` when subscribing to ensure a restart/reconnect in all scenarios | ||
where the subscription fails. | ||
@@ -88,4 +92,4 @@ ```js | ||
if (err) return console.err(err); | ||
conn.subscribe("myRoutingKey", "myQueueName", function (message, headers, deliveryInfo, messageObject) { | ||
console.log("Got message", message); | ||
conn.subscribe("myRoutingKey", "myQueueName", function (msg, headers, deliveryInfo, msgObject) { | ||
console.log("Got message", msg); | ||
}); | ||
@@ -95,2 +99,10 @@ }); | ||
You can subscribe to multiple routing keys by passing an array instead of a string: | ||
```js | ||
... | ||
conn.subscribe(["routingKey1", "routingKey2"], "myQueueName", function (msg) { ... }); | ||
... | ||
``` | ||
### Reuse connection | ||
@@ -133,3 +145,4 @@ | ||
For more information: https://www.rabbitmq.com/dlx.html | ||
This option will create a dead letter queue with the name `deadLetterExchangeName + ".deadLetterQueue"` | ||
This option will create a dead letter queue with the name | ||
`deadLetterExchangeName + ".deadLetterQueue"` | ||
@@ -149,4 +162,4 @@ ```js | ||
if (err) return console.err(err); | ||
conn.subscribe("myRoutingKey", "myQueueName", function (message, headers, deliveryInfo, messageObject) { | ||
if (message) { | ||
conn.subscribe("myRoutingKey", "myQueueName", function (msg, headers, deliveryInfo, msgObject) { | ||
if (msg) { | ||
messageObject.acknowledge(); | ||
@@ -157,5 +170,5 @@ } else { | ||
console.log("Got message", message); | ||
console.log("Got message", msg); | ||
}); | ||
}); | ||
``` |
@@ -69,2 +69,26 @@ "use strict"; | ||
Scenario("Multiple routing keys", function () { | ||
var messages = []; | ||
var handler = function (message) { messages.push(message.testData); }; | ||
after(disconnect); | ||
When("We have a connection", function (done) { | ||
connect(defaultConnOpts, defaultBehaviour, ignoreErrors(done)); | ||
}); | ||
And("We create a subscription for routing key 1 and 2", function (done) { | ||
connection.subscribe(["rk1", "rk2"], "testQ1", handler, done); | ||
}); | ||
When("We publish a message with routing key 1", function (done) { | ||
connection.publish("rk1", {testData: "m1"}, done); | ||
}); | ||
Then("It should be delivered once", function () { | ||
assert.deepEqual(["m1"], messages); | ||
}); | ||
When("We publish a message with routing key 2", function (done) { | ||
connection.publish("rk2", {testData: "m2"}, done); | ||
}); | ||
Then("It should be delivered once", function () { | ||
assert.deepEqual(messages, ["m1", "m2"]); | ||
}); | ||
}); | ||
Scenario("Cancelled sub", function () { | ||
@@ -71,0 +95,0 @@ var message; |
80761
1.61%535
5.11%167
8.44%