Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@travetto/worker

Package Overview
Dependencies
Maintainers
1
Versions
186
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@travetto/worker - npm Package Compare versions

Comparing version 4.0.0-rc.0 to 4.0.0-rc.1

1

__index__.ts

@@ -10,2 +10,1 @@ export * from './src/comm/channel';

export * from './src/pool';
export * from './src/util';

4

package.json
{
"name": "@travetto/worker",
"version": "4.0.0-rc.0",
"version": "4.0.0-rc.1",
"description": "Process management utilities, with a focus on inter-process communication",

@@ -28,3 +28,3 @@ "keywords": [

"dependencies": {
"@travetto/base": "^4.0.0-rc.0",
"@travetto/base": "^4.0.0-rc.1",
"generic-pool": "^3.9.0"

@@ -31,0 +31,0 @@ },

@@ -21,151 +21,3 @@ <!-- This file was generated by @travetto/doc and should not be modified directly -->

Below is a pool that will convert images on demand, while queuing as needed.
**Code: Image processing queue, with a fixed batch/pool size**
```typescript
import { ExecUtil, ExecutionState } from '@travetto/base';
import { Worker, WorkPool, WorkQueue } from '@travetto/worker';
class ImageProcessor implements Worker<string> {
active = false;
proc: ExecutionState;
get id(): number | undefined {
return this.proc.process.pid;
}
async destroy(): Promise<void> {
this.proc.process.kill();
}
async execute(path: string): Promise<void> {
this.active = true;
try {
this.proc = ExecUtil.spawn('convert images', [path]);
await this.proc;
} catch {
// Do nothing
}
this.active = false;
}
}
export class ImageCompressor {
changes: AsyncIterable<unknown>;
pendingImages = new WorkQueue<string>();
begin(): void {
this.changes ??= WorkPool.runStream(() => new ImageProcessor(), this.pendingImages);
}
convert(...images: string[]): void {
this.pendingImages.addAll(images);
}
}
```
Once a pool is constructed, it can be shutdown by calling the `.shutdown()` method, and awaiting the result.
## IPC Support
Within the `comm` package, there is support for two primary communication elements: [ChildCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/child.ts#L6) and [ParentCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/parent.ts#L10). Usually [ParentCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/parent.ts#L10) indicates it is the owner of the sub process. [ChildCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/child.ts#L6) indicates that it has been created/spawned/forked by the parent and will communicate back to it's parent. This generally means that a [ParentCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/parent.ts#L10) can be destroyed (i.e. killing the subprocess) where a [ChildCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/child.ts#L6) can only exit the process, but the channel cannot be destroyed.
### IPC as a Worker
A common pattern is to want to model a sub process as a worker, to be a valid candidate in a [WorkPool](https://github.com/travetto/travetto/tree/main/module/worker/src/pool.ts#L35). The [WorkUtil](https://github.com/travetto/travetto/tree/main/module/worker/src/util.ts#L14) class provides a utility to facilitate this desire.
**Code: Spawned Worker**
```typescript
import { ExecutionState } from '@travetto/base';
import { ParentCommChannel } from './comm/parent';
import { Worker } from './pool';
type Simple<V> = (ch: ParentCommChannel<V>) => Promise<unknown | void>;
type Param<V, X> = (ch: ParentCommChannel<V>, input: X) => Promise<unknown | void>;
const empty = async (): Promise<void> => { };
/**
* Spawned worker
*/
export class WorkUtil {
/**
* Create a process channel worker from a given spawn config
*/
static spawnedWorker<V, X>(
worker: () => ExecutionState,
init: Simple<V>,
execute: Param<V, X>,
destroy: Simple<V> = empty): Worker<X> {
const channel = new ParentCommChannel<V>(worker());
return {
get id(): number | undefined { return channel.id; },
get active(): boolean { return channel.active; },
init: () => init(channel),
execute: inp => execute(channel, inp),
async destroy(): Promise<void> {
await destroy(channel);
await channel.destroy();
},
};
}
}
```
When creating your work, via process spawning, you will need to provide the script (and any other features you would like in `SpawnConfig`). Additionally you must, at a minimum, provide functionality to run whenever an input element is up for grabs in the input source. This method will be provided the communication channel ([ParentCommChannel](https://github.com/travetto/travetto/tree/main/module/worker/src/comm/parent.ts#L10)) and the input value. A simple example could look like:
**Code: Spawning Pool**
```typescript
import { ExecUtil } from '@travetto/base';
import { WorkPool, WorkUtil } from '@travetto/worker';
export async function main(): Promise<void> {
await WorkPool.run(
() => WorkUtil.spawnedWorker<{ data: number }, number>(
() => ExecUtil.spawn('trv', ['main', '@travetto/worker/doc/spawned.ts']),
ch => ch.once('ready'), // Wait for child to indicate it is ready
async (channel, inp) => {
const res = channel.once('response'); // Register response listener
channel.send('request', { data: inp }); // Send request
const { data } = await res; // Get answer
console.log('Request complete', { input: inp, output: data });
if (!(inp + inp === data)) {
// Ensure the answer is double the input
throw new Error(`Did not get the double: inp=${inp}, data=${data}`);
}
}
), [1, 2, 3, 4, 5]);
}
```
**Code: Spawned Worker**
```typescript
import timers from 'node:timers/promises';
import { ChildCommChannel } from '@travetto/worker';
export async function main(): Promise<void> {
const exec = new ChildCommChannel<{ data: string }>();
exec.on('request', data =>
exec.send('response', { data: (data.data + data.data) })); // When data is received, return double
exec.send('ready'); // Indicate the child is ready to receive requests
for await (const _ of timers.setInterval(5000)) {
// Keep-alive
}
}
```
**Terminal: Output**
```bash
$ trv main doc/spawner.ts
Request complete { input: 1, output: 2 }
Request complete { input: 2, output: 4 }
Request complete { input: 3, output: 6 }
Request complete { input: 4, output: 8 }
Request complete { input: 5, output: 10 }
```
import { ChildProcess } from 'node:child_process';
import { EventEmitter } from 'node:events';
import { ExecUtil } from '@travetto/base';
/**

@@ -59,4 +57,4 @@ * Channel that represents communication between parent/child

throw new Error('this.proc was not defined');
} else if (this.#proc.send) {
this.#proc.send({ ...(data ?? {}), type: eventType });
} else if (this.#proc.send && this.#proc.connected) {
this.#proc.send({ ...(data ?? {}), type: eventType }, (err) => err && console.error(err));
} else {

@@ -96,3 +94,4 @@ throw new Error('this.proc.send was not defined');

if (this.#proc !== process) {
ExecUtil.kill(this.#proc);
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
(this.#proc as ChildProcess).kill();
}

@@ -99,0 +98,0 @@ this.#proc = undefined;

import { ChildProcess } from 'node:child_process';
import { ShutdownManager, ExecutionState } from '@travetto/base';
import { ShutdownManager } from '@travetto/base';

@@ -12,9 +12,8 @@ import { ProcessCommChannel } from './channel';

#complete: ExecutionState['result'];
#complete: Promise<void>;
constructor(state: ExecutionState) {
super(state.process);
constructor(proc: ChildProcess) {
super(proc);
ShutdownManager.onGracefulShutdown(() => this.destroy(), this);
this.#complete = state.result
.finally(() => { this.proc = undefined; });
this.#complete = new Promise<void>(r => proc.on('close', r)).finally(() => { this.proc = undefined; });
}

@@ -21,0 +20,0 @@

@@ -37,3 +37,3 @@ import gp from 'generic-pool';

static MAX_SIZE = os.cpus().length - 1;
static MAX_SIZE = os.availableParallelism();
static DEFAULT_SIZE = Math.min(WorkPool.MAX_SIZE, 4);

@@ -40,0 +40,0 @@

@@ -71,2 +71,11 @@ import { Util } from '@travetto/base';

/**
* Throw an error from the queue, rejecting and terminating immediately
*/
async throw(e?: Error): Promise<IteratorResult<X>> {
this.#done = true;
this.#ready.reject(e);
return { value: undefined, done: this.#done };
}
/**
* Get size, will change as items are added

@@ -73,0 +82,0 @@ */

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc