Socket
Socket
Sign inDemoInstall

@lage-run/worker-threads-pool

Package Overview
Dependencies
0
Maintainers
1
Versions
22
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.3.0 to 0.4.0

17

CHANGELOG.json

@@ -5,3 +5,18 @@ {

{
"date": "Wed, 26 Oct 2022 22:00:56 GMT",
"date": "Sat, 29 Oct 2022 01:06:03 GMT",
"tag": "@lage-run/worker-threads-pool_v0.4.0",
"version": "0.4.0",
"comments": {
"minor": [
{
"author": "kchau@microsoft.com",
"package": "@lage-run/worker-threads-pool",
"commit": "15f4763e0ac29760f813523d3f37b6a5078f4897",
"comment": "adds memory limit + restart capability"
}
]
}
},
{
"date": "Wed, 26 Oct 2022 22:01:13 GMT",
"tag": "@lage-run/worker-threads-pool_v0.3.0",

@@ -8,0 +23,0 @@ "version": "0.3.0",

# Change Log - @lage-run/worker-threads-pool
This log was last generated on Wed, 26 Oct 2022 22:00:56 GMT and should not be manually modified.
This log was last generated on Sat, 29 Oct 2022 01:06:03 GMT and should not be manually modified.
<!-- Start content -->
## 0.4.0
Sat, 29 Oct 2022 01:06:03 GMT
### Minor changes
- adds memory limit + restart capability (kchau@microsoft.com)
## 0.3.0
Wed, 26 Oct 2022 22:00:56 GMT
Wed, 26 Oct 2022 22:01:13 GMT

@@ -11,0 +19,0 @@ ### Minor changes

@@ -15,2 +15,3 @@ /// <reference types="node" />

logger: Logger;
workerIdleMemoryLimit?: number;
}

@@ -22,2 +23,6 @@ export declare class AggregatedPool implements Pool {

constructor(options: AggregatedPoolOptions);
stats(): {
maxWorkerMemoryUsage: number;
workerRestarts: number;
};
exec(data: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;

@@ -24,0 +29,0 @@ close(): Promise<unknown>;

@@ -12,3 +12,8 @@ "use strict";

for (const [group, groupMaxWorkers] of maxWorkersByGroup.entries()) {
const pool = new WorkerPool_1.WorkerPool({ maxWorkers: groupMaxWorkers, workerOptions, script });
const pool = new WorkerPool_1.WorkerPool({
maxWorkers: groupMaxWorkers,
workerOptions,
script,
workerIdleMemoryLimit: options.workerIdleMemoryLimit,
});
this.groupedPools.set(group, pool);

@@ -22,3 +27,8 @@ totalGroupedWorkers += groupMaxWorkers;

if (defaultPoolWorkersCount > 0) {
this.defaultPool = new WorkerPool_1.WorkerPool({ maxWorkers: defaultPoolWorkersCount, workerOptions, script });
this.defaultPool = new WorkerPool_1.WorkerPool({
maxWorkers: defaultPoolWorkersCount,
workerOptions,
script,
workerIdleMemoryLimit: options.workerIdleMemoryLimit,
});
}

@@ -29,2 +39,13 @@ this.options.logger.verbose(`Workers pools created: ${[...maxWorkersByGroup.entries(), ["default", defaultPoolWorkersCount]]

}
stats() {
const stats = [...this.groupedPools.values(), this.defaultPool].reduce((acc, pool) => {
if (pool) {
const poolStats = pool.stats();
acc.maxWorkerMemoryUsage = Math.max(acc.maxWorkerMemoryUsage, poolStats.maxWorkerMemoryUsage);
acc.workerRestarts = acc.workerRestarts + poolStats.workerRestarts;
}
return acc;
}, { maxWorkerMemoryUsage: 0, workerRestarts: 0 });
return stats;
}
async exec(data, weight, setup, cleanup, abortSignal) {

@@ -31,0 +52,0 @@ var _a;

@@ -16,2 +16,4 @@ "use strict";

return abortController === null || abortController === void 0 ? void 0 : abortController.abort();
case "check-memory-usage":
return reportMemory(worker_threads_1.parentPort);
}

@@ -24,6 +26,6 @@ });

const results = await fn(task, abortSignal);
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ err: undefined, results });
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ type: "status", err: undefined, results });
}
catch (err) {
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ err, results: undefined });
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ type: "status", err, results: undefined });
}

@@ -35,4 +37,11 @@ finally {

}
function reportMemory(port) {
const message = {
type: "report-memory-usage",
memoryUsage: process.memoryUsage().heapUsed,
};
port.postMessage(message);
}
}
exports.registerWorker = registerWorker;
//# sourceMappingURL=registerWorker.js.map

@@ -5,5 +5,10 @@ /// <reference types="node" />

import type { AbortSignal } from "abort-controller";
export interface PoolStats {
maxWorkerMemoryUsage: number;
workerRestarts: number;
}
export interface Pool {
exec(data: unknown, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
stats(): PoolStats;
close(): Promise<unknown>;
}

@@ -6,3 +6,4 @@ /// <reference types="node" />

script: string;
workerIdleMemoryLimit?: number;
workerOptions?: WorkerOptions;
}

@@ -27,3 +27,9 @@ /**

availability: number;
maxWorkerMemoryUsage: number;
workerRestarts: number;
constructor(options: WorkerPoolOptions);
stats(): {
maxWorkerMemoryUsage: number;
workerRestarts: number;
};
ensureWorkers(): void;

@@ -34,4 +40,7 @@ captureWorkerStdioStreams(worker: Worker): void;

_exec(abortSignal?: AbortSignal): void;
checkMemoryUsage(worker: Worker): void;
freeWorker(worker: Worker): void;
restartWorker(worker: Worker): void;
close(): Promise<void>;
}
export {};

67

lib/WorkerPool.js

@@ -63,2 +63,4 @@ "use strict";

this.availability = 0;
this.maxWorkerMemoryUsage = 0;
this.workerRestarts = 0;
this.maxWorkers = (_a = this.options.maxWorkers) !== null && _a !== void 0 ? _a : os_1.default.cpus().length - 1;

@@ -78,2 +80,8 @@ this.availability = this.maxWorkers;

}
stats() {
return {
maxWorkerMemoryUsage: this.maxWorkerMemoryUsage,
workerRestarts: this.workerRestarts,
};
}
ensureWorkers() {

@@ -132,14 +140,26 @@ if (this.workers.length === 0) {

const msgHandler = (data) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => {
const { err, results } = data;
const weight = worker[kTaskInfo].weight;
worker[kTaskInfo].done(err, results);
worker[kTaskInfo] = null;
this.availability += weight;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
var _a;
if (data.type === "status") {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => {
const { err, results } = data;
const weight = worker[kTaskInfo].weight;
worker[kTaskInfo].done(err, results);
worker[kTaskInfo] = null;
this.availability += weight;
this.checkMemoryUsage(worker);
});
}
else if (data.type === "report-memory-usage") {
this.maxWorkerMemoryUsage = Math.max(this.maxWorkerMemoryUsage, data.memoryUsage);
const limit = (_a = this.options.workerIdleMemoryLimit) !== null && _a !== void 0 ? _a : os_1.default.totalmem();
if (limit && data.memoryUsage > limit) {
this.restartWorker(worker);
}
else {
this.freeWorker(worker);
}
}
};

@@ -157,8 +177,6 @@ worker.on("message", msgHandler);

this.emit("error", err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
this.restartWorker(worker);
});
};
// The 'error' event is emitted if the worker thread throws an uncaught exception. In that case, the worker is terminated.
worker.on("error", errHandler);

@@ -210,2 +228,19 @@ this.workers.push(worker);

}
checkMemoryUsage(worker) {
worker.postMessage({ type: "check-memory-usage" });
}
freeWorker(worker) {
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
restartWorker(worker) {
this.workerRestarts++;
worker.terminate();
const freeWorkerIndex = this.freeWorkers.indexOf(worker);
if (freeWorkerIndex !== -1) {
this.freeWorkers.splice(freeWorkerIndex, 1); // remove from free workers list
}
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
}
async close() {

@@ -212,0 +247,0 @@ for (const worker of this.workers) {

{
"name": "@lage-run/worker-threads-pool",
"version": "0.3.0",
"version": "0.4.0",
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation",

@@ -5,0 +5,0 @@ "repository": {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc