You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

aiofreqlimit

Package Overview
Dependencies
Maintainers
1
Versions
22
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aiofreqlimit - pypi Package Compare versions

Comparing version
0.1.5
to
0.2.0
+3
src/aiofreqlimit/backends/__init__.py
from .base import FreqLimitBackend
__all__ = ("FreqLimitBackend",)
from collections.abc import Hashable
from typing import Protocol, runtime_checkable
from aiofreqlimit.params import FreqLimitParams
__all__ = ("FreqLimitBackend",)
@runtime_checkable
class FreqLimitBackend(Protocol):
async def reserve(
self,
key: Hashable,
now: float,
params: FreqLimitParams,
) -> float:
"""
Reserve a slot for `key` at moment `now`.
Must:
* read current state (TAT etc.),
* update it,
* return delay in seconds (0.0 means run now).
"""
...
import asyncio
import contextlib
from collections.abc import Hashable
from aiofreqlimit.gcra import gcra_step
from aiofreqlimit.params import FreqLimitParams
# pyright: reportPrivateUsage=false
__all__ = ("InMemoryBackend",)
class InMemoryBackend:
"""
In-process backend: single event loop / single process.
For each key keeps:
* _tat[key] — current TAT per GCRA
* _locks[key] — asyncio.Lock to serialize key traffic
"""
def __init__(
self,
*,
idle_ttl: float | None = None,
sweeper_interval: float | None = None,
) -> None:
if idle_ttl is not None and idle_ttl <= 0:
msg = "idle_ttl must be positive or None"
raise ValueError(msg)
if sweeper_interval is not None and sweeper_interval <= 0:
msg = "sweeper_interval must be positive or None"
raise ValueError(msg)
self._tat: dict[Hashable, float] = {}
self._locks: dict[Hashable, asyncio.Lock] = {}
self._last_seen: dict[Hashable, float] = {}
self._idle_ttl: float | None = idle_ttl
self._sweeper_interval: float | None = sweeper_interval
self._sweeper_task: asyncio.Task[None] | None = None
async def reserve(
self,
key: Hashable,
now: float,
params: FreqLimitParams,
) -> float:
if self._idle_ttl is not None:
self._cleanup_expired(now)
self._ensure_sweeper()
lock = self._locks.get(key)
if lock is None:
lock = asyncio.Lock()
self._locks[key] = lock
async with lock:
tat = self._tat.get(key)
new_tat, delay = gcra_step(now, tat, params)
self._tat[key] = new_tat
self._last_seen[key] = now
return delay
async def clear(self) -> None:
"""Reset state (handy in tests or manual reset)."""
self._tat.clear()
self._locks.clear()
self._last_seen.clear()
if self._sweeper_task is not None:
_ = self._sweeper_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._sweeper_task
self._sweeper_task = None
def _cleanup_expired(self, now: float) -> None:
ttl = self._idle_ttl
if ttl is None:
return
expiry_threshold = now - ttl
for key in list(self._tat):
last_seen = self._last_seen.get(key)
if last_seen is None or last_seen > expiry_threshold:
continue
lock = self._locks.get(key)
if lock is not None and lock.locked():
continue
_ = self._tat.pop(key, None)
_ = self._locks.pop(key, None)
_ = self._last_seen.pop(key, None)
def _ensure_sweeper(self) -> None:
interval = self._sweeper_interval
if interval is None:
return
if self._sweeper_task is not None and not self._sweeper_task.done():
return
loop = asyncio.get_running_loop()
self._sweeper_task = loop.create_task(self._sweep_periodically(interval))
async def _sweep_periodically(self, interval: float) -> None:
try:
while True:
await asyncio.sleep(interval)
now = asyncio.get_running_loop().time()
self._cleanup_expired(now)
except asyncio.CancelledError: # pragma: no cover - lifecycle cleanup
raise
from collections.abc import Hashable
from textwrap import dedent as ddent
from typing import Final
from redis.asyncio import Redis
from redis.commands.core import Script
from ..params import FreqLimitParams
__all__ = ("GCRA_LUA", "RedisBackend")
# Lua script: single GCRA step + TTL handling (atomic on Redis).
#
# KEYS[1] = key holding TAT
# ARGV[1] = interval (T)
# ARGV[2] = tau
# ARGV[3] = extra_ttl (seconds)
#
# Returns string float: delay in seconds until request is conforming.
GCRA_LUA: Final[str] = ddent(
r"""
redis.replicate_commands()
local key = KEYS[1]
local interval = tonumber(ARGV[1])
local tau = tonumber(ARGV[2])
local extra_ttl = tonumber(ARGV[3])
-- Current time from Redis server: seconds + microseconds
local now_time = redis.call("TIME")
local now = tonumber(now_time[1]) + tonumber(now_time[2]) / 1000000.0
-- Read TAT if present
local tat_str = redis.call("GET", key)
local tat
if not tat_str then
tat = now
else
tat = tonumber(tat_str)
end
-- Earliest conforming time
local allowed_time = tat - tau
local delay = 0.0
local effective_now = now
if effective_now < allowed_time then
delay = allowed_time - effective_now
effective_now = allowed_time
end
-- GCRA virtual scheduling
if effective_now >= tat then
tat = effective_now + interval
else
tat = tat + interval
end
-- TTL: keep key while there is debt plus small buffer
local ttl = (tat - now) + extra_ttl
if ttl < 1.0 then
ttl = 1.0
end
redis.call("SET", key, tat, "EX", math.ceil(ttl))
return tostring(delay)
"""
)
class RedisBackend:
"""
Redis backend for FreqLimit.
GCRA logic runs in a Lua script on Redis, using server time (TIME)
so multiple hosts share a clock. Python side only passes parameters
and parses the resulting delay.
"""
def __init__(
self,
redis: Redis,
*,
prefix: str = "freqlimit:gcra:",
extra_ttl: float = 0.0,
) -> None:
"""
redis — redis.asyncio.Redis client.
prefix — prefix for limiter keys.
extra_ttl — extra TTL buffer after backlog is cleared (seconds).
"""
self._redis: Final[Redis] = redis
self._prefix: Final = prefix
self._extra_ttl: Final = float(extra_ttl)
# Script object caches SHA and transparently uses EVAL/EVALSHA.
self._script: Script = redis.register_script(GCRA_LUA)
async def reserve(
self,
key: Hashable,
now: float,
params: FreqLimitParams,
) -> float:
redis_key = f"{self._prefix}{key}"
interval = params.interval
tau = params.tau
# TTL buffer after backlog is cleared; Lua adds debt duration
extra_ttl = self._extra_ttl
_ = now # server time is used inside Lua script
delay_str: str = await self._script(
keys=[redis_key],
args=[interval, tau, extra_ttl],
)
return float(delay_str)
async def clear(self) -> None:
"""Delete keys with the prefix (handy for tests/debug)."""
pattern = f"{self._prefix}*"
async for key in self._redis.scan_iter(match=pattern):
_ = await self._redis.delete(key)
from .params import FreqLimitParams
__all__ = ("gcra_step",)
def gcra_step(
now: float,
tat: float | None,
params: FreqLimitParams,
) -> tuple[float, float]:
"""
Single GCRA step (Generic Cell Rate Algorithm) in virtual
scheduling form.
Input:
* now — current time (same units as tat, usually loop.time())
* tat — TAT (Theoretical Arrival Time) for key, or None if new
* params — limit contract
Output:
* new_tat — updated TAT
* delay — how long to wait from now to be conforming
"""
if tat is None:
tat = now
# Earliest moment when the packet would be conforming
allowed_time = tat - params.tau
delay = 0.0
effective_now = now
if effective_now < allowed_time:
delay = allowed_time - effective_now
effective_now = allowed_time
# GCRA virtual scheduling:
# - if arrived late → tat = effective_now + params.interval
# - if slightly early → tat = tat + params.interval
tat = (
effective_now + params.interval
if effective_now >= tat
else tat + params.interval
)
return tat, delay
import asyncio
import inspect
from collections.abc import AsyncIterator, Awaitable, Callable, Hashable
from contextlib import asynccontextmanager
from typing import Final
from .backends import FreqLimitBackend
from .params import FreqLimitParams
__all__ = ("FreqLimit",)
class FreqLimit:
"""
Async rate limiter built on the Generic Cell Rate Algorithm (GCRA).
Example:
params = FreqLimitParams(limit=1, period=1.0) # 1 опер/сек
limiter = FreqLimit(params, backend=InMemoryBackend())
async with limiter.resource("chat:42"):
await send_message(...)
"""
# Alias so one can write FreqLimit.Params(...)
Params: type[FreqLimitParams] = FreqLimitParams
def __init__(
self,
params: FreqLimitParams,
*,
backend: FreqLimitBackend,
) -> None:
self._params: Final = params
self._backend: Final = backend
@property
def params(self) -> FreqLimitParams:
return self._params
@property
def backend(self) -> FreqLimitBackend:
return self._backend
@asynccontextmanager
async def resource(self, key: Hashable | None = None) -> AsyncIterator[None]:
"""
Context manager that enforces the limit.
key=None — global bucket (single key for whole limiter).
"""
if key is None:
key = "_global"
loop = asyncio.get_running_loop()
now = loop.time()
delay = await self._backend.reserve(key, now, self._params)
if delay > 0:
await asyncio.sleep(delay)
yield
async def clear(self) -> None:
"""
Reset backend state if supported.
For InMemoryBackend — clears data.
"""
clear_obj: Callable[[], Awaitable[object] | object] | None = getattr(
self._backend,
"clear",
None,
)
if clear_obj is None:
return
result = clear_obj()
if inspect.isawaitable(result):
await result
from dataclasses import dataclass
__all__ = ("FreqLimitParams",)
@dataclass(frozen=True, slots=True)
class FreqLimitParams:
"""
GCRA limit parameters:
* limit — number of events allowed per period
* period — window length in seconds
* burst — how many events can be squeezed almost at once
Derived:
* interval (T) = period / limit
* tau = (burst - 1) * interval
"""
limit: int
period: float
burst: int = 1
def __post_init__(self) -> None:
if self.limit <= 0:
raise ValueError("limit must be greater than 0")
if self.period <= 0:
raise ValueError("period must be greater than 0")
if self.burst <= 0:
raise ValueError("burst must be greater than 0")
@property
def interval(self) -> float:
return self.period / self.limit
@property
def tau(self) -> float:
return self.interval * (self.burst - 1)
+2
-2
MIT License
Copyright (c) 2018-2024 Gleb Chipiga
Copyright (c) 2018-2025 Gleb Chipiga

