Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
3
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.2.0 to 0.2.1

63

index.js

@@ -65,2 +65,4 @@ var amqp = require("amqp");

function AMQPStreams(numStreams, options) {
EventEmitter.call(this);
this.__numStreams = numStreams || 1;

@@ -81,8 +83,27 @@ this.__options = options;

me._amqpConnection = connection;
me._connected = false;
// forward EventEmiter methods to underlying connection
_.without(_.keys(EventEmitter.prototype), 'emit').forEach(function(key) {
me[key] = connection[key].bind(connection);
connection.on('ready', function() {
me._connected = true;
});
connection.on('close', function() {
me._connected = false;
});
// forward connection events to `this`
['ready', 'error', 'close'].forEach(function(eventName) {
connection.on(eventName, function() {
// ignore error events while disconnecting, since those errors will be forwarded to
// disconnect's callback. See #disconnect()
if (eventName === 'error' && me._disconnecting) {
return;
}
var args = Array.prototype.slice.call(arguments);
args.unshift(eventName);
me.emit.apply(me, args);
});
});
//create individual stream channels to queue

@@ -118,15 +139,17 @@ var createWorker = function(n, cb) {

/* handle successful or error on initial connection */
connection.once("error", function(err) {
function onError(err) {
streamsDebug("Error creating connection " + err.message);
connection.removeAllListeners("ready");
connection.removeListener("ready", onReady);
return cb(err);
});
}
connection.once("ready", function() {
function onReady() {
streamsDebug("Successfully created connection");
connection.removeAllListeners("error");
connection.removeListener("error", onError);
return cb(null, connection);
});
}
/* handle successful or error on initial connection */
connection.once("error", onError);
connection.once("ready", onReady);
};

@@ -154,2 +177,8 @@

AMQPStreams.prototype.unsubscribeConsumers = function(cb) {
// noop if we're disconnected
if (!this._connected) {
streamsDebug("Skipping unsubscribeConsumers");
return cb();
}
//close every worker stream

@@ -171,2 +200,8 @@ async.eachSeries(this.channels, function(stream, next) {

AMQPStreams.prototype.closeConsumers = function(cb) {
// noop if we're disconnected
if (!this._connected) {
streamsDebug("Skipping closeConsumers");
return cb();
}
async.eachSeries(this.channels, function(stream, next) {

@@ -179,2 +214,9 @@ stream.close(next);

streamsDebug("Closing AMQP connection");
// noop if we're disconnected
if (!this._connected) {
streamsDebug("Skipping disconnect");
return cb();
}
this._disconnecting = true;
var me = this;

@@ -202,2 +244,3 @@ this._amqpConnection.disconnect();

this._amqpConnection.once("close", function() {
this._disconnecting = false;
me._amqpConnection.removeListener("error", ignoreEconnresetError);

@@ -204,0 +247,0 @@ cb();

5

package.json
{
"name": "rabbitmq-queue-stream",
"version": "0.2.0",
"version": "0.2.1",
"description": "Reliable streaming interface to rabbitmq queues",

@@ -38,3 +38,4 @@ "main": "index.js",

"Peter Hayes <peter.k.hayes@gmail.com>",
"Nick Bottomley <nhbottomley@gmail.com>"
"Nick Bottomley <nhbottomley@gmail.com>",
"Byron Wong <byronwong@gmail.com>"
],

@@ -41,0 +42,0 @@ "license": "MIT",

@@ -10,3 +10,14 @@ var EventEmitter = require("events").EventEmitter;

function stubConnection() {
var subscriptionObj = {addCallback: sinon.stub().yields({})};
var queueObj = new EventEmitter();
queueObj.subscribe = sinon.stub().returns(subscriptionObj);
var connectionObj = new EventEmitter();
connectionObj.queue = sinon.stub().yields(queueObj);
return connectionObj;
}
describe("rabbitmq-queue-stream", function() {

@@ -181,6 +192,15 @@ describe("AMQPStreams", function() {

describe("AMQP queue control methods", function() {
var amqp;
beforeEach(function() {
var rabbitmq = rewire('./');
var createConnectionStub,
connectionObj,
queueStreams;
beforeEach(function (done) {
connectionObj = stubConnection();
var AMQPStreams = rabbitmq.__get__('AMQPStreams');
createConnectionStub =
sinon.stub(AMQPStreams.prototype, "_createConnection").yields(null, connectionObj);
var streams = [];
amqp = new rabbitmq.AMQPStreams(6, {});
_.times(4, function() {

@@ -192,10 +212,26 @@ streams.push({

});
amqp.channels = streams;
rabbitmq.init(6, {
queue: {name: 'hi'}
}, function(err, qs) {
expect(err).to.not.be.ok();
queueStreams = qs;
queueStreams.channels = streams;
// simulate amqp connected
connectionObj.emit('ready');
done();
});
});
afterEach(function () {
createConnectionStub.restore();
});
describe("#unsubscribeConsumers", function() {
it("calls #unsubscribe for every stream in AMQPStreams.channels", function(done) {
amqp.unsubscribeConsumers(function(err) {
queueStreams.unsubscribeConsumers(function(err) {
expect(err).to.not.be.ok();
amqp.channels.forEach(function(channel) {
queueStreams.channels.forEach(function(channel) {
expect(channel.unsubscribe.callCount).to.be(1);

@@ -207,2 +243,14 @@ });

it("is a noop if currently disconnected from broker", function(done) {
// simulate amqp disconnected
connectionObj.emit('close');
queueStreams.unsubscribeConsumers(function(err) {
expect(err).to.not.be.ok();
queueStreams.channels.forEach(function(channel) {
expect(channel.unsubscribe.callCount).to.be(0);
});
done();
});
});
});

@@ -212,5 +260,5 @@

it("calls #close on every stream in AMQPStreams.channels", function(done) {
amqp.closeConsumers(function(err) {
queueStreams.closeConsumers(function(err) {
expect(err).to.not.be.ok();
amqp.channels.forEach(function(channel) {
queueStreams.channels.forEach(function(channel) {
expect(channel.close.callCount).to.be(1);

@@ -221,32 +269,42 @@ });

});
it("is a noop if currently disconnected from broker", function(done) {
// simulate amqp disconnected
connectionObj.emit('close');
queueStreams.closeConsumers(function(err) {
expect(err).to.not.be.ok();
queueStreams.channels.forEach(function(channel) {
expect(channel.close.callCount).to.be(0);
});
done();
});
});
});
describe("#disconnect", function() {
var mockAmqp;
beforeEach(function() {
mockAmqp = new EventEmitter();
mockAmqp.disconnect = sinon.spy();
amqp._amqpConnection = mockAmqp;
connectionObj.disconnect = sinon.spy();
});
it("calls #disconnect on the amqp connection", function(done) {
amqp.disconnect(function(err) {
queueStreams.disconnect(function(err) {
expect(err).to.not.be.ok();
expect(amqp._amqpConnection.disconnect.callCount).to.be(1);
expect(connectionObj.disconnect.callCount).to.be(1);
done();
});
//simulate successful close
amqp._amqpConnection.emit("close");
connectionObj.emit("close");
});
it("passes back an error to the callback when something goes wrong", function(done) {
amqp.disconnect(function(err) {
queueStreams.disconnect(function(err) {
expect(err).to.be.an(Error);
done();
});
amqp._amqpConnection.emit("error", new Error("Some disconnection error"));
connectionObj.emit("error", new Error("Some disconnection error"));
});
it("ignores TCP ECONNRESET errors", function(done) {
amqp.disconnect(function(err) {
queueStreams.disconnect(function(err) {
//will fail if first error event gets through

@@ -256,5 +314,16 @@ expect(err).to.not.be.ok();

});
amqp._amqpConnection.emit("error", new Error("ECONNRESET"));
amqp._amqpConnection.emit("close");
connectionObj.emit("error", new Error("ECONNRESET"));
connectionObj.emit("close");
});
it("is a noop if currently disconnected from broker", function(done) {
// simulate amqp disconnected
connectionObj.emit('close');
queueStreams.disconnect(function(err) {
expect(err).to.not.be.ok();
expect(connectionObj.disconnect.callCount).to.be(0);
done();
});
});
});

@@ -264,3 +333,3 @@

beforeEach(function() {
amqp.channels.forEach(function(stream) {
queueStreams.channels.forEach(function(stream) {
stream._subscribeToQueue = sinon.stub().yields(null);

@@ -271,3 +340,3 @@ });

afterEach(function() {
amqp.channels.forEach(function(stream) {
queueStreams.channels.forEach(function(stream) {
stream._subscribeToQueue.reset();

@@ -278,5 +347,5 @@ });

it("attempts to resubscribe to the queue if the worker is unsubscribed", function(done) {
amqp.resubscribeConsumers(function(err) {
queueStreams.resubscribeConsumers(function(err) {
expect(err).to.not.be.ok();
amqp.channels.forEach(function(stream) {
queueStreams.channels.forEach(function(stream) {
expect(stream._subscribeToQueue.callCount).to.be(1);

@@ -290,8 +359,8 @@ });

//since we haven't initialized the streams in the test, let's manually say we've subscribed here
amqp.channels.forEach(function(stream) {
queueStreams.channels.forEach(function(stream) {
stream.subscribed = true;
});
amqp.resubscribeConsumers(function(err) {
queueStreams.resubscribeConsumers(function(err) {
expect(err).to.not.be.ok();
amqp.channels.forEach(function(stream) {
queueStreams.channels.forEach(function(stream) {
expect(stream._subscribeToQueue.callCount).to.be(0);

@@ -307,15 +376,2 @@ });

var rabbitmq = rewire('./');
function stubConnection() {
var subscriptionObj = {addCallback: sinon.stub().yields({})};
var queueObj = new EventEmitter();
queueObj.subscribe = sinon.stub().returns(subscriptionObj);
var connectionObj = new EventEmitter();
connectionObj.queue = sinon.stub().yields(queueObj);
return connectionObj;
}
var createConnectionStub,

@@ -322,0 +378,0 @@ connectionObj;

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc