stream-chopper
Advanced tools
Comparing version 2.1.1 to 2.2.0
77
index.js
@@ -31,3 +31,8 @@ 'use strict' | ||
: opts.type | ||
this._transform = opts.transform | ||
if (this._transform && this.type === StreamChopper.split) { | ||
throw new Error('stream-chopper cannot split a transform stream') | ||
} | ||
this._bytes = 0 | ||
@@ -82,3 +87,16 @@ this._stream = null | ||
this._bytes = 0 | ||
this._stream = new PassThrough() | ||
if (this._transform) { | ||
this._stream = this._transform().once('resume', () => { | ||
// `resume` will be emitted before the first `data` event | ||
this._stream.on('data', chunk => { | ||
this._bytes += chunk.length | ||
this._maybeEndTransformSteam() | ||
}) | ||
}) | ||
} else { | ||
this._stream = new PassThrough() | ||
} | ||
this._stream | ||
.on('close', this._oneos) | ||
@@ -110,2 +128,14 @@ .on('error', this._oneos) | ||
StreamChopper.prototype._maybeEndTransformSteam = function () { | ||
if (this._stream === null) return | ||
// in case of backpresure on the transform stream, count how many bytes are | ||
// buffered | ||
const bufferedSize = getBufferedSize(this._stream) | ||
const overflow = (this._bytes + bufferedSize) - this.size | ||
if (overflow >= 0) this._endStream() | ||
} | ||
StreamChopper.prototype.resetTimer = function (time) { | ||
@@ -148,10 +178,13 @@ if (arguments.length > 0) this.time = time | ||
if (this._stream === null) return | ||
const stream = this._stream | ||
this._stream = null | ||
if (this._timer !== null) clearTimeout(this._timer) | ||
if (this._stream._writableState.needDrain) this._ondrain() | ||
this._stream.removeListener('error', this._oneos) | ||
this._stream.removeListener('close', this._oneos) | ||
this._stream.removeListener('finish', this._oneos) | ||
this._stream.removeListener('end', this._oneos) | ||
this._stream.removeListener('drain', this._ondrain) | ||
this._stream = null | ||
if (stream._writableState.needDrain) this._ondrain() | ||
stream.removeListener('error', this._oneos) | ||
stream.removeListener('close', this._oneos) | ||
stream.removeListener('finish', this._oneos) | ||
stream.removeListener('end', this._oneos) | ||
stream.removeListener('drain', this._ondrain) | ||
} | ||
@@ -177,2 +210,13 @@ | ||
if (this._transform) { | ||
// The size of a transform stream is counted post-transform and so the size | ||
// guard is located elsewhere. We can therefore just write to the stream | ||
// without any checks. | ||
this._unprotectedWrite(chunk, enc, cb) | ||
} else { | ||
this._protectedWrite(chunk, enc, cb) | ||
} | ||
} | ||
StreamChopper.prototype._protectedWrite = function (chunk, enc, cb) { | ||
this._bytes += chunk.length | ||
@@ -201,5 +245,3 @@ | ||
if (overflow < 0) { | ||
if (this._stream.write(chunk) === false) this._draining = true | ||
if (this._draining === false) cb() | ||
else this._next = cb | ||
this._unprotectedWrite(chunk, enc, cb) | ||
} else { | ||
@@ -212,2 +254,8 @@ // if we reached the size limit, just end the stream already | ||
StreamChopper.prototype._unprotectedWrite = function (chunk, enc, cb) { | ||
if (this._stream.write(chunk) === false) this._draining = true | ||
if (this._draining === false) cb() | ||
else this._next = cb | ||
} | ||
StreamChopper.prototype._destroy = function (err, cb) { | ||
@@ -241,1 +289,8 @@ if (this._destroyed) return | ||
function noop () {} | ||
function getBufferedSize (stream) { | ||
const buffer = stream.writableBuffer || stream._writableState.getBuffer() | ||
return buffer.reduce((total, b) => { | ||
return total + b.chunk.length | ||
}, 0) | ||
} |
{ | ||
"name": "stream-chopper", | ||
"version": "2.1.1", | ||
"version": "2.2.0", | ||
"description": "Chop a single stream of data into a series of readable streams", | ||
@@ -47,5 +47,5 @@ "main": "index.js", | ||
"coordinates": [ | ||
55.777472, | ||
12.592148 | ||
55.778266, | ||
12.593123 | ||
] | ||
} |
@@ -100,2 +100,7 @@ # stream-chopper | ||
the entire chunk to the next stream | ||
- `transform` - An optional function that returns a transform stream | ||
used for transforming the data in some way (e.g. a zlib Gzip stream). | ||
If used, the `size` option will count towards the size of the output | ||
chunks. This config option cannot be used together with the | ||
`StreamChopper.split` type | ||
@@ -102,0 +107,0 @@ If `type` is `StreamChopper.underflow` and the size of the chunk to be |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
15995
234
189