Socket
Socket
Sign inDemoInstall

@lage-run/worker-threads-pool

Package Overview
Dependencies
0
Maintainers
1
Versions
22
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.1.3 to 0.1.4

lib/createFilteredStreamTransform.d.ts

17

CHANGELOG.json

@@ -5,3 +5,18 @@ {

{
"date": "Fri, 30 Sep 2022 23:00:05 GMT",
"date": "Sat, 01 Oct 2022 05:25:19 GMT",
"tag": "@lage-run/worker-threads-pool_v0.1.4",
"version": "0.1.4",
"comments": {
"patch": [
{
"author": "ken@gizzar.com",
"package": "@lage-run/worker-threads-pool",
"commit": "e850f24b770908df902212442baa5fb8ba04cf7c",
"comment": "adds a stdio cpature inside threadpool"
}
]
}
},
{
"date": "Fri, 30 Sep 2022 23:00:17 GMT",
"tag": "@lage-run/worker-threads-pool_v0.1.3",

@@ -8,0 +23,0 @@ "version": "0.1.3",

# Change Log - @lage-run/worker-threads-pool
This log was last generated on Fri, 30 Sep 2022 23:00:05 GMT and should not be manually modified.
This log was last generated on Sat, 01 Oct 2022 05:25:19 GMT and should not be manually modified.
<!-- Start content -->
## 0.1.4
Sat, 01 Oct 2022 05:25:19 GMT
### Patches
- adds a stdio cpature inside threadpool (ken@gizzar.com)
## 0.1.3
Fri, 30 Sep 2022 23:00:05 GMT
Fri, 30 Sep 2022 23:00:17 GMT

@@ -11,0 +19,0 @@ ### Patches

export { registerWorker } from "./registerWorker";
export { WorkerPool } from "./WorkerPool";
export type { Pool } from "./Pool";

@@ -5,5 +5,8 @@ "use strict";

const worker_threads_1 = require("worker_threads");
const stdioStreamMarkers_1 = require("./stdioStreamMarkers");
function registerWorker(fn) {
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.on("message", async (task) => {
try {
process.stdout.write(`${stdioStreamMarkers_1.START_WORKER_STREAM_MARKER}\n`);
process.stderr.write(`${stdioStreamMarkers_1.START_WORKER_STREAM_MARKER}\n`);
const results = await fn(task);

@@ -15,2 +18,6 @@ worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ err: undefined, results });

}
finally {
process.stdout.write(`${stdioStreamMarkers_1.END_WORKER_STREAM_MARKER}\n`);
process.stderr.write(`${stdioStreamMarkers_1.END_WORKER_STREAM_MARKER}\n`);
}
});

@@ -17,0 +24,0 @@ }

10

lib/WorkerPool.d.ts

@@ -8,2 +8,4 @@ /**

import { Worker } from "worker_threads";
import type { Pool } from "./Pool";
import type { Readable } from "stream";
import type { WorkerOptions } from "worker_threads";

@@ -16,3 +18,3 @@ interface WorkerPoolOptions {

interface QueueItem {
setup?: (worker: Worker) => void;
setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void;
cleanup?: (worker: Worker) => void;

@@ -23,3 +25,3 @@ task: unknown;

}
export declare class WorkerPool extends EventEmitter {
export declare class WorkerPool extends EventEmitter implements Pool {
private options;

@@ -30,4 +32,6 @@ workers: Worker[];

constructor(options: WorkerPoolOptions);
ensureWorkers(): void;
captureWorkerStdioStreams(worker: Worker): void;
addNewWorker(): void;
exec(task: unknown, setup?: (worker: Worker) => void, cleanup?: (worker: Worker) => void): Promise<unknown>;
exec(task: unknown, setup?: (worker?: Worker, stdout?: Readable, stderr?: Readable) => void, cleanup?: (worker: Worker) => void): Promise<unknown>;
_exec(): void;

@@ -34,0 +38,0 @@ close(): Promise<void>;

@@ -12,2 +12,5 @@ "use strict";

const async_hooks_1 = require("async_hooks");
const createFilteredStreamTransform_1 = require("./createFilteredStreamTransform");
const readline_1 = require("readline");
const stdioStreamMarkers_1 = require("./stdioStreamMarkers");
const events_1 = require("events");

@@ -18,2 +21,4 @@ const worker_threads_1 = require("worker_threads");

const kWorkerFreedEvent = Symbol("kWorkerFreedEvent");
const kWorkerCapturedStreamEvents = Symbol("kWorkerCapturedStreamEvents");
const kWorkerCapturedStreamPromise = Symbol("kWorkerCapturedStreamPromise");
class WorkerPoolTaskInfo extends async_hooks_1.AsyncResource {

@@ -24,3 +29,3 @@ constructor(options) {

if (options.setup) {
this.runInAsyncScope(options.setup, null, options.worker);
this.runInAsyncScope(options.setup, null, options.worker, options.worker["filteredStdout"], options.worker["filteredStderr"]);
}

@@ -49,9 +54,6 @@ }

this.queue = [];
const { maxWorkers = os_1.default.cpus().length - 1 } = options;
this.workers = [];
this.freeWorkers = [];
this.queue = [];
for (let i = 0; i < maxWorkers; i++) {
this.addNewWorker();
}
this.ensureWorkers();
// Any time the kWorkerFreedEvent is emitted, dispatch

@@ -65,5 +67,51 @@ // the next task pending in the queue, if any.

}
ensureWorkers() {
if (this.workers.length === 0) {
const { maxWorkers = os_1.default.cpus().length - 1 } = this.options;
for (let i = 0; i < maxWorkers; i++) {
this.addNewWorker();
}
}
}
captureWorkerStdioStreams(worker) {
const capturedStreamEvent = new events_1.EventEmitter();
worker[kWorkerCapturedStreamEvents] = capturedStreamEvent;
const stdout = worker.stdout;
const stdoutInterface = (0, readline_1.createInterface)({
input: stdout,
crlfDelay: Infinity,
});
const stderr = worker.stderr;
const stderrInterface = (0, readline_1.createInterface)({
input: stderr,
crlfDelay: Infinity,
});
const lineHandlerFactory = () => {
let lines = [];
return (line) => {
if (line.includes(stdioStreamMarkers_1.START_WORKER_STREAM_MARKER)) {
lines = [];
}
else if (line.includes(stdioStreamMarkers_1.END_WORKER_STREAM_MARKER)) {
worker[kWorkerCapturedStreamEvents].emit("end", lines);
}
else {
lines.push(line);
}
};
};
const stdoutLineHandler = lineHandlerFactory();
const stderrLineHandler = lineHandlerFactory();
stdoutInterface.on("line", stdoutLineHandler);
stderrInterface.on("line", stderrLineHandler);
}
addNewWorker() {
const { script, workerOptions } = this.options;
const worker = new worker_threads_1.Worker(script, workerOptions);
const worker = new worker_threads_1.Worker(script, Object.assign(Object.assign({}, workerOptions), { stdout: true, stderr: true }));
const capturedStreamEvent = new events_1.EventEmitter();
worker[kWorkerCapturedStreamEvents] = capturedStreamEvent;
worker[kWorkerCapturedStreamPromise] = Promise.resolve();
this.captureWorkerStdioStreams(worker);
worker["filteredStdout"] = worker.stdout.pipe((0, createFilteredStreamTransform_1.createFilteredStreamTransform)());
worker["filteredStderr"] = worker.stderr.pipe((0, createFilteredStreamTransform_1.createFilteredStreamTransform)());
const msgHandler = (data) => {

@@ -73,20 +121,24 @@ // In case of success: Call the callback that was passed to `runTask`,

// again.
const { err, results } = data;
worker[kTaskInfo].done(err, results);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
worker[kWorkerCapturedStreamPromise].then(() => {
const { err, results } = data;
worker[kTaskInfo].done(err, results);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
};
worker.on("message", msgHandler);
const errHandler = (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo]) {
worker[kTaskInfo].done(err, null);
}
this.emit("error", err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
worker[kWorkerCapturedStreamPromise].then(() => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo]) {
worker[kTaskInfo].done(err, null);
}
this.emit("error", err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
};

@@ -111,2 +163,5 @@ worker.on("error", errHandler);

worker[kTaskInfo] = new WorkerPoolTaskInfo({ cleanup, resolve, reject, worker, setup });
worker[kWorkerCapturedStreamPromise] = new Promise((onResolve) => {
worker[kWorkerCapturedStreamEvents].once("end", onResolve);
});
worker.postMessage(task);

@@ -113,0 +168,0 @@ }

{
"name": "@lage-run/worker-threads-pool",
"version": "0.1.3",
"version": "0.1.4",
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation",

@@ -5,0 +5,0 @@ "repository": {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc