Delayed is a simple but robust task queue inspired by rq.
Run a redis server:

```bash
$ redis-server
```

Install delayed:

```bash
$ pip install delayed
```
Create a task queue:

```python
import redis
from delayed.queue import Queue

conn = redis.Redis()
queue = Queue(name='default', conn=conn)
```
Enqueue tasks:
There are four ways to enqueue Python tasks:
Define a task function and enqueue it:

```python
from delayed.delay import delayed

delayed = delayed(queue)

i = 0

@delayed
def delayed_add(a, b):
    return a + b

@delayed(retry=3)
def retry_div(x):
    global i
    i += 1
    return x / (i - 1)

delayed_add.delay(1, 2)    # enqueue delayed_add
delayed_add.delay(1, b=2)  # same as above
delayed_add(1, 2)          # call it immediately
retry_div.delay(1)         # enqueue retry_div
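The `retry_div` function above is built to fail once and then succeed: on the first call `i` becomes 1, so it divides by zero; the retried call divides by 1. A standalone sketch of that fail-then-succeed behavior, with a plain loop standing in for the worker (the loop is illustrative only, not the library's actual retry mechanism):

```python
# Standalone sketch of the retry_div example's fail-then-succeed behavior.
# A plain retry loop stands in for the worker here.
i = 0

def retry_div(x):
    global i
    i += 1
    return x / (i - 1)  # first call divides by zero

result = None
for attempt in range(3):  # mimic retry=3
    try:
        result = retry_div(1)
        break
    except ZeroDivisionError:
        continue  # the worker would re-run the failed task

print(result)  # 1.0, produced on the second attempt
```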
Directly enqueue a function:

```python
from delayed.delay import delayed

delayed = delayed(queue)

def add(a, b):
    return a + b

delayed(add).delay(1, 2)
delayed(add).delay(1, b=2)           # same as above
delayed(retry=3)(add).delay(1, b=2)
delayed(add, retry=3).delay(1, b=2)  # same as above
```
Create a task and enqueue it:

```python
from delayed.task import PyTask

def add(a, b):
    return a + b

task = PyTask(func=add, args=(1,), kwargs={'b': 2}, retry=1)
queue.enqueue(task)
```
Enqueue a predefined task function without importing it (the fastest and lightest way):

```python
from delayed.task import PyTask

task = PyTask(func='test:add', args=(1,), kwargs={'b': 2}, retry=1)
queue.enqueue(task)
```
Enqueue Go tasks:

```python
from delayed.task import GoTask

task = GoTask(func_path='syscall.Kill', args=(0, 1))
queue.enqueue(task)

task = GoTask(func_path='fmt.Printf', args=('%d %s\n', [1, 'test']))  # the variadic argument needs to be a list or tuple
queue.enqueue(task)

task = GoTask('fmt.Println', (1, 'test'))  # if the variadic argument is the only argument, it's not required to wrap it with a list or tuple
queue.enqueue(task)
```
Run one or more task workers, each in a separate process:

```python
import redis
from delayed.queue import Queue
from delayed.worker import Worker

conn = redis.Redis()
queue = Queue(name='default', conn=conn)
worker = Worker(queue=queue)
worker.run()
```
Run a task sweeper in a separate process to recover lost tasks (mainly caused by worker processes being killed):

```python
import redis
from delayed.queue import Queue
from delayed.sweeper import Sweeper

conn = redis.Redis()
queue = Queue(name='default', conn=conn)
sweeper = Sweeper(queues=[queue])
sweeper.run()
```
See examples.
```bash
$ redis-server &
$ pip install delayed
$ python -m examples.sweeper &
$ python -m examples.worker &
$ python -m examples.caller
```
Q: What are the limitations on a task function?
A: A Python task function should be defined at module level (except in the `__main__` module). Its `args` and `kwargs` should be serializable by MessagePack. After deserialization, the types of `args` and `kwargs` passed to the task function may change (tuple -> list), so the function should handle this change.
Q: What's the `name` param of a queue?
A: It's the key used to store the tasks of the queue. A queue named "default" uses these keys:
Q: What are lost tasks?
A: There are two situations in which a task might get lost:

Q: How to recover lost tasks?
A: Run a sweeper. It does two things:
Q: How to turn on the debug logs?
A: Add a `logging.DEBUG` level handler to `delayed.logger.logger`. The simplest way is to call `delayed.logger.setup_logger()`:

```python
from delayed.logger import setup_logger

setup_logger()
```
Release notes

1.2:
- Adds a `retry` param to functions wrapped by `delayed.delay()`.
- Adds a `retry` param to `Task()`.
- Adds a `release` param to `Queue.enqueue()`.
- `Worker` won't retry a failed task infinitely by default now. You can set `retry=-1` to `Task()` instead. (BREAKING CHANGE)

1.1:
- Adds a `log_level` param to `delayed.logger.setup_logger()`.

1.0:
- Adds `GoTask`.
- Removes `ForkedWorker` and `PreforkedWorker`. You can use `Worker` instead. (BREAKING CHANGE)
- Changes the params of `Queue()`: removes `default_timeout`, `requeue_timeout` and `busy_len`; adds `dequeue_timeout` and `keep_alive_timeout`. (BREAKING CHANGE)
- Renames `Task` to `PyTask`. (BREAKING CHANGE)
- Removes properties of `PyTask`: `id`, `func_path`, `args` and `kwargs`. (BREAKING CHANGE)
- Removes params of `PyTask()`: `id`, `timeout`, `prior` and `error_handler_path`. (BREAKING CHANGE)
- Removes `PyTask.create()`. You can use `PyTask()` instead. (BREAKING CHANGE)
- Renames the `func_path` param of `PyTask()` to `func`; it accepts both `callable` and `str`. (BREAKING CHANGE)
- Adds `delayed.delay()`. Removes the params of `delayed.delayed()`. (BREAKING CHANGE)

0.11:
- Changes the behavior of `Worker` when it fails to pop a task before retrying.

0.10:
- `Sweeper` can handle multiple queues now. Its `queue` param has been changed to `queues`. (BREAKING CHANGE)
- Changes the separator between `module_path` and `func_name` from `.` to `:`. (BREAKING CHANGE)

0.9:
- Adds `prior` and `error_handler` params to `delayed.delayed()`, removes its `timeout()` method. (BREAKING CHANGE)

0.8:
- The `Task` struct has been changed; it's not compatible with older versions. (BREAKING CHANGE)
- Removes `module_name` and `func_name` from `Task`, adds `func_path` instead.
- Adds `error_handler_path` to `Task`.
- Removes `success_handler` and `error_handler` from `Worker`. (BREAKING CHANGE)

0.7:

0.6:
- Adds `dequeued_len()` and `index` to `Queue`.

0.5:
- Adds `delayed.task.set_pickle_protocol_version()`.

0.4:

0.3:
- Renames `second` to `timeout` for `delayed.delayed()`. (BREAKING CHANGE)

0.2:
- Adds `timeout()` to `delayed.delayed()`.

0.1: