Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bufferedstream

Package Overview
Dependencies
Maintainers
1
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bufferedstream - npm Package Compare versions

Comparing version 1.2.0 to 1.3.0

128

buffered-stream.js

@@ -7,7 +7,6 @@ var util = require('util');

/**
* A readable/writable Stream that buffers data until next tick. The maxSize
* determines the byte size at which the buffer is considered "full". This is a
* soft limit that is only used to determine when calls to write will return
* false, which indicates to a writing stream that it should pause. This
* argument may be omitted to indicate this stream has no maximum size.
* A readable/writable Stream subclass that buffers data until next tick. The
* maxSize determines the number of bytes the buffer can hold before it is
* considered "full". This argument may be omitted to indicate this stream has
* no maximum size.
*

@@ -18,4 +17,13 @@ * The source and sourceEncoding arguments may be used to easily wrap this

* contents of this stream and passed to end.
*
* NOTE: The maxSize is a soft limit that is only used to determine when calls
* to write will return false, indicating to streams that are writing to this
* stream that they should pause. In any case, calls to write will still append
* to the buffer so that no data is lost.
*/
function BufferedStream(maxSize, source, sourceEncoding) {
if (!(this instanceof BufferedStream)) {
return new BufferedStream(maxSize, source, sourceEncoding);
}
Stream.call(this);

@@ -33,6 +41,6 @@

this.encoding = null;
this.paused = false;
this.ended = false;
this.readable = true;
this.writable = true;
this.paused = false;
this.ended = false;

@@ -70,3 +78,3 @@ this._buffer = [];

* Sets this stream's encoding. If an encoding is set, this stream will emit
* strings using that encoding. Otherwise, it emits buffers.
* strings using that encoding. Otherwise, it emits Buffer objects.
*/

@@ -95,3 +103,6 @@ BufferedStream.prototype.setEncoding = function (encoding) {

this.emit('resume');
flushOnNextTick(this);
if (!this.empty) {
flushOnNextTick(this);
}
}

@@ -106,3 +117,3 @@ };

BufferedStream.prototype.write = function (chunk, encoding) {
if (!this.writable || this.ended) {
if (!this.writable) {
throw new Error('Stream is not writable');

@@ -117,3 +128,2 @@ }

this.size += chunk.length;
flushOnNextTick(this);

@@ -130,36 +140,6 @@

/**
* Tries to emit all data that is currently in the buffer out to all data
* listeners. If this stream is paused, not readable, has no data in the buffer
* this method does nothing. If this stream has previously returned false from
* a write and any space is available in the buffer after flushing, a drain
* event is emitted.
*/
BufferedStream.prototype.flush = function () {
var chunk;
while (!this.paused && this.readable && this._buffer.length) {
chunk = this._buffer.shift();
this.size -= chunk.length;
if (this.encoding) {
this.emit('data', chunk.toString(this.encoding));
} else {
this.emit('data', chunk);
}
}
// Emit "drain" if the stream was full at one point but now
// has some room in the buffer.
if (this._wasFull && !this.full) {
this._wasFull = false;
this.emit('drain');
}
if (this.ended && this.empty) {
this._emitEnd();
}
};
/**
* Writes the given chunk to this stream and queues the end event to be
* called as soon as all data events have been emitted.
* called as soon as soon as possible. If the stream is not currently
* scheduled to be flushed, the end event will fire immediately. Otherwise, it
* will fire after the next flush.
*/

@@ -177,23 +157,23 @@ BufferedStream.prototype.end = function (chunk, encoding) {

if (this.empty) {
this._emitEnd();
// If the stream isn't already scheduled to flush on the next tick we can
// safely end it now. Otherwise it will end after the next flush.
if (!this._flushing) {
end(this);
}
};
BufferedStream.prototype._emitEnd = function () {
this._buffer = null;
this.readable = false;
this.writable = false;
this.emit('end');
};
function flushOnNextTick(stream) {
if (!stream._flushing) {
process.nextTick(function flush() {
stream.flush();
process.nextTick(function tick() {
if (stream.paused) {
stream._flushing = false;
return;
}
if (stream.empty || stream.paused) {
flush(stream);
if (stream.empty) {
stream._flushing = false;
} else {
process.nextTick(flush);
process.nextTick(tick);
}

@@ -205,1 +185,35 @@ });

}
function flush(stream) {
var chunk;
while (stream._buffer.length) {
chunk = stream._buffer.shift();
stream.size -= chunk.length;
if (stream.encoding) {
stream.emit('data', chunk.toString(stream.encoding));
} else {
stream.emit('data', chunk);
}
// If the stream was full at one point but isn't now, emit "drain".
if (stream._wasFull && !stream.full) {
stream._wasFull = false;
stream.emit('drain');
}
// If the stream was paused in some data event handler, break.
if (stream.paused) {
break;
}
}
if (stream.ended) {
end(stream);
}
}
function end(stream) {
stream.emit('end');
stream._buffer = null;
}

@@ -5,3 +5,3 @@ {

"description": "A base stream class for node that reliably buffers until next tick",
"version": "1.2.0",
"version": "1.3.0",
"repository": {

@@ -8,0 +8,0 @@ "type": "git",

@@ -43,5 +43,4 @@ var assert = require('assert');

describe('with a maxSize of 0', function () {
var stream = new BufferedStream(0);
it('is not full', function () {
var stream = new BufferedStream(0);
assert.ok(!stream.full);

@@ -51,8 +50,16 @@ });

describe('that is paused', function () {
var pauseCount, resumeCount, stream;
describe('setEncoding', function () {
it('sets the encoding of the stream', function () {
var stream = new BufferedStream;
stream.setEncoding('utf8');
assert.equal(stream.encoding, 'utf8');
});
});
describe('', function () {
var stream, pauseCount, resumeCount;
beforeEach(function () {
stream = new BufferedStream;
pauseCount = 0;
resumeCount = 0;
stream = new BufferedStream;

@@ -66,92 +73,33 @@ stream.on('pause', function () {

});
stream.pause();
});
it('does not emit pause when paused again', function () {
assert.equal(pauseCount, 1);
stream.pause();
assert.equal(pauseCount, 1);
});
describe('that is not paused', function () {
it('does not emit resume when resumed', function () {
assert.equal(resumeCount, 0);
stream.resume();
assert.equal(resumeCount, 0);
});
it('emits resume when resumed', function () {
assert.equal(resumeCount, 0);
stream.resume();
assert.equal(resumeCount, 1);
});
});
describe('after end has been called', function () {
var stream = new BufferedStream;
stream.end();
it('is empty', function () {
assert.ok(stream.empty);
});
it('is ended', function () {
assert.ok(stream.ended);
});
it('throws an error when written to', function () {
assert.throws(function () {
stream.write('hello');
}, /not writable/);
});
it('throws an error when end is called', function () {
assert.throws(function () {
stream.end();
}, /already ended/);
});
});
describe('with string contents and no encoding', function () {
it('emits buffers', function (callback) {
var stream = new BufferedStream('hello');
stream.on('data', function (chunk) {
assert.ok(chunk instanceof Buffer);
callback(null);
it('emits pause when paused', function () {
assert.equal(pauseCount, 0);
stream.pause();
assert.equal(pauseCount, 1);
});
});
});
describe('with string contents and an encoding', function () {
var chunk;
beforeEach(function (callback) {
var stream = new BufferedStream('hello');
stream.setEncoding('base64');
stream.on('data', function (c) {
chunk = c;
callback(null);
describe('that is paused', function () {
beforeEach(function () {
stream.pause();
});
});
it('emits strings', function () {
assert.equal(typeof chunk, 'string');
});
it('uses the proper encoding', function () {
assert.equal(chunk, new Buffer('hello').toString('base64'));
});
});
describe('when write() is called with a string in base64 encoding', function () {
it('uses the proper encoding', function (callback) {
var content = 'hello';
var stream = new BufferedStream;
stream.write(new Buffer(content).toString('base64'), 'base64');
stream.end();
var buffer = '';
stream.on('data', function (chunk) {
buffer += chunk.toString();
it('does not emit pause when paused again', function () {
assert.equal(pauseCount, 1);
stream.pause();
assert.equal(pauseCount, 1);
});
stream.on('end', function () {
assert.equal(buffer, content);
callback(null);
it('emits resume when resumed', function () {
assert.equal(resumeCount, 0);
stream.resume();
assert.equal(resumeCount, 1);
});

@@ -183,27 +131,83 @@ });

describe('when sourced from a string', function () {
testSourceType('Hello world', String);
describe('write', function () {
it('throws when a stream is not writable', function () {
var stream = new BufferedStream;
stream.writable = false;
assert.throws(function () {
stream.write('test');
}, /not writable/);
});
describe('when called with a string in base64 encoding', function () {
it('uses the proper encoding', function (callback) {
var content = 'hello';
var stream = new BufferedStream;
stream.write(new Buffer(content).toString('base64'), 'base64');
stream.end();
collectDataInString(stream, function (string) {
assert.equal(string, content);
callback(null);
});
});
});
});
describe('when sourced from a Buffer', function () {
testSourceType('Hello world', Buffer);
describe('end', function () {
var stream;
beforeEach(function () {
stream = new BufferedStream;
stream.end();
});
it('makes a stream ended', function () {
assert.ok(stream.ended);
});
it('throws an error when end is called', function () {
assert.throws(function () {
stream.end();
}, /already ended/);
});
});
describe('when sourced from a Stream', function () {
testSourceType('Hello world', BufferedStream);
});
testSourceType('String', String);
testSourceType('Buffer', Buffer);
testSourceType('BufferedStream', BufferedStream);
});
function bufferSource(source, callback) {
var stream = new BufferedStream(source);
var buffer = '';
function collectData(stream, callback) {
var data = [];
stream.on('data', function (chunk) {
buffer += chunk.toString();
data.push(chunk);
});
stream.on('end', function () {
callback(null, buffer);
callback(data);
});
}
function stringifyData(data) {
return data.map(function (chunk) {
return chunk.toString();
}).join('');
}
function collectDataInString(stream, callback) {
collectData(stream, function (data) {
callback(stringifyData(data));
});
}
function collectDataFromSource(source, encoding, callback) {
if (typeof encoding === 'function') {
callback = encoding;
encoding = null;
}
var stream = new BufferedStream(source);
stream.encoding = encoding;
collectData(stream, callback);
if (typeof source.resume === 'function') {

@@ -216,6 +220,5 @@ source.resume();

function temporarilyPauseThenBufferSource(source, callback) {
var stream = bufferSource(source, callback);
function temporarilyPauseThenCollectDataFromSource(source, encoding, callback) {
var stream = collectDataFromSource(source, encoding, callback);
stream.pause();
setTimeout(function () {

@@ -226,35 +229,59 @@ stream.resume();

function testSourceType(content, sourceType) {
var source;
beforeEach(function () {
source = new sourceType(content);
function testSourceType(sourceTypeName, sourceType) {
describe('when sourced from a ' + sourceTypeName, function () {
var content = 'Hello world';
var source;
beforeEach(function () {
source = sourceType(content);
if (typeof source.pause === 'function') {
source.pause();
}
});
if (typeof source.pause === 'function') {
source.pause();
}
});
it("emits its content", function (callback) {
bufferSource(source, function (err, buffer) {
if (err) {
callback(err);
} else {
assert.equal(buffer, content);
it('emits its content as Buffers', function (callback) {
collectDataFromSource(source, function (data) {
data.forEach(function (chunk) {
assert.ok(chunk instanceof Buffer);
});
assert.equal(stringifyData(data), content);
callback(null);
}
});
});
});
describe('and temporarily paused', function () {
it("emits its content", function (callback) {
temporarilyPauseThenBufferSource(source, function (err, buffer) {
if (err) {
callback(err);
} else {
assert.equal(buffer, content);
describe('and an encoding is set', function () {
it('emits its content as strings', function (callback) {
collectDataFromSource(source, 'utf8', function (data) {
data.forEach(function (chunk) {
assert.equal(typeof chunk, 'string');
});
assert.equal(stringifyData(data), content);
callback(null);
}
});
});
});
describe('and temporarily paused', function () {
it('emits its content as Buffers', function (callback) {
temporarilyPauseThenCollectDataFromSource(source, function (data) {
data.forEach(function (chunk) {
assert.ok(chunk instanceof Buffer);
});
assert.equal(stringifyData(data), content);
callback(null);
});
});
describe('and an encoding is set', function () {
it('emits its content as strings', function (callback) {
temporarilyPauseThenCollectDataFromSource(source, 'utf8', function (data) {
data.forEach(function (chunk) {
assert.equal(typeof chunk, 'string');
});
assert.equal(stringifyData(data), content);
callback(null);
});
});
});
});
});
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc