@lage-run/worker-threads-pool
Advanced tools
Comparing version 0.8.4 to 0.8.5
@@ -5,3 +5,18 @@ { | ||
{ | ||
"date": "Wed, 09 Oct 2024 17:20:17 GMT", | ||
"date": "Wed, 04 Dec 2024 23:49:59 GMT", | ||
"version": "0.8.5", | ||
"tag": "@lage-run/worker-threads-pool_v0.8.5", | ||
"comments": { | ||
"patch": [ | ||
{ | ||
"author": "kchau@microsoft.com", | ||
"package": "@lage-run/worker-threads-pool", | ||
"commit": "a512ef3dffa4e87edcd33a762e10418e0579ca02", | ||
"comment": "Fixes the server worker model to use aggregated pool" | ||
} | ||
] | ||
} | ||
}, | ||
{ | ||
"date": "Wed, 09 Oct 2024 17:20:33 GMT", | ||
"version": "0.8.4", | ||
@@ -8,0 +23,0 @@ "tag": "@lage-run/worker-threads-pool_v0.8.4", |
# Change Log - @lage-run/worker-threads-pool | ||
<!-- This log was last generated on Wed, 09 Oct 2024 17:20:17 GMT and should not be manually modified. --> | ||
<!-- This log was last generated on Wed, 04 Dec 2024 23:49:59 GMT and should not be manually modified. --> | ||
<!-- Start content --> | ||
## 0.8.5 | ||
Wed, 04 Dec 2024 23:49:59 GMT | ||
### Patches | ||
- Fixes the server worker model to use aggregated pool (kchau@microsoft.com) | ||
## 0.8.4 | ||
Wed, 09 Oct 2024 17:20:17 GMT | ||
Wed, 09 Oct 2024 17:20:33 GMT | ||
@@ -11,0 +19,0 @@ ### Patches |
@@ -5,2 +5,3 @@ /// <reference types="node" /> | ||
/// <reference types="global" /> | ||
/// <reference types="node" /> | ||
import type { Readable } from "stream"; | ||
@@ -12,2 +13,3 @@ import type { WorkerOptions } from "worker_threads"; | ||
import { WorkerPool } from "./WorkerPool.js"; | ||
import EventEmitter from "events"; | ||
interface AggregatedPoolOptions { | ||
@@ -22,3 +24,3 @@ groupBy: (data: any) => string; | ||
} | ||
export declare class AggregatedPool implements Pool { | ||
export declare class AggregatedPool extends EventEmitter implements Pool { | ||
private options; | ||
@@ -25,0 +27,0 @@ readonly groupedPools: Map<string, WorkerPool>; |
@@ -12,2 +12,3 @@ "use strict"; | ||
const _WorkerPool = require("./WorkerPool.js"); | ||
const _events = /*#__PURE__*/ _interop_require_default(require("events")); | ||
function _define_property(obj, key, value) { | ||
@@ -26,3 +27,8 @@ if (key in obj) { | ||
} | ||
class AggregatedPool { | ||
function _interop_require_default(obj) { | ||
return obj && obj.__esModule ? obj : { | ||
default: obj | ||
}; | ||
} | ||
class AggregatedPool extends _events.default { | ||
stats() { | ||
@@ -61,8 +67,4 @@ const stats = [ | ||
constructor(options){ | ||
_define_property(this, "options", void 0); | ||
_define_property(this, "groupedPools", void 0); | ||
_define_property(this, "defaultPool", void 0); | ||
this.options = options; | ||
this.groupedPools = new Map(); | ||
const { maxWorkers , maxWorkersByGroup , script , workerOptions } = options; | ||
super(), _define_property(this, "options", void 0), _define_property(this, "groupedPools", void 0), _define_property(this, "defaultPool", void 0), this.options = options, this.groupedPools = new Map(); | ||
const { maxWorkers, maxWorkersByGroup, script, workerOptions } = options; | ||
let totalGroupedWorkers = 0; | ||
@@ -98,3 +100,15 @@ for (const [group, groupMaxWorkers] of maxWorkersByGroup.entries()){ | ||
].map(([group, count])=>`${group} (${count})`).join(", ")}`); | ||
// Any time the idle event is emitted by any pool, dispatch an aggregated idle event if everything is idle | ||
const pools = [ | ||
...this.groupedPools.values(), | ||
this.defaultPool | ||
]; | ||
pools.forEach((pool)=>{ | ||
pool?.on(_WorkerPool.WorkerPoolEvents.idle, ()=>{ | ||
if (pools.every((p)=>p?.isIdle())) { | ||
this.emit(_WorkerPool.WorkerPoolEvents.idle); | ||
} | ||
}); | ||
}); | ||
} | ||
} |
@@ -12,4 +12,4 @@ "use strict"; | ||
_export(exports, { | ||
registerWorker: function() { | ||
return _registerWorker.registerWorker; | ||
AggregatedPool: function() { | ||
return _AggregatedPool.AggregatedPool; | ||
}, | ||
@@ -19,4 +19,4 @@ WorkerPool: function() { | ||
}, | ||
AggregatedPool: function() { | ||
return _AggregatedPool.AggregatedPool; | ||
registerWorker: function() { | ||
return _registerWorker.registerWorker; | ||
} | ||
@@ -23,0 +23,0 @@ }); |
@@ -12,13 +12,13 @@ "use strict"; | ||
_export(exports, { | ||
END_MARKER_PREFIX: function() { | ||
return END_MARKER_PREFIX; | ||
}, | ||
START_MARKER_PREFIX: function() { | ||
return START_MARKER_PREFIX; | ||
}, | ||
END_MARKER_PREFIX: function() { | ||
return END_MARKER_PREFIX; | ||
endMarker: function() { | ||
return endMarker; | ||
}, | ||
startMarker: function() { | ||
return startMarker; | ||
}, | ||
endMarker: function() { | ||
return endMarker; | ||
} | ||
@@ -25,0 +25,0 @@ }); |
@@ -36,3 +36,3 @@ "use strict"; | ||
done(err, results) { | ||
const { cleanup , worker , resolve , reject } = this.options; | ||
const { cleanup, worker, resolve, reject } = this.options; | ||
if (cleanup) { | ||
@@ -49,5 +49,3 @@ this.runInAsyncScope(cleanup, null, worker); | ||
constructor(options){ | ||
super("WorkerPoolTaskInfo"); | ||
_define_property(this, "options", void 0); | ||
this.options = options; | ||
super("WorkerPoolTaskInfo"), _define_property(this, "options", void 0), this.options = options; | ||
if (options.setup) { | ||
@@ -54,0 +52,0 @@ this.runInAsyncScope(options.setup, null, options.worker, options.worker.stdout, options.worker.stderr); |
@@ -94,3 +94,3 @@ "use strict"; | ||
this.status = "busy"; | ||
const { task , resolve , reject , cleanup , setup } = work; | ||
const { task, resolve, reject, cleanup, setup } = work; | ||
abortSignal?.addEventListener("abort", _class_private_method_get(this, _handleAbort, handleAbort)); | ||
@@ -160,43 +160,23 @@ const id = _crypto.default.randomBytes(32).toString("hex"); | ||
constructor(script, options){ | ||
super(); | ||
_class_private_method_init(this, _createNewWorker); | ||
_class_private_method_init(this, _ready); | ||
_class_private_method_init(this, _captureWorkerStdioStreams); | ||
_class_private_method_init(this, _handleAbort); | ||
_define_property(this, "script", void 0); | ||
_define_property(this, "options", void 0); | ||
_class_private_field_init(this, _taskInfo, { | ||
super(), _class_private_method_init(this, _createNewWorker), _class_private_method_init(this, _ready), _class_private_method_init(this, _captureWorkerStdioStreams), _class_private_method_init(this, _handleAbort), _define_property(this, "script", void 0), _define_property(this, "options", void 0), _class_private_field_init(this, _taskInfo, { | ||
writable: true, | ||
value: void 0 | ||
}); | ||
_class_private_field_init(this, _stdoutInfo, { | ||
}), _class_private_field_init(this, _stdoutInfo, { | ||
writable: true, | ||
value: void 0 | ||
}); | ||
_class_private_field_init(this, _stderrInfo, { | ||
}), _class_private_field_init(this, _stderrInfo, { | ||
writable: true, | ||
value: void 0 | ||
}); | ||
_class_private_field_init(this, _worker, { | ||
}), _class_private_field_init(this, _worker, { | ||
writable: true, | ||
value: void 0 | ||
}); | ||
_define_property(this, "status", void 0); | ||
_define_property(this, "restarts", void 0); | ||
_define_property(this, "maxWorkerMemoryUsage", void 0); | ||
this.script = script; | ||
this.options = options; | ||
_class_private_field_set(this, _stdoutInfo, { | ||
}), _define_property(this, "status", void 0), _define_property(this, "restarts", void 0), _define_property(this, "maxWorkerMemoryUsage", void 0), this.script = script, this.options = options, _class_private_field_set(this, _stdoutInfo, { | ||
stream: new _stream.Readable(), | ||
promise: Promise.resolve(), | ||
resolve: ()=>{} | ||
}); | ||
_class_private_field_set(this, _stderrInfo, { | ||
}), _class_private_field_set(this, _stderrInfo, { | ||
stream: new _stream.Readable(), | ||
promise: Promise.resolve(), | ||
resolve: ()=>{} | ||
}); | ||
this.status = "busy"; | ||
this.restarts = 0; | ||
this.maxWorkerMemoryUsage = 0; | ||
}), this.status = "busy", this.restarts = 0, this.maxWorkerMemoryUsage = 0; | ||
_class_private_method_get(this, _createNewWorker, createNewWorker).call(this); | ||
@@ -206,3 +186,3 @@ } | ||
function createNewWorker() { | ||
const { workerOptions } = this.options; | ||
const { workerOptions } = this.options; | ||
const script = this.script; | ||
@@ -236,3 +216,3 @@ const worker = new _worker_threads.Worker(script, { | ||
]).then(()=>{ | ||
const { err , results } = data; | ||
const { err, results } = data; | ||
if (_class_private_field_get(this, _taskInfo)) { | ||
@@ -272,2 +252,3 @@ _class_private_field_get(this, _taskInfo).abortSignal?.removeEventListener("abort", _class_private_method_get(this, _handleAbort, handleAbort)); | ||
worker.on("error", errHandler); | ||
// Assign the new worker to private properties | ||
_class_private_field_set(this, _worker, worker); | ||
@@ -274,0 +255,0 @@ _class_private_field_set(this, _stdoutInfo, { |
@@ -11,2 +11,8 @@ /// <reference types="node" /> | ||
import type { WorkerPoolOptions } from "./types/WorkerPoolOptions.js"; | ||
export declare const WorkerPoolEvents: { | ||
readonly freedWorker: "freedWorker"; | ||
readonly idle: "idle"; | ||
readonly busy: "busy"; | ||
readonly restarting: "restarting"; | ||
}; | ||
export declare class WorkerPool extends EventEmitter implements Pool { | ||
@@ -21,2 +27,3 @@ private options; | ||
constructor(options: WorkerPoolOptions); | ||
isIdle(): boolean; | ||
get workerRestarts(): number; | ||
@@ -23,0 +30,0 @@ get maxWorkerMemoryUsage(): number; |
@@ -5,6 +5,14 @@ "use strict"; | ||
}); | ||
Object.defineProperty(exports, "WorkerPool", { | ||
enumerable: true, | ||
get: function() { | ||
function _export(target, all) { | ||
for(var name in all)Object.defineProperty(target, name, { | ||
enumerable: true, | ||
get: all[name] | ||
}); | ||
} | ||
_export(exports, { | ||
WorkerPool: function() { | ||
return WorkerPool; | ||
}, | ||
WorkerPoolEvents: function() { | ||
return WorkerPoolEvents; | ||
} | ||
@@ -33,4 +41,12 @@ }); | ||
} | ||
const workerFreedEvent = "free"; | ||
const WorkerPoolEvents = { | ||
freedWorker: "freedWorker", | ||
idle: "idle", | ||
busy: "busy", | ||
restarting: "restarting" | ||
}; | ||
class WorkerPool extends _events.EventEmitter { | ||
isIdle() { | ||
return this.workers.every((w)=>w.status === "free"); | ||
} | ||
get workerRestarts() { | ||
@@ -57,3 +73,3 @@ return this.workers.reduce((acc, worker)=>acc + worker.restarts, 0); | ||
if (this.workers.length <= this.maxWorkers) { | ||
const { script , workerOptions } = this.options; | ||
const { script, workerOptions } = this.options; | ||
const worker = new _ThreadWorker.ThreadWorker(script, { | ||
@@ -64,5 +80,5 @@ workerOptions, | ||
worker.on("free", (data)=>{ | ||
const { weight } = data; | ||
const { weight } = data; | ||
this.availability += weight; | ||
this.emit(workerFreedEvent); | ||
this.emit(WorkerPoolEvents.freedWorker); | ||
}); | ||
@@ -118,17 +134,3 @@ this.workers.push(worker); | ||
constructor(options){ | ||
super(); | ||
_define_property(this, "options", void 0); | ||
_define_property(this, "workers", void 0); | ||
_define_property(this, "freeWorkers", void 0); | ||
_define_property(this, "queue", void 0); | ||
_define_property(this, "minWorkers", void 0); | ||
_define_property(this, "maxWorkers", void 0); | ||
_define_property(this, "availability", void 0); | ||
this.options = options; | ||
this.workers = []; | ||
this.freeWorkers = []; | ||
this.queue = []; | ||
this.minWorkers = 0; | ||
this.maxWorkers = 0; | ||
this.availability = 0; | ||
super(), _define_property(this, "options", void 0), _define_property(this, "workers", void 0), _define_property(this, "freeWorkers", void 0), _define_property(this, "queue", void 0), _define_property(this, "minWorkers", void 0), _define_property(this, "maxWorkers", void 0), _define_property(this, "availability", void 0), this.options = options, this.workers = [], this.freeWorkers = [], this.queue = [], this.minWorkers = 0, this.maxWorkers = 0, this.availability = 0; | ||
this.minWorkers = this.options.minWorkers ?? 2; | ||
@@ -143,7 +145,8 @@ this.maxWorkers = this.options.maxWorkers ?? _os.default.cpus().length - 1; | ||
// the next task pending in the queue, if any. | ||
this.on(workerFreedEvent, ()=>{ | ||
this.on(WorkerPoolEvents.freedWorker, ()=>{ | ||
if (this.queue.length > 0) { | ||
this._exec(); | ||
} else if (this.workers.every((w)=>w.status === "free")) { | ||
this.emit("idle"); | ||
this.emit(WorkerPoolEvents.busy); | ||
} else if (this.isIdle()) { | ||
this.emit(WorkerPoolEvents.idle); | ||
} | ||
@@ -150,0 +153,0 @@ }); |
{ | ||
"name": "@lage-run/worker-threads-pool", | ||
"version": "0.8.4", | ||
"version": "0.8.5", | ||
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation", | ||
@@ -5,0 +5,0 @@ "repository": { |
95303
39
1507