New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

exp-amqp-connection

Package Overview
Dependencies
Maintainers
4
Versions
58
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

exp-amqp-connection - npm Package Compare versions

Comparing version

to
1.3.1

.tern-port

32

index.js

@@ -51,13 +51,11 @@ "use strict";

var logger = getLog(behaviour.logger);
var exchangeOptions = extend(defaultExchangeOptions, behaviour.exchangeOptions);
var queueOptions = extend(defaultQueueOptions, behaviour.queueOptions);
var subscribeOptions = extend(defaultSubscribeOptions, behaviour.subscribeOptions);
var exchange = null;
connectionConfig.clientProperties =
{"capabilities": {"consumer_cancel_notify": !!behaviour.consumerCancelNotification}};
connectionConfig.heartbeat = 10;
var conn = amqp.createConnection(connectionConfig, {reconnect: !behaviour.dieOnError});
var conn = amqp.createConnection(connectionConfig);
if (behaviour.reuse) {

@@ -67,6 +65,12 @@ savedConns[behaviour.reuse] = conn;

conn.on("close", function (hadError) {
logger.info("Connectoion closed", hadError);
});
conn.on("error", function (connectionError) {
handleError(connectionError, logger);
});
conn.once("error", callback);
conn.once("ready", function () {

@@ -142,3 +146,3 @@ conn.removeListener("error", callback);

queue.on("basicCancel", function () {
handleError("Subscription cancelled from server side", onExclusiveCallback, logger);
handleError("Subscription cancelled from server side", logger);
});

@@ -175,3 +179,3 @@ queue.on("error", function (err) {

queue.on("basicCancel", function () {
handleError("Subscription cancelled from server side", actualSubscribeCallback, logger);
handleError("Subscription cancelled from server side", logger, true);
});

@@ -184,15 +188,19 @@ queue.bind(behaviour.exchange, routingKey);

function close(callback) {
conn.disconnect(callback);
if (callback) {
conn.once("close", function () {callback();});
}
conn.disconnect();
}
function handleError(error, logger) {
function handleError(error, logger, reconnect) {
if (behaviour.dieOnError) {
logger.error("Killing myself over error: ", error);
setTimeout(function () {
logger.error(error);
process.exit(1);
}, 3000);
} else if (reconnect) {
logger.info("Destroying connection forcing reconnect due to error:", error);
conn.socket.destroy();
}
if (logger) {
logger.error(util.format("Amqp error", error, "\n", error.stack));
}
logger.error(util.format("Amqp error", error, "\n", error.stack || ""));
}

@@ -199,0 +207,0 @@

@@ -10,3 +10,3 @@ {

],
"version": "1.3.0",
"version": "1.3.1",
"scripts": {

@@ -13,0 +13,0 @@ "test": "NODE_ENV=test mocha -t 5000 && jshint ."

@@ -11,3 +11,3 @@ "use strict";

var defaultBehaviour = {exchange: "e1", logger: console};
var defaultBehaviour = {exchange: "e1", logger: console, consumerCancelNotification: true};
var defaultConnOpts = {};

@@ -58,3 +58,3 @@ var connection;

And("We create a subscription", function (done) {
connection.subscribe("testRoutingKey", "testQ", function (msg) {
connection.subscribe("testRoutingKey", "testQ1", function (msg) {
message = msg;

@@ -70,2 +70,25 @@ }, done);

});
Scenario("Cancelled sub", function () {
var message;
after(disconnect);
And("We have a connection", function (done) {
connect(defaultConnOpts, defaultBehaviour, ignoreErrors(done));
});
And("We create a subscription", function (done) {
connection.subscribe("testRoutingKey", "testQ2", function (msg) {
message = msg;
}, done);
});
And("We delete the queue", function (done) {
deleteRabbitQueue("testQ2", done);
});
And("We wait a little", function (done) {setTimeout(done, 3000);});
And("We publish a message", function (done) {
connection.publish("testRoutingKey", {testData: "hello"}, done);
});
Then("It should arrive correctly", function () {
assert.equal(message.testData, "hello");
});
});
});

@@ -103,3 +126,3 @@

And("We reject all messages", function (done) {
connection.subscribe("testRoutingKey", "testQ", function (msg, headers, deliveryInfo, ack) {
connection.subscribe("testRoutingKey", "TestQ3", function (msg, headers, deliveryInfo, ack) {
ack.reject(false);

@@ -157,4 +180,4 @@ }, done);

function disconnect() {
if (connection) connection.close();
function disconnect(done) {
if (connection) connection.close(done);
}

@@ -180,5 +203,5 @@

request.del(exchangeUrl, function (err) {
deleteResource(exchangeUrl, function (err) {
if (err) return done(err);
request.del(queueUrl, done);
deleteResource(queueUrl, done);
});

@@ -188,3 +211,15 @@ }

function killRabbitConnection(connection, done) {
request.del("http://guest:guest@localhost:15672/api/connections/" + connection.name, done);
deleteResource("http://guest:guest@localhost:15672/api/connections/" + connection.name, done);
}
function deleteRabbitQueue(queue, done) {
deleteResource("http://guest:guest@localhost:15672/api/queues/%2F/" + queue, done);
}
function deleteResource(url, done) {
request.del(url, function (err, resp, body) {
if (err) return done(err);
if (resp.statusCode >= 300) return done(resp.statusCode + " " + body);
done();
});
}

Sorry, the diff of this file is not supported yet