aiofreqlimit
Advanced tools
| import asyncio | ||
| from collections.abc import AsyncIterator, Hashable | ||
| from contextlib import AsyncExitStack, asynccontextmanager, suppress | ||
| from types import TracebackType | ||
| from typing import Final | ||
| __all__ = ("FreqLimit", "__version__") | ||
| __version__ = "0.0.16" | ||
| class Lock: | ||
| def __init__(self) -> None: | ||
| self._ts: float = -float("inf") | ||
| self._count: int = 0 | ||
| self._lock: Final = asyncio.Lock() | ||
| @property | ||
| def ts(self) -> float: | ||
| return self._ts | ||
| @ts.setter | ||
| def ts(self, ts: float) -> None: | ||
| self._ts = ts | ||
| async def __aenter__(self) -> None: | ||
| self._count += 1 | ||
| _ = await self._lock.acquire() | ||
| async def __aexit__( | ||
| self, | ||
| exc_type: type[BaseException] | None, | ||
| exc_val: BaseException | None, | ||
| exc_tb: TracebackType | None, | ||
| ) -> None: | ||
| try: | ||
| self._lock.release() | ||
| finally: | ||
| self._count -= 1 | ||
| @property | ||
| def count(self) -> int: | ||
| return self._count | ||
| class FreqLimit: | ||
| def __init__( | ||
| self, | ||
| interval: float, | ||
| clean_interval: float = 0, | ||
| ) -> None: | ||
| if interval <= 0: | ||
| raise RuntimeError("Interval must be greater than 0") | ||
| if clean_interval < 0: | ||
| raise RuntimeError("Clean interval must be greater than or equal to 0") | ||
| self._interval: Final = interval | ||
| self._clean_interval: Final = clean_interval if clean_interval > 0 else interval | ||
| self._locks: Final = dict[Hashable, Lock]() | ||
| self._clean_event: Final = asyncio.Event() | ||
| self._clean_task: asyncio.Task[None] | None = None | ||
| self._loop: Final = asyncio.get_running_loop() | ||
| @asynccontextmanager | ||
| async def resource( | ||
| self, | ||
| key: Hashable = None, | ||
| ) -> AsyncIterator[None]: | ||
| if self._clean_task is None: | ||
| self._clean_task = asyncio.create_task(self._clean()) | ||
| if key not in self._locks: | ||
| self._locks[key] = Lock() | ||
| async with AsyncExitStack() as stack: | ||
| _ = stack.callback(self._clean_event.set) | ||
| _ = await stack.enter_async_context(self._locks[key]) | ||
| delay = self._interval - self._loop.time() + self._locks[key].ts | ||
| if delay > 0: | ||
| await asyncio.sleep(delay) | ||
| self._locks[key].ts = self._loop.time() | ||
| yield | ||
| async def clear(self) -> None: | ||
| if self._clean_task is not None: | ||
| _ = self._clean_task.cancel() | ||
| with suppress(asyncio.CancelledError): | ||
| await self._clean_task | ||
| self._clean_task = None | ||
| self._locks.clear() | ||
| self._clean_event.clear() | ||
| async def _clean(self) -> None: | ||
| while True: | ||
| if len(self._locks) == 0: | ||
| _ = await self._clean_event.wait() | ||
| self._clean_event.clear() | ||
| for key in tuple(self._locks): | ||
| age = self._loop.time() - self._locks[key].ts | ||
| if self._locks[key].count == 0 and age >= self._clean_interval: | ||
| del self._locks[key] | ||
| await asyncio.sleep(self._clean_interval) |
| PEP 561 marker |
+28
-7
@@ -1,8 +0,27 @@ | ||
| Metadata-Version: 2.1 | ||
| Metadata-Version: 2.3 | ||
| Name: aiofreqlimit | ||
| Version: 0.0.16 | ||
| Version: 0.1.1 | ||
| Summary: Frequency limit for asyncio | ||
| Home-page: https://github.com/gleb-chipiga/aiofreqlimit | ||
| Author: Gleb Chipiga | ||
| License: MIT | ||
| License: MIT License | ||
| Copyright (c) 2018-2024 Gleb Chipiga | ||
| Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| of this software and associated documentation files (the "Software"), to deal | ||
| in the Software without restriction, including without limitation the rights | ||
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| copies of the Software, and to permit persons to whom the Software is | ||
| furnished to do so, subject to the following conditions: | ||
| The above copyright notice and this permission notice shall be included in all | ||
| copies or substantial portions of the Software. | ||
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| SOFTWARE. | ||
| Classifier: Intended Audience :: Developers | ||
@@ -13,2 +32,4 @@ Classifier: Programming Language :: Python | ||
| Classifier: Programming Language :: Python :: 3.12 | ||
| Classifier: Programming Language :: Python :: 3.13 | ||
| Classifier: Programming Language :: Python :: 3.14 | ||
| Classifier: License :: OSI Approved :: MIT License | ||
@@ -21,5 +42,5 @@ Classifier: Development Status :: 4 - Beta | ||
| Classifier: Framework :: AsyncIO | ||
| Requires-Python: >=3.11,<3.13 | ||
| Requires-Python: >=3.11, <3.15 | ||
| Project-URL: Homepage, https://github.com/gleb-chipiga/aiofreqlimit | ||
| Description-Content-Type: text/x-rst | ||
| License-File: LICENSE | ||
@@ -72,2 +93,2 @@ =========================================== | ||
| asyncio.run(main()) | ||
| asyncio.run(main()) |
+133
-22
@@ -1,8 +0,99 @@ | ||
| [tool.isort] | ||
| profile = "black" | ||
| line_length = 79 | ||
| [build-system] | ||
| requires = ["uv_build>=0.9.6,<0.10.0"] | ||
| build-backend = "uv_build" | ||
| [tool.black] | ||
| line-length = 79 | ||
| [project] | ||
| name = "aiofreqlimit" | ||
| version = "0.1.1" | ||
| description = "Frequency limit for asyncio" | ||
| readme = "README.rst" | ||
| license = { file = "LICENSE" } | ||
| authors = [{ name = "Gleb Chipiga" }] | ||
| requires-python = ">=3.11,<3.15" | ||
| urls = { Homepage = "https://github.com/gleb-chipiga/aiofreqlimit" } | ||
| classifiers = [ | ||
| "Intended Audience :: Developers", | ||
| "Programming Language :: Python", | ||
| "Programming Language :: Python :: 3", | ||
| "Programming Language :: Python :: 3.11", | ||
| "Programming Language :: Python :: 3.12", | ||
| "Programming Language :: Python :: 3.13", | ||
| "Programming Language :: Python :: 3.14", | ||
| "License :: OSI Approved :: MIT License", | ||
| "Development Status :: 4 - Beta", | ||
| "Operating System :: POSIX", | ||
| "Operating System :: MacOS :: MacOS X", | ||
| "Operating System :: Microsoft :: Windows", | ||
| "Topic :: Internet", | ||
| "Framework :: AsyncIO", | ||
| ] | ||
| dependencies = [] | ||
| [dependency-groups] | ||
| lint = [ | ||
| "basedpyright>=1.33.0", | ||
| "mypy>=1.18.2", | ||
| "ruff>=0.14.5", | ||
| "pytest>=9.0.1", | ||
| "pytest-asyncio>=1.3.0", | ||
| "pytest-mock>=3.14.0", | ||
| "hypothesis>=6.148.1", | ||
| ] | ||
| test = [ | ||
| "hypothesis>=6.148.1", | ||
| "pytest>=9.0.1", | ||
| "pytest-asyncio>=1.3.0", | ||
| "pytest-cov>=7.0.0", | ||
| "pytest-mock>=3.14.0", | ||
| ] | ||
| dev = [ | ||
| "basedpyright>=1.33.0", | ||
| "build>=1.3.0", | ||
| "hypothesis>=6.148.1", | ||
| "mypy>=1.18.2", | ||
| "pytest>=9.0.1", | ||
| "pytest-asyncio>=1.3.0", | ||
| "pytest-cov>=7.0.0", | ||
| "pytest-mock>=3.14.0", | ||
| "ruff>=0.14.5", | ||
| "twine>=6.2.0", | ||
| ] | ||
| [tool.ruff] | ||
| line-length = 88 | ||
| target-version = "py311" | ||
| preview = true | ||
| [tool.ruff.format] | ||
| docstring-code-format = true | ||
| indent-style = "space" | ||
| line-ending = "lf" | ||
| quote-style = "double" | ||
| skip-magic-trailing-comma = false | ||
| [tool.ruff.lint] | ||
| select = [ | ||
| "E", | ||
| "F", | ||
| "W", | ||
| "B", | ||
| "C4", | ||
| "I", | ||
| "N", | ||
| "RET", | ||
| "RUF", | ||
| "SIM", | ||
| "UP", | ||
| "YTT", | ||
| ] | ||
| fixable = ["ALL"] | ||
| [tool.ruff.lint.pycodestyle] | ||
| max-line-length = 88 | ||
| max-doc-length = 72 | ||
| [tool.ruff.lint.isort] | ||
| combine-as-imports = true | ||
| known-first-party = ["aiofreqlimit"] | ||
| [tool.pytest.ini_options] | ||
@@ -12,20 +103,40 @@ asyncio_mode = "auto" | ||
| [tool.tox] | ||
| legacy_tox_ini = """ | ||
| [tox] | ||
| envlist = py311,py312 | ||
| requires = ["tox>=4.22", "tox-uv>=1.15"] | ||
| env_list = ["py311", "py312", "py313", "py314", "lint"] | ||
| [testenv] | ||
| deps = | ||
| flake8 | ||
| pytest | ||
| pytest-asyncio | ||
| pytest_mock | ||
| hypothesis | ||
| importlib-metadata | ||
| mypy | ||
| [tool.tox.env_run_base] | ||
| runner = "uv-venv-lock-runner" | ||
| commands = [ | ||
| ["pytest", "tests"], | ||
| ] | ||
| commands = | ||
| flake8 --exclude .tox . | ||
| mypy --strict . | ||
| pytest tests | ||
| """ | ||
| [tool.tox.env.py311] | ||
| description = "Tests on CPython 3.11" | ||
| base_python = ["python3.11"] | ||
| dependency_groups = ["test"] | ||
| [tool.tox.env.py312] | ||
| description = "Tests on CPython 3.12" | ||
| base_python = ["python3.12"] | ||
| dependency_groups = ["test"] | ||
| [tool.tox.env.py313] | ||
| description = "Tests on CPython 3.13" | ||
| base_python = ["python3.13"] | ||
| dependency_groups = ["test"] | ||
| [tool.tox.env.py314] | ||
| description = "Tests on CPython 3.14" | ||
| base_python = ["python3.14"] | ||
| dependency_groups = ["test"] | ||
| [tool.tox.env.lint] | ||
| description = "Lint + type checks on CPython 3.11" | ||
| base_python = ["python3.11"] | ||
| dependency_groups = ["lint"] | ||
| commands = [ | ||
| ["ruff", "format", "--check", "src/aiofreqlimit", "tests"], | ||
| ["ruff", "check", "src/aiofreqlimit", "tests"], | ||
| ["mypy", "--strict", "src/aiofreqlimit", "tests"], | ||
| ["basedpyright"], | ||
| ] |
| Metadata-Version: 2.1 | ||
| Name: aiofreqlimit | ||
| Version: 0.0.16 | ||
| Summary: Frequency limit for asyncio | ||
| Home-page: https://github.com/gleb-chipiga/aiofreqlimit | ||
| Author: Gleb Chipiga | ||
| License: MIT | ||
| Classifier: Intended Audience :: Developers | ||
| Classifier: Programming Language :: Python | ||
| Classifier: Programming Language :: Python :: 3 | ||
| Classifier: Programming Language :: Python :: 3.11 | ||
| Classifier: Programming Language :: Python :: 3.12 | ||
| Classifier: License :: OSI Approved :: MIT License | ||
| Classifier: Development Status :: 4 - Beta | ||
| Classifier: Operating System :: POSIX | ||
| Classifier: Operating System :: MacOS :: MacOS X | ||
| Classifier: Operating System :: Microsoft :: Windows | ||
| Classifier: Topic :: Internet | ||
| Classifier: Framework :: AsyncIO | ||
| Requires-Python: >=3.11,<3.13 | ||
| Description-Content-Type: text/x-rst | ||
| License-File: LICENSE | ||
| =========================================== | ||
| Frequency limit context manager for asyncio | ||
| =========================================== | ||
| .. image:: https://badge.fury.io/py/aiofreqlimit.svg | ||
| :target: https://pypi.org/project/aiofreqlimit | ||
| :alt: Latest PyPI package version | ||
| .. image:: https://img.shields.io/badge/license-MIT-blue.svg | ||
| :target: https://github.com/gleb-chipiga/aiofreqlimit/blob/master/LICENSE | ||
| :alt: License | ||
| .. image:: https://img.shields.io/pypi/dm/aiofreqlimit | ||
| :target: https://pypistats.org/packages/aiofreqlimit | ||
| :alt: Downloads count | ||
| Installation | ||
| ============ | ||
| aiofreqlimit requires Python 3.11 or greater and is available on PyPI. Use pip to install it: | ||
| .. code-block:: bash | ||
| pip install aiofreqlimit | ||
| Using aiofreqlimit | ||
| ================== | ||
| Pass a value of any hashable type to `acquire` or do not specify any parameter: | ||
| .. code-block:: python | ||
| import asyncio | ||
| from aiofreqlimit import FreqLimit | ||
| limit = FreqLimit(1 / 10) | ||
| async def job(): | ||
| async with limit.acquire('some_key'): | ||
| await some_call() | ||
| async def main(): | ||
| await asyncio.gather(job() for _ in range(100)) | ||
| asyncio.run(main()) |
| LICENSE | ||
| README.rst | ||
| pyproject.toml | ||
| setup.py | ||
| aiofreqlimit/__init__.py | ||
| aiofreqlimit/py.typed | ||
| aiofreqlimit.egg-info/PKG-INFO | ||
| aiofreqlimit.egg-info/SOURCES.txt | ||
| aiofreqlimit.egg-info/dependency_links.txt | ||
| aiofreqlimit.egg-info/top_level.txt | ||
| tests/test_aiofreqlimit.py |
| aiofreqlimit |
| import asyncio | ||
| from contextlib import AsyncExitStack, asynccontextmanager, suppress | ||
| from types import TracebackType | ||
| from typing import AsyncIterator, Final, Hashable | ||
| __all__ = ("FreqLimit", "__version__") | ||
| __version__ = "0.0.16" | ||
| class Lock: | ||
| def __init__(self) -> None: | ||
| self._ts = -float("inf") | ||
| self._count = 0 | ||
| self._lock: Final = asyncio.Lock() | ||
| @property | ||
| def ts(self) -> float: | ||
| return self._ts | ||
| @ts.setter | ||
| def ts(self, ts: float) -> None: | ||
| self._ts = ts | ||
| async def __aenter__(self) -> None: | ||
| self._count += 1 | ||
| await self._lock.acquire() | ||
| async def __aexit__( | ||
| self, | ||
| exc_type: type[BaseException] | None, | ||
| exc_val: BaseException | None, | ||
| exc_tb: TracebackType | None, | ||
| ) -> None: | ||
| try: | ||
| self._lock.release() | ||
| finally: | ||
| self._count -= 1 | ||
| @property | ||
| def count(self) -> int: | ||
| return self._count | ||
| class FreqLimit: | ||
| def __init__( | ||
| self, | ||
| interval: float, | ||
| clean_interval: float = 0, | ||
| ) -> None: | ||
| if interval <= 0: | ||
| raise RuntimeError("Interval must be greater than 0") | ||
| if clean_interval < 0: | ||
| raise RuntimeError( | ||
| "Clean interval must be greater than or equal to 0" | ||
| ) | ||
| self._interval: Final = interval | ||
| self._clean_interval: Final = ( | ||
| clean_interval if clean_interval > 0 else interval | ||
| ) | ||
| self._locks: Final = dict[Hashable, Lock]() | ||
| self._clean_event: Final = asyncio.Event() | ||
| self._clean_task: asyncio.Task[None] | None = None | ||
| self._loop: Final = asyncio.get_running_loop() | ||
| @asynccontextmanager | ||
| async def resource( | ||
| self, | ||
| key: Hashable = None, | ||
| ) -> AsyncIterator[None]: | ||
| if self._clean_task is None: | ||
| self._clean_task = asyncio.create_task(self._clean()) | ||
| if key not in self._locks: | ||
| self._locks[key] = Lock() | ||
| async with AsyncExitStack() as stack: | ||
| stack.callback(self._clean_event.set) | ||
| await stack.enter_async_context(self._locks[key]) | ||
| delay = self._interval - self._loop.time() + self._locks[key].ts | ||
| if delay > 0: | ||
| await asyncio.sleep(delay) | ||
| self._locks[key].ts = self._loop.time() | ||
| yield | ||
| async def clear(self) -> None: | ||
| if self._clean_task is not None: | ||
| self._clean_task.cancel() | ||
| with suppress(asyncio.CancelledError): | ||
| await self._clean_task | ||
| self._clean_task = None | ||
| self._locks.clear() | ||
| self._clean_event.clear() | ||
| async def _clean(self) -> None: | ||
| while True: | ||
| if len(self._locks) == 0: | ||
| await self._clean_event.wait() | ||
| self._clean_event.clear() | ||
| for key in tuple(self._locks): | ||
| age = self._loop.time() - self._locks[key].ts | ||
| if self._locks[key].count == 0 and age >= self._clean_interval: | ||
| del self._locks[key] | ||
| await asyncio.sleep(self._clean_interval) |
| PEP 561 marker |
| [egg_info] | ||
| tag_build = | ||
| tag_date = 0 | ||
-45
| import re | ||
| from pathlib import Path | ||
| from setuptools import setup # type: ignore | ||
| path = Path(__file__).parent | ||
| txt = (path / "aiofreqlimit" / "__init__.py").read_text("utf-8") | ||
| version = re.findall(r"^__version__ = \"([^\"]+)\"\r?$", txt, re.M)[0] | ||
| readme = (path / "README.rst").read_text("utf-8") | ||
| setup( | ||
| name="aiofreqlimit", | ||
| version=version, | ||
| description="Frequency limit for asyncio", | ||
| long_description=readme, | ||
| long_description_content_type="text/x-rst", | ||
| url="https://github.com/gleb-chipiga/aiofreqlimit", | ||
| license="MIT", | ||
| author="Gleb Chipiga", | ||
| # author_email='', | ||
| classifiers=[ | ||
| "Intended Audience :: Developers", | ||
| "Programming Language :: Python", | ||
| "Programming Language :: Python :: 3", | ||
| "Programming Language :: Python :: 3.11", | ||
| "Programming Language :: Python :: 3.12", | ||
| "License :: OSI Approved :: MIT License", | ||
| "Development Status :: 4 - Beta", | ||
| "Operating System :: POSIX", | ||
| "Operating System :: MacOS :: MacOS X", | ||
| "Operating System :: Microsoft :: Windows", | ||
| "Topic :: Internet", | ||
| "Framework :: AsyncIO", | ||
| ], | ||
| packages=["aiofreqlimit"], | ||
| package_data={"aiofreqlimit": ["py.typed"]}, | ||
| python_requires=">=3.11,<3.13", | ||
| tests_require=[ | ||
| "pytest", | ||
| "pytest-asyncio", | ||
| "pytest-mock", | ||
| "pytest-cov", | ||
| "hypothesis", | ||
| ], | ||
| ) |
| import asyncio | ||
| from contextlib import suppress | ||
| from random import uniform | ||
| from typing import cast | ||
| from weakref import ref | ||
| import pytest | ||
| from hypothesis import given | ||
| from hypothesis.strategies import floats | ||
| from pytest_mock import MockerFixture | ||
| import aiofreqlimit | ||
| @pytest.mark.asyncio | ||
| async def test_lock_context_manager_enter(mocker: MockerFixture) -> None: | ||
| lock = aiofreqlimit.Lock() | ||
| acquire = mocker.patch.object(lock._lock, "acquire") | ||
| acquire.side_effect = RuntimeError("acquire") | ||
| release = mocker.patch.object(lock._lock, "release") | ||
| flag = False | ||
| with pytest.raises(RuntimeError, match="acquire"): | ||
| async with lock: | ||
| flag = True | ||
| assert lock.count == 1 | ||
| acquire.assert_called_once_with() | ||
| release.assert_not_called() | ||
| assert not flag | ||
| @pytest.mark.asyncio | ||
| async def test_lock_context_manager_exit(mocker: MockerFixture) -> None: | ||
| lock = aiofreqlimit.Lock() | ||
| acquire = mocker.patch.object(lock._lock, "acquire") | ||
| release = mocker.patch.object(lock._lock, "release") | ||
| release.side_effect = RuntimeError("release") | ||
| flag = False | ||
| with pytest.raises(RuntimeError, match="release"): | ||
| async with lock: | ||
| flag = True | ||
| assert lock.count == 0 | ||
| acquire.assert_called_once_with() | ||
| release.assert_called_once_with() | ||
| assert flag | ||
| @pytest.mark.asyncio | ||
| async def test_lock_context_manager() -> None: | ||
| lock = aiofreqlimit.Lock() | ||
| async with lock: | ||
| assert lock.count == 1 | ||
| assert lock.count == 0 | ||
| with suppress(RuntimeError): | ||
| async with lock: | ||
| assert lock.count == 1 | ||
| raise RuntimeError() | ||
| assert lock.count == 0 | ||
| @given(interval=floats(max_value=0)) | ||
| def test_freq_limit_interval(interval: float) -> None: | ||
| with pytest.raises(RuntimeError, match="Interval must be greater than 0"): | ||
| aiofreqlimit.FreqLimit(interval) | ||
| @given( | ||
| interval=floats(min_value=0, exclude_min=True), | ||
| clean_interval=floats(max_value=0, exclude_max=True), | ||
| ) | ||
| def test_freq_limit_clean_interval( | ||
| interval: float, clean_interval: float | ||
| ) -> None: | ||
| with pytest.raises( | ||
| RuntimeError, | ||
| match="Clean interval must be greater than or equal to 0", | ||
| ): | ||
| aiofreqlimit.FreqLimit(interval, clean_interval) | ||
| @pytest.mark.asyncio | ||
| async def test_freq_limit() -> None: | ||
| freq_limit = aiofreqlimit.FreqLimit(0.1) | ||
| loop = asyncio.get_running_loop() | ||
| async def limit( | ||
| _freq_limit: aiofreqlimit.FreqLimit, interval: float | ||
| ) -> tuple[float, float, float]: | ||
| time1 = loop.time() | ||
| async with _freq_limit.resource("key"): | ||
| assert tuple(freq_limit._locks) == ("key",) | ||
| time2 = loop.time() | ||
| await asyncio.sleep(interval) | ||
| time3 = loop.time() | ||
| return time2, time3, time2 - time1 | ||
| tasks = (limit(freq_limit, uniform(0, 0.1)) for _ in range(5)) | ||
| intervals = cast( | ||
| list[tuple[float, float, float]], | ||
| await asyncio.gather(*tasks), | ||
| ) | ||
| assert all(isinstance(value, tuple) for value in intervals) | ||
| intervals = sorted(intervals, key=lambda interval: interval[0]) | ||
| for i in range(len(intervals)): | ||
| if i + 1 < len(intervals): | ||
| assert intervals[i + 1][0] - intervals[i][0] > 0.1 | ||
| assert intervals[i][1] < intervals[i + 1][0] | ||
| await asyncio.sleep(0.11) | ||
| assert tuple(freq_limit._locks) == () | ||
| async with freq_limit.resource("key"): | ||
| pass | ||
| assert tuple(freq_limit._locks) == ("key",) | ||
| await asyncio.sleep(0.33) | ||
| assert tuple(freq_limit._locks) == () | ||
| await freq_limit.clear() | ||
| assert freq_limit._clean_task is None | ||
| assert tuple(freq_limit._locks) == () | ||
| assert not freq_limit._clean_event.is_set() | ||
| @pytest.mark.asyncio | ||
| async def test_freq_limit_keys() -> None: | ||
| freq_limit = aiofreqlimit.FreqLimit(0.1) | ||
| assert tuple(freq_limit._locks) == () | ||
| async with freq_limit.resource("key2"): | ||
| assert tuple(freq_limit._locks.keys()) == ("key2",) | ||
| async with freq_limit.resource("key3"): | ||
| assert tuple(freq_limit._locks.keys()) == ("key2", "key3") | ||
| async with freq_limit.resource("key4"): | ||
| assert tuple(freq_limit._locks.keys()) == ( | ||
| "key2", | ||
| "key3", | ||
| "key4", | ||
| ) | ||
| await asyncio.sleep(0.11) | ||
| assert tuple(freq_limit._locks.keys()) == ("key2",) | ||
| await asyncio.sleep(0.11) | ||
| assert tuple(freq_limit._locks) == () | ||
| await freq_limit.clear() | ||
| @pytest.mark.asyncio | ||
| async def test_freq_limit_overlaps() -> None: | ||
| async def task1(_freq_limit: aiofreqlimit.FreqLimit) -> None: | ||
| async with _freq_limit.resource("key1"): | ||
| assert tuple(_freq_limit._locks) == ("key1",) | ||
| await asyncio.sleep(0.11) | ||
| assert tuple(_freq_limit._locks) == ("key1", "key2") | ||
| async def task2(_freq_limit: aiofreqlimit.FreqLimit) -> None: | ||
| await asyncio.sleep(0.05) | ||
| assert tuple(_freq_limit._locks) == ("key1",) | ||
| async with _freq_limit.resource("key2"): | ||
| assert tuple(_freq_limit._locks) == ("key1", "key2") | ||
| await asyncio.sleep(0.16) | ||
| assert tuple(_freq_limit._locks) == ("key2", "key3") | ||
| async def task3(_freq_limit: aiofreqlimit.FreqLimit) -> None: | ||
| await asyncio.sleep(0.21) | ||
| assert tuple(_freq_limit._locks) == ("key2",) | ||
| async with _freq_limit.resource("key3"): | ||
| assert tuple(_freq_limit._locks) == ("key2", "key3") | ||
| lock2_ref = ref(_freq_limit._locks["key2"]) | ||
| async with _freq_limit.resource("key2"): | ||
| assert tuple(_freq_limit._locks) == ("key2", "key3") | ||
| assert lock2_ref() is _freq_limit._locks["key2"] | ||
| await asyncio.sleep(0.1) | ||
| assert tuple(_freq_limit._locks) == ("key2", "key3") | ||
| await asyncio.sleep(0.1) | ||
| assert lock2_ref() is None | ||
| assert tuple(_freq_limit._locks) == ("key3",) | ||
| assert tuple(_freq_limit._locks) == ("key3",) | ||
| await asyncio.sleep(0.1) | ||
| assert tuple(_freq_limit._locks) == () | ||
| freq_limit = aiofreqlimit.FreqLimit(0.1) | ||
| await asyncio.gather( | ||
| task1(freq_limit), task2(freq_limit), task3(freq_limit) | ||
| ) | ||
| await freq_limit.clear() | ||
| @pytest.mark.asyncio | ||
| async def test_freq_limit_frequency() -> None: | ||
| loop = asyncio.get_running_loop() | ||
| intervals: list[float] = [] | ||
| time = loop.time() | ||
| freq_limit = aiofreqlimit.FreqLimit(0.05) | ||
| lock_ref = None | ||
| for index in range(10): | ||
| async with freq_limit.resource("key"): | ||
| pass | ||
| if index == 0: | ||
| lock_ref = ref(freq_limit._locks["key"]) | ||
| else: | ||
| intervals.append(loop.time() - time) | ||
| assert lock_ref is not None | ||
| assert lock_ref() is freq_limit._locks["key"] | ||
| time = loop.time() | ||
| assert "key" in freq_limit._locks | ||
| assert all(0.05 <= interval <= 0.051 for interval in intervals) | ||
| await freq_limit.clear() |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
11897
-35.82%6
-53.85%85
-72.22%