@thi.ng/rstream
Lightweight reactive multi-tap streams and transducer-based transformation
pipeline constructs, written in TypeScript.
About
This library provides and uses three key building blocks for reactive programming:
- Stream sources: event targets, iterables, timers, promises, watches, workers, CSP channels, custom...
- Subscriptions: chained stream processors, each subscribable itself
- Transducers: stream transformers, individually or as part of a single subscription, see @thi.ng/transducers.
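To make the relationship between these building blocks more concrete, here is a minimal sketch in plain TypeScript. It is not the library's implementation or API: `MiniStream` and its `subscribe`, `map` and `next` methods are hypothetical names chosen for illustration. The sketch only shows the idea that one source can feed several consumers (multi-tap) and that each transformation step is itself subscribable.

```typescript
// Hypothetical minimal model of a multi-tap stream.
// NOT the @thi.ng/rstream implementation, illustration only.
type Listener<T> = (value: T) => void;

class MiniStream<T> {
    private listeners: Listener<T>[] = [];

    // Attach a consumer; any number of consumers may tap the same stream.
    subscribe(fn: Listener<T>): this {
        this.listeners.push(fn);
        return this;
    }

    // Derive a child stream whose values are transformed by `fn`.
    // The child is itself subscribable, so processors can be chained.
    map<U>(fn: (value: T) => U): MiniStream<U> {
        const child = new MiniStream<U>();
        this.subscribe((x) => child.next(fn(x)));
        return child;
    }

    // Push a new value to all current consumers.
    next(value: T): void {
        for (const fn of this.listeners) fn(value);
    }
}

const src = new MiniStream<number>();
const raw: number[] = [];
const labels: string[] = [];

// First tap: receives the untransformed values.
src.subscribe((x) => raw.push(x));
// Second tap: a chain of two transformation steps.
src.map((x) => x * 10)
    .map((x) => `value: ${x}`)
    .subscribe((x) => labels.push(x));

src.next(1);
src.next(2);
```

In the library itself the transformation steps are expressed as transducers rather than a `map` method, which allows several steps to be composed into a single subscription.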
Built on these building blocks, the library also provides a growing number of high-level operations:
- stream merging
- pubsub
- sidechain partitioning (emits chunks from source, controlled by sidechain stream)
- sidechain toggle (toggles source based on signals from sidechain)
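The sidechain partitioning idea from the list above can be sketched in plain TypeScript as follows. This is an illustration under stated assumptions, not the library's code: the class `SidechainPartition` and its `source` / `sidechain` methods are hypothetical, and skipping empty chunks is a choice made for this sketch only.

```typescript
// Hypothetical sketch of sidechain partitioning; names are illustrative only.
class SidechainPartition<T> {
    private buffer: T[] = [];
    private emit: (chunk: T[]) => void;

    constructor(emit: (chunk: T[]) => void) {
        this.emit = emit;
    }

    // Called for every value arriving from the source stream.
    source(value: T): void {
        this.buffer.push(value);
    }

    // Called for every signal on the sidechain stream:
    // flush the values collected so far as one chunk.
    // (Assumption of this sketch: empty chunks are not emitted.)
    sidechain(): void {
        if (this.buffer.length > 0) {
            this.emit(this.buffer);
            this.buffer = [];
        }
    }
}

const chunks: number[][] = [];
const part = new SidechainPartition<number>((chunk) => chunks.push(chunk));

part.source(1);
part.source(2);
part.sidechain(); // flushes the chunk [1, 2]
part.source(3);
part.sidechain(); // flushes the chunk [3]
part.sidechain(); // nothing buffered, nothing flushed
```

A sidechain toggle would follow the same pattern, with the sidechain signal flipping a boolean that decides whether source values are passed on.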
Furthermore, the @thi.ng/rstream-log package provides an extensible multi-level,
multi-target logging solution based on this library.
TODO
Installation
yarn add @thi.ng/rstream
Usage examples
Basic usage patterns
import * as rs from "@thi.ng/rstream";
import * as tx from "@thi.ng/transducers";
FPS counter
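// stream driven by requestAnimationFrame
const raf = rs.fromRAF();

// add subscription with transducer: frame time -> moving average -> fps
raf.subscribe(
    {
        next(x) {
            console.log(x.toFixed(1), "fps");
        }
    },
    tx.comp(
        // time between successive frames
        tx.benchmark(),
        // smooth over the last 10 frames
        tx.movingAverage(10),
        // convert frame time (ms) to frames per second
        tx.map(x => 1000 / x)
    )
);

// add second subscription (debug output) to the same stream
raf.subscribe(rs.trace());

// stop the stream after 10 seconds
setTimeout(() => raf.done(), 10000);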
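For readers unfamiliar with the transducers used above, the following plain-TypeScript sketch mimics the three steps on a fixed list of timestamps. It assumes that the benchmark step yields the elapsed milliseconds between successive values and that the moving average only yields results once its window is full. `fpsFromTimestamps` is a hypothetical helper written for this illustration and is not part of either library.

```typescript
// Plain-TypeScript stand-in for the three pipeline steps.
// `fpsFromTimestamps` is a hypothetical helper, not a library function.
function fpsFromTimestamps(timestamps: number[], period: number): number[] {
    const result: number[] = [];
    const deltas: number[] = [];
    let prev: number | undefined;
    for (const t of timestamps) {
        if (prev !== undefined) {
            // step 1: elapsed time (ms) since the previous frame
            deltas.push(t - prev);
            // keep only the most recent `period` frame times
            if (deltas.length > period) deltas.shift();
            // step 2: average only once the window is full
            if (deltas.length === period) {
                const avg = deltas.reduce((a, b) => a + b, 0) / period;
                // step 3: convert average frame time (ms) to frames per second
                result.push(1000 / avg);
            }
        }
        prev = t;
    }
    return result;
}

// Simulated frames arriving every 20 ms, averaged over 2 frames
const fps = fpsFromTimestamps([0, 20, 40, 60, 80], 2);
console.log(fps);
```

With a steady 20 ms frame time every averaged value corresponds to 50 fps; the first result only appears once two frame times have been collected.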
Stream merging
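// merge several DOM event streams into a single stream
new rs.StreamMerge({
    src: [
        rs.fromEvent(document, "mousemove"),
        rs.fromEvent(document, "mousedown"),
        rs.fromEvent(document, "mouseup"),
    ]
})
    // add event transformer: event -> [type, [x, y]]
    .subscribe(tx.map((e) => [e.type, [e.clientX, e.clientY]]))
    // add debug subscription
    .subscribe(rs.trace());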
Central app state atom with reactive undo / redo
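import * as atom from "@thi.ng/atom";
import * as rs from "@thi.ng/rstream";
import * as tx from "@thi.ng/transducers";

// central app state
const app = new atom.Atom({ ui: { theme: "dark", mode: false }, foo: "bar" });

// cursors into nested UI state values
const theme = new atom.Cursor(app, "ui.theme");
const mode = new atom.Cursor(app, "ui.mode");

// create streams of cursor value changes
rs.fromAtom(theme).subscribe(rs.trace("theme:"));
// with transducer: map the boolean mode to a label
rs.fromAtom(mode).subscribe(rs.trace("mode:"), tx.map(mode => mode ? "advanced" : "basic"));
// cursor for a path which is not yet present in the initial state
rs.fromAtom(new atom.Cursor(app, "session.user")).subscribe(rs.trace("user:"));

// undo / redo history, limited to the `ui` branch
const hist = new atom.History(new atom.Cursor(app, "ui"));

// record a snapshot before each UI change
hist.record();
theme.reset("light");
hist.record();
mode.swap(mode => !mode);

// step back and forth through the recorded UI states
hist.undo();
hist.undo();
hist.redo();

// update outside the `ui` branch tracked by `hist`
app.swap((state) => atom.setIn(state, "session.user", "asterix"));
hist.redo();

// read the current app state
app.deref();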
TODO more to come... see tests for now!
Authors
License
© 2017 - 2018 Karsten Schmidt // Apache Software License 2.0