Socket
Socket
Sign inDemoInstall

readable-stream

Package Overview
Dependencies
Maintainers
4
Versions
103
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

readable-stream - npm Package Compare versions

Comparing version 2.0.5 to 2.0.6

33

lib/_stream_duplex.js

@@ -9,10 +9,11 @@ // a duplex stream is just a stream that is both readable and writable.

/*<replacement>*/
var objectKeys = Object.keys || function (obj) {
var keys = [];
for (var key in obj) keys.push(key);
return keys;
}
for (var key in obj) {
keys.push(key);
}return keys;
};
/*</replacement>*/
module.exports = Duplex;

@@ -24,4 +25,2 @@

/*<replacement>*/

@@ -40,9 +39,7 @@ var util = require('core-util-is');

var method = keys[v];
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method];
}
function Duplex(options) {
if (!(this instanceof Duplex))
return new Duplex(options);
if (!(this instanceof Duplex)) return new Duplex(options);

@@ -52,11 +49,8 @@ Readable.call(this, options);

if (options && options.readable === false)
this.readable = false;
if (options && options.readable === false) this.readable = false;
if (options && options.writable === false)
this.writable = false;
if (options && options.writable === false) this.writable = false;
this.allowHalfOpen = true;
if (options && options.allowHalfOpen === false)
this.allowHalfOpen = false;
if (options && options.allowHalfOpen === false) this.allowHalfOpen = false;

@@ -70,4 +64,3 @@ this.once('end', onend);

// then we're ok.
if (this.allowHalfOpen || this._writableState.ended)
return;
if (this.allowHalfOpen || this._writableState.ended) return;

@@ -83,6 +76,6 @@ // no more data can be written.

function forEach (xs, f) {
function forEach(xs, f) {
for (var i = 0, l = xs.length; i < l; i++) {
f(xs[i], i);
}
}
}

@@ -19,4 +19,3 @@ // a passthrough stream.

function PassThrough(options) {
if (!(this instanceof PassThrough))
return new PassThrough(options);
if (!(this instanceof PassThrough)) return new PassThrough(options);

@@ -26,4 +25,4 @@ Transform.call(this, options);

PassThrough.prototype._transform = function(chunk, encoding, cb) {
PassThrough.prototype._transform = function (chunk, encoding, cb) {
cb(null, chunk);
};
};

@@ -9,3 +9,2 @@ 'use strict';

/*<replacement>*/

@@ -15,3 +14,2 @@ var isArray = require('isarray');

/*<replacement>*/

@@ -26,3 +24,3 @@ var Buffer = require('buffer').Buffer;

/*<replacement>*/
var EElistenerCount = function(emitter, type) {
var EElistenerCount = function (emitter, type) {
return emitter.listeners(type).length;

@@ -32,12 +30,11 @@ };

/*<replacement>*/
var Stream;
(function (){try{
Stream = require('st' + 'ream');
}catch(_){}finally{
if (!Stream)
Stream = require('events').EventEmitter;
}}())
(function () {
try {
Stream = require('st' + 'ream');
} catch (_) {} finally {
if (!Stream) Stream = require('events').EventEmitter;
}
})();
/*</replacement>*/

@@ -52,7 +49,5 @@

/*<replacement>*/
var debugUtil = require('util');
var debug;
var debug = undefined;
if (debugUtil && debugUtil.debuglog) {

@@ -79,4 +74,3 @@ debug = debugUtil.debuglog('stream');

if (stream instanceof Duplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;
if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode;

@@ -87,6 +81,6 @@ // the point at which it stops calling _read() to fill the buffer

var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
// cast to ints.
this.highWaterMark = ~~this.highWaterMark;
this.highWaterMark = ~ ~this.highWaterMark;

@@ -113,2 +107,3 @@ this.buffer = [];

this.readableListening = false;
this.resumeScheduled = false;

@@ -133,4 +128,3 @@ // Crypto is kind of old and crusty. Historically, its default string

if (options.encoding) {
if (!StringDecoder)
StringDecoder = require('string_decoder/').StringDecoder;
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
this.decoder = new StringDecoder(options.encoding);

@@ -145,4 +139,3 @@ this.encoding = options.encoding;

if (!(this instanceof Readable))
return new Readable(options);
if (!(this instanceof Readable)) return new Readable(options);

@@ -154,4 +147,3 @@ this._readableState = new ReadableState(options, this);

if (options && typeof options.read === 'function')
this._read = options.read;
if (options && typeof options.read === 'function') this._read = options.read;

@@ -165,3 +157,3 @@ Stream.call(this);

// write() some more.
Readable.prototype.push = function(chunk, encoding) {
Readable.prototype.push = function (chunk, encoding) {
var state = this._readableState;

@@ -181,3 +173,3 @@

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk) {
Readable.prototype.unshift = function (chunk) {
var state = this._readableState;

@@ -187,3 +179,3 @@ return readableAddChunk(this, state, chunk, '', true);

Readable.prototype.isPaused = function() {
Readable.prototype.isPaused = function () {
return this._readableState.flowing === false;

@@ -207,22 +199,24 @@ };

} else {
if (state.decoder && !addToFront && !encoding)
var skipAdd;
if (state.decoder && !addToFront && !encoding) {
chunk = state.decoder.write(chunk);
skipAdd = !state.objectMode && chunk.length === 0;
}
if (!addToFront)
state.reading = false;
if (!addToFront) state.reading = false;
// if we want the data now, just emit it.
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
state.buffer.push(chunk);
// Don't add to the buffer if we've decoded to an empty string chunk and
// we're not in object mode
if (!skipAdd) {
// if we want the data now, just emit it.
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk);
if (state.needReadable)
emitReadable(stream);
if (state.needReadable) emitReadable(stream);
}
}

@@ -239,3 +233,2 @@

// if it's past the high water mark, we can push in some more.

@@ -249,12 +242,8 @@ // Also, if we have no data yet, we can stand some

function needMoreData(state) {
return !state.ended &&
(state.needReadable ||
state.length < state.highWaterMark ||
state.length === 0);
return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
}
// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
if (!StringDecoder)
StringDecoder = require('string_decoder/').StringDecoder;
Readable.prototype.setEncoding = function (enc) {
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
this._readableState.decoder = new StringDecoder(enc);

@@ -284,18 +273,12 @@ this._readableState.encoding = enc;

function howMuchToRead(n, state) {
if (state.length === 0 && state.ended)
return 0;
if (state.length === 0 && state.ended) return 0;
if (state.objectMode)
return n === 0 ? 0 : 1;
if (state.objectMode) return n === 0 ? 0 : 1;
if (n === null || isNaN(n)) {
// only flow one buffer at a time
if (state.flowing && state.buffer.length)
return state.buffer[0].length;
else
return state.length;
if (state.flowing && state.buffer.length) return state.buffer[0].length;else return state.length;
}
if (n <= 0)
return 0;
if (n <= 0) return 0;

@@ -306,4 +289,3 @@ // If we're asking for more than the target buffer level,

// amounts.
if (n > state.highWaterMark)
state.highWaterMark = computeNewHighWaterMark(n);
if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);

@@ -324,3 +306,3 @@ // don't have that much. return null, unless we've ended.

// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
Readable.prototype.read = function (n) {
debug('read', n);

@@ -330,4 +312,3 @@ var state = this._readableState;

if (typeof n !== 'number' || n > 0)
state.emittedReadable = false;
if (typeof n !== 'number' || n > 0) state.emittedReadable = false;

@@ -337,10 +318,5 @@ // if we're doing read(0) to trigger a readable event, but we

// the 'readable' event and move on.
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
endReadable(this);
else
emitReadable(this);
if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this);
return null;

@@ -353,4 +329,3 @@ }

if (n === 0 && state.ended) {
if (state.length === 0)
endReadable(this);
if (state.length === 0) endReadable(this);
return null;

@@ -403,4 +378,3 @@ }

// if the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;
if (state.length === 0) state.needReadable = true;
// call internal read method

@@ -413,10 +387,6 @@ this._read(state.highWaterMark);

// and we need to re-evaluate how much data we can return to the user.
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);
if (doRead && !state.reading) n = howMuchToRead(nOrig, state);
var ret;
if (n > 0)
ret = fromList(n, state);
else
ret = null;
if (n > 0) ret = fromList(n, state);else ret = null;

@@ -432,11 +402,8 @@ if (ret === null) {

// as soon as we *do* get something into the buffer.
if (state.length === 0 && !state.ended)
state.needReadable = true;
if (state.length === 0 && !state.ended) state.needReadable = true;
// If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended && state.length === 0)
endReadable(this);
if (nOrig !== n && state.ended && state.length === 0) endReadable(this);
if (ret !== null)
this.emit('data', ret);
if (ret !== null) this.emit('data', ret);

@@ -448,7 +415,3 @@ return ret;

var er = null;
if (!(Buffer.isBuffer(chunk)) &&
typeof chunk !== 'string' &&
chunk !== null &&
chunk !== undefined &&
!state.objectMode) {
if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');

@@ -459,3 +422,2 @@ }

function onEofChunk(stream, state) {

@@ -485,6 +447,3 @@ if (state.ended) return;

state.emittedReadable = true;
if (state.sync)
processNextTick(emitReadable_, stream);
else
emitReadable_(stream);
if (state.sync) processNextTick(emitReadable_, stream);else emitReadable_(stream);
}

@@ -499,3 +458,2 @@ }

// at this point, the user has presumably seen the 'readable' event,

@@ -516,4 +474,3 @@ // and called read() to consume some data. that may have triggered

var len = state.length;
while (!state.reading && !state.flowing && !state.ended &&
state.length < state.highWaterMark) {
while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
debug('maybeReadMore read 0');

@@ -523,5 +480,3 @@ stream.read(0);

// didn't get any data, stop spinning.
break;
else
len = state.length;
break;else len = state.length;
}

@@ -535,7 +490,7 @@ state.readingMore = false;

// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n) {
Readable.prototype._read = function (n) {
this.emit('error', new Error('not implemented'));
};
Readable.prototype.pipe = function(dest, pipeOpts) {
Readable.prototype.pipe = function (dest, pipeOpts) {
var src = this;

@@ -558,11 +513,6 @@ var state = this._readableState;

var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
dest !== process.stderr;
var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;
var endFn = doEnd ? onend : cleanup;
if (state.endEmitted)
processNextTick(endFn);
else
src.once('end', endFn);
if (state.endEmitted) processNextTick(endFn);else src.once('end', endFn);

@@ -609,5 +559,3 @@ dest.on('unpipe', onunpipe);

// If we don't know, then assume that we are waiting for one.
if (state.awaitDrain &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
}

@@ -623,6 +571,3 @@

// also returned false.
if (state.pipesCount === 1 &&
state.pipes[0] === dest &&
src.listenerCount('data') === 1 &&
!cleanedUp) {
if (state.pipesCount === 1 && state.pipes[0] === dest && src.listenerCount('data') === 1 && !cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);

@@ -641,15 +586,8 @@ src._readableState.awaitDrain++;

dest.removeListener('error', onerror);
if (EElistenerCount(dest, 'error') === 0)
dest.emit('error', er);
if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
}
// This is a brutally ugly hack to make sure that our error handler
// is attached before any userland ones. NEVER DO THIS.
if (!dest._events || !dest._events.error)
dest.on('error', onerror);
else if (isArray(dest._events.error))
dest._events.error.unshift(onerror);
else
dest._events.error = [onerror, dest._events.error];
if (!dest._events || !dest._events.error) dest.on('error', onerror);else if (isArray(dest._events.error)) dest._events.error.unshift(onerror);else dest._events.error = [onerror, dest._events.error];
// Both close and finish should trigger unpipe, but only once.

@@ -686,7 +624,6 @@ function onclose() {

function pipeOnDrain(src) {
return function() {
return function () {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
if (state.awaitDrain) state.awaitDrain--;
if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) {

@@ -699,9 +636,7 @@ state.flowing = true;

Readable.prototype.unpipe = function(dest) {
Readable.prototype.unpipe = function (dest) {
var state = this._readableState;
// if we're not piping anywhere, then do nothing.
if (state.pipesCount === 0)
return this;
if (state.pipesCount === 0) return this;

@@ -711,7 +646,5 @@ // just one destination. most common case.

// passed in one, but it's not the right one.
if (dest && dest !== state.pipes)
return this;
if (dest && dest !== state.pipes) return this;
if (!dest)
dest = state.pipes;
if (!dest) dest = state.pipes;

@@ -722,4 +655,3 @@ // got a match.

state.flowing = false;
if (dest)
dest.emit('unpipe', this);
if (dest) dest.emit('unpipe', this);
return this;

@@ -738,5 +670,5 @@ }

for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this);
return this;
for (var _i = 0; _i < len; _i++) {
dests[_i].emit('unpipe', this);
}return this;
}

@@ -746,9 +678,7 @@

var i = indexOf(state.pipes, dest);
if (i === -1)
return this;
if (i === -1) return this;
state.pipes.splice(i, 1);
state.pipesCount -= 1;
if (state.pipesCount === 1)
state.pipes = state.pipes[0];
if (state.pipesCount === 1) state.pipes = state.pipes[0];

@@ -762,3 +692,3 @@ dest.emit('unpipe', this);

// Ensure readable listeners eventually get something
Readable.prototype.on = function(ev, fn) {
Readable.prototype.on = function (ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn);

@@ -772,3 +702,3 @@

if (ev === 'readable' && this.readable) {
if (ev === 'readable' && !this._readableState.endEmitted) {
var state = this._readableState;

@@ -798,3 +728,3 @@ if (!state.readableListening) {

// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
Readable.prototype.resume = function () {
var state = this._readableState;

@@ -825,7 +755,6 @@ if (!state.flowing) {

flow(stream);
if (state.flowing && !state.reading)
stream.read(0);
if (state.flowing && !state.reading) stream.read(0);
}
Readable.prototype.pause = function() {
Readable.prototype.pause = function () {
debug('call pause flowing=%j', this._readableState.flowing);

@@ -853,3 +782,3 @@ if (false !== this._readableState.flowing) {

// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function(stream) {
Readable.prototype.wrap = function (stream) {
var state = this._readableState;

@@ -859,8 +788,7 @@ var paused = false;

var self = this;
stream.on('end', function() {
stream.on('end', function () {
debug('wrapped end');
if (state.decoder && !state.ended) {
var chunk = state.decoder.end();
if (chunk && chunk.length)
self.push(chunk);
if (chunk && chunk.length) self.push(chunk);
}

@@ -871,12 +799,8 @@

stream.on('data', function(chunk) {
stream.on('data', function (chunk) {
debug('wrapped data');
if (state.decoder)
chunk = state.decoder.write(chunk);
if (state.decoder) chunk = state.decoder.write(chunk);
// don't skip over falsy values in objectMode
if (state.objectMode && (chunk === null || chunk === undefined))
return;
else if (!state.objectMode && (!chunk || !chunk.length))
return;
if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return;

@@ -894,5 +818,7 @@ var ret = self.push(chunk);

if (this[i] === undefined && typeof stream[i] === 'function') {
this[i] = function(method) { return function() {
return stream[method].apply(stream, arguments);
}; }(i);
this[i] = function (method) {
return function () {
return stream[method].apply(stream, arguments);
};
}(i);
}

@@ -903,3 +829,3 @@ }

var events = ['error', 'close', 'destroy', 'pause', 'resume'];
forEach(events, function(ev) {
forEach(events, function (ev) {
stream.on(ev, self.emit.bind(self, ev));

@@ -910,3 +836,3 @@ });

// underlying stream.
self._read = function(n) {
self._read = function (n) {
debug('wrapped _read', n);

@@ -922,3 +848,2 @@ if (paused) {

// exposed for testing purposes only.

@@ -937,17 +862,7 @@ Readable._fromList = fromList;

// nothing in the list, definitely empty.
if (list.length === 0)
return null;
if (list.length === 0) return null;
if (length === 0)
ret = null;
else if (objectMode)
ret = list.shift();
else if (!n || n >= length) {
if (length === 0) ret = null;else if (objectMode) ret = list.shift();else if (!n || n >= length) {
// read it all, truncate the array.
if (stringMode)
ret = list.join('');
else if (list.length === 1)
ret = list[0];
else
ret = Buffer.concat(list, length);
if (stringMode) ret = list.join('');else if (list.length === 1) ret = list[0];else ret = Buffer.concat(list, length);
list.length = 0;

@@ -968,6 +883,3 @@ } else {

// we have enough to cover it, but it spans past the first buffer.
if (stringMode)
ret = '';
else
ret = new Buffer(n);
if (stringMode) ret = '';else ret = new Buffer(n);

@@ -979,11 +891,5 @@ var c = 0;

if (stringMode)
ret += buf.slice(0, cpy);
else
buf.copy(ret, c, 0, cpy);
if (stringMode) ret += buf.slice(0, cpy);else buf.copy(ret, c, 0, cpy);
if (cpy < buf.length)
list[0] = buf.slice(cpy);
else
list.shift();
if (cpy < buf.length) list[0] = buf.slice(cpy);else list.shift();

@@ -1003,4 +909,3 @@ c += cpy;

// bug in node. Should never happen.
if (state.length > 0)
throw new Error('endReadable called on non-empty stream');
if (state.length > 0) throw new Error('endReadable called on non-empty stream');

@@ -1022,3 +927,3 @@ if (!state.endEmitted) {

function forEach (xs, f) {
function forEach(xs, f) {
for (var i = 0, l = xs.length; i < l; i++) {

@@ -1029,3 +934,3 @@ f(xs[i], i);

function indexOf (xs, x) {
function indexOf(xs, x) {
for (var i = 0, l = xs.length; i < l; i++) {

@@ -1035,2 +940,2 @@ if (xs[i] === x) return i;

return -1;
}
}

@@ -56,5 +56,4 @@ // a transform stream is a readable/writable stream where you do

function TransformState(stream) {
this.afterTransform = function(er, data) {
this.afterTransform = function (er, data) {
return afterTransform(stream, er, data);

@@ -67,2 +66,3 @@ };

this.writechunk = null;
this.writeencoding = null;
}

@@ -76,4 +76,3 @@

if (!cb)
return stream.emit('error', new Error('no writecb in Transform class'));
if (!cb) return stream.emit('error', new Error('no writecb in Transform class'));

@@ -83,7 +82,5 @@ ts.writechunk = null;

if (data !== null && data !== undefined)
stream.push(data);
if (data !== null && data !== undefined) stream.push(data);
if (cb)
cb(er);
cb(er);

@@ -97,6 +94,4 @@ var rs = stream._readableState;

function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);
if (!(this instanceof Transform)) return new Transform(options);

@@ -119,20 +114,15 @@ Duplex.call(this, options);

if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
if (typeof options.transform === 'function') this._transform = options.transform;
if (typeof options.flush === 'function')
this._flush = options.flush;
if (typeof options.flush === 'function') this._flush = options.flush;
}
this.once('prefinish', function() {
if (typeof this._flush === 'function')
this._flush(function(er) {
done(stream, er);
});
else
done(stream);
this.once('prefinish', function () {
if (typeof this._flush === 'function') this._flush(function (er) {
done(stream, er);
});else done(stream);
});
}
Transform.prototype.push = function(chunk, encoding) {
Transform.prototype.push = function (chunk, encoding) {
this._transformState.needTransform = false;

@@ -152,7 +142,7 @@ return Duplex.prototype.push.call(this, chunk, encoding);

// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
Transform.prototype._transform = function (chunk, encoding, cb) {
throw new Error('not implemented');
};
Transform.prototype._write = function(chunk, encoding, cb) {
Transform.prototype._write = function (chunk, encoding, cb) {
var ts = this._transformState;

@@ -164,6 +154,3 @@ ts.writecb = cb;

var rs = this._readableState;
if (ts.needTransform ||
rs.needReadable ||
rs.length < rs.highWaterMark)
this._read(rs.highWaterMark);
if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) this._read(rs.highWaterMark);
}

@@ -175,3 +162,3 @@ };

// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
Transform.prototype._read = function (n) {
var ts = this._transformState;

@@ -189,6 +176,4 @@

function done(stream, er) {
if (er)
return stream.emit('error', er);
if (er) return stream.emit('error', er);

@@ -200,9 +185,7 @@ // if there's nothing in the write buffer, then that means

if (ws.length)
throw new Error('calling transform done when ws.length != 0');
if (ws.length) throw new Error('calling transform done when ws.length != 0');
if (ts.transforming)
throw new Error('calling transform done when still transforming');
if (ts.transforming) throw new Error('calling transform done when still transforming');
return stream.push(null);
}
}

@@ -13,2 +13,5 @@ // A bit simpler than readable streams.

/*<replacement>*/
var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
/*</replacement>*/

@@ -21,3 +24,2 @@ /*<replacement>*/

/*<replacement>*/

@@ -28,3 +30,2 @@ var util = require('core-util-is');

/*<replacement>*/

@@ -36,12 +37,11 @@ var internalUtil = {

/*<replacement>*/
var Stream;
(function (){try{
Stream = require('st' + 'ream');
}catch(_){}finally{
if (!Stream)
Stream = require('events').EventEmitter;
}}())
(function () {
try {
Stream = require('st' + 'ream');
} catch (_) {} finally {
if (!Stream) Stream = require('events').EventEmitter;
}
})();
/*</replacement>*/

@@ -72,4 +72,3 @@

if (stream instanceof Duplex)
this.objectMode = this.objectMode || !!options.writableObjectMode;
if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;

@@ -81,6 +80,6 @@ // the point at which write() starts returning false

var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
// cast to ints.
this.highWaterMark = ~~this.highWaterMark;
this.highWaterMark = ~ ~this.highWaterMark;

@@ -129,3 +128,3 @@ this.needDrain = false;

// the callback that's passed to _write(chunk,cb)
this.onwrite = function(er) {
this.onwrite = function (er) {
onwrite(stream, er);

@@ -153,2 +152,10 @@ };

this.errorEmitted = false;
// count buffered requests
this.bufferedRequestCount = 0;
// create the two objects needed to store the corked requests
// they are not a linked list, as no new elements are inserted in there
this.corkedRequestsFree = new CorkedRequest(this);
this.corkedRequestsFree.next = new CorkedRequest(this);
}

@@ -166,12 +173,12 @@

(function (){try {
Object.defineProperty(WritableState.prototype, 'buffer', {
get: internalUtil.deprecate(function() {
return this.getBuffer();
}, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' +
'instead.')
});
}catch(_){}}());
(function () {
try {
Object.defineProperty(WritableState.prototype, 'buffer', {
get: internalUtil.deprecate(function () {
return this.getBuffer();
}, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
});
} catch (_) {}
})();
var Duplex;

@@ -183,4 +190,3 @@ function Writable(options) {

// instanceof Writable, they're instanceof Readable.
if (!(this instanceof Writable) && !(this instanceof Duplex))
return new Writable(options);
if (!(this instanceof Writable) && !(this instanceof Duplex)) return new Writable(options);

@@ -193,7 +199,5 @@ this._writableState = new WritableState(options, this);

if (options) {
if (typeof options.write === 'function')
this._write = options.write;
if (typeof options.write === 'function') this._write = options.write;
if (typeof options.writev === 'function')
this._writev = options.writev;
if (typeof options.writev === 'function') this._writev = options.writev;
}

@@ -205,7 +209,6 @@

// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
Writable.prototype.pipe = function () {
this.emit('error', new Error('Cannot pipe. Not readable.'));
};
function writeAfterEnd(stream, cb) {

@@ -226,7 +229,3 @@ var er = new Error('write after end');

if (!(Buffer.isBuffer(chunk)) &&
typeof chunk !== 'string' &&
chunk !== null &&
chunk !== undefined &&
!state.objectMode) {
if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
var er = new TypeError('Invalid non-string/buffer chunk');

@@ -240,3 +239,3 @@ stream.emit('error', er);

Writable.prototype.write = function(chunk, encoding, cb) {
Writable.prototype.write = function (chunk, encoding, cb) {
var state = this._writableState;

@@ -250,13 +249,7 @@ var ret = false;

if (Buffer.isBuffer(chunk))
encoding = 'buffer';
else if (!encoding)
encoding = state.defaultEncoding;
if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
if (typeof cb !== 'function')
cb = nop;
if (typeof cb !== 'function') cb = nop;
if (state.ended)
writeAfterEnd(this, cb);
else if (validChunk(this, state, chunk, cb)) {
if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
state.pendingcb++;

@@ -269,3 +262,3 @@ ret = writeOrBuffer(this, state, chunk, encoding, cb);

Writable.prototype.cork = function() {
Writable.prototype.cork = function () {
var state = this._writableState;

@@ -276,3 +269,3 @@

Writable.prototype.uncork = function() {
Writable.prototype.uncork = function () {
var state = this._writableState;

@@ -283,8 +276,3 @@

if (!state.writing &&
!state.corked &&
!state.finished &&
!state.bufferProcessing &&
state.bufferedRequest)
clearBuffer(this, state);
if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
}

@@ -295,8 +283,4 @@ };

// node::ParseEncoding() requires lower case.
if (typeof encoding === 'string')
encoding = encoding.toLowerCase();
if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64',
'ucs2', 'ucs-2','utf16le', 'utf-16le', 'raw']
.indexOf((encoding + '').toLowerCase()) > -1))
throw new TypeError('Unknown encoding: ' + encoding);
if (typeof encoding === 'string') encoding = encoding.toLowerCase();
if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
this._writableState.defaultEncoding = encoding;

@@ -306,5 +290,3 @@ };

function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
typeof chunk === 'string') {
if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
chunk = new Buffer(chunk, encoding);

@@ -321,4 +303,3 @@ }

if (Buffer.isBuffer(chunk))
encoding = 'buffer';
if (Buffer.isBuffer(chunk)) encoding = 'buffer';
var len = state.objectMode ? 1 : chunk.length;

@@ -330,4 +311,3 @@

// we must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;
if (!ret) state.needDrain = true;

@@ -342,2 +322,3 @@ if (state.writing || state.corked) {

}
state.bufferedRequestCount += 1;
} else {

@@ -355,6 +336,3 @@ doWrite(stream, state, false, len, chunk, encoding, cb);

state.sync = true;
if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
state.sync = false;

@@ -365,6 +343,3 @@ }

--state.pendingcb;
if (sync)
processNextTick(cb, er);
else
cb(er);
if (sync) processNextTick(cb, er);else cb(er);

@@ -389,12 +364,7 @@ stream._writableState.errorEmitted = true;

if (er)
onwriteError(stream, state, sync, er, cb);
else {
if (er) onwriteError(stream, state, sync, er, cb);else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state);
if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest) {
if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
clearBuffer(stream, state);

@@ -404,6 +374,8 @@ }

if (sync) {
processNextTick(afterWrite, stream, state, finished, cb);
/*<replacement>*/
asyncWrite(afterWrite, stream, state, finished, cb);
/*</replacement>*/
} else {
afterWrite(stream, state, finished, cb);
}
afterWrite(stream, state, finished, cb);
}
}

@@ -413,4 +385,3 @@ }

function afterWrite(stream, state, finished, cb) {
if (!finished)
onwriteDrain(stream, state);
if (!finished) onwriteDrain(stream, state);
state.pendingcb--;

@@ -431,3 +402,2 @@ cb();

// if there's something in the buffer waiting, then process it

@@ -440,22 +410,22 @@ function clearBuffer(stream, state) {

// Fast case, write everything using _writev()
var buffer = [];
var cbs = [];
var l = state.bufferedRequestCount;
var buffer = new Array(l);
var holder = state.corkedRequestsFree;
holder.entry = entry;
var count = 0;
while (entry) {
cbs.push(entry.callback);
buffer.push(entry);
buffer[count] = entry;
entry = entry.next;
count += 1;
}
// count the one we are adding, as well.
// TODO(isaacs) clean this up
doWrite(stream, state, true, state.length, buffer, '', holder.finish);
// doWrite is always async, defer these to save a bit of time
// as the hot path ends with doWrite
state.pendingcb++;
state.lastBufferedRequest = null;
doWrite(stream, state, true, state.length, buffer, '', function(err) {
for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
}
});
// Clear buffer
state.corkedRequestsFree = holder.next;
holder.next = null;
} else {

@@ -480,5 +450,6 @@ // Slow case, write chunks one-by-one

if (entry === null)
state.lastBufferedRequest = null;
if (entry === null) state.lastBufferedRequest = null;
}
state.bufferedRequestCount = 0;
state.bufferedRequest = entry;

@@ -488,3 +459,3 @@ state.bufferProcessing = false;

Writable.prototype._write = function(chunk, encoding, cb) {
Writable.prototype._write = function (chunk, encoding, cb) {
cb(new Error('not implemented'));

@@ -495,3 +466,3 @@ };

Writable.prototype.end = function(chunk, encoding, cb) {
Writable.prototype.end = function (chunk, encoding, cb) {
var state = this._writableState;

@@ -508,4 +479,3 @@

if (chunk !== null && chunk !== undefined)
this.write(chunk, encoding);
if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);

@@ -519,13 +489,7 @@ // .end() fully uncorks

// ignore unnecessary end() calls.
if (!state.ending && !state.finished)
endWritable(this, state, cb);
if (!state.ending && !state.finished) endWritable(this, state, cb);
};
function needFinish(state) {
return (state.ending &&
state.length === 0 &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
}

@@ -558,8 +522,31 @@

if (cb) {
if (state.finished)
processNextTick(cb);
else
stream.once('finish', cb);
if (state.finished) processNextTick(cb);else stream.once('finish', cb);
}
state.ended = true;
stream.writable = false;
}
// It seems a linked list but it is not
// there will be only 2 of these for each stream
function CorkedRequest(state) {
var _this = this;
this.next = null;
this.entry = null;
this.finish = function (err) {
var entry = _this.entry;
_this.entry = null;
while (entry) {
var cb = entry.callback;
state.pendingcb--;
cb(err);
entry = entry.next;
}
if (state.corkedRequestsFree) {
state.corkedRequestsFree.next = _this;
} else {
state.corkedRequestsFree = _this;
}
};
}
{
"name": "readable-stream",
"version": "2.0.5",
"description": "Streams3, a user-land copy of the stream library from iojs v2.x",
"version": "2.0.6",
"description": "Streams3, a user-land copy of the stream library from Node.js",
"main": "readable.js",

@@ -9,3 +9,3 @@ "dependencies": {

"inherits": "~2.0.1",
"isarray": "0.0.1",
"isarray": "~1.0.0",
"process-nextick-args": "~1.0.6",

@@ -17,7 +17,7 @@ "string_decoder": "~0.10.x",

"tap": "~0.2.6",
"tape": "~4.0.0",
"zuul": "~3.0.0"
"tape": "~4.5.1",
"zuul": "~3.9.0"
},
"scripts": {
"test": "tap test/parallel/*.js",
"test": "tap test/parallel/*.js test/ours/*.js",
"browser": "npm run write-zuul && zuul -- test/browser.js",

@@ -24,0 +24,0 @@ "write-zuul": "printf \"ui: tape\nbrowsers:\n - name: $BROWSER_NAME\n version: $BROWSER_VERSION\n\">.zuul.yml"

# readable-stream
***Node-core streams for userland*** [![Build Status](https://travis-ci.org/nodejs/readable-stream.svg?branch=master)](https://travis-ci.org/nodejs/readable-stream)
***Node-core v5.8.0 streams for userland*** [![Build Status](https://travis-ci.org/nodejs/readable-stream.svg?branch=master)](https://travis-ci.org/nodejs/readable-stream)

@@ -5,0 +5,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc