Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
3
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.1.6 to 0.1.8

95

index.js

@@ -26,3 +26,4 @@ var amqp = require("amqp");

exports.RequeueMessage = function(message) {
var RequeueMessage = function(message) {
if(!message._meta) {

@@ -35,3 +36,3 @@ return console.error();

exports.RejectMessage = function(message) {
var RejectMessage = function(message) {
if(!message._meta) {

@@ -41,5 +42,9 @@ return console.error();

message._meta.reject = true;
return message;
};
exports.RequeueMessage = RequeueMessage;
exports.RejectMessage = RejectMessage;
/*

@@ -270,18 +275,6 @@

/* TODO: Figure out how to error handle subscription. Maybe a 'once' error handler. */
queue.subscribe(_.merge({ack: true, prefetchCount: 1}, this.__options.subscribe), function(message, headers, deliveryInfo, ack) {
var serializableMessage = {
body: message.data,
headers: headers,
deliveryInfo: deliveryInfo
};
/*
* ack is not serializable, so we need to push it
* onto the outstandingAck array right now and attach
* an ackIndex number to the message
*/
serializableMessage.ackIndex = me._insertAckIntoArray(ack);
streamDebug("Received message. Inserted ack into index " + serializableMessage.ackIndex);
me.__pendingQueue.push(serializableMessage);
}).addCallback(function(ok) {
queue.subscribe(
_.merge({ack: true, prefetchCount: 1}, this.__options.subscribe),
this._handleIncomingMessage.bind(this)
).addCallback(function(ok) {
streamDebug("Subscribed with consumer tag: " + ok.consumerTag);

@@ -294,2 +287,32 @@ me.__consumerTag = ok.consumerTag;

AMQPStream.prototype._handleIncomingMessage = function(message, headers, deliveryInfo, ack) {
var isJSON = deliveryInfo.contentType === "application/json";
var serializableMessage = {
data: isJSON ? message : message.data,
headers: headers,
deliveryInfo: deliveryInfo,
_meta: {
/*
* ack is not serializable, so we need to push it
* onto the outstandingAck array attach
* an ackIndex number to the message
*/
ackIndex: this._insertAckIntoArray(ack)
}
};
if(isJSON && deliveryInfo.parseError) {
if(this.source.listeners('parseError').length) {
return this.source.emit("parseError", deliveryInfo.parseError, deliveryInfo.rawData);
} else {
streamDebug("Automatically rejecting malformed message. " +
"Add listener to 'parseError' for custom behavior");
return this.sink.write(RejectMessage(serializableMessage));
}
}
streamDebug("Received message. Inserted ack into index " + serializableMessage._meta.ackIndex);
this.__pendingQueue.push(serializableMessage);
};
AMQPStream.prototype._streamifyQueue = function(cb) {

@@ -299,5 +322,9 @@ var queueStream, prepareMessage;

var queue = this.__queue;
var sink;
streamInitDebug("Creating queue source");
/* Create the .source ReadableStream */
queueStream = new Readable({objectMode: true});

@@ -311,36 +338,16 @@ queueStream._read = function() {

/*
TODO: add json mode
Transform gets a messages:
{
body: String. In our case JSON parsable.
headers: ,
deliveryInfo: ,
ackIndex: Number
}
This only pushes down the parsed body. It attaches the worker
id into meta
*/
streamInitDebug("Creating readable queue stream");
prepareMessage = new Transform({objectMode: true});
prepareMessage._transform = function(message, enc, next) {
var parsedBody;
systemDebug("_transform prepareMessage");
try {
parsedBody = JSON.parse(message.body.toString());
} catch(e) {
return this.emit("parseError", e, message);
if(_.isPlainObject(message.data)) {
this.push(_.merge(message.data, {_meta: message._meta}));
} else {
this.push(_.pick(message, "data", "_meta"));
}
parsedBody._meta = parsedBody._meta || {};
parsedBody._meta.ackIndex = message.ackIndex;
this.push(parsedBody);
next();
};
this.source = queueStream.pipe(prepareMessage);
//add sink to instance
var sink;
/* Create the .sink WritableStream */
streamInitDebug("Creating queue sink");

@@ -347,0 +354,0 @@ sink = new Writable({objectMode: true});

{
"name": "rabbitmq-queue-stream",
"version": "0.1.6",
"version": "0.1.8",
"description": "Reliable streaming interface to rabbitmq queues",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -282,3 +282,14 @@ var EventEmitter = require("events").EventEmitter;

var instance, connection, amqpResponseStub;
beforeEach(function() {
connection = new EventEmitter();
connection.queue = sinon.stub();
instance = new rabbitmq.AMQPStream(connection);
amqpResponseStub = sinon.stub({
acknowledge: function() {},
reject: function() {}
});
});
describe("AMQPStream.create", function() {

@@ -310,3 +321,2 @@

var options = {};
var connection = {};
var callback = sinon.stub();

@@ -324,6 +334,5 @@ rabbitmq.AMQPStream.create(connection, options, callback);

var instance, connection, options, queue, connectToQueueStub, subscribeToQueueStub, streamifyQueueStub;
var instance, options, queue, connectToQueueStub, subscribeToQueueStub, streamifyQueueStub;
beforeEach(function() {
connection = {};
options = {

@@ -380,9 +389,7 @@ name: "myStream",

var instance, connection, cb;
var cb;
beforeEach(function() {
cb = sinon.stub();
connection = new EventEmitter();
connection.queue = sinon.stub();
instance = new rabbitmq.AMQPStream(connection);
// instance = new rabbitmq.AMQPStream(connection);
});

@@ -417,7 +424,57 @@

it("passes the `connection` option to the underlying driver for queue initialization", function() {
instance = new rabbitmq.AMQPStream(connection, {connection: {passive: false}});
instance.__connection = new EventEmitter();
instance.__connection.queue = sinon.stub();
instance._connectToQueue("myQueue", function() {});
expect(instance.__connection.queue.args[0][1]).to.eql({passive: false});
});
});
describe("#_handleIncomingMessage", function() {
beforeEach(function() {
//setup .source and .sink
instance._streamifyQueue(function() {});
});
describe("when contentType === 'application/json'", function() {
var message = {};
var headers = {};
var deliveryInfo = {
parseError: new Error(),
rawData: '{"malformed":',
contentType: 'application/json',
headers: {},
deliveryMode: 1,
queue: 'some-queue',
deliveryTag: new Buffer("delivery-tag"),
redelivered: false,
exchange: '',
routingKey: 'some-queue',
consumerTag: 'consumer-tag'
};
it("automatically rejects any malformed message when no event listeners exist on 'parseError'", function (done) {
instance.sink.on("rejected", function(msg) {
done();
});
instance._handleIncomingMessage(null, {}, deliveryInfo, amqpResponseStub);
});
it("emits a `parseError` event on invalid JSON when there are event listeners", function (done) {
instance.source.on("parseError", function(msg) {
done();
});
instance._handleIncomingMessage(null, {}, deliveryInfo, amqpResponseStub);
});
});
});
describe("#_streamifyQueue", function() {
var instance, cb, writable, readable;
var cb, writable, readable;

@@ -432,18 +489,5 @@ beforeEach(function () {

describe("stream.source", function () {
it("emits a `parseError` event on invalid JSON in message.body", function (done) {
var badMessage = {};
instance._waitForMessage = sinon.stub();
instance._waitForMessage.onCall(0).yields(badMessage);
instance._streamifyQueue(cb);
instance.source.on("parseError", function (err, msg) {
expect(err).to.be.an(Error);
expect(msg).to.be(badMessage);
done();
});
instance.source.pipe(writable);
});
it("parses message, adds ackIndex, pushes downstream", function (done) {
instance._waitForMessage = sinon.stub();
instance._waitForMessage.onCall(0).yields({body: '{"something": "somethingElse"}', ackIndex: 10});
instance._waitForMessage.onCall(0).yields({data: {"something": "somethingElse"}, _meta: { ackIndex: 10 }});

@@ -458,13 +502,16 @@ writable._write = function (message) {

});
it("passes the `subscribe` option properly to the underlying driver", function () {
instance = new rabbitmq.AMQPStream(connection, {subscribe: {prefetchCount: 100}});
instance.__queue = {subscribe: sinon.stub()}; //.onFirstCall().returns({addCallback: function() {}});
instance.__queue.subscribe.onFirstCall().returns({addCallback: function() {}});
instance._subscribeToQueue();
expect(instance.__queue.subscribe.args[0][0]).to.eql({ack: true, prefetchCount: 100});
});
});
describe("stream.sink", function () {
var amqpResponseStub;
var goodMessage;
beforeEach(function() {
amqpResponseStub = sinon.stub({
acknowledge: function() {},
reject: function() {}
});
goodMessage = {_meta: {ackIndex: 1}};

@@ -471,0 +518,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc