# qless

Qless is a powerful Redis-based job queueing system inspired by resque,
but built on a collection of Lua scripts, maintained in the qless-core repo.
## Philosophy and Nomenclature

A *job* is a unit of work identified by a job id, or `jid`. A *queue* can
contain several jobs that are scheduled to be run at a certain time, several
jobs that are waiting to run, and jobs that are currently running. A *worker*
is a process on a host, identified uniquely, that asks for jobs from the
queue, performs some process associated with that job, and then marks it as
complete. When it's completed, it can be put into another queue.

Jobs can only be in one queue at a time. That queue is whatever queue they
were last put in. So if a worker is working on a job, and you move it, the
worker's request to complete the job will be ignored.
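Conceptually, that "last put wins" rule can be modeled in a few lines of plain Ruby. This is an illustrative toy (`ToyRegistry` is not part of qless), not the actual Lua implementation:

```ruby
# Toy model: a job lives in exactly one queue, and a worker's complete
# request is honored only if the job is still in the queue it was popped from.
class ToyRegistry
  def initialize
    @queue_of = {} # jid => current queue name
  end

  # Putting a job (re)assigns its one-and-only queue.
  def put(jid, queue)
    @queue_of[jid] = queue
  end

  # A complete request carries the queue the worker popped from.
  def complete?(jid, popped_from)
    @queue_of[jid] == popped_from
  end
end

registry = ToyRegistry.new
registry.put('jid-1', 'testing')
# A worker pops it from 'testing'; meanwhile, someone moves the job:
registry.put('jid-1', 'other')
registry.complete?('jid-1', 'testing') # => false, the request is ignored
```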
A job can be *canceled*, which means it disappears into the ether, and we'll
never pay it any mind ever again. A job can be *dropped*, which is when a
worker fails to heartbeat or complete the job in a timely fashion, or a job
can be *failed*, which is when a host recognizes some systematically
problematic state about the job. A worker should only fail a job if the error
is likely not a transient one; otherwise, that worker should just drop it and
let the system reclaim it.
## Features

- Jobs don't get dropped on the floor -- Sometimes workers drop jobs. Qless
  automatically picks them back up and gives them to another worker.
- Tagging / Tracking -- Some jobs are more interesting than others. Track those
  jobs to get updates on their progress. Tag jobs with meaningful identifiers to
  find them quickly in the UI.
- Job Dependencies -- One job might need to wait for another job to complete.
- Stats -- Qless automatically keeps statistics about how long jobs wait
  to be processed and how long they take to be processed. Currently, we keep
  track of the count, mean, standard deviation, and a histogram of these times.
- Job data is stored temporarily -- Job info sticks around for a configurable
  amount of time so you can still look back on a job's history, data, etc.
- Priority -- Jobs with the same priority get popped in the order they were
  inserted; a higher priority means that it gets popped faster.
- Retry logic -- Every job has a number of retries associated with it, which are
  renewed when it is put into a new queue or completed. If a job is repeatedly
  dropped, then it is presumed to be problematic, and is automatically failed.
- Web App -- With the advent of a Ruby client, there is a Sinatra-based web
  app that gives you control over certain operational issues.
- Scheduled Work -- Jobs can be put with a delay (defaults to 0); until that
  delay has elapsed, they cannot be popped by workers.
- Recurring Jobs -- Scheduling's all well and good, but we also support
  jobs that need to recur periodically.
- Notifications -- Tracked jobs emit events on pubsub channels as they get
  completed, failed, put, popped, etc. Use these events to get notified of
  progress on jobs you're interested in.
## Enqueuing Jobs

First things first, require `qless` and create a client. The client accepts all
the same arguments that you'd use when constructing a Redis client.

```ruby
require 'qless'

# Connect to localhost, or to somewhere else
client = Qless::Client.new
client = Qless::Client.new(:host => 'foo.bar.com', :port => 1234)
```
Jobs should be classes or modules that define a `perform` method, which
must accept a single `job` argument:

```ruby
class MyJobClass
  def self.perform(job)
    # job.data contains the job's data
  end
end
```
Now you can access a queue, and add a job to that queue.

```ruby
queue = client.queues['testing']
queue.put(MyJobClass, :hello => 'howdy')

job = queue.pop
job.perform
```

The job data must be serializable to JSON, and it is recommended
that you use a hash for it. See below for a list of the supported job options.
The argument returned by `queue.put` is the job ID, or jid. Every Qless
job has a unique jid, and it provides a means to interact with an
existing job:

```ruby
job = client.jobs[jid]
job.klass                        # => the job's class
job.queue                        # => the queue the job is in
job.data                         # => the job's data
job.history                      # => the job's history
job.dependencies                 # => jids of jobs that must complete first
job.dependents                   # => jids of jobs that depend on this one
job.priority                     # => the job's priority
job.tags                         # => the job's tags
job.original_retries             # => the number of retries it started with
job.retries_left                 # => the number of retries remaining

job.requeue("some_other_queue")  # move it to another queue
job.cancel                       # cancel it
job.tag("foo")                   # add a tag
job.untag("foo")                 # remove a tag
```
## Running A Worker

The Qless Ruby worker was heavily inspired by Resque's worker,
but thanks to the power of the qless-core Lua scripts, it is
much simpler, and you are welcome to write your own (e.g. if
you'd rather save memory by not forking the worker for each job).

As with resque...

- The worker forks a child process for each job in order to provide
  resilience against memory leaks. Pass the `RUN_AS_SINGLE_PROCESS`
  environment variable to force Qless not to fork the child process.
  Single process mode should only be used in some test/dev environments.
- The worker updates its procline with its status so you can see
  what workers are doing using `ps`.
- The worker registers signal handlers so that you can control it
  by sending it signals.
- The worker is given a list of queues to pop jobs off of.
- The worker logs output based on the `VERBOSE` or `VVERBOSE` (very
  verbose) environment variables.
- Qless ships with a rake task (`qless:work`) for running workers.
  It runs `qless:setup` before starting the main work loop so that
  users can load their environment in that task.
- The sleep interval (for when there are no jobs available) can be
  configured with the `INTERVAL` environment variable.
Resque uses queues for its notion of priority. In contrast, qless
has priority support built in. Thus, the worker supports two strategies
for what order to pop jobs off the queues: ordered and round-robin.
The ordered reserver will keep popping jobs off the first queue until
it is empty, before trying to pop jobs off the second queue. The
round-robin reserver will pop a job off the first queue, then the second
queue, and so on. You could also easily implement your own.
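As a sketch of the difference between the two strategies, here they are over plain Ruby arrays standing in for queues (illustrative only; the real classes are `Qless::JobReservers::Ordered` and `Qless::JobReservers::RoundRobin`):

```ruby
class OrderedReserver
  def initialize(queues)
    @queues = queues
  end

  # Keep popping from the first non-empty queue until it drains.
  def reserve
    queue = @queues.find { |q| !q.empty? }
    queue && queue.shift
  end
end

class RoundRobinReserver
  def initialize(queues)
    @queues = queues
    @index = 0
  end

  # Take one job from each queue in turn, skipping empty ones.
  def reserve
    @queues.size.times do
      queue = @queues[@index]
      @index = (@index + 1) % @queues.size
      return queue.shift unless queue.empty?
    end
    nil
  end
end

ordered = OrderedReserver.new([%w[a1 a2], %w[b1 b2]])
4.times.map { ordered.reserve } # => ["a1", "a2", "b1", "b2"]

round = RoundRobinReserver.new([%w[a1 a2], %w[b1 b2]])
4.times.map { round.reserve }   # => ["a1", "b1", "a2", "b2"]
```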
To start a worker, write a bit of Ruby code that instantiates a
worker and runs it. You could write a rake task to do this, for
example:
```ruby
namespace :qless do
  desc "Run a Qless worker"
  task :work do
    # Load your application's environment here.
    require 'my_app/environment'
    require 'qless'
    require 'qless/job_reservers/ordered'
    require 'qless/worker'

    client = Qless::Client.new(:host => 'foo.bar.com', :port => 1234)
    queues = %w[ queue_1 queue_2 ].map do |name|
      client.queues[name]
    end

    reserver = Qless::JobReservers::Ordered.new(queues)
    worker = Qless::Workers::ForkingWorker.new(reserver)
    worker.run
  end
end
```
The following signals are supported in the parent process:

- TERM: Shutdown immediately, stop processing jobs.
- INT: Shutdown immediately, stop processing jobs.
- QUIT: Shutdown after the current job has finished processing.
- USR1: Kill the forked child immediately, continue processing jobs.
- USR2: Don't process any new jobs, and dump the current backtrace.
- CONT: Start processing jobs again after a USR2.

You should send these to the master process, not the child. The child
process supports the USR2 signal, which causes it to dump its current
backtrace.
Workers also support middleware modules that can be used to inject
logic before, after or around the processing of a single job in
the child process. This can be useful, for example, when you need to
re-establish a connection to your database in each job.

Define a module with an `around_perform` method that calls `super` where you
want the job to be processed:

```ruby
module ReEstablishDBConnection
  def around_perform(job)
    MyORM.establish_connection
    super
  end
end
```
Then, mix it into the worker class. You can mix in as many
middleware modules as you like:

```ruby
require 'qless/worker'

Qless::Worker.class_eval do
  include ReEstablishDBConnection
  include SomeOtherAwesomeMiddleware
end
```
## Per-Job Middlewares

Qless also supports middleware on a per-job basis, for when you have some
orthogonal logic to run in the context of some (but not all) jobs.

Per-job middlewares are defined the same as worker middlewares:

```ruby
module ReEstablishDBConnection
  def around_perform(job)
    MyORM.establish_connection
    super
  end
end
```

To add them to a job class, you first have to make your job class
middleware-enabled by extending it with `Qless::Job::SupportsMiddleware`,
then extend your middleware modules:

```ruby
class MyJobClass
  extend Qless::Job::SupportsMiddleware
  extend ReEstablishDBConnection
  extend MyOtherAwesomeMiddleware

  def self.perform(job)
  end
end
```

Note that `Qless::Job::SupportsMiddleware` must be extended onto your
job class before any other middleware modules.
## Web Interface

Qless ships with a resque-inspired web app that lets you easily
deal with failures and see what it is processing. If your project
has a rack-based Ruby web app, we recommend you mount Qless's web app
in it. Here's how you can do that with `Rack::Builder` in your `config.ru`:

```ruby
client = Qless::Client.new(:host => "some-host", :port => 7000)

Rack::Builder.new do
  use SomeMiddleware

  map('/some-other-app') { run Apps::Something.new }
  map('/qless')          { run Qless::Server.new(client) }
end
```
For an app using Rails 3+, check the router documentation for how to mount
rack apps.
If you wish to run the web interface from the `exe` directory, you
have the option to run the server as a daemon; running as a daemon is
the default behavior. To run in the foreground, pass the `--foreground`
or `-F` flag:

```
PATH_TO_QLESS_DIST/exe/qless-web -F
```
## Job Dependencies

Let's say you have one job that depends on another, but the task definitions are
fundamentally different. You need to bake a turkey, and you need to make stuffing,
but you can't make the turkey until the stuffing is made:

```ruby
queue = client.queues['cook']
stuffing_jid = queue.put(MakeStuffing, {:lots => 'of butter'})
turkey_jid   = queue.put(MakeTurkey,   {:with => 'stuffing'}, :depends => [stuffing_jid])
```

When the stuffing job completes, the turkey job is unlocked and free to be processed.
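The unlocking rule itself is simple: a job becomes available only once every jid it depends on has completed. A toy model of that bookkeeping (`ToyQueue` is illustrative, not the qless implementation, which lives in the Lua scripts):

```ruby
class ToyQueue
  def initialize
    @depends   = {} # jid => array of jids it waits on
    @completed = []
  end

  def put(jid, depends: [])
    @depends[jid] = depends
  end

  def complete(jid)
    @completed << jid
  end

  # A job may be popped only when all its dependencies are done.
  def available?(jid)
    (@depends[jid] - @completed).empty?
  end
end

q = ToyQueue.new
q.put('stuffing')
q.put('turkey', :depends => ['stuffing'])
q.available?('turkey') # => false
q.complete('stuffing')
q.available?('turkey') # => true
```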
## Priority

Some jobs need to get popped sooner than others. Whether it's a trouble ticket, or
debugging, you can do this pretty easily when you put a job in a queue:

```ruby
queue.put(MyJobClass, {:foo => 'bar'}, :priority => 10)
```

What happens when you want to adjust a job's priority while it's still waiting in
a queue?

```ruby
job = client.jobs['0c53b0404c56012f69fa482a1427ab7d']
job.priority = 10
```
## Scheduled Jobs

If you don't want a job to be run right away but some time in the future, you can
specify a delay:

```ruby
# Run at least 10 minutes from now
queue.put(MyJobClass, {:foo => 'bar'}, :delay => 600)
```

This doesn't guarantee that the job will be run exactly 10 minutes from now. You can
get close by also raising the job's priority, so that once the 10 minutes have elapsed,
it's put before lesser-priority jobs:

```ruby
queue.put(MyJobClass, {:foo => 'bar'}, :delay => 600, :priority => 100)
```
## Recurring Jobs

Sometimes it's not enough simply to schedule one job, but you want to run jobs regularly.
In particular, maybe you have some batch operation that needs to get run once an hour and
you don't care what worker runs it. Recurring jobs are specified much like other jobs:

```ruby
# Run every hour
queue.recur(MyJobClass, {:widget => 'warble'}, 3600)
```

You can even access them in much the same way as you would normal jobs:

```ruby
job = client.jobs['22ac75008a8011e182b24cf9ab3a8f3b']
```

Changing the interval at which it runs after the fact is trivial:

```ruby
# Run every two hours
job.interval = 7200
```

If you want it to run every hour on the hour, but it's 2:37 right now, you can specify
an offset, which is how long it should wait before popping the first job:

```ruby
# 23 minutes of waiting until the first job should get popped
queue.recur(MyJobClass, {:howdy => 'hello'}, 3600, :offset => 23 * 60)
```

Recurring jobs also have priority, a configurable number of retries, and tags. These
settings don't apply to the recurring jobs, but rather to the jobs that they create. In the
case where more than one interval passes before a worker tries to pop the job, more than
one job is created. The thinking is that while it's completely client-managed, the state
should not be dependent on how often workers are trying to pop jobs.

```ruby
# Recur every minute
queue.recur(MyJobClass, {:lots => 'of jobs'}, 60)
# If, say, 5 intervals pass before any worker pops, 5 jobs are waiting:
queue.pop(10).length # => 5
```
## Configuration Options

You can get and set global (read: in the context of the same Redis instance) configuration
to change the behavior for heartbeating, and so forth. There aren't a tremendous number
of configuration options, but an important one is how long job data is kept around. Job
data is expired after it has been completed for `jobs-history` seconds, but is limited to
the last `jobs-history-count` completed jobs. These default to 30 days and 50k jobs,
respectively, but depending on volume, your needs may change. To only keep the last 500
jobs for up to 7 days:

```ruby
client.config['jobs-history'] = 7 * 86400  # 7 days
client.config['jobs-history-count'] = 500
```
## Tagging / Tracking

In qless, 'tracking' means flagging a job as important. Tracked jobs have a tab reserved
for them in the web interface, and they also emit subscribable events as they make progress
(more on that below). You can flag a job from the web interface, or the corresponding code:

```ruby
client.jobs['b1882e009a3d11e192d0b174d751779d'].track
```

Jobs can be tagged with strings which are indexed for quick searches. For example, jobs
might be associated with customer accounts, or some other key that makes sense for your
project.

```ruby
queue.put(MyJobClass, {:tags => 'aplenty'}, :tags => ['12345', 'foo', 'bar'])
```

This makes them searchable in the web interface, or from code:

```ruby
jids = client.jobs.tagged('foo')
```

You can add or remove tags at will, too:

```ruby
job = client.jobs['b1882e009a3d11e192d0b174d751779d']
job.tag('howdy', 'hello')
job.untag('foo', 'bar')
```
## Notifications

Tracked jobs emit events on specific pubsub channels as things happen to them, whether
it's getting popped off of a queue, completed by a worker, etc. Good examples of how
to make use of this are qless-campfire and qless-growl. The gist of it goes like
this, though:

```ruby
client.events.listen do |on|
  on.canceled  { |jid| puts "#{jid} canceled"   }
  on.stalled   { |jid| puts "#{jid} stalled"    }
  on.track     { |jid| puts "tracking #{jid}"   }
  on.untrack   { |jid| puts "untracking #{jid}" }
  on.completed { |jid| puts "#{jid} completed"  }
  on.failed    { |jid| puts "#{jid} failed"     }
  on.popped    { |jid| puts "#{jid} popped"     }
  on.put       { |jid| puts "#{jid} put"        }
end
```

Those familiar with Redis pubsub will note that a Redis connection can only be used
for pubsub-y commands once listening. For this reason, invoking `client.events` actually
creates a second connection so that `client` can still be used as it normally would be:

```ruby
client.events.listen do |on|
  on.failed do |jid|
    puts "#{jid} failed in #{client.jobs[jid].queue_name}"
  end
end
```
## Heartbeating

When a worker is given a job, it is given an exclusive lock on that job. That means
that the job won't be given to any other worker, so long as the worker checks in with
progress on the job. By default, jobs have to either report back progress every 60
seconds, or be completed, but that's a configurable option. For longer jobs, this
may not make sense.

```ruby
job = queue.pop
# How long until the lock is considered lost?
job.ttl
# I need a little more time!
job.heartbeat
# All done!
job.complete
```

If you want to increase the heartbeat in all queues:

```ruby
# Set the heartbeat to 10 minutes for all queues
client.config['heartbeat'] = 600
# Set the heartbeat to 5 minutes for the 'testing' queue
client.queues['testing'].heartbeat = 300
```
When choosing a heartbeat interval, realize that this is the amount of time that
can pass before qless realizes a job has been dropped. At the same time, you don't
want to burden qless with heartbeating every 10 seconds if your job is expected to
take several hours.

An idiom you're encouraged to use for long-running jobs that want to check in their
progress periodically:

```ruby
if (job.ttl < 300) && !job.heartbeat
  # The lock is about to expire and we couldn't renew it;
  # bail out and let the job be reclaimed.
  return
end
```
## Stats

One nice feature of qless is that you can get statistics about usage. Stats are
aggregated by day, so when you want stats about a queue, you need to say what queue
and what day you're talking about. By default, you just get the stats for today.
These stats include information about the mean job wait time, standard deviation,
and histogram. This same data is also provided for job completion:

```ruby
# Today's stats for the 'testing' queue
stats = client.stats.get('testing')
```
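For intuition about what those numbers are, here's how count, mean, and (population) standard deviation fall out of a set of raw wait times, computed in plain Ruby (the wait times here are made up; qless computes these internally):

```ruby
# Hypothetical wait times, in seconds, for jobs popped in one day
waits = [1.0, 2.0, 2.0, 3.0]

count = waits.size
mean  = waits.sum / count
# Population standard deviation of the wait times
std   = Math.sqrt(waits.map { |w| (w - mean)**2 }.sum / count)

count # => 4
mean  # => 2.0
std   # => 0.7071067811865476
```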
## Time
It's important to note that Redis doesn't allow access to the system time if you're
going to be making any manipulations to data (which our scripts do). And yet, we
have heartbeating. This means that the clients actually send the current time when
making most requests, and for consistency's sake, means that your workers must be
relatively synchronized. This doesn't mean down to the tens of milliseconds, but if
you're experiencing appreciable clock drift, you should investigate NTP. For what it's
worth, this hasn't been a problem for us, but most of our jobs have heartbeat intervals
of 30 minutes or more.
## Ensuring Job Uniqueness

As mentioned above, jobs are uniquely identified by an id, their jid.
Qless will generate a UUID for each enqueued job, or you can specify
one manually:

```ruby
queue.put(MyJobClass, { :hello => 'howdy' }, :jid => 'my-job-jid')
```

This can be useful when you want to ensure a job's uniqueness: simply
create a jid that is a function of the job's class and data, and it's
guaranteed that Qless won't have multiple jobs with the same class
and data.
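One way to build such a jid is to hash the class name together with the JSON-encoded data. This scheme is an illustration (the `deterministic_jid` helper is not part of qless, and it assumes the data hash is built with keys in a consistent order):

```ruby
require 'digest'
require 'json'

# Derive a jid from the job's class and data, so that putting the same
# work twice reuses the same jid instead of creating a duplicate job.
def deterministic_jid(klass, data)
  Digest::SHA1.hexdigest("#{klass}:#{JSON.generate(data)}")
end

jid = deterministic_jid('MyJobClass', { :hello => 'howdy' })
# queue.put(MyJobClass, { :hello => 'howdy' }, :jid => jid)

# The same class and data always yield the same jid:
deterministic_jid('MyJobClass', { :hello => 'howdy' }) == jid # => true
```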
## Setting Default Job Options

`Qless::Queue#put` accepts a number of job options (see above for their
semantics):

- jid
- delay
- priority
- tags
- retries
- depends

When enqueueing the same kind of job with the same args in multiple
places, it's a pain to have to declare the job options every time.
Instead, you can define default job options directly on the job class:

```ruby
class MyJobClass
  def self.default_job_options(data)
    { :priority => 10, :delay => 100 }
  end
end

queue.put(MyJobClass, { :some => "data" }, :delay => 10)
```

Individual jobs can still specify options, so in this example,
the job would be enqueued with a priority of 10 and a delay of 10.
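Conceptually, the effective options are just the class defaults merged with whatever was passed to `put`, with the explicit options winning. A sketch of that merge (the `effective_options` helper is hypothetical, not how qless is implemented):

```ruby
class MyJobClass
  def self.default_job_options(data)
    { :priority => 10, :delay => 100 }
  end
end

# Explicit options passed to put override the class defaults.
def effective_options(klass, data, options)
  klass.default_job_options(data).merge(options)
end

effective_options(MyJobClass, { :some => 'data' }, { :delay => 10 })
# => {:priority=>10, :delay=>10}
```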
## Testing Jobs

When unit testing your jobs, you will probably want to avoid the
overhead of round-tripping them through Redis. You can of course
use a mock job object and pass it to your job class's `perform`
method. Alternately, if you want a real full-fledged `Qless::Job`
instance without round-tripping it through Redis, use `Qless::Job.build`:

```ruby
describe MyJobClass do
  let(:client) { Qless::Client.new }
  let(:job)    { Qless::Job.build(client, MyJobClass, :data => { "some" => "data" }) }

  it 'does something' do
    MyJobClass.perform(job)
  end
end
```

The options hash passed to `Qless::Job.build` supports all the same
options a normal job supports. See the source for a full list.
## Contributing

To bootstrap an environment, first have a Redis installed, along with
either rvm or rbenv. Then install the dependencies:

```
rbenv install
bundle install
./exe/install_phantomjs
rbenv rehash
git submodule init
git submodule update
bundle exec rake core:build
```

To run the tests:

```
bundle exec rake spec
```

The locally installed Redis will be flushed before and after each test run.
To change the Redis instance used in tests, put the connection information
into `./spec/redis.config.yml`.

To help develop the web UI, run `bundle exec ./utils/dev/qless-web-dev`
to run the server with seed data.

To contribute, fork the repo, use feature branches, run the tests and open PRs.
## Mailing List

For questions and general Qless discussion, please join the Qless
mailing list.
## Release Notes

### 0.12.0

The metric `failures` provided by qless-stats has been replaced by `failed` for
compatibility with users of graphite. See #275 for more details.