rabbitmq-queue-stream
Advanced tools
Comparing version 0.4.0 to 0.4.1
15
index.js
@@ -149,4 +149,5 @@ var _ = require("lodash"); | ||
/* | ||
* NOTE: A proper disconnection routine from rabbitMQ should be | ||
* NOTE: A graceful disconnection routine from rabbitMQ should be | ||
* done in the following order: | ||
@@ -165,2 +166,14 @@ * | ||
/* | ||
* Helper method that strings together the above disconnection | ||
* routine. | ||
*/ | ||
AMQPStreams.prototype.gracefulDisconnect = function(cb) { | ||
async.series([ | ||
this.unsubscribeConsumers.bind(this), | ||
this.closeConsumers.bind(this), | ||
this.disconnect.bind(this) | ||
], cb); | ||
}; | ||
/* | ||
* Stops fetching messages from the queue. Channels are kept open. | ||
@@ -167,0 +180,0 @@ * Use AMQPStreams#closeConsumers to close the channels to queue. |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.4.0", | ||
"version": "0.4.1", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -91,23 +91,8 @@ # rabbitmq-queue-stream | ||
/* example graceful shutdown routine */ | ||
var gracefulShutdown = function() { | ||
//stop fetching messages | ||
streamifiedQueues.unsubscribeConsumers(function(err) { | ||
if(err) { | ||
//handle error | ||
} | ||
//Wait some time for queues to flush out before closing consumers. | ||
streamifiedQueues.closeConsumers(function(err) { | ||
if(err) { | ||
//handle error | ||
} | ||
streamifiedQueues.disconnect(function(err) { | ||
if(err) { | ||
//handle error | ||
} | ||
process.exit(0); | ||
}); | ||
}); | ||
process.on("SIGTERM", function() { | ||
streamifiedQueues.gracefulDisconnect(function(err) { | ||
// process.exit | ||
}); | ||
}; | ||
}); | ||
}); | ||
@@ -114,0 +99,0 @@ ``` |
24
test.js
@@ -204,3 +204,3 @@ var EventEmitter = require("events").EventEmitter; | ||
var streams = []; | ||
_.times(4, function() { | ||
_.times(6, function() { | ||
streams.push({ | ||
@@ -325,2 +325,24 @@ unsubscribe: sinon.stub().yields(null), | ||
describe("#gracefulDisconnect", function() { | ||
beforeEach(function() { | ||
queueStreams.unsubscribeConsumers = sinon.stub().yields(null); | ||
queueStreams.closeConsumers = sinon.stub().yields(null); | ||
queueStreams.disconnect = sinon.stub().yields(null); | ||
}); | ||
it("calls #unsubscribeConsumers, #closeConsumers, #disconnect", function(done) { | ||
queueStreams.gracefulDisconnect(function(err) { | ||
expect(err).to.not.be.ok(); | ||
// queueStreams.channels.forEach(function(channel) { | ||
// expect(channel.disconnect.callCount).to.be(1); | ||
// }); | ||
expect(queueStreams.unsubscribeConsumers.callCount).to.be(1); | ||
expect(queueStreams.closeConsumers.callCount).to.be(1); | ||
expect(queueStreams.disconnect.callCount).to.be(1); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
describe("#resubscribeConsumers", function() { | ||
@@ -327,0 +349,0 @@ beforeEach(function() { |
55476
1300
172