poolifier
Advanced tools
Comparing version 2.2.2 to 2.3.0
@@ -33,3 +33,3 @@ /// <reference types="node" /> | ||
* | ||
* @default 60.000 ms | ||
* @default 60000 ms | ||
*/ | ||
@@ -82,2 +82,6 @@ maxInactiveTime?: number; | ||
/** | ||
* Task runtime. | ||
*/ | ||
readonly taskRunTime?: number; | ||
/** | ||
* Reference to main worker. | ||
@@ -95,3 +99,3 @@ * | ||
*/ | ||
interface PromiseWorkerResponseWrapper<Worker extends AbstractPoolWorker, Response = unknown> { | ||
interface PromiseWorkerResponseWrapper<Worker extends IPoolWorker, Response = unknown> { | ||
/** | ||
@@ -127,10 +131,6 @@ * Resolve callback to fulfill the promise. | ||
/** | ||
* Basic interface that describes the minimum required implementation of listener events for a pool worker. | ||
* Interface that describes the minimum required implementation of listener events for a pool worker. | ||
*/ | ||
interface IPoolWorker { | ||
/** | ||
* Worker identifier. | ||
*/ | ||
readonly id?: number; | ||
/** | ||
* Register a listener to the message event. | ||
@@ -172,17 +172,2 @@ * | ||
/** | ||
* Basic class that implement the minimum required for a pool worker. | ||
*/ | ||
declare abstract class AbstractPoolWorker implements IPoolWorker { | ||
/** @inheritDoc */ | ||
abstract on(event: "message", handler: MessageHandler<this>): void; | ||
/** @inheritDoc */ | ||
abstract on(event: "error", handler: ErrorHandler<this>): void; | ||
/** @inheritDoc */ | ||
abstract on(event: "online", handler: OnlineHandler<this>): void; | ||
/** @inheritDoc */ | ||
abstract on(event: "exit", handler: ExitHandler<this>): void; | ||
/** @inheritDoc */ | ||
abstract once(event: "exit", handler: ExitHandler<this>): void; | ||
} | ||
/** | ||
* Enumeration of worker choice strategies. | ||
@@ -193,2 +178,4 @@ */ | ||
readonly LESS_RECENTLY_USED: "LESS_RECENTLY_USED"; | ||
readonly FAIR_SHARE: "FAIR_SHARE"; | ||
readonly WEIGHTED_ROUND_ROBIN: "WEIGHTED_ROUND_ROBIN"; | ||
}>; | ||
@@ -200,2 +187,31 @@ /** | ||
/** | ||
* Pool tasks usage statistics requirements. | ||
*/ | ||
type RequiredStatistics = { | ||
runTime: boolean; | ||
}; | ||
/** | ||
* Worker choice strategy interface. | ||
* | ||
* @template Worker Type of worker which manages the strategy. | ||
*/ | ||
interface IWorkerChoiceStrategy<Worker extends IPoolWorker> { | ||
/** | ||
* Is the pool attached to the strategy dynamic?. | ||
*/ | ||
readonly isDynamicPool: boolean; | ||
/** | ||
* Required pool tasks usage statistics. | ||
*/ | ||
readonly requiredStatistics: RequiredStatistics; | ||
/** | ||
* Resets strategy internals (counters, statistics, etc.). | ||
*/ | ||
reset(): boolean; | ||
/** | ||
* Chooses a worker in the pool. | ||
*/ | ||
choose(): Worker; | ||
} | ||
/** | ||
* Options for a poolifier pool. | ||
@@ -221,3 +237,3 @@ */ | ||
/** | ||
* The work choice strategy to use in this pool. | ||
* The worker choice strategy to use in this pool. | ||
*/ | ||
@@ -240,3 +256,3 @@ workerChoiceStrategy?: WorkerChoiceStrategy; | ||
/** | ||
* Perform the task specified in the constructor with the data parameter. | ||
* Performs the task specified in the constructor with the data parameter. | ||
* | ||
@@ -248,7 +264,7 @@ * @param data The input for the specified task. This can only be serializable data. | ||
/** | ||
* Shut down every current worker in this pool. | ||
* Shutdowns every current worker in this pool. | ||
*/ | ||
destroy(): Promise<void>; | ||
/** | ||
* Set the worker choice strategy in this pool. | ||
* Sets the worker choice strategy in this pool. | ||
* | ||
@@ -267,2 +283,11 @@ * @param workerChoiceStrategy The worker choice strategy. | ||
/** | ||
* Tasks usage statistics. | ||
*/ | ||
interface TasksUsage { | ||
run: number; | ||
running: number; | ||
runTime: number; | ||
avgRunTime: number; | ||
} | ||
/** | ||
* Internal poolifier pool emitter. | ||
@@ -279,3 +304,3 @@ */ | ||
*/ | ||
interface IPoolInternal<Worker extends AbstractPoolWorker, Data = unknown, Response = unknown> extends IPool<Data, Response> { | ||
interface IPoolInternal<Worker extends IPoolWorker, Data = unknown, Response = unknown> extends IPool<Data, Response> { | ||
/** | ||
@@ -286,8 +311,8 @@ * List of currently available workers. | ||
/** | ||
* The tasks map. | ||
* The workers tasks usage map. | ||
* | ||
* - `key`: The `Worker` | ||
* - `value`: Number of tasks currently in progress on the worker. | ||
* `key`: The `Worker` | ||
* `value`: Worker tasks usage statistics. | ||
*/ | ||
readonly tasks: Map<Worker, number>; | ||
readonly workersTasksUsage: Map<Worker, TasksUsage>; | ||
/** | ||
@@ -345,2 +370,9 @@ * Emitter on which events can be listened to. | ||
getWorkerRunningTasks(worker: Worker): number | undefined; | ||
/** | ||
* Get worker average tasks runtime. | ||
* | ||
* @param worker The worker. | ||
* @returns The average tasks runtime on the worker. | ||
*/ | ||
getWorkerAverageTasksRunTime(worker: Worker): number | undefined; | ||
} | ||
@@ -354,3 +386,3 @@ /** | ||
*/ | ||
declare class WorkerChoiceStrategyContext<Worker extends AbstractPoolWorker, Data, Response> { | ||
declare class WorkerChoiceStrategyContext<Worker extends IPoolWorker, Data, Response> { | ||
private readonly pool; | ||
@@ -368,3 +400,3 @@ private createDynamicallyWorkerCallback; | ||
/** | ||
* Get the worker choice strategy instance specific to the pool type. | ||
* Gets the worker choice strategy instance specific to the pool type. | ||
* | ||
@@ -376,4 +408,10 @@ * @param workerChoiceStrategy The worker choice strategy. | ||
/** | ||
* Set the worker choice strategy to use in the context. | ||
* Gets the worker choice strategy used in the context. | ||
* | ||
* @returns The worker choice strategy. | ||
*/ | ||
getWorkerChoiceStrategy(): IWorkerChoiceStrategy<Worker>; | ||
/** | ||
* Sets the worker choice strategy to use in the context. | ||
* | ||
* @param workerChoiceStrategy The worker choice strategy to set. | ||
@@ -383,3 +421,3 @@ */ | ||
/** | ||
* Choose a worker with the underlying selection strategy. | ||
* Chooses a worker with the underlying selection strategy. | ||
* | ||
@@ -391,3 +429,3 @@ * @returns The chosen one. | ||
/** | ||
* Base class containing some shared logic for all poolifier pools. | ||
* Base class that implements some shared logic for all poolifier pools. | ||
* | ||
@@ -398,3 +436,3 @@ * @template Worker Type of worker which manages this pool. | ||
*/ | ||
declare abstract class AbstractPool<Worker extends AbstractPoolWorker, Data = unknown, Response = unknown> implements IPoolInternal<Worker, Data, Response> { | ||
declare abstract class AbstractPool<Worker extends IPoolWorker, Data = unknown, Response = unknown> implements IPoolInternal<Worker, Data, Response> { | ||
readonly numberOfWorkers: number; | ||
@@ -406,3 +444,3 @@ readonly filePath: string; | ||
/** @inheritDoc */ | ||
readonly tasks: Map<Worker, number>; | ||
readonly workersTasksUsage: Map<Worker, TasksUsage>; | ||
/** @inheritDoc */ | ||
@@ -447,5 +485,7 @@ readonly emitter?: PoolEmitter; | ||
/** @inheritDoc */ | ||
getWorkerIndex(worker: Worker): number; | ||
/** @inheritDoc */ | ||
getWorkerRunningTasks(worker: Worker): number | undefined; | ||
/** @inheritDoc */ | ||
getWorkerIndex(worker: Worker): number; | ||
getWorkerAverageTasksRunTime(worker: Worker): number | undefined; | ||
/** @inheritDoc */ | ||
@@ -463,3 +503,3 @@ setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy): void; | ||
/** | ||
* Shut down given worker. | ||
* Shutdowns given worker. | ||
* | ||
@@ -479,21 +519,17 @@ * @param worker A worker within `workers`. | ||
/** | ||
* Increase the number of tasks that the given worker has applied. | ||
* Hook executed before the worker task promise resolution. | ||
* Can be overridden. | ||
* | ||
* @param worker Worker whose tasks are increased. | ||
* @param worker The worker. | ||
*/ | ||
protected increaseWorkersTask(worker: Worker): void; | ||
protected beforePromiseWorkerResponseHook(worker: Worker): void; | ||
/** | ||
* Decrease the number of tasks that the given worker has applied. | ||
* Hook executed after the worker task promise resolution. | ||
* Can be overridden. | ||
* | ||
* @param worker Worker whose tasks are decreased. | ||
* @param message The received message. | ||
* @param promise The Promise response. | ||
*/ | ||
protected decreaseWorkersTasks(worker: Worker): void; | ||
protected afterPromiseWorkerResponseHook(message: MessageValue<Response>, promise: PromiseWorkerResponseWrapper<Worker, Response>): void; | ||
/** | ||
* Step the number of tasks that the given worker has applied. | ||
* | ||
* @param worker Worker whose tasks are set. | ||
* @param step Worker number of tasks step. | ||
*/ | ||
private stepWorkerNumberOfTasks; | ||
/** | ||
* Removes the given worker from the pool. | ||
@@ -505,3 +541,3 @@ * | ||
/** | ||
* Choose a worker for the next task. | ||
* Chooses a worker for the next task. | ||
* | ||
@@ -514,3 +550,3 @@ * The default implementation uses a round robin algorithm to distribute the load. | ||
/** | ||
* Send a message to the given worker. | ||
* Sends a message to the given worker. | ||
* | ||
@@ -522,3 +558,3 @@ * @param worker The worker which should receive the message. | ||
/** | ||
* Register a listener callback on a given worker. | ||
* Registers a listener callback on a given worker. | ||
* | ||
@@ -555,2 +591,53 @@ * @param worker A worker. | ||
private checkAndEmitBusy; | ||
/** | ||
* Increases the number of tasks that the given worker has applied. | ||
* | ||
* @param worker Worker which running tasks is increased. | ||
*/ | ||
private increaseWorkerRunningTasks; | ||
/** | ||
* Decreases the number of tasks that the given worker has applied. | ||
* | ||
* @param worker Worker which running tasks is decreased. | ||
*/ | ||
private decreaseWorkerRunningTasks; | ||
/** | ||
* Steps the number of tasks that the given worker has applied. | ||
* | ||
* @param worker Worker which running tasks are stepped. | ||
* @param step Number of running tasks step. | ||
*/ | ||
private stepWorkerRunningTasks; | ||
/** | ||
* Steps the number of tasks that the given worker has run. | ||
* | ||
* @param worker Worker which has run tasks. | ||
* @param step Number of run tasks step. | ||
*/ | ||
private stepWorkerRunTasks; | ||
/** | ||
* Updates tasks runtime for the given worker. | ||
* | ||
* @param worker Worker which run the task. | ||
* @param taskRunTime Worker task runtime. | ||
*/ | ||
private updateWorkerTasksRunTime; | ||
/** | ||
* Initializes tasks usage statistics. | ||
* | ||
* @param worker The worker. | ||
*/ | ||
initWorkerTasksUsage(worker: Worker): void; | ||
/** | ||
* Removes worker tasks usage statistics. | ||
* | ||
* @param worker The worker. | ||
*/ | ||
private removeWorkerTasksUsage; | ||
/** | ||
* Resets worker tasks usage statistics. | ||
* | ||
* @param worker The worker. | ||
*/ | ||
private resetWorkerTasksUsage; | ||
} | ||
@@ -707,3 +794,3 @@ /** | ||
/** | ||
* Base class containing some shared logic for all poolifier workers. | ||
* Base class that implements some shared logic for all poolifier workers. | ||
* | ||
@@ -837,2 +924,2 @@ * @template MainWorker Type of main worker. | ||
export { DynamicClusterPool, FixedClusterPool, WorkerChoiceStrategies, DynamicThreadPool, FixedThreadPool, AbstractWorker, ClusterWorker$0 as ClusterWorker, ThreadWorker, KillBehaviors }; | ||
export type { ClusterPoolOptions, IPool, PoolOptions, ErrorHandler, ExitHandler, IPoolWorker, OnlineHandler, WorkerChoiceStrategy, ThreadWorkerWithMessageChannel, KillBehavior, WorkerOptions }; | ||
export type { ClusterPoolOptions, IPool, PoolOptions, ErrorHandler, ExitHandler, IPoolWorker, MessageHandler, OnlineHandler, WorkerChoiceStrategy, ThreadWorkerWithMessageChannel, KillBehavior, WorkerOptions }; |
@@ -1,1 +0,1 @@ | ||
"use strict";Object.defineProperty(exports,"__esModule",{value:!0});var e=require("events"),r=require("cluster"),t=require("worker_threads"),s=require("async_hooks");function o(e){return e&&"object"==typeof e&&"default"in e?e:{default:e}}var i,n=o(e),a=o(r);!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(i||(i={}));class h extends n.default{}const c=()=>{},l=Object.freeze({SOFT:"SOFT",HARD:"HARD"});const k=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_RECENTLY_USED:"LESS_RECENTLY_USED"});class u{constructor(e){this.pool=e,this.isDynamicPool=this.pool.type===i.DYNAMIC}}class p extends u{choose(){let e,r=1/0;for(const t of this.pool.workers){const s=this.pool.getWorkerRunningTasks(t);if(!1===this.isDynamicPool&&0===s)return t;s<r&&(e=t,r=s)}return e}}class d extends u{constructor(){super(...arguments),this.nextWorkerIndex=0}choose(){const e=this.pool.workers[this.nextWorkerIndex];return this.nextWorkerIndex=this.nextWorkerIndex===this.pool.workers.length-1?0:this.nextWorkerIndex+1,e}}class W{static getWorkerChoiceStrategy(e,r=k.ROUND_ROBIN){switch(r){case k.ROUND_ROBIN:return new d(e);case k.LESS_RECENTLY_USED:return new p(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}}class g extends u{constructor(e,r,t=k.ROUND_ROBIN){super(e),this.createDynamicallyWorkerCallback=r,this.workerChoiceStrategy=W.getWorkerChoiceStrategy(this.pool,t)}choose(){const e=this.pool.findFreeWorker();return e||(this.pool.busy?this.workerChoiceStrategy.choose():this.createDynamicallyWorkerCallback())}}class m{constructor(e,r,t=k.ROUND_ROBIN){this.pool=e,this.createDynamicallyWorkerCallback=r,this.setWorkerChoiceStrategy(t)}getPoolWorkerChoiceStrategy(e=k.ROUND_ROBIN){return this.pool.type===i.DYNAMIC?new g(this.pool,this.createDynamicallyWorkerCallback,e):W.getWorkerChoiceStrategy(this.pool,e)}setWorkerChoiceStrategy(e){this.workerChoiceStrategy=this.getPoolWorkerChoiceStrategy(e)}execute(){return this.workerChoiceStrategy.choose()}}class y{constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,this.workers=[],this.tasks=new Map,this.promiseMap=new Map,this.nextMessageId=0,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();this.opts.enableEvents&&(this.emitter=new h),this.workerChoiceStrategyContext=new m(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{var t;t=l.HARD,(r.kill===t||0===this.getWorkerRunningTasks(e))&&this.destroyWorker(e)})),e}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(!e)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(e){if(null==e)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!Number.isSafeInteger(e))throw new Error("Cannot instantiate a pool with a non integer number of workers");if(e<0)throw new Error("Cannot instantiate a pool with a negative number of workers");if(this.type===i.FIXED&&0===e)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){var r,t;this.opts.workerChoiceStrategy=null!==(r=e.workerChoiceStrategy)&&void 0!==r?r:k.ROUND_ROBIN,this.opts.enableEvents=null===(t=e.enableEvents)||void 0===t||t}get numberOfRunningTasks(){return this.promiseMap.size}getWorkerRunningTasks(e){return this.tasks.get(e)}getWorkerIndex(e){return this.workers.indexOf(e)}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e,this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalGetBusyStatus(){return this.numberOfRunningTasks>=this.numberOfWorkers&&!1===this.findFreeWorker()}findFreeWorker(){for(const e of this.workers)if(0===this.getWorkerRunningTasks(e))return e;return!1}execute(e){const r=this.chooseWorker(),t=++this.nextMessageId,s=this.internalExecute(r,t);return this.checkAndEmitBusy(),e=null!=e?e:{},this.sendToWorker(r,{data:e,id:t}),s}async destroy(){await Promise.all(this.workers.map((e=>this.destroyWorker(e))))}setupHook(){}increaseWorkersTask(e){this.stepWorkerNumberOfTasks(e,1)}decreaseWorkersTasks(e){this.stepWorkerNumberOfTasks(e,-1)}stepWorkerNumberOfTasks(e,r){const t=this.tasks.get(e);if(void 0===t)throw Error("Worker could not be found in tasks map");this.tasks.set(e,t+r)}removeWorker(e){this.workers.splice(this.getWorkerIndex(e),1),this.tasks.delete(e)}chooseWorker(){return this.workerChoiceStrategyContext.execute()}internalExecute(e,r){return this.increaseWorkersTask(e),new Promise(((t,s)=>{this.promiseMap.set(r,{resolve:t,reject:s,worker:e})}))}createAndSetupWorker(){var e,r,t,s;const o=this.createWorker();return o.on("message",null!==(e=this.opts.messageHandler)&&void 0!==e?e:c),o.on("error",null!==(r=this.opts.errorHandler)&&void 0!==r?r:c),o.on("online",null!==(t=this.opts.onlineHandler)&&void 0!==t?t:c),o.on("exit",null!==(s=this.opts.exitHandler)&&void 0!==s?s:c),o.once("exit",(()=>this.removeWorker(o))),this.workers.push(o),this.tasks.set(o,0),this.afterWorkerSetup(o),o}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseMap.get(e.id);void 0!==r&&(this.decreaseWorkersTasks(r.worker),e.error?r.reject(e.error):r.resolve(e.data),this.promiseMap.delete(e.id))}}}checkAndEmitBusy(){var e;this.opts.enableEvents&&this.busy&&(null===(e=this.emitter)||void 0===e||e.emit("busy"))}}class w extends y{constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){a.default.setupPrimary({exec:this.filePath})}isMain(){return a.default.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return a.default.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return i.FIXED}get busy(){return this.internalGetBusyStatus()}}class f extends y{constructor(e,r,t={}){super(e,r,t)}isMain(){return t.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){var t;null===(t=e.port2)||void 0===t||t.on("message",r)}createWorker(){return new t.Worker(this.filePath,{env:t.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:s}=new t.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=s,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return i.FIXED}get busy(){return this.internalGetBusyStatus()}}const v=l.SOFT;class x extends s.AsyncResource{constructor(e,r,t,s,o={killBehavior:v,maxInactiveTime:6e4}){var i,n;super(e),this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),this.lastTaskTimestamp=Date.now(),r||(this.aliveInterval=setInterval(this.checkAlive.bind(this),(null!==(i=this.opts.maxInactiveTime)&&void 0!==i?i:6e4)/2),this.checkAlive.bind(this)()),null===(n=this.mainWorker)||void 0===n||n.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){var r,t;this.opts.killBehavior=null!==(r=e.killBehavior)&&void 0!==r?r:v,this.opts.maxInactiveTime=null!==(t=e.maxInactiveTime)&&void 0!==t?t:6e4,this.opts.async=!!e.async}checkFunctionInput(e){if(!e)throw new Error("fn parameter is mandatory")}getMainWorker(){if(!this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){var e;Date.now()-this.lastTaskTimestamp>(null!==(e=this.opts.maxInactiveTime)&&void 0!==e?e:6e4)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=e(r.data);this.sendToMainWorker({data:t,id:r.id})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{this.lastTaskTimestamp=Date.now()}}runAsync(e,r){e(r.data).then((e=>(this.sendToMainWorker({data:e,id:r.id}),null))).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{this.lastTaskTimestamp=Date.now()})).catch(c)}}exports.AbstractWorker=x,exports.ClusterWorker=class extends x{constructor(e,r={}){super("worker-cluster-pool:poolifier",a.default.isPrimary,e,a.default.worker,r)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends w{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return i.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.DynamicThreadPool=class extends f{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return i.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.FixedClusterPool=w,exports.FixedThreadPool=f,exports.KillBehaviors=l,exports.ThreadWorker=class extends x{constructor(e,r={}){super("worker-thread-pool:poolifier",t.isMainThread,e,t.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=k; | ||
"use strict";var e,r=require("events"),t=require("cluster"),s=require("os"),o=require("worker_threads"),i=require("async_hooks");!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(e||(e={}));class n extends r{}const a=()=>{},h=Object.freeze({SOFT:"SOFT",HARD:"HARD"});const k=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_RECENTLY_USED:"LESS_RECENTLY_USED",FAIR_SHARE:"FAIR_SHARE",WEIGHTED_ROUND_ROBIN:"WEIGHTED_ROUND_ROBIN"});class u{constructor(r){this.pool=r,this.isDynamicPool=this.pool.type===e.DYNAMIC,this.requiredStatistics={runTime:!1}}}class c extends u{constructor(){super(...arguments),this.requiredStatistics={runTime:!0},this.workerLastVirtualTaskTimestamp=new Map}reset(){return this.workerLastVirtualTaskTimestamp.clear(),!0}choose(){this.computeWorkerLastVirtualTaskTimestamp();let e,r=1/0;for(const t of this.pool.workers){const s=this.workerLastVirtualTaskTimestamp.get(t)?.end??0;s<r&&(r=s,e=t)}return e}computeWorkerLastVirtualTaskTimestamp(){for(const e of this.pool.workers){const r=Math.max(Date.now(),this.workerLastVirtualTaskTimestamp.get(e)?.end??-1/0),t=r+(this.pool.getWorkerAverageTasksRunTime(e)??0);this.workerLastVirtualTaskTimestamp.set(e,{start:r,end:t})}}}class l extends u{reset(){return!0}choose(){let e,r=1/0;for(const t of this.pool.workers){const s=this.pool.getWorkerRunningTasks(t);if(!1===this.isDynamicPool&&0===s)return t;s<r&&(e=t,r=s)}return e}}class p extends u{constructor(){super(...arguments),this.nextWorkerIndex=0}reset(){return this.nextWorkerIndex=0,!0}choose(){const e=this.pool.workers[this.nextWorkerIndex];return this.nextWorkerIndex=this.nextWorkerIndex===this.pool.workers.length-1?0:this.nextWorkerIndex+1,e}}class W extends u{constructor(e){super(e),this.requiredStatistics={runTime:!0},this.previousWorkerIndex=0,this.currentWorkerIndex=0,this.workersTaskRunTime=new Map,this.defaultWorkerWeight=this.computeWorkerWeight(),this.initWorkersTaskRunTime()}reset(){return this.previousWorkerIndex=0,this.currentWorkerIndex=0,this.workersTaskRunTime.clear(),this.initWorkersTaskRunTime(),!0}choose(){const e=this.pool.workers[this.currentWorkerIndex];!0===this.isDynamicPool&&!1===this.workersTaskRunTime.has(e)&&this.initWorkerTaskRunTime(e);const r=this.getWorkerVirtualTaskRunTime(e)??0,t=this.workersTaskRunTime.get(e)?.weight??this.defaultWorkerWeight;if(this.currentWorkerIndex===this.previousWorkerIndex){const s=(this.workersTaskRunTime.get(e)?.runTime??0)+r;this.setWorkerTaskRunTime(e,t,s)}else this.setWorkerTaskRunTime(e,t,0);return r<t?this.previousWorkerIndex=this.currentWorkerIndex:(this.previousWorkerIndex=this.currentWorkerIndex,this.currentWorkerIndex=this.pool.workers.length-1===this.currentWorkerIndex?0:this.currentWorkerIndex+1),this.pool.workers[this.currentWorkerIndex]}initWorkersTaskRunTime(){for(const e of this.pool.workers)this.initWorkerTaskRunTime(e)}initWorkerTaskRunTime(e){this.setWorkerTaskRunTime(e,this.defaultWorkerWeight,0)}setWorkerTaskRunTime(e,r,t){this.workersTaskRunTime.set(e,{weight:r,runTime:t})}getWorkerVirtualTaskRunTime(e){return this.pool.getWorkerAverageTasksRunTime(e)}computeWorkerWeight(){let e=0;for(const r of s.cpus()){const t=r.speed.toString().length-1;e+=1/(r.speed/Math.pow(10,t))*Math.pow(10,t)}return Math.round(e/s.cpus().length)}}class g{static getWorkerChoiceStrategy(e,r=k.ROUND_ROBIN){switch(r){case k.ROUND_ROBIN:return new p(e);case k.LESS_RECENTLY_USED:return new l(e);case k.FAIR_SHARE:return new c(e);case k.WEIGHTED_ROUND_ROBIN:return new W(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}}class d extends u{constructor(e,r,t=k.ROUND_ROBIN){super(e),this.createDynamicallyWorkerCallback=r,this.workerChoiceStrategy=g.getWorkerChoiceStrategy(this.pool,t),this.requiredStatistics=this.workerChoiceStrategy.requiredStatistics}reset(){return this.workerChoiceStrategy.reset()}choose(){const e=this.pool.findFreeWorker();return e||(!0===this.pool.busy?this.workerChoiceStrategy.choose():this.createDynamicallyWorkerCallback())}}class m{constructor(e,r,t=k.ROUND_ROBIN){this.pool=e,this.createDynamicallyWorkerCallback=r,this.setWorkerChoiceStrategy(t)}getPoolWorkerChoiceStrategy(r=k.ROUND_ROBIN){return this.pool.type===e.DYNAMIC?new d(this.pool,this.createDynamicallyWorkerCallback,r):g.getWorkerChoiceStrategy(this.pool,r)}getWorkerChoiceStrategy(){return this.workerChoiceStrategy}setWorkerChoiceStrategy(e){this.workerChoiceStrategy?.reset(),this.workerChoiceStrategy=this.getPoolWorkerChoiceStrategy(e)}execute(){return this.workerChoiceStrategy.choose()}}class T{constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,this.workers=[],this.workersTasksUsage=new Map,this.promiseMap=new Map,this.nextMessageId=0,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();this.opts.enableEvents&&(this.emitter=new n),this.workerChoiceStrategyContext=new m(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{var t;t=h.HARD,(r.kill===t||0===this.getWorkerRunningTasks(e))&&this.destroyWorker(e)})),e}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(!e)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(r){if(null==r)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!1===Number.isSafeInteger(r))throw new Error("Cannot instantiate a pool with a non integer number of workers");if(r<0)throw new Error("Cannot instantiate a pool with a negative number of workers");if(this.type===e.FIXED&&0===r)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){this.opts.workerChoiceStrategy=e.workerChoiceStrategy??k.ROUND_ROBIN,this.opts.enableEvents=e.enableEvents??!0}get numberOfRunningTasks(){return this.promiseMap.size}getWorkerIndex(e){return this.workers.indexOf(e)}getWorkerRunningTasks(e){return this.workersTasksUsage.get(e)?.running}getWorkerAverageTasksRunTime(e){return this.workersTasksUsage.get(e)?.avgRunTime}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e;for(const e of this.workers)this.resetWorkerTasksUsage(e);this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalGetBusyStatus(){return this.numberOfRunningTasks>=this.numberOfWorkers&&!1===this.findFreeWorker()}findFreeWorker(){for(const e of this.workers)if(0===this.getWorkerRunningTasks(e))return e;return!1}execute(e){const r=this.chooseWorker(),t=++this.nextMessageId,s=this.internalExecute(r,t);return this.checkAndEmitBusy(),e=e??{},this.sendToWorker(r,{data:e,id:t}),s}async destroy(){await Promise.all(this.workers.map((e=>this.destroyWorker(e))))}setupHook(){}beforePromiseWorkerResponseHook(e){this.increaseWorkerRunningTasks(e)}afterPromiseWorkerResponseHook(e,r){this.decreaseWorkerRunningTasks(r.worker),this.stepWorkerRunTasks(r.worker,1),this.updateWorkerTasksRunTime(r.worker,e.taskRunTime)}removeWorker(e){this.workers.splice(this.getWorkerIndex(e),1),this.removeWorkerTasksUsage(e)}chooseWorker(){return this.workerChoiceStrategyContext.execute()}internalExecute(e,r){return this.beforePromiseWorkerResponseHook(e),new Promise(((t,s)=>{this.promiseMap.set(r,{resolve:t,reject:s,worker:e})}))}createAndSetupWorker(){const e=this.createWorker();return e.on("message",this.opts.messageHandler??a),e.on("error",this.opts.errorHandler??a),e.on("online",this.opts.onlineHandler??a),e.on("exit",this.opts.exitHandler??a),e.once("exit",(()=>this.removeWorker(e))),this.workers.push(e),this.initWorkerTasksUsage(e),this.afterWorkerSetup(e),e}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseMap.get(e.id);void 0!==r&&(this.afterPromiseWorkerResponseHook(e,r),e.error?r.reject(e.error):r.resolve(e.data),this.promiseMap.delete(e.id))}}}checkAndEmitBusy(){this.opts.enableEvents&&this.busy&&this.emitter?.emit("busy")}increaseWorkerRunningTasks(e){this.stepWorkerRunningTasks(e,1)}decreaseWorkerRunningTasks(e){this.stepWorkerRunningTasks(e,-1)}stepWorkerRunningTasks(e,r){const t=this.workersTasksUsage.get(e);if(void 0===t)throw new Error("Worker could not be found in worker tasks usage map");t.running=t.running+r,this.workersTasksUsage.set(e,t)}stepWorkerRunTasks(e,r){const t=this.workersTasksUsage.get(e);if(void 0===t)throw new Error("Worker could not be found in worker tasks usage map");t.run=t.run+r,this.workersTasksUsage.set(e,t)}updateWorkerTasksRunTime(e,r){if(!0===this.workerChoiceStrategyContext.getWorkerChoiceStrategy().requiredStatistics.runTime){const t=this.workersTasksUsage.get(e);if(void 0===t)throw new Error("Worker could not be found in worker tasks usage map");t.runTime+=r??0,0!==t.run&&(t.avgRunTime=t.runTime/t.run),this.workersTasksUsage.set(e,t)}}initWorkerTasksUsage(e){this.workersTasksUsage.set(e,{run:0,running:0,runTime:0,avgRunTime:0})}removeWorkerTasksUsage(e){this.workersTasksUsage.delete(e)}resetWorkerTasksUsage(e){this.removeWorkerTasksUsage(e),this.initWorkerTasksUsage(e)}}class w extends T{constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){t.setupPrimary({exec:this.filePath})}isMain(){return t.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return t.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get busy(){return this.internalGetBusyStatus()}}class y extends T{constructor(e,r,t={}){super(e,r,t)}isMain(){return o.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){e.port2?.on("message",r)}createWorker(){return new o.Worker(this.filePath,{env:o.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:t}=new o.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=t,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get busy(){return this.internalGetBusyStatus()}}const R=h.SOFT;class f extends i.AsyncResource{constructor(e,r,t,s,o={killBehavior:R,maxInactiveTime:6e4}){super(e),this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),this.lastTaskTimestamp=Date.now(),!1===r&&(this.aliveInterval=setInterval(this.checkAlive.bind(this),(this.opts.maxInactiveTime??6e4)/2),this.checkAlive.bind(this)()),this.mainWorker?.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){this.opts.killBehavior=e.killBehavior??R,this.opts.maxInactiveTime=e.maxInactiveTime??6e4,this.opts.async=!!e.async}checkFunctionInput(e){if(!e)throw new Error("fn parameter is mandatory")}getMainWorker(){if(!this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){Date.now()-this.lastTaskTimestamp>(this.opts.maxInactiveTime??6e4)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=Date.now(),s=e(r.data),o=Date.now()-t;this.sendToMainWorker({data:s,id:r.id,taskRunTime:o})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{this.lastTaskTimestamp=Date.now()}}runAsync(e,r){const t=Date.now();e(r.data).then((e=>{const s=Date.now()-t;return this.sendToMainWorker({data:e,id:r.id,taskRunTime:s}),null})).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{this.lastTaskTimestamp=Date.now()})).catch(a)}}exports.AbstractWorker=f,exports.ClusterWorker=class extends f{constructor(e,r={}){super("worker-cluster-pool:poolifier",t.isPrimary,e,t.worker,r)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends w{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.DynamicThreadPool=class extends y{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.FixedClusterPool=w,exports.FixedThreadPool=y,exports.KillBehaviors=h,exports.ThreadWorker=class extends f{constructor(e,r={}){super("worker-thread-pool:poolifier",o.isMainThread,e,o.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=k; |
{ | ||
"name": "poolifier", | ||
"version": "2.2.2", | ||
"version": "2.3.0", | ||
"description": "A fast, easy to use Node.js Worker Thread Pool and Cluster Pool implementation", | ||
"main": "lib/index.js", | ||
"scripts": { | ||
"prepare": "node prepare.js", | ||
"build": "rollup --config --environment BUILD:development", | ||
@@ -20,2 +21,3 @@ "build:typedoc": "rollup --config --environment BUILD:development --environment DOCUMENTATION", | ||
"lint:fix": "eslint . --cache --fix", | ||
"lint:report": "eslint . --cache --format json --output-file reports/eslint.json", | ||
"typedoc": "typedoc", | ||
@@ -66,5 +68,5 @@ "sonar:properties": "./updateSonarProps.sh", | ||
"devDependencies": { | ||
"@types/node": "^18.8.3", | ||
"@typescript-eslint/eslint-plugin": "^5.39.0", | ||
"@typescript-eslint/parser": "^5.39.0", | ||
"@types/node": "^18.8.4", | ||
"@typescript-eslint/eslint-plugin": "^5.40.0", | ||
"@typescript-eslint/parser": "^5.40.0", | ||
"benchmark": "^2.1.4", | ||
@@ -80,5 +82,7 @@ "eslint": "^8.25.0", | ||
"eslint-plugin-prettierx": "^0.18.0", | ||
"eslint-plugin-promise": "^6.0.1", | ||
"eslint-plugin-promise": "^6.1.0", | ||
"eslint-plugin-spellcheck": "^0.0.19", | ||
"expect": "^29.1.2", | ||
"husky": "^8.0.1", | ||
"lint-staged": "^13.0.3", | ||
"microtime": "^3.1.1", | ||
@@ -91,3 +95,3 @@ "mocha": "^10.0.0", | ||
"prettierx": "^0.18.3", | ||
"rollup": "^2.79.1", | ||
"rollup": "^3.1.0", | ||
"rollup-plugin-analyzer": "^4.0.0", | ||
@@ -101,3 +105,3 @@ "rollup-plugin-command": "^1.1.3", | ||
"source-map-support": "^0.5.21", | ||
"typedoc": "^0.23.15", | ||
"typedoc": "^0.23.16", | ||
"typescript": "^4.8.4" | ||
@@ -104,0 +108,0 @@ }, |
@@ -159,10 +159,14 @@ <div align="center"> | ||
- `messageHandler` (optional) - A function that will listen for message event on each worker | ||
- `errorHandler` (optional) - A function that will listen for error event on each worker | ||
- `onlineHandler` (optional) - A function that will listen for online event on each worker | ||
- `exitHandler` (optional) - A function that will listen for exit event on each worker | ||
- `workerChoiceStrategy` (optional) - The work choice strategy to use in this pool: | ||
- `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool: | ||
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in this pool in a round robbin fashion | ||
- `WorkerChoiceStrategies.LESS_RECENTLY_USED`: Submit tasks to the less recently used worker in the pool | ||
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robbin fashion | ||
- `WorkerChoiceStrategies.LESS_RECENTLY_USED`: Submit tasks to the less recently used worker | ||
- `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN` Submit tasks to worker using a weighted round robin scheduling algorithm based on tasks execution time | ||
- `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker using a fair share tasks scheduling algorithm based on tasks execution time | ||
`WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN` and `WorkerChoiceStrategies.FAIR_SHARE` strategies are targeted to heavy and long tasks | ||
Default: `WorkerChoiceStrategies.ROUND_ROBIN` | ||
@@ -198,5 +202,5 @@ | ||
If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed. | ||
Default: 60.000 ms | ||
Default: 60000 ms | ||
- `async` - true/false, true if your function contains async pieces else false | ||
- `async` - true/false, true if your function contains async code pieces, else false | ||
- `killBehavior` - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it. | ||
@@ -249,3 +253,3 @@ **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **won't** be deleted. | ||
See guidelines [CONTRIBUTING](CONTRIBUTING.md) | ||
Choose your task here [2.0.0](https://github.com/poolifier/poolifier/projects/1), propose an idea, a fix, an improvement. | ||
Choose your task here [2.3.0](https://github.com/orgs/poolifier/projects/1), propose an idea, a fix, an improvement. | ||
@@ -252,0 +256,0 @@ ## Team |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
64852
944
269
36
6