[!NOTE]
This is one of 199 standalone projects, maintained as part
of the @thi.ng/umbrella monorepo
and anti-framework.
🚀 Please help me to work full-time on these projects by sponsoring me on
GitHub. Thank you! ❤️
About
Reactive streams & subscription primitives for constructing dataflow graphs / pipelines.
This library provides & uses three key building blocks for reactive programming:
- Stream sources: event targets, iterables, timers, promises,watches,
workers, manual-push...
- Subscriptions: chained stream processors, each subscribable (one-tmany)
itself
- Transducers: stream transformers, either as individual subscription or to
transform incoming values for a single subscription. See packages/transducers)
for 100+ composable operators.
- Recursive teardown: Whenever possible, and depending on configuration,
unsubscriptions initiate cleanup and propagate to parent(s).
- Workers: highly configurable, web worker integration for concurrent /
parallel stream processing (fork-join, tunneled stream processing, etc.)
Conceptual differences to RxJS
(No value judgments implied - there's room for both approaches!)
- Streams are not the same as Observables: I.e. stream sources are NOT (often
just cannot) re-run for each new sub added. Only the first sub is guaranteed
to receive all values. Subs added at a later time MIGHT not receive
earlier emitted values, but only the most recent emitted and any future values
- Every subscription supports any number of subscribers, which can be
added/removed at any time
- Depending on configuration options, every unsubscription recursively triggers
upstream unsubscriptions (provided a parent has no other active child
subscriptions)
- Every subscription can have its own transducer transforming incoming values
(possibly into multiple new ones)
- Transducers can create streams themselves (only for
merge()
/sync()
) - Transducers can cause early stream termination and subsequent unwinding for
its parent and downstream subscriptions.
- Values can be manually injected into the stream pipeline / graph at any point
- Unhandled errors in a subscription will move the subscription into an error
state and cause unsubscription from parent (if any). Unhandled errors in
stream sources will cancel the stream.
- Much smaller API surface, since most common & custom operations can be
solved via available transducers. Therefore there's less of a need to provide
specialized functions (map / filter etc.) and gain more flexibility in terms
of composing new operations.
- IMHO less confusing naming / terminology (only streams (producers) &
subscriptions (consumers))
Status
STABLE - used in production
Search or submit any issues for this package
New features & breaking changes in 9.0.0
The CloseMode
enum has been replaced with a more compact & simple string union
type, see docs and
usage.
New features & breaking changes in 6.0.0
Completely revised & improved error handling, stronger
distinction between .subscribe()
and .transform()
methods & internal
simplification of their implementations.
-
All error handlers now MUST return a boolean to indicate if the error was
recoverable from or should put the subscription into the error state. See
error handling for details.
-
The options given to .transform()
and .map()
can now include an error
handler:
src.transform(xf1, xf2,..., { error: (e) => { ... } });
src.transform({ xform: map(...), error: handleError });
- The
.subscribe(sub, xform, opts)
signature has been removed and the xform
(transducer) must now be given as part of the options object:
import { reactive, trace } from "@thi.ng/rstream";
import { filter } from "@thi.ng/transducers";
const src = reactive(1);
src.subscribe(trace("foo"), filter((x) => x < 10), { id: "child-sub" });
src.subscribe(trace("foo"), { xform: filter((x) => x < 10), id: "child-sub" });
- Added generics for PubSub topics, added
.transformTopic()
and updated signatures for .subscribeTopic()
, both in
similarity to above.
import { pubsub } from "@thi.ng/rstream";
import { map } from "@thi.ng/transducers";
type Event = { id: string; value: any; };
const src = pubsub<Event>({ topic: (e) => e.id });
src.transformTopic("foo", map((e) => e.value), { error: handleError })
Notes:
- (1): If using multiple transducers, they must be pre-composed with
comp()
.
Other signatures of .transform()
method support up to 4 transducers and
composes them automatically.
Support packages
Related packages
- @thi.ng/atom - Mutable wrappers for nested immutable values with optional undo/redo history and transaction support
- @thi.ng/hdom - Lightweight vanilla ES6 UI component trees with customizable branch-local behaviors
- @thi.ng/rdom - Lightweight, reactive, VDOM-less UI/DOM components with async lifecycle and @thi.ng/hiccup compatible
- @thi.ng/transducers - Collection of ~170 lightweight, composable transducers, reducers, generators, iterators for functional data transformations
Installation
yarn add @thi.ng/rstream
ESM import:
import * as rs from "@thi.ng/rstream";
Browser ESM import:
<script type="module" src="https://esm.run/@thi.ng/rstream"></script>
JSDelivr documentation
For Node.js REPL:
const rs = await import("@thi.ng/rstream");
Package sizes (brotli'd, pre-treeshake): ESM: 6.28 KB
Dependencies
Note: @thi.ng/api is in most cases a type-only import (not used at runtime)
Usage examples
55 projects in this repo's
/examples
directory are using this package:
Screenshot | Description | Live demo | Source |
---|
| Interactive image processing (adaptive threshold) | Demo | Source |
| Large ASCII font text generator using @thi.ng/rdom | Demo | Source |
| Figlet-style bitmap font creation with transducers | Demo | Source |
| Interactive & reactive image blurhash generator | Demo | Source |
| Canvas based dial widget | Demo | Source |
| Self-modifying, animated typographic grid with emergent complex patterns | Demo | Source |
| Probabilistic color theme generator | Demo | Source |
| Basic crypto-currency candle chart with multiple moving averages plots | Demo | Source |
| Color palette generation via dominant color extraction from uploaded images | Demo | Source |
| Interactive inverse FFT toy synth | Demo | Source |
| Mouse gesture / stroke analysis, simplification, corner detection | Demo | Source |
| Interactive pattern drawing demo using transducers | Demo | Source |
| Various hdom-canvas shape drawing examples & SVG conversion / export | Demo | Source |
| Canvas based Immediate Mode GUI components | Demo | Source |
| Browser REPL for a Lispy S-expression based mini language | Demo | Source |
| Worker based, interactive Mandelbrot visualization | Demo | Source |
| Markdown to Hiccup to HTML parser / transformer | Demo | Source |
| Mastodon API feed reader with support for different media types, fullscreen media modal, HTML rewriting | Demo | Source |
| Basic rstream-gestures multi-touch demo | Demo | Source |
| Parser grammar livecoding editor/playground & codegen | Demo | Source |
| Interactive pixel sorting tool using thi.ng/color & thi.ng/pixel | Demo | Source |
| RGB waveform image analysis | Demo | Source |
| Live coding playground for 2D geometry generation using @thi.ng/pointfree-lang | Demo | Source |
| Procedural stochastic text generation via custom DSL, parse grammar & AST transformation | Demo | Source |
| Scroll-based, reactive, multi-param CSS animation basics | Demo | Source |
| Demonstates various rdom usage patterns | Demo | Source |
| Minimal rdom-canvas animation | Demo | Source |
| Dynamically loaded images w/ preloader state | Demo | Source |
| Basic usage of the declarative rdom-forms generator | Demo | Source |
| rstream & transducer-based FSM for converting key event sequences into high-level commands | Demo | Source |
| Basic usage of thi.ng/rdom keyed list component wrapper | Demo | Source |
| rdom & hiccup-canvas interop test | Demo | Source |
| Full umbrella repo doc string search w/ paginated results | Demo | Source |
| rdom powered SVG graph with draggable nodes | Demo | Source |
| Responsive image gallery with tag-based Jaccard similarity ranking | Demo | Source |
| Generative audio synth offline renderer and WAV file export | Demo | Source |
| Animated Voronoi diagram, cubic splines & SVG download | Demo | Source |
| Minimal demo of using rstream constructs to form an interceptor-style event loop | Demo | Source |
| Interactive grid generator, SVG generation & export, undo/redo support | Demo | Source |
| rstream based UI updates & state handling | Demo | Source |
| rstream based spreadsheet w/ S-expression formula DSL | Demo | Source |
| Minimal rstream sync() example using rdom | Demo | Source |
| Declarative component-based system with central rstream-based pubsub event bus | Demo | Source |
| Fork-join worker-based raymarch renderer (JS/CPU only) | Demo | Source |
| Fitting, transforming & plotting 10k data points per frame using SIMD | Demo | Source |
| Responsive & reactively computed stacked column layout | Demo | Source |
| SVG path parsing & dynamic resampling | Demo | Source |
| hdom based slide deck viewer & slides from my ClojureX 2018 keynote | Demo | Source |
| thi.ng/rdom & thi.ng/rstream based quiz to guess thi.ng package names | Demo | Source |
| Multi-layer vectorization & dithering of bitmap images | Demo | Source |
| Transducer & rstream based hdom UI updates | Demo | Source |
| Interactive ridge-line plot | Demo | Source |
| Interactive scatter & line plot of low-discrepancy samples | Demo | Source |
| rdom & WebGL-based image channel editor | Demo | Source |
| WebGL cube maps with async texture loading | Demo | Source |
API
Generated API docs
Common configuration options
Since version 3.0.0 all stream and subscription factory functions take an
optional object of common configuration
options with
at least these keys (each optional):
interface CommonOpts {
id: string;
closeIn: CloseMode;
closeOut: CloseMode;
cache: boolean;
}
Stream creation
Stream
Docs: stream()
Creates a new Stream
instance, optionally with given StreamSource
function
and / or ID. If a src
function is provided, the function will be only called
(with the Stream
instance as single argument) once the first subscriber has
attached to the stream. If the function returns another function, it will be
used for cleanup purposes if the stream is cancelled, e.g. if the last
subscriber has unsubscribed. Streams are intended as (primarily async) data
sources in a dataflow graph and are the primary construct for the various
from*()
functions provided by the package. However, streams can also be
triggered manually (from outside the stream), in which case the user should call
stream.next()
to cause value propagation.
import { stream, trace } from "@thi.ng/rstream";
a = stream<number>((s) => {
s.next(1);
s.next(2);
s.done();
});
a.subscribe(trace("a"));
b = stream<number>();
b.subscribe(trace("b1"));
b.subscribe(trace("b2"));
b.next(42);
IDeref support
Stream
(like all other types of Subscription
) implements the @thi.ng/api
IDeref
interface
which provides read access to a stream's last received value. This is useful for
various purposes, e.g. in combination with @thi.ng/hdom, which supports direct
embedding of streams (i.e. their values) into UI components (and will be deref'd
automatically). If the stream has not yet emitted a value or if the stream is
already done, it will deref to undefined
.
Furthermore, all subscription types can be configured (via the cache
option)
to NOT retain their last emitted value, in which case .deref()
will always
return undefined
.
Subscription
Docs: subscription()
Creates a new Subscription
instance, the fundamental datatype & building block
provided by this package (Stream
s are Subscription
s too). Subscriptions can
be:
- linked into directed graphs (if async, not necessarily DAGs)
- transformed using transducers (incl. early termination)
- can have any number of subscribers (optionally each w/ their own transducer)
- recursively unsubscribe themselves from parent after their last subscriber
unsubscribed
- will go into a non-recoverable error state if NONE of the subscribers has an
error handler itself
- implement the @thi.ng/api
IDeref
interface
import { subscription, trace } from "@thi.ng/rstream";
import { filter } from "@thi.ng/transducers";
s = subscription<any, any>();
s.subscribe(trace("s1"));
s.subscribe(trace("s2"), filter((x) => x > 25));
s.next(23);
s.next(42);
Other stream creation helpers
Meta streams
Docs: metaStream()
MetaStream
s are streams of streams. A MetaStream
is a subscription type
which transforms each incoming value into a new stream, subscribes to it (via an
hidden / internal subscription) and then only passes values from that stream to
its own subscribers. If a new value is received, the meta stream first
unsubscribes from the possibly still active stream created from the previous
input, before creating and subscribing to the new stream. Hence this stream type
is useful for cases where streams need to be dynamically and invisibly created &
inserted into an existing dataflow topology without changing it, and with the
guarantee that never more than one of these is active at the same time. Similar
behavior (without the restriction in number) can be achieved using merge()
(see further below).
The user supplied factory
function will be called for each incoming value and
is responsible for creating the new stream instances. If the function returns
null
/ undefined
, no further action will be taken (acts like a filter
transducer, i.e. the incoming value is simply ignored).
import { metastream, fromIterable, trace } from "@thi.ng/rstream";
import { repeat } from "@thi.ng/transducers";
a = metastream<number, string>(
(x) => (x & 1)
? fromIterable(repeat("odd: " + x, 3), { delay: 100 })
: null
);
a.subscribe(trace())
a.next(23)
a.next(42)
a.next(43)
The factory function does NOT need to create new streams, but too can merely
return other existing streams, and so making the meta stream act like a switch /
stream selector.
If the meta stream is the only subscriber to these input streams, you'll need to
use the closeOut: "never"
option when creating the inputs. This keeps them
alive and allows for dynamic switching between them.
import { metastream, fromIterable, trace } from "@thi.ng/rstream";
import { repeat } from "@thi.ng/transducers";
a = fromIterable(
repeat("a"),
{ delay: 1000, closeOut: "never" }
);
b = fromIterable(
repeat("b"),
{ delay: 1000, closeOut: "never" }
);
m = metaStream((x) => x ? a : b);
m.subscribe(trace("meta from: "));
m.next(true);
m.next(false);
m.next(true);
Stream merging
Unordered merge from multiple inputs (dynamic add/remove)
Docs: merge()
Returns a new StreamMerge
instance, a subscription type consuming inputs from
multiple inputs and passing received values on to any subscribers. Input streams
can be added and removed dynamically. By default, StreamMerge
calls done()
when the last active input is done, but this behavior can be overridden via the
closeIn
option.
import { merge, fromIterable, trace } from "@thi.ng/rstream";
merge({
src: [
fromIterable([1, 2, 3], { delay: 10 }),
fromIterable([10, 20, 30], { delay: 21 }),
fromIterable([100, 200, 300], { delay: 7 })
]
}).subscribe(trace());
Use the labeled()
transducer for
each input to create a stream of labeled values and track their provenance:
import { merge, fromIterable, trace } from "@thi.ng/rstream";
import { labeled } from "@thi.ng/transducers";
merge({
src: [
fromIterable([1, 2, 3]).transform(labeled("a")),
fromIterable([10, 20, 30]).transform(labeled("b")),
]
}).subscribe(trace());
See
StreamMergeOpts
for further reference of the various behavior options.
Adding inputs automatically
If the StreamMerge
receives a Subscription
-like value from any of its
inputs, it will not be processed as usual, but instead will be added as new
input to the merge and then automatically remove once that stream is exhausted.
import { merge, stream, fromIterable, trace } from "@thi.ng/rstream";
import { repeat } from "@thi.ng/transducers";
a = stream().map((x) => fromIterable(repeat(x, 3)));
b = fromInterval(1000).map((x) => "b" + x);
merge({ src: [a, b] }).subscribe(trace());
a.next("abc");
Synchronized merge and labeled tuple objects
Docs: sync()
Similar to StreamMerge
above, but with extra synchronization of inputs. Before
emitting any new values, StreamSync
collects values until at least one has
been received from all inputs. Once that's the case, the collected values are
sent as labeled tuple object to downstream subscribers. Each value in the
emitted tuple objects is stored under their input stream's ID. Only the last
value received from each input is passed on. After the initial tuple has been
emitted, you can choose from two possible behaviors:
- Any future change in any input will produce a new result tuple. These tuples
will retain the most recently read values from other inputs. This behavior is
the default and illustrated in the above schematic.
- If the
reset
option is true
, every input will have to provide at least
one new value again until another result tuple is produced.
Any done inputs are automatically removed. By default, StreamSync
calls
done()
when the last active input is done, but this behavior can be overridden
via the closeIn
option.
import { sync, stream, trace } from "@thi.ng/rstream";
const a = stream();
const b = stream();
s = sync<any,any>({ src: { a, b } }).subscribe(trace("result: "));
a.next(1);
b.next(2);
Input streams can be added and removed dynamically and the emitted tuple size
adjusts to the current number of inputs (the next time a value is received from
any input).
If the reset
option is enabled, the last emitted tuple is allowed to be
incomplete, by default. To only allow complete tuples, also set the all
option
to false
.
The synchronization is done via the
partitionSync()
transducer from the
@thi.ng/transducers
package. See this function's docs for further details.
See
StreamSyncOpts
for further reference of the various behavior options.
Stream splitting
Topic based splitting
Docs: pubsub()
Topic based stream splitter. Applies topic
function to each received value and
only forwards it to child subscriptions for returned topic. The actual topic
(return value from topic
fn) can be of any type, apart from undefined
.
Complex topics (e.g objects / arrays) are allowed and they're matched with
registered topics using @thi.ng/equiv by default (but customizable via equiv
option). Each topic can have any number of subscribers.
If a transducer is specified for the PubSub
, it is always applied prior to
passing the input to the topic function. I.e. in this case the topic function
will receive the transformed inputs.
PubSub supports dynamic topic subscriptions and unsubscriptions via
subscribeTopic()
and unsubscribeTopic()
. However, the standard
subscribe()
/ unsubscribe()
methods are NOT supported (since meaningless
here) and will throw an error! unsubscribe()
can only be called WITHOUT
argument to unsubscribe the entire PubSub
instance (incl. all topic
subscriptions) from the parent stream.
Splitting via predicate
Docs: bisect()
Returns a new PubSub
instance using given predicate pred
as boolean topic
function and a
& b
as subscribers for truthy (a
) and falsy b
values.
import { bisect, fromIterable, trace } from "@thi.ng/rstream";
fromIterable([1, 2, 3, 4]).subscribe(
bisect((x) => !!(x & 1), trace("odd"), trace("even"))
);
If a
or b
need to be subscribed to directly, then a
/ b
MUST be first
created as Subscription
(if not already) and a reference kept prior to calling
bisect()
.
import { bisect, fromIterable, subscription, trace } from "@thi.ng/rstream";
const odd = subscription();
const even = subscription();
odd.subscribe(trace("odd"));
odd.subscribe(trace("odd x10"), tx.map((x) => x * 10));
even.subscribe(trace("even"));
fromIterable([1, 2, 3, 4]).subscribe(bisect((x) => !!(x & 1), odd, even));
Side-chaining
Input chunking / buffering, controlled by sidechain
Docs: sidechainPartition()
Buffers values from src
until side chain fires, then emits buffer (unless
empty) and repeats process until either input is done. By default, the value
read from the side chain is ignored, however the optional predicate can be used
to only trigger for specific values / conditions.
import {
merge, fromEvent, fromRAF,
sidechainPartition, trace
} from "@thi.ng/rstream";
sidechainPartition(
merge([
fromEvent(document, "mousemove"),
fromEvent(document, "mousedown"),
fromEvent(document, "mouseup")
]),
fromRAF()
).subscribe(trace());
Since v8.0.0 there's
syncRAF()
,
which allows the above to be simplified to:
import { merge, fromEvent, syncRAF, trace } from "@thi.ng/rstream";
syncRAF(
merge([
fromEvent(document, "mousemove"),
fromEvent(document, "mousedown"),
fromEvent(document, "mouseup")
])
).subscribe(trace());
Input toggling, controlled by sidechain
Docs: sidechainToggle()
Filters values from input based on values received from side chain. By default,
the value read from the side chain is ignored, however the optional predicate
can be used to only trigger for specific values/conditions. Every time the
predicate fn returns true, the filter will be toggled on/off. Whilst switched
off, no input values will be forwarded.
import { sidechainToggle, fromInterval, trace } from "@thi.ng/rstream";
sidechainToggle(fromInterval(500), fromInterval(1000)).subscribe(trace());
...
Input passthrough, controlled by sidechain
Docs: sidechainTrigger()
Buffers the most recent value received and only forwards it downstream whenever
a new control value is received from the sidechain.
import { sidechainTrigger, reactive, stream, trace } from "@thi.ng/rstream";
const src = reactive("payload");
const side = stream();
sidechainTrigger(src, side).subscribe(trace("data:"));
side.next(1);
side.next(1);
src.next("update #1");
src.next("update #2");
side.next(1);
Worker support
Parallel stream processing via workers
Docs: forkJoin()
worker.ts
const $self: Worker = <any>self;
self.addEventListener("message", (e) => {
const { buf, factor } = e.data;
$self.postMessage(buf.map((x) => x * factor));
});
main.ts
import { forkJoin, trace } from "@thi.ng/rstream";
const src = stream<number[]>();
forkJoin({
src: src,
fork: (id, numWorkers, buf) => {
const size = (buf.length / numWorkers) | 0;
return {
buf: id < numWorkers - 1
? buf.slice(id * size, (id + 1) * size)
: buf.slice(id * size),
factor: id * 10
};
},
join: (parts) => <number[]>Array.prototype.concat.apply([], parts),
// worker script
worker: "./worker.js",
// default: navigator.hardwareConcurrency
numWorkers: 4
}).subscribe(trace("results"));
src.next(new Array(16).fill(1));
// result: [0, 0, 0, 0, 10, 10, 10, 10, 20, 20, 20, 20, 30, 30, 30, 30]
Stream processing via workers
Docs: tunnel()
Delegate stream value processing to workers and pass on their responses to
downstream subscriptions. Supports multiple worker instances and worker
termination / restart for each new stream value received.
Docs: postWorker()
Send values to workers (incl. optional (inline) worker instantiation)
Docs: fromWorker()
Create value stream from worker messages.
Other subscription ops
- debounce: ignore high frequency interim values
- resolve: resolve on-stream promises
- trace: debug helper
- transduce: transduce or just reduce an entire stream into a promise
- tween: stream interpolation
Error handling
Detailed information, discussion & diagrams about the new error handling can
be found in this issue
The ISubscriber
interface supports optional error handlers which will be
called if code in the next()
or done()
handlers throws an error. If no error
handler is defined for a subscriber, the wrapping Subscription
's own error
handler will be called, which might put this subscription into an error state
and stop it from receiving new values.
import { subscription, State } from "@thi.ng/rstream";
src = subscription({ next(x) { throw x; } });
src.next(1);
src.getState() === State.ERROR
src.next(2)
src = subscription({
next(x) { throw x; },
error(x) { console.warn("eeek", x); return true; }
});
src.next(1)
src.getState() !== State.ERROR
src.next(2)
Authors
If this project contributes to an academic publication, please cite it as:
@misc{thing-rstream,
title = "@thi.ng/rstream",
author = "Karsten Schmidt and others",
note = "https://thi.ng/rstream",
year = 2017
}
License
© 2017 - 2024 Karsten Schmidt // Apache License 2.0