@travetto/exec
Advanced tools
Comparing version 0.0.2 to 0.0.3
@@ -18,3 +18,3 @@ { | ||
}, | ||
"version": "0.0.2" | ||
"version": "0.0.3" | ||
} |
import * as child_process from 'child_process'; | ||
import * as exec from './util';; | ||
import * as exec from './util'; | ||
import { CommonProcess } from './types'; | ||
import { Worker } from './worker'; | ||
import { Executor } from './executor'; | ||
export class ChildWorker<U = any> extends Worker<U, child_process.ChildProcess> { | ||
export class ChildExecutor<U = any> extends Executor<U, child_process.ChildProcess> { | ||
constructor(public command: string, fork = false) { | ||
@@ -13,3 +13,3 @@ super(new Promise((resolve) => { | ||
...process.env, | ||
WORKER: true | ||
EXECUTOR: true | ||
}, | ||
@@ -16,0 +16,0 @@ quiet: true, |
@@ -1,4 +0,4 @@ | ||
import { Worker } from './worker'; | ||
import { Executor } from './executor'; | ||
export class LocalWorker<U = any> extends Worker<U, NodeJS.Process> { | ||
export class LocalExecutor<U = any> extends Executor<U, NodeJS.Process> { | ||
constructor() { | ||
@@ -5,0 +5,0 @@ super(new Promise(resolve => resolve(process))); |
import * as os from 'os'; | ||
import { Worker } from './worker'; | ||
import { Executor } from './executor'; | ||
import { Shutdown } from '@travetto/base'; | ||
@@ -7,14 +7,14 @@ | ||
export class WorkerPool<T extends Worker<U> & { id?: number, completion?: Promise<any> }, U = any> { | ||
workerCount: number; | ||
private availableWorkers = new Set<T>(); | ||
private pendingWorkers = new Set<T>(); | ||
export class ExecPool<T extends Executor<U> & { id?: number, completion?: Promise<any> }, U = any> { | ||
executorCount: number; | ||
private availableExecutors = new Set<T>(); | ||
private pendingExecutors = new Set<T>(); | ||
private initialized: Promise<any>; | ||
constructor(count: number = 0) { | ||
this.workerCount = count || os.cpus().length - 1; | ||
this.executorCount = count || os.cpus().length - 1; | ||
} | ||
async init(create: () => Promise<T>) { | ||
while (this.availableSize < this.workerCount) { | ||
while (this.availableSize < this.executorCount) { | ||
const w = await create(); | ||
@@ -27,11 +27,11 @@ w.id = id++; | ||
get availableSize() { | ||
return this.availableWorkers.size; | ||
return this.availableExecutors.size; | ||
} | ||
async getNextWorker() { | ||
if (this.availableWorkers.size === 0) { | ||
async getNextexecutor() { | ||
if (this.availableExecutors.size === 0) { | ||
return undefined; | ||
} else { | ||
const agent = this.availableWorkers.values().next().value; | ||
this.availableWorkers.delete(agent); | ||
const agent = this.availableExecutors.values().next().value; | ||
this.availableExecutors.delete(agent); | ||
await agent.init(); | ||
@@ -42,9 +42,9 @@ return agent; | ||
returnWorker(worker: T) { | ||
this.pendingWorkers.delete(worker); | ||
this.availableWorkers.add(worker); | ||
worker.clean(); | ||
returnexecutor(executor: T) { | ||
this.pendingExecutors.delete(executor); | ||
this.availableExecutors.add(executor); | ||
executor.clean(); | ||
} | ||
async process<X>(inputs: X[], handler: { init: () => Promise<T>, exec: (inp: X, worker?: T) => Promise<any> }) { | ||
async process<X>(inputs: X[], handler: { init: () => Promise<T>, exec: (inp: X, executor?: T) => Promise<any> }) { | ||
await this.init(handler.init); | ||
@@ -55,29 +55,29 @@ | ||
while (position < inputs.length) { | ||
if (this.pendingWorkers.size < this.availableSize) { | ||
if (this.pendingExecutors.size < this.availableSize) { | ||
const next = position++; | ||
const worker = (await this.getNextWorker())!; | ||
const executor = (await this.getNextexecutor())!; | ||
worker.completion = handler.exec(inputs[next], worker).then(x => worker, e => worker); | ||
executor.completion = handler.exec(inputs[next], executor).then(x => executor, e => executor); | ||
this.pendingWorkers.add(worker); | ||
this.pendingExecutors.add(executor); | ||
} else { | ||
const worker = await Promise.race(Array.from(this.pendingWorkers).map(x => x.completion)); | ||
this.returnWorker(worker); | ||
const executor = await Promise.race(Array.from(this.pendingExecutors).map(x => x.completion)); | ||
this.returnexecutor(executor); | ||
} | ||
} | ||
await Promise.all(Array.from(this.pendingWorkers).map(x => x.completion)); | ||
await Promise.all(Array.from(this.pendingExecutors).map(x => x.completion)); | ||
} | ||
shutdown() { | ||
for (const worker of Array.from(this.pendingWorkers)) { | ||
this.returnWorker(worker); | ||
for (const executor of Array.from(this.pendingExecutors)) { | ||
this.returnexecutor(executor); | ||
} | ||
for (const worker of Array.from(this.availableWorkers)) { | ||
for (const executor of Array.from(this.availableExecutors)) { | ||
try { | ||
console.debug('Killing Process', worker.id) | ||
worker.kill(); | ||
console.debug('Killing Process', executor.id) | ||
executor.kill(); | ||
} catch (e) { | ||
console.error('Error', worker.id, e); | ||
console.error('Error', executor.id, e); | ||
} | ||
@@ -84,0 +84,0 @@ } |
9765