@lage-run/worker-threads-pool
Advanced tools
Comparing version 0.3.0 to 0.4.0
@@ -5,3 +5,18 @@ { | ||
{ | ||
"date": "Wed, 26 Oct 2022 22:00:56 GMT", | ||
"date": "Sat, 29 Oct 2022 01:06:03 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.4.0", | ||
"version": "0.4.0", | ||
"comments": { | ||
"minor": [ | ||
{ | ||
"author": "kchau@microsoft.com", | ||
"package": "@lage-run/worker-threads-pool", | ||
"commit": "15f4763e0ac29760f813523d3f37b6a5078f4897", | ||
"comment": "adds memory limit + restart capability" | ||
} | ||
] | ||
} | ||
}, | ||
{ | ||
"date": "Wed, 26 Oct 2022 22:01:13 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.3.0", | ||
@@ -8,0 +23,0 @@ "version": "0.3.0", |
# Change Log - @lage-run/worker-threads-pool | ||
This log was last generated on Wed, 26 Oct 2022 22:00:56 GMT and should not be manually modified. | ||
This log was last generated on Sat, 29 Oct 2022 01:06:03 GMT and should not be manually modified. | ||
<!-- Start content --> | ||
## 0.4.0 | ||
Sat, 29 Oct 2022 01:06:03 GMT | ||
### Minor changes | ||
- adds memory limit + restart capability (kchau@microsoft.com) | ||
## 0.3.0 | ||
Wed, 26 Oct 2022 22:00:56 GMT | ||
Wed, 26 Oct 2022 22:01:13 GMT | ||
@@ -11,0 +19,0 @@ ### Minor changes |
@@ -15,2 +15,3 @@ /// <reference types="node" /> | ||
logger: Logger; | ||
workerIdleMemoryLimit?: number; | ||
} | ||
@@ -22,2 +23,6 @@ export declare class AggregatedPool implements Pool { | ||
constructor(options: AggregatedPoolOptions); | ||
stats(): { | ||
maxWorkerMemoryUsage: number; | ||
workerRestarts: number; | ||
}; | ||
exec(data: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
@@ -24,0 +29,0 @@ close(): Promise<unknown>; |
@@ -12,3 +12,8 @@ "use strict"; | ||
for (const [group, groupMaxWorkers] of maxWorkersByGroup.entries()) { | ||
const pool = new WorkerPool_1.WorkerPool({ maxWorkers: groupMaxWorkers, workerOptions, script }); | ||
const pool = new WorkerPool_1.WorkerPool({ | ||
maxWorkers: groupMaxWorkers, | ||
workerOptions, | ||
script, | ||
workerIdleMemoryLimit: options.workerIdleMemoryLimit, | ||
}); | ||
this.groupedPools.set(group, pool); | ||
@@ -22,3 +27,8 @@ totalGroupedWorkers += groupMaxWorkers; | ||
if (defaultPoolWorkersCount > 0) { | ||
this.defaultPool = new WorkerPool_1.WorkerPool({ maxWorkers: defaultPoolWorkersCount, workerOptions, script }); | ||
this.defaultPool = new WorkerPool_1.WorkerPool({ | ||
maxWorkers: defaultPoolWorkersCount, | ||
workerOptions, | ||
script, | ||
workerIdleMemoryLimit: options.workerIdleMemoryLimit, | ||
}); | ||
} | ||
@@ -29,2 +39,13 @@ this.options.logger.verbose(`Workers pools created: ${[...maxWorkersByGroup.entries(), ["default", defaultPoolWorkersCount]] | ||
} | ||
stats() { | ||
const stats = [...this.groupedPools.values(), this.defaultPool].reduce((acc, pool) => { | ||
if (pool) { | ||
const poolStats = pool.stats(); | ||
acc.maxWorkerMemoryUsage = Math.max(acc.maxWorkerMemoryUsage, poolStats.maxWorkerMemoryUsage); | ||
acc.workerRestarts = acc.workerRestarts + poolStats.workerRestarts; | ||
} | ||
return acc; | ||
}, { maxWorkerMemoryUsage: 0, workerRestarts: 0 }); | ||
return stats; | ||
} | ||
async exec(data, weight, setup, cleanup, abortSignal) { | ||
@@ -31,0 +52,0 @@ var _a; |
@@ -16,2 +16,4 @@ "use strict"; | ||
return abortController === null || abortController === void 0 ? void 0 : abortController.abort(); | ||
case "check-memory-usage": | ||
return reportMemory(worker_threads_1.parentPort); | ||
} | ||
@@ -24,6 +26,6 @@ }); | ||
const results = await fn(task, abortSignal); | ||
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ err: undefined, results }); | ||
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ type: "status", err: undefined, results }); | ||
} | ||
catch (err) { | ||
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ err, results: undefined }); | ||
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ type: "status", err, results: undefined }); | ||
} | ||
@@ -35,4 +37,11 @@ finally { | ||
} | ||
function reportMemory(port) { | ||
const message = { | ||
type: "report-memory-usage", | ||
memoryUsage: process.memoryUsage().heapUsed, | ||
}; | ||
port.postMessage(message); | ||
} | ||
} | ||
exports.registerWorker = registerWorker; | ||
//# sourceMappingURL=registerWorker.js.map |
@@ -5,5 +5,10 @@ /// <reference types="node" /> | ||
import type { AbortSignal } from "abort-controller"; | ||
export interface PoolStats { | ||
maxWorkerMemoryUsage: number; | ||
workerRestarts: number; | ||
} | ||
export interface Pool { | ||
exec(data: unknown, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
stats(): PoolStats; | ||
close(): Promise<unknown>; | ||
} |
@@ -6,3 +6,4 @@ /// <reference types="node" /> | ||
script: string; | ||
workerIdleMemoryLimit?: number; | ||
workerOptions?: WorkerOptions; | ||
} |
@@ -27,3 +27,9 @@ /** | ||
availability: number; | ||
maxWorkerMemoryUsage: number; | ||
workerRestarts: number; | ||
constructor(options: WorkerPoolOptions); | ||
stats(): { | ||
maxWorkerMemoryUsage: number; | ||
workerRestarts: number; | ||
}; | ||
ensureWorkers(): void; | ||
@@ -34,4 +40,7 @@ captureWorkerStdioStreams(worker: Worker): void; | ||
_exec(abortSignal?: AbortSignal): void; | ||
checkMemoryUsage(worker: Worker): void; | ||
freeWorker(worker: Worker): void; | ||
restartWorker(worker: Worker): void; | ||
close(): Promise<void>; | ||
} | ||
export {}; |
@@ -63,2 +63,4 @@ "use strict"; | ||
this.availability = 0; | ||
this.maxWorkerMemoryUsage = 0; | ||
this.workerRestarts = 0; | ||
this.maxWorkers = (_a = this.options.maxWorkers) !== null && _a !== void 0 ? _a : os_1.default.cpus().length - 1; | ||
@@ -78,2 +80,8 @@ this.availability = this.maxWorkers; | ||
} | ||
stats() { | ||
return { | ||
maxWorkerMemoryUsage: this.maxWorkerMemoryUsage, | ||
workerRestarts: this.workerRestarts, | ||
}; | ||
} | ||
ensureWorkers() { | ||
@@ -132,14 +140,26 @@ if (this.workers.length === 0) { | ||
const msgHandler = (data) => { | ||
// In case of success: Call the callback that was passed to `runTask`, | ||
// remove the `TaskInfo` associated with the Worker, and mark it as free | ||
// again. | ||
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => { | ||
const { err, results } = data; | ||
const weight = worker[kTaskInfo].weight; | ||
worker[kTaskInfo].done(err, results); | ||
worker[kTaskInfo] = null; | ||
this.availability += weight; | ||
this.freeWorkers.push(worker); | ||
this.emit(kWorkerFreedEvent); | ||
}); | ||
var _a; | ||
if (data.type === "status") { | ||
// In case of success: Call the callback that was passed to `runTask`, | ||
// remove the `TaskInfo` associated with the Worker, and mark it as free | ||
// again. | ||
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => { | ||
const { err, results } = data; | ||
const weight = worker[kTaskInfo].weight; | ||
worker[kTaskInfo].done(err, results); | ||
worker[kTaskInfo] = null; | ||
this.availability += weight; | ||
this.checkMemoryUsage(worker); | ||
}); | ||
} | ||
else if (data.type === "report-memory-usage") { | ||
this.maxWorkerMemoryUsage = Math.max(this.maxWorkerMemoryUsage, data.memoryUsage); | ||
const limit = (_a = this.options.workerIdleMemoryLimit) !== null && _a !== void 0 ? _a : os_1.default.totalmem(); | ||
if (limit && data.memoryUsage > limit) { | ||
this.restartWorker(worker); | ||
} | ||
else { | ||
this.freeWorker(worker); | ||
} | ||
} | ||
}; | ||
@@ -157,8 +177,6 @@ worker.on("message", msgHandler); | ||
this.emit("error", err); | ||
// Remove the worker from the list and start a new Worker to replace the | ||
// current one. | ||
this.workers.splice(this.workers.indexOf(worker), 1); | ||
this.addNewWorker(); | ||
this.restartWorker(worker); | ||
}); | ||
}; | ||
// The 'error' event is emitted if the worker thread throws an uncaught exception. In that case, the worker is terminated. | ||
worker.on("error", errHandler); | ||
@@ -210,2 +228,19 @@ this.workers.push(worker); | ||
} | ||
checkMemoryUsage(worker) { | ||
worker.postMessage({ type: "check-memory-usage" }); | ||
} | ||
freeWorker(worker) { | ||
this.freeWorkers.push(worker); | ||
this.emit(kWorkerFreedEvent); | ||
} | ||
restartWorker(worker) { | ||
this.workerRestarts++; | ||
worker.terminate(); | ||
const freeWorkerIndex = this.freeWorkers.indexOf(worker); | ||
if (freeWorkerIndex !== -1) { | ||
this.freeWorkers.splice(freeWorkerIndex, 1); // remove from free workers list | ||
} | ||
this.workers.splice(this.workers.indexOf(worker), 1); | ||
this.addNewWorker(); | ||
} | ||
async close() { | ||
@@ -212,0 +247,0 @@ for (const worker of this.workers) { |
{ | ||
"name": "@lage-run/worker-threads-pool", | ||
"version": "0.3.0", | ||
"version": "0.4.0", | ||
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation", | ||
@@ -5,0 +5,0 @@ "repository": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
46730
744