Socket
Socket
Sign inDemoInstall

readable-stream

Package Overview
Dependencies
Maintainers
1
Versions
103
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

readable-stream - npm Package Compare versions

Comparing version 0.0.1 to 0.0.2

from-list.js

3

examples/typer.js
var fs = require('fs');
var fst = fs.createReadStream(__filename);
var Readable = require('../readable.js');
var rst = new Readable(fst);
var rst = new Readable();
rst.wrap(fst);

@@ -6,0 +7,0 @@ rst.on('end', function() {

@@ -1,2 +0,2 @@

"use strict";
'use strict';

@@ -23,3 +23,2 @@ module.exports = FSReadable;

// a very basic memory pool. this optimization helps revent lots

@@ -39,8 +38,5 @@ // of allocations when there are many fs readable streams happening

function FSReadable(path, options) {
Readable.apply(this);
if (!options) options = {};
this._buffer = [];
this._bufferLength = 0;
Readable.apply(this, options);

@@ -52,2 +48,3 @@ this.path = path;

this.bufferSize = 64 * 1024;
this.lowWaterMark = 16 * 1024;

@@ -96,8 +93,10 @@ Object.keys(options).forEach(function(k) {

this.emit('open', fd);
this._read();
}.bind(this));
}
FSReadable.prototype._read = function() {
assert(typeof this.fd === 'number');
FSReadable.prototype._read = function(n, cb) {
if (this.fd === null) {
this.once('open', this._read.bind(this, n, cb));
return;
}

@@ -115,3 +114,3 @@ if (this.reading || this.ended || this.destroyed) return;

var thisPool = pool;
var toRead = Math.min(pool.length - pool.used, this.bufferSize);
var toRead = Math.min(pool.length - pool.used, n);
var start = pool.used;

@@ -129,5 +128,5 @@

fs.read(this.fd, pool, pool.used, toRead, this.pos, afterRead.bind(this));
fs.read(this.fd, pool, pool.used, toRead, this.pos, onread.bind(this));
function afterRead(er, bytesRead) {
function onread(er, bytesRead) {
this.reading = false;

@@ -137,78 +136,17 @@

this.destroy();
this.emit('error', er);
return;
return cb(er);
}
if (bytesRead === 0) {
this.ended = true;
if (this._bufferLength === 0) this.emit('end');
this.close();
return;
var b = null;
if (bytesRead > 0) {
b = thisPool.slice(start, start + bytesRead);
}
var b = thisPool.slice(start, start + bytesRead);
var needReadableEvent = this._bufferLength === 0;
this._bufferLength += bytesRead;
this._buffer.push(b);
if (needReadableEvent) this.emit('readable');
cb(null, b);
}
}
FSReadable.prototype.read = function(n) {
if (this._bufferLength === 0) return null;
if (isNaN(n) || n <= 0) n = this._bufferLength;
var ret;
if (n >= this._bufferLength) {
ret = Buffer.concat(this._buffer);
this._bufferLength = 0;
this._buffer.length = 0;
} else {
// read just some of it.
if (n === this._buffer[0].length) {
// first buffer is a perfect match
ret = this._buffer.shift();
} else if (n < this._buffer[0].length) {
// just take a part of the first buffer.
var buf = this._buffer[0];
ret = buf.slice(0, n);
this._buffer[0] = buf.slice(n);
} else {
// complex case.
ret = new Buffer(n);
var c = 0;
for (var i = 0; i < this._buffer.length && c < n; i++) {
var buf = this._buffer[i];
var cpy = Math.min(n - c, buf.length);
buf.copy(ret, c, 0, cpy);
if (cpy < buf.length) {
this._buffer[i] = buf.slice(cpy);
this._buffer = this._buffer.slice(i);
}
n -= cpy;
}
}
this._bufferLength -= n;
}
if (this._bufferLength === 0 && this.ended) {
process.nextTick(this.emit.bind(this, 'end'));
this.close();
}
// if we've consumed below the desired threshold, pull in more bytes.
if (this._bufferLength < this.bufferSize) {
this._read();
}
if (this._decoder) ret = this._decoder.write(ret);
return ret;
};
FSReadable.prototype.close = function(cb) {
if (cb) this.once('close', cb);
if (this.closed) {
if (this.closed || this.fd === null) {
if (this.fd === null) this.once('open', this.destroy);
return process.nextTick(this.emit.bind(this, 'close'));

@@ -215,0 +153,0 @@ }

{
"name": "readable-stream",
"version": "0.0.1",
"version": "0.0.2",
"description": "An exploration of a new kind of readable streams for Node.js",

@@ -5,0 +5,0 @@ "main": "readable.js",

@@ -1,2 +0,2 @@

"use strict";
'use strict';

@@ -7,36 +7,121 @@ module.exports = Readable;

var util = require('util');
var fromList = require('./from-list.js');
util.inherits(Readable, Stream);
function Readable(stream) {
if (stream) this.wrap(stream);
function Readable(options) {
options = options || {};
this.bufferSize = options.bufferSize || 16 * 1024;
this.lowWaterMark = options.lowWaterMark || 1024;
this.buffer = [];
this.length = 0;
this._pipes = [];
this._flowing = false;
Stream.apply(this);
}
// override this method.
// you can override either this method, or _read(n, cb) below.
Readable.prototype.read = function(n) {
return null;
if (this.length === 0 && this.ended) {
process.nextTick(this.emit.bind(this, 'end'));
return null;
}
if (isNaN(n) || n <= 0) n = this.length;
n = Math.min(n, this.length);
var ret = n > 0 ? fromList(n, this.buffer, this.length) : null;
this.length -= n;
if (!this.ended && this.length < this.lowWaterMark) {
this._read(this.bufferSize, function onread(er, chunk) {
if (er) return this.emit('error', er);
if (!chunk || !chunk.length) {
this.ended = true;
if (this.length === 0) this.emit('end');
return;
}
this.length += chunk.length;
this.buffer.push(chunk);
if (this.length < this.lowWaterMark) {
this._read(this.bufferSize, onread.bind(this));
}
this.emit('readable');
}.bind(this));
}
return ret;
};
// abstract method. to be overridden in specific implementation classes.
Readable.prototype._read = function(n, cb) {
process.nextTick(cb.bind(this, new Error('not implemented')));
};
Readable.prototype.pipe = function(dest, opt) {
if (!(opt && opt.end === false || dest === process.stdout ||
dest === process.stderr)) {
this.on('end', dest.end.bind(dest));
var src = this;
src._pipes.push(dest);
if ((!opt || opt.end !== false) &&
dest !== process.stdout &&
dest !== process.stderr) {
src.once('end', onend);
dest.on('unpipe', function(readable) {
if (readable === src) {
src.removeListener('end', onend);
}
});
}
dest.emit('pipe', this);
dest.emit('pipe', src);
if (!src._flowing) process.nextTick(flow.bind(src));
return dest;
flow.call(this);
function onend() {
dest.end();
}
};
function flow() {
var chunk;
while (chunk = this.read()) {
function flow(src) {
if (!src) src = this;
var chunk;
var dest;
var needDrain = 0;
while (chunk = src.read()) {
src._pipes.forEach(function(dest, i, list) {
var written = dest.write(chunk);
if (!written) {
dest.once('drain', flow.bind(this));
return;
if (false === written) {
needDrain++;
dest.once('drain', ondrain);
}
});
if (needDrain > 0) return;
}
src.once('readable', flow);
function ondrain() {
needDrain--;
if (needDrain === 0) {
flow(src);
}
this.once('readable', flow);
}
}
Readable.prototype.unpipe = function(dest) {
if (!dest) {
// remove all of them.
this._pipes.forEach(function(dest, i, list) {
dest.emit('unpipe', this);
}, this);
this._pipes.length = 0;
} else {
var i = this._pipes.indexOf(dest);
if (i !== -1) {
dest.emit('unpipe', this);
this._pipes.splice(i, 1);
}
}
return this;
};

@@ -85,4 +170,4 @@

Readable.prototype.wrap = function(stream) {
this._buffer = [];
this._bufferLength = 0;
this.buffer = [];
this.length = 0;
var paused = false;

@@ -93,3 +178,3 @@ var ended = false;

ended = true;
if (this._bufferLength === 0) {
if (this.length === 0) {
this.emit('end');

@@ -100,7 +185,7 @@ }

stream.on('data', function(chunk) {
this._buffer.push(chunk);
this._bufferLength += chunk.length;
this.buffer.push(chunk);
this.length += chunk.length;
this.emit('readable');
// if not consumed, then pause the stream.
if (this._bufferLength > 0 && !paused) {
if (this.length > this.lowWaterMark && !paused) {
paused = true;

@@ -131,47 +216,16 @@ stream.pause();

this.read = function(n) {
var ret;
if (this.length === 0) return null;
if (this._bufferLength === 0) {
ret = null;
} else if (!n || n >= this._bufferLength) {
// read it all
ret = Buffer.concat(this._buffer);
this._bufferLength = 0;
this._buffer.length = 0;
} else {
// read just some of it.
if (n < this._buffer[0].length) {
// just take a part of the first buffer.
var buf = this._buffer[0];
ret = buf.slice(0, n);
this._buffer[0] = buf.slice(n);
} else if (n === this._buffer[0].length) {
// first buffer is a perfect match
ret = this._buffer.shift();
} else {
// complex case.
ret = new Buffer(n);
var c = 0;
for (var i = 0; i < this._buffer.length && c < n; i++) {
var buf = this._buffer[i];
var cpy = Math.min(n - c, buf.length);
buf.copy(ret, c, 0, cpy);
if (cpy < buf.length) {
this._buffer[i] = buf.slice(cpy);
this._buffer = this._buffer.slice(i);
}
n -= cpy;
}
}
this._bufferLength -= n;
if (isNaN(n) || n <= 0) n = this.length;
var ret = fromList(n, this.buffer, this.length);
this.length = Math.max(0, this.length - n);
if (this.length < this.lowWaterMark && paused) {
stream.resume();
paused = false;
}
if (this._bufferLength === 0) {
if (paused) {
stream.resume();
paused = false;
}
if (ended) {
process.nextTick(this.emit.bind(this, 'end'));
}
if (this.length === 0 && ended) {
process.nextTick(this.emit.bind(this, 'end'));
}

@@ -178,0 +232,0 @@ return ret;

@@ -5,2 +5,3 @@ var tap = require('tap');

var util = require('util');
var EE = require('events').EventEmitter;

@@ -18,3 +19,6 @@ function TestReader(n) {

TestReader.prototype.read = function(n) {
var toRead = Math.min(n, this._buffer.length - this._pos);
var max = this._buffer.length - this._pos;
n = n || max;
n = Math.max(n, 0);
var toRead = Math.min(n, max);
if (toRead === 0) {

@@ -26,5 +30,8 @@ // simulate the read buffer filling up with some more bytes some time

this._bufs -= 1;
if (this._bufs === 0) {
if (this._bufs <= 0) {
// read them all!
this.emit('end');
if (!this.ended) {
this.emit('end');
this.ended = true;
}
} else {

@@ -42,2 +49,30 @@ this.emit('readable');

/////
function TestWriter() {
EE.apply(this);
this.received = [];
this.flush = false;
}
util.inherits(TestWriter, EE);
TestWriter.prototype.write = function(c) {
this.received.push(c.toString());
this.emit('write', c);
return true;
// flip back and forth between immediate acceptance and not.
this.flush = !this.flush;
if (!this.flush) setTimeout(this.emit.bind(this, 'drain'), 10);
return this.flush;
};
TestWriter.prototype.end = function(c) {
if (c) this.write(c);
this.emit('end', this.received);
};
////////
tap.test('a most basic test', function(t) {

@@ -84,1 +119,152 @@ var r = new TestReader(20);

});
tap.test('pipe', function(t) {
var r = new TestReader(5);
var expect = [ 'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx' ]
var w = new TestWriter;
var flush = true;
w.on('end', function(received) {
t.same(received, expect);
t.end();
});
r.pipe(w);
});
[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) {
tap.test('unpipe', function(t) {
var r = new TestReader(5);
// unpipe after 3 writes, then write to another stream instead.
var expect = [ 'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx' ];
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
var w = [ new TestWriter(), new TestWriter() ];
var writes = SPLIT;
w[0].on('write', function() {
if (--writes === 0) {
r.unpipe();
w[0].end();
r.pipe(w[1]);
}
});
var ended = 0;
w[0].on('end', function(results) {
ended++;
t.same(results, expect[0]);
});
w[1].on('end', function(results) {
ended++;
t.equal(ended, 2);
t.same(results, expect[1]);
t.end();
});
r.pipe(w[0]);
});
});
// both writers should get the same exact data.
tap.test('multipipe', function(t) {
var r = new TestReader(5);
var w = [ new TestWriter, new TestWriter ];
var expect = [ 'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx' ];
var c = 2;
w[0].on('end', function(received) {
t.same(received, expect, 'first');
if (--c === 0) t.end();
});
w[1].on('end', function(received) {
t.same(received, expect, 'second');
if (--c === 0) t.end();
});
r.pipe(w[0]);
r.pipe(w[1]);
});
[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) {
tap.test('multi-unpipe', function(t) {
var r = new TestReader(5);
// unpipe after 3 writes, then write to another stream instead.
var expect = [ 'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx',
'xxxxx' ];
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
var w = [ new TestWriter(), new TestWriter(), new TestWriter() ];
var writes = SPLIT;
w[0].on('write', function() {
if (--writes === 0) {
r.unpipe();
w[0].end();
r.pipe(w[1]);
}
});
var ended = 0;
w[0].on('end', function(results) {
ended++;
t.same(results, expect[0]);
});
w[1].on('end', function(results) {
ended++;
t.equal(ended, 2);
t.same(results, expect[1]);
t.end();
});
r.pipe(w[0]);
r.pipe(w[2]);
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc