Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

merge-async-iterators

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

merge-async-iterators - npm Package Compare versions

Comparing version 0.1.2 to 0.2.0

114

index.js

@@ -1,33 +0,93 @@

module.exports = async function* merge(iterators, opts = {}) {
module.exports = (iterators, opts = {}) => {
const org = iterators.reduce((org, iterator, i, iterators) => {
if (!iterator.next) {
org.set(iterators[i] = iterator[Symbol.iterator](), iterator);
}
}, new Map);
let done = false;
let interrupt, interruptPromise = new Promise((resolve, reject) => {
interrupt = error => error ? reject(error) : resolve();
});
const throwAll = error => iterators.forEach(i => i.throw && i.throw(error));
const returnAll = () => iterators.forEach(i => i.return && i.return());
const merged = {};
const queue = new Map(iterators.map(i => [i]));
const getOutstanding = () => Array.from(queue.keys()).map(iterator => {
const existing = queue.get(iterator);
if (existing) {
return existing;
} else {
const data = iterator.next();
const promise = data.then ? data.then(data => ({ iterator, data })) : Promise.resolve({ iterator, data });
queue.set(iterator, promise);
return promise;
const race = () => {
const q = Array.from(queue).map(([, pending]) => pending).filter(Boolean);
if (!q.length) return;
return Promise.race([interruptPromise, ...q]);
};
const updateQueue = (input) => queue.forEach((i, iterator) => {
if (queue.get(iterator)) return;
let promise = iterator.next(input);
if (!promise.then) {
promise = Promise.resolve(promise);
}
}).filter(Boolean);
promise = promise.then(data => ({ ...data, iterator }));
queue.set(iterator, promise);
});
try {
while (true) {
const outstanding = getOutstanding();
if (!outstanding.length) break;
const { iterator, data } = await Promise.race(outstanding);
queue.set(iterator, null);
if (data.done) {
queue.delete(iterator);
const getValue = ({ iterator, value } = {}) => opts.yieldIterator ? {
value: {
iterator: org.get(iterator) || iterator,
value
},
done
} : { value, done };
merged.next = async (input) => {
if (done) return getValue();
updateQueue(input);
try {
const result = await race();
if (result) {
if (result.done) {
queue.delete(result.iterator);
} else {
queue.set(result.iterator, null);
}
if (!queue.size) {
done = true;
}
return getValue(result);
} else {
done = true;
returnAll();
return getValue();
}
const value = opts.yieldIterator ? { iterator, data } : data.value;
yield value
} catch (error) {
done = true;
throwAll(error);
throw error;
}
} catch (error) {
iterators.forEach(i => i.throw(error));
throw error;
} finally {
iterators.forEach(i => i.return());
}
};
merged.throw = error => {
/* TODO: don't set done=true unconditionally here, check to see how other iterators handle the throw */
if (done) return getValue();
done = true;
throwAll(error);
interrupt(error);
return getValue();
};
merged.return = (value) => {
if (done) return getValue();
done = true;
returnAll();
interrupt();
return getValue({ value });
};
merged[Symbol.asyncIterator] = () => merged;
return merged;
};
{
"name": "merge-async-iterators",
"version": "0.1.2",
"version": "0.2.0",
"description": "Merge multiple async iterators",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -25,8 +25,6 @@ # merge-async-iterators

* **`yieldIterator[=false]`** Yields `{iterator, data: {value, done}}` (instead of just `value`)
* **`yieldIterator[=false]`** Yields `{iterator, value}` (instead of `value`)
Useful if you wanna know which iterator yielded the value
* **Returns** A single merged async iterable

@@ -39,14 +37,13 @@

async function* a () {
yield 'a1'
yield 'a2'
return 'a3'
}
async function* b () {
yield 'b1'
yield 'b2'
return 'b3'
}
const array = [1,2];
const iterable = (function*(){
yield 3
yield 4
})()
const asyncIterable = (async function*(){
yield 5
yield 6
})()
for await (const value of merge([a(), b()])) {
for await (const value of merge([array, iterable, asyncIterable])) {
console.log(value)

@@ -56,12 +53,12 @@ }

```
a1
b1
a2
b2
a3
b3
1
2 // order isn't guaranteed
undefined // finished iterators' returns will yield as well
3
4
undefined
5 // async wil almost always come after normal ones
6
```
## Alternatives

@@ -68,0 +65,0 @@

@@ -1,30 +0,45 @@

const merge = require('.')
// const merge = require('mergeiterator')
const merge = require('.');
// const merge = require('mergeiterator');
const stream = require('streams-to-async-iterator');
const delay = (v, t =
Math.random()
* 100
) =>
// v
new Promise(_ => setTimeout(() => _(v), t));
const delay = (t = Math.random() * 100) => new Promise(_ => setTimeout(() => _(t), t));
main().then(() => console.log('ok')).catch(e => {
// console.log('end');
process.exitCode = 1
console.error(e);
});
let array = () => array = ['array', 'array', 'array'];
let basic = () => basic = (function* basic() {
yield 'basic';
yield 'basic';
yield 'basic';
return 'return'
})();
let asyncBasic10 = () => asyncBasic10 = (async function*() {
yield delay(10);
yield delay(10);
yield delay(10);
return 'return'
})();
let asyncBasic1000 = () => asyncBasic1000 = (async function*() {
yield delay(1000);
yield delay(1000);
yield delay(1000);
})();
let stdin = () => stdin = stream(process.stdin.setEncoding('utf8'));
async function main() {
const aI = a();
const bI = b();
const merged = merge([
aI,
bI,
]);
array(),
basic(),
asyncBasic10(),
asyncBasic1000(),
// stdin(),
], {
// yieldIterator: true
});
let counter = 0
for await (const val of merged) {
console.log(val);
// let counter = 0
for await (const value of merged) {
console.log(value);
// if (counter++ > 1)

@@ -34,23 +49,21 @@ // break

// console.log(await merged.next());
// console.log(await merged.next());
// console.log(await merged.next());
// console.log(await merged.next());
// console.log(await merged.next());
// console.log(await merged.next());
// console.log(await merged.next());
// console.log(await merged.next());
// console.log(await merged.next());
// console.log(await merged.next());
// // console.log('1');
// console.log(await merged.next());
// // console.log('1');
}
async function* a() {
// console.log('before a1');
yield delay('a1', 10);
throw new Error('a')
// console.log('before a2');
yield delay('a2', 100);
// console.log('before a3');
yield delay('a3', 100);
return 'last'
}
async function* b() {
// console.log('before b1');
yield delay('b1', 1000);
// console.log('before b2');
yield delay('b2', 1000);
// console.log('before b3');
yield delay('b3', 1000);
return 'last'
}
main().then(() => console.log('ok')).catch(e => {
// console.log('end');
process.exitCode = 1
console.error(e);
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc