readable-stream
Advanced tools
Comparing version 2.0.5 to 2.0.6
@@ -9,10 +9,11 @@ // a duplex stream is just a stream that is both readable and writable. | ||
/*<replacement>*/ | ||
var objectKeys = Object.keys || function (obj) { | ||
var keys = []; | ||
for (var key in obj) keys.push(key); | ||
return keys; | ||
} | ||
for (var key in obj) { | ||
keys.push(key); | ||
}return keys; | ||
}; | ||
/*</replacement>*/ | ||
module.exports = Duplex; | ||
@@ -24,4 +25,2 @@ | ||
/*<replacement>*/ | ||
@@ -40,9 +39,7 @@ var util = require('core-util-is'); | ||
var method = keys[v]; | ||
if (!Duplex.prototype[method]) | ||
Duplex.prototype[method] = Writable.prototype[method]; | ||
if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method]; | ||
} | ||
function Duplex(options) { | ||
if (!(this instanceof Duplex)) | ||
return new Duplex(options); | ||
if (!(this instanceof Duplex)) return new Duplex(options); | ||
@@ -52,11 +49,8 @@ Readable.call(this, options); | ||
if (options && options.readable === false) | ||
this.readable = false; | ||
if (options && options.readable === false) this.readable = false; | ||
if (options && options.writable === false) | ||
this.writable = false; | ||
if (options && options.writable === false) this.writable = false; | ||
this.allowHalfOpen = true; | ||
if (options && options.allowHalfOpen === false) | ||
this.allowHalfOpen = false; | ||
if (options && options.allowHalfOpen === false) this.allowHalfOpen = false; | ||
@@ -70,4 +64,3 @@ this.once('end', onend); | ||
// then we're ok. | ||
if (this.allowHalfOpen || this._writableState.ended) | ||
return; | ||
if (this.allowHalfOpen || this._writableState.ended) return; | ||
@@ -83,6 +76,6 @@ // no more data can be written. | ||
function forEach (xs, f) { | ||
function forEach(xs, f) { | ||
for (var i = 0, l = xs.length; i < l; i++) { | ||
f(xs[i], i); | ||
} | ||
} | ||
} |
@@ -19,4 +19,3 @@ // a passthrough stream. | ||
function PassThrough(options) { | ||
if (!(this instanceof PassThrough)) | ||
return new PassThrough(options); | ||
if (!(this instanceof PassThrough)) return new PassThrough(options); | ||
@@ -26,4 +25,4 @@ Transform.call(this, options); | ||
PassThrough.prototype._transform = function(chunk, encoding, cb) { | ||
PassThrough.prototype._transform = function (chunk, encoding, cb) { | ||
cb(null, chunk); | ||
}; | ||
}; |
@@ -9,3 +9,2 @@ 'use strict'; | ||
/*<replacement>*/ | ||
@@ -15,3 +14,2 @@ var isArray = require('isarray'); | ||
/*<replacement>*/ | ||
@@ -26,3 +24,3 @@ var Buffer = require('buffer').Buffer; | ||
/*<replacement>*/ | ||
var EElistenerCount = function(emitter, type) { | ||
var EElistenerCount = function (emitter, type) { | ||
return emitter.listeners(type).length; | ||
@@ -32,12 +30,11 @@ }; | ||
/*<replacement>*/ | ||
var Stream; | ||
(function (){try{ | ||
Stream = require('st' + 'ream'); | ||
}catch(_){}finally{ | ||
if (!Stream) | ||
Stream = require('events').EventEmitter; | ||
}}()) | ||
(function () { | ||
try { | ||
Stream = require('st' + 'ream'); | ||
} catch (_) {} finally { | ||
if (!Stream) Stream = require('events').EventEmitter; | ||
} | ||
})(); | ||
/*</replacement>*/ | ||
@@ -52,7 +49,5 @@ | ||
/*<replacement>*/ | ||
var debugUtil = require('util'); | ||
var debug; | ||
var debug = undefined; | ||
if (debugUtil && debugUtil.debuglog) { | ||
@@ -79,4 +74,3 @@ debug = debugUtil.debuglog('stream'); | ||
if (stream instanceof Duplex) | ||
this.objectMode = this.objectMode || !!options.readableObjectMode; | ||
if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode; | ||
@@ -87,6 +81,6 @@ // the point at which it stops calling _read() to fill the buffer | ||
var defaultHwm = this.objectMode ? 16 : 16 * 1024; | ||
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm; | ||
this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm; | ||
// cast to ints. | ||
this.highWaterMark = ~~this.highWaterMark; | ||
this.highWaterMark = ~ ~this.highWaterMark; | ||
@@ -113,2 +107,3 @@ this.buffer = []; | ||
this.readableListening = false; | ||
this.resumeScheduled = false; | ||
@@ -133,4 +128,3 @@ // Crypto is kind of old and crusty. Historically, its default string | ||
if (options.encoding) { | ||
if (!StringDecoder) | ||
StringDecoder = require('string_decoder/').StringDecoder; | ||
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder; | ||
this.decoder = new StringDecoder(options.encoding); | ||
@@ -145,4 +139,3 @@ this.encoding = options.encoding; | ||
if (!(this instanceof Readable)) | ||
return new Readable(options); | ||
if (!(this instanceof Readable)) return new Readable(options); | ||
@@ -154,4 +147,3 @@ this._readableState = new ReadableState(options, this); | ||
if (options && typeof options.read === 'function') | ||
this._read = options.read; | ||
if (options && typeof options.read === 'function') this._read = options.read; | ||
@@ -165,3 +157,3 @@ Stream.call(this); | ||
// write() some more. | ||
Readable.prototype.push = function(chunk, encoding) { | ||
Readable.prototype.push = function (chunk, encoding) { | ||
var state = this._readableState; | ||
@@ -181,3 +173,3 @@ | ||
// Unshift should *always* be something directly out of read() | ||
Readable.prototype.unshift = function(chunk) { | ||
Readable.prototype.unshift = function (chunk) { | ||
var state = this._readableState; | ||
@@ -187,3 +179,3 @@ return readableAddChunk(this, state, chunk, '', true); | ||
Readable.prototype.isPaused = function() { | ||
Readable.prototype.isPaused = function () { | ||
return this._readableState.flowing === false; | ||
@@ -207,22 +199,24 @@ }; | ||
} else { | ||
if (state.decoder && !addToFront && !encoding) | ||
var skipAdd; | ||
if (state.decoder && !addToFront && !encoding) { | ||
chunk = state.decoder.write(chunk); | ||
skipAdd = !state.objectMode && chunk.length === 0; | ||
} | ||
if (!addToFront) | ||
state.reading = false; | ||
if (!addToFront) state.reading = false; | ||
// if we want the data now, just emit it. | ||
if (state.flowing && state.length === 0 && !state.sync) { | ||
stream.emit('data', chunk); | ||
stream.read(0); | ||
} else { | ||
// update the buffer info. | ||
state.length += state.objectMode ? 1 : chunk.length; | ||
if (addToFront) | ||
state.buffer.unshift(chunk); | ||
else | ||
state.buffer.push(chunk); | ||
// Don't add to the buffer if we've decoded to an empty string chunk and | ||
// we're not in object mode | ||
if (!skipAdd) { | ||
// if we want the data now, just emit it. | ||
if (state.flowing && state.length === 0 && !state.sync) { | ||
stream.emit('data', chunk); | ||
stream.read(0); | ||
} else { | ||
// update the buffer info. | ||
state.length += state.objectMode ? 1 : chunk.length; | ||
if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk); | ||
if (state.needReadable) | ||
emitReadable(stream); | ||
if (state.needReadable) emitReadable(stream); | ||
} | ||
} | ||
@@ -239,3 +233,2 @@ | ||
// if it's past the high water mark, we can push in some more. | ||
@@ -249,12 +242,8 @@ // Also, if we have no data yet, we can stand some | ||
function needMoreData(state) { | ||
return !state.ended && | ||
(state.needReadable || | ||
state.length < state.highWaterMark || | ||
state.length === 0); | ||
return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0); | ||
} | ||
// backwards compatibility. | ||
Readable.prototype.setEncoding = function(enc) { | ||
if (!StringDecoder) | ||
StringDecoder = require('string_decoder/').StringDecoder; | ||
Readable.prototype.setEncoding = function (enc) { | ||
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder; | ||
this._readableState.decoder = new StringDecoder(enc); | ||
@@ -284,18 +273,12 @@ this._readableState.encoding = enc; | ||
function howMuchToRead(n, state) { | ||
if (state.length === 0 && state.ended) | ||
return 0; | ||
if (state.length === 0 && state.ended) return 0; | ||
if (state.objectMode) | ||
return n === 0 ? 0 : 1; | ||
if (state.objectMode) return n === 0 ? 0 : 1; | ||
if (n === null || isNaN(n)) { | ||
// only flow one buffer at a time | ||
if (state.flowing && state.buffer.length) | ||
return state.buffer[0].length; | ||
else | ||
return state.length; | ||
if (state.flowing && state.buffer.length) return state.buffer[0].length;else return state.length; | ||
} | ||
if (n <= 0) | ||
return 0; | ||
if (n <= 0) return 0; | ||
@@ -306,4 +289,3 @@ // If we're asking for more than the target buffer level, | ||
// amounts. | ||
if (n > state.highWaterMark) | ||
state.highWaterMark = computeNewHighWaterMark(n); | ||
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); | ||
@@ -324,3 +306,3 @@ // don't have that much. return null, unless we've ended. | ||
// you can override either this method, or the async _read(n) below. | ||
Readable.prototype.read = function(n) { | ||
Readable.prototype.read = function (n) { | ||
debug('read', n); | ||
@@ -330,4 +312,3 @@ var state = this._readableState; | ||
if (typeof n !== 'number' || n > 0) | ||
state.emittedReadable = false; | ||
if (typeof n !== 'number' || n > 0) state.emittedReadable = false; | ||
@@ -337,10 +318,5 @@ // if we're doing read(0) to trigger a readable event, but we | ||
// the 'readable' event and move on. | ||
if (n === 0 && | ||
state.needReadable && | ||
(state.length >= state.highWaterMark || state.ended)) { | ||
if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { | ||
debug('read: emitReadable', state.length, state.ended); | ||
if (state.length === 0 && state.ended) | ||
endReadable(this); | ||
else | ||
emitReadable(this); | ||
if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this); | ||
return null; | ||
@@ -353,4 +329,3 @@ } | ||
if (n === 0 && state.ended) { | ||
if (state.length === 0) | ||
endReadable(this); | ||
if (state.length === 0) endReadable(this); | ||
return null; | ||
@@ -403,4 +378,3 @@ } | ||
// if the length is currently zero, then we *need* a readable event. | ||
if (state.length === 0) | ||
state.needReadable = true; | ||
if (state.length === 0) state.needReadable = true; | ||
// call internal read method | ||
@@ -413,10 +387,6 @@ this._read(state.highWaterMark); | ||
// and we need to re-evaluate how much data we can return to the user. | ||
if (doRead && !state.reading) | ||
n = howMuchToRead(nOrig, state); | ||
if (doRead && !state.reading) n = howMuchToRead(nOrig, state); | ||
var ret; | ||
if (n > 0) | ||
ret = fromList(n, state); | ||
else | ||
ret = null; | ||
if (n > 0) ret = fromList(n, state);else ret = null; | ||
@@ -432,11 +402,8 @@ if (ret === null) { | ||
// as soon as we *do* get something into the buffer. | ||
if (state.length === 0 && !state.ended) | ||
state.needReadable = true; | ||
if (state.length === 0 && !state.ended) state.needReadable = true; | ||
// If we tried to read() past the EOF, then emit end on the next tick. | ||
if (nOrig !== n && state.ended && state.length === 0) | ||
endReadable(this); | ||
if (nOrig !== n && state.ended && state.length === 0) endReadable(this); | ||
if (ret !== null) | ||
this.emit('data', ret); | ||
if (ret !== null) this.emit('data', ret); | ||
@@ -448,7 +415,3 @@ return ret; | ||
var er = null; | ||
if (!(Buffer.isBuffer(chunk)) && | ||
typeof chunk !== 'string' && | ||
chunk !== null && | ||
chunk !== undefined && | ||
!state.objectMode) { | ||
if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) { | ||
er = new TypeError('Invalid non-string/buffer chunk'); | ||
@@ -459,3 +422,2 @@ } | ||
function onEofChunk(stream, state) { | ||
@@ -485,6 +447,3 @@ if (state.ended) return; | ||
state.emittedReadable = true; | ||
if (state.sync) | ||
processNextTick(emitReadable_, stream); | ||
else | ||
emitReadable_(stream); | ||
if (state.sync) processNextTick(emitReadable_, stream);else emitReadable_(stream); | ||
} | ||
@@ -499,3 +458,2 @@ } | ||
// at this point, the user has presumably seen the 'readable' event, | ||
@@ -516,4 +474,3 @@ // and called read() to consume some data. that may have triggered | ||
var len = state.length; | ||
while (!state.reading && !state.flowing && !state.ended && | ||
state.length < state.highWaterMark) { | ||
while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) { | ||
debug('maybeReadMore read 0'); | ||
@@ -523,5 +480,3 @@ stream.read(0); | ||
// didn't get any data, stop spinning. | ||
break; | ||
else | ||
len = state.length; | ||
break;else len = state.length; | ||
} | ||
@@ -535,7 +490,7 @@ state.readingMore = false; | ||
// arbitrary, and perhaps not very meaningful. | ||
Readable.prototype._read = function(n) { | ||
Readable.prototype._read = function (n) { | ||
this.emit('error', new Error('not implemented')); | ||
}; | ||
Readable.prototype.pipe = function(dest, pipeOpts) { | ||
Readable.prototype.pipe = function (dest, pipeOpts) { | ||
var src = this; | ||
@@ -558,11 +513,6 @@ var state = this._readableState; | ||
var doEnd = (!pipeOpts || pipeOpts.end !== false) && | ||
dest !== process.stdout && | ||
dest !== process.stderr; | ||
var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr; | ||
var endFn = doEnd ? onend : cleanup; | ||
if (state.endEmitted) | ||
processNextTick(endFn); | ||
else | ||
src.once('end', endFn); | ||
if (state.endEmitted) processNextTick(endFn);else src.once('end', endFn); | ||
@@ -609,5 +559,3 @@ dest.on('unpipe', onunpipe); | ||
// If we don't know, then assume that we are waiting for one. | ||
if (state.awaitDrain && | ||
(!dest._writableState || dest._writableState.needDrain)) | ||
ondrain(); | ||
if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain(); | ||
} | ||
@@ -623,6 +571,3 @@ | ||
// also returned false. | ||
if (state.pipesCount === 1 && | ||
state.pipes[0] === dest && | ||
src.listenerCount('data') === 1 && | ||
!cleanedUp) { | ||
if (state.pipesCount === 1 && state.pipes[0] === dest && src.listenerCount('data') === 1 && !cleanedUp) { | ||
debug('false write response, pause', src._readableState.awaitDrain); | ||
@@ -641,15 +586,8 @@ src._readableState.awaitDrain++; | ||
dest.removeListener('error', onerror); | ||
if (EElistenerCount(dest, 'error') === 0) | ||
dest.emit('error', er); | ||
if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er); | ||
} | ||
// This is a brutally ugly hack to make sure that our error handler | ||
// is attached before any userland ones. NEVER DO THIS. | ||
if (!dest._events || !dest._events.error) | ||
dest.on('error', onerror); | ||
else if (isArray(dest._events.error)) | ||
dest._events.error.unshift(onerror); | ||
else | ||
dest._events.error = [onerror, dest._events.error]; | ||
if (!dest._events || !dest._events.error) dest.on('error', onerror);else if (isArray(dest._events.error)) dest._events.error.unshift(onerror);else dest._events.error = [onerror, dest._events.error]; | ||
// Both close and finish should trigger unpipe, but only once. | ||
@@ -686,7 +624,6 @@ function onclose() { | ||
function pipeOnDrain(src) { | ||
return function() { | ||
return function () { | ||
var state = src._readableState; | ||
debug('pipeOnDrain', state.awaitDrain); | ||
if (state.awaitDrain) | ||
state.awaitDrain--; | ||
if (state.awaitDrain) state.awaitDrain--; | ||
if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) { | ||
@@ -699,9 +636,7 @@ state.flowing = true; | ||
Readable.prototype.unpipe = function(dest) { | ||
Readable.prototype.unpipe = function (dest) { | ||
var state = this._readableState; | ||
// if we're not piping anywhere, then do nothing. | ||
if (state.pipesCount === 0) | ||
return this; | ||
if (state.pipesCount === 0) return this; | ||
@@ -711,7 +646,5 @@ // just one destination. most common case. | ||
// passed in one, but it's not the right one. | ||
if (dest && dest !== state.pipes) | ||
return this; | ||
if (dest && dest !== state.pipes) return this; | ||
if (!dest) | ||
dest = state.pipes; | ||
if (!dest) dest = state.pipes; | ||
@@ -722,4 +655,3 @@ // got a match. | ||
state.flowing = false; | ||
if (dest) | ||
dest.emit('unpipe', this); | ||
if (dest) dest.emit('unpipe', this); | ||
return this; | ||
@@ -738,5 +670,5 @@ } | ||
for (var i = 0; i < len; i++) | ||
dests[i].emit('unpipe', this); | ||
return this; | ||
for (var _i = 0; _i < len; _i++) { | ||
dests[_i].emit('unpipe', this); | ||
}return this; | ||
} | ||
@@ -746,9 +678,7 @@ | ||
var i = indexOf(state.pipes, dest); | ||
if (i === -1) | ||
return this; | ||
if (i === -1) return this; | ||
state.pipes.splice(i, 1); | ||
state.pipesCount -= 1; | ||
if (state.pipesCount === 1) | ||
state.pipes = state.pipes[0]; | ||
if (state.pipesCount === 1) state.pipes = state.pipes[0]; | ||
@@ -762,3 +692,3 @@ dest.emit('unpipe', this); | ||
// Ensure readable listeners eventually get something | ||
Readable.prototype.on = function(ev, fn) { | ||
Readable.prototype.on = function (ev, fn) { | ||
var res = Stream.prototype.on.call(this, ev, fn); | ||
@@ -772,3 +702,3 @@ | ||
if (ev === 'readable' && this.readable) { | ||
if (ev === 'readable' && !this._readableState.endEmitted) { | ||
var state = this._readableState; | ||
@@ -798,3 +728,3 @@ if (!state.readableListening) { | ||
// If the user uses them, then switch into old mode. | ||
Readable.prototype.resume = function() { | ||
Readable.prototype.resume = function () { | ||
var state = this._readableState; | ||
@@ -825,7 +755,6 @@ if (!state.flowing) { | ||
flow(stream); | ||
if (state.flowing && !state.reading) | ||
stream.read(0); | ||
if (state.flowing && !state.reading) stream.read(0); | ||
} | ||
Readable.prototype.pause = function() { | ||
Readable.prototype.pause = function () { | ||
debug('call pause flowing=%j', this._readableState.flowing); | ||
@@ -853,3 +782,3 @@ if (false !== this._readableState.flowing) { | ||
// It is an ugly unfortunate mess of history. | ||
Readable.prototype.wrap = function(stream) { | ||
Readable.prototype.wrap = function (stream) { | ||
var state = this._readableState; | ||
@@ -859,8 +788,7 @@ var paused = false; | ||
var self = this; | ||
stream.on('end', function() { | ||
stream.on('end', function () { | ||
debug('wrapped end'); | ||
if (state.decoder && !state.ended) { | ||
var chunk = state.decoder.end(); | ||
if (chunk && chunk.length) | ||
self.push(chunk); | ||
if (chunk && chunk.length) self.push(chunk); | ||
} | ||
@@ -871,12 +799,8 @@ | ||
stream.on('data', function(chunk) { | ||
stream.on('data', function (chunk) { | ||
debug('wrapped data'); | ||
if (state.decoder) | ||
chunk = state.decoder.write(chunk); | ||
if (state.decoder) chunk = state.decoder.write(chunk); | ||
// don't skip over falsy values in objectMode | ||
if (state.objectMode && (chunk === null || chunk === undefined)) | ||
return; | ||
else if (!state.objectMode && (!chunk || !chunk.length)) | ||
return; | ||
if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return; | ||
@@ -894,5 +818,7 @@ var ret = self.push(chunk); | ||
if (this[i] === undefined && typeof stream[i] === 'function') { | ||
this[i] = function(method) { return function() { | ||
return stream[method].apply(stream, arguments); | ||
}; }(i); | ||
this[i] = function (method) { | ||
return function () { | ||
return stream[method].apply(stream, arguments); | ||
}; | ||
}(i); | ||
} | ||
@@ -903,3 +829,3 @@ } | ||
var events = ['error', 'close', 'destroy', 'pause', 'resume']; | ||
forEach(events, function(ev) { | ||
forEach(events, function (ev) { | ||
stream.on(ev, self.emit.bind(self, ev)); | ||
@@ -910,3 +836,3 @@ }); | ||
// underlying stream. | ||
self._read = function(n) { | ||
self._read = function (n) { | ||
debug('wrapped _read', n); | ||
@@ -922,3 +848,2 @@ if (paused) { | ||
// exposed for testing purposes only. | ||
@@ -937,17 +862,7 @@ Readable._fromList = fromList; | ||
// nothing in the list, definitely empty. | ||
if (list.length === 0) | ||
return null; | ||
if (list.length === 0) return null; | ||
if (length === 0) | ||
ret = null; | ||
else if (objectMode) | ||
ret = list.shift(); | ||
else if (!n || n >= length) { | ||
if (length === 0) ret = null;else if (objectMode) ret = list.shift();else if (!n || n >= length) { | ||
// read it all, truncate the array. | ||
if (stringMode) | ||
ret = list.join(''); | ||
else if (list.length === 1) | ||
ret = list[0]; | ||
else | ||
ret = Buffer.concat(list, length); | ||
if (stringMode) ret = list.join('');else if (list.length === 1) ret = list[0];else ret = Buffer.concat(list, length); | ||
list.length = 0; | ||
@@ -968,6 +883,3 @@ } else { | ||
// we have enough to cover it, but it spans past the first buffer. | ||
if (stringMode) | ||
ret = ''; | ||
else | ||
ret = new Buffer(n); | ||
if (stringMode) ret = '';else ret = new Buffer(n); | ||
@@ -979,11 +891,5 @@ var c = 0; | ||
if (stringMode) | ||
ret += buf.slice(0, cpy); | ||
else | ||
buf.copy(ret, c, 0, cpy); | ||
if (stringMode) ret += buf.slice(0, cpy);else buf.copy(ret, c, 0, cpy); | ||
if (cpy < buf.length) | ||
list[0] = buf.slice(cpy); | ||
else | ||
list.shift(); | ||
if (cpy < buf.length) list[0] = buf.slice(cpy);else list.shift(); | ||
@@ -1003,4 +909,3 @@ c += cpy; | ||
// bug in node. Should never happen. | ||
if (state.length > 0) | ||
throw new Error('endReadable called on non-empty stream'); | ||
if (state.length > 0) throw new Error('endReadable called on non-empty stream'); | ||
@@ -1022,3 +927,3 @@ if (!state.endEmitted) { | ||
function forEach (xs, f) { | ||
function forEach(xs, f) { | ||
for (var i = 0, l = xs.length; i < l; i++) { | ||
@@ -1029,3 +934,3 @@ f(xs[i], i); | ||
function indexOf (xs, x) { | ||
function indexOf(xs, x) { | ||
for (var i = 0, l = xs.length; i < l; i++) { | ||
@@ -1035,2 +940,2 @@ if (xs[i] === x) return i; | ||
return -1; | ||
} | ||
} |
@@ -56,5 +56,4 @@ // a transform stream is a readable/writable stream where you do | ||
function TransformState(stream) { | ||
this.afterTransform = function(er, data) { | ||
this.afterTransform = function (er, data) { | ||
return afterTransform(stream, er, data); | ||
@@ -67,2 +66,3 @@ }; | ||
this.writechunk = null; | ||
this.writeencoding = null; | ||
} | ||
@@ -76,4 +76,3 @@ | ||
if (!cb) | ||
return stream.emit('error', new Error('no writecb in Transform class')); | ||
if (!cb) return stream.emit('error', new Error('no writecb in Transform class')); | ||
@@ -83,7 +82,5 @@ ts.writechunk = null; | ||
if (data !== null && data !== undefined) | ||
stream.push(data); | ||
if (data !== null && data !== undefined) stream.push(data); | ||
if (cb) | ||
cb(er); | ||
cb(er); | ||
@@ -97,6 +94,4 @@ var rs = stream._readableState; | ||
function Transform(options) { | ||
if (!(this instanceof Transform)) | ||
return new Transform(options); | ||
if (!(this instanceof Transform)) return new Transform(options); | ||
@@ -119,20 +114,15 @@ Duplex.call(this, options); | ||
if (options) { | ||
if (typeof options.transform === 'function') | ||
this._transform = options.transform; | ||
if (typeof options.transform === 'function') this._transform = options.transform; | ||
if (typeof options.flush === 'function') | ||
this._flush = options.flush; | ||
if (typeof options.flush === 'function') this._flush = options.flush; | ||
} | ||
this.once('prefinish', function() { | ||
if (typeof this._flush === 'function') | ||
this._flush(function(er) { | ||
done(stream, er); | ||
}); | ||
else | ||
done(stream); | ||
this.once('prefinish', function () { | ||
if (typeof this._flush === 'function') this._flush(function (er) { | ||
done(stream, er); | ||
});else done(stream); | ||
}); | ||
} | ||
Transform.prototype.push = function(chunk, encoding) { | ||
Transform.prototype.push = function (chunk, encoding) { | ||
this._transformState.needTransform = false; | ||
@@ -152,7 +142,7 @@ return Duplex.prototype.push.call(this, chunk, encoding); | ||
// never call cb(), then you'll never get another chunk. | ||
Transform.prototype._transform = function(chunk, encoding, cb) { | ||
Transform.prototype._transform = function (chunk, encoding, cb) { | ||
throw new Error('not implemented'); | ||
}; | ||
Transform.prototype._write = function(chunk, encoding, cb) { | ||
Transform.prototype._write = function (chunk, encoding, cb) { | ||
var ts = this._transformState; | ||
@@ -164,6 +154,3 @@ ts.writecb = cb; | ||
var rs = this._readableState; | ||
if (ts.needTransform || | ||
rs.needReadable || | ||
rs.length < rs.highWaterMark) | ||
this._read(rs.highWaterMark); | ||
if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) this._read(rs.highWaterMark); | ||
} | ||
@@ -175,3 +162,3 @@ }; | ||
// That we got here means that the readable side wants more data. | ||
Transform.prototype._read = function(n) { | ||
Transform.prototype._read = function (n) { | ||
var ts = this._transformState; | ||
@@ -189,6 +176,4 @@ | ||
function done(stream, er) { | ||
if (er) | ||
return stream.emit('error', er); | ||
if (er) return stream.emit('error', er); | ||
@@ -200,9 +185,7 @@ // if there's nothing in the write buffer, then that means | ||
if (ws.length) | ||
throw new Error('calling transform done when ws.length != 0'); | ||
if (ws.length) throw new Error('calling transform done when ws.length != 0'); | ||
if (ts.transforming) | ||
throw new Error('calling transform done when still transforming'); | ||
if (ts.transforming) throw new Error('calling transform done when still transforming'); | ||
return stream.push(null); | ||
} | ||
} |
@@ -13,2 +13,5 @@ // A bit simpler than readable streams. | ||
/*<replacement>*/ | ||
var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick; | ||
/*</replacement>*/ | ||
@@ -21,3 +24,2 @@ /*<replacement>*/ | ||
/*<replacement>*/ | ||
@@ -28,3 +30,2 @@ var util = require('core-util-is'); | ||
/*<replacement>*/ | ||
@@ -36,12 +37,11 @@ var internalUtil = { | ||
/*<replacement>*/ | ||
var Stream; | ||
(function (){try{ | ||
Stream = require('st' + 'ream'); | ||
}catch(_){}finally{ | ||
if (!Stream) | ||
Stream = require('events').EventEmitter; | ||
}}()) | ||
(function () { | ||
try { | ||
Stream = require('st' + 'ream'); | ||
} catch (_) {} finally { | ||
if (!Stream) Stream = require('events').EventEmitter; | ||
} | ||
})(); | ||
/*</replacement>*/ | ||
@@ -72,4 +72,3 @@ | ||
if (stream instanceof Duplex) | ||
this.objectMode = this.objectMode || !!options.writableObjectMode; | ||
if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode; | ||
@@ -81,6 +80,6 @@ // the point at which write() starts returning false | ||
var defaultHwm = this.objectMode ? 16 : 16 * 1024; | ||
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm; | ||
this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm; | ||
// cast to ints. | ||
this.highWaterMark = ~~this.highWaterMark; | ||
this.highWaterMark = ~ ~this.highWaterMark; | ||
@@ -129,3 +128,3 @@ this.needDrain = false; | ||
// the callback that's passed to _write(chunk,cb) | ||
this.onwrite = function(er) { | ||
this.onwrite = function (er) { | ||
onwrite(stream, er); | ||
@@ -153,2 +152,10 @@ }; | ||
this.errorEmitted = false; | ||
// count buffered requests | ||
this.bufferedRequestCount = 0; | ||
// create the two objects needed to store the corked requests | ||
// they are not a linked list, as no new elements are inserted in there | ||
this.corkedRequestsFree = new CorkedRequest(this); | ||
this.corkedRequestsFree.next = new CorkedRequest(this); | ||
} | ||
@@ -166,12 +173,12 @@ | ||
(function (){try { | ||
Object.defineProperty(WritableState.prototype, 'buffer', { | ||
get: internalUtil.deprecate(function() { | ||
return this.getBuffer(); | ||
}, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + | ||
'instead.') | ||
}); | ||
}catch(_){}}()); | ||
(function () { | ||
try { | ||
Object.defineProperty(WritableState.prototype, 'buffer', { | ||
get: internalUtil.deprecate(function () { | ||
return this.getBuffer(); | ||
}, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.') | ||
}); | ||
} catch (_) {} | ||
})(); | ||
var Duplex; | ||
@@ -183,4 +190,3 @@ function Writable(options) { | ||
// instanceof Writable, they're instanceof Readable. | ||
if (!(this instanceof Writable) && !(this instanceof Duplex)) | ||
return new Writable(options); | ||
if (!(this instanceof Writable) && !(this instanceof Duplex)) return new Writable(options); | ||
@@ -193,7 +199,5 @@ this._writableState = new WritableState(options, this); | ||
if (options) { | ||
if (typeof options.write === 'function') | ||
this._write = options.write; | ||
if (typeof options.write === 'function') this._write = options.write; | ||
if (typeof options.writev === 'function') | ||
this._writev = options.writev; | ||
if (typeof options.writev === 'function') this._writev = options.writev; | ||
} | ||
@@ -205,7 +209,6 @@ | ||
// Otherwise people can pipe Writable streams, which is just wrong. | ||
Writable.prototype.pipe = function() { | ||
Writable.prototype.pipe = function () { | ||
this.emit('error', new Error('Cannot pipe. Not readable.')); | ||
}; | ||
function writeAfterEnd(stream, cb) { | ||
@@ -226,7 +229,3 @@ var er = new Error('write after end'); | ||
if (!(Buffer.isBuffer(chunk)) && | ||
typeof chunk !== 'string' && | ||
chunk !== null && | ||
chunk !== undefined && | ||
!state.objectMode) { | ||
if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) { | ||
var er = new TypeError('Invalid non-string/buffer chunk'); | ||
@@ -240,3 +239,3 @@ stream.emit('error', er); | ||
Writable.prototype.write = function(chunk, encoding, cb) { | ||
Writable.prototype.write = function (chunk, encoding, cb) { | ||
var state = this._writableState; | ||
@@ -250,13 +249,7 @@ var ret = false; | ||
if (Buffer.isBuffer(chunk)) | ||
encoding = 'buffer'; | ||
else if (!encoding) | ||
encoding = state.defaultEncoding; | ||
if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding; | ||
if (typeof cb !== 'function') | ||
cb = nop; | ||
if (typeof cb !== 'function') cb = nop; | ||
if (state.ended) | ||
writeAfterEnd(this, cb); | ||
else if (validChunk(this, state, chunk, cb)) { | ||
if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) { | ||
state.pendingcb++; | ||
@@ -269,3 +262,3 @@ ret = writeOrBuffer(this, state, chunk, encoding, cb); | ||
Writable.prototype.cork = function() { | ||
Writable.prototype.cork = function () { | ||
var state = this._writableState; | ||
@@ -276,3 +269,3 @@ | ||
Writable.prototype.uncork = function() { | ||
Writable.prototype.uncork = function () { | ||
var state = this._writableState; | ||
@@ -283,8 +276,3 @@ | ||
if (!state.writing && | ||
!state.corked && | ||
!state.finished && | ||
!state.bufferProcessing && | ||
state.bufferedRequest) | ||
clearBuffer(this, state); | ||
if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state); | ||
} | ||
@@ -295,8 +283,4 @@ }; | ||
// node::ParseEncoding() requires lower case. | ||
if (typeof encoding === 'string') | ||
encoding = encoding.toLowerCase(); | ||
if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', | ||
'ucs2', 'ucs-2','utf16le', 'utf-16le', 'raw'] | ||
.indexOf((encoding + '').toLowerCase()) > -1)) | ||
throw new TypeError('Unknown encoding: ' + encoding); | ||
if (typeof encoding === 'string') encoding = encoding.toLowerCase(); | ||
if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding); | ||
this._writableState.defaultEncoding = encoding; | ||
@@ -306,5 +290,3 @@ }; | ||
function decodeChunk(state, chunk, encoding) { | ||
if (!state.objectMode && | ||
state.decodeStrings !== false && | ||
typeof chunk === 'string') { | ||
if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') { | ||
chunk = new Buffer(chunk, encoding); | ||
@@ -321,4 +303,3 @@ } | ||
if (Buffer.isBuffer(chunk)) | ||
encoding = 'buffer'; | ||
if (Buffer.isBuffer(chunk)) encoding = 'buffer'; | ||
var len = state.objectMode ? 1 : chunk.length; | ||
@@ -330,4 +311,3 @@ | ||
// we must ensure that previous needDrain will not be reset to false. | ||
if (!ret) | ||
state.needDrain = true; | ||
if (!ret) state.needDrain = true; | ||
@@ -342,2 +322,3 @@ if (state.writing || state.corked) { | ||
} | ||
state.bufferedRequestCount += 1; | ||
} else { | ||
@@ -355,6 +336,3 @@ doWrite(stream, state, false, len, chunk, encoding, cb); | ||
state.sync = true; | ||
if (writev) | ||
stream._writev(chunk, state.onwrite); | ||
else | ||
stream._write(chunk, encoding, state.onwrite); | ||
if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite); | ||
state.sync = false; | ||
@@ -365,6 +343,3 @@ } | ||
--state.pendingcb; | ||
if (sync) | ||
processNextTick(cb, er); | ||
else | ||
cb(er); | ||
if (sync) processNextTick(cb, er);else cb(er); | ||
@@ -389,12 +364,7 @@ stream._writableState.errorEmitted = true; | ||
if (er) | ||
onwriteError(stream, state, sync, er, cb); | ||
else { | ||
if (er) onwriteError(stream, state, sync, er, cb);else { | ||
// Check if we're actually ready to finish, but don't emit yet | ||
var finished = needFinish(state); | ||
if (!finished && | ||
!state.corked && | ||
!state.bufferProcessing && | ||
state.bufferedRequest) { | ||
if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { | ||
clearBuffer(stream, state); | ||
@@ -404,6 +374,8 @@ } | ||
if (sync) { | ||
processNextTick(afterWrite, stream, state, finished, cb); | ||
/*<replacement>*/ | ||
asyncWrite(afterWrite, stream, state, finished, cb); | ||
/*</replacement>*/ | ||
} else { | ||
afterWrite(stream, state, finished, cb); | ||
} | ||
afterWrite(stream, state, finished, cb); | ||
} | ||
} | ||
@@ -413,4 +385,3 @@ } | ||
function afterWrite(stream, state, finished, cb) { | ||
if (!finished) | ||
onwriteDrain(stream, state); | ||
if (!finished) onwriteDrain(stream, state); | ||
state.pendingcb--; | ||
@@ -431,3 +402,2 @@ cb(); | ||
// if there's something in the buffer waiting, then process it | ||
@@ -440,22 +410,22 @@ function clearBuffer(stream, state) { | ||
// Fast case, write everything using _writev() | ||
var buffer = []; | ||
var cbs = []; | ||
var l = state.bufferedRequestCount; | ||
var buffer = new Array(l); | ||
var holder = state.corkedRequestsFree; | ||
holder.entry = entry; | ||
var count = 0; | ||
while (entry) { | ||
cbs.push(entry.callback); | ||
buffer.push(entry); | ||
buffer[count] = entry; | ||
entry = entry.next; | ||
count += 1; | ||
} | ||
// count the one we are adding, as well. | ||
// TODO(isaacs) clean this up | ||
doWrite(stream, state, true, state.length, buffer, '', holder.finish); | ||
// doWrite is always async, defer these to save a bit of time | ||
// as the hot path ends with doWrite | ||
state.pendingcb++; | ||
state.lastBufferedRequest = null; | ||
doWrite(stream, state, true, state.length, buffer, '', function(err) { | ||
for (var i = 0; i < cbs.length; i++) { | ||
state.pendingcb--; | ||
cbs[i](err); | ||
} | ||
}); | ||
// Clear buffer | ||
state.corkedRequestsFree = holder.next; | ||
holder.next = null; | ||
} else { | ||
@@ -480,5 +450,6 @@ // Slow case, write chunks one-by-one | ||
if (entry === null) | ||
state.lastBufferedRequest = null; | ||
if (entry === null) state.lastBufferedRequest = null; | ||
} | ||
state.bufferedRequestCount = 0; | ||
state.bufferedRequest = entry; | ||
@@ -488,3 +459,3 @@ state.bufferProcessing = false; | ||
Writable.prototype._write = function(chunk, encoding, cb) { | ||
Writable.prototype._write = function (chunk, encoding, cb) { | ||
cb(new Error('not implemented')); | ||
@@ -495,3 +466,3 @@ }; | ||
Writable.prototype.end = function(chunk, encoding, cb) { | ||
Writable.prototype.end = function (chunk, encoding, cb) { | ||
var state = this._writableState; | ||
@@ -508,4 +479,3 @@ | ||
if (chunk !== null && chunk !== undefined) | ||
this.write(chunk, encoding); | ||
if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); | ||
@@ -519,13 +489,7 @@ // .end() fully uncorks | ||
// ignore unnecessary end() calls. | ||
if (!state.ending && !state.finished) | ||
endWritable(this, state, cb); | ||
if (!state.ending && !state.finished) endWritable(this, state, cb); | ||
}; | ||
function needFinish(state) { | ||
return (state.ending && | ||
state.length === 0 && | ||
state.bufferedRequest === null && | ||
!state.finished && | ||
!state.writing); | ||
return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing; | ||
} | ||
@@ -558,8 +522,31 @@ | ||
if (cb) { | ||
if (state.finished) | ||
processNextTick(cb); | ||
else | ||
stream.once('finish', cb); | ||
if (state.finished) processNextTick(cb);else stream.once('finish', cb); | ||
} | ||
state.ended = true; | ||
stream.writable = false; | ||
} | ||
// It seems a linked list but it is not | ||
// there will be only 2 of these for each stream | ||
function CorkedRequest(state) { | ||
var _this = this; | ||
this.next = null; | ||
this.entry = null; | ||
this.finish = function (err) { | ||
var entry = _this.entry; | ||
_this.entry = null; | ||
while (entry) { | ||
var cb = entry.callback; | ||
state.pendingcb--; | ||
cb(err); | ||
entry = entry.next; | ||
} | ||
if (state.corkedRequestsFree) { | ||
state.corkedRequestsFree.next = _this; | ||
} else { | ||
state.corkedRequestsFree = _this; | ||
} | ||
}; | ||
} |
{ | ||
"name": "readable-stream", | ||
"version": "2.0.5", | ||
"description": "Streams3, a user-land copy of the stream library from iojs v2.x", | ||
"version": "2.0.6", | ||
"description": "Streams3, a user-land copy of the stream library from Node.js", | ||
"main": "readable.js", | ||
@@ -9,3 +9,3 @@ "dependencies": { | ||
"inherits": "~2.0.1", | ||
"isarray": "0.0.1", | ||
"isarray": "~1.0.0", | ||
"process-nextick-args": "~1.0.6", | ||
@@ -17,7 +17,7 @@ "string_decoder": "~0.10.x", | ||
"tap": "~0.2.6", | ||
"tape": "~4.0.0", | ||
"zuul": "~3.0.0" | ||
"tape": "~4.5.1", | ||
"zuul": "~3.9.0" | ||
}, | ||
"scripts": { | ||
"test": "tap test/parallel/*.js", | ||
"test": "tap test/parallel/*.js test/ours/*.js", | ||
"browser": "npm run write-zuul && zuul -- test/browser.js", | ||
@@ -24,0 +24,0 @@ "write-zuul": "printf \"ui: tape\nbrowsers:\n - name: $BROWSER_NAME\n version: $BROWSER_VERSION\n\">.zuul.yml" |
# readable-stream | ||
***Node-core streams for userland*** [![Build Status](https://travis-ci.org/nodejs/readable-stream.svg?branch=master)](https://travis-ci.org/nodejs/readable-stream) | ||
***Node-core v5.8.0 streams for userland*** [![Build Status](https://travis-ci.org/nodejs/readable-stream.svg?branch=master)](https://travis-ci.org/nodejs/readable-stream) | ||
@@ -5,0 +5,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
115821
1381
+ Addedisarray@1.0.0(transitive)
- Removedisarray@0.0.1(transitive)
Updatedisarray@~1.0.0