Security News
Opengrep Emerges as Open Source Alternative Amid Semgrep Licensing Controversy
Opengrep forks Semgrep to preserve open source SAST in response to controversial licensing changes.
rabbitmq-queue-stream
Advanced tools
$ make test
$ npm i rabbitmq-queue-stream
var RabbitMQStream = require("rabbitmq-queue-stream");
var stream = require("stream");
var options = {
connection: {
url: "amqp://user:password@rabbitmq.com"
},
nodeAmqp: {
reconnect: false // defaults to true, see https://github.com/postwait/node-amqp#connection-options-and-url (search for reconnect)
}
queue: {
name: "myQueue",
subscribe: {
/* Any option accepted by https://github.com/postwait/node-amqp#queuesubscribeoptions-listener */
},
connection: {
/* Any option accepted by https://github.com/postwait/node-amqp#connectionqueuename-options-opencallback */
}
}
};
/*
* Initialize two consumer channels to our queue.
*/
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
if(err) {
return console.error(err);
}
/*
* Each consumer channel comes with a .source and .sink property.
*
* .source is a Readable stream that gives us a stream of objects
* from the specified queue
*
* Every job written to .sink is deleted from the queue. Only object
* originating from .source should be written to .sink
*
*/
streamifiedQueues.channels.forEach(function(channel) {
var myProcessingStream = new stream.Transform({objectMode: true});
myProcessingStream._transform(function(obj, enc, next) {
/*
* Messages received from the source will have their data namespaced
* in the `obj.payload` property. `payload` will contain a parsed
* JSON object if clients specified contentType: application/json
* when enqueuing the message. Messages enqueued with contentType:
* application/json but are malformed will be automatically rejected.
* Add a listener to event `parseError`, which will be emitted by
* channel.source, to handle errors yourself.
*/
this.push(obj);
/*
* Messages are successfully acked and removed from the queue by default.
* RabbitMQStream provides methods to requeue and delete messages too.
*
* Requeue:
* this.push(RabbitMQStream.RequeueMessage(obj));
*
* Reject:
* this.push(RabbitMQStream.RejectMessage(obj));
*/
next();
});
channel.source
.pipe(myProcessingStream)
.pipe(channel.sink);
});
process.on("SIGTERM", function() {
streamifiedQueues.gracefulDisconnect(function(err) {
// process.exit
});
});
});
There also a helper method that helps with integration test
var RabbitMQStream = require("rabbitmq-queue-stream");
var Transform = require("stream").Transform;
var myTransformStream = new Transform({objectMode: true});
myTransformStream._transform = function(item, enc, next) {
console.log("Transforming item:", item);
this.push(item);
next();
};
var streamifiedQueues = RabbitMQStream.createWithTestMessages([
"testMessage1",
{testMessage: "2"},
{testMessage: "3"}
]);
/*
* streamifiedQueues.channels will contain one channel with a
* streamable .source and .sink.
*/
var channel = streamifiedQueues.channels.shift();
channel.source
.pipe(myTransformStream)
.pipe(channel.sink);
//channel .sink emits 'requeued', 'rejected', and 'acknowledged' events
channel.sink.on("acknowledged", console.log.bind(null, "Acknowledged message!"));
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
streamifiedQueues.on('error', function(err) {
console.error('socket disconnected!');
});
});
myQueueStream.source.on("parseError", function(err, message) {
console.error("Problem JSON parsing message", message);
});
var totalAcked = 0;
myQueueStream.source.on("acknowledged", function(message) {
console.log("Acknowledged:", message);
totalAcked++;
});
myQueueStream.sink.on("formatError", function(err, message) {
console.error("Malformatted message written to .sink. Please check your pipeline configuration", message);
});
FAQs
Reliable streaming interface to rabbitmq queues
The npm package rabbitmq-queue-stream receives a total of 9 weekly downloads. As such, rabbitmq-queue-stream popularity was classified as not popular.
We found that rabbitmq-queue-stream demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 5 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Opengrep forks Semgrep to preserve open source SAST in response to controversial licensing changes.
Security News
Critics call the Node.js EOL CVE a misuse of the system, sparking debate over CVE standards and the growing noise in vulnerability databases.
Security News
cURL and Go security teams are publicly rejecting CVSS as flawed for assessing vulnerabilities and are calling for more accurate, context-aware approaches.