stream-stream
Advanced tools
Comparing version 1.2.0 to 1.2.1
@@ -12,3 +12,3 @@ var util = require('util'); | ||
// super | ||
stream.Duplex.call(this, options); | ||
stream.Transform.call(this, options); | ||
options = options || {}; | ||
@@ -55,101 +55,27 @@ | ||
this._writableState.objectMode = true; | ||
this.on('finish', function() { | ||
this._ending = true; | ||
var last = this._last(); | ||
if (last) { | ||
last.once('end', _end.bind(this)); | ||
} | ||
else { | ||
_end.call(this); | ||
} | ||
}); | ||
this._hadFirstStream = false; | ||
} | ||
util.inherits(StreamStream, stream.Duplex); | ||
util.inherits(StreamStream, stream.Transform); | ||
function _end () { | ||
this._ended = true; | ||
this._output.end(); | ||
this.read(0); | ||
} | ||
StreamStream.prototype._read = function _read(size) { | ||
var data = this._output.read(size); | ||
var res; | ||
if(data === null) { | ||
if(this._ended) | ||
res = this.push(null); | ||
else if(this._readableState.objectMode) | ||
this._readableState.reading = false; | ||
else | ||
res = this.push(''); | ||
StreamStream.prototype._transform = function _transform(stream, encoding, done) { | ||
if(this._lastStream && !this._lastStream._readableState.ended) { | ||
return done(new Error('There still a stream active')); | ||
} | ||
else { | ||
res = this.push(data); | ||
if(this._lastStream && this._separator && this._needSeparator) { | ||
var withSep = new StreamStream(this._readableState.objectMode); | ||
withSep.write(this._separator()); | ||
withSep.end(stream); | ||
stream = withSep; | ||
} | ||
if(res === false) { | ||
// STOP READING | ||
} | ||
var self = this; | ||
stream.on('readable', function() { | ||
var chunk = stream.read(); | ||
if(chunk !== null) self.push(chunk); | ||
}); | ||
stream.on('end', done); | ||
this._needSeparator = true; | ||
this._lastStream = stream; | ||
}; | ||
StreamStream.prototype._write = function _write(stream, encoding, callback) { | ||
var length = this._queue.push(stream); | ||
if(length == 1) { | ||
this._startStream(stream); | ||
} | ||
callback(); | ||
}; | ||
/** | ||
* Start processing a stream from the queue | ||
* the given stream MUST be the first in the queue | ||
* @param stream | ||
*/ | ||
StreamStream.prototype._startStream = function _startStream(stream) { | ||
if(stream !== this._first()) { | ||
return this.emit('error', new Error('Unexpected stream to start up')); | ||
} | ||
if(this._separator | ||
&& !stream._isSeparator | ||
&& this._hadFirstStream | ||
&& !stream._separated | ||
) { | ||
var sep = this._separator(); | ||
// push the separator stream in front of the queue | ||
this._queue.unshift(sep); | ||
stream._separated = true; | ||
return this._startStream(sep); | ||
} | ||
stream.once('end', function() { | ||
if(stream !== this._first()) { | ||
return this.emit('error', | ||
new Error('Unexpected stream to remove from the queue')); | ||
} | ||
this._queue.shift(); | ||
var first = this._first(); | ||
if(first) { | ||
this._startStream(first); | ||
} | ||
}.bind(this)); | ||
stream.pipe(this._output, { end: false }); | ||
this._hadFirstStream = true; | ||
this.read(0); | ||
}; | ||
StreamStream.prototype._last = function _last() { | ||
if(this._queue.length == 0) return null; | ||
return this._queue[this._queue.length - 1]; | ||
}; | ||
StreamStream.prototype._first = function _first() { | ||
return this._queue[0]; | ||
}; | ||
module.exports = StreamStream; | ||
{ | ||
"name": "stream-stream", | ||
"version": "1.2.0", | ||
"version": "1.2.1", | ||
"description": "A stream of streams in order to concatenates the contents of several streams", | ||
@@ -25,3 +25,3 @@ "main": "index.js", | ||
"nodeunit": "0.7.x", | ||
"stream-sink": "1.x.x" | ||
"stream-sink": "~1.1" | ||
}, | ||
@@ -28,0 +28,0 @@ "author": "Florent Jaby <florent.jaby@gmail.com>", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
15444
404