rx
import "github.com/reactivego/rx"
Package rx provides Reactive Extensions for Go, an API for asynchronous programming with observables and operators.
Our intellectual powers are rather geared to master static relations and our powers to visualize processes evolving in time are relatively poorly developed.
For that reason we should do our utmost to shorten the conceptual gap between the static program and the dynamic process, to make the correspondence between the program (spread out in text space) and the process (spread out in time) as trivial as possible.
Edsger W. Dijkstra, March 1968
Prerequisites
A working Go environment on your system.
Installation
You can use this package directly without installing anything.
Optionally, install the jig tool in order to regenerate this package.
The jig tool is also needed to use the rx/generic sub-package as a generics library.
$ go get github.com/reactivego/jig
The jig tool provides the parametric polymorphism capability that Go 1 is missing.
It works by generating code, replacing place-holder types in generic functions and datatypes with specific types.
Usage
This package can be used either directly as a standard package or as a generics library for Go 1.
Standard Package
To use it as a standard package, import the root rx package to access the API directly.
package main

import "github.com/reactivego/rx"

func main() {
	rx.From(1, 2, "hello").Println()
}
For more information see the Go Package Documentation. This document describes a selection of the most commonly used operators; a complete list of operators is available in a separate document.
Generics Library
To use it as a generics library for Go 1, import the rx/generic package.
Generic programming supports writing programs that use statically typed Observables and Operators.
For example:
package main

import _ "github.com/reactivego/rx/generic"

func main() {
	FromString("Hello", "Gophers!").Println()
}
Note that FromString is statically typed.
Generate statically typed code by running the jig tool.
$ jig -v
found 169 templates in package "rx" (github.com/reactivego/rx/generic)
...
Details on how to use generics can be found in the generic folder.
Observables
The main focus of rx is Observables.
In accordance with Dijkstra's observation that we are ill-equipped to visualize how dynamic processes evolve over time, the use of Observables indeed narrows the conceptual gap between the static program and the dynamic process. Observables have a dynamic process at their core because they are defined as values which change over time. They are combined in static relations with the help of many different operators. This means that at the program level (spread out in the text space) you no longer have to deal explicitly with dynamic processes, but with more tangible static relations.
An Observable:
- is a stream of events
- assumes zero to many values over time
- pushes values
- can take any amount of time to complete (or may never complete)
- is cancellable
- is lazy (it doesn't do anything until you subscribe)
This package uses interface{} for value types, so an observable can emit a
mix of differently typed values. To create an observable that emits three
values of different types, you could write the following little program.
package main

import "github.com/reactivego/rx"

func main() {
	rx.From(1, "hi", 2.3).Println()
}
Note that the program creates a mixed-type observable from an int, a string and a float64.
Observables in rx are somewhat similar to Go channels but have much richer
semantics:
Observables can be hot or cold. A hot observable will try to emit values even
when nobody is subscribed. Values emitted during that period will be lost.
The position of a mouse pointer or the current time are examples of hot observables.
A cold observable will only start emitting values after somebody subscribes.
The contents of a file or a database are examples of cold observables.
An observable can complete normally or with an error, and it uses subscriptions
that can be canceled from the subscriber side. Where a normal variable is
just a place that you read values from and write values to, an observable
captures how the value of this variable changes over time.
Concurrency flows naturally from the fact that an observable is an ever
changing stream of values. Every Observable conceptually has at its core a
concurrently running process that pushes out values.
Operators
Operators form a language in which programs featuring Observables can be expressed.
They work on one or more Observables to transform, filter and combine them into new Observables.
A complete list of operators is available in a separate document.
Below is a selection of the most commonly used ones:
BufferTime
buffers the source Observable values for a specific time period and emits those as a
slice periodically in time.
Code:
const ms = time.Millisecond
source := rx.Timer(0*ms, 100*ms).Take(4).ConcatMap(func(i interface{}) rx.Observable {
	switch i.(int) {
	case 0:
		return rx.From("a", "b")
	case 1:
		return rx.From("c", "d", "e")
	case 3:
		return rx.From("f", "g")
	}
	return rx.Empty()
})
source.BufferTime(100 * ms).Println()
Output:
[a b]
[c d e]
[]
[f g]
Catch
recovers from an error notification by continuing the sequence without
emitting the error, switching instead to the catch Observable to provide
items.
Code:
const problem = rx.RxError("problem")
rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).Catch(rx.From(4, 5)).Println()
Output:
1
2
3
4
5
CatchError
catches errors on the Observable to be handled by returning a
new Observable or throwing an error. It is passed a selector function
that takes as arguments err, which is the error, and caught, which is the
source observable, in case you'd like to "retry" that observable by
returning it again. Whatever observable is returned by the selector will be
used to continue the observable chain.
Code:
const problem = rx.RxError("problem")

catcher := func(err error, caught rx.Observable) rx.Observable {
	if err == problem {
		return rx.From(4, 5)
	}
	// Returning caught continues with the source observable, i.e. retries it.
	return caught
}
rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).CatchError(catcher).Println()
Output:
1
2
3
4
5
CombineLatest
will subscribe to all Observables. It will then wait for all of
them to emit before emitting the first slice. Whenever any of the subscribed
observables emits, a new slice will be emitted containing the latest value of each observable.
Concat
emits the emissions from two or more observables without interleaving them.
ConcatMap
transforms the items emitted by an Observable by applying a
function to each item and returning an Observable. The stream of
Observable items is then flattened by concatenating the emissions from
the observables without interleaving.
ConcatWith
emits the emissions from two or more observables without interleaving them.
Create
provides a way of creating an Observable from scratch by calling observer
methods programmatically.
The create function provided to Create will be called once
to implement the observable. It is provided with a Next, Error,
Complete and Canceled function that can be called by the code that
implements the Observable.
DebounceTime
only emits the last item of a burst from an Observable if a
particular timespan has passed without it emitting another item.
DistinctUntilChanged
only emits when the current value is different from the last.
The operator only compares emitted items from the source Observable against their immediate
predecessors in order to determine whether or not they are distinct.
Code:
rx.From(1, 2, 2, 1, 3).DistinctUntilChanged().Println()
Output:
1
2
1
3
Do
calls a function for each next value passing through the observable.
Filter
emits only those items from an observable that pass a predicate test.
From
creates an observable from multiple values passed in.
Just
creates an observable that emits a particular item.
Map
transforms the items emitted by an Observable by applying a
function to each item.
Merge
combines multiple Observables into one by merging their emissions.
An error from any of the observables will terminate the merged observables.
Code:
a := rx.From(0, 2, 4)
b := rx.From(1, 3, 5)
rx.Merge(a, b).Println()
Output:
0
1
2
3
4
5
MergeMap
transforms the items emitted by an Observable by applying a
function to each item and returning an Observable. The stream of Observable
items is then merged into a single stream of items using the MergeAll
operator.
This operator was previously named FlatMap. The name FlatMap is deprecated as
MergeMap more accurately describes what the operator does with the observables
returned from the Map project function.
MergeWith
combines multiple Observables into one by merging their emissions.
An error from any of the observables will terminate the merged observables.
Code:
a := rx.From(0, 2, 4)
b := rx.From(1, 3, 5)
a.MergeWith(b).Println()
Output:
0
1
2
3
4
5
Of
emits a variable number of values in a sequence and then emits a complete
notification.
Publish
returns a Multicaster for a Subject to an underlying Observable
and turns the subject into a connectable observable.
A Subject emits to an observer only those items that are emitted by the
underlying Observable subsequent to the time the observer subscribes.
When the underlying Observable terminates with an error, then subscribed
observers will receive that error. After all observers have unsubscribed due
to an error, the Multicaster does an internal reset just before the next
observer subscribes. So this Publish operator is re-connectable, unlike the
RxJS 5 behavior, which is not. To simulate the RxJS 5 behavior, use
Publish().AutoConnect(1); this will connect on the first subscription but will
never re-connect.
PublishReplay
returns a Multicaster for a ReplaySubject to an underlying
Observable and turns the subject into a connectable observable. A
ReplaySubject emits to any observer all of the items that were emitted by
the source observable, regardless of when the observer subscribes. When the
underlying Observable terminates with an error, then subscribed observers
will receive that error. After all observers have unsubscribed due to an
error, the Multicaster does an internal reset just before the next observer
subscribes.
Scan
applies an accumulator function to each item emitted by an Observable and
the previous accumulator result.
The operator accepts a seed argument that is passed to the accumulator for the
first item emitted by the Observable. Scan emits every value, both intermediate
and final.
StartWith
returns an observable that, at the moment of subscription, will
synchronously emit all values provided to this operator, then subscribe to
the source and mirror all of its emissions to subscribers.
Code:
rx.From(2, 3).StartWith(1).Println()
Output:
1
2
3
SwitchMap
transforms the items emitted by an Observable by applying a function
to each item and returning an Observable.
In doing so, it behaves much like MergeMap (previously FlatMap), except that
whenever a new Observable is emitted SwitchMap will unsubscribe from the
previous Observable and begin emitting items from the newly emitted one.
Take
emits only the first n items emitted by an Observable.
TakeUntil
emits items emitted by an Observable until another Observable emits an item.
WithLatestFrom
will subscribe to all Observables and wait for all of them to emit before emitting
the first slice. The source observable determines the rate at which values are emitted:
observables that are faster than the source do not speed up the resulting observable.
The observables that are combined with the source are allowed to continue emitting,
but only their most recent value is emitted whenever the source emits.
Note that any values emitted by the source before all other observables have emitted will
effectively be lost. The first emit will occur the first time the source emits after all other
observables have emitted.
Code:
a := rx.From(1, 2, 3, 4, 5)
b := rx.From("A", "B", "C", "D", "E")
a.WithLatestFrom(b).Println()
Output:
[2 A]
[3 B]
[4 C]
[5 D]
Concurrency
The rx package does not use any 'bare' goroutines internally. Concurrency is tightly controlled by the use of a specific scheduler.
Currently there is a choice between two schedulers: a trampoline scheduler and a goroutine scheduler.
By default all subscribing operators except ToChan use the trampoline scheduler. A trampoline puts tasks on a task queue and only starts processing them when the Wait method is called on a returned subscription. The subscription itself calls the Wait method of the scheduler.
Only the Connect and Subscribe methods return a subscription. The other subscribing operators (Println, ToSingle, ToSlice and Wait) are blocking by default and only return when the scheduler returns from its wait. ToSingle and ToSlice return both a value and an error; Println and Wait return just an error, which is nil when the observable completed successfully.
The only operator that uses the goroutine scheduler by default is the ToChan operator. ToChan returns a channel that it feeds by subscribing on the goroutine scheduler.
To change the scheduler on which subscribing occurs, use the SubscribeOn operator.
Regenerating this Package
This package is generated from generics in the sub-folder generic by the jig tool.
You don't need to regenerate this package in order to use it. However, if you are interested in regenerating it, then read on.
If you have not already done so, install the jig tool.
$ go get github.com/reactivego/jig
Change the current working directory to the package directory and run the jig tool as follows:
$ jig -v
This works by replacing place-holder types of generic functions and datatypes with the interface{} type.
Acknowledgements
This library started life as the Reactive eXtensions for Go library by Alec Thomas. Although the library has been through the metaphorical meat grinder a few times, its DNA is still clearly present in this library and I owe Alec a debt of gratitude for the work he has made so generously available.
This implementation takes most of its cues from RxJS.
It is the ReactiveX incarnation that pushes the envelope in evolving operator semantics.
License
This library is licensed under the terms of the MIT License. See the LICENSE file for the copyright notice and exact wording.