armasec
Advanced tools
| from armasec.armasec import Armasec | ||
| from armasec.openid_config_loader import OpenidConfigLoader | ||
| from armasec.token_decoder import TokenDecoder | ||
| from armasec.token_decoder import TokenDecoder, extract_keycloak_permissions | ||
| from armasec.token_manager import TokenManager | ||
@@ -15,2 +15,3 @@ from armasec.token_payload import TokenPayload | ||
| "OpenidConfigLoader", | ||
| "extract_keycloak_permissions", | ||
| ] |
@@ -5,3 +5,3 @@ """ | ||
| from typing import Any, Dict, List, Optional, Set, Union | ||
| from typing import Any, Dict, List, Optional, Set, Union, Callable | ||
@@ -50,10 +50,11 @@ import snick | ||
| ) | ||
| payload_claim_mapping: Optional[Dict[str, Any]] = Field( | ||
| permission_extractor: Optional[Callable[[Dict[str, Any]], List[str]]] = Field( | ||
| None, | ||
| description=snick.unwrap( | ||
| """ | ||
| Optional mappings that are applied to map claims to top-level properties of | ||
| TokenPayload. See docs for `TokenDecoder` for more info. | ||
| Optional function that may be used to extract permissions from the decoded token | ||
| dictionary when the permissions are not a top-level claim in the token. | ||
| See docs for `TokenDecoder` for more info. | ||
| """ | ||
| ), | ||
| ) |
+68
-27
@@ -10,4 +10,2 @@ """ | ||
| import jmespath | ||
| import buzz | ||
| from jose import jwt | ||
@@ -34,3 +32,3 @@ | ||
| decode_options_override: dict | None = None, | ||
| payload_claim_mapping: dict | None = None, | ||
| permission_extractor: Callable[[dict], list[str]] | None = None, | ||
| ): | ||
@@ -49,26 +47,36 @@ """ | ||
| setting this to `{ "verify_exp": False }` | ||
| payload_claim_mapping: Optional mappings that are applied to map claims to top-level | ||
| attribute of TokenPayload using a dict format of: | ||
| permission_extractor: Optional function that may be used to extract permissions from | ||
| the decoded token dictionary when the permissions are not a | ||
| top-level claim in the token. If not provided, permissions will | ||
| be assumed to be a top-level claim in the token. | ||
| ``` | ||
| { | ||
| "top_level_attribute": "decoded.token.JMESPath" | ||
| } | ||
| ``` | ||
| The values _must_ be a valid JMESPath. | ||
| Consider the example token: | ||
| Consider this example: | ||
| ``` | ||
| { | ||
| "permissions": "resource_access.default.roles" | ||
| "exp": 1728627701, | ||
| "iat": 1728626801, | ||
| "jti": "24fdb7ef-d773-4e6b-982a-b8126dd58af7", | ||
| "sub": "dfa64115-40b5-46ab-924c-c376e73f631d", | ||
| "azp": "my-client", | ||
| "resource_access": { | ||
| "my-client": { | ||
| "roles": [ | ||
| "read:stuff" | ||
| ] | ||
| }, | ||
| }, | ||
| } | ||
| ``` | ||
| The above example would result in a TokenPayload like: | ||
| In this example, the permissions are found at | ||
| `resource_access.my-client.roles`. To produce a TokenPayload | ||
| with the permissions set as expected, you could supply a | ||
| permission extractor like this: | ||
| ``` | ||
| TokenPayload(permissions=token["resource_access"]["default"]["roles"]) | ||
| def my_extractor(decoded_token: dict) -> list[str]: | ||
| resource_key = decoded_token["azp"] | ||
| return decoded_token["resource_access"][resource_key]["roles"] | ||
| ``` | ||
| Raises a 500 if the path does not match | ||
| """ | ||
@@ -79,3 +87,3 @@ self.algorithm = algorithm | ||
| self.decode_options_override = decode_options_override if decode_options_override else {} | ||
| self.payload_claim_mapping = payload_claim_mapping if payload_claim_mapping else {} | ||
| self.permission_extractor = permission_extractor | ||
@@ -135,14 +143,11 @@ def get_decode_key(self, token: str) -> dict: | ||
| with PayloadMappingError.handle_errors( | ||
| "Failed to map decoded token to payload", | ||
| "Failed to map decoded token to TokenPayload", | ||
| do_except=partial(log_error, self.debug_logger), | ||
| ): | ||
| for payload_key, token_jmespath in self.payload_claim_mapping.items(): | ||
| mapped_value = jmespath.search(token_jmespath, payload_dict) | ||
| buzz.require_condition( | ||
| mapped_value is not None, | ||
| f"No matching values found for claim mapping {token_jmespath} -> {payload_key}", | ||
| raise_exc_class=KeyError, | ||
| if self.permission_extractor is not None: | ||
| self.debug_logger("Attempting to extract permissions.") | ||
| payload_dict["permissions"] = self.permission_extractor(payload_dict) | ||
| self.debug_logger( | ||
| f"Payload dictionary with extracted permissions is {payload_dict}" | ||
| ) | ||
| payload_dict[payload_key] = mapped_value | ||
| self.debug_logger(f"Mapped payload dictionary is {payload_dict}") | ||
@@ -156,1 +161,37 @@ self.debug_logger("Attempting to convert to TokenPayload") | ||
| return token_payload | ||
| def extract_keycloak_permissions(decoded_token: dict) -> list[str]: | ||
| """ | ||
| Provide a permission extractor for Keycloak. | ||
| By default, Keycloak packages the roles for a given client | ||
| nested within the "resource_access" claim. In order to extract | ||
| those roles into the expected permissions in the TokenPayload, | ||
| this permission_extractor can be used. | ||
| Here is an example decoded token from Keycloak (with some stuff | ||
| removed to improve readability): | ||
| ``` | ||
| { | ||
| "exp": 1728627701, | ||
| "iat": 1728626801, | ||
| "jti": "24fdb7ef-d773-4e6b-982a-b8126dd58af7", | ||
| "sub": "dfa64115-40b5-46ab-924c-c376e73f631d", | ||
| "azp": "my-client", | ||
| "resource_access": { | ||
| "my-client": { | ||
| "roles": [ | ||
| "read:stuff" | ||
| ] | ||
| }, | ||
| }, | ||
| } | ||
| ``` | ||
| This extractor would extract the roles `["read:stuff"]` as the | ||
| permissions for the TokenPayload returned by the TokenDecoder. | ||
| """ | ||
| resource_key = decoded_token["azp"] | ||
| return decoded_token["resource_access"][resource_key]["roles"] |
@@ -228,3 +228,3 @@ """ | ||
| debug_logger=self.debug_logger, | ||
| payload_claim_mapping=domain_config.payload_claim_mapping, | ||
| permission_extractor=domain_config.permission_extractor, | ||
| ) | ||
@@ -231,0 +231,0 @@ return TokenManager( |
+1
-2
| Metadata-Version: 2.1 | ||
| Name: armasec | ||
| Version: 2.0.3 | ||
| Version: 2.1.0 | ||
| Summary: Injectable FastAPI auth via OIDC | ||
@@ -21,3 +21,2 @@ Home-page: https://github.com/omnivector-solutions/armasec | ||
| Requires-Dist: httpx (>=0,<1) | ||
| Requires-Dist: jmespath (>=1.0.1,<2.0.0) | ||
| Requires-Dist: loguru (>=0.5.3,<0.6.0) ; extra == "cli" | ||
@@ -24,0 +23,0 @@ Requires-Dist: pendulum (>=3.0.0,<4.0.0) ; extra == "cli" |
+3
-12
| [tool.poetry] | ||
| name = "armasec" | ||
| version = "2.0.3" | ||
| version = "2.1.0" | ||
| description = "Injectable FastAPI auth via OIDC" | ||
@@ -30,2 +30,3 @@ authors = ["Omnivector Engineering Team <info@omnivector.solutions>"] | ||
| py-buzz = "^4.1" | ||
| pluggy = "^1.4.0" | ||
@@ -43,4 +44,2 @@ # These must be included as a main dependency for the pytest extension to work out of the box | ||
| pyperclip = {version = "^1.8.2", optional = true} | ||
| jmespath = "^1.0.1" | ||
| pluggy = "^1.4.0" | ||
@@ -50,3 +49,2 @@ [tool.poetry.extras] | ||
| [tool.poetry.group.dev.dependencies] | ||
@@ -69,5 +67,3 @@ ipython = ">=7,<9" | ||
| ruff = "^0.3" | ||
| types-jmespath = "^1.0.2.7" | ||
| [tool.poetry.scripts] | ||
@@ -88,8 +84,3 @@ armasec = {callable = "armasec_cli.main:app", extras = ["cli"]} | ||
| [[tool.mypy.overrides]] | ||
| module = [ | ||
| "jose", | ||
| "buzz", | ||
| "snick", | ||
| "auto_name_enum", | ||
| ] | ||
| module = ["jose"] | ||
| ignore_missing_imports = true | ||
@@ -96,0 +87,0 @@ |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
96431
1.78%2195
1.76%