gatsby-worker
Advanced tools
Comparing version 2.3.0-next.1 to 2.4.0-next.0
"use strict"; | ||
var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault"); | ||
exports.__esModule = true; | ||
exports.isWorker = exports.getMessenger = void 0; | ||
var _signalExit = _interopRequireDefault(require("signal-exit")); | ||
var _fsExtra = _interopRequireDefault(require("fs-extra")); | ||
var _types = require("./types"); | ||
@@ -10,2 +16,4 @@ | ||
let counter = 0; | ||
/** | ||
@@ -23,7 +31,23 @@ * Used to check wether current context is executed in worker process | ||
if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { | ||
if (process.send && process.env.GATSBY_WORKER_MODULE_PATH && process.env.GATSBY_WORKER_IN_FLIGHT_DUMP_LOCATION) { | ||
const workerInFlightsDumpLocation = process.env.GATSBY_WORKER_IN_FLIGHT_DUMP_LOCATION; | ||
exports.isWorker = isWorker = true; | ||
const listeners = []; | ||
const ensuredSendToMain = process.send.bind(process); | ||
const inFlightMessages = new Set(); | ||
(0, _signalExit.default)(() => { | ||
if (inFlightMessages.size > 0) { | ||
// this need to be sync | ||
_fsExtra.default.outputJsonSync(workerInFlightsDumpLocation, Array.from(inFlightMessages)); | ||
} | ||
}); | ||
function ensuredSendToMain(msg) { | ||
inFlightMessages.add(msg); | ||
process.send(msg, undefined, undefined, error => { | ||
if (!error) { | ||
inFlightMessages.delete(msg); | ||
} | ||
}); | ||
} | ||
function onError(error) { | ||
@@ -34,3 +58,3 @@ if (error == null) { | ||
const msg = [_types.ERROR, error.constructor && error.constructor.name, error.message, error.stack, error]; | ||
const msg = [_types.ERROR, ++counter, error.constructor && error.constructor.name, error.message, error.stack, error]; | ||
ensuredSendToMain(msg); | ||
@@ -40,3 +64,3 @@ } | ||
function onResult(result) { | ||
const msg = [_types.RESULT, result]; | ||
const msg = [_types.RESULT, ++counter, result]; | ||
ensuredSendToMain(msg); | ||
@@ -54,3 +78,3 @@ } | ||
sendMessage(msg) { | ||
const poolMsg = [_types.CUSTOM_MESSAGE, msg]; | ||
const poolMsg = [_types.CUSTOM_MESSAGE, ++counter, msg]; | ||
ensuredSendToMain(poolMsg); | ||
@@ -70,3 +94,3 @@ }, | ||
try { | ||
result = child[msg[1]].call(child, ...msg[2]); | ||
result = child[msg[2]].call(child, ...msg[3]); | ||
} catch (e) { | ||
@@ -86,3 +110,3 @@ onError(e); | ||
for (const listener of listeners) { | ||
listener(msg[1]); | ||
listener(msg[2]); | ||
} | ||
@@ -93,3 +117,3 @@ } | ||
process.on(`message`, messageHandler); | ||
ensuredSendToMain([_types.WORKER_READY]); | ||
ensuredSendToMain([_types.WORKER_READY, ++counter]); | ||
} |
@@ -46,2 +46,3 @@ interface IWorkerOptions { | ||
private listeners; | ||
private counter; | ||
constructor(workerPath: string, options?: IWorkerOptions | undefined); | ||
@@ -48,0 +49,0 @@ private startAll; |
"use strict"; | ||
var _interopRequireDefault = require("@babel/runtime/helpers/interopRequireDefault"); | ||
exports.__esModule = true; | ||
@@ -11,2 +13,8 @@ var _exportNames = { | ||
var _fsExtra = _interopRequireDefault(require("fs-extra")); | ||
var _os = _interopRequireDefault(require("os")); | ||
var _path = _interopRequireDefault(require("path")); | ||
var _taskQueue = require("./task-queue"); | ||
@@ -60,2 +68,3 @@ | ||
listeners = []; | ||
counter = 0; | ||
@@ -92,2 +101,6 @@ constructor(workerPath, options) { | ||
startAll() { | ||
this.counter = 0; | ||
const tmpDir = _fsExtra.default.mkdtempSync(_path.default.join(_os.default.tmpdir(), `gatsby-worker`)); | ||
const options = this.options; | ||
@@ -98,2 +111,4 @@ | ||
const workerInFlightsDumpLocation = _path.default.join(tmpDir, `worker-${workerId}.json`); | ||
const worker = (0, _child_process.fork)(childWrapperPath, { | ||
@@ -104,3 +119,4 @@ cwd: process.cwd(), | ||
GATSBY_WORKER_ID: workerId.toString(), | ||
GATSBY_WORKER_MODULE_PATH: this.workerPath | ||
GATSBY_WORKER_MODULE_PATH: this.workerPath, | ||
GATSBY_WORKER_IN_FLIGHT_DUMP_LOCATION: workerInFlightsDumpLocation | ||
}, | ||
@@ -112,25 +128,52 @@ // Suppress --debug / --inspect flags while preserving others (like --harmony). | ||
let workerReadyResolve; | ||
let workerExitResolve; | ||
const workerInfo = { | ||
workerId, | ||
worker, | ||
send: msg => { | ||
if (!worker.connected) { | ||
return; | ||
} | ||
worker.send(msg, undefined, undefined, error => { | ||
if (error && worker.connected) { | ||
throw error; | ||
} | ||
}); | ||
}, | ||
kill: worker.kill.bind(worker), | ||
ready: new Promise(resolve => { | ||
workerReadyResolve = resolve; | ||
}), | ||
lastMessage: 0, | ||
exitedPromise: new Promise(resolve => { | ||
worker.on(`exit`, (code, signal) => { | ||
if (workerInfo.currentTask) { | ||
// worker exited without finishing a task | ||
workerInfo.currentTask.reject(new Error(`Worker exited before finishing task`)); | ||
} // remove worker from list of workers | ||
workerExitResolve = resolve; | ||
}) | ||
}; | ||
const workerProcessMessageHandler = msg => { | ||
if (!Array.isArray(msg)) { | ||
// all gatsby-worker messages should be an array | ||
// if it's not an array we skip it | ||
return; | ||
} else if (msg[1] <= workerInfo.lastMessage) { | ||
// this message was already handled, so skipping it | ||
// this is specifically for special casing worker exits | ||
// where we serialize "in-flight" IPC messages to fs | ||
// and "replay" them here to ensure no messages are lost | ||
// Trickiness is that while we write out in flight IPC messages | ||
// to fs, those messages might actually still go through as regular | ||
// ipc messages so we have to ensure we don't handle same message twice | ||
return; | ||
} else if (msg[1] !== workerInfo.lastMessage + 1) { | ||
// TODO: figure out IPC message order guarantees (or lack of them) - for now | ||
// condition above relies on IPC messages being received in same order | ||
// as they were sent via `process.send` in child process | ||
// generally we expect messages we receive to be next one (lastMessage + 1) | ||
// IF order is not guaranteed, then different strategy for de-duping messages | ||
// is needed. | ||
throw new Error(`[gatsby-worker] Out of order message. Expected ${workerInfo.lastMessage + 1}, got ${msg[1]}.\n\nFull message:\n${JSON.stringify(msg, null, 2)}.`); | ||
} | ||
this.workers.splice(this.workers.indexOf(workerInfo), 1); | ||
resolve({ | ||
code, | ||
signal | ||
}); | ||
}); | ||
}) | ||
}; | ||
worker.on(`message`, msg => { | ||
workerInfo.lastMessage = msg[1]; | ||
if (msg[0] === _types.RESULT) { | ||
@@ -144,3 +187,3 @@ if (!workerInfo.currentTask) { | ||
this.checkForWork(workerInfo); | ||
task.resolve(msg[1]); | ||
task.resolve(msg[2]); | ||
} else if (msg[0] === _types.ERROR) { | ||
@@ -151,12 +194,12 @@ if (!workerInfo.currentTask) { | ||
let error = msg[4]; | ||
let error = msg[5]; | ||
if (error !== null && typeof error === `object`) { | ||
const extra = error; | ||
const NativeCtor = global[msg[1]]; | ||
const NativeCtor = global[msg[2]]; | ||
const Ctor = typeof NativeCtor === `function` ? NativeCtor : Error; | ||
error = new Ctor(msg[2]); // @ts-ignore type doesn't exist on Error, but that's what jest-worker does for errors :shrug: | ||
error = new Ctor(msg[3]); // @ts-ignore type doesn't exist on Error, but that's what jest-worker does for errors :shrug: | ||
error.type = msg[1]; | ||
error.stack = msg[3]; | ||
error.type = msg[2]; | ||
error.stack = msg[4]; | ||
@@ -176,3 +219,3 @@ for (const key in extra) { | ||
for (const listener of this.listeners) { | ||
listener(msg[1], workerId); | ||
listener(msg[2], workerId); | ||
} | ||
@@ -182,2 +225,33 @@ } else if (msg[0] === _types.WORKER_READY) { | ||
} | ||
}; | ||
worker.on(`message`, workerProcessMessageHandler); | ||
worker.on(`exit`, async (code, signal) => { | ||
if (await _fsExtra.default.pathExists(workerInFlightsDumpLocation)) { | ||
const pendingMessages = await _fsExtra.default.readJSON(workerInFlightsDumpLocation); | ||
if (Array.isArray(pendingMessages)) { | ||
for (const msg of pendingMessages) { | ||
workerProcessMessageHandler(msg); | ||
} | ||
} | ||
try { | ||
await _fsExtra.default.remove(workerInFlightsDumpLocation); | ||
} catch {// this is just cleanup, failing to delete this file | ||
// won't cause | ||
} | ||
} | ||
if (workerInfo.currentTask) { | ||
// worker exited without finishing a task | ||
workerInfo.currentTask.reject(new Error(`Worker exited before finishing task`)); | ||
} // remove worker from list of workers | ||
this.workers.splice(this.workers.indexOf(workerInfo), 1); | ||
workerExitResolve({ | ||
code, | ||
signal | ||
}); | ||
}); | ||
@@ -197,7 +271,7 @@ this.workers.push(workerInfo); | ||
// tell worker to end gracefully | ||
const endMessage = [_types.END]; | ||
workerInfo.worker.send(endMessage); // force exit if worker doesn't exit gracefully quickly | ||
const endMessage = [_types.END, ++this.counter]; | ||
workerInfo.send(endMessage); // force exit if worker doesn't exit gracefully quickly | ||
const forceExitTimeout = setTimeout(() => { | ||
workerInfo.worker.kill(`SIGKILL`); | ||
workerInfo.kill(`SIGKILL`); | ||
}, 1000); | ||
@@ -258,4 +332,4 @@ const exitResult = await workerInfo.exitedPromise; | ||
await workerInfo.ready; | ||
const msg = [_types.EXECUTE, taskInfo.functionName, taskInfo.args]; | ||
workerInfo.worker.send(msg); | ||
const msg = [_types.EXECUTE, ++this.counter, taskInfo.functionName, taskInfo.args]; | ||
workerInfo.send(msg); | ||
} | ||
@@ -309,4 +383,4 @@ | ||
const poolMsg = [_types.CUSTOM_MESSAGE, msg]; | ||
worker.worker.send(poolMsg); | ||
const poolMsg = [_types.CUSTOM_MESSAGE, ++this.counter, msg]; | ||
worker.send(poolMsg); | ||
} | ||
@@ -313,0 +387,0 @@ |
@@ -7,8 +7,9 @@ export declare const EXECUTE = 1; | ||
export declare const WORKER_READY = 8; | ||
type CustomMessage = [typeof CUSTOM_MESSAGE, unknown]; | ||
type Counter = number; | ||
type CustomMessage = [typeof CUSTOM_MESSAGE, Counter, unknown]; | ||
type FunctionName = string | number | symbol; | ||
type FunctionArgs = Array<any>; | ||
type ExecuteMessage = [typeof EXECUTE, FunctionName, FunctionArgs]; | ||
type EndMessage = [typeof END]; | ||
type WorkerReadyMessage = [typeof WORKER_READY]; | ||
type ExecuteMessage = [typeof EXECUTE, Counter, FunctionName, FunctionArgs]; | ||
type EndMessage = [typeof END, Counter]; | ||
type WorkerReadyMessage = [typeof WORKER_READY, Counter]; | ||
export type ParentMessageUnion = ExecuteMessage | EndMessage | CustomMessage; | ||
@@ -20,2 +21,3 @@ type ErrorType = string; | ||
typeof ERROR, | ||
Counter, | ||
ErrorType, | ||
@@ -27,4 +29,4 @@ ErrorMessage, | ||
type ResultType = unknown; | ||
type TaskResult = [typeof RESULT, ResultType]; | ||
type TaskResult = [typeof RESULT, Counter, ResultType]; | ||
export type ChildMessageUnion = TaskError | TaskResult | CustomMessage | WorkerReadyMessage; | ||
export {}; |
{ | ||
"name": "gatsby-worker", | ||
"description": "Utility to create worker pools", | ||
"version": "2.3.0-next.1", | ||
"version": "2.4.0-next.0", | ||
"author": "Michal Piechowiak<misiek.piechowiak@gmail.com>", | ||
@@ -11,3 +11,5 @@ "bugs": { | ||
"@babel/core": "^7.15.5", | ||
"@babel/runtime": "^7.15.4" | ||
"@babel/runtime": "^7.15.4", | ||
"fs-extra": "^10.0.0", | ||
"signal-exit": "^3.0.5" | ||
}, | ||
@@ -17,3 +19,3 @@ "devDependencies": { | ||
"@babel/register": "^7.15.3", | ||
"babel-preset-gatsby-package": "^3.3.0-next.0", | ||
"babel-preset-gatsby-package": "^3.4.0-next.0", | ||
"cross-env": "^7.0.3", | ||
@@ -44,3 +46,3 @@ "rimraf": "^3.0.2", | ||
}, | ||
"gitHead": "03d6c591084769e0498e985c9600396b5d282053" | ||
"gitHead": "7938eb07143c06953ae0197d26533ba34eecfc29" | ||
} |
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
45705
613
4
10
+ Addedfs-extra@^10.0.0
+ Addedsignal-exit@^3.0.5
+ Addedfs-extra@10.1.0(transitive)
+ Addedgraceful-fs@4.2.11(transitive)
+ Addedjsonfile@6.1.0(transitive)
+ Addedsignal-exit@3.0.7(transitive)
+ Addeduniversalify@2.0.1(transitive)