async-broadcaster
Observable alternative using async iterators
Install
npm i async-broadcaster
Why
It's a common requirement to let users subscribe to new values that are produced asynchronously. Often we reach for an observable library (RxJS, zen-observable, Wonka, etc.) to manage subscriptions and allow broadcasting of new values, but full observable implementations may be overkill for simple subscriptions. Observable also bring along a new API, when JavaScript already has an standard API for consuming asynchronous values: async iterables.
What's needed to enable consumers to use async iterables is a way for producers to make async iterables that can be listened to by multiple listeners and correctly handle buffering, unsubscription, etc.
This is what AsyncBroadcaster does.
AsyncBroadcaster takes a single async iterable and adapts it to allow multiple listeners.
Features
- Supports multiple listeners
- Buffers values for slow listeners
- Efficiently clears the buffer of values seen by all listeners
- Supports AbortSignal for cancelling iteration
- Automatically cleans up when listeners break or return from for/await...of loops
Usage
import {AsyncBroadcaster} from 'async-broadcaster';
async function* makeOneHundred() {
for (let i = 0; i < 100; i++) {
yield i;
}
}
const broadcaster = new AsyncBroadcaster(makeOneHundred());
logValues(broadcaster.iterable);
logValues(broadcaster.iterable);
async function logValues(iterable) {
for await (const o of iterable) {
console.log(o);
}
}
Pairs well with IxJS
Async iterables don't have the functional methods that Array does like map()
, reduce()
, or filter()
, nor the common reactive operators like count()
, zip()
, etc.
These can be imported from a library that works with any async iterable, such as IxJS.
import { filter, map } from 'ix/asynciterable/operators';
const results = from(broadcaster.iterable).pipe(
filter(async x => x % 2 === 0),
map(async x => x * x)
);
for await (let item of results) {
console.log(`Next: ${item}`);
}