exp-amqp-connection
Advanced tools
Comparing version
32
index.js
@@ -51,13 +51,11 @@ "use strict"; | ||
var logger = getLog(behaviour.logger); | ||
var exchangeOptions = extend(defaultExchangeOptions, behaviour.exchangeOptions); | ||
var queueOptions = extend(defaultQueueOptions, behaviour.queueOptions); | ||
var subscribeOptions = extend(defaultSubscribeOptions, behaviour.subscribeOptions); | ||
var exchange = null; | ||
connectionConfig.clientProperties = | ||
{"capabilities": {"consumer_cancel_notify": !!behaviour.consumerCancelNotification}}; | ||
connectionConfig.heartbeat = 10; | ||
var conn = amqp.createConnection(connectionConfig, {reconnect: !behaviour.dieOnError}); | ||
var conn = amqp.createConnection(connectionConfig); | ||
if (behaviour.reuse) { | ||
@@ -67,6 +65,12 @@ savedConns[behaviour.reuse] = conn; | ||
conn.on("close", function (hadError) { | ||
logger.info("Connectoion closed", hadError); | ||
}); | ||
conn.on("error", function (connectionError) { | ||
handleError(connectionError, logger); | ||
}); | ||
conn.once("error", callback); | ||
conn.once("ready", function () { | ||
@@ -142,3 +146,3 @@ conn.removeListener("error", callback); | ||
queue.on("basicCancel", function () { | ||
handleError("Subscription cancelled from server side", onExclusiveCallback, logger); | ||
handleError("Subscription cancelled from server side", logger); | ||
}); | ||
@@ -175,3 +179,3 @@ queue.on("error", function (err) { | ||
queue.on("basicCancel", function () { | ||
handleError("Subscription cancelled from server side", actualSubscribeCallback, logger); | ||
handleError("Subscription cancelled from server side", logger, true); | ||
}); | ||
@@ -184,15 +188,19 @@ queue.bind(behaviour.exchange, routingKey); | ||
function close(callback) { | ||
conn.disconnect(callback); | ||
if (callback) { | ||
conn.once("close", function () {callback();}); | ||
} | ||
conn.disconnect(); | ||
} | ||
function handleError(error, logger) { | ||
function handleError(error, logger, reconnect) { | ||
if (behaviour.dieOnError) { | ||
logger.error("Killing myself over error: ", error); | ||
setTimeout(function () { | ||
logger.error(error); | ||
process.exit(1); | ||
}, 3000); | ||
} else if (reconnect) { | ||
logger.info("Destroying connection forcing reconnect due to error:", error); | ||
conn.socket.destroy(); | ||
} | ||
if (logger) { | ||
logger.error(util.format("Amqp error", error, "\n", error.stack)); | ||
} | ||
logger.error(util.format("Amqp error", error, "\n", error.stack || "")); | ||
} | ||
@@ -199,0 +207,0 @@ |
@@ -10,3 +10,3 @@ { | ||
], | ||
"version": "1.3.0", | ||
"version": "1.3.1", | ||
"scripts": { | ||
@@ -13,0 +13,0 @@ "test": "NODE_ENV=test mocha -t 5000 && jshint ." |
@@ -11,3 +11,3 @@ "use strict"; | ||
var defaultBehaviour = {exchange: "e1", logger: console}; | ||
var defaultBehaviour = {exchange: "e1", logger: console, consumerCancelNotification: true}; | ||
var defaultConnOpts = {}; | ||
@@ -58,3 +58,3 @@ var connection; | ||
And("We create a subscription", function (done) { | ||
connection.subscribe("testRoutingKey", "testQ", function (msg) { | ||
connection.subscribe("testRoutingKey", "testQ1", function (msg) { | ||
message = msg; | ||
@@ -70,2 +70,25 @@ }, done); | ||
}); | ||
Scenario("Cancelled sub", function () { | ||
var message; | ||
after(disconnect); | ||
And("We have a connection", function (done) { | ||
connect(defaultConnOpts, defaultBehaviour, ignoreErrors(done)); | ||
}); | ||
And("We create a subscription", function (done) { | ||
connection.subscribe("testRoutingKey", "testQ2", function (msg) { | ||
message = msg; | ||
}, done); | ||
}); | ||
And("We delete the queue", function (done) { | ||
deleteRabbitQueue("testQ2", done); | ||
}); | ||
And("We wait a little", function (done) {setTimeout(done, 3000);}); | ||
And("We publish a message", function (done) { | ||
connection.publish("testRoutingKey", {testData: "hello"}, done); | ||
}); | ||
Then("It should arrive correctly", function () { | ||
assert.equal(message.testData, "hello"); | ||
}); | ||
}); | ||
}); | ||
@@ -103,3 +126,3 @@ | ||
And("We reject all messages", function (done) { | ||
connection.subscribe("testRoutingKey", "testQ", function (msg, headers, deliveryInfo, ack) { | ||
connection.subscribe("testRoutingKey", "TestQ3", function (msg, headers, deliveryInfo, ack) { | ||
ack.reject(false); | ||
@@ -157,4 +180,4 @@ }, done); | ||
function disconnect() { | ||
if (connection) connection.close(); | ||
function disconnect(done) { | ||
if (connection) connection.close(done); | ||
} | ||
@@ -180,5 +203,5 @@ | ||
request.del(exchangeUrl, function (err) { | ||
deleteResource(exchangeUrl, function (err) { | ||
if (err) return done(err); | ||
request.del(queueUrl, done); | ||
deleteResource(queueUrl, done); | ||
}); | ||
@@ -188,3 +211,15 @@ } | ||
function killRabbitConnection(connection, done) { | ||
request.del("http://guest:guest@localhost:15672/api/connections/" + connection.name, done); | ||
deleteResource("http://guest:guest@localhost:15672/api/connections/" + connection.name, done); | ||
} | ||
function deleteRabbitQueue(queue, done) { | ||
deleteResource("http://guest:guest@localhost:15672/api/queues/%2F/" + queue, done); | ||
} | ||
function deleteResource(url, done) { | ||
request.del(url, function (err, resp, body) { | ||
if (err) return done(err); | ||
if (resp.statusCode >= 300) return done(resp.statusCode + " " + body); | ||
done(); | ||
}); | ||
} |
Sorry, the diff of this file is not supported yet
82843
7.59%36
16.13%506
14.48%