@lage-run/worker-threads-pool - npm Package Compare versions

Comparing version 0.4.5 to 0.5.0

lib/TaskInfo.d.ts

CHANGELOG.json

@@ -5,3 +5,18 @@ {

{
- "date": "Wed, 16 Nov 2022 17:12:13 GMT",
+ "date": "Wed, 16 Nov 2022 20:06:53 GMT",
+ "tag": "@lage-run/worker-threads-pool_v0.5.0",
+ "version": "0.5.0",
+ "comments": {
+ "minor": [
+ {
+ "author": "kchau@microsoft.com",
+ "package": "@lage-run/worker-threads-pool",
+ "commit": "04f9c13f6f37c7a14a67ab27acaf253ad09bfbd8",
+ "comment": "Refactoring the worker threads pool to use proper classes for Worker"
+ }
+ ]
+ }
+ },
+ {
+ "date": "Wed, 16 Nov 2022 17:12:24 GMT",
"tag": "@lage-run/worker-threads-pool_v0.4.5",

@@ -8,0 +23,0 @@ "version": "0.4.5",

CHANGELOG.md

# Change Log - @lage-run/worker-threads-pool
- This log was last generated on Wed, 16 Nov 2022 17:12:13 GMT and should not be manually modified.
+ This log was last generated on Wed, 16 Nov 2022 20:06:53 GMT and should not be manually modified.
<!-- Start content -->
+ ## 0.5.0
+ Wed, 16 Nov 2022 20:06:53 GMT
+ ### Minor changes
+ - Refactoring the worker threads pool to use proper classes for Worker (kchau@microsoft.com)
## 0.4.5
- Wed, 16 Nov 2022 17:12:13 GMT
+ Wed, 16 Nov 2022 17:12:24 GMT

@@ -11,0 +19,0 @@ ### Patches
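The single minor change is the move from constructing `worker_threads.Worker` directly inside the pool to constructing a dedicated worker class. A minimal sketch of the before/after call sites, lifted from the `lib/WorkerPool.js` diff further down; the `./ThreadWorker.js` import path is internal to the package and shown here only for illustration:

```ts
import { Worker, type WorkerOptions } from "worker_threads";
// Internal module of this package (lib/ThreadWorker.js); not a documented public export.
import { ThreadWorker } from "./ThreadWorker.js";

// 0.4.5: the pool built raw workers and hung per-task state off them via symbols.
function createWorker_0_4_5(script: string, workerOptions: WorkerOptions) {
  return new Worker(script, { ...workerOptions, stdout: true, stderr: true });
}

// 0.5.0: the pool builds a ThreadWorker wrapper and hands it the idle-memory limit,
// so stdio capture, memory checks, and restart bookkeeping live inside the worker class.
function createWorker_0_5_0(script: string, workerOptions: WorkerOptions, workerIdleMemoryLimit?: number) {
  return new ThreadWorker(script, { workerOptions, workerIdleMemoryLimit });
}
```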

lib/AggregatedPool.d.ts

@@ -6,5 +6,6 @@ /// <reference types="node" />

import type { Readable } from "stream";
- import type { Worker, WorkerOptions } from "worker_threads";
+ import type { WorkerOptions } from "worker_threads";
import type { Pool } from "./types/Pool.js";
import type { Logger } from "@lage-run/logger";
+ import type { IWorker } from "./types/WorkerQueue.js";
import { WorkerPool } from "./WorkerPool.js";

@@ -29,5 +30,5 @@ interface AggregatedPoolOptions {

};
- exec(data: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
+ exec(data: Record<string, unknown>, weight: number, setup?: (worker: IWorker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
close(): Promise<unknown>;
}
export {};
lib/types/Pool.d.ts

/// <reference types="node" />
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="global" />
- import type { Worker } from "worker_threads";
import type { Readable } from "stream";
+ import type { IWorker } from "./WorkerQueue.js";
export interface PoolStats {

@@ -12,5 +11,5 @@ maxWorkerMemoryUsage: number;

export interface Pool {
- exec(data: unknown, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
+ exec(data: unknown, weight: number, setup?: (worker: IWorker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
stats(): PoolStats;
close(): Promise<unknown>;
}
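For orientation, here is a hedged sketch of how a caller might drive the updated `exec(data, weight, setup, cleanup, abortSignal)` shape. Only the signature and the `IWorker`/`Readable` callback parameters come from the interface above; the pool instance, the task payload, and the stream wiring are invented for the example, and the local `PoolLike`/`IWorkerLike` types are structural stand-ins, not the package's exports.

```ts
import type { Readable } from "stream";

// Structural stand-ins mirroring the declarations above; the real types live in
// lib/types/Pool.d.ts and lib/types/WorkerQueue.d.ts inside the package.
interface IWorkerLike { /* opaque here; see the WorkerPool.d.ts diff below */ }
interface PoolLike {
  exec(
    data: unknown,
    weight: number,
    setup?: (worker: IWorkerLike, stdout: Readable, stderr: Readable) => void,
    cleanup?: (args: any) => void,
    abortSignal?: AbortSignal
  ): Promise<unknown>;
  close(): Promise<unknown>;
}

// Hypothetical call site: run one unit-weight task, stream its output, allow cancellation.
async function runTask(pool: PoolLike, payload: Record<string, unknown>) {
  const controller = new AbortController();

  const result = await pool.exec(
    payload,
    1, // weight: how much of the pool's availability this task consumes
    (worker, stdout, stderr) => {
      // setup: wire the per-task stdio streams up before the task starts
      stdout.pipe(process.stdout);
      stderr.pipe(process.stderr);
    },
    () => {
      // cleanup: runs when the task settles
    },
    controller.signal
  );

  return result;
}
```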

lib/WorkerPool.d.ts

@@ -1,33 +0,20 @@

- /**
- * Heavily based on a publically available worker pool implementation in node.js documentation:
- * https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool
- */
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="node" />
/// <reference types="global" />
/// <reference types="node" />
import { EventEmitter } from "events";
- import { Worker } from "worker_threads";
+ import type { IWorker, QueueItem } from "./types/WorkerQueue.js";
import type { Pool } from "./types/Pool.js";
import type { Readable } from "stream";
import type { WorkerPoolOptions } from "./types/WorkerPoolOptions.js";
- interface QueueItem {
- setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void;
- cleanup?: (worker: Worker) => void;
- task: Record<string, unknown>;
- weight: number;
- resolve: (value?: unknown) => void;
- reject: (reason: unknown) => void;
- }
export declare class WorkerPool extends EventEmitter implements Pool {
private options;
- workers: Worker[];
- freeWorkers: Worker[];
+ workers: IWorker[];
+ freeWorkers: IWorker[];
queue: QueueItem[];
maxWorkers: number;
availability: number;
- maxWorkerMemoryUsage: number;
- workerRestarts: number;
constructor(options: WorkerPoolOptions);
+ get workerRestarts(): number;
+ get maxWorkerMemoryUsage(): number;
stats(): {

@@ -38,11 +25,6 @@ maxWorkerMemoryUsage: number;

ensureWorkers(): void;
- captureWorkerStdioStreams(worker: Worker): void;
addNewWorker(): void;
- exec(task: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (worker: Worker) => void, abortSignal?: AbortSignal): Promise<unknown>;
+ exec(task: Record<string, unknown>, weight: number, setup?: (worker: IWorker, stdout: Readable, stderr: Readable) => void, cleanup?: (worker: IWorker) => void, abortSignal?: AbortSignal): Promise<unknown>;
_exec(abortSignal?: AbortSignal): void;
- checkMemoryUsage(worker: Worker): void;
- freeWorker(worker: Worker): void;
- restartWorker(worker: Worker): void;
close(): Promise<void>;
}
- export {};
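The declaration above leans on `IWorker` from `./types/WorkerQueue.js`, whose contents are not part of this compare. Reading the calls made elsewhere in the diff (`worker.restarts`, `worker.maxWorkerMemoryUsage`, `worker.status === "free"`, `worker.start(work, abortSignal)`, `worker.on("free", ...)`, plus `terminate`/`removeAllListeners`/`unref` in `close()`), the surface plausibly looks something like the sketch below. Treat it as an inference from this diff, not the published type.

```ts
import type { Readable } from "stream";

// What a queued unit of work looks like in 0.4.5 (the removed local interface above);
// 0.5.0 imports QueueItem from ./types/WorkerQueue.js, presumably with a similar shape.
interface QueueItemSketch {
  task: Record<string, unknown>;
  weight: number;
  setup?: (worker: IWorkerSketch, stdout: Readable, stderr: Readable) => void;
  cleanup?: (worker: IWorkerSketch) => void;
  resolve: (value?: unknown) => void;
  reject: (reason: unknown) => void;
}

// Inferred IWorker surface, based only on calls visible in this compare.
interface IWorkerSketch {
  status: string;                         // _exec() dispatches to workers whose status is "free"
  restarts: number;                       // summed by the workerRestarts getter
  maxWorkerMemoryUsage: number;           // folded into the maxWorkerMemoryUsage getter
  start(work: QueueItemSketch, abortSignal?: AbortSignal): void;  // replaces postMessage({ type: "start" })
  on(event: "free", listener: (data: { weight: number }) => void): void;
  terminate(): Promise<unknown>;          // awaited in close()
  removeAllListeners(): void;
  unref(): void;
}
```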

lib/WorkerPool.js

@@ -1,5 +0,2 @@

- /**
- * Heavily based on a publically available worker pool implementation in node.js documentation:
- * https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool
- */ "use strict";
+ "use strict";
Object.defineProperty(exports, "__esModule", {

@@ -12,9 +9,4 @@ value: true

});
- const _asyncHooks = require("async_hooks");
- const _createFilteredStreamTransformJs = require("./createFilteredStreamTransform.js");
- const _readline = require("readline");
- const _stdioStreamMarkersJs = require("./stdioStreamMarkers.js");
const _events = require("events");
- const _workerThreads = require("worker_threads");
- const _crypto = /*#__PURE__*/ _interopRequireDefault(require("crypto"));
+ const _threadWorkerJs = require("./ThreadWorker.js");
const _os = /*#__PURE__*/ _interopRequireDefault(require("os"));

@@ -26,36 +18,10 @@ function _interopRequireDefault(obj) {

}
- const kTaskInfo = Symbol("kTaskInfo");
- const kWorkerFreedEvent = Symbol("kWorkerFreedEvent");
- const kWorkerCapturedStdoutResolve = Symbol("kWorkerCapturedStdoutResolve");
- const kWorkerCapturedStderrResolve = Symbol("kWorkerCapturedStderrResolve");
- const kWorkerCapturedStdoutPromise = Symbol("kWorkerCapturedStdoutPromise");
- const kWorkerCapturedStderrPromise = Symbol("kWorkerCapturedStderrPromise");
- class WorkerPoolTaskInfo extends _asyncHooks.AsyncResource {
- get id() {
- return this.options.id;
+ const workerFreedEvent = "free";
+ class WorkerPool extends _events.EventEmitter {
+ get workerRestarts() {
+ return this.workers.reduce((acc, worker)=>acc + worker.restarts, 0);
}
- get weight() {
- return this.options.weight;
+ get maxWorkerMemoryUsage() {
+ return this.workers.reduce((acc, worker)=>Math.max(acc, worker.maxWorkerMemoryUsage), 0);
}
- done(err, results) {
- const { cleanup , worker , resolve , reject } = this.options;
- if (cleanup) {
- this.runInAsyncScope(cleanup, null, worker);
- }
- if (err) {
- this.runInAsyncScope(reject, null, err, worker);
- } else {
- this.runInAsyncScope(resolve, null, results, worker);
- }
- this.emitDestroy();
- }
- constructor(options){
- super("WorkerPoolTaskInfo");
- this.options = options;
- if (options.setup) {
- this.runInAsyncScope(options.setup, null, options.worker, options.worker["filteredStdout"], options.worker["filteredStderr"]);
- }
- }
- }
- class WorkerPool extends _events.EventEmitter {
stats() {

@@ -74,102 +40,14 @@ return {

}
- captureWorkerStdioStreams(worker) {
- const stdout = worker.stdout;
- const stdoutInterface = (0, _readline.createInterface)({
- input: stdout,
- crlfDelay: Infinity
- });
- const stderr = worker.stderr;
- const stderrInterface = (0, _readline.createInterface)({
- input: stderr,
- crlfDelay: Infinity
- });
- const lineHandlerFactory = (outputType)=>{
- let lines = [];
- let resolve;
- return (line)=>{
- if (!worker[kTaskInfo]) {
- // Somehow this lineHandler function is called AFTER the worker has been freed.
- // This can happen if there are stray setTimeout(), etc. with callbacks that outputs some messages in stdout/stderr
- // In this case, we will ignore the output
- return;
- }
- if (line.includes((0, _stdioStreamMarkersJs.startMarker)(worker[kTaskInfo].id))) {
- lines = [];
- if (outputType === "stdout") {
- resolve = worker[kWorkerCapturedStdoutResolve];
- } else {
- resolve = worker[kWorkerCapturedStderrResolve];
- }
- } else if (line.includes((0, _stdioStreamMarkersJs.endMarker)(worker[kTaskInfo].id))) {
- resolve();
- } else {
- lines.push(line);
- }
- };
- };
- const stdoutLineHandler = lineHandlerFactory("stdout");
- const stderrLineHandler = lineHandlerFactory("stderr");
- stdoutInterface.on("line", stdoutLineHandler);
- stderrInterface.on("line", stderrLineHandler);
- }
addNewWorker() {
const { script , workerOptions } = this.options;
- const worker = new _workerThreads.Worker(script, {
- ...workerOptions,
- stdout: true,
- stderr: true
+ const worker = new _threadWorkerJs.ThreadWorker(script, {
+ workerOptions,
+ workerIdleMemoryLimit: this.options.workerIdleMemoryLimit
});
- worker[kWorkerCapturedStderrPromise] = Promise.resolve();
- worker[kWorkerCapturedStdoutPromise] = Promise.resolve();
- this.captureWorkerStdioStreams(worker);
- worker["filteredStdout"] = worker.stdout.pipe((0, _createFilteredStreamTransformJs.createFilteredStreamTransform)());
- worker["filteredStderr"] = worker.stderr.pipe((0, _createFilteredStreamTransformJs.createFilteredStreamTransform)());
- const msgHandler = (data)=>{
- if (data.type === "status") {
- // In case of success: Call the callback that was passed to `runTask`,
- // remove the `TaskInfo` associated with the Worker, and mark it as free
- // again.
- Promise.all([
- worker[kWorkerCapturedStdoutPromise],
- worker[kWorkerCapturedStderrPromise]
- ]).then(()=>{
- const { err , results } = data;
- const weight = worker[kTaskInfo].weight;
- worker[kTaskInfo].done(err, results);
- worker[kTaskInfo] = null;
- this.availability += weight;
- this.checkMemoryUsage(worker);
- });
- } else if (data.type === "report-memory-usage") {
- this.maxWorkerMemoryUsage = Math.max(this.maxWorkerMemoryUsage, data.memoryUsage);
- const limit = this.options.workerIdleMemoryLimit ?? _os.default.totalmem();
- if (limit && data.memoryUsage > limit) {
- this.restartWorker(worker);
- } else {
- this.freeWorker(worker);
- }
- }
- };
- worker.on("message", msgHandler);
- const errHandler = (err)=>{
- Promise.all([
- worker[kWorkerCapturedStdoutPromise],
- worker[kWorkerCapturedStderrPromise]
- ]).then(()=>{
- // In case of an uncaught exception: Call the callback that was passed to
- // `runTask` with the error.
- if (worker[kTaskInfo]) {
- const weight = worker[kTaskInfo].weight;
- worker[kTaskInfo].done(err, null);
- this.availability += weight;
- }
- this.emit("error", err);
- this.restartWorker(worker);
- });
- };
- // The 'error' event is emitted if the worker thread throws an uncaught exception. In that case, the worker is terminated.
- worker.on("error", errHandler);
+ worker.on("free", (data)=>{
+ const { weight } = data;
+ this.availability += weight;
+ this.emit(workerFreedEvent);
+ });
this.workers.push(worker);
- this.freeWorkers.push(worker);
- this.emit(kWorkerFreedEvent);
}

@@ -203,67 +81,13 @@ exec(task, weight, setup, cleanup, abortSignal) {

}
- if (this.freeWorkers.length > 0) {
- const worker = this.freeWorkers.pop();
+ // This is to immediate execute tasks if there ARE free workers
+ // If there are no free workers, the "workerFreedEvent" will call this function again to start the task
+ const worker = this.workers.find((w)=>w.status === "free");
+ if (worker) {
const work = this.queue[workIndex];
- this.queue.splice(workIndex, 1);
this.availability -= work.weight;
+ this.queue.splice(workIndex, 1);
- const { task , resolve , reject , cleanup , setup } = work;
- if (worker) {
- abortSignal?.addEventListener("abort", ()=>{
- worker.postMessage({
- type: "abort"
- });
- });
- const id = _crypto.default.randomBytes(32).toString("hex");
- worker[kTaskInfo] = new WorkerPoolTaskInfo({
- id,
- weight: work.weight,
- cleanup,
- resolve,
- reject,
- worker,
- setup
- });
- // Create a pair of promises that are only resolved when a specific task end marker is detected
- // in the worker's stdout/stderr streams.
- worker[kWorkerCapturedStdoutPromise] = new Promise((onResolve)=>{
- worker[kWorkerCapturedStdoutResolve] = onResolve;
- });
- worker[kWorkerCapturedStderrPromise] = new Promise((onResolve)=>{
- worker[kWorkerCapturedStderrResolve] = onResolve;
- });
- worker.postMessage({
- type: "start",
- task: {
- ...task,
- weight: work.weight
- },
- id
- });
- }
+ worker.start(work, abortSignal);
}
}
- checkMemoryUsage(worker) {
- worker.postMessage({
- type: "check-memory-usage"
- });
- }
- freeWorker(worker) {
- this.freeWorkers.push(worker);
- this.emit(kWorkerFreedEvent);
- }
- restartWorker(worker) {
- this.workerRestarts++;
- worker.terminate();
- const freeWorkerIndex = this.freeWorkers.indexOf(worker);
- if (freeWorkerIndex !== -1) {
- this.freeWorkers.splice(freeWorkerIndex, 1); // remove from free workers list
- }
- this.workers.splice(this.workers.indexOf(worker), 1);
- this.addNewWorker();
- }
async close() {
for (const worker of this.workers){
worker.removeAllListeners();
worker.unref();
}
await Promise.all(this.workers.map((worker)=>worker.terminate()));

@@ -279,4 +103,2 @@ }

this.availability = 0;
- this.maxWorkerMemoryUsage = 0;
- this.workerRestarts = 0;
this.maxWorkers = this.options.maxWorkers ?? _os.default.cpus().length - 1;

@@ -288,5 +110,5 @@ this.availability = this.maxWorkers;

this.ensureWorkers();
- // Any time the kWorkerFreedEvent is emitted, dispatch
+ // Any time the workerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
- this.on(kWorkerFreedEvent, ()=>{
+ this.on(workerFreedEvent, ()=>{
if (this.queue.length > 0) {

@@ -293,0 +115,0 @@ this._exec();
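Pulling the implementation changes together: `availability` is still a weight budget initialised to `maxWorkers`, but idle tracking now lives on the worker itself. `_exec` picks any worker whose `status` is `"free"` and hands it the queued item via `start(work, abortSignal)`; when the worker finishes it emits a `"free"` event carrying the task's weight, which the pool adds back to `availability` and uses to trigger the next dispatch. The toy model below reproduces only that accounting flow under those assumptions; `FakeWorker`, `ToyPool`, and the queue-scan details are invented for illustration and are not the package's code.

```ts
import { EventEmitter } from "events";

// A stand-in worker that "runs" a task asynchronously and announces when it is free again.
class FakeWorker extends EventEmitter {
  status: "free" | "busy" = "free";
  start(work: { weight: number; run: () => Promise<void> }) {
    this.status = "busy";
    work.run().finally(() => {
      this.status = "free";
      this.emit("free", { weight: work.weight }); // same event/payload shape as in the diff above
    });
  }
}

// A toy pool reproducing the 0.5.0 dispatch and weight accounting.
class ToyPool extends EventEmitter {
  availability: number;
  queue: { weight: number; run: () => Promise<void> }[] = [];
  workers: FakeWorker[];

  constructor(maxWorkers: number) {
    super();
    this.availability = maxWorkers;
    this.workers = Array.from({ length: maxWorkers }, () => {
      const w = new FakeWorker();
      w.on("free", ({ weight }) => {
        this.availability += weight; // weight is returned to the budget...
        this.emit("free");           // ...and the pool tries to dispatch again
      });
      return w;
    });
    this.on("free", () => this.dispatch());
  }

  exec(weight: number, run: () => Promise<void>) {
    this.queue.push({ weight, run });
    this.dispatch();
  }

  private dispatch() {
    // Hand out every queued item that fits in the remaining budget and has a free worker.
    for (let i = 0; i < this.queue.length; ) {
      const work = this.queue[i];
      const worker = this.workers.find((w) => w.status === "free");
      if (!worker || work.weight > this.availability) {
        i++;
        continue;
      }
      this.availability -= work.weight;
      this.queue.splice(i, 1);
      worker.start(work);
    }
  }
}
```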

package.json

{
"name": "@lage-run/worker-threads-pool",
- "version": "0.4.5",
+ "version": "0.5.0",
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation",

@@ -5,0 +5,0 @@ "repository": {
