Socket
Socket
Sign inDemoInstall

readable-stream

Package Overview
Dependencies
9
Maintainers
3
Versions
103
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 4.4.2 to 4.5.0

7

lib/internal/streams/add-abort-signal.js
'use strict'
const { SymbolDispose } = require('../../ours/primordials')
const { AbortError, codes } = require('../../ours/errors')

@@ -7,2 +8,3 @@ const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils')

const { ERR_INVALID_ARG_TYPE } = codes
let addAbortListener

@@ -46,6 +48,7 @@ // This method is inlined here for readable-stream

} else {
signal.addEventListener('abort', onAbort)
eos(stream, () => signal.removeEventListener('abort', onAbort))
addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
const disposable = addAbortListener(signal, onAbort)
eos(stream, disposable[SymbolDispose])
}
return stream
}

@@ -77,3 +77,3 @@ 'use strict'

writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode),
readableObjectMode: !!(tail !== null && tail !== undefined && tail.writableObjectMode),
readableObjectMode: !!(tail !== null && tail !== undefined && tail.readableObjectMode),
writable,

@@ -80,0 +80,0 @@ readable

@@ -15,3 +15,3 @@ 'use strict'

const { Symbol } = require('../../ours/primordials')
const { kDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
const { kIsDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
const kDestroy = Symbol('kDestroy')

@@ -282,3 +282,3 @@ const kConstruct = Symbol('kConstruct')

if (!stream.destroyed) {
stream[kDestroyed] = true
stream[kIsDestroyed] = true
}

@@ -285,0 +285,0 @@ }

@@ -16,3 +16,5 @@ /* replacement start */

isWritableNodeStream,
isDuplexNodeStream
isDuplexNodeStream,
isReadableStream,
isWritableStream
} = require('./utils')

@@ -27,2 +29,3 @@ const eos = require('./end-of-stream')

const Readable = require('./readable')
const Writable = require('./writable')
const { createDeferredPromise } = require('../../ours/util')

@@ -82,13 +85,12 @@ const from = require('./from')

}
// TODO: Webstreams
// if (isReadableStream(body)) {
// return _duplexify({ readable: Readable.fromWeb(body) });
// }
// TODO: Webstreams
// if (isWritableStream(body)) {
// return _duplexify({ writable: Writable.fromWeb(body) });
// }
if (isReadableStream(body)) {
return _duplexify({
readable: Readable.fromWeb(body)
})
}
if (isWritableStream(body)) {
return _duplexify({
writable: Writable.fromWeb(body)
})
}
if (typeof body === 'function') {

@@ -150,12 +152,9 @@ const { value, write, final, destroy } = fromAsyncGen(body)

}
// TODO: Webstreams.
// if (
// isReadableStream(body?.readable) &&
// isWritableStream(body?.writable)
// ) {
// return Duplexify.fromWeb(body);
// }
if (
isReadableStream(body === null || body === undefined ? undefined : body.readable) &&
isWritableStream(body === null || body === undefined ? undefined : body.writable)
) {
return Duplexify.fromWeb(body)
}
if (
typeof (body === null || body === undefined ? undefined : body.writable) === 'object' ||

@@ -162,0 +161,0 @@ typeof (body === null || body === undefined ? undefined : body.readable) === 'object'

@@ -14,3 +14,3 @@ /* replacement start */

const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators')
const { Promise, PromisePrototypeThen } = require('../../ours/primordials')
const { Promise, PromisePrototypeThen, SymbolDispose } = require('../../ours/primordials')
const {

@@ -32,2 +32,3 @@ isClosed,

} = require('./utils')
let addAbortListener
function isRequest(stream) {

@@ -217,8 +218,9 @@ return stream.setHeader && typeof stream.abort === 'function'

} else {
addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
const disposable = addAbortListener(options.signal, abort)
const originalCallback = callback
callback = once((...args) => {
options.signal.removeEventListener('abort', abort)
disposable[SymbolDispose]()
originalCallback.apply(stream, args)
})
options.signal.addEventListener('abort', abort)
}

@@ -244,8 +246,9 @@ }

} else {
addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
const disposable = addAbortListener(options.signal, abort)
const originalCallback = callback
callback = once((...args) => {
options.signal.removeEventListener('abort', abort)
disposable[SymbolDispose]()
originalCallback.apply(stream, args)
})
options.signal.addEventListener('abort', abort)
}

@@ -252,0 +255,0 @@ }

@@ -10,2 +10,3 @@ 'use strict'

const kWeakHandler = require('../../ours/primordials').Symbol('kWeak')
const kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation')
const { finished } = require('./end-of-stream')

@@ -15,4 +16,6 @@ const staticCompose = require('./compose')

const { isWritable, isNodeStream } = require('./utils')
const { deprecate } = require('../../ours/util')
const {
ArrayPrototypePush,
Boolean,
MathFloor,

@@ -23,2 +26,3 @@ Number,

PromiseReject,
PromiseResolve,
PromisePrototypeThen,

@@ -60,37 +64,39 @@ Symbol

}
validateInteger(concurrency, 'concurrency', 1)
let highWaterMark = concurrency - 1
if ((options === null || options === undefined ? undefined : options.highWaterMark) != null) {
highWaterMark = MathFloor(options.highWaterMark)
}
validateInteger(concurrency, 'options.concurrency', 1)
validateInteger(highWaterMark, 'options.highWaterMark', 0)
highWaterMark += concurrency
return async function* map() {
var _options$signal, _options$signal2
const ac = new AbortController()
const signal = require('../../ours/util').AbortSignalAny(
[options === null || options === undefined ? undefined : options.signal].filter(Boolean)
)
const stream = this
const queue = []
const signal = ac.signal
const signalOpt = {
signal
}
const abort = () => ac.abort()
if (
options !== null &&
options !== undefined &&
(_options$signal = options.signal) !== null &&
_options$signal !== undefined &&
_options$signal.aborted
) {
abort()
}
options === null || options === undefined
? undefined
: (_options$signal2 = options.signal) === null || _options$signal2 === undefined
? undefined
: _options$signal2.addEventListener('abort', abort)
let next
let resume
let done = false
function onDone() {
let cnt = 0
function onCatch() {
done = true
afterItemProcessed()
}
function afterItemProcessed() {
cnt -= 1
maybeResume()
}
function maybeResume() {
if (resume && !done && cnt < concurrency && queue.length < highWaterMark) {
resume()
resume = null
}
}
async function pump() {
try {
for await (let val of stream) {
var _val
if (done) {

@@ -104,11 +110,11 @@ return

val = fn(val, signalOpt)
if (val === kEmpty) {
continue
}
val = PromiseResolve(val)
} catch (err) {
val = PromiseReject(err)
}
if (val === kEmpty) {
continue
}
if (typeof ((_val = val) === null || _val === undefined ? undefined : _val.catch) === 'function') {
val.catch(onDone)
}
cnt += 1
PromisePrototypeThen(val, afterItemProcessed, onCatch)
queue.push(val)

@@ -119,3 +125,3 @@ if (next) {

}
if (!done && queue.length && queue.length >= concurrency) {
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
await new Promise((resolve) => {

@@ -129,6 +135,5 @@ resume = resolve

const val = PromiseReject(err)
PromisePrototypeThen(val, undefined, onDone)
PromisePrototypeThen(val, afterItemProcessed, onCatch)
queue.push(val)
} finally {
var _options$signal3
done = true

@@ -139,7 +144,2 @@ if (next) {

}
options === null || options === undefined
? undefined
: (_options$signal3 = options.signal) === null || _options$signal3 === undefined
? undefined
: _options$signal3.removeEventListener('abort', abort)
}

@@ -162,6 +162,3 @@ }

queue.shift()
if (resume) {
resume()
resume = null
}
maybeResume()
}

@@ -173,3 +170,2 @@ await new Promise((resolve) => {

} finally {
ac.abort()
done = true

@@ -193,9 +189,9 @@ if (resume) {

for await (const val of this) {
var _options$signal4
var _options$signal
if (
options !== null &&
options !== undefined &&
(_options$signal4 = options.signal) !== null &&
_options$signal4 !== undefined &&
_options$signal4.aborted
(_options$signal = options.signal) !== null &&
_options$signal !== undefined &&
_options$signal.aborted
) {

@@ -268,3 +264,3 @@ throw new AbortError({

async function reduce(reducer, initialValue, options) {
var _options$signal5
var _options$signal2
if (typeof reducer !== 'function') {

@@ -283,5 +279,5 @@ throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer)

options !== undefined &&
(_options$signal5 = options.signal) !== null &&
_options$signal5 !== undefined &&
_options$signal5.aborted
(_options$signal2 = options.signal) !== null &&
_options$signal2 !== undefined &&
_options$signal2.aborted
) {

@@ -300,3 +296,4 @@ const err = new AbortError(undefined, {

once: true,
[kWeakHandler]: this
[kWeakHandler]: this,
[kResistStopPropagation]: true
}

@@ -308,3 +305,3 @@ options.signal.addEventListener('abort', () => ac.abort(), opts)

for await (const value of this) {
var _options$signal6
var _options$signal3
gotAnyItemFromStream = true

@@ -314,5 +311,5 @@ if (

options !== undefined &&
(_options$signal6 = options.signal) !== null &&
_options$signal6 !== undefined &&
_options$signal6.aborted
(_options$signal3 = options.signal) !== null &&
_options$signal3 !== undefined &&
_options$signal3.aborted
) {

@@ -347,9 +344,9 @@ throw new AbortError()

for await (const val of this) {
var _options$signal7
var _options$signal4
if (
options !== null &&
options !== undefined &&
(_options$signal7 = options.signal) !== null &&
_options$signal7 !== undefined &&
_options$signal7.aborted
(_options$signal4 = options.signal) !== null &&
_options$signal4 !== undefined &&
_options$signal4.aborted
) {

@@ -393,9 +390,9 @@ throw new AbortError(undefined, {

return async function* drop() {
var _options$signal8
var _options$signal5
if (
options !== null &&
options !== undefined &&
(_options$signal8 = options.signal) !== null &&
_options$signal8 !== undefined &&
_options$signal8.aborted
(_options$signal5 = options.signal) !== null &&
_options$signal5 !== undefined &&
_options$signal5.aborted
) {

@@ -405,9 +402,9 @@ throw new AbortError()

for await (const val of this) {
var _options$signal9
var _options$signal6
if (
options !== null &&
options !== undefined &&
(_options$signal9 = options.signal) !== null &&
_options$signal9 !== undefined &&
_options$signal9.aborted
(_options$signal6 = options.signal) !== null &&
_options$signal6 !== undefined &&
_options$signal6.aborted
) {

@@ -431,9 +428,9 @@ throw new AbortError()

return async function* take() {
var _options$signal10
var _options$signal7
if (
options !== null &&
options !== undefined &&
(_options$signal10 = options.signal) !== null &&
_options$signal10 !== undefined &&
_options$signal10.aborted
(_options$signal7 = options.signal) !== null &&
_options$signal7 !== undefined &&
_options$signal7.aborted
) {

@@ -443,9 +440,9 @@ throw new AbortError()

for await (const val of this) {
var _options$signal11
var _options$signal8
if (
options !== null &&
options !== undefined &&
(_options$signal11 = options.signal) !== null &&
_options$signal11 !== undefined &&
_options$signal11.aborted
(_options$signal8 = options.signal) !== null &&
_options$signal8 !== undefined &&
_options$signal8.aborted
) {

@@ -456,3 +453,6 @@ throw new AbortError()

yield val
} else {
}
// Don't get another item from iterator in case we reached the end
if (number <= 0) {
return

@@ -464,3 +464,3 @@ }

module.exports.streamReturningOperators = {
asIndexedPairs,
asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'),
drop,

@@ -467,0 +467,0 @@ filter,

@@ -10,3 +10,3 @@ /* replacement start */

;('use strict')
const { ArrayIsArray, Promise, SymbolAsyncIterator } = require('../../ours/primordials')
const { ArrayIsArray, Promise, SymbolAsyncIterator, SymbolDispose } = require('../../ours/primordials')
const eos = require('./end-of-stream')

@@ -36,3 +36,3 @@ const { once } = require('../../ours/util')

isReadableStream,
isReadableEnded
isReadableFinished
} = require('./utils')

@@ -42,2 +42,3 @@ const AbortController = globalThis.AbortController || require('abort-controller').AbortController

let Readable
let addAbortListener
function destroyer(stream, reading, writing) {

@@ -135,4 +136,4 @@ let finished = false

writable.end()
await wait()
}
await wait()
finish()

@@ -192,3 +193,7 @@ } catch (err) {

}
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.addEventListener('abort', abort)
addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
let disposable
if (outerSignal) {
disposable = addAbortListener(outerSignal, abort)
}
let error

@@ -202,2 +207,3 @@ let value

function finishImpl(err, final) {
var _disposable
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {

@@ -212,3 +218,3 @@ error = err

}
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.removeEventListener('abort', abort)
;(_disposable = disposable) === null || _disposable === undefined ? undefined : _disposable[SymbolDispose]()
ac.abort()

@@ -421,3 +427,3 @@ if (final) {

}
if (isReadableEnded(src)) {
if (isReadableFinished(src)) {
// End the destination if the source has already ended.

@@ -424,0 +430,0 @@ process.nextTick(endFn)

@@ -38,2 +38,3 @@ /* replacement start */

SafeSet,
SymbolAsyncDispose,
SymbolAsyncIterator,

@@ -63,3 +64,4 @@ Symbol

ERR_STREAM_UNSHIFT_AFTER_END_EVENT
}
},
AbortError
} = require('../../ours/errors')

@@ -74,2 +76,72 @@ const { validateObject } = require('../validators')

const { errorOrDestroy } = destroyImpl
const kObjectMode = 1 << 0
const kEnded = 1 << 1
const kEndEmitted = 1 << 2
const kReading = 1 << 3
const kConstructed = 1 << 4
const kSync = 1 << 5
const kNeedReadable = 1 << 6
const kEmittedReadable = 1 << 7
const kReadableListening = 1 << 8
const kResumeScheduled = 1 << 9
const kErrorEmitted = 1 << 10
const kEmitClose = 1 << 11
const kAutoDestroy = 1 << 12
const kDestroyed = 1 << 13
const kClosed = 1 << 14
const kCloseEmitted = 1 << 15
const kMultiAwaitDrain = 1 << 16
const kReadingMore = 1 << 17
const kDataEmitted = 1 << 18
// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
return {
enumerable: false,
get() {
return (this.state & bit) !== 0
},
set(value) {
if (value) this.state |= bit
else this.state &= ~bit
}
}
}
ObjectDefineProperties(ReadableState.prototype, {
objectMode: makeBitMapDescriptor(kObjectMode),
ended: makeBitMapDescriptor(kEnded),
endEmitted: makeBitMapDescriptor(kEndEmitted),
reading: makeBitMapDescriptor(kReading),
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
constructed: makeBitMapDescriptor(kConstructed),
// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
sync: makeBitMapDescriptor(kSync),
// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
needReadable: makeBitMapDescriptor(kNeedReadable),
emittedReadable: makeBitMapDescriptor(kEmittedReadable),
readableListening: makeBitMapDescriptor(kReadableListening),
resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
// True if the error was already emitted and should not be thrown again.
errorEmitted: makeBitMapDescriptor(kErrorEmitted),
emitClose: makeBitMapDescriptor(kEmitClose),
autoDestroy: makeBitMapDescriptor(kAutoDestroy),
// Has it been destroyed.
destroyed: makeBitMapDescriptor(kDestroyed),
// Indicates whether the stream has finished destroying.
closed: makeBitMapDescriptor(kClosed),
// True if close has been emitted or would have been emitted
// depending on emitClose.
closeEmitted: makeBitMapDescriptor(kCloseEmitted),
multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
// If true, a maybeReadMore has been scheduled.
readingMore: makeBitMapDescriptor(kReadingMore),
dataEmitted: makeBitMapDescriptor(kDataEmitted)
})
function ReadableState(options, stream, isDuplex) {

@@ -83,6 +155,9 @@ // Duplex streams are both readable and writable, but share

// Bit map field to store ReadableState more effciently with 1 bit per field
// instead of a V8 slot per field.
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync
// Object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away.
this.objectMode = !!(options && options.objectMode)
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode)
if (options && options.objectMode) this.state |= kObjectMode
if (isDuplex && options && options.readableObjectMode) this.state |= kObjectMode

@@ -102,38 +177,10 @@ // The point at which it stops calling _read() to fill the buffer

this.flowing = null
this.ended = false
this.endEmitted = false
this.reading = false
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true
// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
this.sync = true
// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false
this.emittedReadable = false
this.readableListening = false
this.resumeScheduled = false
this[kPaused] = null
// True if the error was already emitted and should not be thrown again.
this.errorEmitted = false
// Should close be emitted on destroy. Defaults to true.
this.emitClose = !options || options.emitClose !== false
if (options && options.emitClose === false) this.state &= ~kEmitClose
// Should .destroy() be called after 'end' (and potentially 'finish').
this.autoDestroy = !options || options.autoDestroy !== false
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy
// Has it been destroyed.
this.destroyed = false
// Indicates whether the stream has errored. When true no further

@@ -145,9 +192,2 @@ // _read calls, 'data' or 'readable' events should occur. This is needed

// Indicates whether the stream has finished destroying.
this.closed = false
// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false
// Crypto is kind of old and crusty. Historically, its default string

@@ -161,7 +201,2 @@ // encoding is 'binary' so we have to make this configurable.

this.awaitDrainWriters = null
this.multiAwaitDrain = false
// If true, a maybeReadMore has been scheduled.
this.readingMore = false
this.dataEmitted = false
this.decoder = null

@@ -202,2 +237,10 @@ this.encoding = null

}
Readable.prototype[SymbolAsyncDispose] = function () {
let error
if (!this.destroyed) {
error = this.readableEnded ? null : new AbortError()
this.destroy(error)
}
return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))))
}

@@ -220,3 +263,3 @@ // Manually shove something into the read() buffer.

let err
if (!state.objectMode) {
if ((state.state & kObjectMode) === 0) {
if (typeof chunk === 'string') {

@@ -246,7 +289,7 @@ encoding = encoding || state.defaultEncoding

} else if (chunk === null) {
state.reading = false
state.state &= ~kReading
onEofChunk(stream, state)
} else if (state.objectMode || (chunk && chunk.length > 0)) {
} else if ((state.state & kObjectMode) !== 0 || (chunk && chunk.length > 0)) {
if (addToFront) {
if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT())
if ((state.state & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT())
else if (state.destroyed || state.errored) return false

@@ -259,3 +302,3 @@ else addChunk(stream, state, chunk, true)

} else {
state.reading = false
state.state &= ~kReading
if (state.decoder && !encoding) {

@@ -270,3 +313,3 @@ chunk = state.decoder.write(chunk)

} else if (!addToFront) {
state.reading = false
state.state &= ~kReading
maybeReadMore(stream, state)

@@ -284,3 +327,3 @@ }

// when we have multiple pipes.
if (state.multiAwaitDrain) {
if ((state.state & kMultiAwaitDrain) !== 0) {
state.awaitDrainWriters.clear()

@@ -297,3 +340,3 @@ } else {

else state.buffer.push(chunk)
if (state.needReadable) emitReadable(stream)
if ((state.state & kNeedReadable) !== 0) emitReadable(stream)
}

@@ -348,3 +391,3 @@ maybeReadMore(stream, state)

if (n <= 0 || (state.length === 0 && state.ended)) return 0
if (state.objectMode) return 1
if ((state.state & kObjectMode) !== 0) return 1
if (NumberIsNaN(n)) {

@@ -374,3 +417,3 @@ // Only flow one buffer at a time.

if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n)
if (n !== 0) state.emittedReadable = false
if (n !== 0) state.state &= ~kEmittedReadable

@@ -421,3 +464,3 @@ // If we're doing read(0) to trigger a readable event, but we

// if we need a readable event, then we need to do some reading.
let doRead = state.needReadable
let doRead = (state.state & kNeedReadable) !== 0
debug('need readable', doRead)

@@ -439,6 +482,5 @@

debug('do read')
state.reading = true
state.sync = true
state.state |= kReading | kSync
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0) state.needReadable = true
if (state.length === 0) state.state |= kNeedReadable

@@ -451,3 +493,4 @@ // Call internal read method

}
state.sync = false
state.state &= ~kSync
// If _read pushed data synchronously, then `reading` will be false,

@@ -731,5 +774,3 @@ // and we need to re-evaluate how much data we can return to the user.

if (dest.writableNeedDrain === true) {
if (state.flowing) {
pause()
}
pause()
} else if (!state.flowing) {

@@ -736,0 +777,0 @@ debug('pipe resume')

'use strict'
const { MathFloor, NumberIsInteger } = require('../../ours/primordials')
const { validateInteger } = require('../validators')
const { ERR_INVALID_ARG_VALUE } = require('../../ours/errors').codes
let defaultHighWaterMarkBytes = 16 * 1024
let defaultHighWaterMarkObjectMode = 16
function highWaterMarkFrom(options, isDuplex, duplexKey) {

@@ -9,4 +12,12 @@ return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null

function getDefaultHighWaterMark(objectMode) {
return objectMode ? 16 : 16 * 1024
return objectMode ? defaultHighWaterMarkObjectMode : defaultHighWaterMarkBytes
}
function setDefaultHighWaterMark(objectMode, value) {
validateInteger(value, 'value', 0)
if (objectMode) {
defaultHighWaterMarkObjectMode = value
} else {
defaultHighWaterMarkBytes = value
}
}
function getHighWaterMark(state, options, duplexKey, isDuplex) {

@@ -27,3 +38,4 @@ const hwm = highWaterMarkFrom(options, isDuplex, duplexKey)

getHighWaterMark,
getDefaultHighWaterMark
getDefaultHighWaterMark,
setDefaultHighWaterMark
}
'use strict'
const { Symbol, SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials')
const kDestroyed = Symbol('kDestroyed')
const kIsErrored = Symbol('kIsErrored')
const kIsReadable = Symbol('kIsReadable')
const kIsDisturbed = Symbol('kIsDisturbed')
const { SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials')
// We need to use SymbolFor to make these globally available
// for interopt with readable-stream, i.e. readable-stream
// and node core needs to be able to read/write private state
// from each other for proper interoperability.
const kIsDestroyed = SymbolFor('nodejs.stream.destroyed')
const kIsErrored = SymbolFor('nodejs.stream.errored')
const kIsReadable = SymbolFor('nodejs.stream.readable')
const kIsWritable = SymbolFor('nodejs.stream.writable')
const kIsDisturbed = SymbolFor('nodejs.stream.disturbed')
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise')

@@ -90,3 +96,3 @@ const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction')

const state = wState || rState
return !!(stream.destroyed || stream[kDestroyed] || (state !== null && state !== undefined && state.destroyed))
return !!(stream.destroyed || stream[kIsDestroyed] || (state !== null && state !== undefined && state.destroyed))
}

@@ -139,2 +145,3 @@

function isWritable(stream) {
if (stream && stream[kIsWritable] != null) return stream[kIsWritable]
if (typeof (stream === null || stream === undefined ? undefined : stream.writable) !== 'boolean') return null

@@ -292,3 +299,4 @@ if (isDestroyed(stream)) return false

module.exports = {
kDestroyed,
isDestroyed,
kIsDestroyed,
isDisturbed,

@@ -302,4 +310,4 @@ kIsDisturbed,

kControllerErrorFunction,
kIsWritable,
isClosed,
isDestroyed,
isDuplexNodeStream,

@@ -306,0 +314,0 @@ isFinished,

@@ -53,3 +53,2 @@ /* eslint jsdoc/require-jsdoc: "error" */

* behaviors.
*
* @param {*} value Values to be validated

@@ -309,2 +308,22 @@ * @param {string} name Name of the argument

/**
* @callback validateAbortSignalArray
* @param {*} value
* @param {string} name
* @returns {asserts value is AbortSignal[]}
*/
/** @type {validateAbortSignalArray} */
function validateAbortSignalArray(value, name) {
validateArray(value, name)
for (let i = 0; i < value.length; i++) {
const signal = value[i]
const indexedName = `${name}[${i}]`
if (signal == null) {
throw new ERR_INVALID_ARG_TYPE(indexedName, 'AbortSignal', signal)
}
validateAbortSignal(signal, indexedName)
}
}
/**
* @param {*} signal

@@ -493,2 +512,3 @@ * @param {string} [name='signal']

validateBooleanArray,
validateAbortSignalArray,
validateBoolean,

@@ -495,0 +515,0 @@ validateBuffer,

@@ -74,2 +74,5 @@ 'use strict'

},
PromiseResolve(val) {
return Promise.resolve(val)
},
ReflectApply: Reflect.apply,

@@ -98,6 +101,9 @@ RegExpPrototypeTest(self, value) {

SymbolIterator: Symbol.iterator,
SymbolDispose: Symbol.dispose || Symbol('Symbol.dispose'),
SymbolAsyncDispose: Symbol.asyncDispose || Symbol('Symbol.asyncDispose'),
TypedArrayPrototypeSet(self, buf, len) {
return self.set(buf, len)
},
Boolean: Boolean,
Uint8Array
}
'use strict'
const bufferModule = require('buffer')
const { kResistStopPropagation, SymbolDispose } = require('./primordials')
const AbortSignal = global.AbortSignal || require('abort-controller').AbortSignal
const AbortController = global.AbortController || require('abort-controller').AbortController
const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor

@@ -18,2 +21,11 @@ const Blob = globalThis.Blob || bufferModule.Blob

const validateAbortSignal = (signal, name) => {
if (signal !== undefined && (signal === null || typeof signal !== 'object' || !('aborted' in signal))) {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal)
}
}
const validateFunction = (value, name) => {
if (typeof value !== 'function') throw new ERR_INVALID_ARG_TYPE(name, 'Function', value)
}
// This is a simplified version of AggregateError

@@ -127,4 +139,64 @@ class AggregateError extends Error {

},
isBlob
isBlob,
deprecate(fn, message) {
return fn
},
addAbortListener:
require('events').addAbortListener ||
function addAbortListener(signal, listener) {
if (signal === undefined) {
throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal)
}
validateAbortSignal(signal, 'signal')
validateFunction(listener, 'listener')
let removeEventListener
if (signal.aborted) {
queueMicrotask(() => listener())
} else {
signal.addEventListener('abort', listener, {
__proto__: null,
once: true,
[kResistStopPropagation]: true
})
removeEventListener = () => {
signal.removeEventListener('abort', listener)
}
}
return {
__proto__: null,
[SymbolDispose]() {
var _removeEventListener
;(_removeEventListener = removeEventListener) === null || _removeEventListener === undefined
? undefined
: _removeEventListener()
}
}
},
AbortSignalAny:
AbortSignal.any ||
function AbortSignalAny(signals) {
// Fast path if there is only one signal.
if (signals.length === 1) {
return signals[0]
}
const ac = new AbortController()
const abort = () => ac.abort()
signals.forEach((signal) => {
validateAbortSignal(signal, 'signals')
signal.addEventListener('abort', abort, {
once: true
})
})
ac.signal.addEventListener(
'abort',
() => {
signals.forEach((signal) => signal.removeEventListener('abort', abort))
},
{
once: true
}
)
return ac.signal
}
}
module.exports.promisify.custom = Symbol.for('nodejs.util.promisify.custom')

@@ -37,2 +37,3 @@ /* replacement start */

const compose = require('./internal/streams/compose')
const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('./internal/streams/state')
const { pipeline } = require('./internal/streams/pipeline')

@@ -45,5 +46,7 @@ const { destroyer } = require('./internal/streams/destroy')

const Stream = (module.exports = require('./internal/streams/legacy').Stream)
Stream.isDestroyed = utils.isDestroyed
Stream.isDisturbed = utils.isDisturbed
Stream.isErrored = utils.isErrored
Stream.isReadable = utils.isReadable
Stream.isWritable = utils.isWritable
Stream.Readable = require('./internal/streams/readable')

@@ -108,2 +111,4 @@ for (const key of ObjectKeys(streamReturningOperators)) {

Stream.compose = compose
Stream.setDefaultHighWaterMark = setDefaultHighWaterMark
Stream.getDefaultHighWaterMark = getDefaultHighWaterMark
ObjectDefineProperty(Stream, 'promises', {

@@ -110,0 +115,0 @@ __proto__: null,

{
"name": "readable-stream",
"version": "4.4.2",
"version": "4.5.0",
"description": "Node.js Streams, a user-land copy of the stream library from Node.js",

@@ -5,0 +5,0 @@ "homepage": "https://github.com/nodejs/readable-stream",

@@ -14,5 +14,5 @@ # readable-stream

This package is a mirror of the streams implementations in Node.js 18.16.0.
This package is a mirror of the streams implementations in Node.js 18.19.0.
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.16.0/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.19.0/docs/api/stream.html).

@@ -19,0 +19,0 @@ If you want to guarantee a stable streams base, regardless of what version of

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc