Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@travetto/worker

Package Overview
Dependencies
Maintainers
1
Versions
186
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@travetto/worker - npm Package Compare versions

Comparing version 1.0.0-rc.8 to 1.0.0

README.js

7

package.json

@@ -9,5 +9,6 @@ {

},
"title": "Worker",
"description": "Process management utilties, with a focus on inter-process communication",
"dependencies": {
"@travetto/base": "^1.0.0-rc.8",
"@travetto/base": "^1.0.0",
"@types/generic-pool": "^3.1.9",

@@ -34,4 +35,4 @@ "generic-pool": "^3.7.1"

},
"version": "1.0.0-rc.8",
"gitHead": "cbb74e652d4277ec98c98098b931d6503a586af2"
"version": "1.0.0",
"gitHead": "7579e30893669f4136472c7d0a327a9cfdd1c804"
}

@@ -1,17 +0,17 @@

travetto: Worker
===
<!-- This file was generated by the framweork and should not be modified directly -->
<!-- Please modify https://github.com/travetto/travetto/tree/master/module/worker/README.js and execute "npm run docs" to rebuild -->
# Worker
## Process management utilties, with a focus on inter-process communication
**Install: primary**
**Install: @travetto/worker**
```bash
$ npm install @travetto/worker
npm install @travetto/worker
```
This module provides the necessary primitives for handling dependent workers. A worker can be an individual actor or could be a pool of workers. Node provides ipc (inter-process communication) functionality out of the box. This module builds upon that by providing enhanced event management, richer process management, as well as constructs for orchestrating a conversation between two processes.
This module provides the necessary primitives for handling dependent workers. A worker can be an individual actor or could be a pool of workers. Node provides ipc (inter-process communication) functionality out of the box. This module builds upon that by providing enhanced event management, richer process management, as well as constructs for orchestrating a conversation between two processes.
## Execution Pools
With respect to managing multiple executions, [`WorkPool`](./src/pool.ts) is provided to allow for concurrent operation, and processing of jobs concurrently. To manage the flow of jobs, there are various [`InputSource`](./src/input/types.ts) implementation that allow for a wide range of use cases.
With respect to managing multiple executions, [WorkPool](https://github.com/travetto/travetto/tree/master/module/worker/src/pool.ts#L23) is provided to allow for concurrent operation, and processing of jobs concurrently. To manage the flow of jobs, there are various [InputSource](src/input/types.ts#L3) implementation that allow for a wide range of use cases.
The supported `InputSource`s are
- ```Iterable``` supports any iterable (Array, Set, etc) input as well as any async iterable input. The source will continue to produce jobs until the underlying iterator is exhausted.
- ```Event``` is an asynchronous source that allows the caller to determine when the next item is available. Useful triggering work on event driven problems.
The only provided [InputSource](src/input/types.ts#L3) is the [IterableInputSource](https://github.com/travetto/travetto/tree/master/module/worker/src/input/iterable.ts#L11) which supports all `Iterable` and `Iterator` sources. Additionally, the module provides [DynamicAsyncIterator](https://github.com/travetto/travetto/tree/master/module/worker/src/input/async-iterator.ts#L6) which allows for manual control of iteration, which is useful for event driven work loads.

@@ -22,14 +22,22 @@ Below is a pool that will convert images on demand, while queuing as needed.

```typescript
class ImageProcessor {
import { ExecUtil, ExecutionState } from '@travetto/boot';
import { Worker, WorkPool, IterableInputSource, DynamicAsyncIterator } from '@travetto/worker';
class ImageProcessor implements Worker<string> {
active = false;
proc: ChildProcess;
proc: ExecutionState;
destroy() {
this.proc.destroy();
get id() {
return this.proc.process.pid;
}
async destroy() {
this.proc.process.kill();
}
async execute(path: string) {
this.active = true;
try {
this.proc = ...convert ...
this.proc = ExecUtil.spawn('convert images', [path]);
await this.proc;

@@ -43,5 +51,5 @@ } catch (e) {

class ImageCompressor extends WorkerPool {
export class ImageCompressor extends WorkPool<string, ImageProcessor> {
pendingImages = new EventInputSource<string>();
pendingImages = new DynamicAsyncIterator<string>();

@@ -53,7 +61,7 @@ constructor() {

begin() {
this.process(this.pendingImages);
this.process(new IterableInputSource(this.pendingImages));
}
convert(...images: string[]) {
this.pendingImages.trigger(images);
this.pendingImages.add(images);
}

@@ -67,55 +75,115 @@ }

Within the `comm` package, there is support for two primary communication elements: `child` and `parent`. Usually `parent` indicates it is the owner of the sub process. `Child` indicates that it has been created/spawned/forked by the parent and will communicate back to it's parent. This generally means that a `parent` channel can be destroyed (i.e. killing the subprocess) where a `child` channel can only exit the process, but the channel cannot be destroyed.
Within the `comm` package, there is support for two primary communication elements: [ChildCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/child.ts#L6) and [ParentCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/parent.ts#L9). Usually [ParentCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/parent.ts#L9) indicates it is the owner of the sub process. [ChildCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/child.ts#L6) indicates that it has been created/spawned/forked by the parent and will communicate back to it's parent. This generally means that a [ParentCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/parent.ts#L9) can be destroyed (i.e. killing the subprocess) where a [ChildCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/child.ts#L6) can only exit the process, but the channel cannot be destroyed.
### IPC as a Worker
A common pattern is to want to model a sub process as a worker, to be a valid candidate in a `WorkPool`. The `WorkUtil` class provides a utility to facilitate this desire.
A common pattern is to want to model a sub process as a worker, to be a valid candidate in a [WorkPool](https://github.com/travetto/travetto/tree/master/module/worker/src/pool.ts#L23). The [WorkUtil](https://github.com/travetto/travetto/tree/master/module/worker/src/util.ts#L8) class provides a utility to facilitate this desire.
**Code: Spawned Worker Signature**
**Code: Spawned Worker**
```typescript
class WorkUtil {
static spawnedWorker<X>(
config: SpawnConfig & {
execute: (channel: ParentCommChannel, input: X) => any,
destroy?: (channel: ParentCommChannel) => any,
init?: (channel: ParentCommChannel) => any,
import { ExecUtil, ExecutionOptions, } from '@travetto/boot';
import { ParentCommChannel } from './comm/parent';
import { Worker } from './pool';
/**
* Spawned worker
*/
export class WorkUtil {
/**
* Create a process channel worker from a given spawn config
*/
static spawnedWorker<X>(
command: string,
{ args, opts, handlers }: {
args?: string[];
opts?: ExecutionOptions;
handlers: {
init?: (channel: ParentCommChannel) => Promise<any>;
execute: (channel: ParentCommChannel, input: X) => Promise<any>;
destroy?: (channel: ParentCommChannel) => Promise<any>;
};
}
)
): Worker<X> {
const channel = new ParentCommChannel(
ExecUtil.fork(command, args, opts)
);
return {
get id() { return channel.id; },
get active() { return channel.active; },
init: handlers.init ? handlers.init.bind(handlers, channel) : undefined,
execute: handlers.execute.bind(handlers, channel),
async destroy() {
if (handlers.destroy) {
await handlers.destroy(channel);
}
await channel.destroy();
},
};
}
}
```
When creating your work, via process spawning, you will need to provide the script (and any other features you would like in `SpawnConfig`). Additionally you must, at a minimum, provide functionality to run whenever an input element is up for grabs in the input source. This method will be provided the communication channel (`parent`) and the input value. A simple example could look like:
When creating your work, via process spawning, you will need to provide the script (and any other features you would like in `SpawnConfig`). Additionally you must, at a minimum, provide functionality to run whenever an input element is up for grabs in the input source. This method will be provided the communication channel ([ParentCommChannel](https://github.com/travetto/travetto/tree/master/module/worker/src/comm/parent.ts#L9)) and the input value. A simple example could look like:
**Code: Simple Spawned Worker**
**Code: Spawning Pool**
```typescript
const pool = new WorkPool(() =>
WorkUtil.spawnedWorker<string>(FsUtil.resolveUnix(__dirname, 'simple.child-launcher.js'), {
handlers: {
async init(channel) {
return channel.listenOnce('ready'); // Wait for child to indicate it is ready
},
async execute(channel, inp) {
const res = channel.listenOnce('response'); // Register response listener
channel.send('request', { data: inp }); // Send request
import { WorkUtil } from '@travetto/worker/src/util';
import { FsUtil } from '@travetto/boot';
const { data } = await res; // Get answer
console.log('Sent', inp, 'Received', data);
import { WorkPool } from '@travetto/worker/src/pool';
import { IterableInputSource } from '@travetto/worker/src/input/iterable';
assert(inp + inp === data); // Ensure the answer is double the input
}
const pool = new WorkPool(() =>
WorkUtil.spawnedWorker<string>(FsUtil.resolveUnix(__dirname, 'spawned.js'), {
handlers: {
async init(channel) {
return channel.listenOnce('ready'); // Wait for child to indicate it is ready
},
async execute(channel, inp) {
const res = channel.listenOnce('response'); // Register response listener
channel.send('request', { data: inp }); // Send request
const { data } = await res; // Get answer
console.log('Sent', inp, 'Received', data);
if (!(inp + inp === data)) {
// Ensure the answer is double the input
throw new Error(`Didn't get the double`);
}
})
);
}
}
})
);
pool.process(new IterableInputSource([1, 2, 3, 4, 5])).then(x => pool.shutdown());
```
**Code: Spawned Worker Target**
```typescript
const exec = new ChildCommChannel<{ data: string }>();
**Code: Spawned Worker**
```javascript
require('@travetto/boot/register');
require('@travetto/base').PhaseManager.init().then(async () => {
const { ChildCommChannel } = require('../../..');
exec.listenFor('request', data => {
exec.send('response', { data: (data.data + data.data) }); // When data is received, return double
const exec = new ChildCommChannel();
exec.listenFor('request', data => {
exec.send('response', { data: (data.data + data.data) }); // When data is received, return double
});
exec.send('ready'); // Indicate the child is ready to receive requests
const heartbeat = () => setTimeout(heartbeat, 5000); // Keep-alive
heartbeat();
});
```
exec.send('ready'); // Indicate the child is ready to receive requests
**Terminal: Output**
```bash
$ ./alt/docs/src/spawner.ts -r @travetto/boot/register ./alt/docs/src/spawner.ts
const heartbeat = () => setTimeout(heartbeat, 5000); // Keep-alive
heartbeat();
```
Sent 1 Received 2
Sent 2 Received 4
Sent 3 Received 6
Sent 4 Received 8
Sent 5 Received 10
```
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc