readable-stream
Advanced tools
Comparing version 4.4.2 to 4.5.0
'use strict' | ||
const { SymbolDispose } = require('../../ours/primordials') | ||
const { AbortError, codes } = require('../../ours/errors') | ||
@@ -7,2 +8,3 @@ const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils') | ||
const { ERR_INVALID_ARG_TYPE } = codes | ||
let addAbortListener | ||
@@ -46,6 +48,7 @@ // This method is inlined here for readable-stream | ||
} else { | ||
signal.addEventListener('abort', onAbort) | ||
eos(stream, () => signal.removeEventListener('abort', onAbort)) | ||
addAbortListener = addAbortListener || require('../../ours/util').addAbortListener | ||
const disposable = addAbortListener(signal, onAbort) | ||
eos(stream, disposable[SymbolDispose]) | ||
} | ||
return stream | ||
} |
@@ -77,3 +77,3 @@ 'use strict' | ||
writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode), | ||
readableObjectMode: !!(tail !== null && tail !== undefined && tail.writableObjectMode), | ||
readableObjectMode: !!(tail !== null && tail !== undefined && tail.readableObjectMode), | ||
writable, | ||
@@ -80,0 +80,0 @@ readable |
@@ -15,3 +15,3 @@ 'use strict' | ||
const { Symbol } = require('../../ours/primordials') | ||
const { kDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils') | ||
const { kIsDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils') | ||
const kDestroy = Symbol('kDestroy') | ||
@@ -282,3 +282,3 @@ const kConstruct = Symbol('kConstruct') | ||
if (!stream.destroyed) { | ||
stream[kDestroyed] = true | ||
stream[kIsDestroyed] = true | ||
} | ||
@@ -285,0 +285,0 @@ } |
@@ -16,3 +16,5 @@ /* replacement start */ | ||
isWritableNodeStream, | ||
isDuplexNodeStream | ||
isDuplexNodeStream, | ||
isReadableStream, | ||
isWritableStream | ||
} = require('./utils') | ||
@@ -27,2 +29,3 @@ const eos = require('./end-of-stream') | ||
const Readable = require('./readable') | ||
const Writable = require('./writable') | ||
const { createDeferredPromise } = require('../../ours/util') | ||
@@ -82,13 +85,12 @@ const from = require('./from') | ||
} | ||
// TODO: Webstreams | ||
// if (isReadableStream(body)) { | ||
// return _duplexify({ readable: Readable.fromWeb(body) }); | ||
// } | ||
// TODO: Webstreams | ||
// if (isWritableStream(body)) { | ||
// return _duplexify({ writable: Writable.fromWeb(body) }); | ||
// } | ||
if (isReadableStream(body)) { | ||
return _duplexify({ | ||
readable: Readable.fromWeb(body) | ||
}) | ||
} | ||
if (isWritableStream(body)) { | ||
return _duplexify({ | ||
writable: Writable.fromWeb(body) | ||
}) | ||
} | ||
if (typeof body === 'function') { | ||
@@ -150,12 +152,9 @@ const { value, write, final, destroy } = fromAsyncGen(body) | ||
} | ||
// TODO: Webstreams. | ||
// if ( | ||
// isReadableStream(body?.readable) && | ||
// isWritableStream(body?.writable) | ||
// ) { | ||
// return Duplexify.fromWeb(body); | ||
// } | ||
if ( | ||
isReadableStream(body === null || body === undefined ? undefined : body.readable) && | ||
isWritableStream(body === null || body === undefined ? undefined : body.writable) | ||
) { | ||
return Duplexify.fromWeb(body) | ||
} | ||
if ( | ||
typeof (body === null || body === undefined ? undefined : body.writable) === 'object' || | ||
@@ -162,0 +161,0 @@ typeof (body === null || body === undefined ? undefined : body.readable) === 'object' |
@@ -14,3 +14,3 @@ /* replacement start */ | ||
const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators') | ||
const { Promise, PromisePrototypeThen } = require('../../ours/primordials') | ||
const { Promise, PromisePrototypeThen, SymbolDispose } = require('../../ours/primordials') | ||
const { | ||
@@ -32,2 +32,3 @@ isClosed, | ||
} = require('./utils') | ||
let addAbortListener | ||
function isRequest(stream) { | ||
@@ -217,8 +218,9 @@ return stream.setHeader && typeof stream.abort === 'function' | ||
} else { | ||
addAbortListener = addAbortListener || require('../../ours/util').addAbortListener | ||
const disposable = addAbortListener(options.signal, abort) | ||
const originalCallback = callback | ||
callback = once((...args) => { | ||
options.signal.removeEventListener('abort', abort) | ||
disposable[SymbolDispose]() | ||
originalCallback.apply(stream, args) | ||
}) | ||
options.signal.addEventListener('abort', abort) | ||
} | ||
@@ -244,8 +246,9 @@ } | ||
} else { | ||
addAbortListener = addAbortListener || require('../../ours/util').addAbortListener | ||
const disposable = addAbortListener(options.signal, abort) | ||
const originalCallback = callback | ||
callback = once((...args) => { | ||
options.signal.removeEventListener('abort', abort) | ||
disposable[SymbolDispose]() | ||
originalCallback.apply(stream, args) | ||
}) | ||
options.signal.addEventListener('abort', abort) | ||
} | ||
@@ -252,0 +255,0 @@ } |
@@ -10,2 +10,3 @@ 'use strict' | ||
const kWeakHandler = require('../../ours/primordials').Symbol('kWeak') | ||
const kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation') | ||
const { finished } = require('./end-of-stream') | ||
@@ -15,4 +16,6 @@ const staticCompose = require('./compose') | ||
const { isWritable, isNodeStream } = require('./utils') | ||
const { deprecate } = require('../../ours/util') | ||
const { | ||
ArrayPrototypePush, | ||
Boolean, | ||
MathFloor, | ||
@@ -23,2 +26,3 @@ Number, | ||
PromiseReject, | ||
PromiseResolve, | ||
PromisePrototypeThen, | ||
@@ -60,37 +64,39 @@ Symbol | ||
} | ||
validateInteger(concurrency, 'concurrency', 1) | ||
let highWaterMark = concurrency - 1 | ||
if ((options === null || options === undefined ? undefined : options.highWaterMark) != null) { | ||
highWaterMark = MathFloor(options.highWaterMark) | ||
} | ||
validateInteger(concurrency, 'options.concurrency', 1) | ||
validateInteger(highWaterMark, 'options.highWaterMark', 0) | ||
highWaterMark += concurrency | ||
return async function* map() { | ||
var _options$signal, _options$signal2 | ||
const ac = new AbortController() | ||
const signal = require('../../ours/util').AbortSignalAny( | ||
[options === null || options === undefined ? undefined : options.signal].filter(Boolean) | ||
) | ||
const stream = this | ||
const queue = [] | ||
const signal = ac.signal | ||
const signalOpt = { | ||
signal | ||
} | ||
const abort = () => ac.abort() | ||
if ( | ||
options !== null && | ||
options !== undefined && | ||
(_options$signal = options.signal) !== null && | ||
_options$signal !== undefined && | ||
_options$signal.aborted | ||
) { | ||
abort() | ||
} | ||
options === null || options === undefined | ||
? undefined | ||
: (_options$signal2 = options.signal) === null || _options$signal2 === undefined | ||
? undefined | ||
: _options$signal2.addEventListener('abort', abort) | ||
let next | ||
let resume | ||
let done = false | ||
function onDone() { | ||
let cnt = 0 | ||
function onCatch() { | ||
done = true | ||
afterItemProcessed() | ||
} | ||
function afterItemProcessed() { | ||
cnt -= 1 | ||
maybeResume() | ||
} | ||
function maybeResume() { | ||
if (resume && !done && cnt < concurrency && queue.length < highWaterMark) { | ||
resume() | ||
resume = null | ||
} | ||
} | ||
async function pump() { | ||
try { | ||
for await (let val of stream) { | ||
var _val | ||
if (done) { | ||
@@ -104,11 +110,11 @@ return | ||
val = fn(val, signalOpt) | ||
if (val === kEmpty) { | ||
continue | ||
} | ||
val = PromiseResolve(val) | ||
} catch (err) { | ||
val = PromiseReject(err) | ||
} | ||
if (val === kEmpty) { | ||
continue | ||
} | ||
if (typeof ((_val = val) === null || _val === undefined ? undefined : _val.catch) === 'function') { | ||
val.catch(onDone) | ||
} | ||
cnt += 1 | ||
PromisePrototypeThen(val, afterItemProcessed, onCatch) | ||
queue.push(val) | ||
@@ -119,3 +125,3 @@ if (next) { | ||
} | ||
if (!done && queue.length && queue.length >= concurrency) { | ||
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) { | ||
await new Promise((resolve) => { | ||
@@ -129,6 +135,5 @@ resume = resolve | ||
const val = PromiseReject(err) | ||
PromisePrototypeThen(val, undefined, onDone) | ||
PromisePrototypeThen(val, afterItemProcessed, onCatch) | ||
queue.push(val) | ||
} finally { | ||
var _options$signal3 | ||
done = true | ||
@@ -139,7 +144,2 @@ if (next) { | ||
} | ||
options === null || options === undefined | ||
? undefined | ||
: (_options$signal3 = options.signal) === null || _options$signal3 === undefined | ||
? undefined | ||
: _options$signal3.removeEventListener('abort', abort) | ||
} | ||
@@ -162,6 +162,3 @@ } | ||
queue.shift() | ||
if (resume) { | ||
resume() | ||
resume = null | ||
} | ||
maybeResume() | ||
} | ||
@@ -173,3 +170,2 @@ await new Promise((resolve) => { | ||
} finally { | ||
ac.abort() | ||
done = true | ||
@@ -193,9 +189,9 @@ if (resume) { | ||
for await (const val of this) { | ||
var _options$signal4 | ||
var _options$signal | ||
if ( | ||
options !== null && | ||
options !== undefined && | ||
(_options$signal4 = options.signal) !== null && | ||
_options$signal4 !== undefined && | ||
_options$signal4.aborted | ||
(_options$signal = options.signal) !== null && | ||
_options$signal !== undefined && | ||
_options$signal.aborted | ||
) { | ||
@@ -268,3 +264,3 @@ throw new AbortError({ | ||
async function reduce(reducer, initialValue, options) { | ||
var _options$signal5 | ||
var _options$signal2 | ||
if (typeof reducer !== 'function') { | ||
@@ -283,5 +279,5 @@ throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer) | ||
options !== undefined && | ||
(_options$signal5 = options.signal) !== null && | ||
_options$signal5 !== undefined && | ||
_options$signal5.aborted | ||
(_options$signal2 = options.signal) !== null && | ||
_options$signal2 !== undefined && | ||
_options$signal2.aborted | ||
) { | ||
@@ -300,3 +296,4 @@ const err = new AbortError(undefined, { | ||
once: true, | ||
[kWeakHandler]: this | ||
[kWeakHandler]: this, | ||
[kResistStopPropagation]: true | ||
} | ||
@@ -308,3 +305,3 @@ options.signal.addEventListener('abort', () => ac.abort(), opts) | ||
for await (const value of this) { | ||
var _options$signal6 | ||
var _options$signal3 | ||
gotAnyItemFromStream = true | ||
@@ -314,5 +311,5 @@ if ( | ||
options !== undefined && | ||
(_options$signal6 = options.signal) !== null && | ||
_options$signal6 !== undefined && | ||
_options$signal6.aborted | ||
(_options$signal3 = options.signal) !== null && | ||
_options$signal3 !== undefined && | ||
_options$signal3.aborted | ||
) { | ||
@@ -347,9 +344,9 @@ throw new AbortError() | ||
for await (const val of this) { | ||
var _options$signal7 | ||
var _options$signal4 | ||
if ( | ||
options !== null && | ||
options !== undefined && | ||
(_options$signal7 = options.signal) !== null && | ||
_options$signal7 !== undefined && | ||
_options$signal7.aborted | ||
(_options$signal4 = options.signal) !== null && | ||
_options$signal4 !== undefined && | ||
_options$signal4.aborted | ||
) { | ||
@@ -393,9 +390,9 @@ throw new AbortError(undefined, { | ||
return async function* drop() { | ||
var _options$signal8 | ||
var _options$signal5 | ||
if ( | ||
options !== null && | ||
options !== undefined && | ||
(_options$signal8 = options.signal) !== null && | ||
_options$signal8 !== undefined && | ||
_options$signal8.aborted | ||
(_options$signal5 = options.signal) !== null && | ||
_options$signal5 !== undefined && | ||
_options$signal5.aborted | ||
) { | ||
@@ -405,9 +402,9 @@ throw new AbortError() | ||
for await (const val of this) { | ||
var _options$signal9 | ||
var _options$signal6 | ||
if ( | ||
options !== null && | ||
options !== undefined && | ||
(_options$signal9 = options.signal) !== null && | ||
_options$signal9 !== undefined && | ||
_options$signal9.aborted | ||
(_options$signal6 = options.signal) !== null && | ||
_options$signal6 !== undefined && | ||
_options$signal6.aborted | ||
) { | ||
@@ -431,9 +428,9 @@ throw new AbortError() | ||
return async function* take() { | ||
var _options$signal10 | ||
var _options$signal7 | ||
if ( | ||
options !== null && | ||
options !== undefined && | ||
(_options$signal10 = options.signal) !== null && | ||
_options$signal10 !== undefined && | ||
_options$signal10.aborted | ||
(_options$signal7 = options.signal) !== null && | ||
_options$signal7 !== undefined && | ||
_options$signal7.aborted | ||
) { | ||
@@ -443,9 +440,9 @@ throw new AbortError() | ||
for await (const val of this) { | ||
var _options$signal11 | ||
var _options$signal8 | ||
if ( | ||
options !== null && | ||
options !== undefined && | ||
(_options$signal11 = options.signal) !== null && | ||
_options$signal11 !== undefined && | ||
_options$signal11.aborted | ||
(_options$signal8 = options.signal) !== null && | ||
_options$signal8 !== undefined && | ||
_options$signal8.aborted | ||
) { | ||
@@ -456,3 +453,6 @@ throw new AbortError() | ||
yield val | ||
} else { | ||
} | ||
// Don't get another item from iterator in case we reached the end | ||
if (number <= 0) { | ||
return | ||
@@ -464,3 +464,3 @@ } | ||
module.exports.streamReturningOperators = { | ||
asIndexedPairs, | ||
asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'), | ||
drop, | ||
@@ -467,0 +467,0 @@ filter, |
@@ -10,3 +10,3 @@ /* replacement start */ | ||
;('use strict') | ||
const { ArrayIsArray, Promise, SymbolAsyncIterator } = require('../../ours/primordials') | ||
const { ArrayIsArray, Promise, SymbolAsyncIterator, SymbolDispose } = require('../../ours/primordials') | ||
const eos = require('./end-of-stream') | ||
@@ -36,3 +36,3 @@ const { once } = require('../../ours/util') | ||
isReadableStream, | ||
isReadableEnded | ||
isReadableFinished | ||
} = require('./utils') | ||
@@ -42,2 +42,3 @@ const AbortController = globalThis.AbortController || require('abort-controller').AbortController | ||
let Readable | ||
let addAbortListener | ||
function destroyer(stream, reading, writing) { | ||
@@ -135,4 +136,4 @@ let finished = false | ||
writable.end() | ||
await wait() | ||
} | ||
await wait() | ||
finish() | ||
@@ -192,3 +193,7 @@ } catch (err) { | ||
} | ||
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.addEventListener('abort', abort) | ||
addAbortListener = addAbortListener || require('../../ours/util').addAbortListener | ||
let disposable | ||
if (outerSignal) { | ||
disposable = addAbortListener(outerSignal, abort) | ||
} | ||
let error | ||
@@ -202,2 +207,3 @@ let value | ||
function finishImpl(err, final) { | ||
var _disposable | ||
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { | ||
@@ -212,3 +218,3 @@ error = err | ||
} | ||
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.removeEventListener('abort', abort) | ||
;(_disposable = disposable) === null || _disposable === undefined ? undefined : _disposable[SymbolDispose]() | ||
ac.abort() | ||
@@ -421,3 +427,3 @@ if (final) { | ||
} | ||
if (isReadableEnded(src)) { | ||
if (isReadableFinished(src)) { | ||
// End the destination if the source has already ended. | ||
@@ -424,0 +430,0 @@ process.nextTick(endFn) |
@@ -38,2 +38,3 @@ /* replacement start */ | ||
SafeSet, | ||
SymbolAsyncDispose, | ||
SymbolAsyncIterator, | ||
@@ -63,3 +64,4 @@ Symbol | ||
ERR_STREAM_UNSHIFT_AFTER_END_EVENT | ||
} | ||
}, | ||
AbortError | ||
} = require('../../ours/errors') | ||
@@ -74,2 +76,72 @@ const { validateObject } = require('../validators') | ||
const { errorOrDestroy } = destroyImpl | ||
const kObjectMode = 1 << 0 | ||
const kEnded = 1 << 1 | ||
const kEndEmitted = 1 << 2 | ||
const kReading = 1 << 3 | ||
const kConstructed = 1 << 4 | ||
const kSync = 1 << 5 | ||
const kNeedReadable = 1 << 6 | ||
const kEmittedReadable = 1 << 7 | ||
const kReadableListening = 1 << 8 | ||
const kResumeScheduled = 1 << 9 | ||
const kErrorEmitted = 1 << 10 | ||
const kEmitClose = 1 << 11 | ||
const kAutoDestroy = 1 << 12 | ||
const kDestroyed = 1 << 13 | ||
const kClosed = 1 << 14 | ||
const kCloseEmitted = 1 << 15 | ||
const kMultiAwaitDrain = 1 << 16 | ||
const kReadingMore = 1 << 17 | ||
const kDataEmitted = 1 << 18 | ||
// TODO(benjamingr) it is likely slower to do it this way than with free functions | ||
function makeBitMapDescriptor(bit) { | ||
return { | ||
enumerable: false, | ||
get() { | ||
return (this.state & bit) !== 0 | ||
}, | ||
set(value) { | ||
if (value) this.state |= bit | ||
else this.state &= ~bit | ||
} | ||
} | ||
} | ||
ObjectDefineProperties(ReadableState.prototype, { | ||
objectMode: makeBitMapDescriptor(kObjectMode), | ||
ended: makeBitMapDescriptor(kEnded), | ||
endEmitted: makeBitMapDescriptor(kEndEmitted), | ||
reading: makeBitMapDescriptor(kReading), | ||
// Stream is still being constructed and cannot be | ||
// destroyed until construction finished or failed. | ||
// Async construction is opt in, therefore we start as | ||
// constructed. | ||
constructed: makeBitMapDescriptor(kConstructed), | ||
// A flag to be able to tell if the event 'readable'/'data' is emitted | ||
// immediately, or on a later tick. We set this to true at first, because | ||
// any actions that shouldn't happen until "later" should generally also | ||
// not happen before the first read call. | ||
sync: makeBitMapDescriptor(kSync), | ||
// Whenever we return null, then we set a flag to say | ||
// that we're awaiting a 'readable' event emission. | ||
needReadable: makeBitMapDescriptor(kNeedReadable), | ||
emittedReadable: makeBitMapDescriptor(kEmittedReadable), | ||
readableListening: makeBitMapDescriptor(kReadableListening), | ||
resumeScheduled: makeBitMapDescriptor(kResumeScheduled), | ||
// True if the error was already emitted and should not be thrown again. | ||
errorEmitted: makeBitMapDescriptor(kErrorEmitted), | ||
emitClose: makeBitMapDescriptor(kEmitClose), | ||
autoDestroy: makeBitMapDescriptor(kAutoDestroy), | ||
// Has it been destroyed. | ||
destroyed: makeBitMapDescriptor(kDestroyed), | ||
// Indicates whether the stream has finished destroying. | ||
closed: makeBitMapDescriptor(kClosed), | ||
// True if close has been emitted or would have been emitted | ||
// depending on emitClose. | ||
closeEmitted: makeBitMapDescriptor(kCloseEmitted), | ||
multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain), | ||
// If true, a maybeReadMore has been scheduled. | ||
readingMore: makeBitMapDescriptor(kReadingMore), | ||
dataEmitted: makeBitMapDescriptor(kDataEmitted) | ||
}) | ||
function ReadableState(options, stream, isDuplex) { | ||
@@ -83,6 +155,9 @@ // Duplex streams are both readable and writable, but share | ||
// Bit map field to store ReadableState more effciently with 1 bit per field | ||
// instead of a V8 slot per field. | ||
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync | ||
// Object stream flag. Used to make read(n) ignore n and to | ||
// make all the buffer merging and length checks go away. | ||
this.objectMode = !!(options && options.objectMode) | ||
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode) | ||
if (options && options.objectMode) this.state |= kObjectMode | ||
if (isDuplex && options && options.readableObjectMode) this.state |= kObjectMode | ||
@@ -102,38 +177,10 @@ // The point at which it stops calling _read() to fill the buffer | ||
this.flowing = null | ||
this.ended = false | ||
this.endEmitted = false | ||
this.reading = false | ||
// Stream is still being constructed and cannot be | ||
// destroyed until construction finished or failed. | ||
// Async construction is opt in, therefore we start as | ||
// constructed. | ||
this.constructed = true | ||
// A flag to be able to tell if the event 'readable'/'data' is emitted | ||
// immediately, or on a later tick. We set this to true at first, because | ||
// any actions that shouldn't happen until "later" should generally also | ||
// not happen before the first read call. | ||
this.sync = true | ||
// Whenever we return null, then we set a flag to say | ||
// that we're awaiting a 'readable' event emission. | ||
this.needReadable = false | ||
this.emittedReadable = false | ||
this.readableListening = false | ||
this.resumeScheduled = false | ||
this[kPaused] = null | ||
// True if the error was already emitted and should not be thrown again. | ||
this.errorEmitted = false | ||
// Should close be emitted on destroy. Defaults to true. | ||
this.emitClose = !options || options.emitClose !== false | ||
if (options && options.emitClose === false) this.state &= ~kEmitClose | ||
// Should .destroy() be called after 'end' (and potentially 'finish'). | ||
this.autoDestroy = !options || options.autoDestroy !== false | ||
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy | ||
// Has it been destroyed. | ||
this.destroyed = false | ||
// Indicates whether the stream has errored. When true no further | ||
@@ -145,9 +192,2 @@ // _read calls, 'data' or 'readable' events should occur. This is needed | ||
// Indicates whether the stream has finished destroying. | ||
this.closed = false | ||
// True if close has been emitted or would have been emitted | ||
// depending on emitClose. | ||
this.closeEmitted = false | ||
// Crypto is kind of old and crusty. Historically, its default string | ||
@@ -161,7 +201,2 @@ // encoding is 'binary' so we have to make this configurable. | ||
this.awaitDrainWriters = null | ||
this.multiAwaitDrain = false | ||
// If true, a maybeReadMore has been scheduled. | ||
this.readingMore = false | ||
this.dataEmitted = false | ||
this.decoder = null | ||
@@ -202,2 +237,10 @@ this.encoding = null | ||
} | ||
Readable.prototype[SymbolAsyncDispose] = function () { | ||
let error | ||
if (!this.destroyed) { | ||
error = this.readableEnded ? null : new AbortError() | ||
this.destroy(error) | ||
} | ||
return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null)))) | ||
} | ||
@@ -220,3 +263,3 @@ // Manually shove something into the read() buffer. | ||
let err | ||
if (!state.objectMode) { | ||
if ((state.state & kObjectMode) === 0) { | ||
if (typeof chunk === 'string') { | ||
@@ -246,7 +289,7 @@ encoding = encoding || state.defaultEncoding | ||
} else if (chunk === null) { | ||
state.reading = false | ||
state.state &= ~kReading | ||
onEofChunk(stream, state) | ||
} else if (state.objectMode || (chunk && chunk.length > 0)) { | ||
} else if ((state.state & kObjectMode) !== 0 || (chunk && chunk.length > 0)) { | ||
if (addToFront) { | ||
if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()) | ||
if ((state.state & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()) | ||
else if (state.destroyed || state.errored) return false | ||
@@ -259,3 +302,3 @@ else addChunk(stream, state, chunk, true) | ||
} else { | ||
state.reading = false | ||
state.state &= ~kReading | ||
if (state.decoder && !encoding) { | ||
@@ -270,3 +313,3 @@ chunk = state.decoder.write(chunk) | ||
} else if (!addToFront) { | ||
state.reading = false | ||
state.state &= ~kReading | ||
maybeReadMore(stream, state) | ||
@@ -284,3 +327,3 @@ } | ||
// when we have multiple pipes. | ||
if (state.multiAwaitDrain) { | ||
if ((state.state & kMultiAwaitDrain) !== 0) { | ||
state.awaitDrainWriters.clear() | ||
@@ -297,3 +340,3 @@ } else { | ||
else state.buffer.push(chunk) | ||
if (state.needReadable) emitReadable(stream) | ||
if ((state.state & kNeedReadable) !== 0) emitReadable(stream) | ||
} | ||
@@ -348,3 +391,3 @@ maybeReadMore(stream, state) | ||
if (n <= 0 || (state.length === 0 && state.ended)) return 0 | ||
if (state.objectMode) return 1 | ||
if ((state.state & kObjectMode) !== 0) return 1 | ||
if (NumberIsNaN(n)) { | ||
@@ -374,3 +417,3 @@ // Only flow one buffer at a time. | ||
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n) | ||
if (n !== 0) state.emittedReadable = false | ||
if (n !== 0) state.state &= ~kEmittedReadable | ||
@@ -421,3 +464,3 @@ // If we're doing read(0) to trigger a readable event, but we | ||
// if we need a readable event, then we need to do some reading. | ||
let doRead = state.needReadable | ||
let doRead = (state.state & kNeedReadable) !== 0 | ||
debug('need readable', doRead) | ||
@@ -439,6 +482,5 @@ | ||
debug('do read') | ||
state.reading = true | ||
state.sync = true | ||
state.state |= kReading | kSync | ||
// If the length is currently zero, then we *need* a readable event. | ||
if (state.length === 0) state.needReadable = true | ||
if (state.length === 0) state.state |= kNeedReadable | ||
@@ -451,3 +493,4 @@ // Call internal read method | ||
} | ||
state.sync = false | ||
state.state &= ~kSync | ||
// If _read pushed data synchronously, then `reading` will be false, | ||
@@ -731,5 +774,3 @@ // and we need to re-evaluate how much data we can return to the user. | ||
if (dest.writableNeedDrain === true) { | ||
if (state.flowing) { | ||
pause() | ||
} | ||
pause() | ||
} else if (!state.flowing) { | ||
@@ -736,0 +777,0 @@ debug('pipe resume') |
'use strict' | ||
const { MathFloor, NumberIsInteger } = require('../../ours/primordials') | ||
const { validateInteger } = require('../validators') | ||
const { ERR_INVALID_ARG_VALUE } = require('../../ours/errors').codes | ||
let defaultHighWaterMarkBytes = 16 * 1024 | ||
let defaultHighWaterMarkObjectMode = 16 | ||
function highWaterMarkFrom(options, isDuplex, duplexKey) { | ||
@@ -9,4 +12,12 @@ return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null | ||
function getDefaultHighWaterMark(objectMode) { | ||
return objectMode ? 16 : 16 * 1024 | ||
return objectMode ? defaultHighWaterMarkObjectMode : defaultHighWaterMarkBytes | ||
} | ||
function setDefaultHighWaterMark(objectMode, value) { | ||
validateInteger(value, 'value', 0) | ||
if (objectMode) { | ||
defaultHighWaterMarkObjectMode = value | ||
} else { | ||
defaultHighWaterMarkBytes = value | ||
} | ||
} | ||
function getHighWaterMark(state, options, duplexKey, isDuplex) { | ||
@@ -27,3 +38,4 @@ const hwm = highWaterMarkFrom(options, isDuplex, duplexKey) | ||
getHighWaterMark, | ||
getDefaultHighWaterMark | ||
getDefaultHighWaterMark, | ||
setDefaultHighWaterMark | ||
} |
'use strict' | ||
const { Symbol, SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials') | ||
const kDestroyed = Symbol('kDestroyed') | ||
const kIsErrored = Symbol('kIsErrored') | ||
const kIsReadable = Symbol('kIsReadable') | ||
const kIsDisturbed = Symbol('kIsDisturbed') | ||
const { SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials') | ||
// We need to use SymbolFor to make these globally available | ||
// for interopt with readable-stream, i.e. readable-stream | ||
// and node core needs to be able to read/write private state | ||
// from each other for proper interoperability. | ||
const kIsDestroyed = SymbolFor('nodejs.stream.destroyed') | ||
const kIsErrored = SymbolFor('nodejs.stream.errored') | ||
const kIsReadable = SymbolFor('nodejs.stream.readable') | ||
const kIsWritable = SymbolFor('nodejs.stream.writable') | ||
const kIsDisturbed = SymbolFor('nodejs.stream.disturbed') | ||
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise') | ||
@@ -90,3 +96,3 @@ const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction') | ||
const state = wState || rState | ||
return !!(stream.destroyed || stream[kDestroyed] || (state !== null && state !== undefined && state.destroyed)) | ||
return !!(stream.destroyed || stream[kIsDestroyed] || (state !== null && state !== undefined && state.destroyed)) | ||
} | ||
@@ -139,2 +145,3 @@ | ||
function isWritable(stream) { | ||
if (stream && stream[kIsWritable] != null) return stream[kIsWritable] | ||
if (typeof (stream === null || stream === undefined ? undefined : stream.writable) !== 'boolean') return null | ||
@@ -292,3 +299,4 @@ if (isDestroyed(stream)) return false | ||
module.exports = { | ||
kDestroyed, | ||
isDestroyed, | ||
kIsDestroyed, | ||
isDisturbed, | ||
@@ -302,4 +310,4 @@ kIsDisturbed, | ||
kControllerErrorFunction, | ||
kIsWritable, | ||
isClosed, | ||
isDestroyed, | ||
isDuplexNodeStream, | ||
@@ -306,0 +314,0 @@ isFinished, |
@@ -53,3 +53,2 @@ /* eslint jsdoc/require-jsdoc: "error" */ | ||
* behaviors. | ||
* | ||
* @param {*} value Values to be validated | ||
@@ -309,2 +308,22 @@ * @param {string} name Name of the argument | ||
/** | ||
* @callback validateAbortSignalArray | ||
* @param {*} value | ||
* @param {string} name | ||
* @returns {asserts value is AbortSignal[]} | ||
*/ | ||
/** @type {validateAbortSignalArray} */ | ||
function validateAbortSignalArray(value, name) { | ||
validateArray(value, name) | ||
for (let i = 0; i < value.length; i++) { | ||
const signal = value[i] | ||
const indexedName = `${name}[${i}]` | ||
if (signal == null) { | ||
throw new ERR_INVALID_ARG_TYPE(indexedName, 'AbortSignal', signal) | ||
} | ||
validateAbortSignal(signal, indexedName) | ||
} | ||
} | ||
/** | ||
* @param {*} signal | ||
@@ -493,2 +512,3 @@ * @param {string} [name='signal'] | ||
validateBooleanArray, | ||
validateAbortSignalArray, | ||
validateBoolean, | ||
@@ -495,0 +515,0 @@ validateBuffer, |
@@ -74,2 +74,5 @@ 'use strict' | ||
}, | ||
PromiseResolve(val) { | ||
return Promise.resolve(val) | ||
}, | ||
ReflectApply: Reflect.apply, | ||
@@ -98,6 +101,9 @@ RegExpPrototypeTest(self, value) { | ||
SymbolIterator: Symbol.iterator, | ||
SymbolDispose: Symbol.dispose || Symbol('Symbol.dispose'), | ||
SymbolAsyncDispose: Symbol.asyncDispose || Symbol('Symbol.asyncDispose'), | ||
TypedArrayPrototypeSet(self, buf, len) { | ||
return self.set(buf, len) | ||
}, | ||
Boolean: Boolean, | ||
Uint8Array | ||
} |
'use strict' | ||
const bufferModule = require('buffer') | ||
const { kResistStopPropagation, SymbolDispose } = require('./primordials') | ||
const AbortSignal = global.AbortSignal || require('abort-controller').AbortSignal | ||
const AbortController = global.AbortController || require('abort-controller').AbortController | ||
const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor | ||
@@ -18,2 +21,11 @@ const Blob = globalThis.Blob || bufferModule.Blob | ||
const validateAbortSignal = (signal, name) => { | ||
if (signal !== undefined && (signal === null || typeof signal !== 'object' || !('aborted' in signal))) { | ||
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal) | ||
} | ||
} | ||
const validateFunction = (value, name) => { | ||
if (typeof value !== 'function') throw new ERR_INVALID_ARG_TYPE(name, 'Function', value) | ||
} | ||
// This is a simplified version of AggregateError | ||
@@ -127,4 +139,64 @@ class AggregateError extends Error { | ||
}, | ||
isBlob | ||
isBlob, | ||
deprecate(fn, message) { | ||
return fn | ||
}, | ||
addAbortListener: | ||
require('events').addAbortListener || | ||
function addAbortListener(signal, listener) { | ||
if (signal === undefined) { | ||
throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal) | ||
} | ||
validateAbortSignal(signal, 'signal') | ||
validateFunction(listener, 'listener') | ||
let removeEventListener | ||
if (signal.aborted) { | ||
queueMicrotask(() => listener()) | ||
} else { | ||
signal.addEventListener('abort', listener, { | ||
__proto__: null, | ||
once: true, | ||
[kResistStopPropagation]: true | ||
}) | ||
removeEventListener = () => { | ||
signal.removeEventListener('abort', listener) | ||
} | ||
} | ||
return { | ||
__proto__: null, | ||
[SymbolDispose]() { | ||
var _removeEventListener | ||
;(_removeEventListener = removeEventListener) === null || _removeEventListener === undefined | ||
? undefined | ||
: _removeEventListener() | ||
} | ||
} | ||
}, | ||
AbortSignalAny: | ||
AbortSignal.any || | ||
function AbortSignalAny(signals) { | ||
// Fast path if there is only one signal. | ||
if (signals.length === 1) { | ||
return signals[0] | ||
} | ||
const ac = new AbortController() | ||
const abort = () => ac.abort() | ||
signals.forEach((signal) => { | ||
validateAbortSignal(signal, 'signals') | ||
signal.addEventListener('abort', abort, { | ||
once: true | ||
}) | ||
}) | ||
ac.signal.addEventListener( | ||
'abort', | ||
() => { | ||
signals.forEach((signal) => signal.removeEventListener('abort', abort)) | ||
}, | ||
{ | ||
once: true | ||
} | ||
) | ||
return ac.signal | ||
} | ||
} | ||
module.exports.promisify.custom = Symbol.for('nodejs.util.promisify.custom') |
@@ -37,2 +37,3 @@ /* replacement start */ | ||
const compose = require('./internal/streams/compose') | ||
const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('./internal/streams/state') | ||
const { pipeline } = require('./internal/streams/pipeline') | ||
@@ -45,5 +46,7 @@ const { destroyer } = require('./internal/streams/destroy') | ||
const Stream = (module.exports = require('./internal/streams/legacy').Stream) | ||
Stream.isDestroyed = utils.isDestroyed | ||
Stream.isDisturbed = utils.isDisturbed | ||
Stream.isErrored = utils.isErrored | ||
Stream.isReadable = utils.isReadable | ||
Stream.isWritable = utils.isWritable | ||
Stream.Readable = require('./internal/streams/readable') | ||
@@ -108,2 +111,4 @@ for (const key of ObjectKeys(streamReturningOperators)) { | ||
Stream.compose = compose | ||
Stream.setDefaultHighWaterMark = setDefaultHighWaterMark | ||
Stream.getDefaultHighWaterMark = getDefaultHighWaterMark | ||
ObjectDefineProperty(Stream, 'promises', { | ||
@@ -110,0 +115,0 @@ __proto__: null, |
{ | ||
"name": "readable-stream", | ||
"version": "4.4.2", | ||
"version": "4.5.0", | ||
"description": "Node.js Streams, a user-land copy of the stream library from Node.js", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/nodejs/readable-stream", |
@@ -14,5 +14,5 @@ # readable-stream | ||
This package is a mirror of the streams implementations in Node.js 18.16.0. | ||
This package is a mirror of the streams implementations in Node.js 18.19.0. | ||
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.16.0/docs/api/stream.html). | ||
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.19.0/docs/api/stream.html). | ||
@@ -19,0 +19,0 @@ If you want to guarantee a stable streams base, regardless of what version of |
217248
6575