readable-stream
Advanced tools
Comparing version 4.3.0 to 4.4.0
'use strict' | ||
const { AbortError, codes } = require('../../ours/errors') | ||
const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils') | ||
const eos = require('./end-of-stream') | ||
@@ -15,9 +16,6 @@ const { ERR_INVALID_ARG_TYPE } = codes | ||
} | ||
function isNodeStream(obj) { | ||
return !!(obj && typeof obj.pipe === 'function') | ||
} | ||
module.exports.addAbortSignal = function addAbortSignal(signal, stream) { | ||
validateAbortSignal(signal, 'signal') | ||
if (!isNodeStream(stream)) { | ||
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream) | ||
if (!isNodeStream(stream) && !isWebStream(stream)) { | ||
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream) | ||
} | ||
@@ -30,9 +28,17 @@ return module.exports.addAbortSignalNoValidate(signal, stream) | ||
} | ||
const onAbort = () => { | ||
stream.destroy( | ||
new AbortError(undefined, { | ||
cause: signal.reason | ||
}) | ||
) | ||
} | ||
const onAbort = isNodeStream(stream) | ||
? () => { | ||
stream.destroy( | ||
new AbortError(undefined, { | ||
cause: signal.reason | ||
}) | ||
) | ||
} | ||
: () => { | ||
stream[kControllerErrorFunction]( | ||
new AbortError(undefined, { | ||
cause: signal.reason | ||
}) | ||
) | ||
} | ||
if (signal.aborted) { | ||
@@ -39,0 +45,0 @@ onAbort() |
@@ -6,7 +6,16 @@ 'use strict' | ||
const { destroyer } = require('./destroy') | ||
const { isNodeStream, isReadable, isWritable } = require('./utils') | ||
const { | ||
isNodeStream, | ||
isReadable, | ||
isWritable, | ||
isWebStream, | ||
isTransformStream, | ||
isWritableStream, | ||
isReadableStream | ||
} = require('./utils') | ||
const { | ||
AbortError, | ||
codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS } | ||
} = require('../../ours/errors') | ||
const eos = require('./end-of-stream') | ||
module.exports = function compose(...streams) { | ||
@@ -28,10 +37,13 @@ if (streams.length === 0) { | ||
for (let n = 0; n < streams.length; ++n) { | ||
if (!isNodeStream(streams[n])) { | ||
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { | ||
// TODO(ronag): Add checks for non streams. | ||
continue | ||
} | ||
if (n < streams.length - 1 && !isReadable(streams[n])) { | ||
if ( | ||
n < streams.length - 1 && | ||
!(isReadable(streams[n]) || isReadableStream(streams[n]) || isTransformStream(streams[n])) | ||
) { | ||
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable') | ||
} | ||
if (n > 0 && !isWritable(streams[n])) { | ||
if (n > 0 && !(isWritable(streams[n]) || isWritableStream(streams[n]) || isTransformStream(streams[n]))) { | ||
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable') | ||
@@ -58,4 +70,4 @@ } | ||
const tail = pipeline(streams, onfinished) | ||
const writable = !!isWritable(head) | ||
const readable = !!isReadable(tail) | ||
const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head)) | ||
const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail)) | ||
@@ -73,21 +85,45 @@ // TODO(ronag): Avoid double buffering. | ||
if (writable) { | ||
d._write = function (chunk, encoding, callback) { | ||
if (head.write(chunk, encoding)) { | ||
callback() | ||
} else { | ||
ondrain = callback | ||
if (isNodeStream(head)) { | ||
d._write = function (chunk, encoding, callback) { | ||
if (head.write(chunk, encoding)) { | ||
callback() | ||
} else { | ||
ondrain = callback | ||
} | ||
} | ||
d._final = function (callback) { | ||
head.end() | ||
onfinish = callback | ||
} | ||
head.on('drain', function () { | ||
if (ondrain) { | ||
const cb = ondrain | ||
ondrain = null | ||
cb() | ||
} | ||
}) | ||
} else if (isWebStream(head)) { | ||
const writable = isTransformStream(head) ? head.writable : head | ||
const writer = writable.getWriter() | ||
d._write = async function (chunk, encoding, callback) { | ||
try { | ||
await writer.ready | ||
writer.write(chunk).catch(() => {}) | ||
callback() | ||
} catch (err) { | ||
callback(err) | ||
} | ||
} | ||
d._final = async function (callback) { | ||
try { | ||
await writer.ready | ||
writer.close().catch(() => {}) | ||
onfinish = callback | ||
} catch (err) { | ||
callback(err) | ||
} | ||
} | ||
} | ||
d._final = function (callback) { | ||
head.end() | ||
onfinish = callback | ||
} | ||
head.on('drain', function () { | ||
if (ondrain) { | ||
const cb = ondrain | ||
ondrain = null | ||
cb() | ||
} | ||
}) | ||
tail.on('finish', function () { | ||
const toRead = isTransformStream(tail) ? tail.readable : tail | ||
eos(toRead, () => { | ||
if (onfinish) { | ||
@@ -101,23 +137,44 @@ const cb = onfinish | ||
if (readable) { | ||
tail.on('readable', function () { | ||
if (onreadable) { | ||
const cb = onreadable | ||
onreadable = null | ||
cb() | ||
} | ||
}) | ||
tail.on('end', function () { | ||
d.push(null) | ||
}) | ||
d._read = function () { | ||
while (true) { | ||
const buf = tail.read() | ||
if (buf === null) { | ||
onreadable = d._read | ||
return | ||
if (isNodeStream(tail)) { | ||
tail.on('readable', function () { | ||
if (onreadable) { | ||
const cb = onreadable | ||
onreadable = null | ||
cb() | ||
} | ||
if (!d.push(buf)) { | ||
return | ||
}) | ||
tail.on('end', function () { | ||
d.push(null) | ||
}) | ||
d._read = function () { | ||
while (true) { | ||
const buf = tail.read() | ||
if (buf === null) { | ||
onreadable = d._read | ||
return | ||
} | ||
if (!d.push(buf)) { | ||
return | ||
} | ||
} | ||
} | ||
} else if (isWebStream(tail)) { | ||
const readable = isTransformStream(tail) ? tail.readable : tail | ||
const reader = readable.getReader() | ||
d._read = async function () { | ||
while (true) { | ||
try { | ||
const { value, done } = await reader.read() | ||
if (!d.push(value)) { | ||
return | ||
} | ||
if (done) { | ||
d.push(null) | ||
return | ||
} | ||
} catch { | ||
return | ||
} | ||
} | ||
} | ||
} | ||
@@ -136,3 +193,5 @@ } | ||
onclose = callback | ||
destroyer(tail, err) | ||
if (isNodeStream(tail)) { | ||
destroyer(tail, err) | ||
} | ||
} | ||
@@ -139,0 +198,0 @@ } |
@@ -39,3 +39,3 @@ 'use strict' | ||
const s = w || r | ||
if ((w && w.destroyed) || (r && r.destroyed)) { | ||
if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) { | ||
if (typeof cb === 'function') { | ||
@@ -111,3 +111,3 @@ cb() | ||
} | ||
if ((w && w.emitClose) || (r && r.emitClose)) { | ||
if ((w !== null && w !== undefined && w.emitClose) || (r !== null && r !== undefined && r.emitClose)) { | ||
self.emit('close') | ||
@@ -119,3 +119,3 @@ } | ||
const w = self._writableState | ||
if ((w && w.errorEmitted) || (r && r.errorEmitted)) { | ||
if ((w !== null && w !== undefined && w.errorEmitted) || (r !== null && r !== undefined && r.errorEmitted)) { | ||
return | ||
@@ -168,6 +168,7 @@ } | ||
const w = stream._writableState | ||
if ((w && w.destroyed) || (r && r.destroyed)) { | ||
if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) { | ||
return this | ||
} | ||
if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err) | ||
if ((r !== null && r !== undefined && r.autoDestroy) || (w !== null && w !== undefined && w.autoDestroy)) | ||
stream.destroy(err) | ||
else if (err) { | ||
@@ -235,5 +236,7 @@ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 | ||
try { | ||
stream._construct(onConstruct) | ||
stream._construct((err) => { | ||
process.nextTick(onConstruct, err) | ||
}) | ||
} catch (err) { | ||
onConstruct(err) | ||
process.nextTick(onConstruct, err) | ||
} | ||
@@ -245,3 +248,3 @@ } | ||
function isRequest(stream) { | ||
return stream && stream.setHeader && typeof stream.abort === 'function' | ||
return (stream === null || stream === undefined ? undefined : stream.setHeader) && typeof stream.abort === 'function' | ||
} | ||
@@ -248,0 +251,0 @@ function emitCloseLegacy(stream) { |
@@ -285,4 +285,2 @@ /* replacement start */ | ||
d.destroy(err) | ||
} else if (!readable && !writable) { | ||
d.destroy() | ||
} | ||
@@ -289,0 +287,0 @@ } |
@@ -13,4 +13,4 @@ /* replacement start */ | ||
const { kEmptyObject, once } = require('../../ours/util') | ||
const { validateAbortSignal, validateFunction, validateObject } = require('../validators') | ||
const { Promise } = require('../../ours/primordials') | ||
const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators') | ||
const { Promise, PromisePrototypeThen } = require('../../ours/primordials') | ||
const { | ||
@@ -20,2 +20,3 @@ isClosed, | ||
isReadableNodeStream, | ||
isReadableStream, | ||
isReadableFinished, | ||
@@ -25,6 +26,8 @@ isReadableErrored, | ||
isWritableNodeStream, | ||
isWritableStream, | ||
isWritableFinished, | ||
isWritableErrored, | ||
isNodeStream, | ||
willEmitClose: _willEmitClose | ||
willEmitClose: _willEmitClose, | ||
kIsClosedPromise | ||
} = require('./utils') | ||
@@ -48,2 +51,8 @@ function isRequest(stream) { | ||
callback = once(callback) | ||
if (isReadableStream(stream) || isWritableStream(stream)) { | ||
return eosWeb(stream, options, callback) | ||
} | ||
if (!isNodeStream(stream)) { | ||
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream) | ||
} | ||
const readable = | ||
@@ -57,6 +66,2 @@ (_options$readable = options.readable) !== null && _options$readable !== undefined | ||
: isWritableNodeStream(stream) | ||
if (!isNodeStream(stream)) { | ||
// TODO: Webstreams. | ||
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream) | ||
} | ||
const wState = stream._writableState | ||
@@ -125,2 +130,10 @@ const rState = stream._readableState | ||
} | ||
const onclosed = () => { | ||
closed = true | ||
const errored = isWritableErrored(stream) || isReadableErrored(stream) | ||
if (errored && typeof errored !== 'boolean') { | ||
return callback.call(stream, errored) | ||
} | ||
callback.call(stream) | ||
} | ||
const onrequest = () => { | ||
@@ -162,3 +175,3 @@ stream.req.on('finish', onfinish) | ||
if (!willEmitClose) { | ||
process.nextTick(onclose) | ||
process.nextTick(onclosed) | ||
} | ||
@@ -170,3 +183,3 @@ } else if ( | ||
) { | ||
process.nextTick(onclose) | ||
process.nextTick(onclosed) | ||
} else if ( | ||
@@ -177,5 +190,5 @@ !writable && | ||
) { | ||
process.nextTick(onclose) | ||
process.nextTick(onclosed) | ||
} else if (rState && stream.req && stream.aborted) { | ||
process.nextTick(onclose) | ||
process.nextTick(onclosed) | ||
} | ||
@@ -221,5 +234,49 @@ const cleanup = () => { | ||
} | ||
function eosWeb(stream, options, callback) { | ||
let isAborted = false | ||
let abort = nop | ||
if (options.signal) { | ||
abort = () => { | ||
isAborted = true | ||
callback.call( | ||
stream, | ||
new AbortError(undefined, { | ||
cause: options.signal.reason | ||
}) | ||
) | ||
} | ||
if (options.signal.aborted) { | ||
process.nextTick(abort) | ||
} else { | ||
const originalCallback = callback | ||
callback = once((...args) => { | ||
options.signal.removeEventListener('abort', abort) | ||
originalCallback.apply(stream, args) | ||
}) | ||
options.signal.addEventListener('abort', abort) | ||
} | ||
} | ||
const resolverFn = (...args) => { | ||
if (!isAborted) { | ||
process.nextTick(() => callback.apply(stream, args)) | ||
} | ||
} | ||
PromisePrototypeThen(stream[kIsClosedPromise].promise, resolverFn, resolverFn) | ||
return nop | ||
} | ||
function finished(stream, opts) { | ||
var _opts | ||
let autoCleanup = false | ||
if (opts === null) { | ||
opts = kEmptyObject | ||
} | ||
if ((_opts = opts) !== null && _opts !== undefined && _opts.cleanup) { | ||
validateBoolean(opts.cleanup, 'cleanup') | ||
autoCleanup = opts.cleanup | ||
} | ||
return new Promise((resolve, reject) => { | ||
eos(stream, opts, (err) => { | ||
const cleanup = eos(stream, opts, (err) => { | ||
if (autoCleanup) { | ||
cleanup() | ||
} | ||
if (err) { | ||
@@ -226,0 +283,0 @@ reject(err) |
@@ -5,3 +5,3 @@ 'use strict' | ||
const { | ||
codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, | ||
codes: { ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, | ||
AbortError | ||
@@ -12,2 +12,5 @@ } = require('../../ours/errors') | ||
const { finished } = require('./end-of-stream') | ||
const staticCompose = require('./compose') | ||
const { addAbortSignalNoValidate } = require('./add-abort-signal') | ||
const { isWritable, isNodeStream } = require('./utils') | ||
const { | ||
@@ -25,2 +28,19 @@ ArrayPrototypePush, | ||
const kEof = Symbol('kEof') | ||
function compose(stream, options) { | ||
if (options != null) { | ||
validateObject(options, 'options') | ||
} | ||
if ((options === null || options === undefined ? undefined : options.signal) != null) { | ||
validateAbortSignal(options.signal, 'options.signal') | ||
} | ||
if (isNodeStream(stream) && !isWritable(stream)) { | ||
throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable') | ||
} | ||
const composedStream = staticCompose(this, stream) | ||
if (options !== null && options !== undefined && options.signal) { | ||
// Not validating as we already validated before | ||
addAbortSignalNoValidate(options.signal, composedStream) | ||
} | ||
return composedStream | ||
} | ||
function map(fn, options) { | ||
@@ -430,3 +450,4 @@ if (typeof fn !== 'function') { | ||
map, | ||
take | ||
take, | ||
compose | ||
} | ||
@@ -433,0 +454,0 @@ module.exports.promiseReturningOperators = { |
@@ -27,3 +27,12 @@ /* replacement start */ | ||
const { validateFunction, validateAbortSignal } = require('../validators') | ||
const { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require('./utils') | ||
const { | ||
isIterable, | ||
isReadable, | ||
isReadableNodeStream, | ||
isNodeStream, | ||
isTransformStream, | ||
isWebStream, | ||
isReadableStream, | ||
isReadableEnded | ||
} = require('./utils') | ||
const AbortController = globalThis.AbortController || require('abort-controller').AbortController | ||
@@ -78,3 +87,3 @@ let PassThrough | ||
} | ||
async function pump(iterable, writable, finish, { end }) { | ||
async function pumpToNode(iterable, writable, finish, { end }) { | ||
let error | ||
@@ -135,2 +144,27 @@ let onresolve = null | ||
} | ||
async function pumpToWeb(readable, writable, finish, { end }) { | ||
if (isTransformStream(writable)) { | ||
writable = writable.writable | ||
} | ||
// https://streams.spec.whatwg.org/#example-manual-write-with-backpressure | ||
const writer = writable.getWriter() | ||
try { | ||
for await (const chunk of readable) { | ||
await writer.ready | ||
writer.write(chunk).catch(() => {}) | ||
} | ||
await writer.ready | ||
if (end) { | ||
await writer.close() | ||
} | ||
finish() | ||
} catch (err) { | ||
try { | ||
await writer.abort(err) | ||
finish(err) | ||
} catch (err) { | ||
finish(err) | ||
} | ||
} | ||
} | ||
function pipeline(...streams) { | ||
@@ -221,3 +255,3 @@ return pipelineImpl(streams, once(popCallback(streams))) | ||
} | ||
} else if (isIterable(stream) || isReadableNodeStream(stream)) { | ||
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { | ||
ret = stream | ||
@@ -228,3 +262,8 @@ } else { | ||
} else if (typeof stream === 'function') { | ||
ret = makeAsyncIterable(ret) | ||
if (isTransformStream(ret)) { | ||
var _ret | ||
ret = makeAsyncIterable((_ret = ret) === null || _ret === undefined ? undefined : _ret.readable) | ||
} else { | ||
ret = makeAsyncIterable(ret) | ||
} | ||
ret = stream(ret, { | ||
@@ -238,3 +277,3 @@ signal | ||
} else { | ||
var _ret | ||
var _ret2 | ||
if (!PassThrough) { | ||
@@ -255,3 +294,3 @@ PassThrough = require('./passthrough') | ||
// second use. | ||
const then = (_ret = ret) === null || _ret === undefined ? undefined : _ret.then | ||
const then = (_ret2 = ret) === null || _ret2 === undefined ? undefined : _ret2.then | ||
if (typeof then === 'function') { | ||
@@ -278,5 +317,11 @@ finishCount++ | ||
finishCount++ | ||
pump(ret, pt, finish, { | ||
pumpToNode(ret, pt, finish, { | ||
end | ||
}) | ||
} else if (isReadableStream(ret) || isTransformStream(ret)) { | ||
const toRead = ret.readable || ret | ||
finishCount++ | ||
pumpToNode(toRead, pt, finish, { | ||
end | ||
}) | ||
} else { | ||
@@ -301,11 +346,45 @@ throw new ERR_INVALID_RETURN_VALUE('AsyncIterable or Promise', 'destination', ret) | ||
} | ||
} else if (isTransformStream(ret) || isReadableStream(ret)) { | ||
const toRead = ret.readable || ret | ||
finishCount++ | ||
pumpToNode(toRead, stream, finish, { | ||
end | ||
}) | ||
} else if (isIterable(ret)) { | ||
finishCount++ | ||
pump(ret, stream, finish, { | ||
pumpToNode(ret, stream, finish, { | ||
end | ||
}) | ||
} else { | ||
throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], ret) | ||
throw new ERR_INVALID_ARG_TYPE( | ||
'val', | ||
['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], | ||
ret | ||
) | ||
} | ||
ret = stream | ||
} else if (isWebStream(stream)) { | ||
if (isReadableNodeStream(ret)) { | ||
finishCount++ | ||
pumpToWeb(makeAsyncIterable(ret), stream, finish, { | ||
end | ||
}) | ||
} else if (isReadableStream(ret) || isIterable(ret)) { | ||
finishCount++ | ||
pumpToWeb(ret, stream, finish, { | ||
end | ||
}) | ||
} else if (isTransformStream(ret)) { | ||
finishCount++ | ||
pumpToWeb(ret.readable, stream, finish, { | ||
end | ||
}) | ||
} else { | ||
throw new ERR_INVALID_ARG_TYPE( | ||
'val', | ||
['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], | ||
ret | ||
) | ||
} | ||
ret = stream | ||
} else { | ||
@@ -332,4 +411,5 @@ ret = Duplex.from(stream) | ||
src.pipe(dst, { | ||
end | ||
}) | ||
end: false | ||
}) // If end is true we already will have a listener to end dst. | ||
if (end) { | ||
@@ -339,6 +419,13 @@ // Compat. Before node v10.12.0 stdio used to throw an error so | ||
// Now they allow it but "secretly" don't close the underlying fd. | ||
src.once('end', () => { | ||
function endFn() { | ||
ended = true | ||
dst.end() | ||
}) | ||
} | ||
if (isReadableEnded(src)) { | ||
// End the destination if the source has already ended. | ||
process.nextTick(endFn) | ||
} else { | ||
src.once('end', endFn) | ||
} | ||
} else { | ||
@@ -345,0 +432,0 @@ finish() |
'use strict' | ||
const { Symbol, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials') | ||
const { Symbol, SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials') | ||
const kDestroyed = Symbol('kDestroyed') | ||
@@ -8,2 +8,4 @@ const kIsErrored = Symbol('kIsErrored') | ||
const kIsDisturbed = Symbol('kIsDisturbed') | ||
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise') | ||
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction') | ||
function isReadableNodeStream(obj, strict = false) { | ||
@@ -60,2 +62,20 @@ var _obj$_readableState | ||
} | ||
function isReadableStream(obj) { | ||
return !!( | ||
obj && | ||
!isNodeStream(obj) && | ||
typeof obj.pipeThrough === 'function' && | ||
typeof obj.getReader === 'function' && | ||
typeof obj.cancel === 'function' | ||
) | ||
} | ||
function isWritableStream(obj) { | ||
return !!(obj && !isNodeStream(obj) && typeof obj.getWriter === 'function' && typeof obj.abort === 'function') | ||
} | ||
function isTransformStream(obj) { | ||
return !!(obj && !isNodeStream(obj) && typeof obj.readable === 'object' && typeof obj.writable === 'object') | ||
} | ||
function isWebStream(obj) { | ||
return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj) | ||
} | ||
function isIterable(obj, isAsync) { | ||
@@ -279,2 +299,4 @@ if (obj == null) return false | ||
kIsReadable, | ||
kIsClosedPromise, | ||
kControllerErrorFunction, | ||
isClosed, | ||
@@ -286,2 +308,3 @@ isDestroyed, | ||
isReadableNodeStream, | ||
isReadableStream, | ||
isReadableEnded, | ||
@@ -291,4 +314,6 @@ isReadableFinished, | ||
isNodeStream, | ||
isWebStream, | ||
isWritable, | ||
isWritableNodeStream, | ||
isWritableStream, | ||
isWritableEnded, | ||
@@ -299,3 +324,4 @@ isWritableFinished, | ||
isServerResponse, | ||
willEmitClose | ||
willEmitClose, | ||
isTransformStream | ||
} |
@@ -0,1 +1,3 @@ | ||
/* eslint jsdoc/require-jsdoc: "error" */ | ||
'use strict' | ||
@@ -202,2 +204,9 @@ | ||
} | ||
/** | ||
* @param {any} options | ||
* @param {string} key | ||
* @param {boolean} defaultValue | ||
* @returns {boolean} | ||
*/ | ||
function getOwnPropertyValueOrDefault(options, key, defaultValue) { | ||
@@ -233,2 +242,20 @@ return options == null || !ObjectPrototypeHasOwnProperty(options, key) ? defaultValue : options[key] | ||
/** | ||
* @callback validateDictionary - We are using the Web IDL Standard definition | ||
* of "dictionary" here, which means any value | ||
* whose Type is either Undefined, Null, or | ||
* Object (which includes functions). | ||
* @param {*} value | ||
* @param {string} name | ||
* @see https://webidl.spec.whatwg.org/#es-dictionary | ||
* @see https://tc39.es/ecma262/#table-typeof-operator-results | ||
*/ | ||
/** @type {validateDictionary} */ | ||
const validateDictionary = hideStackFrames((value, name) => { | ||
if (value != null && typeof value !== 'object' && typeof value !== 'function') { | ||
throw new ERR_INVALID_ARG_TYPE(name, 'a dictionary', value) | ||
} | ||
}) | ||
/** | ||
* @callback validateArray | ||
@@ -252,4 +279,33 @@ * @param {*} value | ||
// eslint-disable-next-line jsdoc/require-returns-check | ||
/** | ||
* @callback validateStringArray | ||
* @param {*} value | ||
* @param {string} name | ||
* @returns {asserts value is string[]} | ||
*/ | ||
/** @type {validateStringArray} */ | ||
function validateStringArray(value, name) { | ||
validateArray(value, name) | ||
for (let i = 0; i < value.length; i++) { | ||
validateString(value[i], `${name}[${i}]`) | ||
} | ||
} | ||
/** | ||
* @callback validateBooleanArray | ||
* @param {*} value | ||
* @param {string} name | ||
* @returns {asserts value is boolean[]} | ||
*/ | ||
/** @type {validateBooleanArray} */ | ||
function validateBooleanArray(value, name) { | ||
validateArray(value, name) | ||
for (let i = 0; i < value.length; i++) { | ||
validateBoolean(value[i], `${name}[${i}]`) | ||
} | ||
} | ||
/** | ||
* @param {*} signal | ||
@@ -376,2 +432,57 @@ * @param {string} [name='signal'] | ||
} | ||
/* | ||
The rules for the Link header field are described here: | ||
https://www.rfc-editor.org/rfc/rfc8288.html#section-3 | ||
This regex validates any string surrounded by angle brackets | ||
(not necessarily a valid URI reference) followed by zero or more | ||
link-params separated by semicolons. | ||
*/ | ||
const linkValueRegExp = /^(?:<[^>]*>)(?:\s*;\s*[^;"\s]+(?:=(")?[^;"\s]*\1)?)*$/ | ||
/** | ||
* @param {any} value | ||
* @param {string} name | ||
*/ | ||
function validateLinkHeaderFormat(value, name) { | ||
if (typeof value === 'undefined' || !RegExpPrototypeExec(linkValueRegExp, value)) { | ||
throw new ERR_INVALID_ARG_VALUE( | ||
name, | ||
value, | ||
'must be an array or string of format "</styles.css>; rel=preload; as=style"' | ||
) | ||
} | ||
} | ||
/** | ||
* @param {any} hints | ||
* @return {string} | ||
*/ | ||
function validateLinkHeaderValue(hints) { | ||
if (typeof hints === 'string') { | ||
validateLinkHeaderFormat(hints, 'hints') | ||
return hints | ||
} else if (ArrayIsArray(hints)) { | ||
const hintsLength = hints.length | ||
let result = '' | ||
if (hintsLength === 0) { | ||
return result | ||
} | ||
for (let i = 0; i < hintsLength; i++) { | ||
const link = hints[i] | ||
validateLinkHeaderFormat(link, 'hints') | ||
result += link | ||
if (i !== hintsLength - 1) { | ||
result += ', ' | ||
} | ||
} | ||
return result | ||
} | ||
throw new ERR_INVALID_ARG_VALUE( | ||
'hints', | ||
hints, | ||
'must be an array or string of format "</styles.css>; rel=preload; as=style"' | ||
) | ||
} | ||
module.exports = { | ||
@@ -382,4 +493,7 @@ isInt32, | ||
validateArray, | ||
validateStringArray, | ||
validateBooleanArray, | ||
validateBoolean, | ||
validateBuffer, | ||
validateDictionary, | ||
validateEncoding, | ||
@@ -399,3 +513,4 @@ validateFunction, | ||
validateUnion, | ||
validateAbortSignal | ||
validateAbortSignal, | ||
validateLinkHeaderValue | ||
} |
@@ -93,2 +93,3 @@ 'use strict' | ||
Symbol, | ||
SymbolFor: Symbol.for, | ||
SymbolAsyncIterator: Symbol.asyncIterator, | ||
@@ -95,0 +96,0 @@ SymbolHasInstance: Symbol.hasInstance, |
'use strict' | ||
const { ArrayPrototypePop, Promise } = require('../ours/primordials') | ||
const { isIterable, isNodeStream } = require('../internal/streams/utils') | ||
const { isIterable, isNodeStream, isWebStream } = require('../internal/streams/utils') | ||
const { pipelineImpl: pl } = require('../internal/streams/pipeline') | ||
const { finished } = require('../internal/streams/end-of-stream') | ||
require('stream') | ||
function pipeline(...streams) { | ||
@@ -12,3 +13,9 @@ return new Promise((resolve, reject) => { | ||
const lastArg = streams[streams.length - 1] | ||
if (lastArg && typeof lastArg === 'object' && !isNodeStream(lastArg) && !isIterable(lastArg)) { | ||
if ( | ||
lastArg && | ||
typeof lastArg === 'object' && | ||
!isNodeStream(lastArg) && | ||
!isIterable(lastArg) && | ||
!isWebStream(lastArg) | ||
) { | ||
const options = ArrayPrototypePop(streams) | ||
@@ -15,0 +22,0 @@ signal = options.signal |
{ | ||
"name": "readable-stream", | ||
"version": "4.3.0", | ||
"version": "4.4.0", | ||
"description": "Node.js Streams, a user-land copy of the stream library from Node.js", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/nodejs/readable-stream", |
@@ -14,5 +14,5 @@ # readable-stream | ||
This package is a mirror of the streams implementations in Node.js 18.9.0. | ||
This package is a mirror of the streams implementations in Node.js 18.16.0. | ||
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.9.0/docs/api/stream.html). | ||
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.16.0/docs/api/stream.html). | ||
@@ -19,0 +19,0 @@ If you want to guarantee a stable streams base, regardless of what version of |
210127
6394