Socket
Socket
Sign inDemoInstall

@rimbu/stream

Package Overview
Dependencies
Maintainers
1
Versions
56
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@rimbu/stream - npm Package Compare versions

Comparing version 0.11.0 to 0.12.0

dist/main/async/async-transformer.js

1

dist/main/async/index.js

@@ -16,2 +16,3 @@ "use strict";

tslib_1.__exportStar(require("./interface"), exports);
tslib_1.__exportStar(require("./async-transformer"), exports);
//# sourceMappingURL=index.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.MapPureIterator = exports.MapIterator = exports.IndexedIterator = exports.AppendIterator = exports.PrependIterator = exports.ZipAllWithItererator = exports.ZipWithIterator = exports.UnfoldIterator = exports.RandomIntIterator = exports.RandomIterator = exports.RangeDownIterator = exports.RangeUpIterator = exports.FilterApplyIterator = exports.MapApplyIterator = exports.AlwaysIterator = exports.ArrayReverseIterator = exports.ArrayIterator = exports.ReduceAllIterator = exports.ReduceIterator = exports.SplitOnIterator = exports.SplitWhereIterator = exports.IntersperseIterator = exports.RepeatIterator = exports.DropIterator = exports.TakeIterator = exports.DropWhileIterator = exports.TakeWhileIterator = exports.IndicesOfIterator = exports.IndicesWhereIterator = exports.CollectIterator = exports.FilterPureIterator = exports.FilterIterator = exports.ConcatIterator = exports.FlatMapIterator = exports.FastIteratorBase = exports.isFastIterator = exports.emptyFastIterator = exports.fixedDoneIteratorResult = void 0;
exports.WindowIterator = exports.DistinctPreviousIterator = exports.MapPureIterator = exports.MapIterator = exports.IndexedIterator = exports.AppendIterator = exports.PrependIterator = exports.ZipAllWithItererator = exports.ZipWithIterator = exports.UnfoldIterator = exports.RandomIntIterator = exports.RandomIterator = exports.RangeDownIterator = exports.RangeUpIterator = exports.FilterApplyIterator = exports.MapApplyIterator = exports.AlwaysIterator = exports.ArrayReverseIterator = exports.ArrayIterator = exports.ReduceAllIterator = exports.ReduceIterator = exports.SplitOnIterator = exports.SplitWhereIterator = exports.IntersperseIterator = exports.RepeatIterator = exports.DropIterator = exports.TakeIterator = exports.DropWhileIterator = exports.TakeWhileIterator = exports.IndicesOfIterator = exports.IndicesWhereIterator = exports.CollectIterator = exports.FilterPureIterator = exports.FilterIterator = exports.ConcatIterator = exports.FlatMapIterator = exports.FastIteratorBase = exports.isFastIterator = exports.emptyFastIterator = exports.fixedDoneIteratorResult = void 0;
var tslib_1 = require("tslib");

@@ -919,2 +919,94 @@ var base_1 = require("@rimbu/base");

exports.MapPureIterator = MapPureIterator;
var DistinctPreviousIterator = /** @class */ (function (_super) {
tslib_1.__extends(DistinctPreviousIterator, _super);
function DistinctPreviousIterator(source, eq) {
var _this = _super.call(this) || this;
_this.source = source;
_this.eq = eq;
_this.previous = [];
return _this;
}
DistinctPreviousIterator.prototype.fastNext = function (otherwise) {
var done = Symbol('Done');
var next;
var source = this.source;
var previous = this.previous;
while (done !== (next = source.fastNext(done))) {
previous.push(next);
if (previous.length === 1) {
return next;
}
var prev = previous.shift();
if (!this.eq(prev, next)) {
return next;
}
}
return (0, common_1.OptLazy)(otherwise);
};
return DistinctPreviousIterator;
}(FastIteratorBase));
exports.DistinctPreviousIterator = DistinctPreviousIterator;
var WindowIterator = /** @class */ (function (_super) {
tslib_1.__extends(WindowIterator, _super);
function WindowIterator(source, windowSize, skipAmount, collector) {
var _this = _super.call(this) || this;
_this.source = source;
_this.windowSize = windowSize;
_this.skipAmount = skipAmount;
_this.collector = collector;
_this.state = new Set();
_this.index = 0;
return _this;
}
WindowIterator.prototype.fastNext = function (otherwise) {
var e_1, _a;
var source = this.source;
var collector = this.collector;
var windowSize = this.windowSize;
var skipAmount = this.skipAmount;
var done = Symbol('Done');
var state = this.state;
var next;
var result = done;
while (done !== (next = source.fastNext(done))) {
try {
for (var state_1 = (e_1 = void 0, tslib_1.__values(state)), state_1_1 = state_1.next(); !state_1_1.done; state_1_1 = state_1.next()) {
var current = state_1_1.value;
current.result = collector.next(current.result, next, current.size, current.halt);
current.size++;
if (current.size >= windowSize || current.halted) {
result = collector.stateToResult(current.result);
state.delete(current);
}
}
}
catch (e_1_1) { e_1 = { error: e_1_1 }; }
finally {
try {
if (state_1_1 && !state_1_1.done && (_a = state_1.return)) _a.call(state_1);
}
finally { if (e_1) throw e_1.error; }
}
if (this.index % skipAmount === 0) {
var newState = {
result: (0, common_1.OptLazy)(collector.init),
size: 1,
halted: false,
halt: function () {
this.halted = true;
},
};
newState.result = collector.next((0, common_1.OptLazy)(collector.init), next, 0, newState.halt);
state.add(newState);
}
this.index++;
if (done !== result) {
return result;
}
}
return (0, common_1.OptLazy)(otherwise);
};
return WindowIterator;
}(FastIteratorBase));
exports.WindowIterator = WindowIterator;
//# sourceMappingURL=fast-iterator-custom.js.map

@@ -19,3 +19,4 @@ "use strict";

tslib_1.__exportStar(require("./interface"), exports);
tslib_1.__exportStar(require("./transformer"), exports);
tslib_1.__exportStar(require("../async"), exports);
//# sourceMappingURL=index.js.map

@@ -925,2 +925,81 @@ import { __awaiter } from "tslib";

}
export class AsyncDistinctPreviousIterator extends AsyncFastIteratorBase {
constructor(source, eq) {
super();
this.source = source;
this.eq = eq;
this.previous = [];
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const done = Symbol('Done');
let next;
const source = this.source;
const previous = this.previous;
while (done !== (next = yield source.fastNext(done))) {
previous.push(next);
if (previous.length === 1) {
return next;
}
const prev = previous.shift();
if (!this.eq(prev, next)) {
return next;
}
}
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
export class AsyncWindowIterator extends AsyncFastIteratorBase {
constructor(source, windowSize, skipAmount, collector) {
super();
this.source = source;
this.windowSize = windowSize;
this.skipAmount = skipAmount;
this.collector = collector;
this.state = new Set();
this.index = 0;
this.return = () => closeIters(source);
}
fastNext(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const source = this.source;
const collector = this.collector;
const windowSize = this.windowSize;
const skipAmount = this.skipAmount;
const done = Symbol('Done');
const state = this.state;
let next;
let result = done;
while (done !== (next = yield source.fastNext(done))) {
for (const current of state) {
current.result = yield collector.next(current.result, next, current.size, current.halt);
current.size++;
if (current.size >= windowSize || current.halted) {
result = yield collector.stateToResult(current.result);
state.delete(current);
}
}
if (this.index % skipAmount === 0) {
const newState = {
result: yield AsyncOptLazy.toMaybePromise(collector.init),
size: 1,
halted: false,
halt() {
this.halted = true;
},
};
newState.result = yield collector.next(AsyncOptLazy.toMaybePromise(collector.init), next, 0, newState.halt);
state.add(newState);
}
this.index++;
if (done !== result) {
return result;
}
}
return AsyncOptLazy.toMaybePromise(otherwise);
});
}
}
//# sourceMappingURL=async-fast-iterator-base.js.map
import { __awaiter } from "tslib";
import { AsyncOptLazy, Comp, Eq, TraverseState, } from '@rimbu/common';
import { AsyncOptLazy, AsyncReducer, Comp, Eq, TraverseState, } from '@rimbu/common';
import { RimbuError } from '@rimbu/base';
import { closeIters, emptyAsyncFastIterator, FromAsyncIterator, FromIterator, FromPromise, isAsyncFastIterator, FromResourceIterator, AsyncUnfoldIterator, AsyncZipWithIterator, AsyncZipAllWithItererator, AsyncAppendIterator, AsyncCollectIterator, AsyncConcatIterator, AsyncDropIterator, AsyncSplitWhereIterator, AsyncDropWhileIterator, AsyncFilterIterator, AsyncFilterPureIterator, AsyncFlatMapIterator, AsyncFoldIterator, AsyncIndexedIterator, AsyncIndicesOfIterator, AsyncIndicesWhereIterator, AsyncIntersperseIterator, AsyncMapIterator, AsyncMapPureIterator, AsyncOfIterator, AsyncPrependIterator, AsyncReduceAllIterator, AsyncReduceIterator, AsyncRepeatIterator, AsyncSplitOnIterator, AsyncTakeIterator, AsyncTakeWhileIterator, } from '@rimbu/stream/async-custom';
import { closeIters, emptyAsyncFastIterator, FromAsyncIterator, FromIterator, FromPromise, isAsyncFastIterator, FromResourceIterator, AsyncUnfoldIterator, AsyncZipWithIterator, AsyncZipAllWithItererator, AsyncAppendIterator, AsyncCollectIterator, AsyncConcatIterator, AsyncDropIterator, AsyncSplitWhereIterator, AsyncDistinctPreviousIterator, AsyncDropWhileIterator, AsyncFilterIterator, AsyncFilterPureIterator, AsyncFlatMapIterator, AsyncFoldIterator, AsyncIndexedIterator, AsyncIndicesOfIterator, AsyncIndicesWhereIterator, AsyncIntersperseIterator, AsyncMapIterator, AsyncMapPureIterator, AsyncOfIterator, AsyncPrependIterator, AsyncReduceAllIterator, AsyncReduceIterator, AsyncRepeatIterator, AsyncSplitOnIterator, AsyncTakeIterator, AsyncTakeWhileIterator, AsyncWindowIterator, } from '@rimbu/stream/async-custom';
import { Stream } from '@rimbu/stream';

@@ -104,4 +104,4 @@ import { isEmptyStreamSourceInstance } from '@rimbu/stream/custom';

}
flatReduceStream(reducer) {
return AsyncStreamConstructorsImpl.flatten(this.reduceStream(reducer));
transform(transformer) {
return AsyncStreamConstructorsImpl.flatten(this.reduceStream(transformer));
}

@@ -129,3 +129,3 @@ filter(pred) {

if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
}

@@ -146,6 +146,20 @@ yield closeIters(iter);

if (done === lastValue)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
return lastValue;
});
}
single(otherwise) {
return __awaiter(this, void 0, void 0, function* () {
const iterator = this[Symbol.asyncIterator]();
const done = Symbol('Done');
const value = yield iterator.fastNext(done);
if (done !== value) {
if (done === (yield iterator.fastNext(done))) {
return value;
}
}
yield closeIters(iterator);
return AsyncOptLazy.toPromise(otherwise);
});
}
count() {

@@ -208,3 +222,3 @@ return __awaiter(this, void 0, void 0, function* () {

if (occurrance <= 0)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
const done = Symbol('Done');

@@ -222,3 +236,3 @@ const iterator = this[Symbol.asyncIterator]();

}
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
}

@@ -234,3 +248,3 @@ finally {

if (index < 0)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
const done = Symbol('Done');

@@ -247,3 +261,3 @@ const iterator = this[Symbol.asyncIterator]();

}
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
}

@@ -350,2 +364,24 @@ finally {

}
containsSlice(source, eq = Eq.objectIs) {
return __awaiter(this, void 0, void 0, function* () {
const iterator = this[Symbol.asyncIterator]();
const sourceStream = fromAsyncStreamSource(source);
let sourceIterator = sourceStream[Symbol.asyncIterator]();
const done = Symbol('Done');
while (true) {
const sourceValue = yield sourceIterator.fastNext(done);
if (done === sourceValue) {
yield closeIters(iterator);
return true;
}
const value = yield iterator.fastNext(done);
if (done === value) {
return false;
}
if (!eq(sourceValue, value)) {
sourceIterator = sourceStream[Symbol.asyncIterator]();
}
}
});
}
takeWhile(pred) {

@@ -390,3 +426,3 @@ return new AsyncTakeWhileStream(this, pred);

if (done === result)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
while (done !== (value = yield iterator.fastNext(done))) {

@@ -416,3 +452,3 @@ if (compare(value, result) < 0)

if (done === result)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
while (done !== (value = yield iterator.fastNext(done))) {

@@ -468,2 +504,8 @@ if (compare(value, result) > 0)

}
distinctPrevious(eq = Eq.objectIs) {
return new AsyncDistinctPreviousStream(this, eq);
}
window(windowSize, skipAmount = windowSize, collector = AsyncReducer.toArray()) {
return new AsyncWindowStream(this, windowSize, skipAmount, collector);
}
fold(init, next) {

@@ -616,3 +658,3 @@ return __awaiter(this, void 0, void 0, function* () {

return __awaiter(this, void 0, void 0, function* () {
return AsyncOptLazy.toMaybePromise(this.item);
return AsyncOptLazy.toPromise(this.item);
});

@@ -645,3 +687,3 @@ }

return __awaiter(this, void 0, void 0, function* () {
return AsyncOptLazy.toMaybePromise(this.item);
return AsyncOptLazy.toPromise(this.item);
});

@@ -682,3 +724,3 @@ }

if (done === value)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
return this.mapFun(value, 0);

@@ -692,3 +734,3 @@ });

if (done === value)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
return this.mapFun(value, 0);

@@ -707,3 +749,3 @@ });

if (done === value)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
return this.mapFun(value, index);

@@ -731,3 +773,3 @@ });

if (done === value)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
return this.mapFun(value, ...this.args);

@@ -741,3 +783,3 @@ });

if (done === value)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
return this.mapFun(value, ...this.args);

@@ -756,3 +798,3 @@ });

if (done === value)
return AsyncOptLazy.toMaybePromise(otherwise);
return AsyncOptLazy.toPromise(otherwise);
return this.mapFun(value, ...this.args);

