bufferedstream
Advanced tools
Comparing version 1.2.0 to 1.3.0
@@ -7,7 +7,6 @@ var util = require('util'); | ||
/** | ||
* A readable/writable Stream that buffers data until next tick. The maxSize | ||
* determines the byte size at which the buffer is considered "full". This is a | ||
* soft limit that is only used to determine when calls to write will return | ||
* false, which indicates to a writing stream that it should pause. This | ||
* argument may be omitted to indicate this stream has no maximum size. | ||
* A readable/writable Stream subclass that buffers data until next tick. The | ||
* maxSize determines the number of bytes the buffer can hold before it is | ||
* considered "full". This argument may be omitted to indicate this stream has | ||
* no maximum size. | ||
* | ||
@@ -18,4 +17,13 @@ * The source and sourceEncoding arguments may be used to easily wrap this | ||
* contents of this stream and passed to end. | ||
* | ||
* NOTE: The maxSize is a soft limit that is only used to determine when calls | ||
* to write will return false, indicating to streams that are writing to this | ||
* stream that they should pause. In any case, calls to write will still append | ||
* to the buffer so that no data is lost. | ||
*/ | ||
function BufferedStream(maxSize, source, sourceEncoding) { | ||
if (!(this instanceof BufferedStream)) { | ||
return new BufferedStream(maxSize, source, sourceEncoding); | ||
} | ||
Stream.call(this); | ||
@@ -33,6 +41,6 @@ | ||
this.encoding = null; | ||
this.paused = false; | ||
this.ended = false; | ||
this.readable = true; | ||
this.writable = true; | ||
this.paused = false; | ||
this.ended = false; | ||
@@ -70,3 +78,3 @@ this._buffer = []; | ||
* Sets this stream's encoding. If an encoding is set, this stream will emit | ||
* strings using that encoding. Otherwise, it emits buffers. | ||
* strings using that encoding. Otherwise, it emits Buffer objects. | ||
*/ | ||
@@ -95,3 +103,6 @@ BufferedStream.prototype.setEncoding = function (encoding) { | ||
this.emit('resume'); | ||
flushOnNextTick(this); | ||
if (!this.empty) { | ||
flushOnNextTick(this); | ||
} | ||
} | ||
@@ -106,3 +117,3 @@ }; | ||
BufferedStream.prototype.write = function (chunk, encoding) { | ||
if (!this.writable || this.ended) { | ||
if (!this.writable) { | ||
throw new Error('Stream is not writable'); | ||
@@ -117,3 +128,2 @@ } | ||
this.size += chunk.length; | ||
flushOnNextTick(this); | ||
@@ -130,36 +140,6 @@ | ||
/** | ||
* Tries to emit all data that is currently in the buffer out to all data | ||
* listeners. If this stream is paused, not readable, has no data in the buffer | ||
* this method does nothing. If this stream has previously returned false from | ||
* a write and any space is available in the buffer after flushing, a drain | ||
* event is emitted. | ||
*/ | ||
BufferedStream.prototype.flush = function () { | ||
var chunk; | ||
while (!this.paused && this.readable && this._buffer.length) { | ||
chunk = this._buffer.shift(); | ||
this.size -= chunk.length; | ||
if (this.encoding) { | ||
this.emit('data', chunk.toString(this.encoding)); | ||
} else { | ||
this.emit('data', chunk); | ||
} | ||
} | ||
// Emit "drain" if the stream was full at one point but now | ||
// has some room in the buffer. | ||
if (this._wasFull && !this.full) { | ||
this._wasFull = false; | ||
this.emit('drain'); | ||
} | ||
if (this.ended && this.empty) { | ||
this._emitEnd(); | ||
} | ||
}; | ||
/** | ||
* Writes the given chunk to this stream and queues the end event to be | ||
* called as soon as all data events have been emitted. | ||
* called as soon as soon as possible. If the stream is not currently | ||
* scheduled to be flushed, the end event will fire immediately. Otherwise, it | ||
* will fire after the next flush. | ||
*/ | ||
@@ -177,23 +157,23 @@ BufferedStream.prototype.end = function (chunk, encoding) { | ||
if (this.empty) { | ||
this._emitEnd(); | ||
// If the stream isn't already scheduled to flush on the next tick we can | ||
// safely end it now. Otherwise it will end after the next flush. | ||
if (!this._flushing) { | ||
end(this); | ||
} | ||
}; | ||
BufferedStream.prototype._emitEnd = function () { | ||
this._buffer = null; | ||
this.readable = false; | ||
this.writable = false; | ||
this.emit('end'); | ||
}; | ||
function flushOnNextTick(stream) { | ||
if (!stream._flushing) { | ||
process.nextTick(function flush() { | ||
stream.flush(); | ||
process.nextTick(function tick() { | ||
if (stream.paused) { | ||
stream._flushing = false; | ||
return; | ||
} | ||
if (stream.empty || stream.paused) { | ||
flush(stream); | ||
if (stream.empty) { | ||
stream._flushing = false; | ||
} else { | ||
process.nextTick(flush); | ||
process.nextTick(tick); | ||
} | ||
@@ -205,1 +185,35 @@ }); | ||
} | ||
function flush(stream) { | ||
var chunk; | ||
while (stream._buffer.length) { | ||
chunk = stream._buffer.shift(); | ||
stream.size -= chunk.length; | ||
if (stream.encoding) { | ||
stream.emit('data', chunk.toString(stream.encoding)); | ||
} else { | ||
stream.emit('data', chunk); | ||
} | ||
// If the stream was full at one point but isn't now, emit "drain". | ||
if (stream._wasFull && !stream.full) { | ||
stream._wasFull = false; | ||
stream.emit('drain'); | ||
} | ||
// If the stream was paused in some data event handler, break. | ||
if (stream.paused) { | ||
break; | ||
} | ||
} | ||
if (stream.ended) { | ||
end(stream); | ||
} | ||
} | ||
function end(stream) { | ||
stream.emit('end'); | ||
stream._buffer = null; | ||
} |
@@ -5,3 +5,3 @@ { | ||
"description": "A base stream class for node that reliably buffers until next tick", | ||
"version": "1.2.0", | ||
"version": "1.3.0", | ||
"repository": { | ||
@@ -8,0 +8,0 @@ "type": "git", |
@@ -43,5 +43,4 @@ var assert = require('assert'); | ||
describe('with a maxSize of 0', function () { | ||
var stream = new BufferedStream(0); | ||
it('is not full', function () { | ||
var stream = new BufferedStream(0); | ||
assert.ok(!stream.full); | ||
@@ -51,8 +50,16 @@ }); | ||
describe('that is paused', function () { | ||
var pauseCount, resumeCount, stream; | ||
describe('setEncoding', function () { | ||
it('sets the encoding of the stream', function () { | ||
var stream = new BufferedStream; | ||
stream.setEncoding('utf8'); | ||
assert.equal(stream.encoding, 'utf8'); | ||
}); | ||
}); | ||
describe('', function () { | ||
var stream, pauseCount, resumeCount; | ||
beforeEach(function () { | ||
stream = new BufferedStream; | ||
pauseCount = 0; | ||
resumeCount = 0; | ||
stream = new BufferedStream; | ||
@@ -66,92 +73,33 @@ stream.on('pause', function () { | ||
}); | ||
stream.pause(); | ||
}); | ||
it('does not emit pause when paused again', function () { | ||
assert.equal(pauseCount, 1); | ||
stream.pause(); | ||
assert.equal(pauseCount, 1); | ||
}); | ||
describe('that is not paused', function () { | ||
it('does not emit resume when resumed', function () { | ||
assert.equal(resumeCount, 0); | ||
stream.resume(); | ||
assert.equal(resumeCount, 0); | ||
}); | ||
it('emits resume when resumed', function () { | ||
assert.equal(resumeCount, 0); | ||
stream.resume(); | ||
assert.equal(resumeCount, 1); | ||
}); | ||
}); | ||
describe('after end has been called', function () { | ||
var stream = new BufferedStream; | ||
stream.end(); | ||
it('is empty', function () { | ||
assert.ok(stream.empty); | ||
}); | ||
it('is ended', function () { | ||
assert.ok(stream.ended); | ||
}); | ||
it('throws an error when written to', function () { | ||
assert.throws(function () { | ||
stream.write('hello'); | ||
}, /not writable/); | ||
}); | ||
it('throws an error when end is called', function () { | ||
assert.throws(function () { | ||
stream.end(); | ||
}, /already ended/); | ||
}); | ||
}); | ||
describe('with string contents and no encoding', function () { | ||
it('emits buffers', function (callback) { | ||
var stream = new BufferedStream('hello'); | ||
stream.on('data', function (chunk) { | ||
assert.ok(chunk instanceof Buffer); | ||
callback(null); | ||
it('emits pause when paused', function () { | ||
assert.equal(pauseCount, 0); | ||
stream.pause(); | ||
assert.equal(pauseCount, 1); | ||
}); | ||
}); | ||
}); | ||
describe('with string contents and an encoding', function () { | ||
var chunk; | ||
beforeEach(function (callback) { | ||
var stream = new BufferedStream('hello'); | ||
stream.setEncoding('base64'); | ||
stream.on('data', function (c) { | ||
chunk = c; | ||
callback(null); | ||
describe('that is paused', function () { | ||
beforeEach(function () { | ||
stream.pause(); | ||
}); | ||
}); | ||
it('emits strings', function () { | ||
assert.equal(typeof chunk, 'string'); | ||
}); | ||
it('uses the proper encoding', function () { | ||
assert.equal(chunk, new Buffer('hello').toString('base64')); | ||
}); | ||
}); | ||
describe('when write() is called with a string in base64 encoding', function () { | ||
it('uses the proper encoding', function (callback) { | ||
var content = 'hello'; | ||
var stream = new BufferedStream; | ||
stream.write(new Buffer(content).toString('base64'), 'base64'); | ||
stream.end(); | ||
var buffer = ''; | ||
stream.on('data', function (chunk) { | ||
buffer += chunk.toString(); | ||
it('does not emit pause when paused again', function () { | ||
assert.equal(pauseCount, 1); | ||
stream.pause(); | ||
assert.equal(pauseCount, 1); | ||
}); | ||
stream.on('end', function () { | ||
assert.equal(buffer, content); | ||
callback(null); | ||
it('emits resume when resumed', function () { | ||
assert.equal(resumeCount, 0); | ||
stream.resume(); | ||
assert.equal(resumeCount, 1); | ||
}); | ||
@@ -183,27 +131,83 @@ }); | ||
describe('when sourced from a string', function () { | ||
testSourceType('Hello world', String); | ||
describe('write', function () { | ||
it('throws when a stream is not writable', function () { | ||
var stream = new BufferedStream; | ||
stream.writable = false; | ||
assert.throws(function () { | ||
stream.write('test'); | ||
}, /not writable/); | ||
}); | ||
describe('when called with a string in base64 encoding', function () { | ||
it('uses the proper encoding', function (callback) { | ||
var content = 'hello'; | ||
var stream = new BufferedStream; | ||
stream.write(new Buffer(content).toString('base64'), 'base64'); | ||
stream.end(); | ||
collectDataInString(stream, function (string) { | ||
assert.equal(string, content); | ||
callback(null); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('when sourced from a Buffer', function () { | ||
testSourceType('Hello world', Buffer); | ||
describe('end', function () { | ||
var stream; | ||
beforeEach(function () { | ||
stream = new BufferedStream; | ||
stream.end(); | ||
}); | ||
it('makes a stream ended', function () { | ||
assert.ok(stream.ended); | ||
}); | ||
it('throws an error when end is called', function () { | ||
assert.throws(function () { | ||
stream.end(); | ||
}, /already ended/); | ||
}); | ||
}); | ||
describe('when sourced from a Stream', function () { | ||
testSourceType('Hello world', BufferedStream); | ||
}); | ||
testSourceType('String', String); | ||
testSourceType('Buffer', Buffer); | ||
testSourceType('BufferedStream', BufferedStream); | ||
}); | ||
function bufferSource(source, callback) { | ||
var stream = new BufferedStream(source); | ||
var buffer = ''; | ||
function collectData(stream, callback) { | ||
var data = []; | ||
stream.on('data', function (chunk) { | ||
buffer += chunk.toString(); | ||
data.push(chunk); | ||
}); | ||
stream.on('end', function () { | ||
callback(null, buffer); | ||
callback(data); | ||
}); | ||
} | ||
function stringifyData(data) { | ||
return data.map(function (chunk) { | ||
return chunk.toString(); | ||
}).join(''); | ||
} | ||
function collectDataInString(stream, callback) { | ||
collectData(stream, function (data) { | ||
callback(stringifyData(data)); | ||
}); | ||
} | ||
function collectDataFromSource(source, encoding, callback) { | ||
if (typeof encoding === 'function') { | ||
callback = encoding; | ||
encoding = null; | ||
} | ||
var stream = new BufferedStream(source); | ||
stream.encoding = encoding; | ||
collectData(stream, callback); | ||
if (typeof source.resume === 'function') { | ||
@@ -216,6 +220,5 @@ source.resume(); | ||
function temporarilyPauseThenBufferSource(source, callback) { | ||
var stream = bufferSource(source, callback); | ||
function temporarilyPauseThenCollectDataFromSource(source, encoding, callback) { | ||
var stream = collectDataFromSource(source, encoding, callback); | ||
stream.pause(); | ||
setTimeout(function () { | ||
@@ -226,35 +229,59 @@ stream.resume(); | ||
function testSourceType(content, sourceType) { | ||
var source; | ||
beforeEach(function () { | ||
source = new sourceType(content); | ||
function testSourceType(sourceTypeName, sourceType) { | ||
describe('when sourced from a ' + sourceTypeName, function () { | ||
var content = 'Hello world'; | ||
var source; | ||
beforeEach(function () { | ||
source = sourceType(content); | ||
if (typeof source.pause === 'function') { | ||
source.pause(); | ||
} | ||
}); | ||
if (typeof source.pause === 'function') { | ||
source.pause(); | ||
} | ||
}); | ||
it("emits its content", function (callback) { | ||
bufferSource(source, function (err, buffer) { | ||
if (err) { | ||
callback(err); | ||
} else { | ||
assert.equal(buffer, content); | ||
it('emits its content as Buffers', function (callback) { | ||
collectDataFromSource(source, function (data) { | ||
data.forEach(function (chunk) { | ||
assert.ok(chunk instanceof Buffer); | ||
}); | ||
assert.equal(stringifyData(data), content); | ||
callback(null); | ||
} | ||
}); | ||
}); | ||
}); | ||
describe('and temporarily paused', function () { | ||
it("emits its content", function (callback) { | ||
temporarilyPauseThenBufferSource(source, function (err, buffer) { | ||
if (err) { | ||
callback(err); | ||
} else { | ||
assert.equal(buffer, content); | ||
describe('and an encoding is set', function () { | ||
it('emits its content as strings', function (callback) { | ||
collectDataFromSource(source, 'utf8', function (data) { | ||
data.forEach(function (chunk) { | ||
assert.equal(typeof chunk, 'string'); | ||
}); | ||
assert.equal(stringifyData(data), content); | ||
callback(null); | ||
} | ||
}); | ||
}); | ||
}); | ||
describe('and temporarily paused', function () { | ||
it('emits its content as Buffers', function (callback) { | ||
temporarilyPauseThenCollectDataFromSource(source, function (data) { | ||
data.forEach(function (chunk) { | ||
assert.ok(chunk instanceof Buffer); | ||
}); | ||
assert.equal(stringifyData(data), content); | ||
callback(null); | ||
}); | ||
}); | ||
describe('and an encoding is set', function () { | ||
it('emits its content as strings', function (callback) { | ||
temporarilyPauseThenCollectDataFromSource(source, 'utf8', function (data) { | ||
data.forEach(function (chunk) { | ||
assert.equal(typeof chunk, 'string'); | ||
}); | ||
assert.equal(stringifyData(data), content); | ||
callback(null); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
17192
432