Socket
Socket
Sign inDemoInstall

node-worker-threads-pool

Package Overview
Dependencies
0
Maintainers
1
Versions
31
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.4.0 to 1.4.1

2

package.json
{
"name": "node-worker-threads-pool",
"version": "1.4.0",
"version": "1.4.1",
"description": "Simple worker threads pool using Node's worker_threads module. Compatible with ES6+ Promise, Async/Await.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -70,3 +70,3 @@ /**

};
return this.dispatchTask(workerParam, { timeout });
return this.runTask(workerParam, { timeout });
}

@@ -73,0 +73,0 @@

@@ -21,3 +21,3 @@ /**

this.isReady = false;
this.ready = false;

@@ -32,3 +32,3 @@ this.once("online", () => this.readyToWork());

run(param, taskConfig) {
this.isReady = false;
this.ready = false;

@@ -61,7 +61,20 @@ const timeout = taskConfig.timeout ? taskConfig.timeout : 0;

readyToWork() {
this.isReady = true;
this.ready = true;
this.emit("ready", this);
}
/**
* @override
*/
terminate() {
this.once("exit", () => {
setImmediate(() => {
this.removeAllListeners();
});
});
return super.terminate();
}
}
module.exports.PoolWorker = PoolWorker;

@@ -17,8 +17,11 @@ /**

super();
if (typeof size !== "number") {
throw new TypeError('"size" must be the type of number!');
}
if (Number.isNaN(size)) {
throw new Error('"size" must not be NaN!');
}
if (size < 1) {

@@ -32,3 +35,3 @@ throw new RangeError('"size" must not be lower than 1!');

/** @private */
this._isDeprecated = false;
this._deprecated = false;

@@ -50,3 +53,3 @@ /**

*/
this._queue = [];
this._taskQueue = [];

@@ -60,16 +63,4 @@ this._addEventHandlers();

_addEventHandlers() {
this.on("worker-ready", (/** @type {PoolWorker} */ worker) => {
const taskContainer = this._queue.shift();
if (taskContainer) {
const { param, taskConfig, resolve, reject } = taskContainer;
worker
.run(param, taskConfig)
.then(resolve)
.catch((err) => {
if (isTimeoutError(err)) {
worker.terminate();
}
reject(err);
});
}
this.on("worker-ready", (worker) => {
this._processTask(worker);
});

@@ -82,12 +73,10 @@ }

*/
_addWorkerHooks(worker) {
_addWorkerLifecycleHandlers(worker) {
worker.on("ready", (worker) => this.emit("worker-ready", worker));
worker.once("exit", (code) => {
if (this._isDeprecated || code === 0) {
if (this._deprecated || code === 0) {
return;
}
this._replaceBrokenWorker(worker);
worker.terminate();
worker.removeAllListeners();
this._replaceWorker(worker);
});

@@ -98,8 +87,8 @@ }

* @private
* @param {() => PoolWorker} workerGen
* @param {() => PoolWorker} getWorker
*/
_setWorkerGen(workerGen) {
_setWorkerCreator(getWorker) {
this._createWorker = () => {
const worker = workerGen();
this._addWorkerHooks(worker);
const worker = getWorker();
this._addWorkerLifecycleHandlers(worker);
return worker;

@@ -113,5 +102,6 @@ };

*/
_replaceBrokenWorker(worker) {
_replaceWorker(worker) {
const i = this._workers.indexOf(worker);
if (i > 0) {
if (i >= 0) {
this._workers[i] = this._createWorker();

@@ -122,7 +112,44 @@ }

/**
* @param {() => PoolWorker} workerGen
* @returns {PoolWorker | null}
*/
fill(workerGen) {
this._setWorkerGen(workerGen);
_getIdleWorker() {
const worker = this._workers.find((worker) => worker.ready);
return worker ? worker : null;
}
/**
* @param {PoolWorker} worker
* @private
*/
_processTask(worker) {
const task = this._taskQueue.shift();
if (!task) {
return;
}
const { param, resolve, reject, taskConfig } = task;
worker
.run(param, taskConfig)
.then(resolve)
.catch((error) => {
if (isTimeoutError(error)) {
worker.terminate();
}
reject(error);
});
}
/**
* @param {() => PoolWorker} getWorker
*/
fill(getWorker) {
if (!this._createWorker) {
this._setWorkerCreator(getWorker);
}
const size = this._size;
for (let i = 0; i < size; i++) {

@@ -137,28 +164,16 @@ this._workers.push(this._createWorker());

*/
async dispatchTask(param, taskConfig) {
if (this._isDeprecated) {
runTask(param, taskConfig) {
if (this._deprecated) {
throw new Error("This pool is deprecated! Please use a new one.");
}
const worker = this._workers.find((worker) => worker.isReady);
return new Promise((resolve, reject) => {
const task = new TaskContainer(param, resolve, reject, taskConfig);
if (worker) {
try {
return await worker.run(param, taskConfig);
} catch (err) {
if (isTimeoutError(err)) {
worker.terminate();
}
throw err;
this._taskQueue.push(task);
const worker = this._getIdleWorker();
if (worker) {
this._processTask(worker);
}
}
return new Promise((resolve, reject) => {
const taskContainer = new TaskContainer(
param,
resolve,
reject,
taskConfig
);
this._queue.push(taskContainer);
});

@@ -168,3 +183,3 @@ }

async destroy() {
this._isDeprecated = true;
this._deprecated = true;
this.removeAllListeners();

@@ -171,0 +186,0 @@ const workers = this._workers;

@@ -24,7 +24,8 @@ class TimeoutError extends Error {

this._timerID = null;
this._timeoutSymbol = Symbol("timeoutSymbol");
}
_timer() {
_createTimer() {
return new Promise((resolve) => {
this._timerID = setTimeout(resolve, this._timeout, this._timer);
this._timerID = setTimeout(resolve, this._timeout, this._timeoutSymbol);
});

@@ -37,9 +38,11 @@ }

}
const res = await Promise.race([this._p, this._timer()]);
if (res === this._timer) {
// timeout.
const result = await Promise.race([this._p, this._createTimer()]);
if (result === this._timeoutSymbol) {
throw new TimeoutError("timeout");
}
clearTimeout(this._timerID);
return res;
return result;
}

@@ -46,0 +49,0 @@ }

@@ -6,3 +6,2 @@ const { Pool } = require("./pool");

const fnReg = /^task[^]*([^]*)[^]*{[^]*}$/;
/**

@@ -82,3 +81,3 @@ * @param {Function} fn

}
return this.dispatchTask(param, { timeout });
return this.runTask(param, { timeout });
}

@@ -85,0 +84,0 @@

@@ -53,3 +53,3 @@ /**

this._called = true;
return await this._pool.dispatchTask(param, this._taskConfig);
return await this._pool.runTask(param, this._taskConfig);
}

@@ -56,0 +56,0 @@ }

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc