node-worker-threads-pool
Advanced tools
Comparing version 1.4.0 to 1.4.1
{ | ||
"name": "node-worker-threads-pool", | ||
"version": "1.4.0", | ||
"version": "1.4.1", | ||
"description": "Simple worker threads pool using Node's worker_threads module. Compatible with ES6+ Promise, Async/Await.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -70,3 +70,3 @@ /** | ||
}; | ||
return this.dispatchTask(workerParam, { timeout }); | ||
return this.runTask(workerParam, { timeout }); | ||
} | ||
@@ -73,0 +73,0 @@ |
@@ -21,3 +21,3 @@ /** | ||
this.isReady = false; | ||
this.ready = false; | ||
@@ -32,3 +32,3 @@ this.once("online", () => this.readyToWork()); | ||
run(param, taskConfig) { | ||
this.isReady = false; | ||
this.ready = false; | ||
@@ -61,7 +61,20 @@ const timeout = taskConfig.timeout ? taskConfig.timeout : 0; | ||
readyToWork() { | ||
this.isReady = true; | ||
this.ready = true; | ||
this.emit("ready", this); | ||
} | ||
/** | ||
* @override | ||
*/ | ||
terminate() { | ||
this.once("exit", () => { | ||
setImmediate(() => { | ||
this.removeAllListeners(); | ||
}); | ||
}); | ||
return super.terminate(); | ||
} | ||
} | ||
module.exports.PoolWorker = PoolWorker; |
119
src/pool.js
@@ -17,8 +17,11 @@ /** | ||
super(); | ||
if (typeof size !== "number") { | ||
throw new TypeError('"size" must be the type of number!'); | ||
} | ||
if (Number.isNaN(size)) { | ||
throw new Error('"size" must not be NaN!'); | ||
} | ||
if (size < 1) { | ||
@@ -32,3 +35,3 @@ throw new RangeError('"size" must not be lower than 1!'); | ||
/** @private */ | ||
this._isDeprecated = false; | ||
this._deprecated = false; | ||
@@ -50,3 +53,3 @@ /** | ||
*/ | ||
this._queue = []; | ||
this._taskQueue = []; | ||
@@ -60,16 +63,4 @@ this._addEventHandlers(); | ||
_addEventHandlers() { | ||
this.on("worker-ready", (/** @type {PoolWorker} */ worker) => { | ||
const taskContainer = this._queue.shift(); | ||
if (taskContainer) { | ||
const { param, taskConfig, resolve, reject } = taskContainer; | ||
worker | ||
.run(param, taskConfig) | ||
.then(resolve) | ||
.catch((err) => { | ||
if (isTimeoutError(err)) { | ||
worker.terminate(); | ||
} | ||
reject(err); | ||
}); | ||
} | ||
this.on("worker-ready", (worker) => { | ||
this._processTask(worker); | ||
}); | ||
@@ -82,12 +73,10 @@ } | ||
*/ | ||
_addWorkerHooks(worker) { | ||
_addWorkerLifecycleHandlers(worker) { | ||
worker.on("ready", (worker) => this.emit("worker-ready", worker)); | ||
worker.once("exit", (code) => { | ||
if (this._isDeprecated || code === 0) { | ||
if (this._deprecated || code === 0) { | ||
return; | ||
} | ||
this._replaceBrokenWorker(worker); | ||
worker.terminate(); | ||
worker.removeAllListeners(); | ||
this._replaceWorker(worker); | ||
}); | ||
@@ -98,8 +87,8 @@ } | ||
* @private | ||
* @param {() => PoolWorker} workerGen | ||
* @param {() => PoolWorker} getWorker | ||
*/ | ||
_setWorkerGen(workerGen) { | ||
_setWorkerCreator(getWorker) { | ||
this._createWorker = () => { | ||
const worker = workerGen(); | ||
this._addWorkerHooks(worker); | ||
const worker = getWorker(); | ||
this._addWorkerLifecycleHandlers(worker); | ||
return worker; | ||
@@ -113,5 +102,6 @@ }; | ||
*/ | ||
_replaceBrokenWorker(worker) { | ||
_replaceWorker(worker) { | ||
const i = this._workers.indexOf(worker); | ||
if (i > 0) { | ||
if (i >= 0) { | ||
this._workers[i] = this._createWorker(); | ||
@@ -122,7 +112,44 @@ } | ||
/** | ||
* @param {() => PoolWorker} workerGen | ||
* @returns {PoolWorker | null} | ||
*/ | ||
fill(workerGen) { | ||
this._setWorkerGen(workerGen); | ||
_getIdleWorker() { | ||
const worker = this._workers.find((worker) => worker.ready); | ||
return worker ? worker : null; | ||
} | ||
/** | ||
* @param {PoolWorker} worker | ||
* @private | ||
*/ | ||
_processTask(worker) { | ||
const task = this._taskQueue.shift(); | ||
if (!task) { | ||
return; | ||
} | ||
const { param, resolve, reject, taskConfig } = task; | ||
worker | ||
.run(param, taskConfig) | ||
.then(resolve) | ||
.catch((error) => { | ||
if (isTimeoutError(error)) { | ||
worker.terminate(); | ||
} | ||
reject(error); | ||
}); | ||
} | ||
/** | ||
* @param {() => PoolWorker} getWorker | ||
*/ | ||
fill(getWorker) { | ||
if (!this._createWorker) { | ||
this._setWorkerCreator(getWorker); | ||
} | ||
const size = this._size; | ||
for (let i = 0; i < size; i++) { | ||
@@ -137,28 +164,16 @@ this._workers.push(this._createWorker()); | ||
*/ | ||
async dispatchTask(param, taskConfig) { | ||
if (this._isDeprecated) { | ||
runTask(param, taskConfig) { | ||
if (this._deprecated) { | ||
throw new Error("This pool is deprecated! Please use a new one."); | ||
} | ||
const worker = this._workers.find((worker) => worker.isReady); | ||
return new Promise((resolve, reject) => { | ||
const task = new TaskContainer(param, resolve, reject, taskConfig); | ||
if (worker) { | ||
try { | ||
return await worker.run(param, taskConfig); | ||
} catch (err) { | ||
if (isTimeoutError(err)) { | ||
worker.terminate(); | ||
} | ||
throw err; | ||
this._taskQueue.push(task); | ||
const worker = this._getIdleWorker(); | ||
if (worker) { | ||
this._processTask(worker); | ||
} | ||
} | ||
return new Promise((resolve, reject) => { | ||
const taskContainer = new TaskContainer( | ||
param, | ||
resolve, | ||
reject, | ||
taskConfig | ||
); | ||
this._queue.push(taskContainer); | ||
}); | ||
@@ -168,3 +183,3 @@ } | ||
async destroy() { | ||
this._isDeprecated = true; | ||
this._deprecated = true; | ||
this.removeAllListeners(); | ||
@@ -171,0 +186,0 @@ const workers = this._workers; |
@@ -24,7 +24,8 @@ class TimeoutError extends Error { | ||
this._timerID = null; | ||
this._timeoutSymbol = Symbol("timeoutSymbol"); | ||
} | ||
_timer() { | ||
_createTimer() { | ||
return new Promise((resolve) => { | ||
this._timerID = setTimeout(resolve, this._timeout, this._timer); | ||
this._timerID = setTimeout(resolve, this._timeout, this._timeoutSymbol); | ||
}); | ||
@@ -37,9 +38,11 @@ } | ||
} | ||
const res = await Promise.race([this._p, this._timer()]); | ||
if (res === this._timer) { | ||
// timeout. | ||
const result = await Promise.race([this._p, this._createTimer()]); | ||
if (result === this._timeoutSymbol) { | ||
throw new TimeoutError("timeout"); | ||
} | ||
clearTimeout(this._timerID); | ||
return res; | ||
return result; | ||
} | ||
@@ -46,0 +49,0 @@ } |
@@ -6,3 +6,2 @@ const { Pool } = require("./pool"); | ||
const fnReg = /^task[^]*([^]*)[^]*{[^]*}$/; | ||
/** | ||
@@ -82,3 +81,3 @@ * @param {Function} fn | ||
} | ||
return this.dispatchTask(param, { timeout }); | ||
return this.runTask(param, { timeout }); | ||
} | ||
@@ -85,0 +84,0 @@ |
@@ -53,3 +53,3 @@ /** | ||
this._called = true; | ||
return await this._pool.dispatchTask(param, this._taskConfig); | ||
return await this._pool.runTask(param, this._taskConfig); | ||
} | ||
@@ -56,0 +56,0 @@ } |
Sorry, the diff of this file is not supported yet
29838
628