Comparing version 5.1.0 to 5.2.0
@@ -12,4 +12,8 @@ export interface Options { | ||
/** | ||
When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. | ||
When `true`, the first mapper rejection will be rejected back to the consumer. | ||
When `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. | ||
Caveat: When `true`, any already-started async mappers will continue to run until they resolve or reject. In the case of infinite concurrency with sync iterables, *all* mappers are invoked on startup and will continue after the first rejection. [Issue #51](https://github.com/sindresorhus/p-map/issues/51) can be implemented for abort control. | ||
@default true | ||
@@ -32,3 +36,3 @@ */ | ||
/** | ||
@param input - Iterated over concurrently in the `mapper` function. | ||
@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream. | ||
@param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value. | ||
@@ -60,3 +64,3 @@ @returns A `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order. | ||
export default function pMap<Element, NewElement>( | ||
input: Iterable<Element>, | ||
input: AsyncIterable<Element | Promise<Element>> | Iterable<Element | Promise<Element>>, | ||
mapper: Mapper<Element, NewElement>, | ||
@@ -86,3 +90,3 @@ options?: Options | ||
} catch { | ||
return pMapSkip | ||
return pMapSkip; | ||
} | ||
@@ -89,0 +93,0 @@ }; |
74
index.js
@@ -11,3 +11,7 @@ import AggregateError from 'aggregate-error'; | ||
) { | ||
return new Promise((resolve, reject) => { | ||
return new Promise((resolve, reject_) => { // eslint-disable-line promise/param-names | ||
if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) { | ||
throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`); | ||
} | ||
if (typeof mapper !== 'function') { | ||
@@ -24,24 +28,40 @@ throw new TypeError('Mapper function is required'); | ||
const skippedIndexes = []; | ||
const iterator = iterable[Symbol.iterator](); | ||
let isRejected = false; | ||
let isResolved = false; | ||
let isIterableDone = false; | ||
let resolvingCount = 0; | ||
let currentIndex = 0; | ||
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator](); | ||
const next = () => { | ||
if (isRejected) { | ||
const reject = reason => { | ||
isRejected = true; | ||
isResolved = true; | ||
reject_(reason); | ||
}; | ||
const next = async () => { | ||
if (isResolved) { | ||
return; | ||
} | ||
const nextItem = iterator.next(); | ||
const nextItem = await iterator.next(); | ||
const index = currentIndex; | ||
currentIndex++; | ||
// Note: `iterator.next()` can be called many times in parallel. | ||
// This can cause multiple calls to this `next()` function to | ||
// receive a `nextItem` with `done === true`. | ||
// The shutdown logic that rejects/resolves must be protected | ||
// so it runs only one time as the `skippedIndex` logic is | ||
// non-idempotent. | ||
if (nextItem.done) { | ||
isIterableDone = true; | ||
if (resolvingCount === 0) { | ||
if (resolvingCount === 0 && !isResolved) { | ||
if (!stopOnError && errors.length > 0) { | ||
reject(new AggregateError(errors)); | ||
} else { | ||
isResolved = true; | ||
for (const skippedIndex of skippedIndexes) { | ||
@@ -60,2 +80,3 @@ result.splice(skippedIndex, 1); | ||
// Intentionally detached | ||
(async () => { | ||
@@ -65,3 +86,3 @@ try { | ||
if (isRejected) { | ||
if (isResolved) { | ||
return; | ||
@@ -71,2 +92,3 @@ } | ||
const value = await mapper(element, index); | ||
if (value === pMapSkip) { | ||
@@ -79,6 +101,5 @@ skippedIndexes.push(index); | ||
resolvingCount--; | ||
next(); | ||
await next(); | ||
} catch (error) { | ||
if (stopOnError) { | ||
isRejected = true; | ||
reject(error); | ||
@@ -88,3 +109,12 @@ } else { | ||
resolvingCount--; | ||
next(); | ||
// In that case we can't really continue regardless of `stopOnError` state | ||
// since an iterable is likely to continue throwing after it throws once. | ||
// If we continue calling `next()` indefinitely we will likely end up | ||
// in an infinite loop of failed iteration. | ||
try { | ||
await next(); | ||
} catch (error) { | ||
reject(error); | ||
} | ||
} | ||
@@ -95,9 +125,23 @@ } | ||
for (let index = 0; index < concurrency; index++) { | ||
next(); | ||
// Create the concurrent runners in a detached (non-awaited) | ||
// promise. We need this so we can await the `next()` calls | ||
// to stop creating runners before hitting the concurrency limit | ||
// if the iterable has already been marked as done. | ||
// NOTE: We *must* do this for async iterators otherwise we'll spin up | ||
// infinite `next()` calls by default and never start the event loop. | ||
(async () => { | ||
for (let index = 0; index < concurrency; index++) { | ||
try { | ||
// eslint-disable-next-line no-await-in-loop | ||
await next(); | ||
} catch (error) { | ||
reject(error); | ||
break; | ||
} | ||
if (isIterableDone) { | ||
break; | ||
if (isIterableDone || isRejected) { | ||
break; | ||
} | ||
} | ||
} | ||
})(); | ||
}); | ||
@@ -104,0 +148,0 @@ } |
{ | ||
"name": "p-map", | ||
"version": "5.1.0", | ||
"version": "5.2.0", | ||
"description": "Map over promises concurrently", | ||
@@ -5,0 +5,0 @@ "license": "MIT", |
@@ -46,6 +46,8 @@ # p-map | ||
Type: `Iterable<Promise | unknown>` | ||
Type: `AsyncIterable<Promise<unknown> | unknown> | Iterable<Promise<unknown> | unknown>` | ||
Iterated over concurrently in the `mapper` function. | ||
Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. | ||
Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream. | ||
#### mapper(element, index) | ||
@@ -74,4 +76,8 @@ | ||
When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. | ||
When `true`, the first mapper rejection will be rejected back to the consumer. | ||
When `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. | ||
Caveat: When `true`, any already-started async mappers will continue to run until they resolve or reject. In the case of infinite concurrency with sync iterables, *all* mappers are invoked on startup and will continue after the first rejection. [Issue #51](https://github.com/sindresorhus/p-map/issues/51) can be implemented for abort control. | ||
### pMapSkip | ||
@@ -97,3 +103,3 @@ | ||
} catch { | ||
return pMapSkip | ||
return pMapSkip; | ||
} | ||
@@ -100,0 +106,0 @@ }; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
14274
199
126