rabbitmq-queue-stream
Advanced tools
Comparing version 0.2.1 to 0.3.0
72
index.js
@@ -0,17 +1,13 @@ | ||
var _ = require("lodash"); | ||
var amqp = require("amqp"); | ||
var async = require("async"); | ||
var _ = require("lodash"); | ||
var streamsDebug = require("debug")("amqp-streams"); | ||
var streamInitDebug = require("debug")("amqp-stream-init"); | ||
var streamDebug = require("debug")("amqp-stream"); //stream runtime debug | ||
var systemDebug = require("debug")("system"); | ||
var debug = require("debug"); | ||
var Readable = require("stream").Readable; | ||
var Writable = require("stream").Writable; | ||
var Transform = require("stream").Transform; | ||
var EventEmitter = require('events').EventEmitter; | ||
var EventEmitter = require('events').EventEmitter; | ||
var Transform = require("stream").Transform; | ||
var Readable = require("stream").Readable; | ||
var Writable = require("stream").Writable; | ||
var streamsDebug = debug("rabbitmq-queue-stream:streams"); | ||
exports.init = function(numStreams, options, cb) { | ||
@@ -27,3 +23,2 @@ if(!cb) { | ||
var RequeueMessage = function(message) { | ||
@@ -74,3 +69,2 @@ if(!message._meta) { | ||
AMQPStreams.prototype.initialize = function(cb) { | ||
@@ -232,3 +226,3 @@ streamsDebug("Initializing " + this.__numStreams + " streams"); | ||
if(_.contains(err.message, "ECONNRESET")) { | ||
streamDebug("Ignoring ECONNRESET error"); | ||
streamsDebug("Ignoring ECONNRESET error"); | ||
return; | ||
@@ -267,3 +261,3 @@ } | ||
*/ | ||
function AMQPStream(connection, options) { | ||
function AMQPStream(connection, options, workerNum) { | ||
this.__connection = connection; | ||
@@ -273,9 +267,9 @@ this.__options = options || {}; | ||
this.__pendingQueue = []; | ||
this.__debug = debug("rabbitmq-queue-stream:worker:" + (workerNum || "-")); | ||
} | ||
AMQPStream.create = function(connection, options, cb) { | ||
this._totalWorkers = this._totalWorkers || 0; | ||
this._totalWorkers++; | ||
streamInitDebug("Creating stream " + this._totalWorkers); | ||
var stream = new this(connection, options); | ||
this.__totalWorkers = this.__totalWorkers || 0; | ||
this.__totalWorkers++; | ||
var stream = new this(connection, options, this.__totalWorkers); | ||
stream.initialize(cb); | ||
@@ -286,3 +280,3 @@ }; | ||
var me = this; | ||
streamInitDebug("Initializing"); | ||
this.__debug("Initializing"); | ||
if(!this.__options.name) { | ||
@@ -303,3 +297,3 @@ throw new Error("You must provide a `name` to queueStream options object"); | ||
if(err) { | ||
streamInitDebug("Error subscribe to queue " + this.__options.name + ". " + err.message); | ||
me.__debug("Error subscribe to queue " + this.__options.name + ". " + err.message); | ||
return cb(err); | ||
@@ -316,3 +310,3 @@ } | ||
function onError(err) { | ||
streamInitDebug("Error connecting to queue " + queueName + ": " + err.message); | ||
me.__debug("Error connecting to queue " + queueName + ": " + err.message); | ||
return cb(err); | ||
@@ -322,3 +316,3 @@ } | ||
this.__connection.queue(queueName, _.merge({passive: true}, this.__options.connection), function(queue) { | ||
streamInitDebug("Connected to queue " + queueName); | ||
me.__debug("Connected to queue " + queueName); | ||
me.__connection.removeListener("error", onError); | ||
@@ -337,3 +331,3 @@ return cb(null, queue); | ||
).addCallback(function(ok) { | ||
streamDebug("Subscribed with consumer tag: " + ok.consumerTag); | ||
me.__debug("Subscribed with consumer tag: " + ok.consumerTag); | ||
me.__consumerTag = ok.consumerTag; | ||
@@ -366,3 +360,3 @@ me.subscribed = true; | ||
} else { | ||
streamDebug("Automatically rejecting malformed message. " + | ||
this.__debug("Automatically rejecting malformed message. " + | ||
"Add listener to 'parseError' for custom behavior"); | ||
@@ -372,3 +366,3 @@ return this.sink.write(RejectMessage(serializableMessage)); | ||
} | ||
streamDebug("Received message. Inserted ack into index " + serializableMessage._meta.ackIndex); | ||
this.__debug("Received message. Inserted ack into index " + serializableMessage._meta.ackIndex); | ||
this.__pendingQueue.push(serializableMessage); | ||
@@ -383,10 +377,7 @@ }; | ||
streamInitDebug("Creating queue source"); | ||
/* Create the .source ReadableStream */ | ||
queueStream = new Readable({objectMode: true}); | ||
queueStream._read = function() { | ||
systemDebug("_read queueStream"); | ||
me.__debug("_read .source"); | ||
me._waitForMessage(function(message) { | ||
@@ -397,6 +388,4 @@ this.push(message); | ||
streamInitDebug("Creating readable queue stream"); | ||
prepareMessage = new Transform({objectMode: true}); | ||
prepareMessage._transform = function(message, enc, next) { | ||
systemDebug("_transform prepareMessage"); | ||
this.push(_.pick(message, "payload", "_meta")); | ||
@@ -408,8 +397,7 @@ next(); | ||
/* Create the .sink WritableStream */ | ||
streamInitDebug("Creating queue sink"); | ||
sink = new Writable({objectMode: true}); | ||
sink._write = function(message, enc, next) { | ||
systemDebug("_write sink"); | ||
me.__debug("_write .sink"); | ||
if(!message._meta || !_.isNumber(message._meta.ackIndex)) { | ||
streamDebug("Could not find ackIndex in message " + message); | ||
me.__debug("Could not find ackIndex in message " + message); | ||
return this.emit("formatError", new Error("No ack index for message"), message); | ||
@@ -420,3 +408,3 @@ } | ||
//something went wrong and we can't ack message | ||
streamDebug("Could not find ack function for " + message); | ||
me.__debug("Could not find ack function for " + message); | ||
return this.emit("ackError", new Error("Cannot find ack for message."), message); | ||
@@ -448,7 +436,7 @@ } | ||
if(_.isEmpty(this.__pendingQueue)) { | ||
streamDebug("Waiting for message"); | ||
this.__debug("Waiting for message"); | ||
i = setInterval(function() { | ||
if(!_.isEmpty(me.__pendingQueue)) { | ||
clearInterval(i); | ||
streamDebug("Received messages. Continuing..."); | ||
me.__debug("Received messages. Continuing..."); | ||
return cb(me.__pendingQueue.shift()); | ||
@@ -458,3 +446,3 @@ } | ||
} else { | ||
streamDebug("Dequeueing pending message"); | ||
this.__debug("Dequeueing pending message"); | ||
cb(this.__pendingQueue.shift()); | ||
@@ -481,3 +469,3 @@ } | ||
var me = this; | ||
streamDebug("Unsubscribing with consumerTag " + this.__consumerTag); | ||
this.__debug("Unsubscribing with consumerTag " + this.__consumerTag); | ||
if(this.subscribed) { | ||
@@ -489,3 +477,3 @@ this.__queue.unsubscribe(this.__consumerTag).addCallback(function() { | ||
} else { | ||
streamDebug("Worker already unsubscribed"); | ||
this.__debug("Worker already unsubscribed"); | ||
cb(); | ||
@@ -497,3 +485,3 @@ } | ||
var me = this; | ||
streamDebug("Unsubscribing with consumerTag " + this.__consumerTag); | ||
this.__debug("Unsubscribing with consumerTag " + this.__consumerTag); | ||
this.__queue.close(this.__consumer); | ||
@@ -500,0 +488,0 @@ var closeHandler = function() { |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.2.1", | ||
"version": "0.3.0", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -438,3 +438,3 @@ var EventEmitter = require("events").EventEmitter; | ||
rabbitmq.AMQPStream.create(); | ||
expect(rabbitmq.AMQPStream._totalWorkers).to.equal(3); | ||
expect(rabbitmq.AMQPStream.__totalWorkers).to.equal(3); | ||
}); | ||
@@ -464,3 +464,3 @@ | ||
queue = new EventEmitter(); | ||
instance = new rabbitmq.AMQPStream(connection, options); | ||
instance = new rabbitmq.AMQPStream(connection, options, 1); | ||
connectToQueueStub = sinon.stub(rabbitmq.AMQPStream.prototype, "_connectToQueue"); | ||
@@ -467,0 +467,0 @@ subscribeToQueueStub = sinon.stub(rabbitmq.AMQPStream.prototype, "_subscribeToQueue"); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
51144
1177