Package collection seeks to provide an expressive and readable way of working with basic data structures in Go.

As a former .NET developer, I deeply missed writing programs in the style of LINQ. Doing so enables concurrent/parallel reactive programs to be written in a snap. Go's functional nature enables us to have a very similar, if more verbose, experience.

Take for example the scenario of printing the number of Go source files in a directory. Using this package, this takes only a few lines:

A directory is a collection of filesystem entries, so we're able to iterate through them using the "Enumerate" function. From there, we keep only the file names that end with ".go". Finally, we print the number of entries that were encountered.

This is a trivial example, but imagine building more elaborate pipelines. You might take advantage of the `SelectParallel` function, which allows multiple goroutines to process a single transform at once, with their results being funnelled into the next phase of the pipeline. Suddenly, injecting new steps can be transparent.
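The enumerate, filter and count pipeline described above can be sketched with the standard library alone. This is a minimal illustration of the idea, not this package's API: the helper names enumerate, where, count, dirNames and isGoFile are hypothetical, and each stage is a plain goroutine connected by channels.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// enumerate sends each name on a channel, then closes it.
// (Hypothetical helper, standing in for an "Enumerate" step.)
func enumerate(names []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, n := range names {
			out <- n
		}
	}()
	return out
}

// where forwards only the values for which keep returns true.
func where(in <-chan string, keep func(string) bool) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// count drains the channel and reports how many values it carried.
func count(in <-chan string) int {
	n := 0
	for range in {
		n++
	}
	return n
}

// dirNames lists the names of the entries in dir.
func dirNames(dir string) ([]string, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(entries))
	for _, e := range entries {
		names = append(names, e.Name())
	}
	return names, nil
}

// isGoFile reports whether name ends with ".go".
func isGoFile(name string) bool { return strings.HasSuffix(name, ".go") }

func main() {
	names, err := dirNames(".")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Enumerate the entries, keep the Go files, print how many there are.
	fmt.Println(count(where(enumerate(names), isGoFile)))
}
```

Because every stage only reads from one channel and writes to another, a new step can be inserted between two existing ones without changing either of them.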
Package async is a library for asynchronous programming.

Since Go has already done a great job of bringing green/virtual threads to life, this library only implements a single-threaded Executor type, which some refer to as an async runtime. One can create as many Executors as one likes.

While Go excels at forking, async, on the other hand, excels at joining.

Want to execute pieces of code from various goroutines in a single-threaded way?

An Executor is designed to run Tasks spawned in various goroutines sequentially. This comes in handy when one wants to do a series of operations on a single thread, for example, to read or update state that is not safe for concurrent access, to write data to the console, or to update a user interface.

No backpressure alert. Task spawning is designed not to block. If spawning outruns execution, an Executor could easily consume a lot of memory over time. To mitigate this, one could introduce a semaphore per hot spot.

A Task can be reactive.

A Task is spawned with a Coroutine to take care of it. A Task is a user-provided function; from within it, one can return a specific Result to tell the Coroutine to watch and await some Events (e.g. Signal, State and Memo), and the Coroutine re-runs the Task whenever any of these Events notifies.

This is useful when one wants to do something repeatedly. It works like a loop. To exit this loop, just return a Result that ends the Coroutine from within the Task function. Simple.

A Coroutine can also switch from one Task to another, just like a state machine can transition from one state to another. This is done by returning another specific Result from within a Task function. A Coroutine can switch from one Task to another until a Task ends it.

Channel operations are not recommended in an async Task, since they tend to block. For an Executor, if one Coroutine blocks, no other Coroutine can run.
So instead of passing data around, one would just handle data in place.

One of the advantages of passing data over channels is reduced allocation. Unfortunately, async Tasks always escape to the heap. Any variables they capture also escape to the heap. One should stay alert and take measures in hot spots, such as reusing the same Task.

This example demonstrates how to spawn Tasks with different paths. The lower the path, the higher the priority. This example creates a Task with path "aa" for additional computations and another Task with path "zz" for printing results. The former runs before the latter because "aa" < "zz".

This example demonstrates how to chain multiple Tasks together to be worked on in sequence by a Coroutine.

This example demonstrates how a Task can conditionally depend on a State.

This example demonstrates how a Memo can conditionally depend on a State.

This example demonstrates how to add a function call before a Task re-runs, or after a Task ends.

This example demonstrates how to end a Task. It creates a Task that prints the value of a State whenever it changes. The Task only prints 0, 1, 2 and 3 because it is ended after 3.

This example demonstrates how to use Memos to memoize cheap computations. Memos are evaluated lazily. They take effect only when they are acquired.

This example demonstrates how to set up an autorun function to run an Executor in a goroutine automatically whenever a Coroutine is spawned or resumed.

This example demonstrates how a Coroutine can switch from one Task to another.

This example demonstrates how to yield a Coroutine only for it to resume later with another Task. It computes two values in separate goroutines sequentially, then prints their sum. It showcases what yielding can do; it is not meant as a useful pattern.
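The core idea of an Executor, spawning from many goroutines without blocking and running everything sequentially on one, can be sketched with the standard library alone. This is a rough illustration under stated assumptions, not this package's implementation: the executor type and its spawn and run methods are hypothetical, and the sketch has no Coroutines, Events, paths or priorities.

```go
package main

import (
	"fmt"
	"sync"
)

// executor is a hypothetical single-threaded task runner. It is a
// simplified stand-in, not the Executor type of package async.
type executor struct {
	mu    sync.Mutex
	queue []func()
}

// spawn enqueues a task without blocking. The queue is unbounded, so
// there is no backpressure: memory grows if spawning outruns run.
func (e *executor) spawn(task func()) {
	e.mu.Lock()
	e.queue = append(e.queue, task)
	e.mu.Unlock()
}

// run drains the queue on the calling goroutine, one task at a time.
func (e *executor) run() {
	for {
		e.mu.Lock()
		if len(e.queue) == 0 {
			e.mu.Unlock()
			return
		}
		task := e.queue[0]
		e.queue = e.queue[1:]
		e.mu.Unlock()
		task() // executed outside the lock, never concurrently
	}
}

// sumConcurrently spawns one task per value from separate goroutines,
// then runs all of them sequentially on the caller's goroutine.
func sumConcurrently(values []int) int {
	var e executor
	var wg sync.WaitGroup
	total := 0 // not safe for concurrent access; only tasks touch it
	for _, v := range values {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			e.spawn(func() { total += v })
		}(v)
	}
	wg.Wait() // every task has been queued
	e.run()   // execute them one after another
	return total
}

func main() {
	fmt.Println(sumConcurrently([]int{1, 2, 3, 4})) // prints 10
}
```

The variable total needs no lock of its own because only tasks touch it and tasks never overlap. That is the "joining" side of the design: forking happens in goroutines, while shared state is handled in place by one thread of execution.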
Package rx provides Reactive Extensions for Go, an API for asynchronous programming with observable streams.
Package rx is a reactive programming library for Go, inspired by https://reactivex.io/ (mostly RxJS).

An Observer is a consumer of notifications delivered by an Observable. An Observer is usually created and passed to the Observable.Subscribe method when subscribing to an Observable.

An Observable is a collection of future values, waiting to become a flow of data. Subscribing an Observer to an Observable makes it happen. When an Observable is subscribed, its values, when available, are emitted to the given Observer.

There are four kinds of notifications:

Both Error notifications and Stop notifications carry an error value. The main differences between them are as follows:

An Operator is an operation on an Observable. When applied, it does not change the existing Observable value. Instead, it returns a new one, whose subscription logic is based on the first Observable.

There are many kinds of Operators in this library. Here is a list of what Operators can do:

Previously, Operator was also a function type like Observable and Observer. It was changed to be an interface type for one reason: implementations can carry additional methods for setting extra options. For example, MergeMap has two extra options: MergeMapOperator.WithBuffering and MergeMapOperator.WithConcurrency, and this is how they are specified when using a MergeMap:

To chain multiple Operators, do either this: or this:

There are 9 Pipe functions in this library, from Pipe1 to Pipe9. Use the Pipe function that matches the number of Operators.

When there are really too many Operators to chain, do either this: or this:

There are 8 Compose functions in this library, from Compose2 to Compose9.

Notifications emitted by an Observable may come from any started goroutine, but they are guaranteed to be in sequence, one after another.

Operators in a chain may run in different goroutines. In the following code:

Race conditions could happen for any two of ob, op1, op2, op3 and o.
Race conditions could also happen for any two Observables. However, not every Operator or Observable has concurrent behavior.

The following operations may cause concurrent behavior:

The following operations may cause concurrent behavior due to Context cancellation:

Since Context cancellations are very common in this library, and a Context cancellation usually results in a Stop notification emitted in a goroutine started by Context.AfterFunc or Context.Go, handling Stop notifications requires extra precaution. The problem is that Stop notifications are not deterministic. They may come from any goroutine. If that happens, one has to deal with race conditions.

It's very common that an Observable, when subscribed, also subscribes to other Observables. In this library, inner Observables are usually subscribed in the same goroutine where the outer one is being subscribed. However, when in doubt, read the code.

Multicasts and Unicasts are special Observables: developers decide what values they produce, or when they complete, after they have been subscribed. Multicasts can be subscribed multiple times, whereas Unicasts can only be successfully subscribed once. Both Multicasts and Unicasts are safe for concurrent use.

Here is an example that demonstrates how to use a Unicast.
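Separately from the Unicast example, the relationship between Observable, Observer and Operator described earlier can be sketched in a few lines of plain Go. This is a simplified illustration, not this library's API: the types observer and observable and the functions fromSlice, mapOp and collect are hypothetical, everything runs synchronously in one goroutine, and only value and completion notifications are modeled (no Error or Stop, no Context).

```go
package main

import "fmt"

// observer consumes notifications: a value, or completion when done
// is true. (Hypothetical type, not the library's Observer.)
type observer[T any] func(value T, done bool)

// observable is a function that, when called with an observer
// (subscribed), emits its values to that observer.
type observable[T any] func(o observer[T])

// fromSlice returns an observable that emits each value in order,
// then completes.
func fromSlice[T any](values []T) observable[T] {
	return func(o observer[T]) {
		for _, v := range values {
			o(v, false)
		}
		var zero T
		o(zero, true)
	}
}

// mapOp plays the role of an operator: it leaves source untouched and
// returns a new observable whose subscription logic builds on source.
func mapOp[T, R any](source observable[T], f func(T) R) observable[R] {
	return func(o observer[R]) {
		source(func(v T, done bool) {
			if done {
				var zero R
				o(zero, true)
				return
			}
			o(f(v), false)
		})
	}
}

// collect subscribes to ob and gathers every emitted value.
func collect[T any](ob observable[T]) []T {
	var out []T
	ob(func(v T, done bool) {
		if !done {
			out = append(out, v)
		}
	})
	return out
}

func main() {
	source := fromSlice([]int{1, 2, 3})
	doubled := mapOp(source, func(v int) int { return v * 2 })
	fmt.Println(collect(doubled)) // prints [2 4 6]
	fmt.Println(collect(source))  // prints [1 2 3]: the source is unchanged
}
```

Nothing is emitted until collect subscribes, which mirrors the statement that an Observable is a collection of future values and that subscribing makes the flow happen.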
Package gopher_pipes provides utilities for reactive programming in Go.