Socket
Socket
Sign inDemoInstall

combined-stream

Package Overview
Dependencies
1
Maintainers
0
Versions
17
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.0.0 to 0.0.1

License

114

lib/combined_stream.js

@@ -9,8 +9,20 @@ var util = require('util');

this.readable = true;
this.dataSize = 0;
this.maxDataSize = 2 * 1024 * 1024;
this.pauseStreams = true;
this._released = false;
this._streams = [];
this._currentStream = null;
}
util.inherits(CombinedStream, Stream);
CombinedStream.create = function() {
CombinedStream.create = function(options) {
var combinedStream = new this();
options = options || {};
for (var option in options) {
combinedStream[option] = options[option];
}
return combinedStream;

@@ -21,6 +33,20 @@ };

if (typeof stream !== 'function') {
stream = DelayedStream.create(stream);
if (!(stream instanceof DelayedStream)) {
stream.on('data', this._checkDataSize.bind(this));
stream = DelayedStream.create(stream, {
maxDataSize: Infinity,
pauseStream: this.pauseStreams,
});
}
this._register(stream);
if (this.pauseStreams) {
stream.pause();
}
}
this._streams.push(stream);
return this;
};

@@ -30,12 +56,9 @@

Stream.prototype.pipe.call(this, dest, options);
this.release();
this.resume();
};
CombinedStream.prototype.release = function() {
this.writable = true;
this._getNext();
};
CombinedStream.prototype._getNext = function() {
this._currentStream = null;
var stream = this._streams.shift();
if (!stream) {

@@ -52,19 +75,23 @@ this.end();

var getStream = stream;
var stream = getStream(function(stream) {
getStream(function(stream) {
stream.on('data', this._checkDataSize.bind(this));
this._register(stream);
this._pipeNext(stream);
}.bind(this));
if (stream) {
this._pipeNext(stream);
}
};
CombinedStream.prototype._pipeNext = function(stream) {
stream.on('end', function() {
this._getNext();
}.bind(this));
this._currentStream = stream;
stream.on('end', this._getNext.bind(this))
stream.pipe(this, {end: false});
};
CombinedStream.prototype._register = function(stream) {
var self = this;
stream.on('error', function(err) {
self._emitError(err);
});
};
CombinedStream.prototype.write = function(data) {

@@ -75,2 +102,6 @@ this.emit('data', data);

CombinedStream.prototype.pause = function() {
if (!this.pauseStreams) {
return;
}
this.emit('pause');

@@ -80,2 +111,8 @@ };

CombinedStream.prototype.resume = function() {
if (!this._released) {
this.released = true;
this.writable = true;
this._getNext();
}
this.emit('resume');

@@ -85,5 +122,48 @@ };

CombinedStream.prototype.end = function() {
this._reset();
this.emit('end');
};
CombinedStream.prototype.destroy = function() {
this._reset();
this.emit('close');
};
CombinedStream.prototype._reset = function() {
this.writable = false;
this._streams = [];
this.emit('end');
this._currentStream = null;
};
CombinedStream.prototype._checkDataSize = function() {
this._updateDataSize();
if (this.dataSize <= this.maxDataSize) {
return;
}
var message =
'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'
this._emitError(new Error(message));
};
CombinedStream.prototype._updateDataSize = function() {
this.dataSize = 0;
var self = this;
this._streams.forEach(function(stream) {
if (!stream.dataSize) {
return;
}
self.dataSize += stream.dataSize;
});
if (this._currentStream && this._currentStream.dataSize) {
this.dataSize += this._currentStream.dataSize;
}
};
CombinedStream.prototype._emitError = function(err) {
this._reset();
this.emit('error', err);
};

12

package.json

@@ -5,3 +5,3 @@ {

"description": "A stream that emits multiple other streams one after another.",
"version": "0.0.0",
"version": "0.0.1",
"homepage": "https://github.com/felixge/node-combined-stream",

@@ -16,4 +16,8 @@ "repository": {

},
"dependencies": {},
"devDependencies": {}
}
"dependencies": {
"delayed-stream": "0.0.5"
},
"devDependencies": {
"far": "0.0.1"
}
}

@@ -25,5 +25,5 @@ # combined-stream

While the example above works great, it will buffer any `'data'` events emitted
by the second file, until the first file has finished emitting. So a more
efficient way is to provide the streams via a callback:
While the example above works great, it will pause all source streams until
they are needed. If you don't want that to happen, you can set `pauseStreams`
to `false`:

@@ -34,13 +34,5 @@ ``` javascript

var combinedStream = CombinedStream.create();
combinedStream.append(function() {
// You can either return streams directly
return fs.createReadStream('file1.txt');
});
combinedStream.append(function(next) {
setTimeout(function() {
// Or provide them to the next() function in an async fashion
next(fs.createReadStream('file2.txt'));
}, 100);
});
var combinedStream = CombinedStream.create({pauseStreams: false});
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

@@ -50,4 +42,6 @@ combinedStream.pipe(fs.createWriteStream('combined.txt'));

Last but not least, you can also ask combined-stream to apply back pressure
to the queued streams as neccesary to minimize buffering:
However, what if you don't have all the source streams yet, or you don't want
to allocate the resources (file descriptors, memory, etc.) for them right away?
Well, in that case you can simply provide a callback that supplies the stream
by calling a `next()` function:

@@ -58,5 +52,9 @@ ``` javascript

var combinedStream = CombinedStream.create({pauseStreams: true});
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
var combinedStream = CombinedStream.create();
combinedStream.append(function(next) {
next(fs.createReadStream('file1.txt'));
});
combinedStream.append(function(next) {
next(fs.createReadStream('file2.txt'));
});

@@ -66,4 +64,65 @@ combinedStream.pipe(fs.createWriteStream('combined.txt'));

In the case of files that is probably the best of the 3 approaches. But if you
are dealing with streams that you don't want to slow down, you should consider
either approach #1 or #2.
## API
### CombinedStream.create([options])
Returns a new combined stream object. Available options are:
* `maxDataSize`
* `pauseStreams`
The effect of those options is described below.
### combinedStream.pauseStreams = true
Whether to apply back pressure to the underlaying streams. If set to `false`,
the underlaying streams will never be paused. If set to `true`, the
underlaying streams will be paused right after being appended, as well as when
`delayedStream.pipe()` wants to throttle.
### combinedStream.maxDataSize = 2 * 1024 * 1024
The maximum amount of bytes (or characters) to buffer for all source streams.
If this value is exceeded, `combinedStream` emits an `'error'` event.
### combinedStream.dataSize = 0
The amount of bytes (or characters) currently buffered by `combinedStream`.
### combinedStream.append(stream)
Appends the given `stream` to the combinedStream object. If `pauseStreams` is
set to `true, this stream will also be paused right away.
`streams` can also be a function that takes one parameter called `next`. `next`
is a function that must be invoked in order to provide the `next` stream, see
example above.
Regardless of how the `stream` is appended, combined-stream always attaches an
`'error'` listener to it, so you don't have to do that manually.
### combinedStream.write(data)
You should not call this, `combinedStream` takes care of piping the appended
streams into itself for you.
### combinedStream.resume()
Causes `combinedStream` to start drain the streams it manages. The function is
idempotent, and also emits a `'resume'` event each time which usually goes to
the stream that is currently being drained.
### combinedStream.pause();
If `combinedStream.pauseStreams` is set to `false`, this does nothing.
Otherwise a `'pause'` event is emitted, this goes to the stream that is
currently being drained, so you can use it to apply back pressure.
### combinedStream.end();
Sets `combinedStream.writable` to false, emits an `'end'` event, and removes
all streams from the queue.
## License
combined-stream is licensed under the MIT license.

@@ -15,4 +15,4 @@ var common = require('../common');

});
combinedStream.append(function() {
return fs.createReadStream(FILE2);
combinedStream.append(function(next) {
next(fs.createReadStream(FILE2));
});

@@ -19,0 +19,0 @@

@@ -15,2 +15,9 @@ var common = require('../common');

var stream1 = combinedStream._streams[0];
var stream2 = combinedStream._streams[1];
stream1.on('end', function() {
assert.equal(stream2.dataSize, 0);
});
var tmpFile = common.dir.tmp + '/combined.txt';

@@ -17,0 +24,0 @@ var dest = fs.createWriteStream(tmpFile);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc