stream-to-it
Advanced tools
Comparing version 0.2.0 to 0.2.1
{ | ||
"name": "stream-to-it", | ||
"version": "0.2.0", | ||
"version": "0.2.1", | ||
"description": "Convert Node.js streams to streaming iterables", | ||
"main": "index.js", | ||
"files": [ | ||
"duplex.js", | ||
"sink.js", | ||
"source.js", | ||
"transform.js" | ||
], | ||
"scripts": { | ||
@@ -21,18 +27,13 @@ "test": "ava test/*.test.js --verbose", | ||
"dependencies": { | ||
"get-iterator": "^1.0.2", | ||
"p-defer": "^3.0.0" | ||
"get-iterator": "^1.0.2" | ||
}, | ||
"devDependencies": { | ||
"ava": "^2.2.0", | ||
"ava": "^3.10.1", | ||
"bl": "^4.0.0", | ||
"it-pipe": "^1.0.1", | ||
"nyc": "^14.1.1", | ||
"it-pipe": "^1.1.0", | ||
"nyc": "^15.1.0", | ||
"p-fifo": "^1.0.0", | ||
"standard": "^14.3.1", | ||
"streaming-iterables": "^4.1.0" | ||
"streaming-iterables": "^5.0.2" | ||
}, | ||
"ava": { | ||
"babel": false, | ||
"compileEnhancements": false | ||
}, | ||
"directories": { | ||
@@ -39,0 +40,0 @@ "test": "test" |
@@ -33,3 +33,3 @@ # stream-to-it | ||
Also works with browser [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream): | ||
Also works with browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream): | ||
@@ -84,7 +84,7 @@ ```js | ||
### `toIterable.source(stream): Function` | ||
### `toIterable.source(readable): Function` | ||
Convert a [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) stream to a [source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it) iterable. | ||
Convert a [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) stream or a browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) to a [source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it) iterable. | ||
### `toIterable.sink(sink): Function` | ||
### `toIterable.sink(writable): Function` | ||
@@ -91,0 +91,0 @@ Convert a [`Writable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_writable_streams) stream to a [sink](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#sink-it) iterable. |
159
sink.js
const getIterator = require('get-iterator') | ||
const defer = require('p-defer') | ||
@@ -7,107 +6,91 @@ module.exports = writable => async source => { | ||
const errPromise = defer() | ||
const closePromise = defer() | ||
const endingPromise = defer() | ||
const finishPromise = defer() | ||
let drainPromise | ||
const endSource = (source) => { | ||
if (typeof source.return === 'function') source.return() | ||
} | ||
const errorHandler = err => errPromise.reject(err) | ||
const closeHandler = () => closePromise.resolve({ closed: true }) | ||
const finishHandler = () => finishPromise.resolve({ finished: true }) | ||
const drainHandler = () => { | ||
if (drainPromise) drainPromise.resolve({ drained: true }) | ||
let error = null | ||
let errCb = null | ||
const errorHandler = (err) => { | ||
error = err | ||
if (errCb) errCb(err) | ||
// When the writable errors, end the source to exit iteration early | ||
endSource(source) | ||
} | ||
// There's no event to determine the start of a call to .end() | ||
const _end = writable.end.bind(writable) | ||
writable.end = (...args) => { | ||
endingPromise.resolve({ ending: true }) | ||
return _end(...args) | ||
let closeCb = null | ||
let closed = false | ||
const closeHandler = () => { | ||
closed = true | ||
if (closeCb) closeCb() | ||
} | ||
writable | ||
.on('error', errorHandler) | ||
.on('close', closeHandler) | ||
.on('finish', finishHandler) | ||
.on('drain', drainHandler) | ||
let finishCb = null | ||
let finished = false | ||
const finishHandler = () => { | ||
finished = true | ||
if (finishCb) finishCb() | ||
} | ||
const getNext = async () => { | ||
try { | ||
return source.next() | ||
} catch (err) { | ||
writable.destroy(err) | ||
return errPromise.promise | ||
} | ||
let drainCb = null | ||
const drainHandler = () => { | ||
if (drainCb) drainCb() | ||
} | ||
try { | ||
while (true) { | ||
// Race the iterator and the error, close and finish listener | ||
const result = await Promise.race([ | ||
errPromise.promise, | ||
closePromise.promise, | ||
endingPromise.promise, | ||
finishPromise.promise, | ||
getNext() | ||
]) | ||
const waitForDrainOrClose = () => { | ||
return new Promise((resolve, reject) => { | ||
closeCb = drainCb = resolve | ||
errCb = reject | ||
writable.once('drain', drainHandler) | ||
}) | ||
} | ||
if (result.closed || result.finished) { | ||
break | ||
} | ||
const waitForDone = () => { | ||
// Immediately end the source | ||
endSource(source) | ||
return new Promise((resolve, reject) => { | ||
if (closed || finished) return resolve() | ||
finishCb = closeCb = resolve | ||
errCb = reject | ||
}) | ||
} | ||
// .end() was called, waiting on flush (finish event) | ||
if (result.ending) { | ||
await Promise.race([ | ||
errPromise.promise, | ||
// TODO: do we need to wait on close? If slow end and destroy is | ||
// called then what is emitted? close or finish? | ||
closePromise.promise, | ||
finishPromise.promise | ||
]) | ||
break | ||
} | ||
const cleanup = () => { | ||
writable.removeListener('error', errorHandler) | ||
writable.removeListener('close', closeHandler) | ||
writable.removeListener('finish', finishHandler) | ||
writable.removeListener('drain', drainHandler) | ||
} | ||
// If destroyed, race err & close to determine reason & then throw/break | ||
if (writable.destroyed) { | ||
await Promise.race([ | ||
errPromise.promise, | ||
closePromise.promise | ||
]) | ||
break | ||
} | ||
writable.once('error', errorHandler) | ||
writable.once('close', closeHandler) | ||
writable.once('finish', finishHandler) | ||
if (result.done) { | ||
writable.end() | ||
await Promise.race([ | ||
errPromise.promise, | ||
// TODO: do we need to wait on close? If slow end and destroy is | ||
// called then what is emitted? close or finish? | ||
closePromise.promise, | ||
finishPromise.promise | ||
]) | ||
break | ||
} | ||
try { | ||
for await (const value of source) { | ||
if (!writable.writable || writable.destroyed) break | ||
if (!writable.write(result.value)) { | ||
drainPromise = defer() | ||
await Promise.race([ | ||
errPromise.promise, | ||
closePromise.promise, | ||
finishPromise.promise, | ||
drainPromise.promise | ||
]) | ||
if (writable.write(value) === false) { | ||
await waitForDrainOrClose() | ||
} | ||
} | ||
} finally { | ||
writable | ||
.removeListener('error', errorHandler) | ||
.removeListener('close', closeHandler) | ||
.removeListener('finish', finishHandler) | ||
.removeListener('drain', drainHandler) | ||
} catch (err) { | ||
// The writable did not error, give it the error | ||
writable.destroy(err) | ||
} | ||
// End the iterator if it is a generator | ||
if (typeof source.return === 'function') { | ||
await source.return() | ||
try { | ||
// Everything is good and we're done writing, end everything | ||
if (!error && writable.writable) { | ||
writable.end() | ||
} | ||
// Wait until we close or finish. This supports halfClosed streams | ||
await waitForDone() | ||
} finally { | ||
// Clean up listeners | ||
cleanup() | ||
} | ||
// Notify the user an error occurred | ||
if (error) throw error | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1
9076
8
117
1
- Removedp-defer@^3.0.0
- Removedp-defer@3.0.0(transitive)