
An intelligent adaptive asynchronous concurrency control library that makes your Python async tasks run more stably and efficiently
Adaptio is an intelligent concurrency control tool based on Python asyncio. It draws inspiration from TCP congestion control algorithms to dynamically adjust the number of concurrent tasks based on system load, optimizing task throughput and preventing overload. Additionally, it provides a decorator for automatic retry of tasks that fail due to system overload.
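The TCP-inspired adjustment can be pictured as an AIMD (additive increase, multiplicative decrease) rule. The sketch below is illustrative only; the function name, the `overload_ratio` input, and the threshold are invented for this example and are not Adaptio's actual implementation:

```python
def adjust_concurrency(current: int, min_c: int, max_c: int,
                       overload_ratio: float, threshold: float = 0.2) -> int:
    """AIMD sketch: grow by 1 while healthy, halve when overload errors spike.

    overload_ratio is the fraction of recent tasks that failed with an
    overload error; the 0.2 threshold is an invented cutoff for illustration.
    """
    if overload_ratio > threshold:
        return max(min_c, current // 2)  # multiplicative decrease under overload
    return min(max_c, current + 1)       # additive increase otherwise

print(adjust_concurrency(16, 2, 64, overload_ratio=0.5))  # 8: back off
print(adjust_concurrency(16, 2, 64, overload_ratio=0.0))  # 17: probe upward
```

The same feedback loop that keeps TCP from saturating a link keeps the task pool from saturating the downstream service.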
Install the latest stable version from PyPI:

```bash
pip install adaptio
```
The library provides an adaptive retry decorator, with_adaptive_retry, which automatically retries tasks that fail due to system overload (ServiceOverloadError).
Decorator Parameters (inferred from the usage example below; consult the package source for exact defaults):
- max_retries: maximum number of retry attempts before giving up
- retry_interval_seconds: wait time between retries, in seconds
- max_concurrency / min_concurrency / initial_concurrency: upper bound, lower bound, and starting value of the adaptive concurrency window
- adjust_overload_rate: overload rate used when adjusting concurrency
- scheduler: an optional shared AdaptiveAsyncConcurrencyLimiter instance
Usage Example:
```python
import asyncio
import random

from adaptio import (
    AdaptiveAsyncConcurrencyLimiter,
    ServiceOverloadError,
    with_adaptive_retry,
)

# Design a test task that triggers ServiceOverloadError above 16 concurrent runs
sample_task_overload_threshold = 16
sample_task_running_count = 0


async def sample_task(task_id):
    """A sample task that simulates workload and triggers overload at a certain concurrency."""
    global sample_task_running_count
    sample_task_running_count += 1
    # Simulate random task duration
    await asyncio.sleep(random.uniform(1, 3))
    # Simulate an overload error once too many tasks run at the same time
    if sample_task_running_count > sample_task_overload_threshold:
        current = sample_task_running_count
        sample_task_running_count -= 1
        raise ServiceOverloadError(
            f"Service overloaded with {current} tasks > {sample_task_overload_threshold}"
        )
    sample_task_running_count -= 1
    return f"Task {task_id} done"


# Method 1: Using the default configuration
@with_adaptive_retry()
async def sample_task_with_retry(task_id):
    return await sample_task(task_id)


# Method 2: Custom configuration parameters
@with_adaptive_retry(
    max_retries=512,
    retry_interval_seconds=3,
    max_concurrency=128,
    min_concurrency=4,
    initial_concurrency=4,
    adjust_overload_rate=0.2,
)
async def sample_task_with_custom_retry(task_id):
    return await sample_task(task_id)


# Method 3: Using a custom scheduler, shared between multiple functions
shared_scheduler = AdaptiveAsyncConcurrencyLimiter(
    max_concurrency=64,
    min_concurrency=2,
    initial_concurrency=4,
    adjust_overload_rate=0.15,
)


@with_adaptive_retry(scheduler=shared_scheduler)
async def task_type_a(task_id):
    return await sample_task(task_id)


@with_adaptive_retry(scheduler=shared_scheduler)
async def task_type_b(task_id):
    return await sample_task(task_id)


# Run example tasks
async def main():
    print("=== Testing Method 1: default configuration ===")
    tasks1 = [sample_task_with_retry(i) for i in range(100)]
    for result in asyncio.as_completed(tasks1):
        try:
            print(await result)
        except Exception as e:
            print(f"Task failed: {e}")

    print("\n=== Testing Method 2: custom configuration ===")
    tasks2 = [sample_task_with_custom_retry(i) for i in range(100)]
    for result in asyncio.as_completed(tasks2):
        try:
            print(await result)
        except Exception as e:
            print(f"Task failed: {e}")

    print("\n=== Testing Method 3: shared scheduler ===")
    # Mix different task types; they share the same concurrency limit
    tasks3 = [task_type_a(i) if i % 2 == 0 else task_type_b(i) for i in range(100)]
    for result in asyncio.as_completed(tasks3):
        try:
            print(await result)
        except Exception as e:
            print(f"Task failed: {e}")


if __name__ == "__main__":
    asyncio.run(main())
```
Explanation:
Usage Recommendations:
The raise_on_aiohttp_overload decorator converts specific HTTP status codes from aiohttp into ServiceOverloadError exceptions, making it easier to integrate with the dynamic task scheduler.
Decorator Parameters:
Usage Example:
```python
import asyncio

import aiohttp

from adaptio import raise_on_aiohttp_overload, with_adaptive_retry


@with_adaptive_retry()
@raise_on_aiohttp_overload()
async def fetch_data(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()


# Combined usage example
async def fetch_one(session: aiohttp.ClientSession, data_id: int):
    try:
        data = await fetch_data(session, f"http://api.example.com/data/{data_id}")
        print(f"Data retrieved successfully: {data}")
    except Exception as e:
        print(f"Failed to retrieve data: {data_id=} {e}")


async def main():
    # Share one session across all requests
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(fetch_one(session, data_id) for data_id in range(100)))


if __name__ == "__main__":
    asyncio.run(main())
```
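The status-code-to-exception conversion can be pictured with a small stand-alone sketch. Everything below is illustrative: `ServiceOverloadError` and `HttpError` are stand-in classes (not imported from adaptio or aiohttp), and the 429/503 status set is an assumption about which codes count as overload:

```python
import asyncio
import functools


class ServiceOverloadError(Exception):  # stand-in, not adaptio's class
    pass


class HttpError(Exception):  # stand-in for aiohttp.ClientResponseError
    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def raise_on_overload(statuses=frozenset({429, 503})):
    """Sketch: re-raise overload-style HTTP errors as ServiceOverloadError."""
    def deco(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            try:
                return await fn(*args, **kwargs)
            except HttpError as e:
                if e.status in statuses:
                    # Overload-type status: signal the adaptive scheduler
                    raise ServiceOverloadError(str(e)) from e
                raise  # other HTTP errors propagate unchanged
        return wrapper
    return deco


@raise_on_overload()
async def flaky():
    raise HttpError(503)


try:
    asyncio.run(flaky())
except ServiceOverloadError as e:
    print("converted:", e)  # converted: HTTP 503
```

Once the error is normalized to ServiceOverloadError, the retry decorator can treat HTTP back-pressure the same way as any other overload signal.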
Notes:
Usage Recommendations:
The with_async_control decorator provides comprehensive control over async operations, combining concurrency limits, QPS (queries-per-second) throttling, and a retry mechanism.
Decorator Parameters (inferred from the usage example below; consult the package source for exact defaults):
- exception_type: the exception class that triggers a retry
- max_concurrency: maximum number of concurrent tasks
- max_qps: maximum number of requests per second
- retry_n: number of retries after a failure
- retry_delay: wait time between retries, in seconds
Usage Example:
```python
import asyncio

from adaptio import with_async_control


@with_async_control(
    exception_type=ValueError,  # Only catch ValueError
    max_concurrency=5,          # Maximum 5 concurrent tasks
    max_qps=10,                 # Maximum 10 requests per second
    retry_n=2,                  # Retry 2 times after failure
    retry_delay=0.5,            # Retry interval 0.5 seconds
)
async def api_call(i: int) -> str:
    # Simulate an API call
    await asyncio.sleep(1.0)
    return f"Request {i} successful"


async def main():
    # Create multiple concurrent tasks and wait for all of them to complete
    tasks = [api_call(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results):
        print(f"Task {i}: {result}")


if __name__ == "__main__":
    asyncio.run(main())
```
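What a cap like max_qps=10 means can be sketched with a minimal rate gate that spaces call starts at least 1/max_qps seconds apart. QpsGate is an invented helper for illustration; with_async_control's internal mechanism may differ:

```python
import asyncio
import time


class QpsGate:
    """Illustrative QPS limiter: at most max_qps call starts per second."""

    def __init__(self, max_qps: float) -> None:
        self.interval = 1.0 / max_qps
        self._next_slot = 0.0
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        async with self._lock:
            now = time.monotonic()
            delay = max(0.0, self._next_slot - now)
            # Reserve the next start slot, one interval after this one
            self._next_slot = max(now, self._next_slot) + self.interval
        if delay:
            await asyncio.sleep(delay)


async def demo() -> float:
    gate = QpsGate(max_qps=20)  # at most 20 call starts per second
    start = time.monotonic()
    for _ in range(5):
        await gate.wait()
    return time.monotonic() - start


elapsed = asyncio.run(demo())
print(f"5 calls took {elapsed:.2f}s")  # at 20 QPS this is at least ~0.20s
```

A concurrency limit bounds how many tasks run at once; a QPS gate like this bounds how often new ones start, and the two constraints are independent.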
Usage Scenarios:
Differences from with_adaptive_retry:
```bash
git clone https://github.com/Haskely/adaptio.git
cd adaptio
python3.10 -m venv .venv --prompt adaptio
source .venv/bin/activate  # Linux/macOS
# or: .venv\Scripts\activate  # Windows
pip install -e ".[dev]"
pre-commit install
```
This project uses multiple tools to ensure code quality:

Ruff, for code formatting and linting:

```bash
ruff check --fix .
ruff format .
```

MyPy, for static type checking:

```bash
mypy .
```

Pre-commit hooks:

Run unit tests:

```bash
python -m unittest discover tests
```
This project fully supports type hints and includes a py.typed marker file, so users get complete type checking support in their own projects.

Example:

```python
from typing import AsyncIterator

from adaptio import AdaptiveAsyncConcurrencyLimiter


async def process_items(items: AsyncIterator[str]) -> None:
    scheduler = AdaptiveAsyncConcurrencyLimiter(
        max_concurrency=10,
        min_concurrency=1,
    )
    async for item in items:
        # process_item is assumed to be defined elsewhere
        await scheduler.submit(process_item(item))
```
```bash
cz bump
git push
git push --tags
```
FAQs

Q: What should initial_concurrency be set to?
A: It's recommended to start with a small value (like 4-8) and let the system adjust toward the optimal value automatically. Too large an initial value may overload the system at startup.

Q: Which decorator should I use?
A:
- with_adaptive_retry: suited to scenarios that need dynamic concurrency adjustment, especially when load varies significantly
- with_async_control: suited to scenarios that need fixed concurrency limits and QPS control
- raise_on_aiohttp_overload: specifically for handling HTTP request overload situations

Q: How can I observe the concurrency adjustment at runtime?
A: Set log_level="DEBUG" to view the detailed adjustment process, or directly access scheduler properties such as current_concurrency to get runtime status.

Q: When should I use the ignore_loop_bound_exception parameter?
A: This parameter mainly handles special cases when using async code in a multi-threaded environment. If you initialize a semaphore in one thread and then use it in an async function in another thread, you might encounter the "is bound to a different event loop" error. Usually this indicates a design issue in the code, and the async/sync interaction logic should be fixed. In unavoidable cases you can set this parameter to True to ignore the exception, but note that this will cause concurrency control to fail. Most applications don't need to set this parameter.