merge-async-iterators
Advanced tools
Comparing version 0.1.2 to 0.2.0
114
index.js
@@ -1,33 +0,93 @@ | ||
module.exports = async function* merge(iterators, opts = {}) { | ||
module.exports = (iterators, opts = {}) => { | ||
const org = iterators.reduce((org, iterator, i, iterators) => { | ||
if (!iterator.next) { | ||
org.set(iterators[i] = iterator[Symbol.iterator](), iterator); | ||
} | ||
}, new Map); | ||
let done = false; | ||
let interrupt, interruptPromise = new Promise((resolve, reject) => { | ||
interrupt = error => error ? reject(error) : resolve(); | ||
}); | ||
const throwAll = error => iterators.forEach(i => i.throw && i.throw(error)); | ||
const returnAll = () => iterators.forEach(i => i.return && i.return()); | ||
const merged = {}; | ||
const queue = new Map(iterators.map(i => [i])); | ||
const getOutstanding = () => Array.from(queue.keys()).map(iterator => { | ||
const existing = queue.get(iterator); | ||
if (existing) { | ||
return existing; | ||
} else { | ||
const data = iterator.next(); | ||
const promise = data.then ? data.then(data => ({ iterator, data })) : Promise.resolve({ iterator, data }); | ||
queue.set(iterator, promise); | ||
return promise; | ||
const race = () => { | ||
const q = Array.from(queue).map(([, pending]) => pending).filter(Boolean); | ||
if (!q.length) return; | ||
return Promise.race([interruptPromise, ...q]); | ||
}; | ||
const updateQueue = (input) => queue.forEach((i, iterator) => { | ||
if (queue.get(iterator)) return; | ||
let promise = iterator.next(input); | ||
if (!promise.then) { | ||
promise = Promise.resolve(promise); | ||
} | ||
}).filter(Boolean); | ||
promise = promise.then(data => ({ ...data, iterator })); | ||
queue.set(iterator, promise); | ||
}); | ||
try { | ||
while (true) { | ||
const outstanding = getOutstanding(); | ||
if (!outstanding.length) break; | ||
const { iterator, data } = await Promise.race(outstanding); | ||
queue.set(iterator, null); | ||
if (data.done) { | ||
queue.delete(iterator); | ||
const getValue = ({ iterator, value } = {}) => opts.yieldIterator ? { | ||
value: { | ||
iterator: org.get(iterator) || iterator, | ||
value | ||
}, | ||
done | ||
} : { value, done }; | ||
merged.next = async (input) => { | ||
if (done) return getValue(); | ||
updateQueue(input); | ||
try { | ||
const result = await race(); | ||
if (result) { | ||
if (result.done) { | ||
queue.delete(result.iterator); | ||
} else { | ||
queue.set(result.iterator, null); | ||
} | ||
if (!queue.size) { | ||
done = true; | ||
} | ||
return getValue(result); | ||
} else { | ||
done = true; | ||
returnAll(); | ||
return getValue(); | ||
} | ||
const value = opts.yieldIterator ? { iterator, data } : data.value; | ||
yield value | ||
} catch (error) { | ||
done = true; | ||
throwAll(error); | ||
throw error; | ||
} | ||
} catch (error) { | ||
iterators.forEach(i => i.throw(error)); | ||
throw error; | ||
} finally { | ||
iterators.forEach(i => i.return()); | ||
} | ||
}; | ||
merged.throw = error => { | ||
/* TODO: don't set done=true unconditionally here, check to see how other iterators handle the throw */ | ||
if (done) return getValue(); | ||
done = true; | ||
throwAll(error); | ||
interrupt(error); | ||
return getValue(); | ||
}; | ||
merged.return = (value) => { | ||
if (done) return getValue(); | ||
done = true; | ||
returnAll(); | ||
interrupt(); | ||
return getValue({ value }); | ||
}; | ||
merged[Symbol.asyncIterator] = () => merged; | ||
return merged; | ||
}; |
{ | ||
"name": "merge-async-iterators", | ||
"version": "0.1.2", | ||
"version": "0.2.0", | ||
"description": "Merge multiple async iterators", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -25,8 +25,6 @@ # merge-async-iterators | ||
* **`yieldIterator[=false]`** Yields `{iterator, data: {value, done}}` (instead of just `value`) | ||
* **`yieldIterator[=false]`** Yields `{iterator, value}` (instead of `value`) | ||
Useful if you wanna know which iterator yielded the value | ||
* **Returns** A single merged async iterable | ||
@@ -39,14 +37,13 @@ | ||
async function* a () { | ||
yield 'a1' | ||
yield 'a2' | ||
return 'a3' | ||
} | ||
async function* b () { | ||
yield 'b1' | ||
yield 'b2' | ||
return 'b3' | ||
} | ||
const array = [1,2]; | ||
const iterable = (function*(){ | ||
yield 3 | ||
yield 4 | ||
})() | ||
const asyncIterable = (async function*(){ | ||
yield 5 | ||
yield 6 | ||
})() | ||
for await (const value of merge([a(), b()])) { | ||
for await (const value of merge([array, iterable, asyncIterable])) { | ||
console.log(value) | ||
@@ -56,12 +53,12 @@ } | ||
``` | ||
a1 | ||
b1 | ||
a2 | ||
b2 | ||
a3 | ||
b3 | ||
1 | ||
2 // order isn't guaranteed | ||
undefined // finished iterators' returns will yield as well | ||
3 | ||
4 | ||
undefined | ||
5 // async wil almost always come after normal ones | ||
6 | ||
``` | ||
## Alternatives | ||
@@ -68,0 +65,0 @@ |
97
test.js
@@ -1,30 +0,45 @@ | ||
const merge = require('.') | ||
// const merge = require('mergeiterator') | ||
const merge = require('.'); | ||
// const merge = require('mergeiterator'); | ||
const stream = require('streams-to-async-iterator'); | ||
const delay = (v, t = | ||
Math.random() | ||
* 100 | ||
) => | ||
// v | ||
new Promise(_ => setTimeout(() => _(v), t)); | ||
const delay = (t = Math.random() * 100) => new Promise(_ => setTimeout(() => _(t), t)); | ||
main().then(() => console.log('ok')).catch(e => { | ||
// console.log('end'); | ||
process.exitCode = 1 | ||
console.error(e); | ||
}); | ||
let array = () => array = ['array', 'array', 'array']; | ||
let basic = () => basic = (function* basic() { | ||
yield 'basic'; | ||
yield 'basic'; | ||
yield 'basic'; | ||
return 'return' | ||
})(); | ||
let asyncBasic10 = () => asyncBasic10 = (async function*() { | ||
yield delay(10); | ||
yield delay(10); | ||
yield delay(10); | ||
return 'return' | ||
})(); | ||
let asyncBasic1000 = () => asyncBasic1000 = (async function*() { | ||
yield delay(1000); | ||
yield delay(1000); | ||
yield delay(1000); | ||
})(); | ||
let stdin = () => stdin = stream(process.stdin.setEncoding('utf8')); | ||
async function main() { | ||
const aI = a(); | ||
const bI = b(); | ||
const merged = merge([ | ||
aI, | ||
bI, | ||
]); | ||
array(), | ||
basic(), | ||
asyncBasic10(), | ||
asyncBasic1000(), | ||
// stdin(), | ||
], { | ||
// yieldIterator: true | ||
}); | ||
let counter = 0 | ||
for await (const val of merged) { | ||
console.log(val); | ||
// let counter = 0 | ||
for await (const value of merged) { | ||
console.log(value); | ||
// if (counter++ > 1) | ||
@@ -34,23 +49,21 @@ // break | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// console.log(await merged.next()); | ||
// // console.log('1'); | ||
// console.log(await merged.next()); | ||
// // console.log('1'); | ||
} | ||
async function* a() { | ||
// console.log('before a1'); | ||
yield delay('a1', 10); | ||
throw new Error('a') | ||
// console.log('before a2'); | ||
yield delay('a2', 100); | ||
// console.log('before a3'); | ||
yield delay('a3', 100); | ||
return 'last' | ||
} | ||
async function* b() { | ||
// console.log('before b1'); | ||
yield delay('b1', 1000); | ||
// console.log('before b2'); | ||
yield delay('b2', 1000); | ||
// console.log('before b3'); | ||
yield delay('b3', 1000); | ||
return 'last' | ||
} | ||
main().then(() => console.log('ok')).catch(e => { | ||
// console.log('end'); | ||
process.exitCode = 1 | ||
console.error(e); | ||
}); |
5444
137
68