Socket
Socket
Sign inDemoInstall

@rimbu/stream

Package Overview
Dependencies
Maintainers
1
Versions
56
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@rimbu/stream - npm Package Compare versions

Comparing version 0.8.0 to 0.8.1

76

dist/main/async-stream/constructors.js

@@ -65,11 +65,12 @@ "use strict";

exports.fromResource = fromResource;
/** Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `streams`.
/** Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `sources`.
* @param sources - the input async stream sources
* @param zipFun - a potentially asynchronous function taking one element from each given Stream, and returning a result value
* @param streams - the input async stream sources
* @example
* await AsyncStream.zipWith(
* async (a, b, c) => c ? a + b : a - b,
* [1, 2],
* [3, 4, 5],
* [true, false]
* )(
* async (a, b, c) => c ? a + b : a - b,
* ).toArray()

@@ -79,16 +80,18 @@ * // => [4, -2]

*/
var zipWith = function (zipFun) {
var iters = [];
for (var _i = 1; _i < arguments.length; _i++) {
iters[_i - 1] = arguments[_i];
var zipWith = function () {
var sources = [];
for (var _i = 0; _i < arguments.length; _i++) {
sources[_i] = arguments[_i];
}
if (iters.some(internal_1.AsyncStreamSource.isEmptyInstance)) {
return internal_1.AsyncStream.empty();
}
return new async_stream_custom_1.AsyncFromStream(function () { return new AsyncZipWithIterator(iters, zipFun); });
return function (zipFun) {
if (sources.some(internal_1.AsyncStreamSource.isEmptyInstance)) {
return internal_1.AsyncStream.empty();
}
return new async_stream_custom_1.AsyncFromStream(function () { return new AsyncZipWithIterator(sources, zipFun); });
};
};
exports.zipWith = zipWith;
/**
* Returns an AsyncStream with tuples containing each successive value from the given `streams`.
* @param streams - the input async stream sources
* Returns an AsyncStream with tuples containing each successive value from the given `sources`.
* @param sources - the input async stream sources
* @example

@@ -104,43 +107,46 @@ * await AsyncStream.zip(

var zip = function () {
var iters = [];
var sources = [];
for (var _i = 0; _i < arguments.length; _i++) {
iters[_i] = arguments[_i];
sources[_i] = arguments[_i];
}
return exports.zipWith.apply(void 0, (0, tslib_1.__spreadArray)([Array], (0, tslib_1.__read)(iters), false));
return exports.zipWith.apply(void 0, (0, tslib_1.__spreadArray)([], (0, tslib_1.__read)(sources), false))(Array);
};
exports.zip = zip;
/**
* Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `streams`, adding
* Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `sources`, adding
* given `fillValue` to any Streams that end before all streams have ended.
* @param sources - the input async stream sources
* @param fillValue - the `AsyncOptLazyz value to add to streams that end early
* @param zipFun - a potentially asynchronous function taking one element from each given Stream, and returning a result value
* @param streams - the input async stream sources
* @example
* await AsyncStream.zipAllWith(
* async () => 0,
* async (a, b, c) => a + b + c,
* [1, 2],
* [3, 4, 5],
* [6, 7]
* )(
* async () => 0,
* async (a, b, c) => a + b + c,
* ).toArray()
* // => [10, 13, 5]
*/
var zipAllWith = function (fillValue, zipFun) {
var streams = [];
for (var _i = 2; _i < arguments.length; _i++) {
streams[_i - 2] = arguments[_i];
var zipAllWith = function () {
var sources = [];
for (var _i = 0; _i < arguments.length; _i++) {
sources[_i] = arguments[_i];
}
if (streams.every(internal_1.AsyncStreamSource.isEmptyInstance)) {
return internal_1.AsyncStream.empty();
}
return new async_stream_custom_1.AsyncFromStream(function () {
return new AsyncZipAllWithItererator(fillValue, streams, zipFun);
});
return function (fillValue, zipFun) {
if (sources.every(internal_1.AsyncStreamSource.isEmptyInstance)) {
return internal_1.AsyncStream.empty();
}
return new async_stream_custom_1.AsyncFromStream(function () {
return new AsyncZipAllWithItererator(fillValue, sources, zipFun);
});
};
};
exports.zipAllWith = zipAllWith;
/**
* Returns an AsyncStream with tuples containing each successive value from the given `streams`, adding given `fillValue` to any streams
* Returns an AsyncStream with tuples containing each successive value from the given `sources`, adding given `fillValue` to any streams
* that end before all streams have ended.
* @param fillValue - the `AsyncOptLazy` value to add to streams that end early
* @param streams - the input async stream sources
* @param sources - the input async stream sources
* @example

@@ -157,7 +163,7 @@ * await AsyncStream.zipAll(

var zipAll = function (fillValue) {
var streams = [];
var sources = [];
for (var _i = 1; _i < arguments.length; _i++) {
streams[_i - 1] = arguments[_i];
sources[_i - 1] = arguments[_i];
}
return exports.zipAllWith.apply(void 0, (0, tslib_1.__spreadArray)([fillValue, Array], (0, tslib_1.__read)(streams), false));
return exports.zipAllWith.apply(void 0, (0, tslib_1.__spreadArray)([], (0, tslib_1.__read)(sources), false))(fillValue, Array);
};

@@ -164,0 +170,0 @@ exports.zipAll = zipAll;

@@ -510,7 +510,13 @@ "use strict";

/**
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `streams`.
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `sources`.
* @param sources - the input stream sources
* @param zipFun - a function taking one element from each given Stream, and returning a result value
* @param streams - the input stream sources
* @example
* Stream.zipWith((a, b, c) => c ? a + b : a - b, [1, 2], [3, 4, 5], [true, false]).toArray()
* Stream.zipWith(
* [1, 2],
* [3, 4, 5],
* [true, false]
* )(
* (a, b, c) => c ? a + b : a - b
* ).toArray()
* // => [4, -2]

@@ -520,11 +526,11 @@ * @note ends the Stream when any of the given streams ends

var zipWith = function () {
var iters = [];
var sources = [];
for (var _i = 0; _i < arguments.length; _i++) {
iters[_i] = arguments[_i];
sources[_i] = arguments[_i];
}
return function (zipFun) {
if (iters.some(internal_1.StreamSource.isEmptyInstance)) {
if (sources.some(internal_1.StreamSource.isEmptyInstance)) {
return internal_1.Stream.empty();
}
return new stream_custom_1.FromStream(function () { return new ZipWithIterator(iters, zipFun); });
return new stream_custom_1.FromStream(function () { return new ZipWithIterator(sources, zipFun); });
};

@@ -534,4 +540,4 @@ };

/**
* Returns a Stream with tuples containing each successive value from the given `streams`.
* @param streams - the input stream sources
* Returns a Stream with tuples containing each successive value from the given `sources`.
* @param sources - the input stream sources
* @example

@@ -542,22 +548,23 @@ * Stream.zip([1, 2, 3], [4, 5], ['a', 'b', 'c']).toArray() // => [[1, 4, 'a'], [2, 5, 'b']]

var zip = function () {
var iters = [];
var sources = [];
for (var _i = 0; _i < arguments.length; _i++) {
iters[_i] = arguments[_i];
sources[_i] = arguments[_i];
}
return exports.zipWith.apply(void 0, (0, tslib_1.__spreadArray)([], (0, tslib_1.__read)(iters), false))(Array);
return exports.zipWith.apply(void 0, (0, tslib_1.__spreadArray)([], (0, tslib_1.__read)(sources), false))(Array);
};
exports.zip = zip;
/**
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `streams`, adding
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `sources`, adding
* given `fillValue` to any Streams that end before all streams have ended.
* @param sources - the input stream sources
* @param fillValue - the value to add to streams that end early
* @param zipFun - a function taking one element from each given Stream, and returning a result value
* @param streams - the input stream sources
* @example
* Stream.zipAllWith(
* 0,
* (a, b, c) => a + b + c,
* [1, 2],
* [3, 4, 5],
* [6, 7]
* )(
* 0,
* (a, b, c) => a + b + c,
* ).toArray()

@@ -567,12 +574,12 @@ * // => [10, 13, 5]

var zipAllWith = function () {
var streams = [];
var sources = [];
for (var _i = 0; _i < arguments.length; _i++) {
streams[_i] = arguments[_i];
sources[_i] = arguments[_i];
}
return function (fillValue, zipFun) {
if (streams.every(internal_1.StreamSource.isEmptyInstance)) {
if (sources.every(internal_1.StreamSource.isEmptyInstance)) {
return internal_1.Stream.empty();
}
return new stream_custom_1.FromStream(function () {
return new ZipAllWithItererator(fillValue, streams, zipFun);
return new ZipAllWithItererator(fillValue, sources, zipFun);
});

@@ -583,6 +590,6 @@ };

/**
* Returns a Stream with tuples containing each successive value from the given `streams`, adding given `fillValue` to any Streams
* Returns a Stream with tuples containing each successive value from the given `sources`, adding given `fillValue` to any Streams
* that end before all streams have ended.
* @param fillValue - the value to add to streams that end early
* @param streams - the input stream sources
* @param sources - the input stream sources
* @example

@@ -598,7 +605,7 @@ * Stream.zipAll(

var zipAll = function (fillValue) {
var streams = [];
var sources = [];
for (var _i = 1; _i < arguments.length; _i++) {
streams[_i - 1] = arguments[_i];
sources[_i - 1] = arguments[_i];
}
return exports.zipAllWith.apply(void 0, (0, tslib_1.__spreadArray)([], (0, tslib_1.__read)(streams), false))(fillValue, Array);
return exports.zipAllWith.apply(void 0, (0, tslib_1.__spreadArray)([], (0, tslib_1.__read)(sources), false))(fillValue, Array);
};

@@ -605,0 +612,0 @@ exports.zipAll = zipAll;

import { __awaiter } from "tslib";
import { RimbuError, Token } from '@rimbu/base';
import { AsyncOptLazy, Reducer, } from '@rimbu/common';
import { AsyncFastIterator, AsyncStream, AsyncStreamSource, Stream, } from '../internal';
import { AsyncFastIteratorBase, AsyncFromStream, AsyncStreamBase, } from './async-stream-custom';
import { AsyncOptLazy, Reducer } from '@rimbu/common';
import { AsyncFastIterator, AsyncStream, AsyncStreamSource, Stream } from '../internal';
import { AsyncFastIteratorBase, AsyncFromStream, AsyncStreamBase } from './async-stream-custom';
import { closeIters } from './utils';

@@ -44,11 +44,12 @@ class AsyncOfIterator extends AsyncFastIteratorBase {

};
/** Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `streams`.
/** Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `sources`.
* @param sources - the input async stream sources
* @param zipFun - a potentially asynchronous function taking one element from each given Stream, and returning a result value
* @param streams - the input async stream sources
* @example
* await AsyncStream.zipWith(
* async (a, b, c) => c ? a + b : a - b,
* [1, 2],
* [3, 4, 5],
* [true, false]
* )(
* async (a, b, c) => c ? a + b : a - b,
* ).toArray()

@@ -58,11 +59,13 @@ * // => [4, -2]

*/
export const zipWith = (zipFun, ...iters) => {
if (iters.some(AsyncStreamSource.isEmptyInstance)) {
return AsyncStream.empty();
}
return new AsyncFromStream(() => new AsyncZipWithIterator(iters, zipFun));
export const zipWith = (...sources) => {
return (zipFun) => {
if (sources.some(AsyncStreamSource.isEmptyInstance)) {
return AsyncStream.empty();
}
return new AsyncFromStream(() => new AsyncZipWithIterator(sources, zipFun));
};
};
/**
* Returns an AsyncStream with tuples containing each successive value from the given `streams`.
* @param streams - the input async stream sources
* Returns an AsyncStream with tuples containing each successive value from the given `sources`.
* @param sources - the input async stream sources
* @example

@@ -77,30 +80,33 @@ * await AsyncStream.zip(

*/
export const zip = (...iters) => zipWith(Array, ...iters);
export const zip = (...sources) => zipWith(...sources)(Array);
/**
* Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `streams`, adding
* Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `sources`, adding
* given `fillValue` to any Streams that end before all streams have ended.
* @param sources - the input async stream sources
* @param fillValue - the `AsyncOptLazyz value to add to streams that end early
* @param zipFun - a potentially asynchronous function taking one element from each given Stream, and returning a result value
* @param streams - the input async stream sources
* @example
* await AsyncStream.zipAllWith(
* async () => 0,
* async (a, b, c) => a + b + c,
* [1, 2],
* [3, 4, 5],
* [6, 7]
* )(
* async () => 0,
* async (a, b, c) => a + b + c,
* ).toArray()
* // => [10, 13, 5]
*/
export const zipAllWith = (fillValue, zipFun, ...streams) => {
if (streams.every(AsyncStreamSource.isEmptyInstance)) {
return AsyncStream.empty();
}
return new AsyncFromStream(() => new AsyncZipAllWithItererator(fillValue, streams, zipFun));
export const zipAllWith = (...sources) => {
return (fillValue, zipFun) => {
if (sources.every(AsyncStreamSource.isEmptyInstance)) {
return AsyncStream.empty();
}
return new AsyncFromStream(() => new AsyncZipAllWithItererator(fillValue, sources, zipFun));
};
};
/**
* Returns an AsyncStream with tuples containing each successive value from the given `streams`, adding given `fillValue` to any streams
* Returns an AsyncStream with tuples containing each successive value from the given `sources`, adding given `fillValue` to any streams
* that end before all streams have ended.
* @param fillValue - the `AsyncOptLazy` value to add to streams that end early
* @param streams - the input async stream sources
* @param sources - the input async stream sources
* @example

@@ -116,3 +122,3 @@ * await AsyncStream.zipAll(

*/
export const zipAll = (fillValue, ...streams) => zipAllWith(fillValue, Array, ...streams);
export const zipAll = (fillValue, ...sources) => zipAllWith(...sources)(fillValue, Array);
/**

@@ -119,0 +125,0 @@ * Returns an AsyncStream concatenating the given `source` AsyncStreamSource containing StreamSources.

@@ -444,21 +444,27 @@ import { RimbuError, Token } from '@rimbu/base';

/**
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `streams`.
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `sources`.
* @param sources - the input stream sources
* @param zipFun - a function taking one element from each given Stream, and returning a result value
* @param streams - the input stream sources
* @example
* Stream.zipWith((a, b, c) => c ? a + b : a - b, [1, 2], [3, 4, 5], [true, false]).toArray()
* Stream.zipWith(
* [1, 2],
* [3, 4, 5],
* [true, false]
* )(
* (a, b, c) => c ? a + b : a - b
* ).toArray()
* // => [4, -2]
* @note ends the Stream when any of the given streams ends
*/
export const zipWith = (...iters) => {
export const zipWith = (...sources) => {
return (zipFun) => {
if (iters.some(StreamSource.isEmptyInstance)) {
if (sources.some(StreamSource.isEmptyInstance)) {
return Stream.empty();
}
return new FromStream(() => new ZipWithIterator(iters, zipFun));
return new FromStream(() => new ZipWithIterator(sources, zipFun));
};
};
/**
* Returns a Stream with tuples containing each successive value from the given `streams`.
* @param streams - the input stream sources
* Returns a Stream with tuples containing each successive value from the given `sources`.
* @param sources - the input stream sources
* @example

@@ -468,32 +474,33 @@ * Stream.zip([1, 2, 3], [4, 5], ['a', 'b', 'c']).toArray() // => [[1, 4, 'a'], [2, 5, 'b']]

*/
export const zip = (...iters) => zipWith(...iters)(Array);
export const zip = (...sources) => zipWith(...sources)(Array);
/**
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `streams`, adding
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `sources`, adding
* given `fillValue` to any Streams that end before all streams have ended.
* @param sources - the input stream sources
* @param fillValue - the value to add to streams that end early
* @param zipFun - a function taking one element from each given Stream, and returning a result value
* @param streams - the input stream sources
* @example
* Stream.zipAllWith(
* 0,
* (a, b, c) => a + b + c,
* [1, 2],
* [3, 4, 5],
* [6, 7]
* )(
* 0,
* (a, b, c) => a + b + c,
* ).toArray()
* // => [10, 13, 5]
*/
export const zipAllWith = (...streams) => {
export const zipAllWith = (...sources) => {
return (fillValue, zipFun) => {
if (streams.every(StreamSource.isEmptyInstance)) {
if (sources.every(StreamSource.isEmptyInstance)) {
return Stream.empty();
}
return new FromStream(() => new ZipAllWithItererator(fillValue, streams, zipFun));
return new FromStream(() => new ZipAllWithItererator(fillValue, sources, zipFun));
};
};
/**
* Returns a Stream with tuples containing each successive value from the given `streams`, adding given `fillValue` to any Streams
* Returns a Stream with tuples containing each successive value from the given `sources`, adding given `fillValue` to any Streams
* that end before all streams have ended.
* @param fillValue - the value to add to streams that end early
* @param streams - the input stream sources
* @param sources - the input stream sources
* @example

@@ -508,3 +515,3 @@ * Stream.zipAll(

*/
export const zipAll = (fillValue, ...streams) => zipAllWith(...streams)(fillValue, Array);
export const zipAll = (fillValue, ...sources) => zipAllWith(...sources)(fillValue, Array);
class ArrayIterator extends FastIteratorBase {

@@ -511,0 +518,0 @@ constructor(array, startIndex, endIndex) {

@@ -1,7 +0,7 @@

import type { MaybePromise, OptLazy } from '@rimbu/common';
import { AsyncStreamable, StreamSource } from '../internal';
export declare type AsyncStreamSource<T> = AsyncStreamSource.NonEmpty<T> | (() => MaybePromise<AsyncStreamSource<T>>) | AsyncStreamable<T> | StreamSource<T> | AsyncIterable<T>;
import type { MaybePromise } from '@rimbu/common';
import { AsyncStream, AsyncStreamable, StreamSource } from '../internal';
export declare type AsyncStreamSource<T> = AsyncStreamSource.NonEmpty<T> | AsyncStream<T> | (() => MaybePromise<AsyncStreamSource<T>>) | AsyncStreamable<T> | StreamSource<T> | AsyncIterable<T>;
export declare namespace AsyncStreamSource {
type NonEmpty<T> = OptLazy<AsyncStreamable.NonEmpty<T> | StreamSource.NonEmpty<T> | (() => MaybePromise<AsyncStreamSource.NonEmpty<T>>)>;
type NonEmpty<T> = AsyncStream.NonEmpty<T> | AsyncStreamable.NonEmpty<T> | StreamSource.NonEmpty<T> | (() => MaybePromise<AsyncStreamSource.NonEmpty<T>>);
function isEmptyInstance(source: AsyncStreamSource<any>): boolean;
}
import { Token } from '@rimbu/base';
import { ArrayNonEmpty, AsyncOptLazy, MaybePromise, OptLazy } from '@rimbu/common';
import { ArrayNonEmpty, AsyncOptLazy, MaybePromise } from '@rimbu/common';
import { AsyncStream, AsyncStreamSource } from '../internal';

@@ -13,11 +13,12 @@ export declare function of<T>(...values: ArrayNonEmpty<AsyncOptLazy<T>>): AsyncStream.NonEmpty<T>;

};
/** Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `streams`.
/** Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `sources`.
* @param sources - the input async stream sources
* @param zipFun - a potentially asynchronous function taking one element from each given Stream, and returning a result value
* @param streams - the input async stream sources
* @example
* await AsyncStream.zipWith(
* async (a, b, c) => c ? a + b : a - b,
* [1, 2],
* [3, 4, 5],
* [true, false]
* )(
* async (a, b, c) => c ? a + b : a - b,
* ).toArray()

@@ -28,12 +29,12 @@ * // => [4, -2]

export declare const zipWith: {
<I extends readonly unknown[], R>(zipFun: (...values: I) => R, ...iters: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: AsyncStreamSource.NonEmpty<I[K]>;
} & unknown[]): AsyncStream.NonEmpty<R>;
<I extends readonly unknown[], R>(zipFun: (...values: I) => R, ...iters: {
} & unknown[]): <R>(zipFun: (...values: I) => R) => AsyncStream.NonEmpty<R>;
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: AsyncStreamSource<I[K]>;
} & unknown[]): AsyncStream<R>;
} & unknown[]): <R>(zipFun: (...values: I) => R) => AsyncStream<R>;
};
/**
* Returns an AsyncStream with tuples containing each successive value from the given `streams`.
* @param streams - the input async stream sources
* Returns an AsyncStream with tuples containing each successive value from the given `sources`.
* @param sources - the input async stream sources
* @example

@@ -49,6 +50,6 @@ * await AsyncStream.zip(

export declare const zip: {
<I extends readonly unknown[]>(...iters: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: AsyncStreamSource.NonEmpty<I[K]>;
} & unknown[]): AsyncStream.NonEmpty<I>;
<I extends readonly unknown[]>(...iters: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: AsyncStreamSource<I[K]>;

@@ -58,14 +59,15 @@ } & unknown[]): AsyncStream<I>;

/**
* Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `streams`, adding
* Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `sources`, adding
* given `fillValue` to any Streams that end before all streams have ended.
* @param sources - the input async stream sources
* @param fillValue - the `AsyncOptLazyz value to add to streams that end early
* @param zipFun - a potentially asynchronous function taking one element from each given Stream, and returning a result value
* @param streams - the input async stream sources
* @example
* await AsyncStream.zipAllWith(
* async () => 0,
* async (a, b, c) => a + b + c,
* [1, 2],
* [3, 4, 5],
* [6, 7]
* )(
* async () => 0,
* async (a, b, c) => a + b + c,
* ).toArray()

@@ -75,18 +77,18 @@ * // => [10, 13, 5]

export declare const zipAllWith: {
<I extends readonly unknown[], O, R>(fillValue: AsyncOptLazy<O>, zipFun: (...values: {
[K in keyof I]: I[K] | O;
}) => MaybePromise<R>, ...streams: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: AsyncStreamSource.NonEmpty<I[K]>;
} & unknown[]): AsyncStream.NonEmpty<R>;
<I extends readonly unknown[], O, R>(fillValue: OptLazy<O>, zipFun: (...values: {
} & unknown[]): <O, R>(fillValue: AsyncOptLazy<O>, zipFun: (...values: {
[K in keyof I]: I[K] | O;
}) => MaybePromise<R>, ...streams: {
}) => MaybePromise<R>) => AsyncStream.NonEmpty<R>;
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: AsyncStreamSource<I[K]>;
} & unknown[]): AsyncStream<R>;
} & unknown[]): <O, R>(fillValue: AsyncOptLazy<O>, zipFun: (...values: {
[K in keyof I]: I[K] | O;
}) => MaybePromise<R>) => AsyncStream<R>;
};
/**
* Returns an AsyncStream with tuples containing each successive value from the given `streams`, adding given `fillValue` to any streams
* Returns an AsyncStream with tuples containing each successive value from the given `sources`, adding given `fillValue` to any streams
* that end before all streams have ended.
* @param fillValue - the `AsyncOptLazy` value to add to streams that end early
* @param streams - the input async stream sources
* @param sources - the input async stream sources
* @example

@@ -103,3 +105,3 @@ * await AsyncStream.zipAll(

export declare const zipAll: {
<I extends readonly unknown[], O>(fillValue: AsyncOptLazy<O>, ...streams: {
<I extends readonly unknown[], O>(fillValue: AsyncOptLazy<O>, ...sources: {
[K in keyof I]: AsyncStreamSource.NonEmpty<I[K]>;

@@ -109,3 +111,3 @@ } & unknown[]): AsyncStream.NonEmpty<{

}>;
<I extends readonly unknown[], O>(fillValue: AsyncOptLazy<O>, ...streams: {
<I extends readonly unknown[], O>(fillValue: AsyncOptLazy<O>, ...sources: {
[K in keyof I]: AsyncStreamSource<I[K]>;

@@ -112,0 +114,0 @@ } & unknown[]): AsyncStream<{

@@ -198,7 +198,13 @@ import { Token } from '@rimbu/base';

/**
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `streams`.
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `sources`.
* @param sources - the input stream sources
* @param zipFun - a function taking one element from each given Stream, and returning a result value
* @param streams - the input stream sources
* @example
* Stream.zipWith((a, b, c) => c ? a + b : a - b, [1, 2], [3, 4, 5], [true, false]).toArray()
* Stream.zipWith(
* [1, 2],
* [3, 4, 5],
* [true, false]
* )(
* (a, b, c) => c ? a + b : a - b
* ).toArray()
* // => [4, -2]

@@ -208,6 +214,6 @@ * @note ends the Stream when any of the given streams ends

export declare const zipWith: {
<I extends readonly unknown[]>(...iters: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: StreamSource.NonEmpty<I[K]>;
} & unknown[]): <R>(zipFun: (...values: I) => R) => Stream.NonEmpty<R>;
<I extends readonly unknown[]>(...iters: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: StreamSource<I[K]>;

@@ -217,4 +223,4 @@ } & unknown[]): <R>(zipFun: (...values: I) => R) => Stream<R>;

/**
* Returns a Stream with tuples containing each successive value from the given `streams`.
* @param streams - the input stream sources
* Returns a Stream with tuples containing each successive value from the given `sources`.
* @param sources - the input stream sources
* @example

@@ -225,6 +231,6 @@ * Stream.zip([1, 2, 3], [4, 5], ['a', 'b', 'c']).toArray() // => [[1, 4, 'a'], [2, 5, 'b']]

export declare const zip: {
<I extends readonly unknown[]>(...iters: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: StreamSource.NonEmpty<I[K]>;
} & unknown[]): Stream.NonEmpty<I>;
<I extends readonly unknown[]>(...iters: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: StreamSource<I[K]>;

@@ -234,14 +240,15 @@ } & unknown[]): Stream<I>;

/**
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `streams`, adding
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `sources`, adding
* given `fillValue` to any Streams that end before all streams have ended.
* @param sources - the input stream sources
* @param fillValue - the value to add to streams that end early
* @param zipFun - a function taking one element from each given Stream, and returning a result value
* @param streams - the input stream sources
* @example
* Stream.zipAllWith(
* 0,
* (a, b, c) => a + b + c,
* [1, 2],
* [3, 4, 5],
* [6, 7]
* )(
* 0,
* (a, b, c) => a + b + c,
* ).toArray()

@@ -251,3 +258,3 @@ * // => [10, 13, 5]

export declare const zipAllWith: {
<I extends readonly unknown[]>(...streams: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: StreamSource.NonEmpty<I[K]>;

@@ -257,3 +264,3 @@ } & unknown[]): <O, R>(fillValue: OptLazy<O>, zipFun: (...values: {

}) => R) => Stream.NonEmpty<R>;
<I extends readonly unknown[]>(...streams: {
<I extends readonly unknown[]>(...sources: {
[K in keyof I]: StreamSource<I[K]>;

@@ -265,6 +272,6 @@ } & unknown[]): <O, R>(fillValue: OptLazy<O>, zipFun: (...values: {

/**
* Returns a Stream with tuples containing each successive value from the given `streams`, adding given `fillValue` to any Streams
* Returns a Stream with tuples containing each successive value from the given `sources`, adding given `fillValue` to any Streams
* that end before all streams have ended.
* @param fillValue - the value to add to streams that end early
* @param streams - the input stream sources
* @param sources - the input stream sources
* @example

@@ -280,3 +287,3 @@ * Stream.zipAll(

export declare const zipAll: {
<I extends readonly unknown[], O>(fillValue: OptLazy<O>, ...streams: {
<I extends readonly unknown[], O>(fillValue: OptLazy<O>, ...sources: {
[K in keyof I]: StreamSource.NonEmpty<I[K]>;

@@ -286,3 +293,3 @@ } & unknown[]): Stream.NonEmpty<{

}>;
<I extends readonly unknown[], O>(fillValue: OptLazy<O>, ...streams: {
<I extends readonly unknown[], O>(fillValue: OptLazy<O>, ...sources: {
[K in keyof I]: StreamSource<I[K]>;

@@ -289,0 +296,0 @@ } & unknown[]): Stream<{

@@ -1,6 +0,6 @@

import { Streamable } from '../internal';
import { Stream, Streamable } from '../internal';
/**
* Any object that is Iterable, a Stream, or can produce a Stream.
*/
export declare type StreamSource<T> = Iterable<T> | Streamable<T>;
export declare type StreamSource<T> = Iterable<T> | Stream<T> | Streamable<T>;
export declare namespace StreamSource {

@@ -10,3 +10,3 @@ /**

*/
type NonEmpty<T> = Streamable.NonEmpty<T> | readonly [T, ...T[]];
type NonEmpty<T> = Stream.NonEmpty<T> | Streamable.NonEmpty<T> | readonly [T, ...T[]];
/**

@@ -13,0 +13,0 @@ * Returns true if the given `source` StreamSource is known to be empty.

{
"name": "@rimbu/stream",
"version": "0.8.0",
"version": "0.8.1",
"description": "Efficient structure representing a sequence of elements, with powerful operations for TypeScript",

@@ -61,4 +61,4 @@ "keywords": [

"dependencies": {
"@rimbu/base": "^0.7.0",
"@rimbu/common": "^0.8.0",
"@rimbu/base": "^0.7.1",
"@rimbu/common": "^0.8.1",
"tslib": "^2.3.1"

@@ -72,3 +72,3 @@ },

},
"gitHead": "c321aa32b1c5fd8ca8b7fb1c26bd4f7bbf3ef70d"
"gitHead": "d507e628ab82d501bc31351d69168e09bac5ae14"
}

@@ -1,2 +0,2 @@

import type { MaybePromise, OptLazy } from '@rimbu/common';
import type { MaybePromise } from '@rimbu/common';
import { AsyncStream, AsyncStreamable, StreamSource } from '../internal';

@@ -6,2 +6,3 @@

| AsyncStreamSource.NonEmpty<T>
| AsyncStream<T>
| (() => MaybePromise<AsyncStreamSource<T>>)

@@ -13,7 +14,7 @@ | AsyncStreamable<T>

export namespace AsyncStreamSource {
export type NonEmpty<T> = OptLazy<
export type NonEmpty<T> =
| AsyncStream.NonEmpty<T>
| AsyncStreamable.NonEmpty<T>
| StreamSource.NonEmpty<T>
| (() => MaybePromise<AsyncStreamSource.NonEmpty<T>>)
>;
| (() => MaybePromise<AsyncStreamSource.NonEmpty<T>>);

@@ -20,0 +21,0 @@ export function isEmptyInstance(source: AsyncStreamSource<any>): boolean {

@@ -6,5 +6,3 @@ import { RimbuError, Token } from '@rimbu/base';

AsyncReducer,
MaybePromise,
OptLazy,
Reducer,
MaybePromise, Reducer
} from '@rimbu/common';

@@ -15,3 +13,3 @@ import {

AsyncStreamSource,
Stream,
Stream
} from '../internal';

@@ -21,3 +19,3 @@ import {

AsyncFromStream,
AsyncStreamBase,
AsyncStreamBase
} from './async-stream-custom';

@@ -91,11 +89,12 @@ import { closeIters } from './utils';

/** Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `streams`.
/** Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `sources`.
* @param sources - the input async stream sources
* @param zipFun - a potentially asynchronous function taking one element from each given Stream, and returning a result value
* @param streams - the input async stream sources
* @example
* await AsyncStream.zipWith(
* async (a, b, c) => c ? a + b : a - b,
* [1, 2],
* [3, 4, 5],
* [true, false]
* )(
* async (a, b, c) => c ? a + b : a - b,
* ).toArray()

@@ -106,23 +105,21 @@ * // => [4, -2]

export const zipWith: {
<I extends readonly unknown[], R>(
zipFun: (...values: I) => R,
...iters: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<R>;
<I extends readonly unknown[], R>(
zipFun: (...values: I) => R,
...iters: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): AsyncStream<R>;
} = (zipFun, ...iters) => {
if (iters.some(AsyncStreamSource.isEmptyInstance)) {
return AsyncStream.empty();
}
<I extends readonly unknown[]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => AsyncStream.NonEmpty<R>;
<I extends readonly unknown[]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => AsyncStream<R>;
} = (...sources): any => {
return (zipFun: any): any => {
if (sources.some(AsyncStreamSource.isEmptyInstance)) {
return AsyncStream.empty();
}
return new AsyncFromStream(
() => new AsyncZipWithIterator(iters as any, zipFun)
) as any;
return new AsyncFromStream(() => new AsyncZipWithIterator(sources, zipFun));
};
};
/**
* Returns an AsyncStream with tuples containing each successive value from the given `streams`.
* @param streams - the input async stream sources
* Returns an AsyncStream with tuples containing each successive value from the given `sources`.
* @param sources - the input async stream sources
* @example

@@ -139,22 +136,23 @@ * await AsyncStream.zip(

<I extends readonly unknown[]>(
...iters: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<I>;
<I extends readonly unknown[]>(
...iters: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): AsyncStream<I>;
} = (...iters) => zipWith(Array, ...(iters as any)) as any;
} = (...sources): any => zipWith(...sources)(Array);
/**
* Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `streams`, adding
* Returns an AsyncStream with the result of applying given `zipFun` to each successive value resulting from the given `sources`, adding
* given `fillValue` to any Streams that end before all streams have ended.
* @param sources - the input async stream sources
* @param fillValue - the `AsyncOptLazyz value to add to streams that end early
* @param zipFun - a potentially asynchronous function taking one element from each given Stream, and returning a result value
* @param streams - the input async stream sources
* @example
* await AsyncStream.zipAllWith(
* async () => 0,
* async (a, b, c) => a + b + c,
* [1, 2],
* [3, 4, 5],
* [6, 7]
* )(
* async () => 0,
* async (a, b, c) => a + b + c,
* ).toArray()

@@ -164,28 +162,32 @@ * // => [10, 13, 5]

export const zipAllWith: {
<I extends readonly unknown[], O, R>(
<I extends readonly unknown[]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): <O, R>(
fillValue: AsyncOptLazy<O>,
zipFun: (...values: { [K in keyof I]: I[K] | O }) => MaybePromise<R>,
...streams: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<R>;
<I extends readonly unknown[], O, R>(
fillValue: OptLazy<O>,
zipFun: (...values: { [K in keyof I]: I[K] | O }) => MaybePromise<R>,
...streams: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): AsyncStream<R>;
} = (fillValue, zipFun, ...streams) => {
if (streams.every(AsyncStreamSource.isEmptyInstance)) {
return AsyncStream.empty();
}
zipFun: (...values: { [K in keyof I]: I[K] | O }) => MaybePromise<R>
) => AsyncStream.NonEmpty<R>;
<I extends readonly unknown[]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): <O, R>(
fillValue: AsyncOptLazy<O>,
zipFun: (...values: { [K in keyof I]: I[K] | O }) => MaybePromise<R>
) => AsyncStream<R>;
} = (...sources): any => {
return (fillValue: any, zipFun: any): any => {
if (sources.every(AsyncStreamSource.isEmptyInstance)) {
return AsyncStream.empty();
}
return new AsyncFromStream(
(): AsyncFastIterator<any> =>
new AsyncZipAllWithItererator(fillValue, streams, zipFun as any)
) as any;
return new AsyncFromStream(
(): AsyncFastIterator<any> =>
new AsyncZipAllWithItererator(fillValue, sources, zipFun)
);
};
};
/**
* Returns an AsyncStream with tuples containing each successive value from the given `streams`, adding given `fillValue` to any streams
* Returns an AsyncStream with tuples containing each successive value from the given `sources`, adding given `fillValue` to any streams
* that end before all streams have ended.
* @param fillValue - the `AsyncOptLazy` value to add to streams that end early
* @param streams - the input async stream sources
* @param sources - the input async stream sources
* @example

@@ -204,10 +206,10 @@ * await AsyncStream.zipAll(

fillValue: AsyncOptLazy<O>,
...streams: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<{ [K in keyof I]: I[K] | O }>;
<I extends readonly unknown[], O>(
fillValue: AsyncOptLazy<O>,
...streams: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): AsyncStream<{ [K in keyof I]: I[K] | O }>;
} = (fillValue, ...streams) =>
zipAllWith(fillValue, Array, ...(streams as any)) as any;
} = (fillValue, ...sources): any =>
zipAllWith(...(sources as any))(fillValue, Array);

@@ -214,0 +216,0 @@ /**

@@ -561,7 +561,13 @@ import { RimbuError, Token } from '@rimbu/base';

/**
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `streams`.
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `sources`.
* @param sources - the input stream sources
* @param zipFun - a function taking one element from each given Stream, and returning a result value
* @param streams - the input stream sources
* @example
* Stream.zipWith((a, b, c) => c ? a + b : a - b, [1, 2], [3, 4, 5], [true, false]).toArray()
* Stream.zipWith(
* [1, 2],
* [3, 4, 5],
* [true, false]
* )(
* (a, b, c) => c ? a + b : a - b
* ).toArray()
* // => [4, -2]

@@ -572,14 +578,14 @@ * @note ends the Stream when any of the given streams ends

<I extends readonly unknown[]>(
...iters: { [K in keyof I]: StreamSource.NonEmpty<I[K]> } & unknown[]
...sources: { [K in keyof I]: StreamSource.NonEmpty<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => Stream.NonEmpty<R>;
<I extends readonly unknown[]>(
...iters: { [K in keyof I]: StreamSource<I[K]> } & unknown[]
...sources: { [K in keyof I]: StreamSource<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => Stream<R>;
} = (...iters) => {
} = (...sources) => {
return (zipFun): any => {
if (iters.some(StreamSource.isEmptyInstance)) {
if (sources.some(StreamSource.isEmptyInstance)) {
return Stream.empty();
}
return new FromStream(() => new ZipWithIterator(iters as any, zipFun));
return new FromStream(() => new ZipWithIterator(sources as any, zipFun));
};

@@ -589,4 +595,4 @@ };

/**
* Returns a Stream with tuples containing each successive value from the given `streams`.
* @param streams - the input stream sources
* Returns a Stream with tuples containing each successive value from the given `sources`.
* @param sources - the input stream sources
* @example

@@ -598,22 +604,23 @@ * Stream.zip([1, 2, 3], [4, 5], ['a', 'b', 'c']).toArray() // => [[1, 4, 'a'], [2, 5, 'b']]

<I extends readonly unknown[]>(
...iters: { [K in keyof I]: StreamSource.NonEmpty<I[K]> } & unknown[]
...sources: { [K in keyof I]: StreamSource.NonEmpty<I[K]> } & unknown[]
): Stream.NonEmpty<I>;
<I extends readonly unknown[]>(
...iters: { [K in keyof I]: StreamSource<I[K]> } & unknown[]
...sources: { [K in keyof I]: StreamSource<I[K]> } & unknown[]
): Stream<I>;
} = (...iters) => zipWith(...(iters as any))(Array);
} = (...sources) => zipWith(...(sources as any))(Array);
/**
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `streams`, adding
* Returns a Stream with the result of applying given `zipFun` to each successive value resulting from the given `sources`, adding
* given `fillValue` to any Streams that end before all streams have ended.
* @param sources - the input stream sources
* @param fillValue - the value to add to streams that end early
* @param zipFun - a function taking one element from each given Stream, and returning a result value
* @param streams - the input stream sources
* @example
* Stream.zipAllWith(
* 0,
* (a, b, c) => a + b + c,
* [1, 2],
* [3, 4, 5],
* [6, 7]
* )(
* 0,
* (a, b, c) => a + b + c,
* ).toArray()

@@ -624,3 +631,3 @@ * // => [10, 13, 5]

<I extends readonly unknown[]>(
...streams: { [K in keyof I]: StreamSource.NonEmpty<I[K]> } & unknown[]
...sources: { [K in keyof I]: StreamSource.NonEmpty<I[K]> } & unknown[]
): <O, R>(

@@ -631,3 +638,3 @@ fillValue: OptLazy<O>,

<I extends readonly unknown[]>(
...streams: { [K in keyof I]: StreamSource<I[K]> } & unknown[]
...sources: { [K in keyof I]: StreamSource<I[K]> } & unknown[]
): <O, R>(

@@ -637,5 +644,5 @@ fillValue: OptLazy<O>,

) => Stream<R>;
} = (...streams) => {
} = (...sources) => {
return (fillValue, zipFun: any): any => {
if (streams.every(StreamSource.isEmptyInstance)) {
if (sources.every(StreamSource.isEmptyInstance)) {
return Stream.empty();

@@ -646,3 +653,3 @@ }

(): FastIterator<any> =>
new ZipAllWithItererator(fillValue, streams, zipFun)
new ZipAllWithItererator(fillValue, sources, zipFun)
);

@@ -653,6 +660,6 @@ };

/**
* Returns a Stream with tuples containing each successive value from the given `streams`, adding given `fillValue` to any Streams
* Returns a Stream with tuples containing each successive value from the given `sources`, adding given `fillValue` to any Streams
* that end before all streams have ended.
* @param fillValue - the value to add to streams that end early
* @param streams - the input stream sources
* @param sources - the input stream sources
* @example

@@ -670,9 +677,9 @@ * Stream.zipAll(

fillValue: OptLazy<O>,
...streams: { [K in keyof I]: StreamSource.NonEmpty<I[K]> } & unknown[]
...sources: { [K in keyof I]: StreamSource.NonEmpty<I[K]> } & unknown[]
): Stream.NonEmpty<{ [K in keyof I]: I[K] | O }>;
<I extends readonly unknown[], O>(
fillValue: OptLazy<O>,
...streams: { [K in keyof I]: StreamSource<I[K]> } & unknown[]
...sources: { [K in keyof I]: StreamSource<I[K]> } & unknown[]
): Stream<{ [K in keyof I]: I[K] | O }>;
} = (fillValue, ...streams) => zipAllWith(...streams)(fillValue, Array) as any;
} = (fillValue, ...sources) => zipAllWith(...sources)(fillValue, Array) as any;

@@ -679,0 +686,0 @@ class ArrayIterator<T> extends FastIteratorBase<T> {

@@ -6,3 +6,3 @@ import { Stream, Streamable } from '../internal';

*/
export type StreamSource<T> = Iterable<T> | Streamable<T>;
export type StreamSource<T> = Iterable<T> | Stream<T> | Streamable<T>;

@@ -13,3 +13,6 @@ export namespace StreamSource {

*/
export type NonEmpty<T> = Streamable.NonEmpty<T> | readonly [T, ...T[]];
export type NonEmpty<T> =
| Stream.NonEmpty<T>
| Streamable.NonEmpty<T>
| readonly [T, ...T[]];

@@ -16,0 +19,0 @@ /**

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc