You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

tenacity

Package Overview
Dependencies
Maintainers
2
Versions
61
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

tenacity - pypi Package Compare versions

Comparing version
8.3.0
to
8.4.0
+5
releasenotes/notes/add-async-actions-b249c527d99723bb.yaml
---
features:
- |
Added the ability to use async functions for retries. This way, you can now use
asyncio coroutines for retry strategy predicates.
---
features:
- |
If you're using `Trio <https://trio.readthedocs.io>`__, then
``@retry`` now works automatically. It's no longer necessary to
pass ``sleep=trio.sleep``.
# Copyright 2016 Étienne Bersac
# Copyright 2016 Julien Danjou
# Copyright 2016 Joshua Harlow
# Copyright 2013-2014 Ray Holder
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import sys
import typing as t
import tenacity
from tenacity import AttemptManager
from tenacity import BaseRetrying
from tenacity import DoAttempt
from tenacity import DoSleep
from tenacity import RetryCallState
from tenacity import RetryError
from tenacity import after_nothing
from tenacity import before_nothing
from tenacity import _utils
# Import all built-in retry strategies for easier usage.
from .retry import RetryBaseT
from .retry import retry_all # noqa
from .retry import retry_any # noqa
from .retry import retry_if_exception # noqa
from .retry import retry_if_result # noqa
from ..retry import RetryBaseT as SyncRetryBaseT
if t.TYPE_CHECKING:
from tenacity.stop import StopBaseT
from tenacity.wait import WaitBaseT
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Awaitable[t.Any]])
def _portable_async_sleep(seconds: float) -> t.Awaitable[None]:
# If trio is already imported, then importing it is cheap.
# If trio isn't already imported, then it's definitely not running, so we
# can skip further checks.
if "trio" in sys.modules:
# If trio is available, then sniffio is too
import trio
import sniffio
if sniffio.current_async_library() == "trio":
return trio.sleep(seconds)
# Otherwise, assume asyncio
# Lazy import asyncio as it's expensive (responsible for 25-50% of total import overhead).
import asyncio
return asyncio.sleep(seconds)
class AsyncRetrying(BaseRetrying):
def __init__(
self,
sleep: t.Callable[
[t.Union[int, float]], t.Union[None, t.Awaitable[None]]
] = _portable_async_sleep,
stop: "StopBaseT" = tenacity.stop.stop_never,
wait: "WaitBaseT" = tenacity.wait.wait_none(),
retry: "t.Union[SyncRetryBaseT, RetryBaseT]" = tenacity.retry_if_exception_type(),
before: t.Callable[
["RetryCallState"], t.Union[None, t.Awaitable[None]]
] = before_nothing,
after: t.Callable[
["RetryCallState"], t.Union[None, t.Awaitable[None]]
] = after_nothing,
before_sleep: t.Optional[
t.Callable[["RetryCallState"], t.Union[None, t.Awaitable[None]]]
] = None,
reraise: bool = False,
retry_error_cls: t.Type["RetryError"] = RetryError,
retry_error_callback: t.Optional[
t.Callable[["RetryCallState"], t.Union[t.Any, t.Awaitable[t.Any]]]
] = None,
) -> None:
super().__init__(
sleep=sleep, # type: ignore[arg-type]
stop=stop,
wait=wait,
retry=retry, # type: ignore[arg-type]
before=before, # type: ignore[arg-type]
after=after, # type: ignore[arg-type]
before_sleep=before_sleep, # type: ignore[arg-type]
reraise=reraise,
retry_error_cls=retry_error_cls,
retry_error_callback=retry_error_callback,
)
async def __call__( # type: ignore[override]
self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
) -> WrappedFnReturnT:
self.begin()
retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
while True:
do = await self.iter(retry_state=retry_state)
if isinstance(do, DoAttempt):
try:
result = await fn(*args, **kwargs)
except BaseException: # noqa: B902
retry_state.set_exception(sys.exc_info()) # type: ignore[arg-type]
else:
retry_state.set_result(result)
elif isinstance(do, DoSleep):
retry_state.prepare_for_next_attempt()
await self.sleep(do) # type: ignore[misc]
else:
return do # type: ignore[no-any-return]
def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
self.iter_state.actions.append(_utils.wrap_to_async_func(fn))
async def _run_retry(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
self.iter_state.retry_run_result = await _utils.wrap_to_async_func(self.retry)(
retry_state
)
async def _run_wait(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
if self.wait:
sleep = await _utils.wrap_to_async_func(self.wait)(retry_state)
else:
sleep = 0.0
retry_state.upcoming_sleep = sleep
async def _run_stop(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
self.iter_state.stop_run_result = await _utils.wrap_to_async_func(self.stop)(
retry_state
)
async def iter(
self, retry_state: "RetryCallState"
) -> t.Union[DoAttempt, DoSleep, t.Any]: # noqa: A003
self._begin_iter(retry_state)
result = None
for action in self.iter_state.actions:
result = await action(retry_state)
return result
def __iter__(self) -> t.Generator[AttemptManager, None, None]:
raise TypeError("AsyncRetrying object is not iterable")
def __aiter__(self) -> "AsyncRetrying":
self.begin()
self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
return self
async def __anext__(self) -> AttemptManager:
while True:
do = await self.iter(retry_state=self._retry_state)
if do is None:
raise StopAsyncIteration
elif isinstance(do, DoAttempt):
return AttemptManager(retry_state=self._retry_state)
elif isinstance(do, DoSleep):
self._retry_state.prepare_for_next_attempt()
await self.sleep(do) # type: ignore[misc]
else:
raise StopAsyncIteration
def wraps(self, fn: WrappedFn) -> WrappedFn:
fn = super().wraps(fn)
# Ensure wrapper is recognized as a coroutine function.
@functools.wraps(
fn, functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")
)
async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
return await fn(*args, **kwargs)
# Preserve attributes
async_wrapped.retry = fn.retry # type: ignore[attr-defined]
async_wrapped.retry_with = fn.retry_with # type: ignore[attr-defined]
return async_wrapped # type: ignore[return-value]
__all__ = [
"retry_all",
"retry_any",
"retry_if_exception",
"retry_if_result",
"WrappedFn",
"AsyncRetrying",
]
# Copyright 2016–2021 Julien Danjou
# Copyright 2016 Joshua Harlow
# Copyright 2013-2014 Ray Holder
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import typing
from tenacity import _utils
from tenacity import retry_base
if typing.TYPE_CHECKING:
from tenacity import RetryCallState
class async_retry_base(retry_base):
"""Abstract base class for async retry strategies."""
@abc.abstractmethod
async def __call__(self, retry_state: "RetryCallState") -> bool: # type: ignore[override]
pass
def __and__( # type: ignore[override]
self, other: "typing.Union[retry_base, async_retry_base]"
) -> "retry_all":
return retry_all(self, other)
def __rand__( # type: ignore[misc,override]
self, other: "typing.Union[retry_base, async_retry_base]"
) -> "retry_all":
return retry_all(other, self)
def __or__( # type: ignore[override]
self, other: "typing.Union[retry_base, async_retry_base]"
) -> "retry_any":
return retry_any(self, other)
def __ror__( # type: ignore[misc,override]
self, other: "typing.Union[retry_base, async_retry_base]"
) -> "retry_any":
return retry_any(other, self)
RetryBaseT = typing.Union[
async_retry_base, typing.Callable[["RetryCallState"], typing.Awaitable[bool]]
]
class retry_if_exception(async_retry_base):
"""Retry strategy that retries if an exception verifies a predicate."""
def __init__(
self, predicate: typing.Callable[[BaseException], typing.Awaitable[bool]]
) -> None:
self.predicate = predicate
async def __call__(self, retry_state: "RetryCallState") -> bool: # type: ignore[override]
if retry_state.outcome is None:
raise RuntimeError("__call__() called before outcome was set")
if retry_state.outcome.failed:
exception = retry_state.outcome.exception()
if exception is None:
raise RuntimeError("outcome failed but the exception is None")
return await self.predicate(exception)
else:
return False
class retry_if_result(async_retry_base):
"""Retries if the result verifies a predicate."""
def __init__(
self, predicate: typing.Callable[[typing.Any], typing.Awaitable[bool]]
) -> None:
self.predicate = predicate
async def __call__(self, retry_state: "RetryCallState") -> bool: # type: ignore[override]
if retry_state.outcome is None:
raise RuntimeError("__call__() called before outcome was set")
if not retry_state.outcome.failed:
return await self.predicate(retry_state.outcome.result())
else:
return False
class retry_any(async_retry_base):
"""Retries if any of the retries condition is valid."""
def __init__(self, *retries: typing.Union[retry_base, async_retry_base]) -> None:
self.retries = retries
async def __call__(self, retry_state: "RetryCallState") -> bool: # type: ignore[override]
result = False
for r in self.retries:
result = result or await _utils.wrap_to_async_func(r)(retry_state)
if result:
break
return result
class retry_all(async_retry_base):
"""Retries if all the retries condition are valid."""
def __init__(self, *retries: typing.Union[retry_base, async_retry_base]) -> None:
self.retries = retries
async def __call__(self, retry_state: "RetryCallState") -> bool: # type: ignore[override]
result = True
for r in self.retries:
result = result and await _utils.wrap_to_async_func(r)(retry_state)
if not result:
break
return result
+3
-3

@@ -30,3 +30,3 @@ name: Continuous Integration

- python: "3.12"
tox: py312
tox: py312,py312-trio
- python: "3.12"

@@ -38,3 +38,3 @@ tox: pep8

- name: Checkout 🛎️
uses: actions/checkout@v4.1.1
uses: actions/checkout@v4.1.6
with:

@@ -44,3 +44,3 @@ fetch-depth: 0

- name: Setup Python 🔧
uses: actions/setup-python@v5.0.0
uses: actions/setup-python@v5.1.0
with:

@@ -47,0 +47,0 @@ python-version: ${{ matrix.python }}

@@ -14,3 +14,3 @@ name: Release deploy

- name: Checkout 🛎️
uses: actions/checkout@v4.1.1
uses: actions/checkout@v4.1.6
with:

@@ -20,3 +20,3 @@ fetch-depth: 0

- name: Setup Python 🔧
uses: actions/setup-python@v5.0.0
uses: actions/setup-python@v5.1.0
with:

@@ -23,0 +23,0 @@ python-version: 3.11

@@ -17,3 +17,3 @@ queue_rules:

- "check-success=test (3.11, py311)"
- "check-success=test (3.12, py312)"
- "check-success=test (3.12, py312,py312-trio)"
- "check-success=test (3.12, pep8)"

@@ -20,0 +20,0 @@

@@ -82,3 +82,3 @@ Tenacity

.. testsetup:: *
.. testsetup::

@@ -572,3 +572,3 @@ import logging

Finally, ``retry`` works also on asyncio and Tornado (>= 4.5) coroutines.
Finally, ``retry`` works also on asyncio, Trio, and Tornado (>= 4.5) coroutines.
Sleeps are done asynchronously too.

@@ -579,3 +579,3 @@

@retry
async def my_async_function(loop):
async def my_asyncio_function(loop):
await loop.getaddrinfo('8.8.8.8', 53)

@@ -586,12 +586,18 @@

@retry
async def my_async_trio_function():
await trio.socket.getaddrinfo('8.8.8.8', 53)
.. code-block:: python
@retry
@tornado.gen.coroutine
def my_async_function(http_client, url):
def my_async_tornado_function(http_client, url):
yield http_client.fetch(url)
You can even use alternative event loops such as `curio` or `Trio` by passing the correct sleep function:
You can even use alternative event loops such as `curio` by passing the correct sleep function:
.. code-block:: python
@retry(sleep=trio.sleep)
async def my_async_function(loop):
@retry(sleep=curio.sleep)
async def my_async_curio_function():
await asks.get('https://example.org')

@@ -598,0 +604,0 @@

Metadata-Version: 2.1
Name: tenacity
Version: 8.3.0
Version: 8.4.0
Summary: Retry code until it succeeds

@@ -5,0 +5,0 @@ Home-page: https://github.com/jd/tenacity

@@ -82,3 +82,3 @@ Tenacity

.. testsetup:: *
.. testsetup::

@@ -572,3 +572,3 @@ import logging

Finally, ``retry`` works also on asyncio and Tornado (>= 4.5) coroutines.
Finally, ``retry`` works also on asyncio, Trio, and Tornado (>= 4.5) coroutines.
Sleeps are done asynchronously too.

@@ -579,3 +579,3 @@

@retry
async def my_async_function(loop):
async def my_asyncio_function(loop):
await loop.getaddrinfo('8.8.8.8', 53)

@@ -586,12 +586,18 @@

@retry
async def my_async_trio_function():
await trio.socket.getaddrinfo('8.8.8.8', 53)
.. code-block:: python
@retry
@tornado.gen.coroutine
def my_async_function(http_client, url):
def my_async_tornado_function(http_client, url):
yield http_client.fetch(url)
You can even use alternative event loops such as `curio` or `Trio` by passing the correct sleep function:
You can even use alternative event loops such as `curio` by passing the correct sleep function:
.. code-block:: python
@retry(sleep=trio.sleep)
async def my_async_function(loop):
@retry(sleep=curio.sleep)
async def my_async_curio_function():
await asks.get('https://example.org')

@@ -598,0 +604,0 @@

Metadata-Version: 2.1
Name: tenacity
Version: 8.3.0
Version: 8.4.0
Summary: Retry code until it succeeds

@@ -5,0 +5,0 @@ Home-page: https://github.com/jd/tenacity

@@ -21,2 +21,3 @@ .editorconfig

releasenotes/notes/Use--for-formatting-and-validate-using-black-39ec9d57d4691778.yaml
releasenotes/notes/add-async-actions-b249c527d99723bb.yaml
releasenotes/notes/add-reno-d1ab5710f272650a.yaml

@@ -52,5 +53,5 @@ releasenotes/notes/add-retry_except_exception_type-31b31da1924d55f4.yaml

releasenotes/notes/timedelta-for-stop-ef6bf71b88ce9988.yaml
releasenotes/notes/trio-support-retry-22bd544800cd1f36.yaml
releasenotes/notes/wait_exponential_jitter-6ffc81dddcbaa6d3.yaml
tenacity/__init__.py
tenacity/_asyncio.py
tenacity/_utils.py

@@ -71,2 +72,4 @@ tenacity/after.py

tenacity.egg-info/top_level.txt
tenacity/asyncio/__init__.py
tenacity/asyncio/retry.py
tests/__init__.py

@@ -73,0 +76,0 @@ tests/test_after.py

@@ -27,4 +27,5 @@ # Copyright 2016-2018 Julien Danjou

from concurrent import futures
from inspect import iscoroutinefunction
from . import _utils
# Import all built-in retry strategies for easier usage.

@@ -91,2 +92,3 @@ from .retry import retry_base # noqa

from . import asyncio as tasyncio
from .retry import RetryBaseT

@@ -598,12 +600,20 @@ from .stop import StopBaseT

def retry(
sleep: t.Callable[[t.Union[int, float]], t.Optional[t.Awaitable[None]]] = sleep,
sleep: t.Callable[[t.Union[int, float]], t.Union[None, t.Awaitable[None]]] = sleep,
stop: "StopBaseT" = stop_never,
wait: "WaitBaseT" = wait_none(),
retry: "RetryBaseT" = retry_if_exception_type(),
before: t.Callable[["RetryCallState"], None] = before_nothing,
after: t.Callable[["RetryCallState"], None] = after_nothing,
before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
retry: "t.Union[RetryBaseT, tasyncio.retry.RetryBaseT]" = retry_if_exception_type(),
before: t.Callable[
["RetryCallState"], t.Union[None, t.Awaitable[None]]
] = before_nothing,
after: t.Callable[
["RetryCallState"], t.Union[None, t.Awaitable[None]]
] = after_nothing,
before_sleep: t.Optional[
t.Callable[["RetryCallState"], t.Union[None, t.Awaitable[None]]]
] = None,
reraise: bool = False,
retry_error_cls: t.Type["RetryError"] = RetryError,
retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
retry_error_callback: t.Optional[
t.Callable[["RetryCallState"], t.Union[t.Any, t.Awaitable[t.Any]]]
] = None,
) -> t.Callable[[WrappedFn], WrappedFn]: ...

@@ -630,3 +640,3 @@

r: "BaseRetrying"
if iscoroutinefunction(f):
if _utils.is_coroutine_callable(f):
r = AsyncRetrying(*dargs, **dkw)

@@ -647,3 +657,3 @@ elif (

from tenacity._asyncio import AsyncRetrying # noqa:E402,I100
from tenacity.asyncio import AsyncRetrying # noqa:E402,I100

@@ -650,0 +660,0 @@ if tornado:

@@ -90,1 +90,13 @@ # Copyright 2016 Julien Danjou

return inspect.iscoroutinefunction(dunder_call)
def wrap_to_async_func(
call: typing.Callable[..., typing.Any],
) -> typing.Callable[..., typing.Awaitable[typing.Any]]:
if is_coroutine_callable(call):
return call
async def inner(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
return call(*args, **kwargs)
return inner

@@ -33,8 +33,14 @@ # Copyright 2016–2021 Julien Danjou

def __and__(self, other: "retry_base") -> "retry_all":
return retry_all(self, other)
return other.__rand__(self)
def __rand__(self, other: "retry_base") -> "retry_all":
return retry_all(other, self)
def __or__(self, other: "retry_base") -> "retry_any":
return retry_any(self, other)
return other.__ror__(self)
def __ror__(self, other: "retry_base") -> "retry_any":
return retry_any(other, self)
RetryBaseT = typing.Union[retry_base, typing.Callable[["RetryCallState"], bool]]

@@ -41,0 +47,0 @@

@@ -21,2 +21,9 @@ # mypy: disable-error-code="no-untyped-def,no-untyped-call"

try:
import trio
except ImportError:
have_trio = False
else:
have_trio = True
import pytest

@@ -26,4 +33,4 @@

from tenacity import AsyncRetrying, RetryError
from tenacity import _asyncio as tasyncio
from tenacity import retry, retry_if_result, stop_after_attempt
from tenacity import asyncio as tasyncio
from tenacity import retry, retry_if_exception, retry_if_result, stop_after_attempt
from tenacity.wait import wait_fixed

@@ -60,3 +67,3 @@

class TestAsync(unittest.TestCase):
class TestAsyncio(unittest.TestCase):
@asynctest

@@ -144,2 +151,17 @@ async def test_retry(self):

@unittest.skipIf(not have_trio, "trio not installed")
class TestTrio(unittest.TestCase):
def test_trio_basic(self):
thing = NoIOErrorAfterCount(5)
@retry
async def trio_function():
await trio.sleep(0.00001)
return thing.go()
trio.run(trio_function)
assert thing.counter == thing.count
class TestContextManager(unittest.TestCase):

@@ -210,2 +232,163 @@ @asynctest

@asynctest
async def test_retry_with_async_result(self):
async def test():
attempts = 0
async def lt_3(x: float) -> bool:
return x < 3
async for attempt in tasyncio.AsyncRetrying(
retry=tasyncio.retry_if_result(lt_3)
):
with attempt:
attempts += 1
assert attempt.retry_state.outcome # help mypy
if not attempt.retry_state.outcome.failed:
attempt.retry_state.set_result(attempts)
return attempts
result = await test()
self.assertEqual(3, result)
@asynctest
async def test_retry_with_async_exc(self):
async def test():
attempts = 0
class CustomException(Exception):
pass
async def is_exc(e: BaseException) -> bool:
return isinstance(e, CustomException)
async for attempt in tasyncio.AsyncRetrying(
retry=tasyncio.retry_if_exception(is_exc)
):
with attempt:
attempts += 1
if attempts < 3:
raise CustomException()
assert attempt.retry_state.outcome # help mypy
if not attempt.retry_state.outcome.failed:
attempt.retry_state.set_result(attempts)
return attempts
result = await test()
self.assertEqual(3, result)
@asynctest
async def test_retry_with_async_result_or(self):
async def test():
attempts = 0
async def lt_3(x: float) -> bool:
return x < 3
class CustomException(Exception):
pass
def is_exc(e: BaseException) -> bool:
return isinstance(e, CustomException)
retry_strategy = tasyncio.retry_if_result(lt_3) | retry_if_exception(is_exc)
async for attempt in tasyncio.AsyncRetrying(retry=retry_strategy):
with attempt:
attempts += 1
if 2 < attempts < 4:
raise CustomException()
assert attempt.retry_state.outcome # help mypy
if not attempt.retry_state.outcome.failed:
attempt.retry_state.set_result(attempts)
return attempts
result = await test()
self.assertEqual(4, result)
@asynctest
async def test_retry_with_async_result_ror(self):
async def test():
attempts = 0
def lt_3(x: float) -> bool:
return x < 3
class CustomException(Exception):
pass
async def is_exc(e: BaseException) -> bool:
return isinstance(e, CustomException)
retry_strategy = retry_if_result(lt_3) | tasyncio.retry_if_exception(is_exc)
async for attempt in tasyncio.AsyncRetrying(retry=retry_strategy):
with attempt:
attempts += 1
if 2 < attempts < 4:
raise CustomException()
assert attempt.retry_state.outcome # help mypy
if not attempt.retry_state.outcome.failed:
attempt.retry_state.set_result(attempts)
return attempts
result = await test()
self.assertEqual(4, result)
@asynctest
async def test_retry_with_async_result_and(self):
async def test():
attempts = 0
async def lt_3(x: float) -> bool:
return x < 3
def gt_0(x: float) -> bool:
return x > 0
retry_strategy = tasyncio.retry_if_result(lt_3) & retry_if_result(gt_0)
async for attempt in tasyncio.AsyncRetrying(retry=retry_strategy):
with attempt:
attempts += 1
attempt.retry_state.set_result(attempts)
return attempts
result = await test()
self.assertEqual(3, result)
@asynctest
async def test_retry_with_async_result_rand(self):
async def test():
attempts = 0
async def lt_3(x: float) -> bool:
return x < 3
def gt_0(x: float) -> bool:
return x > 0
retry_strategy = retry_if_result(gt_0) & tasyncio.retry_if_result(lt_3)
async for attempt in tasyncio.AsyncRetrying(retry=retry_strategy):
with attempt:
attempts += 1
attempt.retry_state.set_result(attempts)
return attempts
result = await test()
self.assertEqual(3, result)
@asynctest
async def test_async_retying_iterator(self):

@@ -212,0 +395,0 @@ thing = NoIOErrorAfterCount(5)

[tox]
envlist = py3{8,9,10,11,12}, pep8, pypy3
envlist = py3{8,9,10,11,12,12-trio}, pep8, pypy3
skip_missing_interpreters = True

@@ -11,2 +11,3 @@

.[doc]
trio: trio
commands =

@@ -28,2 +29,3 @@ py3{8,9,10,11,12},pypy3: pytest {posargs}

pytest # for stubs
trio
commands =

@@ -35,2 +37,2 @@ mypy {posargs}

deps = reno
commands = reno {posargs}
commands = reno {posargs}
# Copyright 2016 Étienne Bersac
# Copyright 2016 Julien Danjou
# Copyright 2016 Joshua Harlow
# Copyright 2013-2014 Ray Holder
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import sys
import typing as t
from tenacity import AttemptManager
from tenacity import BaseRetrying
from tenacity import DoAttempt
from tenacity import DoSleep
from tenacity import RetryCallState
from tenacity import _utils
WrappedFnReturnT = t.TypeVar("WrappedFnReturnT")
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable[..., t.Awaitable[t.Any]])
def asyncio_sleep(duration: float) -> t.Awaitable[None]:
# Lazy import asyncio as it's expensive (responsible for 25-50% of total import overhead).
import asyncio
return asyncio.sleep(duration)
class AsyncRetrying(BaseRetrying):
sleep: t.Callable[[float], t.Awaitable[t.Any]]
def __init__(
self,
sleep: t.Callable[[float], t.Awaitable[t.Any]] = asyncio_sleep,
**kwargs: t.Any,
) -> None:
super().__init__(**kwargs)
self.sleep = sleep
async def __call__( # type: ignore[override]
self, fn: WrappedFn, *args: t.Any, **kwargs: t.Any
) -> WrappedFnReturnT:
self.begin()
retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
while True:
do = await self.iter(retry_state=retry_state)
if isinstance(do, DoAttempt):
try:
result = await fn(*args, **kwargs)
except BaseException: # noqa: B902
retry_state.set_exception(sys.exc_info()) # type: ignore[arg-type]
else:
retry_state.set_result(result)
elif isinstance(do, DoSleep):
retry_state.prepare_for_next_attempt()
await self.sleep(do)
else:
return do # type: ignore[no-any-return]
@classmethod
def _wrap_action_func(cls, fn: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]:
if _utils.is_coroutine_callable(fn):
return fn
async def inner(*args: t.Any, **kwargs: t.Any) -> t.Any:
return fn(*args, **kwargs)
return inner
def _add_action_func(self, fn: t.Callable[..., t.Any]) -> None:
self.iter_state.actions.append(self._wrap_action_func(fn))
async def _run_retry(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
self.iter_state.retry_run_result = await self._wrap_action_func(self.retry)(
retry_state
)
async def _run_wait(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
if self.wait:
sleep = await self._wrap_action_func(self.wait)(retry_state)
else:
sleep = 0.0
retry_state.upcoming_sleep = sleep
async def _run_stop(self, retry_state: "RetryCallState") -> None: # type: ignore[override]
self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
self.iter_state.stop_run_result = await self._wrap_action_func(self.stop)(
retry_state
)
async def iter(
self, retry_state: "RetryCallState"
) -> t.Union[DoAttempt, DoSleep, t.Any]: # noqa: A003
self._begin_iter(retry_state)
result = None
for action in self.iter_state.actions:
result = await action(retry_state)
return result
def __iter__(self) -> t.Generator[AttemptManager, None, None]:
raise TypeError("AsyncRetrying object is not iterable")
def __aiter__(self) -> "AsyncRetrying":
self.begin()
self._retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
return self
async def __anext__(self) -> AttemptManager:
while True:
do = await self.iter(retry_state=self._retry_state)
if do is None:
raise StopAsyncIteration
elif isinstance(do, DoAttempt):
return AttemptManager(retry_state=self._retry_state)
elif isinstance(do, DoSleep):
self._retry_state.prepare_for_next_attempt()
await self.sleep(do)
else:
raise StopAsyncIteration
def wraps(self, fn: WrappedFn) -> WrappedFn:
fn = super().wraps(fn)
# Ensure wrapper is recognized as a coroutine function.
@functools.wraps(
fn, functools.WRAPPER_ASSIGNMENTS + ("__defaults__", "__kwdefaults__")
)
async def async_wrapped(*args: t.Any, **kwargs: t.Any) -> t.Any:
return await fn(*args, **kwargs)
# Preserve attributes
async_wrapped.retry = fn.retry # type: ignore[attr-defined]
async_wrapped.retry_with = fn.retry_with # type: ignore[attr-defined]
return async_wrapped # type: ignore[return-value]