Comparing version 3.1.6 to 3.2.0
257
index.js
@@ -8,3 +8,2 @@ 'use strict' | ||
const Stream = require('stream') | ||
const Yallist = require('yallist') | ||
const SD = require('string_decoder').StringDecoder | ||
@@ -31,3 +30,8 @@ | ||
const DESTROYED = Symbol('destroyed') | ||
const EMITDATA = Symbol('emitData') | ||
const EMITEND = Symbol('emitData') | ||
const ASYNC = Symbol('async') | ||
const defer = fn => process.nextTick(fn) | ||
// TODO remove when Node v8 support drops | ||
@@ -56,2 +60,17 @@ const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1' | ||
class Pipe { | ||
constructor (src, dest, opts) { | ||
this.dest = dest | ||
this.opts = opts | ||
this.ondrain = () => src[RESUME]() | ||
dest.on('drain', this.ondrain) | ||
} | ||
end () { | ||
if (this.opts.end) | ||
this.dest.end() | ||
this.dest.removeListener('drain', this.ondrain) | ||
} | ||
} | ||
module.exports = class Minipass extends Stream { | ||
@@ -63,4 +82,4 @@ constructor (options) { | ||
this[PAUSED] = false | ||
this.pipes = new Yallist() | ||
this.buffer = new Yallist() | ||
this.pipes = [] | ||
this.buffer = [] | ||
this[OBJECTMODE] = options && options.objectMode || false | ||
@@ -73,2 +92,3 @@ if (this[OBJECTMODE]) | ||
this[ENCODING] = null | ||
this[ASYNC] = options && !!options.async || false | ||
this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null | ||
@@ -113,2 +133,5 @@ this[EOF] = false | ||
get ['async'] () { return this[ASYNC] } | ||
set ['async'] (a) { this[ASYNC] = this[ASYNC] || !!a } | ||
write (chunk, encoding, cb) { | ||
@@ -132,2 +155,4 @@ if (this[EOF]) | ||
const fn = this[ASYNC] ? defer : f => f() | ||
// convert array buffers and typed array views into buffers | ||
@@ -147,9 +172,30 @@ // at some point in the future, we may want to do the opposite! | ||
// this ensures at this point that the chunk is a buffer or string | ||
// handle object mode up front, since it's simpler | ||
// this yields better performance, fewer checks later. | ||
if (this[OBJECTMODE]) { | ||
/* istanbul ignore if - maybe impossible? */ | ||
if (this.flowing && this[BUFFERLENGTH] !== 0) | ||
this[FLUSH](true) | ||
if (this.flowing) | ||
this.emit('data', chunk) | ||
else | ||
this[BUFFERPUSH](chunk) | ||
if (this[BUFFERLENGTH] !== 0) | ||
this.emit('readable') | ||
if (cb) | ||
fn(cb) | ||
return this.flowing | ||
} | ||
// at this point the chunk is a buffer or string | ||
// don't buffer it up or send it to the decoder | ||
if (!this.objectMode && !chunk.length) { | ||
if (!chunk.length) { | ||
if (this[BUFFERLENGTH] !== 0) | ||
this.emit('readable') | ||
if (cb) | ||
cb() | ||
fn(cb) | ||
return this.flowing | ||
@@ -160,3 +206,3 @@ } | ||
// an empty buffer, skipping the buffer/decoder dance | ||
if (typeof chunk === 'string' && !this[OBJECTMODE] && | ||
if (typeof chunk === 'string' && | ||
// unless it is a string already ready for us to use | ||
@@ -170,16 +216,9 @@ !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) { | ||
if (this.flowing) { | ||
// if we somehow have something in the buffer, but we think we're | ||
// flowing, then we need to flush all that out first, or we get | ||
// chunks coming in out of order. Can't emit 'drain' here though, | ||
// because we're mid-write, so that'd be bad. | ||
if (this[BUFFERLENGTH] !== 0) | ||
this[FLUSH](true) | ||
// Note: flushing CAN potentially switch us into not-flowing mode | ||
if (this.flowing && this[BUFFERLENGTH] !== 0) | ||
this[FLUSH](true) | ||
// if we are still flowing after flushing the buffer we can emit the | ||
// chunk otherwise we have to buffer it. | ||
this.flowing | ||
? this.emit('data', chunk) | ||
: this[BUFFERPUSH](chunk) | ||
} else | ||
if (this.flowing) | ||
this.emit('data', chunk) | ||
else | ||
this[BUFFERPUSH](chunk) | ||
@@ -191,3 +230,3 @@ | ||
if (cb) | ||
cb() | ||
fn(cb) | ||
@@ -201,24 +240,20 @@ return this.flowing | ||
try { | ||
if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) | ||
return null | ||
if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) { | ||
this[MAYBE_EMIT_END]() | ||
return null | ||
} | ||
if (this[OBJECTMODE]) | ||
n = null | ||
if (this[OBJECTMODE]) | ||
n = null | ||
if (this.buffer.length > 1 && !this[OBJECTMODE]) { | ||
if (this.encoding) | ||
this.buffer = new Yallist([ | ||
Array.from(this.buffer).join('') | ||
]) | ||
else | ||
this.buffer = new Yallist([ | ||
Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH]) | ||
]) | ||
} | ||
if (this.buffer.length > 1 && !this[OBJECTMODE]) { | ||
if (this.encoding) | ||
this.buffer = [this.buffer.join('')] | ||
else | ||
this.buffer = [Buffer.concat(this.buffer, this[BUFFERLENGTH])] | ||
} | ||
return this[READ](n || null, this.buffer.head.value) | ||
} finally { | ||
this[MAYBE_EMIT_END]() | ||
} | ||
const ret = this[READ](n || null, this.buffer[0]) | ||
this[MAYBE_EMIT_END]() | ||
return ret | ||
} | ||
@@ -230,3 +265,3 @@ | ||
else { | ||
this.buffer.head.value = chunk.slice(n) | ||
this.buffer[0] = chunk.slice(n) | ||
chunk = chunk.slice(0, n) | ||
@@ -307,3 +342,3 @@ this[BUFFERLENGTH] -= n | ||
this[BUFFERLENGTH] += chunk.length | ||
return this.buffer.push(chunk) | ||
this.buffer.push(chunk) | ||
} | ||
@@ -316,3 +351,3 @@ | ||
else | ||
this[BUFFERLENGTH] -= this.buffer.head.value.length | ||
this[BUFFERLENGTH] -= this.buffer[0].length | ||
} | ||
@@ -344,10 +379,14 @@ return this.buffer.shift() | ||
const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() } | ||
this.pipes.push(p) | ||
// piping an ended stream ends immediately | ||
if (ended) { | ||
if (opts.end) | ||
dest.end() | ||
} else { | ||
this.pipes.push(new Pipe(this, dest, opts)) | ||
if (this[ASYNC]) | ||
defer(() => this[RESUME]()) | ||
else | ||
this[RESUME]() | ||
} | ||
dest.on('drain', p.ondrain) | ||
this[RESUME]() | ||
// piping an ended stream ends immediately | ||
if (ended && p.opts.end) | ||
p.dest.end() | ||
return dest | ||
@@ -361,14 +400,17 @@ } | ||
on (ev, fn) { | ||
try { | ||
return super.on(ev, fn) | ||
} finally { | ||
if (ev === 'data' && !this.pipes.length && !this.flowing) | ||
this[RESUME]() | ||
else if (isEndish(ev) && this[EMITTED_END]) { | ||
super.emit(ev) | ||
this.removeAllListeners(ev) | ||
} else if (ev === 'error' && this[EMITTED_ERROR]) { | ||
const ret = super.on(ev, fn) | ||
if (ev === 'data' && !this.pipes.length && !this.flowing) | ||
this[RESUME]() | ||
else if (ev === 'readable' && this[BUFFERLENGTH] !== 0) | ||
super.emit('readable') | ||
else if (isEndish(ev) && this[EMITTED_END]) { | ||
super.emit(ev) | ||
this.removeAllListeners(ev) | ||
} else if (ev === 'error' && this[EMITTED_ERROR]) { | ||
if (this[ASYNC]) | ||
defer(() => fn.call(this, this[EMITTED_ERROR])) | ||
else | ||
fn.call(this, this[EMITTED_ERROR]) | ||
} | ||
} | ||
return ret | ||
} | ||
@@ -396,3 +438,3 @@ | ||
emit (ev, data) { | ||
emit (ev, data, ...extra) { | ||
// error and close are only events allowed after calling destroy() | ||
@@ -402,29 +444,10 @@ if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED]) | ||
else if (ev === 'data') { | ||
if (!data) | ||
return | ||
if (this.pipes.length) | ||
this.pipes.forEach(p => | ||
p.dest.write(data) === false && this.pause()) | ||
return !data ? false | ||
: this[ASYNC] ? defer(() => this[EMITDATA](data)) | ||
: this[EMITDATA](data) | ||
} else if (ev === 'end') { | ||
// only actual end gets this treatment | ||
if (this[EMITTED_END] === true) | ||
return | ||
this[EMITTED_END] = true | ||
this.readable = false | ||
if (this[DECODER]) { | ||
data = this[DECODER].end() | ||
if (data) { | ||
this.pipes.forEach(p => p.dest.write(data)) | ||
super.emit('data', data) | ||
} | ||
} | ||
this.pipes.forEach(p => { | ||
p.dest.removeListener('drain', p.ondrain) | ||
if (p.opts.end) | ||
p.dest.end() | ||
}) | ||
return this[EMITTED_END] ? false | ||
: this[ASYNC] ? defer(() => this[EMITEND]()) | ||
: this[EMITEND]() | ||
} else if (ev === 'close') { | ||
@@ -435,24 +458,56 @@ this[CLOSED] = true | ||
return | ||
const ret = super.emit('close') | ||
this.removeAllListeners('close') | ||
return ret | ||
} else if (ev === 'error') { | ||
this[EMITTED_ERROR] = data | ||
const ret = super.emit('error', data) | ||
this[MAYBE_EMIT_END]() | ||
return ret | ||
} else if (ev === 'resume') { | ||
const ret = super.emit('resume') | ||
this[MAYBE_EMIT_END]() | ||
return ret | ||
} else if (ev === 'finish' || ev === 'prefinish') { | ||
const ret = super.emit(ev) | ||
this.removeAllListeners(ev) | ||
return ret | ||
} | ||
// TODO: replace with a spread operator when Node v4 support drops | ||
const args = new Array(arguments.length) | ||
args[0] = ev | ||
args[1] = data | ||
if (arguments.length > 2) { | ||
for (let i = 2; i < arguments.length; i++) { | ||
args[i] = arguments[i] | ||
// Some other unknown event | ||
const ret = super.emit(ev, data, ...extra) | ||
this[MAYBE_EMIT_END]() | ||
return ret | ||
} | ||
[EMITDATA] (data) { | ||
for (const p of this.pipes) { | ||
if (p.dest.write(data) === false) | ||
this.pause() | ||
} | ||
const ret = super.emit('data', data) | ||
this[MAYBE_EMIT_END]() | ||
return ret | ||
} | ||
[EMITEND] () { | ||
this[EMITTED_END] = true | ||
this.readable = false | ||
if (this[DECODER]) { | ||
const data = this[DECODER].end() | ||
if (data) { | ||
for (const p of this.pipes) { | ||
p.dest.write(data) | ||
} | ||
super.emit('data', data) | ||
} | ||
} | ||
try { | ||
return super.emit.apply(this, args) | ||
} finally { | ||
if (!isEndish(ev)) | ||
this[MAYBE_EMIT_END]() | ||
else | ||
this.removeAllListeners(ev) | ||
for (const p of this.pipes) { | ||
p.end() | ||
} | ||
const ret = super.emit('end') | ||
this.removeAllListeners('end') | ||
return ret | ||
} | ||
@@ -559,3 +614,3 @@ | ||
// throw away all buffered data, it's never coming out | ||
this.buffer = new Yallist() | ||
this.buffer.length = 0 | ||
this[BUFFERLENGTH] = 0 | ||
@@ -562,0 +617,0 @@ |
{ | ||
"name": "minipass", | ||
"version": "3.1.6", | ||
"version": "3.2.0", | ||
"description": "minimal implementation of a PassThrough stream", | ||
@@ -11,3 +11,3 @@ "main": "index.js", | ||
"end-of-stream": "^1.4.0", | ||
"tap": "^15.0.9", | ||
"tap": "^16.2.0", | ||
"through2": "^2.0.3" | ||
@@ -14,0 +14,0 @@ }, |
106
README.md
@@ -66,2 +66,6 @@ # minipass | ||
You can avoid most of these differences entirely (for a very | ||
small performance penalty) by setting `{async: true}` in the | ||
constructor options. | ||
### Timing | ||
@@ -86,2 +90,78 @@ | ||
Example: | ||
```js | ||
const Minipass = require('minipass') | ||
const stream = new Minipass({ async: true }) | ||
stream.on('data', () => console.log('data event')) | ||
console.log('before write') | ||
stream.write('hello') | ||
console.log('after write') | ||
// output: | ||
// before write | ||
// data event | ||
// after write | ||
``` | ||
### Exception: Async Opt-In | ||
If you wish to have a Minipass stream with behavior that more | ||
closely mimics Node.js core streams, you can set the stream in | ||
async mode either by setting `async: true` in the constructor | ||
options, or by setting `stream.async = true` later on. | ||
```js | ||
const Minipass = require('minipass') | ||
const asyncStream = new Minipass({ async: true }) | ||
asyncStream.on('data', () => console.log('data event')) | ||
console.log('before write') | ||
asyncStream.write('hello') | ||
console.log('after write') | ||
// output: | ||
// before write | ||
// after write | ||
// data event <-- this is deferred until the next tick | ||
``` | ||
Switching _out_ of async mode is unsafe, as it could cause data | ||
corruption, and so is not enabled. Example: | ||
```js | ||
const Minipass = require('minipass') | ||
const stream = new Minipass({ encoding: 'utf8' }) | ||
stream.on('data', chunk => console.log(chunk)) | ||
stream.async = true | ||
console.log('before writes') | ||
stream.write('hello') | ||
setStreamSyncAgainSomehow(stream) // <-- this doesn't actually exist! | ||
stream.write('world') | ||
console.log('after writes') | ||
// hypothetical output would be: | ||
// before writes | ||
// world | ||
// after writes | ||
// hello | ||
// NOT GOOD! | ||
``` | ||
To avoid this problem, once set into async mode, any attempt to | ||
make the stream sync again will be ignored. | ||
```js | ||
const Minipass = require('minipass') | ||
const stream = new Minipass({ encoding: 'utf8' }) | ||
stream.on('data', chunk => console.log(chunk)) | ||
stream.async = true | ||
console.log('before writes') | ||
stream.write('hello') | ||
stream.async = false // <-- no-op, stream already async | ||
stream.write('world') | ||
console.log('after writes') | ||
// actual output: | ||
// before writes | ||
// after writes | ||
// hello | ||
// world | ||
``` | ||
### No High/Low Water Marks | ||
@@ -102,2 +182,5 @@ | ||
Since nothing is ever buffered unnecessarily, there is much less | ||
copying data, and less bookkeeping about buffer capacity levels. | ||
### Hazards of Buffering (or: Why Minipass Is So Fast) | ||
@@ -187,2 +270,4 @@ | ||
However, this is _usually_ not a problem because: | ||
### Emit `end` When Asked | ||
@@ -204,2 +289,14 @@ | ||
### Emit `error` When Asked | ||
The most recent error object passed to the `'error'` event is | ||
stored on the stream. If a new `'error'` event handler is added, | ||
and an error was previously emitted, then the event handler will | ||
be called immediately (or on `process.nextTick` in the case of | ||
async streams). | ||
This makes it much more difficult to end up trying to interact | ||
with a broken stream, if the error handler is added after an | ||
error was previously emitted. | ||
### Impact of "immediate flow" on Tee-streams | ||
@@ -229,3 +326,3 @@ | ||
The solution is to create a dedicated tee-stream junction that pipes to | ||
One solution is to create a dedicated tee-stream junction that pipes to | ||
both locations, and then pipe to _that_ instead. | ||
@@ -267,2 +364,9 @@ | ||
All of the hazards in this section are avoided by setting `{ | ||
async: true }` in the Minipass constructor, or by setting | ||
`stream.async = true` afterwards. Note that this does add some | ||
overhead, so should only be done in cases where you are willing | ||
to lose a bit of performance in order to avoid having to refactor | ||
program logic. | ||
## USAGE | ||
@@ -269,0 +373,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
41954
527
718