combined-stream
Advanced tools
Comparing version 0.0.0 to 0.0.1
@@ -9,8 +9,20 @@ var util = require('util'); | ||
this.readable = true; | ||
this.dataSize = 0; | ||
this.maxDataSize = 2 * 1024 * 1024; | ||
this.pauseStreams = true; | ||
this._released = false; | ||
this._streams = []; | ||
this._currentStream = null; | ||
} | ||
util.inherits(CombinedStream, Stream); | ||
CombinedStream.create = function() { | ||
CombinedStream.create = function(options) { | ||
var combinedStream = new this(); | ||
options = options || {}; | ||
for (var option in options) { | ||
combinedStream[option] = options[option]; | ||
} | ||
return combinedStream; | ||
@@ -21,6 +33,20 @@ }; | ||
if (typeof stream !== 'function') { | ||
stream = DelayedStream.create(stream); | ||
if (!(stream instanceof DelayedStream)) { | ||
stream.on('data', this._checkDataSize.bind(this)); | ||
stream = DelayedStream.create(stream, { | ||
maxDataSize: Infinity, | ||
pauseStream: this.pauseStreams, | ||
}); | ||
} | ||
this._register(stream); | ||
if (this.pauseStreams) { | ||
stream.pause(); | ||
} | ||
} | ||
this._streams.push(stream); | ||
return this; | ||
}; | ||
@@ -30,12 +56,9 @@ | ||
Stream.prototype.pipe.call(this, dest, options); | ||
this.release(); | ||
this.resume(); | ||
}; | ||
CombinedStream.prototype.release = function() { | ||
this.writable = true; | ||
this._getNext(); | ||
}; | ||
CombinedStream.prototype._getNext = function() { | ||
this._currentStream = null; | ||
var stream = this._streams.shift(); | ||
if (!stream) { | ||
@@ -52,19 +75,23 @@ this.end(); | ||
var getStream = stream; | ||
var stream = getStream(function(stream) { | ||
getStream(function(stream) { | ||
stream.on('data', this._checkDataSize.bind(this)); | ||
this._register(stream); | ||
this._pipeNext(stream); | ||
}.bind(this)); | ||
if (stream) { | ||
this._pipeNext(stream); | ||
} | ||
}; | ||
CombinedStream.prototype._pipeNext = function(stream) { | ||
stream.on('end', function() { | ||
this._getNext(); | ||
}.bind(this)); | ||
this._currentStream = stream; | ||
stream.on('end', this._getNext.bind(this)) | ||
stream.pipe(this, {end: false}); | ||
}; | ||
CombinedStream.prototype._register = function(stream) { | ||
var self = this; | ||
stream.on('error', function(err) { | ||
self._emitError(err); | ||
}); | ||
}; | ||
CombinedStream.prototype.write = function(data) { | ||
@@ -75,2 +102,6 @@ this.emit('data', data); | ||
CombinedStream.prototype.pause = function() { | ||
if (!this.pauseStreams) { | ||
return; | ||
} | ||
this.emit('pause'); | ||
@@ -80,2 +111,8 @@ }; | ||
CombinedStream.prototype.resume = function() { | ||
if (!this._released) { | ||
this.released = true; | ||
this.writable = true; | ||
this._getNext(); | ||
} | ||
this.emit('resume'); | ||
@@ -85,5 +122,48 @@ }; | ||
CombinedStream.prototype.end = function() { | ||
this._reset(); | ||
this.emit('end'); | ||
}; | ||
CombinedStream.prototype.destroy = function() { | ||
this._reset(); | ||
this.emit('close'); | ||
}; | ||
CombinedStream.prototype._reset = function() { | ||
this.writable = false; | ||
this._streams = []; | ||
this.emit('end'); | ||
this._currentStream = null; | ||
}; | ||
CombinedStream.prototype._checkDataSize = function() { | ||
this._updateDataSize(); | ||
if (this.dataSize <= this.maxDataSize) { | ||
return; | ||
} | ||
var message = | ||
'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.' | ||
this._emitError(new Error(message)); | ||
}; | ||
CombinedStream.prototype._updateDataSize = function() { | ||
this.dataSize = 0; | ||
var self = this; | ||
this._streams.forEach(function(stream) { | ||
if (!stream.dataSize) { | ||
return; | ||
} | ||
self.dataSize += stream.dataSize; | ||
}); | ||
if (this._currentStream && this._currentStream.dataSize) { | ||
this.dataSize += this._currentStream.dataSize; | ||
} | ||
}; | ||
CombinedStream.prototype._emitError = function(err) { | ||
this._reset(); | ||
this.emit('error', err); | ||
}; |
@@ -5,3 +5,3 @@ { | ||
"description": "A stream that emits multiple other streams one after another.", | ||
"version": "0.0.0", | ||
"version": "0.0.1", | ||
"homepage": "https://github.com/felixge/node-combined-stream", | ||
@@ -16,4 +16,8 @@ "repository": { | ||
}, | ||
"dependencies": {}, | ||
"devDependencies": {} | ||
} | ||
"dependencies": { | ||
"delayed-stream": "0.0.5" | ||
}, | ||
"devDependencies": { | ||
"far": "0.0.1" | ||
} | ||
} |
103
Readme.md
@@ -25,5 +25,5 @@ # combined-stream | ||
While the example above works great, it will buffer any `'data'` events emitted | ||
by the second file, until the first file has finished emitting. So a more | ||
efficient way is to provide the streams via a callback: | ||
While the example above works great, it will pause all source streams until | ||
they are needed. If you don't want that to happen, you can set `pauseStreams` | ||
to `false`: | ||
@@ -34,13 +34,5 @@ ``` javascript | ||
var combinedStream = CombinedStream.create(); | ||
combinedStream.append(function() { | ||
// You can either return streams directly | ||
return fs.createReadStream('file1.txt'); | ||
}); | ||
combinedStream.append(function(next) { | ||
setTimeout(function() { | ||
// Or provide them to the next() function in an async fashion | ||
next(fs.createReadStream('file2.txt')); | ||
}, 100); | ||
}); | ||
var combinedStream = CombinedStream.create({pauseStreams: false}); | ||
combinedStream.append(fs.createReadStream('file1.txt')); | ||
combinedStream.append(fs.createReadStream('file2.txt')); | ||
@@ -50,4 +42,6 @@ combinedStream.pipe(fs.createWriteStream('combined.txt')); | ||
Last but not least, you can also ask combined-stream to apply back pressure | ||
to the queued streams as neccesary to minimize buffering: | ||
However, what if you don't have all the source streams yet, or you don't want | ||
to allocate the resources (file descriptors, memory, etc.) for them right away? | ||
Well, in that case you can simply provide a callback that supplies the stream | ||
by calling a `next()` function: | ||
@@ -58,5 +52,9 @@ ``` javascript | ||
var combinedStream = CombinedStream.create({pauseStreams: true}); | ||
combinedStream.append(fs.createReadStream('file1.txt')); | ||
combinedStream.append(fs.createReadStream('file2.txt')); | ||
var combinedStream = CombinedStream.create(); | ||
combinedStream.append(function(next) { | ||
next(fs.createReadStream('file1.txt')); | ||
}); | ||
combinedStream.append(function(next) { | ||
next(fs.createReadStream('file2.txt')); | ||
}); | ||
@@ -66,4 +64,65 @@ combinedStream.pipe(fs.createWriteStream('combined.txt')); | ||
In the case of files that is probably the best of the 3 approaches. But if you | ||
are dealing with streams that you don't want to slow down, you should consider | ||
either approach #1 or #2. | ||
## API | ||
### CombinedStream.create([options]) | ||
Returns a new combined stream object. Available options are: | ||
* `maxDataSize` | ||
* `pauseStreams` | ||
The effect of those options is described below. | ||
### combinedStream.pauseStreams = true | ||
Whether to apply back pressure to the underlaying streams. If set to `false`, | ||
the underlaying streams will never be paused. If set to `true`, the | ||
underlaying streams will be paused right after being appended, as well as when | ||
`delayedStream.pipe()` wants to throttle. | ||
### combinedStream.maxDataSize = 2 * 1024 * 1024 | ||
The maximum amount of bytes (or characters) to buffer for all source streams. | ||
If this value is exceeded, `combinedStream` emits an `'error'` event. | ||
### combinedStream.dataSize = 0 | ||
The amount of bytes (or characters) currently buffered by `combinedStream`. | ||
### combinedStream.append(stream) | ||
Appends the given `stream` to the combinedStream object. If `pauseStreams` is | ||
set to `true, this stream will also be paused right away. | ||
`streams` can also be a function that takes one parameter called `next`. `next` | ||
is a function that must be invoked in order to provide the `next` stream, see | ||
example above. | ||
Regardless of how the `stream` is appended, combined-stream always attaches an | ||
`'error'` listener to it, so you don't have to do that manually. | ||
### combinedStream.write(data) | ||
You should not call this, `combinedStream` takes care of piping the appended | ||
streams into itself for you. | ||
### combinedStream.resume() | ||
Causes `combinedStream` to start drain the streams it manages. The function is | ||
idempotent, and also emits a `'resume'` event each time which usually goes to | ||
the stream that is currently being drained. | ||
### combinedStream.pause(); | ||
If `combinedStream.pauseStreams` is set to `false`, this does nothing. | ||
Otherwise a `'pause'` event is emitted, this goes to the stream that is | ||
currently being drained, so you can use it to apply back pressure. | ||
### combinedStream.end(); | ||
Sets `combinedStream.writable` to false, emits an `'end'` event, and removes | ||
all streams from the queue. | ||
## License | ||
combined-stream is licensed under the MIT license. |
@@ -15,4 +15,4 @@ var common = require('../common'); | ||
}); | ||
combinedStream.append(function() { | ||
return fs.createReadStream(FILE2); | ||
combinedStream.append(function(next) { | ||
next(fs.createReadStream(FILE2)); | ||
}); | ||
@@ -19,0 +19,0 @@ |
@@ -15,2 +15,9 @@ var common = require('../common'); | ||
var stream1 = combinedStream._streams[0]; | ||
var stream2 = combinedStream._streams[1]; | ||
stream1.on('end', function() { | ||
assert.equal(stream2.dataSize, 0); | ||
}); | ||
var tmpFile = common.dir.tmp + '/combined.txt'; | ||
@@ -17,0 +24,0 @@ var dest = fs.createWriteStream(tmpFile); |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
124
5
1
96689
1
1
16
261
+ Addeddelayed-stream@0.0.5
+ Addeddelayed-stream@0.0.5(transitive)