Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ordered-read-streams

Package Overview
Dependencies
Maintainers
2
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ordered-read-streams - npm Package Compare versions

Comparing version 1.0.1 to 2.0.0

154

index.js

@@ -1,3 +0,2 @@

var Readable = require('readable-stream/readable');
var util = require('util');
var Readable = require('streamx').Readable;

@@ -13,88 +12,131 @@ function isReadable(stream) {

if (typeof stream._read !== 'function') {
if (typeof stream.read !== 'function') {
return false;
}
if (!stream._readableState) {
return false;
}
return true;
}
function addStream (streams, stream) {
function assertReadableStream(stream) {
if (!isReadable(stream)) {
throw new Error('All input streams must be readable');
}
}
var self = this;
function OrderedStreams(streams, options) {
streams = streams || [];
stream._buffer = [];
if (!Array.isArray(streams)) {
streams = [streams];
}
stream.on('readable', function () {
var chunk = stream.read();
while (chunk) {
if (this === streams[0]) {
self.push(chunk);
} else {
this._buffer.push(chunk);
}
chunk = stream.read();
}
streams = Array.prototype.concat.apply([], streams);
streams.forEach(assertReadableStream);
options = Object.assign({}, options, {
read: read,
predestroy: predestroy,
});
stream.on('end', function () {
for (var stream = streams[0];
stream && stream._readableState.ended;
stream = streams[0]) {
while (stream._buffer.length) {
self.push(stream._buffer.shift());
}
var readable = new Readable(options);
streams.shift();
var streamIdx = 0;
var activeStream = streams[streamIdx];
var destroyedIdx = -1;
var destroyedByError = false;
var readableClosed = false;
streams.forEach(setup);
function setup(stream, idx) {
stream.on('data', onData);
stream.once('error', onError);
stream.once('end', onEnd);
stream.once('close', onClose);
stream.pause();
function cleanup() {
stream.off('data', onData);
stream.off('error', onError);
stream.off('end', onEnd);
stream.off('close', onClose);
}
if (!streams.length) {
self.push(null);
function onError(err) {
destroyedByError = true;
cleanup();
readable.destroy(err);
}
});
stream.on('error', this.emit.bind(this, 'error'));
function onEnd() {
streamIdx++;
activeStream = streams[streamIdx];
cleanup();
if (activeStream) {
activeStream.resume();
} else {
readable.push(null);
}
}
streams.push(stream);
}
function onClose() {
destroyedIdx = idx;
readableClosed = true;
cleanup();
readable.destroy();
}
}
function OrderedStreams (streams, options) {
if (!(this instanceof(OrderedStreams))) {
return new OrderedStreams(streams, options);
function predestroy() {
streams.forEach(destroyStream);
}
streams = streams || [];
options = options || {};
function destroyStream(stream, idx) {
if (destroyedIdx === idx) {
return;
}
options.objectMode = true;
if (destroyedByError) {
return stream.destroy();
}
if (readableClosed) {
return stream.destroy();
}
Readable.call(this, options);
stream.destroy(new Error('Wrapper destroyed'));
}
if (!Array.isArray(streams)) {
streams = [streams];
function onData(chunk) {
var drained = readable.push(chunk);
// If the stream is not drained, we pause the activeStream
// The activeStream will be resumed on the next call to `read`
if (!drained) {
activeStream.pause();
}
}
if (!streams.length) {
return this.push(null); // no streams, close
function read(cb) {
if (activeStream) {
activeStream.resume();
} else {
readable.push(null);
}
cb();
}
var addStreamBinded = addStream.bind(this, []);
function addSource(stream) {
assertReadableStream(stream);
var idx = streams.push(stream);
setup(stream, idx);
activeStream = streams[streamIdx];
}
streams.forEach(function (item) {
if (Array.isArray(item)) {
item.forEach(addStreamBinded);
} else {
addStreamBinded(item);
}
});
readable.addSource = addSource;
return readable;
}
util.inherits(OrderedStreams, Readable);
OrderedStreams.prototype._read = function () {};
module.exports = OrderedStreams;
{
"name": "ordered-read-streams",
"version": "1.0.1",
"description": "Combines array of streams into one read stream in strict order",
"version": "2.0.0",
"description": "Combines array of streams into one Readable stream in strict order.",
"author": "Gulp Team <team@gulpjs.com> (https://gulpjs.com/)",
"contributors": [
"Blaine Bublitz <blaine.bublitz@gmail.com>",
"Artem Medeu <artem.medeusheyev@gmail.com>"
],
"repository": "gulpjs/ordered-read-streams",
"license": "MIT",
"engines": {
"node": ">= 10.13.0"
},
"files": [
"LICENSE",
"index.js"
],
"scripts": {
"test": "jscs *.js test/*js && jshint *.js test/*.js && mocha"
"lint": "eslint .",
"pretest": "npm run lint",
"test": "nyc mocha --async-only"
},
"repository": "armed/ordered-read-streams",
"author": "Artem Medeusheyev <artem.medeusheyev@gmail.com>",
"license": "MIT",
"dependencies": {
"readable-stream": "^2.0.1"
"streamx": "^2.12.5"
},
"devDependencies": {
"expect": "^1.20.2",
"jscs": "^1.13.1",
"jshint": "^2.8.0",
"mississippi": "^1.3.0",
"mocha": "^2.2.5",
"pre-commit": "^1.0.10",
"through2": "^2.0.0"
}
"eslint": "^7.32.0",
"eslint-config-gulp": "^5.0.1",
"eslint-plugin-node": "^11.1.0",
"expect": "^27.4.2",
"mocha": "^8.4.0",
"nyc": "^15.1.0",
"readable-stream": "^3.6.0"
},
"nyc": {
"reporter": [
"lcov",
"text-summary"
]
},
"prettier": {
"singleQuote": true
},
"keywords": [
"streams",
"ordered",
"group",
"combine",
"streamx",
"readable"
]
}

@@ -1,65 +0,96 @@

# ordered-read-streams [![NPM version](https://img.shields.io/npm/v/ordered-read-streams.svg)](http://badge.fury.io/js/ordered-read-streams) [![Build Status](https://travis-ci.org/armed/ordered-read-streams.svg?branch=master)](https://travis-ci.org/armed/ordered-read-streams)
<p align="center">
<a href="https://gulpjs.com">
<img height="257" width="114" src="https://raw.githubusercontent.com/gulpjs/artwork/master/gulp-2x.png">
</a>
</p>
Combines array of streams into one read stream in strict order.
# ordered-read-streams
## Installation
[![NPM version][npm-image]][npm-url] [![Downloads][downloads-image]][npm-url] [![Build Status][ci-image]][ci-url] [![Coveralls Status][coveralls-image]][coveralls-url]
`npm install ordered-read-streams`
Combines array of streams into one Readable stream in strict order.
## Overview
## Usage
`ordered-read-streams` handles all data/errors from input streams in parallel, but emits data/errors in strict order in which streams are passed to constructor. This is `objectMode = true` stream.
## Example
```js
var through = require('through2');
var Ordered = require('ordered-read-streams');
var { Readable } = require('streamx');
var ordered = require('ordered-read-streams');
var s1 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 200)
var s1 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 1');
cb(null);
}, 200);
},
});
var s2 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 30)
var s2 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 2');
cb(null);
}, 30);
},
});
var s3 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 100)
var s3 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 3');
cb(null);
}, 100);
},
});
var streams = new Ordered([s1, s2, s3]);
streams.on('data', function (data) {
var readable = ordered([s1, s2, s3]);
readable.on('data', function (data) {
console.log(data);
})
// Logs:
// stream 1
// stream 2
// stream 3
});
```
s1.write('stream 1');
s1.end();
## API
s2.write('stream 2');
s2.end();
### `ordered(streams, [options])`
s3.write('stream 3');
s3.end();
```
Ouput will be:
Takes an array of `Readable` streams and produces a single `OrderedReadable` stream that will consume the provided streams in strict order. The produced `Readable` stream respects backpressure on itself and any provided streams.
```
stream 1
stream 2
stream 3
```
#### `orderedReadable.addSource(stream)`
## Licence
The returned `Readable` stream has an `addSource` instance function that takes appends a `Readable` stream to the list of source streams that the `OrderedReadable` is reading from.
## License
MIT
<!-- prettier-ignore-start -->
[downloads-image]: https://img.shields.io/npm/dm/ordered-read-streams.svg?style=flat-square
[npm-url]: https://www.npmjs.com/package/ordered-read-streams
[npm-image]: https://img.shields.io/npm/v/ordered-read-streams.svg?style=flat-square
[ci-url]: https://github.com/gulpjs/ordered-read-streams/actions?query=workflow:dev
[ci-image]: https://img.shields.io/github/workflow/status/gulpjs/ordered-read-streams/dev?style=flat-square
[coveralls-url]: https://coveralls.io/r/gulpjs/ordered-read-streams
[coveralls-image]: https://img.shields.io/coveralls/gulpjs/ordered-read-streams/master.svg?style=flat-square
<!-- prettier-ignore-end -->

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc