vxutils
Advanced tools
+1
-1
| Metadata-Version: 2.4 | ||
| Name: vxutils | ||
| Version: 20251201 | ||
| Version: 20251209 | ||
| Summary: A toolbox for vxquant | ||
@@ -5,0 +5,0 @@ Author-email: libao <libao@vxquant.com> |
+8
-1
| [project] | ||
| name = "vxutils" | ||
| version = "20251201" | ||
| version = "20251209" | ||
| description = "A toolbox for vxquant" | ||
@@ -25,1 +25,8 @@ readme = "README.md" | ||
| default = true | ||
| [dependency-groups] | ||
| dev = [ | ||
| "pytest>=8.3.5", | ||
| ] |
@@ -21,2 +21,3 @@ from .executor import VXThreadPoolExecutor, DynamicThreadPoolExecutor | ||
| timeout, | ||
| rate_limit, | ||
| ) | ||
@@ -52,2 +53,3 @@ from .datamodel import ( | ||
| "timeout", | ||
| "rate_limit", | ||
| "VXDataModel", | ||
@@ -54,0 +56,0 @@ "VXDataAdapter", |
| import logging | ||
| import functools | ||
| import time | ||
| from collections import deque | ||
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError | ||
| from threading import Lock | ||
| from typing import Callable, Type, Tuple, Union, Any | ||
| from typing import Callable, Type, Tuple, Union, Any, Deque | ||
@@ -14,2 +15,3 @@ __all__ = [ | ||
| "timeout", | ||
| "rate_limit", | ||
| ] | ||
@@ -171,1 +173,30 @@ | ||
| return wrapper | ||
| ################################################ | ||
| ################################################ | ||
| # @rate_limit(times:int, period:float) | ||
| # 用于限制某个应用调用次数的修饰器 | ||
| ################################################ | ||
| def rate_limit(times: int, period: float) -> Any: | ||
| lock = Lock() | ||
| calls: Deque[float] = deque([0] * times, maxlen=times) | ||
| def decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]: | ||
| @functools.wraps(func) | ||
| def wrapper(*args: Any, **kwargs: Any) -> Any: | ||
| while True: | ||
| now = time.monotonic() | ||
| with lock: | ||
| if calls[0] + period <= now: | ||
| calls.append(now) | ||
| break | ||
| sleep_for = period - (now - calls[0]) | ||
| time.sleep(sleep_for) | ||
| return func(*args, **kwargs) | ||
| return wrapper | ||
| return decorator |
@@ -8,3 +8,3 @@ import os | ||
| __all__ = ["VXThreadPoolExecutor", "DynamicThreadPoolExecutor"] | ||
| __all__ = ["VXThreadPoolExecutor"] | ||
@@ -199,4 +199,1 @@ | ||
| return v | ||
| # 兼容旧名称 | ||
| DynamicThreadPoolExecutor = VXThreadPoolExecutor |
@@ -5,3 +5,3 @@ import unittest | ||
| from typing import List | ||
| from vxutils.decorators import retry, timer, log_exception, singleton, timeout | ||
| from vxutils.decorators import retry, timer, log_exception, singleton, timeout, rate_limit | ||
@@ -72,3 +72,19 @@ | ||
| class TestRateLimit(unittest.TestCase): | ||
| def test_rate_limit_throttles(self): | ||
| calls = {"count": 0} | ||
| @rate_limit(times=2, period=0.2) | ||
| def fn(): | ||
| calls["count"] += 1 | ||
| return calls["count"] | ||
| start = time.monotonic() | ||
| [fn() for _ in range(5)] | ||
| elapsed = time.monotonic() - start | ||
| self.assertGreaterEqual(elapsed, 0.35) | ||
| self.assertEqual(calls["count"], 5) | ||
| if __name__ == "__main__": | ||
| unittest.main() |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
79301
1.86%1931
1.9%