Socket
Socket
Sign inDemoInstall

readable-stream

Package Overview
Dependencies
Maintainers
6
Versions
103
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

readable-stream - npm Package Compare versions

Comparing version 2.3.6 to 3.0.0-rc.1

.airtap.yml

72

lib/_stream_duplex.js

@@ -31,2 +31,10 @@ // Copyright Joyent, Inc. and other Node contributors.

var _keys;
function _load_keys() {
return _keys = _interopRequireDefault(require('babel-runtime/core-js/object/keys'));
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
var pna = require('process-nextick-args');

@@ -36,3 +44,3 @@ /*</replacement>*/

/*<replacement>*/
var objectKeys = Object.keys || function (obj) {
var objectKeys = (_keys || _load_keys()).default || function (obj) {
var keys = [];

@@ -47,14 +55,9 @@ for (var key in obj) {

/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/
var Readable = require('./_stream_readable');
var Writable = require('./_stream_writable');
util.inherits(Duplex, Readable);
require('inherits')(Duplex, Readable);
{
// avoid scope creep, the keys array can then be collected
// Allow the keys array to be GC'ed.
var keys = objectKeys(Writable.prototype);

@@ -72,11 +75,14 @@ for (var v = 0; v < keys.length; v++) {

Writable.call(this, options);
this.allowHalfOpen = true;
if (options && options.readable === false) this.readable = false;
if (options) {
if (options.readable === false) this.readable = false;
if (options && options.writable === false) this.writable = false;
if (options.writable === false) this.writable = false;
this.allowHalfOpen = true;
if (options && options.allowHalfOpen === false) this.allowHalfOpen = false;
this.once('end', onend);
if (options.allowHalfOpen === false) {
this.allowHalfOpen = false;
this.once('end', onend);
}
}
}

@@ -94,7 +100,26 @@

Object.defineProperty(Duplex.prototype, 'writableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {
return this._writableState && this._writableState.getBuffer();
}
});
Object.defineProperty(Duplex.prototype, 'writableLength', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {
return this._writableState.length;
}
});
// the no-half-open enforcer
function onend() {
// if we allow half-open state, or if the writable side ended,
// then we're ok.
if (this.allowHalfOpen || this._writableState.ended) return;
// If the writable side ended, then we're ok.
if (this._writableState.ended) return;

@@ -111,2 +136,6 @@ // no more data can be written.

Object.defineProperty(Duplex.prototype, 'destroyed', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {

@@ -130,9 +159,2 @@ if (this._readableState === undefined || this._writableState === undefined) {

}
});
Duplex.prototype._destroy = function (err, cb) {
this.push(null);
this.end();
pna.nextTick(cb, err);
};
});

@@ -32,9 +32,4 @@ // Copyright Joyent, Inc. and other Node contributors.

/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/
require('inherits')(PassThrough, Transform);
util.inherits(PassThrough, Transform);
function PassThrough(options) {

@@ -41,0 +36,0 @@ if (!(this instanceof PassThrough)) return new PassThrough(options);

@@ -26,2 +26,22 @@ // Copyright Joyent, Inc. and other Node contributors.

var _symbol;
function _load_symbol() {
return _symbol = _interopRequireDefault(require('babel-runtime/core-js/symbol'));
}
var _isNan;
function _load_isNan() {
return _isNan = _interopRequireDefault(require('babel-runtime/core-js/number/is-nan'));
}
var _getPrototypeOf;
function _load_getPrototypeOf() {
return _getPrototypeOf = _interopRequireDefault(require('babel-runtime/core-js/object/get-prototype-of'));
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
var pna = require('process-nextick-args');

@@ -33,6 +53,2 @@ /*</replacement>*/

/*<replacement>*/
var isArray = require('isarray');
/*</replacement>*/
/*<replacement>*/
var Duplex;

@@ -55,5 +71,3 @@ /*</replacement>*/

/*<replacement>*/
var Buffer = require('safe-buffer').Buffer;
var Buffer = require('buffer').Buffer;
var OurUint8Array = global.Uint8Array || function () {};

@@ -67,10 +81,3 @@ function _uint8ArrayToBuffer(chunk) {

/*</replacement>*/
/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/
/*<replacement>*/
var debugUtil = require('util');

@@ -85,8 +92,25 @@ var debug = void 0;

var BufferList = require('./internal/streams/BufferList');
var BufferList = require('./internal/streams/buffer_list');
var destroyImpl = require('./internal/streams/destroy');
var StringDecoder;
util.inherits(Readable, Stream);
var _require = require('./internal/streams/state'),
getHighWaterMark = _require.getHighWaterMark;
var _require$codes = require('../errors').codes,
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF = _require$codes.ERR_STREAM_PUSH_AFTER_EOF,
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT;
var _require2 = require('../experimentalWarning'),
emitExperimentalWarning = _require2.emitExperimentalWarning;
// Lazy loaded to improve the startup performance.
var StringDecoder = void 0;
var ReadableAsyncIterator = void 0;
require('inherits')(Readable, Stream);
var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];

@@ -103,6 +127,6 @@

// the prependListener() method. The goal is to eventually remove this hack.
if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]];
if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (Array.isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]];
}
function ReadableState(options, stream) {
function ReadableState(options, stream, isDuplex) {
Duplex = Duplex || require('./_stream_duplex');

@@ -117,3 +141,3 @@

// These options can be provided separately as readableXXX and writableXXX.
var isDuplex = stream instanceof Duplex;
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex;

@@ -128,11 +152,4 @@ // object stream flag. Used to make read(n) ignore n and to

// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
var readableHwm = options.readableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex);
if (hwm || hwm === 0) this.highWaterMark = hwm;else if (isDuplex && (readableHwm || readableHwm === 0)) this.highWaterMark = readableHwm;else this.highWaterMark = defaultHwm;
// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
// A linked list is used to store data chunks instead of an array because the

@@ -163,2 +180,5 @@ // linked list can remove elements from the beginning faster than

// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;
// has it been destroyed

@@ -192,4 +212,8 @@ this.destroyed = false;

this._readableState = new ReadableState(options, this);
// Checking for a Stream.Duplex instance is faster here instead of inside
// the ReadableState constructor, at least with V8 6.5
var isDuplex = this instanceof Duplex;
this._readableState = new ReadableState(options, this, isDuplex);
// legacy

@@ -208,2 +232,6 @@ this.readable = true;

Object.defineProperty(Readable.prototype, 'destroyed', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {

@@ -231,3 +259,2 @@ if (this._readableState === undefined) {

Readable.prototype._destroy = function (err, cb) {
this.push(null);
cb(err);

@@ -266,2 +293,3 @@ };

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
debug('readableAddChunk', chunk);
var state = stream._readableState;

@@ -277,3 +305,3 @@ if (chunk === null) {

} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
if (typeof chunk !== 'string' && !state.objectMode && (0, (_getPrototypeOf || _load_getPrototypeOf()).default)(chunk) !== Buffer.prototype) {
chunk = _uint8ArrayToBuffer(chunk);

@@ -283,5 +311,7 @@ }

if (addToFront) {
if (state.endEmitted) stream.emit('error', new Error('stream.unshift() after end event'));else addChunk(stream, state, chunk, true);
if (state.endEmitted) stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new Error('stream.push() after EOF'));
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
} else {

@@ -298,6 +328,10 @@ state.reading = false;

state.reading = false;
maybeReadMore(stream, state);
}
}
return needMoreData(state);
// We can push more data if we are below the highWaterMark.
// Also, if we have no data yet, we can stand some more bytes.
// This is to work around cases where hwm=0, such as the repl.
return !state.ended && (state.length < state.highWaterMark || state.length === 0);
}

@@ -307,4 +341,4 @@

if (state.flowing && state.length === 0 && !state.sync) {
state.awaitDrain = 0;
stream.emit('data', chunk);
stream.read(0);
} else {

@@ -323,3 +357,3 @@ // update the buffer info.

if (!_isUint8Array(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
}

@@ -329,13 +363,2 @@ return er;

// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
// more bytes. This is to work around cases where hwm=0,
// such as the repl. Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
function needMoreData(state) {
return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
}
Readable.prototype.isPaused = function () {

@@ -349,3 +372,4 @@ return this._readableState.flowing === false;

this._readableState.decoder = new StringDecoder(enc);
this._readableState.encoding = enc;
// if setEncoding(null), decoder.encoding equals utf8
this._readableState.encoding = this._readableState.decoder.encoding;
return this;

@@ -378,3 +402,3 @@ };

if (state.objectMode) return 1;
if (n !== n) {
if ((0, (_isNan || _load_isNan()).default)(n)) {
// Only flow one buffer at a time

@@ -479,2 +503,3 @@ if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length;

state.length -= n;
state.awaitDrain = 0;
}

@@ -507,4 +532,15 @@

// emit 'readable' now to make sure it gets picked up.
emitReadable(stream);
if (state.sync) {
// if we are sync, wait until next tick to emit the data.
// Otherwise we risk emitting data in the flow()
// the readable code triggers during a read() call
emitReadable(stream);
} else {
// emit 'readable' now to make sure it gets picked up.
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
emitReadable_(stream);
}
}
}

@@ -521,3 +557,3 @@

state.emittedReadable = true;
if (state.sync) pna.nextTick(emitReadable_, stream);else emitReadable_(stream);
pna.nextTick(emitReadable_, stream);
}

@@ -527,4 +563,15 @@ }

function emitReadable_(stream) {
var state = stream._readableState;
debug('emit readable');
stream.emit('readable');
if (!state.destroyed && (state.length || state.ended)) {
stream.emit('readable');
}
// The stream needs another readable event if
// 1. It is not flowing, as the flow mechanism will take
// care of it.
// 2. It is not ended.
// 3. It is below the highWaterMark, so we can schedule
// another readable later.
state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark;
flow(stream);

@@ -548,3 +595,3 @@ }

var len = state.length;
while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
while (!state.reading && !state.ended && state.length < state.highWaterMark) {
debug('maybeReadMore read 0');

@@ -564,3 +611,3 @@ stream.read(0);

Readable.prototype._read = function (n) {
this.emit('error', new Error('_read() is not implemented'));
this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
};

@@ -637,13 +684,8 @@

// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {
debug('dest.write', ret);
if (ret === false) {
// If the user unpiped during `dest.write()`, it is possible

@@ -654,5 +696,4 @@ // to get stuck in a permanently paused state if that write

if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);
src._readableState.awaitDrain++;
increasedAwaitDrain = true;
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
}

@@ -750,3 +791,3 @@ src.pause();

for (var i = 0; i < len; i++) {
dests[i].emit('unpipe', this, unpipeInfo);
dests[i].emit('unpipe', this, { hasUnpiped: false });
}return this;

@@ -772,15 +813,19 @@ }

var res = Stream.prototype.on.call(this, ev, fn);
var state = this._readableState;
if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused
if (this._readableState.flowing !== false) this.resume();
// update readableListening so that resume() may be a no-op
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;
// Try start flowing on next tick if stream isn't explicitly paused
if (state.flowing !== false) this.resume();
} else if (ev === 'readable') {
var state = this._readableState;
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.emittedReadable = false;
if (!state.reading) {
if (state.length) {
emitReadable(this);
} else if (!state.reading) {
pna.nextTick(nReadingNextTick, this);
} else if (state.length) {
emitReadable(this);
}

@@ -794,2 +839,38 @@ }

Readable.prototype.removeListener = function (ev, fn) {
var res = Stream.prototype.removeListener.call(this, ev, fn);
if (ev === 'readable') {
// We need to check if there is someone still listening to
// readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
pna.nextTick(updateReadableListening, this);
}
return res;
};
Readable.prototype.removeAllListeners = function (ev) {
var res = Stream.prototype.removeAllListeners.apply(this, arguments);
if (ev === 'readable' || ev === undefined) {
// We need to check if there is someone still listening to
// readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
pna.nextTick(updateReadableListening, this);
}
return res;
};
function updateReadableListening(self) {
self._readableState.readableListening = self.listenerCount('readable') > 0;
}
function nReadingNextTick(self) {

@@ -806,3 +887,5 @@ debug('readable nexttick read 0');

debug('resume');
state.flowing = true;
// we flow only if there is no one listening
// for readable
state.flowing = !state.readableListening;
resume(this, state);

@@ -821,4 +904,4 @@ }

function resume_(stream, state) {
debug('resume', state.reading);
if (!state.reading) {
debug('resume read 0');
stream.read(0);

@@ -828,3 +911,2 @@ }

state.resumeScheduled = false;
state.awaitDrain = 0;
stream.emit('resume');

@@ -837,3 +919,3 @@ flow(stream);

debug('call pause flowing=%j', this._readableState.flowing);
if (false !== this._readableState.flowing) {
if (this._readableState.flowing !== false) {
debug('pause');

@@ -915,2 +997,8 @@ this._readableState.flowing = false;

Readable.prototype[(_symbol || _load_symbol()).default.asyncIterator] = function () {
emitExperimentalWarning('Readable[Symbol.asyncIterator]');
if (ReadableAsyncIterator === undefined) ReadableAsyncIterator = require('./internal/streams/async_iterator');
return new ReadableAsyncIterator(this);
};
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {

@@ -926,5 +1014,40 @@ // making it explicit this property is not enumerable

Object.defineProperty(Readable.prototype, 'readableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {
return this._readableState && this._readableState.buffer;
}
});
Object.defineProperty(Readable.prototype, 'readableFlowing', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {
return this._readableState.flowing;
},
set: function (state) {
if (this._readableState) {
this._readableState.flowing = state;
}
}
});
// exposed for testing purposes only.
Readable._fromList = fromList;
Object.defineProperty(Readable.prototype, 'readableLength', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {
return this._readableState.length;
}
});
// Pluck off n bytes from an array of buffers.

@@ -941,7 +1064,7 @@ // Length is the combined lengths of all the buffers in the list.

// read it all, truncate the list
if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.head.data;else ret = state.buffer.concat(state.length);
if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.first();else ret = state.buffer.concat(state.length);
state.buffer.clear();
} else {
// read part of list
ret = fromListPartial(n, state.buffer, state.decoder);
ret = state.buffer.consume(n, state.decoder);
}

