Idiomatic asyncio utilities
NOTE: This project is in an early stage of development. The public APIs may break from version to version.
I also recommend trying the following asyncio libraries for a happier life.
This is an asynchronous version of contextlib.contextmanager that makes it easier to write asynchronous context managers without creating boilerplate classes.
```python
import asyncio
import aiotools

@aiotools.actxmgr
async def mygen(a):
    await asyncio.sleep(1)
    yield a + 1
    await asyncio.sleep(1)

async def somewhere():
    async with mygen(1) as b:
        assert b == 2
```
Note that you need to wrap yield with a try-finally block to ensure resources are released (e.g., locks), even when an exception occurs inside the async-with block.
```python
import asyncio
import aiotools

lock = asyncio.Lock()

@aiotools.actxmgr
async def mygen(a):
    await lock.acquire()
    try:
        yield a + 1
    finally:
        lock.release()

async def somewhere():
    try:
        async with mygen(1) as b:
            raise RuntimeError('oops')
    except RuntimeError:
        print('caught!')  # you can catch exceptions here
```
You can also create a group of async context managers, which are entered/exited all at once using asyncio.gather().
```python
import asyncio
import aiotools

@aiotools.actxmgr
async def mygen(a):
    yield a + 10

async def somewhere():
    ctxgrp = aiotools.actxgroup(mygen(i) for i in range(10))
    async with ctxgrp as values:
        assert len(values) == 10
        for i in range(10):
            assert values[i] == i + 10
```
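The grouped entry/exit can be approximated with the standard library alone. The following is a simplified sketch (not aiotools' actual implementation) that concurrently enters several contextlib.asynccontextmanager instances via asyncio.gather():

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource(i):
    # pretend to acquire something asynchronously
    await asyncio.sleep(0)
    yield i + 10

async def main():
    cms = [resource(i) for i in range(3)]
    # enter all context managers at once, like actxgroup does
    values = await asyncio.gather(*(cm.__aenter__() for cm in cms))
    try:
        print(values)  # [10, 11, 12]
    finally:
        # exit them all at once as well
        await asyncio.gather(*(cm.__aexit__(None, None, None) for cm in cms))

asyncio.run(main())
```

asyncio.gather() preserves the order of its arguments in its result list, which is why the yielded values line up with the generator expression that created the context managers.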
This implements a common pattern to launch asyncio-based server daemons.
```python
import asyncio
import aiotools

async def echo(reader, writer):
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()

@aiotools.server
async def myworker(loop, pidx, args):
    server = await asyncio.start_server(echo, '0.0.0.0', 8888,
                                        reuse_port=True, loop=loop)
    print(f'[{pidx}] started')
    yield  # wait until terminated
    server.close()
    await server.wait_closed()
    print(f'[{pidx}] terminated')

if __name__ == '__main__':
    # Run the above server using 4 worker processes.
    aiotools.start_server(myworker, num_workers=4)
```
It automatically handles SIGINT/SIGTERM signals to stop the server, and it manages the lifecycle of the event loops running in multiple worker processes.
Internally it uses the aiotools.fork module to get kernel support for resolving potential signal/PID-related races via PID file descriptors on supported versions (Python 3.9+ and Linux kernel 5.4+).
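The automatic signal handling is conceptually similar to installing your own handlers on the event loop. Here is a minimal, stdlib-only sketch of that idea (not aiotools' actual code); it simulates a SIGTERM arriving shortly after startup:

```python
import asyncio
import os
import signal

async def serve_until_signal():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # stop the "server" when either SIGINT or SIGTERM arrives
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
    # simulate a signal being delivered shortly after startup
    loop.call_later(0.1, os.kill, os.getpid(), signal.SIGTERM)
    await stop.wait()
    return 'terminated'

print(asyncio.run(serve_until_signal()))  # terminated
```

loop.add_signal_handler() is Unix-only; aiotools layers the multi-process worker lifecycle on top of this kind of mechanism.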
A TaskGroup
object manages the lifecycle of sub-tasks spawned via its create_task()
method by guarding them with an async context manager which exits only when all sub-tasks
are either completed or cancelled.
This is motivated by trio's nursery API, and a draft implementation is adopted from EdgeDB's Python client library.
```python
import aiotools

async def do():
    async with aiotools.TaskGroup() as tg:
        tg.create_task(...)
        tg.create_task(...)
        ...
    # at this point, all subtasks are either cancelled or done.
```
```python
import aiotools

i = 0

async def mytick(interval):
    global i  # 'i' is rebound below, so it must be declared global
    print(i)
    i += 1

async def somewhere():
    t = aiotools.create_timer(mytick, 1.0)
    ...
    t.cancel()
    await t
```
t is an asyncio.Task object.
To stop the timer, call t.cancel() and then await t.
Don't forget to await t, because cancelling the timer requires extra steps to cancel and await all pending tasks.
To make your timer function cancellable, add a try-except clause catching asyncio.CancelledError, since we use it as the termination signal.
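The cancellation pattern looks like this in plain asyncio; this is a sketch of the idea behind create_timer(), not its real implementation:

```python
import asyncio

async def ticker(interval, ticks):
    try:
        while True:
            ticks.append('tick')
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        # cancellation is the termination signal;
        # do final cleanup here before returning
        ticks.append('cleanup')

async def main():
    ticks = []
    t = asyncio.create_task(ticker(0.01, ticks))
    await asyncio.sleep(0.035)
    t.cancel()
    await t  # awaiting lets the cleanup branch finish
    return ticks

print(asyncio.run(main()))
```

Because the CancelledError is caught inside ticker(), awaiting the cancelled task returns normally instead of re-raising.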
You may pass a TimerDelayPolicy argument to control the behavior when a timer-fired task takes longer than the timer interval.
DEFAULT accumulates such tasks and cancels all the remaining ones at once when the timer is cancelled.
CANCEL cancels any pending previously fired tasks on every interval.
```python
import asyncio
import aiotools

async def mytick(interval):
    await asyncio.sleep(100)  # cancelled on every next interval

async def somewhere():
    t = aiotools.create_timer(mytick, 1.0, aiotools.TimerDelayPolicy.CANCEL)
    ...
    t.cancel()
    await t
```
It provides a virtual clock that advances the event loop time instantly upon
any combination of asyncio.sleep()
calls in multiple coroutine tasks,
by temporarily patching the event loop selector.
This is also used in our timer test suite.
```python
import asyncio

import aiotools
import pytest

@pytest.mark.asyncio
async def test_sleeps():
    loop = aiotools.compat.get_running_loop()
    vclock = aiotools.VirtualClock()
    with vclock.patch_loop():
        print(loop.time())  # -> prints 0
        await asyncio.sleep(3600)
        print(loop.time())  # -> prints 3600
```
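One way such a virtual clock can work is a simplified, stdlib-only sketch like the following (aiotools' actual VirtualClock differs): override the loop's time source and make the selector return immediately while advancing the fake time by the requested timeout. Note that loop._selector is a private attribute, used here purely for illustration.

```python
import asyncio
import contextlib
import time

@contextlib.contextmanager
def patch_virtual_clock(loop):
    vtime = 0.0
    orig_select = loop._selector.select  # private API: illustration only
    orig_time = loop.time

    def fast_select(timeout=None):
        nonlocal vtime
        if timeout is not None and timeout > 0:
            vtime += timeout      # pretend the wait actually happened
        return orig_select(0)     # poll without blocking

    loop._selector.select = fast_select
    loop.time = lambda: vtime
    try:
        yield
    finally:
        loop._selector.select = orig_select
        loop.time = orig_time

async def main():
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    with patch_virtual_clock(loop):
        await asyncio.sleep(3600)  # returns almost immediately
    return time.monotonic() - start

print(asyncio.run(main()) < 1.0)  # True
```

This works because asyncio computes timer deadlines with loop.time() and passes the remaining delay as the selector timeout, so jumping the fake clock forward by that timeout makes the scheduled callback due on the very next loop iteration.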
- create_timer() (#61)
- as_completed_safe() which enhances asyncio.as_completed() using PersistentTaskGroup (#52)
- signal.pidfd_send_signal() is available but os.pidfd_open() is not (#51)
- PidfdChildWatcher when first initialized (#50)
- closing_async() async context manager, in addition to aclosing() (#48)
- __all__ import list (#47)
- wait_timeout option to start_server() (#46)
- afork()-ed child processes (#46)
- asyncio.TaskGroup (#45)
- await-ed by the caller of create_task() in PersistentTaskGroup, in addition to invocation of the task group exception handler. Note that await-ing those futures hangs indefinitely in Python 3.6, but we don't fix it since Python 3.6 is EoL as of December 2021. (#44)
- ExceptionGroup, and let MultiError inherit ExceptionGroup instead of BaseExceptionGroup (#42)
- MultiError for backward compatibility (#40)
- current_ptaskgroup only when PersistentTaskGroup is used via the async with statement. (#41)
- TaskGroup in Python 3.11 (#39)
- msg argument to Task.cancel(). (#32)
- TaskGroup. (#35)
- Task.uncancel() and Task.cancelling() while still supporting older Python versions (#36)
- PersistentTaskGroup.all() to enumerate all non-terminated persistent task groups (#38)
- PersistentTaskGroup (#30)
- process_index context variable for worker processes (#31)
- PidfdChildProcess (#27)
- clone() function and resorts back to the combination of os.fork() and os.pidfd_open() for now (#25)
- use_threading argument for start_server() is completely deprecated. (#23)
- fork to support PID file descriptors in Linux 5.4+ and a POSIX-compatible fallback to asynchronously fork the Python process without signal/PID races. (#22)
- fork module with handling of various edge cases such as async failures of sibling child processes (#23)
- TaskGroup when it's used for long-lived asyncio tasks. (#21)
- current_taskgroup context-variable to the taskgroup module (only available for Python 3.7 or later)
- taskgroup module exports in the aiotools root package.
- aiotools.taskgroup from EdgeDB (#18)
- timer.VirtualClock which provides a virtual clock that makes a block of asyncio code using asyncio.sleep() complete instantly and deterministically (#19)
- defer module exports in the aiotools namespace.
- defer() API with asyncio awareness.
- async with instead of manually unrolling __aenter__() and __aexit__() dunder methods, to keep the code simple and avoid potential bugs.
- multiprocessing.set_start_method("spawn") is used.
- __init__.py and let setuptools read the version from a separate aiotools/VERSION text file.
- aclosing()'s __aexit__() exception arguments.
- expire_after option to lru_cache() function.
- AsyncContextDecorator stuff in Python 3.7+
- contextlib.AsyncExitStack in the standard library.
- aiotools.iter with aiter() function which corresponds to an async version of the builtin iter().
- lru_cache() which is a coroutine version of functools.lru_cache()
- aiotools.func with apartial() function which is an async version of functools.partial() in the standard library
- aclosing() context manager like closing() in the standard library
- server: Fix spawning subprocesses in child workers
- uvloop
- use_threading argument to
- extra_procs argument to start_server() function
- start_server() function using multiprocessing with automatic children lifecycle management
- AsyncContextGroup using asyncio.gather() with return_exceptions=True
- AsyncContextManager
- AsyncGenContextManager to AsyncContextManager
- AsyncContextGroup