
Company News
Socket Has Acquired Secure Annex
Socket has acquired Secure Annex to expand extension security across browsers, IDEs, and AI tools.
River is a robust high-performance job processing system for Go and Postgres.
See homepage, docs, and godoc.
Being built for Postgres, River encourages the use of the same database for application data and job queue. By enqueueing jobs transactionally along with other database changes, whole classes of distributed systems problems are avoided. Jobs are guaranteed to be enqueued if their transaction commits, are removed if their transaction rolls back, and aren't visible for work until commit. See transactional enqueueing for more background on this philosophy.
Jobs are defined in struct pairs, with an implementation of JobArgs and one
of Worker.
Job args contain json annotations and define how jobs are serialized to and
from the database, along with a "kind", a stable string that uniquely identifies
the job.
type SortArgs struct {
// Strings is a slice of strings to sort.
Strings []string `json:"strings"`
}
func (SortArgs) Kind() string { return "sort" }
Workers expose a Work function that dictates how jobs run.
type SortWorker struct {
// An embedded WorkerDefaults sets up default methods to fulfill the rest of
// the Worker interface:
river.WorkerDefaults[SortArgs]
}
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
}
Jobs are uniquely identified by their "kind" string. Workers are registered on start up so that River knows how to assign jobs to workers:
workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &SortWorker{})
A River Client provides an interface for job insertion and manages job
processing and maintenance services. A client's created with a database pool,
driver, and config struct containing a Workers bundle and other settings.
Here's a client Client working one queue ("default") with up to 100 worker
goroutines at a time:
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
})
if err != nil {
panic(err)
}
// Run the client inline. All executed jobs will inherit from ctx:
if err := riverClient.Start(ctx); err != nil {
panic(err)
}
The client should also be stopped on program shutdown:
// Stop fetching new work and wait for active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
There are some complexities around ensuring clients stop cleanly, but also in a timely manner. See graceful shutdown for more details on River's stop modes.
Client.InsertTx is used in conjunction with an instance of job args to
insert a job to work on a transaction:
_, err = riverClient.InsertTx(ctx, tx, SortArgs{
Strings: []string{
"whale", "tiger", "bear",
},
}, nil)
if err != nil {
panic(err)
}
See the InsertAndWork example for complete code.
Batch job insertion for efficiently inserting many jobs at once using
Postgres COPY FROM.
Cancelling jobs from inside a work function.
Scheduled jobs that run automatically at their scheduled time in the future.
Snoozing jobs from inside a work function.
Subscriptions to queue activity and statistics, providing easy hooks for telemetry like logging and metrics.
Transactional job completion to guarantee job completion commits with other changes in a transaction.
Unique jobs by args, period, queue, and state.
Work functions for simplified worker implementation.
See developing River.
River was in large part inspired by our experiences with other background job libaries over the years, most notably:
Thank you for driving the software ecosystem forward.
FAQs
Unknown package
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Company News
Socket has acquired Secure Annex to expand extension security across browsers, IDEs, and AI tools.

Research
/Security News
Socket is tracking cloned Open VSX extensions tied to GlassWorm, with several updated from benign-looking sleepers into malware delivery vehicles.

Product
Reachability analysis for PHP is now available in experimental, helping teams identify which vulnerabilities are actually exploitable.