Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

poolifier

Package Overview
Dependencies
Maintainers
1
Versions
192
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

poolifier - npm Package Compare versions

Comparing version 2.2.2 to 2.3.0

203

lib/index.d.ts

@@ -33,3 +33,3 @@ /// <reference types="node" />

*
* @default 60.000 ms
* @default 60000 ms
*/

@@ -82,2 +82,6 @@ maxInactiveTime?: number;

/**
* Task runtime.
*/
readonly taskRunTime?: number;
/**
* Reference to main worker.

@@ -95,3 +99,3 @@ *

*/
interface PromiseWorkerResponseWrapper<Worker extends AbstractPoolWorker, Response = unknown> {
interface PromiseWorkerResponseWrapper<Worker extends IPoolWorker, Response = unknown> {
/**

@@ -127,10 +131,6 @@ * Resolve callback to fulfill the promise.

/**
* Basic interface that describes the minimum required implementation of listener events for a pool worker.
* Interface that describes the minimum required implementation of listener events for a pool worker.
*/
interface IPoolWorker {
/**
* Worker identifier.
*/
readonly id?: number;
/**
* Register a listener to the message event.

@@ -172,17 +172,2 @@ *

/**
* Basic class that implement the minimum required for a pool worker.
*/
declare abstract class AbstractPoolWorker implements IPoolWorker {
/** @inheritDoc */
abstract on(event: "message", handler: MessageHandler<this>): void;
/** @inheritDoc */
abstract on(event: "error", handler: ErrorHandler<this>): void;
/** @inheritDoc */
abstract on(event: "online", handler: OnlineHandler<this>): void;
/** @inheritDoc */
abstract on(event: "exit", handler: ExitHandler<this>): void;
/** @inheritDoc */
abstract once(event: "exit", handler: ExitHandler<this>): void;
}
/**
* Enumeration of worker choice strategies.

@@ -193,2 +178,4 @@ */

readonly LESS_RECENTLY_USED: "LESS_RECENTLY_USED";
readonly FAIR_SHARE: "FAIR_SHARE";
readonly WEIGHTED_ROUND_ROBIN: "WEIGHTED_ROUND_ROBIN";
}>;

@@ -200,2 +187,31 @@ /**

/**
* Pool tasks usage statistics requirements.
*/
type RequiredStatistics = {
runTime: boolean;
};
/**
* Worker choice strategy interface.
*
* @template Worker Type of worker which manages the strategy.
*/
interface IWorkerChoiceStrategy<Worker extends IPoolWorker> {
/**
* Is the pool attached to the strategy dynamic?.
*/
readonly isDynamicPool: boolean;
/**
* Required pool tasks usage statistics.
*/
readonly requiredStatistics: RequiredStatistics;
/**
* Resets strategy internals (counters, statistics, etc.).
*/
reset(): boolean;
/**
* Chooses a worker in the pool.
*/
choose(): Worker;
}
/**
* Options for a poolifier pool.

@@ -221,3 +237,3 @@ */

/**
* The work choice strategy to use in this pool.
* The worker choice strategy to use in this pool.
*/

@@ -240,3 +256,3 @@ workerChoiceStrategy?: WorkerChoiceStrategy;

/**
* Perform the task specified in the constructor with the data parameter.
* Performs the task specified in the constructor with the data parameter.
*

@@ -248,7 +264,7 @@ * @param data The input for the specified task. This can only be serializable data.

/**
* Shut down every current worker in this pool.
* Shutdowns every current worker in this pool.
*/
destroy(): Promise<void>;
/**
* Set the worker choice strategy in this pool.
* Sets the worker choice strategy in this pool.
*

@@ -267,2 +283,11 @@ * @param workerChoiceStrategy The worker choice strategy.

/**
* Tasks usage statistics.
*/
interface TasksUsage {
run: number;
running: number;
runTime: number;
avgRunTime: number;
}
/**
* Internal poolifier pool emitter.

@@ -279,3 +304,3 @@ */

*/
interface IPoolInternal<Worker extends AbstractPoolWorker, Data = unknown, Response = unknown> extends IPool<Data, Response> {
interface IPoolInternal<Worker extends IPoolWorker, Data = unknown, Response = unknown> extends IPool<Data, Response> {
/**

@@ -286,8 +311,8 @@ * List of currently available workers.

/**
* The tasks map.
* The workers tasks usage map.
*
* - `key`: The `Worker`
* - `value`: Number of tasks currently in progress on the worker.
* `key`: The `Worker`
* `value`: Worker tasks usage statistics.
*/
readonly tasks: Map<Worker, number>;
readonly workersTasksUsage: Map<Worker, TasksUsage>;
/**

@@ -345,2 +370,9 @@ * Emitter on which events can be listened to.

getWorkerRunningTasks(worker: Worker): number | undefined;
/**
* Get worker average tasks runtime.
*
* @param worker The worker.
* @returns The average tasks runtime on the worker.
*/
getWorkerAverageTasksRunTime(worker: Worker): number | undefined;
}

@@ -354,3 +386,3 @@ /**

*/
declare class WorkerChoiceStrategyContext<Worker extends AbstractPoolWorker, Data, Response> {
declare class WorkerChoiceStrategyContext<Worker extends IPoolWorker, Data, Response> {
private readonly pool;

@@ -368,3 +400,3 @@ private createDynamicallyWorkerCallback;

/**
* Get the worker choice strategy instance specific to the pool type.
* Gets the worker choice strategy instance specific to the pool type.
*

@@ -376,4 +408,10 @@ * @param workerChoiceStrategy The worker choice strategy.

/**
* Set the worker choice strategy to use in the context.
* Gets the worker choice strategy used in the context.
*
* @returns The worker choice strategy.
*/
getWorkerChoiceStrategy(): IWorkerChoiceStrategy<Worker>;
/**
* Sets the worker choice strategy to use in the context.
*
* @param workerChoiceStrategy The worker choice strategy to set.

@@ -383,3 +421,3 @@ */

/**
* Choose a worker with the underlying selection strategy.
* Chooses a worker with the underlying selection strategy.
*

@@ -391,3 +429,3 @@ * @returns The chosen one.

/**
* Base class containing some shared logic for all poolifier pools.
* Base class that implements some shared logic for all poolifier pools.
*

@@ -398,3 +436,3 @@ * @template Worker Type of worker which manages this pool.

*/
declare abstract class AbstractPool<Worker extends AbstractPoolWorker, Data = unknown, Response = unknown> implements IPoolInternal<Worker, Data, Response> {
declare abstract class AbstractPool<Worker extends IPoolWorker, Data = unknown, Response = unknown> implements IPoolInternal<Worker, Data, Response> {
readonly numberOfWorkers: number;

@@ -406,3 +444,3 @@ readonly filePath: string;

/** @inheritDoc */
readonly tasks: Map<Worker, number>;
readonly workersTasksUsage: Map<Worker, TasksUsage>;
/** @inheritDoc */

@@ -447,5 +485,7 @@ readonly emitter?: PoolEmitter;

/** @inheritDoc */
getWorkerIndex(worker: Worker): number;
/** @inheritDoc */
getWorkerRunningTasks(worker: Worker): number | undefined;
/** @inheritDoc */
getWorkerIndex(worker: Worker): number;
getWorkerAverageTasksRunTime(worker: Worker): number | undefined;
/** @inheritDoc */

@@ -463,3 +503,3 @@ setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy): void;

/**
* Shut down given worker.
* Shutdowns given worker.
*

@@ -479,21 +519,17 @@ * @param worker A worker within `workers`.

/**
* Increase the number of tasks that the given worker has applied.
* Hook executed before the worker task promise resolution.
* Can be overridden.
*
* @param worker Worker whose tasks are increased.
* @param worker The worker.
*/
protected increaseWorkersTask(worker: Worker): void;
protected beforePromiseWorkerResponseHook(worker: Worker): void;
/**
* Decrease the number of tasks that the given worker has applied.
* Hook executed after the worker task promise resolution.
* Can be overridden.
*
* @param worker Worker whose tasks are decreased.
* @param message The received message.
* @param promise The Promise response.
*/
protected decreaseWorkersTasks(worker: Worker): void;
protected afterPromiseWorkerResponseHook(message: MessageValue<Response>, promise: PromiseWorkerResponseWrapper<Worker, Response>): void;
/**
* Step the number of tasks that the given worker has applied.
*
* @param worker Worker whose tasks are set.
* @param step Worker number of tasks step.
*/
private stepWorkerNumberOfTasks;
/**
* Removes the given worker from the pool.

@@ -505,3 +541,3 @@ *

/**
* Choose a worker for the next task.
* Chooses a worker for the next task.
*

@@ -514,3 +550,3 @@ * The default implementation uses a round robin algorithm to distribute the load.

/**
* Send a message to the given worker.
* Sends a message to the given worker.
*

@@ -522,3 +558,3 @@ * @param worker The worker which should receive the message.

/**
* Register a listener callback on a given worker.
* Registers a listener callback on a given worker.
*

@@ -555,2 +591,53 @@ * @param worker A worker.

private checkAndEmitBusy;
/**
* Increases the number of tasks that the given worker has applied.
*
* @param worker Worker which running tasks is increased.
*/
private increaseWorkerRunningTasks;
/**
* Decreases the number of tasks that the given worker has applied.
*
* @param worker Worker which running tasks is decreased.
*/
private decreaseWorkerRunningTasks;
/**
* Steps the number of tasks that the given worker has applied.
*
* @param worker Worker which running tasks are stepped.
* @param step Number of running tasks step.
*/
private stepWorkerRunningTasks;
/**
* Steps the number of tasks that the given worker has run.
*
* @param worker Worker which has run tasks.
* @param step Number of run tasks step.
*/
private stepWorkerRunTasks;
/**
* Updates tasks runtime for the given worker.
*
* @param worker Worker which run the task.
* @param taskRunTime Worker task runtime.
*/
private updateWorkerTasksRunTime;
/**
* Initializes tasks usage statistics.
*
* @param worker The worker.
*/
initWorkerTasksUsage(worker: Worker): void;
/**
* Removes worker tasks usage statistics.
*
* @param worker The worker.
*/
private removeWorkerTasksUsage;
/**
* Resets worker tasks usage statistics.
*
* @param worker The worker.
*/
private resetWorkerTasksUsage;
}

@@ -707,3 +794,3 @@ /**

/**
* Base class containing some shared logic for all poolifier workers.
* Base class that implements some shared logic for all poolifier workers.
*

@@ -837,2 +924,2 @@ * @template MainWorker Type of main worker.

export { DynamicClusterPool, FixedClusterPool, WorkerChoiceStrategies, DynamicThreadPool, FixedThreadPool, AbstractWorker, ClusterWorker$0 as ClusterWorker, ThreadWorker, KillBehaviors };
export type { ClusterPoolOptions, IPool, PoolOptions, ErrorHandler, ExitHandler, IPoolWorker, OnlineHandler, WorkerChoiceStrategy, ThreadWorkerWithMessageChannel, KillBehavior, WorkerOptions };
export type { ClusterPoolOptions, IPool, PoolOptions, ErrorHandler, ExitHandler, IPoolWorker, MessageHandler, OnlineHandler, WorkerChoiceStrategy, ThreadWorkerWithMessageChannel, KillBehavior, WorkerOptions };

@@ -1,1 +0,1 @@

"use strict";Object.defineProperty(exports,"__esModule",{value:!0});var e=require("events"),r=require("cluster"),t=require("worker_threads"),s=require("async_hooks");function o(e){return e&&"object"==typeof e&&"default"in e?e:{default:e}}var i,n=o(e),a=o(r);!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(i||(i={}));class h extends n.default{}const c=()=>{},l=Object.freeze({SOFT:"SOFT",HARD:"HARD"});const k=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_RECENTLY_USED:"LESS_RECENTLY_USED"});class u{constructor(e){this.pool=e,this.isDynamicPool=this.pool.type===i.DYNAMIC}}class p extends u{choose(){let e,r=1/0;for(const t of this.pool.workers){const s=this.pool.getWorkerRunningTasks(t);if(!1===this.isDynamicPool&&0===s)return t;s<r&&(e=t,r=s)}return e}}class d extends u{constructor(){super(...arguments),this.nextWorkerIndex=0}choose(){const e=this.pool.workers[this.nextWorkerIndex];return this.nextWorkerIndex=this.nextWorkerIndex===this.pool.workers.length-1?0:this.nextWorkerIndex+1,e}}class W{static getWorkerChoiceStrategy(e,r=k.ROUND_ROBIN){switch(r){case k.ROUND_ROBIN:return new d(e);case k.LESS_RECENTLY_USED:return new p(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}}class g extends u{constructor(e,r,t=k.ROUND_ROBIN){super(e),this.createDynamicallyWorkerCallback=r,this.workerChoiceStrategy=W.getWorkerChoiceStrategy(this.pool,t)}choose(){const e=this.pool.findFreeWorker();return e||(this.pool.busy?this.workerChoiceStrategy.choose():this.createDynamicallyWorkerCallback())}}class m{constructor(e,r,t=k.ROUND_ROBIN){this.pool=e,this.createDynamicallyWorkerCallback=r,this.setWorkerChoiceStrategy(t)}getPoolWorkerChoiceStrategy(e=k.ROUND_ROBIN){return this.pool.type===i.DYNAMIC?new g(this.pool,this.createDynamicallyWorkerCallback,e):W.getWorkerChoiceStrategy(this.pool,e)}setWorkerChoiceStrategy(e){this.workerChoiceStrategy=this.getPoolWorkerChoiceStrategy(e)}execute(){return this.workerChoiceStrategy.choose()}}class y{constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,this.workers=[],this.tasks=new Map,this.promiseMap=new Map,this.nextMessageId=0,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();this.opts.enableEvents&&(this.emitter=new h),this.workerChoiceStrategyContext=new m(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{var t;t=l.HARD,(r.kill===t||0===this.getWorkerRunningTasks(e))&&this.destroyWorker(e)})),e}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(!e)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(e){if(null==e)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!Number.isSafeInteger(e))throw new Error("Cannot instantiate a pool with a non integer number of workers");if(e<0)throw new Error("Cannot instantiate a pool with a negative number of workers");if(this.type===i.FIXED&&0===e)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){var r,t;this.opts.workerChoiceStrategy=null!==(r=e.workerChoiceStrategy)&&void 0!==r?r:k.ROUND_ROBIN,this.opts.enableEvents=null===(t=e.enableEvents)||void 0===t||t}get numberOfRunningTasks(){return this.promiseMap.size}getWorkerRunningTasks(e){return this.tasks.get(e)}getWorkerIndex(e){return this.workers.indexOf(e)}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e,this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalGetBusyStatus(){return this.numberOfRunningTasks>=this.numberOfWorkers&&!1===this.findFreeWorker()}findFreeWorker(){for(const e of this.workers)if(0===this.getWorkerRunningTasks(e))return e;return!1}execute(e){const r=this.chooseWorker(),t=++this.nextMessageId,s=this.internalExecute(r,t);return this.checkAndEmitBusy(),e=null!=e?e:{},this.sendToWorker(r,{data:e,id:t}),s}async destroy(){await Promise.all(this.workers.map((e=>this.destroyWorker(e))))}setupHook(){}increaseWorkersTask(e){this.stepWorkerNumberOfTasks(e,1)}decreaseWorkersTasks(e){this.stepWorkerNumberOfTasks(e,-1)}stepWorkerNumberOfTasks(e,r){const t=this.tasks.get(e);if(void 0===t)throw Error("Worker could not be found in tasks map");this.tasks.set(e,t+r)}removeWorker(e){this.workers.splice(this.getWorkerIndex(e),1),this.tasks.delete(e)}chooseWorker(){return this.workerChoiceStrategyContext.execute()}internalExecute(e,r){return this.increaseWorkersTask(e),new Promise(((t,s)=>{this.promiseMap.set(r,{resolve:t,reject:s,worker:e})}))}createAndSetupWorker(){var e,r,t,s;const o=this.createWorker();return o.on("message",null!==(e=this.opts.messageHandler)&&void 0!==e?e:c),o.on("error",null!==(r=this.opts.errorHandler)&&void 0!==r?r:c),o.on("online",null!==(t=this.opts.onlineHandler)&&void 0!==t?t:c),o.on("exit",null!==(s=this.opts.exitHandler)&&void 0!==s?s:c),o.once("exit",(()=>this.removeWorker(o))),this.workers.push(o),this.tasks.set(o,0),this.afterWorkerSetup(o),o}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseMap.get(e.id);void 0!==r&&(this.decreaseWorkersTasks(r.worker),e.error?r.reject(e.error):r.resolve(e.data),this.promiseMap.delete(e.id))}}}checkAndEmitBusy(){var e;this.opts.enableEvents&&this.busy&&(null===(e=this.emitter)||void 0===e||e.emit("busy"))}}class w extends y{constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){a.default.setupPrimary({exec:this.filePath})}isMain(){return a.default.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return a.default.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return i.FIXED}get busy(){return this.internalGetBusyStatus()}}class f extends y{constructor(e,r,t={}){super(e,r,t)}isMain(){return t.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){var t;null===(t=e.port2)||void 0===t||t.on("message",r)}createWorker(){return new t.Worker(this.filePath,{env:t.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:s}=new t.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=s,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return i.FIXED}get busy(){return this.internalGetBusyStatus()}}const v=l.SOFT;class x extends s.AsyncResource{constructor(e,r,t,s,o={killBehavior:v,maxInactiveTime:6e4}){var i,n;super(e),this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),this.lastTaskTimestamp=Date.now(),r||(this.aliveInterval=setInterval(this.checkAlive.bind(this),(null!==(i=this.opts.maxInactiveTime)&&void 0!==i?i:6e4)/2),this.checkAlive.bind(this)()),null===(n=this.mainWorker)||void 0===n||n.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){var r,t;this.opts.killBehavior=null!==(r=e.killBehavior)&&void 0!==r?r:v,this.opts.maxInactiveTime=null!==(t=e.maxInactiveTime)&&void 0!==t?t:6e4,this.opts.async=!!e.async}checkFunctionInput(e){if(!e)throw new Error("fn parameter is mandatory")}getMainWorker(){if(!this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){var e;Date.now()-this.lastTaskTimestamp>(null!==(e=this.opts.maxInactiveTime)&&void 0!==e?e:6e4)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=e(r.data);this.sendToMainWorker({data:t,id:r.id})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{this.lastTaskTimestamp=Date.now()}}runAsync(e,r){e(r.data).then((e=>(this.sendToMainWorker({data:e,id:r.id}),null))).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{this.lastTaskTimestamp=Date.now()})).catch(c)}}exports.AbstractWorker=x,exports.ClusterWorker=class extends x{constructor(e,r={}){super("worker-cluster-pool:poolifier",a.default.isPrimary,e,a.default.worker,r)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends w{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return i.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.DynamicThreadPool=class extends f{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return i.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.FixedClusterPool=w,exports.FixedThreadPool=f,exports.KillBehaviors=l,exports.ThreadWorker=class extends x{constructor(e,r={}){super("worker-thread-pool:poolifier",t.isMainThread,e,t.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=k;
"use strict";var e,r=require("events"),t=require("cluster"),s=require("os"),o=require("worker_threads"),i=require("async_hooks");!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(e||(e={}));class n extends r{}const a=()=>{},h=Object.freeze({SOFT:"SOFT",HARD:"HARD"});const k=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_RECENTLY_USED:"LESS_RECENTLY_USED",FAIR_SHARE:"FAIR_SHARE",WEIGHTED_ROUND_ROBIN:"WEIGHTED_ROUND_ROBIN"});class u{constructor(r){this.pool=r,this.isDynamicPool=this.pool.type===e.DYNAMIC,this.requiredStatistics={runTime:!1}}}class c extends u{constructor(){super(...arguments),this.requiredStatistics={runTime:!0},this.workerLastVirtualTaskTimestamp=new Map}reset(){return this.workerLastVirtualTaskTimestamp.clear(),!0}choose(){this.computeWorkerLastVirtualTaskTimestamp();let e,r=1/0;for(const t of this.pool.workers){const s=this.workerLastVirtualTaskTimestamp.get(t)?.end??0;s<r&&(r=s,e=t)}return e}computeWorkerLastVirtualTaskTimestamp(){for(const e of this.pool.workers){const r=Math.max(Date.now(),this.workerLastVirtualTaskTimestamp.get(e)?.end??-1/0),t=r+(this.pool.getWorkerAverageTasksRunTime(e)??0);this.workerLastVirtualTaskTimestamp.set(e,{start:r,end:t})}}}class l extends u{reset(){return!0}choose(){let e,r=1/0;for(const t of this.pool.workers){const s=this.pool.getWorkerRunningTasks(t);if(!1===this.isDynamicPool&&0===s)return t;s<r&&(e=t,r=s)}return e}}class p extends u{constructor(){super(...arguments),this.nextWorkerIndex=0}reset(){return this.nextWorkerIndex=0,!0}choose(){const e=this.pool.workers[this.nextWorkerIndex];return this.nextWorkerIndex=this.nextWorkerIndex===this.pool.workers.length-1?0:this.nextWorkerIndex+1,e}}class W extends u{constructor(e){super(e),this.requiredStatistics={runTime:!0},this.previousWorkerIndex=0,this.currentWorkerIndex=0,this.workersTaskRunTime=new Map,this.defaultWorkerWeight=this.computeWorkerWeight(),this.initWorkersTaskRunTime()}reset(){return this.previousWorkerIndex=0,this.currentWorkerIndex=0,this.workersTaskRunTime.clear(),this.initWorkersTaskRunTime(),!0}choose(){const e=this.pool.workers[this.currentWorkerIndex];!0===this.isDynamicPool&&!1===this.workersTaskRunTime.has(e)&&this.initWorkerTaskRunTime(e);const r=this.getWorkerVirtualTaskRunTime(e)??0,t=this.workersTaskRunTime.get(e)?.weight??this.defaultWorkerWeight;if(this.currentWorkerIndex===this.previousWorkerIndex){const s=(this.workersTaskRunTime.get(e)?.runTime??0)+r;this.setWorkerTaskRunTime(e,t,s)}else this.setWorkerTaskRunTime(e,t,0);return r<t?this.previousWorkerIndex=this.currentWorkerIndex:(this.previousWorkerIndex=this.currentWorkerIndex,this.currentWorkerIndex=this.pool.workers.length-1===this.currentWorkerIndex?0:this.currentWorkerIndex+1),this.pool.workers[this.currentWorkerIndex]}initWorkersTaskRunTime(){for(const e of this.pool.workers)this.initWorkerTaskRunTime(e)}initWorkerTaskRunTime(e){this.setWorkerTaskRunTime(e,this.defaultWorkerWeight,0)}setWorkerTaskRunTime(e,r,t){this.workersTaskRunTime.set(e,{weight:r,runTime:t})}getWorkerVirtualTaskRunTime(e){return this.pool.getWorkerAverageTasksRunTime(e)}computeWorkerWeight(){let e=0;for(const r of s.cpus()){const t=r.speed.toString().length-1;e+=1/(r.speed/Math.pow(10,t))*Math.pow(10,t)}return Math.round(e/s.cpus().length)}}class g{static getWorkerChoiceStrategy(e,r=k.ROUND_ROBIN){switch(r){case k.ROUND_ROBIN:return new p(e);case k.LESS_RECENTLY_USED:return new l(e);case k.FAIR_SHARE:return new c(e);case k.WEIGHTED_ROUND_ROBIN:return new W(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}}class d extends u{constructor(e,r,t=k.ROUND_ROBIN){super(e),this.createDynamicallyWorkerCallback=r,this.workerChoiceStrategy=g.getWorkerChoiceStrategy(this.pool,t),this.requiredStatistics=this.workerChoiceStrategy.requiredStatistics}reset(){return this.workerChoiceStrategy.reset()}choose(){const e=this.pool.findFreeWorker();return e||(!0===this.pool.busy?this.workerChoiceStrategy.choose():this.createDynamicallyWorkerCallback())}}class m{constructor(e,r,t=k.ROUND_ROBIN){this.pool=e,this.createDynamicallyWorkerCallback=r,this.setWorkerChoiceStrategy(t)}getPoolWorkerChoiceStrategy(r=k.ROUND_ROBIN){return this.pool.type===e.DYNAMIC?new d(this.pool,this.createDynamicallyWorkerCallback,r):g.getWorkerChoiceStrategy(this.pool,r)}getWorkerChoiceStrategy(){return this.workerChoiceStrategy}setWorkerChoiceStrategy(e){this.workerChoiceStrategy?.reset(),this.workerChoiceStrategy=this.getPoolWorkerChoiceStrategy(e)}execute(){return this.workerChoiceStrategy.choose()}}class T{constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,this.workers=[],this.workersTasksUsage=new Map,this.promiseMap=new Map,this.nextMessageId=0,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();this.opts.enableEvents&&(this.emitter=new n),this.workerChoiceStrategyContext=new m(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{var t;t=h.HARD,(r.kill===t||0===this.getWorkerRunningTasks(e))&&this.destroyWorker(e)})),e}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(!e)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(r){if(null==r)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!1===Number.isSafeInteger(r))throw new Error("Cannot instantiate a pool with a non integer number of workers");if(r<0)throw new Error("Cannot instantiate a pool with a negative number of workers");if(this.type===e.FIXED&&0===r)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){this.opts.workerChoiceStrategy=e.workerChoiceStrategy??k.ROUND_ROBIN,this.opts.enableEvents=e.enableEvents??!0}get numberOfRunningTasks(){return this.promiseMap.size}getWorkerIndex(e){return this.workers.indexOf(e)}getWorkerRunningTasks(e){return this.workersTasksUsage.get(e)?.running}getWorkerAverageTasksRunTime(e){return this.workersTasksUsage.get(e)?.avgRunTime}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e;for(const e of this.workers)this.resetWorkerTasksUsage(e);this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalGetBusyStatus(){return this.numberOfRunningTasks>=this.numberOfWorkers&&!1===this.findFreeWorker()}findFreeWorker(){for(const e of this.workers)if(0===this.getWorkerRunningTasks(e))return e;return!1}execute(e){const r=this.chooseWorker(),t=++this.nextMessageId,s=this.internalExecute(r,t);return this.checkAndEmitBusy(),e=e??{},this.sendToWorker(r,{data:e,id:t}),s}async destroy(){await Promise.all(this.workers.map((e=>this.destroyWorker(e))))}setupHook(){}beforePromiseWorkerResponseHook(e){this.increaseWorkerRunningTasks(e)}afterPromiseWorkerResponseHook(e,r){this.decreaseWorkerRunningTasks(r.worker),this.stepWorkerRunTasks(r.worker,1),this.updateWorkerTasksRunTime(r.worker,e.taskRunTime)}removeWorker(e){this.workers.splice(this.getWorkerIndex(e),1),this.removeWorkerTasksUsage(e)}chooseWorker(){return this.workerChoiceStrategyContext.execute()}internalExecute(e,r){return this.beforePromiseWorkerResponseHook(e),new Promise(((t,s)=>{this.promiseMap.set(r,{resolve:t,reject:s,worker:e})}))}createAndSetupWorker(){const e=this.createWorker();return e.on("message",this.opts.messageHandler??a),e.on("error",this.opts.errorHandler??a),e.on("online",this.opts.onlineHandler??a),e.on("exit",this.opts.exitHandler??a),e.once("exit",(()=>this.removeWorker(e))),this.workers.push(e),this.initWorkerTasksUsage(e),this.afterWorkerSetup(e),e}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseMap.get(e.id);void 0!==r&&(this.afterPromiseWorkerResponseHook(e,r),e.error?r.reject(e.error):r.resolve(e.data),this.promiseMap.delete(e.id))}}}checkAndEmitBusy(){this.opts.enableEvents&&this.busy&&this.emitter?.emit("busy")}increaseWorkerRunningTasks(e){this.stepWorkerRunningTasks(e,1)}decreaseWorkerRunningTasks(e){this.stepWorkerRunningTasks(e,-1)}stepWorkerRunningTasks(e,r){const t=this.workersTasksUsage.get(e);if(void 0===t)throw new Error("Worker could not be found in worker tasks usage map");t.running=t.running+r,this.workersTasksUsage.set(e,t)}stepWorkerRunTasks(e,r){const t=this.workersTasksUsage.get(e);if(void 0===t)throw new Error("Worker could not be found in worker tasks usage map");t.run=t.run+r,this.workersTasksUsage.set(e,t)}updateWorkerTasksRunTime(e,r){if(!0===this.workerChoiceStrategyContext.getWorkerChoiceStrategy().requiredStatistics.runTime){const t=this.workersTasksUsage.get(e);if(void 0===t)throw new Error("Worker could not be found in worker tasks usage map");t.runTime+=r??0,0!==t.run&&(t.avgRunTime=t.runTime/t.run),this.workersTasksUsage.set(e,t)}}initWorkerTasksUsage(e){this.workersTasksUsage.set(e,{run:0,running:0,runTime:0,avgRunTime:0})}removeWorkerTasksUsage(e){this.workersTasksUsage.delete(e)}resetWorkerTasksUsage(e){this.removeWorkerTasksUsage(e),this.initWorkerTasksUsage(e)}}class w extends T{constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){t.setupPrimary({exec:this.filePath})}isMain(){return t.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return t.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get busy(){return this.internalGetBusyStatus()}}class y extends T{constructor(e,r,t={}){super(e,r,t)}isMain(){return o.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){e.port2?.on("message",r)}createWorker(){return new o.Worker(this.filePath,{env:o.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:t}=new o.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=t,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get busy(){return this.internalGetBusyStatus()}}const R=h.SOFT;class f extends i.AsyncResource{constructor(e,r,t,s,o={killBehavior:R,maxInactiveTime:6e4}){super(e),this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),this.lastTaskTimestamp=Date.now(),!1===r&&(this.aliveInterval=setInterval(this.checkAlive.bind(this),(this.opts.maxInactiveTime??6e4)/2),this.checkAlive.bind(this)()),this.mainWorker?.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){this.opts.killBehavior=e.killBehavior??R,this.opts.maxInactiveTime=e.maxInactiveTime??6e4,this.opts.async=!!e.async}checkFunctionInput(e){if(!e)throw new Error("fn parameter is mandatory")}getMainWorker(){if(!this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){Date.now()-this.lastTaskTimestamp>(this.opts.maxInactiveTime??6e4)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=Date.now(),s=e(r.data),o=Date.now()-t;this.sendToMainWorker({data:s,id:r.id,taskRunTime:o})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{this.lastTaskTimestamp=Date.now()}}runAsync(e,r){const t=Date.now();e(r.data).then((e=>{const s=Date.now()-t;return this.sendToMainWorker({data:e,id:r.id,taskRunTime:s}),null})).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{this.lastTaskTimestamp=Date.now()})).catch(a)}}exports.AbstractWorker=f,exports.ClusterWorker=class extends f{constructor(e,r={}){super("worker-cluster-pool:poolifier",t.isPrimary,e,t.worker,r)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends w{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.DynamicThreadPool=class extends y{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.FixedClusterPool=w,exports.FixedThreadPool=y,exports.KillBehaviors=h,exports.ThreadWorker=class extends f{constructor(e,r={}){super("worker-thread-pool:poolifier",o.isMainThread,e,o.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=k;
{
"name": "poolifier",
"version": "2.2.2",
"version": "2.3.0",
"description": "A fast, easy to use Node.js Worker Thread Pool and Cluster Pool implementation",
"main": "lib/index.js",
"scripts": {
"prepare": "node prepare.js",
"build": "rollup --config --environment BUILD:development",

@@ -20,2 +21,3 @@ "build:typedoc": "rollup --config --environment BUILD:development --environment DOCUMENTATION",

"lint:fix": "eslint . --cache --fix",
"lint:report": "eslint . --cache --format json --output-file reports/eslint.json",
"typedoc": "typedoc",

@@ -66,5 +68,5 @@ "sonar:properties": "./updateSonarProps.sh",

"devDependencies": {
"@types/node": "^18.8.3",
"@typescript-eslint/eslint-plugin": "^5.39.0",
"@typescript-eslint/parser": "^5.39.0",
"@types/node": "^18.8.4",
"@typescript-eslint/eslint-plugin": "^5.40.0",
"@typescript-eslint/parser": "^5.40.0",
"benchmark": "^2.1.4",

@@ -80,5 +82,7 @@ "eslint": "^8.25.0",

"eslint-plugin-prettierx": "^0.18.0",
"eslint-plugin-promise": "^6.0.1",
"eslint-plugin-promise": "^6.1.0",
"eslint-plugin-spellcheck": "^0.0.19",
"expect": "^29.1.2",
"husky": "^8.0.1",
"lint-staged": "^13.0.3",
"microtime": "^3.1.1",

@@ -91,3 +95,3 @@ "mocha": "^10.0.0",

"prettierx": "^0.18.3",
"rollup": "^2.79.1",
"rollup": "^3.1.0",
"rollup-plugin-analyzer": "^4.0.0",

@@ -101,3 +105,3 @@ "rollup-plugin-command": "^1.1.3",

"source-map-support": "^0.5.21",
"typedoc": "^0.23.15",
"typedoc": "^0.23.16",
"typescript": "^4.8.4"

@@ -104,0 +108,0 @@ },

@@ -159,10 +159,14 @@ <div align="center">

- `messageHandler` (optional) - A function that will listen for message event on each worker
- `errorHandler` (optional) - A function that will listen for error event on each worker
- `onlineHandler` (optional) - A function that will listen for online event on each worker
- `exitHandler` (optional) - A function that will listen for exit event on each worker
- `workerChoiceStrategy` (optional) - The work choice strategy to use in this pool:
- `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in this pool in a round robbin fashion
- `WorkerChoiceStrategies.LESS_RECENTLY_USED`: Submit tasks to the less recently used worker in the pool
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robbin fashion
- `WorkerChoiceStrategies.LESS_RECENTLY_USED`: Submit tasks to the less recently used worker
- `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN` Submit tasks to worker using a weighted round robin scheduling algorithm based on tasks execution time
- `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker using a fair share tasks scheduling algorithm based on tasks execution time
`WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN` and `WorkerChoiceStrategies.FAIR_SHARE` strategies are targeted to heavy and long tasks
Default: `WorkerChoiceStrategies.ROUND_ROBIN`

@@ -198,5 +202,5 @@

If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.
Default: 60.000 ms
Default: 60000 ms
- `async` - true/false, true if your function contains async pieces else false
- `async` - true/false, true if your function contains async code pieces, else false
- `killBehavior` - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it.

@@ -249,3 +253,3 @@ **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still running, then the worker **won't** be deleted.

See guidelines [CONTRIBUTING](CONTRIBUTING.md)
Choose your task here [2.0.0](https://github.com/poolifier/poolifier/projects/1), propose an idea, a fix, an improvement.
Choose your task here [2.3.0](https://github.com/orgs/poolifier/projects/1), propose an idea, a fix, an improvement.

@@ -252,0 +256,0 @@ ## Team

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc