Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
3
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.1.4 to 0.1.5

32

index.js

@@ -47,7 +47,11 @@ var amqp = require("amqp");

options.connection.* - Anything accepted by amqp.createConnection
options.url - amqp url
options.queueStream.* - Anything accepted by connection.queue()
options.connection.* - Anything accepted by amqp.createConnection (See: https://github.com/postwait/node-amqp#connection-options-and-url)
options.url - amqp url
options.queue.connection.* - Anything accepted by connection.queue() (See: https://github.com/postwait/node-amqp#connectionqueuename-options-opencallback)
DEFAULT: { passive: true }
options.queue.subscribe.* - Anything accepted by queue.subscribe() (See: https://github.com/postwait/node-amqp#queuesubscribeoptions-listener)
DEFAULT: { ack: true, prefetchCount: 1 }
*/

@@ -72,10 +76,10 @@ function AMQPStreams(numStreams, options) {

var createWorker = function(n, cb) {
AMQPStream.create(connection, me.__options.queueStream, cb);
AMQPStream.create(connection, me.__options.queue, cb);
};
async.timesSeries(me.__numStreams, createWorker, function(err, insightStreams) {
async.timesSeries(me.__numStreams, createWorker, function(err, channels) {
if(err) {
return cb(err);
}
me.channels = insightStreams;
me.channels = channels;
cb(null, me);

@@ -146,3 +150,3 @@ });

* corresponding to this queue stream. See AMQP#disconnect
* to close the actual AMQP connection. You should safely
* to close the actual AMQP connection. You should safely
* unsubsribe all queues before disconnecting from the

@@ -189,3 +193,3 @@ * AMQP server.

/*
* Resubscribe queue consumers. Should be called after
* Resubscribe queue consumers.
*/

@@ -219,4 +223,4 @@ AMQPStreams.prototype.resubscribeConsumers = function(cb) {

streamInitDebug("Creating stream " + this._totalWorkers);
var insightStream = new this(connection, options);
insightStream.initialize(cb);
var stream = new this(connection, options);
stream.initialize(cb);
};

@@ -257,3 +261,3 @@

});
this.__connection.queue(queueName, {passive: true}, function(queue) {
this.__connection.queue(queueName, _.merge({passive: true}, this.__options.connection), function(queue) {
streamInitDebug("Connected to queue " + queueName);

@@ -268,8 +272,4 @@ me.__connection.removeAllListeners("error");

var queue = this.__queue;
var subscribeOptions = {ack: true, prefetchCount: 1};
if(this.__options.prefetchCount) {
subscribeOptions.prefetchCount = this.__options.prefetchCount;
}
/* TODO: Figure out how to error handle subscription. Maybe a 'once' error handler. */
queue.subscribe(subscribeOptions, function(message, headers, deliveryInfo, ack) {
queue.subscribe(_.merge({ack: true, prefetchCount: 1}, this.__options.subscribe), function(message, headers, deliveryInfo, ack) {
var serializableMessage = {

@@ -276,0 +276,0 @@ body: message.data,

{
"name": "rabbitmq-queue-stream",
"version": "0.1.4",
"version": "0.1.5",
"description": "Reliable streaming interface to rabbitmq queues",
"main": "index.js",
"scripts": {
"test": "make test"
"test": "make test-codecov.io"
},

@@ -9,0 +9,0 @@ "repository": {

@@ -25,5 +25,10 @@ # rabbitmq-queue-stream

},
queueStream: {
queue: {
name: "myQueue",
prefetchCount: 100
subscribe: {
/* Any option accepted by https://github.com/postwait/node-amqp#queuesubscribeoptions-listener */
},
connection: {
/* Any option accepted by https://github.com/postwait/node-amqp#connectionqueuename-options-opencallback */
}
}

@@ -30,0 +35,0 @@ };

@@ -49,3 +49,3 @@ var EventEmitter = require("events").EventEmitter;

var connection = {};
var queueStream = {};
var queue = {};
streamCreateStub.yields(null);

@@ -55,3 +55,3 @@ createConnectionStub.yields(null, connection);

var streams = new rabbitmq.AMQPStreams(6, {
queueStream: queueStream
queue: queue
});

@@ -65,3 +65,3 @@

expect(argSet[0]).to.be(connection);
expect(argSet[1]).to.be(queueStream);
expect(argSet[1]).to.be(queue);
});

@@ -75,3 +75,3 @@ done(err);

var connection = {};
var queueStream = {};
var queue = {};
var worker = {};

@@ -83,3 +83,3 @@

var streams = new rabbitmq.AMQPStreams(6, {
queueStream: queueStream
queue: queue
});

@@ -86,0 +86,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc