Socket
Socket
Sign inDemoInstall

wonka

Package Overview
Dependencies
Maintainers
1
Versions
83
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

wonka - npm Package Compare versions

Comparing version 6.1.2 to 6.2.0

CHANGELOG.md

94

dist/wonka.js

@@ -466,2 +466,6 @@ Object.defineProperty(exports, "__esModule", {

var t = {
done: !0
};
function zip(r) {

@@ -732,5 +736,4 @@ var t = Object.keys(r).length;

exports.fromObservable = function fromObservable(e) {
e = e[observableSymbol()] ? e[observableSymbol()]() : e;
return r => {
var t = e.subscribe({
var t = (e[observableSymbol()] ? e[observableSymbol()]() : e).subscribe({
next(e) {

@@ -742,3 +745,5 @@ r(push(e));

},
error() {}
error(e) {
throw e;
}
});

@@ -1252,2 +1257,48 @@ r(start((e => {

exports.toAsyncIterable = r => ({
[Symbol.asyncIterator]() {
var i = [];
var s = !1;
var a = e;
var f;
r((e => {
if (s) {} else if (0 === e) {
if (f) {
f = f(t);
}
s = !0;
} else if (0 === e.tag) {
(a = e[0])(0);
} else if (f) {
f = f({
value: e[0],
done: !1
});
} else {
i.push(e[0]);
}
}));
return {
async next() {
if (s && !i.length) {
return t;
} else if (!s && i.length <= 1) {
a(0);
}
return i.length ? {
value: i.shift(),
done: !1
} : new Promise((e => f = e));
},
async return() {
if (!s) {
f = a(1);
}
s = !0;
return t;
}
};
}
});
exports.toCallbag = function toCallbag(e) {

@@ -1275,25 +1326,32 @@ return (r, t) => {

return {
subscribe(t) {
var i = e;
var s = !1;
subscribe(t, i, s) {
var a = "object" == typeof t ? t : {
next: t,
error: i,
complete: s
};
var f = e;
var n = !1;
r((e => {
if (s) {} else if (0 === e) {
s = !0;
t.complete();
if (n) {} else if (0 === e) {
n = !0;
if (a.complete) {
a.complete();
}
} else if (0 === e.tag) {
(i = e[0])(0);
(f = e[0])(0);
} else {
t.next(e[0]);
i(0);
a.next(e[0]);
f(0);
}
}));
var a = {
var l = {
closed: !1,
unsubscribe() {
a.closed = !0;
s = !0;
i(1);
l.closed = !0;
n = !0;
f(1);
}
};
return a;
return l;
},

@@ -1312,3 +1370,3 @@ [observableSymbol()]() {

if (0 === e) {
t(s);
Promise.resolve(s).then(t);
} else if (0 === e.tag) {

@@ -1315,0 +1373,0 @@ (i = e[0])(0);

64

package.json
{
"name": "wonka",
"description": "A tiny but capable push & pull stream library for TypeScript and Flow",
"version": "6.1.2",
"version": "6.2.0",
"author": "0no.co <hi@0no.co>",

@@ -9,3 +9,3 @@ "source": "./src/index.ts",

"module": "./dist/wonka.mjs",
"types": "./dist/types/index.d.ts",
"types": "./dist/wonka.d.ts",
"exports": {

@@ -15,3 +15,3 @@ ".": {

"require": "./dist/wonka.js",
"types": "./dist/types/index.d.ts",
"types": "./dist/wonka.d.ts",
"source": "./src/index.ts"

@@ -39,10 +39,2 @@ },

],
"scripts": {
"test": "vitest run",
"check": "tsc",
"lint": "eslint --ext=js,ts .",
"build": "rollup -c rollup.config.js",
"clean": "rimraf dist node_modules/.cache",
"prepublishOnly": "run-s clean build check test"
},
"repository": "https://github.com/0no-co/wonka",

@@ -74,32 +66,44 @@ "bugs": {

},
"dependencies": {},
"devDependencies": {
"@rollup/plugin-buble": "^0.21.3",
"@rollup/plugin-commonjs": "^22.0.2",
"@rollup/plugin-node-resolve": "^13.3.0",
"@rollup/plugin-typescript": "^8.3.4",
"@rollup/pluginutils": "^4.2.1",
"@changesets/cli": "^2.25.2",
"@changesets/get-github-info": "^0.5.1",
"@rollup/plugin-buble": "^1.0.1",
"@rollup/plugin-commonjs": "^23.0.3",
"@rollup/plugin-node-resolve": "^15.0.1",
"@rollup/plugin-terser": "^0.1.0",
"@rollup/plugin-typescript": "^10.0.1",
"@rollup/pluginutils": "^5.0.2",
"@types/zen-observable": "^0.8.3",
"@typescript-eslint/eslint-plugin": "^5.33.0",
"@typescript-eslint/parser": "^5.33.0",
"@typescript-eslint/eslint-plugin": "^5.45.0",
"@typescript-eslint/parser": "^5.45.0",
"callbag-from-iter": "^1.3.0",
"callbag-iterate": "^1.0.0",
"callbag-take": "^1.5.0",
"eslint": "^8.21.0",
"eslint": "^8.29.0",
"eslint-config-prettier": "^8.5.0",
"eslint-plugin-prettier": "^4.2.1",
"flowgen": "^1.20.1",
"eslint-plugin-tsdoc": "^0.2.17",
"flowgen": "^1.21.0",
"glob": "^8.0.3",
"husky-v4": "^4.3.8",
"lint-staged": "^13.0.3",
"lint-staged": "^13.0.4",
"npm-run-all": "^4.1.5",
"prettier": "^2.7.1",
"prettier": "^2.8.0",
"rimraf": "^3.0.2",
"rollup": "^2.77.3",
"rollup-plugin-terser": "^7.0.2",
"tslib": "^2.4.0",
"typescript": "^4.8.2",
"vitest": "^0.23.4",
"zen-observable": "^0.8.15"
"rollup": "^3.5.1",
"rollup-plugin-cjs-check": "^1.0.1",
"rollup-plugin-dts": "^5.1.1",
"tslib": "^2.4.1",
"typescript": "^4.9.3",
"vitest": "^0.25.3",
"zen-observable": "^0.10.0"
},
"scripts": {
"test": "vitest run",
"check": "tsc",
"lint": "eslint --ext=js,ts .",
"build": "rollup -c scripts/rollup.config.mjs",
"clean": "rimraf dist node_modules/.cache",
"postinstall": "node ./scripts/postinstall.js"
}
}
}

@@ -232,2 +232,162 @@ import { describe, it, expect, vi } from 'vitest';

describe('toAsyncIterable', () => {
it('creates an async iterable mirroring the Wonka source', async () => {
let pulls = 0;
let sink: Sink<any> | null = null;
const source: Source<any> = _sink => {
sink = _sink;
sink(
start(signal => {
if (signal === TalkbackKind.Pull) pulls++;
})
);
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
expect(pulls).toBe(1);
sink!(push(0));
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(pulls).toBe(2);
sink!(push(1));
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(pulls).toBe(3);
sink!(SignalKind.End);
expect(await asyncIterator.next()).toEqual({ done: true });
expect(pulls).toBe(3);
});
it('buffers actively pushed values', async () => {
let pulls = 0;
let sink: Sink<any> | null = null;
const source: Source<any> = _sink => {
sink = _sink;
sink(
start(signal => {
if (signal === TalkbackKind.Pull) pulls++;
})
);
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
sink!(push(0));
sink!(push(1));
sink!(SignalKind.End);
expect(pulls).toBe(1);
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(await asyncIterator.next()).toEqual({ done: true });
});
it('asynchronously waits for pulled values', async () => {
let pulls = 0;
let sink: Sink<any> | null = null;
const source: Source<any> = _sink => {
sink = _sink;
sink(
start(signal => {
if (signal === TalkbackKind.Pull) pulls++;
})
);
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
expect(pulls).toBe(1);
let resolved = false;
const promise = asyncIterator.next().then(value => {
resolved = true;
return value;
});
await Promise.resolve();
expect(resolved).toBe(false);
sink!(push(0));
sink!(SignalKind.End);
expect(await promise).toEqual({ value: 0, done: false });
expect(await asyncIterator.next()).toEqual({ done: true });
});
it('supports cancellation via return', async () => {
let ended = false;
let sink: Sink<any> | null = null;
const source: Source<any> = _sink => {
sink = _sink;
sink(
start(signal => {
if (signal === TalkbackKind.Close) ended = true;
})
);
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
sink!(push(0));
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(await asyncIterator.return!()).toEqual({ done: true });
sink!(push(1));
expect(await asyncIterator.next()).toEqual({ done: true });
expect(ended).toBeTruthy();
});
it('supports for-await-of', async () => {
let pulls = 0;
const source: Source<any> = sink => {
sink(
start(signal => {
if (signal === TalkbackKind.Pull) {
sink(pulls < 3 ? push(pulls++) : SignalKind.End);
}
})
);
};
const iterable = sinks.toAsyncIterable(source);
const values: any[] = [];
for await (const value of iterable) {
values.push(value);
}
expect(values).toEqual([0, 1, 2]);
});
it('supports for-await-of with early break', async () => {
let pulls = 0;
let closed = false;
const source: Source<any> = sink => {
sink(
start(signal => {
if (signal === TalkbackKind.Pull) {
sink(pulls < 3 ? push(pulls++) : SignalKind.End);
} else {
closed = true;
}
})
);
};
const iterable = sinks.toAsyncIterable(source);
for await (const value of iterable) {
expect(value).toBe(0);
break;
}
expect(closed).toBe(true);
});
});
describe('toObservable', () => {

@@ -234,0 +394,0 @@ it('creates an Observable mirroring the Wonka source', () => {

import { Source, SignalKind } from './types';
import { push, start } from './helpers';
/** A definition of the Callbag type as per its specification.
* @see {@link https://github.com/callbag/callbag} for the Callbag specification.
*/
interface Callbag<I, O> {

@@ -10,2 +13,11 @@ (t: 0, d: Callbag<O, I>): void;

/** Converts a Callbag to a {@link Source}.
* @param callbag - The {@link Callbag} object that will be converted.
* @returns A {@link Source} wrapping the passed Callbag.
*
* @remarks
* This converts a Callbag to a {@link Source}. When this Source receives a {@link Sink} and
* the subscription starts, internally, it'll subscribe to the passed Callbag, passing through
* all of its emitted values.
*/
export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> {

@@ -29,2 +41,10 @@ return sink => {

/** Converts a {@link Source} to a Callbag.
* @param source - The {@link Source} that will be converted.
* @returns A {@link Callbag} wrapping the passed Source.
*
* @remarks
* This converts a {@link Source} to a {@link Callbag}. When this Callbag is subscribed to, it
* internally subscribes to the Wonka Source and pulls new values.
*/
export function toCallbag<T>(source: Source<T>): Callbag<any, T> {

@@ -31,0 +51,0 @@ return (signal: number, sink: any) => {

@@ -8,13 +8,44 @@ import { Source, TypeOfSource, SignalKind, TalkbackKind, TalkbackFn } from './types';

export function zip<Sources extends readonly [...Source<any>[]]>(
sources: [...Sources]
): Source<TypeOfSourceArray<Sources>>;
/** Combines the latest values of several sources into a Source issuing either tuple or dictionary
* values.
*
* @param sources - Either an array or dictionary object of Sources.
* @returns A {@link Source} issuing a zipped value whenever any input Source updates.
*
* @remarks
* `zip` combines several {@link Source | Sources}. The resulting Source will issue its first value
* once all input Sources have at least issued one value, and will subsequently issue a new value
* each time any of the Sources emits a new value.
*
* Depending on whether an array or dictionary object of Sources is passed to `zip`, its emitted
* values will be arrays or dictionary objects of the Sources' values.
*
* @example
* An example of passing a dictionary object to `zip`. If an array is passed, the resulting
* values will output arrays of the sources' values instead.
*
* ```ts
* pipe(
* zip({
* x: fromValue(1),
* y: fromArray([2, 3]),
* }),
* subscribe(result => {
* // logs { x: 1, y: 2 } then { x: 1, y: 3 }
* console.log(result);
* })
* );
* ```
*/
interface zip {
<Sources extends readonly [...Source<any>[]]>(sources: [...Sources]): Source<
TypeOfSourceArray<Sources>
>;
export function zip<Sources extends { [prop: string]: Source<any> }>(
sources: Sources
): Source<{ [Property in keyof Sources]: TypeOfSource<Sources[Property]> }>;
<Sources extends { [prop: string]: Source<any> }>(sources: Sources): Source<{
[Property in keyof Sources]: TypeOfSource<Sources[Property]>;
}>;
}
export function zip<T>(
sources: Source<T>[] | Record<string, Source<T>>
): Source<T[] | Record<string, T>> {
function zip<T>(sources: Source<T>[] | Record<string, Source<T>>): Source<T[] | Record<string, T>> {
const size = Object.keys(sources).length;

@@ -79,2 +110,26 @@ return sink => {

export { zip };
/** Combines the latest values of all passed sources into a Source issuing tuple values.
*
* @see {@link zip | `zip`} which this helper wraps and uses.
* @param sources - A variadic list of {@link Source} parameters.
* @returns A {@link Source} issuing a zipped value whenever any input Source updates.
*
* @remarks
* `combine` takes one or more {@link Source | Sources} as arguments. Once all input Sources have at
* least issued one value it will issue an array of all of the Sources' values. Subsequently, it
* will issue a new array value whenever any of the Sources update.
*
* @example
*
* ```ts
* pipe(
* combine(fromValue(1), fromValue(2)),
* subscribe(result => {
* console.log(result); // logs [1, 2]
* })
* );
* ```
*/
export function combine<Sources extends Source<any>[]>(

@@ -81,0 +136,0 @@ ...sources: Sources

import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types';
/** Placeholder {@link TeardownFn | teardown functions} that's a no-op.
* @see {@link TeardownFn} for the definition and usage of teardowns.
* @internal
*/
export const teardownPlaceholder: TeardownFn = () => {
/*noop*/
};
/** Placeholder {@link TalkbackFn | talkback function} that's a no-op.
* @privateRemarks
* This is frequently used in the codebase as a no-op initializer value for talkback functions in
* the implementation of {@link Operator | Operators}. This is cheaper than initializing the
* variables of talkbacks to `undefined` or `null` and performing an extra check before calling
* them. Since the {@link Start | Start signal} is assumed to come first and carry a talkback, we can
* use this to our advantage and use a no-op placeholder before {@link Start} is received.
*
* @internal
*/
export const talkbackPlaceholder: TalkbackFn = teardownPlaceholder;
/** Wraps the passed {@link TalkbackFn | talkback function} in a {@link Start | Start signal}.
* @internal
*/
export function start<T>(talkback: TalkbackFn): Start<T> {

@@ -14,2 +32,5 @@ const box: any = [talkback];

/** Wraps the passed value in a {@link Push | Push signal}.
* @internal
*/
export function push<T>(value: T): Push<T> {

@@ -16,0 +37,0 @@ const box: any = [value];

@@ -0,1 +1,13 @@

/**
* A tiny but capable push & pull stream library for TypeScript and Flow.
*
* @remarks
* Wonka is a lightweight iterable and observable library and exposes a set of helpers to create
* streams, which are sources emitting multiple values, which allow you to create, transform, and
* consume event streams or iterable sets of data.
*
* It's loosely based on the Callbag spec: {@link https://github.com/callbag/callbag}
* @packageDocumentation
*/
export * from './types';

@@ -2,0 +14,0 @@ export * from './sources';

import { Source, SignalKind, TalkbackKind } from './types';
import { push, start, talkbackPlaceholder } from './helpers';
/** A definition of the ES Observable Subscription type that is returned by
* {@link Observable.subscribe}
*
* @remarks
* The Subscription in ES Observables is a handle that is held while the Observable is actively
* streaming values. As such, it's used to indicate with {@link ObservableSubscription.closed}
* whether it's active, and {@link ObservableSubscription.unsubscribe} may be used to cancel the
* ongoing subscription and end the {@link Observable} early.
*
* @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
*/
interface ObservableSubscription {
/** A boolean flag indicating whether the subscription is closed.
* @remarks
* When `true`, the subscription will not issue new values to the {@link ObservableObserver} and
* has terminated. No new values are expected.
*
* @readonly
*/
closed?: boolean;
/** Cancels the subscription.
* @remarks
* This cancels the ongoing subscription and the {@link ObservableObserver}'s callbacks will
* subsequently not be called at all. The subscription will be terminated and become inactive.
*/
unsubscribe(): void;
}
/** A definition of the ES Observable Observer type that is used to receive data from an
* {@link Observable}.
*
* @remarks
* The Observer in ES Observables is supplied to {@link Observable.subscribe} to receive events from
* an {@link Observable} as it issues them.
*
* @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable
* specification of an Observer.
*/
interface ObservableObserver<T> {
/** Callback for the Observable issuing new values.
* @param value - The value that the {@link Observable} is sending.
*/
next(value: T): void;
error(error: any): void;
complete(): void;
/** Callback for the Observable encountering an error, terminating it.
* @param error - The error that the {@link Observable} has encountered.
*/
error?(error: any): void;
/** Callback for the Observable ending, after all values have been issued. */
complete?(): void;
}
/** A looser definition of ES Observable-like types that is used for interoperability.
* @remarks
* The Observable is often used by multiple libraries supporting or creating streams to provide
* interoperability for push-based streams. When converting from an Observable to a {@link Source},
* this looser type is accepted as an input.
*
* @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
* @see {@link Observable} for the full ES Observable type.
*/
interface ObservableLike<T> {
/**
* Subscribes to new signals from an {@link Observable} via callbacks.
* @param observer - An object containing callbacks for the various events of an Observable.
* @returns Subscription handle of type {@link ObservableSubscription}.
*
* @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
* issue events.
*/
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
/** The well-known symbol specifying the default ES Observable for an object. */
[Symbol.observable]?(): Observable<T>;
}
/** An ES Observable type that is a de-facto standard for push-based data sources across the JS
* ecosystem.
*
* @remarks
* The Observable is often used by multiple libraries supporting or creating streams to provide
* interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in
* functionality to Observables, it provides utilities to cleanly convert to and from Observables.
*
* @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
*/
interface Observable<T> {
/** Subscribes to new signals from an {@link Observable} via callbacks.
* @param observer - An object containing callbacks for the various events of an Observable.
* @returns Subscription handle of type {@link ObservableSubscription}.
*
* @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
* issue events.
*/
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
/** Subscribes to new signals from an {@link Observable} via callbacks.
* @param onNext - Callback for the Observable issuing new values.
* @param onError - Callback for the Observable encountering an error, terminating it.
* @param onComplete - Callback for the Observable ending, after all values have been issued.
* @returns Subscription handle of type {@link ObservableSubscription}.
*/
subscribe(
onNext: (value: T) => any,
onError?: (error: any) => any,
onComplete?: () => any
): ObservableSubscription;
/** The well-known symbol specifying the default ES Observable for an object. */
[Symbol.observable](): Observable<T>;
}
const observableSymbol = (): symbol | string => Symbol.observable || '@@observable';
/** Returns the well-known symbol specifying the default ES Observable.
* @privateRemarks
* This symbol is used to mark an object as a default ES Observable. By the specification, an object
* that abides by the default Observable implementation must carry a method set to this well-known
* symbol that returns the Observable implementation. It's common for this object to be an
* Observable itself and return itself on this method.
*
* @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
* between Observable implementations.
*
* @internal
*/
const observableSymbol = (): typeof Symbol.observable => Symbol.observable || '@@observable';
export function fromObservable<T>(input: Observable<T>): Source<T> {
input = input[observableSymbol()] ? (input as any)[observableSymbol()]() : input;
/** Converts an ES Observable to a {@link Source}.
* @param input - The {@link ObservableLike} object that will be converted.
* @returns A {@link Source} wrapping the passed Observable.
*
* @remarks
* This converts an ES Observable to a {@link Source}. When this Source receives a {@link Sink} and
* the subscription starts, internally, it'll subscribe to the passed Observable, passing through
* all of the Observable's values. As such, this utility provides intercompatibility converting from
* standard Observables to Wonka Sources.
*
* @throws
* When the passed ES Observable throws, the error is simply re-thrown as {@link Source} does
* not support or expect errors to be handled by streams.
*/
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
return sink => {
const subscription = input.subscribe({
const subscription = (
input[observableSymbol()] ? input[observableSymbol()]!() : input
).subscribe({
next(value: T) {

@@ -31,4 +154,4 @@ sink(push(value));

},
error() {
/*noop*/
error(error) {
throw error;
},

@@ -44,5 +167,20 @@ });

/** Converts a {@link Source} to an ES Observable.
* @param source - The {@link Source} that will be converted.
* @returns An {@link Observable} wrapping the passed Source.
*
* @remarks
* This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it
* internally subscribes to the Wonka Source and pulls new values. As such, this utility provides
* intercompatibility converting from Wonka Sources to standard ES Observables.
*/
export function toObservable<T>(source: Source<T>): Observable<T> {
return {
subscribe(observer: ObservableObserver<T>) {
subscribe(
next: ObservableObserver<T> | ((value: T) => any),
error?: (error: any) => any | undefined,
complete?: () => any | undefined
) {
const observer: ObservableObserver<T> =
typeof next == 'object' ? next : { next, error, complete };
let talkback = talkbackPlaceholder;

@@ -55,3 +193,3 @@ let ended = false;

ended = true;
observer.complete();
if (observer.complete) observer.complete();
} else if (signal.tag === SignalKind.Start) {

@@ -58,0 +196,0 @@ (talkback = signal[0])(TalkbackKind.Pull);

@@ -7,2 +7,25 @@ import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types';

/** Buffers values and emits the array of bufferd values each time a `notifier` Source emits.
*
* @param notifier - A {@link Source} that releases the current buffer.
* @returns An {@link Operator}.
*
* @remarks
* `buffer` will buffer values from the input {@link Source}. When the passed `notifier` Source
* emits, it will emit an array of all buffered values.
*
* This can be used to group values over time. A buffer will only be emitted when it contains any
* values.
*
* @example
* ```ts
* pipe(
* interval(50),
* buffer(interval(100)),
* subscribe(x => {
* console.log(text); // logs: [0], [1, 2], [3, 4]...
* })
* );
* ```
*/
export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {

@@ -68,2 +91,26 @@ return source => sink => {

/** Emits in order from the Sources returned by a mapping function per value of the Source.
*
* @param map - A function returning a {@link Source} per value.
* @returns An {@link Operator}.
*
* @remarks
* `concatMap` accepts a mapping function which must return a {@link Source} per value.
* The output {@link Source} will emit values from each Source the function returned, in order,
* queuing sources that aren't yet active.
*
* This can be used to issue multiple values per emission of an input {@link Source}, while keeping
* the order of their outputs consistent.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2]),
* concatMap(x => fromArray([x, x * 2])),
* subscribe(x => {
* console.log(text); // logs: 1, 2, 2, 4
* })
* );
* ```
*/
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {

@@ -149,2 +196,27 @@ return source => sink => {

/** Flattens a Source emitting Sources into a single Source emitting the inner values in order.
*
* @see {@link concatMap} which this helper uses and instead accept a mapping function.
* @param source - An {@link Source} emitting {@link Source | Sources}.
* @returns A {@link Source} emitting values from the inner Sources.
*
* @remarks
* `concatAll` accepts a {@link Source} emitting {@link Source | Sources}.
* The output {@link Source} will emit values from each Source, in order, queuing sources that
* aren't yet active.
*
* @example
* ```ts
* pipe(
* fromArray([
* fromArray([1, 2]),
* fromArray([3, 4]),
* ]),
* concatAll,
* subscribe(x => {
* console.log(text); // logs: 1, 2, 3, 4
* })
* );
* ```
*/
export function concatAll<T>(source: Source<Source<T>>): Source<T> {

@@ -154,2 +226,26 @@ return concatMap<Source<T>, T>(identity)(source);

/** Emits values from the passed sources in order.
*
* @param sources - An array of {@link Source | Sources}.
* @returns A {@link Source} emitting values from the input Sources.
*
* @remarks
* `concat` accepts an array of {@link Source | Sources} and will emit values from them, starting
* with the first one and continuing to the next only when the prior source ended.
*
* This can be used to issue combine sources while keeping the order of their values intact.
*
* @example
* ```ts
* pipe(
* concat([
* fromArray([1, 2]),
* fromArray([3, 4]),
* ]),
* subscribe(x => {
* console.log(text); // logs: 1, 2, 3, 4
* })
* );
* ```
*/
export function concat<T>(sources: Source<T>[]): Source<T> {

@@ -159,2 +255,22 @@ return concatAll(fromArray(sources));

/** Filters out emitted values for which the passed predicate function returns `false`.
*
* @param predicate - A function returning a boolean per value.
* @returns An {@link Operator}.
*
* @remarks
* `filter` will omit values from the {@link Source} for which the passed `predicate` function
* returns `false`.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* filter(x => x % 2 === 0),
* subscribe(x => {
* console.log(text); // logs: 2
* })
* );
* ```
*/
export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {

@@ -178,2 +294,22 @@ return source => sink => {

/** Maps emitted values using the passed mapping function.
*
* @param map - A function returning transforming the {@link Source | Source's} values.
* @returns An {@link Operator}.
*
* @remarks
* `map` accepts a transform function and calls it on each emitted value. It then emits
* the values returned by the transform function instead.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* map(x => x * 2),
* subscribe(x => {
* console.log(text); // logs: 2, 4, 6
* })
* );
* ```
*/
export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {

@@ -190,2 +326,29 @@ return source => sink =>

/** Emits from the Sources returned by a mapping function per value of the Source.
*
* @param map - A function returning a {@link Source} per value.
* @returns An {@link Operator}.
*
* @remarks
* `mergeMap` accepts a mapping function which must return a {@link Source} per value.
* The output {@link Source} will emit values from all {@link Source | Sources} the mapping function
* returned.
*
* This can be used to issue multiple values per emission of an input {@link Source}, essentially
* multiplexing all values to multiple Sources.
*
* @example
* ```ts
* pipe(
* interval(50),
* mergeMap(x => pipe(
* fromValue(x),
* delay(100)
* )),
* subscribe(x => {
* console.log(text); // logs: 0, 1, 2...
* })
* );
* ```
*/
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {

@@ -264,2 +427,26 @@ return source => sink => {

/** Flattens a Source emitting Sources into a single Source emitting the inner values.
*
* @see {@link mergeMap} which this helper uses and instead accept a mapping function.
* @param source - An {@link Source} emitting {@link Source | Sources}.
* @returns A {@link Source} emitting values from the inner Sources.
*
* @remarks
* `mergeAll` accepts a {@link Source} which must emit {@link Source | Sources}. It will subscribe
* to each incoming source immediately and start passing its emitted values through.
*
* @example
* ```ts
* pipe(
* fromArray([
* interval(50),
* interval(100),
* ]),
* mergeAll,
* subscribe(x => {
* console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2
* })
* );
* ```
*/
export function mergeAll<T>(source: Source<Source<T>>): Source<T> {

@@ -269,2 +456,26 @@ return mergeMap<Source<T>, T>(identity)(source);

/** Emits values from the passed sources simultaneously.
*
* @param sources - An array of {@link Source | Sources}.
* @returns A {@link Source} emitting values from the input Sources.
*
* @remarks
* `merge` accepts an array of {@link Source | Sources} and will subscribe to all of them, passing
* through all their emitted values simultaneously.
*
* This can be used to interleave the values of multiple sources.
*
* @example
* ```ts
* pipe(
* merge([
* interval(50),
* interval(100),
* ]),
* subscribe(x => {
* console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2
* })
* );
* ```
*/
export function merge<T>(sources: Source<T>[]): Source<T> {

@@ -274,2 +485,25 @@ return mergeAll(fromArray(sources));

/** Calls the passed callback function when the Source ends or is closed.
*
* @param callback - A function that is called when the {@link Source} ends.
* @returns An {@link Operator}.
*
* @remarks
* `onEnd` accepts a callback which is called when the {@link Source} either ends
* or is closed.
*
* This operator can be used to add side-effects to a Source.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* take(1),
* onEnd(() => {
* console.log('end');
* }),
* publish
* );
* ```
*/
export function onEnd<T>(callback: () => void): Operator<T, T> {

@@ -305,2 +539,24 @@ return source => sink => {

/** Calls the passed callback function when the Source emits a value.
*
* @param callback - A function that is called with each value the {@link Source} emits.
* @returns An {@link Operator}.
*
* @remarks
* `onPush` accepts a callback which is called for every emitted value of
* the {@link Source}.
*
* This operator can be used to add side-effects to a Source.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* onPush(value => {
* console.log(value); // logs: 1, 2, 3
* }),
* publish
* );
* ```
*/
export function onPush<T>(callback: (value: T) => void): Operator<T, T> {

@@ -331,2 +587,26 @@ return source => sink => {

/** Calls the passed callback function when the Source starts.
*
* @param callback - A function that is called when the {@link Source} is started.
* @returns An {@link Operator}.
*
* @remarks
* `onPush` accepts a callback which is called for every emitted value of
* the {@link Source}.
*
* This operator can be used to add side-effects to a Source.
* Specifically, it's useful to add a side-effect for a Source that triggers only once
* the {@link Source} is used and started.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* onStart(() => {
* console.log('start');
* }),
* publish
* );
* ```
*/
export function onStart<T>(callback: () => void): Operator<T, T> {

@@ -346,2 +626,28 @@ return source => sink =>

/** Emits the last value the {@link Source} emitted, whenever the notifier Source emits a value.
*
* @param notifier - A {@link Source} that triggers the last value to be emitted.
* @returns An {@link Operator}.
*
* @remarks
* `sample` will store the latest value the {@link Source} emitted. Every time the `notifier` Source
* emits, it will emit the latest value.
*
* This is a back pressure operator that can be used to omit values from a {@link Source} coming in
* too frequently.
*
* {@link Source | Sources} emitting `undefined` are undefined behaviour and these values will be
* ignored.
*
* @example
* ```ts
* pipe(
* interval(50),
* sample(interval(100)),
* subscribe(x => {
* console.log(text); // logs: 0, 2, 4...
* })
* );
* ```
*/
export function sample<S, T>(notifier: Source<S>): Operator<T, T> {

@@ -405,2 +711,28 @@ return source => sink => {

/** Maps emitted values using the passed reducer function.
*
* @param reducer - A function called with the last value by the `reducer` and the emitted value.
* @param seed - The initial value that is passed to the `reducer`.
* @returns An {@link Operator}.
*
* @remarks
* `scan` accepts a reducer function and a seed value. The reducer will be called initially with the
* seed value and the first emitted value. The {@link Source} will then emit the value returned by
* the reducer function. Subsequently, the `reducer` is called with the last value the `reducer`
* returned and the emitted value.
*
* This operator is similar to `Array.prototype.reduce`, but instead is called over time and emits
* each value of the reducer.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* scan((acc, x) => acc + x, 0),
* subscribe(x => {
* console.log(text); // logs: 1, 3, 6
* })
* );
* ```
*/
export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {

@@ -421,2 +753,12 @@ return source => sink => {

/** Shares one underlying subscription to the Source between all Sinks.
*
* @param source - A {@link Source} that should be shared.
* @returns A shared {@link Source}.
*
* @remarks
* `share` accepts a {@link Source} and returns one. It will emit all values as normal, however, it
* will share one subscription to the input source. This allows side-effects on the input
* {@link Source} to only be triggerd once.
*/
export function share<T>(source: Source<T>): Source<T> {

@@ -456,2 +798,22 @@ let sinks: Sink<T>[] = [];

/** Omits `wait` amount of values from the Source and then runs as usual.
*
* @param wait - The number of values to be omitted.
* @returns An {@link Operator}.
*
* @remarks
* `skip` will skip `wait` number of emitted values, then issue all values as normal afterwards.
* This essentially skips a given number of values on the input {@link Source}.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* skip(2),
* subscribe(x => {
* console.log(text); // logs: 3
* })
* );
* ```
*/
export function skip<T>(wait: number): Operator<T, T> {

@@ -476,2 +838,22 @@ return source => sink => {

/** Omits values from an input Source until a notifier Source emits a value.
*
* @param notifier - A {@link Source} that starts the operator's sent values.
* @returns An {@link Operator}.
*
* @remarks
* `skipUntil` will omit all values from the input {@link Source} until the `notifier`
* Source emits a value of its own. It'll then start passing values from the Source through.
*
* @example
* ```ts
* pipe(
* interval(50),
* skipUntil(interval(150)),
* subscribe(x => {
* console.log(text); // logs: 2, 3...
* })
* );
* ```
*/
export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {

@@ -533,2 +915,23 @@ return source => sink => {

/** Omits values from an input Source until a predicate function returns `false`.
*
* @param predicate - A function returning a boolean per value.
* @returns An {@link Operator}.
*
* @remarks
* `skipWhile` will omit all values from the input {@link Source} until the `predicate`
* function returns `false`. When the `predicate` function returns `false`, the Source's values will
* be passed through.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* skipWhile(x => x < 2),
* subscribe(x => {
* console.log(text); // logs: 2, 3
* })
* );
* ```
*/
export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {

@@ -558,2 +961,27 @@ return source => sink => {

/** Emits from the latest Source returned by a mapping function per value of the Source.
*
* @param map - A function returning a {@link Source} per value.
* @returns An {@link Operator}.
*
* @remarks
* `switchMap` accepts a mapping function which must return a {@link Source} per value.
* The output {@link Source} will emit values from the latest Source the mapping function
* returned. If a value is emitted while the last returned Source is still active, the prior Source
* will be closed.
*
* This can be used to issue multiple values per emission of an input {@link Source}, while only
* letting one of these sub-Sources be active at a time.
*
* @example
* ```ts
* pipe(
* interval(100),
* switchMap(() => interval(50)),
* subscribe(x => {
* console.log(text); // logs: 0, 0, 0...
* })
* );
* ```
*/
export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {

@@ -641,2 +1069,25 @@ return source => sink => {

/** Flattens a Source emitting Sources into a single Source emitting the inner values.
*
* @see {@link switchMap} which this helper uses and instead accept a mapping function.
* @param source - An {@link Source} emitting {@link Source | Sources}.
* @returns A {@link Source} emitting values from the inner Sources.
*
* @remarks
* `switchAll` accepts a {@link Source} which must emit {@link Source | Sources}. Each time it
* receives a {@link Source} it will close its prior subscription and subscribe to the new Source
* instead, passing through its values.
*
* @example
* ```ts
* pipe(
* interval(100),
* map(() => interval(50)),
* switchAll,
* subscribe(x => {
* console.log(text); // logs: 0, 0, 0...
* })
* );
* ```
*/
export function switchAll<T>(source: Source<Source<T>>): Source<T> {

@@ -646,2 +1097,22 @@ return switchMap<Source<T>, T>(identity)(source);

/** Emits `max` values from the Source and then ends.
*
* @param max - The maximum number of values emitted.
* @returns An {@link Operator}.
*
* @remarks
* `take` will issue all values as normal until the `max` number of emitted values has been reached.
* It will then end and close the {@link Source}.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* take(2),
* subscribe(x => {
* console.log(text); // logs: 1, 2
* })
* );
* ```
*/
export function take<T>(max: number): Operator<T, T> {

@@ -690,2 +1161,25 @@ return source => sink => {

/** Buffers the `max` last values of the Source and emits them once the Source ends.
*
* @param max - The maximum number of values buffered.
* @returns An {@link Operator}.
*
* @remarks
* `takeLast` will buffer values from the input {@link Source} up until the given `max` number. It
* will only emit values stored in the buffer once the {@link Source} ends.
*
* All values in the buffer are emitted like the {@link fromArray | `fromArray`} source would
* synchronously.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* takeLast(1),
* subscribe(x => {
* console.log(text); // logs: 3
* })
* );
* ```
*/
export function takeLast<T>(max: number): Operator<T, T> {

@@ -714,2 +1208,22 @@ return source => sink => {

/** Takes values from an input Source until a notifier Source emits a value.
*
* @param notifier - A {@link Source} that stops the operator's sent values.
* @returns An {@link Operator}.
*
* @remarks
* `takeUntil` will issue all values as normal from the input {@link Source} until the `notifier`
* Source emits a value of its own. It'll then close the {@link Source}.
*
* @example
* ```ts
* pipe(
* interval(50),
* takeUntil(interval(150)),
* subscribe(x => {
* console.log(text); // logs: 0, 1
* })
* );
* ```
*/
export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {

@@ -759,2 +1273,23 @@ return source => sink => {

/** Takes values from an input Source until a predicate function returns `false`.
*
* @param predicate - A function returning a boolean per value.
* @returns An {@link Operator}.
*
* @remarks
* `takeWhile` will issue all values as normal from the input {@link Source} until the `predicate`
* function returns `false`. When the `predicate` function returns `false`, the current value is
* omitted and the {@link Source} is closed.
*
* @example
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* takeWhile(x => x < 2),
* subscribe(x => {
* console.log(text); // logs: 1
* })
* );
* ```
*/
export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {

@@ -784,2 +1319,29 @@ return source => sink => {

/** Debounces a Source by omitting values until a given timeframe has passed.
*
* @param timing - A function returning a debounce time (ms) per emitted value.
* @returns An {@link Operator}.
*
* @remarks
* `debounce` accepts a mapping function that can be used to return a time (in ms) per emitted
* value. All emitted values issued by the {@link Source} during the returned time will be omitted
* until the time has passed.
*
* Debouncing means that the returned {@link Source} will wait for a minimum time of silence until a
* value is let through.
*
* This is a back pressure operator that can be used to omit values from a {@link Source} coming in
* too frequently.
*
* @example
* ```ts
* pipe(
* interval(50),
* debounce(() => 100),
* subscribe(x => {
* console.log(text); // never logs any value
* })
* );
* ```
*/
export function debounce<T>(timing: (value: T) => number): Operator<T, T> {

@@ -826,2 +1388,14 @@ return source => sink => {

/** Delays each signal emitted by a Source by given time (ms).
*
* @param wait - A time (in ms) by which each {@link SignalKind | signal} is delayed.
* @returns An {@link Operator}.
*
* @remarks
* `delay` accepts a time (in ms) by which each {@link SignalKind | signal} will be delayed by.
* This will create a timeout per received signal and delay the emitted values accordingly.
*
* Since the operator only calls `setTimeout` per signal, it relies on the timeout implementation to
* be ordered. Otherwise, signals will arrive in the wrong order at the sink.
*/
export function delay<T>(wait: number): Operator<T, T> {

@@ -846,2 +1420,27 @@ return source => sink => {

/** Throttles a Source by omitting values that are emitted before a given timeout.
*
* @param timing - A function returning a throttle time (ms) per emitted value.
* @returns An {@link Operator}.
*
* @remarks
* `throttle` accepts a mapping function that can be used to return a time (in ms) per emitted
* value. During the returned timeframe all values issued by the {@link Source} will be omitted and
* dropped.
*
* This is a back pressure operator that can be used to omit values from a {@link Source} coming in
* too frequently.
*
* @example
* ```ts
* pipe(
* interval(50),
* throttle(() => 100),
* subscribe(x => {
* // omits every second value: 0, 2, 4...
* console.log(text);
* })
* );
* ```
*/
export function throttle<T>(timing: (value: T) => number): Operator<T, T> {

@@ -848,0 +1447,0 @@ return source => sink => {

@@ -1,2 +0,2 @@

import { Source } from './types';
import { Source, Sink, Operator } from './types';

@@ -7,150 +7,175 @@ interface UnaryFn<T, R> {

/* pipe definitions for source + operators composition */
/** Chain calls operators on a given source and returns the last result.
* @param args - A source, then a variable number of transform functions
*
* @remarks
* The `pipe` utility can be called with a {@link Source} then one or more unary transform functions.
* Each transform function will be called in turn with the last function's return value, starting
* with the source passed as the first argument to `pipe`.
*
* It's used to transform a source with a list of {@link Operator | Operators}. The last argument may
* also be a {@link Sink} that returns something else than a Source.
*
* @example
*
* ```ts
* pipe(
* fromArray([1, 2, 3]),
* map(x => x * 2),
* subscribe(console.log)
* );
* ```
*
* @see {@link https://github.com/tc39/proposal-pipeline-operator} for the JS Pipeline Operator spec, for which this is a replacement utility for.
*/
interface pipe {
/* pipe definitions for source + operators composition */
function pipe<T, A>(source: Source<T>, op1: UnaryFn<Source<T>, Source<A>>): Source<A>;
<T, A>(source: Source<T>, op1: UnaryFn<Source<T>, Source<A>>): Source<A>;
function pipe<T, A, B>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>
): Source<B>;
<T, A, B>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>
): Source<B>;
function pipe<T, A, B, C>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>
): Source<C>;
<T, A, B, C>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>
): Source<C>;
function pipe<T, A, B, C, D>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>
): Source<D>;
<T, A, B, C, D>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>
): Source<D>;
function pipe<T, A, B, C, D, E>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>
): Source<E>;
<T, A, B, C, D, E>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>
): Source<E>;
function pipe<T, A, B, C, D, E, F>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>
): Source<F>;
<T, A, B, C, D, E, F>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>
): Source<F>;
function pipe<T, A, B, C, D, E, F, G>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
op7: UnaryFn<Source<F>, Source<G>>
): Source<G>;
<T, A, B, C, D, E, F, G>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
op7: UnaryFn<Source<F>, Source<G>>
): Source<G>;
function pipe<T, A, B, C, D, E, F, G, H>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
op7: UnaryFn<Source<F>, Source<G>>,
op8: UnaryFn<Source<G>, Source<H>>
): Source<H>;
<T, A, B, C, D, E, F, G, H>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
op7: UnaryFn<Source<F>, Source<G>>,
op8: UnaryFn<Source<G>, Source<H>>
): Source<H>;
/* pipe definitions for source + operators + consumer composition */
/* pipe definitions for source + operators + consumer composition */
function pipe<T, R>(source: Source<T>, consumer: UnaryFn<Source<T>, R>): R;
<T, R>(source: Source<T>, consumer: UnaryFn<Source<T>, R>): R;
function pipe<T, A, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
consumer: UnaryFn<Source<A>, R>
): R;
<T, A, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
consumer: UnaryFn<Source<A>, R>
): R;
function pipe<T, A, B, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
consumer: UnaryFn<Source<B>, R>
): R;
<T, A, B, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
consumer: UnaryFn<Source<B>, R>
): R;
function pipe<T, A, B, C, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
consumer: UnaryFn<Source<C>, R>
): R;
<T, A, B, C, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
consumer: UnaryFn<Source<C>, R>
): R;
function pipe<T, A, B, C, D, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
consumer: UnaryFn<Source<D>, R>
): R;
<T, A, B, C, D, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
consumer: UnaryFn<Source<D>, R>
): R;
function pipe<T, A, B, C, D, E, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
consumer: UnaryFn<Source<E>, R>
): R;
<T, A, B, C, D, E, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
consumer: UnaryFn<Source<E>, R>
): R;
function pipe<T, A, B, C, D, E, F, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
consumer: UnaryFn<Source<F>, R>
): R;
<T, A, B, C, D, E, F, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
consumer: UnaryFn<Source<F>, R>
): R;
function pipe<T, A, B, C, D, E, F, G, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
op7: UnaryFn<Source<F>, Source<G>>,
consumer: UnaryFn<Source<G>, R>
): R;
<T, A, B, C, D, E, F, G, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
op7: UnaryFn<Source<F>, Source<G>>,
consumer: UnaryFn<Source<G>, R>
): R;
function pipe<T, A, B, C, D, E, F, G, H, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
op7: UnaryFn<Source<F>, Source<G>>,
op8: UnaryFn<Source<G>, Source<H>>,
consumer: UnaryFn<Source<H>, R>
): R;
<T, A, B, C, D, E, F, G, H, R>(
source: Source<T>,
op1: UnaryFn<Source<T>, Source<A>>,
op2: UnaryFn<Source<A>, Source<B>>,
op3: UnaryFn<Source<B>, Source<C>>,
op4: UnaryFn<Source<C>, Source<D>>,
op5: UnaryFn<Source<D>, Source<E>>,
op6: UnaryFn<Source<E>, Source<F>>,
op7: UnaryFn<Source<F>, Source<G>>,
op8: UnaryFn<Source<G>, Source<H>>,
consumer: UnaryFn<Source<H>, R>
): R;
}
function pipe(...args: any[]) {
function pipe(...args: Function[]): any {
let x = args[0];

@@ -157,0 +182,0 @@ for (let i = 1, l = args.length; i < l; i++) x = args[i](x);

import { Source, Subscription, TalkbackKind, SignalKind } from './types';
import { talkbackPlaceholder } from './helpers';
/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
* @param subscriber - A callback function called for each issued value.
* @returns A function accepting a {@link Source} and returning a {@link Subscription}.
*
* @remarks
* `subscribe` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
* When a source is passed to the returned funtion, the subscription will start and `subscriber`
* will be called for each new value the Source issues. This will also return a {@link Subscription}
* object that can cancel the ongoing {@link Source} early.
*
* @example
* ```ts
* const subscription = pipe(
* fromValue('test'),
* subscribe(text => {
* console.log(text); // 'test'
* })
* );
* ```
*/
export function subscribe<T>(subscriber: (value: T) => void) {

@@ -29,2 +49,23 @@ return (source: Source<T>): Subscription => {

/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
* @see {@link subscribe} which this helper aliases without returnin a {@link Subscription}.
* @param subscriber - A callback function called for each issued value.
* @returns A function accepting a {@link Source}.
*
* @remarks
* `forEach` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
* When a source is passed to the returned funtion, the subscription will start and `subscriber`
* will be called for each new value the Source issues. Unlike `subscribe` it will not return a
* Subscription object and can't be cancelled early.
*
* @example
* ```ts
* pipe(
* fromValue('test'),
* forEach(text => {
* console.log(text); // 'test'
* })
* ); // undefined
* ```
*/
export function forEach<T>(subscriber: (value: T) => void) {

@@ -36,2 +77,24 @@ return (source: Source<T>): void => {

/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
* @see {@link subscribe} which this helper aliases without accepting parameters or returning a
* {@link Subscription | Subscription}.
*
* @param source - A {@link Source}.
*
* @remarks
* `publish` accepts a {@link Source} and subscribes to it, starting its values. The resulting
* values cannot be observed and the subscription can't be cancelled, as this helper is purely
* intended to start side-effects.
*
* @example
* ```ts
* pipe(
* lazy(() => {
* console.log('test'); // this is called
* return fromValue(123); // this is never used
* }),
* publish
* ); // undefined
* ```
*/
export function publish<T>(source: Source<T>): void {

@@ -43,2 +106,87 @@ subscribe(_value => {

const doneResult = { done: true } as IteratorReturnResult<void>;
/** Converts a Source to an AsyncIterable that pulls and issues values from the Source.
*
* @param source - A {@link Source}.
* @returns An {@link AsyncIterable | `AsyncIterable`} issuing values from the Source.
*
* @remarks
* `toAsyncIterable` will create an {@link AsyncIterable} that pulls and issues values from a given
* {@link Source}. This can be used in many interoperability situations, to provide an iterable when
* a consumer requires it.
*
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
* for the JS Iterable protocol.
*
* @example
* ```ts
* const iterable = toAsyncIterable(fromArray([1, 2, 3]));
* for await (const value of iterable) {
* console.log(value); // outputs: 1, 2, 3
* }
* ```
*/
export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
[Symbol.asyncIterator](): AsyncIterator<T> {
const buffer: T[] = [];
let ended = false;
let talkback = talkbackPlaceholder;
let next: ((value: IteratorResult<T>) => void) | void;
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
if (next) next = next(doneResult);
ended = true;
} else if (signal.tag === SignalKind.Start) {
(talkback = signal[0])(TalkbackKind.Pull);
} else if (next) {
next = next({ value: signal[0], done: false });
} else {
buffer.push(signal[0]);
}
});
return {
async next(): Promise<IteratorResult<T>> {
if (ended && !buffer.length) {
return doneResult;
} else if (!ended && buffer.length <= 1) {
talkback(TalkbackKind.Pull);
}
return buffer.length
? { value: buffer.shift()!, done: false }
: new Promise(resolve => (next = resolve));
},
async return(): Promise<IteratorReturnResult<void>> {
if (!ended) next = talkback(TalkbackKind.Close);
ended = true;
return doneResult;
},
};
},
});
/** Subscribes to a given source and collects all synchronous values into an array.
* @param source - A {@link Source}.
* @returns An array of values collected from the {@link Source}.
*
* @remarks
* `toArray` accepts a {@link Source} and returns an array of all synchronously issued values from
* this Source. It will issue {@link TalkbackKind.Pull | Pull signals} after every value it receives
* and expects the Source to recursively issue values.
*
* Any asynchronously issued values will not be
* added to the array and a {@link TalkbackKind.Close | Close signal} is issued by the sink before
* returning the array.
*
* @example
* ```ts
* toArray(fromArray([1, 2, 3])); // [1, 2, 3]
* ```
*/
export function toArray<T>(source: Source<T>): T[] {

@@ -62,2 +210,22 @@ const values: T[] = [];

/** Subscribes to a given source and returns a Promise that will resolve with the last value the
* source issues.
*
* @param source - A {@link Source}.
* @returns A {@link Promise} resolving to the last value of the {@link Source}.
*
* @remarks
* `toPromise` will subscribe to the passed {@link Source} and resolve to the last value of it once
* it receives the last value, as signaled by the {@link SignalKind.End | End signal}.
*
* To keep its implementation simple, padding sources that don't issue any values to `toPromise` is
* undefined behaviour and `toPromise` will issue `undefined` in that case.
*
* The returned {@link Promise} delays its value by a microtick, using `Promise.resolve`.
*
* @example
* ```ts
* toPromise(fromValue('test')); // resolves: 'test'
* ```
*/
export function toPromise<T>(source: Source<T>): Promise<T> {

@@ -69,3 +237,3 @@ return new Promise(resolve => {

if (signal === SignalKind.End) {
resolve(value!);
Promise.resolve(value!).then(resolve);
} else if (signal.tag === SignalKind.Start) {

@@ -72,0 +240,0 @@ (talkback = signal[0])(TalkbackKind.Pull);

@@ -5,6 +5,45 @@ import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';

export function lazy<T>(make: () => Source<T>): Source<T> {
return sink => make()(sink);
/** Helper creating a Source from a factory function when it's subscribed to.
* @param produce - A factory function returning a {@link Source}.
* @returns A {@link Source} lazyily subscribing to the Source returned by the given factory
* function.
*
* @remarks
* At times it's necessary to create a {@link Source} lazily. The time of a {@link Source} being
* created could be different from when it's subscribed to, and hence we may want to split the
* creation and subscription time. This is especially useful when the Source we wrap is "hot" and
* issues values as soon as it's created, which we may then not receive in a subscriber.
*
* @example An example of creating a {@link Source} that issues the timestamp of subscription. Here
* we effectively use `lazy` with the simple {@link fromValue | `fromValue`} source, to quickly
* create a Source that issues the time of its subscription, rather than the time of its creation
* that it would otherwise issue without `lazy`.
*
* ```ts
* lazy(() => fromValue(Date.now()));
* ```
*/
export function lazy<T>(produce: () => Source<T>): Source<T> {
return sink => produce()(sink);
}
/** Converts an AsyncIterable to a Source that pulls and issues values from it as requested.
*
* @see {@link fromIterable | `fromIterable`} for the non-async Iterable version of this helper,
* which calls this helper automatically as needed.
*
* @param iterable - An {@link AsyncIterable | `AsyncIterable`}.
* @returns A {@link Source} issuing values sourced from the Iterable.
*
* @remarks
* `fromAsyncIterable` will create a {@link Source} that pulls and issues values from a given
* {@link AsyncIterable}. This can be used in many interoperability situations, including to consume
* an async generator function.
*
* When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
* using {@link AsyncIterator.throw}, which allows an async generator to recover from the exception.
*
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
* for the JS Iterable protocol.
*/
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {

@@ -50,2 +89,26 @@ return sink => {

/** Converts an Iterable to a Source that pulls and issues values from it as requested.
* @see {@link fromAsyncIterable | `fromAsyncIterable`} for the AsyncIterable version of this helper.
* @param iterable - An {@link Iterable | `Iterable`} or an `AsyncIterable`
* @returns A {@link Source} issuing values sourced from the Iterable.
*
* @remarks
* `fromIterable` will create a {@link Source} that pulls and issues values from a given
* {@link Iterable | JS Iterable}. As iterables are the common standard for any lazily iterated list
* of values in JS it can be applied to many different JS data types, including a JS Generator
* function.
*
* This Source will only call {@link Iterator.next} on the iterator when the subscribing {@link Sink}
* has pulled a new value with the {@link TalkbackKind.Pull | Pull signal}. `fromIterable` can
* therefore also be applied to "infinite" iterables, without a predefined end.
*
* This helper will call {@link fromAsyncIterable | `fromAsyncIterable`} automatically when the
* passed object also implements the async iterator protocol.
*
* When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
* using {@link Iterator.throw}, which allows a generator to recover from the exception.
*
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol}
* for the JS Iterable protocol.
*/
export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {

@@ -92,4 +155,28 @@ if (iterable[Symbol.asyncIterator]) return fromAsyncIterable(iterable as AsyncIterable<T>);

/** Creates a Source that issues a each value of a given array synchronously.
* @see {@link fromIterable} which `fromArray` aliases.
* @param array - The array whose values will be issued one by one.
* @returns A {@link Source} issuing the array's values.
*
* @remarks
* `fromArray` will create a {@link Source} that issues the values of a given JS array one by one. It
* will issue values as they're pulled and is hence a "cold" source, not eagerly emitting values. It
* will end and issue the {@link SignalKind.End | End signal} when the array is exhausted of values.
*
* @example
* ```ts
* fromArray([1, 2, 3]);
* ```
*/
export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;
/** Creates a Source that issues a single value and ends immediately after.
* @param value - The value that will be issued.
* @returns A {@link Source} issuing the single value.
*
* @example
* ```ts
* fromValue('test');
* ```
*/
export function fromValue<T>(value: T): Source<T> {

@@ -112,6 +199,35 @@ return sink => {

export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> {
/** Creates a new Source from scratch from a passed `subscriber` function.
* @param subscriber - A callback that is called when the {@link Source} is subscribed to.
* @returns A {@link Source} created from the `subscriber` parameter.
*
* @remarks
* `make` is used to create a new, arbitrary {@link Source} from scratch. It calls the passed
* `subscriber` function when it's subscribed to.
*
* The `subscriber` function receives an {@link Observer}. You may call {@link Observer.next} to
* issue values on the Source, and {@link Observer.complete} to end the Source.
*
* Your `subscribr` function must return a {@link TeardownFn | teardown function} which is only
* called when your source is cancelled — not when you invoke `complete` yourself. As this creates a
* "cold" source, every time this source is subscribed to, it will invoke the `subscriber` function
* again and create a new source.
*
* @example
*
* ```ts
* make(observer => {
* const frame = requestAnimationFrame(() => {
* observer.next('animate!');
* });
* return () => {
* cancelAnimationFrame(frame);
* };
* });
* ```
*/
export function make<T>(subscriber: (observer: Observer<T>) => TeardownFn): Source<T> {
return sink => {
let ended = false;
const teardown = produce({
const teardown = subscriber({
next(value: T) {

@@ -138,2 +254,19 @@ if (!ended) sink(push(value));

/** Creates a new Subject which can be used as an IO event hub.
* @returns A new {@link Subject}.
*
* @remarks
* `makeSubject` creates a new {@link Subject}. A Subject is a {@link Source} and an {@link Observer}
* combined in one interface, as the Observer is used to send new signals to the Source. This means
* that it's "hot" and hence all subscriptions to {@link Subject.source} share the same underlying
* signals coming from {@link Subject.next} and {@link Subject.complete}.
*
* @example
* ```ts
* const subject = makeSubject();
* pipe(subject.source, subscribe(console.log));
* // This will log the string on the above subscription
* subject.next('hello subject!');
* ```
*/
export function makeSubject<T>(): Subject<T> {

@@ -159,2 +292,9 @@ let next: Subject<T>['next'] | void;

/** A {@link Source} that immediately ends.
* @remarks
* `empty` is a {@link Source} that immediately issues an {@link SignalKind.End | End signal} when
* it's subscribed to, ending immediately.
*
* @see {@link never | `never`} for a source that instead never ends.
*/
export const empty: Source<any> = (sink: Sink<any>): void => {

@@ -174,2 +314,8 @@ let ended = false;

/** A {@link Source} without values that never ends.
* @remarks
* `never` is a {@link Source} that never issues any signals and neither sends values nor ends.
*
* @see {@link empty | `empty`} for a source that instead ends immediately.
*/
export const never: Source<any> = (sink: Sink<any>): void => {

@@ -179,2 +325,19 @@ sink(start(talkbackPlaceholder));

/** Creates a Source that issues an incrementing integer in intervals.
* @param ms - The interval in milliseconds.
* @returns A {@link Source} issuing an incrementing count on each interval.
*
* @remarks
* `interval` will create a {@link Source} that issues an incrementing counter each time the `ms`
* interval expires.
*
* It'll only stop when it's cancelled by a {@link TalkbackKind.Close | Close signal}.
*
* @example
* An example printing `0`, then `1`, and so on, in intervals of 50ms.
*
* ```ts
* pipe(interval(50), subscribe(console.log));
* ```
*/
export function interval(ms: number): Source<number> {

@@ -188,2 +351,23 @@ return make(observer => {

/** Converts DOM Events to a Source given an `HTMLElement` and an event's name.
* @param element - The {@link HTMLElement} to listen to.
* @param event - The DOM Event name to listen to.
* @returns A {@link Source} issuing the {@link Event | DOM Events} as they're issued by the DOM.
*
* @remarks
* `fromDomEvent` will create a {@link Source} that listens to the given element's events and issues
* them as values on the source. This source will only stop when it's cancelled by a
* {@link TalkbackKind.Close | Close signal}.
*
* @example
* An example printing `'clicked!'` when the given `#root` element is clicked.
*
* ```ts
* const element = document.getElementById('root');
* pipe(
* fromDomEvent(element, 'click'),
* subscribe(() => console.log('clicked!'))
* );
* ```
*/
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {

@@ -196,2 +380,20 @@ return make(observer => {

/** Converts a Promise to a Source that issues the resolving Promise's value and then ends.
* @param promise - The promise that will be wrapped.
* @returns A {@link Source} issuing the promise's value when it resolves.
*
* @remarks
* `fromPromise` will create a {@link Source} that issues the {@link Promise}'s resolving value
* asynchronously and ends immediately after resolving.
*
* This helper will not handle the promise's exceptions, and will cause uncaught errors if the
* promise rejects without a value.
*
* @example
* An example printing `'resolved!'` when the given promise resolves after a tick.
*
* ```ts
* pipe(fromPromise(Promise.resolve('resolved!')), subscribe(console.log));
* ```
*/
export function fromPromise<T>(promise: Promise<T>): Source<T> {

@@ -198,0 +400,0 @@ return make(observer => {

@@ -1,17 +0,78 @@

/** A talkback signal is used to tell a [Source] that either the [Sink] is ready for new values or that the stream should be cancelled */
/**
* Talkback signal that sends instructions from a sink to a source.
*
* @remarks
* This signal is issued via {@link TalkbackFn | talkback functions} that a {@link Sink} receives via
* the {@link Start} signal, to tell a {@link Source} to either send a new value (pulling) or stop
* sending values altogether (cancellation).
*/
export const enum TalkbackKind {
/** Instructs the {@link Source} to send the next value. */
Pull = 0,
/** Instructs the {@link Source} to stop sending values and cancels it. */
Close = 1,
}
/** A talkback callback is sent to the sink with the [Start] signal to communicate signals back to the source. */
/**
* Talkback callback that sends instructions to a source.
*
* @remarks
* This function sends a {@link TalkbackKind} signal to the source to instruct it to send a new value
* (pulling) or to be cancelled and stop sending values altogether.
*/
export type TalkbackFn = (signal: TalkbackKind) => void;
/**
* Callback that is called when a source is cancelled.
*
* @remarks
* This is used, in particular, in the {@link make | make Source} and is a returned function that is
* called when the {@link TalkbackKind.Close} signal is received by the source.
*/
export type TeardownFn = () => void;
/**
* Tag enum that is used to on signals that are sent from a source to a sink.
*
* @remarks
* This signal is issued by a {@link Source} and {@link Sink | Sinks} are called with it. The signals
* carrying values ({@link Start} and {@link Push}) are sent as a unary `[T]` tuple tagged with
* {@link Tag}. The {@link End} signal carries no value and is sent as a raw `0` value.
* @see {@link Start} for the data structure of the start signal.
* @see {@link Push} for the data structure of the push signal, carrying values.
*/
export const enum SignalKind {
/**
* Informs the {@link Sink} that it's being called by a {@link Source}.
*
* @remarks
* This starts the stream of values and carries a {@link TalkbackFn | talkback function} with it
* that is used by the {@link Sink} to communicate back to the {@link Source}.
* @see {@link Start} for the data structure of the signal.
*/
Start = 0,
/**
* Informs the {@link Sink} of a new values that's incoming from the {@link Source}.
*
* @remarks
* This informs the {@link Sink} of new values that are sent by the {@link Source}.
* @see {@link Push} for the data structure of the signal.
*/
Push = 1,
/**
* Informs the {@link Sink} that the {@link Source} has ended and that it won't send more values.
*
* @remarks
* This signal signifies that the stream has stopped and that no more values are expected. Some
* sources don't have a set end or limit on how many values will be sent. This signal is not sent
* when the {@link Source} is cancelled with a {@link TalkbackKind.Close | Close talkback signal}.
*/
End = 0,
}
/**
* The tag property that's put on unary `[T]` tuple to turn them into signals carrying values.
*
* @internal
*/
export interface Tag<T> {

@@ -21,31 +82,122 @@ tag: T;

/** The start [Signal] is the first signal and carries a callback (talkback) so the sink can send signals to the source */
/**
* Indicates the start of a stream to a {@link Sink}.
*
* @remarks
* This signal is sent from a {@link Source} to a {@link Sink} at the start of a stream to inform it
* that values can be pulled and/or will be sent. This signal carries a
* {@link TalkbackFn | talkback function} that is used by the {@link Sink} to communicate back to the
* {@link Source} as a callback. The talkback accepts {@link TalkbackKind.Pull | Pull} and
* {@link TalkbackKind.Close | Close} signals.
*/
export type Start<_T> = Tag<SignalKind.Start> & [TalkbackFn];
/** The Push [Signal] carries new values to the sink, like in an event emitter */
/**
* Sends a new value to a {@link Sink}.
*
* @remarks
* This signal is sent from a {@link Source} to a {@link Sink} to send a new value to it. This is
* essentially the signal that wraps new values coming in, like an event. Values are carried on
* unary tuples and can be accessed using `signal[0]`.
*/
export type Push<T> = Tag<SignalKind.Push> & [T];
/** A signal that communicates new events to a sink. */
/**
* Signals are sent from {@link Source | Sources} to {@link Sink | Sinks} to inform them of changes.
*
* @remarks
* A {@link Source}, when consumed, sends a sequence of events to {@link Sink | Sinks}. In order, a
* {@link SignalKind.Start | Start} signal will always be sent first, followed optionally by one or
* more {@link SignalKind.Push | Push signals}, carrying values and representing the stream. A
* {@link Source} will send the {@link SignalKind.End | End signal} when it runs out of values. The
* End signal will be omitted if the Source is cancelled by a
* {@link TalkbackKind.Close | Close signal}, sent back from the {@link Sink}.
* @see {@link SignalKind} for the kinds signals sent by {@link Source | Sources}.
* @see {@link Start} for the data structure of the start signal.
* @see {@link Push} for the data structure of the push signal.
*/
export type Signal<T> = Start<T> | Push<T> | SignalKind.End;
/** A sink accepts new values from a [Source], like [Push], [Start], and an end signal. The [Start] is used to receive a callback to send talkback signals back to the source. */
/**
* Callback function that is called by a {@link Source} with {@link Signal | Signals}.
*
* @remarks
* A Sink is a function that is called repeatedly with signals from a {@link Source}. It represents
* the receiver of the stream of signals/events coming from a {@link Source}.
* @see {@link Signal} for the data structure of signals.
*/
export type Sink<T> = (signal: Signal<T>) => void;
/** A source is a function that accepts a [Sink] and then starts sending [Signal]s to it. */
/** Factory function that calls {@link Sink | Sinks} with {@link Signal | Signals} when invoked.
* @remarks
* A Source is a factory function that when invoked with a {@link Sink}, calls it with
* {@link Signal | Signals} to create a stream of events, informing it of new values and the
* potential end of the stream of values. The first signal a Source sends is always a
* {@link Start | Start signal} that sends a talkback function to the {@link Sink}, so it may request
* new values or cancel the source.
*
* @see {@link Signal} for the data structure of signals.
* @see {@link Sink} for the data structure of sinks.
*/
export type Source<T> = (sink: Sink<T>) => void;
/** An operator transforms a [Source] and returns a new [Source], potentially with different timings or output types. */
/** Transform function that accepts a {@link Source} and returns a new one.
* @remarks
* Wonka comes with several helper operators that transform a given {@link Source} into a new one,
* potentially changing its outputs, or the outputs' timing. An "operator" in Wonka typically
* accepts arguments and then returns this kind of function, so they can be chained and composed.
*
* @see {@link pipe | `pipe`} for the helper used to compose operators.
*/
export type Operator<In, Out> = (a: Source<In>) => Source<Out>;
/** Extracts the type of a given Source */
/** Type utility to determine the type of a {@link Source}. */
export type TypeOfSource<T> = T extends Source<infer U> ? U : never;
/** Subscription object that can be used to cancel a {@link Source}.
* @see {@link subscribe | subscribe sink} for a helper that returns this structure.
*/
export interface Subscription {
/**
* Cancels a {@link Source} to stop the subscription from receiving new values.
*
* @see {@link TalkbackKind.Close | Close signal} This uses the {@link TalkbackFn | talkback function} to send a {@link TalkbackKind.Close | Close signal}
* to the subscribed-to {@link Source} to stop it from sending new values. This cleans up the subscription
* and ends it immediately.
*/
unsubscribe(): void;
}
/** An Observer represents sending signals manually to a {@link Sink}.
* @remarks
* The Observer is used whenever a utility allows for signals to be sent manually as a {@link Source}
* would send them.
*
* @see {@link make | `make` source} for a helper that uses this structure.
*/
export interface Observer<T> {
/** Sends a new value to the receiving Sink.
* @remarks
* This creates a {@link Push | Push signal} that is sent to a {@link Sink}.
*/
next(value: T): void;
/** Indicates to the receiving Sink that no more values will be sent.
* @remarks
* This creates an {@link SignalKind.End | End signal} that is sent to a {@link Sink}. The Observer
* will accept no more values via {@link Observer.next | `next` calls} once this method has been
* invoked.
*/
complete(): void;
}
/** Subjects combine a {@link Source} with the {@link Observer} that is used to send values on said Source.
* @remarks
* A Subject is used whenever an event hub-like structure is needed, as it both provides the
* {@link Observer}'s methods to send signals, as well as the `source` to receive said signals.
*
* @see {@link makeSubject | `makeSubject` source} for a helper that creates this structure.
*/
export interface Subject<T> extends Observer<T> {
/** The {@link Source} that issues the signals as the {@link Observer} methods are called. */
source: Source<T>;
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc