buffered-async-iterable
Advanced tools
Comparing version 0.3.0 to 1.0.0
@@ -0,3 +1,7 @@ | ||
export function mergeIterables<T>(input: (AsyncIterable<T> | Iterable<T> | T[])[], { bufferSize }?: { | ||
bufferSize?: number | undefined; | ||
} | undefined): AsyncIterable<T>; | ||
export function bufferedAsyncMap<T, R>(input: AsyncIterable<T> | Iterable<T> | T[], callback: (item: T) => Promise<R> | AsyncIterable<R>, options?: { | ||
bufferSize?: number | undefined; | ||
ordered?: boolean | undefined; | ||
} | undefined): AsyncIterableIterator<R> & { | ||
@@ -4,0 +8,0 @@ return: (value?: any) => Promise<IteratorResult<R, any>>; |
96
index.js
@@ -8,15 +8,32 @@ /* eslint-disable promise/prefer-await-to-then */ | ||
// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js | ||
// TODO: Have option to persist order? To not use Promise.race()? | ||
// TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385 | ||
import { findLeastTargeted } from './lib/find-least-targeted.js'; | ||
import { makeIterableAsync } from './lib/misc.js'; | ||
import { isAsyncIterable, isIterable, isPartOfSet } from './lib/type-checks.js'; | ||
import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js'; | ||
import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js'; | ||
/** | ||
* @template T | ||
* @param {AsyncIterable<T> | Iterable<T> | T[]} item | ||
* @returns {AsyncIterable<T>} | ||
*/ | ||
async function * yieldIterable (item) { | ||
yield * item; | ||
} | ||
/** | ||
* @template T | ||
* @param {Array<AsyncIterable<T> | Iterable<T> | T[]>} input | ||
* @param {{ bufferSize?: number|undefined }} [options] | ||
* @returns {AsyncIterable<T>} | ||
*/ | ||
export async function * mergeIterables (input, { bufferSize } = {}) { | ||
yield * bufferedAsyncMap(input, yieldIterable, { bufferSize }); | ||
} | ||
/** | ||
* @template T | ||
* @template R | ||
* @param {AsyncIterable<T> | Iterable<T> | T[]} input | ||
* @param {(item: T) => (Promise<R>|AsyncIterable<R>)} callback | ||
* @param {{ bufferSize?: number|undefined }} [options] | ||
* @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options] | ||
* @returns {AsyncIterableIterator<R> & { return: NonNullable<AsyncIterableIterator<R>["return"]>, throw: NonNullable<AsyncIterableIterator<R>["throw"]> }} | ||
@@ -28,2 +45,3 @@ */ | ||
bufferSize = 6, | ||
ordered = false, | ||
} = options || {}; | ||
@@ -44,7 +62,7 @@ | ||
/** @type {Set<AsyncIterator<R, unknown>>} */ | ||
const subIterators = new Set(); | ||
/** @type {AsyncIterator<R, unknown>[]} */ | ||
const subIterators = []; | ||
/** @type {Set<BufferPromise>} */ | ||
const bufferedPromises = new Set(); | ||
/** @type {BufferPromise[]} */ | ||
const bufferedPromises = []; | ||
@@ -82,4 +100,4 @@ /** @type {WeakMap<BufferPromise, AsyncIterator<T>|AsyncIterator<R>>} */ | ||
// TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10 | ||
bufferedPromises.clear(); | ||
subIterators.clear(); | ||
bufferedPromises.splice(0, bufferedPromises.length); | ||
subIterators.splice(0, subIterators.length); | ||
@@ -97,11 +115,17 @@ if (throwAnyError && hasError) { | ||
// Check which iterator that has the least amount of queued promises right now | ||
const iterator = findLeastTargeted( | ||
mainReturnedDone ? subIterators : [...subIterators, asyncIterator], | ||
bufferedPromises, | ||
promisesToSourceIteratorMap | ||
); | ||
/** @type {AsyncIterator<R, unknown>|undefined} */ | ||
let currentSubIterator; | ||
const currentSubIterator = isPartOfSet(iterator, subIterators) ? iterator : undefined; | ||
if (ordered) { | ||
currentSubIterator = subIterators[0]; | ||
} else { | ||
const iterator = findLeastTargeted( | ||
mainReturnedDone ? subIterators : [...subIterators, asyncIterator], | ||
bufferedPromises, | ||
promisesToSourceIteratorMap | ||
); | ||
currentSubIterator = isPartOfArray(iterator, subIterators) ? iterator : undefined; | ||
} | ||
/** @type {BufferPromise} */ | ||
@@ -118,3 +142,3 @@ const bufferPromise = currentSubIterator | ||
if ('err' in result || result.done) { | ||
subIterators.delete(currentSubIterator); | ||
arrayDeleteInPlace(subIterators, currentSubIterator); | ||
} | ||
@@ -183,5 +207,16 @@ | ||
promisesToSourceIteratorMap.set(bufferPromise, currentSubIterator || asyncIterator); | ||
bufferedPromises.add(bufferPromise); | ||
if (bufferedPromises.size < bufferSize) { | ||
if (ordered && currentSubIterator) { | ||
let i = 0; | ||
while (promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) { | ||
i += 1; | ||
} | ||
bufferedPromises.splice(i, 0, bufferPromise); | ||
} else { | ||
bufferedPromises.push(bufferPromise); | ||
} | ||
if (bufferedPromises.length < bufferSize) { | ||
fillQueue(); | ||
@@ -193,8 +228,13 @@ } | ||
const nextValue = async () => { | ||
if (bufferedPromises.size === 0) return markAsEnded(true); | ||
const nextBufferedPromise = bufferedPromises[0]; | ||
if (!nextBufferedPromise) return markAsEnded(true); | ||
if (isDone) return { done: true, value: undefined }; | ||
/** @type {Awaited<BufferPromise>} */ | ||
const resolvedPromise = await (ordered ? nextBufferedPromise : Promise.race(bufferedPromises)); | ||
arrayDeleteInPlace(bufferedPromises, resolvedPromise.bufferPromise); | ||
// Wait for some of the current promises to be finished | ||
const { | ||
bufferPromise, | ||
done, | ||
@@ -205,6 +245,4 @@ err, | ||
value, | ||
} = await Promise.race(bufferedPromises); | ||
} = resolvedPromise; | ||
bufferedPromises.delete(bufferPromise); | ||
// We are mandated by the spec to always do this return if the iterator is done | ||
@@ -218,7 +256,7 @@ if (isDone) { | ||
if (fromSubIterator || subIterators.size !== 0) { | ||
if (fromSubIterator || subIterators.length > 0) { | ||
fillQueue(); | ||
} | ||
return bufferedPromises.size === 0 | ||
return bufferedPromises.length === 0 | ||
? markAsEnded(true) | ||
@@ -228,3 +266,3 @@ : nextValue(); | ||
// TODO: Handle possible error here? Or too obscure? | ||
subIterators.add(value[Symbol.asyncIterator]()); | ||
subIterators.unshift(value[Symbol.asyncIterator]()); | ||
fillQueue(); | ||
@@ -231,0 +269,0 @@ return nextValue(); |
@@ -1,2 +0,2 @@ | ||
export function findLeastTargeted<Target, Item extends object>(targets: Iterable<Target> | Target[], items: Set<Item>, itemTargets: WeakMap<Item, Target>): Target | undefined; | ||
export function findLeastTargeted<Target, Item extends object>(targets: Iterable<Target> | Target[], items: Iterable<Item> | Item[], itemTargets: WeakMap<Item, Target>): Target | undefined; | ||
//# sourceMappingURL=find-least-targeted.d.ts.map |
/** | ||
* @template Target | ||
* @template {object} Item | ||
* @param {Set<Item>} items | ||
* @param {Iterable<Item> | Item[]} items | ||
* @param {WeakMap<Item, Target>} itemTargets | ||
@@ -28,3 +28,3 @@ * @returns {Map<Target, number>} | ||
* @param {Iterable<Target> | Target[]} targets | ||
* @param {Set<Item>} items | ||
* @param {Iterable<Item> | Item[]} items | ||
* @param {WeakMap<Item, Target>} itemTargets | ||
@@ -31,0 +31,0 @@ * @returns {Target|undefined} |
export function makeIterableAsync<T>(input: Iterable<T> | T[]): AsyncIterable<T>; | ||
export function arrayDeleteInPlace<T>(list: T[], value: T): void; | ||
//# sourceMappingURL=misc.d.ts.map |
@@ -11,1 +11,15 @@ /** | ||
} | ||
/** | ||
* Similar to the .delete() on a set | ||
* | ||
* @template T | ||
* @param {T[]} list | ||
* @param {T} value | ||
*/ | ||
export function arrayDeleteInPlace (list, value) { | ||
const index = list.indexOf(value); | ||
if (index !== -1) { | ||
list.splice(index, 1); | ||
} | ||
} |
export function isIterable(value: unknown): value is Iterable<unknown>; | ||
export function isAsyncIterable(value: unknown): value is AsyncIterable<unknown>; | ||
export function isPartOfSet<SetValue>(value: unknown, set: Set<SetValue>): value is SetValue; | ||
export function isPartOfArray<Values>(value: unknown, list: Values[]): value is Values; | ||
//# sourceMappingURL=type-checks.d.ts.map |
@@ -14,7 +14,7 @@ /** | ||
/** | ||
* @template SetValue | ||
* @template Values | ||
* @param {unknown} value | ||
* @param {Set<SetValue>} set | ||
* @returns {value is SetValue} | ||
* @param {Values[]} list | ||
* @returns {value is Values} | ||
*/ | ||
export const isPartOfSet = (value, set) => set.has(/** @type {SetValue} */ (value)); | ||
export const isPartOfArray = (value, list) => list.includes(/** @type {Values} */ (value)); |
{ | ||
"name": "buffered-async-iterable", | ||
"version": "0.3.0", | ||
"version": "1.0.0", | ||
"description": "Creates a prefetched async iterable", | ||
@@ -50,3 +50,3 @@ "homepage": "http://github.com/voxpelli/buffered-async-iterable", | ||
"devDependencies": { | ||
"@types/chai": "^4.3.12", | ||
"@types/chai": "^4.3.16", | ||
"@types/chai-as-promised": "^7.1.8", | ||
@@ -65,21 +65,21 @@ "@types/chai-quantifiers": "^1.0.4", | ||
"eslint": "^8.57.0", | ||
"eslint-plugin-es-x": "^7.5.0", | ||
"eslint-plugin-es-x": "^7.6.0", | ||
"eslint-plugin-import": "^2.29.1", | ||
"eslint-plugin-jsdoc": "^46.10.1", | ||
"eslint-plugin-mocha": "^10.4.1", | ||
"eslint-plugin-mocha": "^10.4.3", | ||
"eslint-plugin-n": "^16.6.2", | ||
"eslint-plugin-promise": "^6.1.1", | ||
"eslint-plugin-security": "^1.7.1", | ||
"eslint-plugin-sort-destructure-keys": "^1.5.0", | ||
"eslint-plugin-sort-destructure-keys": "^1.6.0", | ||
"eslint-plugin-unicorn": "^48.0.1", | ||
"husky": "^9.0.11", | ||
"installed-check": "^8.0.1", | ||
"knip": "^5.1.1", | ||
"mocha": "^10.3.0", | ||
"knip": "^5.15.0", | ||
"mocha": "^10.4.0", | ||
"npm-run-all2": "^6.1.2", | ||
"sinon": "^17.0.1", | ||
"sinon-chai": "^3.7.0", | ||
"type-coverage": "^2.27.1", | ||
"typescript": "~5.4.2" | ||
"type-coverage": "^2.28.2", | ||
"typescript": "~5.4.5" | ||
} | ||
} |
@@ -10,3 +10,3 @@ <div align="center"> | ||
Buffered processing of async iterables / generators in parallel to achieve comparable performance to `Promise.all()` | ||
Buffered parallel processing of async iterables / generators. | ||
@@ -20,3 +20,2 @@ [![npm version](https://img.shields.io/npm/v/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable) | ||
**WORK IN PROGRESS – early prerelease** | ||
@@ -75,6 +74,12 @@ ## Usage | ||
### `bufferedAsyncMap(input, callback[, { bufferSize=6 }]) => AsyncIterableIterator` | ||
### bufferedAsyncMap() | ||
Iterates and applies the `callback` to up to `bufferSize` items from `input` yielding values as they resolve. | ||
#### Syntax | ||
`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator` | ||
#### Arguments | ||
* `input` – either an async iterable, an ordinare iterable or an array | ||
@@ -86,3 +91,20 @@ * `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer) | ||
* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer. | ||
* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered | ||
### mergeIterables() | ||
Merges all given (async) iterables in parallel, returning the values as they resolve | ||
#### Syntax | ||
`mergeIterables(input[, { bufferSize=6 }]) => AsyncIterableIterator` | ||
#### Arguments | ||
* `input` – an array of async iterables, ordinare iterables and/or arrays | ||
#### Options | ||
* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer. | ||
## Similar modules | ||
@@ -89,0 +111,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
21735
345
0
114