Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@travetto/worker

Package Overview
Dependencies
Maintainers
1
Versions
186
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@travetto/worker - npm Package Compare versions

Comparing version 1.0.0-beta.1 to 1.0.0-rc.0

src/input/async-iterator.ts

12

index.ts

@@ -5,9 +5,9 @@ export * from './src/comm/channel';

export * from './src/comm/types';
export * from './src/comm/util';
export * from './src/input/array';
export * from './src/input/iterator';
export * from './src/input/queue';
export * from './src/input/iterable';
export * from './src/input/async-iterator';
export * from './src/input/types';
export * from './src/idle';
export * from './src/support/barrier';
export * from './src/support/timeout';
export * from './src/support/error';
export * from './src/pool';
export * from './src/util';
export * from './src/util';

@@ -11,4 +11,3 @@ {

"dependencies": {
"@travetto/base": "^1.0.0-beta.1",
"@travetto/exec": "^1.0.0-beta.1",
"@travetto/base": "^1.0.0-rc.0",
"@types/generic-pool": "^3.1.9",

@@ -20,3 +19,2 @@ "generic-pool": "^3.7.1"

"exec",
"docker",
"child-process",

@@ -37,4 +35,3 @@ "ipc",

},
"version": "1.0.0-beta.1",
"gitHead": "81e362a7d911693c635d2c077f3a03229c74d331"
"version": "1.0.0-rc.0"
}

@@ -15,5 +15,4 @@ travetto: Worker

The supported `InputSource`s are
* ```Array``` is a list of jobs, will execute in order until list is exhausted.
- ```Queue``` is similar to list but will execute forever waiting for new items to be added to the queue.
- ```Iterator``` is a generator function that will continue to produce jobs until the iterator is exhausted.
- ```Iterable``` supports any iterable (Array, Set, etc) input as well as any async iterable input. The source will continue to produce jobs until the underlying iterator is exhausted.
- ```Event``` is an asynchronous source that allows the caller to determine when the next item is available. Useful triggering work on event driven problems.

@@ -46,6 +45,6 @@ Below is a pool that will convert images on demand, while queuing as needed.

pendingImages = new QueueInputSource<string>();
pendingImages = new EventInputSource<string>();
constructor() {
super(async () => new ImageProcess());
super(async () => new ImageProcessor());
}

@@ -58,5 +57,3 @@

convert(...images: string[]) {
for (const img of images) {
this.pendingImages.enqueue(img);
}
this.pendingImages.trigger(images);
}

@@ -92,16 +89,16 @@ }

const pool = new WorkPool(() =>
WorkUtil.spawnedWorker<string>({
command: FsUtil.resolveUnix(__dirname, 'simple.child-launcher.js'),
fork: true,
async init(channel) {
return channel.listenOnce('ready'); // Wait for child to indicate it is ready
},
async execute(channel, inp) {
const res = channel.listenOnce('response'); // Register response listener
channel.send('request', { data: inp }); // Send request
WorkUtil.spawnedWorker<string>(FsUtil.resolveUnix(__dirname, 'simple.child-launcher.js'), {
handlers: {
async init(channel) {
return channel.listenOnce('ready'); // Wait for child to indicate it is ready
},
async execute(channel, inp) {
const res = channel.listenOnce('response'); // Register response listener
channel.send('request', { data: inp }); // Send request
const { data } = await res; // Get answer
console.log('Sent', inp, 'Received', data);
const { data } = await res; // Get answer
console.log('Sent', inp, 'Received', data);
assert(inp + inp === data); // Ensure the answer is double the input
assert(inp + inp === data); // Ensure the answer is double the input
}
}

@@ -108,0 +105,0 @@ })

@@ -5,6 +5,7 @@ import { ChildProcess } from 'child_process';

import { CommEvent } from './types';
/**
* Channel that represents communication between parent/child
*/
export class ProcessCommChannel<T extends NodeJS.Process | ChildProcess, V = any, U extends { type: string } = V & { type: string }> {
export class ProcessCommChannel<T extends NodeJS.Process | ChildProcess, U extends CommEvent = CommEvent> {
public proc: T;

@@ -21,2 +22,5 @@

/**
* Get's channel unique identifier
*/
get id() {

@@ -26,2 +30,5 @@ return this.proc && this.proc.pid;

/**
* Determines if channel is active
*/
get active() {

@@ -31,2 +38,5 @@ return !!this.proc;

/**
* Send data to the parent
*/
send(eventType: string, data?: any) {

@@ -37,3 +47,3 @@ if (Env.trace) {

if (this.proc.send) {
this.proc.send({ type: eventType, ...(data || {}) });
this.proc.send({ type: eventType, ...(data ?? {}) });
} else {

@@ -44,2 +54,5 @@ throw new Error('this._proc.send was not defined');

/**
* Listen for an event, once
*/
listenOnce(eventType: string): Promise<U>;

@@ -63,2 +76,5 @@ listenOnce(eventType: string, callback: (e: U) => any): void;

/**
* Remove a specific listener
*/
removeListener(fn: (e: U) => any) {

@@ -68,2 +84,5 @@ this.proc.removeListener('message', fn);

/**
* Listen for a specific message type
*/
listenFor(eventType: string, callback: (e: U, complete: Function) => any) {

@@ -78,8 +97,17 @@ const fn = (event: U, kill: Function) => {

/**
* Listen, and return a handle to remove listener when desired
*/
listen(handler: (e: U, complete: Function) => any) {
let fn: (e: U) => void;
if (!this.proc) {
return;
}
const holder: { fn?(e: U): void } = {};
const kill = (e?: any) => {
this.removeListener(fn);
this.removeListener(holder.fn!);
};
fn = (e: U) => {
holder.fn = (e: U) => {
if (Env.trace) {

@@ -95,8 +123,8 @@ console.trace(`[${this.parentId}] Received [${this.id}] ${e.type}`);

}
} catch (e) {
kill(e);
} catch (err) {
kill(err);
}
};
this.proc.on('message', fn);
this.proc.on('message', holder.fn);

@@ -106,2 +134,5 @@ return kill;

/**
* Destroy self
*/
async destroy() {

@@ -115,2 +146,5 @@ if (this.proc) {

/**
* Remove all listeners, but do not destroy
*/
release() {

@@ -117,0 +151,0 @@ if (this.proc) {

@@ -1,24 +0,37 @@

import { IdleManager } from '../idle';
import { CommEvent } from './types';
import { ProcessCommChannel } from './channel';
export class ChildCommChannel<U extends CommEvent = CommEvent> extends ProcessCommChannel<NodeJS.Process, U> {
idle: IdleManager;
/**
* Child channel, communicates only to parent
*/
export class ChildCommChannel<U = any> extends ProcessCommChannel<NodeJS.Process, U> {
idleTimer: NodeJS.Timer | undefined;
constructor(timeout?: number) {
constructor(private timeout?: number) {
super(process);
if (timeout) {
this.idle = new IdleManager(timeout);
process.on('message', () => this.idle.extend());
this.idle.start();
process.on('message', () => this.idle());
this.idle();
}
}
/**
* Kill self and stop the keep alive
*/
async destroy() {
await super.destroy();
if (this.idle) {
this.idle.stop();
this.idle(0);
}
/**
* Control the idle behavior of the process
*/
async idle(timeout: number | undefined = this.timeout) {
if (this.idleTimer) {
clearTimeout(this.idleTimer);
}
if (timeout) {
this.idleTimer = setTimeout(() => process.exit(0), timeout).unref();
}
}
}

@@ -1,11 +0,12 @@

import * as child_process from 'child_process';
import { ExecutionState, ExecutionResult } from '@travetto/exec';
import { ChildProcess } from 'child_process';
import { ExecutionState, ExecUtil } from '@travetto/boot';
import { CommEvent } from './types';
import { ProcessCommChannel } from './channel';
import { CommUtil } from './util';
export class ParentCommChannel<U extends CommEvent = CommEvent> extends ProcessCommChannel<child_process.ChildProcess, U> {
/**
* Parent channel
*/
export class ParentCommChannel<U = any> extends ProcessCommChannel<ChildProcess, U> {
private complete: Promise<ExecutionResult>;
private complete: ExecutionState['result'];

@@ -18,5 +19,8 @@ constructor(state: ExecutionState) {

/**
* Kill self and child
*/
async destroy() {
if (this.proc) {
CommUtil.killSpawnedProcess(this.proc);
ExecUtil.kill(this.proc);
await this.complete;

@@ -23,0 +27,0 @@ }

@@ -1,25 +0,9 @@

import { ExecutionOptions } from '@travetto/exec';
export interface ChildOptions extends ExecutionOptions {
cwd?: string;
env?: any;
stdio?: any;
uid?: number;
gid?: number;
}
export interface CommEvent {
type?: string;
[key: string]: any;
}
/**
* Process status
*/
export type Status = 'init' | 'release' | 'destroy';
/**
* Listen for changes in status
*/
export type StatusChangeHandler = (status: Status) => any;
export interface SpawnConfig {
command: string;
args?: string[];
fork?: boolean;
opts?: ChildOptions;
}

@@ -0,4 +1,13 @@

/**
* Definition for an input source
*/
export interface InputSource<X> {
/**
* Determines if there is more work to do
*/
hasNext(): boolean | Promise<boolean>;
/**
* Get next item
*/
next(): X | Promise<X>;
}
import * as os from 'os';
import * as gp from 'generic-pool';
import { Shutdown } from '@travetto/base';
import { ShutdownManager } from '@travetto/base';
import { InputSource } from './input/types';
/**
* Worker defintion
*/
export interface Worker<X> {

@@ -17,2 +20,5 @@ active: boolean;

/**
* Work pool support
*/
export class WorkPool<X, T extends Worker<X>> {

@@ -22,6 +28,25 @@

/**
* Generic-pool pool
*/
private pool: gp.Pool<T>;
/**
* Number of acquistions in process
*/
private pendingAcquires = 0;
/**
* List of errors during processing
*/
private errors: Error[] = [];
/**
* Error count during creation
*/
private createErrors = 0;
/**
*
* @param getWorker Produces a new worker for the pool
* @param opts Pool options
*/
constructor(getWorker: () => Promise<T> | T, opts?: gp.Options) {

@@ -32,45 +57,53 @@ const args = {

evictionRunIntervalMillis: 5000,
...(opts || {}),
...(opts ?? {}),
};
let createErrors = 0;
// Create the pool
this.pool = gp.createPool({
create: () => this.createAndTrack(getWorker, args),
destroy: x => this.destroy(x),
validate: async (x: T) => x.active
}, args);
// tslint:disable-next-line: no-this-assignment
const self = this;
ShutdownManager.onShutdown(`worker.pool.${this.constructor.name}`, () => this.shutdown());
}
this.pool = gp.createPool({
async create() {
try {
self.pendingAcquires += 1;
const res = await getWorker();
if (res.init) {
await res.init();
}
/**
* Creates and tracks new worker
*/
async createAndTrack(getWorker: () => Promise<T> | T, opts: gp.Options) {
try {
this.pendingAcquires += 1;
const res = await getWorker();
createErrors = 0; // Reset errors on success
if (res.init) {
await res.init();
}
return res;
} catch (e) {
if (createErrors++ > args.max) { // If error count is bigger than pool size, we broke
console.error(e);
process.exit(1);
}
throw e;
} finally {
self.pendingAcquires -= 1;
}
},
async destroy(x: T) {
console.trace(`[${process.pid}] Destroying ${x.id}`);
return x.destroy();
},
async validate(x: T) {
return x.active;
this.createErrors = 0; // Reset errors on success
return res;
} catch (e) {
if (this.createErrors++ > opts.max!) { // If error count is bigger than pool size, we broke
console.error(e);
process.exit(1);
}
}, args);
throw e;
} finally {
this.pendingAcquires -= 1;
}
}
Shutdown.onShutdown(`worker.pool.${this.constructor.name}`, () => this.shutdown());
/**
* Destroy the worker
*/
async destroy(worker: T) {
console.trace(`[${process.pid}] Destroying ${worker.id}`);
return worker.destroy();
}
/**
* Free worker on completion
*/
async release(worker: T) {

@@ -92,2 +125,5 @@ console.trace(`[${process.pid}] Releasing ${worker.id}`);

/**
* Process a given input source
*/
async process(src: InputSource<X>) {

@@ -116,2 +152,5 @@ const pending = new Set<Promise<any>>();

/**
* Shutdown pool
*/
async shutdown() {

@@ -118,0 +157,0 @@ while (this.pendingAcquires) {

@@ -1,16 +0,26 @@

import { SpawnConfig } from './comm/types';
import { ExecUtil, ExecutionOptions, } from '@travetto/boot';
import { ParentCommChannel } from './comm/parent';
import { CommUtil } from './comm/util';
import { Worker } from './pool';
/**
* Spawned worker
*/
export class WorkUtil {
/**
* Create a process channel worker from a given spawn config
*/
static spawnedWorker<X>(
config: SpawnConfig & {
init?: (channel: ParentCommChannel) => Promise<any>,
execute: (channel: ParentCommChannel, input: X) => Promise<any>,
destroy?: (channel: ParentCommChannel) => Promise<any>,
command: string,
{ args, opts, handlers }: {
args?: string[];
opts?: ExecutionOptions;
handlers: {
init?: (channel: ParentCommChannel) => Promise<any>;
execute: (channel: ParentCommChannel, input: X) => Promise<any>;
destroy?: (channel: ParentCommChannel) => Promise<any>;
};
}
): Worker<X> {
const channel = new ParentCommChannel(
CommUtil.spawnProcess(config)
ExecUtil.fork(command, args, opts)
);

@@ -20,7 +30,7 @@ return {

get active() { return channel.active; },
init: config.init ? config.init.bind(config, channel) : undefined,
execute: config.execute.bind(config, channel),
init: handlers.init ? handlers.init.bind(handlers, channel) : undefined,
execute: handlers.execute.bind(handlers, channel),
async destroy() {
if (config.destroy) {
await config.destroy(channel);
if (handlers.destroy) {
await handlers.destroy(channel);
}

@@ -27,0 +37,0 @@ await channel.destroy();

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc