pyleak

Version 0.1.11 on PyPI.

Detect leaked asyncio tasks, threads, and event loop blocking in Python. Inspired by Go's goleak.

Installation

pip install pyleak

Quick Start

import asyncio
import threading
import time

from pyleak import no_task_leaks, no_thread_leaks, no_event_loop_blocking

# Detect leaked asyncio tasks
async def main():
    async with no_task_leaks():
        asyncio.create_task(asyncio.sleep(10))  # This will be detected
        await asyncio.sleep(0.1)

# Detect leaked threads  
def sync_main():
    with no_thread_leaks():
        threading.Thread(target=lambda: time.sleep(10)).start()  # This will be detected

# Detect event loop blocking
async def async_main():
    with no_event_loop_blocking():
        time.sleep(0.5)  # This will be detected
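
To make "leaked task" concrete, here is a minimal standard-library sketch of the underlying idea: snapshot the set of tasks on entry and report any new task that is still pending on exit. It is an illustration under that assumption, not pyleak's implementation, and detect_new_tasks is a hypothetical name.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def detect_new_tasks(found: list):
    """Collect tasks created inside the block that are still pending on exit.

    Illustrative only: this is not how pyleak is implemented.
    """
    before = asyncio.all_tasks()  # unfinished tasks that existed on entry
    try:
        yield
    finally:
        for task in asyncio.all_tasks() - before:
            if not task.done():
                found.append(task)


async def demo() -> int:
    leaked: list = []
    async with detect_new_tasks(leaked):
        asyncio.create_task(asyncio.sleep(10))  # never awaited or cancelled
        await asyncio.sleep(0)                  # give the task a chance to start
    count = len(leaked)
    for task in leaked:                         # clean up so the loop closes quietly
        task.cancel()
    await asyncio.gather(*leaked, return_exceptions=True)
    return count
```

Running asyncio.run(demo()) reports one leaked task, the sleep(10) task that nobody awaited.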

Usage

Context Managers

All detectors can be used as context managers:

# AsyncIO tasks (async context)
async with no_task_leaks():
    # Your async code here
    pass

# Threads (sync context)
with no_thread_leaks():
    # Your threaded code here
    pass

# Event loop blocking (async context only)
async def main():
    with no_event_loop_blocking():
        # Your potentially blocking code here
        pass

Decorators

All detectors can also be used as decorators:

@no_task_leaks()
async def my_async_function():
    # Any leaked tasks will be detected
    pass

@no_thread_leaks()
def my_threaded_function():
    # Any leaked threads will be detected  
    pass

@no_event_loop_blocking()
async def my_potentially_blocking_function():
    # Any event loop blocking will be detected
    pass
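
One standard way to build an object that works both in a with statement and as a decorator is contextlib.ContextDecorator. The sketch below shows that mechanism with a simple thread snapshot; report_new_threads is a hypothetical helper, and nothing here is taken from pyleak's internals.

```python
import threading
from contextlib import ContextDecorator


class report_new_threads(ContextDecorator):
    """Hypothetical detector usable as `with ...:` or as `@...`."""

    def __enter__(self):
        # Remember which threads were alive before the wrapped code ran.
        self.before = set(threading.enumerate())
        self.leaked = []
        return self

    def __exit__(self, exc_type, exc, tb):
        # Any thread alive now that was not alive on entry counts as leaked.
        self.leaked = [t for t in threading.enumerate() if t not in self.before]
        return False  # never swallow exceptions from the wrapped code


stop = threading.Event()
detector = report_new_threads()


@detector  # decorator form: the function body runs inside the context
def start_worker():
    threading.Thread(target=stop.wait, name="worker").start()


start_worker()
leaked_names = [t.name for t in detector.leaked]
stop.set()  # let the worker thread exit
```

The same class also works as `with report_new_threads() as detector:`, because ContextDecorator only adds the `__call__` wrapper on top of the usual `__enter__` and `__exit__` methods.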

Get stack trace

From leaked asyncio tasks

When using no_task_leaks, you get detailed stack trace information showing exactly where leaked tasks are executing and where they were created.

import asyncio
from pyleak import TaskLeakError, no_task_leaks

async def leaky_function():
    async def background_task():
        print("background task started")
        await asyncio.sleep(10)

    print("creating a long running task")
    asyncio.create_task(background_task())

async def main():
    try:
        async with no_task_leaks(action="raise"):
            await leaky_function()
    except TaskLeakError as e:
        print(e)

if __name__ == "__main__":
    asyncio.run(main())

Output:

creating a long running task
background task started
Detected 1 leaked asyncio tasks

Leaked Task: Task-2
  ID: 4345977088
  State: TaskState.RUNNING
  Current Stack:
    File "/tmp/example.py", line 9, in background_task
        await asyncio.sleep(10)

Include creation stack trace

You can also include the creation stack trace by passing enable_creation_tracking=True to no_task_leaks.

async def main():
    try:
        async with no_task_leaks(action="raise", enable_creation_tracking=True):
            await leaky_function()
    except TaskLeakError as e:
        print(e)

Output:

creating a long running task
background task started
Detected 1 leaked asyncio tasks

Leaked Task: Task-2
  ID: 4392245504
  State: TaskState.RUNNING
  Current Stack:
    File "/tmp/example.py", line 9, in background_task
        await asyncio.sleep(10)
  Creation Stack:
    File "/tmp/example.py", line 24, in <module>
        asyncio.run(main())
    File "/opt/homebrew/.../asyncio/runners.py", line 194, in run
        return runner.run(main)
    File "/opt/homebrew/.../asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
    File "/opt/homebrew/.../asyncio/base_events.py", line 671, in run_until_complete
        self.run_forever()
    File "/opt/homebrew/.../asyncio/base_events.py", line 638, in run_forever
        self._run_once()
    File "/opt/homebrew/.../asyncio/base_events.py", line 1971, in _run_once
        handle._run()
    File "/opt/homebrew/.../asyncio/events.py", line 84, in _run
        self._context.run(self._callback, *self._args)
    File "/tmp/example.py", line 18, in main
        await leaky_function()
    File "/tmp/example.py", line 12, in leaky_function
        asyncio.create_task(background_task())

TaskLeakError has a leaked_tasks attribute that contains a list of LeakedTask objects including the stack trace details.

Note: enable_creation_tracking monkey patches asyncio.create_task to record the creation stack trace. Avoid it in production, where the patch could have unintended side effects.
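
To show what monkey patching asyncio.create_task involves, here is a minimal sketch of the general technique using only the standard library. The names track_task_creation and creation_stacks are hypothetical, and pyleak's actual patch may differ in every detail.

```python
import asyncio
import traceback
import weakref
from contextlib import contextmanager

# Hypothetical registry: task -> formatted stack lines captured at creation time.
creation_stacks = weakref.WeakKeyDictionary()


@contextmanager
def track_task_creation():
    """Temporarily replace asyncio.create_task with a wrapper that records the caller's stack."""
    original = asyncio.create_task

    def tracking_create_task(coro, **kwargs):
        task = original(coro, **kwargs)
        # Drop the last frame (this wrapper) so the stack ends at the caller.
        creation_stacks[task] = traceback.format_stack()[:-1]
        return task

    asyncio.create_task = tracking_create_task
    try:
        yield
    finally:
        asyncio.create_task = original  # always undo the patch


async def demo() -> str:
    with track_task_creation():
        task = asyncio.create_task(asyncio.sleep(0), name="tracked")
    await task
    return "".join(creation_stacks[task])
```

A patch like this only sees calls that look up asyncio.create_task at call time. Code that did `from asyncio import create_task` earlier, or that calls loop.create_task directly, bypasses it. Replacing a module-level function affects the whole process, which is one reason to keep this kind of tracking out of production.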

From event loop blocks

When using no_event_loop_blocking, you get a detailed stack trace showing exactly which code was running while the event loop was blocked.

import asyncio
import time

from pyleak import EventLoopBlockError, no_event_loop_blocking


async def some_function_with_blocking_code():
    print("starting")
    time.sleep(1)
    print("done")


async def main():
    try:
        async with no_event_loop_blocking(action="raise"):
            await some_function_with_blocking_code()
    except EventLoopBlockError as e:
        print(e)


if __name__ == "__main__":
    asyncio.run(main())

Output:

starting
done
Detected 1 event loop blocks

Event Loop Block: block-1
  Duration: 0.605s (threshold: 0.200s)
  Timestamp: 1749051796.302
  Blocking Stack:
    File "/private/tmp/example.py", line 22, in <module>
        asyncio.run(main())
      File "/opt/homebrew/.../asyncio/runners.py", line 194, in run
        return runner.run(main)
      File "/opt/homebrew/.../asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
      File "/opt/homebrew/.../asyncio/base_events.py", line 671, in run_until_complete
        self.run_forever()
      File "/opt/homebrew/.../asyncio/base_events.py", line 638, in run_forever
        self._run_once()
      File "/opt/homebrew/.../asyncio/base_events.py", line 1971, in _run_once
        handle._run()
      File "/opt/homebrew/.../asyncio/events.py", line 84, in _run
        self._context.run(self._callback, *self._args)
      File "/private/tmp/example.py", line 16, in main
        await some_function_with_blocking_code()
      File "/private/tmp/example.py", line 9, in some_function_with_blocking_code
        time.sleep(1)
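
For intuition about how a block like this can be detected at all, the sketch below uses a common watchdog approach: a helper thread pings the loop with call_soon_threadsafe and treats a late reply as a block. BlockWatchdog is a hypothetical class, the approach is an assumption rather than a description of pyleak, and sys._current_frames is a CPython-specific helper.

```python
import asyncio
import sys
import threading
import time
import traceback


class BlockWatchdog:
    """Hypothetical monitor: pings the loop from a thread and records slow replies."""

    def __init__(self, threshold: float = 0.2, check_interval: float = 0.01):
        self.threshold = threshold
        self.check_interval = check_interval
        self.blocks: list = []  # (seconds_blocked, formatted_stack) tuples
        self._stop = threading.Event()

    def __enter__(self):
        self._loop = asyncio.get_running_loop()
        self._loop_thread_id = threading.get_ident()
        self._thread = threading.Thread(target=self._watch, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc_info):
        self._stop.set()
        self._thread.join()
        return False

    def _watch(self):
        while not self._stop.is_set():
            replied = threading.Event()
            started = time.monotonic()
            self._loop.call_soon_threadsafe(replied.set)  # ping the loop
            if not replied.wait(self.threshold):
                # No reply in time: snapshot what the loop thread is doing now.
                frame = sys._current_frames().get(self._loop_thread_id)
                stack = "".join(traceback.format_stack(frame)) if frame else ""
                # Wait until the loop answers, or until the context exits.
                while not replied.wait(self.check_interval) and not self._stop.is_set():
                    pass
                self.blocks.append((time.monotonic() - started, stack))
            self._stop.wait(self.check_interval)


async def demo() -> list:
    with BlockWatchdog(threshold=0.1) as watchdog:
        await asyncio.sleep(0.05)  # the loop is responsive here
        time.sleep(0.3)            # blocks the loop for longer than the threshold
    return watchdog.blocks
```

The recorded stack is the loop thread's stack at the moment the ping timed out, which is why it points at the blocking call rather than at the monitor.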

Actions

Control what happens when leaks/blocking are detected:

| Action | AsyncIO Tasks | Threads | Event Loop Blocking |
|---|---|---|---|
| "warn" (default) | ✅ Issues ResourceWarning | ✅ Issues ResourceWarning | ✅ Issues ResourceWarning |
| "log" | ✅ Writes to logger | ✅ Writes to logger | ✅ Writes to logger |
| "cancel" | ✅ Cancels leaked tasks | ❌ Warns (can't force-stop) | ❌ Warns (can't cancel) |
| "raise" | ✅ Raises TaskLeakError | ✅ Raises ThreadLeakError | ✅ Raises EventLoopBlockError |

# Examples
async with no_task_leaks(action="cancel"):  # Cancels leaked tasks
    pass

with no_thread_leaks(action="raise"):  # Raises exception on thread leaks
    pass

with no_event_loop_blocking(action="log"):  # Logs blocking events
    pass
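
The action strings map naturally onto standard-library facilities. The sketch below is a hypothetical dispatcher, not pyleak code: LeakError stands in for the library's exception types, the logger name and log level are assumptions, and "cancel" is left out because it only applies to tasks.

```python
import logging
import warnings

logger = logging.getLogger("leak-demo")  # hypothetical logger name


class LeakError(Exception):
    """Stand-in for TaskLeakError, ThreadLeakError, or EventLoopBlockError."""


def handle_detection(message: str, action: str = "warn") -> None:
    """Dispatch one detection according to the action string."""
    if action == "warn":
        warnings.warn(message, ResourceWarning, stacklevel=2)
    elif action == "log":
        logger.warning(message)  # assumed level; the real library may use another
    elif action == "raise":
        raise LeakError(message)
    else:
        raise ValueError(f"unknown action: {action!r}")
```

Python's default warning filters ignore ResourceWarning, so with action="warn" you may see nothing unless warnings are enabled, for example with `python -W default` or `warnings.simplefilter("default", ResourceWarning)`. pytest captures and displays warnings by default.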

Name Filtering

Filter detection by resource names (tasks and threads only):

import re

# Exact match
async with no_task_leaks(name_filter="background-worker"):
    pass

with no_thread_leaks(name_filter="worker-thread"):
    pass

# Regex pattern
async with no_task_leaks(name_filter=re.compile(r"worker-\d+")):
    pass

with no_thread_leaks(name_filter=re.compile(r"background-.*")):
    pass

Note: Event loop blocking detection doesn't support name filtering.
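
The difference between the two filter kinds can be sketched as follows. name_matches is a hypothetical helper, and the use of Pattern.search is an assumption: if the library uses match or fullmatch instead, unanchored patterns would behave differently, so anchor your patterns when it matters.

```python
import re
from typing import Optional, Union

NameFilter = Optional[Union[str, re.Pattern]]


def name_matches(name: str, name_filter: NameFilter) -> bool:
    """Return True if a task or thread name passes the filter."""
    if name_filter is None:
        return True  # no filter: every name is checked
    if isinstance(name_filter, re.Pattern):
        # Assumption: regex filters match anywhere in the name.
        return name_filter.search(name) is not None
    return name == name_filter  # plain strings must match exactly
```

With this helper, name_matches("worker-3", re.compile(r"worker-\d+")) is True, while name_matches("worker", "background-worker") is False.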

Configuration Options

AsyncIO Tasks

no_task_leaks(
    action="warn",                   # Action to take on detection
    name_filter=None,                # Filter by task name
    logger=None,                     # Custom logger
    enable_creation_tracking=False   # Record where each task was created
)

Threads

no_thread_leaks(
    action="warn",           # Action to take on detection
    name_filter=None,        # Filter by thread name
    logger=None,             # Custom logger
    exclude_daemon=True,     # Exclude daemon threads
)

Event Loop Blocking

no_event_loop_blocking(
    action="warn",           # Action to take on detection
    logger=None,             # Custom logger
    threshold=0.1,           # Minimum blocking time to report (seconds)
    check_interval=0.01      # How often to check (seconds)
)

Testing

Perfect for catching issues in tests:

import pytest
from pyleak import no_task_leaks, no_thread_leaks, no_event_loop_blocking

@pytest.mark.asyncio
async def test_no_leaked_tasks():
    async with no_task_leaks(action="raise"):
        await my_async_function()

def test_no_leaked_threads():
    with no_thread_leaks(action="raise"):
        my_threaded_function()

@pytest.mark.asyncio        
async def test_no_event_loop_blocking():
    with no_event_loop_blocking(action="raise", threshold=0.1):
        await my_potentially_blocking_function()

Real-World Examples

Detecting Synchronous HTTP Calls in Async Code

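import httpx
from starlette.testclient import TestClient
from pyleak import no_event_loop_blocking

# `app` is the ASGI application under test, defined elsewhere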

async def test_sync_vs_async_http():
    # This will detect blocking
    with no_event_loop_blocking(action="warn"):
        response = TestClient(app).get("/endpoint")  # Synchronous!
        
    # This will not detect blocking  
    with no_event_loop_blocking(action="warn"):
        async with httpx.AsyncClient() as client:
            response = await client.get("/endpoint")  # Asynchronous!

Ensuring Proper Resource Cleanup

async def test_background_task_cleanup():
    async with no_task_leaks(action="raise"):
        # This would fail the test
        asyncio.create_task(long_running_task())
        
        # This would pass
        task = asyncio.create_task(long_running_task())
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

Debugging complex task leaks

import asyncio
import random
import re
from pyleak import TaskLeakError, no_task_leaks

async def debug_task_leaks():
    """Example showing how to debug complex task leaks."""

    async def worker(worker_id: int, sleep_time: int):
        print(f"Worker {worker_id} starting")
        await asyncio.sleep(sleep_time)  # Simulate work
        print(f"Worker {worker_id} done")

    async def spawn_workers():
        for i in range(3):
            asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")

    try:
        async with no_task_leaks(
            action="raise",
            enable_creation_tracking=True,
            name_filter=re.compile(r"worker-\d+"),  # Only catch worker tasks
        ):
            await spawn_workers()
            await asyncio.sleep(0.1)  # Let workers start

    except TaskLeakError as e:
        print(f"\nFound {e.task_count} leaked worker tasks:")
        for task_info in e.leaked_tasks:
            print(f"\n--- {task_info.name} ---")
            print("Currently executing:")
            print(task_info.format_current_stack())
            print("Created at:")
            print(task_info.format_creation_stack())

            # Cancel the leaked task
            if task_info.task_ref:
                task_info.task_ref.cancel()


if __name__ == "__main__":
    asyncio.run(debug_task_leaks())

Output:

Worker 0 starting
Worker 1 starting
Worker 2 starting

Found 3 leaked worker tasks:

--- worker-2 ---
Currently executing:
  File "/private/tmp/example.py", line 33, in worker
    await asyncio.sleep(sleep_time)  # Simulate work

Created at:
  File "/private/tmp/example.py", line 65, in <module>
    asyncio.run(debug_task_leaks())
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
    self.run_forever()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
    self._run_once()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
    handle._run()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/private/tmp/example.py", line 47, in debug_task_leaks
    await spawn_workers()
  File "/private/tmp/example.py", line 39, in spawn_workers
    asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")


--- worker-0 ---
Currently executing:
  File "/private/tmp/example.py", line 33, in worker
    await asyncio.sleep(sleep_time)  # Simulate work

Created at:
  File "/private/tmp/example.py", line 65, in <module>
    asyncio.run(debug_task_leaks())
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
    self.run_forever()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
    self._run_once()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
    handle._run()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/private/tmp/example.py", line 47, in debug_task_leaks
    await spawn_workers()
  File "/private/tmp/example.py", line 39, in spawn_workers
    asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")


--- worker-1 ---
Currently executing:
  File "/private/tmp/example.py", line 33, in worker
    await asyncio.sleep(sleep_time)  # Simulate work

Created at:
  File "/private/tmp/example.py", line 65, in <module>
    asyncio.run(debug_task_leaks())
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
    self.run_forever()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
    self._run_once()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
    handle._run()
  File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/private/tmp/example.py", line 47, in debug_task_leaks
    await spawn_workers()
  File "/private/tmp/example.py", line 39, in spawn_workers
    asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")

Debugging event loop blocking

import asyncio
from pyleak import EventLoopBlockError, no_event_loop_blocking

async def process_user_data(user_id: int):
    """Simulates cpu intensive work - contains blocking operations!"""
    print(f"Processing user {user_id}...")
    return sum(i * i for i in range(100_000_000))

async def main():
    try:
        async with no_event_loop_blocking(action="raise", threshold=0.5):
            user1 = await process_user_data(1)
            user2 = await process_user_data(2)

    except EventLoopBlockError as e:
        print(f"\n🚨 Found {e.block_count} blocking events:")
        print(e)

if __name__ == "__main__":
    asyncio.run(main())

Output:

Processing user 1...
Processing user 2...

🚨 Found 5 blocking events:
Detected 5 event loop blocks

Event Loop Block: block-1
  Duration: 1.507s (threshold: 0.500s)
  Timestamp: 1749052720.456
  Blocking Stack:
    File "/private/tmp/example.py", line 36, in <module>
        asyncio.run(main())
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
        return runner.run(main)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
        self.run_forever()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
        self._run_once()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
        handle._run()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
        self._context.run(self._callback, *self._args)
      File "/private/tmp/example.py", line 27, in main
        user1 = await process_user_data(1)
      File "/private/tmp/example.py", line 21, in process_user_data
        return sum(i * i for i in range(100_000_000))
      File "/private/tmp/example.py", line 21, in <genexpr>
        return sum(i * i for i in range(100_000_000))
Event Loop Block: block-2
  Duration: 1.516s (threshold: 0.500s)
  Timestamp: 1749052722.054
  Blocking Stack:
    File "/private/tmp/example.py", line 36, in <module>
        asyncio.run(main())
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
        return runner.run(main)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
        self.run_forever()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
        self._run_once()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
        handle._run()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
        self._context.run(self._callback, *self._args)
      File "/private/tmp/example.py", line 27, in main
        user1 = await process_user_data(1)
      File "/private/tmp/example.py", line 21, in process_user_data
        return sum(i * i for i in range(100_000_000))
      File "/private/tmp/example.py", line 21, in <genexpr>
        return sum(i * i for i in range(100_000_000))
Event Loop Block: block-3
  Duration: 1.518s (threshold: 0.500s)
  Timestamp: 1749052723.648
  Blocking Stack:
    File "/private/tmp/example.py", line 36, in <module>
        asyncio.run(main())
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
        return runner.run(main)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
        self.run_forever()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
        self._run_once()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
        handle._run()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
        self._context.run(self._callback, *self._args)
      File "/private/tmp/example.py", line 28, in main
        user2 = await process_user_data(2)
      File "/private/tmp/example.py", line 21, in process_user_data
        return sum(i * i for i in range(100_000_000))
      File "/private/tmp/example.py", line 21, in <genexpr>
        return sum(i * i for i in range(100_000_000))
Event Loop Block: block-4
  Duration: 1.517s (threshold: 0.500s)
  Timestamp: 1749052725.247
  Blocking Stack:
    File "/private/tmp/example.py", line 36, in <module>
        asyncio.run(main())
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
        return runner.run(main)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
        self.run_forever()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
        self._run_once()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
        handle._run()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
        self._context.run(self._callback, *self._args)
      File "/private/tmp/example.py", line 28, in main
        user2 = await process_user_data(2)
      File "/private/tmp/example.py", line 21, in process_user_data
        return sum(i * i for i in range(100_000_000))
      File "/private/tmp/example.py", line 21, in <genexpr>
        return sum(i * i for i in range(100_000_000))
Event Loop Block: block-5
  Duration: 1.513s (threshold: 0.500s)
  Timestamp: 1749052726.839
  Blocking Stack:
    File "/private/tmp/example.py", line 36, in <module>
        asyncio.run(main())
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
        return runner.run(main)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
        self.run_forever()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
        self._run_once()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
        handle._run()
      File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
        self._context.run(self._callback, *self._args)
      File "/private/tmp/example.py", line 28, in main
        user2 = await process_user_data(2)
      File "/private/tmp/example.py", line 21, in process_user_data
        return sum(i * i for i in range(100_000_000))
      File "/private/tmp/example.py", line 21, in <genexpr>
        return sum(i * i for i in range(100_000_000))
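
Once a block like this is found, a common fix is to move the synchronous work off the event loop thread. The sketch below uses asyncio.to_thread (Python 3.9+); sum_of_squares and the limit parameter are hypothetical names introduced for illustration, with a smaller default than the example above.

```python
import asyncio


def sum_of_squares(limit: int) -> int:
    """Plain synchronous CPU-bound work."""
    return sum(i * i for i in range(limit))


async def process_user_data(user_id: int, limit: int = 1_000_000) -> int:
    # Run the blocking function in a worker thread so the loop keeps serving other tasks.
    return await asyncio.to_thread(sum_of_squares, limit)


async def main() -> list:
    # Both calls are awaited together; the event loop stays free while they run.
    return await asyncio.gather(process_user_data(1), process_user_data(2))
```

On standard CPython builds the GIL means a thread does not make pure-Python CPU work faster; it only keeps the loop responsive. For real parallelism, loop.run_in_executor with a concurrent.futures.ProcessPoolExecutor is the usual alternative.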

Pytest Plugin

The pytest plugin automatically wraps tests with pyleak detectors based on pytest markers.

Installation

pip install pyleak

Register the no_leaks marker in your pytest configuration:

pyproject.toml

[tool.pytest.ini_options]
markers = [
    "no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking"
]

pytest.ini

[pytest]
markers = no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking

You can also register the marker in conftest.py:

# conftest.py
import pytest

def pytest_configure(config):
    config.addinivalue_line(
        "markers", 
        "no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking"
    )

Usage

import asyncio
import pytest

@pytest.mark.no_leaks
@pytest.mark.asyncio
async def test_no_task_leaks():
    asyncio.create_task(asyncio.sleep(10))  # Leaked task: the test fails

Selective detection

By default, all detectors are enabled. You can selectively enable or disable detectors using the no_leaks marker. For example, to only detect task leaks and event loop blocking, you can use the following:

@pytest.mark.no_leaks(tasks=True, blocking=True, threads=False)
@pytest.mark.asyncio
async def test_async_no_leaks():
    asyncio.create_task(asyncio.sleep(10))  # This will be detected
    time.sleep(0.5)  # This will be detected
    threading.Thread(target=lambda: time.sleep(10)).start()  # This will not be detected

no_leaks marker configuration

| Name | Default | Description |
|---|---|---|
| tasks | True | Whether to detect task leaks |
| task_action | raise | Action to take when a task leak is detected |
| task_name_filter | None | Filter to apply to task names |
| enable_task_creation_tracking | False | Whether to enable task creation tracking |
| threads | True | Whether to detect thread leaks |
| thread_action | raise | Action to take when a thread leak is detected |
| thread_name_filter | None | Filter to apply to thread names |
| exclude_daemon_threads | True | Whether to exclude daemon threads |
| blocking | True | Whether to detect event loop blocking |
| blocking_action | raise | Action to take when event loop blocking is detected |
| blocking_threshold | 0.1 | Minimum blocking time to report (seconds) |
| blocking_check_interval | 0.01 | How often to check for blocking (seconds) |

Why Use pyleak?

AsyncIO Tasks: Leaked tasks can cause memory leaks, prevent graceful shutdown, and make debugging difficult.

Threads: Leaked threads consume system resources and can prevent proper application termination.

Event Loop Blocking: Synchronous operations in async code stall every other task on the loop, which degrades performance and can cause timeouts.

pyleak helps you catch these issues during development and testing, optionally using a pytest plugin, before they reach production.

Examples

More examples can be found in the project's test files.

Disclaimer: Most of the code and tests are written by Claude.
