async-transforms
Advanced tools
| /** | ||
| * @fileoverview Runnable that invokes Worker. | ||
| */ | ||
| import worker from 'worker_threads'; | ||
| if (worker.isMainThread) { | ||
| throw new TypeError('cannot run on main thread'); | ||
| } | ||
| const {dep} = worker.workerData; | ||
| import(dep).then(({default: method}) => { | ||
| worker.parentPort.on('message', ({task, port}) => { | ||
| Promise.resolve() | ||
| .then(() => method(task)) | ||
| .then((result) => port.postMessage({result})) | ||
| .catch((error) => port.postMessage({error})) | ||
| .then(() => port.close()); | ||
| }); | ||
| }).catch((error) => { | ||
| // Failure mode: the module couldn't be imported, complain loudly. | ||
| worker.parentPort.on('message', ({port}) => { | ||
| port.postMessage({error}); | ||
| port.close(); | ||
| }); | ||
| }); |
| import stream from 'stream'; | ||
| const filterSymbol = Symbol('filter'); | ||
| /** | ||
| * If returned by the map function, will skip this item in the final output. | ||
| */ | ||
| export const skip = filterSymbol; | ||
| /** | ||
| * Build a mapping stream. This runs in parallel over receved chunks. | ||
| * | ||
| * Unlike the built-in Array.map function, returning null or undefined from the mapper will push | ||
| * the same chunk onto the output. This acts more like forEach. | ||
| * | ||
| * By default, this operates in objectMode, and does not guarantee that the output order matches | ||
| * the input order. | ||
| * | ||
| * @param {function(?, number): ?} handler | ||
| * @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
| * @return {!stream.Transform} | ||
| */ | ||
| export function map(handler, options={}) { | ||
| options = Object.assign({ | ||
| objectMode: true, | ||
| order: false, | ||
| tasks: 0, | ||
| }, options); | ||
| let index = 0; | ||
| let count = 0; | ||
| let flushCallback = null; | ||
| options.tasks = Math.ceil(options.tasks) || 0; | ||
| const hasTasks = options.tasks > 0; | ||
| const pending = []; | ||
| let orderPushCount = 0; | ||
| const orderDone = []; | ||
| const s = new stream.Transform({ | ||
| objectMode: options.objectMode, | ||
| // nb. Passing writeableHighWaterMark here seems to do nothing, we just enforce tasks manually. | ||
| transform(chunk, encoding, callback) { | ||
| if (flushCallback !== null) { | ||
| throw new Error(`got transform() after flush()`); | ||
| } | ||
| callback(); | ||
| if (!hasTasks || count < options.tasks) { | ||
| internalTransform(chunk, encoding); | ||
| } else { | ||
| pending.push({chunk, encoding}); | ||
| } | ||
| }, | ||
| flush(callback) { | ||
| if (count === 0) { | ||
| callback(); // nothing was pushed, callback immediately | ||
| } else { | ||
| flushCallback = callback; | ||
| } | ||
| }, | ||
| }); | ||
| return s; | ||
| // hoisted methods below | ||
| function internalTransform(chunk, encoding) { | ||
| ++count; | ||
| const localIndex = index++; | ||
| const resultHandler = internalResultHandler.bind(null, localIndex, chunk); | ||
| Promise.resolve() | ||
| .then(() => handler(chunk, localIndex)) | ||
| .then(resultHandler) | ||
| .catch((err) => s.destroy(err)); | ||
| } | ||
| function internalResultHandler(localIndex, chunk, result) { | ||
| if (result == null) { | ||
| result = chunk; // disallow null/undefined as they stop streams | ||
| } | ||
| if (options.order) { | ||
| const doneIndex = localIndex - orderPushCount; | ||
| orderDone[doneIndex] = result; | ||
| // If we're the first, ship ourselves and any further completed chunks. | ||
| if (doneIndex === 0) { | ||
| let i = doneIndex; | ||
| do { | ||
| if (orderDone[i] !== filterSymbol) { | ||
| s.push(orderDone[i]); | ||
| } | ||
| ++i; | ||
| } while (i < orderDone.length && orderDone[i] !== undefined); | ||
| // Splice at once, in case we hit many valid elements. | ||
| orderDone.splice(0, i); | ||
| orderPushCount += i; | ||
| } | ||
| } else if (result !== filterSymbol) { | ||
| s.push(result); // we don't care about the order, push immediately | ||
| } | ||
| --count; | ||
| if (pending.length && count < options.tasks) { | ||
| const {chunk, encoding} = pending.shift(); | ||
| internalTransform(chunk, encoding); | ||
| } else if (count === 0 && flushCallback) { | ||
| // this is safe as `else if`, as calling internalTransform again means count > 0 | ||
| flushCallback(); | ||
| } | ||
| } | ||
| } | ||
| /** | ||
| * As per map, but returning falsey values will remove this from the stream. Returning a truthy | ||
| * value will include it. | ||
| * | ||
| * @param {function(?, number): ?} handler | ||
| * @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
| * @return {!stream.Transform} | ||
| */ | ||
| export function filter(handler, options={}) { | ||
| return map(async (chunk) => { | ||
| const result = await handler(chunk); | ||
| return result ? chunk : filterSymbol; | ||
| }, options); | ||
| } | ||
| /** | ||
| * Asynchronously process all data passed through this stream prior to 'flush' being invoked. This | ||
| * gates the throughput and pushes the array of returned values. | ||
| * | ||
| * This assumes object mode and does not validate or check encoding. | ||
| * | ||
| * @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>} | ||
| */ | ||
| export function gate(handler, options={}) { | ||
| options = Object.assign({ | ||
| objectMode: true, | ||
| }, options); | ||
| const chunks = []; | ||
| return new stream.Transform({ | ||
| objectMode: options.objectMode, | ||
| transform(chunk, encoding, callback) { | ||
| chunks.push(chunk); | ||
| callback(); | ||
| }, | ||
| flush(callback) { | ||
| Promise.resolve(handler(chunks)).then((result) => { | ||
| if (result == null) { | ||
| result = chunks; | ||
| } | ||
| // Technically, we allow anything iterable to be returned. | ||
| for (const each of result) { | ||
| this.push(each); | ||
| } | ||
| callback(); | ||
| }).catch(callback); | ||
| }, | ||
| }); | ||
| } | ||
| /** | ||
| * Returns a helper that generates an Array from piped data. | ||
| */ | ||
| export function toArray(options) { | ||
| let s; | ||
| const promise = new Promise((resolve, reject) => { | ||
| s = gate((arr) => resolve(arr), options); | ||
| s.on('error', reject); | ||
| }); | ||
| return {stream: s, promise}; | ||
| } |
+103
| import worker from 'worker_threads'; | ||
| import path from 'path'; | ||
| import os from 'os'; | ||
| /** | ||
| * Determines the absolute directory where this script is contained. | ||
| */ | ||
| function scriptDir() { | ||
| try { | ||
| const absolutePath = String(import.meta.url).replace(/^file:\/\//, ''); | ||
| return path.dirname(absolutePath); | ||
| } catch (e) { | ||
| // try __dirname | ||
| } | ||
| try { | ||
| return __dirname; | ||
| } catch (e) { | ||
| throw new Error(`could not resolve __dirname or import.meta.url`); | ||
| } | ||
| } | ||
| const cpuCount = os.cpus().length || 4; | ||
| const workerTarget = path.join(scriptDir(), './internal-worker.js'); | ||
| export function pool(dep, options) { | ||
| options = Object.assign({ | ||
| tasks: cpuCount * 0.75, | ||
| expiry: 1000, | ||
| }, options); | ||
| options.expiry = Math.max(options.task, 0) || 0; | ||
| if (options.tasks > 0 && options.tasks < 1) { | ||
| options.tasks = cpuCount * options.tasks; | ||
| } | ||
| options.tasks = Math.max(Math.ceil(options.tasks), 0) || 1; | ||
| if (!path.isAbsolute(dep)) { | ||
| throw new TypeError(`cannot load worker with relative path: ${dep}`); | ||
| } | ||
| let activeWorkers = 0; | ||
| const availableWorkers = new Map(); | ||
| const pendingTasks = []; | ||
| return async (task) => { | ||
| let w; | ||
| if (availableWorkers.size) { | ||
| for (w of availableWorkers.keys()) { | ||
| break; // get 1st worker from map | ||
| } | ||
| const timeout = availableWorkers.get(w); | ||
| availableWorkers.delete(w); | ||
| clearTimeout(timeout); | ||
| } else if (activeWorkers < options.tasks) { | ||
| w = new worker.Worker(workerTarget, {workerData: {dep}}); | ||
| ++activeWorkers; | ||
| } else { | ||
| return new Promise((resolve) => { | ||
| pendingTasks.push({task, resolve}); | ||
| }); | ||
| } | ||
| return enact(w, task); | ||
| }; | ||
| function enact(w, task) { | ||
| // While we could use the worker's parentPort, this gives us less risk of crosstalk. | ||
| const {port1, port2} = new worker.MessageChannel(); | ||
| w.postMessage({task, port: port2}, [port2]); | ||
| return new Promise((resolve, reject) => { | ||
| const handler = ({result, error}) => { | ||
| port1.off('message', handler); // important to allow GC | ||
| port1.close(); | ||
| error ? reject(error) : resolve(result); | ||
| releaseWorker(w); | ||
| }; | ||
| port1.on('message', handler); | ||
| }); | ||
| } | ||
| function terimateWorker(w) { | ||
| --activeWorkers; | ||
| w.terminate(); | ||
| availableWorkers.delete(w); | ||
| } | ||
| function releaseWorker(w) { | ||
| if (pendingTasks.length) { | ||
| // There's an immediate task, consume it and go. | ||
| const {task, resolve} = pendingTasks.shift(); | ||
| resolve(enact(w, task)); | ||
| } else if (options.expiry) { | ||
| // Otherwise, put it into our queue to be deleted soon. | ||
| const timeout = setTimeout(terimateWorker.bind(null, w), options.expiry); | ||
| availableWorkers.set(w, timeout); | ||
| } else { | ||
| terimateWorker(w) | ||
| } | ||
| } | ||
| } |
| 'use strict'; | ||
| function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; } | ||
| function _interopNamespace(e) { | ||
| if (e && e.__esModule) { return e; } else { | ||
| var n = {}; | ||
| if (e) { | ||
| Object.keys(e).forEach(function (k) { | ||
| var d = Object.getOwnPropertyDescriptor(e, k); | ||
| Object.defineProperty(n, k, d.get ? d : { | ||
| enumerable: true, | ||
| get: function () { | ||
| return e[k]; | ||
| } | ||
| }); | ||
| }); | ||
| } | ||
| n['default'] = e; | ||
| return n; | ||
| } | ||
| } | ||
| var worker = _interopDefault(require('worker_threads')); | ||
| /** | ||
| * @fileoverview Runnable that invokes Worker. | ||
| */ | ||
| if (worker.isMainThread) { | ||
| throw new TypeError('cannot run on main thread'); | ||
| } | ||
| const {dep} = worker.workerData; | ||
| new Promise(function (resolve) { resolve(_interopNamespace(require(dep))); }).then(({default: method}) => { | ||
| worker.parentPort.on('message', ({task, port}) => { | ||
| Promise.resolve() | ||
| .then(() => method(task)) | ||
| .then((result) => port.postMessage({result})) | ||
| .catch((error) => port.postMessage({error})) | ||
| .then(() => port.close()); | ||
| }); | ||
| }).catch((error) => { | ||
| // Failure mode: the module couldn't be imported, complain loudly. | ||
| worker.parentPort.on('message', ({port}) => { | ||
| port.postMessage({error}); | ||
| port.close(); | ||
| }); | ||
| }); |
| 'use strict'; | ||
| Object.defineProperty(exports, '__esModule', { value: true }); | ||
| function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; } | ||
| var stream = _interopDefault(require('stream')); | ||
| const filterSymbol = Symbol('filter'); | ||
| /** | ||
| * If returned by the map function, will skip this item in the final output. | ||
| */ | ||
| const skip = filterSymbol; | ||
| /** | ||
| * Build a mapping stream. This runs in parallel over receved chunks. | ||
| * | ||
| * Unlike the built-in Array.map function, returning null or undefined from the mapper will push | ||
| * the same chunk onto the output. This acts more like forEach. | ||
| * | ||
| * By default, this operates in objectMode, and does not guarantee that the output order matches | ||
| * the input order. | ||
| * | ||
| * @param {function(?, number): ?} handler | ||
| * @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
| * @return {!stream.Transform} | ||
| */ | ||
| function map(handler, options={}) { | ||
| options = Object.assign({ | ||
| objectMode: true, | ||
| order: false, | ||
| tasks: 0, | ||
| }, options); | ||
| let index = 0; | ||
| let count = 0; | ||
| let flushCallback = null; | ||
| options.tasks = Math.ceil(options.tasks) || 0; | ||
| const hasTasks = options.tasks > 0; | ||
| const pending = []; | ||
| let orderPushCount = 0; | ||
| const orderDone = []; | ||
| const s = new stream.Transform({ | ||
| objectMode: options.objectMode, | ||
| // nb. Passing writeableHighWaterMark here seems to do nothing, we just enforce tasks manually. | ||
| transform(chunk, encoding, callback) { | ||
| if (flushCallback !== null) { | ||
| throw new Error(`got transform() after flush()`); | ||
| } | ||
| callback(); | ||
| if (!hasTasks || count < options.tasks) { | ||
| internalTransform(chunk); | ||
| } else { | ||
| pending.push({chunk, encoding}); | ||
| } | ||
| }, | ||
| flush(callback) { | ||
| if (count === 0) { | ||
| callback(); // nothing was pushed, callback immediately | ||
| } else { | ||
| flushCallback = callback; | ||
| } | ||
| }, | ||
| }); | ||
| return s; | ||
| // hoisted methods below | ||
| function internalTransform(chunk, encoding) { | ||
| ++count; | ||
| const localIndex = index++; | ||
| const resultHandler = internalResultHandler.bind(null, localIndex, chunk); | ||
| Promise.resolve() | ||
| .then(() => handler(chunk, localIndex)) | ||
| .then(resultHandler) | ||
| .catch((err) => s.destroy(err)); | ||
| } | ||
| function internalResultHandler(localIndex, chunk, result) { | ||
| if (result == null) { | ||
| result = chunk; // disallow null/undefined as they stop streams | ||
| } | ||
| if (options.order) { | ||
| const doneIndex = localIndex - orderPushCount; | ||
| orderDone[doneIndex] = result; | ||
| // If we're the first, ship ourselves and any further completed chunks. | ||
| if (doneIndex === 0) { | ||
| let i = doneIndex; | ||
| do { | ||
| if (orderDone[i] !== filterSymbol) { | ||
| s.push(orderDone[i]); | ||
| } | ||
| ++i; | ||
| } while (i < orderDone.length && orderDone[i] !== undefined); | ||
| // Splice at once, in case we hit many valid elements. | ||
| orderDone.splice(0, i); | ||
| orderPushCount += i; | ||
| } | ||
| } else if (result !== filterSymbol) { | ||
| s.push(result); // we don't care about the order, push immediately | ||
| } | ||
| --count; | ||
| if (pending.length && count < options.tasks) { | ||
| const {chunk, encoding} = pending.shift(); | ||
| internalTransform(chunk); | ||
| } else if (count === 0 && flushCallback) { | ||
| // this is safe as `else if`, as calling internalTransform again means count > 0 | ||
| flushCallback(); | ||
| } | ||
| } | ||
| } | ||
| /** | ||
| * As per map, but returning falsey values will remove this from the stream. Returning a truthy | ||
| * value will include it. | ||
| * | ||
| * @param {function(?, number): ?} handler | ||
| * @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
| * @return {!stream.Transform} | ||
| */ | ||
| function filter(handler, options={}) { | ||
| return map(async (chunk) => { | ||
| const result = await handler(chunk); | ||
| return result ? chunk : filterSymbol; | ||
| }, options); | ||
| } | ||
| /** | ||
| * Asynchronously process all data passed through this stream prior to 'flush' being invoked. This | ||
| * gates the throughput and pushes the array of returned values. | ||
| * | ||
| * This assumes object mode and does not validate or check encoding. | ||
| * | ||
| * @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>} | ||
| */ | ||
| function gate(handler, options={}) { | ||
| options = Object.assign({ | ||
| objectMode: true, | ||
| }, options); | ||
| const chunks = []; | ||
| return new stream.Transform({ | ||
| objectMode: options.objectMode, | ||
| transform(chunk, encoding, callback) { | ||
| chunks.push(chunk); | ||
| callback(); | ||
| }, | ||
| flush(callback) { | ||
| Promise.resolve(handler(chunks)).then((result) => { | ||
| if (result == null) { | ||
| result = chunks; | ||
| } | ||
| // Technically, we allow anything iterable to be returned. | ||
| for (const each of result) { | ||
| this.push(each); | ||
| } | ||
| callback(); | ||
| }).catch(callback); | ||
| }, | ||
| }); | ||
| } | ||
| /** | ||
| * Returns a helper that generates an Array from piped data. | ||
| */ | ||
| function toArray(options) { | ||
| let s; | ||
| const promise = new Promise((resolve, reject) => { | ||
| s = gate((arr) => resolve(arr), options); | ||
| s.on('error', reject); | ||
| }); | ||
| return {stream: s, promise}; | ||
| } | ||
| exports.filter = filter; | ||
| exports.gate = gate; | ||
| exports.map = map; | ||
| exports.skip = skip; | ||
| exports.toArray = toArray; |
| 'use strict'; | ||
| Object.defineProperty(exports, '__esModule', { value: true }); | ||
| function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; } | ||
| var worker = _interopDefault(require('worker_threads')); | ||
| var path = _interopDefault(require('path')); | ||
| var os = _interopDefault(require('os')); | ||
| /** | ||
| * Determines the absolute directory where this script is contained. | ||
| */ | ||
| function scriptDir() { | ||
| try { | ||
| const absolutePath = String((typeof document === 'undefined' ? new (require('u' + 'rl').URL)('file:' + __filename).href : (document.currentScript && document.currentScript.src || new URL('worker.cjs', document.baseURI).href))).replace(/^file:\/\//, ''); | ||
| return path.dirname(absolutePath); | ||
| } catch (e) { | ||
| // try __dirname | ||
| } | ||
| try { | ||
| return __dirname; | ||
| } catch (e) { | ||
| throw new Error(`could not resolve __dirname or import.meta.url`); | ||
| } | ||
| } | ||
| const cpuCount = os.cpus().length || 4; | ||
| const workerTarget = path.join(scriptDir(), './internal-worker.js'); | ||
| function pool(dep, options) { | ||
| options = Object.assign({ | ||
| tasks: cpuCount * 0.75, | ||
| expiry: 1000, | ||
| }, options); | ||
| options.expiry = Math.max(options.task, 0) || 0; | ||
| if (options.tasks > 0 && options.tasks < 1) { | ||
| options.tasks = cpuCount * options.tasks; | ||
| } | ||
| options.tasks = Math.max(Math.ceil(options.tasks), 0) || 1; | ||
| if (!path.isAbsolute(dep)) { | ||
| throw new TypeError(`cannot load worker with relative path: ${dep}`); | ||
| } | ||
| let activeWorkers = 0; | ||
| const availableWorkers = new Map(); | ||
| const pendingTasks = []; | ||
| return async (task) => { | ||
| let w; | ||
| if (availableWorkers.size) { | ||
| for (w of availableWorkers.keys()) { | ||
| break; // get 1st worker from map | ||
| } | ||
| const timeout = availableWorkers.get(w); | ||
| availableWorkers.delete(w); | ||
| clearTimeout(timeout); | ||
| } else if (activeWorkers < options.tasks) { | ||
| w = new worker.Worker(workerTarget, {workerData: {dep}}); | ||
| ++activeWorkers; | ||
| } else { | ||
| return new Promise((resolve) => { | ||
| pendingTasks.push({task, resolve}); | ||
| }); | ||
| } | ||
| return enact(w, task); | ||
| }; | ||
| function enact(w, task) { | ||
| // While we could use the worker's parentPort, this gives us less risk of crosstalk. | ||
| const {port1, port2} = new worker.MessageChannel(); | ||
| w.postMessage({task, port: port2}, [port2]); | ||
| return new Promise((resolve, reject) => { | ||
| const handler = ({result, error}) => { | ||
| port1.off('message', handler); // important to allow GC | ||
| port1.close(); | ||
| error ? reject(error) : resolve(result); | ||
| releaseWorker(w); | ||
| }; | ||
| port1.on('message', handler); | ||
| }); | ||
| } | ||
| function terimateWorker(w) { | ||
| --activeWorkers; | ||
| w.terminate(); | ||
| availableWorkers.delete(w); | ||
| } | ||
| function releaseWorker(w) { | ||
| if (pendingTasks.length) { | ||
| // There's an immediate task, consume it and go. | ||
| const {task, resolve} = pendingTasks.shift(); | ||
| resolve(enact(w, task)); | ||
| } else if (options.expiry) { | ||
| // Otherwise, put it into our queue to be deleted soon. | ||
| const timeout = setTimeout(terimateWorker.bind(null, w), options.expiry); | ||
| availableWorkers.set(w, timeout); | ||
| } else { | ||
| terimateWorker(w); | ||
| } | ||
| } | ||
| } | ||
| exports.pool = pool; |
+10
-6
| { | ||
| "name": "async-transforms", | ||
| "version": "1.0.0", | ||
| "version": "1.0.1", | ||
| "description": "Asynchronous stream transforms", | ||
| "type": "module", | ||
| "module": "./transforms.js", | ||
| "main": "./dist/transforms.cjs", | ||
| "module": "./lib/transforms.js", | ||
| "main": "./require/transforms.cjs", | ||
| "exports": { | ||
| "import": "./transforms.js", | ||
| "default": "./dist/transforms.cjs" | ||
| "import": "./lib/transforms.js", | ||
| "require": "./require/transforms.cjs", | ||
| "./worker": { | ||
| "import": "./lib/worker.js", | ||
| "require": "./require/worker.cjs" | ||
| } | ||
| }, | ||
@@ -21,5 +25,5 @@ "repository": "https://github.com/samthor/async-transforms", | ||
| "scripts": { | ||
| "prepare": "rollup --format=cjs --file=dist/transforms.cjs -- transforms.js", | ||
| "prepare": "bash build.sh", | ||
| "test": "mocha" | ||
| } | ||
| } |
+31
-1
@@ -66,2 +66,32 @@ [](https://travis-ci.org/samthor/async-transforms) | ||
| While Gulp plugins for Less already exist, this makes it easier to write general-purpose, modern plugins with `async` and `await` syntax. | ||
| While Gulp plugins for Less already exist, this makes it easier to write general-purpose, modern plugins with `async` and `await` syntax. | ||
| ## Worker Pool | ||
| This includes a submodule which provides a worker pool. | ||
| It's useful when combined with the above transforms handler. | ||
| For example: | ||
| ```js | ||
| import {pool} from 'async-transforms/worker'; | ||
| const compilationPool = pool(path.resolve('./compile.js')); | ||
| // use directly | ||
| compilationPool(123, {tasks: 2}) | ||
| .then((result) => console.info('result from passing value to worker', result)); | ||
| // or as part of a transform | ||
| stream.Readable.from([object1, object2]) | ||
| .pipe(transforms.map(compilationPool)) | ||
| .pipe(transforms.map(() => { | ||
| // do something with the result | ||
| })); | ||
| ``` | ||
| The pool invokes the default export (or `module.exports` for CJS) of the target file. | ||
| By default, it utilizes 75% of your local CPUs, but set `tasks` to control this—use a fraction from 0-1 to set a ratio, and higher for absolute. | ||
| Use this for CPU-bound tasks like JS minification. | ||
| This doesn't really belong in this module. |
| 'use strict'; | ||
| Object.defineProperty(exports, '__esModule', { value: true }); | ||
| function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; } | ||
| var stream = _interopDefault(require('stream')); | ||
| const filterSymbol = Symbol('filter'); | ||
| /** | ||
| * If returned by the map function, will skip this item in the final output. | ||
| */ | ||
| const skip = filterSymbol; | ||
| /** | ||
| * Build a mapping stream. This runs in parallel over receved chunks. | ||
| * | ||
| * Unlike the built-in Array.map function, returning null or undefined from the mapper will push | ||
| * the same chunk onto the output. This acts more like forEach. | ||
| * | ||
| * By default, this operates in objectMode, and does not guarantee that the output order matches | ||
| * the input order. | ||
| * | ||
| * @param {function(?, number): ?} handler | ||
| * @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
| * @return {!stream.Transform} | ||
| */ | ||
| function map(handler, options={}) { | ||
| options = Object.assign({ | ||
| objectMode: true, | ||
| order: false, | ||
| tasks: 0, | ||
| }, options); | ||
| let index = 0; | ||
| let count = 0; | ||
| let flushCallback = null; | ||
| options.tasks = Math.ceil(options.tasks) || 0; | ||
| const hasTasks = options.tasks > 0; | ||
| const pending = []; | ||
| let orderPushCount = 0; | ||
| const orderDone = []; | ||
| const s = new stream.Transform({ | ||
| objectMode: options.objectMode, | ||
| // nb. Passing writeableHighWaterMark here seems to do nothing, we just enforce tasks manually. | ||
| transform(chunk, encoding, callback) { | ||
| if (flushCallback !== null) { | ||
| throw new Error(`got transform() after flush()`); | ||
| } | ||
| callback(); | ||
| if (!hasTasks || count < options.tasks) { | ||
| internalTransform(chunk); | ||
| } else { | ||
| pending.push({chunk, encoding}); | ||
| } | ||
| }, | ||
| flush(callback) { | ||
| if (count === 0) { | ||
| callback(); // nothing was pushed, callback immediately | ||
| } else { | ||
| flushCallback = callback; | ||
| } | ||
| }, | ||
| }); | ||
| return s; | ||
| // hoisted methods below | ||
| function internalTransform(chunk, encoding) { | ||
| ++count; | ||
| const localIndex = index++; | ||
| const resultHandler = internalResultHandler.bind(null, localIndex, chunk); | ||
| Promise.resolve() | ||
| .then(() => handler(chunk, localIndex)) | ||
| .then(resultHandler) | ||
| .catch((err) => s.destroy(err)); | ||
| } | ||
| function internalResultHandler(localIndex, chunk, result) { | ||
| if (result == null) { | ||
| result = chunk; // disallow null/undefined as they stop streams | ||
| } | ||
| if (options.order) { | ||
| const doneIndex = localIndex - orderPushCount; | ||
| orderDone[doneIndex] = result; | ||
| // If we're the first, ship ourselves and any further completed chunks. | ||
| if (doneIndex === 0) { | ||
| let i = doneIndex; | ||
| do { | ||
| if (orderDone[i] !== filterSymbol) { | ||
| s.push(orderDone[i]); | ||
| } | ||
| ++i; | ||
| } while (i < orderDone.length && orderDone[i] !== undefined); | ||
| // Splice at once, in case we hit many valid elements. | ||
| orderDone.splice(0, i); | ||
| orderPushCount += i; | ||
| } | ||
| } else if (result !== filterSymbol) { | ||
| s.push(result); // we don't care about the order, push immediately | ||
| } | ||
| --count; | ||
| if (pending.length && count < options.tasks) { | ||
| const {chunk, encoding} = pending.shift(); | ||
| internalTransform(chunk); | ||
| } else if (count === 0 && flushCallback) { | ||
| // this is safe as `else if`, as calling internalTransform again means count > 0 | ||
| flushCallback(); | ||
| } | ||
| } | ||
| } | ||
| /** | ||
| * As per map, but returning falsey values will remove this from the stream. Returning a truthy | ||
| * value will include it. | ||
| * | ||
| * @param {function(?, number): ?} handler | ||
| * @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
| * @return {!stream.Transform} | ||
| */ | ||
| function filter(handler, options={}) { | ||
| return map(async (chunk) => { | ||
| const result = await handler(chunk); | ||
| return result ? chunk : filterSymbol; | ||
| }, options); | ||
| } | ||
| /** | ||
| * Asynchronously process all data passed through this stream prior to 'flush' being invoked. This | ||
| * gates the throughput and pushes the array of returned values. | ||
| * | ||
| * This assumes object mode and does not validate or check encoding. | ||
| * | ||
| * @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>} | ||
| */ | ||
| function gate(handler, options={}) { | ||
| options = Object.assign({ | ||
| objectMode: true, | ||
| }, options); | ||
| const chunks = []; | ||
| return new stream.Transform({ | ||
| objectMode: options.objectMode, | ||
| transform(chunk, encoding, callback) { | ||
| chunks.push(chunk); | ||
| callback(); | ||
| }, | ||
| flush(callback) { | ||
| Promise.resolve(handler(chunks)).then((result) => { | ||
| if (result == null) { | ||
| result = chunks; | ||
| } | ||
| // Technically, we allow anything iterable to be returned. | ||
| for (const each of result) { | ||
| this.push(each); | ||
| } | ||
| callback(); | ||
| }).catch(callback); | ||
| }, | ||
| }); | ||
| } | ||
| /** | ||
| * Returns a helper that generates an Array from piped data. | ||
| */ | ||
| function toArray(options) { | ||
| let s; | ||
| const promise = new Promise((resolve, reject) => { | ||
| s = gate((arr) => resolve(arr), options); | ||
| s.on('error', reject); | ||
| }); | ||
| return {stream: s, promise}; | ||
| } | ||
| exports.filter = filter; | ||
| exports.gate = gate; | ||
| exports.map = map; | ||
| exports.skip = skip; | ||
| exports.toArray = toArray; |
-190
| import stream from 'stream'; | ||
| const filterSymbol = Symbol('filter'); | ||
| /** | ||
| * If returned by the map function, will skip this item in the final output. | ||
| */ | ||
| export const skip = filterSymbol; | ||
| /** | ||
| * Build a mapping stream. This runs in parallel over receved chunks. | ||
| * | ||
| * Unlike the built-in Array.map function, returning null or undefined from the mapper will push | ||
| * the same chunk onto the output. This acts more like forEach. | ||
| * | ||
| * By default, this operates in objectMode, and does not guarantee that the output order matches | ||
| * the input order. | ||
| * | ||
| * @param {function(?, number): ?} handler | ||
| * @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
| * @return {!stream.Transform} | ||
| */ | ||
| export function map(handler, options={}) { | ||
| options = Object.assign({ | ||
| objectMode: true, | ||
| order: false, | ||
| tasks: 0, | ||
| }, options); | ||
| let index = 0; | ||
| let count = 0; | ||
| let flushCallback = null; | ||
| options.tasks = Math.ceil(options.tasks) || 0; | ||
| const hasTasks = options.tasks > 0; | ||
| const pending = []; | ||
| let orderPushCount = 0; | ||
| const orderDone = []; | ||
| const s = new stream.Transform({ | ||
| objectMode: options.objectMode, | ||
| // nb. Passing writeableHighWaterMark here seems to do nothing, we just enforce tasks manually. | ||
| transform(chunk, encoding, callback) { | ||
| if (flushCallback !== null) { | ||
| throw new Error(`got transform() after flush()`); | ||
| } | ||
| callback(); | ||
| if (!hasTasks || count < options.tasks) { | ||
| internalTransform(chunk, encoding); | ||
| } else { | ||
| pending.push({chunk, encoding}); | ||
| } | ||
| }, | ||
| flush(callback) { | ||
| if (count === 0) { | ||
| callback(); // nothing was pushed, callback immediately | ||
| } else { | ||
| flushCallback = callback; | ||
| } | ||
| }, | ||
| }); | ||
| return s; | ||
| // hoisted methods below | ||
| function internalTransform(chunk, encoding) { | ||
| ++count; | ||
| const localIndex = index++; | ||
| const resultHandler = internalResultHandler.bind(null, localIndex, chunk); | ||
| Promise.resolve() | ||
| .then(() => handler(chunk, localIndex)) | ||
| .then(resultHandler) | ||
| .catch((err) => s.destroy(err)); | ||
| } | ||
| function internalResultHandler(localIndex, chunk, result) { | ||
| if (result == null) { | ||
| result = chunk; // disallow null/undefined as they stop streams | ||
| } | ||
| if (options.order) { | ||
| const doneIndex = localIndex - orderPushCount; | ||
| orderDone[doneIndex] = result; | ||
| // If we're the first, ship ourselves and any further completed chunks. | ||
| if (doneIndex === 0) { | ||
| let i = doneIndex; | ||
| do { | ||
| if (orderDone[i] !== filterSymbol) { | ||
| s.push(orderDone[i]); | ||
| } | ||
| ++i; | ||
| } while (i < orderDone.length && orderDone[i] !== undefined); | ||
| // Splice at once, in case we hit many valid elements. | ||
| orderDone.splice(0, i); | ||
| orderPushCount += i; | ||
| } | ||
| } else if (result !== filterSymbol) { | ||
| s.push(result); // we don't care about the order, push immediately | ||
| } | ||
| --count; | ||
| if (pending.length && count < options.tasks) { | ||
| const {chunk, encoding} = pending.shift(); | ||
| internalTransform(chunk, encoding); | ||
| } else if (count === 0 && flushCallback) { | ||
| // this is safe as `else if`, as calling internalTransform again means count > 0 | ||
| flushCallback(); | ||
| } | ||
| } | ||
| } | ||
| /** | ||
| * As per map, but returning falsey values will remove this from the stream. Returning a truthy | ||
| * value will include it. | ||
| * | ||
| * @param {function(?, number): ?} handler | ||
| * @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
| * @return {!stream.Transform} | ||
| */ | ||
| export function filter(handler, options={}) { | ||
| return map(async (chunk) => { | ||
| const result = await handler(chunk); | ||
| return result ? chunk : filterSymbol; | ||
| }, options); | ||
| } | ||
| /** | ||
| * Asynchronously process all data passed through this stream prior to 'flush' being invoked. This | ||
| * gates the throughput and pushes the array of returned values. | ||
| * | ||
| * This assumes object mode and does not validate or check encoding. | ||
| * | ||
| * @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>} | ||
| */ | ||
| export function gate(handler, options={}) { | ||
| options = Object.assign({ | ||
| objectMode: true, | ||
| }, options); | ||
| const chunks = []; | ||
| return new stream.Transform({ | ||
| objectMode: options.objectMode, | ||
| transform(chunk, encoding, callback) { | ||
| chunks.push(chunk); | ||
| callback(); | ||
| }, | ||
| flush(callback) { | ||
| Promise.resolve(handler(chunks)).then((result) => { | ||
| if (result == null) { | ||
| result = chunks; | ||
| } | ||
| // Technically, we allow anything iterable to be returned. | ||
| for (const each of result) { | ||
| this.push(each); | ||
| } | ||
| callback(); | ||
| }).catch(callback); | ||
| }, | ||
| }); | ||
| } | ||
| /** | ||
| * Returns a helper that generates an Array from piped data. | ||
| */ | ||
| export function toArray(options) { | ||
| let s; | ||
| const promise = new Promise((resolve, reject) => { | ||
| s = gate((arr) => resolve(arr), options); | ||
| s.on('error', reject); | ||
| }); | ||
| return {stream: s, promise}; | ||
| } |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
33290
37.44%9
80%570
77.02%97
46.97%2
100%1
Infinity%