pyleak

Detect leaked asyncio tasks, threads, and event loop blocking in Python. Inspired by Go's goleak.
Installation
pip install pyleak
Quick Start
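import asyncio
import threading
import time

from pyleak import no_task_leaks, no_thread_leaks, no_event_loop_blocking

# Detect leaked asyncio tasks
async def main():
    async with no_task_leaks():
        asyncio.create_task(asyncio.sleep(10))  # never awaited: leaked
        await asyncio.sleep(0.1)

# Detect leaked threads
def sync_main():
    with no_thread_leaks():
        threading.Thread(target=lambda: time.sleep(10)).start()  # never joined: leaked

# Detect event loop blocking
async def async_main():
    with no_event_loop_blocking():
        time.sleep(0.5)  # synchronous sleep blocks the event loop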
Usage
Context Managers
All detectors can be used as context managers:
async with no_task_leaks():
    pass

with no_thread_leaks():
    pass

async def main():
    with no_event_loop_blocking():
        pass
Decorators
All detectors can also be used as decorators:
@no_task_leaks()
async def my_async_function():
    pass

@no_thread_leaks()
def my_threaded_function():
    pass

@no_event_loop_blocking()
async def my_potentially_blocking_function():
    pass
Get stack trace
From leaked asyncio tasks
When using no_task_leaks, you get detailed stack trace information showing where leaked tasks are currently executing and, optionally, where they were created.
import asyncio

from pyleak import TaskLeakError, no_task_leaks

async def leaky_function():
    async def background_task():
        print("background task started")
        await asyncio.sleep(10)

    print("creating a long running task")
    asyncio.create_task(background_task())

async def main():
    try:
        async with no_task_leaks(action="raise"):
            await leaky_function()
    except TaskLeakError as e:
        print(e)

if __name__ == "__main__":
    asyncio.run(main())
Output:
creating a long running task
background task started
Detected 1 leaked asyncio tasks
Leaked Task: Task-2
ID: 4345977088
State: TaskState.RUNNING
Current Stack:
File "/tmp/example.py", line 9, in background_task
await asyncio.sleep(10)
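A similar "current stack" report can be approximated with the standard library alone. The sketch below is only an illustration of the idea, not pyleak's implementation: it uses `asyncio.Task.get_stack()` to find the frame where a pending task is suspended, and the helper name `describe_task` is hypothetical.

```python
import asyncio
import traceback


async def background_task():
    await asyncio.sleep(10)


def describe_task(task: asyncio.Task) -> str:
    """Format the frame(s) where a pending task is currently suspended."""
    header = f"Task: {task.get_name()}\n"
    # For a suspended coroutine, get_stack() returns a single frame.
    body = "".join(
        "".join(traceback.format_stack(frame)) for frame in task.get_stack()
    )
    return header + body


async def demo() -> str:
    task = asyncio.create_task(background_task(), name="leaky")
    await asyncio.sleep(0)  # let the task run up to its first await
    report = describe_task(task)
    # Clean up so the demo itself does not leak the task.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return report


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The report names the coroutine function and the line it is waiting on, which is usually enough to locate a forgotten task.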
Include creation stack trace
You can also include the creation stack trace by passing enable_creation_tracking=True to no_task_leaks.
async def main():
    try:
        async with no_task_leaks(action="raise", enable_creation_tracking=True):
            await leaky_function()
    except TaskLeakError as e:
        print(e)
Output:
creating a long running task
background task started
Detected 1 leaked asyncio tasks
Leaked Task: Task-2
ID: 4392245504
State: TaskState.RUNNING
Current Stack:
File "/tmp/example.py", line 9, in background_task
await asyncio.sleep(10)
Creation Stack:
File "/tmp/example.py", line 24, in <module>
asyncio.run(main())
File "/opt/homebrew/.../asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/.../asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/.../asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/.../asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/.../asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/.../asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/tmp/example.py", line 18, in main
await leaky_function()
File "/tmp/example.py", line 12, in leaky_function
asyncio.create_task(background_task())
TaskLeakError has a leaked_tasks attribute that contains a list of LeakedTask objects, including the stack trace details.
Note: enable_creation_tracking monkey patches asyncio.create_task to record the creation stack trace. Avoid enabling it in production, where the patch could have unintended side effects.
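To make the note above concrete, here is a minimal sketch of how patching `asyncio.create_task` can capture a creation stack. It is an assumption-laden illustration rather than pyleak's actual code: `track_task_creation` and `creation_stacks` are hypothetical names, and the patch is undone as soon as the block exits.

```python
import asyncio
import traceback
import weakref
from contextlib import contextmanager

# Hypothetical registry: task -> stack frames recorded at creation time.
creation_stacks = weakref.WeakKeyDictionary()


@contextmanager
def track_task_creation():
    """Temporarily replace asyncio.create_task with a recording wrapper."""
    original = asyncio.create_task

    def create_task_with_stack(coro, **kwargs):
        task = original(coro, **kwargs)
        # Drop the last entry, which is this wrapper itself.
        creation_stacks[task] = traceback.extract_stack()[:-1]
        return task

    asyncio.create_task = create_task_with_stack
    try:
        yield
    finally:
        asyncio.create_task = original  # always undo the patch


async def spawn():
    return asyncio.create_task(asyncio.sleep(0), name="tracked")


async def demo():
    with track_task_creation():
        task = await spawn()
    await task
    return [frame.name for frame in creation_stacks[task]]


if __name__ == "__main__":
    print(asyncio.run(demo())[-2:])
```

A patch like this only affects callers that look up `asyncio.create_task` at call time; code that did `from asyncio import create_task` before the patch keeps the original function. That kind of global side effect is one reason to keep such tracking out of production.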
From event loop blocks
When using no_event_loop_blocking, you get detailed stack trace information showing exactly which code was executing while the event loop was blocked.
import asyncio
import time

from pyleak import EventLoopBlockError, no_event_loop_blocking

async def some_function_with_blocking_code():
    print("starting")
    time.sleep(1)
    print("done")

async def main():
    try:
        async with no_event_loop_blocking(action="raise"):
            await some_function_with_blocking_code()
    except EventLoopBlockError as e:
        print(e)

if __name__ == "__main__":
    asyncio.run(main())
Output:
starting
done
Detected 1 event loop blocks
Event Loop Block: block-1
Duration: 0.605s (threshold: 0.200s)
Timestamp: 1749051796.302
Blocking Stack:
File "/private/tmp/example.py", line 22, in <module>
asyncio.run(main())
File "/opt/homebrew/.../asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/.../asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/.../asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/.../asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/.../asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/.../asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 16, in main
await some_function_with_blocking_code()
File "/private/tmp/example.py", line 9, in some_function_with_blocking_code
time.sleep(1)
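One common way to detect this kind of blocking is a watchdog thread that pings the loop and measures how long the ping takes to be answered. The sketch below shows that general technique under stated assumptions; it is not taken from pyleak's source, and `BlockWatchdog` with its `threshold` and `check_interval` parameters is a hypothetical class that merely mirrors the option names used in this README.

```python
import asyncio
import threading
import time


class BlockWatchdog:
    """Record how long the event loop takes to answer pings from a thread."""

    def __init__(self, loop, threshold=0.1, check_interval=0.01):
        self.loop = loop
        self.threshold = threshold
        self.check_interval = check_interval
        self.blocks = []  # delays (seconds) that exceeded the threshold
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._watch, daemon=True)

    def _watch(self):
        while not self._stop.is_set():
            answered = threading.Event()
            sent_at = time.monotonic()
            # The loop can only run this callback when it is not blocked.
            self.loop.call_soon_threadsafe(answered.set)
            while not answered.wait(self.check_interval):
                if self._stop.is_set():
                    break  # shutting down: still measure the unanswered ping
            delay = time.monotonic() - sent_at
            if delay > self.threshold:
                self.blocks.append(delay)
            self._stop.wait(self.check_interval)

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc_info):
        self._stop.set()
        self._thread.join()
        return False


async def demo():
    with BlockWatchdog(asyncio.get_running_loop(), threshold=0.1) as watchdog:
        await asyncio.sleep(0.05)  # cooperative: pings are answered promptly
        time.sleep(0.3)            # blocking: the pending ping goes unanswered
    return watchdog.blocks


if __name__ == "__main__":
    print([f"{delay:.3f}s" for delay in asyncio.run(demo())])
```

In this sketch a smaller `check_interval` notices blocks sooner at the cost of more wake-ups, while `threshold` decides which delays are worth reporting.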
Actions
Control what happens when leaks/blocking are detected:
Action | AsyncIO Tasks | Threads | Event Loop Blocking |
"warn" (default) | Issues ResourceWarning | Issues ResourceWarning | Issues ResourceWarning |
"log" | Writes to logger | Writes to logger | Writes to logger |
"cancel" | Cancels leaked tasks | Warns (can't force-stop) | Warns (can't cancel) |
"raise" | Raises TaskLeakError | Raises ThreadLeakError | Raises EventLoopBlockError |
async with no_task_leaks(action="cancel"):
    pass

with no_thread_leaks(action="raise"):
    pass

with no_event_loop_blocking(action="log"):
    pass
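Because the default "warn" action issues a ResourceWarning, it helps to remember that Python's default warning filters ignore that category, so the warning can pass unnoticed unless it is captured or escalated. The sketch below uses only the standard `warnings` module; `emit_leak_warning` is a hypothetical stand-in for a detector configured with action="warn", and its message text is illustrative.

```python
import warnings


def emit_leak_warning():
    # Hypothetical stand-in for a detector running with action="warn".
    warnings.warn("Detected 1 leaked asyncio tasks", ResourceWarning, stacklevel=2)


def collect_resource_warnings(func):
    """Run func and return the ResourceWarning messages it issued."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always", ResourceWarning)
        func()
    return [
        str(item.message)
        for item in caught
        if issubclass(item.category, ResourceWarning)
    ]


def raises_when_escalated(func) -> bool:
    """Return True if func's ResourceWarning becomes an exception."""
    with warnings.catch_warnings():
        warnings.simplefilter("error", ResourceWarning)
        try:
            func()
        except ResourceWarning:
            return True
    return False


if __name__ == "__main__":
    print(collect_resource_warnings(emit_leak_warning))
    print(raises_when_escalated(emit_leak_warning))
```

Escalating the warning this way gives roughly the same effect as action="raise", except that the exception type is ResourceWarning rather than one of the pyleak error classes.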
Name Filtering
Filter detection by resource names (tasks and threads only):
import re

# Match a name exactly
async with no_task_leaks(name_filter="background-worker"):
    pass

with no_thread_leaks(name_filter="worker-thread"):
    pass

# Match names with a regular expression
async with no_task_leaks(name_filter=re.compile(r"worker-\d+")):
    pass

with no_thread_leaks(name_filter=re.compile(r"background-.*")):
    pass
Note: Event loop blocking detection doesn't support name filtering.
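As a rough mental model, a name filter that accepts either a string or a compiled pattern could be applied as in the sketch below. The matching rules here are assumptions made for illustration (exact equality for strings, `Pattern.match` for regular expressions) and may differ from what pyleak does; `name_matches` is a hypothetical helper.

```python
import asyncio
import re
from typing import Optional, Union

NameFilter = Optional[Union[str, re.Pattern]]


def name_matches(name: str, name_filter: NameFilter) -> bool:
    """Return True if a task or thread name passes the filter."""
    if name_filter is None:
        return True  # no filter: every name is considered
    if isinstance(name_filter, re.Pattern):
        # Assumption: patterns are matched from the start of the name.
        return name_filter.match(name) is not None
    # Assumption: plain strings must match the whole name exactly.
    return name == name_filter


async def demo():
    tasks = [
        asyncio.create_task(asyncio.sleep(0.01), name=name)
        for name in ("worker-1", "worker-22", "heartbeat")
    ]
    pattern = re.compile(r"worker-\d+")
    selected = sorted(
        task.get_name()
        for task in asyncio.all_tasks()
        if name_matches(task.get_name(), pattern)
    )
    await asyncio.gather(*tasks)
    return selected


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Naming tasks and threads explicitly, for example with `asyncio.create_task(..., name=...)`, is what makes this kind of filtering practical.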
Configuration Options
AsyncIO Tasks
no_task_leaks(
    action="warn",
    name_filter=None,
    logger=None,
)
Threads
no_thread_leaks(
    action="warn",
    name_filter=None,
    logger=None,
    exclude_daemon=True,
)
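The effect of an option like exclude_daemon can be pictured with a snapshot-and-diff over `threading.enumerate()`. This is a sketch of the general idea with a hypothetical helper, `find_leaked_threads`, and is not a description of pyleak's internals.

```python
import threading


def find_leaked_threads(before, exclude_daemon=True):
    """Return threads that are alive now but were not in the snapshot."""
    leaked = []
    for thread in threading.enumerate():
        if thread in before or not thread.is_alive():
            continue
        if exclude_daemon and thread.daemon:
            continue  # daemon threads do not block interpreter exit
        leaked.append(thread)
    return leaked


def demo():
    before = set(threading.enumerate())
    stop = threading.Event()
    worker = threading.Thread(target=stop.wait, name="worker-thread")
    daemon = threading.Thread(target=stop.wait, name="daemon-thread", daemon=True)
    worker.start()
    daemon.start()

    without_daemons = [t.name for t in find_leaked_threads(before)]
    with_daemons = sorted(
        t.name for t in find_leaked_threads(before, exclude_daemon=False)
    )

    # Release and join both threads so the demo itself leaks nothing.
    stop.set()
    worker.join()
    daemon.join()
    return without_daemons, with_daemons


if __name__ == "__main__":
    print(demo())
```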
Event Loop Blocking
no_event_loop_blocking(
    action="warn",
    logger=None,
    threshold=0.1,
    check_interval=0.01,
)
Testing
Perfect for catching issues in tests:
import pytest

from pyleak import no_task_leaks, no_thread_leaks, no_event_loop_blocking

@pytest.mark.asyncio
async def test_no_leaked_tasks():
    async with no_task_leaks(action="raise"):
        await my_async_function()

def test_no_leaked_threads():
    with no_thread_leaks(action="raise"):
        my_threaded_function()

@pytest.mark.asyncio
async def test_no_event_loop_blocking():
    with no_event_loop_blocking(action="raise", threshold=0.1):
        await my_potentially_blocking_function()
Real-World Examples
Detecting Synchronous HTTP Calls in Async Code
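import httpx
from starlette.testclient import TestClient

from pyleak import no_event_loop_blocking

# `app` is the ASGI application under test, defined elsewhere.
async def test_sync_vs_async_http():
    # Synchronous client call inside async code: blocks the event loop
    with no_event_loop_blocking(action="warn"):
        response = TestClient(app).get("/endpoint")

    # Async client call: does not block the event loop
    with no_event_loop_blocking(action="warn"):
        async with httpx.AsyncClient() as client:
            response = await client.get("/endpoint")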
Ensuring Proper Resource Cleanup
async def test_background_task_cleanup():
    async with no_task_leaks(action="raise"):
        # Leaked: this task is never awaited or cancelled
        asyncio.create_task(long_running_task())

        # Cleaned up: this task is cancelled and then awaited
        task = asyncio.create_task(long_running_task())
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
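To see why only the first task in the test above counts as leaked, it can help to sketch leak detection as a before/after comparison of `asyncio.all_tasks()`. The following is a simplified illustration under that assumption, not pyleak's implementation; `collect_new_tasks` is a hypothetical helper.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def collect_new_tasks(leaked: list):
    """Append tasks that were created inside the block and are still pending."""
    before = asyncio.all_tasks()  # snapshot of unfinished tasks on entry
    try:
        yield
    finally:
        # all_tasks() only returns unfinished tasks, so finished or
        # cancelled-and-awaited tasks drop out of the difference.
        leaked.extend(asyncio.all_tasks() - before)


async def long_running_task():
    await asyncio.sleep(10)


async def demo():
    leaked = []
    async with collect_new_tasks(leaked):
        asyncio.create_task(long_running_task(), name="forgotten")

        cleaned = asyncio.create_task(long_running_task(), name="cleaned-up")
        cleaned.cancel()
        try:
            await cleaned
        except asyncio.CancelledError:
            pass

    names = [task.get_name() for task in leaked]
    # Cancel what was found so the demo exits cleanly.
    for task in leaked:
        task.cancel()
    await asyncio.gather(*leaked, return_exceptions=True)
    return names


if __name__ == "__main__":
    print(asyncio.run(demo()))
```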
Debugging complex task leaks
import asyncio
import random
import re

from pyleak import TaskLeakError, no_task_leaks

async def debug_task_leaks():
    """Example showing how to debug complex task leaks."""

    async def worker(worker_id: int, sleep_time: int):
        print(f"Worker {worker_id} starting")
        await asyncio.sleep(sleep_time)
        print(f"Worker {worker_id} done")

    async def spawn_workers():
        for i in range(3):
            asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")

    try:
        async with no_task_leaks(
            action="raise",
            enable_creation_tracking=True,
            name_filter=re.compile(r"worker-\d+"),
        ):
            await spawn_workers()
            await asyncio.sleep(0.1)
    except TaskLeakError as e:
        print(f"\nFound {e.task_count} leaked worker tasks:")
        for task_info in e.leaked_tasks:
            print(f"\n--- {task_info.name} ---")
            print("Currently executing:")
            print(task_info.format_current_stack())
            print("Created at:")
            print(task_info.format_creation_stack())
            if task_info.task_ref:
                task_info.task_ref.cancel()

if __name__ == "__main__":
    asyncio.run(debug_task_leaks())
Output:
Worker 0 starting
Worker 1 starting
Worker 2 starting
Found 3 leaked worker tasks:
--- worker-2 ---
Currently executing:
File "/private/tmp/example.py", line 33, in worker
await asyncio.sleep(sleep_time) # Simulate work
Created at:
File "/private/tmp/example.py", line 65, in <module>
asyncio.run(debug_task_leaks())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 47, in debug_task_leaks
await spawn_workers()
File "/private/tmp/example.py", line 39, in spawn_workers
asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")
--- worker-0 ---
Currently executing:
File "/private/tmp/example.py", line 33, in worker
await asyncio.sleep(sleep_time) # Simulate work
Created at:
File "/private/tmp/example.py", line 65, in <module>
asyncio.run(debug_task_leaks())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 47, in debug_task_leaks
await spawn_workers()
File "/private/tmp/example.py", line 39, in spawn_workers
asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")
--- worker-1 ---
Currently executing:
File "/private/tmp/example.py", line 33, in worker
await asyncio.sleep(sleep_time) # Simulate work
Created at:
File "/private/tmp/example.py", line 65, in <module>
asyncio.run(debug_task_leaks())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 47, in debug_task_leaks
await spawn_workers()
File "/private/tmp/example.py", line 39, in spawn_workers
asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")
Debugging event loop blocking
import asyncio

from pyleak import EventLoopBlockError, no_event_loop_blocking

async def process_user_data(user_id: int):
    """Simulates cpu intensive work - contains blocking operations!"""
    print(f"Processing user {user_id}...")
    return sum(i * i for i in range(100_000_000))

async def main():
    try:
        async with no_event_loop_blocking(action="raise", threshold=0.5):
            user1 = await process_user_data(1)
            user2 = await process_user_data(2)
    except EventLoopBlockError as e:
        print(f"\n🚨 Found {e.block_count} blocking events:")
        print(e)

if __name__ == "__main__":
    asyncio.run(main())
Output:
Processing user 1...
Processing user 2...
🚨 Found 5 blocking events:
Detected 5 event loop blocks
Event Loop Block: block-1
Duration: 1.507s (threshold: 0.500s)
Timestamp: 1749052720.456
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 27, in main
user1 = await process_user_data(1)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Event Loop Block: block-2
Duration: 1.516s (threshold: 0.500s)
Timestamp: 1749052722.054
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 27, in main
user1 = await process_user_data(1)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Event Loop Block: block-3
Duration: 1.518s (threshold: 0.500s)
Timestamp: 1749052723.648
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 28, in main
user2 = await process_user_data(2)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Event Loop Block: block-4
Duration: 1.517s (threshold: 0.500s)
Timestamp: 1749052725.247
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 28, in main
user2 = await process_user_data(2)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Event Loop Block: block-5
Duration: 1.513s (threshold: 0.500s)
Timestamp: 1749052726.839
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 28, in main
user2 = await process_user_data(2)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Pytest Plugin
The pytest plugin automatically wraps tests with pyleak detectors based on pytest markers.
Installation
pip install pyleak
Add the plugin to your pytest configuration
pyproject.toml
[tool.pytest.ini_options]
markers = [
    "no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking"
]
pytest.ini
[pytest]
markers = no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking
You can also add it to the conftest.py file.
import pytest

def pytest_configure(config):
    config.addinivalue_line(
        "markers",
        "no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking"
    )
Usage
@pytest.mark.no_leaks
@pytest.mark.asyncio
async def test_no_task_leaks():
    asyncio.create_task(asyncio.sleep(10))
Selective detection
By default, all detectors are enabled. You can selectively enable or disable detectors using the no_leaks marker. For example, to detect only task leaks and event loop blocking:
@pytest.mark.no_leaks(tasks=True, blocking=True, threads=False)
@pytest.mark.asyncio
async def test_async_no_leaks():
    asyncio.create_task(asyncio.sleep(10))
    time.sleep(0.5)
    threading.Thread(target=lambda: time.sleep(10)).start()
no_leaks marker configuration
Name | Default | Description |
tasks | True | Whether to detect task leaks |
task_action | raise | Action to take when a task leak is detected |
task_name_filter | None | Filter to apply to task names |
enable_task_creation_tracking | False | Whether to enable task creation tracking |
threads | True | Whether to detect thread leaks |
thread_action | raise | Action to take when a thread leak is detected |
thread_name_filter | None | Filter to apply to thread names |
exclude_daemon_threads | True | Whether to exclude daemon threads |
blocking | True | Whether to detect event loop blocking |
blocking_action | raise | Action to take when event loop blocking is detected |
blocking_threshold | 0.1 | Blocking duration, in seconds, above which a block is reported |
blocking_check_interval | 0.01 | Interval between checks for event loop blocking |
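One way to read the table is that every marker keyword overrides a single default and everything else stays as listed. The sketch below models that merge with a plain dictionary. The defaults are copied from the table, while `resolve_marker_options` and its rejection of unknown keys are hypothetical and not necessarily how the plugin behaves.

```python
# Defaults copied from the no_leaks marker configuration table.
DEFAULTS = {
    "tasks": True,
    "task_action": "raise",
    "task_name_filter": None,
    "enable_task_creation_tracking": False,
    "threads": True,
    "thread_action": "raise",
    "thread_name_filter": None,
    "exclude_daemon_threads": True,
    "blocking": True,
    "blocking_action": "raise",
    "blocking_threshold": 0.1,
    "blocking_check_interval": 0.01,
}


def resolve_marker_options(**overrides):
    """Merge marker keyword arguments over the documented defaults."""
    unknown = sorted(set(overrides) - set(DEFAULTS))
    if unknown:
        # Assumption: this sketch rejects misspelled options outright.
        raise ValueError(f"unknown no_leaks options: {unknown}")
    return {**DEFAULTS, **overrides}


if __name__ == "__main__":
    options = resolve_marker_options(threads=False, blocking_threshold=0.5)
    print(options["threads"], options["blocking_threshold"], options["tasks"])
```

Read this way, `@pytest.mark.no_leaks(threads=False)` would leave task and blocking detection at their defaults.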
Why Use pyleak?
AsyncIO Tasks: Leaked tasks can cause memory leaks, prevent graceful shutdown, and make debugging difficult.
Threads: Leaked threads consume system resources and can prevent proper application termination.
Event Loop Blocking: Synchronous operations in async code stall every other task on the event loop, which hurts throughput and can cause timeouts.
pyleak helps you catch these issues during development and testing, optionally through its pytest plugin, before they reach production.
Examples
More examples can be found in the project's test files.
Disclaimer: Most of the code and tests are written by Claude.