rabbitmq-queue-stream
Advanced tools
Comparing version 0.1.1 to 0.1.2
10
index.js
@@ -39,3 +39,3 @@ var amqp = require("amqp"); | ||
this.__options = options; | ||
this.streams = []; | ||
this.channels = []; | ||
} | ||
@@ -62,3 +62,3 @@ | ||
} | ||
me.streams = insightStreams; | ||
me.channels = insightStreams; | ||
cb(null, me); | ||
@@ -120,3 +120,3 @@ }); | ||
//close every worker stream | ||
async.eachSeries(this.streams, function(stream, next) { | ||
async.eachSeries(this.channels, function(stream, next) { | ||
stream.unsubscribe(next); | ||
@@ -136,3 +136,3 @@ }, cb); | ||
AMQPStreams.prototype.closeConsumers = function(cb) { | ||
async.eachSeries(this.streams, function(stream, next) { | ||
async.eachSeries(this.channels, function(stream, next) { | ||
stream.close(next); | ||
@@ -176,3 +176,3 @@ }, cb); | ||
AMQPStreams.prototype.resubscribeConsumers = function(cb) { | ||
async.eachSeries(this.streams, function(stream, next) { | ||
async.eachSeries(this.channels, function(stream, next) { | ||
if(!stream.subscribed) { | ||
@@ -179,0 +179,0 @@ stream._subscribeToQueue(next); |
{ | ||
"name": "rabbitmq-queue-stream", | ||
"version": "0.1.1", | ||
"version": "0.1.2", | ||
"description": "Reliable streaming interface to rabbitmq queues", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -15,3 +15,3 @@ # rabbitmq-queue-stream | ||
var RabbitMQStream = require("rabbitmq-queue-stream"); | ||
var Transform = require("stream").Transform; | ||
var stream = require("stream"); | ||
@@ -36,4 +36,2 @@ var options = { | ||
var queueStreams = streamifiedQueues.sources; | ||
/* | ||
@@ -50,6 +48,6 @@ * Each consumer channel comes with a .source and .sink property. | ||
streamifiedQueues.sources.forEach(function(myQueueStream) { | ||
streamifiedQueues.channels.forEach(function(channel) { | ||
var doSomethingWithData = new Transform({objectMode: true}); | ||
doSomethingWithData._transform(function(data, enc, next) { | ||
var myProcessingStream = new stream.Transform({objectMode: true}); | ||
myProcessingStream._transform(function(data, enc, next) { | ||
console.log("Doing something with", data); | ||
@@ -60,5 +58,5 @@ this.push(data); | ||
myQueueStream.source | ||
.pipe(doSomethingWithData) | ||
.pipe(myQueueStream.sink); | ||
channel.source | ||
.pipe(myProcessingStream) | ||
.pipe(channel.sink); | ||
}); | ||
@@ -73,3 +71,3 @@ | ||
} | ||
//Wait some time for queues to flush out. Then close Consumers | ||
//Wait some time for queues to flush out before closing consumers. | ||
streamifiedQueues.closeConsumers(function(err) { | ||
@@ -91,3 +89,3 @@ if(err) { | ||
### Events | ||
### Emitted Events | ||
@@ -101,2 +99,3 @@ #### .source | ||
``` | ||
#### .sink | ||
@@ -110,4 +109,4 @@ * deleted - Emitted everytime a job is deleted from the queue | ||
``` | ||
* formatError - Sink received a job that does not have the necessary information to be deleted from the queue. | ||
Most likely emitted when objects not originating from .source are written to sink. | ||
* formatError - Sink received a job that does not have the necessary information to be deleted from the queue. | ||
Most likely emitted when objects not originating from .source are written to sink. | ||
```javascript | ||
@@ -114,0 +113,0 @@ myQueueStream.sink.on("formatError", function(err, message) { |
32
test.js
@@ -70,3 +70,3 @@ var EventEmitter = require("events").EventEmitter; | ||
it("sets me.streams if successful", function (done) { | ||
it("sets channels if successful", function (done) { | ||
var connection = {}; | ||
@@ -85,5 +85,5 @@ var queueStream = {}; | ||
expect(result).to.be(streams); | ||
expect(streams.streams).to.have.length(6); | ||
streams.streams.forEach( function(item) { | ||
expect(item).to.be(worker); | ||
expect(streams.channels).to.have.length(6); | ||
streams.channels.forEach( function(channel) { | ||
expect(channel).to.be(worker); | ||
}); | ||
@@ -174,11 +174,11 @@ done(err); | ||
}); | ||
amqp.streams = streams; | ||
amqp.channels = streams; | ||
}); | ||
describe("#unsubscribeConsumers", function() { | ||
it("calls #unsubscribe for every stream in AMQPStreams.streams", function(done) { | ||
it("calls #unsubscribe for every stream in AMQPStreams.channels", function(done) { | ||
amqp.unsubscribeConsumers(function(err) { | ||
expect(err).to.not.be.ok(); | ||
amqp.streams.forEach(function(stream) { | ||
expect(stream.unsubscribe.callCount).to.be(1); | ||
amqp.channels.forEach(function(channel) { | ||
expect(channel.unsubscribe.callCount).to.be(1); | ||
}); | ||
@@ -192,7 +192,7 @@ done(); | ||
describe("#closeConsumers", function() { | ||
it("calls #close on every stream in AMQPStreams.streams", function(done) { | ||
it("calls #close on every stream in AMQPStreams.channels", function(done) { | ||
amqp.closeConsumers(function(err) { | ||
expect(err).to.not.be.ok(); | ||
amqp.streams.forEach(function(stream) { | ||
expect(stream.close.callCount).to.be(1); | ||
amqp.channels.forEach(function(channel) { | ||
expect(channel.close.callCount).to.be(1); | ||
}); | ||
@@ -243,3 +243,3 @@ done(); | ||
beforeEach(function() { | ||
amqp.streams.forEach(function(stream) { | ||
amqp.channels.forEach(function(stream) { | ||
stream._subscribeToQueue = sinon.stub().yields(null); | ||
@@ -250,3 +250,3 @@ }); | ||
afterEach(function() { | ||
amqp.streams.forEach(function(stream) { | ||
amqp.channels.forEach(function(stream) { | ||
stream._subscribeToQueue.reset(); | ||
@@ -259,3 +259,3 @@ }); | ||
expect(err).to.not.be.ok(); | ||
amqp.streams.forEach(function(stream) { | ||
amqp.channels.forEach(function(stream) { | ||
expect(stream._subscribeToQueue.callCount).to.be(1); | ||
@@ -269,3 +269,3 @@ }); | ||
//since we haven't initialized the streams in the test, let's manually say we've subscribed here | ||
amqp.streams.forEach(function(stream) { | ||
amqp.channels.forEach(function(stream) { | ||
stream.subscribed = true; | ||
@@ -275,3 +275,3 @@ }); | ||
expect(err).to.not.be.ok(); | ||
amqp.streams.forEach(function(stream) { | ||
amqp.channels.forEach(function(stream) { | ||
expect(stream._subscribeToQueue.callCount).to.be(0); | ||
@@ -278,0 +278,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
0
36394
114