@supercharge/promise-pool
Advanced tools
Comparing version 1.5.0 to 1.6.0
# Changelog | ||
## [1.6.0](https://github.com/supercharge/streams/compare/v1.5.0...v1.6.0) - 2020-11-03 | ||
### Added | ||
- `.handleError(handler)` method: aka “bring your own error handling”. This allows you to take over error handling from the pool. If you impelement the `.handleError` method, the pool won’t collect errors anymore. It puts error handling in your hands. | ||
### Updated | ||
- bump dependencies | ||
### Fixed | ||
- failed tasks are handled properly now and the pool ensures the concurrency limit. Before, the pool started to process all items as soon as one failed | ||
## [1.5.0](https://github.com/supercharge/streams/compare/v1.4.0...v1.5.0) - 2020-09-20 | ||
@@ -5,0 +17,0 @@ |
@@ -24,2 +24,6 @@ import { ReturnValue } from './return-value'; | ||
/** | ||
* The async error handling function. | ||
*/ | ||
private errorHandler?; | ||
/** | ||
* The list of errors. | ||
@@ -55,4 +59,12 @@ */ | ||
*/ | ||
withHandler(action: (item: T) => R): this; | ||
withHandler(action: (item: T) => R | Promise<R>): this; | ||
/** | ||
* Set the error handler function to execute when an error occurs. | ||
* | ||
* @param {Function} handler | ||
* | ||
* @returns {PromisePoolExecutor} | ||
*/ | ||
handleError(handler?: (error: Error, item: T) => Promise<void> | void): this; | ||
/** | ||
* Determines whether the number of active tasks is greater or equal to the concurrency limit. | ||
@@ -59,0 +71,0 @@ * |
@@ -17,2 +17,3 @@ 'use strict'; | ||
this.handler = () => { }; | ||
this.errorHandler = undefined; | ||
} | ||
@@ -56,2 +57,14 @@ /** | ||
/** | ||
* Set the error handler function to execute when an error occurs. | ||
* | ||
* @param {Function} handler | ||
* | ||
* @returns {PromisePoolExecutor} | ||
*/ | ||
handleError(handler) { | ||
return goodies_1.tap(this, () => { | ||
this.errorHandler = handler; | ||
}); | ||
} | ||
/** | ||
* Determines whether the number of active tasks is greater or equal to the concurrency limit. | ||
@@ -97,2 +110,7 @@ * | ||
} | ||
if (this.errorHandler) { | ||
if (typeof this.errorHandler !== 'function') { | ||
throw new Error(`The error handler must be a function. Received ${typeof this.errorHandler}`); | ||
} | ||
} | ||
} | ||
@@ -143,2 +161,6 @@ /** | ||
.catch(error => { | ||
this.tasks.splice(this.tasks.indexOf(task), 1); | ||
if (this.errorHandler) { | ||
return this.errorHandler(error, item); | ||
} | ||
this.errors.push(promise_pool_error_1.PromisePoolError.createFrom(error, item)); | ||
@@ -145,0 +167,0 @@ }); |
@@ -13,2 +13,6 @@ import { ReturnValue } from './return-value'; | ||
/** | ||
* The error handler callback function | ||
*/ | ||
private errorHandler?; | ||
/** | ||
* Instantiates a new promise pool with a default `concurrency: 10` and `items: []`. | ||
@@ -52,2 +56,10 @@ * | ||
/** | ||
* Set the error handler function to execute when an error occurs. | ||
* | ||
* @param {Function} handler | ||
* | ||
* @returns {PromisePool} | ||
*/ | ||
handleError(handler: (error: Error, item: T) => Promise<void> | void): PromisePool<T>; | ||
/** | ||
* Starts processing the promise pool by iterating over the items | ||
@@ -60,3 +72,3 @@ * and running each item through the async `callback` function. | ||
*/ | ||
process<R>(callback: (item: T) => Promise<R>): Promise<ReturnValue<T, R>>; | ||
process<R>(callback: (item: T) => R | Promise<R>): Promise<ReturnValue<T, R>>; | ||
} |
@@ -15,2 +15,3 @@ 'use strict'; | ||
this.concurrency = 10; | ||
this.errorHandler = undefined; | ||
} | ||
@@ -65,2 +66,22 @@ /** | ||
} | ||
/** | ||
* Set the error handler function to execute when an error occurs. | ||
* | ||
* @param {Function} handler | ||
* | ||
* @returns {PromisePool} | ||
*/ | ||
handleError(handler) { | ||
return goodies_1.tap(this, () => { | ||
this.errorHandler = handler; | ||
}); | ||
} | ||
/** | ||
* Starts processing the promise pool by iterating over the items | ||
* and running each item through the async `callback` function. | ||
* | ||
* @param {Function} The async processing function receiving each item from the `items` array. | ||
* | ||
* @returns Promise<{ results, errors }> | ||
*/ | ||
async process(callback) { | ||
@@ -70,2 +91,3 @@ return new promise_pool_executor_1.PromisePoolExecutor() | ||
.withHandler(callback) | ||
.handleError(this.errorHandler) | ||
.for(this.items) | ||
@@ -72,0 +94,0 @@ .start(); |
{ | ||
"name": "@supercharge/promise-pool", | ||
"description": "Map-like, concurrent promise processing for Node.js", | ||
"version": "1.5.0", | ||
"version": "1.6.0", | ||
"author": "Marcus Pöhls <marcus@futurestud.io>", | ||
@@ -14,14 +14,14 @@ "bugs": { | ||
"@supercharge/tsconfig": "~1.0.0", | ||
"@types/jest": "~26.0.14", | ||
"@typescript-eslint/eslint-plugin": "~4.1.1", | ||
"eslint": "~7.9.0", | ||
"eslint-config-standard": "~14.1.1", | ||
"@types/jest": "~26.0.15", | ||
"@typescript-eslint/eslint-plugin": "~4.6.1", | ||
"eslint": "~7.12.1", | ||
"eslint-config-standard": "~16.0.1", | ||
"eslint-config-standard-with-typescript": "~19.0.1", | ||
"eslint-plugin-import": "~2.22.0", | ||
"eslint-plugin-import": "~2.22.1", | ||
"eslint-plugin-node": "~11.1.0", | ||
"eslint-plugin-promise": "~4.2.1", | ||
"eslint-plugin-standard": "~4.0.1", | ||
"jest": "~26.4.2", | ||
"eslint-plugin-standard": "~4.0.2", | ||
"jest": "~26.6.2", | ||
"jest-extended": "~0.11.5", | ||
"typescript": "~4.0.3" | ||
"typescript": "~4.0.5" | ||
}, | ||
@@ -28,0 +28,0 @@ "engines": { |
@@ -77,3 +77,43 @@ <div align="center"> | ||
### Bring Your Own Error Handling | ||
The promise pool allows for custom error handling. You can take over the error handling by implementing an error handler using the `.handleError(handler)`. | ||
> If you provide an error handler, the promise pool doesn’t collect any errors. You must then collect errors yourself. | ||
Providing a custom error handler allows you to exit the promise pool early by throwing inside the error handler function. Throwing errors is in line with Node.js error handling using async/await. | ||
```js | ||
try { | ||
const errors = [] | ||
const { results } = await PromisePool | ||
.for(users) | ||
.withConcurrency(4) | ||
.handleError(async (error, user) => { | ||
if (error instanceof ValidationError) { | ||
errors.push(error) // you must collect errors yourself | ||
return | ||
} | ||
if (error instanceof ThrottleError) { // Execute error handling on specific errors | ||
await retryUser(user) | ||
return | ||
} | ||
throw error // Uncaught errors will immediately stop PromisePool | ||
}) | ||
.process(async data => { | ||
// the harder you work for something, | ||
// the greater you’ll feel when you achieve it | ||
}) | ||
await handleCollected(errors) // this may throw | ||
return { results } | ||
} catch (error) { | ||
await handleThrown(error) | ||
} | ||
``` | ||
## Contributing | ||
@@ -80,0 +120,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
25464
596
135