rabbitmq-queue-stream
Advanced tools
Comparing version 0.3.0 to 0.4.0
43
index.js
@@ -259,2 +259,3 @@ var _ = require("lodash"); | ||
function AMQPStream(connection, options, workerNum) { | ||
this.id = workerNum; | ||
this.__connection = connection; | ||
@@ -480,4 +481,46 @@ this.__options = options || {}; | ||
/* | ||
* Test helper that responds with with a mock `source` | ||
* and `sink` properties. Only returns one channel. | ||
*/ | ||
exports.createWithTestMessages = function(testMessages) { | ||
var stubSource = new Readable({objectMode: true}); | ||
stubSource._read = function() { | ||
var itemWrapper; | ||
var nextItem = testMessages.shift(); | ||
if(nextItem) { | ||
itemWrapper = { | ||
headers: {}, | ||
deliveryInfo: {}, | ||
payload: nextItem, | ||
_meta: {} | ||
}; | ||
} | ||
this.push(itemWrapper); | ||
}; | ||
var stubSink = new Writable({objectMode: true}); | ||
stubSink._write = function(item, enc, next) { | ||
var evt; | ||
if(item._meta.requeue) { | ||
evt = "requeued"; | ||
} else if(item._meta.reject) { | ||
evt = "rejected"; | ||
} else { | ||
evt = "acknowledged"; | ||
} | ||
this.emit(evt, item); | ||
next(); | ||
}; | ||
return { | ||
channels: [{ | ||
source: stubSource, | ||
sink: stubSink | ||
}] | ||
}; | ||
}; | ||
/* export for testing */ | ||
exports.AMQPStreams = AMQPStreams; | ||
exports.AMQPStream = AMQPStream; |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.3.0", | ||
"version": "0.4.0", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -114,3 +114,31 @@ # rabbitmq-queue-stream | ||
``` | ||
There also a helper method that helps with integration test | ||
```javascript | ||
var RabbitMQStream = require("rabbitmq-queue-stream"); | ||
var Transform = require("stream").Transform; | ||
var myTransformStream = new Transform({objectMode: true}); | ||
myTransformStream._transform = function(item, enc, next) { | ||
console.log("Transforming item:", item); | ||
this.push(item); | ||
next(); | ||
}; | ||
var streamifiedQueues = RabbitMQStream.createWithTestMessages([ | ||
"testMessage1", | ||
{testMessage: "2"}, | ||
{testMessage: "3"} | ||
]); | ||
/* | ||
* streamifiedQueues.channels will contain one channel with a | ||
* streamable .source and .sink. | ||
*/ | ||
var channel = streamifiedQueues.channels.shift(); | ||
channel.source | ||
.pipe(myTransformStream) | ||
.pipe(channel.sink); | ||
//channel .sink emits 'requeued', 'rejected', and 'acknowledged' events | ||
channel.sink.on("acknowledged", console.log.bind(null, "Acknowledged message!")); | ||
``` | ||
### Emitted Events | ||
@@ -117,0 +145,0 @@ |
59
test.js
@@ -931,2 +931,61 @@ var EventEmitter = require("events").EventEmitter; | ||
describe("#createWithTestMessages", function() { | ||
var testMessages, collector, collectedMessages, channel; | ||
beforeEach(function() { | ||
testMessages = [ | ||
"testMessage1", | ||
{testMessage: "2"}, | ||
{testMessage: "3"} | ||
]; | ||
collectedMessages = []; | ||
collector = new stream.Transform({objectMode: true}); | ||
collector._transform = function(item, enc, next) { | ||
collectedMessages.push(item); | ||
this.push(); | ||
next(); | ||
}; | ||
var streamifiedQueues = rabbitmq.createWithTestMessages(_.clone(testMessages, true)); | ||
channel = streamifiedQueues.channels.shift(); | ||
}); | ||
it("returns a stub that gives you the messages that you set in the `payload` field", function(done) { | ||
channel.source | ||
.pipe(collector) | ||
.pipe(channel.sink); | ||
collector.on("end", function() { | ||
var receivedMessages = collectedMessages.map(function(m) { return m.payload; }); | ||
expect(receivedMessages).to.eql(testMessages); | ||
done(); | ||
}); | ||
}); | ||
it(".sink emits 'requeued', 'rejected', and 'acknowledged' events", function(done) { | ||
var events = ["requeued", "rejected", "acknowledged"]; | ||
var emittedEvents = []; | ||
var handler = new stream.Transform({objectMode: true}); | ||
var queueActions = ["RequeueMessage", "RejectMessage"]; | ||
handler._transform = function(item, enc, cb) { | ||
if(queueActions.length) { | ||
this.push(rabbitmq[queueActions.shift()](item)); | ||
} else { | ||
this.push(item); | ||
} | ||
cb(); | ||
}; | ||
events.forEach(function(evt) { | ||
channel.sink.on(evt, emittedEvents.push.bind(emittedEvents)); | ||
}); | ||
channel.source | ||
.pipe(handler) | ||
.pipe(channel.sink); | ||
handler.on("end", function() { | ||
expect(emittedEvents.length).to.be(3); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
54735
1270
187