rabbitmq-queue-stream
Advanced tools
Comparing version 0.1.6 to 0.1.8
95
index.js
@@ -26,3 +26,4 @@ var amqp = require("amqp"); | ||
exports.RequeueMessage = function(message) { | ||
var RequeueMessage = function(message) { | ||
if(!message._meta) { | ||
@@ -35,3 +36,3 @@ return console.error(); | ||
exports.RejectMessage = function(message) { | ||
var RejectMessage = function(message) { | ||
if(!message._meta) { | ||
@@ -41,5 +42,9 @@ return console.error(); | ||
message._meta.reject = true; | ||
return message; | ||
}; | ||
exports.RequeueMessage = RequeueMessage; | ||
exports.RejectMessage = RejectMessage; | ||
/* | ||
@@ -270,18 +275,6 @@ | ||
/* TODO: Figure out how to error handle subscription. Maybe a 'once' error handler. */ | ||
queue.subscribe(_.merge({ack: true, prefetchCount: 1}, this.__options.subscribe), function(message, headers, deliveryInfo, ack) { | ||
var serializableMessage = { | ||
body: message.data, | ||
headers: headers, | ||
deliveryInfo: deliveryInfo | ||
}; | ||
/* | ||
* ack is not serializable, so we need to push it | ||
* onto the outstandingAck array right now and attach | ||
* an ackIndex number to the message | ||
*/ | ||
serializableMessage.ackIndex = me._insertAckIntoArray(ack); | ||
streamDebug("Received message. Inserted ack into index " + serializableMessage.ackIndex); | ||
me.__pendingQueue.push(serializableMessage); | ||
}).addCallback(function(ok) { | ||
queue.subscribe( | ||
_.merge({ack: true, prefetchCount: 1}, this.__options.subscribe), | ||
this._handleIncomingMessage.bind(this) | ||
).addCallback(function(ok) { | ||
streamDebug("Subscribed with consumer tag: " + ok.consumerTag); | ||
@@ -294,2 +287,32 @@ me.__consumerTag = ok.consumerTag; | ||
AMQPStream.prototype._handleIncomingMessage = function(message, headers, deliveryInfo, ack) { | ||
var isJSON = deliveryInfo.contentType === "application/json"; | ||
var serializableMessage = { | ||
data: isJSON ? message : message.data, | ||
headers: headers, | ||
deliveryInfo: deliveryInfo, | ||
_meta: { | ||
/* | ||
* ack is not serializable, so we need to push it | ||
* onto the outstandingAck array attach | ||
* an ackIndex number to the message | ||
*/ | ||
ackIndex: this._insertAckIntoArray(ack) | ||
} | ||
}; | ||
if(isJSON && deliveryInfo.parseError) { | ||
if(this.source.listeners('parseError').length) { | ||
return this.source.emit("parseError", deliveryInfo.parseError, deliveryInfo.rawData); | ||
} else { | ||
streamDebug("Automatically rejecting malformed message. " + | ||
"Add listener to 'parseError' for custom behavior"); | ||
return this.sink.write(RejectMessage(serializableMessage)); | ||
} | ||
} | ||
streamDebug("Received message. Inserted ack into index " + serializableMessage._meta.ackIndex); | ||
this.__pendingQueue.push(serializableMessage); | ||
}; | ||
AMQPStream.prototype._streamifyQueue = function(cb) { | ||
@@ -299,5 +322,9 @@ var queueStream, prepareMessage; | ||
var queue = this.__queue; | ||
var sink; | ||
streamInitDebug("Creating queue source"); | ||
/* Create the .source ReadableStream */ | ||
queueStream = new Readable({objectMode: true}); | ||
@@ -311,36 +338,16 @@ queueStream._read = function() { | ||
/* | ||
TODO: add json mode | ||
Transform gets a messages: | ||
{ | ||
body: String. In our case JSON parsable. | ||
headers: , | ||
deliveryInfo: , | ||
ackIndex: Number | ||
} | ||
This only pushes down the parsed body. It attaches the worker | ||
id into meta | ||
*/ | ||
streamInitDebug("Creating readable queue stream"); | ||
prepareMessage = new Transform({objectMode: true}); | ||
prepareMessage._transform = function(message, enc, next) { | ||
var parsedBody; | ||
systemDebug("_transform prepareMessage"); | ||
try { | ||
parsedBody = JSON.parse(message.body.toString()); | ||
} catch(e) { | ||
return this.emit("parseError", e, message); | ||
if(_.isPlainObject(message.data)) { | ||
this.push(_.merge(message.data, {_meta: message._meta})); | ||
} else { | ||
this.push(_.pick(message, "data", "_meta")); | ||
} | ||
parsedBody._meta = parsedBody._meta || {}; | ||
parsedBody._meta.ackIndex = message.ackIndex; | ||
this.push(parsedBody); | ||
next(); | ||
}; | ||
this.source = queueStream.pipe(prepareMessage); | ||
//add sink to instance | ||
var sink; | ||
/* Create the .sink WritableStream */ | ||
streamInitDebug("Creating queue sink"); | ||
@@ -347,0 +354,0 @@ sink = new Writable({objectMode: true}); |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.1.6", | ||
"version": "0.1.8", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
101
test.js
@@ -282,3 +282,14 @@ var EventEmitter = require("events").EventEmitter; | ||
var instance, connection, amqpResponseStub; | ||
beforeEach(function() { | ||
connection = new EventEmitter(); | ||
connection.queue = sinon.stub(); | ||
instance = new rabbitmq.AMQPStream(connection); | ||
amqpResponseStub = sinon.stub({ | ||
acknowledge: function() {}, | ||
reject: function() {} | ||
}); | ||
}); | ||
describe("AMQPStream.create", function() { | ||
@@ -310,3 +321,2 @@ | ||
var options = {}; | ||
var connection = {}; | ||
var callback = sinon.stub(); | ||
@@ -324,6 +334,5 @@ rabbitmq.AMQPStream.create(connection, options, callback); | ||
var instance, connection, options, queue, connectToQueueStub, subscribeToQueueStub, streamifyQueueStub; | ||
var instance, options, queue, connectToQueueStub, subscribeToQueueStub, streamifyQueueStub; | ||
beforeEach(function() { | ||
connection = {}; | ||
options = { | ||
@@ -380,9 +389,7 @@ name: "myStream", | ||
var instance, connection, cb; | ||
var cb; | ||
beforeEach(function() { | ||
cb = sinon.stub(); | ||
connection = new EventEmitter(); | ||
connection.queue = sinon.stub(); | ||
instance = new rabbitmq.AMQPStream(connection); | ||
// instance = new rabbitmq.AMQPStream(connection); | ||
}); | ||
@@ -417,7 +424,57 @@ | ||
it("passes the `connection` option to the underlying driver for queue initialization", function() { | ||
instance = new rabbitmq.AMQPStream(connection, {connection: {passive: false}}); | ||
instance.__connection = new EventEmitter(); | ||
instance.__connection.queue = sinon.stub(); | ||
instance._connectToQueue("myQueue", function() {}); | ||
expect(instance.__connection.queue.args[0][1]).to.eql({passive: false}); | ||
}); | ||
}); | ||
describe("#_handleIncomingMessage", function() { | ||
beforeEach(function() { | ||
//setup .source and .sink | ||
instance._streamifyQueue(function() {}); | ||
}); | ||
describe("when contentType === 'application/json'", function() { | ||
var message = {}; | ||
var headers = {}; | ||
var deliveryInfo = { | ||
parseError: new Error(), | ||
rawData: '{"malformed":', | ||
contentType: 'application/json', | ||
headers: {}, | ||
deliveryMode: 1, | ||
queue: 'some-queue', | ||
deliveryTag: new Buffer("delivery-tag"), | ||
redelivered: false, | ||
exchange: '', | ||
routingKey: 'some-queue', | ||
consumerTag: 'consumer-tag' | ||
}; | ||
it("automatically rejects any malformed message when no event listeners exist on 'parseError'", function (done) { | ||
instance.sink.on("rejected", function(msg) { | ||
done(); | ||
}); | ||
instance._handleIncomingMessage(null, {}, deliveryInfo, amqpResponseStub); | ||
}); | ||
it("emits a `parseError` event on invalid JSON when there are event listeners", function (done) { | ||
instance.source.on("parseError", function(msg) { | ||
done(); | ||
}); | ||
instance._handleIncomingMessage(null, {}, deliveryInfo, amqpResponseStub); | ||
}); | ||
}); | ||
}); | ||
describe("#_streamifyQueue", function() { | ||
var instance, cb, writable, readable; | ||
var cb, writable, readable; | ||
@@ -432,18 +489,5 @@ beforeEach(function () { | ||
describe("stream.source", function () { | ||
it("emits a `parseError` event on invalid JSON in message.body", function (done) { | ||
var badMessage = {}; | ||
instance._waitForMessage = sinon.stub(); | ||
instance._waitForMessage.onCall(0).yields(badMessage); | ||
instance._streamifyQueue(cb); | ||
instance.source.on("parseError", function (err, msg) { | ||
expect(err).to.be.an(Error); | ||
expect(msg).to.be(badMessage); | ||
done(); | ||
}); | ||
instance.source.pipe(writable); | ||
}); | ||
it("parses message, adds ackIndex, pushes downstream", function (done) { | ||
instance._waitForMessage = sinon.stub(); | ||
instance._waitForMessage.onCall(0).yields({body: '{"something": "somethingElse"}', ackIndex: 10}); | ||
instance._waitForMessage.onCall(0).yields({data: {"something": "somethingElse"}, _meta: { ackIndex: 10 }}); | ||
@@ -458,13 +502,16 @@ writable._write = function (message) { | ||
}); | ||
it("passes the `subscribe` option properly to the underlying driver", function () { | ||
instance = new rabbitmq.AMQPStream(connection, {subscribe: {prefetchCount: 100}}); | ||
instance.__queue = {subscribe: sinon.stub()}; //.onFirstCall().returns({addCallback: function() {}}); | ||
instance.__queue.subscribe.onFirstCall().returns({addCallback: function() {}}); | ||
instance._subscribeToQueue(); | ||
expect(instance.__queue.subscribe.args[0][0]).to.eql({ack: true, prefetchCount: 100}); | ||
}); | ||
}); | ||
describe("stream.sink", function () { | ||
var amqpResponseStub; | ||
var goodMessage; | ||
beforeEach(function() { | ||
amqpResponseStub = sinon.stub({ | ||
acknowledge: function() {}, | ||
reject: function() {} | ||
}); | ||
goodMessage = {_meta: {ackIndex: 1}}; | ||
@@ -471,0 +518,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
43532
996