rabbitmq-queue-stream
Advanced tools
Comparing version 0.2.0 to 0.2.1
63
index.js
@@ -65,2 +65,4 @@ var amqp = require("amqp"); | ||
function AMQPStreams(numStreams, options) { | ||
EventEmitter.call(this); | ||
this.__numStreams = numStreams || 1; | ||
@@ -81,8 +83,27 @@ this.__options = options; | ||
me._amqpConnection = connection; | ||
me._connected = false; | ||
// forward EventEmiter methods to underlying connection | ||
_.without(_.keys(EventEmitter.prototype), 'emit').forEach(function(key) { | ||
me[key] = connection[key].bind(connection); | ||
connection.on('ready', function() { | ||
me._connected = true; | ||
}); | ||
connection.on('close', function() { | ||
me._connected = false; | ||
}); | ||
// forward connection events to `this` | ||
['ready', 'error', 'close'].forEach(function(eventName) { | ||
connection.on(eventName, function() { | ||
// ignore error events while disconnecting, since those errors will be forwarded to | ||
// disconnect's callback. See #disconnect() | ||
if (eventName === 'error' && me._disconnecting) { | ||
return; | ||
} | ||
var args = Array.prototype.slice.call(arguments); | ||
args.unshift(eventName); | ||
me.emit.apply(me, args); | ||
}); | ||
}); | ||
//create individual stream channels to queue | ||
@@ -118,15 +139,17 @@ var createWorker = function(n, cb) { | ||
/* handle successful or error on initial connection */ | ||
connection.once("error", function(err) { | ||
function onError(err) { | ||
streamsDebug("Error creating connection " + err.message); | ||
connection.removeAllListeners("ready"); | ||
connection.removeListener("ready", onReady); | ||
return cb(err); | ||
}); | ||
} | ||
connection.once("ready", function() { | ||
function onReady() { | ||
streamsDebug("Successfully created connection"); | ||
connection.removeAllListeners("error"); | ||
connection.removeListener("error", onError); | ||
return cb(null, connection); | ||
}); | ||
} | ||
/* handle successful or error on initial connection */ | ||
connection.once("error", onError); | ||
connection.once("ready", onReady); | ||
}; | ||
@@ -154,2 +177,8 @@ | ||
AMQPStreams.prototype.unsubscribeConsumers = function(cb) { | ||
// noop if we're disconnected | ||
if (!this._connected) { | ||
streamsDebug("Skipping unsubscribeConsumers"); | ||
return cb(); | ||
} | ||
//close every worker stream | ||
@@ -171,2 +200,8 @@ async.eachSeries(this.channels, function(stream, next) { | ||
AMQPStreams.prototype.closeConsumers = function(cb) { | ||
// noop if we're disconnected | ||
if (!this._connected) { | ||
streamsDebug("Skipping closeConsumers"); | ||
return cb(); | ||
} | ||
async.eachSeries(this.channels, function(stream, next) { | ||
@@ -179,2 +214,9 @@ stream.close(next); | ||
streamsDebug("Closing AMQP connection"); | ||
// noop if we're disconnected | ||
if (!this._connected) { | ||
streamsDebug("Skipping disconnect"); | ||
return cb(); | ||
} | ||
this._disconnecting = true; | ||
var me = this; | ||
@@ -202,2 +244,3 @@ this._amqpConnection.disconnect(); | ||
this._amqpConnection.once("close", function() { | ||
this._disconnecting = false; | ||
me._amqpConnection.removeListener("error", ignoreEconnresetError); | ||
@@ -204,0 +247,0 @@ cb(); |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.2.0", | ||
"version": "0.2.1", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -38,3 +38,4 @@ "main": "index.js", | ||
"Peter Hayes <peter.k.hayes@gmail.com>", | ||
"Nick Bottomley <nhbottomley@gmail.com>" | ||
"Nick Bottomley <nhbottomley@gmail.com>", | ||
"Byron Wong <byronwong@gmail.com>" | ||
], | ||
@@ -41,0 +42,0 @@ "license": "MIT", |
136
test.js
@@ -10,3 +10,14 @@ var EventEmitter = require("events").EventEmitter; | ||
function stubConnection() { | ||
var subscriptionObj = {addCallback: sinon.stub().yields({})}; | ||
var queueObj = new EventEmitter(); | ||
queueObj.subscribe = sinon.stub().returns(subscriptionObj); | ||
var connectionObj = new EventEmitter(); | ||
connectionObj.queue = sinon.stub().yields(queueObj); | ||
return connectionObj; | ||
} | ||
describe("rabbitmq-queue-stream", function() { | ||
@@ -181,6 +192,15 @@ describe("AMQPStreams", function() { | ||
describe("AMQP queue control methods", function() { | ||
var amqp; | ||
beforeEach(function() { | ||
var rabbitmq = rewire('./'); | ||
var createConnectionStub, | ||
connectionObj, | ||
queueStreams; | ||
beforeEach(function (done) { | ||
connectionObj = stubConnection(); | ||
var AMQPStreams = rabbitmq.__get__('AMQPStreams'); | ||
createConnectionStub = | ||
sinon.stub(AMQPStreams.prototype, "_createConnection").yields(null, connectionObj); | ||
var streams = []; | ||
amqp = new rabbitmq.AMQPStreams(6, {}); | ||
_.times(4, function() { | ||
@@ -192,10 +212,26 @@ streams.push({ | ||
}); | ||
amqp.channels = streams; | ||
rabbitmq.init(6, { | ||
queue: {name: 'hi'} | ||
}, function(err, qs) { | ||
expect(err).to.not.be.ok(); | ||
queueStreams = qs; | ||
queueStreams.channels = streams; | ||
// simulate amqp connected | ||
connectionObj.emit('ready'); | ||
done(); | ||
}); | ||
}); | ||
afterEach(function () { | ||
createConnectionStub.restore(); | ||
}); | ||
describe("#unsubscribeConsumers", function() { | ||
it("calls #unsubscribe for every stream in AMQPStreams.channels", function(done) { | ||
amqp.unsubscribeConsumers(function(err) { | ||
queueStreams.unsubscribeConsumers(function(err) { | ||
expect(err).to.not.be.ok(); | ||
amqp.channels.forEach(function(channel) { | ||
queueStreams.channels.forEach(function(channel) { | ||
expect(channel.unsubscribe.callCount).to.be(1); | ||
@@ -207,2 +243,14 @@ }); | ||
it("is a noop if currently disconnected from broker", function(done) { | ||
// simulate amqp disconnected | ||
connectionObj.emit('close'); | ||
queueStreams.unsubscribeConsumers(function(err) { | ||
expect(err).to.not.be.ok(); | ||
queueStreams.channels.forEach(function(channel) { | ||
expect(channel.unsubscribe.callCount).to.be(0); | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
@@ -212,5 +260,5 @@ | ||
it("calls #close on every stream in AMQPStreams.channels", function(done) { | ||
amqp.closeConsumers(function(err) { | ||
queueStreams.closeConsumers(function(err) { | ||
expect(err).to.not.be.ok(); | ||
amqp.channels.forEach(function(channel) { | ||
queueStreams.channels.forEach(function(channel) { | ||
expect(channel.close.callCount).to.be(1); | ||
@@ -221,32 +269,42 @@ }); | ||
}); | ||
it("is a noop if currently disconnected from broker", function(done) { | ||
// simulate amqp disconnected | ||
connectionObj.emit('close'); | ||
queueStreams.closeConsumers(function(err) { | ||
expect(err).to.not.be.ok(); | ||
queueStreams.channels.forEach(function(channel) { | ||
expect(channel.close.callCount).to.be(0); | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
describe("#disconnect", function() { | ||
var mockAmqp; | ||
beforeEach(function() { | ||
mockAmqp = new EventEmitter(); | ||
mockAmqp.disconnect = sinon.spy(); | ||
amqp._amqpConnection = mockAmqp; | ||
connectionObj.disconnect = sinon.spy(); | ||
}); | ||
it("calls #disconnect on the amqp connection", function(done) { | ||
amqp.disconnect(function(err) { | ||
queueStreams.disconnect(function(err) { | ||
expect(err).to.not.be.ok(); | ||
expect(amqp._amqpConnection.disconnect.callCount).to.be(1); | ||
expect(connectionObj.disconnect.callCount).to.be(1); | ||
done(); | ||
}); | ||
//simulate successful close | ||
amqp._amqpConnection.emit("close"); | ||
connectionObj.emit("close"); | ||
}); | ||
it("passes back an error to the callback when something goes wrong", function(done) { | ||
amqp.disconnect(function(err) { | ||
queueStreams.disconnect(function(err) { | ||
expect(err).to.be.an(Error); | ||
done(); | ||
}); | ||
amqp._amqpConnection.emit("error", new Error("Some disconnection error")); | ||
connectionObj.emit("error", new Error("Some disconnection error")); | ||
}); | ||
it("ignores TCP ECONNRESET errors", function(done) { | ||
amqp.disconnect(function(err) { | ||
queueStreams.disconnect(function(err) { | ||
//will fail if first error event gets through | ||
@@ -256,5 +314,16 @@ expect(err).to.not.be.ok(); | ||
}); | ||
amqp._amqpConnection.emit("error", new Error("ECONNRESET")); | ||
amqp._amqpConnection.emit("close"); | ||
connectionObj.emit("error", new Error("ECONNRESET")); | ||
connectionObj.emit("close"); | ||
}); | ||
it("is a noop if currently disconnected from broker", function(done) { | ||
// simulate amqp disconnected | ||
connectionObj.emit('close'); | ||
queueStreams.disconnect(function(err) { | ||
expect(err).to.not.be.ok(); | ||
expect(connectionObj.disconnect.callCount).to.be(0); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
@@ -264,3 +333,3 @@ | ||
beforeEach(function() { | ||
amqp.channels.forEach(function(stream) { | ||
queueStreams.channels.forEach(function(stream) { | ||
stream._subscribeToQueue = sinon.stub().yields(null); | ||
@@ -271,3 +340,3 @@ }); | ||
afterEach(function() { | ||
amqp.channels.forEach(function(stream) { | ||
queueStreams.channels.forEach(function(stream) { | ||
stream._subscribeToQueue.reset(); | ||
@@ -278,5 +347,5 @@ }); | ||
it("attempts to resubscribe to the queue if the worker is unsubscribed", function(done) { | ||
amqp.resubscribeConsumers(function(err) { | ||
queueStreams.resubscribeConsumers(function(err) { | ||
expect(err).to.not.be.ok(); | ||
amqp.channels.forEach(function(stream) { | ||
queueStreams.channels.forEach(function(stream) { | ||
expect(stream._subscribeToQueue.callCount).to.be(1); | ||
@@ -290,8 +359,8 @@ }); | ||
//since we haven't initialized the streams in the test, let's manually say we've subscribed here | ||
amqp.channels.forEach(function(stream) { | ||
queueStreams.channels.forEach(function(stream) { | ||
stream.subscribed = true; | ||
}); | ||
amqp.resubscribeConsumers(function(err) { | ||
queueStreams.resubscribeConsumers(function(err) { | ||
expect(err).to.not.be.ok(); | ||
amqp.channels.forEach(function(stream) { | ||
queueStreams.channels.forEach(function(stream) { | ||
expect(stream._subscribeToQueue.callCount).to.be(0); | ||
@@ -307,15 +376,2 @@ }); | ||
var rabbitmq = rewire('./'); | ||
function stubConnection() { | ||
var subscriptionObj = {addCallback: sinon.stub().yields({})}; | ||
var queueObj = new EventEmitter(); | ||
queueObj.subscribe = sinon.stub().returns(subscriptionObj); | ||
var connectionObj = new EventEmitter(); | ||
connectionObj.queue = sinon.stub().yields(queueObj); | ||
return connectionObj; | ||
} | ||
var createConnectionStub, | ||
@@ -322,0 +378,0 @@ connectionObj; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
51414
1183