streamfilter
Advanced tools
Comparing version 1.0.6 to 1.0.7
{ | ||
"name": "streamfilter", | ||
"version": "1.0.6", | ||
"version": "1.0.7", | ||
"description": "Filtering streams.", | ||
"main": "src/index.js", | ||
"metapak": { | ||
"configs": [ | ||
"readme", | ||
"jsdocs", | ||
"eslint", | ||
"mocha", | ||
"codeclimate", | ||
"travis" | ||
], | ||
"data": { | ||
"files": "src/*.js tests/*.mocha.js", | ||
"testsFiles": "tests/*.mocha.js" | ||
} | ||
}, | ||
"scripts": { | ||
"changelog": "conventional-changelog -p angular -i CHANGELOG.md -s", | ||
"cli": "env NODE_ENV=${NODE_ENV:-cli}", | ||
"cover": "istanbul cover _mocha --report html -- tests/*.mocha.js -R spec -t 5000", | ||
"coveralls": "istanbul cover _mocha --report lcovonly -- tests/*.mocha.js -R spec -t 5000 && cat ./coverage/lcov.info | coveralls && rm -rf ./coverage", | ||
"cz": "env NODE_ENV=${NODE_ENV:-cli} git cz", | ||
"doc": "mkdir -p .readme; echo \"# API\" > .readme/API.md; jsdoc2md src/*.js tests/*.mocha.js >> .readme/API.md", | ||
"lint": "eslint src/*.js tests/*.mocha.js", | ||
"metapak": "metapak", | ||
"mocha": "mocha tests/*.mocha.js", | ||
"prettier": "prettier --write src/*.js tests/*.mocha.js", | ||
"preversion": "npm t && npm run lint && npm run metapak -s", | ||
"test": "npm run mocha", | ||
"version": "npm run changelog && git add CHANGELOG.md" | ||
"test": "mocha tests/*.mocha.js", | ||
"coveralls": "./node_modules/istanbul/lib/cli.js cover ./node_modules/mocha/bin/_mocha --report lcovonly -- tests/*.mocha.js -R spec -t 5000 && cat ./coverage/lcov.info | coveralls && rm -rf ./coverage", | ||
"cover": "./node_modules/istanbul/lib/cli.js cover ./node_modules/mocha/bin/_mocha --report html -- tests/*.mocha.js -R spec -t 5000", | ||
"lint": "eslint **/*.s", | ||
"cli": "env NPM_RUN_CLI=1" | ||
}, | ||
@@ -51,43 +29,12 @@ "repository": { | ||
"devDependencies": { | ||
"commitizen": "^2.9.6", | ||
"conventional-changelog-cli": "^1.3.5", | ||
"coveralls": "^2.13.3", | ||
"cz-conventional-changelog": "^2.0.0", | ||
"eslint": "^4.12.1", | ||
"eslint-plugin-prettier": "^2.3.1", | ||
"istanbul": "^0.4.5", | ||
"jsdoc-to-markdown": "^3.0.2", | ||
"metapak": "^1.0.2", | ||
"metapak-nfroidure": "^2.0.2", | ||
"mocha": "^3.5.3", | ||
"mocha-lcov-reporter": "^1.3.0", | ||
"prettier": "^1.8.2", | ||
"streamtest": "^1.2.3" | ||
"coveralls": "^2.11.3", | ||
"eslint": "^1.0.0", | ||
"istanbul": "^0.3.17", | ||
"mocha": "^2.2.5", | ||
"sf-lint": "^1.0.2", | ||
"streamtest": "^1.2.1" | ||
}, | ||
"dependencies": { | ||
"readable-stream": "^2.3.3" | ||
}, | ||
"engines": { | ||
"node": ">=6.9.5" | ||
}, | ||
"config": { | ||
"commitizen": { | ||
"path": "./node_modules/cz-conventional-changelog" | ||
} | ||
}, | ||
"greenkeeper": { | ||
"ignore": [ | ||
"commitizen", | ||
"cz-conventional-changelog", | ||
"conventional-changelog-cli", | ||
"jsdoc-to-markdown", | ||
"eslint", | ||
"eslint-config-prettier", | ||
"prettier", | ||
"mocha", | ||
"mocha-lcov-reporter", | ||
"coveralls", | ||
"istanbul" | ||
] | ||
"readable-stream": "^2.0.2" | ||
} | ||
} |
@@ -1,20 +0,8 @@ | ||
<!-- | ||
# This file is automatically generated by a `metapak` | ||
# module. Do not change it elsewhere, changes would | ||
# be overridden. | ||
--> | ||
# streamfilter | ||
> Filtering streams. | ||
[![NPM version](https://badge.fury.io/js/streamfilter.svg)](https://npmjs.org/package/streamfilter) | ||
[![Build status](https://secure.travis-ci.org/nfroidure/streamfilter.svg)](https://travis-ci.org/nfroidure/streamfilter) | ||
[![Dependency Status](https://david-dm.org/nfroidure/streamfilter.svg)](https://david-dm.org/nfroidure/streamfilter) | ||
[![devDependency Status](https://david-dm.org/nfroidure/streamfilter/dev-status.svg)](https://david-dm.org/nfroidure/streamfilter#info=devDependencies) | ||
[![Coverage Status](https://coveralls.io/repos/nfroidure/streamfilter/badge.svg?branch=master)](https://coveralls.io/r/nfroidure/streamfilter?branch=master) | ||
[![Code Climate](https://codeclimate.com/github/nfroidure/streamfilter.svg)](https://codeclimate.com/github/nfroidure/streamfilter) | ||
[![Dependency Status](https://dependencyci.com/github/nfroidure/streamfilter/badge)](https://dependencyci.com/github/nfroidure/streamfilter) | ||
`streamfilter` is a function based filter for streams inspired per gulp-filter | ||
but no limited to Gulp nor to objectMode streams. | ||
[![NPM version](https://badge.fury.io/js/streamfilter.png)](https://npmjs.org/package/streamfilter) [![Build status](https://secure.travis-ci.org/nfroidure/streamfilter.png)](https://travis-ci.org/nfroidure/streamfilter) [![Dependency Status](https://david-dm.org/nfroidure/streamfilter.png)](https://david-dm.org/nfroidure/streamfilter) [![devDependency Status](https://david-dm.org/nfroidure/streamfilter/dev-status.png)](https://david-dm.org/nfroidure/streamfilter#info=devDependencies) [![Coverage Status](https://coveralls.io/repos/nfroidure/streamfilter/badge.png?branch=master)](https://coveralls.io/r/nfroidure/streamfilter?branch=master) [![Code Climate](https://codeclimate.com/github/nfroidure/streamfilter.png)](https://codeclimate.com/github/nfroidure/streamfilter) | ||
## Installation | ||
@@ -148,18 +136,1 @@ | ||
# API | ||
<a name="StreamFilter"></a> | ||
## StreamFilter(filterCallback, options) ⇒ <code>Stream</code> | ||
[StreamFilter description] | ||
**Kind**: global function | ||
**Returns**: <code>Stream</code> - The filtering stream | ||
| Param | Type | Description | | ||
| --- | --- | --- | | ||
| filterCallback | <code>function</code> | Callback applying the filters | | ||
| options | <code>Object</code> | Filtering options | | ||
# License | ||
[MIT](https://github.com/nfroidure/streamfilter/blob/master/LICENSE) |
'use strict'; | ||
const stream = require('readable-stream'); | ||
const util = require('util'); | ||
var stream = require('readable-stream'); | ||
var util = require('util'); | ||
/** | ||
* [StreamFilter description] | ||
* @param {Function} filterCallback Callback applying the filters | ||
* @param {Object} options Filtering options | ||
* @returns {Stream} The filtering stream | ||
*/ | ||
function StreamFilter(filterCallback, options) { | ||
const _this = this; | ||
var _this = this; | ||
// Ensure new is called | ||
if (!(this instanceof StreamFilter)) { | ||
if(!(this instanceof StreamFilter)) { | ||
return new StreamFilter(filterCallback, options); | ||
@@ -21,3 +15,3 @@ } | ||
// filter callback is required | ||
if (!(filterCallback instanceof Function)) { | ||
if(!(filterCallback instanceof Function)) { | ||
throw new Error('filterCallback must be a function.'); | ||
@@ -29,3 +23,3 @@ } | ||
options.restore = options.restore || false; | ||
options.passthrough = (options.restore && options.passthrough) || false; | ||
options.passthrough = options.restore && options.passthrough || false; | ||
@@ -37,14 +31,12 @@ this._filterStreamEnded = false; | ||
filterCallback(chunk, encoding, function StreamFilterCallback(filter) { | ||
if (!filter) { | ||
if(!filter) { | ||
_this.push(chunk, encoding); | ||
done(); | ||
return; | ||
} | ||
if (options.restore) { | ||
_this._restoreManager.programPush(chunk, encoding, () => { | ||
} else if(options.restore) { | ||
_this._restoreManager.programPush(chunk, encoding, function() { | ||
done(); | ||
}); | ||
return; | ||
} else { | ||
done(); | ||
} | ||
done(); | ||
}); | ||
@@ -55,9 +47,9 @@ }; | ||
this._filterStreamEnded = true; | ||
done(); // eslint-disable-line | ||
if (options.restore) { | ||
if (!options.passthrough) { | ||
this._restoreManager.programPush(null, {}.undef, () => { | ||
done(); | ||
if(options.restore) { | ||
if(!options.passthrough) { | ||
this._restoreManager.programPush(null, {}.undef, function() { | ||
done(); | ||
}); | ||
} else if (this._restoreStreamCallback) { | ||
} else if(this._restoreStreamCallback) { | ||
this._restoreStreamCallback(); | ||
@@ -71,11 +63,7 @@ } | ||
// Creating the restored stream if necessary | ||
if (options.restore) { | ||
if (options.passthrough) { | ||
if(options.restore) { | ||
if(options.passthrough) { | ||
this.restore = new stream.Duplex(options); | ||
this._restoreManager = createReadStreamBackpressureManager(this.restore); | ||
this.restore._write = function streamFilterRestoreWrite( | ||
chunk, | ||
encoding, | ||
done | ||
) { | ||
this.restore._write = function streamFilterRestoreWrite(chunk, encoding, done) { | ||
_this._restoreManager.programPush(chunk, encoding, done); | ||
@@ -85,6 +73,6 @@ }; | ||
this.restore.on('finish', function streamFilterRestoreFinish() { | ||
_this._restoreStreamCallback = () => { | ||
_this._restoreManager.programPush(null, {}.undef, () => {}); | ||
_this._restoreStreamCallback = function() { | ||
_this._restoreManager.programPush(null, {}.undef, function() {}); | ||
}; | ||
if (_this._filterStreamEnded) { | ||
if(_this._filterStreamEnded) { | ||
_this._restoreStreamCallback(); | ||
@@ -104,3 +92,3 @@ } | ||
function createReadStreamBackpressureManager(readableStream) { | ||
const manager = { | ||
var manager = { | ||
waitPush: true, | ||
@@ -119,12 +107,12 @@ programmedPushs: [], | ||
attemptPush: function attemptPush() { | ||
let nextPush; | ||
var nextPush; | ||
if (manager.waitPush) { | ||
if (manager.programmedPushs.length) { | ||
if(manager.waitPush) { | ||
if(manager.programmedPushs.length) { | ||
nextPush = manager.programmedPushs.shift(); | ||
manager.waitPush = readableStream.push(nextPush[0], nextPush[1]); | ||
nextPush[2](); | ||
(nextPush[2])(); | ||
} | ||
} else { | ||
setImmediate(() => { | ||
setImmediate(function() { | ||
// Need to be async to avoid nested push attempts | ||
@@ -131,0 +119,0 @@ readableStream.emit('readable'); |
@@ -1,59 +0,55 @@ | ||
/* eslint max-nested-callbacks:[0], no-magic-numbers:[0] */ | ||
'use strict'; | ||
const assert = require('assert'); | ||
const Stream = require('stream'); | ||
const StreamTest = require('streamtest'); | ||
const StreamFilter = require('../src/index'); | ||
var assert = require('assert'); | ||
var Stream = require('stream'); | ||
var StreamTest = require('streamtest'); | ||
var StreamFilter = require('../src/index'); | ||
describe('StreamFilter', () => { | ||
describe('should fail', () => { | ||
it( | ||
'if options.filter is not a function', | ||
() => { | ||
assert.throws(() => { | ||
new StreamFilter(); // eslint-disable-line | ||
}); | ||
}, | ||
/Error/ | ||
); | ||
describe('StreamFilter', function() { | ||
describe('should fail', function() { | ||
it('if options.filter is not a function', function() { | ||
assert.throws(function() { | ||
new StreamFilter(); | ||
}); | ||
}, /Error/); | ||
}); | ||
describe('should work', () => { | ||
it('should work without new', () => { | ||
const createFilter = StreamFilter; | ||
describe('should work', function() { | ||
assert(createFilter(() => {}) instanceof StreamFilter); | ||
it('should work without new', function() { | ||
var createFilter = StreamFilter; | ||
assert(createFilter(function() {}) instanceof StreamFilter); | ||
}); | ||
}); | ||
// Iterating through versions | ||
StreamTest.versions.forEach(version => { | ||
describe('for ' + version + ' streams', () => { | ||
describe('in object mode', () => { | ||
describe('should work', () => { | ||
const object1 = { test: 'plop' }; | ||
const object2 = { test: 'plop2' }; | ||
const object3 = { test: 'plop3' }; | ||
StreamTest.versions.forEach(function(version) { | ||
it('with no restore option', done => { | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
]); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
describe('for ' + version + ' streams', function() { | ||
describe('in object mode', function() { | ||
describe('should work', function() { | ||
var object1 = { test: 'plop' }; | ||
var object2 = { test: 'plop2' }; | ||
var object3 = { test: 'plop3' }; | ||
it('with no restore option', function(done) { | ||
var inputStream = StreamTest[version].fromObjects([object1, object2]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
} | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
@@ -67,35 +63,25 @@ assert.deepEqual(objs, [object1]); | ||
it('with restore option', done => { | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
]); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
it('with restore option', function(done) { | ||
var inputStream = StreamTest[version].fromObjects([object1, object2]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
} | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.deepEqual(objs, [object1]); | ||
filter.restore.pipe( | ||
StreamTest[version].toObjects((err2, objs2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.deepEqual(objs2, [object2]); | ||
done(); | ||
}) | ||
); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.deepEqual(objs2, [object2]); | ||
done(); | ||
})); | ||
}); | ||
@@ -106,107 +92,42 @@ | ||
it('with restore option and more than 16 nested objects', done => { | ||
let nDone = 0; | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
object1, | ||
object2, | ||
it('with restore option and more than 16 nested objects', function(done) { | ||
var nDone = 0; | ||
var inputStream = StreamTest[version].fromObjects([ | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
]); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
} | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(objs.length, 32); | ||
if (2 === ++nDone) { | ||
if(2 === ++nDone) { | ||
done(); | ||
return; | ||
} | ||
}); | ||
filter.restore.pipe( | ||
StreamTest[version].toObjects((err2, objs2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.equal(objs2.length, 32); | ||
if (2 === ++nDone) { | ||
done(); | ||
return; | ||
} | ||
}) | ||
); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.equal(objs2.length, 32); | ||
if(2 === ++nDone) { | ||
done(); | ||
} | ||
})); | ||
@@ -216,91 +137,40 @@ inputStream.pipe(filter).pipe(outputStream); | ||
it('with restore option and more than 16 objects', done => { | ||
let nDone = 0; | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object1, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
object2, | ||
it('with restore option and more than 16 objects', function(done) { | ||
var nDone = 0; | ||
var inputStream = StreamTest[version].fromObjects([ | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
]); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
} | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(objs.length, 24); | ||
if (2 === ++nDone) { | ||
if(2 === ++nDone) { | ||
done(); | ||
return; | ||
} | ||
}); | ||
filter.restore.pipe( | ||
StreamTest[version].toObjects((err2, objs2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.equal(objs2.length, 24); | ||
if (2 === ++nDone) { | ||
done(); | ||
return; | ||
} | ||
}) | ||
); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.equal(objs2.length, 24); | ||
if(2 === ++nDone) { | ||
done(); | ||
} | ||
})); | ||
@@ -310,40 +180,28 @@ inputStream.pipe(filter).pipe(outputStream); | ||
it('with restore and passthrough option in a different pipeline', done => { | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
]); | ||
const filter = new StreamFilter( | ||
(obj, unused, cb) => { | ||
if (obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
it('with restore and passthrough option in a different pipeline', function(done) { | ||
var inputStream = StreamTest[version].fromObjects([object1, object2]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
} | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.deepEqual(objs, [object1]); | ||
filter.restore.pipe( | ||
StreamTest[version].toObjects((err2, objs2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.deepEqual(objs2, [object3, object2]); | ||
done(); | ||
}) | ||
); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.deepEqual(objs2, [object3, object2]); | ||
done(); | ||
})); | ||
}); | ||
const restoreInputStream = StreamTest[version].fromObjects([ | ||
object3, | ||
]); | ||
var restoreInputStream = StreamTest[version].fromObjects([object3]); | ||
@@ -354,28 +212,20 @@ inputStream.pipe(filter).pipe(outputStream); | ||
it('with restore and passthrough option in the same pipeline', done => { | ||
let passThroughStream1Ended = false; | ||
let passThroughStream2Ended = false; | ||
let duplexStreamEnded = false; | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
object3, | ||
]); | ||
const filter = new StreamFilter( | ||
(chunk, encoding, cb) => { | ||
if (chunk === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
it('with restore and passthrough option in the same pipeline', function(done) { | ||
var passThroughStream1Ended = false; | ||
var passThroughStream2Ended = false; | ||
var duplexStreamEnded = false; | ||
var inputStream = StreamTest[version].fromObjects([object1, object2, object3]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk === object2) { | ||
return cb(true); | ||
} | ||
); | ||
const outputStream = StreamTest[version].toObjects((err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
@@ -385,54 +235,41 @@ assert.deepEqual(objs, [object1, object2, object3]); | ||
}); | ||
const duplexStream = new Stream.Duplex({ objectMode: true }); | ||
var duplexStream = new Stream.Duplex({ objectMode: true }); | ||
duplexStream._write = (obj, unused, cb) => { | ||
duplexStream._write = function(obj, unused, cb) { | ||
duplexStream.push(obj); | ||
setImmediate(cb); | ||
}; | ||
duplexStream._read = () => {}; | ||
duplexStream.on('finish', () => { | ||
setTimeout(() => { | ||
duplexStream._read = function() {}; | ||
duplexStream.on('finish', function() { | ||
setTimeout(function() { | ||
duplexStream.push(null); | ||
}, 100); | ||
}); | ||
outputStream.on('end', () => { | ||
assert( | ||
passThroughStream1Ended, | ||
'PassThrough stream ends before the output one.' | ||
); | ||
assert( | ||
passThroughStream2Ended, | ||
'PassThrough stream ends before the output one.' | ||
); | ||
assert( | ||
duplexStreamEnded, | ||
'Duplex stream ends before the output one.' | ||
); | ||
outputStream.on('end', function() { | ||
assert(passThroughStream1Ended, | ||
'PassThrough stream ends before the output one.'); | ||
assert(passThroughStream2Ended, | ||
'PassThrough stream ends before the output one.'); | ||
assert(duplexStreamEnded, | ||
'Duplex stream ends before the output one.'); | ||
}); | ||
filter.restore.on('end', () => { | ||
assert( | ||
passThroughStream1Ended, | ||
'PassThrough stream ends before the restore one.' | ||
); | ||
assert( | ||
passThroughStream2Ended, | ||
'PassThrough stream ends before the restore one.' | ||
); | ||
assert( | ||
duplexStreamEnded, | ||
'Duplex stream ends before the restore one.' | ||
); | ||
filter.restore.on('end', function() { | ||
assert(passThroughStream1Ended, | ||
'PassThrough stream ends before the restore one.'); | ||
assert(passThroughStream2Ended, | ||
'PassThrough stream ends before the restore one.'); | ||
assert(duplexStreamEnded, | ||
'Duplex stream ends before the restore one.'); | ||
}); | ||
inputStream | ||
.pipe(filter) | ||
inputStream.pipe(filter) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', () => { | ||
.on('end', function() { | ||
passThroughStream1Ended = true; | ||
}) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', () => { | ||
.on('end', function() { | ||
passThroughStream2Ended = true; | ||
}) | ||
.pipe(duplexStream) | ||
.on('end', () => { | ||
.on('end', function() { | ||
duplexStreamEnded = true; | ||
@@ -444,125 +281,98 @@ }) | ||
it( | ||
'with restore and passthrough option in the same' + | ||
' pipeline and a buffered stream', | ||
done => { | ||
let passThroughStream1Ended = false; | ||
let passThroughStream2Ended = false; | ||
let duplexStreamEnded = false; | ||
const inputStream = StreamTest[version].fromObjects([ | ||
object1, | ||
object2, | ||
object3, | ||
]); | ||
const filter = new StreamFilter( | ||
(chunk, encoding, cb) => { | ||
if (chunk === object2) { | ||
return cb(true); | ||
it('with restore and passthrough option in the same pipeline and a buffered stream', function(done) { | ||
var passThroughStream1Ended = false; | ||
var passThroughStream2Ended = false; | ||
var duplexStreamEnded = false; | ||
var inputStream = StreamTest[version].fromObjects([object1, object2, object3]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(objs.length, 3); | ||
setImmediate(done); | ||
}); | ||
var duplexStream = new Stream.Duplex({ objectMode: true }); | ||
duplexStream._objs = []; | ||
duplexStream._write = function(obj, unused, cb) { | ||
duplexStream._objs.push(obj); | ||
cb(); | ||
}; | ||
duplexStream._read = function() { | ||
var obj; | ||
if(duplexStream._hasFinished) { | ||
while(duplexStream._objs.length) { | ||
obj = duplexStream._objs.shift(); | ||
if(!duplexStream.push(obj)) { | ||
break; | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
objectMode: true, | ||
restore: true, | ||
passthrough: true, | ||
} | ||
); | ||
const outputStream = StreamTest[version].toObjects( | ||
(err, objs) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
} | ||
assert.equal(objs.length, 3); | ||
setImmediate(done); | ||
if(0 === duplexStream._objs.length) { | ||
duplexStream.push(null); | ||
} | ||
); | ||
const duplexStream = new Stream.Duplex({ objectMode: true }); | ||
} | ||
}; | ||
duplexStream.on('finish', function() { | ||
duplexStream._hasFinished = true; | ||
duplexStream._read(); | ||
}); | ||
outputStream.on('end', function() { | ||
assert(passThroughStream1Ended, | ||
'PassThrough stream ends before the output one.'); | ||
assert(passThroughStream2Ended, | ||
'PassThrough stream ends before the output one.'); | ||
assert(duplexStreamEnded, | ||
'Duplex stream ends before the output one.'); | ||
}); | ||
filter.restore.on('end', function() { | ||
assert(passThroughStream1Ended, | ||
'PassThrough stream ends before the restore one.'); | ||
assert(passThroughStream2Ended, | ||
'PassThrough stream ends before the restore one.'); | ||
assert(duplexStreamEnded, | ||
'Duplex stream ends before the restore one.'); | ||
}); | ||
inputStream.pipe(filter) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', function() { | ||
passThroughStream1Ended = true; | ||
}) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', function() { | ||
passThroughStream2Ended = true; | ||
}) | ||
.pipe(duplexStream) | ||
.on('end', function() { | ||
duplexStreamEnded = true; | ||
}) | ||
.pipe(filter.restore) | ||
.pipe(outputStream); | ||
}); | ||
duplexStream._objs = []; | ||
duplexStream._write = (obj, unused, cb) => { | ||
duplexStream._objs.push(obj); | ||
cb(); | ||
}; | ||
duplexStream._read = () => { | ||
let obj; | ||
}); | ||
if (duplexStream._hasFinished) { | ||
while (duplexStream._objs.length) { | ||
obj = duplexStream._objs.shift(); | ||
if (!duplexStream.push(obj)) { | ||
break; | ||
} | ||
} | ||
if (0 === duplexStream._objs.length) { | ||
duplexStream.push(null); | ||
} | ||
} | ||
}; | ||
duplexStream.on('finish', () => { | ||
duplexStream._hasFinished = true; | ||
duplexStream._read(); | ||
}); | ||
outputStream.on('end', () => { | ||
assert( | ||
passThroughStream1Ended, | ||
'PassThrough stream ends before the output one.' | ||
); | ||
assert( | ||
passThroughStream2Ended, | ||
'PassThrough stream ends before the output one.' | ||
); | ||
assert( | ||
duplexStreamEnded, | ||
'Duplex stream ends before the output one.' | ||
); | ||
}); | ||
filter.restore.on('end', () => { | ||
assert( | ||
passThroughStream1Ended, | ||
'PassThrough stream ends before the restore one.' | ||
); | ||
assert( | ||
passThroughStream2Ended, | ||
'PassThrough stream ends before the restore one.' | ||
); | ||
assert( | ||
duplexStreamEnded, | ||
'Duplex stream ends before the restore one.' | ||
); | ||
}); | ||
inputStream | ||
.pipe(filter) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', () => { | ||
passThroughStream1Ended = true; | ||
}) | ||
.pipe(new Stream.PassThrough({ objectMode: true })) | ||
.on('end', () => { | ||
passThroughStream2Ended = true; | ||
}) | ||
.pipe(duplexStream) | ||
.on('end', () => { | ||
duplexStreamEnded = true; | ||
}) | ||
.pipe(filter.restore) | ||
.pipe(outputStream); | ||
} | ||
); | ||
}); | ||
}); | ||
describe('in buffer mode', () => { | ||
describe('should work', () => { | ||
const buffer1 = new Buffer('plop'); | ||
const buffer2 = new Buffer('plop2'); | ||
const buffer3 = new Buffer('plop3'); | ||
describe('in buffer mode', function() { | ||
it('with no restore option', done => { | ||
const inputStream = StreamTest[version].fromChunks([ | ||
buffer1, | ||
buffer2, | ||
]); | ||
const filter = new StreamFilter((chunk, encoding, cb) => { | ||
if (chunk.toString() === buffer1.toString()) { | ||
describe('should work', function() { | ||
var buffer1 = new Buffer('plop'); | ||
var buffer2 = new Buffer('plop2'); | ||
var buffer3 = new Buffer('plop3'); | ||
it('with no restore option', function(done) { | ||
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk.toString() === buffer1.toString()) { | ||
return cb(true); | ||
@@ -572,6 +382,5 @@ } | ||
}); | ||
const outputStream = StreamTest[version].toText((err, text) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
var outputStream = StreamTest[version].toText(function(err, text) { | ||
if(err) { | ||
return done(err); | ||
} | ||
@@ -585,34 +394,24 @@ assert.equal(text, buffer2.toString()); | ||
it('with restore option', done => { | ||
const inputStream = StreamTest[version].fromChunks([ | ||
buffer1, | ||
buffer2, | ||
]); | ||
const filter = new StreamFilter( | ||
(chunk, encoding, cb) => { | ||
if (chunk.toString() === buffer2.toString()) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
restore: true, | ||
it('with restore option', function(done) { | ||
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk.toString() === buffer2.toString()) { | ||
return cb(true); | ||
} | ||
); | ||
const outputStream = StreamTest[version].toText((err, text) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
return cb(false); | ||
}, { | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toText(function(err, text) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(text, buffer1.toString()); | ||
filter.restore.pipe( | ||
StreamTest[version].toText((err2, text2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.equal(text2, buffer2.toString()); | ||
done(); | ||
}) | ||
); | ||
filter.restore.pipe(StreamTest[version].toText(function(err2, text2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.equal(text2, buffer2.toString()); | ||
done(); | ||
})); | ||
}); | ||
@@ -623,42 +422,27 @@ | ||
it('with restore and passthrough option', done => { | ||
const inputStream = StreamTest[version].fromChunks([ | ||
buffer1, | ||
buffer2, | ||
]); | ||
const filter = new StreamFilter( | ||
(chunk, encoding, cb) => { | ||
if (chunk.toString() === buffer2.toString()) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, | ||
{ | ||
restore: true, | ||
passthrough: true, | ||
it('with restore and passthrough option', function(done) { | ||
var inputStream = StreamTest[version].fromChunks([buffer1, buffer2]); | ||
var filter = new StreamFilter(function(chunk, encoding, cb) { | ||
if(chunk.toString() === buffer2.toString()) { | ||
return cb(true); | ||
} | ||
); | ||
const outputStream = StreamTest[version].toText((err, text) => { | ||
if (err) { | ||
done(err); | ||
return; | ||
return cb(false); | ||
}, { | ||
restore: true, | ||
passthrough: true, | ||
}); | ||
var outputStream = StreamTest[version].toText(function(err, text) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(text, buffer1.toString()); | ||
filter.restore.pipe( | ||
StreamTest[version].toText((err2, text2) => { | ||
if (err2) { | ||
done(err2); | ||
return; | ||
} | ||
assert.deepEqual( | ||
text2, | ||
[buffer3.toString(), buffer2.toString()].join('') | ||
); | ||
done(); | ||
}) | ||
); | ||
filter.restore.pipe(StreamTest[version].toText(function(err2, text2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.deepEqual(text2, [buffer3.toString(), buffer2.toString()].join('')); | ||
done(); | ||
})); | ||
}); | ||
const restoreInputStream = StreamTest[version].fromChunks([ | ||
buffer3, | ||
]); | ||
var restoreInputStream = StreamTest[version].fromChunks([buffer3]); | ||
@@ -668,6 +452,11 @@ inputStream.pipe(filter).pipe(outputStream); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Deprecated
MaintenanceThe maintainer of the package marked it as deprecated. This could indicate that a single version should not be used, or that the package is no longer maintained and any new vulnerabilities will not be fixed.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
6
0
30569
7
516
136
Updatedreadable-stream@^2.0.2