ironmq-queue-stream (WIP)
Tests
make test
Test coverage could be better.
Usage
IronStream = require("ironmq-queue-stream").IronStream;
var iron = new IronStream({projectId: "", projectToken: ""});
var queueOptions = {
ironmq: {
n: 100,
timeout: THIRTY_MINUTES
},
stream: {
highWaterMark: HIGHWATER_MARK
}
};
var someQueueStream = iron.queue("someQueue", queueOptions);
someQueueStream.pipe(someOtherStream);
var jsonParserOptions = {
onError: function(err, message) {
console.error("Problem parsing JSON for:", message, "Error:", err);
},
stream: {
highWaterMark: HIGHWATER_MARK
}
};
var parsedStream = IronStream.parseJson(queueStream, jsonParserOptions);
parsedStream.pipe(someOtherStream);
Sinks
var Sink = require("ironmq-queue-stream").Sink;
var iron = new IronStream({projectId: "", projectToken: ""});
var myQueueStream = iron.queue("myQueue");
var sink = new IronMQStream.Sink(myQueueStream, {deleteInBatchesOf: 100, stream: {highWaterMark: HIGHWATER_MARK}});
myQueueStream.pipe(someOtherStream).pipe(sink);
Error Handling