@travetto/worker
Advanced tools
Comparing version 4.0.0-rc.0 to 4.0.0-rc.1
@@ -10,2 +10,1 @@ export * from './src/comm/channel'; | ||
export * from './src/pool'; | ||
export * from './src/util'; |
{ | ||
"name": "@travetto/worker", | ||
"version": "4.0.0-rc.0", | ||
"version": "4.0.0-rc.1", | ||
"description": "Process management utilities, with a focus on inter-process communication", | ||
@@ -28,3 +28,3 @@ "keywords": [ | ||
"dependencies": { | ||
"@travetto/base": "^4.0.0-rc.0", | ||
"@travetto/base": "^4.0.0-rc.1", | ||
"generic-pool": "^3.9.0" | ||
@@ -31,0 +31,0 @@ }, |
148
README.md
@@ -21,151 +21,3 @@ <!-- This file was generated by @travetto/doc and should not be modified directly --> | ||
Below is a pool that will convert images on demand, while queuing as needed. | ||
**Code: Image processing queue, with a fixed batch/pool size** | ||
```typescript | ||
import { ExecUtil, ExecutionState } from '@travetto/base'; | ||
import { Worker, WorkPool, WorkQueue } from '@travetto/worker'; | ||
class ImageProcessor implements Worker<string> { | ||
active = false; | ||
proc: ExecutionState; | ||
get id(): number | undefined { | ||
return this.proc.process.pid; | ||
} | ||
async destroy(): Promise<void> { | ||
this.proc.process.kill(); | ||
} | ||
async execute(path: string): Promise<void> { | ||
this.active = true; | ||
try { | ||
this.proc = ExecUtil.spawn('convert images', [path]); | ||
await this.proc; | ||
} catch { | ||
// Do nothing | ||
} | ||
this.active = false; | ||
} | ||
} | ||
export class ImageCompressor { | ||
changes: AsyncIterable<unknown>; | ||
pendingImages = new WorkQueue<string>(); | ||
begin(): void { | ||
this.changes ??= WorkPool.runStream(() => new ImageProcessor(), this.pendingImages); | ||
} | ||
convert(...images: string[]): void { | ||
this.pendingImages.addAll(images); | ||
} | ||
} | ||
``` | ||
Once a pool is constructed, it can be shutdown by calling the `.shutdown()` method, and awaiting the result. | ||
## IPC Support | ||
Within the `comm` package, there is support for two primary communication elements: [ChildCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/child.ts#L6) and [ParentCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/parent.ts#L10). Usually [ParentCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/parent.ts#L10) indicates it is the owner of the sub process. [ChildCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/child.ts#L6) indicates that it has been created/spawned/forked by the parent and will communicate back to it's parent. This generally means that a [ParentCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/parent.ts#L10) can be destroyed (i.e. killing the subprocess) where a [ChildCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/child.ts#L6) can only exit the process, but the channel cannot be destroyed. | ||
### IPC as a Worker | ||
A common pattern is to want to model a sub process as a worker, to be a valid candidate in a [WorkPool](https://github.com/travetto/travetto/tree/main/module/worker/src/pool.ts#L35). The [WorkUtil](https://github.com/travetto/travetto/tree/main/module/worker/src/util.ts#L14) class provides a utility to facilitate this desire. | ||
**Code: Spawned Worker** | ||
```typescript | ||
import { ExecutionState } from '@travetto/base'; | ||
import { ParentCommChannel } from './comm/parent'; | ||
import { Worker } from './pool'; | ||
type Simple<V> = (ch: ParentCommChannel<V>) => Promise<unknown | void>; | ||
type Param<V, X> = (ch: ParentCommChannel<V>, input: X) => Promise<unknown | void>; | ||
const empty = async (): Promise<void> => { }; | ||
/** | ||
* Spawned worker | ||
*/ | ||
export class WorkUtil { | ||
/** | ||
* Create a process channel worker from a given spawn config | ||
*/ | ||
static spawnedWorker<V, X>( | ||
worker: () => ExecutionState, | ||
init: Simple<V>, | ||
execute: Param<V, X>, | ||
destroy: Simple<V> = empty): Worker<X> { | ||
const channel = new ParentCommChannel<V>(worker()); | ||
return { | ||
get id(): number | undefined { return channel.id; }, | ||
get active(): boolean { return channel.active; }, | ||
init: () => init(channel), | ||
execute: inp => execute(channel, inp), | ||
async destroy(): Promise<void> { | ||
await destroy(channel); | ||
await channel.destroy(); | ||
}, | ||
}; | ||
} | ||
} | ||
``` | ||
When creating your work, via process spawning, you will need to provide the script (and any other features you would like in `SpawnConfig`). Additionally you must, at a minimum, provide functionality to run whenever an input element is up for grabs in the input source. This method will be provided the communication channel ([ParentCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/parent.ts#L10)) and the input value. A simple example could look like: | ||
**Code: Spawning Pool** | ||
```typescript | ||
import { ExecUtil } from '@travetto/base'; | ||
import { WorkPool, WorkUtil } from '@travetto/worker'; | ||
export async function main(): Promise<void> { | ||
await WorkPool.run( | ||
() => WorkUtil.spawnedWorker<{ data: number }, number>( | ||
() => ExecUtil.spawn('trv', ['main', '@travetto/worker/doc/spawned.ts']), | ||
ch => ch.once('ready'), // Wait for child to indicate it is ready | ||
async (channel, inp) => { | ||
const res = channel.once('response'); // Register response listener | ||
channel.send('request', { data: inp }); // Send request | ||
const { data } = await res; // Get answer | ||
console.log('Request complete', { input: inp, output: data }); | ||
if (!(inp + inp === data)) { | ||
// Ensure the answer is double the input | ||
throw new Error(`Did not get the double: inp=${inp}, data=${data}`); | ||
} | ||
} | ||
), [1, 2, 3, 4, 5]); | ||
} | ||
``` | ||
**Code: Spawned Worker** | ||
```typescript | ||
import timers from 'node:timers/promises'; | ||
import { ChildCommChannel } from '@travetto/worker'; | ||
export async function main(): Promise<void> { | ||
const exec = new ChildCommChannel<{ data: string }>(); | ||
exec.on('request', data => | ||
exec.send('response', { data: (data.data + data.data) })); // When data is received, return double | ||
exec.send('ready'); // Indicate the child is ready to receive requests | ||
for await (const _ of timers.setInterval(5000)) { | ||
// Keep-alive | ||
} | ||
} | ||
``` | ||
**Terminal: Output** | ||
```bash | ||
$ trv main doc/spawner.ts | ||
Request complete { input: 1, output: 2 } | ||
Request complete { input: 2, output: 4 } | ||
Request complete { input: 3, output: 6 } | ||
Request complete { input: 4, output: 8 } | ||
Request complete { input: 5, output: 10 } | ||
``` |
import { ChildProcess } from 'node:child_process'; | ||
import { EventEmitter } from 'node:events'; | ||
import { ExecUtil } from '@travetto/base'; | ||
/** | ||
@@ -59,4 +57,4 @@ * Channel that represents communication between parent/child | ||
throw new Error('this.proc was not defined'); | ||
} else if (this.#proc.send) { | ||
this.#proc.send({ ...(data ?? {}), type: eventType }); | ||
} else if (this.#proc.send && this.#proc.connected) { | ||
this.#proc.send({ ...(data ?? {}), type: eventType }, (err) => err && console.error(err)); | ||
} else { | ||
@@ -96,3 +94,4 @@ throw new Error('this.proc.send was not defined'); | ||
if (this.#proc !== process) { | ||
ExecUtil.kill(this.#proc); | ||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions | ||
(this.#proc as ChildProcess).kill(); | ||
} | ||
@@ -99,0 +98,0 @@ this.#proc = undefined; |
import { ChildProcess } from 'node:child_process'; | ||
import { ShutdownManager, ExecutionState } from '@travetto/base'; | ||
import { ShutdownManager } from '@travetto/base'; | ||
@@ -12,9 +12,8 @@ import { ProcessCommChannel } from './channel'; | ||
#complete: ExecutionState['result']; | ||
#complete: Promise<void>; | ||
constructor(state: ExecutionState) { | ||
super(state.process); | ||
constructor(proc: ChildProcess) { | ||
super(proc); | ||
ShutdownManager.onGracefulShutdown(() => this.destroy(), this); | ||
this.#complete = state.result | ||
.finally(() => { this.proc = undefined; }); | ||
this.#complete = new Promise<void>(r => proc.on('close', r)).finally(() => { this.proc = undefined; }); | ||
} | ||
@@ -21,0 +20,0 @@ |
@@ -37,3 +37,3 @@ import gp from 'generic-pool'; | ||
static MAX_SIZE = os.cpus().length - 1; | ||
static MAX_SIZE = os.availableParallelism(); | ||
static DEFAULT_SIZE = Math.min(WorkPool.MAX_SIZE, 4); | ||
@@ -40,0 +40,0 @@ |
@@ -71,2 +71,11 @@ import { Util } from '@travetto/base'; | ||
/** | ||
* Throw an error from the queue, rejecting and terminating immediately | ||
*/ | ||
async throw(e?: Error): Promise<IteratorResult<X>> { | ||
this.#done = true; | ||
this.#ready.reject(e); | ||
return { value: undefined, done: this.#done }; | ||
} | ||
/** | ||
* Get size, will change as items are added | ||
@@ -73,0 +82,0 @@ */ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
19552
13
490
23
Updated@travetto/base@^4.0.0-rc.1