@lage-run/worker-threads-pool
Advanced tools
Comparing version 0.4.5 to 0.5.0
@@ -5,3 +5,18 @@ { | ||
{ | ||
"date": "Wed, 16 Nov 2022 17:12:13 GMT", | ||
"date": "Wed, 16 Nov 2022 20:06:53 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.5.0", | ||
"version": "0.5.0", | ||
"comments": { | ||
"minor": [ | ||
{ | ||
"author": "kchau@microsoft.com", | ||
"package": "@lage-run/worker-threads-pool", | ||
"commit": "04f9c13f6f37c7a14a67ab27acaf253ad09bfbd8", | ||
"comment": "Refactoring the worker threads pool to use proper classes for Worker" | ||
} | ||
] | ||
} | ||
}, | ||
{ | ||
"date": "Wed, 16 Nov 2022 17:12:24 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.4.5", | ||
@@ -8,0 +23,0 @@ "version": "0.4.5", |
# Change Log - @lage-run/worker-threads-pool | ||
This log was last generated on Wed, 16 Nov 2022 17:12:13 GMT and should not be manually modified. | ||
This log was last generated on Wed, 16 Nov 2022 20:06:53 GMT and should not be manually modified. | ||
<!-- Start content --> | ||
## 0.5.0 | ||
Wed, 16 Nov 2022 20:06:53 GMT | ||
### Minor changes | ||
- Refactoring the worker threads pool to use proper classes for Worker (kchau@microsoft.com) | ||
## 0.4.5 | ||
Wed, 16 Nov 2022 17:12:13 GMT | ||
Wed, 16 Nov 2022 17:12:24 GMT | ||
@@ -11,0 +19,0 @@ ### Patches |
@@ -6,5 +6,6 @@ /// <reference types="node" /> | ||
import type { Readable } from "stream"; | ||
import type { Worker, WorkerOptions } from "worker_threads"; | ||
import type { WorkerOptions } from "worker_threads"; | ||
import type { Pool } from "./types/Pool.js"; | ||
import type { Logger } from "@lage-run/logger"; | ||
import type { IWorker } from "./types/WorkerQueue.js"; | ||
import { WorkerPool } from "./WorkerPool.js"; | ||
@@ -29,5 +30,5 @@ interface AggregatedPoolOptions { | ||
}; | ||
exec(data: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
exec(data: Record<string, unknown>, weight: number, setup?: (worker: IWorker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
close(): Promise<unknown>; | ||
} | ||
export {}; |
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="global" /> | ||
import type { Worker } from "worker_threads"; | ||
import type { Readable } from "stream"; | ||
import type { IWorker } from "./WorkerQueue.js"; | ||
export interface PoolStats { | ||
@@ -12,5 +11,5 @@ maxWorkerMemoryUsage: number; | ||
export interface Pool { | ||
exec(data: unknown, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
exec(data: unknown, weight: number, setup?: (worker: IWorker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
stats(): PoolStats; | ||
close(): Promise<unknown>; | ||
} |
@@ -1,33 +0,20 @@ | ||
/** | ||
* Heavily based on a publically available worker pool implementation in node.js documentation: | ||
* https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool | ||
*/ | ||
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="global" /> | ||
/// <reference types="node" /> | ||
import { EventEmitter } from "events"; | ||
import { Worker } from "worker_threads"; | ||
import type { IWorker, QueueItem } from "./types/WorkerQueue.js"; | ||
import type { Pool } from "./types/Pool.js"; | ||
import type { Readable } from "stream"; | ||
import type { WorkerPoolOptions } from "./types/WorkerPoolOptions.js"; | ||
interface QueueItem { | ||
setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void; | ||
cleanup?: (worker: Worker) => void; | ||
task: Record<string, unknown>; | ||
weight: number; | ||
resolve: (value?: unknown) => void; | ||
reject: (reason: unknown) => void; | ||
} | ||
export declare class WorkerPool extends EventEmitter implements Pool { | ||
private options; | ||
workers: Worker[]; | ||
freeWorkers: Worker[]; | ||
workers: IWorker[]; | ||
freeWorkers: IWorker[]; | ||
queue: QueueItem[]; | ||
maxWorkers: number; | ||
availability: number; | ||
maxWorkerMemoryUsage: number; | ||
workerRestarts: number; | ||
constructor(options: WorkerPoolOptions); | ||
get workerRestarts(): number; | ||
get maxWorkerMemoryUsage(): number; | ||
stats(): { | ||
@@ -38,11 +25,6 @@ maxWorkerMemoryUsage: number; | ||
ensureWorkers(): void; | ||
captureWorkerStdioStreams(worker: Worker): void; | ||
addNewWorker(): void; | ||
exec(task: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (worker: Worker) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
exec(task: Record<string, unknown>, weight: number, setup?: (worker: IWorker, stdout: Readable, stderr: Readable) => void, cleanup?: (worker: IWorker) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
_exec(abortSignal?: AbortSignal): void; | ||
checkMemoryUsage(worker: Worker): void; | ||
freeWorker(worker: Worker): void; | ||
restartWorker(worker: Worker): void; | ||
close(): Promise<void>; | ||
} | ||
export {}; |
@@ -1,5 +0,2 @@ | ||
/** | ||
* Heavily based on a publically available worker pool implementation in node.js documentation: | ||
* https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool | ||
*/ "use strict"; | ||
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { | ||
@@ -12,9 +9,4 @@ value: true | ||
}); | ||
const _asyncHooks = require("async_hooks"); | ||
const _createFilteredStreamTransformJs = require("./createFilteredStreamTransform.js"); | ||
const _readline = require("readline"); | ||
const _stdioStreamMarkersJs = require("./stdioStreamMarkers.js"); | ||
const _events = require("events"); | ||
const _workerThreads = require("worker_threads"); | ||
const _crypto = /*#__PURE__*/ _interopRequireDefault(require("crypto")); | ||
const _threadWorkerJs = require("./ThreadWorker.js"); | ||
const _os = /*#__PURE__*/ _interopRequireDefault(require("os")); | ||
@@ -26,36 +18,10 @@ function _interopRequireDefault(obj) { | ||
} | ||
const kTaskInfo = Symbol("kTaskInfo"); | ||
const kWorkerFreedEvent = Symbol("kWorkerFreedEvent"); | ||
const kWorkerCapturedStdoutResolve = Symbol("kWorkerCapturedStdoutResolve"); | ||
const kWorkerCapturedStderrResolve = Symbol("kWorkerCapturedStderrResolve"); | ||
const kWorkerCapturedStdoutPromise = Symbol("kWorkerCapturedStdoutPromise"); | ||
const kWorkerCapturedStderrPromise = Symbol("kWorkerCapturedStderrPromise"); | ||
class WorkerPoolTaskInfo extends _asyncHooks.AsyncResource { | ||
get id() { | ||
return this.options.id; | ||
const workerFreedEvent = "free"; | ||
class WorkerPool extends _events.EventEmitter { | ||
get workerRestarts() { | ||
return this.workers.reduce((acc, worker)=>acc + worker.restarts, 0); | ||
} | ||
get weight() { | ||
return this.options.weight; | ||
get maxWorkerMemoryUsage() { | ||
return this.workers.reduce((acc, worker)=>Math.max(acc, worker.maxWorkerMemoryUsage), 0); | ||
} | ||
done(err, results) { | ||
const { cleanup , worker , resolve , reject } = this.options; | ||
if (cleanup) { | ||
this.runInAsyncScope(cleanup, null, worker); | ||
} | ||
if (err) { | ||
this.runInAsyncScope(reject, null, err, worker); | ||
} else { | ||
this.runInAsyncScope(resolve, null, results, worker); | ||
} | ||
this.emitDestroy(); | ||
} | ||
constructor(options){ | ||
super("WorkerPoolTaskInfo"); | ||
this.options = options; | ||
if (options.setup) { | ||
this.runInAsyncScope(options.setup, null, options.worker, options.worker["filteredStdout"], options.worker["filteredStderr"]); | ||
} | ||
} | ||
} | ||
class WorkerPool extends _events.EventEmitter { | ||
stats() { | ||
@@ -74,102 +40,14 @@ return { | ||
} | ||
captureWorkerStdioStreams(worker) { | ||
const stdout = worker.stdout; | ||
const stdoutInterface = (0, _readline.createInterface)({ | ||
input: stdout, | ||
crlfDelay: Infinity | ||
}); | ||
const stderr = worker.stderr; | ||
const stderrInterface = (0, _readline.createInterface)({ | ||
input: stderr, | ||
crlfDelay: Infinity | ||
}); | ||
const lineHandlerFactory = (outputType)=>{ | ||
let lines = []; | ||
let resolve; | ||
return (line)=>{ | ||
if (!worker[kTaskInfo]) { | ||
// Somehow this lineHandler function is called AFTER the worker has been freed. | ||
// This can happen if there are stray setTimeout(), etc. with callbacks that outputs some messages in stdout/stderr | ||
// In this case, we will ignore the output | ||
return; | ||
} | ||
if (line.includes((0, _stdioStreamMarkersJs.startMarker)(worker[kTaskInfo].id))) { | ||
lines = []; | ||
if (outputType === "stdout") { | ||
resolve = worker[kWorkerCapturedStdoutResolve]; | ||
} else { | ||
resolve = worker[kWorkerCapturedStderrResolve]; | ||
} | ||
} else if (line.includes((0, _stdioStreamMarkersJs.endMarker)(worker[kTaskInfo].id))) { | ||
resolve(); | ||
} else { | ||
lines.push(line); | ||
} | ||
}; | ||
}; | ||
const stdoutLineHandler = lineHandlerFactory("stdout"); | ||
const stderrLineHandler = lineHandlerFactory("stderr"); | ||
stdoutInterface.on("line", stdoutLineHandler); | ||
stderrInterface.on("line", stderrLineHandler); | ||
} | ||
addNewWorker() { | ||
const { script , workerOptions } = this.options; | ||
const worker = new _workerThreads.Worker(script, { | ||
...workerOptions, | ||
stdout: true, | ||
stderr: true | ||
const worker = new _threadWorkerJs.ThreadWorker(script, { | ||
workerOptions, | ||
workerIdleMemoryLimit: this.options.workerIdleMemoryLimit | ||
}); | ||
worker[kWorkerCapturedStderrPromise] = Promise.resolve(); | ||
worker[kWorkerCapturedStdoutPromise] = Promise.resolve(); | ||
this.captureWorkerStdioStreams(worker); | ||
worker["filteredStdout"] = worker.stdout.pipe((0, _createFilteredStreamTransformJs.createFilteredStreamTransform)()); | ||
worker["filteredStderr"] = worker.stderr.pipe((0, _createFilteredStreamTransformJs.createFilteredStreamTransform)()); | ||
const msgHandler = (data)=>{ | ||
if (data.type === "status") { | ||
// In case of success: Call the callback that was passed to `runTask`, | ||
// remove the `TaskInfo` associated with the Worker, and mark it as free | ||
// again. | ||
Promise.all([ | ||
worker[kWorkerCapturedStdoutPromise], | ||
worker[kWorkerCapturedStderrPromise] | ||
]).then(()=>{ | ||
const { err , results } = data; | ||
const weight = worker[kTaskInfo].weight; | ||
worker[kTaskInfo].done(err, results); | ||
worker[kTaskInfo] = null; | ||
this.availability += weight; | ||
this.checkMemoryUsage(worker); | ||
}); | ||
} else if (data.type === "report-memory-usage") { | ||
this.maxWorkerMemoryUsage = Math.max(this.maxWorkerMemoryUsage, data.memoryUsage); | ||
const limit = this.options.workerIdleMemoryLimit ?? _os.default.totalmem(); | ||
if (limit && data.memoryUsage > limit) { | ||
this.restartWorker(worker); | ||
} else { | ||
this.freeWorker(worker); | ||
} | ||
} | ||
}; | ||
worker.on("message", msgHandler); | ||
const errHandler = (err)=>{ | ||
Promise.all([ | ||
worker[kWorkerCapturedStdoutPromise], | ||
worker[kWorkerCapturedStderrPromise] | ||
]).then(()=>{ | ||
// In case of an uncaught exception: Call the callback that was passed to | ||
// `runTask` with the error. | ||
if (worker[kTaskInfo]) { | ||
const weight = worker[kTaskInfo].weight; | ||
worker[kTaskInfo].done(err, null); | ||
this.availability += weight; | ||
} | ||
this.emit("error", err); | ||
this.restartWorker(worker); | ||
}); | ||
}; | ||
// The 'error' event is emitted if the worker thread throws an uncaught exception. In that case, the worker is terminated. | ||
worker.on("error", errHandler); | ||
worker.on("free", (data)=>{ | ||
const { weight } = data; | ||
this.availability += weight; | ||
this.emit(workerFreedEvent); | ||
}); | ||
this.workers.push(worker); | ||
this.freeWorkers.push(worker); | ||
this.emit(kWorkerFreedEvent); | ||
} | ||
@@ -203,67 +81,13 @@ exec(task, weight, setup, cleanup, abortSignal) { | ||
} | ||
if (this.freeWorkers.length > 0) { | ||
const worker = this.freeWorkers.pop(); | ||
// This is to immediate execute tasks if there ARE free workers | ||
// If there are no free workers, the "workerFreedEvent" will call this function again to start the task | ||
const worker = this.workers.find((w)=>w.status === "free"); | ||
if (worker) { | ||
const work = this.queue[workIndex]; | ||
this.queue.splice(workIndex, 1); | ||
this.availability -= work.weight; | ||
this.queue.splice(workIndex, 1); | ||
const { task , resolve , reject , cleanup , setup } = work; | ||
if (worker) { | ||
abortSignal?.addEventListener("abort", ()=>{ | ||
worker.postMessage({ | ||
type: "abort" | ||
}); | ||
}); | ||
const id = _crypto.default.randomBytes(32).toString("hex"); | ||
worker[kTaskInfo] = new WorkerPoolTaskInfo({ | ||
id, | ||
weight: work.weight, | ||
cleanup, | ||
resolve, | ||
reject, | ||
worker, | ||
setup | ||
}); | ||
// Create a pair of promises that are only resolved when a specific task end marker is detected | ||
// in the worker's stdout/stderr streams. | ||
worker[kWorkerCapturedStdoutPromise] = new Promise((onResolve)=>{ | ||
worker[kWorkerCapturedStdoutResolve] = onResolve; | ||
}); | ||
worker[kWorkerCapturedStderrPromise] = new Promise((onResolve)=>{ | ||
worker[kWorkerCapturedStderrResolve] = onResolve; | ||
}); | ||
worker.postMessage({ | ||
type: "start", | ||
task: { | ||
...task, | ||
weight: work.weight | ||
}, | ||
id | ||
}); | ||
} | ||
worker.start(work, abortSignal); | ||
} | ||
} | ||
checkMemoryUsage(worker) { | ||
worker.postMessage({ | ||
type: "check-memory-usage" | ||
}); | ||
} | ||
freeWorker(worker) { | ||
this.freeWorkers.push(worker); | ||
this.emit(kWorkerFreedEvent); | ||
} | ||
restartWorker(worker) { | ||
this.workerRestarts++; | ||
worker.terminate(); | ||
const freeWorkerIndex = this.freeWorkers.indexOf(worker); | ||
if (freeWorkerIndex !== -1) { | ||
this.freeWorkers.splice(freeWorkerIndex, 1); // remove from free workers list | ||
} | ||
this.workers.splice(this.workers.indexOf(worker), 1); | ||
this.addNewWorker(); | ||
} | ||
async close() { | ||
for (const worker of this.workers){ | ||
worker.removeAllListeners(); | ||
worker.unref(); | ||
} | ||
await Promise.all(this.workers.map((worker)=>worker.terminate())); | ||
@@ -279,4 +103,2 @@ } | ||
this.availability = 0; | ||
this.maxWorkerMemoryUsage = 0; | ||
this.workerRestarts = 0; | ||
this.maxWorkers = this.options.maxWorkers ?? _os.default.cpus().length - 1; | ||
@@ -288,5 +110,5 @@ this.availability = this.maxWorkers; | ||
this.ensureWorkers(); | ||
// Any time the kWorkerFreedEvent is emitted, dispatch | ||
// Any time the workerFreedEvent is emitted, dispatch | ||
// the next task pending in the queue, if any. | ||
this.on(kWorkerFreedEvent, ()=>{ | ||
this.on(workerFreedEvent, ()=>{ | ||
if (this.queue.length > 0) { | ||
@@ -293,0 +115,0 @@ this._exec(); |
{ | ||
"name": "@lage-run/worker-threads-pool", | ||
"version": "0.4.5", | ||
"version": "0.5.0", | ||
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation", | ||
@@ -5,0 +5,0 @@ "repository": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
44918
28
1182