@lage-run/worker-threads-pool
Advanced tools
Comparing version 0.1.3 to 0.1.4
@@ -5,3 +5,18 @@ { | ||
{ | ||
"date": "Fri, 30 Sep 2022 23:00:05 GMT", | ||
"date": "Sat, 01 Oct 2022 05:25:19 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.1.4", | ||
"version": "0.1.4", | ||
"comments": { | ||
"patch": [ | ||
{ | ||
"author": "ken@gizzar.com", | ||
"package": "@lage-run/worker-threads-pool", | ||
"commit": "e850f24b770908df902212442baa5fb8ba04cf7c", | ||
"comment": "adds a stdio cpature inside threadpool" | ||
} | ||
] | ||
} | ||
}, | ||
{ | ||
"date": "Fri, 30 Sep 2022 23:00:17 GMT", | ||
"tag": "@lage-run/worker-threads-pool_v0.1.3", | ||
@@ -8,0 +23,0 @@ "version": "0.1.3", |
# Change Log - @lage-run/worker-threads-pool | ||
This log was last generated on Fri, 30 Sep 2022 23:00:05 GMT and should not be manually modified. | ||
This log was last generated on Sat, 01 Oct 2022 05:25:19 GMT and should not be manually modified. | ||
<!-- Start content --> | ||
## 0.1.4 | ||
Sat, 01 Oct 2022 05:25:19 GMT | ||
### Patches | ||
- adds a stdio cpature inside threadpool (ken@gizzar.com) | ||
## 0.1.3 | ||
Fri, 30 Sep 2022 23:00:05 GMT | ||
Fri, 30 Sep 2022 23:00:17 GMT | ||
@@ -11,0 +19,0 @@ ### Patches |
export { registerWorker } from "./registerWorker"; | ||
export { WorkerPool } from "./WorkerPool"; | ||
export type { Pool } from "./Pool"; |
@@ -5,5 +5,8 @@ "use strict"; | ||
const worker_threads_1 = require("worker_threads"); | ||
const stdioStreamMarkers_1 = require("./stdioStreamMarkers"); | ||
function registerWorker(fn) { | ||
worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.on("message", async (task) => { | ||
try { | ||
process.stdout.write(`${stdioStreamMarkers_1.START_WORKER_STREAM_MARKER}\n`); | ||
process.stderr.write(`${stdioStreamMarkers_1.START_WORKER_STREAM_MARKER}\n`); | ||
const results = await fn(task); | ||
@@ -15,2 +18,6 @@ worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ err: undefined, results }); | ||
} | ||
finally { | ||
process.stdout.write(`${stdioStreamMarkers_1.END_WORKER_STREAM_MARKER}\n`); | ||
process.stderr.write(`${stdioStreamMarkers_1.END_WORKER_STREAM_MARKER}\n`); | ||
} | ||
}); | ||
@@ -17,0 +24,0 @@ } |
@@ -8,2 +8,4 @@ /** | ||
import { Worker } from "worker_threads"; | ||
import type { Pool } from "./Pool"; | ||
import type { Readable } from "stream"; | ||
import type { WorkerOptions } from "worker_threads"; | ||
@@ -16,3 +18,3 @@ interface WorkerPoolOptions { | ||
interface QueueItem { | ||
setup?: (worker: Worker) => void; | ||
setup?: (worker: Worker, stdout: Readable, stderr: Readable) => void; | ||
cleanup?: (worker: Worker) => void; | ||
@@ -23,3 +25,3 @@ task: unknown; | ||
} | ||
export declare class WorkerPool extends EventEmitter { | ||
export declare class WorkerPool extends EventEmitter implements Pool { | ||
private options; | ||
@@ -30,4 +32,6 @@ workers: Worker[]; | ||
constructor(options: WorkerPoolOptions); | ||
ensureWorkers(): void; | ||
captureWorkerStdioStreams(worker: Worker): void; | ||
addNewWorker(): void; | ||
exec(task: unknown, setup?: (worker: Worker) => void, cleanup?: (worker: Worker) => void): Promise<unknown>; | ||
exec(task: unknown, setup?: (worker?: Worker, stdout?: Readable, stderr?: Readable) => void, cleanup?: (worker: Worker) => void): Promise<unknown>; | ||
_exec(): void; | ||
@@ -34,0 +38,0 @@ close(): Promise<void>; |
@@ -12,2 +12,5 @@ "use strict"; | ||
const async_hooks_1 = require("async_hooks"); | ||
const createFilteredStreamTransform_1 = require("./createFilteredStreamTransform"); | ||
const readline_1 = require("readline"); | ||
const stdioStreamMarkers_1 = require("./stdioStreamMarkers"); | ||
const events_1 = require("events"); | ||
@@ -18,2 +21,4 @@ const worker_threads_1 = require("worker_threads"); | ||
const kWorkerFreedEvent = Symbol("kWorkerFreedEvent"); | ||
const kWorkerCapturedStreamEvents = Symbol("kWorkerCapturedStreamEvents"); | ||
const kWorkerCapturedStreamPromise = Symbol("kWorkerCapturedStreamPromise"); | ||
class WorkerPoolTaskInfo extends async_hooks_1.AsyncResource { | ||
@@ -24,3 +29,3 @@ constructor(options) { | ||
if (options.setup) { | ||
this.runInAsyncScope(options.setup, null, options.worker); | ||
this.runInAsyncScope(options.setup, null, options.worker, options.worker["filteredStdout"], options.worker["filteredStderr"]); | ||
} | ||
@@ -49,9 +54,6 @@ } | ||
this.queue = []; | ||
const { maxWorkers = os_1.default.cpus().length - 1 } = options; | ||
this.workers = []; | ||
this.freeWorkers = []; | ||
this.queue = []; | ||
for (let i = 0; i < maxWorkers; i++) { | ||
this.addNewWorker(); | ||
} | ||
this.ensureWorkers(); | ||
// Any time the kWorkerFreedEvent is emitted, dispatch | ||
@@ -65,5 +67,51 @@ // the next task pending in the queue, if any. | ||
} | ||
ensureWorkers() { | ||
if (this.workers.length === 0) { | ||
const { maxWorkers = os_1.default.cpus().length - 1 } = this.options; | ||
for (let i = 0; i < maxWorkers; i++) { | ||
this.addNewWorker(); | ||
} | ||
} | ||
} | ||
captureWorkerStdioStreams(worker) { | ||
const capturedStreamEvent = new events_1.EventEmitter(); | ||
worker[kWorkerCapturedStreamEvents] = capturedStreamEvent; | ||
const stdout = worker.stdout; | ||
const stdoutInterface = (0, readline_1.createInterface)({ | ||
input: stdout, | ||
crlfDelay: Infinity, | ||
}); | ||
const stderr = worker.stderr; | ||
const stderrInterface = (0, readline_1.createInterface)({ | ||
input: stderr, | ||
crlfDelay: Infinity, | ||
}); | ||
const lineHandlerFactory = () => { | ||
let lines = []; | ||
return (line) => { | ||
if (line.includes(stdioStreamMarkers_1.START_WORKER_STREAM_MARKER)) { | ||
lines = []; | ||
} | ||
else if (line.includes(stdioStreamMarkers_1.END_WORKER_STREAM_MARKER)) { | ||
worker[kWorkerCapturedStreamEvents].emit("end", lines); | ||
} | ||
else { | ||
lines.push(line); | ||
} | ||
}; | ||
}; | ||
const stdoutLineHandler = lineHandlerFactory(); | ||
const stderrLineHandler = lineHandlerFactory(); | ||
stdoutInterface.on("line", stdoutLineHandler); | ||
stderrInterface.on("line", stderrLineHandler); | ||
} | ||
addNewWorker() { | ||
const { script, workerOptions } = this.options; | ||
const worker = new worker_threads_1.Worker(script, workerOptions); | ||
const worker = new worker_threads_1.Worker(script, Object.assign(Object.assign({}, workerOptions), { stdout: true, stderr: true })); | ||
const capturedStreamEvent = new events_1.EventEmitter(); | ||
worker[kWorkerCapturedStreamEvents] = capturedStreamEvent; | ||
worker[kWorkerCapturedStreamPromise] = Promise.resolve(); | ||
this.captureWorkerStdioStreams(worker); | ||
worker["filteredStdout"] = worker.stdout.pipe((0, createFilteredStreamTransform_1.createFilteredStreamTransform)()); | ||
worker["filteredStderr"] = worker.stderr.pipe((0, createFilteredStreamTransform_1.createFilteredStreamTransform)()); | ||
const msgHandler = (data) => { | ||
@@ -73,20 +121,24 @@ // In case of success: Call the callback that was passed to `runTask`, | ||
// again. | ||
const { err, results } = data; | ||
worker[kTaskInfo].done(err, results); | ||
worker[kTaskInfo] = null; | ||
this.freeWorkers.push(worker); | ||
this.emit(kWorkerFreedEvent); | ||
worker[kWorkerCapturedStreamPromise].then(() => { | ||
const { err, results } = data; | ||
worker[kTaskInfo].done(err, results); | ||
worker[kTaskInfo] = null; | ||
this.freeWorkers.push(worker); | ||
this.emit(kWorkerFreedEvent); | ||
}); | ||
}; | ||
worker.on("message", msgHandler); | ||
const errHandler = (err) => { | ||
// In case of an uncaught exception: Call the callback that was passed to | ||
// `runTask` with the error. | ||
if (worker[kTaskInfo]) { | ||
worker[kTaskInfo].done(err, null); | ||
} | ||
this.emit("error", err); | ||
// Remove the worker from the list and start a new Worker to replace the | ||
// current one. | ||
this.workers.splice(this.workers.indexOf(worker), 1); | ||
this.addNewWorker(); | ||
worker[kWorkerCapturedStreamPromise].then(() => { | ||
// In case of an uncaught exception: Call the callback that was passed to | ||
// `runTask` with the error. | ||
if (worker[kTaskInfo]) { | ||
worker[kTaskInfo].done(err, null); | ||
} | ||
this.emit("error", err); | ||
// Remove the worker from the list and start a new Worker to replace the | ||
// current one. | ||
this.workers.splice(this.workers.indexOf(worker), 1); | ||
this.addNewWorker(); | ||
}); | ||
}; | ||
@@ -111,2 +163,5 @@ worker.on("error", errHandler); | ||
worker[kTaskInfo] = new WorkerPoolTaskInfo({ cleanup, resolve, reject, worker, setup }); | ||
worker[kWorkerCapturedStreamPromise] = new Promise((onResolve) => { | ||
worker[kWorkerCapturedStreamEvents].once("end", onResolve); | ||
}); | ||
worker.postMessage(task); | ||
@@ -113,0 +168,0 @@ } |
{ | ||
"name": "@lage-run/worker-threads-pool", | ||
"version": "0.1.3", | ||
"version": "0.1.4", | ||
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation", | ||
@@ -5,0 +5,0 @@ "repository": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
25996
24
427