
A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.
Latest release 20250120: `BaseTask.__init__`: accept the state as positional or keyword.
`BaseTask(cs.fsm.FSM, cs.resources.RunStateMixin)`
A base class subclassing `cs.fsm.FSM` with a `RunStateMixin`.
Note that this class and the `FSM` base class do not provide a `FSM_DEFAULT_STATE` attribute; a default `state` value of `None` will leave `.fsm_state` unset.
This behaviour is chosen mostly to support subclasses with unusual behaviour, particularly Django's `Model` class, whose `refresh_from_db` method seems not to refresh fields which already exist; setting `.fsm_state` from a `FSM_DEFAULT_STATE` class attribute thus breaks this method.
Subclasses of this class and `Model` should not provide a `FSM_DEFAULT_STATE` attribute, instead relying on the field definition to provide this default in the usual way.
`BaseTask.as_dot(self, name=None, **kw)`: Return a DOT syntax digraph starting at this `Task`. Parameters are as for `Task.tasks_as_dot`.
`BaseTask.dot_node_label(self)`: The default DOT node label.
`BaseTask.tasks_as_dot(tasks, name=None, *, follow_blocking=False, sep=None)`: Return a DOT syntax digraph of the iterable `tasks`. Nodes will be coloured according to `DOT_NODE_FILLCOLOR_PALETTE` based on their state.
Parameters:
* `tasks`: an iterable of `Task`s to populate the graph
* `name`: optional graph name
* `follow_blocking`: optional flag to follow each `Task`'s `.blocking` attribute recursively and also render those `Task`s
* `sep`: optional node separator, default `'\n'`
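As a rough illustration of the kind of output described above, the following standalone sketch builds a DOT digraph from plain dictionaries. It does not use `cs.taskqueue`; the function `deps_as_dot`, the `PALETTE` colours and the node label layout are all assumptions made for this example, and the real `DOT_NODE_FILLCOLOR_PALETTE` values are not shown in this document.

```python
# Hypothetical palette: stand-in colours, not the library's DOT_NODE_FILLCOLOR_PALETTE.
PALETTE = {"PENDING": "white", "RUNNING": "yellow", "DONE": "green", "FAILED": "red"}


def deps_as_dot(states, blocking, name="tasks", sep="\n"):
    """Return DOT source for a task graph.

    `states` maps a task name to its state name;
    `blocking` maps a task name to the names of the tasks it blocks.
    """
    lines = [f"digraph {name} {{"]
    # One filled node per task, coloured by its state.
    for task, state in states.items():
        fill = PALETTE.get(state, "gray")
        lines.append(
            f'  "{task}" [style=filled, fillcolor="{fill}", label="{task}\\n{state}"];'
        )
    # One edge from each task to every task it blocks.
    for task, blocked in blocking.items():
        for other in blocked:
            lines.append(f'  "{task}" -> "{other}";')
    lines.append("}")
    return sep.join(lines)


dot = deps_as_dot({"t1": "DONE", "t2": "PENDING"}, {"t1": ["t2"]})
print(dot)
```

The resulting text can be passed to Graphviz to render a diagram.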
`BaseTask.tasks_as_svg(tasks, name=None, **kw)`: Return an SVG diagram of the iterable `tasks`. This takes the same parameters as `tasks_as_dot`.
`BlockedError(TaskError)`
Raised by a blocked `Task` if attempted.
main(argv)
Dummy main programme to exercise something.
`make(*tasks, fail_fast=False, queue=None)`
Generator which completes all the supplied `tasks` by dispatching them once they are no longer blocked. Yield each task from `tasks` as it completes (or becomes cancelled).
Parameters:
* `tasks`: `Task`s as positional parameters
* `fail_fast`: default `False`; if true, cease evaluation as soon as a task completes in a state which is not `DONE`
* `queue`: optional callable to submit a task for execution later via some queue such as `Later` or celery
The following rules are applied by this function:
* … `FSMError`
* if `fail_fast` and the task is not done, return
Examples:
>>> t1 = Task('t1', lambda: print('doing t1'), track=True)
>>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)
>>> list(make(t2)) # doctest: +ELLIPSIS
t1 PENDING->dispatch->RUNNING
doing t1
t1 RUNNING->done->DONE
t2 PENDING->dispatch->RUNNING
doing t2
t2 RUNNING->done->DONE
[Task('t2',<function <lambda> at ...>,state='DONE')]
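The prerequisite-first behaviour visible in the transcript above (t1 runs before t2, but only t2 is yielded) can be sketched without the library. This is a simplified model rather than the implementation of `make`: the names `make_sketch`, `requires` and `run` are invented for the example, and failure, cancellation, `fail_fast` and `queue` handling are omitted.

```python
def make_sketch(targets, requires, run):
    """Complete each target after its prerequisites, yielding targets as they finish.

    `requires` maps a task name to the names it depends on;
    `run` is called once with each task name to do the work.
    """
    done = set()

    def complete(name, pending=()):
        if name in done:
            return
        if name in pending:
            raise ValueError(f"dependency cycle at {name!r}")
        # Complete every prerequisite before running this task.
        for prereq in requires.get(name, ()):
            complete(prereq, pending + (name,))
        run(name)
        done.add(name)

    for target in targets:
        complete(target)
        yield target


log = []
completed = list(make_sketch(["t2"], {"t2": ["t1"]}, log.append))
print(log)        # ['t1', 't2']
print(completed)  # ['t2']
```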
`make_later(L, *tasks, fail_fast=False)`
Dispatch the `tasks` via `L:Later` for asynchronous execution if they are not already completed. The caller can wait on `t.result` for completion.
This calls `make_now()` in a thread and uses `L.defer` to queue the task and its prerequisites for execution.
`make_now(*tasks, fail_fast=False, queue=None)`
Run the generator `make(*tasks)` to completion and return the list of completed tasks.
`Task(BaseTask, cs.threads.HasThreadState)`
A task which may require the completion of other tasks.
The model here may not be quite as expected; it is aimed at tasks which can be repaired and rerun. As such, if `self.run(func,...)` raises an exception from `func` then this `Task` will still block dependent `Task`s. Dually, a `Task` which completes without an exception is considered complete and does not block dependent `Task`s.
Keyword parameters:
* `cancel_on_exception`: if true, cancel this `Task` if `.run` raises an exception; the default is `False`, allowing repair and retry
* `cancel_on_result`: optional callable to test the `Task.result` after `.run`; if the callable returns `True` the `Task` is marked as cancelled, allowing repair and retry
* `func`: the function to call to complete the `Task`; it will be called as `func(*func_args,**func_kwargs)`
* `func_args`: optional positional arguments, default `()`
* `func_kwargs`: optional keyword arguments, default `{}`
* `lock`: optional lock, default an `RLock`
* `state`: initial state, default from `self._state.initial_state`, which is initially `'PENDING'`
* `track`: default `False`; if `True` then apply a callback for all states to print task transitions; otherwise it should be a callback function suitable for `FSM.fsm_callback`
Other arguments are passed to the `Result` initialiser.
Example:
import time
from cs.taskqueue import Task
t1 = Task(name="task1")
t1.bg(time.sleep, 10)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, time.sleep, 5)
Users wanting more immediate semantics can supply `cancel_on_exception` and/or `cancel_on_result` to control these behaviours.
Example:
t1 = Task(name="task1")
t1.bg(time.sleep, 2)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, time.sleep, 5)
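To make the effect of `cancel_on_exception` and `cancel_on_result` more concrete, here is a small standalone sketch of how a single run attempt might be classified into a `'cancel'`, `'error'` or `'done'` event. It is a model of the documented behaviour under simplifying assumptions, not the library's code: `run_outcome` is a hypothetical helper, the `CancellationError` class below is a local stand-in, and the `runstate` cancellation check is left out.

```python
class CancellationError(Exception):
    """Local stand-in for the library's CancellationError (assumption)."""


def run_outcome(func, *, cancel_on_exception=False, cancel_on_result=None):
    """Call `func()` once and return `(event, value)` describing the outcome."""
    try:
        result = func()
    except CancellationError:
        # The function asked to be cancelled.
        return "cancel", None
    except Exception as e:
        if cancel_on_exception:
            return "cancel", None
        # Default: record the exception, leaving the task repairable.
        return "error", e
    if cancel_on_result is not None and cancel_on_result(result):
        # The caller's test rejected the result.
        return "cancel", None
    return "done", result


print(run_outcome(lambda: 3))                                   # ('done', 3)
print(run_outcome(lambda: 1 / 0)[0])                            # error
print(run_outcome(lambda: 1 / 0, cancel_on_exception=True)[0])  # cancel
```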
Task State Diagram
States: `PREPARE`, `PENDING`, `QUEUED`, `RUNNING`, `DONE`, `FAILED`, `CANCELLED`, `ABORT`.
Transitions (state->state: event):
* PREPARE->PENDING: prepared
* PENDING->ABORT: abort
* PENDING->CANCELLED: cancel
* PENDING->RUNNING: dispatch
* PENDING->FAILED: error
* PENDING->QUEUED: queue
* CANCELLED->PENDING: retry
* CANCELLED->ABORT: abort
* RUNNING->CANCELLED: cancel
* RUNNING->FAILED: except
* RUNNING->DONE: done
* FAILED->PENDING: retry
* FAILED->ABORT: abort
* QUEUED->CANCELLED: cancel
* QUEUED->RUNNING: dispatch
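The transitions in the state diagram can be written down as a lookup table. The sketch below is only an illustration of the diagram: the names `TRANSITIONS` and `fire` are made up here and are not the `cs.fsm` API.

```python
# state -> {event: next_state}, transcribed from the state diagram.
TRANSITIONS = {
    "PREPARE": {"prepared": "PENDING"},
    "PENDING": {
        "abort": "ABORT",
        "cancel": "CANCELLED",
        "dispatch": "RUNNING",
        "error": "FAILED",
        "queue": "QUEUED",
    },
    "CANCELLED": {"retry": "PENDING", "abort": "ABORT"},
    "RUNNING": {"cancel": "CANCELLED", "except": "FAILED", "done": "DONE"},
    "FAILED": {"retry": "PENDING", "abort": "ABORT"},
    "QUEUED": {"cancel": "CANCELLED", "dispatch": "RUNNING"},
    "ABORT": {},  # terminal
    "DONE": {},   # terminal
}


def fire(state, event):
    """Return the state reached by firing `event` from `state`."""
    try:
        return TRANSITIONS[state][event]
    except KeyError:
        raise ValueError(f"no {event!r} transition from state {state!r}") from None


# A task which fails once, is retried, and then succeeds.
state = "PENDING"
for event in ("dispatch", "except", "retry", "dispatch", "done"):
    state = fire(state, event)
print(state)  # DONE
```

Note that `FAILED` and `CANCELLED` both have a `retry` transition back to `PENDING`, which is what allows a task to be repaired and rerun.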
`Task.__call__(self)`: Block on `self.result` awaiting completion by calling `self.result()`.
`Task.bg(self)`: Dispatch a function to complete the `Task` in a separate `Thread`, returning the `Thread`. This raises `BlockedError` for a blocked task. Otherwise the thread runs `self.dispatch()`.
`Task.block(self, otask)`: Block another task until we are complete. The converse of `.require()`.
`Task.blockers(self)`: A generator yielding tasks from `self.required` which should block this task. Aborted tasks are not blockers but if we encounter one we do abort the current task.
`Task.cancel(self)`: Transition this `Task` to `CANCELLED` state. If the task is running, set `.cancelled` on the `RunState`, allowing clean task cancellation and subsequent transition (mediated by the `.run()` method). Otherwise fire the `'cancel'` event directly.
`Task.dispatch(self)`: Dispatch the `Task`: If the task is blocked, raise `BlockedError`. If a prerequisite is aborted, fire the `'abort'` event. Otherwise fire the `'dispatch'` event and then run the task's function via the `.run()` method.
`Task.isblocked(self)`: A task is blocked if any prerequisite is not complete.
`Task.iscompleted(self)`: This task is completed (even if failed) and does not block contingent tasks.
`Task.join(self)`: Wait for this task to complete.
`Task.make(self, fail_fast=False)`: Complete `self` and its prerequisites. This calls the global `make()` function with `self`. It returns a Boolean indicating whether this task completed.
Task.perthread_state
`Task.require(self, otask: 'TaskSubType')`: Add a requirement that `otask` be complete before we proceed. This is the simple `Task` only version of `.then()`.
`Task.run(self)`: Run the function associated with this task, completing the `self.result` `Result` appropriately when finished.
WARNING: this ignores the current state and any blocking `Task`s. You should usually use `dispatch` or `make`.
During the run the thread local `Task.default()` will be `self` and the `self.runstate` will be running.
Otherwise run `func_result=self.func(*self.func_args,**self.func_kwargs)` with the following effects:
* if the function raises a `CancellationError`, cancel the `Task`
* if the function raises another exception: if `self.cancel_on_exception` then cancel the task, else complete `self.result` with the exception and fire the `'error'` event
* if `self.runstate.canceled`, or `self.cancel_on_result` was provided and `self.cancel_on_result(func_result)` is true, cancel the task
* otherwise complete `self.result` with `func_result` and fire the `'done'` event
`Task.then(self, func: Union[str, Callable, ForwardRef('TaskSubType')], *a, func_args=(), func_kwargs=None, **task_kw)`: Prepare a new `Task` or function which may not run before `self` completes.
This may be called in two ways:
* `task.then(some_Task)`: block the `Task` instance `some_Task` behind `self`
* `task.then([name,]func[,func_args=][,func_kwargs=][,Task_kwargs...])`: make a new `Task` to be blocked behind `self`
Return the new `Task`.
This supports preparing a chain of actions:
>>> t_root = Task("t_root", lambda: 0)
>>> t_leaf = t_root.then(lambda: 1).then(lambda: 2)
>>> t_root.iscompleted() # the root task has not yet run
False
>>> t_leaf.iscompleted() # the final task has not yet run
False
>>> # t_leaf is blocked by t_root
>>> t_leaf.dispatch() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
cs.taskqueue.BlockedError: ...
>>> t_leaf.make() # make the leaf, but make t_root first
True
>>> t_root.iscompleted() # implicitly completed by make
True
>>> t_leaf.iscompleted()
True
`TaskError(cs.fsm.FSMError)`
Raised by `Task` related errors.
`TaskQueue`
A task queue for managing and running a set of related tasks.
Unlike `make` and `Task.make`, this is aimed at a "dispatch" worker which dispatches individual tasks as required.
Example 1, put 2 dependent tasks in a queue and run:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, t2)
>>> for _ in q.run(): pass
...
t1
t2
Example 2, put 1 task in a queue and run. The queue only runs the specified tasks:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1)
>>> for _ in q.run(): pass
...
t1
Example 3, put 1 task in a queue with `run_dependent_tasks=True` and run. The queue pulls in the dependencies of completed tasks and also runs those:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, run_dependent_tasks=True)
>>> for _ in q.run(): pass
...
t1
t2
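The difference between the last two examples can be modelled in a few lines of plain Python. This is a toy model under stated assumptions, not how `TaskQueue` is implemented: `run_queue`, `blocking` and `requires` are names invented for the sketch, tasks are represented by their names only, and "dispatching" a task is reduced to marking it done.

```python
from collections import deque


def run_queue(initial, blocking, requires, run_dependent_tasks=False):
    """Yield task names in the order a simple dispatcher would process them.

    `blocking` maps a task name to the names it blocks;
    `requires` maps a task name to its prerequisites.
    """
    queued = deque(initial)
    done = set()
    while queued:
        # Pick the first queued task whose prerequisites are all done.
        for name in list(queued):
            if all(prereq in done for prereq in requires.get(name, ())):
                break
        else:
            return  # every remaining task is blocked
        queued.remove(name)
        done.add(name)  # stands in for dispatching the task
        yield name
        if run_dependent_tasks:
            # Pull in tasks which were waiting on the one just completed.
            for dependent in blocking.get(name, ()):
                if dependent not in done and dependent not in queued:
                    queued.append(dependent)


blocking = {"t1": ["t2"]}
requires = {"t2": ["t1"]}
only_listed = list(run_queue(["t1"], blocking, requires))
with_dependents = list(run_queue(["t1"], blocking, requires, run_dependent_tasks=True))
print(only_listed)      # ['t1']
print(with_dependents)  # ['t1', 't2']
```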
`TaskQueue.__init__(self, *tasks, run_dependent_tasks=False)`: Initialise the queue with the supplied `tasks`.
`TaskQueue.add(self, task)`: Add a task to the tasks managed by this queue.
`TaskQueue.as_dot(self, name=None, **kw)`: Compute a DOT syntax graph description of the tasks in the queue.
`TaskQueue.get(self)`: Pull a completed or an unblocked pending task from the queue. Return the task or `None` if nothing is available. The returned task is no longer tracked by this queue.
`TaskQueue.run(self, runstate=None, once=False)`: Process tasks in the queue until the queue has no completed tasks, yielding each task, immediately if `task.iscompleted()` otherwise after `task.dispatch()`.
An optional `RunState` may be provided to allow early termination via `runstate.cancel()`.
An incomplete task is `dispatch`ed before `yield`; ideally it will be complete when the yield happens, but its semantics might mean it is in another state such as `CANCELLED`. The consumer of `run` must handle these situations.
Release 20250120: `BaseTask.__init__`: accept the state as positional or keyword.
Release 20240423: Small fixes.
Release 20230401: Add missing requirement to DISTINFO.
Release 20230331:
Release 20230217: Task: subclass HasThreadState, drop .current_task() class method.
Release 20221207:
Release 20220805: Initial PyPI release.