rabbitmq-queue-stream
Tests
$ make test
Usage
$ npm i rabbitmq-queue-stream
// Basic usage: open streamified queue channels and pipe each channel's
// source through a processing stream into its sink.
var RabbitMQStream = require("rabbitmq-queue-stream");
var stream = require("stream");

var options = {
  connection: {
    url: "amqp://user:password@rabbitmq.com"
  },
  // NOTE(review): presumably forwarded to the underlying node-amqp client —
  // confirm against the library documentation.
  nodeAmqp: {
    reconnect: false
  },
  queue: {
    name: "myQueue",
    subscribe: {
    },
    connection: {
    }
  }
};

// NOTE(review): the first argument (2) is presumably the number of channels
// to open — confirm against the library documentation.
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
  if (err) {
    return console.error(err);
  }

  streamifiedQueues.channels.forEach(function(channel) {
    var myProcessingStream = new stream.Transform({objectMode: true});
    // _transform must be ASSIGNED; it is the hook the stream machinery calls
    // for every item. A plain function (not an arrow) is required so that
    // `this` is the transform stream.
    myProcessingStream._transform = function(obj, enc, next) {
      this.push(obj);
      next();
    };

    channel.source
      .pipe(myProcessingStream)
      .pipe(channel.sink);
  });

  process.on("SIGTERM", function() {
    streamifiedQueues.gracefulDisconnect(function(err) {
      // Report a failed disconnect instead of dropping the error silently.
      if (err) {
        console.error(err);
      }
    });
  });
});
There is also a helper method that helps with integration testing:
// Integration-test example: build streamified queues from a fixed list of
// test messages and run them through a logging pass-through transform.
var RabbitMQStream = require("rabbitmq-queue-stream");
var Transform = require("stream").Transform;

// Logs each item, then forwards it unchanged.
function logAndForward(item, enc, next) {
  console.log("Transforming item:", item);
  this.push(item);
  next();
}

var myTransformStream = new Transform({objectMode: true});
myTransformStream._transform = logAndForward;

var testMessages = [
  "testMessage1",
  {testMessage: "2"},
  {testMessage: "3"}
];
var streamifiedQueues = RabbitMQStream.createWithTestMessages(testMessages);

// Take the first channel off the list and wire source -> transform -> sink.
var channel = streamifiedQueues.channels.shift();
var transformed = channel.source.pipe(myTransformStream);
transformed.pipe(channel.sink);

var onAcknowledged = console.log.bind(null, "Acknowledged message!");
channel.sink.on("acknowledged", onAcknowledged);
Emitted Events
AMQPStreams
- ready - AMQP client connected or reconnected
- error - Emitted if connection to broker dies
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
  // Report an initialization failure instead of ignoring it.
  if (err) {
    return console.error(err);
  }

  // "error" is emitted if the connection to the broker dies.
  streamifiedQueues.on('error', function(connectionErr) {
    console.error('socket disconnected!', connectionErr);
  });
});
.source
- parseError - Emitted when a message specifies contentType: application/json but is malformed JSON.
// Log any message that could not be parsed as JSON.
function onParseError(err, message) {
  console.error("Problem JSON parsing message", message);
}
myQueueStream.source.on("parseError", onParseError);
.sink
- acknowledged - Emitted every time a message is acknowledged
- rejected - Emitted when a message is rejected
- requeued - Emitted when a message is requeued
// Count acknowledged messages. "acknowledged" is a .sink event, so the
// listener is attached to the sink rather than the source.
var totalAcked = 0;
myQueueStream.sink.on("acknowledged", function(message) {
  console.log("Acknowledged:", message);
  totalAcked++;
});
- formatError - Sink received a job that does not have the necessary information to be deleted from the queue. Most likely emitted when objects not originating from .source are written to sink.
// Log any object written to the sink that lacks the information needed to
// delete it from the queue.
function onFormatError(err, message) {
  console.error("Malformatted message written to .sink. Please check your pipeline configuration", message);
}
myQueueStream.sink.on("formatError", onFormatError);
TODO
- Add a jsonMode to make automatic parsing of source data optional