rabbitmq-queue-stream
Advanced tools
Comparing version 0.1.9 to 0.1.10
@@ -287,3 +287,3 @@ var amqp = require("amqp"); | ||
var serializableMessage = { | ||
data: isJSON ? message : message.data, | ||
payload: isJSON ? message : message.data, | ||
headers: headers, | ||
@@ -337,7 +337,3 @@ deliveryInfo: deliveryInfo, | ||
systemDebug("_transform prepareMessage"); | ||
if(_.isPlainObject(message.data)) { | ||
this.push(_.merge(message.data, {_meta: message._meta})); | ||
} else { | ||
this.push(_.pick(message, "data", "_meta")); | ||
} | ||
this.push(_.pick(message, "payload", "_meta")); | ||
next(); | ||
@@ -344,0 +340,0 @@ }; |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.1.9", | ||
"version": "0.1.10", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -58,6 +58,15 @@ # rabbitmq-queue-stream | ||
var myProcessingStream = new stream.Transform({objectMode: true}); | ||
myProcessingStream._transform(function(data, enc, next) { | ||
console.log("Doing something with", data); | ||
this.push(data); | ||
myProcessingStream._transform(function(obj, enc, next) { | ||
/* | ||
* Messages received from the source will have their data namespaced | ||
* in the `obj.payload` property. `payload` will contain a parsed | ||
* JSON object if clients specified contentType: application/json | ||
* when enqueuing the message. Messages enqueued with contentType: | ||
* application/json but are malformed will be automatically rejected. | ||
* Add a listener to event `parseError`, which will be emitted by | ||
* channel.source, to handle errors yourself. | ||
*/ | ||
this.push(obj); | ||
/* | ||
* Messages are successfully acked and removed from the queue by default. | ||
@@ -67,6 +76,6 @@ * RabbitMQStream provides methods to requeue and delete messages too. | ||
* Requeue: | ||
* this.push(RabbitMQStream.RequeueMessage(data)); | ||
* this.push(RabbitMQStream.RequeueMessage(obj)); | ||
* | ||
* Reject: | ||
* this.push(RabbitMQStream.RejectMessage(data)); | ||
* this.push(RabbitMQStream.RejectMessage(obj)); | ||
*/ | ||
@@ -108,3 +117,3 @@ next(); | ||
#### .source | ||
* parseError - Emitted when a job cannot be json parsed. | ||
* parseError - Emitted when a message specifies contentType: application/json but is malformed JSON. | ||
```javascript | ||
@@ -111,0 +120,0 @@ myQueueStream.source.on("parseError", function(err, message) { |
10
test.js
@@ -486,6 +486,6 @@ var EventEmitter = require("events").EventEmitter; | ||
instance._waitForMessage = sinon.stub(); | ||
instance._waitForMessage.onCall(0).yields({data: {"something": "somethingElse"}, _meta: { ackIndex: 10 }}); | ||
instance._waitForMessage.onCall(0).yields({payload: {"something": "somethingElse"}, _meta: { ackIndex: 10 }}); | ||
writable._write = function (message) { | ||
expect(message).to.eql({something: "somethingElse", _meta: {ackIndex: 10}}); | ||
expect(message).to.eql({payload: {something: "somethingElse"}, _meta: {ackIndex: 10}}); | ||
done(); | ||
@@ -763,3 +763,2 @@ }; | ||
//ack stub defers control to something that injects a new message until | ||
var ackStub = function(instance) { | ||
@@ -794,2 +793,7 @@ return { | ||
if(receivedMessages.length === 3) { | ||
expect(receivedMessages).to.eql([message1, message2, message3].map( | ||
function(m) { | ||
return {payload: m, _meta: {ackIndex: 0}}; | ||
} | ||
)); | ||
done(); | ||
@@ -796,0 +800,0 @@ } |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
45638
144