Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

stream-stream

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-stream - npm Package Compare versions

Comparing version 1.2.0 to 1.2.1

112

lib/StreamStream.js

@@ -12,3 +12,3 @@ var util = require('util');

// super
stream.Duplex.call(this, options);
stream.Transform.call(this, options);
options = options || {};

@@ -55,101 +55,27 @@

this._writableState.objectMode = true;
this.on('finish', function() {
this._ending = true;
var last = this._last();
if (last) {
last.once('end', _end.bind(this));
}
else {
_end.call(this);
}
});
this._hadFirstStream = false;
}
util.inherits(StreamStream, stream.Duplex);
util.inherits(StreamStream, stream.Transform);
function _end () {
this._ended = true;
this._output.end();
this.read(0);
}
StreamStream.prototype._read = function _read(size) {
var data = this._output.read(size);
var res;
if(data === null) {
if(this._ended)
res = this.push(null);
else if(this._readableState.objectMode)
this._readableState.reading = false;
else
res = this.push('');
StreamStream.prototype._transform = function _transform(stream, encoding, done) {
if(this._lastStream && !this._lastStream._readableState.ended) {
return done(new Error('There still a stream active'));
}
else {
res = this.push(data);
if(this._lastStream && this._separator && this._needSeparator) {
var withSep = new StreamStream(this._readableState.objectMode);
withSep.write(this._separator());
withSep.end(stream);
stream = withSep;
}
if(res === false) {
// STOP READING
}
var self = this;
stream.on('readable', function() {
var chunk = stream.read();
if(chunk !== null) self.push(chunk);
});
stream.on('end', done);
this._needSeparator = true;
this._lastStream = stream;
};
StreamStream.prototype._write = function _write(stream, encoding, callback) {
var length = this._queue.push(stream);
if(length == 1) {
this._startStream(stream);
}
callback();
};
/**
* Start processing a stream from the queue
* the given stream MUST be the first in the queue
* @param stream
*/
StreamStream.prototype._startStream = function _startStream(stream) {
if(stream !== this._first()) {
return this.emit('error', new Error('Unexpected stream to start up'));
}
if(this._separator
&& !stream._isSeparator
&& this._hadFirstStream
&& !stream._separated
) {
var sep = this._separator();
// push the separator stream in front of the queue
this._queue.unshift(sep);
stream._separated = true;
return this._startStream(sep);
}
stream.once('end', function() {
if(stream !== this._first()) {
return this.emit('error',
new Error('Unexpected stream to remove from the queue'));
}
this._queue.shift();
var first = this._first();
if(first) {
this._startStream(first);
}
}.bind(this));
stream.pipe(this._output, { end: false });
this._hadFirstStream = true;
this.read(0);
};
StreamStream.prototype._last = function _last() {
if(this._queue.length == 0) return null;
return this._queue[this._queue.length - 1];
};
StreamStream.prototype._first = function _first() {
return this._queue[0];
};
module.exports = StreamStream;

4

package.json
{
"name": "stream-stream",
"version": "1.2.0",
"version": "1.2.1",
"description": "A stream of streams in order to concatenates the contents of several streams",

@@ -25,3 +25,3 @@ "main": "index.js",

"nodeunit": "0.7.x",
"stream-sink": "1.x.x"
"stream-sink": "~1.1"
},

@@ -28,0 +28,0 @@ "author": "Florent Jaby <florent.jaby@gmail.com>",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc