Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ordered-read-streams

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ordered-read-streams - npm Package Compare versions

Comparing version 0.0.8 to 0.1.0

104

index.js
var Readable = require('stream').Readable;
var util = require('util');
function addStream(streams, stream)
{
if(!stream.readable) throw new Error('All input streams must be readable');
if(this._readableState.ended) throw new Error('Adding streams after ended');
var self = this;
stream._buffer = [];
stream.on('data', function(chunk)
{
if(this === streams[0])
self.push(chunk);
else
this._buffer.push(chunk);
});
stream.on('end', function()
{
for(var stream = streams[0];
stream && stream._readableState.ended;
stream = streams[0])
{
while(stream._buffer.length)
self.push(stream._buffer.shift());
streams.shift();
}
if(!streams.length) self.push(null);
});
stream.on('error', this.emit.bind(this, 'error'));
streams.push(stream);
}
function OrderedStreams(streams, options) {

@@ -12,6 +55,2 @@ if (!(this instanceof(OrderedStreams))) {

if (!Array.isArray(streams)) {
streams = [streams];
}
options.objectMode = true;

@@ -21,53 +60,30 @@

// stream data buffer
this._buffs = [];
if (streams.length === 0) {
this.push(null); // no streams, close
return;
}
if(!Array.isArray(streams)) streams = [streams];
if(!streams.length) return this.push(null); // no streams, close
streams.forEach(function (s, i) {
if (!s.readable) {
throw new Error('All input streams must be readable');
}
s.on('error', function (e) {
this.emit('error', e);
}.bind(this));
var buff = [];
this._buffs.push(buff);
var addStream_bind = addStream.bind(this, []);
s.on('data', buff.unshift.bind(buff));
s.on('end', flushStreamAtIndex.bind(this, i));
}, this);
}
util.inherits(OrderedStreams, Readable);
this.concat = function()
{
Array.prototype.forEach.call(arguments, function(item)
{
if(Array.isArray(item))
item.forEach(addStream_bind);
function flushStreamAtIndex (index) {
this._buffs[index].finished = true;
this._flush();
else
addStream_bind(item);
});
};
this.concat(streams);
}
util.inherits(OrderedStreams, Readable);
OrderedStreams.prototype._read = function () {};
OrderedStreams.prototype._flush = function () {
for (var i = 0, buffs = this._buffs, l = buffs.length; i < l; i++) {
if (buffs[i].finished !== true) {
return;
}
// every buffs before index are all finished, ready to flush
for (var j = 0; j <= i; j++) {
var buffAtIndex = buffs[j];
while (buffAtIndex.length) {
this.push(buffAtIndex.pop());
}
}
}
// no more opened streams
// flush buffered data (if any) before end
this.push(null);
};
module.exports = OrderedStreams;
{
"name": "ordered-read-streams",
"version": "0.0.8",
"version": "0.1.0",
"description": "Combines array of streams into one read stream in strict order",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc