Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
4
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.3.0 to 0.4.0

43

index.js

@@ -259,2 +259,3 @@ var _ = require("lodash");

function AMQPStream(connection, options, workerNum) {
this.id = workerNum;
this.__connection = connection;

@@ -480,4 +481,46 @@ this.__options = options || {};

/*
* Test helper that responds with with a mock `source`
* and `sink` properties. Only returns one channel.
*/
exports.createWithTestMessages = function(testMessages) {
var stubSource = new Readable({objectMode: true});
stubSource._read = function() {
var itemWrapper;
var nextItem = testMessages.shift();
if(nextItem) {
itemWrapper = {
headers: {},
deliveryInfo: {},
payload: nextItem,
_meta: {}
};
}
this.push(itemWrapper);
};
var stubSink = new Writable({objectMode: true});
stubSink._write = function(item, enc, next) {
var evt;
if(item._meta.requeue) {
evt = "requeued";
} else if(item._meta.reject) {
evt = "rejected";
} else {
evt = "acknowledged";
}
this.emit(evt, item);
next();
};
return {
channels: [{
source: stubSource,
sink: stubSink
}]
};
};
/* export for testing */
exports.AMQPStreams = AMQPStreams;
exports.AMQPStream = AMQPStream;

2

package.json
{
"name": "rabbitmq-queue-stream",
"version": "0.3.0",
"version": "0.4.0",
"description": "Reliable streaming interface to rabbitmq queues",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -114,3 +114,31 @@ # rabbitmq-queue-stream

```
There also a helper method that helps with integration test
```javascript
var RabbitMQStream = require("rabbitmq-queue-stream");
var Transform = require("stream").Transform;
var myTransformStream = new Transform({objectMode: true});
myTransformStream._transform = function(item, enc, next) {
console.log("Transforming item:", item);
this.push(item);
next();
};
var streamifiedQueues = RabbitMQStream.createWithTestMessages([
"testMessage1",
{testMessage: "2"},
{testMessage: "3"}
]);
/*
* streamifiedQueues.channels will contain one channel with a
* streamable .source and .sink.
*/
var channel = streamifiedQueues.channels.shift();
channel.source
.pipe(myTransformStream)
.pipe(channel.sink);
//channel .sink emits 'requeued', 'rejected', and 'acknowledged' events
channel.sink.on("acknowledged", console.log.bind(null, "Acknowledged message!"));
```
### Emitted Events

@@ -117,0 +145,0 @@

@@ -931,2 +931,61 @@ var EventEmitter = require("events").EventEmitter;

describe("#createWithTestMessages", function() {
var testMessages, collector, collectedMessages, channel;
beforeEach(function() {
testMessages = [
"testMessage1",
{testMessage: "2"},
{testMessage: "3"}
];
collectedMessages = [];
collector = new stream.Transform({objectMode: true});
collector._transform = function(item, enc, next) {
collectedMessages.push(item);
this.push();
next();
};
var streamifiedQueues = rabbitmq.createWithTestMessages(_.clone(testMessages, true));
channel = streamifiedQueues.channels.shift();
});
it("returns a stub that gives you the messages that you set in the `payload` field", function(done) {
channel.source
.pipe(collector)
.pipe(channel.sink);
collector.on("end", function() {
var receivedMessages = collectedMessages.map(function(m) { return m.payload; });
expect(receivedMessages).to.eql(testMessages);
done();
});
});
it(".sink emits 'requeued', 'rejected', and 'acknowledged' events", function(done) {
var events = ["requeued", "rejected", "acknowledged"];
var emittedEvents = [];
var handler = new stream.Transform({objectMode: true});
var queueActions = ["RequeueMessage", "RejectMessage"];
handler._transform = function(item, enc, cb) {
if(queueActions.length) {
this.push(rabbitmq[queueActions.shift()](item));
} else {
this.push(item);
}
cb();
};
events.forEach(function(evt) {
channel.sink.on(evt, emittedEvents.push.bind(emittedEvents));
});
channel.source
.pipe(handler)
.pipe(channel.sink);
handler.on("end", function() {
expect(emittedEvents.length).to.be(3);
done();
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc