Launch Week Day 5: Introducing Reachability for PHP.Learn More
Socket
Book a DemoSign in
Socket

async-transforms

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

async-transforms - npm Package Compare versions

Comparing version
1.0.0
to
1.0.1
+27
lib/internal-worker.js
/**
 * @fileoverview Runnable that invokes Worker: loads the module named by
 * workerData.dep and services incoming {task, port} messages with that
 * module's default export.
 */
import worker from 'worker_threads';

if (worker.isMainThread) {
  throw new TypeError('cannot run on main thread');
}

const {dep} = worker.workerData;

import(dep)
    .then(({default: method}) => {
      // Normal mode: run the module's default export for every task, replying
      // on the dedicated port and closing it once done.
      worker.parentPort.on('message', async ({task, port}) => {
        try {
          const result = await method(task);
          port.postMessage({result});
        } catch (error) {
          port.postMessage({error});
        } finally {
          port.close();
        }
      });
    })
    .catch((error) => {
      // Failure mode: the module couldn't be imported, complain loudly.
      worker.parentPort.on('message', ({port}) => {
        port.postMessage({error});
        port.close();
      });
    });
import stream from 'stream';

// Sentinel used internally by map()/filter() to mark a chunk to be dropped.
const filterSymbol = Symbol('filter');

/**
 * If returned by the map function, will skip this item in the final output.
 */
export const skip = filterSymbol;
/**
 * Build a mapping stream. This runs in parallel over received chunks.
 *
 * Unlike the built-in Array.map function, returning null or undefined from the mapper will push
 * the same chunk onto the output. This acts more like forEach.
 *
 * By default, this operates in objectMode, and does not guarantee that the output order matches
 * the input order (pass {order: true} to enforce input order).
 *
 * @param {function(?, number): ?} handler called with (chunk, index); may return a Promise
 * @param {{objectMode: boolean, order: boolean, tasks: number}=} options tasks limits concurrent
 *     handler invocations (<= 0 means unlimited)
 * @return {!stream.Transform}
 */
export function map(handler, options={}) {
  options = Object.assign({
    objectMode: true,
    order: false,
    tasks: 0,  // max concurrent handler invocations; 0 disables the limit
  }, options);

  let index = 0;             // monotonic index handed to the handler
  let count = 0;             // handler invocations currently in flight
  let flushCallback = null;  // set once flush() arrives while work is pending

  options.tasks = Math.ceil(options.tasks) || 0;
  const hasTasks = options.tasks > 0;
  const pending = [];  // chunks queued while at the concurrency limit

  // Ordered-mode bookkeeping: a result lands in orderDone at slot
  // (index - orderPushCount) and is shipped once the head slot completes.
  let orderPushCount = 0;
  const orderDone = [];

  const s = new stream.Transform({
    objectMode: options.objectMode,
    // nb. Passing writableHighWaterMark here seems to do nothing, we just enforce tasks manually.
    transform(chunk, encoding, callback) {
      if (flushCallback !== null) {
        throw new Error(`got transform() after flush()`);
      }
      // Ack immediately: concurrency is throttled via `pending`, not via backpressure.
      callback();
      if (!hasTasks || count < options.tasks) {
        internalTransform(chunk, encoding);
      } else {
        pending.push({chunk, encoding});
      }
    },
    flush(callback) {
      if (count === 0) {
        callback(); // nothing was pushed, callback immediately
      } else {
        flushCallback = callback;
      }
    },
  });
  return s;

  // hoisted methods below

  // Kicks off the handler for a single chunk. `encoding` is accepted but currently unused.
  function internalTransform(chunk, encoding) {
    ++count;
    const localIndex = index++;
    const resultHandler = internalResultHandler.bind(null, localIndex, chunk);
    Promise.resolve()
        .then(() => handler(chunk, localIndex))
        .then(resultHandler)
        .catch((err) => s.destroy(err));
  }

  // Receives the handler's result for chunk #localIndex and pushes it downstream.
  function internalResultHandler(localIndex, chunk, result) {
    if (result == null) {
      result = chunk; // disallow null/undefined as they stop streams
    }
    if (options.order) {
      const doneIndex = localIndex - orderPushCount;
      orderDone[doneIndex] = result;
      // If we're the first, ship ourselves and any further completed chunks.
      if (doneIndex === 0) {
        let i = doneIndex;
        do {
          if (orderDone[i] !== filterSymbol) {
            s.push(orderDone[i]);
          }
          ++i;
        } while (i < orderDone.length && orderDone[i] !== undefined);
        // Splice at once, in case we hit many valid elements.
        orderDone.splice(0, i);
        orderPushCount += i;
      }
    } else if (result !== filterSymbol) {
      s.push(result); // we don't care about the order, push immediately
    }
    --count;
    if (pending.length && count < options.tasks) {
      const {chunk, encoding} = pending.shift();
      internalTransform(chunk, encoding);
    } else if (count === 0 && flushCallback) {
      // this is safe as `else if`, as calling internalTransform again means count > 0
      flushCallback();
    }
  }
}
/**
 * As per map, but returning falsey values will remove this from the stream. Returning a truthy
 * value will include it.
 *
 * @param {function(?, number): ?} handler predicate; may be async
 * @param {{objectMode: boolean, order: boolean, tasks: number}=} options passed through to map
 * @return {!stream.Transform}
 */
export function filter(handler, options={}) {
  // Delegate to map: the wrapper resolves the predicate and substitutes the
  // skip sentinel for chunks that should be dropped.
  const wrapped = async (chunk) => {
    const keep = await handler(chunk);
    return keep ? chunk : filterSymbol;
  };
  return map(wrapped, options);
}
/**
 * Asynchronously process all data passed through this stream prior to 'flush' being invoked.
 * This gates the throughput and pushes the array of returned values.
 *
 * This assumes object mode and does not validate or check encoding.
 *
 * @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>)} handler receives every buffered
 *     chunk on flush; may return a replacement iterable, or null/undefined to re-emit the
 *     buffered chunks unchanged
 * @param {{objectMode: boolean}=} options
 * @return {!stream.Transform}
 */
export function gate(handler, options = {}) {
  const merged = Object.assign({objectMode: true}, options);
  const buffered = [];

  return new stream.Transform({
    objectMode: merged.objectMode,

    transform(chunk, _encoding, callback) {
      // Nothing is emitted yet; every chunk is held until flush.
      buffered.push(chunk);
      callback();
    },

    flush(callback) {
      // nb. a synchronous throw from handler escapes flush directly, as before.
      Promise.resolve(handler(buffered))
          .then((produced) => {
            // null/undefined means "pass everything through unchanged".
            const out = produced == null ? buffered : produced;
            for (const item of out) {
              this.push(item);
            }
            callback();
          })
          .catch(callback);
    },
  });
}
/**
 * Returns a helper that generates an Array from piped data.
 *
 * @param {{objectMode: boolean}=} options passed through to gate
 * @return {{stream: !stream.Transform, promise: !Promise<!Array<?>>}}
 */
export function toArray(options) {
  let collector;
  const promise = new Promise((resolve, reject) => {
    // gate() invokes its handler with the full chunk array on flush, which
    // settles the promise; stream errors reject it instead.
    collector = gate(resolve, options);
    collector.on('error', reject);
  });
  return {stream: collector, promise};
}
import os from 'os';
import path from 'path';
import {fileURLToPath} from 'url';
import worker from 'worker_threads';
/**
 * Determines the absolute directory where this script is contained.
 *
 * Prefers import.meta.url (ESM); falls back to __dirname for CJS builds,
 * where bundlers rewrite import.meta.
 */
function scriptDir() {
  try {
    // fileURLToPath correctly decodes percent-escapes and handles Windows
    // drive letters, unlike stripping the "file://" prefix with a regexp.
    return path.dirname(fileURLToPath(import.meta.url));
  } catch (e) {
    // not an ES module context; try __dirname below
  }
  try {
    return __dirname;
  } catch (e) {
    throw new Error(`could not resolve __dirname or import.meta.url`);
  }
}

// Fall back to 4 if os.cpus() reports an empty list.
const cpuCount = os.cpus().length || 4;

// Worker entrypoint, resolved relative to this script's own directory.
const workerTarget = path.join(scriptDir(), './internal-worker.js');

/**
 * Creates a pool of workers that run the default export of the given module.
 *
 * @param {string} dep absolute path of the module to invoke inside workers
 * @param {{tasks: number, expiry: number}=} options tasks: worker limit (a
 *     fraction in (0,1) is a ratio of CPU count); expiry: idle milliseconds
 *     before an unused worker is terminated
 * @return {function(?): !Promise<?>} dispatcher; resolves with the worker's
 *     result for each task
 */
export function pool(dep, options) {
  options = Object.assign({
    tasks: cpuCount * 0.75,
    expiry: 1000,
  }, options);

  // nb. this previously read the non-existent `options.task`, which collapsed
  // every expiry to zero and tore down workers after a single task.
  options.expiry = Math.max(options.expiry, 0) || 0;

  if (options.tasks > 0 && options.tasks < 1) {
    options.tasks = cpuCount * options.tasks;  // fraction => ratio of CPUs
  }
  options.tasks = Math.max(Math.ceil(options.tasks), 0) || 1;

  if (!path.isAbsolute(dep)) {
    throw new TypeError(`cannot load worker with relative path: ${dep}`);
  }

  let activeWorkers = 0;               // workers currently alive
  const availableWorkers = new Map();  // idle worker => expiry timeout handle
  const pendingTasks = [];             // tasks waiting for a free worker

  return async (task) => {
    let w;
    if (availableWorkers.size) {
      // Reuse the first idle worker and cancel its scheduled termination.
      w = availableWorkers.keys().next().value;
      clearTimeout(availableWorkers.get(w));
      availableWorkers.delete(w);
    } else if (activeWorkers < options.tasks) {
      w = new worker.Worker(workerTarget, {workerData: {dep}});
      ++activeWorkers;
    } else {
      // At capacity: park the task until releaseWorker() picks it up.
      return new Promise((resolve) => {
        pendingTasks.push({task, resolve});
      });
    }
    return enact(w, task);
  };

  // Sends a single task to the given worker, resolving with its reply.
  function enact(w, task) {
    // While we could use the worker's parentPort, this gives us less risk of crosstalk.
    const {port1, port2} = new worker.MessageChannel();
    w.postMessage({task, port: port2}, [port2]);
    return new Promise((resolve, reject) => {
      const handler = ({result, error}) => {
        port1.off('message', handler); // important to allow GC
        port1.close();
        error ? reject(error) : resolve(result);
        releaseWorker(w);
      };
      port1.on('message', handler);
    });
  }

  // Shuts a worker down and forgets about it.
  function terminateWorker(w) {
    --activeWorkers;
    w.terminate();
    availableWorkers.delete(w);
  }

  // Returns a worker to the pool: hands it a queued task if one is waiting,
  // otherwise parks it with an expiry timer (terminating immediately when
  // expiry is zero).
  function releaseWorker(w) {
    if (pendingTasks.length) {
      // There's an immediate task, consume it and go.
      const {task, resolve} = pendingTasks.shift();
      resolve(enact(w, task));
    } else if (options.expiry) {
      const timeout = setTimeout(() => terminateWorker(w), options.expiry);
      availableWorkers.set(w, timeout);
    } else {
      terminateWorker(w);
    }
  }
}
'use strict';

// Rollup-generated CommonJS build of lib/internal-worker.js.

// Unwraps a module namespace to its default export, if present.
function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; }

// Wraps a CommonJS export object so it looks like an ES module namespace.
function _interopNamespace(e) {
  if (e && e.__esModule) { return e; } else {
    var n = {};
    if (e) {
      Object.keys(e).forEach(function (k) {
        var d = Object.getOwnPropertyDescriptor(e, k);
        Object.defineProperty(n, k, d.get ? d : {
          enumerable: true,
          get: function () {
            return e[k];
          }
        });
      });
    }
    n['default'] = e;
    return n;
  }
}

var worker = _interopDefault(require('worker_threads'));

/**
 * @fileoverview Runnable that invokes Worker.
 */

if (worker.isMainThread) {
  throw new TypeError('cannot run on main thread');
}

const {dep} = worker.workerData;

// Dynamic import() transpiled to a promise-wrapped require for CommonJS.
new Promise(function (resolve) { resolve(_interopNamespace(require(dep))); }).then(({default: method}) => {
  worker.parentPort.on('message', ({task, port}) => {
    Promise.resolve()
        .then(() => method(task))
        .then((result) => port.postMessage({result}))
        .catch((error) => port.postMessage({error}))
        .then(() => port.close());
  });
}).catch((error) => {
  // Failure mode: the module couldn't be imported, complain loudly.
  worker.parentPort.on('message', ({port}) => {
    port.postMessage({error});
    port.close();
  });
});
'use strict';

// Rollup-generated CommonJS build of the stream transforms module.
Object.defineProperty(exports, '__esModule', { value: true });

// Unwraps a module namespace to its default export, if present.
function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; }

var stream = _interopDefault(require('stream'));

// Sentinel used internally by map()/filter() to mark a chunk to be dropped.
const filterSymbol = Symbol('filter');

/**
 * If returned by the map function, will skip this item in the final output.
 */
const skip = filterSymbol;
/**
 * Build a mapping stream. This runs in parallel over received chunks.
 *
 * Unlike the built-in Array.map function, returning null or undefined from the mapper will push
 * the same chunk onto the output. This acts more like forEach.
 *
 * By default, this operates in objectMode, and does not guarantee that the output order matches
 * the input order (pass {order: true} to enforce input order).
 *
 * @param {function(?, number): ?} handler called with (chunk, index); may return a Promise
 * @param {{objectMode: boolean, order: boolean, tasks: number}=} options tasks limits concurrent
 *     handler invocations (<= 0 means unlimited)
 * @return {!stream.Transform}
 */
function map(handler, options={}) {
  options = Object.assign({
    objectMode: true,
    order: false,
    tasks: 0,  // max concurrent handler invocations; 0 disables the limit
  }, options);

  let index = 0;             // monotonic index handed to the handler
  let count = 0;             // handler invocations currently in flight
  let flushCallback = null;  // set once flush() arrives while work is pending

  options.tasks = Math.ceil(options.tasks) || 0;
  const hasTasks = options.tasks > 0;
  const pending = [];  // chunks queued while at the concurrency limit

  // Ordered-mode bookkeeping: a result lands in orderDone at slot
  // (index - orderPushCount) and is shipped once the head slot completes.
  let orderPushCount = 0;
  const orderDone = [];

  const s = new stream.Transform({
    objectMode: options.objectMode,
    // nb. Passing writableHighWaterMark here seems to do nothing, we just enforce tasks manually.
    transform(chunk, encoding, callback) {
      if (flushCallback !== null) {
        throw new Error(`got transform() after flush()`);
      }
      // Ack immediately: concurrency is throttled via `pending`, not via backpressure.
      callback();
      if (!hasTasks || count < options.tasks) {
        internalTransform(chunk);
      } else {
        pending.push({chunk, encoding});
      }
    },
    flush(callback) {
      if (count === 0) {
        callback(); // nothing was pushed, callback immediately
      } else {
        flushCallback = callback;
      }
    },
  });
  return s;

  // hoisted methods below

  // Kicks off the handler for a single chunk. The chunk's encoding is not used.
  function internalTransform(chunk, encoding) {
    ++count;
    const localIndex = index++;
    const resultHandler = internalResultHandler.bind(null, localIndex, chunk);
    Promise.resolve()
        .then(() => handler(chunk, localIndex))
        .then(resultHandler)
        .catch((err) => s.destroy(err));
  }

  // Receives the handler's result for chunk #localIndex and pushes it downstream.
  function internalResultHandler(localIndex, chunk, result) {
    if (result == null) {
      result = chunk; // disallow null/undefined as they stop streams
    }
    if (options.order) {
      const doneIndex = localIndex - orderPushCount;
      orderDone[doneIndex] = result;
      // If we're the first, ship ourselves and any further completed chunks.
      if (doneIndex === 0) {
        let i = doneIndex;
        do {
          if (orderDone[i] !== filterSymbol) {
            s.push(orderDone[i]);
          }
          ++i;
        } while (i < orderDone.length && orderDone[i] !== undefined);
        // Splice at once, in case we hit many valid elements.
        orderDone.splice(0, i);
        orderPushCount += i;
      }
    } else if (result !== filterSymbol) {
      s.push(result); // we don't care about the order, push immediately
    }
    --count;
    if (pending.length && count < options.tasks) {
      const {chunk, encoding} = pending.shift();
      internalTransform(chunk);
    } else if (count === 0 && flushCallback) {
      // this is safe as `else if`, as calling internalTransform again means count > 0
      flushCallback();
    }
  }
}
/**
 * As per map, but returning falsey values will remove this from the stream. Returning a truthy
 * value will include it.
 *
 * @param {function(?, number): ?} handler predicate; may be async
 * @param {{objectMode: boolean, order: boolean, tasks: number}=} options passed through to map
 * @return {!stream.Transform}
 */
function filter(handler, options={}) {
  // Delegate to map: the wrapper resolves the predicate and substitutes the
  // skip sentinel for chunks that should be dropped.
  const wrapped = async (chunk) => {
    const keep = await handler(chunk);
    return keep ? chunk : filterSymbol;
  };
  return map(wrapped, options);
}
/**
 * Asynchronously process all data passed through this stream prior to 'flush' being invoked.
 * This gates the throughput and pushes the array of returned values.
 *
 * This assumes object mode and does not validate or check encoding.
 *
 * @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>)} handler receives every buffered
 *     chunk on flush; may return a replacement iterable, or null/undefined to re-emit the
 *     buffered chunks unchanged
 * @param {{objectMode: boolean}=} options
 * @return {!stream.Transform}
 */
function gate(handler, options = {}) {
  const merged = Object.assign({objectMode: true}, options);
  const buffered = [];

  return new stream.Transform({
    objectMode: merged.objectMode,

    transform(chunk, _encoding, callback) {
      // Nothing is emitted yet; every chunk is held until flush.
      buffered.push(chunk);
      callback();
    },

    flush(callback) {
      // nb. a synchronous throw from handler escapes flush directly, as before.
      Promise.resolve(handler(buffered))
          .then((produced) => {
            // null/undefined means "pass everything through unchanged".
            const out = produced == null ? buffered : produced;
            for (const item of out) {
              this.push(item);
            }
            callback();
          })
          .catch(callback);
    },
  });
}
/**
 * Returns a helper that generates an Array from piped data.
 *
 * @param {{objectMode: boolean}=} options passed through to gate
 * @return {{stream: !stream.Transform, promise: !Promise<!Array<?>>}}
 */
function toArray(options) {
  let collector;
  const promise = new Promise((resolve, reject) => {
    // gate() invokes its handler with the full chunk array on flush, which
    // settles the promise; stream errors reject it instead.
    collector = gate(resolve, options);
    collector.on('error', reject);
  });
  return {stream: collector, promise};
}
exports.filter = filter;
exports.gate = gate;
exports.map = map;
exports.skip = skip;
exports.toArray = toArray;
'use strict';

// Rollup-generated CommonJS build of the worker-pool module.
Object.defineProperty(exports, '__esModule', { value: true });

// Unwraps a module namespace to its default export, if present.
function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; }

var worker = _interopDefault(require('worker_threads'));
var path = _interopDefault(require('path'));
var os = _interopDefault(require('os'));
/**
 * Determines the absolute directory where this script is contained.
 */
function scriptDir() {
  try {
    // Rollup's import.meta.url shim: under Node it builds a file:// URL from
    // __filename; the `document` branch targets browser bundles.
    // NOTE(review): stripping "file://" with a regexp leaves percent-encoded
    // characters in place and mishandles Windows drive letters —
    // url.fileURLToPath would be the robust conversion. Confirm before relying
    // on this path on Windows. Preserved as generated output.
    const absolutePath = String((typeof document === 'undefined' ? new (require('u' + 'rl').URL)('file:' + __filename).href : (document.currentScript && document.currentScript.src || new URL('worker.cjs', document.baseURI).href))).replace(/^file:\/\//, '');
    return path.dirname(absolutePath);
  } catch (e) {
    // try __dirname
  }
  try {
    return __dirname;
  } catch (e) {
    throw new Error(`could not resolve __dirname or import.meta.url`);
  }
}
// Fall back to 4 if os.cpus() reports an empty list.
const cpuCount = os.cpus().length || 4;

// Worker entrypoint, resolved relative to this script's own directory.
const workerTarget = path.join(scriptDir(), './internal-worker.js');

/**
 * Creates a pool of workers that run the default export of the given module.
 *
 * @param {string} dep absolute path of the module to invoke inside workers
 * @param {{tasks: number, expiry: number}=} options tasks: worker limit (a
 *     fraction in (0,1) is a ratio of CPU count); expiry: idle milliseconds
 *     before an unused worker is terminated
 * @return {function(?): !Promise<?>} dispatcher; resolves with the worker's
 *     result for each task
 */
function pool(dep, options) {
  options = Object.assign({
    tasks: cpuCount * 0.75,
    expiry: 1000,
  }, options);

  // nb. this previously read the non-existent `options.task`, which collapsed
  // every expiry to zero and tore down workers after a single task.
  options.expiry = Math.max(options.expiry, 0) || 0;

  if (options.tasks > 0 && options.tasks < 1) {
    options.tasks = cpuCount * options.tasks;  // fraction => ratio of CPUs
  }
  options.tasks = Math.max(Math.ceil(options.tasks), 0) || 1;

  if (!path.isAbsolute(dep)) {
    throw new TypeError(`cannot load worker with relative path: ${dep}`);
  }

  let activeWorkers = 0;               // workers currently alive
  const availableWorkers = new Map();  // idle worker => expiry timeout handle
  const pendingTasks = [];             // tasks waiting for a free worker

  return async (task) => {
    let w;
    if (availableWorkers.size) {
      // Reuse the first idle worker and cancel its scheduled termination.
      w = availableWorkers.keys().next().value;
      clearTimeout(availableWorkers.get(w));
      availableWorkers.delete(w);
    } else if (activeWorkers < options.tasks) {
      w = new worker.Worker(workerTarget, {workerData: {dep}});
      ++activeWorkers;
    } else {
      // At capacity: park the task until releaseWorker() picks it up.
      return new Promise((resolve) => {
        pendingTasks.push({task, resolve});
      });
    }
    return enact(w, task);
  };

  // Sends a single task to the given worker, resolving with its reply.
  function enact(w, task) {
    // While we could use the worker's parentPort, this gives us less risk of crosstalk.
    const {port1, port2} = new worker.MessageChannel();
    w.postMessage({task, port: port2}, [port2]);
    return new Promise((resolve, reject) => {
      const handler = ({result, error}) => {
        port1.off('message', handler); // important to allow GC
        port1.close();
        error ? reject(error) : resolve(result);
        releaseWorker(w);
      };
      port1.on('message', handler);
    });
  }

  // Shuts a worker down and forgets about it.
  function terminateWorker(w) {
    --activeWorkers;
    w.terminate();
    availableWorkers.delete(w);
  }

  // Returns a worker to the pool: hands it a queued task if one is waiting,
  // otherwise parks it with an expiry timer (terminating immediately when
  // expiry is zero).
  function releaseWorker(w) {
    if (pendingTasks.length) {
      // There's an immediate task, consume it and go.
      const {task, resolve} = pendingTasks.shift();
      resolve(enact(w, task));
    } else if (options.expiry) {
      const timeout = setTimeout(() => terminateWorker(w), options.expiry);
      availableWorkers.set(w, timeout);
    } else {
      terminateWorker(w);
    }
  }
}
exports.pool = pool;
+10
-6
{
"name": "async-transforms",
"version": "1.0.0",
"version": "1.0.1",
"description": "Asynchronous stream transforms",
"type": "module",
"module": "./transforms.js",
"main": "./dist/transforms.cjs",
"module": "./lib/transforms.js",
"main": "./require/transforms.cjs",
"exports": {
"import": "./transforms.js",
"default": "./dist/transforms.cjs"
"import": "./lib/transforms.js",
"require": "./require/transforms.cjs",
"./worker": {
"import": "./lib/worker.js",
"require": "./require/worker.cjs"
}
},

@@ -21,5 +25,5 @@ "repository": "https://github.com/samthor/async-transforms",

"scripts": {
"prepare": "rollup --format=cjs --file=dist/transforms.cjs -- transforms.js",
"prepare": "bash build.sh",
"test": "mocha"
}
}

@@ -66,2 +66,32 @@ [![Build](https://api.travis-ci.org/samthor/async-transforms.svg?branch=master)](https://travis-ci.org/samthor/async-transforms)

While Gulp plugins for Less already exist, this makes it easier to write general-purpose, modern plugins with `async` and `await` syntax.
While Gulp plugins for Less already exist, this makes it easier to write general-purpose, modern plugins with `async` and `await` syntax.
## Worker Pool
This includes a submodule which provides a worker pool.
It's useful when combined with the above transforms handler.
For example:
```js
import {pool} from 'async-transforms/worker';
const compilationPool = pool(path.resolve('./compile.js'));
// use directly
compilationPool(123, {tasks: 2})
.then((result) => console.info('result from passing value to worker', result));
// or as part of a transform
stream.Readable.from([object1, object2])
.pipe(transforms.map(compilationPool))
.pipe(transforms.map(() => {
// do something with the result
}));
```
The pool invokes the default export (or `module.exports` for CJS) of the target file.
By default, it utilizes 75% of your local CPUs, but set `tasks` to control this—use a fraction from 0-1 to set a ratio, and higher for absolute.
Use this for CPU-bound tasks like JS minification.
This doesn't really belong in this module.
'use strict';

// Rollup-generated CommonJS build of the stream transforms module.
Object.defineProperty(exports, '__esModule', { value: true });

// Unwraps a module namespace to its default export, if present.
function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; }

var stream = _interopDefault(require('stream'));

// Sentinel used internally by map()/filter() to mark a chunk to be dropped.
const filterSymbol = Symbol('filter');

/**
 * If returned by the map function, will skip this item in the final output.
 */
const skip = filterSymbol;
/**
 * Build a mapping stream. This runs in parallel over received chunks.
 *
 * Unlike the built-in Array.map function, returning null or undefined from the mapper will push
 * the same chunk onto the output. This acts more like forEach.
 *
 * By default, this operates in objectMode, and does not guarantee that the output order matches
 * the input order (pass {order: true} to enforce input order).
 *
 * @param {function(?, number): ?} handler called with (chunk, index); may return a Promise
 * @param {{objectMode: boolean, order: boolean, tasks: number}=} options tasks limits concurrent
 *     handler invocations (<= 0 means unlimited)
 * @return {!stream.Transform}
 */
function map(handler, options={}) {
  options = Object.assign({
    objectMode: true,
    order: false,
    tasks: 0,  // max concurrent handler invocations; 0 disables the limit
  }, options);

  let index = 0;             // monotonic index handed to the handler
  let count = 0;             // handler invocations currently in flight
  let flushCallback = null;  // set once flush() arrives while work is pending

  options.tasks = Math.ceil(options.tasks) || 0;
  const hasTasks = options.tasks > 0;
  const pending = [];  // chunks queued while at the concurrency limit

  // Ordered-mode bookkeeping: a result lands in orderDone at slot
  // (index - orderPushCount) and is shipped once the head slot completes.
  let orderPushCount = 0;
  const orderDone = [];

  const s = new stream.Transform({
    objectMode: options.objectMode,
    // nb. Passing writableHighWaterMark here seems to do nothing, we just enforce tasks manually.
    transform(chunk, encoding, callback) {
      if (flushCallback !== null) {
        throw new Error(`got transform() after flush()`);
      }
      // Ack immediately: concurrency is throttled via `pending`, not via backpressure.
      callback();
      if (!hasTasks || count < options.tasks) {
        internalTransform(chunk);
      } else {
        pending.push({chunk, encoding});
      }
    },
    flush(callback) {
      if (count === 0) {
        callback(); // nothing was pushed, callback immediately
      } else {
        flushCallback = callback;
      }
    },
  });
  return s;

  // hoisted methods below

  // Kicks off the handler for a single chunk. The chunk's encoding is not used.
  function internalTransform(chunk, encoding) {
    ++count;
    const localIndex = index++;
    const resultHandler = internalResultHandler.bind(null, localIndex, chunk);
    Promise.resolve()
        .then(() => handler(chunk, localIndex))
        .then(resultHandler)
        .catch((err) => s.destroy(err));
  }

  // Receives the handler's result for chunk #localIndex and pushes it downstream.
  function internalResultHandler(localIndex, chunk, result) {
    if (result == null) {
      result = chunk; // disallow null/undefined as they stop streams
    }
    if (options.order) {
      const doneIndex = localIndex - orderPushCount;
      orderDone[doneIndex] = result;
      // If we're the first, ship ourselves and any further completed chunks.
      if (doneIndex === 0) {
        let i = doneIndex;
        do {
          if (orderDone[i] !== filterSymbol) {
            s.push(orderDone[i]);
          }
          ++i;
        } while (i < orderDone.length && orderDone[i] !== undefined);
        // Splice at once, in case we hit many valid elements.
        orderDone.splice(0, i);
        orderPushCount += i;
      }
    } else if (result !== filterSymbol) {
      s.push(result); // we don't care about the order, push immediately
    }
    --count;
    if (pending.length && count < options.tasks) {
      const {chunk, encoding} = pending.shift();
      internalTransform(chunk);
    } else if (count === 0 && flushCallback) {
      // this is safe as `else if`, as calling internalTransform again means count > 0
      flushCallback();
    }
  }
}
/**
 * As per map, but returning falsey values will remove this from the stream. Returning a truthy
 * value will include it.
 *
 * @param {function(?, number): ?} handler predicate; may be async
 * @param {{objectMode: boolean, order: boolean, tasks: number}=} options passed through to map
 * @return {!stream.Transform}
 */
function filter(handler, options={}) {
  // Delegate to map: the wrapper resolves the predicate and substitutes the
  // skip sentinel for chunks that should be dropped.
  const wrapped = async (chunk) => {
    const keep = await handler(chunk);
    return keep ? chunk : filterSymbol;
  };
  return map(wrapped, options);
}
/**
 * Asynchronously process all data passed through this stream prior to 'flush' being invoked.
 * This gates the throughput and pushes the array of returned values.
 *
 * This assumes object mode and does not validate or check encoding.
 *
 * @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>)} handler receives every buffered
 *     chunk on flush; may return a replacement iterable, or null/undefined to re-emit the
 *     buffered chunks unchanged
 * @param {{objectMode: boolean}=} options
 * @return {!stream.Transform}
 */
function gate(handler, options = {}) {
  const merged = Object.assign({objectMode: true}, options);
  const buffered = [];

  return new stream.Transform({
    objectMode: merged.objectMode,

    transform(chunk, _encoding, callback) {
      // Nothing is emitted yet; every chunk is held until flush.
      buffered.push(chunk);
      callback();
    },

    flush(callback) {
      // nb. a synchronous throw from handler escapes flush directly, as before.
      Promise.resolve(handler(buffered))
          .then((produced) => {
            // null/undefined means "pass everything through unchanged".
            const out = produced == null ? buffered : produced;
            for (const item of out) {
              this.push(item);
            }
            callback();
          })
          .catch(callback);
    },
  });
}
/**
 * Returns a helper that generates an Array from piped data.
 *
 * @param {{objectMode: boolean}=} options passed through to gate
 * @return {{stream: !stream.Transform, promise: !Promise<!Array<?>>}}
 */
function toArray(options) {
  let collector;
  const promise = new Promise((resolve, reject) => {
    // gate() invokes its handler with the full chunk array on flush, which
    // settles the promise; stream errors reject it instead.
    collector = gate(resolve, options);
    collector.on('error', reject);
  });
  return {stream: collector, promise};
}
exports.filter = filter;
exports.gate = gate;
exports.map = map;
exports.skip = skip;
exports.toArray = toArray;
import stream from 'stream';

// Sentinel used internally by map()/filter() to mark a chunk to be dropped.
const filterSymbol = Symbol('filter');

/**
 * If returned by the map function, will skip this item in the final output.
 */
export const skip = filterSymbol;
/**
 * Build a mapping stream. This runs in parallel over received chunks.
 *
 * Unlike the built-in Array.map function, returning null or undefined from the mapper will push
 * the same chunk onto the output. This acts more like forEach.
 *
 * By default, this operates in objectMode, and does not guarantee that the output order matches
 * the input order (pass {order: true} to enforce input order).
 *
 * @param {function(?, number): ?} handler called with (chunk, index); may return a Promise
 * @param {{objectMode: boolean, order: boolean, tasks: number}=} options tasks limits concurrent
 *     handler invocations (<= 0 means unlimited)
 * @return {!stream.Transform}
 */
export function map(handler, options={}) {
  options = Object.assign({
    objectMode: true,
    order: false,
    tasks: 0,  // max concurrent handler invocations; 0 disables the limit
  }, options);

  let index = 0;             // monotonic index handed to the handler
  let count = 0;             // handler invocations currently in flight
  let flushCallback = null;  // set once flush() arrives while work is pending

  options.tasks = Math.ceil(options.tasks) || 0;
  const hasTasks = options.tasks > 0;
  const pending = [];  // chunks queued while at the concurrency limit

  // Ordered-mode bookkeeping: a result lands in orderDone at slot
  // (index - orderPushCount) and is shipped once the head slot completes.
  let orderPushCount = 0;
  const orderDone = [];

  const s = new stream.Transform({
    objectMode: options.objectMode,
    // nb. Passing writableHighWaterMark here seems to do nothing, we just enforce tasks manually.
    transform(chunk, encoding, callback) {
      if (flushCallback !== null) {
        throw new Error(`got transform() after flush()`);
      }
      // Ack immediately: concurrency is throttled via `pending`, not via backpressure.
      callback();
      if (!hasTasks || count < options.tasks) {
        internalTransform(chunk, encoding);
      } else {
        pending.push({chunk, encoding});
      }
    },
    flush(callback) {
      if (count === 0) {
        callback(); // nothing was pushed, callback immediately
      } else {
        flushCallback = callback;
      }
    },
  });
  return s;

  // hoisted methods below

  // Kicks off the handler for a single chunk. `encoding` is accepted but currently unused.
  function internalTransform(chunk, encoding) {
    ++count;
    const localIndex = index++;
    const resultHandler = internalResultHandler.bind(null, localIndex, chunk);
    Promise.resolve()
        .then(() => handler(chunk, localIndex))
        .then(resultHandler)
        .catch((err) => s.destroy(err));
  }

  // Receives the handler's result for chunk #localIndex and pushes it downstream.
  function internalResultHandler(localIndex, chunk, result) {
    if (result == null) {
      result = chunk; // disallow null/undefined as they stop streams
    }
    if (options.order) {
      const doneIndex = localIndex - orderPushCount;
      orderDone[doneIndex] = result;
      // If we're the first, ship ourselves and any further completed chunks.
      if (doneIndex === 0) {
        let i = doneIndex;
        do {
          if (orderDone[i] !== filterSymbol) {
            s.push(orderDone[i]);
          }
          ++i;
        } while (i < orderDone.length && orderDone[i] !== undefined);
        // Splice at once, in case we hit many valid elements.
        orderDone.splice(0, i);
        orderPushCount += i;
      }
    } else if (result !== filterSymbol) {
      s.push(result); // we don't care about the order, push immediately
    }
    --count;
    if (pending.length && count < options.tasks) {
      const {chunk, encoding} = pending.shift();
      internalTransform(chunk, encoding);
    } else if (count === 0 && flushCallback) {
      // this is safe as `else if`, as calling internalTransform again means count > 0
      flushCallback();
    }
  }
}
/**
 * As per map, but returning falsey values will remove this from the stream. Returning a truthy
 * value will include it.
 *
 * @param {function(?, number): ?} handler predicate; may be async
 * @param {{objectMode: boolean, order: boolean, tasks: number}=} options passed through to map
 * @return {!stream.Transform}
 */
export function filter(handler, options={}) {
  // Delegate to map: the wrapper resolves the predicate and substitutes the
  // skip sentinel for chunks that should be dropped.
  const wrapped = async (chunk) => {
    const keep = await handler(chunk);
    return keep ? chunk : filterSymbol;
  };
  return map(wrapped, options);
}
/**
 * Asynchronously process all data passed through this stream prior to 'flush' being invoked.
 * This gates the throughput and pushes the array of returned values.
 *
 * This assumes object mode and does not validate or check encoding.
 *
 * @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>)} handler receives every buffered
 *     chunk on flush; may return a replacement iterable, or null/undefined to re-emit the
 *     buffered chunks unchanged
 * @param {{objectMode: boolean}=} options
 * @return {!stream.Transform}
 */
export function gate(handler, options = {}) {
  const merged = Object.assign({objectMode: true}, options);
  const buffered = [];

  return new stream.Transform({
    objectMode: merged.objectMode,

    transform(chunk, _encoding, callback) {
      // Nothing is emitted yet; every chunk is held until flush.
      buffered.push(chunk);
      callback();
    },

    flush(callback) {
      // nb. a synchronous throw from handler escapes flush directly, as before.
      Promise.resolve(handler(buffered))
          .then((produced) => {
            // null/undefined means "pass everything through unchanged".
            const out = produced == null ? buffered : produced;
            for (const item of out) {
              this.push(item);
            }
            callback();
          })
          .catch(callback);
    },
  });
}
/**
 * Returns a helper that generates an Array from piped data.
 *
 * @param {{objectMode: boolean}=} options passed through to gate
 * @return {{stream: !stream.Transform, promise: !Promise<!Array<?>>}}
 */
export function toArray(options) {
  let collector;
  const promise = new Promise((resolve, reject) => {
    // gate() invokes its handler with the full chunk array on flush, which
    // settles the promise; stream errors reject it instead.
    collector = gate(resolve, options);
    collector.on('error', reject);
  });
  return {stream: collector, promise};
}