aiotools
Idiomatic asyncio utilties
NOTE: This project is under early stage of development. The public APIs may break version by version.
Modules
I also recommend to try the following asyncio libraries for your happier life.
- async_timeout: Provides a light-weight timeout wrapper that does not spawn subtasks.
- aiojobs: Provides a concurrency-limited scheduler for asyncio tasks with graceful shutdown.
- trio: An alternative implementation of asynchronous IO stack for Python, with focus on cancellation scopes and task groups called "nursery".
Examples
Async Context Manager
This is an asynchronous version of contextlib.contextmanager
to make it
easier to write asynchronous context managers without creating boilerplate
classes.
import asyncio
import aiotools
@aiotools.actxmgr
async def mygen(a):
await asyncio.sleep(1)
yield a + 1
await asyncio.sleep(1)
async def somewhere():
async with mygen(1) as b:
assert b == 2
Note that you need to wrap yield
with a try-finally block to
ensure resource releases (e.g., locks), even in the case when
an exception is ocurred inside the async-with block.
import asyncio
import aiotools
lock = asyncio.Lock()
@aiotools.actxmgr
async def mygen(a):
await lock.acquire()
try:
yield a + 1
finally:
lock.release()
async def somewhere():
try:
async with mygen(1) as b:
raise RuntimeError('oops')
except RuntimeError:
print('caught!')
You can also create a group of async context managers, which
are entered/exited all at once using asyncio.gather()
.
import asyncio
import aiotools
@aiotools.actxmgr
async def mygen(a):
yield a + 10
async def somewhere():
ctxgrp = aiotools.actxgroup(mygen(i) for i in range(10))
async with ctxgrp as values:
assert len(values) == 10
for i in range(10):
assert values[i] == i + 10
Async Server
This implements a common pattern to launch asyncio-based server daemons.
import asyncio
import aiotools
async def echo(reader, writer):
data = await reader.read(100)
writer.write(data)
await writer.drain()
writer.close()
@aiotools.server
async def myworker(loop, pidx, args):
server = await asyncio.start_server(echo, '0.0.0.0', 8888,
reuse_port=True, loop=loop)
print(f'[{pidx}] started')
yield
server.close()
await server.wait_closed()
print(f'[{pidx}] terminated')
if __name__ == '__main__':
aiotools.start_server(myworker, num_workers=4)
It handles SIGINT/SIGTERM signals automatically to stop the server,
as well as lifecycle management of event loops running on multiple processes.
Internally it uses aiotools.fork
module to get kernel support to resolve
potential signal/PID related races via PID file descriptors on supported versions
(Python 3.9+ and Linux kernel 5.4+).
Async TaskGroup
A TaskGroup
object manages the lifecycle of sub-tasks spawned via its create_task()
method by guarding them with an async context manager which exits only when all sub-tasks
are either completed or cancelled.
This is motivated from trio's nursery API
and a draft implementation is adopted from EdgeDB's Python client library.
import aiotools
async def do():
async with aiotools.TaskGroup() as tg:
tg.create_task(...)
tg.create_task(...)
...
Async Timer
import aiotools
i = 0
async def mytick(interval):
print(i)
i += 1
async def somewhere():
t = aiotools.create_timer(mytick, 1.0)
...
t.cancel()
await t
t
is an asyncio.Task
object.
To stop the timer, call t.cancel(); await t
.
Please don't forget await
-ing t
because it requires extra steps to
cancel and await all pending tasks.
To make your timer function to be cancellable, add a try-except clause
catching asyncio.CancelledError
since we use it as a termination
signal.
You may add TimerDelayPolicy
argument to control the behavior when the
timer-fired task takes longer than the timer interval.
DEFAULT
is to accumulate them and cancel all the remainings at once when
the timer is cancelled.
CANCEL
is to cancel any pending previously fired tasks on every interval.
import asyncio
import aiotools
async def mytick(interval):
await asyncio.sleep(100)
async def somewhere():
t = aiotools.create_timer(mytick, 1.0, aiotools.TimerDelayPolicy.CANCEL)
...
t.cancel()
await t
Virtual Clock
It provides a virtual clock that advances the event loop time instantly upon
any combination of asyncio.sleep()
calls in multiple coroutine tasks,
by temporarily patching the event loop selector.
This is also used in our timer test suite.
import aiotools
import pytest
@pytest.mark.asyncio
async def test_sleeps():
loop = aiotools.compat.get_running_loop()
vclock = aiotools.VirtualClock()
with vclock.patch_loop():
print(loop.time())
await asyncio.sleep(3600)
print(loop.time())
Changelog
1.8.2 (2025-01-23)
Features
- Update and modernize the type annotations of the
taskgroup
module for Python 3.9 or later (#73)
Fixes
- Track task references explicitly and discard them after their completion in taskgroups (#60)
1.8.1 (2025-01-22)
Fixes
- Fix static type checker's recognition of
aiotools.taskgroup
public exports, such as aiotools.PersistentTaskGroup
(#72)
1.8.0 (2025-01-18)
Breaking changes
- Dropped the support for Python 3.8 as it's end-of-life.
Features
- Add support for Python 3.12 and 3.13.
- Add
aiotools.context.resetting()
as a sync/async context manager to auto-reset the given context variable (#62) - Add type checker support - now includes py.typed in the package to indicate to type checkers like mypy that typing is supported. (#63)
1.7.0 (2023-08-25)
Breaking changes
- Dropped the support for Python 3.7 as it's end-of-life.
Fixes
- Correct the type annotation of the callback argument in
create_timer()
(#61)
1.6.1 (2023-05-02)
Fixes
- PersistentTaskGroup no longer stores the history of unhandled exceptions and raises them as an exception group to prevent memory leaks (#54)
1.6.0 (2023-03-14)
Features
- Add
as_completed_safe()
which enhances asyncio.as_completed()
using PersistentTaskGroup
(#52)
1.5.9 (2022-04-26)
Fixes
- Improve checks for pidfd availability to avoid corner cases that may fail on Linux kernel 5.1 and 5.2 where
signal.pidfd_send_signal()
is available but os.pidfd_open()
is not (#51)
1.5.8 (2022-04-25)
Fixes
- Explicitly attach the event loop to the
PidfdChildWatcher
when first initialized (#50)
1.5.7 (2022-04-12)
Fixes
- Fix regression of the default imports in macOS by removing the unused code that caused the misleading fix in #47 (#49)
1.5.6 (2022-04-11)
Features
- Add the
closing_async()
async context manager, in addition to aclosing()
(#48)
Fixes
- Allow importing aiotools on Windows platforms, removing incompatible modules from the default
__all__
import list (#47)
1.5.5 (2022-03-22)
Features
- Add
wait_timeout
option to start_server()
(#46)
Fixes
- Resolve singal races by minimizing expose of event loop in
afork()
-ed child processes (#46)
Miscellaneous
- Now the CI runs with Python 3.11a6 or later, with stdlib support of
asyncio.TaskGroup
(#45)
1.5.4 (2022-03-10)
Features
- Propagate task results and exceptions via separate future instances if they are
await
-ed by the caller of create_task()
in PersistentTaskGroup
, in addition to invocation of task group exception handler. Note that await
-ing those futures hangs indefinitely in Python 3.6 but we don't fix it since Python 3.6 is EoL as of December 2021. (#44)
1.5.3 (2022-03-07)
Fixes
- Fix feature detection for
ExceptionGroup
and let MultiError
inherit ExceptionGroup
instead of BaseExceptionGroup
(#42)
1.5.2 (2022-03-06)
Fixes
- Restore the default export of
MultiError
for backward compatibility (#40) - Set
current_ptaskgroup
only when PersistentTaskGroup
is used via the async with
statement. (#41)
1.5.1 (2022-03-06)
Fixes
- Fix missing naming support of
TaskGroup
in Python 3.11 (#39)
1.5.0 (2022-03-06)
Features
- Add support for Python 3.9's
msg
argument to Task.cancel()
. (#32) - Fix "unexpected cancel" bug in
TaskGroup
. (#35) - Rewrite PersistentTaskGroup to use Python 3.11's latest additions such as
Task.uncancel()
and Task.cancelling()
while still supporting older Python versions (#36) - Add
PersistentTaskGroup.all()
to enumerate all non-terminated persistent task groups (#38)
1.4.0 (2022-01-10)
Features
- ptaskgroup: Implement
PersistentTaskGroup
(#30) - server: Expose
process_index
context variable for worker processes (#31)
1.3.0 (2021-12-19)
Fixes
- Add support for Python 3.10. (#28)
Documentation Changes
- Fix documentation builds on Python 3.10 and Sphinx 4.x, by removing the 3rd-party autodoc-typehints extension and custom stylesheet overrides. (#28)
1.2.2 (2021-06-07)
Fixes
- fork: Handle children's segfault (core-dump with signals) explicitly in
PidfdChildProcess
(#27)
1.2.1 (2021-01-12)
Fixes
- Avoid side effects of custom
clone()
function and resorts back to the combinatino of os.fork()
and os.pidfd_open()
for now (#25)
1.2.0 (2021-01-12)
Breaking Changes
- server: The
use_threading
argument for start_server()
is completely deprecated. (#23)
Features
- Now the primary target is Python 3.9, though we still support from Python 3.6 (#22)
- fork: Add a new module
fork
to support PID file descriptors in Linux 5.4+ and a POSIX-compatible fallback to asynchornously fork the Python process without signal/PID races. (#22) - server: Completely rewrote the module using the new
fork
module with handling of various edge cases such as async failures of sibiling child processes (#23)
1.1.1 (2020-12-16)
Fixes
- Fix a potential memory leak with
TaskGroup
when it's used for long-lived asyncio tasks. (#21)
1.1.0 (2020-10-18)
Features
- Add a
current_taskgroup
context-variable to the taskgroup module (only available for Python 3.7 or later)
Fixes
- Fix missing auto-import of
taskgroup
module exports in the aiotools
root package.
1.0.0 (2020-10-18)
Features
- Adopt an implementation of the taskgroup API as
aiotools.taskgroup
from EdgeDB (#18) - Add
timer.VirtualClock
which provides a virtual clock that makes a block of asyncio codes using asyncio.sleep()
to complete instantly and deterministically (#19)
Miscellaneous
- Adopt towncrier for changelog management (#15)
- Migrate to GitHub Actions for CI (#19)
0.9.1 (2020-02-25)
- A maintenance release to fix up the
defer
module exports in the aiotools
namespace.
0.9.0 (2020-02-25)
- defer: A new module that emulates Golang's
defer()
API with asyncio awareness.
0.8.5 (2019-11-19)
- server: Rewrite internals of the worker main functions to use native
async with
instead of manually unrolling __aenter__()
and __aexit__()
dunder methods, to keep
the code simple and avoid potential bugs.
0.8.4 (2019-11-18)
- Python 3.8 is now officially supported.
- server: Fix errors when
multiprocessing.set_start_method("spawn")
is used.
- NOTE: This is now the default for macOS since Python 3.8.
- KNOWN ISSUE: #12
- Remove some packaging hacks in
__init__.py
and let setuptools read the version
from a separate aiotools/VERSION
text file.
0.8.3 (2019-10-07)
- context: Fix
aclosing()
's __aexit__()
exception arguments.
0.8.2 (2019-08-28)
- context, server: Catch asyncio.CancelledError along with BaseException to
make the cancellation behavior consistent in Python 3.6, 3.7, and 3.8.
0.8.1 (2019-02-24)
- server: Fix yields of the received stop signal in main/worker context managers
when using threaded workers.
0.8.0 (2018-11-18)
- server: Updated stop signal handling and now user-defined worker/main context
managers have a way to distinguish the stop signal received. See the updated
docs for more details.
0.7.3 (2018-10-16)
- This ia a technical release to fix a test case preventing the automated CI
release procedure.
0.7.2 (2018-10-16)
- Improve support for Python 3.6/3.7 using a small compatibility module against asyncio.
- func: Add
expire_after
option to lru_cache()
function.
0.7.1 (2018-08-24)
- Minor updates to the documentation
0.7.0 (2018-08-24)
- Add support for Python 3.7
- context: Updated to work like Python 3.7
- context: Deprecated
AsyncContextDecorator
stuffs in Python 3.7+ - context: Added an alias to
contextlib.AsyncExitStack
in the standard library.
0.6.0 (2018-04-10)
- Introduce a new module
aiotools.iter
with aiter()
function which
corresponds to an async version of the builtin iter()
.
0.5.4 (2018-02-01)
- server: Remove use of unncessary setpgrp syscall, which is also blocked by
Docker's default seccomp profile!
0.5.3 (2018-01-12)
- server: Ooops! (a finally block should have been an else block)
0.5.2 (2018-01-12)
- server: Improve inner beauty (code readability)
- server: Improve reliability and portability of worker-to-main interrupts
0.5.1 (2018-01-11)
- server: Fix a race condition related to handling of worker
initialization errors with multiple workers
0.5.0 (2017-11-08)
- func: Add
lru_cache()
which is a coroutine version of
functools.lru_cache()
0.4.5 (2017-10-14)
- server: Fix a race condition related to signal handling in the
multiprocessing module during termination
- server: Improve error handling during initialization of workers
(automatic shutdown of other workers and the main loop after
logging the exception)
0.4.4 (2017-09-12)
- Add a new module
aiotools.func
with apartial()
function which is an
async version of functools.partial()
in the standard library
0.4.3 (2017-08-06)
- Add
aclosing()
context manager like closing()
in the standard library - Speed up Travis CI builds for packaging
- Now provide README in rst as well as CHANGES (this file)
0.4.2 (2017-08-01)
server
: Fix spawning subprocesses in child workers- Add support for
uvloop
0.4.0 (2017-08-01)
- Add
use_threading
argument to - Add initial documentation (which currently not served
on readthedocs.io due to Python version problem)
0.3.2 (2017-07-31)
- Add
extra_procs
argument to start_server()
function - Add socket and ZeroMQ server examples
- Improve CI configs
0.3.1 (2017-07-26)
- Improve CI scripts
- Adopt editorconfig
0.3.0 (2017-04-26)
- Add
start_server()
function using multiprocessing
with automatic children lifecycle management - Clarify the semantics of
AsyncContextGroup
using
asyncio.gather()
with return_exceptions=True
0.2.0 (2017-04-20)
- Add abstract types for
AsyncContextManager
- Rename
AsyncGenContextManager
to AsyncContextManager
- Add
AsyncContextGroup
0.1.1 (2017-04-14)