Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
4
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.2.1 to 0.3.0

72

index.js

@@ -0,17 +1,13 @@

var _ = require("lodash");
var amqp = require("amqp");
var async = require("async");
var _ = require("lodash");
var streamsDebug = require("debug")("amqp-streams");
var streamInitDebug = require("debug")("amqp-stream-init");
var streamDebug = require("debug")("amqp-stream"); //stream runtime debug
var systemDebug = require("debug")("system");
var debug = require("debug");
var Readable = require("stream").Readable;
var Writable = require("stream").Writable;
var Transform = require("stream").Transform;
var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('events').EventEmitter;
var Transform = require("stream").Transform;
var Readable = require("stream").Readable;
var Writable = require("stream").Writable;
var streamsDebug = debug("rabbitmq-queue-stream:streams");
exports.init = function(numStreams, options, cb) {

@@ -27,3 +23,2 @@ if(!cb) {

var RequeueMessage = function(message) {

@@ -74,3 +69,2 @@ if(!message._meta) {

AMQPStreams.prototype.initialize = function(cb) {

@@ -232,3 +226,3 @@ streamsDebug("Initializing " + this.__numStreams + " streams");

if(_.contains(err.message, "ECONNRESET")) {
streamDebug("Ignoring ECONNRESET error");
streamsDebug("Ignoring ECONNRESET error");
return;

@@ -267,3 +261,3 @@ }

*/
function AMQPStream(connection, options) {
function AMQPStream(connection, options, workerNum) {
this.__connection = connection;

@@ -273,9 +267,9 @@ this.__options = options || {};

this.__pendingQueue = [];
this.__debug = debug("rabbitmq-queue-stream:worker:" + (workerNum || "-"));
}
AMQPStream.create = function(connection, options, cb) {
this._totalWorkers = this._totalWorkers || 0;
this._totalWorkers++;
streamInitDebug("Creating stream " + this._totalWorkers);
var stream = new this(connection, options);
this.__totalWorkers = this.__totalWorkers || 0;
this.__totalWorkers++;
var stream = new this(connection, options, this.__totalWorkers);
stream.initialize(cb);

@@ -286,3 +280,3 @@ };

var me = this;
streamInitDebug("Initializing");
this.__debug("Initializing");
if(!this.__options.name) {

@@ -303,3 +297,3 @@ throw new Error("You must provide a `name` to queueStream options object");

if(err) {
streamInitDebug("Error subscribe to queue " + this.__options.name + ". " + err.message);
me.__debug("Error subscribe to queue " + this.__options.name + ". " + err.message);
return cb(err);

@@ -316,3 +310,3 @@ }

function onError(err) {
streamInitDebug("Error connecting to queue " + queueName + ": " + err.message);
me.__debug("Error connecting to queue " + queueName + ": " + err.message);
return cb(err);

@@ -322,3 +316,3 @@ }

this.__connection.queue(queueName, _.merge({passive: true}, this.__options.connection), function(queue) {
streamInitDebug("Connected to queue " + queueName);
me.__debug("Connected to queue " + queueName);
me.__connection.removeListener("error", onError);

@@ -337,3 +331,3 @@ return cb(null, queue);

).addCallback(function(ok) {
streamDebug("Subscribed with consumer tag: " + ok.consumerTag);
me.__debug("Subscribed with consumer tag: " + ok.consumerTag);
me.__consumerTag = ok.consumerTag;

@@ -366,3 +360,3 @@ me.subscribed = true;

} else {
streamDebug("Automatically rejecting malformed message. " +
this.__debug("Automatically rejecting malformed message. " +
"Add listener to 'parseError' for custom behavior");

@@ -372,3 +366,3 @@ return this.sink.write(RejectMessage(serializableMessage));

}
streamDebug("Received message. Inserted ack into index " + serializableMessage._meta.ackIndex);
this.__debug("Received message. Inserted ack into index " + serializableMessage._meta.ackIndex);
this.__pendingQueue.push(serializableMessage);

@@ -383,10 +377,7 @@ };

streamInitDebug("Creating queue source");
/* Create the .source ReadableStream */
queueStream = new Readable({objectMode: true});
queueStream._read = function() {
systemDebug("_read queueStream");
me.__debug("_read .source");
me._waitForMessage(function(message) {

@@ -397,6 +388,4 @@ this.push(message);

streamInitDebug("Creating readable queue stream");
prepareMessage = new Transform({objectMode: true});
prepareMessage._transform = function(message, enc, next) {
systemDebug("_transform prepareMessage");
this.push(_.pick(message, "payload", "_meta"));

@@ -408,8 +397,7 @@ next();

/* Create the .sink WritableStream */
streamInitDebug("Creating queue sink");
sink = new Writable({objectMode: true});
sink._write = function(message, enc, next) {
systemDebug("_write sink");
me.__debug("_write .sink");
if(!message._meta || !_.isNumber(message._meta.ackIndex)) {
streamDebug("Could not find ackIndex in message " + message);
me.__debug("Could not find ackIndex in message " + message);
return this.emit("formatError", new Error("No ack index for message"), message);

@@ -420,3 +408,3 @@ }

//something went wrong and we can't ack message
streamDebug("Could not find ack function for " + message);
me.__debug("Could not find ack function for " + message);
return this.emit("ackError", new Error("Cannot find ack for message."), message);

@@ -448,7 +436,7 @@ }

if(_.isEmpty(this.__pendingQueue)) {
streamDebug("Waiting for message");
this.__debug("Waiting for message");
i = setInterval(function() {
if(!_.isEmpty(me.__pendingQueue)) {
clearInterval(i);
streamDebug("Received messages. Continuing...");
me.__debug("Received messages. Continuing...");
return cb(me.__pendingQueue.shift());

@@ -458,3 +446,3 @@ }

} else {
streamDebug("Dequeueing pending message");
this.__debug("Dequeueing pending message");
cb(this.__pendingQueue.shift());

@@ -481,3 +469,3 @@ }

var me = this;
streamDebug("Unsubscribing with consumerTag " + this.__consumerTag);
this.__debug("Unsubscribing with consumerTag " + this.__consumerTag);
if(this.subscribed) {

@@ -489,3 +477,3 @@ this.__queue.unsubscribe(this.__consumerTag).addCallback(function() {

} else {
streamDebug("Worker already unsubscribed");
this.__debug("Worker already unsubscribed");
cb();

@@ -497,3 +485,3 @@ }

var me = this;
streamDebug("Unsubscribing with consumerTag " + this.__consumerTag);
this.__debug("Unsubscribing with consumerTag " + this.__consumerTag);
this.__queue.close(this.__consumer);

@@ -500,0 +488,0 @@ var closeHandler = function() {

{
"name": "rabbitmq-queue-stream",
"version": "0.2.1",
"version": "0.3.0",
"description": "Reliable streaming interface to rabbitmq queues",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -438,3 +438,3 @@ var EventEmitter = require("events").EventEmitter;

rabbitmq.AMQPStream.create();
expect(rabbitmq.AMQPStream._totalWorkers).to.equal(3);
expect(rabbitmq.AMQPStream.__totalWorkers).to.equal(3);
});

@@ -464,3 +464,3 @@

queue = new EventEmitter();
instance = new rabbitmq.AMQPStream(connection, options);
instance = new rabbitmq.AMQPStream(connection, options, 1);
connectToQueueStub = sinon.stub(rabbitmq.AMQPStream.prototype, "_connectToQueue");

@@ -467,0 +467,0 @@ subscribeToQueueStub = sinon.stub(rabbitmq.AMQPStream.prototype, "_subscribeToQueue");

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc