streamfilter
Advanced tools
Comparing version 1.0.5 to 1.0.6
{ | ||
"name": "streamfilter", | ||
"version": "1.0.5", | ||
"version": "1.0.6", | ||
"description": "Filtering streams.", | ||
"main": "src/index.js", | ||
"metapak": { | ||
"configs": [ | ||
"readme", | ||
"jsdocs", | ||
"eslint", | ||
"mocha", | ||
"codeclimate", | ||
"travis" | ||
], | ||
"data": { | ||
"files": "src/*.js tests/*.mocha.js", | ||
"testsFiles": "tests/*.mocha.js" | ||
} | ||
}, | ||
"scripts": { | ||
"test": "mocha tests/*.mocha.js", | ||
"coveralls": "./node_modules/istanbul/lib/cli.js cover ./node_modules/mocha/bin/_mocha --report lcovonly -- tests/*.mocha.js -R spec -t 5000 && cat ./coverage/lcov.info | coveralls && rm -rf ./coverage", | ||
"cover": "./node_modules/istanbul/lib/cli.js cover ./node_modules/mocha/bin/_mocha --report html -- tests/*.mocha.js -R spec -t 5000", | ||
"lint": "eslint **/*.s", | ||
"cli": "env NPM_RUN_CLI=1" | ||
"changelog": "conventional-changelog -p angular -i CHANGELOG.md -s", | ||
"cli": "env NODE_ENV=${NODE_ENV:-cli}", | ||
"cover": "istanbul cover _mocha --report html -- tests/*.mocha.js -R spec -t 5000", | ||
"coveralls": "istanbul cover _mocha --report lcovonly -- tests/*.mocha.js -R spec -t 5000 && cat ./coverage/lcov.info | coveralls && rm -rf ./coverage", | ||
"cz": "env NODE_ENV=${NODE_ENV:-cli} git cz", | ||
"doc": "mkdir -p .readme; echo \"# API\" > .readme/API.md; jsdoc2md src/*.js tests/*.mocha.js >> .readme/API.md", | ||
"lint": "eslint src/*.js tests/*.mocha.js", | ||
"metapak": "metapak", | ||
"mocha": "mocha tests/*.mocha.js", | ||
"prettier": "prettier --write src/*.js tests/*.mocha.js", | ||
"preversion": "npm t && npm run lint && npm run metapak -s", | ||
"test": "npm run mocha", | ||
"version": "npm run changelog && git add CHANGELOG.md" | ||
}, | ||
@@ -29,12 +51,43 @@ "repository": { | ||
"devDependencies": { | ||
"coveralls": "^2.11.3", | ||
"eslint": "^1.0.0", | ||
"istanbul": "^0.3.17", | ||
"mocha": "^2.2.5", | ||
"sf-lint": "^1.0.2", | ||
"streamtest": "^1.2.1" | ||
"commitizen": "^2.9.6", | ||
"conventional-changelog-cli": "^1.3.5", | ||
"coveralls": "^2.13.3", | ||
"cz-conventional-changelog": "^2.0.0", | ||
"eslint": "^4.12.1", | ||
"eslint-plugin-prettier": "^2.3.1", | ||
"istanbul": "^0.4.5", | ||
"jsdoc-to-markdown": "^3.0.2", | ||
"metapak": "^1.0.2", | ||
"metapak-nfroidure": "^2.0.2", | ||
"mocha": "^3.5.3", | ||
"mocha-lcov-reporter": "^1.3.0", | ||
"prettier": "^1.8.2", | ||
"streamtest": "^1.2.3" | ||
}, | ||
"dependencies": { | ||
"readable-stream": "^2.0.2" | ||
"readable-stream": "^2.3.3" | ||
}, | ||
"engines": { | ||
"node": ">=6.9.5" | ||
}, | ||
"config": { | ||
"commitizen": { | ||
"path": "./node_modules/cz-conventional-changelog" | ||
} | ||
}, | ||
"greenkeeper": { | ||
"ignore": [ | ||
"commitizen", | ||
"cz-conventional-changelog", | ||
"conventional-changelog-cli", | ||
"jsdoc-to-markdown", | ||
"eslint", | ||
"eslint-config-prettier", | ||
"prettier", | ||
"mocha", | ||
"mocha-lcov-reporter", | ||
"coveralls", | ||
"istanbul" | ||
] | ||
} | ||
} |
@@ -0,8 +1,20 @@ | ||
<!-- | ||
# This file is automatically generated by a `metapak` | ||
# module. Do not change it elsewhere, changes would | ||
# be overridden. | ||
--> | ||
# streamfilter | ||
> Filtering streams. | ||
[![NPM version](https://badge.fury.io/js/streamfilter.svg)](https://npmjs.org/package/streamfilter) | ||
[![Build status](https://secure.travis-ci.org/nfroidure/streamfilter.svg)](https://travis-ci.org/nfroidure/streamfilter) | ||
[![Dependency Status](https://david-dm.org/nfroidure/streamfilter.svg)](https://david-dm.org/nfroidure/streamfilter) | ||
[![devDependency Status](https://david-dm.org/nfroidure/streamfilter/dev-status.svg)](https://david-dm.org/nfroidure/streamfilter#info=devDependencies) | ||
[![Coverage Status](https://coveralls.io/repos/nfroidure/streamfilter/badge.svg?branch=master)](https://coveralls.io/r/nfroidure/streamfilter?branch=master) | ||
[![Code Climate](https://codeclimate.com/github/nfroidure/streamfilter.svg)](https://codeclimate.com/github/nfroidure/streamfilter) | ||
[![Dependency Status](https://dependencyci.com/github/nfroidure/streamfilter/badge)](https://dependencyci.com/github/nfroidure/streamfilter) | ||
`streamfilter` is a function based filter for streams inspired per gulp-filter | ||
but no limited to Gulp nor to objectMode streams. | ||
[![NPM version](https://badge.fury.io/js/streamfilter.png)](https://npmjs.org/package/streamfilter) [![Build status](https://secure.travis-ci.org/nfroidure/streamfilter.png)](https://travis-ci.org/nfroidure/streamfilter) [![Dependency Status](https://david-dm.org/nfroidure/streamfilter.png)](https://david-dm.org/nfroidure/streamfilter) [![devDependency Status](https://david-dm.org/nfroidure/streamfilter/dev-status.png)](https://david-dm.org/nfroidure/streamfilter#info=devDependencies) [![Coverage Status](https://coveralls.io/repos/nfroidure/streamfilter/badge.png?branch=master)](https://coveralls.io/r/nfroidure/streamfilter?branch=master) [![Code Climate](https://codeclimate.com/github/nfroidure/streamfilter.png)](https://codeclimate.com/github/nfroidure/streamfilter) | ||
## Installation | ||
@@ -136,1 +148,18 @@ | ||
# API | ||
<a name="StreamFilter"></a> | ||
## StreamFilter(filterCallback, options) ⇒ <code>Stream</code> | ||
[StreamFilter description] | ||
**Kind**: global function | ||
**Returns**: <code>Stream</code> - The filtering stream | ||
| Param | Type | Description | | ||
| --- | --- | --- | | ||
| filterCallback | <code>function</code> | Callback applying the filters | | ||
| options | <code>Object</code> | Filtering options | | ||
# License | ||
[MIT](https://github.com/nfroidure/streamfilter/blob/master/LICENSE) |
'use strict'; | ||
var stream = require('readable-stream'); | ||
var util = require('util'); | ||
const stream = require('readable-stream'); | ||
const util = require('util'); | ||
/** | ||
* [StreamFilter description] | ||
* @param {Function} filterCallback Callback applying the filters | ||
* @param {Object} options Filtering options | ||
* @returns {Stream} The filtering stream | ||
*/ | ||
function StreamFilter(filterCallback, options) { | ||
var _this = this; | ||
const _this = this; | ||
// Ensure new is called | ||
if(!(this instanceof StreamFilter)) { | ||
if (!(this instanceof StreamFilter)) { | ||
return new StreamFilter(filterCallback, options); | ||
@@ -15,3 +21,3 @@ } | ||
// filter callback is required | ||
if(!(filterCallback instanceof Function)) { | ||
if (!(filterCallback instanceof Function)) { | ||
throw new Error('filterCallback must be a function.'); | ||
@@ -23,3 +29,3 @@ } | ||
options.restore = options.restore || false; | ||
options.passthrough = options.restore && options.passthrough || false; | ||
options.passthrough = (options.restore && options.passthrough) || false; | ||
@@ -31,12 +37,14 @@ this._filterStreamEnded = false; | ||
filterCallback(chunk, encoding, function StreamFilterCallback(filter) { | ||
if(!filter) { | ||
if (!filter) { | ||
_this.push(chunk, encoding); | ||
done(); | ||
} else if(options.restore) { | ||
_this._restoreManager.programPush(chunk, encoding, function() { | ||
return; | ||
} | ||
if (options.restore) { | ||
_this._restoreManager.programPush(chunk, encoding, () => { | ||
done(); | ||
}); | ||
} else { | ||
done(); | ||
return; | ||
} | ||
done(); | ||
}); | ||
@@ -47,9 +55,9 @@ }; | ||
this._filterStreamEnded = true; | ||
done(); | ||
if(options.restore) { | ||
if(!options.passthrough) { | ||
this._restoreManager.programPush(null, {}.undef, function() { | ||
done(); // eslint-disable-line | ||
if (options.restore) { | ||
if (!options.passthrough) { | ||
this._restoreManager.programPush(null, {}.undef, () => { | ||
done(); | ||
}); | ||
} else if(this._restoreStreamCallback) { | ||
} else if (this._restoreStreamCallback) { | ||
this._restoreStreamCallback(); | ||
@@ -63,7 +71,11 @@ } | ||
// Creating the restored stream if necessary | ||
if(options.restore) { | ||
if(options.passthrough) { | ||
if (options.restore) { | ||
if (options.passthrough) { | ||
this.restore = new stream.Duplex(options); | ||
this._restoreManager = createReadStreamBackpressureManager(this.restore); | ||
this.restore._write = function streamFilterRestoreWrite(chunk, encoding, done) { | ||
this.restore._write = function streamFilterRestoreWrite( | ||
chunk, | ||
encoding, | ||
done | ||
) { | ||
_this._restoreManager.programPush(chunk, encoding, done); | ||
@@ -73,6 +85,6 @@ }; | ||
this.restore.on('finish', function streamFilterRestoreFinish() { | ||
_this._restoreStreamCallback = function() { | ||
_this._restoreManager.programPush(null, {}.undef, function() {}); | ||
_this._restoreStreamCallback = () => { | ||
_this._restoreManager.programPush(null, {}.undef, () => {}); | ||
}; | ||
if(_this._filterStreamEnded) { | ||
if (_this._filterStreamEnded) { | ||
_this._restoreStreamCallback(); | ||
@@ -92,3 +104,3 @@ } | ||
function createReadStreamBackpressureManager(readableStream) { | ||
var manager = { | ||
const manager = { | ||
waitPush: true, | ||
@@ -107,12 +119,12 @@ programmedPushs: [], | ||
attemptPush: function attemptPush() { | ||
var nextPush; | ||
let nextPush; | ||
if(manager.waitPush) { | ||
if(manager.programmedPushs.length) { | ||
if (manager.waitPush) { | ||
if (manager.programmedPushs.length) { | ||
nextPush = manager.programmedPushs.shift(); | ||
manager.waitPush = readableStream.push(nextPush[0], nextPush[1]); | ||
(nextPush[2])(); | ||
nextPush[2](); | ||
} | ||
} else { | ||
setImmediate(function() { | ||
setImmediate(() => { | ||
// Need to be async to avoid nested push attempts | ||
@@ -119,0 +131,0 @@ readableStream.emit('readable'); |
@@ -0,55 +1,59 @@ | ||
/* eslint max-nested-callbacks:[0], no-magic-numbers:[0] */ | ||
'use strict'; | ||
var assert = require('assert'); | ||
var Stream = require('stream'); | ||
var StreamTest = require('streamtest'); | ||
var StreamFilter = require('../src/index'); | ||
const assert = require('assert'); | ||
const Stream = require('stream'); | ||
const StreamTest = require('streamtest'); | ||
const StreamFilter = require('../src/index'); | ||
describe('StreamFilter', function() { | ||
describe('should fail', function() { | ||
it('if options.filter is not a function', function() { | ||
assert.throws(function() { | ||
new StreamFilter(); | ||
}); | ||
}, /Error/); | ||
describe('StreamFilter', () => { | ||
describe('should fail', () => { | ||
it( | ||
'if options.filter is not a function', | ||
() => { | ||
assert.throws(() => { | ||
new StreamFilter(); // eslint-disable-line | ||
}); | ||
}, | ||
/Error/ | ||
); | ||
}); | ||
describe('should work', function() { | ||
describe('should work', () => { | ||
it('should work without new', () => { | ||
const createFilter = StreamFilter; | ||
it('should work without new', function() { | ||
var createFilter = StreamFilter; | ||
assert(createFilter(function() {}) instanceof StreamFilter); | ||
assert(createFilter(() => {}) instanceof StreamFilter); | ||
}); | ||
}); | ||
// Iterating through versions | ||
StreamTest.versions.forEach(function(version) { | ||
StreamTest.versions.forEach(version => { | ||
describe('for ' + version + ' streams', () => { | ||
describe('in object mode', () => { | ||
describe('should work', () => { | ||
const object1 = { test: 'plop' }; | ||
const object2 = { test: 'plop2' }; | ||
const object3 = { test: 'plop3' }; | ||
describe('for ' + version + ' streams', function() { | ||
describe('in object mode', function() { | ||
describe('should work', function() { | ||
var object1 = { test: 'plop' }; | ||
var object2 = { test: 'plop2' }; | ||
var object3 = { test: 'plop3' }; | ||
it('with no restore option', function(done) { | ||
var inputStream = StreamTest[version].fromObjects([object1, object2]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
it('with no restore option', done => { | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
]); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
@@ -63,25 +67,35 @@ assert.deepEqual(objs, [object1]); | ||
it('with restore option', function(done) { | ||
var inputStream = StreamTest[version].fromObjects([object1, object2]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
it('with restore option', done => { | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
]); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert.deepEqual(objs, [object1]); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.deepEqual(objs2, [object2]); | ||
done(); | ||
})); | ||
filter.restore.pipe( | ||
StreamTest[version].toObjects((err2, objs2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.deepEqual(objs2, [object2]); | ||
done(); | ||
}) | ||
); | ||
}); | ||
@@ -92,42 +106,107 @@ | ||
it('with restore option and more than 16 nested objects', function(done) { | ||
var nDone = 0; | ||
var inputStream = StreamTest[version].fromObjects([ | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
it('with restore option and more than 16 nested objects', done => { | ||
let nDone = 0; | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert.equal(objs.length, 32); | ||
if(2 === ++nDone) { | ||
if (2 === ++nDone) { | ||
done(); | ||
return; | ||
} | ||
}); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.equal(objs2.length, 32); | ||
if(2 === ++nDone) { | ||
done(); | ||
} | ||
})); | ||
filter.restore.pipe( | ||
StreamTest[version].toObjects((err2, objs2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.equal(objs2.length, 32); | ||
if (2 === ++nDone) { | ||
done(); | ||
return; | ||
} | ||
}) | ||
); | ||
@@ -137,40 +216,91 @@ inputStream.pipe(filter).pipe(outputStream); | ||
it('with restore option and more than 16 objects', function(done) { | ||
var nDone = 0; | ||
var inputStream = StreamTest[version].fromObjects([ | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
it('with restore option and more than 16 objects', done => { | ||
let nDone = 0; | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert.equal(objs.length, 24); | ||
if(2 === ++nDone) { | ||
if (2 === ++nDone) { | ||
done(); | ||
return; | ||
} | ||
}); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.equal(objs2.length, 24); | ||
if(2 === ++nDone) { | ||
done(); | ||
} | ||
})); | ||
filter.restore.pipe( | ||
StreamTest[version].toObjects((err2, objs2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.equal(objs2.length, 24); | ||
if (2 === ++nDone) { | ||
done(); | ||
return; | ||
} | ||
}) | ||
); | ||
@@ -180,28 +310,40 @@ inputStream.pipe(filter).pipe(outputStream); | ||
it('with restore and passthrough option in a different pipeline', function(done) { | ||
var inputStream = StreamTest[version].fromObjects([object1, object2]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
it('with restore and passthrough option in a different pipeline', done => { | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
]); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert.deepEqual(objs, [object1]); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.deepEqual(objs2, [object3, object2]); | ||
done(); | ||
})); | ||
filter.restore.pipe( | ||
StreamTest[version].toObjects((err2, objs2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.deepEqual(objs2, [object3, object2]); | ||
done(); | ||
}) | ||
); | ||
}); | ||
var restoreInputStream = StreamTest[version].fromObjects([object3]); | ||
const restoreInputStream = StreamTest[version].fromObjects([ | ||
object3, | ||
]); | ||
@@ -212,20 +354,28 @@ inputStream.pipe(filter).pipe(outputStream); | ||
it('with restore and passthrough option in the same pipeline', function(done) { | ||
var passThroughStream1Ended = false; | ||
var passThroughStream2Ended = false; | ||
var duplexStreamEnded = false; | ||
var inputStream = StreamTest[version].fromObjects([object1, object2, object3]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk === object2) { | ||
return cb(true); | ||
it('with restore and passthrough option in the same pipeline', done => { | ||
let passThroughStream1Ended = false; | ||
let passThroughStream2Ended = false; | ||
let duplexStreamEnded = false; | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
object3, | ||
]); | ||
const filter = new StreamFilter( | ||
(chunk, encoding, cb) => { | ||
if (chunk === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
@@ -235,41 +385,54 @@ assert.deepEqual(objs, [object1, object2, object3]); | ||
}); | ||
var duplexStream = new Stream.Duplex({ objectMode: true }); | ||
const duplexStream = new Stream.Duplex({ objectMode: true }); | ||
duplexStream._write = function(obj, unused, cb) { | ||
duplexStream._write = (obj, unused, cb) => { | ||
duplexStream.push(obj); | ||
setImmediate(cb); | ||
}; | ||
duplexStream._read = function() {}; | ||
duplexStream.on('finish', function() { | ||
setTimeout(function() { | ||
duplexStream._read = () => {}; | ||
duplexStream.on('finish', () => { | ||
setTimeout(() => { | ||
duplexStream.push(null); | ||
}, 100); | ||
}); | ||
outputStream.on('end', function() { | ||
assert(passThroughStream1Ended, | ||
'PassThrough stream ends before the output one.'); | ||
assert(passThroughStream2Ended, | ||
'PassThrough stream ends before the output one.'); | ||
assert(duplexStreamEnded, | ||
'Duplex stream ends before the output one.'); | ||
outputStream.on('end', () => { | ||
assert( | ||
passThroughStream1Ended, | ||
'PassThrough stream ends before the output one.' | ||
); | ||
assert( | ||
passThroughStream2Ended, | ||
'PassThrough stream ends before the output one.' | ||
); | ||
assert( | ||
duplexStreamEnded, | ||
'Duplex stream ends before the output one.' | ||
); | ||
}); | ||
filter.restore.on('end', function() { | ||
assert(passThroughStream1Ended, | ||
'PassThrough stream ends before the restore one.'); | ||
assert(passThroughStream2Ended, | ||
'PassThrough stream ends before the restore one.'); | ||
assert(duplexStreamEnded, | ||
'Duplex stream ends before the restore one.'); | ||
filter.restore.on('end', () => { | ||
assert( | ||
passThroughStream1Ended, | ||
'PassThrough stream ends before the restore one.' | ||
); | ||
assert( | ||
passThroughStream2Ended, | ||
'PassThrough stream ends before the restore one.' | ||
); | ||
assert( | ||
duplexStreamEnded, | ||
'Duplex stream ends before the restore one.' | ||
); | ||
}); | ||
inputStream.pipe(filter) | ||
inputStream | ||
.pipe(filter) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', function() { | ||
.on('end', () => { | ||
passThroughStream1Ended = true; | ||
}) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', function() { | ||
.on('end', () => { | ||
passThroughStream2Ended = true; | ||
}) | ||
.pipe(duplexStream) | ||
.on('end', function() { | ||
.on('end', () => { | ||
duplexStreamEnded = true; | ||
@@ -281,98 +444,125 @@ }) | ||
it('with restore and passthrough option in the same pipeline and a buffered stream', function(done) { | ||
var passThroughStream1Ended = false; | ||
var passThroughStream2Ended = false; | ||
var duplexStreamEnded = false; | ||
var inputStream = StreamTest[version].fromObjects([object1, object2, object3]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(objs.length, 3); | ||
setImmediate(done); | ||
}); | ||
var duplexStream = new Stream.Duplex({ objectMode: true }); | ||
it( | ||
'with restore and passthrough option in the same' + | ||
' pipeline and a buffered stream', | ||
done => { | ||
let passThroughStream1Ended = false; | ||
let passThroughStream2Ended = false; | ||
let duplexStreamEnded = false; | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
object3, | ||
]); | ||
const filter = new StreamFilter( | ||
(chunk, encoding, cb) => { | ||
if (chunk === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
} | ||
); | ||
const outputStream = StreamTest[version].toObjects( | ||
(err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert.equal(objs.length, 3); | ||
setImmediate(done); | ||
} | ||
); | ||
const duplexStream = new Stream.Duplex({ objectMode: true }); | ||
duplexStream._objs = []; | ||
duplexStream._write = function(obj, unused, cb) { | ||
duplexStream._objs.push(obj); | ||
cb(); | ||
}; | ||
duplexStream._read = function() { | ||
var obj; | ||
duplexStream._objs = []; | ||
duplexStream._write = (obj, unused, cb) => { | ||
duplexStream._objs.push(obj); | ||
cb(); | ||
}; | ||
duplexStream._read = () => { | ||
let obj; | ||
if(duplexStream._hasFinished) { | ||
while(duplexStream._objs.length) { | ||
obj = duplexStream._objs.shift(); | ||
if(!duplexStream.push(obj)) { | ||
break; | ||
if (duplexStream._hasFinished) { | ||
while (duplexStream._objs.length) { | ||
obj = duplexStream._objs.shift(); | ||
if (!duplexStream.push(obj)) { | ||
break; | ||
} | ||
} | ||
if (0 === duplexStream._objs.length) { | ||
duplexStream.push(null); | ||
} | ||
} | ||
if(0 === duplexStream._objs.length) { | ||
duplexStream.push(null); | ||
} | ||
} | ||
}; | ||
duplexStream.on('finish', function() { | ||
duplexStream._hasFinished = true; | ||
duplexStream._read(); | ||
}); | ||
outputStream.on('end', function() { | ||
assert(passThroughStream1Ended, | ||
'PassThrough stream ends before the output one.'); | ||
assert(passThroughStream2Ended, | ||
'PassThrough stream ends before the output one.'); | ||
assert(duplexStreamEnded, | ||
'Duplex stream ends before the output one.'); | ||
}); | ||
filter.restore.on('end', function() { | ||
assert(passThroughStream1Ended, | ||
'PassThrough stream ends before the restore one.'); | ||
assert(passThroughStream2Ended, | ||
'PassThrough stream ends before the restore one.'); | ||
assert(duplexStreamEnded, | ||
'Duplex stream ends before the restore one.'); | ||
}); | ||
inputStream.pipe(filter) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', function() { | ||
passThroughStream1Ended = true; | ||
}) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', function() { | ||
passThroughStream2Ended = true; | ||
}) | ||
.pipe(duplexStream) | ||
.on('end', function() { | ||
duplexStreamEnded = true; | ||
}) | ||
.pipe(filter.restore) | ||
.pipe(outputStream); | ||
}); | ||
}; | ||
duplexStream.on('finish', () => { | ||
duplexStream._hasFinished = true; | ||
duplexStream._read(); | ||
}); | ||
outputStream.on('end', () => { | ||
assert( | ||
passThroughStream1Ended, | ||
'PassThrough stream ends before the output one.' | ||
); | ||
assert( | ||
passThroughStream2Ended, | ||
'PassThrough stream ends before the output one.' | ||
); | ||
assert( | ||
duplexStreamEnded, | ||
'Duplex stream ends before the output one.' | ||
); | ||
}); | ||
filter.restore.on('end', () => { | ||
assert( | ||
passThroughStream1Ended, | ||
'PassThrough stream ends before the restore one.' | ||
); | ||
assert( | ||
passThroughStream2Ended, | ||
'PassThrough stream ends before the restore one.' | ||
); | ||
assert( | ||
duplexStreamEnded, | ||
'Duplex stream ends before the restore one.' | ||
); | ||
}); | ||
inputStream | ||
.pipe(filter) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', () => { | ||
passThroughStream1Ended = true; | ||
}) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', () => { | ||
passThroughStream2Ended = true; | ||
}) | ||
.pipe(duplexStream) | ||
.on('end', () => { | ||
duplexStreamEnded = true; | ||
}) | ||
.pipe(filter.restore) | ||
.pipe(outputStream); | ||
} | ||
); | ||
}); | ||
}); | ||
describe('in buffer mode', function() { | ||
describe('in buffer mode', () => { | ||
describe('should work', () => { | ||
const buffer1 = new Buffer('plop'); | ||
const buffer2 = new Buffer('plop2'); | ||
const buffer3 = new Buffer('plop3'); | ||
describe('should work', function() { | ||
var buffer1 = new Buffer('plop'); | ||
var buffer2 = new Buffer('plop2'); | ||
var buffer3 = new Buffer('plop3'); | ||
it('with no restore option', function(done) { | ||
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk.toString() === buffer1.toString()) { | ||
it('with no restore option', done => { | ||
const inputStream = StreamTest[version].fromChunks([ | ||
buffer1, | ||
buffer2, | ||
]); | ||
const filter = new StreamFilter((chunk, encoding, cb) => { | ||
if (chunk.toString() === buffer1.toString()) { | ||
return cb(true); | ||
@@ -382,5 +572,6 @@ } | ||
}); | ||
var outputStream = StreamTest[version].toText(function(err, text) { | ||
if(err) { | ||
return done(err); | ||
const outputStream = StreamTest[version].toText((err, text) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
@@ -394,24 +585,34 @@ assert.equal(text, buffer2.toString()); | ||
it('with restore option', function(done) { | ||
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk.toString() === buffer2.toString()) { | ||
return cb(true); | ||
it('with restore option', done => { | ||
const inputStream = StreamTest[version].fromChunks([ | ||
buffer1, | ||
buffer2, | ||
]); | ||
const filter = new StreamFilter( | ||
(chunk, encoding, cb) => { | ||
if (chunk.toString() === buffer2.toString()) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
restore: true, | ||
} | ||
return cb(false); | ||
}, { | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toText(function(err, text) { | ||
if(err) { | ||
return done(err); | ||
); | ||
const outputStream = StreamTest[version].toText((err, text) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert.equal(text, buffer1.toString()); | ||
filter.restore.pipe(StreamTest[version].toText(function(err2, text2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.equal(text2, buffer2.toString()); | ||
done(); | ||
})); | ||
filter.restore.pipe( | ||
StreamTest[version].toText((err2, text2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.equal(text2, buffer2.toString()); | ||
done(); | ||
}) | ||
); | ||
}); | ||
@@ -422,27 +623,42 @@ | ||
it('with restore and passthrough option', function(done) { | ||
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk.toString() === buffer2.toString()) { | ||
return cb(true); | ||
it('with restore and passthrough option', done => { | ||
const inputStream = StreamTest[version].fromChunks([ | ||
buffer1, | ||
buffer2, | ||
]); | ||
const filter = new StreamFilter( | ||
(chunk, encoding, cb) => { | ||
if (chunk.toString() === buffer2.toString()) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
restore: true, | ||
passthrough: true, | ||
} | ||
return cb(false); | ||
}, { | ||
restore: true, | ||
passthrough: true, | ||
}); | ||
var outputStream = StreamTest[version].toText(function(err, text) { | ||
if(err) { | ||
return done(err); | ||
); | ||
const outputStream = StreamTest[version].toText((err, text) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert.equal(text, buffer1.toString()); | ||
filter.restore.pipe(StreamTest[version].toText(function(err2, text2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.deepEqual(text2, [buffer3.toString(), buffer2.toString()].join('')); | ||
done(); | ||
})); | ||
filter.restore.pipe( | ||
StreamTest[version].toText((err2, text2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.deepEqual( | ||
text2, | ||
[buffer3.toString(), buffer2.toString()].join('') | ||
); | ||
done(); | ||
}) | ||
); | ||
}); | ||
var restoreInputStream = StreamTest[version].fromChunks([buffer3]); | ||
const restoreInputStream = StreamTest[version].fromChunks([ | ||
buffer3, | ||
]); | ||
@@ -452,11 +668,6 @@ inputStream.pipe(filter).pipe(outputStream); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Deprecated
MaintenanceThe maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
44010
18
779
165
14
1
1
Updatedreadable-stream@^2.3.3