# delayed

Delayed is a simple but robust task queue inspired by rq.
## Features

- Robust: every enqueued task runs exactly once, even if a worker gets killed at any time.
- Clean: finished tasks (including failed ones) take up no space in your Redis.
- Distributed: as many workers as needed can run at the same time without further configuration.
- Portable: its Go and Python versions can call each other.
## Requirements

- Python 3.7 or later, tested on CPython 3.7 - 3.12. Versions before 1.0 were also tested on CPython 2.7, PyPy and PyPy3.
- A Unix-like system (with Unix signals) is required to gracefully stop the workers; tested on Ubuntu 22.04 and macOS Monterey 12.
- Redis 2.6.0 or later (with Lua script support).
## Getting started

1. Run a redis server:

    ```bash
    $ redis-server
    ```

2. Install delayed:

    ```bash
    $ pip install delayed
    ```

3. Create a task queue:

    ```python
    import redis
    from delayed.queue import Queue

    conn = redis.Redis()
    queue = Queue(name='default', conn=conn)
    ```

4. Enqueue tasks:
    - Four ways to enqueue Python tasks:
        - Define a task function and enqueue it:

            ```python
            from delayed.delay import delayed

            delayed = delayed(queue)
            i = 0

            @delayed
            def delayed_add(a, b):
                return a + b

            @delayed(retry=3)
            def retry_div(x):
                global i
                i += 1
                return x / (i - 1)

            delayed_add.delay(1, 2)    # enqueue delayed_add(1, 2)
            delayed_add.delay(1, b=2)  # kwargs work too
            delayed_add(1, 2)          # call the function directly, without enqueuing
            retry_div.delay(1)         # will be retried up to 3 times on failure
            ```

        - Directly enqueue a function:

            ```python
            from delayed.delay import delayed

            delayed = delayed(queue)

            def add(a, b):
                return a + b

            delayed(add).delay(1, 2)
            delayed(add).delay(1, b=2)
            delayed(retry=3)(add).delay(1, b=2)
            delayed(add, retry=3).delay(1, b=2)
            ```

        - Create a task and enqueue it:

            ```python
            from delayed.task import PyTask

            def add(a, b):
                return a + b

            task = PyTask(func=add, args=(1,), kwargs={'b': 2}, retry=1)
            queue.enqueue(task)
            ```

        - Enqueue a predefined task function without importing it (the fastest and lightest way):

            ```python
            from delayed.task import PyTask

            task = PyTask(func='test:add', args=(1,), kwargs={'b': 2}, retry=1)
            queue.enqueue(task)
            ```

    - Enqueue Go tasks:

        ```python
        from delayed.task import GoTask

        task = GoTask(func_path='syscall.Kill', args=(0, 1))
        queue.enqueue(task)

        task = GoTask(func_path='fmt.Printf', args=('%d %s\n', [1, 'test']))
        queue.enqueue(task)

        task = GoTask('fmt.Println', (1, 'test'))  # params can also be passed positionally
        queue.enqueue(task)
        ```

5. Run a task worker (or more) in a separate process:

    ```python
    import redis
    from delayed.queue import Queue
    from delayed.worker import Worker

    conn = redis.Redis()
    queue = Queue(name='default', conn=conn)
    worker = Worker(queue=queue)
    worker.run()
    ```

6. Run a task sweeper in a separate process to recover lost tasks (mainly caused by workers being killed):

    ```python
    import redis
    from delayed.queue import Queue
    from delayed.sweeper import Sweeper

    conn = redis.Redis()
    queue = Queue(name='default', conn=conn)
    sweeper = Sweeper(queues=[queue])
    sweeper.run()
    ```
## Examples

See the examples:
```bash
$ redis-server &
$ pip install delayed
$ python -m examples.sweeper &
$ python -m examples.worker &
$ python -m examples.caller
```
## QA

- **Q: What are the limitations on a task function?**

    A: A Python task function should be defined at module level (except in the `__main__` module).
    Its `args` and `kwargs` should be serializable by MessagePack.
    After deserialization, the types of `args` and `kwargs` passed to the task function might change (e.g. tuple -> list), so the function should tolerate this change.
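The tuple -> list conversion can be reproduced with any serializer that, like MessagePack, has no tuple type; the stdlib `json` module is used below purely to illustrate the same effect:

```python
import json

# MessagePack, like JSON, has no tuple type, so a tuple of args
# survives a serialize/deserialize round trip as a list.
args = (1, 2)
restored = json.loads(json.dumps(args))
print(restored)        # [1, 2]
print(type(restored))  # <class 'list'>
```

A task function that does `a, b = args`-style unpacking still works, but one that checks `isinstance(args, tuple)` will not.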
- **Q: What's the `name` param of a queue?**

    A: It's the key used to store the tasks of the queue. A queue named "default" uses these keys:
    - `default`: list, the enqueued tasks.
    - `default_noti`: list, with the same length as the enqueued tasks.
    - `default_processing`: hash, the task each worker is currently processing.
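Assuming a local Redis server and the "default" queue above, these keys can be inspected with standard `redis-cli` commands (a sketch for debugging; the key names simply follow the queue name):

```bash
$ redis-cli LLEN default                 # number of enqueued tasks
$ redis-cli LLEN default_noti            # should match the length above
$ redis-cli HGETALL default_processing   # worker id -> task being processed
```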
- **Q: What are lost tasks?**

    A: There are 2 situations in which a task might get lost:
    - a worker popped a task notification, then got killed before dequeueing the task.
    - a worker dequeued a task, then got killed before releasing the task.
- **Q: How are lost tasks recovered?**

    A: Run a sweeper. It does two things:
    - it keeps the length of the task notification list the same as that of the task queue.
    - it checks the processing hash; if a worker is dead, it moves that worker's processing task back to the task queue.
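The recovery step can be sketched with plain Python containers standing in for the Redis list and hash (a conceptual sketch only, not delayed's actual implementation; `is_alive` stands in for the real keep-alive check):

```python
task_queue = ['task3']                 # stand-in for the "default" list
processing = {'worker1': 'task1',      # stand-in for the "default_processing" hash
              'worker2': 'task2'}

def is_alive(worker_id):
    # Hypothetical liveness check; the real sweeper inspects
    # worker keep-alive state stored in Redis.
    return worker_id == 'worker2'

# Move each dead worker's in-flight task back to the queue.
for worker_id in list(processing):
    if not is_alive(worker_id):
        task_queue.append(processing.pop(worker_id))

print(task_queue)  # ['task3', 'task1']
print(processing)  # {'worker2': 'task2'}
```

In the real system the move is done atomically in Redis (Lua scripting is listed in the requirements), so a crash of the sweeper itself cannot lose the task.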
- **Q: How to turn on the debug logs?**

    A: Add a `logging.DEBUG` level handler to `delayed.logger.logger`. The simplest way is to call `delayed.logger.setup_logger()`:

    ```python
    from delayed.logger import setup_logger

    setup_logger()
    ```
## Release notes

- 1.2:
    - Adds a `retry` param to functions wrapped by `delayed.delay()`.
    - Adds a `retry` param to `Task()`.
    - Adds a `release` param to `Queue.enqueue()`.
    - The `Worker` no longer retries a failed task infinitely by default. You can pass `retry=-1` to `Task()` instead. (BREAKING CHANGE)
- 1.1:
    - Adds a `log_level` param to `delayed.logger.setup_logger()`.
    - Prevents different online workers from having the same id.
- 1.0:
    - Python 2.7 is no longer supported. (BREAKING CHANGE)
    - Supports Go, adds `GoTask`.
    - Uses MessagePack instead of pickle to serialize / deserialize tasks. (BREAKING CHANGE)
    - Removes `ForkedWorker` and `PreforkedWorker`. You can use `Worker` instead. (BREAKING CHANGE)
    - Changes the params of `Queue()`: removes `default_timeout`, `requeue_timeout` and `busy_len`, adds `dequeue_timeout` and `keep_alive_timeout`. (BREAKING CHANGE)
    - Renames `Task` to `PyTask`. (BREAKING CHANGE)
    - Removes these properties of `PyTask`: `id`, `func_path`, `args` and `kwargs`. (BREAKING CHANGE)
    - Removes these params of `PyTask()`: `id`, `timeout`, `prior` and `error_handler_path`. (BREAKING CHANGE)
    - Removes `PyTask.create()`. You can use `PyTask()` instead. (BREAKING CHANGE)
    - Renames the `func_path` param of `PyTask()` to `func`; it accepts both callable and str. (BREAKING CHANGE)
    - Removes `delayed.delay()`. Removes the params of `delayed.delayed()`. (BREAKING CHANGE)
- 0.11:
    - Sleeps a random time before retrying when a `Worker` fails to pop a task.
- 0.10:
    - The `Sweeper` can handle multiple queues now. Its `queue` param has been changed to `queues`. (BREAKING CHANGE)
    - Changes the separator between `module_path` and `func_name` from `.` to `:`. (BREAKING CHANGE)
- 0.9:
    - Adds `prior` and `error_handler` params to `delayed.delayed()`, removes its `timeout()` method. (BREAKING CHANGE)
    - Adds examples.
- 0.8:
    - The `Task` struct has been changed; it's not compatible with older versions. (BREAKING CHANGE)
        - Removes `module_name` and `func_name` from `Task`, adds `func_path` instead.
        - Adds `error_handler_path` to `Task`.
    - Removes `success_handler` and `error_handler` from `Worker`. (BREAKING CHANGE)
- 0.7:
- 0.6:
    - Adds `dequeued_len()` and `index` to `Queue`.
- 0.5:
    - Adds `delayed.task.set_pickle_protocol_version()`.
- 0.4:
    - Refactors the code and fixes bugs.
- 0.3:
    - Changes the `second` param of `delayed.delayed()` to `timeout`. (BREAKING CHANGE)
    - Adds debug logs.
- 0.2:
    - Adds `timeout()` to `delayed.delayed()`.
- 0.1: