stream-stream
Advanced tools
Comparing version 1.0.0 to 1.1.0
@@ -12,6 +12,67 @@ var util = require('util'); | ||
// super | ||
stream.Readable.call(this, options); | ||
stream.Duplex.call(this, options); | ||
options = options || {}; | ||
if(options) { | ||
var separator = options.separator; | ||
switch(typeof separator) { | ||
// if the separator is a constant value | ||
// we make it a function to be treated in the | ||
// next case | ||
case 'string': | ||
var val = separator; | ||
separator = function(cb) { | ||
process.nextTick(function() { | ||
cb(val); | ||
}) | ||
} | ||
// make this._separator() always return | ||
// a stream filled with the results of the callback | ||
// if the reset is a stream, pipe it as separator | ||
case 'function': | ||
var fn = separator; | ||
separator = function() { | ||
var ps = new stream.PassThrough(); | ||
ps._isSeparator = true; | ||
fn(function(res) { | ||
if(res.readable) | ||
res.pipe(ps); | ||
else | ||
ps.end(res); | ||
}); | ||
return ps; | ||
} | ||
break; | ||
default: | ||
separator = null; | ||
break; | ||
} | ||
this._separator = separator; | ||
} | ||
this._readableState.objectMode = options.objectMode; | ||
this._writableState.objectMode = true; | ||
this.on('finish', function() { | ||
this._ending = true; | ||
var last = this._last(); | ||
var self = this; | ||
if (last) { | ||
last.once('end', _end.bind(this)); | ||
} | ||
else { | ||
_end.call(this); | ||
} | ||
}); | ||
this._hadFirstStream = false; | ||
} | ||
util.inherits(StreamStream, stream.Readable); | ||
util.inherits(StreamStream, stream.Duplex); | ||
function _end () { | ||
this._ended = true; | ||
this._output.end(); | ||
this.read(0); | ||
} | ||
StreamStream.prototype._read = function _read(size) { | ||
@@ -33,3 +94,3 @@ var data = this._output.read(size); | ||
StreamStream.prototype.write = function write(stream) { | ||
StreamStream.prototype._write = function _write(stream, encoding, callback) { | ||
var length = this._queue.push(stream); | ||
@@ -39,11 +100,31 @@ if(length == 1) { | ||
} | ||
callback(); | ||
}; | ||
/** | ||
* Start processing a stream from the queue | ||
* the given stream MUST be the first in the queue | ||
* @param stream | ||
*/ | ||
StreamStream.prototype._startStream = function _startStream(stream) { | ||
if(stream !== this._first()) { | ||
throw new Error('Unexpected stream to start up'); | ||
return this.emit('error', new Error('Unexpected stream to start up')); | ||
} | ||
if(this._separator | ||
&& !stream._isSeparator | ||
&& this._hadFirstStream | ||
&& !stream._separated | ||
) { | ||
var sep = this._separator(); | ||
// push the separator stream in front of the queue | ||
this._queue.unshift(sep); | ||
stream._separated = true; | ||
return this._startStream(sep); | ||
} | ||
stream.once('end', function() { | ||
if(stream !== this._first()) { | ||
throw new Error('Unexpected stream to remove from the queue'); | ||
return this.emit('error', | ||
new Error('Unexpected stream to remove from the queue')); | ||
} | ||
@@ -58,25 +139,6 @@ this._queue.shift(); | ||
stream.pipe(this._output, { end: false }); | ||
this._hadFirstStream = true; | ||
this.read(0); | ||
}; | ||
StreamStream.prototype.end = function end(stream) { | ||
if(stream) this.write(stream); | ||
this._ending = true; | ||
var last = this._last(); | ||
if(last) { | ||
last.once('end', this._end.bind(this)); | ||
} | ||
else { | ||
this._end(); | ||
} | ||
}; | ||
StreamStream.prototype._end = function _end() { | ||
this._ending = true; | ||
this._ended = true; | ||
this._output.end(); | ||
this.read(0); | ||
}; | ||
StreamStream.prototype._last = function _last() { | ||
@@ -83,0 +145,0 @@ if(this._queue.length == 0) return null; |
{ | ||
"name": "stream-stream", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"description": "A stream of streams in order to concatenates the contents of several streams", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -11,6 +11,7 @@ var SS = require('../'); | ||
finished = true; | ||
clearTimeout(to); | ||
test.done(); | ||
}); | ||
setTimeout(function() { | ||
var to = setTimeout(function() { | ||
if(!finished) { | ||
@@ -17,0 +18,0 @@ test.fail('No end detected'); |
@@ -13,2 +13,3 @@ var stream = require('stream'); | ||
done = true; | ||
clearTimeout(to); | ||
test.equal('hello world!', data, "Data in sink should be identical"); | ||
@@ -25,3 +26,3 @@ test.done(); | ||
setTimeout(function(){ | ||
var to = setTimeout(function(){ | ||
if(!done) { | ||
@@ -28,0 +29,0 @@ test.fail('no end detected'); |
@@ -11,2 +11,3 @@ var stream = require('stream'); | ||
done = true; | ||
clearTimeout(to); | ||
test.equal('hello', data, "Data in sink should be identical"); | ||
@@ -19,3 +20,3 @@ test.done(); | ||
setTimeout(function(){ | ||
var to = setTimeout(function(){ | ||
if(!done) { | ||
@@ -22,0 +23,0 @@ test.fail('no end detected'); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
14548
12
404