ordered-read-streams
Advanced tools
+60
-44
| var Readable = require('stream').Readable; | ||
| var util = require('util'); | ||
| function addStream(streams, stream) | ||
| { | ||
| if(!stream.readable) throw new Error('All input streams must be readable'); | ||
| if(this._readableState.ended) throw new Error('Adding streams after ended'); | ||
| var self = this; | ||
| stream._buffer = []; | ||
| stream.on('data', function(chunk) | ||
| { | ||
| if(this === streams[0]) | ||
| self.push(chunk); | ||
| else | ||
| this._buffer.push(chunk); | ||
| }); | ||
| stream.on('end', function() | ||
| { | ||
| for(var stream = streams[0]; | ||
| stream && stream._readableState.ended; | ||
| stream = streams[0]) | ||
| { | ||
| while(stream._buffer.length) | ||
| self.push(stream._buffer.shift()); | ||
| streams.shift(); | ||
| } | ||
| if(!streams.length) self.push(null); | ||
| }); | ||
| stream.on('error', this.emit.bind(this, 'error')); | ||
| streams.push(stream); | ||
| } | ||
| function OrderedStreams(streams, options) { | ||
@@ -12,6 +55,2 @@ if (!(this instanceof(OrderedStreams))) { | ||
| if (!Array.isArray(streams)) { | ||
| streams = [streams]; | ||
| } | ||
| options.objectMode = true; | ||
@@ -21,53 +60,30 @@ | ||
| // stream data buffer | ||
| this._buffs = []; | ||
| if (streams.length === 0) { | ||
| this.push(null); // no streams, close | ||
| return; | ||
| } | ||
| if(!Array.isArray(streams)) streams = [streams]; | ||
| if(!streams.length) return this.push(null); // no streams, close | ||
| streams.forEach(function (s, i) { | ||
| if (!s.readable) { | ||
| throw new Error('All input streams must be readable'); | ||
| } | ||
| s.on('error', function (e) { | ||
| this.emit('error', e); | ||
| }.bind(this)); | ||
| var buff = []; | ||
| this._buffs.push(buff); | ||
| var addStream_bind = addStream.bind(this, []); | ||
| s.on('data', buff.unshift.bind(buff)); | ||
| s.on('end', flushStreamAtIndex.bind(this, i)); | ||
| }, this); | ||
| } | ||
| util.inherits(OrderedStreams, Readable); | ||
| this.concat = function() | ||
| { | ||
| Array.prototype.forEach.call(arguments, function(item) | ||
| { | ||
| if(Array.isArray(item)) | ||
| item.forEach(addStream_bind); | ||
| function flushStreamAtIndex (index) { | ||
| this._buffs[index].finished = true; | ||
| this._flush(); | ||
| else | ||
| addStream_bind(item); | ||
| }); | ||
| }; | ||
| this.concat(streams); | ||
| } | ||
| util.inherits(OrderedStreams, Readable); | ||
| OrderedStreams.prototype._read = function () {}; | ||
| OrderedStreams.prototype._flush = function () { | ||
| for (var i = 0, buffs = this._buffs, l = buffs.length; i < l; i++) { | ||
| if (buffs[i].finished !== true) { | ||
| return; | ||
| } | ||
| // every buffs before index are all finished, ready to flush | ||
| for (var j = 0; j <= i; j++) { | ||
| var buffAtIndex = buffs[j]; | ||
| while (buffAtIndex.length) { | ||
| this.push(buffAtIndex.pop()); | ||
| } | ||
| } | ||
| } | ||
| // no more opened streams | ||
| // flush buffered data (if any) before end | ||
| this.push(null); | ||
| }; | ||
| module.exports = OrderedStreams; |
+1
-1
| { | ||
| "name": "ordered-read-streams", | ||
| "version": "0.0.8", | ||
| "version": "0.1.0", | ||
| "description": "Combines array of streams into one read stream in strict order", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
8684
0.42%