synckit
Advanced tools
Comparing version 0.1.6 to 0.2.0
@@ -5,4 +5,21 @@ import { execSync } from 'child_process'; | ||
import fs from 'fs'; | ||
import { MessageChannel, Worker, receiveMessageOnPort, workerData, parentPort } from 'worker_threads'; | ||
import { v4 } from 'uuid'; | ||
var __defProp = Object.defineProperty; | ||
var __getOwnPropSymbols = Object.getOwnPropertySymbols; | ||
var __hasOwnProp = Object.prototype.hasOwnProperty; | ||
var __propIsEnum = Object.prototype.propertyIsEnumerable; | ||
var __defNormalProp = (obj, key, value) => key in obj ? __defProp(obj, key, { enumerable: true, configurable: true, writable: true, value }) : obj[key] = value; | ||
var __spreadValues = (a, b) => { | ||
for (var prop in b || (b = {})) | ||
if (__hasOwnProp.call(b, prop)) | ||
__defNormalProp(a, prop, b[prop]); | ||
if (__getOwnPropSymbols) | ||
for (var prop of __getOwnPropSymbols(b)) { | ||
if (__propIsEnum.call(b, prop)) | ||
__defNormalProp(a, prop, b[prop]); | ||
} | ||
return a; | ||
}; | ||
var __async = (__this, __arguments, generator) => { | ||
@@ -29,17 +46,5 @@ return new Promise((resolve, reject) => { | ||
const tmpdir = fs.realpathSync(tmpdir$1()); | ||
let tsconfigPathsAvailable; | ||
const TSCONFIG_PATH = process.env.TSCONFIG_PATH || "tsconfig.json"; | ||
const isTsconfigPathsAvailable = () => { | ||
if (typeof tsconfigPathsAvailable === "boolean") { | ||
return tsconfigPathsAvailable; | ||
} | ||
try { | ||
tsconfigPathsAvailable = !!require.resolve("tsconfig-paths"); | ||
} catch (e) { | ||
tsconfigPathsAvailable = false; | ||
} | ||
return tsconfigPathsAvailable; | ||
}; | ||
const useWorkerThreads = !["0", "false"].includes(process.env.SYNCKIT_WORKER_THREADS); | ||
const syncFnCache = new Map(); | ||
function createSyncFn(workerPath) { | ||
function createSyncFn(workerPath, bufferSize) { | ||
if (!path.isAbsolute(workerPath)) { | ||
@@ -56,7 +61,12 @@ throw new Error("`workerPath` must be absolute"); | ||
} | ||
const executor = resolvedWorkerPath.endsWith(".ts") ? "ts-node -P " + TSCONFIG_PATH + (isTsconfigPathsAvailable() ? " -r tsconfig-paths/register" : "") : "node"; | ||
const syncFn = (...args) => { | ||
const syncFn = (useWorkerThreads ? startWorkerThread : startChildProcess)(resolvedWorkerPath, bufferSize); | ||
syncFnCache.set(workerPath, syncFn); | ||
return syncFn; | ||
} | ||
function startChildProcess(workerPath) { | ||
const executor = workerPath.endsWith(".ts") ? "ts-node" : "node"; | ||
return (...args) => { | ||
const filename = path.resolve(tmpdir, `synckit-${v4()}.json`); | ||
fs.writeFileSync(filename, JSON.stringify(args)); | ||
const command = `${executor} ${resolvedWorkerPath} ${filename}`; | ||
const command = `${executor} ${workerPath} ${filename}`; | ||
try { | ||
@@ -66,4 +76,7 @@ execSync(command, { | ||
}); | ||
const result = fs.readFileSync(filename, "utf8"); | ||
return JSON.parse(result); | ||
const { result, error } = JSON.parse(fs.readFileSync(filename, "utf8")); | ||
if (error) { | ||
throw typeof error === "object" && error && "message" in error ? Object.assign(new Error(), error) : error; | ||
} | ||
return result; | ||
} finally { | ||
@@ -73,12 +86,74 @@ fs.unlinkSync(filename); | ||
}; | ||
syncFnCache.set(workerPath, syncFn); | ||
} | ||
function startWorkerThread(workerPath, bufferSize = 1024) { | ||
const { port1: mainPort, port2: workerPort } = new MessageChannel(); | ||
const isTs = workerPath.endsWith(".ts"); | ||
const worker = new Worker(isTs ? `require('ts-node/register');require(require('worker_threads').workerData.workerPath)` : workerPath, { | ||
eval: isTs, | ||
workerData: { workerPath, workerPort }, | ||
transferList: [workerPort], | ||
execArgv: [] | ||
}); | ||
let nextID = 0; | ||
const syncFn = (...args) => { | ||
const id = nextID++; | ||
const sharedBuffer = new SharedArrayBuffer(bufferSize); | ||
const sharedBufferView = new Int32Array(sharedBuffer); | ||
const msg = { sharedBuffer, id, args }; | ||
worker.postMessage(msg); | ||
const status = Atomics.wait(sharedBufferView, 0, 0); | ||
if (status !== "ok" && status !== "not-equal") { | ||
throw new Error("Internal error: Atomics.wait() failed: " + status); | ||
} | ||
const { | ||
id: id2, | ||
result, | ||
error, | ||
properties | ||
} = receiveMessageOnPort(mainPort).message; | ||
if (id !== id2) { | ||
throw new Error(`Internal error: Expected id ${id} but got id ${id2}`); | ||
} | ||
if (error) { | ||
throw Object.assign(error, properties); | ||
} | ||
return result; | ||
}; | ||
worker.unref(); | ||
return syncFn; | ||
} | ||
const runAsWorker = (fn) => __async(undefined, null, function* () { | ||
const filename = process.argv[2]; | ||
const content = fs.readFileSync(filename, "utf-8"); | ||
const options = JSON.parse(content); | ||
fs.writeFileSync(filename, JSON.stringify(yield fn(...options))); | ||
if (!workerData) { | ||
const filename = process.argv[2]; | ||
const content = fs.readFileSync(filename, "utf8"); | ||
const args = JSON.parse(content); | ||
let msg; | ||
try { | ||
msg = { result: yield fn(...args) }; | ||
} catch (err) { | ||
msg = { | ||
error: err instanceof Error ? { name: err.name, message: err.message, stack: err.stack } : err | ||
}; | ||
} | ||
fs.writeFileSync(filename, JSON.stringify(msg)); | ||
return; | ||
} | ||
const { workerPort } = workerData; | ||
parentPort.on("message", ({ sharedBuffer, id, args }) => { | ||
(() => __async(this, null, function* () { | ||
const sharedBufferView = new Int32Array(sharedBuffer); | ||
let msg; | ||
try { | ||
msg = { id, result: yield fn(...args) }; | ||
} catch (err) { | ||
const error = err; | ||
msg = { id, error, properties: __spreadValues({}, error) }; | ||
} | ||
workerPort.postMessage(msg); | ||
Atomics.add(sharedBufferView, 0, 1); | ||
Atomics.notify(sharedBufferView, 0, Number.POSITIVE_INFINITY); | ||
}))(); | ||
}); | ||
}); | ||
export { createSyncFn, runAsWorker, tmpdir }; | ||
export { createSyncFn, runAsWorker, tmpdir, useWorkerThreads }; |
@@ -7,3 +7,4 @@ import { AnyAsyncFn, Syncify } from './types'; | ||
export declare const tmpdir: string; | ||
export declare function createSyncFn<T extends AnyAsyncFn>(workerPath: string): Syncify<T>; | ||
export declare const useWorkerThreads: boolean; | ||
export declare function createSyncFn<T extends AnyAsyncFn>(workerPath: string, bufferSize?: number): Syncify<T>; | ||
export declare const runAsWorker: <T extends AnyAsyncFn>(fn: T) => Promise<void>; |
136
lib/index.js
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.runAsWorker = exports.createSyncFn = exports.tmpdir = void 0; | ||
exports.runAsWorker = exports.createSyncFn = exports.useWorkerThreads = exports.tmpdir = void 0; | ||
const tslib_1 = require("tslib"); | ||
@@ -9,2 +9,3 @@ const child_process_1 = require("child_process"); | ||
const fs_1 = tslib_1.__importDefault(require("fs")); | ||
const worker_threads_1 = require("worker_threads"); | ||
const uuid_1 = require("uuid"); | ||
@@ -16,23 +17,5 @@ tslib_1.__exportStar(require("./types"), exports); | ||
exports.tmpdir = fs_1.default.realpathSync(os_1.tmpdir()); | ||
let tsconfigPathsAvailable; | ||
const TSCONFIG_PATH = process.env.TSCONFIG_PATH || 'tsconfig.json'; | ||
const isTsconfigPathsAvailable = () => { | ||
if (typeof tsconfigPathsAvailable === 'boolean') { | ||
return tsconfigPathsAvailable; | ||
} | ||
try { | ||
tsconfigPathsAvailable = !!require.resolve('tsconfig-paths'); | ||
} | ||
catch (_a) { | ||
/** | ||
* `require.resolve` can not be mocked to fail | ||
* @link https://github.com/facebook/jest/issues/9543 | ||
*/ | ||
/* istanbul ignore next */ | ||
tsconfigPathsAvailable = false; | ||
} | ||
return tsconfigPathsAvailable; | ||
}; | ||
exports.useWorkerThreads = !['0', 'false'].includes(process.env.SYNCKIT_WORKER_THREADS); | ||
const syncFnCache = new Map(); | ||
function createSyncFn(workerPath) { | ||
function createSyncFn(workerPath, bufferSize) { | ||
if (!path_1.default.isAbsolute(workerPath)) { | ||
@@ -49,13 +32,13 @@ throw new Error('`workerPath` must be absolute'); | ||
} | ||
const executor = resolvedWorkerPath.endsWith('.ts') | ||
? 'ts-node -P ' + | ||
TSCONFIG_PATH + | ||
(isTsconfigPathsAvailable() | ||
? ' -r tsconfig-paths/register' | ||
: /* istanbul ignore next */ '') | ||
: 'node'; | ||
const syncFn = (...args) => { | ||
const syncFn = (exports.useWorkerThreads ? startWorkerThread : startChildProcess)(resolvedWorkerPath, bufferSize); | ||
syncFnCache.set(workerPath, syncFn); | ||
return syncFn; | ||
} | ||
exports.createSyncFn = createSyncFn; | ||
function startChildProcess(workerPath) { | ||
const executor = workerPath.endsWith('.ts') ? 'ts-node' : 'node'; | ||
return (...args) => { | ||
const filename = path_1.default.resolve(exports.tmpdir, `synckit-${uuid_1.v4()}.json`); | ||
fs_1.default.writeFileSync(filename, JSON.stringify(args)); | ||
const command = `${executor} ${resolvedWorkerPath} ${filename}`; | ||
const command = `${executor} ${workerPath} ${filename}`; | ||
try { | ||
@@ -65,4 +48,10 @@ child_process_1.execSync(command, { | ||
}); | ||
const result = fs_1.default.readFileSync(filename, 'utf8'); | ||
return JSON.parse(result); | ||
const { result, error } = JSON.parse(fs_1.default.readFileSync(filename, 'utf8')); | ||
if (error) { | ||
throw typeof error === 'object' && error && 'message' in error | ||
? // eslint-disable-next-line unicorn/error-message | ||
Object.assign(new Error(), error) | ||
: error; | ||
} | ||
return result; | ||
} | ||
@@ -73,13 +62,84 @@ finally { | ||
}; | ||
syncFnCache.set(workerPath, syncFn); | ||
} | ||
function startWorkerThread(workerPath, bufferSize = 1024) { | ||
const { port1: mainPort, port2: workerPort } = new worker_threads_1.MessageChannel(); | ||
const isTs = workerPath.endsWith('.ts'); | ||
const worker = new worker_threads_1.Worker(isTs | ||
? `require('ts-node/register');require(require('worker_threads').workerData.workerPath)` | ||
: workerPath, { | ||
eval: isTs, | ||
workerData: { workerPath, workerPort }, | ||
transferList: [workerPort], | ||
execArgv: [], | ||
}); | ||
let nextID = 0; | ||
const syncFn = (...args) => { | ||
const id = nextID++; | ||
const sharedBuffer = new SharedArrayBuffer(bufferSize); | ||
const sharedBufferView = new Int32Array(sharedBuffer); | ||
const msg = { sharedBuffer, id, args }; | ||
worker.postMessage(msg); | ||
const status = Atomics.wait(sharedBufferView, 0, 0); | ||
/* istanbul ignore if */ | ||
if (status !== 'ok' && status !== 'not-equal') { | ||
throw new Error('Internal error: Atomics.wait() failed: ' + status); | ||
} | ||
const { id: id2, result, error, properties, } = worker_threads_1.receiveMessageOnPort(mainPort).message; | ||
/* istanbul ignore if */ | ||
if (id !== id2) { | ||
throw new Error(`Internal error: Expected id ${id} but got id ${id2}`); | ||
} | ||
if (error) { | ||
// MessagePort doesn't copy the properties of Error objects. We still want | ||
// error objects to have extra properties such as "warnings" so implement the | ||
// property copying manually. | ||
throw Object.assign(error, properties); | ||
} | ||
return result; | ||
}; | ||
worker.unref(); | ||
return syncFn; | ||
} | ||
exports.createSyncFn = createSyncFn; | ||
const runAsWorker = (fn) => tslib_1.__awaiter(void 0, void 0, void 0, function* () { | ||
const filename = process.argv[2]; | ||
const content = fs_1.default.readFileSync(filename, 'utf-8'); | ||
const options = JSON.parse(content); | ||
fs_1.default.writeFileSync(filename, JSON.stringify(yield fn(...options))); | ||
if (!worker_threads_1.workerData) { | ||
const filename = process.argv[2]; | ||
const content = fs_1.default.readFileSync(filename, 'utf8'); | ||
const args = JSON.parse(content); | ||
let msg; | ||
try { | ||
msg = { result: (yield fn(...args)) }; | ||
} | ||
catch (err) { | ||
msg = { | ||
error: err instanceof Error | ||
? { name: err.name, message: err.message, stack: err.stack } | ||
: err, | ||
}; | ||
} | ||
fs_1.default.writeFileSync(filename, JSON.stringify(msg)); | ||
return; | ||
} | ||
/* istanbul ignore next */ | ||
const { workerPort } = worker_threads_1.workerData; | ||
/* istanbul ignore next */ | ||
worker_threads_1.parentPort.on('message', ({ sharedBuffer, id, args }) => { | ||
// eslint-disable-next-line @typescript-eslint/no-floating-promises | ||
; | ||
(() => tslib_1.__awaiter(void 0, void 0, void 0, function* () { | ||
const sharedBufferView = new Int32Array(sharedBuffer); | ||
let msg; | ||
try { | ||
msg = { id, result: (yield fn(...args)) }; | ||
} | ||
catch (err) { | ||
const error = err; | ||
msg = { id, error, properties: Object.assign({}, error) }; | ||
} | ||
workerPort.postMessage(msg); | ||
Atomics.add(sharedBufferView, 0, 1); | ||
Atomics.notify(sharedBufferView, 0, Number.POSITIVE_INFINITY); | ||
}))(); | ||
}); | ||
}); | ||
exports.runAsWorker = runAsWorker; | ||
//# sourceMappingURL=index.js.map |
@@ -0,1 +1,3 @@ | ||
/// <reference types="node" /> | ||
import { MessagePort } from 'worker_threads'; | ||
export declare type AnyFn<T = any, R extends any[] = any[]> = (...args: R) => T; | ||
@@ -6,1 +8,17 @@ export declare type AnyPromise = Promise<any>; | ||
export declare type PromiseType<T extends AnyPromise> = T extends Promise<infer R> ? R : never; | ||
export interface MainToWorkerMessage { | ||
sharedBuffer: SharedArrayBuffer; | ||
id: number; | ||
args: unknown[]; | ||
} | ||
export interface WorkerData { | ||
workerPort: MessagePort; | ||
} | ||
export interface DataMessage<T = unknown> { | ||
result?: T; | ||
error?: unknown; | ||
} | ||
export interface WorkerToMainMessage<T = unknown> extends DataMessage<T> { | ||
id: number; | ||
properties?: object; | ||
} |
{ | ||
"name": "synckit", | ||
"version": "0.1.6", | ||
"version": "0.2.0", | ||
"description": "Perform async work synchronously in Node.js using a separate process with first-class TypeScript support", | ||
@@ -9,3 +9,3 @@ "repository": "git+https://github.com/rx-ts/synckit.git", | ||
"engines": { | ||
"node": ">=4.0" | ||
"node": ">=8.10" | ||
}, | ||
@@ -12,0 +12,0 @@ "main": "lib", |
@@ -21,4 +21,15 @@ # synckit | ||
## TOC <!-- omit in toc --> | ||
- [Usage](#usage) | ||
- [Install](#install) | ||
- [API](#api) | ||
- [TypeScript](#typescript) | ||
- [Changelog](#changelog) | ||
- [License](#license) | ||
## Usage | ||
### Install | ||
```sh | ||
@@ -34,2 +45,4 @@ # yarn | ||
`worker_threads` is used by default for performance, if you have any problem with it, you can set env `SYNCKIT_WORKER_THREADS=0` to disable it and fallback to previously `child_process` solution, and please raise an issue here so that we can improve it. | ||
```js | ||
@@ -52,3 +65,2 @@ // runner.js | ||
// do expensive work | ||
// but you must make sure the `result` is serializable by `JSON.stringify` | ||
return result | ||
@@ -58,2 +70,15 @@ }) | ||
You must make sure: | ||
1. if `worker_threads` is enabled (by default), the `result` is serialized by [`Structured Clone Algorithm`](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm) | ||
2. if `child_process` is used, the `result` is serialized by `JSON.stringify` | ||
### TypeScript | ||
If you want to use `ts-node` for worker file (a `.ts` file), it is supported out of box! | ||
If you want to use a custom tsconfig as project instead of default `tsconfig.json`, use `TS_NODE_PROJECT` env. Please view [ts-node](https://github.com/TypeStrong/ts-node#tsconfig) for more details. | ||
If you want to integrate with [tsconfig-paths](https://www.npmjs.com/package/tsconfig-paths), please view [ts-node](https://github.com/TypeStrong/ts-node#paths-and-baseurl) for more details. | ||
## Changelog | ||
@@ -60,0 +85,0 @@ |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Uses eval
Supply chain riskPackage uses eval() which is a dangerous function. This prevents the code from running in certain environments and increases the risk that the code may contain exploits or malicious behavior.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
22634
325
91
4