poolifier
Advanced tools
Comparing version 2.4.0 to 2.4.1
@@ -1,1 +0,1 @@ | ||
"use strict";var e,r=require("node:cluster"),t=require("node:crypto"),s=require("node:events"),o=require("node:os"),i=require("node:worker_threads"),n=require("node:async_hooks");!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(e||(e={}));const a=Object.freeze((()=>{})),h=Object.freeze({SOFT:"SOFT",HARD:"HARD"});function k(e,r){return r===e}class u extends s{}const c=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_USED:"LESS_USED",LESS_BUSY:"LESS_BUSY",FAIR_SHARE:"FAIR_SHARE",WEIGHTED_ROUND_ROBIN:"WEIGHTED_ROUND_ROBIN"});class l{pool;isDynamicPool;requiredStatistics={runTime:!1,avgRunTime:!1};constructor(r){this.pool=r,this.isDynamicPool=this.pool.type===e.DYNAMIC}}class p extends l{requiredStatistics={runTime:!0,avgRunTime:!0};workerLastVirtualTaskTimestamp=new Map;reset(){return this.workerLastVirtualTaskTimestamp.clear(),!0}choose(){let e,r=1/0;for(const[t]of this.pool.workers.entries()){this.computeWorkerLastVirtualTaskTimestamp(t);const s=this.workerLastVirtualTaskTimestamp.get(t)?.end??0;s<r&&(r=s,e=t)}return e}remove(e){const r=this.workerLastVirtualTaskTimestamp.delete(e);for(const[r,t]of this.workerLastVirtualTaskTimestamp.entries())r>e&&this.workerLastVirtualTaskTimestamp.set(r-1,t);return r}computeWorkerLastVirtualTaskTimestamp(e){const r=Math.max(Date.now(),this.workerLastVirtualTaskTimestamp.get(e)?.end??-1/0);this.workerLastVirtualTaskTimestamp.set(e,{start:r,end:r+(this.pool.workers[e].tasksUsage.avgRunTime??0)})}}class m extends l{requiredStatistics={runTime:!0,avgRunTime:!1};reset(){return!0}choose(){const e=this.pool.findFreeWorkerKey();if(!this.isDynamicPool&&-1!==e)return e;let r,t=1/0;for(const[e,s]of this.pool.workers.entries()){const o=s.tasksUsage.runTime;if(0===o)return e;o<t&&(t=o,r=e)}return r}remove(e){return!0}}class d extends l{reset(){return!0}choose(){const e=this.pool.findFreeWorkerKey();if(!this.isDynamicPool&&-1!==e)return e;let r,t=1/0;for(const[e,s]of this.pool.workers.entries()){const o=s.tasksUsage,i=o?.run+o?.running;if(0===i)return e;i<t&&(t=i,r=e)}return r}remove(e){return!0}}class w extends l{nextWorkerId=0;reset(){return this.nextWorkerId=0,!0}choose(){const e=this.nextWorkerId;return this.nextWorkerId=this.nextWorkerId===this.pool.workers.length-1?0:this.nextWorkerId+1,e}remove(e){return this.nextWorkerId===e&&(this.nextWorkerId=this.nextWorkerId>this.pool.workers.length-1?this.pool.workers.length-1:this.nextWorkerId),!0}}class g extends l{requiredStatistics={runTime:!0,avgRunTime:!0};currentWorkerId=0;defaultWorkerWeight;workersTaskRunTime=new Map;constructor(e){super(e),this.defaultWorkerWeight=this.computeWorkerWeight(),this.initWorkersTaskRunTime()}reset(){return this.currentWorkerId=0,this.workersTaskRunTime.clear(),this.initWorkersTaskRunTime(),!0}choose(){const e=this.currentWorkerId;this.isDynamicPool&&!this.workersTaskRunTime.has(e)&&this.initWorkerTaskRunTime(e);const r=this.workersTaskRunTime.get(e)?.runTime??0,t=this.workersTaskRunTime.get(e)?.weight??this.defaultWorkerWeight;return r<t?this.setWorkerTaskRunTime(e,t,r+(this.getWorkerVirtualTaskRunTime(e)??0)):(this.currentWorkerId=this.currentWorkerId===this.pool.workers.length-1?0:this.currentWorkerId+1,this.setWorkerTaskRunTime(this.currentWorkerId,t,0)),e}remove(e){this.currentWorkerId===e&&(this.currentWorkerId=this.currentWorkerId>this.pool.workers.length-1?this.pool.workers.length-1:this.currentWorkerId);const r=this.workersTaskRunTime.delete(e);for(const[r,t]of this.workersTaskRunTime)r>e&&this.workersTaskRunTime.set(r-1,t);return r}initWorkersTaskRunTime(){for(const[e]of this.pool.workers.entries())this.initWorkerTaskRunTime(e)}initWorkerTaskRunTime(e){this.setWorkerTaskRunTime(e,this.defaultWorkerWeight,0)}setWorkerTaskRunTime(e,r,t){this.workersTaskRunTime.set(e,{weight:r,runTime:t})}getWorkerVirtualTaskRunTime(e){return this.pool.workers[e].tasksUsage.avgRunTime}computeWorkerWeight(){let e=0;for(const r of o.cpus()){const t=r.speed.toString().length-1;e+=1/(r.speed/Math.pow(10,t))*Math.pow(10,t)}return Math.round(e/o.cpus().length)}}function W(e,r=c.ROUND_ROBIN){switch(r){case c.ROUND_ROBIN:return new w(e);case c.LESS_USED:return new d(e);case c.LESS_BUSY:return new m(e);case c.FAIR_SHARE:return new p(e);case c.WEIGHTED_ROUND_ROBIN:return new g(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}class T extends l{createWorkerCallback;workerChoiceStrategy;constructor(e,r,t=c.ROUND_ROBIN){super(e),this.createWorkerCallback=r,this.workerChoiceStrategy=W(this.pool,t),this.requiredStatistics=this.workerChoiceStrategy.requiredStatistics}reset(){return this.workerChoiceStrategy.reset()}choose(){const e=this.pool.findFreeWorkerKey();return-1!==e?e:this.pool.busy?this.workerChoiceStrategy.choose():this.createWorkerCallback()}remove(e){return this.workerChoiceStrategy.remove(e)}}class y{pool;createWorkerCallback;workerChoiceStrategy;constructor(e,r,t=c.ROUND_ROBIN){this.pool=e,this.createWorkerCallback=r,this.setWorkerChoiceStrategy(t)}getPoolWorkerChoiceStrategy(r=c.ROUND_ROBIN){return this.pool.type===e.DYNAMIC?new T(this.pool,this.createWorkerCallback,r):W(this.pool,r)}getRequiredStatistics(){return this.workerChoiceStrategy.requiredStatistics}setWorkerChoiceStrategy(e){this.workerChoiceStrategy?.reset(),this.workerChoiceStrategy=this.getPoolWorkerChoiceStrategy(e)}execute(){return this.workerChoiceStrategy.choose()}remove(e){return this.workerChoiceStrategy.remove(e)}}class f{numberOfWorkers;filePath;opts;workers=[];emitter;promiseResponseMap=new Map;workerChoiceStrategyContext;constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();!0===this.opts.enableEvents&&(this.emitter=new u),this.workerChoiceStrategyContext=new y(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{(k(h.HARD,r.kill)||0===this.getWorkerTasksUsage(e)?.running)&&this.destroyWorker(e)})),this.getWorkerKey(e)}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(null==e||"string"==typeof e&&0===e.trim().length)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(r){if(null==r)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!Number.isSafeInteger(r))throw new TypeError("Cannot instantiate a pool with a non integer number of workers");if(r<0)throw new RangeError("Cannot instantiate a pool with a negative number of workers");if(this.type===e.FIXED&&0===r)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){this.opts.workerChoiceStrategy=e.workerChoiceStrategy??c.ROUND_ROBIN,this.opts.enableEvents=e.enableEvents??!0}get numberOfRunningTasks(){return this.promiseResponseMap.size}getWorkerKey(e){return this.workers.findIndex((r=>r.worker===e))}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e;for(const[e,r]of this.workers.entries())this.setWorker(e,r.worker,{run:0,running:0,runTime:0,avgRunTime:0,error:0});this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalGetBusyStatus(){return this.numberOfRunningTasks>=this.numberOfWorkers&&-1===this.findFreeWorkerKey()}findFreeWorkerKey(){return this.workers.findIndex((e=>0===e.tasksUsage.running))}async execute(e){const[r,s]=this.chooseWorker(),o=t.randomUUID(),i=this.internalExecute(r,s,o);return this.checkAndEmitBusy(),this.sendToWorker(s,{data:e??{},id:o}),i}async destroy(){await Promise.all(this.workers.map((async e=>{await this.destroyWorker(e.worker)})))}setupHook(){}beforePromiseResponseHook(e){++this.workers[e].tasksUsage.running}afterPromiseResponseHook(e,r){const t=this.getWorkerTasksUsage(e);--t.running,++t.run,null!=r.error&&++t.error,this.workerChoiceStrategyContext.getRequiredStatistics().runTime&&(t.runTime+=r.taskRunTime??0,this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime&&0!==t.run&&(t.avgRunTime=t.runTime/t.run))}removeWorker(e){const r=this.getWorkerKey(e);this.workers.splice(r,1),this.workerChoiceStrategyContext.remove(r)}chooseWorker(){const e=this.workerChoiceStrategyContext.execute();return[e,this.workers[e].worker]}createAndSetupWorker(){const e=this.createWorker();return e.on("message",this.opts.messageHandler??a),e.on("error",this.opts.errorHandler??a),e.on("online",this.opts.onlineHandler??a),e.on("exit",this.opts.exitHandler??a),e.once("exit",(()=>{this.removeWorker(e)})),this.pushWorker(e,{run:0,running:0,runTime:0,avgRunTime:0,error:0}),this.afterWorkerSetup(e),e}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseResponseMap.get(e.id);void 0!==r&&(null!=e.error?r.reject(e.error):r.resolve(e.data),this.afterPromiseResponseHook(r.worker,e),this.promiseResponseMap.delete(e.id))}}}async internalExecute(e,r,t){return this.beforePromiseResponseHook(e),await new Promise(((e,s)=>{this.promiseResponseMap.set(t,{resolve:e,reject:s,worker:r})}))}checkAndEmitBusy(){!0===this.opts.enableEvents&&this.busy&&this.emitter?.emit("busy")}getWorkerTasksUsage(e){const r=this.getWorkerKey(e);if(-1!==r)return this.workers[r].tasksUsage;throw new Error("Worker could not be found in the pool")}pushWorker(e,r){this.workers.push({worker:e,tasksUsage:r})}setWorker(e,r,t){this.workers[e]={worker:r,tasksUsage:t}}}class R extends f{opts;constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){r.setupPrimary({...this.opts.settings,exec:this.filePath})}isMain(){return r.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return r.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get busy(){return this.internalGetBusyStatus()}}class S extends f{constructor(e,r,t={}){super(e,r,t)}isMain(){return i.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){e.port2?.on("message",r)}createWorker(){return new i.Worker(this.filePath,{env:i.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:t}=new i.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=t,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get busy(){return this.internalGetBusyStatus()}}const x=6e4,I=h.SOFT;class v extends n.AsyncResource{mainWorker;lastTaskTimestamp;aliveInterval;opts;constructor(e,r,t,s,o={killBehavior:I,maxInactiveTime:x}){super(e),this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),!r&&k(h.HARD,this.opts.killBehavior)&&(this.lastTaskTimestamp=Date.now(),this.aliveInterval=setInterval(this.checkAlive.bind(this),(this.opts.maxInactiveTime??x)/2),this.checkAlive.bind(this)()),this.mainWorker?.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?!0===this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(null!=this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){this.opts.killBehavior=e.killBehavior??I,this.opts.maxInactiveTime=e.maxInactiveTime??x,this.opts.async=e.async??!1}checkFunctionInput(e){if(null==e)throw new Error("fn parameter is mandatory");if("function"!=typeof e)throw new TypeError("fn parameter is not a function")}getMainWorker(){if(null==this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){Date.now()-this.lastTaskTimestamp>(this.opts.maxInactiveTime??x)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=Date.now(),s=e(r.data),o=Date.now()-t;this.sendToMainWorker({data:s,id:r.id,taskRunTime:o})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{k(h.HARD,this.opts.killBehavior)&&(this.lastTaskTimestamp=Date.now())}}runAsync(e,r){const t=Date.now();e(r.data).then((e=>{const s=Date.now()-t;return this.sendToMainWorker({data:e,id:r.id,taskRunTime:s}),null})).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{k(h.HARD,this.opts.killBehavior)&&(this.lastTaskTimestamp=Date.now())})).catch(a)}}exports.ClusterWorker=class extends v{constructor(e,t={}){super("worker-cluster-pool:poolifier",r.isPrimary,e,r.worker,t)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends R{max;constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.DynamicThreadPool=class extends S{max;constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.FixedClusterPool=R,exports.FixedThreadPool=S,exports.KillBehaviors=h,exports.ThreadWorker=class extends v{constructor(e,r={}){super("worker-thread-pool:poolifier",i.isMainThread,e,i.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=c; | ||
"use strict";var e,r=require("node:cluster"),t=require("node:crypto"),s=require("node:events"),o=require("node:os"),i=require("node:worker_threads"),n=require("node:async_hooks");!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(e||(e={}));const a=Object.freeze((()=>{})),h=Object.freeze({SOFT:"SOFT",HARD:"HARD"});class k extends s{}const u=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_USED:"LESS_USED",LESS_BUSY:"LESS_BUSY",FAIR_SHARE:"FAIR_SHARE",WEIGHTED_ROUND_ROBIN:"WEIGHTED_ROUND_ROBIN"});class c{pool;isDynamicPool;requiredStatistics={runTime:!1,avgRunTime:!1};constructor(r){this.pool=r,this.isDynamicPool=this.pool.type===e.DYNAMIC,this.choose.bind(this)}}class l extends c{requiredStatistics={runTime:!0,avgRunTime:!0};workerLastVirtualTaskTimestamp=new Map;reset(){return this.workerLastVirtualTaskTimestamp.clear(),!0}choose(){let e,r=1/0;for(const[t]of this.pool.workers.entries()){this.computeWorkerLastVirtualTaskTimestamp(t);const s=this.workerLastVirtualTaskTimestamp.get(t)?.end??0;s<r&&(r=s,e=t)}return e}remove(e){const r=this.workerLastVirtualTaskTimestamp.delete(e);for(const[r,t]of this.workerLastVirtualTaskTimestamp.entries())r>e&&this.workerLastVirtualTaskTimestamp.set(r-1,t);return r}computeWorkerLastVirtualTaskTimestamp(e){const r=Math.max(Date.now(),this.workerLastVirtualTaskTimestamp.get(e)?.end??-1/0);this.workerLastVirtualTaskTimestamp.set(e,{start:r,end:r+(this.pool.workers[e].tasksUsage.avgRunTime??0)})}}class m extends c{requiredStatistics={runTime:!0,avgRunTime:!1};reset(){return!0}choose(){const e=this.pool.findFreeWorkerKey();if(-1!==e)return e;let r,t=1/0;for(const[e,s]of this.pool.workers.entries()){const o=s.tasksUsage.runTime;if(0===o)return e;o<t&&(t=o,r=e)}return r}remove(e){return!0}}class p extends c{reset(){return!0}choose(){const e=this.pool.findFreeWorkerKey();if(-1!==e)return e;let r,t=1/0;for(const[e,s]of this.pool.workers.entries()){const o=s.tasksUsage,i=o?.run+o?.running;if(0===i)return e;i<t&&(t=i,r=e)}return r}remove(e){return!0}}class d extends c{nextWorkerId=0;reset(){return this.nextWorkerId=0,!0}choose(){const e=this.nextWorkerId;return this.nextWorkerId=this.nextWorkerId===this.pool.workers.length-1?0:this.nextWorkerId+1,e}remove(e){return this.nextWorkerId===e&&(this.nextWorkerId=this.nextWorkerId>this.pool.workers.length-1?this.pool.workers.length-1:this.nextWorkerId),!0}}class w extends c{requiredStatistics={runTime:!0,avgRunTime:!0};currentWorkerId=0;defaultWorkerWeight;workersTaskRunTime=new Map;constructor(e){super(e),this.defaultWorkerWeight=this.computeWorkerWeight(),this.initWorkersTaskRunTime()}reset(){return this.currentWorkerId=0,this.workersTaskRunTime.clear(),this.initWorkersTaskRunTime(),!0}choose(){const e=this.currentWorkerId;this.isDynamicPool&&!this.workersTaskRunTime.has(e)&&this.initWorkerTaskRunTime(e);const r=this.workersTaskRunTime.get(e)?.runTime??0,t=this.workersTaskRunTime.get(e)?.weight??this.defaultWorkerWeight;return r<t?this.setWorkerTaskRunTime(e,t,r+(this.getWorkerVirtualTaskRunTime(e)??0)):(this.currentWorkerId=this.currentWorkerId===this.pool.workers.length-1?0:this.currentWorkerId+1,this.setWorkerTaskRunTime(this.currentWorkerId,t,0)),e}remove(e){this.currentWorkerId===e&&(this.currentWorkerId=this.currentWorkerId>this.pool.workers.length-1?this.pool.workers.length-1:this.currentWorkerId);const r=this.workersTaskRunTime.delete(e);for(const[r,t]of this.workersTaskRunTime)r>e&&this.workersTaskRunTime.set(r-1,t);return r}initWorkersTaskRunTime(){for(const[e]of this.pool.workers.entries())this.initWorkerTaskRunTime(e)}initWorkerTaskRunTime(e){this.setWorkerTaskRunTime(e,this.defaultWorkerWeight,0)}setWorkerTaskRunTime(e,r,t){this.workersTaskRunTime.set(e,{weight:r,runTime:t})}getWorkerVirtualTaskRunTime(e){return this.pool.workers[e].tasksUsage.avgRunTime}computeWorkerWeight(){let e=0;for(const r of o.cpus()){const t=r.speed.toString().length-1;e+=1/(r.speed/Math.pow(10,t))*Math.pow(10,t)}return Math.round(e/o.cpus().length)}}class g{pool;createWorkerCallback;workerChoiceStrategy;constructor(e,r,t=u.ROUND_ROBIN){this.pool=e,this.createWorkerCallback=r,this.execute.bind(this),this.setWorkerChoiceStrategy(t)}getRequiredStatistics(){return this.workerChoiceStrategy.requiredStatistics}setWorkerChoiceStrategy(e){this.workerChoiceStrategy?.reset(),this.workerChoiceStrategy=function(e,r=u.ROUND_ROBIN){switch(r){case u.ROUND_ROBIN:return new d(e);case u.LESS_USED:return new p(e);case u.LESS_BUSY:return new m(e);case u.FAIR_SHARE:return new l(e);case u.WEIGHTED_ROUND_ROBIN:return new w(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}(this.pool,e)}execute(){return this.workerChoiceStrategy.isDynamicPool&&!this.pool.full&&-1===this.pool.findFreeWorkerKey()?this.createWorkerCallback():this.workerChoiceStrategy.choose()}remove(e){return this.workerChoiceStrategy.remove(e)}}class W{numberOfWorkers;filePath;opts;workers=[];emitter;promiseResponseMap=new Map;workerChoiceStrategyContext;constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.chooseWorker.bind(this),this.internalExecute.bind(this),this.checkAndEmitBusy.bind(this),this.sendToWorker.bind(this),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();!0===this.opts.enableEvents&&(this.emitter=new k),this.workerChoiceStrategyContext=new g(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{var t;t=h.HARD,(r.kill===t||0===this.getWorkerTasksUsage(e)?.running)&&this.destroyWorker(e)})),this.getWorkerKey(e)}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(null==e||"string"==typeof e&&0===e.trim().length)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(r){if(null==r)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!Number.isSafeInteger(r))throw new TypeError("Cannot instantiate a pool with a non integer number of workers");if(r<0)throw new RangeError("Cannot instantiate a pool with a negative number of workers");if(this.type===e.FIXED&&0===r)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){this.opts.workerChoiceStrategy=e.workerChoiceStrategy??u.ROUND_ROBIN,this.opts.enableEvents=e.enableEvents??!0}get numberOfRunningTasks(){return this.promiseResponseMap.size}getWorkerKey(e){return this.workers.findIndex((r=>r.worker===e))}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e;for(const[e,r]of this.workers.entries())this.setWorker(e,r.worker,{run:0,running:0,runTime:0,avgRunTime:0,error:0});this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalBusy(){return this.numberOfRunningTasks>=this.numberOfWorkers&&-1===this.findFreeWorkerKey()}findFreeWorkerKey(){return this.workers.findIndex((e=>0===e.tasksUsage.running))}async execute(e){const[r,s]=this.chooseWorker(),o=t.randomUUID(),i=this.internalExecute(r,s,o);return this.checkAndEmitBusy(),this.sendToWorker(s,{data:e??{},id:o}),i}async destroy(){await Promise.all(this.workers.map((async e=>{await this.destroyWorker(e.worker)})))}setupHook(){}beforePromiseResponseHook(e){++this.workers[e].tasksUsage.running}afterPromiseResponseHook(e,r){const t=this.getWorkerTasksUsage(e);--t.running,++t.run,null!=r.error&&++t.error,this.workerChoiceStrategyContext.getRequiredStatistics().runTime&&(t.runTime+=r.taskRunTime??0,this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime&&0!==t.run&&(t.avgRunTime=t.runTime/t.run))}removeWorker(e){const r=this.getWorkerKey(e);this.workers.splice(r,1),this.workerChoiceStrategyContext.remove(r)}chooseWorker(){const e=this.workerChoiceStrategyContext.execute();return[e,this.workers[e].worker]}createAndSetupWorker(){const e=this.createWorker();return e.on("message",this.opts.messageHandler??a),e.on("error",this.opts.errorHandler??a),e.on("online",this.opts.onlineHandler??a),e.on("exit",this.opts.exitHandler??a),e.once("exit",(()=>{this.removeWorker(e)})),this.pushWorker(e,{run:0,running:0,runTime:0,avgRunTime:0,error:0}),this.afterWorkerSetup(e),e}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseResponseMap.get(e.id);void 0!==r&&(null!=e.error?r.reject(e.error):r.resolve(e.data),this.afterPromiseResponseHook(r.worker,e),this.promiseResponseMap.delete(e.id))}}}async internalExecute(e,r,t){return this.beforePromiseResponseHook(e),await new Promise(((e,s)=>{this.promiseResponseMap.set(t,{resolve:e,reject:s,worker:r})}))}checkAndEmitBusy(){!0===this.opts.enableEvents&&this.busy&&this.emitter?.emit("busy")}getWorkerTasksUsage(e){const r=this.getWorkerKey(e);if(-1!==r)return this.workers[r].tasksUsage;throw new Error("Worker could not be found in the pool")}pushWorker(e,r){this.workers.push({worker:e,tasksUsage:r})}setWorker(e,r,t){this.workers[e]={worker:r,tasksUsage:t}}}class T extends W{opts;constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){r.setupPrimary({...this.opts.settings,exec:this.filePath})}isMain(){return r.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return r.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get full(){return this.workers.length===this.numberOfWorkers}get busy(){return this.internalBusy()}}class f extends W{constructor(e,r,t={}){super(e,r,t)}isMain(){return i.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){e.port2?.on("message",r)}createWorker(){return new i.Worker(this.filePath,{env:i.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:t}=new i.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=t,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get full(){return this.workers.length===this.numberOfWorkers}get busy(){return this.internalBusy()}}const y=6e4,R=h.SOFT;class x extends n.AsyncResource{isMain;mainWorker;lastTaskTimestamp;aliveInterval;opts;constructor(e,r,t,s,o={killBehavior:R,maxInactiveTime:y}){super(e),this.isMain=r,this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),this.isMain||(this.lastTaskTimestamp=Date.now(),this.aliveInterval=setInterval(this.checkAlive.bind(this),(this.opts.maxInactiveTime??y)/2),this.checkAlive.bind(this)()),this.mainWorker?.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?!0===this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(null!=this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){this.opts.killBehavior=e.killBehavior??R,this.opts.maxInactiveTime=e.maxInactiveTime??y,this.opts.async=e.async??!1}checkFunctionInput(e){if(null==e)throw new Error("fn parameter is mandatory");if("function"!=typeof e)throw new TypeError("fn parameter is not a function")}getMainWorker(){if(null==this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){Date.now()-this.lastTaskTimestamp>(this.opts.maxInactiveTime??y)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=Date.now(),s=e(r.data),o=Date.now()-t;this.sendToMainWorker({data:s,id:r.id,taskRunTime:o})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{!this.isMain&&(this.lastTaskTimestamp=Date.now())}}runAsync(e,r){const t=Date.now();e(r.data).then((e=>{const s=Date.now()-t;return this.sendToMainWorker({data:e,id:r.id,taskRunTime:s}),null})).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{!this.isMain&&(this.lastTaskTimestamp=Date.now())})).catch(a)}}exports.ClusterWorker=class extends x{constructor(e,t={}){super("worker-cluster-pool:poolifier",r.isPrimary,e,r.worker,t)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends T{max;constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get full(){return this.workers.length===this.max}get busy(){return this.full&&-1===this.findFreeWorkerKey()}},exports.DynamicThreadPool=class extends f{max;constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get full(){return this.workers.length===this.max}get busy(){return this.full&&-1===this.findFreeWorkerKey()}},exports.FixedClusterPool=T,exports.FixedThreadPool=f,exports.KillBehaviors=h,exports.ThreadWorker=class extends x{constructor(e,r={}){super("worker-thread-pool:poolifier",i.isMainThread,e,i.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=u; |
@@ -1,1 +0,1 @@ | ||
"use strict";var e,r=require("node:cluster"),t=require("node:crypto"),s=require("node:events"),o=require("node:os"),i=require("node:worker_threads"),n=require("node:async_hooks");!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(e||(e={}));const a=Object.freeze((()=>{})),h=Object.freeze({SOFT:"SOFT",HARD:"HARD"});function k(e,r){return r===e}class u extends s{}const c=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_USED:"LESS_USED",LESS_BUSY:"LESS_BUSY",FAIR_SHARE:"FAIR_SHARE",WEIGHTED_ROUND_ROBIN:"WEIGHTED_ROUND_ROBIN"});class l{pool;isDynamicPool;requiredStatistics={runTime:!1,avgRunTime:!1};constructor(r){this.pool=r,this.isDynamicPool=this.pool.type===e.DYNAMIC}}class p extends l{requiredStatistics={runTime:!0,avgRunTime:!0};workerLastVirtualTaskTimestamp=new Map;reset(){return this.workerLastVirtualTaskTimestamp.clear(),!0}choose(){let e,r=1/0;for(const[t]of this.pool.workers.entries()){this.computeWorkerLastVirtualTaskTimestamp(t);const s=this.workerLastVirtualTaskTimestamp.get(t)?.end??0;s<r&&(r=s,e=t)}return e}remove(e){const r=this.workerLastVirtualTaskTimestamp.delete(e);for(const[r,t]of this.workerLastVirtualTaskTimestamp.entries())r>e&&this.workerLastVirtualTaskTimestamp.set(r-1,t);return r}computeWorkerLastVirtualTaskTimestamp(e){const r=Math.max(Date.now(),this.workerLastVirtualTaskTimestamp.get(e)?.end??-1/0);this.workerLastVirtualTaskTimestamp.set(e,{start:r,end:r+(this.pool.workers[e].tasksUsage.avgRunTime??0)})}}class m extends l{requiredStatistics={runTime:!0,avgRunTime:!1};reset(){return!0}choose(){const e=this.pool.findFreeWorkerKey();if(!this.isDynamicPool&&-1!==e)return e;let r,t=1/0;for(const[e,s]of this.pool.workers.entries()){const o=s.tasksUsage.runTime;if(0===o)return e;o<t&&(t=o,r=e)}return r}remove(e){return!0}}class d extends l{reset(){return!0}choose(){const e=this.pool.findFreeWorkerKey();if(!this.isDynamicPool&&-1!==e)return e;let r,t=1/0;for(const[e,s]of this.pool.workers.entries()){const o=s.tasksUsage,i=o?.run+o?.running;if(0===i)return e;i<t&&(t=i,r=e)}return r}remove(e){return!0}}class w extends l{nextWorkerId=0;reset(){return this.nextWorkerId=0,!0}choose(){const e=this.nextWorkerId;return this.nextWorkerId=this.nextWorkerId===this.pool.workers.length-1?0:this.nextWorkerId+1,e}remove(e){return this.nextWorkerId===e&&(this.nextWorkerId=this.nextWorkerId>this.pool.workers.length-1?this.pool.workers.length-1:this.nextWorkerId),!0}}class g extends l{requiredStatistics={runTime:!0,avgRunTime:!0};currentWorkerId=0;defaultWorkerWeight;workersTaskRunTime=new Map;constructor(e){super(e),this.defaultWorkerWeight=this.computeWorkerWeight(),this.initWorkersTaskRunTime()}reset(){return this.currentWorkerId=0,this.workersTaskRunTime.clear(),this.initWorkersTaskRunTime(),!0}choose(){const e=this.currentWorkerId;this.isDynamicPool&&!this.workersTaskRunTime.has(e)&&this.initWorkerTaskRunTime(e);const r=this.workersTaskRunTime.get(e)?.runTime??0,t=this.workersTaskRunTime.get(e)?.weight??this.defaultWorkerWeight;return r<t?this.setWorkerTaskRunTime(e,t,r+(this.getWorkerVirtualTaskRunTime(e)??0)):(this.currentWorkerId=this.currentWorkerId===this.pool.workers.length-1?0:this.currentWorkerId+1,this.setWorkerTaskRunTime(this.currentWorkerId,t,0)),e}remove(e){this.currentWorkerId===e&&(this.currentWorkerId=this.currentWorkerId>this.pool.workers.length-1?this.pool.workers.length-1:this.currentWorkerId);const r=this.workersTaskRunTime.delete(e);for(const[r,t]of this.workersTaskRunTime)r>e&&this.workersTaskRunTime.set(r-1,t);return r}initWorkersTaskRunTime(){for(const[e]of this.pool.workers.entries())this.initWorkerTaskRunTime(e)}initWorkerTaskRunTime(e){this.setWorkerTaskRunTime(e,this.defaultWorkerWeight,0)}setWorkerTaskRunTime(e,r,t){this.workersTaskRunTime.set(e,{weight:r,runTime:t})}getWorkerVirtualTaskRunTime(e){return this.pool.workers[e].tasksUsage.avgRunTime}computeWorkerWeight(){let e=0;for(const r of o.cpus()){const t=r.speed.toString().length-1;e+=1/(r.speed/Math.pow(10,t))*Math.pow(10,t)}return Math.round(e/o.cpus().length)}}function W(e,r=c.ROUND_ROBIN){switch(r){case c.ROUND_ROBIN:return new w(e);case c.LESS_USED:return new d(e);case c.LESS_BUSY:return new m(e);case c.FAIR_SHARE:return new p(e);case c.WEIGHTED_ROUND_ROBIN:return new g(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}class T extends l{createWorkerCallback;workerChoiceStrategy;constructor(e,r,t=c.ROUND_ROBIN){super(e),this.createWorkerCallback=r,this.workerChoiceStrategy=W(this.pool,t),this.requiredStatistics=this.workerChoiceStrategy.requiredStatistics}reset(){return this.workerChoiceStrategy.reset()}choose(){const e=this.pool.findFreeWorkerKey();return-1!==e?e:this.pool.busy?this.workerChoiceStrategy.choose():this.createWorkerCallback()}remove(e){return this.workerChoiceStrategy.remove(e)}}class y{pool;createWorkerCallback;workerChoiceStrategy;constructor(e,r,t=c.ROUND_ROBIN){this.pool=e,this.createWorkerCallback=r,this.setWorkerChoiceStrategy(t)}getPoolWorkerChoiceStrategy(r=c.ROUND_ROBIN){return this.pool.type===e.DYNAMIC?new T(this.pool,this.createWorkerCallback,r):W(this.pool,r)}getRequiredStatistics(){return this.workerChoiceStrategy.requiredStatistics}setWorkerChoiceStrategy(e){this.workerChoiceStrategy?.reset(),this.workerChoiceStrategy=this.getPoolWorkerChoiceStrategy(e)}execute(){return this.workerChoiceStrategy.choose()}remove(e){return this.workerChoiceStrategy.remove(e)}}class f{numberOfWorkers;filePath;opts;workers=[];emitter;promiseResponseMap=new Map;workerChoiceStrategyContext;constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();!0===this.opts.enableEvents&&(this.emitter=new u),this.workerChoiceStrategyContext=new y(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{(k(h.HARD,r.kill)||0===this.getWorkerTasksUsage(e)?.running)&&this.destroyWorker(e)})),this.getWorkerKey(e)}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(null==e||"string"==typeof e&&0===e.trim().length)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(r){if(null==r)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!Number.isSafeInteger(r))throw new TypeError("Cannot instantiate a pool with a non integer number of workers");if(r<0)throw new RangeError("Cannot instantiate a pool with a negative number of workers");if(this.type===e.FIXED&&0===r)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){this.opts.workerChoiceStrategy=e.workerChoiceStrategy??c.ROUND_ROBIN,this.opts.enableEvents=e.enableEvents??!0}get numberOfRunningTasks(){return this.promiseResponseMap.size}getWorkerKey(e){return this.workers.findIndex((r=>r.worker===e))}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e;for(const[e,r]of this.workers.entries())this.setWorker(e,r.worker,{run:0,running:0,runTime:0,avgRunTime:0,error:0});this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalGetBusyStatus(){return this.numberOfRunningTasks>=this.numberOfWorkers&&-1===this.findFreeWorkerKey()}findFreeWorkerKey(){return this.workers.findIndex((e=>0===e.tasksUsage.running))}async execute(e){const[r,s]=this.chooseWorker(),o=t.randomUUID(),i=this.internalExecute(r,s,o);return this.checkAndEmitBusy(),this.sendToWorker(s,{data:e??{},id:o}),i}async destroy(){await Promise.all(this.workers.map((async e=>{await this.destroyWorker(e.worker)})))}setupHook(){}beforePromiseResponseHook(e){++this.workers[e].tasksUsage.running}afterPromiseResponseHook(e,r){const t=this.getWorkerTasksUsage(e);--t.running,++t.run,null!=r.error&&++t.error,this.workerChoiceStrategyContext.getRequiredStatistics().runTime&&(t.runTime+=r.taskRunTime??0,this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime&&0!==t.run&&(t.avgRunTime=t.runTime/t.run))}removeWorker(e){const r=this.getWorkerKey(e);this.workers.splice(r,1),this.workerChoiceStrategyContext.remove(r)}chooseWorker(){const e=this.workerChoiceStrategyContext.execute();return[e,this.workers[e].worker]}createAndSetupWorker(){const e=this.createWorker();return e.on("message",this.opts.messageHandler??a),e.on("error",this.opts.errorHandler??a),e.on("online",this.opts.onlineHandler??a),e.on("exit",this.opts.exitHandler??a),e.once("exit",(()=>{this.removeWorker(e)})),this.pushWorker(e,{run:0,running:0,runTime:0,avgRunTime:0,error:0}),this.afterWorkerSetup(e),e}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseResponseMap.get(e.id);void 0!==r&&(null!=e.error?r.reject(e.error):r.resolve(e.data),this.afterPromiseResponseHook(r.worker,e),this.promiseResponseMap.delete(e.id))}}}async internalExecute(e,r,t){return this.beforePromiseResponseHook(e),await new Promise(((e,s)=>{this.promiseResponseMap.set(t,{resolve:e,reject:s,worker:r})}))}checkAndEmitBusy(){!0===this.opts.enableEvents&&this.busy&&this.emitter?.emit("busy")}getWorkerTasksUsage(e){const r=this.getWorkerKey(e);if(-1!==r)return this.workers[r].tasksUsage;throw new Error("Worker could not be found in the pool")}pushWorker(e,r){this.workers.push({worker:e,tasksUsage:r})}setWorker(e,r,t){this.workers[e]={worker:r,tasksUsage:t}}}class R extends f{opts;constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){r.setupPrimary({...this.opts.settings,exec:this.filePath})}isMain(){return r.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return r.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get busy(){return this.internalGetBusyStatus()}}class S extends f{constructor(e,r,t={}){super(e,r,t)}isMain(){return i.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){e.port2?.on("message",r)}createWorker(){return new i.Worker(this.filePath,{env:i.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:t}=new i.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=t,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get busy(){return this.internalGetBusyStatus()}}const x=6e4,I=h.SOFT;class v extends n.AsyncResource{mainWorker;lastTaskTimestamp;aliveInterval;opts;constructor(e,r,t,s,o={killBehavior:I,maxInactiveTime:x}){super(e),this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),!r&&k(h.HARD,this.opts.killBehavior)&&(this.lastTaskTimestamp=Date.now(),this.aliveInterval=setInterval(this.checkAlive.bind(this),(this.opts.maxInactiveTime??x)/2),this.checkAlive.bind(this)()),this.mainWorker?.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?!0===this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(null!=this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){this.opts.killBehavior=e.killBehavior??I,this.opts.maxInactiveTime=e.maxInactiveTime??x,this.opts.async=e.async??!1}checkFunctionInput(e){if(null==e)throw new Error("fn parameter is mandatory");if("function"!=typeof e)throw new TypeError("fn parameter is not a function")}getMainWorker(){if(null==this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){Date.now()-this.lastTaskTimestamp>(this.opts.maxInactiveTime??x)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=Date.now(),s=e(r.data),o=Date.now()-t;this.sendToMainWorker({data:s,id:r.id,taskRunTime:o})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{k(h.HARD,this.opts.killBehavior)&&(this.lastTaskTimestamp=Date.now())}}runAsync(e,r){const t=Date.now();e(r.data).then((e=>{const s=Date.now()-t;return this.sendToMainWorker({data:e,id:r.id,taskRunTime:s}),null})).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{k(h.HARD,this.opts.killBehavior)&&(this.lastTaskTimestamp=Date.now())})).catch(a)}}exports.ClusterWorker=class extends v{constructor(e,t={}){super("worker-cluster-pool:poolifier",r.isPrimary,e,r.worker,t)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends R{max;constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.DynamicThreadPool=class extends S{max;constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.FixedClusterPool=R,exports.FixedThreadPool=S,exports.KillBehaviors=h,exports.ThreadWorker=class extends v{constructor(e,r={}){super("worker-thread-pool:poolifier",i.isMainThread,e,i.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=c; | ||
"use strict";var e,r=require("node:cluster"),t=require("node:crypto"),s=require("node:events"),o=require("node:os"),i=require("node:worker_threads"),n=require("node:async_hooks");!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(e||(e={}));const a=Object.freeze((()=>{})),h=Object.freeze({SOFT:"SOFT",HARD:"HARD"});class k extends s{}const u=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_USED:"LESS_USED",LESS_BUSY:"LESS_BUSY",FAIR_SHARE:"FAIR_SHARE",WEIGHTED_ROUND_ROBIN:"WEIGHTED_ROUND_ROBIN"});class c{pool;isDynamicPool;requiredStatistics={runTime:!1,avgRunTime:!1};constructor(r){this.pool=r,this.isDynamicPool=this.pool.type===e.DYNAMIC,this.choose.bind(this)}}class l extends c{requiredStatistics={runTime:!0,avgRunTime:!0};workerLastVirtualTaskTimestamp=new Map;reset(){return this.workerLastVirtualTaskTimestamp.clear(),!0}choose(){let e,r=1/0;for(const[t]of this.pool.workers.entries()){this.computeWorkerLastVirtualTaskTimestamp(t);const s=this.workerLastVirtualTaskTimestamp.get(t)?.end??0;s<r&&(r=s,e=t)}return e}remove(e){const r=this.workerLastVirtualTaskTimestamp.delete(e);for(const[r,t]of this.workerLastVirtualTaskTimestamp.entries())r>e&&this.workerLastVirtualTaskTimestamp.set(r-1,t);return r}computeWorkerLastVirtualTaskTimestamp(e){const r=Math.max(Date.now(),this.workerLastVirtualTaskTimestamp.get(e)?.end??-1/0);this.workerLastVirtualTaskTimestamp.set(e,{start:r,end:r+(this.pool.workers[e].tasksUsage.avgRunTime??0)})}}class m extends c{requiredStatistics={runTime:!0,avgRunTime:!1};reset(){return!0}choose(){const e=this.pool.findFreeWorkerKey();if(-1!==e)return e;let r,t=1/0;for(const[e,s]of this.pool.workers.entries()){const o=s.tasksUsage.runTime;if(0===o)return e;o<t&&(t=o,r=e)}return r}remove(e){return!0}}class p extends c{reset(){return!0}choose(){const e=this.pool.findFreeWorkerKey();if(-1!==e)return e;let r,t=1/0;for(const[e,s]of this.pool.workers.entries()){const o=s.tasksUsage,i=o?.run+o?.running;if(0===i)return e;i<t&&(t=i,r=e)}return r}remove(e){return!0}}class d extends c{nextWorkerId=0;reset(){return this.nextWorkerId=0,!0}choose(){const e=this.nextWorkerId;return this.nextWorkerId=this.nextWorkerId===this.pool.workers.length-1?0:this.nextWorkerId+1,e}remove(e){return this.nextWorkerId===e&&(this.nextWorkerId=this.nextWorkerId>this.pool.workers.length-1?this.pool.workers.length-1:this.nextWorkerId),!0}}class w extends c{requiredStatistics={runTime:!0,avgRunTime:!0};currentWorkerId=0;defaultWorkerWeight;workersTaskRunTime=new Map;constructor(e){super(e),this.defaultWorkerWeight=this.computeWorkerWeight(),this.initWorkersTaskRunTime()}reset(){return this.currentWorkerId=0,this.workersTaskRunTime.clear(),this.initWorkersTaskRunTime(),!0}choose(){const e=this.currentWorkerId;this.isDynamicPool&&!this.workersTaskRunTime.has(e)&&this.initWorkerTaskRunTime(e);const r=this.workersTaskRunTime.get(e)?.runTime??0,t=this.workersTaskRunTime.get(e)?.weight??this.defaultWorkerWeight;return r<t?this.setWorkerTaskRunTime(e,t,r+(this.getWorkerVirtualTaskRunTime(e)??0)):(this.currentWorkerId=this.currentWorkerId===this.pool.workers.length-1?0:this.currentWorkerId+1,this.setWorkerTaskRunTime(this.currentWorkerId,t,0)),e}remove(e){this.currentWorkerId===e&&(this.currentWorkerId=this.currentWorkerId>this.pool.workers.length-1?this.pool.workers.length-1:this.currentWorkerId);const r=this.workersTaskRunTime.delete(e);for(const[r,t]of this.workersTaskRunTime)r>e&&this.workersTaskRunTime.set(r-1,t);return r}initWorkersTaskRunTime(){for(const[e]of this.pool.workers.entries())this.initWorkerTaskRunTime(e)}initWorkerTaskRunTime(e){this.setWorkerTaskRunTime(e,this.defaultWorkerWeight,0)}setWorkerTaskRunTime(e,r,t){this.workersTaskRunTime.set(e,{weight:r,runTime:t})}getWorkerVirtualTaskRunTime(e){return this.pool.workers[e].tasksUsage.avgRunTime}computeWorkerWeight(){let e=0;for(const r of o.cpus()){const t=r.speed.toString().length-1;e+=1/(r.speed/Math.pow(10,t))*Math.pow(10,t)}return Math.round(e/o.cpus().length)}}class g{pool;createWorkerCallback;workerChoiceStrategy;constructor(e,r,t=u.ROUND_ROBIN){this.pool=e,this.createWorkerCallback=r,this.execute.bind(this),this.setWorkerChoiceStrategy(t)}getRequiredStatistics(){return this.workerChoiceStrategy.requiredStatistics}setWorkerChoiceStrategy(e){this.workerChoiceStrategy?.reset(),this.workerChoiceStrategy=function(e,r=u.ROUND_ROBIN){switch(r){case u.ROUND_ROBIN:return new d(e);case u.LESS_USED:return new p(e);case u.LESS_BUSY:return new m(e);case u.FAIR_SHARE:return new l(e);case u.WEIGHTED_ROUND_ROBIN:return new w(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}(this.pool,e)}execute(){return this.workerChoiceStrategy.isDynamicPool&&!this.pool.full&&-1===this.pool.findFreeWorkerKey()?this.createWorkerCallback():this.workerChoiceStrategy.choose()}remove(e){return this.workerChoiceStrategy.remove(e)}}class W{numberOfWorkers;filePath;opts;workers=[];emitter;promiseResponseMap=new Map;workerChoiceStrategyContext;constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.chooseWorker.bind(this),this.internalExecute.bind(this),this.checkAndEmitBusy.bind(this),this.sendToWorker.bind(this),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();!0===this.opts.enableEvents&&(this.emitter=new k),this.workerChoiceStrategyContext=new g(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{var t;t=h.HARD,(r.kill===t||0===this.getWorkerTasksUsage(e)?.running)&&this.destroyWorker(e)})),this.getWorkerKey(e)}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(null==e||"string"==typeof e&&0===e.trim().length)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(r){if(null==r)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!Number.isSafeInteger(r))throw new TypeError("Cannot instantiate a pool with a non integer number of workers");if(r<0)throw new RangeError("Cannot instantiate a pool with a negative number of workers");if(this.type===e.FIXED&&0===r)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){this.opts.workerChoiceStrategy=e.workerChoiceStrategy??u.ROUND_ROBIN,this.opts.enableEvents=e.enableEvents??!0}get numberOfRunningTasks(){return this.promiseResponseMap.size}getWorkerKey(e){return this.workers.findIndex((r=>r.worker===e))}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e;for(const[e,r]of this.workers.entries())this.setWorker(e,r.worker,{run:0,running:0,runTime:0,avgRunTime:0,error:0});this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalBusy(){return this.numberOfRunningTasks>=this.numberOfWorkers&&-1===this.findFreeWorkerKey()}findFreeWorkerKey(){return this.workers.findIndex((e=>0===e.tasksUsage.running))}async execute(e){const[r,s]=this.chooseWorker(),o=t.randomUUID(),i=this.internalExecute(r,s,o);return this.checkAndEmitBusy(),this.sendToWorker(s,{data:e??{},id:o}),i}async destroy(){await Promise.all(this.workers.map((async e=>{await this.destroyWorker(e.worker)})))}setupHook(){}beforePromiseResponseHook(e){++this.workers[e].tasksUsage.running}afterPromiseResponseHook(e,r){const t=this.getWorkerTasksUsage(e);--t.running,++t.run,null!=r.error&&++t.error,this.workerChoiceStrategyContext.getRequiredStatistics().runTime&&(t.runTime+=r.taskRunTime??0,this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime&&0!==t.run&&(t.avgRunTime=t.runTime/t.run))}removeWorker(e){const r=this.getWorkerKey(e);this.workers.splice(r,1),this.workerChoiceStrategyContext.remove(r)}chooseWorker(){const e=this.workerChoiceStrategyContext.execute();return[e,this.workers[e].worker]}createAndSetupWorker(){const e=this.createWorker();return e.on("message",this.opts.messageHandler??a),e.on("error",this.opts.errorHandler??a),e.on("online",this.opts.onlineHandler??a),e.on("exit",this.opts.exitHandler??a),e.once("exit",(()=>{this.removeWorker(e)})),this.pushWorker(e,{run:0,running:0,runTime:0,avgRunTime:0,error:0}),this.afterWorkerSetup(e),e}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseResponseMap.get(e.id);void 0!==r&&(null!=e.error?r.reject(e.error):r.resolve(e.data),this.afterPromiseResponseHook(r.worker,e),this.promiseResponseMap.delete(e.id))}}}async internalExecute(e,r,t){return this.beforePromiseResponseHook(e),await new Promise(((e,s)=>{this.promiseResponseMap.set(t,{resolve:e,reject:s,worker:r})}))}checkAndEmitBusy(){!0===this.opts.enableEvents&&this.busy&&this.emitter?.emit("busy")}getWorkerTasksUsage(e){const r=this.getWorkerKey(e);if(-1!==r)return this.workers[r].tasksUsage;throw new Error("Worker could not be found in the pool")}pushWorker(e,r){this.workers.push({worker:e,tasksUsage:r})}setWorker(e,r,t){this.workers[e]={worker:r,tasksUsage:t}}}class T extends W{opts;constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){r.setupPrimary({...this.opts.settings,exec:this.filePath})}isMain(){return r.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return r.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get full(){return this.workers.length===this.numberOfWorkers}get busy(){return this.internalBusy()}}class f extends W{constructor(e,r,t={}){super(e,r,t)}isMain(){return i.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){e.port2?.on("message",r)}createWorker(){return new i.Worker(this.filePath,{env:i.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:t}=new i.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=t,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return e.FIXED}get full(){return this.workers.length===this.numberOfWorkers}get busy(){return this.internalBusy()}}const y=6e4,R=h.SOFT;class x extends n.AsyncResource{isMain;mainWorker;lastTaskTimestamp;aliveInterval;opts;constructor(e,r,t,s,o={killBehavior:R,maxInactiveTime:y}){super(e),this.isMain=r,this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),this.isMain||(this.lastTaskTimestamp=Date.now(),this.aliveInterval=setInterval(this.checkAlive.bind(this),(this.opts.maxInactiveTime??y)/2),this.checkAlive.bind(this)()),this.mainWorker?.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?!0===this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(null!=this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){this.opts.killBehavior=e.killBehavior??R,this.opts.maxInactiveTime=e.maxInactiveTime??y,this.opts.async=e.async??!1}checkFunctionInput(e){if(null==e)throw new Error("fn parameter is mandatory");if("function"!=typeof e)throw new TypeError("fn parameter is not a function")}getMainWorker(){if(null==this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){Date.now()-this.lastTaskTimestamp>(this.opts.maxInactiveTime??y)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=Date.now(),s=e(r.data),o=Date.now()-t;this.sendToMainWorker({data:s,id:r.id,taskRunTime:o})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{!this.isMain&&(this.lastTaskTimestamp=Date.now())}}runAsync(e,r){const t=Date.now();e(r.data).then((e=>{const s=Date.now()-t;return this.sendToMainWorker({data:e,id:r.id,taskRunTime:s}),null})).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{!this.isMain&&(this.lastTaskTimestamp=Date.now())})).catch(a)}}exports.ClusterWorker=class extends x{constructor(e,t={}){super("worker-cluster-pool:poolifier",r.isPrimary,e,r.worker,t)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends T{max;constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get full(){return this.workers.length===this.max}get busy(){return this.full&&-1===this.findFreeWorkerKey()}},exports.DynamicThreadPool=class extends f{max;constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return e.DYNAMIC}get full(){return this.workers.length===this.max}get busy(){return this.full&&-1===this.findFreeWorkerKey()}},exports.FixedClusterPool=T,exports.FixedThreadPool=f,exports.KillBehaviors=h,exports.ThreadWorker=class extends x{constructor(e,r={}){super("worker-thread-pool:poolifier",i.isMainThread,e,i.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=u; |
@@ -52,5 +52,7 @@ import type { MessageValue, PromiseResponseWrapper } from '../utility-types'; | ||
abstract get type(): PoolType; | ||
/** {@inheritDoc} */ | ||
get numberOfRunningTasks(): number; | ||
/** | ||
* Number of tasks concurrently running. | ||
*/ | ||
private get numberOfRunningTasks(); | ||
/** | ||
* Gets the given worker key. | ||
@@ -65,4 +67,6 @@ * | ||
/** {@inheritDoc} */ | ||
abstract get full(): boolean; | ||
/** {@inheritDoc} */ | ||
abstract get busy(): boolean; | ||
protected internalGetBusyStatus(): boolean; | ||
protected internalBusy(): boolean; | ||
/** {@inheritDoc} */ | ||
@@ -69,0 +73,0 @@ findFreeWorkerKey(): number; |
@@ -8,3 +8,3 @@ import { PoolType } from '../pool-internal'; | ||
* This cluster pool creates new workers when the others are busy, up to the maximum number of workers. | ||
* When the maximum number of workers is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`. | ||
* When the maximum number of workers is reached and workers are busy, an event is emitted. If you want to listen to this event, use the pool's `emitter`. | ||
* | ||
@@ -17,3 +17,3 @@ * @typeParam Data - Type of data sent to the worker. This can only be serializable data. | ||
export declare class DynamicClusterPool<Data = unknown, Response = unknown> extends FixedClusterPool<Data, Response> { | ||
protected readonly max: number; | ||
private readonly max; | ||
/** | ||
@@ -31,3 +31,5 @@ * Constructs a new poolifier dynamic cluster pool. | ||
/** {@inheritDoc} */ | ||
get full(): boolean; | ||
/** {@inheritDoc} */ | ||
get busy(): boolean; | ||
} |
@@ -63,3 +63,5 @@ /// <reference types="node" /> | ||
/** {@inheritDoc} */ | ||
get full(): boolean; | ||
/** {@inheritDoc} */ | ||
get busy(): boolean; | ||
} |
@@ -23,3 +23,3 @@ import type { IPool } from './pool'; | ||
* | ||
* @typeParam Worker - Type of worker which manages this pool. | ||
* @typeParam Worker - Type of worker type items which manages this pool. | ||
*/ | ||
@@ -34,8 +34,8 @@ export interface WorkerType<Worker extends IPoolWorker> { | ||
* @typeParam Worker - Type of worker which manages this pool. | ||
* @typeParam Data - Type of data sent to the worker. | ||
* @typeParam Response - Type of response of execution. | ||
* @typeParam Data - Type of data sent to the worker. This can only be serializable data. | ||
* @typeParam Response - Type of response of execution. This can only be serializable data. | ||
*/ | ||
export interface IPoolInternal<Worker extends IPoolWorker, Data = unknown, Response = unknown> extends IPool<Data, Response> { | ||
/** | ||
* Pool workers item array. | ||
* Pool worker type items array. | ||
*/ | ||
@@ -50,2 +50,8 @@ readonly workers: Array<WorkerType<Worker>>; | ||
/** | ||
* Whether the pool is full or not. | ||
* | ||
* The pool filling boolean status. | ||
*/ | ||
readonly full: boolean; | ||
/** | ||
* Whether the pool is busy or not. | ||
@@ -57,6 +63,2 @@ * | ||
/** | ||
* Number of tasks currently concurrently running. | ||
*/ | ||
readonly numberOfRunningTasks: number; | ||
/** | ||
* Finds a free worker key based on the number of tasks the worker has applied. | ||
@@ -63,0 +65,0 @@ * |
@@ -5,3 +5,3 @@ import type { IPoolInternal } from '../pool-internal'; | ||
/** | ||
* Abstract worker choice strategy class. | ||
* Worker choice strategy abstract base class. | ||
* | ||
@@ -8,0 +8,0 @@ * @typeParam Worker - Type of worker which manages the strategy. |
@@ -31,3 +31,3 @@ /** | ||
/** | ||
* Pool tasks usage statistics requirements. | ||
* Pool worker tasks usage statistics requirements. | ||
*/ | ||
@@ -34,0 +34,0 @@ export interface RequiredStatistics { |
@@ -24,9 +24,2 @@ import type { IPoolInternal } from '../pool-internal'; | ||
/** | ||
* Gets the worker choice strategy instance specific to the pool type. | ||
* | ||
* @param workerChoiceStrategy - The worker choice strategy. | ||
* @returns The worker choice strategy instance for the pool type. | ||
*/ | ||
private getPoolWorkerChoiceStrategy; | ||
/** | ||
* Gets the worker choice strategy required statistics. | ||
@@ -44,3 +37,3 @@ * | ||
/** | ||
* Chooses a worker with the underlying selection strategy. | ||
* Chooses a worker with the worker choice strategy. | ||
* | ||
@@ -51,3 +44,3 @@ * @returns The key of the chosen one. | ||
/** | ||
* Removes a worker in the underlying selection strategy internals. | ||
* Removes a worker in the worker choice strategy internals. | ||
* | ||
@@ -54,0 +47,0 @@ * @param workerKey - The key of the worker to remove. |
@@ -9,3 +9,3 @@ import type { PoolOptions } from '../pool'; | ||
* This thread pool creates new threads when the others are busy, up to the maximum number of threads. | ||
* When the maximum number of threads is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`. | ||
* When the maximum number of threads is reached and workers are busy, an event is emitted. If you want to listen to this event, use the pool's `emitter`. | ||
* | ||
@@ -18,3 +18,3 @@ * @typeParam Data - Type of data sent to the worker. This can only be serializable data. | ||
export declare class DynamicThreadPool<Data = unknown, Response = unknown> extends FixedThreadPool<Data, Response> { | ||
protected readonly max: number; | ||
private readonly max; | ||
/** | ||
@@ -32,3 +32,5 @@ * Constructs a new poolifier dynamic thread pool. | ||
/** {@inheritDoc} */ | ||
get full(): boolean; | ||
/** {@inheritDoc} */ | ||
get busy(): boolean; | ||
} |
@@ -47,3 +47,5 @@ /// <reference types="node" /> | ||
/** {@inheritDoc} */ | ||
get full(): boolean; | ||
/** {@inheritDoc} */ | ||
get busy(): boolean; | ||
} |
@@ -18,2 +18,3 @@ /// <reference types="node" /> | ||
export declare abstract class AbstractWorker<MainWorker extends Worker | MessagePort, Data = unknown, Response = unknown> extends AsyncResource { | ||
protected readonly isMain: boolean; | ||
protected mainWorker: MainWorker | undefined | null; | ||
@@ -20,0 +21,0 @@ /** |
{ | ||
"name": "poolifier", | ||
"version": "2.4.0", | ||
"version": "2.4.1", | ||
"description": "A fast, easy to use Node.js Worker Thread Pool and Cluster Pool implementation", | ||
@@ -81,3 +81,3 @@ "license": "MIT", | ||
"@rollup/plugin-terser": "^0.4.0", | ||
"@rollup/plugin-typescript": "^11.0.0", | ||
"@rollup/plugin-typescript": "^11.1.0", | ||
"@types/node": "^18.15.11", | ||
@@ -84,0 +84,0 @@ "@typescript-eslint/eslint-plugin": "^5.57.1", |
@@ -168,3 +168,3 @@ <div align="center"> | ||
- `WorkerChoiceStrategies.LESS_BUSY`: Submit tasks to the less busy worker | ||
- `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN` Submit tasks to worker using a weighted round robin scheduling algorithm based on tasks execution time | ||
- `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker using a weighted round robin scheduling algorithm based on tasks execution time | ||
- `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker using a fair share tasks scheduling algorithm based on tasks execution time | ||
@@ -171,0 +171,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
103248
30
1270