Conduits!
RxJS ReplaySubjects with with additional features to make reactive programming easier.
Dude just use signals???
- Angular's use of RxJS goes back further, and signals still aren't supported everywhere yet.
- Some applications are still better-suited for RxJS.
Features
- 🔄 Late Subscriber Catch-Up: Never miss a value again!
- 💪 Flexible: Easy to use even with minimal planning done in advance!
- 🎯 Type-Safe: Full TypeScript support!
- 🛠 Framework Compatible: Use the subclasses like
NgConduit
for easy cleanup on component destruction!
Installation
npm install rxjs-conduit
Usage
Basic Vanilla Usage
import { Conduit } from 'rxjs-conduit/vanilla';
const source = new Conduit<string>();
source.then(value => console.log(`${value} world`));
source.next("hello");
source.next("hi");
source.subscribe(value => console.log(`${value} npm!`));
source.next("bye");
source.complete();
Angular Integration
import { Component, ViewChild, Input } from '@angular/core';
import { interval } from 'rxjs'
import { NgConduit } from 'rxjs-conduit/angular';
type Thing = {
data$: Conduit<string>
}
@Component({
selector: 'app-thing-view',
templateUrl: './thing-view.component.html',
styleUrls: ['./thing-view.component.scss']
})
export class ThingViewComponent {
@ViewChild(ChildComponent)
protected set child(c: ChildComponent){ _child$.next(c); }
protected get child(){ return _child$.value as ChildComponent }
private _child$ = new NgConduit<ChildComponent>();
@Input()
public set thing(t: Thing){ this._thing$.next(t) }
public get thing(){ return this._thing$.value as Thing }
private _thing$ = new NgConduit<Thing>();
protected thingData$ = _thing$.inner( (thing) => thing.data$ );
constructor(){
_child$.then( child => child.doSomething() );
}
}
API: Conduit
(Things that Subject
can already do are omitted.)
new Conduit(first?: T)
Creates a new conduit, optionally with an initial value.
Conduit.derived(sources, formula): ReadonlyConduit
Creates a conduit whose value is derived using a formula and a set of source conduits.
Completes when all sources complete and errors if any source errors.
Conduit.from(source, first?): Conduit
Creates a conduit from an Observable.
subscribe(observer): Subscription
Adds a subscription to this conduit, which will receive reactive updates.
inner(getter): Conduit
Creates a special "proxy" conduit based on the getter and value(s) of this conduit.
bind(that, conversion...): Unsubscribable
Creates a two-way binding between a pair of conduits.
then(callback): Subscription
Similar to subscribe
, but it only runs once then cleans up the subscription.
unsubscribe(): void
Unsubscribes this conduit from any sources it might be watching.
splice(source): Unsubscribable
Adds a source to this conduit, which will feed it values reactively until it completes.
sealed: boolean
True after it completes or errors.
value: T
Returns the most recent value, or Conduit.EMPTY
if there isn't one yet.
valueOrDefault(fallback): T
Returns the conduit's value, or returns the fallback if it's empty.
valueOrThrow(reason): T
Returns the conduit's value, or throws the reason if it's empty.
flush(): void
Sets the conduit back to Conduit.EMPTY
without notifying subscribers.
Bonus API: Gate
new Gate()
Creates a semaphore.
run(section): T
Runs the section if the gate isn't currently running anything else.
Returns the result, or Gate.BLOCKED
if it didn't run.
wrap(callback): T
Gets a copy of the callback that won't call itself.
The copy returns original result, or Gate.BLOCKED
if it didn't run.
open: boolean
Is the gate ready to run something?
License
MIT License.
Copyright © 2025; Fasteroid
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.