
rabbitmq-queue-stream
$ make test
$ npm i rabbitmq-queue-stream
var RabbitMQStream = require("rabbitmq-queue-stream");
var stream = require("stream");

var options = {
  connection: {
    url: "amqp://user:password@rabbitmq.com"
  },
  nodeAmqp: {
    reconnect: false // defaults to true, see https://github.com/postwait/node-amqp#connection-options-and-url (search for reconnect)
  },
  queue: {
    name: "myQueue",
    subscribe: {
      /* Any option accepted by https://github.com/postwait/node-amqp#queuesubscribeoptions-listener */
    },
    connection: {
      /* Any option accepted by https://github.com/postwait/node-amqp#connectionqueuename-options-opencallback */
    }
  }
};

/*
 * Initialize two consumer channels to our queue.
 */
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
  if(err) {
    return console.error(err);
  }

  /*
   * Each consumer channel comes with a .source and .sink property.
   *
   * .source is a Readable stream that gives us a stream of objects
   * from the specified queue.
   *
   * Every job written to .sink is deleted from the queue. Only objects
   * originating from .source should be written to .sink.
   */
  streamifiedQueues.channels.forEach(function(channel) {
    var myProcessingStream = new stream.Transform({objectMode: true});
    // _transform must be assigned as a method, not called as a function.
    myProcessingStream._transform = function(obj, enc, next) {
      /*
       * Messages received from the source will have their data namespaced
       * in the `obj.payload` property. `payload` will contain a parsed
       * JSON object if clients specified contentType: application/json
       * when enqueuing the message. Messages that are enqueued with
       * contentType: application/json but are malformed will be
       * automatically rejected. Add a listener for the `parseError` event,
       * which is emitted by channel.source, to handle errors yourself.
       */
      this.push(obj);

      /*
       * Messages are successfully acked and removed from the queue by default.
       * RabbitMQStream provides methods to requeue and reject messages too.
       *
       * Requeue:
       * this.push(RabbitMQStream.RequeueMessage(obj));
       *
       * Reject:
       * this.push(RabbitMQStream.RejectMessage(obj));
       */
      next();
    };

    channel.source
      .pipe(myProcessingStream)
      .pipe(channel.sink);
  });

  process.on("SIGTERM", function() {
    streamifiedQueues.gracefulDisconnect(function(err) {
      // process.exit
    });
  });
});
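The ack, requeue, and reject choices described in the comments above can be sketched as a small routing transform. This is a minimal illustration, not the package's own code: `decideAction`, the `retry` flag, and the `wrappers` table are hypothetical names, and the wrappers are in-memory stand-ins so the sketch runs without a broker. With the real package, the requeue and reject entries would call `RabbitMQStream.RequeueMessage(obj)` and `RabbitMQStream.RejectMessage(obj)`.

```javascript
const { Transform } = require("stream");

// Hypothetical policy: choose an action from the message payload.
function decideAction(payload) {
  if (payload === null || typeof payload !== "object") {
    return "reject"; // not a parsed JSON object, so give up on it
  }
  if (payload.retry === true) {
    return "requeue"; // assumed flag meaning "try again later"
  }
  return "ack";
}

// Stand-ins for the library wrappers (assumption, for illustration only).
// With the real package these would be:
//   requeue: RabbitMQStream.RequeueMessage
//   reject:  RabbitMQStream.RejectMessage
const wrappers = {
  ack: function (obj) { return obj; },
  requeue: function (obj) { return { action: "requeue", message: obj }; },
  reject: function (obj) { return { action: "reject", message: obj }; }
};

// Build an object-mode Transform that wraps each message per the policy.
function createRoutingStream() {
  const routing = new Transform({ objectMode: true });
  routing._transform = function (obj, enc, next) {
    this.push(wrappers[decideAction(obj.payload)](obj));
    next();
  };
  return routing;
}
```

A stream built this way would sit between `channel.source` and `channel.sink` in place of `myProcessingStream`.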
There is also a helper method for integration tests:
var RabbitMQStream = require("rabbitmq-queue-stream");
var Transform = require("stream").Transform;

var myTransformStream = new Transform({objectMode: true});
myTransformStream._transform = function(item, enc, next) {
  console.log("Transforming item:", item);
  this.push(item);
  next();
};

var streamifiedQueues = RabbitMQStream.createWithTestMessages([
  "testMessage1",
  {testMessage: "2"},
  {testMessage: "3"}
]);

/*
 * streamifiedQueues.channels will contain one channel with a
 * streamable .source and .sink.
 */
var channel = streamifiedQueues.channels.shift();

channel.source
  .pipe(myTransformStream)
  .pipe(channel.sink);

// channel.sink emits 'requeued', 'rejected', and 'acknowledged' events
channel.sink.on("acknowledged", console.log.bind(null, "Acknowledged message!"));
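In a test it can help to count the sink events instead of logging them. The sketch below assumes only that the sink is an event emitter that fires the three events named above. `trackSinkEvents` is a hypothetical helper, and a plain `EventEmitter` stands in for `channel.sink` so the example runs without the package installed.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: tally the three sink events on any emitter.
function trackSinkEvents(sink) {
  const counts = { acknowledged: 0, requeued: 0, rejected: 0 };
  Object.keys(counts).forEach(function (name) {
    sink.on(name, function () {
      counts[name] += 1;
    });
  });
  return counts;
}

// Stand-in for channel.sink (assumption, for illustration only).
const fakeSink = new EventEmitter();
const counts = trackSinkEvents(fakeSink);

fakeSink.emit("acknowledged", { payload: "testMessage1" });
fakeSink.emit("rejected", { payload: "bad" });
console.log(counts);
```

With the test helper, passing `channel.sink` to `trackSinkEvents` before piping would let assertions check the tallies once the stream finishes.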
RabbitMQStream.init(2, options, function(err, streamifiedQueues) {
  streamifiedQueues.on('error', function(err) {
    console.error('socket disconnected!');
  });
});
myQueueStream.source.on("parseError", function(err, message) {
  console.error("Problem JSON parsing message", message);
});
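To make the `parseError` case concrete, the sketch below shows one way a body could be parsed according to its content type, with a malformed JSON body reported instead of thrown. It is an illustration of the behavior described earlier, not the package's internal code, and `parsePayload` and its result shape are hypothetical.

```javascript
// Hypothetical helper: parse a raw message body according to its content type.
function parsePayload(contentType, rawBody) {
  if (contentType !== "application/json") {
    // Non-JSON messages are passed through untouched.
    return { ok: true, payload: rawBody };
  }
  try {
    return { ok: true, payload: JSON.parse(rawBody) };
  } catch (err) {
    // A malformed JSON body is the situation a parseError listener would see.
    return { ok: false, error: err, payload: rawBody };
  }
}

const good = parsePayload("application/json", '{"job": 1}');
const bad = parsePayload("application/json", "{not json");
console.log(good.ok, bad.ok);
```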
var totalAcked = 0;
// 'acknowledged' is emitted by .sink, as noted in the test helper example
myQueueStream.sink.on("acknowledged", function(message) {
  console.log("Acknowledged:", message);
  totalAcked++;
});
myQueueStream.sink.on("formatError", function(err, message) {
  console.error("Malformatted message written to .sink. Please check your pipeline configuration", message);
});
FAQs
Reliable streaming interface to rabbitmq queues
The npm package rabbitmq-queue-stream receives a total of 22 weekly downloads. As such, rabbitmq-queue-stream popularity was classified as not popular.
We found that rabbitmq-queue-stream demonstrated an unhealthy version release cadence and project activity because the last version was released a year ago. It has 5 open source maintainers collaborating on the project.