Socket
Socket
Sign inDemoInstall

@supercharge/promise-pool

Package Overview
Dependencies
0
Maintainers
3
Versions
24
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.3.2 to 2.4.0

6

dist/contracts.d.ts

@@ -46,4 +46,4 @@ export interface UsesConcurrency {

}
export declare type ErrorHandler<T> = (error: Error, item: T, pool: Stoppable & UsesConcurrency) => void | Promise<void>;
export declare type ProcessHandler<T, R> = (item: T, index: number, pool: Stoppable & UsesConcurrency) => R | Promise<R>;
export declare type OnProgressCallback<T> = (item: T, pool: Stoppable & Statistics<T> & UsesConcurrency) => void;
export type ErrorHandler<T> = (error: Error, item: T, pool: Stoppable & UsesConcurrency) => Promise<void> | void;
export type ProcessHandler<T, R> = (item: T, index: number, pool: Stoppable & UsesConcurrency) => Promise<R> | R;
export type OnProgressCallback<T> = (item: T, pool: Stoppable & Statistics<T> & UsesConcurrency) => void;

@@ -6,2 +6,5 @@ export declare class PromisePoolError<T, E = any> extends Error {

item: T;
/**
* Returns the original, raw error instance.
*/
raw: E;

@@ -8,0 +11,0 @@ /**

@@ -46,2 +46,10 @@ import { ReturnValue } from './return-value';

/**
* Set the timeout in ms for the pool handler
*
* @param {Number} timeout
*
* @returns {PromisePool}
*/
withTaskTimeout(timeout: number | undefined): this;
/**
* Returns the number of concurrently processed tasks.

@@ -53,2 +61,14 @@ *

/**
* Assign whether to keep corresponding results between source items and resulting tasks.
*/
useCorrespondingResults(shouldResultsCorrespond: boolean): this;
/**
* Determine whether to keep corresponding results between source items and resulting tasks.
*/
shouldUseCorrespondingResults(): boolean;
/**
* Returns the task timeout in milliseconds.
*/
taskTimeout(): number | undefined;
/**
* Set the items to be processed in the promise pool.

@@ -114,3 +134,3 @@ *

*/
results(): R[];
results(): Array<R | symbol>;
/**

@@ -187,3 +207,3 @@ * Returns the list of errors.

*/
start(): Promise<ReturnValue<T, R>>;
start(): Promise<any>;
/**

@@ -198,2 +218,6 @@ * Determine whether the pool should stop.

/**
* Prefill the results array with `notRun` symbol values if results should correspond.
*/
private prepareResultsArray;
/**
* Starts processing the promise pool by iterating over the items

@@ -212,2 +236,6 @@ * and running each item through the async `callback` function.

/**
* Wait for the next, currently active task to finish processing.
*/
waitForActiveTaskToFinish(): Promise<void>;
/**
* Create a processing function for the given `item`.

@@ -229,9 +257,14 @@ *

/**
* Save the given calculation `result`.
* Returns a promise that times-out after the configured task timeout.
*/
private createTaskTimeout;
/**
* Save the given calculation `result`, possibly at the provided `position`.
*
* @param {*} result
* @param {number} position
*
* @returns {PromisePoolExecutor}
*/
save(result: any): this;
save(result: any, position: number): this;
/**

@@ -248,4 +281,5 @@ * Remove the given `task` from the list of active tasks.

* @param {T} item
* @param {number} index
*/
handleErrorFor(error: Error, item: T): Promise<void>;
handleErrorFor(error: Error, item: T, index: number): Promise<void>;
/**

@@ -300,3 +334,3 @@ * Determine whether the given `error` is a `StopThePromisePoolError` instance.

*/
drained(): Promise<ReturnValue<T, R>>;
drained(): Promise<ReturnValue<T, any>>;
/**

@@ -303,0 +337,0 @@ * Wait for all of the active tasks to finish processing.

'use strict';
Object.defineProperty(exports, "__esModule", { value: true });
exports.PromisePoolExecutor = void 0;
const promise_pool_1 = require("./promise-pool");
const promise_pool_error_1 = require("./promise-pool-error");

@@ -19,3 +20,5 @@ const stop_the_promise_pool_error_1 = require("./stop-the-promise-pool-error");

concurrency: 10,
shouldResultsCorrespond: false,
processedItems: [],
taskTimeout: 0
};

@@ -52,2 +55,13 @@ this.handler = () => { };

/**
* Set the timeout in ms for the pool handler
*
* @param {Number} timeout
*
* @returns {PromisePool}
*/
withTaskTimeout(timeout) {
this.meta.taskTimeout = timeout;
return this;
}
/**
* Returns the number of concurrently processed tasks.

@@ -61,2 +75,21 @@ *

/**
* Assign whether to keep corresponding results between source items and resulting tasks.
*/
useCorrespondingResults(shouldResultsCorrespond) {
this.meta.shouldResultsCorrespond = shouldResultsCorrespond;
return this;
}
/**
* Determine whether to keep corresponding results between source items and resulting tasks.
*/
shouldUseCorrespondingResults() {
return this.meta.shouldResultsCorrespond;
}
/**
* Returns the task timeout in milliseconds.
*/
taskTimeout() {
return this.meta.taskTimeout;
}
/**
* Set the items to be processed in the promise pool.

@@ -242,3 +275,6 @@ *

async start() {
return await this.validateInputs().process();
return await this
.validateInputs()
.prepareResultsArray()
.process();
}

@@ -256,11 +292,15 @@ /**

}
const timeout = this.taskTimeout();
if (!(timeout == null || (typeof timeout === 'number' && timeout >= 0))) {
throw validation_error_1.ValidationError.createFrom(`"timeout" must be undefined or a number. A number must be 0 or up. Received "${String(timeout)}" (${typeof timeout})`);
}
if (!Array.isArray(this.items())) {
throw validation_error_1.ValidationError.createFrom(`"items" must be an array. Received ${typeof this.items()}`);
throw validation_error_1.ValidationError.createFrom(`"items" must be an array. Received "${typeof this.items()}"`);
}
if (this.errorHandler && typeof this.errorHandler !== 'function') {
throw validation_error_1.ValidationError.createFrom(`The error handler must be a function. Received ${typeof this.errorHandler}`);
throw validation_error_1.ValidationError.createFrom(`The error handler must be a function. Received "${typeof this.errorHandler}"`);
}
this.onTaskStartedHandlers.forEach(handler => {
if (handler && typeof handler !== 'function') {
throw validation_error_1.ValidationError.createFrom(`The onTaskStarted handler must be a function. Received ${typeof handler}`);
throw validation_error_1.ValidationError.createFrom(`The onTaskStarted handler must be a function. Received "${typeof handler}"`);
}

@@ -270,3 +310,3 @@ });

if (handler && typeof handler !== 'function') {
throw validation_error_1.ValidationError.createFrom(`The error handler must be a function. Received ${typeof handler}`);
throw validation_error_1.ValidationError.createFrom(`The error handler must be a function. Received "${typeof handler}"`);
}

@@ -277,2 +317,11 @@ });

/**
* Prefill the results array with `notRun` symbol values if results should correspond.
*/
prepareResultsArray() {
if (this.shouldUseCorrespondingResults()) {
this.meta.results = Array(this.items().length).fill(promise_pool_1.PromisePool.notRun);
}
return this;
}
/**
* Starts processing the promise pool by iterating over the items

@@ -305,6 +354,12 @@ * and running each item through the async `callback` function.

while (this.hasReachedConcurrencyLimit()) {
await Promise.race(this.tasks());
await this.waitForActiveTaskToFinish();
}
}
/**
* Wait for the next, currently active task to finish processing.
*/
async waitForActiveTaskToFinish() {
await Promise.race(this.tasks());
}
/**
* Create a processing function for the given `item`.

@@ -318,6 +373,6 @@ *

.then(result => {
this.save(result).removeActive(task);
this.save(result, index).removeActive(task);
})
.catch(async (error) => {
await this.handleErrorFor(error, item);
await this.handleErrorFor(error, item, index);
this.removeActive(task);

@@ -341,13 +396,32 @@ })

async createTaskFor(item, index) {
return this.handler(item, index, this);
if (this.taskTimeout() === undefined) {
return this.handler(item, index, this);
}
return Promise.race([
this.handler(item, index, this),
this.createTaskTimeout(item)
]);
}
/**
* Save the given calculation `result`.
* Returns a promise that times-out after the configured task timeout.
*/
async createTaskTimeout(item) {
return new Promise((_resolve, reject) => {
setTimeout(() => {
reject(new promise_pool_error_1.PromisePoolError(`Promise in pool timed out after ${this.taskTimeout()}ms`, item));
}, this.taskTimeout());
});
}
/**
* Save the given calculation `result`, possibly at the provided `position`.
*
* @param {*} result
* @param {number} position
*
* @returns {PromisePoolExecutor}
*/
save(result) {
this.results().push(result);
save(result, position) {
this.shouldUseCorrespondingResults()
? this.results()[position] = result
: this.results().push(result);
return this;

@@ -369,4 +443,8 @@ }

* @param {T} item
* @param {number} index
*/
async handleErrorFor(error, item) {
async handleErrorFor(error, item, index) {
if (this.shouldUseCorrespondingResults()) {
this.results()[index] = promise_pool_1.PromisePool.failed;
}
if (this.isStoppingThePoolError(error)) {

@@ -373,0 +451,0 @@ return;

import { ReturnValue } from './return-value';
import { ErrorHandler, ProcessHandler, OnProgressCallback } from './contracts';
export declare class PromisePool<T> {
export declare class PromisePool<T, ShouldUseCorrespondingResults extends boolean = false> {
/**

@@ -13,2 +13,12 @@ * The processable items.

/**
* Determine whether to put a task’s result at the same position in the result
* array as its related source item has in the source array. Failing tasks
* and those items that didn’t run carry a related symbol as a value.
*/
private shouldResultsCorrespond;
/**
* The maximum timeout in milliseconds for the item handler, or `undefined` to disable.
*/
private timeout;
/**
* The error handler callback function

@@ -25,2 +35,4 @@ */

private readonly onTaskFinishedHandlers;
static readonly notRun: symbol;
static readonly failed: symbol;
/**

@@ -49,2 +61,18 @@ * Instantiates a new promise pool with a default `concurrency: 10` and `items: []`.

/**
* Set the timeout in milliseconds for the pool handler.
*
* @param {Number} timeout
*
* @returns {PromisePool}
*/
withTaskTimeout(timeout: number): PromisePool<T>;
/**
* Set the timeout in milliseconds for the pool handler.
*
* @param {Number} timeout
*
* @returns {PromisePool}
*/
static withTaskTimeout(timeout: number): PromisePool<unknown>;
/**
* Set the items to be processed in the promise pool.

@@ -82,10 +110,14 @@ *

/**
* Assign the given callback `handler` function to run when a task finished.
*
* @param {OnProgressCallback<T>} handler
*
* @returns {PromisePool}
*/
* Assign the given callback `handler` function to run when a task finished.
*
* @param {OnProgressCallback<T>} handler
*
* @returns {PromisePool}
*/
onTaskFinished(handler: OnProgressCallback<T>): PromisePool<T>;
/**
* Assign whether to keep corresponding results between source items and resulting tasks.
*/
useCorrespondingResults(): PromisePool<T, true>;
/**
* Starts processing the promise pool by iterating over the items

@@ -98,3 +130,3 @@ * and running each item through the async `callback` function.

*/
process<ResultType, ErrorType = any>(callback: ProcessHandler<T, ResultType>): Promise<ReturnValue<T, ResultType, ErrorType>>;
process<ResultType, ErrorType = any>(callback: ProcessHandler<T, ResultType>): Promise<ReturnValue<T, ShouldUseCorrespondingResults extends true ? ResultType | symbol : ResultType, ErrorType>>;
}

@@ -12,3 +12,5 @@ 'use strict';

constructor(items) {
this.timeout = undefined;
this.concurrency = 10;
this.shouldResultsCorrespond = false;
this.items = items !== null && items !== void 0 ? items : [];

@@ -41,2 +43,23 @@ this.errorHandler = undefined;

/**
* Set the timeout in milliseconds for the pool handler.
*
* @param {Number} timeout
*
* @returns {PromisePool}
*/
withTaskTimeout(timeout) {
this.timeout = timeout;
return this;
}
/**
* Set the timeout in milliseconds for the pool handler.
*
* @param {Number} timeout
*
* @returns {PromisePool}
*/
static withTaskTimeout(timeout) {
return new this().withTaskTimeout(timeout);
}
/**
* Set the items to be processed in the promise pool.

@@ -49,3 +72,5 @@ *

for(items) {
return new PromisePool(items).withConcurrency(this.concurrency);
return typeof this.timeout === 'number'
? new PromisePool(items).withConcurrency(this.concurrency).withTaskTimeout(this.timeout)
: new PromisePool(items).withConcurrency(this.concurrency);
}

@@ -85,8 +110,8 @@ /**

/**
* Assign the given callback `handler` function to run when a task finished.
*
* @param {OnProgressCallback<T>} handler
*
* @returns {PromisePool}
*/
* Assign the given callback `handler` function to run when a task finished.
*
* @param {OnProgressCallback<T>} handler
*
* @returns {PromisePool}
*/
onTaskFinished(handler) {

@@ -97,2 +122,9 @@ this.onTaskFinishedHandlers.push(handler);

/**
* Assign whether to keep corresponding results between source items and resulting tasks.
*/
useCorrespondingResults() {
this.shouldResultsCorrespond = true;
return this;
}
/**
* Starts processing the promise pool by iterating over the items

@@ -108,2 +140,4 @@ * and running each item through the async `callback` function.

.useConcurrency(this.concurrency)
.useCorrespondingResults(this.shouldResultsCorrespond)
.withTaskTimeout(this.timeout)
.withHandler(callback)

@@ -118,1 +152,3 @@ .handleError(this.errorHandler)

exports.PromisePool = PromisePool;
PromisePool.notRun = Symbol('notRun');
PromisePool.failed = Symbol('failed');
{
"name": "@supercharge/promise-pool",
"description": "Map-like, concurrent promise processing for Node.js",
"version": "2.3.2",
"version": "2.4.0",
"author": "Marcus Pöhls <marcus@superchargejs.com>",

@@ -10,8 +10,8 @@ "bugs": {

"devDependencies": {
"@supercharge/eslint-config-typescript": "~2.3.0",
"@supercharge/eslint-config-typescript": "~2.3.3",
"@supercharge/tsconfig": "~1.0.0",
"c8": "~7.12.0",
"eslint": "~8.21.0",
"eslint": "~8.33.0",
"expect": "~28.1.3",
"typescript": "~4.7.4",
"typescript": "~4.9.5",
"uvu": "~0.5.6"

@@ -18,0 +18,0 @@ },

@@ -50,3 +50,3 @@ <div align="center">

```js
const { PromisePool } = require('@supercharge/promise-pool')
import { PromisePool } from '@supercharge/promise-pool'

@@ -98,3 +98,3 @@ const users = [

```js
const { PromisePool } = require('@supercharge/promise-pool')
import { PromisePool } from '@supercharge/promise-pool'

@@ -124,3 +124,3 @@ await PromisePool

```js
const { PromisePool } = require('@supercharge/promise-pool')
import { PromisePool } from '@supercharge/promise-pool'

@@ -159,2 +159,3 @@ try {

## Callback for Started and Finished Tasks

@@ -165,3 +166,3 @@ You can use the `onTaskStarted` and `onTaskFinished` methods to hook into the processing of tasks. The provided callback for each method will be called when a task started/finished processing:

```js
const { PromisePool } = require('@supercharge/promise-pool')
import { PromisePool } from '@supercharge/promise-pool'

@@ -173,3 +174,3 @@ await PromisePool

console.log(`Active tasks: ${pool.processedItems().length}`)
console.log(`Active tasks: ${pool.activeTasksCount()}`);
console.log(`Active tasks: ${pool.activeTasksCount()}`)
console.log(`Finished tasks: ${pool.processedItems().length}`)

@@ -189,3 +190,3 @@ console.log(`Finished tasks: ${pool.processedCount()}`)

```js
const { PromisePool } = require('@supercharge/promise-pool')
import { PromisePool } from '@supercharge/promise-pool'

@@ -204,2 +205,84 @@ await PromisePool

## Task Timeouts
Sometimes it’s useful to configure a timeout in which a task must finish processing. A task that times out is marked as failed. You may use the `withTaskTimeout(<milliseconds>)` method to configure a task’s timeout:
```js
import { PromisePool } from '@supercharge/promise-pool'
await PromisePool
.for(users)
.withTaskTimeout(2000) // milliseconds
.process(async (user, index, pool) => {
// processes the `user` data
})
```
**Notice:** a configured timeout is configured for each task, not for the whole pool. The example configures a 2-second timeout for each task in the pool.
## Correspond Source Items and Their Results
Sometimes you want the processed results to align with your source items. The resulting items should have the same position in the `results` array as their related source items. Use the `useCorrespondingResults` method to apply this behavior:
```js
import { setTimeout } from 'node:timers/promises'
import { PromisePool } from '@supercharge/promise-pool'
const { results } = await PromisePool
.for([1, 2, 3])
.withConcurrency(5)
.useCorrespondingResults()
.process(async (number, index) => {
const value = number * 2
return await setTimeout(10 - index, value)
})
/**
* source array: [1, 2, 3]
* result array: [2, 4 ,6]
* --> result values match the position of their source items
*/
```
For example, you may have three items you want to process. Using corresponding results ensures that the processed result for the first item from the source array is located at the first position in the result array (=index `0`). The result for the second item from the source array is placed at the second position in the result array, and so on …
### Return Values When Using Corresponding Results
The `results` array returned by the promise pool after processing has a mixed return type. Each returned item is one of this type:
- the actual value type: for results that successfully finished processing
- `Symbol('notRun')`: for tasks that didn’t run
- `Symbol('failed')`: for tasks that failed processing
The `PromisePool` exposes both symbols and you may access them using
- `Symbol('notRun')`: exposed as `PromisePool.notRun`
- `Symbol('failed')`: exposed as `PromisePool.failed`
You may repeat processing for all tasks that didn’t run or failed:
```js
import { PromisePool } from '@supercharge/promise-pool'
const { results, errors } = await PromisePool
.for([1, 2, 3])
.withConcurrency(5)
.useCorrespondingResults()
.process(async (number) => {
// …
})
const itemsNotRun = results.filter(result => {
return result === PromisePool.notRun
})
const failedItems = results.filter(result => {
return result === PromisePool.failed
})
```
When using corresponding results, you need to go through the `errors` array yourself. The default error handling (collect errors) stays the same and you can follow the described error handling section above.
## Contributing

@@ -206,0 +289,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc