Socket
Socket
Sign inDemoInstall

@lage-run/worker-threads-pool

Package Overview
Dependencies
Maintainers
1
Versions
24
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@lage-run/worker-threads-pool - npm Package Compare versions

Comparing version 0.2.0 to 0.3.0

17

CHANGELOG.json

@@ -5,3 +5,18 @@ {

{
"date": "Fri, 21 Oct 2022 21:36:26 GMT",
"date": "Wed, 26 Oct 2022 22:00:56 GMT",
"tag": "@lage-run/worker-threads-pool_v0.3.0",
"version": "0.3.0",
"comments": {
"minor": [
{
"author": "kchau@microsoft.com",
"package": "@lage-run/worker-threads-pool",
"commit": "d4f959e1c46d50725f556f92917f1a1cf3ee0bd9",
"comment": "adds support for weighted targets"
}
]
}
},
{
"date": "Fri, 21 Oct 2022 21:36:38 GMT",
"tag": "@lage-run/worker-threads-pool_v0.2.0",

@@ -8,0 +23,0 @@ "version": "0.2.0",

# Change Log - @lage-run/worker-threads-pool
This log was last generated on Fri, 21 Oct 2022 21:36:26 GMT and should not be manually modified.
This log was last generated on Wed, 26 Oct 2022 22:00:56 GMT and should not be manually modified.
<!-- Start content -->
## 0.3.0
Wed, 26 Oct 2022 22:00:56 GMT
### Minor changes
- adds support for weighted targets (kchau@microsoft.com)
## 0.2.0
Fri, 21 Oct 2022 21:36:26 GMT
Fri, 21 Oct 2022 21:36:38 GMT

@@ -11,0 +19,0 @@ ### Minor changes

2

lib/AggregatedPool.d.ts

@@ -21,5 +21,5 @@ /// <reference types="node" />

constructor(options: AggregatedPoolOptions);
exec(data: unknown, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
exec(data: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
close(): Promise<unknown>;
}
export {};

@@ -23,7 +23,7 @@ "use strict";

}
this.options.logger.verbose(`Workers pools created: ${[...maxWorkersByGroup.entries()]
this.options.logger.verbose(`Workers pools created: ${[...maxWorkersByGroup.entries(), ["default", defaultPoolWorkersCount]]
.map(([group, count]) => `${group} (${count})`)
.join(", ")}, default (${defaultPoolWorkersCount})`);
.join(", ")}`);
}
async exec(data, setup, cleanup, abortSignal) {
async exec(data, weight, setup, cleanup, abortSignal) {
var _a;

@@ -35,3 +35,3 @@ const group = this.options.groupBy(data);

}
return pool.exec(data, setup, cleanup, abortSignal);
return pool.exec(data, weight, setup, cleanup, abortSignal);
}

@@ -38,0 +38,0 @@ async close() {

@@ -6,4 +6,4 @@ /// <reference types="node" />

export interface Pool {
exec(data: unknown, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
exec(data: unknown, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (args: any) => void, abortSignal?: AbortSignal): Promise<unknown>;
close(): Promise<unknown>;
}

@@ -15,3 +15,4 @@ /**

cleanup?: (worker: Worker) => void;
task: unknown;
task: Record<string, unknown>;
weight: number;
resolve: (value?: unknown) => void;

@@ -25,2 +26,4 @@ reject: (reason: unknown) => void;

queue: QueueItem[];
maxWorkers: number;
availability: number;
constructor(options: WorkerPoolOptions);

@@ -30,3 +33,3 @@ ensureWorkers(): void;

addNewWorker(): void;
exec(task: unknown, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (worker: Worker) => void, abortSignal?: AbortSignal): Promise<unknown>;
exec(task: Record<string, unknown>, weight: number, setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void, cleanup?: (worker: Worker) => void, abortSignal?: AbortSignal): Promise<unknown>;
_exec(abortSignal?: AbortSignal): void;

@@ -33,0 +36,0 @@ close(): Promise<void>;

@@ -36,2 +36,5 @@ "use strict";

}
get weight() {
return this.options.weight;
}
done(err, results) {

@@ -53,2 +56,3 @@ const { cleanup, worker, resolve, reject } = this.options;

constructor(options) {
var _a;
super();

@@ -59,2 +63,6 @@ this.options = options;

this.queue = [];
this.maxWorkers = 0;
this.availability = 0;
this.maxWorkers = (_a = this.options.maxWorkers) !== null && _a !== void 0 ? _a : os_1.default.cpus().length - 1;
this.availability = this.maxWorkers;
this.workers = [];

@@ -74,4 +82,3 @@ this.freeWorkers = [];

if (this.workers.length === 0) {
const { maxWorkers = os_1.default.cpus().length - 1 } = this.options;
for (let i = 0; i < maxWorkers; i++) {
for (let i = 0; i < this.maxWorkers; i++) {
this.addNewWorker();

@@ -132,4 +139,6 @@ }

const { err, results } = data;
const weight = worker[kTaskInfo].weight;
worker[kTaskInfo].done(err, results);
worker[kTaskInfo] = null;
this.availability += weight;
this.freeWorkers.push(worker);

@@ -145,3 +154,5 @@ this.emit(kWorkerFreedEvent);

if (worker[kTaskInfo]) {
const weight = worker[kTaskInfo].weight;
worker[kTaskInfo].done(err, null);
this.availability += weight;
}

@@ -160,8 +171,10 @@ this.emit("error", err);

}
exec(task, setup, cleanup, abortSignal) {
exec(task, weight, setup, cleanup, abortSignal) {
if (abortSignal === null || abortSignal === void 0 ? void 0 : abortSignal.aborted) {
return Promise.resolve();
}
// cull the weight of the task to be [1, maxWorkers]
weight = Math.min(Math.max(1, weight), this.maxWorkers);
return new Promise((resolve, reject) => {
this.queue.push({ task, resolve, reject, cleanup, setup });
this.queue.push({ task: Object.assign(Object.assign({}, task), { weight }), weight, resolve, reject, cleanup, setup });
this._exec(abortSignal);

@@ -171,5 +184,12 @@ });

_exec(abortSignal) {
// find work that will fit the availability of workers
const workIndex = this.queue.findIndex((item) => item.weight <= this.availability);
if (workIndex === -1) {
return;
}
if (this.freeWorkers.length > 0) {
const worker = this.freeWorkers.pop();
const work = this.queue.shift();
const work = this.queue[workIndex];
this.availability -= work.weight;
this.queue.splice(workIndex, 1);
const { task, resolve, reject, cleanup, setup } = work;

@@ -181,3 +201,3 @@ if (worker) {

const id = crypto_1.default.randomBytes(32).toString("hex");
worker[kTaskInfo] = new WorkerPoolTaskInfo({ id, cleanup, resolve, reject, worker, setup });
worker[kTaskInfo] = new WorkerPoolTaskInfo({ id, weight: work.weight, cleanup, resolve, reject, worker, setup });
// Create a pair of promises that are only resolved when a specific task end marker is detected

@@ -191,3 +211,3 @@ // in the worker's stdout/stderr streams.

});
worker.postMessage({ type: "start", task, id });
worker.postMessage({ type: "start", task: Object.assign(Object.assign({}, task), { weight: work.weight }), id });
}

@@ -194,0 +214,0 @@ }

{
"name": "@lage-run/worker-threads-pool",
"version": "0.2.0",
"version": "0.3.0",
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation",

@@ -5,0 +5,0 @@ "repository": {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc