readable-stream
Advanced tools
Comparing version 4.2.0 to 4.3.0
@@ -1,3 +0,4 @@ | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
'use strict' | ||
// Keep this file as an alias for the full stream module. | ||
module.exports = require('./stream').Duplex |
@@ -1,3 +0,4 @@ | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
'use strict' | ||
// Keep this file as an alias for the full stream module. | ||
module.exports = require('./stream').PassThrough |
@@ -1,3 +0,4 @@ | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
'use strict' | ||
// Keep this file as an alias for the full stream module. | ||
module.exports = require('./stream').Readable |
@@ -1,3 +0,4 @@ | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
'use strict' | ||
// Keep this file as an alias for the full stream module. | ||
module.exports = require('./stream').Transform |
@@ -1,3 +0,4 @@ | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
'use strict' | ||
// Keep this file as an alias for the full stream module. | ||
module.exports = require('./stream').Writable |
'use strict' | ||
const { AbortError, codes } = require('../../ours/errors') | ||
const eos = require('./end-of-stream') | ||
const { ERR_INVALID_ARG_TYPE } = codes | ||
const { ERR_INVALID_ARG_TYPE } = codes // This method is inlined here for readable-stream | ||
// This method is inlined here for readable-stream | ||
// It also does not allow for signal to not exist on the stream | ||
// https://github.com/nodejs/node/pull/36061#discussion_r533718029 | ||
const validateAbortSignal = (signal, name) => { | ||
@@ -16,17 +15,12 @@ if (typeof signal !== 'object' || !('aborted' in signal)) { | ||
} | ||
function isNodeStream(obj) { | ||
return !!(obj && typeof obj.pipe === 'function') | ||
} | ||
module.exports.addAbortSignal = function addAbortSignal(signal, stream) { | ||
validateAbortSignal(signal, 'signal') | ||
if (!isNodeStream(stream)) { | ||
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream) | ||
} | ||
return module.exports.addAbortSignalNoValidate(signal, stream) | ||
} | ||
module.exports.addAbortSignalNoValidate = function (signal, stream) { | ||
@@ -36,3 +30,2 @@ if (typeof signal !== 'object' || !('aborted' in signal)) { | ||
} | ||
const onAbort = () => { | ||
@@ -45,3 +38,2 @@ stream.destroy( | ||
} | ||
if (signal.aborted) { | ||
@@ -53,4 +45,3 @@ onAbort() | ||
} | ||
return stream | ||
} |
'use strict' | ||
const { StringPrototypeSlice, SymbolIterator, TypedArrayPrototypeSet, Uint8Array } = require('../../ours/primordials') | ||
const { Buffer } = require('buffer') | ||
const { inspect } = require('../../ours/util') | ||
module.exports = class BufferList { | ||
@@ -15,3 +12,2 @@ constructor() { | ||
} | ||
push(v) { | ||
@@ -27,3 +23,2 @@ const entry = { | ||
} | ||
unshift(v) { | ||
@@ -38,3 +33,2 @@ const entry = { | ||
} | ||
shift() { | ||
@@ -48,3 +42,2 @@ if (this.length === 0) return | ||
} | ||
clear() { | ||
@@ -54,3 +47,2 @@ this.head = this.tail = null | ||
} | ||
join(s) { | ||
@@ -60,8 +52,5 @@ if (this.length === 0) return '' | ||
let ret = '' + p.data | ||
while ((p = p.next) !== null) ret += s + p.data | ||
return ret | ||
} | ||
concat(n) { | ||
@@ -72,3 +61,2 @@ if (this.length === 0) return Buffer.alloc(0) | ||
let i = 0 | ||
while (p) { | ||
@@ -79,9 +67,8 @@ TypedArrayPrototypeSet(ret, p.data, i) | ||
} | ||
return ret | ||
} // Consumes a specified amount of bytes or characters from the buffered data. | ||
} | ||
// Consumes a specified amount of bytes or characters from the buffered data. | ||
consume(n, hasStrings) { | ||
const data = this.head.data | ||
if (n < data.length) { | ||
@@ -93,15 +80,12 @@ // `slice` is the same for buffers and strings. | ||
} | ||
if (n === data.length) { | ||
// First chunk is a perfect match. | ||
return this.shift() | ||
} // Result spans more than one buffer. | ||
} | ||
// Result spans more than one buffer. | ||
return hasStrings ? this._getString(n) : this._getBuffer(n) | ||
} | ||
first() { | ||
return this.head.data | ||
} | ||
*[SymbolIterator]() { | ||
@@ -111,4 +95,5 @@ for (let p = this.head; p; p = p.next) { | ||
} | ||
} // Consumes a specified amount of characters from the buffered data. | ||
} | ||
// Consumes a specified amount of characters from the buffered data. | ||
_getString(n) { | ||
@@ -118,6 +103,4 @@ let ret = '' | ||
let c = 0 | ||
do { | ||
const str = p.data | ||
if (n > str.length) { | ||
@@ -137,13 +120,11 @@ ret += str | ||
} | ||
break | ||
} | ||
++c | ||
} while ((p = p.next) !== null) | ||
this.length -= c | ||
return ret | ||
} // Consumes a specified amount of bytes from the buffered data. | ||
} | ||
// Consumes a specified amount of bytes from the buffered data. | ||
_getBuffer(n) { | ||
@@ -154,6 +135,4 @@ const ret = Buffer.allocUnsafe(n) | ||
let c = 0 | ||
do { | ||
const buf = p.data | ||
if (n > buf.length) { | ||
@@ -173,13 +152,11 @@ TypedArrayPrototypeSet(ret, buf, retLen - n) | ||
} | ||
break | ||
} | ||
++c | ||
} while ((p = p.next) !== null) | ||
this.length -= c | ||
return ret | ||
} // Make sure the linked list only shows the minimal necessary information. | ||
} | ||
// Make sure the linked list only shows the minimal necessary information. | ||
[Symbol.for('nodejs.util.inspect.custom')](_, options) { | ||
@@ -186,0 +163,0 @@ return inspect(this, { |
'use strict' | ||
const { pipeline } = require('./pipeline') | ||
const Duplex = require('./duplex') | ||
const { destroyer } = require('./destroy') | ||
const { isNodeStream, isReadable, isWritable } = require('./utils') | ||
const { | ||
@@ -15,3 +11,2 @@ AbortError, | ||
} = require('../../ours/errors') | ||
module.exports = function compose(...streams) { | ||
@@ -21,13 +16,9 @@ if (streams.length === 0) { | ||
} | ||
if (streams.length === 1) { | ||
return Duplex.from(streams[0]) | ||
} | ||
const orgStreams = [...streams] | ||
if (typeof streams[0] === 'function') { | ||
streams[0] = Duplex.from(streams[0]) | ||
} | ||
if (typeof streams[streams.length - 1] === 'function') { | ||
@@ -37,3 +28,2 @@ const idx = streams.length - 1 | ||
} | ||
for (let n = 0; n < streams.length; ++n) { | ||
@@ -44,7 +34,5 @@ if (!isNodeStream(streams[n])) { | ||
} | ||
if (n < streams.length - 1 && !isReadable(streams[n])) { | ||
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable') | ||
} | ||
if (n > 0 && !isWritable(streams[n])) { | ||
@@ -54,3 +42,2 @@ throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable') | ||
} | ||
let ondrain | ||
@@ -61,7 +48,5 @@ let onfinish | ||
let d | ||
function onfinished(err) { | ||
const cb = onclose | ||
onclose = null | ||
if (cb) { | ||
@@ -75,10 +60,10 @@ cb(err) | ||
} | ||
const head = streams[0] | ||
const tail = pipeline(streams, onfinished) | ||
const writable = !!isWritable(head) | ||
const readable = !!isReadable(tail) // TODO(ronag): Avoid double buffering. | ||
const readable = !!isReadable(tail) | ||
// TODO(ronag): Avoid double buffering. | ||
// Implement Writable/Readable/Duplex traits. | ||
// See, https://github.com/nodejs/node/pull/33515. | ||
d = new Duplex({ | ||
@@ -91,3 +76,2 @@ // TODO (ronag): highWaterMark? | ||
}) | ||
if (writable) { | ||
@@ -101,3 +85,2 @@ d._write = function (chunk, encoding, callback) { | ||
} | ||
d._final = function (callback) { | ||
@@ -107,3 +90,2 @@ head.end() | ||
} | ||
head.on('drain', function () { | ||
@@ -124,3 +106,2 @@ if (ondrain) { | ||
} | ||
if (readable) { | ||
@@ -137,7 +118,5 @@ tail.on('readable', function () { | ||
}) | ||
d._read = function () { | ||
while (true) { | ||
const buf = tail.read() | ||
if (buf === null) { | ||
@@ -147,3 +126,2 @@ onreadable = d._read | ||
} | ||
if (!d.push(buf)) { | ||
@@ -155,3 +133,2 @@ return | ||
} | ||
d._destroy = function (err, callback) { | ||
@@ -161,7 +138,5 @@ if (!err && onclose !== null) { | ||
} | ||
onreadable = null | ||
ondrain = null | ||
onfinish = null | ||
if (onclose === null) { | ||
@@ -174,4 +149,3 @@ callback(err) | ||
} | ||
return d | ||
} |
@@ -5,3 +5,4 @@ 'use strict' | ||
const process = require('process') | ||
const process = require('process/') | ||
/* replacement end */ | ||
@@ -14,10 +15,6 @@ | ||
} = require('../../ours/errors') | ||
const { Symbol } = require('../../ours/primordials') | ||
const { kDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils') | ||
const kDestroy = Symbol('kDestroy') | ||
const kConstruct = Symbol('kConstruct') | ||
function checkError(err, w, r) { | ||
@@ -31,3 +28,2 @@ if (err) { | ||
} | ||
if (r && !r.errored) { | ||
@@ -37,11 +33,11 @@ r.errored = err | ||
} | ||
} // Backwards compat. cb() is undocumented and unused in core but | ||
} | ||
// Backwards compat. cb() is undocumented and unused in core but | ||
// unfortunately might be used by modules. | ||
function destroy(err, cb) { | ||
const r = this._readableState | ||
const w = this._writableState // With duplex streams we use the writable side for state. | ||
const w = this._writableState | ||
// With duplex streams we use the writable side for state. | ||
const s = w || r | ||
if ((w && w.destroyed) || (r && r.destroyed)) { | ||
@@ -51,17 +47,16 @@ if (typeof cb === 'function') { | ||
} | ||
return this | ||
} | ||
return this | ||
} // We set destroyed to true before firing error callbacks in order | ||
// We set destroyed to true before firing error callbacks in order | ||
// to make it re-entrance safe in case destroy() is called within callbacks | ||
checkError(err, w, r) | ||
if (w) { | ||
w.destroyed = true | ||
} | ||
if (r) { | ||
r.destroyed = true | ||
} // If still constructing then defer calling _destroy. | ||
} | ||
// If still constructing then defer calling _destroy. | ||
if (!s.constructed) { | ||
@@ -74,9 +69,6 @@ this.once(kDestroy, function (er) { | ||
} | ||
return this | ||
} | ||
function _destroy(self, err, cb) { | ||
let called = false | ||
function onDestroy(err) { | ||
@@ -86,3 +78,2 @@ if (called) { | ||
} | ||
called = true | ||
@@ -92,15 +83,11 @@ const r = self._readableState | ||
checkError(err, w, r) | ||
if (w) { | ||
w.closed = true | ||
} | ||
if (r) { | ||
r.closed = true | ||
} | ||
if (typeof cb === 'function') { | ||
cb(err) | ||
} | ||
if (err) { | ||
@@ -112,3 +99,2 @@ process.nextTick(emitErrorCloseNT, self, err) | ||
} | ||
try { | ||
@@ -120,3 +106,2 @@ self._destroy(err || null, onDestroy) | ||
} | ||
function emitErrorCloseNT(self, err) { | ||
@@ -126,15 +111,11 @@ emitErrorNT(self, err) | ||
} | ||
function emitCloseNT(self) { | ||
const r = self._readableState | ||
const w = self._writableState | ||
if (w) { | ||
w.closeEmitted = true | ||
} | ||
if (r) { | ||
r.closeEmitted = true | ||
} | ||
if ((w && w.emitClose) || (r && r.emitClose)) { | ||
@@ -144,26 +125,19 @@ self.emit('close') | ||
} | ||
function emitErrorNT(self, err) { | ||
const r = self._readableState | ||
const w = self._writableState | ||
if ((w && w.errorEmitted) || (r && r.errorEmitted)) { | ||
return | ||
} | ||
if (w) { | ||
w.errorEmitted = true | ||
} | ||
if (r) { | ||
r.errorEmitted = true | ||
} | ||
self.emit('error', err) | ||
} | ||
function undestroy() { | ||
const r = this._readableState | ||
const w = this._writableState | ||
if (r) { | ||
@@ -180,3 +154,2 @@ r.constructed = true | ||
} | ||
if (w) { | ||
@@ -196,3 +169,2 @@ w.constructed = true | ||
} | ||
function errorOrDestroy(stream, err, sync) { | ||
@@ -204,9 +176,8 @@ // We have tests that rely on errors being emitted | ||
// semver major update we should change the default to this. | ||
const r = stream._readableState | ||
const w = stream._writableState | ||
if ((w && w.destroyed) || (r && r.destroyed)) { | ||
return this | ||
} | ||
if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err) | ||
@@ -220,7 +191,5 @@ else if (err) { | ||
} | ||
if (r && !r.errored) { | ||
r.errored = err | ||
} | ||
if (sync) { | ||
@@ -233,3 +202,2 @@ process.nextTick(emitErrorNT, stream, err) | ||
} | ||
function construct(stream, cb) { | ||
@@ -239,16 +207,11 @@ if (typeof stream._construct !== 'function') { | ||
} | ||
const r = stream._readableState | ||
const w = stream._writableState | ||
if (r) { | ||
r.constructed = false | ||
} | ||
if (w) { | ||
w.constructed = false | ||
} | ||
stream.once(kConstruct, cb) | ||
if (stream.listenerCount(kConstruct) > 1) { | ||
@@ -258,9 +221,6 @@ // Duplex | ||
} | ||
process.nextTick(constructNT, stream) | ||
} | ||
function constructNT(stream) { | ||
let called = false | ||
function onConstruct(err) { | ||
@@ -271,3 +231,2 @@ if (called) { | ||
} | ||
called = true | ||
@@ -277,11 +236,8 @@ const r = stream._readableState | ||
const s = w || r | ||
if (r) { | ||
r.constructed = true | ||
} | ||
if (w) { | ||
w.constructed = true | ||
} | ||
if (s.destroyed) { | ||
@@ -295,3 +251,2 @@ stream.emit(kDestroy, err) | ||
} | ||
try { | ||
@@ -303,20 +258,17 @@ stream._construct(onConstruct) | ||
} | ||
function emitConstructNT(stream) { | ||
stream.emit(kConstruct) | ||
} | ||
function isRequest(stream) { | ||
return stream && stream.setHeader && typeof stream.abort === 'function' | ||
} | ||
function emitCloseLegacy(stream) { | ||
stream.emit('close') | ||
} | ||
function emitErrorCloseLegacy(stream, err) { | ||
stream.emit('error', err) | ||
process.nextTick(emitCloseLegacy, stream) | ||
} // Normalize destroy for legacy. | ||
} | ||
// Normalize destroy for legacy. | ||
function destroyer(stream, err) { | ||
@@ -326,7 +278,7 @@ if (!stream || isDestroyed(stream)) { | ||
} | ||
if (!err && !isFinished(stream)) { | ||
err = new AbortError() | ||
} // TODO: Remove isRequest branches. | ||
} | ||
// TODO: Remove isRequest branches. | ||
if (isServerRequest(stream)) { | ||
@@ -349,3 +301,2 @@ stream.socket = null | ||
} | ||
if (!stream.destroyed) { | ||
@@ -355,3 +306,2 @@ stream[kDestroyed] = true | ||
} | ||
module.exports = { | ||
@@ -358,0 +308,0 @@ construct, |
@@ -21,2 +21,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a duplex stream is just a stream that is both readable and writable. | ||
@@ -26,2 +27,3 @@ // Since JS doesn't have multiple prototype inheritance, this class | ||
// Writable. | ||
'use strict' | ||
@@ -35,14 +37,10 @@ | ||
} = require('../../ours/primordials') | ||
module.exports = Duplex | ||
const Readable = require('./readable') | ||
const Writable = require('./writable') | ||
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype) | ||
ObjectSetPrototypeOf(Duplex, Readable) | ||
{ | ||
const keys = ObjectKeys(Writable.prototype) // Allow the keys array to be GC'ed. | ||
const keys = ObjectKeys(Writable.prototype) | ||
// Allow the keys array to be GC'ed. | ||
for (let i = 0; i < keys.length; i++) { | ||
@@ -53,3 +51,2 @@ const method = keys[i] | ||
} | ||
function Duplex(options) { | ||
@@ -59,6 +56,4 @@ if (!(this instanceof Duplex)) return new Duplex(options) | ||
Writable.call(this, options) | ||
if (options) { | ||
this.allowHalfOpen = options.allowHalfOpen !== false | ||
if (options.readable === false) { | ||
@@ -69,3 +64,2 @@ this._readableState.readable = false | ||
} | ||
if (options.writable === false) { | ||
@@ -81,3 +75,2 @@ this._writableState.writable = false | ||
} | ||
ObjectDefineProperties(Duplex.prototype, { | ||
@@ -122,3 +115,2 @@ writable: { | ||
__proto__: null, | ||
get() { | ||
@@ -128,6 +120,4 @@ if (this._readableState === undefined || this._writableState === undefined) { | ||
} | ||
return this._readableState.destroyed && this._writableState.destroyed | ||
}, | ||
set(value) { | ||
@@ -143,4 +133,5 @@ // Backward compatibility, the user is explicitly | ||
}) | ||
let webStreamsAdapters // Lazy to avoid circular references | ||
let webStreamsAdapters | ||
// Lazy to avoid circular references | ||
function lazyWebStreams() { | ||
@@ -150,13 +141,9 @@ if (webStreamsAdapters === undefined) webStreamsAdapters = {} | ||
} | ||
Duplex.fromWeb = function (pair, options) { | ||
return lazyWebStreams().newStreamDuplexFromReadableWritablePair(pair, options) | ||
} | ||
Duplex.toWeb = function (duplex) { | ||
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex) | ||
} | ||
let duplexify | ||
Duplex.from = function (body) { | ||
@@ -166,4 +153,3 @@ if (!duplexify) { | ||
} | ||
return duplexify(body, 'body') | ||
} |
/* replacement start */ | ||
const process = require('process') | ||
const process = require('process/') | ||
/* replacement end */ | ||
;('use strict') | ||
const bufferModule = require('buffer') | ||
const { | ||
@@ -18,5 +18,3 @@ isReadable, | ||
} = require('./utils') | ||
const eos = require('./end-of-stream') | ||
const { | ||
@@ -26,13 +24,7 @@ AbortError, | ||
} = require('../../ours/errors') | ||
const { destroyer } = require('./destroy') | ||
const Duplex = require('./duplex') | ||
const Readable = require('./readable') | ||
const { createDeferredPromise } = require('../../ours/util') | ||
const from = require('./from') | ||
const Blob = globalThis.Blob || bufferModule.Blob | ||
@@ -47,11 +39,12 @@ const isBlob = | ||
} | ||
const AbortController = globalThis.AbortController || require('abort-controller').AbortController | ||
const { FunctionPrototypeCall } = require('../../ours/primordials') | ||
const { FunctionPrototypeCall } = require('../../ours/primordials') // This is needed for pre node 17. | ||
// This is needed for pre node 17. | ||
class Duplexify extends Duplex { | ||
constructor(options) { | ||
super(options) // https://github.com/nodejs/node/pull/34385 | ||
super(options) | ||
// https://github.com/nodejs/node/pull/34385 | ||
if ((options === null || options === undefined ? undefined : options.readable) === false) { | ||
@@ -62,3 +55,2 @@ this._readableState.readable = false | ||
} | ||
if ((options === null || options === undefined ? undefined : options.writable) === false) { | ||
@@ -72,3 +64,2 @@ this._writableState.writable = false | ||
} | ||
module.exports = function duplexify(body, name) { | ||
@@ -78,3 +69,2 @@ if (isDuplexNodeStream(body)) { | ||
} | ||
if (isReadableNodeStream(body)) { | ||
@@ -85,3 +75,2 @@ return _duplexify({ | ||
} | ||
if (isWritableNodeStream(body)) { | ||
@@ -92,3 +81,2 @@ return _duplexify({ | ||
} | ||
if (isNodeStream(body)) { | ||
@@ -99,6 +87,9 @@ return _duplexify({ | ||
}) | ||
} // TODO: Webstreams | ||
} | ||
// TODO: Webstreams | ||
// if (isReadableStream(body)) { | ||
// return _duplexify({ readable: Readable.fromWeb(body) }); | ||
// } | ||
// TODO: Webstreams | ||
@@ -111,3 +102,2 @@ // if (isWritableStream(body)) { | ||
const { value, write, final, destroy } = fromAsyncGen(body) | ||
if (isIterable(value)) { | ||
@@ -122,5 +112,3 @@ return from(Duplexify, value, { | ||
} | ||
const then = value === null || value === undefined ? undefined : value.then | ||
if (typeof then === 'function') { | ||
@@ -145,3 +133,2 @@ let d | ||
write, | ||
final(cb) { | ||
@@ -157,14 +144,10 @@ final(async () => { | ||
}, | ||
destroy | ||
})) | ||
} | ||
throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or AsyncFunction', name, value) | ||
} | ||
if (isBlob(body)) { | ||
return duplexify(body.arrayBuffer()) | ||
} | ||
if (isIterable(body)) { | ||
@@ -176,3 +159,5 @@ return from(Duplexify, body, { | ||
}) | ||
} // TODO: Webstreams. | ||
} | ||
// TODO: Webstreams. | ||
// if ( | ||
@@ -210,5 +195,3 @@ // isReadableStream(body?.readable) && | ||
} | ||
const then = body === null || body === undefined ? undefined : body.then | ||
if (typeof then === 'function') { | ||
@@ -223,3 +206,2 @@ let d | ||
} | ||
d.push(null) | ||
@@ -234,7 +216,5 @@ }, | ||
writable: false, | ||
read() {} | ||
})) | ||
} | ||
throw new ERR_INVALID_ARG_TYPE( | ||
@@ -256,3 +236,2 @@ name, | ||
} | ||
function fromAsyncGen(fn) { | ||
@@ -284,7 +263,5 @@ let { promise, resolve } = createDeferredPromise() | ||
value, | ||
write(chunk, encoding, cb) { | ||
const _resolve = resolve | ||
resolve = null | ||
_resolve({ | ||
@@ -296,7 +273,5 @@ chunk, | ||
}, | ||
final(cb) { | ||
const _resolve = resolve | ||
resolve = null | ||
_resolve({ | ||
@@ -307,3 +282,2 @@ done: true, | ||
}, | ||
destroy(err, cb) { | ||
@@ -315,3 +289,2 @@ ac.abort() | ||
} | ||
function _duplexify(pair) { | ||
@@ -327,7 +300,5 @@ const r = pair.readable && typeof pair.readable.read !== 'function' ? Readable.wrap(pair.readable) : pair.readable | ||
let d | ||
function onfinished(err) { | ||
const cb = onclose | ||
onclose = null | ||
if (cb) { | ||
@@ -340,6 +311,7 @@ cb(err) | ||
} | ||
} // TODO(ronag): Avoid double buffering. | ||
} | ||
// TODO(ronag): Avoid double buffering. | ||
// Implement Writable/Readable/Duplex traits. | ||
// See, https://github.com/nodejs/node/pull/33515. | ||
d = new Duplexify({ | ||
@@ -352,14 +324,10 @@ // TODO (ronag): highWaterMark? | ||
}) | ||
if (writable) { | ||
eos(w, (err) => { | ||
writable = false | ||
if (err) { | ||
destroyer(r, err) | ||
} | ||
onfinished(err) | ||
}) | ||
d._write = function (chunk, encoding, callback) { | ||
@@ -372,3 +340,2 @@ if (w.write(chunk, encoding)) { | ||
} | ||
d._final = function (callback) { | ||
@@ -378,3 +345,2 @@ w.end() | ||
} | ||
w.on('drain', function () { | ||
@@ -395,11 +361,8 @@ if (ondrain) { | ||
} | ||
if (readable) { | ||
eos(r, (err) => { | ||
readable = false | ||
if (err) { | ||
destroyer(r, err) | ||
} | ||
onfinished(err) | ||
@@ -417,7 +380,5 @@ }) | ||
}) | ||
d._read = function () { | ||
while (true) { | ||
const buf = r.read() | ||
if (buf === null) { | ||
@@ -427,3 +388,2 @@ onreadable = d._read | ||
} | ||
if (!d.push(buf)) { | ||
@@ -435,3 +395,2 @@ return | ||
} | ||
d._destroy = function (err, callback) { | ||
@@ -441,7 +400,5 @@ if (!err && onclose !== null) { | ||
} | ||
onreadable = null | ||
ondrain = null | ||
onfinish = null | ||
if (onclose === null) { | ||
@@ -455,4 +412,3 @@ callback(err) | ||
} | ||
return d | ||
} |
/* replacement start */ | ||
const process = require('process') | ||
const process = require('process/') | ||
/* replacement end */ | ||
@@ -8,13 +10,7 @@ // Ported from https://github.com/mafintosh/end-of-stream with | ||
;('use strict') | ||
const { AbortError, codes } = require('../../ours/errors') | ||
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes | ||
const { kEmptyObject, once } = require('../../ours/util') | ||
const { validateAbortSignal, validateFunction, validateObject } = require('../validators') | ||
const { Promise } = require('../../ours/primordials') | ||
const { | ||
@@ -33,12 +29,8 @@ isClosed, | ||
} = require('./utils') | ||
function isRequest(stream) { | ||
return stream.setHeader && typeof stream.abort === 'function' | ||
} | ||
const nop = () => {} | ||
function eos(stream, options, callback) { | ||
var _options$readable, _options$writable | ||
if (arguments.length === 2) { | ||
@@ -52,3 +44,2 @@ callback = options | ||
} | ||
validateFunction(callback, 'callback') | ||
@@ -65,3 +56,2 @@ validateAbortSignal(options.signal, 'options.signal') | ||
: isWritableNodeStream(stream) | ||
if (!isNodeStream(stream)) { | ||
@@ -71,6 +61,4 @@ // TODO: Webstreams. | ||
} | ||
const wState = stream._writableState | ||
const rState = stream._readableState | ||
const onlegacyfinish = () => { | ||
@@ -80,23 +68,21 @@ if (!stream.writable) { | ||
} | ||
} // TODO (ronag): Improve soft detection to include core modules and | ||
} | ||
// TODO (ronag): Improve soft detection to include core modules and | ||
// common ecosystem modules that do properly emit 'close' but fail | ||
// this generic check. | ||
let willEmitClose = | ||
_willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable | ||
let writableFinished = isWritableFinished(stream, false) | ||
const onfinish = () => { | ||
writableFinished = true // Stream should not be destroyed here. If it is that | ||
writableFinished = true | ||
// Stream should not be destroyed here. If it is that | ||
// means that user space is doing something differently and | ||
// we cannot trust willEmitClose. | ||
if (stream.destroyed) { | ||
willEmitClose = false | ||
} | ||
if (willEmitClose && (!stream.readable || readable)) { | ||
return | ||
} | ||
if (!readable || readableFinished) { | ||
@@ -106,18 +92,14 @@ callback.call(stream) | ||
} | ||
let readableFinished = isReadableFinished(stream, false) | ||
const onend = () => { | ||
readableFinished = true // Stream should not be destroyed here. If it is that | ||
readableFinished = true | ||
// Stream should not be destroyed here. If it is that | ||
// means that user space is doing something differently and | ||
// we cannot trust willEmitClose. | ||
if (stream.destroyed) { | ||
willEmitClose = false | ||
} | ||
if (willEmitClose && (!stream.writable || writable)) { | ||
return | ||
} | ||
if (!writable || writableFinished) { | ||
@@ -127,39 +109,28 @@ callback.call(stream) | ||
} | ||
const onerror = (err) => { | ||
callback.call(stream, err) | ||
} | ||
let closed = isClosed(stream) | ||
const onclose = () => { | ||
closed = true | ||
const errored = isWritableErrored(stream) || isReadableErrored(stream) | ||
if (errored && typeof errored !== 'boolean') { | ||
return callback.call(stream, errored) | ||
} | ||
if (readable && !readableFinished && isReadableNodeStream(stream, true)) { | ||
if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()) | ||
} | ||
if (writable && !writableFinished) { | ||
if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()) | ||
} | ||
callback.call(stream) | ||
} | ||
const onrequest = () => { | ||
stream.req.on('finish', onfinish) | ||
} | ||
if (isRequest(stream)) { | ||
stream.on('complete', onfinish) | ||
if (!willEmitClose) { | ||
stream.on('abort', onclose) | ||
} | ||
if (stream.req) { | ||
@@ -174,17 +145,14 @@ onrequest() | ||
stream.on('close', onlegacyfinish) | ||
} // Not all streams will emit 'close' after 'aborted'. | ||
} | ||
// Not all streams will emit 'close' after 'aborted'. | ||
if (!willEmitClose && typeof stream.aborted === 'boolean') { | ||
stream.on('aborted', onclose) | ||
} | ||
stream.on('end', onend) | ||
stream.on('finish', onfinish) | ||
if (options.error !== false) { | ||
stream.on('error', onerror) | ||
} | ||
stream.on('close', onclose) | ||
if (closed) { | ||
@@ -214,3 +182,2 @@ process.nextTick(onclose) | ||
} | ||
const cleanup = () => { | ||
@@ -230,3 +197,2 @@ callback = nop | ||
} | ||
if (options.signal && !closed) { | ||
@@ -244,3 +210,2 @@ const abort = () => { | ||
} | ||
if (options.signal.aborted) { | ||
@@ -257,6 +222,4 @@ process.nextTick(abort) | ||
} | ||
return cleanup | ||
} | ||
function finished(stream, opts) { | ||
@@ -273,4 +236,3 @@ return new Promise((resolve, reject) => { | ||
} | ||
module.exports = eos | ||
module.exports.finished = finished |
@@ -5,14 +5,11 @@ 'use strict' | ||
const process = require('process') | ||
const process = require('process/') | ||
/* replacement end */ | ||
const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials') | ||
const { Buffer } = require('buffer') | ||
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes | ||
function from(Readable, iterable, opts) { | ||
let iterator | ||
if (typeof iterable === 'string' || iterable instanceof Buffer) { | ||
@@ -22,3 +19,2 @@ return new Readable({ | ||
...opts, | ||
read() { | ||
@@ -30,5 +26,3 @@ this.push(iterable) | ||
} | ||
let isAsync | ||
if (iterable && iterable[SymbolAsyncIterator]) { | ||
@@ -43,3 +37,2 @@ isAsync = true | ||
} | ||
const readable = new Readable({ | ||
@@ -50,7 +43,7 @@ objectMode: true, | ||
...opts | ||
}) // Flag to protect against _read | ||
}) | ||
// Flag to protect against _read | ||
// being called before last iteration completion. | ||
let reading = false | ||
readable._read = function () { | ||
@@ -62,19 +55,16 @@ if (!reading) { | ||
} | ||
readable._destroy = function (error, cb) { | ||
PromisePrototypeThen( | ||
close(error), | ||
() => process.nextTick(cb, error), // nextTick is here in case cb throws | ||
() => process.nextTick(cb, error), | ||
// nextTick is here in case cb throws | ||
(e) => process.nextTick(cb, e || error) | ||
) | ||
} | ||
async function close(error) { | ||
const hadError = error !== undefined && error !== null | ||
const hasThrow = typeof iterator.throw === 'function' | ||
if (hadError && hasThrow) { | ||
const { value, done } = await iterator.throw(error) | ||
await value | ||
if (done) { | ||
@@ -84,3 +74,2 @@ return | ||
} | ||
if (typeof iterator.return === 'function') { | ||
@@ -91,3 +80,2 @@ const { value } = await iterator.return() | ||
} | ||
async function next() { | ||
@@ -97,3 +85,2 @@ for (;;) { | ||
const { value, done } = isAsync ? await iterator.next() : iterator.next() | ||
if (done) { | ||
@@ -103,3 +90,2 @@ readable.push(null) | ||
const res = value && typeof value.then === 'function' ? await value : value | ||
if (res === null) { | ||
@@ -117,10 +103,7 @@ reading = false | ||
} | ||
break | ||
} | ||
} | ||
return readable | ||
} | ||
module.exports = from |
@@ -7,16 +7,10 @@ // LazyTransform is a special type of Transform stream that is lazily loaded. | ||
const { ObjectDefineProperties, ObjectDefineProperty, ObjectSetPrototypeOf } = require('../../ours/primordials') | ||
const stream = require('../../stream') | ||
const { getDefaultEncoding } = require('../crypto/util') | ||
module.exports = LazyTransform | ||
function LazyTransform(options) { | ||
this._options = options | ||
} | ||
ObjectSetPrototypeOf(LazyTransform.prototype, stream.Transform.prototype) | ||
ObjectSetPrototypeOf(LazyTransform, stream.Transform) | ||
function makeGetter(name) { | ||
@@ -26,11 +20,8 @@ return function () { | ||
this._writableState.decodeStrings = false | ||
if (!this._options || !this._options.defaultEncoding) { | ||
this._writableState.defaultEncoding = getDefaultEncoding() | ||
} | ||
return this[name] | ||
} | ||
} | ||
function makeSetter(name) { | ||
@@ -47,3 +38,2 @@ return function (val) { | ||
} | ||
ObjectDefineProperties(LazyTransform.prototype, { | ||
@@ -50,0 +40,0 @@ _readableState: { |
'use strict' | ||
const { ArrayIsArray, ObjectSetPrototypeOf } = require('../../ours/primordials') | ||
const { EventEmitter: EE } = require('events') | ||
function Stream(opts) { | ||
EE.call(this, opts) | ||
} | ||
ObjectSetPrototypeOf(Stream.prototype, EE.prototype) | ||
ObjectSetPrototypeOf(Stream, EE) | ||
Stream.prototype.pipe = function (dest, options) { | ||
const source = this | ||
function ondata(chunk) { | ||
@@ -22,5 +17,3 @@ if (dest.writable && dest.write(chunk) === false && source.pause) { | ||
} | ||
source.on('data', ondata) | ||
function ondrain() { | ||
@@ -31,6 +24,6 @@ if (source.readable && source.resume) { | ||
} | ||
dest.on('drain', ondrain) | ||
dest.on('drain', ondrain) // If the 'end' option is not supplied, dest.end() will be called when | ||
// If the 'end' option is not supplied, dest.end() will be called when | ||
// source gets the 'end' or 'close' events. Only dest.end() once. | ||
if (!dest._isStdio && (!options || options.end !== false)) { | ||
@@ -40,5 +33,3 @@ source.on('end', onend) | ||
} | ||
let didOnEnd = false | ||
function onend() { | ||
@@ -49,3 +40,2 @@ if (didOnEnd) return | ||
} | ||
function onclose() { | ||
@@ -55,7 +45,7 @@ if (didOnEnd) return | ||
if (typeof dest.destroy === 'function') dest.destroy() | ||
} // Don't leave dangling pipes when there are errors. | ||
} | ||
// Don't leave dangling pipes when there are errors. | ||
function onerror(er) { | ||
cleanup() | ||
if (EE.listenerCount(this, 'error') === 0) { | ||
@@ -65,6 +55,6 @@ this.emit('error', er) | ||
} | ||
prependListener(source, 'error', onerror) | ||
prependListener(dest, 'error', onerror) // Remove all the event listeners that were added. | ||
prependListener(dest, 'error', onerror) | ||
// Remove all the event listeners that were added. | ||
function cleanup() { | ||
@@ -81,19 +71,19 @@ source.removeListener('data', ondata) | ||
} | ||
source.on('end', cleanup) | ||
source.on('close', cleanup) | ||
dest.on('close', cleanup) | ||
dest.emit('pipe', source) // Allow for unix-like usage: A.pipe(B).pipe(C) | ||
dest.emit('pipe', source) | ||
// Allow for unix-like usage: A.pipe(B).pipe(C) | ||
return dest | ||
} | ||
function prependListener(emitter, event, fn) { | ||
// Sadly this is not cacheable as some libraries bundle their own | ||
// event emitter implementation with them. | ||
if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn) // This is a hack to make sure that our error handler is attached before any | ||
if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn) | ||
// This is a hack to make sure that our error handler is attached before any | ||
// userland ones. NEVER DO THIS. This is here only because this code needs | ||
// to continue to work with older versions of Node.js that do not include | ||
// the prependListener() method. The goal is to eventually remove this hack. | ||
if (!emitter._events || !emitter._events[event]) emitter.on(event, fn) | ||
@@ -103,3 +93,2 @@ else if (ArrayIsArray(emitter._events[event])) emitter._events[event].unshift(fn) | ||
} | ||
module.exports = { | ||
@@ -106,0 +95,0 @@ Stream, |
'use strict' | ||
const AbortController = globalThis.AbortController || require('abort-controller').AbortController | ||
const { | ||
@@ -9,9 +8,5 @@ codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, | ||
} = require('../../ours/errors') | ||
const { validateAbortSignal, validateInteger, validateObject } = require('../validators') | ||
const kWeakHandler = require('../../ours/primordials').Symbol('kWeak') | ||
const { finished } = require('./end-of-stream') | ||
const { | ||
@@ -27,6 +22,4 @@ ArrayPrototypePush, | ||
} = require('../../ours/primordials') | ||
const kEmpty = Symbol('kEmpty') | ||
const kEof = Symbol('kEof') | ||
function map(fn, options) { | ||
@@ -36,21 +29,15 @@ if (typeof fn !== 'function') { | ||
} | ||
if (options != null) { | ||
validateObject(options, 'options') | ||
} | ||
if ((options === null || options === undefined ? undefined : options.signal) != null) { | ||
validateAbortSignal(options.signal, 'options.signal') | ||
} | ||
let concurrency = 1 | ||
if ((options === null || options === undefined ? undefined : options.concurrency) != null) { | ||
concurrency = MathFloor(options.concurrency) | ||
} | ||
validateInteger(concurrency, 'concurrency', 1) | ||
return async function* map() { | ||
var _options$signal, _options$signal2 | ||
const ac = new AbortController() | ||
@@ -63,5 +50,3 @@ const stream = this | ||
} | ||
const abort = () => ac.abort() | ||
if ( | ||
@@ -76,3 +61,2 @@ options !== null && | ||
} | ||
options === null || options === undefined | ||
@@ -86,7 +70,5 @@ ? undefined | ||
let done = false | ||
function onDone() { | ||
done = true | ||
} | ||
async function pump() { | ||
@@ -96,11 +78,8 @@ try { | ||
var _val | ||
if (done) { | ||
return | ||
} | ||
if (signal.aborted) { | ||
throw new AbortError() | ||
} | ||
try { | ||
@@ -111,13 +90,9 @@ val = fn(val, signalOpt) | ||
} | ||
if (val === kEmpty) { | ||
continue | ||
} | ||
if (typeof ((_val = val) === null || _val === undefined ? undefined : _val.catch) === 'function') { | ||
val.catch(onDone) | ||
} | ||
queue.push(val) | ||
if (next) { | ||
@@ -127,3 +102,2 @@ next() | ||
} | ||
if (!done && queue.length && queue.length >= concurrency) { | ||
@@ -135,3 +109,2 @@ await new Promise((resolve) => { | ||
} | ||
queue.push(kEof) | ||
@@ -144,5 +117,3 @@ } catch (err) { | ||
var _options$signal3 | ||
done = true | ||
if (next) { | ||
@@ -152,3 +123,2 @@ next() | ||
} | ||
options === null || options === undefined | ||
@@ -161,5 +131,3 @@ ? undefined | ||
} | ||
pump() | ||
try { | ||
@@ -169,17 +137,12 @@ while (true) { | ||
const val = await queue[0] | ||
if (val === kEof) { | ||
return | ||
} | ||
if (signal.aborted) { | ||
throw new AbortError() | ||
} | ||
if (val !== kEmpty) { | ||
yield val | ||
} | ||
queue.shift() | ||
if (resume) { | ||
@@ -190,3 +153,2 @@ resume() | ||
} | ||
await new Promise((resolve) => { | ||
@@ -199,3 +161,2 @@ next = resolve | ||
done = true | ||
if (resume) { | ||
@@ -208,3 +169,2 @@ resume() | ||
} | ||
function asIndexedPairs(options = undefined) { | ||
@@ -214,13 +174,9 @@ if (options != null) { | ||
} | ||
if ((options === null || options === undefined ? undefined : options.signal) != null) { | ||
validateAbortSignal(options.signal, 'options.signal') | ||
} | ||
return async function* asIndexedPairs() { | ||
let index = 0 | ||
for await (const val of this) { | ||
var _options$signal4 | ||
if ( | ||
@@ -237,3 +193,2 @@ options !== null && | ||
} | ||
yield [index++, val] | ||
@@ -243,3 +198,2 @@ } | ||
} | ||
async function some(fn, options = undefined) { | ||
@@ -249,11 +203,9 @@ for await (const unused of filter.call(this, fn, options)) { | ||
} | ||
return false | ||
} | ||
async function every(fn, options = undefined) { | ||
if (typeof fn !== 'function') { | ||
throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn) | ||
} // https://en.wikipedia.org/wiki/De_Morgan%27s_laws | ||
} | ||
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws | ||
return !(await some.call( | ||
@@ -267,3 +219,2 @@ this, | ||
} | ||
async function find(fn, options) { | ||
@@ -273,6 +224,4 @@ for await (const result of filter.call(this, fn, options)) { | ||
} | ||
return undefined | ||
} | ||
async function forEach(fn, options) { | ||
@@ -282,11 +231,9 @@ if (typeof fn !== 'function') { | ||
} | ||
async function forEachFn(value, options) { | ||
await fn(value, options) | ||
return kEmpty | ||
} // eslint-disable-next-line no-unused-vars | ||
} | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const unused of map.call(this, forEachFn, options)); | ||
} | ||
function filter(fn, options) { | ||
@@ -296,3 +243,2 @@ if (typeof fn !== 'function') { | ||
} | ||
async function filterFn(value, options) { | ||
@@ -302,10 +248,9 @@ if (await fn(value, options)) { | ||
} | ||
return kEmpty | ||
} | ||
return map.call(this, filterFn, options) | ||
} | ||
return map.call(this, filterFn, options) | ||
} // Specific to provide better error to reduce since the argument is only | ||
// Specific to provide better error to reduce since the argument is only | ||
// missing if the stream has no items in it - but the code is still appropriate | ||
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS { | ||
@@ -317,20 +262,14 @@ constructor() { | ||
} | ||
async function reduce(reducer, initialValue, options) { | ||
var _options$signal5 | ||
if (typeof reducer !== 'function') { | ||
throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer) | ||
} | ||
if (options != null) { | ||
validateObject(options, 'options') | ||
} | ||
if ((options === null || options === undefined ? undefined : options.signal) != null) { | ||
validateAbortSignal(options.signal, 'options.signal') | ||
} | ||
let hasInitialValue = arguments.length > 1 | ||
if ( | ||
@@ -347,10 +286,7 @@ options !== null && | ||
this.once('error', () => {}) // The error is already propagated | ||
await finished(this.destroy(err)) | ||
throw err | ||
} | ||
const ac = new AbortController() | ||
const signal = ac.signal | ||
if (options !== null && options !== undefined && options.signal) { | ||
@@ -363,11 +299,7 @@ const opts = { | ||
} | ||
let gotAnyItemFromStream = false | ||
try { | ||
for await (const value of this) { | ||
var _options$signal6 | ||
gotAnyItemFromStream = true | ||
if ( | ||
@@ -382,3 +314,2 @@ options !== null && | ||
} | ||
if (!hasInitialValue) { | ||
@@ -393,3 +324,2 @@ initialValue = value | ||
} | ||
if (!gotAnyItemFromStream && !hasInitialValue) { | ||
@@ -401,6 +331,4 @@ throw new ReduceAwareErrMissingArgs() | ||
} | ||
return initialValue | ||
} | ||
async function toArray(options) { | ||
@@ -410,12 +338,8 @@ if (options != null) { | ||
} | ||
if ((options === null || options === undefined ? undefined : options.signal) != null) { | ||
validateAbortSignal(options.signal, 'options.signal') | ||
} | ||
const result = [] | ||
for await (const val of this) { | ||
var _options$signal7 | ||
if ( | ||
@@ -432,9 +356,6 @@ options !== null && | ||
} | ||
ArrayPrototypePush(result, val) | ||
} | ||
return result | ||
} | ||
function flatMap(fn, options) { | ||
@@ -448,3 +369,2 @@ const values = map.call(this, fn, options) | ||
} | ||
function toIntegerOrInfinity(number) { | ||
@@ -454,14 +374,10 @@ // We coerce here to align with the spec | ||
number = Number(number) | ||
if (NumberIsNaN(number)) { | ||
return 0 | ||
} | ||
if (number < 0) { | ||
throw new ERR_OUT_OF_RANGE('number', '>= 0', number) | ||
} | ||
return number | ||
} | ||
function drop(number, options = undefined) { | ||
@@ -471,11 +387,8 @@ if (options != null) { | ||
} | ||
if ((options === null || options === undefined ? undefined : options.signal) != null) { | ||
validateAbortSignal(options.signal, 'options.signal') | ||
} | ||
number = toIntegerOrInfinity(number) | ||
return async function* drop() { | ||
var _options$signal8 | ||
if ( | ||
@@ -490,6 +403,4 @@ options !== null && | ||
} | ||
for await (const val of this) { | ||
var _options$signal9 | ||
if ( | ||
@@ -504,3 +415,2 @@ options !== null && | ||
} | ||
if (number-- <= 0) { | ||
@@ -512,3 +422,2 @@ yield val | ||
} | ||
function take(number, options = undefined) { | ||
@@ -518,11 +427,8 @@ if (options != null) { | ||
} | ||
if ((options === null || options === undefined ? undefined : options.signal) != null) { | ||
validateAbortSignal(options.signal, 'options.signal') | ||
} | ||
number = toIntegerOrInfinity(number) | ||
return async function* take() { | ||
var _options$signal10 | ||
if ( | ||
@@ -537,6 +443,4 @@ options !== null && | ||
} | ||
for await (const val of this) { | ||
var _options$signal11 | ||
if ( | ||
@@ -551,3 +455,2 @@ options !== null && | ||
} | ||
if (number-- > 0) { | ||
@@ -561,3 +464,2 @@ yield val | ||
} | ||
module.exports.streamReturningOperators = { | ||
@@ -564,0 +466,0 @@ asIndexedPairs, |
@@ -21,16 +21,14 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a passthrough stream. | ||
// basically just the most minimal sort of Transform stream. | ||
// Every written chunk gets output as-is. | ||
'use strict' | ||
const { ObjectSetPrototypeOf } = require('../../ours/primordials') | ||
module.exports = PassThrough | ||
const Transform = require('./transform') | ||
ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype) | ||
ObjectSetPrototypeOf(PassThrough, Transform) | ||
function PassThrough(options) { | ||
@@ -40,5 +38,4 @@ if (!(this instanceof PassThrough)) return new PassThrough(options) | ||
} | ||
PassThrough.prototype._transform = function (chunk, encoding, cb) { | ||
cb(null, chunk) | ||
} |
/* replacement start */ | ||
const process = require('process') | ||
const process = require('process/') | ||
/* replacement end */ | ||
@@ -8,13 +10,7 @@ // Ported from https://github.com/mafintosh/pump with | ||
;('use strict') | ||
const { ArrayIsArray, Promise, SymbolAsyncIterator } = require('../../ours/primordials') | ||
const eos = require('./end-of-stream') | ||
const { once } = require('../../ours/util') | ||
const destroyImpl = require('./destroy') | ||
const Duplex = require('./duplex') | ||
const { | ||
@@ -31,12 +27,7 @@ aggregateTwoErrors, | ||
} = require('../../ours/errors') | ||
const { validateFunction, validateAbortSignal } = require('../validators') | ||
const { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require('./utils') | ||
const AbortController = globalThis.AbortController || require('abort-controller').AbortController | ||
let PassThrough | ||
let Readable | ||
function destroyer(stream, reading, writing) { | ||
@@ -66,3 +57,2 @@ let finished = false | ||
} | ||
function popCallback(streams) { | ||
@@ -75,3 +65,2 @@ // Streams should never be an empty array. It should always contain at least | ||
} | ||
function makeAsyncIterable(val) { | ||
@@ -84,6 +73,4 @@ if (isIterable(val)) { | ||
} | ||
throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], val) | ||
} | ||
async function* fromReadable(val) { | ||
@@ -93,10 +80,7 @@ if (!Readable) { | ||
} | ||
yield* Readable.prototype[SymbolAsyncIterator].call(val) | ||
} | ||
async function pump(iterable, writable, finish, { end }) { | ||
let error | ||
let onresolve = null | ||
const resume = (err) => { | ||
@@ -106,3 +90,2 @@ if (err) { | ||
} | ||
if (onresolve) { | ||
@@ -114,3 +97,2 @@ const callback = onresolve | ||
} | ||
const wait = () => | ||
@@ -130,3 +112,2 @@ new Promise((resolve, reject) => { | ||
}) | ||
writable.on('drain', resume) | ||
@@ -140,3 +121,2 @@ const cleanup = eos( | ||
) | ||
try { | ||
@@ -146,3 +126,2 @@ if (writable.writableNeedDrain) { | ||
} | ||
for await (const chunk of iterable) { | ||
@@ -153,7 +132,5 @@ if (!writable.write(chunk)) { | ||
} | ||
if (end) { | ||
writable.end() | ||
} | ||
await wait() | ||
@@ -168,7 +145,5 @@ finish() | ||
} | ||
function pipeline(...streams) { | ||
return pipelineImpl(streams, once(popCallback(streams))) | ||
} | ||
function pipelineImpl(streams, callback, opts) { | ||
@@ -178,19 +153,16 @@ if (streams.length === 1 && ArrayIsArray(streams[0])) { | ||
} | ||
if (streams.length < 2) { | ||
throw new ERR_MISSING_ARGS('streams') | ||
} | ||
const ac = new AbortController() | ||
const signal = ac.signal | ||
const outerSignal = opts === null || opts === undefined ? undefined : opts.signal // Need to cleanup event listeners if last stream is readable | ||
const outerSignal = opts === null || opts === undefined ? undefined : opts.signal | ||
// Need to cleanup event listeners if last stream is readable | ||
// https://github.com/nodejs/node/issues/35452 | ||
const lastStreamCleanup = [] | ||
validateAbortSignal(outerSignal, 'options.signal') | ||
function abort() { | ||
finishImpl(new AbortError()) | ||
} | ||
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.addEventListener('abort', abort) | ||
@@ -201,7 +173,5 @@ let error | ||
let finishCount = 0 | ||
function finish(err) { | ||
finishImpl(err, --finishCount === 0) | ||
} | ||
function finishImpl(err, final) { | ||
@@ -211,14 +181,10 @@ if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { | ||
} | ||
if (!error && !final) { | ||
return | ||
} | ||
while (destroys.length) { | ||
destroys.shift()(error) | ||
} | ||
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.removeEventListener('abort', abort) | ||
ac.abort() | ||
if (final) { | ||
@@ -228,9 +194,6 @@ if (!error) { | ||
} | ||
process.nextTick(callback, error, value) | ||
} | ||
} | ||
let ret | ||
for (let i = 0; i < streams.length; i++) { | ||
@@ -242,3 +205,2 @@ const stream = streams[i] | ||
const isLastStream = i === streams.length - 1 | ||
if (isNodeStream(stream)) { | ||
@@ -248,8 +210,8 @@ if (end) { | ||
destroys.push(destroy) | ||
if (isReadable(stream) && isLastStream) { | ||
lastStreamCleanup.push(cleanup) | ||
} | ||
} // Catch stream errors that occur after pipe/pump has completed. | ||
} | ||
// Catch stream errors that occur after pipe/pump has completed. | ||
function onError(err) { | ||
@@ -260,5 +222,3 @@ if (err && err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { | ||
} | ||
stream.on('error', onError) | ||
if (isReadable(stream) && isLastStream) { | ||
@@ -270,3 +230,2 @@ lastStreamCleanup.push(() => { | ||
} | ||
if (i === 0) { | ||
@@ -277,3 +236,2 @@ if (typeof stream === 'function') { | ||
}) | ||
if (!isIterable(ret)) { | ||
@@ -292,3 +250,2 @@ throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or Stream', 'source', ret) | ||
}) | ||
if (reading) { | ||
@@ -300,6 +257,7 @@ if (!isIterable(ret, true)) { | ||
var _ret | ||
if (!PassThrough) { | ||
PassThrough = require('./passthrough') | ||
} // If the last argument to pipeline is not a stream | ||
} | ||
// If the last argument to pipeline is not a stream | ||
// we must create a proxy stream so that pipeline(...) | ||
@@ -311,7 +269,7 @@ // always returns a stream which can be further | ||
objectMode: true | ||
}) // Handle Promises/A+ spec, `then` could be a getter that throws on | ||
}) | ||
// Handle Promises/A+ spec, `then` could be a getter that throws on | ||
// second use. | ||
const then = (_ret = ret) === null || _ret === undefined ? undefined : _ret.then | ||
if (typeof then === 'function') { | ||
@@ -323,11 +281,8 @@ finishCount++ | ||
value = val | ||
if (val != null) { | ||
pt.write(val) | ||
} | ||
if (end) { | ||
pt.end() | ||
} | ||
process.nextTick(finish) | ||
@@ -348,7 +303,5 @@ }, | ||
} | ||
ret = pt | ||
const { destroy, cleanup } = destroyer(ret, false, true) | ||
destroys.push(destroy) | ||
if (isLastStream) { | ||
@@ -364,3 +317,2 @@ lastStreamCleanup.push(cleanup) | ||
}) | ||
if (isReadable(stream) && isLastStream) { | ||
@@ -377,3 +329,2 @@ lastStreamCleanup.push(cleanup) | ||
} | ||
ret = stream | ||
@@ -384,3 +335,2 @@ } else { | ||
} | ||
if ( | ||
@@ -392,6 +342,4 @@ (signal !== null && signal !== undefined && signal.aborted) || | ||
} | ||
return ret | ||
} | ||
function pipe(src, dst, finish, { end }) { | ||
@@ -408,3 +356,2 @@ let ended = false | ||
}) | ||
if (end) { | ||
@@ -421,3 +368,2 @@ // Compat. Before node v10.12.0 stdio used to throw an error so | ||
} | ||
eos( | ||
@@ -431,3 +377,2 @@ src, | ||
const rState = src._readableState | ||
if ( | ||
@@ -464,3 +409,2 @@ err && | ||
} | ||
module.exports = { | ||
@@ -467,0 +411,0 @@ pipelineImpl, |
/* replacement start */ | ||
const process = require('process') | ||
const process = require('process/') | ||
/* replacement end */ | ||
@@ -26,3 +28,2 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
;('use strict') | ||
const { | ||
@@ -41,26 +42,15 @@ ArrayPrototypeIndexOf, | ||
} = require('../../ours/primordials') | ||
module.exports = Readable | ||
Readable.ReadableState = ReadableState | ||
const { EventEmitter: EE } = require('events') | ||
const { Stream, prependListener } = require('./legacy') | ||
const { Buffer } = require('buffer') | ||
const { addAbortSignal } = require('./add-abort-signal') | ||
const eos = require('./end-of-stream') | ||
let debug = require('../../ours/util').debuglog('stream', (fn) => { | ||
debug = fn | ||
}) | ||
const BufferList = require('./buffer_list') | ||
const destroyImpl = require('./destroy') | ||
const { getHighWaterMark, getDefaultHighWaterMark } = require('./state') | ||
const { | ||
@@ -76,18 +66,10 @@ aggregateTwoErrors, | ||
} = require('../../ours/errors') | ||
const { validateObject } = require('../validators') | ||
const kPaused = Symbol('kPaused') | ||
const { StringDecoder } = require('string_decoder') | ||
const from = require('./from') | ||
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype) | ||
ObjectSetPrototypeOf(Readable, Stream) | ||
const nop = () => {} | ||
const { errorOrDestroy } = destroyImpl | ||
function ReadableState(options, stream, isDuplex) { | ||
@@ -99,15 +81,18 @@ // Duplex streams are both readable and writable, but share | ||
// These options can be provided separately as readableXXX and writableXXX. | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex') // Object stream flag. Used to make read(n) ignore n and to | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex') | ||
// Object stream flag. Used to make read(n) ignore n and to | ||
// make all the buffer merging and length checks go away. | ||
this.objectMode = !!(options && options.objectMode) | ||
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode) | ||
this.objectMode = !!(options && options.objectMode) | ||
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode) // The point at which it stops calling _read() to fill the buffer | ||
// The point at which it stops calling _read() to fill the buffer | ||
// Note: 0 is a valid value, means "don't call _read preemptively ever" | ||
this.highWaterMark = options | ||
? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) | ||
: getDefaultHighWaterMark(false) // A linked list is used to store data chunks instead of an array because the | ||
: getDefaultHighWaterMark(false) | ||
// A linked list is used to store data chunks instead of an array because the | ||
// linked list can remove elements from the beginning faster than | ||
// array.shift(). | ||
this.buffer = new BufferList() | ||
@@ -119,15 +104,18 @@ this.length = 0 | ||
this.endEmitted = false | ||
this.reading = false // Stream is still being constructed and cannot be | ||
this.reading = false | ||
// Stream is still being constructed and cannot be | ||
// destroyed until construction finished or failed. | ||
// Async construction is opt in, therefore we start as | ||
// constructed. | ||
this.constructed = true | ||
this.constructed = true // A flag to be able to tell if the event 'readable'/'data' is emitted | ||
// A flag to be able to tell if the event 'readable'/'data' is emitted | ||
// immediately, or on a later tick. We set this to true at first, because | ||
// any actions that shouldn't happen until "later" should generally also | ||
// not happen before the first read call. | ||
this.sync = true | ||
this.sync = true // Whenever we return null, then we set a flag to say | ||
// Whenever we return null, then we set a flag to say | ||
// that we're awaiting a 'readable' event emission. | ||
this.needReadable = false | ||
@@ -137,30 +125,40 @@ this.emittedReadable = false | ||
this.resumeScheduled = false | ||
this[kPaused] = null // True if the error was already emitted and should not be thrown again. | ||
this[kPaused] = null | ||
this.errorEmitted = false // Should close be emitted on destroy. Defaults to true. | ||
// True if the error was already emitted and should not be thrown again. | ||
this.errorEmitted = false | ||
this.emitClose = !options || options.emitClose !== false // Should .destroy() be called after 'end' (and potentially 'finish'). | ||
// Should close be emitted on destroy. Defaults to true. | ||
this.emitClose = !options || options.emitClose !== false | ||
this.autoDestroy = !options || options.autoDestroy !== false // Has it been destroyed. | ||
// Should .destroy() be called after 'end' (and potentially 'finish'). | ||
this.autoDestroy = !options || options.autoDestroy !== false | ||
this.destroyed = false // Indicates whether the stream has errored. When true no further | ||
// Has it been destroyed. | ||
this.destroyed = false | ||
// Indicates whether the stream has errored. When true no further | ||
// _read calls, 'data' or 'readable' events should occur. This is needed | ||
// since when autoDestroy is disabled we need a way to tell whether the | ||
// stream has failed. | ||
this.errored = null | ||
this.errored = null // Indicates whether the stream has finished destroying. | ||
// Indicates whether the stream has finished destroying. | ||
this.closed = false | ||
this.closed = false // True if close has been emitted or would have been emitted | ||
// True if close has been emitted or would have been emitted | ||
// depending on emitClose. | ||
this.closeEmitted = false | ||
this.closeEmitted = false // Crypto is kind of old and crusty. Historically, its default string | ||
// Crypto is kind of old and crusty. Historically, its default string | ||
// encoding is 'binary' so we have to make this configurable. | ||
// Everything else in the universe uses 'utf8', though. | ||
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8' | ||
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8' // Ref the piped dest which we need a drain event on it | ||
// Ref the piped dest which we need a drain event on it | ||
// type: null | Writable | Set<Writable>. | ||
this.awaitDrainWriters = null | ||
this.multiAwaitDrain = false // If true, a maybeReadMore has been scheduled. | ||
this.multiAwaitDrain = false | ||
// If true, a maybeReadMore has been scheduled. | ||
this.readingMore = false | ||
@@ -170,3 +168,2 @@ this.dataEmitted = false | ||
this.encoding = null | ||
if (options && options.encoding) { | ||
@@ -177,11 +174,9 @@ this.decoder = new StringDecoder(options.encoding) | ||
} | ||
function Readable(options) { | ||
if (!(this instanceof Readable)) return new Readable(options) | ||
function Readable(options) { | ||
if (!(this instanceof Readable)) return new Readable(options) // Checking for a Stream.Duplex instance is faster here instead of inside | ||
// Checking for a Stream.Duplex instance is faster here instead of inside | ||
// the ReadableState constructor, at least with V8 6.5. | ||
const isDuplex = this instanceof require('./duplex') | ||
this._readableState = new ReadableState(options, this, isDuplex) | ||
if (options) { | ||
@@ -193,3 +188,2 @@ if (typeof options.read === 'function') this._read = options.read | ||
} | ||
Stream.call(this, options) | ||
@@ -202,25 +196,23 @@ destroyImpl.construct(this, () => { | ||
} | ||
Readable.prototype.destroy = destroyImpl.destroy | ||
Readable.prototype._undestroy = destroyImpl.undestroy | ||
Readable.prototype._destroy = function (err, cb) { | ||
cb(err) | ||
} | ||
Readable.prototype[EE.captureRejectionSymbol] = function (err) { | ||
this.destroy(err) | ||
} // Manually shove something into the read() buffer. | ||
} | ||
// Manually shove something into the read() buffer. | ||
// This returns true if the highWaterMark has not been hit yet, | ||
// similar to how Writable.write() returns true if you should | ||
// write() some more. | ||
Readable.prototype.push = function (chunk, encoding) { | ||
return readableAddChunk(this, chunk, encoding, false) | ||
} // Unshift should *always* be something directly out of read(). | ||
} | ||
// Unshift should *always* be something directly out of read(). | ||
Readable.prototype.unshift = function (chunk, encoding) { | ||
return readableAddChunk(this, chunk, encoding, true) | ||
} | ||
function readableAddChunk(stream, chunk, encoding, addToFront) { | ||
@@ -230,7 +222,5 @@ debug('readableAddChunk', chunk) | ||
let err | ||
if (!state.objectMode) { | ||
if (typeof chunk === 'string') { | ||
encoding = encoding || state.defaultEncoding | ||
if (state.encoding !== encoding) { | ||
@@ -255,3 +245,2 @@ if (addToFront && state.encoding) { | ||
} | ||
if (err) { | ||
@@ -273,3 +262,2 @@ errorOrDestroy(stream, err) | ||
state.reading = false | ||
if (state.decoder && !encoding) { | ||
@@ -286,9 +274,9 @@ chunk = state.decoder.write(chunk) | ||
maybeReadMore(stream, state) | ||
} // We can push more data if we are below the highWaterMark. | ||
} | ||
// We can push more data if we are below the highWaterMark. | ||
// Also, if we have no data yet, we can stand some more bytes. | ||
// This is to work around cases where hwm=0, such as the repl. | ||
return !state.ended && (state.length < state.highWaterMark || state.length === 0) | ||
} | ||
function addChunk(stream, state, chunk, addToFront) { | ||
@@ -303,3 +291,2 @@ if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { | ||
} | ||
state.dataEmitted = true | ||
@@ -314,24 +301,21 @@ stream.emit('data', chunk) | ||
} | ||
maybeReadMore(stream, state) | ||
} | ||
Readable.prototype.isPaused = function () { | ||
const state = this._readableState | ||
return state[kPaused] === true || state.flowing === false | ||
} // Backwards compatibility. | ||
} | ||
// Backwards compatibility. | ||
Readable.prototype.setEncoding = function (enc) { | ||
const decoder = new StringDecoder(enc) | ||
this._readableState.decoder = decoder // If setEncoding(null), decoder.encoding equals utf8. | ||
this._readableState.decoder = decoder | ||
// If setEncoding(null), decoder.encoding equals utf8. | ||
this._readableState.encoding = this._readableState.decoder.encoding | ||
const buffer = this._readableState.buffer // Iterate over current buffer to convert already stored Buffers: | ||
const buffer = this._readableState.buffer | ||
// Iterate over current buffer to convert already stored Buffers: | ||
let content = '' | ||
for (const data of buffer) { | ||
content += decoder.write(data) | ||
} | ||
buffer.clear() | ||
@@ -341,6 +325,6 @@ if (content !== '') buffer.push(content) | ||
return this | ||
} // Don't raise the hwm > 1GB. | ||
} | ||
// Don't raise the hwm > 1GB. | ||
const MAX_HWM = 0x40000000 | ||
function computeNewHighWaterMark(n) { | ||
@@ -360,11 +344,10 @@ if (n > MAX_HWM) { | ||
} | ||
return n | ||
} | ||
return n | ||
} // This function is designed to be inlinable, so please take care when making | ||
// This function is designed to be inlinable, so please take care when making | ||
// changes to the function body. | ||
function howMuchToRead(n, state) { | ||
if (n <= 0 || (state.length === 0 && state.ended)) return 0 | ||
if (state.objectMode) return 1 | ||
if (NumberIsNaN(n)) { | ||
@@ -375,11 +358,11 @@ // Only flow one buffer at a time. | ||
} | ||
if (n <= state.length) return n | ||
return state.ended ? state.length : 0 | ||
} // You can override either this method, or the async _read(n) below. | ||
} | ||
// You can override either this method, or the async _read(n) below. | ||
Readable.prototype.read = function (n) { | ||
debug('read', n) // Same as parseInt(undefined, 10), however V8 7.3 performance regressed | ||
debug('read', n) | ||
// Same as parseInt(undefined, 10), however V8 7.3 performance regressed | ||
// in this scenario, so we are doing it manually. | ||
if (n === undefined) { | ||
@@ -390,11 +373,12 @@ n = NaN | ||
} | ||
const state = this._readableState | ||
const nOrig = n // If we're asking for more than the current hwm, then raise the hwm. | ||
const nOrig = n | ||
// If we're asking for more than the current hwm, then raise the hwm. | ||
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n) | ||
if (n !== 0) state.emittedReadable = false // If we're doing read(0) to trigger a readable event, but we | ||
if (n !== 0) state.emittedReadable = false | ||
// If we're doing read(0) to trigger a readable event, but we | ||
// already have a bunch of data in the buffer, then just trigger | ||
// the 'readable' event and move on. | ||
if ( | ||
@@ -410,9 +394,11 @@ n === 0 && | ||
} | ||
n = howMuchToRead(n, state) | ||
n = howMuchToRead(n, state) // If we've ended, and we're now clear, then finish it up. | ||
// If we've ended, and we're now clear, then finish it up. | ||
if (n === 0 && state.ended) { | ||
if (state.length === 0) endReadable(this) | ||
return null | ||
} // All the actual chunk generation logic needs to be | ||
} | ||
// All the actual chunk generation logic needs to be | ||
// *below* the call to _read. The reason is that in certain | ||
@@ -438,14 +424,16 @@ // synthetic stream cases, such as passthrough streams, _read | ||
// 3. Actually pull the requested chunks out of the buffer and return. | ||
// if we need a readable event, then we need to do some reading. | ||
let doRead = state.needReadable | ||
debug('need readable', doRead) // If we currently have less than the highWaterMark, then also read some. | ||
debug('need readable', doRead) | ||
// If we currently have less than the highWaterMark, then also read some. | ||
if (state.length === 0 || state.length - n < state.highWaterMark) { | ||
doRead = true | ||
debug('length less than watermark', doRead) | ||
} // However, if we've ended, then there's no point, if we're already | ||
} | ||
// However, if we've ended, then there's no point, if we're already | ||
// reading, then it's unnecessary, if we're constructing we have to wait, | ||
// and if we're destroyed or errored, then it's not allowed, | ||
if (state.ended || state.reading || state.destroyed || state.errored || !state.constructed) { | ||
@@ -457,6 +445,7 @@ doRead = false | ||
state.reading = true | ||
state.sync = true // If the length is currently zero, then we *need* a readable event. | ||
state.sync = true | ||
// If the length is currently zero, then we *need* a readable event. | ||
if (state.length === 0) state.needReadable = true | ||
if (state.length === 0) state.needReadable = true // Call internal read method | ||
// Call internal read method | ||
try { | ||
@@ -467,13 +456,10 @@ this._read(state.highWaterMark) | ||
} | ||
state.sync = false // If _read pushed data synchronously, then `reading` will be false, | ||
state.sync = false | ||
// If _read pushed data synchronously, then `reading` will be false, | ||
// and we need to re-evaluate how much data we can return to the user. | ||
if (!state.reading) n = howMuchToRead(nOrig, state) | ||
} | ||
let ret | ||
if (n > 0) ret = fromList(n, state) | ||
else ret = null | ||
if (ret === null) { | ||
@@ -484,3 +470,2 @@ state.needReadable = state.length <= state.highWaterMark | ||
state.length -= n | ||
if (state.multiAwaitDrain) { | ||
@@ -492,11 +477,10 @@ state.awaitDrainWriters.clear() | ||
} | ||
if (state.length === 0) { | ||
// If we have nothing in the buffer, then we want to know | ||
// as soon as we *do* get something into the buffer. | ||
if (!state.ended) state.needReadable = true // If we tried to read() past the EOF, then emit end on the next tick. | ||
if (!state.ended) state.needReadable = true | ||
// If we tried to read() past the EOF, then emit end on the next tick. | ||
if (nOrig !== n && state.ended) endReadable(this) | ||
} | ||
if (ret !== null && !state.errorEmitted && !state.closeEmitted) { | ||
@@ -506,13 +490,9 @@ state.dataEmitted = true | ||
} | ||
return ret | ||
} | ||
function onEofChunk(stream, state) { | ||
debug('onEofChunk') | ||
if (state.ended) return | ||
if (state.decoder) { | ||
const chunk = state.decoder.end() | ||
if (chunk && chunk.length) { | ||
@@ -523,5 +503,3 @@ state.buffer.push(chunk) | ||
} | ||
state.ended = true | ||
if (state.sync) { | ||
@@ -535,11 +513,12 @@ // If we are sync, wait until next tick to emit the data. | ||
state.needReadable = false | ||
state.emittedReadable = true // We have to emit readable now that we are EOF. Modules | ||
state.emittedReadable = true | ||
// We have to emit readable now that we are EOF. Modules | ||
// in the ecosystem (e.g. dicer) rely on this event being sync. | ||
emitReadable_(stream) | ||
} | ||
} // Don't emit readable right away in sync mode, because this can trigger | ||
} | ||
// Don't emit readable right away in sync mode, because this can trigger | ||
// another read() call => stack overflow. This way, it might trigger | ||
// a nextTick recursion warning, but that's not so bad. | ||
function emitReadable(stream) { | ||
@@ -549,3 +528,2 @@ const state = stream._readableState | ||
state.needReadable = false | ||
if (!state.emittedReadable) { | ||
@@ -557,11 +535,11 @@ debug('emitReadable', state.flowing) | ||
} | ||
function emitReadable_(stream) { | ||
const state = stream._readableState | ||
debug('emitReadable_', state.destroyed, state.length, state.ended) | ||
if (!state.destroyed && !state.errored && (state.length || state.ended)) { | ||
stream.emit('readable') | ||
state.emittedReadable = false | ||
} // The stream needs another readable event if: | ||
} | ||
// The stream needs another readable event if: | ||
// 1. It is not flowing, as the flow mechanism will take | ||
@@ -572,6 +550,7 @@ // care of it. | ||
// another readable later. | ||
state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark | ||
flow(stream) | ||
} // At this point, the user has presumably seen the 'readable' event, | ||
} | ||
// At this point, the user has presumably seen the 'readable' event, | ||
// and called read() to consume some data. that may have triggered | ||
@@ -582,3 +561,2 @@ // in turn another _read(n) call, in which case reading = true if | ||
// then go ahead and try to read some more preemptively. | ||
function maybeReadMore(stream, state) { | ||
@@ -590,3 +568,2 @@ if (!state.readingMore && state.constructed) { | ||
} | ||
function maybeReadMore_(stream, state) { | ||
@@ -628,17 +605,15 @@ // Attempt to read more data if we should. | ||
} | ||
state.readingMore = false | ||
} | ||
state.readingMore = false | ||
} // Abstract method. to be overridden in specific implementation classes. | ||
// Abstract method. to be overridden in specific implementation classes. | ||
// call cb(er, data) where data is <= n in length. | ||
// for virtual (non-string, non-buffer) streams, "length" is somewhat | ||
// arbitrary, and perhaps not very meaningful. | ||
Readable.prototype._read = function (n) { | ||
throw new ERR_METHOD_NOT_IMPLEMENTED('_read()') | ||
} | ||
Readable.prototype.pipe = function (dest, pipeOpts) { | ||
const src = this | ||
const state = this._readableState | ||
if (state.pipes.length === 1) { | ||
@@ -650,3 +625,2 @@ if (!state.multiAwaitDrain) { | ||
} | ||
state.pipes.push(dest) | ||
@@ -659,6 +633,4 @@ debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts) | ||
dest.on('unpipe', onunpipe) | ||
function onunpipe(readable, unpipeInfo) { | ||
debug('onunpipe') | ||
if (readable === src) { | ||
@@ -671,3 +643,2 @@ if (unpipeInfo && unpipeInfo.hasUnpiped === false) { | ||
} | ||
function onend() { | ||
@@ -677,16 +648,12 @@ debug('onend') | ||
} | ||
let ondrain | ||
let cleanedUp = false | ||
function cleanup() { | ||
debug('cleanup') // Cleanup event handlers once the pipe is broken. | ||
debug('cleanup') | ||
// Cleanup event handlers once the pipe is broken. | ||
dest.removeListener('close', onclose) | ||
dest.removeListener('finish', onfinish) | ||
if (ondrain) { | ||
dest.removeListener('drain', ondrain) | ||
} | ||
dest.removeListener('error', onerror) | ||
@@ -697,3 +664,5 @@ dest.removeListener('unpipe', onunpipe) | ||
src.removeListener('data', ondata) | ||
cleanedUp = true // If the reader is waiting for a drain event from this | ||
cleanedUp = true | ||
// If the reader is waiting for a drain event from this | ||
// specific writer, then it would cause it to never start | ||
@@ -703,6 +672,4 @@ // flowing again. | ||
// If we don't know, then assume that we are waiting for one. | ||
if (ondrain && state.awaitDrainWriters && (!dest._writableState || dest._writableState.needDrain)) ondrain() | ||
} | ||
function pause() { | ||
@@ -722,6 +689,4 @@ // If the user unpiped during `dest.write()`, it is possible | ||
} | ||
src.pause() | ||
} | ||
if (!ondrain) { | ||
@@ -736,5 +701,3 @@ // When the dest drains, it reduces the awaitDrain counter | ||
} | ||
src.on('data', ondata) | ||
function ondata(chunk) { | ||
@@ -744,9 +707,9 @@ debug('ondata') | ||
debug('dest.write', ret) | ||
if (ret === false) { | ||
pause() | ||
} | ||
} // If the dest has an error, then stop piping into it. | ||
} | ||
// If the dest has an error, then stop piping into it. | ||
// However, don't suppress the throwing behavior for this. | ||
function onerror(er) { | ||
@@ -756,6 +719,4 @@ debug('onerror', er) | ||
dest.removeListener('error', onerror) | ||
if (dest.listenerCount('error') === 0) { | ||
const s = dest._writableState || dest._readableState | ||
if (s && !s.errorEmitted) { | ||
@@ -768,6 +729,8 @@ // User incorrectly emitted 'error' directly on the stream. | ||
} | ||
} // Make sure our error handler is attached before userland ones. | ||
} | ||
prependListener(dest, 'error', onerror) // Both close and finish should trigger unpipe, but only once. | ||
// Make sure our error handler is attached before userland ones. | ||
prependListener(dest, 'error', onerror) | ||
// Both close and finish should trigger unpipe, but only once. | ||
function onclose() { | ||
@@ -777,5 +740,3 @@ dest.removeListener('finish', onfinish) | ||
} | ||
dest.once('close', onclose) | ||
function onfinish() { | ||
@@ -786,12 +747,13 @@ debug('onfinish') | ||
} | ||
dest.once('finish', onfinish) | ||
function unpipe() { | ||
debug('unpipe') | ||
src.unpipe(dest) | ||
} // Tell the dest that it's being piped to. | ||
} | ||
dest.emit('pipe', src) // Start the flow if it hasn't been started already. | ||
// Tell the dest that it's being piped to. | ||
dest.emit('pipe', src) | ||
// Start the flow if it hasn't been started already. | ||
if (dest.writableNeedDrain === true) { | ||
@@ -805,12 +767,11 @@ if (state.flowing) { | ||
} | ||
return dest | ||
} | ||
function pipeOnDrain(src, dest) { | ||
return function pipeOnDrainFunctionResult() { | ||
const state = src._readableState // `ondrain` will call directly, | ||
const state = src._readableState | ||
// `ondrain` will call directly, | ||
// `this` maybe not a reference to dest, | ||
// so we use the real dest here. | ||
if (state.awaitDrainWriters === dest) { | ||
@@ -823,3 +784,2 @@ debug('pipeOnDrain', 1) | ||
} | ||
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && src.listenerCount('data')) { | ||
@@ -830,3 +790,2 @@ src.resume() | ||
} | ||
Readable.prototype.unpipe = function (dest) { | ||
@@ -836,6 +795,6 @@ const state = this._readableState | ||
hasUnpiped: false | ||
} // If we're not piping anywhere, then do nothing. | ||
} | ||
// If we're not piping anywhere, then do nothing. | ||
if (state.pipes.length === 0) return this | ||
if (!dest) { | ||
@@ -846,3 +805,2 @@ // remove all. | ||
this.pause() | ||
for (let i = 0; i < dests.length; i++) | ||
@@ -852,6 +810,6 @@ dests[i].emit('unpipe', this, { | ||
}) | ||
return this | ||
} // Try to find the right one. | ||
} | ||
// Try to find the right one. | ||
const index = ArrayPrototypeIndexOf(state.pipes, dest) | ||
@@ -863,14 +821,15 @@ if (index === -1) return this | ||
return this | ||
} // Set up data events if they are asked for | ||
} | ||
// Set up data events if they are asked for | ||
// Ensure readable listeners eventually get something. | ||
Readable.prototype.on = function (ev, fn) { | ||
const res = Stream.prototype.on.call(this, ev, fn) | ||
const state = this._readableState | ||
if (ev === 'data') { | ||
// Update readableListening so that resume() may be a no-op | ||
// a few lines down. This is needed to support once('readable'). | ||
state.readableListening = this.listenerCount('readable') > 0 // Try start flowing on next tick if stream isn't explicitly paused. | ||
state.readableListening = this.listenerCount('readable') > 0 | ||
// Try start flowing on next tick if stream isn't explicitly paused. | ||
if (state.flowing !== false) this.resume() | ||
@@ -883,3 +842,2 @@ } else if (ev === 'readable') { | ||
debug('on readable', state.length, state.reading) | ||
if (state.length) { | ||
@@ -892,11 +850,7 @@ emitReadable(this) | ||
} | ||
return res | ||
} | ||
Readable.prototype.addListener = Readable.prototype.on | ||
Readable.prototype.removeListener = function (ev, fn) { | ||
const res = Stream.prototype.removeListener.call(this, ev, fn) | ||
if (ev === 'readable') { | ||
@@ -911,11 +865,7 @@ // We need to check if there is someone still listening to | ||
} | ||
return res | ||
} | ||
Readable.prototype.off = Readable.prototype.removeListener | ||
Readable.prototype.removeAllListeners = function (ev) { | ||
const res = Stream.prototype.removeAllListeners.apply(this, arguments) | ||
if (ev === 'readable' || ev === undefined) { | ||
@@ -930,14 +880,13 @@ // We need to check if there is someone still listening to | ||
} | ||
return res | ||
} | ||
function updateReadableListening(self) { | ||
const state = self._readableState | ||
state.readableListening = self.listenerCount('readable') > 0 | ||
if (state.resumeScheduled && state[kPaused] === false) { | ||
// Flowing needs to be set to true now, otherwise | ||
// the upcoming resume will not flow. | ||
state.flowing = true // Crude way to check if we should resume. | ||
state.flowing = true | ||
// Crude way to check if we should resume. | ||
} else if (self.listenerCount('data') > 0) { | ||
@@ -949,25 +898,22 @@ self.resume() | ||
} | ||
function nReadingNextTick(self) { | ||
debug('readable nexttick read 0') | ||
self.read(0) | ||
} // pause() and resume() are remnants of the legacy readable stream API | ||
} | ||
// pause() and resume() are remnants of the legacy readable stream API | ||
// If the user uses them, then switch into old mode. | ||
Readable.prototype.resume = function () { | ||
const state = this._readableState | ||
if (!state.flowing) { | ||
debug('resume') // We flow only if there is no one listening | ||
debug('resume') | ||
// We flow only if there is no one listening | ||
// for readable, but we still have to call | ||
// resume(). | ||
state.flowing = !state.readableListening | ||
resume(this, state) | ||
} | ||
state[kPaused] = false | ||
return this | ||
} | ||
function resume(stream, state) { | ||
@@ -979,10 +925,7 @@ if (!state.resumeScheduled) { | ||
} | ||
function resume_(stream, state) { | ||
debug('resume', state.reading) | ||
if (!state.reading) { | ||
stream.read(0) | ||
} | ||
state.resumeScheduled = false | ||
@@ -993,6 +936,4 @@ stream.emit('resume') | ||
} | ||
Readable.prototype.pause = function () { | ||
debug('call pause flowing=%j', this._readableState.flowing) | ||
if (this._readableState.flowing !== false) { | ||
@@ -1003,18 +944,18 @@ debug('pause') | ||
} | ||
this._readableState[kPaused] = true | ||
return this | ||
} | ||
function flow(stream) { | ||
const state = stream._readableState | ||
debug('flow', state.flowing) | ||
while (state.flowing && stream.read() !== null); | ||
} | ||
while (state.flowing && stream.read() !== null); | ||
} // Wrap an old-style stream as the async data source. | ||
// Wrap an old-style stream as the async data source. | ||
// This is *not* part of the readable stream interface. | ||
// It is an ugly unfortunate mess of history. | ||
Readable.prototype.wrap = function (stream) { | ||
let paused = false | ||
Readable.prototype.wrap = function (stream) { | ||
let paused = false // TODO (ronag): Should this.destroy(err) emit | ||
// TODO (ronag): Should this.destroy(err) emit | ||
// 'error' on the wrapped stream? Would require | ||
@@ -1041,3 +982,2 @@ // a static factory method, e.g. Readable.wrap(stream). | ||
}) | ||
this._read = () => { | ||
@@ -1048,9 +988,8 @@ if (paused && stream.resume) { | ||
} | ||
} // Proxy all the other methods. Important when wrapping filters and duplexes. | ||
} | ||
// Proxy all the other methods. Important when wrapping filters and duplexes. | ||
const streamKeys = ObjectKeys(stream) | ||
for (let j = 1; j < streamKeys.length; j++) { | ||
const i = streamKeys[j] | ||
if (this[i] === undefined && typeof stream[i] === 'function') { | ||
@@ -1060,10 +999,7 @@ this[i] = stream[i].bind(stream) | ||
} | ||
return this | ||
} | ||
Readable.prototype[SymbolAsyncIterator] = function () { | ||
return streamToAsyncIterator(this) | ||
} | ||
Readable.prototype.iterator = function (options) { | ||
@@ -1073,6 +1009,4 @@ if (options !== undefined) { | ||
} | ||
return streamToAsyncIterator(this, options) | ||
} | ||
function streamToAsyncIterator(stream, options) { | ||
@@ -1084,3 +1018,2 @@ if (typeof stream.read !== 'function') { | ||
} | ||
const iter = createAsyncIterator(stream, options) | ||
@@ -1090,6 +1023,4 @@ iter.stream = stream | ||
} | ||
async function* createAsyncIterator(stream, options) { | ||
let callback = nop | ||
function next(resolve) { | ||
@@ -1103,3 +1034,2 @@ if (this === stream) { | ||
} | ||
stream.on('readable', next) | ||
@@ -1118,7 +1048,5 @@ let error | ||
) | ||
try { | ||
while (true) { | ||
const chunk = stream.destroyed ? null : stream.read() | ||
if (chunk !== null) { | ||
@@ -1148,19 +1076,18 @@ yield chunk | ||
} | ||
} // Making it explicit these properties are not enumerable | ||
} | ||
// Making it explicit these properties are not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail. | ||
ObjectDefineProperties(Readable.prototype, { | ||
readable: { | ||
__proto__: null, | ||
get() { | ||
const r = this._readableState // r.readable === false means that this is part of a Duplex stream | ||
const r = this._readableState | ||
// r.readable === false means that this is part of a Duplex stream | ||
// where the readable side was disabled upon construction. | ||
// Compat. The user might manually disable readable side through | ||
// deprecated setter. | ||
return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && !r.endEmitted | ||
}, | ||
set(val) { | ||
@@ -1220,3 +1147,2 @@ // Backwards compat. | ||
enumerable: false, | ||
get() { | ||
@@ -1229,3 +1155,2 @@ return this._readableState.length | ||
enumerable: false, | ||
get() { | ||
@@ -1238,3 +1163,2 @@ return this._readableState ? this._readableState.objectMode : false | ||
enumerable: false, | ||
get() { | ||
@@ -1247,3 +1171,2 @@ return this._readableState ? this._readableState.encoding : null | ||
enumerable: false, | ||
get() { | ||
@@ -1255,3 +1178,2 @@ return this._readableState ? this._readableState.errored : null | ||
__proto__: null, | ||
get() { | ||
@@ -1264,7 +1186,5 @@ return this._readableState ? this._readableState.closed : false | ||
enumerable: false, | ||
get() { | ||
return this._readableState ? this._readableState.destroyed : false | ||
}, | ||
set(value) { | ||
@@ -1275,5 +1195,6 @@ // We ignore the value if the stream | ||
return | ||
} // Backward compatibility, the user is explicitly | ||
} | ||
// Backward compatibility, the user is explicitly | ||
// managing destroyed. | ||
this._readableState.destroyed = value | ||
@@ -1285,3 +1206,2 @@ } | ||
enumerable: false, | ||
get() { | ||
@@ -1296,3 +1216,2 @@ return this._readableState ? this._readableState.endEmitted : false | ||
__proto__: null, | ||
get() { | ||
@@ -1305,7 +1224,5 @@ return this.pipes.length | ||
__proto__: null, | ||
get() { | ||
return this[kPaused] !== false | ||
}, | ||
set(value) { | ||
@@ -1315,9 +1232,11 @@ this[kPaused] = !!value | ||
} | ||
}) // Exposed for testing purposes only. | ||
}) | ||
Readable._fromList = fromList // Pluck off n bytes from an array of buffers. | ||
// Exposed for testing purposes only. | ||
Readable._fromList = fromList | ||
// Pluck off n bytes from an array of buffers. | ||
// Length is the combined lengths of all the buffers in the list. | ||
// This function is designed to be inlinable, so please take care when making | ||
// changes to the function body. | ||
function fromList(n, state) { | ||
@@ -1340,7 +1259,5 @@ // nothing buffered. | ||
} | ||
function endReadable(stream) { | ||
const state = stream._readableState | ||
debug('endReadable', state.endEmitted) | ||
if (!state.endEmitted) { | ||
@@ -1351,10 +1268,9 @@ state.ended = true | ||
} | ||
function endReadableNT(state, stream) { | ||
debug('endReadableNT', state.endEmitted, state.length) // Check that we didn't get one last unshift. | ||
debug('endReadableNT', state.endEmitted, state.length) | ||
// Check that we didn't get one last unshift. | ||
if (!state.errored && !state.closeEmitted && !state.endEmitted && state.length === 0) { | ||
state.endEmitted = true | ||
stream.emit('end') | ||
if (stream.writable && stream.allowHalfOpen === false) { | ||
@@ -1368,6 +1284,6 @@ process.nextTick(endWritableNT, stream) | ||
!wState || | ||
(wState.autoDestroy && // We don't expect the writable to ever 'finish' | ||
(wState.autoDestroy && | ||
// We don't expect the writable to ever 'finish' | ||
// if writable is explicitly set to false. | ||
(wState.finished || wState.writable === false)) | ||
if (autoDestroy) { | ||
@@ -1379,6 +1295,4 @@ stream.destroy() | ||
} | ||
function endWritableNT(stream) { | ||
const writable = stream.writable && !stream.writableEnded && !stream.destroyed | ||
if (writable) { | ||
@@ -1388,9 +1302,8 @@ stream.end() | ||
} | ||
Readable.from = function (iterable, opts) { | ||
return from(Readable, iterable, opts) | ||
} | ||
let webStreamsAdapters | ||
let webStreamsAdapters // Lazy to avoid circular references | ||
// Lazy to avoid circular references | ||
function lazyWebStreams() { | ||
@@ -1400,14 +1313,10 @@ if (webStreamsAdapters === undefined) webStreamsAdapters = {} | ||
} | ||
Readable.fromWeb = function (readableStream, options) { | ||
return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options) | ||
} | ||
Readable.toWeb = function (streamReadable, options) { | ||
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable, options) | ||
} | ||
Readable.wrap = function (src, options) { | ||
var _ref, _src$readableObjectMo | ||
return new Readable({ | ||
@@ -1422,3 +1331,2 @@ objectMode: | ||
...options, | ||
destroy(err, callback) { | ||
@@ -1425,0 +1333,0 @@ destroyImpl.destroyer(src, err) |
'use strict' | ||
const { MathFloor, NumberIsInteger } = require('../../ours/primordials') | ||
const { ERR_INVALID_ARG_VALUE } = require('../../ours/errors').codes | ||
function highWaterMarkFrom(options, isDuplex, duplexKey) { | ||
return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null | ||
} | ||
function getDefaultHighWaterMark(objectMode) { | ||
return objectMode ? 16 : 16 * 1024 | ||
} | ||
function getHighWaterMark(state, options, duplexKey, isDuplex) { | ||
const hwm = highWaterMarkFrom(options, isDuplex, duplexKey) | ||
if (hwm != null) { | ||
@@ -23,9 +18,8 @@ if (!NumberIsInteger(hwm) || hwm < 0) { | ||
} | ||
return MathFloor(hwm) | ||
} // Default value | ||
} | ||
// Default value | ||
return getDefaultHighWaterMark(state.objectMode) | ||
} | ||
module.exports = { | ||
@@ -32,0 +26,0 @@ getHighWaterMark, |
@@ -21,2 +21,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a transform stream is a readable/writable stream where you do | ||
@@ -63,25 +64,20 @@ // something with the data. Sometimes it's called a "filter", | ||
// the results of the previous transformed chunk were consumed. | ||
'use strict' | ||
const { ObjectSetPrototypeOf, Symbol } = require('../../ours/primordials') | ||
module.exports = Transform | ||
const { ERR_METHOD_NOT_IMPLEMENTED } = require('../../ours/errors').codes | ||
const Duplex = require('./duplex') | ||
const { getHighWaterMark } = require('./state') | ||
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype) | ||
ObjectSetPrototypeOf(Transform, Duplex) | ||
const kCallback = Symbol('kCallback') | ||
function Transform(options) { | ||
if (!(this instanceof Transform)) return new Transform(options) | ||
function Transform(options) { | ||
if (!(this instanceof Transform)) return new Transform(options) // TODO (ronag): This should preferably always be | ||
// TODO (ronag): This should preferably always be | ||
// applied but would be semver-major. Or even better; | ||
// make Transform a Readable with the Writable interface. | ||
const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null | ||
if (readableHighWaterMark === 0) { | ||
@@ -102,21 +98,20 @@ // A Duplex will buffer both on the writable and readable side while | ||
} | ||
Duplex.call(this, options) | ||
Duplex.call(this, options) // We have implemented the _read method, and done the other things | ||
// We have implemented the _read method, and done the other things | ||
// that Readable wants before the first _read call, so unset the | ||
// sync guard flag. | ||
this._readableState.sync = false | ||
this[kCallback] = null | ||
if (options) { | ||
if (typeof options.transform === 'function') this._transform = options.transform | ||
if (typeof options.flush === 'function') this._flush = options.flush | ||
} // When the writable side finishes, then flush out anything remaining. | ||
} | ||
// When the writable side finishes, then flush out anything remaining. | ||
// Backwards compat. Some Transform streams incorrectly implement _final | ||
// instead of or in addition to _flush. By using 'prefinish' instead of | ||
// implementing _final we continue supporting this unfortunate use case. | ||
this.on('prefinish', prefinish) | ||
} | ||
function final(cb) { | ||
@@ -131,12 +126,8 @@ if (typeof this._flush === 'function' && !this.destroyed) { | ||
} | ||
return | ||
} | ||
if (data != null) { | ||
this.push(data) | ||
} | ||
this.push(null) | ||
if (cb) { | ||
@@ -148,3 +139,2 @@ cb() | ||
this.push(null) | ||
if (cb) { | ||
@@ -155,3 +145,2 @@ cb() | ||
} | ||
function prefinish() { | ||
@@ -162,9 +151,6 @@ if (this._final !== final) { | ||
} | ||
Transform.prototype._final = final | ||
Transform.prototype._transform = function (chunk, encoding, callback) { | ||
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()') | ||
} | ||
Transform.prototype._write = function (chunk, encoding, callback) { | ||
@@ -174,3 +160,2 @@ const rState = this._readableState | ||
const length = rState.length | ||
this._transform(chunk, encoding, (err, val) => { | ||
@@ -181,10 +166,10 @@ if (err) { | ||
} | ||
if (val != null) { | ||
this.push(val) | ||
} | ||
if ( | ||
wState.ended || // Backwards compat. | ||
length === rState.length || // Backwards compat. | ||
wState.ended || | ||
// Backwards compat. | ||
length === rState.length || | ||
// Backwards compat. | ||
rState.length < rState.highWaterMark | ||
@@ -198,3 +183,2 @@ ) { | ||
} | ||
Transform.prototype._read = function () { | ||
@@ -201,0 +185,0 @@ if (this[kCallback]) { |
'use strict' | ||
const { Symbol, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials') | ||
const kDestroyed = Symbol('kDestroyed') | ||
@@ -9,6 +8,4 @@ const kIsErrored = Symbol('kIsErrored') | ||
const kIsDisturbed = Symbol('kIsDisturbed') | ||
function isReadableNodeStream(obj, strict = false) { | ||
var _obj$_readableState | ||
return !!( | ||
@@ -23,3 +20,4 @@ ( | ||
? undefined | ||
: _obj$_readableState.readable) !== false) && // Duplex | ||
: _obj$_readableState.readable) !== false) && | ||
// Duplex | ||
(!obj._writableState || obj._readableState) | ||
@@ -32,3 +30,2 @@ ) // Writable has .pipe. | ||
var _obj$_writableState | ||
return !!( | ||
@@ -56,3 +53,2 @@ ( | ||
} | ||
function isNodeStream(obj) { | ||
@@ -67,3 +63,2 @@ return ( | ||
} | ||
function isIterable(obj, isAsync) { | ||
@@ -75,3 +70,2 @@ if (obj == null) return false | ||
} | ||
function isDestroyed(stream) { | ||
@@ -83,4 +77,5 @@ if (!isNodeStream(stream)) return null | ||
return !!(stream.destroyed || stream[kDestroyed] || (state !== null && state !== undefined && state.destroyed)) | ||
} // Have been end():d. | ||
} | ||
// Have been end():d. | ||
function isWritableEnded(stream) { | ||
@@ -93,4 +88,5 @@ if (!isWritableNodeStream(stream)) return null | ||
return wState.ended | ||
} // Have emitted 'finish'. | ||
} | ||
// Have emitted 'finish'. | ||
function isWritableFinished(stream, strict) { | ||
@@ -103,4 +99,5 @@ if (!isWritableNodeStream(stream)) return null | ||
return !!(wState.finished || (strict === false && wState.ended === true && wState.length === 0)) | ||
} // Have been push(null):d. | ||
} | ||
// Have been push(null):d. | ||
function isReadableEnded(stream) { | ||
@@ -113,4 +110,5 @@ if (!isReadableNodeStream(stream)) return null | ||
return rState.ended | ||
} // Have emitted 'end'. | ||
} | ||
// Have emitted 'end'. | ||
function isReadableFinished(stream, strict) { | ||
@@ -123,3 +121,2 @@ if (!isReadableNodeStream(stream)) return null | ||
} | ||
function isReadable(stream) { | ||
@@ -131,3 +128,2 @@ if (stream && stream[kIsReadable] != null) return stream[kIsReadable] | ||
} | ||
function isWritable(stream) { | ||
@@ -138,3 +134,2 @@ if (typeof (stream === null || stream === undefined ? undefined : stream.writable) !== 'boolean') return null | ||
} | ||
function isFinished(stream, opts) { | ||
@@ -144,29 +139,21 @@ if (!isNodeStream(stream)) { | ||
} | ||
if (isDestroyed(stream)) { | ||
return true | ||
} | ||
if ((opts === null || opts === undefined ? undefined : opts.readable) !== false && isReadable(stream)) { | ||
return false | ||
} | ||
if ((opts === null || opts === undefined ? undefined : opts.writable) !== false && isWritable(stream)) { | ||
return false | ||
} | ||
return true | ||
} | ||
function isWritableErrored(stream) { | ||
var _stream$_writableStat, _stream$_writableStat2 | ||
if (!isNodeStream(stream)) { | ||
return null | ||
} | ||
if (stream.writableErrored) { | ||
return stream.writableErrored | ||
} | ||
return (_stream$_writableStat = | ||
@@ -179,14 +166,10 @@ (_stream$_writableStat2 = stream._writableState) === null || _stream$_writableStat2 === undefined | ||
} | ||
function isReadableErrored(stream) { | ||
var _stream$_readableStat, _stream$_readableStat2 | ||
if (!isNodeStream(stream)) { | ||
return null | ||
} | ||
if (stream.readableErrored) { | ||
return stream.readableErrored | ||
} | ||
return (_stream$_readableStat = | ||
@@ -199,3 +182,2 @@ (_stream$_readableStat2 = stream._readableState) === null || _stream$_readableStat2 === undefined | ||
} | ||
function isClosed(stream) { | ||
@@ -205,10 +187,7 @@ if (!isNodeStream(stream)) { | ||
} | ||
if (typeof stream.closed === 'boolean') { | ||
return stream.closed | ||
} | ||
const wState = stream._writableState | ||
const rState = stream._readableState | ||
if ( | ||
@@ -223,10 +202,7 @@ typeof (wState === null || wState === undefined ? undefined : wState.closed) === 'boolean' || | ||
} | ||
if (typeof stream._closed === 'boolean' && isOutgoingMessage(stream)) { | ||
return stream._closed | ||
} | ||
return null | ||
} | ||
function isOutgoingMessage(stream) { | ||
@@ -240,10 +216,7 @@ return ( | ||
} | ||
function isServerResponse(stream) { | ||
return typeof stream._sent100 === 'boolean' && isOutgoingMessage(stream) | ||
} | ||
function isServerRequest(stream) { | ||
var _stream$req | ||
return ( | ||
@@ -256,3 +229,2 @@ typeof stream._consuming === 'boolean' && | ||
} | ||
function willEmitClose(stream) { | ||
@@ -267,6 +239,4 @@ if (!isNodeStream(stream)) return null | ||
} | ||
function isDisturbed(stream) { | ||
var _stream$kIsDisturbed | ||
return !!( | ||
@@ -279,3 +249,2 @@ stream && | ||
} | ||
function isErrored(stream) { | ||
@@ -292,3 +261,2 @@ var _ref, | ||
_stream$_writableStat4 | ||
return !!( | ||
@@ -324,3 +292,2 @@ stream && | ||
} | ||
module.exports = { | ||
@@ -327,0 +294,0 @@ kDestroyed, |
/* replacement start */ | ||
const process = require('process') | ||
const process = require('process/') | ||
/* replacement end */ | ||
@@ -24,2 +26,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// A bit simpler than readable streams. | ||
@@ -30,3 +33,2 @@ // Implement an async ._write(chunk, encoding, cb), and it'll handle all | ||
;('use strict') | ||
const { | ||
@@ -43,18 +45,10 @@ ArrayPrototypeSlice, | ||
} = require('../../ours/primordials') | ||
module.exports = Writable | ||
Writable.WritableState = WritableState | ||
const { EventEmitter: EE } = require('events') | ||
const Stream = require('./legacy').Stream | ||
const { Buffer } = require('buffer') | ||
const destroyImpl = require('./destroy') | ||
const { addAbortSignal } = require('./add-abort-signal') | ||
const { getHighWaterMark, getDefaultHighWaterMark } = require('./state') | ||
const { | ||
@@ -71,11 +65,7 @@ ERR_INVALID_ARG_TYPE, | ||
} = require('../../ours/errors').codes | ||
const { errorOrDestroy } = destroyImpl | ||
ObjectSetPrototypeOf(Writable.prototype, Stream.prototype) | ||
ObjectSetPrototypeOf(Writable, Stream) | ||
function nop() {} | ||
const kOnFinished = Symbol('kOnFinished') | ||
function WritableState(options, stream, isDuplex) { | ||
@@ -87,90 +77,114 @@ // Duplex streams are both readable and writable, but share | ||
// e.g. options.readableObjectMode vs. options.writableObjectMode, etc. | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex') // Object stream flag to indicate whether or not this stream | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex') | ||
// Object stream flag to indicate whether or not this stream | ||
// contains buffers or objects. | ||
this.objectMode = !!(options && options.objectMode) | ||
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.writableObjectMode) | ||
this.objectMode = !!(options && options.objectMode) | ||
if (isDuplex) this.objectMode = this.objectMode || !!(options && options.writableObjectMode) // The point at which write() starts returning false | ||
// The point at which write() starts returning false | ||
// Note: 0 is a valid value, means that we always return false if | ||
// the entire buffer is not flushed immediately on write(). | ||
this.highWaterMark = options | ||
? getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) | ||
: getDefaultHighWaterMark(false) // if _final has been called. | ||
: getDefaultHighWaterMark(false) | ||
this.finalCalled = false // drain event flag. | ||
// if _final has been called. | ||
this.finalCalled = false | ||
this.needDrain = false // At the start of calling end() | ||
// drain event flag. | ||
this.needDrain = false | ||
// At the start of calling end() | ||
this.ending = false | ||
// When end() has been called, and returned. | ||
this.ended = false | ||
// When 'finish' is emitted. | ||
this.finished = false | ||
this.ending = false // When end() has been called, and returned. | ||
// Has it been destroyed | ||
this.destroyed = false | ||
this.ended = false // When 'finish' is emitted. | ||
this.finished = false // Has it been destroyed | ||
this.destroyed = false // Should we decode strings into buffers before passing to _write? | ||
// Should we decode strings into buffers before passing to _write? | ||
// this is here so that some node-core streams can optimize string | ||
// handling at a lower level. | ||
const noDecode = !!(options && options.decodeStrings === false) | ||
this.decodeStrings = !noDecode | ||
const noDecode = !!(options && options.decodeStrings === false) | ||
this.decodeStrings = !noDecode // Crypto is kind of old and crusty. Historically, its default string | ||
// Crypto is kind of old and crusty. Historically, its default string | ||
// encoding is 'binary' so we have to make this configurable. | ||
// Everything else in the universe uses 'utf8', though. | ||
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8' | ||
this.defaultEncoding = (options && options.defaultEncoding) || 'utf8' // Not an actual buffer we keep track of, but a measurement | ||
// Not an actual buffer we keep track of, but a measurement | ||
// of how much we're waiting to get pushed to some underlying | ||
// socket or file. | ||
this.length = 0 | ||
this.length = 0 // A flag to see when we're in the middle of a write. | ||
// A flag to see when we're in the middle of a write. | ||
this.writing = false | ||
this.writing = false // When true all writes will be buffered until .uncork() call. | ||
// When true all writes will be buffered until .uncork() call. | ||
this.corked = 0 | ||
this.corked = 0 // A flag to be able to tell if the onwrite cb is called immediately, | ||
// A flag to be able to tell if the onwrite cb is called immediately, | ||
// or on a later tick. We set this to true at first, because any | ||
// actions that shouldn't happen until "later" should generally also | ||
// not happen before the first write call. | ||
this.sync = true | ||
this.sync = true // A flag to know if we're processing previously buffered items, which | ||
// A flag to know if we're processing previously buffered items, which | ||
// may call the _write() callback in the same tick, so that we don't | ||
// end up in an overlapped onwrite situation. | ||
this.bufferProcessing = false | ||
this.bufferProcessing = false // The callback that's passed to _write(chunk, cb). | ||
// The callback that's passed to _write(chunk, cb). | ||
this.onwrite = onwrite.bind(undefined, stream) | ||
this.onwrite = onwrite.bind(undefined, stream) // The callback that the user supplies to write(chunk, encoding, cb). | ||
// The callback that the user supplies to write(chunk, encoding, cb). | ||
this.writecb = null | ||
this.writecb = null // The amount that is being written when _write is called. | ||
// The amount that is being written when _write is called. | ||
this.writelen = 0 | ||
this.writelen = 0 // Storage for data passed to the afterWrite() callback in case of | ||
// Storage for data passed to the afterWrite() callback in case of | ||
// synchronous _write() completion. | ||
this.afterWriteTickInfo = null | ||
resetBuffer(this) | ||
this.afterWriteTickInfo = null | ||
resetBuffer(this) // Number of pending user-supplied write callbacks | ||
// Number of pending user-supplied write callbacks | ||
// this must be 0 before 'finish' can be emitted. | ||
this.pendingcb = 0 | ||
this.pendingcb = 0 // Stream is still being constructed and cannot be | ||
// Stream is still being constructed and cannot be | ||
// destroyed until construction finished or failed. | ||
// Async construction is opt in, therefore we start as | ||
// constructed. | ||
this.constructed = true | ||
this.constructed = true // Emit prefinish if the only thing we're waiting for is _write cbs | ||
// Emit prefinish if the only thing we're waiting for is _write cbs | ||
// This is relevant for synchronous Transform streams. | ||
this.prefinished = false | ||
this.prefinished = false // True if the error was already emitted and should not be thrown again. | ||
// True if the error was already emitted and should not be thrown again. | ||
this.errorEmitted = false | ||
this.errorEmitted = false // Should close be emitted on destroy. Defaults to true. | ||
// Should close be emitted on destroy. Defaults to true. | ||
this.emitClose = !options || options.emitClose !== false | ||
this.emitClose = !options || options.emitClose !== false // Should .destroy() be called after 'finish' (and potentially 'end'). | ||
// Should .destroy() be called after 'finish' (and potentially 'end'). | ||
this.autoDestroy = !options || options.autoDestroy !== false | ||
this.autoDestroy = !options || options.autoDestroy !== false // Indicates whether the stream has errored. When true all write() calls | ||
// Indicates whether the stream has errored. When true all write() calls | ||
// should return false. This is needed since when autoDestroy | ||
// is disabled we need a way to tell whether the stream has failed. | ||
this.errored = null | ||
this.errored = null // Indicates whether the stream has finished destroying. | ||
// Indicates whether the stream has finished destroying. | ||
this.closed = false | ||
this.closed = false // True if close has been emitted or would have been emitted | ||
// True if close has been emitted or would have been emitted | ||
// depending on emitClose. | ||
this.closeEmitted = false | ||
this[kOnFinished] = [] | ||
} | ||
function resetBuffer(state) { | ||
@@ -182,10 +196,7 @@ state.buffered = [] | ||
} | ||
WritableState.prototype.getBuffer = function getBuffer() { | ||
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex) | ||
} | ||
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { | ||
__proto__: null, | ||
get() { | ||
@@ -195,3 +206,2 @@ return this.buffered.length - this.bufferedIndex | ||
}) | ||
function Writable(options) { | ||
@@ -201,12 +211,12 @@ // Writable ctor is applied to Duplexes, too. | ||
// would return false, as no `_writableState` property is attached. | ||
// Trying to use the custom `instanceof` for Writable here will also break the | ||
// Node.js LazyTransform implementation, which has a non-trivial getter for | ||
// `_writableState` that would lead to infinite recursion. | ||
// Checking for a Stream.Duplex instance is faster here instead of inside | ||
// the WritableState constructor, at least with V8 6.5. | ||
const isDuplex = this instanceof require('./duplex') | ||
if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) return new Writable(options) | ||
this._writableState = new WritableState(options, this, isDuplex) | ||
if (options) { | ||
@@ -220,15 +230,11 @@ if (typeof options.write === 'function') this._write = options.write | ||
} | ||
Stream.call(this, options) | ||
destroyImpl.construct(this, () => { | ||
const state = this._writableState | ||
if (!state.writing) { | ||
clearBuffer(this, state) | ||
} | ||
finishMaybe(this, state) | ||
}) | ||
} | ||
ObjectDefineProperty(Writable, SymbolHasInstance, { | ||
@@ -241,11 +247,10 @@ __proto__: null, | ||
} | ||
}) // Otherwise people can pipe Writable streams, which is just wrong. | ||
}) | ||
// Otherwise people can pipe Writable streams, which is just wrong. | ||
Writable.prototype.pipe = function () { | ||
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()) | ||
} | ||
function _write(stream, chunk, encoding, cb) { | ||
const state = stream._writableState | ||
if (typeof encoding === 'function') { | ||
@@ -259,3 +264,2 @@ cb = encoding | ||
} | ||
if (chunk === null) { | ||
@@ -278,5 +282,3 @@ throw new ERR_STREAM_NULL_VALUES() | ||
} | ||
let err | ||
if (state.ending) { | ||
@@ -287,3 +289,2 @@ err = new ERR_STREAM_WRITE_AFTER_END() | ||
} | ||
if (err) { | ||
@@ -294,18 +295,13 @@ process.nextTick(cb, err) | ||
} | ||
state.pendingcb++ | ||
return writeOrBuffer(stream, state, chunk, encoding, cb) | ||
} | ||
Writable.prototype.write = function (chunk, encoding, cb) { | ||
return _write(this, chunk, encoding, cb) === true | ||
} | ||
Writable.prototype.cork = function () { | ||
this._writableState.corked++ | ||
} | ||
Writable.prototype.uncork = function () { | ||
const state = this._writableState | ||
if (state.corked) { | ||
@@ -316,3 +312,2 @@ state.corked-- | ||
} | ||
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { | ||
@@ -324,14 +319,15 @@ // node::ParseEncoding() requires lower case. | ||
return this | ||
} // If we're already writing something, then just put this | ||
} | ||
// If we're already writing something, then just put this | ||
// in the queue, and wait our turn. Otherwise, call _write | ||
// If we return false, then we need a drain event, so set that flag. | ||
function writeOrBuffer(stream, state, chunk, encoding, callback) { | ||
const len = state.objectMode ? 1 : chunk.length | ||
state.length += len // stream._write resets state.length | ||
state.length += len | ||
const ret = state.length < state.highWaterMark // We must ensure that previous needDrain will not be reset to false. | ||
// stream._write resets state.length | ||
const ret = state.length < state.highWaterMark | ||
// We must ensure that previous needDrain will not be reset to false. | ||
if (!ret) state.needDrain = true | ||
if (state.writing || state.corked || state.errored || !state.constructed) { | ||
@@ -343,7 +339,5 @@ state.buffered.push({ | ||
}) | ||
if (state.allBuffers && encoding !== 'buffer') { | ||
state.allBuffers = false | ||
} | ||
if (state.allNoop && callback !== nop) { | ||
@@ -357,12 +351,10 @@ state.allNoop = false | ||
state.sync = true | ||
stream._write(chunk, encoding, state.onwrite) | ||
state.sync = false | ||
} | ||
state.sync = false | ||
} // Return false if errored or destroyed in order to break | ||
// Return false if errored or destroyed in order to break | ||
// any synchronous while(stream.write(data)) loops. | ||
return ret && !state.errored && !state.destroyed | ||
} | ||
function doWrite(stream, state, writev, len, chunk, encoding, cb) { | ||
@@ -378,15 +370,13 @@ state.writelen = len | ||
} | ||
function onwriteError(stream, state, er, cb) { | ||
--state.pendingcb | ||
cb(er) // Ensure callbacks are invoked even when autoDestroy is | ||
cb(er) | ||
// Ensure callbacks are invoked even when autoDestroy is | ||
// not enabled. Passing `er` here doesn't make sense since | ||
// it's related to one specific write, not to the buffered | ||
// writes. | ||
errorBuffer(state) // This can emit error, but error must always follow cb. | ||
errorBuffer(state) | ||
// This can emit error, but error must always follow cb. | ||
errorOrDestroy(stream, er) | ||
} | ||
function onwrite(stream, er) { | ||
@@ -396,3 +386,2 @@ const state = stream._writableState | ||
const cb = state.writecb | ||
if (typeof cb !== 'function') { | ||
@@ -402,3 +391,2 @@ errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()) | ||
} | ||
state.writing = false | ||
@@ -408,3 +396,2 @@ state.writecb = null | ||
state.writelen = 0 | ||
if (er) { | ||
@@ -416,9 +403,9 @@ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 | ||
state.errored = er | ||
} // In case of duplex streams we need to notify the readable side of the | ||
} | ||
// In case of duplex streams we need to notify the readable side of the | ||
// error. | ||
if (stream._readableState && !stream._readableState.errored) { | ||
stream._readableState.errored = er | ||
} | ||
if (sync) { | ||
@@ -433,3 +420,2 @@ process.nextTick(onwriteError, stream, state, er, cb) | ||
} | ||
if (sync) { | ||
@@ -456,3 +442,2 @@ // It is a common case that the callback passed to .write() is always | ||
} | ||
function afterWriteTick({ stream, state, count, cb }) { | ||
@@ -462,6 +447,4 @@ state.afterWriteTickInfo = null | ||
} | ||
function afterWrite(stream, state, count, cb) { | ||
const needDrain = !state.ending && !stream.destroyed && state.length === 0 && state.needDrain | ||
if (needDrain) { | ||
@@ -471,3 +454,2 @@ state.needDrain = false | ||
} | ||
while (count-- > 0) { | ||
@@ -477,10 +459,9 @@ state.pendingcb-- | ||
} | ||
if (state.destroyed) { | ||
errorBuffer(state) | ||
} | ||
finishMaybe(stream, state) | ||
} // If there's something in the buffer waiting, then invoke callbacks. | ||
} | ||
// If there's something in the buffer waiting, then invoke callbacks. | ||
function errorBuffer(state) { | ||
@@ -490,6 +471,4 @@ if (state.writing) { | ||
} | ||
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { | ||
var _state$errored | ||
const { chunk, callback } = state.buffered[n] | ||
@@ -504,8 +483,5 @@ const len = state.objectMode ? 1 : chunk.length | ||
} | ||
const onfinishCallbacks = state[kOnFinished].splice(0) | ||
for (let i = 0; i < onfinishCallbacks.length; i++) { | ||
var _state$errored2 | ||
onfinishCallbacks[i]( | ||
@@ -517,6 +493,6 @@ (_state$errored2 = state.errored) !== null && _state$errored2 !== undefined | ||
} | ||
resetBuffer(state) | ||
} // If there's something in the buffer waiting, then process it. | ||
} | ||
// If there's something in the buffer waiting, then process it. | ||
function clearBuffer(stream, state) { | ||
@@ -526,13 +502,9 @@ if (state.corked || state.bufferProcessing || state.destroyed || !state.constructed) { | ||
} | ||
const { buffered, bufferedIndex, objectMode } = state | ||
const bufferedLength = buffered.length - bufferedIndex | ||
if (!bufferedLength) { | ||
return | ||
} | ||
let i = bufferedIndex | ||
state.bufferProcessing = true | ||
if (bufferedLength > 1 && stream._writev) { | ||
@@ -546,5 +518,5 @@ state.pendingcb -= bufferedLength - 1 | ||
} | ||
} // Make a copy of `buffered` if it's going to be used by `callback` above, | ||
} | ||
// Make a copy of `buffered` if it's going to be used by `callback` above, | ||
// since `doWrite` will mutate the array. | ||
const chunks = state.allNoop && i === 0 ? buffered : ArrayPrototypeSlice(buffered, i) | ||
@@ -561,3 +533,2 @@ chunks.allBuffers = state.allBuffers | ||
} while (i < buffered.length && !state.writing) | ||
if (i === buffered.length) { | ||
@@ -572,6 +543,4 @@ resetBuffer(state) | ||
} | ||
state.bufferProcessing = false | ||
} | ||
Writable.prototype._write = function (chunk, encoding, cb) { | ||
@@ -592,8 +561,5 @@ if (this._writev) { | ||
} | ||
Writable.prototype._writev = null | ||
Writable.prototype.end = function (chunk, encoding, cb) { | ||
const state = this._writableState | ||
if (typeof chunk === 'function') { | ||
@@ -607,13 +573,11 @@ cb = chunk | ||
} | ||
let err | ||
if (chunk !== null && chunk !== undefined) { | ||
const ret = _write(this, chunk, encoding) | ||
if (ret instanceof Error) { | ||
err = ret | ||
} | ||
} // .end() fully uncorks. | ||
} | ||
// .end() fully uncorks. | ||
if (state.corked) { | ||
@@ -623,3 +587,2 @@ state.corked = 1 | ||
} | ||
if (err) { | ||
@@ -633,2 +596,3 @@ // Do nothing... | ||
// or not. | ||
state.ending = true | ||
@@ -642,3 +606,2 @@ finishMaybe(this, state, true) | ||
} | ||
if (typeof cb === 'function') { | ||
@@ -651,6 +614,4 @@ if (err || state.finished) { | ||
} | ||
return this | ||
} | ||
function needFinish(state) { | ||
@@ -670,6 +631,4 @@ return ( | ||
} | ||
function callFinal(stream, state) { | ||
let called = false | ||
function onFinish(err) { | ||
@@ -680,20 +639,16 @@ if (called) { | ||
} | ||
called = true | ||
state.pendingcb-- | ||
if (err) { | ||
const onfinishCallbacks = state[kOnFinished].splice(0) | ||
for (let i = 0; i < onfinishCallbacks.length; i++) { | ||
onfinishCallbacks[i](err) | ||
} | ||
errorOrDestroy(stream, err, state.sync) | ||
} else if (needFinish(state)) { | ||
state.prefinished = true | ||
stream.emit('prefinish') // Backwards compat. Don't check state.sync here. | ||
stream.emit('prefinish') | ||
// Backwards compat. Don't check state.sync here. | ||
// Some streams assume 'finish' will be emitted | ||
// asynchronously relative to _final callback. | ||
state.pendingcb++ | ||
@@ -703,6 +658,4 @@ process.nextTick(finish, stream, state) | ||
} | ||
state.sync = true | ||
state.pendingcb++ | ||
try { | ||
@@ -713,6 +666,4 @@ stream._final(onFinish) | ||
} | ||
state.sync = false | ||
} | ||
function prefinish(stream, state) { | ||
@@ -729,7 +680,5 @@ if (!state.prefinished && !state.finalCalled) { | ||
} | ||
function finishMaybe(stream, state, sync) { | ||
if (needFinish(state)) { | ||
prefinish(stream, state) | ||
if (state.pendingcb === 0) { | ||
@@ -756,3 +705,2 @@ if (sync) { | ||
} | ||
function finish(stream, state) { | ||
@@ -762,9 +710,6 @@ state.pendingcb-- | ||
const onfinishCallbacks = state[kOnFinished].splice(0) | ||
for (let i = 0; i < onfinishCallbacks.length; i++) { | ||
onfinishCallbacks[i]() | ||
} | ||
stream.emit('finish') | ||
if (state.autoDestroy) { | ||
@@ -776,6 +721,6 @@ // In case of duplex streams we need a way to detect | ||
!rState || | ||
(rState.autoDestroy && // We don't expect the readable to ever 'end' | ||
(rState.autoDestroy && | ||
// We don't expect the readable to ever 'end' | ||
// if readable is explicitly set to false. | ||
(rState.endEmitted || rState.readable === false)) | ||
if (autoDestroy) { | ||
@@ -786,7 +731,5 @@ stream.destroy() | ||
} | ||
ObjectDefineProperties(Writable.prototype, { | ||
closed: { | ||
__proto__: null, | ||
get() { | ||
@@ -798,7 +741,5 @@ return this._writableState ? this._writableState.closed : false | ||
__proto__: null, | ||
get() { | ||
return this._writableState ? this._writableState.destroyed : false | ||
}, | ||
set(value) { | ||
@@ -813,12 +754,10 @@ // Backward compatibility, the user is explicitly managing destroyed. | ||
__proto__: null, | ||
get() { | ||
const w = this._writableState // w.writable === false means that this is part of a Duplex stream | ||
const w = this._writableState | ||
// w.writable === false means that this is part of a Duplex stream | ||
// where the writable side was disabled upon construction. | ||
// Compat. The user might manually disable writable side through | ||
// deprecated setter. | ||
return !!w && w.writable !== false && !w.destroyed && !w.errored && !w.ending && !w.ended | ||
}, | ||
set(val) { | ||
@@ -833,3 +772,2 @@ // Backwards compatible. | ||
__proto__: null, | ||
get() { | ||
@@ -841,3 +779,2 @@ return this._writableState ? this._writableState.finished : false | ||
__proto__: null, | ||
get() { | ||
@@ -849,3 +786,2 @@ return this._writableState ? this._writableState.objectMode : false | ||
__proto__: null, | ||
get() { | ||
@@ -857,3 +793,2 @@ return this._writableState && this._writableState.getBuffer() | ||
__proto__: null, | ||
get() { | ||
@@ -865,3 +800,2 @@ return this._writableState ? this._writableState.ending : false | ||
__proto__: null, | ||
get() { | ||
@@ -875,3 +809,2 @@ const wState = this._writableState | ||
__proto__: null, | ||
get() { | ||
@@ -883,3 +816,2 @@ return this._writableState && this._writableState.highWaterMark | ||
__proto__: null, | ||
get() { | ||
@@ -891,3 +823,2 @@ return this._writableState ? this._writableState.corked : 0 | ||
__proto__: null, | ||
get() { | ||
@@ -900,3 +831,2 @@ return this._writableState && this._writableState.length | ||
enumerable: false, | ||
get() { | ||
@@ -919,26 +849,22 @@ return this._writableState ? this._writableState.errored : null | ||
const destroy = destroyImpl.destroy | ||
Writable.prototype.destroy = function (err, cb) { | ||
const state = this._writableState // Invoke pending callbacks. | ||
const state = this._writableState | ||
// Invoke pending callbacks. | ||
if (!state.destroyed && (state.bufferedIndex < state.buffered.length || state[kOnFinished].length)) { | ||
process.nextTick(errorBuffer, state) | ||
} | ||
destroy.call(this, err, cb) | ||
return this | ||
} | ||
Writable.prototype._undestroy = destroyImpl.undestroy | ||
Writable.prototype._destroy = function (err, cb) { | ||
cb(err) | ||
} | ||
Writable.prototype[EE.captureRejectionSymbol] = function (err) { | ||
this.destroy(err) | ||
} | ||
let webStreamsAdapters | ||
let webStreamsAdapters // Lazy to avoid circular references | ||
// Lazy to avoid circular references | ||
function lazyWebStreams() { | ||
@@ -948,9 +874,7 @@ if (webStreamsAdapters === undefined) webStreamsAdapters = {} | ||
} | ||
Writable.fromWeb = function (writableStream, options) { | ||
return lazyWebStreams().newStreamWritableFromWritableStream(writableStream, options) | ||
} | ||
Writable.toWeb = function (streamWritable) { | ||
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable) | ||
} |
@@ -19,3 +19,2 @@ 'use strict' | ||
} = require('../ours/primordials') | ||
const { | ||
@@ -25,8 +24,6 @@ hideStackFrames, | ||
} = require('../ours/errors') | ||
const { normalizeEncoding } = require('../ours/util') | ||
const { isAsyncFunction, isArrayBufferView } = require('../ours/util').types | ||
const signals = {} | ||
const signals = {} | ||
/** | ||
@@ -36,6 +33,6 @@ * @param {*} value | ||
*/ | ||
function isInt32(value) { | ||
return value === (value | 0) | ||
} | ||
/** | ||
@@ -45,9 +42,8 @@ * @param {*} value | ||
*/ | ||
function isUint32(value) { | ||
return value === value >>> 0 | ||
} | ||
const octalReg = /^[0-7]+$/ | ||
const modeDesc = 'must be a 32-bit unsigned integer or an octal string' | ||
/** | ||
@@ -65,3 +61,2 @@ * Parse and validate values that will be converted into mode_t (the S_* | ||
*/ | ||
function parseFileMode(value, name, def) { | ||
@@ -71,3 +66,2 @@ if (typeof value === 'undefined') { | ||
} | ||
if (typeof value === 'string') { | ||
@@ -77,9 +71,8 @@ if (RegExpPrototypeExec(octalReg, value) === null) { | ||
} | ||
value = NumberParseInt(value, 8) | ||
} | ||
validateUint32(value, name) | ||
return value | ||
} | ||
/** | ||
@@ -95,3 +88,2 @@ * @callback validateInteger | ||
/** @type {validateInteger} */ | ||
const validateInteger = hideStackFrames((value, name, min = NumberMIN_SAFE_INTEGER, max = NumberMAX_SAFE_INTEGER) => { | ||
@@ -102,2 +94,3 @@ if (typeof value !== 'number') throw new ERR_INVALID_ARG_TYPE(name, 'number', value) | ||
}) | ||
/** | ||
@@ -113,3 +106,2 @@ * @callback validateInt32 | ||
/** @type {validateInt32} */ | ||
const validateInt32 = hideStackFrames((value, name, min = -2147483648, max = 2147483647) => { | ||
@@ -120,7 +112,5 @@ // The defaults for min and max correspond to the limits of 32-bit integers. | ||
} | ||
if (!NumberIsInteger(value)) { | ||
throw new ERR_OUT_OF_RANGE(name, 'an integer', value) | ||
} | ||
if (value < min || value > max) { | ||
@@ -130,2 +120,3 @@ throw new ERR_OUT_OF_RANGE(name, `>= ${min} && <= ${max}`, value) | ||
}) | ||
/** | ||
@@ -140,3 +131,2 @@ * @callback validateUint32 | ||
/** @type {validateUint32} */ | ||
const validateUint32 = hideStackFrames((value, name, positive = false) => { | ||
@@ -146,11 +136,8 @@ if (typeof value !== 'number') { | ||
} | ||
if (!NumberIsInteger(value)) { | ||
throw new ERR_OUT_OF_RANGE(name, 'an integer', value) | ||
} | ||
const min = positive ? 1 : 0 // 2 ** 32 === 4294967296 | ||
const max = 4_294_967_295 | ||
const min = positive ? 1 : 0 | ||
// 2 ** 32 === 4294967296 | ||
const max = 4294967295 | ||
if (value < min || value > max) { | ||
@@ -160,2 +147,3 @@ throw new ERR_OUT_OF_RANGE(name, `>= ${min} && <= ${max}`, value) | ||
}) | ||
/** | ||
@@ -169,6 +157,6 @@ * @callback validateString | ||
/** @type {validateString} */ | ||
function validateString(value, name) { | ||
if (typeof value !== 'string') throw new ERR_INVALID_ARG_TYPE(name, 'string', value) | ||
} | ||
/** | ||
@@ -184,6 +172,4 @@ * @callback validateNumber | ||
/** @type {validateNumber} */ | ||
function validateNumber(value, name, min = undefined, max) { | ||
if (typeof value !== 'number') throw new ERR_INVALID_ARG_TYPE(name, 'number', value) | ||
if ( | ||
@@ -201,2 +187,3 @@ (min != null && value < min) || | ||
} | ||
/** | ||
@@ -211,3 +198,2 @@ * @callback validateOneOf | ||
/** @type {validateOneOf} */ | ||
const validateOneOf = hideStackFrames((value, name, oneOf) => { | ||
@@ -223,2 +209,3 @@ if (!ArrayPrototypeIncludes(oneOf, value)) { | ||
}) | ||
/** | ||
@@ -232,10 +219,9 @@ * @callback validateBoolean | ||
/** @type {validateBoolean} */ | ||
function validateBoolean(value, name) { | ||
if (typeof value !== 'boolean') throw new ERR_INVALID_ARG_TYPE(name, 'boolean', value) | ||
} | ||
function getOwnPropertyValueOrDefault(options, key, defaultValue) { | ||
return options == null || !ObjectPrototypeHasOwnProperty(options, key) ? defaultValue : options[key] | ||
} | ||
/** | ||
@@ -253,3 +239,2 @@ * @callback validateObject | ||
/** @type {validateObject} */ | ||
const validateObject = hideStackFrames((value, name, options = null) => { | ||
@@ -259,3 +244,2 @@ const allowArray = getOwnPropertyValueOrDefault(options, 'allowArray', false) | ||
const nullable = getOwnPropertyValueOrDefault(options, 'nullable', false) | ||
if ( | ||
@@ -269,2 +253,3 @@ (!nullable && value === null) || | ||
}) | ||
/** | ||
@@ -279,3 +264,2 @@ * @callback validateArray | ||
/** @type {validateArray} */ | ||
const validateArray = hideStackFrames((value, name, minLength = 0) => { | ||
@@ -285,3 +269,2 @@ if (!ArrayIsArray(value)) { | ||
} | ||
if (value.length < minLength) { | ||
@@ -291,4 +274,5 @@ const reason = `must be longer than ${minLength}` | ||
} | ||
}) // eslint-disable-next-line jsdoc/require-returns-check | ||
}) | ||
// eslint-disable-next-line jsdoc/require-returns-check | ||
/** | ||
@@ -299,6 +283,4 @@ * @param {*} signal | ||
*/ | ||
function validateSignalName(signal, name = 'signal') { | ||
validateString(signal, name) | ||
if (signals[signal] === undefined) { | ||
@@ -308,6 +290,6 @@ if (signals[StringPrototypeToUpperCase(signal)] !== undefined) { | ||
} | ||
throw new ERR_UNKNOWN_SIGNAL(signal) | ||
} | ||
} | ||
/** | ||
@@ -321,3 +303,2 @@ * @callback validateBuffer | ||
/** @type {validateBuffer} */ | ||
const validateBuffer = hideStackFrames((buffer, name = 'buffer') => { | ||
@@ -328,2 +309,3 @@ if (!isArrayBufferView(buffer)) { | ||
}) | ||
/** | ||
@@ -333,7 +315,5 @@ * @param {string} data | ||
*/ | ||
function validateEncoding(data, encoding) { | ||
const normalizedEncoding = normalizeEncoding(encoding) | ||
const length = data.length | ||
if (normalizedEncoding === 'hex' && length % 2 !== 0) { | ||
@@ -343,2 +323,3 @@ throw new ERR_INVALID_ARG_VALUE('encoding', encoding, `is invalid for data of length ${length}`) | ||
} | ||
/** | ||
@@ -352,3 +333,2 @@ * Check that the port number is not NaN when coerced to a number, | ||
*/ | ||
function validatePort(port, name = 'Port', allowZero = true) { | ||
@@ -364,5 +344,5 @@ if ( | ||
} | ||
return port | 0 | ||
} | ||
/** | ||
@@ -375,3 +355,2 @@ * @callback validateAbortSignal | ||
/** @type {validateAbortSignal} */ | ||
const validateAbortSignal = hideStackFrames((signal, name) => { | ||
@@ -382,2 +361,3 @@ if (signal !== undefined && (signal === null || typeof signal !== 'object' || !('aborted' in signal))) { | ||
}) | ||
/** | ||
@@ -391,6 +371,6 @@ * @callback validateFunction | ||
/** @type {validateFunction} */ | ||
const validateFunction = hideStackFrames((value, name) => { | ||
if (typeof value !== 'function') throw new ERR_INVALID_ARG_TYPE(name, 'Function', value) | ||
}) | ||
/** | ||
@@ -404,6 +384,6 @@ * @callback validatePlainFunction | ||
/** @type {validatePlainFunction} */ | ||
const validatePlainFunction = hideStackFrames((value, name) => { | ||
if (typeof value !== 'function' || isAsyncFunction(value)) throw new ERR_INVALID_ARG_TYPE(name, 'Function', value) | ||
}) | ||
/** | ||
@@ -417,6 +397,6 @@ * @callback validateUndefined | ||
/** @type {validateUndefined} */ | ||
const validateUndefined = hideStackFrames((value, name) => { | ||
if (value !== undefined) throw new ERR_INVALID_ARG_TYPE(name, 'undefined', value) | ||
}) | ||
/** | ||
@@ -428,3 +408,2 @@ * @template T | ||
*/ | ||
function validateUnion(value, name, union) { | ||
@@ -435,3 +414,2 @@ if (!ArrayPrototypeIncludes(union, value)) { | ||
} | ||
module.exports = { | ||
@@ -438,0 +416,0 @@ isInt32, |
'use strict' | ||
const CustomStream = require('../stream') | ||
const promises = require('../stream/promises') | ||
const originalDestroy = CustomStream.Readable.destroy | ||
module.exports = CustomStream.Readable // Explicit export naming is needed for ESM | ||
module.exports = CustomStream.Readable | ||
// Explicit export naming is needed for ESM | ||
module.exports._uint8ArrayToBuffer = CustomStream._uint8ArrayToBuffer | ||
@@ -29,3 +28,2 @@ module.exports._isUint8Array = CustomStream._isUint8Array | ||
enumerable: true, | ||
get() { | ||
@@ -35,4 +33,5 @@ return promises | ||
}) | ||
module.exports.Stream = CustomStream.Stream // Allow default importing | ||
module.exports.Stream = CustomStream.Stream | ||
// Allow default importing | ||
module.exports.default = module.exports |
'use strict' | ||
const { format, inspect, AggregateError: CustomAggregateError } = require('./util') | ||
/* | ||
@@ -19,3 +20,4 @@ This file is a reduced and adapted version of the main lib/internal/errors.js file defined at | ||
'number', | ||
'object', // Accept 'Function' and 'Object' as alternative to the lower cased version. | ||
'object', | ||
// Accept 'Function' and 'Object' as alternative to the lower cased version. | ||
'Function', | ||
@@ -30,3 +32,2 @@ 'Object', | ||
const codes = {} | ||
function assert(value, message) { | ||
@@ -36,4 +37,5 @@ if (!value) { | ||
} | ||
} // Only use this for integers! Decimal numbers do not work with this function. | ||
} | ||
// Only use this for integers! Decimal numbers do not work with this function. | ||
function addNumericalSeparator(val) { | ||
@@ -43,14 +45,12 @@ let res = '' | ||
const start = val[0] === '-' ? 1 : 0 | ||
for (; i >= start + 4; i -= 3) { | ||
res = `_${val.slice(i - 3, i)}${res}` | ||
} | ||
return `${val.slice(0, i)}${res}` | ||
} | ||
function getMessage(key, msg, args) { | ||
if (typeof msg === 'function') { | ||
assert( | ||
msg.length <= args.length, // Default options do not count. | ||
msg.length <= args.length, | ||
// Default options do not count. | ||
`Code: ${key}; The provided arguments length (${args.length}) does not match the required ones (${msg.length}).` | ||
@@ -60,3 +60,2 @@ ) | ||
} | ||
const expectedLength = (msg.match(/%[dfijoOs]/g) || []).length | ||
@@ -67,10 +66,7 @@ assert( | ||
) | ||
if (args.length === 0) { | ||
return msg | ||
} | ||
return format(msg, ...args) | ||
} | ||
function E(code, message, Base) { | ||
@@ -80,3 +76,2 @@ if (!Base) { | ||
} | ||
class NodeError extends Base { | ||
@@ -86,3 +81,2 @@ constructor(...args) { | ||
} | ||
toString() { | ||
@@ -92,3 +86,2 @@ return `${this.name} [${code}]: ${this.message}` | ||
} | ||
Object.defineProperties(NodeError.prototype, { | ||
@@ -105,3 +98,2 @@ name: { | ||
}, | ||
writable: true, | ||
@@ -116,3 +108,2 @@ enumerable: false, | ||
} | ||
function hideStackFrames(fn) { | ||
@@ -127,3 +118,2 @@ // We rename the functions that will be hidden to cut off the stacktrace | ||
} | ||
function aggregateTwoErrors(innerError, outerError) { | ||
@@ -136,3 +126,2 @@ if (innerError && outerError && innerError !== outerError) { | ||
} | ||
const err = new AggregateError([outerError, innerError], outerError.message) | ||
@@ -142,6 +131,4 @@ err.code = outerError.code | ||
} | ||
return innerError || outerError | ||
} | ||
class AbortError extends Error { | ||
@@ -152,3 +139,2 @@ constructor(message = 'The operation was aborted', options = undefined) { | ||
} | ||
super(message, options) | ||
@@ -159,3 +145,2 @@ this.code = 'ABORT_ERR' | ||
} | ||
E('ERR_ASSERTION', '%s', Error) | ||
@@ -166,9 +151,6 @@ E( | ||
assert(typeof name === 'string', "'name' must be a string") | ||
if (!Array.isArray(expected)) { | ||
expected = [expected] | ||
} | ||
let msg = 'The ' | ||
if (name.endsWith(' argument')) { | ||
@@ -180,3 +162,2 @@ // For cases like 'first argument' | ||
} | ||
msg += 'must be ' | ||
@@ -186,6 +167,4 @@ const types = [] | ||
const other = [] | ||
for (const value of expected) { | ||
assert(typeof value === 'string', 'All expected entries have to be of type string') | ||
if (kTypes.includes(value)) { | ||
@@ -199,8 +178,8 @@ types.push(value.toLowerCase()) | ||
} | ||
} // Special handle `object` in case other instances are allowed to outline | ||
} | ||
// Special handle `object` in case other instances are allowed to outline | ||
// the differences between each other. | ||
if (instances.length > 0) { | ||
const pos = types.indexOf('object') | ||
if (pos !== -1) { | ||
@@ -211,3 +190,2 @@ types.splice(types, pos, 1) | ||
} | ||
if (types.length > 0) { | ||
@@ -218,7 +196,5 @@ switch (types.length) { | ||
break | ||
case 2: | ||
msg += `one of type ${types[0]} or ${types[1]}` | ||
break | ||
default: { | ||
@@ -229,3 +205,2 @@ const last = types.pop() | ||
} | ||
if (instances.length > 0 || other.length > 0) { | ||
@@ -235,3 +210,2 @@ msg += ' or ' | ||
} | ||
if (instances.length > 0) { | ||
@@ -242,7 +216,5 @@ switch (instances.length) { | ||
break | ||
case 2: | ||
msg += `an instance of ${instances[0]} or ${instances[1]}` | ||
break | ||
default: { | ||
@@ -253,3 +225,2 @@ const last = instances.pop() | ||
} | ||
if (other.length > 0) { | ||
@@ -259,7 +230,5 @@ msg += ' or ' | ||
} | ||
switch (other.length) { | ||
case 0: | ||
break | ||
case 1: | ||
@@ -269,10 +238,7 @@ if (other[0].toLowerCase() !== other[0]) { | ||
} | ||
msg += `${other[0]}` | ||
break | ||
case 2: | ||
msg += `one of ${other[0]} or ${other[1]}` | ||
break | ||
default: { | ||
@@ -283,3 +249,2 @@ const last = other.pop() | ||
} | ||
if (actual == null) { | ||
@@ -291,3 +256,2 @@ msg += `. Received ${actual}` | ||
var _actual$constructor | ||
if ( | ||
@@ -309,10 +273,7 @@ (_actual$constructor = actual.constructor) !== null && | ||
}) | ||
if (inspected.length > 25) { | ||
inspected = `${inspected.slice(0, 25)}...` | ||
} | ||
msg += `. Received type ${typeof actual} (${inspected})` | ||
} | ||
return msg | ||
@@ -326,7 +287,5 @@ }, | ||
let inspected = inspect(value) | ||
if (inspected.length > 128) { | ||
inspected = inspected.slice(0, 128) + '...' | ||
} | ||
const type = name.includes('.') ? 'property' : 'argument' | ||
@@ -341,3 +300,2 @@ return `The ${type} '${name}' ${reason}. Received ${inspected}` | ||
var _value$constructor | ||
const type = | ||
@@ -362,3 +320,2 @@ value !== null && | ||
args = (Array.isArray(args) ? args : [args]).map((a) => `"${a}"`).join(' or ') | ||
switch (len) { | ||
@@ -368,7 +325,5 @@ case 1: | ||
break | ||
case 2: | ||
msg += `The ${args[0]} and ${args[1]} arguments` | ||
break | ||
default: | ||
@@ -381,3 +336,2 @@ { | ||
} | ||
return `${msg} must be specified` | ||
@@ -392,3 +346,2 @@ }, | ||
let received | ||
if (Number.isInteger(input) && Math.abs(input) > 2 ** 32) { | ||
@@ -398,7 +351,5 @@ received = addNumericalSeparator(String(input)) | ||
received = String(input) | ||
if (input > 2n ** 32n || input < -(2n ** 32n)) { | ||
received = addNumericalSeparator(received) | ||
} | ||
received += 'n' | ||
@@ -408,3 +359,2 @@ } else { | ||
} | ||
return `The value of "${str}" is out of range. It must be ${range}. Received ${received}` | ||
@@ -411,0 +361,0 @@ }, |
'use strict' | ||
const Stream = require('stream') | ||
if (Stream && process.env.READABLE_STREAM === 'disable') { | ||
const promises = Stream.promises // Explicit export naming is needed for ESM | ||
const promises = Stream.promises | ||
// Explicit export naming is needed for ESM | ||
module.exports._uint8ArrayToBuffer = Stream._uint8ArrayToBuffer | ||
@@ -26,3 +26,2 @@ module.exports._isUint8Array = Stream._isUint8Array | ||
enumerable: true, | ||
get() { | ||
@@ -35,8 +34,7 @@ return promises | ||
const CustomStream = require('../stream') | ||
const promises = require('../stream/promises') | ||
const originalDestroy = CustomStream.Readable.destroy | ||
module.exports = CustomStream.Readable // Explicit export naming is needed for ESM | ||
module.exports = CustomStream.Readable | ||
// Explicit export naming is needed for ESM | ||
module.exports._uint8ArrayToBuffer = CustomStream._uint8ArrayToBuffer | ||
@@ -61,3 +59,2 @@ module.exports._isUint8Array = CustomStream._isUint8Array | ||
enumerable: true, | ||
get() { | ||
@@ -68,4 +65,5 @@ return promises | ||
module.exports.Stream = CustomStream.Stream | ||
} // Allow default importing | ||
} | ||
// Allow default importing | ||
module.exports.default = module.exports |
'use strict' | ||
/* | ||
@@ -9,3 +10,2 @@ This file is a reduced and adapted version of the main lib/internal/per_context/primordials.js file defined at | ||
*/ | ||
module.exports = { | ||
@@ -15,41 +15,30 @@ ArrayIsArray(self) { | ||
}, | ||
ArrayPrototypeIncludes(self, el) { | ||
return self.includes(el) | ||
}, | ||
ArrayPrototypeIndexOf(self, el) { | ||
return self.indexOf(el) | ||
}, | ||
ArrayPrototypeJoin(self, sep) { | ||
return self.join(sep) | ||
}, | ||
ArrayPrototypeMap(self, fn) { | ||
return self.map(fn) | ||
}, | ||
ArrayPrototypePop(self, el) { | ||
return self.pop(el) | ||
}, | ||
ArrayPrototypePush(self, el) { | ||
return self.push(el) | ||
}, | ||
ArrayPrototypeSlice(self, start, end) { | ||
return self.slice(start, end) | ||
}, | ||
Error, | ||
FunctionPrototypeCall(fn, thisArgs, ...args) { | ||
return fn.call(thisArgs, ...args) | ||
}, | ||
FunctionPrototypeSymbolHasInstance(self, instance) { | ||
return Function.prototype[Symbol.hasInstance].call(self, instance) | ||
}, | ||
MathFloor: Math.floor, | ||
@@ -62,62 +51,45 @@ Number, | ||
NumberParseInt: Number.parseInt, | ||
ObjectDefineProperties(self, props) { | ||
return Object.defineProperties(self, props) | ||
}, | ||
ObjectDefineProperty(self, name, prop) { | ||
return Object.defineProperty(self, name, prop) | ||
}, | ||
ObjectGetOwnPropertyDescriptor(self, name) { | ||
return Object.getOwnPropertyDescriptor(self, name) | ||
}, | ||
ObjectKeys(obj) { | ||
return Object.keys(obj) | ||
}, | ||
ObjectSetPrototypeOf(target, proto) { | ||
return Object.setPrototypeOf(target, proto) | ||
}, | ||
Promise, | ||
PromisePrototypeCatch(self, fn) { | ||
return self.catch(fn) | ||
}, | ||
PromisePrototypeThen(self, thenFn, catchFn) { | ||
return self.then(thenFn, catchFn) | ||
}, | ||
PromiseReject(err) { | ||
return Promise.reject(err) | ||
}, | ||
ReflectApply: Reflect.apply, | ||
RegExpPrototypeTest(self, value) { | ||
return self.test(value) | ||
}, | ||
SafeSet: Set, | ||
String, | ||
StringPrototypeSlice(self, start, end) { | ||
return self.slice(start, end) | ||
}, | ||
StringPrototypeToLowerCase(self) { | ||
return self.toLowerCase() | ||
}, | ||
StringPrototypeToUpperCase(self) { | ||
return self.toUpperCase() | ||
}, | ||
StringPrototypeTrim(self) { | ||
return self.trim() | ||
}, | ||
Symbol, | ||
@@ -127,8 +99,6 @@ SymbolAsyncIterator: Symbol.asyncIterator, | ||
SymbolIterator: Symbol.iterator, | ||
TypedArrayPrototypeSet(self, buf, len) { | ||
return self.set(buf, len) | ||
}, | ||
Uint8Array | ||
} |
'use strict' | ||
const bufferModule = require('buffer') | ||
const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor | ||
const Blob = globalThis.Blob || bufferModule.Blob | ||
/* eslint-disable indent */ | ||
const isBlob = | ||
@@ -19,4 +17,4 @@ typeof Blob !== 'undefined' | ||
/* eslint-enable indent */ | ||
// This is a simplified version of AggregateError | ||
class AggregateError extends Error { | ||
@@ -27,9 +25,6 @@ constructor(errors) { | ||
} | ||
let message = '' | ||
for (let i = 0; i < errors.length; i++) { | ||
message += ` ${errors[i].stack}\n` | ||
} | ||
super(message) | ||
@@ -40,7 +35,5 @@ this.name = 'AggregateError' | ||
} | ||
module.exports = { | ||
AggregateError, | ||
kEmptyObject: Object.freeze({}), | ||
once(callback) { | ||
@@ -52,3 +45,2 @@ let called = false | ||
} | ||
called = true | ||
@@ -58,7 +50,7 @@ callback.apply(this, args) | ||
}, | ||
createDeferredPromise: function () { | ||
let resolve | ||
let reject // eslint-disable-next-line promise/param-names | ||
let reject | ||
// eslint-disable-next-line promise/param-names | ||
const promise = new Promise((res, rej) => { | ||
@@ -74,3 +66,2 @@ resolve = res | ||
}, | ||
promisify(fn) { | ||
@@ -82,3 +73,2 @@ return new Promise((resolve, reject) => { | ||
} | ||
return resolve(...args) | ||
@@ -88,7 +78,5 @@ }) | ||
}, | ||
debuglog() { | ||
return function () {} | ||
}, | ||
format(format, ...args) { | ||
@@ -98,3 +86,2 @@ // Simplified version of https://nodejs.org/api/util.html#utilformatformat-args | ||
const replacement = args.shift() | ||
if (type === 'f') { | ||
@@ -112,3 +99,2 @@ return replacement.toFixed(6) | ||
}, | ||
inspect(value) { | ||
@@ -125,5 +111,3 @@ // Vastly simplified version of https://nodejs.org/api/util.html#utilinspectobject-options | ||
} | ||
return `'${value}'` | ||
case 'number': | ||
@@ -135,12 +119,8 @@ if (isNaN(value)) { | ||
} | ||
return value | ||
case 'bigint': | ||
return `${String(value)}n` | ||
case 'boolean': | ||
case 'undefined': | ||
return String(value) | ||
case 'object': | ||
@@ -150,3 +130,2 @@ return '{}' | ||
}, | ||
types: { | ||
@@ -156,3 +135,2 @@ isAsyncFunction(fn) { | ||
}, | ||
isArrayBufferView(arr) { | ||
@@ -159,0 +137,0 @@ return ArrayBuffer.isView(arr) |
/* replacement start */ | ||
const { Buffer } = require('buffer') | ||
/* replacement end */ | ||
@@ -26,31 +28,18 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
;('use strict') | ||
const { ObjectDefineProperty, ObjectKeys, ReflectApply } = require('./ours/primordials') | ||
const { | ||
promisify: { custom: customPromisify } | ||
} = require('./ours/util') | ||
const { streamReturningOperators, promiseReturningOperators } = require('./internal/streams/operators') | ||
const { | ||
codes: { ERR_ILLEGAL_CONSTRUCTOR } | ||
} = require('./ours/errors') | ||
const compose = require('./internal/streams/compose') | ||
const { pipeline } = require('./internal/streams/pipeline') | ||
const { destroyer } = require('./internal/streams/destroy') | ||
const eos = require('./internal/streams/end-of-stream') | ||
const internalBuffer = {} | ||
const promises = require('./stream/promises') | ||
const utils = require('./internal/streams/utils') | ||
const Stream = (module.exports = require('./internal/streams/legacy').Stream) | ||
Stream.isDisturbed = utils.isDisturbed | ||
@@ -60,6 +49,4 @@ Stream.isErrored = utils.isErrored | ||
Stream.Readable = require('./internal/streams/readable') | ||
for (const key of ObjectKeys(streamReturningOperators)) { | ||
const op = streamReturningOperators[key] | ||
function fn(...args) { | ||
@@ -69,6 +56,4 @@ if (new.target) { | ||
} | ||
return Stream.Readable.from(ReflectApply(op, this, args)) | ||
} | ||
ObjectDefineProperty(fn, 'name', { | ||
@@ -90,6 +75,4 @@ __proto__: null, | ||
} | ||
for (const key of ObjectKeys(promiseReturningOperators)) { | ||
const op = promiseReturningOperators[key] | ||
function fn(...args) { | ||
@@ -99,6 +82,4 @@ if (new.target) { | ||
} | ||
return ReflectApply(op, this, args) | ||
} | ||
ObjectDefineProperty(fn, 'name', { | ||
@@ -120,3 +101,2 @@ __proto__: null, | ||
} | ||
Stream.Writable = require('./internal/streams/writable') | ||
@@ -127,5 +107,3 @@ Stream.Duplex = require('./internal/streams/duplex') | ||
Stream.pipeline = pipeline | ||
const { addAbortSignal } = require('./internal/streams/add-abort-signal') | ||
Stream.addAbortSignal = addAbortSignal | ||
@@ -139,3 +117,2 @@ Stream.finished = eos | ||
enumerable: true, | ||
get() { | ||
@@ -148,3 +125,2 @@ return promises | ||
enumerable: true, | ||
get() { | ||
@@ -157,16 +133,14 @@ return promises.pipeline | ||
enumerable: true, | ||
get() { | ||
return promises.finished | ||
} | ||
}) // Backwards-compat with node 0.4.x | ||
}) | ||
// Backwards-compat with node 0.4.x | ||
Stream.Stream = Stream | ||
Stream._isUint8Array = function isUint8Array(value) { | ||
return value instanceof Uint8Array | ||
} | ||
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { | ||
return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength) | ||
} |
'use strict' | ||
const { ArrayPrototypePop, Promise } = require('../ours/primordials') | ||
const { isIterable, isNodeStream } = require('../internal/streams/utils') | ||
const { pipelineImpl: pl } = require('../internal/streams/pipeline') | ||
const { finished } = require('../internal/streams/end-of-stream') | ||
function pipeline(...streams) { | ||
@@ -16,3 +12,2 @@ return new Promise((resolve, reject) => { | ||
const lastArg = streams[streams.length - 1] | ||
if (lastArg && typeof lastArg === 'object' && !isNodeStream(lastArg) && !isIterable(lastArg)) { | ||
@@ -23,3 +18,2 @@ const options = ArrayPrototypePop(streams) | ||
} | ||
pl( | ||
@@ -41,3 +35,2 @@ streams, | ||
} | ||
module.exports = { | ||
@@ -44,0 +37,0 @@ finished, |
{ | ||
"name": "readable-stream", | ||
"version": "4.2.0", | ||
"version": "4.3.0", | ||
"description": "Node.js Streams, a user-land copy of the stream library from Node.js", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/nodejs/readable-stream", |
@@ -90,24 +90,4 @@ # readable-stream | ||
You will need a bundler like [`browserify`](https://github.com/browserify/browserify#readme), [`webpack`](https://webpack.js.org/), [`parcel`](https://github.com/parcel-bundler/parcel#readme) or similar. With Webpack 5 (which unlike other bundlers does not polyfill Node.js core modules and globals like `process`) you will also need to: | ||
You will need a bundler like [`browserify`](https://github.com/browserify/browserify#readme), [`webpack`](https://webpack.js.org/), [`parcel`](https://github.com/parcel-bundler/parcel#readme) or similar. Polyfills are no longer required since version 4.2.0. | ||
1. Install polyfills by running `npm install buffer process --save-dev` | ||
2. Create a [`webpack.config.js`](https://webpack.js.org/guides/getting-started/#using-a-configuration) file containing: | ||
```js | ||
const webpack = require('webpack') | ||
module.exports = { | ||
plugins: [ | ||
new webpack.ProvidePlugin({ | ||
process: 'process/browser' | ||
}) | ||
], | ||
resolve: { | ||
fallback: { | ||
buffer: require.resolve('buffer/') | ||
} | ||
} | ||
} | ||
``` | ||
# Streams Working Group | ||
@@ -114,0 +94,0 @@ |
6028
1
198901
117