ordered-read-streams
Advanced tools
Comparing version 0.0.7 to 0.0.8
82
index.js
@@ -5,2 +5,6 @@ var Readable = require('stream').Readable; | ||
function OrderedStreams(streams, options) { | ||
if (!(this instanceof(OrderedStreams))) { | ||
return new OrderedStreams(streams, options); | ||
} | ||
streams = streams || []; | ||
@@ -17,45 +21,24 @@ options = options || {}; | ||
var self = this; | ||
// stream data buffer | ||
this._buffs = []; | ||
if (streams.length === 0) { | ||
this.push(null); // no streams, close | ||
} else { | ||
// stream data buffer | ||
this._buff = {}; | ||
this._totalStreams = streams.length; | ||
this._openedStreams = streams.length; | ||
streams.forEach(function (s, i) { | ||
if (!s.readable) { | ||
throw new Error('All input streams must be readable'); | ||
} | ||
return; | ||
} | ||
if (!self._buff[i]) { | ||
self._buff[i] = []; | ||
} | ||
streams.forEach(function (s, i) { | ||
if (!s.readable) { | ||
throw new Error('All input streams must be readable'); | ||
} | ||
s.on('error', function (e) { | ||
this.emit('error', e); | ||
}.bind(this)); | ||
s.on('data', function (data) { | ||
if (i === 0) { | ||
// from first stream we simply push data | ||
self.push(data); | ||
} else { | ||
self._buff[i].push(data); // store in buffer for future | ||
} | ||
}); | ||
s.on('end', function () { | ||
if (!--self._openedStreams) { | ||
// no more opened streams | ||
// flush buffered data (if any) before end | ||
for (var j = 0; j < self._totalStreams; j++) { | ||
while (self._buff[j].length) { | ||
self.push(self._buff[j].shift()); | ||
} | ||
} | ||
self.push(null); | ||
} | ||
}); | ||
s.on('error', function (e) { | ||
self.emit('error', e); | ||
}); | ||
}); | ||
} | ||
var buff = []; | ||
this._buffs.push(buff); | ||
s.on('data', buff.unshift.bind(buff)); | ||
s.on('end', flushStreamAtIndex.bind(this, i)); | ||
}, this); | ||
} | ||
@@ -65,4 +48,27 @@ | ||
function flushStreamAtIndex (index) { | ||
this._buffs[index].finished = true; | ||
this._flush(); | ||
} | ||
OrderedStreams.prototype._read = function () {}; | ||
OrderedStreams.prototype._flush = function () { | ||
for (var i = 0, buffs = this._buffs, l = buffs.length; i < l; i++) { | ||
if (buffs[i].finished !== true) { | ||
return; | ||
} | ||
// every buffs before index are all finished, ready to flush | ||
for (var j = 0; j <= i; j++) { | ||
var buffAtIndex = buffs[j]; | ||
while (buffAtIndex.length) { | ||
this.push(buffAtIndex.pop()); | ||
} | ||
} | ||
} | ||
// no more opened streams | ||
// flush buffered data (if any) before end | ||
this.push(null); | ||
}; | ||
module.exports = OrderedStreams; |
{ | ||
"name": "ordered-read-streams", | ||
"version": "0.0.7", | ||
"version": "0.0.8", | ||
"description": "Combines array of streams into one read stream in strict order", | ||
@@ -16,7 +16,8 @@ "main": "index.js", | ||
"devDependencies": { | ||
"should": "~2.1.1", | ||
"should": "~3.0.1", | ||
"mocha": "~1.17.0", | ||
"through2": "~0.4.0", | ||
"jshint": "~2.4.1" | ||
"jshint": "~2.4.1", | ||
"pre-commit": "0.0.4" | ||
} | ||
} |
@@ -7,3 +7,3 @@ var should = require('should'); | ||
it('should end if no streams are given', function (done) { | ||
var streams = new OrderedStreams(); | ||
var streams = OrderedStreams(); | ||
streams.on('data', function () { | ||
@@ -10,0 +10,0 @@ done('error'); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
8648
197
5