Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

poolifier

Package Overview
Dependencies
Maintainers
1
Versions
192
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

poolifier - npm Package Compare versions

Comparing version 2.2.0 to 2.2.1

345

lib/index.d.ts
/// <reference types="node" />
import EventEmitter from "events";
import { Worker } from "cluster";
import { Worker as ClusterWorker } from "cluster";
import { MessagePort, MessageChannel } from "worker_threads";
import { Worker as Worker$0 } from "worker_threads";
import EventEmitter from "events";
import { AsyncResource } from "async_hooks";

@@ -62,3 +63,3 @@ /**

*/
interface MessageValue<Data = unknown, MainWorker extends Worker | MessagePort | unknown = unknown> {
interface MessageValue<Data = unknown, MainWorker extends ClusterWorker | MessagePort | unknown = unknown> {
/**

@@ -83,3 +84,3 @@ * Input data that will be passed to the worker.

*
* _Only for internal use_
* Only for internal use.
*/

@@ -94,3 +95,3 @@ readonly parent?: MainWorker;

*/
interface PromiseWorkerResponseWrapper<Worker extends IWorker, Response = unknown> {
interface PromiseWorkerResponseWrapper<Worker extends AbstractPoolWorker, Response = unknown> {
/**

@@ -110,2 +111,77 @@ * Resolve callback to fulfill the promise.

/**
* Callback invoked if the worker has received a message.
*/
type MessageHandler<Worker> = (this: Worker, m: unknown) => void;
/**
* Callback invoked if the worker raised an error.
*/
type ErrorHandler<Worker> = (this: Worker, e: Error) => void;
/**
* Callback invoked when the worker has started successfully.
*/
type OnlineHandler<Worker> = (this: Worker) => void;
/**
* Callback invoked when the worker exits successfully.
*/
type ExitHandler<Worker> = (this: Worker, code: number) => void;
/**
* Basic interface that describes the minimum required implementation of listener events for a pool worker.
*/
interface IPoolWorker {
/**
* Worker identifier.
*/
readonly id?: number;
/**
* Register a listener to the message event.
*
* @param event `'message'`.
* @param handler The message handler.
*/
on(event: "message", handler: MessageHandler<this>): void;
/**
* Register a listener to the error event.
*
* @param event `'error'`.
* @param handler The error handler.
*/
on(event: "error", handler: ErrorHandler<this>): void;
/**
* Register a listener to the online event.
*
* @param event `'online'`.
* @param handler The online handler.
*/
on(event: "online", handler: OnlineHandler<this>): void;
/**
* Register a listener to the exit event.
*
* @param event `'exit'`.
* @param handler The exit handler.
*/
on(event: "exit", handler: ExitHandler<this>): void;
/**
* Register a listener to the exit event that will only performed once.
*
* @param event `'exit'`.
* @param handler The exit handler.
*/
once(event: "exit", handler: ExitHandler<this>): void;
}
/**
* Basic class that implement the minimum required for a pool worker.
*/
declare abstract class AbstractPoolWorker implements IPoolWorker {
/** @inheritDoc */
abstract on(event: "message", handler: MessageHandler<this>): void;
/** @inheritDoc */
abstract on(event: "error", handler: ErrorHandler<this>): void;
/** @inheritDoc */
abstract on(event: "online", handler: OnlineHandler<this>): void;
/** @inheritDoc */
abstract on(event: "exit", handler: ExitHandler<this>): void;
/** @inheritDoc */
abstract once(event: "exit", handler: ExitHandler<this>): void;
}
/**
* Enumeration of worker choice strategies.

@@ -122,40 +198,31 @@ */

/**
* The worker choice strategy context.
*
* @template Worker Type of worker.
* @template Data Type of data sent to the worker. This can only be serializable data.
* @template Response Type of response of execution. This can only be serializable data.
* Options for a poolifier pool.
*/
declare class WorkerChoiceStrategyContext<Worker extends IWorker, Data, Response> {
private readonly pool;
private createDynamicallyWorkerCallback;
// Will be set by setter in constructor
private workerChoiceStrategy;
interface PoolOptions<Worker> {
/**
* Worker choice strategy context constructor.
*
* @param pool The pool instance.
* @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
* @param workerChoiceStrategy The worker choice strategy.
* A function that will listen for message event on each worker.
*/
constructor(pool: IPoolInternal<Worker, Data, Response>, createDynamicallyWorkerCallback: () => Worker, workerChoiceStrategy?: WorkerChoiceStrategy);
messageHandler?: MessageHandler<Worker>;
/**
* Get the worker choice strategy instance specific to the pool type.
*
* @param workerChoiceStrategy The worker choice strategy.
* @returns The worker choice strategy instance for the pool type.
* A function that will listen for error event on each worker.
*/
private getPoolWorkerChoiceStrategy;
errorHandler?: ErrorHandler<Worker>;
/**
* Set the worker choice strategy to use in the context.
*
* @param workerChoiceStrategy The worker choice strategy to set.
* A function that will listen for online event on each worker.
*/
setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy): void;
onlineHandler?: OnlineHandler<Worker>;
/**
* Choose a worker with the underlying selection strategy.
* A function that will listen for exit event on each worker.
*/
exitHandler?: ExitHandler<Worker>;
/**
* The work choice strategy to use in this pool.
*/
workerChoiceStrategy?: WorkerChoiceStrategy;
/**
* Pool events emission.
*
* @returns The chosen one.
* @default true
*/
execute(): Worker;
enableEvents?: boolean;
}

@@ -206,3 +273,3 @@ /**

*/
interface IPoolInternal<Worker extends IWorker, Data = unknown, Response = unknown> extends IPool<Data, Response> {
interface IPoolInternal<Worker extends AbstractPoolWorker, Data = unknown, Response = unknown> extends IPool<Data, Response> {
/**

@@ -248,101 +315,64 @@ * List of currently available workers.

/**
* Find a tasks map entry with a free worker based on the number of tasks the worker has applied.
* Find a free worker based on the number of tasks the worker has applied.
*
* If an entry is found with a worker that has `0` tasks, it is detected as free.
* If a worker is found with `0` running tasks, it is detected as free and returned.
*
* If no tasks map entry with a free worker was found, `false` will be returned.
* If no free worker is found, `false` is returned.
*
* @returns A tasks map entry with a free worker if there was one, otherwise `false`.
* @returns A free worker if there is one, otherwise `false`.
*/
findFreeTasksMapEntry(): [
Worker,
number
] | false;
}
/**
* Callback invoked if the worker has received a message.
*/
type MessageHandler<Worker> = (this: Worker, m: unknown) => void;
/**
* Callback invoked if the worker raised an error.
*/
type ErrorHandler<Worker> = (this: Worker, e: Error) => void;
/**
* Callback invoked when the worker has started successfully.
*/
type OnlineHandler<Worker> = (this: Worker) => void;
/**
* Callback invoked when the worker exits successfully.
*/
type ExitHandler<Worker> = (this: Worker, code: number) => void;
/**
* Basic interface that describes the minimum required implementation of listener events for a pool-worker.
*/
interface IWorker {
findFreeWorker(): Worker | false;
/**
* Register a listener to the message event.
* Get worker index.
*
* @param event `'message'`.
* @param handler The message handler.
* @param worker The worker.
* @returns The worker index.
*/
on(event: "message", handler: MessageHandler<this>): void;
getWorkerIndex(worker: Worker): number;
/**
* Register a listener to the error event.
* Get worker running tasks.
*
* @param event `'error'`.
* @param handler The error handler.
* @param worker The worker.
* @returns The number of tasks currently running on the worker.
*/
on(event: "error", handler: ErrorHandler<this>): void;
getWorkerRunningTasks(worker: Worker): number | undefined;
}
/**
* The worker choice strategy context.
*
* @template Worker Type of worker.
* @template Data Type of data sent to the worker. This can only be serializable data.
* @template Response Type of response of execution. This can only be serializable data.
*/
declare class WorkerChoiceStrategyContext<Worker extends AbstractPoolWorker, Data, Response> {
private readonly pool;
private createDynamicallyWorkerCallback;
private workerChoiceStrategy;
/**
* Register a listener to the online event.
* Worker choice strategy context constructor.
*
* @param event `'online'`.
* @param handler The online handler.
* @param pool The pool instance.
* @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
* @param workerChoiceStrategy The worker choice strategy.
*/
on(event: "online", handler: OnlineHandler<this>): void;
constructor(pool: IPoolInternal<Worker, Data, Response>, createDynamicallyWorkerCallback: () => Worker, workerChoiceStrategy?: WorkerChoiceStrategy);
/**
* Register a listener to the exit event.
* Get the worker choice strategy instance specific to the pool type.
*
* @param event `'exit'`.
* @param handler The exit handler.
* @param workerChoiceStrategy The worker choice strategy.
* @returns The worker choice strategy instance for the pool type.
*/
on(event: "exit", handler: ExitHandler<this>): void;
private getPoolWorkerChoiceStrategy;
/**
* Register a listener to the exit event that will only performed once.
* Set the worker choice strategy to use in the context.
*
* @param event `'exit'`.
* @param handler The exit handler.
* @param workerChoiceStrategy The worker choice strategy to set.
*/
once(event: "exit", handler: ExitHandler<this>): void;
}
/**
* Options for a poolifier pool.
*/
interface PoolOptions<Worker> {
setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy): void;
/**
* A function that will listen for message event on each worker.
*/
messageHandler?: MessageHandler<Worker>;
/**
* A function that will listen for error event on each worker.
*/
errorHandler?: ErrorHandler<Worker>;
/**
* A function that will listen for online event on each worker.
*/
onlineHandler?: OnlineHandler<Worker>;
/**
* A function that will listen for exit event on each worker.
*/
exitHandler?: ExitHandler<Worker>;
/**
* The work choice strategy to use in this pool.
*/
workerChoiceStrategy?: WorkerChoiceStrategy;
/**
* Pool events emission.
* Choose a worker with the underlying selection strategy.
*
* @default true
* @returns The chosen one.
*/
enableEvents?: boolean;
execute(): Worker;
}

@@ -356,13 +386,13 @@ /**

*/
declare abstract class AbstractPool<Worker extends IWorker, Data = unknown, Response = unknown> implements IPoolInternal<Worker, Data, Response> {
declare abstract class AbstractPool<Worker extends AbstractPoolWorker, Data = unknown, Response = unknown> implements IPoolInternal<Worker, Data, Response> {
readonly numberOfWorkers: number;
readonly filePath: string;
readonly opts: PoolOptions<Worker>;
/** @inheritdoc */
/** @inheritDoc */
readonly workers: Worker[];
/** @inheritdoc */
/** @inheritDoc */
readonly tasks: Map<Worker, number>;
/** @inheritdoc */
/** @inheritDoc */
readonly emitter?: PoolEmitter;
/** @inheritdoc */
/** @inheritDoc */
readonly max?: number;

@@ -399,19 +429,20 @@ /**

private checkPoolOptions;
/** @inheritdoc */
/** @inheritDoc */
abstract get type(): PoolType;
/** @inheritdoc */
/** @inheritDoc */
get numberOfRunningTasks(): number;
/** @inheritdoc */
/** @inheritDoc */
getWorkerRunningTasks(worker: Worker): number | undefined;
/** @inheritDoc */
getWorkerIndex(worker: Worker): number;
/** @inheritDoc */
setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy): void;
/** @inheritdoc */
/** @inheritDoc */
abstract get busy(): boolean;
protected internalGetBusyStatus(): boolean;
/** @inheritdoc */
findFreeTasksMapEntry(): [
Worker,
number
] | false;
/** @inheritdoc */
/** @inheritDoc */
findFreeWorker(): Worker | false;
/** @inheritDoc */
execute(data: Data): Promise<Response>;
/** @inheritdoc */
/** @inheritDoc */
destroy(): Promise<void>;

@@ -502,3 +533,3 @@ /**

*
* @returns The listener function to execute when a message is sent from a worker.
* @returns The listener function to execute when a message is received from a worker.
*/

@@ -542,19 +573,19 @@ protected workerListener(): (message: MessageValue<Response>) => void;

constructor(numberOfWorkers: number, filePath: string, opts?: ClusterPoolOptions);
/** @inheritdoc */
/** @inheritDoc */
protected setupHook(): void;
/** @inheritdoc */
/** @inheritDoc */
protected isMain(): boolean;
/** @inheritdoc */
/** @inheritDoc */
destroyWorker(worker: Worker): void;
/** @inheritdoc */
/** @inheritDoc */
protected sendToWorker(worker: Worker, message: MessageValue<Data>): void;
/** @inheritdoc */
/** @inheritDoc */
registerWorkerMessageListener<Message extends Data | Response>(worker: Worker, listener: (message: MessageValue<Message>) => void): void;
/** @inheritdoc */
/** @inheritDoc */
protected createWorker(): Worker;
/** @inheritdoc */
/** @inheritDoc */
protected afterWorkerSetup(worker: Worker): void;
/** @inheritdoc */
/** @inheritDoc */
get type(): PoolType;
/** @inheritdoc */
/** @inheritDoc */
get busy(): boolean;

@@ -584,5 +615,5 @@ }

constructor(min: number, max: number, filePath: string, opts?: ClusterPoolOptions);
/** @inheritdoc */
/** @inheritDoc */
get type(): PoolType;
/** @inheritdoc */
/** @inheritDoc */
get busy(): boolean;

@@ -615,17 +646,17 @@ }

constructor(numberOfThreads: number, filePath: string, opts?: PoolOptions<ThreadWorkerWithMessageChannel>);
/** @inheritdoc */
/** @inheritDoc */
protected isMain(): boolean;
/** @inheritdoc */
/** @inheritDoc */
destroyWorker(worker: ThreadWorkerWithMessageChannel): Promise<void>;
/** @inheritdoc */
/** @inheritDoc */
protected sendToWorker(worker: ThreadWorkerWithMessageChannel, message: MessageValue<Data>): void;
/** @inheritdoc */
/** @inheritDoc */
registerWorkerMessageListener<Message extends Data | Response>(messageChannel: ThreadWorkerWithMessageChannel, listener: (message: MessageValue<Message>) => void): void;
/** @inheritdoc */
/** @inheritDoc */
protected createWorker(): ThreadWorkerWithMessageChannel;
/** @inheritdoc */
/** @inheritDoc */
protected afterWorkerSetup(worker: ThreadWorkerWithMessageChannel): void;
/** @inheritdoc */
/** @inheritDoc */
get type(): PoolType;
/** @inheritdoc */
/** @inheritDoc */
get busy(): boolean;

@@ -655,5 +686,5 @@ }

constructor(min: number, max: number, filePath: string, opts?: PoolOptions<ThreadWorkerWithMessageChannel>);
/** @inheritdoc */
/** @inheritDoc */
get type(): PoolType;
/** @inheritdoc */
/** @inheritDoc */
get busy(): boolean;

@@ -670,3 +701,2 @@ }

protected mainWorker: MainWorker | undefined | null;
readonly opts: WorkerOptions;
/**

@@ -681,2 +711,6 @@ * Timestamp of the last task processed by this worker.

/**
* Options for the worker.
*/
readonly opts: WorkerOptions;
/**
* Constructs a new poolifier worker.

@@ -691,2 +725,3 @@ *

constructor(type: string, isMain: boolean, fn: (data: Data) => Response, mainWorker: MainWorker | undefined | null, opts?: WorkerOptions);
protected messageListener(value: MessageValue<Data, MainWorker>, fn: (data: Data) => Response): void;
private checkWorkerOptions;

@@ -751,3 +786,3 @@ /**

*/
declare class ClusterWorker<Data = unknown, Response = unknown> extends AbstractWorker<Worker, Data, Response> {
declare class ClusterWorker$0<Data = unknown, Response = unknown> extends AbstractWorker<Worker, Data, Response> {
/**

@@ -760,5 +795,5 @@ * Constructs a new poolifier cluster worker.

constructor(fn: (data: Data) => Response, opts?: WorkerOptions);
/** @inheritdoc */
/** @inheritDoc */
protected sendToMainWorker(message: MessageValue<Response>): void;
/** @inheritdoc */
/** @inheritDoc */
protected handleError(e: Error | string): string;

@@ -788,6 +823,6 @@ }

constructor(fn: (data: Data) => Response, opts?: WorkerOptions);
/** @inheritdoc */
/** @inheritDoc */
protected sendToMainWorker(message: MessageValue<Response>): void;
}
export type { ErrorHandler, ExitHandler, IWorker, OnlineHandler, PoolOptions, ClusterPoolOptions, IPool, WorkerChoiceStrategy, ThreadWorkerWithMessageChannel, KillBehavior, WorkerOptions };
export { DynamicClusterPool, FixedClusterPool, WorkerChoiceStrategies, DynamicThreadPool, FixedThreadPool, AbstractWorker, ClusterWorker, ThreadWorker, KillBehaviors };
export { DynamicClusterPool, FixedClusterPool, WorkerChoiceStrategies, DynamicThreadPool, FixedThreadPool, AbstractWorker, ClusterWorker$0 as ClusterWorker, ThreadWorker, KillBehaviors };
export type { ClusterPoolOptions, IPool, PoolOptions, ErrorHandler, ExitHandler, IPoolWorker, OnlineHandler, WorkerChoiceStrategy, ThreadWorkerWithMessageChannel, KillBehavior, WorkerOptions };

@@ -1,1 +0,1 @@

"use strict";Object.defineProperty(exports,"__esModule",{value:!0});var e=require("events"),t=require("cluster"),r=require("worker_threads"),s=require("async_hooks");function o(e){return e&&"object"==typeof e&&"default"in e?e:{default:e}}var i,n=o(e),a=o(t);!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(i||(i={}));class h extends n.default{}const c=()=>{},l=Object.freeze({SOFT:"SOFT",HARD:"HARD"});const k=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_RECENTLY_USED:"LESS_RECENTLY_USED"});class u{constructor(e){this.pool=e,this.nextWorkerIndex=0}choose(){const e=this.pool.workers[this.nextWorkerIndex];return this.nextWorkerIndex=this.pool.workers.length-1===this.nextWorkerIndex?0:this.nextWorkerIndex+1,e}}class p{constructor(e){this.pool=e}choose(){const e=this.pool.type===i.DYNAMIC;let t,r=1/0;for(const[s,o]of this.pool.tasks){if(!e&&0===o)return s;o<r&&(t=s,r=o)}return t}}class d{constructor(e,t,r=k.ROUND_ROBIN){this.pool=e,this.createDynamicallyWorkerCallback=t,this.workerChoiceStrategy=W.getWorkerChoiceStrategy(this.pool,r)}choose(){const e=this.pool.findFreeTasksMapEntry();return e?e[0]:this.pool.busy?this.workerChoiceStrategy.choose():this.createDynamicallyWorkerCallback()}}class y{constructor(e,t,r=k.ROUND_ROBIN){this.pool=e,this.createDynamicallyWorkerCallback=t,this.setWorkerChoiceStrategy(r)}getPoolWorkerChoiceStrategy(e=k.ROUND_ROBIN){return this.pool.type===i.DYNAMIC?new d(this.pool,this.createDynamicallyWorkerCallback,e):W.getWorkerChoiceStrategy(this.pool,e)}setWorkerChoiceStrategy(e){this.workerChoiceStrategy=this.getPoolWorkerChoiceStrategy(e)}execute(){return this.workerChoiceStrategy.choose()}}class W{static getWorkerChoiceStrategy(e,t=k.ROUND_ROBIN){switch(t){case k.ROUND_ROBIN:return new u(e);case k.LESS_RECENTLY_USED:return new p(e);default:throw new Error(`Worker choice strategy '${t}' not found`)}}}class m{constructor(e,t,r){if(this.numberOfWorkers=e,this.filePath=t,this.opts=r,this.workers=[],this.tasks=new Map,this.promiseMap=new Map,this.nextMessageId=0,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();this.opts.enableEvents&&(this.emitter=new h),this.workerChoiceStrategyContext=new y(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(async t=>{const r=this.tasks.get(e);var s;s=l.HARD,(t.kill===s||0===r)&&await this.destroyWorker(e)})),e}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(!e)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(e){if(null==e)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!Number.isSafeInteger(e))throw new Error("Cannot instantiate a pool with a non integer number of workers");if(e<0)throw new Error("Cannot instantiate a pool with a negative number of workers");if(this.type===i.FIXED&&0===e)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){var t,r;this.opts.workerChoiceStrategy=null!==(t=e.workerChoiceStrategy)&&void 0!==t?t:k.ROUND_ROBIN,this.opts.enableEvents=null===(r=e.enableEvents)||void 0===r||r}get numberOfRunningTasks(){return this.promiseMap.size}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e,this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalGetBusyStatus(){return this.numberOfRunningTasks>=this.numberOfWorkers&&!1===this.findFreeTasksMapEntry()}findFreeTasksMapEntry(){for(const[e,t]of this.tasks)if(0===t)return[e,t];return!1}execute(e){const t=this.chooseWorker(),r=++this.nextMessageId,s=this.internalExecute(t,r);return this.checkAndEmitBusy(),this.sendToWorker(t,{data:e||{},id:r}),s}async destroy(){await Promise.all(this.workers.map((e=>this.destroyWorker(e))))}setupHook(){}increaseWorkersTask(e){this.stepWorkerNumberOfTasks(e,1)}decreaseWorkersTasks(e){this.stepWorkerNumberOfTasks(e,-1)}stepWorkerNumberOfTasks(e,t){const r=this.tasks.get(e);if(void 0===r)throw Error("Worker could not be found in tasks map");this.tasks.set(e,r+t)}removeWorker(e){const t=this.workers.indexOf(e);this.workers.splice(t,1),this.tasks.delete(e)}chooseWorker(){return this.workerChoiceStrategyContext.execute()}internalExecute(e,t){return this.increaseWorkersTask(e),new Promise(((r,s)=>{this.promiseMap.set(t,{resolve:r,reject:s,worker:e})}))}createAndSetupWorker(){var e,t,r,s;const o=this.createWorker();return o.on("message",null!==(e=this.opts.messageHandler)&&void 0!==e?e:c),o.on("error",null!==(t=this.opts.errorHandler)&&void 0!==t?t:c),o.on("online",null!==(r=this.opts.onlineHandler)&&void 0!==r?r:c),o.on("exit",null!==(s=this.opts.exitHandler)&&void 0!==s?s:c),o.once("exit",(()=>this.removeWorker(o))),this.workers.push(o),this.tasks.set(o,0),this.afterWorkerSetup(o),o}workerListener(){return e=>{if(e.id){const t=this.promiseMap.get(e.id);t&&(this.decreaseWorkersTasks(t.worker),e.error?t.reject(e.error):t.resolve(e.data),this.promiseMap.delete(e.id))}}}checkAndEmitBusy(){var e;this.opts.enableEvents&&this.busy&&(null===(e=this.emitter)||void 0===e||e.emit("busy"))}}class w extends m{constructor(e,t,r={}){super(e,t,r),this.opts=r}setupHook(){a.default.setupPrimary({exec:this.filePath})}isMain(){return a.default.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,t){e.send(t)}registerWorkerMessageListener(e,t){e.on("message",t)}createWorker(){return a.default.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return i.FIXED}get busy(){return this.internalGetBusyStatus()}}class f extends m{constructor(e,t,r={}){super(e,t,r)}isMain(){return r.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,t){e.postMessage(t)}registerWorkerMessageListener(e,t){var r;null===(r=e.port2)||void 0===r||r.on("message",t)}createWorker(){return new r.Worker(this.filePath,{env:r.SHARE_ENV})}afterWorkerSetup(e){const{port1:t,port2:s}=new r.MessageChannel;e.postMessage({parent:t},[t]),e.port1=t,e.port2=s,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return i.FIXED}get busy(){return this.internalGetBusyStatus()}}const g=l.SOFT;class v extends s.AsyncResource{constructor(e,t,r,s,o={killBehavior:g,maxInactiveTime:6e4}){var i,n;super(e),this.mainWorker=s,this.opts=o,this.checkFunctionInput(r),this.checkWorkerOptions(this.opts),this.lastTaskTimestamp=Date.now(),t||(this.aliveInterval=setInterval(this.checkAlive.bind(this),(null!==(i=this.opts.maxInactiveTime)&&void 0!==i?i:6e4)/2),this.checkAlive.bind(this)()),null===(n=this.mainWorker)||void 0===n||n.on("message",(e=>{(null==e?void 0:e.data)&&e.id?this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):e.parent?this.mainWorker=e.parent:e.kill&&(this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}))}checkWorkerOptions(e){var t,r;this.opts.killBehavior=null!==(t=e.killBehavior)&&void 0!==t?t:g,this.opts.maxInactiveTime=null!==(r=e.maxInactiveTime)&&void 0!==r?r:6e4,this.opts.async=!!e.async}checkFunctionInput(e){if(!e)throw new Error("fn parameter is mandatory")}getMainWorker(){if(!this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){var e;Date.now()-this.lastTaskTimestamp>(null!==(e=this.opts.maxInactiveTime)&&void 0!==e?e:6e4)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,t){try{const r=e(t.data);this.sendToMainWorker({data:r,id:t.id})}catch(e){const r=this.handleError(e);this.sendToMainWorker({error:r,id:t.id})}finally{this.lastTaskTimestamp=Date.now()}}runAsync(e,t){e(t.data).then((e=>(this.sendToMainWorker({data:e,id:t.id}),null))).catch((e=>{const r=this.handleError(e);this.sendToMainWorker({error:r,id:t.id})})).finally((()=>{this.lastTaskTimestamp=Date.now()})).catch(c)}}exports.AbstractWorker=v,exports.ClusterWorker=class extends v{constructor(e,t={}){super("worker-cluster-pool:poolifier",a.default.isPrimary,e,a.default.worker,t)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends w{constructor(e,t,r,s={}){super(e,r,s),this.max=t}get type(){return i.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.DynamicThreadPool=class extends f{constructor(e,t,r,s={}){super(e,r,s),this.max=t}get type(){return i.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.FixedClusterPool=w,exports.FixedThreadPool=f,exports.KillBehaviors=l,exports.ThreadWorker=class extends v{constructor(e,t={}){super("worker-thread-pool:poolifier",r.isMainThread,e,r.parentPort,t)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=k;
"use strict";Object.defineProperty(exports,"__esModule",{value:!0});var e=require("events"),r=require("cluster"),t=require("worker_threads"),s=require("async_hooks");function o(e){return e&&"object"==typeof e&&"default"in e?e:{default:e}}var i,n=o(e),a=o(r);!function(e){e.FIXED="fixed",e.DYNAMIC="dynamic"}(i||(i={}));class h extends n.default{}const c=()=>{},l=Object.freeze({SOFT:"SOFT",HARD:"HARD"});const k=Object.freeze({ROUND_ROBIN:"ROUND_ROBIN",LESS_RECENTLY_USED:"LESS_RECENTLY_USED"});class u{constructor(e){this.pool=e,this.isDynamicPool=this.pool.type===i.DYNAMIC}}class p extends u{choose(){let e,r=1/0;for(const t of this.pool.workers){const s=this.pool.getWorkerRunningTasks(t);if(!1===this.isDynamicPool&&0===s)return t;s<r&&(e=t,r=s)}return e}}class d extends u{constructor(){super(...arguments),this.nextWorkerIndex=0}choose(){const e=this.pool.workers[this.nextWorkerIndex];return this.nextWorkerIndex=this.nextWorkerIndex===this.pool.workers.length-1?0:this.nextWorkerIndex+1,e}}class W{static getWorkerChoiceStrategy(e,r=k.ROUND_ROBIN){switch(r){case k.ROUND_ROBIN:return new d(e);case k.LESS_RECENTLY_USED:return new p(e);default:throw new Error(`Worker choice strategy '${r}' not found`)}}}class g extends u{constructor(e,r,t=k.ROUND_ROBIN){super(e),this.createDynamicallyWorkerCallback=r,this.workerChoiceStrategy=W.getWorkerChoiceStrategy(this.pool,t)}choose(){const e=this.pool.findFreeWorker();return e||(this.pool.busy?this.workerChoiceStrategy.choose():this.createDynamicallyWorkerCallback())}}class m{constructor(e,r,t=k.ROUND_ROBIN){this.pool=e,this.createDynamicallyWorkerCallback=r,this.setWorkerChoiceStrategy(t)}getPoolWorkerChoiceStrategy(e=k.ROUND_ROBIN){return this.pool.type===i.DYNAMIC?new g(this.pool,this.createDynamicallyWorkerCallback,e):W.getWorkerChoiceStrategy(this.pool,e)}setWorkerChoiceStrategy(e){this.workerChoiceStrategy=this.getPoolWorkerChoiceStrategy(e)}execute(){return this.workerChoiceStrategy.choose()}}class y{constructor(e,r,t){if(this.numberOfWorkers=e,this.filePath=r,this.opts=t,this.workers=[],this.tasks=new Map,this.promiseMap=new Map,this.nextMessageId=0,!this.isMain())throw new Error("Cannot start a pool from a worker!");this.checkNumberOfWorkers(this.numberOfWorkers),this.checkFilePath(this.filePath),this.checkPoolOptions(this.opts),this.setupHook();for(let e=1;e<=this.numberOfWorkers;e++)this.createAndSetupWorker();this.opts.enableEvents&&(this.emitter=new h),this.workerChoiceStrategyContext=new m(this,(()=>{const e=this.createAndSetupWorker();return this.registerWorkerMessageListener(e,(r=>{var t;t=l.HARD,(r.kill===t||0===this.getWorkerRunningTasks(e))&&this.destroyWorker(e)})),e}),this.opts.workerChoiceStrategy)}checkFilePath(e){if(!e)throw new Error("Please specify a file with a worker implementation")}checkNumberOfWorkers(e){if(null==e)throw new Error("Cannot instantiate a pool without specifying the number of workers");if(!Number.isSafeInteger(e))throw new Error("Cannot instantiate a pool with a non integer number of workers");if(e<0)throw new Error("Cannot instantiate a pool with a negative number of workers");if(this.type===i.FIXED&&0===e)throw new Error("Cannot instantiate a fixed pool with no worker")}checkPoolOptions(e){var r,t;this.opts.workerChoiceStrategy=null!==(r=e.workerChoiceStrategy)&&void 0!==r?r:k.ROUND_ROBIN,this.opts.enableEvents=null===(t=e.enableEvents)||void 0===t||t}get numberOfRunningTasks(){return this.promiseMap.size}getWorkerRunningTasks(e){return this.tasks.get(e)}getWorkerIndex(e){return this.workers.indexOf(e)}setWorkerChoiceStrategy(e){this.opts.workerChoiceStrategy=e,this.workerChoiceStrategyContext.setWorkerChoiceStrategy(e)}internalGetBusyStatus(){return this.numberOfRunningTasks>=this.numberOfWorkers&&!1===this.findFreeWorker()}findFreeWorker(){for(const e of this.workers)if(0===this.getWorkerRunningTasks(e))return e;return!1}execute(e){const r=this.chooseWorker(),t=++this.nextMessageId,s=this.internalExecute(r,t);return this.checkAndEmitBusy(),e=null!=e?e:{},this.sendToWorker(r,{data:e,id:t}),s}async destroy(){await Promise.all(this.workers.map((e=>this.destroyWorker(e))))}setupHook(){}increaseWorkersTask(e){this.stepWorkerNumberOfTasks(e,1)}decreaseWorkersTasks(e){this.stepWorkerNumberOfTasks(e,-1)}stepWorkerNumberOfTasks(e,r){const t=this.tasks.get(e);if(void 0===t)throw Error("Worker could not be found in tasks map");this.tasks.set(e,t+r)}removeWorker(e){this.workers.splice(this.getWorkerIndex(e),1),this.tasks.delete(e)}chooseWorker(){return this.workerChoiceStrategyContext.execute()}internalExecute(e,r){return this.increaseWorkersTask(e),new Promise(((t,s)=>{this.promiseMap.set(r,{resolve:t,reject:s,worker:e})}))}createAndSetupWorker(){var e,r,t,s;const o=this.createWorker();return o.on("message",null!==(e=this.opts.messageHandler)&&void 0!==e?e:c),o.on("error",null!==(r=this.opts.errorHandler)&&void 0!==r?r:c),o.on("online",null!==(t=this.opts.onlineHandler)&&void 0!==t?t:c),o.on("exit",null!==(s=this.opts.exitHandler)&&void 0!==s?s:c),o.once("exit",(()=>this.removeWorker(o))),this.workers.push(o),this.tasks.set(o,0),this.afterWorkerSetup(o),o}workerListener(){return e=>{if(void 0!==e.id){const r=this.promiseMap.get(e.id);void 0!==r&&(this.decreaseWorkersTasks(r.worker),e.error?r.reject(e.error):r.resolve(e.data),this.promiseMap.delete(e.id))}}}checkAndEmitBusy(){var e;this.opts.enableEvents&&this.busy&&(null===(e=this.emitter)||void 0===e||e.emit("busy"))}}class w extends y{constructor(e,r,t={}){super(e,r,t),this.opts=t}setupHook(){a.default.setupPrimary({exec:this.filePath})}isMain(){return a.default.isPrimary}destroyWorker(e){this.sendToWorker(e,{kill:1}),e.kill()}sendToWorker(e,r){e.send(r)}registerWorkerMessageListener(e,r){e.on("message",r)}createWorker(){return a.default.fork(this.opts.env)}afterWorkerSetup(e){this.registerWorkerMessageListener(e,super.workerListener())}get type(){return i.FIXED}get busy(){return this.internalGetBusyStatus()}}class f extends y{constructor(e,r,t={}){super(e,r,t)}isMain(){return t.isMainThread}async destroyWorker(e){this.sendToWorker(e,{kill:1}),await e.terminate()}sendToWorker(e,r){e.postMessage(r)}registerWorkerMessageListener(e,r){var t;null===(t=e.port2)||void 0===t||t.on("message",r)}createWorker(){return new t.Worker(this.filePath,{env:t.SHARE_ENV})}afterWorkerSetup(e){const{port1:r,port2:s}=new t.MessageChannel;e.postMessage({parent:r},[r]),e.port1=r,e.port2=s,this.registerWorkerMessageListener(e,super.workerListener())}get type(){return i.FIXED}get busy(){return this.internalGetBusyStatus()}}const v=l.SOFT;class x extends s.AsyncResource{constructor(e,r,t,s,o={killBehavior:v,maxInactiveTime:6e4}){var i,n;super(e),this.mainWorker=s,this.opts=o,this.checkFunctionInput(t),this.checkWorkerOptions(this.opts),this.lastTaskTimestamp=Date.now(),r||(this.aliveInterval=setInterval(this.checkAlive.bind(this),(null!==(i=this.opts.maxInactiveTime)&&void 0!==i?i:6e4)/2),this.checkAlive.bind(this)()),null===(n=this.mainWorker)||void 0===n||n.on("message",(e=>{this.messageListener(e,t)}))}messageListener(e,r){void 0!==e.data&&void 0!==e.id?this.opts.async?this.runInAsyncScope(this.runAsync.bind(this),this,r,e):this.runInAsyncScope(this.run.bind(this),this,r,e):void 0!==e.parent?this.mainWorker=e.parent:void 0!==e.kill&&(this.aliveInterval&&clearInterval(this.aliveInterval),this.emitDestroy())}checkWorkerOptions(e){var r,t;this.opts.killBehavior=null!==(r=e.killBehavior)&&void 0!==r?r:v,this.opts.maxInactiveTime=null!==(t=e.maxInactiveTime)&&void 0!==t?t:6e4,this.opts.async=!!e.async}checkFunctionInput(e){if(!e)throw new Error("fn parameter is mandatory")}getMainWorker(){if(!this.mainWorker)throw new Error("Main worker was not set");return this.mainWorker}checkAlive(){var e;Date.now()-this.lastTaskTimestamp>(null!==(e=this.opts.maxInactiveTime)&&void 0!==e?e:6e4)&&this.sendToMainWorker({kill:this.opts.killBehavior})}handleError(e){return e}run(e,r){try{const t=e(r.data);this.sendToMainWorker({data:t,id:r.id})}catch(e){const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})}finally{this.lastTaskTimestamp=Date.now()}}runAsync(e,r){e(r.data).then((e=>(this.sendToMainWorker({data:e,id:r.id}),null))).catch((e=>{const t=this.handleError(e);this.sendToMainWorker({error:t,id:r.id})})).finally((()=>{this.lastTaskTimestamp=Date.now()})).catch(c)}}exports.AbstractWorker=x,exports.ClusterWorker=class extends x{constructor(e,r={}){super("worker-cluster-pool:poolifier",a.default.isPrimary,e,a.default.worker,r)}sendToMainWorker(e){this.getMainWorker().send(e)}handleError(e){return e instanceof Error?e.message:e}},exports.DynamicClusterPool=class extends w{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return i.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.DynamicThreadPool=class extends f{constructor(e,r,t,s={}){super(e,t,s),this.max=r}get type(){return i.DYNAMIC}get busy(){return this.workers.length===this.max}},exports.FixedClusterPool=w,exports.FixedThreadPool=f,exports.KillBehaviors=l,exports.ThreadWorker=class extends x{constructor(e,r={}){super("worker-thread-pool:poolifier",t.isMainThread,e,t.parentPort,r)}sendToMainWorker(e){this.getMainWorker().postMessage(e)}},exports.WorkerChoiceStrategies=k;
{
"name": "poolifier",
"version": "2.2.0",
"version": "2.2.1",
"description": "A fast, easy to use Node.js Worker Thread Pool and Cluster Pool implementation",

@@ -19,5 +19,6 @@ "main": "lib/index.js",

"format": "prettier --loglevel silent --write .; prettierx --write .",
"lint": "eslint .",
"lint:fix": "eslint . --fix",
"lint": "eslint . --cache",
"lint:fix": "eslint . --cache --fix",
"typedoc": "typedoc",
"sonar:properties": "./updateSonarProps.sh",
"prepublishOnly": "npm run build:prod"

@@ -66,24 +67,26 @@ },

"devDependencies": {
"@types/node": "^16.11.19",
"@typescript-eslint/eslint-plugin": "^5.9.0",
"@typescript-eslint/parser": "^5.9.0",
"@types/node": "^18.8.3",
"@typescript-eslint/eslint-plugin": "^5.39.0",
"@typescript-eslint/parser": "^5.39.0",
"benchmark": "^2.1.4",
"eslint": "^8.6.0",
"eslint-config-standard": "^16.0.3",
"eslint-define-config": "^1.2.1",
"eslint-plugin-import": "^2.25.4",
"eslint-plugin-jsdoc": "^37.5.1",
"eslint": "^8.25.0",
"eslint-config-standard": "^17.0.0",
"eslint-define-config": "^1.7.0",
"eslint-import-resolver-typescript": "^3.5.1",
"eslint-plugin-import": "^2.26.0",
"eslint-plugin-jsdoc": "^39.3.6",
"eslint-plugin-n": "^15.3.0",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prettierx": "^0.18.0",
"eslint-plugin-promise": "^6.0.0",
"eslint-plugin-spellcheck": "0.0.19",
"expect": "^27.4.6",
"microtime": "^3.0.0",
"mocha": "^9.1.3",
"mochawesome": "^7.0.1",
"eslint-plugin-promise": "^6.0.1",
"eslint-plugin-spellcheck": "^0.0.19",
"expect": "^29.1.2",
"microtime": "^3.1.1",
"mocha": "^10.0.0",
"mochawesome": "^7.1.3",
"nyc": "^15.1.0",
"prettier": "^2.5.1",
"prettier-plugin-organize-imports": "^2.3.4",
"prettier": "^2.7.1",
"prettier-plugin-organize-imports": "^3.1.1",
"prettierx": "^0.18.3",
"rollup": "^2.63.0",
"rollup": "^2.79.1",
"rollup-plugin-analyzer": "^4.0.0",

@@ -94,6 +97,7 @@ "rollup-plugin-command": "^1.1.3",

"rollup-plugin-terser": "^7.0.2",
"rollup-plugin-ts": "^2.0.5",
"rollup-plugin-ts": "^3.0.2",
"sinon": "^14.0.1",
"source-map-support": "^0.5.21",
"typedoc": "^0.22.10",
"typescript": "^4.5.4"
"typedoc": "^0.23.15",
"typescript": "^4.8.4"
},

@@ -100,0 +104,0 @@ "engines": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc