Socket
Socket
Sign inDemoInstall

readable-stream

Package Overview
Dependencies
0
Maintainers
1
Versions
103
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.0.2 to 0.0.3

duplex.js

76

fs.js

@@ -7,11 +7,2 @@ 'use strict';

// implement a read-method style readable stream.
//
// In a perfect world, some of this dancing and buffering would
// not be necessary; we could just open the file using async IO,
// and then read() synchronously until we raise EWOULDBLOCK.
//
// In a just-slightly-less imperfect world,
// would be the *only* stream that implements this kind of buffering
// behavior, since TCP and pipes can be reliably implemented in this
// fashion at a much lower level.

@@ -38,21 +29,17 @@ var Readable = require('./readable.js');

function FSReadable(path, options) {
if (!options) options = {};
if (!options)
options = {};
Readable.apply(this, options);
Readable.call(this, options);
var state = this._readableState;
this.path = path;
this.flags = 'r';
this.mode = 438; //=0666
this.fd = null;
this.bufferSize = 64 * 1024;
this.lowWaterMark = 16 * 1024;
this.flags = options.flags || 'r';
this.mode = options.mode || 438; //=0666
this.fd = options.fd || null;
Object.keys(options).forEach(function(k) {
this[k] = options[k];
}, this);
// a little bit bigger buffer and watermark by default
state.bufferSize = options.bufferSize || 64 * 1024;
state.lowWaterMark = options.lowWaterMark || 16 * 1024;
// cast to an int
assert(typeof this.bufferSize === 'number');
this.bufferSize = ~~this.bufferSize;
if (this.encoding) {

@@ -64,11 +51,10 @@ this._decoder = new StringDecoder(this.encoding);

if (typeofStart !== 'undefined') {
if (typeofStart !== 'number') {
if (typeofStart !== 'number')
throw new TypeError('start must be a Number');
}
var typeofEnd = typeof this.end;
if (typeofEnd === 'undefined') {
if (typeofEnd === 'undefined')
this.end = Infinity;
} else if (typeofEnd !== 'number') {
else if (typeofEnd !== 'number')
throw new TypeError('end must be a Number');
}

@@ -78,5 +64,4 @@ this.pos = this.start;

if (typeof this.fd !== 'number') {
if (typeof this.fd !== 'number')
this.open();
}
}

@@ -95,3 +80,3 @@

}.bind(this));
}
};

@@ -104,4 +89,4 @@ FSReadable.prototype._read = function(n, cb) {

if (this.reading || this.ended || this.destroyed) return;
this.reading = true;
if (this.destroyed)
return;

@@ -119,8 +104,7 @@ if (!pool || pool.length - pool.used < minPoolSpace) {

if (this.pos !== undefined) {
if (this.pos !== undefined)
toRead = Math.min(this.end - this.pos + 1, toRead);
}
if (toRead <= 0) {
this.reading = false;
this.emit('readable');

@@ -133,4 +117,2 @@ return;

function onread(er, bytesRead) {
this.reading = false;
if (er) {

@@ -142,13 +124,15 @@ this.destroy();

var b = null;
if (bytesRead > 0) {
if (bytesRead > 0)
b = thisPool.slice(start, start + bytesRead);
}
cb(null, b);
}
}
};
FSReadable.prototype.close = function(cb) {
if (cb) this.once('close', cb);
if (cb)
this.once('close', cb);
if (this.closed || this.fd === null) {
if (this.fd === null) this.once('open', this.destroy);
if (this.fd === null)
this.once('open', this.destroy);
return process.nextTick(this.emit.bind(this, 'close'));

@@ -159,4 +143,6 @@ }

fs.close(this.fd, function(er) {
if (er) this.emit('error', er);
else this.emit('close');
if (er)
this.emit('error', er);
else
this.emit('close');
}.bind(this));

@@ -163,0 +149,0 @@ };

{
"name": "readable-stream",
"version": "0.0.2",
"version": "0.0.3",
"description": "An exploration of a new kind of readable streams for Node.js",

@@ -5,0 +5,0 @@ "main": "readable.js",

@@ -1,56 +0,41 @@

'use strict';
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// a passthrough stream.
// whatever you .write(), you can then .read() later.
// this is not very useful on its own, but it's a handy
// base class for certain sorts of simple filters and
// transforms.
// basically just the most minimal sort of Transform stream.
// Every written chunk gets output as-is.
module.exports = PassThrough;
var Readable = require('./readable.js');
var Transform = require('./transform.js');
var util = require('util');
util.inherits(PassThrough, Transform);
util.inherits(PassThrough, Readable);
function PassThrough(options) {
if (!(this instanceof PassThrough))
return new PassThrough(options);
var fromList = require('./from-list.js');
function PassThrough() {
Readable.apply(this);
this.buffer = [];
this.length = 0;
Transform.call(this, options);
}
// override this:
PassThrough.prototype.transform = function(c) {
return c;
// The identity transform: every written chunk is emitted unchanged.
// The `output` function is unused here; the chunk travels via the
// callback's data argument instead.
PassThrough.prototype._transform = function(chunk, outputFn, done) {
  done(null, chunk);
};
PassThrough.prototype.write = function(c) {
var needEmitReadable = this.length === 0;
c = this.transform(c);
if (!c || !c.length) return true;
this.buffer.push(c);
this.length += c.length;
if (needEmitReadable) this.emit('readable');
return (this.length === 0);
};
PassThrough.prototype.end = function(c) {
this.ended = true;
if (c && c.length) this.write(c);
else if (!this.length) this.emit('end');
};
PassThrough.prototype.read = function(n) {
if (!n || n >= this.length) n = this.length;
var ret = fromList(n, this.buffer, this.length);
this.length = Math.max(this.length - n, 0);
if (this.length === 0) {
var ev = this.ended ? 'end' : 'drain';
process.nextTick(this.emit.bind(this, ev));
}
return ret;
};

@@ -1,2 +0,21 @@

'use strict';
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

@@ -7,49 +26,190 @@ module.exports = Readable;

var util = require('util');
var fromList = require('./from-list.js');
var assert = require('assert');
var StringDecoder;
util.inherits(Readable, Stream);
function Readable(options) {
function ReadableState(options) {
options = options || {};
this.bufferSize = options.bufferSize || 16 * 1024;
this.lowWaterMark = options.lowWaterMark || 1024;
assert(typeof this.bufferSize === 'number');
// cast to an int
this.bufferSize = ~~this.bufferSize;
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
options.lowWaterMark : 1024;
this.buffer = [];
this.length = 0;
this._pipes = [];
this._flowing = false;
this.pipes = [];
this.flowing = false;
this.ended = false;
this.endEmitted = false;
this.reading = false;
// whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
this.decoder = null;
if (options.encoding) {
if (!StringDecoder)
StringDecoder = require('string_decoder').StringDecoder;
this.decoder = new StringDecoder(options.encoding);
}
}
// Readable constructor. Callable with or without `new`; all of the
// per-stream bookkeeping lives on this._readableState rather than on
// the stream object itself.
function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  this._readableState = new ReadableState(options, this);
  Stream.call(this);
}
// backwards compatibility.
// Backwards compatibility: install a StringDecoder so that read()
// yields strings in the given encoding instead of Buffers. The
// string_decoder module is loaded lazily, only on first use.
Readable.prototype.setEncoding = function(enc) {
  StringDecoder = StringDecoder || require('string_decoder').StringDecoder;
  this._readableState.decoder = new StringDecoder(enc);
};
// Decide how many bytes/chars a read(n) call can actually hand back,
// given the buffer state. Side effect: sets state.needReadable when
// the caller asked for more than is buffered on a stream that has not
// ended, signalling that a future 'readable' event is owed.
function howMuchToRead(n, state) {
  // ended and drained: nothing left, ever.
  if (state.ended && state.length === 0)
    return 0;

  // no numeric size requested: hand over the whole buffer.
  if (isNaN(n))
    return state.length;

  if (n <= 0)
    return 0;

  // we have at least that much buffered.
  if (n <= state.length)
    return n;

  // asked for more than we have; if ended, give what remains.
  if (state.ended)
    return state.length;

  // not ended: promise a 'readable' later, return nothing now.
  state.needReadable = true;
  return 0;
}
// you can override either this method, or _read(n, cb) below.
Readable.prototype.read = function(n) {
if (this.length === 0 && this.ended) {
process.nextTick(this.emit.bind(this, 'end'));
var state = this._readableState;
var nOrig = n;
n = howMuchToRead(n, state);
// if we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
endReadable(this);
return null;
}
if (isNaN(n) || n <= 0) n = this.length;
n = Math.min(n, this.length);
// All the actual chunk generation logic needs to be
// *below* the call to _read. The reason is that in certain
// synthetic stream cases, such as passthrough streams, _read
// may be a completely synchronous operation which may change
// the state of the read buffer, providing enough data when
// before there was *not* enough.
//
// So, the steps are:
// 1. Figure out what the state of things will be after we do
// a read from the buffer.
//
// 2. If that resulting state will trigger a _read, then call _read.
// Note that this may be asynchronous, or synchronous. Yes, it is
// deeply ugly to write APIs this way, but that still doesn't mean
// that the Readable class should behave improperly, as streams are
// designed to be sync/async agnostic.
// Take note if the _read call is sync or async (ie, if the read call
// has returned yet), so that we know whether or not it's safe to emit
// 'readable' etc.
//
// 3. Actually pull the requested chunks out of the buffer and return.
var ret = n > 0 ? fromList(n, this.buffer, this.length) : null;
this.length -= n;
// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
// if we currently have less than the lowWaterMark, then also read some
if (state.length - n <= state.lowWaterMark)
doRead = true;
// however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
if (state.ended || state.reading)
doRead = false;
if (!this.ended && this.length < this.lowWaterMark) {
this._read(this.bufferSize, function onread(er, chunk) {
if (er) return this.emit('error', er);
if (doRead) {
var sync = true;
state.reading = true;
// call internal read method
this._read(state.bufferSize, function onread(er, chunk) {
state.reading = false;
if (er)
return this.emit('error', er);
if (!chunk || !chunk.length) {
this.ended = true;
if (this.length === 0) this.emit('end');
// eof
state.ended = true;
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (!sync) {
if (state.length > 0)
this.emit('readable');
else
endReadable(this);
}
return;
}
this.length += chunk.length;
this.buffer.push(chunk);
if (this.length < this.lowWaterMark) {
this._read(this.bufferSize, onread.bind(this));
if (state.decoder)
chunk = state.decoder.write(chunk);
// update the buffer info.
if (chunk) {
state.length += chunk.length;
state.buffer.push(chunk);
}
this.emit('readable');
// if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, that'll
// probably kick off another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length <= state.lowWaterMark) {
state.reading = true;
this._read(state.bufferSize, onread.bind(this));
return;
}
if (state.needReadable && !sync) {
state.needReadable = false;
this.emit('readable');
}
}.bind(this));
sync = false;
}
// If _read called its callback synchronously, then `reading`
// will be false, and we need to re-evaluate how much data we
// can return to the user.
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);
var ret;
if (n > 0)
ret = fromList(n, state.buffer, state.length, !!state.decoder);
else
ret = null;
if (ret === null || ret.length === 0) {
state.needReadable = true;
n = 0;
}
state.length -= n;
return ret;

@@ -59,2 +219,5 @@ };

// abstract method. to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n, cb) {

@@ -64,32 +227,48 @@ process.nextTick(cb.bind(this, new Error('not implemented')));

Readable.prototype.pipe = function(dest, opt) {
Readable.prototype.pipe = function(dest, pipeOpts) {
var src = this;
src._pipes.push(dest);
if ((!opt || opt.end !== false) &&
dest !== process.stdout &&
var state = this._readableState;
if (!pipeOpts)
pipeOpts = {};
state.pipes.push(dest);
if ((!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
dest !== process.stderr) {
src.once('end', onend);
dest.on('unpipe', function(readable) {
if (readable === src) {
if (readable === src)
src.removeListener('end', onend);
}
});
}
dest.emit('pipe', src);
if (!src._flowing) process.nextTick(flow.bind(src));
return dest;
function onend() {
dest.end();
}
dest.emit('pipe', src);
// start the flow.
if (!state.flowing) {
state.flowing = true;
process.nextTick(flow.bind(null, src, pipeOpts));
}
return dest;
};
function flow(src) {
if (!src) src = this;
function flow(src, pipeOpts) {
var state = src._readableState;
var chunk;
var dest;
var needDrain = 0;
while (chunk = src.read()) {
src._pipes.forEach(function(dest, i, list) {
function ondrain() {
needDrain--;
if (needDrain === 0)
flow(src, pipeOpts);
}
while (state.pipes.length &&
null !== (chunk = src.read(pipeOpts.chunkSize))) {
state.pipes.forEach(function(dest, i, list) {
var written = dest.write(chunk);

@@ -101,27 +280,40 @@ if (false === written) {

});
if (needDrain > 0) return;
src.emit('data', chunk);
// if anyone needs a drain, then we have to wait for that.
if (needDrain > 0)
return;
}
src.once('readable', flow);
// if every destination was unpiped, either before entering this
// function, or in the while loop, then stop flowing.
//
// NB: This is a pretty rare edge case.
if (state.pipes.length === 0) {
state.flowing = false;
function ondrain() {
needDrain--;
if (needDrain === 0) {
flow(src);
}
// if there were data event listeners added, then switch to old mode.
if (this.listeners('data').length)
emitDataEvents(this);
return;
}
// at this point, no one needed a drain, so we just ran out of data
// on the next readable event, start it over again.
src.once('readable', flow.bind(null, src, pipeOpts));
}
Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
if (!dest) {
// remove all of them.
this._pipes.forEach(function(dest, i, list) {
state.pipes.forEach(function(dest, i, list) {
dest.emit('unpipe', this);
}, this);
this._pipes.length = 0;
state.pipes.length = 0;
} else {
var i = this._pipes.indexOf(dest);
var i = state.pipes.indexOf(dest);
if (i !== -1) {
dest.emit('unpipe', this);
this._pipes.splice(i, 1);
state.pipes.splice(i, 1);
}

@@ -136,3 +328,7 @@ }

Readable.prototype.on = function(ev, fn) {
if (ev === 'data') emitDataEvents(this);
// https://github.com/isaacs/readable-stream/issues/16
// if we're already flowing, then no need to set up data events.
if (ev === 'data' && !this._readableState.flowing)
emitDataEvents(this);
return Stream.prototype.on.call(this, ev, fn);

@@ -142,3 +338,22 @@ };

// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
// Legacy-API shim: calling resume() flips the stream into old-style
// flowing ('data'-event) mode.
Readable.prototype.resume = function() {
// NOTE(review): the recursive-looking call below assumes emitDataEvents()
// installs own-property pause/resume overrides on `this`, so the second
// `this.resume()` dispatches to the override rather than back here. If it
// did not, this would recurse forever -- confirm against emitDataEvents.
emitDataEvents(this);
return this.resume();
};
// Legacy-API shim: same switch-to-old-mode trick as resume(), for pause().
Readable.prototype.pause = function() {
emitDataEvents(this);
return this.pause();
};
function emitDataEvents(stream) {
var state = stream._readableState;
if (state.flowing) {
// https://github.com/isaacs/readable-stream/issues/16
throw new Error('Cannot switch to old mode now.');
}
var paused = false;

@@ -155,6 +370,9 @@ var readable = false;

var c;
while (!paused && (c = stream.read())) {
while (!paused && (null !== (c = stream.read())))
stream.emit('data', c);
if (c === null) {
readable = false;
stream._readableState.needReadable = true;
}
if (c === null) readable = false;
});

@@ -168,28 +386,32 @@

paused = false;
if (readable) stream.emit('readable');
if (readable)
stream.emit('readable');
};
// now make it start, just in case it hadn't already.
process.nextTick(function() {
stream.emit('readable');
});
}
// wrap an old-style stream
// wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function(stream) {
this.buffer = [];
this.length = 0;
var state = this._readableState;
var paused = false;
var ended = false;
stream.on('end', function() {
ended = true;
if (this.length === 0) {
this.emit('end');
}
state.ended = true;
if (state.length === 0)
endReadable(this);
}.bind(this));
stream.on('data', function(chunk) {
this.buffer.push(chunk);
this.length += chunk.length;
state.buffer.push(chunk);
state.length += chunk.length;
this.emit('readable');
// if not consumed, then pause the stream.
if (this.length > this.lowWaterMark && !paused) {
if (state.length > state.lowWaterMark && !paused) {
paused = true;

@@ -220,10 +442,22 @@ stream.pause();

this.read = function(n) {
if (this.length === 0) return null;
if (state.length === 0) {
state.needReadable = true;
return null;
}
if (isNaN(n) || n <= 0) n = this.length;
if (isNaN(n) || n <= 0)
n = state.length;
var ret = fromList(n, this.buffer, this.length);
this.length = Math.max(0, this.length - n);
if (n > state.length) {
if (!state.ended) {
state.needReadable = true;
return null;
} else
n = state.length;
}
if (this.length < this.lowWaterMark && paused) {
var ret = fromList(n, state.buffer, state.length, !!state.decoder);
state.length -= n;
if (state.length <= state.lowWaterMark && paused) {
stream.resume();

@@ -233,7 +467,82 @@ paused = false;

if (this.length === 0 && ended) {
process.nextTick(this.emit.bind(this, 'end'));
}
if (state.length === 0 && state.ended)
endReadable(this);
return ret;
};
};
// exposed for testing purposes only.
Readable._fromList = fromList;
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// Pluck off n bytes from an array of buffers (or n chars from an array
// of strings, when stringMode is true).
// `length` is the combined length of every entry in `list`.
// Mutates `list` in place, removing whatever was consumed.
// Returns a Buffer (or string), or null if the list is empty.
function fromList(n, list, length, stringMode) {
  var ret;

  // nothing in the list, definitely empty.
  if (list.length === 0) {
    return null;
  }

  if (length === 0) {
    ret = null;
  } else if (!n || n >= length) {
    // read it all, truncate the array.
    if (stringMode)
      ret = list.join('');
    else
      ret = Buffer.concat(list, length);
    list.length = 0;
  } else {
    // read just some of it.
    if (n < list[0].length) {
      // just take a part of the first list item.
      // slice is the same for buffers and strings.
      var buf = list[0];
      ret = buf.slice(0, n);
      list[0] = buf.slice(n);
    } else if (n === list[0].length) {
      // first list item is a perfect match.
      ret = list.shift();
    } else {
      // complex case: we have enough to cover n, but it spans past the
      // first buffer. Drain entries from the head of the list until we
      // have accumulated n bytes/chars.
      if (stringMode)
        ret = '';
      else
        // was `new Buffer(n)`: deprecated and unsafe (exposes
        // uninitialized memory). alloc() zero-fills; the loop below
        // overwrites it fully since the `n < length` branch guarantees
        // enough buffered data.
        ret = Buffer.alloc(n);
      var c = 0;
      for (var i = 0, l = list.length; i < l && c < n; i++) {
        // always consume from the head; entries are trimmed or shifted
        // off as they drain, so list[0] is the next unconsumed chunk.
        var piece = list[0];
        var cpy = Math.min(n - c, piece.length);
        if (stringMode)
          ret += piece.slice(0, cpy);
        else
          piece.copy(ret, c, 0, cpy);
        if (cpy < piece.length)
          list[0] = piece.slice(cpy);
        else
          list.shift();
        c += cpy;
      }
    }
  }

  return ret;
}
// Mark the stream as fully consumed and schedule a single 'end'
// emission on the next tick. Safe to call repeatedly: only the first
// call per stream actually schedules the event.
function endReadable(stream) {
  var state = stream._readableState;
  if (!state.endEmitted) {
    state.ended = true;
    state.endEmitted = true;
    process.nextTick(stream.emit.bind(stream, 'end'));
  }
}

@@ -12,40 +12,18 @@ # readable-stream

## Usage
Note that Duplex, Transform, Writable, and PassThrough streams are also
provided as base classes. See the full API details below.
```javascript
var Readable = require('readable-stream');
var r = new Readable();
## Justification
r.read = function(n) {
// your magic goes here.
// return n bytes, or null if there is nothing to be read.
// if you return null, then you MUST emit 'readable' at some
// point in the future if there are bytes available, or 'end'
// if you are not going to have any more data.
//
// You MUST NOT emit either 'end' or 'readable' before
// returning from this function, but you MAY emit 'end' or
// 'readable' in process.nextTick().
};
<!-- misc -->
r.on('end', function() {
// no more bytes will be provided.
});
Writable streams in node are relatively straightforward to use and
extend. The `write` method either returns `false` if you would like
the user to back off a bit, in which case a `drain` event at some
point in the future will let them continue writing, or anything other
than false if the bytes could be completely handled and another
`write` should be performed, or The `end()` method lets the user
indicate that no more bytes will be written. That's pretty much the
entire required interface for writing.
r.on('readable', function() {
// now is the time to call read() again.
});
```
## Justification
Writable streams in node are very straightforward to use and extend.
The `write` method either returns `true` if the bytes could be
completely handled and another `write` should be performed, or `false`
if you would like the user to back off a bit, in which case a `drain`
event at some point in the future will let them continue writing. The
`end()` method lets the user indicate that no more bytes will be
written. That's pretty much the entire required interface for
writing.
However, readable streams in Node 0.8 and before are rather

@@ -60,2 +38,5 @@ complicated.

buffering yourself.
3. In many streams, `pause()` was purely advisory, so **even while
paused**, you still have to be careful that you might get some
data. This caused a lot of subtle bugs.

@@ -70,7 +51,10 @@ So, while writers only have to implement `write()`, `end()`, and

And read consumers had to always be prepared for their backpressure
advice to simply be ignored.
If you are using a readable stream, and want to just get the first 10
bytes, make a decision, and then pass the rest off to somewhere else,
then you have to handle buffering, pausing, and so on. This is all
rather brittle and easy to get wrong for all but the most trivial use
cases.
then you have to handle buffering, pausing, slicing, and so on. This
is all rather brittle and easy to get wrong for all but the most
trivial use cases.

@@ -84,6 +68,9 @@ Additionally, this all made the `reader.pipe(writer)` method

<!-- misc -->
The reader does not have pause/resume methods. If you want to consume
the bytes, you call `read()`. If bytes are not being consumed, then
effectively the stream is in a paused state. It exerts backpressure
on upstream connections, doesn't read from files, etc.
on upstream connections, doesn't read from files, etc. Any data that
was already in the process of being read will be placed in a buffer.

@@ -94,6 +81,9 @@ If `read()` returns `null`, then a future `readable` event will be

This is simpler and conceptually closer to the underlying mechanisms.
The resulting `pipe()` method is much shorter and simpler.
The resulting `pipe()` method is much shorter and simpler. The
problems of data events happening while paused are alleviated.
### Compatibility
<!-- misc -->
It's not particularly difficult to wrap older-style streams in this

@@ -103,13 +93,21 @@ new interface, or to wrap this type of stream in the older-style

The `Readable` class takes an argument which is an old-style stream
with `data` events and `pause()` and `resume()` methods, and uses that
as the data source. For example:
The `Readable` class provides a `wrap(oldStream)` method that takes an
argument which is an old-style stream with `data` events and `pause()`
and `resume()` methods, and uses that as the data source. For
example:
```javascript
var r = new Readable(oldReadableStream);
var r = new Readable();
r.wrap(oldReadableStream);
// now you can use r.read(), and it will emit 'readable' events
// but the data is based on whatever oldReadableStream spits out of
// its 'data' events.
```
The `Readable` class will also automatically convert into an old-style
In order to work with programs that use the older interface, some
magic is unfortunately required. At some point in the future, this
magic will be removed.
The `Readable` class will automatically convert into an old-style
`data`-emitting stream if any listeners are added to the `data` event.

@@ -124,4 +122,9 @@ So, this works fine, though you of course lose a lot of the benefits of

// ...
// magic is happening! oh no! the animals are walking upright!
// the brooms are sweeping the floors all by themselves!
});
// this will also turn on magic-mode:
r.pause();
// now pause, resume, etc. are patched into place, and r will

@@ -135,1 +138,371 @@ // continually call read() until it returns null, emitting the

```
## Class: Readable
A base class for implementing Readable streams. Override the
`_read(n,cb)` method to fetch data asynchronously and take advantage
of the buffering built into the Readable class.
### Example
Extend the Readable class, and provide a `_read(n,cb)` implementation
method.
```javascript
var Readable = require('readable-stream');
var util = require('util');
util.inherits(MyReadable, Readable);
function MyReadable(options) {
Readable.call(this, options);
}
MyReadable.prototype._read = function(n, cb) {
// your magic goes here.
// call the cb at some time in the future with either up to n bytes,
// or an error, like cb(err, resultData)
//
// The code in the Readable class will call this to keep an internal
// buffer at a healthy level, as the user calls var chunk=stream.read(n)
// to consume chunks.
};
var r = new MyReadable();
r.on('end', function() {
// no more bytes will be provided.
});
r.on('readable', function() {
// now is the time to call read() again.
});
// to get some bytes out of it:
var data = r.read(optionalLengthArgument);
// now data is either null, or a buffer of optionalLengthArgument
// length. If you don't provide an argument, then it returns whatever
// it has.
// typically you'd just r.pipe() into some writable stream, but you
// can of course do stuff like this, as well:
function flow() {
var chunk;
while (null !== (chunk = r.read())) {
doSomethingWithData(chunk);
}
r.once('readable', flow);
}
flow();
```
### new Readable(options)
* `options` {Object}
* `lowWaterMark` {Number} The minimum number of bytes before the
stream is considered 'readable'. Default = `1024`
* `bufferSize` {Number} The number of bytes to try to read from the
underlying `_read` function. Default = `16 * 1024`
Make sure to call the `Readable` constructor in your extension
classes, or else the stream will not be properly initialized.
### readable.read([n])
* `n` {Number} Optional number of bytes to read. If not provided,
then return however many bytes are available.
* Returns: {Buffer | null}
Pulls the requested number of bytes out of the internal buffer. If
that many bytes are not available, then it returns `null`.
### readable.\_read(n, callback)
* `n` {Number} Number of bytes to read from the underlying
asynchronous data source.
* `callback` {Function} Callback function
* `error` {Error Object}
* `data` {Buffer | null}
**Note: This function is not implemented in the Readable base class.**
Rather, it is up to you to implement `_read` in your extension
classes.
`_read` should fetch the specified number of bytes, and call the
provided callback with `cb(error, data)`, where `error` is any error
encountered, and `data` is the returned data.
This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
### readable.pipe(destination)
* `destination` {Writable Stream object}
Continually `read()` data out of the readable stream, and `write()` it
into the writable stream. When the `writable.write(chunk)` call
returns `false`, then it will back off until the next `drain` event,
to do backpressure.
Piping to multiple destinations is supported. The slowest destination
stream will limit the speed of the `pipe()` flow.
Note that this puts the readable stream into a state where not very
much can be done with it. You can no longer `read()` from the stream
in other code, without upsetting the pipe() process. However, since
multiple pipe destinations are supported, you can always create a
`PassThrough` stream, and pipe the reader to that. For example:
```
var r = new ReadableWhatever();
var pt = new PassThrough();
r.pipe(someWritableThing);
r.pipe(pt);
// now I can call pt.read() to my heart's content.
// note that if I *don't* call pt.read(), then it'll back up and
// prevent the pipe() from flowing!
```
### readable.unpipe([destination])
* `destination` {Writable Stream object} Optional
Remove the provided `destination` stream from the pipe flow. If no
argument is provided, then it will unhook all piped destinations.
### readable.on('readable')
An event that signals more data is now available to be read from the
stream. Emitted when more data arrives, after previously calling
`read()` and getting a null result.
### readable.on('end')
An event that signals that no more data will ever be available on this
stream. It's over.
### readable.\_readableState
* {Object}
An object that tracks the state of the stream. Buffer information,
whether or not it has reached the end of the underlying data source,
etc., are all tracked on this object.
You are strongly encouraged not to modify this in any way, but it is
often useful to read from.
## Class: Writable
A base class for creating Writable streams. Similar to Readable, you
can create child classes by overriding the asynchronous
`_write(chunk,cb)` method, and it will take care of buffering,
backpressure, and so on.
### new Writable(options)
* `options` {Object}
* `highWaterMark` {Number} The number of bytes to store up before it
starts returning `false` from write() calls. Default = `16 * 1024`
* `lowWaterMark` {Number} The number of bytes that the buffer must
get down to before it emits `drain`. Default = `1024`
Make sure to call the `Writable` constructor in your extension
classes, or else the stream will not be properly initialized.
### writable.write(chunk, [encoding])
* `chunk` {Buffer | String}
* `encoding` {String} An encoding argument to turn the string chunk
into a buffer. Only relevant if `chunk` is a string.
Default = `'utf8'`.
* Returns `false` if you should not write until the next `drain`
  event, or `true` otherwise.
The basic write function.
### writable.\_write(chunk, callback)
* `chunk` {Buffer}
* `callback` {Function}
* `error` {Error | null} Call with an error object as the first
argument to indicate that the write() failed for unfixable
reasons.
**Note: This function is not implemented in the Writable base class.**
Rather, it is up to you to implement `_write` in your extension
classes.
`_write` should do whatever has to be done in this specific Writable
class, to handle the bytes being written. Write to a file, send along
a socket, encrypt as an mp3, whatever needs to be done. Do your I/O
asynchronously, and call the callback when it's complete.
This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
### writable.end([chunk], [encoding])
* `chunk` {Buffer | String}
* `encoding` {String}
If a chunk (and, optionally, an encoding) are provided, then that
chunk is first passed to `this.write(chunk, encoding)`.
This method is a way to signal to the writable stream that you will
not be writing any more data. It should be called exactly once for
every writable stream.
Calling `write()` *after* calling `end()` will trigger an error.
### writable.on('pipe', source)
Emitted when calling `source.pipe(writable)`. See above for the
description of the `readable.pipe()` method.
### writable.on('unpipe', source)
Emitted when calling `source.unpipe(writable)`. See above for the
description of the `readable.unpipe()` method.
### writable.on('drain')
If a call to `writable.write()` returns false, then at some point in
the future, this event will tell you to start writing again.
### writable.on('finish')
When the stream has been ended, and all the data in its internal
buffer has been consumed, then it emits a `finish` event to let you
know that it's completely done.
This is particularly handy if you want to know when it is safe to shut
down a socket or close a file descriptor. At this time, the writable
stream may be safely disposed. Its mission in life has been
accomplished.
## Class: Duplex
A base class for Duplex streams (ie, streams that are both readable
and writable).
Since JS doesn't have multiple prototypal inheritance, this class
prototypally inherits from Readable, and then parasitically from
Writable. It is thus up to the user to implement both the lowlevel
`_read(n,cb)` method as well as the lowlevel `_write(chunk,cb)`
method on extension duplex classes.
For cases where the written data is transformed into the output, it
may be simpler to use the `Transform` class instead.
### new Duplex(options)
* `options` {Object} Passed to both the Writable and Readable
constructors.
Make sure to call the `Duplex` constructor in your extension
classes, or else the stream will not be properly initialized.
If `options.allowHalfOpen` is set to the value `false`, then the
stream will automatically end the readable side when the writable
side ends, and vice versa.
### duplex.allowHalfOpen
* {Boolean} Default = `true`
Set this flag to `false` to automatically end the writable side when
the readable side ends (and vice versa); leave it at the default of
`true` to allow each side to remain open independently of the other.
## Class: Transform
A duplex (ie, both readable and writable) stream that is designed to
make it easy to implement transform operations such as encryption,
decryption, compression, and so on.
Transform streams are `instanceof` Readable, but they have all of the
methods and properties of both Readable and Writable streams. See
above for the list of events and methods that Transform inherits from
Writable and Readable.
Override the `_transform(chunk, outputFunction, callback)` method in
your implementation classes to take advantage of it.
### new Transform(options)
* `options` {Object} Passed to both the Writable and Readable
constructors.
Make sure to call the `Transform` constructor in your extension
classes, or else the stream will not be properly initialized.
### transform.\_transform(chunk, outputFn, callback)
* `chunk` {Buffer} The chunk to be transformed.
* `outputFn` {Function} Call this function with any output data to be
passed to the readable interface.
* `callback` {Function} Call this function (optionally with an error
argument) when you are done processing the supplied chunk.
**Note: This function is not implemented in the Transform base class.**
Rather, it is up to you to implement `_transform` in your extension
classes.
`_transform` should do whatever has to be done in this specific
Transform class, to handle the bytes being written, and pass them off
to the readable portion of the interface. Do asynchronous I/O,
process things, and so on.
Call the callback function only when the current chunk is completely
consumed. Note that this may mean that you call the `outputFn` zero
or more times, depending on how much data you want to output as a
result of this chunk.
This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
### transform.\_flush(outputFn, callback)
* `outputFn` {Function} Call this function with any output data to be
passed to the readable interface.
* `callback` {Function} Call this function (optionally with an error
argument) when you are done flushing any remaining data.
**Note: This function is not implemented in the Transform base class.**
Rather, it is up to you to implement `_flush` in your extension
classes optionally, if it applies to your use case.
In some cases, your transform operation may need to emit a bit more
data at the end of the stream. For example, a `Zlib` compression
stream will store up some internal state so that it can optimally
compress the output. At the end, however, it needs to do the best it
can with what is left, so that the data will be complete.
In those cases, you can implement a `_flush` method, which will be
called at the very end, after all the written data is consumed, but
before emitting `end` to signal the end of the readable side. Just
like with `_transform`, call `outputFn` zero or more times, as
appropriate, and call `callback` when the flush operation is complete.
This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
## Class: PassThrough
This is a trivial implementation of a `Transform` stream that simply
passes the input bytes across to the output. Its purpose is mainly
for examples and testing, but there are occasionally use cases where
it can come in handy.
var test = require('tap').test;
var fromList = require('../from-list.js');
var fromList = require('../readable.js')._fromList;
test('with length', function(t) {
test('buffers', function(t) {
// have a length

@@ -33,1 +33,31 @@ var len = 16;

});
// Verify _fromList behavior on a list of string chunks: partial reads,
// exact-boundary reads, over-reads, and full consumption of the list.
test('strings', function(t) {
  // Total byte length of every entry in the list combined.
  var len = 16;
  var list = [ 'foog',
               'bark',
               'bazy',
               'kuel' ];

  // read more than the first element.
  var ret = fromList(6, list, len, true);
  t.equal(ret, 'foogba');

  // read exactly the first element.
  ret = fromList(2, list, 10, true);
  t.equal(ret, 'rk');

  // read less than the first element.
  ret = fromList(2, list, 8, true);
  t.equal(ret, 'ba');

  // read more than we have.
  ret = fromList(100, list, 6, true);
  t.equal(ret, 'zykuel');

  // all consumed.
  t.same(list, []);

  t.end();
});

@@ -46,7 +46,10 @@ var test = require('tap').test

var l = 0;
t.same(res.map(function (c) { return c.length; }), expectLengths);
t.same(res.map(function (c) {
return c.length;
}), expectLengths);
t.end();
});
r.pipe(w);
r.pipe(w, { chunkSize: 10 });
});

@@ -0,1 +1,22 @@

// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// A bit simpler than readable streams.

@@ -5,23 +26,41 @@ // Implement an async ._write(chunk, cb), and it'll handle all

'use strict';
module.exports = Writable;
module.exports = Writable
var util = require('util');
var Stream = require('stream');
var Duplex = require('./duplex.js');
util.inherits(Writable, Stream);
function Writable(options) {
// buffer management
function WritableState(options) {
options = options || {};
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.highWaterMark = options.hasOwnProperty('highWaterMark') ?
options.highWaterMark : 16 * 1024;
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
options.lowWaterMark : 1024;
this.needDrain = false;
this.ended = false;
this.ending = false;
// not an actual buffer we keep track of, but a measurement
// of how much we're waiting to get pushed to some underlying
// socket or file.
this.length = 0;
// the point at which write returns false
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.writing = false;
this.buffer = [];
}
// the point at which drain is emitted
this.lowWaterMark = options.lowWaterMark || 1024;
function Writable(options) {
// Writable ctor is applied to Duplexes, though they're not
// instanceof Writable, they're instanceof Readable.
if (!(this instanceof Writable) && !(this instanceof Duplex))
return new Writable(options);
this._needDrain = false;
this._ended = false;
this._writableState = new WritableState(options);
// legacy.
this.writable = true;
Stream.call(this);

@@ -32,10 +71,27 @@ }

// override the _write(chunk, cb) method for async streams
Writable.prototype.write = function(chunk) {
var ret = this.length >= this.highWaterMark;
if (ret === false) {
this._needDrain = true;
Writable.prototype.write = function(chunk, encoding) {
var state = this._writableState;
if (state.ended) {
this.emit('error', new Error('write after end'));
return;
}
if (typeof chunk === 'string' || encoding)
chunk = new Buffer(chunk + '', encoding);
var l = chunk.length;
this.length += l;
this._write(chunk, function(er) {
state.length += l;
var ret = state.length < state.highWaterMark;
if (ret === false)
state.needDrain = true;
if (state.writing) {
state.buffer.push(chunk);
return ret;
}
state.writing = true;
this._write(chunk, function writecb(er) {
state.writing = false;
if (er) {

@@ -45,14 +101,33 @@ this.emit('error', er);

}
this.length -= l;
state.length -= l;
if (this.length === 0 && this._ended) {
this.emit('end');
if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end.
this.emit('finish');
return;
}
if (this.length < this.lowWaterMark && this._needDrain) {
this._needDrain = false;
this.emit('drain');
// if there's something in the buffer waiting, then do that, too.
if (state.buffer.length) {
chunk = state.buffer.shift();
l = chunk.length;
state.writing = true;
this._write(chunk, writecb.bind(this));
}
if (state.length <= state.lowWaterMark && state.needDrain) {
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
process.nextTick(function() {
if (!state.needDrain)
return;
state.needDrain = false;
this.emit('drain');
}.bind(this));
}
}.bind(this));
return ret;
};

@@ -64,7 +139,10 @@

Writable.prototype.end = function(chunk) {
if (chunk) {
this.write(chunk);
}
this._ended = true;
Writable.prototype.end = function(chunk, encoding) {
var state = this._writableState;
state.ending = true;
if (chunk)
this.write(chunk, encoding);
else if (state.length === 0)
this.emit('finish');
state.ended = true;
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc