Socket
Socket
Sign inDemoInstall

threads

Package Overview
Dependencies
9
Maintainers
1
Versions
73
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.0-alpha.3 to 1.0.0-alpha.4

33

dist/master/pool.d.ts

@@ -13,2 +13,3 @@ import Observable from "zen-observable";

taskQueued = "taskQueued",
taskQueueDrained = "taskQueueDrained",
taskStart = "taskStart",

@@ -29,2 +30,4 @@ terminated = "terminated"

} | {
type: PoolEventType.taskQueueDrained;
} | {
type: PoolEventType.taskStart;

@@ -47,10 +50,38 @@ taskID: number;

};
/**
* Thread pool implementation managing a set of worker threads.
* Use it to queue jobs that are run on those threads with limited
* concurrency.
*/
export interface Pool<ThreadType extends Thread> {
/**
* Returns a promise that resolves once the job queue is emptied.
*
* @param allowResolvingImmediately Set to `true` to resolve immediately if job queue is currently empty.
*/
completed(allowResolvingImmediately?: boolean): Promise<any>;
/**
* Returns an observable that yields pool events.
*/
events(): Observable<PoolEvent<ThreadType>>;
queue<Return>(task: TaskRunFunction<ThreadType, Return>): Promise<Return>;
/**
* Queue a job and return a promise that resolves once the job has been dequeued,
* started and finished.
*
* @param job An async function that takes a thread instance and invokes it.
*/
queue<Return>(job: TaskRunFunction<ThreadType, Return>): Promise<Return>;
/**
* Terminate all pool threads.
*
* @param force Set to `true` to kill the thread even if it cannot be stopped gracefully.
*/
terminate(force?: boolean): Promise<void>;
}
export interface PoolOptions {
/** Maximum no. of jobs to run on one worker thread at a time. Defaults to one. */
concurrency?: number;
/** Gives that pool a name to be used for debug logging, letting you distinguish between log output of different pools. */
name?: string;
/** No. of worker threads to spawn and to be managed by the pool. */
size?: number;

@@ -57,0 +88,0 @@ }

128

dist/master/pool.js

@@ -23,2 +23,8 @@ "use strict";

const hasSymbol = (name) => hasSymbols() && Boolean(Symbol[name]);
function flatMap(array, mapper) {
return array.reduce((flattened, element) => [...flattened, ...mapper(element)], []);
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
function slugify(text) {

@@ -33,2 +39,3 @@ return text.replace(/\W/g, " ").trim().replace(/\s+/g, "-");

PoolEventType["taskQueued"] = "taskQueued";
PoolEventType["taskQueueDrained"] = "taskQueueDrained";
PoolEventType["taskStart"] = "taskStart";

@@ -45,3 +52,3 @@ PoolEventType["terminated"] = "terminated";

function findIdlingWorker(workers, maxConcurrency) {
return workers.find(worker => worker.runningTasks.length < maxConcurrency);
return workers.find(worker => worker.runningJobs.length < maxConcurrency);
}

@@ -51,3 +58,3 @@ function spawnWorkers(spawnWorker, count) {

init: spawnWorker(),
runningTasks: []
runningJobs: []
}));

@@ -63,3 +70,2 @@ }

let nextTaskID = 1;
let runningTaskJobs = [];
const taskQueue = [];

@@ -76,3 +82,3 @@ const workers = spawnWorkers(spawnWorker, size);

const scheduleWork = () => {
debug(`Attempt de-queueing a task to run it...`);
debug(`Attempt de-queueing a task in order to run it...`);
const availableWorker = findIdlingWorker(workers, concurrency);

@@ -82,4 +88,7 @@ if (!availableWorker)

const nextTask = taskQueue.shift();
if (!nextTask)
if (!nextTask) {
debug(`Task queue is empty`);
eventSubject.next({ type: PoolEventType.taskQueueDrained });
return;
}
const workerID = workers.indexOf(availableWorker) + 1;

@@ -92,6 +101,12 @@ debug(`Running task #${nextTask.id} on worker #${workerID}...`);

});
const run = () => __awaiter(this, void 0, void 0, function* () {
const run = (worker, task) => __awaiter(this, void 0, void 0, function* () {
const removeJobFromWorkersRunningJobs = () => {
worker.runningJobs = worker.runningJobs.filter(someRunPromise => someRunPromise !== runPromise);
};
// Defer job execution by one tick to give handlers time to subscribe
yield sleep(0);
try {
const returnValue = yield nextTask.run(yield availableWorker.init);
const returnValue = yield task.run(yield availableWorker.init);
debug(`Task #${nextTask.id} completed successfully`);
removeJobFromWorkersRunningJobs();
eventSubject.next({

@@ -106,2 +121,3 @@ type: PoolEventType.taskCompleted,

debug(`Task #${nextTask.id} failed`);
removeJobFromWorkersRunningJobs();
eventSubject.next({

@@ -113,6 +129,4 @@ type: PoolEventType.taskFailed,

});
throw error;
}
finally {
runningTaskJobs = runningTaskJobs.filter(someRunPromise => someRunPromise !== runPromise);
if (!isClosing) {

@@ -123,48 +137,76 @@ scheduleWork();

});
const runPromise = run();
runningTaskJobs.push(runPromise);
const runPromise = run(availableWorker, nextTask);
availableWorker.runningJobs.push(runPromise);
};
const pool = {
events() {
return eventObservable;
},
queue(taskFunction) {
completed(allowResolvingImmediately = false) {
return __awaiter(this, void 0, void 0, function* () {
if (isClosing) {
throw Error(`Cannot schedule pool tasks after terminate() has been called.`);
const getCurrentlyRunningJobs = () => flatMap(workers, worker => worker.runningJobs);
if (allowResolvingImmediately && taskQueue.length === 0) {
return Promise.all(getCurrentlyRunningJobs());
}
const task = {
id: nextTaskID++,
run: taskFunction
};
debug(`Queueing task #${task.id}...`);
taskQueue.push(task);
eventSubject.next({
type: PoolEventType.taskQueued,
taskID: task.id
});
return new Promise((resolve, reject) => {
const eventSubscription = pool.events().subscribe(event => {
if (event.type === PoolEventType.taskCompleted && event.taskID === task.id) {
eventSubscription.unsubscribe();
resolve(event.returnValue);
const poolEventPromise = new Promise((resolve, reject) => {
const subscription = eventObservable.subscribe(event => {
if (event.type === PoolEventType.taskQueueDrained) {
subscription.unsubscribe();
resolve();
}
else if (event.type === PoolEventType.taskFailed && event.taskID === task.id) {
eventSubscription.unsubscribe();
else if (event.type === PoolEventType.taskFailed) {
subscription.unsubscribe();
reject(event.error);
}
else if (event.type === PoolEventType.terminated) {
eventSubscription.unsubscribe();
reject(Error("Pool has been terminated before task was run."));
}
});
try {
scheduleWork();
});
yield Promise.race([
poolEventPromise,
eventObservable // make a pool-wide error reject the completed() result promise
]);
yield Promise.all(getCurrentlyRunningJobs());
});
},
events() {
return eventObservable;
},
queue(taskFunction) {
if (isClosing) {
throw Error(`Cannot schedule pool tasks after terminate() has been called.`);
}
const task = {
id: nextTaskID++,
run: taskFunction
};
debug(`Queueing task #${task.id}...`);
taskQueue.push(task);
eventSubject.next({
type: PoolEventType.taskQueued,
taskID: task.id
});
const resultPromise = new Promise((resolve, reject) => {
const eventSubscription = pool.events().subscribe(event => {
if (event.type === PoolEventType.taskCompleted && event.taskID === task.id) {
eventSubscription.unsubscribe();
resolve(event.returnValue);
}
catch (error) {
else if (event.type === PoolEventType.taskFailed && event.taskID === task.id) {
eventSubscription.unsubscribe();
reject(error);
reject(event.error);
}
else if (event.type === PoolEventType.terminated) {
eventSubscription.unsubscribe();
reject(Error("Pool has been terminated before task was run."));
}
});
try {
scheduleWork();
}
catch (error) {
eventSubscription.unsubscribe();
reject(error);
}
});
// Don't raise an UnhandledPromiseRejection error if not handled
// Reason: Because we just return this promise for convenience, but usually only
// pool.completed() will be used, leaving this quasi-duplicate promise unhandled.
resultPromise.catch(() => undefined);
return resultPromise;
},

@@ -175,3 +217,3 @@ terminate(force) {

if (!force) {
yield Promise.all(runningTaskJobs);
yield pool.completed(true);
}

@@ -178,0 +220,0 @@ eventSubject.next({

import Observable from "zen-observable";
import { FunctionParams, FunctionThread, ModuleThread, Worker as WorkerType } from "../types/master";
import { WorkerFunction, WorkerModule } from "../types/worker";
declare type ExposedToThreadType<Exposed extends WorkerFunction | WorkerModule<any>> = Exposed extends WorkerFunction ? FunctionThread<FunctionParams<Exposed>, StripAsync<ReturnType<Exposed>>> : Exposed extends WorkerModule<any> ? ModuleThread<Exposed> : never;
declare type ArbitraryWorkerInterface = WorkerFunction & WorkerModule<string> & {
somekeythatisneverusedinproductioncode123: "magicmarker123";
};
declare type ArbitraryFunctionOrModuleThread = FunctionThread<any, any> & ModuleThread<any>;
declare type ExposedToThreadType<Exposed extends WorkerFunction | WorkerModule<any>> = Exposed extends ArbitraryWorkerInterface ? ArbitraryFunctionOrModuleThread : Exposed extends WorkerFunction ? FunctionThread<FunctionParams<Exposed>, StripAsync<ReturnType<Exposed>>> : Exposed extends WorkerModule<any> ? ModuleThread<Exposed> : never;
declare type StripAsync<Type> = Type extends Promise<infer PromiseBaseType> ? PromiseBaseType : Type extends Observable<infer ObservableBaseType> ? ObservableBaseType : Type;
export declare function spawn<Exposed extends WorkerFunction | WorkerModule<any>>(worker: WorkerType): Promise<ExposedToThreadType<Exposed>>;
export declare function spawn<Exposed extends WorkerFunction | WorkerModule<any> = ArbitraryWorkerInterface>(worker: WorkerType): Promise<ExposedToThreadType<Exposed>>;
export {};
declare type UnsubscribeFn = () => void;
export interface AbstractedWorkerAPI {
isWorkerRuntime(): boolean;
postMessageToMaster(message: any, transferList?: Transferable[]): void;

@@ -4,0 +5,0 @@ subscribeToMasterMessages(onMessage: (data: any) => void): UnsubscribeFn;

/// <reference no-default-lib="true"/>
declare const _default: {
isWorkerRuntime: () => boolean;
postMessageToMaster: (message: any, transferList?: Transferable[] | undefined) => void;

@@ -4,0 +5,0 @@ subscribeToMasterMessages: (onMessage: (data: any) => void) => () => void;

@@ -5,2 +5,5 @@ "use strict";

// tslint:disable no-shadowed-variable
const isWorkerRuntime = function isWorkerRuntime() {
return typeof self !== "undefined" && self.postMessage ? true : false;
};
const postMessageToMaster = function postMessageToMaster(data) {

@@ -21,4 +24,5 @@ // TODO: Transferables

module.exports = {
isWorkerRuntime,
postMessageToMaster,
subscribeToMasterMessages
};
/// <reference no-default-lib="true"/>
declare const _default: {
isWorkerRuntime: () => boolean;
postMessageToMaster: (message: any, transferList?: Transferable[] | undefined) => void;

@@ -4,0 +5,0 @@ subscribeToMasterMessages: (onMessage: (data: any) => void) => () => void;

@@ -5,2 +5,5 @@ "use strict";

// tslint:disable no-shadowed-variable
const isWorkerRuntime = function isWorkerRuntime() {
return typeof self !== "undefined" && self.postMessage ? true : false;
};
const postMessageToMaster = function postMessageToMaster(data) {

@@ -21,4 +24,5 @@ // TODO: Warn that Transferables are not supported on first attempt to use feature

module.exports = {
isWorkerRuntime,
postMessageToMaster,
subscribeToMasterMessages
};
declare const _default: {
isWorkerRuntime: () => boolean;
postMessageToMaster: (message: any, transferList?: Transferable[] | undefined) => void;

@@ -3,0 +4,0 @@ subscribeToMasterMessages: (onMessage: (data: any) => void) => () => void;

@@ -10,2 +10,5 @@ "use strict";

}
const isWorkerRuntime = function isWorkerRuntime() {
return !worker_threads_1.isMainThread;
};
const postMessageToMaster = function postMessageToMaster(data, transferList) {

@@ -28,4 +31,5 @@ assertMessagePort(worker_threads_1.parentPort).postMessage(data, transferList);

module.exports = {
isWorkerRuntime,
postMessageToMaster,
subscribeToMasterMessages
};

@@ -21,3 +21,3 @@ "use strict";

exports.Transfer = transferable_2.Transfer;
let exposedCalled = false;
let exposeCalled = false;
const isMasterJobRunMessage = (thing) => thing && thing.type === messages_1.MasterMessageType.run;

@@ -113,6 +113,9 @@ /** There are issues with `is-observable` not recognizing zen-observable's instances */

function expose(exposed) {
if (exposedCalled) {
if (!implementation_1.default.isWorkerRuntime()) {
throw Error("expose() called in the master thread.");
}
if (exposeCalled) {
throw Error("expose() called more than once. This is not possible. Pass an object to expose() if you want to expose multiple functions.");
}
exposedCalled = true;
exposeCalled = true;
if (typeof exposed === "function") {

@@ -119,0 +122,0 @@ implementation_1.default.subscribeToMasterMessages(messageData => {

{
"name": "threads",
"version": "1.0.0-alpha.3",
"version": "1.0.0-alpha.4",
"description": "Easy to use, yet powerful multi-threading library for node.js and the browser!",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

@@ -176,3 +176,3 @@ <h1 align="center">threads</h1>

await pool.queue(async multiplier => {
pool.queue(async multiplier => {
const multiplied = await multiplier(2, 3)

@@ -185,2 +185,3 @@ console.log(`2 * 3 = ${multiplied}`)

await pool.completed()
await pool.terminate()

@@ -191,3 +192,3 @@ ```

The promise returned by `pool.queue()` will resolve once the scheduled callback has been executed and completed. A failing scheduled callback will also make the promise returned by `pool.queue()` reject.
The promise returned by `pool.completed()` will resolve once the scheduled callbacks have been executed and completed. A failing job will also make the promise reject.

@@ -194,0 +195,0 @@ </details>

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc