combine-stream
Advanced tools
Comparing version 0.0.2 to 0.0.3
@@ -7,4 +7,2 @@ #!/usr/bin/env node | ||
var combine = new CombineStream(); | ||
var delayed = function delayed(n) { | ||
@@ -35,4 +33,3 @@ var s = new stream.Transform({objectMode: true}); | ||
combine.addStream(streamA); | ||
combine.addStream(streamB); | ||
var combine = new CombineStream([streamA, streamB]); | ||
@@ -39,0 +36,0 @@ combine.on("data", console.log); |
125
index.js
@@ -14,30 +14,49 @@ var stream = require("readable-stream"); | ||
this._streams = []; | ||
var self = this; | ||
if (options.streams && Array.isArray(options.streams)) { | ||
for (var i=0;i<options.streams.length;++i) { | ||
this.addStream(options.streams[i]); | ||
} | ||
// copy the streams array, or make an empty one | ||
this._streams = (options.streams || []); | ||
// need at least one stream | ||
if (this._streams.length === 0) { | ||
this._streams.push(new stream.PassThrough({objectMode: true})); | ||
} | ||
var self = this; | ||
// default: true | ||
this._bubbleErrors = (typeof options.bubbleErrors === "undefined") || !!options.bubbleErrors; | ||
// propagate .end() action | ||
this.on("finish", function() { | ||
var waiting = self._streams.length; | ||
// error bubbling! yay! | ||
if (this._bubbleErrors) { | ||
for (var i=0;i<this._streams.length;++i) { | ||
this._streams[i].on("error", function(e) { | ||
return self.emit("error", e); | ||
}); | ||
} | ||
} | ||
var streams = self._streams.slice(); | ||
// poor man's .pipe() | ||
var awaitingEnd = this._streams.length; | ||
for (var i=0;i<this._streams.length;++i) { | ||
(function(s) { | ||
s.on("data", function(e) { | ||
if (!self.push(e)) { | ||
s.pause(); | ||
} | ||
}); | ||
return streams.forEach(function(entry) { | ||
entry.stream.removeListener("end", entry.onEnd); | ||
entry.stream.removeListener("finish", entry.onFinish); | ||
s.once("end", function() { | ||
awaitingEnd--; | ||
return entry.stream.end(function() { | ||
waiting--; | ||
if (awaitingEnd === 0) { | ||
self.push(null); | ||
} | ||
}); | ||
})(this._streams[i]); | ||
} | ||
if (waiting === 0) { | ||
return self.push(null); | ||
} | ||
}) | ||
}); | ||
// propagate .end() action | ||
this.on("finish", function() { | ||
for (var i=0;i<self._streams.length;++i) { | ||
self._streams[i].end(); | ||
} | ||
}); | ||
@@ -55,3 +74,3 @@ }; | ||
for (var i=0;i<this._streams.length;++i) { | ||
this._streams[i].stream.write(input, encoding, function() { | ||
this._streams[i].write(input, encoding, function() { | ||
waiting--; | ||
@@ -68,66 +87,4 @@ | ||
for (var i=0;i<this._streams.length;++i) { | ||
this._streams[i].stream.resume(); | ||
this._streams[i].resume(); | ||
} | ||
}; | ||
CombineStream.prototype.addStream = function addStream(str) { | ||
var self = this; | ||
var onData = function onData(e) { | ||
if (!self.push(e)) { | ||
str.pause(); | ||
} | ||
}; | ||
var onEnd = function onEnd() { | ||
self.removeStream(str); | ||
}; | ||
var onFinish = function onFinish() { | ||
self.removeStream(str); | ||
}; | ||
var onError = function onError(err) { | ||
self.emit("error", err); | ||
}; | ||
str.on("data", onData); | ||
str.on("end", onEnd); | ||
str.on("finish", onFinish); | ||
str.on("error", onError); | ||
this._streams.push({ | ||
stream: str, | ||
onData: onData, | ||
onEnd: onEnd, | ||
onFinish: onFinish, | ||
onError: onError, | ||
}); | ||
return this; | ||
}; | ||
CombineStream.prototype.removeStream = function removeStream(str) { | ||
var index = -1; | ||
for (var i=0;i<this._streams.length;++i) { | ||
if (this._streams[i].stream === str) { | ||
index = i; | ||
break; | ||
} | ||
} | ||
if (index === -1) { | ||
return this; | ||
} | ||
var entry = this._streams[index]; | ||
this._streams.splice(index, 1); | ||
str.removeListener("data", entry.onData); | ||
str.removeListener("end", entry.onEnd); | ||
str.removeListener("finish", entry.onFinish); | ||
str.removeListener("error", entry.onError); | ||
return this; | ||
}; |
{ | ||
"name": "combine-stream", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"description": "Combine multiple duplex streams into just one", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -1,2 +0,2 @@ | ||
combine-stream [![build status](https://travis-ci.org/deoxxa/combine-stream.png)](https://travis-ci.org/deoxxa/fork) | ||
combine-stream [![build status](https://travis-ci.org/deoxxa/combine-stream.png)](https://travis-ci.org/deoxxa/combine-stream) | ||
=============== | ||
@@ -25,6 +25,4 @@ | ||
var CombineStream = require("./"); | ||
var CombineStream = require("combine-stream"); | ||
var combine = new CombineStream(); | ||
var streamA = new stream.PassThrough({objectMode: true}), | ||
@@ -34,5 +32,3 @@ streamB = new stream.PassThrough({objectMode: true}), | ||
combine.addStream(streamA); | ||
combine.addStream(streamB); | ||
combine.addStream(streamC); | ||
var combine = new CombineStream([streamA, streamB, streamC]); | ||
@@ -82,3 +78,3 @@ combine.on("data", console.log); | ||
```javascript | ||
var fork = new CombineStream({ | ||
var combine = new CombineStream({ | ||
logSize: 100, | ||
@@ -95,3 +91,4 @@ recordDuplicates: true, | ||
* _options_ - an object containing, as well as the regular `TransformStream` | ||
options, the following possible parameters: | ||
options, the parameters described below. If this argument is an array, it will | ||
be wrapped in `{streams: ...}`. | ||
@@ -101,33 +98,5 @@ _options_ | ||
* _streams_ - an array of streams to add at instantiation time. | ||
* _bubbleErrors_ - a boolean value specifying whether to bubble errors up from | ||
the wrapped streams. | ||
**addStream** | ||
```javascript | ||
combine.addStream(stream); | ||
``` | ||
```javascript | ||
combine.addStream(new stream.PassThrough({ | ||
objectMode: true, | ||
})); | ||
``` | ||
Arguments | ||
* _stream_ - a stream to add to the combine-stream intance | ||
**removeStream** | ||
```javascript | ||
combine.removeStream(stream); | ||
``` | ||
```javascript | ||
combine.removeStream(anExistingStream); | ||
``` | ||
Arguments | ||
* _stream_ - a stream to remove from the combine-stream instance | ||
Example | ||
@@ -143,4 +112,2 @@ ------- | ||
var combine = new CombineStream(); | ||
var delayed = function delayed(n) { | ||
@@ -168,7 +135,8 @@ var s = new stream.Transform({objectMode: true}); | ||
var combine = new CombineStream(); | ||
var streamA = delayed(100), | ||
streamB = delayed(500); | ||
combine.addStream(streamA); | ||
combine.addStream(streamB); | ||
var combine = new CombineStream([streamA, streamB]); | ||
@@ -175,0 +143,0 @@ combine.on("data", console.log); |
@@ -10,17 +10,2 @@ var assert = require("chai").assert, | ||
it("should combine all output into one stream", function(done) { | ||
var combine = new CombineStream(); | ||
var expected = ["hello 1", "hello 2"]; | ||
actual = []; | ||
combine.on("data", function(e) { | ||
actual.push(e); | ||
}); | ||
combine.on("end", function() { | ||
assert.deepEqual(expected, actual); | ||
return done(); | ||
}); | ||
var s1 = new stream.Transform({objectMode: true}), | ||
@@ -41,5 +26,17 @@ s2 = new stream.Transform({objectMode: true}); | ||
combine.addStream(s1); | ||
combine.addStream(s2); | ||
var combine = new CombineStream([s1, s2]); | ||
var expected = ["hello 1", "hello 2"]; | ||
actual = []; | ||
combine.on("data", function(e) { | ||
actual.push(e); | ||
}); | ||
combine.on("end", function() { | ||
assert.deepEqual(expected, actual); | ||
return done(); | ||
}); | ||
combine.write("hello"); | ||
@@ -88,2 +85,11 @@ | ||
}); | ||
it("should end when all containing streams end", function(done) { | ||
var s1 = new stream.PassThrough(), | ||
s2 = new stream.PassThrough(); | ||
var combine = new CombineStream([s1, s2]); | ||
combine.end(done); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
10666
161
173