@lage-run/worker-threads-pool
Advanced tools
Comparing version 0.4.4 to 0.4.5
@@ -5,3 +5,18 @@ { | ||
{ | ||
"date": "Tue, 01 Nov 2022 22:48:19 GMT", | ||
"date": "Wed, 16 Nov 2022 17:12:13 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.4.5", | ||
"version": "0.4.5", | ||
"comments": { | ||
"patch": [ | ||
{ | ||
"author": "kchau@microsoft.com", | ||
"package": "@lage-run/worker-threads-pool", | ||
"commit": "a7a4471aeaa018e40c718ec6bd6611a0ab040765", | ||
"comment": "adding a handling case for when lines are still being outputted but the worker is freed" | ||
} | ||
] | ||
} | ||
}, | ||
{ | ||
"date": "Tue, 01 Nov 2022 22:48:33 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.4.4", | ||
@@ -8,0 +23,0 @@ "version": "0.4.4", |
# Change Log - @lage-run/worker-threads-pool | ||
This log was last generated on Tue, 01 Nov 2022 22:48:19 GMT and should not be manually modified. | ||
This log was last generated on Wed, 16 Nov 2022 17:12:13 GMT and should not be manually modified. | ||
<!-- Start content --> | ||
## 0.4.5 | ||
Wed, 16 Nov 2022 17:12:13 GMT | ||
### Patches | ||
- adding a handling case for when lines are still being outputted but the worker is freed (kchau@microsoft.com) | ||
## 0.4.4 | ||
Tue, 01 Nov 2022 22:48:19 GMT | ||
Tue, 01 Nov 2022 22:48:33 GMT | ||
@@ -11,0 +19,0 @@ ### Patches |
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="global" /> | ||
@@ -3,0 +5,0 @@ import type { Readable } from "stream"; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.AggregatedPool = void 0; | ||
const WorkerPool_js_1 = require("./WorkerPool.js"); | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
Object.defineProperty(exports, "AggregatedPool", { | ||
enumerable: true, | ||
get: ()=>AggregatedPool | ||
}); | ||
const _workerPoolJs = require("./WorkerPool.js"); | ||
class AggregatedPool { | ||
constructor(options) { | ||
stats() { | ||
const stats = [ | ||
...this.groupedPools.values(), | ||
this.defaultPool | ||
].reduce((acc, pool)=>{ | ||
if (pool) { | ||
const poolStats = pool.stats(); | ||
acc.maxWorkerMemoryUsage = Math.max(acc.maxWorkerMemoryUsage, poolStats.maxWorkerMemoryUsage); | ||
acc.workerRestarts = acc.workerRestarts + poolStats.workerRestarts; | ||
} | ||
return acc; | ||
}, { | ||
maxWorkerMemoryUsage: 0, | ||
workerRestarts: 0 | ||
}); | ||
return stats; | ||
} | ||
async exec(data, weight, setup, cleanup, abortSignal) { | ||
const group = this.options.groupBy(data); | ||
const pool = this.groupedPools.get(group) ?? this.defaultPool; | ||
if (!pool) { | ||
throw new Error(`No pool found to be able to run ${group} tasks, try adjusting the maxWorkers & concurrency values`); | ||
} | ||
return pool.exec(data, weight, setup, cleanup, abortSignal); | ||
} | ||
async close() { | ||
const promises = [ | ||
...this.groupedPools.values(), | ||
this.defaultPool | ||
].map((pool)=>pool?.close()); | ||
return Promise.all(promises); | ||
} | ||
constructor(options){ | ||
this.options = options; | ||
this.groupedPools = new Map(); | ||
const { maxWorkers, maxWorkersByGroup, script, workerOptions } = options; | ||
const { maxWorkers , maxWorkersByGroup , script , workerOptions } = options; | ||
let totalGroupedWorkers = 0; | ||
for (const [group, groupMaxWorkers] of maxWorkersByGroup.entries()) { | ||
const pool = new WorkerPool_js_1.WorkerPool({ | ||
for (const [group, groupMaxWorkers] of maxWorkersByGroup.entries()){ | ||
const pool = new _workerPoolJs.WorkerPool({ | ||
maxWorkers: groupMaxWorkers, | ||
workerOptions, | ||
script, | ||
workerIdleMemoryLimit: options.workerIdleMemoryLimit, | ||
workerIdleMemoryLimit: options.workerIdleMemoryLimit | ||
}); | ||
@@ -26,39 +63,17 @@ this.groupedPools.set(group, pool); | ||
if (defaultPoolWorkersCount > 0) { | ||
this.defaultPool = new WorkerPool_js_1.WorkerPool({ | ||
this.defaultPool = new _workerPoolJs.WorkerPool({ | ||
maxWorkers: defaultPoolWorkersCount, | ||
workerOptions, | ||
script, | ||
workerIdleMemoryLimit: options.workerIdleMemoryLimit, | ||
workerIdleMemoryLimit: options.workerIdleMemoryLimit | ||
}); | ||
} | ||
this.options.logger.verbose(`Workers pools created: ${[...maxWorkersByGroup.entries(), ["default", defaultPoolWorkersCount]] | ||
.map(([group, count]) => `${group} (${count})`) | ||
.join(", ")}`); | ||
this.options.logger.verbose(`Workers pools created: ${[ | ||
...maxWorkersByGroup.entries(), | ||
[ | ||
"default", | ||
defaultPoolWorkersCount | ||
] | ||
].map(([group, count])=>`${group} (${count})`).join(", ")}`); | ||
} | ||
stats() { | ||
const stats = [...this.groupedPools.values(), this.defaultPool].reduce((acc, pool) => { | ||
if (pool) { | ||
const poolStats = pool.stats(); | ||
acc.maxWorkerMemoryUsage = Math.max(acc.maxWorkerMemoryUsage, poolStats.maxWorkerMemoryUsage); | ||
acc.workerRestarts = acc.workerRestarts + poolStats.workerRestarts; | ||
} | ||
return acc; | ||
}, { maxWorkerMemoryUsage: 0, workerRestarts: 0 }); | ||
return stats; | ||
} | ||
async exec(data, weight, setup, cleanup, abortSignal) { | ||
var _a; | ||
const group = this.options.groupBy(data); | ||
const pool = (_a = this.groupedPools.get(group)) !== null && _a !== void 0 ? _a : this.defaultPool; | ||
if (!pool) { | ||
throw new Error(`No pool found to be able to run ${group} tasks, try adjusting the maxWorkers & concurrency values`); | ||
} | ||
return pool.exec(data, weight, setup, cleanup, abortSignal); | ||
} | ||
async close() { | ||
const promises = [...this.groupedPools.values(), this.defaultPool].map((pool) => pool === null || pool === void 0 ? void 0 : pool.close()); | ||
return Promise.all(promises); | ||
} | ||
} | ||
exports.AggregatedPool = AggregatedPool; | ||
//# sourceMappingURL=AggregatedPool.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.createFilteredStreamTransform = void 0; | ||
const stream_1 = require("stream"); | ||
const stdioStreamMarkers_js_1 = require("./stdioStreamMarkers.js"); | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
Object.defineProperty(exports, "createFilteredStreamTransform", { | ||
enumerable: true, | ||
get: ()=>createFilteredStreamTransform | ||
}); | ||
const _stream = require("stream"); | ||
const _stdioStreamMarkersJs = require("./stdioStreamMarkers.js"); | ||
function createFilteredStreamTransform() { | ||
const transform = new stream_1.Transform({ | ||
transform(chunk, _encoding, callback) { | ||
const transform = new _stream.Transform({ | ||
transform (chunk, _encoding, callback) { | ||
let str = chunk.toString(); | ||
if (str.includes(stdioStreamMarkers_js_1.START_MARKER_PREFIX)) { | ||
str = str.replace(new RegExp(stdioStreamMarkers_js_1.START_MARKER_PREFIX + "[0-9a-z]{64}\n"), ""); | ||
if (str.includes(_stdioStreamMarkersJs.START_MARKER_PREFIX)) { | ||
str = str.replace(new RegExp(_stdioStreamMarkersJs.START_MARKER_PREFIX + "[0-9a-z]{64}\n"), ""); | ||
} | ||
if (str.includes(stdioStreamMarkers_js_1.END_MARKER_PREFIX)) { | ||
str = str.replace(new RegExp(stdioStreamMarkers_js_1.END_MARKER_PREFIX + "[0-9a-z]{64}\n"), ""); | ||
if (str.includes(_stdioStreamMarkersJs.END_MARKER_PREFIX)) { | ||
str = str.replace(new RegExp(_stdioStreamMarkersJs.END_MARKER_PREFIX + "[0-9a-z]{64}\n"), ""); | ||
} | ||
callback(null, str); | ||
}, | ||
} | ||
}); | ||
return transform; | ||
} | ||
exports.createFilteredStreamTransform = createFilteredStreamTransform; | ||
//# sourceMappingURL=createFilteredStreamTransform.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.AggregatedPool = exports.WorkerPool = exports.registerWorker = void 0; | ||
var registerWorker_js_1 = require("./registerWorker.js"); | ||
Object.defineProperty(exports, "registerWorker", { enumerable: true, get: function () { return registerWorker_js_1.registerWorker; } }); | ||
var WorkerPool_js_1 = require("./WorkerPool.js"); | ||
Object.defineProperty(exports, "WorkerPool", { enumerable: true, get: function () { return WorkerPool_js_1.WorkerPool; } }); | ||
var AggregatedPool_js_1 = require("./AggregatedPool.js"); | ||
Object.defineProperty(exports, "AggregatedPool", { enumerable: true, get: function () { return AggregatedPool_js_1.AggregatedPool; } }); | ||
//# sourceMappingURL=index.js.map | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
function _export(target, all) { | ||
for(var name in all)Object.defineProperty(target, name, { | ||
enumerable: true, | ||
get: all[name] | ||
}); | ||
} | ||
_export(exports, { | ||
registerWorker: ()=>_registerWorkerJs.registerWorker, | ||
WorkerPool: ()=>_workerPoolJs.WorkerPool, | ||
AggregatedPool: ()=>_aggregatedPoolJs.AggregatedPool | ||
}); | ||
const _registerWorkerJs = require("./registerWorker.js"); | ||
const _workerPoolJs = require("./WorkerPool.js"); | ||
const _aggregatedPoolJs = require("./AggregatedPool.js"); |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.registerWorker = void 0; | ||
const worker_threads_1 = require("worker_threads"); | ||
const stdioStreamMarkers_js_1 = require("./stdioStreamMarkers.js"); | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
Object.defineProperty(exports, "registerWorker", { | ||
enumerable: true, | ||
get: ()=>registerWorker | ||
}); | ||
const _workerThreads = require("worker_threads"); | ||
const _stdioStreamMarkersJs = require("./stdioStreamMarkers.js"); | ||
function registerWorker(fn) { | ||
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.on("message", async (message) => { | ||
_workerThreads.parentPort?.on("message", async (message)=>{ | ||
let abortController; | ||
switch (message.type) { | ||
switch(message.type){ | ||
case "start": | ||
abortController = new AbortController(); | ||
return message.task && (await start(message.id, message.task, abortController.signal)); | ||
return message.task && await start(message.id, message.task, abortController.signal); | ||
case "abort": | ||
return abortController === null || abortController === void 0 ? void 0 : abortController.abort(); | ||
return abortController?.abort(); | ||
case "check-memory-usage": | ||
return reportMemory(worker_threads_1.parentPort); | ||
return reportMemory(_workerThreads.parentPort); | ||
} | ||
@@ -21,14 +26,20 @@ }); | ||
try { | ||
process.stdout.write(`${(0, stdioStreamMarkers_js_1.startMarker)(workerTaskId)}\n`); | ||
process.stderr.write(`${(0, stdioStreamMarkers_js_1.startMarker)(workerTaskId)}\n`); | ||
process.stdout.write(`${(0, _stdioStreamMarkersJs.startMarker)(workerTaskId)}\n`); | ||
process.stderr.write(`${(0, _stdioStreamMarkersJs.startMarker)(workerTaskId)}\n`); | ||
const results = await fn(task, abortSignal); | ||
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ type: "status", err: undefined, results }); | ||
_workerThreads.parentPort?.postMessage({ | ||
type: "status", | ||
err: undefined, | ||
results | ||
}); | ||
} catch (err) { | ||
_workerThreads.parentPort?.postMessage({ | ||
type: "status", | ||
err, | ||
results: undefined | ||
}); | ||
} finally{ | ||
process.stdout.write(`${(0, _stdioStreamMarkersJs.endMarker)(workerTaskId)}\n`); | ||
process.stderr.write(`${(0, _stdioStreamMarkersJs.endMarker)(workerTaskId)}\n`); | ||
} | ||
catch (err) { | ||
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ type: "status", err, results: undefined }); | ||
} | ||
finally { | ||
process.stdout.write(`${(0, stdioStreamMarkers_js_1.endMarker)(workerTaskId)}\n`); | ||
process.stderr.write(`${(0, stdioStreamMarkers_js_1.endMarker)(workerTaskId)}\n`); | ||
} | ||
} | ||
@@ -38,3 +49,3 @@ function reportMemory(port) { | ||
type: "report-memory-usage", | ||
memoryUsage: process.memoryUsage().heapUsed, | ||
memoryUsage: process.memoryUsage().heapUsed | ||
}; | ||
@@ -44,3 +55,1 @@ port.postMessage(message); | ||
} | ||
exports.registerWorker = registerWorker; | ||
//# sourceMappingURL=registerWorker.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.endMarker = exports.startMarker = exports.END_MARKER_PREFIX = exports.START_MARKER_PREFIX = void 0; | ||
exports.START_MARKER_PREFIX = "## WORKER:START:"; | ||
exports.END_MARKER_PREFIX = "## WORKER:END:"; | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
function _export(target, all) { | ||
for(var name in all)Object.defineProperty(target, name, { | ||
enumerable: true, | ||
get: all[name] | ||
}); | ||
} | ||
_export(exports, { | ||
START_MARKER_PREFIX: ()=>START_MARKER_PREFIX, | ||
END_MARKER_PREFIX: ()=>END_MARKER_PREFIX, | ||
startMarker: ()=>startMarker, | ||
endMarker: ()=>endMarker | ||
}); | ||
const START_MARKER_PREFIX = "## WORKER:START:"; | ||
const END_MARKER_PREFIX = "## WORKER:END:"; | ||
function startMarker(id) { | ||
return `${exports.START_MARKER_PREFIX}${id}`; | ||
return `${START_MARKER_PREFIX}${id}`; | ||
} | ||
exports.startMarker = startMarker; | ||
function endMarker(id) { | ||
return `${exports.END_MARKER_PREFIX}${id}`; | ||
return `${END_MARKER_PREFIX}${id}`; | ||
} | ||
exports.endMarker = endMarker; | ||
//# sourceMappingURL=stdioStreamMarkers.js.map |
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="global" /> | ||
@@ -3,0 +5,0 @@ import type { Worker } from "worker_threads"; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
//# sourceMappingURL=Pool.js.map | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
//# sourceMappingURL=WorkerPoolOptions.js.map | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); |
@@ -6,3 +6,6 @@ /** | ||
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
/// <reference types="global" /> | ||
/// <reference types="node" /> | ||
import { EventEmitter } from "events"; | ||
@@ -9,0 +12,0 @@ import { Worker } from "worker_threads"; |
@@ -1,19 +0,25 @@ | ||
"use strict"; | ||
/** | ||
* Heavily based on a publically available worker pool implementation in node.js documentation: | ||
* https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool | ||
*/ | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.WorkerPool = void 0; | ||
const async_hooks_1 = require("async_hooks"); | ||
const createFilteredStreamTransform_js_1 = require("./createFilteredStreamTransform.js"); | ||
const readline_1 = require("readline"); | ||
const stdioStreamMarkers_js_1 = require("./stdioStreamMarkers.js"); | ||
const events_1 = require("events"); | ||
const worker_threads_1 = require("worker_threads"); | ||
const crypto_1 = __importDefault(require("crypto")); | ||
const os_1 = __importDefault(require("os")); | ||
*/ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
Object.defineProperty(exports, "WorkerPool", { | ||
enumerable: true, | ||
get: ()=>WorkerPool | ||
}); | ||
const _asyncHooks = require("async_hooks"); | ||
const _createFilteredStreamTransformJs = require("./createFilteredStreamTransform.js"); | ||
const _readline = require("readline"); | ||
const _stdioStreamMarkersJs = require("./stdioStreamMarkers.js"); | ||
const _events = require("events"); | ||
const _workerThreads = require("worker_threads"); | ||
const _crypto = /*#__PURE__*/ _interopRequireDefault(require("crypto")); | ||
const _os = /*#__PURE__*/ _interopRequireDefault(require("os")); | ||
function _interopRequireDefault(obj) { | ||
return obj && obj.__esModule ? obj : { | ||
default: obj | ||
}; | ||
} | ||
const kTaskInfo = Symbol("kTaskInfo"); | ||
@@ -25,10 +31,3 @@ const kWorkerFreedEvent = Symbol("kWorkerFreedEvent"); | ||
const kWorkerCapturedStderrPromise = Symbol("kWorkerCapturedStderrPromise"); | ||
class WorkerPoolTaskInfo extends async_hooks_1.AsyncResource { | ||
constructor(options) { | ||
super("WorkerPoolTaskInfo"); | ||
this.options = options; | ||
if (options.setup) { | ||
this.runInAsyncScope(options.setup, null, options.worker, options.worker["filteredStdout"], options.worker["filteredStderr"]); | ||
} | ||
} | ||
class WorkerPoolTaskInfo extends _asyncHooks.AsyncResource { | ||
get id() { | ||
@@ -41,3 +40,3 @@ return this.options.id; | ||
done(err, results) { | ||
const { cleanup, worker, resolve, reject } = this.options; | ||
const { cleanup , worker , resolve , reject } = this.options; | ||
if (cleanup) { | ||
@@ -48,4 +47,3 @@ this.runInAsyncScope(cleanup, null, worker); | ||
this.runInAsyncScope(reject, null, err, worker); | ||
} | ||
else { | ||
} else { | ||
this.runInAsyncScope(resolve, null, results, worker); | ||
@@ -55,33 +53,15 @@ } | ||
} | ||
} | ||
class WorkerPool extends events_1.EventEmitter { | ||
constructor(options) { | ||
var _a; | ||
super(); | ||
constructor(options){ | ||
super("WorkerPoolTaskInfo"); | ||
this.options = options; | ||
this.workers = []; | ||
this.freeWorkers = []; | ||
this.queue = []; | ||
this.maxWorkers = 0; | ||
this.availability = 0; | ||
this.maxWorkerMemoryUsage = 0; | ||
this.workerRestarts = 0; | ||
this.maxWorkers = (_a = this.options.maxWorkers) !== null && _a !== void 0 ? _a : os_1.default.cpus().length - 1; | ||
this.availability = this.maxWorkers; | ||
this.workers = []; | ||
this.freeWorkers = []; | ||
this.queue = []; | ||
this.ensureWorkers(); | ||
// Any time the kWorkerFreedEvent is emitted, dispatch | ||
// the next task pending in the queue, if any. | ||
this.on(kWorkerFreedEvent, () => { | ||
if (this.queue.length > 0) { | ||
this._exec(); | ||
} | ||
}); | ||
if (options.setup) { | ||
this.runInAsyncScope(options.setup, null, options.worker, options.worker["filteredStdout"], options.worker["filteredStderr"]); | ||
} | ||
} | ||
} | ||
class WorkerPool extends _events.EventEmitter { | ||
stats() { | ||
return { | ||
maxWorkerMemoryUsage: this.maxWorkerMemoryUsage, | ||
workerRestarts: this.workerRestarts, | ||
workerRestarts: this.workerRestarts | ||
}; | ||
@@ -91,3 +71,3 @@ } | ||
if (this.workers.length === 0) { | ||
for (let i = 0; i < this.maxWorkers; i++) { | ||
for(let i = 0; i < this.maxWorkers; i++){ | ||
this.addNewWorker(); | ||
@@ -99,28 +79,31 @@ } | ||
const stdout = worker.stdout; | ||
const stdoutInterface = (0, readline_1.createInterface)({ | ||
const stdoutInterface = (0, _readline.createInterface)({ | ||
input: stdout, | ||
crlfDelay: Infinity, | ||
crlfDelay: Infinity | ||
}); | ||
const stderr = worker.stderr; | ||
const stderrInterface = (0, readline_1.createInterface)({ | ||
const stderrInterface = (0, _readline.createInterface)({ | ||
input: stderr, | ||
crlfDelay: Infinity, | ||
crlfDelay: Infinity | ||
}); | ||
const lineHandlerFactory = (outputType) => { | ||
const lineHandlerFactory = (outputType)=>{ | ||
let lines = []; | ||
let resolve; | ||
return (line) => { | ||
if (line.includes((0, stdioStreamMarkers_js_1.startMarker)(worker[kTaskInfo].id))) { | ||
return (line)=>{ | ||
if (!worker[kTaskInfo]) { | ||
// Somehow this lineHandler function is called AFTER the worker has been freed. | ||
// This can happen if there are stray setTimeout(), etc. with callbacks that outputs some messages in stdout/stderr | ||
// In this case, we will ignore the output | ||
return; | ||
} | ||
if (line.includes((0, _stdioStreamMarkersJs.startMarker)(worker[kTaskInfo].id))) { | ||
lines = []; | ||
if (outputType === "stdout") { | ||
resolve = worker[kWorkerCapturedStdoutResolve]; | ||
} | ||
else { | ||
} else { | ||
resolve = worker[kWorkerCapturedStderrResolve]; | ||
} | ||
} | ||
else if (line.includes((0, stdioStreamMarkers_js_1.endMarker)(worker[kTaskInfo].id))) { | ||
} else if (line.includes((0, _stdioStreamMarkersJs.endMarker)(worker[kTaskInfo].id))) { | ||
resolve(); | ||
} | ||
else { | ||
} else { | ||
lines.push(line); | ||
@@ -136,11 +119,14 @@ } | ||
addNewWorker() { | ||
const { script, workerOptions } = this.options; | ||
const worker = new worker_threads_1.Worker(script, Object.assign(Object.assign({}, workerOptions), { stdout: true, stderr: true })); | ||
const { script , workerOptions } = this.options; | ||
const worker = new _workerThreads.Worker(script, { | ||
...workerOptions, | ||
stdout: true, | ||
stderr: true | ||
}); | ||
worker[kWorkerCapturedStderrPromise] = Promise.resolve(); | ||
worker[kWorkerCapturedStdoutPromise] = Promise.resolve(); | ||
this.captureWorkerStdioStreams(worker); | ||
worker["filteredStdout"] = worker.stdout.pipe((0, createFilteredStreamTransform_js_1.createFilteredStreamTransform)()); | ||
worker["filteredStderr"] = worker.stderr.pipe((0, createFilteredStreamTransform_js_1.createFilteredStreamTransform)()); | ||
const msgHandler = (data) => { | ||
var _a; | ||
worker["filteredStdout"] = worker.stdout.pipe((0, _createFilteredStreamTransformJs.createFilteredStreamTransform)()); | ||
worker["filteredStderr"] = worker.stderr.pipe((0, _createFilteredStreamTransformJs.createFilteredStreamTransform)()); | ||
const msgHandler = (data)=>{ | ||
if (data.type === "status") { | ||
@@ -150,4 +136,7 @@ // In case of success: Call the callback that was passed to `runTask`, | ||
// again. | ||
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => { | ||
const { err, results } = data; | ||
Promise.all([ | ||
worker[kWorkerCapturedStdoutPromise], | ||
worker[kWorkerCapturedStderrPromise] | ||
]).then(()=>{ | ||
const { err , results } = data; | ||
const weight = worker[kTaskInfo].weight; | ||
@@ -159,10 +148,8 @@ worker[kTaskInfo].done(err, results); | ||
}); | ||
} | ||
else if (data.type === "report-memory-usage") { | ||
} else if (data.type === "report-memory-usage") { | ||
this.maxWorkerMemoryUsage = Math.max(this.maxWorkerMemoryUsage, data.memoryUsage); | ||
const limit = (_a = this.options.workerIdleMemoryLimit) !== null && _a !== void 0 ? _a : os_1.default.totalmem(); | ||
const limit = this.options.workerIdleMemoryLimit ?? _os.default.totalmem(); | ||
if (limit && data.memoryUsage > limit) { | ||
this.restartWorker(worker); | ||
} | ||
else { | ||
} else { | ||
this.freeWorker(worker); | ||
@@ -173,4 +160,7 @@ } | ||
worker.on("message", msgHandler); | ||
const errHandler = (err) => { | ||
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => { | ||
const errHandler = (err)=>{ | ||
Promise.all([ | ||
worker[kWorkerCapturedStdoutPromise], | ||
worker[kWorkerCapturedStderrPromise] | ||
]).then(()=>{ | ||
// In case of an uncaught exception: Call the callback that was passed to | ||
@@ -194,3 +184,3 @@ // `runTask` with the error. | ||
exec(task, weight, setup, cleanup, abortSignal) { | ||
if (abortSignal === null || abortSignal === void 0 ? void 0 : abortSignal.aborted) { | ||
if (abortSignal?.aborted) { | ||
return Promise.resolve(); | ||
@@ -200,4 +190,14 @@ } | ||
weight = Math.min(Math.max(1, weight), this.maxWorkers); | ||
return new Promise((resolve, reject) => { | ||
this.queue.push({ task: Object.assign(Object.assign({}, task), { weight }), weight, resolve, reject, cleanup, setup }); | ||
return new Promise((resolve, reject)=>{ | ||
this.queue.push({ | ||
task: { | ||
...task, | ||
weight | ||
}, | ||
weight, | ||
resolve, | ||
reject, | ||
cleanup, | ||
setup | ||
}); | ||
this._exec(abortSignal); | ||
@@ -208,3 +208,3 @@ }); | ||
// find work that will fit the availability of workers | ||
const workIndex = this.queue.findIndex((item) => item.weight <= this.availability); | ||
const workIndex = this.queue.findIndex((item)=>item.weight <= this.availability); | ||
if (workIndex === -1) { | ||
@@ -218,18 +218,35 @@ return; | ||
this.queue.splice(workIndex, 1); | ||
const { task, resolve, reject, cleanup, setup } = work; | ||
const { task , resolve , reject , cleanup , setup } = work; | ||
if (worker) { | ||
abortSignal === null || abortSignal === void 0 ? void 0 : abortSignal.addEventListener("abort", () => { | ||
worker.postMessage({ type: "abort" }); | ||
abortSignal?.addEventListener("abort", ()=>{ | ||
worker.postMessage({ | ||
type: "abort" | ||
}); | ||
}); | ||
const id = crypto_1.default.randomBytes(32).toString("hex"); | ||
worker[kTaskInfo] = new WorkerPoolTaskInfo({ id, weight: work.weight, cleanup, resolve, reject, worker, setup }); | ||
const id = _crypto.default.randomBytes(32).toString("hex"); | ||
worker[kTaskInfo] = new WorkerPoolTaskInfo({ | ||
id, | ||
weight: work.weight, | ||
cleanup, | ||
resolve, | ||
reject, | ||
worker, | ||
setup | ||
}); | ||
// Create a pair of promises that are only resolved when a specific task end marker is detected | ||
// in the worker's stdout/stderr streams. | ||
worker[kWorkerCapturedStdoutPromise] = new Promise((onResolve) => { | ||
worker[kWorkerCapturedStdoutPromise] = new Promise((onResolve)=>{ | ||
worker[kWorkerCapturedStdoutResolve] = onResolve; | ||
}); | ||
worker[kWorkerCapturedStderrPromise] = new Promise((onResolve) => { | ||
worker[kWorkerCapturedStderrPromise] = new Promise((onResolve)=>{ | ||
worker[kWorkerCapturedStderrResolve] = onResolve; | ||
}); | ||
worker.postMessage({ type: "start", task: Object.assign(Object.assign({}, task), { weight: work.weight }), id }); | ||
worker.postMessage({ | ||
type: "start", | ||
task: { | ||
...task, | ||
weight: work.weight | ||
}, | ||
id | ||
}); | ||
} | ||
@@ -239,3 +256,5 @@ } | ||
checkMemoryUsage(worker) { | ||
worker.postMessage({ type: "check-memory-usage" }); | ||
worker.postMessage({ | ||
type: "check-memory-usage" | ||
}); | ||
} | ||
@@ -257,10 +276,32 @@ freeWorker(worker) { | ||
async close() { | ||
for (const worker of this.workers) { | ||
for (const worker of this.workers){ | ||
worker.removeAllListeners(); | ||
worker.unref(); | ||
} | ||
await Promise.all(this.workers.map((worker) => worker.terminate())); | ||
await Promise.all(this.workers.map((worker)=>worker.terminate())); | ||
} | ||
constructor(options){ | ||
super(); | ||
this.options = options; | ||
this.workers = []; | ||
this.freeWorkers = []; | ||
this.queue = []; | ||
this.maxWorkers = 0; | ||
this.availability = 0; | ||
this.maxWorkerMemoryUsage = 0; | ||
this.workerRestarts = 0; | ||
this.maxWorkers = this.options.maxWorkers ?? _os.default.cpus().length - 1; | ||
this.availability = this.maxWorkers; | ||
this.workers = []; | ||
this.freeWorkers = []; | ||
this.queue = []; | ||
this.ensureWorkers(); | ||
// Any time the kWorkerFreedEvent is emitted, dispatch | ||
// the next task pending in the queue, if any. | ||
this.on(kWorkerFreedEvent, ()=>{ | ||
if (this.queue.length > 0) { | ||
this._exec(); | ||
} | ||
}); | ||
} | ||
} | ||
exports.WorkerPool = WorkerPool; | ||
//# sourceMappingURL=WorkerPool.js.map |
{ | ||
"name": "@lage-run/worker-threads-pool", | ||
"version": "0.4.4", | ||
"version": "0.4.5", | ||
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation", | ||
@@ -5,0 +5,0 @@ "repository": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
921
36115
22