Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streamfilter

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamfilter - npm Package Compare versions

Comparing version 1.0.5 to 1.0.6

.codeclimate.yml

79

package.json
{
"name": "streamfilter",
"version": "1.0.5",
"version": "1.0.6",
"description": "Filtering streams.",
"main": "src/index.js",
"metapak": {
"configs": [
"readme",
"jsdocs",
"eslint",
"mocha",
"codeclimate",
"travis"
],
"data": {
"files": "src/*.js tests/*.mocha.js",
"testsFiles": "tests/*.mocha.js"
}
},
"scripts": {
"test": "mocha tests/*.mocha.js",
"coveralls": "./node_modules/istanbul/lib/cli.js cover ./node_modules/mocha/bin/_mocha --report lcovonly -- tests/*.mocha.js -R spec -t 5000 && cat ./coverage/lcov.info | coveralls && rm -rf ./coverage",
"cover": "./node_modules/istanbul/lib/cli.js cover ./node_modules/mocha/bin/_mocha --report html -- tests/*.mocha.js -R spec -t 5000",
"lint": "eslint **/*.s",
"cli": "env NPM_RUN_CLI=1"
"changelog": "conventional-changelog -p angular -i CHANGELOG.md -s",
"cli": "env NODE_ENV=${NODE_ENV:-cli}",
"cover": "istanbul cover _mocha --report html -- tests/*.mocha.js -R spec -t 5000",
"coveralls": "istanbul cover _mocha --report lcovonly -- tests/*.mocha.js -R spec -t 5000 && cat ./coverage/lcov.info | coveralls && rm -rf ./coverage",
"cz": "env NODE_ENV=${NODE_ENV:-cli} git cz",
"doc": "mkdir -p .readme; echo \"# API\" > .readme/API.md; jsdoc2md src/*.js tests/*.mocha.js >> .readme/API.md",
"lint": "eslint src/*.js tests/*.mocha.js",
"metapak": "metapak",
"mocha": "mocha tests/*.mocha.js",
"prettier": "prettier --write src/*.js tests/*.mocha.js",
"preversion": "npm t && npm run lint && npm run metapak -s",
"test": "npm run mocha",
"version": "npm run changelog && git add CHANGELOG.md"
},

@@ -29,12 +51,43 @@ "repository": {

"devDependencies": {
"coveralls": "^2.11.3",
"eslint": "^1.0.0",
"istanbul": "^0.3.17",
"mocha": "^2.2.5",
"sf-lint": "^1.0.2",
"streamtest": "^1.2.1"
"commitizen": "^2.9.6",
"conventional-changelog-cli": "^1.3.5",
"coveralls": "^2.13.3",
"cz-conventional-changelog": "^2.0.0",
"eslint": "^4.12.1",
"eslint-plugin-prettier": "^2.3.1",
"istanbul": "^0.4.5",
"jsdoc-to-markdown": "^3.0.2",
"metapak": "^1.0.2",
"metapak-nfroidure": "^2.0.2",
"mocha": "^3.5.3",
"mocha-lcov-reporter": "^1.3.0",
"prettier": "^1.8.2",
"streamtest": "^1.2.3"
},
"dependencies": {
"readable-stream": "^2.0.2"
"readable-stream": "^2.3.3"
},
"engines": {
"node": ">=6.9.5"
},
"config": {
"commitizen": {
"path": "./node_modules/cz-conventional-changelog"
}
},
"greenkeeper": {
"ignore": [
"commitizen",
"cz-conventional-changelog",
"conventional-changelog-cli",
"jsdoc-to-markdown",
"eslint",
"eslint-config-prettier",
"prettier",
"mocha",
"mocha-lcov-reporter",
"coveralls",
"istanbul"
]
}
}

@@ -0,8 +1,20 @@

<!--
# This file is automatically generated by a `metapak`
# module. Do not change it elsewhere, changes would
# be overridden.
-->
# streamfilter
> Filtering streams.
[![NPM version](https://badge.fury.io/js/streamfilter.svg)](https://npmjs.org/package/streamfilter)
[![Build status](https://secure.travis-ci.org/nfroidure/streamfilter.svg)](https://travis-ci.org/nfroidure/streamfilter)
[![Dependency Status](https://david-dm.org/nfroidure/streamfilter.svg)](https://david-dm.org/nfroidure/streamfilter)
[![devDependency Status](https://david-dm.org/nfroidure/streamfilter/dev-status.svg)](https://david-dm.org/nfroidure/streamfilter#info=devDependencies)
[![Coverage Status](https://coveralls.io/repos/nfroidure/streamfilter/badge.svg?branch=master)](https://coveralls.io/r/nfroidure/streamfilter?branch=master)
[![Code Climate](https://codeclimate.com/github/nfroidure/streamfilter.svg)](https://codeclimate.com/github/nfroidure/streamfilter)
[![Dependency Status](https://dependencyci.com/github/nfroidure/streamfilter/badge)](https://dependencyci.com/github/nfroidure/streamfilter)
`streamfilter` is a function based filter for streams inspired per gulp-filter
but no limited to Gulp nor to objectMode streams.
[![NPM version](https://badge.fury.io/js/streamfilter.png)](https://npmjs.org/package/streamfilter) [![Build status](https://secure.travis-ci.org/nfroidure/streamfilter.png)](https://travis-ci.org/nfroidure/streamfilter) [![Dependency Status](https://david-dm.org/nfroidure/streamfilter.png)](https://david-dm.org/nfroidure/streamfilter) [![devDependency Status](https://david-dm.org/nfroidure/streamfilter/dev-status.png)](https://david-dm.org/nfroidure/streamfilter#info=devDependencies) [![Coverage Status](https://coveralls.io/repos/nfroidure/streamfilter/badge.png?branch=master)](https://coveralls.io/r/nfroidure/streamfilter?branch=master) [![Code Climate](https://codeclimate.com/github/nfroidure/streamfilter.png)](https://codeclimate.com/github/nfroidure/streamfilter)
## Installation

@@ -136,1 +148,18 @@

# API
<a name="StreamFilter"></a>
## StreamFilter(filterCallback, options) ⇒ <code>Stream</code>
[StreamFilter description]
**Kind**: global function
**Returns**: <code>Stream</code> - The filtering stream
| Param | Type | Description |
| --- | --- | --- |
| filterCallback | <code>function</code> | Callback applying the filters |
| options | <code>Object</code> | Filtering options |
# License
[MIT](https://github.com/nfroidure/streamfilter/blob/master/LICENSE)

68

src/index.js
'use strict';
var stream = require('readable-stream');
var util = require('util');
const stream = require('readable-stream');
const util = require('util');
/**
* [StreamFilter description]
* @param {Function} filterCallback Callback applying the filters
* @param {Object} options Filtering options
* @returns {Stream} The filtering stream
*/
function StreamFilter(filterCallback, options) {
var _this = this;
const _this = this;
// Ensure new is called
if(!(this instanceof StreamFilter)) {
if (!(this instanceof StreamFilter)) {
return new StreamFilter(filterCallback, options);

@@ -15,3 +21,3 @@ }

// filter callback is required
if(!(filterCallback instanceof Function)) {
if (!(filterCallback instanceof Function)) {
throw new Error('filterCallback must be a function.');

@@ -23,3 +29,3 @@ }

options.restore = options.restore || false;
options.passthrough = options.restore && options.passthrough || false;
options.passthrough = (options.restore && options.passthrough) || false;

@@ -31,12 +37,14 @@ this._filterStreamEnded = false;

filterCallback(chunk, encoding, function StreamFilterCallback(filter) {
if(!filter) {
if (!filter) {
_this.push(chunk, encoding);
done();
} else if(options.restore) {
_this._restoreManager.programPush(chunk, encoding, function() {
return;
}
if (options.restore) {
_this._restoreManager.programPush(chunk, encoding, () => {
done();
});
} else {
done();
return;
}
done();
});

@@ -47,9 +55,9 @@ };

this._filterStreamEnded = true;
done();
if(options.restore) {
if(!options.passthrough) {
this._restoreManager.programPush(null, {}.undef, function() {
done(); // eslint-disable-line
if (options.restore) {
if (!options.passthrough) {
this._restoreManager.programPush(null, {}.undef, () => {
done();
});
} else if(this._restoreStreamCallback) {
} else if (this._restoreStreamCallback) {
this._restoreStreamCallback();

@@ -63,7 +71,11 @@ }

// Creating the restored stream if necessary
if(options.restore) {
if(options.passthrough) {
if (options.restore) {
if (options.passthrough) {
this.restore = new stream.Duplex(options);
this._restoreManager = createReadStreamBackpressureManager(this.restore);
this.restore._write = function streamFilterRestoreWrite(chunk, encoding, done) {
this.restore._write = function streamFilterRestoreWrite(
chunk,
encoding,
done
) {
_this._restoreManager.programPush(chunk, encoding, done);

@@ -73,6 +85,6 @@ };

this.restore.on('finish', function streamFilterRestoreFinish() {
_this._restoreStreamCallback = function() {
_this._restoreManager.programPush(null, {}.undef, function() {});
_this._restoreStreamCallback = () => {
_this._restoreManager.programPush(null, {}.undef, () => {});
};
if(_this._filterStreamEnded) {
if (_this._filterStreamEnded) {
_this._restoreStreamCallback();

@@ -92,3 +104,3 @@ }

function createReadStreamBackpressureManager(readableStream) {
var manager = {
const manager = {
waitPush: true,

@@ -107,12 +119,12 @@ programmedPushs: [],

attemptPush: function attemptPush() {
var nextPush;
let nextPush;
if(manager.waitPush) {
if(manager.programmedPushs.length) {
if (manager.waitPush) {
if (manager.programmedPushs.length) {
nextPush = manager.programmedPushs.shift();
manager.waitPush = readableStream.push(nextPush[0], nextPush[1]);
(nextPush[2])();
nextPush[2]();
}
} else {
setImmediate(function() {
setImmediate(() => {
// Need to be async to avoid nested push attempts

@@ -119,0 +131,0 @@ readableStream.emit('readable');

@@ -0,55 +1,59 @@

/* eslint max-nested-callbacks:[0], no-magic-numbers:[0] */
'use strict';
var assert = require('assert');
var Stream = require('stream');
var StreamTest = require('streamtest');
var StreamFilter = require('../src/index');
const assert = require('assert');
const Stream = require('stream');
const StreamTest = require('streamtest');
const StreamFilter = require('../src/index');
describe('StreamFilter', function() {
describe('should fail', function() {
it('if options.filter is not a function', function() {
assert.throws(function() {
new StreamFilter();
});
}, /Error/);
describe('StreamFilter', () => {
describe('should fail', () => {
it(
'if options.filter is not a function',
() => {
assert.throws(() => {
new StreamFilter(); // eslint-disable-line
});
},
/Error/
);
});
describe('should work', function() {
describe('should work', () => {
it('should work without new', () => {
const createFilter = StreamFilter;
it('should work without new', function() {
var createFilter = StreamFilter;
assert(createFilter(function() {}) instanceof StreamFilter);
assert(createFilter(() => {}) instanceof StreamFilter);
});
});
// Iterating through versions
StreamTest.versions.forEach(function(version) {
StreamTest.versions.forEach(version => {
describe('for ' + version + ' streams', () => {
describe('in object mode', () => {
describe('should work', () => {
const object1 = { test: 'plop' };
const object2 = { test: 'plop2' };
const object3 = { test: 'plop3' };
describe('for ' + version + ' streams', function() {
describe('in object mode', function() {
describe('should work', function() {
var object1 = { test: 'plop' };
var object2 = { test: 'plop2' };
var object3 = { test: 'plop3' };
it('with no restore option', function(done) {
var inputStream = StreamTest[version].fromObjects([object1, object2]);
var filter = new StreamFilter(function(obj, unused, cb) {
if(obj === object2) {
return cb(true);
it('with no restore option', done => {
const inputStream = StreamTest[version].fromObjects([
object1,
object2,
]);
const filter = new StreamFilter(
(obj, unused, cb) => {
if (obj === object2) {
return cb(true);
}
return cb(false);
},
{
objectMode: true,
}
return cb(false);
}, {
objectMode: true,
});
var outputStream = StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
);
const outputStream = StreamTest[version].toObjects((err, objs) => {
if (err) {
done(err);
return;
}

@@ -63,25 +67,35 @@ assert.deepEqual(objs, [object1]);

it('with restore option', function(done) {
var inputStream = StreamTest[version].fromObjects([object1, object2]);
var filter = new StreamFilter(function(obj, unused, cb) {
if(obj === object2) {
return cb(true);
it('with restore option', done => {
const inputStream = StreamTest[version].fromObjects([
object1,
object2,
]);
const filter = new StreamFilter(
(obj, unused, cb) => {
if (obj === object2) {
return cb(true);
}
return cb(false);
},
{
objectMode: true,
restore: true,
}
return cb(false);
}, {
objectMode: true,
restore: true,
});
var outputStream = StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
);
const outputStream = StreamTest[version].toObjects((err, objs) => {
if (err) {
done(err);
return;
}
assert.deepEqual(objs, [object1]);
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) {
if(err2) {
return done(err2);
}
assert.deepEqual(objs2, [object2]);
done();
}));
filter.restore.pipe(
StreamTest[version].toObjects((err2, objs2) => {
if (err2) {
done(err2);
return;
}
assert.deepEqual(objs2, [object2]);
done();
})
);
});

@@ -92,42 +106,107 @@

it('with restore option and more than 16 nested objects', function(done) {
var nDone = 0;
var inputStream = StreamTest[version].fromObjects([
object1, object2, object1, object2, object1, object2, object1, object2,
object1, object2, object1, object2, object1, object2, object1, object2,
object1, object2, object1, object2, object1, object2, object1, object2,
object1, object2, object1, object2, object1, object2, object1, object2,
object1, object2, object1, object2, object1, object2, object1, object2,
object1, object2, object1, object2, object1, object2, object1, object2,
object1, object2, object1, object2, object1, object2, object1, object2,
object1, object2, object1, object2, object1, object2, object1, object2,
it('with restore option and more than 16 nested objects', done => {
let nDone = 0;
const inputStream = StreamTest[version].fromObjects([
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
object1,
object2,
]);
var filter = new StreamFilter(function(obj, unused, cb) {
if(obj === object2) {
return cb(true);
const filter = new StreamFilter(
(obj, unused, cb) => {
if (obj === object2) {
return cb(true);
}
return cb(false);
},
{
objectMode: true,
restore: true,
}
return cb(false);
}, {
objectMode: true,
restore: true,
});
var outputStream = StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
);
const outputStream = StreamTest[version].toObjects((err, objs) => {
if (err) {
done(err);
return;
}
assert.equal(objs.length, 32);
if(2 === ++nDone) {
if (2 === ++nDone) {
done();
return;
}
});
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) {
if(err2) {
return done(err2);
}
assert.equal(objs2.length, 32);
if(2 === ++nDone) {
done();
}
}));
filter.restore.pipe(
StreamTest[version].toObjects((err2, objs2) => {
if (err2) {
done(err2);
return;
}
assert.equal(objs2.length, 32);
if (2 === ++nDone) {
done();
return;
}
})
);

@@ -137,40 +216,91 @@ inputStream.pipe(filter).pipe(outputStream);

it('with restore option and more than 16 objects', function(done) {
var nDone = 0;
var inputStream = StreamTest[version].fromObjects([
object1, object1, object1, object1, object1, object1, object1, object1,
object1, object1, object1, object1, object1, object1, object1, object1,
object1, object1, object1, object1, object1, object1, object1, object1,
object2, object2, object2, object2, object2, object2, object2, object2,
object2, object2, object2, object2, object2, object2, object2, object2,
object2, object2, object2, object2, object2, object2, object2, object2,
it('with restore option and more than 16 objects', done => {
let nDone = 0;
const inputStream = StreamTest[version].fromObjects([
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object1,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
object2,
]);
var filter = new StreamFilter(function(obj, unused, cb) {
if(obj === object2) {
return cb(true);
const filter = new StreamFilter(
(obj, unused, cb) => {
if (obj === object2) {
return cb(true);
}
return cb(false);
},
{
objectMode: true,
restore: true,
}
return cb(false);
}, {
objectMode: true,
restore: true,
});
var outputStream = StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
);
const outputStream = StreamTest[version].toObjects((err, objs) => {
if (err) {
done(err);
return;
}
assert.equal(objs.length, 24);
if(2 === ++nDone) {
if (2 === ++nDone) {
done();
return;
}
});
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) {
if(err2) {
return done(err2);
}
assert.equal(objs2.length, 24);
if(2 === ++nDone) {
done();
}
}));
filter.restore.pipe(
StreamTest[version].toObjects((err2, objs2) => {
if (err2) {
done(err2);
return;
}
assert.equal(objs2.length, 24);
if (2 === ++nDone) {
done();
return;
}
})
);

@@ -180,28 +310,40 @@ inputStream.pipe(filter).pipe(outputStream);

it('with restore and passthrough option in a different pipeline', function(done) {
var inputStream = StreamTest[version].fromObjects([object1, object2]);
var filter = new StreamFilter(function(obj, unused, cb) {
if(obj === object2) {
return cb(true);
it('with restore and passthrough option in a different pipeline', done => {
const inputStream = StreamTest[version].fromObjects([
object1,
object2,
]);
const filter = new StreamFilter(
(obj, unused, cb) => {
if (obj === object2) {
return cb(true);
}
return cb(false);
},
{
objectMode: true,
restore: true,
passthrough: true,
}
return cb(false);
}, {
objectMode: true,
restore: true,
passthrough: true,
});
var outputStream = StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
);
const outputStream = StreamTest[version].toObjects((err, objs) => {
if (err) {
done(err);
return;
}
assert.deepEqual(objs, [object1]);
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) {
if(err2) {
return done(err2);
}
assert.deepEqual(objs2, [object3, object2]);
done();
}));
filter.restore.pipe(
StreamTest[version].toObjects((err2, objs2) => {
if (err2) {
done(err2);
return;
}
assert.deepEqual(objs2, [object3, object2]);
done();
})
);
});
var restoreInputStream = StreamTest[version].fromObjects([object3]);
const restoreInputStream = StreamTest[version].fromObjects([
object3,
]);

@@ -212,20 +354,28 @@ inputStream.pipe(filter).pipe(outputStream);

it('with restore and passthrough option in the same pipeline', function(done) {
var passThroughStream1Ended = false;
var passThroughStream2Ended = false;
var duplexStreamEnded = false;
var inputStream = StreamTest[version].fromObjects([object1, object2, object3]);
var filter = new StreamFilter(function(chunk, encoding, cb) {
if(chunk === object2) {
return cb(true);
it('with restore and passthrough option in the same pipeline', done => {
let passThroughStream1Ended = false;
let passThroughStream2Ended = false;
let duplexStreamEnded = false;
const inputStream = StreamTest[version].fromObjects([
object1,
object2,
object3,
]);
const filter = new StreamFilter(
(chunk, encoding, cb) => {
if (chunk === object2) {
return cb(true);
}
return cb(false);
},
{
objectMode: true,
restore: true,
passthrough: true,
}
return cb(false);
}, {
objectMode: true,
restore: true,
passthrough: true,
});
var outputStream = StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
);
const outputStream = StreamTest[version].toObjects((err, objs) => {
if (err) {
done(err);
return;
}

@@ -235,41 +385,54 @@ assert.deepEqual(objs, [object1, object2, object3]);

});
var duplexStream = new Stream.Duplex({ objectMode: true });
const duplexStream = new Stream.Duplex({ objectMode: true });
duplexStream._write = function(obj, unused, cb) {
duplexStream._write = (obj, unused, cb) => {
duplexStream.push(obj);
setImmediate(cb);
};
duplexStream._read = function() {};
duplexStream.on('finish', function() {
setTimeout(function() {
duplexStream._read = () => {};
duplexStream.on('finish', () => {
setTimeout(() => {
duplexStream.push(null);
}, 100);
});
outputStream.on('end', function() {
assert(passThroughStream1Ended,
'PassThrough stream ends before the output one.');
assert(passThroughStream2Ended,
'PassThrough stream ends before the output one.');
assert(duplexStreamEnded,
'Duplex stream ends before the output one.');
outputStream.on('end', () => {
assert(
passThroughStream1Ended,
'PassThrough stream ends before the output one.'
);
assert(
passThroughStream2Ended,
'PassThrough stream ends before the output one.'
);
assert(
duplexStreamEnded,
'Duplex stream ends before the output one.'
);
});
filter.restore.on('end', function() {
assert(passThroughStream1Ended,
'PassThrough stream ends before the restore one.');
assert(passThroughStream2Ended,
'PassThrough stream ends before the restore one.');
assert(duplexStreamEnded,
'Duplex stream ends before the restore one.');
filter.restore.on('end', () => {
assert(
passThroughStream1Ended,
'PassThrough stream ends before the restore one.'
);
assert(
passThroughStream2Ended,
'PassThrough stream ends before the restore one.'
);
assert(
duplexStreamEnded,
'Duplex stream ends before the restore one.'
);
});
inputStream.pipe(filter)
inputStream
.pipe(filter)
.pipe(new Stream.PassThrough({ objectMode: true }))
.on('end', function() {
.on('end', () => {
passThroughStream1Ended = true;
})
.pipe(new Stream.PassThrough({ objectMode: true }))
.on('end', function() {
.on('end', () => {
passThroughStream2Ended = true;
})
.pipe(duplexStream)
.on('end', function() {
.on('end', () => {
duplexStreamEnded = true;

@@ -281,98 +444,125 @@ })

it('with restore and passthrough option in the same pipeline and a buffered stream', function(done) {
var passThroughStream1Ended = false;
var passThroughStream2Ended = false;
var duplexStreamEnded = false;
var inputStream = StreamTest[version].fromObjects([object1, object2, object3]);
var filter = new StreamFilter(function(chunk, encoding, cb) {
if(chunk === object2) {
return cb(true);
}
return cb(false);
}, {
objectMode: true,
restore: true,
passthrough: true,
});
var outputStream = StreamTest[version].toObjects(function(err, objs) {
if(err) {
return done(err);
}
assert.equal(objs.length, 3);
setImmediate(done);
});
var duplexStream = new Stream.Duplex({ objectMode: true });
it(
'with restore and passthrough option in the same' +
' pipeline and a buffered stream',
done => {
let passThroughStream1Ended = false;
let passThroughStream2Ended = false;
let duplexStreamEnded = false;
const inputStream = StreamTest[version].fromObjects([
object1,
object2,
object3,
]);
const filter = new StreamFilter(
(chunk, encoding, cb) => {
if (chunk === object2) {
return cb(true);
}
return cb(false);
},
{
objectMode: true,
restore: true,
passthrough: true,
}
);
const outputStream = StreamTest[version].toObjects(
(err, objs) => {
if (err) {
done(err);
return;
}
assert.equal(objs.length, 3);
setImmediate(done);
}
);
const duplexStream = new Stream.Duplex({ objectMode: true });
duplexStream._objs = [];
duplexStream._write = function(obj, unused, cb) {
duplexStream._objs.push(obj);
cb();
};
duplexStream._read = function() {
var obj;
duplexStream._objs = [];
duplexStream._write = (obj, unused, cb) => {
duplexStream._objs.push(obj);
cb();
};
duplexStream._read = () => {
let obj;
if(duplexStream._hasFinished) {
while(duplexStream._objs.length) {
obj = duplexStream._objs.shift();
if(!duplexStream.push(obj)) {
break;
if (duplexStream._hasFinished) {
while (duplexStream._objs.length) {
obj = duplexStream._objs.shift();
if (!duplexStream.push(obj)) {
break;
}
}
if (0 === duplexStream._objs.length) {
duplexStream.push(null);
}
}
if(0 === duplexStream._objs.length) {
duplexStream.push(null);
}
}
};
duplexStream.on('finish', function() {
duplexStream._hasFinished = true;
duplexStream._read();
});
outputStream.on('end', function() {
assert(passThroughStream1Ended,
'PassThrough stream ends before the output one.');
assert(passThroughStream2Ended,
'PassThrough stream ends before the output one.');
assert(duplexStreamEnded,
'Duplex stream ends before the output one.');
});
filter.restore.on('end', function() {
assert(passThroughStream1Ended,
'PassThrough stream ends before the restore one.');
assert(passThroughStream2Ended,
'PassThrough stream ends before the restore one.');
assert(duplexStreamEnded,
'Duplex stream ends before the restore one.');
});
inputStream.pipe(filter)
.pipe(new Stream.PassThrough({ objectMode: true }))
.on('end', function() {
passThroughStream1Ended = true;
})
.pipe(new Stream.PassThrough({ objectMode: true }))
.on('end', function() {
passThroughStream2Ended = true;
})
.pipe(duplexStream)
.on('end', function() {
duplexStreamEnded = true;
})
.pipe(filter.restore)
.pipe(outputStream);
});
};
duplexStream.on('finish', () => {
duplexStream._hasFinished = true;
duplexStream._read();
});
outputStream.on('end', () => {
assert(
passThroughStream1Ended,
'PassThrough stream ends before the output one.'
);
assert(
passThroughStream2Ended,
'PassThrough stream ends before the output one.'
);
assert(
duplexStreamEnded,
'Duplex stream ends before the output one.'
);
});
filter.restore.on('end', () => {
assert(
passThroughStream1Ended,
'PassThrough stream ends before the restore one.'
);
assert(
passThroughStream2Ended,
'PassThrough stream ends before the restore one.'
);
assert(
duplexStreamEnded,
'Duplex stream ends before the restore one.'
);
});
inputStream
.pipe(filter)
.pipe(new Stream.PassThrough({ objectMode: true }))
.on('end', () => {
passThroughStream1Ended = true;
})
.pipe(new Stream.PassThrough({ objectMode: true }))
.on('end', () => {
passThroughStream2Ended = true;
})
.pipe(duplexStream)
.on('end', () => {
duplexStreamEnded = true;
})
.pipe(filter.restore)
.pipe(outputStream);
}
);
});
});
describe('in buffer mode', function() {
describe('in buffer mode', () => {
describe('should work', () => {
const buffer1 = new Buffer('plop');
const buffer2 = new Buffer('plop2');
const buffer3 = new Buffer('plop3');
describe('should work', function() {
var buffer1 = new Buffer('plop');
var buffer2 = new Buffer('plop2');
var buffer3 = new Buffer('plop3');
it('with no restore option', function(done) {
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]);
var filter = new StreamFilter(function(chunk, encoding, cb) {
if(chunk.toString() === buffer1.toString()) {
it('with no restore option', done => {
const inputStream = StreamTest[version].fromChunks([
buffer1,
buffer2,
]);
const filter = new StreamFilter((chunk, encoding, cb) => {
if (chunk.toString() === buffer1.toString()) {
return cb(true);

@@ -382,5 +572,6 @@ }

});
var outputStream = StreamTest[version].toText(function(err, text) {
if(err) {
return done(err);
const outputStream = StreamTest[version].toText((err, text) => {
if (err) {
done(err);
return;
}

@@ -394,24 +585,34 @@ assert.equal(text, buffer2.toString());

it('with restore option', function(done) {
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]);
var filter = new StreamFilter(function(chunk, encoding, cb) {
if(chunk.toString() === buffer2.toString()) {
return cb(true);
it('with restore option', done => {
const inputStream = StreamTest[version].fromChunks([
buffer1,
buffer2,
]);
const filter = new StreamFilter(
(chunk, encoding, cb) => {
if (chunk.toString() === buffer2.toString()) {
return cb(true);
}
return cb(false);
},
{
restore: true,
}
return cb(false);
}, {
restore: true,
});
var outputStream = StreamTest[version].toText(function(err, text) {
if(err) {
return done(err);
);
const outputStream = StreamTest[version].toText((err, text) => {
if (err) {
done(err);
return;
}
assert.equal(text, buffer1.toString());
filter.restore.pipe(StreamTest[version].toText(function(err2, text2) {
if(err2) {
return done(err2);
}
assert.equal(text2, buffer2.toString());
done();
}));
filter.restore.pipe(
StreamTest[version].toText((err2, text2) => {
if (err2) {
done(err2);
return;
}
assert.equal(text2, buffer2.toString());
done();
})
);
});

@@ -422,27 +623,42 @@

it('with restore and passthrough option', function(done) {
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]);
var filter = new StreamFilter(function(chunk, encoding, cb) {
if(chunk.toString() === buffer2.toString()) {
return cb(true);
it('with restore and passthrough option', done => {
const inputStream = StreamTest[version].fromChunks([
buffer1,
buffer2,
]);
const filter = new StreamFilter(
(chunk, encoding, cb) => {
if (chunk.toString() === buffer2.toString()) {
return cb(true);
}
return cb(false);
},
{
restore: true,
passthrough: true,
}
return cb(false);
}, {
restore: true,
passthrough: true,
});
var outputStream = StreamTest[version].toText(function(err, text) {
if(err) {
return done(err);
);
const outputStream = StreamTest[version].toText((err, text) => {
if (err) {
done(err);
return;
}
assert.equal(text, buffer1.toString());
filter.restore.pipe(StreamTest[version].toText(function(err2, text2) {
if(err2) {
return done(err2);
}
assert.deepEqual(text2, [buffer3.toString(), buffer2.toString()].join(''));
done();
}));
filter.restore.pipe(
StreamTest[version].toText((err2, text2) => {
if (err2) {
done(err2);
return;
}
assert.deepEqual(
text2,
[buffer3.toString(), buffer2.toString()].join('')
);
done();
})
);
});
var restoreInputStream = StreamTest[version].fromChunks([buffer3]);
const restoreInputStream = StreamTest[version].fromChunks([
buffer3,
]);

@@ -452,11 +668,6 @@ inputStream.pipe(filter).pipe(outputStream);

});
});
});
});
});
});

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc