Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

github.com/riverqueue/river

Package Overview
Dependencies
Alerts
File Explorer
Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

github.com/riverqueue/river

  • v0.14.2
  • Source
  • Go
  • Socket score

Version published
Created
Source

River Build Status Go Reference

River is a robust high-performance job processing system for Go and Postgres.

See homepage, docs, and godoc, as well as the River UI.

Being built for Postgres, River encourages the use of the same database for application data and job queue. By enqueueing jobs transactionally along with other database changes, whole classes of distributed systems problems are avoided. Jobs are guaranteed to be enqueued if their transaction commits, are removed if their transaction rolls back, and aren't visible for work until commit. See transactional enqueueing for more background on this philosophy.

Job args and workers

Jobs are defined in struct pairs, with an implementation of JobArgs and one of Worker.

Job args contain json annotations and define how jobs are serialized to and from the database, along with a "kind", a stable string that uniquely identifies the job.

type SortArgs struct {
    // Strings is a slice of strings to sort.
    Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

Workers expose a Work function that dictates how jobs run.

type SortWorker struct {
    // An embedded WorkerDefaults sets up default methods to fulfill the rest of
    // the Worker interface:
    river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    sort.Strings(job.Args.Strings)
    fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
    return nil
}

Registering workers

Jobs are uniquely identified by their "kind" string. Workers are registered on start up so that River knows how to assign jobs to workers:

workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &SortWorker{})

Starting a client

A River Client provides an interface for job insertion and manages job processing and maintenance services. A client's created with a database pool, driver, and config struct containing a Workers bundle and other settings. Here's a client Client working one queue ("default") with up to 100 worker goroutines at a time:

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    Queues: map[string]river.QueueConfig{
        river.QueueDefault: {MaxWorkers: 100},
    },
    Workers: workers,
})
if err != nil {
    panic(err)
}

// Run the client inline. All executed jobs will inherit from ctx:
if err := riverClient.Start(ctx); err != nil {
    panic(err)
}

Insert-only clients

It's often desirable to have a client that'll be used for inserting jobs, but not working them. This is possible by omitting the Queues configuration, and skipping the call to Start:

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    Workers: workers,
})
if err != nil {
    panic(err)
}

Workers can also be omitted, but it's better to include it so River can check that inserted job kinds have a worker that can run them.

Stopping

The client should also be stopped on program shutdown:

// Stop fetching new work and wait for active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
    panic(err)
}

There are some complexities around ensuring clients stop cleanly, but also in a timely manner. See graceful shutdown for more details on River's stop modes.

Inserting jobs

Client.InsertTx is used in conjunction with an instance of job args to insert a job to work on a transaction:

_, err = riverClient.InsertTx(ctx, tx, SortArgs{
    Strings: []string{
        "whale", "tiger", "bear",
    },
}, nil)

if err != nil {
    panic(err)
}

See the InsertAndWork example for complete code.

Other features

Cross language enqueueing

River supports inserting jobs in some non-Go languages which are then worked by Go implementations. This may be desirable in performance sensitive cases so that jobs can take advantage of Go's fast runtime.

Development

See developing River.

Thank you

River was in large part inspired by our experiences with other background job libraries over the years, most notably:

Thank you for driving the software ecosystem forward.

FAQs

Package last updated on 16 Nov 2024

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc