async-transforms
Advanced tools
Comparing version 1.0.6 to 1.0.7
{ | ||
"name": "async-transforms", | ||
"version": "1.0.6", | ||
"version": "1.0.7", | ||
"description": "Asynchronous stream transforms", | ||
@@ -28,3 +28,3 @@ "type": "module", | ||
"scripts": { | ||
"prepare": "bash build.sh", | ||
"prepublishOnly": "bash build.sh", | ||
"test": "mocha" | ||
@@ -31,0 +31,0 @@ }, |
@@ -5,5 +5,5 @@ | ||
export interface Options { | ||
objectMode?: boolean; | ||
order?: boolean; | ||
tasks?: number; | ||
objectMode: boolean; | ||
order: boolean; | ||
tasks: number; | ||
} | ||
@@ -25,3 +25,3 @@ | ||
*/ | ||
export function map(handler: (arg: any, index: number) => any, options?: Options): stream.Transform; | ||
export function map(handler: (arg: any, index: number) => any, options?: Partial<Options>): stream.Transform; | ||
@@ -32,3 +32,3 @@ /** | ||
*/ | ||
export function filter(handler: (arg: any, index: number) => boolean|Promise<boolean>, options?: Options): stream.Transform; | ||
export function filter(handler: (arg: any, index: number) => boolean|Promise<boolean>, options?: Partial<Options>): stream.Transform; | ||
@@ -35,0 +35,0 @@ /** |
@@ -22,7 +22,8 @@ import stream from 'stream'; | ||
* @param {function(?, number): ?} handler | ||
* @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
* @param {Partial<import('.').Options>=} options | ||
* @return {!stream.Transform} | ||
*/ | ||
export function map(handler, options={}) { | ||
options = Object.assign({ | ||
export function map(handler, options) { | ||
/** @type {import('.').Options} */ | ||
const o = Object.assign({ | ||
objectMode: true, | ||
@@ -35,13 +36,19 @@ order: false, | ||
let count = 0; | ||
/** @type {stream.TransformCallback?} */ | ||
let flushCallback = null; | ||
options.tasks = Math.ceil(options.tasks) || 0; | ||
const hasTasks = options.tasks > 0; | ||
o.tasks = Math.ceil(o.tasks) || 0; | ||
const hasTasks = o.tasks > 0; | ||
/** @type {{chunk: any, encoding: string}[]} */ | ||
const pending = []; | ||
let orderPushCount = 0; | ||
/** @type {any[]} */ | ||
const orderDone = []; | ||
const s = new stream.Transform({ | ||
objectMode: options.objectMode, | ||
objectMode: o.objectMode, | ||
// nb. Passing writeableHighWaterMark here seems to do nothing, we just enforce tasks manually. | ||
@@ -56,3 +63,3 @@ | ||
if (!hasTasks || count < options.tasks) { | ||
if (!hasTasks || count < o.tasks) { | ||
internalTransform(chunk, encoding); | ||
@@ -77,2 +84,6 @@ } else { | ||
/** | ||
* @param {any} chunk | ||
* @param {string} encoding | ||
*/ | ||
function internalTransform(chunk, encoding) { | ||
@@ -88,2 +99,7 @@ ++count; | ||
/** | ||
* @param {number} localIndex | ||
* @param {any} chunk | ||
* @param {any} result | ||
*/ | ||
function internalResultHandler(localIndex, chunk, result) { | ||
@@ -94,3 +110,3 @@ if (result == null) { | ||
if (options.order) { | ||
if (o.order) { | ||
const doneIndex = localIndex - orderPushCount; | ||
@@ -119,4 +135,4 @@ orderDone[doneIndex] = result; | ||
if (pending.length && count < options.tasks) { | ||
const {chunk, encoding} = pending.shift(); | ||
if (pending.length && count < o.tasks) { | ||
const {chunk, encoding} = /** @type {typeof pending[0]} */ (pending.shift()); | ||
internalTransform(chunk, encoding); | ||
@@ -136,8 +152,8 @@ } else if (count === 0 && flushCallback) { | ||
* @param {function(?, number): ?} handler | ||
* @param {{objectMode: boolean, order: boolean, tasks: number}=} options | ||
* @param {Partial<import('.').Options>=} options | ||
* @return {!stream.Transform} | ||
*/ | ||
export function filter(handler, options={}) { | ||
return map(async (chunk) => { | ||
const result = await handler(chunk); | ||
export function filter(handler, options) { | ||
return map(async (chunk, i) => { | ||
const result = await handler(chunk, i); | ||
return result ? chunk : filterSymbol; | ||
@@ -154,6 +170,7 @@ }, options); | ||
* | ||
* @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>} | ||
* @param {function(!Array<?>): (!Array<?>|!Promise<!Array<?>>)} handler | ||
* @return {!stream.Transform} | ||
*/ | ||
export function gate(handler) { | ||
/** @type {any[]} */ | ||
const chunks = []; | ||
@@ -191,4 +208,8 @@ | ||
let s; | ||
/** @type {Promise<any[]>} */ | ||
const promise = new Promise((resolve, reject) => { | ||
s = gate((arr) => resolve(arr)); | ||
s = gate((arr) => { | ||
resolve(arr); | ||
return []; | ||
}); | ||
s.on('error', reject); | ||
@@ -195,0 +216,0 @@ }); |
export interface PoolOptions { | ||
/** | ||
* Minimum number of tasks to keep around. Task startup can be expensive. Default of one. | ||
*/ | ||
minTasks: number, | ||
/** | ||
* Maximum number of tasks to use. Default is 75% of your CPU count, rounded up, with a minimum | ||
* of one. | ||
*/ | ||
tasks?: number, | ||
tasks: number, | ||
@@ -13,4 +19,7 @@ /** | ||
* there's immediately pending tasks), increase if your tasks have high setup costs. | ||
* | ||
* This can be `Infinity` to keep tasks around forever. | ||
*/ | ||
expiry?: number, | ||
expiry: number, | ||
} | ||
@@ -23,2 +32,2 @@ | ||
*/ | ||
export function pool(dep: string, options?: PoolOptions): (...any) => Promise<any>; | ||
export function pool(dep: string, options?: Partial<PoolOptions>): (...any) => Promise<any>; |
@@ -31,7 +31,9 @@ import worker from 'worker_threads'; | ||
* @param {string} dep to run script from | ||
* @param {{tasks?: number, expiry?: number}} options | ||
* @param {Partial<import('.').PoolOptions>} options | ||
* @return {function(...any): Promise<any>} | ||
*/ | ||
export function pool(dep, options) { | ||
options = Object.assign({ | ||
/** @type {import('.').PoolOptions} */ | ||
const o = Object.assign({ | ||
minTasks: 1, | ||
tasks: cpuCount * 0.75, | ||
@@ -41,8 +43,9 @@ expiry: 1000, | ||
options.expiry = Math.max(options.expiry, 0) || 0; | ||
o.expiry = Math.max(o.expiry, 0) || 0; | ||
if (options.tasks > 0 && options.tasks < 1) { | ||
options.tasks = cpuCount * options.tasks; | ||
if (o.tasks > 0 && o.tasks < 1) { | ||
o.tasks = cpuCount * o.tasks; | ||
} | ||
options.tasks = Math.max(Math.ceil(options.tasks), 0) || 1; | ||
o.tasks = Math.max(Math.ceil(o.tasks), 0) || 1; | ||
o.minTasks = Math.max(0, Math.min(o.tasks, ~~o.minTasks)); | ||
@@ -55,38 +58,40 @@ if (!path.isAbsolute(dep)) { | ||
/** @type {Map<worker.Worker, number>} */ | ||
/** @type {Map<worker.Worker, NodeJS.Timeout|undefined>} */ | ||
const availableWorkers = new Map(); | ||
/** @type {{args: any[], resolve: (any) => void}[]} */ | ||
const prepareWorker = () => { | ||
++activeWorkers; | ||
const w = createWorker(dep); | ||
w.on('message', ({ok}) => { | ||
if (ok !== true) { | ||
throw new Error(`got non-ok: ${ok}`); | ||
} | ||
releaseWorker(w); | ||
}); | ||
}; | ||
for (let i = 0; i < o.minTasks; ++i) { | ||
prepareWorker(); | ||
} | ||
/** @type {{args: any[], resolve: (arg: any) => void}[]} */ | ||
const pendingTasks = []; | ||
return async (...args) => { | ||
/** @type {worker.Worker} */ | ||
let w; | ||
if (availableWorkers.size) { | ||
for (w of availableWorkers.keys()) { | ||
break; // get 1st worker from map | ||
} | ||
/** @type {worker.Worker} */ | ||
const w = availableWorkers.keys().next().value; | ||
const timeout = availableWorkers.get(w); | ||
availableWorkers.delete(w); | ||
clearTimeout(timeout); | ||
} else if (activeWorkers < options.tasks) { | ||
if (isModule) { | ||
w = new worker.Worker(workerTarget, {workerData: {dep}}); | ||
} else { | ||
// In commonJS mode, we have to _again_ require the script, as the Worker ctor incorrectly | ||
// only allows ".js" (which attempts to run as a /module/, because of `type: module`) or | ||
// ".mjs" extensions (which is always a module). | ||
// This will probably be fixed in a future Node. Sounds like a bug. | ||
const code = `require(${JSON.stringify(workerTarget)});`; | ||
w = new worker.Worker(code, {workerData: {dep}, eval: true}); | ||
} | ||
++activeWorkers; | ||
} else { | ||
return new Promise((resolve) => { | ||
pendingTasks.push({args, resolve}); | ||
}); | ||
timeout && clearTimeout(timeout); | ||
return enact(w, args); | ||
} | ||
return enact(w, args); | ||
// Start a new worker, but still push the work onto the queue for when it's ready. | ||
if (activeWorkers < o.tasks) { | ||
prepareWorker(); | ||
} | ||
return new Promise((resolve) => { | ||
pendingTasks.push({args, resolve}); | ||
}); | ||
}; | ||
@@ -105,2 +110,3 @@ | ||
return new Promise((resolve, reject) => { | ||
/** @type {(arg: {result: any, error: Error}) => void} */ | ||
const handler = ({result, error}) => { | ||
@@ -119,6 +125,10 @@ port1.off('message', handler); // important to allow GC | ||
*/ | ||
function terimateWorker(w) { | ||
function maybeTerimateWorker(w) { | ||
if (activeWorkers > o.minTasks) { | ||
w.terminate(); | ||
availableWorkers.delete(w); | ||
} else { | ||
availableWorkers.set(w, undefined); | ||
} | ||
--activeWorkers; | ||
w.terminate(); | ||
availableWorkers.delete(w); | ||
} | ||
@@ -130,14 +140,33 @@ | ||
function releaseWorker(w) { | ||
if (pendingTasks.length) { | ||
const immediateTask = pendingTasks.shift(); | ||
if (immediateTask) { | ||
// There's an immediate task, consume it and go. | ||
const {args, resolve} = pendingTasks.shift(); | ||
const {args, resolve} = immediateTask; | ||
resolve(enact(w, args)); | ||
} else if (options.expiry) { | ||
} else if (isFinite(o.expiry)) { | ||
// Otherwise, put it into our queue to be deleted soon. | ||
const timeout = setTimeout(terimateWorker.bind(null, w), options.expiry); | ||
const timeout = setTimeout(maybeTerimateWorker.bind(null, w), o.expiry); | ||
availableWorkers.set(w, timeout); | ||
} else { | ||
terimateWorker(w); | ||
availableWorkers.set(w, undefined); | ||
} | ||
} | ||
} | ||
/** | ||
* @param {string} dep | ||
*/ | ||
function createWorker(dep) { | ||
let w; | ||
if (isModule) { | ||
return new worker.Worker(workerTarget, {workerData: {dep}}); | ||
} | ||
// In commonJS mode, we have to _again_ require the script, as the Worker ctor incorrectly | ||
// only allows ".js" (which attempts to run as a /module/, because of `type: module`) or | ||
// ".mjs" extensions (which is always a module). | ||
// This will probably be fixed in a future Node. Sounds like a bug. | ||
const code = `require(${JSON.stringify(workerTarget)});`; | ||
return new worker.Worker(code, {workerData: {dep}, eval: true}); | ||
} |
@@ -7,3 +7,5 @@ /** | ||
if (worker.isMainThread) { | ||
const {parentPort} = worker; | ||
if (worker.isMainThread || !parentPort) { | ||
throw new TypeError('cannot run on main thread'); | ||
@@ -16,3 +18,3 @@ } | ||
.then(({default: method}) => { | ||
worker.parentPort.on('message', ({args, port}) => { | ||
parentPort.on('message', ({args, port}) => { | ||
Promise.resolve() | ||
@@ -24,6 +26,7 @@ .then(() => method(...args)) | ||
}); | ||
parentPort.postMessage({ok: true}); | ||
}) | ||
.catch((error) => { | ||
// Failure mode: the module couldn't be imported, complain loudly. | ||
worker.parentPort.on('message', ({port}) => { | ||
parentPort.on('message', ({port}) => { | ||
port.postMessage({error}); | ||
@@ -30,0 +33,0 @@ port.close(); |
export interface PoolOptions { | ||
/** | ||
* Minimum number of tasks to keep around. Task startup can be expensive. Default of one. | ||
*/ | ||
minTasks: number, | ||
/** | ||
* Maximum number of tasks to use. Default is 75% of your CPU count, rounded up, with a minimum | ||
* of one. | ||
*/ | ||
tasks?: number, | ||
tasks: number, | ||
@@ -13,4 +19,7 @@ /** | ||
* there's immediately pending tasks), increase if your tasks have high setup costs. | ||
* | ||
* This can be `Infinity` to keep tasks around forever. | ||
*/ | ||
expiry?: number, | ||
expiry: number, | ||
} | ||
@@ -23,2 +32,2 @@ | ||
*/ | ||
export function pool(dep: string, options?: PoolOptions): (...any) => Promise<any>; | ||
export function pool(dep: string, options?: Partial<PoolOptions>): (...any) => Promise<any>; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
42067
13
830