@@ -21,2 +21,2 @@ Permission is hereby granted, free of charge, to any person obtaining a copy

OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
SOFTWARE.
+85
-15
Metadata-Version: 2.3
Name: aiofreqlimit
Version: 0.1.5
Version: 0.2.0
Summary: Frequency limit for asyncio

@@ -8,3 +8,3 @@ Author: Gleb Chipiga

Copyright (c) 2018-2024 Gleb Chipiga
Copyright (c) 2018-2025 Gleb Chipiga

@@ -42,16 +42,27 @@ Permission is hereby granted, free of charge, to any person obtaining a copy

Classifier: Framework :: AsyncIO
Requires-Dist: redis>=5.0.0 ; extra == 'redis'
Requires-Dist: pytest>=9.0.1 ; extra == 'test'
Requires-Dist: pytest-asyncio>=1.3.0 ; extra == 'test'
Requires-Dist: hypothesis>=6.148.1 ; extra == 'test'
Requires-Dist: pytest-cov>=7.0.0 ; extra == 'test'
Requires-Dist: pytest-mock>=3.14.0 ; extra == 'test'
Requires-Dist: redis>=5.0.0 ; extra == 'test'
Requires-Dist: testcontainers[redis]>=4.3.3 ; extra == 'test'
Requires-Python: >=3.11, <3.15
Project-URL: Homepage, https://github.com/gleb-chipiga/aiofreqlimit
Provides-Extra: redis
Provides-Extra: test
Description-Content-Type: text/markdown
# Frequency Limit Context Manager for asyncio
# aiofreqlimit — Async GCRA rate limiter
[![Latest PyPI package version](https://badge.fury.io/py/aiofreqlimit.svg)](https://pypi.org/project/aiofreqlimit)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/gleb-chipiga/aiofreqlimit/blob/master/LICENSE)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)
[![Downloads](https://img.shields.io/pypi/dm/aiofreqlimit)](https://pypistats.org/packages/aiofreqlimit)
Async rate limiting for Python 3.11+ built on the Generic Cell Rate Algorithm (GCRA) with
type-safe parameters and pluggable backends.
## Installation
aiofreqlimit requires Python 3.11 or greater and is available on PyPI. Use pip to install it:
```bash

@@ -61,5 +72,6 @@ pip install aiofreqlimit

## Using aiofreqlimit
## Quickstart
Pass a value of any hashable type to `acquire`, or omit the argument to use the default key:
Create a contract (`FreqLimitParams`), choose a backend, and wrap your code with the
async context manager:

@@ -69,14 +81,16 @@ ```python

from aiofreqlimit import FreqLimit
from aiofreqlimit import FreqLimit, FreqLimitParams
from aiofreqlimit.backends.memory import InMemoryBackend
limit = FreqLimit(1 / 10)
params = FreqLimitParams(limit=1, period=1.0, burst=1) # 1 op / second
limiter = FreqLimit(params, backend=InMemoryBackend())
async def job():
async with limit.acquire("some_key"):
await some_call()
async def send_message(chat_id: int, text: str) -> None:
async with limiter.resource(f"chat:{chat_id}"):
await bot.send_message(chat_id, text)
async def main():
await asyncio.gather(job() for _ in range(100))
async def main() -> None:
await asyncio.gather(*(send_message(42, f"msg {i}") for i in range(5)))

@@ -86,1 +100,57 @@

```
- `key` is any hashable; `None` uses a global bucket.
- `burst` lets you allow short bursts without changing the long-term rate.
## Params are type-safe
You can keep your limits as constants and reuse them across the project:
```python
from aiofreqlimit import FreqLimitParams
TELEGRAM_PER_CHAT = FreqLimitParams(limit=1, period=1.0, burst=1)
TELEGRAM_PER_GROUP = FreqLimitParams(limit=20, period=60.0, burst=3)
```
## Backends
- `InMemoryBackend` (in-process, single event loop) — import from
`aiofreqlimit.backends.memory`.
- `idle_ttl: float | None` — drop idle keys after this many seconds (default: None).
- `sweeper_interval: float | None` — optional background cleanup period; set to
enable a sweeper task (default: None).
- `RedisBackend` (shared, multi-host) — import from `aiofreqlimit.backends.redis`.
- Install optional deps: `pip install aiofreqlimit[redis]`.
- Uses Redis server time and a Lua script for atomic GCRA steps.
- `prefix: str` — key prefix (default `freqlimit:gcra:`).
- `extra_ttl: float` — small buffer added to debt horizon; controls how long keys
stay after backlog is cleared.
- Implement `FreqLimitBackend` to plug in other stores:
```python
from collections.abc import Hashable
from aiofreqlimit import FreqLimitBackend, FreqLimitParams
class RedisBackend(FreqLimitBackend):
async def reserve(self, key: Hashable, now: float, params: FreqLimitParams) -> float:
...
```
`FreqLimit` requires an explicit backend instance; no default is provided.
## Testing
The library ships with pytest + hypothesis tests. To run them with uv:
```bash
uv run pytest tests
```
Integration tests for the Redis backend use Testcontainers; Docker must be available
for those cases.
## License
MIT

@@ -7,3 +7,3 @@ [build-system]

name = "aiofreqlimit"
version = "0.1.5"
version = "0.2.0"
description = "Frequency limit for asyncio"

@@ -33,2 +33,14 @@ readme = "README.md"

[project.optional-dependencies]
redis = ["redis>=5.0.0"]
test = [
"pytest>=9.0.1",
"pytest-asyncio>=1.3.0",
"hypothesis>=6.148.1",
"pytest-cov>=7.0.0",
"pytest-mock>=3.14.0",
"redis>=5.0.0",
"testcontainers[redis]>=4.3.3",
]
[dependency-groups]

@@ -42,3 +54,5 @@ lint = [

"pytest-mock>=3.14.0",
"redis>=5.0.0",
"hypothesis>=6.148.1",
"testcontainers[redis]>=4.3.3",
]

@@ -51,2 +65,4 @@ test = [

"pytest-mock>=3.14.0",
"redis>=5.0.0",
"testcontainers[redis]>=4.3.3",
]

@@ -62,2 +78,4 @@ dev = [

"pytest-mock>=3.14.0",
"redis>=5.0.0",
"testcontainers[redis]>=4.3.3",
"ruff>=0.14.5",

@@ -104,4 +122,10 @@ "twine>=6.2.0",

[tool.mypy]
mypy_path = "typings"
[tool.pytest.ini_options]
asyncio_mode = "auto"
filterwarnings = [
"ignore:The @wait_container_is_ready decorator is deprecated:DeprecationWarning:testcontainers.*",
]

@@ -108,0 +132,0 @@ [tool.tox]

+73
-13

@@ -1,11 +0,12 @@

# Frequency Limit Context Manager for asyncio
# aiofreqlimit — Async GCRA rate limiter
[![Latest PyPI package version](https://badge.fury.io/py/aiofreqlimit.svg)](https://pypi.org/project/aiofreqlimit)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/gleb-chipiga/aiofreqlimit/blob/master/LICENSE)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)
[![Downloads](https://img.shields.io/pypi/dm/aiofreqlimit)](https://pypistats.org/packages/aiofreqlimit)
Async rate limiting for Python 3.11+ built on the Generic Cell Rate Algorithm (GCRA) with
type-safe parameters and pluggable backends.
## Installation
aiofreqlimit requires Python 3.11 or greater and is available on PyPI. Use pip to install it:
```bash

@@ -15,5 +16,6 @@ pip install aiofreqlimit

## Using aiofreqlimit
## Quickstart
Pass a value of any hashable type to `acquire`, or omit the argument to use the default key:
Create a contract (`FreqLimitParams`), choose a backend, and wrap your code with the
async context manager:

@@ -23,14 +25,16 @@ ```python

from aiofreqlimit import FreqLimit
from aiofreqlimit import FreqLimit, FreqLimitParams
from aiofreqlimit.backends.memory import InMemoryBackend
limit = FreqLimit(1 / 10)
params = FreqLimitParams(limit=1, period=1.0, burst=1) # 1 op / second
limiter = FreqLimit(params, backend=InMemoryBackend())
async def job():
async with limit.acquire("some_key"):
await some_call()
async def send_message(chat_id: int, text: str) -> None:
async with limiter.resource(f"chat:{chat_id}"):
await bot.send_message(chat_id, text)
async def main():
await asyncio.gather(job() for _ in range(100))
async def main() -> None:
await asyncio.gather(*(send_message(42, f"msg {i}") for i in range(5)))

@@ -40,1 +44,57 @@

```
- `key` is any hashable; `None` uses a global bucket.
- `burst` lets you allow short bursts without changing the long-term rate.
## Params are type-safe
You can keep your limits as constants and reuse them across the project:
```python
from aiofreqlimit import FreqLimitParams
TELEGRAM_PER_CHAT = FreqLimitParams(limit=1, period=1.0, burst=1)
TELEGRAM_PER_GROUP = FreqLimitParams(limit=20, period=60.0, burst=3)
```
## Backends
- `InMemoryBackend` (in-process, single event loop) — import from
`aiofreqlimit.backends.memory`.
- `idle_ttl: float | None` — drop idle keys after this many seconds (default: None).
- `sweeper_interval: float | None` — optional background cleanup period; set to
enable a sweeper task (default: None).
- `RedisBackend` (shared, multi-host) — import from `aiofreqlimit.backends.redis`.
- Install optional deps: `pip install aiofreqlimit[redis]`.
- Uses Redis server time and a Lua script for atomic GCRA steps.
- `prefix: str` — key prefix (default `freqlimit:gcra:`).
- `extra_ttl: float` — small buffer added to debt horizon; controls how long keys
stay after backlog is cleared.
- Implement `FreqLimitBackend` to plug in other stores:
```python
from collections.abc import Hashable
from aiofreqlimit import FreqLimitBackend, FreqLimitParams
class RedisBackend(FreqLimitBackend):
async def reserve(self, key: Hashable, now: float, params: FreqLimitParams) -> float:
...
```
`FreqLimit` requires an explicit backend instance; no default is provided.
## Testing
The library ships with pytest + hypothesis tests. To run them with uv:
```bash
uv run pytest tests
```
Integration tests for the Redis backend use Testcontainers; Docker must be available
for those cases.
## License
MIT

@@ -1,99 +0,13 @@

import asyncio
from collections.abc import AsyncIterator, Hashable
from contextlib import AsyncExitStack, asynccontextmanager, suppress
from types import TracebackType
from typing import Final
from ._version import __version__
from .backends import FreqLimitBackend
from .gcra import gcra_step
from .limiter import FreqLimit
from .params import FreqLimitParams
__all__ = ("FreqLimit", "__version__")
class Lock:
def __init__(self) -> None:
self._ts: float = -float("inf")
self._count: int = 0
self._lock: Final = asyncio.Lock()
@property
def ts(self) -> float:
return self._ts
@ts.setter
def ts(self, ts: float) -> None:
self._ts = ts
async def __aenter__(self) -> None:
self._count += 1
_ = await self._lock.acquire()
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
try:
self._lock.release()
finally:
self._count -= 1
@property
def count(self) -> int:
return self._count
class FreqLimit:
def __init__(
self,
interval: float,
clean_interval: float = 0,
) -> None:
if interval <= 0:
raise RuntimeError("Interval must be greater than 0")
if clean_interval < 0:
raise RuntimeError("Clean interval must be greater than or equal to 0")
self._interval: Final = interval
self._clean_interval: Final = clean_interval if clean_interval > 0 else interval
self._locks: Final = dict[Hashable, Lock]()
self._clean_event: Final = asyncio.Event()
self._clean_task: asyncio.Task[None] | None = None
self._loop: Final = asyncio.get_running_loop()
@asynccontextmanager
async def resource(
self,
key: Hashable = None,
) -> AsyncIterator[None]:
if self._clean_task is None:
self._clean_task = asyncio.create_task(self._clean())
if key not in self._locks:
self._locks[key] = Lock()
async with AsyncExitStack() as stack:
_ = stack.callback(self._clean_event.set)
_ = await stack.enter_async_context(self._locks[key])
delay = self._interval - self._loop.time() + self._locks[key].ts
if delay > 0:
await asyncio.sleep(delay)
self._locks[key].ts = self._loop.time()
yield
async def clear(self) -> None:
if self._clean_task is not None:
_ = self._clean_task.cancel()
with suppress(asyncio.CancelledError):
await self._clean_task
self._clean_task = None
self._locks.clear()
self._clean_event.clear()
async def _clean(self) -> None:
while True:
if len(self._locks) == 0:
_ = await self._clean_event.wait()
self._clean_event.clear()
for key in tuple(self._locks):
age = self._loop.time() - self._locks[key].ts
if self._locks[key].count == 0 and age >= self._clean_interval:
del self._locks[key]
await asyncio.sleep(self._clean_interval)
__all__ = (
"FreqLimit",
"FreqLimitBackend",
"FreqLimitParams",
"__version__",
"gcra_step",
)

@@ -1,3 +0,1 @@

from __future__ import annotations
from importlib import metadata

@@ -4,0 +2,0 @@ from pathlib import Path