@@ -952,88 +1075,6 @@

// Extracts only enough buffered data to satisfy the amount requested.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromListPartial(n, list, hasStrings) {
var ret;
if (n < list.head.data.length) {
// slice is the same for buffers and strings
ret = list.head.data.slice(0, n);
list.head.data = list.head.data.slice(n);
} else if (n === list.head.data.length) {
// first chunk is a perfect match
ret = list.shift();
} else {
// result spans more than one buffer
ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list);
}
return ret;
}
// Copies a specified amount of characters from the list of buffered data
// chunks.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function copyFromBufferString(n, list) {
var p = list.head;
var c = 1;
var ret = p.data;
n -= ret.length;
while (p = p.next) {
var str = p.data;
var nb = n > str.length ? str.length : n;
if (nb === str.length) ret += str;else ret += str.slice(0, n);
n -= nb;
if (n === 0) {
if (nb === str.length) {
++c;
if (p.next) list.head = p.next;else list.head = list.tail = null;
} else {
list.head = p;
p.data = str.slice(nb);
}
break;
}
++c;
}
list.length -= c;
return ret;
}
// Copies a specified amount of bytes from the list of buffered data chunks.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function copyFromBuffer(n, list) {
var ret = Buffer.allocUnsafe(n);
var p = list.head;
var c = 1;
p.data.copy(ret);
n -= p.data.length;
while (p = p.next) {
var buf = p.data;
var nb = n > buf.length ? buf.length : n;
buf.copy(ret, ret.length - n, 0, nb);
n -= nb;
if (n === 0) {
if (nb === buf.length) {
++c;
if (p.next) list.head = p.next;else list.head = list.tail = null;
} else {
list.head = p;
p.data = buf.slice(nb);
}
break;
}
++c;
}
list.length -= c;
return ret;
}
function endReadable(stream) {
var state = stream._readableState;
// If we get here before consuming all the bytes, then that is a
// bug in node. Should never happen.
if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream');
debug('endReadable', state.endEmitted);
if (!state.endEmitted) {

@@ -1046,2 +1087,4 @@ state.ended = true;

function endReadableNT(state, stream) {
debug('endReadableNT', state.endEmitted, state.length);
// Check that we didn't get one last unshift.

@@ -1048,0 +1091,0 @@ if (!state.endEmitted && state.length === 0) {

@@ -68,11 +68,12 @@ // Copyright Joyent, Inc. and other Node contributors.

var _require$codes = require('../errors').codes,
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING = _require$codes.ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0 = _require$codes.ERR_TRANSFORM_WITH_LENGTH_0;
var Duplex = require('./_stream_duplex');
/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/
require('inherits')(Transform, Duplex);
util.inherits(Transform, Duplex);
function afterTransform(er, data) {

@@ -84,4 +85,4 @@ var ts = this._transformState;

if (!cb) {
return this.emit('error', new Error('write callback called multiple times'));
if (cb === null) {
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
}

@@ -139,3 +140,3 @@

if (typeof this._flush === 'function') {
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
this._flush(function (er, data) {

@@ -165,3 +166,3 @@ done(_this, er, data);

Transform.prototype._transform = function (chunk, encoding, cb) {
throw new Error('_transform() is not implemented');
cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()'));
};

@@ -186,3 +187,3 @@

if (ts.writechunk !== null && ts.writecb && !ts.transforming) {
if (ts.writechunk !== null && !ts.transforming) {
ts.transforming = true;

@@ -198,7 +199,4 @@ this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);

Transform.prototype._destroy = function (err, cb) {
var _this2 = this;
Duplex.prototype._destroy.call(this, err, function (err2) {
cb(err2);
_this2.emit('close');
});

@@ -213,9 +211,9 @@ };

// TODO(BridgeAR): Write a test for these two error cases
// if there's nothing in the write buffer, then that means
// that nothing more will ever be provided
if (stream._writableState.length) throw new Error('Calling transform done when ws.length != 0');
if (stream._writableState.length) throw new ERR_TRANSFORM_WITH_LENGTH_0();
if (stream._transformState.transforming) throw new Error('Calling transform done when still transforming');
if (stream._transformState.transforming) throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
return stream.push(null);
}

@@ -30,2 +30,28 @@ // Copyright Joyent, Inc. and other Node contributors.

var _defineProperty;
function _load_defineProperty() {
return _defineProperty = _interopRequireDefault(require('babel-runtime/core-js/object/define-property'));
}
var _hasInstance;
function _load_hasInstance() {
return _hasInstance = _interopRequireDefault(require('babel-runtime/core-js/symbol/has-instance'));
}
var _symbol;
function _load_symbol() {
return _symbol = _interopRequireDefault(require('babel-runtime/core-js/symbol'));
}
var _setImmediate2;
function _load_setImmediate() {
return _setImmediate2 = _interopRequireDefault(require('babel-runtime/core-js/set-immediate'));
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
var pna = require('process-nextick-args');

@@ -58,3 +84,3 @@ /*</replacement>*/

/*<replacement>*/
var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : pna.nextTick;
var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? (_setImmediate2 || _load_setImmediate()).default : pna.nextTick;
/*</replacement>*/

@@ -69,7 +95,2 @@

/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/
/*<replacement>*/
var internalUtil = {

@@ -84,5 +105,3 @@ deprecate: require('util-deprecate')

/*<replacement>*/
var Buffer = require('safe-buffer').Buffer;
var Buffer = require('buffer').Buffer;
var OurUint8Array = global.Uint8Array || function () {};

@@ -96,11 +115,22 @@ function _uint8ArrayToBuffer(chunk) {

/*</replacement>*/
var destroyImpl = require('./internal/streams/destroy');
util.inherits(Writable, Stream);
var _require = require('./internal/streams/state'),
getHighWaterMark = _require.getHighWaterMark;
var _require$codes = require('../errors').codes,
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK,
ERR_STREAM_CANNOT_PIPE = _require$codes.ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED,
ERR_STREAM_NULL_VALUES = _require$codes.ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING;
require('inherits')(Writable, Stream);
function nop() {}
function WritableState(options, stream) {
function WritableState(options, stream, isDuplex) {
Duplex = Duplex || require('./_stream_duplex');

@@ -115,3 +145,3 @@

// These options can be provided separately as readableXXX and writableXXX.
var isDuplex = stream instanceof Duplex;
if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex;

@@ -127,11 +157,4 @@ // object stream flag to indicate whether or not this stream

// the entire buffer is not flushed immediately on write()
var hwm = options.highWaterMark;
var writableHwm = options.writableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex);
if (hwm || hwm === 0) this.highWaterMark = hwm;else if (isDuplex && (writableHwm || writableHwm === 0)) this.highWaterMark = writableHwm;else this.highWaterMark = defaultHwm;
// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
// if _final has been called

@@ -210,2 +233,5 @@ this.finalCalled = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;
// count buffered requests

@@ -242,5 +268,5 @@ this.bufferedRequestCount = 0;

var realHasInstance;
if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') {
realHasInstance = Function.prototype[Symbol.hasInstance];
Object.defineProperty(Writable, Symbol.hasInstance, {
if (typeof (_symbol || _load_symbol()).default === 'function' && (_hasInstance || _load_hasInstance()).default && typeof Function.prototype[(_hasInstance || _load_hasInstance()).default] === 'function') {
realHasInstance = Function.prototype[(_hasInstance || _load_hasInstance()).default];
(0, (_defineProperty || _load_defineProperty()).default)(Writable, (_hasInstance || _load_hasInstance()).default, {
value: function (object) {

@@ -269,8 +295,11 @@ if (realHasInstance.call(this, object)) return true;

// `_writableState` that would lead to infinite recursion.
if (!realHasInstance.call(Writable, this) && !(this instanceof Duplex)) {
return new Writable(options);
}
this._writableState = new WritableState(options, this);
// Checking for a Stream.Duplex instance is faster here instead of inside
// the WritableState constructor, at least with V8 6.5
var isDuplex = this instanceof Duplex;
if (!isDuplex && !realHasInstance.call(Writable, this)) return new Writable(options);
this._writableState = new WritableState(options, this, isDuplex);
// legacy.

@@ -294,7 +323,7 @@ this.writable = true;

Writable.prototype.pipe = function () {
this.emit('error', new Error('Cannot pipe, not readable'));
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
};
function writeAfterEnd(stream, cb) {
var er = new Error('write after end');
var er = new ERR_STREAM_WRITE_AFTER_END();
// TODO: defer error events consistently everywhere, not just the cb

@@ -309,9 +338,8 @@ stream.emit('error', er);

function validChunk(stream, state, chunk, cb) {
var valid = true;
var er = false;
var er;
if (chunk === null) {
er = new TypeError('May not write null values to stream');
} else if (typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');
er = new ERR_STREAM_NULL_VALUES();
} else if (typeof chunk !== 'string' && !state.objectMode) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}

@@ -321,5 +349,5 @@ if (er) {

pna.nextTick(cb, er);
valid = false;
return false;
}
return valid;
return true;
}

@@ -345,3 +373,3 @@

if (state.ended) writeAfterEnd(this, cb);else if (isBuf || validChunk(this, state, chunk, cb)) {
if (state.ending) writeAfterEnd(this, cb);else if (isBuf || validChunk(this, state, chunk, cb)) {
state.pendingcb++;

@@ -355,5 +383,3 @@ ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);

Writable.prototype.cork = function () {
var state = this._writableState;
state.corked++;
this._writableState.corked++;
};

@@ -367,3 +393,3 @@

if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
if (!state.writing && !state.corked && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
}

@@ -375,3 +401,3 @@ };

if (typeof encoding === 'string') encoding = encoding.toLowerCase();
if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new ERR_UNKNOWN_ENCODING(encoding);
this._writableState.defaultEncoding = encoding;

@@ -381,2 +407,12 @@ return this;

Object.defineProperty(Writable.prototype, 'writableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {
return this._writableState && this._writableState.getBuffer();
}
});
function decodeChunk(state, chunk, encoding) {

@@ -446,3 +482,3 @@ if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {

state.sync = true;
if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write'));else if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
state.sync = false;

@@ -487,2 +523,4 @@ }

if (typeof cb !== 'function') throw new ERR_MULTIPLE_CALLBACK();
onwriteStateUpdate(state);

@@ -588,3 +626,3 @@

Writable.prototype._write = function (chunk, encoding, cb) {
cb(new Error('_write() is not implemented'));
cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
};

@@ -615,5 +653,17 @@

// ignore unnecessary end() calls.
if (!state.ending && !state.finished) endWritable(this, state, cb);
if (!state.ending) endWritable(this, state, cb);
return this;
};
Object.defineProperty(Writable.prototype, 'writableLength', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {
return this._writableState.length;
}
});
function needFinish(state) {

@@ -635,3 +685,3 @@ return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;

if (!state.prefinished && !state.finalCalled) {
if (typeof stream._final === 'function') {
if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;

@@ -678,10 +728,12 @@ state.finalCalled = true;

}
if (state.corkedRequestsFree) {
state.corkedRequestsFree.next = corkReq;
} else {
state.corkedRequestsFree = corkReq;
}
// reuse the free corkReq.
state.corkedRequestsFree.next = corkReq;
}
Object.defineProperty(Writable.prototype, 'destroyed', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function () {

@@ -709,4 +761,3 @@ if (this._writableState === undefined) {

Writable.prototype._destroy = function (err, cb) {
this.end();
cb(err);
};

@@ -38,3 +38,3 @@ 'use strict';

if (!cb && err) {
pna.nextTick(emitErrorNT, _this, err);
pna.nextTick(emitErrorAndCloseNT, _this, err);
if (_this._writableState) {

@@ -44,3 +44,6 @@ _this._writableState.errorEmitted = true;

} else if (cb) {
pna.nextTick(emitCloseNT, _this);
cb(err);
} else {
pna.nextTick(emitCloseNT, _this);
}

@@ -52,2 +55,13 @@ });

function emitErrorAndCloseNT(self, err) {
emitErrorNT(self, err);
emitCloseNT(self);
}
function emitCloseNT(self) {
if (self._writableState && !self._writableState.emitClose) return;
if (self._readableState && !self._readableState.emitClose) return;
self.emit('close');
}
function undestroy() {

@@ -65,2 +79,4 @@ if (this._readableState) {

this._writableState.ending = false;
this._writableState.finalCalled = false;
this._writableState.prefinished = false;
this._writableState.finished = false;

@@ -67,0 +83,0 @@ this._writableState.errorEmitted = false;

{
"name": "readable-stream",
"version": "2.3.6",
"version": "3.0.0-rc.1",
"description": "Streams3, a user-land copy of the stream library from Node.js",
"main": "readable.js",
"dependencies": {
"core-util-is": "~1.0.0",
"inherits": "~2.0.3",
"isarray": "~1.0.0",
"process-nextick-args": "~2.0.0",
"safe-buffer": "~5.1.1",
"string_decoder": "~1.1.1",
"util-deprecate": "~1.0.1"
"inherits": "^2.0.3",
"process-nextick-args": "^2.0.0",
"string_decoder": "^1.1.1",
"util-deprecate": "^1.0.1"
},
"devDependencies": {
"airtap": "0.0.9",
"assert": "^1.4.0",
"babel-cli": "^6.26.0",
"babel-core": "^6.26.3",
"babel-plugin-transform-async-generator-functions": "^6.24.1",
"babel-plugin-transform-async-to-generator": "^6.24.1",
"babel-plugin-transform-es2015-arrow-functions": "^6.5.2",
"babel-plugin-transform-es2015-block-scoping": "^6.26.0",
"babel-plugin-transform-es2015-classes": "^6.24.1",
"babel-plugin-transform-es2015-computed-properties": "^6.24.1",
"babel-plugin-transform-es2015-destructuring": "^6.18.0",
"babel-plugin-transform-es2015-for-of": "^6.8.0",
"babel-plugin-transform-es2015-parameters": "^6.24.1",
"babel-plugin-transform-es2015-shorthand-properties": "^6.24.1",
"babel-plugin-transform-es2015-spread": "^6.22.0",
"babel-plugin-transform-es2015-template-literals": "^6.8.0",
"babel-plugin-transform-inline-imports-commonjs": "^1.2.0",
"babel-plugin-transform-runtime": "^6.23.0",
"babel-polyfill": "^6.9.1",
"buffer": "^4.9.0",
"lolex": "^2.3.2",
"nyc": "^6.4.0",
"tap": "^0.7.0",
"tape": "^4.8.0"
"babel-preset-env": "^1.7.0",
"bl": "^2.0.0",
"buffer": "^5.1.0",
"deep-strict-equal": "^0.2.0",
"glob": "^7.1.2",
"gunzip-maybe": "^1.4.1",
"hyperquest": "^2.1.3",
"lolex": "^2.6.0",
"nyc": "^11.0.0",
"pump": "^3.0.0",
"rimraf": "^2.6.2",
"tap": "^11.0.0",
"tape": "^4.9.0",
"tar-fs": "^1.16.2",
"util-promisify": "^2.1.0"
},
"scripts": {
"test": "tap test/parallel/*.js test/ours/*.js && node test/verify-dependencies.js",
"ci": "tap test/parallel/*.js test/ours/*.js --tap | tee test.tap && node test/verify-dependencies.js",
"test": "tap -j 4 test/parallel/*.js test/ours/*.js",
"ci": "TAP=1 tap test/parallel/*.js test/ours/*.js | tee test.tap",
"test-browsers": "airtap --sauce-connect --loopback airtap.local -- test/browser.js",
"test-browser-local": "airtap --local -- test/browser.js",
"cover": "nyc npm test",
"report": "nyc report --reporter=lcov"
"report": "nyc report --reporter=lcov",
"update-browser-errors": "babel --presets env -o errors-browser.js errors.js"
},

@@ -41,5 +68,5 @@ "repository": {

"util": false,
"worker_threads": false,
"./errors": "./errors-browser.js",
"./readable.js": "./readable-browser.js",
"./writable.js": "./writable-browser.js",
"./duplex.js": "./duplex-browser.js",
"./lib/internal/streams/stream.js": "./lib/internal/streams/stream-browser.js"

@@ -46,0 +73,0 @@ },

@@ -19,2 +19,4 @@ var Stream = require('stream');

exports.PassThrough = require('./lib/_stream_passthrough.js');
exports.finished = require('./lib/internal/streams/end-of-stream.js');
exports.pipeline = require('./lib/internal/streams/pipeline.js');
}
# readable-stream
***Node-core v8.11.1 streams for userland*** [![Build Status](https://travis-ci.org/nodejs/readable-stream.svg?branch=master)](https://travis-ci.org/nodejs/readable-stream)
***Node.js core v10.5.0 streams for userland*** [![Build Status](https://travis-ci.com/nodejs/readable-stream.svg?branch=master)](https://travis-ci.com/nodejs/readable-stream)

@@ -10,3 +10,3 @@

[![Sauce Test Status](https://saucelabs.com/browser-matrix/readable-stream.svg)](https://saucelabs.com/u/readable-stream)
[![Sauce Test Status](https://saucelabs.com/browser-matrix/readabe-stream.svg)](https://saucelabs.com/u/readabe-stream)

@@ -17,8 +17,7 @@ ```bash

***Node-core streams for userland***
***Node.js core streams for userland***
This package is a mirror of the Streams2 and Streams3 implementations in
Node-core.
This package is a mirror of the streams implementations in Node.js.
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v8.11.1/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.5.0/docs/api/stream.html).

@@ -30,2 +29,8 @@ If you want to guarantee a stable streams base, regardless of what version of

v3.x.x of `readable-stream` supports Node 6, 8, and 10, as well as
evergreen browsers and IE 11.
v2.x.x of `readable-stream` supports all Node.js version from 0.8, as well as
evergreen browsers and IE 10 & 11.
# Streams Working Group

@@ -32,0 +37,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc