rabbitmq-queue-stream
Advanced tools
Comparing version 0.1.3 to 0.1.4
28
index.js
@@ -25,2 +25,19 @@ var amqp = require("amqp"); | ||
exports.RequeueMessage = function(message) { | ||
if(!message._meta) { | ||
return console.error(); | ||
} | ||
message._meta.requeue = true; | ||
return message; | ||
}; | ||
exports.DeleteMessage = function(message) { | ||
if(!message._meta) { | ||
return console.error(); | ||
} | ||
message._meta.delete = true; | ||
return message; | ||
}; | ||
/* | ||
@@ -337,5 +354,12 @@ | ||
} | ||
me.__outstandingAcks[ackIndex].acknowledge(false); | ||
/* TODO: How do we handle errors from acking? */ | ||
if(message._meta.requeue) { | ||
me.__outstandingAcks[ackIndex].reject(true); | ||
} else if(message._meta.delete) { | ||
me.__outstandingAcks[ackIndex].reject(false); | ||
} else { | ||
me.__outstandingAcks[ackIndex].acknowledge(false); | ||
} | ||
me.__outstandingAcks[ackIndex] = null; | ||
this.emit("deleted"); | ||
this.emit("deleted", message); | ||
next(); | ||
@@ -342,0 +366,0 @@ }; |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.1.3", | ||
"version": "0.1.4", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -28,2 +28,4 @@ "main": "index.js", | ||
"devDependencies": { | ||
"codecov.io": "0.0.8", | ||
"istanbul": "0.3.5", | ||
"mocha": "2.0.1", | ||
@@ -30,0 +32,0 @@ "expect.js": "0.2.0", |
# rabbitmq-queue-stream | ||
[![Build Status](https://travis-ci.org/classdojo/rabbitmq-queue-stream.svg?branch=master)](https://travis-ci.org/classdojo/rabbitmq-queue-stream) | ||
[![codecov.io](https://codecov.io/github/classdojo/rabbitmq-queue-stream/coverage.svg?branch=master)](https://codecov.io/github/classdojo/rabbitmq-queue-stream?branch=master) | ||
[![NPM version](https://badge.fury.io/js/rabbitmq-queue-stream.png)](http://badge.fury.io/js/rabbitmq-queue-stream) | ||
### Tests | ||
@@ -52,2 +56,12 @@ ```bash | ||
this.push(data); | ||
/* | ||
* Messages are successfully acked and removed from the queue by default. | ||
* RabbitMQStream provides methods to requeue and delete messages too. | ||
* | ||
* Requeue: | ||
* this.push(RabbitMQStream.RequeueMessage(data)); | ||
* | ||
* Delete: | ||
* this.push(RabbitMQStream.DeleteMessage(data)); | ||
*/ | ||
next(); | ||
@@ -54,0 +68,0 @@ }); |
57
test.js
@@ -455,2 +455,13 @@ var EventEmitter = require("events").EventEmitter; | ||
describe("stream.sink", function () { | ||
var amqpResponseStub; | ||
var goodMessage; | ||
beforeEach(function() { | ||
amqpResponseStub = sinon.stub({ | ||
acknowledge: function() {}, | ||
reject: function() {} | ||
}); | ||
goodMessage = {_meta: {ackIndex: 1}}; | ||
}); | ||
it("emits a `formatError` when message._meta has no ackIndex", function (done) { | ||
@@ -491,12 +502,44 @@ var badMessage = { | ||
it("acknowledges ack, nulls it, and emits a `deleted` event", function (done) { | ||
var acknowledgeStub = sinon.stub(); | ||
it("releases messages from queue when tagged with rabbitmq.ReleaseMessage", function(done) { | ||
instance.__outstandingAcks = [ | ||
undefined, | ||
{acknowledge: acknowledgeStub} | ||
amqpResponseStub | ||
]; | ||
var goodMessage = { | ||
_meta: {ackIndex: 1} | ||
readable._read = function () { | ||
this.push(rabbitmq.RequeueMessage(goodMessage)); | ||
}; | ||
instance._streamifyQueue(cb); | ||
instance.sink.on("deleted", function () { | ||
expect(amqpResponseStub.reject.callCount).to.be(1); | ||
expect(amqpResponseStub.reject.args[0][0]).to.be(true); | ||
expect(instance.__outstandingAcks[1]).to.be(null); | ||
done(); | ||
}); | ||
readable.pipe(instance.sink); | ||
}); | ||
it("removes messages from queue when tagged with AMQPStream.RemoveMessage", function(done) { | ||
instance.__outstandingAcks = [ | ||
undefined, | ||
amqpResponseStub | ||
]; | ||
readable._read = function () { | ||
this.push(rabbitmq.DeleteMessage(goodMessage)); | ||
}; | ||
instance._streamifyQueue(cb); | ||
instance.sink.on("deleted", function () { | ||
expect(amqpResponseStub.reject.callCount).to.be(1); | ||
expect(amqpResponseStub.reject.args[0][0]).to.be(false); | ||
expect(instance.__outstandingAcks[1]).to.be(null); | ||
done(); | ||
}); | ||
readable.pipe(instance.sink); | ||
}); | ||
it("acknowledges ack, nulls it, and emits a `deleted` event", function (done) { | ||
instance.__outstandingAcks = [ | ||
undefined, | ||
amqpResponseStub | ||
]; | ||
readable._read = function () { | ||
this.push(goodMessage); | ||
@@ -507,4 +550,4 @@ }; | ||
instance.sink.on("deleted", function () { | ||
expect(acknowledgeStub.callCount).to.be(1); | ||
expect(acknowledgeStub.args[0][0]).to.be(false); | ||
expect(amqpResponseStub.acknowledge.callCount).to.be(1); | ||
expect(amqpResponseStub.acknowledge.args[0][0]).to.be(false); | ||
expect(instance.__outstandingAcks[1]).to.be(null); | ||
@@ -511,0 +554,0 @@ done(); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
40715
9
953
128
6