@supercharge/promise-pool
Advanced tools
Comparing version 2.4.0 to 3.0.0
@@ -49,1 +49,2 @@ export interface UsesConcurrency { | ||
export type OnProgressCallback<T> = (item: T, pool: Stoppable & Statistics<T> & UsesConcurrency) => void; | ||
export type SomeIterable<T> = T[] | Iterable<T> | AsyncIterable<T>; |
@@ -17,3 +17,5 @@ 'use strict'; | ||
this.message = this.messageFrom(error); | ||
Error.captureStackTrace(this, this.constructor); | ||
if (Error.captureStackTrace && typeof Error.captureStackTrace === 'function') { | ||
Error.captureStackTrace(this, this.constructor); | ||
} | ||
} | ||
@@ -20,0 +22,0 @@ /** |
import { ReturnValue } from './return-value'; | ||
import { PromisePoolError } from './promise-pool-error'; | ||
import { ProcessHandler, OnProgressCallback, Statistics, Stoppable, UsesConcurrency } from './contracts'; | ||
import { ProcessHandler, OnProgressCallback, Statistics, Stoppable, UsesConcurrency, SomeIterable } from './contracts'; | ||
export declare class PromisePoolExecutor<T, R> implements UsesConcurrency, Stoppable, Statistics<T> { | ||
@@ -8,3 +8,3 @@ /** | ||
*/ | ||
private meta; | ||
private readonly meta; | ||
/** | ||
@@ -79,11 +79,11 @@ * The async processing function receiving each item from the `items` array. | ||
*/ | ||
for(items: T[]): this; | ||
for(items: SomeIterable<T>): this; | ||
/** | ||
* Returns the list of items to process. | ||
* | ||
* @returns {T[]} | ||
* @returns {T[] | Iterable<T> | AsyncIterable<T>} | ||
*/ | ||
items(): T[]; | ||
items(): SomeIterable<T>; | ||
/** | ||
* Returns the number of items to process. | ||
* Returns the number of items to process, or `NaN` if items are not an array. | ||
* | ||
@@ -126,3 +126,3 @@ * @returns {Number} | ||
/** | ||
* Returns the percentage progress of items that have been processed. | ||
* Returns the percentage progress of items that have been processed, or `NaN` if items is not an array. | ||
*/ | ||
@@ -216,2 +216,3 @@ processedPercentage(): number; | ||
validateInputs(): this; | ||
private areItemsValid; | ||
/** | ||
@@ -218,0 +219,0 @@ * Prefill the results array with `notRun` symbol values if results should correspond. |
@@ -105,3 +105,3 @@ 'use strict'; | ||
* | ||
* @returns {T[]} | ||
* @returns {T[] | Iterable<T> | AsyncIterable<T>} | ||
*/ | ||
@@ -112,3 +112,3 @@ items() { | ||
/** | ||
* Returns the number of items to process. | ||
* Returns the number of items to process, or `NaN` if items are not an array. | ||
* | ||
@@ -118,3 +118,4 @@ * @returns {Number} | ||
itemsCount() { | ||
return this.items().length; | ||
const items = this.items(); | ||
return Array.isArray(items) ? items.length : NaN; | ||
} | ||
@@ -164,3 +165,3 @@ /** | ||
/** | ||
* Returns the percentage progress of items that have been processed. | ||
* Returns the percentage progress of items that have been processed, or `NaN` if items is not an array. | ||
*/ | ||
@@ -296,4 +297,4 @@ processedPercentage() { | ||
} | ||
if (!Array.isArray(this.items())) { | ||
throw validation_error_1.ValidationError.createFrom(`"items" must be an array. Received "${typeof this.items()}"`); | ||
if (!this.areItemsValid()) { | ||
throw validation_error_1.ValidationError.createFrom(`"items" must be an array, an iterable or an async iterable. Received "${typeof this.items()}"`); | ||
} | ||
@@ -315,2 +316,12 @@ if (this.errorHandler && typeof this.errorHandler !== 'function') { | ||
} | ||
areItemsValid() { | ||
const items = this.items(); | ||
if (Array.isArray(items)) | ||
return true; | ||
if (typeof items[Symbol.iterator] === 'function') | ||
return true; | ||
if (typeof items[Symbol.asyncIterator] === 'function') | ||
return true; | ||
return false; | ||
} | ||
/** | ||
@@ -320,5 +331,8 @@ * Prefill the results array with `notRun` symbol values if results should correspond. | ||
prepareResultsArray() { | ||
if (this.shouldUseCorrespondingResults()) { | ||
this.meta.results = Array(this.items().length).fill(promise_pool_1.PromisePool.notRun); | ||
} | ||
const items = this.items(); | ||
if (!Array.isArray(items)) | ||
return this; | ||
if (!this.shouldUseCorrespondingResults()) | ||
return this; | ||
this.meta.results = Array(items.length).fill(promise_pool_1.PromisePool.notRun); | ||
return this; | ||
@@ -335,8 +349,15 @@ } | ||
async process() { | ||
for (const [index, item] of this.items().entries()) { | ||
let index = 0; | ||
for await (const item of this.items()) { | ||
if (this.isStopped()) { | ||
break; | ||
} | ||
if (this.shouldUseCorrespondingResults()) { | ||
this.results()[index] = promise_pool_1.PromisePool.notRun; | ||
} | ||
this.startProcessing(item, index); | ||
index += 1; | ||
// don't consume the next item from iterable | ||
// until there's a free slot for a new task | ||
await this.waitForProcessingSlot(); | ||
this.startProcessing(item, index); | ||
} | ||
@@ -485,5 +506,4 @@ return await this.drained(); | ||
async runErrorHandlerFor(processingError, item) { | ||
var _a; | ||
try { | ||
await ((_a = this.errorHandler) === null || _a === void 0 ? void 0 : _a.call(this, processingError, item, this)); | ||
await this.errorHandler?.(processingError, item, this); | ||
} | ||
@@ -490,0 +510,0 @@ catch (error) { |
import { ReturnValue } from './return-value'; | ||
import { ErrorHandler, ProcessHandler, OnProgressCallback } from './contracts'; | ||
import { ErrorHandler, ProcessHandler, OnProgressCallback, SomeIterable } from './contracts'; | ||
export declare class PromisePool<T, ShouldUseCorrespondingResults extends boolean = false> { | ||
@@ -41,3 +41,3 @@ /** | ||
*/ | ||
constructor(items?: T[]); | ||
constructor(items?: SomeIterable<T>); | ||
/** | ||
@@ -78,15 +78,15 @@ * Set the number of tasks to process concurrently in the promise pool. | ||
* | ||
* @param {T[]} items | ||
* @param {T[] | Iterable<T> | AsyncIterable<T>} items | ||
* | ||
* @returns {PromisePool} | ||
*/ | ||
for<T>(items: T[]): PromisePool<T>; | ||
for<T>(items: SomeIterable<T>): PromisePool<T>; | ||
/** | ||
* Set the items to be processed in the promise pool. | ||
* | ||
* @param {T[]} items | ||
* @param {T[] | Iterable<T> | AsyncIterable<T>} items | ||
* | ||
* @returns {PromisePool} | ||
*/ | ||
static for<T>(items: T[]): PromisePool<T>; | ||
static for<T>(items: SomeIterable<T>): PromisePool<T>; | ||
/** | ||
@@ -93,0 +93,0 @@ * Set the error handler function to execute when an error occurs. |
@@ -15,3 +15,3 @@ 'use strict'; | ||
this.shouldResultsCorrespond = false; | ||
this.items = items !== null && items !== void 0 ? items : []; | ||
this.items = items ?? []; | ||
this.errorHandler = undefined; | ||
@@ -66,3 +66,3 @@ this.onTaskStartedHandlers = []; | ||
* | ||
* @param {T[]} items | ||
* @param {T[] | Iterable<T> | AsyncIterable<T>} items | ||
* | ||
@@ -79,3 +79,3 @@ * @returns {PromisePool} | ||
* | ||
* @param {T[]} items | ||
* @param {T[] | Iterable<T> | AsyncIterable<T>} items | ||
* | ||
@@ -82,0 +82,0 @@ * @returns {PromisePool} |
@@ -12,3 +12,5 @@ 'use strict'; | ||
super(message); | ||
Error.captureStackTrace(this, this.constructor); | ||
if (Error.captureStackTrace && typeof Error.captureStackTrace === 'function') { | ||
Error.captureStackTrace(this, this.constructor); | ||
} | ||
} | ||
@@ -15,0 +17,0 @@ /** |
{ | ||
"name": "@supercharge/promise-pool", | ||
"description": "Map-like, concurrent promise processing for Node.js", | ||
"version": "2.4.0", | ||
"version": "3.0.0", | ||
"author": "Marcus Pöhls <marcus@superchargejs.com>", | ||
@@ -10,8 +10,8 @@ "bugs": { | ||
"devDependencies": { | ||
"@supercharge/eslint-config-typescript": "~2.3.3", | ||
"@supercharge/tsconfig": "~1.0.0", | ||
"c8": "~7.12.0", | ||
"eslint": "~8.33.0", | ||
"expect": "~28.1.3", | ||
"typescript": "~4.9.5", | ||
"@supercharge/eslint-config-typescript": "~3.0.0", | ||
"@supercharge/tsconfig": "~4.0.0", | ||
"c8": "~8.0.0", | ||
"eslint": "~8.44.0", | ||
"expect": "~29.6.1", | ||
"typescript": "~5.1.6", | ||
"uvu": "~0.5.6" | ||
@@ -18,0 +18,0 @@ }, |
53964
1424