aiofreqlimit
Advanced tools
| from .base import FreqLimitBackend | ||
| __all__ = ("FreqLimitBackend",) |
| from collections.abc import Hashable | ||
| from typing import Protocol, runtime_checkable | ||
| from aiofreqlimit.params import FreqLimitParams | ||
| __all__ = ("FreqLimitBackend",) | ||
| @runtime_checkable | ||
| class FreqLimitBackend(Protocol): | ||
| async def reserve( | ||
| self, | ||
| key: Hashable, | ||
| now: float, | ||
| params: FreqLimitParams, | ||
| ) -> float: | ||
| """ | ||
| Reserve a slot for `key` at moment `now`. | ||
| Must: | ||
| * read current state (TAT etc.), | ||
| * update it, | ||
| * return delay in seconds (0.0 means run now). | ||
| """ | ||
| ... |
| import asyncio | ||
| import contextlib | ||
| from collections.abc import Hashable | ||
| from aiofreqlimit.gcra import gcra_step | ||
| from aiofreqlimit.params import FreqLimitParams | ||
| # pyright: reportPrivateUsage=false | ||
| __all__ = ("InMemoryBackend",) | ||
| class InMemoryBackend: | ||
| """ | ||
| In-process backend: single event loop / single process. | ||
| For each key keeps: | ||
| * _tat[key] — current TAT per GCRA | ||
| * _locks[key] — asyncio.Lock to serialize key traffic | ||
| """ | ||
| def __init__( | ||
| self, | ||
| *, | ||
| idle_ttl: float | None = None, | ||
| sweeper_interval: float | None = None, | ||
| ) -> None: | ||
| if idle_ttl is not None and idle_ttl <= 0: | ||
| msg = "idle_ttl must be positive or None" | ||
| raise ValueError(msg) | ||
| if sweeper_interval is not None and sweeper_interval <= 0: | ||
| msg = "sweeper_interval must be positive or None" | ||
| raise ValueError(msg) | ||
| self._tat: dict[Hashable, float] = {} | ||
| self._locks: dict[Hashable, asyncio.Lock] = {} | ||
| self._last_seen: dict[Hashable, float] = {} | ||
| self._idle_ttl: float | None = idle_ttl | ||
| self._sweeper_interval: float | None = sweeper_interval | ||
| self._sweeper_task: asyncio.Task[None] | None = None | ||
| async def reserve( | ||
| self, | ||
| key: Hashable, | ||
| now: float, | ||
| params: FreqLimitParams, | ||
| ) -> float: | ||
| if self._idle_ttl is not None: | ||
| self._cleanup_expired(now) | ||
| self._ensure_sweeper() | ||
| lock = self._locks.get(key) | ||
| if lock is None: | ||
| lock = asyncio.Lock() | ||
| self._locks[key] = lock | ||
| async with lock: | ||
| tat = self._tat.get(key) | ||
| new_tat, delay = gcra_step(now, tat, params) | ||
| self._tat[key] = new_tat | ||
| self._last_seen[key] = now | ||
| return delay | ||
| async def clear(self) -> None: | ||
| """Reset state (handy in tests or manual reset).""" | ||
| self._tat.clear() | ||
| self._locks.clear() | ||
| self._last_seen.clear() | ||
| if self._sweeper_task is not None: | ||
| _ = self._sweeper_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError): | ||
| await self._sweeper_task | ||
| self._sweeper_task = None | ||
| def _cleanup_expired(self, now: float) -> None: | ||
| ttl = self._idle_ttl | ||
| if ttl is None: | ||
| return | ||
| expiry_threshold = now - ttl | ||
| for key in list(self._tat): | ||
| last_seen = self._last_seen.get(key) | ||
| if last_seen is None or last_seen > expiry_threshold: | ||
| continue | ||
| lock = self._locks.get(key) | ||
| if lock is not None and lock.locked(): | ||
| continue | ||
| _ = self._tat.pop(key, None) | ||
| _ = self._locks.pop(key, None) | ||
| _ = self._last_seen.pop(key, None) | ||
| def _ensure_sweeper(self) -> None: | ||
| interval = self._sweeper_interval | ||
| if interval is None: | ||
| return | ||
| if self._sweeper_task is not None and not self._sweeper_task.done(): | ||
| return | ||
| loop = asyncio.get_running_loop() | ||
| self._sweeper_task = loop.create_task(self._sweep_periodically(interval)) | ||
| async def _sweep_periodically(self, interval: float) -> None: | ||
| try: | ||
| while True: | ||
| await asyncio.sleep(interval) | ||
| now = asyncio.get_running_loop().time() | ||
| self._cleanup_expired(now) | ||
| except asyncio.CancelledError: # pragma: no cover - lifecycle cleanup | ||
| raise |
| from collections.abc import Hashable | ||
| from textwrap import dedent as ddent | ||
| from typing import Final | ||
| from redis.asyncio import Redis | ||
| from redis.commands.core import Script | ||
| from ..params import FreqLimitParams | ||
| __all__ = ("GCRA_LUA", "RedisBackend") | ||
| # Lua script: single GCRA step + TTL handling (atomic on Redis). | ||
| # | ||
| # KEYS[1] = key holding TAT | ||
| # ARGV[1] = interval (T) | ||
| # ARGV[2] = tau | ||
| # ARGV[3] = extra_ttl (seconds) | ||
| # | ||
| # Returns string float: delay in seconds until request is conforming. | ||
| GCRA_LUA: Final[str] = ddent( | ||
| r""" | ||
| redis.replicate_commands() | ||
| local key = KEYS[1] | ||
| local interval = tonumber(ARGV[1]) | ||
| local tau = tonumber(ARGV[2]) | ||
| local extra_ttl = tonumber(ARGV[3]) | ||
| -- Current time from Redis server: seconds + microseconds | ||
| local now_time = redis.call("TIME") | ||
| local now = tonumber(now_time[1]) + tonumber(now_time[2]) / 1000000.0 | ||
| -- Read TAT if present | ||
| local tat_str = redis.call("GET", key) | ||
| local tat | ||
| if not tat_str then | ||
| tat = now | ||
| else | ||
| tat = tonumber(tat_str) | ||
| end | ||
| -- Earliest conforming time | ||
| local allowed_time = tat - tau | ||
| local delay = 0.0 | ||
| local effective_now = now | ||
| if effective_now < allowed_time then | ||
| delay = allowed_time - effective_now | ||
| effective_now = allowed_time | ||
| end | ||
| -- GCRA virtual scheduling | ||
| if effective_now >= tat then | ||
| tat = effective_now + interval | ||
| else | ||
| tat = tat + interval | ||
| end | ||
| -- TTL: keep key while there is debt plus small buffer | ||
| local ttl = (tat - now) + extra_ttl | ||
| if ttl < 1.0 then | ||
| ttl = 1.0 | ||
| end | ||
| redis.call("SET", key, tat, "EX", math.ceil(ttl)) | ||
| return tostring(delay) | ||
| """ | ||
| ) | ||
| class RedisBackend: | ||
| """ | ||
| Redis backend for FreqLimit. | ||
| GCRA logic runs in a Lua script on Redis, using server time (TIME) | ||
| so multiple hosts share a clock. Python side only passes parameters | ||
| and parses the resulting delay. | ||
| """ | ||
| def __init__( | ||
| self, | ||
| redis: Redis, | ||
| *, | ||
| prefix: str = "freqlimit:gcra:", | ||
| extra_ttl: float = 0.0, | ||
| ) -> None: | ||
| """ | ||
| redis — redis.asyncio.Redis client. | ||
| prefix — prefix for limiter keys. | ||
| extra_ttl — extra TTL buffer after backlog is cleared (seconds). | ||
| """ | ||
| self._redis: Final[Redis] = redis | ||
| self._prefix: Final = prefix | ||
| self._extra_ttl: Final = float(extra_ttl) | ||
| # Script object caches SHA and transparently uses EVAL/EVALSHA. | ||
| self._script: Script = redis.register_script(GCRA_LUA) | ||
| async def reserve( | ||
| self, | ||
| key: Hashable, | ||
| now: float, | ||
| params: FreqLimitParams, | ||
| ) -> float: | ||
| redis_key = f"{self._prefix}{key}" | ||
| interval = params.interval | ||
| tau = params.tau | ||
| # TTL buffer after backlog is cleared; Lua adds debt duration | ||
| extra_ttl = self._extra_ttl | ||
| _ = now # server time is used inside Lua script | ||
| delay_str: str = await self._script( | ||
| keys=[redis_key], | ||
| args=[interval, tau, extra_ttl], | ||
| ) | ||
| return float(delay_str) | ||
| async def clear(self) -> None: | ||
| """Delete keys with the prefix (handy for tests/debug).""" | ||
| pattern = f"{self._prefix}*" | ||
| async for key in self._redis.scan_iter(match=pattern): | ||
| _ = await self._redis.delete(key) |
| from .params import FreqLimitParams | ||
| __all__ = ("gcra_step",) | ||
| def gcra_step( | ||
| now: float, | ||
| tat: float | None, | ||
| params: FreqLimitParams, | ||
| ) -> tuple[float, float]: | ||
| """ | ||
| Single GCRA step (Generic Cell Rate Algorithm) in virtual | ||
| scheduling form. | ||
| Input: | ||
| * now — current time (same units as tat, usually loop.time()) | ||
| * tat — TAT (Theoretical Arrival Time) for key, or None if new | ||
| * params — limit contract | ||
| Output: | ||
| * new_tat — updated TAT | ||
| * delay — how long to wait from now to be conforming | ||
| """ | ||
| if tat is None: | ||
| tat = now | ||
| # Earliest moment when the packet would be conforming | ||
| allowed_time = tat - params.tau | ||
| delay = 0.0 | ||
| effective_now = now | ||
| if effective_now < allowed_time: | ||
| delay = allowed_time - effective_now | ||
| effective_now = allowed_time | ||
| # GCRA virtual scheduling: | ||
| # - if arrived late → tat = effective_now + params.interval | ||
| # - if slightly early → tat = tat + params.interval | ||
| tat = ( | ||
| effective_now + params.interval | ||
| if effective_now >= tat | ||
| else tat + params.interval | ||
| ) | ||
| return tat, delay |
| import asyncio | ||
| import inspect | ||
| from collections.abc import AsyncIterator, Awaitable, Callable, Hashable | ||
| from contextlib import asynccontextmanager | ||
| from typing import Final | ||
| from .backends import FreqLimitBackend | ||
| from .params import FreqLimitParams | ||
| __all__ = ("FreqLimit",) | ||
| class FreqLimit: | ||
| """ | ||
| Async rate limiter built on the Generic Cell Rate Algorithm (GCRA). | ||
| Example: | ||
| params = FreqLimitParams(limit=1, period=1.0) # 1 опер/сек | ||
| limiter = FreqLimit(params, backend=InMemoryBackend()) | ||
| async with limiter.resource("chat:42"): | ||
| await send_message(...) | ||
| """ | ||
| # Alias so one can write FreqLimit.Params(...) | ||
| Params: type[FreqLimitParams] = FreqLimitParams | ||
| def __init__( | ||
| self, | ||
| params: FreqLimitParams, | ||
| *, | ||
| backend: FreqLimitBackend, | ||
| ) -> None: | ||
| self._params: Final = params | ||
| self._backend: Final = backend | ||
| @property | ||
| def params(self) -> FreqLimitParams: | ||
| return self._params | ||
| @property | ||
| def backend(self) -> FreqLimitBackend: | ||
| return self._backend | ||
| @asynccontextmanager | ||
| async def resource(self, key: Hashable | None = None) -> AsyncIterator[None]: | ||
| """ | ||
| Context manager that enforces the limit. | ||
| key=None — global bucket (single key for whole limiter). | ||
| """ | ||
| if key is None: | ||
| key = "_global" | ||
| loop = asyncio.get_running_loop() | ||
| now = loop.time() | ||
| delay = await self._backend.reserve(key, now, self._params) | ||
| if delay > 0: | ||
| await asyncio.sleep(delay) | ||
| yield | ||
| async def clear(self) -> None: | ||
| """ | ||
| Reset backend state if supported. | ||
| For InMemoryBackend — clears data. | ||
| """ | ||
| clear_obj: Callable[[], Awaitable[object] | object] | None = getattr( | ||
| self._backend, | ||
| "clear", | ||
| None, | ||
| ) | ||
| if clear_obj is None: | ||
| return | ||
| result = clear_obj() | ||
| if inspect.isawaitable(result): | ||
| await result |
| from dataclasses import dataclass | ||
| __all__ = ("FreqLimitParams",) | ||
| @dataclass(frozen=True, slots=True) | ||
| class FreqLimitParams: | ||
| """ | ||
| GCRA limit parameters: | ||
| * limit — number of events allowed per period | ||
| * period — window length in seconds | ||
| * burst — how many events can be squeezed almost at once | ||
| Derived: | ||
| * interval (T) = period / limit | ||
| * tau = (burst - 1) * interval | ||
| """ | ||
| limit: int | ||
| period: float | ||
| burst: int = 1 | ||
| def __post_init__(self) -> None: | ||
| if self.limit <= 0: | ||
| raise ValueError("limit must be greater than 0") | ||
| if self.period <= 0: | ||
| raise ValueError("period must be greater than 0") | ||
| if self.burst <= 0: | ||
| raise ValueError("burst must be greater than 0") | ||
| @property | ||
| def interval(self) -> float: | ||
| return self.period / self.limit | ||
| @property | ||
| def tau(self) -> float: | ||
| return self.interval * (self.burst - 1) |
+2
-2
| MIT License | ||
| Copyright (c) 2018-2024 Gleb Chipiga | ||
| Copyright (c) 2018-2025 Gleb Chipiga | ||
@@ -21,2 +21,2 @@ Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| SOFTWARE. | ||
| SOFTWARE. |
+85
-15
| Metadata-Version: 2.3 | ||
| Name: aiofreqlimit | ||
| Version: 0.1.5 | ||
| Version: 0.2.0 | ||
| Summary: Frequency limit for asyncio | ||
@@ -8,3 +8,3 @@ Author: Gleb Chipiga | ||
| Copyright (c) 2018-2024 Gleb Chipiga | ||
| Copyright (c) 2018-2025 Gleb Chipiga | ||
@@ -42,16 +42,27 @@ Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| Classifier: Framework :: AsyncIO | ||
| Requires-Dist: redis>=5.0.0 ; extra == 'redis' | ||
| Requires-Dist: pytest>=9.0.1 ; extra == 'test' | ||
| Requires-Dist: pytest-asyncio>=1.3.0 ; extra == 'test' | ||
| Requires-Dist: hypothesis>=6.148.1 ; extra == 'test' | ||
| Requires-Dist: pytest-cov>=7.0.0 ; extra == 'test' | ||
| Requires-Dist: pytest-mock>=3.14.0 ; extra == 'test' | ||
| Requires-Dist: redis>=5.0.0 ; extra == 'test' | ||
| Requires-Dist: testcontainers[redis]>=4.3.3 ; extra == 'test' | ||
| Requires-Python: >=3.11, <3.15 | ||
| Project-URL: Homepage, https://github.com/gleb-chipiga/aiofreqlimit | ||
| Provides-Extra: redis | ||
| Provides-Extra: test | ||
| Description-Content-Type: text/markdown | ||
| # Frequency Limit Context Manager for asyncio | ||
| # aiofreqlimit — Async GCRA rate limiter | ||
| [](https://pypi.org/project/aiofreqlimit) | ||
| [](https://github.com/gleb-chipiga/aiofreqlimit/blob/master/LICENSE) | ||
| [](LICENSE) | ||
| [](https://pypistats.org/packages/aiofreqlimit) | ||
| Async rate limiting for Python 3.11+ built on the Generic Cell Rate Algorithm (GCRA) with | ||
| type-safe parameters and pluggable backends. | ||
| ## Installation | ||
| aiofreqlimit requires Python 3.11 or greater and is available on PyPI. Use pip to install it: | ||
| ```bash | ||
@@ -61,5 +72,6 @@ pip install aiofreqlimit | ||
| ## Using aiofreqlimit | ||
| ## Quickstart | ||
| Pass a value of any hashable type to `acquire`, or omit the argument to use the default key: | ||
| Create a contract (`FreqLimitParams`), choose a backend, and wrap your code with the | ||
| async context manager: | ||
@@ -69,14 +81,16 @@ ```python | ||
| from aiofreqlimit import FreqLimit | ||
| from aiofreqlimit import FreqLimit, FreqLimitParams | ||
| from aiofreqlimit.backends.memory import InMemoryBackend | ||
| limit = FreqLimit(1 / 10) | ||
| params = FreqLimitParams(limit=1, period=1.0, burst=1) # 1 op / second | ||
| limiter = FreqLimit(params, backend=InMemoryBackend()) | ||
| async def job(): | ||
| async with limit.acquire("some_key"): | ||
| await some_call() | ||
| async def send_message(chat_id: int, text: str) -> None: | ||
| async with limiter.resource(f"chat:{chat_id}"): | ||
| await bot.send_message(chat_id, text) | ||
| async def main(): | ||
| await asyncio.gather(job() for _ in range(100)) | ||
| async def main() -> None: | ||
| await asyncio.gather(*(send_message(42, f"msg {i}") for i in range(5))) | ||
@@ -86,1 +100,57 @@ | ||
| ``` | ||
| - `key` is any hashable; `None` uses a global bucket. | ||
| - `burst` lets you allow short bursts without changing the long-term rate. | ||
| ## Params are type-safe | ||
| You can keep your limits as constants and reuse them across the project: | ||
| ```python | ||
| from aiofreqlimit import FreqLimitParams | ||
| TELEGRAM_PER_CHAT = FreqLimitParams(limit=1, period=1.0, burst=1) | ||
| TELEGRAM_PER_GROUP = FreqLimitParams(limit=20, period=60.0, burst=3) | ||
| ``` | ||
| ## Backends | ||
| - `InMemoryBackend` (in-process, single event loop) — import from | ||
| `aiofreqlimit.backends.memory`. | ||
| - `idle_ttl: float | None` — drop idle keys after this many seconds (default: None). | ||
| - `sweeper_interval: float | None` — optional background cleanup period; set to | ||
| enable a sweeper task (default: None). | ||
| - `RedisBackend` (shared, multi-host) — import from `aiofreqlimit.backends.redis`. | ||
| - Install optional deps: `pip install aiofreqlimit[redis]`. | ||
| - Uses Redis server time and a Lua script for atomic GCRA steps. | ||
| - `prefix: str` — key prefix (default `freqlimit:gcra:`). | ||
| - `extra_ttl: float` — small buffer added to debt horizon; controls how long keys | ||
| stay after backlog is cleared. | ||
| - Implement `FreqLimitBackend` to plug in other stores: | ||
| ```python | ||
| from collections.abc import Hashable | ||
| from aiofreqlimit import FreqLimitBackend, FreqLimitParams | ||
| class RedisBackend(FreqLimitBackend): | ||
| async def reserve(self, key: Hashable, now: float, params: FreqLimitParams) -> float: | ||
| ... | ||
| ``` | ||
| `FreqLimit` requires an explicit backend instance; no default is provided. | ||
| ## Testing | ||
| The library ships with pytest + hypothesis tests. To run them with uv: | ||
| ```bash | ||
| uv run pytest tests | ||
| ``` | ||
| Integration tests for the Redis backend use Testcontainers; Docker must be available | ||
| for those cases. | ||
| ## License | ||
| MIT |
+25
-1
@@ -7,3 +7,3 @@ [build-system] | ||
| name = "aiofreqlimit" | ||
| version = "0.1.5" | ||
| version = "0.2.0" | ||
| description = "Frequency limit for asyncio" | ||
@@ -33,2 +33,14 @@ readme = "README.md" | ||
| [project.optional-dependencies] | ||
| redis = ["redis>=5.0.0"] | ||
| test = [ | ||
| "pytest>=9.0.1", | ||
| "pytest-asyncio>=1.3.0", | ||
| "hypothesis>=6.148.1", | ||
| "pytest-cov>=7.0.0", | ||
| "pytest-mock>=3.14.0", | ||
| "redis>=5.0.0", | ||
| "testcontainers[redis]>=4.3.3", | ||
| ] | ||
| [dependency-groups] | ||
@@ -42,3 +54,5 @@ lint = [ | ||
| "pytest-mock>=3.14.0", | ||
| "redis>=5.0.0", | ||
| "hypothesis>=6.148.1", | ||
| "testcontainers[redis]>=4.3.3", | ||
| ] | ||
@@ -51,2 +65,4 @@ test = [ | ||
| "pytest-mock>=3.14.0", | ||
| "redis>=5.0.0", | ||
| "testcontainers[redis]>=4.3.3", | ||
| ] | ||
@@ -62,2 +78,4 @@ dev = [ | ||
| "pytest-mock>=3.14.0", | ||
| "redis>=5.0.0", | ||
| "testcontainers[redis]>=4.3.3", | ||
| "ruff>=0.14.5", | ||
@@ -104,4 +122,10 @@ "twine>=6.2.0", | ||
| [tool.mypy] | ||
| mypy_path = "typings" | ||
| [tool.pytest.ini_options] | ||
| asyncio_mode = "auto" | ||
| filterwarnings = [ | ||
| "ignore:The @wait_container_is_ready decorator is deprecated:DeprecationWarning:testcontainers.*", | ||
| ] | ||
@@ -108,0 +132,0 @@ [tool.tox] |
+73
-13
@@ -1,11 +0,12 @@ | ||
| # Frequency Limit Context Manager for asyncio | ||
| # aiofreqlimit — Async GCRA rate limiter | ||
| [](https://pypi.org/project/aiofreqlimit) | ||
| [](https://github.com/gleb-chipiga/aiofreqlimit/blob/master/LICENSE) | ||
| [](LICENSE) | ||
| [](https://pypistats.org/packages/aiofreqlimit) | ||
| Async rate limiting for Python 3.11+ built on the Generic Cell Rate Algorithm (GCRA) with | ||
| type-safe parameters and pluggable backends. | ||
| ## Installation | ||
| aiofreqlimit requires Python 3.11 or greater and is available on PyPI. Use pip to install it: | ||
| ```bash | ||
@@ -15,5 +16,6 @@ pip install aiofreqlimit | ||
| ## Using aiofreqlimit | ||
| ## Quickstart | ||
| Pass a value of any hashable type to `acquire`, or omit the argument to use the default key: | ||
| Create a contract (`FreqLimitParams`), choose a backend, and wrap your code with the | ||
| async context manager: | ||
@@ -23,14 +25,16 @@ ```python | ||
| from aiofreqlimit import FreqLimit | ||
| from aiofreqlimit import FreqLimit, FreqLimitParams | ||
| from aiofreqlimit.backends.memory import InMemoryBackend | ||
| limit = FreqLimit(1 / 10) | ||
| params = FreqLimitParams(limit=1, period=1.0, burst=1) # 1 op / second | ||
| limiter = FreqLimit(params, backend=InMemoryBackend()) | ||
| async def job(): | ||
| async with limit.acquire("some_key"): | ||
| await some_call() | ||
| async def send_message(chat_id: int, text: str) -> None: | ||
| async with limiter.resource(f"chat:{chat_id}"): | ||
| await bot.send_message(chat_id, text) | ||
| async def main(): | ||
| await asyncio.gather(job() for _ in range(100)) | ||
| async def main() -> None: | ||
| await asyncio.gather(*(send_message(42, f"msg {i}") for i in range(5))) | ||
@@ -40,1 +44,57 @@ | ||
| ``` | ||
| - `key` is any hashable; `None` uses a global bucket. | ||
| - `burst` lets you allow short bursts without changing the long-term rate. | ||
| ## Params are type-safe | ||
| You can keep your limits as constants and reuse them across the project: | ||
| ```python | ||
| from aiofreqlimit import FreqLimitParams | ||
| TELEGRAM_PER_CHAT = FreqLimitParams(limit=1, period=1.0, burst=1) | ||
| TELEGRAM_PER_GROUP = FreqLimitParams(limit=20, period=60.0, burst=3) | ||
| ``` | ||
| ## Backends | ||
| - `InMemoryBackend` (in-process, single event loop) — import from | ||
| `aiofreqlimit.backends.memory`. | ||
| - `idle_ttl: float | None` — drop idle keys after this many seconds (default: None). | ||
| - `sweeper_interval: float | None` — optional background cleanup period; set to | ||
| enable a sweeper task (default: None). | ||
| - `RedisBackend` (shared, multi-host) — import from `aiofreqlimit.backends.redis`. | ||
| - Install optional deps: `pip install aiofreqlimit[redis]`. | ||
| - Uses Redis server time and a Lua script for atomic GCRA steps. | ||
| - `prefix: str` — key prefix (default `freqlimit:gcra:`). | ||
| - `extra_ttl: float` — small buffer added to debt horizon; controls how long keys | ||
| stay after backlog is cleared. | ||
| - Implement `FreqLimitBackend` to plug in other stores: | ||
| ```python | ||
| from collections.abc import Hashable | ||
| from aiofreqlimit import FreqLimitBackend, FreqLimitParams | ||
| class RedisBackend(FreqLimitBackend): | ||
| async def reserve(self, key: Hashable, now: float, params: FreqLimitParams) -> float: | ||
| ... | ||
| ``` | ||
| `FreqLimit` requires an explicit backend instance; no default is provided. | ||
| ## Testing | ||
| The library ships with pytest + hypothesis tests. To run them with uv: | ||
| ```bash | ||
| uv run pytest tests | ||
| ``` | ||
| Integration tests for the Redis backend use Testcontainers; Docker must be available | ||
| for those cases. | ||
| ## License | ||
| MIT |
@@ -1,99 +0,13 @@ | ||
| import asyncio | ||
| from collections.abc import AsyncIterator, Hashable | ||
| from contextlib import AsyncExitStack, asynccontextmanager, suppress | ||
| from types import TracebackType | ||
| from typing import Final | ||
| from ._version import __version__ | ||
| from .backends import FreqLimitBackend | ||
| from .gcra import gcra_step | ||
| from .limiter import FreqLimit | ||
| from .params import FreqLimitParams | ||
| __all__ = ("FreqLimit", "__version__") | ||
| class Lock: | ||
| def __init__(self) -> None: | ||
| self._ts: float = -float("inf") | ||
| self._count: int = 0 | ||
| self._lock: Final = asyncio.Lock() | ||
| @property | ||
| def ts(self) -> float: | ||
| return self._ts | ||
| @ts.setter | ||
| def ts(self, ts: float) -> None: | ||
| self._ts = ts | ||
| async def __aenter__(self) -> None: | ||
| self._count += 1 | ||
| _ = await self._lock.acquire() | ||
| async def __aexit__( | ||
| self, | ||
| exc_type: type[BaseException] | None, | ||
| exc_val: BaseException | None, | ||
| exc_tb: TracebackType | None, | ||
| ) -> None: | ||
| try: | ||
| self._lock.release() | ||
| finally: | ||
| self._count -= 1 | ||
| @property | ||
| def count(self) -> int: | ||
| return self._count | ||
| class FreqLimit: | ||
| def __init__( | ||
| self, | ||
| interval: float, | ||
| clean_interval: float = 0, | ||
| ) -> None: | ||
| if interval <= 0: | ||
| raise RuntimeError("Interval must be greater than 0") | ||
| if clean_interval < 0: | ||
| raise RuntimeError("Clean interval must be greater than or equal to 0") | ||
| self._interval: Final = interval | ||
| self._clean_interval: Final = clean_interval if clean_interval > 0 else interval | ||
| self._locks: Final = dict[Hashable, Lock]() | ||
| self._clean_event: Final = asyncio.Event() | ||
| self._clean_task: asyncio.Task[None] | None = None | ||
| self._loop: Final = asyncio.get_running_loop() | ||
| @asynccontextmanager | ||
| async def resource( | ||
| self, | ||
| key: Hashable = None, | ||
| ) -> AsyncIterator[None]: | ||
| if self._clean_task is None: | ||
| self._clean_task = asyncio.create_task(self._clean()) | ||
| if key not in self._locks: | ||
| self._locks[key] = Lock() | ||
| async with AsyncExitStack() as stack: | ||
| _ = stack.callback(self._clean_event.set) | ||
| _ = await stack.enter_async_context(self._locks[key]) | ||
| delay = self._interval - self._loop.time() + self._locks[key].ts | ||
| if delay > 0: | ||
| await asyncio.sleep(delay) | ||
| self._locks[key].ts = self._loop.time() | ||
| yield | ||
| async def clear(self) -> None: | ||
| if self._clean_task is not None: | ||
| _ = self._clean_task.cancel() | ||
| with suppress(asyncio.CancelledError): | ||
| await self._clean_task | ||
| self._clean_task = None | ||
| self._locks.clear() | ||
| self._clean_event.clear() | ||
| async def _clean(self) -> None: | ||
| while True: | ||
| if len(self._locks) == 0: | ||
| _ = await self._clean_event.wait() | ||
| self._clean_event.clear() | ||
| for key in tuple(self._locks): | ||
| age = self._loop.time() - self._locks[key].ts | ||
| if self._locks[key].count == 0 and age >= self._clean_interval: | ||
| del self._locks[key] | ||
| await asyncio.sleep(self._clean_interval) | ||
| __all__ = ( | ||
| "FreqLimit", | ||
| "FreqLimitBackend", | ||
| "FreqLimitParams", | ||
| "__version__", | ||
| "gcra_step", | ||
| ) |
@@ -1,3 +0,1 @@ | ||
| from __future__ import annotations | ||
| from importlib import metadata | ||
@@ -4,0 +2,0 @@ from pathlib import Path |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
26366
112.13%14
100%377
249.07%