ordered-read-streams
Advanced tools
Comparing version 1.0.1 to 2.0.0
154
index.js
@@ -1,3 +0,2 @@ | ||
var Readable = require('readable-stream/readable'); | ||
var util = require('util'); | ||
var Readable = require('streamx').Readable; | ||
@@ -13,88 +12,131 @@ function isReadable(stream) { | ||
if (typeof stream._read !== 'function') { | ||
if (typeof stream.read !== 'function') { | ||
return false; | ||
} | ||
if (!stream._readableState) { | ||
return false; | ||
} | ||
return true; | ||
} | ||
function addStream (streams, stream) { | ||
function assertReadableStream(stream) { | ||
if (!isReadable(stream)) { | ||
throw new Error('All input streams must be readable'); | ||
} | ||
} | ||
var self = this; | ||
function OrderedStreams(streams, options) { | ||
streams = streams || []; | ||
stream._buffer = []; | ||
if (!Array.isArray(streams)) { | ||
streams = [streams]; | ||
} | ||
stream.on('readable', function () { | ||
var chunk = stream.read(); | ||
while (chunk) { | ||
if (this === streams[0]) { | ||
self.push(chunk); | ||
} else { | ||
this._buffer.push(chunk); | ||
} | ||
chunk = stream.read(); | ||
} | ||
streams = Array.prototype.concat.apply([], streams); | ||
streams.forEach(assertReadableStream); | ||
options = Object.assign({}, options, { | ||
read: read, | ||
predestroy: predestroy, | ||
}); | ||
stream.on('end', function () { | ||
for (var stream = streams[0]; | ||
stream && stream._readableState.ended; | ||
stream = streams[0]) { | ||
while (stream._buffer.length) { | ||
self.push(stream._buffer.shift()); | ||
} | ||
var readable = new Readable(options); | ||
streams.shift(); | ||
var streamIdx = 0; | ||
var activeStream = streams[streamIdx]; | ||
var destroyedIdx = -1; | ||
var destroyedByError = false; | ||
var readableClosed = false; | ||
streams.forEach(setup); | ||
function setup(stream, idx) { | ||
stream.on('data', onData); | ||
stream.once('error', onError); | ||
stream.once('end', onEnd); | ||
stream.once('close', onClose); | ||
stream.pause(); | ||
function cleanup() { | ||
stream.off('data', onData); | ||
stream.off('error', onError); | ||
stream.off('end', onEnd); | ||
stream.off('close', onClose); | ||
} | ||
if (!streams.length) { | ||
self.push(null); | ||
function onError(err) { | ||
destroyedByError = true; | ||
cleanup(); | ||
readable.destroy(err); | ||
} | ||
}); | ||
stream.on('error', this.emit.bind(this, 'error')); | ||
function onEnd() { | ||
streamIdx++; | ||
activeStream = streams[streamIdx]; | ||
cleanup(); | ||
if (activeStream) { | ||
activeStream.resume(); | ||
} else { | ||
readable.push(null); | ||
} | ||
} | ||
streams.push(stream); | ||
} | ||
function onClose() { | ||
destroyedIdx = idx; | ||
readableClosed = true; | ||
cleanup(); | ||
readable.destroy(); | ||
} | ||
} | ||
function OrderedStreams (streams, options) { | ||
if (!(this instanceof(OrderedStreams))) { | ||
return new OrderedStreams(streams, options); | ||
function predestroy() { | ||
streams.forEach(destroyStream); | ||
} | ||
streams = streams || []; | ||
options = options || {}; | ||
function destroyStream(stream, idx) { | ||
if (destroyedIdx === idx) { | ||
return; | ||
} | ||
options.objectMode = true; | ||
if (destroyedByError) { | ||
return stream.destroy(); | ||
} | ||
if (readableClosed) { | ||
return stream.destroy(); | ||
} | ||
Readable.call(this, options); | ||
stream.destroy(new Error('Wrapper destroyed')); | ||
} | ||
if (!Array.isArray(streams)) { | ||
streams = [streams]; | ||
function onData(chunk) { | ||
var drained = readable.push(chunk); | ||
// If the stream is not drained, we pause the activeStream | ||
// The activeStream will be resumed on the next call to `read` | ||
if (!drained) { | ||
activeStream.pause(); | ||
} | ||
} | ||
if (!streams.length) { | ||
return this.push(null); // no streams, close | ||
function read(cb) { | ||
if (activeStream) { | ||
activeStream.resume(); | ||
} else { | ||
readable.push(null); | ||
} | ||
cb(); | ||
} | ||
var addStreamBinded = addStream.bind(this, []); | ||
function addSource(stream) { | ||
assertReadableStream(stream); | ||
var idx = streams.push(stream); | ||
setup(stream, idx); | ||
activeStream = streams[streamIdx]; | ||
} | ||
streams.forEach(function (item) { | ||
if (Array.isArray(item)) { | ||
item.forEach(addStreamBinded); | ||
} else { | ||
addStreamBinded(item); | ||
} | ||
}); | ||
readable.addSource = addSource; | ||
return readable; | ||
} | ||
util.inherits(OrderedStreams, Readable); | ||
OrderedStreams.prototype._read = function () {}; | ||
module.exports = OrderedStreams; |
{ | ||
"name": "ordered-read-streams", | ||
"version": "1.0.1", | ||
"description": "Combines array of streams into one read stream in strict order", | ||
"version": "2.0.0", | ||
"description": "Combines array of streams into one Readable stream in strict order.", | ||
"author": "Gulp Team <team@gulpjs.com> (https://gulpjs.com/)", | ||
"contributors": [ | ||
"Blaine Bublitz <blaine.bublitz@gmail.com>", | ||
"Artem Medeu <artem.medeusheyev@gmail.com>" | ||
], | ||
"repository": "gulpjs/ordered-read-streams", | ||
"license": "MIT", | ||
"engines": { | ||
"node": ">= 10.13.0" | ||
}, | ||
"files": [ | ||
"LICENSE", | ||
"index.js" | ||
], | ||
"scripts": { | ||
"test": "jscs *.js test/*js && jshint *.js test/*.js && mocha" | ||
"lint": "eslint .", | ||
"pretest": "npm run lint", | ||
"test": "nyc mocha --async-only" | ||
}, | ||
"repository": "armed/ordered-read-streams", | ||
"author": "Artem Medeusheyev <artem.medeusheyev@gmail.com>", | ||
"license": "MIT", | ||
"dependencies": { | ||
"readable-stream": "^2.0.1" | ||
"streamx": "^2.12.5" | ||
}, | ||
"devDependencies": { | ||
"expect": "^1.20.2", | ||
"jscs": "^1.13.1", | ||
"jshint": "^2.8.0", | ||
"mississippi": "^1.3.0", | ||
"mocha": "^2.2.5", | ||
"pre-commit": "^1.0.10", | ||
"through2": "^2.0.0" | ||
} | ||
"eslint": "^7.32.0", | ||
"eslint-config-gulp": "^5.0.1", | ||
"eslint-plugin-node": "^11.1.0", | ||
"expect": "^27.4.2", | ||
"mocha": "^8.4.0", | ||
"nyc": "^15.1.0", | ||
"readable-stream": "^3.6.0" | ||
}, | ||
"nyc": { | ||
"reporter": [ | ||
"lcov", | ||
"text-summary" | ||
] | ||
}, | ||
"prettier": { | ||
"singleQuote": true | ||
}, | ||
"keywords": [ | ||
"streams", | ||
"ordered", | ||
"group", | ||
"combine", | ||
"streamx", | ||
"readable" | ||
] | ||
} |
123
README.md
@@ -1,65 +0,96 @@ | ||
# ordered-read-streams [![NPM version](https://img.shields.io/npm/v/ordered-read-streams.svg)](http://badge.fury.io/js/ordered-read-streams) [![Build Status](https://travis-ci.org/armed/ordered-read-streams.svg?branch=master)](https://travis-ci.org/armed/ordered-read-streams) | ||
<p align="center"> | ||
<a href="https://gulpjs.com"> | ||
<img height="257" width="114" src="https://raw.githubusercontent.com/gulpjs/artwork/master/gulp-2x.png"> | ||
</a> | ||
</p> | ||
Combines array of streams into one read stream in strict order. | ||
# ordered-read-streams | ||
## Installation | ||
[![NPM version][npm-image]][npm-url] [![Downloads][downloads-image]][npm-url] [![Build Status][ci-image]][ci-url] [![Coveralls Status][coveralls-image]][coveralls-url] | ||
`npm install ordered-read-streams` | ||
Combines array of streams into one Readable stream in strict order. | ||
## Overview | ||
## Usage | ||
`ordered-read-streams` handles all data/errors from input streams in parallel, but emits data/errors in strict order in which streams are passed to constructor. This is `objectMode = true` stream. | ||
## Example | ||
```js | ||
var through = require('through2'); | ||
var Ordered = require('ordered-read-streams'); | ||
var { Readable } = require('streamx'); | ||
var ordered = require('ordered-read-streams'); | ||
var s1 = through.obj(function (data, enc, next) { | ||
var self = this; | ||
setTimeout(function () { | ||
self.push(data); | ||
next(); | ||
}, 200) | ||
var s1 = new Readable({ | ||
read: function (cb) { | ||
var self = this; | ||
if (self.called) { | ||
self.push(null); | ||
return cb(null); | ||
} | ||
setTimeout(function () { | ||
self.called = true; | ||
self.push('stream 1'); | ||
cb(null); | ||
}, 200); | ||
}, | ||
}); | ||
var s2 = through.obj(function (data, enc, next) { | ||
var self = this; | ||
setTimeout(function () { | ||
self.push(data); | ||
next(); | ||
}, 30) | ||
var s2 = new Readable({ | ||
read: function (cb) { | ||
var self = this; | ||
if (self.called) { | ||
self.push(null); | ||
return cb(null); | ||
} | ||
setTimeout(function () { | ||
self.called = true; | ||
self.push('stream 2'); | ||
cb(null); | ||
}, 30); | ||
}, | ||
}); | ||
var s3 = through.obj(function (data, enc, next) { | ||
var self = this; | ||
setTimeout(function () { | ||
self.push(data); | ||
next(); | ||
}, 100) | ||
var s3 = new Readable({ | ||
read: function (cb) { | ||
var self = this; | ||
if (self.called) { | ||
self.push(null); | ||
return cb(null); | ||
} | ||
setTimeout(function () { | ||
self.called = true; | ||
self.push('stream 3'); | ||
cb(null); | ||
}, 100); | ||
}, | ||
}); | ||
var streams = new Ordered([s1, s2, s3]); | ||
streams.on('data', function (data) { | ||
var readable = ordered([s1, s2, s3]); | ||
readable.on('data', function (data) { | ||
console.log(data); | ||
}) | ||
// Logs: | ||
// stream 1 | ||
// stream 2 | ||
// stream 3 | ||
}); | ||
``` | ||
s1.write('stream 1'); | ||
s1.end(); | ||
## API | ||
s2.write('stream 2'); | ||
s2.end(); | ||
### `ordered(streams, [options])` | ||
s3.write('stream 3'); | ||
s3.end(); | ||
``` | ||
Ouput will be: | ||
Takes an array of `Readable` streams and produces a single `OrderedReadable` stream that will consume the provided streams in strict order. The produced `Readable` stream respects backpressure on itself and any provided streams. | ||
``` | ||
stream 1 | ||
stream 2 | ||
stream 3 | ||
``` | ||
#### `orderedReadable.addSource(stream)` | ||
## Licence | ||
The returned `Readable` stream has an `addSource` instance function that takes appends a `Readable` stream to the list of source streams that the `OrderedReadable` is reading from. | ||
## License | ||
MIT | ||
<!-- prettier-ignore-start --> | ||
[downloads-image]: https://img.shields.io/npm/dm/ordered-read-streams.svg?style=flat-square | ||
[npm-url]: https://www.npmjs.com/package/ordered-read-streams | ||
[npm-image]: https://img.shields.io/npm/v/ordered-read-streams.svg?style=flat-square | ||
[ci-url]: https://github.com/gulpjs/ordered-read-streams/actions?query=workflow:dev | ||
[ci-image]: https://img.shields.io/github/workflow/status/gulpjs/ordered-read-streams/dev?style=flat-square | ||
[coveralls-url]: https://coveralls.io/r/gulpjs/ordered-read-streams | ||
[coveralls-image]: https://img.shields.io/coveralls/gulpjs/ordered-read-streams/master.svg?style=flat-square | ||
<!-- prettier-ignore-end --> |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No repository
Supply chain riskPackage does not have a linked source code repository. Without this field, a package will have no reference to the location of the source code use to generate the package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
No repository
Supply chain riskPackage does not have a linked source code repository. Without this field, a package will have no reference to the location of the source code use to generate the package.
Found 1 instance in 1 package
7734
111
97
0
+ Addedstreamx@^2.12.5
+ Addedb4a@1.6.7(transitive)
+ Addedbare-events@2.5.0(transitive)
+ Addedfast-fifo@1.3.2(transitive)
+ Addedqueue-tick@1.0.1(transitive)
+ Addedstreamx@2.21.1(transitive)
+ Addedtext-decoder@1.2.3(transitive)
- Removedreadable-stream@^2.0.1
- Removedcore-util-is@1.0.3(transitive)
- Removedinherits@2.0.4(transitive)
- Removedisarray@1.0.0(transitive)
- Removedprocess-nextick-args@2.0.1(transitive)
- Removedreadable-stream@2.3.8(transitive)
- Removedsafe-buffer@5.1.2(transitive)
- Removedstring_decoder@1.1.1(transitive)
- Removedutil-deprecate@1.0.2(transitive)