rabbitmq-queue-stream
Advanced tools
Comparing version 0.1.5 to 0.1.6
12
index.js
@@ -34,7 +34,7 @@ var amqp = require("amqp"); | ||
exports.DeleteMessage = function(message) { | ||
exports.RejectMessage = function(message) { | ||
if(!message._meta) { | ||
return console.error(); | ||
} | ||
message._meta.delete = true; | ||
message._meta.reject = true; | ||
return message; | ||
@@ -355,11 +355,15 @@ }; | ||
/* TODO: How do we handle errors from acking? */ | ||
var evt; | ||
if(message._meta.requeue) { | ||
me.__outstandingAcks[ackIndex].reject(true); | ||
} else if(message._meta.delete) { | ||
evt = "requeued"; | ||
} else if(message._meta.reject) { | ||
me.__outstandingAcks[ackIndex].reject(false); | ||
evt = "rejected"; | ||
} else { | ||
me.__outstandingAcks[ackIndex].acknowledge(false); | ||
evt = "acknowledged"; | ||
} | ||
me.__outstandingAcks[ackIndex] = null; | ||
this.emit("deleted", message); | ||
this.emit(evt, message); | ||
next(); | ||
@@ -366,0 +370,0 @@ }; |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.1.5", | ||
"version": "0.1.6", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "make test-codecov.io" | ||
"test": "make test" | ||
}, | ||
@@ -9,0 +9,0 @@ "repository": { |
@@ -68,4 +68,4 @@ # rabbitmq-queue-stream | ||
* | ||
* Delete: | ||
* this.push(RabbitMQStream.DeleteMessage(data)); | ||
* Reject: | ||
* this.push(RabbitMQStream.RejectMessage(data)); | ||
*/ | ||
@@ -107,3 +107,3 @@ next(); | ||
#### .source | ||
* parseError - Emitted when a job cannot be json parsed. Passes in malform | ||
* parseError - Emitted when a job cannot be json parsed. | ||
```javascript | ||
@@ -116,11 +116,13 @@ myQueueStream.source.on("parseError", function(err, message) { | ||
#### .sink | ||
* deleted - Emitted everytime a job is deleted from the queue | ||
* acknowledged - Emitted everytime a message is acknowledged | ||
* rejected - Emitted when a message is rejected | ||
* requeued - Emitted when a message is requeued | ||
```javascript | ||
var totalDeleted = 0; | ||
myQueueStream.source.on("deleted", function() { | ||
console.log("Deleted", totalDeleted++); | ||
var totalAcked = 0; | ||
myQueueStream.source.on("acknowledged", function(message) { | ||
console.log("Acknowledged:", message); | ||
totalAcked++; | ||
}); | ||
``` | ||
* formatError - Sink received a job that does not have the necessary information to be deleted from the queue. | ||
Most likely emitted when objects not originating from .source are written to sink. | ||
* formatError - Sink received a job that does not have the necessary information to be deleted from the queue. Most likely emitted when objects not originating from .source are written to sink. | ||
```javascript | ||
@@ -127,0 +129,0 @@ myQueueStream.sink.on("formatError", function(err, message) { |
12
test.js
@@ -501,3 +501,3 @@ var EventEmitter = require("events").EventEmitter; | ||
it("releases messages from queue when tagged with rabbitmq.ReleaseMessage", function(done) { | ||
it("releases messages from queue when tagged with rabbitmq.RequeueMessage", function(done) { | ||
instance.__outstandingAcks = [ | ||
@@ -511,3 +511,3 @@ undefined, | ||
instance._streamifyQueue(cb); | ||
instance.sink.on("deleted", function () { | ||
instance.sink.on("requeued", function () { | ||
expect(amqpResponseStub.reject.callCount).to.be(1); | ||
@@ -521,3 +521,3 @@ expect(amqpResponseStub.reject.args[0][0]).to.be(true); | ||
it("removes messages from queue when tagged with AMQPStream.RemoveMessage", function(done) { | ||
it("removes messages from queue when tagged with AMQPStream.RejectMessage", function(done) { | ||
instance.__outstandingAcks = [ | ||
@@ -528,6 +528,6 @@ undefined, | ||
readable._read = function () { | ||
this.push(rabbitmq.DeleteMessage(goodMessage)); | ||
this.push(rabbitmq.RejectMessage(goodMessage)); | ||
}; | ||
instance._streamifyQueue(cb); | ||
instance.sink.on("deleted", function () { | ||
instance.sink.on("rejected", function () { | ||
expect(amqpResponseStub.reject.callCount).to.be(1); | ||
@@ -551,3 +551,3 @@ expect(amqpResponseStub.reject.args[0][0]).to.be(false); | ||
instance._streamifyQueue(cb); | ||
instance.sink.on("deleted", function () { | ||
instance.sink.on("acknowledged", function () { | ||
expect(amqpResponseStub.acknowledge.callCount).to.be(1); | ||
@@ -554,0 +554,0 @@ expect(amqpResponseStub.acknowledge.args[0][0]).to.be(false); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
41393
956
135