Socket
Socket
Sign inDemoInstall

buffered-async-iterable

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

buffered-async-iterable - npm Package Compare versions

Comparing version 0.1.0 to 0.2.0

5

index.d.ts

@@ -1,4 +0,3 @@

export function map<T, R>(input: AsyncIterable<T> | Iterable<T> | T[], callback: (item: T) => Promise<R> | AsyncIterable<R>, options?: {
queueSize?: number | undefined;
escapeToEventLoopEvery?: number | undefined;
export function bufferedAsyncMap<T, R>(input: AsyncIterable<T> | Iterable<T> | T[], callback: (item: T) => Promise<R> | AsyncIterable<R>, options?: {
bufferSize?: number | undefined;
} | undefined): AsyncIterableIterator<R> & {

@@ -5,0 +4,0 @@ return: (value?: any) => Promise<IteratorResult<R, any>>;

189

index.js

@@ -0,8 +1,11 @@

/* eslint-disable promise/prefer-await-to-then */
// TODO: Get inspired by Matteos https://github.com/mcollina/hwp/blob/main/index.js, eg AbortController is nice?
// FIXME: Check this https://twitter.com/matteocollina/status/1392056117128306691
// FIXME: Read up on https://tc39.es/ecma262/#table-async-iterator-optional and add return() and throw(). return() is called by a "for await" when eg. a "break" or a "throw" happens within it
// TODO: Check docs here https://tc39.es/ecma262/#sec-operations-on-iterator-objects
// TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose
// TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values
// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js
// TODO: Have option to persist order? To not use Promise.race()?
// TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385
import { EventLoopBreather } from './lib/event-loop-breather.js';
import { findLeastTargeted } from './lib/find-least-targeted.js';

@@ -17,11 +20,9 @@ import { makeIterableAsync } from './lib/misc.js';

* @param {(item: T) => (Promise<R>|AsyncIterable<R>)} callback
* @param {{ queueSize?: number|undefined, escapeToEventLoopEvery?: number|undefined }} [options]
* @param {{ bufferSize?: number|undefined }} [options]
* @returns {AsyncIterableIterator<R> & { return: NonNullable<AsyncIterableIterator<R>["return"]>, throw: NonNullable<AsyncIterableIterator<R>["throw"]> }}
*/
export function map (input, callback, options) {
/** @typedef {Promise<IteratorResult<R|AsyncIterable<R>> & { queuePromise: QueuePromise, fromSubIterator?: boolean, isSubIterator?: boolean }>} QueuePromise */
export function bufferedAsyncMap (input, callback, options) {
/** @typedef {Promise<IteratorResult<R|AsyncIterable<R>> & { bufferPromise: BufferPromise, fromSubIterator?: boolean, isSubIterator?: boolean, err?: unknown }>} BufferPromise */
const {
escapeToEventLoopEvery,
queueSize = 6,
bufferSize = 6,
} = options || {};

@@ -37,3 +38,3 @@

if (typeof callback !== 'function') throw new TypeError('Expected callback to be a function');
if (typeof queueSize !== 'number') throw new TypeError('Expected queueSize to be a number');
if (typeof bufferSize !== 'number') throw new TypeError('Expected bufferSize to be a number');

@@ -46,10 +47,8 @@ /** @type {AsyncIterator<T, unknown>} */

/** @type {Set<QueuePromise>} */
const queuedPromises = new Set();
/** @type {Set<BufferPromise>} */
const bufferedPromises = new Set();
/** @type {WeakMap<QueuePromise, AsyncIterator<T>|AsyncIterator<R>>} */
/** @type {WeakMap<BufferPromise, AsyncIterator<T>|AsyncIterator<R>>} */
const promisesToSourceIteratorMap = new WeakMap();
const breather = new EventLoopBreather(escapeToEventLoopEvery);
/** @type {boolean} */

@@ -59,14 +58,34 @@ let mainReturnedDone;

/** @type {boolean} */
let done;
let isDone;
/** @returns {Promise<IteratorReturnResult<undefined>>} */
const markAsEnded = async () => {
if (!done) {
done = true;
queuedPromises.clear();
/** @type {Error|undefined} */
let hasError;
if (asyncIterator.return) {
await asyncIterator.return();
/**
* @param {boolean} [throwAnyError]
* @returns {Promise<IteratorReturnResult<undefined>>}
*/
const markAsEnded = async (throwAnyError) => {
if (!isDone) {
isDone = true;
// TODO: Errors from here, how to handle? allSettled() ensures they will be caught at least
await Promise.allSettled(
[
// Ensure the main iterators are completed
...(mainReturnedDone ? [] : [asyncIterator]),
...subIterators,
]
.map(item => item.return && item.return())
);
// TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10
bufferedPromises.clear();
subIterators.clear();
if (throwAnyError && hasError) {
throw hasError;
}
}
return { done: true, value: undefined };

@@ -76,3 +95,3 @@ };

const fillQueue = () => {
if (done) return;
if (hasError || isDone) return;

@@ -82,3 +101,3 @@ // Check which iterator that has the least amount of queued promises right now

mainReturnedDone ? subIterators : [...subIterators, asyncIterator],
queuedPromises,
bufferedPromises,
promisesToSourceIteratorMap

@@ -89,17 +108,25 @@ );

// FIXME: Handle rejected promises from upstream! And properly mark this iterator as completed
/** @type {QueuePromise} */
const queuePromise = currentSubIterator
? breather.breathe(currentSubIterator.next())
// eslint-disable-next-line promise/prefer-await-to-then
/** @type {BufferPromise} */
const bufferPromise = currentSubIterator
? Promise.resolve(currentSubIterator.next())
.catch(err => ({
err: err instanceof Error ? err : new Error('Unknown subiterator error'),
}))
.then(async result => {
if (result.done) {
if (typeof result !== 'object') {
throw new TypeError('Expected an object value');
}
if ('err' in result || result.done) {
subIterators.delete(currentSubIterator);
}
/** @type {Awaited<QueuePromise>} */
/** @type {Awaited<BufferPromise>} */
const promiseValue = {
queuePromise,
bufferPromise,
fromSubIterator: true,
...result,
...(
'err' in result
? { done: true, value: undefined, ...result }
: result
),
};

@@ -109,8 +136,20 @@

})
: breather.breathe(asyncIterator.next())
// eslint-disable-next-line promise/prefer-await-to-then
: Promise.resolve(asyncIterator.next())
.catch(err => ({
err: err instanceof Error ? err : new Error('Unknown iterator error'),
}))
.then(async result => {
if (result.done) {
if (typeof result !== 'object') {
throw new TypeError('Expected an object value');
}
if ('err' in result || result.done) {
mainReturnedDone = true;
return { queuePromise, ...result };
return {
bufferPromise,
...(
'err' in result
? { done: true, value: undefined, ...result }
: result
),
};
}

@@ -120,17 +159,31 @@

const callbackResult = callback(result.value);
const isSubIterator = isAsyncIterable(callbackResult);
/** @type {Awaited<QueuePromise>} */
const promiseValue = {
queuePromise,
isSubIterator: isAsyncIterable(callbackResult),
value: await callbackResult,
};
/** @type {Awaited<BufferPromise>} */
let promiseValue;
try {
const value = await callbackResult;
promiseValue = {
bufferPromise,
isSubIterator,
value,
};
} catch (err) {
promiseValue = {
bufferPromise,
done: true,
err: err instanceof Error ? err : new Error('Unknown callback error'),
value: undefined,
};
}
return promiseValue;
});
promisesToSourceIteratorMap.set(queuePromise, currentSubIterator || asyncIterator);
queuedPromises.add(queuePromise);
promisesToSourceIteratorMap.set(bufferPromise, currentSubIterator || asyncIterator);
bufferedPromises.add(bufferPromise);
if (queuedPromises.size < queueSize) {
if (bufferedPromises.size < bufferSize) {
fillQueue();

@@ -142,20 +195,25 @@ }

const nextValue = async () => {
if (queuedPromises.size === 0) return markAsEnded();
if (done) return { done: true, value: undefined };
if (bufferedPromises.size === 0) return markAsEnded(true);
if (isDone) return { done: true, value: undefined };
// FIXME: Handle rejected promises! We need to remove it from bufferedPromises
// Wait for some of the current promises to be finished
const {
bufferPromise,
done,
err,
fromSubIterator,
isSubIterator,
queuePromise,
...result
} = await Promise.race(queuedPromises);
value,
} = await Promise.race(bufferedPromises);
queuedPromises.delete(queuePromise);
bufferedPromises.delete(bufferPromise);
// We are mandated by the spec to always do this return if the iterator is done
if (done) {
if (isDone) {
return { done: true, value: undefined };
} else if (result.done) {
} else if (err || done) {
if (err && !hasError) {
hasError = err instanceof Error ? err : new Error('Unknown error');
}
if (fromSubIterator || subIterators.size !== 0) {

@@ -165,8 +223,8 @@ fillQueue();

return queuedPromises.size === 0
? markAsEnded()
return bufferedPromises.size === 0
? markAsEnded(true)
: nextValue();
} else if (isSubIterator && isAsyncIterable(result.value)) {
// FIXME: Handle possible error here?
subIterators.add(result.value[Symbol.asyncIterator]());
} else if (isSubIterator && isAsyncIterable(value)) {
// TODO: Handle possible error here? Or too obscure?
subIterators.add(value[Symbol.asyncIterator]());
fillQueue();

@@ -177,5 +235,3 @@ return nextValue();

// TODO: Fix the types
// @ts-ignore
return { value: result.value };
return /** @type {{ value: R }} */ ({ value });
}

@@ -190,3 +246,2 @@ };

async next () {
// eslint-disable-next-line promise/prefer-await-to-then
currentStep = currentStep ? currentStep.then(() => nextValue()) : nextValue();

@@ -199,4 +254,4 @@ return currentStep;

'throw': async (err) => {
// FIXME: Should remember the throw? And return a rejected promise always?
markAsEnded();
// TODO: Should remember the throw? And return a rejected promise always?
await markAsEnded();
throw err;

@@ -203,0 +258,0 @@ },

{
"name": "buffered-async-iterable",
"version": "0.1.0",
"version": "0.2.0",
"description": "Creates a prefetched async iterable",

@@ -28,5 +28,2 @@ "homepage": "http://github.com/voxpelli/buffered-async-iterable",

"scripts": {
"benchmark:event-loop-lag": "node benchmark/event-loop-lag.js",
"benchmark:memory": "node --expose-gc benchmark/memory.js",
"benchmark": "run-s benchmark:*",
"build:0": "run-s clean",

@@ -39,14 +36,6 @@ "build:1-declaration": "tsc -p declaration.tsconfig.json",

"check:tsc": "tsc",
"check:type-coverage": "type-coverage --detail --strict --at-least 95 --ignore-files 'benchmark/*' --ignore-files 'benchmark/*/*'",
"check:type-coverage": "type-coverage --detail --strict --at-least 95 --ignore-files 'test/*.spec.js'",
"check": "run-s clean && run-p check:*",
"clean:declarations": "rm -rf $(find . -maxdepth 2 -type f -name '*.d.ts*')",
"clean": "run-p clean:*",
"flame:real:event-loop-immediate": "0x -o -- node benchmark/event-loop-lag-real-immediate.js",
"flame:real:event-loop-static": "0x -o -- node benchmark/event-loop-lag-real-static.js",
"flame:real": "run-s flame:real:*",
"flame:static:event-loop-immediate-all": "0x -o -- node benchmark/event-loop-lag-immediate-all.js",
"flame:static:event-loop-immediate-tenth": "0x -o -- node benchmark/event-loop-lag-immediate-tenth.js",
"flame:static:event-loop-static": "0x -o -- node benchmark/event-loop-lag-static.js",
"flame:static": "run-s flame:static:*",
"flame": "run-s flame:*",
"prepublishOnly": "run-s build",

@@ -59,4 +48,2 @@ "test:mocha": "c8 --reporter=lcov --reporter text mocha 'test/**/*.spec.js'",

"devDependencies": {
"@airbnb/node-memwatch": "^2.0.0",
"@types/airbnb__node-memwatch": "^2.0.0",
"@types/chai": "^4.3.5",

@@ -71,3 +58,2 @@ "@types/chai-as-promised": "^7.1.5",

"@voxpelli/tsconfig": "^7.0.0",
"0x": "^5.5.0",
"c8": "^7.13.0",

@@ -74,0 +60,0 @@ "chai": "^4.3.7",

@@ -1,2 +0,2 @@

# Async Iterable Prefetch
# buffered-async-iterable

@@ -17,11 +17,67 @@ [![npm version](https://img.shields.io/npm/v/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable)

```javascript
import { map } from 'buffered-async-iterable';
import { bufferedAsyncMap } from 'buffered-async-iterable';
const mappedData = map(asyncIterable, async (item) => {
async function * asyncGenerator() {
yield ...
}
const mappedIterator = bufferedAsyncMap(asyncGenerator(), async (item) => {
// Apply additional async lookup / processing
});
for await (const item of mappedData) {
for await (const item of mappedIterator) {
// Consume the buffered async iterable
}
```
### Array input
```javascript
import { bufferedAsyncMap } from 'buffered-async-iterable';
const mappedIterator = bufferedAsyncMap(['foo'], async (item) => {
// Apply additional async lookup / processing
});
for await (const item of mappedIterator) {
// Consume the buffered async iterable
}
```
### Async generator result
```javascript
import { bufferedAsyncMap } from 'buffered-async-iterable';
const mappedIterator = bufferedAsyncMap(['foo'], async function * (item) => {
// Apply additional async lookup / processing
yield ...
yield * ...
});
for await (const item of mappedIterator) {
// Consume the buffered async iterable
}
```
## API
### `bufferedAsyncMap(input, callback[, { bufferSize=6 }]) => AsyncIterableIterator`
Iterates and applies the `callback` to up to `bufferSize` items from `input` yielding values as they resolve.
* `input` – either an async iterable, an ordinare iterable or an array
* `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer)
#### Options
* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer.
## Similar modules
* [`hwp`](https://github.com/mcollina/hwp) – similar module by @mcollina
<!-- ## See also
* [Announcement blog post](#)
* [Announcement tweet](#) -->

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc