stream-stream
Advanced tools
Comparing version
@@ -12,6 +12,67 @@ var util = require('util'); | ||
// super | ||
stream.Readable.call(this, options); | ||
stream.Duplex.call(this, options); | ||
options = options || {}; | ||
if(options) { | ||
var separator = options.separator; | ||
switch(typeof separator) { | ||
// if the separator is a constant value | ||
// we make it a function to be treated in the | ||
// next case | ||
case 'string': | ||
var val = separator; | ||
separator = function(cb) { | ||
process.nextTick(function() { | ||
cb(val); | ||
}) | ||
} | ||
// make this._separator() always return | ||
// a stream filled with the results of the callback | ||
// if the reset is a stream, pipe it as separator | ||
case 'function': | ||
var fn = separator; | ||
separator = function() { | ||
var ps = new stream.PassThrough(); | ||
ps._isSeparator = true; | ||
fn(function(res) { | ||
if(res.readable) | ||
res.pipe(ps); | ||
else | ||
ps.end(res); | ||
}); | ||
return ps; | ||
} | ||
break; | ||
default: | ||
separator = null; | ||
break; | ||
} | ||
this._separator = separator; | ||
} | ||
this._readableState.objectMode = options.objectMode; | ||
this._writableState.objectMode = true; | ||
this.on('finish', function() { | ||
this._ending = true; | ||
var last = this._last(); | ||
var self = this; | ||
if (last) { | ||
last.once('end', _end.bind(this)); | ||
} | ||
else { | ||
_end.call(this); | ||
} | ||
}); | ||
this._hadFirstStream = false; | ||
} | ||
util.inherits(StreamStream, stream.Readable); | ||
util.inherits(StreamStream, stream.Duplex); | ||
function _end () { | ||
this._ended = true; | ||
this._output.end(); | ||
this.read(0); | ||
} | ||
StreamStream.prototype._read = function _read(size) { | ||
@@ -33,3 +94,3 @@ var data = this._output.read(size); | ||
StreamStream.prototype.write = function write(stream) { | ||
StreamStream.prototype._write = function _write(stream, encoding, callback) { | ||
var length = this._queue.push(stream); | ||
@@ -39,11 +100,31 @@ if(length == 1) { | ||
} | ||
callback(); | ||
}; | ||
/** | ||
* Start processing a stream from the queue | ||
* the given stream MUST be the first in the queue | ||
* @param stream | ||
*/ | ||
StreamStream.prototype._startStream = function _startStream(stream) { | ||
if(stream !== this._first()) { | ||
throw new Error('Unexpected stream to start up'); | ||
return this.emit('error', new Error('Unexpected stream to start up')); | ||
} | ||
if(this._separator | ||
&& !stream._isSeparator | ||
&& this._hadFirstStream | ||
&& !stream._separated | ||
) { | ||
var sep = this._separator(); | ||
// push the separator stream in front of the queue | ||
this._queue.unshift(sep); | ||
stream._separated = true; | ||
return this._startStream(sep); | ||
} | ||
stream.once('end', function() { | ||
if(stream !== this._first()) { | ||
throw new Error('Unexpected stream to remove from the queue'); | ||
return this.emit('error', | ||
new Error('Unexpected stream to remove from the queue')); | ||
} | ||
@@ -58,25 +139,6 @@ this._queue.shift(); | ||
stream.pipe(this._output, { end: false }); | ||
this._hadFirstStream = true; | ||
this.read(0); | ||
}; | ||
StreamStream.prototype.end = function end(stream) { | ||
if(stream) this.write(stream); | ||
this._ending = true; | ||
var last = this._last(); | ||
if(last) { | ||
last.once('end', this._end.bind(this)); | ||
} | ||
else { | ||
this._end(); | ||
} | ||
}; | ||
StreamStream.prototype._end = function _end() { | ||
this._ending = true; | ||
this._ended = true; | ||
this._output.end(); | ||
this.read(0); | ||
}; | ||
StreamStream.prototype._last = function _last() { | ||
@@ -83,0 +145,0 @@ if(this._queue.length == 0) return null; |
{ | ||
"name": "stream-stream", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"description": "A stream of streams in order to concatenates the contents of several streams", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -11,6 +11,7 @@ var SS = require('../'); | ||
finished = true; | ||
clearTimeout(to); | ||
test.done(); | ||
}); | ||
setTimeout(function() { | ||
var to = setTimeout(function() { | ||
if(!finished) { | ||
@@ -17,0 +18,0 @@ test.fail('No end detected'); |
@@ -13,2 +13,3 @@ var stream = require('stream'); | ||
done = true; | ||
clearTimeout(to); | ||
test.equal('hello world!', data, "Data in sink should be identical"); | ||
@@ -25,3 +26,3 @@ test.done(); | ||
setTimeout(function(){ | ||
var to = setTimeout(function(){ | ||
if(!done) { | ||
@@ -28,0 +29,0 @@ test.fail('no end detected'); |
@@ -11,2 +11,3 @@ var stream = require('stream'); | ||
done = true; | ||
clearTimeout(to); | ||
test.equal('hello', data, "Data in sink should be identical"); | ||
@@ -19,3 +20,3 @@ test.done(); | ||
setTimeout(function(){ | ||
var to = setTimeout(function(){ | ||
if(!done) { | ||
@@ -22,0 +23,0 @@ test.fail('no end detected'); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
14548
79.98%12
20%404
107.18%