readable-stream
Advanced tools
Comparing version 1.0.2 to 1.0.15
@@ -71,2 +71,3 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
this.emittedReadable = false; | ||
this.readableListening = false; | ||
@@ -78,2 +79,7 @@ | ||
// Crypto is kind of old and crusty. Historically, its default string | ||
// encoding is 'binary' so we have to make this configurable. | ||
// Everything else in the universe uses 'utf8', though. | ||
this.defaultEncoding = options.defaultEncoding || 'utf8'; | ||
// when piping, we only care about 'readable' events that happen | ||
@@ -90,2 +96,3 @@ // after read()ing all the bytes and not getting any pushback. | ||
this.decoder = null; | ||
this.encoding = null; | ||
if (options.encoding) { | ||
@@ -95,2 +102,3 @@ if (!StringDecoder) | ||
this.decoder = new StringDecoder(options.encoding); | ||
this.encoding = options.encoding; | ||
} | ||
@@ -115,19 +123,23 @@ } | ||
// write() some more. | ||
Readable.prototype.push = function(chunk) { | ||
Readable.prototype.push = function(chunk, encoding) { | ||
var state = this._readableState; | ||
if (typeof chunk === 'string' && !state.objectMode) | ||
chunk = new Buffer(chunk, arguments[1]); | ||
return readableAddChunk(this, state, chunk, false); | ||
if (typeof chunk === 'string' && !state.objectMode) { | ||
encoding = encoding || state.defaultEncoding; | ||
if (encoding !== state.encoding) { | ||
chunk = new Buffer(chunk, encoding); | ||
encoding = ''; | ||
} | ||
} | ||
return readableAddChunk(this, state, chunk, encoding, false); | ||
}; | ||
// Unshift should *always* be something directly out of read() | ||
Readable.prototype.unshift = function(chunk) { | ||
var state = this._readableState; | ||
if (typeof chunk === 'string' && !state.objectMode) | ||
chunk = new Buffer(chunk, arguments[1]); | ||
return readableAddChunk(this, state, chunk, true); | ||
return readableAddChunk(this, state, chunk, '', true); | ||
}; | ||
function readableAddChunk(stream, state, chunk, addToFront) { | ||
state.reading = false; | ||
function readableAddChunk(stream, state, chunk, encoding, addToFront) { | ||
var er = chunkInvalid(state, chunk); | ||
@@ -137,18 +149,32 @@ if (er) { | ||
} else if (chunk === null || chunk === undefined) { | ||
onEofChunk(stream, state); | ||
state.reading = false; | ||
if (!state.ended) | ||
onEofChunk(stream, state); | ||
} else if (state.objectMode || chunk && chunk.length > 0) { | ||
if (state.decoder) | ||
chunk = state.decoder.write(chunk); | ||
if (state.ended && !addToFront) { | ||
var e = new Error('stream.push() after EOF'); | ||
stream.emit('error', e); | ||
} else if (state.endEmitted && addToFront) { | ||
var e = new Error('stream.unshift() after end event'); | ||
stream.emit('error', e); | ||
} else { | ||
if (state.decoder && !addToFront && !encoding) | ||
chunk = state.decoder.write(chunk); | ||
// update the buffer info. | ||
state.length += state.objectMode ? 1 : chunk.length; | ||
if (addToFront) | ||
state.buffer.unshift(chunk); | ||
else | ||
state.buffer.push(chunk); | ||
// update the buffer info. | ||
state.length += state.objectMode ? 1 : chunk.length; | ||
if (addToFront) { | ||
state.buffer.unshift(chunk); | ||
} else { | ||
state.reading = false; | ||
state.buffer.push(chunk); | ||
} | ||
if (state.needReadable) | ||
emitReadable(stream); | ||
if (state.needReadable) | ||
emitReadable(stream); | ||
maybeReadMore(stream, state); | ||
maybeReadMore(stream, state); | ||
} | ||
} else if (!addToFront) { | ||
state.reading = false; | ||
} | ||
@@ -180,2 +206,3 @@ | ||
this._readableState.decoder = new StringDecoder(enc); | ||
this._readableState.encoding = enc; | ||
}; | ||
@@ -248,3 +275,3 @@ | ||
state.needReadable && | ||
state.length >= state.highWaterMark) { | ||
(state.length >= state.highWaterMark || state.ended)) { | ||
emitReadable(this); | ||
@@ -356,4 +383,3 @@ return null; | ||
function onEofChunk(stream, state) { | ||
state.ended = true; | ||
if (state.decoder && state.decoder.end) { | ||
if (state.decoder && !state.ended) { | ||
var chunk = state.decoder.end(); | ||
@@ -365,2 +391,3 @@ if (chunk && chunk.length) { | ||
} | ||
state.ended = true; | ||
@@ -394,3 +421,2 @@ // if we've ended and we have some data left, then emit | ||
function emitReadable_(stream) { | ||
var state = stream._readableState; | ||
stream.emit('readable'); | ||
@@ -672,4 +698,15 @@ } | ||
if (ev === 'readable' && !this._readableState.reading) | ||
this.read(0); | ||
if (ev === 'readable' && this.readable) { | ||
var state = this._readableState; | ||
if (!state.readableListening) { | ||
state.readableListening = true; | ||
state.emittedReadable = false; | ||
state.needReadable = true; | ||
if (!state.reading) { | ||
this.read(0); | ||
} else if (state.length) { | ||
emitReadable(this, state); | ||
} | ||
} | ||
} | ||
@@ -751,4 +788,3 @@ return res; | ||
stream.on('end', function() { | ||
state.ended = true; | ||
if (state.decoder && state.decoder.end) { | ||
if (state.decoder && !state.ended) { | ||
var chunk = state.decoder.end(); | ||
@@ -765,3 +801,3 @@ if (chunk && chunk.length) | ||
chunk = state.decoder.write(chunk); | ||
if (!chunk || !chunk.length) | ||
if (!chunk || !state.objectMode && !chunk.length) | ||
return; | ||
@@ -797,6 +833,8 @@ | ||
if (paused) { | ||
paused = false; | ||
stream.resume(); | ||
paused = false; | ||
} | ||
}; | ||
return self; | ||
}; | ||
@@ -885,8 +923,11 @@ | ||
state.ended = true; | ||
state.endEmitted = true; | ||
process.nextTick(function() { | ||
stream.readable = false; | ||
stream.emit('end'); | ||
// Check that we didn't get one last unshift. | ||
if (!state.endEmitted && state.length === 0) { | ||
state.endEmitted = true; | ||
stream.readable = false; | ||
stream.emit('end'); | ||
} | ||
}); | ||
} | ||
} |
@@ -73,4 +73,2 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
function TransformState(options, stream) { | ||
var ts = this; | ||
this.afterTransform = function(er, data) { | ||
@@ -105,2 +103,3 @@ return afterTransform(stream, er, data); | ||
var rs = stream._readableState; | ||
rs.reading = false; | ||
if (rs.needReadable || rs.length < rs.highWaterMark) { | ||
@@ -141,5 +140,5 @@ stream._read(rs.highWaterMark); | ||
Transform.prototype.push = function(chunk) { | ||
Transform.prototype.push = function(chunk, encoding) { | ||
this._transformState.needTransform = false; | ||
return Duplex.prototype.push.call(this, chunk); | ||
return Duplex.prototype.push.call(this, chunk, encoding); | ||
}; | ||
@@ -157,3 +156,3 @@ | ||
// never call cb(), then you'll never get another chunk. | ||
Transform.prototype._transform = function(chunk, output, cb) { | ||
Transform.prototype._transform = function(chunk, encoding, cb) { | ||
throw new Error('not implemented'); | ||
@@ -177,3 +176,3 @@ }; | ||
// Doesn't matter what the args are here. | ||
// the output and callback functions passed to _transform do all the work. | ||
// _transform does all the work. | ||
// That we got here means that the readable side wants more data. | ||
@@ -180,0 +179,0 @@ Transform.prototype._read = function(n) { |
@@ -71,2 +71,7 @@ // Copyright Joyent, Inc. and other Node contributors. | ||
// Crypto is kind of old and crusty. Historically, its default string | ||
// encoding is 'binary' so we have to make this configurable. | ||
// Everything else in the universe uses 'utf8', though. | ||
this.defaultEncoding = options.defaultEncoding || 'utf8'; | ||
// not an actual buffer we keep track of, but a measurement | ||
@@ -164,5 +169,8 @@ // of how much we're waiting to get pushed to some underlying | ||
} | ||
if (!encoding) | ||
encoding = 'utf8'; | ||
if (Buffer.isBuffer(chunk)) | ||
encoding = 'buffer'; | ||
else if (!encoding) | ||
encoding = state.defaultEncoding; | ||
if (typeof cb !== 'function') | ||
@@ -245,3 +253,4 @@ cb = function() {}; | ||
else { | ||
var finished = finishMaybe(stream, state); | ||
// Check if we're actually ready to finish, but don't emit yet | ||
var finished = needFinish(stream, state); | ||
@@ -265,2 +274,4 @@ if (!finished && !state.bufferProcessing && state.buffer.length) | ||
cb(); | ||
if (finished) | ||
finishMaybe(stream, state); | ||
} | ||
@@ -333,8 +344,17 @@ | ||
function needFinish(stream, state) { | ||
return (state.ending && | ||
state.length === 0 && | ||
!state.finished && | ||
!state.writing); | ||
} | ||
function finishMaybe(stream, state) { | ||
if (state.ending && state.length === 0 && !state.finished) { | ||
var need = needFinish(stream, state); | ||
if (need) { | ||
state.finished = true; | ||
stream.emit('finish'); | ||
} | ||
return state.finished; | ||
return need; | ||
} | ||
@@ -341,0 +361,0 @@ |
{ | ||
"name": "readable-stream", | ||
"version": "1.0.2", | ||
"version": "1.0.15", | ||
"description": "An exploration of a new kind of readable streams for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "readable.js", |
213580
38
5747