combine-stream
Advanced tools
+1
-4
@@ -7,4 +7,2 @@ #!/usr/bin/env node | ||
| var combine = new CombineStream(); | ||
| var delayed = function delayed(n) { | ||
@@ -35,4 +33,3 @@ var s = new stream.Transform({objectMode: true}); | ||
| combine.addStream(streamA); | ||
| combine.addStream(streamB); | ||
| var combine = new CombineStream([streamA, streamB]); | ||
@@ -39,0 +36,0 @@ combine.on("data", console.log); |
+41
-84
@@ -14,30 +14,49 @@ var stream = require("readable-stream"); | ||
| this._streams = []; | ||
| var self = this; | ||
| if (options.streams && Array.isArray(options.streams)) { | ||
| for (var i=0;i<options.streams.length;++i) { | ||
| this.addStream(options.streams[i]); | ||
| } | ||
| // copy the streams array, or make an empty one | ||
| this._streams = (options.streams || []); | ||
| // need at least one stream | ||
| if (this._streams.length === 0) { | ||
| this._streams.push(new stream.PassThrough({objectMode: true})); | ||
| } | ||
| var self = this; | ||
| // default: true | ||
| this._bubbleErrors = (typeof options.bubbleErrors === "undefined") || !!options.bubbleErrors; | ||
| // propagate .end() action | ||
| this.on("finish", function() { | ||
| var waiting = self._streams.length; | ||
| // error bubbling! yay! | ||
| if (this._bubbleErrors) { | ||
| for (var i=0;i<this._streams.length;++i) { | ||
| this._streams[i].on("error", function(e) { | ||
| return self.emit("error", e); | ||
| }); | ||
| } | ||
| } | ||
| var streams = self._streams.slice(); | ||
| // poor man's .pipe() | ||
| var awaitingEnd = this._streams.length; | ||
| for (var i=0;i<this._streams.length;++i) { | ||
| (function(s) { | ||
| s.on("data", function(e) { | ||
| if (!self.push(e)) { | ||
| s.pause(); | ||
| } | ||
| }); | ||
| return streams.forEach(function(entry) { | ||
| entry.stream.removeListener("end", entry.onEnd); | ||
| entry.stream.removeListener("finish", entry.onFinish); | ||
| s.once("end", function() { | ||
| awaitingEnd--; | ||
| return entry.stream.end(function() { | ||
| waiting--; | ||
| if (awaitingEnd === 0) { | ||
| self.push(null); | ||
| } | ||
| }); | ||
| })(this._streams[i]); | ||
| } | ||
| if (waiting === 0) { | ||
| return self.push(null); | ||
| } | ||
| }) | ||
| }); | ||
| // propagate .end() action | ||
| this.on("finish", function() { | ||
| for (var i=0;i<self._streams.length;++i) { | ||
| self._streams[i].end(); | ||
| } | ||
| }); | ||
@@ -55,3 +74,3 @@ }; | ||
| for (var i=0;i<this._streams.length;++i) { | ||
| this._streams[i].stream.write(input, encoding, function() { | ||
| this._streams[i].write(input, encoding, function() { | ||
| waiting--; | ||
@@ -68,66 +87,4 @@ | ||
| for (var i=0;i<this._streams.length;++i) { | ||
| this._streams[i].stream.resume(); | ||
| this._streams[i].resume(); | ||
| } | ||
| }; | ||
| CombineStream.prototype.addStream = function addStream(str) { | ||
| var self = this; | ||
| var onData = function onData(e) { | ||
| if (!self.push(e)) { | ||
| str.pause(); | ||
| } | ||
| }; | ||
| var onEnd = function onEnd() { | ||
| self.removeStream(str); | ||
| }; | ||
| var onFinish = function onFinish() { | ||
| self.removeStream(str); | ||
| }; | ||
| var onError = function onError(err) { | ||
| self.emit("error", err); | ||
| }; | ||
| str.on("data", onData); | ||
| str.on("end", onEnd); | ||
| str.on("finish", onFinish); | ||
| str.on("error", onError); | ||
| this._streams.push({ | ||
| stream: str, | ||
| onData: onData, | ||
| onEnd: onEnd, | ||
| onFinish: onFinish, | ||
| onError: onError, | ||
| }); | ||
| return this; | ||
| }; | ||
| CombineStream.prototype.removeStream = function removeStream(str) { | ||
| var index = -1; | ||
| for (var i=0;i<this._streams.length;++i) { | ||
| if (this._streams[i].stream === str) { | ||
| index = i; | ||
| break; | ||
| } | ||
| } | ||
| if (index === -1) { | ||
| return this; | ||
| } | ||
| var entry = this._streams[index]; | ||
| this._streams.splice(index, 1); | ||
| str.removeListener("data", entry.onData); | ||
| str.removeListener("end", entry.onEnd); | ||
| str.removeListener("finish", entry.onFinish); | ||
| str.removeListener("error", entry.onError); | ||
| return this; | ||
| }; |
+1
-1
| { | ||
| "name": "combine-stream", | ||
| "version": "0.0.2", | ||
| "version": "0.0.3", | ||
| "description": "Combine multiple duplex streams into just one", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
+11
-43
@@ -1,2 +0,2 @@ | ||
| combine-stream [](https://travis-ci.org/deoxxa/fork) | ||
| combine-stream [](https://travis-ci.org/deoxxa/combine-stream) | ||
| =============== | ||
@@ -25,6 +25,4 @@ | ||
| var CombineStream = require("./"); | ||
| var CombineStream = require("combine-stream"); | ||
| var combine = new CombineStream(); | ||
| var streamA = new stream.PassThrough({objectMode: true}), | ||
@@ -34,5 +32,3 @@ streamB = new stream.PassThrough({objectMode: true}), | ||
| combine.addStream(streamA); | ||
| combine.addStream(streamB); | ||
| combine.addStream(streamC); | ||
| var combine = new CombineStream([streamA, streamB, streamC]); | ||
@@ -82,3 +78,3 @@ combine.on("data", console.log); | ||
| ```javascript | ||
| var fork = new CombineStream({ | ||
| var combine = new CombineStream({ | ||
| logSize: 100, | ||
@@ -95,3 +91,4 @@ recordDuplicates: true, | ||
| * _options_ - an object containing, as well as the regular `TransformStream` | ||
| options, the following possible parameters: | ||
| options, the parameters described below. If this argument is an array, it will | ||
| be wrapped in `{streams: ...}`. | ||
@@ -101,33 +98,5 @@ _options_ | ||
| * _streams_ - an array of streams to add at instantiation time. | ||
| * _bubbleErrors_ - a boolean value specifying whether to bubble errors up from | ||
| the wrapped streams. | ||
| **addStream** | ||
| ```javascript | ||
| combine.addStream(stream); | ||
| ``` | ||
| ```javascript | ||
| combine.addStream(new stream.PassThrough({ | ||
| objectMode: true, | ||
| })); | ||
| ``` | ||
| Arguments | ||
| * _stream_ - a stream to add to the combine-stream intance | ||
| **removeStream** | ||
| ```javascript | ||
| combine.removeStream(stream); | ||
| ``` | ||
| ```javascript | ||
| combine.removeStream(anExistingStream); | ||
| ``` | ||
| Arguments | ||
| * _stream_ - a stream to remove from the combine-stream instance | ||
| Example | ||
@@ -143,4 +112,2 @@ ------- | ||
| var combine = new CombineStream(); | ||
| var delayed = function delayed(n) { | ||
@@ -168,7 +135,8 @@ var s = new stream.Transform({objectMode: true}); | ||
| var combine = new CombineStream(); | ||
| var streamA = delayed(100), | ||
| streamB = delayed(500); | ||
| combine.addStream(streamA); | ||
| combine.addStream(streamB); | ||
| var combine = new CombineStream([streamA, streamB]); | ||
@@ -175,0 +143,0 @@ combine.on("data", console.log); |
+23
-17
@@ -10,17 +10,2 @@ var assert = require("chai").assert, | ||
| it("should combine all output into one stream", function(done) { | ||
| var combine = new CombineStream(); | ||
| var expected = ["hello 1", "hello 2"]; | ||
| actual = []; | ||
| combine.on("data", function(e) { | ||
| actual.push(e); | ||
| }); | ||
| combine.on("end", function() { | ||
| assert.deepEqual(expected, actual); | ||
| return done(); | ||
| }); | ||
| var s1 = new stream.Transform({objectMode: true}), | ||
@@ -41,5 +26,17 @@ s2 = new stream.Transform({objectMode: true}); | ||
| combine.addStream(s1); | ||
| combine.addStream(s2); | ||
| var combine = new CombineStream([s1, s2]); | ||
| var expected = ["hello 1", "hello 2"]; | ||
| actual = []; | ||
| combine.on("data", function(e) { | ||
| actual.push(e); | ||
| }); | ||
| combine.on("end", function() { | ||
| assert.deepEqual(expected, actual); | ||
| return done(); | ||
| }); | ||
| combine.write("hello"); | ||
@@ -88,2 +85,11 @@ | ||
| }); | ||
| it("should end when all containing streams end", function(done) { | ||
| var s1 = new stream.PassThrough(), | ||
| s2 = new stream.PassThrough(); | ||
| var combine = new CombineStream([s1, s2]); | ||
| combine.end(done); | ||
| }); | ||
| }); |
10666
-7.62%161
-14.81%173
-15.61%