stream-to-it
Advanced tools
Comparing version 0.2.1 to 0.2.2
{ | ||
"name": "stream-to-it", | ||
"version": "0.2.1", | ||
"version": "0.2.2", | ||
"description": "Convert Node.js streams to streaming iterables", | ||
@@ -32,2 +32,3 @@ "main": "index.js", | ||
"bl": "^4.0.0", | ||
"delay": "^4.3.0", | ||
"it-pipe": "^1.1.0", | ||
@@ -34,0 +35,0 @@ "nyc": "^15.1.0", |
33
sink.js
@@ -6,3 +6,3 @@ const getIterator = require('get-iterator') | ||
const endSource = (source) => { | ||
const maybeEndSource = (source) => { | ||
if (typeof source.return === 'function') source.return() | ||
@@ -16,4 +16,4 @@ } | ||
if (errCb) errCb(err) | ||
// When the writable errors, end the source to exit iteration early | ||
endSource(source) | ||
// When the writable errors, try to end the source to exit iteration early | ||
maybeEndSource(source) | ||
} | ||
@@ -49,6 +49,6 @@ | ||
const waitForDone = () => { | ||
// Immediately end the source | ||
endSource(source) | ||
// Immediately try to end the source | ||
maybeEndSource(source) | ||
return new Promise((resolve, reject) => { | ||
if (closed || finished) return resolve() | ||
if (closed || finished || error) return resolve() | ||
finishCb = closeCb = resolve | ||
@@ -72,3 +72,3 @@ errCb = reject | ||
for await (const value of source) { | ||
if (!writable.writable || writable.destroyed) break | ||
if (!writable.writable || writable.destroyed || error) break | ||
@@ -80,9 +80,14 @@ if (writable.write(value) === false) { | ||
} catch (err) { | ||
// The writable did not error, give it the error | ||
writable.destroy(err) | ||
// error is set by stream error handler so only destroy stream if source threw | ||
if (!error) { | ||
writable.destroy() | ||
} | ||
// could we be obscuring an error here? | ||
error = err | ||
} | ||
try { | ||
// Everything is good and we're done writing, end everything | ||
if (!error && writable.writable) { | ||
// We're done writing, end everything (n.b. stream may be destroyed at this point but then this is a no-op) | ||
if (writable.writable) { | ||
writable.end() | ||
@@ -93,2 +98,5 @@ } | ||
await waitForDone() | ||
// Notify the user an error occurred | ||
if (error) throw error | ||
} finally { | ||
@@ -98,5 +106,2 @@ // Clean up listeners | ||
} | ||
// Notify the user an error occurred | ||
if (error) throw error | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
9301
121
0
8