readable-stream
Advanced tools
Comparing version 3.6.0 to 3.6.1
@@ -21,2 +21,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a duplex stream is just a stream that is both readable and writable. | ||
@@ -26,12 +27,9 @@ // Since JS doesn't have multiple prototypal inheritance, this class | ||
// Writable. | ||
'use strict'; | ||
/*<replacement>*/ | ||
var objectKeys = Object.keys || function (obj) { | ||
var keys = []; | ||
for (var key in obj) { | ||
keys.push(key); | ||
} | ||
for (var key in obj) keys.push(key); | ||
return keys; | ||
@@ -41,21 +39,14 @@ }; | ||
module.exports = Duplex; | ||
var Readable = require('./_stream_readable'); | ||
var Writable = require('./_stream_writable'); | ||
const Readable = require('./_stream_readable'); | ||
const Writable = require('./_stream_writable'); | ||
require('inherits')(Duplex, Readable); | ||
{ | ||
// Allow the keys array to be GC'ed. | ||
var keys = objectKeys(Writable.prototype); | ||
const keys = objectKeys(Writable.prototype); | ||
for (var v = 0; v < keys.length; v++) { | ||
var method = keys[v]; | ||
const method = keys[v]; | ||
if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method]; | ||
} | ||
} | ||
function Duplex(options) { | ||
@@ -66,7 +57,5 @@ if (!(this instanceof Duplex)) return new Duplex(options); | ||
this.allowHalfOpen = true; | ||
if (options) { | ||
if (options.readable === false) this.readable = false; | ||
if (options.writable === false) this.writable = false; | ||
if (options.allowHalfOpen === false) { | ||
@@ -78,3 +67,2 @@ this.allowHalfOpen = false; | ||
} | ||
Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', { | ||
@@ -85,3 +73,3 @@ // making it explicit this property is not enumerable | ||
enumerable: false, | ||
get: function get() { | ||
get() { | ||
return this._writableState.highWaterMark; | ||
@@ -104,19 +92,19 @@ } | ||
enumerable: false, | ||
get: function get() { | ||
get() { | ||
return this._writableState.length; | ||
} | ||
}); // the no-half-open enforcer | ||
}); | ||
// the no-half-open enforcer | ||
function onend() { | ||
// If the writable side ended, then we're ok. | ||
if (this._writableState.ended) return; // no more data can be written. | ||
if (this._writableState.ended) return; | ||
// no more data can be written. | ||
// But allow more writes to happen in this tick. | ||
process.nextTick(onEndNT, this); | ||
} | ||
function onEndNT(self) { | ||
self.end(); | ||
} | ||
Object.defineProperty(Duplex.prototype, 'destroyed', { | ||
@@ -127,10 +115,9 @@ // making it explicit this property is not enumerable | ||
enumerable: false, | ||
get: function get() { | ||
get() { | ||
if (this._readableState === undefined || this._writableState === undefined) { | ||
return false; | ||
} | ||
return this._readableState.destroyed && this._writableState.destroyed; | ||
}, | ||
set: function set(value) { | ||
set(value) { | ||
// we ignore the value if the stream | ||
@@ -140,6 +127,6 @@ // has not been initialized yet | ||
return; | ||
} // backward compatibility, the user is explicitly | ||
} | ||
// backward compatibility, the user is explicitly | ||
// managing destroyed | ||
this._readableState.destroyed = value; | ||
@@ -146,0 +133,0 @@ this._writableState.destroyed = value; |
@@ -21,13 +21,12 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a passthrough stream. | ||
// basically just the most minimal sort of Transform stream. | ||
// Every written chunk gets output as-is. | ||
'use strict'; | ||
module.exports = PassThrough; | ||
var Transform = require('./_stream_transform'); | ||
const Transform = require('./_stream_transform'); | ||
require('inherits')(PassThrough, Transform); | ||
function PassThrough(options) { | ||
@@ -37,5 +36,4 @@ if (!(this instanceof PassThrough)) return new PassThrough(options); | ||
} | ||
PassThrough.prototype._transform = function (chunk, encoding, cb) { | ||
cb(null, chunk); | ||
}; |
@@ -21,7 +21,8 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
'use strict'; | ||
module.exports = Readable; | ||
/*<replacement>*/ | ||
var Duplex; | ||
@@ -31,6 +32,5 @@ /*</replacement>*/ | ||
Readable.ReadableState = ReadableState; | ||
/*<replacement>*/ | ||
var EE = require('events').EventEmitter; | ||
const EE = require('events').EventEmitter; | ||
var EElistenerCount = function EElistenerCount(emitter, type) { | ||
@@ -42,26 +42,17 @@ return emitter.listeners(type).length; | ||
/*<replacement>*/ | ||
var Stream = require('./internal/streams/stream'); | ||
/*</replacement>*/ | ||
var Buffer = require('buffer').Buffer; | ||
var OurUint8Array = global.Uint8Array || function () {}; | ||
const Buffer = require('buffer').Buffer; | ||
const OurUint8Array = (typeof global !== 'undefined' ? global : typeof window !== 'undefined' ? window : typeof self !== 'undefined' ? self : {}).Uint8Array || function () {}; | ||
function _uint8ArrayToBuffer(chunk) { | ||
return Buffer.from(chunk); | ||
} | ||
function _isUint8Array(obj) { | ||
return Buffer.isBuffer(obj) || obj instanceof OurUint8Array; | ||
} | ||
/*<replacement>*/ | ||
var debugUtil = require('util'); | ||
var debug; | ||
const debugUtil = require('util'); | ||
let debug; | ||
if (debugUtil && debugUtil.debuglog) { | ||
@@ -74,40 +65,35 @@ debug = debugUtil.debuglog('stream'); | ||
const BufferList = require('./internal/streams/buffer_list'); | ||
const destroyImpl = require('./internal/streams/destroy'); | ||
const _require = require('./internal/streams/state'), | ||
getHighWaterMark = _require.getHighWaterMark; | ||
const _require$codes = require('../errors').codes, | ||
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE, | ||
ERR_STREAM_PUSH_AFTER_EOF = _require$codes.ERR_STREAM_PUSH_AFTER_EOF, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT; | ||
var BufferList = require('./internal/streams/buffer_list'); | ||
var destroyImpl = require('./internal/streams/destroy'); | ||
var _require = require('./internal/streams/state'), | ||
getHighWaterMark = _require.getHighWaterMark; | ||
var _require$codes = require('../errors').codes, | ||
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE, | ||
ERR_STREAM_PUSH_AFTER_EOF = _require$codes.ERR_STREAM_PUSH_AFTER_EOF, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT; // Lazy loaded to improve the startup performance. | ||
var StringDecoder; | ||
var createReadableStreamAsyncIterator; | ||
var from; | ||
// Lazy loaded to improve the startup performance. | ||
let StringDecoder; | ||
let createReadableStreamAsyncIterator; | ||
let from; | ||
require('inherits')(Readable, Stream); | ||
var errorOrDestroy = destroyImpl.errorOrDestroy; | ||
var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; | ||
const errorOrDestroy = destroyImpl.errorOrDestroy; | ||
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; | ||
function prependListener(emitter, event, fn) { | ||
// Sadly this is not cacheable as some libraries bundle their own | ||
// event emitter implementation with them. | ||
if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn); // This is a hack to make sure that our error handler is attached before any | ||
if (typeof emitter.prependListener === 'function') return emitter.prependListener(event, fn); | ||
// This is a hack to make sure that our error handler is attached before any | ||
// userland ones. NEVER DO THIS. This is here only because this code needs | ||
// to continue to work with older versions of Node.js that do not include | ||
// the prependListener() method. The goal is to eventually remove this hack. | ||
if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (Array.isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]]; | ||
} | ||
function ReadableState(options, stream, isDuplex) { | ||
Duplex = Duplex || require('./_stream_duplex'); | ||
options = options || {}; // Duplex streams are both readable and writable, but share | ||
options = options || {}; | ||
// Duplex streams are both readable and writable, but share | ||
// the same options object. | ||
@@ -117,14 +103,16 @@ // However, some cases require setting options to different | ||
// These options can be provided separately as readableXXX and writableXXX. | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag. Used to make read(n) ignore n and to | ||
// object stream flag. Used to make read(n) ignore n and to | ||
// make all the buffer merging and length checks go away | ||
this.objectMode = !!options.objectMode; | ||
if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode; | ||
this.objectMode = !!options.objectMode; | ||
if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode; // the point at which it stops calling _read() to fill the buffer | ||
// the point at which it stops calling _read() to fill the buffer | ||
// Note: 0 is a valid value, means "don't call _read preemptively ever" | ||
this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex); | ||
this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex); // A linked list is used to store data chunks instead of an array because the | ||
// A linked list is used to store data chunks instead of an array because the | ||
// linked list can remove elements from the beginning faster than | ||
// array.shift() | ||
this.buffer = new BufferList(); | ||
@@ -137,10 +125,12 @@ this.length = 0; | ||
this.endEmitted = false; | ||
this.reading = false; // a flag to be able to tell if the event 'readable'/'data' is emitted | ||
this.reading = false; | ||
// a flag to be able to tell if the event 'readable'/'data' is emitted | ||
// immediately, or on a later tick. We set this to true at first, because | ||
// any actions that shouldn't happen until "later" should generally also | ||
// not happen before the first read call. | ||
this.sync = true; | ||
this.sync = true; // whenever we return null, then we set a flag to say | ||
// whenever we return null, then we set a flag to say | ||
// that we're awaiting a 'readable' event emission. | ||
this.needReadable = false; | ||
@@ -150,20 +140,25 @@ this.emittedReadable = false; | ||
this.resumeScheduled = false; | ||
this.paused = true; // Should close be emitted on destroy. Defaults to true. | ||
this.paused = true; | ||
this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'end' (and potentially 'finish') | ||
// Should close be emitted on destroy. Defaults to true. | ||
this.emitClose = options.emitClose !== false; | ||
this.autoDestroy = !!options.autoDestroy; // has it been destroyed | ||
// Should .destroy() be called after 'end' (and potentially 'finish') | ||
this.autoDestroy = !!options.autoDestroy; | ||
this.destroyed = false; // Crypto is kind of old and crusty. Historically, its default string | ||
// has it been destroyed | ||
this.destroyed = false; | ||
// Crypto is kind of old and crusty. Historically, its default string | ||
// encoding is 'binary' so we have to make this configurable. | ||
// Everything else in the universe uses 'utf8', though. | ||
this.defaultEncoding = options.defaultEncoding || 'utf8'; | ||
this.defaultEncoding = options.defaultEncoding || 'utf8'; // the number of writers that are awaiting a drain event in .pipe()s | ||
// the number of writers that are awaiting a drain event in .pipe()s | ||
this.awaitDrain = 0; | ||
this.awaitDrain = 0; // if true, a maybeReadMore has been scheduled | ||
// if true, a maybeReadMore has been scheduled | ||
this.readingMore = false; | ||
this.decoder = null; | ||
this.encoding = null; | ||
if (options.encoding) { | ||
@@ -175,13 +170,13 @@ if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder; | ||
} | ||
function Readable(options) { | ||
Duplex = Duplex || require('./_stream_duplex'); | ||
if (!(this instanceof Readable)) return new Readable(options); // Checking for a Stream.Duplex instance is faster here instead of inside | ||
if (!(this instanceof Readable)) return new Readable(options); | ||
// Checking for a Stream.Duplex instance is faster here instead of inside | ||
// the ReadableState constructor, at least with V8 6.5 | ||
const isDuplex = this instanceof Duplex; | ||
this._readableState = new ReadableState(options, this, isDuplex); | ||
var isDuplex = this instanceof Duplex; | ||
this._readableState = new ReadableState(options, this, isDuplex); // legacy | ||
// legacy | ||
this.readable = true; | ||
if (options) { | ||
@@ -191,6 +186,4 @@ if (typeof options.read === 'function') this._read = options.read; | ||
} | ||
Stream.call(this); | ||
} | ||
Object.defineProperty(Readable.prototype, 'destroyed', { | ||
@@ -201,10 +194,9 @@ // making it explicit this property is not enumerable | ||
enumerable: false, | ||
get: function get() { | ||
get() { | ||
if (this._readableState === undefined) { | ||
return false; | ||
} | ||
return this._readableState.destroyed; | ||
}, | ||
set: function set(value) { | ||
set(value) { | ||
// we ignore the value if the stream | ||
@@ -214,6 +206,6 @@ // has not been initialized yet | ||
return; | ||
} // backward compatibility, the user is explicitly | ||
} | ||
// backward compatibility, the user is explicitly | ||
// managing destroyed | ||
this._readableState.destroyed = value; | ||
@@ -224,19 +216,16 @@ } | ||
Readable.prototype._undestroy = destroyImpl.undestroy; | ||
Readable.prototype._destroy = function (err, cb) { | ||
cb(err); | ||
}; // Manually shove something into the read() buffer. | ||
}; | ||
// Manually shove something into the read() buffer. | ||
// This returns true if the highWaterMark has not been hit yet, | ||
// similar to how Writable.write() returns true if you should | ||
// write() some more. | ||
Readable.prototype.push = function (chunk, encoding) { | ||
var state = this._readableState; | ||
var skipChunkCheck; | ||
if (!state.objectMode) { | ||
if (typeof chunk === 'string') { | ||
encoding = encoding || state.defaultEncoding; | ||
if (encoding !== state.encoding) { | ||
@@ -246,3 +235,2 @@ chunk = Buffer.from(chunk, encoding); | ||
} | ||
skipChunkCheck = true; | ||
@@ -253,15 +241,12 @@ } | ||
} | ||
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck); | ||
}; // Unshift should *always* be something directly out of read() | ||
}; | ||
// Unshift should *always* be something directly out of read() | ||
Readable.prototype.unshift = function (chunk) { | ||
return readableAddChunk(this, chunk, null, true, false); | ||
}; | ||
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { | ||
debug('readableAddChunk', chunk); | ||
var state = stream._readableState; | ||
if (chunk === null) { | ||
@@ -273,3 +258,2 @@ state.reading = false; | ||
if (!skipChunkCheck) er = chunkInvalid(state, chunk); | ||
if (er) { | ||
@@ -281,3 +265,2 @@ errorOrDestroy(stream, er); | ||
} | ||
if (addToFront) { | ||
@@ -291,3 +274,2 @@ if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true); | ||
state.reading = false; | ||
if (state.decoder && !encoding) { | ||
@@ -304,10 +286,9 @@ chunk = state.decoder.write(chunk); | ||
} | ||
} // We can push more data if we are below the highWaterMark. | ||
} | ||
// We can push more data if we are below the highWaterMark. | ||
// Also, if we have no data yet, we can stand some more bytes. | ||
// This is to work around cases where hwm=0, such as the repl. | ||
return !state.ended && (state.length < state.highWaterMark || state.length === 0); | ||
} | ||
function addChunk(stream, state, chunk, addToFront) { | ||
@@ -323,31 +304,26 @@ if (state.flowing && state.length === 0 && !state.sync) { | ||
} | ||
maybeReadMore(stream, state); | ||
} | ||
function chunkInvalid(state, chunk) { | ||
var er; | ||
if (!_isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) { | ||
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk); | ||
} | ||
return er; | ||
} | ||
Readable.prototype.isPaused = function () { | ||
return this._readableState.flowing === false; | ||
}; // backwards compatibility. | ||
}; | ||
// backwards compatibility. | ||
Readable.prototype.setEncoding = function (enc) { | ||
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder; | ||
var decoder = new StringDecoder(enc); | ||
this._readableState.decoder = decoder; // If setEncoding(null), decoder.encoding equals utf8 | ||
const decoder = new StringDecoder(enc); | ||
this._readableState.decoder = decoder; | ||
// If setEncoding(null), decoder.encoding equals utf8 | ||
this._readableState.encoding = this._readableState.decoder.encoding; | ||
this._readableState.encoding = this._readableState.decoder.encoding; // Iterate over current buffer to convert already stored Buffers: | ||
var p = this._readableState.buffer.head; | ||
var content = ''; | ||
// Iterate over current buffer to convert already stored Buffers: | ||
let p = this._readableState.buffer.head; | ||
let content = ''; | ||
while (p !== null) { | ||
@@ -357,13 +333,10 @@ content += decoder.write(p.data); | ||
} | ||
this._readableState.buffer.clear(); | ||
if (content !== '') this._readableState.buffer.push(content); | ||
this._readableState.length = content.length; | ||
return this; | ||
}; // Don't raise the hwm > 1GB | ||
}; | ||
var MAX_HWM = 0x40000000; | ||
// Don't raise the hwm > 1GB | ||
const MAX_HWM = 0x40000000; | ||
function computeNewHighWaterMark(n) { | ||
@@ -384,21 +357,18 @@ if (n >= MAX_HWM) { | ||
} | ||
return n; | ||
} | ||
return n; | ||
} // This function is designed to be inlinable, so please take care when making | ||
// This function is designed to be inlinable, so please take care when making | ||
// changes to the function body. | ||
function howMuchToRead(n, state) { | ||
if (n <= 0 || state.length === 0 && state.ended) return 0; | ||
if (state.objectMode) return 1; | ||
if (n !== n) { | ||
// Only flow one buffer at a time | ||
if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length; | ||
} // If we're asking for more than the current hwm, then raise the hwm. | ||
} | ||
// If we're asking for more than the current hwm, then raise the hwm. | ||
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); | ||
if (n <= state.length) return n; // Don't have enough | ||
if (n <= state.length) return n; | ||
// Don't have enough | ||
if (!state.ended) { | ||
@@ -408,7 +378,6 @@ state.needReadable = true; | ||
} | ||
return state.length; | ||
} // you can override either this method, or the async _read(n) below. | ||
} | ||
// you can override either this method, or the async _read(n) below. | ||
Readable.prototype.read = function (n) { | ||
@@ -419,6 +388,7 @@ debug('read', n); | ||
var nOrig = n; | ||
if (n !== 0) state.emittedReadable = false; // if we're doing read(0) to trigger a readable event, but we | ||
if (n !== 0) state.emittedReadable = false; | ||
// if we're doing read(0) to trigger a readable event, but we | ||
// already have a bunch of data in the buffer, then just trigger | ||
// the 'readable' event and move on. | ||
if (n === 0 && state.needReadable && ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended)) { | ||
@@ -429,9 +399,11 @@ debug('read: emitReadable', state.length, state.ended); | ||
} | ||
n = howMuchToRead(n, state); | ||
n = howMuchToRead(n, state); // if we've ended, and we're now clear, then finish it up. | ||
// if we've ended, and we're now clear, then finish it up. | ||
if (n === 0 && state.ended) { | ||
if (state.length === 0) endReadable(this); | ||
return null; | ||
} // All the actual chunk generation logic needs to be | ||
} | ||
// All the actual chunk generation logic needs to be | ||
// *below* the call to _read. The reason is that in certain | ||
@@ -457,15 +429,15 @@ // synthetic stream cases, such as passthrough streams, _read | ||
// 3. Actually pull the requested chunks out of the buffer and return. | ||
// if we need a readable event, then we need to do some reading. | ||
var doRead = state.needReadable; | ||
debug('need readable', doRead); // if we currently have less than the highWaterMark, then also read some | ||
debug('need readable', doRead); | ||
// if we currently have less than the highWaterMark, then also read some | ||
if (state.length === 0 || state.length - n < state.highWaterMark) { | ||
doRead = true; | ||
debug('length less than watermark', doRead); | ||
} // however, if we've ended, then there's no point, and if we're already | ||
} | ||
// however, if we've ended, then there's no point, and if we're already | ||
// reading, then it's unnecessary. | ||
if (state.ended || state.reading) { | ||
@@ -477,17 +449,14 @@ doRead = false; | ||
state.reading = true; | ||
state.sync = true; // if the length is currently zero, then we *need* a readable event. | ||
if (state.length === 0) state.needReadable = true; // call internal read method | ||
state.sync = true; | ||
// if the length is currently zero, then we *need* a readable event. | ||
if (state.length === 0) state.needReadable = true; | ||
// call internal read method | ||
this._read(state.highWaterMark); | ||
state.sync = false; // If _read pushed data synchronously, then `reading` will be false, | ||
state.sync = false; | ||
// If _read pushed data synchronously, then `reading` will be false, | ||
// and we need to re-evaluate how much data we can return to the user. | ||
if (!state.reading) n = howMuchToRead(nOrig, state); | ||
} | ||
var ret; | ||
if (n > 0) ret = fromList(n, state);else ret = null; | ||
if (ret === null) { | ||
@@ -500,22 +469,18 @@ state.needReadable = state.length <= state.highWaterMark; | ||
} | ||
if (state.length === 0) { | ||
// If we have nothing in the buffer, then we want to know | ||
// as soon as we *do* get something into the buffer. | ||
if (!state.ended) state.needReadable = true; // If we tried to read() past the EOF, then emit end on the next tick. | ||
if (!state.ended) state.needReadable = true; | ||
// If we tried to read() past the EOF, then emit end on the next tick. | ||
if (nOrig !== n && state.ended) endReadable(this); | ||
} | ||
if (ret !== null) this.emit('data', ret); | ||
return ret; | ||
}; | ||
function onEofChunk(stream, state) { | ||
debug('onEofChunk'); | ||
if (state.ended) return; | ||
if (state.decoder) { | ||
var chunk = state.decoder.end(); | ||
if (chunk && chunk.length) { | ||
@@ -526,5 +491,3 @@ state.buffer.push(chunk); | ||
} | ||
state.ended = true; | ||
if (state.sync) { | ||
@@ -538,3 +501,2 @@ // if we are sync, wait until next tick to emit the data. | ||
state.needReadable = false; | ||
if (!state.emittedReadable) { | ||
@@ -545,7 +507,7 @@ state.emittedReadable = true; | ||
} | ||
} // Don't emit readable right away in sync mode, because this can trigger | ||
} | ||
// Don't emit readable right away in sync mode, because this can trigger | ||
// another read() call => stack overflow. This way, it might trigger | ||
// a nextTick recursion warning, but that's not so bad. | ||
function emitReadable(stream) { | ||
@@ -555,3 +517,2 @@ var state = stream._readableState; | ||
state.needReadable = false; | ||
if (!state.emittedReadable) { | ||
@@ -563,11 +524,11 @@ debug('emitReadable', state.flowing); | ||
} | ||
function emitReadable_(stream) { | ||
var state = stream._readableState; | ||
debug('emitReadable_', state.destroyed, state.length, state.ended); | ||
if (!state.destroyed && (state.length || state.ended)) { | ||
stream.emit('readable'); | ||
state.emittedReadable = false; | ||
} // The stream needs another readable event if | ||
} | ||
// The stream needs another readable event if | ||
// 1. It is not flowing, as the flow mechanism will take | ||
@@ -578,7 +539,7 @@ // care of it. | ||
// another readable later. | ||
state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark; | ||
flow(stream); | ||
} // at this point, the user has presumably seen the 'readable' event, | ||
} | ||
// at this point, the user has presumably seen the 'readable' event, | ||
// and called read() to consume some data. that may have triggered | ||
@@ -589,4 +550,2 @@ // in turn another _read(n) call, in which case reading = true if | ||
// then go ahead and try to read some more preemptively. | ||
function maybeReadMore(stream, state) { | ||
@@ -598,3 +557,2 @@ if (!state.readingMore) { | ||
} | ||
function maybeReadMore_(stream, state) { | ||
@@ -625,24 +583,22 @@ // Attempt to read more data if we should. | ||
while (!state.reading && !state.ended && (state.length < state.highWaterMark || state.flowing && state.length === 0)) { | ||
var len = state.length; | ||
const len = state.length; | ||
debug('maybeReadMore read 0'); | ||
stream.read(0); | ||
if (len === state.length) // didn't get any data, stop spinning. | ||
if (len === state.length) | ||
// didn't get any data, stop spinning. | ||
break; | ||
} | ||
state.readingMore = false; | ||
} | ||
state.readingMore = false; | ||
} // abstract method. to be overridden in specific implementation classes. | ||
// abstract method. to be overridden in specific implementation classes. | ||
// call cb(er, data) where data is <= n in length. | ||
// for virtual (non-string, non-buffer) streams, "length" is somewhat | ||
// arbitrary, and perhaps not very meaningful. | ||
Readable.prototype._read = function (n) { | ||
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); | ||
}; | ||
Readable.prototype.pipe = function (dest, pipeOpts) { | ||
var src = this; | ||
var state = this._readableState; | ||
switch (state.pipesCount) { | ||
@@ -652,7 +608,5 @@ case 0: | ||
break; | ||
case 1: | ||
state.pipes = [state.pipes, dest]; | ||
break; | ||
default: | ||
@@ -662,3 +616,2 @@ state.pipes.push(dest); | ||
} | ||
state.pipesCount += 1; | ||
@@ -670,6 +623,4 @@ debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); | ||
dest.on('unpipe', onunpipe); | ||
function onunpipe(readable, unpipeInfo) { | ||
debug('onunpipe'); | ||
if (readable === src) { | ||
@@ -682,19 +633,17 @@ if (unpipeInfo && unpipeInfo.hasUnpiped === false) { | ||
} | ||
function onend() { | ||
debug('onend'); | ||
dest.end(); | ||
} // when the dest drains, it reduces the awaitDrain counter | ||
} | ||
// when the dest drains, it reduces the awaitDrain counter | ||
// on the source. This would be more elegant with a .once() | ||
// handler in flow(), but adding and removing repeatedly is | ||
// too slow. | ||
var ondrain = pipeOnDrain(src); | ||
dest.on('drain', ondrain); | ||
var cleanedUp = false; | ||
function cleanup() { | ||
debug('cleanup'); // cleanup event handlers once the pipe is broken | ||
debug('cleanup'); | ||
// cleanup event handlers once the pipe is broken | ||
dest.removeListener('close', onclose); | ||
@@ -708,3 +657,5 @@ dest.removeListener('finish', onfinish); | ||
src.removeListener('data', ondata); | ||
cleanedUp = true; // if the reader is waiting for a drain event from this | ||
cleanedUp = true; | ||
// if the reader is waiting for a drain event from this | ||
// specific writer, then it would cause it to never start | ||
@@ -714,8 +665,5 @@ // flowing again. | ||
// If we don't know, then assume that we are waiting for one. | ||
if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain(); | ||
} | ||
src.on('data', ondata); | ||
function ondata(chunk) { | ||
@@ -725,3 +673,2 @@ debug('ondata'); | ||
debug('dest.write', ret); | ||
if (ret === false) { | ||
@@ -736,9 +683,8 @@ // If the user unpiped during `dest.write()`, it is possible | ||
} | ||
src.pause(); | ||
} | ||
} // if the dest has an error, then stop piping into it. | ||
} | ||
// if the dest has an error, then stop piping into it. | ||
// however, don't suppress the throwing behavior for this. | ||
function onerror(er) { | ||
@@ -749,7 +695,8 @@ debug('onerror', er); | ||
if (EElistenerCount(dest, 'error') === 0) errorOrDestroy(dest, er); | ||
} // Make sure our error handler is attached before userland ones. | ||
} | ||
// Make sure our error handler is attached before userland ones. | ||
prependListener(dest, 'error', onerror); | ||
prependListener(dest, 'error', onerror); // Both close and finish should trigger unpipe, but only once. | ||
// Both close and finish should trigger unpipe, but only once. | ||
function onclose() { | ||
@@ -759,5 +706,3 @@ dest.removeListener('finish', onfinish); | ||
} | ||
dest.once('close', onclose); | ||
function onfinish() { | ||
@@ -768,13 +713,12 @@ debug('onfinish'); | ||
} | ||
dest.once('finish', onfinish); | ||
function unpipe() { | ||
debug('unpipe'); | ||
src.unpipe(dest); | ||
} // tell the dest that it's being piped to | ||
} | ||
// tell the dest that it's being piped to | ||
dest.emit('pipe', src); | ||
dest.emit('pipe', src); // start the flow if it hasn't been started already. | ||
// start the flow if it hasn't been started already. | ||
if (!state.flowing) { | ||
@@ -784,6 +728,4 @@ debug('pipe resume'); | ||
} | ||
return dest; | ||
}; | ||
function pipeOnDrain(src) { | ||
@@ -794,3 +736,2 @@ return function pipeOnDrainFunctionResult() { | ||
if (state.awaitDrain) state.awaitDrain--; | ||
if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) { | ||
@@ -802,3 +743,2 @@ state.flowing = true; | ||
} | ||
Readable.prototype.unpipe = function (dest) { | ||
@@ -808,11 +748,14 @@ var state = this._readableState; | ||
hasUnpiped: false | ||
}; // if we're not piping anywhere, then do nothing. | ||
}; | ||
if (state.pipesCount === 0) return this; // just one destination. most common case. | ||
// if we're not piping anywhere, then do nothing. | ||
if (state.pipesCount === 0) return this; | ||
// just one destination. most common case. | ||
if (state.pipesCount === 1) { | ||
// passed in one, but it's not the right one. | ||
if (dest && dest !== state.pipes) return this; | ||
if (!dest) dest = state.pipes; // got a match. | ||
if (!dest) dest = state.pipes; | ||
// got a match. | ||
state.pipes = null; | ||
@@ -823,4 +766,5 @@ state.pipesCount = 0; | ||
return this; | ||
} // slow case. multiple pipe destinations. | ||
} | ||
// slow case. multiple pipe destinations. | ||
@@ -834,13 +778,9 @@ if (!dest) { | ||
state.flowing = false; | ||
for (var i = 0; i < len; i++) { | ||
dests[i].emit('unpipe', this, { | ||
hasUnpiped: false | ||
}); | ||
} | ||
for (var i = 0; i < len; i++) dests[i].emit('unpipe', this, { | ||
hasUnpiped: false | ||
}); | ||
return this; | ||
} // try to find the right one. | ||
} | ||
// try to find the right one. | ||
var index = indexOf(state.pipes, dest); | ||
@@ -853,15 +793,15 @@ if (index === -1) return this; | ||
return this; | ||
}; // set up data events if they are asked for | ||
}; | ||
// set up data events if they are asked for | ||
// Ensure readable listeners eventually get something | ||
Readable.prototype.on = function (ev, fn) { | ||
var res = Stream.prototype.on.call(this, ev, fn); | ||
var state = this._readableState; | ||
const res = Stream.prototype.on.call(this, ev, fn); | ||
const state = this._readableState; | ||
if (ev === 'data') { | ||
// update readableListening so that resume() may be a no-op | ||
// a few lines down. This is needed to support once('readable'). | ||
state.readableListening = this.listenerCount('readable') > 0; // Try start flowing on next tick if stream isn't explicitly paused | ||
state.readableListening = this.listenerCount('readable') > 0; | ||
// Try start flowing on next tick if stream isn't explicitly paused | ||
if (state.flowing !== false) this.resume(); | ||
@@ -874,3 +814,2 @@ } else if (ev === 'readable') { | ||
debug('on readable', state.length, state.reading); | ||
if (state.length) { | ||
@@ -883,11 +822,7 @@ emitReadable(this); | ||
} | ||
return res; | ||
}; | ||
Readable.prototype.addListener = Readable.prototype.on; | ||
Readable.prototype.removeListener = function (ev, fn) { | ||
var res = Stream.prototype.removeListener.call(this, ev, fn); | ||
const res = Stream.prototype.removeListener.call(this, ev, fn); | ||
if (ev === 'readable') { | ||
@@ -902,9 +837,6 @@ // We need to check if there is someone still listening to | ||
} | ||
return res; | ||
}; | ||
Readable.prototype.removeAllListeners = function (ev) { | ||
var res = Stream.prototype.removeAllListeners.apply(this, arguments); | ||
const res = Stream.prototype.removeAllListeners.apply(this, arguments); | ||
if (ev === 'readable' || ev === undefined) { | ||
@@ -919,14 +851,13 @@ // We need to check if there is someone still listening to | ||
} | ||
return res; | ||
}; | ||
function updateReadableListening(self) { | ||
var state = self._readableState; | ||
const state = self._readableState; | ||
state.readableListening = self.listenerCount('readable') > 0; | ||
if (state.resumeScheduled && !state.paused) { | ||
// flowing needs to be set to true now, otherwise | ||
// the upcoming resume will not flow. | ||
state.flowing = true; // crude way to check if we should resume | ||
state.flowing = true; | ||
// crude way to check if we should resume | ||
} else if (self.listenerCount('data') > 0) { | ||
@@ -936,26 +867,22 @@ self.resume(); | ||
} | ||
function nReadingNextTick(self) { | ||
debug('readable nexttick read 0'); | ||
self.read(0); | ||
} // pause() and resume() are remnants of the legacy readable stream API | ||
} | ||
// pause() and resume() are remnants of the legacy readable stream API | ||
// If the user uses them, then switch into old mode. | ||
Readable.prototype.resume = function () { | ||
var state = this._readableState; | ||
if (!state.flowing) { | ||
debug('resume'); // we flow only if there is no one listening | ||
debug('resume'); | ||
// we flow only if there is no one listening | ||
// for readable, but we still have to call | ||
// resume() | ||
state.flowing = !state.readableListening; | ||
resume(this, state); | ||
} | ||
state.paused = false; | ||
return this; | ||
}; | ||
function resume(stream, state) { | ||
@@ -967,10 +894,7 @@ if (!state.resumeScheduled) { | ||
} | ||
function resume_(stream, state) { | ||
debug('resume', state.reading); | ||
if (!state.reading) { | ||
stream.read(0); | ||
} | ||
state.resumeScheduled = false; | ||
@@ -981,6 +905,4 @@ stream.emit('resume'); | ||
} | ||
Readable.prototype.pause = function () { | ||
debug('call pause flowing=%j', this._readableState.flowing); | ||
if (this._readableState.flowing !== false) { | ||
@@ -991,42 +913,32 @@ debug('pause'); | ||
} | ||
this._readableState.paused = true; | ||
return this; | ||
}; | ||
function flow(stream) { | ||
var state = stream._readableState; | ||
const state = stream._readableState; | ||
debug('flow', state.flowing); | ||
while (state.flowing && stream.read() !== null); | ||
} | ||
while (state.flowing && stream.read() !== null) { | ||
; | ||
} | ||
} // wrap an old-style stream as the async data source. | ||
// wrap an old-style stream as the async data source. | ||
// This is *not* part of the readable stream interface. | ||
// It is an ugly unfortunate mess of history. | ||
Readable.prototype.wrap = function (stream) { | ||
var _this = this; | ||
var state = this._readableState; | ||
var paused = false; | ||
stream.on('end', function () { | ||
stream.on('end', () => { | ||
debug('wrapped end'); | ||
if (state.decoder && !state.ended) { | ||
var chunk = state.decoder.end(); | ||
if (chunk && chunk.length) _this.push(chunk); | ||
if (chunk && chunk.length) this.push(chunk); | ||
} | ||
_this.push(null); | ||
this.push(null); | ||
}); | ||
stream.on('data', function (chunk) { | ||
stream.on('data', chunk => { | ||
debug('wrapped data'); | ||
if (state.decoder) chunk = state.decoder.write(chunk); // don't skip over falsy values in objectMode | ||
if (state.decoder) chunk = state.decoder.write(chunk); | ||
// don't skip over falsy values in objectMode | ||
if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return; | ||
var ret = _this.push(chunk); | ||
var ret = this.push(chunk); | ||
if (!ret) { | ||
@@ -1036,5 +948,6 @@ paused = true; | ||
} | ||
}); // proxy all the other methods. | ||
}); | ||
// proxy all the other methods. | ||
// important when wrapping filters and duplexes. | ||
for (var i in stream) { | ||
@@ -1048,14 +961,13 @@ if (this[i] === undefined && typeof stream[i] === 'function') { | ||
} | ||
} // proxy certain important events. | ||
} | ||
// proxy certain important events. | ||
for (var n = 0; n < kProxyEvents.length; n++) { | ||
stream.on(kProxyEvents[n], this.emit.bind(this, kProxyEvents[n])); | ||
} // when we try to consume some more bytes, simply unpause the | ||
} | ||
// when we try to consume some more bytes, simply unpause the | ||
// underlying stream. | ||
this._read = function (n) { | ||
this._read = n => { | ||
debug('wrapped _read', n); | ||
if (paused) { | ||
@@ -1066,6 +978,4 @@ paused = false; | ||
}; | ||
return this; | ||
}; | ||
if (typeof Symbol === 'function') { | ||
@@ -1076,7 +986,5 @@ Readable.prototype[Symbol.asyncIterator] = function () { | ||
} | ||
return createReadableStreamAsyncIterator(this); | ||
}; | ||
} | ||
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', { | ||
@@ -1113,4 +1021,5 @@ // making it explicit this property is not enumerable | ||
} | ||
}); // exposed for testing purposes only. | ||
}); | ||
// exposed for testing purposes only. | ||
Readable._fromList = fromList; | ||
@@ -1122,10 +1031,11 @@ Object.defineProperty(Readable.prototype, 'readableLength', { | ||
enumerable: false, | ||
get: function get() { | ||
get() { | ||
return this._readableState.length; | ||
} | ||
}); // Pluck off n bytes from an array of buffers. | ||
}); | ||
// Pluck off n bytes from an array of buffers. | ||
// Length is the combined lengths of all the buffers in the list. | ||
// This function is designed to be inlinable, so please take care when making | ||
// changes to the function body. | ||
function fromList(n, state) { | ||
@@ -1145,7 +1055,5 @@ // nothing buffered | ||
} | ||
function endReadable(stream) { | ||
var state = stream._readableState; | ||
debug('endReadable', state.endEmitted); | ||
if (!state.endEmitted) { | ||
@@ -1156,6 +1064,6 @@ state.ended = true; | ||
} | ||
function endReadableNT(state, stream) { | ||
debug('endReadableNT', state.endEmitted, state.length); // Check that we didn't get one last unshift. | ||
debug('endReadableNT', state.endEmitted, state.length); | ||
// Check that we didn't get one last unshift. | ||
if (!state.endEmitted && state.length === 0) { | ||
@@ -1165,8 +1073,6 @@ state.endEmitted = true; | ||
stream.emit('end'); | ||
if (state.autoDestroy) { | ||
// In case of duplex streams we need a way to detect | ||
// if the writable side is ready for autoDestroy as well | ||
var wState = stream._writableState; | ||
const wState = stream._writableState; | ||
if (!wState || wState.autoDestroy && wState.finished) { | ||
@@ -1178,3 +1084,2 @@ stream.destroy(); | ||
} | ||
if (typeof Symbol === 'function') { | ||
@@ -1185,7 +1090,5 @@ Readable.from = function (iterable, opts) { | ||
} | ||
return from(Readable, iterable, opts); | ||
}; | ||
} | ||
function indexOf(xs, x) { | ||
@@ -1195,4 +1098,3 @@ for (var i = 0, l = xs.length; i < l; i++) { | ||
} | ||
return -1; | ||
} |
@@ -21,2 +21,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a transform stream is a readable/writable stream where you do | ||
@@ -63,16 +64,13 @@ // something with the data. Sometimes it's called a "filter", | ||
// the results of the previous transformed chunk were consumed. | ||
'use strict'; | ||
module.exports = Transform; | ||
var _require$codes = require('../errors').codes, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK, | ||
ERR_TRANSFORM_ALREADY_TRANSFORMING = _require$codes.ERR_TRANSFORM_ALREADY_TRANSFORMING, | ||
ERR_TRANSFORM_WITH_LENGTH_0 = _require$codes.ERR_TRANSFORM_WITH_LENGTH_0; | ||
var Duplex = require('./_stream_duplex'); | ||
const _require$codes = require('../errors').codes, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK, | ||
ERR_TRANSFORM_ALREADY_TRANSFORMING = _require$codes.ERR_TRANSFORM_ALREADY_TRANSFORMING, | ||
ERR_TRANSFORM_WITH_LENGTH_0 = _require$codes.ERR_TRANSFORM_WITH_LENGTH_0; | ||
const Duplex = require('./_stream_duplex'); | ||
require('inherits')(Transform, Duplex); | ||
function afterTransform(er, data) { | ||
@@ -82,10 +80,9 @@ var ts = this._transformState; | ||
var cb = ts.writecb; | ||
if (cb === null) { | ||
return this.emit('error', new ERR_MULTIPLE_CALLBACK()); | ||
} | ||
ts.writechunk = null; | ||
ts.writecb = null; | ||
if (data != null) // single equals check for both `null` and `undefined` | ||
if (data != null) | ||
// single equals check for both `null` and `undefined` | ||
this.push(data); | ||
@@ -95,3 +92,2 @@ cb(er); | ||
rs.reading = false; | ||
if (rs.needReadable || rs.length < rs.highWaterMark) { | ||
@@ -101,3 +97,2 @@ this._read(rs.highWaterMark); | ||
} | ||
function Transform(options) { | ||
@@ -113,25 +108,23 @@ if (!(this instanceof Transform)) return new Transform(options); | ||
writeencoding: null | ||
}; // start out asking for a readable event once data is transformed. | ||
}; | ||
this._readableState.needReadable = true; // we have implemented the _read method, and done the other things | ||
// start out asking for a readable event once data is transformed. | ||
this._readableState.needReadable = true; | ||
// we have implemented the _read method, and done the other things | ||
// that Readable wants before the first _read call, so unset the | ||
// sync guard flag. | ||
this._readableState.sync = false; | ||
if (options) { | ||
if (typeof options.transform === 'function') this._transform = options.transform; | ||
if (typeof options.flush === 'function') this._flush = options.flush; | ||
} // When the writable side finishes, then flush out anything remaining. | ||
} | ||
// When the writable side finishes, then flush out anything remaining. | ||
this.on('prefinish', prefinish); | ||
} | ||
function prefinish() { | ||
var _this = this; | ||
if (typeof this._flush === 'function' && !this._readableState.destroyed) { | ||
this._flush(function (er, data) { | ||
done(_this, er, data); | ||
this._flush((er, data) => { | ||
done(this, er, data); | ||
}); | ||
@@ -142,7 +135,8 @@ } else { | ||
} | ||
Transform.prototype.push = function (chunk, encoding) { | ||
this._transformState.needTransform = false; | ||
return Duplex.prototype.push.call(this, chunk, encoding); | ||
}; // This is the part where you do stuff! | ||
}; | ||
// This is the part where you do stuff! | ||
// override this function in implementation classes. | ||
@@ -157,8 +151,5 @@ // 'chunk' is an input chunk. | ||
// never call cb(), then you'll never get another chunk. | ||
Transform.prototype._transform = function (chunk, encoding, cb) { | ||
cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()')); | ||
}; | ||
Transform.prototype._write = function (chunk, encoding, cb) { | ||
@@ -169,3 +160,2 @@ var ts = this._transformState; | ||
ts.writeencoding = encoding; | ||
if (!ts.transforming) { | ||
@@ -175,13 +165,11 @@ var rs = this._readableState; | ||
} | ||
}; // Doesn't matter what the args are here. | ||
}; | ||
// Doesn't matter what the args are here. | ||
// _transform does all the work. | ||
// That we got here means that the readable side wants more data. | ||
Transform.prototype._read = function (n) { | ||
var ts = this._transformState; | ||
if (ts.writechunk !== null && !ts.transforming) { | ||
ts.transforming = true; | ||
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); | ||
@@ -194,16 +182,16 @@ } else { | ||
}; | ||
Transform.prototype._destroy = function (err, cb) { | ||
Duplex.prototype._destroy.call(this, err, function (err2) { | ||
Duplex.prototype._destroy.call(this, err, err2 => { | ||
cb(err2); | ||
}); | ||
}; | ||
function done(stream, er, data) { | ||
if (er) return stream.emit('error', er); | ||
if (data != null) // single equals check for both `null` and `undefined` | ||
stream.push(data); // TODO(BridgeAR): Write a test for these two error cases | ||
if (data != null) | ||
// single equals check for both `null` and `undefined` | ||
stream.push(data); | ||
// TODO(BridgeAR): Write a test for these two error cases | ||
// if there's nothing in the write buffer, then that means | ||
// that nothing more will ever be provided | ||
if (stream._writableState.length) throw new ERR_TRANSFORM_WITH_LENGTH_0(); | ||
@@ -210,0 +198,0 @@ if (stream._transformState.transforming) throw new ERR_TRANSFORM_ALREADY_TRANSFORMING(); |
@@ -21,10 +21,12 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// A bit simpler than readable streams. | ||
// Implement an async ._write(chunk, encoding, cb), and it'll handle all | ||
// the drain event emission and buffering. | ||
'use strict'; | ||
module.exports = Writable; | ||
/* <replacement> */ | ||
function WriteReq(chunk, encoding, cb) { | ||
@@ -35,14 +37,11 @@ this.chunk = chunk; | ||
this.next = null; | ||
} // It seems a linked list but it is not | ||
} | ||
// It seems a linked list but it is not | ||
// there will be only 2 of these for each stream | ||
function CorkedRequest(state) { | ||
var _this = this; | ||
this.next = null; | ||
this.entry = null; | ||
this.finish = function () { | ||
onCorkedFinish(_this, state); | ||
this.finish = () => { | ||
onCorkedFinish(this, state); | ||
}; | ||
@@ -53,4 +52,2 @@ } | ||
/*<replacement>*/ | ||
var Duplex; | ||
@@ -60,5 +57,5 @@ /*</replacement>*/ | ||
Writable.WritableState = WritableState; | ||
/*<replacement>*/ | ||
var internalUtil = { | ||
const internalUtil = { | ||
deprecate: require('util-deprecate') | ||
@@ -69,43 +66,33 @@ }; | ||
/*<replacement>*/ | ||
var Stream = require('./internal/streams/stream'); | ||
/*</replacement>*/ | ||
var Buffer = require('buffer').Buffer; | ||
var OurUint8Array = global.Uint8Array || function () {}; | ||
const Buffer = require('buffer').Buffer; | ||
const OurUint8Array = (typeof global !== 'undefined' ? global : typeof window !== 'undefined' ? window : typeof self !== 'undefined' ? self : {}).Uint8Array || function () {}; | ||
function _uint8ArrayToBuffer(chunk) { | ||
return Buffer.from(chunk); | ||
} | ||
function _isUint8Array(obj) { | ||
return Buffer.isBuffer(obj) || obj instanceof OurUint8Array; | ||
} | ||
var destroyImpl = require('./internal/streams/destroy'); | ||
var _require = require('./internal/streams/state'), | ||
getHighWaterMark = _require.getHighWaterMark; | ||
var _require$codes = require('../errors').codes, | ||
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK, | ||
ERR_STREAM_CANNOT_PIPE = _require$codes.ERR_STREAM_CANNOT_PIPE, | ||
ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED, | ||
ERR_STREAM_NULL_VALUES = _require$codes.ERR_STREAM_NULL_VALUES, | ||
ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END, | ||
ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING; | ||
var errorOrDestroy = destroyImpl.errorOrDestroy; | ||
const destroyImpl = require('./internal/streams/destroy'); | ||
const _require = require('./internal/streams/state'), | ||
getHighWaterMark = _require.getHighWaterMark; | ||
const _require$codes = require('../errors').codes, | ||
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE, | ||
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, | ||
ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK, | ||
ERR_STREAM_CANNOT_PIPE = _require$codes.ERR_STREAM_CANNOT_PIPE, | ||
ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED, | ||
ERR_STREAM_NULL_VALUES = _require$codes.ERR_STREAM_NULL_VALUES, | ||
ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END, | ||
ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING; | ||
const errorOrDestroy = destroyImpl.errorOrDestroy; | ||
require('inherits')(Writable, Stream); | ||
function nop() {} | ||
function WritableState(options, stream, isDuplex) { | ||
Duplex = Duplex || require('./_stream_duplex'); | ||
options = options || {}; // Duplex streams are both readable and writable, but share | ||
options = options || {}; | ||
// Duplex streams are both readable and writable, but share | ||
// the same options object. | ||
@@ -115,84 +102,102 @@ // However, some cases require setting options to different | ||
// e.g. options.readableObjectMode vs. options.writableObjectMode, etc. | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; | ||
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag to indicate whether or not this stream | ||
// object stream flag to indicate whether or not this stream | ||
// contains buffers or objects. | ||
this.objectMode = !!options.objectMode; | ||
if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode; | ||
this.objectMode = !!options.objectMode; | ||
if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode; // the point at which write() starts returning false | ||
// the point at which write() starts returning false | ||
// Note: 0 is a valid value, means that we always return false if | ||
// the entire buffer is not flushed immediately on write() | ||
this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex); | ||
this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex); // if _final has been called | ||
// if _final has been called | ||
this.finalCalled = false; | ||
this.finalCalled = false; // drain event flag. | ||
// drain event flag. | ||
this.needDrain = false; | ||
// at the start of calling end() | ||
this.ending = false; | ||
// when end() has been called, and returned | ||
this.ended = false; | ||
// when 'finish' is emitted | ||
this.finished = false; | ||
this.needDrain = false; // at the start of calling end() | ||
// has it been destroyed | ||
this.destroyed = false; | ||
this.ending = false; // when end() has been called, and returned | ||
this.ended = false; // when 'finish' is emitted | ||
this.finished = false; // has it been destroyed | ||
this.destroyed = false; // should we decode strings into buffers before passing to _write? | ||
// should we decode strings into buffers before passing to _write? | ||
// this is here so that some node-core streams can optimize string | ||
// handling at a lower level. | ||
var noDecode = options.decodeStrings === false; | ||
this.decodeStrings = !noDecode; | ||
var noDecode = options.decodeStrings === false; | ||
this.decodeStrings = !noDecode; // Crypto is kind of old and crusty. Historically, its default string | ||
// Crypto is kind of old and crusty. Historically, its default string | ||
// encoding is 'binary' so we have to make this configurable. | ||
// Everything else in the universe uses 'utf8', though. | ||
this.defaultEncoding = options.defaultEncoding || 'utf8'; | ||
this.defaultEncoding = options.defaultEncoding || 'utf8'; // not an actual buffer we keep track of, but a measurement | ||
// not an actual buffer we keep track of, but a measurement | ||
// of how much we're waiting to get pushed to some underlying | ||
// socket or file. | ||
this.length = 0; | ||
this.length = 0; // a flag to see when we're in the middle of a write. | ||
// a flag to see when we're in the middle of a write. | ||
this.writing = false; | ||
this.writing = false; // when true all writes will be buffered until .uncork() call | ||
// when true all writes will be buffered until .uncork() call | ||
this.corked = 0; | ||
this.corked = 0; // a flag to be able to tell if the onwrite cb is called immediately, | ||
// a flag to be able to tell if the onwrite cb is called immediately, | ||
// or on a later tick. We set this to true at first, because any | ||
// actions that shouldn't happen until "later" should generally also | ||
// not happen before the first write call. | ||
this.sync = true; | ||
this.sync = true; // a flag to know if we're processing previously buffered items, which | ||
// a flag to know if we're processing previously buffered items, which | ||
// may call the _write() callback in the same tick, so that we don't | ||
// end up in an overlapped onwrite situation. | ||
this.bufferProcessing = false; | ||
this.bufferProcessing = false; // the callback that's passed to _write(chunk,cb) | ||
// the callback that's passed to _write(chunk,cb) | ||
this.onwrite = function (er) { | ||
onwrite(stream, er); | ||
}; // the callback that the user supplies to write(chunk,encoding,cb) | ||
}; | ||
// the callback that the user supplies to write(chunk,encoding,cb) | ||
this.writecb = null; | ||
this.writecb = null; // the amount that is being written when _write is called. | ||
// the amount that is being written when _write is called. | ||
this.writelen = 0; | ||
this.bufferedRequest = null; | ||
this.lastBufferedRequest = null; // number of pending user-supplied write callbacks | ||
this.lastBufferedRequest = null; | ||
// number of pending user-supplied write callbacks | ||
// this must be 0 before 'finish' can be emitted | ||
this.pendingcb = 0; | ||
this.pendingcb = 0; // emit prefinish if the only thing we're waiting for is _write cbs | ||
// emit prefinish if the only thing we're waiting for is _write cbs | ||
// This is relevant for synchronous Transform streams | ||
this.prefinished = false; | ||
this.prefinished = false; // True if the error was already emitted and should not be thrown again | ||
// True if the error was already emitted and should not be thrown again | ||
this.errorEmitted = false; | ||
this.errorEmitted = false; // Should close be emitted on destroy. Defaults to true. | ||
// Should close be emitted on destroy. Defaults to true. | ||
this.emitClose = options.emitClose !== false; | ||
this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'finish' (and potentially 'end') | ||
// Should .destroy() be called after 'finish' (and potentially 'end') | ||
this.autoDestroy = !!options.autoDestroy; | ||
this.autoDestroy = !!options.autoDestroy; // count buffered requests | ||
// count buffered requests | ||
this.bufferedRequestCount = 0; | ||
this.bufferedRequestCount = 0; // allocate the first CorkedRequest, there is always | ||
// allocate the first CorkedRequest, there is always | ||
// one allocated and free to use, and we maintain at most two | ||
this.corkedRequestsFree = new CorkedRequest(this); | ||
} | ||
WritableState.prototype.getBuffer = function getBuffer() { | ||
var current = this.bufferedRequest; | ||
var out = []; | ||
while (current) { | ||
@@ -202,6 +207,4 @@ out.push(current); | ||
} | ||
return out; | ||
}; | ||
(function () { | ||
@@ -215,8 +218,7 @@ try { | ||
} catch (_) {} | ||
})(); // Test _writableState for inheritance to account for Duplex streams, | ||
})(); | ||
// Test _writableState for inheritance to account for Duplex streams, | ||
// whose prototype chain only points to Readable. | ||
var realHasInstance; | ||
if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') { | ||
@@ -236,19 +238,21 @@ realHasInstance = Function.prototype[Symbol.hasInstance]; | ||
} | ||
function Writable(options) { | ||
Duplex = Duplex || require('./_stream_duplex'); | ||
function Writable(options) { | ||
Duplex = Duplex || require('./_stream_duplex'); // Writable ctor is applied to Duplexes, too. | ||
// Writable ctor is applied to Duplexes, too. | ||
// `realHasInstance` is necessary because using plain `instanceof` | ||
// would return false, as no `_writableState` property is attached. | ||
// Trying to use the custom `instanceof` for Writable here will also break the | ||
// Node.js LazyTransform implementation, which has a non-trivial getter for | ||
// `_writableState` that would lead to infinite recursion. | ||
// Checking for a Stream.Duplex instance is faster here instead of inside | ||
// the WritableState constructor, at least with V8 6.5 | ||
var isDuplex = this instanceof Duplex; | ||
const isDuplex = this instanceof Duplex; | ||
if (!isDuplex && !realHasInstance.call(Writable, this)) return new Writable(options); | ||
this._writableState = new WritableState(options, this, isDuplex); // legacy. | ||
this._writableState = new WritableState(options, this, isDuplex); | ||
// legacy. | ||
this.writable = true; | ||
if (options) { | ||
@@ -260,24 +264,21 @@ if (typeof options.write === 'function') this._write = options.write; | ||
} | ||
Stream.call(this); | ||
} // Otherwise people can pipe Writable streams, which is just wrong. | ||
} | ||
// Otherwise people can pipe Writable streams, which is just wrong. | ||
Writable.prototype.pipe = function () { | ||
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); | ||
}; | ||
function writeAfterEnd(stream, cb) { | ||
var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb | ||
var er = new ERR_STREAM_WRITE_AFTER_END(); | ||
// TODO: defer error events consistently everywhere, not just the cb | ||
errorOrDestroy(stream, er); | ||
process.nextTick(cb, er); | ||
} // Checks that a user-supplied chunk is valid, especially for the particular | ||
} | ||
// Checks that a user-supplied chunk is valid, especially for the particular | ||
// mode the stream is in. Currently this means that `null` is never accepted | ||
// and undefined/non-string values are only allowed in object mode. | ||
function validChunk(stream, state, chunk, cb) { | ||
var er; | ||
if (chunk === null) { | ||
@@ -288,3 +289,2 @@ er = new ERR_STREAM_NULL_VALUES(); | ||
} | ||
if (er) { | ||
@@ -295,16 +295,11 @@ errorOrDestroy(stream, er); | ||
} | ||
return true; | ||
} | ||
Writable.prototype.write = function (chunk, encoding, cb) { | ||
var state = this._writableState; | ||
var ret = false; | ||
var isBuf = !state.objectMode && _isUint8Array(chunk); | ||
if (isBuf && !Buffer.isBuffer(chunk)) { | ||
chunk = _uint8ArrayToBuffer(chunk); | ||
} | ||
if (typeof encoding === 'function') { | ||
@@ -314,3 +309,2 @@ cb = encoding; | ||
} | ||
if (isBuf) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding; | ||
@@ -324,10 +318,7 @@ if (typeof cb !== 'function') cb = nop; | ||
}; | ||
Writable.prototype.cork = function () { | ||
this._writableState.corked++; | ||
}; | ||
Writable.prototype.uncork = function () { | ||
var state = this._writableState; | ||
if (state.corked) { | ||
@@ -338,3 +329,2 @@ state.corked--; | ||
}; | ||
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { | ||
@@ -347,3 +337,2 @@ // node::ParseEncoding() requires lower case. | ||
}; | ||
Object.defineProperty(Writable.prototype, 'writableBuffer', { | ||
@@ -358,3 +347,2 @@ // making it explicit this property is not enumerable | ||
}); | ||
function decodeChunk(state, chunk, encoding) { | ||
@@ -364,6 +352,4 @@ if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') { | ||
} | ||
return chunk; | ||
} | ||
Object.defineProperty(Writable.prototype, 'writableHighWaterMark', { | ||
@@ -377,10 +363,10 @@ // making it explicit this property is not enumerable | ||
} | ||
}); // if we're already writing something, then just put this | ||
}); | ||
// if we're already writing something, then just put this | ||
// in the queue, and wait our turn. Otherwise, call _write | ||
// If we return false, then we need a drain event, so set that flag. | ||
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { | ||
if (!isBuf) { | ||
var newChunk = decodeChunk(state, chunk, encoding); | ||
if (chunk !== newChunk) { | ||
@@ -392,19 +378,16 @@ isBuf = true; | ||
} | ||
var len = state.objectMode ? 1 : chunk.length; | ||
state.length += len; | ||
var ret = state.length < state.highWaterMark; // we must ensure that previous needDrain will not be reset to false. | ||
var ret = state.length < state.highWaterMark; | ||
// we must ensure that previous needDrain will not be reset to false. | ||
if (!ret) state.needDrain = true; | ||
if (state.writing || state.corked) { | ||
var last = state.lastBufferedRequest; | ||
state.lastBufferedRequest = { | ||
chunk: chunk, | ||
encoding: encoding, | ||
isBuf: isBuf, | ||
chunk, | ||
encoding, | ||
isBuf, | ||
callback: cb, | ||
next: null | ||
}; | ||
if (last) { | ||
@@ -415,3 +398,2 @@ last.next = state.lastBufferedRequest; | ||
} | ||
state.bufferedRequestCount += 1; | ||
@@ -421,6 +403,4 @@ } else { | ||
} | ||
return ret; | ||
} | ||
function doWrite(stream, state, writev, len, chunk, encoding, cb) { | ||
@@ -434,12 +414,10 @@ state.writelen = len; | ||
} | ||
function onwriteError(stream, state, sync, er, cb) { | ||
--state.pendingcb; | ||
if (sync) { | ||
// defer the callback if we are being called synchronously | ||
// to avoid piling up things on the stack | ||
process.nextTick(cb, er); // this can emit finish, and it will always happen | ||
process.nextTick(cb, er); | ||
// this can emit finish, and it will always happen | ||
// after error | ||
process.nextTick(finishMaybe, stream, state); | ||
@@ -453,9 +431,8 @@ stream._writableState.errorEmitted = true; | ||
stream._writableState.errorEmitted = true; | ||
errorOrDestroy(stream, er); // this can emit finish, but finish must | ||
errorOrDestroy(stream, er); | ||
// this can emit finish, but finish must | ||
// always follow error | ||
finishMaybe(stream, state); | ||
} | ||
} | ||
function onwriteStateUpdate(state) { | ||
@@ -467,3 +444,2 @@ state.writing = false; | ||
} | ||
function onwrite(stream, er) { | ||
@@ -478,7 +454,5 @@ var state = stream._writableState; | ||
var finished = needFinish(state) || stream.destroyed; | ||
if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { | ||
clearBuffer(stream, state); | ||
} | ||
if (sync) { | ||
@@ -491,3 +465,2 @@ process.nextTick(afterWrite, stream, state, finished, cb); | ||
} | ||
function afterWrite(stream, state, finished, cb) { | ||
@@ -498,7 +471,7 @@ if (!finished) onwriteDrain(stream, state); | ||
finishMaybe(stream, state); | ||
} // Must force callback to be called on nextTick, so that we don't | ||
} | ||
// Must force callback to be called on nextTick, so that we don't | ||
// emit 'drain' before the write() consumer gets the 'false' return | ||
// value, and has a chance to attach a 'drain' listener. | ||
function onwriteDrain(stream, state) { | ||
@@ -509,9 +482,8 @@ if (state.length === 0 && state.needDrain) { | ||
} | ||
} // if there's something in the buffer waiting, then process it | ||
} | ||
// if there's something in the buffer waiting, then process it | ||
function clearBuffer(stream, state) { | ||
state.bufferProcessing = true; | ||
var entry = state.bufferedRequest; | ||
if (stream._writev && entry && entry.next) { | ||
@@ -525,3 +497,2 @@ // Fast case, write everything using _writev() | ||
var allBuffers = true; | ||
while (entry) { | ||
@@ -533,10 +504,9 @@ buffer[count] = entry; | ||
} | ||
buffer.allBuffers = allBuffers; | ||
doWrite(stream, state, true, state.length, buffer, '', holder.finish); | ||
buffer.allBuffers = allBuffers; | ||
doWrite(stream, state, true, state.length, buffer, '', holder.finish); // doWrite is almost always async, defer these to save a bit of time | ||
// doWrite is almost always async, defer these to save a bit of time | ||
// as the hot path ends with doWrite | ||
state.pendingcb++; | ||
state.lastBufferedRequest = null; | ||
if (holder.next) { | ||
@@ -548,3 +518,2 @@ state.corkedRequestsFree = holder.next; | ||
} | ||
state.bufferedRequestCount = 0; | ||
@@ -560,7 +529,7 @@ } else { | ||
entry = entry.next; | ||
state.bufferedRequestCount--; // if we didn't call the onwrite immediately, then | ||
state.bufferedRequestCount--; | ||
// if we didn't call the onwrite immediately, then | ||
// it means that we need to wait until it does. | ||
// also, that means that the chunk and cb are currently | ||
// being processed, so move the buffer counter past them. | ||
if (state.writing) { | ||
@@ -570,19 +539,13 @@ break; | ||
} | ||
if (entry === null) state.lastBufferedRequest = null; | ||
} | ||
state.bufferedRequest = entry; | ||
state.bufferProcessing = false; | ||
} | ||
Writable.prototype._write = function (chunk, encoding, cb) { | ||
cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()')); | ||
}; | ||
Writable.prototype._writev = null; | ||
Writable.prototype.end = function (chunk, encoding, cb) { | ||
var state = this._writableState; | ||
if (typeof chunk === 'function') { | ||
@@ -596,15 +559,14 @@ cb = chunk; | ||
} | ||
if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); | ||
if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); // .end() fully uncorks | ||
// .end() fully uncorks | ||
if (state.corked) { | ||
state.corked = 1; | ||
this.uncork(); | ||
} // ignore unnecessary end() calls. | ||
} | ||
// ignore unnecessary end() calls. | ||
if (!state.ending) endWritable(this, state, cb); | ||
return this; | ||
}; | ||
Object.defineProperty(Writable.prototype, 'writableLength', { | ||
@@ -615,19 +577,15 @@ // making it explicit this property is not enumerable | ||
enumerable: false, | ||
get: function get() { | ||
get() { | ||
return this._writableState.length; | ||
} | ||
}); | ||
function needFinish(state) { | ||
return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing; | ||
} | ||
function callFinal(stream, state) { | ||
stream._final(function (err) { | ||
stream._final(err => { | ||
state.pendingcb--; | ||
if (err) { | ||
errorOrDestroy(stream, err); | ||
} | ||
state.prefinished = true; | ||
@@ -638,3 +596,2 @@ stream.emit('prefinish'); | ||
} | ||
function prefinish(stream, state) { | ||
@@ -652,18 +609,13 @@ if (!state.prefinished && !state.finalCalled) { | ||
} | ||
function finishMaybe(stream, state) { | ||
var need = needFinish(state); | ||
if (need) { | ||
prefinish(stream, state); | ||
if (state.pendingcb === 0) { | ||
state.finished = true; | ||
stream.emit('finish'); | ||
if (state.autoDestroy) { | ||
// In case of duplex streams we need a way to detect | ||
// if the readable side is ready for autoDestroy as well | ||
var rState = stream._readableState; | ||
const rState = stream._readableState; | ||
if (!rState || rState.autoDestroy && rState.endEmitted) { | ||
@@ -675,22 +627,16 @@ stream.destroy(); | ||
} | ||
return need; | ||
} | ||
function endWritable(stream, state, cb) { | ||
state.ending = true; | ||
finishMaybe(stream, state); | ||
if (cb) { | ||
if (state.finished) process.nextTick(cb);else stream.once('finish', cb); | ||
} | ||
state.ended = true; | ||
stream.writable = false; | ||
} | ||
function onCorkedFinish(corkReq, state, err) { | ||
var entry = corkReq.entry; | ||
corkReq.entry = null; | ||
while (entry) { | ||
@@ -701,8 +647,7 @@ var cb = entry.callback; | ||
entry = entry.next; | ||
} // reuse the free corkReq. | ||
} | ||
// reuse the free corkReq. | ||
state.corkedRequestsFree.next = corkReq; | ||
} | ||
Object.defineProperty(Writable.prototype, 'destroyed', { | ||
@@ -713,10 +658,9 @@ // making it explicit this property is not enumerable | ||
enumerable: false, | ||
get: function get() { | ||
get() { | ||
if (this._writableState === undefined) { | ||
return false; | ||
} | ||
return this._writableState.destroyed; | ||
}, | ||
set: function set(value) { | ||
set(value) { | ||
// we ignore the value if the stream | ||
@@ -726,6 +670,6 @@ // has not been initialized yet | ||
return; | ||
} // backward compatibility, the user is explicitly | ||
} | ||
// backward compatibility, the user is explicitly | ||
// managing destroyed | ||
this._writableState.destroyed = value; | ||
@@ -736,5 +680,4 @@ } | ||
Writable.prototype._undestroy = destroyImpl.undestroy; | ||
Writable.prototype._destroy = function (err, cb) { | ||
cb(err); | ||
}; |
'use strict'; | ||
var _Object$setPrototypeO; | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
var finished = require('./end-of-stream'); | ||
var kLastResolve = Symbol('lastResolve'); | ||
var kLastReject = Symbol('lastReject'); | ||
var kError = Symbol('error'); | ||
var kEnded = Symbol('ended'); | ||
var kLastPromise = Symbol('lastPromise'); | ||
var kHandlePromise = Symbol('handlePromise'); | ||
var kStream = Symbol('stream'); | ||
const finished = require('./end-of-stream'); | ||
const kLastResolve = Symbol('lastResolve'); | ||
const kLastReject = Symbol('lastReject'); | ||
const kError = Symbol('error'); | ||
const kEnded = Symbol('ended'); | ||
const kLastPromise = Symbol('lastPromise'); | ||
const kHandlePromise = Symbol('handlePromise'); | ||
const kStream = Symbol('stream'); | ||
function createIterResult(value, done) { | ||
return { | ||
value: value, | ||
done: done | ||
value, | ||
done | ||
}; | ||
} | ||
function readAndResolve(iter) { | ||
var resolve = iter[kLastResolve]; | ||
const resolve = iter[kLastResolve]; | ||
if (resolve !== null) { | ||
var data = iter[kStream].read(); // we defer if data is null | ||
const data = iter[kStream].read(); | ||
// we defer if data is null | ||
// we can be expecting either 'end' or | ||
// 'error' | ||
if (data !== null) { | ||
@@ -40,3 +32,2 @@ iter[kLastPromise] = null; | ||
} | ||
function onReadable(iter) { | ||
@@ -47,6 +38,5 @@ // we wait for the next tick, because it might | ||
} | ||
function wrapForNext(lastPromise, iter) { | ||
return function (resolve, reject) { | ||
lastPromise.then(function () { | ||
return (resolve, reject) => { | ||
lastPromise.then(() => { | ||
if (iter[kEnded]) { | ||
@@ -56,3 +46,2 @@ resolve(createIterResult(undefined, true)); | ||
} | ||
iter[kHandlePromise](resolve, reject); | ||
@@ -62,24 +51,17 @@ }, reject); | ||
} | ||
var AsyncIteratorPrototype = Object.getPrototypeOf(function () {}); | ||
var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPrototypeO = { | ||
const AsyncIteratorPrototype = Object.getPrototypeOf(function () {}); | ||
const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ | ||
get stream() { | ||
return this[kStream]; | ||
}, | ||
next: function next() { | ||
var _this = this; | ||
next() { | ||
// if we have detected an error in the meanwhile | ||
// reject straight away | ||
var error = this[kError]; | ||
const error = this[kError]; | ||
if (error !== null) { | ||
return Promise.reject(error); | ||
} | ||
if (this[kEnded]) { | ||
return Promise.resolve(createIterResult(undefined, true)); | ||
} | ||
if (this[kStream].destroyed) { | ||
@@ -90,6 +72,6 @@ // We need to defer via nextTick because if .destroy(err) is | ||
// waiting to be emitted. | ||
return new Promise(function (resolve, reject) { | ||
process.nextTick(function () { | ||
if (_this[kError]) { | ||
reject(_this[kError]); | ||
return new Promise((resolve, reject) => { | ||
process.nextTick(() => { | ||
if (this[kError]) { | ||
reject(this[kError]); | ||
} else { | ||
@@ -100,11 +82,10 @@ resolve(createIterResult(undefined, true)); | ||
}); | ||
} // if we have multiple next() calls | ||
} | ||
// if we have multiple next() calls | ||
// we will wait for the previous Promise to finish | ||
// this logic is optimized to support for await loops, | ||
// where next() is only called once at a time | ||
var lastPromise = this[kLastPromise]; | ||
var promise; | ||
const lastPromise = this[kLastPromise]; | ||
let promise; | ||
if (lastPromise) { | ||
@@ -115,74 +96,76 @@ promise = new Promise(wrapForNext(lastPromise, this)); | ||
// without triggering the next() queue | ||
var data = this[kStream].read(); | ||
const data = this[kStream].read(); | ||
if (data !== null) { | ||
return Promise.resolve(createIterResult(data, false)); | ||
} | ||
promise = new Promise(this[kHandlePromise]); | ||
} | ||
this[kLastPromise] = promise; | ||
return promise; | ||
}, | ||
[Symbol.asyncIterator]() { | ||
return this; | ||
}, | ||
return() { | ||
// destroy(err, cb) is a private API | ||
// we can guarantee we have that here, because we control the | ||
// Readable class this is attached to | ||
return new Promise((resolve, reject) => { | ||
this[kStream].destroy(null, err => { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(createIterResult(undefined, true)); | ||
}); | ||
}); | ||
} | ||
}, _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () { | ||
return this; | ||
}), _defineProperty(_Object$setPrototypeO, "return", function _return() { | ||
var _this2 = this; | ||
// destroy(err, cb) is a private API | ||
// we can guarantee we have that here, because we control the | ||
// Readable class this is attached to | ||
return new Promise(function (resolve, reject) { | ||
_this2[kStream].destroy(null, function (err) { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(createIterResult(undefined, true)); | ||
}); | ||
}, AsyncIteratorPrototype); | ||
const createReadableStreamAsyncIterator = stream => { | ||
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype, { | ||
[kStream]: { | ||
value: stream, | ||
writable: true | ||
}, | ||
[kLastResolve]: { | ||
value: null, | ||
writable: true | ||
}, | ||
[kLastReject]: { | ||
value: null, | ||
writable: true | ||
}, | ||
[kError]: { | ||
value: null, | ||
writable: true | ||
}, | ||
[kEnded]: { | ||
value: stream._readableState.endEmitted, | ||
writable: true | ||
}, | ||
// the function passed to new Promise | ||
// is cached so we avoid allocating a new | ||
// closure at every run | ||
[kHandlePromise]: { | ||
value: (resolve, reject) => { | ||
const data = iterator[kStream].read(); | ||
if (data) { | ||
iterator[kLastPromise] = null; | ||
iterator[kLastResolve] = null; | ||
iterator[kLastReject] = null; | ||
resolve(createIterResult(data, false)); | ||
} else { | ||
iterator[kLastResolve] = resolve; | ||
iterator[kLastReject] = reject; | ||
} | ||
}, | ||
writable: true | ||
} | ||
}); | ||
}), _Object$setPrototypeO), AsyncIteratorPrototype); | ||
var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) { | ||
var _Object$create; | ||
var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, { | ||
value: stream, | ||
writable: true | ||
}), _defineProperty(_Object$create, kLastResolve, { | ||
value: null, | ||
writable: true | ||
}), _defineProperty(_Object$create, kLastReject, { | ||
value: null, | ||
writable: true | ||
}), _defineProperty(_Object$create, kError, { | ||
value: null, | ||
writable: true | ||
}), _defineProperty(_Object$create, kEnded, { | ||
value: stream._readableState.endEmitted, | ||
writable: true | ||
}), _defineProperty(_Object$create, kHandlePromise, { | ||
value: function value(resolve, reject) { | ||
var data = iterator[kStream].read(); | ||
if (data) { | ||
iterator[kLastPromise] = null; | ||
iterator[kLastResolve] = null; | ||
iterator[kLastReject] = null; | ||
resolve(createIterResult(data, false)); | ||
} else { | ||
iterator[kLastResolve] = resolve; | ||
iterator[kLastReject] = reject; | ||
} | ||
}, | ||
writable: true | ||
}), _Object$create)); | ||
iterator[kLastPromise] = null; | ||
finished(stream, function (err) { | ||
finished(stream, err => { | ||
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { | ||
var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise | ||
const reject = iterator[kLastReject]; | ||
// reject if we are waiting for data in the Promise | ||
// returned by next() and store the error | ||
if (reject !== null) { | ||
@@ -194,9 +177,6 @@ iterator[kLastPromise] = null; | ||
} | ||
iterator[kError] = err; | ||
return; | ||
} | ||
var resolve = iterator[kLastResolve]; | ||
const resolve = iterator[kLastResolve]; | ||
if (resolve !== null) { | ||
@@ -208,3 +188,2 @@ iterator[kLastPromise] = null; | ||
} | ||
iterator[kEnded] = true; | ||
@@ -215,3 +194,2 @@ }); | ||
}; | ||
module.exports = createReadableStreamAsyncIterator; |
'use strict'; | ||
function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); if (enumerableOnly) symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); keys.push.apply(keys, symbols); } return keys; } | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(Object(source), true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(Object(source)).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; } | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } } | ||
function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } } | ||
function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; } | ||
var _require = require('buffer'), | ||
Buffer = _require.Buffer; | ||
var _require2 = require('util'), | ||
inspect = _require2.inspect; | ||
var custom = inspect && inspect.custom || 'inspect'; | ||
function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); enumerableOnly && (symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; })), keys.push.apply(keys, symbols); } return keys; } | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = null != arguments[i] ? arguments[i] : {}; i % 2 ? ownKeys(Object(source), !0).forEach(function (key) { _defineProperty(target, key, source[key]); }) : Object.getOwnPropertyDescriptors ? Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)) : ownKeys(Object(source)).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } return target; } | ||
function _defineProperty(obj, key, value) { key = _toPropertyKey(key); if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
function _toPropertyKey(arg) { var key = _toPrimitive(arg, "string"); return typeof key === "symbol" ? key : String(key); } | ||
function _toPrimitive(input, hint) { if (typeof input !== "object" || input === null) return input; var prim = input[Symbol.toPrimitive]; if (prim !== undefined) { var res = prim.call(input, hint || "default"); if (typeof res !== "object") return res; throw new TypeError("@@toPrimitive must return a primitive value."); } return (hint === "string" ? String : Number)(input); } | ||
const _require = require('buffer'), | ||
Buffer = _require.Buffer; | ||
const _require2 = require('util'), | ||
inspect = _require2.inspect; | ||
const custom = inspect && inspect.custom || 'inspect'; | ||
function copyBuffer(src, target, offset) { | ||
Buffer.prototype.copy.call(src, target, offset); | ||
} | ||
module.exports = | ||
/*#__PURE__*/ | ||
function () { | ||
function BufferList() { | ||
_classCallCheck(this, BufferList); | ||
module.exports = class BufferList { | ||
constructor() { | ||
this.head = null; | ||
@@ -37,175 +22,135 @@ this.tail = null; | ||
} | ||
_createClass(BufferList, [{ | ||
key: "push", | ||
value: function push(v) { | ||
var entry = { | ||
data: v, | ||
next: null | ||
}; | ||
if (this.length > 0) this.tail.next = entry;else this.head = entry; | ||
this.tail = entry; | ||
++this.length; | ||
push(v) { | ||
const entry = { | ||
data: v, | ||
next: null | ||
}; | ||
if (this.length > 0) this.tail.next = entry;else this.head = entry; | ||
this.tail = entry; | ||
++this.length; | ||
} | ||
unshift(v) { | ||
const entry = { | ||
data: v, | ||
next: this.head | ||
}; | ||
if (this.length === 0) this.tail = entry; | ||
this.head = entry; | ||
++this.length; | ||
} | ||
shift() { | ||
if (this.length === 0) return; | ||
const ret = this.head.data; | ||
if (this.length === 1) this.head = this.tail = null;else this.head = this.head.next; | ||
--this.length; | ||
return ret; | ||
} | ||
clear() { | ||
this.head = this.tail = null; | ||
this.length = 0; | ||
} | ||
join(s) { | ||
if (this.length === 0) return ''; | ||
var p = this.head; | ||
var ret = '' + p.data; | ||
while (p = p.next) ret += s + p.data; | ||
return ret; | ||
} | ||
concat(n) { | ||
if (this.length === 0) return Buffer.alloc(0); | ||
const ret = Buffer.allocUnsafe(n >>> 0); | ||
var p = this.head; | ||
var i = 0; | ||
while (p) { | ||
copyBuffer(p.data, ret, i); | ||
i += p.data.length; | ||
p = p.next; | ||
} | ||
}, { | ||
key: "unshift", | ||
value: function unshift(v) { | ||
var entry = { | ||
data: v, | ||
next: this.head | ||
}; | ||
if (this.length === 0) this.tail = entry; | ||
this.head = entry; | ||
++this.length; | ||
} | ||
}, { | ||
key: "shift", | ||
value: function shift() { | ||
if (this.length === 0) return; | ||
var ret = this.head.data; | ||
if (this.length === 1) this.head = this.tail = null;else this.head = this.head.next; | ||
--this.length; | ||
return ret; | ||
} | ||
}, { | ||
key: "clear", | ||
value: function clear() { | ||
this.head = this.tail = null; | ||
this.length = 0; | ||
} | ||
}, { | ||
key: "join", | ||
value: function join(s) { | ||
if (this.length === 0) return ''; | ||
var p = this.head; | ||
var ret = '' + p.data; | ||
return ret; | ||
} | ||
while (p = p.next) { | ||
ret += s + p.data; | ||
} | ||
return ret; | ||
// Consumes a specified amount of bytes or characters from the buffered data. | ||
consume(n, hasStrings) { | ||
var ret; | ||
if (n < this.head.data.length) { | ||
// `slice` is the same for buffers and strings. | ||
ret = this.head.data.slice(0, n); | ||
this.head.data = this.head.data.slice(n); | ||
} else if (n === this.head.data.length) { | ||
// First chunk is a perfect match. | ||
ret = this.shift(); | ||
} else { | ||
// Result spans more than one buffer. | ||
ret = hasStrings ? this._getString(n) : this._getBuffer(n); | ||
} | ||
}, { | ||
key: "concat", | ||
value: function concat(n) { | ||
if (this.length === 0) return Buffer.alloc(0); | ||
var ret = Buffer.allocUnsafe(n >>> 0); | ||
var p = this.head; | ||
var i = 0; | ||
return ret; | ||
} | ||
first() { | ||
return this.head.data; | ||
} | ||
while (p) { | ||
copyBuffer(p.data, ret, i); | ||
i += p.data.length; | ||
p = p.next; | ||
// Consumes a specified amount of characters from the buffered data. | ||
_getString(n) { | ||
var p = this.head; | ||
var c = 1; | ||
var ret = p.data; | ||
n -= ret.length; | ||
while (p = p.next) { | ||
const str = p.data; | ||
const nb = n > str.length ? str.length : n; | ||
if (nb === str.length) ret += str;else ret += str.slice(0, n); | ||
n -= nb; | ||
if (n === 0) { | ||
if (nb === str.length) { | ||
++c; | ||
if (p.next) this.head = p.next;else this.head = this.tail = null; | ||
} else { | ||
this.head = p; | ||
p.data = str.slice(nb); | ||
} | ||
break; | ||
} | ||
return ret; | ||
} // Consumes a specified amount of bytes or characters from the buffered data. | ||
}, { | ||
key: "consume", | ||
value: function consume(n, hasStrings) { | ||
var ret; | ||
if (n < this.head.data.length) { | ||
// `slice` is the same for buffers and strings. | ||
ret = this.head.data.slice(0, n); | ||
this.head.data = this.head.data.slice(n); | ||
} else if (n === this.head.data.length) { | ||
// First chunk is a perfect match. | ||
ret = this.shift(); | ||
} else { | ||
// Result spans more than one buffer. | ||
ret = hasStrings ? this._getString(n) : this._getBuffer(n); | ||
} | ||
return ret; | ||
++c; | ||
} | ||
}, { | ||
key: "first", | ||
value: function first() { | ||
return this.head.data; | ||
} // Consumes a specified amount of characters from the buffered data. | ||
this.length -= c; | ||
return ret; | ||
} | ||
}, { | ||
key: "_getString", | ||
value: function _getString(n) { | ||
var p = this.head; | ||
var c = 1; | ||
var ret = p.data; | ||
n -= ret.length; | ||
while (p = p.next) { | ||
var str = p.data; | ||
var nb = n > str.length ? str.length : n; | ||
if (nb === str.length) ret += str;else ret += str.slice(0, n); | ||
n -= nb; | ||
if (n === 0) { | ||
if (nb === str.length) { | ||
++c; | ||
if (p.next) this.head = p.next;else this.head = this.tail = null; | ||
} else { | ||
this.head = p; | ||
p.data = str.slice(nb); | ||
} | ||
break; | ||
// Consumes a specified amount of bytes from the buffered data. | ||
_getBuffer(n) { | ||
const ret = Buffer.allocUnsafe(n); | ||
var p = this.head; | ||
var c = 1; | ||
p.data.copy(ret); | ||
n -= p.data.length; | ||
while (p = p.next) { | ||
const buf = p.data; | ||
const nb = n > buf.length ? buf.length : n; | ||
buf.copy(ret, ret.length - n, 0, nb); | ||
n -= nb; | ||
if (n === 0) { | ||
if (nb === buf.length) { | ||
++c; | ||
if (p.next) this.head = p.next;else this.head = this.tail = null; | ||
} else { | ||
this.head = p; | ||
p.data = buf.slice(nb); | ||
} | ||
++c; | ||
break; | ||
} | ||
this.length -= c; | ||
return ret; | ||
} // Consumes a specified amount of bytes from the buffered data. | ||
}, { | ||
key: "_getBuffer", | ||
value: function _getBuffer(n) { | ||
var ret = Buffer.allocUnsafe(n); | ||
var p = this.head; | ||
var c = 1; | ||
p.data.copy(ret); | ||
n -= p.data.length; | ||
while (p = p.next) { | ||
var buf = p.data; | ||
var nb = n > buf.length ? buf.length : n; | ||
buf.copy(ret, ret.length - n, 0, nb); | ||
n -= nb; | ||
if (n === 0) { | ||
if (nb === buf.length) { | ||
++c; | ||
if (p.next) this.head = p.next;else this.head = this.tail = null; | ||
} else { | ||
this.head = p; | ||
p.data = buf.slice(nb); | ||
} | ||
break; | ||
} | ||
++c; | ||
} | ||
this.length -= c; | ||
return ret; | ||
} // Make sure the linked list only shows the minimal necessary information. | ||
}, { | ||
key: custom, | ||
value: function value(_, options) { | ||
return inspect(this, _objectSpread({}, options, { | ||
// Only inspect one level. | ||
depth: 0, | ||
// It should not recurse. | ||
customInspect: false | ||
})); | ||
++c; | ||
} | ||
}]); | ||
this.length -= c; | ||
return ret; | ||
} | ||
return BufferList; | ||
}(); | ||
// Make sure the linked list only shows the minimal necessary information. | ||
[custom](_, options) { | ||
return inspect(this, _objectSpread(_objectSpread({}, options), {}, { | ||
// Only inspect one level. | ||
depth: 0, | ||
// It should not recurse. | ||
customInspect: false | ||
})); | ||
} | ||
}; |
@@ -1,9 +0,7 @@ | ||
'use strict'; // undocumented cb() API, needed for core, not for public API | ||
'use strict'; | ||
// undocumented cb() API, needed for core, not for public API | ||
function destroy(err, cb) { | ||
var _this = this; | ||
var readableDestroyed = this._readableState && this._readableState.destroyed; | ||
var writableDestroyed = this._writableState && this._writableState.destroyed; | ||
const readableDestroyed = this._readableState && this._readableState.destroyed; | ||
const writableDestroyed = this._writableState && this._writableState.destroyed; | ||
if (readableDestroyed || writableDestroyed) { | ||
@@ -20,38 +18,35 @@ if (cb) { | ||
} | ||
return this; | ||
} | ||
return this; | ||
} // we set destroyed to true before firing error callbacks in order | ||
// we set destroyed to true before firing error callbacks in order | ||
// to make it re-entrance safe in case destroy() is called within callbacks | ||
if (this._readableState) { | ||
this._readableState.destroyed = true; | ||
} // if this is a duplex stream mark the writable part as destroyed as well | ||
} | ||
// if this is a duplex stream mark the writable part as destroyed as well | ||
if (this._writableState) { | ||
this._writableState.destroyed = true; | ||
} | ||
this._destroy(err || null, function (err) { | ||
this._destroy(err || null, err => { | ||
if (!cb && err) { | ||
if (!_this._writableState) { | ||
process.nextTick(emitErrorAndCloseNT, _this, err); | ||
} else if (!_this._writableState.errorEmitted) { | ||
_this._writableState.errorEmitted = true; | ||
process.nextTick(emitErrorAndCloseNT, _this, err); | ||
if (!this._writableState) { | ||
process.nextTick(emitErrorAndCloseNT, this, err); | ||
} else if (!this._writableState.errorEmitted) { | ||
this._writableState.errorEmitted = true; | ||
process.nextTick(emitErrorAndCloseNT, this, err); | ||
} else { | ||
process.nextTick(emitCloseNT, _this); | ||
process.nextTick(emitCloseNT, this); | ||
} | ||
} else if (cb) { | ||
process.nextTick(emitCloseNT, _this); | ||
process.nextTick(emitCloseNT, this); | ||
cb(err); | ||
} else { | ||
process.nextTick(emitCloseNT, _this); | ||
process.nextTick(emitCloseNT, this); | ||
} | ||
}); | ||
return this; | ||
} | ||
function emitErrorAndCloseNT(self, err) { | ||
@@ -61,3 +56,2 @@ emitErrorNT(self, err); | ||
} | ||
function emitCloseNT(self) { | ||
@@ -68,3 +62,2 @@ if (self._writableState && !self._writableState.emitClose) return; | ||
} | ||
function undestroy() { | ||
@@ -77,3 +70,2 @@ if (this._readableState) { | ||
} | ||
if (this._writableState) { | ||
@@ -89,7 +81,5 @@ this._writableState.destroyed = false; | ||
} | ||
function emitErrorNT(self, err) { | ||
self.emit('error', err); | ||
} | ||
function errorOrDestroy(stream, err) { | ||
@@ -101,11 +91,11 @@ // We have tests that rely on errors being emitted | ||
// semver major update we should change the default to this. | ||
var rState = stream._readableState; | ||
var wState = stream._writableState; | ||
const rState = stream._readableState; | ||
const wState = stream._writableState; | ||
if (rState && rState.autoDestroy || wState && wState.autoDestroy) stream.destroy(err);else stream.emit('error', err); | ||
} | ||
module.exports = { | ||
destroy: destroy, | ||
undestroy: undestroy, | ||
errorOrDestroy: errorOrDestroy | ||
destroy, | ||
undestroy, | ||
errorOrDestroy | ||
}; |
// Ported from https://github.com/mafintosh/end-of-stream with | ||
// permission from the author, Mathias Buus (@mafintosh). | ||
'use strict'; | ||
var ERR_STREAM_PREMATURE_CLOSE = require('../../../errors').codes.ERR_STREAM_PREMATURE_CLOSE; | ||
const ERR_STREAM_PREMATURE_CLOSE = require('../../../errors').codes.ERR_STREAM_PREMATURE_CLOSE; | ||
function once(callback) { | ||
var called = false; | ||
let called = false; | ||
return function () { | ||
if (called) return; | ||
called = true; | ||
for (var _len = arguments.length, args = new Array(_len), _key = 0; _key < _len; _key++) { | ||
args[_key] = arguments[_key]; | ||
} | ||
callback.apply(this, args); | ||
}; | ||
} | ||
function noop() {} | ||
function isRequest(stream) { | ||
return stream.setHeader && typeof stream.abort === 'function'; | ||
} | ||
function eos(stream, opts, callback) { | ||
@@ -31,12 +26,9 @@ if (typeof opts === 'function') return eos(stream, null, opts); | ||
callback = once(callback || noop); | ||
var readable = opts.readable || opts.readable !== false && stream.readable; | ||
var writable = opts.writable || opts.writable !== false && stream.writable; | ||
var onlegacyfinish = function onlegacyfinish() { | ||
let readable = opts.readable || opts.readable !== false && stream.readable; | ||
let writable = opts.writable || opts.writable !== false && stream.writable; | ||
const onlegacyfinish = () => { | ||
if (!stream.writable) onfinish(); | ||
}; | ||
var writableEnded = stream._writableState && stream._writableState.finished; | ||
var onfinish = function onfinish() { | ||
const onfinish = () => { | ||
writable = false; | ||
@@ -46,6 +38,4 @@ writableEnded = true; | ||
}; | ||
var readableEnded = stream._readableState && stream._readableState.endEmitted; | ||
var onend = function onend() { | ||
const onend = () => { | ||
readable = false; | ||
@@ -55,10 +45,7 @@ readableEnded = true; | ||
}; | ||
var onerror = function onerror(err) { | ||
const onerror = err => { | ||
callback.call(stream, err); | ||
}; | ||
var onclose = function onclose() { | ||
var err; | ||
const onclose = () => { | ||
let err; | ||
if (readable && !readableEnded) { | ||
@@ -68,3 +55,2 @@ if (!stream._readableState || !stream._readableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE(); | ||
} | ||
if (writable && !writableEnded) { | ||
@@ -75,7 +61,5 @@ if (!stream._writableState || !stream._writableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE(); | ||
}; | ||
var onrequest = function onrequest() { | ||
const onrequest = () => { | ||
stream.req.on('finish', onfinish); | ||
}; | ||
if (isRequest(stream)) { | ||
@@ -90,3 +74,2 @@ stream.on('complete', onfinish); | ||
} | ||
stream.on('end', onend); | ||
@@ -109,3 +92,2 @@ stream.on('finish', onfinish); | ||
} | ||
module.exports = eos; |
'use strict'; | ||
function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } } | ||
function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; } | ||
function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); if (enumerableOnly) symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); keys.push.apply(keys, symbols); } return keys; } | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(Object(source), true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(Object(source)).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; } | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
var ERR_INVALID_ARG_TYPE = require('../../../errors').codes.ERR_INVALID_ARG_TYPE; | ||
function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); enumerableOnly && (symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; })), keys.push.apply(keys, symbols); } return keys; } | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = null != arguments[i] ? arguments[i] : {}; i % 2 ? ownKeys(Object(source), !0).forEach(function (key) { _defineProperty(target, key, source[key]); }) : Object.getOwnPropertyDescriptors ? Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)) : ownKeys(Object(source)).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } return target; } | ||
function _defineProperty(obj, key, value) { key = _toPropertyKey(key); if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
function _toPropertyKey(arg) { var key = _toPrimitive(arg, "string"); return typeof key === "symbol" ? key : String(key); } | ||
function _toPrimitive(input, hint) { if (typeof input !== "object" || input === null) return input; var prim = input[Symbol.toPrimitive]; if (prim !== undefined) { var res = prim.call(input, hint || "default"); if (typeof res !== "object") return res; throw new TypeError("@@toPrimitive must return a primitive value."); } return (hint === "string" ? String : Number)(input); } | ||
const ERR_INVALID_ARG_TYPE = require('../../../errors').codes.ERR_INVALID_ARG_TYPE; | ||
function from(Readable, iterable, opts) { | ||
var iterator; | ||
let iterator; | ||
if (iterable && typeof iterable.next === 'function') { | ||
iterator = iterable; | ||
} else if (iterable && iterable[Symbol.asyncIterator]) iterator = iterable[Symbol.asyncIterator]();else if (iterable && iterable[Symbol.iterator]) iterator = iterable[Symbol.iterator]();else throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); | ||
var readable = new Readable(_objectSpread({ | ||
const readable = new Readable(_objectSpread({ | ||
objectMode: true | ||
}, opts)); // Reading boolean to protect against _read | ||
}, opts)); | ||
// Reading boolean to protect against _read | ||
// being called before last iteration completion. | ||
var reading = false; | ||
let reading = false; | ||
readable._read = function () { | ||
@@ -35,17 +28,14 @@ if (!reading) { | ||
}; | ||
function next() { | ||
return _next2.apply(this, arguments); | ||
} | ||
function _next2() { | ||
_next2 = _asyncToGenerator(function* () { | ||
try { | ||
var _ref = yield iterator.next(), | ||
value = _ref.value, | ||
done = _ref.done; | ||
const _yield$iterator$next = yield iterator.next(), | ||
value = _yield$iterator$next.value, | ||
done = _yield$iterator$next.done; | ||
if (done) { | ||
readable.push(null); | ||
} else if (readable.push((yield value))) { | ||
} else if (readable.push(yield value)) { | ||
next(); | ||
@@ -61,6 +51,4 @@ } else { | ||
} | ||
return readable; | ||
} | ||
module.exports = from; |
// Ported from https://github.com/mafintosh/pump with | ||
// permission from the author, Mathias Buus (@mafintosh). | ||
'use strict'; | ||
var eos; | ||
let eos; | ||
function once(callback) { | ||
var called = false; | ||
let called = false; | ||
return function () { | ||
if (called) return; | ||
called = true; | ||
callback.apply(void 0, arguments); | ||
callback(...arguments); | ||
}; | ||
} | ||
var _require$codes = require('../../../errors').codes, | ||
ERR_MISSING_ARGS = _require$codes.ERR_MISSING_ARGS, | ||
ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED; | ||
const _require$codes = require('../../../errors').codes, | ||
ERR_MISSING_ARGS = _require$codes.ERR_MISSING_ARGS, | ||
ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED; | ||
function noop(err) { | ||
@@ -24,11 +22,9 @@ // Rethrow the error if it exists to avoid swallowing it | ||
} | ||
function isRequest(stream) { | ||
return stream.setHeader && typeof stream.abort === 'function'; | ||
} | ||
function destroyer(stream, reading, writing, callback) { | ||
callback = once(callback); | ||
var closed = false; | ||
stream.on('close', function () { | ||
let closed = false; | ||
stream.on('close', () => { | ||
closed = true; | ||
@@ -40,3 +36,3 @@ }); | ||
writable: writing | ||
}, function (err) { | ||
}, err => { | ||
if (err) return callback(err); | ||
@@ -46,8 +42,9 @@ closed = true; | ||
}); | ||
var destroyed = false; | ||
return function (err) { | ||
let destroyed = false; | ||
return err => { | ||
if (closed) return; | ||
if (destroyed) return; | ||
destroyed = true; // request.destroy just do .end - .abort is what we want | ||
destroyed = true; | ||
// request.destroy just do .end - .abort is what we want | ||
if (isRequest(stream)) return stream.abort(); | ||
@@ -58,11 +55,8 @@ if (typeof stream.destroy === 'function') return stream.destroy(); | ||
} | ||
function call(fn) { | ||
fn(); | ||
} | ||
function pipe(from, to) { | ||
return from.pipe(to); | ||
} | ||
function popCallback(streams) { | ||
@@ -73,3 +67,2 @@ if (!streams.length) return noop; | ||
} | ||
function pipeline() { | ||
@@ -79,14 +72,11 @@ for (var _len = arguments.length, streams = new Array(_len), _key = 0; _key < _len; _key++) { | ||
} | ||
var callback = popCallback(streams); | ||
const callback = popCallback(streams); | ||
if (Array.isArray(streams[0])) streams = streams[0]; | ||
if (streams.length < 2) { | ||
throw new ERR_MISSING_ARGS('streams'); | ||
} | ||
var error; | ||
var destroys = streams.map(function (stream, i) { | ||
var reading = i < streams.length - 1; | ||
var writing = i > 0; | ||
let error; | ||
const destroys = streams.map(function (stream, i) { | ||
const reading = i < streams.length - 1; | ||
const writing = i > 0; | ||
return destroyer(stream, reading, writing, function (err) { | ||
@@ -102,3 +92,2 @@ if (!error) error = err; | ||
} | ||
module.exports = pipeline; |
'use strict'; | ||
var ERR_INVALID_OPT_VALUE = require('../../../errors').codes.ERR_INVALID_OPT_VALUE; | ||
const ERR_INVALID_OPT_VALUE = require('../../../errors').codes.ERR_INVALID_OPT_VALUE; | ||
function highWaterMarkFrom(options, isDuplex, duplexKey) { | ||
return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null; | ||
} | ||
function getHighWaterMark(state, options, duplexKey, isDuplex) { | ||
var hwm = highWaterMarkFrom(options, isDuplex, duplexKey); | ||
const hwm = highWaterMarkFrom(options, isDuplex, duplexKey); | ||
if (hwm != null) { | ||
if (!(isFinite(hwm) && Math.floor(hwm) === hwm) || hwm < 0) { | ||
var name = isDuplex ? duplexKey : 'highWaterMark'; | ||
const name = isDuplex ? duplexKey : 'highWaterMark'; | ||
throw new ERR_INVALID_OPT_VALUE(name, hwm); | ||
} | ||
return Math.floor(hwm); | ||
} // Default value | ||
} | ||
// Default value | ||
return state.objectMode ? 16 : 16 * 1024; | ||
} | ||
module.exports = { | ||
getHighWaterMark: getHighWaterMark | ||
getHighWaterMark | ||
}; |
{ | ||
"name": "readable-stream", | ||
"version": "3.6.0", | ||
"version": "3.6.1", | ||
"description": "Streams3, a user-land copy of the stream library from Node.js", | ||
@@ -5,0 +5,0 @@ "main": "readable.js", |
@@ -18,3 +18,3 @@ # readable-stream | ||
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.19.0/docs/api/stream.html). | ||
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.18.1/docs/api/stream.html). | ||
@@ -21,0 +21,0 @@ If you want to guarantee a stable streams base, regardless of what version of |
2784
120956