Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streamfilter

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamfilter - npm Package Compare versions

Comparing version 1.0.4 to 1.0.5

2

package.json
{
"name": "streamfilter",
"version": "1.0.4",
"version": "1.0.5",
"description": "Filtering streams.",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -33,9 +33,5 @@ 'use strict';

} else if(options.restore) {
if(options.passthrough) {
_this.restore.write(chunk, encoding, done);
} else {
_this.restore.__programPush(chunk, encoding, function() {
done();
});
}
_this._restoreManager.programPush(chunk, encoding, function() {
done();
});
} else {

@@ -52,3 +48,5 @@ done();

if(!options.passthrough) {
this.restore.push(null);
this._restoreManager.programPush(null, {}.undef, function() {
done();
});
} else if(this._restoreStreamCallback) {

@@ -65,63 +63,68 @@ this._restoreStreamCallback();

if(options.passthrough) {
this.restore = new stream.Transform(options);
this.restore._transform = function streamFilterRestoreTransform(chunk, encoding, done) {
_this.restore.push(chunk, encoding);
done();
this.restore = new stream.Duplex(options);
this._restoreManager = createReadStreamBackpressureManager(this.restore);
this.restore._write = function streamFilterRestoreWrite(chunk, encoding, done) {
_this._restoreManager.programPush(chunk, encoding, done);
};
this.restore._flush = function streamFilterRestoreFlush(done) {
_this._restoreStreamCallback = done;
this.restore.on('finish', function streamFilterRestoreFinish() {
_this._restoreStreamCallback = function() {
_this._restoreManager.programPush(null, {}.undef, function() {});
};
if(_this._filterStreamEnded) {
done();
_this._restoreStreamCallback();
}
};
});
} else {
this.restore = new stream.Readable(options);
this.restore.__waitPush = true;
this.restore.__programmedPush = null;
this._restoreManager = createReadStreamBackpressureManager(this.restore);
}
}
}
this.restore.__programPush = function streamFilterRestoreProgramPush(chunk, encoding, done) {
if(_this.restore.__programmedPush) {
_this.restore.emit('error', new Error('Not supposed to happen!'));
}
_this.restore.__programmedPush = [chunk, encoding, done];
// Need to be async to avoid nested push attempts
setImmediate(_this.restore.__attemptPush.bind(_this.restore));
_this.restore.emit('readable');
_this.restore.emit('drain');
};
util.inherits(StreamFilter, stream.Transform);
this.restore.__attemptPush = function streamFilterRestoreAttemptPush() {
var cb = null;
// Utils to manage readable stream backpressure
function createReadStreamBackpressureManager(readableStream) {
var manager = {
waitPush: true,
programmedPushs: [],
programPush: function programPush(chunk, encoding, done) {
// Store the current write
manager.programmedPushs.push([chunk, encoding, done]);
// Need to be async to avoid nested push attempts
// Programm a push attempt
setImmediate(manager.attemptPush);
// Let's say we're ready for a read
readableStream.emit('readable');
readableStream.emit('drain');
},
attemptPush: function attemptPush() {
var nextPush;
if(_this.restore.__waitPush) {
if(_this.restore.__programmedPush) {
cb = _this.restore.__programmedPush[2];
_this.restore.__waitPush = _this.restore.push(
_this.restore.__programmedPush[0],
_this.restore.__programmedPush[1]
);
_this.restore.__programmedPush = null;
cb();
}
} else {
setImmediate(function() {
// Need to be async to avoid nested push attempts
_this.restore.emit('readable');
});
if(manager.waitPush) {
if(manager.programmedPushs.length) {
nextPush = manager.programmedPushs.shift();
manager.waitPush = readableStream.push(nextPush[0], nextPush[1]);
(nextPush[2])();
}
};
} else {
setImmediate(function() {
// Need to be async to avoid nested push attempts
readableStream.emit('readable');
});
}
},
};
this.restore._read = function streamFilterRestoreRead() {
_this.restore.__waitPush = true;
// Need to be async to avoid nested push attempts
setImmediate(_this.restore.__attemptPush.bind(this));
};
}
}
// Patch the readable stream to manage reads
readableStream._read = function streamFilterRestoreRead() {
manager.waitPush = true;
// Need to be async to avoid nested push attempts
setImmediate(manager.attemptPush);
};
return manager;
}
util.inherits(StreamFilter, stream.Transform);
module.exports = StreamFilter;
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc