River
River is a robust high-performance job processing system for Go and Postgres.
See homepage, docs, and godoc, as well as the River UI.
Being built for Postgres, River encourages the use of the same database for
application data and job queue. By enqueueing jobs transactionally along with
other database changes, whole classes of distributed systems problems are
avoided. Jobs are guaranteed to be enqueued if their transaction commits, are
removed if their transaction rolls back, and aren't visible for work until
commit. See transactional enqueueing for more background on this philosophy.
Job args and workers
Jobs are defined in struct pairs, with an implementation of JobArgs
and one
of Worker
.
Job args contain json
annotations and define how jobs are serialized to and
from the database, along with a "kind", a stable string that uniquely identifies
the job.
type SortArgs struct {
Strings []string `json:"strings"`
}
func (SortArgs) Kind() string { return "sort" }
Workers expose a Work
function that dictates how jobs run.
type SortWorker struct {
river.WorkerDefaults[SortArgs]
}
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
}
Registering workers
Jobs are uniquely identified by their "kind" string. Workers are registered on
start up so that River knows how to assign jobs to workers:
workers := river.NewWorkers()
river.AddWorker(workers, &SortWorker{})
Starting a client
A River Client
provides an interface for job insertion and manages job
processing and maintenance services. A client's created with a database pool,
driver, and config struct containing a Workers
bundle and other settings.
Here's a client Client
working one queue ("default"
) with up to 100 worker
goroutines at a time:
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
if err != nil {
panic(err)
}
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
Insert-only clients
It's often desirable to have a client that'll be used for inserting jobs, but
not working them. This is possible by omitting the Queues
configuration, and
skipping the call to Start
:
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Workers: workers,
})
if err != nil {
panic(err)
}
Workers
can also be omitted, but it's better to include it so River can check
that inserted job kinds have a worker that can run them.
Stopping
The client should also be stopped on program shutdown:
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
There are some complexities around ensuring clients stop cleanly, but also in a
timely manner. See graceful shutdown for more details on River's stop modes.
Inserting jobs
Client.InsertTx
is used in conjunction with an instance of job args to
insert a job to work on a transaction:
_, err = riverClient.InsertTx(ctx, tx, SortArgs{
Strings: []string{
"whale", "tiger", "bear",
},
}, nil)
if err != nil {
panic(err)
}
See the InsertAndWork
example for complete code.
Other features
Cross language enqueueing
River supports inserting jobs in some non-Go languages which are then worked by Go implementations. This may be desirable in performance sensitive cases so that jobs can take advantage of Go's fast runtime.
Development
See developing River.
Thank you
River was in large part inspired by our experiences with other background job libraries over the years, most notably:
Thank you for driving the software ecosystem forward.