Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
3
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.1.8 to 0.1.9

2

index.js

@@ -341,2 +341,3 @@ var amqp = require("amqp");

}
next();
};

@@ -361,3 +362,2 @@ this.source = queueStream.pipe(prepareMessage);

}
/* TODO: How do we handle errors from acking? */
var evt;

@@ -364,0 +364,0 @@ if(message._meta.requeue) {

{
"name": "rabbitmq-queue-stream",
"version": "0.1.8",
"version": "0.1.9",
"description": "Reliable streaming interface to rabbitmq queues",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -750,2 +750,56 @@ var EventEmitter = require("events").EventEmitter;

});
});
describe("integration test", function() {
/* Integration test by injecting fake amqp messages into _handleMessage. Cause
The acks to queue the next message into the system.
*/
var streamInstance;
var message1 = {_id: "1"};
var message2 = {_id: "2"};
var message3 = {_id: "3"};
var payloads = [message1, message2, message3];
//ack stub defers control to something that injects a new message until
var ackStub = function(instance) {
return {
acknowledge: function() {
if(payloads.length) {
setTimeout(function() {
injectNewMessage(instance);
}, 10);
}
}
};
};
var injectNewMessage = function(instance) {
instance._handleIncomingMessage(payloads.shift(), {}, {contentType: "application/json"}, ackStub(instance));
};
before(function(done) {
var connection = new EventEmitter();
streamInstance = new rabbitmq.AMQPStream(connection);
//setup .source and .sink properties
streamInstance._streamifyQueue(done);
});
it("pipes all outstanding messages received by rabbit downstream when properly acked", function(done) {
var receivedMessages = [];
var collector = new stream.Transform({objectMode: true});
collector._transform = function(obj, enc, next) {
receivedMessages.push(obj);
if(receivedMessages.length === 3) {
done();
}
this.push(obj);
next();
};
streamInstance.source.pipe(collector).pipe(streamInstance.sink);
injectNewMessage(streamInstance);
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc