combine-async-iterators
Advanced tools
Comparing version 1.1.1 to 1.1.2
54
index.js
"use strict"; | ||
/** | ||
* @typedef {object} AsyncIterableObject | ||
* @property {number} index | ||
* @property {IteratorResult<any>} value | ||
*/ | ||
/** | ||
* @function getNextAsyncIteratorValue | ||
* @param {AsyncIterableIterator<any>} iterator | ||
* @param {!number} index | ||
* @returns {Promise<AsyncIterableObject>} | ||
*/ | ||
function getNextAsyncIteratorValue(iterator, index) { | ||
return iterator.next().then((value) => { | ||
return { index, value }; | ||
function getNextAsyncIteratorValue(asyncIterator, index) { | ||
return asyncIterator.next().then((iterator) => { | ||
return { index, iterator }; | ||
}); | ||
} | ||
/** | ||
* @async | ||
* @generator | ||
* @function combineAsyncIterators | ||
* @param {...AsyncIterableIterator<any>} iterators | ||
*/ | ||
async function* combineAsyncIterators(...iterators) { | ||
// Return if iterators is empty (avoid infinite loop). | ||
if (iterators.length === 0) { | ||
return; | ||
} | ||
const promiseThatNeverResolve = new Promise(() => null); | ||
try { | ||
if (iterators.length === 0) { | ||
return; | ||
} | ||
const promises = iterators.map(getNextAsyncIteratorValue); | ||
let open = iterators.length; | ||
const asyncIteratorsValues = iterators.map(getNextAsyncIteratorValue); | ||
let numberOfIteratorsAlive = iterators.length; | ||
do { | ||
const { value, index } = await Promise.race(promises); | ||
if (value.done) { | ||
open--; | ||
promises[index] = new Promise(() => null); | ||
const { iterator, index } = await Promise.race(asyncIteratorsValues); | ||
if (iterator.done) { | ||
numberOfIteratorsAlive--; | ||
// We dont want Promise.race to resolve again on this index | ||
// so we replace it with a Promise that will never resolve. | ||
asyncIteratorsValues[index] = promiseThatNeverResolve; | ||
} | ||
else { | ||
yield value.value; | ||
promises[index] = getNextAsyncIteratorValue(iterators[index], index); | ||
yield iterator.value; | ||
asyncIteratorsValues[index] = getNextAsyncIteratorValue(iterators[index], index); | ||
} | ||
} while (open > 0); | ||
} while (numberOfIteratorsAlive > 0); | ||
} | ||
catch (err) { | ||
// TODO: replace .all with .allSettled | ||
await Promise.all(iterators.map((it) => it.return())); | ||
@@ -49,0 +37,0 @@ |
{ | ||
"name": "combine-async-iterators", | ||
"version": "1.1.1", | ||
"version": "1.1.2", | ||
"description": "Combine Multiple Asynchronous Iterators in one (not a sequence)", | ||
@@ -10,3 +10,3 @@ "main": "index.js", | ||
"engines": { | ||
"node": ">=11" | ||
"node": ">=11" | ||
}, | ||
@@ -21,9 +21,13 @@ "repository": { | ||
"keywords": [ | ||
"async", | ||
"asynchronous", | ||
"iterator", | ||
"iterable", | ||
"combine", | ||
"merge", | ||
"sequence" | ||
"async", | ||
"asynchronous", | ||
"iterator", | ||
"iterators", | ||
"iteration", | ||
"iterable", | ||
"combine", | ||
"merge", | ||
"build", | ||
"construct", | ||
"sequence" | ||
], | ||
@@ -37,7 +41,7 @@ "author": "GENTILHOMME Thomas <gentilhomme.thomas@gmail.com>", | ||
"devDependencies": { | ||
"@slimio/eslint-config": "^3.0.2", | ||
"@slimio/eslint-config": "^3.0.3", | ||
"@slimio/is": "^1.5.1", | ||
"@types/node": "^12.6.8", | ||
"japa": "^2.0.10" | ||
"japa": "^3.0.1" | ||
} | ||
} |
# Combine-async-iterators | ||
![version](https://img.shields.io/badge/dynamic/json.svg?url=https://raw.githubusercontent.com/fraxken/combine-async-iterators/master/package.json&query=$.version&label=Version) | ||
![MIT](https://img.shields.io/github/license/mashape/apistatus.svg) | ||
![dep](https://img.shields.io/david/fraxken/combine-async-iterators) | ||
![size](https://img.shields.io/bundlephobia/min/combine-async-iterators) | ||
[![Known Vulnerabilities](https://snyk.io//test/github/fraxken/combine-async-iterators/badge.svg?targetFile=package.json)](https://snyk.io//test/github/fraxken/combine-async-iterators?targetFile=package.json) | ||
Combine Multiple Asynchronous Iterators in one (not a sequence). It use **Promise.race** under the hood (the code idea is from [Targos](http://github.com/targos)). | ||
@@ -19,8 +25,10 @@ | ||
```js | ||
const { promisify } = require("util"); | ||
const combineAsyncIterators = require("combine-async-iterators"); | ||
const sleep = promisify(setTimeout); | ||
async function* getValues(id) { | ||
for (let count = 0; count < 5; count++) { | ||
const ms = Math.ceil(Math.random() * 1000); | ||
await new Promise((resolve) => setTimeout(resolve, ms)); | ||
await sleep(Math.ceil(Math.random() * 1000)); | ||
yield `${id}_${count}`; | ||
@@ -27,0 +35,0 @@ } |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
5517
55
38