@@ -920,2 +962,24 @@ });

}
class AsyncDistinctPreviousStream extends AsyncStreamBase {
constructor(source, eq) {
super();
this.source = source;
this.eq = eq;
}
[Symbol.asyncIterator]() {
return new AsyncDistinctPreviousIterator(this.source[Symbol.asyncIterator](), this.eq);
}
}
class AsyncWindowStream extends AsyncStreamBase {
constructor(source, windowSize, skipAmount, collector) {
super();
this.source = source;
this.windowSize = windowSize;
this.skipAmount = skipAmount;
this.collector = collector;
}
[Symbol.asyncIterator]() {
return new AsyncWindowIterator(this.source[Symbol.asyncIterator](), this.windowSize, this.skipAmount, this.collector);
}
}
class AsyncReduceStream extends AsyncStreamBase {

@@ -957,2 +1021,20 @@ constructor(source, reducerDef) {

}
asyncStream() {
return this;
}
equals(other) {
return __awaiter(this, void 0, void 0, function* () {
if (other === this)
return true;
const done = Symbol('done');
return (done ===
fromAsyncStreamSource(other)[Symbol.asyncIterator]().fastNext(done));
});
}
prepend(value) {
return AsyncStreamConstructorsImpl.of(value);
}
append(value) {
return AsyncStreamConstructorsImpl.of(value);
}
assumeNonEmpty() {

@@ -966,2 +1048,7 @@ RimbuError.throwEmptyCollectionAssumedNonEmptyError();

}
forEachPure() {
return __awaiter(this, void 0, void 0, function* () {
//
});
}
indexed() {

@@ -979,2 +1066,10 @@ return this;

}
flatZip() {
return this;
}
transform(transformer) {
return AsyncStreamConstructorsImpl.from(() => __awaiter(this, void 0, void 0, function* () {
return yield transformer.stateToResult(yield AsyncOptLazy.toMaybePromise(transformer.init));
}));
}
filter() {

@@ -1001,2 +1096,5 @@ return this;

}
single(otherwise) {
return AsyncOptLazy.toPromise(otherwise);
}
count() {

@@ -1054,2 +1152,7 @@ return __awaiter(this, void 0, void 0, function* () {

}
containsSlice() {
return __awaiter(this, void 0, void 0, function* () {
return false;
});
}
takeWhile() {

@@ -1103,2 +1206,14 @@ return this;

}
splitOn() {
return this;
}
splitWhere() {
return this;
}
distinctPrevious() {
return this;
}
window() {
return this;
}
fold(init) {

@@ -1112,3 +1227,3 @@ return AsyncOptLazy.toPromise(init);

return __awaiter(this, void 0, void 0, function* () {
return reducer.stateToResult(yield AsyncOptLazy.toMaybePromise(reducer.init));
return reducer.stateToResult(yield AsyncOptLazy.toPromise(reducer.init));
});

@@ -1133,2 +1248,10 @@ }

}
toJSON() {
return __awaiter(this, void 0, void 0, function* () {
return {
dataType: 'AsyncStream',
value: [],
};
});
}
}

@@ -1135,0 +1258,0 @@ export class FromSource extends AsyncStreamBase {

@@ -13,2 +13,3 @@ /**

export * from './interface';
export * from './async-transformer';
//# sourceMappingURL=index.js.map

@@ -767,2 +767,75 @@ import { Token } from '@rimbu/base';

}
export class DistinctPreviousIterator extends FastIteratorBase {
constructor(source, eq) {
super();
this.source = source;
this.eq = eq;
this.previous = [];
}
fastNext(otherwise) {
const done = Symbol('Done');
let next;
const source = this.source;
const previous = this.previous;
while (done !== (next = source.fastNext(done))) {
previous.push(next);
if (previous.length === 1) {
return next;
}
const prev = previous.shift();
if (!this.eq(prev, next)) {
return next;
}
}
return OptLazy(otherwise);
}
}
export class WindowIterator extends FastIteratorBase {
constructor(source, windowSize, skipAmount, collector) {
super();
this.source = source;
this.windowSize = windowSize;
this.skipAmount = skipAmount;
this.collector = collector;
this.state = new Set();
this.index = 0;
}
fastNext(otherwise) {
const source = this.source;
const collector = this.collector;
const windowSize = this.windowSize;
const skipAmount = this.skipAmount;
const done = Symbol('Done');
const state = this.state;
let next;
let result = done;
while (done !== (next = source.fastNext(done))) {
for (const current of state) {
current.result = collector.next(current.result, next, current.size, current.halt);
current.size++;
if (current.size >= windowSize || current.halted) {
result = collector.stateToResult(current.result);
state.delete(current);
}
}
if (this.index % skipAmount === 0) {
const newState = {
result: OptLazy(collector.init),
size: 1,
halted: false,
halt() {
this.halted = true;
},
};
newState.result = collector.next(OptLazy(collector.init), next, 0, newState.halt);
state.add(newState);
}
this.index++;
if (done !== result) {
return result;
}
}
return OptLazy(otherwise);
}
}
//# sourceMappingURL=fast-iterator-custom.js.map
import { Comp, Eq, ErrBase, IndexRange, OptLazy, Range, Reducer, TraverseState, } from '@rimbu/common';
import { isFastIterator, emptyFastIterator, FlatMapIterator, ConcatIterator, FilterIterator, FilterPureIterator, IndicesWhereIterator, IndicesOfIterator, TakeWhileIterator, DropWhileIterator, TakeIterator, DropIterator, IntersperseIterator, SplitWhereIterator, SplitOnIterator, ReduceIterator, ReduceAllIterator, RepeatIterator, CollectIterator, ArrayIterator, ArrayReverseIterator, AlwaysIterator, MapApplyIterator, FilterApplyIterator, RangeDownIterator, RangeUpIterator, RandomIntIterator, RandomIterator, UnfoldIterator, ZipAllWithItererator, ZipWithIterator, AppendIterator, IndexedIterator, MapIterator, MapPureIterator, PrependIterator, } from '@rimbu/stream/custom';
import { isFastIterator, emptyFastIterator, FlatMapIterator, ConcatIterator, FilterIterator, FilterPureIterator, IndicesWhereIterator, IndicesOfIterator, TakeWhileIterator, DropWhileIterator, TakeIterator, DropIterator, IntersperseIterator, SplitWhereIterator, SplitOnIterator, ReduceIterator, ReduceAllIterator, RepeatIterator, CollectIterator, ArrayIterator, ArrayReverseIterator, AlwaysIterator, MapApplyIterator, FilterApplyIterator, RangeDownIterator, RangeUpIterator, RandomIntIterator, RandomIterator, UnfoldIterator, ZipAllWithItererator, ZipWithIterator, AppendIterator, IndexedIterator, MapIterator, MapPureIterator, PrependIterator, DistinctPreviousIterator, WindowIterator, } from '@rimbu/stream/custom';
import { RimbuError } from '@rimbu/base';

@@ -70,4 +70,4 @@ export class StreamBase {

}
flatReduceStream(reducer) {
return StreamConstructorsImpl.flatten(this.reduceStream(reducer));
transform(transformer) {
return StreamConstructorsImpl.flatten(this.reduceStream(transformer));
}

@@ -104,2 +104,13 @@ filter(pred) {

}
single(otherwise) {
const iterator = this[Symbol.iterator]();
const done = Symbol('Done');
const value = iterator.fastNext(done);
if (done !== value) {
if (done === iterator.fastNext(done)) {
return value;
}
}
return OptLazy(otherwise);
}
count() {

@@ -224,2 +235,19 @@ let result = 0;

}
containsSlice(source, eq = Eq.objectIs) {
const iterator = this[Symbol.iterator]();
const sourceStream = fromStreamSource(source);
let sourceIterator = sourceStream[Symbol.iterator]();
const done = Symbol('Done');
while (true) {
const sourceValue = sourceIterator.fastNext(done);
if (done === sourceValue)
return true;
const value = iterator.fastNext(done);
if (done === value)
return false;
if (!eq(sourceValue, value)) {
sourceIterator = sourceStream[Symbol.iterator]();
}
}
}
takeWhile(pred) {

@@ -314,2 +342,8 @@ return new TakeWhileStream(this, pred);

}
distinctPrevious(eq = Eq.objectIs) {
return new DistinctPreviousStream(this, eq);
}
window(windowSize, skipAmount = windowSize, collector = Reducer.toArray()) {
return new WindowStream(this, windowSize, skipAmount, collector);
}
fold(init, next) {

@@ -801,8 +835,24 @@ return this.reduce(Reducer.createOutput(init, next));

}
stream() {
return this;
}
assumeNonEmpty() {
RimbuError.throwEmptyCollectionAssumedNonEmptyError();
}
equals(other) {
const done = Symbol('Done');
return done === fromStreamSource(other)[Symbol.iterator]().fastNext(done);
}
prepend(value) {
return StreamConstructorsImpl.of(OptLazy(value));
}
append(value) {
return StreamConstructorsImpl.of(OptLazy(value));
}
forEach() {
//
}
forEachPure() {
//
}
indexed() {

@@ -823,4 +873,4 @@ return this;

}
flatReduceStream(reducer) {
return StreamConstructorsImpl.from(reducer.stateToResult(OptLazy(reducer.init)));
transform(transformer) {
return StreamConstructorsImpl.from(transformer.stateToResult(OptLazy(transformer.init)));
}

@@ -848,2 +898,5 @@ filter() {

}
single(otherwise) {
return OptLazy(otherwise);
}
count() {

@@ -885,2 +938,5 @@ return 0;

}
containsSlice() {
return false;
}
takeWhile() {

@@ -932,2 +988,14 @@ return this;

}
splitOn() {
return this;
}
splitWhere() {
return this;
}
distinctPrevious() {
return this;
}
window() {
return this;
}
fold(init) {

@@ -957,2 +1025,8 @@ return OptLazy(init);

}
toJSON() {
return {
dataType: 'Stream',
value: [],
};
}
}

@@ -1227,2 +1301,24 @@ export class ArrayStream extends StreamBase {

}
export class DistinctPreviousStream extends StreamBase {
constructor(source, eq) {
super();
this.source = source;
this.eq = eq;
}
[Symbol.iterator]() {
return new DistinctPreviousIterator(this.source[Symbol.iterator](), this.eq);
}
}
export class WindowStream extends StreamBase {
constructor(source, windowSize, skipAmount, collector) {
super();
this.source = source;
this.windowSize = windowSize;
this.skipAmount = skipAmount;
this.collector = collector;
}
[Symbol.iterator]() {
return new WindowIterator(this.source[Symbol.iterator](), this.windowSize, this.skipAmount, this.collector);
}
}
export const emptyStream = Object.freeze(new EmptyStream());

@@ -1229,0 +1325,0 @@ export function isStream(obj) {

@@ -16,3 +16,4 @@ /**

export * from './interface';
export * from './transformer';
export * from '../async';
//# sourceMappingURL=index.js.map

@@ -271,1 +271,23 @@ import { Token } from '@rimbu/base';

}
export declare class AsyncDistinctPreviousIterator<T> extends AsyncFastIteratorBase<T> {
readonly source: AsyncFastIterator<T>;
readonly eq: Eq<T>;
constructor(source: AsyncFastIterator<T>, eq: Eq<T>);
readonly previous: T[];
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
}
export declare class AsyncWindowIterator<T, R> extends AsyncFastIteratorBase<R> {
readonly source: AsyncFastIterator<T>;
readonly windowSize: number;
readonly skipAmount: number;
readonly collector: AsyncReducer<T, R>;
constructor(source: AsyncFastIterator<T>, windowSize: number, skipAmount: number, collector: AsyncReducer<T, R>);
state: Set<{
result: unknown;
size: number;
halted: boolean;
halt: () => void;
}>;
index: number;
fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O>;
}

@@ -19,3 +19,3 @@ import { ArrayNonEmpty, AsyncCollectFun, AsyncOptLazy, AsyncReducer, Eq, MaybePromise, ToJSON, TraverseState } from '@rimbu/common';

flatZip<T2>(flatMapFun: (value: T, index: number, halt: () => void) => AsyncStreamSource<T2>): AsyncStream<[T, T2]>;
flatReduceStream<R>(reducer: AsyncReducer<T, AsyncStreamSource<R>>): AsyncStream<R>;
transform<R>(transformer: AsyncReducer<T, AsyncStreamSource<R>>): AsyncStream<R>;
filter(pred: (value: T, index: number, halt: () => void) => MaybePromise<boolean>): AsyncStream<T>;

@@ -28,2 +28,3 @@ filterNot(pred: (value: T, index: number, halt: () => void) => MaybePromise<boolean>): AsyncStream<T>;

last<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
single<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O>;
count(): Promise<number>;

@@ -41,2 +42,3 @@ countElement(value: T, eq?: Eq<any>): Promise<number>;

contains(searchValue: T, amount?: number, eq?: Eq<T>): Promise<boolean>;
containsSlice(source: AsyncStreamSource.NonEmpty<T>, eq?: Eq<any>): Promise<boolean>;
takeWhile(pred: (value: T, index: number) => MaybePromise<boolean>): AsyncStream<T>;

@@ -67,2 +69,4 @@ dropWhile(pred: (value: T, index: number) => MaybePromise<boolean>): AsyncStream<T>;

splitOn(sepElem: T, eq?: Eq<T>): AsyncStream<T[]>;
distinctPrevious(eq?: Eq<T>): AsyncStream<T>;
window<R>(windowSize: number, skipAmount?: number, collector?: AsyncReducer<T, R>): AsyncStream<R>;
fold<R>(init: AsyncOptLazy<R>, next: (current: R, value: T, index: number, halt: () => void) => MaybePromise<R>): Promise<R>;

@@ -69,0 +73,0 @@ foldStream<R>(init: AsyncOptLazy<R>, next: (current: R, value: T, index: number, halt: () => void) => MaybePromise<R>): AsyncStream<R>;

16

dist/types/async-custom/constructors.d.ts

@@ -26,6 +26,6 @@ import type { Token } from '@rimbu/base';

*/
zipWith<I extends readonly unknown[]>(...sources: {
zipWith<I extends readonly [unknown, ...unknown[]]>(...sources: {
[K in keyof I]: AsyncStreamSource.NonEmpty<I[K]>;
} & unknown[]): <R>(zipFun: (...values: I) => R) => AsyncStream.NonEmpty<R>;
zipWith<I extends readonly unknown[]>(...sources: {
zipWith<I extends readonly [unknown, ...unknown[]]>(...sources: {
[K in keyof I]: AsyncStreamSource<I[K]>;

@@ -47,6 +47,6 @@ } & unknown[]): <R>(zipFun: (...values: I) => R) => AsyncStream<R>;

*/
zip<I extends readonly unknown[]>(...sources: {
zip<I extends readonly [unknown, ...unknown[]]>(...sources: {
[K in keyof I]: AsyncStreamSource.NonEmpty<I[K]>;
} & unknown[]): AsyncStream.NonEmpty<I>;
zip<I extends readonly unknown[]>(...sources: {
zip<I extends readonly [unknown, ...unknown[]]>(...sources: {
[K in keyof I]: AsyncStreamSource<I[K]>;

@@ -73,3 +73,3 @@ } & unknown[]): AsyncStream<I>;

*/
zipAllWith<I extends readonly unknown[]>(...sources: {
zipAllWith<I extends readonly [unknown, ...unknown[]]>(...sources: {
[K in keyof I]: AsyncStreamSource.NonEmpty<I[K]>;

@@ -79,3 +79,3 @@ } & unknown[]): <O, R>(fillValue: AsyncOptLazy<O>, zipFun: (...values: {

}) => MaybePromise<R>) => AsyncStream.NonEmpty<R>;
zipAllWith<I extends readonly unknown[]>(...sources: {
zipAllWith<I extends readonly [unknown, ...unknown[]]>(...sources: {
[K in keyof I]: AsyncStreamSource<I[K]>;

@@ -102,3 +102,3 @@ } & unknown[]): <O, R>(fillValue: AsyncOptLazy<O>, zipFun: (...values: {

*/
zipAll<I extends readonly unknown[], O>(fillValue: AsyncOptLazy<O>, ...sources: {
zipAll<I extends readonly [unknown, ...unknown[]], O>(fillValue: AsyncOptLazy<O>, ...sources: {
[K in keyof I]: AsyncStreamSource.NonEmpty<I[K]>;

@@ -108,3 +108,3 @@ } & unknown[]): AsyncStream.NonEmpty<{

}>;
zipAll<I extends readonly unknown[], O>(fillValue: AsyncOptLazy<O>, ...sources: {
zipAll<I extends readonly [unknown, ...unknown[]], O>(fillValue: AsyncOptLazy<O>, ...sources: {
[K in keyof I]: AsyncStreamSource<I[K]>;

@@ -111,0 +111,0 @@ } & unknown[]): AsyncStream<{

@@ -13,1 +13,2 @@ /**

export * from './interface';
export * from './async-transformer';
import type { ArrayNonEmpty, AsyncCollectFun, AsyncOptLazy, AsyncReducer, Eq, MaybePromise, ToJSON, TraverseState } from '@rimbu/common';
import type { AsyncFastIterable, AsyncStreamable, AsyncStreamSource } from '@rimbu/stream/async';
import type { AsyncFastIterable, AsyncStreamable, AsyncStreamSource, AsyncTransformer } from '@rimbu/stream/async';
import type { AsyncStreamConstructors } from '@rimbu/stream/async-custom';

@@ -190,12 +190,11 @@ /**

* @typeparam R - the resulting element type
* @param reducer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R
*
* @param transformer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R
* @note O(1)
* @example
* ```ts
* await AsyncStream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray()
* await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* ```
*/
flatReduceStream<R>(reducer: AsyncReducer<T, AsyncStreamSource<R>>): AsyncStream<R>;
transform<R>(transformer: AsyncTransformer<T, R>): AsyncStream<R>;
/**

@@ -282,5 +281,19 @@ * Returns an AsyncStream containing only those elements from this stream for which the given `pred` function returns true.

*/
first(): MaybePromise<T | undefined>;
first(): Promise<T | undefined>;
first<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;
/**
* Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value.
* @typeparam O - the optional value to return if the stream does not have exactly one value.
* @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream does not have exactly one value.
* @example
* ```ts
* await AsyncStream.empty<number>().single() // => undefined
* await AsyncStream.of(1, 2, 3).single() // => undefined
* await AsyncStream.of(1).single() // => 1
* await AsyncStream.of(1, 2, 3).single(0) // => 0
* ```
*/
single(): Promise<T | undefined>;
single<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;
/**
* Returns the last element of the AsyncStream, or a fallback value (default undefined) if the stream is empty.

@@ -460,2 +473,16 @@ * @typeparam O - the optional value type to return if the stream is empty

/**
* Returns true if this stream contains the same sequence of elements as the given `source`,
* false otherwise.
* @param source - a non-empty async stream source containing the element sequence to find
* @param eq - (default: `Eq.objectIs`) the function to use to test element equality
* @example
* ```ts
* await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4])
* // => true
* await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2])
* // => false
* ```
*/
containsSlice(source: AsyncStreamSource.NonEmpty<T>, eq?: Eq<T>): Promise<boolean>;
/**
* Returns an AsyncStream that contains the elements of this stream up to the first element that does not satisfy given `pred` function.

@@ -673,2 +700,32 @@ * @param pred - a potentially asynchronous predicate function taking an element and its index

/**
* Returns an AsyncStream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - (default: `Eq.objectIs`) the `Eq` instance to use to test equality of elements
* @example
* ```ts
* await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): AsyncStream<T>;
/**
* Returns an AsyncStream containing `windows` of `windowSize` consecutive elements of the source stream, with each
* window starting `skipAmount` elements after the previous one.
* @typeparam R - the collector reducer result type
* @param windowSize - the size in elements of the windows
* @param skipAmount - (default: `windowSize`) the amount of elements to skip to start the next window
* @param collector - (default: `AsyncArray.toArray()`) the async reducer to use to collect the window values
* @example
* ```ts
* await Stream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* await Stream.of(1, 2, 3, 4, 5).window(3, 1).toArray()
* // => [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
* await Stream.of(1, 2, 3, 4).window(2, 2, AsyncReducer.toJSSet()).toArray()
* // => [Set(1, 2), Set(3, 4)]
* ```
*/
window(windowSize: number, skipAmount?: number): AsyncStream<T[]>;
window<R>(windowSize: number, skipAmount?: number, collector?: AsyncReducer<T, R>): AsyncStream<R>;
/**
* Returns the value resulting from applying the given the given `next` function to a current state (initially the given `init` value),

@@ -928,13 +985,12 @@ * and the next stream value, and returning the new state. When all elements are processed, the resulting state is returned.

* @typeparam R - the resulting element type
* @param reducer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R
*
* @param transformer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R.
* @note O(1)
* @example
* ```ts
* await AsyncStream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray()
* await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* ```
*/
flatReduceStream<R>(reducer: AsyncReducer<T, AsyncStreamSource.NonEmpty<R>>): AsyncStream.NonEmpty<R>;
flatReduceStream<R>(reducer: AsyncReducer<T, AsyncStreamSource<R>>): AsyncStream<R>;
transform<R>(transformer: AsyncTransformer.NonEmpty<T, R>): AsyncStream.NonEmpty<R>;
transform<R>(transformer: AsyncTransformer<T, R>): AsyncStream<R>;
/**

@@ -1056,2 +1112,13 @@ * Returns the first element of the AsyncStream.

/**
* Returns a non-empty AsyncStream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - (default: `Eq.objectIs`) the `Eq` instance to use to test equality of elements
* @example
* ```ts
* await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): AsyncStream.NonEmpty<T>;
/**
* Returns an AsyncStream containing the values resulting from applying the given the given `next` function to a current state (initially the given `init` value),

@@ -1058,0 +1125,0 @@ * and the next stream value, and returning the new state.

@@ -285,1 +285,23 @@ import { Token } from '@rimbu/base';

}
export declare class DistinctPreviousIterator<T> extends FastIteratorBase<T> {
readonly source: FastIterator<T>;
readonly eq: Eq<T>;
constructor(source: FastIterator<T>, eq: Eq<T>);
readonly previous: T[];
fastNext<O>(otherwise?: OptLazy<O>): T | O;
}
export declare class WindowIterator<T, R> extends FastIteratorBase<R> {
readonly source: FastIterator<T>;
readonly windowSize: number;
readonly skipAmount: number;
readonly collector: Reducer<T, R>;
constructor(source: FastIterator<T>, windowSize: number, skipAmount: number, collector: Reducer<T, R>);
state: Set<{
result: unknown;
size: number;
halted: boolean;
halt: () => void;
}>;
index: number;
fastNext<O>(otherwise?: OptLazy<O>): R | O;
}
import { ArrayNonEmpty, CollectFun, Eq, OptLazy, Reducer, ToJSON, TraverseState } from '@rimbu/common';
import type { FastIterator, Stream, StreamSource } from '@rimbu/stream';
import type { FastIterator, Stream, StreamSource, Transformer } from '@rimbu/stream';
import type { StreamConstructors } from '@rimbu/stream/custom';

@@ -19,3 +19,3 @@ export declare abstract class StreamBase<T> implements Stream<T> {

flatZip<T2>(flatMapFun: (value: T, index: number, halt: () => void) => StreamSource<T2>): Stream<[T, T2]>;
flatReduceStream<R>(reducer: Reducer<T, StreamSource<R>>): Stream<R>;
transform<R>(transformer: Transformer<T, R>): Stream<R>;
filter(pred: (value: T, index: number, halt: () => void) => boolean): Stream<T>;

@@ -28,2 +28,3 @@ filterNot(pred: (value: T, index: number, halt: () => void) => boolean): Stream<T>;

last<O>(otherwise?: OptLazy<O>): T | O;
single<O>(otherwise?: OptLazy<O>): T | O;
count(): number;

@@ -41,2 +42,3 @@ countElement(value: T, eq?: Eq<T>): number;

contains(searchValue: T, amount?: number, eq?: Eq<T>): boolean;
containsSlice(source: StreamSource.NonEmpty<T>, eq?: Eq<any>): boolean;
takeWhile(pred: (value: T, index: number) => boolean): Stream<T>;

@@ -67,2 +69,4 @@ dropWhile(pred: (value: T, index: number) => boolean): Stream<T>;

splitOn(sepElem: T, eq?: Eq<T>): Stream<T[]>;
distinctPrevious(eq?: Eq<T>): Stream<T>;
window<R>(windowSize: number, skipAmount?: number, collector?: Reducer<T, R>): Stream<R>;
fold<R>(init: OptLazy<R>, next: (current: R, value: T, index: number, halt: () => void) => R): R;

@@ -148,2 +152,16 @@ foldStream<R>(init: OptLazy<R>, next: (current: R, value: T, index: number, halt: () => void) => R): Stream<R>;

}
export declare class DistinctPreviousStream<T> extends StreamBase<T> {
readonly source: Stream<T>;
readonly eq: Eq<T>;
constructor(source: Stream<T>, eq: Eq<T>);
[Symbol.iterator](): FastIterator<T>;
}
export declare class WindowStream<T, R> extends StreamBase<R> {
readonly source: Stream<T>;
readonly windowSize: number;
readonly skipAmount: number;
readonly collector: Reducer<T, R>;
constructor(source: Stream<T>, windowSize: number, skipAmount: number, collector: Reducer<T, R>);
[Symbol.iterator](): FastIterator<R>;
}
export declare const emptyStream: Stream<any>;

@@ -150,0 +168,0 @@ export declare function isStream(obj: any): obj is Stream<any>;

@@ -16,2 +16,3 @@ /**

export * from './interface';
export * from './transformer';
export * from '../async';
import type { ArrayNonEmpty, CollectFun, Eq, OptLazy, Reducer, ToJSON, TraverseState } from '@rimbu/common';
import type { StreamConstructors } from '@rimbu/stream/custom';
import type { FastIterable, Streamable, StreamSource } from '@rimbu/stream';
import type { FastIterable, Streamable, StreamSource, Transformer } from '@rimbu/stream';
/**

@@ -190,12 +190,13 @@ * A possibly infinite sequence of elements of type T.

* @typeparam R - the resulting element type
* @param reducer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R
*
* @param transformer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R
* @note O(1)
* @example
* ```ts
* Stream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray()
* Stream.of(1, 2, 3, 4, 5, 6)
* .transform(Transformer.window(3))
* .toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* ```
*/
flatReduceStream<R>(reducer: Reducer<T, StreamSource<R>>): Stream<R>;
transform<R>(transformer: Transformer<T, R>): Stream<R>;
/**

@@ -290,3 +291,3 @@ * Returns a Stream containing only those elements from this Stream for which the given `pred` function returns true.

* @typeparam O - the optional value type to return if the stream is empty
* @param otherwise - (default: undefined) an `OptLazy` value to be returned if the Stream is empty.
* @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream is empty.
* @example

@@ -303,2 +304,16 @@ * ```ts

/**
* Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value.
* @typeparam O - the optional value to return if the stream does not have exactly one value.
* @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream does not have exactly one value.
* @example
* ```ts
* Stream.empty<number>().single() // => undefined
* Stream.of(1, 2, 3).single() // => undefined
* Stream.of(1).single() // => 1
* Stream.of(1, 2, 3).single(0) // => 0
* ```
*/
single(): T | undefined;
single<O>(otherwise: OptLazy<O>): T | O;
/**
* Returns the amount of elements in the Stream.

@@ -461,2 +476,16 @@ * @example

/**
* Returns true if this stream contains the same sequence of elements as the given `source`,
* false otherwise.
* @param source - a non-empty stream source containing the element sequence to find
* @param eq - (default: `Eq.objectIs`) the function to use to test element equality
* @example
* ```ts
* Stream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4])
* // => true
* Stream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2])
* // => false
* ```
*/
containsSlice(source: StreamSource.NonEmpty<T>, eq?: Eq<T>): boolean;
/**
* Returns a Stream that contains the elements of this Stream up to the first element that does not satisfy given `pred` function.

@@ -671,2 +700,32 @@ * @param pred - a predicate function taking an element and its index

/**
* Returns a Stream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - the `Eq` instance to use to test equality of elements
* @example
* ```ts
* Stream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): Stream<T>;
/**
* Returns a Stream containing `windows` of `windowSize` consecutive elements of the source stream, with each
* window starting `skipAmount` elements after the previous one.
* @typeparam R - the collector reducer result type
* @param windowSize - the size in elements of the windows
* @param skipAmount - (default: windowsize) the amount of elements to skip to start the next window
* @param collector - (optional, default: toArray reducer) the reducer to use to collect the window values
* @example
* ```ts
* console.log(Stream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray())
* // => [[1, 2, 3], [4, 5, 6]]
* console.log(Stream.of(1, 2, 3, 4, 5).window(3, 1).toArray())
* // => [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
* console.log(Stream.of(1, 2, 3, 4).window(2, 2, Reducer.toJSSet()).toArray())
* // => [Set(1, 2), Set(3, 4)]
* ```
*/
window(windowSize: number, skipAmount?: number): Stream<T[]>;
window<R>(windowSize: number, skipAmount?: number, collector?: Reducer<T, R>): Stream<R>;
/**
* Returns the value resulting from applying the given the given `next` function to a current state (initially the given `init` value),

@@ -924,13 +983,14 @@ * and the next Stream value, and returning the new state. When all elements are processed, the resulting state is returned.

* @typeparam R - the resulting element type
* @param reducer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R
*
* @param transformer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R
* @note O(1)
* @example
* ```ts
* Stream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.createWindowReducer(2)).toArray()
* Stream.of(1, 2, 3, 4, 5, 6)
* .transform(Transformer.window(3))
* .toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* ```
*/
flatReduceStream<R>(reducer: Reducer<T, StreamSource.NonEmpty<R>>): Stream.NonEmpty<R>;
flatReduceStream<R>(reducer: Reducer<T, StreamSource<R>>): Stream<R>;
transform<R>(transformer: Transformer.NonEmpty<T, R>): Stream.NonEmpty<R>;
transform<R>(transformer: Transformer<T, R>): Stream<R>;
/**

@@ -1049,2 +1109,39 @@ * Returns the first element of the Stream.

}): Stream.NonEmpty<T>;
/**
* Returns a non-empty Stream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - the `Eq` instance to use to test equality of elements
* @example
* ```ts
* Stream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): Stream.NonEmpty<T>;
/**
* Returns a Stream containing the values resulting from applying the given the given `next` function to a current state (initially the given `init` value),
* and the next Stream value, and returning the new state.
* @typeparam R - the resulting element type
* @param init - the initial result/state value
* @param next - a function taking the parameters below and returning the new result/state value<br/>
* - current: the current result/state value, initially `init`.<br/>
* - value: the next Stream value<br/>
* - index: the index of the given value<br/>
* - halt: a function that, if called, ensures that no new elements are passed
* @example
* ```ts
* console.log(
* Stream.empty<number>()
* .foldStream(5, (current, value) => current + value)
* .toArray()
* )
* // => []
* console.log(
* Stream.of(1, 2, 3)
* .foldStream(5, (current, value) => current + value)
* .toArray()
* )
* // => [6, 8, 11]
* ```
*/
foldStream<R>(init: OptLazy<R>, next: (current: R, value: T, index: number) => R): Stream.NonEmpty<R>;

@@ -1051,0 +1148,0 @@ foldStream<R>(init: OptLazy<R>, next: (current: R, value: T, index: number, halt: () => void) => R): Stream<R>;

{
"name": "@rimbu/stream",
"version": "0.11.0",
"version": "0.12.0",
"description": "Efficient structure representing a sequence of elements, with powerful operations for TypeScript",

@@ -88,3 +88,3 @@ "keywords": [

},
"gitHead": "4d6b7411c36f599331185b1bc597e58f34954d1a"
"gitHead": "99e13768dec4c6bf02502c01f76cdbf5325de6a5"
}

@@ -1248,1 +1248,111 @@ import { Token } from '@rimbu/base';

}
export class AsyncDistinctPreviousIterator<T> extends AsyncFastIteratorBase<T> {
constructor(readonly source: AsyncFastIterator<T>, readonly eq: Eq<T>) {
super();
this.return = (): Promise<void> => closeIters(source);
}
readonly previous = [] as T[];
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const done = Symbol('Done');
let next: T | typeof done;
const source = this.source;
const previous = this.previous;
while (done !== (next = await source.fastNext(done))) {
previous.push(next);
if (previous.length === 1) {
return next;
}
const prev = previous.shift()!;
if (!this.eq(prev, next)) {
return next;
}
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}
export class AsyncWindowIterator<T, R> extends AsyncFastIteratorBase<R> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly windowSize: number,
readonly skipAmount: number,
readonly collector: AsyncReducer<T, R>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}
state = new Set<{
result: unknown;
size: number;
halted: boolean;
halt: () => void;
}>();
index = 0;
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
const source = this.source;
const collector = this.collector;
const windowSize = this.windowSize;
const skipAmount = this.skipAmount;
const done = Symbol('Done');
const state = this.state;
let next: T | typeof done;
let result: R | typeof done = done;
while (done !== (next = await source.fastNext(done))) {
for (const current of state) {
current.result = await collector.next(
current.result,
next,
current.size,
current.halt
);
current.size++;
if (current.size >= windowSize || current.halted) {
result = await collector.stateToResult(current.result);
state.delete(current);
}
}
if (this.index % skipAmount === 0) {
const newState = {
result: await AsyncOptLazy.toMaybePromise(collector.init),
size: 1,
halted: false,
halt(): void {
this.halted = true;
},
};
newState.result = await collector.next(
AsyncOptLazy.toMaybePromise(collector.init),
next,
0,
newState.halt
);
state.add(newState);
}
this.index++;
if (done !== result) {
return result;
}
}
return AsyncOptLazy.toMaybePromise(otherwise!);
}
}

@@ -18,2 +18,3 @@ import {

AsyncStreamSource,
AsyncTransformer,
} from '@rimbu/stream/async';

@@ -36,2 +37,3 @@ import {

AsyncSplitWhereIterator,
AsyncDistinctPreviousIterator,
AsyncDropWhileIterator,

@@ -57,2 +59,3 @@ AsyncFilterIterator,

AsyncTakeWhileIterator,
AsyncWindowIterator,
} from '@rimbu/stream/async-custom';

@@ -202,6 +205,6 @@ import { Stream } from '@rimbu/stream';

flatReduceStream<R>(
reducer: AsyncReducer<T, AsyncStreamSource<R>>
transform<R>(
transformer: AsyncReducer<T, AsyncStreamSource<R>>
): AsyncStream<R> {
return AsyncStreamConstructorsImpl.flatten(this.reduceStream(reducer));
return AsyncStreamConstructorsImpl.flatten(this.reduceStream(transformer));
}

@@ -246,3 +249,3 @@

if (done === value) {
return AsyncOptLazy.toMaybePromise(otherwise!);
return AsyncOptLazy.toPromise(otherwise!);
}

@@ -263,3 +266,3 @@ await closeIters(iter);

if (done === lastValue) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === lastValue) return AsyncOptLazy.toPromise(otherwise!);

@@ -269,2 +272,19 @@ return lastValue;

async single<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const iterator = this[Symbol.asyncIterator]();
const done = Symbol('Done');
const value = await iterator.fastNext(done);
if (done !== value) {
if (done === (await iterator.fastNext(done))) {
return value;
}
}
await closeIters(iterator);
return AsyncOptLazy.toPromise(otherwise!);
}
async count(): Promise<number> {

@@ -327,3 +347,3 @@ let result = 0;

): Promise<T | O> {
if (occurrance <= 0) return AsyncOptLazy.toMaybePromise(otherwise!);
if (occurrance <= 0) return AsyncOptLazy.toPromise(otherwise!);

@@ -344,3 +364,3 @@ const done = Symbol('Done');

return AsyncOptLazy.toMaybePromise(otherwise!);
return AsyncOptLazy.toPromise(otherwise!);
} finally {

@@ -355,3 +375,3 @@ if (done !== value!) await closeIters(iterator);

): Promise<T | O> {
if (index < 0) return AsyncOptLazy.toMaybePromise(otherwise!);
if (index < 0) return AsyncOptLazy.toPromise(otherwise!);

@@ -371,3 +391,3 @@ const done = Symbol('Done');

return AsyncOptLazy.toMaybePromise(otherwise!);
return AsyncOptLazy.toPromise(otherwise!);
} finally {

@@ -483,2 +503,30 @@ if (done !== value!) await closeIters(iterator);

async containsSlice(
source: AsyncStreamSource.NonEmpty<T>,
eq = Eq.objectIs
): Promise<boolean> {
const iterator = this[Symbol.asyncIterator]();
const sourceStream = fromAsyncStreamSource(source);
let sourceIterator = sourceStream[Symbol.asyncIterator]();
const done = Symbol('Done');
while (true) {
const sourceValue = await sourceIterator.fastNext(done);
if (done === sourceValue) {
await closeIters(iterator);
return true;
}
const value = await iterator.fastNext(done);
if (done === value) {
return false;
}
if (!eq(sourceValue, value)) {
sourceIterator = sourceStream[Symbol.asyncIterator]();
}
}
}
takeWhile(

@@ -541,3 +589,3 @@ pred: (value: T, index: number) => MaybePromise<boolean>

if (done === result) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === result) return AsyncOptLazy.toPromise(otherwise!);

@@ -571,3 +619,3 @@ while (done !== (value = await iterator.fastNext(done))) {

if (done === result) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === result) return AsyncOptLazy.toPromise(otherwise!);

@@ -641,2 +689,14 @@ while (done !== (value = await iterator.fastNext(done))) {

distinctPrevious(eq: Eq<T> = Eq.objectIs): AsyncStream<T> {
return new AsyncDistinctPreviousStream<T>(this, eq);
}
window<R>(
windowSize: number,
skipAmount = windowSize,
collector: AsyncReducer<T, R> = AsyncReducer.toArray() as any
): AsyncStream<R> {
return new AsyncWindowStream<T, R>(this, windowSize, skipAmount, collector);
}
async fold<R>(

@@ -781,3 +841,3 @@ init: AsyncOptLazy<R>,

): any {
return new AsyncReduceAllStream(this, reducers);
return new AsyncReduceAllStream<T, R>(this, reducers);
}

@@ -829,3 +889,3 @@

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncPrependIterator(
return new AsyncPrependIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -837,3 +897,3 @@ this.item

async first(): Promise<T> {
return AsyncOptLazy.toMaybePromise(this.item);
return AsyncOptLazy.toPromise(this.item);
}

@@ -856,3 +916,3 @@

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncAppendIterator(
return new AsyncAppendIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -868,3 +928,3 @@ this.item

async last(): Promise<T> {
return AsyncOptLazy.toMaybePromise(this.item);
return AsyncOptLazy.toPromise(this.item);
}

@@ -912,3 +972,3 @@

const value = await this.source.first(done);
if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === value) return AsyncOptLazy.toPromise(otherwise!);
return this.mapFun(value, 0);

@@ -920,3 +980,3 @@ }

const value = await this.source.last(done);
if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === value) return AsyncOptLazy.toPromise(otherwise!);
return this.mapFun(value, 0);

@@ -935,3 +995,3 @@ }

const value = await this.source.elementAt(index, done);
if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === value) return AsyncOptLazy.toPromise(otherwise!);
return this.mapFun(value, index);

@@ -943,3 +1003,3 @@ }

): AsyncStream<T3> {
return new AsyncMapStream(this.source, async (value, index) =>
return new AsyncMapStream<T, T3>(this.source, async (value, index) =>
mapFun(await this.mapFun(value, index), index)

@@ -964,3 +1024,3 @@ );

[Symbol.asyncIterator](): AsyncFastIterator<T2> {
return new AsyncMapPureIterator(
return new AsyncMapPureIterator<T, A, T2>(
this.source[Symbol.asyncIterator](),

@@ -975,3 +1035,3 @@ this.mapFun,

const value = await this.source.first(done);
if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === value) return AsyncOptLazy.toPromise(otherwise!);
return this.mapFun(value, ...this.args);

@@ -983,3 +1043,3 @@ }

const value = await this.source.last(done);
if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === value) return AsyncOptLazy.toPromise(otherwise!);
return this.mapFun(value, ...this.args);

@@ -998,3 +1058,3 @@ }

const value = await this.source.elementAt(index, done);
if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!);
if (done === value) return AsyncOptLazy.toPromise(otherwise!);
return this.mapFun(value, ...this.args);

@@ -1064,3 +1124,3 @@ }

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncFilterIterator(
return new AsyncFilterIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1097,3 +1157,3 @@ this.pred,

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncFilterPureIterator(
return new AsyncFilterPureIterator<T, A>(
this.source[Symbol.asyncIterator](),

@@ -1116,3 +1176,3 @@ this.pred,

[Symbol.asyncIterator](): AsyncFastIterator<R> {
return new AsyncCollectIterator(
return new AsyncCollectIterator<T, R>(
this.source[Symbol.asyncIterator](),

@@ -1133,3 +1193,3 @@ this.collectFun

[Symbol.asyncIterator](): AsyncFastIterator<number> {
return new AsyncIndicesWhereIterator(
return new AsyncIndicesWhereIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1151,3 +1211,3 @@ this.pred

[Symbol.asyncIterator](): AsyncFastIterator<number> {
return new AsyncIndicesOfIterator(
return new AsyncIndicesOfIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1169,3 +1229,3 @@ this.searchValue,

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncTakeWhileIterator(
return new AsyncTakeWhileIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1186,3 +1246,3 @@ this.pred

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncDropWhileIterator(
return new AsyncDropWhileIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1200,3 +1260,3 @@ this.pred

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncTakeIterator(
return new AsyncTakeIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1219,3 +1279,3 @@ this.amount

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncDropIterator(
return new AsyncDropIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1241,3 +1301,3 @@ this.amount

[Symbol.asyncIterator](): AsyncFastIterator<T[]> {
return new AsyncSplitWhereIterator(
return new AsyncSplitWhereIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1259,3 +1319,3 @@ this.pred

[Symbol.asyncIterator](): AsyncFastIterator<T[]> {
return new AsyncSplitOnIterator(
return new AsyncSplitOnIterator<T>(
this.source[Symbol.asyncIterator](),

@@ -1268,2 +1328,35 @@ this.sepElem,

class AsyncDistinctPreviousStream<T> extends AsyncStreamBase<T> {
constructor(readonly source: AsyncStream<T>, readonly eq: Eq<T>) {
super();
}
[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncDistinctPreviousIterator<T>(
this.source[Symbol.asyncIterator](),
this.eq
);
}
}
class AsyncWindowStream<T, R> extends AsyncStreamBase<R> {
constructor(
readonly source: AsyncStream<T>,
readonly windowSize: number,
readonly skipAmount: number,
readonly collector: AsyncReducer<T, R>
) {
super();
}
[Symbol.asyncIterator](): AsyncFastIterator<R> {
return new AsyncWindowIterator<T, R>(
this.source[Symbol.asyncIterator](),
this.windowSize,
this.skipAmount,
this.collector
);
}
}
class AsyncReduceStream<I, R> extends AsyncStreamBase<R> {

@@ -1278,3 +1371,3 @@ constructor(

[Symbol.asyncIterator](): AsyncFastIterator<R> {
return new AsyncReduceIterator(
return new AsyncReduceIterator<I, R>(
this.source[Symbol.asyncIterator](),

@@ -1295,3 +1388,3 @@ this.reducerDef

[Symbol.asyncIterator](): AsyncFastIterator<R> {
return new AsyncReduceAllIterator(
return new AsyncReduceAllIterator<I, R>(
this.source[Symbol.asyncIterator](),

@@ -1309,3 +1402,3 @@ this.reducers

[Symbol.asyncIterator](): AsyncFastIterator<T> {
return new AsyncOfIterator(this.values);
return new AsyncOfIterator<T>(this.values);
}

@@ -1326,25 +1419,57 @@ }

asyncStream(): this {
return this;
}
async equals(other: AsyncStreamSource<T>): Promise<boolean> {
if (other === this) return true;
const done = Symbol('done');
return (
done ===
fromAsyncStreamSource(other)[Symbol.asyncIterator]().fastNext(done)
);
}
prepend(value: AsyncOptLazy<T>): AsyncStream.NonEmpty<T> {
return AsyncStreamConstructorsImpl.of(value);
}
append(value: AsyncOptLazy<T>): AsyncStream.NonEmpty<T> {
return AsyncStreamConstructorsImpl.of(value);
}
assumeNonEmpty(): never {
RimbuError.throwEmptyCollectionAssumedNonEmptyError();
}
async forEach(): Promise<void> {
//
}
async forEachPure(): Promise<void> {
//
}
indexed(): AsyncStream<[number, T]> {
return this as any;
}
map<T2>(): AsyncStream<T2> {
return this as any;
}
mapPure<T2>(): AsyncStream<T2> {
return this as any;
}
flatMap<T2>(): AsyncStream<T2> {
return this as any;
}
flatZip<T2>(): AsyncStream<[T, T2]> {
return this as any;
}
transform<R>(transformer: AsyncTransformer<T, R>): AsyncStream<R> {
return AsyncStreamConstructorsImpl.from<R>(
async () =>
await transformer.stateToResult(
await AsyncOptLazy.toMaybePromise(transformer.init)
)
);
}
filter(): AsyncStream<T> {

@@ -1371,2 +1496,5 @@ return this;

}
single<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
return AsyncOptLazy.toPromise(otherwise!);
}
async count(): Promise<0> {

@@ -1412,2 +1540,5 @@ return 0;

}
async containsSlice(): Promise<false> {
return false;
}
takeWhile(): AsyncStream<T> {

@@ -1466,2 +1597,14 @@ return this;

}
splitOn(): AsyncStream<T[]> {
return this as any;
}
splitWhere(): AsyncStream<T[]> {
return this as any;
}
distinctPrevious(): AsyncStream<T> {
return this;
}
window<R>(): AsyncStream<R> {
return this as any;
}
fold<R>(init: AsyncOptLazy<R>): Promise<R> {

@@ -1474,5 +1617,3 @@ return AsyncOptLazy.toPromise(init);

async reduce<O>(reducer: AsyncReducer<T, O>): Promise<O> {
return reducer.stateToResult(
await AsyncOptLazy.toMaybePromise(reducer.init)
);
return reducer.stateToResult(await AsyncOptLazy.toPromise(reducer.init));
}

@@ -1498,2 +1639,8 @@ reduceStream(): any {

}
async toJSON(): Promise<ToJSON<T[], 'AsyncStream'>> {
return {
dataType: 'AsyncStream',
value: [],
};
}
}

@@ -1636,3 +1783,3 @@

zip(...sources): any {
return AsyncStreamConstructorsImpl.zipWith(...sources)(Array);
return AsyncStreamConstructorsImpl.zipWith(...(sources as any))(Array);
},

@@ -1639,0 +1786,0 @@ zipAllWith(...sources): any {

@@ -38,6 +38,6 @@ import type { Token } from '@rimbu/base';

*/
zipWith<I extends readonly unknown[]>(
zipWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => AsyncStream.NonEmpty<R>;
zipWith<I extends readonly unknown[]>(
zipWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]

@@ -59,6 +59,6 @@ ): <R>(zipFun: (...values: I) => R) => AsyncStream<R>;

*/
zip<I extends readonly unknown[]>(
zip<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<I>;
zip<I extends readonly unknown[]>(
zip<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]

@@ -85,3 +85,3 @@ ): AsyncStream<I>;

*/
zipAllWith<I extends readonly unknown[]>(
zipAllWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]

@@ -92,3 +92,3 @@ ): <O, R>(

) => AsyncStream.NonEmpty<R>;
zipAllWith<I extends readonly unknown[]>(
zipAllWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]

@@ -117,7 +117,7 @@ ): <O, R>(

*/
zipAll<I extends readonly unknown[], O>(
zipAll<I extends readonly [unknown, ...unknown[]], O>(
fillValue: AsyncOptLazy<O>,
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<{ [K in keyof I]: I[K] | O }>;
zipAll<I extends readonly unknown[], O>(
zipAll<I extends readonly [unknown, ...unknown[]], O>(
fillValue: AsyncOptLazy<O>,

@@ -124,0 +124,0 @@ ...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]

@@ -15,1 +15,3 @@ /**

export * from './interface';
export * from './async-transformer';

@@ -15,2 +15,3 @@ import type {

AsyncStreamSource,
AsyncTransformer,
} from '@rimbu/stream/async';

@@ -231,14 +232,11 @@ import type { AsyncStreamConstructors } from '@rimbu/stream/async-custom';

* @typeparam R - the resulting element type
* @param reducer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R
*
* @param transformer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R
* @note O(1)
* @example
* ```ts
* await AsyncStream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray()
* await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* ```
*/
flatReduceStream<R>(
reducer: AsyncReducer<T, AsyncStreamSource<R>>
): AsyncStream<R>;
transform<R>(transformer: AsyncTransformer<T, R>): AsyncStream<R>;
/**

@@ -335,5 +333,19 @@ * Returns an AsyncStream containing only those elements from this stream for which the given `pred` function returns true.

*/
first(): MaybePromise<T | undefined>;
first(): Promise<T | undefined>;
first<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;
/**
* Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value.
* @typeparam O - the optional value to return if the stream does not have exactly one value.
* @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream does not have exactly one value.
* @example
* ```ts
* await AsyncStream.empty<number>().single() // => undefined
* await AsyncStream.of(1, 2, 3).single() // => undefined
* await AsyncStream.of(1).single() // => 1
* await AsyncStream.of(1, 2, 3).single(0) // => 0
* ```
*/
single(): Promise<T | undefined>;
single<O>(otherwise: AsyncOptLazy<O>): Promise<T | O>;
/**
* Returns the last element of the AsyncStream, or a fallback value (default undefined) if the stream is empty.

@@ -531,2 +543,19 @@ * @typeparam O - the optional value type to return if the stream is empty

/**
* Returns true if this stream contains the same sequence of elements as the given `source`,
* false otherwise.
* @param source - a non-empty async stream source containing the element sequence to find
* @param eq - (default: `Eq.objectIs`) the function to use to test element equality
* @example
* ```ts
* await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4])
* // => true
* await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2])
* // => false
* ```
*/
containsSlice(
source: AsyncStreamSource.NonEmpty<T>,
eq?: Eq<T>
): Promise<boolean>;
/**
* Returns an AsyncStream that contains the elements of this stream up to the first element that does not satisfy given `pred` function.

@@ -760,2 +789,36 @@ * @param pred - a potentially asynchronous predicate function taking an element and its index

/**
* Returns an AsyncStream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - (default: `Eq.objectIs`) the `Eq` instance to use to test equality of elements
* @example
* ```ts
* await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): AsyncStream<T>;
/**
* Returns an AsyncStream containing `windows` of `windowSize` consecutive elements of the source stream, with each
* window starting `skipAmount` elements after the previous one.
* @typeparam R - the collector reducer result type
* @param windowSize - the size in elements of the windows
* @param skipAmount - (default: `windowSize`) the amount of elements to skip to start the next window
* @param collector - (default: `AsyncArray.toArray()`) the async reducer to use to collect the window values
* @example
* ```ts
* await Stream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* await Stream.of(1, 2, 3, 4, 5).window(3, 1).toArray()
* // => [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
* await Stream.of(1, 2, 3, 4).window(2, 2, AsyncReducer.toJSSet()).toArray()
* // => [Set(1, 2), Set(3, 4)]
* ```
*/
window(windowSize: number, skipAmount?: number): AsyncStream<T[]>;
window<R>(
windowSize: number,
skipAmount?: number,
collector?: AsyncReducer<T, R>
): AsyncStream<R>;
/**
* Returns the value resulting from applying the given the given `next` function to a current state (initially the given `init` value),

@@ -1053,17 +1116,14 @@ * and the next stream value, and returning the new state. When all elements are processed, the resulting state is returned.

* @typeparam R - the resulting element type
* @param reducer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R
*
* @param transformer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R.
* @note O(1)
* @example
* ```ts
* await AsyncStream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray()
* await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* ```
*/
flatReduceStream<R>(
reducer: AsyncReducer<T, AsyncStreamSource.NonEmpty<R>>
transform<R>(
transformer: AsyncTransformer.NonEmpty<T, R>
): AsyncStream.NonEmpty<R>;
flatReduceStream<R>(
reducer: AsyncReducer<T, AsyncStreamSource<R>>
): AsyncStream<R>;
transform<R>(transformer: AsyncTransformer<T, R>): AsyncStream<R>;
/**

@@ -1187,2 +1247,14 @@ * Returns the first element of the AsyncStream.

/**
* Returns a non-empty AsyncStream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - (default: `Eq.objectIs`) the `Eq` instance to use to test equality of elements
* @example
* ```ts
* await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): AsyncStream.NonEmpty<T>;
/**
* Returns an AsyncStream containing the values resulting from applying the given the given `next` function to a current state (initially the given `init` value),

@@ -1189,0 +1261,0 @@ * and the next stream value, and returning the new state.

@@ -1011,1 +1011,109 @@ import { Token } from '@rimbu/base';

}
export class DistinctPreviousIterator<T> extends FastIteratorBase<T> {
constructor(readonly source: FastIterator<T>, readonly eq: Eq<T>) {
super();
}
readonly previous = [] as T[];
fastNext<O>(otherwise?: OptLazy<O>): T | O {
const done = Symbol('Done');
let next: T | typeof done;
const source = this.source;
const previous = this.previous;
while (done !== (next = source.fastNext(done))) {
previous.push(next);
if (previous.length === 1) {
return next;
}
const prev = previous.shift()!;
if (!this.eq(prev, next)) {
return next;
}
}
return OptLazy(otherwise!);
}
}
export class WindowIterator<T, R> extends FastIteratorBase<R> {
constructor(
readonly source: FastIterator<T>,
readonly windowSize: number,
readonly skipAmount: number,
readonly collector: Reducer<T, R>
) {
super();
}
state = new Set<{
result: unknown;
size: number;
halted: boolean;
halt: () => void;
}>();
index = 0;
fastNext<O>(otherwise?: OptLazy<O>): R | O {
const source = this.source;
const collector = this.collector;
const windowSize = this.windowSize;
const skipAmount = this.skipAmount;
const done = Symbol('Done');
const state = this.state;
let next: T | typeof done;
let result: R | typeof done = done;
while (done !== (next = source.fastNext(done))) {
for (const current of state) {
current.result = collector.next(
current.result,
next,
current.size,
current.halt
);
current.size++;
if (current.size >= windowSize || current.halted) {
result = collector.stateToResult(current.result);
state.delete(current);
}
}
if (this.index % skipAmount === 0) {
const newState = {
result: OptLazy(collector.init),
size: 1,
halted: false,
halt(): void {
this.halted = true;
},
};
newState.result = collector.next(
OptLazy(collector.init),
next,
0,
newState.halt
);
state.add(newState);
}
this.index++;
if (done !== result) {
return result;
}
}
return OptLazy(otherwise!);
}
}

@@ -14,3 +14,8 @@ import {

} from '@rimbu/common';
import type { FastIterator, Stream, StreamSource } from '@rimbu/stream';
import type {
FastIterator,
Stream,
StreamSource,
Transformer,
} from '@rimbu/stream';
import type { StreamConstructors } from '@rimbu/stream/custom';

@@ -54,2 +59,4 @@ import {

PrependIterator,
DistinctPreviousIterator,
WindowIterator,
} from '@rimbu/stream/custom';

@@ -156,4 +163,4 @@ import { RimbuError, Token } from '@rimbu/base';

flatReduceStream<R>(reducer: Reducer<T, StreamSource<R>>): Stream<R> {
return StreamConstructorsImpl.flatten(this.reduceStream(reducer));
transform<R>(transformer: Transformer<T, R>): Stream<R> {
return StreamConstructorsImpl.flatten(this.reduceStream(transformer));
}

@@ -209,2 +216,17 @@

single<O>(otherwise?: OptLazy<O>): T | O {
const iterator = this[Symbol.iterator]();
const done = Symbol('Done');
const value = iterator.fastNext(done);
if (done !== value) {
if (done === iterator.fastNext(done)) {
return value;
}
}
return OptLazy(otherwise!);
}
count(): number {

@@ -365,2 +387,22 @@ let result = 0;

containsSlice(source: StreamSource.NonEmpty<T>, eq = Eq.objectIs): boolean {
const iterator = this[Symbol.iterator]();
const sourceStream = fromStreamSource(source);
let sourceIterator = sourceStream[Symbol.iterator]();
const done = Symbol('Done');
while (true) {
const sourceValue = sourceIterator.fastNext(done);
if (done === sourceValue) return true;
const value = iterator.fastNext(done);
if (done === value) return false;
if (!eq(sourceValue, value)) {
sourceIterator = sourceStream[Symbol.iterator]();
}
}
}
takeWhile(pred: (value: T, index: number) => boolean): Stream<T> {

@@ -487,2 +529,14 @@ return new TakeWhileStream<T>(this, pred);

distinctPrevious(eq: Eq<T> = Eq.objectIs): Stream<T> {
return new DistinctPreviousStream<T>(this, eq);
}
window<R>(
windowSize: number,
skipAmount = windowSize,
collector: Reducer<T, R> = Reducer.toArray() as any
): Stream<R> {
return new WindowStream<T, R>(this, windowSize, skipAmount, collector);
}
fold<R>(

@@ -1147,10 +1201,25 @@ init: OptLazy<R>,

stream(): this {
return this;
}
assumeNonEmpty(): never {
RimbuError.throwEmptyCollectionAssumedNonEmptyError();
}
equals(other: StreamSource<T>): boolean {
const done = Symbol('Done');
return done === fromStreamSource(other)[Symbol.iterator]().fastNext(done);
}
prepend(value: OptLazy<T>): Stream.NonEmpty<T> {
return StreamConstructorsImpl.of(OptLazy(value));
}
append(value: OptLazy<T>): Stream.NonEmpty<T> {
return StreamConstructorsImpl.of(OptLazy(value));
}
forEach(): void {
//
}
forEachPure(): void {
//
}
indexed(): Stream<[number, T]> {

@@ -1171,5 +1240,5 @@ return this as any;

}
flatReduceStream<R>(reducer: Reducer<T, StreamSource<R>>): Stream<R> {
transform<R>(transformer: Transformer<T, R>): Stream<R> {
return StreamConstructorsImpl.from(
reducer.stateToResult(OptLazy(reducer.init))
transformer.stateToResult(OptLazy(transformer.init))
);

@@ -1198,2 +1267,5 @@ }

}
single<O>(otherwise?: OptLazy<O>): O {
return OptLazy(otherwise) as O;
}
count(): 0 {

@@ -1239,2 +1311,5 @@ return 0;

}
containsSlice(): false {
return false;
}
takeWhile(): Stream<T> {

@@ -1288,2 +1363,14 @@ return this;

}
splitOn(): Stream<T[]> {
return this as any;
}
splitWhere(): Stream<T[]> {
return this as any;
}
distinctPrevious(): Stream<T> {
return this;
}
window<R>(): Stream<R> {
return this as any;
}
fold<R>(init: OptLazy<R>): R {

@@ -1313,2 +1400,8 @@ return OptLazy(init);

}
toJSON(): ToJSON<T[], 'Stream'> {
return {
dataType: 'Stream',
value: [],
};
}
}

@@ -1672,2 +1765,35 @@

export class DistinctPreviousStream<T> extends StreamBase<T> {
constructor(readonly source: Stream<T>, readonly eq: Eq<T>) {
super();
}
[Symbol.iterator](): FastIterator<T> {
return new DistinctPreviousIterator(
this.source[Symbol.iterator](),
this.eq
);
}
}
export class WindowStream<T, R> extends StreamBase<R> {
constructor(
readonly source: Stream<T>,
readonly windowSize: number,
readonly skipAmount: number,
readonly collector: Reducer<T, R>
) {
super();
}
[Symbol.iterator](): FastIterator<R> {
return new WindowIterator(
this.source[Symbol.iterator](),
this.windowSize,
this.skipAmount,
this.collector
);
}
}
export const emptyStream: Stream<any> = Object.freeze(new EmptyStream());

@@ -1674,0 +1800,0 @@

@@ -19,2 +19,4 @@ /**

export * from './transformer';
export * from '../async';

@@ -12,3 +12,8 @@ import type {

import { StreamConstructorsImpl } from '@rimbu/stream/custom';
import type { FastIterable, Streamable, StreamSource } from '@rimbu/stream';
import type {
FastIterable,
Streamable,
StreamSource,
Transformer,
} from '@rimbu/stream';

@@ -214,12 +219,13 @@ /**

* @typeparam R - the resulting element type
* @param reducer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R
*
* @param transformer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R
* @note O(1)
* @example
* ```ts
* Stream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray()
* Stream.of(1, 2, 3, 4, 5, 6)
* .transform(Transformer.window(3))
* .toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* ```
*/
flatReduceStream<R>(reducer: Reducer<T, StreamSource<R>>): Stream<R>;
transform<R>(transformer: Transformer<T, R>): Stream<R>;
/**

@@ -324,3 +330,3 @@ * Returns a Stream containing only those elements from this Stream for which the given `pred` function returns true.

* @typeparam O - the optional value type to return if the stream is empty
* @param otherwise - (default: undefined) an `OptLazy` value to be returned if the Stream is empty.
* @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream is empty.
* @example

@@ -337,2 +343,16 @@ * ```ts

/**
* Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value.
* @typeparam O - the optional value to return if the stream does not have exactly one value.
* @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream does not have exactly one value.
* @example
* ```ts
* Stream.empty<number>().single() // => undefined
* Stream.of(1, 2, 3).single() // => undefined
* Stream.of(1).single() // => 1
* Stream.of(1, 2, 3).single(0) // => 0
* ```
*/
single(): T | undefined;
single<O>(otherwise: OptLazy<O>): T | O;
/**
* Returns the amount of elements in the Stream.

@@ -505,2 +525,16 @@ * @example

/**
* Returns true if this stream contains the same sequence of elements as the given `source`,
* false otherwise.
* @param source - a non-empty stream source containing the element sequence to find
* @param eq - (default: `Eq.objectIs`) the function to use to test element equality
* @example
* ```ts
* Stream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4])
* // => true
* Stream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2])
* // => false
* ```
*/
containsSlice(source: StreamSource.NonEmpty<T>, eq?: Eq<T>): boolean;
/**
* Returns a Stream that contains the elements of this Stream up to the first element that does not satisfy given `pred` function.

@@ -717,2 +751,36 @@ * @param pred - a predicate function taking an element and its index

/**
* Returns a Stream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - the `Eq` instance to use to test equality of elements
* @example
* ```ts
* Stream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): Stream<T>;
/**
* Returns a Stream containing `windows` of `windowSize` consecutive elements of the source stream, with each
* window starting `skipAmount` elements after the previous one.
* @typeparam R - the collector reducer result type
* @param windowSize - the size in elements of the windows
* @param skipAmount - (default: windowsize) the amount of elements to skip to start the next window
* @param collector - (optional, default: toArray reducer) the reducer to use to collect the window values
* @example
* ```ts
* console.log(Stream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray())
* // => [[1, 2, 3], [4, 5, 6]]
* console.log(Stream.of(1, 2, 3, 4, 5).window(3, 1).toArray())
* // => [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
* console.log(Stream.of(1, 2, 3, 4).window(2, 2, Reducer.toJSSet()).toArray())
* // => [Set(1, 2), Set(3, 4)]
* ```
*/
window(windowSize: number, skipAmount?: number): Stream<T[]>;
window<R>(
windowSize: number,
skipAmount?: number,
collector?: Reducer<T, R>
): Stream<R>;
/**
* Returns the value resulting from applying the given the given `next` function to a current state (initially the given `init` value),

@@ -996,15 +1064,14 @@ * and the next Stream value, and returning the new state. When all elements are processed, the resulting state is returned.

* @typeparam R - the resulting element type
* @param reducer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R
*
* @param transformer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R
* @note O(1)
* @example
* ```ts
* Stream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.createWindowReducer(2)).toArray()
* Stream.of(1, 2, 3, 4, 5, 6)
* .transform(Transformer.window(3))
* .toArray()
* // => [[1, 2, 3], [4, 5, 6]]
* ```
*/
flatReduceStream<R>(
reducer: Reducer<T, StreamSource.NonEmpty<R>>
): Stream.NonEmpty<R>;
flatReduceStream<R>(reducer: Reducer<T, StreamSource<R>>): Stream<R>;
transform<R>(transformer: Transformer.NonEmpty<T, R>): Stream.NonEmpty<R>;
transform<R>(transformer: Transformer<T, R>): Stream<R>;
/**

@@ -1125,2 +1192,39 @@ * Returns the first element of the Stream.

}): Stream.NonEmpty<T>;
/**
* Returns a non-empty Stream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - the `Eq` instance to use to test equality of elements
* @example
* ```ts
* Stream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): Stream.NonEmpty<T>;
/**
* Returns a Stream containing the values resulting from applying the given the given `next` function to a current state (initially the given `init` value),
* and the next Stream value, and returning the new state.
* @typeparam R - the resulting element type
* @param init - the initial result/state value
* @param next - a function taking the parameters below and returning the new result/state value<br/>
* - current: the current result/state value, initially `init`.<br/>
* - value: the next Stream value<br/>
* - index: the index of the given value<br/>
* - halt: a function that, if called, ensures that no new elements are passed
* @example
* ```ts
* console.log(
* Stream.empty<number>()
* .foldStream(5, (current, value) => current + value)
* .toArray()
* )
* // => []
* console.log(
* Stream.of(1, 2, 3)
* .foldStream(5, (current, value) => current + value)
* .toArray()
* )
* // => [6, 8, 11]
* ```
*/
foldStream<R>(

@@ -1127,0 +1231,0 @@ init: OptLazy<R>,

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc