Socket
Socket
Sign inDemoInstall

readable-stream

Package Overview
Dependencies
0
Maintainers
2
Versions
103
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.2.0 to 0.3.0

6

lib/_stream_duplex.js

@@ -46,2 +46,8 @@ // Copyright Joyent, Inc. and other Node contributors.

if (options && options.readable === false)
this.readable = false;
if (options && options.writable === false)
this.writable = false;
this.allowHalfOpen = true;

@@ -48,0 +54,0 @@ if (options && options.allowHalfOpen === false)

141

lib/_stream_readable.js

@@ -51,2 +51,5 @@ // Copyright Joyent, Inc. and other Node contributors.

if (this.lowWaterMark > this.highWaterMark)
throw new Error('lowWaterMark cannot be higher than highWaterMark');
this.buffer = [];

@@ -70,2 +73,7 @@ this.length = 0;

// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;
// when piping, we only care about 'readable' events that happen

@@ -110,4 +118,9 @@ // after read()ing all the bytes and not getting any pushback.

// more bytes. This is to work around cases where hwm=0 and
// lwm=0, such as the repl.
return rs.length < rs.highWaterMark || rs.length <= rs.lowWaterMark;
// lwm=0, such as the repl. Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
return rs.needReadable ||
rs.length < rs.highWaterMark ||
rs.length <= rs.lowWaterMark;
};

@@ -127,2 +140,5 @@

if (state.objectMode)
return n === 0 ? 0 : 1;
if (isNaN(n) || n === null)

@@ -158,3 +174,4 @@ return state.length;

if (n === 0 && state.ended) {
endReadable(this);
if (state.length === 0)
endReadable(this);
return null;

@@ -216,7 +233,7 @@ }

if (n > 0)
ret = fromList(n, state.buffer, state.length, !!state.decoder);
ret = fromList(n, state);
else
ret = null;
if (ret === null || ret.length === 0) {
if (ret === null || (!state.objectMode && ret.length === 0)) {
state.needReadable = true;

@@ -233,2 +250,8 @@ n = 0;

// If we happened to read() exactly the remaining amount in the
// buffer, and the EOF has been seen at this point, then make sure
// that we emit 'end' on the very next tick.
if (state.ended && !state.endEmitted && state.length === 0)
endReadable(this);
return ret;

@@ -241,2 +264,15 @@ };

// If we get something that is not a buffer, string, null, or undefined,
// then switch into objectMode. Now stream chunks are all considered
// to be of length=1, and the watermarks determine how many objects to
// keep in the buffer, rather than how many bytes or characters.
if (!Buffer.isBuffer(chunk) &&
'string' !== typeof chunk &&
chunk !== null &&
chunk !== undefined) {
state.objectMode = true;
state.length = state.buffer.length;
state.decoder = null;
}
state.reading = false;

@@ -246,3 +282,3 @@ if (er)

if (!chunk || !chunk.length) {
if (chunk === null || chunk === undefined) {
// eof

@@ -254,20 +290,26 @@ state.ended = true;

state.buffer.push(chunk);
state.length += chunk.length;
state.length += state.objectMode ? 1 : chunk.length;
}
}
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (!sync) {
if (state.length > 0) {
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
stream.emit('readable');
}
} else
endReadable(stream);
}
if (state.length > 0)
emitReadable(stream);
else
endReadable(stream);
return;
}
// at this point, if we got a zero-length buffer or string,
// and we're not in object-mode, then there's really no point
// continuing. it means that there is nothing to read right
// now, but as we have not received the EOF-signaling null,
// we're not ended. we've already unset the reading flag,
// so just get out of here.
if (!state.objectMode &&
(chunk || typeof chunk === 'string') &&
0 === chunk.length)
return;
if (state.decoder)

@@ -277,6 +319,4 @@ chunk = state.decoder.write(chunk);

// update the buffer info.
if (chunk) {
state.length += chunk.length;
state.buffer.push(chunk);
}
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);

@@ -294,11 +334,25 @@ // if we haven't gotten enough to pass the lowWaterMark,

if (state.needReadable && !sync) {
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
stream.emit('readable');
}
// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
if (state.needReadable) {
if (!sync)
emitReadable(stream);
else
process.nextTick(function() {
emitReadable(stream);
});
}
}
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
if (state.emittedReadable)
return;
state.emittedReadable = true;
stream.emit('readable');
}
// abstract method. to be overridden in specific implementation classes.

@@ -361,3 +415,3 @@ // call cb(er, data) where data is <= n in length.

// cleanup event handlers once the pipe is broken
dest.removeListener('close', unpipe);
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);

@@ -388,7 +442,11 @@ dest.removeListener('drain', ondrain);

// if the dest emits close, then presumably there's no point writing
// to it any more.
dest.once('close', unpipe);
// Both close and finish should trigger unpipe, but only once.
function onclose() {
dest.removeListener('finish', onfinish);
unpipe();
}
dest.once('close', onclose);
function onfinish() {
dest.removeListener('close', unpipe);
dest.removeListener('close', onclose);
unpipe();
}

@@ -504,2 +562,3 @@ dest.once('finish', onfinish);

this.removeListener('readable', pipeOnReadable);
state.flowing = false;
if (dest)

@@ -519,2 +578,3 @@ dest.emit('unpipe', this);

this.removeListener('readable', pipeOnReadable);
state.flowing = false;

@@ -684,12 +744,17 @@ for (var i = 0; i < len; i++)

// Length is the combined lengths of all the buffers in the list.
function fromList(n, list, length, stringMode) {
function fromList(n, state) {
var list = state.buffer;
var length = state.length;
var stringMode = !!state.decoder;
var objectMode = !!state.objectMode;
var ret;
// nothing in the list, definitely empty.
if (list.length === 0) {
if (list.length === 0)
return null;
}
if (length === 0)
ret = null;
else if (objectMode)
ret = list.shift();
else if (!n || n >= length) {

@@ -746,2 +811,8 @@ // read it all, truncate the array.

var state = stream._readableState;
// If we get here before consuming all the bytes, then that is a
// bug in node. Should never happen.
if (state.length > 0)
throw new Error('endReadable called on non-empty stream');
if (state.endEmitted)

@@ -748,0 +819,0 @@ return;

@@ -73,11 +73,45 @@ // Copyright Joyent, Inc. and other Node contributors.

function TransformState(stream) {
this.buffer = [];
this.transforming = false;
this.pendingReadCb = null;
function TransformState(options, stream) {
var ts = this;
this.output = function(chunk) {
ts.needTransform = false;
stream.push(chunk);
};
this.afterTransform = function(er, data) {
return afterTransform(stream, er, data);
};
this.needTransform = false;
this.transforming = false;
this.writecb = null;
this.writechunk = null;
}
function afterTransform(stream, er, data) {
var ts = stream._transformState;
ts.transforming = false;
var cb = ts.writecb;
if (!cb)
return this.emit('error', new Error('no writecb in Transform class'));
ts.writechunk = null;
ts.writecb = null;
if (data !== null && data !== undefined)
ts.output(data);
if (cb)
cb(er);
var rs = stream._readableState;
if (rs.needReadable || rs.length < rs.highWaterMark) {
stream._read();
}
}
function Transform(options) {

@@ -89,9 +123,10 @@ if (!(this instanceof Transform))

// bind output so that it can be passed around as a regular function.
var ts = this._transformState = new TransformState(options, this);
// when the writable side finishes, then flush out anything remaining.
var stream = this;
// the queue of _write chunks that are pending being transformed
var ts = this._transformState = new TransformState(stream);
// start out asking for a readable event once data is transformed.
this._readableState.needReadable = true;
// when the writable side finishes, then flush out anything remaining.
this.once('finish', function() {

@@ -123,52 +158,26 @@ if ('function' === typeof this._flush)

var ts = this._transformState;
var rs = this._readableState;
ts.buffer.push([chunk, cb]);
// no need for auto-pull if already in the midst of one.
ts.writecb = cb;
ts.writechunk = chunk;
if (ts.transforming)
return;
// now we have something to transform, if we were waiting for it.
// kick off a _read to pull it in.
if (ts.pendingReadCb) {
var readcb = ts.pendingReadCb;
ts.pendingReadCb = null;
this._read(0, readcb);
}
// if we weren't waiting for it, but nothing is queued up, then
// still kick off a transform, just so it's there when the user asks.
var doRead = rs.needReadable || rs.length <= rs.highWaterMark;
if (doRead && !rs.reading) {
var ret = this.read(0);
if (ret !== null)
return cb(new Error('invalid stream transform state'));
}
var rs = this._readableState;
if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark)
this._read();
};
Transform.prototype._read = function(n, readcb) {
var ws = this._writableState;
var rs = this._readableState;
// Doesn't matter what the args are here.
// the output and callback functions passed to _transform do all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n, cb) {
var ts = this._transformState;
ts.pendingReadCb = readcb;
// if there's nothing pending, then we just wait.
// if we're already transforming, then also just hold on a sec.
// we've already stashed the readcb, so we can come back later
// when we have something to transform
if (ts.buffer.length === 0 || ts.transforming)
if (ts.writechunk && ts.writecb && !ts.transforming) {
ts.transforming = true;
this._transform(ts.writechunk, ts.output, ts.afterTransform);
return;
}
// go ahead and transform that thing, now that someone wants it
var req = ts.buffer.shift();
var chunk = req[0];
var writecb = req[1];
ts.transforming = true;
this._transform(chunk, ts.output, function(er, data) {
ts.transforming = false;
if (data)
ts.output(data);
writecb(er);
});
// mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
ts.needTransform = true;
};

@@ -175,0 +184,0 @@

@@ -49,2 +49,6 @@ // Copyright Joyent, Inc. and other Node contributors.

// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;
// cast to ints.

@@ -54,2 +58,5 @@ this.lowWaterMark = ~~this.lowWaterMark;

if (this.lowWaterMark > this.highWaterMark)
throw new Error('lowWaterMark cannot be higher than highWaterMark');
this.needDrain = false;

@@ -133,11 +140,25 @@ // at the start of calling end()

var l = chunk.length;
if (false === state.decodeStrings)
chunk = [chunk, encoding || 'utf8'];
else if (typeof chunk === 'string' || encoding) {
chunk = new Buffer(chunk + '', encoding);
l = chunk.length;
// Writing something other than a string or buffer will switch
// the stream into objectMode.
if (!state.objectMode &&
typeof chunk !== 'string' &&
chunk !== null &&
chunk !== undefined &&
!Buffer.isBuffer(chunk))
state.objectMode = true;
var len;
if (state.objectMode)
len = 1;
else {
len = chunk.length;
if (false === state.decodeStrings)
chunk = [chunk, encoding || 'utf8'];
else if (typeof chunk === 'string') {
chunk = new Buffer(chunk, encoding);
len = chunk.length;
}
}
state.length += l;
state.length += len;

@@ -157,3 +178,3 @@ var ret = state.length < state.highWaterMark;

state.sync = true;
state.writelen = l;
state.writelen = len;
state.writecb = cb;

@@ -170,3 +191,3 @@ this._write(chunk, state.onwrite);

var cb = state.writecb;
var l = state.writelen;
var len = state.writelen;

@@ -194,3 +215,3 @@ state.writing = false;

}
state.length -= l;
state.length -= len;

@@ -239,8 +260,10 @@ if (cb) {

if (false === state.decodeStrings)
l = chunk[0].length;
if (state.objectMode)
len = 1;
else if (false === state.decodeStrings)
len = chunk[0].length;
else
l = chunk.length;
len = chunk.length;
state.writelen = l;
state.writelen = len;
state.writecb = cb;

@@ -277,3 +300,3 @@ state.writechunk = chunk;

Writable.prototype.end = function(chunk, encoding) {
Writable.prototype.end = function(chunk, encoding, cb) {
var state = this._writableState;

@@ -285,5 +308,14 @@

if (typeof chunk === 'function') {
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
state.ending = true;
if (chunk)
this.write(chunk, encoding);
this.write(chunk, encoding, cb);
else if (state.length === 0 && !state.finishing && !state.finished) {

@@ -293,4 +325,8 @@ state.finishing = true;

state.finished = true;
if (cb) process.nextTick(cb);
} else if (cb) {
this.once('finish', cb);
}
state.ended = true;
};
{
"name": "readable-stream",
"version": "0.2.0",
"version": "0.3.0",
"description": "An exploration of a new kind of readable streams for Node.js",

@@ -5,0 +5,0 @@ "main": "readable.js",

@@ -99,3 +99,6 @@ // Copyright Joyent, Inc. and other Node contributors.

var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);

@@ -115,6 +118,14 @@ }

equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);

@@ -323,1 +334,137 @@

});
test('back pressure respected', function (t) {
function noop() {}
var r = new R();
var counter = 0;
r.push(["one"]);
r.push(["two"]);
r.push(["three"]);
r.push(["four"]);
r.push(null);
r._read = noop;
var w1 = new R();
w1.write = function (chunk) {
assert.equal(chunk[0], "one");
w1.emit("close");
process.nextTick(function () {
r.pipe(w2);
r.pipe(w3);
})
};
w1.end = noop;
r.pipe(w1);
var expected = ["two", "two", "three", "three", "four", "four"];
var w2 = new R();
w2.write = function (chunk) {
assert.equal(chunk[0], expected.shift());
assert.equal(counter, 0);
counter++;
if (chunk[0] === "four") {
return true;
}
setTimeout(function () {
counter--;
w2.emit("drain");
}, 10);
return false;
}
w2.end = noop;
var w3 = new R();
w3.write = function (chunk) {
assert.equal(chunk[0], expected.shift());
assert.equal(counter, 1);
counter++;
if (chunk[0] === "four") {
return true;
}
setTimeout(function () {
counter--;
w3.emit("drain");
}, 50);
return false;
};
w3.end = function () {
assert.equal(counter, 2);
assert.equal(expected.length, 0);
t.end();
};
});
test('read(0) for ended streams', function (t) {
var r = new R();
var written = false;
var ended = false;
r._read = function () {};
r.push(new Buffer("foo"));
r.push(null);
var v = r.read(0);
assert.equal(v, null);
var w = new R();
w.write = function (buffer) {
written = true;
assert.equal(ended, false);
assert.equal(buffer.toString(), "foo")
};
w.end = function () {
ended = true;
assert.equal(written, true);
t.end();
};
r.pipe(w);
})
test('sync _read ending', function (t) {
var r = new R();
var called = false;
r._read = function (n, cb) {
cb(null, null);
};
r.once('end', function () {
called = true;
})
r.read();
process.nextTick(function () {
assert.equal(called, true);
t.end();
})
});
assert.throws(function() {
var bad = new R({
highWaterMark: 10,
lowWaterMark: 1000
});
});
assert.throws(function() {
var W = require('stream').Writable;
var bad = new W({
highWaterMark: 10,
lowWaterMark: 1000
});
});

@@ -28,3 +28,6 @@ // Copyright Joyent, Inc. and other Node contributors.

var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);

@@ -44,6 +47,14 @@ }

equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);

@@ -62,15 +73,15 @@

// read more than the first element.
var ret = fromList(6, list, 16);
var ret = fromList(6, { buffer: list, length: 16 });
t.equal(ret.toString(), 'foogba');
// read exactly the first element.
ret = fromList(2, list, 10);
ret = fromList(2, { buffer: list, length: 10 });
t.equal(ret.toString(), 'rk');
// read less than the first element.
ret = fromList(2, list, 8);
ret = fromList(2, { buffer: list, length: 8 });
t.equal(ret.toString(), 'ba');
// read more than we have.
ret = fromList(100, list, 6);
ret = fromList(100, { buffer: list, length: 6 });
t.equal(ret.toString(), 'zykuel');

@@ -93,15 +104,15 @@

// read more than the first element.
var ret = fromList(6, list, 16, true);
var ret = fromList(6, { buffer: list, length: 16, decoder: true });
t.equal(ret, 'foogba');
// read exactly the first element.
ret = fromList(2, list, 10, true);
ret = fromList(2, { buffer: list, length: 10, decoder: true });
t.equal(ret, 'rk');
// read less than the first element.
ret = fromList(2, list, 8, true);
ret = fromList(2, { buffer: list, length: 8, decoder: true });
t.equal(ret, 'ba');
// read more than we have.
ret = fromList(100, list, 6, true);
ret = fromList(100, { buffer: list, length: 6, decoder: true });
t.equal(ret, 'zykuel');

@@ -108,0 +119,0 @@

@@ -30,3 +30,6 @@ // Copyright Joyent, Inc. and other Node contributors.

var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);

@@ -46,6 +49,14 @@ }

equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);

@@ -82,2 +93,4 @@

console.log("cb(null, ret)", ret)
return cb(null, ret);

@@ -183,2 +196,3 @@ }.bind(this), 1);

tr.on('readable', function flow() {
console.log("readable once")
var chunk;

@@ -190,2 +204,3 @@ while (null !== (chunk = tr.read(13)))

tr.on('end', function() {
console.log("END")
t.same(out, expect);

@@ -192,0 +207,0 @@ t.end();

@@ -29,3 +29,6 @@ // Copyright Joyent, Inc. and other Node contributors.

var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);

@@ -45,6 +48,15 @@ }

equal: assert.equal,
end: run
ok: assert,
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);

@@ -54,2 +66,29 @@

test('writable side consumption', function(t) {
var tx = new Transform({
highWaterMark: 10
});
var transformed = 0;
tx._transform = function(chunk, output, cb) {
transformed += chunk.length;
output(chunk);
cb();
};
for (var i = 1; i <= 10; i++) {
tx.write(new Buffer(i));
}
tx.end();
t.equal(tx._readableState.length, 10);
t.equal(transformed, 10);
t.equal(tx._transformState.writechunk.length, 5);
t.same(tx._writableState.buffer.map(function(c) {
return c[0].length;
}), [6, 7, 8, 9, 10]);
t.end();
});
test('passthrough', function(t) {

@@ -326,1 +365,81 @@ var pt = new PassThrough();

});
test('object transform (json parse)', function(t) {
console.error('json parse stream');
var jp = new Transform({ objectMode: true });
jp._transform = function(data, output, cb) {
try {
output(JSON.parse(data));
cb();
} catch (er) {
cb(er);
}
};
// anything except null/undefined is fine.
// those are "magic" in the stream API, because they signal EOF.
var objects = [
{ foo: 'bar' },
100,
"string",
{ nested: { things: [ { foo: 'bar' }, 100, "string" ] } }
];
var ended = false;
jp.on('end', function() {
ended = true;
});
objects.forEach(function(obj) {
jp.write(JSON.stringify(obj));
var res = jp.read();
t.same(res, obj);
});
jp.end();
process.nextTick(function() {
t.ok(ended);
t.end();
})
});
test('object transform (json stringify)', function(t) {
console.error('json parse stream');
var js = new Transform({ objectMode: true });
js._transform = function(data, output, cb) {
try {
output(JSON.stringify(data));
cb();
} catch (er) {
cb(er);
}
};
// anything except null/undefined is fine.
// those are "magic" in the stream API, because they signal EOF.
var objects = [
{ foo: 'bar' },
100,
"string",
{ nested: { things: [ { foo: 'bar' }, 100, "string" ] } }
];
var ended = false;
js.on('end', function() {
ended = true;
});
objects.forEach(function(obj) {
js.write(obj);
var res = js.read();
t.equal(res, JSON.stringify(obj));
});
js.end();
process.nextTick(function() {
t.ok(ended);
t.end();
})
});

@@ -51,3 +51,6 @@ // Copyright Joyent, Inc. and other Node contributors.

var tests = [];
var count = 0;
function test(name, fn) {
count++;
tests.push([name, fn]);

@@ -59,10 +62,6 @@ }

if (!next)
return console.log('ok');
return console.error('ok');
var name = next[0];
var fn = next[1];
if (!fn)
return run();
console.log('# %s', name);

@@ -72,6 +71,14 @@ fn({

equal: assert.equal,
end: run
end: function () {
count--;
run();
}
});
}
// ensure all tests have run
process.on("exit", function () {
assert.equal(count, 0);
});
process.nextTick(run);

@@ -250,1 +257,41 @@

});
test('end callback', function (t) {
var tw = new TestWriter();
tw.end(function () {
t.end();
});
});
test('end callback with chunk', function (t) {
var tw = new TestWriter();
tw.end(new Buffer('hello world'), function () {
t.end();
});
});
test('end callback with chunk and encoding', function (t) {
var tw = new TestWriter();
tw.end('hello world', 'ascii', function () {
t.end();
});
});
test('end callback after .write() call', function (t) {
var tw = new TestWriter();
tw.write(new Buffer('hello world'));
tw.end(function () {
t.end();
});
});
test('encoding should be ignored for buffers', function(t) {
var tw = new W();
var hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb';
tw._write = function(chunk, cb) {
t.equal(chunk.toString('hex'), hex);
t.end();
};
var buf = new Buffer(hex, 'hex');
tw.write(buf, 'binary');
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc