readable-stream
Advanced tools
Comparing version 0.0.2 to 0.0.3
76
fs.js
@@ -7,11 +7,2 @@ 'use strict'; | ||
// implement a read-method style readable stream. | ||
// | ||
// In a perfect world, some of this dancing and buffering would | ||
// not be necessary; we could just open the file using async IO, | ||
// and then read() synchronously until we raise EWOULDBLOCK. | ||
// | ||
// It a just-slightly-less imperfect world, FS readable streams | ||
// would be the *only* stream that implements this kind of buffering | ||
// behavior, since TCP and pipes can be reliably implemented in this | ||
// fashion at a much lower level. | ||
@@ -38,21 +29,17 @@ var Readable = require('./readable.js'); | ||
function FSReadable(path, options) { | ||
if (!options) options = {}; | ||
if (!options) | ||
options = {}; | ||
Readable.apply(this, options); | ||
Readable.call(this, options); | ||
var state = this._readableState; | ||
this.path = path; | ||
this.flags = 'r'; | ||
this.mode = 438; //=0666 | ||
this.fd = null; | ||
this.bufferSize = 64 * 1024; | ||
this.lowWaterMark = 16 * 1024; | ||
this.flags = options.flags || 'r'; | ||
this.mode = options.mode || 438; //=0666 | ||
this.fd = options.fd || null; | ||
Object.keys(options).forEach(function(k) { | ||
this[k] = options[k]; | ||
}, this); | ||
// a little bit bigger buffer and watermark by default | ||
state.bufferSize = options.bufferSize || 64 * 1024; | ||
state.lowWaterMark = options.lowWaterMark || 16 * 1024; | ||
// cast to an int | ||
assert(typeof this.bufferSize === 'number'); | ||
this.bufferSize = ~~this.bufferSize; | ||
if (this.encoding) { | ||
@@ -64,11 +51,10 @@ this._decoder = new StringDecoder(this.encoding); | ||
if (typeofStart !== 'undefined') { | ||
if (typeofStart !== 'number') { | ||
if (typeofStart !== 'number') | ||
throw new TypeError('start must be a Number'); | ||
} | ||
var typeofEnd = typeof this.end; | ||
if (typeofEnd === 'undefined') { | ||
if (typeofEnd === 'undefined') | ||
this.end = Infinity; | ||
} else if (typeofEnd !== 'number') { | ||
else if (typeofEnd !== 'number') | ||
throw new TypeError('end must be a Number'); | ||
} | ||
@@ -78,5 +64,4 @@ this.pos = this.start; | ||
if (typeof this.fd !== 'number') { | ||
if (typeof this.fd !== 'number') | ||
this.open(); | ||
} | ||
} | ||
@@ -95,3 +80,3 @@ | ||
}.bind(this)); | ||
} | ||
}; | ||
@@ -104,4 +89,4 @@ FSReadable.prototype._read = function(n, cb) { | ||
if (this.reading || this.ended || this.destroyed) return; | ||
this.reading = true; | ||
if (this.destroyed) | ||
return; | ||
@@ -119,8 +104,7 @@ if (!pool || pool.length - pool.used < minPoolSpace) { | ||
if (this.pos !== undefined) { | ||
if (this.pos !== undefined) | ||
toRead = Math.min(this.end - this.pos + 1, toRead); | ||
} | ||
if (toRead <= 0) { | ||
this.reading = false; | ||
this.emit('readable'); | ||
@@ -133,4 +117,2 @@ return; | ||
function onread(er, bytesRead) { | ||
this.reading = false; | ||
if (er) { | ||
@@ -142,13 +124,15 @@ this.destroy(); | ||
var b = null; | ||
if (bytesRead > 0) { | ||
if (bytesRead > 0) | ||
b = thisPool.slice(start, start + bytesRead); | ||
} | ||
cb(null, b); | ||
} | ||
} | ||
}; | ||
FSReadable.prototype.close = function(cb) { | ||
if (cb) this.once('close', cb); | ||
if (cb) | ||
this.once('close', cb); | ||
if (this.closed || this.fd === null) { | ||
if (this.fd === null) this.once('open', this.destroy); | ||
if (this.fd === null) | ||
this.once('open', this.destroy); | ||
return process.nextTick(this.emit.bind(this, 'close')); | ||
@@ -159,4 +143,6 @@ } | ||
fs.close(this.fd, function(er) { | ||
if (er) this.emit('error', er); | ||
else this.emit('close'); | ||
if (er) | ||
this.emit('error', er); | ||
else | ||
this.emit('close'); | ||
}.bind(this)); | ||
@@ -163,0 +149,0 @@ }; |
{ | ||
"name": "readable-stream", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"description": "An exploration of a new kind of readable streams for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "readable.js", |
@@ -1,56 +0,41 @@ | ||
'use strict'; | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// a passthrough stream. | ||
// whatever you .write(), you can then .read() later. | ||
// this is not very useful on its own, but it's a handy | ||
// base class for certain sorts of simple filters and | ||
// transforms. | ||
// basically just the most minimal sort of Transform stream. | ||
// Every written chunk gets output as-is. | ||
module.exports = PassThrough; | ||
var Readable = require('./readable.js'); | ||
var Transform = require('./transform.js'); | ||
var util = require('util'); | ||
util.inherits(PassThrough, Transform); | ||
util.inherits(PassThrough, Readable); | ||
function PassThrough(options) { | ||
if (!(this instanceof PassThrough)) | ||
return new PassThrough(options); | ||
var fromList = require('./from-list.js'); | ||
function PassThrough() { | ||
Readable.apply(this); | ||
this.buffer = []; | ||
this.length = 0; | ||
Transform.call(this, options); | ||
} | ||
// override this: | ||
PassThrough.prototype.transform = function(c) { | ||
return c; | ||
PassThrough.prototype._transform = function(chunk, output, cb) { | ||
cb(null, chunk); | ||
}; | ||
PassThrough.prototype.write = function(c) { | ||
var needEmitReadable = this.length === 0; | ||
c = this.transform(c); | ||
if (!c || !c.length) return true; | ||
this.buffer.push(c); | ||
this.length += c.length; | ||
if (needEmitReadable) this.emit('readable'); | ||
return (this.length === 0); | ||
}; | ||
PassThrough.prototype.end = function(c) { | ||
this.ended = true; | ||
if (c && c.length) this.write(c); | ||
else if (!this.length) this.emit('end'); | ||
}; | ||
PassThrough.prototype.read = function(n) { | ||
if (!n || n >= this.length) n = this.length; | ||
var ret = fromList(n, this.buffer, this.length); | ||
this.length = Math.max(this.length - n, 0); | ||
if (this.length === 0) { | ||
var ev = this.ended ? 'end' : 'drain'; | ||
process.nextTick(this.emit.bind(this, ev)); | ||
} | ||
return ret; | ||
}; |
451
readable.js
@@ -1,2 +0,21 @@ | ||
'use strict'; | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
@@ -7,49 +26,190 @@ module.exports = Readable; | ||
var util = require('util'); | ||
var fromList = require('./from-list.js'); | ||
var assert = require('assert'); | ||
var StringDecoder; | ||
util.inherits(Readable, Stream); | ||
function Readable(options) { | ||
function ReadableState(options) { | ||
options = options || {}; | ||
this.bufferSize = options.bufferSize || 16 * 1024; | ||
this.lowWaterMark = options.lowWaterMark || 1024; | ||
assert(typeof this.bufferSize === 'number'); | ||
// cast to an int | ||
this.bufferSize = ~~this.bufferSize; | ||
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ? | ||
options.lowWaterMark : 1024; | ||
this.buffer = []; | ||
this.length = 0; | ||
this._pipes = []; | ||
this._flowing = false; | ||
this.pipes = []; | ||
this.flowing = false; | ||
this.ended = false; | ||
this.endEmitted = false; | ||
this.reading = false; | ||
// whenever we return null, then we set a flag to say | ||
// that we're awaiting a 'readable' event emission. | ||
this.needReadable = false; | ||
this.decoder = null; | ||
if (options.encoding) { | ||
if (!StringDecoder) | ||
StringDecoder = require('string_decoder').StringDecoder; | ||
this.decoder = new StringDecoder(options.encoding); | ||
} | ||
} | ||
function Readable(options) { | ||
if (!(this instanceof Readable)) | ||
return new Readable(options); | ||
this._readableState = new ReadableState(options, this); | ||
Stream.apply(this); | ||
} | ||
// backwards compatibility. | ||
Readable.prototype.setEncoding = function(enc) { | ||
if (!StringDecoder) | ||
StringDecoder = require('string_decoder').StringDecoder; | ||
this._readableState.decoder = new StringDecoder(enc); | ||
}; | ||
function howMuchToRead(n, state) { | ||
if (state.length === 0 && state.ended) | ||
return 0; | ||
if (isNaN(n)) | ||
return state.length; | ||
if (n <= 0) | ||
return 0; | ||
// don't have that much. return null, unless we've ended. | ||
if (n > state.length) { | ||
if (!state.ended) { | ||
state.needReadable = true; | ||
return 0; | ||
} else | ||
return state.length; | ||
} | ||
return n; | ||
} | ||
// you can override either this method, or _read(n, cb) below. | ||
Readable.prototype.read = function(n) { | ||
if (this.length === 0 && this.ended) { | ||
process.nextTick(this.emit.bind(this, 'end')); | ||
var state = this._readableState; | ||
var nOrig = n; | ||
n = howMuchToRead(n, state); | ||
// if we've ended, and we're now clear, then finish it up. | ||
if (n === 0 && state.ended) { | ||
endReadable(this); | ||
return null; | ||
} | ||
if (isNaN(n) || n <= 0) n = this.length; | ||
n = Math.min(n, this.length); | ||
// All the actual chunk generation logic needs to be | ||
// *below* the call to _read. The reason is that in certain | ||
// synthetic stream cases, such as passthrough streams, _read | ||
// may be a completely synchronous operation which may change | ||
// the state of the read buffer, providing enough data when | ||
// before there was *not* enough. | ||
// | ||
// So, the steps are: | ||
// 1. Figure out what the state of things will be after we do | ||
// a read from the buffer. | ||
// | ||
// 2. If that resulting state will trigger a _read, then call _read. | ||
// Note that this may be asynchronous, or synchronous. Yes, it is | ||
// deeply ugly to write APIs this way, but that still doesn't mean | ||
// that the Readable class should behave improperly, as streams are | ||
// designed to be sync/async agnostic. | ||
// Take note if the _read call is sync or async (ie, if the read call | ||
// has returned yet), so that we know whether or not it's safe to emit | ||
// 'readable' etc. | ||
// | ||
// 3. Actually pull the requested chunks out of the buffer and return. | ||
var ret = n > 0 ? fromList(n, this.buffer, this.length) : null; | ||
this.length -= n; | ||
// if we need a readable event, then we need to do some reading. | ||
var doRead = state.needReadable; | ||
// if we currently have less than the lowWaterMark, then also read some | ||
if (state.length - n <= state.lowWaterMark) | ||
doRead = true; | ||
// however, if we've ended, then there's no point, and if we're already | ||
// reading, then it's unnecessary. | ||
if (state.ended || state.reading) | ||
doRead = false; | ||
if (!this.ended && this.length < this.lowWaterMark) { | ||
this._read(this.bufferSize, function onread(er, chunk) { | ||
if (er) return this.emit('error', er); | ||
if (doRead) { | ||
var sync = true; | ||
state.reading = true; | ||
// call internal read method | ||
this._read(state.bufferSize, function onread(er, chunk) { | ||
state.reading = false; | ||
if (er) | ||
return this.emit('error', er); | ||
if (!chunk || !chunk.length) { | ||
this.ended = true; | ||
if (this.length === 0) this.emit('end'); | ||
// eof | ||
state.ended = true; | ||
// if we've ended and we have some data left, then emit | ||
// 'readable' now to make sure it gets picked up. | ||
if (!sync) { | ||
if (state.length > 0) | ||
this.emit('readable'); | ||
else | ||
endReadable(this); | ||
} | ||
return; | ||
} | ||
this.length += chunk.length; | ||
this.buffer.push(chunk); | ||
if (this.length < this.lowWaterMark) { | ||
this._read(this.bufferSize, onread.bind(this)); | ||
if (state.decoder) | ||
chunk = state.decoder.write(chunk); | ||
// update the buffer info. | ||
if (chunk) { | ||
state.length += chunk.length; | ||
state.buffer.push(chunk); | ||
} | ||
this.emit('readable'); | ||
// if we haven't gotten enough to pass the lowWaterMark, | ||
// and we haven't ended, then don't bother telling the user | ||
// that it's time to read more data. Otherwise, that'll | ||
// probably kick off another stream.read(), which can trigger | ||
// another _read(n,cb) before this one returns! | ||
if (state.length <= state.lowWaterMark) { | ||
state.reading = true; | ||
this._read(state.bufferSize, onread.bind(this)); | ||
return; | ||
} | ||
if (state.needReadable && !sync) { | ||
state.needReadable = false; | ||
this.emit('readable'); | ||
} | ||
}.bind(this)); | ||
sync = false; | ||
} | ||
// If _read called its callback synchronously, then `reading` | ||
// will be false, and we need to re-evaluate how much data we | ||
// can return to the user. | ||
if (doRead && !state.reading) | ||
n = howMuchToRead(nOrig, state); | ||
var ret; | ||
if (n > 0) | ||
ret = fromList(n, state.buffer, state.length, !!state.decoder); | ||
else | ||
ret = null; | ||
if (ret === null || ret.length === 0) { | ||
state.needReadable = true; | ||
n = 0; | ||
} | ||
state.length -= n; | ||
return ret; | ||
@@ -59,2 +219,5 @@ }; | ||
// abstract method. to be overridden in specific implementation classes. | ||
// call cb(er, data) where data is <= n in length. | ||
// for virtual (non-string, non-buffer) streams, "length" is somewhat | ||
// arbitrary, and perhaps not very meaningful. | ||
Readable.prototype._read = function(n, cb) { | ||
@@ -64,32 +227,48 @@ process.nextTick(cb.bind(this, new Error('not implemented'))); | ||
Readable.prototype.pipe = function(dest, opt) { | ||
Readable.prototype.pipe = function(dest, pipeOpts) { | ||
var src = this; | ||
src._pipes.push(dest); | ||
if ((!opt || opt.end !== false) && | ||
dest !== process.stdout && | ||
var state = this._readableState; | ||
if (!pipeOpts) | ||
pipeOpts = {}; | ||
state.pipes.push(dest); | ||
if ((!pipeOpts || pipeOpts.end !== false) && | ||
dest !== process.stdout && | ||
dest !== process.stderr) { | ||
src.once('end', onend); | ||
dest.on('unpipe', function(readable) { | ||
if (readable === src) { | ||
if (readable === src) | ||
src.removeListener('end', onend); | ||
} | ||
}); | ||
} | ||
dest.emit('pipe', src); | ||
if (!src._flowing) process.nextTick(flow.bind(src)); | ||
return dest; | ||
function onend() { | ||
dest.end(); | ||
} | ||
dest.emit('pipe', src); | ||
// start the flow. | ||
if (!state.flowing) { | ||
state.flowing = true; | ||
process.nextTick(flow.bind(null, src, pipeOpts)); | ||
} | ||
return dest; | ||
}; | ||
function flow(src) { | ||
if (!src) src = this; | ||
function flow(src, pipeOpts) { | ||
var state = src._readableState; | ||
var chunk; | ||
var dest; | ||
var needDrain = 0; | ||
while (chunk = src.read()) { | ||
src._pipes.forEach(function(dest, i, list) { | ||
function ondrain() { | ||
needDrain--; | ||
if (needDrain === 0) | ||
flow(src, pipeOpts); | ||
} | ||
while (state.pipes.length && | ||
null !== (chunk = src.read(pipeOpts.chunkSize))) { | ||
state.pipes.forEach(function(dest, i, list) { | ||
var written = dest.write(chunk); | ||
@@ -101,27 +280,40 @@ if (false === written) { | ||
}); | ||
if (needDrain > 0) return; | ||
src.emit('data', chunk); | ||
// if anyone needs a drain, then we have to wait for that. | ||
if (needDrain > 0) | ||
return; | ||
} | ||
src.once('readable', flow); | ||
// if every destination was unpiped, either before entering this | ||
// function, or in the while loop, then stop flowing. | ||
// | ||
// NB: This is a pretty rare edge case. | ||
if (state.pipes.length === 0) { | ||
state.flowing = false; | ||
function ondrain() { | ||
needDrain--; | ||
if (needDrain === 0) { | ||
flow(src); | ||
} | ||
// if there were data event listeners added, then switch to old mode. | ||
if (this.listeners('data').length) | ||
emitDataEvents(this); | ||
return; | ||
} | ||
// at this point, no one needed a drain, so we just ran out of data | ||
// on the next readable event, start it over again. | ||
src.once('readable', flow.bind(null, src, pipeOpts)); | ||
} | ||
Readable.prototype.unpipe = function(dest) { | ||
var state = this._readableState; | ||
if (!dest) { | ||
// remove all of them. | ||
this._pipes.forEach(function(dest, i, list) { | ||
state.pipes.forEach(function(dest, i, list) { | ||
dest.emit('unpipe', this); | ||
}, this); | ||
this._pipes.length = 0; | ||
state.pipes.length = 0; | ||
} else { | ||
var i = this._pipes.indexOf(dest); | ||
var i = state.pipes.indexOf(dest); | ||
if (i !== -1) { | ||
dest.emit('unpipe', this); | ||
this._pipes.splice(i, 1); | ||
state.pipes.splice(i, 1); | ||
} | ||
@@ -136,3 +328,7 @@ } | ||
Readable.prototype.on = function(ev, fn) { | ||
if (ev === 'data') emitDataEvents(this); | ||
// https://github.com/isaacs/readable-stream/issues/16 | ||
// if we're already flowing, then no need to set up data events. | ||
if (ev === 'data' && !this._readableState.flowing) | ||
emitDataEvents(this); | ||
return Stream.prototype.on.call(this, ev, fn); | ||
@@ -142,3 +338,22 @@ }; | ||
// pause() and resume() are remnants of the legacy readable stream API | ||
// If the user uses them, then switch into old mode. | ||
Readable.prototype.resume = function() { | ||
emitDataEvents(this); | ||
return this.resume(); | ||
}; | ||
Readable.prototype.pause = function() { | ||
emitDataEvents(this); | ||
return this.pause(); | ||
}; | ||
function emitDataEvents(stream) { | ||
var state = stream._readableState; | ||
if (state.flowing) { | ||
// https://github.com/isaacs/readable-stream/issues/16 | ||
throw new Error('Cannot switch to old mode now.'); | ||
} | ||
var paused = false; | ||
@@ -155,6 +370,9 @@ var readable = false; | ||
var c; | ||
while (!paused && (c = stream.read())) { | ||
while (!paused && (null !== (c = stream.read()))) | ||
stream.emit('data', c); | ||
if (c === null) { | ||
readable = false; | ||
stream._readableState.needReadable = true; | ||
} | ||
if (c === null) readable = false; | ||
}); | ||
@@ -168,28 +386,32 @@ | ||
paused = false; | ||
if (readable) stream.emit('readable'); | ||
if (readable) | ||
stream.emit('readable'); | ||
}; | ||
// now make it start, just in case it hadn't already. | ||
process.nextTick(function() { | ||
stream.emit('readable'); | ||
}); | ||
} | ||
// wrap an old-style stream | ||
// wrap an old-style stream as the async data source. | ||
// This is *not* part of the readable stream interface. | ||
// It is an ugly unfortunate mess of history. | ||
Readable.prototype.wrap = function(stream) { | ||
this.buffer = []; | ||
this.length = 0; | ||
var state = this._readableState; | ||
var paused = false; | ||
var ended = false; | ||
stream.on('end', function() { | ||
ended = true; | ||
if (this.length === 0) { | ||
this.emit('end'); | ||
} | ||
state.ended = true; | ||
if (state.length === 0) | ||
endReadable(this); | ||
}.bind(this)); | ||
stream.on('data', function(chunk) { | ||
this.buffer.push(chunk); | ||
this.length += chunk.length; | ||
state.buffer.push(chunk); | ||
state.length += chunk.length; | ||
this.emit('readable'); | ||
// if not consumed, then pause the stream. | ||
if (this.length > this.lowWaterMark && !paused) { | ||
if (state.length > state.lowWaterMark && !paused) { | ||
paused = true; | ||
@@ -220,10 +442,22 @@ stream.pause(); | ||
this.read = function(n) { | ||
if (this.length === 0) return null; | ||
if (state.length === 0) { | ||
state.needReadable = true; | ||
return null; | ||
} | ||
if (isNaN(n) || n <= 0) n = this.length; | ||
if (isNaN(n) || n <= 0) | ||
n = state.length; | ||
var ret = fromList(n, this.buffer, this.length); | ||
this.length = Math.max(0, this.length - n); | ||
if (n > state.length) { | ||
if (!state.ended) { | ||
state.needReadable = true; | ||
return null; | ||
} else | ||
n = state.length; | ||
} | ||
if (this.length < this.lowWaterMark && paused) { | ||
var ret = fromList(n, state.buffer, state.length, !!state.decoder); | ||
state.length -= n; | ||
if (state.length <= state.lowWaterMark && paused) { | ||
stream.resume(); | ||
@@ -233,7 +467,82 @@ paused = false; | ||
if (this.length === 0 && ended) { | ||
process.nextTick(this.emit.bind(this, 'end')); | ||
} | ||
if (state.length === 0 && state.ended) | ||
endReadable(this); | ||
return ret; | ||
}; | ||
}; | ||
// exposed for testing purposes only. | ||
Readable._fromList = fromList; | ||
// Pluck off n bytes from an array of buffers. | ||
// Length is the combined lengths of all the buffers in the list. | ||
function fromList(n, list, length, stringMode) { | ||
var ret; | ||
// nothing in the list, definitely empty. | ||
if (list.length === 0) { | ||
return null; | ||
} | ||
if (length === 0) | ||
ret = null; | ||
else if (!n || n >= length) { | ||
// read it all, truncate the array. | ||
if (stringMode) | ||
ret = list.join(''); | ||
else | ||
ret = Buffer.concat(list, length); | ||
list.length = 0; | ||
} else { | ||
// read just some of it. | ||
if (n < list[0].length) { | ||
// just take a part of the first list item. | ||
// slice is the same for buffers and strings. | ||
var buf = list[0]; | ||
ret = buf.slice(0, n); | ||
list[0] = buf.slice(n); | ||
} else if (n === list[0].length) { | ||
// first list is a perfect match | ||
ret = list.shift(); | ||
} else { | ||
// complex case. | ||
// we have enough to cover it, but it spans past the first buffer. | ||
if (stringMode) | ||
ret = ''; | ||
else | ||
ret = new Buffer(n); | ||
var c = 0; | ||
for (var i = 0, l = list.length; i < l && c < n; i++) { | ||
var buf = list[0]; | ||
var cpy = Math.min(n - c, buf.length); | ||
if (stringMode) | ||
ret += buf.slice(0, cpy); | ||
else | ||
buf.copy(ret, c, 0, cpy); | ||
if (cpy < buf.length) | ||
list[0] = buf.slice(cpy); | ||
else | ||
list.shift(); | ||
c += cpy; | ||
} | ||
} | ||
} | ||
return ret; | ||
} | ||
function endReadable(stream) { | ||
var state = stream._readableState; | ||
if (state.endEmitted) | ||
return; | ||
state.ended = true; | ||
state.endEmitted = true; | ||
process.nextTick(stream.emit.bind(stream, 'end')); | ||
} |
461
README.md
@@ -12,40 +12,18 @@ # readable-stream | ||
## Usage | ||
Note that Duplex, Transform, Writable, and PassThrough streams are also | ||
provided as base classes. See the full API details below. | ||
```javascript | ||
var Readable = require('readable-stream'); | ||
var r = new Readable(); | ||
## Justification | ||
r.read = function(n) { | ||
// your magic goes here. | ||
// return n bytes, or null if there is nothing to be read. | ||
// if you return null, then you MUST emit 'readable' at some | ||
// point in the future if there are bytes available, or 'end' | ||
// if you are not going to have any more data. | ||
// | ||
// You MUST NOT emit either 'end' or 'readable' before | ||
// returning from this function, but you MAY emit 'end' or | ||
// 'readable' in process.nextTick(). | ||
}; | ||
<!-- misc --> | ||
r.on('end', function() { | ||
// no more bytes will be provided. | ||
}); | ||
Writable streams in node are relatively straightforward to use and | ||
extend. The `write` method either returns `false` if you would like | ||
the user to back off a bit, in which case a `drain` event at some | ||
point in the future will let them continue writing, or anything other | ||
than false if the bytes could be completely handled and another | ||
`write` should be performed, or The `end()` method lets the user | ||
indicate that no more bytes will be written. That's pretty much the | ||
entire required interface for writing. | ||
r.on('readable', function() { | ||
// now is the time to call read() again. | ||
}); | ||
``` | ||
## Justification | ||
Writable streams in node are very straightforward to use and extend. | ||
The `write` method either returns `true` if the bytes could be | ||
completely handled and another `write` should be performed, or `false` | ||
if you would like the user to back off a bit, in which case a `drain` | ||
event at some point in the future will let them continue writing. The | ||
`end()` method lets the user indicate that no more bytes will be | ||
written. That's pretty much the entire required interface for | ||
writing. | ||
However, readable streams in Node 0.8 and before are rather | ||
@@ -60,2 +38,5 @@ complicated. | ||
buffering yourself. | ||
3. In many streams, `pause()` was purely advisory, so **even while | ||
paused**, you still have to be careful that you might get some | ||
data. This caused a lot of subtle b ugs. | ||
@@ -70,7 +51,10 @@ So, while writers only have to implement `write()`, `end()`, and | ||
And read consumers had to always be prepared for their backpressure | ||
advice to simply be ignored. | ||
If you are using a readable stream, and want to just get the first 10 | ||
bytes, make a decision, and then pass the rest off to somewhere else, | ||
then you have to handle buffering, pausing, and so on. This is all | ||
rather brittle and easy to get wrong for all but the most trivial use | ||
cases. | ||
then you have to handle buffering, pausing, slicing, and so on. This | ||
is all rather brittle and easy to get wrong for all but the most | ||
trivial use cases. | ||
@@ -84,6 +68,9 @@ Additionally, this all made the `reader.pipe(writer)` method | ||
<!-- misc --> | ||
The reader does not have pause/resume methods. If you want to consume | ||
the bytes, you call `read()`. If bytes are not being consumed, then | ||
effectively the stream is in a paused state. It exerts backpressure | ||
on upstream connections, doesn't read from files, etc. | ||
on upstream connections, doesn't read from files, etc. Any data that | ||
was already in the process of being read will be placed in a buffer. | ||
@@ -94,6 +81,9 @@ If `read()` returns `null`, then a future `readable` event will be | ||
This is simpler and conceptually closer to the underlying mechanisms. | ||
The resulting `pipe()` method is much shorter and simpler. | ||
The resulting `pipe()` method is much shorter and simpler. The | ||
problems of data events happening while paused are alleviated. | ||
### Compatibility | ||
<!-- misc --> | ||
It's not particularly difficult to wrap older-style streams in this | ||
@@ -103,13 +93,21 @@ new interface, or to wrap this type of stream in the older-style | ||
The `Readable` class takes an argument which is an old-style stream | ||
with `data` events and `pause()` and `resume()` methods, and uses that | ||
as the data source. For example: | ||
The `Readable` class provides a `wrap(oldStream)` method that takes an | ||
argument which is an old-style stream with `data` events and `pause()` | ||
and `resume()` methods, and uses that as the data source. For | ||
example: | ||
```javascript | ||
var r = new Readable(oldReadableStream); | ||
var r = new Readable(); | ||
r.wrap(oldReadableStream); | ||
// now you can use r.read(), and it will emit 'readable' events | ||
// but the data is based on whatever oldReadableStream spits out of | ||
// its 'data' events. | ||
``` | ||
The `Readable` class will also automatically convert into an old-style | ||
In order to work with programs that use the older interface, some | ||
magic is unfortunately required. At some point in the future, this | ||
magic will be removed. | ||
The `Readable` class will automatically convert into an old-style | ||
`data`-emitting stream if any listeners are added to the `data` event. | ||
@@ -124,4 +122,9 @@ So, this works fine, though you of course lose a lot of the benefits of | ||
// ... | ||
// magic is happening! oh no! the animals are walking upright! | ||
// the brooms are sweeping the floors all by themselves! | ||
}); | ||
// this will also turn on magic-mode: | ||
r.pause(); | ||
// now pause, resume, etc. are patched into place, and r will | ||
@@ -135,1 +138,371 @@ // continually call read() until it returns null, emitting the | ||
``` | ||
## Class: Readable | ||
A base class for implementing Readable streams. Override the | ||
`_read(n,cb)` method to fetch data asynchronously and take advantage | ||
of the buffering built into the Readable class. | ||
### Example | ||
Extend the Readable class, and provide a `_read(n,cb)` implementation | ||
method. | ||
```javascript | ||
var Readable = require('readable-stream'); | ||
var util = require('util'); | ||
util.inherits(MyReadable, Readable); | ||
function MyReadable(options) { | ||
Readable.call(this, options); | ||
} | ||
MyReadable.prototype._read = function(n, cb) { | ||
// your magic goes here. | ||
// call the cb at some time in the future with either up to n bytes, | ||
// or an error, like cb(err, resultData) | ||
// | ||
// The code in the Readable class will call this to keep an internal | ||
// buffer at a healthy level, as the user calls var chunk=stream.read(n) | ||
// to consume chunks. | ||
}; | ||
var r = new MyReadable(); | ||
r.on('end', function() { | ||
// no more bytes will be provided. | ||
}); | ||
r.on('readable', function() { | ||
// now is the time to call read() again. | ||
}); | ||
// to get some bytes out of it: | ||
var data = r.read(optionalLengthArgument); | ||
// now data is either null, or a buffer of optionalLengthArgument | ||
// length. If you don't provide an argument, then it returns whatever | ||
// it has. | ||
// typically you'd just r.pipe() into some writable stream, but you | ||
// can of course do stuff like this, as well: | ||
function flow() { | ||
var chunk; | ||
while (null !== (chunk = r.read())) { | ||
doSomethingWithData(chunk); | ||
} | ||
r.once('readable', flow); | ||
} | ||
flow(); | ||
``` | ||
### new Readable(options) | ||
* `options` {Object} | ||
* `lowWaterMark` {Number} The minimum number of bytes before the | ||
stream is considered 'readable'. Default = `1024` | ||
* `bufferSize` {Number} The number of bytes to try to read from the | ||
underlying `_read` function. Default = `16 * 1024` | ||
Make sure to call the `Readable` constructor in your extension | ||
classes, or else the stream will not be properly initialized. | ||
### readable.read([n]) | ||
* `n` {Number} Optional number of bytes to read. If not provided, | ||
then return however many bytes are available. | ||
* Returns: {Buffer | null} | ||
Pulls the requested number of bytes out of the internal buffer. If | ||
that many bytes are not available, then it returns `null`. | ||
### readable.\_read(n, callback) | ||
* `n` {Number} Number of bytes to read from the underlying | ||
asynchronous data source. | ||
* `callback` {Function} Callback function | ||
* `error` {Error Object} | ||
* `data` {Buffer | null} | ||
**Note: This function is not implemented in the Readable base class.** | ||
Rather, it is up to you to implement `_read` in your extension | ||
classes. | ||
`_read` should fetch the specified number of bytes, and call the | ||
provided callback with `cb(error, data)`, where `error` is any error | ||
encountered, and `data` is the returned data. | ||
This method is prefixed with an underscore because it is internal to | ||
the class that defines it, and should not be called directly by user | ||
programs. However, you **are** expected to override this method in | ||
your own extension classes. | ||
### readable.pipe(destination) | ||
* `destination` {Writable Stream object} | ||
Continually `read()` data out of the readable stream, and `write()` it | ||
into the writable stream. When the `writable.write(chunk)` call | ||
returns `false`, then it will back off until the next `drain` event, | ||
to do backpressure. | ||
Piping to multiple destinations is supported. The slowest destination | ||
stream will limit the speed of the `pipe()` flow. | ||
Note that this puts the readable stream into a state where not very | ||
much can be done with it. You can no longer `read()` from the stream | ||
in other code, without upsetting the pipe() process. However, since | ||
multiple pipe destinations are supported, you can always create a | ||
`PassThrough` stream, and pipe the reader to that. For example: | ||
``` | ||
var r = new ReadableWhatever(); | ||
var pt = new PassThrough(); | ||
r.pipe(someWritableThing); | ||
r.pipe(pt); | ||
// now I can call pt.read() to my heart's content. | ||
// note that if I *don't* call pt.read(), then it'll back up and | ||
// prevent the pipe() from flowing! | ||
``` | ||
### readable.unpipe([destination]) | ||
* `destination` {Writable Stream object} Optional | ||
Remove the provided `destination` stream from the pipe flow. If no | ||
argument is provided, then it will unhook all piped destinations. | ||
### readable.on('readable') | ||
An event that signals more data is now available to be read from the | ||
stream. Emitted when more data arrives, after previously calling | ||
`read()` and getting a null result. | ||
### readable.on('end') | ||
An event that signals that no more data will ever be available on this | ||
stream. It's over. | ||
### readable.\_readableState | ||
* {Object} | ||
An object that tracks the state of the stream. Buffer information, | ||
whether or not it has reached the end of the underlying data source, | ||
etc., are all tracked on this object. | ||
You are strongly encouraged not to modify this in any way, but it is | ||
often useful to read from. | ||
## Class: Writable | ||
A base class for creating Writable streams. Similar to Readable, you | ||
can create child classes by overriding the asynchronous | ||
`_write(chunk,cb)` method, and it will take care of buffering, | ||
backpressure, and so on. | ||
### new Writable(options) | ||
* `options` {Object} | ||
* `highWaterMark` {Number} The number of bytes to store up before it | ||
starts returning `false` from write() calls. Default = `16 * 1024` | ||
* `lowWaterMark` {Number} The number of bytes that the buffer must | ||
get down to before it emits `drain`. Default = `1024` | ||
Make sure to call the `Writable` constructor in your extension | ||
classes, or else the stream will not be properly initialized. | ||
### writable.write(chunk, [encoding]) | ||
* `chunk` {Buffer | String} | ||
* `encoding` {String} An encoding argument to turn the string chunk | ||
into a buffer. Only relevant if `chunk` is a string. | ||
Default = `'utf8'`. | ||
* Returns `false` if you should not write until the next `drain` | ||
event, or some other value otherwise. | ||
The basic write function. | ||
### writable.\_write(chunk, callback) | ||
* `chunk` {Buffer} | ||
* `callback` {Function} | ||
* `error` {Error | null} Call with an error object as the first | ||
argument to indicate that the write() failed for unfixable | ||
reasons. | ||
**Note: This function is not implemented in the Writable base class.** | ||
Rather, it is up to you to implement `_write` in your extension | ||
classes. | ||
`_write` should do whatever has to be done in this specific Writable | ||
class, to handle the bytes being written. Write to a file, send along | ||
a socket, encrypt as an mp3, whatever needs to be done. Do your I/O | ||
asynchronously, and call the callback when it's complete. | ||
This method is prefixed with an underscore because it is internal to | ||
the class that defines it, and should not be called directly by user | ||
programs. However, you **are** expected to override this method in | ||
your own extension classes. | ||
### writable.end([chunk], [encoding]) | ||
* `chunk` {Buffer | String} | ||
* `encoding` {String} | ||
If a chunk (and, optionally, an encoding) are provided, then that | ||
chunk is first passed to `this.write(chunk, encoding)`. | ||
This method is a way to signal to the writable stream that you will | ||
not be writing any more data. It should be called exactly once for | ||
every writable stream. | ||
Calling `write()` *after* calling `end()` will trigger an error. | ||
### writable.on('pipe', source) | ||
Emitted when calling `source.pipe(writable)`. See above for the | ||
description of the `readable.pipe()` method. | ||
### writable.on('unpipe', source) | ||
Emitted when calling `source.unpipe(writable)`. See above for the | ||
description of the `readable.unpipe()` method. | ||
### writable.on('drain') | ||
If a call to `writable.write()` returns false, then at some point in | ||
the future, this event will tell you to start writing again. | ||
### writable.on('finish') | ||
When the stream has been ended, and all the data in its internal | ||
buffer has been consumed, then it emits a `finish` event to let you | ||
know that it's completely done. | ||
This is particularly handy if you want to know when it is safe to shut | ||
down a socket or close a file descriptor. At this time, the writable | ||
stream may be safely disposed. Its mission in life has been | ||
accomplished. | ||
## Class: Duplex | ||
A base class for Duplex streams (ie, streams that are both readable | ||
and writable). | ||
Since JS doesn't have multiple prototypal inheritance, this class | ||
prototypally inherits from Readable, and then parasitically from | ||
Writable. It is thus up to the user to implement both the lowlevel | ||
`_read(n,cb)` method as well as the lowlevel `_write(chunk,cb)` | ||
method on extension duplex classes. | ||
For cases where the written data is transformed into the output, it | ||
may be simpler to use the `Transform` class instead. | ||
### new Duplex(options) | ||
* `options` {Object} Passed to both the Writable and Readable | ||
constructors. | ||
Make sure to call the `Duplex` constructor in your extension | ||
classes, or else the stream will not be properly initialized. | ||
If `options.allowHalfOpen` is set to the value `false`, then the | ||
stream will automatically end the readable side when the writable | ||
side ends, and vice versa. | ||
### duplex.allowHalfOpen | ||
* {Boolean} Default = `true` | ||
Set this flag to either `true` or `false` to determine whether or not | ||
to automatically close the writable side when the readable side ends, | ||
and vice versa. | ||
## Class: Transform | ||
A duplex (ie, both readable and writable) stream that is designed to | ||
make it easy to implement transform operations such as encryption, | ||
decryption, compression, and so on. | ||
Transform streams are `instanceof` Readable, but they have all of the | ||
methods and properties of both Readable and Writable streams. See | ||
above for the list of events and methods that Transform inherits from | ||
Writable and Readable. | ||
Override the `_transform(chunk, outputFunction, callback)` method in | ||
your implementation classes to take advantage of it. | ||
### new Transform(options) | ||
* `options` {Object} Passed to both the Writable and Readable | ||
constructors. | ||
Make sure to call the `Transform` constructor in your extension | ||
classes, or else the stream will not be properly initialized. | ||
### transform.\_transform(chunk, outputFn, callback) | ||
* `chunk` {Buffer} The chunk to be transformed. | ||
* `outputFn` {Function} Call this function with any output data to be | ||
passed to the readable interface. | ||
* `callback` {Function} Call this function (optionally with an error | ||
argument) when you are done processing the supplied chunk. | ||
**Note: This function is not implemented in the Transform base class.** | ||
Rather, it is up to you to implement `_transform` in your extension | ||
classes. | ||
`_transform` should do whatever has to be done in this specific | ||
Transform class, to handle the bytes being written, and pass them off | ||
to the readable portion of the interface. Do asynchronous I/O, | ||
process things, and so on. | ||
Call the callback function only when the current chunk is completely | ||
consumed. Note that this may mean that you call the `outputFn` zero | ||
or more times, depending on how much data you want to output as a | ||
result of this chunk. | ||
This method is prefixed with an underscore because it is internal to | ||
the class that defines it, and should not be called directly by user | ||
programs. However, you **are** expected to override this method in | ||
your own extension classes. | ||
### transform.\_flush(outputFn, callback) | ||
* `outputFn` {Function} Call this function with any output data to be | ||
passed to the readable interface. | ||
* `callback` {Function} Call this function (optionally with an error | ||
argument) when you are done flushing any remaining data. | ||
**Note: This function is not implemented in the Transform base class.** | ||
Rather, it is up to you to implement `_flush` in your extension | ||
classes optionally, if it applies to your use case. | ||
In some cases, your transform operation may need to emit a bit more | ||
data at the end of the stream. For example, a `Zlib` compression | ||
stream will store up some internal state so that it can optimally | ||
compress the output. At the end, however, it needs to do the best it | ||
can with what is left, so that the data will be complete. | ||
In those cases, you can implement a `_flush` method, which will be | ||
called at the very end, after all the written data is consumed, but | ||
before emitting `end` to signal the end of the readable side. Just | ||
like with `_transform`, call `outputFn` zero or more times, as | ||
appropriate, and call `callback` when the flush operation is complete. | ||
This method is prefixed with an underscore because it is internal to | ||
the class that defines it, and should not be called directly by user | ||
programs. However, you **are** expected to override this method in | ||
your own extension classes. | ||
## Class: PassThrough | ||
This is a trivial implementation of a `Transform` stream that simply | ||
passes the input bytes across to the output. Its purpose is mainly | ||
for examples and testing, but there are occasionally use cases where | ||
it can come in handy. |
var test = require('tap').test; | ||
var fromList = require('../from-list.js'); | ||
var fromList = require('../readable.js')._fromList; | ||
test('with length', function(t) { | ||
test('buffers', function(t) { | ||
// have a length | ||
@@ -33,1 +33,31 @@ var len = 16; | ||
}); | ||
test('strings', function(t) { | ||
// have a length | ||
var len = 16; | ||
var list = [ 'foog', | ||
'bark', | ||
'bazy', | ||
'kuel' ]; | ||
// read more than the first element. | ||
var ret = fromList(6, list, 16, true); | ||
t.equal(ret, 'foogba'); | ||
// read exactly the first element. | ||
ret = fromList(2, list, 10, true); | ||
t.equal(ret, 'rk'); | ||
// read less than the first element. | ||
ret = fromList(2, list, 8, true); | ||
t.equal(ret, 'ba'); | ||
// read more than we have. | ||
ret = fromList(100, list, 6, true); | ||
t.equal(ret, 'zykuel'); | ||
// all consumed. | ||
t.same(list, []); | ||
t.end(); | ||
}); |
@@ -46,7 +46,10 @@ var test = require('tap').test | ||
var l = 0; | ||
t.same(res.map(function (c) { return c.length; }), expectLengths); | ||
t.same(res.map(function (c) { | ||
return c.length; | ||
}), expectLengths); | ||
t.end(); | ||
}); | ||
r.pipe(w); | ||
r.pipe(w, { chunkSize: 10 }); | ||
}); |
134
writable.js
@@ -0,1 +1,22 @@ | ||
// Copyright Joyent, Inc. and other Node contributors. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the | ||
// "Software"), to deal in the Software without restriction, including | ||
// without limitation the rights to use, copy, modify, merge, publish, | ||
// distribute, sublicense, and/or sell copies of the Software, and to permit | ||
// persons to whom the Software is furnished to do so, subject to the | ||
// following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included | ||
// in all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | ||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | ||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | ||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | ||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
// A bit simpler than readable streams. | ||
@@ -5,23 +26,41 @@ // Implement an async ._write(chunk, cb), and it'll handle all | ||
'use strict'; | ||
module.exports = Writable; | ||
module.exports = Writable | ||
var util = require('util'); | ||
var Stream = require('stream'); | ||
var Duplex = require('./duplex.js'); | ||
util.inherits(Writable, Stream); | ||
function Writable(options) { | ||
// buffer management | ||
function WritableState(options) { | ||
options = options || {}; | ||
this.highWaterMark = options.highWaterMark || 16 * 1024; | ||
this.highWaterMark = options.hasOwnProperty('highWaterMark') ? | ||
options.highWaterMark : 16 * 1024; | ||
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ? | ||
options.lowWaterMark : 1024; | ||
this.needDrain = false; | ||
this.ended = false; | ||
this.ending = false; | ||
// not an actual buffer we keep track of, but a measurement | ||
// of how much we're waiting to get pushed to some underlying | ||
// socket or file. | ||
this.length = 0; | ||
// the point at which write returns false | ||
this.highWaterMark = options.highWaterMark || 16 * 1024; | ||
this.writing = false; | ||
this.buffer = []; | ||
} | ||
// the point at which drain is emitted | ||
this.lowWaterMark = options.lowWaterMark || 1024; | ||
function Writable(options) { | ||
// Writable ctor is applied to Duplexes, though they're not | ||
// instanceof Writable, they're instanceof Readable. | ||
if (!(this instanceof Writable) && !(this instanceof Duplex)) | ||
return new Writable(options); | ||
this._needDrain = false; | ||
this._ended = false; | ||
this._writableState = new WritableState(options); | ||
// legacy. | ||
this.writable = true; | ||
Stream.call(this); | ||
@@ -32,10 +71,27 @@ } | ||
// override the _write(chunk, cb) method for async streams | ||
Writable.prototype.write = function(chunk) { | ||
var ret = this.length >= this.highWaterMark; | ||
if (ret === false) { | ||
this._needDrain = true; | ||
Writable.prototype.write = function(chunk, encoding) { | ||
var state = this._writableState; | ||
if (state.ended) { | ||
this.emit('error', new Error('write after end')); | ||
return; | ||
} | ||
if (typeof chunk === 'string' || encoding) | ||
chunk = new Buffer(chunk + '', encoding); | ||
var l = chunk.length; | ||
this.length += l; | ||
this._write(chunk, function(er) { | ||
state.length += l; | ||
var ret = state.length < state.highWaterMark; | ||
if (ret === false) | ||
state.needDrain = true; | ||
if (state.writing) { | ||
state.buffer.push(chunk); | ||
return ret; | ||
} | ||
state.writing = true; | ||
this._write(chunk, function writecb(er) { | ||
state.writing = false; | ||
if (er) { | ||
@@ -45,14 +101,33 @@ this.emit('error', er); | ||
} | ||
this.length -= l; | ||
state.length -= l; | ||
if (this.length === 0 && this._ended) { | ||
this.emit('end'); | ||
if (state.length === 0 && (state.ended || state.ending)) { | ||
// emit 'finish' at the very end. | ||
this.emit('finish'); | ||
return; | ||
} | ||
if (this.length < this.lowWaterMark && this._needDrain) { | ||
this._needDrain = false; | ||
this.emit('drain'); | ||
// if there's something in the buffer waiting, then do that, too. | ||
if (state.buffer.length) { | ||
chunk = state.buffer.shift(); | ||
l = chunk.length; | ||
state.writing = true; | ||
this._write(chunk, writecb.bind(this)); | ||
} | ||
if (state.length <= state.lowWaterMark && state.needDrain) { | ||
// Must force callback to be called on nextTick, so that we don't | ||
// emit 'drain' before the write() consumer gets the 'false' return | ||
// value, and has a chance to attach a 'drain' listener. | ||
process.nextTick(function() { | ||
if (!state.needDrain) | ||
return; | ||
state.needDrain = false; | ||
this.emit('drain'); | ||
}.bind(this)); | ||
} | ||
}.bind(this)); | ||
return ret; | ||
}; | ||
@@ -64,7 +139,10 @@ | ||
Writable.prototype.end = function(chunk) { | ||
if (chunk) { | ||
this.write(chunk); | ||
} | ||
this._ended = true; | ||
Writable.prototype.end = function(chunk, encoding) { | ||
var state = this._writableState; | ||
state.ending = true; | ||
if (chunk) | ||
this.write(chunk, encoding); | ||
else if (state.length === 0) | ||
this.emit('finish'); | ||
state.ended = true; | ||
}; |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
88095
20
2149
501