readable-stream
Advanced tools
Comparing version 0.0.1 to 0.0.2
var fs = require('fs'); | ||
var fst = fs.createReadStream(__filename); | ||
var Readable = require('../readable.js'); | ||
var rst = new Readable(fst); | ||
var rst = new Readable(); | ||
rst.wrap(fst); | ||
@@ -6,0 +7,0 @@ rst.on('end', function() { |
98
fs.js
@@ -1,2 +0,2 @@ | ||
"use strict"; | ||
'use strict'; | ||
@@ -23,3 +23,2 @@ module.exports = FSReadable; | ||
// a very basic memory pool. this optimization helps revent lots | ||
@@ -39,8 +38,5 @@ // of allocations when there are many fs readable streams happening | ||
function FSReadable(path, options) { | ||
Readable.apply(this); | ||
if (!options) options = {}; | ||
this._buffer = []; | ||
this._bufferLength = 0; | ||
Readable.apply(this, options); | ||
@@ -52,2 +48,3 @@ this.path = path; | ||
this.bufferSize = 64 * 1024; | ||
this.lowWaterMark = 16 * 1024; | ||
@@ -96,8 +93,10 @@ Object.keys(options).forEach(function(k) { | ||
this.emit('open', fd); | ||
this._read(); | ||
}.bind(this)); | ||
} | ||
FSReadable.prototype._read = function() { | ||
assert(typeof this.fd === 'number'); | ||
FSReadable.prototype._read = function(n, cb) { | ||
if (this.fd === null) { | ||
this.once('open', this._read.bind(this, n, cb)); | ||
return; | ||
} | ||
@@ -115,3 +114,3 @@ if (this.reading || this.ended || this.destroyed) return; | ||
var thisPool = pool; | ||
var toRead = Math.min(pool.length - pool.used, this.bufferSize); | ||
var toRead = Math.min(pool.length - pool.used, n); | ||
var start = pool.used; | ||
@@ -129,5 +128,5 @@ | ||
fs.read(this.fd, pool, pool.used, toRead, this.pos, afterRead.bind(this)); | ||
fs.read(this.fd, pool, pool.used, toRead, this.pos, onread.bind(this)); | ||
function afterRead(er, bytesRead) { | ||
function onread(er, bytesRead) { | ||
this.reading = false; | ||
@@ -137,78 +136,17 @@ | ||
this.destroy(); | ||
this.emit('error', er); | ||
return; | ||
return cb(er); | ||
} | ||
if (bytesRead === 0) { | ||
this.ended = true; | ||
if (this._bufferLength === 0) this.emit('end'); | ||
this.close(); | ||
return; | ||
var b = null; | ||
if (bytesRead > 0) { | ||
b = thisPool.slice(start, start + bytesRead); | ||
} | ||
var b = thisPool.slice(start, start + bytesRead); | ||
var needReadableEvent = this._bufferLength === 0; | ||
this._bufferLength += bytesRead; | ||
this._buffer.push(b); | ||
if (needReadableEvent) this.emit('readable'); | ||
cb(null, b); | ||
} | ||
} | ||
FSReadable.prototype.read = function(n) { | ||
if (this._bufferLength === 0) return null; | ||
if (isNaN(n) || n <= 0) n = this._bufferLength; | ||
var ret; | ||
if (n >= this._bufferLength) { | ||
ret = Buffer.concat(this._buffer); | ||
this._bufferLength = 0; | ||
this._buffer.length = 0; | ||
} else { | ||
// read just some of it. | ||
if (n === this._buffer[0].length) { | ||
// first buffer is a perfect match | ||
ret = this._buffer.shift(); | ||
} else if (n < this._buffer[0].length) { | ||
// just take a part of the first buffer. | ||
var buf = this._buffer[0]; | ||
ret = buf.slice(0, n); | ||
this._buffer[0] = buf.slice(n); | ||
} else { | ||
// complex case. | ||
ret = new Buffer(n); | ||
var c = 0; | ||
for (var i = 0; i < this._buffer.length && c < n; i++) { | ||
var buf = this._buffer[i]; | ||
var cpy = Math.min(n - c, buf.length); | ||
buf.copy(ret, c, 0, cpy); | ||
if (cpy < buf.length) { | ||
this._buffer[i] = buf.slice(cpy); | ||
this._buffer = this._buffer.slice(i); | ||
} | ||
n -= cpy; | ||
} | ||
} | ||
this._bufferLength -= n; | ||
} | ||
if (this._bufferLength === 0 && this.ended) { | ||
process.nextTick(this.emit.bind(this, 'end')); | ||
this.close(); | ||
} | ||
// if we've consumed below the desired threshold, pull in more bytes. | ||
if (this._bufferLength < this.bufferSize) { | ||
this._read(); | ||
} | ||
if (this._decoder) ret = this._decoder.write(ret); | ||
return ret; | ||
}; | ||
FSReadable.prototype.close = function(cb) { | ||
if (cb) this.once('close', cb); | ||
if (this.closed) { | ||
if (this.closed || this.fd === null) { | ||
if (this.fd === null) this.once('open', this.destroy); | ||
return process.nextTick(this.emit.bind(this, 'close')); | ||
@@ -215,0 +153,0 @@ } |
{ | ||
"name": "readable-stream", | ||
"version": "0.0.1", | ||
"version": "0.0.2", | ||
"description": "An exploration of a new kind of readable streams for Node.js", | ||
@@ -5,0 +5,0 @@ "main": "readable.js", |
184
readable.js
@@ -1,2 +0,2 @@ | ||
"use strict"; | ||
'use strict'; | ||
@@ -7,36 +7,121 @@ module.exports = Readable; | ||
var util = require('util'); | ||
var fromList = require('./from-list.js'); | ||
util.inherits(Readable, Stream); | ||
function Readable(stream) { | ||
if (stream) this.wrap(stream); | ||
function Readable(options) { | ||
options = options || {}; | ||
this.bufferSize = options.bufferSize || 16 * 1024; | ||
this.lowWaterMark = options.lowWaterMark || 1024; | ||
this.buffer = []; | ||
this.length = 0; | ||
this._pipes = []; | ||
this._flowing = false; | ||
Stream.apply(this); | ||
} | ||
// override this method. | ||
// you can override either this method, or _read(n, cb) below. | ||
Readable.prototype.read = function(n) { | ||
return null; | ||
if (this.length === 0 && this.ended) { | ||
process.nextTick(this.emit.bind(this, 'end')); | ||
return null; | ||
} | ||
if (isNaN(n) || n <= 0) n = this.length; | ||
n = Math.min(n, this.length); | ||
var ret = n > 0 ? fromList(n, this.buffer, this.length) : null; | ||
this.length -= n; | ||
if (!this.ended && this.length < this.lowWaterMark) { | ||
this._read(this.bufferSize, function onread(er, chunk) { | ||
if (er) return this.emit('error', er); | ||
if (!chunk || !chunk.length) { | ||
this.ended = true; | ||
if (this.length === 0) this.emit('end'); | ||
return; | ||
} | ||
this.length += chunk.length; | ||
this.buffer.push(chunk); | ||
if (this.length < this.lowWaterMark) { | ||
this._read(this.bufferSize, onread.bind(this)); | ||
} | ||
this.emit('readable'); | ||
}.bind(this)); | ||
} | ||
return ret; | ||
}; | ||
// abstract method. to be overridden in specific implementation classes. | ||
Readable.prototype._read = function(n, cb) { | ||
process.nextTick(cb.bind(this, new Error('not implemented'))); | ||
}; | ||
Readable.prototype.pipe = function(dest, opt) { | ||
if (!(opt && opt.end === false || dest === process.stdout || | ||
dest === process.stderr)) { | ||
this.on('end', dest.end.bind(dest)); | ||
var src = this; | ||
src._pipes.push(dest); | ||
if ((!opt || opt.end !== false) && | ||
dest !== process.stdout && | ||
dest !== process.stderr) { | ||
src.once('end', onend); | ||
dest.on('unpipe', function(readable) { | ||
if (readable === src) { | ||
src.removeListener('end', onend); | ||
} | ||
}); | ||
} | ||
dest.emit('pipe', this); | ||
dest.emit('pipe', src); | ||
if (!src._flowing) process.nextTick(flow.bind(src)); | ||
return dest; | ||
flow.call(this); | ||
function onend() { | ||
dest.end(); | ||
} | ||
}; | ||
function flow() { | ||
var chunk; | ||
while (chunk = this.read()) { | ||
function flow(src) { | ||
if (!src) src = this; | ||
var chunk; | ||
var dest; | ||
var needDrain = 0; | ||
while (chunk = src.read()) { | ||
src._pipes.forEach(function(dest, i, list) { | ||
var written = dest.write(chunk); | ||
if (!written) { | ||
dest.once('drain', flow.bind(this)); | ||
return; | ||
if (false === written) { | ||
needDrain++; | ||
dest.once('drain', ondrain); | ||
} | ||
}); | ||
if (needDrain > 0) return; | ||
} | ||
src.once('readable', flow); | ||
function ondrain() { | ||
needDrain--; | ||
if (needDrain === 0) { | ||
flow(src); | ||
} | ||
this.once('readable', flow); | ||
} | ||
} | ||
Readable.prototype.unpipe = function(dest) { | ||
if (!dest) { | ||
// remove all of them. | ||
this._pipes.forEach(function(dest, i, list) { | ||
dest.emit('unpipe', this); | ||
}, this); | ||
this._pipes.length = 0; | ||
} else { | ||
var i = this._pipes.indexOf(dest); | ||
if (i !== -1) { | ||
dest.emit('unpipe', this); | ||
this._pipes.splice(i, 1); | ||
} | ||
} | ||
return this; | ||
}; | ||
@@ -85,4 +170,4 @@ | ||
Readable.prototype.wrap = function(stream) { | ||
this._buffer = []; | ||
this._bufferLength = 0; | ||
this.buffer = []; | ||
this.length = 0; | ||
var paused = false; | ||
@@ -93,3 +178,3 @@ var ended = false; | ||
ended = true; | ||
if (this._bufferLength === 0) { | ||
if (this.length === 0) { | ||
this.emit('end'); | ||
@@ -100,7 +185,7 @@ } | ||
stream.on('data', function(chunk) { | ||
this._buffer.push(chunk); | ||
this._bufferLength += chunk.length; | ||
this.buffer.push(chunk); | ||
this.length += chunk.length; | ||
this.emit('readable'); | ||
// if not consumed, then pause the stream. | ||
if (this._bufferLength > 0 && !paused) { | ||
if (this.length > this.lowWaterMark && !paused) { | ||
paused = true; | ||
@@ -131,47 +216,16 @@ stream.pause(); | ||
this.read = function(n) { | ||
var ret; | ||
if (this.length === 0) return null; | ||
if (this._bufferLength === 0) { | ||
ret = null; | ||
} else if (!n || n >= this._bufferLength) { | ||
// read it all | ||
ret = Buffer.concat(this._buffer); | ||
this._bufferLength = 0; | ||
this._buffer.length = 0; | ||
} else { | ||
// read just some of it. | ||
if (n < this._buffer[0].length) { | ||
// just take a part of the first buffer. | ||
var buf = this._buffer[0]; | ||
ret = buf.slice(0, n); | ||
this._buffer[0] = buf.slice(n); | ||
} else if (n === this._buffer[0].length) { | ||
// first buffer is a perfect match | ||
ret = this._buffer.shift(); | ||
} else { | ||
// complex case. | ||
ret = new Buffer(n); | ||
var c = 0; | ||
for (var i = 0; i < this._buffer.length && c < n; i++) { | ||
var buf = this._buffer[i]; | ||
var cpy = Math.min(n - c, buf.length); | ||
buf.copy(ret, c, 0, cpy); | ||
if (cpy < buf.length) { | ||
this._buffer[i] = buf.slice(cpy); | ||
this._buffer = this._buffer.slice(i); | ||
} | ||
n -= cpy; | ||
} | ||
} | ||
this._bufferLength -= n; | ||
if (isNaN(n) || n <= 0) n = this.length; | ||
var ret = fromList(n, this.buffer, this.length); | ||
this.length = Math.max(0, this.length - n); | ||
if (this.length < this.lowWaterMark && paused) { | ||
stream.resume(); | ||
paused = false; | ||
} | ||
if (this._bufferLength === 0) { | ||
if (paused) { | ||
stream.resume(); | ||
paused = false; | ||
} | ||
if (ended) { | ||
process.nextTick(this.emit.bind(this, 'end')); | ||
} | ||
if (this.length === 0 && ended) { | ||
process.nextTick(this.emit.bind(this, 'end')); | ||
} | ||
@@ -178,0 +232,0 @@ return ret; |
@@ -5,2 +5,3 @@ var tap = require('tap'); | ||
var util = require('util'); | ||
var EE = require('events').EventEmitter; | ||
@@ -18,3 +19,6 @@ function TestReader(n) { | ||
TestReader.prototype.read = function(n) { | ||
var toRead = Math.min(n, this._buffer.length - this._pos); | ||
var max = this._buffer.length - this._pos; | ||
n = n || max; | ||
n = Math.max(n, 0); | ||
var toRead = Math.min(n, max); | ||
if (toRead === 0) { | ||
@@ -26,5 +30,8 @@ // simulate the read buffer filling up with some more bytes some time | ||
this._bufs -= 1; | ||
if (this._bufs === 0) { | ||
if (this._bufs <= 0) { | ||
// read them all! | ||
this.emit('end'); | ||
if (!this.ended) { | ||
this.emit('end'); | ||
this.ended = true; | ||
} | ||
} else { | ||
@@ -42,2 +49,30 @@ this.emit('readable'); | ||
///// | ||
function TestWriter() { | ||
EE.apply(this); | ||
this.received = []; | ||
this.flush = false; | ||
} | ||
util.inherits(TestWriter, EE); | ||
TestWriter.prototype.write = function(c) { | ||
this.received.push(c.toString()); | ||
this.emit('write', c); | ||
return true; | ||
// flip back and forth between immediate acceptance and not. | ||
this.flush = !this.flush; | ||
if (!this.flush) setTimeout(this.emit.bind(this, 'drain'), 10); | ||
return this.flush; | ||
}; | ||
TestWriter.prototype.end = function(c) { | ||
if (c) this.write(c); | ||
this.emit('end', this.received); | ||
}; | ||
//////// | ||
tap.test('a most basic test', function(t) { | ||
@@ -84,1 +119,152 @@ var r = new TestReader(20); | ||
}); | ||
tap.test('pipe', function(t) { | ||
var r = new TestReader(5); | ||
var expect = [ 'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx' ] | ||
var w = new TestWriter; | ||
var flush = true; | ||
w.on('end', function(received) { | ||
t.same(received, expect); | ||
t.end(); | ||
}); | ||
r.pipe(w); | ||
}); | ||
[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) { | ||
tap.test('unpipe', function(t) { | ||
var r = new TestReader(5); | ||
// unpipe after 3 writes, then write to another stream instead. | ||
var expect = [ 'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx' ]; | ||
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ]; | ||
var w = [ new TestWriter(), new TestWriter() ]; | ||
var writes = SPLIT; | ||
w[0].on('write', function() { | ||
if (--writes === 0) { | ||
r.unpipe(); | ||
w[0].end(); | ||
r.pipe(w[1]); | ||
} | ||
}); | ||
var ended = 0; | ||
w[0].on('end', function(results) { | ||
ended++; | ||
t.same(results, expect[0]); | ||
}); | ||
w[1].on('end', function(results) { | ||
ended++; | ||
t.equal(ended, 2); | ||
t.same(results, expect[1]); | ||
t.end(); | ||
}); | ||
r.pipe(w[0]); | ||
}); | ||
}); | ||
// both writers should get the same exact data. | ||
tap.test('multipipe', function(t) { | ||
var r = new TestReader(5); | ||
var w = [ new TestWriter, new TestWriter ]; | ||
var expect = [ 'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx' ]; | ||
var c = 2; | ||
w[0].on('end', function(received) { | ||
t.same(received, expect, 'first'); | ||
if (--c === 0) t.end(); | ||
}); | ||
w[1].on('end', function(received) { | ||
t.same(received, expect, 'second'); | ||
if (--c === 0) t.end(); | ||
}); | ||
r.pipe(w[0]); | ||
r.pipe(w[1]); | ||
}); | ||
[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) { | ||
tap.test('multi-unpipe', function(t) { | ||
var r = new TestReader(5); | ||
// unpipe after 3 writes, then write to another stream instead. | ||
var expect = [ 'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx', | ||
'xxxxx' ]; | ||
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ]; | ||
var w = [ new TestWriter(), new TestWriter(), new TestWriter() ]; | ||
var writes = SPLIT; | ||
w[0].on('write', function() { | ||
if (--writes === 0) { | ||
r.unpipe(); | ||
w[0].end(); | ||
r.pipe(w[1]); | ||
} | ||
}); | ||
var ended = 0; | ||
w[0].on('end', function(results) { | ||
ended++; | ||
t.same(results, expect[0]); | ||
}); | ||
w[1].on('end', function(results) { | ||
ended++; | ||
t.equal(ended, 2); | ||
t.same(results, expect[1]); | ||
t.end(); | ||
}); | ||
r.pipe(w[0]); | ||
r.pipe(w[2]); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Non-existent author
Supply chain riskThe package was published by an npm account that no longer exists.
Found 1 instance in 1 package
31096
15
853
0
1