Huge News! Announcing our $40M Series B led by Abstract Ventures. Learn More
Socket
Sign in · Demo · Install
Socket

@lage-run/worker-threads-pool

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@lage-run/worker-threads-pool - npm Package Compare versions

Comparing version 0.4.4 to 0.4.5

17

CHANGELOG.json

@@ -5,3 +5,18 @@ {

{
"date": "Tue, 01 Nov 2022 22:48:19 GMT",
"date": "Wed, 16 Nov 2022 17:12:13 GMT",
"tag": "@lage-run/worker-threads-pool_v0.4.5",
"version": "0.4.5",
"comments": {
"patch": [
{
"author": "kchau@microsoft.com",
"package": "@lage-run/worker-threads-pool",
"commit": "a7a4471aeaa018e40c718ec6bd6611a0ab040765",
"comment": "adding a handling case for when lines are still being outputted but the worker is freed"
}
]
}
},
{
"date": "Tue, 01 Nov 2022 22:48:33 GMT",
"tag": "@lage-run/worker-threads-pool_v0.4.4",

@@ -8,0 +23,0 @@ "version": "0.4.4",

# Change Log - @lage-run/worker-threads-pool
This log was last generated on Tue, 01 Nov 2022 22:48:19 GMT and should not be manually modified.
This log was last generated on Wed, 16 Nov 2022 17:12:13 GMT and should not be manually modified.
<!-- Start content -->
## 0.4.5
Wed, 16 Nov 2022 17:12:13 GMT
### Patches
- adding a handling case for when lines are still being outputted but the worker is freed (kchau@microsoft.com)
## 0.4.4
Tue, 01 Nov 2022 22:48:19 GMT
Tue, 01 Nov 2022 22:48:33 GMT

@@ -11,0 +19,0 @@ ### Patches

/// <reference types="node" />
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="global" />

@@ -3,0 +5,0 @@ import type { Readable } from "stream";

93

lib/AggregatedPool.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AggregatedPool = void 0;
const WorkerPool_js_1 = require("./WorkerPool.js");
Object.defineProperty(exports, "__esModule", {
value: true
});
Object.defineProperty(exports, "AggregatedPool", {
enumerable: true,
get: ()=>AggregatedPool
});
const _workerPoolJs = require("./WorkerPool.js");
class AggregatedPool {
constructor(options) {
stats() {
const stats = [
...this.groupedPools.values(),
this.defaultPool
].reduce((acc, pool)=>{
if (pool) {
const poolStats = pool.stats();
acc.maxWorkerMemoryUsage = Math.max(acc.maxWorkerMemoryUsage, poolStats.maxWorkerMemoryUsage);
acc.workerRestarts = acc.workerRestarts + poolStats.workerRestarts;
}
return acc;
}, {
maxWorkerMemoryUsage: 0,
workerRestarts: 0
});
return stats;
}
async exec(data, weight, setup, cleanup, abortSignal) {
const group = this.options.groupBy(data);
const pool = this.groupedPools.get(group) ?? this.defaultPool;
if (!pool) {
throw new Error(`No pool found to be able to run ${group} tasks, try adjusting the maxWorkers & concurrency values`);
}
return pool.exec(data, weight, setup, cleanup, abortSignal);
}
async close() {
const promises = [
...this.groupedPools.values(),
this.defaultPool
].map((pool)=>pool?.close());
return Promise.all(promises);
}
constructor(options){
this.options = options;
this.groupedPools = new Map();
const { maxWorkers, maxWorkersByGroup, script, workerOptions } = options;
const { maxWorkers , maxWorkersByGroup , script , workerOptions } = options;
let totalGroupedWorkers = 0;
for (const [group, groupMaxWorkers] of maxWorkersByGroup.entries()) {
const pool = new WorkerPool_js_1.WorkerPool({
for (const [group, groupMaxWorkers] of maxWorkersByGroup.entries()){
const pool = new _workerPoolJs.WorkerPool({
maxWorkers: groupMaxWorkers,
workerOptions,
script,
workerIdleMemoryLimit: options.workerIdleMemoryLimit,
workerIdleMemoryLimit: options.workerIdleMemoryLimit
});

@@ -26,39 +63,17 @@ this.groupedPools.set(group, pool);

if (defaultPoolWorkersCount > 0) {
this.defaultPool = new WorkerPool_js_1.WorkerPool({
this.defaultPool = new _workerPoolJs.WorkerPool({
maxWorkers: defaultPoolWorkersCount,
workerOptions,
script,
workerIdleMemoryLimit: options.workerIdleMemoryLimit,
workerIdleMemoryLimit: options.workerIdleMemoryLimit
});
}
this.options.logger.verbose(`Workers pools created: ${[...maxWorkersByGroup.entries(), ["default", defaultPoolWorkersCount]]
.map(([group, count]) => `${group} (${count})`)
.join(", ")}`);
this.options.logger.verbose(`Workers pools created: ${[
...maxWorkersByGroup.entries(),
[
"default",
defaultPoolWorkersCount
]
].map(([group, count])=>`${group} (${count})`).join(", ")}`);
}
stats() {
const stats = [...this.groupedPools.values(), this.defaultPool].reduce((acc, pool) => {
if (pool) {
const poolStats = pool.stats();
acc.maxWorkerMemoryUsage = Math.max(acc.maxWorkerMemoryUsage, poolStats.maxWorkerMemoryUsage);
acc.workerRestarts = acc.workerRestarts + poolStats.workerRestarts;
}
return acc;
}, { maxWorkerMemoryUsage: 0, workerRestarts: 0 });
return stats;
}
async exec(data, weight, setup, cleanup, abortSignal) {
var _a;
const group = this.options.groupBy(data);
const pool = (_a = this.groupedPools.get(group)) !== null && _a !== void 0 ? _a : this.defaultPool;
if (!pool) {
throw new Error(`No pool found to be able to run ${group} tasks, try adjusting the maxWorkers & concurrency values`);
}
return pool.exec(data, weight, setup, cleanup, abortSignal);
}
async close() {
const promises = [...this.groupedPools.values(), this.defaultPool].map((pool) => pool === null || pool === void 0 ? void 0 : pool.close());
return Promise.all(promises);
}
}
exports.AggregatedPool = AggregatedPool;
//# sourceMappingURL=AggregatedPool.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.createFilteredStreamTransform = void 0;
const stream_1 = require("stream");
const stdioStreamMarkers_js_1 = require("./stdioStreamMarkers.js");
Object.defineProperty(exports, "__esModule", {
value: true
});
Object.defineProperty(exports, "createFilteredStreamTransform", {
enumerable: true,
get: ()=>createFilteredStreamTransform
});
const _stream = require("stream");
const _stdioStreamMarkersJs = require("./stdioStreamMarkers.js");
function createFilteredStreamTransform() {
const transform = new stream_1.Transform({
transform(chunk, _encoding, callback) {
const transform = new _stream.Transform({
transform (chunk, _encoding, callback) {
let str = chunk.toString();
if (str.includes(stdioStreamMarkers_js_1.START_MARKER_PREFIX)) {
str = str.replace(new RegExp(stdioStreamMarkers_js_1.START_MARKER_PREFIX + "[0-9a-z]{64}\n"), "");
if (str.includes(_stdioStreamMarkersJs.START_MARKER_PREFIX)) {
str = str.replace(new RegExp(_stdioStreamMarkersJs.START_MARKER_PREFIX + "[0-9a-z]{64}\n"), "");
}
if (str.includes(stdioStreamMarkers_js_1.END_MARKER_PREFIX)) {
str = str.replace(new RegExp(stdioStreamMarkers_js_1.END_MARKER_PREFIX + "[0-9a-z]{64}\n"), "");
if (str.includes(_stdioStreamMarkersJs.END_MARKER_PREFIX)) {
str = str.replace(new RegExp(_stdioStreamMarkersJs.END_MARKER_PREFIX + "[0-9a-z]{64}\n"), "");
}
callback(null, str);
},
}
});
return transform;
}
exports.createFilteredStreamTransform = createFilteredStreamTransform;
//# sourceMappingURL=createFilteredStreamTransform.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AggregatedPool = exports.WorkerPool = exports.registerWorker = void 0;
var registerWorker_js_1 = require("./registerWorker.js");
Object.defineProperty(exports, "registerWorker", { enumerable: true, get: function () { return registerWorker_js_1.registerWorker; } });
var WorkerPool_js_1 = require("./WorkerPool.js");
Object.defineProperty(exports, "WorkerPool", { enumerable: true, get: function () { return WorkerPool_js_1.WorkerPool; } });
var AggregatedPool_js_1 = require("./AggregatedPool.js");
Object.defineProperty(exports, "AggregatedPool", { enumerable: true, get: function () { return AggregatedPool_js_1.AggregatedPool; } });
//# sourceMappingURL=index.js.map
Object.defineProperty(exports, "__esModule", {
value: true
});
function _export(target, all) {
for(var name in all)Object.defineProperty(target, name, {
enumerable: true,
get: all[name]
});
}
_export(exports, {
registerWorker: ()=>_registerWorkerJs.registerWorker,
WorkerPool: ()=>_workerPoolJs.WorkerPool,
AggregatedPool: ()=>_aggregatedPoolJs.AggregatedPool
});
const _registerWorkerJs = require("./registerWorker.js");
const _workerPoolJs = require("./WorkerPool.js");
const _aggregatedPoolJs = require("./AggregatedPool.js");
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.registerWorker = void 0;
const worker_threads_1 = require("worker_threads");
const stdioStreamMarkers_js_1 = require("./stdioStreamMarkers.js");
Object.defineProperty(exports, "__esModule", {
value: true
});
Object.defineProperty(exports, "registerWorker", {
enumerable: true,
get: ()=>registerWorker
});
const _workerThreads = require("worker_threads");
const _stdioStreamMarkersJs = require("./stdioStreamMarkers.js");
function registerWorker(fn) {
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.on("message", async (message) => {
_workerThreads.parentPort?.on("message", async (message)=>{
let abortController;
switch (message.type) {
switch(message.type){
case "start":
abortController = new AbortController();
return message.task && (await start(message.id, message.task, abortController.signal));
return message.task && await start(message.id, message.task, abortController.signal);
case "abort":
return abortController === null || abortController === void 0 ? void 0 : abortController.abort();
return abortController?.abort();
case "check-memory-usage":
return reportMemory(worker_threads_1.parentPort);
return reportMemory(_workerThreads.parentPort);
}

@@ -21,14 +26,20 @@ });

try {
process.stdout.write(`${(0, stdioStreamMarkers_js_1.startMarker)(workerTaskId)}\n`);
process.stderr.write(`${(0, stdioStreamMarkers_js_1.startMarker)(workerTaskId)}\n`);
process.stdout.write(`${(0, _stdioStreamMarkersJs.startMarker)(workerTaskId)}\n`);
process.stderr.write(`${(0, _stdioStreamMarkersJs.startMarker)(workerTaskId)}\n`);
const results = await fn(task, abortSignal);
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ type: "status", err: undefined, results });
_workerThreads.parentPort?.postMessage({
type: "status",
err: undefined,
results
});
} catch (err) {
_workerThreads.parentPort?.postMessage({
type: "status",
err,
results: undefined
});
} finally{
process.stdout.write(`${(0, _stdioStreamMarkersJs.endMarker)(workerTaskId)}\n`);
process.stderr.write(`${(0, _stdioStreamMarkersJs.endMarker)(workerTaskId)}\n`);
}
catch (err) {
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ type: "status", err, results: undefined });
}
finally {
process.stdout.write(`${(0, stdioStreamMarkers_js_1.endMarker)(workerTaskId)}\n`);
process.stderr.write(`${(0, stdioStreamMarkers_js_1.endMarker)(workerTaskId)}\n`);
}
}

@@ -38,3 +49,3 @@ function reportMemory(port) {

type: "report-memory-usage",
memoryUsage: process.memoryUsage().heapUsed,
memoryUsage: process.memoryUsage().heapUsed
};

@@ -44,3 +55,1 @@ port.postMessage(message);

}
exports.registerWorker = registerWorker;
//# sourceMappingURL=registerWorker.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.endMarker = exports.startMarker = exports.END_MARKER_PREFIX = exports.START_MARKER_PREFIX = void 0;
exports.START_MARKER_PREFIX = "## WORKER:START:";
exports.END_MARKER_PREFIX = "## WORKER:END:";
Object.defineProperty(exports, "__esModule", {
value: true
});
function _export(target, all) {
for(var name in all)Object.defineProperty(target, name, {
enumerable: true,
get: all[name]
});
}
_export(exports, {
START_MARKER_PREFIX: ()=>START_MARKER_PREFIX,
END_MARKER_PREFIX: ()=>END_MARKER_PREFIX,
startMarker: ()=>startMarker,
endMarker: ()=>endMarker
});
const START_MARKER_PREFIX = "## WORKER:START:";
const END_MARKER_PREFIX = "## WORKER:END:";
function startMarker(id) {
return `${exports.START_MARKER_PREFIX}${id}`;
return `${START_MARKER_PREFIX}${id}`;
}
exports.startMarker = startMarker;
function endMarker(id) {
return `${exports.END_MARKER_PREFIX}${id}`;
return `${END_MARKER_PREFIX}${id}`;
}
exports.endMarker = endMarker;
//# sourceMappingURL=stdioStreamMarkers.js.map
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="global" />

@@ -3,0 +5,0 @@ import type { Worker } from "worker_threads";

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
//# sourceMappingURL=Pool.js.map
Object.defineProperty(exports, "__esModule", {
value: true
});
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
//# sourceMappingURL=WorkerPoolOptions.js.map
Object.defineProperty(exports, "__esModule", {
value: true
});

@@ -6,3 +6,6 @@ /**

/// <reference types="node" />
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="global" />
/// <reference types="node" />
import { EventEmitter } from "events";

@@ -9,0 +12,0 @@ import { Worker } from "worker_threads";

@@ -1,19 +0,25 @@

"use strict";
/**
* Heavily based on a publically available worker pool implementation in node.js documentation:
* https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool
*/
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.WorkerPool = void 0;
const async_hooks_1 = require("async_hooks");
const createFilteredStreamTransform_js_1 = require("./createFilteredStreamTransform.js");
const readline_1 = require("readline");
const stdioStreamMarkers_js_1 = require("./stdioStreamMarkers.js");
const events_1 = require("events");
const worker_threads_1 = require("worker_threads");
const crypto_1 = __importDefault(require("crypto"));
const os_1 = __importDefault(require("os"));
*/ "use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
Object.defineProperty(exports, "WorkerPool", {
enumerable: true,
get: ()=>WorkerPool
});
const _asyncHooks = require("async_hooks");
const _createFilteredStreamTransformJs = require("./createFilteredStreamTransform.js");
const _readline = require("readline");
const _stdioStreamMarkersJs = require("./stdioStreamMarkers.js");
const _events = require("events");
const _workerThreads = require("worker_threads");
const _crypto = /*#__PURE__*/ _interopRequireDefault(require("crypto"));
const _os = /*#__PURE__*/ _interopRequireDefault(require("os"));
function _interopRequireDefault(obj) {
return obj && obj.__esModule ? obj : {
default: obj
};
}
const kTaskInfo = Symbol("kTaskInfo");

@@ -25,10 +31,3 @@ const kWorkerFreedEvent = Symbol("kWorkerFreedEvent");

const kWorkerCapturedStderrPromise = Symbol("kWorkerCapturedStderrPromise");
class WorkerPoolTaskInfo extends async_hooks_1.AsyncResource {
constructor(options) {
super("WorkerPoolTaskInfo");
this.options = options;
if (options.setup) {
this.runInAsyncScope(options.setup, null, options.worker, options.worker["filteredStdout"], options.worker["filteredStderr"]);
}
}
class WorkerPoolTaskInfo extends _asyncHooks.AsyncResource {
get id() {

@@ -41,3 +40,3 @@ return this.options.id;

done(err, results) {
const { cleanup, worker, resolve, reject } = this.options;
const { cleanup , worker , resolve , reject } = this.options;
if (cleanup) {

@@ -48,4 +47,3 @@ this.runInAsyncScope(cleanup, null, worker);

this.runInAsyncScope(reject, null, err, worker);
}
else {
} else {
this.runInAsyncScope(resolve, null, results, worker);

@@ -55,33 +53,15 @@ }

}
}
class WorkerPool extends events_1.EventEmitter {
constructor(options) {
var _a;
super();
constructor(options){
super("WorkerPoolTaskInfo");
this.options = options;
this.workers = [];
this.freeWorkers = [];
this.queue = [];
this.maxWorkers = 0;
this.availability = 0;
this.maxWorkerMemoryUsage = 0;
this.workerRestarts = 0;
this.maxWorkers = (_a = this.options.maxWorkers) !== null && _a !== void 0 ? _a : os_1.default.cpus().length - 1;
this.availability = this.maxWorkers;
this.workers = [];
this.freeWorkers = [];
this.queue = [];
this.ensureWorkers();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.queue.length > 0) {
this._exec();
}
});
if (options.setup) {
this.runInAsyncScope(options.setup, null, options.worker, options.worker["filteredStdout"], options.worker["filteredStderr"]);
}
}
}
class WorkerPool extends _events.EventEmitter {
stats() {
return {
maxWorkerMemoryUsage: this.maxWorkerMemoryUsage,
workerRestarts: this.workerRestarts,
workerRestarts: this.workerRestarts
};

@@ -91,3 +71,3 @@ }

if (this.workers.length === 0) {
for (let i = 0; i < this.maxWorkers; i++) {
for(let i = 0; i < this.maxWorkers; i++){
this.addNewWorker();

@@ -99,28 +79,31 @@ }

const stdout = worker.stdout;
const stdoutInterface = (0, readline_1.createInterface)({
const stdoutInterface = (0, _readline.createInterface)({
input: stdout,
crlfDelay: Infinity,
crlfDelay: Infinity
});
const stderr = worker.stderr;
const stderrInterface = (0, readline_1.createInterface)({
const stderrInterface = (0, _readline.createInterface)({
input: stderr,
crlfDelay: Infinity,
crlfDelay: Infinity
});
const lineHandlerFactory = (outputType) => {
const lineHandlerFactory = (outputType)=>{
let lines = [];
let resolve;
return (line) => {
if (line.includes((0, stdioStreamMarkers_js_1.startMarker)(worker[kTaskInfo].id))) {
return (line)=>{
if (!worker[kTaskInfo]) {
// Somehow this lineHandler function is called AFTER the worker has been freed.
// This can happen if there are stray setTimeout(), etc. with callbacks that outputs some messages in stdout/stderr
// In this case, we will ignore the output
return;
}
if (line.includes((0, _stdioStreamMarkersJs.startMarker)(worker[kTaskInfo].id))) {
lines = [];
if (outputType === "stdout") {
resolve = worker[kWorkerCapturedStdoutResolve];
}
else {
} else {
resolve = worker[kWorkerCapturedStderrResolve];
}
}
else if (line.includes((0, stdioStreamMarkers_js_1.endMarker)(worker[kTaskInfo].id))) {
} else if (line.includes((0, _stdioStreamMarkersJs.endMarker)(worker[kTaskInfo].id))) {
resolve();
}
else {
} else {
lines.push(line);

@@ -136,11 +119,14 @@ }

addNewWorker() {
const { script, workerOptions } = this.options;
const worker = new worker_threads_1.Worker(script, Object.assign(Object.assign({}, workerOptions), { stdout: true, stderr: true }));
const { script , workerOptions } = this.options;
const worker = new _workerThreads.Worker(script, {
...workerOptions,
stdout: true,
stderr: true
});
worker[kWorkerCapturedStderrPromise] = Promise.resolve();
worker[kWorkerCapturedStdoutPromise] = Promise.resolve();
this.captureWorkerStdioStreams(worker);
worker["filteredStdout"] = worker.stdout.pipe((0, createFilteredStreamTransform_js_1.createFilteredStreamTransform)());
worker["filteredStderr"] = worker.stderr.pipe((0, createFilteredStreamTransform_js_1.createFilteredStreamTransform)());
const msgHandler = (data) => {
var _a;
worker["filteredStdout"] = worker.stdout.pipe((0, _createFilteredStreamTransformJs.createFilteredStreamTransform)());
worker["filteredStderr"] = worker.stderr.pipe((0, _createFilteredStreamTransformJs.createFilteredStreamTransform)());
const msgHandler = (data)=>{
if (data.type === "status") {

@@ -150,4 +136,7 @@ // In case of success: Call the callback that was passed to `runTask`,

// again.
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => {
const { err, results } = data;
Promise.all([
worker[kWorkerCapturedStdoutPromise],
worker[kWorkerCapturedStderrPromise]
]).then(()=>{
const { err , results } = data;
const weight = worker[kTaskInfo].weight;

@@ -159,10 +148,8 @@ worker[kTaskInfo].done(err, results);

});
}
else if (data.type === "report-memory-usage") {
} else if (data.type === "report-memory-usage") {
this.maxWorkerMemoryUsage = Math.max(this.maxWorkerMemoryUsage, data.memoryUsage);
const limit = (_a = this.options.workerIdleMemoryLimit) !== null && _a !== void 0 ? _a : os_1.default.totalmem();
const limit = this.options.workerIdleMemoryLimit ?? _os.default.totalmem();
if (limit && data.memoryUsage > limit) {
this.restartWorker(worker);
}
else {
} else {
this.freeWorker(worker);

@@ -173,4 +160,7 @@ }

worker.on("message", msgHandler);
const errHandler = (err) => {
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => {
const errHandler = (err)=>{
Promise.all([
worker[kWorkerCapturedStdoutPromise],
worker[kWorkerCapturedStderrPromise]
]).then(()=>{
// In case of an uncaught exception: Call the callback that was passed to

@@ -194,3 +184,3 @@ // `runTask` with the error.

exec(task, weight, setup, cleanup, abortSignal) {
if (abortSignal === null || abortSignal === void 0 ? void 0 : abortSignal.aborted) {
if (abortSignal?.aborted) {
return Promise.resolve();

@@ -200,4 +190,14 @@ }

weight = Math.min(Math.max(1, weight), this.maxWorkers);
return new Promise((resolve, reject) => {
this.queue.push({ task: Object.assign(Object.assign({}, task), { weight }), weight, resolve, reject, cleanup, setup });
return new Promise((resolve, reject)=>{
this.queue.push({
task: {
...task,
weight
},
weight,
resolve,
reject,
cleanup,
setup
});
this._exec(abortSignal);

@@ -208,3 +208,3 @@ });

// find work that will fit the availability of workers
const workIndex = this.queue.findIndex((item) => item.weight <= this.availability);
const workIndex = this.queue.findIndex((item)=>item.weight <= this.availability);
if (workIndex === -1) {

@@ -218,18 +218,35 @@ return;

this.queue.splice(workIndex, 1);
const { task, resolve, reject, cleanup, setup } = work;
const { task , resolve , reject , cleanup , setup } = work;
if (worker) {
abortSignal === null || abortSignal === void 0 ? void 0 : abortSignal.addEventListener("abort", () => {
worker.postMessage({ type: "abort" });
abortSignal?.addEventListener("abort", ()=>{
worker.postMessage({
type: "abort"
});
});
const id = crypto_1.default.randomBytes(32).toString("hex");
worker[kTaskInfo] = new WorkerPoolTaskInfo({ id, weight: work.weight, cleanup, resolve, reject, worker, setup });
const id = _crypto.default.randomBytes(32).toString("hex");
worker[kTaskInfo] = new WorkerPoolTaskInfo({
id,
weight: work.weight,
cleanup,
resolve,
reject,
worker,
setup
});
// Create a pair of promises that are only resolved when a specific task end marker is detected
// in the worker's stdout/stderr streams.
worker[kWorkerCapturedStdoutPromise] = new Promise((onResolve) => {
worker[kWorkerCapturedStdoutPromise] = new Promise((onResolve)=>{
worker[kWorkerCapturedStdoutResolve] = onResolve;
});
worker[kWorkerCapturedStderrPromise] = new Promise((onResolve) => {
worker[kWorkerCapturedStderrPromise] = new Promise((onResolve)=>{
worker[kWorkerCapturedStderrResolve] = onResolve;
});
worker.postMessage({ type: "start", task: Object.assign(Object.assign({}, task), { weight: work.weight }), id });
worker.postMessage({
type: "start",
task: {
...task,
weight: work.weight
},
id
});
}

@@ -239,3 +256,5 @@ }

checkMemoryUsage(worker) {
worker.postMessage({ type: "check-memory-usage" });
worker.postMessage({
type: "check-memory-usage"
});
}

@@ -257,10 +276,32 @@ freeWorker(worker) {

async close() {
for (const worker of this.workers) {
for (const worker of this.workers){
worker.removeAllListeners();
worker.unref();
}
await Promise.all(this.workers.map((worker) => worker.terminate()));
await Promise.all(this.workers.map((worker)=>worker.terminate()));
}
constructor(options){
super();
this.options = options;
this.workers = [];
this.freeWorkers = [];
this.queue = [];
this.maxWorkers = 0;
this.availability = 0;
this.maxWorkerMemoryUsage = 0;
this.workerRestarts = 0;
this.maxWorkers = this.options.maxWorkers ?? _os.default.cpus().length - 1;
this.availability = this.maxWorkers;
this.workers = [];
this.freeWorkers = [];
this.queue = [];
this.ensureWorkers();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, ()=>{
if (this.queue.length > 0) {
this._exec();
}
});
}
}
exports.WorkerPool = WorkerPool;
//# sourceMappingURL=WorkerPool.js.map
{
"name": "@lage-run/worker-threads-pool",
"version": "0.4.4",
"version": "0.4.5",
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation",

@@ -5,0 +5,0 @@ "repository": {

Socket · SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc