Join our webinar on Wednesday, June 26, at 1pm EDTHow Chia Mitigates Risk in the Crypto Industry.Register
Socket
Socket
Sign inDemoInstall

minipass

Package Overview
Dependencies
1
Maintainers
7
Versions
94
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.1.6 to 3.2.0

257

index.js

@@ -8,3 +8,2 @@ 'use strict'

const Stream = require('stream')
const Yallist = require('yallist')
const SD = require('string_decoder').StringDecoder

@@ -31,3 +30,8 @@

const DESTROYED = Symbol('destroyed')
const EMITDATA = Symbol('emitData')
const EMITEND = Symbol('emitData')
const ASYNC = Symbol('async')
const defer = fn => process.nextTick(fn)
// TODO remove when Node v8 support drops

@@ -56,2 +60,17 @@ const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'

class Pipe {
constructor (src, dest, opts) {
this.dest = dest
this.opts = opts
this.ondrain = () => src[RESUME]()
dest.on('drain', this.ondrain)
}
end () {
if (this.opts.end)
this.dest.end()
this.dest.removeListener('drain', this.ondrain)
}
}
module.exports = class Minipass extends Stream {

@@ -63,4 +82,4 @@ constructor (options) {

this[PAUSED] = false
this.pipes = new Yallist()
this.buffer = new Yallist()
this.pipes = []
this.buffer = []
this[OBJECTMODE] = options && options.objectMode || false

@@ -73,2 +92,3 @@ if (this[OBJECTMODE])

this[ENCODING] = null
this[ASYNC] = options && !!options.async || false
this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null

@@ -113,2 +133,5 @@ this[EOF] = false

get ['async'] () { return this[ASYNC] }
set ['async'] (a) { this[ASYNC] = this[ASYNC] || !!a }
write (chunk, encoding, cb) {

@@ -132,2 +155,4 @@ if (this[EOF])

const fn = this[ASYNC] ? defer : f => f()
// convert array buffers and typed array views into buffers

@@ -147,9 +172,30 @@ // at some point in the future, we may want to do the opposite!

// this ensures at this point that the chunk is a buffer or string
// handle object mode up front, since it's simpler
// this yields better performance, fewer checks later.
if (this[OBJECTMODE]) {
/* istanbul ignore if - maybe impossible? */
if (this.flowing && this[BUFFERLENGTH] !== 0)
this[FLUSH](true)
if (this.flowing)
this.emit('data', chunk)
else
this[BUFFERPUSH](chunk)
if (this[BUFFERLENGTH] !== 0)
this.emit('readable')
if (cb)
fn(cb)
return this.flowing
}
// at this point the chunk is a buffer or string
// don't buffer it up or send it to the decoder
if (!this.objectMode && !chunk.length) {
if (!chunk.length) {
if (this[BUFFERLENGTH] !== 0)
this.emit('readable')
if (cb)
cb()
fn(cb)
return this.flowing

@@ -160,3 +206,3 @@ }

// an empty buffer, skipping the buffer/decoder dance
if (typeof chunk === 'string' && !this[OBJECTMODE] &&
if (typeof chunk === 'string' &&
// unless it is a string already ready for us to use

@@ -170,16 +216,9 @@ !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {

if (this.flowing) {
// if we somehow have something in the buffer, but we think we're
// flowing, then we need to flush all that out first, or we get
// chunks coming in out of order. Can't emit 'drain' here though,
// because we're mid-write, so that'd be bad.
if (this[BUFFERLENGTH] !== 0)
this[FLUSH](true)
// Note: flushing CAN potentially switch us into not-flowing mode
if (this.flowing && this[BUFFERLENGTH] !== 0)
this[FLUSH](true)
// if we are still flowing after flushing the buffer we can emit the
// chunk otherwise we have to buffer it.
this.flowing
? this.emit('data', chunk)
: this[BUFFERPUSH](chunk)
} else
if (this.flowing)
this.emit('data', chunk)
else
this[BUFFERPUSH](chunk)

@@ -191,3 +230,3 @@

if (cb)
cb()
fn(cb)

@@ -201,24 +240,20 @@ return this.flowing

try {
if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
return null
if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) {
this[MAYBE_EMIT_END]()
return null
}
if (this[OBJECTMODE])
n = null
if (this[OBJECTMODE])
n = null
if (this.buffer.length > 1 && !this[OBJECTMODE]) {
if (this.encoding)
this.buffer = new Yallist([
Array.from(this.buffer).join('')
])
else
this.buffer = new Yallist([
Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH])
])
}
if (this.buffer.length > 1 && !this[OBJECTMODE]) {
if (this.encoding)
this.buffer = [this.buffer.join('')]
else
this.buffer = [Buffer.concat(this.buffer, this[BUFFERLENGTH])]
}
return this[READ](n || null, this.buffer.head.value)
} finally {
this[MAYBE_EMIT_END]()
}
const ret = this[READ](n || null, this.buffer[0])
this[MAYBE_EMIT_END]()
return ret
}

@@ -230,3 +265,3 @@

else {
this.buffer.head.value = chunk.slice(n)
this.buffer[0] = chunk.slice(n)
chunk = chunk.slice(0, n)

@@ -307,3 +342,3 @@ this[BUFFERLENGTH] -= n

this[BUFFERLENGTH] += chunk.length
return this.buffer.push(chunk)
this.buffer.push(chunk)
}

@@ -316,3 +351,3 @@

else
this[BUFFERLENGTH] -= this.buffer.head.value.length
this[BUFFERLENGTH] -= this.buffer[0].length
}

@@ -344,10 +379,14 @@ return this.buffer.shift()

const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
this.pipes.push(p)
// piping an ended stream ends immediately
if (ended) {
if (opts.end)
dest.end()
} else {
this.pipes.push(new Pipe(this, dest, opts))
if (this[ASYNC])
defer(() => this[RESUME]())
else
this[RESUME]()
}
dest.on('drain', p.ondrain)
this[RESUME]()
// piping an ended stream ends immediately
if (ended && p.opts.end)
p.dest.end()
return dest

@@ -361,14 +400,17 @@ }

on (ev, fn) {
try {
return super.on(ev, fn)
} finally {
if (ev === 'data' && !this.pipes.length && !this.flowing)
this[RESUME]()
else if (isEndish(ev) && this[EMITTED_END]) {
super.emit(ev)
this.removeAllListeners(ev)
} else if (ev === 'error' && this[EMITTED_ERROR]) {
const ret = super.on(ev, fn)
if (ev === 'data' && !this.pipes.length && !this.flowing)
this[RESUME]()
else if (ev === 'readable' && this[BUFFERLENGTH] !== 0)
super.emit('readable')
else if (isEndish(ev) && this[EMITTED_END]) {
super.emit(ev)
this.removeAllListeners(ev)
} else if (ev === 'error' && this[EMITTED_ERROR]) {
if (this[ASYNC])
defer(() => fn.call(this, this[EMITTED_ERROR]))
else
fn.call(this, this[EMITTED_ERROR])
}
}
return ret
}

@@ -396,3 +438,3 @@

emit (ev, data) {
emit (ev, data, ...extra) {
// error and close are only events allowed after calling destroy()

@@ -402,29 +444,10 @@ if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])

else if (ev === 'data') {
if (!data)
return
if (this.pipes.length)
this.pipes.forEach(p =>
p.dest.write(data) === false && this.pause())
return !data ? false
: this[ASYNC] ? defer(() => this[EMITDATA](data))
: this[EMITDATA](data)
} else if (ev === 'end') {
// only actual end gets this treatment
if (this[EMITTED_END] === true)
return
this[EMITTED_END] = true
this.readable = false
if (this[DECODER]) {
data = this[DECODER].end()
if (data) {
this.pipes.forEach(p => p.dest.write(data))
super.emit('data', data)
}
}
this.pipes.forEach(p => {
p.dest.removeListener('drain', p.ondrain)
if (p.opts.end)
p.dest.end()
})
return this[EMITTED_END] ? false
: this[ASYNC] ? defer(() => this[EMITEND]())
: this[EMITEND]()
} else if (ev === 'close') {

@@ -435,24 +458,56 @@ this[CLOSED] = true

return
const ret = super.emit('close')
this.removeAllListeners('close')
return ret
} else if (ev === 'error') {
this[EMITTED_ERROR] = data
const ret = super.emit('error', data)
this[MAYBE_EMIT_END]()
return ret
} else if (ev === 'resume') {
const ret = super.emit('resume')
this[MAYBE_EMIT_END]()
return ret
} else if (ev === 'finish' || ev === 'prefinish') {
const ret = super.emit(ev)
this.removeAllListeners(ev)
return ret
}
// TODO: replace with a spread operator when Node v4 support drops
const args = new Array(arguments.length)
args[0] = ev
args[1] = data
if (arguments.length > 2) {
for (let i = 2; i < arguments.length; i++) {
args[i] = arguments[i]
// Some other unknown event
const ret = super.emit(ev, data, ...extra)
this[MAYBE_EMIT_END]()
return ret
}
[EMITDATA] (data) {
for (const p of this.pipes) {
if (p.dest.write(data) === false)
this.pause()
}
const ret = super.emit('data', data)
this[MAYBE_EMIT_END]()
return ret
}
[EMITEND] () {
this[EMITTED_END] = true
this.readable = false
if (this[DECODER]) {
const data = this[DECODER].end()
if (data) {
for (const p of this.pipes) {
p.dest.write(data)
}
super.emit('data', data)
}
}
try {
return super.emit.apply(this, args)
} finally {
if (!isEndish(ev))
this[MAYBE_EMIT_END]()
else
this.removeAllListeners(ev)
for (const p of this.pipes) {
p.end()
}
const ret = super.emit('end')
this.removeAllListeners('end')
return ret
}

@@ -559,3 +614,3 @@

// throw away all buffered data, it's never coming out
this.buffer = new Yallist()
this.buffer.length = 0
this[BUFFERLENGTH] = 0

@@ -562,0 +617,0 @@

{
"name": "minipass",
"version": "3.1.6",
"version": "3.2.0",
"description": "minimal implementation of a PassThrough stream",

@@ -11,3 +11,3 @@ "main": "index.js",

"end-of-stream": "^1.4.0",
"tap": "^15.0.9",
"tap": "^16.2.0",
"through2": "^2.0.3"

@@ -14,0 +14,0 @@ },

@@ -66,2 +66,6 @@ # minipass

You can avoid most of these differences entirely (for a very
small performance penalty) by setting `{async: true}` in the
constructor options.
### Timing

@@ -86,2 +90,78 @@

Example:
```js
const Minipass = require('minipass')
const stream = new Minipass({ async: true })
stream.on('data', () => console.log('data event'))
console.log('before write')
stream.write('hello')
console.log('after write')
// output:
// before write
// data event
// after write
```
### Exception: Async Opt-In
If you wish to have a Minipass stream with behavior that more
closely mimics Node.js core streams, you can set the stream in
async mode either by setting `async: true` in the constructor
options, or by setting `stream.async = true` later on.
```js
const Minipass = require('minipass')
const asyncStream = new Minipass({ async: true })
asyncStream.on('data', () => console.log('data event'))
console.log('before write')
asyncStream.write('hello')
console.log('after write')
// output:
// before write
// after write
// data event <-- this is deferred until the next tick
```
Switching _out_ of async mode is unsafe, as it could cause data
corruption, and so is not enabled. Example:
```js
const Minipass = require('minipass')
const stream = new Minipass({ encoding: 'utf8' })
stream.on('data', chunk => console.log(chunk))
stream.async = true
console.log('before writes')
stream.write('hello')
setStreamSyncAgainSomehow(stream) // <-- this doesn't actually exist!
stream.write('world')
console.log('after writes')
// hypothetical output would be:
// before writes
// world
// after writes
// hello
// NOT GOOD!
```
To avoid this problem, once set into async mode, any attempt to
make the stream sync again will be ignored.
```js
const Minipass = require('minipass')
const stream = new Minipass({ encoding: 'utf8' })
stream.on('data', chunk => console.log(chunk))
stream.async = true
console.log('before writes')
stream.write('hello')
stream.async = false // <-- no-op, stream already async
stream.write('world')
console.log('after writes')
// actual output:
// before writes
// after writes
// hello
// world
```
### No High/Low Water Marks

@@ -102,2 +182,5 @@

Since nothing is ever buffered unnecessarily, there is much less
copying data, and less bookkeeping about buffer capacity levels.
### Hazards of Buffering (or: Why Minipass Is So Fast)

@@ -187,2 +270,4 @@

However, this is _usually_ not a problem because:
### Emit `end` When Asked

@@ -204,2 +289,14 @@

### Emit `error` When Asked
The most recent error object passed to the `'error'` event is
stored on the stream. If a new `'error'` event handler is added,
and an error was previously emitted, then the event handler will
be called immediately (or on `process.nextTick` in the case of
async streams).
This makes it much more difficult to end up trying to interact
with a broken stream, if the error handler is added after an
error was previously emitted.
### Impact of "immediate flow" on Tee-streams

@@ -229,3 +326,3 @@

The solution is to create a dedicated tee-stream junction that pipes to
One solution is to create a dedicated tee-stream junction that pipes to
both locations, and then pipe to _that_ instead.

@@ -267,2 +364,9 @@

All of the hazards in this section are avoided by setting `{
async: true }` in the Minipass constructor, or by setting
`stream.async = true` afterwards. Note that this does add some
overhead, so should only be done in cases where you are willing
to lose a bit of performance in order to avoid having to refactor
program logic.
## USAGE

@@ -269,0 +373,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc