@lage-run/worker-threads-pool
Advanced tools
Comparing version 0.2.0 to 0.3.0
@@ -5,3 +5,18 @@ { | ||
{ | ||
"date": "Fri, 21 Oct 2022 21:36:26 GMT", | ||
"date": "Wed, 26 Oct 2022 22:00:56 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.3.0", | ||
"version": "0.3.0", | ||
"comments": { | ||
"minor": [ | ||
{ | ||
"author": "kchau@microsoft.com", | ||
"package": "@lage-run/worker-threads-pool", | ||
"commit": "d4f959e1c46d50725f556f92917f1a1cf3ee0bd9", | ||
"comment": "adds support for weighted targets" | ||
} | ||
] | ||
} | ||
}, | ||
{ | ||
"date": "Fri, 21 Oct 2022 21:36:38 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.2.0", | ||
@@ -8,0 +23,0 @@ "version": "0.2.0", |
# Change Log - @lage-run/worker-threads-pool | ||
This log was last generated on Fri, 21 Oct 2022 21:36:26 GMT and should not be manually modified. | ||
This log was last generated on Wed, 26 Oct 2022 22:00:56 GMT and should not be manually modified. | ||
<!-- Start content --> | ||
## 0.3.0 | ||
Wed, 26 Oct 2022 22:00:56 GMT | ||
### Minor changes | ||
- adds support for weighted targets (kchau@microsoft.com) | ||
## 0.2.0 | ||
Fri, 21 Oct 2022 21:36:26 GMT | ||
Fri, 21 Oct 2022 21:36:38 GMT | ||
@@ -11,0 +19,0 @@ ### Minor changes |
@@ -21,5 +21,5 @@ /// <reference types="node" /> | ||
constructor(options: AggregatedPoolOptions); | ||
exec(data: unknown, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
exec(data: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
close(): Promise<unknown>; | ||
} | ||
export {}; |
@@ -23,7 +23,7 @@ "use strict"; | ||
} | ||
this.options.logger.verbose(`Workers pools created: ${[...maxWorkersByGroup.entries()] | ||
this.options.logger.verbose(`Workers pools created: ${[...maxWorkersByGroup.entries(), ["default", defaultPoolWorkersCount]] | ||
.map(([group, count]) => `${group} (${count})`) | ||
.join(", ")}, default (${defaultPoolWorkersCount})`); | ||
.join(", ")}`); | ||
} | ||
async exec(data, setup, cleanup, abortSignal) { | ||
async exec(data, weight, setup, cleanup, abortSignal) { | ||
var _a; | ||
@@ -35,3 +35,3 @@ const group = this.options.groupBy(data); | ||
} | ||
return pool.exec(data, setup, cleanup, abortSignal); | ||
return pool.exec(data, weight, setup, cleanup, abortSignal); | ||
} | ||
@@ -38,0 +38,0 @@ async close() { |
@@ -6,4 +6,4 @@ /// <reference types="node" /> | ||
export interface Pool { | ||
exec(data: unknown, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
exec(data: unknown, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
close(): Promise<unknown>; | ||
} |
@@ -15,3 +15,4 @@ /** | ||
cleanup?: (worker: Worker) => void; | ||
task: unknown; | ||
task: Record<string, unknown>; | ||
weight: number; | ||
resolve: (value?: unknown) => void; | ||
@@ -25,2 +26,4 @@ reject: (reason: unknown) => void; | ||
queue: QueueItem[]; | ||
maxWorkers: number; | ||
availability: number; | ||
constructor(options: WorkerPoolOptions); | ||
@@ -30,3 +33,3 @@ ensureWorkers(): void; | ||
addNewWorker(): void; | ||
exec(task: unknown, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (worker: Worker) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
exec(task: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (worker: Worker) => void, abortSignal?: AbortSignal): Promise<unknown>; | ||
_exec(abortSignal?: AbortSignal): void; | ||
@@ -33,0 +36,0 @@ close(): Promise<void>; |
@@ -36,2 +36,5 @@ "use strict"; | ||
} | ||
get weight() { | ||
return this.options.weight; | ||
} | ||
done(err, results) { | ||
@@ -53,2 +56,3 @@ const { cleanup, worker, resolve, reject } = this.options; | ||
constructor(options) { | ||
var _a; | ||
super(); | ||
@@ -59,2 +63,6 @@ this.options = options; | ||
this.queue = []; | ||
this.maxWorkers = 0; | ||
this.availability = 0; | ||
this.maxWorkers = (_a = this.options.maxWorkers) !== null && _a !== void 0 ? _a : os_1.default.cpus().length - 1; | ||
this.availability = this.maxWorkers; | ||
this.workers = []; | ||
@@ -74,4 +82,3 @@ this.freeWorkers = []; | ||
if (this.workers.length === 0) { | ||
const { maxWorkers = os_1.default.cpus().length - 1 } = this.options; | ||
for (let i = 0; i < maxWorkers; i++) { | ||
for (let i = 0; i < this.maxWorkers; i++) { | ||
this.addNewWorker(); | ||
@@ -132,4 +139,6 @@ } | ||
const { err, results } = data; | ||
const weight = worker[kTaskInfo].weight; | ||
worker[kTaskInfo].done(err, results); | ||
worker[kTaskInfo] = null; | ||
this.availability += weight; | ||
this.freeWorkers.push(worker); | ||
@@ -145,3 +154,5 @@ this.emit(kWorkerFreedEvent); | ||
if (worker[kTaskInfo]) { | ||
const weight = worker[kTaskInfo].weight; | ||
worker[kTaskInfo].done(err, null); | ||
this.availability += weight; | ||
} | ||
@@ -160,8 +171,10 @@ this.emit("error", err); | ||
} | ||
exec(task, setup, cleanup, abortSignal) { | ||
exec(task, weight, setup, cleanup, abortSignal) { | ||
if (abortSignal === null || abortSignal === void 0 ? void 0 : abortSignal.aborted) { | ||
return Promise.resolve(); | ||
} | ||
// cull the weight of the task to be [1, maxWorkers] | ||
weight = Math.min(Math.max(1, weight), this.maxWorkers); | ||
return new Promise((resolve, reject) => { | ||
this.queue.push({ task, resolve, reject, cleanup, setup }); | ||
this.queue.push({ task: Object.assign(Object.assign({}, task), { weight }), weight, resolve, reject, cleanup, setup }); | ||
this._exec(abortSignal); | ||
@@ -171,5 +184,12 @@ }); | ||
_exec(abortSignal) { | ||
// find work that will fit the availability of workers | ||
const workIndex = this.queue.findIndex((item) => item.weight <= this.availability); | ||
if (workIndex === -1) { | ||
return; | ||
} | ||
if (this.freeWorkers.length > 0) { | ||
const worker = this.freeWorkers.pop(); | ||
const work = this.queue.shift(); | ||
const work = this.queue[workIndex]; | ||
this.availability -= work.weight; | ||
this.queue.splice(workIndex, 1); | ||
const { task, resolve, reject, cleanup, setup } = work; | ||
@@ -181,3 +201,3 @@ if (worker) { | ||
const id = crypto_1.default.randomBytes(32).toString("hex"); | ||
worker[kTaskInfo] = new WorkerPoolTaskInfo({ id, cleanup, resolve, reject, worker, setup }); | ||
worker[kTaskInfo] = new WorkerPoolTaskInfo({ id, weight: work.weight, cleanup, resolve, reject, worker, setup }); | ||
// Create a pair of promises that are only resolved when a specific task end marker is detected | ||
@@ -191,3 +211,3 @@ // in the worker's stdout/stderr streams. | ||
}); | ||
worker.postMessage({ type: "start", task, id }); | ||
worker.postMessage({ type: "start", task: Object.assign(Object.assign({}, task), { weight: work.weight }), id }); | ||
} | ||
@@ -194,0 +214,0 @@ } |
{ | ||
"name": "@lage-run/worker-threads-pool", | ||
"version": "0.2.0", | ||
"version": "0.3.0", | ||
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation", | ||
@@ -5,0 +5,0 @@ "repository": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
41207
644