Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ordered-read-streams

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ordered-read-streams - npm Package Compare versions

Comparing version 0.0.7 to 0.0.8

82

index.js

@@ -5,2 +5,6 @@ var Readable = require('stream').Readable;

function OrderedStreams(streams, options) {
if (!(this instanceof(OrderedStreams))) {
return new OrderedStreams(streams, options);
}
streams = streams || [];

@@ -17,45 +21,24 @@ options = options || {};

var self = this;
// stream data buffer
this._buffs = [];
if (streams.length === 0) {
this.push(null); // no streams, close
} else {
// stream data buffer
this._buff = {};
this._totalStreams = streams.length;
this._openedStreams = streams.length;
streams.forEach(function (s, i) {
if (!s.readable) {
throw new Error('All input streams must be readable');
}
return;
}
if (!self._buff[i]) {
self._buff[i] = [];
}
streams.forEach(function (s, i) {
if (!s.readable) {
throw new Error('All input streams must be readable');
}
s.on('error', function (e) {
this.emit('error', e);
}.bind(this));
s.on('data', function (data) {
if (i === 0) {
// from first stream we simply push data
self.push(data);
} else {
self._buff[i].push(data); // store in buffer for future
}
});
s.on('end', function () {
if (!--self._openedStreams) {
// no more opened streams
// flush buffered data (if any) before end
for (var j = 0; j < self._totalStreams; j++) {
while (self._buff[j].length) {
self.push(self._buff[j].shift());
}
}
self.push(null);
}
});
s.on('error', function (e) {
self.emit('error', e);
});
});
}
var buff = [];
this._buffs.push(buff);
s.on('data', buff.unshift.bind(buff));
s.on('end', flushStreamAtIndex.bind(this, i));
}, this);
}

@@ -65,4 +48,27 @@

function flushStreamAtIndex (index) {
this._buffs[index].finished = true;
this._flush();
}
OrderedStreams.prototype._read = function () {};
OrderedStreams.prototype._flush = function () {
for (var i = 0, buffs = this._buffs, l = buffs.length; i < l; i++) {
if (buffs[i].finished !== true) {
return;
}
// every buffs before index are all finished, ready to flush
for (var j = 0; j <= i; j++) {
var buffAtIndex = buffs[j];
while (buffAtIndex.length) {
this.push(buffAtIndex.pop());
}
}
}
// no more opened streams
// flush buffered data (if any) before end
this.push(null);
};
module.exports = OrderedStreams;
{
"name": "ordered-read-streams",
"version": "0.0.7",
"version": "0.0.8",
"description": "Combines array of streams into one read stream in strict order",

@@ -16,7 +16,8 @@ "main": "index.js",

"devDependencies": {
"should": "~2.1.1",
"should": "~3.0.1",
"mocha": "~1.17.0",
"through2": "~0.4.0",
"jshint": "~2.4.1"
"jshint": "~2.4.1",
"pre-commit": "0.0.4"
}
}

@@ -7,3 +7,3 @@ var should = require('should');

it('should end if no streams are given', function (done) {
var streams = new OrderedStreams();
var streams = OrderedStreams();
streams.on('data', function () {

@@ -10,0 +10,0 @@ done('error');

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc