exp-amqp-connection
Advanced tools
Comparing version
50
index.js
@@ -21,4 +21,4 @@ "use strict"; | ||
heartbeat: 10, | ||
resubscribeOnError: true, | ||
productName: getProductName(), | ||
resubscribeOnError: true, | ||
queueArguments: {}, | ||
@@ -32,2 +32,4 @@ prefetch: 20, | ||
const api = new EventEmitter(); | ||
const amqpEvents = new EventEmitter(); | ||
amqpEvents.on("error", (err) => api.emit("error", err)); | ||
behaviour = Object.assign({}, defaultBehaviour, behaviour); | ||
@@ -38,9 +40,15 @@ | ||
bootstrap(behaviour, (bootstrapErr, bootstrapRes) => { | ||
if (bootstrapErr) api.emit("error", bootstrapErr); | ||
if (bootstrapErr) amqpEvents.emit("error", bootstrapErr); | ||
if (bootstrapRes && bootstrapRes.virgin) { | ||
bootstrapRes.connection.on("error", (err) => api.emit("error", err)); | ||
bootstrapRes.connection.on("close", (why) => api.emit("error", why)); | ||
api.emit("connected"); | ||
bootstrapRes.pubChannel.on("error", (err) => api.emit("error", err)); | ||
bootstrapRes.subChannel.on("error", (err) => api.emit("error", err)); | ||
bootstrapRes.connection.on("error", (err) => amqpEvents.emit("error", `AMQP connection error: ${err}`)); | ||
bootstrapRes.connection.on("close", (err) => amqpEvents.emit("error", `AMQP connection error: ${err}`)); | ||
// Only way to detect explicit close from management console.... | ||
if (bootstrapRes.connection.connection && bootstrapRes.connection.connection.stream) { | ||
bootstrapRes.connection.connection.stream.on( | ||
"close", (why) => amqpEvents.emit("error", `AMQP connection closed: ${why}`) | ||
); | ||
} | ||
bootstrapRes.pubChannel.on("error", (err) => amqpEvents.emit("error", `AMQP pub channel error: ${err}`)); | ||
bootstrapRes.subChannel.on("error", (err) => amqpEvents.emit("error", `AMQP sub channel error: ${err}`)); | ||
bootstrapRes.pubChannel.assertExchange(behaviour.exchange, "topic"); | ||
@@ -70,3 +78,3 @@ } | ||
if (!message) { | ||
return api.emit("error", "Subscription cancelled"); | ||
return amqpEvents.emit("error", "Subscription cancelled"); | ||
} | ||
@@ -102,16 +110,15 @@ const ackFun = () => subChannel.ack(message); | ||
let attempt = 1; | ||
if (behaviour.resubscribeOnError) { | ||
const resubscribeOnError = (err) => { | ||
if (err && !resubTimer) { | ||
behaviour.logger.info("Amqp error received. Resubscribing in 5 secs.", err.message); | ||
resubTimer = setTimeout(() => { | ||
attempt = attempt + 1; | ||
doSubscribe(routingKeyOrKeys, queue, handler, attempt); | ||
resubTimer = null; | ||
}, 5000); | ||
} | ||
}; | ||
api.on("error", resubscribeOnError); | ||
} | ||
const resubscribeOnError = (err) => { | ||
if (err && !resubTimer && behaviour.resubscribeOnError) { | ||
behaviour.logger.info("Amqp error received. Resubscribing in 5 secs.", err.message); | ||
resubTimer = setTimeout(() => { | ||
attempt = attempt + 1; | ||
doSubscribe(routingKeyOrKeys, queue, handler, attempt); | ||
resubTimer = null; | ||
}, 5000); | ||
} | ||
}; | ||
doSubscribe(routingKeyOrKeys, queue, handler, attempt); | ||
amqpEvents.on("error", resubscribeOnError); | ||
}; | ||
@@ -152,4 +159,3 @@ | ||
"x-dead-letter-exchange": behaviour.exchange, | ||
"x-message-ttl": delay, | ||
"x-expires": delay + 60000 | ||
"x-message-ttl": delay | ||
} | ||
@@ -156,0 +162,0 @@ }); |
@@ -10,3 +10,3 @@ { | ||
], | ||
"version": "3.1.1-beta-2", | ||
"version": "3.1.1-beta-3", | ||
"scripts": { | ||
@@ -13,0 +13,0 @@ "predockertest": "docker-compose up -d rabbitmq && docker-compose build app", |
@@ -33,3 +33,3 @@ "use strict"; | ||
And("We create a subscription that will nack one message without requeueing", (done) => { | ||
broker.on("subscribed", () => done()); | ||
broker.once("subscribed", () => done()); | ||
broker.subscribeTmp("testNackRoutingKey", (msg, meta, ack) => { | ||
@@ -36,0 +36,0 @@ received.push({ |
@@ -69,3 +69,2 @@ "use strict"; | ||
Scenario("Unparsable message", () => { | ||
@@ -260,5 +259,5 @@ let nMessages = 0; | ||
Then("An error 320 should be raised", () => { | ||
assert.equal(320, error.code); | ||
assert(error != null); | ||
}); | ||
}); | ||
}); |
@@ -11,6 +11,8 @@ "use strict"; | ||
const rabbitUrl = process.env.RABBIT_URL || "amqp://guest:guest@localhost"; | ||
const defaultBehaviour = { exchange: "e1", confirm: true, url: rabbitUrl }; | ||
const defaultBehaviour = { exchange: "e1", confirm: true, url: rabbitUrl, resubscribeOnError: false }; | ||
function init(customBehaviour) { | ||
return impl(Object.assign({}, defaultBehaviour, customBehaviour)); | ||
const broker = impl(Object.assign({}, defaultBehaviour, customBehaviour)); | ||
broker.on("error", () => {}); | ||
return broker; | ||
} | ||
@@ -17,0 +19,0 @@ |
Sorry, the diff of this file is not supported yet
55970
3.54%1228
0.57%