ordered-read-streams
Advanced tools
Comparing version 0.0.8 to 0.1.0
104
index.js
var Readable = require('stream').Readable; | ||
var util = require('util'); | ||
function addStream(streams, stream) | ||
{ | ||
if(!stream.readable) throw new Error('All input streams must be readable'); | ||
if(this._readableState.ended) throw new Error('Adding streams after ended'); | ||
var self = this; | ||
stream._buffer = []; | ||
stream.on('data', function(chunk) | ||
{ | ||
if(this === streams[0]) | ||
self.push(chunk); | ||
else | ||
this._buffer.push(chunk); | ||
}); | ||
stream.on('end', function() | ||
{ | ||
for(var stream = streams[0]; | ||
stream && stream._readableState.ended; | ||
stream = streams[0]) | ||
{ | ||
while(stream._buffer.length) | ||
self.push(stream._buffer.shift()); | ||
streams.shift(); | ||
} | ||
if(!streams.length) self.push(null); | ||
}); | ||
stream.on('error', this.emit.bind(this, 'error')); | ||
streams.push(stream); | ||
} | ||
function OrderedStreams(streams, options) { | ||
@@ -12,6 +55,2 @@ if (!(this instanceof(OrderedStreams))) { | ||
if (!Array.isArray(streams)) { | ||
streams = [streams]; | ||
} | ||
options.objectMode = true; | ||
@@ -21,53 +60,30 @@ | ||
// stream data buffer | ||
this._buffs = []; | ||
if (streams.length === 0) { | ||
this.push(null); // no streams, close | ||
return; | ||
} | ||
if(!Array.isArray(streams)) streams = [streams]; | ||
if(!streams.length) return this.push(null); // no streams, close | ||
streams.forEach(function (s, i) { | ||
if (!s.readable) { | ||
throw new Error('All input streams must be readable'); | ||
} | ||
s.on('error', function (e) { | ||
this.emit('error', e); | ||
}.bind(this)); | ||
var buff = []; | ||
this._buffs.push(buff); | ||
var addStream_bind = addStream.bind(this, []); | ||
s.on('data', buff.unshift.bind(buff)); | ||
s.on('end', flushStreamAtIndex.bind(this, i)); | ||
}, this); | ||
} | ||
util.inherits(OrderedStreams, Readable); | ||
this.concat = function() | ||
{ | ||
Array.prototype.forEach.call(arguments, function(item) | ||
{ | ||
if(Array.isArray(item)) | ||
item.forEach(addStream_bind); | ||
function flushStreamAtIndex (index) { | ||
this._buffs[index].finished = true; | ||
this._flush(); | ||
else | ||
addStream_bind(item); | ||
}); | ||
}; | ||
this.concat(streams); | ||
} | ||
util.inherits(OrderedStreams, Readable); | ||
OrderedStreams.prototype._read = function () {}; | ||
OrderedStreams.prototype._flush = function () { | ||
for (var i = 0, buffs = this._buffs, l = buffs.length; i < l; i++) { | ||
if (buffs[i].finished !== true) { | ||
return; | ||
} | ||
// every buffs before index are all finished, ready to flush | ||
for (var j = 0; j <= i; j++) { | ||
var buffAtIndex = buffs[j]; | ||
while (buffAtIndex.length) { | ||
this.push(buffAtIndex.pop()); | ||
} | ||
} | ||
} | ||
// no more opened streams | ||
// flush buffered data (if any) before end | ||
this.push(null); | ||
}; | ||
module.exports = OrderedStreams; |
{ | ||
"name": "ordered-read-streams", | ||
"version": "0.0.8", | ||
"version": "0.1.0", | ||
"description": "Combines array of streams into one read stream in strict order", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
8684