minipass
Advanced tools
Comparing version 4.0.1 to 4.0.2
@@ -143,6 +143,6 @@ /// <reference types="node" /> | ||
[Symbol.iterator](): Iterator<RType> | ||
[Symbol.asyncIterator](): AsyncIterator<RType> | ||
[Symbol.iterator](): Generator<RType, void, void> | ||
[Symbol.asyncIterator](): AsyncGenerator<RType, void, void> | ||
} | ||
export = Minipass |
457
index.js
'use strict' | ||
const proc = typeof process === 'object' && process ? process : { | ||
stdout: null, | ||
stderr: null, | ||
} | ||
const proc = | ||
typeof process === 'object' && process | ||
? process | ||
: { | ||
stdout: null, | ||
stderr: null, | ||
} | ||
const EE = require('events') | ||
@@ -30,3 +33,6 @@ const Stream = require('stream') | ||
const OBJECTMODE = Symbol('objectMode') | ||
// internal event when stream is destroyed | ||
const DESTROYED = Symbol('destroyed') | ||
// internal event when stream has an error | ||
const ERROR = Symbol('error') | ||
const EMITDATA = Symbol('emitData') | ||
@@ -40,7 +46,7 @@ const EMITEND = Symbol('emitEnd') | ||
// TODO remove when Node v8 support drops | ||
const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1' | ||
const ASYNCITERATOR = doIter && Symbol.asyncIterator | ||
|| Symbol('asyncIterator not implemented') | ||
const ITERATOR = doIter && Symbol.iterator | ||
|| Symbol('iterator not implemented') | ||
const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1' | ||
const ASYNCITERATOR = | ||
(doIter && Symbol.asyncIterator) || Symbol('asyncIterator not implemented') | ||
const ITERATOR = | ||
(doIter && Symbol.iterator) || Symbol('iterator not implemented') | ||
@@ -50,12 +56,10 @@ // events that mean 'the stream is over' | ||
// if they are listened for after emitting. | ||
const isEndish = ev => | ||
ev === 'end' || | ||
ev === 'finish' || | ||
ev === 'prefinish' | ||
const isEndish = ev => ev === 'end' || ev === 'finish' || ev === 'prefinish' | ||
const isArrayBuffer = b => b instanceof ArrayBuffer || | ||
typeof b === 'object' && | ||
b.constructor && | ||
b.constructor.name === 'ArrayBuffer' && | ||
b.byteLength >= 0 | ||
const isArrayBuffer = b => | ||
b instanceof ArrayBuffer || | ||
(typeof b === 'object' && | ||
b.constructor && | ||
b.constructor.name === 'ArrayBuffer' && | ||
b.byteLength >= 0) | ||
@@ -65,3 +69,3 @@ const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b) | ||
class Pipe { | ||
constructor (src, dest, opts) { | ||
constructor(src, dest, opts) { | ||
this.src = src | ||
@@ -73,11 +77,10 @@ this.dest = dest | ||
} | ||
unpipe () { | ||
unpipe() { | ||
this.dest.removeListener('drain', this.ondrain) | ||
} | ||
// istanbul ignore next - only here for the prototype | ||
proxyErrors () {} | ||
end () { | ||
proxyErrors() {} | ||
end() { | ||
this.unpipe() | ||
if (this.opts.end) | ||
this.dest.end() | ||
if (this.opts.end) this.dest.end() | ||
} | ||
@@ -87,7 +90,7 @@ } | ||
class PipeProxyErrors extends Pipe { | ||
unpipe () { | ||
unpipe() { | ||
this.src.removeListener('error', this.proxyErrors) | ||
super.unpipe() | ||
} | ||
constructor (src, dest, opts) { | ||
constructor(src, dest, opts) { | ||
super(src, dest, opts) | ||
@@ -100,3 +103,3 @@ this.proxyErrors = er => dest.emit('error', er) | ||
module.exports = class Minipass extends Stream { | ||
constructor (options) { | ||
constructor(options) { | ||
super() | ||
@@ -108,10 +111,7 @@ this[FLOWING] = false | ||
this[BUFFER] = [] | ||
this[OBJECTMODE] = options && options.objectMode || false | ||
if (this[OBJECTMODE]) | ||
this[ENCODING] = null | ||
else | ||
this[ENCODING] = options && options.encoding || null | ||
if (this[ENCODING] === 'buffer') | ||
this[ENCODING] = null | ||
this[ASYNC] = options && !!options.async || false | ||
this[OBJECTMODE] = (options && options.objectMode) || false | ||
if (this[OBJECTMODE]) this[ENCODING] = null | ||
else this[ENCODING] = (options && options.encoding) || null | ||
if (this[ENCODING] === 'buffer') this[ENCODING] = null | ||
this[ASYNC] = (options && !!options.async) || false | ||
this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null | ||
@@ -135,11 +135,17 @@ this[EOF] = false | ||
get bufferLength () { return this[BUFFERLENGTH] } | ||
get bufferLength() { | ||
return this[BUFFERLENGTH] | ||
} | ||
get encoding () { return this[ENCODING] } | ||
set encoding (enc) { | ||
if (this[OBJECTMODE]) | ||
throw new Error('cannot set encoding in objectMode') | ||
get encoding() { | ||
return this[ENCODING] | ||
} | ||
set encoding(enc) { | ||
if (this[OBJECTMODE]) throw new Error('cannot set encoding in objectMode') | ||
if (this[ENCODING] && enc !== this[ENCODING] && | ||
(this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH])) | ||
if ( | ||
this[ENCODING] && | ||
enc !== this[ENCODING] && | ||
((this[DECODER] && this[DECODER].lastNeed) || this[BUFFERLENGTH]) | ||
) | ||
throw new Error('cannot change encoding') | ||
@@ -156,29 +162,37 @@ | ||
setEncoding (enc) { | ||
setEncoding(enc) { | ||
this.encoding = enc | ||
} | ||
get objectMode () { return this[OBJECTMODE] } | ||
set objectMode (om) { this[OBJECTMODE] = this[OBJECTMODE] || !!om } | ||
get objectMode() { | ||
return this[OBJECTMODE] | ||
} | ||
set objectMode(om) { | ||
this[OBJECTMODE] = this[OBJECTMODE] || !!om | ||
} | ||
get ['async'] () { return this[ASYNC] } | ||
set ['async'] (a) { this[ASYNC] = this[ASYNC] || !!a } | ||
get ['async']() { | ||
return this[ASYNC] | ||
} | ||
set ['async'](a) { | ||
this[ASYNC] = this[ASYNC] || !!a | ||
} | ||
write (chunk, encoding, cb) { | ||
if (this[EOF]) | ||
throw new Error('write after end') | ||
write(chunk, encoding, cb) { | ||
if (this[EOF]) throw new Error('write after end') | ||
if (this[DESTROYED]) { | ||
this.emit('error', Object.assign( | ||
new Error('Cannot call write after a stream was destroyed'), | ||
{ code: 'ERR_STREAM_DESTROYED' } | ||
)) | ||
this.emit( | ||
'error', | ||
Object.assign( | ||
new Error('Cannot call write after a stream was destroyed'), | ||
{ code: 'ERR_STREAM_DESTROYED' } | ||
) | ||
) | ||
return true | ||
} | ||
if (typeof encoding === 'function') | ||
cb = encoding, encoding = 'utf8' | ||
if (typeof encoding === 'function') (cb = encoding), (encoding = 'utf8') | ||
if (!encoding) | ||
encoding = 'utf8' | ||
if (!encoding) encoding = 'utf8' | ||
@@ -194,4 +208,3 @@ const fn = this[ASYNC] ? defer : f => f() | ||
chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength) | ||
else if (isArrayBuffer(chunk)) | ||
chunk = Buffer.from(chunk) | ||
else if (isArrayBuffer(chunk)) chunk = Buffer.from(chunk) | ||
else if (typeof chunk !== 'string') | ||
@@ -206,15 +219,10 @@ // use the setter so we throw if we have encoding set | ||
/* istanbul ignore if - maybe impossible? */ | ||
if (this.flowing && this[BUFFERLENGTH] !== 0) | ||
this[FLUSH](true) | ||
if (this.flowing && this[BUFFERLENGTH] !== 0) this[FLUSH](true) | ||
if (this.flowing) | ||
this.emit('data', chunk) | ||
else | ||
this[BUFFERPUSH](chunk) | ||
if (this.flowing) this.emit('data', chunk) | ||
else this[BUFFERPUSH](chunk) | ||
if (this[BUFFERLENGTH] !== 0) | ||
this.emit('readable') | ||
if (this[BUFFERLENGTH] !== 0) this.emit('readable') | ||
if (cb) | ||
fn(cb) | ||
if (cb) fn(cb) | ||
@@ -227,6 +235,4 @@ return this.flowing | ||
if (!chunk.length) { | ||
if (this[BUFFERLENGTH] !== 0) | ||
this.emit('readable') | ||
if (cb) | ||
fn(cb) | ||
if (this[BUFFERLENGTH] !== 0) this.emit('readable') | ||
if (cb) fn(cb) | ||
return this.flowing | ||
@@ -237,5 +243,7 @@ } | ||
// an empty buffer, skipping the buffer/decoder dance | ||
if (typeof chunk === 'string' && | ||
// unless it is a string already ready for us to use | ||
!(encoding === this[ENCODING] && !this[DECODER].lastNeed)) { | ||
if ( | ||
typeof chunk === 'string' && | ||
// unless it is a string already ready for us to use | ||
!(encoding === this[ENCODING] && !this[DECODER].lastNeed) | ||
) { | ||
chunk = Buffer.from(chunk, encoding) | ||
@@ -248,15 +256,10 @@ } | ||
// Note: flushing CAN potentially switch us into not-flowing mode | ||
if (this.flowing && this[BUFFERLENGTH] !== 0) | ||
this[FLUSH](true) | ||
if (this.flowing && this[BUFFERLENGTH] !== 0) this[FLUSH](true) | ||
if (this.flowing) | ||
this.emit('data', chunk) | ||
else | ||
this[BUFFERPUSH](chunk) | ||
if (this.flowing) this.emit('data', chunk) | ||
else this[BUFFERPUSH](chunk) | ||
if (this[BUFFERLENGTH] !== 0) | ||
this.emit('readable') | ||
if (this[BUFFERLENGTH] !== 0) this.emit('readable') | ||
if (cb) | ||
fn(cb) | ||
if (cb) fn(cb) | ||
@@ -266,5 +269,4 @@ return this.flowing | ||
read (n) { | ||
if (this[DESTROYED]) | ||
return null | ||
read(n) { | ||
if (this[DESTROYED]) return null | ||
@@ -276,10 +278,7 @@ if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) { | ||
if (this[OBJECTMODE]) | ||
n = null | ||
if (this[OBJECTMODE]) n = null | ||
if (this[BUFFER].length > 1 && !this[OBJECTMODE]) { | ||
if (this.encoding) | ||
this[BUFFER] = [this[BUFFER].join('')] | ||
else | ||
this[BUFFER] = [Buffer.concat(this[BUFFER], this[BUFFERLENGTH])] | ||
if (this.encoding) this[BUFFER] = [this[BUFFER].join('')] | ||
else this[BUFFER] = [Buffer.concat(this[BUFFER], this[BUFFERLENGTH])] | ||
} | ||
@@ -292,5 +291,4 @@ | ||
[READ] (n, chunk) { | ||
if (n === chunk.length || n === null) | ||
this[BUFFERSHIFT]() | ||
[READ](n, chunk) { | ||
if (n === chunk.length || n === null) this[BUFFERSHIFT]() | ||
else { | ||
@@ -304,4 +302,3 @@ this[BUFFER][0] = chunk.slice(n) | ||
if (!this[BUFFER].length && !this[EOF]) | ||
this.emit('drain') | ||
if (!this[BUFFER].length && !this[EOF]) this.emit('drain') | ||
@@ -311,11 +308,7 @@ return chunk | ||
end (chunk, encoding, cb) { | ||
if (typeof chunk === 'function') | ||
cb = chunk, chunk = null | ||
if (typeof encoding === 'function') | ||
cb = encoding, encoding = 'utf8' | ||
if (chunk) | ||
this.write(chunk, encoding) | ||
if (cb) | ||
this.once('end', cb) | ||
end(chunk, encoding, cb) { | ||
if (typeof chunk === 'function') (cb = chunk), (chunk = null) | ||
if (typeof encoding === 'function') (cb = encoding), (encoding = 'utf8') | ||
if (chunk) this.write(chunk, encoding) | ||
if (cb) this.once('end', cb) | ||
this[EOF] = true | ||
@@ -328,4 +321,3 @@ this.writable = false | ||
// This makes MP more suitable to write-only use cases. | ||
if (this.flowing || !this[PAUSED]) | ||
this[MAYBE_EMIT_END]() | ||
if (this.flowing || !this[PAUSED]) this[MAYBE_EMIT_END]() | ||
return this | ||
@@ -335,5 +327,4 @@ } | ||
// don't let the internal resume be overwritten | ||
[RESUME] () { | ||
if (this[DESTROYED]) | ||
return | ||
[RESUME]() { | ||
if (this[DESTROYED]) return | ||
@@ -343,15 +334,12 @@ this[PAUSED] = false | ||
this.emit('resume') | ||
if (this[BUFFER].length) | ||
this[FLUSH]() | ||
else if (this[EOF]) | ||
this[MAYBE_EMIT_END]() | ||
else | ||
this.emit('drain') | ||
if (this[BUFFER].length) this[FLUSH]() | ||
else if (this[EOF]) this[MAYBE_EMIT_END]() | ||
else this.emit('drain') | ||
} | ||
resume () { | ||
resume() { | ||
return this[RESUME]() | ||
} | ||
pause () { | ||
pause() { | ||
this[FLOWING] = false | ||
@@ -361,28 +349,24 @@ this[PAUSED] = true | ||
get destroyed () { | ||
get destroyed() { | ||
return this[DESTROYED] | ||
} | ||
get flowing () { | ||
get flowing() { | ||
return this[FLOWING] | ||
} | ||
get paused () { | ||
get paused() { | ||
return this[PAUSED] | ||
} | ||
[BUFFERPUSH] (chunk) { | ||
if (this[OBJECTMODE]) | ||
this[BUFFERLENGTH] += 1 | ||
else | ||
this[BUFFERLENGTH] += chunk.length | ||
[BUFFERPUSH](chunk) { | ||
if (this[OBJECTMODE]) this[BUFFERLENGTH] += 1 | ||
else this[BUFFERLENGTH] += chunk.length | ||
this[BUFFER].push(chunk) | ||
} | ||
[BUFFERSHIFT] () { | ||
[BUFFERSHIFT]() { | ||
if (this[BUFFER].length) { | ||
if (this[OBJECTMODE]) | ||
this[BUFFERLENGTH] -= 1 | ||
else | ||
this[BUFFERLENGTH] -= this[BUFFER][0].length | ||
if (this[OBJECTMODE]) this[BUFFERLENGTH] -= 1 | ||
else this[BUFFERLENGTH] -= this[BUFFER][0].length | ||
} | ||
@@ -392,23 +376,19 @@ return this[BUFFER].shift() | ||
[FLUSH] (noDrain) { | ||
[FLUSH](noDrain) { | ||
do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]())) | ||
if (!noDrain && !this[BUFFER].length && !this[EOF]) | ||
this.emit('drain') | ||
if (!noDrain && !this[BUFFER].length && !this[EOF]) this.emit('drain') | ||
} | ||
[FLUSHCHUNK] (chunk) { | ||
[FLUSHCHUNK](chunk) { | ||
return chunk ? (this.emit('data', chunk), this.flowing) : false | ||
} | ||
pipe (dest, opts) { | ||
if (this[DESTROYED]) | ||
return | ||
pipe(dest, opts) { | ||
if (this[DESTROYED]) return | ||
const ended = this[EMITTED_END] | ||
opts = opts || {} | ||
if (dest === proc.stdout || dest === proc.stderr) | ||
opts.end = false | ||
else | ||
opts.end = opts.end !== false | ||
if (dest === proc.stdout || dest === proc.stderr) opts.end = false | ||
else opts.end = opts.end !== false | ||
opts.proxyErrors = !!opts.proxyErrors | ||
@@ -418,11 +398,11 @@ | ||
if (ended) { | ||
if (opts.end) | ||
dest.end() | ||
if (opts.end) dest.end() | ||
} else { | ||
this[PIPES].push(!opts.proxyErrors ? new Pipe(this, dest, opts) | ||
: new PipeProxyErrors(this, dest, opts)) | ||
if (this[ASYNC]) | ||
defer(() => this[RESUME]()) | ||
else | ||
this[RESUME]() | ||
this[PIPES].push( | ||
!opts.proxyErrors | ||
? new Pipe(this, dest, opts) | ||
: new PipeProxyErrors(this, dest, opts) | ||
) | ||
if (this[ASYNC]) defer(() => this[RESUME]()) | ||
else this[RESUME]() | ||
} | ||
@@ -433,3 +413,3 @@ | ||
unpipe (dest) { | ||
unpipe(dest) { | ||
const p = this[PIPES].find(p => p.dest === dest) | ||
@@ -442,10 +422,9 @@ if (p) { | ||
addListener (ev, fn) { | ||
addListener(ev, fn) { | ||
return this.on(ev, fn) | ||
} | ||
on (ev, fn) { | ||
on(ev, fn) { | ||
const ret = super.on(ev, fn) | ||
if (ev === 'data' && !this[PIPES].length && !this.flowing) | ||
this[RESUME]() | ||
if (ev === 'data' && !this[PIPES].length && !this.flowing) this[RESUME]() | ||
else if (ev === 'readable' && this[BUFFERLENGTH] !== 0) | ||
@@ -457,6 +436,4 @@ super.emit('readable') | ||
} else if (ev === 'error' && this[EMITTED_ERROR]) { | ||
if (this[ASYNC]) | ||
defer(() => fn.call(this, this[EMITTED_ERROR])) | ||
else | ||
fn.call(this, this[EMITTED_ERROR]) | ||
if (this[ASYNC]) defer(() => fn.call(this, this[EMITTED_ERROR])) | ||
else fn.call(this, this[EMITTED_ERROR]) | ||
} | ||
@@ -466,12 +443,14 @@ return ret | ||
get emittedEnd () { | ||
get emittedEnd() { | ||
return this[EMITTED_END] | ||
} | ||
[MAYBE_EMIT_END] () { | ||
if (!this[EMITTING_END] && | ||
!this[EMITTED_END] && | ||
!this[DESTROYED] && | ||
this[BUFFER].length === 0 && | ||
this[EOF]) { | ||
[MAYBE_EMIT_END]() { | ||
if ( | ||
!this[EMITTING_END] && | ||
!this[EMITTED_END] && | ||
!this[DESTROYED] && | ||
this[BUFFER].length === 0 && | ||
this[EOF] | ||
) { | ||
this[EMITTING_END] = true | ||
@@ -481,4 +460,3 @@ this.emit('end') | ||
this.emit('finish') | ||
if (this[CLOSED]) | ||
this.emit('close') | ||
if (this[CLOSED]) this.emit('close') | ||
this[EMITTING_END] = false | ||
@@ -488,3 +466,3 @@ } | ||
emit (ev, data, ...extra) { | ||
emit(ev, data, ...extra) { | ||
// error and close are only events allowed after calling destroy() | ||
@@ -494,4 +472,6 @@ if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED]) | ||
else if (ev === 'data') { | ||
return !data ? false | ||
: this[ASYNC] ? defer(() => this[EMITDATA](data)) | ||
return !data | ||
? false | ||
: this[ASYNC] | ||
? defer(() => this[EMITDATA](data)) | ||
: this[EMITDATA](data) | ||
@@ -503,4 +483,3 @@ } else if (ev === 'end') { | ||
// don't emit close before 'end' and 'finish' | ||
if (!this[EMITTED_END] && !this[DESTROYED]) | ||
return | ||
if (!this[EMITTED_END] && !this[DESTROYED]) return | ||
const ret = super.emit('close') | ||
@@ -511,2 +490,3 @@ this.removeAllListeners('close') | ||
this[EMITTED_ERROR] = data | ||
super.emit(ERROR, data) | ||
const ret = super.emit('error', data) | ||
@@ -531,6 +511,5 @@ this[MAYBE_EMIT_END]() | ||
[EMITDATA] (data) { | ||
[EMITDATA](data) { | ||
for (const p of this[PIPES]) { | ||
if (p.dest.write(data) === false) | ||
this.pause() | ||
if (p.dest.write(data) === false) this.pause() | ||
} | ||
@@ -542,15 +521,12 @@ const ret = super.emit('data', data) | ||
[EMITEND] () { | ||
if (this[EMITTED_END]) | ||
return | ||
[EMITEND]() { | ||
if (this[EMITTED_END]) return | ||
this[EMITTED_END] = true | ||
this.readable = false | ||
if (this[ASYNC]) | ||
defer(() => this[EMITEND2]()) | ||
else | ||
this[EMITEND2]() | ||
if (this[ASYNC]) defer(() => this[EMITEND2]()) | ||
else this[EMITEND2]() | ||
} | ||
[EMITEND2] () { | ||
[EMITEND2]() { | ||
if (this[DECODER]) { | ||
@@ -575,6 +551,5 @@ const data = this[DECODER].end() | ||
// const all = await stream.collect() | ||
collect () { | ||
collect() { | ||
const buf = [] | ||
if (!this[OBJECTMODE]) | ||
buf.dataLength = 0 | ||
if (!this[OBJECTMODE]) buf.dataLength = 0 | ||
// set the promise first, in case an error is raised | ||
@@ -585,4 +560,3 @@ // by triggering the flow here. | ||
buf.push(c) | ||
if (!this[OBJECTMODE]) | ||
buf.dataLength += c.length | ||
if (!this[OBJECTMODE]) buf.dataLength += c.length | ||
}) | ||
@@ -593,3 +567,3 @@ return p.then(() => buf) | ||
// const data = await stream.concat() | ||
concat () { | ||
concat() { | ||
return this[OBJECTMODE] | ||
@@ -600,7 +574,10 @@ ? Promise.reject(new Error('cannot concat in objectMode')) | ||
? Promise.reject(new Error('cannot concat in objectMode')) | ||
: this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength)) | ||
: this[ENCODING] | ||
? buf.join('') | ||
: Buffer.concat(buf, buf.dataLength) | ||
) | ||
} | ||
// stream.promise().then(() => done, er => emitted error) | ||
promise () { | ||
promise() { | ||
return new Promise((resolve, reject) => { | ||
@@ -614,10 +591,15 @@ this.on(DESTROYED, () => reject(new Error('stream destroyed'))) | ||
// for await (let chunk of stream) | ||
[ASYNCITERATOR] () { | ||
[ASYNCITERATOR]() { | ||
let stopped = false | ||
const stop = () => { | ||
this.pause() | ||
stopped = true | ||
return Promise.resolve({ done: true }) | ||
} | ||
const next = () => { | ||
if (stopped) return stop() | ||
const res = this.read() | ||
if (res !== null) | ||
return Promise.resolve({ done: false, value: res }) | ||
if (res !== null) return Promise.resolve({ done: false, value: res }) | ||
if (this[EOF]) | ||
return Promise.resolve({ done: true }) | ||
if (this[EOF]) return stop() | ||
@@ -629,2 +611,3 @@ let resolve = null | ||
this.removeListener('end', onend) | ||
stop() | ||
reject(er) | ||
@@ -641,2 +624,3 @@ } | ||
this.removeListener('data', ondata) | ||
stop() | ||
resolve({ done: true }) | ||
@@ -655,21 +639,45 @@ } | ||
return { next } | ||
return { | ||
next, | ||
throw: stop, | ||
return: stop, | ||
[ASYNCITERATOR]() { | ||
return this | ||
}, | ||
} | ||
} | ||
// for (let chunk of stream) | ||
[ITERATOR] () { | ||
[ITERATOR]() { | ||
let stopped = false | ||
const stop = () => { | ||
this.pause() | ||
this.removeListener(ERROR, stop) | ||
this.removeListener('end', stop) | ||
stopped = true | ||
return { done: true } | ||
} | ||
const next = () => { | ||
if (stopped) return stop() | ||
const value = this.read() | ||
const done = value === null | ||
return { value, done } | ||
return value === null ? stop() : { value } | ||
} | ||
return { next } | ||
this.once('end', stop) | ||
this.once(ERROR, stop) | ||
return { | ||
next, | ||
throw: stop, | ||
return: stop, | ||
[ITERATOR]() { | ||
return this | ||
}, | ||
} | ||
} | ||
destroy (er) { | ||
destroy(er) { | ||
if (this[DESTROYED]) { | ||
if (er) | ||
this.emit('error', er) | ||
else | ||
this.emit(DESTROYED) | ||
if (er) this.emit('error', er) | ||
else this.emit(DESTROYED) | ||
return this | ||
@@ -684,9 +692,7 @@ } | ||
if (typeof this.close === 'function' && !this[CLOSED]) | ||
this.close() | ||
if (typeof this.close === 'function' && !this[CLOSED]) this.close() | ||
if (er) | ||
this.emit('error', er) | ||
else // if no error to emit, still reject pending promises | ||
this.emit(DESTROYED) | ||
if (er) this.emit('error', er) | ||
// if no error to emit, still reject pending promises | ||
else this.emit(DESTROYED) | ||
@@ -696,9 +702,12 @@ return this | ||
static isStream (s) { | ||
return !!s && (s instanceof Minipass || s instanceof Stream || | ||
s instanceof EE && ( | ||
typeof s.pipe === 'function' || // readable | ||
(typeof s.write === 'function' && typeof s.end === 'function') // writable | ||
)) | ||
static isStream(s) { | ||
return ( | ||
!!s && | ||
(s instanceof Minipass || | ||
s instanceof Stream || | ||
(s instanceof EE && | ||
(typeof s.pipe === 'function' || // readable | ||
(typeof s.write === 'function' && typeof s.end === 'function')))) // writable | ||
) | ||
} | ||
} |
{ | ||
"name": "minipass", | ||
"version": "4.0.1", | ||
"version": "4.0.2", | ||
"description": "minimal implementation of a PassThrough stream", | ||
@@ -22,3 +22,4 @@ "main": "index.js", | ||
"postpublish": "git push origin --follow-tags", | ||
"typedoc": "typedoc ./index.d.ts" | ||
"typedoc": "typedoc ./index.d.ts", | ||
"format": "prettier --write . --loglevel warn" | ||
}, | ||
@@ -25,0 +26,0 @@ "repository": { |
246
README.md
@@ -17,16 +17,16 @@ # minipass | ||
from this stream via `'data'` events or by calling `pipe()` into some other | ||
stream. Calling `read()` requires the buffer to be flattened in some | ||
stream. Calling `read()` requires the buffer to be flattened in some | ||
cases, which requires copying memory. | ||
If you set `objectMode: true` in the options, then whatever is written will | ||
be emitted. Otherwise, it'll do a minimal amount of Buffer copying to | ||
be emitted. Otherwise, it'll do a minimal amount of Buffer copying to | ||
ensure proper Streams semantics when `read(n)` is called. | ||
`objectMode` can also be set by doing `stream.objectMode = true`, or by | ||
writing any non-string/non-buffer data. `objectMode` cannot be set to | ||
writing any non-string/non-buffer data. `objectMode` cannot be set to | ||
false once it is set. | ||
This is not a `through` or `through2` stream. It doesn't transform the | ||
data, it just passes it right through. If you want to transform the data, | ||
extend the class, and override the `write()` method. Once you're done | ||
This is not a `through` or `through2` stream. It doesn't transform the | ||
data, it just passes it right through. If you want to transform the data, | ||
extend the class, and override the `write()` method. Once you're done | ||
transforming the data however you want, call `super.write()` with the | ||
@@ -70,5 +70,5 @@ transform output. | ||
Minipass streams are designed to support synchronous use-cases. Thus, data | ||
is emitted as soon as it is available, always. It is buffered until read, | ||
but no longer. Another way to look at it is that Minipass streams are | ||
Minipass streams are designed to support synchronous use-cases. Thus, data | ||
is emitted as soon as it is available, always. It is buffered until read, | ||
but no longer. Another way to look at it is that Minipass streams are | ||
exactly as synchronous as the logic that writes into them. | ||
@@ -82,3 +82,3 @@ | ||
Minipass to achieve the speeds it does, or support the synchronous use | ||
cases that it does. Simply put, waiting takes time. | ||
cases that it does. Simply put, waiting takes time. | ||
@@ -125,3 +125,3 @@ This non-deferring approach makes Minipass streams much easier to reason | ||
Switching _out_ of async mode is unsafe, as it could cause data | ||
corruption, and so is not enabled. Example: | ||
corruption, and so is not enabled. Example: | ||
@@ -173,3 +173,3 @@ ```js | ||
Minipass streams are much simpler. The `write()` method will return `true` | ||
Minipass streams are much simpler. The `write()` method will return `true` | ||
if the data has somewhere to go (which is to say, given the timing | ||
@@ -190,3 +190,3 @@ guarantees, that the data is already there by the time `write()` returns). | ||
whether the data was fully flushed, backpressure is communicated | ||
immediately to the upstream caller. This minimizes buffering. | ||
immediately to the upstream caller. This minimizes buffering. | ||
@@ -196,3 +196,3 @@ Consider this case: | ||
```js | ||
const {PassThrough} = require('stream') | ||
const { PassThrough } = require('stream') | ||
const p1 = new PassThrough({ highWaterMark: 1024 }) | ||
@@ -226,3 +226,3 @@ const p2 = new PassThrough({ highWaterMark: 1024 }) | ||
the code to think an advisory maximum of 1KiB is being set for the | ||
pipeline. However, the actual advisory buffering level is the _sum_ of | ||
pipeline. However, the actual advisory buffering level is the _sum_ of | ||
`highWaterMark` values, since each one has its own bucket. | ||
@@ -254,3 +254,3 @@ | ||
It is extremely unlikely that you _don't_ want to buffer any data written, | ||
or _ever_ buffer data that can be flushed all the way through. Neither | ||
or _ever_ buffer data that can be flushed all the way through. Neither | ||
node-core streams nor Minipass ever fail to buffer written data, but | ||
@@ -278,3 +278,3 @@ node-core streams do a lot of unnecessary buffering and pausing. | ||
One hazard of immediately emitting `'end'` is that you may not yet have had | ||
a chance to add a listener. In order to avoid this hazard, Minipass | ||
a chance to add a listener. In order to avoid this hazard, Minipass | ||
streams safely re-emit the `'end'` event if a new listener is added after | ||
@@ -284,3 +284,3 @@ `'end'` has been emitted. | ||
Ie, if you do `stream.on('end', someFunction)`, and the stream has already | ||
emitted `end`, then it will call the handler right away. (You can think of | ||
emitted `end`, then it will call the handler right away. (You can think of | ||
this somewhat like attaching a new `.then(fn)` to a previously-resolved | ||
@@ -296,3 +296,3 @@ Promise.) | ||
The most recent error object passed to the `'error'` event is | ||
stored on the stream. If a new `'error'` event handler is added, | ||
stored on the stream. If a new `'error'` event handler is added, | ||
and an error was previously emitted, then the event handler will | ||
@@ -343,3 +343,3 @@ be called immediately (or on `process.nextTick` in the case of | ||
The same caveat applies to `on('data')` event listeners. The first one | ||
The same caveat applies to `on('data')` event listeners. The first one | ||
added will _immediately_ receive all of the data, leaving nothing for the | ||
@@ -370,3 +370,3 @@ second: | ||
async: true }` in the Minipass constructor, or by setting | ||
`stream.async = true` afterwards. Note that this does add some | ||
`stream.async = true` afterwards. Note that this does add some | ||
overhead, so should only be done in cases where you are willing | ||
@@ -378,3 +378,3 @@ to lose a bit of performance in order to avoid having to refactor | ||
It's a stream! Use it like a stream and it'll most likely do what you | ||
It's a stream! Use it like a stream and it'll most likely do what you | ||
want. | ||
@@ -392,12 +392,12 @@ | ||
* `encoding` How would you like the data coming _out_ of the stream to be | ||
encoded? Accepts any values that can be passed to `Buffer.toString()`. | ||
* `objectMode` Emit data exactly as it comes in. This will be flipped on | ||
- `encoding` How would you like the data coming _out_ of the stream to be | ||
encoded? Accepts any values that can be passed to `Buffer.toString()`. | ||
- `objectMode` Emit data exactly as it comes in. This will be flipped on | ||
by default if you write() something other than a string or Buffer at any | ||
point. Setting `objectMode: true` will prevent setting any encoding | ||
point. Setting `objectMode: true` will prevent setting any encoding | ||
value. | ||
* `async` Defaults to `false`. Set to `true` to defer data | ||
emission until next tick. This reduces performance slightly, | ||
- `async` Defaults to `false`. Set to `true` to defer data | ||
emission until next tick. This reduces performance slightly, | ||
but makes Minipass streams use timing behavior closer to Node | ||
core streams. See [Timing](#timing) for more details. | ||
core streams. See [Timing](#timing) for more details. | ||
@@ -411,48 +411,48 @@ ### API | ||
* `write(chunk, [encoding], [callback])` - Put data in. (Note that, in the | ||
base Minipass class, the same data will come out.) Returns `false` if | ||
- `write(chunk, [encoding], [callback])` - Put data in. (Note that, in the | ||
base Minipass class, the same data will come out.) Returns `false` if | ||
the stream will buffer the next write, or true if it's still in "flowing" | ||
mode. | ||
* `end([chunk, [encoding]], [callback])` - Signal that you have no more | ||
data to write. This will queue an `end` event to be fired when all the | ||
- `end([chunk, [encoding]], [callback])` - Signal that you have no more | ||
data to write. This will queue an `end` event to be fired when all the | ||
data has been consumed. | ||
* `setEncoding(encoding)` - Set the encoding for data coming of the stream. | ||
- `setEncoding(encoding)` - Set the encoding for data coming of the stream. | ||
This can only be done once. | ||
* `pause()` - No more data for a while, please. This also prevents `end` | ||
- `pause()` - No more data for a while, please. This also prevents `end` | ||
from being emitted for empty streams until the stream is resumed. | ||
* `resume()` - Resume the stream. If there's data in the buffer, it is all | ||
discarded. Any buffered events are immediately emitted. | ||
* `pipe(dest)` - Send all output to the stream provided. When | ||
- `resume()` - Resume the stream. If there's data in the buffer, it is all | ||
discarded. Any buffered events are immediately emitted. | ||
- `pipe(dest)` - Send all output to the stream provided. When | ||
data is emitted, it is immediately written to any and all pipe | ||
destinations. (Or written on next tick in `async` mode.) | ||
* `unpipe(dest)` - Stop piping to the destination stream. This | ||
destinations. (Or written on next tick in `async` mode.) | ||
- `unpipe(dest)` - Stop piping to the destination stream. This | ||
is immediate, meaning that any asynchronously queued data will | ||
_not_ make it to the destination when running in `async` mode. | ||
* `options.end` - Boolean, end the destination stream when | ||
the source stream ends. Default `true`. | ||
* `options.proxyErrors` - Boolean, proxy `error` events from | ||
the source stream to the destination stream. Note that | ||
errors are _not_ proxied after the pipeline terminates, | ||
either due to the source emitting `'end'` or manually | ||
unpiping with `src.unpipe(dest)`. Default `false`. | ||
* `on(ev, fn)`, `emit(ev, fn)` - Minipass streams are EventEmitters. Some | ||
events are given special treatment, however. (See below under "events".) | ||
* `promise()` - Returns a Promise that resolves when the stream emits | ||
- `options.end` - Boolean, end the destination stream when | ||
the source stream ends. Default `true`. | ||
- `options.proxyErrors` - Boolean, proxy `error` events from | ||
the source stream to the destination stream. Note that | ||
errors are _not_ proxied after the pipeline terminates, | ||
either due to the source emitting `'end'` or manually | ||
unpiping with `src.unpipe(dest)`. Default `false`. | ||
- `on(ev, fn)`, `emit(ev, fn)` - Minipass streams are EventEmitters. Some | ||
events are given special treatment, however. (See below under "events".) | ||
- `promise()` - Returns a Promise that resolves when the stream emits | ||
`end`, or rejects if the stream emits `error`. | ||
* `collect()` - Return a Promise that resolves on `end` with an array | ||
- `collect()` - Return a Promise that resolves on `end` with an array | ||
containing each chunk of data that was emitted, or rejects if the stream | ||
emits `error`. Note that this consumes the stream data. | ||
* `concat()` - Same as `collect()`, but concatenates the data into a single | ||
Buffer object. Will reject the returned promise if the stream is in | ||
emits `error`. Note that this consumes the stream data. | ||
- `concat()` - Same as `collect()`, but concatenates the data into a single | ||
Buffer object. Will reject the returned promise if the stream is in | ||
objectMode, or if it goes into objectMode by the end of the data. | ||
* `read(n)` - Consume `n` bytes of data out of the buffer. If `n` is not | ||
provided, then consume all of it. If `n` bytes are not available, then | ||
it returns null. **Note** consuming streams in this way is less | ||
- `read(n)` - Consume `n` bytes of data out of the buffer. If `n` is not | ||
provided, then consume all of it. If `n` bytes are not available, then | ||
it returns null. **Note** consuming streams in this way is less | ||
efficient, and can lead to unnecessary Buffer copying. | ||
* `destroy([er])` - Destroy the stream. If an error is provided, then an | ||
`'error'` event is emitted. If the stream has a `close()` method, and | ||
- `destroy([er])` - Destroy the stream. If an error is provided, then an | ||
`'error'` event is emitted. If the stream has a `close()` method, and | ||
has not emitted a `'close'` event yet, then `stream.close()` will be | ||
called. Any Promises returned by `.promise()`, `.collect()` or | ||
`.concat()` will be rejected. After being destroyed, writing to the | ||
stream will emit an error. No more data will be emitted if the stream is | ||
called. Any Promises returned by `.promise()`, `.collect()` or | ||
`.concat()` will be rejected. After being destroyed, writing to the | ||
stream will emit an error. No more data will be emitted if the stream is | ||
destroyed, even if it was previously buffered. | ||
@@ -462,25 +462,25 @@ | ||
* `bufferLength` Read-only. Total number of bytes buffered, or in the case | ||
- `bufferLength` Read-only. Total number of bytes buffered, or in the case | ||
of objectMode, the total number of objects. | ||
* `encoding` The encoding that has been set. (Setting this is equivalent | ||
- `encoding` The encoding that has been set. (Setting this is equivalent | ||
to calling `setEncoding(enc)` and has the same prohibition against | ||
setting multiple times.) | ||
* `flowing` Read-only. Boolean indicating whether a chunk written to the | ||
- `flowing` Read-only. Boolean indicating whether a chunk written to the | ||
stream will be immediately emitted. | ||
* `emittedEnd` Read-only. Boolean indicating whether the end-ish events | ||
(ie, `end`, `prefinish`, `finish`) have been emitted. Note that | ||
- `emittedEnd` Read-only. Boolean indicating whether the end-ish events | ||
(ie, `end`, `prefinish`, `finish`) have been emitted. Note that | ||
listening on any end-ish event will immediateyl re-emit it if it has | ||
already been emitted. | ||
* `writable` Whether the stream is writable. Default `true`. Set to | ||
- `writable` Whether the stream is writable. Default `true`. Set to | ||
`false` when `end()` | ||
* `readable` Whether the stream is readable. Default `true`. | ||
* `buffer` A [yallist](http://npm.im/yallist) linked list of chunks written | ||
to the stream that have not yet been emitted. (It's probably a bad idea | ||
- `readable` Whether the stream is readable. Default `true`. | ||
- `buffer` A [yallist](http://npm.im/yallist) linked list of chunks written | ||
to the stream that have not yet been emitted. (It's probably a bad idea | ||
to mess with this.) | ||
* `pipes` A [yallist](http://npm.im/yallist) linked list of streams that | ||
this stream is piping into. (It's probably a bad idea to mess with | ||
- `pipes` A [yallist](http://npm.im/yallist) linked list of streams that | ||
this stream is piping into. (It's probably a bad idea to mess with | ||
this.) | ||
* `destroyed` A getter that indicates whether the stream was destroyed. | ||
* `paused` True if the stream has been explicitly paused, otherwise false. | ||
* `objectMode` Indicates whether the stream is in `objectMode`. Once set | ||
- `destroyed` A getter that indicates whether the stream was destroyed. | ||
- `paused` True if the stream has been explicitly paused, otherwise false. | ||
- `objectMode` Indicates whether the stream is in `objectMode`. Once set | ||
to `true`, it cannot be set to `false`. | ||
@@ -490,24 +490,24 @@ | ||
* `data` Emitted when there's data to read. Argument is the data to read. | ||
This is never emitted while not flowing. If a listener is attached, that | ||
- `data` Emitted when there's data to read. Argument is the data to read. | ||
This is never emitted while not flowing. If a listener is attached, that | ||
will resume the stream. | ||
* `end` Emitted when there's no more data to read. This will be emitted | ||
immediately for empty streams when `end()` is called. If a listener is | ||
- `end` Emitted when there's no more data to read. This will be emitted | ||
immediately for empty streams when `end()` is called. If a listener is | ||
attached, and `end` was already emitted, then it will be emitted again. | ||
All listeners are removed when `end` is emitted. | ||
* `prefinish` An end-ish event that follows the same logic as `end` and is | ||
emitted in the same conditions where `end` is emitted. Emitted after | ||
- `prefinish` An end-ish event that follows the same logic as `end` and is | ||
emitted in the same conditions where `end` is emitted. Emitted after | ||
`'end'`. | ||
* `finish` An end-ish event that follows the same logic as `end` and is | ||
emitted in the same conditions where `end` is emitted. Emitted after | ||
- `finish` An end-ish event that follows the same logic as `end` and is | ||
emitted in the same conditions where `end` is emitted. Emitted after | ||
`'prefinish'`. | ||
* `close` An indication that an underlying resource has been released. | ||
- `close` An indication that an underlying resource has been released. | ||
Minipass does not emit this event, but will defer it until after `end` | ||
has been emitted, since it throws off some stream libraries otherwise. | ||
* `drain` Emitted when the internal buffer empties, and it is again | ||
- `drain` Emitted when the internal buffer empties, and it is again | ||
suitable to `write()` into the stream. | ||
* `readable` Emitted when data is buffered and ready to be read by a | ||
- `readable` Emitted when data is buffered and ready to be read by a | ||
consumer. | ||
* `resume` Emitted when stream changes state from buffering to flowing | ||
mode. (Ie, when `resume` is called, `pipe` is called, or a `data` event | ||
- `resume` Emitted when stream changes state from buffering to flowing | ||
mode. (Ie, when `resume` is called, `pipe` is called, or a `data` event | ||
listener is added.) | ||
@@ -517,6 +517,6 @@ | ||
* `Minipass.isStream(stream)` Returns `true` if the argument is a stream, | ||
and false otherwise. To be considered a stream, the object must be | ||
- `Minipass.isStream(stream)` Returns `true` if the argument is a stream, | ||
and false otherwise. To be considered a stream, the object must be | ||
either an instance of Minipass, or an EventEmitter that has either a | ||
`pipe()` method, or both `write()` and `end()` methods. (Pretty much any | ||
`pipe()` method, or both `write()` and `end()` methods. (Pretty much any | ||
stream in node-land will return `true` for this.) | ||
@@ -531,7 +531,10 @@ | ||
```js | ||
mp.promise().then(() => { | ||
// stream is finished | ||
}, er => { | ||
// stream emitted an error | ||
}) | ||
mp.promise().then( | ||
() => { | ||
// stream is finished | ||
}, | ||
er => { | ||
// stream emitted an error | ||
} | ||
) | ||
``` | ||
@@ -572,3 +575,3 @@ | ||
Synchronous iteration will end when the currently available data is | ||
consumed, even if the `end` event has not been reached. In string and | ||
consumed, even if the `end` event has not been reached. In string and | ||
buffer mode, the data is concatenated, so unless multiple writes are | ||
@@ -612,4 +615,3 @@ occurring in the same tick as the `read()`, sync iteration loops will | ||
const inter = setInterval(() => { | ||
if (i-- > 0) | ||
mp.write(Buffer.from('foo\n', 'utf8')) | ||
if (i-- > 0) mp.write(Buffer.from('foo\n', 'utf8')) | ||
else { | ||
@@ -622,3 +624,3 @@ mp.end() | ||
// consume the data with asynchronous iteration | ||
async function consume () { | ||
async function consume() { | ||
for await (let chunk of mp) { | ||
@@ -638,7 +640,7 @@ console.log(chunk) | ||
class Logger extends Minipass { | ||
write (chunk, encoding, callback) { | ||
write(chunk, encoding, callback) { | ||
console.log('WRITE', chunk, encoding) | ||
return super.write(chunk, encoding, callback) | ||
} | ||
end (chunk, encoding, callback) { | ||
end(chunk, encoding, callback) { | ||
console.log('END', chunk, encoding) | ||
@@ -657,17 +659,19 @@ return super.end(chunk, encoding, callback) | ||
someSource | ||
.pipe(new (class extends Minipass { | ||
emit (ev, ...data) { | ||
// let's also log events, because debugging some weird thing | ||
console.log('EMIT', ev) | ||
return super.emit(ev, ...data) | ||
} | ||
write (chunk, encoding, callback) { | ||
console.log('WRITE', chunk, encoding) | ||
return super.write(chunk, encoding, callback) | ||
} | ||
end (chunk, encoding, callback) { | ||
console.log('END', chunk, encoding) | ||
return super.end(chunk, encoding, callback) | ||
} | ||
})) | ||
.pipe( | ||
new (class extends Minipass { | ||
emit(ev, ...data) { | ||
// let's also log events, because debugging some weird thing | ||
console.log('EMIT', ev) | ||
return super.emit(ev, ...data) | ||
} | ||
write(chunk, encoding, callback) { | ||
console.log('WRITE', chunk, encoding) | ||
return super.write(chunk, encoding, callback) | ||
} | ||
end(chunk, encoding, callback) { | ||
console.log('END', chunk, encoding) | ||
return super.end(chunk, encoding, callback) | ||
} | ||
})() | ||
) | ||
.pipe(someDest) | ||
@@ -680,3 +684,3 @@ ``` | ||
class SlowEnd extends Minipass { | ||
emit (ev, ...args) { | ||
emit(ev, ...args) { | ||
if (ev === 'end') { | ||
@@ -699,3 +703,3 @@ console.log('going to end, hold on a sec') | ||
class NDJSONEncode extends Minipass { | ||
write (obj, cb) { | ||
write(obj, cb) { | ||
try { | ||
@@ -708,3 +712,3 @@ // JSON.stringify can throw, emit an error on that | ||
} | ||
end (obj, cb) { | ||
end(obj, cb) { | ||
if (typeof obj === 'function') { | ||
@@ -711,0 +715,0 @@ cb = obj |
Sorry, the diff of this file is not supported yet
48947
698
733