
aioretry
Asyncio retry utility for Python 3.7+
Install
$ pip install aioretry
A conda-forge recipe is also available, so you can also use
conda install -c conda-forge aioretry
Usage
import asyncio
from aioretry import (
retry,
RetryPolicyStrategy,
RetryInfo
)
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
"""
- It will always retry until succeeded: abandon = False
- If fails for the first time, it will retry immediately,
- If it fails again,
aioretry will perform a 100ms delay before the second retry,
200ms delay before the 3rd retry,
the 4th retry immediately,
100ms delay before the 5th retry,
etc...
"""
return False, (info.fails - 1) % 3 * 0.1
@retry(retry_policy)
async def connect_to_server():
...
asyncio.run(connect_to_server())
Use as class instance method decorator
We could also use retry
as a decorator for instance method
class Client:
@retry(retry_policy)
async def connect(self):
await self._connect()
asyncio.run(Client().connect())
Use instance method as retry policy
retry_policy
could be the method name of the class if retry
is used as a decorator for instance method.
class ClientWithConfigurableRetryPolicy(Client):
MAX_RETRIES = 3
@classmethod
def class_retry_policy(cls, info: RetryInfo) -> RetryPolicyStrategy:
return info.fails > cls.MAX_RETRIES, info.fails * 0.1
@staticmethod
def static_retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
return info.fails > 3, info.fails * 0.1
def __init__(self, max_retries: int = 3):
self._max_retries = max_retries
def _retry_policy(self, info: RetryInfo) -> RetryPolicyStrategy:
return info.fails > self._max_retries, info.fails * 0.1
@retry('_retry_policy')
async def connect_with_retry_policy_name(self):
await self._connect()
@retry(_retry_policy)
async def connect_with_method_retry_policy(self):
await self._connect()
@retry(class_retry_policy)
async def connect_with_class_retry_policy(self):
await self._connect()
@retry(static_retry_policy)
async def connect_with_static_retry_policy(self):
await self._connect()
asyncio.run(ClientWithConfigurableRetryPolicy(10).connect_with_retry_policy_name())
asyncio.run(ClientWithConfigurableRetryPolicy(10).connect_with_method_retry_policy())
asyncio.run(ClientWithConfigurableRetryPolicy().connect_with_class_retry_policy())
asyncio.run(ClientWithConfigurableRetryPolicy().connect_with_static_retry_policy())
Register an before_retry
callback
We could also register an before_retry
callback which will be executed after every failure of the target function if the corresponding retry is not abandoned.
class ClientTrackableFailures(ClientWithConfigurableRetryPolicy):
async def _before_retry(self, info: RetryInfo) -> None:
await self._send_failure_log(info.exception, info.fails)
@retry(
retry_policy='_retry_policy',
before_retry='_before_retry'
)
async def connect(self):
await self._connect()
Only retry for certain types of exceptions
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
if isinstance(info.exception, (KeyError, ValueError)):
return True, 0
return False, 0
@retry(retry_policy)
async def foo():
...
APIs
retry(retry_policy, before_retry)(fn)
- fn
Callable[[...], Awaitable]
the function to be wrapped. The function should be an async function or normal function returns an awaitable. - retry_policy
Union[str, RetryPolicy]
- before_retry?
Optional[Union[str, Callable[[RetryInfo], Optional[Awaitable]]]]
If specified, before_retry
is called after each failure of fn
and before the corresponding retry. If the retry is abandoned, before_retry
will not be executed.
Returns a wrapped function which accepts the same arguments as fn
and returns an Awaitable
.
RetryPolicy
RetryPolicyStrategy = Tuple[bool, int | float]
RetryPolicy = Callable[[RetryInfo], RetryPolicyStrategy | Awaitable[RetryPolicyStrategy]]
Retry policy is used to determine what to do next after the fn
fails to do some certain thing.
rt = retry_policy(info)
abandon, delay = (
await rt
if inspect.isawaitable(rt)
else rt
)
- info
RetryInfo
- info.fails
int
is the counter number of how many times function fn
performs as a failure. If fn
fails for the first time, then fails
will be 1
. - info.exception
Exception
is the exception that fn
raised. - info.since
float
is the fractional time seconds generated by time.monotonic()
when the first failure happens.
- If
abandon
is True
, then aioretry will give up the retry and raise the exception directly, otherwise aioretry will sleep delay
seconds (asyncio.sleep(delay)
) before the next retry.
def retry_policy(info: RetryInfo):
if isinstance(info.exception, KeyError):
return True, 0
return False, info.fails * 0.1
Python typings
from aioretry import (
RetryPolicy,
RetryPolicyStrategy,
BeforeRetry,
RetryInfo
)
Upgrade guide
Since 5.0.0
, aioretry introduces RetryInfo
as the only parameter of retry_policy
or before_retry
2.x -> 5.x
2.x
def retry_policy(fails: int):
"""A policy that gives no chances to retry
"""
return True, 0.1 * fails
5.x
def retry_policy(info: RetryInfo):
return True, 0.1 * info.fails
3.x -> 5.x
3.x
def before_retry(e: Exception, fails: int):
...
5.x
def before_retry(info: RetryInfo):
info.exception
info.fails
...
4.x -> 5.x
Since 5.0.0
, both retry_policy
and before_retry
have only one parameter of type RetryInfo
respectively.
5.x -> 6.x
Since 6.0.0
, RetryInfo::since
is a float
value which is generated by time.monotonic()
and is better for measuring intervals than datetime
, while in 5.x
RetryInfo::since
is a datetime
from datetime import datetime
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
print('error occurred', (datetime.now() - info.since).total_seconds(), 'seconds ago')
...
import time
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
print('error occurred', time.monotonic() - info.since, 'seconds ago')
...
License
MIT