Socket
Socket
Sign inDemoInstall

@lage-run/worker-threads-pool

Package Overview
Dependencies
0
Maintainers
1
Versions
22
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.1.6 to 0.1.7

17

CHANGELOG.json

@@ -5,3 +5,18 @@ {

{
"date": "Tue, 04 Oct 2022 03:38:40 GMT",
"date": "Thu, 06 Oct 2022 04:37:06 GMT",
"tag": "@lage-run/worker-threads-pool_v0.1.7",
"version": "0.1.7",
"comments": {
"patch": [
{
"author": "kchau@microsoft.com",
"package": "@lage-run/worker-threads-pool",
"commit": "7356755d5f829c3f3eb0cb9242753d30639a9a3a",
"comment": "fixes stdio streams capturing so the logs belong to the right worker and task"
}
]
}
},
{
"date": "Tue, 04 Oct 2022 03:38:54 GMT",
"tag": "@lage-run/worker-threads-pool_v0.1.6",

@@ -8,0 +23,0 @@ "version": "0.1.6",

# Change Log - @lage-run/worker-threads-pool
This log was last generated on Tue, 04 Oct 2022 03:38:40 GMT and should not be manually modified.
This log was last generated on Thu, 06 Oct 2022 04:37:06 GMT and should not be manually modified.
<!-- Start content -->
## 0.1.7
Thu, 06 Oct 2022 04:37:06 GMT
### Patches
- fixes stdio streams capturing so the logs belong to the right worker and task (kchau@microsoft.com)
## 0.1.6
Tue, 04 Oct 2022 03:38:40 GMT
Tue, 04 Oct 2022 03:38:54 GMT

@@ -11,0 +19,0 @@ ### Patches

8

lib/createFilteredStreamTransform.js

@@ -10,7 +10,7 @@ "use strict";

let str = chunk.toString();
if (str.includes(stdioStreamMarkers_1.START_WORKER_STREAM_MARKER)) {
str = str.replace(stdioStreamMarkers_1.START_WORKER_STREAM_MARKER + "\n", "");
if (str.includes(stdioStreamMarkers_1.START_MARKER_PREFIX)) {
str = str.replace(new RegExp(stdioStreamMarkers_1.START_MARKER_PREFIX + "[0-9a-z]{64}\n"), "");
}
if (str.includes(stdioStreamMarkers_1.END_WORKER_STREAM_MARKER)) {
str = str.replace(stdioStreamMarkers_1.END_WORKER_STREAM_MARKER + "\n", "");
if (str.includes(stdioStreamMarkers_1.END_MARKER_PREFIX)) {
str = str.replace(new RegExp(stdioStreamMarkers_1.END_MARKER_PREFIX + "[0-9a-z]{64}\n"), "");
}

@@ -17,0 +17,0 @@ callback(null, str);

@@ -13,3 +13,3 @@ "use strict";

abortController = new abort_controller_1.AbortController();
return message.task && (await start(message.task, abortController.signal));
return message.task && (await start(message.id, message.task, abortController.signal));
case "abort":

@@ -19,6 +19,6 @@ return abortController === null || abortController === void 0 ? void 0 : abortController.abort();

});
async function start(task, abortSignal) {
async function start(workerTaskId, task, abortSignal) {
try {
process.stdout.write(`${stdioStreamMarkers_1.START_WORKER_STREAM_MARKER}\n`);
process.stderr.write(`${stdioStreamMarkers_1.START_WORKER_STREAM_MARKER}\n`);
process.stdout.write(`${(0, stdioStreamMarkers_1.startMarker)(workerTaskId)}\n`);
process.stderr.write(`${(0, stdioStreamMarkers_1.startMarker)(workerTaskId)}\n`);
const results = await fn(task, abortSignal);

@@ -31,4 +31,4 @@ worker_threads_1.parentPort === null || worker_threads_1.parentPort === void 0 ? void 0 : worker_threads_1.parentPort.postMessage({ err: undefined, results });

finally {
process.stdout.write(`${stdioStreamMarkers_1.END_WORKER_STREAM_MARKER}\n`);
process.stderr.write(`${stdioStreamMarkers_1.END_WORKER_STREAM_MARKER}\n`);
process.stdout.write(`${(0, stdioStreamMarkers_1.endMarker)(workerTaskId)}\n`);
process.stderr.write(`${(0, stdioStreamMarkers_1.endMarker)(workerTaskId)}\n`);
}

@@ -35,0 +35,0 @@ }

@@ -1,2 +0,4 @@

export declare const START_WORKER_STREAM_MARKER = "## WORKER:START:";
export declare const END_WORKER_STREAM_MARKER = "## WORKER:END:";
export declare const START_MARKER_PREFIX = "## WORKER:START:";
export declare const END_MARKER_PREFIX = "## WORKER:END:";
export declare function startMarker(id: string): string;
export declare function endMarker(id: string): string;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.END_WORKER_STREAM_MARKER = exports.START_WORKER_STREAM_MARKER = void 0;
exports.START_WORKER_STREAM_MARKER = "## WORKER:START:";
exports.END_WORKER_STREAM_MARKER = "## WORKER:END:";
exports.endMarker = exports.startMarker = exports.END_MARKER_PREFIX = exports.START_MARKER_PREFIX = void 0;
exports.START_MARKER_PREFIX = "## WORKER:START:";
exports.END_MARKER_PREFIX = "## WORKER:END:";
function startMarker(id) {
return `${exports.START_MARKER_PREFIX}${id}`;
}
exports.startMarker = startMarker;
function endMarker(id) {
return `${exports.END_MARKER_PREFIX}${id}`;
}
exports.endMarker = endMarker;
//# sourceMappingURL=stdioStreamMarkers.js.map

@@ -17,7 +17,10 @@ "use strict";

const worker_threads_1 = require("worker_threads");
const crypto_1 = __importDefault(require("crypto"));
const os_1 = __importDefault(require("os"));
const kTaskInfo = Symbol("kTaskInfo");
const kWorkerFreedEvent = Symbol("kWorkerFreedEvent");
const kWorkerCapturedStreamEvents = Symbol("kWorkerCapturedStreamEvents");
const kWorkerCapturedStreamPromise = Symbol("kWorkerCapturedStreamPromise");
const kWorkerCapturedStdoutResolve = Symbol("kWorkerCapturedStdoutResolve");
const kWorkerCapturedStderrResolve = Symbol("kWorkerCapturedStderrResolve");
const kWorkerCapturedStdoutPromise = Symbol("kWorkerCapturedStdoutPromise");
const kWorkerCapturedStderrPromise = Symbol("kWorkerCapturedStderrPromise");
class WorkerPoolTaskInfo extends async_hooks_1.AsyncResource {

@@ -31,2 +34,5 @@ constructor(options) {

}
get id() {
return this.options.id;
}
done(err, results) {

@@ -74,4 +80,2 @@ const { cleanup, worker, resolve, reject } = this.options;

captureWorkerStdioStreams(worker) {
const capturedStreamEvent = new events_1.EventEmitter();
worker[kWorkerCapturedStreamEvents] = capturedStreamEvent;
const stdout = worker.stdout;

@@ -87,10 +91,17 @@ const stdoutInterface = (0, readline_1.createInterface)({

});
const lineHandlerFactory = () => {
const lineHandlerFactory = (outputType) => {
let lines = [];
let resolve;
return (line) => {
if (line.includes(stdioStreamMarkers_1.START_WORKER_STREAM_MARKER)) {
if (line.includes((0, stdioStreamMarkers_1.startMarker)(worker[kTaskInfo].id))) {
lines = [];
if (outputType === "stdout") {
resolve = worker[kWorkerCapturedStdoutResolve];
}
else {
resolve = worker[kWorkerCapturedStderrResolve];
}
}
else if (line.includes(stdioStreamMarkers_1.END_WORKER_STREAM_MARKER)) {
worker[kWorkerCapturedStreamEvents].emit("end", lines);
else if (line.includes((0, stdioStreamMarkers_1.endMarker)(worker[kTaskInfo].id))) {
resolve();
}

@@ -102,4 +113,4 @@ else {

};
const stdoutLineHandler = lineHandlerFactory();
const stderrLineHandler = lineHandlerFactory();
const stdoutLineHandler = lineHandlerFactory("stdout");
const stderrLineHandler = lineHandlerFactory("stderr");
stdoutInterface.on("line", stdoutLineHandler);

@@ -111,5 +122,4 @@ stderrInterface.on("line", stderrLineHandler);

const worker = new worker_threads_1.Worker(script, Object.assign(Object.assign({}, workerOptions), { stdout: true, stderr: true }));
const capturedStreamEvent = new events_1.EventEmitter();
worker[kWorkerCapturedStreamEvents] = capturedStreamEvent;
worker[kWorkerCapturedStreamPromise] = Promise.resolve();
worker[kWorkerCapturedStderrPromise] = Promise.resolve();
worker[kWorkerCapturedStdoutPromise] = Promise.resolve();
this.captureWorkerStdioStreams(worker);

@@ -122,3 +132,3 @@ worker["filteredStdout"] = worker.stdout.pipe((0, createFilteredStreamTransform_1.createFilteredStreamTransform)());

// again.
worker[kWorkerCapturedStreamPromise].then(() => {
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => {
const { err, results } = data;

@@ -133,3 +143,3 @@ worker[kTaskInfo].done(err, results);

const errHandler = (err) => {
worker[kWorkerCapturedStreamPromise].then(() => {
Promise.all([worker[kWorkerCapturedStdoutPromise], worker[kWorkerCapturedStderrPromise]]).then(() => {
// In case of an uncaught exception: Call the callback that was passed to

@@ -167,7 +177,13 @@ // `runTask` with the error.

});
worker[kTaskInfo] = new WorkerPoolTaskInfo({ cleanup, resolve, reject, worker, setup });
worker[kWorkerCapturedStreamPromise] = new Promise((onResolve) => {
worker[kWorkerCapturedStreamEvents].once("end", onResolve);
const id = crypto_1.default.randomBytes(32).toString("hex");
worker[kTaskInfo] = new WorkerPoolTaskInfo({ id, cleanup, resolve, reject, worker, setup });
// Create a pair of promises that are only resolved when a specific task end marker is detected
// in the worker's stdout/stderr streams.
worker[kWorkerCapturedStdoutPromise] = new Promise((onResolve) => {
worker[kWorkerCapturedStdoutResolve] = onResolve;
});
worker.postMessage({ type: "start", task });
worker[kWorkerCapturedStderrPromise] = new Promise((onResolve) => {
worker[kWorkerCapturedStderrResolve] = onResolve;
});
worker.postMessage({ type: "start", task, id });
}

@@ -174,0 +190,0 @@ }

{
"name": "@lage-run/worker-threads-pool",
"version": "0.1.6",
"version": "0.1.7",
"description": "A worker_threads pool implementation based on the official Node.js async_hooks documentation",

@@ -5,0 +5,0 @@ "repository": {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc