Rickover
This holds the code for a scheduler and a job queue written in Go and
backed by Postgres.
The goals/features of this project are:
- Visibility into the system using a tool our team is familiar with (Postgres)
- Correctness - jobs shouldn't get stuck, or dequeued twice, unless that's
desirable
- Good memory performance - with 300 dequeuers, the server and worker take
about 30MB in total.
- No long-running database queries, or open transactions
- All queue actions have an HTTP API.
- All enqueue/dequeue actions are idempotent - it's OK if any part of the
system gets restarted
It might not be the most performant, but it should be easy to use and deploy!
Server Endpoints
The only supported content type for uploads and responses is JSON.
Create a job type
To start enqueueing and dequeueing work, you need to create a job type. Define
a job type with a name, a delivery strategy (idempotent == "at_least_once"
,
not idempotent == "at_most_once"
), and a concurrency - the maximum number
of jobs that can be in flight at once. If the job is idempotent, you can add
"attempts" - the number of times to try to send the job to the downstream
server before giving up.
POST /v1/jobs
{
"id": "invoice-shipments",
"delivery_strategy": "at_least_once",
"attempts": 3,
"concurrency": 5
}
This returns a models.Job on success.
Creating a new job will also send a signal to the dequeuer to ask it to restart
itself (so it can create workers to process jobs using the new job type). If you
do not want to enable this behavior, set DisableMetaShutdown: true
in the
Config
for the dequeuer or for the server.
Enqueue a new job
Once you have a job type, you can enqueue new jobs. Note the client is
responsible for generating a UUID.
PUT /v1/jobs/invoice-shipments/job_282227eb-3c76-4ef7-af7e-25dff933077f
{
"data": {
"shipmentId": "shp_123",
}
"id": "job_282227eb-3c76-4ef7-af7e-25dff933077f",
"run_after": "2016-01-11T18:26:26.000Z",
"expires_at": "2016-01-11T20:26:26.000Z"
}
This inserts a record into the queued_jobs
table and returns a
models.QueuedJob. The client can and should retry in the event of
failure.
You can put any valid JSON in the data
field; we'll send this to the
downstream worker.
There are two special fields - run_after
indicates the earliest possible
time this job can run (or null
to indicate it can run now), and expires_at
indicates the latest possible time this job can run. If a job is dequeued after
the expires_at
date, we don't send it to the downstream worker, and insert it
immediately into the archived_jobs
table with status expired
.
Record a job's success or failure
Once the downstream worker has completed work, record the status of the job by
making a POST request to the same URI.
POST /v1/jobs/invoice-shipments/job_123 HTTP/1.1
{
"status": "succeeded"
"attempt": 3,
}
Note you must include the attempt number in your callback; we use this
for idempotency, and to avoid stale writes. Valid values for status
are
"succeeded" or "failed". If a failed job is retryable, we'll insert the job
back into the queued_jobs
table with a run_after
date set a small amount
of time in the future. If a failed job is retryable but should not be retried,
include "retryable": false
in the body of the POST request, which will
immediately archive the job.
Replay a job
This is handy if the initial job failed, the downstream server had an outage,
or you want to re-run something on an adhoc basis.
POST /v1/jobs/invoice-shipments/job_123/replay HTTP/1.1
Will create a new UUID and enqueue the job to be run immediately. If you
attempt to replay an expired job, the new job will be immediately archived with
a status of "expired".
Get information about a job
GET /v1/jobs/invoice-shipments/job_123 HTTP/1.1
This looks in the queued_jobs table first, then the archived_jobs table, and
returns whatever it finds. Note the fields in these tables don't match up 100%.
List completed jobs
GET /v1/archived-jobs HTTP/1.1
Returns a list of archived jobs. Pass name=x
to filter by jobs named x
. Pass
limit=x
to limit results.
Server Authentication
By default, the server uses an in-memory secret for authentication. Call
server.AddUser
to add an authenticated user and password for the
DefaultServer.
You can use your own authentication scheme with any code that satisifies the
server.Authorizer interface:
type Authorizer interface {
Authorize(user string, token string) error
}
Then, get a http.Handler with your authorizer by calling
import "github.com/kevinburke/rickover/server"
handler := server.Get(authorizer)
http.ListenAndServe(":9090", handler)
Processing jobs
When you get a job from the database, you can do whatever you want with it -
your dequeuer just needs to satisfy the Worker interface.
type Worker interface {
DoWork(*models.QueuedJob) error
}
A default Worker is provided as services.JobProcessor,
which makes an API request to a downstream service. The default client is
downstream.Client. You'll need to set the URL
and password for the downstream service:
import "github.com/kevinburke/rickover/dequeuer"
import "github.com/kevinburke/rickover/services"
func main() {
password := "hymanrickover"
jp := services.NewJobProcessor("http://downstream-service.example.com", password)
pools, err := dequeuer.CreatePools(jp)
fmt.Println(err)
}
The downstream.Client will make a POST request to
/v1/jobs/:job-name/:job-id
:
POST /v1/jobs/invoice-shipment/job_123 HTTP/1.1
Host: downstream.shyp.com
Content-Type: application/json
Accept: application/json
{
"data": {
"shipmentId": "shp_123"
},
"id": "job_123",
"attempts": 3
}
Callbacks
All actions in the system are designed to be short-lived. When the downstream
server has finished processing the job, it should make a callback to the
Rickover server, reporting on the status of the job, with status
set to
succeeded
or failed
.
POST /v1/jobs/invoice-shipments/job_123 HTTP/1.1
Host: rickover.shyp.com
Content-Type: application/json
{
"status": "succeeded"
"attempt": 3,
}
If this request times out or errors, you can try it again; the attempt
number
is used to avoid making a stale update.
You can also report status of a job by calling
services.HandleStatusCallback directly, with success or
failure.
Failure Handling
If the downstream worker never hits the callback, the JobProcessor will time
out after 5 minutes and mark the job as failed.
If the dequeuer gets killed while waiting for a response, we'll time out the
job after 7 minutes, and mark it as failed. (This means the maximum allowable
time for a job is 7 minutes.)
Dashboard
The homepage can embed an iframe of your choice, configurable via the
HOMEPAGE_IFRAME_URL
environment variable. We set up a Librato space with the
metrics we send from this service, and embed that in the homepage:
Stuck jobs
If a dequeuer gets restarted after it's Acquire()d a job but before it can send
it downstream, or if the downstream worker gets restarted before it can hit the
callback, the job can get stuck in-progress indefinitely. Run WatchStuckJobs in
a goroutine to periodically check for in-progress jobs and mark them as failed:
stuckJobTimeout := 7 * time.Minute
go services.WatchStuckJobs(1*time.Minute, stuckJobTimeout)
Database Table Layout
There are three tables, plus one for keeping track of ran migrations.
jobs
- Contains information about a job's name, retry strategy, desired
concurrency.
Table "public.jobs"
Column | Type | Modifiers
-------------------+--------------------------+------------------------
name | text | not null
delivery_strategy | delivery_strategy | not null
attempts | smallint | not null
concurrency | smallint | not null
created_at | timestamp with time zone | not null default now()
Indexes:
"jobs_pkey" PRIMARY KEY, btree (name)
Check constraints:
"jobs_attempts_check" CHECK (attempts > 0)
"jobs_concurrency_check" CHECK (concurrency >= 0)
Referenced by:
TABLE "archived_jobs" CONSTRAINT "archived_jobs_name_fkey" FOREIGN KEY (name) REFERENCES jobs(name)
TABLE "queued_jobs" CONSTRAINT "queued_jobs_name_fkey" FOREIGN KEY (name) REFERENCES jobs(name)
queued_jobs
- The "hot" table, this contains rows that are scheduled to be
dequeued. Should be small, so queries are fast.
Table "public.queued_jobs"
Column | Type | Modifiers
------------+--------------------------+------------------------
id | uuid | not null
name | text | not null
attempts | smallint | not null
run_after | timestamp with time zone | not null
expires_at | timestamp with time zone |
created_at | timestamp with time zone | not null default now()
updated_at | timestamp with time zone | not null default now()
status | job_status | not null
data | jsonb | not null
Indexes:
"queued_jobs_pkey" PRIMARY KEY, btree (id)
"find_queued_job" btree (name, run_after) WHERE status = 'queued'::job_status
"queued_jobs_created_at" btree (created_at)
Check constraints:
"queued_jobs_attempts_check" CHECK (attempts >= 0)
Foreign-key constraints:
"queued_jobs_name_fkey" FOREIGN KEY (name) REFERENCES jobs(name)
archived_jobs
- Insert-only table containing historical records of all
jobs. May grow very large.
Table "public.archived_jobs"
Column | Type | Modifiers
------------+--------------------------+------------------------
id | uuid | not null
name | text | not null
attempts | smallint | not null
status | archived_job_status | not null
created_at | timestamp with time zone | not null default now()
data | jsonb | not null
Indexes:
"archived_jobs_pkey" PRIMARY KEY, btree (id)
Check constraints:
"archived_jobs_attempts_check" CHECK (attempts >= 0)
Foreign-key constraints:
"archived_jobs_name_fkey" FOREIGN KEY (name) REFERENCES jobs(name)
Example servers and dequeuers
Example server and dequeuer instances are stored in commands/server and
commands/dequeuer. You will probably want to modify these to provide your own
authentication scheme.
Configure the server
You can use the following variables to tune the server:
-
PG_SERVER_POOL_SIZE
- Maximum number of database connections from an
individual instance. Across every database connection in the cluster, you want
to have the number of active Postgres connections equal to 2 * (num CPUs on the
Postgres machine). Currently set to 15.
-
PORT
- which port to listen on.
-
LIBRATO_TOKEN
- This library uses Librato for metrics. This environment
variable sets the Librato token for publishing.
-
DATABASE_URL
- Postgres database URL. Currently only connections to the
primary are allowed, there are not a lot of reads in the system, and all
queries are designed to be short.
Configure the dequeuer
The number of dequeuers is determined by the number of entries in the jobs
table. There is currently no way to adjust the number of dequeuers on the fly,
you must update the database and then restart the worker process.
-
PG_WORKER_POOL_SIZE
- How many workers to use. Workers hit Postgres in a
busy loop asking for work with a SELECT ... FOR UPDATE
, which skips rows
if they are active, so queries from the worker tend to cause more active
connections than those from the server.
-
DATABASE_URL
- Postgres database URL. Currently only connections to the
primary are allowed, there are not a lot of reads in the system, and all
queries are designed to be short.
-
DOWNSTREAM_URL
- When you dequeue a job, hit this URL to tell something to
do some work.
-
DOWNSTREAM_WORKER_AUTH
- Basic auth password for the downstream service
(user is "jobs").
Configure metrics
The default metrics client uses Librato - see examples of how to configure it in
example_server_test.go
or example_dequeuer_test.go
. However, you can
override the default metrics client with any client that implements the
metrics.Metrics
interface, something like this:
func (m myCustomClient) Increment(metric string) { ... }
func main() {
metrics.Client = myCustomClient{}
}
You can choose which metrics to send by overriding metrics.Exclude
to
selectively exclude some metrics. By default, no metrics are excluded.
metrics.Exclude = func(metric string) bool {
return !strings.HasSuffix("latency")
}
Local development
We use goose for database migrations. The test database is
rickover_test
and the development database is rickover
. The authenticating
user is rickover
.
To run all migrations, run:
goose --env=test up
To get the database status:
goose --env=test status
You should also be able to use goose to run migrations in your production
environment. Set the DATABASE_URL environment variable to a Postgres string,
then use the cluster
environment, for example
DATABASE_URL=$(heroku config:get DATABASE_URL --app myapp) goose --env=cluster up
Start the server
make serve
Will start the example server on port 8080.
Start the dequeuer
make dequeue
Will try to pull jobs out of the database and send them to the downstream
worker. Note you will need to set DOWNSTREAM_WORKER_AUTH
as the basic auth
password for the downstream service (the user is hardcoded to "jobs"), and
DOWNSTREAM_URL
as the URL to hit when you have a job to dequeue.
Debugging variables
DEBUG_HTTP_TRAFFIC
- Dump all incoming and outgoing http traffic to stdout
Run the tests
First create the test database:
make test-install
Then run all migrations:
make migrate
Finally, you can run the tests:
make test
The race detector takes longer to run, so we only enable it in CI, but
you can run tests with the race detector enabled:
make race-test
View the docs
Run make docs
, which will start a docs server on port 6060 and open it to the
right documentation page. All public API's will be present, and most function
calls should be documented - some with examples.
Working with Dep
Dep (github.com/golang/dep) is the tool for bringing all dependencies into the project.
Benchmarks
The Dequeue
benchmark measures dequeue performance at various concurrency
levels. One kB
is one run, essentially. It does not use the server.
Some benchmark numbers are here: https://docs.google.com/a/shyp.co/spreadsheets/d/1KF3pqCczDMRXZcq-ZqpQeGKo4sPclThltWhsxHdUPTc/edit?usp=sharing
The second bottleneck was the database. Note the database performed best when
the numbers of connection counts and dequeuers were low. In the cluster we will
want to have a higher number of dequeuers, simply because we aren't enqueueing
as many jobs, it's more important to be fast when we need speed than to worry
about the optimal number for peak performance.
In the cluster I was able to dequeue 7,000 jobs per minute with a single $25
web node, a $25 dequeuer node, a $50 database and a $25 Node.js worker. The
first place I would look to improve this would be to increase the number of
Node (downstream) dynos.
I used boom
for enqueuing jobs; I turned off the dequeuer, enqueued
30000 jobs, then started the dequeuer and measured the timestamp difference
between the first enqueued job and the last.
There's a builtin random_id
endpoint which will generate a UUID for you, for
doing load testing.
boom -n 30000 -c 100 -d '{"data": {"user-agent": "boom"}}' -m PUT http://localhost:9090/v1/jobs/echo/random_id
Testing
Tests hit the database, and should either be able to run in parallel with other
tests or clean up after themselves.
Note you must run tests with -p=1
, so packages are tested in turn. Otherwise
t.Parallel()
will run parallel tests from different suites at the same time as
each other, which we currently don't support.
Supported versions
The database uses jsonb
, which is only available in Postgres 9.4 and beyond.
The Go server exposes http/pprof/trace
, which is only available in Go 1.5 and
beyond.
You can probably fork the project to remove the http/pprof handlers and replace
jsonb with json and it should compile/run fine.
Single points of failure
This project has two points of failure:
-
If the database goes down, you can't enqueue any new jobs, or process
any work. This is acceptable for most companies and projects. If this is
unacceptable you may want to check out a distributed scheduler like Chronos, or
use something else for the job queue, like SQS.
-
If the dequeuer goes down, you can't process any work. You can run multiple
dequeuers, or manually split the single concurrency value across multiple
machines, or just wait until the machine comes back up again. Note if the
dequeuer goes down you can still enqueue jobs, the database queue will continue
to grow.
Suggestions for scaling the project
-
Pull jobs out of the database in batches, and send them to the dequeuers over
channels.
-
Use it only as a scheduler, and move the job queue to SQS or something else.
-
Run the server on multiple machines. The worker can't run on multiple
machines without violating the concurrency guarantees.
-
Run the worker on multiple machines, and ignore or update the concurrency
guarantees.
-
Run the downstream worker on a larger number of machines.
-
Shard the Postgres database so jobs A-M are in one database, and N-Z are in
another. Would need to update db.Conn
to be an interface, or wrap it behind a
function.
-
Delete/archive all rows from archived_jobs
that are older than 180 days, on
a rolling basis. I doubt this will help much, but it might.
-
Get a bigger Postgres database.
-
Upgrade to Postgres 9.5 and update the Acquire() strategy to use SKIP LOCKED.
Roadmap
-
Allow UPDATEs to jobs once they've been created
-
API for retrieving recent jobs/paging through archived jobs, by name
-
Dequeuer listens on a local socket/port, so you can send commands to it to
add/remove dequeuers on the fly.