Security News
Input Validation Vulnerabilities Dominate MITRE's 2024 CWE Top 25 List
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
rabbitmq-queue-stream
Advanced tools
$ make test
$ npm i rabbitmq-queue-stream
var RabbitMQStream = require("rabbitmq-queue-stream");
var stream = require("stream");
/*
 * Configuration handed to RabbitMQStream.init().
 *
 * connection - where to connect (AMQP URL).
 * nodeAmqp   - options forwarded to node-amqp.
 * queue      - the queue to consume from plus its subscribe/connection options.
 */
var options = {
  connection: {
    url: "amqp://user:password@rabbitmq.com"
  },
  nodeAmqp: {
    reconnect: false // defaults to true, see https://github.com/postwait/node-amqp#connection-options-and-url (search for reconnect)
  }, // this comma was missing, which made the whole literal a syntax error
  queue: {
    name: "myQueue",
    subscribe: {
      /* Any option accepted by https://github.com/postwait/node-amqp#queuesubscribeoptions-listener */
    },
    connection: {
      /* Any option accepted by https://github.com/postwait/node-amqp#connectionqueuename-options-opencallback */
    }
  }
};
/*
 * Initialize two consumer channels to our queue.
 */
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
  if(err) {
    return console.error(err);
  }

  /*
   * Each consumer channel comes with a .source and .sink property.
   *
   * .source is a Readable stream that gives us a stream of objects
   * from the specified queue
   *
   * Every job written to .sink is deleted from the queue. Only objects
   * originating from .source should be written to .sink
   *
   */
  streamifiedQueues.channels.forEach(function(channel) {
    var myProcessingStream = new stream.Transform({objectMode: true});

    // _transform must be ASSIGNED. Calling it (as the example previously did)
    // invokes the default, unimplemented _transform with the function as its
    // chunk and never installs the handler.
    myProcessingStream._transform = function(obj, enc, next) {
      /*
       * Messages received from the source will have their data namespaced
       * in the `obj.payload` property. `payload` will contain a parsed
       * JSON object if clients specified contentType: application/json
       * when enqueuing the message. Messages enqueued with contentType:
       * application/json but are malformed will be automatically rejected.
       * Add a listener to event `parseError`, which will be emitted by
       * channel.source, to handle errors yourself.
       */
      this.push(obj);

      /*
       * Messages are successfully acked and removed from the queue by default.
       * RabbitMQStream provides methods to requeue and delete messages too.
       *
       * Requeue:
       * this.push(RabbitMQStream.RequeueMessage(obj));
       *
       * Reject:
       * this.push(RabbitMQStream.RejectMessage(obj));
       */
      next();
    };

    channel.source
      .pipe(myProcessingStream)
      .pipe(channel.sink);
  });

  /*
   * example graceful shutdown routine
   *
   * Steps run strictly in sequence: unsubscribe, close consumers, disconnect,
   * then exit the process. Nothing in this example invokes it; call it from
   * your own shutdown path.
   */
  var gracefulShutdown = function() {
    //stop fetching messages
    streamifiedQueues.unsubscribeConsumers(function(err) {
      if(err) {
        //handle error
      }
      //Wait some time for queues to flush out before closing consumers.
      streamifiedQueues.closeConsumers(function(err) {
        if(err) {
          //handle error
        }
        streamifiedQueues.disconnect(function(err) {
          if(err) {
            //handle error
          }
          process.exit(0);
        });
      });
    });
  };
});
There is also a helper method that helps with integration tests:
var RabbitMQStream = require("rabbitmq-queue-stream");
var Transform = require("stream").Transform;

/*
 * Pass-through stage for the test pipeline: logs every item it sees and
 * forwards it unchanged.
 */
var myTransformStream = new Transform({objectMode: true});
myTransformStream._transform = function logAndForward(chunk, encoding, done) {
  console.log("Transforming item:", chunk);
  this.push(chunk);
  done();
};

/* Messages the test queue is created with. */
var testMessages = [
  "testMessage1",
  {testMessage: "2"},
  {testMessage: "3"}
];
var streamifiedQueues = RabbitMQStream.createWithTestMessages(testMessages);

/*
 * streamifiedQueues.channels will contain one channel with a
 * streamable .source and .sink. Take it off the front of the list.
 */
var channel = streamifiedQueues.channels.shift();

/* Wire the pipeline: source -> transform -> sink. */
var transformed = channel.source.pipe(myTransformStream);
transformed.pipe(channel.sink);

//channel .sink emits 'requeued', 'rejected', and 'acknowledged' events
channel.sink.on("acknowledged", console.log.bind(null, "Acknowledged message!"));
/*
 * Listening for 'error' events on the object handed to the init callback.
 */
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
  // Bail out if initialization failed; streamifiedQueues must not be
  // dereferenced in that case.
  if(err) {
    return console.error(err);
  }
  streamifiedQueues.on('error', function(socketErr) {
    // Include the error object so the cause is not lost.
    console.error('socket disconnected!', socketErr);
  });
});
/*
 * Example listeners for events on a channel's .source and .sink.
 * `myQueueStream` is defined outside this snippet.
 */

// Log the offending message when JSON parsing fails.
myQueueStream.source.on("parseError", function onParseError(parseErr, rawMessage) {
  console.error("Problem JSON parsing message", rawMessage);
});

// Running count of 'acknowledged' events seen so far.
// NOTE(review): the earlier example says 'acknowledged' is emitted by .sink,
// but this listener is attached to .source; confirm which is intended.
var totalAcked = 0;
myQueueStream.source.on("acknowledged", function onAcknowledged(ackedMessage) {
  console.log("Acknowledged:", ackedMessage);
  totalAcked += 1;
});

// Log messages written to .sink that triggered a 'formatError' event.
myQueueStream.sink.on("formatError", function onFormatError(formatErr, badMessage) {
  console.error("Malformatted message written to .sink. Please check your pipeline configuration", badMessage);
});
FAQs
Reliable streaming interface to rabbitmq queues
The npm package rabbitmq-queue-stream receives a total of 2 weekly downloads. As such, rabbitmq-queue-stream popularity was classified as not popular.
We found that rabbitmq-queue-stream demonstrated an unhealthy version release cadence and project activity because the last version was released a year ago. It has 5 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
Security News
In this segment of the Risky Business podcast, Feross Aboukhadijeh and Patrick Gray discuss the challenges of tracking malware discovered in open source software.
Research
Security News
A threat actor's playbook for exploiting the npm ecosystem was exposed on the dark web, detailing how to build a blockchain-powered botnet.