@travetto/worker
Advanced tools
Comparing version 1.0.0-beta.1 to 1.0.0-rc.0
12
index.ts
@@ -5,9 +5,9 @@ export * from './src/comm/channel'; | ||
export * from './src/comm/types'; | ||
export * from './src/comm/util'; | ||
export * from './src/input/array'; | ||
export * from './src/input/iterator'; | ||
export * from './src/input/queue'; | ||
export * from './src/input/iterable'; | ||
export * from './src/input/async-iterator'; | ||
export * from './src/input/types'; | ||
export * from './src/idle'; | ||
export * from './src/support/barrier'; | ||
export * from './src/support/timeout'; | ||
export * from './src/support/error'; | ||
export * from './src/pool'; | ||
export * from './src/util'; | ||
export * from './src/util'; |
@@ -11,4 +11,3 @@ { | ||
"dependencies": { | ||
"@travetto/base": "^1.0.0-beta.1", | ||
"@travetto/exec": "^1.0.0-beta.1", | ||
"@travetto/base": "^1.0.0-rc.0", | ||
"@types/generic-pool": "^3.1.9", | ||
@@ -20,3 +19,2 @@ "generic-pool": "^3.7.1" | ||
"exec", | ||
"docker", | ||
"child-process", | ||
@@ -37,4 +35,3 @@ "ipc", | ||
}, | ||
"version": "1.0.0-beta.1", | ||
"gitHead": "81e362a7d911693c635d2c077f3a03229c74d331" | ||
"version": "1.0.0-rc.0" | ||
} |
@@ -15,5 +15,4 @@ travetto: Worker | ||
The supported `InputSource`s are | ||
* ```Array``` is a list of jobs, will execute in order until list is exhausted. | ||
- ```Queue``` is similar to list but will execute forever waiting for new items to be added to the queue. | ||
- ```Iterator``` is a generator function that will continue to produce jobs until the iterator is exhausted. | ||
- ```Iterable``` supports any iterable (Array, Set, etc) input as well as any async iterable input. The source will continue to produce jobs until the underlying iterator is exhausted. | ||
- ```Event``` is an asynchronous source that allows the caller to determine when the next item is available. Useful triggering work on event driven problems. | ||
@@ -46,6 +45,6 @@ Below is a pool that will convert images on demand, while queuing as needed. | ||
pendingImages = new QueueInputSource<string>(); | ||
pendingImages = new EventInputSource<string>(); | ||
constructor() { | ||
super(async () => new ImageProcess()); | ||
super(async () => new ImageProcessor()); | ||
} | ||
@@ -58,5 +57,3 @@ | ||
convert(...images: string[]) { | ||
for (const img of images) { | ||
this.pendingImages.enqueue(img); | ||
} | ||
this.pendingImages.trigger(images); | ||
} | ||
@@ -92,16 +89,16 @@ } | ||
const pool = new WorkPool(() => | ||
WorkUtil.spawnedWorker<string>({ | ||
command: FsUtil.resolveUnix(__dirname, 'simple.child-launcher.js'), | ||
fork: true, | ||
async init(channel) { | ||
return channel.listenOnce('ready'); // Wait for child to indicate it is ready | ||
}, | ||
async execute(channel, inp) { | ||
const res = channel.listenOnce('response'); // Register response listener | ||
channel.send('request', { data: inp }); // Send request | ||
WorkUtil.spawnedWorker<string>(FsUtil.resolveUnix(__dirname, 'simple.child-launcher.js'), { | ||
handlers: { | ||
async init(channel) { | ||
return channel.listenOnce('ready'); // Wait for child to indicate it is ready | ||
}, | ||
async execute(channel, inp) { | ||
const res = channel.listenOnce('response'); // Register response listener | ||
channel.send('request', { data: inp }); // Send request | ||
const { data } = await res; // Get answer | ||
console.log('Sent', inp, 'Received', data); | ||
const { data } = await res; // Get answer | ||
console.log('Sent', inp, 'Received', data); | ||
assert(inp + inp === data); // Ensure the answer is double the input | ||
assert(inp + inp === data); // Ensure the answer is double the input | ||
} | ||
} | ||
@@ -108,0 +105,0 @@ }) |
@@ -5,6 +5,7 @@ import { ChildProcess } from 'child_process'; | ||
import { CommEvent } from './types'; | ||
/** | ||
* Channel that represents communication between parent/child | ||
*/ | ||
export class ProcessCommChannel<T extends NodeJS.Process | ChildProcess, V = any, U extends { type: string } = V & { type: string }> { | ||
export class ProcessCommChannel<T extends NodeJS.Process | ChildProcess, U extends CommEvent = CommEvent> { | ||
public proc: T; | ||
@@ -21,2 +22,5 @@ | ||
/** | ||
* Get's channel unique identifier | ||
*/ | ||
get id() { | ||
@@ -26,2 +30,5 @@ return this.proc && this.proc.pid; | ||
/** | ||
* Determines if channel is active | ||
*/ | ||
get active() { | ||
@@ -31,2 +38,5 @@ return !!this.proc; | ||
/** | ||
* Send data to the parent | ||
*/ | ||
send(eventType: string, data?: any) { | ||
@@ -37,3 +47,3 @@ if (Env.trace) { | ||
if (this.proc.send) { | ||
this.proc.send({ type: eventType, ...(data || {}) }); | ||
this.proc.send({ type: eventType, ...(data ?? {}) }); | ||
} else { | ||
@@ -44,2 +54,5 @@ throw new Error('this._proc.send was not defined'); | ||
/** | ||
* Listen for an event, once | ||
*/ | ||
listenOnce(eventType: string): Promise<U>; | ||
@@ -63,2 +76,5 @@ listenOnce(eventType: string, callback: (e: U) => any): void; | ||
/** | ||
* Remove a specific listener | ||
*/ | ||
removeListener(fn: (e: U) => any) { | ||
@@ -68,2 +84,5 @@ this.proc.removeListener('message', fn); | ||
/** | ||
* Listen for a specific message type | ||
*/ | ||
listenFor(eventType: string, callback: (e: U, complete: Function) => any) { | ||
@@ -78,8 +97,17 @@ const fn = (event: U, kill: Function) => { | ||
/** | ||
* Listen, and return a handle to remove listener when desired | ||
*/ | ||
listen(handler: (e: U, complete: Function) => any) { | ||
let fn: (e: U) => void; | ||
if (!this.proc) { | ||
return; | ||
} | ||
const holder: { fn?(e: U): void } = {}; | ||
const kill = (e?: any) => { | ||
this.removeListener(fn); | ||
this.removeListener(holder.fn!); | ||
}; | ||
fn = (e: U) => { | ||
holder.fn = (e: U) => { | ||
if (Env.trace) { | ||
@@ -95,8 +123,8 @@ console.trace(`[${this.parentId}] Received [${this.id}] ${e.type}`); | ||
} | ||
} catch (e) { | ||
kill(e); | ||
} catch (err) { | ||
kill(err); | ||
} | ||
}; | ||
this.proc.on('message', fn); | ||
this.proc.on('message', holder.fn); | ||
@@ -106,2 +134,5 @@ return kill; | ||
/** | ||
* Destroy self | ||
*/ | ||
async destroy() { | ||
@@ -115,2 +146,5 @@ if (this.proc) { | ||
/** | ||
* Remove all listeners, but do not destroy | ||
*/ | ||
release() { | ||
@@ -117,0 +151,0 @@ if (this.proc) { |
@@ -1,24 +0,37 @@ | ||
import { IdleManager } from '../idle'; | ||
import { CommEvent } from './types'; | ||
import { ProcessCommChannel } from './channel'; | ||
export class ChildCommChannel<U extends CommEvent = CommEvent> extends ProcessCommChannel<NodeJS.Process, U> { | ||
idle: IdleManager; | ||
/** | ||
* Child channel, communicates only to parent | ||
*/ | ||
export class ChildCommChannel<U = any> extends ProcessCommChannel<NodeJS.Process, U> { | ||
idleTimer: NodeJS.Timer | undefined; | ||
constructor(timeout?: number) { | ||
constructor(private timeout?: number) { | ||
super(process); | ||
if (timeout) { | ||
this.idle = new IdleManager(timeout); | ||
process.on('message', () => this.idle.extend()); | ||
this.idle.start(); | ||
process.on('message', () => this.idle()); | ||
this.idle(); | ||
} | ||
} | ||
/** | ||
* Kill self and stop the keep alive | ||
*/ | ||
async destroy() { | ||
await super.destroy(); | ||
if (this.idle) { | ||
this.idle.stop(); | ||
this.idle(0); | ||
} | ||
/** | ||
* Control the idle behavior of the process | ||
*/ | ||
async idle(timeout: number | undefined = this.timeout) { | ||
if (this.idleTimer) { | ||
clearTimeout(this.idleTimer); | ||
} | ||
if (timeout) { | ||
this.idleTimer = setTimeout(() => process.exit(0), timeout).unref(); | ||
} | ||
} | ||
} |
@@ -1,11 +0,12 @@ | ||
import * as child_process from 'child_process'; | ||
import { ExecutionState, ExecutionResult } from '@travetto/exec'; | ||
import { ChildProcess } from 'child_process'; | ||
import { ExecutionState, ExecUtil } from '@travetto/boot'; | ||
import { CommEvent } from './types'; | ||
import { ProcessCommChannel } from './channel'; | ||
import { CommUtil } from './util'; | ||
export class ParentCommChannel<U extends CommEvent = CommEvent> extends ProcessCommChannel<child_process.ChildProcess, U> { | ||
/** | ||
* Parent channel | ||
*/ | ||
export class ParentCommChannel<U = any> extends ProcessCommChannel<ChildProcess, U> { | ||
private complete: Promise<ExecutionResult>; | ||
private complete: ExecutionState['result']; | ||
@@ -18,5 +19,8 @@ constructor(state: ExecutionState) { | ||
/** | ||
* Kill self and child | ||
*/ | ||
async destroy() { | ||
if (this.proc) { | ||
CommUtil.killSpawnedProcess(this.proc); | ||
ExecUtil.kill(this.proc); | ||
await this.complete; | ||
@@ -23,0 +27,0 @@ } |
@@ -1,25 +0,9 @@ | ||
import { ExecutionOptions } from '@travetto/exec'; | ||
export interface ChildOptions extends ExecutionOptions { | ||
cwd?: string; | ||
env?: any; | ||
stdio?: any; | ||
uid?: number; | ||
gid?: number; | ||
} | ||
export interface CommEvent { | ||
type?: string; | ||
[key: string]: any; | ||
} | ||
/** | ||
* Process status | ||
*/ | ||
export type Status = 'init' | 'release' | 'destroy'; | ||
/** | ||
* Listen for changes in status | ||
*/ | ||
export type StatusChangeHandler = (status: Status) => any; | ||
export interface SpawnConfig { | ||
command: string; | ||
args?: string[]; | ||
fork?: boolean; | ||
opts?: ChildOptions; | ||
} |
@@ -0,4 +1,13 @@ | ||
/** | ||
* Definition for an input source | ||
*/ | ||
export interface InputSource<X> { | ||
/** | ||
* Determines if there is more work to do | ||
*/ | ||
hasNext(): boolean | Promise<boolean>; | ||
/** | ||
* Get next item | ||
*/ | ||
next(): X | Promise<X>; | ||
} |
105
src/pool.ts
import * as os from 'os'; | ||
import * as gp from 'generic-pool'; | ||
import { Shutdown } from '@travetto/base'; | ||
import { ShutdownManager } from '@travetto/base'; | ||
import { InputSource } from './input/types'; | ||
/** | ||
* Worker defintion | ||
*/ | ||
export interface Worker<X> { | ||
@@ -17,2 +20,5 @@ active: boolean; | ||
/** | ||
* Work pool support | ||
*/ | ||
export class WorkPool<X, T extends Worker<X>> { | ||
@@ -22,6 +28,25 @@ | ||
/** | ||
* Generic-pool pool | ||
*/ | ||
private pool: gp.Pool<T>; | ||
/** | ||
* Number of acquistions in process | ||
*/ | ||
private pendingAcquires = 0; | ||
/** | ||
* List of errors during processing | ||
*/ | ||
private errors: Error[] = []; | ||
/** | ||
* Error count during creation | ||
*/ | ||
private createErrors = 0; | ||
/** | ||
* | ||
* @param getWorker Produces a new worker for the pool | ||
* @param opts Pool options | ||
*/ | ||
constructor(getWorker: () => Promise<T> | T, opts?: gp.Options) { | ||
@@ -32,45 +57,53 @@ const args = { | ||
evictionRunIntervalMillis: 5000, | ||
...(opts || {}), | ||
...(opts ?? {}), | ||
}; | ||
let createErrors = 0; | ||
// Create the pool | ||
this.pool = gp.createPool({ | ||
create: () => this.createAndTrack(getWorker, args), | ||
destroy: x => this.destroy(x), | ||
validate: async (x: T) => x.active | ||
}, args); | ||
// tslint:disable-next-line: no-this-assignment | ||
const self = this; | ||
ShutdownManager.onShutdown(`worker.pool.${this.constructor.name}`, () => this.shutdown()); | ||
} | ||
this.pool = gp.createPool({ | ||
async create() { | ||
try { | ||
self.pendingAcquires += 1; | ||
const res = await getWorker(); | ||
if (res.init) { | ||
await res.init(); | ||
} | ||
/** | ||
* Creates and tracks new worker | ||
*/ | ||
async createAndTrack(getWorker: () => Promise<T> | T, opts: gp.Options) { | ||
try { | ||
this.pendingAcquires += 1; | ||
const res = await getWorker(); | ||
createErrors = 0; // Reset errors on success | ||
if (res.init) { | ||
await res.init(); | ||
} | ||
return res; | ||
} catch (e) { | ||
if (createErrors++ > args.max) { // If error count is bigger than pool size, we broke | ||
console.error(e); | ||
process.exit(1); | ||
} | ||
throw e; | ||
} finally { | ||
self.pendingAcquires -= 1; | ||
} | ||
}, | ||
async destroy(x: T) { | ||
console.trace(`[${process.pid}] Destroying ${x.id}`); | ||
return x.destroy(); | ||
}, | ||
async validate(x: T) { | ||
return x.active; | ||
this.createErrors = 0; // Reset errors on success | ||
return res; | ||
} catch (e) { | ||
if (this.createErrors++ > opts.max!) { // If error count is bigger than pool size, we broke | ||
console.error(e); | ||
process.exit(1); | ||
} | ||
}, args); | ||
throw e; | ||
} finally { | ||
this.pendingAcquires -= 1; | ||
} | ||
} | ||
Shutdown.onShutdown(`worker.pool.${this.constructor.name}`, () => this.shutdown()); | ||
/** | ||
* Destroy the worker | ||
*/ | ||
async destroy(worker: T) { | ||
console.trace(`[${process.pid}] Destroying ${worker.id}`); | ||
return worker.destroy(); | ||
} | ||
/** | ||
* Free worker on completion | ||
*/ | ||
async release(worker: T) { | ||
@@ -92,2 +125,5 @@ console.trace(`[${process.pid}] Releasing ${worker.id}`); | ||
/** | ||
* Process a given input source | ||
*/ | ||
async process(src: InputSource<X>) { | ||
@@ -116,2 +152,5 @@ const pending = new Set<Promise<any>>(); | ||
/** | ||
* Shutdown pool | ||
*/ | ||
async shutdown() { | ||
@@ -118,0 +157,0 @@ while (this.pendingAcquires) { |
@@ -1,16 +0,26 @@ | ||
import { SpawnConfig } from './comm/types'; | ||
import { ExecUtil, ExecutionOptions, } from '@travetto/boot'; | ||
import { ParentCommChannel } from './comm/parent'; | ||
import { CommUtil } from './comm/util'; | ||
import { Worker } from './pool'; | ||
/** | ||
* Spawned worker | ||
*/ | ||
export class WorkUtil { | ||
/** | ||
* Create a process channel worker from a given spawn config | ||
*/ | ||
static spawnedWorker<X>( | ||
config: SpawnConfig & { | ||
init?: (channel: ParentCommChannel) => Promise<any>, | ||
execute: (channel: ParentCommChannel, input: X) => Promise<any>, | ||
destroy?: (channel: ParentCommChannel) => Promise<any>, | ||
command: string, | ||
{ args, opts, handlers }: { | ||
args?: string[]; | ||
opts?: ExecutionOptions; | ||
handlers: { | ||
init?: (channel: ParentCommChannel) => Promise<any>; | ||
execute: (channel: ParentCommChannel, input: X) => Promise<any>; | ||
destroy?: (channel: ParentCommChannel) => Promise<any>; | ||
}; | ||
} | ||
): Worker<X> { | ||
const channel = new ParentCommChannel( | ||
CommUtil.spawnProcess(config) | ||
ExecUtil.fork(command, args, opts) | ||
); | ||
@@ -20,7 +30,7 @@ return { | ||
get active() { return channel.active; }, | ||
init: config.init ? config.init.bind(config, channel) : undefined, | ||
execute: config.execute.bind(config, channel), | ||
init: handlers.init ? handlers.init.bind(handlers, channel) : undefined, | ||
execute: handlers.execute.bind(handlers, channel), | ||
async destroy() { | ||
if (config.destroy) { | ||
await config.destroy(channel); | ||
if (handlers.destroy) { | ||
await handlers.destroy(channel); | ||
} | ||
@@ -27,0 +37,0 @@ await channel.destroy(); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
21219
3
574
117
1
- Removed@travetto/exec@^1.0.0-beta.1
- Removed@travetto/exec@1.0.0-beta.1(transitive)
Updated@travetto/base@^1.0.0-rc.0