piscina - npm Package Compare versions

Comparing version 4.8.0 to 5.0.0-alpha.0

benchmark/simple-benchmark-async.js


benchmark/piscina-queue-comparison.js

@@ -18,3 +18,3 @@ const { Bench } = require('tinybench');

for (let i = 0; i < QUEUE_SIZE; i++) {
tasks.push(pool.runTask({ a: 4, b: 6 }));
tasks.push(pool.run({ a: 4, b: 6 }));
}

@@ -32,3 +32,3 @@ await Promise.all(tasks);

for (let i = 0; i < QUEUE_SIZE; i++) {
tasks.push(pool.runTask({ a: 4, b: 6 }));
tasks.push(pool.run({ a: 4, b: 6 }));
}

@@ -35,0 +35,0 @@ await Promise.all(tasks);

@@ -21,3 +21,3 @@ 'use strict';

while ((process.hrtime.bigint() - start) / 1_000_000n < duration) {
await pool.runTask({ a: 4, b: 6 });
await pool.run({ a: 4, b: 6 });
done++;

@@ -24,0 +24,0 @@ }

@@ -17,3 +17,3 @@ 'use strict';

while ((process.hrtime.bigint() - start) / 1_000_000n < duration) {
await pool.runTask({ a: 4, b: 6 });
await pool.run({ a: 4, b: 6 });
done++;

@@ -20,0 +20,0 @@ }

@@ -5,10 +5,10 @@ # Changelog

## [4.8.0](https://github.com/piscinajs/piscina/compare/v4.7.0...v4.8.0) (2024-12-04)
## [5.0.0-alpha.0](https://github.com/piscinajs/piscina/compare/v4.6.1...v5.0.0-alpha.0) (2024-12-04)
## [4.7.0](https://github.com/piscinajs/piscina/compare/v4.6.1...v4.7.0) (2024-09-18)
### Features
* **backport:** use @napi-rs/nice to support Windows ([#655](https://github.com/piscinajs/piscina/issues/655)) ([#660](https://github.com/piscinajs/piscina/issues/660)) ([541295d](https://github.com/piscinajs/piscina/commit/541295dad21c4c3f43bc133d78d82871fa86c864))
* Custom Balancer ([#590](https://github.com/piscinajs/piscina/issues/590)) ([5c42b28](https://github.com/piscinajs/piscina/commit/5c42b28942f39399ea4aad39dd1f4367959c1e8f))
* support Atomics.waitAsync ([#687](https://github.com/piscinajs/piscina/issues/687)) ([9c5a19e](https://github.com/piscinajs/piscina/commit/9c5a19ea491b159b82f23512811555a5c4aa2d3f))
* use @napi-rs/nice to support Windows ([#655](https://github.com/piscinajs/piscina/issues/655)) ([c567394](https://github.com/piscinajs/piscina/commit/c56739465000f455fcf7abc2f83501054cab22a4))

@@ -15,0 +15,0 @@

@@ -87,14 +87,5 @@ "use strict";

}
// TODO: drop on v5
function getAvailableParallelism() {
if (typeof node_os_1.availableParallelism === 'function') {
return (0, node_os_1.availableParallelism)();
}
try {
return (0, node_os_1.cpus)().length;
}
catch {
return 1;
}
return (0, node_os_1.availableParallelism)();
}
//# sourceMappingURL=common.js.map
import { Worker, MessagePort } from 'node:worker_threads';
import { EventEmitterAsyncResource } from 'node:events';
import { version } from '../package.json';
import type { Transferable, ResourceLimits, EnvSpecifier } from './types';
import type { Transferable, ResourceLimits, EnvSpecifier, HistogramSummary } from './types';
import { kQueueOptions, kTransferable, kValue } from './symbols';
import { TaskQueue, ArrayTaskQueue, FixedQueue, PiscinaTask, TransferList, TransferListItem } from './task_queue';
import { PiscinaLoadBalancer } from './worker_pool';
import { AbortSignalAny } from './abort';

@@ -16,3 +17,3 @@ interface Options {

concurrentTasksPerWorker?: number;
useAtomics?: boolean;
atomics?: 'sync' | 'async' | 'disabled';
resourceLimits?: ResourceLimits;

@@ -28,2 +29,4 @@ argv?: string[];

recordTiming?: boolean;
loadBalancer?: PiscinaLoadBalancer;
workerHistogram?: boolean;
}

@@ -38,3 +41,3 @@ interface FilledOptions extends Options {

concurrentTasksPerWorker: number;
useAtomics: boolean;
atomics: Options['atomics'];
taskQueue: TaskQueue;

@@ -44,2 +47,3 @@ niceIncrement: number;

recordTiming: boolean;
workerHistogram: boolean;
}

@@ -58,10 +62,2 @@ interface RunOptions {

constructor(options?: Options);
/** @deprecated Use run(task, options) instead **/
runTask(task: T, transferList?: TransferList, filename?: string, abortSignal?: AbortSignalAny): Promise<R>;
/** @deprecated Use run(task, options) instead **/
runTask(task: T, transferList?: TransferList, filename?: AbortSignalAny, abortSignal?: undefined): Promise<R>;
/** @deprecated Use run(task, options) instead **/
runTask(task: T, transferList?: string, filename?: AbortSignalAny, abortSignal?: undefined): Promise<R>;
/** @deprecated Use run(task, options) instead **/
runTask(task: T, transferList?: AbortSignalAny, filename?: undefined, abortSignal?: undefined): Promise<R>;
run(task: T, options?: RunOptions): Promise<R>;

@@ -76,3 +72,3 @@ close(options?: CloseOptions): Promise<void>;

get completed(): number;
get waitTime(): any;
get waitTime(): HistogramSummary | null;
get runTime(): any;

@@ -88,3 +84,3 @@ get utilization(): number;

static get ArrayTaskQueue(): typeof ArrayTaskQueue;
static move(val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort): ArrayBuffer | ArrayBufferView | MessagePort | Transferable;
static move(val: Transferable | TransferListItem | ArrayBufferView | ArrayBuffer | MessagePort): ArrayBuffer | ArrayBufferView<ArrayBufferLike> | MessagePort | Transferable;
static get transferableSymbol(): symbol;

@@ -91,0 +87,0 @@ static get valueSymbol(): symbol;

@@ -47,3 +47,3 @@ "use strict";

concurrentTasksPerWorker: 1,
useAtomics: true,
atomics: 'sync',
taskQueue: new task_queue_1.ArrayTaskQueue(),

@@ -53,3 +53,4 @@ niceIncrement: 0,

closeTimeout: 30000,
recordTiming: true
recordTiming: true,
workerHistogram: false
};

@@ -83,3 +84,3 @@ const kDefaultRunOptions = {

constructor(publicInterface, options) {
var _a;
var _a, _b, _c;
this.skipQueue = [];

@@ -94,3 +95,3 @@ this.completed = 0;

this.publicInterface = publicInterface;
this.taskQueue = options.taskQueue || new task_queue_1.ArrayTaskQueue();
this.taskQueue = (_a = options.taskQueue) !== null && _a !== void 0 ? _a : new task_queue_1.FixedQueue();
const filename = options.filename ? (0, common_1.maybeFileURLToPath)(options.filename) : null;

@@ -115,10 +116,12 @@ this.options = { ...kDefaultOptions, ...options, filename, maxQueue: 0 };

else {
this.options.maxQueue = (_a = options.maxQueue) !== null && _a !== void 0 ? _a : kDefaultOptions.maxQueue;
this.options.maxQueue = (_b = options.maxQueue) !== null && _b !== void 0 ? _b : kDefaultOptions.maxQueue;
}
this.balancer = (_c = this.options.loadBalancer) !== null && _c !== void 0 ? _c : (0, worker_pool_1.LeastBusyBalancer)({ maximumUsage: this.options.concurrentTasksPerWorker });
this.workers = new worker_pool_1.AsynchronouslyCreatedResourcePool(this.options.concurrentTasksPerWorker);
this.workers.onAvailable((w) => this._onWorkerAvailable(w));
this.workers.onTaskDone((w) => this._onWorkerTaskDone(w));
this.maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker;
this.startingUp = true;
this._ensureMinimumWorkers();
this.startingUp = false;
this.needsDrain = false;
this._needsDrain = false;
}

@@ -134,2 +137,4 @@ _ensureMinimumWorkers() {

_addNewWorker() {
if (this.closingUp)
return;
const pool = this;

@@ -145,3 +150,6 @@ const worker = new node_worker_threads_1.Worker((0, node_path_1.resolve)(__dirname, 'worker.js'), {

const { port1, port2 } = new node_worker_threads_1.MessageChannel();
const workerInfo = new worker_pool_1.WorkerInfo(worker, port1, onMessage);
const workerInfo = new worker_pool_1.WorkerInfo(worker, port1, onMessage, this.options.workerHistogram);
workerInfo.onDestroy(() => {
this.publicInterface.emit('workerDestroy', workerInfo.interface);
});
if (this.startingUp) {

@@ -151,3 +159,15 @@ // There is no point in waiting for the initial set of Workers to indicate

workerInfo.markAsReady();
// We need to emit the event in the next microtask, so that the user can
// attach event listeners before the event is emitted.
queueMicrotask(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
this._onWorkerReady(workerInfo);
});
}
else {
workerInfo.onReady(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
this._onWorkerReady(workerInfo);
});
}
const message = {

@@ -158,3 +178,3 @@ filename: this.options.filename,

sharedBuffer: workerInfo.sharedBuffer,
useAtomics: this.options.useAtomics,
atomics: this.options.atomics,
niceIncrement: this.options.niceIncrement

@@ -170,3 +190,5 @@ };

workerInfo.taskInfos.delete(taskId);
pool.workers.maybeAvailable(workerInfo);
// TODO: we can abstract the task info handling
// right into the pool.workers.taskDone method
pool.workers.taskDone(workerInfo);
/* istanbul ignore if */

@@ -246,3 +268,3 @@ if (taskInfo === undefined) {

_processPendingMessages() {
if (this.inProcessPendingMessages || !this.options.useAtomics) {
if (this.inProcessPendingMessages || this.options.atomics === 'disabled') {
return;

@@ -264,6 +286,11 @@ }

}
_onWorkerReady(workerInfo) {
this._onWorkerAvailable(workerInfo);
}
_onWorkerTaskDone(workerInfo) {
this._onWorkerAvailable(workerInfo);
}
_onWorkerAvailable(workerInfo) {
var _a;
while ((this.taskQueue.size > 0 || this.skipQueue.length > 0) &&
workerInfo.currentUsage() < this.options.concurrentTasksPerWorker) {
let workers = null;
while ((this.taskQueue.size > 0 || this.skipQueue.length > 0)) {
// The skipQueue will have tasks that we previously shifted off

@@ -274,19 +301,26 @@ // the task queue but had to skip over... we have to make sure

this.taskQueue.shift();
// If the task has an abortSignal and the worker has any other
// tasks, we cannot distribute the task to it. Skip for now.
if (taskInfo.abortSignal && workerInfo.taskInfos.size > 0) {
this.skipQueue.push(taskInfo);
if (workers == null) {
workers = [...this.workers].map(workerInfo => workerInfo.interface);
}
const distributed = this._distributeTask(taskInfo, workers);
if (distributed) {
// If task was distributed, we should continue to distribute more tasks
continue;
}
else if (this.workers.size < this.options.maxThreads) {
// We spawn if possible
// TODO: scheduler will intercept this.
this._addNewWorker();
continue;
}
else {
// If balancer states that pool is busy, we should stop trying to distribute tasks
break;
}
const now = node_perf_hooks_1.performance.now();
(_a = this.waitTime) === null || _a === void 0 ? void 0 : _a.record((0, common_1.toHistogramIntegerNano)(now - taskInfo.created));
taskInfo.started = now;
workerInfo.postTask(taskInfo);
this._maybeDrain();
return;
}
if (workerInfo.taskInfos.size === 0 &&
// If more workers than minThreads, we can remove idle workers
if (workerInfo.currentUsage() === 0 &&
this.workers.size > this.options.minThreads) {
workerInfo.idleTimeout = setTimeout(() => {
node_assert_1.default.strictEqual(workerInfo.taskInfos.size, 0);
node_assert_1.default.strictEqual(workerInfo.currentUsage(), 0);
if (this.workers.size > this.options.minThreads) {

@@ -298,4 +332,29 @@ this._removeWorker(workerInfo);

}
_distributeTask(task, workers) {
var _a;
// We need to verify if the task is aborted already or not
// otherwise we might be distributing aborted tasks to workers
if (task.aborted)
return false;
const candidate = this.balancer(task.interface, workers);
// Seeking for a real worker instead of customized one
if (candidate != null && candidate[symbols_1.kWorkerData] != null) {
const now = node_perf_hooks_1.performance.now();
(_a = this.waitTime) === null || _a === void 0 ? void 0 : _a.record((0, common_1.toHistogramIntegerNano)(now - task.created));
task.started = now;
candidate[symbols_1.kWorkerData].postTask(task);
this._maybeDrain();
// If candidate, let's try to distribute more tasks
return true;
}
if (task.abortSignal) {
this.skipQueue.push(task);
}
else {
this.taskQueue.push(task);
}
return false;
}
runTask(task, options) {
var _a, _b;
var _a;
let { filename, name } = options;

@@ -314,5 +373,5 @@ const { transferList = [] } = options;

let signal;
if (this.closingUp) {
if (this.closingUp || this.destroying) {
const closingUpAbortController = new AbortController();
closingUpAbortController.abort('queue is closing up');
closingUpAbortController.abort('queue is being terminated');
signal = closingUpAbortController.signal;

@@ -345,3 +404,4 @@ }

if (signal.aborted) {
return Promise.reject(new abort_1.AbortError(signal.reason));
reject(new abort_1.AbortError(signal.reason));
return ret;
}

@@ -360,2 +420,3 @@ taskInfo.abortListener = () => {

// Not yet running: Remove it from the queue.
// Call should be idempotent
this.taskQueue.remove(taskInfo);

@@ -366,4 +427,2 @@ }

}
// If there is a task queue, there's no point in looking for an available
// Worker thread. Add this task to the queue, if possible.
if (this.taskQueue.size > 0) {

@@ -373,12 +432,9 @@ const totalCapacity = this.options.maxQueue + this.pendingCapacity();

if (this.options.maxQueue === 0) {
return Promise.reject(errors_1.Errors.NoTaskQueueAvailable());
reject(errors_1.Errors.NoTaskQueueAvailable());
}
else {
return Promise.reject(errors_1.Errors.TaskQueueAtLimit());
reject(errors_1.Errors.TaskQueueAtLimit());
}
}
else {
if (this.workers.size < this.options.maxThreads) {
this._addNewWorker();
}
this.taskQueue.push(taskInfo);

@@ -389,33 +445,16 @@ }

}
// Look for a Worker with a minimum number of tasks it is currently running.
let workerInfo = this.workers.findAvailable();
// If we want the ability to abort this task, use only workers that have
// no running tasks.
if (workerInfo !== null && workerInfo.currentUsage() > 0 && signal) {
workerInfo = null;
}
// If no Worker was found, or that Worker was handling another task in some
// way, and we still have the ability to spawn new threads, do so.
let waitingForNewWorker = false;
if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads) {
this._addNewWorker();
waitingForNewWorker = true;
}
// If no Worker is found, try to put the task into the queue.
if (workerInfo === null) {
if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
return Promise.reject(errors_1.Errors.NoTaskQueueAvailable());
const workers = [...this.workers.readyItems].map(workerInfo => workerInfo.interface);
const distributed = this._distributeTask(taskInfo, workers);
if (!distributed) {
// We spawn if possible
// TODO: scheduler will intercept this.
if (this.workers.size < this.options.maxThreads) {
this._addNewWorker();
}
else {
this.taskQueue.push(taskInfo);
// We reject if no task queue set and no more pending capacity.
if (this.options.maxQueue <= 0 && this.pendingCapacity() === 0) {
reject(errors_1.Errors.NoTaskQueueAvailable());
}
this._maybeDrain();
return ret;
}
// TODO(addaleax): Clean up the waitTime/runTime recording.
const now = node_perf_hooks_1.performance.now();
(_b = this.waitTime) === null || _b === void 0 ? void 0 : _b.record((0, common_1.toHistogramIntegerNano)(now - taskInfo.created));
taskInfo.started = now;
workerInfo.postTask(taskInfo);
;
this._maybeDrain();

@@ -429,12 +468,18 @@ return ret;

_maybeDrain() {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
const totalQueueSize = this.taskQueue.size + this.skipQueue.length;
if (totalQueueSize === 0) {
this.needsDrain = false;
/**
* Our goal is to make it possible for user space to use the pool
* in a way where always waiting === 0,
* since we want to avoid creating tasks that can't execute
* immediately in order to provide back pressure to the task source.
*/
const { maxCapacity } = this;
const currentUsage = this.workers.getCurrentUsage();
if (maxCapacity === currentUsage) {
this._needsDrain = true;
this.publicInterface.emit('needsDrain');
}
else if (maxCapacity > currentUsage && this._needsDrain) {
this._needsDrain = false;
this.publicInterface.emit('drain');
}
if (totalQueueSize >= totalCapacity) {
this.needsDrain = true;
this.publicInterface.emit('needsDrain');
}
}

@@ -505,7 +550,7 @@ async destroy() {

checkIfWorkerIsDone(workerInfo);
workerInfo.port.on('message', () => checkIfWorkerIsDone(workerInfo));
this.workers.onTaskDone(checkIfWorkerIsDone);
}
});
const throwOnTimeOut = async (timeout) => {
await (0, promises_1.setTimeout)(timeout);
await (0, promises_1.setTimeout)(timeout, null, { ref: false });
throw errors_1.Errors.CloseTimeout();

@@ -565,5 +610,5 @@ };

}
if (options.useAtomics !== undefined &&
typeof options.useAtomics !== 'boolean') {
throw new TypeError('options.useAtomics must be a boolean value');
if (options.atomics != null && (typeof options.atomics !== 'string' ||
!['sync', 'async', 'disabled'].includes(options.atomics))) {
throw new TypeError('options.atomics should be a value of sync, async or disabled.');
}

@@ -589,33 +634,9 @@ if (options.resourceLimits !== undefined &&

}
__classPrivateFieldSet(this, _Piscina_pool, new ThreadPool(this, options), "f");
}
/** @deprecated Use run(task, options) instead **/
runTask(task, transferList, filename, signal) {
// If transferList is a string or AbortSignal, shift it.
if ((typeof transferList === 'object' && !Array.isArray(transferList)) ||
typeof transferList === 'string') {
signal = filename;
filename = transferList;
transferList = undefined;
if (options.loadBalancer !== undefined && (typeof options.loadBalancer !== 'function' || options.loadBalancer.length < 1)) {
throw new TypeError('options.loadBalancer must be a function with at least two args');
}
// If filename is an AbortSignal, shift it.
if (typeof filename === 'object' && !Array.isArray(filename)) {
signal = filename;
filename = undefined;
if (options.workerHistogram !== undefined && (typeof options.workerHistogram !== 'boolean')) {
throw new TypeError('options.workerHistogram must be a boolean');
}
if (transferList !== undefined && !Array.isArray(transferList)) {
return Promise.reject(new TypeError('transferList argument must be an Array'));
}
if (filename !== undefined && typeof filename !== 'string') {
return Promise.reject(new TypeError('filename argument must be a string'));
}
if (signal !== undefined && typeof signal !== 'object') {
return Promise.reject(new TypeError('signal argument must be an object'));
}
return __classPrivateFieldGet(this, _Piscina_pool, "f").runTask(task, {
transferList,
filename: filename || null,
name: 'default',
signal: signal || null
});
__classPrivateFieldSet(this, _Piscina_pool, new ThreadPool(this, options), "f");
}

@@ -701,2 +722,5 @@ run(task, options = kDefaultRunOptions) {

}
if (!__classPrivateFieldGet(this, _Piscina_pool, "f").runTime) {
return 0;
}
// The capacity is the max compute time capacity of the

@@ -724,3 +748,3 @@ // pool to this point in time as determined by the length

get needsDrain() {
return __classPrivateFieldGet(this, _Piscina_pool, "f").needsDrain;
return __classPrivateFieldGet(this, _Piscina_pool, "f")._needsDrain;
}

@@ -727,0 +751,0 @@ static get isWorkerThread() {

export declare const kMovable: unique symbol;
export declare const kWorkerData: unique symbol;
export declare const kTransferable: unique symbol;

@@ -3,0 +4,0 @@ export declare const kValue: unique symbol;

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.kFieldCount = exports.kResponseCountField = exports.kRequestCountField = exports.kQueueOptions = exports.kValue = exports.kTransferable = exports.kMovable = void 0;
exports.kFieldCount = exports.kResponseCountField = exports.kRequestCountField = exports.kQueueOptions = exports.kValue = exports.kTransferable = exports.kWorkerData = exports.kMovable = void 0;
// Internal symbol used to mark Transferable objects returned
// by the Piscina.move() function
exports.kMovable = Symbol('Piscina.kMovable');
exports.kWorkerData = Symbol('Piscina.kWorkerData');
exports.kTransferable = Symbol.for('Piscina.transferable');

@@ -8,0 +9,0 @@ exports.kValue = Symbol.for('Piscina.valueOf');

@@ -30,12 +30,15 @@ import type { MessagePort } from 'node:worker_threads';

abortSignal: AbortSignalAny | null;
abortListener: (() => void) | null;
workerInfo: WorkerInfo | null;
created: number;
started: number;
aborted: boolean;
_abortListener: (() => void) | null;
constructor(task: any, transferList: TransferList, filename: string, name: string, callback: TaskCallback, abortSignal: AbortSignalAny | null, triggerAsyncId: number);
set abortListener(value: (() => void));
get abortListener(): (() => void) | null;
releaseTask(): any;
done(err: Error | null, result?: any): void;
get [kQueueOptions](): object | null;
get [kQueueOptions](): {} | null;
get interface(): PiscinaTask;
}
export { Task, TaskQueue, PiscinaTask };

@@ -34,4 +34,6 @@ "use strict";

super('Piscina.Task', { requireManualDestroy: true, triggerAsyncId });
this.abortListener = null;
// abortListener : (() => void) | null = null;
this.workerInfo = null;
this.aborted = false;
this._abortListener = null;
this.callback = callback;

@@ -56,2 +58,3 @@ this.task = task;

this.name = name;
// TODO: This should not be global
this.taskId = taskIdCounter++;

@@ -62,2 +65,12 @@ this.abortSignal = abortSignal;

}
// TODO: improve this handling - ideally should be extended
set abortListener(value) {
this._abortListener = () => {
this.aborted = true;
value();
};
}
get abortListener() {
return this._abortListener;
}
releaseTask() {

@@ -83,3 +96,4 @@ const ret = this.task;

get [symbols_1.kQueueOptions]() {
return symbols_1.kQueueOptions in this.task ? this.task[symbols_1.kQueueOptions] : null;
var _a, _b;
return (_b = (_a = this.task) === null || _a === void 0 ? void 0 : _a[symbols_1.kQueueOptions]) !== null && _b !== void 0 ? _b : null;
}

@@ -86,0 +100,0 @@ get interface() {

@@ -9,3 +9,3 @@ import type { MessagePort, Worker } from 'node:worker_threads';

sharedBuffer: Int32Array;
useAtomics: boolean;
atomics: 'async' | 'sync' | 'disabled';
niceIncrement: number;

@@ -18,2 +18,3 @@ }

name: string;
histogramEnabled: number;
}

@@ -27,2 +28,3 @@ export interface ReadyMessage {

error: Error | null;
time: number | null;
}

@@ -29,0 +31,0 @@ export declare const commonState: {

import { Worker, MessagePort } from 'node:worker_threads';
import { ResponseMessage } from '../types';
import { RecordableHistogram } from 'node:perf_hooks';
import { HistogramSummary, ResponseMessage } from '../types';
import { TaskInfo } from '../task_queue';
import { kWorkerData } from '../symbols';
import { AsynchronouslyCreatedResource, AsynchronouslyCreatedResourcePool } from './base';
export * from './balancer';
type ResponseCallback = (response: ResponseMessage) => void;
declare abstract class AsynchronouslyCreatedResource {
onreadyListeners: (() => void)[] | null;
markAsReady(): void;
isReady(): boolean;
onReady(fn: () => void): void;
abstract currentUsage(): number;
}
export declare class AsynchronouslyCreatedResourcePool<T extends AsynchronouslyCreatedResource> {
pendingItems: Set<T>;
readyItems: Set<T>;
maximumUsage: number;
onAvailableListeners: ((item: T) => void)[];
constructor(maximumUsage: number);
add(item: T): void;
delete(item: T): void;
findAvailable(): T | null;
[Symbol.iterator](): Generator<T, void, unknown>;
get size(): number;
maybeAvailable(item: T): void;
onAvailable(fn: (item: T) => void): void;
}
export type PiscinaWorker = {
id: number;
currentUsage: number;
isRunningAbortableTask: boolean;
histogram: HistogramSummary | null;
terminating: boolean;
destroyed: boolean;
[kWorkerData]: WorkerInfo;
};
export declare class WorkerInfo extends AsynchronouslyCreatedResource {

@@ -34,3 +26,7 @@ worker: Worker;

onMessage: ResponseCallback;
constructor(worker: Worker, port: MessagePort, onMessage: ResponseCallback);
histogram: RecordableHistogram | null;
terminating: boolean;
destroyed: boolean;
constructor(worker: Worker, port: MessagePort, onMessage: ResponseCallback, enableHistogram: boolean);
get id(): number;
destroy(): void;

@@ -45,3 +41,4 @@ clearIdleTimeout(): void;

currentUsage(): number;
get interface(): PiscinaWorker;
}
export {};
export { AsynchronouslyCreatedResourcePool };
"use strict";
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
var desc = Object.getOwnPropertyDescriptor(m, k);
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __exportStar = (this && this.__exportStar) || function(m, exports) {
for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p);
};
var __importDefault = (this && this.__importDefault) || function (mod) {

@@ -6,91 +20,19 @@ return (mod && mod.__esModule) ? mod : { "default": mod };

Object.defineProperty(exports, "__esModule", { value: true });
exports.WorkerInfo = exports.AsynchronouslyCreatedResourcePool = void 0;
exports.AsynchronouslyCreatedResourcePool = exports.WorkerInfo = void 0;
const node_worker_threads_1 = require("node:worker_threads");
const node_perf_hooks_1 = require("node:perf_hooks");
const node_assert_1 = __importDefault(require("node:assert"));
const errors_1 = require("../errors");
const symbols_1 = require("../symbols");
class AsynchronouslyCreatedResource {
constructor() {
this.onreadyListeners = [];
}
markAsReady() {
const listeners = this.onreadyListeners;
(0, node_assert_1.default)(listeners !== null);
this.onreadyListeners = null;
for (const listener of listeners) {
listener();
}
}
isReady() {
return this.onreadyListeners === null;
}
onReady(fn) {
if (this.onreadyListeners === null) {
fn(); // Zalgo is okay here.
return;
}
this.onreadyListeners.push(fn);
}
}
class AsynchronouslyCreatedResourcePool {
constructor(maximumUsage) {
this.pendingItems = new Set();
this.readyItems = new Set();
this.maximumUsage = maximumUsage;
this.onAvailableListeners = [];
}
add(item) {
this.pendingItems.add(item);
item.onReady(() => {
/* istanbul ignore else */
if (this.pendingItems.has(item)) {
this.pendingItems.delete(item);
this.readyItems.add(item);
this.maybeAvailable(item);
}
});
}
delete(item) {
this.pendingItems.delete(item);
this.readyItems.delete(item);
}
findAvailable() {
let minUsage = this.maximumUsage;
let candidate = null;
for (const item of this.readyItems) {
const usage = item.currentUsage();
if (usage === 0)
return item;
if (usage < minUsage) {
candidate = item;
minUsage = usage;
}
}
return candidate;
}
*[Symbol.iterator]() {
yield* this.pendingItems;
yield* this.readyItems;
}
get size() {
return this.pendingItems.size + this.readyItems.size;
}
maybeAvailable(item) {
/* istanbul ignore else */
if (item.currentUsage() < this.maximumUsage) {
for (const listener of this.onAvailableListeners) {
listener(item);
}
}
}
onAvailable(fn) {
this.onAvailableListeners.push(fn);
}
}
exports.AsynchronouslyCreatedResourcePool = AsynchronouslyCreatedResourcePool;
class WorkerInfo extends AsynchronouslyCreatedResource {
constructor(worker, port, onMessage) {
const common_1 = require("../common");
const base_1 = require("./base");
Object.defineProperty(exports, "AsynchronouslyCreatedResourcePool", { enumerable: true, get: function () { return base_1.AsynchronouslyCreatedResourcePool; } });
__exportStar(require("./balancer"), exports);
class WorkerInfo extends base_1.AsynchronouslyCreatedResource {
constructor(worker, port, onMessage, enableHistogram) {
super();
this.idleTimeout = null; // eslint-disable-line no-undef
this.idleTimeout = null;
this.lastSeenResponseCount = 0;
this.terminating = false;
this.destroyed = false;
this.worker = worker;

@@ -102,4 +44,11 @@ this.port = port;

this.sharedBuffer = new Int32Array(new SharedArrayBuffer(symbols_1.kFieldCount * Int32Array.BYTES_PER_ELEMENT));
this.histogram = enableHistogram ? (0, node_perf_hooks_1.createHistogram)() : null;
}
get id() {
return this.worker.threadId;
}
destroy() {
if (this.terminating || this.destroyed)
return;
this.terminating = true;
this.worker.terminate();

@@ -112,5 +61,8 @@ this.port.close();

this.taskInfos.clear();
this.terminating = false;
this.destroyed = true;
this.markAsDestroyed();
}
clearIdleTimeout() {
if (this.idleTimeout !== null) {
if (this.idleTimeout != null) {
clearTimeout(this.idleTimeout);

@@ -131,2 +83,6 @@ this.idleTimeout = null;

_handleResponse(message) {
var _a;
if (message.time != null) {
(_a = this.histogram) === null || _a === void 0 ? void 0 : _a.record((0, common_1.toHistogramIntegerNano)(message.time));
}
this.onMessage(message);

@@ -141,2 +97,3 @@ if (this.taskInfos.size === 0) {

(0, node_assert_1.default)(!this.taskInfos.has(taskInfo.taskId));
(0, node_assert_1.default)(!this.terminating && !this.destroyed);
const message = {

@@ -146,3 +103,4 @@ task: taskInfo.releaseTask(),

filename: taskInfo.filename,
name: taskInfo.name
name: taskInfo.name,
histogramEnabled: this.histogram != null ? 1 : 0
};

@@ -168,2 +126,4 @@ try {

processPendingMessages() {
if (this.destroyed)
return;
// If we *know* that there are more messages than we have received using

@@ -195,4 +155,28 @@ // 'message' events yet, then try to load and handle them synchronously,

}
get interface() {
const worker = this;
return {
get id() {
return worker.worker.threadId;
},
get currentUsage() {
return worker.currentUsage();
},
get isRunningAbortableTask() {
return worker.isRunningAbortableTask();
},
get histogram() {
return worker.histogram != null ? (0, common_1.createHistogramSummary)(worker.histogram) : null;
},
get terminating() {
return worker.terminating;
},
get destroyed() {
return worker.destroyed;
},
[symbols_1.kWorkerData]: worker
};
}
}
exports.WorkerInfo = WorkerInfo;
//# sourceMappingURL=index.js.map

@@ -18,12 +18,23 @@ "use strict";

});
var __importStar = (this && this.__importStar) || function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k);
__setModuleDefault(result, mod);
return result;
};
var __importStar = (this && this.__importStar) || (function () {
var ownKeys = function(o) {
ownKeys = Object.getOwnPropertyNames || function (o) {
var ar = [];
for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k;
return ar;
};
return ownKeys(o);
};
return function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]);
__setModuleDefault(result, mod);
return result;
};
})();
Object.defineProperty(exports, "__esModule", { value: true });
const node_worker_threads_1 = require("node:worker_threads");
const node_url_1 = require("node:url");
const node_perf_hooks_1 = require("node:perf_hooks");
const symbols_1 = require("./symbols");

@@ -33,4 +44,6 @@ const common_1 = require("./common");

common_1.commonState.workerData = node_worker_threads_1.workerData;
function noop() { }
const handlerCache = new Map();
let useAtomics = process.env.PISCINA_DISABLE_ATOMICS !== '1';
let useAsyncAtomics = process.env.PISCINA_ENABLE_ASYNC_ATOMICS === '1';
// Get `import(x)` as a function that isn't transpiled to `require(x)` by

@@ -52,3 +65,3 @@ // TypeScript for dual ESM/CJS support.

let handler = handlerCache.get(`${filename}/${name}`);
if (handler !== undefined) {
if (handler != null) {
return handler;

@@ -87,20 +100,26 @@ }

// (so we can pre-load and cache the handler).
node_worker_threads_1.parentPort.on('message', (message) => {
useAtomics = process.env.PISCINA_DISABLE_ATOMICS === '1' ? false : message.useAtomics;
node_worker_threads_1.parentPort.on('message', async (message) => {
var _a;
const { port, sharedBuffer, filename, name, niceIncrement } = message;
(async function () {
try {
if (niceIncrement !== 0) {
(await Promise.resolve().then(() => __importStar(require('@napi-rs/nice')))).nice(niceIncrement);
}
}
catch { }
if (filename !== null) {
if (niceIncrement !== 0) {
(_a = (await Promise.resolve().then(() => __importStar(require('@napi-rs/nice'))).catch(noop))) === null || _a === void 0 ? void 0 : _a.nice(niceIncrement);
}
try {
if (filename != null) {
await getHandler(filename, name);
}
const readyMessage = { [common_1.READY]: true };
useAtomics = useAtomics !== false && message.atomics !== 'disabled';
useAsyncAtomics = useAtomics !== false && (useAsyncAtomics || message.atomics === 'async');
node_worker_threads_1.parentPort.postMessage(readyMessage);
port.on('message', onMessage.bind(null, port, sharedBuffer));
atomicsWaitLoop(port, sharedBuffer);
})().catch(throwInNextTick);
if (useAtomics) {
const res = atomicsWaitLoop(port, sharedBuffer);
if ((res === null || res === void 0 ? void 0 : res.then) != null)
await res;
}
}
catch (error) {
throwInNextTick(error);
}
});

@@ -110,4 +129,2 @@ let currentTasks = 0;

function atomicsWaitLoop(port, sharedBuffer) {
if (!useAtomics)
return;
// This function is entered either after receiving the startup message, or

@@ -124,2 +141,16 @@ // when we are done with a task. In those situations, the *only* thing we

// operations without waiting for them to finish, though.
if (useAsyncAtomics === true) {
// @ts-expect-error - for some reason not supported by TS
const { async, value } = Atomics.waitAsync(sharedBuffer, symbols_1.kRequestCountField, lastSeenRequestCount);
// We do not check for result
return async === true && value.then(() => {
lastSeenRequestCount = Atomics.load(sharedBuffer, symbols_1.kRequestCountField);
// We have to read messages *after* updating lastSeenRequestCount in order
// to avoid race conditions.
let entry;
while ((entry = (0, node_worker_threads_1.receiveMessageOnPort)(port)) !== undefined) {
onMessage(port, sharedBuffer, entry.message);
}
});
}
while (currentTasks === 0) {

@@ -129,2 +160,3 @@ // Check whether there are new messages by testing whether the current

// requests received.
// We do not check for result
Atomics.wait(sharedBuffer, symbols_1.kRequestCountField, lastSeenRequestCount);

@@ -140,23 +172,25 @@ lastSeenRequestCount = Atomics.load(sharedBuffer, symbols_1.kRequestCountField);

}
function onMessage(port, sharedBuffer, message) {
async function onMessage(port, sharedBuffer, message) {
currentTasks++;
const { taskId, task, filename, name } = message;
(async function () {
let response;
let transferList = [];
try {
const handler = await getHandler(filename, name);
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`);
}
let result = await handler(task);
if ((0, common_1.isMovable)(result)) {
transferList = transferList.concat(result[symbols_1.kTransferable]);
result = result[symbols_1.kValue];
}
response = {
taskId,
result: result,
error: null
};
let response;
let transferList = [];
const start = message.histogramEnabled === 1 ? node_perf_hooks_1.performance.now() : null;
try {
const handler = await getHandler(filename, name);
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`);
}
let result = await handler(task);
if ((0, common_1.isMovable)(result)) {
transferList = transferList.concat(result[symbols_1.kTransferable]);
result = result[symbols_1.kValue];
}
response = {
taskId,
result,
error: null,
time: start == null ? null : Math.round(node_perf_hooks_1.performance.now() - start)
};
if (useAtomics && !useAsyncAtomics) {
// If the task used e.g. console.log(), wait for the stream to drain

@@ -173,12 +207,15 @@ // before potentially entering the `Atomics.wait()` loop, and before

}
catch (error) {
response = {
taskId,
result: null,
// It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error: error
};
}
currentTasks--;
}
catch (error) {
response = {
taskId,
result: null,
// It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error: error,
time: start == null ? null : Math.round(node_perf_hooks_1.performance.now() - start)
};
}
currentTasks--;
try {
// Post the response to the parent thread, and let it know that we have

@@ -189,8 +226,15 @@ // an additional message available. If possible, use Atomics.wait()

Atomics.add(sharedBuffer, symbols_1.kResponseCountField, 1);
atomicsWaitLoop(port, sharedBuffer);
})().catch(throwInNextTick);
if (useAtomics) {
const res = atomicsWaitLoop(port, sharedBuffer);
if ((res === null || res === void 0 ? void 0 : res.then) != null)
await res;
}
}
catch (error) {
throwInNextTick(error);
}
}
function throwInNextTick(error) {
process.nextTick(() => { throw error; });
process.nextTick((e) => { throw e; }, error);
}
//# sourceMappingURL=worker.js.map
---
id: Class
id: Instance
sidebar_position: 2

@@ -39,3 +39,2 @@ ---

:::
- `maxQueue`: (`number` | `string`) The maximum number of tasks that may be

@@ -52,8 +51,16 @@ scheduled to run, but not yet running due to lack of available threads, at

handling I/O in parallel.
- `useAtomics`: (`boolean`) Use the [`Atomics`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics) API for faster communication
- `atomics`: (`sync` | `async` | `disabled`) Use the [`Atomics`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics) API for faster communication
between threads. This is on by default. You can disable `Atomics` globally by
setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1`.
If `useAtomics` is `true`, it will cause to pause threads (stoping all execution)
setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1`.
If `atomics` is `sync`, it will cause threads to pause (stopping all execution)
between tasks. Ideally, threads should wait for all operations to finish before
returning control to the main thread (avoid having open handles within a thread).
returning control to the main thread (avoid having open handles within a thread). If you still need to keep handles open
or run asynchronous work between tasks, you can set the environment variable `PISCINA_ENABLE_ASYNC_ATOMICS` to `1` or set `options.atomics` to `async` (see the configuration sketch after this options excerpt).
:::info
**Note**: The `async` mode comes with performance penalties and can lead to undesired behaviour if open handles are not tracked correctly.
Workers should be designed to wait for all operations to finish before returning control to the main thread; if background operations are still running, `async` can help (e.g. for cache warming).
:::
- `resourceLimits`: (`object`) See [Node.js new Worker options](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options)
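
For reference, a minimal configuration sketch for the three `atomics` modes described above; the `worker.js` filename is a placeholder and the comments paraphrase the behaviour documented in this excerpt.

```js
const { resolve } = require('node:path');
const { Piscina } = require('piscina');

const pool = new Piscina({
  filename: resolve(__dirname, 'worker.js'), // placeholder task module
  // 'sync' (default): workers block between tasks via Atomics.wait()
  // 'async': workers use Atomics.waitAsync(), so open handles keep running
  // 'disabled': plain message passing, no Atomics-based signalling
  atomics: 'async'
});

pool.run({ a: 4, b: 6 }).then(console.log);
```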

@@ -96,2 +103,8 @@ - `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads

for the pool. To disable, set to `false`.
- `workerHistogram`: (`boolean`) Defaults to `false`. When enabled, it hints the worker pool to record statistics for each individual worker (see the sketch below).
- `loadBalancer`: ([`PiscinaLoadBalancer`](#piscinaloadbalancer)) By default, Piscina uses a least-busy algorithm. The `loadBalancer`
option can be used to provide an alternative implementation. See [Custom Load Balancers](../advanced-topics/loadbalancer.mdx) for additional detail.
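
A small sketch of enabling per-worker statistics, assuming a placeholder `worker.js` task module; the recorded summary is read from the worker instance passed to the `'workerDestroy'` event documented further below.

```js
const { resolve } = require('node:path');
const { Piscina } = require('piscina');

const pool = new Piscina({
  filename: resolve(__dirname, 'worker.js'), // placeholder task module
  workerHistogram: true                      // record statistics per worker
});

// Each destroyed worker reports its recorded statistics
// (a HistogramSummary, or null when workerHistogram is disabled).
pool.on('workerDestroy', (worker) => {
  console.log(`worker ${worker.id}:`, worker.histogram);
});
```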

@@ -102,1 +115,127 @@ :::caution

:::
## `PiscinaLoadBalancer`
The `PiscinaLoadBalancer` interface is used to implement a custom load-balancing algorithm that determines which worker thread should be assigned a task.
> For more information, see [Custom Load Balancers](../advanced-topics/loadbalancer.mdx).
### Interface: `PiscinaLoadBalancer`
```ts
type PiscinaLoadBalancer = (
task: PiscinaTask, // Task to be distributed
workers: PiscinaWorker[] // Array of Worker instances
) => PiscinaWorker | null; // Worker instance to be assigned the task
```
If the `PiscinaLoadBalancer` returns `null`, `Piscina` will attempt to spawn a new worker; otherwise, the task will be queued until a worker is available.
### Interface: `PiscinaTask`
```ts
interface PiscinaTask {
taskId: number; // Unique identifier for the task
filename: string; // Filename of the worker module
name: string; // Name of the worker function
created: number; // Timestamp when the task was created
isAbortable: boolean; // Indicates if the task can be aborted through AbortSignal
}
```
### Interface: `PiscinaWorker`
```ts
interface PiscinaWorker {
id: number; // Unique identifier for the worker
currentUsage: number; // Number of tasks currently running on the worker
isRunningAbortableTask: boolean; // Indicates if the worker is running an abortable task
histogram: HistogramSummary | null; // Worker histogram
terminating: boolean; // Indicates if the worker is terminating
destroyed: boolean; // Indicates if the worker has been destroyed
}
```
### Example: Custom Load Balancer
#### JavaScript
<a id="custom-load-balancer-example-js"> </a>
```js
const { Piscina } = require('piscina');
function LeastBusyBalancer(opts) {
const { maximumUsage } = opts;
return (task, workers) => {
let candidate = null;
let checkpoint = maximumUsage;
for (const worker of workers) {
if (worker.currentUsage === 0) {
candidate = worker;
break;
}
if (worker.isRunningAbortableTask) continue;
if (!task.isAbortable && worker.currentUsage < checkpoint) {
candidate = worker;
checkpoint = worker.currentUsage;
}
}
return candidate;
};
}
const piscina = new Piscina({
loadBalancer: LeastBusyBalancer({ maximumUsage: 2 }),
});
piscina
.runTask({ filename: 'worker.js', name: 'default' })
.then((result) => console.log(result))
.catch((err) => console.error(err));
```
#### TypeScript
<a id="custom-load-balancer-example-ts"> </a>
```ts
import { Piscina } from 'piscina';
function LeastBusyBalancer(
opts: LeastBusyBalancerOptions
): PiscinaLoadBalancer {
const { maximumUsage } = opts;
return (task, workers) => {
let candidate: PiscinaWorker | null = null;
let checkpoint = maximumUsage;
for (const worker of workers) {
if (worker.currentUsage === 0) {
candidate = worker;
break;
}
if (worker.isRunningAbortableTask) continue;
if (!task.isAbortable && worker.currentUsage < checkpoint) {
candidate = worker;
checkpoint = worker.currentUsage;
}
}
return candidate;
};
}
const piscina = new Piscina({
loadBalancer: LeastBusyBalancer({ maximumUsage: 2 }),
});
piscina
.runTask({ filename: 'worker.js', name: 'default' })
.then((result) => console.log(result))
.catch((err) => console.error(err));
```

@@ -30,2 +30,14 @@ ---

A `'message'` event is emitted whenever a message is received from a worker thread.
## Event: `'workerCreate'`
Emitted when a new worker is created. It receives the worker instance as its argument.
## Event: `'workerDestroy'`
Emitted when a worker is destroyed. It receives the destroyed worker instance as its argument.
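
A minimal listener sketch for the two events above, assuming a placeholder `worker.js`; both callbacks receive the `PiscinaWorker` instance.

```js
const { resolve } = require('node:path');
const { Piscina } = require('piscina');

const pool = new Piscina({ filename: resolve(__dirname, 'worker.js') }); // placeholder task module

pool.on('workerCreate', (worker) => {
  console.log('worker created:', worker.id);
});

pool.on('workerDestroy', (worker) => {
  console.log('worker destroyed:', worker.id);
});
```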
{
"name": "piscina",
"version": "4.8.0",
"version": "5.0.0-alpha.0",
"description": "A fast, efficient Node.js Worker Thread Pool implementation",

@@ -12,5 +12,8 @@ "main": "./dist/main.js",

},
"engines": {
"node": ">=18.x"
},
"scripts": {
"build": "tsc && gen-esm-wrapper . dist/esm-wrapper.mjs",
"lint": "standardx \"**/*.{ts,mjs,js,cjs}\" | snazzy",
"lint": "eslint",
"test": "c8 tap",

@@ -20,9 +23,10 @@ "test:ci": "npm run lint && npm run build && npm run test:coverage",

"prepack": "npm run build",
"bench": "npm run bench:taskqueue && npm run bench:piscina",
"bench:piscina": "npm run benchmark:piscina-default &&npm run benchmark:piscina-fixed-queue && npm run benchmark:piscina-comparison",
"bench:taskqueue": "npm run benchmark:queue-comparison",
"benchmark:piscina-default": "node benchmark/simple-benchmark.js",
"benchmark:piscina-fixed-queue": "node benchmark/simple-benchmark-fixed-queue.js",
"benchmark:piscina-comparison": "node benchmark/piscina-queue-comparison.js",
"benchmark:queue-comparison": "node benchmark/queue-comparison.js"
"benchmark": "npm run bench:queue && npm run benchmark:piscina",
"benchmark:piscina": "npm run benchmark:default &&npm run benchmark:queue:fixed && npm run benchmark:default:comparison",
"benchmark:default": "node benchmark/simple-benchmark.js",
"benchmark:default:async": "node benchmark/simple-benchmark.js",
"benchmark:default:comparison": "node benchmark/piscina-queue-comparison.js",
"benchmark:queue": "npm run benchmark:queue-comparison",
"benchmark:queue:fixed": "node benchmark/simple-benchmark-fixed-queue.js",
"benchmark:queue:comparison": "node benchmark/queue-comparison.js"
},

@@ -42,3 +46,4 @@ "repository": {

"Anna Henningsen <anna@addaleax.net>",
"Matteo Collina <matteo.collina@gmail.com>"
"Matteo Collina <matteo.collina@gmail.com>",
"Carlos Fuentes <me@metcoder.dev>"
],

@@ -48,14 +53,12 @@ "license": "MIT",

"@types/node": "^22.4.1",
"@typescript-eslint/eslint-plugin": "^6.9.0",
"@typescript-eslint/parser": "^6.9.0",
"abort-controller": "^3.0.0",
"c8": "^10.1.2",
"concat-stream": "^2.0.0",
"eslint": "^9.16.0",
"gen-esm-wrapper": "^1.1.1",
"snazzy": "^9.0.0",
"standardx": "^7.0.0",
"neostandard": "^0.11.9",
"tap": "^16.3.7",
"tinybench": "^2.8.0",
"tinybench": "^3.0.0",
"ts-node": "^10.9.2",
"typescript": "5.6.2"
"typescript": "5.7.2"
},

@@ -65,27 +68,2 @@ "optionalDependencies": {

},
"eslintConfig": {
"rules": {
"semi": [
"error",
"always"
],
"no-unused-vars": "off",
"no-use-before-define": "off",
"no-unreachable-loop": "off",
"no-dupe-class-members": "off",
"@typescript-eslint/no-unused-vars": "error"
},
"globals": {
"SharedArrayBuffer": true,
"Atomics": true,
"AbortController": true,
"MessageChannel": true
}
},
"standardx": {
"parser": "@typescript-eslint/parser",
"plugins": [
"@typescript-eslint/eslint-plugin"
]
},
"bugs": {

@@ -92,0 +70,0 @@ "url": "https://github.com/piscinajs/piscina/issues"

@@ -20,6 +20,10 @@ ![Piscina Logo](https://avatars1.githubusercontent.com/u/65627548?s=200&v=4)

For Node.js 16.x and higher.
For Node.js 18.x and higher.
[MIT Licensed][].
## Documentation
- [Website](https://piscinajs.github.io/piscina/)
## Piscina API

@@ -278,3 +282,3 @@

filename: resolve(__dirname, 'worker.js'),
useAtomics: false
atomics: 'disabled'
});

@@ -363,8 +367,12 @@

handling I/O in parallel.
* `useAtomics`: (`boolean`) Use the [`Atomics`][] API for faster communication
* `atomics`: (`sync` | `async` | `disabled`) Use the [`Atomics`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics) API for faster communication
between threads. This is on by default. You can disable `Atomics` globally by
setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1`.
If `useAtomics` is `true`, it will cause to pause threads (stoping all execution)
between tasks. Ideally, threads should wait for all operations to finish before
returning control to the main thread (avoid having open handles within a thread).
setting the environment variable `PISCINA_DISABLE_ATOMICS` to `1`.
If `atomics` is `sync`, it will cause threads to pause (stopping all execution)
between tasks. Ideally, threads should wait for all operations to finish before
returning control to the main thread (avoid having open handles within a thread). If you still need to keep handles open
or run asynchronous work between tasks, you can set the environment variable `PISCINA_ENABLE_ASYNC_ATOMICS` to `1` or set `options.atomics` to `async` (see the sketch below).
> **Note**: The `async` mode comes with performance penalties and can lead to undesired behaviour if open handles are not tracked correctly.
* `resourceLimits`: (`object`) See [Node.js new Worker options][]
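
For reference, a minimal sketch of the two equivalent ways to select the mode described above; `worker.js` is a placeholder and the environment-variable invocations are shown only as comments.

```js
const { resolve } = require('node:path');
const { Piscina } = require('piscina');

// Per-pool: pick the mode through the constructor option.
const pool = new Piscina({
  filename: resolve(__dirname, 'worker.js'), // placeholder task module
  atomics: 'async' // or 'sync' (default) / 'disabled'
});

// Globally: set an environment variable before starting the process, e.g.
//   PISCINA_ENABLE_ASYNC_ATOMICS=1 node main.js   # opt in to async mode
//   PISCINA_DISABLE_ATOMICS=1 node main.js        # disable Atomics entirely
```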

@@ -438,29 +446,2 @@ * `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads

### Method: `runTask(task[, transferList][, filename][, abortSignal])`
**Deprecated** -- Use `run(task, options)` instead.
Schedules a task to be run on a Worker thread.
* `task`: Any value. This will be passed to the function that is exported from
`filename`.
* `transferList`: An optional lists of objects that is passed to
[`postMessage()`] when posting `task` to the Worker, which are transferred
rather than cloned.
* `filename`: Optionally overrides the `filename` option passed to the
constructor for this task. If no `filename` was specified to the constructor,
this is mandatory.
* `abortSignal`: An [`AbortSignal`][] instance. If passed, this can be used to
cancel a task. If the task is already running, the corresponding `Worker`
thread will be stopped.
(More generally, any `EventEmitter` or `EventTarget` that emits `'abort'`
events can be passed here.) Abortable tasks cannot share threads regardless
of the `concurrentTasksPerWorker` options.
This returns a `Promise` for the return value of the (async) function call
made to the function exported from `filename`. If the (async) function throws
an error, the returned `Promise` will be rejected with that error.
If the task is aborted, the returned `Promise` is rejected with an error
as well.
### Method: `destroy()`

@@ -496,3 +477,3 @@

All other errors are reported by rejecting the `Promise` returned from
`run()` or `runTask()`, including rejections reported by the handler function
`run()`, including rejections reported by the handler function
itself.

@@ -502,4 +483,8 @@

A `'drain'` event is emitted whenever the `queueSize` reaches `0`.
A `'drain'` event is emitted when the current usage of the
pool falls below its maximum capacity.
The intended goal is to provide backpressure to the task source,
so that creating tasks that cannot be executed immediately can be avoided.
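
A hedged sketch of using these events for backpressure, assuming a placeholder `worker.js` and a caller-defined `submit()` helper; it defers submission while the pool reports it cannot take more work.

```js
const { resolve } = require('node:path');
const { once } = require('node:events');
const { Piscina } = require('piscina');

const pool = new Piscina({ filename: resolve(__dirname, 'worker.js') }); // placeholder task module

async function submit(task) {
  // If the pool is saturated ('needsDrain' was emitted and needsDrain is true),
  // wait for 'drain' before handing it another task, so the task source slows down.
  if (pool.needsDrain) {
    await once(pool, 'drain');
  }
  return pool.run(task);
}
```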
### Event: `'needsDrain'`

@@ -756,3 +741,3 @@

The special symbol `Piscina.queueOptionsSymbol` may be set as a property
on tasks submitted to `run()` or `runTask()` as a way of passing additional
on tasks submitted to `run()` as a way of passing additional
options on to the custom `TaskQueue` implementation. (Note that because the

@@ -804,3 +789,3 @@ queue options are set as a property on the task, tasks with queue

for (let i = 0; i < 10; i++) {
piscina.runTask({ data: i }).then((result) => {
piscina.run({ data: i }).then((result) => {
console.log(result);

@@ -812,3 +797,2 @@ }).catch((error) => {

```
**Note** The `FixedQueue` will become the default task queue implementation in a next major version.

@@ -939,98 +923,2 @@ ## Current Limitations (Things we're working on / would love help with)

## Release Notes
### 4.1.0
#### Features
* add `needsDrain` property ([#368](https://github.com/piscinajs/piscina/issues/368)) ([2d49b63](https://github.com/piscinajs/piscina/commit/2d49b63368116c172a52e2019648049b4d280162))
* correctly handle process.exit calls outside of a task ([#361](https://github.com/piscinajs/piscina/issues/361)) ([8e6d16e](https://github.com/piscinajs/piscina/commit/8e6d16e1dc23f8bb39772ed954f6689852ad435f))
#### Bug Fixes
* Fix types for TypeScript 4.7 ([#239](https://github.com/piscinajs/piscina/issues/239)) ([a38fb29](https://github.com/piscinajs/piscina/commit/a38fb292e8fcc45cc20abab8668f82d908a24dc0))
* use CJS imports ([#374](https://github.com/piscinajs/piscina/issues/374)) ([edf8dc4](https://github.com/piscinajs/piscina/commit/edf8dc4f1a19e9b49e266109cdb70d9acc86f3ca))
### 4.0.0
* Drop Node.js 14.x support
* Add Node.js 20.x to CI
### 3.2.0
* Adds a new `PISCINA_DISABLE_ATOMICS` environment variable as an alternative way of
disabling Piscina's internal use of the `Atomics` API. (https://github.com/piscinajs/piscina/pull/163)
* Fixes a bug with transferable objects. (https://github.com/piscinajs/piscina/pull/155)
* Fixes CI issues with TypeScript. (https://github.com/piscinajs/piscina/pull/161)
### 3.1.0
* Deprecates `piscina.runTask()`; adds `piscina.run()` as an alternative.
https://github.com/piscinajs/piscina/commit/d7fa24d7515789001f7237ad6ae9ad42d582fc75
* Allows multiple exported handler functions from a single file.
https://github.com/piscinajs/piscina/commit/d7fa24d7515789001f7237ad6ae9ad42d582fc75
### 3.0.0
* Drops Node.js 10.x support
* Updates minimum TypeScript target to ES2019
### 2.1.0
* Adds name property to indicate `AbortError` when tasks are
canceled using an `AbortController` (or similar)
* More examples
### 2.0.0
* Added unmanaged file descriptor tracking
* Updated dependencies
### 1.6.1
* Bug fix: Reject if AbortSignal is already aborted
* Bug Fix: Use once listener for abort event
### 1.6.0
* Add the `niceIncrement` configuration parameter.
### 1.5.1
* Bug fixes around abortable task selection.
### 1.5.0
* Added `Piscina.move()`
* Added Custom Task Queues
* Added utilization metric
* Wait for workers to be ready before considering them as candidates
* Additional examples
### 1.4.0
* Added `maxQueue = 'auto'` to autocalculate the maximum queue size.
* Added more examples, including an example of implementing a worker
as a Node.js native addon.
### 1.3.0
* Added the `'drain'` event
### 1.2.0
* Added support for ESM and file:// URLs
* Added `env`, `argv`, `execArgv`, and `workerData` options
* More examples
### 1.1.0
* Added support for Worker Thread `resourceLimits`
### 1.0.0
* Initial release!
## The Team

@@ -1041,2 +929,5 @@

* Matteo Collina <matteo.collina@gmail.com>
* Rafael Gonzaga <rafael.nunu@hotmail.com>
* Robert Nagy <ronagy@icloud.com>
* Carlos Fuentes <me@metcoder.dev>

@@ -1043,0 +934,0 @@ ## Acknowledgements

import type { Histogram } from 'node:perf_hooks';
import { fileURLToPath, URL } from 'node:url';
import { availableParallelism, cpus } from 'node:os';
import { availableParallelism } from 'node:os';

@@ -92,13 +92,4 @@ import type { HistogramSummary } from './types';

// TODO: drop on v5
export function getAvailableParallelism () : number {
if (typeof availableParallelism === 'function') {
return availableParallelism();
}
try {
return cpus().length;
} catch {
return 1;
}
return availableParallelism();
}

@@ -15,3 +15,4 @@ import { Worker, MessageChannel, MessagePort } from 'node:worker_threads';

ResourceLimits,
EnvSpecifier
EnvSpecifier,
HistogramSummary
} from './types';

@@ -21,3 +22,4 @@ import {

kTransferable,
kValue
kValue,
kWorkerData
} from './symbols';

@@ -36,3 +38,6 @@ import {

WorkerInfo,
AsynchronouslyCreatedResourcePool
AsynchronouslyCreatedResourcePool,
PiscinaLoadBalancer,
PiscinaWorker,
LeastBusyBalancer
} from './worker_pool';

@@ -66,3 +71,3 @@ import {

concurrentTasksPerWorker? : number,
useAtomics? : boolean,
atomics? : 'sync' | 'async' | 'disabled',
resourceLimits? : ResourceLimits,

@@ -77,3 +82,5 @@ argv? : string[],

closeTimeout?: number,
recordTiming?: boolean
recordTiming?: boolean,
loadBalancer?: PiscinaLoadBalancer,
workerHistogram?: boolean,
}

@@ -89,7 +96,8 @@

concurrentTasksPerWorker : number,
useAtomics: boolean,
atomics: Options['atomics'],
taskQueue : TaskQueue,
niceIncrement : number,
closeTimeout : number,
recordTiming : boolean
recordTiming : boolean,
workerHistogram: boolean,
}

@@ -123,3 +131,3 @@

concurrentTasksPerWorker: 1,
useAtomics: true,
atomics: 'sync',
taskQueue: new ArrayTaskQueue(),

@@ -129,3 +137,4 @@ niceIncrement: 0,

closeTimeout: 30000,
recordTiming: true
recordTiming: true,
workerHistogram: false
};

@@ -175,3 +184,3 @@

waitTime? : RecordableHistogram;
needsDrain : boolean;
_needsDrain : boolean;
start : number = performance.now();

@@ -183,6 +192,8 @@ inProcessPendingMessages : boolean = false;

destroying : boolean = false;
maxCapacity: number;
balancer: PiscinaLoadBalancer;
constructor (publicInterface : Piscina, options : Options) {
this.publicInterface = publicInterface;
this.taskQueue = options.taskQueue || new ArrayTaskQueue();
this.taskQueue = options.taskQueue ?? new FixedQueue();

@@ -213,5 +224,7 @@ const filename =

this.balancer = this.options.loadBalancer ?? LeastBusyBalancer({ maximumUsage: this.options.concurrentTasksPerWorker });
this.workers = new AsynchronouslyCreatedResourcePool<WorkerInfo>(
this.options.concurrentTasksPerWorker);
this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w));
this.workers.onTaskDone((w : WorkerInfo) => this._onWorkerTaskDone(w));
this.maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker;

@@ -221,3 +234,3 @@ this.startingUp = true;

this.startingUp = false;
this.needsDrain = false;
this._needsDrain = false;
}

@@ -235,2 +248,4 @@

_addNewWorker () : void {
if (this.closingUp) return;
const pool = this;

@@ -247,3 +262,8 @@ const worker = new Worker(resolve(__dirname, 'worker.js'), {

const { port1, port2 } = new MessageChannel();
const workerInfo = new WorkerInfo(worker, port1, onMessage);
const workerInfo = new WorkerInfo(worker, port1, onMessage, this.options.workerHistogram);
workerInfo.onDestroy(() => {
this.publicInterface.emit('workerDestroy', workerInfo.interface);
});
if (this.startingUp) {

@@ -253,2 +273,13 @@ // There is no point in waiting for the initial set of Workers to indicate

workerInfo.markAsReady();
// We need to emit the event in the next microtask, so that the user can
// attach event listeners before the event is emitted.
queueMicrotask(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
this._onWorkerReady(workerInfo);
});
} else {
workerInfo.onReady(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
this._onWorkerReady(workerInfo);
});
}

@@ -261,3 +292,3 @@

sharedBuffer: workerInfo.sharedBuffer,
useAtomics: this.options.useAtomics,
atomics: this.options.atomics!,
niceIncrement: this.options.niceIncrement

@@ -275,3 +306,5 @@ };

pool.workers.maybeAvailable(workerInfo);
// TODO: we can abstract the task info handling
// right into the pool.workers.taskDone method
pool.workers.taskDone(workerInfo);

@@ -366,3 +399,3 @@ /* istanbul ignore if */

_processPendingMessages () {
if (this.inProcessPendingMessages || !this.options.useAtomics) {
if (this.inProcessPendingMessages || this.options.atomics === 'disabled') {
return;

@@ -387,5 +420,13 @@ }

_onWorkerReady (workerInfo : WorkerInfo) : void {
this._onWorkerAvailable(workerInfo);
}
_onWorkerTaskDone (workerInfo: WorkerInfo) : void {
this._onWorkerAvailable(workerInfo);
}
_onWorkerAvailable (workerInfo : WorkerInfo) : void {
while ((this.taskQueue.size > 0 || this.skipQueue.length > 0) &&
workerInfo.currentUsage() < this.options.concurrentTasksPerWorker) {
let workers: PiscinaWorker[] | null = null;
while ((this.taskQueue.size > 0 || this.skipQueue.length > 0)) {
// The skipQueue will have tasks that we previously shifted off

@@ -396,20 +437,28 @@ // the task queue but had to skip over... we have to make sure

this.taskQueue.shift() as TaskInfo;
// If the task has an abortSignal and the worker has any other
// tasks, we cannot distribute the task to it. Skip for now.
if (taskInfo.abortSignal && workerInfo.taskInfos.size > 0) {
this.skipQueue.push(taskInfo);
if (workers == null) {
workers = [...this.workers].map(workerInfo => workerInfo.interface);
}
const distributed = this._distributeTask(taskInfo, workers);
if (distributed) {
// If task was distributed, we should continue to distribute more tasks
continue;
} else if (this.workers.size < this.options.maxThreads) {
// We spawn if possible
// TODO: scheduler will intercept this.
this._addNewWorker();
continue;
} else {
// If the balancer indicates that the pool is busy, we should stop trying to distribute tasks
break;
}
const now = performance.now();
this.waitTime?.record(toHistogramIntegerNano(now - taskInfo.created));
taskInfo.started = now;
workerInfo.postTask(taskInfo);
this._maybeDrain();
return;
}
if (workerInfo.taskInfos.size === 0 &&
// If more workers than minThreads, we can remove idle workers
if (workerInfo.currentUsage() === 0 &&
this.workers.size > this.options.minThreads) {
workerInfo.idleTimeout = setTimeout(() => {
assert.strictEqual(workerInfo.taskInfos.size, 0);
assert.strictEqual(workerInfo.currentUsage(), 0);
if (this.workers.size > this.options.minThreads) {

@@ -422,2 +471,29 @@ this._removeWorker(workerInfo);

_distributeTask (task: TaskInfo, workers: PiscinaWorker[]): boolean {
// We need to verify if the task is aborted already or not
// otherwise we might be distributing aborted tasks to workers
if (task.aborted) return false;
const candidate = this.balancer(task.interface, workers);
// Seeking for a real worker instead of customized one
if (candidate != null && candidate[kWorkerData] != null) {
const now = performance.now();
this.waitTime?.record(toHistogramIntegerNano(now - task.created));
task.started = now;
candidate[kWorkerData].postTask(task);
this._maybeDrain();
// If candidate, let's try to distribute more tasks
return true;
}
if (task.abortSignal) {
this.skipQueue.push(task);
} else {
this.taskQueue.push(task);
}
return false;
}
runTask (

@@ -445,5 +521,5 @@ task : any,

let signal: AbortSignalAny | null;
if (this.closingUp) {
if (this.closingUp || this.destroying) {
const closingUpAbortController = new AbortController();
closingUpAbortController.abort('queue is closing up');
closingUpAbortController.abort('queue is being terminated');

@@ -484,4 +560,6 @@ signal = closingUpAbortController.signal;

if ((signal as AbortSignalEventTarget).aborted) {
return Promise.reject(new AbortError((signal as AbortSignalEventTarget).reason));
reject!(new AbortError((signal as AbortSignalEventTarget).reason));
return ret;
}
taskInfo.abortListener = () => {

@@ -499,10 +577,10 @@ // Call reject() first to make sure we always reject with the AbortError

// Not yet running: Remove it from the queue.
// Call should be idempotent
this.taskQueue.remove(taskInfo);
}
};
onabort(signal, taskInfo.abortListener);
}
// If there is a task queue, there's no point in looking for an available
// Worker thread. Add this task to the queue, if possible.
if (this.taskQueue.size > 0) {

@@ -512,10 +590,7 @@ const totalCapacity = this.options.maxQueue + this.pendingCapacity();

if (this.options.maxQueue === 0) {
return Promise.reject(Errors.NoTaskQueueAvailable());
reject!(Errors.NoTaskQueueAvailable());
} else {
return Promise.reject(Errors.TaskQueueAtLimit());
reject!(Errors.TaskQueueAtLimit());
}
} else {
if (this.workers.size < this.options.maxThreads) {
this._addNewWorker();
}
this.taskQueue.push(taskInfo);

@@ -528,37 +603,18 @@ }

// Look for a Worker with a minimum number of tasks it is currently running.
let workerInfo : WorkerInfo | null = this.workers.findAvailable();
const workers = [...this.workers.readyItems].map(workerInfo => workerInfo.interface);
const distributed = this._distributeTask(taskInfo, workers);
// If we want the ability to abort this task, use only workers that have
// no running tasks.
if (workerInfo !== null && workerInfo.currentUsage() > 0 && signal) {
workerInfo = null;
}
if (!distributed) {
// We spawn if possible
// TODO: scheduler will intercept this.
if (this.workers.size < this.options.maxThreads) {
this._addNewWorker();
}
// If no Worker was found, or that Worker was handling another task in some
// way, and we still have the ability to spawn new threads, do so.
let waitingForNewWorker = false;
if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads) {
this._addNewWorker();
waitingForNewWorker = true;
}
// If no Worker is found, try to put the task into the queue.
if (workerInfo === null) {
if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
return Promise.reject(Errors.NoTaskQueueAvailable());
} else {
this.taskQueue.push(taskInfo);
// We reject if no task queue set and no more pending capacity.
if (this.options.maxQueue <= 0 && this.pendingCapacity() === 0) {
reject!(Errors.NoTaskQueueAvailable());
}
};
this._maybeDrain();
return ret;
}
// TODO(addaleax): Clean up the waitTime/runTime recording.
const now = performance.now();
this.waitTime?.record(toHistogramIntegerNano(now - taskInfo.created));
taskInfo.started = now;
workerInfo.postTask(taskInfo);
this._maybeDrain();

@@ -574,14 +630,18 @@ return ret;

_maybeDrain () {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
const totalQueueSize = this.taskQueue.size + this.skipQueue.length;
/**
* Our goal is to make it possible for user space to use the pool
* in a way where always waiting === 0,
* since we want to avoid creating tasks that can't execute
* immediately in order to provide back pressure to the task source.
*/
const { maxCapacity } = this;
const currentUsage = this.workers.getCurrentUsage();
if (totalQueueSize === 0) {
this.needsDrain = false;
if (maxCapacity === currentUsage) {
this._needsDrain = true;
this.publicInterface.emit('needsDrain');
} else if (maxCapacity > currentUsage && this._needsDrain) {
this._needsDrain = false;
this.publicInterface.emit('drain');
}
if (totalQueueSize >= totalCapacity) {
this.needsDrain = true;
this.publicInterface.emit('needsDrain');
}
}
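Since the reworked _maybeDrain() now emits 'needsDrain' once maxThreads * concurrentTasksPerWorker is fully used and 'drain' when capacity frees up again, a task producer can use those events for back pressure. The following is only an illustrative sketch under that reading of the code above; the worker filename and payloads are placeholders.
import Piscina from 'piscina';
import { once } from 'node:events';
import { resolve } from 'node:path';
const pool = new Piscina({ filename: resolve(__dirname, 'worker.js') }); // placeholder worker
async function submitAll (payloads: unknown[]) {
  for (const payload of payloads) {
    if (pool.needsDrain) {
      // Pool capacity is exhausted; wait until the pool signals spare capacity again.
      await once(pool, 'drain');
    }
    pool.run(payload).catch(console.error);
  }
}
submitAll([{ a: 1 }, { a: 2 }, { a: 3 }]);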

@@ -662,3 +722,3 @@

workerInfo.port.on('message', () => checkIfWorkerIsDone(workerInfo));
this.workers.onTaskDone(checkIfWorkerIsDone);
}

@@ -668,3 +728,3 @@ });

const throwOnTimeOut = async (timeout: number) => {
await sleep(timeout);
await sleep(timeout, null, { ref: false });
throw Errors.CloseTimeout();

@@ -727,5 +787,5 @@ };

}
if (options.useAtomics !== undefined &&
typeof options.useAtomics !== 'boolean') {
throw new TypeError('options.useAtomics must be a boolean value');
if (options.atomics != null && (typeof options.atomics !== 'string' ||
!['sync', 'async', 'disabled'].includes(options.atomics))) {
throw new TypeError('options.atomics should be a value of sync, async or disabled.');
}

@@ -751,52 +811,10 @@ if (options.resourceLimits !== undefined &&

}
this.#pool = new ThreadPool(this, options);
}
/** @deprecated Use run(task, options) instead **/
runTask (task : T, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise<R>;
/** @deprecated Use run(task, options) instead **/
runTask (task : T, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<R>;
/** @deprecated Use run(task, options) instead **/
runTask (task : T, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<R>;
/** @deprecated Use run(task, options) instead **/
runTask (task : T, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise<R>;
/** @deprecated Use run(task, options) instead **/
runTask (task : T, transferList? : any, filename? : any, signal? : any): Promise<R> {
// If transferList is a string or AbortSignal, shift it.
if ((typeof transferList === 'object' && !Array.isArray(transferList)) ||
typeof transferList === 'string') {
signal = filename as (AbortSignalAny | undefined);
filename = transferList;
transferList = undefined;
if (options.loadBalancer !== undefined && (typeof options.loadBalancer !== 'function' || options.loadBalancer.length < 1)) {
throw new TypeError('options.loadBalancer must be a function with at least two args');
}
// If filename is an AbortSignal, shift it.
if (typeof filename === 'object' && !Array.isArray(filename)) {
signal = filename;
filename = undefined;
if (options.workerHistogram !== undefined && (typeof options.workerHistogram !== 'boolean')) {
throw new TypeError('options.workerHistogram must be a boolean');
}
if (transferList !== undefined && !Array.isArray(transferList)) {
return Promise.reject(
new TypeError('transferList argument must be an Array'));
}
if (filename !== undefined && typeof filename !== 'string') {
return Promise.reject(
new TypeError('filename argument must be a string'));
}
if (signal !== undefined && typeof signal !== 'object') {
return Promise.reject(
new TypeError('signal argument must be an object'));
}
return this.#pool.runTask(
task, {
transferList,
filename: filename || null,
name: 'default',
signal: signal || null
});
this.#pool = new ThreadPool(this, options);
}
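The loadBalancer validation above only requires a function, and the tests further down call it as loadBalancer(task, workers) and expect either a worker or null back. A hedged sketch of such a balancer follows; the selection strategy is invented for illustration, and the 'piscina/dist/worker_pool' import path for the PiscinaWorker type is a guess based on the test imports rather than a documented entry point.
import Piscina from 'piscina';
import { resolve } from 'node:path';
// Type location assumed from the tests' `../dist/worker_pool` import.
import type { PiscinaWorker } from 'piscina/dist/worker_pool';
// Prefer the first fully idle worker; returning null leaves the task queued
// so the pool can retry distribution (or spawn a new worker) later.
function idleFirstBalancer (_task: unknown, workers: PiscinaWorker[]): PiscinaWorker | null {
  for (const worker of workers) {
    if (worker.currentUsage === 0) return worker;
  }
  return null;
}
const pool = new Piscina({
  filename: resolve(__dirname, 'worker.js'), // placeholder worker module
  loadBalancer: idleFirstBalancer
});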

@@ -830,2 +848,3 @@

}
return this.#pool.runTask(task, { transferList, filename, name, signal });

@@ -883,3 +902,3 @@ }

get waitTime () : any {
get waitTime () : HistogramSummary | null {
if (!this.#pool.waitTime) {

@@ -906,3 +925,3 @@ return null;

// count is available as of Node.js v16.14.0 but not present in the types
const count = (this.#pool.runTime as RecordableHistogram & { count: number}).count;
const count = (this.#pool.runTime as RecordableHistogram & { count: number }).count;
if (count === 0) {

@@ -912,2 +931,6 @@ return 0;

if (!this.#pool.runTime) {
return 0;
}
// The capacity is the max compute time capacity of the

@@ -938,3 +961,3 @@ // pool to this point in time as determined by the length

get needsDrain () : boolean {
return this.#pool.needsDrain;
return this.#pool._needsDrain;
}

@@ -941,0 +964,0 @@

// Internal symbol used to mark Transferable objects returned
// by the Piscina.move() function
export const kMovable = Symbol('Piscina.kMovable');
export const kWorkerData = Symbol('Piscina.kWorkerData');
export const kTransferable = Symbol.for('Piscina.transferable');

@@ -5,0 +6,0 @@ export const kValue = Symbol.for('Piscina.valueOf');

@@ -55,6 +55,8 @@ import type { MessagePort } from 'node:worker_threads';

abortSignal : AbortSignalAny | null;
abortListener : (() => void) | null = null;
// abortListener : (() => void) | null = null;
workerInfo : WorkerInfo | null = null;
created : number;
started : number;
aborted = false;
_abortListener: (() => void) | null = null;

@@ -91,2 +93,3 @@ constructor (

this.name = name;
// TODO: This should not be global
this.taskId = taskIdCounter++;

@@ -98,2 +101,14 @@ this.abortSignal = abortSignal;

// TODO: improve this handling - ideally should be extended
set abortListener (value: (() => void)) {
this._abortListener = () => {
this.aborted = true;
value();
};
}
get abortListener (): (() => void) | null {
return this._abortListener;
}
releaseTask () : any {

@@ -120,4 +135,4 @@ const ret = this.task;

get [kQueueOptions] () : object | null {
return kQueueOptions in this.task ? this.task[kQueueOptions] : null;
get [kQueueOptions] () : {} | null {
return this.task?.[kQueueOptions] ?? null;
}

@@ -124,0 +139,0 @@

@@ -11,3 +11,3 @@ import type { MessagePort, Worker } from 'node:worker_threads';

sharedBuffer: Int32Array
useAtomics: boolean
atomics: 'async' | 'sync' | 'disabled'
niceIncrement: number

@@ -21,2 +21,3 @@ }

name: string
histogramEnabled: number
}

@@ -32,2 +33,3 @@

error: Error | null
time: number | null
}

@@ -44,3 +46,2 @@ export const commonState = {

/* eslint-disable camelcase */
export interface HistogramSummary {

@@ -68,3 +69,2 @@ average: number;

}
/* eslint-enable camelcase */

@@ -71,0 +71,0 @@ export type ResourceLimits = Worker extends {

import { Worker, MessagePort, receiveMessageOnPort } from 'node:worker_threads';
import { createHistogram, RecordableHistogram } from 'node:perf_hooks';
import assert from 'node:assert';
import { RequestMessage, ResponseMessage } from '../types';
import { HistogramSummary, RequestMessage, ResponseMessage } from '../types';
import { Errors } from '../errors';
import { TaskInfo } from '../task_queue';
import { kFieldCount, kRequestCountField, kResponseCountField } from '../symbols';
import { kFieldCount, kRequestCountField, kResponseCountField, kWorkerData } from '../symbols';
import { createHistogramSummary, toHistogramIntegerNano } from '../common';
import { AsynchronouslyCreatedResource, AsynchronouslyCreatedResourcePool } from './base';
export * from './balancer';
type ResponseCallback = (response : ResponseMessage) => void;
abstract class AsynchronouslyCreatedResource {
onreadyListeners : (() => void)[] | null = [];
markAsReady () : void {
const listeners = this.onreadyListeners;
assert(listeners !== null);
this.onreadyListeners = null;
for (const listener of listeners) {
listener();
}
}
isReady () : boolean {
return this.onreadyListeners === null;
}
onReady (fn : () => void) {
if (this.onreadyListeners === null) {
fn(); // Zalgo is okay here.
return;
}
this.onreadyListeners.push(fn);
}
abstract currentUsage() : number;
export type PiscinaWorker = {
id: number;
currentUsage: number;
isRunningAbortableTask: boolean;
histogram: HistogramSummary | null;
terminating: boolean;
destroyed: boolean;
[kWorkerData]: WorkerInfo;
}
export class AsynchronouslyCreatedResourcePool<
T extends AsynchronouslyCreatedResource> {
pendingItems = new Set<T>();
readyItems = new Set<T>();
maximumUsage : number;
onAvailableListeners : ((item : T) => void)[];
constructor (maximumUsage : number) {
this.maximumUsage = maximumUsage;
this.onAvailableListeners = [];
}
add (item : T) {
this.pendingItems.add(item);
item.onReady(() => {
/* istanbul ignore else */
if (this.pendingItems.has(item)) {
this.pendingItems.delete(item);
this.readyItems.add(item);
this.maybeAvailable(item);
}
});
}
delete (item : T) {
this.pendingItems.delete(item);
this.readyItems.delete(item);
}
findAvailable () : T | null {
let minUsage = this.maximumUsage;
let candidate = null;
for (const item of this.readyItems) {
const usage = item.currentUsage();
if (usage === 0) return item;
if (usage < minUsage) {
candidate = item;
minUsage = usage;
}
}
return candidate;
}
* [Symbol.iterator] () {
yield * this.pendingItems;
yield * this.readyItems;
}
get size () {
return this.pendingItems.size + this.readyItems.size;
}
maybeAvailable (item : T) {
/* istanbul ignore else */
if (item.currentUsage() < this.maximumUsage) {
for (const listener of this.onAvailableListeners) {
listener(item);
}
}
}
onAvailable (fn : (item : T) => void) {
this.onAvailableListeners.push(fn);
}
}
export class WorkerInfo extends AsynchronouslyCreatedResource {
worker : Worker;
taskInfos : Map<number, TaskInfo>;
idleTimeout : NodeJS.Timeout | null = null; // eslint-disable-line no-undef
idleTimeout : NodeJS.Timeout | null = null;
port : MessagePort;

@@ -113,2 +35,5 @@ sharedBuffer : Int32Array;

onMessage : ResponseCallback;
histogram: RecordableHistogram | null;
terminating = false;
destroyed = false;

@@ -118,3 +43,5 @@ constructor (

port : MessagePort,
onMessage : ResponseCallback) {
onMessage : ResponseCallback,
enableHistogram: boolean
) {
super();

@@ -129,5 +56,13 @@ this.worker = worker;

new SharedArrayBuffer(kFieldCount * Int32Array.BYTES_PER_ELEMENT));
this.histogram = enableHistogram ? createHistogram() : null;
}
get id (): number {
return this.worker.threadId;
}
destroy () : void {
if (this.terminating || this.destroyed) return;
this.terminating = true;
this.worker.terminate();

@@ -140,6 +75,10 @@ this.port.close();

this.taskInfos.clear();
this.terminating = false;
this.destroyed = true;
this.markAsDestroyed();
}
clearIdleTimeout () : void {
if (this.idleTimeout !== null) {
if (this.idleTimeout != null) {
clearTimeout(this.idleTimeout);

@@ -163,2 +102,6 @@ this.idleTimeout = null;

_handleResponse (message : ResponseMessage) : void {
if (message.time != null) {
this.histogram?.record(toHistogramIntegerNano(message.time));
}
this.onMessage(message);

@@ -175,2 +118,4 @@

assert(!this.taskInfos.has(taskInfo.taskId));
assert(!this.terminating && !this.destroyed);
const message : RequestMessage = {

@@ -180,3 +125,4 @@ task: taskInfo.releaseTask(),

filename: taskInfo.filename,
name: taskInfo.name
name: taskInfo.name,
histogramEnabled: this.histogram != null ? 1 : 0
};

@@ -205,2 +151,3 @@

processPendingMessages () {
if (this.destroyed) return;
// If we *know* that there are more messages than we have received using

@@ -234,2 +181,29 @@ // 'message' events yet, then try to load and handle them synchronously,

}
get interface (): PiscinaWorker {
const worker = this;
return {
get id () {
return worker.worker.threadId;
},
get currentUsage () {
return worker.currentUsage();
},
get isRunningAbortableTask () {
return worker.isRunningAbortableTask();
},
get histogram () {
return worker.histogram != null ? createHistogramSummary(worker.histogram) : null;
},
get terminating () {
return worker.terminating;
},
get destroyed () {
return worker.destroyed;
},
[kWorkerData]: worker
};
}
}
export { AsynchronouslyCreatedResourcePool };
import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'node:worker_threads';
import { pathToFileURL } from 'node:url';
import { performance } from 'node:perf_hooks';

@@ -25,4 +26,7 @@ import type {

function noop (): void {}
const handlerCache : Map<string, Function> = new Map();
let useAtomics : boolean = process.env.PISCINA_DISABLE_ATOMICS !== '1';
let useAsyncAtomics : boolean = process.env.PISCINA_ENABLE_ASYNC_ATOMICS === '1';
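For context on the two environment switches introduced above, here is a hedged sketch of how a parent process might set them when spawning a script that uses the pool; the script name is a placeholder, and only the variable names and the '1' values come from the lines above.
// launch.ts: run a pool-using script with the async Atomics path forced on
import { spawn } from 'node:child_process';
const child = spawn(process.execPath, ['pool-script.js'], { // placeholder script
  stdio: 'inherit',
  env: {
    ...process.env,
    // '1' opts worker threads into the Atomics.waitAsync loop;
    // PISCINA_DISABLE_ATOMICS: '1' would turn the wait loop off instead.
    PISCINA_ENABLE_ASYNC_ATOMICS: '1'
  }
});
child.on('exit', (code) => process.exit(code ?? 0));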

@@ -46,3 +50,3 @@ // Get `import(x)` as a function that isn't transpiled to `require(x)` by

let handler = handlerCache.get(`${filename}/${name}`);
if (handler !== undefined) {
if (handler != null) {
return handler;

@@ -84,13 +88,11 @@ }

// (so we can pre-load and cache the handler).
parentPort!.on('message', (message: StartupMessage) => {
useAtomics = process.env.PISCINA_DISABLE_ATOMICS === '1' ? false : message.useAtomics;
parentPort!.on('message', async (message: StartupMessage) => {
const { port, sharedBuffer, filename, name, niceIncrement } = message;
(async function () {
try {
if (niceIncrement !== 0) {
(await import('@napi-rs/nice')).nice(niceIncrement);
}
} catch {}
if (filename !== null) {
if (niceIncrement !== 0) {
(await import('@napi-rs/nice').catch(noop))?.nice(niceIncrement);
}
try {
if (filename != null) {
await getHandler(filename, name);

@@ -100,7 +102,15 @@ }

const readyMessage : ReadyMessage = { [READY]: true };
useAtomics = useAtomics !== false && message.atomics !== 'disabled';
useAsyncAtomics = useAtomics !== false && (useAsyncAtomics || message.atomics === 'async');
parentPort!.postMessage(readyMessage);
port.on('message', onMessage.bind(null, port, sharedBuffer));
atomicsWaitLoop(port, sharedBuffer);
})().catch(throwInNextTick);
if (useAtomics) {
const res = atomicsWaitLoop(port, sharedBuffer);
if (res?.then != null) await res;
}
} catch (error) {
throwInNextTick(error as Error);
}
});

@@ -110,5 +120,4 @@

let lastSeenRequestCount : number = 0;
function atomicsWaitLoop (port : MessagePort, sharedBuffer : Int32Array) {
if (!useAtomics) return;
// This function is entered either after receiving the startup message, or

@@ -125,2 +134,20 @@ // when we are done with a task. In those situations, the *only* thing we

// operations without waiting for them to finish, though.
if (useAsyncAtomics === true) {
// @ts-expect-error - for some reason not supported by TS
const { async, value } = Atomics.waitAsync(sharedBuffer, kRequestCountField, lastSeenRequestCount);
// We do not check for result
return async === true && value.then(() => {
lastSeenRequestCount = Atomics.load(sharedBuffer, kRequestCountField);
// We have to read messages *after* updating lastSeenRequestCount in order
// to avoid race conditions.
let entry;
while ((entry = receiveMessageOnPort(port)) !== undefined) {
onMessage(port, sharedBuffer, entry.message);
}
});
}
while (currentTasks === 0) {

@@ -130,3 +157,5 @@ // Check whether there are new messages by testing whether the current

// requests received.
// We do not check for result
Atomics.wait(sharedBuffer, kRequestCountField, lastSeenRequestCount);
lastSeenRequestCount = Atomics.load(sharedBuffer, kRequestCountField);
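To make the new asynchronous branch easier to follow in isolation, here is a minimal standalone sketch of Atomics.waitAsync (available in Node.js 16+). The buffer and index are placeholders rather than the pool's shared request-count buffer, and the @ts-expect-error mirrors the one used in the diff in case the active TypeScript lib target lacks the typing.
const shared = new Int32Array(new SharedArrayBuffer(4));
// waitAsync returns { async, value }: when async is true, value is a promise
// that settles with 'ok' once another agent calls Atomics.notify on index 0.
// @ts-expect-error - Atomics.waitAsync may be missing from older TS lib targets
const { async, value } = Atomics.waitAsync(shared, 0, 0);
if (async === true) {
  (value as Promise<'ok' | 'timed-out'>).then((result) => {
    console.log(result, Atomics.load(shared, 0)); // expected: 'ok' 1
  });
}
// Wake the waiter (in real use this happens from another worker thread).
setTimeout(() => {
  Atomics.store(shared, 0, 1);
  Atomics.notify(shared, 0);
}, 100);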

@@ -143,3 +172,3 @@

function onMessage (
async function onMessage (
port : MessagePort,

@@ -150,22 +179,24 @@ sharedBuffer : Int32Array,

const { taskId, task, filename, name } = message;
let response : ResponseMessage;
let transferList : any[] = [];
const start = message.histogramEnabled === 1 ? performance.now() : null;
(async function () {
let response : ResponseMessage;
let transferList : any[] = [];
try {
const handler = await getHandler(filename, name);
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`);
}
let result = await handler(task);
if (isMovable(result)) {
transferList = transferList.concat(result[kTransferable]);
result = result[kValue];
}
response = {
taskId,
result: result,
error: null
};
try {
const handler = await getHandler(filename, name);
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`);
}
let result = await handler(task);
if (isMovable(result)) {
transferList = transferList.concat(result[kTransferable]);
result = result[kValue];
}
response = {
taskId,
result,
error: null,
time: start == null ? null : Math.round(performance.now() - start)
};
if (useAtomics && !useAsyncAtomics) {
// If the task used e.g. console.log(), wait for the stream to drain

@@ -178,16 +209,20 @@ // before potentially entering the `Atomics.wait()` loop, and before

}
if (process.stderr.writableLength > 0) {
await new Promise((resolve) => process.stderr.write('', resolve));
}
} catch (error) {
response = {
taskId,
result: null,
// It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error: <Error>error
};
}
currentTasks--;
} catch (error) {
response = {
taskId,
result: null,
// It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error: <Error>error,
time: start == null ? null : Math.round(performance.now() - start)
};
}
currentTasks--;
try {
// Post the response to the parent thread, and let it know that we have

@@ -198,8 +233,14 @@ // an additional message available. If possible, use Atomics.wait()

Atomics.add(sharedBuffer, kResponseCountField, 1);
atomicsWaitLoop(port, sharedBuffer);
})().catch(throwInNextTick);
if (useAtomics) {
const res = atomicsWaitLoop(port, sharedBuffer);
if (res?.then != null) await res;
}
} catch (error) {
throwInNextTick(error as Error);
}
}
function throwInNextTick (error : Error) {
process.nextTick(() => { throw error; });
process.nextTick((e: Error) => { throw e; }, error);
}

@@ -13,3 +13,3 @@ import { EventEmitter } from 'events';

const abortController = new AbortController();
rejects(pool.runTask(buf, abortController.signal),
rejects(pool.run(buf, { signal: abortController.signal }),
/The task has been aborted/);

@@ -30,3 +30,2 @@

const ee = new EventEmitter();
rejects(pool.runTask(buf, ee), /The task has been aborted/);
rejects(pool.run(buf, { signal: ee }), /The task has been aborted/);

@@ -42,3 +41,3 @@

const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
filename: resolve(__dirname, 'fixtures/wait-for-notify.js'),
maxThreads: 1

@@ -51,7 +50,7 @@ });

];
const task1 = pool.runTask(bufs[0]);
const ee = new EventEmitter();
rejects(pool.runTask(bufs[1], ee), /The task has been aborted/);
rejects(pool.run(bufs[1], { signal: ee }), /The task has been aborted/);
equal(pool.queueSize, 2);
const task1 = pool.run(bufs[0]);
const abortable = pool.run(bufs[1], { signal: ee });
equal(pool.queueSize, 0); // Means it's running
rejects(abortable, /The task has been aborted/);

@@ -77,6 +76,6 @@ ee.emit('abort');

];
const task1 = pool.runTask(bufs[0]);
const task1 = pool.run(bufs[0]);
const ee = new EventEmitter();
rejects(pool.runTask(bufs[1], ee), /The task has been aborted/);
equal(pool.queueSize, 1);
rejects(pool.run(bufs[1], { signal: ee }), /The task has been aborted/);
equal(pool.queueSize, 0);

@@ -91,2 +90,3 @@ ee.emit('abort');

// TODO: move to testing balancer
test('abortable tasks will not share workers (abortable posted first)', async ({ equal, rejects }) => {

@@ -100,4 +100,4 @@ const pool = new Piscina({

const ee = new EventEmitter();
rejects(pool.runTask('while(true);', ee), /The task has been aborted/);
const task2 = pool.runTask('42');
rejects(pool.run('while(true);', { signal: ee }), /The task has been aborted/);
const task2 = pool.run('42');
equal(pool.queueSize, 1);

@@ -111,2 +111,3 @@

// TODO: move to testing balancer
test('abortable tasks will not share workers (on worker available)', async ({ equal }) => {

@@ -124,7 +125,6 @@ const pool = new Piscina({

// until after Task 2 completes because it is abortable.
const ret = await Promise.all([
pool.runTask({ time: 100, a: 1 }),
pool.runTask({ time: 300, a: 2 }),
pool.runTask({ time: 100, a: 3 }, new EventEmitter())
pool.run({ time: 100, a: 1 }),
pool.run({ time: 300, a: 2 }),
pool.run({ time: 100, a: 3 }, { signal: new EventEmitter() })
]);

@@ -150,8 +150,8 @@

pool.runTask({ time: 100, a: 1 }).then(() => {
pool.run({ time: 100, a: 1 }).then(() => {
pool.destroy();
});
rejects(pool.runTask({ time: 300, a: 2 }), /Terminating worker thread/);
rejects(pool.runTask({ time: 100, a: 3 }, new EventEmitter()),
rejects(pool.run({ time: 300, a: 2 }), /Terminating worker thread/);
rejects(pool.run({ time: 100, a: 3 }, { signal: new EventEmitter() }),
/Terminating worker thread/);

@@ -172,3 +172,3 @@ });

const data = new Uint8Array(new SharedArrayBuffer(4));
rejects(pool.runTask(data, [data.buffer], controller.signal),
rejects(pool.run(data, { signal: controller.signal, transferList: [data.buffer] }),
/The task has been aborted/);

@@ -186,3 +186,3 @@

await pool.runTask('1+1', ee);
await pool.run('1+1', { signal: ee });

@@ -196,3 +196,3 @@ const { getEventListeners } = EventEmitter as any;

await pool.runTask('1+1', controller.signal);
await pool.run('1+1', { signal: controller.signal });
});

@@ -247,1 +247,49 @@

});
test('aborted AbortSignal rejects task immediately (with reason)', async ({ match, equal }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/move.ts')
});
const customReason = new Error('custom reason');
const controller = new AbortController();
controller.abort(customReason);
equal(controller.signal.aborted, true);
equal(controller.signal.reason, customReason);
// The data won't be moved because the task will abort immediately.
const data = new Uint8Array(new SharedArrayBuffer(4));
try {
await pool.run(data, { transferList: [data.buffer], signal: controller.signal });
} catch (error) {
equal(error.message, 'The task has been aborted');
match(error.cause, customReason);
}
equal(data.length, 4);
});
test('tasks can be aborted through AbortController while running', async ({ equal, match }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
});
const reason = new Error('custom reason');
const buf = new Int32Array(new SharedArrayBuffer(4));
const abortController = new AbortController();
try {
const promise = pool.run(buf, { signal: abortController.signal });
Atomics.wait(buf, 0, 0);
equal(Atomics.load(buf, 0), 1);
abortController.abort(reason);
await promise;
} catch (error) {
equal(error.message, 'The task has been aborted');
match(error.cause, reason);
}
});

@@ -36,3 +36,3 @@ import { createHook, executionAsyncId } from 'async_hooks';

await pool.runTask('42');
await pool.run('42');

@@ -39,0 +39,0 @@ hook.disable();

@@ -1,6 +0,8 @@

import Piscina from '..';
import { resolve } from 'node:path';
import { test } from 'tap';
import { resolve } from 'path';
test('coverage test for Atomics optimization', async ({ equal }) => {
import Piscina from '..';
test('coverage test for Atomics optimization (sync mode)', async ({ equal }) => {
const pool = new Piscina({

@@ -10,7 +12,8 @@ filename: resolve(__dirname, 'fixtures/notify-then-sleep-or.js'),

maxThreads: 2,
concurrentTasksPerWorker: 2
concurrentTasksPerWorker: 2,
atomics: 'sync'
});
const tasks = [];
let v : number;
let v: number;

@@ -20,3 +23,3 @@ // Post 4 tasks, and wait for all of them to be ready.

for (let index = 0; index < 4; index++) {
tasks.push(pool.runTask({ i32array, index }));
tasks.push(pool.run({ i32array, index }));
}

@@ -74,3 +77,4 @@

minThreads: 2,
maxThreads: 2
maxThreads: 2,
atomics: 'sync'
});

@@ -80,3 +84,3 @@

for (let i = 1; i <= 10000; i++) {
tasks.push(pool.runTask(null));
tasks.push(pool.run(null));
}

@@ -86,1 +90,38 @@

});
test('enable async mode', async (t) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval-params.js'),
minThreads: 1,
maxThreads: 1,
atomics: 'async'
});
const bufs = [
new Int32Array(new SharedArrayBuffer(4)),
new Int32Array(new SharedArrayBuffer(4)),
new Int32Array(new SharedArrayBuffer(4))
];
const script = `
setTimeout(() => { Atomics.add(input.shared[0], 0, 1); Atomics.notify(input.shared[0], 0, Infinity); }, 100);
setTimeout(() => { Atomics.add(input.shared[1], 0, 1); Atomics.notify(input.shared[1], 0, Infinity); }, 300);
setTimeout(() => { Atomics.add(input.shared[2], 0, 1); Atomics.notify(input.shared[2], 0, Infinity); }, 500);
true
`;
const promise = pool.run({
code: script,
shared: bufs
});
t.plan(2);
const atResult1 = Atomics.wait(bufs[0], 0, 0);
const atResult2 = Atomics.wait(bufs[1], 0, 0);
const atResult3 = Atomics.wait(bufs[2], 0, 0);
t.same([atResult1, atResult2, atResult3], ['ok', 'ok', 'ok']);
t.equal(await promise, true);
});

@@ -0,17 +1,25 @@

import { resolve } from 'node:path';
import { spawn } from 'node:child_process';
import concat from 'concat-stream';
import { spawn } from 'child_process';
import { resolve } from 'path';
import { test } from 'tap';
test('console.log() calls are not blocked by Atomics.wait()', async ({ equal }) => {
test('console.log() calls are not blocked by Atomics.wait() (sync mode)', async ({ equal }) => {
const proc = spawn(process.execPath, [
...process.execArgv, resolve(__dirname, 'fixtures/console-log.ts')
], {
stdio: ['inherit', 'pipe', 'inherit']
stdio: ['inherit', 'pipe', 'pipe'],
env: {
PISCINA_ENABLE_ASYNC_ATOMICS: '0'
}
});
const data = await new Promise((resolve) => {
const dataStdout = await new Promise((resolve) => {
proc.stdout.setEncoding('utf8').pipe(concat(resolve));
});
equal(data, 'A\nB\n');
const dataStderr = await new Promise((resolve) => {
proc.stderr.setEncoding('utf8').pipe(concat(resolve));
});
equal(dataStdout, 'A\n');
equal(dataStderr, 'B\n');
});

@@ -193,3 +193,3 @@ import { test } from 'tap';

const result = await pool.runTask(null);
const result = await pool.run(null);
equal(result, 'done');

@@ -206,3 +206,3 @@ });

const tasks = ['1+1', '2+2', '3+3'];
const results = await Promise.all(tasks.map((task) => pool.runTask(task)));
const results = await Promise.all(tasks.map((task) => pool.run(task)));
// eslint-disable-next-line

@@ -209,0 +209,0 @@ const expected = tasks.map(eval);

@@ -6,5 +6,8 @@ import Piscina from '../..';

filename: resolve(__dirname, 'eval.js'),
maxThreads: 1
maxThreads: 1,
env: {
PISCINA_ENABLE_ASYNC_ATOMICS: process.env.PISCINA_ENABLE_ASYNC_ATOMICS
}
});
pool.runTask('console.log("A"); console.log("B");');
pool.run('console.log("A"); console.error("B");');
import Piscina from '..';
import { test } from 'tap';
import { resolve } from 'path';
import { PiscinaWorker } from '../dist/worker_pool';

@@ -12,3 +13,3 @@ test('pool will maintain run and wait time histograms by default', async ({ equal, ok }) => {

for (let n = 0; n < 10; n++) {
tasks.push(pool.runTask('42'));
tasks.push(pool.run('42'));
}

@@ -42,3 +43,3 @@ await Promise.all(tasks);

for (let n = 0; n < 10; n++) {
tasks.push(pool.runTask('42'));
tasks.push(pool.run('42'));
}

@@ -62,3 +63,3 @@ await Promise.all(tasks);

for (let n = 0; n < 10; n++) {
tasks.push(pool.runTask('42'));
tasks.push(pool.run('42'));
}

@@ -70,1 +71,126 @@ await Promise.all(tasks);

});
test('workers has histogram', async t => {
let index = 0;
let list: PiscinaWorker[];
// It's expected that one task gets balanced twice due to the load balancer distribution:
// the first task enters and is distributed; the second is enqueued, and once the first is done, it is distributed and the usage normalizes
t.plan(4);
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
maxThreads: 1,
concurrentTasksPerWorker: 1,
workerHistogram: true,
loadBalancer (_task, workers) {
// Verify distribution to properly test this feature
const candidate = workers[index++ % workers.length];
// We assign it every time the balancer is called to check the histogram
// and that the list remains the same
list = workers;
if (candidate.currentUsage !== 0) {
return null;
}
return candidate;
}
});
const tasks = [];
for (let n = 0; n < 10; n++) {
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 500))'));
}
await Promise.all(tasks);
const histogram = list[0].histogram;
t.type(histogram?.average, 'number');
t.type(histogram?.max, 'number');
t.type(histogram?.mean, 'number');
t.type(histogram?.min, 'number');
});
test('workers does not have histogram if disabled', async t => {
let index = 0;
// After each task the balancer is called to distribute the next task.
// The first task is distributed, the second is enqueued; once the first is done, the second is distributed and the usage normalizes
t.plan(10 * 2);
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
maxThreads: 1,
concurrentTasksPerWorker: 1,
workerHistogram: false,
loadBalancer (_task, workers) {
// Verify distribution to properly test this feature
const candidate = workers[index++ % workers.length];
const histogram = candidate.histogram;
t.notOk(histogram);
if (candidate.currentUsage !== 0) {
return null;
}
return candidate;
}
});
const tasks = [];
for (let n = 0; n < 10; n++) {
tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 500))'));
}
await Promise.all(tasks);
});
// test('histogram of worker should be initialized with max concurrent task set as min', { only: true }, async t => {
// // After each task the balancer is called to distribute the next task
// // The first task is distributed, the second is enqueued, once the first is done, the second is distributed and normalizes
// let counter = 0;
// const pool = new Piscina({
// filename: resolve(__dirname, 'fixtures/eval.js'),
// maxThreads: 2,
// concurrentTasksPerWorker: 1,
// workerHistogram: true,
// });
// const tasks = [];
// t.plan(10 * 2);
// pool.on('workerCreate', worker => {
// if (counter === 0) {
// t.equal(worker.histogram.min, 0);
// } else {
// t.equal(worker.histogram.min, 1);
// }
// })
// for (let n = 0; n < 10; n++) {
// tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 500))'));
// }
// await Promise.all(tasks);
// });
test('opts.workerHistogram should be a boolean value', async t => {
let index = 0;
t.plan(1);
t.throws(() => {
// eslint-disable-next-line no-new
new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
maxThreads: 1,
concurrentTasksPerWorker: 1,
// @ts-expect-error
workerHistogram: 1,
loadBalancer (_task, workers) {
// Verify distribution to properly test this feature
const candidate = workers[index++ % workers.length];
const histogram = candidate.histogram;
t.notOk(histogram);
if (candidate.currentUsage !== 0) {
return null;
}
return candidate;
}
});
}, 'options.workerHistogram must be a boolean');
});

@@ -20,4 +20,4 @@ import Piscina from '..';

const firstTasks = [
pool.runTask([buffer, 2]),
pool.runTask([buffer, 2])
pool.run([buffer, 2]),
pool.run([buffer, 2])
];

@@ -33,4 +33,4 @@ equal(pool.threads.length, 2);

const secondTasks = [
pool.runTask([buffer, 4]),
pool.runTask([buffer, 4])
pool.run([buffer, 4]),
pool.run([buffer, 4])
];

@@ -37,0 +37,0 @@ equal(pool.threads.length, 2);

@@ -13,3 +13,3 @@ import Piscina from '..';

const taskResult = pool.runTask(`
const taskResult = pool.run(`
require('worker_threads').parentPort.postMessage("some message");

@@ -16,0 +16,0 @@ 42

@@ -78,19 +78,4 @@ import Piscina from '..';

{
const ab = new ArrayBuffer(10);
const ret = await pool.runTask(Piscina.move(ab));
equal(ab.byteLength, 0); // It was moved
ok(types.isAnyArrayBuffer(ret));
}
{
// Test with empty transferList
const ab = new ArrayBuffer(10);
const ret = await pool.runTask(Piscina.move(ab), []);
equal(ab.byteLength, 0); // It was moved
ok(types.isAnyArrayBuffer(ret));
}
{
// Test with empty transferList
const ab = new ArrayBuffer(10);
const ret = await pool.run(Piscina.move(ab));

@@ -97,0 +82,0 @@ equal(ab.byteLength, 0); // It was moved

@@ -6,39 +6,45 @@ import Piscina from '..';

test('can set niceness for threads on Linux', {
skip: process.platform !== 'linux'
}, async ({ equal }) => {
const worker = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
niceIncrement: 5
test('niceness - Linux:', { skip: process.platform !== 'linux' }, scope => {
scope.plan(2);
scope.test('can set niceness for threads on Linux', async ({ equal }) => {
const worker = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
niceIncrement: 5
});
// ts-ignore because the dependency is not installed on Windows.
// @ts-ignore
const currentNiceness = getCurrentProcessPriority();
const result = await worker.run('require("@napi-rs/nice").getCurrentProcessPriority()');
// niceness is capped to 19 on Linux.
const expected = Math.min(currentNiceness + 5, 19);
equal(result, expected);
});
const currentNiceness = getCurrentProcessPriority();
const result = await worker.runTask('require("@napi-rs/nice").getCurrentProcessPriority()');
scope.test('setting niceness never does anything bad', async ({ equal }) => {
const worker = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
niceIncrement: 5
});
// niceness is capped to 19 on Linux.
const expected = Math.min(currentNiceness + 5, 19);
equal(result, expected);
const result = await worker.run('42');
equal(result, 42);
});
});
test('can set niceness for threads on Windows', {
test('niceness - Windows', {
skip: process.platform !== 'win32'
}, async ({ equal }) => {
const worker = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
niceIncrement: WindowsThreadPriority.ThreadPriorityAboveNormal
});
}, scope => {
scope.plan(1);
scope.test('can set niceness for threads on Windows', async ({ equal }) => {
const worker = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
niceIncrement: WindowsThreadPriority.ThreadPriorityAboveNormal
});
const result = await worker.runTask('require("@napi-rs/nice").getCurrentProcessPriority()');
const result = await worker.run('require("@napi-rs/nice").getCurrentProcessPriority()');
equal(result, WindowsThreadPriority.ThreadPriorityAboveNormal);
});
test('setting niceness never does anything bad', async ({ equal }) => {
const worker = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
niceIncrement: 5
equal(result, WindowsThreadPriority.ThreadPriorityAboveNormal);
});
const result = await worker.runTask('42');
equal(result, 42);
});

@@ -77,10 +77,10 @@ import Piscina from '..';

test('useAtomics must be a boolean', async ({ throws }) => {
test('atomics must be valid', async ({ throws }) => {
throws(() => new Piscina(({
useAtomics: -1
}) as any), /options.useAtomics must be a boolean/);
atomics: -1
}) as any), /options.atomics should be a value of sync, async or disabled./);
throws(() => new Piscina(({
useAtomics: 'string'
}) as any), /options.useAtomics must be a boolean/);
atomics: 'string'
}) as any), /options.atomics should be a value of sync, async or disabled./);
});

@@ -87,0 +87,0 @@

@@ -0,5 +1,7 @@

import { once } from 'node:events';
import { resolve } from 'node:path';
import { test } from 'tap';
import Piscina from '..';
import { resolve } from 'path';
import { once } from 'events';

@@ -18,70 +20,52 @@ test('close()', async (t) => {

});
});
t.test('queued tasks waits for all tasks to complete', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
test('queued tasks waits for all tasks to complete', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
const task1 = pool.run({ time: 100 });
const task2 = pool.run({ time: 100 });
const task1 = pool.run({ time: 100 });
const task2 = pool.run({ time: 100 });
setImmediate(() => t.resolves(pool.close(), 'close is resolved when all running tasks are completed'));
setImmediate(() => t.resolves(pool.close(), 'close is resolved when all running tasks are completed'));
await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.resolves(task2, 'complete running task')
]);
});
await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.resolves(task2, 'complete running task')
]);
});
t.test('abort any task enqueued during closing up', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
test('abort any task enqueued during closing up', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
setImmediate(() => {
t.resolves(pool.close(), 'close is resolved when running tasks are completed');
t.resolves(pool.run({ time: 1000 }).then(null, err => {
t.equal(err.message, 'The task has been aborted');
t.equal(err.cause, 'queue is closing up');
}));
});
setImmediate(() => {
t.resolves(pool.close(), 'close is resolved when running tasks are completed');
t.resolves(pool.run({ time: 1000 }).then(null, err => {
t.equal(err.message, 'The task has been aborted');
t.equal(err.cause, 'queue is being terminated');
}));
});
await t.resolves(pool.run({ time: 100 }), 'complete running task');
});
await t.resolves(pool.run({ time: 100 }), 'complete running task');
});
test('close({force: true})', async (t) => {
t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 1 });
test('force: queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 2 });
const task1 = pool.run({ time: 1000 });
const task2 = pool.run({ time: 200 });
const task1 = pool.run({ time: 1000 });
const task2 = pool.run({ time: 1000 });
// const task3 = pool.run({ time: 100 });
// const task4 = pool.run({ time: 100 });
setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed'));
t.plan(6);
await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.resolves(task2.then(null, err => {
t.equal(err.message, 'The task has been aborted');
t.equal(err.cause, 'pool is closed');
}))
]);
});
t.resolves(pool.close({ force: true }));
t.resolves(once(pool, 'close'), 'handler is called when pool is closed');
t.resolves(task1, 'complete running task');
t.resolves(task2, 'complete running task');
t.rejects(pool.run({ time: 100 }), /The task has been aborted/, 'abort task that are not started yet');
t.rejects(pool.run({ time: 100 }), /The task has been aborted/, 'abort task that are not started yet');
t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 2 });
const task1 = pool.run({ time: 500 });
const task2 = pool.run({ time: 100 });
const task3 = pool.run({ time: 100 });
const task4 = pool.run({ time: 100 });
setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed'));
await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.resolves(task2, 'complete running task'),
t.rejects(task3, /The task has been aborted/, 'abort task that are not started yet'),
t.rejects(task4, /The task has been aborted/, 'abort task that are not started yet')
]);
});
await task1;
await task2;
});
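A short, hedged usage sketch of the close() behaviour the rewritten tests above exercise: running tasks are awaited, while under { force: true } tasks that have not started yet are aborted. The sleep fixture path mirrors the tests; treat the exact abort timing as an assumption drawn from those tests rather than documented behaviour.
import Piscina from 'piscina';
import { resolve } from 'node:path';
async function main () {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/sleep.js'), // same fixture the tests use
    maxThreads: 1
  });
  const running = pool.run({ time: 100 }); // already running: allowed to finish
  const queued = pool.run({ time: 100 });  // still queued: aborted by the forced close
  await pool.close({ force: true });
  await running;
  await queued.catch((err) => console.error(err.message)); // 'The task has been aborted'
}
main();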

@@ -88,0 +72,0 @@

@@ -10,3 +10,3 @@ import Piscina from '..';

setImmediate(() => pool.destroy());
await rejects(pool.runTask('while(1){}'), /Terminating worker thread/);
await rejects(pool.run('while(1){}'), /Terminating worker thread/);
});

@@ -13,13 +13,2 @@ import { MessageChannel } from 'worker_threads';

const ab = new ArrayBuffer(40);
await pool.runTask({ ab }, [ab]);
equal(pool.completed, 1);
equal(ab.byteLength, 0);
});
test('postTask() can transfer ArrayBuffer instances', async ({ equal }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/simple-isworkerthread.ts')
});
const ab = new ArrayBuffer(40);
await pool.run({ ab }, { transferList: [ab] });

@@ -36,3 +25,3 @@ equal(pool.completed, 1);

const obj = new MessageChannel().port1;
rejects(pool.runTask({ obj }));
rejects(pool.run({ obj }));
});

@@ -45,3 +34,3 @@

rejects(pool.runTask('Promise.reject(new Error("foo"))'), /foo/);
rejects(pool.run('Promise.reject(new Error("foo"))'), /foo/);
});

@@ -54,3 +43,3 @@

rejects(pool.runTask('throw new Error("foo")'), /foo/);
rejects(pool.run('throw new Error("foo")'), /foo/);
});

@@ -63,5 +52,2 @@

rejects(pool.runTask('0', 42 as any),
/transferList argument must be an Array/);
rejects(pool.run('0', { transferList: 42 as any }),

@@ -76,5 +62,2 @@ /transferList argument must be an Array/);

rejects(pool.runTask('0', [], 42 as any),
/filename argument must be a string/);
rejects(pool.run('0', { filename: 42 as any }),

@@ -98,5 +81,2 @@ /filename argument must be a string/);

rejects(pool.runTask('0', [], undefined, 42 as any),
/signal argument must be an object/);
rejects(pool.run('0', { signal: 42 as any }),

@@ -108,3 +88,4 @@ /signal argument must be an object/);

const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
filename: resolve(__dirname, 'fixtures/eval.js'),
maxThreads: 1
});

@@ -119,3 +100,3 @@

await Promise.all([pool.run('123'), pool.run('123')]);
await Promise.all([pool.run('123'), pool.run('123'), pool.run('123')]);

@@ -126,3 +107,3 @@ ok(drained);

test('Piscina exposes/emits needsDrain to true when capacity is exceeded', async ({ ok }) => {
test('Piscina exposes/emits needsDrain to true when capacity is exceeded', ({ ok, pass, plan }) => {
const pool = new Piscina({

@@ -134,9 +115,9 @@ filename: resolve(__dirname, 'fixtures/eval.js'),

let triggered = false;
let drained = false;
plan(3);
pool.once('drain', () => {
drained = true;
pass();
});
pool.once('needsDrain', () => {
triggered = true;
pass();
});

@@ -150,4 +131,2 @@

ok(pool.needsDrain);
ok(triggered);
ok(drained);
});

@@ -159,3 +138,3 @@

});
equal(await pool.runTask('1'), 1);
equal(await pool.run('1'), 1);
});

@@ -167,3 +146,3 @@

});
equal(await pool.runTask('1'), 1);
equal(await pool.run('1'), 1);
});

@@ -170,0 +149,0 @@

@@ -24,3 +24,3 @@ import Piscina from '..';

});
const result = await worker.runTask(null);
const result = await worker.run(null);
equal(result, 'done');

@@ -33,6 +33,14 @@ });

});
const result = await worker.runTask(null);
const result = await worker.run(null);
equal(result, 'done');
});
test('Piscina.isWorkerThread has the correct value (worker) with named import', async ({ equal }) => {
const worker = new Piscina({
filename: resolve(__dirname, 'fixtures/simple-isworkerthread-named-import.ts')
});
const result = await worker.run(null);
equal(result, 'done');
});
test('Piscina instance is an EventEmitter', async ({ ok }) => {

@@ -59,3 +67,3 @@ const piscina = new Piscina();

});
const result = await worker.runTask('42');
const result = await worker.run('42');
equal(result, 42);

@@ -68,3 +76,3 @@ });

});
const result = await worker.runTask('Promise.resolve(42)');
const result = await worker.run('Promise.resolve(42)');
equal(result, 42);

@@ -75,5 +83,5 @@ });

const worker = new Piscina();
const result = await worker.runTask(
const result = await worker.run(
'Promise.resolve(42)',
resolve(__dirname, 'fixtures/eval.js'));
{ filename: resolve(__dirname, 'fixtures/eval.js') });
equal(result, 42);

@@ -84,5 +92,5 @@ });

const worker = new Piscina({ filename: null });
const result = await worker.runTask(
const result = await worker.run(
'Promise.resolve(42)',
resolve(__dirname, 'fixtures/eval.js'));
{ filename: resolve(__dirname, 'fixtures/eval.js') });
equal(result, 42);

@@ -93,3 +101,3 @@ });

const worker = new Piscina();
rejects(worker.runTask('doesn’t matter'),
rejects(worker.run('doesn’t matter'),
/filename must be provided to run\(\) or in options object/);

@@ -104,3 +112,3 @@ });

const env = await pool.runTask('({...process.env})');
const env = await pool.run('({...process.env})');
same(env, { A: 'foo' });

@@ -115,3 +123,3 @@ });

const env = await pool.runTask('process.argv.slice(2)');
const env = await pool.run('process.argv.slice(2)');
same(env, ['a', 'b', 'c']);

@@ -126,3 +134,3 @@ });

const env = await pool.runTask('process.execArgv');
const env = await pool.run('process.execArgv');
same(env, ['--no-warnings']);

@@ -138,3 +146,3 @@ });

await pool.runTask(null);
await pool.run(null);
});

@@ -149,5 +157,15 @@

await pool.runTask(null);
await pool.run(null);
});
test('passing valid workerData works with named import', async ({ equal }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/simple-workerdata-named-import.ts'),
workerData: 'ABC'
});
equal(Piscina.workerData, undefined);
await pool.run(null);
});
test('passing invalid workerData does not work', async ({ throws }) => {

@@ -166,3 +184,3 @@ throws(() => new Piscina(({

});
const result = await worker.runTask('42');
const result = await worker.run('42');
equal(result, 42);

@@ -175,3 +193,3 @@ });

});
const result = await worker.runTask('42');
const result = await worker.run('42');
equal(result, 42);

@@ -189,5 +207,5 @@ });

await Promise.all([
worker.runTask('42'),
worker.runTask('41'),
worker.runTask('40')
worker.run('42'),
worker.run('41'),
worker.run('40')
]);

@@ -194,0 +212,0 @@

@@ -24,15 +24,15 @@ import Piscina, { PiscinaTask, TaskQueue } from '..';

results.push(pool.runTask(buffers[0]));
results.push(pool.run(buffers[0]));
equal(pool.threads.length, 2);
equal(pool.queueSize, 0);
results.push(pool.runTask(buffers[1]));
results.push(pool.run(buffers[1]));
equal(pool.threads.length, 2);
equal(pool.queueSize, 0);
results.push(pool.runTask(buffers[2]));
results.push(pool.run(buffers[2]));
equal(pool.threads.length, 3);
equal(pool.queueSize, 0);
results.push(pool.runTask(buffers[3]));
results.push(pool.run(buffers[3]));
equal(pool.threads.length, 3);

@@ -63,15 +63,15 @@ equal(pool.queueSize, 1);

rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
rejects(pool.run('while (true) {}'), /Terminating worker thread/);
equal(pool.threads.length, 1);
equal(pool.queueSize, 0);
rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
rejects(pool.run('while (true) {}'), /Terminating worker thread/);
equal(pool.threads.length, 1);
equal(pool.queueSize, 1);
rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
rejects(pool.run('while (true) {}'), /Terminating worker thread/);
equal(pool.threads.length, 1);
equal(pool.queueSize, 2);
rejects(pool.runTask('while (true) {}'), /Task queue is at limit/);
rejects(pool.run('while (true) {}'), /Task queue is at limit/);
await pool.destroy();

@@ -91,7 +91,7 @@ });

rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
rejects(pool.run('while (true) {}'), /Terminating worker thread/);
equal(pool.threads.length, 1);
equal(pool.queueSize, 0);
rejects(pool.runTask('while (true) {}'), /No task queue available and all Workers are busy/);
rejects(pool.run('while (true) {}'), /No task queue available and all Workers are busy/);
await pool.destroy();

@@ -111,7 +111,7 @@ });

rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
rejects(pool.run('while (true) {}'), /Terminating worker thread/);
equal(pool.threads.length, 1);
equal(pool.queueSize, 0);
rejects(pool.runTask('while (true) {}'), /No task queue available and all Workers are busy/);
rejects(pool.run('while (true) {}'), /No task queue available and all Workers are busy/);
await pool.destroy();

@@ -132,7 +132,7 @@ });

rejects(pool.runTask(new Int32Array(new SharedArrayBuffer(4))));
rejects(pool.run(new Int32Array(new SharedArrayBuffer(4))));
equal(pool.threads.length, 1);
equal(pool.queueSize, 0);
rejects(pool.runTask(new Int32Array(new SharedArrayBuffer(4))));
rejects(pool.run(new Int32Array(new SharedArrayBuffer(4))));
equal(pool.threads.length, 1);

@@ -146,3 +146,3 @@ equal(pool.queueSize, 0);

const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
filename: resolve(__dirname, 'fixtures/wait-for-notify.js'),
minThreads: 0,

@@ -162,9 +162,10 @@ maxThreads: 1,

const firstTask = pool.runTask(buffers[0]);
const firstTask = pool.run(buffers[0]);
equal(pool.threads.length, 1);
equal(pool.queueSize, 0);
rejects(pool.runTask(
rejects(pool.run(
'new Promise((resolve) => setTimeout(resolve, 1000000))',
resolve(__dirname, 'fixtures/eval.js')), /Terminating worker thread/);
{ filename: resolve(__dirname, 'fixtures/eval.js') })
, /Terminating worker thread/);
equal(pool.threads.length, 1);

@@ -200,7 +201,7 @@ equal(pool.queueSize, 0);

const firstTask = pool.runTask(buffers[0]);
const firstTask = pool.run(buffers[0]);
equal(pool.threads.length, 1);
equal(pool.queueSize, 0);
const secondTask = pool.runTask(buffers[1]);
const secondTask = pool.run(buffers[1]);
equal(pool.threads.length, 1);

@@ -275,5 +276,5 @@ equal(pool.queueSize, 0);

const ret = await Promise.all([
pool.runTask(makeTask({ a: 1 }, 1)),
pool.runTask(makeTask({ a: 2 }, 2)),
pool.runTask({ a: 3 }) // No queueOptionsSymbol attached
pool.run(makeTask({ a: 1 }, 1)),
pool.run(makeTask({ a: 2 }, 2)),
pool.run({ a: 3 }) // No queueOptionsSymbol attached
]);

@@ -280,0 +281,0 @@

@@ -12,3 +12,3 @@ import Piscina from '..';

filename: resolve(__dirname, 'fixtures/send-buffer-then-get-length.js'),
useAtomics: false
atomics: 'disabled'
});

@@ -21,6 +21,11 @@ await pool.run({}, { name: 'send' });

test('objects that implement transferable must be transferred', async ({ equal }) => {
test('objects that implement transferable must be transferred', async ({
equal
}) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/send-transferrable-then-get-length.js'),
useAtomics: false
filename: resolve(
__dirname,
'fixtures/send-transferrable-then-get-length.js'
),
atomics: 'disabled'
});

@@ -27,0 +32,0 @@ await pool.run({}, { name: 'send' });

@@ -32,4 +32,4 @@ import Piscina from '..';

equal(limits.codeRangeSizeMb, 16);
rejects(worker.runTask(null),
rejects(worker.run(null),
/Worker terminated due to reaching memory limit: JS heap out of memory/);
});

@@ -10,3 +10,3 @@ import Piscina from '..';

});
await rejects(pool.runTask('throw new Error("not_caught")'), /not_caught/);
await rejects(pool.run('throw new Error("not_caught")'), /not_caught/);
});

@@ -19,3 +19,3 @@

await rejects(
pool.runTask(`
pool.run(`
setImmediate(() => { throw new Error("not_caught") });

@@ -30,3 +30,3 @@ new Promise(() => {}) /* act as if we were doing some work */

maxThreads: 1,
useAtomics: false
atomics: 'disabled'
});

@@ -36,3 +36,3 @@

const taskResult = pool.runTask(`
const taskResult = pool.run(`
setTimeout(() => { throw new Error("not_caught") }, 500);

@@ -58,3 +58,3 @@ 42

const originalThreadId = pool.threads[0].threadId;
await rejects(pool.runTask('process.exit(1);'), /worker exited with code: 1/);
await rejects(pool.run('process.exit(1);'), /worker exited with code: 1/);
const newThreadId = pool.threads[0].threadId;

@@ -71,3 +71,3 @@ not(originalThreadId, newThreadId);

const originalThreadId = pool.threads[0].threadId;
const taskResult = await pool.runTask(`
const taskResult = await pool.run(`
setTimeout(() => { process.exit(1); }, 50);

@@ -78,3 +78,3 @@ 42

await rejects(pool.runTask(`
await rejects(pool.run(`
'use strict';

@@ -81,0 +81,0 @@

@@ -0,24 +1,41 @@

import { resolve } from 'node:path';
import { cpus } from 'node:os';
import { once } from 'node:events';
import Piscina from '..';
import { cpus } from 'os';
import { test } from 'tap';
import { resolve } from 'path';
test('will start with minThreads and max out at maxThreads', async ({ equal, rejects }) => {
test('will start with minThreads and max out at maxThreads', { only: true }, async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 2,
maxThreads: 4
maxThreads: 4,
concurrentTasksPerWorker: 1
});
let counter = 0;
pool.on('workerCreate', () => {
counter++;
});
equal(pool.threads.length, 2);
rejects(pool.runTask('while(true) {}'));
equal(pool.threads.length, 2);
rejects(pool.runTask('while(true) {}'));
equal(pool.threads.length, 2);
rejects(pool.runTask('while(true) {}'));
equal(pool.threads.length, 3);
rejects(pool.runTask('while(true) {}'));
rejects(pool.run('while(true) {}'));
rejects(pool.run('while(true) {}'));
// #3
rejects(pool.run('while(true) {}'));
await once(pool, 'workerCreate');
// #4
rejects(pool.run('while(true) {}'));
await once(pool, 'workerCreate');
// #4 - as spawn does not happen synchronously anymore, we wait for the signal once more
rejects(pool.run('while(true) {}'));
await once(pool, 'workerCreate');
equal(pool.threads.length, 4);
rejects(pool.runTask('while(true) {}'));
equal(pool.threads.length, 4);
await pool.destroy();
equal(pool.threads.length, 0);
equal(counter, 4);
});

@@ -25,0 +42,0 @@
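
The reworked scaling test above also reflects that extra workers are no longer spawned synchronously as the queue grows: the test now awaits the pool's 'workerCreate' event before asserting on `pool.threads.length`. A hedged sketch of the same pattern in application code, assuming a hypothetical worker slow enough to force the pool to scale up:

```js
// Assumes a hypothetical ./slow-worker.js such as:
//   module.exports = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const Piscina = require('piscina');
const { once } = require('node:events');
const { resolve } = require('node:path');

const pool = new Piscina({
  filename: resolve(__dirname, 'slow-worker.js'),
  minThreads: 2,
  maxThreads: 4
});

(async () => {
  // Queue more work than the two initial threads can absorb.
  const tasks = Array.from({ length: 4 }, () => pool.run(500));
  // Worker spawn is asynchronous, so wait for the pool's signal rather than
  // assuming pool.threads.length grew immediately.
  await once(pool, 'workerCreate');
  console.log('threads after scale-up:', pool.threads.length);
  await Promise.all(tasks);
  await pool.destroy();
})();
```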

