You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

auth0-python

Package Overview
Dependencies
Maintainers
1
Versions
69
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

auth0-python - pypi Package Compare versions

Comparing version
3.23.1
to
3.24.0
+182
auth0/v3/authentication/async_token_verifier.py
"""Token Verifier module"""
from .. import TokenValidationError
from ..rest_async import AsyncRestClient
from .token_verifier import AsymmetricSignatureVerifier, JwksFetcher, TokenVerifier
class AsyncAsymmetricSignatureVerifier(AsymmetricSignatureVerifier):
"""Async verifier for RSA signatures, which rely on public key certificates.
Args:
jwks_url (str): The url where the JWK set is located.
algorithm (str, optional): The expected signing algorithm. Defaults to "RS256".
"""
def __init__(self, jwks_url, algorithm="RS256"):
super(AsyncAsymmetricSignatureVerifier, self).__init__(jwks_url, algorithm)
self._fetcher = AsyncJwksFetcher(jwks_url)
def set_session(self, session):
"""Set Client Session to improve performance by reusing session.
Args:
session (aiohttp.ClientSession): The client session which should be closed
manually or within context manager.
"""
self._fetcher.set_session(session)
async def _fetch_key(self, key_id=None):
"""Request the JWKS.
Args:
key_id (str): The key's key id."""
return await self._fetcher.get_key(key_id)
async def verify_signature(self, token):
"""Verifies the signature of the given JSON web token.
Args:
token (str): The JWT to get its signature verified.
Raises:
TokenValidationError: if the token cannot be decoded, the algorithm is invalid
or the token's signature doesn't match the calculated one.
"""
kid = self._get_kid(token)
secret_or_certificate = await self._fetch_key(key_id=kid)
return self._decode_jwt(token, secret_or_certificate)
class AsyncJwksFetcher(JwksFetcher):
"""Class that async fetches and holds a JSON web key set.
This class makes use of an in-memory cache. For it to work properly, define this instance once and re-use it.
Args:
jwks_url (str): The url where the JWK set is located.
cache_ttl (str, optional): The lifetime of the JWK set cache in seconds. Defaults to 600 seconds.
"""
def __init__(self, *args, **kwargs):
super(AsyncJwksFetcher, self).__init__(*args, **kwargs)
self._async_client = AsyncRestClient(None)
def set_session(self, session):
"""Set Client Session to improve performance by reusing session.
Args:
session (aiohttp.ClientSession): The client session which should be closed
manually or within context manager.
"""
self._async_client.set_session(session)
async def _fetch_jwks(self, force=False):
"""Attempts to obtain the JWK set from the cache, as long as it's still valid.
When not, it will perform a network request to the jwks_url to obtain a fresh result
and update the cache value with it.
Args:
force (bool, optional): whether to ignore the cache and force a network request or not. Defaults to False.
"""
if force or self._cache_expired():
self._cache_value = {}
try:
jwks = await self._async_client.get(self._jwks_url)
self._cache_jwks(jwks)
except: # noqa: E722
return self._cache_value
return self._cache_value
self._cache_is_fresh = False
return self._cache_value
async def get_key(self, key_id):
"""Obtains the JWK associated with the given key id.
Args:
key_id (str): The id of the key to fetch.
Returns:
the JWK associated with the given key id.
Raises:
TokenValidationError: when a key with that id cannot be found
"""
keys = await self._fetch_jwks()
if keys and key_id in keys:
return keys[key_id]
if not self._cache_is_fresh:
keys = await self._fetch_jwks(force=True)
if keys and key_id in keys:
return keys[key_id]
raise TokenValidationError(
'RSA Public Key with ID "{}" was not found.'.format(key_id)
)
class AsyncTokenVerifier(TokenVerifier):
"""Class that verifies ID tokens following the steps defined in the OpenID Connect spec.
An OpenID Connect ID token is not meant to be consumed until it's verified.
Args:
signature_verifier (AsyncAsymmetricSignatureVerifier): The instance that knows how to verify the signature.
issuer (str): The expected issuer claim value.
audience (str): The expected audience claim value.
leeway (int, optional): The clock skew to accept when verifying date related claims in seconds.
Defaults to 60 seconds.
"""
def __init__(self, signature_verifier, issuer, audience, leeway=0):
if not signature_verifier or not isinstance(
signature_verifier, AsyncAsymmetricSignatureVerifier
):
raise TypeError(
"signature_verifier must be an instance of AsyncAsymmetricSignatureVerifier."
)
self.iss = issuer
self.aud = audience
self.leeway = leeway
self._sv = signature_verifier
self._clock = None # legacy testing requirement
def set_session(self, session):
"""Set Client Session to improve performance by reusing session.
Args:
session (aiohttp.ClientSession): The client session which should be closed
manually or within context manager.
"""
self._sv.set_session(session)
async def verify(self, token, nonce=None, max_age=None, organization=None):
"""Attempts to verify the given ID token, following the steps defined in the OpenID Connect spec.
Args:
token (str): The JWT to verify.
nonce (str, optional): The nonce value sent during authentication.
max_age (int, optional): The max_age value sent during authentication.
organization (str, optional): The expected organization ID (org_id) claim value. This should be specified
when logging in to an organization.
Returns:
the decoded payload from the token
Raises:
TokenValidationError: when the token cannot be decoded, the token signing algorithm is not the expected one,
the token signature is invalid or the token has a claim missing or with unexpected value.
"""
# Verify token presence
if not token or not isinstance(token, str):
raise TokenValidationError("ID token is required but missing.")
# Verify algorithm and signature
payload = await self._sv.verify_signature(token)
# Verify claims
self._verify_payload(payload, nonce, max_age, organization)
return payload
import aiohttp
from ..asyncify import asyncify
from .auth0 import modules
class AsyncAuth0(object):
"""Provides easy access to all endpoint classes
Args:
domain (str): Your Auth0 domain, for example 'username.auth0.com'
token (str): Management API v2 Token
rest_options (RestClientOptions): Pass an instance of
RestClientOptions to configure additional RestClient
options, such as rate-limit retries.
(defaults to None)
"""
def __init__(self, domain, token, rest_options=None):
self._services = []
for name, cls in modules.items():
cls = asyncify(cls)
service = cls(domain=domain, token=token, rest_options=rest_options)
self._services.append(service)
setattr(
self,
name,
service,
)
def set_session(self, session):
"""Set Client Session to improve performance by reusing session.
Args:
session (aiohttp.ClientSession): The client session which should be closed
manually or within context manager.
"""
self._session = session
for service in self._services:
service.set_session(self._session)
async def __aenter__(self):
"""Automatically create and set session within context manager."""
self.set_session(aiohttp.ClientSession())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Automatically close session within context manager."""
await self._session.close()
import base64
import json
import platform
import re
import sys
from tempfile import TemporaryFile
from unittest import IsolatedAsyncioTestCase
import aiohttp
from aioresponses import CallbackResult, aioresponses
from callee import Attrs
from mock import ANY, MagicMock
from auth0.v3.management.async_auth0 import AsyncAuth0 as Auth0
clients = re.compile(r"^https://example\.com/api/v2/clients.*")
factors = re.compile(r"^https://example\.com/api/v2/guardian/factors.*")
payload = {"foo": "bar"}
def get_callback(status=200):
mock = MagicMock(return_value=CallbackResult(status=status, payload=payload))
def callback(url, **kwargs):
return mock(url, **kwargs)
return callback, mock
class TestAsyncify(IsolatedAsyncioTestCase):
@aioresponses()
async def test_get(self, mocked):
callback, mock = get_callback()
mocked.get(clients, callback=callback)
auth0 = Auth0(domain="example.com", token="jwt")
self.assertEqual(await auth0.clients.all_async(), payload)
mock.assert_called_with(
Attrs(path="/api/v2/clients"),
allow_redirects=True,
params={"include_fields": "true"},
headers=ANY,
timeout=ANY,
)
@aioresponses()
async def test_shared_session(self, mocked):
callback, mock = get_callback()
callback2, mock2 = get_callback()
mocked.get(clients, callback=callback)
mocked.put(factors, callback=callback2)
async with Auth0(domain="example.com", token="jwt") as auth0:
self.assertEqual(await auth0.clients.all_async(), payload)
self.assertEqual(
await auth0.guardian.update_factor_async("factor-1", {"factor": 1}),
payload,
)
mock.assert_called_with(
Attrs(path="/api/v2/clients"),
allow_redirects=True,
params={"include_fields": "true"},
headers=ANY,
timeout=ANY,
)
mock2.assert_called_with(
Attrs(path="/api/v2/guardian/factors/factor-1"),
allow_redirects=True,
json={"factor": 1},
headers=ANY,
timeout=ANY,
)
import time
import unittest
import jwt
from aioresponses import aioresponses
from callee import Attrs
from cryptography.hazmat.primitives import serialization
from mock import ANY
from .. import TokenValidationError
from ..authentication.async_token_verifier import (
AsyncAsymmetricSignatureVerifier,
AsyncJwksFetcher,
AsyncTokenVerifier,
)
from ..test.authentication.test_token_verifier import (
JWKS_RESPONSE_MULTIPLE_KEYS,
JWKS_RESPONSE_SINGLE_KEY,
RSA_PUB_KEY_1_JWK,
RSA_PUB_KEY_1_PEM,
RSA_PUB_KEY_2_PEM,
)
from .test_asyncify import get_callback
JWKS_URI = "https://example.auth0.com/.well-known/jwks.json"
PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQDfytWVSk/4Z6rNu8UZ7C4tnU9x0vj5FCaj4awKZlxVgOR1Kcen
QqDOxJdrXXanTBJbZwh8pk+HpWvqDVgVmKhnt+OkgF//hIXZoJMhDOFVzX504kiZ
cu3bu7kFs+PUfKw5s59tmETFPseA/fIrad9YXHisMkNmPWhuKYJ3WfZAaQIDAQAB
AoGADPSfHL9qlcTanIJsTK3hln5u5PYDt9e0zPP5k7iNS93kW+wJROOUj6PN6EdG
4TSEM4ppcV3naMDo2GnhWY624P6LUB+CbDFzjQKq805vrxJuFnq50blscwVK/ffP
kODBm/gwk+FaliRpQTDAAPWkKbkRfkmPx4JMEmTDBQ45diECQQDxw3qp2+wa5WP5
9w7AYrDPq4Fd6gIFcmxracROUcdhhMmVHKA9DzTWY46cSoWZoChYhQhhyj8dlP8q
El8aevN9AkEA7PhxcNyff8aehqEQ/Z38bm3P+GgB9EkRinjesba2CqhEI5okzvb7
OIYdszgQUBqGKlST0a7s9KuTpd7moyy8XQJAY8hjk0HCxCMTTXMLspnJEh1eKo3P
wcHFP9wKeqzEFtrAfHuxIyJok2fJz3XuiEaTAF3/5KSdwi7h1dJ5UCuY3QJAM9rF
0CGnEWngJKu4MRdSNsP232+7Bb67hOagLJlDyp85keTYKyXmoV7PvvkEsNKtCzRI
yHiTx5KIE6LsK0bNzQJBAMV+1KyI8ua1XmqLDaOexvBPM86HnuP+8u5CthgrXyGm
nh9gurwbs/lBRYV/d4XBLj+dzHb2zC0Jo7u96wrOObw=
-----END RSA PRIVATE KEY-----"""
PUBLIC_KEY = {
"kty": "RSA",
"e": "AQAB",
"kid": "kid-1",
"n": "38rVlUpP-GeqzbvFGewuLZ1PcdL4-RQmo-GsCmZcVYDkdSnHp0KgzsSXa112p0wSW2cIfKZPh6Vr6g1YFZioZ7fjpIBf_4SF2aCTIQzhVc1-dOJImXLt27u5BbPj1HysObOfbZhExT7HgP3yK2nfWFx4rDJDZj1obimCd1n2QGk",
}
def get_pem_bytes(rsa_public_key):
return rsa_public_key.public_bytes(
serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo
)
class TestAsyncAsymmetricSignatureVerifier(unittest.IsolatedAsyncioTestCase):
@aioresponses()
async def test_async_asymmetric_verifier_fetches_key(self, mocked):
callback, mock = get_callback(200, JWKS_RESPONSE_SINGLE_KEY)
mocked.get(JWKS_URI, callback=callback)
verifier = AsyncAsymmetricSignatureVerifier(JWKS_URI)
key = await verifier._fetch_key("test-key-1")
self.assertEqual(get_pem_bytes(key), RSA_PUB_KEY_1_PEM)
class TestAsyncJwksFetcher(unittest.IsolatedAsyncioTestCase):
@aioresponses()
async def test_async_get_jwks_json_twice_on_cache_expired(self, mocked):
fetcher = AsyncJwksFetcher(JWKS_URI, cache_ttl=1)
callback, mock = get_callback(200, JWKS_RESPONSE_SINGLE_KEY)
mocked.get(JWKS_URI, callback=callback)
mocked.get(JWKS_URI, callback=callback)
key_1 = await fetcher.get_key("test-key-1")
expected_key_1_pem = get_pem_bytes(key_1)
self.assertEqual(expected_key_1_pem, RSA_PUB_KEY_1_PEM)
mock.assert_called_with(
Attrs(path="/.well-known/jwks.json"),
allow_redirects=True,
params=None,
headers=ANY,
timeout=ANY,
)
self.assertEqual(mock.call_count, 1)
time.sleep(2)
# 2 seconds has passed, cache should be expired
key_1 = await fetcher.get_key("test-key-1")
expected_key_1_pem = get_pem_bytes(key_1)
self.assertEqual(expected_key_1_pem, RSA_PUB_KEY_1_PEM)
mock.assert_called_with(
Attrs(path="/.well-known/jwks.json"),
allow_redirects=True,
params=None,
headers=ANY,
timeout=ANY,
)
self.assertEqual(mock.call_count, 2)
@aioresponses()
async def test_async_get_jwks_json_once_on_cache_hit(self, mocked):
fetcher = AsyncJwksFetcher(JWKS_URI, cache_ttl=1)
callback, mock = get_callback(200, JWKS_RESPONSE_MULTIPLE_KEYS)
mocked.get(JWKS_URI, callback=callback)
mocked.get(JWKS_URI, callback=callback)
key_1 = await fetcher.get_key("test-key-1")
key_2 = await fetcher.get_key("test-key-2")
expected_key_1_pem = get_pem_bytes(key_1)
expected_key_2_pem = get_pem_bytes(key_2)
self.assertEqual(expected_key_1_pem, RSA_PUB_KEY_1_PEM)
self.assertEqual(expected_key_2_pem, RSA_PUB_KEY_2_PEM)
mock.assert_called_with(
Attrs(path="/.well-known/jwks.json"),
allow_redirects=True,
params=None,
headers=ANY,
timeout=ANY,
)
self.assertEqual(mock.call_count, 1)
@aioresponses()
async def test_async_fetches_jwks_json_forced_on_cache_miss(self, mocked):
fetcher = AsyncJwksFetcher(JWKS_URI, cache_ttl=1)
callback, mock = get_callback(200, {"keys": [RSA_PUB_KEY_1_JWK]})
mocked.get(JWKS_URI, callback=callback)
# Triggers the first call
key_1 = await fetcher.get_key("test-key-1")
expected_key_1_pem = get_pem_bytes(key_1)
self.assertEqual(expected_key_1_pem, RSA_PUB_KEY_1_PEM)
mock.assert_called_with(
Attrs(path="/.well-known/jwks.json"),
allow_redirects=True,
params=None,
headers=ANY,
timeout=ANY,
)
self.assertEqual(mock.call_count, 1)
callback, mock = get_callback(200, JWKS_RESPONSE_MULTIPLE_KEYS)
mocked.get(JWKS_URI, callback=callback)
# Triggers the second call
key_2 = await fetcher.get_key("test-key-2")
expected_key_2_pem = get_pem_bytes(key_2)
self.assertEqual(expected_key_2_pem, RSA_PUB_KEY_2_PEM)
mock.assert_called_with(
Attrs(path="/.well-known/jwks.json"),
allow_redirects=True,
params=None,
headers=ANY,
timeout=ANY,
)
self.assertEqual(mock.call_count, 1)
@aioresponses()
async def test_async_fetches_jwks_json_once_on_cache_miss(self, mocked):
fetcher = AsyncJwksFetcher(JWKS_URI, cache_ttl=1)
callback, mock = get_callback(200, JWKS_RESPONSE_SINGLE_KEY)
mocked.get(JWKS_URI, callback=callback)
with self.assertRaises(Exception) as err:
await fetcher.get_key("missing-key")
mock.assert_called_with(
Attrs(path="/.well-known/jwks.json"),
allow_redirects=True,
params=None,
headers=ANY,
timeout=ANY,
)
self.assertEqual(
str(err.exception), 'RSA Public Key with ID "missing-key" was not found.'
)
self.assertEqual(mock.call_count, 1)
@aioresponses()
async def test_async_fails_to_fetch_jwks_json_after_retrying_twice(self, mocked):
fetcher = AsyncJwksFetcher(JWKS_URI, cache_ttl=1)
callback, mock = get_callback(500, {})
mocked.get(JWKS_URI, callback=callback)
mocked.get(JWKS_URI, callback=callback)
with self.assertRaises(Exception) as err:
await fetcher.get_key("id1")
mock.assert_called_with(
Attrs(path="/.well-known/jwks.json"),
allow_redirects=True,
params=None,
headers=ANY,
timeout=ANY,
)
self.assertEqual(
str(err.exception), 'RSA Public Key with ID "id1" was not found.'
)
self.assertEqual(mock.call_count, 2)
class TestAsyncTokenVerifier(unittest.IsolatedAsyncioTestCase):
@aioresponses()
async def test_RS256_token_signature_passes(self, mocked):
callback, mock = get_callback(200, {"keys": [PUBLIC_KEY]})
mocked.get(JWKS_URI, callback=callback)
issuer = "https://tokens-test.auth0.com/"
audience = "tokens-test-123"
token = jwt.encode(
{
"iss": issuer,
"sub": "auth0|123456789",
"aud": audience,
"exp": int(time.time()) + 86400,
"iat": int(time.time()),
},
PRIVATE_KEY,
algorithm="RS256",
headers={"kid": "kid-1"},
)
tv = AsyncTokenVerifier(
signature_verifier=AsyncAsymmetricSignatureVerifier(JWKS_URI),
issuer=issuer,
audience=audience,
)
payload = await tv.verify(token)
self.assertEqual(payload["sub"], "auth0|123456789")
@aioresponses()
async def test_RS256_token_signature_fails(self, mocked):
callback, mock = get_callback(
200, {"keys": [RSA_PUB_KEY_1_JWK]}
) # different pub key
mocked.get(JWKS_URI, callback=callback)
issuer = "https://tokens-test.auth0.com/"
audience = "tokens-test-123"
token = jwt.encode(
{
"iss": issuer,
"sub": "auth0|123456789",
"aud": audience,
"exp": int(time.time()) + 86400,
"iat": int(time.time()),
},
PRIVATE_KEY,
algorithm="RS256",
headers={"kid": "test-key-1"},
)
tv = AsyncTokenVerifier(
signature_verifier=AsyncAsymmetricSignatureVerifier(JWKS_URI),
issuer=issuer,
audience=audience,
)
with self.assertRaises(TokenValidationError) as err:
await tv.verify(token)
self.assertEqual(str(err.exception), "Invalid token signature.")
+8
-1
Metadata-Version: 2.1
Name: auth0-python
Version: 3.23.1
Version: 3.24.0
Summary: Auth0 Python SDK

@@ -372,2 +372,8 @@ Home-page: https://github.com/auth0/auth0-python

# To share a session amongst multiple calls to multiple services
async with Auth0('domain', 'mgmt_api_token') as auth0:
user = await auth0.users.get_async(user_id)
connection = await auth0.connections.get_async(connection_id)
# Use asyncify directly on services

@@ -421,2 +427,3 @@ Users = asyncify(Users)

- Blacklists() ( ``Auth0().blacklists`` )
- Branding() ( ``Auth0().branding`` )
- ClientGrants() ( ``Auth0().client_grants`` )

@@ -423,0 +430,0 @@ - Clients() ( ``Auth0().clients`` )

@@ -12,2 +12,3 @@ LICENSE

auth0/v3/authentication/__init__.py
auth0/v3/authentication/async_token_verifier.py
auth0/v3/authentication/authorize_client.py

@@ -27,2 +28,3 @@ auth0/v3/authentication/base.py

auth0/v3/management/actions.py
auth0/v3/management/async_auth0.py
auth0/v3/management/attack_protection.py

@@ -104,2 +106,4 @@ auth0/v3/management/auth0.py

auth0/v3/test_async/__init__.py
auth0/v3/test_async/test_async_auth0.py
auth0/v3/test_async/test_async_token_verifier.py
auth0/v3/test_async/test_asyncify.py

@@ -106,0 +110,0 @@ auth0_python.egg-info/PKG-INFO

+1
-1

@@ -1,1 +0,1 @@

__version__ = "3.23.1"
__version__ = "3.24.0"

@@ -73,7 +73,15 @@ import aiohttp

def set_session(self, session):
"""Set Client Session to improve performance by reusing session.
Args:
session (aiohttp.ClientSession): The client session which should be closed
manually or within context manager.
"""
self._session = session
self._async_client.client.set_session(self._session)
async def __aenter__(self):
"""Automatically create and set session within context manager."""
async_rest_client = self._async_client.client
self._session = aiohttp.ClientSession()
async_rest_client.set_session(self._session)
self.set_session(aiohttp.ClientSession())
return self

@@ -80,0 +88,0 @@

@@ -48,7 +48,7 @@ """Token Verifier module"""

def verify_signature(self, token):
"""Verifies the signature of the given JSON web token.
def _get_kid(self, token):
"""Gets the key id from the kid claim of the header of the token
Args:
token (str): The JWT to get its signature verified.
token (str): The JWT to get the header from.

@@ -58,2 +58,5 @@ Raises:

or the token's signature doesn't match the calculated one.
Returns:
the key id or None
"""

@@ -72,5 +75,15 @@ try:

kid = header.get("kid", None)
secret_or_certificate = self._fetch_key(key_id=kid)
return header.get("kid", None)
def _decode_jwt(self, token, secret_or_certificate):
"""Verifies and decodes the given JSON web token with the given public key or shared secret.
Args:
token (str): The JWT to get its signature verified.
secret_or_certificate (str): The public key or shared secret.
Raises:
TokenValidationError: if the token cannot be decoded, the algorithm is invalid
or the token's signature doesn't match the calculated one.
"""
try:

@@ -87,3 +100,18 @@ decoded = jwt.decode(

def verify_signature(self, token):
"""Verifies the signature of the given JSON web token.
Args:
token (str): The JWT to get its signature verified.
Raises:
TokenValidationError: if the token cannot be decoded, the algorithm is invalid
or the token's signature doesn't match the calculated one.
"""
kid = self._get_kid(token)
secret_or_certificate = self._fetch_key(key_id=kid)
return self._decode_jwt(token, secret_or_certificate)
class SymmetricSignatureVerifier(SignatureVerifier):

@@ -143,2 +171,20 @@ """Verifier for HMAC signatures, which rely on shared secrets.

def _cache_expired(self):
"""Checks if the cache is expired
Returns:
True if it should use the cache.
"""
return self._cache_date + self._cache_ttl < time.time()
def _cache_jwks(self, jwks):
"""Cache the response of the JWKS request
Args:
jwks (dict): The JWKS
"""
self._cache_value = self._parse_jwks(jwks)
self._cache_is_fresh = True
self._cache_date = time.time()
def _fetch_jwks(self, force=False):

@@ -152,19 +198,11 @@ """Attempts to obtain the JWK set from the cache, as long as it's still valid.

"""
has_expired = self._cache_date + self._cache_ttl < time.time()
if not force and not has_expired:
# Return from cache
self._cache_is_fresh = False
if force or self._cache_expired():
self._cache_value = {}
response = requests.get(self._jwks_url)
if response.ok:
jwks = response.json()
self._cache_jwks(jwks)
return self._cache_value
# Invalidate cache and fetch fresh data
self._cache_value = {}
response = requests.get(self._jwks_url)
if response.ok:
# Update cache
jwks = response.json()
self._cache_value = self._parse_jwks(jwks)
self._cache_is_fresh = True
self._cache_date = time.time()
self._cache_is_fresh = False
return self._cache_value

@@ -171,0 +209,0 @@

class Auth0Error(Exception):
def __init__(self, status_code, error_code, message):
def __init__(self, status_code, error_code, message, content=None):
self.status_code = status_code
self.error_code = error_code
self.message = message
self.content = content

@@ -7,0 +8,0 @@ def __str__(self):

@@ -0,5 +1,6 @@

from ..utils import is_async_available
from .actions import Actions
from .attack_protection import AttackProtection
from .auth0 import Auth0
from .blacklists import Blacklists
from .branding import Branding
from .client_grants import ClientGrants

@@ -30,2 +31,7 @@ from .clients import Clients

if is_async_available():
from .async_auth0 import AsyncAuth0 as Auth0
else:
from .auth0 import Auth0
__all__ = (

@@ -36,2 +42,3 @@ "Auth0",

"Blacklists",
"Branding",
"ClientGrants",

@@ -38,0 +45,0 @@ "Clients",

@@ -5,2 +5,3 @@ from ..utils import is_async_available

from .blacklists import Blacklists
from .branding import Branding
from .client_grants import ClientGrants

@@ -36,2 +37,3 @@ from .clients import Clients

"blacklists": Blacklists,
"branding": Branding,
"client_grants": ClientGrants,

@@ -80,18 +82,7 @@ "clients": Clients,

def __init__(self, domain, token, rest_options=None):
if is_async_available():
from ..asyncify import asyncify
for name, cls in modules.items():
cls = asyncify(cls)
setattr(
self,
name,
cls(domain=domain, token=token, rest_options=rest_options),
)
else:
for name, cls in modules.items():
setattr(
self,
name,
cls(domain=domain, token=token, rest_options=rest_options),
)
for name, cls in modules.items():
setattr(
self,
name,
cls(domain=domain, token=token, rest_options=rest_options),
)

@@ -341,3 +341,9 @@ from ..rest import RestClient

# Organization Invitations
def all_organization_invitations(self, id, page=None, per_page=None):
def all_organization_invitations(
self,
id,
page=None,
per_page=None,
include_totals=False,
):
"""Retrieves a list of all the organization invitations.

@@ -354,5 +360,14 @@

include_totals (bool, optional): True if the query summary is
to be included in the result, False otherwise. Defaults to False.
NOTE: returns start and limit, total count is not yet supported
See: https://auth0.com/docs/api/management/v2#!/Organizations/get_invitations
"""
params = {"page": page, "per_page": per_page}
params = {
"page": page,
"per_page": per_page,
"include_totals": str(include_totals).lower(),
}
return self.client.get(self._url(id, "invitations"), params=params)

@@ -359,0 +374,0 @@

import asyncio
import json

@@ -8,5 +7,3 @@ import aiohttp

from .rest import EmptyResponse, JsonResponse, PlainResponse
from .rest import Response as _Response
from .rest import RestClient
from .rest import EmptyResponse, JsonResponse, PlainResponse, RestClient

@@ -13,0 +10,0 @@

@@ -259,2 +259,9 @@ import base64

)
if self._error_code() == "mfa_required":
raise Auth0Error(
status_code=self._status_code,
error_code=self._error_code(),
message=self._error_message(),
content=self._content,
)

@@ -261,0 +268,0 @@ raise Auth0Error(

@@ -42,4 +42,6 @@ import base64

def get_callback(status=200):
mock = MagicMock(return_value=CallbackResult(status=status, payload=payload))
def get_callback(status=200, response=None):
mock = MagicMock(
return_value=CallbackResult(status=status, payload=response or payload)
)

@@ -46,0 +48,0 @@ def callback(url, **kwargs):

@@ -119,2 +119,19 @@ import base64

@mock.patch("requests.post")
def test_post_error_mfa_required(self, mock_post):
ab = AuthenticationBase("auth0.com", telemetry=False)
mock_post.return_value.status_code = 403
mock_post.return_value.text = '{"error": "mfa_required", "error_description": "Multifactor authentication required", "mfa_token": "Fe26...Ha"}'
with self.assertRaises(Auth0Error) as context:
ab.post("the-url", data={"a": "b"}, headers={"c": "d"})
self.assertEqual(context.exception.status_code, 403)
self.assertEqual(context.exception.error_code, "mfa_required")
self.assertEqual(
context.exception.message, "Multifactor authentication required"
)
self.assertEqual(context.exception.content.get("mfa_token"), "Fe26...Ha")
@mock.patch("requests.post")
def test_post_rate_limit_error(self, mock_post):

@@ -121,0 +138,0 @@ ab = AuthenticationBase("auth0.com", telemetry=False)

@@ -375,3 +375,10 @@ import unittest

)
self.assertEqual(kwargs["params"], {"page": None, "per_page": None})
self.assertEqual(
kwargs["params"],
{
"page": None,
"per_page": None,
"include_totals": "false",
},
)

@@ -386,4 +393,30 @@ # Specific pagination

)
self.assertEqual(kwargs["params"], {"page": 7, "per_page": 25})
self.assertEqual(
kwargs["params"],
{
"page": 7,
"per_page": 25,
"include_totals": "false",
},
)
# Return paged collection with paging properties
c.all_organization_invitations(
"test-org", page=7, per_page=25, include_totals=True
)
args, kwargs = mock_instance.get.call_args
self.assertEqual(
"https://domain/api/v2/organizations/test-org/invitations", args[0]
)
self.assertEqual(
kwargs["params"],
{
"page": 7,
"per_page": 25,
"include_totals": "true",
},
)
@mock.patch("auth0.v3.management.organizations.RestClient")

@@ -390,0 +423,0 @@ def test_get_organization_invitation(self, mock_rc):

Metadata-Version: 2.1
Name: auth0-python
Version: 3.23.1
Version: 3.24.0
Summary: Auth0 Python SDK

@@ -372,2 +372,8 @@ Home-page: https://github.com/auth0/auth0-python

# To share a session amongst multiple calls to multiple services
async with Auth0('domain', 'mgmt_api_token') as auth0:
user = await auth0.users.get_async(user_id)
connection = await auth0.connections.get_async(connection_id)
# Use asyncify directly on services

@@ -421,2 +427,3 @@ Users = asyncify(Users)

- Blacklists() ( ``Auth0().blacklists`` )
- Branding() ( ``Auth0().branding`` )
- ClientGrants() ( ``Auth0().client_grants`` )

@@ -423,0 +430,0 @@ - Clients() ( ``Auth0().clients`` )

@@ -346,2 +346,8 @@ |pypi| |build| |coverage| |license|

# To share a session amongst multiple calls to multiple services
async with Auth0('domain', 'mgmt_api_token') as auth0:
user = await auth0.users.get_async(user_id)
connection = await auth0.connections.get_async(connection_id)
# Use asyncify directly on services

@@ -395,2 +401,3 @@ Users = asyncify(Users)

- Blacklists() ( ``Auth0().blacklists`` )
- Branding() ( ``Auth0().branding`` )
- ClientGrants() ( ``Auth0().client_grants`` )

@@ -397,0 +404,0 @@ - Clients() ( ``Auth0().clients`` )