Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

combine-stream

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

combine-stream - npm Package Compare versions

Comparing version 0.0.2 to 0.0.3

5

example.js

@@ -7,4 +7,2 @@ #!/usr/bin/env node

var combine = new CombineStream();
var delayed = function delayed(n) {

@@ -35,4 +33,3 @@ var s = new stream.Transform({objectMode: true});

combine.addStream(streamA);
combine.addStream(streamB);
var combine = new CombineStream([streamA, streamB]);

@@ -39,0 +36,0 @@ combine.on("data", console.log);

125

index.js

@@ -14,30 +14,49 @@ var stream = require("readable-stream");

this._streams = [];
var self = this;
if (options.streams && Array.isArray(options.streams)) {
for (var i=0;i<options.streams.length;++i) {
this.addStream(options.streams[i]);
}
// copy the streams array, or make an empty one
this._streams = (options.streams || []);
// need at least one stream
if (this._streams.length === 0) {
this._streams.push(new stream.PassThrough({objectMode: true}));
}
var self = this;
// default: true
this._bubbleErrors = (typeof options.bubbleErrors === "undefined") || !!options.bubbleErrors;
// propagate .end() action
this.on("finish", function() {
var waiting = self._streams.length;
// error bubbling! yay!
if (this._bubbleErrors) {
for (var i=0;i<this._streams.length;++i) {
this._streams[i].on("error", function(e) {
return self.emit("error", e);
});
}
}
var streams = self._streams.slice();
// poor man's .pipe()
var awaitingEnd = this._streams.length;
for (var i=0;i<this._streams.length;++i) {
(function(s) {
s.on("data", function(e) {
if (!self.push(e)) {
s.pause();
}
});
return streams.forEach(function(entry) {
entry.stream.removeListener("end", entry.onEnd);
entry.stream.removeListener("finish", entry.onFinish);
s.once("end", function() {
awaitingEnd--;
return entry.stream.end(function() {
waiting--;
if (awaitingEnd === 0) {
self.push(null);
}
});
})(this._streams[i]);
}
if (waiting === 0) {
return self.push(null);
}
})
});
// propagate .end() action
this.on("finish", function() {
for (var i=0;i<self._streams.length;++i) {
self._streams[i].end();
}
});

@@ -55,3 +74,3 @@ };

for (var i=0;i<this._streams.length;++i) {
this._streams[i].stream.write(input, encoding, function() {
this._streams[i].write(input, encoding, function() {
waiting--;

@@ -68,66 +87,4 @@

for (var i=0;i<this._streams.length;++i) {
this._streams[i].stream.resume();
this._streams[i].resume();
}
};
CombineStream.prototype.addStream = function addStream(str) {
var self = this;
var onData = function onData(e) {
if (!self.push(e)) {
str.pause();
}
};
var onEnd = function onEnd() {
self.removeStream(str);
};
var onFinish = function onFinish() {
self.removeStream(str);
};
var onError = function onError(err) {
self.emit("error", err);
};
str.on("data", onData);
str.on("end", onEnd);
str.on("finish", onFinish);
str.on("error", onError);
this._streams.push({
stream: str,
onData: onData,
onEnd: onEnd,
onFinish: onFinish,
onError: onError,
});
return this;
};
CombineStream.prototype.removeStream = function removeStream(str) {
var index = -1;
for (var i=0;i<this._streams.length;++i) {
if (this._streams[i].stream === str) {
index = i;
break;
}
}
if (index === -1) {
return this;
}
var entry = this._streams[index];
this._streams.splice(index, 1);
str.removeListener("data", entry.onData);
str.removeListener("end", entry.onEnd);
str.removeListener("finish", entry.onFinish);
str.removeListener("error", entry.onError);
return this;
};
{
"name": "combine-stream",
"version": "0.0.2",
"version": "0.0.3",
"description": "Combine multiple duplex streams into just one",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -1,2 +0,2 @@

combine-stream [![build status](https://travis-ci.org/deoxxa/combine-stream.png)](https://travis-ci.org/deoxxa/fork)
combine-stream [![build status](https://travis-ci.org/deoxxa/combine-stream.png)](https://travis-ci.org/deoxxa/combine-stream)
===============

@@ -25,6 +25,4 @@

var CombineStream = require("./");
var CombineStream = require("combine-stream");
var combine = new CombineStream();
var streamA = new stream.PassThrough({objectMode: true}),

@@ -34,5 +32,3 @@ streamB = new stream.PassThrough({objectMode: true}),

combine.addStream(streamA);
combine.addStream(streamB);
combine.addStream(streamC);
var combine = new CombineStream([streamA, streamB, streamC]);

@@ -82,3 +78,3 @@ combine.on("data", console.log);

```javascript
var fork = new CombineStream({
var combine = new CombineStream({
logSize: 100,

@@ -95,3 +91,4 @@ recordDuplicates: true,

* _options_ - an object containing, as well as the regular `TransformStream`
options, the following possible parameters:
options, the parameters described below. If this argument is an array, it will
be wrapped in `{streams: ...}`.

@@ -101,33 +98,5 @@ _options_

* _streams_ - an array of streams to add at instantiation time.
* _bubbleErrors_ - a boolean value specifying whether to bubble errors up from
the wrapped streams.
**addStream**
```javascript
combine.addStream(stream);
```
```javascript
combine.addStream(new stream.PassThrough({
objectMode: true,
}));
```
Arguments
* _stream_ - a stream to add to the combine-stream intance
**removeStream**
```javascript
combine.removeStream(stream);
```
```javascript
combine.removeStream(anExistingStream);
```
Arguments
* _stream_ - a stream to remove from the combine-stream instance
Example

@@ -143,4 +112,2 @@ -------

var combine = new CombineStream();
var delayed = function delayed(n) {

@@ -168,7 +135,8 @@ var s = new stream.Transform({objectMode: true});

var combine = new CombineStream();
var streamA = delayed(100),
streamB = delayed(500);
combine.addStream(streamA);
combine.addStream(streamB);
var combine = new CombineStream([streamA, streamB]);

@@ -175,0 +143,0 @@ combine.on("data", console.log);

@@ -10,17 +10,2 @@ var assert = require("chai").assert,

it("should combine all output into one stream", function(done) {
var combine = new CombineStream();
var expected = ["hello 1", "hello 2"];
actual = [];
combine.on("data", function(e) {
actual.push(e);
});
combine.on("end", function() {
assert.deepEqual(expected, actual);
return done();
});
var s1 = new stream.Transform({objectMode: true}),

@@ -41,5 +26,17 @@ s2 = new stream.Transform({objectMode: true});

combine.addStream(s1);
combine.addStream(s2);
var combine = new CombineStream([s1, s2]);
var expected = ["hello 1", "hello 2"];
actual = [];
combine.on("data", function(e) {
actual.push(e);
});
combine.on("end", function() {
assert.deepEqual(expected, actual);
return done();
});
combine.write("hello");

@@ -88,2 +85,11 @@

});
it("should end when all containing streams end", function(done) {
var s1 = new stream.PassThrough(),
s2 = new stream.PassThrough();
var combine = new CombineStream([s1, s2]);
combine.end(done);
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc