readable-stream
Advanced tools
Comparing version 3.6.0 to 4.0.0
@@ -1,139 +0,3 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a duplex stream is just a stream that is both readable and writable. | ||
// Since JS doesn't have multiple prototypal inheritance, this class | ||
// prototypally inherits from Readable, and then parasitically from | ||
// Writable. | ||
'use strict'; | ||
/*<replacement>*/ | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
var objectKeys = Object.keys || function (obj) { | ||
var keys = []; | ||
for (var key in obj) { | ||
keys.push(key); | ||
} | ||
return keys; | ||
}; | ||
/*</replacement>*/ | ||
module.exports = Duplex; | ||
var Readable = require('./_stream_readable'); | ||
var Writable = require('./_stream_writable'); | ||
require('inherits')(Duplex, Readable); | ||
{ | ||
// Allow the keys array to be GC'ed. | ||
var keys = objectKeys(Writable.prototype); | ||
for (var v = 0; v < keys.length; v++) { | ||
var method = keys[v]; | ||
if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method]; | ||
} | ||
} | ||
function Duplex(options) { | ||
if (!(this instanceof Duplex)) return new Duplex(options); | ||
Readable.call(this, options); | ||
Writable.call(this, options); | ||
this.allowHalfOpen = true; | ||
if (options) { | ||
if (options.readable === false) this.readable = false; | ||
if (options.writable === false) this.writable = false; | ||
if (options.allowHalfOpen === false) { | ||
this.allowHalfOpen = false; | ||
this.once('end', onend); | ||
} | ||
} | ||
} | ||
Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._writableState.highWaterMark; | ||
} | ||
}); | ||
Object.defineProperty(Duplex.prototype, 'writableBuffer', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._writableState && this._writableState.getBuffer(); | ||
} | ||
}); | ||
Object.defineProperty(Duplex.prototype, 'writableLength', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._writableState.length; | ||
} | ||
}); // the no-half-open enforcer | ||
function onend() { | ||
// If the writable side ended, then we're ok. | ||
if (this._writableState.ended) return; // no more data can be written. | ||
// But allow more writes to happen in this tick. | ||
process.nextTick(onEndNT, this); | ||
} | ||
function onEndNT(self) { | ||
self.end(); | ||
} | ||
Object.defineProperty(Duplex.prototype, 'destroyed', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
if (this._readableState === undefined || this._writableState === undefined) { | ||
return false; | ||
} | ||
return this._readableState.destroyed && this._writableState.destroyed; | ||
}, | ||
set: function set(value) { | ||
// we ignore the value if the stream | ||
// has not been initialized yet | ||
if (this._readableState === undefined || this._writableState === undefined) { | ||
return; | ||
} // backward compatibility, the user is explicitly | ||
// managing destroyed | ||
this._readableState.destroyed = value; | ||
this._writableState.destroyed = value; | ||
} | ||
}); | ||
module.exports = require('./stream').Duplex |
@@ -1,39 +0,3 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a passthrough stream. | ||
// basically just the most minimal sort of Transform stream. | ||
// Every written chunk gets output as-is. | ||
'use strict'; | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
module.exports = PassThrough; | ||
var Transform = require('./_stream_transform'); | ||
require('inherits')(PassThrough, Transform); | ||
function PassThrough(options) { | ||
if (!(this instanceof PassThrough)) return new PassThrough(options); | ||
Transform.call(this, options); | ||
} | ||
PassThrough.prototype._transform = function (chunk, encoding, cb) { | ||
cb(null, chunk); | ||
}; | ||
module.exports = require('./stream').PassThrough |
@@ -1,1124 +0,3 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
'use strict'; | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
module.exports = Readable; | ||
/*<replacement>*/ | ||
var Duplex; | ||
/*</replacement>*/ | ||
Readable.ReadableState = ReadableState; | ||
/*<replacement>*/ | ||
var EE = require('events').EventEmitter; | ||
var EElistenerCount = function EElistenerCount(emitter, type) { | ||
return emitter.listeners(type).length; | ||
}; | ||
/*</replacement>*/ | ||
/*<replacement>*/ | ||
var Stream = require('./internal/streams/stream'); | ||
/*</replacement>*/ | ||
var Buffer = require('buffer').Buffer; | ||
var OurUint8Array = global.Uint8Array || function () {}; | ||
function _uint8ArrayToBuffer(chunk) { | ||
return Buffer.from(chunk); | ||
} | ||
function _isUint8Array(obj) { | ||
return Buffer.isBuffer(obj) || obj instanceof OurUint8Array; | ||
} | ||
/*<replacement>*/ | ||
var debugUtil = require('util'); | ||
var debug; | ||
if (debugUtil && debugUtil.debuglog) { | ||
debug = debugUtil.debuglog('stream'); | ||
} else { | ||
debug = function debug() {}; | ||
} | ||
/*</replacement>*/ | ||
var BufferList = require('./internal/streams/buffer_list'); | ||
var destroyImpl = require('./internal/streams/destroy'); | ||
var _require = require('./internal/streams/state'), | ||
getHighWaterMark = _require.getHighWaterMark; | ||
var _require$codes = require('../errors').codes, | ||
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE, | ||
ERR_STREAM_PUSH_AFTER_EOF = _require$codes.ERR_STREAM_PUSH_AFTER_EOF, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT; // Lazy loaded to improve the startup performance. | ||
var StringDecoder; | ||
var createReadableStreamAsyncIterator; | ||
var from; | ||
require('inherits')(Readable, Stream); | ||
var errorOrDestroy = destroyImpl.errorOrDestroy; | ||
var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; | ||
function prependListener(emitter, event, fn) { | ||
// Sadly this is not cacheable as some libraries bundle their own | ||
// event emitter implementation with them. | ||
if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn); // This is a hack to make sure that our error handler is attached before any | ||
// userland ones. NEVER DO THIS. This is here only because this code needs | ||
// to continue to work with older versions of Node.js that do not include | ||
// the prependListener() method. The goal is to eventually remove this hack. | ||
if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (Array.isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]]; | ||
} | ||
function ReadableState(options, stream, isDuplex) { | ||
Duplex = Duplex || require('./_stream_duplex'); | ||
options = options || {}; // Duplex streams are both readable and writable, but share | ||
// the same options object. | ||
// However, some cases require setting options to different | ||
// values for the readable and the writable sides of the duplex stream. | ||
// These options can be provided separately as readableXXX and writableXXX. | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag. Used to make read(n) ignore n and to | ||
// make all the buffer merging and length checks go away | ||
this.objectMode = !!options.objectMode; | ||
if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode; // the point at which it stops calling _read() to fill the buffer | ||
// Note: 0 is a valid value, means "don't call _read preemptively ever" | ||
this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex); // A linked list is used to store data chunks instead of an array because the | ||
// linked list can remove elements from the beginning faster than | ||
// array.shift() | ||
this.buffer = new BufferList(); | ||
this.length = 0; | ||
this.pipes = null; | ||
this.pipesCount = 0; | ||
this.flowing = null; | ||
this.ended = false; | ||
this.endEmitted = false; | ||
this.reading = false; // a flag to be able to tell if the event 'readable'/'data' is emitted | ||
// immediately, or on a later tick. We set this to true at first, because | ||
// any actions that shouldn't happen until "later" should generally also | ||
// not happen before the first read call. | ||
this.sync = true; // whenever we return null, then we set a flag to say | ||
// that we're awaiting a 'readable' event emission. | ||
this.needReadable = false; | ||
this.emittedReadable = false; | ||
this.readableListening = false; | ||
this.resumeScheduled = false; | ||
this.paused = true; // Should close be emitted on destroy. Defaults to true. | ||
this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'end' (and potentially 'finish') | ||
this.autoDestroy = !!options.autoDestroy; // has it been destroyed | ||
this.destroyed = false; // Crypto is kind of old and crusty. Historically, its default string | ||
// encoding is 'binary' so we have to make this configurable. | ||
// Everything else in the universe uses 'utf8', though. | ||
this.defaultEncoding = options.defaultEncoding || 'utf8'; // the number of writers that are awaiting a drain event in .pipe()s | ||
this.awaitDrain = 0; // if true, a maybeReadMore has been scheduled | ||
this.readingMore = false; | ||
this.decoder = null; | ||
this.encoding = null; | ||
if (options.encoding) { | ||
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder; | ||
this.decoder = new StringDecoder(options.encoding); | ||
this.encoding = options.encoding; | ||
} | ||
} | ||
function Readable(options) { | ||
Duplex = Duplex || require('./_stream_duplex'); | ||
if (!(this instanceof Readable)) return new Readable(options); // Checking for a Stream.Duplex instance is faster here instead of inside | ||
// the ReadableState constructor, at least with V8 6.5 | ||
var isDuplex = this instanceof Duplex; | ||
this._readableState = new ReadableState(options, this, isDuplex); // legacy | ||
this.readable = true; | ||
if (options) { | ||
if (typeof options.read === 'function') this._read = options.read; | ||
if (typeof options.destroy === 'function') this._destroy = options.destroy; | ||
} | ||
Stream.call(this); | ||
} | ||
Object.defineProperty(Readable.prototype, 'destroyed', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
if (this._readableState === undefined) { | ||
return false; | ||
} | ||
return this._readableState.destroyed; | ||
}, | ||
set: function set(value) { | ||
// we ignore the value if the stream | ||
// has not been initialized yet | ||
if (!this._readableState) { | ||
return; | ||
} // backward compatibility, the user is explicitly | ||
// managing destroyed | ||
this._readableState.destroyed = value; | ||
} | ||
}); | ||
Readable.prototype.destroy = destroyImpl.destroy; | ||
Readable.prototype._undestroy = destroyImpl.undestroy; | ||
Readable.prototype._destroy = function (err, cb) { | ||
cb(err); | ||
}; // Manually shove something into the read() buffer. | ||
// This returns true if the highWaterMark has not been hit yet, | ||
// similar to how Writable.write() returns true if you should | ||
// write() some more. | ||
Readable.prototype.push = function (chunk, encoding) { | ||
var state = this._readableState; | ||
var skipChunkCheck; | ||
if (!state.objectMode) { | ||
if (typeof chunk === 'string') { | ||
encoding = encoding || state.defaultEncoding; | ||
if (encoding !== state.encoding) { | ||
chunk = Buffer.from(chunk, encoding); | ||
encoding = ''; | ||
} | ||
skipChunkCheck = true; | ||
} | ||
} else { | ||
skipChunkCheck = true; | ||
} | ||
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck); | ||
}; // Unshift should *always* be something directly out of read() | ||
Readable.prototype.unshift = function (chunk) { | ||
return readableAddChunk(this, chunk, null, true, false); | ||
}; | ||
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { | ||
debug('readableAddChunk', chunk); | ||
var state = stream._readableState; | ||
if (chunk === null) { | ||
state.reading = false; | ||
onEofChunk(stream, state); | ||
} else { | ||
var er; | ||
if (!skipChunkCheck) er = chunkInvalid(state, chunk); | ||
if (er) { | ||
errorOrDestroy(stream, er); | ||
} else if (state.objectMode || chunk && chunk.length > 0) { | ||
if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) { | ||
chunk = _uint8ArrayToBuffer(chunk); | ||
} | ||
if (addToFront) { | ||
if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true); | ||
} else if (state.ended) { | ||
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); | ||
} else if (state.destroyed) { | ||
return false; | ||
} else { | ||
state.reading = false; | ||
if (state.decoder && !encoding) { | ||
chunk = state.decoder.write(chunk); | ||
if (state.objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false);else maybeReadMore(stream, state); | ||
} else { | ||
addChunk(stream, state, chunk, false); | ||
} | ||
} | ||
} else if (!addToFront) { | ||
state.reading = false; | ||
maybeReadMore(stream, state); | ||
} | ||
} // We can push more data if we are below the highWaterMark. | ||
// Also, if we have no data yet, we can stand some more bytes. | ||
// This is to work around cases where hwm=0, such as the repl. | ||
return !state.ended && (state.length < state.highWaterMark || state.length === 0); | ||
} | ||
function addChunk(stream, state, chunk, addToFront) { | ||
if (state.flowing && state.length === 0 && !state.sync) { | ||
state.awaitDrain = 0; | ||
stream.emit('data', chunk); | ||
} else { | ||
// update the buffer info. | ||
state.length += state.objectMode ? 1 : chunk.length; | ||
if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk); | ||
if (state.needReadable) emitReadable(stream); | ||
} | ||
maybeReadMore(stream, state); | ||
} | ||
function chunkInvalid(state, chunk) { | ||
var er; | ||
if (!_isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) { | ||
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk); | ||
} | ||
return er; | ||
} | ||
Readable.prototype.isPaused = function () { | ||
return this._readableState.flowing === false; | ||
}; // backwards compatibility. | ||
Readable.prototype.setEncoding = function (enc) { | ||
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder; | ||
var decoder = new StringDecoder(enc); | ||
this._readableState.decoder = decoder; // If setEncoding(null), decoder.encoding equals utf8 | ||
this._readableState.encoding = this._readableState.decoder.encoding; // Iterate over current buffer to convert already stored Buffers: | ||
var p = this._readableState.buffer.head; | ||
var content = ''; | ||
while (p !== null) { | ||
content += decoder.write(p.data); | ||
p = p.next; | ||
} | ||
this._readableState.buffer.clear(); | ||
if (content !== '') this._readableState.buffer.push(content); | ||
this._readableState.length = content.length; | ||
return this; | ||
}; // Don't raise the hwm > 1GB | ||
var MAX_HWM = 0x40000000; | ||
function computeNewHighWaterMark(n) { | ||
if (n >= MAX_HWM) { | ||
// TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE. | ||
n = MAX_HWM; | ||
} else { | ||
// Get the next highest power of 2 to prevent increasing hwm excessively in | ||
// tiny amounts | ||
n--; | ||
n |= n >>> 1; | ||
n |= n >>> 2; | ||
n |= n >>> 4; | ||
n |= n >>> 8; | ||
n |= n >>> 16; | ||
n++; | ||
} | ||
return n; | ||
} // This function is designed to be inlinable, so please take care when making | ||
// changes to the function body. | ||
function howMuchToRead(n, state) { | ||
if (n <= 0 || state.length === 0 && state.ended) return 0; | ||
if (state.objectMode) return 1; | ||
if (n !== n) { | ||
// Only flow one buffer at a time | ||
if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length; | ||
} // If we're asking for more than the current hwm, then raise the hwm. | ||
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); | ||
if (n <= state.length) return n; // Don't have enough | ||
if (!state.ended) { | ||
state.needReadable = true; | ||
return 0; | ||
} | ||
return state.length; | ||
} // you can override either this method, or the async _read(n) below. | ||
Readable.prototype.read = function (n) { | ||
debug('read', n); | ||
n = parseInt(n, 10); | ||
var state = this._readableState; | ||
var nOrig = n; | ||
if (n !== 0) state.emittedReadable = false; // if we're doing read(0) to trigger a readable event, but we | ||
// already have a bunch of data in the buffer, then just trigger | ||
// the 'readable' event and move on. | ||
if (n === 0 && state.needReadable && ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended)) { | ||
debug('read: emitReadable', state.length, state.ended); | ||
if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this); | ||
return null; | ||
} | ||
n = howMuchToRead(n, state); // if we've ended, and we're now clear, then finish it up. | ||
if (n === 0 && state.ended) { | ||
if (state.length === 0) endReadable(this); | ||
return null; | ||
} // All the actual chunk generation logic needs to be | ||
// *below* the call to _read. The reason is that in certain | ||
// synthetic stream cases, such as passthrough streams, _read | ||
// may be a completely synchronous operation which may change | ||
// the state of the read buffer, providing enough data when | ||
// before there was *not* enough. | ||
// | ||
// So, the steps are: | ||
// 1. Figure out what the state of things will be after we do | ||
// a read from the buffer. | ||
// | ||
// 2. If that resulting state will trigger a _read, then call _read. | ||
// Note that this may be asynchronous, or synchronous. Yes, it is | ||
// deeply ugly to write APIs this way, but that still doesn't mean | ||
// that the Readable class should behave improperly, as streams are | ||
// designed to be sync/async agnostic. | ||
// Take note if the _read call is sync or async (ie, if the read call | ||
// has returned yet), so that we know whether or not it's safe to emit | ||
// 'readable' etc. | ||
// | ||
// 3. Actually pull the requested chunks out of the buffer and return. | ||
// if we need a readable event, then we need to do some reading. | ||
var doRead = state.needReadable; | ||
debug('need readable', doRead); // if we currently have less than the highWaterMark, then also read some | ||
if (state.length === 0 || state.length - n < state.highWaterMark) { | ||
doRead = true; | ||
debug('length less than watermark', doRead); | ||
} // however, if we've ended, then there's no point, and if we're already | ||
// reading, then it's unnecessary. | ||
if (state.ended || state.reading) { | ||
doRead = false; | ||
debug('reading or ended', doRead); | ||
} else if (doRead) { | ||
debug('do read'); | ||
state.reading = true; | ||
state.sync = true; // if the length is currently zero, then we *need* a readable event. | ||
if (state.length === 0) state.needReadable = true; // call internal read method | ||
this._read(state.highWaterMark); | ||
state.sync = false; // If _read pushed data synchronously, then `reading` will be false, | ||
// and we need to re-evaluate how much data we can return to the user. | ||
if (!state.reading) n = howMuchToRead(nOrig, state); | ||
} | ||
var ret; | ||
if (n > 0) ret = fromList(n, state);else ret = null; | ||
if (ret === null) { | ||
state.needReadable = state.length <= state.highWaterMark; | ||
n = 0; | ||
} else { | ||
state.length -= n; | ||
state.awaitDrain = 0; | ||
} | ||
if (state.length === 0) { | ||
// If we have nothing in the buffer, then we want to know | ||
// as soon as we *do* get something into the buffer. | ||
if (!state.ended) state.needReadable = true; // If we tried to read() past the EOF, then emit end on the next tick. | ||
if (nOrig !== n && state.ended) endReadable(this); | ||
} | ||
if (ret !== null) this.emit('data', ret); | ||
return ret; | ||
}; | ||
function onEofChunk(stream, state) { | ||
debug('onEofChunk'); | ||
if (state.ended) return; | ||
if (state.decoder) { | ||
var chunk = state.decoder.end(); | ||
if (chunk && chunk.length) { | ||
state.buffer.push(chunk); | ||
state.length += state.objectMode ? 1 : chunk.length; | ||
} | ||
} | ||
state.ended = true; | ||
if (state.sync) { | ||
// if we are sync, wait until next tick to emit the data. | ||
// Otherwise we risk emitting data in the flow() | ||
// the readable code triggers during a read() call | ||
emitReadable(stream); | ||
} else { | ||
// emit 'readable' now to make sure it gets picked up. | ||
state.needReadable = false; | ||
if (!state.emittedReadable) { | ||
state.emittedReadable = true; | ||
emitReadable_(stream); | ||
} | ||
} | ||
} // Don't emit readable right away in sync mode, because this can trigger | ||
// another read() call => stack overflow. This way, it might trigger | ||
// a nextTick recursion warning, but that's not so bad. | ||
function emitReadable(stream) { | ||
var state = stream._readableState; | ||
debug('emitReadable', state.needReadable, state.emittedReadable); | ||
state.needReadable = false; | ||
if (!state.emittedReadable) { | ||
debug('emitReadable', state.flowing); | ||
state.emittedReadable = true; | ||
process.nextTick(emitReadable_, stream); | ||
} | ||
} | ||
function emitReadable_(stream) { | ||
var state = stream._readableState; | ||
debug('emitReadable_', state.destroyed, state.length, state.ended); | ||
if (!state.destroyed && (state.length || state.ended)) { | ||
stream.emit('readable'); | ||
state.emittedReadable = false; | ||
} // The stream needs another readable event if | ||
// 1. It is not flowing, as the flow mechanism will take | ||
// care of it. | ||
// 2. It is not ended. | ||
// 3. It is below the highWaterMark, so we can schedule | ||
// another readable later. | ||
state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark; | ||
flow(stream); | ||
} // at this point, the user has presumably seen the 'readable' event, | ||
// and called read() to consume some data. that may have triggered | ||
// in turn another _read(n) call, in which case reading = true if | ||
// it's in progress. | ||
// However, if we're not ended, or reading, and the length < hwm, | ||
// then go ahead and try to read some more preemptively. | ||
function maybeReadMore(stream, state) { | ||
if (!state.readingMore) { | ||
state.readingMore = true; | ||
process.nextTick(maybeReadMore_, stream, state); | ||
} | ||
} | ||
function maybeReadMore_(stream, state) { | ||
// Attempt to read more data if we should. | ||
// | ||
// The conditions for reading more data are (one of): | ||
// - Not enough data buffered (state.length < state.highWaterMark). The loop | ||
// is responsible for filling the buffer with enough data if such data | ||
// is available. If highWaterMark is 0 and we are not in the flowing mode | ||
// we should _not_ attempt to buffer any extra data. We'll get more data | ||
// when the stream consumer calls read() instead. | ||
// - No data in the buffer, and the stream is in flowing mode. In this mode | ||
// the loop below is responsible for ensuring read() is called. Failing to | ||
// call read here would abort the flow and there's no other mechanism for | ||
// continuing the flow if the stream consumer has just subscribed to the | ||
// 'data' event. | ||
// | ||
// In addition to the above conditions to keep reading data, the following | ||
// conditions prevent the data from being read: | ||
// - The stream has ended (state.ended). | ||
// - There is already a pending 'read' operation (state.reading). This is a | ||
// case where the the stream has called the implementation defined _read() | ||
// method, but they are processing the call asynchronously and have _not_ | ||
// called push() with new data. In this case we skip performing more | ||
// read()s. The execution ends in this method again after the _read() ends | ||
// up calling push() with more data. | ||
while (!state.reading && !state.ended && (state.length < state.highWaterMark || state.flowing && state.length === 0)) { | ||
var len = state.length; | ||
debug('maybeReadMore read 0'); | ||
stream.read(0); | ||
if (len === state.length) // didn't get any data, stop spinning. | ||
break; | ||
} | ||
state.readingMore = false; | ||
} // abstract method. to be overridden in specific implementation classes. | ||
// call cb(er, data) where data is <= n in length. | ||
// for virtual (non-string, non-buffer) streams, "length" is somewhat | ||
// arbitrary, and perhaps not very meaningful. | ||
Readable.prototype._read = function (n) { | ||
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); | ||
}; | ||
Readable.prototype.pipe = function (dest, pipeOpts) { | ||
var src = this; | ||
var state = this._readableState; | ||
switch (state.pipesCount) { | ||
case 0: | ||
state.pipes = dest; | ||
break; | ||
case 1: | ||
state.pipes = [state.pipes, dest]; | ||
break; | ||
default: | ||
state.pipes.push(dest); | ||
break; | ||
} | ||
state.pipesCount += 1; | ||
debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); | ||
var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr; | ||
var endFn = doEnd ? onend : unpipe; | ||
if (state.endEmitted) process.nextTick(endFn);else src.once('end', endFn); | ||
dest.on('unpipe', onunpipe); | ||
function onunpipe(readable, unpipeInfo) { | ||
debug('onunpipe'); | ||
if (readable === src) { | ||
if (unpipeInfo && unpipeInfo.hasUnpiped === false) { | ||
unpipeInfo.hasUnpiped = true; | ||
cleanup(); | ||
} | ||
} | ||
} | ||
function onend() { | ||
debug('onend'); | ||
dest.end(); | ||
} // when the dest drains, it reduces the awaitDrain counter | ||
// on the source. This would be more elegant with a .once() | ||
// handler in flow(), but adding and removing repeatedly is | ||
// too slow. | ||
var ondrain = pipeOnDrain(src); | ||
dest.on('drain', ondrain); | ||
var cleanedUp = false; | ||
function cleanup() { | ||
debug('cleanup'); // cleanup event handlers once the pipe is broken | ||
dest.removeListener('close', onclose); | ||
dest.removeListener('finish', onfinish); | ||
dest.removeListener('drain', ondrain); | ||
dest.removeListener('error', onerror); | ||
dest.removeListener('unpipe', onunpipe); | ||
src.removeListener('end', onend); | ||
src.removeListener('end', unpipe); | ||
src.removeListener('data', ondata); | ||
cleanedUp = true; // if the reader is waiting for a drain event from this | ||
// specific writer, then it would cause it to never start | ||
// flowing again. | ||
// So, if this is awaiting a drain, then we just call it now. | ||
// If we don't know, then assume that we are waiting for one. | ||
if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain(); | ||
} | ||
src.on('data', ondata); | ||
function ondata(chunk) { | ||
debug('ondata'); | ||
var ret = dest.write(chunk); | ||
debug('dest.write', ret); | ||
if (ret === false) { | ||
// If the user unpiped during `dest.write()`, it is possible | ||
// to get stuck in a permanently paused state if that write | ||
// also returned false. | ||
// => Check whether `dest` is still a piping destination. | ||
if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) { | ||
debug('false write response, pause', state.awaitDrain); | ||
state.awaitDrain++; | ||
} | ||
src.pause(); | ||
} | ||
} // if the dest has an error, then stop piping into it. | ||
// however, don't suppress the throwing behavior for this. | ||
function onerror(er) { | ||
debug('onerror', er); | ||
unpipe(); | ||
dest.removeListener('error', onerror); | ||
if (EElistenerCount(dest, 'error') === 0) errorOrDestroy(dest, er); | ||
} // Make sure our error handler is attached before userland ones. | ||
prependListener(dest, 'error', onerror); // Both close and finish should trigger unpipe, but only once. | ||
function onclose() { | ||
dest.removeListener('finish', onfinish); | ||
unpipe(); | ||
} | ||
dest.once('close', onclose); | ||
function onfinish() { | ||
debug('onfinish'); | ||
dest.removeListener('close', onclose); | ||
unpipe(); | ||
} | ||
dest.once('finish', onfinish); | ||
function unpipe() { | ||
debug('unpipe'); | ||
src.unpipe(dest); | ||
} // tell the dest that it's being piped to | ||
dest.emit('pipe', src); // start the flow if it hasn't been started already. | ||
if (!state.flowing) { | ||
debug('pipe resume'); | ||
src.resume(); | ||
} | ||
return dest; | ||
}; | ||
function pipeOnDrain(src) { | ||
return function pipeOnDrainFunctionResult() { | ||
var state = src._readableState; | ||
debug('pipeOnDrain', state.awaitDrain); | ||
if (state.awaitDrain) state.awaitDrain--; | ||
if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) { | ||
state.flowing = true; | ||
flow(src); | ||
} | ||
}; | ||
} | ||
Readable.prototype.unpipe = function (dest) { | ||
var state = this._readableState; | ||
var unpipeInfo = { | ||
hasUnpiped: false | ||
}; // if we're not piping anywhere, then do nothing. | ||
if (state.pipesCount === 0) return this; // just one destination. most common case. | ||
if (state.pipesCount === 1) { | ||
// passed in one, but it's not the right one. | ||
if (dest && dest !== state.pipes) return this; | ||
if (!dest) dest = state.pipes; // got a match. | ||
state.pipes = null; | ||
state.pipesCount = 0; | ||
state.flowing = false; | ||
if (dest) dest.emit('unpipe', this, unpipeInfo); | ||
return this; | ||
} // slow case. multiple pipe destinations. | ||
if (!dest) { | ||
// remove all. | ||
var dests = state.pipes; | ||
var len = state.pipesCount; | ||
state.pipes = null; | ||
state.pipesCount = 0; | ||
state.flowing = false; | ||
for (var i = 0; i < len; i++) { | ||
dests[i].emit('unpipe', this, { | ||
hasUnpiped: false | ||
}); | ||
} | ||
return this; | ||
} // try to find the right one. | ||
var index = indexOf(state.pipes, dest); | ||
if (index === -1) return this; | ||
state.pipes.splice(index, 1); | ||
state.pipesCount -= 1; | ||
if (state.pipesCount === 1) state.pipes = state.pipes[0]; | ||
dest.emit('unpipe', this, unpipeInfo); | ||
return this; | ||
}; // set up data events if they are asked for | ||
// Ensure readable listeners eventually get something | ||
Readable.prototype.on = function (ev, fn) { | ||
var res = Stream.prototype.on.call(this, ev, fn); | ||
var state = this._readableState; | ||
if (ev === 'data') { | ||
// update readableListening so that resume() may be a no-op | ||
// a few lines down. This is needed to support once('readable'). | ||
state.readableListening = this.listenerCount('readable') > 0; // Try start flowing on next tick if stream isn't explicitly paused | ||
if (state.flowing !== false) this.resume(); | ||
} else if (ev === 'readable') { | ||
if (!state.endEmitted && !state.readableListening) { | ||
state.readableListening = state.needReadable = true; | ||
state.flowing = false; | ||
state.emittedReadable = false; | ||
debug('on readable', state.length, state.reading); | ||
if (state.length) { | ||
emitReadable(this); | ||
} else if (!state.reading) { | ||
process.nextTick(nReadingNextTick, this); | ||
} | ||
} | ||
} | ||
return res; | ||
}; | ||
Readable.prototype.addListener = Readable.prototype.on; | ||
Readable.prototype.removeListener = function (ev, fn) { | ||
var res = Stream.prototype.removeListener.call(this, ev, fn); | ||
if (ev === 'readable') { | ||
// We need to check if there is someone still listening to | ||
// readable and reset the state. However this needs to happen | ||
// after readable has been emitted but before I/O (nextTick) to | ||
// support once('readable', fn) cycles. This means that calling | ||
// resume within the same tick will have no | ||
// effect. | ||
process.nextTick(updateReadableListening, this); | ||
} | ||
return res; | ||
}; | ||
Readable.prototype.removeAllListeners = function (ev) { | ||
var res = Stream.prototype.removeAllListeners.apply(this, arguments); | ||
if (ev === 'readable' || ev === undefined) { | ||
// We need to check if there is someone still listening to | ||
// readable and reset the state. However this needs to happen | ||
// after readable has been emitted but before I/O (nextTick) to | ||
// support once('readable', fn) cycles. This means that calling | ||
// resume within the same tick will have no | ||
// effect. | ||
process.nextTick(updateReadableListening, this); | ||
} | ||
return res; | ||
}; | ||
function updateReadableListening(self) { | ||
var state = self._readableState; | ||
state.readableListening = self.listenerCount('readable') > 0; | ||
if (state.resumeScheduled && !state.paused) { | ||
// flowing needs to be set to true now, otherwise | ||
// the upcoming resume will not flow. | ||
state.flowing = true; // crude way to check if we should resume | ||
} else if (self.listenerCount('data') > 0) { | ||
self.resume(); | ||
} | ||
} | ||
function nReadingNextTick(self) { | ||
debug('readable nexttick read 0'); | ||
self.read(0); | ||
} // pause() and resume() are remnants of the legacy readable stream API | ||
// If the user uses them, then switch into old mode. | ||
Readable.prototype.resume = function () { | ||
var state = this._readableState; | ||
if (!state.flowing) { | ||
debug('resume'); // we flow only if there is no one listening | ||
// for readable, but we still have to call | ||
// resume() | ||
state.flowing = !state.readableListening; | ||
resume(this, state); | ||
} | ||
state.paused = false; | ||
return this; | ||
}; | ||
function resume(stream, state) { | ||
if (!state.resumeScheduled) { | ||
state.resumeScheduled = true; | ||
process.nextTick(resume_, stream, state); | ||
} | ||
} | ||
function resume_(stream, state) { | ||
debug('resume', state.reading); | ||
if (!state.reading) { | ||
stream.read(0); | ||
} | ||
state.resumeScheduled = false; | ||
stream.emit('resume'); | ||
flow(stream); | ||
if (state.flowing && !state.reading) stream.read(0); | ||
} | ||
Readable.prototype.pause = function () { | ||
debug('call pause flowing=%j', this._readableState.flowing); | ||
if (this._readableState.flowing !== false) { | ||
debug('pause'); | ||
this._readableState.flowing = false; | ||
this.emit('pause'); | ||
} | ||
this._readableState.paused = true; | ||
return this; | ||
}; | ||
function flow(stream) { | ||
var state = stream._readableState; | ||
debug('flow', state.flowing); | ||
while (state.flowing && stream.read() !== null) { | ||
; | ||
} | ||
} // wrap an old-style stream as the async data source. | ||
// This is *not* part of the readable stream interface. | ||
// It is an ugly unfortunate mess of history. | ||
Readable.prototype.wrap = function (stream) { | ||
var _this = this; | ||
var state = this._readableState; | ||
var paused = false; | ||
stream.on('end', function () { | ||
debug('wrapped end'); | ||
if (state.decoder && !state.ended) { | ||
var chunk = state.decoder.end(); | ||
if (chunk && chunk.length) _this.push(chunk); | ||
} | ||
_this.push(null); | ||
}); | ||
stream.on('data', function (chunk) { | ||
debug('wrapped data'); | ||
if (state.decoder) chunk = state.decoder.write(chunk); // don't skip over falsy values in objectMode | ||
if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return; | ||
var ret = _this.push(chunk); | ||
if (!ret) { | ||
paused = true; | ||
stream.pause(); | ||
} | ||
}); // proxy all the other methods. | ||
// important when wrapping filters and duplexes. | ||
for (var i in stream) { | ||
if (this[i] === undefined && typeof stream[i] === 'function') { | ||
this[i] = function methodWrap(method) { | ||
return function methodWrapReturnFunction() { | ||
return stream[method].apply(stream, arguments); | ||
}; | ||
}(i); | ||
} | ||
} // proxy certain important events. | ||
for (var n = 0; n < kProxyEvents.length; n++) { | ||
stream.on(kProxyEvents[n], this.emit.bind(this, kProxyEvents[n])); | ||
} // when we try to consume some more bytes, simply unpause the | ||
// underlying stream. | ||
this._read = function (n) { | ||
debug('wrapped _read', n); | ||
if (paused) { | ||
paused = false; | ||
stream.resume(); | ||
} | ||
}; | ||
return this; | ||
}; | ||
if (typeof Symbol === 'function') { | ||
Readable.prototype[Symbol.asyncIterator] = function () { | ||
if (createReadableStreamAsyncIterator === undefined) { | ||
createReadableStreamAsyncIterator = require('./internal/streams/async_iterator'); | ||
} | ||
return createReadableStreamAsyncIterator(this); | ||
}; | ||
} | ||
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._readableState.highWaterMark; | ||
} | ||
}); | ||
Object.defineProperty(Readable.prototype, 'readableBuffer', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._readableState && this._readableState.buffer; | ||
} | ||
}); | ||
Object.defineProperty(Readable.prototype, 'readableFlowing', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._readableState.flowing; | ||
}, | ||
set: function set(state) { | ||
if (this._readableState) { | ||
this._readableState.flowing = state; | ||
} | ||
} | ||
}); // exposed for testing purposes only. | ||
Readable._fromList = fromList; | ||
Object.defineProperty(Readable.prototype, 'readableLength', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._readableState.length; | ||
} | ||
}); // Pluck off n bytes from an array of buffers. | ||
// Length is the combined lengths of all the buffers in the list. | ||
// This function is designed to be inlinable, so please take care when making | ||
// changes to the function body. | ||
function fromList(n, state) { | ||
// nothing buffered | ||
if (state.length === 0) return null; | ||
var ret; | ||
if (state.objectMode) ret = state.buffer.shift();else if (!n || n >= state.length) { | ||
// read it all, truncate the list | ||
if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.first();else ret = state.buffer.concat(state.length); | ||
state.buffer.clear(); | ||
} else { | ||
// read part of list | ||
ret = state.buffer.consume(n, state.decoder); | ||
} | ||
return ret; | ||
} | ||
function endReadable(stream) { | ||
var state = stream._readableState; | ||
debug('endReadable', state.endEmitted); | ||
if (!state.endEmitted) { | ||
state.ended = true; | ||
process.nextTick(endReadableNT, state, stream); | ||
} | ||
} | ||
function endReadableNT(state, stream) { | ||
debug('endReadableNT', state.endEmitted, state.length); // Check that we didn't get one last unshift. | ||
if (!state.endEmitted && state.length === 0) { | ||
state.endEmitted = true; | ||
stream.readable = false; | ||
stream.emit('end'); | ||
if (state.autoDestroy) { | ||
// In case of duplex streams we need a way to detect | ||
// if the writable side is ready for autoDestroy as well | ||
var wState = stream._writableState; | ||
if (!wState || wState.autoDestroy && wState.finished) { | ||
stream.destroy(); | ||
} | ||
} | ||
} | ||
} | ||
if (typeof Symbol === 'function') { | ||
Readable.from = function (iterable, opts) { | ||
if (from === undefined) { | ||
from = require('./internal/streams/from'); | ||
} | ||
return from(Readable, iterable, opts); | ||
}; | ||
} | ||
function indexOf(xs, x) { | ||
for (var i = 0, l = xs.length; i < l; i++) { | ||
if (xs[i] === x) return i; | ||
} | ||
return -1; | ||
} | ||
module.exports = require('./stream').Readable |
@@ -1,201 +0,3 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a transform stream is a readable/writable stream where you do | ||
// something with the data. Sometimes it's called a "filter", | ||
// but that's not a great name for it, since that implies a thing where | ||
// some bits pass through, and others are simply ignored. (That would | ||
// be a valid example of a transform, of course.) | ||
// | ||
// While the output is causally related to the input, it's not a | ||
// necessarily symmetric or synchronous transformation. For example, | ||
// a zlib stream might take multiple plain-text writes(), and then | ||
// emit a single compressed chunk some time in the future. | ||
// | ||
// Here's how this works: | ||
// | ||
// The Transform stream has all the aspects of the readable and writable | ||
// stream classes. When you write(chunk), that calls _write(chunk,cb) | ||
// internally, and returns false if there's a lot of pending writes | ||
// buffered up. When you call read(), that calls _read(n) until | ||
// there's enough pending readable data buffered up. | ||
// | ||
// In a transform stream, the written data is placed in a buffer. When | ||
// _read(n) is called, it transforms the queued up data, calling the | ||
// buffered _write cb's as it consumes chunks. If consuming a single | ||
// written chunk would result in multiple output chunks, then the first | ||
// outputted bit calls the readcb, and subsequent chunks just go into | ||
// the read buffer, and will cause it to emit 'readable' if necessary. | ||
// | ||
// This way, back-pressure is actually determined by the reading side, | ||
// since _read has to be called to start processing a new chunk. However, | ||
// a pathological inflate type of transform can cause excessive buffering | ||
// here. For example, imagine a stream where every byte of input is | ||
// interpreted as an integer from 0-255, and then results in that many | ||
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in | ||
// 1kb of data being output. In this case, you could write a very small | ||
// amount of input, and end up with a very large amount of output. In | ||
// such a pathological inflating mechanism, there'd be no way to tell | ||
// the system to stop doing the transform. A single 4MB write could | ||
// cause the system to run out of memory. | ||
// | ||
// However, even in such a pathological case, only a single written chunk | ||
// would be consumed, and then the rest would wait (un-transformed) until | ||
// the results of the previous transformed chunk were consumed. | ||
'use strict'; | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
module.exports = Transform; | ||
var _require$codes = require('../errors').codes, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK, | ||
ERR_TRANSFORM_ALREADY_TRANSFORMING = _require$codes.ERR_TRANSFORM_ALREADY_TRANSFORMING, | ||
ERR_TRANSFORM_WITH_LENGTH_0 = _require$codes.ERR_TRANSFORM_WITH_LENGTH_0; | ||
var Duplex = require('./_stream_duplex'); | ||
require('inherits')(Transform, Duplex); | ||
function afterTransform(er, data) { | ||
var ts = this._transformState; | ||
ts.transforming = false; | ||
var cb = ts.writecb; | ||
if (cb === null) { | ||
return this.emit('error', new ERR_MULTIPLE_CALLBACK()); | ||
} | ||
ts.writechunk = null; | ||
ts.writecb = null; | ||
if (data != null) // single equals check for both `null` and `undefined` | ||
this.push(data); | ||
cb(er); | ||
var rs = this._readableState; | ||
rs.reading = false; | ||
if (rs.needReadable || rs.length < rs.highWaterMark) { | ||
this._read(rs.highWaterMark); | ||
} | ||
} | ||
function Transform(options) { | ||
if (!(this instanceof Transform)) return new Transform(options); | ||
Duplex.call(this, options); | ||
this._transformState = { | ||
afterTransform: afterTransform.bind(this), | ||
needTransform: false, | ||
transforming: false, | ||
writecb: null, | ||
writechunk: null, | ||
writeencoding: null | ||
}; // start out asking for a readable event once data is transformed. | ||
this._readableState.needReadable = true; // we have implemented the _read method, and done the other things | ||
// that Readable wants before the first _read call, so unset the | ||
// sync guard flag. | ||
this._readableState.sync = false; | ||
if (options) { | ||
if (typeof options.transform === 'function') this._transform = options.transform; | ||
if (typeof options.flush === 'function') this._flush = options.flush; | ||
} // When the writable side finishes, then flush out anything remaining. | ||
this.on('prefinish', prefinish); | ||
} | ||
function prefinish() { | ||
var _this = this; | ||
if (typeof this._flush === 'function' && !this._readableState.destroyed) { | ||
this._flush(function (er, data) { | ||
done(_this, er, data); | ||
}); | ||
} else { | ||
done(this, null, null); | ||
} | ||
} | ||
Transform.prototype.push = function (chunk, encoding) { | ||
this._transformState.needTransform = false; | ||
return Duplex.prototype.push.call(this, chunk, encoding); | ||
}; // This is the part where you do stuff! | ||
// override this function in implementation classes. | ||
// 'chunk' is an input chunk. | ||
// | ||
// Call `push(newChunk)` to pass along transformed output | ||
// to the readable side. You may call 'push' zero or more times. | ||
// | ||
// Call `cb(err)` when you are done with this chunk. If you pass | ||
// an error, then that'll put the hurt on the whole operation. If you | ||
// never call cb(), then you'll never get another chunk. | ||
Transform.prototype._transform = function (chunk, encoding, cb) { | ||
cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()')); | ||
}; | ||
Transform.prototype._write = function (chunk, encoding, cb) { | ||
var ts = this._transformState; | ||
ts.writecb = cb; | ||
ts.writechunk = chunk; | ||
ts.writeencoding = encoding; | ||
if (!ts.transforming) { | ||
var rs = this._readableState; | ||
if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) this._read(rs.highWaterMark); | ||
} | ||
}; // Doesn't matter what the args are here. | ||
// _transform does all the work. | ||
// That we got here means that the readable side wants more data. | ||
Transform.prototype._read = function (n) { | ||
var ts = this._transformState; | ||
if (ts.writechunk !== null && !ts.transforming) { | ||
ts.transforming = true; | ||
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); | ||
} else { | ||
// mark that we need a transform, so that any data that comes in | ||
// will get processed, now that we've asked for it. | ||
ts.needTransform = true; | ||
} | ||
}; | ||
Transform.prototype._destroy = function (err, cb) { | ||
Duplex.prototype._destroy.call(this, err, function (err2) { | ||
cb(err2); | ||
}); | ||
}; | ||
function done(stream, er, data) { | ||
if (er) return stream.emit('error', er); | ||
if (data != null) // single equals check for both `null` and `undefined` | ||
stream.push(data); // TODO(BridgeAR): Write a test for these two error cases | ||
// if there's nothing in the write buffer, then that means | ||
// that nothing more will ever be provided | ||
if (stream._writableState.length) throw new ERR_TRANSFORM_WITH_LENGTH_0(); | ||
if (stream._transformState.transforming) throw new ERR_TRANSFORM_ALREADY_TRANSFORMING(); | ||
return stream.push(null); | ||
} | ||
module.exports = require('./stream').Transform |
@@ -1,697 +0,3 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// A bit simpler than readable streams. | ||
// Implement an async ._write(chunk, encoding, cb), and it'll handle all | ||
// the drain event emission and buffering. | ||
'use strict'; | ||
'use strict' // Keep this file as an alias for the full stream module. | ||
module.exports = Writable; | ||
/* <replacement> */ | ||
function WriteReq(chunk, encoding, cb) { | ||
this.chunk = chunk; | ||
this.encoding = encoding; | ||
this.callback = cb; | ||
this.next = null; | ||
} // It seems a linked list but it is not | ||
// there will be only 2 of these for each stream | ||
function CorkedRequest(state) { | ||
var _this = this; | ||
this.next = null; | ||
this.entry = null; | ||
this.finish = function () { | ||
onCorkedFinish(_this, state); | ||
}; | ||
} | ||
/* </replacement> */ | ||
/*<replacement>*/ | ||
var Duplex; | ||
/*</replacement>*/ | ||
Writable.WritableState = WritableState; | ||
/*<replacement>*/ | ||
var internalUtil = { | ||
deprecate: require('util-deprecate') | ||
}; | ||
/*</replacement>*/ | ||
/*<replacement>*/ | ||
var Stream = require('./internal/streams/stream'); | ||
/*</replacement>*/ | ||
var Buffer = require('buffer').Buffer; | ||
var OurUint8Array = global.Uint8Array || function () {}; | ||
function _uint8ArrayToBuffer(chunk) { | ||
return Buffer.from(chunk); | ||
} | ||
function _isUint8Array(obj) { | ||
return Buffer.isBuffer(obj) || obj instanceof OurUint8Array; | ||
} | ||
var destroyImpl = require('./internal/streams/destroy'); | ||
var _require = require('./internal/streams/state'), | ||
getHighWaterMark = _require.getHighWaterMark; | ||
var _require$codes = require('../errors').codes, | ||
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK, | ||
ERR_STREAM_CANNOT_PIPE = _require$codes.ERR_STREAM_CANNOT_PIPE, | ||
ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED, | ||
ERR_STREAM_NULL_VALUES = _require$codes.ERR_STREAM_NULL_VALUES, | ||
ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END, | ||
ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING; | ||
var errorOrDestroy = destroyImpl.errorOrDestroy; | ||
require('inherits')(Writable, Stream); | ||
function nop() {} | ||
function WritableState(options, stream, isDuplex) { | ||
Duplex = Duplex || require('./_stream_duplex'); | ||
options = options || {}; // Duplex streams are both readable and writable, but share | ||
// the same options object. | ||
// However, some cases require setting options to different | ||
// values for the readable and the writable sides of the duplex stream, | ||
// e.g. options.readableObjectMode vs. options.writableObjectMode, etc. | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag to indicate whether or not this stream | ||
// contains buffers or objects. | ||
this.objectMode = !!options.objectMode; | ||
if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode; // the point at which write() starts returning false | ||
// Note: 0 is a valid value, means that we always return false if | ||
// the entire buffer is not flushed immediately on write() | ||
this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex); // if _final has been called | ||
this.finalCalled = false; // drain event flag. | ||
this.needDrain = false; // at the start of calling end() | ||
this.ending = false; // when end() has been called, and returned | ||
this.ended = false; // when 'finish' is emitted | ||
this.finished = false; // has it been destroyed | ||
this.destroyed = false; // should we decode strings into buffers before passing to _write? | ||
// this is here so that some node-core streams can optimize string | ||
// handling at a lower level. | ||
var noDecode = options.decodeStrings === false; | ||
this.decodeStrings = !noDecode; // Crypto is kind of old and crusty. Historically, its default string | ||
// encoding is 'binary' so we have to make this configurable. | ||
// Everything else in the universe uses 'utf8', though. | ||
this.defaultEncoding = options.defaultEncoding || 'utf8'; // not an actual buffer we keep track of, but a measurement | ||
// of how much we're waiting to get pushed to some underlying | ||
// socket or file. | ||
this.length = 0; // a flag to see when we're in the middle of a write. | ||
this.writing = false; // when true all writes will be buffered until .uncork() call | ||
this.corked = 0; // a flag to be able to tell if the onwrite cb is called immediately, | ||
// or on a later tick. We set this to true at first, because any | ||
// actions that shouldn't happen until "later" should generally also | ||
// not happen before the first write call. | ||
this.sync = true; // a flag to know if we're processing previously buffered items, which | ||
// may call the _write() callback in the same tick, so that we don't | ||
// end up in an overlapped onwrite situation. | ||
this.bufferProcessing = false; // the callback that's passed to _write(chunk,cb) | ||
this.onwrite = function (er) { | ||
onwrite(stream, er); | ||
}; // the callback that the user supplies to write(chunk,encoding,cb) | ||
this.writecb = null; // the amount that is being written when _write is called. | ||
this.writelen = 0; | ||
this.bufferedRequest = null; | ||
this.lastBufferedRequest = null; // number of pending user-supplied write callbacks | ||
// this must be 0 before 'finish' can be emitted | ||
this.pendingcb = 0; // emit prefinish if the only thing we're waiting for is _write cbs | ||
// This is relevant for synchronous Transform streams | ||
this.prefinished = false; // True if the error was already emitted and should not be thrown again | ||
this.errorEmitted = false; // Should close be emitted on destroy. Defaults to true. | ||
this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'finish' (and potentially 'end') | ||
this.autoDestroy = !!options.autoDestroy; // count buffered requests | ||
this.bufferedRequestCount = 0; // allocate the first CorkedRequest, there is always | ||
// one allocated and free to use, and we maintain at most two | ||
this.corkedRequestsFree = new CorkedRequest(this); | ||
} | ||
WritableState.prototype.getBuffer = function getBuffer() { | ||
var current = this.bufferedRequest; | ||
var out = []; | ||
while (current) { | ||
out.push(current); | ||
current = current.next; | ||
} | ||
return out; | ||
}; | ||
(function () { | ||
try { | ||
Object.defineProperty(WritableState.prototype, 'buffer', { | ||
get: internalUtil.deprecate(function writableStateBufferGetter() { | ||
return this.getBuffer(); | ||
}, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.', 'DEP0003') | ||
}); | ||
} catch (_) {} | ||
})(); // Test _writableState for inheritance to account for Duplex streams, | ||
// whose prototype chain only points to Readable. | ||
var realHasInstance; | ||
if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') { | ||
realHasInstance = Function.prototype[Symbol.hasInstance]; | ||
Object.defineProperty(Writable, Symbol.hasInstance, { | ||
value: function value(object) { | ||
if (realHasInstance.call(this, object)) return true; | ||
if (this !== Writable) return false; | ||
return object && object._writableState instanceof WritableState; | ||
} | ||
}); | ||
} else { | ||
realHasInstance = function realHasInstance(object) { | ||
return object instanceof this; | ||
}; | ||
} | ||
function Writable(options) { | ||
Duplex = Duplex || require('./_stream_duplex'); // Writable ctor is applied to Duplexes, too. | ||
// `realHasInstance` is necessary because using plain `instanceof` | ||
// would return false, as no `_writableState` property is attached. | ||
// Trying to use the custom `instanceof` for Writable here will also break the | ||
// Node.js LazyTransform implementation, which has a non-trivial getter for | ||
// `_writableState` that would lead to infinite recursion. | ||
// Checking for a Stream.Duplex instance is faster here instead of inside | ||
// the WritableState constructor, at least with V8 6.5 | ||
var isDuplex = this instanceof Duplex; | ||
if (!isDuplex && !realHasInstance.call(Writable, this)) return new Writable(options); | ||
this._writableState = new WritableState(options, this, isDuplex); // legacy. | ||
this.writable = true; | ||
if (options) { | ||
if (typeof options.write === 'function') this._write = options.write; | ||
if (typeof options.writev === 'function') this._writev = options.writev; | ||
if (typeof options.destroy === 'function') this._destroy = options.destroy; | ||
if (typeof options.final === 'function') this._final = options.final; | ||
} | ||
Stream.call(this); | ||
} // Otherwise people can pipe Writable streams, which is just wrong. | ||
Writable.prototype.pipe = function () { | ||
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); | ||
}; | ||
function writeAfterEnd(stream, cb) { | ||
var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb | ||
errorOrDestroy(stream, er); | ||
process.nextTick(cb, er); | ||
} // Checks that a user-supplied chunk is valid, especially for the particular | ||
// mode the stream is in. Currently this means that `null` is never accepted | ||
// and undefined/non-string values are only allowed in object mode. | ||
function validChunk(stream, state, chunk, cb) { | ||
var er; | ||
if (chunk === null) { | ||
er = new ERR_STREAM_NULL_VALUES(); | ||
} else if (typeof chunk !== 'string' && !state.objectMode) { | ||
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); | ||
} | ||
if (er) { | ||
errorOrDestroy(stream, er); | ||
process.nextTick(cb, er); | ||
return false; | ||
} | ||
return true; | ||
} | ||
Writable.prototype.write = function (chunk, encoding, cb) { | ||
var state = this._writableState; | ||
var ret = false; | ||
var isBuf = !state.objectMode && _isUint8Array(chunk); | ||
if (isBuf && !Buffer.isBuffer(chunk)) { | ||
chunk = _uint8ArrayToBuffer(chunk); | ||
} | ||
if (typeof encoding === 'function') { | ||
cb = encoding; | ||
encoding = null; | ||
} | ||
if (isBuf) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding; | ||
if (typeof cb !== 'function') cb = nop; | ||
if (state.ending) writeAfterEnd(this, cb);else if (isBuf || validChunk(this, state, chunk, cb)) { | ||
state.pendingcb++; | ||
ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb); | ||
} | ||
return ret; | ||
}; | ||
Writable.prototype.cork = function () { | ||
this._writableState.corked++; | ||
}; | ||
Writable.prototype.uncork = function () { | ||
var state = this._writableState; | ||
if (state.corked) { | ||
state.corked--; | ||
if (!state.writing && !state.corked && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state); | ||
} | ||
}; | ||
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { | ||
// node::ParseEncoding() requires lower case. | ||
if (typeof encoding === 'string') encoding = encoding.toLowerCase(); | ||
if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new ERR_UNKNOWN_ENCODING(encoding); | ||
this._writableState.defaultEncoding = encoding; | ||
return this; | ||
}; | ||
Object.defineProperty(Writable.prototype, 'writableBuffer', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._writableState && this._writableState.getBuffer(); | ||
} | ||
}); | ||
function decodeChunk(state, chunk, encoding) { | ||
if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') { | ||
chunk = Buffer.from(chunk, encoding); | ||
} | ||
return chunk; | ||
} | ||
Object.defineProperty(Writable.prototype, 'writableHighWaterMark', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._writableState.highWaterMark; | ||
} | ||
}); // if we're already writing something, then just put this | ||
// in the queue, and wait our turn. Otherwise, call _write | ||
// If we return false, then we need a drain event, so set that flag. | ||
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { | ||
if (!isBuf) { | ||
var newChunk = decodeChunk(state, chunk, encoding); | ||
if (chunk !== newChunk) { | ||
isBuf = true; | ||
encoding = 'buffer'; | ||
chunk = newChunk; | ||
} | ||
} | ||
var len = state.objectMode ? 1 : chunk.length; | ||
state.length += len; | ||
var ret = state.length < state.highWaterMark; // we must ensure that previous needDrain will not be reset to false. | ||
if (!ret) state.needDrain = true; | ||
if (state.writing || state.corked) { | ||
var last = state.lastBufferedRequest; | ||
state.lastBufferedRequest = { | ||
chunk: chunk, | ||
encoding: encoding, | ||
isBuf: isBuf, | ||
callback: cb, | ||
next: null | ||
}; | ||
if (last) { | ||
last.next = state.lastBufferedRequest; | ||
} else { | ||
state.bufferedRequest = state.lastBufferedRequest; | ||
} | ||
state.bufferedRequestCount += 1; | ||
} else { | ||
doWrite(stream, state, false, len, chunk, encoding, cb); | ||
} | ||
return ret; | ||
} | ||
function doWrite(stream, state, writev, len, chunk, encoding, cb) { | ||
state.writelen = len; | ||
state.writecb = cb; | ||
state.writing = true; | ||
state.sync = true; | ||
if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write'));else if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite); | ||
state.sync = false; | ||
} | ||
function onwriteError(stream, state, sync, er, cb) { | ||
--state.pendingcb; | ||
if (sync) { | ||
// defer the callback if we are being called synchronously | ||
// to avoid piling up things on the stack | ||
process.nextTick(cb, er); // this can emit finish, and it will always happen | ||
// after error | ||
process.nextTick(finishMaybe, stream, state); | ||
stream._writableState.errorEmitted = true; | ||
errorOrDestroy(stream, er); | ||
} else { | ||
// the caller expect this to happen before if | ||
// it is async | ||
cb(er); | ||
stream._writableState.errorEmitted = true; | ||
errorOrDestroy(stream, er); // this can emit finish, but finish must | ||
// always follow error | ||
finishMaybe(stream, state); | ||
} | ||
} | ||
function onwriteStateUpdate(state) { | ||
state.writing = false; | ||
state.writecb = null; | ||
state.length -= state.writelen; | ||
state.writelen = 0; | ||
} | ||
function onwrite(stream, er) { | ||
var state = stream._writableState; | ||
var sync = state.sync; | ||
var cb = state.writecb; | ||
if (typeof cb !== 'function') throw new ERR_MULTIPLE_CALLBACK(); | ||
onwriteStateUpdate(state); | ||
if (er) onwriteError(stream, state, sync, er, cb);else { | ||
// Check if we're actually ready to finish, but don't emit yet | ||
var finished = needFinish(state) || stream.destroyed; | ||
if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { | ||
clearBuffer(stream, state); | ||
} | ||
if (sync) { | ||
process.nextTick(afterWrite, stream, state, finished, cb); | ||
} else { | ||
afterWrite(stream, state, finished, cb); | ||
} | ||
} | ||
} | ||
function afterWrite(stream, state, finished, cb) { | ||
if (!finished) onwriteDrain(stream, state); | ||
state.pendingcb--; | ||
cb(); | ||
finishMaybe(stream, state); | ||
} // Must force callback to be called on nextTick, so that we don't | ||
// emit 'drain' before the write() consumer gets the 'false' return | ||
// value, and has a chance to attach a 'drain' listener. | ||
function onwriteDrain(stream, state) { | ||
if (state.length === 0 && state.needDrain) { | ||
state.needDrain = false; | ||
stream.emit('drain'); | ||
} | ||
} // if there's something in the buffer waiting, then process it | ||
function clearBuffer(stream, state) { | ||
state.bufferProcessing = true; | ||
var entry = state.bufferedRequest; | ||
if (stream._writev && entry && entry.next) { | ||
// Fast case, write everything using _writev() | ||
var l = state.bufferedRequestCount; | ||
var buffer = new Array(l); | ||
var holder = state.corkedRequestsFree; | ||
holder.entry = entry; | ||
var count = 0; | ||
var allBuffers = true; | ||
while (entry) { | ||
buffer[count] = entry; | ||
if (!entry.isBuf) allBuffers = false; | ||
entry = entry.next; | ||
count += 1; | ||
} | ||
buffer.allBuffers = allBuffers; | ||
doWrite(stream, state, true, state.length, buffer, '', holder.finish); // doWrite is almost always async, defer these to save a bit of time | ||
// as the hot path ends with doWrite | ||
state.pendingcb++; | ||
state.lastBufferedRequest = null; | ||
if (holder.next) { | ||
state.corkedRequestsFree = holder.next; | ||
holder.next = null; | ||
} else { | ||
state.corkedRequestsFree = new CorkedRequest(state); | ||
} | ||
state.bufferedRequestCount = 0; | ||
} else { | ||
// Slow case, write chunks one-by-one | ||
while (entry) { | ||
var chunk = entry.chunk; | ||
var encoding = entry.encoding; | ||
var cb = entry.callback; | ||
var len = state.objectMode ? 1 : chunk.length; | ||
doWrite(stream, state, false, len, chunk, encoding, cb); | ||
entry = entry.next; | ||
state.bufferedRequestCount--; // if we didn't call the onwrite immediately, then | ||
// it means that we need to wait until it does. | ||
// also, that means that the chunk and cb are currently | ||
// being processed, so move the buffer counter past them. | ||
if (state.writing) { | ||
break; | ||
} | ||
} | ||
if (entry === null) state.lastBufferedRequest = null; | ||
} | ||
state.bufferedRequest = entry; | ||
state.bufferProcessing = false; | ||
} | ||
Writable.prototype._write = function (chunk, encoding, cb) { | ||
cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()')); | ||
}; | ||
Writable.prototype._writev = null; | ||
Writable.prototype.end = function (chunk, encoding, cb) { | ||
var state = this._writableState; | ||
if (typeof chunk === 'function') { | ||
cb = chunk; | ||
chunk = null; | ||
encoding = null; | ||
} else if (typeof encoding === 'function') { | ||
cb = encoding; | ||
encoding = null; | ||
} | ||
if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); // .end() fully uncorks | ||
if (state.corked) { | ||
state.corked = 1; | ||
this.uncork(); | ||
} // ignore unnecessary end() calls. | ||
if (!state.ending) endWritable(this, state, cb); | ||
return this; | ||
}; | ||
Object.defineProperty(Writable.prototype, 'writableLength', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
return this._writableState.length; | ||
} | ||
}); | ||
function needFinish(state) { | ||
return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing; | ||
} | ||
function callFinal(stream, state) { | ||
stream._final(function (err) { | ||
state.pendingcb--; | ||
if (err) { | ||
errorOrDestroy(stream, err); | ||
} | ||
state.prefinished = true; | ||
stream.emit('prefinish'); | ||
finishMaybe(stream, state); | ||
}); | ||
} | ||
function prefinish(stream, state) { | ||
if (!state.prefinished && !state.finalCalled) { | ||
if (typeof stream._final === 'function' && !state.destroyed) { | ||
state.pendingcb++; | ||
state.finalCalled = true; | ||
process.nextTick(callFinal, stream, state); | ||
} else { | ||
state.prefinished = true; | ||
stream.emit('prefinish'); | ||
} | ||
} | ||
} | ||
function finishMaybe(stream, state) { | ||
var need = needFinish(state); | ||
if (need) { | ||
prefinish(stream, state); | ||
if (state.pendingcb === 0) { | ||
state.finished = true; | ||
stream.emit('finish'); | ||
if (state.autoDestroy) { | ||
// In case of duplex streams we need a way to detect | ||
// if the readable side is ready for autoDestroy as well | ||
var rState = stream._readableState; | ||
if (!rState || rState.autoDestroy && rState.endEmitted) { | ||
stream.destroy(); | ||
} | ||
} | ||
} | ||
} | ||
return need; | ||
} | ||
function endWritable(stream, state, cb) { | ||
state.ending = true; | ||
finishMaybe(stream, state); | ||
if (cb) { | ||
if (state.finished) process.nextTick(cb);else stream.once('finish', cb); | ||
} | ||
state.ended = true; | ||
stream.writable = false; | ||
} | ||
function onCorkedFinish(corkReq, state, err) { | ||
var entry = corkReq.entry; | ||
corkReq.entry = null; | ||
while (entry) { | ||
var cb = entry.callback; | ||
state.pendingcb--; | ||
cb(err); | ||
entry = entry.next; | ||
} // reuse the free corkReq. | ||
state.corkedRequestsFree.next = corkReq; | ||
} | ||
Object.defineProperty(Writable.prototype, 'destroyed', { | ||
// making it explicit this property is not enumerable | ||
// because otherwise some prototype manipulation in | ||
// userland will fail | ||
enumerable: false, | ||
get: function get() { | ||
if (this._writableState === undefined) { | ||
return false; | ||
} | ||
return this._writableState.destroyed; | ||
}, | ||
set: function set(value) { | ||
// we ignore the value if the stream | ||
// has not been initialized yet | ||
if (!this._writableState) { | ||
return; | ||
} // backward compatibility, the user is explicitly | ||
// managing destroyed | ||
this._writableState.destroyed = value; | ||
} | ||
}); | ||
Writable.prototype.destroy = destroyImpl.destroy; | ||
Writable.prototype._undestroy = destroyImpl.undestroy; | ||
Writable.prototype._destroy = function (err, cb) { | ||
cb(err); | ||
}; | ||
module.exports = require('./stream').Writable |
@@ -1,210 +0,178 @@ | ||
'use strict'; | ||
'use strict' | ||
function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); if (enumerableOnly) symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); keys.push.apply(keys, symbols); } return keys; } | ||
const { StringPrototypeSlice, SymbolIterator, TypedArrayPrototypeSet, Uint8Array } = require('../../ours/primordials') | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(Object(source), true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(Object(source)).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; } | ||
const { inspect } = require('../../ours/util') | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
module.exports = class BufferList { | ||
constructor() { | ||
this.head = null | ||
this.tail = null | ||
this.length = 0 | ||
} | ||
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } } | ||
push(v) { | ||
const entry = { | ||
data: v, | ||
next: null | ||
} | ||
if (this.length > 0) this.tail.next = entry | ||
else this.head = entry | ||
this.tail = entry | ||
++this.length | ||
} | ||
function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } } | ||
unshift(v) { | ||
const entry = { | ||
data: v, | ||
next: this.head | ||
} | ||
if (this.length === 0) this.tail = entry | ||
this.head = entry | ||
++this.length | ||
} | ||
function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; } | ||
shift() { | ||
if (this.length === 0) return | ||
const ret = this.head.data | ||
if (this.length === 1) this.head = this.tail = null | ||
else this.head = this.head.next | ||
--this.length | ||
return ret | ||
} | ||
var _require = require('buffer'), | ||
Buffer = _require.Buffer; | ||
clear() { | ||
this.head = this.tail = null | ||
this.length = 0 | ||
} | ||
var _require2 = require('util'), | ||
inspect = _require2.inspect; | ||
join(s) { | ||
if (this.length === 0) return '' | ||
let p = this.head | ||
let ret = '' + p.data | ||
var custom = inspect && inspect.custom || 'inspect'; | ||
while ((p = p.next) !== null) ret += s + p.data | ||
function copyBuffer(src, target, offset) { | ||
Buffer.prototype.copy.call(src, target, offset); | ||
} | ||
return ret | ||
} | ||
module.exports = | ||
/*#__PURE__*/ | ||
function () { | ||
function BufferList() { | ||
_classCallCheck(this, BufferList); | ||
concat(n) { | ||
if (this.length === 0) return Buffer.alloc(0) | ||
const ret = Buffer.allocUnsafe(n >>> 0) | ||
let p = this.head | ||
let i = 0 | ||
this.head = null; | ||
this.tail = null; | ||
this.length = 0; | ||
} | ||
_createClass(BufferList, [{ | ||
key: "push", | ||
value: function push(v) { | ||
var entry = { | ||
data: v, | ||
next: null | ||
}; | ||
if (this.length > 0) this.tail.next = entry;else this.head = entry; | ||
this.tail = entry; | ||
++this.length; | ||
while (p) { | ||
TypedArrayPrototypeSet(ret, p.data, i) | ||
i += p.data.length | ||
p = p.next | ||
} | ||
}, { | ||
key: "unshift", | ||
value: function unshift(v) { | ||
var entry = { | ||
data: v, | ||
next: this.head | ||
}; | ||
if (this.length === 0) this.tail = entry; | ||
this.head = entry; | ||
++this.length; | ||
} | ||
}, { | ||
key: "shift", | ||
value: function shift() { | ||
if (this.length === 0) return; | ||
var ret = this.head.data; | ||
if (this.length === 1) this.head = this.tail = null;else this.head = this.head.next; | ||
--this.length; | ||
return ret; | ||
} | ||
}, { | ||
key: "clear", | ||
value: function clear() { | ||
this.head = this.tail = null; | ||
this.length = 0; | ||
} | ||
}, { | ||
key: "join", | ||
value: function join(s) { | ||
if (this.length === 0) return ''; | ||
var p = this.head; | ||
var ret = '' + p.data; | ||
while (p = p.next) { | ||
ret += s + p.data; | ||
} | ||
return ret | ||
} // Consumes a specified amount of bytes or characters from the buffered data. | ||
return ret; | ||
consume(n, hasStrings) { | ||
const data = this.head.data | ||
if (n < data.length) { | ||
// `slice` is the same for buffers and strings. | ||
const slice = data.slice(0, n) | ||
this.head.data = data.slice(n) | ||
return slice | ||
} | ||
}, { | ||
key: "concat", | ||
value: function concat(n) { | ||
if (this.length === 0) return Buffer.alloc(0); | ||
var ret = Buffer.allocUnsafe(n >>> 0); | ||
var p = this.head; | ||
var i = 0; | ||
while (p) { | ||
copyBuffer(p.data, ret, i); | ||
i += p.data.length; | ||
p = p.next; | ||
} | ||
if (n === data.length) { | ||
// First chunk is a perfect match. | ||
return this.shift() | ||
} // Result spans more than one buffer. | ||
return ret; | ||
} // Consumes a specified amount of bytes or characters from the buffered data. | ||
return hasStrings ? this._getString(n) : this._getBuffer(n) | ||
} | ||
}, { | ||
key: "consume", | ||
value: function consume(n, hasStrings) { | ||
var ret; | ||
first() { | ||
return this.head.data | ||
} | ||
if (n < this.head.data.length) { | ||
// `slice` is the same for buffers and strings. | ||
ret = this.head.data.slice(0, n); | ||
this.head.data = this.head.data.slice(n); | ||
} else if (n === this.head.data.length) { | ||
// First chunk is a perfect match. | ||
ret = this.shift(); | ||
} else { | ||
// Result spans more than one buffer. | ||
ret = hasStrings ? this._getString(n) : this._getBuffer(n); | ||
} | ||
return ret; | ||
*[SymbolIterator]() { | ||
for (let p = this.head; p; p = p.next) { | ||
yield p.data | ||
} | ||
}, { | ||
key: "first", | ||
value: function first() { | ||
return this.head.data; | ||
} // Consumes a specified amount of characters from the buffered data. | ||
} // Consumes a specified amount of characters from the buffered data. | ||
}, { | ||
key: "_getString", | ||
value: function _getString(n) { | ||
var p = this.head; | ||
var c = 1; | ||
var ret = p.data; | ||
n -= ret.length; | ||
_getString(n) { | ||
let ret = '' | ||
let p = this.head | ||
let c = 0 | ||
while (p = p.next) { | ||
var str = p.data; | ||
var nb = n > str.length ? str.length : n; | ||
if (nb === str.length) ret += str;else ret += str.slice(0, n); | ||
n -= nb; | ||
do { | ||
const str = p.data | ||
if (n === 0) { | ||
if (nb === str.length) { | ||
++c; | ||
if (p.next) this.head = p.next;else this.head = this.tail = null; | ||
} else { | ||
this.head = p; | ||
p.data = str.slice(nb); | ||
} | ||
break; | ||
if (n > str.length) { | ||
ret += str | ||
n -= str.length | ||
} else { | ||
if (n === str.length) { | ||
ret += str | ||
++c | ||
if (p.next) this.head = p.next | ||
else this.head = this.tail = null | ||
} else { | ||
ret += StringPrototypeSlice(str, 0, n) | ||
this.head = p | ||
p.data = StringPrototypeSlice(str, n) | ||
} | ||
++c; | ||
break | ||
} | ||
this.length -= c; | ||
return ret; | ||
} // Consumes a specified amount of bytes from the buffered data. | ||
++c | ||
} while ((p = p.next) !== null) | ||
}, { | ||
key: "_getBuffer", | ||
value: function _getBuffer(n) { | ||
var ret = Buffer.allocUnsafe(n); | ||
var p = this.head; | ||
var c = 1; | ||
p.data.copy(ret); | ||
n -= p.data.length; | ||
this.length -= c | ||
return ret | ||
} // Consumes a specified amount of bytes from the buffered data. | ||
while (p = p.next) { | ||
var buf = p.data; | ||
var nb = n > buf.length ? buf.length : n; | ||
buf.copy(ret, ret.length - n, 0, nb); | ||
n -= nb; | ||
_getBuffer(n) { | ||
const ret = Buffer.allocUnsafe(n) | ||
const retLen = n | ||
let p = this.head | ||
let c = 0 | ||
if (n === 0) { | ||
if (nb === buf.length) { | ||
++c; | ||
if (p.next) this.head = p.next;else this.head = this.tail = null; | ||
} else { | ||
this.head = p; | ||
p.data = buf.slice(nb); | ||
} | ||
do { | ||
const buf = p.data | ||
break; | ||
if (n > buf.length) { | ||
TypedArrayPrototypeSet(ret, buf, retLen - n) | ||
n -= buf.length | ||
} else { | ||
if (n === buf.length) { | ||
TypedArrayPrototypeSet(ret, buf, retLen - n) | ||
++c | ||
if (p.next) this.head = p.next | ||
else this.head = this.tail = null | ||
} else { | ||
TypedArrayPrototypeSet(ret, new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n) | ||
this.head = p | ||
p.data = buf.slice(n) | ||
} | ||
++c; | ||
break | ||
} | ||
this.length -= c; | ||
return ret; | ||
} // Make sure the linked list only shows the minimal necessary information. | ||
++c | ||
} while ((p = p.next) !== null) | ||
}, { | ||
key: custom, | ||
value: function value(_, options) { | ||
return inspect(this, _objectSpread({}, options, { | ||
// Only inspect one level. | ||
depth: 0, | ||
// It should not recurse. | ||
customInspect: false | ||
})); | ||
} | ||
}]); | ||
this.length -= c | ||
return ret | ||
} // Make sure the linked list only shows the minimal necessary information. | ||
return BufferList; | ||
}(); | ||
[Symbol.for('nodejs.util.inspect.custom')](_, options) { | ||
return inspect(this, { | ||
...options, | ||
// Only inspect one level. | ||
depth: 0, | ||
// It should not recurse. | ||
customInspect: false | ||
}) | ||
} | ||
} |
@@ -1,91 +0,180 @@ | ||
'use strict'; // undocumented cb() API, needed for core, not for public API | ||
'use strict' | ||
const { | ||
aggregateTwoErrors, | ||
codes: { ERR_MULTIPLE_CALLBACK }, | ||
AbortError | ||
} = require('../../ours/errors') | ||
const { Symbol } = require('../../ours/primordials') | ||
const { kDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils') | ||
const kDestroy = Symbol('kDestroy') | ||
const kConstruct = Symbol('kConstruct') | ||
function checkError(err, w, r) { | ||
if (err) { | ||
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 | ||
err.stack // eslint-disable-line no-unused-expressions | ||
if (w && !w.errored) { | ||
w.errored = err | ||
} | ||
if (r && !r.errored) { | ||
r.errored = err | ||
} | ||
} | ||
} // Backwards compat. cb() is undocumented and unused in core but | ||
// unfortunately might be used by modules. | ||
function destroy(err, cb) { | ||
var _this = this; | ||
const r = this._readableState | ||
const w = this._writableState // With duplex streams we use the writable side for state. | ||
var readableDestroyed = this._readableState && this._readableState.destroyed; | ||
var writableDestroyed = this._writableState && this._writableState.destroyed; | ||
const s = w || r | ||
if (readableDestroyed || writableDestroyed) { | ||
if (cb) { | ||
cb(err); | ||
} else if (err) { | ||
if (!this._writableState) { | ||
process.nextTick(emitErrorNT, this, err); | ||
} else if (!this._writableState.errorEmitted) { | ||
this._writableState.errorEmitted = true; | ||
process.nextTick(emitErrorNT, this, err); | ||
} | ||
if ((w && w.destroyed) || (r && r.destroyed)) { | ||
if (typeof cb === 'function') { | ||
cb() | ||
} | ||
return this; | ||
} // we set destroyed to true before firing error callbacks in order | ||
return this | ||
} // We set destroyed to true before firing error callbacks in order | ||
// to make it re-entrance safe in case destroy() is called within callbacks | ||
checkError(err, w, r) | ||
if (this._readableState) { | ||
this._readableState.destroyed = true; | ||
} // if this is a duplex stream mark the writable part as destroyed as well | ||
if (w) { | ||
w.destroyed = true | ||
} | ||
if (r) { | ||
r.destroyed = true | ||
} // If still constructing then defer calling _destroy. | ||
if (this._writableState) { | ||
this._writableState.destroyed = true; | ||
if (!s.constructed) { | ||
this.once(kDestroy, function (er) { | ||
_destroy(this, aggregateTwoErrors(er, err), cb) | ||
}) | ||
} else { | ||
_destroy(this, err, cb) | ||
} | ||
this._destroy(err || null, function (err) { | ||
if (!cb && err) { | ||
if (!_this._writableState) { | ||
process.nextTick(emitErrorAndCloseNT, _this, err); | ||
} else if (!_this._writableState.errorEmitted) { | ||
_this._writableState.errorEmitted = true; | ||
process.nextTick(emitErrorAndCloseNT, _this, err); | ||
} else { | ||
process.nextTick(emitCloseNT, _this); | ||
} | ||
} else if (cb) { | ||
process.nextTick(emitCloseNT, _this); | ||
cb(err); | ||
return this | ||
} | ||
function _destroy(self, err, cb) { | ||
let called = false | ||
function onDestroy(err) { | ||
if (called) { | ||
return | ||
} | ||
called = true | ||
const r = self._readableState | ||
const w = self._writableState | ||
checkError(err, w, r) | ||
if (w) { | ||
w.closed = true | ||
} | ||
if (r) { | ||
r.closed = true | ||
} | ||
if (typeof cb === 'function') { | ||
cb(err) | ||
} | ||
if (err) { | ||
process.nextTick(emitErrorCloseNT, self, err) | ||
} else { | ||
process.nextTick(emitCloseNT, _this); | ||
process.nextTick(emitCloseNT, self) | ||
} | ||
}); | ||
} | ||
return this; | ||
try { | ||
self._destroy(err || null, onDestroy) | ||
} catch (err) { | ||
onDestroy(err) | ||
} | ||
} | ||
function emitErrorAndCloseNT(self, err) { | ||
emitErrorNT(self, err); | ||
emitCloseNT(self); | ||
function emitErrorCloseNT(self, err) { | ||
emitErrorNT(self, err) | ||
emitCloseNT(self) | ||
} | ||
function emitCloseNT(self) { | ||
if (self._writableState && !self._writableState.emitClose) return; | ||
if (self._readableState && !self._readableState.emitClose) return; | ||
self.emit('close'); | ||
} | ||
const r = self._readableState | ||
const w = self._writableState | ||
function undestroy() { | ||
if (this._readableState) { | ||
this._readableState.destroyed = false; | ||
this._readableState.reading = false; | ||
this._readableState.ended = false; | ||
this._readableState.endEmitted = false; | ||
if (w) { | ||
w.closeEmitted = true | ||
} | ||
if (this._writableState) { | ||
this._writableState.destroyed = false; | ||
this._writableState.ended = false; | ||
this._writableState.ending = false; | ||
this._writableState.finalCalled = false; | ||
this._writableState.prefinished = false; | ||
this._writableState.finished = false; | ||
this._writableState.errorEmitted = false; | ||
if (r) { | ||
r.closeEmitted = true | ||
} | ||
if ((w && w.emitClose) || (r && r.emitClose)) { | ||
self.emit('close') | ||
} | ||
} | ||
function emitErrorNT(self, err) { | ||
self.emit('error', err); | ||
const r = self._readableState | ||
const w = self._writableState | ||
if ((w && w.errorEmitted) || (r && r.errorEmitted)) { | ||
return | ||
} | ||
if (w) { | ||
w.errorEmitted = true | ||
} | ||
if (r) { | ||
r.errorEmitted = true | ||
} | ||
self.emit('error', err) | ||
} | ||
function errorOrDestroy(stream, err) { | ||
function undestroy() { | ||
const r = this._readableState | ||
const w = this._writableState | ||
if (r) { | ||
r.constructed = true | ||
r.closed = false | ||
r.closeEmitted = false | ||
r.destroyed = false | ||
r.errored = null | ||
r.errorEmitted = false | ||
r.reading = false | ||
r.ended = r.readable === false | ||
r.endEmitted = r.readable === false | ||
} | ||
if (w) { | ||
w.constructed = true | ||
w.destroyed = false | ||
w.closed = false | ||
w.closeEmitted = false | ||
w.errored = null | ||
w.errorEmitted = false | ||
w.finalCalled = false | ||
w.prefinished = false | ||
w.ended = w.writable === false | ||
w.ending = w.writable === false | ||
w.finished = w.writable === false | ||
} | ||
} | ||
function errorOrDestroy(stream, err, sync) { | ||
// We have tests that rely on errors being emitted | ||
@@ -96,11 +185,149 @@ // in the same tick, so changing this is semver major. | ||
// semver major update we should change the default to this. | ||
var rState = stream._readableState; | ||
var wState = stream._writableState; | ||
if (rState && rState.autoDestroy || wState && wState.autoDestroy) stream.destroy(err);else stream.emit('error', err); | ||
const r = stream._readableState | ||
const w = stream._writableState | ||
if ((w && w.destroyed) || (r && r.destroyed)) { | ||
return this | ||
} | ||
if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err) | ||
else if (err) { | ||
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 | ||
err.stack // eslint-disable-line no-unused-expressions | ||
if (w && !w.errored) { | ||
w.errored = err | ||
} | ||
if (r && !r.errored) { | ||
r.errored = err | ||
} | ||
if (sync) { | ||
process.nextTick(emitErrorNT, stream, err) | ||
} else { | ||
emitErrorNT(stream, err) | ||
} | ||
} | ||
} | ||
function construct(stream, cb) { | ||
if (typeof stream._construct !== 'function') { | ||
return | ||
} | ||
const r = stream._readableState | ||
const w = stream._writableState | ||
if (r) { | ||
r.constructed = false | ||
} | ||
if (w) { | ||
w.constructed = false | ||
} | ||
stream.once(kConstruct, cb) | ||
if (stream.listenerCount(kConstruct) > 1) { | ||
// Duplex | ||
return | ||
} | ||
process.nextTick(constructNT, stream) | ||
} | ||
function constructNT(stream) { | ||
let called = false | ||
function onConstruct(err) { | ||
if (called) { | ||
errorOrDestroy(stream, err !== null && err !== undefined ? err : new ERR_MULTIPLE_CALLBACK()) | ||
return | ||
} | ||
called = true | ||
const r = stream._readableState | ||
const w = stream._writableState | ||
const s = w || r | ||
if (r) { | ||
r.constructed = true | ||
} | ||
if (w) { | ||
w.constructed = true | ||
} | ||
if (s.destroyed) { | ||
stream.emit(kDestroy, err) | ||
} else if (err) { | ||
errorOrDestroy(stream, err, true) | ||
} else { | ||
process.nextTick(emitConstructNT, stream) | ||
} | ||
} | ||
try { | ||
stream._construct(onConstruct) | ||
} catch (err) { | ||
onConstruct(err) | ||
} | ||
} | ||
function emitConstructNT(stream) { | ||
stream.emit(kConstruct) | ||
} | ||
function isRequest(stream) { | ||
return stream && stream.setHeader && typeof stream.abort === 'function' | ||
} | ||
function emitCloseLegacy(stream) { | ||
stream.emit('close') | ||
} | ||
function emitErrorCloseLegacy(stream, err) { | ||
stream.emit('error', err) | ||
process.nextTick(emitCloseLegacy, stream) | ||
} // Normalize destroy for legacy. | ||
function destroyer(stream, err) { | ||
if (!stream || isDestroyed(stream)) { | ||
return | ||
} | ||
if (!err && !isFinished(stream)) { | ||
err = new AbortError() | ||
} // TODO: Remove isRequest branches. | ||
if (isServerRequest(stream)) { | ||
stream.socket = null | ||
stream.destroy(err) | ||
} else if (isRequest(stream)) { | ||
stream.abort() | ||
} else if (isRequest(stream.req)) { | ||
stream.req.abort() | ||
} else if (typeof stream.destroy === 'function') { | ||
stream.destroy(err) | ||
} else if (typeof stream.close === 'function') { | ||
// TODO: Don't lose err? | ||
stream.close() | ||
} else if (err) { | ||
process.nextTick(emitErrorCloseLegacy, stream) | ||
} else { | ||
process.nextTick(emitCloseLegacy, stream) | ||
} | ||
if (!stream.destroyed) { | ||
stream[kDestroyed] = true | ||
} | ||
} | ||
module.exports = { | ||
destroy: destroy, | ||
undestroy: undestroy, | ||
errorOrDestroy: errorOrDestroy | ||
}; | ||
construct, | ||
destroyer, | ||
destroy, | ||
undestroy, | ||
errorOrDestroy | ||
} |
// Ported from https://github.com/mafintosh/end-of-stream with | ||
// permission from the author, Mathias Buus (@mafintosh). | ||
'use strict'; | ||
'use strict' | ||
var ERR_STREAM_PREMATURE_CLOSE = require('../../../errors').codes.ERR_STREAM_PREMATURE_CLOSE; | ||
const { AbortError, codes } = require('../../ours/errors') | ||
function once(callback) { | ||
var called = false; | ||
return function () { | ||
if (called) return; | ||
called = true; | ||
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes | ||
for (var _len = arguments.length, args = new Array(_len), _key = 0; _key < _len; _key++) { | ||
args[_key] = arguments[_key]; | ||
} | ||
const { once } = require('../../ours/util') | ||
callback.apply(this, args); | ||
}; | ||
} | ||
const { validateAbortSignal, validateFunction, validateObject } = require('../validators') | ||
function noop() {} | ||
const { Promise } = require('../../ours/primordials') | ||
const { | ||
isClosed, | ||
isReadable, | ||
isReadableNodeStream, | ||
isReadableFinished, | ||
isReadableErrored, | ||
isWritable, | ||
isWritableNodeStream, | ||
isWritableFinished, | ||
isWritableErrored, | ||
isNodeStream, | ||
willEmitClose: _willEmitClose | ||
} = require('./utils') | ||
function isRequest(stream) { | ||
return stream.setHeader && typeof stream.abort === 'function'; | ||
return stream.setHeader && typeof stream.abort === 'function' | ||
} | ||
function eos(stream, opts, callback) { | ||
if (typeof opts === 'function') return eos(stream, null, opts); | ||
if (!opts) opts = {}; | ||
callback = once(callback || noop); | ||
var readable = opts.readable || opts.readable !== false && stream.readable; | ||
var writable = opts.writable || opts.writable !== false && stream.writable; | ||
const nop = () => {} | ||
var onlegacyfinish = function onlegacyfinish() { | ||
if (!stream.writable) onfinish(); | ||
}; | ||
function eos(stream, options, callback) { | ||
var _options$readable, _options$writable | ||
var writableEnded = stream._writableState && stream._writableState.finished; | ||
if (arguments.length === 2) { | ||
callback = options | ||
options = {} | ||
} else if (options == null) { | ||
options = {} | ||
} else { | ||
validateObject(options, 'options') | ||
} | ||
var onfinish = function onfinish() { | ||
writable = false; | ||
writableEnded = true; | ||
if (!readable) callback.call(stream); | ||
}; | ||
validateFunction(callback, 'callback') | ||
validateAbortSignal(options.signal, 'options.signal') | ||
callback = once(callback) | ||
const readable = | ||
(_options$readable = options.readable) !== null && _options$readable !== undefined | ||
? _options$readable | ||
: isReadableNodeStream(stream) | ||
const writable = | ||
(_options$writable = options.writable) !== null && _options$writable !== undefined | ||
? _options$writable | ||
: isWritableNodeStream(stream) | ||
var readableEnded = stream._readableState && stream._readableState.endEmitted; | ||
if (!isNodeStream(stream)) { | ||
// TODO: Webstreams. | ||
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream) | ||
} | ||
var onend = function onend() { | ||
readable = false; | ||
readableEnded = true; | ||
if (!writable) callback.call(stream); | ||
}; | ||
const wState = stream._writableState | ||
const rState = stream._readableState | ||
var onerror = function onerror(err) { | ||
callback.call(stream, err); | ||
}; | ||
const onlegacyfinish = () => { | ||
if (!stream.writable) { | ||
onfinish() | ||
} | ||
} // TODO (ronag): Improve soft detection to include core modules and | ||
// common ecosystem modules that do properly emit 'close' but fail | ||
// this generic check. | ||
var onclose = function onclose() { | ||
var err; | ||
let willEmitClose = | ||
_willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable | ||
let writableFinished = isWritableFinished(stream, false) | ||
if (readable && !readableEnded) { | ||
if (!stream._readableState || !stream._readableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE(); | ||
return callback.call(stream, err); | ||
const onfinish = () => { | ||
writableFinished = true // Stream should not be destroyed here. If it is that | ||
// means that user space is doing something differently and | ||
// we cannot trust willEmitClose. | ||
if (stream.destroyed) { | ||
willEmitClose = false | ||
} | ||
if (writable && !writableEnded) { | ||
if (!stream._writableState || !stream._writableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE(); | ||
return callback.call(stream, err); | ||
if (willEmitClose && (!stream.readable || readable)) { | ||
return | ||
} | ||
}; | ||
var onrequest = function onrequest() { | ||
stream.req.on('finish', onfinish); | ||
}; | ||
if (!readable || readableFinished) { | ||
callback.call(stream) | ||
} | ||
} | ||
let readableFinished = isReadableFinished(stream, false) | ||
const onend = () => { | ||
readableFinished = true // Stream should not be destroyed here. If it is that | ||
// means that user space is doing something differently and | ||
// we cannot trust willEmitClose. | ||
if (stream.destroyed) { | ||
willEmitClose = false | ||
} | ||
if (willEmitClose && (!stream.writable || writable)) { | ||
return | ||
} | ||
if (!writable || writableFinished) { | ||
callback.call(stream) | ||
} | ||
} | ||
const onerror = (err) => { | ||
callback.call(stream, err) | ||
} | ||
let closed = isClosed(stream) | ||
const onclose = () => { | ||
closed = true | ||
const errored = isWritableErrored(stream) || isReadableErrored(stream) | ||
if (errored && typeof errored !== 'boolean') { | ||
return callback.call(stream, errored) | ||
} | ||
if (readable && !readableFinished && isReadableNodeStream(stream, true)) { | ||
if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()) | ||
} | ||
if (writable && !writableFinished) { | ||
if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()) | ||
} | ||
callback.call(stream) | ||
} | ||
const onrequest = () => { | ||
stream.req.on('finish', onfinish) | ||
} | ||
if (isRequest(stream)) { | ||
stream.on('complete', onfinish); | ||
stream.on('abort', onclose); | ||
if (stream.req) onrequest();else stream.on('request', onrequest); | ||
} else if (writable && !stream._writableState) { | ||
stream.on('complete', onfinish) | ||
if (!willEmitClose) { | ||
stream.on('abort', onclose) | ||
} | ||
if (stream.req) { | ||
onrequest() | ||
} else { | ||
stream.on('request', onrequest) | ||
} | ||
} else if (writable && !wState) { | ||
// legacy streams | ||
stream.on('end', onlegacyfinish); | ||
stream.on('close', onlegacyfinish); | ||
stream.on('end', onlegacyfinish) | ||
stream.on('close', onlegacyfinish) | ||
} // Not all streams will emit 'close' after 'aborted'. | ||
if (!willEmitClose && typeof stream.aborted === 'boolean') { | ||
stream.on('aborted', onclose) | ||
} | ||
stream.on('end', onend); | ||
stream.on('finish', onfinish); | ||
if (opts.error !== false) stream.on('error', onerror); | ||
stream.on('close', onclose); | ||
return function () { | ||
stream.removeListener('complete', onfinish); | ||
stream.removeListener('abort', onclose); | ||
stream.removeListener('request', onrequest); | ||
if (stream.req) stream.req.removeListener('finish', onfinish); | ||
stream.removeListener('end', onlegacyfinish); | ||
stream.removeListener('close', onlegacyfinish); | ||
stream.removeListener('finish', onfinish); | ||
stream.removeListener('end', onend); | ||
stream.removeListener('error', onerror); | ||
stream.removeListener('close', onclose); | ||
}; | ||
stream.on('end', onend) | ||
stream.on('finish', onfinish) | ||
if (options.error !== false) { | ||
stream.on('error', onerror) | ||
} | ||
stream.on('close', onclose) | ||
if (closed) { | ||
process.nextTick(onclose) | ||
} else if ( | ||
(wState !== null && wState !== undefined && wState.errorEmitted) || | ||
(rState !== null && rState !== undefined && rState.errorEmitted) | ||
) { | ||
if (!willEmitClose) { | ||
process.nextTick(onclose) | ||
} | ||
} else if ( | ||
!readable && | ||
(!willEmitClose || isReadable(stream)) && | ||
(writableFinished || isWritable(stream) === false) | ||
) { | ||
process.nextTick(onclose) | ||
} else if ( | ||
!writable && | ||
(!willEmitClose || isWritable(stream)) && | ||
(readableFinished || isReadable(stream) === false) | ||
) { | ||
process.nextTick(onclose) | ||
} else if (rState && stream.req && stream.aborted) { | ||
process.nextTick(onclose) | ||
} | ||
const cleanup = () => { | ||
callback = nop | ||
stream.removeListener('aborted', onclose) | ||
stream.removeListener('complete', onfinish) | ||
stream.removeListener('abort', onclose) | ||
stream.removeListener('request', onrequest) | ||
if (stream.req) stream.req.removeListener('finish', onfinish) | ||
stream.removeListener('end', onlegacyfinish) | ||
stream.removeListener('close', onlegacyfinish) | ||
stream.removeListener('finish', onfinish) | ||
stream.removeListener('end', onend) | ||
stream.removeListener('error', onerror) | ||
stream.removeListener('close', onclose) | ||
} | ||
if (options.signal && !closed) { | ||
const abort = () => { | ||
// Keep it because cleanup removes it. | ||
const endCallback = callback | ||
cleanup() | ||
endCallback.call( | ||
stream, | ||
new AbortError(undefined, { | ||
cause: options.signal.reason | ||
}) | ||
) | ||
} | ||
if (options.signal.aborted) { | ||
process.nextTick(abort) | ||
} else { | ||
const originalCallback = callback | ||
callback = once((...args) => { | ||
options.signal.removeEventListener('abort', abort) | ||
originalCallback.apply(stream, args) | ||
}) | ||
options.signal.addEventListener('abort', abort) | ||
} | ||
} | ||
return cleanup | ||
} | ||
module.exports = eos; | ||
function finished(stream, opts) { | ||
return new Promise((resolve, reject) => { | ||
eos(stream, opts, (err) => { | ||
if (err) { | ||
reject(err) | ||
} else { | ||
resolve() | ||
} | ||
}) | ||
}) | ||
} | ||
module.exports = eos | ||
module.exports.finished = finished |
@@ -1,64 +0,108 @@ | ||
'use strict'; | ||
'use strict' | ||
function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } } | ||
const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials') | ||
function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; } | ||
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes | ||
function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); if (enumerableOnly) symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); keys.push.apply(keys, symbols); } return keys; } | ||
function from(Readable, iterable, opts) { | ||
let iterator | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(Object(source), true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(Object(source)).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; } | ||
if (typeof iterable === 'string' || iterable instanceof Buffer) { | ||
return new Readable({ | ||
objectMode: true, | ||
...opts, | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
read() { | ||
this.push(iterable) | ||
this.push(null) | ||
} | ||
}) | ||
} | ||
var ERR_INVALID_ARG_TYPE = require('../../../errors').codes.ERR_INVALID_ARG_TYPE; | ||
let isAsync | ||
function from(Readable, iterable, opts) { | ||
var iterator; | ||
if (iterable && iterable[SymbolAsyncIterator]) { | ||
isAsync = true | ||
iterator = iterable[SymbolAsyncIterator]() | ||
} else if (iterable && iterable[SymbolIterator]) { | ||
isAsync = false | ||
iterator = iterable[SymbolIterator]() | ||
} else { | ||
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable) | ||
} | ||
if (iterable && typeof iterable.next === 'function') { | ||
iterator = iterable; | ||
} else if (iterable && iterable[Symbol.asyncIterator]) iterator = iterable[Symbol.asyncIterator]();else if (iterable && iterable[Symbol.iterator]) iterator = iterable[Symbol.iterator]();else throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); | ||
var readable = new Readable(_objectSpread({ | ||
objectMode: true | ||
}, opts)); // Reading boolean to protect against _read | ||
const readable = new Readable({ | ||
objectMode: true, | ||
highWaterMark: 1, | ||
// TODO(ronag): What options should be allowed? | ||
...opts | ||
}) // Flag to protect against _read | ||
// being called before last iteration completion. | ||
var reading = false; | ||
let reading = false | ||
readable._read = function () { | ||
if (!reading) { | ||
reading = true; | ||
next(); | ||
reading = true | ||
next() | ||
} | ||
}; | ||
} | ||
function next() { | ||
return _next2.apply(this, arguments); | ||
readable._destroy = function (error, cb) { | ||
PromisePrototypeThen( | ||
close(error), | ||
() => process.nextTick(cb, error), // nextTick is here in case cb throws | ||
(e) => process.nextTick(cb, e || error) | ||
) | ||
} | ||
function _next2() { | ||
_next2 = _asyncToGenerator(function* () { | ||
async function close(error) { | ||
const hadError = error !== undefined && error !== null | ||
const hasThrow = typeof iterator.throw === 'function' | ||
if (hadError && hasThrow) { | ||
const { value, done } = await iterator.throw(error) | ||
await value | ||
if (done) { | ||
return | ||
} | ||
} | ||
if (typeof iterator.return === 'function') { | ||
const { value } = await iterator.return() | ||
await value | ||
} | ||
} | ||
async function next() { | ||
for (;;) { | ||
try { | ||
var _ref = yield iterator.next(), | ||
value = _ref.value, | ||
done = _ref.done; | ||
const { value, done } = isAsync ? await iterator.next() : iterator.next() | ||
if (done) { | ||
readable.push(null); | ||
} else if (readable.push((yield value))) { | ||
next(); | ||
readable.push(null) | ||
} else { | ||
reading = false; | ||
const res = value && typeof value.then === 'function' ? await value : value | ||
if (res === null) { | ||
reading = false | ||
throw new ERR_STREAM_NULL_VALUES() | ||
} else if (readable.push(res)) { | ||
continue | ||
} else { | ||
reading = false | ||
} | ||
} | ||
} catch (err) { | ||
readable.destroy(err); | ||
readable.destroy(err) | ||
} | ||
}); | ||
return _next2.apply(this, arguments); | ||
break | ||
} | ||
} | ||
return readable; | ||
return readable | ||
} | ||
module.exports = from; | ||
module.exports = from |
// Ported from https://github.com/mafintosh/pump with | ||
// permission from the author, Mathias Buus (@mafintosh). | ||
'use strict'; | ||
'use strict' | ||
var eos; | ||
const abortControllerModule = require('abort-controller') | ||
function once(callback) { | ||
var called = false; | ||
return function () { | ||
if (called) return; | ||
called = true; | ||
callback.apply(void 0, arguments); | ||
}; | ||
} | ||
const { ArrayIsArray, Promise, SymbolAsyncIterator } = require('../../ours/primordials') | ||
var _require$codes = require('../../../errors').codes, | ||
ERR_MISSING_ARGS = _require$codes.ERR_MISSING_ARGS, | ||
ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED; | ||
const eos = require('./end-of-stream') | ||
function noop(err) { | ||
// Rethrow the error if it exists to avoid swallowing it | ||
if (err) throw err; | ||
const { once } = require('../../ours/util') | ||
const destroyImpl = require('./destroy') | ||
const Duplex = require('./duplex') | ||
const { | ||
aggregateTwoErrors, | ||
codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED }, | ||
AbortError | ||
} = require('../../ours/errors') | ||
const { validateFunction, validateAbortSignal } = require('../validators') | ||
const { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require('./utils') | ||
const AbortController = globalThis.AbortController || abortControllerModule.AbortController | ||
let PassThrough | ||
let Readable | ||
function destroyer(stream, reading, writing) { | ||
let finished = false | ||
stream.on('close', () => { | ||
finished = true | ||
}) | ||
const cleanup = eos( | ||
stream, | ||
{ | ||
readable: reading, | ||
writable: writing | ||
}, | ||
(err) => { | ||
finished = !err | ||
} | ||
) | ||
return { | ||
destroy: (err) => { | ||
if (finished) return | ||
finished = true | ||
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe')) | ||
}, | ||
cleanup | ||
} | ||
} | ||
function isRequest(stream) { | ||
return stream.setHeader && typeof stream.abort === 'function'; | ||
function popCallback(streams) { | ||
// Streams should never be an empty array. It should always contain at least | ||
// a single stream. Therefore optimize for the average case instead of | ||
// checking for length === 0 as well. | ||
validateFunction(streams[streams.length - 1], 'streams[stream.length - 1]') | ||
return streams.pop() | ||
} | ||
function destroyer(stream, reading, writing, callback) { | ||
callback = once(callback); | ||
var closed = false; | ||
stream.on('close', function () { | ||
closed = true; | ||
}); | ||
if (eos === undefined) eos = require('./end-of-stream'); | ||
eos(stream, { | ||
readable: reading, | ||
writable: writing | ||
}, function (err) { | ||
if (err) return callback(err); | ||
closed = true; | ||
callback(); | ||
}); | ||
var destroyed = false; | ||
return function (err) { | ||
if (closed) return; | ||
if (destroyed) return; | ||
destroyed = true; // request.destroy just do .end - .abort is what we want | ||
function makeAsyncIterable(val) { | ||
if (isIterable(val)) { | ||
return val | ||
} else if (isReadableNodeStream(val)) { | ||
// Legacy streams are not Iterable. | ||
return fromReadable(val) | ||
} | ||
if (isRequest(stream)) return stream.abort(); | ||
if (typeof stream.destroy === 'function') return stream.destroy(); | ||
callback(err || new ERR_STREAM_DESTROYED('pipe')); | ||
}; | ||
throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], val) | ||
} | ||
function call(fn) { | ||
fn(); | ||
async function* fromReadable(val) { | ||
if (!Readable) { | ||
Readable = require('./readable') | ||
} | ||
yield* Readable.prototype[SymbolAsyncIterator].call(val) | ||
} | ||
function pipe(from, to) { | ||
return from.pipe(to); | ||
async function pump(iterable, writable, finish, { end }) { | ||
let error | ||
let onresolve = null | ||
const resume = (err) => { | ||
if (err) { | ||
error = err | ||
} | ||
if (onresolve) { | ||
const callback = onresolve | ||
onresolve = null | ||
callback() | ||
} | ||
} | ||
const wait = () => | ||
new Promise((resolve, reject) => { | ||
if (error) { | ||
reject(error) | ||
} else { | ||
onresolve = () => { | ||
if (error) { | ||
reject(error) | ||
} else { | ||
resolve() | ||
} | ||
} | ||
} | ||
}) | ||
writable.on('drain', resume) | ||
const cleanup = eos( | ||
writable, | ||
{ | ||
readable: false | ||
}, | ||
resume | ||
) | ||
try { | ||
if (writable.writableNeedDrain) { | ||
await wait() | ||
} | ||
for await (const chunk of iterable) { | ||
if (!writable.write(chunk)) { | ||
await wait() | ||
} | ||
} | ||
if (end) { | ||
writable.end() | ||
} | ||
await wait() | ||
finish() | ||
} catch (err) { | ||
finish(error !== err ? aggregateTwoErrors(error, err) : err) | ||
} finally { | ||
cleanup() | ||
writable.off('drain', resume) | ||
} | ||
} | ||
function popCallback(streams) { | ||
if (!streams.length) return noop; | ||
if (typeof streams[streams.length - 1] !== 'function') return noop; | ||
return streams.pop(); | ||
function pipeline(...streams) { | ||
return pipelineImpl(streams, once(popCallback(streams))) | ||
} | ||
function pipeline() { | ||
for (var _len = arguments.length, streams = new Array(_len), _key = 0; _key < _len; _key++) { | ||
streams[_key] = arguments[_key]; | ||
function pipelineImpl(streams, callback, opts) { | ||
if (streams.length === 1 && ArrayIsArray(streams[0])) { | ||
streams = streams[0] | ||
} | ||
var callback = popCallback(streams); | ||
if (Array.isArray(streams[0])) streams = streams[0]; | ||
if (streams.length < 2) { | ||
throw new ERR_MISSING_ARGS('streams'); | ||
throw new ERR_MISSING_ARGS('streams') | ||
} | ||
var error; | ||
var destroys = streams.map(function (stream, i) { | ||
var reading = i < streams.length - 1; | ||
var writing = i > 0; | ||
return destroyer(stream, reading, writing, function (err) { | ||
if (!error) error = err; | ||
if (err) destroys.forEach(call); | ||
if (reading) return; | ||
destroys.forEach(call); | ||
callback(error); | ||
}); | ||
}); | ||
return streams.reduce(pipe); | ||
const ac = new AbortController() | ||
const signal = ac.signal | ||
const outerSignal = opts === null || opts === undefined ? undefined : opts.signal // Need to cleanup event listeners if last stream is readable | ||
// https://github.com/nodejs/node/issues/35452 | ||
const lastStreamCleanup = [] | ||
validateAbortSignal(outerSignal, 'options.signal') | ||
function abort() { | ||
finishImpl(new AbortError()) | ||
} | ||
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.addEventListener('abort', abort) | ||
let error | ||
let value | ||
const destroys = [] | ||
let finishCount = 0 | ||
function finish(err) { | ||
finishImpl(err, --finishCount === 0) | ||
} | ||
function finishImpl(err, final) { | ||
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { | ||
error = err | ||
} | ||
if (!error && !final) { | ||
return | ||
} | ||
while (destroys.length) { | ||
destroys.shift()(error) | ||
} | ||
outerSignal === null || outerSignal === undefined ? undefined : outerSignal.removeEventListener('abort', abort) | ||
ac.abort() | ||
if (final) { | ||
if (!error) { | ||
lastStreamCleanup.forEach((fn) => fn()) | ||
} | ||
process.nextTick(callback, error, value) | ||
} | ||
} | ||
let ret | ||
for (let i = 0; i < streams.length; i++) { | ||
const stream = streams[i] | ||
const reading = i < streams.length - 1 | ||
const writing = i > 0 | ||
const end = reading || (opts === null || opts === undefined ? undefined : opts.end) !== false | ||
const isLastStream = i === streams.length - 1 | ||
if (isNodeStream(stream)) { | ||
if (end) { | ||
const { destroy, cleanup } = destroyer(stream, reading, writing) | ||
destroys.push(destroy) | ||
if (isReadable(stream) && isLastStream) { | ||
lastStreamCleanup.push(cleanup) | ||
} | ||
} // Catch stream errors that occur after pipe/pump has completed. | ||
function onError(err) { | ||
if (err && err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { | ||
finish(err) | ||
} | ||
} | ||
stream.on('error', onError) | ||
if (isReadable(stream) && isLastStream) { | ||
lastStreamCleanup.push(() => { | ||
stream.removeListener('error', onError) | ||
}) | ||
} | ||
} | ||
if (i === 0) { | ||
if (typeof stream === 'function') { | ||
ret = stream({ | ||
signal | ||
}) | ||
if (!isIterable(ret)) { | ||
throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or Stream', 'source', ret) | ||
} | ||
} else if (isIterable(stream) || isReadableNodeStream(stream)) { | ||
ret = stream | ||
} else { | ||
ret = Duplex.from(stream) | ||
} | ||
} else if (typeof stream === 'function') { | ||
ret = makeAsyncIterable(ret) | ||
ret = stream(ret, { | ||
signal | ||
}) | ||
if (reading) { | ||
if (!isIterable(ret, true)) { | ||
throw new ERR_INVALID_RETURN_VALUE('AsyncIterable', `transform[${i - 1}]`, ret) | ||
} | ||
} else { | ||
var _ret | ||
if (!PassThrough) { | ||
PassThrough = require('./passthrough') | ||
} // If the last argument to pipeline is not a stream | ||
// we must create a proxy stream so that pipeline(...) | ||
// always returns a stream which can be further | ||
// composed through `.pipe(stream)`. | ||
const pt = new PassThrough({ | ||
objectMode: true | ||
}) // Handle Promises/A+ spec, `then` could be a getter that throws on | ||
// second use. | ||
const then = (_ret = ret) === null || _ret === undefined ? undefined : _ret.then | ||
if (typeof then === 'function') { | ||
finishCount++ | ||
then.call( | ||
ret, | ||
(val) => { | ||
value = val | ||
if (val != null) { | ||
pt.write(val) | ||
} | ||
if (end) { | ||
pt.end() | ||
} | ||
process.nextTick(finish) | ||
}, | ||
(err) => { | ||
pt.destroy(err) | ||
process.nextTick(finish, err) | ||
} | ||
) | ||
} else if (isIterable(ret, true)) { | ||
finishCount++ | ||
pump(ret, pt, finish, { | ||
end | ||
}) | ||
} else { | ||
throw new ERR_INVALID_RETURN_VALUE('AsyncIterable or Promise', 'destination', ret) | ||
} | ||
ret = pt | ||
const { destroy, cleanup } = destroyer(ret, false, true) | ||
destroys.push(destroy) | ||
if (isLastStream) { | ||
lastStreamCleanup.push(cleanup) | ||
} | ||
} | ||
} else if (isNodeStream(stream)) { | ||
if (isReadableNodeStream(ret)) { | ||
finishCount += 2 | ||
const cleanup = pipe(ret, stream, finish, { | ||
end | ||
}) | ||
if (isReadable(stream) && isLastStream) { | ||
lastStreamCleanup.push(cleanup) | ||
} | ||
} else if (isIterable(ret)) { | ||
finishCount++ | ||
pump(ret, stream, finish, { | ||
end | ||
}) | ||
} else { | ||
throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], ret) | ||
} | ||
ret = stream | ||
} else { | ||
ret = Duplex.from(stream) | ||
} | ||
} | ||
if ( | ||
(signal !== null && signal !== undefined && signal.aborted) || | ||
(outerSignal !== null && outerSignal !== undefined && outerSignal.aborted) | ||
) { | ||
process.nextTick(abort) | ||
} | ||
return ret | ||
} | ||
module.exports = pipeline; | ||
function pipe(src, dst, finish, { end }) { | ||
src.pipe(dst, { | ||
end | ||
}) | ||
if (end) { | ||
// Compat. Before node v10.12.0 stdio used to throw an error so | ||
// pipe() did/does not end() stdio destinations. | ||
// Now they allow it but "secretly" don't close the underlying fd. | ||
src.once('end', () => dst.end()) | ||
} else { | ||
finish() | ||
} | ||
eos( | ||
src, | ||
{ | ||
readable: true, | ||
writable: false | ||
}, | ||
(err) => { | ||
const rState = src._readableState | ||
if ( | ||
err && | ||
err.code === 'ERR_STREAM_PREMATURE_CLOSE' && | ||
rState && | ||
rState.ended && | ||
!rState.errored && | ||
!rState.errorEmitted | ||
) { | ||
// Some readable streams will emit 'close' before 'end'. However, since | ||
// this is on the readable side 'end' should still be emitted if the | ||
// stream has been ended and no error emitted. This should be allowed in | ||
// favor of backwards compatibility. Since the stream is piped to a | ||
// destination this should not result in any observable difference. | ||
// We don't need to check if this is a writable premature close since | ||
// eos will only fail with premature close on the reading side for | ||
// duplex streams. | ||
src.once('end', finish).once('error', finish) | ||
} else { | ||
finish(err) | ||
} | ||
} | ||
) | ||
return eos( | ||
dst, | ||
{ | ||
readable: false, | ||
writable: true | ||
}, | ||
finish | ||
) | ||
} | ||
module.exports = { | ||
pipelineImpl, | ||
pipeline | ||
} |
@@ -1,27 +0,33 @@ | ||
'use strict'; | ||
'use strict' | ||
var ERR_INVALID_OPT_VALUE = require('../../../errors').codes.ERR_INVALID_OPT_VALUE; | ||
const { MathFloor, NumberIsInteger } = require('../../ours/primordials') | ||
const { ERR_INVALID_ARG_VALUE } = require('../../ours/errors').codes | ||
function highWaterMarkFrom(options, isDuplex, duplexKey) { | ||
return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null; | ||
return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null | ||
} | ||
function getDefaultHighWaterMark(objectMode) { | ||
return objectMode ? 16 : 16 * 1024 | ||
} | ||
function getHighWaterMark(state, options, duplexKey, isDuplex) { | ||
var hwm = highWaterMarkFrom(options, isDuplex, duplexKey); | ||
const hwm = highWaterMarkFrom(options, isDuplex, duplexKey) | ||
if (hwm != null) { | ||
if (!(isFinite(hwm) && Math.floor(hwm) === hwm) || hwm < 0) { | ||
var name = isDuplex ? duplexKey : 'highWaterMark'; | ||
throw new ERR_INVALID_OPT_VALUE(name, hwm); | ||
if (!NumberIsInteger(hwm) || hwm < 0) { | ||
const name = isDuplex ? `options.${duplexKey}` : 'options.highWaterMark' | ||
throw new ERR_INVALID_ARG_VALUE(name, hwm) | ||
} | ||
return Math.floor(hwm); | ||
return MathFloor(hwm) | ||
} // Default value | ||
return state.objectMode ? 16 : 16 * 1024; | ||
return getDefaultHighWaterMark(state.objectMode) | ||
} | ||
module.exports = { | ||
getHighWaterMark: getHighWaterMark | ||
}; | ||
getHighWaterMark, | ||
getDefaultHighWaterMark | ||
} |
129
package.json
{ | ||
"name": "readable-stream", | ||
"version": "3.6.0", | ||
"description": "Streams3, a user-land copy of the stream library from Node.js", | ||
"main": "readable.js", | ||
"engines": { | ||
"node": ">= 6" | ||
}, | ||
"dependencies": { | ||
"inherits": "^2.0.3", | ||
"string_decoder": "^1.1.1", | ||
"util-deprecate": "^1.0.1" | ||
}, | ||
"devDependencies": { | ||
"@babel/cli": "^7.2.0", | ||
"@babel/core": "^7.2.0", | ||
"@babel/polyfill": "^7.0.0", | ||
"@babel/preset-env": "^7.2.0", | ||
"airtap": "0.0.9", | ||
"assert": "^1.4.0", | ||
"bl": "^2.0.0", | ||
"deep-strict-equal": "^0.2.0", | ||
"events.once": "^2.0.2", | ||
"glob": "^7.1.2", | ||
"gunzip-maybe": "^1.4.1", | ||
"hyperquest": "^2.1.3", | ||
"lolex": "^2.6.0", | ||
"nyc": "^11.0.0", | ||
"pump": "^3.0.0", | ||
"rimraf": "^2.6.2", | ||
"tap": "^12.0.0", | ||
"tape": "^4.9.0", | ||
"tar-fs": "^1.16.2", | ||
"util-promisify": "^2.1.0" | ||
}, | ||
"scripts": { | ||
"test": "tap -J --no-esm test/parallel/*.js test/ours/*.js", | ||
"ci": "TAP=1 tap --no-esm test/parallel/*.js test/ours/*.js | tee test.tap", | ||
"test-browsers": "airtap --sauce-connect --loopback airtap.local -- test/browser.js", | ||
"test-browser-local": "airtap --open --local -- test/browser.js", | ||
"cover": "nyc npm test", | ||
"report": "nyc report --reporter=lcov", | ||
"update-browser-errors": "babel -o errors-browser.js errors.js" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "git://github.com/nodejs/readable-stream" | ||
}, | ||
"version": "4.0.0", | ||
"description": "Node.js Streams, a user-land copy of the stream library from Node.js", | ||
"homepage": "https://github.com/nodejs/readable-stream", | ||
"license": "MIT", | ||
"licenses": [ | ||
{ | ||
"type": "MIT", | ||
"url": "https://choosealicense.com/licenses/mit/" | ||
} | ||
], | ||
"keywords": [ | ||
@@ -54,16 +18,67 @@ "readable", | ||
], | ||
"repository": { | ||
"type": "git", | ||
"url": "git://github.com/nodejs/readable-stream" | ||
}, | ||
"bugs": { | ||
"url": "https://github.com/nodejs/readable-stream/issues" | ||
}, | ||
"main": "lib/ours/index.js", | ||
"files": [ | ||
"lib", | ||
"LICENSE", | ||
"README.md" | ||
], | ||
"browser": { | ||
"util": false, | ||
"worker_threads": false, | ||
"./errors": "./errors-browser.js", | ||
"./readable.js": "./readable-browser.js", | ||
"./lib/internal/streams/from.js": "./lib/internal/streams/from-browser.js", | ||
"./lib/internal/streams/stream.js": "./lib/internal/streams/stream-browser.js" | ||
"util": "./lib/ours/util.js", | ||
"./lib/ours/index.js": "./lib/ours/browser.js" | ||
}, | ||
"nyc": { | ||
"include": [ | ||
"lib/**.js" | ||
] | ||
"scripts": { | ||
"build": "node build/build.mjs", | ||
"postbuild": "prettier -w lib test", | ||
"test": "tap --rcfile=./tap.yml test/parallel/test-*.js test/ours/test-*.js", | ||
"test:prepare": "node test/browser/runner-prepare.mjs", | ||
"test:browsers": "node test/browser/runner-browser.mjs", | ||
"test:bundlers": "node test/browser/runner-node.mjs", | ||
"coverage": "c8 -c ./c8.json tap --rcfile=./tap.yml test/parallel/test-*.js test/ours/test-*.js", | ||
"format": "prettier -w src lib test", | ||
"lint": "eslint src" | ||
}, | ||
"license": "MIT" | ||
"dependencies": { | ||
"abort-controller": "^3.0.0" | ||
}, | ||
"devDependencies": { | ||
"@babel/core": "^7.17.10", | ||
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.16.7", | ||
"@babel/plugin-proposal-optional-chaining": "^7.16.7", | ||
"@rollup/plugin-commonjs": "^22.0.0", | ||
"@rollup/plugin-inject": "^4.0.4", | ||
"@rollup/plugin-node-resolve": "^13.3.0", | ||
"@sinonjs/fake-timers": "^9.1.2", | ||
"browserify": "^17.0.0", | ||
"buffer-es6": "^4.9.3", | ||
"c8": "^7.11.2", | ||
"esbuild": "^0.14.39", | ||
"esbuild-plugin-alias": "^0.2.1", | ||
"eslint": "^8.15.0", | ||
"eslint-config-standard": "^17.0.0", | ||
"eslint-plugin-import": "^2.26.0", | ||
"eslint-plugin-n": "^15.2.0", | ||
"eslint-plugin-promise": "^6.0.0", | ||
"playwright": "^1.21.1", | ||
"prettier": "^2.6.2", | ||
"process-es6": "^0.11.6", | ||
"rollup": "^2.72.1", | ||
"rollup-plugin-polyfill-node": "^0.9.0", | ||
"tap": "^16.2.0", | ||
"tap-mocha-reporter": "^5.0.3", | ||
"tape": "^5.5.3", | ||
"tar": "^6.1.11", | ||
"undici": "^5.1.1", | ||
"webpack": "^5.72.1", | ||
"webpack-cli": "^4.9.2" | ||
}, | ||
"engines": { | ||
"node": "^12.22.0 || ^14.17.0 || >=16.0.0" | ||
} | ||
} |
# readable-stream | ||
***Node.js core streams for userland*** [![Build Status](https://travis-ci.com/nodejs/readable-stream.svg?branch=master)](https://travis-ci.com/nodejs/readable-stream) | ||
**_Node.js core streams for userland_** | ||
[![npm status](https://img.shields.io/npm/v/readable-stream.svg)](https://npm.im/readable-stream) | ||
[![node](https://img.shields.io/node/v/readable-stream.svg)](https://www.npmjs.org/package/readable-stream) | ||
[![Node.js Build](https://github.com/nodejs/readable-stream/workflows/Node.js/badge.svg)](https://github.com/nodejs/readable-stream/actions?query=workflow%3ANode.js) | ||
[![Browsers Build](https://github.com/nodejs/readable-stream/workflows/Browsers/badge.svg)](https://github.com/nodejs/readable-stream/actions?query=workflow%3ABrowsers) | ||
[![NPM](https://nodei.co/npm/readable-stream.png?downloads=true&downloadRank=true)](https://nodei.co/npm/readable-stream/) | ||
[![NPM](https://nodei.co/npm-dl/readable-stream.png?&months=6&height=3)](https://nodei.co/npm/readable-stream/) | ||
[![Sauce Test Status](https://saucelabs.com/browser-matrix/readabe-stream.svg)](https://saucelabs.com/u/readabe-stream) | ||
```bash | ||
npm install --save readable-stream | ||
npm install readable-stream | ||
``` | ||
This package is a mirror of the streams implementations in Node.js. | ||
This package is a mirror of the streams implementations in Node.js 18.0.0. | ||
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.19.0/docs/api/stream.html). | ||
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.0.0/docs/api/stream.html). | ||
If you want to guarantee a stable streams base, regardless of what version of | ||
Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). | ||
Node you, or the users of your libraries are using, use **readable-stream** _only_ and avoid the _"stream"_ module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). | ||
As of version 2.0.0 **readable-stream** uses semantic versioning. | ||
## Version 4.x.x | ||
v4.x.x of `readable-stream` is a cut from Node 18. This version supports Node 12, 14, 16 and 18, as well as evergreen browsers. | ||
The breaking changes introduced by v4 are composed of the combined breaking changes in: | ||
* [Node v12](https://nodejs.org/en/blog/release/v12.0.0/) | ||
* [Node v13](https://nodejs.org/en/blog/release/v13.0.0/) | ||
* [Node v14](https://nodejs.org/en/blog/release/v14.0.0/) | ||
* [Node v15](https://nodejs.org/en/blog/release/v15.0.0/) | ||
* [Node v16](https://nodejs.org/en/blog/release/v16.0.0/) | ||
* [Node v17](https://nodejs.org/en/blog/release/v17.0.0/) | ||
* [Node v18](https://nodejs.org/en/blog/release/v18.0.0/) | ||
This also includes _many_ new features. | ||
## Version 3.x.x | ||
@@ -51,8 +63,5 @@ | ||
## Version 2.x.x | ||
v2.x.x of `readable-stream` is a cut of the stream module from Node 8 (there have been no semver-major changes from Node 4 to 8). This version supports all Node.js versions from 0.8, as well as evergreen browsers and IE 10 & 11. | ||
### Big Thanks | ||
Cross-browser Testing Platform and Open Source <3 Provided by [Sauce Labs][sauce] | ||
# Usage | ||
@@ -73,3 +82,3 @@ | ||
} = require('readable-stream') | ||
```` | ||
``` | ||
@@ -81,2 +90,26 @@ Note that `require('stream')` will return `Stream`, while | ||
## Usage In Browsers | ||
You will need a bundler like [`browserify`](https://github.com/browserify/browserify#readme), [`webpack`](https://webpack.js.org/), [`parcel`](https://github.com/parcel-bundler/parcel#readme) or similar. With Webpack 5 (which unlike other bundlers does not polyfill Node.js core modules and globals like `process`) you will also need to: | ||
1. Install polyfills by running `npm install buffer process --save-dev` | ||
2. Create a [`webpack.config.js`](https://webpack.js.org/guides/getting-started/#using-a-configuration) file containing: | ||
```js | ||
const webpack = require('webpack') | ||
module.exports = { | ||
plugins: [ | ||
new webpack.ProvidePlugin({ | ||
process: 'process/browser' | ||
}) | ||
], | ||
resolve: { | ||
fallback: { | ||
buffer: require.resolve('buffer/') | ||
} | ||
} | ||
} | ||
``` | ||
# Streams Working Group | ||
@@ -88,23 +121,20 @@ | ||
* Addressing stream issues on the Node.js issue tracker. | ||
* Authoring and editing stream documentation within the Node.js project. | ||
* Reviewing changes to stream subclasses within the Node.js project. | ||
* Redirecting changes to streams from the Node.js project to this | ||
- Addressing stream issues on the Node.js issue tracker. | ||
- Authoring and editing stream documentation within the Node.js project. | ||
- Reviewing changes to stream subclasses within the Node.js project. | ||
- Redirecting changes to streams from the Node.js project to this | ||
project. | ||
* Assisting in the implementation of stream providers within Node.js. | ||
* Recommending versions of `readable-stream` to be included in Node.js. | ||
* Messaging about the future of streams to give the community advance | ||
- Assisting in the implementation of stream providers within Node.js. | ||
- Recommending versions of `readable-stream` to be included in Node.js. | ||
- Messaging about the future of streams to give the community advance | ||
notice of changes. | ||
<a name="members"></a> | ||
## Team Members | ||
* **Calvin Metcalf** ([@calvinmetcalf](https://github.com/calvinmetcalf)) <calvin.metcalf@gmail.com> | ||
- Release GPG key: F3EF5F62A87FC27A22E643F714CE4FF5015AA242 | ||
* **Mathias Buus** ([@mafintosh](https://github.com/mafintosh)) <mathiasbuus@gmail.com> | ||
* **Matteo Collina** ([@mcollina](https://github.com/mcollina)) <matteo.collina@gmail.com> | ||
- **Mathias Buus** ([@mafintosh](https://github.com/mafintosh)) <mathiasbuus@gmail.com> | ||
- **Matteo Collina** ([@mcollina](https://github.com/mcollina)) <matteo.collina@gmail.com> | ||
- Release GPG key: 3ABC01543F22DD2239285CDD818674489FBC127E | ||
* **Irina Shestak** ([@lrlna](https://github.com/lrlna)) <shestak.irina@gmail.com> | ||
* **Yoshua Wyuts** ([@yoshuawuyts](https://github.com/yoshuawuyts)) <yoshuawuyts@gmail.com> | ||
[sauce]: https://saucelabs.com | ||
- **Robert Nagy** ([@ronag](https://github.com/ronag)) <ronagy@icloud.com> | ||
- **Vincent Weevers** ([@vweevers](https://github.com/vweevers)) <mail@vincentweevers.nl> |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
No bug tracker
MaintenancePackage does not have a linked bug tracker in package.json.
Found 1 instance in 1 package
No website
QualityPackage does not have a website.
Found 1 instance in 1 package
193244
1
34
5561
1
0
137
29
1
+ Addedabort-controller@^3.0.0
+ Addedabort-controller@3.0.0(transitive)
+ Addedevent-target-shim@5.0.1(transitive)
- Removedinherits@^2.0.3
- Removedstring_decoder@^1.1.1
- Removedutil-deprecate@^1.0.1
- Removedinherits@2.0.4(transitive)
- Removedsafe-buffer@5.2.1(transitive)
- Removedstring_decoder@1.3.0(transitive)
- Removedutil-deprecate@1.0.2(transitive)