@thi.ng/rstream
![npm (scoped)](https://img.shields.io/npm/v/@thi.ng/rstream.svg)
This project is part of the
@thi.ng/umbrella monorepo.
About
Lightweight reactive multi-tap streams and transducer based
transformation pipeline constructs, written in TypeScript.
This library provides and uses the following key building blocks for
reactive programming:
- Stream sources: event targets, iterables, timers, promises,
watches, workers, CSP channels, custom...
- Subscriptions: chained stream processors, each subscribable
(one-to-many) itself
- Transducers: stream transformers, used either as an individual
subscription or to transform values for a single subscription. See
@thi.ng/transducers
for 90+ composable operators.
- Recursive teardown: Whenever possible, any unsubscription
initiates cleanup and propagates to parent(s).
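To make the one-to-many and teardown behavior above more concrete, here is a minimal, self-contained sketch. `MiniSub` and `sink` are hypothetical names invented for this illustration and are not part of this package; the plain function `xform` only loosely stands in for a transducer.

```typescript
// Hypothetical, simplified stand-in for a subscription (NOT the library's class).
// `xform` maps one input to zero or more outputs, loosely mimicking a transducer.
class MiniSub<A, B> {
    children: MiniSub<B, any>[] = [];
    parent?: MiniSub<any, A>;
    done = false;

    constructor(private readonly xform: (x: A) => B[]) {}

    // Attach a child processor (one-to-many) and return it for chaining.
    subscribe<C>(xform: (x: B) => C[]): MiniSub<B, C> {
        const child = new MiniSub<B, C>(xform);
        child.parent = this;
        this.children.push(child);
        return child;
    }

    // Transform the incoming value and pass the results to all children.
    next(x: A): void {
        if (this.done) return;
        for (const y of this.xform(x)) {
            for (const child of this.children.slice()) child.next(y);
        }
    }

    // Detach from the parent; the parent tears itself down too once it has
    // no children left (recursive teardown).
    unsubscribe(): void {
        this.done = true;
        const p = this.parent;
        if (p) {
            p.children = p.children.filter((c) => c !== this);
            if (p.children.length === 0) p.unsubscribe();
        }
    }
}

const log: string[] = [];
// Leaf consumer: records values and emits nothing further.
const sink = (tag: string) => (x: number): never[] => {
    log.push(`${tag}:${x}`);
    return [];
};

const src = new MiniSub<number, number>((x) => [x]);
const evens = src.subscribe<number>((x) => (x % 2 === 0 ? [x] : []));
const a = evens.subscribe(sink("a"));
const b = evens.subscribe(sink("b"));

src.next(1);     // dropped by `evens`
src.next(2);     // reaches both `a` and `b`
a.unsubscribe(); // `evens` still has `b`, so it stays active
src.next(4);     // reaches `b` only
b.unsubscribe(); // `evens` is now childless: teardown propagates up to `src`
src.next(6);     // ignored, `src` is done
```

The sketch leaves out error handling, stream sources and real transducers; it only shows how chained processors fan out values and how removing the last child cleans up the chain.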
Using these building blocks, a growing number of high-level operations
are provided too:
Stream creation helpers
Stream merging
- merge - unsorted merge from multiple inputs (dynamic add/remove)
- sync - synchronized merge and labeled tuple objects
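As a rough illustration of what "synchronized merge and labeled tuple objects" means, the sketch below collects the latest value per input ID and emits a labeled object. `makeSync` is a hypothetical helper, not the library function, and it assumes that nothing is emitted until every input has delivered at least one value and that every later change re-emits the full tuple.

```typescript
type Tuple = Record<string, number>;

// Hypothetical helper (not part of the package): returns a function which
// accepts (id, value) pairs and emits a labeled tuple once all IDs are present.
function makeSync(ids: string[], emit: (t: Tuple) => void) {
    const latest: Tuple = {};
    return (id: string, value: number): void => {
        latest[id] = value;
        // Assumption: only emit once every input has produced a value.
        if (ids.every((k) => k in latest)) emit({ ...latest });
    };
}

const tuples: Tuple[] = [];
const push = makeSync(["a", "b"], (t) => tuples.push(t));

push("a", 1);  // nothing emitted yet, "b" is still missing
push("b", 2);  // emits { a: 1, b: 2 }
push("a", 10); // emits { a: 10, b: 2 }
```

A plain (unsorted) merge, by contrast, would simply forward each value downstream as it arrives, without waiting for the other inputs.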
Useful subscription ops
- bisect - split via predicate
- postWorker - send values to workers (incl. optional worker instantiation)
- resolve - resolve on-stream promises
- sidechainPartition - emits chunks from source, controlled by sidechain stream
- sidechainToggle - toggles source based on signals from sidechain
- trace - debug helper
- transduce - transduce or just reduce an entire stream into a promise
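The sidechain idea can be sketched without the library: values from the main source are buffered and only released as a chunk when a second (sidechain) signal arrives. `ChunkBuffer` is a hypothetical class for illustration only, and skipping empty chunks is an assumption of this sketch.

```typescript
// Hypothetical stand-in for sidechain-controlled partitioning (not the library API).
class ChunkBuffer<T> {
    private buf: T[] = [];

    constructor(private readonly emit: (chunk: T[]) => void) {}

    // Value arriving from the main source: just collect it.
    next(x: T): void {
        this.buf.push(x);
    }

    // Signal arriving from the sidechain: release the collected chunk.
    // Assumption: an empty buffer emits nothing.
    trigger(): void {
        if (this.buf.length > 0) {
            this.emit(this.buf);
            this.buf = [];
        }
    }
}

const chunks: number[][] = [];
const part = new ChunkBuffer<number>((c) => chunks.push(c));

part.next(1);
part.next(2);
part.trigger(); // emits [1, 2]
part.trigger(); // buffer is empty, nothing emitted
part.next(3);
part.trigger(); // emits [3]
```

A toggle variant would follow the same pattern, with the sidechain signal flipping a boolean that decides whether source values are passed on.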
Miscellaneous
- Subscriptions implement @thi.ng/api's IDeref interface and therefore
can be used directly in UI components based on @thi.ng/hdom.
Supporting packages
Conceptual differences to RxJS
(No value judgements implied - there's room for both approaches!)
- Streams are not the same as Observables: stream sources are NOT
(and often simply cannot be) re-run for each new sub added. Only the
first sub is guaranteed to receive all values. Subs added at a later
time MIGHT not receive earlier emitted values, but only the most
recently emitted value and any future values.
- Every subscription supports any number of subscribers, which can be
added/removed at any time
- Every unsubscription recursively triggers upstream unsubscriptions
(provided a parent has no other active child subscriptions)
- Every subscription can have its own transducer transforming
incoming values (possibly into multiple new ones)
- Transducers can create streams themselves (only for merge() / sync())
- Transducers can cause early stream termination and subsequent unwinding
- Values can be manually injected into the stream pipeline / graph at
any point
- Every Stream also is a subscription
- Unhandled errors in subscriptions will move the subscription into an
error state and cause its unsubscription from the parent (if any).
Unhandled errors in stream sources will cancel the stream.
- Much smaller API surface since most common & custom operations can
be solved via available transducers. Therefore less need to provide
specialized functions (map / filter etc.) and more flexibility in
terms of composing new operations.
- IMHO less confusing naming / terminology (only streams (producers) &
subscriptions (consumers))
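The first point above (sources are not re-run for late subscribers) is easy to demonstrate with a few lines of plain TypeScript. `HotStream` is a hypothetical class written for this illustration; it assumes the stream caches only its most recent value and replays that single value to newcomers.

```typescript
type Sub<T> = (x: T) => void;

// Hypothetical minimal stream (not the library's Stream class).
class HotStream<T> {
    private subs: Sub<T>[] = [];
    private last: { value: T } | undefined = undefined;

    // New subscribers get the most recent value (if any), never the full history.
    subscribe(sub: Sub<T>): void {
        this.subs.push(sub);
        if (this.last) sub(this.last.value);
    }

    // Cache the value and deliver it to all current subscribers.
    next(x: T): void {
        this.last = { value: x };
        for (const sub of this.subs) sub(x);
    }
}

const early: number[] = [];
const late: number[] = [];
const hot = new HotStream<number>();

hot.subscribe((x) => early.push(x));
hot.next(1);
hot.next(2);
hot.subscribe((x) => late.push(x)); // joins late: sees the cached 2, but not 1
hot.next(3);
```

Here `early` ends up with all three values while `late` only holds the last cached value and whatever follows.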
Installation
yarn add @thi.ng/rstream
Usage examples
There are several examples using this package in the /examples
directory of this repo:
Declarative dataflow graph
Source
| Live version
@thi.ng/hdom benchmark
The FPS counter canvas component used in this benchmark is driven by
this package and based on the barebones version shown below.
Source
| Live version
Basic usage patterns
import * as rs from "@thi.ng/rstream";
import * as tx from "@thi.ng/transducers";
FPS counter
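// stream of requestAnimationFrame events
const raf = rs.fromRAF();

// subscriber with a transducer chain computing the frame rate
raf.subscribe(
    {
        next(x) {
            console.log(x.toFixed(1), "fps");
        }
    },
    tx.comp(
        // time difference to the previous frame (in ms)
        tx.benchmark(),
        // average over the last 10 measurements
        tx.movingAverage(10),
        // convert frame time into frames per second
        tx.map(x => 1000 / x)
    )
);

// second subscriber on the same stream, logging the raw values
raf.subscribe(rs.trace());

// stop the stream after 10 seconds
setTimeout(() => raf.done(), 10000);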
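For readers unfamiliar with the transducer chain above, the same arithmetic can be sketched with plain arrays. The helpers `deltas`, `movingAvg` and `toFps` as well as the timestamps are made up for this illustration; the sketch assumes the moving average only yields once its window is full, and it uses a window of 2 instead of 10 to keep the numbers small.

```typescript
// Time differences between consecutive timestamps (the role of `benchmark()` above).
const deltas = (ts: number[]): number[] =>
    ts.slice(1).map((t, i) => t - ts[i]);

// Sliding mean over the last `n` samples.
// Assumption: nothing is produced until `n` samples are available.
const movingAvg = (n: number, xs: number[]): number[] => {
    const out: number[] = [];
    for (let i = n; i <= xs.length; i++) {
        const win = xs.slice(i - n, i);
        out.push(win.reduce((acc, x) => acc + x, 0) / n);
    }
    return out;
};

// Convert a frame time in milliseconds into frames per second.
const toFps = (ms: number): number => 1000 / ms;

// Hypothetical frame timestamps in ms: 2 fast frames, then 2 slow ones.
const timestamps = [0, 20, 40, 80, 120];
const fps = movingAvg(2, deltas(timestamps)).map(toFps);
// deltas: [20, 20, 40, 40], averages: [20, 30, 40]
```

In the stream version the values are processed one at a time as frames arrive, rather than over a precomputed array.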
Stream merging
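// merge several DOM event streams into a single stream
new rs.StreamMerge({
    src: [
        rs.fromEvent(document, "mousemove"),
        rs.fromEvent(document, "mousedown"),
        rs.fromEvent(document, "mouseup"),
    ]
})
    // transform each event into an [eventType, [x, y]] tuple
    .subscribe(tx.map((e) => [e.type, [e.clientX, e.clientY]]))
    // log the transformed values
    .subscribe(rs.trace());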
Dataflow graph example
This example uses synchronized stream
merging
to implement a dataflow graph whose leaf inputs (and their changes) are
sourced from a central immutable
atom.
import { Atom } from "@thi.ng/atom/atom";
import { map } from "@thi.ng/transducers";
import * as rs from "@thi.ng/rstream";
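// central state atom holding the port values of all graph nodes
const graph = new Atom<any>({
    a1: { ports: { a: 1, b: 2 } },
    a2: { ports: { b: 10 } },
    a3: { ports: { c: 0 } },
});

// node factory: synchronizes all inputs and sums their values
const adder = (src) =>
    rs.sync({
        src,
        xform: map((ports) => {
            let sum = 0;
            for (let p in ports) {
                sum += ports[p];
            }
            return sum;
        }),
        reset: false
    });

// leaf inputs are views of values stored in the atom
const a1 = adder([
    rs.fromView(graph, "a1.ports.a"),
    rs.fromView(graph, "a1.ports.b"),
]);

// other nodes and other stream types can be used as inputs too
const a2 = adder([
    a1,
    rs.fromView(graph, "a2.ports.b"),
    rs.fromIterable([0, 1, 2]),
]);

const a3 = adder([a1, a2]);

// log the results of the final node
a3.subscribe(rs.trace("result:"));

// update one of the leaf values in the atom after 100ms
setTimeout(() => graph.resetIn("a2.ports.b", 100), 100);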
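The arithmetic encoded by this graph can be checked by hand with a small, library-free sketch. `sumPorts` and `settle` are hypothetical helpers; the sketch only computes the values each adder would settle on for a given state snapshot, assuming the last value of the iterable input (2) is the one retained. It does not model intermediate emissions or timing and makes no claim about what the example above actually logs.

```typescript
interface Ports {
    [id: string]: number;
}

interface GraphState {
    a1: { ports: Ports };
    a2: { ports: Ports };
}

// Same summing logic as the `adder` transducer above.
const sumPorts = (ports: Ports): number => {
    let sum = 0;
    for (const p in ports) {
        sum += ports[p];
    }
    return sum;
};

// Hypothetical helper: settled node values for one state snapshot.
// `iter` stands for the retained value of the iterable input (an assumption).
const settle = (state: GraphState, iter: number) => {
    const a1 = sumPorts(state.a1.ports);
    const a2 = sumPorts({ a1, ...state.a2.ports, iter });
    const a3 = sumPorts({ a1, a2 });
    return { a1, a2, a3 };
};

// Snapshot before and after `a2.ports.b` is changed from 10 to 100.
const before = settle({ a1: { ports: { a: 1, b: 2 } }, a2: { ports: { b: 10 } } }, 2);
const after = settle({ a1: { ports: { a: 1, b: 2 } }, a2: { ports: { b: 100 } } }, 2);
```

Under these assumptions `a3` settles on 18 for the initial state and on 108 after the update, since only `a2` and `a3` depend on the changed port.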
Central app state atom with reactive undo / redo
import * as atom from "@thi.ng/atom";
import * as rs from "@thi.ng/rstream";
import * as tx from "@thi.ng/transducers";
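// central app state
const app = new atom.Atom({ ui: { theme: "dark", mode: false }, foo: "bar" });

// cursors for individual UI params
const theme = new atom.Cursor(app, "ui.theme");
const mode = new atom.Cursor(app, "ui.mode");

// create streams from the cursors
rs.fromAtom(theme).subscribe(rs.trace("theme:"));
// same, but with a transducer mapping the boolean to a label
rs.fromAtom(mode).subscribe(
    rs.trace("mode:"),
    tx.map(mode => mode ? "advanced" : "basic")
);

// stream for a path which does not exist yet in the state
rs.fromView(app, "session.user").subscribe(rs.trace("user:"));

// attach undo history to the `ui` branch only
const hist = new atom.History(new atom.Cursor(app, "ui"));

// record a snapshot before each change
hist.record();
theme.reset("light");
hist.record();
mode.swap(mode => !mode);

hist.undo();
hist.undo();
hist.redo();

// update a value outside the `ui` branch (not tracked by the history)
app.swap((state) => atom.setIn(state, "session.user", "asterix"));

hist.redo();

// read the current app state
app.deref();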
TODO more to come... see tests for now!
Authors
License
© 2017 - 2018 Karsten Schmidt // Apache Software License 2.0