readable-stream

Package overview: 7 dependencies, 3 maintainers, 103 published versions

Comparing version 4.2.0 to 4.3.0


lib/_stream_duplex.js

@@ -1,3 +0,4 @@

'use strict' // Keep this file as an alias for the full stream module.
'use strict'
// Keep this file as an alias for the full stream module.
module.exports = require('./stream').Duplex

lib/_stream_passthrough.js

@@ -1,3 +0,4 @@

'use strict' // Keep this file as an alias for the full stream module.
'use strict'
// Keep this file as an alias for the full stream module.
module.exports = require('./stream').PassThrough

lib/_stream_readable.js

@@ -1,3 +0,4 @@

'use strict' // Keep this file as an alias for the full stream module.
'use strict'
// Keep this file as an alias for the full stream module.
module.exports = require('./stream').Readable

lib/_stream_transform.js

@@ -1,3 +0,4 @@

'use strict' // Keep this file as an alias for the full stream module.
'use strict'
// Keep this file as an alias for the full stream module.
module.exports = require('./stream').Transform

lib/_stream_writable.js

@@ -1,3 +0,4 @@

'use strict' // Keep this file as an alias for the full stream module.
'use strict'
// Keep this file as an alias for the full stream module.
module.exports = require('./stream').Writable
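
These five alias files simply re-export the classes from the bundled ./stream module. A minimal consumer-side sketch, hedged: it assumes the deep lib/_stream_*.js paths remain require-able, which is the stated purpose of keeping these alias files.

// Preferred: pull the classes from the package root.
const { Readable, Writable, Duplex, Transform, PassThrough } = require('readable-stream')

// Legacy deep import, resolved through the alias file shown above.
// Both names should refer to the same constructor.
const DuplexAlias = require('readable-stream/lib/_stream_duplex')
console.log(DuplexAlias === Duplex) // expected: true
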
lib/internal/streams/add-abort-signal.js

'use strict'
const { AbortError, codes } = require('../../ours/errors')
const eos = require('./end-of-stream')
const { ERR_INVALID_ARG_TYPE } = codes
const { ERR_INVALID_ARG_TYPE } = codes // This method is inlined here for readable-stream
// This method is inlined here for readable-stream
// It also does not allow for signal to not exist on the stream
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
const validateAbortSignal = (signal, name) => {

@@ -16,17 +15,12 @@ if (typeof signal !== 'object' || !('aborted' in signal)) {

}
function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function')
}
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal')
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream)
}
return module.exports.addAbortSignalNoValidate(signal, stream)
}
module.exports.addAbortSignalNoValidate = function (signal, stream) {

@@ -36,3 +30,2 @@ if (typeof signal !== 'object' || !('aborted' in signal)) {

}
const onAbort = () => {

@@ -45,3 +38,2 @@ stream.destroy(

}
if (signal.aborted) {

@@ -53,4 +45,3 @@ onAbort()

}
return stream
}
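
For reference, a short usage sketch of the addAbortSignal helper touched above, mirroring the node:stream API that readable-stream re-exports; aborting the controller destroys the stream with an AbortError.

const { addAbortSignal, Readable } = require('readable-stream')

const controller = new AbortController()
const readable = addAbortSignal(controller.signal, Readable.from(['a', 'b', 'c']))

readable.on('error', (err) => {
  // Once abort() is called the stream is destroyed with an AbortError.
  console.error(err.name) // 'AbortError'
})

controller.abort()
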
lib/internal/streams/buffer_list.js

'use strict'
const { StringPrototypeSlice, SymbolIterator, TypedArrayPrototypeSet, Uint8Array } = require('../../ours/primordials')
const { Buffer } = require('buffer')
const { inspect } = require('../../ours/util')
module.exports = class BufferList {

@@ -15,3 +12,2 @@ constructor() {

}
push(v) {

@@ -27,3 +23,2 @@ const entry = {

}
unshift(v) {

@@ -38,3 +33,2 @@ const entry = {

}
shift() {

@@ -48,3 +42,2 @@ if (this.length === 0) return

}
clear() {

@@ -54,3 +47,2 @@ this.head = this.tail = null

}
join(s) {

@@ -60,8 +52,5 @@ if (this.length === 0) return ''

let ret = '' + p.data
while ((p = p.next) !== null) ret += s + p.data
return ret
}
concat(n) {

@@ -72,3 +61,2 @@ if (this.length === 0) return Buffer.alloc(0)

let i = 0
while (p) {

@@ -79,9 +67,8 @@ TypedArrayPrototypeSet(ret, p.data, i)

}
return ret
} // Consumes a specified amount of bytes or characters from the buffered data.
}
// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
const data = this.head.data
if (n < data.length) {

@@ -93,15 +80,12 @@ // `slice` is the same for buffers and strings.

}
if (n === data.length) {
// First chunk is a perfect match.
return this.shift()
} // Result spans more than one buffer.
}
// Result spans more than one buffer.
return hasStrings ? this._getString(n) : this._getBuffer(n)
}
first() {
return this.head.data
}
*[SymbolIterator]() {

@@ -111,4 +95,5 @@ for (let p = this.head; p; p = p.next) {

}
} // Consumes a specified amount of characters from the buffered data.
}
// Consumes a specified amount of characters from the buffered data.
_getString(n) {

@@ -118,6 +103,4 @@ let ret = ''

let c = 0
do {
const str = p.data
if (n > str.length) {

@@ -137,13 +120,11 @@ ret += str

}
break
}
++c
} while ((p = p.next) !== null)
this.length -= c
return ret
} // Consumes a specified amount of bytes from the buffered data.
}
// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {

@@ -154,6 +135,4 @@ const ret = Buffer.allocUnsafe(n)

let c = 0
do {
const buf = p.data
if (n > buf.length) {

@@ -173,13 +152,11 @@ TypedArrayPrototypeSet(ret, buf, retLen - n)

}
break
}
++c
} while ((p = p.next) !== null)
this.length -= c
return ret
} // Make sure the linked list only shows the minimal necessary information.
}
// Make sure the linked list only shows the minimal necessary information.
[Symbol.for('nodejs.util.inspect.custom')](_, options) {

@@ -186,0 +163,0 @@ return inspect(this, {

lib/internal/streams/compose.js

'use strict'
const { pipeline } = require('./pipeline')
const Duplex = require('./duplex')
const { destroyer } = require('./destroy')
const { isNodeStream, isReadable, isWritable } = require('./utils')
const {

@@ -15,3 +11,2 @@ AbortError,

} = require('../../ours/errors')
module.exports = function compose(...streams) {

@@ -21,13 +16,9 @@ if (streams.length === 0) {

}
if (streams.length === 1) {
return Duplex.from(streams[0])
}
const orgStreams = [...streams]
if (typeof streams[0] === 'function') {
streams[0] = Duplex.from(streams[0])
}
if (typeof streams[streams.length - 1] === 'function') {

@@ -37,3 +28,2 @@ const idx = streams.length - 1

}
for (let n = 0; n < streams.length; ++n) {

@@ -44,7 +34,5 @@ if (!isNodeStream(streams[n])) {

}
if (n < streams.length - 1 && !isReadable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable')
}
if (n > 0 && !isWritable(streams[n])) {

@@ -54,3 +42,2 @@ throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable')

}
let ondrain

@@ -61,7 +48,5 @@ let onfinish

let d
function onfinished(err) {
const cb = onclose
onclose = null
if (cb) {

@@ -75,10 +60,10 @@ cb(err)

}
const head = streams[0]
const tail = pipeline(streams, onfinished)
const writable = !!isWritable(head)
const readable = !!isReadable(tail) // TODO(ronag): Avoid double buffering.
const readable = !!isReadable(tail)
// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new Duplex({

@@ -91,3 +76,2 @@ // TODO (ronag): highWaterMark?

})
if (writable) {

@@ -101,3 +85,2 @@ d._write = function (chunk, encoding, callback) {

}
d._final = function (callback) {

@@ -107,3 +90,2 @@ head.end()

}
head.on('drain', function () {

@@ -124,3 +106,2 @@ if (ondrain) {

}
if (readable) {

@@ -137,7 +118,5 @@ tail.on('readable', function () {

})
d._read = function () {
while (true) {
const buf = tail.read()
if (buf === null) {

@@ -147,3 +126,2 @@ onreadable = d._read

}
if (!d.push(buf)) {

@@ -155,3 +133,2 @@ return

}
d._destroy = function (err, callback) {

@@ -161,7 +138,5 @@ if (!err && onclose !== null) {

}
onreadable = null
ondrain = null
onfinish = null
if (onclose === null) {

@@ -174,4 +149,3 @@ callback(err)

}
return d
}
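
compose() glues a chain of streams (or stream-returning functions) into a single Duplex: writes go into the first stream, reads come out of the last. A hedged sketch, assuming compose is re-exported from the package root the way node:stream exposes stream.compose; the two transforms are purely illustrative.

const { compose, Transform } = require('readable-stream')

const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).toUpperCase())
  }
})
const exclaim = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk) + '!')
  }
})

// The composed stream behaves like a single Transform-style Duplex.
const shout = compose(upper, exclaim)
shout.on('data', (chunk) => console.log(String(chunk))) // HELLO!
shout.end('hello')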

lib/internal/streams/destroy.js

@@ -5,3 +5,4 @@ 'use strict'

const process = require('process')
const process = require('process/')
/* replacement end */

@@ -14,10 +15,6 @@

} = require('../../ours/errors')
const { Symbol } = require('../../ours/primordials')
const { kDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
const kDestroy = Symbol('kDestroy')
const kConstruct = Symbol('kConstruct')
function checkError(err, w, r) {

@@ -31,3 +28,2 @@ if (err) {

}
if (r && !r.errored) {

@@ -37,11 +33,11 @@ r.errored = err

}
} // Backwards compat. cb() is undocumented and unused in core but
}
// Backwards compat. cb() is undocumented and unused in core but
// unfortunately might be used by modules.
function destroy(err, cb) {
const r = this._readableState
const w = this._writableState // With duplex streams we use the writable side for state.
const w = this._writableState
// With duplex streams we use the writable side for state.
const s = w || r
if ((w && w.destroyed) || (r && r.destroyed)) {

@@ -51,17 +47,16 @@ if (typeof cb === 'function') {

}
return this
}
return this
} // We set destroyed to true before firing error callbacks in order
// We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks
checkError(err, w, r)
if (w) {
w.destroyed = true
}
if (r) {
r.destroyed = true
} // If still constructing then defer calling _destroy.
}
// If still constructing then defer calling _destroy.
if (!s.constructed) {

@@ -74,9 +69,6 @@ this.once(kDestroy, function (er) {

}
return this
}
function _destroy(self, err, cb) {
let called = false
function onDestroy(err) {

@@ -86,3 +78,2 @@ if (called) {

}
called = true

@@ -92,15 +83,11 @@ const r = self._readableState

checkError(err, w, r)
if (w) {
w.closed = true
}
if (r) {
r.closed = true
}
if (typeof cb === 'function') {
cb(err)
}
if (err) {

@@ -112,3 +99,2 @@ process.nextTick(emitErrorCloseNT, self, err)

}
try {

@@ -120,3 +106,2 @@ self._destroy(err || null, onDestroy)

}
function emitErrorCloseNT(self, err) {

@@ -126,15 +111,11 @@ emitErrorNT(self, err)

}
function emitCloseNT(self) {
const r = self._readableState
const w = self._writableState
if (w) {
w.closeEmitted = true
}
if (r) {
r.closeEmitted = true
}
if ((w && w.emitClose) || (r && r.emitClose)) {

@@ -144,26 +125,19 @@ self.emit('close')

}
function emitErrorNT(self, err) {
const r = self._readableState
const w = self._writableState
if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
return
}
if (w) {
w.errorEmitted = true
}
if (r) {
r.errorEmitted = true
}
self.emit('error', err)
}
function undestroy() {
const r = this._readableState
const w = this._writableState
if (r) {

@@ -180,3 +154,2 @@ r.constructed = true

}
if (w) {

@@ -196,3 +169,2 @@ w.constructed = true

}
function errorOrDestroy(stream, err, sync) {

@@ -204,9 +176,8 @@ // We have tests that rely on errors being emitted

// semver major update we should change the default to this.
const r = stream._readableState
const w = stream._writableState
if ((w && w.destroyed) || (r && r.destroyed)) {
return this
}
if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err)

@@ -220,7 +191,5 @@ else if (err) {

}
if (r && !r.errored) {
r.errored = err
}
if (sync) {

@@ -233,3 +202,2 @@ process.nextTick(emitErrorNT, stream, err)

}
function construct(stream, cb) {

@@ -239,16 +207,11 @@ if (typeof stream._construct !== 'function') {

}
const r = stream._readableState
const w = stream._writableState
if (r) {
r.constructed = false
}
if (w) {
w.constructed = false
}
stream.once(kConstruct, cb)
if (stream.listenerCount(kConstruct) > 1) {

@@ -258,9 +221,6 @@ // Duplex

}
process.nextTick(constructNT, stream)
}
function constructNT(stream) {
let called = false
function onConstruct(err) {

@@ -271,3 +231,2 @@ if (called) {

}
called = true

@@ -277,11 +236,8 @@ const r = stream._readableState

const s = w || r
if (r) {
r.constructed = true
}
if (w) {
w.constructed = true
}
if (s.destroyed) {

@@ -295,3 +251,2 @@ stream.emit(kDestroy, err)

}
try {

@@ -303,20 +258,17 @@ stream._construct(onConstruct)

}
function emitConstructNT(stream) {
stream.emit(kConstruct)
}
function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function'
}
function emitCloseLegacy(stream) {
stream.emit('close')
}
function emitErrorCloseLegacy(stream, err) {
stream.emit('error', err)
process.nextTick(emitCloseLegacy, stream)
} // Normalize destroy for legacy.
}
// Normalize destroy for legacy.
function destroyer(stream, err) {

@@ -326,7 +278,7 @@ if (!stream || isDestroyed(stream)) {

}
if (!err && !isFinished(stream)) {
err = new AbortError()
} // TODO: Remove isRequest branches.
}
// TODO: Remove isRequest branches.
if (isServerRequest(stream)) {

@@ -349,3 +301,2 @@ stream.socket = null

}
if (!stream.destroyed) {

@@ -355,3 +306,2 @@ stream[kDestroyed] = true

}
module.exports = {

@@ -358,0 +308,0 @@ construct,

lib/internal/streams/duplex.js

@@ -21,2 +21,3 @@ // Copyright Joyent, Inc. and other Node contributors.

// USE OR OTHER DEALINGS IN THE SOFTWARE.
// a duplex stream is just a stream that is both readable and writable.

@@ -26,2 +27,3 @@ // Since JS doesn't have multiple prototype inheritance, this class

// Writable.
'use strict'

@@ -35,14 +37,10 @@

} = require('../../ours/primordials')
module.exports = Duplex
const Readable = require('./readable')
const Writable = require('./writable')
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype)
ObjectSetPrototypeOf(Duplex, Readable)
{
const keys = ObjectKeys(Writable.prototype) // Allow the keys array to be GC'ed.
const keys = ObjectKeys(Writable.prototype)
// Allow the keys array to be GC'ed.
for (let i = 0; i < keys.length; i++) {

@@ -53,3 +51,2 @@ const method = keys[i]

}
function Duplex(options) {

@@ -59,6 +56,4 @@ if (!(this instanceof Duplex)) return new Duplex(options)

Writable.call(this, options)
if (options) {
this.allowHalfOpen = options.allowHalfOpen !== false
if (options.readable === false) {

@@ -69,3 +64,2 @@ this._readableState.readable = false

}
if (options.writable === false) {

@@ -81,3 +75,2 @@ this._writableState.writable = false

}
ObjectDefineProperties(Duplex.prototype, {

@@ -122,3 +115,2 @@ writable: {

__proto__: null,
get() {

@@ -128,6 +120,4 @@ if (this._readableState === undefined || this._writableState === undefined) {

}
return this._readableState.destroyed && this._writableState.destroyed
},
set(value) {

@@ -143,4 +133,5 @@ // Backward compatibility, the user is explicitly

})
let webStreamsAdapters // Lazy to avoid circular references
let webStreamsAdapters
// Lazy to avoid circular references
function lazyWebStreams() {

@@ -150,13 +141,9 @@ if (webStreamsAdapters === undefined) webStreamsAdapters = {}

}
Duplex.fromWeb = function (pair, options) {
return lazyWebStreams().newStreamDuplexFromReadableWritablePair(pair, options)
}
Duplex.toWeb = function (duplex) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex)
}
let duplexify
Duplex.from = function (body) {

@@ -166,4 +153,3 @@ if (!duplexify) {

}
return duplexify(body, 'body')
}
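
Duplex.from(), defined just above, wraps plain values into duplex streams; with an async generator function the generator's parameter is the writable side and the yielded values form the readable side. A small sketch based on the documented node:stream behaviour:

const { Duplex } = require('readable-stream')

const upperCase = Duplex.from(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

upperCase.on('data', (chunk) => console.log(String(chunk))) // HELLO
upperCase.end('hello')
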
lib/internal/streams/duplexify.js

/* replacement start */
const process = require('process')
const process = require('process/')
/* replacement end */
;('use strict')
const bufferModule = require('buffer')
const {

@@ -18,5 +18,3 @@ isReadable,

} = require('./utils')
const eos = require('./end-of-stream')
const {

@@ -26,13 +24,7 @@ AbortError,

} = require('../../ours/errors')
const { destroyer } = require('./destroy')
const Duplex = require('./duplex')
const Readable = require('./readable')
const { createDeferredPromise } = require('../../ours/util')
const from = require('./from')
const Blob = globalThis.Blob || bufferModule.Blob

@@ -47,11 +39,12 @@ const isBlob =

}
const AbortController = globalThis.AbortController || require('abort-controller').AbortController
const { FunctionPrototypeCall } = require('../../ours/primordials')
const { FunctionPrototypeCall } = require('../../ours/primordials') // This is needed for pre node 17.
// This is needed for pre node 17.
class Duplexify extends Duplex {
constructor(options) {
super(options) // https://github.com/nodejs/node/pull/34385
super(options)
// https://github.com/nodejs/node/pull/34385
if ((options === null || options === undefined ? undefined : options.readable) === false) {

@@ -62,3 +55,2 @@ this._readableState.readable = false

}
if ((options === null || options === undefined ? undefined : options.writable) === false) {

@@ -72,3 +64,2 @@ this._writableState.writable = false

}
module.exports = function duplexify(body, name) {

@@ -78,3 +69,2 @@ if (isDuplexNodeStream(body)) {

}
if (isReadableNodeStream(body)) {

@@ -85,3 +75,2 @@ return _duplexify({

}
if (isWritableNodeStream(body)) {

@@ -92,3 +81,2 @@ return _duplexify({

}
if (isNodeStream(body)) {

@@ -99,6 +87,9 @@ return _duplexify({

})
} // TODO: Webstreams
}
// TODO: Webstreams
// if (isReadableStream(body)) {
// return _duplexify({ readable: Readable.fromWeb(body) });
// }
// TODO: Webstreams

@@ -111,3 +102,2 @@ // if (isWritableStream(body)) {

const { value, write, final, destroy } = fromAsyncGen(body)
if (isIterable(value)) {

@@ -122,5 +112,3 @@ return from(Duplexify, value, {

}
const then = value === null || value === undefined ? undefined : value.then
if (typeof then === 'function') {

@@ -145,3 +133,2 @@ let d

write,
final(cb) {

@@ -157,14 +144,10 @@ final(async () => {

},
destroy
}))
}
throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or AsyncFunction', name, value)
}
if (isBlob(body)) {
return duplexify(body.arrayBuffer())
}
if (isIterable(body)) {

@@ -176,3 +159,5 @@ return from(Duplexify, body, {

})
} // TODO: Webstreams.
}
// TODO: Webstreams.
// if (

@@ -210,5 +195,3 @@ // isReadableStream(body?.readable) &&

}
const then = body === null || body === undefined ? undefined : body.then
if (typeof then === 'function') {

@@ -223,3 +206,2 @@ let d

}
d.push(null)

@@ -234,7 +216,5 @@ },

writable: false,
read() {}
}))
}
throw new ERR_INVALID_ARG_TYPE(

@@ -256,3 +236,2 @@ name,

}
function fromAsyncGen(fn) {

@@ -284,7 +263,5 @@ let { promise, resolve } = createDeferredPromise()

value,
write(chunk, encoding, cb) {
const _resolve = resolve
resolve = null
_resolve({

@@ -296,7 +273,5 @@ chunk,

},
final(cb) {
const _resolve = resolve
resolve = null
_resolve({

@@ -307,3 +282,2 @@ done: true,

},
destroy(err, cb) {

@@ -315,3 +289,2 @@ ac.abort()

}
function _duplexify(pair) {

@@ -327,7 +300,5 @@ const r = pair.readable && typeof pair.readable.read !== 'function' ? Readable.wrap(pair.readable) : pair.readable

let d
function onfinished(err) {
const cb = onclose
onclose = null
if (cb) {

@@ -340,6 +311,7 @@ cb(err)

}
} // TODO(ronag): Avoid double buffering.
}
// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new Duplexify({

@@ -352,14 +324,10 @@ // TODO (ronag): highWaterMark?

})
if (writable) {
eos(w, (err) => {
writable = false
if (err) {
destroyer(r, err)
}
onfinished(err)
})
d._write = function (chunk, encoding, callback) {

@@ -372,3 +340,2 @@ if (w.write(chunk, encoding)) {

}
d._final = function (callback) {

@@ -378,3 +345,2 @@ w.end()

}
w.on('drain', function () {

@@ -395,11 +361,8 @@ if (ondrain) {

}
if (readable) {
eos(r, (err) => {
readable = false
if (err) {
destroyer(r, err)
}
onfinished(err)

@@ -417,7 +380,5 @@ })

})
d._read = function () {
while (true) {
const buf = r.read()
if (buf === null) {

@@ -427,3 +388,2 @@ onreadable = d._read

}
if (!d.push(buf)) {

@@ -435,3 +395,2 @@ return

}
d._destroy = function (err, callback) {

@@ -441,7 +400,5 @@ if (!err && onclose !== null) {

}
onreadable = null
ondrain = null
onfinish = null
if (onclose === null) {

@@ -455,4 +412,3 @@ callback(err)

}
return d
}
lib/internal/streams/end-of-stream.js

/* replacement start */
const process = require('process')
const process = require('process/')
/* replacement end */

@@ -8,13 +10,7 @@ // Ported from https://github.com/mafintosh/end-of-stream with

;('use strict')
const { AbortError, codes } = require('../../ours/errors')
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
const { kEmptyObject, once } = require('../../ours/util')
const { validateAbortSignal, validateFunction, validateObject } = require('../validators')
const { Promise } = require('../../ours/primordials')
const {

@@ -33,12 +29,8 @@ isClosed,

} = require('./utils')
function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function'
}
const nop = () => {}
function eos(stream, options, callback) {
var _options$readable, _options$writable
if (arguments.length === 2) {

@@ -52,3 +44,2 @@ callback = options

}
validateFunction(callback, 'callback')

@@ -65,3 +56,2 @@ validateAbortSignal(options.signal, 'options.signal')

: isWritableNodeStream(stream)
if (!isNodeStream(stream)) {

@@ -71,6 +61,4 @@ // TODO: Webstreams.

}
const wState = stream._writableState
const rState = stream._readableState
const onlegacyfinish = () => {

@@ -80,23 +68,21 @@ if (!stream.writable) {

}
} // TODO (ronag): Improve soft detection to include core modules and
}
// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
let willEmitClose =
_willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable
let writableFinished = isWritableFinished(stream, false)
const onfinish = () => {
writableFinished = true // Stream should not be destroyed here. If it is that
writableFinished = true
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
if (stream.destroyed) {
willEmitClose = false
}
if (willEmitClose && (!stream.readable || readable)) {
return
}
if (!readable || readableFinished) {

@@ -106,18 +92,14 @@ callback.call(stream)

}
let readableFinished = isReadableFinished(stream, false)
const onend = () => {
readableFinished = true // Stream should not be destroyed here. If it is that
readableFinished = true
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
if (stream.destroyed) {
willEmitClose = false
}
if (willEmitClose && (!stream.writable || writable)) {
return
}
if (!writable || writableFinished) {

@@ -127,39 +109,28 @@ callback.call(stream)

}
const onerror = (err) => {
callback.call(stream, err)
}
let closed = isClosed(stream)
const onclose = () => {
closed = true
const errored = isWritableErrored(stream) || isReadableErrored(stream)
if (errored && typeof errored !== 'boolean') {
return callback.call(stream, errored)
}
if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
}
callback.call(stream)
}
const onrequest = () => {
stream.req.on('finish', onfinish)
}
if (isRequest(stream)) {
stream.on('complete', onfinish)
if (!willEmitClose) {
stream.on('abort', onclose)
}
if (stream.req) {

@@ -174,17 +145,14 @@ onrequest()

stream.on('close', onlegacyfinish)
} // Not all streams will emit 'close' after 'aborted'.
}
// Not all streams will emit 'close' after 'aborted'.
if (!willEmitClose && typeof stream.aborted === 'boolean') {
stream.on('aborted', onclose)
}
stream.on('end', onend)
stream.on('finish', onfinish)
if (options.error !== false) {
stream.on('error', onerror)
}
stream.on('close', onclose)
if (closed) {

@@ -214,3 +182,2 @@ process.nextTick(onclose)

}
const cleanup = () => {

@@ -230,3 +197,2 @@ callback = nop

}
if (options.signal && !closed) {

@@ -244,3 +210,2 @@ const abort = () => {

}
if (options.signal.aborted) {

@@ -257,6 +222,4 @@ process.nextTick(abort)

}
return cleanup
}
function finished(stream, opts) {

@@ -273,4 +236,3 @@ return new Promise((resolve, reject) => {

}
module.exports = eos
module.exports.finished = finished
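
eos/finished wait for a stream to be fully done (or errored) and then invoke the callback exactly once. A typical usage sketch; the file name is only illustrative.

const fs = require('fs')
const { finished } = require('readable-stream')

const rs = fs.createReadStream('some-file.txt') // illustrative path

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed.', err)
  } else {
    console.log('Stream is done reading.')
  }
})

rs.resume() // drain the stream so it can actually reach 'end'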

lib/internal/streams/from.js

@@ -5,14 +5,11 @@ 'use strict'

const process = require('process')
const process = require('process/')
/* replacement end */
const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials')
const { Buffer } = require('buffer')
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes
function from(Readable, iterable, opts) {
let iterator
if (typeof iterable === 'string' || iterable instanceof Buffer) {

@@ -22,3 +19,2 @@ return new Readable({

...opts,
read() {

@@ -30,5 +26,3 @@ this.push(iterable)

}
let isAsync
if (iterable && iterable[SymbolAsyncIterator]) {

@@ -43,3 +37,2 @@ isAsync = true

}
const readable = new Readable({

@@ -50,7 +43,7 @@ objectMode: true,

...opts
}) // Flag to protect against _read
})
// Flag to protect against _read
// being called before last iteration completion.
let reading = false
readable._read = function () {

@@ -62,19 +55,16 @@ if (!reading) {

}
readable._destroy = function (error, cb) {
PromisePrototypeThen(
close(error),
() => process.nextTick(cb, error), // nextTick is here in case cb throws
() => process.nextTick(cb, error),
// nextTick is here in case cb throws
(e) => process.nextTick(cb, e || error)
)
}
async function close(error) {
const hadError = error !== undefined && error !== null
const hasThrow = typeof iterator.throw === 'function'
if (hadError && hasThrow) {
const { value, done } = await iterator.throw(error)
await value
if (done) {

@@ -84,3 +74,2 @@ return

}
if (typeof iterator.return === 'function') {

@@ -91,3 +80,2 @@ const { value } = await iterator.return()

}
async function next() {

@@ -97,3 +85,2 @@ for (;;) {

const { value, done } = isAsync ? await iterator.next() : iterator.next()
if (done) {

@@ -103,3 +90,2 @@ readable.push(null)

const res = value && typeof value.then === 'function' ? await value : value
if (res === null) {

@@ -117,10 +103,7 @@ reading = false

}
break
}
}
return readable
}
module.exports = from
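
from() is what backs Readable.from(): it turns any iterable or async iterable into a readable stream, pulling values lazily as the consumer asks for data. A short usage sketch:

const { Readable } = require('readable-stream')

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())
readable.on('data', (chunk) => console.log(chunk)) // 'hello', then 'streams'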

lib/internal/streams/lazy_transform.js

@@ -7,16 +7,10 @@ // LazyTransform is a special type of Transform stream that is lazily loaded.

const { ObjectDefineProperties, ObjectDefineProperty, ObjectSetPrototypeOf } = require('../../ours/primordials')
const stream = require('../../stream')
const { getDefaultEncoding } = require('../crypto/util')
module.exports = LazyTransform
function LazyTransform(options) {
this._options = options
}
ObjectSetPrototypeOf(LazyTransform.prototype, stream.Transform.prototype)
ObjectSetPrototypeOf(LazyTransform, stream.Transform)
function makeGetter(name) {

@@ -26,11 +20,8 @@ return function () {

this._writableState.decodeStrings = false
if (!this._options || !this._options.defaultEncoding) {
this._writableState.defaultEncoding = getDefaultEncoding()
}
return this[name]
}
}
function makeSetter(name) {

@@ -47,3 +38,2 @@ return function (val) {

}
ObjectDefineProperties(LazyTransform.prototype, {

@@ -50,0 +40,0 @@ _readableState: {

lib/internal/streams/legacy.js

'use strict'
const { ArrayIsArray, ObjectSetPrototypeOf } = require('../../ours/primordials')
const { EventEmitter: EE } = require('events')
function Stream(opts) {
EE.call(this, opts)
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype)
ObjectSetPrototypeOf(Stream, EE)
Stream.prototype.pipe = function (dest, options) {
const source = this
function ondata(chunk) {

@@ -22,5 +17,3 @@ if (dest.writable && dest.write(chunk) === false && source.pause) {

}
source.on('data', ondata)
function ondrain() {

@@ -31,6 +24,6 @@ if (source.readable && source.resume) {

}
dest.on('drain', ondrain)
dest.on('drain', ondrain) // If the 'end' option is not supplied, dest.end() will be called when
// If the 'end' option is not supplied, dest.end() will be called when
// source gets the 'end' or 'close' events. Only dest.end() once.
if (!dest._isStdio && (!options || options.end !== false)) {

@@ -40,5 +33,3 @@ source.on('end', onend)

}
let didOnEnd = false
function onend() {

@@ -49,3 +40,2 @@ if (didOnEnd) return

}
function onclose() {

@@ -55,7 +45,7 @@ if (didOnEnd) return

if (typeof dest.destroy === 'function') dest.destroy()
} // Don't leave dangling pipes when there are errors.
}
// Don't leave dangling pipes when there are errors.
function onerror(er) {
cleanup()
if (EE.listenerCount(this, 'error') === 0) {

@@ -65,6 +55,6 @@ this.emit('error', er)

}
prependListener(source, 'error', onerror)
prependListener(dest, 'error', onerror) // Remove all the event listeners that were added.
prependListener(dest, 'error', onerror)
// Remove all the event listeners that were added.
function cleanup() {

@@ -81,19 +71,19 @@ source.removeListener('data', ondata)

}
source.on('end', cleanup)
source.on('close', cleanup)
dest.on('close', cleanup)
dest.emit('pipe', source) // Allow for unix-like usage: A.pipe(B).pipe(C)
dest.emit('pipe', source)
// Allow for unix-like usage: A.pipe(B).pipe(C)
return dest
}
function prependListener(emitter, event, fn) {
// Sadly this is not cacheable as some libraries bundle their own
// event emitter implementation with them.
if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn) // This is a hack to make sure that our error handler is attached before any
if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn)
// This is a hack to make sure that our error handler is attached before any
// userland ones. NEVER DO THIS. This is here only because this code needs
// to continue to work with older versions of Node.js that do not include
// the prependListener() method. The goal is to eventually remove this hack.
if (!emitter._events || !emitter._events[event]) emitter.on(event, fn)

@@ -103,3 +93,2 @@ else if (ArrayIsArray(emitter._events[event])) emitter._events[event].unshift(fn)

}
module.exports = {

@@ -106,0 +95,0 @@ Stream,

lib/internal/streams/operators.js

'use strict'
const AbortController = globalThis.AbortController || require('abort-controller').AbortController
const {

@@ -9,9 +8,5 @@ codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE },

} = require('../../ours/errors')
const { validateAbortSignal, validateInteger, validateObject } = require('../validators')
const kWeakHandler = require('../../ours/primordials').Symbol('kWeak')
const { finished } = require('./end-of-stream')
const {

@@ -27,6 +22,4 @@ ArrayPrototypePush,

} = require('../../ours/primordials')
const kEmpty = Symbol('kEmpty')
const kEof = Symbol('kEof')
function map(fn, options) {

@@ -36,21 +29,15 @@ if (typeof fn !== 'function') {

}
if (options != null) {
validateObject(options, 'options')
}
if ((options === null || options === undefined ? undefined : options.signal) != null) {
validateAbortSignal(options.signal, 'options.signal')
}
let concurrency = 1
if ((options === null || options === undefined ? undefined : options.concurrency) != null) {
concurrency = MathFloor(options.concurrency)
}
validateInteger(concurrency, 'concurrency', 1)
return async function* map() {
var _options$signal, _options$signal2
const ac = new AbortController()

@@ -63,5 +50,3 @@ const stream = this

}
const abort = () => ac.abort()
if (

@@ -76,3 +61,2 @@ options !== null &&

}
options === null || options === undefined

@@ -86,7 +70,5 @@ ? undefined

let done = false
function onDone() {
done = true
}
async function pump() {

@@ -96,11 +78,8 @@ try {

var _val
if (done) {
return
}
if (signal.aborted) {
throw new AbortError()
}
try {

@@ -111,13 +90,9 @@ val = fn(val, signalOpt)

}
if (val === kEmpty) {
continue
}
if (typeof ((_val = val) === null || _val === undefined ? undefined : _val.catch) === 'function') {
val.catch(onDone)
}
queue.push(val)
if (next) {

@@ -127,3 +102,2 @@ next()

}
if (!done && queue.length && queue.length >= concurrency) {

@@ -135,3 +109,2 @@ await new Promise((resolve) => {

}
queue.push(kEof)

@@ -144,5 +117,3 @@ } catch (err) {

var _options$signal3
done = true
if (next) {

@@ -152,3 +123,2 @@ next()

}
options === null || options === undefined

@@ -161,5 +131,3 @@ ? undefined

}
pump()
try {

@@ -169,17 +137,12 @@ while (true) {

const val = await queue[0]
if (val === kEof) {
return
}
if (signal.aborted) {
throw new AbortError()
}
if (val !== kEmpty) {
yield val
}
queue.shift()
if (resume) {

@@ -190,3 +153,2 @@ resume()

}
await new Promise((resolve) => {

@@ -199,3 +161,2 @@ next = resolve

done = true
if (resume) {

@@ -208,3 +169,2 @@ resume()

}
function asIndexedPairs(options = undefined) {

@@ -214,13 +174,9 @@ if (options != null) {

}
if ((options === null || options === undefined ? undefined : options.signal) != null) {
validateAbortSignal(options.signal, 'options.signal')
}
return async function* asIndexedPairs() {
let index = 0
for await (const val of this) {
var _options$signal4
if (

@@ -237,3 +193,2 @@ options !== null &&

}
yield [index++, val]

@@ -243,3 +198,2 @@ }

}
async function some(fn, options = undefined) {

@@ -249,11 +203,9 @@ for await (const unused of filter.call(this, fn, options)) {

}
return false
}
async function every(fn, options = undefined) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
} // https://en.wikipedia.org/wiki/De_Morgan%27s_laws
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(

@@ -267,3 +219,2 @@ this,

}
async function find(fn, options) {

@@ -273,6 +224,4 @@ for await (const result of filter.call(this, fn, options)) {

}
return undefined
}
async function forEach(fn, options) {

@@ -282,11 +231,9 @@ if (typeof fn !== 'function') {

}
async function forEachFn(value, options) {
await fn(value, options)
return kEmpty
} // eslint-disable-next-line no-unused-vars
}
// eslint-disable-next-line no-unused-vars
for await (const unused of map.call(this, forEachFn, options));
}
function filter(fn, options) {

@@ -296,3 +243,2 @@ if (typeof fn !== 'function') {

}
async function filterFn(value, options) {

@@ -302,10 +248,9 @@ if (await fn(value, options)) {

}
return kEmpty
}
return map.call(this, filterFn, options)
}
return map.call(this, filterFn, options)
} // Specific to provide better error to reduce since the argument is only
// Specific to provide better error to reduce since the argument is only
// missing if the stream has no items in it - but the code is still appropriate
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {

@@ -317,20 +262,14 @@ constructor() {

}
async function reduce(reducer, initialValue, options) {
var _options$signal5
if (typeof reducer !== 'function') {
throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer)
}
if (options != null) {
validateObject(options, 'options')
}
if ((options === null || options === undefined ? undefined : options.signal) != null) {
validateAbortSignal(options.signal, 'options.signal')
}
let hasInitialValue = arguments.length > 1
if (

@@ -347,10 +286,7 @@ options !== null &&

this.once('error', () => {}) // The error is already propagated
await finished(this.destroy(err))
throw err
}
const ac = new AbortController()
const signal = ac.signal
if (options !== null && options !== undefined && options.signal) {

@@ -363,11 +299,7 @@ const opts = {

}
let gotAnyItemFromStream = false
try {
for await (const value of this) {
var _options$signal6
gotAnyItemFromStream = true
if (

@@ -382,3 +314,2 @@ options !== null &&

}
if (!hasInitialValue) {

@@ -393,3 +324,2 @@ initialValue = value

}
if (!gotAnyItemFromStream && !hasInitialValue) {

@@ -401,6 +331,4 @@ throw new ReduceAwareErrMissingArgs()

}
return initialValue
}
async function toArray(options) {

@@ -410,12 +338,8 @@ if (options != null) {

}
if ((options === null || options === undefined ? undefined : options.signal) != null) {
validateAbortSignal(options.signal, 'options.signal')
}
const result = []
for await (const val of this) {
var _options$signal7
if (

@@ -432,9 +356,6 @@ options !== null &&

}
ArrayPrototypePush(result, val)
}
return result
}
function flatMap(fn, options) {

@@ -448,3 +369,2 @@ const values = map.call(this, fn, options)

}
function toIntegerOrInfinity(number) {

@@ -454,14 +374,10 @@ // We coerce here to align with the spec

number = Number(number)
if (NumberIsNaN(number)) {
return 0
}
if (number < 0) {
throw new ERR_OUT_OF_RANGE('number', '>= 0', number)
}
return number
}
function drop(number, options = undefined) {

@@ -471,11 +387,8 @@ if (options != null) {

}
if ((options === null || options === undefined ? undefined : options.signal) != null) {
validateAbortSignal(options.signal, 'options.signal')
}
number = toIntegerOrInfinity(number)
return async function* drop() {
var _options$signal8
if (

@@ -490,6 +403,4 @@ options !== null &&

}
for await (const val of this) {
var _options$signal9
if (

@@ -504,3 +415,2 @@ options !== null &&

}
if (number-- <= 0) {

@@ -512,3 +422,2 @@ yield val

}
function take(number, options = undefined) {

@@ -518,11 +427,8 @@ if (options != null) {

}
if ((options === null || options === undefined ? undefined : options.signal) != null) {
validateAbortSignal(options.signal, 'options.signal')
}
number = toIntegerOrInfinity(number)
return async function* take() {
var _options$signal10
if (

@@ -537,6 +443,4 @@ options !== null &&

}
for await (const val of this) {
var _options$signal11
if (

@@ -551,3 +455,2 @@ options !== null &&

}
if (number-- > 0) {

@@ -561,3 +464,2 @@ yield val

}
module.exports.streamReturningOperators = {

@@ -564,0 +466,0 @@ asIndexedPairs,
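
These operators implement the (still experimental) iterator-helper methods attached to Readable, such as map, filter, take, drop, toArray and reduce. A hedged sketch of how they chain, assuming the helpers are wired onto Readable.prototype as in recent node:stream versions:

const { Readable } = require('readable-stream')

async function main() {
  const result = await Readable.from([1, 2, 3, 4, 5])
    .map(async (n) => n * 2)   // 2, 4, 6, 8, 10
    .filter((n) => n > 4)      // 6, 8, 10
    .take(2)                   // 6, 8
    .toArray()
  console.log(result) // [6, 8]
}

main()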

lib/internal/streams/passthrough.js

@@ -21,16 +21,14 @@ // Copyright Joyent, Inc. and other Node contributors.

// USE OR OTHER DEALINGS IN THE SOFTWARE.
// a passthrough stream.
// basically just the most minimal sort of Transform stream.
// Every written chunk gets output as-is.
'use strict'
const { ObjectSetPrototypeOf } = require('../../ours/primordials')
module.exports = PassThrough
const Transform = require('./transform')
ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype)
ObjectSetPrototypeOf(PassThrough, Transform)
function PassThrough(options) {

@@ -40,5 +38,4 @@ if (!(this instanceof PassThrough)) return new PassThrough(options)

}
PassThrough.prototype._transform = function (chunk, encoding, cb) {
cb(null, chunk)
}
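
PassThrough forwards every written chunk straight to the readable side, which makes it useful as a connector or as a tap for observing data flowing through a pipeline. A minimal sketch:

const { PassThrough } = require('readable-stream')

const monitor = new PassThrough()
monitor.on('data', (chunk) => console.log('observed %d bytes', chunk.length))

process.stdin.pipe(monitor).pipe(process.stdout)
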
lib/internal/streams/pipeline.js

/* replacement start */
const process = require('process')
const process = require('process/')
/* replacement end */

@@ -8,13 +10,7 @@ // Ported from https://github.com/mafintosh/pump with

;('use strict')
const { ArrayIsArray, Promise, SymbolAsyncIterator } = require('../../ours/primordials')
const eos = require('./end-of-stream')
const { once } = require('../../ours/util')
const destroyImpl = require('./destroy')
const Duplex = require('./duplex')
const {

@@ -31,12 +27,7 @@ aggregateTwoErrors,

} = require('../../ours/errors')
const { validateFunction, validateAbortSignal } = require('../validators')
const { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require('./utils')
const AbortController = globalThis.AbortController || require('abort-controller').AbortController
let PassThrough
let Readable
function destroyer(stream, reading, writing) {

@@ -66,3 +57,2 @@ let finished = false

}
function popCallback(streams) {

@@ -75,3 +65,2 @@ // Streams should never be an empty array. It should always contain at least

}
function makeAsyncIterable(val) {

@@ -84,6 +73,4 @@ if (isIterable(val)) {

}
throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], val)
}
async function* fromReadable(val) {

@@ -93,10 +80,7 @@ if (!Readable) {

}
yield* Readable.prototype[SymbolAsyncIterator].call(val)
}
async function pump(iterable, writable, finish, { end }) {
let error
let onresolve = null
const resume = (err) => {

@@ -106,3 +90,2 @@ if (err) {

}
if (onresolve) {

@@ -114,3 +97,2 @@ const callback = onresolve

}
const wait = () =>

@@ -130,3 +112,2 @@ new Promise((resolve, reject) => {

})
writable.on('drain', resume)

@@ -140,3 +121,2 @@ const cleanup = eos(

)
try {

@@ -146,3 +126,2 @@ if (writable.writableNeedDrain) {

}
for await (const chunk of iterable) {

@@ -153,7 +132,5 @@ if (!writable.write(chunk)) {

}
if (end) {
writable.end()
}
await wait()

@@ -168,7 +145,5 @@ finish()

}
function pipeline(...streams) {
return pipelineImpl(streams, once(popCallback(streams)))
}
function pipelineImpl(streams, callback, opts) {

@@ -178,19 +153,16 @@ if (streams.length === 1 && ArrayIsArray(streams[0])) {

}
if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams')
}
const ac = new AbortController()
const signal = ac.signal
const outerSignal = opts === null || opts === undefined ? undefined : opts.signal // Need to cleanup event listeners if last stream is readable
const outerSignal = opts === null || opts === undefined ? undefined : opts.signal
// Need to cleanup event listeners if last stream is readable
// https://github.com/nodejs/node/issues/35452
const lastStreamCleanup = []
validateAbortSignal(outerSignal, 'options.signal')
function abort() {
finishImpl(new AbortError())
}
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.addEventListener('abort', abort)

@@ -201,7 +173,5 @@ let error

let finishCount = 0
function finish(err) {
finishImpl(err, --finishCount === 0)
}
function finishImpl(err, final) {

@@ -211,14 +181,10 @@ if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {

}
if (!error && !final) {
return
}
while (destroys.length) {
destroys.shift()(error)
}
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.removeEventListener('abort', abort)
ac.abort()
if (final) {

@@ -228,9 +194,6 @@ if (!error) {

}
process.nextTick(callback, error, value)
}
}
let ret
for (let i = 0; i < streams.length; i++) {

@@ -242,3 +205,2 @@ const stream = streams[i]

const isLastStream = i === streams.length - 1
if (isNodeStream(stream)) {

@@ -248,8 +210,8 @@ if (end) {

destroys.push(destroy)
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup)
}
} // Catch stream errors that occur after pipe/pump has completed.
}
// Catch stream errors that occur after pipe/pump has completed.
function onError(err) {

@@ -260,5 +222,3 @@ if (err && err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {

}
stream.on('error', onError)
if (isReadable(stream) && isLastStream) {

@@ -270,3 +230,2 @@ lastStreamCleanup.push(() => {

}
if (i === 0) {

@@ -277,3 +236,2 @@ if (typeof stream === 'function') {

})
if (!isIterable(ret)) {

@@ -292,3 +250,2 @@ throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or Stream', 'source', ret)

})
if (reading) {

@@ -300,6 +257,7 @@ if (!isIterable(ret, true)) {

var _ret
if (!PassThrough) {
PassThrough = require('./passthrough')
} // If the last argument to pipeline is not a stream
}
// If the last argument to pipeline is not a stream
// we must create a proxy stream so that pipeline(...)

@@ -311,7 +269,7 @@ // always returns a stream which can be further

objectMode: true
}) // Handle Promises/A+ spec, `then` could be a getter that throws on
})
// Handle Promises/A+ spec, `then` could be a getter that throws on
// second use.
const then = (_ret = ret) === null || _ret === undefined ? undefined : _ret.then
if (typeof then === 'function') {

@@ -323,11 +281,8 @@ finishCount++

value = val
if (val != null) {
pt.write(val)
}
if (end) {
pt.end()
}
process.nextTick(finish)

@@ -348,7 +303,5 @@ },

}
ret = pt
const { destroy, cleanup } = destroyer(ret, false, true)
destroys.push(destroy)
if (isLastStream) {

@@ -364,3 +317,2 @@ lastStreamCleanup.push(cleanup)

})
if (isReadable(stream) && isLastStream) {

@@ -377,3 +329,2 @@ lastStreamCleanup.push(cleanup)

}
ret = stream

@@ -384,3 +335,2 @@ } else {

}
if (

@@ -392,6 +342,4 @@ (signal !== null && signal !== undefined && signal.aborted) ||

}
return ret
}
function pipe(src, dst, finish, { end }) {

@@ -408,3 +356,2 @@ let ended = false

})
if (end) {

@@ -421,3 +368,2 @@ // Compat. Before node v10.12.0 stdio used to throw an error so

}
eos(

@@ -431,3 +377,2 @@ src,

const rState = src._readableState
if (

@@ -464,3 +409,2 @@ err &&

}
module.exports = {

@@ -467,0 +411,0 @@ pipelineImpl,
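
pipeline() pipes a series of streams together, forwards errors, destroys everything on failure and calls the final callback exactly once. The classic gzip example (file names are only illustrative):

const fs = require('fs')
const zlib = require('zlib')
const { pipeline } = require('readable-stream')

pipeline(
  fs.createReadStream('archive.tar'),      // illustrative input
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),  // illustrative output
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err)
    } else {
      console.log('Pipeline succeeded.')
    }
  }
)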

lib/internal/streams/readable.js

/* replacement start */
const process = require('process')
const process = require('process/')
/* replacement end */

@@ -26,3 +28,2 @@ // Copyright Joyent, Inc. and other Node contributors.

;('use strict')
const {

@@ -41,26 +42,15 @@ ArrayPrototypeIndexOf,

} = require('../../ours/primordials')
module.exports = Readable
Readable.ReadableState = ReadableState
const { EventEmitter: EE } = require('events')
const { Stream, prependListener } = require('./legacy')
const { Buffer } = require('buffer')
const { addAbortSignal } = require('./add-abort-signal')
const eos = require('./end-of-stream')
let debug = require('../../ours/util').debuglog('stream', (fn) => {
debug = fn
})
const BufferList = require('./buffer_list')
const destroyImpl = require('./destroy')
const { getHighWaterMark, getDefaultHighWaterMark } = require('./state')
const {

@@ -76,18 +66,10 @@ aggregateTwoErrors,

} = require('../../ours/errors')
const { validateObject } = require('../validators')
const kPaused = Symbol('kPaused')
const { StringDecoder } = require('string_decoder')
const from = require('./from')
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype)
ObjectSetPrototypeOf(Readable, Stream)
const nop = () => {}
const { errorOrDestroy } = destroyImpl
function ReadableState(options, stream, isDuplex) {

@@ -99,15 +81,18 @@ // Duplex streams are both readable and writable, but share

// These options can be provided separately as readableXXX and writableXXX.
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex') // Object stream flag. Used to make read(n) ignore n and to
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex')
// Object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away.
this.objectMode = !!(options && options.objectMode)
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode)
this.objectMode = !!(options && options.objectMode)
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode) // The point at which it stops calling _read() to fill the buffer
// The point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
this.highWaterMark = options
? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex)
: getDefaultHighWaterMark(false) // A linked list is used to store data chunks instead of an array because the
: getDefaultHighWaterMark(false)
// A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift().
this.buffer = new BufferList()

@@ -119,15 +104,18 @@ this.length = 0

this.endEmitted = false
this.reading = false // Stream is still being constructed and cannot be
this.reading = false
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true
this.constructed = true // A flag to be able to tell if the event 'readable'/'data' is emitted
// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
this.sync = true
this.sync = true // Whenever we return null, then we set a flag to say
// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false

@@ -137,30 +125,40 @@ this.emittedReadable = false

this.resumeScheduled = false
this[kPaused] = null // True if the error was already emitted and should not be thrown again.
this[kPaused] = null
this.errorEmitted = false // Should close be emitted on destroy. Defaults to true.
// True if the error was already emitted and should not be thrown again.
this.errorEmitted = false
this.emitClose = !options || options.emitClose !== false // Should .destroy() be called after 'end' (and potentially 'finish').
// Should close be emitted on destroy. Defaults to true.
this.emitClose = !options || options.emitClose !== false
this.autoDestroy = !options || options.autoDestroy !== false // Has it been destroyed.
// Should .destroy() be called after 'end' (and potentially 'finish').
this.autoDestroy = !options || options.autoDestroy !== false
this.destroyed = false // Indicates whether the stream has errored. When true no further
// Has it been destroyed.
this.destroyed = false
// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = null
this.errored = null // Indicates whether the stream has finished destroying.
// Indicates whether the stream has finished destroying.
this.closed = false
this.closed = false // True if close has been emitted or would have been emitted
// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false
this.closeEmitted = false // Crypto is kind of old and crusty. Historically, its default string
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8' // Ref the piped dest which we need a drain event on it
// Ref the piped dest which we need a drain event on it
// type: null | Writable | Set<Writable>.
this.awaitDrainWriters = null
this.multiAwaitDrain = false // If true, a maybeReadMore has been scheduled.
this.multiAwaitDrain = false
// If true, a maybeReadMore has been scheduled.
this.readingMore = false

@@ -170,3 +168,2 @@ this.dataEmitted = false

this.encoding = null
if (options && options.encoding) {

@@ -177,11 +174,9 @@ this.decoder = new StringDecoder(options.encoding)

}
function Readable(options) {
if (!(this instanceof Readable)) return new Readable(options)
function Readable(options) {
if (!(this instanceof Readable)) return new Readable(options) // Checking for a Stream.Duplex instance is faster here instead of inside
// Checking for a Stream.Duplex instance is faster here instead of inside
// the ReadableState constructor, at least with V8 6.5.
const isDuplex = this instanceof require('./duplex')
this._readableState = new ReadableState(options, this, isDuplex)
if (options) {

@@ -193,3 +188,2 @@ if (typeof options.read === 'function') this._read = options.read

}
Stream.call(this, options)

@@ -202,25 +196,23 @@ destroyImpl.construct(this, () => {

}
Readable.prototype.destroy = destroyImpl.destroy
Readable.prototype._undestroy = destroyImpl.undestroy
Readable.prototype._destroy = function (err, cb) {
cb(err)
}
Readable.prototype[EE.captureRejectionSymbol] = function (err) {
this.destroy(err)
} // Manually shove something into the read() buffer.
}
// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function (chunk, encoding) {
return readableAddChunk(this, chunk, encoding, false)
} // Unshift should *always* be something directly out of read().
}
// Unshift should *always* be something directly out of read().
Readable.prototype.unshift = function (chunk, encoding) {
return readableAddChunk(this, chunk, encoding, true)
}
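
push() (and unshift() for putting data back) is how a concrete Readable feeds the internal buffer; push() returning false signals backpressure, and push(null) signals end-of-stream. A minimal custom source as a sketch:

const { Readable } = require('readable-stream')

class Counter extends Readable {
  constructor(options) {
    super(options)
    this._index = 0
  }
  _read() {
    this._index += 1
    if (this._index > 3) {
      this.push(null) // end of stream
    } else {
      this.push(String(this._index)) // hand a chunk to the internal buffer
    }
  }
}

new Counter().on('data', (chunk) => console.log(String(chunk))) // 1, 2, 3
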
function readableAddChunk(stream, chunk, encoding, addToFront) {

@@ -230,7 +222,5 @@ debug('readableAddChunk', chunk)

let err
if (!state.objectMode) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding
if (state.encoding !== encoding) {

@@ -255,3 +245,2 @@ if (addToFront && state.encoding) {

}
if (err) {

@@ -273,3 +262,2 @@ errorOrDestroy(stream, err)

state.reading = false
if (state.decoder && !encoding) {

@@ -286,9 +274,9 @@ chunk = state.decoder.write(chunk)

maybeReadMore(stream, state)
} // We can push more data if we are below the highWaterMark.
}
// We can push more data if we are below the highWaterMark.
// Also, if we have no data yet, we can stand some more bytes.
// This is to work around cases where hwm=0, such as the repl.
return !state.ended && (state.length < state.highWaterMark || state.length === 0)
}
function addChunk(stream, state, chunk, addToFront) {

@@ -303,3 +291,2 @@ if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) {

}
state.dataEmitted = true

@@ -314,24 +301,21 @@ stream.emit('data', chunk)

}
maybeReadMore(stream, state)
}
Readable.prototype.isPaused = function () {
const state = this._readableState
return state[kPaused] === true || state.flowing === false
} // Backwards compatibility.
}
// Backwards compatibility.
Readable.prototype.setEncoding = function (enc) {
const decoder = new StringDecoder(enc)
this._readableState.decoder = decoder // If setEncoding(null), decoder.encoding equals utf8.
this._readableState.decoder = decoder
// If setEncoding(null), decoder.encoding equals utf8.
this._readableState.encoding = this._readableState.decoder.encoding
const buffer = this._readableState.buffer // Iterate over current buffer to convert already stored Buffers:
const buffer = this._readableState.buffer
// Iterate over current buffer to convert already stored Buffers:
let content = ''
for (const data of buffer) {
content += decoder.write(data)
}
buffer.clear()

@@ -341,6 +325,6 @@ if (content !== '') buffer.push(content)

return this
} // Don't raise the hwm > 1GB.
}
// Don't raise the hwm > 1GB.
const MAX_HWM = 0x40000000
function computeNewHighWaterMark(n) {

@@ -360,11 +344,10 @@ if (n > MAX_HWM) {

}
return n
}
return n
} // This function is designed to be inlinable, so please take care when making
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function howMuchToRead(n, state) {
if (n <= 0 || (state.length === 0 && state.ended)) return 0
if (state.objectMode) return 1
if (NumberIsNaN(n)) {

@@ -375,11 +358,11 @@ // Only flow one buffer at a time.

}
if (n <= state.length) return n
return state.ended ? state.length : 0
} // You can override either this method, or the async _read(n) below.
}
// You can override either this method, or the async _read(n) below.
Readable.prototype.read = function (n) {
debug('read', n) // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
debug('read', n)
// Same as parseInt(undefined, 10), however V8 7.3 performance regressed
// in this scenario, so we are doing it manually.
if (n === undefined) {

@@ -390,11 +373,12 @@ n = NaN

}
const state = this._readableState
const nOrig = n // If we're asking for more than the current hwm, then raise the hwm.
const nOrig = n
// If we're asking for more than the current hwm, then raise the hwm.
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n)
if (n !== 0) state.emittedReadable = false // If we're doing read(0) to trigger a readable event, but we
if (n !== 0) state.emittedReadable = false
// If we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.
if (

@@ -410,9 +394,11 @@ n === 0 &&

}
n = howMuchToRead(n, state)
n = howMuchToRead(n, state) // If we've ended, and we're now clear, then finish it up.
// If we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
if (state.length === 0) endReadable(this)
return null
} // All the actual chunk generation logic needs to be
}
// All the actual chunk generation logic needs to be
// *below* the call to _read. The reason is that in certain

@@ -438,14 +424,16 @@ // synthetic stream cases, such as passthrough streams, _read

// 3. Actually pull the requested chunks out of the buffer and return.
// if we need a readable event, then we need to do some reading.
let doRead = state.needReadable
debug('need readable', doRead) // If we currently have less than the highWaterMark, then also read some.
debug('need readable', doRead)
// If we currently have less than the highWaterMark, then also read some.
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true
debug('length less than watermark', doRead)
} // However, if we've ended, then there's no point, if we're already
}
// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, if we're constructing we have to wait,
// and if we're destroyed or errored, then it's not allowed,
if (state.ended || state.reading || state.destroyed || state.errored || !state.constructed) {

@@ -457,6 +445,7 @@ doRead = false

state.reading = true
state.sync = true // If the length is currently zero, then we *need* a readable event.
state.sync = true
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0) state.needReadable = true
if (state.length === 0) state.needReadable = true // Call internal read method
// Call internal read method
try {

@@ -467,13 +456,10 @@ this._read(state.highWaterMark)

}
state.sync = false // If _read pushed data synchronously, then `reading` will be false,
state.sync = false
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if (!state.reading) n = howMuchToRead(nOrig, state)
}
let ret
if (n > 0) ret = fromList(n, state)
else ret = null
if (ret === null) {

@@ -484,3 +470,2 @@ state.needReadable = state.length <= state.highWaterMark

state.length -= n
if (state.multiAwaitDrain) {

@@ -492,11 +477,10 @@ state.awaitDrainWriters.clear()

}
if (state.length === 0) {
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
if (!state.ended) state.needReadable = true // If we tried to read() past the EOF, then emit end on the next tick.
if (!state.ended) state.needReadable = true
// If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended) endReadable(this)
}
if (ret !== null && !state.errorEmitted && !state.closeEmitted) {

@@ -506,13 +490,9 @@ state.dataEmitted = true

}
return ret
}
function onEofChunk(stream, state) {
debug('onEofChunk')
if (state.ended) return
if (state.decoder) {
const chunk = state.decoder.end()
if (chunk && chunk.length) {

@@ -523,5 +503,3 @@ state.buffer.push(chunk)

}
state.ended = true
if (state.sync) {

@@ -535,11 +513,12 @@ // If we are sync, wait until next tick to emit the data.

state.needReadable = false
state.emittedReadable = true // We have to emit readable now that we are EOF. Modules
state.emittedReadable = true
// We have to emit readable now that we are EOF. Modules
// in the ecosystem (e.g. dicer) rely on this event being sync.
emitReadable_(stream)
}
} // Don't emit readable right away in sync mode, because this can trigger
}
// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {

@@ -549,3 +528,2 @@ const state = stream._readableState

state.needReadable = false
if (!state.emittedReadable) {

@@ -557,11 +535,11 @@ debug('emitReadable', state.flowing)

}
function emitReadable_(stream) {
const state = stream._readableState
debug('emitReadable_', state.destroyed, state.length, state.ended)
if (!state.destroyed && !state.errored && (state.length || state.ended)) {
stream.emit('readable')
state.emittedReadable = false
} // The stream needs another readable event if:
}
// The stream needs another readable event if:
// 1. It is not flowing, as the flow mechanism will take

@@ -572,6 +550,7 @@ // care of it.

// another readable later.
state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark
flow(stream)
} // At this point, the user has presumably seen the 'readable' event,
}
// At this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered

@@ -582,3 +561,2 @@ // in turn another _read(n) call, in which case reading = true if

// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {

@@ -590,3 +568,2 @@ if (!state.readingMore && state.constructed) {

}
function maybeReadMore_(stream, state) {

@@ -628,17 +605,15 @@ // Attempt to read more data if we should.

}
state.readingMore = false
}
state.readingMore = false
} // Abstract method. to be overridden in specific implementation classes.
// Abstract method. to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function (n) {
throw new ERR_METHOD_NOT_IMPLEMENTED('_read()')
}
Readable.prototype.pipe = function (dest, pipeOpts) {
const src = this
const state = this._readableState
if (state.pipes.length === 1) {

@@ -650,3 +625,2 @@ if (!state.multiAwaitDrain) {

}
state.pipes.push(dest)

@@ -659,6 +633,4 @@ debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts)

dest.on('unpipe', onunpipe)
function onunpipe(readable, unpipeInfo) {
debug('onunpipe')
if (readable === src) {

@@ -671,3 +643,2 @@ if (unpipeInfo && unpipeInfo.hasUnpiped === false) {

}
function onend() {

@@ -677,16 +648,12 @@ debug('onend')

}
let ondrain
let cleanedUp = false
function cleanup() {
debug('cleanup') // Cleanup event handlers once the pipe is broken.
debug('cleanup')
// Cleanup event handlers once the pipe is broken.
dest.removeListener('close', onclose)
dest.removeListener('finish', onfinish)
if (ondrain) {
dest.removeListener('drain', ondrain)
}
dest.removeListener('error', onerror)

@@ -697,3 +664,5 @@ dest.removeListener('unpipe', onunpipe)

src.removeListener('data', ondata)
cleanedUp = true // If the reader is waiting for a drain event from this
cleanedUp = true
// If the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start

@@ -703,6 +672,4 @@ // flowing again.

// If we don't know, then assume that we are waiting for one.
if (ondrain && state.awaitDrainWriters && (!dest._writableState || dest._writableState.needDrain)) ondrain()
}
function pause() {

@@ -722,6 +689,4 @@ // If the user unpiped during `dest.write()`, it is possible

}
src.pause()
}
if (!ondrain) {

@@ -736,5 +701,3 @@ // When the dest drains, it reduces the awaitDrain counter

}
src.on('data', ondata)
function ondata(chunk) {

@@ -744,9 +707,9 @@ debug('ondata')

debug('dest.write', ret)
if (ret === false) {
pause()
}
} // If the dest has an error, then stop piping into it.
}
// If the dest has an error, then stop piping into it.
// However, don't suppress the throwing behavior for this.
function onerror(er) {

@@ -756,6 +719,4 @@ debug('onerror', er)

dest.removeListener('error', onerror)
if (dest.listenerCount('error') === 0) {
const s = dest._writableState || dest._readableState
if (s && !s.errorEmitted) {

@@ -768,6 +729,8 @@ // User incorrectly emitted 'error' directly on the stream.

}
} // Make sure our error handler is attached before userland ones.
}
prependListener(dest, 'error', onerror) // Both close and finish should trigger unpipe, but only once.
// Make sure our error handler is attached before userland ones.
prependListener(dest, 'error', onerror)
// Both close and finish should trigger unpipe, but only once.
function onclose() {

@@ -777,5 +740,3 @@ dest.removeListener('finish', onfinish)

}
dest.once('close', onclose)
function onfinish() {

@@ -786,12 +747,13 @@ debug('onfinish')

}
dest.once('finish', onfinish)
function unpipe() {
debug('unpipe')
src.unpipe(dest)
} // Tell the dest that it's being piped to.
}
dest.emit('pipe', src) // Start the flow if it hasn't been started already.
// Tell the dest that it's being piped to.
dest.emit('pipe', src)
// Start the flow if it hasn't been started already.
if (dest.writableNeedDrain === true) {

@@ -805,12 +767,11 @@ if (state.flowing) {

}
return dest
}
function pipeOnDrain(src, dest) {
return function pipeOnDrainFunctionResult() {
const state = src._readableState // `ondrain` will call directly,
const state = src._readableState
// `ondrain` will call directly,
// `this` maybe not a reference to dest,
// so we use the real dest here.
if (state.awaitDrainWriters === dest) {

@@ -823,3 +784,2 @@ debug('pipeOnDrain', 1)

}
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && src.listenerCount('data')) {

@@ -830,3 +790,2 @@ src.resume()

}
Readable.prototype.unpipe = function (dest) {

@@ -836,6 +795,6 @@ const state = this._readableState

hasUnpiped: false
} // If we're not piping anywhere, then do nothing.
}
// If we're not piping anywhere, then do nothing.
if (state.pipes.length === 0) return this
if (!dest) {

@@ -846,3 +805,2 @@ // remove all.

this.pause()
for (let i = 0; i < dests.length; i++)

@@ -852,6 +810,6 @@ dests[i].emit('unpipe', this, {

})
return this
} // Try to find the right one.
}
// Try to find the right one.
const index = ArrayPrototypeIndexOf(state.pipes, dest)

@@ -863,14 +821,15 @@ if (index === -1) return this

return this
} // Set up data events if they are asked for
}
// Set up data events if they are asked for
// Ensure readable listeners eventually get something.
Readable.prototype.on = function (ev, fn) {
const res = Stream.prototype.on.call(this, ev, fn)
const state = this._readableState
if (ev === 'data') {
// Update readableListening so that resume() may be a no-op
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0 // Try start flowing on next tick if stream isn't explicitly paused.
state.readableListening = this.listenerCount('readable') > 0
// Try start flowing on next tick if stream isn't explicitly paused.
if (state.flowing !== false) this.resume()

@@ -883,3 +842,2 @@ } else if (ev === 'readable') {

debug('on readable', state.length, state.reading)
if (state.length) {

@@ -892,11 +850,7 @@ emitReadable(this)

}
return res
}
Readable.prototype.addListener = Readable.prototype.on
Readable.prototype.removeListener = function (ev, fn) {
const res = Stream.prototype.removeListener.call(this, ev, fn)
if (ev === 'readable') {

@@ -911,11 +865,7 @@ // We need to check if there is someone still listening to

}
return res
}
Readable.prototype.off = Readable.prototype.removeListener
Readable.prototype.removeAllListeners = function (ev) {
const res = Stream.prototype.removeAllListeners.apply(this, arguments)
if (ev === 'readable' || ev === undefined) {

@@ -930,14 +880,13 @@ // We need to check if there is someone still listening to

}
return res
}
function updateReadableListening(self) {
const state = self._readableState
state.readableListening = self.listenerCount('readable') > 0
if (state.resumeScheduled && state[kPaused] === false) {
// Flowing needs to be set to true now, otherwise
// the upcoming resume will not flow.
state.flowing = true // Crude way to check if we should resume.
state.flowing = true
// Crude way to check if we should resume.
} else if (self.listenerCount('data') > 0) {

@@ -949,25 +898,22 @@ self.resume()

}
function nReadingNextTick(self) {
debug('readable nexttick read 0')
self.read(0)
} // pause() and resume() are remnants of the legacy readable stream API
}
// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function () {
const state = this._readableState
if (!state.flowing) {
debug('resume') // We flow only if there is no one listening
debug('resume')
// We flow only if there is no one listening
// for readable, but we still have to call
// resume().
state.flowing = !state.readableListening
resume(this, state)
}
state[kPaused] = false
return this
}
function resume(stream, state) {

@@ -979,10 +925,7 @@ if (!state.resumeScheduled) {

}
function resume_(stream, state) {
debug('resume', state.reading)
if (!state.reading) {
stream.read(0)
}
state.resumeScheduled = false

@@ -993,6 +936,4 @@ stream.emit('resume')

}
Readable.prototype.pause = function () {
debug('call pause flowing=%j', this._readableState.flowing)
if (this._readableState.flowing !== false) {

@@ -1003,18 +944,18 @@ debug('pause')

}
this._readableState[kPaused] = true
return this
}
function flow(stream) {
const state = stream._readableState
debug('flow', state.flowing)
while (state.flowing && stream.read() !== null);
}
while (state.flowing && stream.read() !== null);
} // Wrap an old-style stream as the async data source.
// Wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function (stream) {
let paused = false
Readable.prototype.wrap = function (stream) {
let paused = false // TODO (ronag): Should this.destroy(err) emit
// TODO (ronag): Should this.destroy(err) emit
// 'error' on the wrapped stream? Would require

@@ -1041,3 +982,2 @@ // a static factory method, e.g. Readable.wrap(stream).

})
this._read = () => {

@@ -1048,9 +988,8 @@ if (paused && stream.resume) {

}
} // Proxy all the other methods. Important when wrapping filters and duplexes.
}
// Proxy all the other methods. Important when wrapping filters and duplexes.
const streamKeys = ObjectKeys(stream)
for (let j = 1; j < streamKeys.length; j++) {
const i = streamKeys[j]
if (this[i] === undefined && typeof stream[i] === 'function') {

@@ -1060,10 +999,7 @@ this[i] = stream[i].bind(stream)

}
return this
}
Readable.prototype[SymbolAsyncIterator] = function () {
return streamToAsyncIterator(this)
}
Readable.prototype.iterator = function (options) {

@@ -1073,6 +1009,4 @@ if (options !== undefined) {

}
return streamToAsyncIterator(this, options)
}
function streamToAsyncIterator(stream, options) {

@@ -1084,3 +1018,2 @@ if (typeof stream.read !== 'function') {

}
const iter = createAsyncIterator(stream, options)

@@ -1090,6 +1023,4 @@ iter.stream = stream

}
async function* createAsyncIterator(stream, options) {
let callback = nop
function next(resolve) {

@@ -1103,3 +1034,2 @@ if (this === stream) {

}
stream.on('readable', next)

@@ -1118,7 +1048,5 @@ let error

)
try {
while (true) {
const chunk = stream.destroyed ? null : stream.read()
if (chunk !== null) {

@@ -1148,19 +1076,18 @@ yield chunk

}
} // Making it explicit these properties are not enumerable
}
// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
ObjectDefineProperties(Readable.prototype, {
readable: {
__proto__: null,
get() {
const r = this._readableState // r.readable === false means that this is part of a Duplex stream
const r = this._readableState
// r.readable === false means that this is part of a Duplex stream
// where the readable side was disabled upon construction.
// Compat. The user might manually disable readable side through
// deprecated setter.
return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && !r.endEmitted
},
set(val) {

@@ -1220,3 +1147,2 @@ // Backwards compat.

enumerable: false,
get() {

@@ -1229,3 +1155,2 @@ return this._readableState.length

enumerable: false,
get() {

@@ -1238,3 +1163,2 @@ return this._readableState ? this._readableState.objectMode : false

enumerable: false,
get() {

@@ -1247,3 +1171,2 @@ return this._readableState ? this._readableState.encoding : null

enumerable: false,
get() {

@@ -1255,3 +1178,2 @@ return this._readableState ? this._readableState.errored : null

__proto__: null,
get() {

@@ -1264,7 +1186,5 @@ return this._readableState ? this._readableState.closed : false

enumerable: false,
get() {
return this._readableState ? this._readableState.destroyed : false
},
set(value) {

@@ -1275,5 +1195,6 @@ // We ignore the value if the stream

return
} // Backward compatibility, the user is explicitly
}
// Backward compatibility, the user is explicitly
// managing destroyed.
this._readableState.destroyed = value

@@ -1285,3 +1206,2 @@ }

enumerable: false,
get() {

@@ -1296,3 +1216,2 @@ return this._readableState ? this._readableState.endEmitted : false

__proto__: null,
get() {

@@ -1305,7 +1224,5 @@ return this.pipes.length

__proto__: null,
get() {
return this[kPaused] !== false
},
set(value) {

@@ -1315,9 +1232,11 @@ this[kPaused] = !!value

}
}) // Exposed for testing purposes only.
})
Readable._fromList = fromList // Pluck off n bytes from an array of buffers.
// Exposed for testing purposes only.
Readable._fromList = fromList
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromList(n, state) {

@@ -1340,7 +1259,5 @@ // nothing buffered.

}
function endReadable(stream) {
const state = stream._readableState
debug('endReadable', state.endEmitted)
if (!state.endEmitted) {

@@ -1351,10 +1268,9 @@ state.ended = true

}
function endReadableNT(state, stream) {
debug('endReadableNT', state.endEmitted, state.length) // Check that we didn't get one last unshift.
debug('endReadableNT', state.endEmitted, state.length)
// Check that we didn't get one last unshift.
if (!state.errored && !state.closeEmitted && !state.endEmitted && state.length === 0) {
state.endEmitted = true
stream.emit('end')
if (stream.writable && stream.allowHalfOpen === false) {

@@ -1368,6 +1284,6 @@ process.nextTick(endWritableNT, stream)

!wState ||
(wState.autoDestroy && // We don't expect the writable to ever 'finish'
(wState.autoDestroy &&
// We don't expect the writable to ever 'finish'
// if writable is explicitly set to false.
(wState.finished || wState.writable === false))
if (autoDestroy) {

@@ -1379,6 +1295,4 @@ stream.destroy()

}
function endWritableNT(stream) {
const writable = stream.writable && !stream.writableEnded && !stream.destroyed
if (writable) {

@@ -1388,9 +1302,8 @@ stream.end()

}
Readable.from = function (iterable, opts) {
return from(Readable, iterable, opts)
}
let webStreamsAdapters
let webStreamsAdapters // Lazy to avoid circular references
// Lazy to avoid circular references
function lazyWebStreams() {

@@ -1400,14 +1313,10 @@ if (webStreamsAdapters === undefined) webStreamsAdapters = {}

}
Readable.fromWeb = function (readableStream, options) {
return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options)
}
Readable.toWeb = function (streamReadable, options) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable, options)
}
Readable.wrap = function (src, options) {
var _ref, _src$readableObjectMo
return new Readable({

@@ -1422,3 +1331,2 @@ objectMode:

...options,
destroy(err, callback) {

@@ -1425,0 +1333,0 @@ destroyImpl.destroyer(src, err)
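Note: the readable.js hunks above are purely cosmetic — trailing comments are moved onto their own lines — so the behaviour of read(), the highWaterMark growth in computeNewHighWaterMark() and the pipe()/drain logic is unchanged. As a minimal, hedged sketch of the API this file implements (editor-added illustration, not part of the diff; it assumes the package is installed as readable-stream and uses Readable.from(), which is defined above):

'use strict'
// Sketch: pull-based consumption with read() and the 'readable' event.
const { Readable } = require('readable-stream')

const source = Readable.from(['a', 'b', 'c']) // object-mode stream of three chunks

source.on('readable', () => {
  let chunk
  // read() returns null once the internal buffer is drained,
  // matching the howMuchToRead()/fromList() logic shown above.
  while ((chunk = source.read()) !== null) {
    console.log('got', chunk)
  }
})

source.on('end', () => console.log('done')) // emitted via endReadable()/endReadableNT()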

'use strict'
const { MathFloor, NumberIsInteger } = require('../../ours/primordials')
const { ERR_INVALID_ARG_VALUE } = require('../../ours/errors').codes
function highWaterMarkFrom(options, isDuplex, duplexKey) {
return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null
}
function getDefaultHighWaterMark(objectMode) {
return objectMode ? 16 : 16 * 1024
}
function getHighWaterMark(state, options, duplexKey, isDuplex) {
const hwm = highWaterMarkFrom(options, isDuplex, duplexKey)
if (hwm != null) {

@@ -23,9 +18,8 @@ if (!NumberIsInteger(hwm) || hwm < 0) {

}
return MathFloor(hwm)
} // Default value
}
// Default value
return getDefaultHighWaterMark(state.objectMode)
}
module.exports = {

@@ -32,0 +26,0 @@ getHighWaterMark,
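Note: state.js only reflows comments here; getDefaultHighWaterMark() still returns 16 for object-mode streams and 16 * 1024 bytes otherwise. A small hedged sketch of what that default means for a user-constructed stream (editor-added; assumes readable-stream is installed and uses the readableHighWaterMark accessor defined in readable.js above):

'use strict'
const { Readable } = require('readable-stream')

// No highWaterMark option, so the defaults from getDefaultHighWaterMark() apply.
const bytes = new Readable({ read() {} })
const objects = new Readable({ objectMode: true, read() {} })

console.log(bytes.readableHighWaterMark)   // 16384 (16 * 1024 bytes)
console.log(objects.readableHighWaterMark) // 16 (counted in objects, not bytes)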

@@ -21,2 +21,3 @@ // Copyright Joyent, Inc. and other Node contributors.

// USE OR OTHER DEALINGS IN THE SOFTWARE.
// a transform stream is a readable/writable stream where you do

@@ -63,25 +64,20 @@ // something with the data. Sometimes it's called a "filter",

// the results of the previous transformed chunk were consumed.
'use strict'
const { ObjectSetPrototypeOf, Symbol } = require('../../ours/primordials')
module.exports = Transform
const { ERR_METHOD_NOT_IMPLEMENTED } = require('../../ours/errors').codes
const Duplex = require('./duplex')
const { getHighWaterMark } = require('./state')
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype)
ObjectSetPrototypeOf(Transform, Duplex)
const kCallback = Symbol('kCallback')
function Transform(options) {
if (!(this instanceof Transform)) return new Transform(options)
function Transform(options) {
if (!(this instanceof Transform)) return new Transform(options) // TODO (ronag): This should preferably always be
// TODO (ronag): This should preferably always be
// applied but would be semver-major. Or even better;
// make Transform a Readable with the Writable interface.
const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null
if (readableHighWaterMark === 0) {

@@ -102,21 +98,20 @@ // A Duplex will buffer both on the writable and readable side while

}
Duplex.call(this, options)
Duplex.call(this, options) // We have implemented the _read method, and done the other things
// We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false
this[kCallback] = null
if (options) {
if (typeof options.transform === 'function') this._transform = options.transform
if (typeof options.flush === 'function') this._flush = options.flush
} // When the writable side finishes, then flush out anything remaining.
}
// When the writable side finishes, then flush out anything remaining.
// Backwards compat. Some Transform streams incorrectly implement _final
// instead of or in addition to _flush. By using 'prefinish' instead of
// implementing _final we continue supporting this unfortunate use case.
this.on('prefinish', prefinish)
}
function final(cb) {

@@ -131,12 +126,8 @@ if (typeof this._flush === 'function' && !this.destroyed) {

}
return
}
if (data != null) {
this.push(data)
}
this.push(null)
if (cb) {

@@ -148,3 +139,2 @@ cb()

this.push(null)
if (cb) {

@@ -155,3 +145,2 @@ cb()

}
function prefinish() {

@@ -162,9 +151,6 @@ if (this._final !== final) {

}
Transform.prototype._final = final
Transform.prototype._transform = function (chunk, encoding, callback) {
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()')
}
Transform.prototype._write = function (chunk, encoding, callback) {

@@ -174,3 +160,2 @@ const rState = this._readableState

const length = rState.length
this._transform(chunk, encoding, (err, val) => {

@@ -181,10 +166,10 @@ if (err) {

}
if (val != null) {
this.push(val)
}
if (
wState.ended || // Backwards compat.
length === rState.length || // Backwards compat.
wState.ended ||
// Backwards compat.
length === rState.length ||
// Backwards compat.
rState.length < rState.highWaterMark

@@ -198,3 +183,2 @@ ) {

}
Transform.prototype._read = function () {

@@ -201,0 +185,0 @@ if (this[kCallback]) {
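Note: the transform.js hunks likewise only move comments; the constructor still wires up options.transform and options.flush exactly as the unchanged `if (options)` block shows, and _flush still runs on 'prefinish'. A hedged usage sketch (editor-added; assumes readable-stream is installed):

'use strict'
const { Transform } = require('readable-stream')

// Upper-cases each chunk; flush() appends a trailing marker before 'end'.
const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  },
  flush(callback) {
    // Runs after the writable side has ended (see final()/prefinish above).
    callback(null, '\n-- done --\n')
  }
})

upper.on('data', (chunk) => process.stdout.write(chunk))
upper.end('hello transform')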

'use strict'
const { Symbol, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials')
const kDestroyed = Symbol('kDestroyed')

@@ -9,6 +8,4 @@ const kIsErrored = Symbol('kIsErrored')

const kIsDisturbed = Symbol('kIsDisturbed')
function isReadableNodeStream(obj, strict = false) {
var _obj$_readableState
return !!(

@@ -23,3 +20,4 @@ (

? undefined
: _obj$_readableState.readable) !== false) && // Duplex
: _obj$_readableState.readable) !== false) &&
// Duplex
(!obj._writableState || obj._readableState)

@@ -32,3 +30,2 @@ ) // Writable has .pipe.

var _obj$_writableState
return !!(

@@ -56,3 +53,2 @@ (

}
function isNodeStream(obj) {

@@ -67,3 +63,2 @@ return (

}
function isIterable(obj, isAsync) {

@@ -75,3 +70,2 @@ if (obj == null) return false

}
function isDestroyed(stream) {

@@ -83,4 +77,5 @@ if (!isNodeStream(stream)) return null

return !!(stream.destroyed || stream[kDestroyed] || (state !== null && state !== undefined && state.destroyed))
} // Have been end():d.
}
// Have been end():d.
function isWritableEnded(stream) {

@@ -93,4 +88,5 @@ if (!isWritableNodeStream(stream)) return null

return wState.ended
} // Have emitted 'finish'.
}
// Have emitted 'finish'.
function isWritableFinished(stream, strict) {

@@ -103,4 +99,5 @@ if (!isWritableNodeStream(stream)) return null

return !!(wState.finished || (strict === false && wState.ended === true && wState.length === 0))
} // Have been push(null):d.
}
// Have been push(null):d.
function isReadableEnded(stream) {

@@ -113,4 +110,5 @@ if (!isReadableNodeStream(stream)) return null

return rState.ended
} // Have emitted 'end'.
}
// Have emitted 'end'.
function isReadableFinished(stream, strict) {

@@ -123,3 +121,2 @@ if (!isReadableNodeStream(stream)) return null

}
function isReadable(stream) {

@@ -131,3 +128,2 @@ if (stream && stream[kIsReadable] != null) return stream[kIsReadable]

}
function isWritable(stream) {

@@ -138,3 +134,2 @@ if (typeof (stream === null || stream === undefined ? undefined : stream.writable) !== 'boolean') return null

}
function isFinished(stream, opts) {

@@ -144,29 +139,21 @@ if (!isNodeStream(stream)) {

}
if (isDestroyed(stream)) {
return true
}
if ((opts === null || opts === undefined ? undefined : opts.readable) !== false && isReadable(stream)) {
return false
}
if ((opts === null || opts === undefined ? undefined : opts.writable) !== false && isWritable(stream)) {
return false
}
return true
}
function isWritableErrored(stream) {
var _stream$_writableStat, _stream$_writableStat2
if (!isNodeStream(stream)) {
return null
}
if (stream.writableErrored) {
return stream.writableErrored
}
return (_stream$_writableStat =

@@ -179,14 +166,10 @@ (_stream$_writableStat2 = stream._writableState) === null || _stream$_writableStat2 === undefined

}
function isReadableErrored(stream) {
var _stream$_readableStat, _stream$_readableStat2
if (!isNodeStream(stream)) {
return null
}
if (stream.readableErrored) {
return stream.readableErrored
}
return (_stream$_readableStat =

@@ -199,3 +182,2 @@ (_stream$_readableStat2 = stream._readableState) === null || _stream$_readableStat2 === undefined

}
function isClosed(stream) {

@@ -205,10 +187,7 @@ if (!isNodeStream(stream)) {

}
if (typeof stream.closed === 'boolean') {
return stream.closed
}
const wState = stream._writableState
const rState = stream._readableState
if (

@@ -223,10 +202,7 @@ typeof (wState === null || wState === undefined ? undefined : wState.closed) === 'boolean' ||

}
if (typeof stream._closed === 'boolean' && isOutgoingMessage(stream)) {
return stream._closed
}
return null
}
function isOutgoingMessage(stream) {

@@ -240,10 +216,7 @@ return (

}
function isServerResponse(stream) {
return typeof stream._sent100 === 'boolean' && isOutgoingMessage(stream)
}
function isServerRequest(stream) {
var _stream$req
return (

@@ -256,3 +229,2 @@ typeof stream._consuming === 'boolean' &&

}
function willEmitClose(stream) {

@@ -267,6 +239,4 @@ if (!isNodeStream(stream)) return null

}
function isDisturbed(stream) {
var _stream$kIsDisturbed
return !!(

@@ -279,3 +249,2 @@ stream &&

}
function isErrored(stream) {

@@ -292,3 +261,2 @@ var _ref,

_stream$_writableStat4
return !!(

@@ -324,3 +292,2 @@ stream &&

}
module.exports = {

@@ -327,0 +294,0 @@ kDestroyed,

/* replacement start */
const process = require('process')
const process = require('process/')
/* replacement end */

@@ -24,2 +26,3 @@ // Copyright Joyent, Inc. and other Node contributors.

// USE OR OTHER DEALINGS IN THE SOFTWARE.
// A bit simpler than readable streams.

@@ -30,3 +33,2 @@ // Implement an async ._write(chunk, encoding, cb), and it'll handle all

;('use strict')
const {

@@ -43,18 +45,10 @@ ArrayPrototypeSlice,

} = require('../../ours/primordials')
module.exports = Writable
Writable.WritableState = WritableState
const { EventEmitter: EE } = require('events')
const Stream = require('./legacy').Stream
const { Buffer } = require('buffer')
const destroyImpl = require('./destroy')
const { addAbortSignal } = require('./add-abort-signal')
const { getHighWaterMark, getDefaultHighWaterMark } = require('./state')
const {

@@ -71,11 +65,7 @@ ERR_INVALID_ARG_TYPE,

} = require('../../ours/errors').codes
const { errorOrDestroy } = destroyImpl
ObjectSetPrototypeOf(Writable.prototype, Stream.prototype)
ObjectSetPrototypeOf(Writable, Stream)
function nop() {}
const kOnFinished = Symbol('kOnFinished')
function WritableState(options, stream, isDuplex) {

@@ -87,90 +77,114 @@ // Duplex streams are both readable and writable, but share

// e.g. options.readableObjectMode vs. options.writableObjectMode, etc.
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex') // Object stream flag to indicate whether or not this stream
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex')
// Object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!(options && options.objectMode)
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.writableObjectMode)
this.objectMode = !!(options && options.objectMode)
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.writableObjectMode) // The point at which write() starts returning false
// The point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
// the entire buffer is not flushed immediately on write().
this.highWaterMark = options
? getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex)
: getDefaultHighWaterMark(false) // if _final has been called.
: getDefaultHighWaterMark(false)
this.finalCalled = false // drain event flag.
// if _final has been called.
this.finalCalled = false
this.needDrain = false // At the start of calling end()
// drain event flag.
this.needDrain = false
// At the start of calling end()
this.ending = false
// When end() has been called, and returned.
this.ended = false
// When 'finish' is emitted.
this.finished = false
this.ending = false // When end() has been called, and returned.
// Has it been destroyed
this.destroyed = false
this.ended = false // When 'finish' is emitted.
this.finished = false // Has it been destroyed
this.destroyed = false // Should we decode strings into buffers before passing to _write?
// Should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
// handling at a lower level.
const noDecode = !!(options && options.decodeStrings === false)
this.decodeStrings = !noDecode
const noDecode = !!(options && options.decodeStrings === false)
this.decodeStrings = !noDecode // Crypto is kind of old and crusty. Historically, its default string
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8' // Not an actual buffer we keep track of, but a measurement
// Not an actual buffer we keep track of, but a measurement
// of how much we're waiting to get pushed to some underlying
// socket or file.
this.length = 0
this.length = 0 // A flag to see when we're in the middle of a write.
// A flag to see when we're in the middle of a write.
this.writing = false
this.writing = false // When true all writes will be buffered until .uncork() call.
// When true all writes will be buffered until .uncork() call.
this.corked = 0
this.corked = 0 // A flag to be able to tell if the onwrite cb is called immediately,
// A flag to be able to tell if the onwrite cb is called immediately,
// or on a later tick. We set this to true at first, because any
// actions that shouldn't happen until "later" should generally also
// not happen before the first write call.
this.sync = true
this.sync = true // A flag to know if we're processing previously buffered items, which
// A flag to know if we're processing previously buffered items, which
// may call the _write() callback in the same tick, so that we don't
// end up in an overlapped onwrite situation.
this.bufferProcessing = false
this.bufferProcessing = false // The callback that's passed to _write(chunk, cb).
// The callback that's passed to _write(chunk, cb).
this.onwrite = onwrite.bind(undefined, stream)
this.onwrite = onwrite.bind(undefined, stream) // The callback that the user supplies to write(chunk, encoding, cb).
// The callback that the user supplies to write(chunk, encoding, cb).
this.writecb = null
this.writecb = null // The amount that is being written when _write is called.
// The amount that is being written when _write is called.
this.writelen = 0
this.writelen = 0 // Storage for data passed to the afterWrite() callback in case of
// Storage for data passed to the afterWrite() callback in case of
// synchronous _write() completion.
this.afterWriteTickInfo = null
resetBuffer(this)
this.afterWriteTickInfo = null
resetBuffer(this) // Number of pending user-supplied write callbacks
// Number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted.
this.pendingcb = 0
this.pendingcb = 0 // Stream is still being constructed and cannot be
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true
this.constructed = true // Emit prefinish if the only thing we're waiting for is _write cbs
// Emit prefinish if the only thing we're waiting for is _write cbs
// This is relevant for synchronous Transform streams.
this.prefinished = false
this.prefinished = false // True if the error was already emitted and should not be thrown again.
// True if the error was already emitted and should not be thrown again.
this.errorEmitted = false
this.errorEmitted = false // Should close be emitted on destroy. Defaults to true.
// Should close be emitted on destroy. Defaults to true.
this.emitClose = !options || options.emitClose !== false
this.emitClose = !options || options.emitClose !== false // Should .destroy() be called after 'finish' (and potentially 'end').
// Should .destroy() be called after 'finish' (and potentially 'end').
this.autoDestroy = !options || options.autoDestroy !== false
this.autoDestroy = !options || options.autoDestroy !== false // Indicates whether the stream has errored. When true all write() calls
// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = null
this.errored = null // Indicates whether the stream has finished destroying.
// Indicates whether the stream has finished destroying.
this.closed = false
this.closed = false // True if close has been emitted or would have been emitted
// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false
this[kOnFinished] = []
}
function resetBuffer(state) {

@@ -182,10 +196,7 @@ state.buffered = []

}
WritableState.prototype.getBuffer = function getBuffer() {
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex)
}
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
__proto__: null,
get() {

@@ -195,3 +206,2 @@ return this.buffered.length - this.bufferedIndex

})
function Writable(options) {

@@ -201,12 +211,12 @@ // Writable ctor is applied to Duplexes, too.

// would return false, as no `_writableState` property is attached.
// Trying to use the custom `instanceof` for Writable here will also break the
// Node.js LazyTransform implementation, which has a non-trivial getter for
// `_writableState` that would lead to infinite recursion.
// Checking for a Stream.Duplex instance is faster here instead of inside
// the WritableState constructor, at least with V8 6.5.
const isDuplex = this instanceof require('./duplex')
if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) return new Writable(options)
this._writableState = new WritableState(options, this, isDuplex)
if (options) {

@@ -220,15 +230,11 @@ if (typeof options.write === 'function') this._write = options.write

}
Stream.call(this, options)
destroyImpl.construct(this, () => {
const state = this._writableState
if (!state.writing) {
clearBuffer(this, state)
}
finishMaybe(this, state)
})
}
ObjectDefineProperty(Writable, SymbolHasInstance, {

@@ -241,11 +247,10 @@ __proto__: null,

}
}) // Otherwise people can pipe Writable streams, which is just wrong.
})
// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function () {
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE())
}
function _write(stream, chunk, encoding, cb) {
const state = stream._writableState
if (typeof encoding === 'function') {

@@ -259,3 +264,2 @@ cb = encoding

}
if (chunk === null) {

@@ -278,5 +282,3 @@ throw new ERR_STREAM_NULL_VALUES()

}
let err
if (state.ending) {

@@ -287,3 +289,2 @@ err = new ERR_STREAM_WRITE_AFTER_END()

}
if (err) {

@@ -294,18 +295,13 @@ process.nextTick(cb, err)

}
state.pendingcb++
return writeOrBuffer(stream, state, chunk, encoding, cb)
}
Writable.prototype.write = function (chunk, encoding, cb) {
return _write(this, chunk, encoding, cb) === true
}
Writable.prototype.cork = function () {
this._writableState.corked++
}
Writable.prototype.uncork = function () {
const state = this._writableState
if (state.corked) {

@@ -316,3 +312,2 @@ state.corked--

}
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {

@@ -324,14 +319,15 @@ // node::ParseEncoding() requires lower case.

return this
} // If we're already writing something, then just put this
}
// If we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, callback) {
const len = state.objectMode ? 1 : chunk.length
state.length += len // stream._write resets state.length
state.length += len
const ret = state.length < state.highWaterMark // We must ensure that previous needDrain will not be reset to false.
// stream._write resets state.length
const ret = state.length < state.highWaterMark
// We must ensure that previous needDrain will not be reset to false.
if (!ret) state.needDrain = true
if (state.writing || state.corked || state.errored || !state.constructed) {

@@ -343,7 +339,5 @@ state.buffered.push({

})
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false
}
if (state.allNoop && callback !== nop) {

@@ -357,12 +351,10 @@ state.allNoop = false

state.sync = true
stream._write(chunk, encoding, state.onwrite)
state.sync = false
}
state.sync = false
} // Return false if errored or destroyed in order to break
// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed
}
function doWrite(stream, state, writev, len, chunk, encoding, cb) {

@@ -378,15 +370,13 @@ state.writelen = len

}
function onwriteError(stream, state, er, cb) {
--state.pendingcb
cb(er) // Ensure callbacks are invoked even when autoDestroy is
cb(er)
// Ensure callbacks are invoked even when autoDestroy is
// not enabled. Passing `er` here doesn't make sense since
// it's related to one specific write, not to the buffered
// writes.
errorBuffer(state) // This can emit error, but error must always follow cb.
errorBuffer(state)
// This can emit error, but error must always follow cb.
errorOrDestroy(stream, er)
}
function onwrite(stream, er) {

@@ -396,3 +386,2 @@ const state = stream._writableState

const cb = state.writecb
if (typeof cb !== 'function') {

@@ -402,3 +391,2 @@ errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK())

}
state.writing = false

@@ -408,3 +396,2 @@ state.writecb = null

state.writelen = 0
if (er) {

@@ -416,9 +403,9 @@ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364

state.errored = er
} // In case of duplex streams we need to notify the readable side of the
}
// In case of duplex streams we need to notify the readable side of the
// error.
if (stream._readableState && !stream._readableState.errored) {
stream._readableState.errored = er
}
if (sync) {

@@ -433,3 +420,2 @@ process.nextTick(onwriteError, stream, state, er, cb)

}
if (sync) {

@@ -456,3 +442,2 @@ // It is a common case that the callback passed to .write() is always

}
function afterWriteTick({ stream, state, count, cb }) {

@@ -462,6 +447,4 @@ state.afterWriteTickInfo = null

}
function afterWrite(stream, state, count, cb) {
const needDrain = !state.ending && !stream.destroyed && state.length === 0 && state.needDrain
if (needDrain) {

@@ -471,3 +454,2 @@ state.needDrain = false

}
while (count-- > 0) {

@@ -477,10 +459,9 @@ state.pendingcb--

}
if (state.destroyed) {
errorBuffer(state)
}
finishMaybe(stream, state)
} // If there's something in the buffer waiting, then invoke callbacks.
}
// If there's something in the buffer waiting, then invoke callbacks.
function errorBuffer(state) {

@@ -490,6 +471,4 @@ if (state.writing) {

}
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
var _state$errored
const { chunk, callback } = state.buffered[n]

@@ -504,8 +483,5 @@ const len = state.objectMode ? 1 : chunk.length

}
const onfinishCallbacks = state[kOnFinished].splice(0)
for (let i = 0; i < onfinishCallbacks.length; i++) {
var _state$errored2
onfinishCallbacks[i](

@@ -517,6 +493,6 @@ (_state$errored2 = state.errored) !== null && _state$errored2 !== undefined

}
resetBuffer(state)
} // If there's something in the buffer waiting, then process it.
}
// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {

@@ -526,13 +502,9 @@ if (state.corked || state.bufferProcessing || state.destroyed || !state.constructed) {

}
const { buffered, bufferedIndex, objectMode } = state
const bufferedLength = buffered.length - bufferedIndex
if (!bufferedLength) {
return
}
let i = bufferedIndex
state.bufferProcessing = true
if (bufferedLength > 1 && stream._writev) {

@@ -546,5 +518,5 @@ state.pendingcb -= bufferedLength - 1

}
} // Make a copy of `buffered` if it's going to be used by `callback` above,
}
// Make a copy of `buffered` if it's going to be used by `callback` above,
// since `doWrite` will mutate the array.
const chunks = state.allNoop && i === 0 ? buffered : ArrayPrototypeSlice(buffered, i)

@@ -561,3 +533,2 @@ chunks.allBuffers = state.allBuffers

} while (i < buffered.length && !state.writing)
if (i === buffered.length) {

@@ -572,6 +543,4 @@ resetBuffer(state)

}
state.bufferProcessing = false
}
Writable.prototype._write = function (chunk, encoding, cb) {

@@ -592,8 +561,5 @@ if (this._writev) {

}
Writable.prototype._writev = null
Writable.prototype.end = function (chunk, encoding, cb) {
const state = this._writableState
if (typeof chunk === 'function') {

@@ -607,13 +573,11 @@ cb = chunk

}
let err
if (chunk !== null && chunk !== undefined) {
const ret = _write(this, chunk, encoding)
if (ret instanceof Error) {
err = ret
}
} // .end() fully uncorks.
}
// .end() fully uncorks.
if (state.corked) {

@@ -623,3 +587,2 @@ state.corked = 1

}
if (err) {

@@ -633,2 +596,3 @@ // Do nothing...

// or not.
state.ending = true

@@ -642,3 +606,2 @@ finishMaybe(this, state, true)

}
if (typeof cb === 'function') {

@@ -651,6 +614,4 @@ if (err || state.finished) {

}
return this
}
function needFinish(state) {

@@ -670,6 +631,4 @@ return (

}
function callFinal(stream, state) {
let called = false
function onFinish(err) {

@@ -680,20 +639,16 @@ if (called) {

}
called = true
state.pendingcb--
if (err) {
const onfinishCallbacks = state[kOnFinished].splice(0)
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](err)
}
errorOrDestroy(stream, err, state.sync)
} else if (needFinish(state)) {
state.prefinished = true
stream.emit('prefinish') // Backwards compat. Don't check state.sync here.
stream.emit('prefinish')
// Backwards compat. Don't check state.sync here.
// Some streams assume 'finish' will be emitted
// asynchronously relative to _final callback.
state.pendingcb++

@@ -703,6 +658,4 @@ process.nextTick(finish, stream, state)

}
state.sync = true
state.pendingcb++
try {

@@ -713,6 +666,4 @@ stream._final(onFinish)

}
state.sync = false
}
function prefinish(stream, state) {

@@ -729,7 +680,5 @@ if (!state.prefinished && !state.finalCalled) {

}
function finishMaybe(stream, state, sync) {
if (needFinish(state)) {
prefinish(stream, state)
if (state.pendingcb === 0) {

@@ -756,3 +705,2 @@ if (sync) {

}
function finish(stream, state) {

@@ -762,9 +710,6 @@ state.pendingcb--

const onfinishCallbacks = state[kOnFinished].splice(0)
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i]()
}
stream.emit('finish')
if (state.autoDestroy) {

@@ -776,6 +721,6 @@ // In case of duplex streams we need a way to detect

!rState ||
(rState.autoDestroy && // We don't expect the readable to ever 'end'
(rState.autoDestroy &&
// We don't expect the readable to ever 'end'
// if readable is explicitly set to false.
(rState.endEmitted || rState.readable === false))
if (autoDestroy) {

@@ -786,7 +731,5 @@ stream.destroy()

}
ObjectDefineProperties(Writable.prototype, {
closed: {
__proto__: null,
get() {

@@ -798,7 +741,5 @@ return this._writableState ? this._writableState.closed : false

__proto__: null,
get() {
return this._writableState ? this._writableState.destroyed : false
},
set(value) {

@@ -813,12 +754,10 @@ // Backward compatibility, the user is explicitly managing destroyed.

__proto__: null,
get() {
const w = this._writableState // w.writable === false means that this is part of a Duplex stream
const w = this._writableState
// w.writable === false means that this is part of a Duplex stream
// where the writable side was disabled upon construction.
// Compat. The user might manually disable writable side through
// deprecated setter.
return !!w && w.writable !== false && !w.destroyed && !w.errored && !w.ending && !w.ended
},
set(val) {

@@ -833,3 +772,2 @@ // Backwards compatible.

__proto__: null,
get() {

@@ -841,3 +779,2 @@ return this._writableState ? this._writableState.finished : false

__proto__: null,
get() {

@@ -849,3 +786,2 @@ return this._writableState ? this._writableState.objectMode : false

__proto__: null,
get() {

@@ -857,3 +793,2 @@ return this._writableState && this._writableState.getBuffer()

__proto__: null,
get() {

@@ -865,3 +800,2 @@ return this._writableState ? this._writableState.ending : false

__proto__: null,
get() {

@@ -875,3 +809,2 @@ const wState = this._writableState

__proto__: null,
get() {

@@ -883,3 +816,2 @@ return this._writableState && this._writableState.highWaterMark

__proto__: null,
get() {

@@ -891,3 +823,2 @@ return this._writableState ? this._writableState.corked : 0

__proto__: null,
get() {

@@ -900,3 +831,2 @@ return this._writableState && this._writableState.length

enumerable: false,
get() {

@@ -919,26 +849,22 @@ return this._writableState ? this._writableState.errored : null

const destroy = destroyImpl.destroy
Writable.prototype.destroy = function (err, cb) {
const state = this._writableState // Invoke pending callbacks.
const state = this._writableState
// Invoke pending callbacks.
if (!state.destroyed && (state.bufferedIndex < state.buffered.length || state[kOnFinished].length)) {
process.nextTick(errorBuffer, state)
}
destroy.call(this, err, cb)
return this
}
Writable.prototype._undestroy = destroyImpl.undestroy
Writable.prototype._destroy = function (err, cb) {
cb(err)
}
Writable.prototype[EE.captureRejectionSymbol] = function (err) {
this.destroy(err)
}
let webStreamsAdapters
let webStreamsAdapters // Lazy to avoid circular references
// Lazy to avoid circular references
function lazyWebStreams() {

@@ -948,9 +874,7 @@ if (webStreamsAdapters === undefined) webStreamsAdapters = {}

}
Writable.fromWeb = function (writableStream, options) {
return lazyWebStreams().newStreamWritableFromWritableStream(writableStream, options)
}
Writable.toWeb = function (streamWritable) {
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable)
}
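Note: as with the other files, the writable.js edits are comment reflows only; write() still returns false once state.length reaches highWaterMark, and 'drain' still fires when the buffered data is flushed (see needDrain and afterWrite() above). A minimal hedged sketch of that backpressure contract (editor-added; assumes readable-stream is installed):

'use strict'
const { Writable } = require('readable-stream')

const sink = new Writable({
  highWaterMark: 4, // tiny buffer so write() returns false quickly
  write(chunk, encoding, callback) {
    setImmediate(callback) // pretend the underlying resource is slow
  }
})

const ok = sink.write('hello') // 5 bytes >= highWaterMark, so this returns false
console.log('keep writing?', ok)

sink.once('drain', () => {
  // The buffer has emptied; it is safe to resume writing.
  console.log('drained')
  sink.end()
})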

@@ -19,3 +19,2 @@ 'use strict'

} = require('../ours/primordials')
const {

@@ -25,8 +24,6 @@ hideStackFrames,

} = require('../ours/errors')
const { normalizeEncoding } = require('../ours/util')
const { isAsyncFunction, isArrayBufferView } = require('../ours/util').types
const signals = {}
const signals = {}
/**

@@ -36,6 +33,6 @@ * @param {*} value

*/
function isInt32(value) {
return value === (value | 0)
}
/**

@@ -45,9 +42,8 @@ * @param {*} value

*/
function isUint32(value) {
return value === value >>> 0
}
const octalReg = /^[0-7]+$/
const modeDesc = 'must be a 32-bit unsigned integer or an octal string'
/**

@@ -65,3 +61,2 @@ * Parse and validate values that will be converted into mode_t (the S_*

*/
function parseFileMode(value, name, def) {

@@ -71,3 +66,2 @@ if (typeof value === 'undefined') {

}
if (typeof value === 'string') {

@@ -77,9 +71,8 @@ if (RegExpPrototypeExec(octalReg, value) === null) {

}
value = NumberParseInt(value, 8)
}
validateUint32(value, name)
return value
}
/**

@@ -95,3 +88,2 @@ * @callback validateInteger

/** @type {validateInteger} */
const validateInteger = hideStackFrames((value, name, min = NumberMIN_SAFE_INTEGER, max = NumberMAX_SAFE_INTEGER) => {

@@ -102,2 +94,3 @@ if (typeof value !== 'number') throw new ERR_INVALID_ARG_TYPE(name, 'number', value)

})
/**

@@ -113,3 +106,2 @@ * @callback validateInt32

/** @type {validateInt32} */
const validateInt32 = hideStackFrames((value, name, min = -2147483648, max = 2147483647) => {

@@ -120,7 +112,5 @@ // The defaults for min and max correspond to the limits of 32-bit integers.

}
if (!NumberIsInteger(value)) {
throw new ERR_OUT_OF_RANGE(name, 'an integer', value)
}
if (value < min || value > max) {

@@ -130,2 +120,3 @@ throw new ERR_OUT_OF_RANGE(name, `>= ${min} && <= ${max}`, value)

})
/**

@@ -140,3 +131,2 @@ * @callback validateUint32

/** @type {validateUint32} */
const validateUint32 = hideStackFrames((value, name, positive = false) => {

@@ -146,11 +136,8 @@ if (typeof value !== 'number') {

}
if (!NumberIsInteger(value)) {
throw new ERR_OUT_OF_RANGE(name, 'an integer', value)
}
const min = positive ? 1 : 0 // 2 ** 32 === 4294967296
const max = 4_294_967_295
const min = positive ? 1 : 0
// 2 ** 32 === 4294967296
const max = 4294967295
if (value < min || value > max) {

@@ -160,2 +147,3 @@ throw new ERR_OUT_OF_RANGE(name, `>= ${min} && <= ${max}`, value)

})
/**

@@ -169,6 +157,6 @@ * @callback validateString

/** @type {validateString} */
function validateString(value, name) {
if (typeof value !== 'string') throw new ERR_INVALID_ARG_TYPE(name, 'string', value)
}
/**

@@ -184,6 +172,4 @@ * @callback validateNumber

/** @type {validateNumber} */
function validateNumber(value, name, min = undefined, max) {
if (typeof value !== 'number') throw new ERR_INVALID_ARG_TYPE(name, 'number', value)
if (

@@ -201,2 +187,3 @@ (min != null && value < min) ||

}
/**

@@ -211,3 +198,2 @@ * @callback validateOneOf

/** @type {validateOneOf} */
const validateOneOf = hideStackFrames((value, name, oneOf) => {

@@ -223,2 +209,3 @@ if (!ArrayPrototypeIncludes(oneOf, value)) {

})
/**

@@ -232,10 +219,9 @@ * @callback validateBoolean

/** @type {validateBoolean} */
function validateBoolean(value, name) {
if (typeof value !== 'boolean') throw new ERR_INVALID_ARG_TYPE(name, 'boolean', value)
}
function getOwnPropertyValueOrDefault(options, key, defaultValue) {
return options == null || !ObjectPrototypeHasOwnProperty(options, key) ? defaultValue : options[key]
}
/**

@@ -253,3 +239,2 @@ * @callback validateObject

/** @type {validateObject} */
const validateObject = hideStackFrames((value, name, options = null) => {

@@ -259,3 +244,2 @@ const allowArray = getOwnPropertyValueOrDefault(options, 'allowArray', false)

const nullable = getOwnPropertyValueOrDefault(options, 'nullable', false)
if (

@@ -269,2 +253,3 @@ (!nullable && value === null) ||

})
/**

@@ -279,3 +264,2 @@ * @callback validateArray

/** @type {validateArray} */
const validateArray = hideStackFrames((value, name, minLength = 0) => {

@@ -285,3 +269,2 @@ if (!ArrayIsArray(value)) {

}
if (value.length < minLength) {

@@ -291,4 +274,5 @@ const reason = `must be longer than ${minLength}`

}
}) // eslint-disable-next-line jsdoc/require-returns-check
})
// eslint-disable-next-line jsdoc/require-returns-check
/**

@@ -299,6 +283,4 @@ * @param {*} signal

*/
function validateSignalName(signal, name = 'signal') {
validateString(signal, name)
if (signals[signal] === undefined) {

@@ -308,6 +290,6 @@ if (signals[StringPrototypeToUpperCase(signal)] !== undefined) {

}
throw new ERR_UNKNOWN_SIGNAL(signal)
}
}
/**

@@ -321,3 +303,2 @@ * @callback validateBuffer

/** @type {validateBuffer} */
const validateBuffer = hideStackFrames((buffer, name = 'buffer') => {

@@ -328,2 +309,3 @@ if (!isArrayBufferView(buffer)) {

})
/**

@@ -333,7 +315,5 @@ * @param {string} data

*/
function validateEncoding(data, encoding) {
const normalizedEncoding = normalizeEncoding(encoding)
const length = data.length
if (normalizedEncoding === 'hex' && length % 2 !== 0) {

@@ -343,2 +323,3 @@ throw new ERR_INVALID_ARG_VALUE('encoding', encoding, `is invalid for data of length ${length}`)

}
/**

@@ -352,3 +333,2 @@ * Check that the port number is not NaN when coerced to a number,

*/
function validatePort(port, name = 'Port', allowZero = true) {

@@ -364,5 +344,5 @@ if (

}
return port | 0
}
/**

@@ -375,3 +355,2 @@ * @callback validateAbortSignal

/** @type {validateAbortSignal} */
const validateAbortSignal = hideStackFrames((signal, name) => {

@@ -382,2 +361,3 @@ if (signal !== undefined && (signal === null || typeof signal !== 'object' || !('aborted' in signal))) {

})
/**

@@ -391,6 +371,6 @@ * @callback validateFunction

/** @type {validateFunction} */
const validateFunction = hideStackFrames((value, name) => {
if (typeof value !== 'function') throw new ERR_INVALID_ARG_TYPE(name, 'Function', value)
})
/**

@@ -404,6 +384,6 @@ * @callback validatePlainFunction

/** @type {validatePlainFunction} */
const validatePlainFunction = hideStackFrames((value, name) => {
if (typeof value !== 'function' || isAsyncFunction(value)) throw new ERR_INVALID_ARG_TYPE(name, 'Function', value)
})
/**

@@ -417,6 +397,6 @@ * @callback validateUndefined

/** @type {validateUndefined} */
const validateUndefined = hideStackFrames((value, name) => {
if (value !== undefined) throw new ERR_INVALID_ARG_TYPE(name, 'undefined', value)
})
/**

@@ -428,3 +408,2 @@ * @template T

*/
function validateUnion(value, name, union) {

@@ -435,3 +414,2 @@ if (!ArrayPrototypeIncludes(union, value)) {

}
module.exports = {

@@ -438,0 +416,0 @@ isInt32,
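Note: the validators above are internal helpers (not re-exported to consumers), but the 32-bit checks are worth a worked example. isInt32() relies on `value === (value | 0)` and isUint32() on `value === value >>> 0`, so the boundary behaviour is as follows (standalone sketch copied from the definitions shown above, not an import of the internal module):

'use strict'
// Standalone copies of the two predicates from the validators diff.
const isInt32 = (value) => value === (value | 0)
const isUint32 = (value) => value === value >>> 0

console.log(isInt32(2 ** 31 - 1))  // true  (0x7fffffff fits in a signed 32-bit int)
console.log(isInt32(2 ** 31))      // false (2 ** 31 | 0 wraps to -2147483648)
console.log(isUint32(2 ** 32 - 1)) // true  (4294967295, the max accepted by validateUint32)
console.log(isUint32(-1))          // false (-1 >>> 0 is 4294967295, not -1)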

'use strict'
const CustomStream = require('../stream')
const promises = require('../stream/promises')
const originalDestroy = CustomStream.Readable.destroy
module.exports = CustomStream.Readable // Explicit export naming is needed for ESM
module.exports = CustomStream.Readable
// Explicit export naming is needed for ESM
module.exports._uint8ArrayToBuffer = CustomStream._uint8ArrayToBuffer

@@ -29,3 +28,2 @@ module.exports._isUint8Array = CustomStream._isUint8Array

enumerable: true,
get() {

@@ -35,4 +33,5 @@ return promises

})
module.exports.Stream = CustomStream.Stream // Allow default importing
module.exports.Stream = CustomStream.Stream
// Allow default importing
module.exports.default = module.exports
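Note: this wrapper re-attaches each class as a named export and assigns module.exports.default so the package can be consumed from both CommonJS and ESM, which is what the two comments above refer to. A hedged sketch of both import styles (editor-added; assumes the published package name, readable-stream):

// CommonJS
const { Readable, Writable, Transform } = require('readable-stream')

// ESM — apparently the intent of the explicit named exports and the `default` assignment above:
// import { Readable, Writable, Transform } from 'readable-stream'
// import stream from 'readable-stream'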
'use strict'
const { format, inspect, AggregateError: CustomAggregateError } = require('./util')
/*

@@ -19,3 +20,4 @@ This file is a reduced and adapted version of the main lib/internal/errors.js file defined at

'number',
'object', // Accept 'Function' and 'Object' as alternative to the lower cased version.
'object',
// Accept 'Function' and 'Object' as alternative to the lower cased version.
'Function',

@@ -30,3 +32,2 @@ 'Object',

const codes = {}
function assert(value, message) {

@@ -36,4 +37,5 @@ if (!value) {

}
} // Only use this for integers! Decimal numbers do not work with this function.
}
// Only use this for integers! Decimal numbers do not work with this function.
function addNumericalSeparator(val) {

@@ -43,14 +45,12 @@ let res = ''

const start = val[0] === '-' ? 1 : 0
for (; i >= start + 4; i -= 3) {
res = `_${val.slice(i - 3, i)}${res}`
}
return `${val.slice(0, i)}${res}`
}
function getMessage(key, msg, args) {
if (typeof msg === 'function') {
assert(
msg.length <= args.length, // Default options do not count.
msg.length <= args.length,
// Default options do not count.
`Code: ${key}; The provided arguments length (${args.length}) does not match the required ones (${msg.length}).`

@@ -60,3 +60,2 @@ )

}
const expectedLength = (msg.match(/%[dfijoOs]/g) || []).length

@@ -67,10 +66,7 @@ assert(

)
if (args.length === 0) {
return msg
}
return format(msg, ...args)
}
function E(code, message, Base) {

@@ -80,3 +76,2 @@ if (!Base) {

}
class NodeError extends Base {

@@ -86,3 +81,2 @@ constructor(...args) {

}
toString() {

@@ -92,3 +86,2 @@ return `${this.name} [${code}]: ${this.message}`

}
Object.defineProperties(NodeError.prototype, {

@@ -105,3 +98,2 @@ name: {

},
writable: true,

@@ -116,3 +108,2 @@ enumerable: false,

}
function hideStackFrames(fn) {

@@ -127,3 +118,2 @@ // We rename the functions that will be hidden to cut off the stacktrace

}
function aggregateTwoErrors(innerError, outerError) {

@@ -136,3 +126,2 @@ if (innerError && outerError && innerError !== outerError) {

}
const err = new AggregateError([outerError, innerError], outerError.message)

@@ -142,6 +131,4 @@ err.code = outerError.code

}
return innerError || outerError
}
class AbortError extends Error {

@@ -152,3 +139,2 @@ constructor(message = 'The operation was aborted', options = undefined) {

}
super(message, options)

@@ -159,3 +145,2 @@ this.code = 'ABORT_ERR'

}
E('ERR_ASSERTION', '%s', Error)

@@ -166,9 +151,6 @@ E(

assert(typeof name === 'string', "'name' must be a string")
if (!Array.isArray(expected)) {
expected = [expected]
}
let msg = 'The '
if (name.endsWith(' argument')) {

@@ -180,3 +162,2 @@ // For cases like 'first argument'

}
msg += 'must be '

@@ -186,6 +167,4 @@ const types = []

const other = []
for (const value of expected) {
assert(typeof value === 'string', 'All expected entries have to be of type string')
if (kTypes.includes(value)) {

@@ -199,8 +178,8 @@ types.push(value.toLowerCase())

}
} // Special handle `object` in case other instances are allowed to outline
}
// Special handle `object` in case other instances are allowed to outline
// the differences between each other.
if (instances.length > 0) {
const pos = types.indexOf('object')
if (pos !== -1) {

@@ -211,3 +190,2 @@ types.splice(types, pos, 1)

}
if (types.length > 0) {

@@ -218,7 +196,5 @@ switch (types.length) {

break
case 2:
msg += `one of type ${types[0]} or ${types[1]}`
break
default: {

@@ -229,3 +205,2 @@ const last = types.pop()

}
if (instances.length > 0 || other.length > 0) {

@@ -235,3 +210,2 @@ msg += ' or '

}
if (instances.length > 0) {

@@ -242,7 +216,5 @@ switch (instances.length) {

break
case 2:
msg += `an instance of ${instances[0]} or ${instances[1]}`
break
default: {

@@ -253,3 +225,2 @@ const last = instances.pop()

}
if (other.length > 0) {

@@ -259,7 +230,5 @@ msg += ' or '

}
switch (other.length) {
case 0:
break
case 1:

@@ -269,10 +238,7 @@ if (other[0].toLowerCase() !== other[0]) {

}
msg += `${other[0]}`
break
case 2:
msg += `one of ${other[0]} or ${other[1]}`
break
default: {

@@ -283,3 +249,2 @@ const last = other.pop()

}
if (actual == null) {

@@ -291,3 +256,2 @@ msg += `. Received ${actual}`

var _actual$constructor
if (

@@ -309,10 +273,7 @@ (_actual$constructor = actual.constructor) !== null &&

})
if (inspected.length > 25) {
inspected = `${inspected.slice(0, 25)}...`
}
msg += `. Received type ${typeof actual} (${inspected})`
}
return msg

@@ -326,7 +287,5 @@ },

let inspected = inspect(value)
if (inspected.length > 128) {
inspected = inspected.slice(0, 128) + '...'
}
const type = name.includes('.') ? 'property' : 'argument'

@@ -341,3 +300,2 @@ return `The ${type} '${name}' ${reason}. Received ${inspected}`

var _value$constructor
const type =

@@ -362,3 +320,2 @@ value !== null &&

args = (Array.isArray(args) ? args : [args]).map((a) => `"${a}"`).join(' or ')
switch (len) {

@@ -368,7 +325,5 @@ case 1:

break
case 2:
msg += `The ${args[0]} and ${args[1]} arguments`
break
default:

@@ -381,3 +336,2 @@ {

}
return `${msg} must be specified`

@@ -392,3 +346,2 @@ },

let received
if (Number.isInteger(input) && Math.abs(input) > 2 ** 32) {

@@ -398,7 +351,5 @@ received = addNumericalSeparator(String(input))

received = String(input)
if (input > 2n ** 32n || input < -(2n ** 32n)) {
received = addNumericalSeparator(received)
}
received += 'n'

@@ -408,3 +359,2 @@ } else {

}
return `The value of "${str}" is out of range. It must be ${range}. Received ${received}`

@@ -411,0 +361,0 @@ },
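
The `E()` factory registers each error on the shared `codes` object, so call sites construct errors by code rather than by ad-hoc `Error` subclasses. A minimal sketch of how the registered codes are consumed elsewhere in the package (argument values are illustrative):

```js
// Illustrative sketch (path assumed):
const { codes } = require('./errors')
const { ERR_INVALID_ARG_TYPE } = codes

throw new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], 42)
// -> TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string
//    or an instance of Buffer or Uint8Array. Received type number (42)  (roughly)
```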

'use strict'
const Stream = require('stream')
if (Stream && process.env.READABLE_STREAM === 'disable') {
const promises = Stream.promises // Explicit export naming is needed for ESM
const promises = Stream.promises
// Explicit export naming is needed for ESM
module.exports._uint8ArrayToBuffer = Stream._uint8ArrayToBuffer

@@ -26,3 +26,2 @@ module.exports._isUint8Array = Stream._isUint8Array

enumerable: true,
get() {

@@ -35,8 +34,7 @@ return promises

const CustomStream = require('../stream')
const promises = require('../stream/promises')
const originalDestroy = CustomStream.Readable.destroy
module.exports = CustomStream.Readable // Explicit export naming is needed for ESM
module.exports = CustomStream.Readable
// Explicit export naming is needed for ESM
module.exports._uint8ArrayToBuffer = CustomStream._uint8ArrayToBuffer

@@ -61,3 +59,2 @@ module.exports._isUint8Array = CustomStream._isUint8Array

enumerable: true,
get() {

@@ -68,4 +65,5 @@ return promises

module.exports.Stream = CustomStream.Stream
} // Allow default importing
}
// Allow default importing
module.exports.default = module.exports
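
This index module picks between the bundled implementation and Node's built-in `stream` at require time, driven purely by an environment variable. A minimal sketch of how that switch is exercised (behaviour described here is inferred from the branch above):

```js
// Sketch: opting out of the bundled implementation.
// The variable must be set before readable-stream is first required.
process.env.READABLE_STREAM = 'disable'

const { Readable } = require('readable-stream')
// With the 'disable' branch taken, the exports come from Node's native module:
console.log(Readable === require('stream').Readable) // expected to be true
```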
'use strict'
/*

@@ -9,3 +10,2 @@ This file is a reduced and adapted version of the main lib/internal/per_context/primordials.js file defined at

*/
module.exports = {

@@ -15,41 +15,30 @@ ArrayIsArray(self) {

},
ArrayPrototypeIncludes(self, el) {
return self.includes(el)
},
ArrayPrototypeIndexOf(self, el) {
return self.indexOf(el)
},
ArrayPrototypeJoin(self, sep) {
return self.join(sep)
},
ArrayPrototypeMap(self, fn) {
return self.map(fn)
},
ArrayPrototypePop(self, el) {
return self.pop(el)
},
ArrayPrototypePush(self, el) {
return self.push(el)
},
ArrayPrototypeSlice(self, start, end) {
return self.slice(start, end)
},
Error,
FunctionPrototypeCall(fn, thisArgs, ...args) {
return fn.call(thisArgs, ...args)
},
FunctionPrototypeSymbolHasInstance(self, instance) {
return Function.prototype[Symbol.hasInstance].call(self, instance)
},
MathFloor: Math.floor,

@@ -62,62 +51,45 @@ Number,

NumberParseInt: Number.parseInt,
ObjectDefineProperties(self, props) {
return Object.defineProperties(self, props)
},
ObjectDefineProperty(self, name, prop) {
return Object.defineProperty(self, name, prop)
},
ObjectGetOwnPropertyDescriptor(self, name) {
return Object.getOwnPropertyDescriptor(self, name)
},
ObjectKeys(obj) {
return Object.keys(obj)
},
ObjectSetPrototypeOf(target, proto) {
return Object.setPrototypeOf(target, proto)
},
Promise,
PromisePrototypeCatch(self, fn) {
return self.catch(fn)
},
PromisePrototypeThen(self, thenFn, catchFn) {
return self.then(thenFn, catchFn)
},
PromiseReject(err) {
return Promise.reject(err)
},
ReflectApply: Reflect.apply,
RegExpPrototypeTest(self, value) {
return self.test(value)
},
SafeSet: Set,
String,
StringPrototypeSlice(self, start, end) {
return self.slice(start, end)
},
StringPrototypeToLowerCase(self) {
return self.toLowerCase()
},
StringPrototypeToUpperCase(self) {
return self.toUpperCase()
},
StringPrototypeTrim(self) {
return self.trim()
},
Symbol,

@@ -127,8 +99,6 @@ SymbolAsyncIterator: Symbol.asyncIterator,

SymbolIterator: Symbol.iterator,
TypedArrayPrototypeSet(self, buf, len) {
return self.set(buf, len)
},
Uint8Array
}
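
Each entry wraps a built-in method with an explicit receiver, mirroring Node core's primordials so the rest of the code base keeps the same call shape as upstream. A short sketch of the calling convention (require path illustrative):

```js
// Sketch of the calling convention used throughout the package (path assumed):
const { ArrayPrototypePush, StringPrototypeToLowerCase } = require('./primordials')

const parts = []
ArrayPrototypePush(parts, 'Buffer')       // instead of parts.push('Buffer')
StringPrototypeToLowerCase('Uint8Array')  // instead of 'Uint8Array'.toLowerCase() -> 'uint8array'
```

Unlike real Node primordials these wrappers still dispatch through the live prototypes at call time; their main job is keeping the vendored code diff-compatible with core, not hardening against prototype tampering.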
'use strict'
const bufferModule = require('buffer')
const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor
const Blob = globalThis.Blob || bufferModule.Blob
/* eslint-disable indent */
const isBlob =

@@ -19,4 +17,4 @@ typeof Blob !== 'undefined'

/* eslint-enable indent */
// This is a simplified version of AggregateError
class AggregateError extends Error {

@@ -27,9 +25,6 @@ constructor(errors) {

}
let message = ''
for (let i = 0; i < errors.length; i++) {
message += ` ${errors[i].stack}\n`
}
super(message)

@@ -40,7 +35,5 @@ this.name = 'AggregateError'

}
module.exports = {
AggregateError,
kEmptyObject: Object.freeze({}),
once(callback) {

@@ -52,3 +45,2 @@ let called = false

}
called = true

@@ -58,7 +50,7 @@ callback.apply(this, args)

},
createDeferredPromise: function () {
let resolve
let reject // eslint-disable-next-line promise/param-names
let reject
// eslint-disable-next-line promise/param-names
const promise = new Promise((res, rej) => {

@@ -74,3 +66,2 @@ resolve = res

},
promisify(fn) {

@@ -82,3 +73,2 @@ return new Promise((resolve, reject) => {

}
return resolve(...args)

@@ -88,7 +78,5 @@ })

},
debuglog() {
return function () {}
},
format(format, ...args) {

@@ -98,3 +86,2 @@ // Simplified version of https://nodejs.org/api/util.html#utilformatformat-args

const replacement = args.shift()
if (type === 'f') {

@@ -112,3 +99,2 @@ return replacement.toFixed(6)

},
inspect(value) {

@@ -125,5 +111,3 @@ // Vastly simplified version of https://nodejs.org/api/util.html#utilinspectobject-options

}
return `'${value}'`
case 'number':

@@ -135,12 +119,8 @@ if (isNaN(value)) {

}
return value
case 'bigint':
return `${String(value)}n`
case 'boolean':
case 'undefined':
return String(value)
case 'object':

@@ -150,3 +130,2 @@ return '{}'

},
types: {

@@ -156,3 +135,2 @@ isAsyncFunction(fn) {

},
isArrayBufferView(arr) {

@@ -159,0 +137,0 @@ return ArrayBuffer.isView(arr)
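
`createDeferredPromise` in the simplified util above hands back the resolve/reject handles alongside the promise, which is handy when settlement is driven by events. A minimal usage sketch (`emitter` is a hypothetical event emitter, used only for illustration):

```js
// Minimal sketch of the deferred-promise helper above (path assumed):
const { createDeferredPromise } = require('./util')

async function waitForReady(emitter) {
  const { promise, resolve, reject } = createDeferredPromise()
  emitter.once('ready', resolve)
  emitter.once('error', reject)
  return promise // settles with whichever event fires first
}
```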

/* replacement start */
const { Buffer } = require('buffer')
/* replacement end */

@@ -26,31 +28,18 @@ // Copyright Joyent, Inc. and other Node contributors.

;('use strict')
const { ObjectDefineProperty, ObjectKeys, ReflectApply } = require('./ours/primordials')
const {
promisify: { custom: customPromisify }
} = require('./ours/util')
const { streamReturningOperators, promiseReturningOperators } = require('./internal/streams/operators')
const {
codes: { ERR_ILLEGAL_CONSTRUCTOR }
} = require('./ours/errors')
const compose = require('./internal/streams/compose')
const { pipeline } = require('./internal/streams/pipeline')
const { destroyer } = require('./internal/streams/destroy')
const eos = require('./internal/streams/end-of-stream')
const internalBuffer = {}
const promises = require('./stream/promises')
const utils = require('./internal/streams/utils')
const Stream = (module.exports = require('./internal/streams/legacy').Stream)
Stream.isDisturbed = utils.isDisturbed

@@ -60,6 +49,4 @@ Stream.isErrored = utils.isErrored

Stream.Readable = require('./internal/streams/readable')
for (const key of ObjectKeys(streamReturningOperators)) {
const op = streamReturningOperators[key]
function fn(...args) {

@@ -69,6 +56,4 @@ if (new.target) {

}
return Stream.Readable.from(ReflectApply(op, this, args))
}
ObjectDefineProperty(fn, 'name', {

@@ -90,6 +75,4 @@ __proto__: null,

}
for (const key of ObjectKeys(promiseReturningOperators)) {
const op = promiseReturningOperators[key]
function fn(...args) {

@@ -99,6 +82,4 @@ if (new.target) {

}
return ReflectApply(op, this, args)
}
ObjectDefineProperty(fn, 'name', {

@@ -120,3 +101,2 @@ __proto__: null,

}
Stream.Writable = require('./internal/streams/writable')

@@ -127,5 +107,3 @@ Stream.Duplex = require('./internal/streams/duplex')

Stream.pipeline = pipeline
const { addAbortSignal } = require('./internal/streams/add-abort-signal')
Stream.addAbortSignal = addAbortSignal

@@ -139,3 +117,2 @@ Stream.finished = eos

enumerable: true,
get() {

@@ -148,3 +125,2 @@ return promises

enumerable: true,
get() {

@@ -157,16 +133,14 @@ return promises.pipeline

enumerable: true,
get() {
return promises.finished
}
}) // Backwards-compat with node 0.4.x
})
// Backwards-compat with node 0.4.x
Stream.Stream = Stream
Stream._isUint8Array = function isUint8Array(value) {
return value instanceof Uint8Array
}
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
}
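
The two helpers at the end are how the implementation normalises incoming chunks: a `Uint8Array` is detected with an `instanceof` check and re-wrapped as a `Buffer` view over the same memory, with no copy. A small sketch of that behaviour:

```js
// Sketch: the Buffer view shares memory with the original Uint8Array (no copy).
const Stream = require('readable-stream')

const chunk = new Uint8Array([104, 105]) // bytes for 'hi'
console.log(Stream._isUint8Array(chunk))      // true
const asBuffer = Stream._uint8ArrayToBuffer(chunk)
console.log(asBuffer.buffer === chunk.buffer) // true – same underlying ArrayBuffer
console.log(asBuffer.toString())              // 'hi'
```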
'use strict'
const { ArrayPrototypePop, Promise } = require('../ours/primordials')
const { isIterable, isNodeStream } = require('../internal/streams/utils')
const { pipelineImpl: pl } = require('../internal/streams/pipeline')
const { finished } = require('../internal/streams/end-of-stream')
function pipeline(...streams) {

@@ -16,3 +12,2 @@ return new Promise((resolve, reject) => {

const lastArg = streams[streams.length - 1]
if (lastArg && typeof lastArg === 'object' && !isNodeStream(lastArg) && !isIterable(lastArg)) {

@@ -23,3 +18,2 @@ const options = ArrayPrototypePop(streams)

}
pl(

@@ -41,3 +35,2 @@ streams,

}
module.exports = {

@@ -44,0 +37,0 @@ finished,
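
The promise-flavoured `pipeline` above accepts an optional trailing options object (that is what the `lastArg` check detects) and otherwise mirrors the callback pipeline. A minimal usage sketch (file names are illustrative):

```js
// Minimal usage sketch of the promise-based pipeline exported above:
const { pipeline } = require('readable-stream').promises
const fs = require('fs')

async function copy() {
  const ac = new AbortController()
  await pipeline(
    fs.createReadStream('input.txt'),
    fs.createWriteStream('output.txt'),
    { signal: ac.signal } // trailing options object picked up by the lastArg check
  )
}
```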

{
"name": "readable-stream",
"version": "4.2.0",
"version": "4.3.0",
"description": "Node.js Streams, a user-land copy of the stream library from Node.js",

@@ -5,0 +5,0 @@ "homepage": "https://github.com/nodejs/readable-stream",

@@ -90,24 +90,4 @@ # readable-stream

You will need a bundler like [`browserify`](https://github.com/browserify/browserify#readme), [`webpack`](https://webpack.js.org/), [`parcel`](https://github.com/parcel-bundler/parcel#readme) or similar. With Webpack 5 (which unlike other bundlers does not polyfill Node.js core modules and globals like `process`) you will also need to:
You will need a bundler like [`browserify`](https://github.com/browserify/browserify#readme), [`webpack`](https://webpack.js.org/), [`parcel`](https://github.com/parcel-bundler/parcel#readme) or similar. Polyfills are no longer required since version 4.2.0.
1. Install polyfills by running `npm install buffer process --save-dev`
2. Create a [`webpack.config.js`](https://webpack.js.org/guides/getting-started/#using-a-configuration) file containing:
```js
const webpack = require('webpack')
module.exports = {
plugins: [
new webpack.ProvidePlugin({
process: 'process/browser'
})
],
resolve: {
fallback: {
buffer: require.resolve('buffer/')
}
}
}
```
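
The removed configuration above is what the simpler setup replaces: with readable-stream 4.2.0 or later, a plain entry file is expected to bundle without the `ProvidePlugin` or `buffer` fallback. A minimal, illustrative sketch; exact behaviour still depends on your bundler configuration:

```js
// index.js – intended to bundle with webpack 5 without extra polyfill configuration
// (illustrative; assumes readable-stream >= 4.2.0)
const { Readable } = require('readable-stream')

Readable.from(['hello', ' ', 'world']).on('data', (chunk) => {
  console.log(chunk)
})
```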
# Streams Working Group

@@ -114,0 +94,0 @@
