rabbitmq-queue-stream
Tests
$ make test
Usage
$ npm i rabbitmq-queue-stream
var RabbitMQStream = require("rabbitmq-queue-stream");
var stream = require("stream");

// Connection settings plus the queue every streamified channel consumes from.
var options = {
  connection: {
    url: "amqp://user:password@rabbitmq.com"
  },
  queueStream: {
    name: "myQueue",
    prefetchCount: 100
  }
};

// The first argument (2) is passed straight to init; the callback receives
// an object exposing the created channels and the shutdown helpers used below.
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
  if (err) {
    return console.error(err);
  }

  streamifiedQueues.channels.forEach(function(channel) {
    var myProcessingStream = new stream.Transform({objectMode: true});

    // _transform must be ASSIGNED, not called: calling it would invoke the
    // unimplemented base method with a function as the chunk.
    myProcessingStream._transform = function(data, enc, next) {
      console.log("Doing something with", data);
      this.push(data);
      next();
    };

    // Everything read from .source must eventually be written to .sink.
    channel.source
      .pipe(myProcessingStream)
      .pipe(channel.sink);
  });

  // Tears everything down in order: unsubscribe, close consumers, disconnect.
  // Errors are reported but do not abort the remaining shutdown steps.
  var gracefulShutdown = function() {
    streamifiedQueues.unsubscribeConsumers(function(err) {
      if (err) {
        console.error("Failed to unsubscribe consumers", err);
      }
      streamifiedQueues.closeConsumers(function(err) {
        if (err) {
          console.error("Failed to close consumers", err);
        }
        streamifiedQueues.disconnect(function(err) {
          if (err) {
            console.error("Failed to disconnect", err);
          }
          process.exit(0);
        });
      });
    });
  };

  // Without a registration the shutdown routine above is never invoked.
  process.once("SIGINT", gracefulShutdown);
  process.once("SIGTERM", gracefulShutdown);
});
Emitted Events
.source
- parseError - Emitted when a job cannot be JSON-parsed. Passes in the error and the malformed message.
// Report every message that .source could not JSON-parse.
function onParseError(err, message) {
  console.error("Problem JSON parsing message", message);
}
myQueueStream.source.on("parseError", onParseError);
.sink
- deleted - Emitted every time a job is deleted from the queue
// Keep a running count of jobs deleted from the queue.
var totalDeleted = 0;
// "deleted" is a .sink event, so the listener must be attached to .sink.
myQueueStream.sink.on("deleted", function() {
  // Pre-increment so the logged number includes the job just deleted.
  console.log("Deleted", ++totalDeleted);
});
- formatError - Sink received a job that does not have the necessary information to be deleted from the queue.
Most likely emitted when objects not originating from .source are written to sink.
// Report objects written to .sink that it cannot delete from the queue.
function onFormatError(err, message) {
  console.error("Malformatted message written to .sink. Please check your pipeline configuration", message);
}
myQueueStream.sink.on("formatError", onFormatError);
TODO
- Add a jsonMode to make automatic parsing of source data optional