Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rabbitmq-queue-stream

Package Overview
Dependencies
Maintainers
3
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rabbitmq-queue-stream - npm Package Compare versions

Comparing version 0.1.1 to 0.1.2

10

index.js

@@ -39,3 +39,3 @@ var amqp = require("amqp");

this.__options = options;
this.streams = [];
this.channels = [];
}

@@ -62,3 +62,3 @@

}
me.streams = insightStreams;
me.channels = insightStreams;
cb(null, me);

@@ -120,3 +120,3 @@ });

//close every worker stream
async.eachSeries(this.streams, function(stream, next) {
async.eachSeries(this.channels, function(stream, next) {
stream.unsubscribe(next);

@@ -136,3 +136,3 @@ }, cb);

AMQPStreams.prototype.closeConsumers = function(cb) {
async.eachSeries(this.streams, function(stream, next) {
async.eachSeries(this.channels, function(stream, next) {
stream.close(next);

@@ -176,3 +176,3 @@ }, cb);

AMQPStreams.prototype.resubscribeConsumers = function(cb) {
async.eachSeries(this.streams, function(stream, next) {
async.eachSeries(this.channels, function(stream, next) {
if(!stream.subscribed) {

@@ -179,0 +179,0 @@ stream._subscribeToQueue(next);

{
"name": "rabbitmq-queue-stream",
"version": "0.1.1",
"version": "0.1.2",
"description": "Reliable streaming interface to rabbitmq queues",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -15,3 +15,3 @@ # rabbitmq-queue-stream

var RabbitMQStream = require("rabbitmq-queue-stream");
var Transform = require("stream").Transform;
var stream = require("stream");

@@ -36,4 +36,2 @@ var options = {

var queueStreams = streamifiedQueues.sources;
/*

@@ -50,6 +48,6 @@ * Each consumer channel comes with a .source and .sink property.

streamifiedQueues.sources.forEach(function(myQueueStream) {
streamifiedQueues.channels.forEach(function(channel) {
var doSomethingWithData = new Transform({objectMode: true});
doSomethingWithData._transform(function(data, enc, next) {
var myProcessingStream = new stream.Transform({objectMode: true});
myProcessingStream._transform(function(data, enc, next) {
console.log("Doing something with", data);

@@ -60,5 +58,5 @@ this.push(data);

myQueueStream.source
.pipe(doSomethingWithData)
.pipe(myQueueStream.sink);
channel.source
.pipe(myProcessingStream)
.pipe(channel.sink);
});

@@ -73,3 +71,3 @@

}
//Wait some time for queues to flush out. Then close Consumers
//Wait some time for queues to flush out before closing consumers.
streamifiedQueues.closeConsumers(function(err) {

@@ -91,3 +89,3 @@ if(err) {

### Events
### Emitted Events

@@ -101,2 +99,3 @@ #### .source

```
#### .sink

@@ -110,4 +109,4 @@ * deleted - Emitted everytime a job is deleted from the queue

```
* formatError - Sink received a job that does not have the necessary information to be deleted from the queue.
Most likely emitted when objects not originating from .source are written to sink.
* formatError - Sink received a job that does not have the necessary information to be deleted from the queue.
Most likely emitted when objects not originating from .source are written to sink.
```javascript

@@ -114,0 +113,0 @@ myQueueStream.sink.on("formatError", function(err, message) {

@@ -70,3 +70,3 @@ var EventEmitter = require("events").EventEmitter;

it("sets me.streams if successful", function (done) {
it("sets channels if successful", function (done) {
var connection = {};

@@ -85,5 +85,5 @@ var queueStream = {};

expect(result).to.be(streams);
expect(streams.streams).to.have.length(6);
streams.streams.forEach( function(item) {
expect(item).to.be(worker);
expect(streams.channels).to.have.length(6);
streams.channels.forEach( function(channel) {
expect(channel).to.be(worker);
});

@@ -174,11 +174,11 @@ done(err);

});
amqp.streams = streams;
amqp.channels = streams;
});
describe("#unsubscribeConsumers", function() {
it("calls #unsubscribe for every stream in AMQPStreams.streams", function(done) {
it("calls #unsubscribe for every stream in AMQPStreams.channels", function(done) {
amqp.unsubscribeConsumers(function(err) {
expect(err).to.not.be.ok();
amqp.streams.forEach(function(stream) {
expect(stream.unsubscribe.callCount).to.be(1);
amqp.channels.forEach(function(channel) {
expect(channel.unsubscribe.callCount).to.be(1);
});

@@ -192,7 +192,7 @@ done();

describe("#closeConsumers", function() {
it("calls #close on every stream in AMQPStreams.streams", function(done) {
it("calls #close on every stream in AMQPStreams.channels", function(done) {
amqp.closeConsumers(function(err) {
expect(err).to.not.be.ok();
amqp.streams.forEach(function(stream) {
expect(stream.close.callCount).to.be(1);
amqp.channels.forEach(function(channel) {
expect(channel.close.callCount).to.be(1);
});

@@ -243,3 +243,3 @@ done();

beforeEach(function() {
amqp.streams.forEach(function(stream) {
amqp.channels.forEach(function(stream) {
stream._subscribeToQueue = sinon.stub().yields(null);

@@ -250,3 +250,3 @@ });

afterEach(function() {
amqp.streams.forEach(function(stream) {
amqp.channels.forEach(function(stream) {
stream._subscribeToQueue.reset();

@@ -259,3 +259,3 @@ });

expect(err).to.not.be.ok();
amqp.streams.forEach(function(stream) {
amqp.channels.forEach(function(stream) {
expect(stream._subscribeToQueue.callCount).to.be(1);

@@ -269,3 +269,3 @@ });

//since we haven't initialized the streams in the test, let's manually say we've subscribed here
amqp.streams.forEach(function(stream) {
amqp.channels.forEach(function(stream) {
stream.subscribed = true;

@@ -275,3 +275,3 @@ });

expect(err).to.not.be.ok();
amqp.streams.forEach(function(stream) {
amqp.channels.forEach(function(stream) {
expect(stream._subscribeToQueue.callCount).to.be(0);

@@ -278,0 +278,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc