@travetto/worker
Advanced tools
Comparing version 1.0.0-rc.8 to 1.0.0
@@ -9,5 +9,6 @@ { | ||
}, | ||
"title": "Worker", | ||
"description": "Process management utilties, with a focus on inter-process communication", | ||
"dependencies": { | ||
"@travetto/base": "^1.0.0-rc.8", | ||
"@travetto/base": "^1.0.0", | ||
"@types/generic-pool": "^3.1.9", | ||
@@ -34,4 +35,4 @@ "generic-pool": "^3.7.1" | ||
}, | ||
"version": "1.0.0-rc.8", | ||
"gitHead": "cbb74e652d4277ec98c98098b931d6503a586af2" | ||
"version": "1.0.0", | ||
"gitHead": "7579e30893669f4136472c7d0a327a9cfdd1c804" | ||
} |
176
README.md
@@ -1,17 +0,17 @@ | ||
travetto: Worker | ||
=== | ||
<!-- This file was generated by the framweork and should not be modified directly --> | ||
<!-- Please modify https://github.com/travetto/travetto/tree/master/module/worker/README.js and execute "npm run docs" to rebuild --> | ||
# Worker | ||
## Process management utilties, with a focus on inter-process communication | ||
**Install: primary** | ||
**Install: @travetto/worker** | ||
```bash | ||
$ npm install @travetto/worker | ||
npm install @travetto/worker | ||
``` | ||
This module provides the necessary primitives for handling dependent workers. A worker can be an individual actor or could be a pool of workers. Node provides ipc (inter-process communication) functionality out of the box. This module builds upon that by providing enhanced event management, richer process management, as well as constructs for orchestrating a conversation between two processes. | ||
This module provides the necessary primitives for handling dependent workers. A worker can be an individual actor or could be a pool of workers. Node provides ipc (inter-process communication) functionality out of the box. This module builds upon that by providing enhanced event management, richer process management, as well as constructs for orchestrating a conversation between two processes. | ||
## Execution Pools | ||
With respect to managing multiple executions, [`WorkPool`](./src/pool.ts) is provided to allow for concurrent operation, and processing of jobs concurrently. To manage the flow of jobs, there are various [`InputSource`](./src/input/types.ts) implementation that allow for a wide range of use cases. | ||
With respect to managing multiple executions, [WorkPool](https://github.com/travetto/travetto/tree/master/module/worker/src/pool.ts#L23) is provided to allow for concurrent operation, and processing of jobs concurrently. To manage the flow of jobs, there are various [InputSource](src/input/types.ts#L3) implementation that allow for a wide range of use cases. | ||
The supported `InputSource`s are | ||
- ```Iterable``` supports any iterable (Array, Set, etc) input as well as any async iterable input. The source will continue to produce jobs until the underlying iterator is exhausted. | ||
- ```Event``` is an asynchronous source that allows the caller to determine when the next item is available. Useful triggering work on event driven problems. | ||
The only provided [InputSource](src/input/types.ts#L3) is the [IterableInputSource](https://github.com/travetto/travetto/tree/master/module/worker/src/input/iterable.ts#L11) which supports all `Iterable` and `Iterator` sources. Additionally, the module provides [DynamicAsyncIterator](https://github.com/travetto/travetto/tree/master/module/worker/src/input/async-iterator.ts#L6) which allows for manual control of iteration, which is useful for event driven work loads. | ||
@@ -22,14 +22,22 @@ Below is a pool that will convert images on demand, while queuing as needed. | ||
```typescript | ||
class ImageProcessor { | ||
import { ExecUtil, ExecutionState } from '@travetto/boot'; | ||
import { Worker, WorkPool, IterableInputSource, DynamicAsyncIterator } from '@travetto/worker'; | ||
class ImageProcessor implements Worker<string> { | ||
active = false; | ||
proc: ChildProcess; | ||
proc: ExecutionState; | ||
destroy() { | ||
this.proc.destroy(); | ||
get id() { | ||
return this.proc.process.pid; | ||
} | ||
async destroy() { | ||
this.proc.process.kill(); | ||
} | ||
async execute(path: string) { | ||
this.active = true; | ||
try { | ||
this.proc = ...convert ... | ||
this.proc = ExecUtil.spawn('convert images', [path]); | ||
await this.proc; | ||
@@ -43,5 +51,5 @@ } catch (e) { | ||
class ImageCompressor extends WorkerPool { | ||
export class ImageCompressor extends WorkPool<string, ImageProcessor> { | ||
pendingImages = new EventInputSource<string>(); | ||
pendingImages = new DynamicAsyncIterator<string>(); | ||
@@ -53,7 +61,7 @@ constructor() { | ||
begin() { | ||
this.process(this.pendingImages); | ||
this.process(new IterableInputSource(this.pendingImages)); | ||
} | ||
convert(...images: string[]) { | ||
this.pendingImages.trigger(images); | ||
this.pendingImages.add(images); | ||
} | ||
@@ -67,55 +75,115 @@ } | ||
Within the `comm` package, there is support for two primary communication elements: `child` and `parent`. Usually `parent` indicates it is the owner of the sub process. `Child` indicates that it has been created/spawned/forked by the parent and will communicate back to it's parent. This generally means that a `parent` channel can be destroyed (i.e. killing the subprocess) where a `child` channel can only exit the process, but the channel cannot be destroyed. | ||
Within the `comm` package, there is support for two primary communication elements: [ChildCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/child.ts#L6) and [ParentCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/parent.ts#L9). Usually [ParentCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/parent.ts#L9) indicates it is the owner of the sub process. [ChildCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/child.ts#L6) indicates that it has been created/spawned/forked by the parent and will communicate back to it's parent. This generally means that a [ParentCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/parent.ts#L9) can be destroyed (i.e. killing the subprocess) where a [ChildCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/child.ts#L6) can only exit the process, but the channel cannot be destroyed. | ||
### IPC as a Worker | ||
A common pattern is to want to model a sub process as a worker, to be a valid candidate in a `WorkPool`. The `WorkUtil` class provides a utility to facilitate this desire. | ||
A common pattern is to want to model a sub process as a worker, to be a valid candidate in a [WorkPool](https://github.com/travetto/travetto/tree/master/module/worker/src/pool.ts#L23). The [WorkUtil](https://github.com/travetto/travetto/tree/master/module/worker/src/util.ts#L8) class provides a utility to facilitate this desire. | ||
**Code: Spawned Worker Signature** | ||
**Code: Spawned Worker** | ||
```typescript | ||
class WorkUtil { | ||
static spawnedWorker<X>( | ||
config: SpawnConfig & { | ||
execute: (channel: ParentCommChannel, input: X) => any, | ||
destroy?: (channel: ParentCommChannel) => any, | ||
init?: (channel: ParentCommChannel) => any, | ||
import { ExecUtil, ExecutionOptions, } from '@travetto/boot'; | ||
import { ParentCommChannel } from './comm/parent'; | ||
import { Worker } from './pool'; | ||
/** | ||
* Spawned worker | ||
*/ | ||
export class WorkUtil { | ||
/** | ||
* Create a process channel worker from a given spawn config | ||
*/ | ||
static spawnedWorker<X>( | ||
command: string, | ||
{ args, opts, handlers }: { | ||
args?: string[]; | ||
opts?: ExecutionOptions; | ||
handlers: { | ||
init?: (channel: ParentCommChannel) => Promise<any>; | ||
execute: (channel: ParentCommChannel, input: X) => Promise<any>; | ||
destroy?: (channel: ParentCommChannel) => Promise<any>; | ||
}; | ||
} | ||
) | ||
): Worker<X> { | ||
const channel = new ParentCommChannel( | ||
ExecUtil.fork(command, args, opts) | ||
); | ||
return { | ||
get id() { return channel.id; }, | ||
get active() { return channel.active; }, | ||
init: handlers.init ? handlers.init.bind(handlers, channel) : undefined, | ||
execute: handlers.execute.bind(handlers, channel), | ||
async destroy() { | ||
if (handlers.destroy) { | ||
await handlers.destroy(channel); | ||
} | ||
await channel.destroy(); | ||
}, | ||
}; | ||
} | ||
} | ||
``` | ||
When creating your work, via process spawning, you will need to provide the script (and any other features you would like in `SpawnConfig`). Additionally you must, at a minimum, provide functionality to run whenever an input element is up for grabs in the input source. This method will be provided the communication channel (`parent`) and the input value. A simple example could look like: | ||
When creating your work, via process spawning, you will need to provide the script (and any other features you would like in `SpawnConfig`). Additionally you must, at a minimum, provide functionality to run whenever an input element is up for grabs in the input source. This method will be provided the communication channel ([ParentCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/parent.ts#L9)) and the input value. A simple example could look like: | ||
**Code: Simple Spawned Worker** | ||
**Code: Spawning Pool** | ||
```typescript | ||
const pool = new WorkPool(() => | ||
WorkUtil.spawnedWorker<string>(FsUtil.resolveUnix(__dirname, 'simple.child-launcher.js'), { | ||
handlers: { | ||
async init(channel) { | ||
return channel.listenOnce('ready'); // Wait for child to indicate it is ready | ||
}, | ||
async execute(channel, inp) { | ||
const res = channel.listenOnce('response'); // Register response listener | ||
channel.send('request', { data: inp }); // Send request | ||
import { WorkUtil } from '@travetto/worker/src/util'; | ||
import { FsUtil } from '@travetto/boot'; | ||
const { data } = await res; // Get answer | ||
console.log('Sent', inp, 'Received', data); | ||
import { WorkPool } from '@travetto/worker/src/pool'; | ||
import { IterableInputSource } from '@travetto/worker/src/input/iterable'; | ||
assert(inp + inp === data); // Ensure the answer is double the input | ||
} | ||
const pool = new WorkPool(() => | ||
WorkUtil.spawnedWorker<string>(FsUtil.resolveUnix(__dirname, 'spawned.js'), { | ||
handlers: { | ||
async init(channel) { | ||
return channel.listenOnce('ready'); // Wait for child to indicate it is ready | ||
}, | ||
async execute(channel, inp) { | ||
const res = channel.listenOnce('response'); // Register response listener | ||
channel.send('request', { data: inp }); // Send request | ||
const { data } = await res; // Get answer | ||
console.log('Sent', inp, 'Received', data); | ||
if (!(inp + inp === data)) { | ||
// Ensure the answer is double the input | ||
throw new Error(`Didn't get the double`); | ||
} | ||
}) | ||
); | ||
} | ||
} | ||
}) | ||
); | ||
pool.process(new IterableInputSource([1, 2, 3, 4, 5])).then(x => pool.shutdown()); | ||
``` | ||
**Code: Spawned Worker Target** | ||
```typescript | ||
const exec = new ChildCommChannel<{ data: string }>(); | ||
**Code: Spawned Worker** | ||
```javascript | ||
require('@travetto/boot/register'); | ||
require('@travetto/base').PhaseManager.init().then(async () => { | ||
const { ChildCommChannel } = require('../../..'); | ||
exec.listenFor('request', data => { | ||
exec.send('response', { data: (data.data + data.data) }); // When data is received, return double | ||
const exec = new ChildCommChannel(); | ||
exec.listenFor('request', data => { | ||
exec.send('response', { data: (data.data + data.data) }); // When data is received, return double | ||
}); | ||
exec.send('ready'); // Indicate the child is ready to receive requests | ||
const heartbeat = () => setTimeout(heartbeat, 5000); // Keep-alive | ||
heartbeat(); | ||
}); | ||
``` | ||
exec.send('ready'); // Indicate the child is ready to receive requests | ||
**Terminal: Output** | ||
```bash | ||
$ ./alt/docs/src/spawner.ts -r @travetto/boot/register ./alt/docs/src/spawner.ts | ||
const heartbeat = () => setTimeout(heartbeat, 5000); // Keep-alive | ||
heartbeat(); | ||
``` | ||
Sent 1 Received 2 | ||
Sent 2 Received 4 | ||
Sent 3 Received 6 | ||
Sent 4 Received 8 | ||
Sent 5 Received 10 | ||
``` | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
26949
17
575
0
42
Updated@travetto/base@^1.0.0