live-set
This class is basically a Set with a subscribe() method that calls your
callback after changes happen to the set of values.
This class represents a set of values which may change over time or have
transformations applied to it, resulting in a new LiveSet. After modifications
are made, notifications will be delivered asynchronously, like Javascript
Promises or
MutationObservers
do. This library is inspired by the Kefir
library and the Observable
proposal, but represents a
changing set rather than a stream of a single updating value.
Like Kefir Observables, LiveSets have active and inactive states and start out
in the inactive state. When a LiveSet first gets a subscriber, the LiveSet
becomes active and calls the listen function provided to it. When a LiveSet no
longer has any subscribers, it becomes deactivated. Unlike Kefir Observables,
events are delivered asynchronously, and a LiveSet instance may have its
current values queried at any time.
Here's an example of a liveset being made to represent the direct children of
an HTMLElement, using a MutationObserver to keep the liveset updated:
import LiveSet from 'live-set';
function createElementChildLiveSet(element) {
return new LiveSet({
read() {
return new Set(Array.from(element.children));
},
listen(setValues, controller) {
setValues(this.read());
function changesHandler(mutations) {
mutations.forEach(mutation => {
Array.prototype.forEach.call(mutation.addedNodes, child => {
if (child.nodeType === 1) controller.add(child);
});
Array.prototype.forEach.call(mutation.removedNodes, child => {
if (child.nodeType === 1) controller.remove(child);
});
});
}
const observer = new MutationObserver(changesHandler);
observer.observe(element, {childList: true});
return {
unsubscribe() {
observer.disconnect();
},
pullChanges() {
changesHandler(observer.takeRecords());
}
};
}
});
}
const bodyChildren = createElementChildLiveSet(document.body);
console.log(bodyChildren.values());
const subscription = bodyChildren.subscribe(changes => {
console.log(changes);
});
subscription.unsubscribe();
The LiveSet instance could then be passed around, read, and subscribed to. New
LiveSets can be created by transforming existing LiveSets. Here's an example
building off of the above example:
import filter from 'live-set/filter';
const bodyChildrenNoDivs = filter(bodyChildren, el => el.nodeName !== 'DIV');
console.log(bodyChildrenNoDivs.values());
import flatMap from 'live-set/flatMap';
const bodyChildrenNoDivsChildren = flatMap(bodyChildrenNoDivs, el => createElementChildLiveSet(el));
Like Kefir Observables, the activation model means that intermediate LiveSets
can be created and consumed without needing to be explicitly cleaned up after
the output liveset is unsubscribed from. Consider the following code in which
several livesets are created from other livesets:
import LiveSet from 'live-set';
import map from 'live-set/map';
import merge from 'live-set/merge';
const i1 = new LiveSet({
read() {
throw new Error('not implemented');
},
listen(setValues, controller) {
setValues(new Set([5]));
let i = 6;
const t = setInterval(() => {
controller.add(i);
i++;
}, 1000);
return () => {
clearInterval(t);
};
}
});
const i2 = map(i1, x => x*10);
const final = merge([
i2,
LiveSet.constant(new Set([1]))
]);
const subscription = final.subscribe({
start() {
console.log('All values', Array.from(final.values()));
},
next(changes) {
console.log('changes', changes);
}
});
setTimeout(() => {
subscription.unsubscribe();
}, 3500);
The ability to read the values of an inactive LiveSet is provided for
convenience, but in some situations it has surprising results if
transformations are not pure (including the case where they instantiate objects
and their identities are depended on). Consider the following code:
import LiveSet from 'live-set';
import map from 'live-set/map';
const input = LiveSet.constant(new Set([5, 6]));
const mapped = map(input, x => ({value: x}));
const firstValue1 = Array.from(mapped.values())[0];
console.log(firstValue1);
const firstValue2 = Array.from(mapped.values())[0];
console.log(firstValue2);
console.log(firstValue1 === firstValue2);
The mapped
LiveSet while inactive does not keep a cache of the results of the
transformed input values. It could only know to remove them if it subscribed
to the input liveset, but that could cause input
to keep resources open. The
mapped
LiveSet will only become active and trigger a subscribtion to input
if it is subscribed to first. Here we subscribe to it with an empty observer to
demonstrate the difference:
import LiveSet from 'live-set';
import map from 'live-set/map';
const input = LiveSet.constant(new Set([5, 6]));
const mapped = map(input, x => ({value: x}));
mapped.subscribe({});
const firstValue1 = Array.from(mapped.values())[0];
const firstValue2 = Array.from(mapped.values())[0];
console.log(firstValue1 === firstValue2);
API
Core
LiveSet::constructor
LiveSet<T>::constructor({scheduler?, read, listen})
The constructor must be passed an object containing read
and listen
functions, and optionally a scheduler
property of the Scheduler type.
The read
function is called if the values() method is called on the LiveSet
instance while it is inactive but not yet ended. The read
function is
expected to return a Set object containing the LiveSet's current values. If a
LiveSet is not intended to be read while inactive, then you should give a
function which throws an error.
The listen
function is called whenever the LiveSet is activated. Activation
occurs whenever the LiveSet goes from zero to one subscribers. Activation may
happen multiple times for a LiveSet if it is unsubscribed from and resubscribed
to. The listen
function is passed two parameters, setValues
and
controller
.
setValues
is a function that must be called with the initial values as a Set
before the passed listen
function ends and before any new subscriptions are
added to the LiveSet being activated.
controller
is an object with three methods, add(value)
, remove(value)
,
error(error: any)
and end()
. These are to be used to modify the LiveSet's
values. Do not modify the Set originally passed to setValues
to manipulate the
LiveSet. end()
may be called to signify that the LiveSet will have no more
changes; the LiveSet will become frozen with its current values at that point.
References to subscribers will be released when a LiveSet is ended. The error
function ends the LiveSet and delivers an error value to any current
subscribers.
The listen
function may return a function to call upon deactivation, or an
object with an unsubscribe
method (to call upon deactivation) and optionally
a pullChanges
method. The pullChanges method will be called to flush any
changes from the source when the values()
method is called on the LiveSet, or
the pullChanges
method is called on a LiveSetSubscription. If the listen
function subscribes to a LiveSet, then it may be useful to have the listen
function return the LiveSetSubscription, which has unsubscribe and pullChanges
methods.
LiveSet.constant
LiveSet.constant<T>(values: Set<T>, options?: Object): LiveSet<T>
This creates a LiveSet with a set of values that will never change. The LiveSet
will start in the ended state, and therefore will never deliver change
notifications or keep references to subscribers.
The optional options
parameter may have a scheduler
property specifying the
Scheduler instance to use.
LiveSet.active
LiveSet.active<T>(initialValues?: Set<T>, options?: Object): {liveSet: LiveSet<T>, controller: LiveSetController<T>}
This is a convenience method to create a LiveSet that starts out in the
activated state and never exits the activated state. The new LiveSet and its
controller (the same type as passed to the listen
callback passed to the
constructor) are returned.
Be warned that this function eschews the normal activation/deactivation
lifecycle of LiveSets. If the LiveSet requires some resource to be held open to
keep it populated, then you will not be able to auto-close the resource when
the LiveSet loses its subscribers. You will have to provide your own mechanism
to close the resource manually if necessary.
The optional options
parameter may have a scheduler
property specifying the
Scheduler instance to use.
This function is inspired by the nonstandard "Promise.defer()" function that
some Promise libraries have implemented.
LiveSet.defaultScheduler
LiveSet.defaultScheduler: Scheduler
This is the Scheduler object used by default for new LiveSets.
LiveSet::isEnded
LiveSet<T>::isEnded(): boolean
This returns whether the LiveSet is in the ended state. LiveSets in the ended
state will never have their values change, deliver any change notifications, or
keep references to their subscribers.
LiveSet::getScheduler
LiveSet<T>::getScheduler(): Scheduler
Retrieve the Scheduler object that a LiveSet was instantiated with. This may be
used so that a new LiveSet can be instantiated with the same Scheduler.
LiveSet::values
LiveSet<T>::values(): Set<T>
This returns a Set containing all of the LiveSet's current values at the time
of the method call. If the LiveSet is modified, then previously-returned Set
objects will not include the modifications. The Set object return by the
values() method must not be modified.
If the LiveSet is currently inactive, then this will trigger the read
function passed to the constructor to be called. If the LiveSet is currently
active, then this will trigger the pullChanges
function returned by the
constructor's listen
function if present.
LiveSet::subscribe
LiveSet<T>::subscribe(observer): LiveSetSubscription
This function is used to subscribe to change notifications from the LiveSet.
The observer parameter must either be an Observer object with optional start
,
next
, error
, and complete
functions, or a function which is treated as an
Observer object with that function as the next
method. The subscribe method
returns a LiveSetSubscription object.
The start
function is called when the subscription first starts, before the
subscribe call has returned, and it is passed a reference to the
LiveSetSubscription object which will be returned. During the start
function,
the LiveSet being subscribed to is guaranteed to be active, so it's a good time
to read the current values of the LiveSet with the values() method.
The next
function is called after any changes have been made to the LiveSet's
set of values. These changes notifications are delivered either asynchronously,
or whenever change notifications are flushed early due to a LiveSet::values()
or LiveSetSubscription::pullChanges()
call. The parameter passed to the
next
function is an array of LiveSetChangeRecord objects.
The error
function is called if the LiveSet is ended by a call to
controller.error
, and it's passed the value passed to the controller.error
method.
The complete
function is called if the LiveSet is ended by a call to
controller.end
.
If either the error
or complete
function is called, then there will be no
more calls to any of the observer's functions after that.
This function is intended to be compatible with the Observable subscribe method
of the Observable proposal.
LiveSetSubscription::closed
LiveSetSubscription::closed: boolean
This is true if the LiveSet has ended, or the subscription has been
unsubscribed from.
LiveSetSubscription::unsubscribe
LiveSetSubscription::unsubscribe(): void
This immediately unsubscribes the subscription. None of the observer functions
will be called after unsubscription.
LiveSetSubscription::pullChanges
LiveSetSubscription::pullChanges(): void
This will cause any queued change notifications to be immediately flushed to
this subscription's observer's next
function. This will not affect other
subscriptions to the LiveSet.
LiveSetChangeRecord
This is a simple object with a type
property and an optional value
property. An array of LiveSetChangeRecord objects is passed to the next
callback of a LiveSet::subscribe call.
{type: 'add', value: T} |
{type: 'remove', value: T} |
{type: 'end'};
Transformations
The following functions usually take a pre-existing LiveSet instance as input,
and return a new LiveSet instance. These functions are implemented in separate
modules rather than as methods of LiveSet in part so that only the functions
used have to be included in a javascript bundle built for browsers.
For all functions below which produce a LiveSet, the LiveSet uses the same
scheduler as the input LiveSet or the same as the first input LiveSet if
multiple are given.
live-set/filter
filter<T>(liveSet: LiveSet<T>, cb: (value: T) => any): LiveSet<T>
This creates a LiveSet that contains only the values of the input liveSet
for which they given callback function returns a truthy value for.
live-set/map
map<T,U>(liveSet: LiveSet<T>, cb: (value: T) => U): LiveSet<U>
This creates a LiveSet that contains the result of cb(value)
for each value
in the input liveSet
instead of the original values. The callback will only
be called for the initial values and when values are added; the callback will
not be called when a value is removed.
The behavior is undefined if the callback returns the same value for distinct
input values present in the input liveSet
at the same time.
live-set/transduce
transduce(liveSet: LiveSet<any>, transducer: Function): LiveSet<any>
This creates a new LiveSet based on a transformation implemented by the given
transducer function. It supports any transducers implementation that follows
the transducer protocol,
for example
jlongster/transducers.js
or
cognitect-labs/transducers-js.
To learn more about transducers please visit those libraries' pages.
Transducers are recommended to be used to replace any sequence of multiple map
or filter function calls. The use of transducers removes the need for
intermediate LiveSets to be created.
Note that each input value from the liveSet
passed to the transducer is
expected to immediately map to zero or more values. This mapping is remembered
so that if the input value is later removed from the input liveSet
, then the
associated output values are all removed from the output LiveSet. This is fine
for any combination of common transducers such as map(cb)
, filter(cb)
, and
take(n)
, but transducers which produce a many-to-one relationship between
values such as partition(n)
will not function in a sensible manner.
The behavior is undefined if the transducer outputs equal values to be present
in the output LiveSet at the same time.
live-set/merge
merge<T>(liveSets: Array<LiveSet<T>>): LiveSet<T>
This function takes an array of LiveSets and returns a single LiveSet
containing all of their values.
The behavior is undefined if multiple input LiveSets contain the same value at
the same time.
live-set/flatMap
flatMap<T,U>(liveSet: LiveSet<T>, cb: (value: T) => LiveSet<U>): LiveSet<U>
This function calls the given callback function for each value in the input
liveSet
, and merges the values of all returned LiveSets into one LiveSet.
When a new value is added to the input liveSet
, then the callback will be
called a new LiveSet's values will be merged in. When a value is removed from
the input liveSet
, then the values from the LiveSet created for that value
will be removed from the output LiveSet.
The behavior is undefined if any of the LiveSets returned by the callback
contain equal values at the same time.
live-set/flatMapR
flatMapR<T,U>(liveSet: LiveSet<T>, cb: (value: T) => LiveSet<U>): LiveSet<U>
This function is the same as flatMap
, but it should be used for fully correct
LiveSetSubscription::pullChanges
behavior in recursive situations where it is
expected that a new item being emitted to the output set will cause a new item
to be added to the input set (and therefore potentially more items to the
output set again).
live-set/mapWithRemoval
mapWithRemoval<T,U>(input: LiveSet<T>, cb: (value: T, removal: Promise<void>) => U): LiveSet<U>
This is similar to the live-set/map function, but the callback is also passed a
promise that will resolve when the value is removed from the input liveSet
.
The LiveSet returned by this function may not have values()
called on it
while it is inactive.
The behavior is undefined if the callback returns the same value for distinct
input values present in the input liveSet
at the same time.
live-set/toValueObservable
toValueObservable<T>(liveSet: LiveSet<T>): Observable<{value: T, removal: Promise<void>}>
This will return an Observable
instance which upon subscription will emit a {value, removal}
object for
every value
currently in the input liveSet
where removal
is a Promise
which will resolve after the value
is removed from the input liveSet
.
live-set/Scheduler
Scheduler::constructor()
A scheduler object has a schedule(callback)
method which schedules a callback
to run asynchronously, and a flush()
method to run all currently scheduled
callbacks early.
Bundling Note
To use this module in browsers, a CommonJS bundler such as Browserify or
Webpack should be used.
LiveSet's code adds additional checks in some places if process.env.NODE_ENV
is not set to "production". If you're using Browserify, then setting the
NODE_ENV environment variable to "production" during build is enough to disable
these checks. Instructions for other bundlers can be found in React's
documentation,
which uses the same convention.
The additional checks make sure that the Set passed to setValues
and the Set
returned from the values()
method are not modified.
Types
Flow type declarations for this module are included!
If you are using Flow, they won't require any configuration to use.