Socket
Socket
Sign inDemoInstall

buffered-async-iterable

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

buffered-async-iterable - npm Package Compare versions

Comparing version 0.3.0 to 1.0.0

4

index.d.ts

@@ -0,3 +1,7 @@

export function mergeIterables<T>(input: (AsyncIterable<T> | Iterable<T> | T[])[], { bufferSize }?: {
bufferSize?: number | undefined;
} | undefined): AsyncIterable<T>;
export function bufferedAsyncMap<T, R>(input: AsyncIterable<T> | Iterable<T> | T[], callback: (item: T) => Promise<R> | AsyncIterable<R>, options?: {
bufferSize?: number | undefined;
ordered?: boolean | undefined;
} | undefined): AsyncIterableIterator<R> & {

@@ -4,0 +8,0 @@ return: (value?: any) => Promise<IteratorResult<R, any>>;

96

index.js

@@ -8,15 +8,32 @@ /* eslint-disable promise/prefer-await-to-then */

// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js
// TODO: Have option to persist order? To not use Promise.race()?
// TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385
import { findLeastTargeted } from './lib/find-least-targeted.js';
import { makeIterableAsync } from './lib/misc.js';
import { isAsyncIterable, isIterable, isPartOfSet } from './lib/type-checks.js';
import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js';
import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js';
/**
* @template T
* @param {AsyncIterable<T> | Iterable<T> | T[]} item
* @returns {AsyncIterable<T>}
*/
async function * yieldIterable (item) {
yield * item;
}
/**
* @template T
* @param {Array<AsyncIterable<T> | Iterable<T> | T[]>} input
* @param {{ bufferSize?: number|undefined }} [options]
* @returns {AsyncIterable<T>}
*/
export async function * mergeIterables (input, { bufferSize } = {}) {
yield * bufferedAsyncMap(input, yieldIterable, { bufferSize });
}
/**
* @template T
* @template R
* @param {AsyncIterable<T> | Iterable<T> | T[]} input
* @param {(item: T) => (Promise<R>|AsyncIterable<R>)} callback
* @param {{ bufferSize?: number|undefined }} [options]
* @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options]
* @returns {AsyncIterableIterator<R> & { return: NonNullable<AsyncIterableIterator<R>["return"]>, throw: NonNullable<AsyncIterableIterator<R>["throw"]> }}

@@ -28,2 +45,3 @@ */

bufferSize = 6,
ordered = false,
} = options || {};

@@ -44,7 +62,7 @@

/** @type {Set<AsyncIterator<R, unknown>>} */
const subIterators = new Set();
/** @type {AsyncIterator<R, unknown>[]} */
const subIterators = [];
/** @type {Set<BufferPromise>} */
const bufferedPromises = new Set();
/** @type {BufferPromise[]} */
const bufferedPromises = [];

@@ -82,4 +100,4 @@ /** @type {WeakMap<BufferPromise, AsyncIterator<T>|AsyncIterator<R>>} */

// TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10
bufferedPromises.clear();
subIterators.clear();
bufferedPromises.splice(0, bufferedPromises.length);
subIterators.splice(0, subIterators.length);

@@ -97,11 +115,17 @@ if (throwAnyError && hasError) {

// Check which iterator that has the least amount of queued promises right now
const iterator = findLeastTargeted(
mainReturnedDone ? subIterators : [...subIterators, asyncIterator],
bufferedPromises,
promisesToSourceIteratorMap
);
/** @type {AsyncIterator<R, unknown>|undefined} */
let currentSubIterator;
const currentSubIterator = isPartOfSet(iterator, subIterators) ? iterator : undefined;
if (ordered) {
currentSubIterator = subIterators[0];
} else {
const iterator = findLeastTargeted(
mainReturnedDone ? subIterators : [...subIterators, asyncIterator],
bufferedPromises,
promisesToSourceIteratorMap
);
currentSubIterator = isPartOfArray(iterator, subIterators) ? iterator : undefined;
}
/** @type {BufferPromise} */

@@ -118,3 +142,3 @@ const bufferPromise = currentSubIterator

if ('err' in result || result.done) {
subIterators.delete(currentSubIterator);
arrayDeleteInPlace(subIterators, currentSubIterator);
}

@@ -183,5 +207,16 @@

promisesToSourceIteratorMap.set(bufferPromise, currentSubIterator || asyncIterator);
bufferedPromises.add(bufferPromise);
if (bufferedPromises.size < bufferSize) {
if (ordered && currentSubIterator) {
let i = 0;
while (promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) {
i += 1;
}
bufferedPromises.splice(i, 0, bufferPromise);
} else {
bufferedPromises.push(bufferPromise);
}
if (bufferedPromises.length < bufferSize) {
fillQueue();

@@ -193,8 +228,13 @@ }

const nextValue = async () => {
if (bufferedPromises.size === 0) return markAsEnded(true);
const nextBufferedPromise = bufferedPromises[0];
if (!nextBufferedPromise) return markAsEnded(true);
if (isDone) return { done: true, value: undefined };
/** @type {Awaited<BufferPromise>} */
const resolvedPromise = await (ordered ? nextBufferedPromise : Promise.race(bufferedPromises));
arrayDeleteInPlace(bufferedPromises, resolvedPromise.bufferPromise);
// Wait for some of the current promises to be finished
const {
bufferPromise,
done,

@@ -205,6 +245,4 @@ err,

value,
} = await Promise.race(bufferedPromises);
} = resolvedPromise;
bufferedPromises.delete(bufferPromise);
// We are mandated by the spec to always do this return if the iterator is done

@@ -218,7 +256,7 @@ if (isDone) {

if (fromSubIterator || subIterators.size !== 0) {
if (fromSubIterator || subIterators.length > 0) {
fillQueue();
}
return bufferedPromises.size === 0
return bufferedPromises.length === 0
? markAsEnded(true)

@@ -228,3 +266,3 @@ : nextValue();

// TODO: Handle possible error here? Or too obscure?
subIterators.add(value[Symbol.asyncIterator]());
subIterators.unshift(value[Symbol.asyncIterator]());
fillQueue();

@@ -231,0 +269,0 @@ return nextValue();

@@ -1,2 +0,2 @@

export function findLeastTargeted<Target, Item extends object>(targets: Iterable<Target> | Target[], items: Set<Item>, itemTargets: WeakMap<Item, Target>): Target | undefined;
export function findLeastTargeted<Target, Item extends object>(targets: Iterable<Target> | Target[], items: Iterable<Item> | Item[], itemTargets: WeakMap<Item, Target>): Target | undefined;
//# sourceMappingURL=find-least-targeted.d.ts.map
/**
* @template Target
* @template {object} Item
* @param {Set<Item>} items
* @param {Iterable<Item> | Item[]} items
* @param {WeakMap<Item, Target>} itemTargets

@@ -28,3 +28,3 @@ * @returns {Map<Target, number>}

* @param {Iterable<Target> | Target[]} targets
* @param {Set<Item>} items
* @param {Iterable<Item> | Item[]} items
* @param {WeakMap<Item, Target>} itemTargets

@@ -31,0 +31,0 @@ * @returns {Target|undefined}

export function makeIterableAsync<T>(input: Iterable<T> | T[]): AsyncIterable<T>;
export function arrayDeleteInPlace<T>(list: T[], value: T): void;
//# sourceMappingURL=misc.d.ts.map

@@ -11,1 +11,15 @@ /**

}
/**
* Similar to the .delete() on a set
*
* @template T
* @param {T[]} list
* @param {T} value
*/
export function arrayDeleteInPlace (list, value) {
const index = list.indexOf(value);
if (index !== -1) {
list.splice(index, 1);
}
}
export function isIterable(value: unknown): value is Iterable<unknown>;
export function isAsyncIterable(value: unknown): value is AsyncIterable<unknown>;
export function isPartOfSet<SetValue>(value: unknown, set: Set<SetValue>): value is SetValue;
export function isPartOfArray<Values>(value: unknown, list: Values[]): value is Values;
//# sourceMappingURL=type-checks.d.ts.map

@@ -14,7 +14,7 @@ /**

/**
* @template SetValue
* @template Values
* @param {unknown} value
* @param {Set<SetValue>} set
* @returns {value is SetValue}
* @param {Values[]} list
* @returns {value is Values}
*/
export const isPartOfSet = (value, set) => set.has(/** @type {SetValue} */ (value));
export const isPartOfArray = (value, list) => list.includes(/** @type {Values} */ (value));
{
"name": "buffered-async-iterable",
"version": "0.3.0",
"version": "1.0.0",
"description": "Creates a prefetched async iterable",

@@ -50,3 +50,3 @@ "homepage": "http://github.com/voxpelli/buffered-async-iterable",

"devDependencies": {
"@types/chai": "^4.3.12",
"@types/chai": "^4.3.16",
"@types/chai-as-promised": "^7.1.8",

@@ -65,21 +65,21 @@ "@types/chai-quantifiers": "^1.0.4",

"eslint": "^8.57.0",
"eslint-plugin-es-x": "^7.5.0",
"eslint-plugin-es-x": "^7.6.0",
"eslint-plugin-import": "^2.29.1",
"eslint-plugin-jsdoc": "^46.10.1",
"eslint-plugin-mocha": "^10.4.1",
"eslint-plugin-mocha": "^10.4.3",
"eslint-plugin-n": "^16.6.2",
"eslint-plugin-promise": "^6.1.1",
"eslint-plugin-security": "^1.7.1",
"eslint-plugin-sort-destructure-keys": "^1.5.0",
"eslint-plugin-sort-destructure-keys": "^1.6.0",
"eslint-plugin-unicorn": "^48.0.1",
"husky": "^9.0.11",
"installed-check": "^8.0.1",
"knip": "^5.1.1",
"mocha": "^10.3.0",
"knip": "^5.15.0",
"mocha": "^10.4.0",
"npm-run-all2": "^6.1.2",
"sinon": "^17.0.1",
"sinon-chai": "^3.7.0",
"type-coverage": "^2.27.1",
"typescript": "~5.4.2"
"type-coverage": "^2.28.2",
"typescript": "~5.4.5"
}
}

@@ -10,3 +10,3 @@ <div align="center">

Buffered processing of async iterables / generators in parallel to achieve comparable performance to `Promise.all()`
Buffered parallel processing of async iterables / generators.

@@ -20,3 +20,2 @@ [![npm version](https://img.shields.io/npm/v/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable)

**WORK IN PROGRESS – early prerelease**

@@ -75,6 +74,12 @@ ## Usage

### `bufferedAsyncMap(input, callback[, { bufferSize=6 }]) => AsyncIterableIterator`
### bufferedAsyncMap()
Iterates and applies the `callback` to up to `bufferSize` items from `input` yielding values as they resolve.
#### Syntax
`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator`
#### Arguments
* `input` – either an async iterable, an ordinare iterable or an array

@@ -86,3 +91,20 @@ * `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer)

* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer.
* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered
### mergeIterables()
Merges all given (async) iterables in parallel, returning the values as they resolve
#### Syntax
`mergeIterables(input[, { bufferSize=6 }]) => AsyncIterableIterator`
#### Arguments
* `input` – an array of async iterables, ordinare iterables and/or arrays
#### Options
* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer.
## Similar modules

@@ -89,0 +111,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc