Socket
Socket
Sign inDemoInstall

@rimbu/stream

Package Overview
Dependencies
Maintainers
1
Versions
56
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@rimbu/stream - npm Package Compare versions

Comparing version 0.10.5 to 0.10.6

707

dist/module/async-custom/async-fast-iterator-base.js
import { __awaiter } from "tslib";
import { Token } from '@rimbu/base';
import { AsyncOptLazy } from '@rimbu/common';
import { closeIters, fromAsyncStreamSource } from '@rimbu/stream/async-custom';
import { AsyncOptLazy, CollectFun, TraverseState, } from '@rimbu/common';
import { closeIters, fromAsyncStreamSource, isEmptyAsyncStreamSourceInstance, } from '@rimbu/stream/async-custom';
export const fixedDoneAsyncIteratorResult = Promise.resolve({

@@ -222,2 +222,705 @@ done: true,

}
export class AsyncPrependIterator extends AsyncFastIteratorBase {
constructor(source, item) {
super();
this.source = source;
this.item = item;
this.prependDone = false;
this.return = () => __awaiter(this, void 0, void 0, function* () {
if (this.prependDone)
return closeIters(this.source);
});
}
fastNext(otherwise) {
if (this.prependDone) {
return this.source.fastNext(otherwise);
}
this.prependDone = true;
return AsyncOptLazy.toMaybePromise(this.item);
}
}
export class AsyncAppendIterator extends AsyncFastIteratorBase {
constructor(source, item) {
super();
this.source = source;
this.item = item;
this.appendDone = false;
this.return = () => __awaiter(this, void 0, void 0, function* () {
if (!this.appendDone)
return closeIters(source);
});
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
if (this.appendDone)
return AsyncOptLazy.toMaybePromise(otherwise);
const done = Symbol('Done');
const value = yield this.source.fastNext(done);
if (done !== value)
return value;
this.appendDone = true;
return AsyncOptLazy.toMaybePromise(this.item);
});
}
}
export class AsyncIndexedIterator extends AsyncFastIteratorBase {
constructor(source, startIndex = 0) {
super();
this.source = source;
this.startIndex = startIndex;
this.index = startIndex;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const done = Symbol('Done');
const value = yield this.source.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
return [this.index++, value];
});
}
}
export class AsyncMapIterator extends AsyncFastIteratorBase {
constructor(source, mapFun) {
super();
this.source = source;
this.mapFun = mapFun;
this.state = TraverseState();
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const state = this.state;
const done = Symbol('Done');
const next = yield this.source.fastNext(done);
if (done === next) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
return this.mapFun(next, state.nextIndex());
});
}
}
export class AsyncMapPureIterator extends AsyncFastIteratorBase {
constructor(source, mapFun, args) {
super();
this.source = source;
this.mapFun = mapFun;
this.args = args;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const done = Symbol('Done');
const next = yield this.source.fastNext(done);
if (done === next)
return AsyncOptLazy.toMaybePromise(otherwise);
return this.mapFun(next, ...this.args);
});
}
}
export class AsyncFlatMapIterator extends AsyncFastIteratorBase {
constructor(source, flatMapFun) {
super();
this.source = source;
this.flatMapFun = flatMapFun;
this.state = TraverseState();
this.done = false;
this.currentIterator = null;
this.iterator = this.source[Symbol.asyncIterator]();
this.return = () => closeIters(this.currentIterator, this.iterator);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const state = this.state;
if (state.halted || this.done) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const done = Symbol('Done');
let nextValue = done;
while (null === this.currentIterator ||
done === (nextValue = yield this.currentIterator.fastNext(done))) {
const nextIter = yield this.iterator.fastNext(done);
if (done === nextIter) {
this.done = true;
return AsyncOptLazy.toMaybePromise(otherwise);
}
const nextSource = this.flatMapFun(nextIter, state.nextIndex(), state.halt);
const currentIterator = fromAsyncStreamSource(nextSource)[Symbol.asyncIterator]();
this.currentIterator = currentIterator;
}
return nextValue;
});
}
}
export class AsyncConcatIterator extends AsyncFastIteratorBase {
constructor(source, otherSources) {
super();
this.source = source;
this.otherSources = otherSources;
this.sourceIndex = 0;
this.iterator = source[Symbol.asyncIterator]();
this.return = () => closeIters(this.iterator);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const done = Symbol('Done');
let value;
const length = this.otherSources.length;
while (done === (value = yield this.iterator.fastNext(done))) {
if (this.sourceIndex >= length) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
let nextSource = this.otherSources[this.sourceIndex++];
while (isEmptyAsyncStreamSourceInstance(nextSource)) {
if (this.sourceIndex >= length) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
nextSource = this.otherSources[this.sourceIndex++];
}
this.iterator = fromAsyncStreamSource(nextSource)[Symbol.asyncIterator]();
}
return value;
});
}
}
export class AsyncIntersperseIterator extends AsyncFastIteratorBase {
constructor(source, sepStream) {
super();
this.source = source;
this.sepStream = sepStream;
this.isInitialized = false;
this.isDone = false;
this.return = () => __awaiter(this, void 0, void 0, function* () {
if (!this.isDone)
return closeIters(this.sepIterator, this.source);
});
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const done = Symbol('Done');
if (undefined !== this.sepIterator) {
const sepNext = yield this.sepIterator.fastNext(done);
if (done !== sepNext)
return sepNext;
this.sepIterator = undefined;
}
if (this.isInitialized) {
const newNextValue = yield this.source.fastNext(done);
if (done === newNextValue) {
this.isDone = true;
return this.nextValue;
}
const currentNextValue = this.nextValue;
this.nextValue = newNextValue;
this.sepIterator = this.sepStream[Symbol.asyncIterator]();
return currentNextValue;
}
const nextValue = yield this.source.fastNext(done);
if (done === nextValue) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const newNextValue = yield this.source.fastNext(done);
if (done === newNextValue) {
return nextValue;
}
this.nextValue = newNextValue;
this.isInitialized = true;
this.sepIterator = this.sepStream[Symbol.asyncIterator]();
return nextValue;
});
}
}
export class AsyncFilterIterator extends AsyncFastIteratorBase {
constructor(source, pred, invert) {
super();
this.source = source;
this.pred = pred;
this.invert = invert;
this.state = TraverseState();
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const state = this.state;
if (state.halted) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const done = Symbol('Done');
let value;
const source = this.source;
const pred = this.pred;
const halt = state.halt;
const invert = this.invert;
while (!state.halted && done !== (value = yield source.fastNext(done))) {
const cond = yield pred(value, state.nextIndex(), halt);
if (cond !== invert)
return value;
}
if (state.halted && done !== value) {
yield closeIters(this);
}
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
export class AsyncFilterPureIterator extends AsyncFastIteratorBase {
constructor(source, pred, args, invert) {
super();
this.source = source;
this.pred = pred;
this.args = args;
this.invert = invert;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const done = Symbol('Done');
let value;
const source = this.source;
const pred = this.pred;
const args = this.args;
const invert = this.invert;
while (done !== (value = yield source.fastNext(done))) {
const cond = yield pred(value, ...args);
if (cond !== invert)
return value;
}
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
export class AsyncCollectIterator extends AsyncFastIteratorBase {
constructor(source, collectFun) {
super();
this.source = source;
this.collectFun = collectFun;
this.state = TraverseState();
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const state = this.state;
if (state.halted) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const { halt } = state;
const done = Symbol('Done');
let value;
const source = this.source;
const collectFun = this.collectFun;
try {
while (!state.halted && done !== (value = yield source.fastNext(done))) {
const result = yield collectFun(value, state.nextIndex(), CollectFun.Skip, halt);
if (CollectFun.Skip === result)
continue;
return result;
}
return AsyncOptLazy.toMaybePromise(otherwise);
}
finally {
if (state.halted && done !== value) {
yield closeIters(this);
}
}
});
}
}
export class AsyncIndicesWhereIterator extends AsyncFastIteratorBase {
constructor(source, pred) {
super();
this.source = source;
this.pred = pred;
this.index = 0;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const done = Symbol('Done');
let value;
const source = this.source;
const pred = this.pred;
while (done !== (value = yield source.fastNext(done))) {
const cond = yield pred(value);
if (cond) {
return this.index++;
}
this.index++;
}
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
export class AsyncIndicesOfIterator extends AsyncFastIteratorBase {
constructor(source, searchValue, eq) {
super();
this.source = source;
this.searchValue = searchValue;
this.eq = eq;
this.index = 0;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const done = Symbol('Done');
let value;
const source = this.source;
const searchValue = this.searchValue;
const eq = this.eq;
while (done !== (value = yield source.fastNext(done))) {
if (eq(searchValue, value))
return this.index++;
this.index++;
}
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
export class AsyncTakeWhileIterator extends AsyncFastIteratorBase {
constructor(source, pred) {
super();
this.source = source;
this.pred = pred;
this.isDone = false;
this.index = 0;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
if (this.isDone)
return AsyncOptLazy.toMaybePromise(otherwise);
const done = Symbol('Done');
const next = yield this.source.fastNext(done);
if (done === next) {
this.isDone = true;
return AsyncOptLazy.toMaybePromise(otherwise);
}
if (yield this.pred(next, this.index++))
return next;
this.isDone = true;
yield closeIters(this);
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
export class AsyncDropWhileIterator extends AsyncFastIteratorBase {
constructor(source, pred) {
super();
this.source = source;
this.pred = pred;
this.pass = false;
this.index = 0;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const source = this.source;
if (this.pass)
return source.fastNext(otherwise);
const done = Symbol('Done');
let value;
while (done !== (value = yield source.fastNext(done))) {
this.pass = !(yield this.pred(value, this.index++));
if (this.pass)
return value;
}
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
export class AsyncTakeIterator extends AsyncFastIteratorBase {
constructor(source, amount) {
super();
this.source = source;
this.amount = amount;
this.i = 0;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
if (this.i++ >= this.amount) {
yield closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise);
}
return this.source.fastNext(otherwise);
});
}
}
export class AsyncDropIterator extends AsyncFastIteratorBase {
constructor(source, amount) {
super();
this.source = source;
this.amount = amount;
this.return = () => closeIters(source);
this.remain = amount;
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const source = this.source;
if (this.remain <= 0)
return source.fastNext(otherwise);
const done = Symbol('Done');
let value;
while (done !== (value = yield source.fastNext(done))) {
if (this.remain-- <= 0) {
return value;
}
}
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
export class AsyncRepeatIterator extends AsyncFastIteratorBase {
constructor(source, amount) {
super();
this.source = source;
this.amount = amount;
this.isEmpty = true;
this.iterator = source[Symbol.asyncIterator]();
this.return = () => closeIters(this.iterator);
this.remain = amount;
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const done = Symbol('Done');
const iterator = this.iterator;
let value = yield iterator.fastNext(done);
if (done !== value) {
this.isEmpty = false;
return value;
}
if (this.isEmpty) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
if (undefined !== this.remain) {
this.remain--;
if (this.remain <= 0) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
}
this.iterator = this.source[Symbol.asyncIterator]();
value = yield this.iterator.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
return value;
});
}
}
export class AsyncSplitWhereIterator extends AsyncFastIteratorBase {
constructor(source, pred) {
super();
this.source = source;
this.pred = pred;
this.index = 0;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const startIndex = this.index;
if (startIndex < 0) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const result = [];
const source = this.source;
const done = Symbol('Done');
let value;
const pred = this.pred;
while (done !== (value = yield source.fastNext(done))) {
if (yield pred(value, this.index++))
return result;
result.push(value);
}
this.return = undefined;
if (startIndex === this.index) {
this.index = -1;
return AsyncOptLazy.toMaybePromise(otherwise);
}
this.index = -1;
return result;
});
}
}
export class AsyncFoldIterator extends AsyncFastIteratorBase {
constructor(source, init, getNext) {
super();
this.source = source;
this.init = init;
this.getNext = getNext;
this.isInitialized = false;
this.state = TraverseState();
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
if (!this.isInitialized) {
this.current = yield AsyncOptLazy.toMaybePromise(this.init);
this.isInitialized = true;
}
const state = this.state;
if (state.halted) {
yield closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise);
}
const done = Symbol('done');
const value = yield this.source.fastNext(done);
if (done === value) {
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise);
}
this.current = yield this.getNext(this.current, value, state.nextIndex(), state.halt);
return this.current;
});
}
}
export class AsyncSplitOnIterator extends AsyncFastIteratorBase {
constructor(source, sepElem, eq) {
super();
this.source = source;
this.sepElem = sepElem;
this.eq = eq;
this.isDone = false;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const result = [];
const source = this.source;
const done = Symbol('Done');
let value;
let processed = false;
const eq = this.eq;
const sepElem = this.sepElem;
while (done !== (value = yield source.fastNext(done))) {
processed = true;
if (eq(value, sepElem))
return result;
result.push(value);
}
this.return = undefined;
this.isDone = true;
if (!processed)
return AsyncOptLazy.toMaybePromise(otherwise);
return result;
});
}
}
export class AsyncReduceIterator extends AsyncFastIteratorBase {
constructor(source, reducer) {
super();
this.source = source;
this.reducer = reducer;
this.traverseState = TraverseState();
this.isInitialized = false;
this.isDone = false;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
var _a, _b;
return __awaiter(this, void 0, void 0, function* () {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const done = Symbol('Done');
const value = yield this.source.fastNext(done);
if (done === value) {
this.isDone = true;
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise);
}
const reducer = this.reducer;
if (!this.isInitialized) {
this.state = yield AsyncOptLazy.toMaybePromise(reducer.init);
this.isInitialized = true;
}
const traverseState = this.traverseState;
try {
this.state = yield reducer.next(this.state, value, traverseState.nextIndex(), traverseState.halt);
}
catch (err) {
this.isDone = true;
yield closeIters(this);
this.return = undefined;
yield ((_a = reducer.onClose) === null || _a === void 0 ? void 0 : _a.call(reducer, this.state, err));
throw err;
}
if (traverseState.halted) {
this.isDone = true;
yield closeIters(this);
this.return = undefined;
yield ((_b = reducer.onClose) === null || _b === void 0 ? void 0 : _b.call(reducer, this.state));
}
return reducer.stateToResult(this.state);
});
}
}
export class AsyncReduceAllIterator extends AsyncFastIteratorBase {
constructor(source, reducers) {
super();
this.source = source;
this.reducers = reducers;
this.index = 0;
this.isDone = false;
this.reducersToClose = new Set(reducers);
this.return = () => __awaiter(this, void 0, void 0, function* () {
yield closeIters(source);
const state = this.state;
if (state) {
yield Promise.all([...this.reducersToClose].map((reducer, index) => { var _a; return (_a = reducer.onClose) === null || _a === void 0 ? void 0 : _a.call(reducer, state[index], this.err); }));
}
});
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const reducers = this.reducers;
if (undefined === this.state) {
this.state = yield Promise.all(reducers.map((d) => AsyncOptLazy.toMaybePromise(d.init)));
}
if (this.isDone) {
yield closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise);
}
const done = Symbol('Done');
const value = yield this.source.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise);
}
const state = this.state;
this.state = yield Promise.all(reducers.map((red, i) => {
const st = state[i];
if (!this.reducersToClose.has(red))
return st;
try {
return red.next(st, value, this.index, () => __awaiter(this, void 0, void 0, function* () {
var _a;
this.reducersToClose.delete(red);
yield ((_a = red.onClose) === null || _a === void 0 ? void 0 : _a.call(red, st));
return st;
}));
}
catch (e) {
this.err = e;
}
}));
this.index++;
this.isDone = this.reducersToClose.size === 0 || undefined !== this.err;
if (this.isDone) {
yield closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise);
}
return Promise.all(this.state.map((s, i) => reducers[i].stateToResult(s)));
});
}
}
//# sourceMappingURL=async-fast-iterator-base.js.map
import { Token } from '@rimbu/base';
import { ArrayNonEmpty, AsyncOptLazy, MaybePromise } from '@rimbu/common';
import type { AsyncFastIterator, AsyncStreamSource } from '@rimbu/stream/async';
import { ArrayNonEmpty, AsyncCollectFun, AsyncOptLazy, AsyncReducer, Eq, MaybePromise, TraverseState } from '@rimbu/common';
import type { AsyncFastIterator, AsyncStream, AsyncStreamSource } from '@rimbu/stream/async';
export declare const fixedDoneAsyncIteratorResult: Promise<IteratorReturnResult<any> | IteratorYieldResult<any>>;

@@ -82,1 +82,190 @@ export declare function isAsyncFastIterator<T>(iterator: AsyncIterator<T>): iterator is AsyncFastIterator<T>;

}
export declare class AsyncPrependIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly item: AsyncOptLazy<T>;
constructor(source: AsyncFastIterator<T>, item: AsyncOptLazy<T>);
prependDone: boolean;
fastNext<O>(otherwise?: AsyncOptLazy<O>): MaybePromise<T | O>;
}
export declare class AsyncAppendIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly item: AsyncOptLazy<T>;
constructor(source: AsyncFastIterator<T>, item: AsyncOptLazy<T>);
appendDone: boolean;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncIndexedIterator<T> extends AsyncFastIteratorBase<[
number,
T
]> {
readonly source: AsyncFastIterator<T>;
readonly startIndex: number;
index: number;
constructor(source: AsyncFastIterator<T>, startIndex?: number);
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<[number, T] | O>;
}
export declare class AsyncMapIterator<T, T2> extends AsyncFastIteratorBase<T2> {
readonly source: AsyncFastIterator<T>;
readonly mapFun: (value: T, index: number) => MaybePromise<T2>;
constructor(source: AsyncFastIterator<T>, mapFun: (value: T, index: number) => MaybePromise<T2>);
readonly state: TraverseState;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O>;
}
export declare class AsyncMapPureIterator<T, A extends readonly unknown[], T2> extends AsyncFastIteratorBase<T2> {
readonly source: AsyncFastIterator<T>;
readonly mapFun: (value: T, ...args: A) => MaybePromise<T2>;
readonly args: A;
constructor(source: AsyncFastIterator<T>, mapFun: (value: T, ...args: A) => MaybePromise<T2>, args: A);
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O>;
}
export declare class AsyncFlatMapIterator<T, T2> extends AsyncFastIteratorBase<T2> {
readonly source: AsyncStream<T>;
readonly flatMapFun: (value: T, index: number, halt: () => void) => AsyncStreamSource<T2>;
iterator: AsyncFastIterator<T>;
constructor(source: AsyncStream<T>, flatMapFun: (value: T, index: number, halt: () => void) => AsyncStreamSource<T2>);
readonly state: TraverseState;
done: boolean;
currentIterator: null | AsyncFastIterator<T2>;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O>;
}
export declare class AsyncConcatIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncStream<T>;
readonly otherSources: AsyncStreamSource<T>[];
iterator: AsyncFastIterator<T>;
constructor(source: AsyncStream<T>, otherSources: AsyncStreamSource<T>[]);
sourceIndex: number;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncIntersperseIterator<T, S> extends AsyncFastIteratorBase<T | S> {
readonly source: AsyncFastIterator<T>;
readonly sepStream: AsyncStream<S>;
constructor(source: AsyncFastIterator<T>, sepStream: AsyncStream<S>);
sepIterator: AsyncFastIterator<S> | undefined;
nextValue: T;
isInitialized: boolean;
isDone: boolean;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | S | O>;
}
export declare class AsyncFilterIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly pred: (value: T, index: number, halt: () => void) => MaybePromise<boolean>;
readonly invert: boolean;
constructor(source: AsyncFastIterator<T>, pred: (value: T, index: number, halt: () => void) => MaybePromise<boolean>, invert: boolean);
readonly state: TraverseState;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncFilterPureIterator<T, A extends readonly unknown[]> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly pred: (value: T, ...args: A) => MaybePromise<boolean>;
readonly args: A;
readonly invert: boolean;
constructor(source: AsyncFastIterator<T>, pred: (value: T, ...args: A) => MaybePromise<boolean>, args: A, invert: boolean);
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncCollectIterator<T, R> extends AsyncFastIteratorBase<R> {
readonly source: AsyncFastIterator<T>;
readonly collectFun: AsyncCollectFun<T, R>;
constructor(source: AsyncFastIterator<T>, collectFun: AsyncCollectFun<T, R>);
readonly state: TraverseState;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O>;
}
export declare class AsyncIndicesWhereIterator<T> extends AsyncFastIteratorBase<number> {
readonly source: AsyncFastIterator<T>;
readonly pred: (value: T) => MaybePromise<boolean>;
constructor(source: AsyncFastIterator<T>, pred: (value: T) => MaybePromise<boolean>);
index: number;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<number | O>;
}
export declare class AsyncIndicesOfIterator<T> extends AsyncFastIteratorBase<number> {
readonly source: AsyncFastIterator<T>;
readonly searchValue: T;
readonly eq: Eq<T>;
constructor(source: AsyncFastIterator<T>, searchValue: T, eq: Eq<T>);
index: number;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<number | O>;
}
export declare class AsyncTakeWhileIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly pred: (value: T, index: number) => MaybePromise<boolean>;
constructor(source: AsyncFastIterator<T>, pred: (value: T, index: number) => MaybePromise<boolean>);
isDone: boolean;
index: number;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncDropWhileIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly pred: (value: T, index: number) => MaybePromise<boolean>;
constructor(source: AsyncFastIterator<T>, pred: (value: T, index: number) => MaybePromise<boolean>);
pass: boolean;
index: number;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncTakeIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly amount: number;
constructor(source: AsyncFastIterator<T>, amount: number);
i: number;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncDropIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly amount: number;
remain: number;
constructor(source: AsyncFastIterator<T>, amount: number);
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncRepeatIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncStream<T>;
readonly amount?: number | undefined;
iterator: AsyncFastIterator<T>;
remain?: number;
constructor(source: AsyncStream<T>, amount?: number | undefined);
isEmpty: boolean;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncSplitWhereIterator<T> extends AsyncFastIteratorBase<T[]> {
readonly source: AsyncFastIterator<T>;
readonly pred: (value: T, index: number) => MaybePromise<boolean>;
constructor(source: AsyncFastIterator<T>, pred: (value: T, index: number) => MaybePromise<boolean>);
index: number;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T[] | O>;
}
export declare class AsyncFoldIterator<I, R> extends AsyncFastIteratorBase<R> {
readonly source: AsyncFastIterator<I>;
readonly init: AsyncOptLazy<R>;
readonly getNext: (current: R, value: I, index: number, halt: () => void) => MaybePromise<R>;
constructor(source: AsyncFastIterator<I>, init: AsyncOptLazy<R>, getNext: (current: R, value: I, index: number, halt: () => void) => MaybePromise<R>);
current?: R;
isInitialized: boolean;
state: TraverseState;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O>;
}
export declare class AsyncSplitOnIterator<T> extends AsyncFastIteratorBase<T[]> {
readonly source: AsyncFastIterator<T>;
readonly sepElem: T;
readonly eq: Eq<T>;
constructor(source: AsyncFastIterator<T>, sepElem: T, eq: Eq<T>);
isDone: boolean;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T[] | O>;
}
export declare class AsyncReduceIterator<I, R> extends AsyncFastIteratorBase<R> {
readonly source: AsyncFastIterator<I>;
readonly reducer: AsyncReducer<I, R>;
state: unknown;
constructor(source: AsyncFastIterator<I>, reducer: AsyncReducer<I, R>);
readonly traverseState: TraverseState;
isInitialized: boolean;
isDone: boolean;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O>;
}
export declare class AsyncReduceAllIterator<I, R> extends AsyncFastIteratorBase<R> {
readonly source: AsyncFastIterator<I>;
readonly reducers: AsyncReducer<I, any>[];
state?: unknown[];
readonly reducersToClose: Set<AsyncReducer<any>>;
constructor(source: AsyncFastIterator<I>, reducers: AsyncReducer<I, any>[]);
index: number;
isDone: boolean;
err: any;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O>;
}

2

dist/types/async-custom/async-stream-custom.d.ts
import { ArrayNonEmpty, AsyncCollectFun, AsyncOptLazy, AsyncReducer, Eq, MaybePromise, ToJSON, TraverseState } from '@rimbu/common';
import type { AsyncFastIterator, AsyncStream, AsyncStreamSource } from '@rimbu/stream/async';
import type { AsyncStreamConstructors } from '@rimbu/stream/async-custom';
import { AsyncStreamConstructors } from '@rimbu/stream/async-custom';
export declare abstract class AsyncStreamBase<T> implements AsyncStream<T> {

@@ -5,0 +5,0 @@ abstract [Symbol.asyncIterator](): AsyncFastIterator<T>;

{
"name": "@rimbu/stream",
"version": "0.10.5",
"version": "0.10.6",
"description": "Efficient structure representing a sequence of elements, with powerful operations for TypeScript",

@@ -80,3 +80,3 @@ "keywords": [

},
"gitHead": "c9c3842b12ed66ba247e3d5462c6b92b41e0007a"
"gitHead": "e55ecad9205b4a29d7a61315574ec9c0937afd80"
}
import { Token } from '@rimbu/base';
import { ArrayNonEmpty, AsyncOptLazy, MaybePromise } from '@rimbu/common';
import {
ArrayNonEmpty,
AsyncCollectFun,
AsyncOptLazy,
AsyncReducer,
CollectFun,
Eq,
MaybePromise,
TraverseState,
} from '@rimbu/common';
import { closeIters, fromAsyncStreamSource } from '@rimbu/stream/async-custom';
import type { AsyncFastIterator, AsyncStreamSource } from '@rimbu/stream/async';
import {
closeIters,
fromAsyncStreamSource,
isEmptyAsyncStreamSourceInstance,
} from '@rimbu/stream/async-custom';
import type {
AsyncFastIterator,
AsyncStream,
AsyncStreamSource,
} from '@rimbu/stream/async';

@@ -306,1 +323,922 @@ export const fixedDoneAsyncIteratorResult = Promise.resolve({

}
export class AsyncPrependIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly item: AsyncOptLazy<T>
) {
super();
this.return = async (): Promise<void> => {
if (this.prependDone) return closeIters(this.source);
};
}
prependDone = false;
fastNext<O>(otherwise?: AsyncOptLazy<O>): MaybePromise<T | O> {
if (this.prependDone) {
return this.source.fastNext(otherwise!);
}
this.prependDone = true;
return AsyncOptLazy.toMaybePromise(this.item);
}
}
export class AsyncAppendIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly item: AsyncOptLazy<T>
) {
super();
this.return = async (): Promise<void> => {
if (!this.appendDone) return closeIters(source);
};
}
appendDone = false;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
if (this.appendDone) return AsyncOptLazy.toMaybePromise(otherwise!);
const done = Symbol('Done');
const value = await this.source.fastNext(done);
if (done !== value) return value;
this.appendDone = true;
return AsyncOptLazy.toMaybePromise(this.item);
}
}
export class AsyncIndexedIterator<T> extends AsyncFastIteratorBase<
[number, T]
> {
index: number;
constructor(readonly source: AsyncFastIterator<T>, readonly startIndex = 0) {
super();
this.index = startIndex;
this.return = (): Promise<void> => closeIters(source);
}
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<[number, T] | O> {
const done = Symbol('Done');
const value = await this.source.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return [this.index++, value];
}
}
export class AsyncMapIterator<T, T2> extends AsyncFastIteratorBase<T2> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly mapFun: (value: T, index: number) => MaybePromise<T2>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly state = TraverseState();
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O> {
const state = this.state;
const done = Symbol('Done');
const next = await this.source.fastNext(done);
if (done === next) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return this.mapFun(next, state.nextIndex());
}
}
export class AsyncMapPureIterator<
T,
A extends readonly unknown[],
T2
> extends AsyncFastIteratorBase<T2> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly mapFun: (value: T, ...args: A) => MaybePromise<T2>,
readonly args: A
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O> {
const done = Symbol('Done');
const next = await this.source.fastNext(done);
if (done === next) return AsyncOptLazy.toMaybePromise(otherwise!);
return this.mapFun(next, ...this.args);
}
}
export class AsyncFlatMapIterator<T, T2> extends AsyncFastIteratorBase<T2> {
iterator: AsyncFastIterator<T>;
constructor(
readonly source: AsyncStream<T>,
readonly flatMapFun: (
value: T,
index: number,
halt: () => void
) => AsyncStreamSource<T2>
) {
super();
this.iterator = this.source[Symbol.asyncIterator]();
this.return = (): Promise<void> =>
closeIters(this.currentIterator, this.iterator);
}
readonly state = TraverseState();
done = false;
currentIterator: null | AsyncFastIterator<T2> = null;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O> {
const state = this.state;
if (state.halted || this.done) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
let nextValue: T2 | typeof done = done;
while (
null === this.currentIterator ||
done === (nextValue = await this.currentIterator.fastNext(done))
) {
const nextIter = await this.iterator.fastNext(done);
if (done === nextIter) {
this.done = true;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const nextSource = this.flatMapFun(
nextIter,
state.nextIndex(),
state.halt
);
const currentIterator =
fromAsyncStreamSource(nextSource)[Symbol.asyncIterator]();
this.currentIterator = currentIterator;
}
return nextValue;
}
}
export class AsyncConcatIterator<T> extends AsyncFastIteratorBase<T> {
iterator: AsyncFastIterator<T>;
constructor(
readonly source: AsyncStream<T>,
readonly otherSources: AsyncStreamSource<T>[]
) {
super();
this.iterator = source[Symbol.asyncIterator]();
this.return = (): Promise<void> => closeIters(this.iterator);
}
sourceIndex = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const done = Symbol('Done');
let value: T | typeof done;
const length = this.otherSources.length;
while (done === (value = await this.iterator.fastNext(done))) {
if (this.sourceIndex >= length) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
let nextSource: AsyncStreamSource<T> =
this.otherSources[this.sourceIndex++];
while (isEmptyAsyncStreamSourceInstance(nextSource)) {
if (this.sourceIndex >= length) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
nextSource = this.otherSources[this.sourceIndex++];
}
this.iterator = fromAsyncStreamSource(nextSource)[Symbol.asyncIterator]();
}
return value;
}
}
export class AsyncIntersperseIterator<T, S> extends AsyncFastIteratorBase<
T | S
> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly sepStream: AsyncStream<S>
) {
super();
this.return = async (): Promise<void> => {
if (!this.isDone) return closeIters(this.sepIterator, this.source);
};
}
sepIterator: AsyncFastIterator<S> | undefined;
nextValue!: T;
isInitialized = false;
isDone = false;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | S | O> {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
if (undefined !== this.sepIterator) {
const sepNext = await this.sepIterator.fastNext(done);
if (done !== sepNext) return sepNext;
this.sepIterator = undefined;
}
if (this.isInitialized) {
const newNextValue = await this.source.fastNext(done);
if (done === newNextValue) {
this.isDone = true;
return this.nextValue;
}
const currentNextValue = this.nextValue;
this.nextValue = newNextValue;
this.sepIterator = this.sepStream[Symbol.asyncIterator]();
return currentNextValue;
}
const nextValue = await this.source.fastNext(done);
if (done === nextValue) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const newNextValue = await this.source.fastNext(done);
if (done === newNextValue) {
return nextValue;
}
this.nextValue = newNextValue;
this.isInitialized = true;
this.sepIterator = this.sepStream[Symbol.asyncIterator]();
return nextValue;
}
}
export class AsyncFilterIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (
value: T,
index: number,
halt: () => void
) => MaybePromise<boolean>,
readonly invert: boolean
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly state = TraverseState();
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const state = this.state;
if (state.halted) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const pred = this.pred;
const halt = state.halt;
const invert = this.invert;
while (!state.halted && done !== (value = await source.fastNext(done))) {
const cond = await pred(value, state.nextIndex(), halt);
if (cond !== invert) return value;
}
if (state.halted && done !== value!) {
await closeIters(this);
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
export class AsyncFilterPureIterator<
T,
A extends readonly unknown[]
> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T, ...args: A) => MaybePromise<boolean>,
readonly args: A,
readonly invert: boolean
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const pred = this.pred;
const args = this.args;
const invert = this.invert;
while (done !== (value = await source.fastNext(done))) {
const cond = await pred(value, ...args);
if (cond !== invert) return value;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
export class AsyncCollectIterator<T, R> extends AsyncFastIteratorBase<R> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly collectFun: AsyncCollectFun<T, R>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly state = TraverseState();
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
const state = this.state;
if (state.halted) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const { halt } = state;
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const collectFun = this.collectFun;
try {
while (!state.halted && done !== (value = await source.fastNext(done))) {
const result = await collectFun(
value,
state.nextIndex(),
CollectFun.Skip,
halt
);
if (CollectFun.Skip === result) continue;
return result as R;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
} finally {
if (state.halted && done !== value!) {
await closeIters(this);
}
}
}
}
export class AsyncIndicesWhereIterator<
T
> extends AsyncFastIteratorBase<number> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T) => MaybePromise<boolean>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<number | O> {
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const pred = this.pred;
while (done !== (value = await source.fastNext(done))) {
const cond = await pred(value);
if (cond) {
return this.index++;
}
this.index++;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
export class AsyncIndicesOfIterator<T> extends AsyncFastIteratorBase<number> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly searchValue: T,
readonly eq: Eq<T>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<number | O> {
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const searchValue = this.searchValue;
const eq = this.eq;
while (done !== (value = await source.fastNext(done))) {
if (eq(searchValue, value)) return this.index++;
this.index++;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
export class AsyncTakeWhileIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T, index: number) => MaybePromise<boolean>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
isDone = false;
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
if (this.isDone) return AsyncOptLazy.toMaybePromise(otherwise!);
const done = Symbol('Done');
const next = await this.source.fastNext(done);
if (done === next) {
this.isDone = true;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
if (await this.pred(next, this.index++)) return next;
this.isDone = true;
await closeIters(this);
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
export class AsyncDropWhileIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T, index: number) => MaybePromise<boolean>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
pass = false;
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const source = this.source;
if (this.pass) return source.fastNext(otherwise!);
const done = Symbol('Done');
let value: T | typeof done;
while (done !== (value = await source.fastNext(done))) {
this.pass = !(await this.pred(value, this.index++));
if (this.pass) return value;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
export class AsyncTakeIterator<T> extends AsyncFastIteratorBase<T> {
constructor(readonly source: AsyncFastIterator<T>, readonly amount: number) {
super();
this.return = (): Promise<void> => closeIters(source);
}
i = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
if (this.i++ >= this.amount) {
await closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return this.source.fastNext(otherwise!);
}
}
export class AsyncDropIterator<T> extends AsyncFastIteratorBase<T> {
remain: number;
constructor(readonly source: AsyncFastIterator<T>, readonly amount: number) {
super();
this.return = (): Promise<void> => closeIters(source);
this.remain = amount;
}
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const source = this.source;
if (this.remain <= 0) return source.fastNext(otherwise!);
const done = Symbol('Done');
let value: T | typeof done;
while (done !== (value = await source.fastNext(done))) {
if (this.remain-- <= 0) {
return value;
}
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
export class AsyncRepeatIterator<T> extends AsyncFastIteratorBase<T> {
iterator: AsyncFastIterator<T>;
remain?: number;
constructor(readonly source: AsyncStream<T>, readonly amount?: number) {
super();
this.iterator = source[Symbol.asyncIterator]();
this.return = (): Promise<void> => closeIters(this.iterator);
this.remain = amount;
}
isEmpty = true;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const done = Symbol('Done');
const iterator = this.iterator;
let value = await iterator.fastNext(done);
if (done !== value) {
this.isEmpty = false;
return value;
}
if (this.isEmpty) {
return AsyncOptLazy.toMaybePromise(otherwise!) as O;
}
if (undefined !== this.remain) {
this.remain--;
if (this.remain <= 0) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
this.iterator = this.source[Symbol.asyncIterator]();
value = await this.iterator.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return value;
}
}
export class AsyncSplitWhereIterator<T> extends AsyncFastIteratorBase<T[]> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T, index: number) => MaybePromise<boolean>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T[] | O> {
const startIndex = this.index;
if (startIndex < 0) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const result: T[] = [];
const source = this.source;
const done = Symbol('Done');
let value: T | typeof done;
const pred = this.pred;
while (done !== (value = await source.fastNext(done))) {
if (await pred(value, this.index++)) return result;
result.push(value);
}
this.return = undefined;
if (startIndex === this.index) {
this.index = -1;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
this.index = -1;
return result;
}
}
export class AsyncFoldIterator<I, R> extends AsyncFastIteratorBase<R> {
constructor(
readonly source: AsyncFastIterator<I>,
readonly init: AsyncOptLazy<R>,
readonly getNext: (
current: R,
value: I,
index: number,
halt: () => void
) => MaybePromise<R>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
current?: R;
isInitialized = false;
state = TraverseState();
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
if (!this.isInitialized) {
this.current = await AsyncOptLazy.toMaybePromise(this.init);
this.isInitialized = true;
}
const state = this.state;
if (state.halted) {
await closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('done');
const value = await this.source.fastNext(done);
if (done === value) {
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
this.current = await this.getNext(
this.current!,
value,
state.nextIndex(),
state.halt
);
return this.current;
}
}
export class AsyncSplitOnIterator<T> extends AsyncFastIteratorBase<T[]> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly sepElem: T,
readonly eq: Eq<T>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
isDone = false;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T[] | O> {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const result: T[] = [];
const source = this.source;
const done = Symbol('Done');
let value: T | typeof done;
let processed = false;
const eq = this.eq;
const sepElem = this.sepElem;
while (done !== (value = await source.fastNext(done))) {
processed = true;
if (eq(value, sepElem)) return result;
result.push(value);
}
this.return = undefined;
this.isDone = true;
if (!processed) return AsyncOptLazy.toMaybePromise(otherwise!);
return result;
}
}
export class AsyncReduceIterator<I, R> extends AsyncFastIteratorBase<R> {
state: unknown;
constructor(
readonly source: AsyncFastIterator<I>,
readonly reducer: AsyncReducer<I, R>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly traverseState = TraverseState();
isInitialized = false;
isDone = false;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
const value = await this.source.fastNext(done);
if (done === value) {
this.isDone = true;
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const reducer = this.reducer;
if (!this.isInitialized) {
this.state = await AsyncOptLazy.toMaybePromise(reducer.init);
this.isInitialized = true;
}
const traverseState = this.traverseState;
try {
this.state = await reducer.next(
this.state,
value,
traverseState.nextIndex(),
traverseState.halt
);
} catch (err) {
this.isDone = true;
await closeIters(this);
this.return = undefined;
await reducer.onClose?.(this.state, err);
throw err;
}
if (traverseState.halted) {
this.isDone = true;
await closeIters(this);
this.return = undefined;
await reducer.onClose?.(this.state);
}
return reducer.stateToResult(this.state);
}
}
export class AsyncReduceAllIterator<I, R> extends AsyncFastIteratorBase<R> {
state?: unknown[];
readonly reducersToClose: Set<AsyncReducer<any>>;
constructor(
readonly source: AsyncFastIterator<I>,
readonly reducers: AsyncReducer<I, any>[]
) {
super();
this.reducersToClose = new Set(reducers);
this.return = async (): Promise<void> => {
await closeIters(source);
const state = this.state;
if (state) {
await Promise.all(
[...this.reducersToClose].map((reducer, index) =>
reducer.onClose?.(state[index], this.err)
)
);
}
};
}
index = 0;
isDone = false;
err: any;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
const reducers = this.reducers;
if (undefined === this.state) {
this.state = await Promise.all(
reducers.map((d: any): unknown => AsyncOptLazy.toMaybePromise(d.init))
);
}
if (this.isDone) {
await closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
const value = await this.source.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const state = this.state;
this.state = await Promise.all(
reducers.map((red, i) => {
const st = state[i];
if (!this.reducersToClose.has(red)) return st;
try {
return red.next(st, value, this.index, async () => {
this.reducersToClose.delete(red);
await red.onClose?.(st);
return st;
});
} catch (e) {
this.err = e;
}
})
);
this.index++;
this.isDone = this.reducersToClose.size === 0 || undefined !== this.err;
if (this.isDone) {
await closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return Promise.all(
this.state.map((s, i) => reducers[i].stateToResult(s))
) as any;
}
}

@@ -6,3 +6,2 @@ import {

AsyncReducer,
CollectFun,
Comp,

@@ -22,8 +21,5 @@ Eq,

} from '@rimbu/stream/async';
import type { AsyncStreamConstructors } from '@rimbu/stream/async-custom';
import {
closeIters,
emptyAsyncFastIterator,
AsyncFastIteratorBase,
AsyncOfIterator,
FromAsyncIterator,

@@ -37,2 +33,27 @@ FromIterator,

AsyncZipAllWithItererator,
AsyncAppendIterator,
AsyncCollectIterator,
AsyncConcatIterator,
AsyncDropIterator,
AsyncSplitWhereIterator,
AsyncDropWhileIterator,
AsyncFilterIterator,
AsyncFilterPureIterator,
AsyncFlatMapIterator,
AsyncFoldIterator,
AsyncIndexedIterator,
AsyncIndicesOfIterator,
AsyncIndicesWhereIterator,
AsyncIntersperseIterator,
AsyncMapIterator,
AsyncMapPureIterator,
AsyncOfIterator,
AsyncPrependIterator,
AsyncReduceAllIterator,
AsyncReduceIterator,
AsyncRepeatIterator,
AsyncSplitOnIterator,
AsyncStreamConstructors,
AsyncTakeIterator,
AsyncTakeWhileIterator,
} from '@rimbu/stream/async-custom';

@@ -794,26 +815,2 @@ import { Stream } from '@rimbu/stream';

class AsyncPrependIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly item: AsyncOptLazy<T>
) {
super();
this.return = async (): Promise<void> => {
if (this.prependDone) return closeIters(this.source);
};
}
prependDone = false;
fastNext<O>(otherwise?: AsyncOptLazy<O>): MaybePromise<T | O> {
if (this.prependDone) {
return this.source.fastNext(otherwise!);
}
this.prependDone = true;
return AsyncOptLazy.toMaybePromise(this.item);
}
}
class AsyncAppendStream<T> extends AsyncStreamBase<T> {

@@ -844,31 +841,2 @@ constructor(readonly source: AsyncStream<T>, readonly item: AsyncOptLazy<T>) {

class AsyncAppendIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly item: AsyncOptLazy<T>
) {
super();
this.return = async (): Promise<void> => {
if (!this.appendDone) return closeIters(source);
};
}
appendDone = false;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
if (this.appendDone) return AsyncOptLazy.toMaybePromise(otherwise!);
const done = Symbol('Done');
const value = await this.source.fastNext(done);
if (done !== value) return value;
this.appendDone = true;
return AsyncOptLazy.toMaybePromise(this.item);
}
}
class AsyncIndexedStream<T> extends AsyncStreamBase<[number, T]> {

@@ -891,23 +859,2 @@ constructor(readonly source: AsyncStream<T>, readonly startIndex: number) {

class AsyncIndexedIterator<T> extends AsyncFastIteratorBase<[number, T]> {
index: number;
constructor(readonly source: AsyncFastIterator<T>, readonly startIndex = 0) {
super();
this.index = startIndex;
this.return = (): Promise<void> => closeIters(source);
}
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<[number, T] | O> {
const done = Symbol('Done');
const value = await this.source.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return [this.index++, value];
}
}
class AsyncMapStream<T, T2> extends AsyncStreamBase<T2> {

@@ -965,27 +912,2 @@ constructor(

class AsyncMapIterator<T, T2> extends AsyncFastIteratorBase<T2> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly mapFun: (value: T, index: number) => MaybePromise<T2>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly state = TraverseState();
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O> {
const state = this.state;
const done = Symbol('Done');
const next = await this.source.fastNext(done);
if (done === next) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return this.mapFun(next, state.nextIndex());
}
}
class AsyncMapPureStream<

@@ -1041,26 +963,2 @@ T,

class AsyncMapPureIterator<
T,
A extends readonly unknown[],
T2
> extends AsyncFastIteratorBase<T2> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly mapFun: (value: T, ...args: A) => MaybePromise<T2>,
readonly args: A
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O> {
const done = Symbol('Done');
const next = await this.source.fastNext(done);
if (done === next) return AsyncOptLazy.toMaybePromise(otherwise!);
return this.mapFun(next, ...this.args);
}
}
class AsyncFlatMapStream<T, T2> extends AsyncStreamBase<T2> {

@@ -1083,104 +981,2 @@ constructor(

class AsyncFlatMapIterator<T, T2> extends AsyncFastIteratorBase<T2> {
iterator: AsyncFastIterator<T>;
constructor(
readonly source: AsyncStream<T>,
readonly flatMapFun: (
value: T,
index: number,
halt: () => void
) => AsyncStreamSource<T2>
) {
super();
this.iterator = this.source[Symbol.asyncIterator]();
this.return = (): Promise<void> =>
closeIters(this.currentIterator, this.iterator);
}
readonly state = TraverseState();
done = false;
currentIterator: null | AsyncFastIterator<T2> = null;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T2 | O> {
const state = this.state;
if (state.halted || this.done) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
let nextValue: T2 | typeof done = done;
while (
null === this.currentIterator ||
done === (nextValue = await this.currentIterator.fastNext(done))
) {
const nextIter = await this.iterator.fastNext(done);
if (done === nextIter) {
this.done = true;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const nextSource = this.flatMapFun(
nextIter,
state.nextIndex(),
state.halt
);
const currentIterator =
fromAsyncStreamSource(nextSource)[Symbol.asyncIterator]();
this.currentIterator = currentIterator;
}
return nextValue;
}
}
class AsyncConcatIterator<T> extends AsyncFastIteratorBase<T> {
iterator: AsyncFastIterator<T>;
constructor(
readonly source: AsyncStream<T>,
readonly otherSources: AsyncStreamSource<T>[]
) {
super();
this.iterator = source[Symbol.asyncIterator]();
this.return = (): Promise<void> => closeIters(this.iterator);
}
sourceIndex = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const done = Symbol('Done');
let value: T | typeof done;
const length = this.otherSources.length;
while (done === (value = await this.iterator.fastNext(done))) {
if (this.sourceIndex >= length) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
let nextSource: AsyncStreamSource<T> =
this.otherSources[this.sourceIndex++];
while (isEmptyAsyncStreamSourceInstance(nextSource)) {
if (this.sourceIndex >= length) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
nextSource = this.otherSources[this.sourceIndex++];
}
this.iterator = fromAsyncStreamSource(nextSource)[Symbol.asyncIterator]();
}
return value;
}
}
class AsyncConcatStream<T> extends AsyncStreamBase<T> {

@@ -1199,64 +995,2 @@ constructor(

class AsyncIntersperseIterator<T, S> extends AsyncFastIteratorBase<T | S> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly sepStream: AsyncStream<S>
) {
super();
this.return = async (): Promise<void> => {
if (!this.isDone) return closeIters(this.sepIterator, this.source);
};
}
sepIterator: AsyncFastIterator<S> | undefined;
nextValue!: T;
isInitialized = false;
isDone = false;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | S | O> {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
if (undefined !== this.sepIterator) {
const sepNext = await this.sepIterator.fastNext(done);
if (done !== sepNext) return sepNext;
this.sepIterator = undefined;
}
if (this.isInitialized) {
const newNextValue = await this.source.fastNext(done);
if (done === newNextValue) {
this.isDone = true;
return this.nextValue;
}
const currentNextValue = this.nextValue;
this.nextValue = newNextValue;
this.sepIterator = this.sepStream[Symbol.asyncIterator]();
return currentNextValue;
}
const nextValue = await this.source.fastNext(done);
if (done === nextValue) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const newNextValue = await this.source.fastNext(done);
if (done === newNextValue) {
return nextValue;
}
this.nextValue = newNextValue;
this.isInitialized = true;
this.sepIterator = this.sepStream[Symbol.asyncIterator]();
return nextValue;
}
}
class AsyncIntersperseStream<T> extends AsyncStreamBase<T> {

@@ -1278,45 +1012,2 @@ constructor(

class AsyncFilterIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (
value: T,
index: number,
halt: () => void
) => MaybePromise<boolean>,
readonly invert: boolean
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly state = TraverseState();
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const state = this.state;
if (state.halted) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const pred = this.pred;
const halt = state.halt;
const invert = this.invert;
while (!state.halted && done !== (value = await source.fastNext(done))) {
const cond = await pred(value, state.nextIndex(), halt);
if (cond !== invert) return value;
}
if (state.halted && done !== value!) {
await closeIters(this);
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
class AsyncFilterStream<T> extends AsyncStreamBase<T> {

@@ -1354,34 +1045,2 @@ constructor(

class AsyncFilterPureIterator<
T,
A extends readonly unknown[]
> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T, ...args: A) => MaybePromise<boolean>,
readonly args: A,
readonly invert: boolean
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const pred = this.pred;
const args = this.args;
const invert = this.invert;
while (done !== (value = await source.fastNext(done))) {
const cond = await pred(value, ...args);
if (cond !== invert) return value;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
class AsyncFilterPureStream<

@@ -1410,49 +1069,2 @@ T,

class AsyncCollectIterator<T, R> extends AsyncFastIteratorBase<R> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly collectFun: AsyncCollectFun<T, R>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly state = TraverseState();
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
const state = this.state;
if (state.halted) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const { halt } = state;
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const collectFun = this.collectFun;
try {
while (!state.halted && done !== (value = await source.fastNext(done))) {
const result = await collectFun(
value,
state.nextIndex(),
CollectFun.Skip,
halt
);
if (CollectFun.Skip === result) continue;
return result as R;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
} finally {
if (state.halted && done !== value!) {
await closeIters(this);
}
}
}
}
class AsyncCollectStream<T, R> extends AsyncStreamBase<R> {

@@ -1474,31 +1086,2 @@ constructor(

class AsyncIndicesWhereIterator<T> extends AsyncFastIteratorBase<number> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T) => MaybePromise<boolean>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<number | O> {
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const pred = this.pred;
while (done !== (value = await source.fastNext(done))) {
const cond = await pred(value);
if (cond) {
return this.index++;
}
this.index++;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
class AsyncIndicesWhereStream<T> extends AsyncStreamBase<number> {

@@ -1520,30 +1103,2 @@ constructor(

class AsyncIndicesOfIterator<T> extends AsyncFastIteratorBase<number> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly searchValue: T,
readonly eq: Eq<T>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<number | O> {
const done = Symbol('Done');
let value: T | typeof done;
const source = this.source;
const searchValue = this.searchValue;
const eq = this.eq;
while (done !== (value = await source.fastNext(done))) {
if (eq(searchValue, value)) return this.index++;
this.index++;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
class AsyncIndicesOfStream<T> extends AsyncStreamBase<number> {

@@ -1567,34 +1122,2 @@ constructor(

class AsyncTakeWhileIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T, index: number) => MaybePromise<boolean>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
isDone = false;
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
if (this.isDone) return AsyncOptLazy.toMaybePromise(otherwise!);
const done = Symbol('Done');
const next = await this.source.fastNext(done);
if (done === next) {
this.isDone = true;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
if (await this.pred(next, this.index++)) return next;
this.isDone = true;
await closeIters(this);
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
class AsyncTakeWhileStream<T> extends AsyncStreamBase<T> {

@@ -1616,30 +1139,2 @@ constructor(

class AsyncDropWhileIterator<T> extends AsyncFastIteratorBase<T> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T, index: number) => MaybePromise<boolean>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
pass = false;
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const source = this.source;
if (this.pass) return source.fastNext(otherwise!);
const done = Symbol('Done');
let value: T | typeof done;
while (done !== (value = await source.fastNext(done))) {
this.pass = !(await this.pred(value, this.index++));
if (this.pass) return value;
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
class AsyncDropWhileStream<T> extends AsyncStreamBase<T> {

@@ -1661,22 +1156,2 @@ constructor(

class AsyncTakeIterator<T> extends AsyncFastIteratorBase<T> {
constructor(readonly source: AsyncFastIterator<T>, readonly amount: number) {
super();
this.return = (): Promise<void> => closeIters(source);
}
i = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
if (this.i++ >= this.amount) {
await closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return this.source.fastNext(otherwise!);
}
}
class AsyncTakeStream<T> extends AsyncStreamBase<T> {

@@ -1700,30 +1175,2 @@ constructor(readonly source: AsyncStream<T>, readonly amount: number) {

class AsyncDropIterator<T> extends AsyncFastIteratorBase<T> {
remain: number;
constructor(readonly source: AsyncFastIterator<T>, readonly amount: number) {
super();
this.return = (): Promise<void> => closeIters(source);
this.remain = amount;
}
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const source = this.source;
if (this.remain <= 0) return source.fastNext(otherwise!);
const done = Symbol('Done');
let value: T | typeof done;
while (done !== (value = await source.fastNext(done))) {
if (this.remain-- <= 0) {
return value;
}
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
class AsyncDropStream<T> extends AsyncStreamBase<T> {

@@ -1747,95 +1194,2 @@ constructor(readonly source: AsyncStream<T>, readonly amount: number) {

class AsyncRepeatIterator<T> extends AsyncFastIteratorBase<T> {
iterator: AsyncFastIterator<T>;
remain?: number;
constructor(readonly source: AsyncStream<T>, readonly amount?: number) {
super();
this.iterator = source[Symbol.asyncIterator]();
this.return = (): Promise<void> => closeIters(this.iterator);
this.remain = amount;
}
isEmpty = true;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const done = Symbol('Done');
const iterator = this.iterator;
let value = await iterator.fastNext(done);
if (done !== value) {
this.isEmpty = false;
return value;
}
if (this.isEmpty) {
return AsyncOptLazy.toMaybePromise(otherwise!) as O;
}
if (undefined !== this.remain) {
this.remain--;
if (this.remain <= 0) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
this.iterator = this.source[Symbol.asyncIterator]();
value = await this.iterator.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return value;
}
}
class AsyncSplitWhereIterator<T> extends AsyncFastIteratorBase<T[]> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly pred: (value: T, index: number) => MaybePromise<boolean>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T[] | O> {
const startIndex = this.index;
if (startIndex < 0) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const result: T[] = [];
const source = this.source;
const done = Symbol('Done');
let value: T | typeof done;
const pred = this.pred;
while (done !== (value = await source.fastNext(done))) {
if (await pred(value, this.index++)) return result;
result.push(value);
}
this.return = undefined;
if (startIndex === this.index) {
this.index = -1;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
this.index = -1;
return result;
}
}
class AsyncSplitWhereStream<T> extends AsyncStreamBase<T[]> {

@@ -1857,97 +1211,2 @@ constructor(

class AsyncFoldIterator<I, R> extends AsyncFastIteratorBase<R> {
constructor(
readonly source: AsyncFastIterator<I>,
readonly init: AsyncOptLazy<R>,
readonly getNext: (
current: R,
value: I,
index: number,
halt: () => void
) => MaybePromise<R>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
current?: R;
isInitialized = false;
state = TraverseState();
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
if (!this.isInitialized) {
this.current = await AsyncOptLazy.toMaybePromise(this.init);
this.isInitialized = true;
}
const state = this.state;
if (state.halted) {
await closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('done');
const value = await this.source.fastNext(done);
if (done === value) {
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
this.current = await this.getNext(
this.current!,
value,
state.nextIndex(),
state.halt
);
return this.current;
}
}
class AsyncSplitOnIterator<T> extends AsyncFastIteratorBase<T[]> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly sepElem: T,
readonly eq: Eq<T>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
isDone = false;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T[] | O> {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const result: T[] = [];
const source = this.source;
const done = Symbol('Done');
let value: T | typeof done;
let processed = false;
const eq = this.eq;
const sepElem = this.sepElem;
while (done !== (value = await source.fastNext(done))) {
processed = true;
if (eq(value, sepElem)) return result;
result.push(value);
}
this.return = undefined;
this.isDone = true;
if (!processed) return AsyncOptLazy.toMaybePromise(otherwise!);
return result;
}
}
class AsyncSplitOnStream<T> extends AsyncStreamBase<T[]> {

@@ -1971,67 +1230,2 @@ constructor(

class AsyncReduceIterator<I, R> extends AsyncFastIteratorBase<R> {
state: unknown;
constructor(
readonly source: AsyncFastIterator<I>,
readonly reducer: AsyncReducer<I, R>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly traverseState = TraverseState();
isInitialized = false;
isDone = false;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
if (this.isDone) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
const value = await this.source.fastNext(done);
if (done === value) {
this.isDone = true;
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const reducer = this.reducer;
if (!this.isInitialized) {
this.state = await AsyncOptLazy.toMaybePromise(reducer.init);
this.isInitialized = true;
}
const traverseState = this.traverseState;
try {
this.state = await reducer.next(
this.state,
value,
traverseState.nextIndex(),
traverseState.halt
);
} catch (err) {
this.isDone = true;
await closeIters(this);
this.return = undefined;
await reducer.onClose?.(this.state, err);
throw err;
}
if (traverseState.halted) {
this.isDone = true;
await closeIters(this);
this.return = undefined;
await reducer.onClose?.(this.state);
}
return reducer.stateToResult(this.state);
}
}
class AsyncReduceStream<I, R> extends AsyncStreamBase<R> {

@@ -2053,90 +1247,2 @@ constructor(

class AsyncReduceAllIterator<I, R> extends AsyncFastIteratorBase<R> {
state?: unknown[];
readonly reducersToClose: Set<AsyncReducer<any>>;
constructor(
readonly source: AsyncFastIterator<I>,
readonly reducers: AsyncReducer<I, any>[]
) {
super();
this.reducersToClose = new Set(reducers);
this.return = async (): Promise<void> => {
await closeIters(source);
const state = this.state;
if (state) {
await Promise.all(
[...this.reducersToClose].map((reducer, index) =>
reducer.onClose?.(state[index], this.err)
)
);
}
};
}
index = 0;
isDone = false;
err: any;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
const reducers = this.reducers;
if (undefined === this.state) {
this.state = await Promise.all(
reducers.map((d: any): unknown => AsyncOptLazy.toMaybePromise(d.init))
);
}
if (this.isDone) {
await closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const done = Symbol('Done');
const value = await this.source.fastNext(done);
if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise!);
}
const state = this.state;
this.state = await Promise.all(
reducers.map((red, i) => {
const st = state[i];
if (!this.reducersToClose.has(red)) return st;
try {
return red.next(st, value, this.index, async () => {
this.reducersToClose.delete(red);
await red.onClose?.(st);
return st;
});
} catch (e) {
this.err = e;
}
})
);
this.index++;
this.isDone = this.reducersToClose.size === 0 || undefined !== this.err;
if (this.isDone) {
await closeIters(this);
this.return = undefined;
return AsyncOptLazy.toMaybePromise(otherwise!);
}
return Promise.all(
this.state.map((s, i) => reducers[i].stateToResult(s))
) as any;
}
}
class AsyncReduceAllStream<I, R> extends AsyncStreamBase<R> {

@@ -2143,0 +1249,0 @@ constructor(

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc