Package gorqs (short for `Go Runnable Queue Service`) provides routines to queue and execute runnable jobs and caches their execution result (error) for later consultation. The Queue processor can run in synchronous or asynchronous mode. Adding a job to the Queue service is always a non-blocking operation and returns a unique job id on success.
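The behaviour described above (non-blocking add, unique job id, cached error for later lookup) can be sketched with the standard library alone. This is a minimal illustration, not the gorqs API: `Runner`, `RunnerFunc`, `Queue`, `Add`, `Wait`, and `Result` are hypothetical names chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Runner is a hypothetical job interface; it is not taken from gorqs.
type Runner interface {
	Run() error
}

// RunnerFunc adapts a plain function to the Runner interface.
type RunnerFunc func() error

func (f RunnerFunc) Run() error { return f() }

// Queue runs each job in its own goroutine and caches the job's error by id.
type Queue struct {
	mu      sync.Mutex
	nextID  int
	results map[int]error
	wg      sync.WaitGroup
}

func NewQueue() *Queue { return &Queue{results: make(map[int]error)} }

// Add schedules r without blocking the caller and returns a unique job id.
func (q *Queue) Add(r Runner) int {
	q.mu.Lock()
	q.nextID++
	id := q.nextID
	q.mu.Unlock()

	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		err := r.Run()
		q.mu.Lock()
		q.results[id] = err
		q.mu.Unlock()
	}()
	return id
}

// Wait blocks until every job added so far has finished.
func (q *Queue) Wait() { q.wg.Wait() }

// Result returns the cached error of a job; done is false if the id is
// unknown or the job has not finished yet.
func (q *Queue) Result(id int) (done bool, err error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	err, done = q.results[id]
	return done, err
}

var errBoom = errors.New("boom")

func main() {
	q := NewQueue()
	okID := q.Add(RunnerFunc(func() error { return nil }))
	badID := q.Add(RunnerFunc(func() error { return errBoom }))
	q.Wait()
	for _, id := range []int{okID, badID} {
		_, err := q.Result(id)
		fmt.Printf("job %d: err=%v\n", id, err)
	}
}
```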
Package river is a robust high-performance job processing system for Go and Postgres. See homepage, docs, and godoc. Being built for Postgres, River encourages the use of the same database for application data and job queue. By enqueueing jobs transactionally along with other database changes, whole classes of distributed systems problems are avoided. Jobs are guaranteed to be enqueued if their transaction commits, are removed if their transaction rolls back, and aren't visible for work _until_ commit. See transactional enqueueing for more background on this philosophy. Jobs are defined in struct pairs, with an implementation of `JobArgs` and one of `Worker`. Job args contain `json` annotations and define how jobs are serialized to and from the database, along with a "kind", a stable string that uniquely identifies the job. Workers expose a `Work` function that dictates how jobs run. Jobs are uniquely identified by their "kind" string. Workers are registered on start up so that River knows how to assign jobs to workers: A River `Client` provides an interface for job insertion and manages job processing and maintenance services. A client is created with a database pool, driver, and config struct containing a `Workers` bundle and other settings. Here's a `Client` working one queue (`"default"`) with up to 100 worker goroutines at a time: The client should also be stopped on program shutdown: There are some complexities around ensuring clients stop cleanly, but also in a timely manner. See graceful shutdown for more details on River's stop modes. `Client.InsertTx` is used in conjunction with an instance of job args to insert a job to work on a transaction: See the `InsertAndWork` example for complete code. Batch job insertion for efficiently inserting many jobs at once using Postgres `COPY FROM`. Cancelling jobs from inside a work function. Error and panic handling. Periodic and cron jobs. 
Scheduled jobs that run automatically at their scheduled time in the future. Snoozing jobs from inside a work function. Subscriptions to queue activity and statistics, providing easy hooks for telemetry like logging and metrics. Transactional job completion to guarantee job completion commits with other changes in a transaction. Unique jobs by args, period, queue, and state. Work functions for simplified worker implementation. See developing River. Example_batchInsert demonstrates how many jobs can be inserted for work as part of a single operation. Example_completeJobWithinTx demonstrates how to transactionally complete a job alongside other database changes being made. Example_cronJob demonstrates how to create a cron job with a more complex schedule using a third party cron package to parse more elaborate crontab syntax. Example_customInsertOpts demonstrates the use of a job with custom job-specific insertion options. Example_errorHandler demonstrates how to use the ErrorHandler interface for custom application telemetry. Example_gracefulShutdown demonstrates a realistic-looking stop loop for River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C locally or on a platform like Heroku to stop a process) and when received, tries a soft stop that waits for work to finish. If it doesn't finish in time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs using context cancellation. A third will give up on the stop procedure and exit uncleanly. Example_insertAndWork demonstrates how to register job workers, start a client, and insert a job on it to be worked. Example_jobCancel demonstrates how to permanently cancel a job from within Work using JobCancel. Example_jobCancelFromClient demonstrates how to permanently cancel a job from any Client using JobCancel. Example_jobSnooze demonstrates how to snooze a job from within Work using JobSnooze. 
The job will be run again after 5 minutes and the snooze attempt will increment the job's max attempts, ensuring that one can snooze as many times as desired. Example_periodicJob demonstrates the use of a periodic job. Example_scheduledJob demonstrates how to schedule a job to be worked in the future. Example_subscription demonstrates the use of client subscriptions to receive events containing information about worked jobs. Example_uniqueJob demonstrates the use of a job with custom job-specific insertion options. Example_workFunc demonstrates the use of river.WorkFunc, which can be used to easily add a worker with only a function instead of having to implement a full worker struct.
Package amboy provides basic infrastructure for running and describing tasks and task workflows with, potentially, minimal overhead and additional complexity. Amboy works with 4 basic logical objects: jobs, or descriptions of tasks; runners, which are responsible for executing tasks; queues, that represent pipelines and offline workflows of tasks (e.g. not real time, processes that run outside of the primary execution path of a program); and dependencies that represent relationships between jobs. The inspiration for amboy was to be able to provide a unified way to define and run jobs, that would feel equally "native" for distributed applications and distributed web applications, and move easily between different architectures. While amboy users will generally implement their own Job and dependency implementations, Amboy itself provides several example Queue implementations, as well as several generic examples and prototypes of Job and dependency.Manager objects. Generally speaking you should be able to use included amboy components to provide the queue and runner components, in conjunction with custom and generic job and dependency variations. Consider the following example: The amboy package provides a number of generic methods that, using the Queue.Stats() method, block until all jobs are complete. They provide different semantics, which may be useful in different circumstances. All of these functions wait until the total number of jobs submitted to the queue is equal to the number of completed jobs, and as a result these methods don't prevent other threads from adding jobs to the queue after beginning to wait. Additionally, there is a set of methods that allow callers to wait for a specific job to complete.
Package que is a fully interoperable Golang port of Chris Hanks' Ruby Que queueing library for PostgreSQL. Que uses PostgreSQL's advisory locks for speed and reliability. See the original Que documentation for more details: https://github.com/chanks/que Because que is an interoperable port of Que, you can enqueue jobs in Ruby (e.g. from a Rails app) and write your workers in Go. Or if you have a limited set of jobs that you want to write in Go, you can leave most of your workers in Ruby and just add a few Go workers on a different queue name. Instead of using database/sql and the more popular pq PostgreSQL driver, this package uses the pgx driver: https://github.com/jackc/pgx Because Que uses session-level advisory locks, we have to hold the same connection throughout the process of getting a job, working it, deleting it, and removing the lock. pq and the built-in database/sql interfaces do not offer this functionality, so we'd have to implement our own connection pool. Fortunately, pgx already has a perfectly usable one built for us. Even better, it offers better performance than pq due largely to its use of binary encoding. que relies on prepared statements for performance. As of now these have to be initialized manually on your connection pool like so: If you have suggestions on how to cleanly do this automatically, please open an issue! Here is a complete example showing worker setup and two jobs enqueued, one with a delay:
Persistent queue library designed for a single-process, thread-safe job queue where data loss is not an option, at the cost of speed.
Package ticket is designed to provide a ticketed job submission system, with notification for callers. Users of ticket need only supply a handler that can handle multiple Jobs, and use the supplied BatchServer to handle queueing and submission.
Package que-go is a fully interoperable Golang port of Chris Hanks' Ruby Que queueing library for PostgreSQL. Que uses PostgreSQL's advisory locks for speed and reliability. See the original Que documentation for more details: https://github.com/chanks/que Because que-go is an interoperable port of Que, you can enqueue jobs in Ruby (e.g. from a Rails app) and write your workers in Go. Or if you have a limited set of jobs that you want to write in Go, you can leave most of your workers in Ruby and just add a few Go workers on a different queue name. Instead of using database/sql and the more popular pq PostgreSQL driver, this package uses the pgx driver: https://github.com/jackc/pgx Because Que uses session-level advisory locks, we have to hold the same connection throughout the process of getting a job, working it, deleting it, and removing the lock. pq and the built-in database/sql interfaces do not offer this functionality, so we'd have to implement our own connection pool. Fortunately, pgx already has a perfectly usable one built for us. Even better, it offers better performance than pq due largely to its use of binary encoding. que-go relies on prepared statements for performance. As of now these have to be initialized manually on your connection pool like so: If you have suggestions on how to cleanly do this automatically, please open an issue! Here is a complete example showing worker setup and two jobs enqueued, one with a delay:
goworker is a Resque-compatible, Go-based background worker. It allows you to push jobs into a queue using an expressive language like Ruby while harnessing the efficiency and concurrency of Go to minimize job latency and cost. goworker workers can run alongside Ruby Resque clients so that you can keep all but your most resource-intensive jobs in Ruby. To create a worker, write a function matching the signature and register it using Here is a simple worker that prints its arguments: To create workers that share a database pool or other resources, use a closure to share variables. goworker worker functions receive the queue they are serving and a slice of interfaces. To use them as parameters to other functions, use Go type assertions to convert them into usable types. For testing, it is helpful to use the redis-cli program to insert jobs onto the Redis queue: will insert 100 jobs for the MyClass worker onto the myqueue queue. It is equivalent to: After building your workers, you will have an executable that you can run which will automatically poll a Redis server and call your workers as jobs arrive. There are several flags which control the operation of the goworker client. -queues="comma,delimited,queues" — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's * queue). Queues are processed in the order they are specified. If you have multiple queues you can assign them weights. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1: -queues='high=2,low=1'. -interval=5.0 — Specifies the wait period between polling if no job was in the queue the last time one was requested. -concurrency=25 — Specifies the number of concurrently executing workers. 
This number can be as low as 1 or rather comfortably as high as 100,000, and should be tuned to your workflow and the availability of outside resources. -connections=2 — Specifies the maximum number of Redis connections that goworker will consume between the poller and all workers. There is not much performance gain over two and a slight penalty when using only one. This is configurable in case you need to keep connection counts low for cloud Redis providers who limit plans on maxclients. -uri=redis://localhost:6379/ — Specifies the URI of the Redis database from which goworker polls for jobs. Accepts URIs of the format redis://user:pass@host:port/db or unix:///path/to/redis.sock. The flag may also be set by the environment variable $($REDIS_PROVIDER) or $REDIS_URL. E.g. set $REDIS_PROVIDER to REDISTOGO_URL on Heroku to let the Redis To Go add-on configure the Redis database. -namespace=resque: — Specifies the namespace from which goworker retrieves jobs and stores stats on workers. -exit-on-complete=false — Exits goworker when there are no jobs left in the queue. This is helpful in conjunction with the time command to benchmark different configurations. You can also configure your own flags for use within your workers. Be sure to set them before calling goworker.Main(). It is okay to call flag.Parse() before calling goworker.Main() if you need to do additional processing on your flags. To stop goworker, send a QUIT, TERM, or INT signal to the process. This will immediately stop job polling. There can be up to $CONCURRENCY jobs currently running, which will continue to run until they are finished. Like Resque, goworker makes no guarantees about the safety of jobs in the event of process shutdown. Workers must be both idempotent and tolerant to loss of the job in the event of failure. 
If the process is killed with a KILL or by a system failure, there may be one job that is currently in the poller's buffer that will be lost without any representation in either the queue or the worker variable. If you are running goworker on a system like Heroku, which sends a TERM to signal a process that it needs to stop and, ten seconds later, sends a KILL to force the process to stop, your jobs must finish within 10 seconds or they may be lost. Jobs will be recoverable from the Redis database under the worker key as a JSON object with keys queue, run_at, and payload, but the process is manual. Additionally, there is no guarantee that the job in Redis under the worker key has not finished, if the process is killed before goworker can flush the update to Redis.
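The advice about converting untyped worker arguments with type assertions can be sketched as follows. The worker shape `func(queue string, args ...interface{}) error` is an assumption made for this example, as are the names `parseArgs` and `myWorker`; only the standard library is used, and the arguments are decoded from JSON the way a queue payload typically would be.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// parseArgs converts untyped job arguments into usable Go types,
// returning an error instead of panicking on a mismatch.
func parseArgs(args []interface{}) (string, int, error) {
	if len(args) != 2 {
		return "", 0, errors.New("expected exactly 2 arguments")
	}
	name, ok := args[0].(string)
	if !ok {
		return "", 0, fmt.Errorf("argument 0: want string, got %T", args[0])
	}
	// encoding/json decodes JSON numbers into float64 by default.
	n, ok := args[1].(float64)
	if !ok {
		return "", 0, fmt.Errorf("argument 1: want number, got %T", args[1])
	}
	return name, int(n), nil
}

// myWorker has the assumed worker shape: the queue it serves plus a slice
// of interface values.
func myWorker(queue string, args ...interface{}) error {
	name, count, err := parseArgs(args)
	if err != nil {
		return err
	}
	fmt.Printf("queue=%s name=%s count=%d\n", queue, name, count)
	return nil
}

func main() {
	var args []interface{}
	if err := json.Unmarshal([]byte(`["hi", 3]`), &args); err != nil {
		panic(err)
	}
	if err := myWorker("myqueue", args...); err != nil {
		fmt.Println("job failed:", err)
	}
}
```

Using the two-value form of the assertion (`v, ok := x.(T)`) lets a malformed job fail with an error rather than crash the worker process.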
Package anser provides a document transformation and processing tool to support data migrations. The anser.Application is the primary interface in which migrations are defined and executed. Applications are constructed with a list of MigrationGenerators and relevant operations. Then the Setup method configures the application with an anser.Environment, which sets up and collects dependency information. Finally, the Run method executes the migrations in two phases: first by generating migration jobs, and then by running all migration jobs. The ordering of migrations is derived from the dependency information between generators and the jobs that they generate. When possible, jobs are executed in parallel, but the execution of migration operations is a property of the queue object configured in the anser.Environment. The anser package provides a custom amboy/dependency.Manager object, which allows migrations to express dependencies on other migrations. The State() method ensures that all migration IDs specified as edges are satisfied before reporting as "ready" for work. Anser provides the Environment interface, with a global instance accessible via the exported GetEnvironment() function, to provide access to runtime configuration state: database connections, amboy.Queue objects, and registries for task implementations. The Environment is an interface: you can build a mock, or use one provided for testing purposes by anser (coming soon). Generators create migration operations and are the first step in an anser Migration. They are supersets of amboy.Job interfaces. The current limitation is that the generated jobs must be stored within the implementation of the generator job, which means they must either all fit in memory *or* be serializable independently (e.g. fit in the 16 MB document limit if using a MongoDB-backed queue).
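The rule that a migration reports "ready" only once every migration ID listed as an edge is satisfied can be sketched in a few lines. This is a simplified illustration, not the anser or amboy dependency.Manager interface: `manager`, `State`, and the set of completed IDs are hypothetical stand-ins.

```go
package main

import "fmt"

// State describes whether a migration may run yet.
type State string

const (
	Blocked State = "blocked"
	Ready   State = "ready"
)

// manager is a hypothetical dependency manager: edges holds the IDs of the
// migrations that must finish before this one may run.
type manager struct {
	edges []string
}

// State reports Ready only when every edge appears in the completed set.
func (m manager) State(completed map[string]bool) State {
	for _, id := range m.edges {
		if !completed[id] {
			return Blocked
		}
	}
	return Ready
}

func main() {
	m := manager{edges: []string{"add-index", "rename-field"}}
	completed := map[string]bool{"add-index": true}
	fmt.Println(m.State(completed)) // one edge is still outstanding

	completed["rename-field"] = true
	fmt.Println(m.State(completed)) // all edges are satisfied
}
```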
Package goworker is a Resque-compatible, Go-based background worker. It allows you to push jobs into a queue using an expressive language like Ruby while harnessing the efficiency and concurrency of Go to minimize job latency and cost. goworker workers can run alongside Ruby Resque clients so that you can keep all but your most resource-intensive jobs in Ruby. To create a worker, write a function matching the signature and register it using Here is a simple worker that prints its arguments: To create workers that share a database pool or other resources, use a closure to share variables. goworker worker functions receive the queue they are serving and a slice of interfaces. To use them as parameters to other functions, use Go type assertions to convert them into usable types. For testing, it is helpful to use the redis-cli program to insert jobs onto the Redis queue: will insert 100 jobs for the MyClass worker onto the myqueue queue. It is equivalent to: After building your workers, you will have an executable that you can run which will automatically poll a Redis server and call your workers as jobs arrive. There are several flags which control the operation of the goworker client. -queues="comma,delimited,queues" — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's * queue). Queues are processed in the order they are specified. If you have multiple queues you can assign them weights. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1: -queues='high=2,low=1'. -interval=5.0 — Specifies the wait period between polling if no job was in the queue the last time one was requested. 
-concurrency=25 — Specifies the number of concurrently executing workers. This number can be as low as 1 or rather comfortably as high as 100,000, and should be tuned to your workflow and the availability of outside resources. -connections=2 — Specifies the maximum number of Redis connections that goworker will consume between the poller and all workers. There is not much performance gain over two and a slight penalty when using only one. This is configurable in case you need to keep connection counts low for cloud Redis providers who limit plans on maxclients. -uri=redis://localhost:6379/ — Specifies the URI of the Redis database from which goworker polls for jobs. Accepts URIs of the format redis://user:pass@host:port/db or unix:///path/to/redis.sock. The flag may also be set by the environment variable $($REDIS_PROVIDER) or $REDIS_URL. E.g. set $REDIS_PROVIDER to REDISTOGO_URL on Heroku to let the Redis To Go add-on configure the Redis database. -namespace=resque: — Specifies the namespace from which goworker retrieves jobs and stores stats on workers. -exit-on-complete=false — Exits goworker when there are no jobs left in the queue. This is helpful in conjunction with the time command to benchmark different configurations. -use-number=false — Uses json.Number when decoding numbers in the job payloads. This will avoid issues that occur when goworker and the json package decode large numbers as floats, which then get encoded in scientific notation, losing precision. This will default to true soon. You can also configure your own flags for use within your workers. Be sure to set them before calling goworker.Main(). It is okay to call flag.Parse() before calling goworker.Main() if you need to do additional processing on your flags. To stop goworker, send a QUIT, TERM, or INT signal to the process. This will immediately stop job polling. There can be up to $CONCURRENCY jobs currently running, which will continue to run until they are finished. 
Like Resque, goworker makes no guarantees about the safety of jobs in the event of process shutdown. Workers must be both idempotent and tolerant to loss of the job in the event of failure. If the process is killed with a KILL or by a system failure, there may be one job that is currently in the poller's buffer that will be lost without any representation in either the queue or the worker variable. If you are running goworker on a system like Heroku, which sends a TERM to signal a process that it needs to stop and, ten seconds later, sends a KILL to force the process to stop, your jobs must finish within 10 seconds or they may be lost. Jobs will be recoverable from the Redis database under the worker key as a JSON object with keys queue, run_at, and payload, but the process is manual. Additionally, there is no guarantee that the job in Redis under the worker key has not finished, if the process is killed before goworker can flush the update to Redis.
Package gue implements a Golang queue on top of PostgreSQL. It uses transaction-level locks for concurrent work. The package supports several PostgreSQL drivers through an internal adapter interface. Currently, adapters for the following drivers have been implemented: Here is a complete example showing worker setup for pgx/v4 and two jobs enqueued, one with a delay:
Package pgq provides an implementation of a Postgres-backed job queue. Safe concurrency is built on top of the SKIP LOCKED functionality introduced in Postgres 9.5. Retries and exponential backoff are supported.
Package gue implements Golang queues on top of PostgreSQL. It uses transaction-level locks for concurrent work. The package supports several PostgreSQL drivers through an internal adapter interface. Currently, adapters for the following drivers have been implemented: Here is a complete example showing worker setup for pgx/v4 and two jobs enqueued, one with a delay:
Package parallel coordinates and serialises output written to stdout and stderr by concurrent goroutines. The goal is to make it easy for go command-line tools to process all arguments in parallel, thus reducing latency, while maintaining the illusion that each argument is processed serially. This package is designed for commands which process multiple arguments similar to: Normally such commands are constrained from running a goroutine per argument because their output is randomly intermingled and thus rendered unintelligible. This is unfortunate as go commands are well suited to this style of implementation. The parallel package removes this constraint and enables a goroutine per argument by ensuring output is not intermingled and that all output appears in serial argument order. For those familiar with GNU parallel, this package achieves similar functionality within commands written in go. Superficially “parallel” appears similar to golang.org/x/sync/errgroup; however, they perform quite different functions in that “errgroup” is designed to manage goroutines working to achieve a common goal where a single failure causes a collective failure. In contrast, “parallel” is designed to manage independent goroutines contained in a command-line program. Most importantly, “parallel” is largely about coordinating output to stdout and stderr whereas “errgroup” plays no part in that. When adapting existing commands to use “parallel”, programmers need to be aware of newly created concurrent interactions between goroutines which may not have existed with the original implementation. In this situation it is suggested that such commands initially be built and tested with the “-race” option. Idiomatic use is to populate a Group with a RunFunc for each command-line argument. Once populated, Group.Run starts a goroutine for each RunFunc in the Group. Following that, a call to Group.Wait is made to wait for completion of all RunFuncs. 
If your current code serially processes command-line arguments something like this: then to process all arguments in parallel while still generating serially identical output, your replacement code will look something like this: which in this case uses a closure to satisfy the RunFunc signature. An alternative is to use a struct function to satisfy the signature, as described in RunFunc. The main change you have to make is to ensure that your RunFunc *always* uses the io.Writers designated as stdout and stderr as *all* output must be written to these io.Writers, never to os.Stdout or os.Stderr directly. Further examples of how to use “parallel” can be found in the _examples sub-directory. If your code-base is large or complicated it may not be easy to find every relevant reference to os.Stdout and os.Stderr, however since these variables can be modified it's relatively easy to at least identify which output is still being written directly. One way to do this is to replace os.Stdout and os.Stderr with your own os.File *after* calling NewGroup. E.g.: then examine "out.missed" after running your test suite. (More sophisticated blocking and capturing at the time of occurrence is possible with os.NewFile and Unix named pipes.) Unlike GNU parallel this package does not support detecting RunFunc timeouts or errors, nor does it offer retry attempts or job resumption. Firstly because this adds a lot of opinionated complexity to the API and secondly because such features designed to best suit individual applications can be readily added via a closure or a struct function. As one example, if an application wants their RunFunc to stop the whole Group on error somewhat like x/sync/errgroup, one approach is to create a “terminate” channel which is populated on error. Each RunFunc monitors this channel and terminates immediately if it is written to. 
Serial processing command-line programs typically do not have to worry about concurrency controls, but when adopting this package, they will now have to do so. Such programs should be particularly aware of modifying shared data such as global counters, progress meters and similar. All such modifications need to be concurrency protected. Naturally access to read-only data structures such as option settings do not require any protection. The parallel package achieves most of its functionality via a “Pipeline” assigned to each RunFunc. Output is steered thru writers in the pipeline based on the Group config options. Specific features are handled by different writers such as “head” and “tagger”. The theory being that new writers which implement future functionality can easily slot into the pipeline. There are currently two types of Pipelines: Queue and Passthru. The initial pipeline for each RunFunc is normally a Queue Pipeline which starts in “background” mode because, much like a background command in a shell, it continues to run, except that all output is buffered until the pipeline is switched to “foreground” mode. This diagram illustrates the “writers” in a Queue Pipeline. The “queue” writer buffers and tracks the arrival order of stdout and stderr outputs. Arrival order has to be maintained to ensure correct transfer to the Group io.Writers. This is necessary because the Group io.Writers may well be one and the same, e.g. if a command is invoked with stderr re-directed to stdout or if both stdout and stderr are a terminal. When a RunFunc becomes a candidate for foreground output (because it has percolated to the front of the queue with OrderRunners(true)), the “queue” buffered output is written to the Group io.Writers and the Queue Pipeline is switched to "foreground" mode. Passthru is a skeletal pipeline intended as a diagnostic tool which bypasses most of the “parallel” functionality. It is created when the Group is constructed with Passthru(true). 
The main reason for using Passthru is for situations where you have suspicions about the parallel package and want a relatively unfiltered view of what your RunFuncs are generating.
Package goworker is a Resque-compatible, Go-based background worker. It allows you to push jobs into a queue using an expressive language like Ruby while harnessing the efficiency and concurrency of Go to minimize job latency and cost. goworker workers can run alongside Ruby Resque clients so that you can keep all but your most resource-intensive jobs in Ruby. To create a worker, write a function matching the signature and register it using Here is a simple worker that prints its arguments: To create workers that share a database pool or other resources, use a closure to share variables. goworker worker functions receive the queue they are serving and a slice of interfaces. To use them as parameters to other functions, use Go type assertions to convert them into usable types. For testing, it is helpful to use the redis-cli program to insert jobs onto the Redis queue: will insert 100 jobs for the MyClass worker onto the myqueue queue. It is equivalent to: After building your workers, you will have an executable that you can run which will automatically poll a Redis server and call your workers as jobs arrive. There are several flags which control the operation of the goworker client. -queues="comma,delimited,queues" — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's * queue). Queues are processed in the order they are specififed. If you have multiple queues you can assign them weights. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1: -queues='high=2,low=1'. -interval=5.0 — Specifies the wait period between polling if no job was in the queue the last time one was requested. 
-concurrency=25 — Specifies the number of concurrently executing workers. This number can be as low as 1 or rather comfortably as high as 100,000, and should be tuned to your workflow and the availability of outside resources. -connections=2 — Specifies the maximum number of Redis connections that goworker will consume between the poller and all workers. There is not much performance gain over two and a slight penalty when using only one. This is configurable in case you need to keep connection counts low for cloud Redis providers who limit plans on maxclients. -uri=redis://localhost:6379/ — Specifies the URI of the Redis database from which goworker polls for jobs. Accepts URIs of the format redis://user:pass@host:port/db or unix:///path/to/redis.sock. The flag may also be set by the environment variable $($REDIS_PROVIDER) or $REDIS_URL. E.g. set $REDIS_PROVIDER to REDISTOGO_URL on Heroku to let the Redis To Go add-on configure the Redis database. -namespace=resque: — Specifies the namespace from which goworker retrieves jobs and stores stats on workers. -exit-on-complete=false — Exits goworker when there are no jobs left in the queue. This is helpful in conjunction with the time command to benchmark different configurations. -use-number=false — Uses json.Number when decoding numbers in the job payloads. This will avoid issues that occur when goworker and the json package decode large numbers as floats, which then get encoded in scientific notation, losing pecision. This will default to true soon. You can also configure your own flags for use within your workers. Be sure to set them before calling goworker.Main(). It is okay to call flags.Parse() before calling goworker.Main() if you need to do additional processing on your flags. To stop goworker, send a QUIT, TERM, or INT signal to the process. This will immediately stop job polling. There can be up to $CONCURRENCY jobs currently running, which will continue to run until they are finished. 
Like Resque, goworker makes no guarantees about the safety of jobs in the event of process shutdown. Workers must be both idempotent and tolerant to loss of the job in the event of failure. If the process is killed with a KILL or by a system failure, there may be one job that is currently in the poller's buffer that will be lost without any representation in either the queue or the worker variable. If you are running goworker on a system like Heroku, which sends a TERM to signal a process that it needs to stop and, ten seconds later, sends a KILL to force the process to stop, your jobs must finish within 10 seconds or they may be lost. Jobs will be recoverable from the Redis database under the worker key as a JSON object with keys queue, run_at, and payload, but the process is manual. Additionally, there is no guarantee that the job in Redis under the worker key has not finished, if the process is killed before goworker can flush the update to Redis.
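Three points from the goworker notes above fit in one small example: workers can share state through a closure, arguments arrive as a slice of interfaces that need type assertions, and a worker should tolerate being handed the same job twice. The sketch below uses only the standard library. The `ledger` type, `newChargeWorker`, and the order/amount arguments are hypothetical, and the in-memory `seen` map stands in for whatever durable store a real deployment would use.

```go
package main

import (
	"fmt"
	"sync"
)

// ledger is hypothetical shared state captured by the worker closure.
type ledger struct {
	mu    sync.Mutex
	seen  map[string]bool // order IDs already processed
	total float64
}

// newChargeWorker returns a function with the shape goworker expects for
// workers: the queue name followed by the job arguments as interfaces.
func newChargeWorker(l *ledger) func(queue string, args ...interface{}) error {
	return func(queue string, args ...interface{}) error {
		if len(args) != 2 {
			return fmt.Errorf("queue %q: want 2 args, got %d", queue, len(args))
		}
		// Type assertions turn the untyped arguments into usable values.
		orderID, ok := args[0].(string)
		if !ok {
			return fmt.Errorf("arg 0: want string order ID, got %T", args[0])
		}
		// JSON numbers decode to float64 unless number preservation is on.
		amount, ok := args[1].(float64)
		if !ok {
			return fmt.Errorf("arg 1: want float64 amount, got %T", args[1])
		}

		l.mu.Lock()
		defer l.mu.Unlock()
		if l.seen[orderID] {
			return nil // duplicate delivery: the effect was already applied
		}
		l.seen[orderID] = true
		l.total += amount
		return nil
	}
}

func main() {
	l := &ledger{seen: map[string]bool{}}
	work := newChargeWorker(l)

	// Deliver the same job twice, as could happen after a crash and requeue.
	for i := 0; i < 2; i++ {
		if err := work("myqueue", "order-1", 12.5); err != nil {
			panic(err)
		}
	}
	fmt.Println(l.total) // the charge is applied once
}
```

Keying the duplicate check on a value carried in the job arguments is one common way to get idempotency; it only helps if the record of processed IDs survives a process restart.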
Package gue implements Golang queues on top of PostgreSQL. It uses transaction-level locks for concurrent work. The package supports several PostgreSQL drivers through an internal adapter interface. Currently, adapters for the following drivers have been implemented: Here is a complete example showing worker setup for pgx/v4 and two jobs enqueued, one with a delay:
Package que-go is a fully interoperable Golang port of Chris Hanks' Ruby Que queueing library for PostgreSQL. Que uses PostgreSQL's advisory locks for speed and reliability. See the original Que documentation for more details: https://github.com/chanks/que Because que-go is an interoperable port of Que, you can enqueue jobs in Ruby (i.e. from a Rails app) and write your workers in Go. Or if you have a limited set of jobs that you want to write in Go, you can leave most of your workers in Ruby and just add a few Go workers on a different queue name. Instead of using database/sql and the more popular pq PostgreSQL driver, this package uses the pgx driver: https://github.com/jackc/pgx Because Que uses session-level advisory locks, we have to hold the same connection throughout the process of getting a job, working it, deleting it, and removing the lock. Pq and the built-in database/sql interfaces do not offer this functionality, so we'd have to implement our own connection pool. Fortunately, pgx already has a perfectly usable one built for us. Even better, it offers better performance than pq due largely to its use of binary encoding. que-go relies on prepared statements for performance. As of now these have to be initialized manually on your connection pool like so: If you have suggestions on how to cleanly do this automatically, please open an issue! Here is a complete example showing worker setup and two jobs enqueued, one with a delay:
Package river is a robust high-performance job processing system for Go and Postgres. See homepage, docs, and godoc, as well as the River UI. Being built for Postgres, River encourages the use of the same database for application data and job queue. By enqueueing jobs transactionally along with other database changes, whole classes of distributed systems problems are avoided. Jobs are guaranteed to be enqueued if their transaction commits, are removed if their transaction rolls back, and aren't visible for work _until_ commit. See transactional enqueueing for more background on this philosophy. Jobs are defined in struct pairs, with an implementation of `JobArgs` and one of `Worker`. Job args contain `json` annotations and define how jobs are serialized to and from the database, along with a "kind", a stable string that uniquely identifies the job. Workers expose a `Work` function that dictates how jobs run. Jobs are uniquely identified by their "kind" string. Workers are registered on start up so that River knows how to assign jobs to workers: A River `Client` provides an interface for job insertion and manages job processing and maintenance services. A client's created with a database pool, driver, and config struct containing a `Workers` bundle and other settings. Here's a client `Client` working one queue (`"default"`) with up to 100 worker goroutines at a time: ## Insert-only clients It's often desirable to have a client that'll be used for inserting jobs, but not working them. This is possible by omitting the `Queues` configuration, and skipping the call to `Start`: `Workers` can also be omitted, but it's better to include it so River can check that inserted job kinds have a worker that can run them. ## Stopping The client should also be stopped on program shutdown: There are some complexities around ensuring clients stop cleanly, but also in a timely manner. See graceful shutdown for more details on River's stop modes. 
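The args/worker pairing and the role of the "kind" string described above can be illustrated without a database. This is a standard-library sketch of the dispatch idea only, not River's API: `jobArgs`, `registry`, `addWorker`, `insert`, `row`, and `sortWork` are all hypothetical names, and a real worker would not return a string result.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// jobArgs is a stand-in for the args half of the pair: it names its kind.
type jobArgs interface{ Kind() string }

// SortArgs carries json annotations that define how it is serialized.
type SortArgs struct {
	Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

// row is what a queue table might hold: the kind plus the encoded args.
type row struct {
	Kind        string
	EncodedArgs []byte
}

// registry maps each kind to a function that decodes and works a job.
type registry map[string]func(encoded []byte) (string, error)

// addWorker registers work under the kind reported by T.
func addWorker[T jobArgs](r registry, work func(T) (string, error)) {
	var zero T
	r[zero.Kind()] = func(encoded []byte) (string, error) {
		var args T
		if err := json.Unmarshal(encoded, &args); err != nil {
			return "", err
		}
		return work(args)
	}
}

// insert serializes args into a row tagged with their kind.
func insert(args jobArgs) (row, error) {
	b, err := json.Marshal(args)
	return row{Kind: args.Kind(), EncodedArgs: b}, err
}

// work looks up the worker for the row's kind and runs it.
func (r registry) work(j row) (string, error) {
	w, ok := r[j.Kind]
	if !ok {
		return "", fmt.Errorf("no worker registered for kind %q", j.Kind)
	}
	return w(j.EncodedArgs)
}

// sortWork is the worker half of the pair for SortArgs.
func sortWork(a SortArgs) (string, error) {
	sort.Strings(a.Strings)
	return strings.Join(a.Strings, ","), nil
}

func main() {
	workers := registry{}
	addWorker(workers, sortWork)

	job, err := insert(SortArgs{Strings: []string{"whale", "apple", "banana"}})
	if err != nil {
		panic(err)
	}
	out, err := workers.work(job)
	fmt.Println(out, err)
}
```

Because the kind is stored with the row, it has to stay stable across deploys: renaming it would leave already-enqueued jobs without a matching worker, which is the failure the lookup in `work` reports.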
`Client.InsertTx` is used in conjunction with an instance of job args to insert a job to work on a transaction: See the `InsertAndWork` example for complete code. Batch job insertion for efficiently inserting many jobs at once using Postgres `COPY FROM`. Cancelling jobs from inside a work function. Error and panic handling. Multiple queues to better guarantee job throughput, worker availability, and isolation between components. Periodic and cron jobs. Scheduled jobs that run automatically at their scheduled time in the future. Snoozing jobs from inside a work function. Subscriptions to queue activity and statistics, providing easy hooks for telemetry like logging and metrics. Test helpers to verify that jobs are inserted as expected. Transactional job completion to guarantee job completion commits with other changes in a transaction. Unique jobs by args, period, queue, and state. Web UI for inspecting and interacting with jobs and queues. Work functions for simplified worker implementation. ## Cross language enqueueing River supports inserting jobs in some non-Go languages which are then worked by Go implementations. This may be desirable in performance sensitive cases so that jobs can take advantage of Go's fast runtime. See developing River. Example_batchInsert demonstrates how many jobs can be inserted for work as part of a single operation. Example_completeJobWithinTx demonstrates how to transactionally complete a job alongside other database changes being made. Example_cronJob demonstrates how to create a cron job with a more complex schedule using a third party cron package to parse more elaborate crontab syntax. Example_customInsertOpts demonstrates the use of a job with custom job-specific insertion options. Example_errorHandler demonstrates how to use the ErrorHandler interface for custom application telemetry. Example_gracefulShutdown demonstrates a realistic-looking stop loop for River. 
It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C locally or on a platform like Heroku to stop a process) and when received, tries a soft stop that waits for work to finish. If it doesn't finish in time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs using context cancellation. A third will give up on the stop procedure and exit uncleanly. Example_insertAndWork demonstrates how to register job workers, start a client, and insert a job on it to be worked. Example_jobCancel demonstrates how to permanently cancel a job from within Work using JobCancel. Example_jobCancelFromClient demonstrates how to permanently cancel a job from any Client using JobCancel. Example_jobSnooze demonstrates how to snooze a job from within Work using JobSnooze. The job will be run again after 5 minutes and the snooze attempt will increment the job's max attempts, ensuring that one can snooze as many times as desired. Example_periodicJob demonstrates the use of a periodic job. Example_queuePause demonstrates how to pause queues to prevent them from working new jobs, and later resume them. Example_scheduledJob demonstrates how to schedule a job to be worked in the future. Example_subscription demonstrates the use of client subscriptions to receive events containing information about worked jobs. Example_uniqueJob demonstrates the use of a job with custom job-specific insertion options. Example_workFunc demonstrates the use of river.WorkFunc, which can be used to easily add a worker with only a function instead of having to implement a full worker struct.