ordered-read-streams
Advanced tools
Comparing version 0.0.4 to 0.0.5
23
index.js
@@ -24,2 +24,4 @@ var Readable = require('stream').Readable; | ||
this._buff = {}; | ||
this._buffChinks = 0; | ||
this._totalStreams = streams.length; | ||
this._openedStreams = streams.length; | ||
@@ -31,9 +33,13 @@ streams.forEach(function (s, i) { | ||
if (!self._buff[i]) { | ||
self._buff[i] = []; | ||
} | ||
s.on('data', function (data) { | ||
if (i === self._currentIndex) { | ||
// data got from stream, which is at current index | ||
self._currentIndex++; | ||
self.push(data); | ||
} else { | ||
self._buff[i] = data; // store in buffer for future | ||
self._buffChinks++; | ||
self._buff[i].push(data); // store in buffer for future | ||
} | ||
@@ -43,7 +49,12 @@ }); | ||
if (i === self._currentIndex) { | ||
// stream ended without any data and it at current index | ||
// stream ended and it at current index | ||
self._currentIndex++; | ||
} | ||
if (!--self._openedStreams) { | ||
self.push(null); // close | ||
for (var i = self._currentIndex; i < self._totalStreams; i++) { | ||
while (self._buff[i].length) { | ||
self.push(self._buff[i].shift()); | ||
} | ||
} | ||
self.push(null) | ||
} | ||
@@ -65,5 +76,3 @@ }); | ||
var data = this._buff[this._currentIndex]; | ||
if (data) { | ||
// if we already have stored data - push it | ||
this._currentIndex++; | ||
if (data.length) { | ||
this.push(data); | ||
@@ -70,0 +79,0 @@ } |
{ | ||
"name": "ordered-read-streams", | ||
"version": "0.0.4", | ||
"version": "0.0.5", | ||
"description": "Combines array of streams into one read stream in strict order", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -45,3 +45,3 @@ var should = require('should'); | ||
streams.on('end', function () { | ||
results.length.should.equal(3); | ||
results.length.should.be.exactly(3); | ||
results[0].should.equal('stream 1'); | ||
@@ -63,2 +63,24 @@ results[1].should.equal('stream 2'); | ||
it('should emit all data event from each stream', function (done) { | ||
var s = through.obj(function (data, enc, next) { | ||
this.push(data); | ||
next(); | ||
}); | ||
var streams = new OrderedStreams(s); | ||
var results = []; | ||
streams.on('data', function (data) { | ||
results.push(data); | ||
}); | ||
streams.on('end', function () { | ||
results.length.should.be.exactly(3); | ||
done(); | ||
}); | ||
s.write('data1'); | ||
s.write('data2'); | ||
s.write('data3'); | ||
s.end(); | ||
}); | ||
it('should preserve streams order', function(done) { | ||
@@ -65,0 +87,0 @@ var s1 = through.obj(function (data, enc, next) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
8940
208