Comparing version 1.0.0-alpha.3 to 1.0.0-alpha.4
@@ -13,2 +13,3 @@ import Observable from "zen-observable"; | ||
taskQueued = "taskQueued", | ||
taskQueueDrained = "taskQueueDrained", | ||
taskStart = "taskStart", | ||
@@ -29,2 +30,4 @@ terminated = "terminated" | ||
} | { | ||
type: PoolEventType.taskQueueDrained; | ||
} | { | ||
type: PoolEventType.taskStart; | ||
@@ -47,10 +50,38 @@ taskID: number; | ||
}; | ||
/** | ||
* Thread pool implementation managing a set of worker threads. | ||
* Use it to queue jobs that are run on those threads with limited | ||
* concurrency. | ||
*/ | ||
export interface Pool<ThreadType extends Thread> { | ||
/** | ||
* Returns a promise that resolves once the job queue is emptied. | ||
* | ||
* @param allowResolvingImmediately Set to `true` to resolve immediately if job queue is currently empty. | ||
*/ | ||
completed(allowResolvingImmediately?: boolean): Promise<any>; | ||
/** | ||
* Returns an observable that yields pool events. | ||
*/ | ||
events(): Observable<PoolEvent<ThreadType>>; | ||
queue<Return>(task: TaskRunFunction<ThreadType, Return>): Promise<Return>; | ||
/** | ||
* Queue a job and return a promise that resolves once the job has been dequeued, | ||
* started and finished. | ||
* | ||
* @param job An async function that takes a thread instance and invokes it. | ||
*/ | ||
queue<Return>(job: TaskRunFunction<ThreadType, Return>): Promise<Return>; | ||
/** | ||
* Terminate all pool threads. | ||
* | ||
* @param force Set to `true` to kill the thread even if it cannot be stopped gracefully. | ||
*/ | ||
terminate(force?: boolean): Promise<void>; | ||
} | ||
export interface PoolOptions { | ||
/** Maximum no. of jobs to run on one worker thread at a time. Defaults to one. */ | ||
concurrency?: number; | ||
/** Gives that pool a name to be used for debug logging, letting you distinguish between log output of different pools. */ | ||
name?: string; | ||
/** No. of worker threads to spawn and to be managed by the pool. */ | ||
size?: number; | ||
@@ -57,0 +88,0 @@ } |
@@ -23,2 +23,8 @@ "use strict"; | ||
const hasSymbol = (name) => hasSymbols() && Boolean(Symbol[name]); | ||
function flatMap(array, mapper) { | ||
return array.reduce((flattened, element) => [...flattened, ...mapper(element)], []); | ||
} | ||
function sleep(ms) { | ||
return new Promise(resolve => setTimeout(resolve, ms)); | ||
} | ||
function slugify(text) { | ||
@@ -33,2 +39,3 @@ return text.replace(/\W/g, " ").trim().replace(/\s+/g, "-"); | ||
PoolEventType["taskQueued"] = "taskQueued"; | ||
PoolEventType["taskQueueDrained"] = "taskQueueDrained"; | ||
PoolEventType["taskStart"] = "taskStart"; | ||
@@ -45,3 +52,3 @@ PoolEventType["terminated"] = "terminated"; | ||
function findIdlingWorker(workers, maxConcurrency) { | ||
return workers.find(worker => worker.runningTasks.length < maxConcurrency); | ||
return workers.find(worker => worker.runningJobs.length < maxConcurrency); | ||
} | ||
@@ -51,3 +58,3 @@ function spawnWorkers(spawnWorker, count) { | ||
init: spawnWorker(), | ||
runningTasks: [] | ||
runningJobs: [] | ||
})); | ||
@@ -63,3 +70,2 @@ } | ||
let nextTaskID = 1; | ||
let runningTaskJobs = []; | ||
const taskQueue = []; | ||
@@ -76,3 +82,3 @@ const workers = spawnWorkers(spawnWorker, size); | ||
const scheduleWork = () => { | ||
debug(`Attempt de-queueing a task to run it...`); | ||
debug(`Attempt de-queueing a task in order to run it...`); | ||
const availableWorker = findIdlingWorker(workers, concurrency); | ||
@@ -82,4 +88,7 @@ if (!availableWorker) | ||
const nextTask = taskQueue.shift(); | ||
if (!nextTask) | ||
if (!nextTask) { | ||
debug(`Task queue is empty`); | ||
eventSubject.next({ type: PoolEventType.taskQueueDrained }); | ||
return; | ||
} | ||
const workerID = workers.indexOf(availableWorker) + 1; | ||
@@ -92,6 +101,12 @@ debug(`Running task #${nextTask.id} on worker #${workerID}...`); | ||
}); | ||
const run = () => __awaiter(this, void 0, void 0, function* () { | ||
const run = (worker, task) => __awaiter(this, void 0, void 0, function* () { | ||
const removeJobFromWorkersRunningJobs = () => { | ||
worker.runningJobs = worker.runningJobs.filter(someRunPromise => someRunPromise !== runPromise); | ||
}; | ||
// Defer job execution by one tick to give handlers time to subscribe | ||
yield sleep(0); | ||
try { | ||
const returnValue = yield nextTask.run(yield availableWorker.init); | ||
const returnValue = yield task.run(yield availableWorker.init); | ||
debug(`Task #${nextTask.id} completed successfully`); | ||
removeJobFromWorkersRunningJobs(); | ||
eventSubject.next({ | ||
@@ -106,2 +121,3 @@ type: PoolEventType.taskCompleted, | ||
debug(`Task #${nextTask.id} failed`); | ||
removeJobFromWorkersRunningJobs(); | ||
eventSubject.next({ | ||
@@ -113,6 +129,4 @@ type: PoolEventType.taskFailed, | ||
}); | ||
throw error; | ||
} | ||
finally { | ||
runningTaskJobs = runningTaskJobs.filter(someRunPromise => someRunPromise !== runPromise); | ||
if (!isClosing) { | ||
@@ -123,48 +137,76 @@ scheduleWork(); | ||
}); | ||
const runPromise = run(); | ||
runningTaskJobs.push(runPromise); | ||
const runPromise = run(availableWorker, nextTask); | ||
availableWorker.runningJobs.push(runPromise); | ||
}; | ||
const pool = { | ||
events() { | ||
return eventObservable; | ||
}, | ||
queue(taskFunction) { | ||
completed(allowResolvingImmediately = false) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (isClosing) { | ||
throw Error(`Cannot schedule pool tasks after terminate() has been called.`); | ||
const getCurrentlyRunningJobs = () => flatMap(workers, worker => worker.runningJobs); | ||
if (allowResolvingImmediately && taskQueue.length === 0) { | ||
return Promise.all(getCurrentlyRunningJobs()); | ||
} | ||
const task = { | ||
id: nextTaskID++, | ||
run: taskFunction | ||
}; | ||
debug(`Queueing task #${task.id}...`); | ||
taskQueue.push(task); | ||
eventSubject.next({ | ||
type: PoolEventType.taskQueued, | ||
taskID: task.id | ||
}); | ||
return new Promise((resolve, reject) => { | ||
const eventSubscription = pool.events().subscribe(event => { | ||
if (event.type === PoolEventType.taskCompleted && event.taskID === task.id) { | ||
eventSubscription.unsubscribe(); | ||
resolve(event.returnValue); | ||
const poolEventPromise = new Promise((resolve, reject) => { | ||
const subscription = eventObservable.subscribe(event => { | ||
if (event.type === PoolEventType.taskQueueDrained) { | ||
subscription.unsubscribe(); | ||
resolve(); | ||
} | ||
else if (event.type === PoolEventType.taskFailed && event.taskID === task.id) { | ||
eventSubscription.unsubscribe(); | ||
else if (event.type === PoolEventType.taskFailed) { | ||
subscription.unsubscribe(); | ||
reject(event.error); | ||
} | ||
else if (event.type === PoolEventType.terminated) { | ||
eventSubscription.unsubscribe(); | ||
reject(Error("Pool has been terminated before task was run.")); | ||
} | ||
}); | ||
try { | ||
scheduleWork(); | ||
}); | ||
yield Promise.race([ | ||
poolEventPromise, | ||
eventObservable // make a pool-wide error reject the completed() result promise | ||
]); | ||
yield Promise.all(getCurrentlyRunningJobs()); | ||
}); | ||
}, | ||
events() { | ||
return eventObservable; | ||
}, | ||
queue(taskFunction) { | ||
if (isClosing) { | ||
throw Error(`Cannot schedule pool tasks after terminate() has been called.`); | ||
} | ||
const task = { | ||
id: nextTaskID++, | ||
run: taskFunction | ||
}; | ||
debug(`Queueing task #${task.id}...`); | ||
taskQueue.push(task); | ||
eventSubject.next({ | ||
type: PoolEventType.taskQueued, | ||
taskID: task.id | ||
}); | ||
const resultPromise = new Promise((resolve, reject) => { | ||
const eventSubscription = pool.events().subscribe(event => { | ||
if (event.type === PoolEventType.taskCompleted && event.taskID === task.id) { | ||
eventSubscription.unsubscribe(); | ||
resolve(event.returnValue); | ||
} | ||
catch (error) { | ||
else if (event.type === PoolEventType.taskFailed && event.taskID === task.id) { | ||
eventSubscription.unsubscribe(); | ||
reject(error); | ||
reject(event.error); | ||
} | ||
else if (event.type === PoolEventType.terminated) { | ||
eventSubscription.unsubscribe(); | ||
reject(Error("Pool has been terminated before task was run.")); | ||
} | ||
}); | ||
try { | ||
scheduleWork(); | ||
} | ||
catch (error) { | ||
eventSubscription.unsubscribe(); | ||
reject(error); | ||
} | ||
}); | ||
// Don't raise an UnhandledPromiseRejection error if not handled | ||
// Reason: Because we just return this promise for convenience, but usually only | ||
// pool.completed() will be used, leaving this quasi-duplicate promise unhandled. | ||
resultPromise.catch(() => undefined); | ||
return resultPromise; | ||
}, | ||
@@ -175,3 +217,3 @@ terminate(force) { | ||
if (!force) { | ||
yield Promise.all(runningTaskJobs); | ||
yield pool.completed(true); | ||
} | ||
@@ -178,0 +220,0 @@ eventSubject.next({ |
import Observable from "zen-observable"; | ||
import { FunctionParams, FunctionThread, ModuleThread, Worker as WorkerType } from "../types/master"; | ||
import { WorkerFunction, WorkerModule } from "../types/worker"; | ||
declare type ExposedToThreadType<Exposed extends WorkerFunction | WorkerModule<any>> = Exposed extends WorkerFunction ? FunctionThread<FunctionParams<Exposed>, StripAsync<ReturnType<Exposed>>> : Exposed extends WorkerModule<any> ? ModuleThread<Exposed> : never; | ||
declare type ArbitraryWorkerInterface = WorkerFunction & WorkerModule<string> & { | ||
somekeythatisneverusedinproductioncode123: "magicmarker123"; | ||
}; | ||
declare type ArbitraryFunctionOrModuleThread = FunctionThread<any, any> & ModuleThread<any>; | ||
declare type ExposedToThreadType<Exposed extends WorkerFunction | WorkerModule<any>> = Exposed extends ArbitraryWorkerInterface ? ArbitraryFunctionOrModuleThread : Exposed extends WorkerFunction ? FunctionThread<FunctionParams<Exposed>, StripAsync<ReturnType<Exposed>>> : Exposed extends WorkerModule<any> ? ModuleThread<Exposed> : never; | ||
declare type StripAsync<Type> = Type extends Promise<infer PromiseBaseType> ? PromiseBaseType : Type extends Observable<infer ObservableBaseType> ? ObservableBaseType : Type; | ||
export declare function spawn<Exposed extends WorkerFunction | WorkerModule<any>>(worker: WorkerType): Promise<ExposedToThreadType<Exposed>>; | ||
export declare function spawn<Exposed extends WorkerFunction | WorkerModule<any> = ArbitraryWorkerInterface>(worker: WorkerType): Promise<ExposedToThreadType<Exposed>>; | ||
export {}; |
declare type UnsubscribeFn = () => void; | ||
export interface AbstractedWorkerAPI { | ||
isWorkerRuntime(): boolean; | ||
postMessageToMaster(message: any, transferList?: Transferable[]): void; | ||
@@ -4,0 +5,0 @@ subscribeToMasterMessages(onMessage: (data: any) => void): UnsubscribeFn; |
/// <reference no-default-lib="true"/> | ||
declare const _default: { | ||
isWorkerRuntime: () => boolean; | ||
postMessageToMaster: (message: any, transferList?: Transferable[] | undefined) => void; | ||
@@ -4,0 +5,0 @@ subscribeToMasterMessages: (onMessage: (data: any) => void) => () => void; |
@@ -5,2 +5,5 @@ "use strict"; | ||
// tslint:disable no-shadowed-variable | ||
const isWorkerRuntime = function isWorkerRuntime() { | ||
return typeof self !== "undefined" && self.postMessage ? true : false; | ||
}; | ||
const postMessageToMaster = function postMessageToMaster(data) { | ||
@@ -21,4 +24,5 @@ // TODO: Transferables | ||
module.exports = { | ||
isWorkerRuntime, | ||
postMessageToMaster, | ||
subscribeToMasterMessages | ||
}; |
/// <reference no-default-lib="true"/> | ||
declare const _default: { | ||
isWorkerRuntime: () => boolean; | ||
postMessageToMaster: (message: any, transferList?: Transferable[] | undefined) => void; | ||
@@ -4,0 +5,0 @@ subscribeToMasterMessages: (onMessage: (data: any) => void) => () => void; |
@@ -5,2 +5,5 @@ "use strict"; | ||
// tslint:disable no-shadowed-variable | ||
const isWorkerRuntime = function isWorkerRuntime() { | ||
return typeof self !== "undefined" && self.postMessage ? true : false; | ||
}; | ||
const postMessageToMaster = function postMessageToMaster(data) { | ||
@@ -21,4 +24,5 @@ // TODO: Warn that Transferables are not supported on first attempt to use feature | ||
module.exports = { | ||
isWorkerRuntime, | ||
postMessageToMaster, | ||
subscribeToMasterMessages | ||
}; |
declare const _default: { | ||
isWorkerRuntime: () => boolean; | ||
postMessageToMaster: (message: any, transferList?: Transferable[] | undefined) => void; | ||
@@ -3,0 +4,0 @@ subscribeToMasterMessages: (onMessage: (data: any) => void) => () => void; |
@@ -10,2 +10,5 @@ "use strict"; | ||
} | ||
const isWorkerRuntime = function isWorkerRuntime() { | ||
return !worker_threads_1.isMainThread; | ||
}; | ||
const postMessageToMaster = function postMessageToMaster(data, transferList) { | ||
@@ -28,4 +31,5 @@ assertMessagePort(worker_threads_1.parentPort).postMessage(data, transferList); | ||
module.exports = { | ||
isWorkerRuntime, | ||
postMessageToMaster, | ||
subscribeToMasterMessages | ||
}; |
@@ -21,3 +21,3 @@ "use strict"; | ||
exports.Transfer = transferable_2.Transfer; | ||
let exposedCalled = false; | ||
let exposeCalled = false; | ||
const isMasterJobRunMessage = (thing) => thing && thing.type === messages_1.MasterMessageType.run; | ||
@@ -113,6 +113,9 @@ /** There are issues with `is-observable` not recognizing zen-observable's instances */ | ||
function expose(exposed) { | ||
if (exposedCalled) { | ||
if (!implementation_1.default.isWorkerRuntime()) { | ||
throw Error("expose() called in the master thread."); | ||
} | ||
if (exposeCalled) { | ||
throw Error("expose() called more than once. This is not possible. Pass an object to expose() if you want to expose multiple functions."); | ||
} | ||
exposedCalled = true; | ||
exposeCalled = true; | ||
if (typeof exposed === "function") { | ||
@@ -119,0 +122,0 @@ implementation_1.default.subscribeToMasterMessages(messageData => { |
{ | ||
"name": "threads", | ||
"version": "1.0.0-alpha.3", | ||
"version": "1.0.0-alpha.4", | ||
"description": "Easy to use, yet powerful multi-threading library for node.js and the browser!", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -176,3 +176,3 @@ <h1 align="center">threads</h1> | ||
await pool.queue(async multiplier => { | ||
pool.queue(async multiplier => { | ||
const multiplied = await multiplier(2, 3) | ||
@@ -185,2 +185,3 @@ console.log(`2 * 3 = ${multiplied}`) | ||
await pool.completed() | ||
await pool.terminate() | ||
@@ -191,3 +192,3 @@ ``` | ||
The promise returned by `pool.queue()` will resolve once the scheduled callback has been executed and completed. A failing scheduled callback will also make the promise returned by `pool.queue()` reject. | ||
The promise returned by `pool.completed()` will resolve once the scheduled callbacks have been executed and completed. A failing job will also make the promise reject. | ||
@@ -194,0 +195,0 @@ </details> |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
71781
1462
279