Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

stream-stream

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-stream - npm Package Compare versions

Comparing version 1.0.0 to 1.1.0

test/test-pipe.js

112

lib/StreamStream.js

@@ -12,6 +12,67 @@ var util = require('util');

// super
stream.Readable.call(this, options);
stream.Duplex.call(this, options);
options = options || {};
if(options) {
var separator = options.separator;
switch(typeof separator) {
// if the separator is a constant value
// we make it a function to be treated in the
// next case
case 'string':
var val = separator;
separator = function(cb) {
process.nextTick(function() {
cb(val);
})
}
// make this._separator() always return
// a stream filled with the results of the callback
// if the reset is a stream, pipe it as separator
case 'function':
var fn = separator;
separator = function() {
var ps = new stream.PassThrough();
ps._isSeparator = true;
fn(function(res) {
if(res.readable)
res.pipe(ps);
else
ps.end(res);
});
return ps;
}
break;
default:
separator = null;
break;
}
this._separator = separator;
}
this._readableState.objectMode = options.objectMode;
this._writableState.objectMode = true;
this.on('finish', function() {
this._ending = true;
var last = this._last();
var self = this;
if (last) {
last.once('end', _end.bind(this));
}
else {
_end.call(this);
}
});
this._hadFirstStream = false;
}
util.inherits(StreamStream, stream.Readable);
util.inherits(StreamStream, stream.Duplex);
function _end () {
this._ended = true;
this._output.end();
this.read(0);
}
StreamStream.prototype._read = function _read(size) {

@@ -33,3 +94,3 @@ var data = this._output.read(size);

StreamStream.prototype.write = function write(stream) {
StreamStream.prototype._write = function _write(stream, encoding, callback) {
var length = this._queue.push(stream);

@@ -39,11 +100,31 @@ if(length == 1) {

}
callback();
};
/**
* Start processing a stream from the queue
* the given stream MUST be the first in the queue
* @param stream
*/
StreamStream.prototype._startStream = function _startStream(stream) {
if(stream !== this._first()) {
throw new Error('Unexpected stream to start up');
return this.emit('error', new Error('Unexpected stream to start up'));
}
if(this._separator
&& !stream._isSeparator
&& this._hadFirstStream
&& !stream._separated
) {
var sep = this._separator();
// push the separator stream in front of the queue
this._queue.unshift(sep);
stream._separated = true;
return this._startStream(sep);
}
stream.once('end', function() {
if(stream !== this._first()) {
throw new Error('Unexpected stream to remove from the queue');
return this.emit('error',
new Error('Unexpected stream to remove from the queue'));
}

@@ -58,25 +139,6 @@ this._queue.shift();

stream.pipe(this._output, { end: false });
this._hadFirstStream = true;
this.read(0);
};
StreamStream.prototype.end = function end(stream) {
if(stream) this.write(stream);
this._ending = true;
var last = this._last();
if(last) {
last.once('end', this._end.bind(this));
}
else {
this._end();
}
};
StreamStream.prototype._end = function _end() {
this._ending = true;
this._ended = true;
this._output.end();
this.read(0);
};
StreamStream.prototype._last = function _last() {

@@ -83,0 +145,0 @@ if(this._queue.length == 0) return null;

{
"name": "stream-stream",
"version": "1.0.0",
"version": "1.1.0",
"description": "A stream of streams in order to concatenates the contents of several streams",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -11,6 +11,7 @@ var SS = require('../');

finished = true;
clearTimeout(to);
test.done();
});
setTimeout(function() {
var to = setTimeout(function() {
if(!finished) {

@@ -17,0 +18,0 @@ test.fail('No end detected');

@@ -13,2 +13,3 @@ var stream = require('stream');

done = true;
clearTimeout(to);
test.equal('hello world!', data, "Data in sink should be identical");

@@ -25,3 +26,3 @@ test.done();

setTimeout(function(){
var to = setTimeout(function(){
if(!done) {

@@ -28,0 +29,0 @@ test.fail('no end detected');

@@ -11,2 +11,3 @@ var stream = require('stream');

done = true;
clearTimeout(to);
test.equal('hello', data, "Data in sink should be identical");

@@ -19,3 +20,3 @@ test.done();

setTimeout(function(){
var to = setTimeout(function(){
if(!done) {

@@ -22,0 +23,0 @@ test.fail('no end detected');

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc