@repeaterjs/repeater
Advanced tools
Comparing version 3.0.0-beta.1 to 3.0.0
export interface RepeaterBuffer { | ||
full: boolean; | ||
empty: boolean; | ||
add(value: unknown): void; | ||
add(value: unknown): unknown; | ||
remove(): unknown; | ||
@@ -6,0 +6,0 @@ } |
@@ -87,2 +87,10 @@ 'use strict'; | ||
const NOOP = () => { }; | ||
function isPromiseLike(value) { | ||
return value != null && typeof value.then === "function"; | ||
} | ||
function swallow(value) { | ||
if (isPromiseLike(value)) { | ||
Promise.resolve(value).catch(NOOP); | ||
} | ||
} | ||
class RepeaterOverflowError extends Error { | ||
@@ -143,5 +151,5 @@ constructor(message) { | ||
} | ||
catch (error) { | ||
// sync error in executor | ||
this.execution = Promise.reject(error); | ||
catch (err) { | ||
// sync err in executor | ||
this.execution = Promise.reject(err); | ||
} | ||
@@ -170,13 +178,13 @@ // We don’t have to call this.stop with the error because all that does is | ||
* final result or any error are consumed, so that further calls to next, | ||
* return or throw return { done: true }. | ||
* return or throw return { value: undefined, done: true }. | ||
*/ | ||
consume() { | ||
const error = this.error; | ||
const err = this.err; | ||
const execution = Promise.resolve(this.execution).then((value) => { | ||
if (error != null) { | ||
throw error; | ||
if (err != null) { | ||
throw err; | ||
} | ||
return value; | ||
}); | ||
this.error = undefined; | ||
this.err = undefined; | ||
this.execution = execution.then(() => undefined, () => undefined); | ||
@@ -226,3 +234,3 @@ return this.pending === undefined | ||
push(value) { | ||
Promise.resolve(value).catch(NOOP); | ||
swallow(value); | ||
if (this.pushQueue.length >= MAX_QUEUE_LENGTH) { | ||
@@ -237,7 +245,8 @@ throw new RepeaterOverflowError(`No more than ${MAX_QUEUE_LENGTH} pending calls to push are allowed on a single repeater.`); | ||
: this.pending.then(() => value); | ||
valueP = valueP.catch((error) => { | ||
valueP = valueP.catch((err) => { | ||
if (this.state < 2 /* Stopped */) { | ||
this.error = error; | ||
this.err = err; | ||
} | ||
this.reject(); | ||
// Explicitly return undefined to avoid typescript’s horrible void type | ||
return undefined; | ||
@@ -268,7 +277,9 @@ }); | ||
let floating = true; | ||
let error; | ||
const unhandled = next.catch((error1) => { | ||
let err; | ||
const unhandled = next.catch((err1) => { | ||
if (floating) { | ||
error = error1; | ||
err = err1; | ||
} | ||
// Explicitly return undefined to avoid typescript’s horrible void type | ||
return undefined; | ||
}); | ||
@@ -282,6 +293,8 @@ next.then = function (onFulfilled, onRejected) { | ||
.then(() => { | ||
if (error != null) { | ||
this.error = error; | ||
if (err != null) { | ||
this.err = err; | ||
this.reject(); | ||
} | ||
// Explicitly return undefined to avoid typescript’s horrible void type | ||
return undefined; | ||
}); | ||
@@ -295,3 +308,3 @@ return next; | ||
*/ | ||
stop(error) { | ||
stop(err) { | ||
if (this.state >= 2 /* Stopped */) { | ||
@@ -303,4 +316,4 @@ return; | ||
this.onstop(); | ||
if (this.error == null) { | ||
this.error = error; | ||
if (this.err == null) { | ||
this.err = err; | ||
} | ||
@@ -325,3 +338,3 @@ for (const push of this.pushQueue) { | ||
next(value) { | ||
Promise.resolve(value).catch(NOOP); | ||
swallow(value); | ||
if (this.pullQueue.length >= MAX_QUEUE_LENGTH) { | ||
@@ -355,3 +368,3 @@ throw new RepeaterOverflowError(`No more than ${MAX_QUEUE_LENGTH} pending calls to Repeater.prototype.next are allowed on a single repeater.`); | ||
return(value) { | ||
Promise.resolve(value).catch(NOOP); | ||
swallow(value); | ||
this.finish(); | ||
@@ -361,3 +374,3 @@ this.execution = Promise.resolve(this.execution).then(() => value); | ||
} | ||
throw(error) { | ||
throw(err) { | ||
if (this.state <= 0 /* Initial */ || | ||
@@ -367,8 +380,8 @@ this.state >= 2 /* Stopped */ || | ||
this.finish(); | ||
if (this.error == null) { | ||
this.error = error; | ||
if (this.err == null) { | ||
this.err = err; | ||
} | ||
return this.unwrap(this.consume()); | ||
} | ||
return this.next(Promise.reject(error)); | ||
return this.next(Promise.reject(err)); | ||
} | ||
@@ -404,3 +417,3 @@ [Symbol.asyncIterator]() { | ||
} | ||
throw(error) { | ||
throw(err) { | ||
const controller = controllers.get(this); | ||
@@ -410,3 +423,3 @@ if (controller === undefined) { | ||
} | ||
return controller.throw(error); | ||
return controller.throw(err); | ||
} | ||
@@ -421,13 +434,40 @@ [Symbol.asyncIterator]() { | ||
Repeater.latest = latest; | ||
function iterators(contenders) { | ||
function isAsyncIterable(value) { | ||
return value != null && typeof value[Symbol.asyncIterator] === "function"; | ||
} | ||
function isIterable(value) { | ||
return value != null && typeof value[Symbol.iterator] === "function"; | ||
} | ||
function asyncIterators(contenders, options) { | ||
const { yieldValues, returnValues } = options; | ||
const iters = []; | ||
for (const contender of contenders) { | ||
if (typeof contender[Symbol.asyncIterator] === "function") { | ||
if (isAsyncIterable(contender)) { | ||
iters.push(contender[Symbol.asyncIterator]()); | ||
} | ||
else if (typeof contender[Symbol.iterator] === "function") { | ||
iters.push(contender[Symbol.iterator]()); | ||
else if (isIterable(contender)) { | ||
const iter = contender[Symbol.iterator](); | ||
iters.push((async function* syncToAsyncIterator() { | ||
try { | ||
let result = iter.next(); | ||
while (!result.done) { | ||
yield result.value; | ||
result = iter.next(); | ||
} | ||
return result.value; | ||
} | ||
finally { | ||
iter.return && iter.return(); | ||
} | ||
})()); | ||
} | ||
else { | ||
iters.push(new Repeater((_, stop) => (stop(), contender))); | ||
iters.push((async function* valueToAsyncIterator() { | ||
if (yieldValues) { | ||
yield contender; | ||
} | ||
if (returnValues) { | ||
return contender; | ||
} | ||
})()); | ||
} | ||
@@ -438,4 +478,4 @@ } | ||
function race(contenders) { | ||
const iters = asyncIterators(contenders, { returnValues: true }); | ||
return new Repeater(async (push, stop) => { | ||
const iters = iterators(contenders); | ||
if (!iters.length) { | ||
@@ -447,4 +487,4 @@ stop(); | ||
stop.then(() => (stopped = true)); | ||
let returned; | ||
try { | ||
let returned; | ||
while (!stopped) { | ||
@@ -456,7 +496,8 @@ const results = iters.map((iter) => iter.next()); | ||
stop(); | ||
stopped = true; | ||
returned = result.value; | ||
} | ||
}, (error) => stop(error)); | ||
}, (err) => stop(err)); | ||
} | ||
const result = await Promise.race([...results, stop]); | ||
const result = await Promise.race([stop, ...results]); | ||
if (result !== undefined && !result.done) { | ||
@@ -468,5 +509,2 @@ await push(result.value); | ||
} | ||
catch (error) { | ||
stop(error); | ||
} | ||
finally { | ||
@@ -479,4 +517,4 @@ stop(); | ||
function merge(contenders) { | ||
const iters = asyncIterators(contenders, { yieldValues: true }); | ||
return new Repeater(async (push, stop) => { | ||
const iters = iterators(contenders); | ||
if (!iters.length) { | ||
@@ -502,9 +540,4 @@ stop(); | ||
} | ||
catch (error) { | ||
stop(error); | ||
} | ||
finally { | ||
if (iter.return) { | ||
await iter.return(); | ||
} | ||
iter.return && (await iter.return()); | ||
} | ||
@@ -517,3 +550,3 @@ })); | ||
function zip(contenders) { | ||
const iters = iterators(contenders); | ||
const iters = asyncIterators(contenders, { returnValues: true }); | ||
return new Repeater(async (push, stop) => { | ||
@@ -529,5 +562,5 @@ if (!iters.length) { | ||
const resultsP = Promise.all(iters.map((iter) => iter.next())); | ||
const results = await Promise.race([resultsP, stop]); | ||
const results = await Promise.race([stop, resultsP]); | ||
if (results === undefined) { | ||
break; | ||
return; | ||
} | ||
@@ -541,5 +574,2 @@ const values = results.map((result) => result.value); | ||
} | ||
catch (error) { | ||
stop(error); | ||
} | ||
finally { | ||
@@ -552,3 +582,6 @@ stop(); | ||
function latest(contenders) { | ||
const iters = iterators(contenders); | ||
const iters = asyncIterators(contenders, { | ||
yieldValues: true, | ||
returnValues: true, | ||
}); | ||
return new Repeater(async (push, stop) => { | ||
@@ -561,22 +594,17 @@ if (!iters.length) { | ||
stop.then(() => (stopped = true)); | ||
const resultsP = Promise.all(iters.map((iter) => iter.next())); | ||
const results = await Promise.race([stop, resultsP]); | ||
if (results === undefined) { | ||
return Promise.all(iters.map(async (iter) => { | ||
if (iter.return === undefined) { | ||
return; | ||
try { | ||
const resultsP = Promise.all(iters.map((iter) => iter.next())); | ||
const results = await Promise.race([stop, resultsP]); | ||
if (results === undefined) { | ||
return; | ||
} | ||
const values = results.map((result) => result.value); | ||
if (results.every((result) => result.done)) { | ||
return values; | ||
} | ||
await push(values.slice()); | ||
return await Promise.all(iters.map(async (iter, i) => { | ||
if (results[i].done) { | ||
return results[i].value; | ||
} | ||
return (await iter.return()).value; | ||
})); | ||
} | ||
const values = results.map((result) => result.value); | ||
if (results.every((result) => result.done)) { | ||
return values; | ||
} | ||
await push(values.slice()); | ||
const result = await Promise.all(iters.map(async (iter, i) => { | ||
if (results[i].done) { | ||
return results[i].value; | ||
} | ||
try { | ||
while (!stopped) { | ||
@@ -592,14 +620,8 @@ const result = await Promise.race([stop, iter.next()]); | ||
} | ||
} | ||
catch (error) { | ||
stop(error); | ||
} | ||
finally { | ||
if (iter.return) { | ||
await iter.return(); | ||
} | ||
} | ||
})); | ||
stop(); | ||
return result; | ||
})); | ||
} | ||
finally { | ||
stop(); | ||
await Promise.all(iters.map((iter) => iter.return && iter.return())); | ||
} | ||
}); | ||
@@ -606,0 +628,0 @@ } |
@@ -8,3 +8,3 @@ import { RepeaterBuffer } from "./buffers"; | ||
export declare type Push<T, TNext = unknown> = (value: PromiseLike<T> | T) => Promise<TNext | undefined>; | ||
export declare type Stop = ((error?: any) => undefined) & Promise<undefined>; | ||
export declare type Stop = ((err?: any) => undefined) & Promise<undefined>; | ||
export declare type RepeaterExecutor<T, TReturn = any, TNext = unknown> = (push: Push<T, TNext>, stop: Stop) => PromiseLike<TReturn> | TReturn; | ||
@@ -15,3 +15,3 @@ export declare class Repeater<T, TReturn = any, TNext = unknown> { | ||
return(value?: PromiseLike<TReturn> | TReturn): Promise<IteratorResult<T>>; | ||
throw(error?: any): Promise<IteratorResult<T>>; | ||
throw(err?: any): Promise<IteratorResult<T>>; | ||
[Symbol.asyncIterator](): this; | ||
@@ -23,26 +23,6 @@ static race: typeof race; | ||
} | ||
declare type Contender<T> = AsyncIterable<T> | Iterable<T> | PromiseLike<any>; | ||
declare function race(contenders: []): Repeater<never>; | ||
declare function race<T>(contenders: Iterable<Contender<T>>): Repeater<T>; | ||
declare function race<T1, T2>(contenders: [Contender<T1>, Contender<T2>]): Repeater<T1 | T2>; | ||
declare function race<T1, T2, T3>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>]): Repeater<T1 | T2 | T3>; | ||
declare function race<T1, T2, T3, T4>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>]): Repeater<T1 | T2 | T3 | T4>; | ||
declare function race<T1, T2, T3, T4, T5>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>]): Repeater<T1 | T2 | T3 | T4 | T5>; | ||
declare function race<T1, T2, T3, T4, T5, T6>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6>; | ||
declare function race<T1, T2, T3, T4, T5, T6, T7>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6 | T7>; | ||
declare function race<T1, T2, T3, T4, T5, T6, T7, T8>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8>; | ||
declare function race<T1, T2, T3, T4, T5, T6, T7, T8, T9>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9>; | ||
declare function race<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>, Contender<T10>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10>; | ||
declare function merge(contenders: []): Repeater<never>; | ||
declare function merge<T>(contenders: Iterable<Contender<T>>): Repeater<T>; | ||
declare function merge<T1, T2>(contenders: [Contender<T1>, Contender<T2>]): Repeater<T1 | T2>; | ||
declare function merge<T1, T2, T3>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>]): Repeater<T1 | T2 | T3>; | ||
declare function merge<T1, T2, T3, T4>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>]): Repeater<T1 | T2 | T3 | T4>; | ||
declare function merge<T1, T2, T3, T4, T5>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>]): Repeater<T1 | T2 | T3 | T4 | T5>; | ||
declare function merge<T1, T2, T3, T4, T5, T6>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6>; | ||
declare function merge<T1, T2, T3, T4, T5, T6, T7>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6 | T7>; | ||
declare function merge<T1, T2, T3, T4, T5, T6, T7, T8>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8>; | ||
declare function merge<T1, T2, T3, T4, T5, T6, T7, T8, T9>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9>; | ||
declare function merge<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>, Contender<T10>]): Repeater<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10>; | ||
declare function zip(contenders: []): Repeater<never, []>; | ||
declare function race<T>(contenders: Iterable<T>): Repeater<T extends AsyncIterable<infer U> | Iterable<infer U> ? U extends PromiseLike<infer V> ? V : U : never>; | ||
declare function merge<T>(contenders: Iterable<T>): Repeater<T extends AsyncIterable<infer U> | Iterable<infer U> ? U extends PromiseLike<infer V> ? V : U : T extends PromiseLike<infer U> ? U : T>; | ||
declare type Contender<T> = AsyncIterable<Promise<T> | T> | Iterable<Promise<T> | T> | PromiseLike<T> | T; | ||
declare function zip(contenders: []): Repeater<never>; | ||
declare function zip<T>(contenders: Iterable<Contender<T>>): Repeater<T[]>; | ||
@@ -58,3 +38,3 @@ declare function zip<T1, T2>(contenders: [Contender<T1>, Contender<T2>]): Repeater<[T1, T2]>; | ||
declare function zip<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(contenders: [Contender<T1>, Contender<T2>, Contender<T3>, Contender<T4>, Contender<T5>, Contender<T6>, Contender<T7>, Contender<T8>, Contender<T9>, Contender<T10>]): Repeater<[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]>; | ||
declare function latest(contenders: []): Repeater<never, []>; | ||
declare function latest(contenders: []): Repeater<never>; | ||
declare function latest<T>(contenders: Iterable<Contender<T>>): Repeater<T[]>; | ||
@@ -61,0 +41,0 @@ declare function latest<T1, T2>(contenders: [Contender<T1>, Contender<T2>]): Repeater<[T1, T2]>; |
@@ -83,2 +83,10 @@ class FixedBuffer { | ||
const NOOP = () => { }; | ||
function isPromiseLike(value) { | ||
return value != null && typeof value.then === "function"; | ||
} | ||
function swallow(value) { | ||
if (isPromiseLike(value)) { | ||
Promise.resolve(value).catch(NOOP); | ||
} | ||
} | ||
class RepeaterOverflowError extends Error { | ||
@@ -139,5 +147,5 @@ constructor(message) { | ||
} | ||
catch (error) { | ||
// sync error in executor | ||
this.execution = Promise.reject(error); | ||
catch (err) { | ||
// sync err in executor | ||
this.execution = Promise.reject(err); | ||
} | ||
@@ -166,13 +174,13 @@ // We don’t have to call this.stop with the error because all that does is | ||
* final result or any error are consumed, so that further calls to next, | ||
* return or throw return { done: true }. | ||
* return or throw return { value: undefined, done: true }. | ||
*/ | ||
consume() { | ||
const error = this.error; | ||
const err = this.err; | ||
const execution = Promise.resolve(this.execution).then((value) => { | ||
if (error != null) { | ||
throw error; | ||
if (err != null) { | ||
throw err; | ||
} | ||
return value; | ||
}); | ||
this.error = undefined; | ||
this.err = undefined; | ||
this.execution = execution.then(() => undefined, () => undefined); | ||
@@ -222,3 +230,3 @@ return this.pending === undefined | ||
push(value) { | ||
Promise.resolve(value).catch(NOOP); | ||
swallow(value); | ||
if (this.pushQueue.length >= MAX_QUEUE_LENGTH) { | ||
@@ -233,7 +241,8 @@ throw new RepeaterOverflowError(`No more than ${MAX_QUEUE_LENGTH} pending calls to push are allowed on a single repeater.`); | ||
: this.pending.then(() => value); | ||
valueP = valueP.catch((error) => { | ||
valueP = valueP.catch((err) => { | ||
if (this.state < 2 /* Stopped */) { | ||
this.error = error; | ||
this.err = err; | ||
} | ||
this.reject(); | ||
// Explicitly return undefined to avoid typescript’s horrible void type | ||
return undefined; | ||
@@ -264,7 +273,9 @@ }); | ||
let floating = true; | ||
let error; | ||
const unhandled = next.catch((error1) => { | ||
let err; | ||
const unhandled = next.catch((err1) => { | ||
if (floating) { | ||
error = error1; | ||
err = err1; | ||
} | ||
// Explicitly return undefined to avoid typescript’s horrible void type | ||
return undefined; | ||
}); | ||
@@ -278,6 +289,8 @@ next.then = function (onFulfilled, onRejected) { | ||
.then(() => { | ||
if (error != null) { | ||
this.error = error; | ||
if (err != null) { | ||
this.err = err; | ||
this.reject(); | ||
} | ||
// Explicitly return undefined to avoid typescript’s horrible void type | ||
return undefined; | ||
}); | ||
@@ -291,3 +304,3 @@ return next; | ||
*/ | ||
stop(error) { | ||
stop(err) { | ||
if (this.state >= 2 /* Stopped */) { | ||
@@ -299,4 +312,4 @@ return; | ||
this.onstop(); | ||
if (this.error == null) { | ||
this.error = error; | ||
if (this.err == null) { | ||
this.err = err; | ||
} | ||
@@ -321,3 +334,3 @@ for (const push of this.pushQueue) { | ||
next(value) { | ||
Promise.resolve(value).catch(NOOP); | ||
swallow(value); | ||
if (this.pullQueue.length >= MAX_QUEUE_LENGTH) { | ||
@@ -351,3 +364,3 @@ throw new RepeaterOverflowError(`No more than ${MAX_QUEUE_LENGTH} pending calls to Repeater.prototype.next are allowed on a single repeater.`); | ||
return(value) { | ||
Promise.resolve(value).catch(NOOP); | ||
swallow(value); | ||
this.finish(); | ||
@@ -357,3 +370,3 @@ this.execution = Promise.resolve(this.execution).then(() => value); | ||
} | ||
throw(error) { | ||
throw(err) { | ||
if (this.state <= 0 /* Initial */ || | ||
@@ -363,8 +376,8 @@ this.state >= 2 /* Stopped */ || | ||
this.finish(); | ||
if (this.error == null) { | ||
this.error = error; | ||
if (this.err == null) { | ||
this.err = err; | ||
} | ||
return this.unwrap(this.consume()); | ||
} | ||
return this.next(Promise.reject(error)); | ||
return this.next(Promise.reject(err)); | ||
} | ||
@@ -400,3 +413,3 @@ [Symbol.asyncIterator]() { | ||
} | ||
throw(error) { | ||
throw(err) { | ||
const controller = controllers.get(this); | ||
@@ -406,3 +419,3 @@ if (controller === undefined) { | ||
} | ||
return controller.throw(error); | ||
return controller.throw(err); | ||
} | ||
@@ -417,13 +430,40 @@ [Symbol.asyncIterator]() { | ||
Repeater.latest = latest; | ||
function iterators(contenders) { | ||
function isAsyncIterable(value) { | ||
return value != null && typeof value[Symbol.asyncIterator] === "function"; | ||
} | ||
function isIterable(value) { | ||
return value != null && typeof value[Symbol.iterator] === "function"; | ||
} | ||
function asyncIterators(contenders, options) { | ||
const { yieldValues, returnValues } = options; | ||
const iters = []; | ||
for (const contender of contenders) { | ||
if (typeof contender[Symbol.asyncIterator] === "function") { | ||
if (isAsyncIterable(contender)) { | ||
iters.push(contender[Symbol.asyncIterator]()); | ||
} | ||
else if (typeof contender[Symbol.iterator] === "function") { | ||
iters.push(contender[Symbol.iterator]()); | ||
else if (isIterable(contender)) { | ||
const iter = contender[Symbol.iterator](); | ||
iters.push((async function* syncToAsyncIterator() { | ||
try { | ||
let result = iter.next(); | ||
while (!result.done) { | ||
yield result.value; | ||
result = iter.next(); | ||
} | ||
return result.value; | ||
} | ||
finally { | ||
iter.return && iter.return(); | ||
} | ||
})()); | ||
} | ||
else { | ||
iters.push(new Repeater((_, stop) => (stop(), contender))); | ||
iters.push((async function* valueToAsyncIterator() { | ||
if (yieldValues) { | ||
yield contender; | ||
} | ||
if (returnValues) { | ||
return contender; | ||
} | ||
})()); | ||
} | ||
@@ -434,4 +474,4 @@ } | ||
function race(contenders) { | ||
const iters = asyncIterators(contenders, { returnValues: true }); | ||
return new Repeater(async (push, stop) => { | ||
const iters = iterators(contenders); | ||
if (!iters.length) { | ||
@@ -443,4 +483,4 @@ stop(); | ||
stop.then(() => (stopped = true)); | ||
let returned; | ||
try { | ||
let returned; | ||
while (!stopped) { | ||
@@ -452,7 +492,8 @@ const results = iters.map((iter) => iter.next()); | ||
stop(); | ||
stopped = true; | ||
returned = result.value; | ||
} | ||
}, (error) => stop(error)); | ||
}, (err) => stop(err)); | ||
} | ||
const result = await Promise.race([...results, stop]); | ||
const result = await Promise.race([stop, ...results]); | ||
if (result !== undefined && !result.done) { | ||
@@ -464,5 +505,2 @@ await push(result.value); | ||
} | ||
catch (error) { | ||
stop(error); | ||
} | ||
finally { | ||
@@ -475,4 +513,4 @@ stop(); | ||
function merge(contenders) { | ||
const iters = asyncIterators(contenders, { yieldValues: true }); | ||
return new Repeater(async (push, stop) => { | ||
const iters = iterators(contenders); | ||
if (!iters.length) { | ||
@@ -498,9 +536,4 @@ stop(); | ||
} | ||
catch (error) { | ||
stop(error); | ||
} | ||
finally { | ||
if (iter.return) { | ||
await iter.return(); | ||
} | ||
iter.return && (await iter.return()); | ||
} | ||
@@ -513,3 +546,3 @@ })); | ||
function zip(contenders) { | ||
const iters = iterators(contenders); | ||
const iters = asyncIterators(contenders, { returnValues: true }); | ||
return new Repeater(async (push, stop) => { | ||
@@ -525,5 +558,5 @@ if (!iters.length) { | ||
const resultsP = Promise.all(iters.map((iter) => iter.next())); | ||
const results = await Promise.race([resultsP, stop]); | ||
const results = await Promise.race([stop, resultsP]); | ||
if (results === undefined) { | ||
break; | ||
return; | ||
} | ||
@@ -537,5 +570,2 @@ const values = results.map((result) => result.value); | ||
} | ||
catch (error) { | ||
stop(error); | ||
} | ||
finally { | ||
@@ -548,3 +578,6 @@ stop(); | ||
function latest(contenders) { | ||
const iters = iterators(contenders); | ||
const iters = asyncIterators(contenders, { | ||
yieldValues: true, | ||
returnValues: true, | ||
}); | ||
return new Repeater(async (push, stop) => { | ||
@@ -557,22 +590,17 @@ if (!iters.length) { | ||
stop.then(() => (stopped = true)); | ||
const resultsP = Promise.all(iters.map((iter) => iter.next())); | ||
const results = await Promise.race([stop, resultsP]); | ||
if (results === undefined) { | ||
return Promise.all(iters.map(async (iter) => { | ||
if (iter.return === undefined) { | ||
return; | ||
try { | ||
const resultsP = Promise.all(iters.map((iter) => iter.next())); | ||
const results = await Promise.race([stop, resultsP]); | ||
if (results === undefined) { | ||
return; | ||
} | ||
const values = results.map((result) => result.value); | ||
if (results.every((result) => result.done)) { | ||
return values; | ||
} | ||
await push(values.slice()); | ||
return await Promise.all(iters.map(async (iter, i) => { | ||
if (results[i].done) { | ||
return results[i].value; | ||
} | ||
return (await iter.return()).value; | ||
})); | ||
} | ||
const values = results.map((result) => result.value); | ||
if (results.every((result) => result.done)) { | ||
return values; | ||
} | ||
await push(values.slice()); | ||
const result = await Promise.all(iters.map(async (iter, i) => { | ||
if (results[i].done) { | ||
return results[i].value; | ||
} | ||
try { | ||
while (!stopped) { | ||
@@ -588,14 +616,8 @@ const result = await Promise.race([stop, iter.next()]); | ||
} | ||
} | ||
catch (error) { | ||
stop(error); | ||
} | ||
finally { | ||
if (iter.return) { | ||
await iter.return(); | ||
} | ||
} | ||
})); | ||
stop(); | ||
return result; | ||
})); | ||
} | ||
finally { | ||
stop(); | ||
await Promise.all(iters.map((iter) => iter.return && iter.return())); | ||
} | ||
}); | ||
@@ -602,0 +624,0 @@ } |
{ | ||
"name": "@repeaterjs/repeater", | ||
"version": "3.0.0-beta.1", | ||
"version": "3.0.0", | ||
"description": "The missing constructor function for creating safe async iterators", | ||
@@ -5,0 +5,0 @@ "repository": { |
175
README.md
@@ -1,64 +0,151 @@ | ||
# @repeaterjs/repeater | ||
The missing constructor for creating safe async iterators | ||
# Repeater.js | ||
The missing constructor for creating safe async iterators. | ||
For more information, visit [repeater.js.org](https://repeater.js.org). | ||
## API | ||
## Installation | ||
```ts | ||
class Repeater<T> implements AsyncIterableIterator<T> { | ||
constructor(executor: RepeaterExecutor<T>, buffer?: RepeaterBuffer<T>); | ||
next(value?: any): Promise<IteratorResult<T>>; | ||
return(value?: any): Promise<IteratorResult<T>>; | ||
throw(error: any): Promise<IteratorResult<T>>; | ||
[Symbol.asyncIterator](): this; | ||
} | ||
Repeater.js is available on [npm](https://www.npmjs.com/package/@repeaterjs/repeater) in the CommonJS and ESModule formats. | ||
`$ npm install @repeaterjs/repeater` | ||
`$ yarn add @repeaterjs/repeater` | ||
## Requirements | ||
The core `@repeaterjs/repeater` module has no dependencies, but requires the following globals in order to work: | ||
- `Promise` | ||
- `WeakMap` | ||
- `Symbol` | ||
- `Symbol.iterator` | ||
- `Symbol.asyncIterator` | ||
In addition, repeaters are most useful when used via `async/await` and `for await…of` syntax. You can transpile your code with babel or typescript to support enviroments which lack these features. | ||
## Examples | ||
<h4 id="timestamps">Logging timestamps with setInterval</h4> | ||
```js | ||
import { Repeater } from "@repeaterjs/repeater"; | ||
const timestamps = new Repeater(async (push, stop) => { | ||
push(Date.now()); | ||
const interval = setInterval(() => push(Date.now()), 1000); | ||
await stop; | ||
clearInterval(interval); | ||
}); | ||
(async function() { | ||
let i = 0; | ||
for await (const timestamp of timestamps) { | ||
console.log(timestamp); | ||
i++; | ||
if (i >= 10) { | ||
console.log("ALL DONE!"); | ||
break; // triggers clearInterval above | ||
} | ||
} | ||
})(); | ||
``` | ||
The `Repeater` class implements the `AsyncIterableIterator` interface. Repeaters are designed to be indistinguishable from async generator objects. | ||
<h4 id="websocket">Creating a repeater from a websocket</h4> | ||
```ts | ||
type Push<T> = (value: PromiseLike<T> | T) => Promise<any | void>; | ||
```js | ||
import { Repeater } from "@repeaterjs/repeater"; | ||
interface Stop extends Promise<any | void> { | ||
(error?: any): void; | ||
} | ||
const socket = new WebSocket("ws://echo.websocket.org"); | ||
const messages = new Repeater(async (push, stop) => { | ||
socket.onmessage = (ev) => push(ev.data); | ||
socket.onerror = () => stop(new Error("WebSocket error")); | ||
socket.onclose = () => stop(); | ||
await stop; | ||
socket.close(); | ||
}); | ||
type RepeaterExecutor<T> = ( | ||
push: Push<T>, | ||
stop: Stop | ||
) => Promise<T | void> | T | void; | ||
(async function() { | ||
for await (const message of messages) { | ||
console.log(message); | ||
if (message === "close") { | ||
console.log("Closing!"); | ||
break; // closes the socket | ||
} | ||
} | ||
})(); | ||
socket.onopen = () => { | ||
socket.send("hello"); // "hello" | ||
socket.send("world"); // "world" | ||
socket.send("close"); // "close", "Closing!" | ||
}; | ||
``` | ||
The `RepeaterExecutor` is passed the arguments `push` and `stop`. | ||
<h4 id="konami-code">Listening for the <a href="https://en.wikipedia.org/wiki/Konami_Code">Konami Code</a> and canceling if <kbd>Escape</kbd> is pressed</h4> | ||
`push` is a function which allows you to enqueue values onto the repeater. It synchronously throws an error if there are too many pending pushes on the repeater (currently set to 1024). It returns a promise which resolves when it’s safe to push more values. | ||
```js | ||
import { Repeater } from "@repeaterjs/repeater"; | ||
`stop` is a both a promise and a function. As a function, `stop` can be called to stop a repeater. Calling `stop` without any arguments stops the repeater without error, and passing an error both stops the repeater and causes the final iteration to reject with that error. | ||
const keys = new Repeater(async (push, stop) => { | ||
const listener = (ev) => { | ||
if (ev.key === "Escape") { | ||
stop(); | ||
} else { | ||
push(ev.key); | ||
} | ||
}; | ||
window.addEventListener("keyup", listener); | ||
await stop; | ||
window.removeEventListener("keyup", listener); | ||
}); | ||
As a promise, `stop` can be awaited to defer event handler cleanup, and it can also be used with `Promise.race` to abort pending promises. If you pass a value to `Repeater.prototype.return`, `stop` will resolve to that value. | ||
const konami = ["ArrowUp", "ArrowUp", "ArrowDown", "ArrowDown", "ArrowLeft", "ArrowRight", "ArrowLeft", "ArrowRight", "b", "a"]; | ||
The value of the final interation of the repeater will be the return value of the executor. If the executor throws an error or returns a promise rejection, the repeater will be immediately stopped and the final iteration will throw. | ||
(async function() { | ||
let i = 0; | ||
for await (const key of keys) { | ||
if (key === konami[i]) { | ||
i++; | ||
} else { | ||
i = 0; | ||
} | ||
if (i >= konami.length) { | ||
console.log("KONAMI!!!"); | ||
break; // removes the keyup listener | ||
} | ||
} | ||
})(); | ||
``` | ||
```ts | ||
interface RepeaterBuffer<T> { | ||
full: boolean; | ||
empty: boolean; | ||
add(value: T): void; | ||
remove(): T | undefined; | ||
} | ||
<h4 id="observables">Converting an observable to an async iterator</h4> | ||
class FixedBuffer<T> implements RepeaterBuffer<T> { | ||
constructor(capacity: number); | ||
} | ||
```js | ||
import { Subject } from "rxjs"; | ||
import { Repeater } from "@repeaterjs/repeater"; | ||
class SlidingBuffer<T> implements RepeaterBuffer<T> { | ||
constructor(capacity: number); | ||
} | ||
const observable = new Subject(); | ||
const repeater = new Repeater(async (push, stop) => { | ||
const subscription = observable.subscribe({ | ||
next: (value) => push(value), | ||
error: (err) => stop(err), | ||
complete: () => stop(), | ||
}); | ||
await stop; | ||
subscription.unsubscribe(); | ||
}); | ||
class DroppingBuffer<T> implements RepeaterBuffer<T> { | ||
constructor(capacity: number); | ||
} | ||
(async function() { | ||
try { | ||
for await (const value of repeater) { | ||
console.log("Value: ", value); | ||
} | ||
} catch (err) { | ||
console.log("Error caught: ", err); | ||
} | ||
})(); | ||
observable.next(1); | ||
// Value: 1 | ||
observable.next(2); | ||
// Value: 2 | ||
observable.error(new Error("Hello from observable")); | ||
// Error caught: Error: Hello from observable | ||
``` | ||
The `Repeater` constructor optionally takes a `RepeaterBuffer` instance as its second argument. Buffers allow multiple values to be pushed onto repeaters without waiting. `FixedBuffer` allows repeaters to push a set number of values, `DroppingBuffer` drops the *latest* values when the buffer has reached capacity, and `SlidingBuffer` drops the *earliest* values when the buffer has reached capacity. You can define custom buffering behaviors by implementing the `RepeaterBuffer` interface. |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
1279
1
152
0
53512