New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

exp-amqp-connection

Package Overview
Dependencies
Maintainers
4
Versions
58
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

exp-amqp-connection - npm Package Compare versions

Comparing version

to
1.2.4

.idea/.name

37

index.js

@@ -77,2 +77,7 @@ "use strict";

setImmediate(function () {
if (behaviour.deadLetterExchangeName) {
return setupDeadLetterExchange(behaviour.deadLetterExchangeName, function () {
callback(exch);
});
}
callback(exch);

@@ -83,2 +88,28 @@ });

function setupDeadLetterExchange(deadLetterExchangeName, callback) {
var options = {
durable: true,
autoDelete: false,
type: "topic"
};
conn.exchange(deadLetterExchangeName, options, function (ex) {
var options = {durable: true, autoDelete: false};
conn.queue(deadLetterExchangeName + ".deadLetterQueue", options, function (q) {
q.bind(ex, "#");
callback();
});
});
}
function getQueueOptions() {
if (behaviour.deadLetterExchangeName) {
queueOptions["arguments"] = queueOptions["arguments"] || {};
queueOptions["arguments"]["x-dead-letter-exchange"] = behaviour.deadLetterExchangeName;
if (!subscribeOptions.ack) {
throw new Error("Ack needs to be enabled in subscribeOptions for dead letter exchange to work");
}
}
return queueOptions;
}
function publish(routingKey, message, publishCallback) {

@@ -100,3 +131,3 @@ var actualPublishCallback = publishCallback || function () {};

logger.debug("Attempting to connect to queue", id);
conn.queue(queueName, queueOptions, function (queue) {
conn.queue(queueName, getQueueOptions(), function (queue) {
routingPatterns.forEach(function (routingPattern) {

@@ -122,2 +153,3 @@ queue.bind(behaviour.exchange, routingPattern);

}
attemptExclusiveSubscribe(1);

@@ -128,3 +160,4 @@ }

var actualSubscribeCallback = subscribeCallback || function () {};
conn.queue(queueName, queueOptions, function (queue) {
conn.queue(queueName, getQueueOptions(), function (queue) {
queue.on("error", function (queueError) {

@@ -131,0 +164,0 @@ return actualSubscribeCallback(queueError);

2

package.json

@@ -9,3 +9,3 @@ {

],
"version": "1.2.2",
"version": "1.2.4",
"scripts": {

@@ -12,0 +12,0 @@ "test": "NODE_ENV=test mocha -t 5000 && jshint ."

@@ -12,2 +12,3 @@ # Simple amqp library

* Reuse connection
* Dead letter exchange handling

@@ -37,5 +38,6 @@ ## API

logger: "..." // one-arg-function used for logging errors. Defaults to console.log
deadLetterExchangeName: "...", // Enable dead letter exchange by setting a name for it.
exchangeOptions: "...", // Options to pass to the exchange
queueOptions: "...", // Options to pass to the queue
subscribeOptions: "...", // Options to use for subscribing
subscribeOptions: "...", // Options to use for subscribing,
};

@@ -79,3 +81,3 @@ ```

if (err) return console.err(err);
conn.subscribe("myRoutingKey", "myQueueName", function (message) {
conn.subscribe("myRoutingKey", "myQueueName", function (message, headers, deliveryInfo, messageObject) {
console.log("Got message", message);

@@ -118,1 +120,32 @@ });

```
### Dead letter exchange
Messages from a queue can be 'dead-lettered'; that is, republished to another exchange.
For more information: https://www.rabbitmq.com/dlx.html
This option will create a dead letter queue with the name `deadLetterExchangeName + ".deadLetterQueue"`
```js
var amqpConn = require("exp-amqp-connection");
var options = {
exchange: "myExchange",
deadLetterExchangeName: "myExchange.dead",
subscribeOptions: {
ack: true // Ack must be enabled for dead lettering to work
}
}
amqpConn({host: "amqpHost"}, options, function (err, conn) {
if (err) return console.err(err);
conn.subscribe("myRoutingKey", "myQueueName", function (message, headers, deliveryInfo, messageObject) {
if (message) {
messageObject.acknowledge();
} else {
messageObject.reject(false); // reject=true, requeue=false causes dead-lettering
}
console.log("Got message", message);
});
});
```

@@ -8,2 +8,3 @@ "use strict";

var async = require("async");
var extend = require("../extend");

@@ -66,2 +67,44 @@ var defaultBehaviour = {exchange: "e1", errorLogger: console.log};

Feature("Dead letter exchange", function () {
Scenario("Publishing failed messages on dead letter exchange", function () {
var message;
var deadLetterExchangeName = "e1.dead";
var behaviour = extend(defaultBehaviour, {
deadLetterExchangeName: deadLetterExchangeName,
subscribeOptions: {
ack: true
}
});
after(disconnect);
And("We have a connection with a dead letter exchange", function (done) {
connect(defaultConnOpts, behaviour, ignoreErrors(done));
});
And("We are listing to the dead letter exchange", function (done) {
amqp(defaultConnOpts, {exchange: deadLetterExchangeName}, function (err, conn) {
if (err) return done(err);
conn.subscribe("#", "deadQ", function (msg) {
message = msg;
}, done);
});
});
And("We reject all messages", function (done) {
connection.subscribe("testRoutingKey", "testQ", function (msg, headers, deliveryInfo, ack) {
ack.reject(false);
}, done);
});
When("We publish a message", function (done) {
connection.publish("testRoutingKey", {testData: "hello"}, done);
});
Then("The message should be in the dead letter queue", function (done) {
setTimeout(function () {
assert.equal(message.testData, "hello");
done();
}, 50);
});
});
});
function testConnection(done) {

@@ -109,5 +152,5 @@ var randomRoutingKey = "RK" + crypto.randomBytes(6).toString("hex");

request.get("http://guest:guest@localhost:15672/api/connections",
function (err, resp, connections) {
callback(err, JSON.parse(connections));
});
function (err, resp, connections) {
callback(err, JSON.parse(connections));
});
}

@@ -114,0 +157,0 @@