rabbitmq-queue-stream
Advanced tools
Comparing version 0.1.4 to 0.1.5
32
index.js
@@ -47,7 +47,11 @@ var amqp = require("amqp"); | ||
options.connection.* - Anything accepted by amqp.createConnection | ||
options.url - amqp url | ||
options.queueStream.* - Anything accepted by connection.queue() | ||
options.connection.* - Anything accepted by amqp.createConnection (See: https://github.com/postwait/node-amqp#connection-options-and-url) | ||
options.url - amqp url | ||
options.queue.connection.* - Anything accepted by connection.queue() (See: https://github.com/postwait/node-amqp#connectionqueuename-options-opencallback) | ||
DEFAULT: { passive: true } | ||
options.queue.subscribe.* - Anything accepted by queue.subscribe() (See: https://github.com/postwait/node-amqp#queuesubscribeoptions-listener) | ||
DEFAULT: { ack: true, prefetchCount: 1 } | ||
*/ | ||
@@ -72,10 +76,10 @@ function AMQPStreams(numStreams, options) { | ||
var createWorker = function(n, cb) { | ||
AMQPStream.create(connection, me.__options.queueStream, cb); | ||
AMQPStream.create(connection, me.__options.queue, cb); | ||
}; | ||
async.timesSeries(me.__numStreams, createWorker, function(err, insightStreams) { | ||
async.timesSeries(me.__numStreams, createWorker, function(err, channels) { | ||
if(err) { | ||
return cb(err); | ||
} | ||
me.channels = insightStreams; | ||
me.channels = channels; | ||
cb(null, me); | ||
@@ -146,3 +150,3 @@ }); | ||
* corresponding to this queue stream. See AMQP#disconnect | ||
* to close the actual AMQP connection. You should safely | ||
* to close the actual AMQP connection. You should safely | ||
* unsubsribe all queues before disconnecting from the | ||
@@ -189,3 +193,3 @@ * AMQP server. | ||
/* | ||
* Resubscribe queue consumers. Should be called after | ||
* Resubscribe queue consumers. | ||
*/ | ||
@@ -219,4 +223,4 @@ AMQPStreams.prototype.resubscribeConsumers = function(cb) { | ||
streamInitDebug("Creating stream " + this._totalWorkers); | ||
var insightStream = new this(connection, options); | ||
insightStream.initialize(cb); | ||
var stream = new this(connection, options); | ||
stream.initialize(cb); | ||
}; | ||
@@ -257,3 +261,3 @@ | ||
}); | ||
this.__connection.queue(queueName, {passive: true}, function(queue) { | ||
this.__connection.queue(queueName, _.merge({passive: true}, this.__options.connection), function(queue) { | ||
streamInitDebug("Connected to queue " + queueName); | ||
@@ -268,8 +272,4 @@ me.__connection.removeAllListeners("error"); | ||
var queue = this.__queue; | ||
var subscribeOptions = {ack: true, prefetchCount: 1}; | ||
if(this.__options.prefetchCount) { | ||
subscribeOptions.prefetchCount = this.__options.prefetchCount; | ||
} | ||
/* TODO: Figure out how to error handle subscription. Maybe a 'once' error handler. */ | ||
queue.subscribe(subscribeOptions, function(message, headers, deliveryInfo, ack) { | ||
queue.subscribe(_.merge({ack: true, prefetchCount: 1}, this.__options.subscribe), function(message, headers, deliveryInfo, ack) { | ||
var serializableMessage = { | ||
@@ -276,0 +276,0 @@ body: message.data, |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.1.4", | ||
"version": "0.1.5", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "make test" | ||
"test": "make test-codecov.io" | ||
}, | ||
@@ -9,0 +9,0 @@ "repository": { |
@@ -25,5 +25,10 @@ # rabbitmq-queue-stream | ||
}, | ||
queueStream: { | ||
queue: { | ||
name: "myQueue", | ||
prefetchCount: 100 | ||
subscribe: { | ||
/* Any option accepted by https://github.com/postwait/node-amqp#queuesubscribeoptions-listener */ | ||
}, | ||
connection: { | ||
/* Any option accepted by https://github.com/postwait/node-amqp#connectionqueuename-options-opencallback */ | ||
} | ||
} | ||
@@ -30,0 +35,0 @@ }; |
10
test.js
@@ -49,3 +49,3 @@ var EventEmitter = require("events").EventEmitter; | ||
var connection = {}; | ||
var queueStream = {}; | ||
var queue = {}; | ||
streamCreateStub.yields(null); | ||
@@ -55,3 +55,3 @@ createConnectionStub.yields(null, connection); | ||
var streams = new rabbitmq.AMQPStreams(6, { | ||
queueStream: queueStream | ||
queue: queue | ||
}); | ||
@@ -65,3 +65,3 @@ | ||
expect(argSet[0]).to.be(connection); | ||
expect(argSet[1]).to.be(queueStream); | ||
expect(argSet[1]).to.be(queue); | ||
}); | ||
@@ -75,3 +75,3 @@ done(err); | ||
var connection = {}; | ||
var queueStream = {}; | ||
var queue = {}; | ||
var worker = {}; | ||
@@ -83,3 +83,3 @@ | ||
var streams = new rabbitmq.AMQPStreams(6, { | ||
queueStream: queueStream | ||
queue: queue | ||
}); | ||
@@ -86,0 +86,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
41205
133
952