xeauth
Advanced tools
| import requests | ||
| from .settings import config | ||
| def get_active_usernames(): | ||
| import utilix | ||
| users_db = utilix.xent_collection(collection="users") | ||
| query = { | ||
| "active": {"$in": ["True", "true", True, 1]}, | ||
| "github": {"$nin": ["", None]}, | ||
| } | ||
| projection = {"github": 1, "_id": 0} | ||
| user_docs = users_db.find(query, projection) | ||
| return [doc["github"] for doc in user_docs] | ||
| def get_group_usernames(groupname): | ||
| import utilix | ||
| users_db = utilix.xent_collection(collection="users") | ||
| query = { | ||
| "active": {"$in": ["True", "true", True, 1]}, | ||
| "github": {"$nin": ["", None]}, | ||
| "groups": groupname, | ||
| } | ||
| projection = {"github": 1, "groups": 1, "_id": 0} | ||
| user_docs = users_db.find(query, projection) | ||
| return [doc["github"] for doc in user_docs if groupname in doc["groups"]] | ||
| def get_user_keys(username, token=config.GITHUB_TOKEN, validate=True): | ||
| headers = {} | ||
| if token is not None: | ||
| headers["Authorization"] = f"Bearer {token}" | ||
| r = requests.get( | ||
| f"https://api.github.com/users/{username}/gpg_keys", headers=headers | ||
| ) | ||
| if not r.ok: | ||
| return [] | ||
| keys = r.json() | ||
| if not isinstance(keys, list): | ||
| return [] | ||
| if isinstance(keys, list): | ||
| return [key for key in keys if key["can_encrypt_storage"] and key["raw_key"]] | ||
| return [] | ||
| def iter_valid_keys(token=config.GITHUB_TOKEN): | ||
| usernames = get_active_usernames() | ||
| for username in usernames: | ||
| for key in get_user_keys(username, token=token): | ||
| if not key["can_encrypt_storage"]: | ||
| continue | ||
| if not key["raw_key"]: | ||
| continue | ||
| key["username"] = username | ||
| yield key | ||
| def get_all_valid_keys(token=config.GITHUB_TOKEN): | ||
| keys = list(iter_valid_keys(token=token)) | ||
| return keys | ||
| def iter_valid_user_keys(token=config.GITHUB_TOKEN): | ||
| usernames = get_active_usernames() | ||
| for username in usernames: | ||
| keys = get_user_keys(username, token=token) | ||
| keys = [key for key in keys if key["can_encrypt_storage"] and key["raw_key"]] | ||
| yield username, keys | ||
| def import_key(keydata, gnuhome=None): | ||
| import gnupg | ||
| gpg = gnupg.GPG(homedir=gnuhome) | ||
| gpg.import_keys(keydata) |
+194
| from contextlib import contextmanager | ||
| import httpx | ||
| import param | ||
| import time | ||
| from rich.console import Console | ||
| from .settings import config | ||
| class GitHubApi(param.Parameterized): | ||
| API_URL = 'https://api.github.com' | ||
| oauth_token = param.String() | ||
| @contextmanager | ||
| def Client(self, *args, **kwargs): | ||
| kwargs["headers"] = kwargs.get("headers", {}) | ||
| kwargs["headers"]["Authorization"] = f"Bearer {self.oauth_token}" | ||
| kwargs["headers"]["Accept"] = "application/json" | ||
| client = httpx.Client(*args, base_url=self.API_URL, **kwargs) | ||
| try: | ||
| yield client | ||
| finally: | ||
| client.close() | ||
| def get(self, path, *args, **kwargs): | ||
| with self.Client() as client: | ||
| response = client.get(path, *args, **kwargs) | ||
| response.raise_for_status() | ||
| return response.json() | ||
| def post(self, path, *args, **kwargs): | ||
| with self.Client() as client: | ||
| response = client.post(path, *args, **kwargs) | ||
| response.raise_for_status() | ||
| return response.json() | ||
| @property | ||
| def profile(self): | ||
| return self.get('/user') | ||
| @property | ||
| def username(self): | ||
| return self.profile.get('login') | ||
| @property | ||
| def organizations(self): | ||
| return [org.get('login') for org in self.get('/user/orgs')] | ||
| @property | ||
| def teams(self): | ||
| orgs = self.organizations | ||
| teams = [] | ||
| for org in orgs: | ||
| teams.extend([team.get('name') for team in self.get(f'/orgs/{org}/teams')]) | ||
| return teams | ||
| @property | ||
| def repositories(self): | ||
| return [repo.get('name') for repo in self.get('/user/repos')] | ||
| @property | ||
| def starred_repositories(self): | ||
| return [repo.get('name') for repo in self.get('/user/starred')] | ||
| @property | ||
| def followers(self): | ||
| return [user.get('login') for user in self.get('/user/followers')] | ||
| @property | ||
| def gpg_keys(self): | ||
| return self.get('/user/gpg_keys') | ||
| class GitHubDeviceCode(param.Parameterized): | ||
| """GitHub device code authentication. | ||
| """ | ||
| base_url = param.String(default='https://github.com/login', doc='Github auth URL') | ||
| client_id = param.String(doc='GitHub App client ID', default=config.DEFAULT_CLIENT_ID) | ||
| device_code = param.String(doc='GitHub device code') | ||
| user_code = param.String(doc='GitHub user code') | ||
| verification_uri = param.String(doc='GitHub verification URI') | ||
| expires = param.Number(doc='Expiration time of the device code') | ||
| interval = param.Integer(doc='Interval between polling requests') | ||
| @classmethod | ||
| def from_response_data(cls, client_id, data): | ||
| return cls(**data) | ||
| def open_browser(self): | ||
| import webbrowser | ||
| webbrowser.open(self.verification_uri) | ||
| @property | ||
| def prompt(self): | ||
| return f'Go to {self.verification_uri} and enter the code: {self.user_code}' | ||
| def await_token(self): | ||
| while True: | ||
| if time.time() >= self.expires: | ||
| raise Exception("Authentication timed out. Please try again.") | ||
| access_token = self.check_for_access_token() | ||
| if access_token is not None: | ||
| return access_token | ||
| time.sleep(self.interval) | ||
| def check_for_access_token(self): | ||
| with httpx.Client(base_url=self.base_url) as client: | ||
| response = client.post( | ||
| '/oauth/access_token', | ||
| data = { | ||
| 'client_id': self.client_id, | ||
| 'device_code': self.device_code, | ||
| 'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', | ||
| }, | ||
| headers = {"Accept": "application/json"}, | ||
| ) | ||
| response.raise_for_status() | ||
| data = response.json() | ||
| return data.get("access_token", None) | ||
| class GithubAuth(param.Parameterized): | ||
| BASE_URL = 'https://github.com/login' | ||
| DEFAULT_SCOPES = ("read:org", "read:user") | ||
| oauth_token = param.String() | ||
| @classmethod | ||
| def get_device_code(cls, client_id=None, scopes=None): | ||
| if client_id is None: | ||
| client_id = config.DEFAULT_CLIENT_ID | ||
| if client_id is None: | ||
| raise ValueError("client_id must be provided") | ||
| if scopes is None: | ||
| scopes = cls.DEFAULT_SCOPES | ||
| data = {'client_id': client_id} | ||
| if scopes is not None: | ||
| data['scope'] = ' '.join(scopes) | ||
| with httpx.Client(base_url=cls.BASE_URL) as client: | ||
| response = client.post( | ||
| '/device/code', | ||
| data=data, | ||
| headers = {"Accept": "application/json"}, | ||
| ) | ||
| response.raise_for_status() | ||
| data = response.json() | ||
| data['expires'] = time.time() + data.pop('expires_in', 900) | ||
| data['client_id'] = client_id | ||
| data['base_url'] = cls.BASE_URL | ||
| return GitHubDeviceCode(**data) | ||
| @classmethod | ||
| def device_login(cls, client_id, scopes=None, console=None): | ||
| if console is None: | ||
| console = Console() | ||
| code = cls.get_device_code(client_id, scopes=scopes) | ||
| console.print(code.prompt) | ||
| token = code.await_token() | ||
| return cls(client_id=client_id, oauth_token=token) | ||
| @contextmanager | ||
| def Client(self, *args, **kwargs): | ||
| kwargs["headers"] = kwargs.get("headers", {}) | ||
| kwargs["headers"]["Authorization"] = f"Bearer {self.oauth_token}" | ||
| kwargs["headers"]["Accept"] = "application/json" | ||
| client = httpx.Client(*args, **kwargs) | ||
| try: | ||
| yield client | ||
| finally: | ||
| client.close() | ||
| @property | ||
| def api(self): | ||
| return GitHubApi(oauth_token=self.oauth_token) | ||
| @property | ||
| def xenonnt_member(self): | ||
| return "XENONnT" in self.api.organizations | ||
| @property | ||
| def xenon1t_member(self): | ||
| return "XENON1T" in self.api.organizations | ||
| @property | ||
| def xenon_member(self): | ||
| return self.xenonnt_member or self.xenon1t_member |
| import param | ||
| from .github import GithubAuth | ||
| from .settings import config | ||
| class XenonUser(param.Parameterized): | ||
| """ | ||
| XenonUser is a class that represents a user of the Xenon platform. | ||
| """ | ||
| username = param.String(doc="The Xenon username of the user.") | ||
| email = param.String(doc="The email address of the user.") | ||
| full_name = param.String(doc="The full name of the user.", default=None) | ||
| github = param.String(doc="The Github username of the user.", default=None) | ||
| cell = param.String(doc="The cell phone number of the user.", default=None) | ||
| groups = param.List(doc="The groups the user is a member of.", default=[]) | ||
| institute = param.String(doc="The institute the user belongs to.", default=None) | ||
| picture_url = param.String(doc="The URL of the user's picture.", default=None) | ||
| lngs_ldap_email = param.String(doc="The email address of the user.", default=None) | ||
| lngs_ldap_cn = param.String(doc="The common name of the user.", default=None) | ||
| lngs_ldap_uid = param.String(doc="The UID of the user.", default=None) | ||
| active = param.Boolean(doc="Whether the user is active.", default=None) | ||
| first_name = param.String(doc="The first name of the user.", default=None) | ||
| last_name = param.String(doc="The last name of the user.", default=None) | ||
| github_orgs = param.List(doc="The Github organizations the user is a member of.", default=[]) | ||
| github_teams = param.List(doc="The Github teams the user is a member of.", default=[]) | ||
| @classmethod | ||
| def from_github_token(cls, token): | ||
| """ | ||
| Creates a XenonUser from a Github token. | ||
| Args: | ||
| token (str): The Github token. | ||
| Returns: | ||
| XenonUser: The XenonUser. | ||
| """ | ||
| users_db = config.mongo_collection('users') | ||
| api = GithubAuth(oauth_token=token).api | ||
| github_user = api.username | ||
| teams = api.teams | ||
| orgs = api.organizations | ||
| data = users_db.find_one({'github': github_user}) | ||
| if data is None: | ||
| raise ValueError(f'User {github_user} not found in Xenon database.') | ||
| data['full_name'] = data.get('name', data.get('first_name', '') + ' ' + data.get('last_name', '')) | ||
| data['active'] = data.get('active', False) in [True, 'True', 'true'] | ||
| data['github_teams'] = teams | ||
| data['github_orgs'] = orgs | ||
| data = {k: v for k,v in data.items() if k in cls.param.params()} | ||
| return cls(**data) |
+8
-4
| Metadata-Version: 2.1 | ||
| Name: xeauth | ||
| Version: 0.1.22 | ||
| Version: 0.2.0 | ||
| Summary: Top-level package for xeauth. | ||
@@ -9,3 +9,3 @@ Home-page: https://github.com/jmosbacher/xeauth | ||
| Author-email: joe.mosbacher@gmail.com | ||
| Requires-Python: >=3.8,<3.11 | ||
| Requires-Python: >=3.7,<4 | ||
| Classifier: Development Status :: 2 - Pre-Alpha | ||
@@ -16,10 +16,14 @@ Classifier: Intended Audience :: Developers | ||
| Classifier: Programming Language :: Python :: 3 | ||
| Classifier: Programming Language :: Python :: 3.7 | ||
| Classifier: Programming Language :: Python :: 3.8 | ||
| Classifier: Programming Language :: Python :: 3.9 | ||
| Classifier: Programming Language :: Python :: 3.10 | ||
| Classifier: Programming Language :: Python :: 3.11 | ||
| Classifier: Programming Language :: Python :: 3.8 | ||
| Classifier: Programming Language :: Python :: 3.9 | ||
| Requires-Dist: Authlib (>=1.0.0,<2.0.0) | ||
| Requires-Dist: appdirs (>=1.4.4,<2.0.0) | ||
| Requires-Dist: click | ||
| Requires-Dist: gnupg (>=2.3.1,<3.0.0) | ||
| Requires-Dist: httpx (>=0.19,<0.23) | ||
| Requires-Dist: param (>=1.12.0,<2.0.0) | ||
| Requires-Dist: rich (>=13.1.0,<14.0.0) | ||
| Description-Content-Type: text/x-rst | ||
@@ -26,0 +30,0 @@ |
+4
-3
| [tool] | ||
| [tool.poetry] | ||
| name = "xeauth" | ||
| version = "0.1.22" | ||
| version = "0.2.0" | ||
| homepage = "https://github.com/jmosbacher/xeauth" | ||
@@ -25,8 +25,9 @@ description = "Top-level package for xeauth." | ||
| [tool.poetry.dependencies] | ||
| python = ">=3.8,<3.11" | ||
| python = ">=3.7,<4" | ||
| click = "*" | ||
| Authlib = "^1.0.0" | ||
| httpx = ">=0.19,<0.23" | ||
| appdirs = "^1.4.4" | ||
| param = "^1.12.0" | ||
| gnupg = "^2.3.1" | ||
| rich = "^13.1.0" | ||
@@ -33,0 +34,0 @@ [tool.poetry.dev-dependencies] |
+9
-8
@@ -5,3 +5,3 @@ # -*- coding: utf-8 -*- | ||
| packages = \ | ||
| ['tests', 'xeauth', 'xeauth.integrations'] | ||
| ['tests', 'xeauth'] | ||
@@ -12,7 +12,8 @@ package_data = \ | ||
| install_requires = \ | ||
| ['Authlib>=1.0.0,<2.0.0', | ||
| 'appdirs>=1.4.4,<2.0.0', | ||
| ['appdirs>=1.4.4,<2.0.0', | ||
| 'click', | ||
| 'gnupg>=2.3.1,<3.0.0', | ||
| 'httpx>=0.19,<0.23', | ||
| 'param>=1.12.0,<2.0.0'] | ||
| 'param>=1.12.0,<2.0.0', | ||
| 'rich>=13.1.0,<14.0.0'] | ||
@@ -26,3 +27,3 @@ entry_points = \ | ||
| 'name': 'xeauth', | ||
| 'version': '0.1.22', | ||
| 'version': '0.2.0', | ||
| 'description': 'Top-level package for xeauth.', | ||
@@ -32,4 +33,4 @@ 'long_description': '======\nxeauth\n======\n\n\n.. image:: https://img.shields.io/pypi/v/xeauth.svg\n :target: https://pypi.python.org/pypi/xeauth\n\n.. image:: https://img.shields.io/travis/jmosbacher/xeauth.svg\n :target: https://travis-ci.com/jmosbacher/xeauth\n\n.. image:: https://readthedocs.org/projects/xeauth/badge/?version=latest\n :target: https://xeauth.readthedocs.io/en/latest/?badge=latest\n :alt: Documentation Status\n\n\n\n\nAuthentication client for the Xenon edark matter experiment.\n\n\n* Free software: MIT\n* Documentation: https://xeauth.readthedocs.io.\n\n\nFeatures\n--------\n\n* TODO\n\nCredits\n-------\n\nThis package was created with Cookiecutter_ and the `briggySmalls/cookiecutter-pypackage`_ project template.\n\n.. _Cookiecutter: https://github.com/audreyr/cookiecutter\n.. _`briggySmalls/cookiecutter-pypackage`: https://github.com/briggySmalls/cookiecutter-pypackage\n', | ||
| 'author_email': 'joe.mosbacher@gmail.com', | ||
| 'maintainer': None, | ||
| 'maintainer_email': None, | ||
| 'maintainer': 'None', | ||
| 'maintainer_email': 'None', | ||
| 'url': 'https://github.com/jmosbacher/xeauth', | ||
@@ -40,3 +41,3 @@ 'packages': packages, | ||
| 'entry_points': entry_points, | ||
| 'python_requires': '>=3.8,<3.11', | ||
| 'python_requires': '>=3.7,<4', | ||
| } | ||
@@ -43,0 +44,0 @@ |
@@ -33,5 +33,5 @@ #!/usr/bin/env python | ||
| assert result.exit_code == 0 | ||
| assert 'xeauth.cli.main' in result.output | ||
| help_result = runner.invoke(cli.main, ['--help']) | ||
| assert "xeauth.cli.main" in result.output | ||
| help_result = runner.invoke(cli.main, ["--help"]) | ||
| assert help_result.exit_code == 0 | ||
| assert '--help Show this message and exit.' in help_result.output | ||
| assert "--help Show this message and exit." in help_result.output |
| """Top-level package for xeauth.""" | ||
| from .xeauth import * | ||
| from .settings import config | ||
| from . import admin, github, user, utils | ||
| __all__ = ["config", "admin", "github", "user", "utils"] | ||
| __author__ = """Yossi Mosbacher""" | ||
| __email__ = 'joe.mosbacher@gmail.com' | ||
| __version__ = '0.1.22' | ||
| __email__ = "joe.mosbacher@gmail.com" | ||
| __version__ = "0.2.0" |
+117
-9
| """Console script for xeauth.""" | ||
| import os | ||
| import sys | ||
| import xeauth | ||
| import click | ||
| import httpx | ||
| import pathlib | ||
| from rich.console import Console | ||
| from rich.progress import track | ||
| from rich.live import Live | ||
| from rich.table import Table | ||
| @click.command() | ||
| @click.group() | ||
| def main(): | ||
| """Console script for xeauth.""" | ||
| click.echo("Replace this message by putting your code into " | ||
| "xeauth.cli.main") | ||
| click.echo("See click documentation at https://click.palletsprojects.com/") | ||
| return 0 | ||
| with click.get_current_context() as ctx: | ||
| if ctx.invoked_subcommand is None: | ||
| click.echo(ctx.get_help()) | ||
| @click.command() | ||
| @main.group() | ||
| def secrets(): | ||
| with click.get_current_context() as ctx: | ||
| if ctx.invoked_subcommand is None: | ||
| click.echo(ctx.get_help()) | ||
| @main.group() | ||
| def admin(): | ||
| with click.get_current_context() as ctx: | ||
| if ctx.invoked_subcommand is None: | ||
| click.echo(ctx.get_help()) | ||
| @main.command() | ||
| def login(): | ||
| xeauth.cli_login() | ||
| token = xeauth.cli_login() | ||
| token.to_file(xeauth.config.TOKEN_FILE) | ||
| click.echo(f"Token saved to: {xeauth.config.TOKEN_FILE}") | ||
| @secrets.command() | ||
| @click.option("--name", prompt="Name", help="Your full name.") | ||
| @click.option( | ||
| "--email", prompt="Email", help="Your email address as it is registered on github." | ||
| ) | ||
| @click.option("--passphrase", prompt="Passphrase", help="Your passphrase for the key.") | ||
| @click.option("--gnu-home", default=None, help="Your gnu home directory.") | ||
| def key_gen(name, email, passphrase, gnu_home): | ||
| HELP_LINK = "https://docs.github.com/en/authentication/managing-commit-signature-verification/adding-a-gpg-key-to-your-github-account" | ||
| key = xeauth.utils.new_gpg_key(name, email, passphrase, gnu_home) | ||
| click.echo("Key generated:") | ||
| click.echo(key) | ||
| click.echo(f"Please see {HELP_LINK} for help adding the key to github.") | ||
| @admin.command() | ||
| @click.option("--filename", default=None) | ||
| def list_usernames(filename): | ||
| console = Console() | ||
| with console.status("Fetching active user list..."): | ||
| usernames = xeauth.admin.get_active_usernames() | ||
| if filename is not None: | ||
| with console.status("Saving user list to file..."): | ||
| with open(filename, "w") as f: | ||
| for username in usernames: | ||
| f.write(f"{username}\n") | ||
| click.echo(f"Active usernames written to file {filename}.") | ||
| else: | ||
| click.echo("Active usernames:") | ||
| for username in usernames: | ||
| click.echo(username) | ||
| @admin.command() | ||
| @click.option( | ||
| "--token", | ||
| prompt="Github token", | ||
| default=xeauth.config.GITHUB_TOKEN, | ||
| help="A valid github API token.", | ||
| ) | ||
| @click.option("--path", | ||
| prompt="Destination folder", | ||
| default="~/xenon_user_keys", | ||
| ) | ||
| @click.option("--import_keys", is_flag=True, | ||
| default=True, | ||
| ) | ||
| @click.option("--gnuhome", | ||
| prompt="GPG home path", | ||
| default="~/.gnupg", | ||
| ) | ||
| def fetch_keys(token, path, import_keys, gnuhome): | ||
| if path: | ||
| path = pathlib.Path(path) | ||
| path = path.expanduser() | ||
| console = Console() | ||
| with console.status("Fetching active user list..."): | ||
| usernames = xeauth.admin.get_active_usernames() | ||
| table = Table() | ||
| table.add_column("Username") | ||
| table.add_column("Public Key") | ||
| with Live(table, refresh_per_second=4, console=console): | ||
| for username in track(usernames, description="Fetching keys..."): | ||
| keys = xeauth.admin.get_user_keys(username, token=token) | ||
| for key in keys: | ||
| table.add_row(username, key["raw_key"]) | ||
| key["username"] = username | ||
| folder = path / key["username"] | ||
| if not folder.exists(): | ||
| folder.mkdir(parents=True) | ||
| filename = folder / key["key_id"] | ||
| with open(filename, "w") as f: | ||
| f.write(key["raw_key"]) | ||
| if import_keys: | ||
| xeauth.admin.import_key(key['raw_key'], gnuhome=gnuhome) | ||
| console.print(f"Valid keys written to folder {path.absolute()}.") | ||
| if __name__ == "__main__": | ||
| sys.exit(main()) # pragma: no cover | ||
| main() # pragma: no cover |
+47
-34
@@ -6,5 +6,6 @@ import param | ||
| from appdirs import AppDirs | ||
| import unittest | ||
| DIRS = AppDirs("xeauth", "XENON") | ||
| DIRS = AppDirs("xeauth", "xenon") | ||
| CACHE_DIR = DIRS.user_cache_dir | ||
@@ -14,36 +15,33 @@ if not os.path.isdir(CACHE_DIR): | ||
| DEFAULT_TOKEN_FILE = os.path.join(CACHE_DIR, f"{getpass.getuser()}_xetoken.json") | ||
| class ConfigParameter(param.Parameter): | ||
| __slots__ = ["env_prefix", "klass"] | ||
| class EnvConfigurable(param.Parameterized): | ||
| """ | ||
| A class that can be used to configure its parameters from the environment. | ||
| """ | ||
| def __init__(self, klass, env_prefix="", **kwargs): | ||
| super().__init__(**kwargs) | ||
| self.env_prefix = env_prefix | ||
| self.klass = klass | ||
| def _set_names(self, attrib_name): | ||
| env_name = attrib_name.upper() | ||
| env_name = self.env_prefix.upper() + "_" + env_name | ||
| if os.getenv(env_name, ""): | ||
| env = os.getenv(env_name, "") | ||
| try: | ||
| env = json.loads(env) | ||
| except Exception as e: | ||
| pass | ||
| self.default = self.klass(env) | ||
| super()._set_names(attrib_name) | ||
| @classmethod | ||
| def from_env(cls, prefix="", **overrides): | ||
| params = {} | ||
| for name in cls.param.params(): | ||
| env_name = "_".join([prefix.upper() + name.upper()]) | ||
| val = os.getenv(env_name, None) | ||
| if val: | ||
| try: | ||
| val = json.loads(val) | ||
| except Exception as e: | ||
| pass | ||
| params[name] = val | ||
| class Config(param.Parameterized): | ||
| OAUTH_DOMAIN = ConfigParameter(str, env_prefix="xeauth", default="https://xenon-experiment.eu.auth0.com/oauth") | ||
| OAUTH_CODE_PATH = ConfigParameter(str, env_prefix="xeauth", default="/device/code") | ||
| OAUTH_TOKEN_PATH = ConfigParameter(str, env_prefix="xeauth", default="/token") | ||
| OAUTH_CERT_PATH = ConfigParameter(str, env_prefix="xeauth", default="/.well-known/jwks.json") | ||
| AUTH0_SUBDOMAIN = ConfigParameter(str, env_prefix="xeauth", default="xenon-experiment.eu") | ||
| DEFAULT_CLIENT_ID = ConfigParameter(str, env_prefix="xeauth", default="EC3adX50KdNHQuEmib30GCRDTFDibMK7") | ||
| DEFAULT_AUDIENCE = ConfigParameter(str, env_prefix="xeauth", default="https://users.xenonnt.org") | ||
| DEFAULT_SCOPE = ConfigParameter(str, env_prefix="xeauth", default="openid profile email offline_access") | ||
| DEBUG = ConfigParameter(bool, env_prefix="xeauth", default=False) | ||
| params.update(overrides) | ||
| return cls(**params) | ||
| class Config(EnvConfigurable): | ||
| DEFAULT_CLIENT_ID = param.String(default="4a7b1485afcfcfa45271") | ||
| MONGO_URI = param.String(default="mongodb://localhost:27017") | ||
| MONGO_USER = param.String(default="") | ||
| MONGO_PASSWORD = param.String(default="") | ||
| DEBUG = param.Boolean(default=False) | ||
| MAX_LOG_SIZE = 20 | ||
@@ -53,5 +51,20 @@ MAX_MESSAGES = 3 | ||
| GUI_WIDTH = 600 | ||
| DEFAULT_AVATAR = "http://www.sibberhuuske.nl/wp-content/uploads/2016/10/default-avatar.png" | ||
| TOKEN_FILE = ConfigParameter(str, env_prefix="xeauth", default=DEFAULT_TOKEN_FILE) | ||
| DEFAULT_AVATAR = ( | ||
| "http://www.sibberhuuske.nl/wp-content/uploads/2016/10/default-avatar.png" | ||
| ) | ||
| TOKEN_FILE = param.String(default=DEFAULT_TOKEN_FILE) | ||
| GITHUB_TOKEN = param.String(default="") | ||
| def mongo_collection(self, collection_name, database='xenonnt'): | ||
| try: | ||
| from utilix import xent_collection | ||
| return xent_collection(collection_name, database=database) | ||
| except: | ||
| from pymongo import MongoClient | ||
| client = MongoClient(self.MONGO_URI) | ||
| db = client[database] | ||
| if self.MONGO_USER and self.MONGO_PASSWORD: | ||
| db.authenticate(self.MONGO_USER, self.MONGO_PASSWORD) | ||
| return db[collection_name] | ||
| config = Config() | ||
| config = Config.from_env(prefix="XEAUTH") |
+25
-5
@@ -1,3 +0,1 @@ | ||
| def url_link_button(url, label="Authenticate", **kwargs): | ||
@@ -15,6 +13,6 @@ import panel as pn | ||
| id_token = pn.state.cookies.get('id_token') | ||
| id_token = pn.state.cookies.get("id_token") | ||
| if id_token is None or pn.config.cookie_secret is None: | ||
| return None | ||
| id_token = decode_signed_value(pn.config.cookie_secret, 'id_token', id_token) | ||
| id_token = decode_signed_value(pn.config.cookie_secret, "id_token", id_token) | ||
| if pn.state.encryption is None: | ||
@@ -24,2 +22,24 @@ id_token = id_token | ||
| id_token = pn.state.encryption.decrypt(id_token) | ||
| return id_token.decode() | ||
| return id_token.decode() | ||
| def new_gpg_key(name, email, passphrase="", gnu_home=None): | ||
| import gnupg | ||
| gpg = gnupg.GPG(gnupghome=gnu_home) | ||
| arg = gpg.gen_key_input( | ||
| key_type="RSA", | ||
| key_length=2048, | ||
| name_real=name, | ||
| passphrase=passphrase, | ||
| name_email=email, | ||
| ) | ||
| key = gpg.gen_key(arg) | ||
| if not key.status == "ok": | ||
| raise RuntimeError("Unable to create key.") | ||
| public_key = gpg.export_keys(key.fingerprint, False) | ||
| return public_key |
| import param | ||
| import httpx | ||
| import authlib | ||
| from authlib import jose | ||
| from authlib.jose import KeySet | ||
| import time | ||
| from contextlib import contextmanager | ||
| import logging | ||
| from .settings import config | ||
| logger = logging.getLogger(__name__) | ||
| class XeKeySet(param.Parameterized): | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| cert_path = param.String(config.OAUTH_CERT_PATH) | ||
| _keyset = param.ClassSelector(KeySet, default=KeySet({})) | ||
| _keys_timestamp = param.Number(0) | ||
| _keys_ttl = param.Number(300) | ||
| def fetch_keys(self, headers={}): | ||
| with httpx.Client(base_url=self.oauth_domain, headers=headers) as client: | ||
| r = client.get(self.cert_path) | ||
| r.raise_for_status() | ||
| keys = r.json() | ||
| self._keyset = jose.JsonWebKey.import_key_set(keys) | ||
| def extract_claims(self, token): | ||
| header_str = authlib.common.encoding.urlsafe_b64decode(token.split(".")[0].encode()).decode('utf-8') | ||
| header = authlib.common.encoding.json_loads(header_str) | ||
| key = self.find_by_kid(header["kid"]) | ||
| return jose.jwt.decode(token, key) | ||
| def extract_verified_claims(self, token, options={}): | ||
| try: | ||
| claims = self.extract_claims(token) | ||
| claims.options = options | ||
| claims.validate() | ||
| return claims | ||
| except Exception as e: | ||
| logger.error(f"Exception raised while validating claims: {e}") | ||
| return jose.JWTClaims("", "") | ||
| def validate_claims(self, token, **required_claims): | ||
| options = {k: {"value": v, "essential": True} for k,v in required_claims.items()} | ||
| claims = self.extract_claims(token) | ||
| claims.options = options | ||
| claims.validate() | ||
| def find_by_kid(self, kid): | ||
| if kid not in self._keyset.keys: | ||
| self.fetch_keys() | ||
| return self._keyset.find_by_kid(kid) | ||
| def __getitem__(self, kid): | ||
| return self.find_by_kid(kid) | ||
| certs = XeKeySet() |
| import time | ||
| import param | ||
| import httpx | ||
| from .settings import config | ||
| from .token import XeToken | ||
| from .oauth import XeAuthStep | ||
| class XeTokenRequest(XeAuthStep): | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_token_path = param.String(config.OAUTH_TOKEN_PATH) | ||
| user_code = param.String() | ||
| device_code = param.String() | ||
| client_id = param.String() | ||
| headers = param.Dict() | ||
| verification_uri = param.String() | ||
| verification_uri_complete = param.String() | ||
| expires = param.Number() | ||
| interval = param.Number(5) | ||
| open_browser = param.Boolean(True) | ||
| def prompt(self, p): | ||
| print(f'Please visit the following URL to complete ' | ||
| f'the login: {self.verification_uri_complete}', file=p.console) | ||
| if p.open_browser: | ||
| import webbrowser | ||
| webbrowser.open(self.verification_uri_complete) | ||
| return p | ||
| def perform(self, p): | ||
| while True: | ||
| if time.time()>p.expires: | ||
| raise TimeoutError("Device code hase expired but not yet authorized.") | ||
| try: | ||
| s = self.fetch_token(p.oauth_domain, p.oauth_token_path, | ||
| p.device_code, p.client_id, headers=p.headers) | ||
| return s | ||
| except Exception as e: | ||
| time.sleep(p.interval) | ||
| def fetch_token(self, oauth_domain, oauth_token_path, device_code, client_id, headers={}): | ||
| with httpx.Client(base_url=oauth_domain, headers=headers) as client: | ||
| r = client.post( | ||
| oauth_token_path, | ||
| data={ | ||
| "grant_type": "urn:ietf:params:oauth:grant-type:device_code", | ||
| "device_code": device_code, | ||
| "client_id": client_id, | ||
| }, | ||
| headers={"content-type": "application/x-www-form-urlencoded"}, | ||
| ) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params["expires"] = time.time() + params.pop("expires_in", 1e6) | ||
| params["client_id"] = self.client_id | ||
| params['oauth_domain'] = oauth_domain | ||
| params['oauth_token_path'] = oauth_token_path | ||
| return XeToken(**params) | ||
| class XeAuthCodeRequest(XeAuthStep): | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_code_path = param.String(config.OAUTH_CODE_PATH) | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| scopes = param.List(config.DEFAULT_SCOPE.split(' ')) | ||
| audience = param.String(config.DEFAULT_AUDIENCE) | ||
| extra_fields = param.Dict({}) | ||
| headers = param.Dict({}) | ||
| @property | ||
| def scope_str(self): | ||
| return ' '.join(self.scopes) | ||
| def perform(self, p): | ||
| data = { | ||
| "client_id": p.client_id, | ||
| "scope": ' '.join(p.scopes), | ||
| "audience": p.audience, | ||
| } | ||
| data.update(p.extra_fields) | ||
| with httpx.Client(base_url=p.oauth_domain, headers=p.headers) as client: | ||
| r = client.post( | ||
| p.oauth_code_path, | ||
| data=data, | ||
| headers={"content-type": "application/x-www-form-urlencoded"}) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params['expires'] = time.time() + params.pop("expires_in", 1) | ||
| params['oauth_domain'] = p.oauth_domain | ||
| params['client_id'] = p.client_id | ||
| return XeTokenRequest.instance(**params) | ||
| from .eve_panel import XenonEveAuth | ||
| from .eve_server import XeTokenAuth |
| from warnings import warn | ||
| import param | ||
| from ..session import NotebookSession | ||
| try: | ||
| from eve_panel.auth import EveAuthBase | ||
| except ImportError: | ||
| class EveAuthBase: | ||
| def __init__(self) -> None: | ||
| raise RuntimeError('eve_panel not installed.') | ||
| class XenonEveAuth(NotebookSession, EveAuthBase): | ||
| # session = param.ClassSelector(NotebookSession, default=NotebookSession()) | ||
| def get_headers(self): | ||
| """Generate auth headers for HTTP requests. | ||
| Returns: | ||
| dict: Auth related headers to be included in all requests. | ||
| """ | ||
| if self.logged_in: | ||
| return {"Authorization": f"Bearer {self.access_token}"} | ||
| else: | ||
| return {} | ||
| def login(self, notify_email=None): | ||
| """perform any actions required to aquire credentials. | ||
| Returns: | ||
| bool: whether login was successful | ||
| """ | ||
| self.login_requested(None) | ||
| self.authorize() | ||
| def set_credentials(self, **credentials): | ||
| """Set the access credentials manually. | ||
| """ | ||
| for k,v in credentials.items(): | ||
| if k in ['access_token', "id_token", "refresh_token", "expires"]: | ||
| setattr(self.token, k, v) | ||
| else: | ||
| setattr(self, k, v) | ||
| def credentials_view(self): | ||
| return self.gui |
| import xeauth | ||
| try: | ||
| from eve.auth import TokenAuth | ||
| from eve.utils import config | ||
| except ImportError: | ||
| class TokenAuth: | ||
| def __init__(self, *args, **kwargs): | ||
| raise RuntimeError('eve not installed.') | ||
| class XeTokenAuth(TokenAuth): | ||
| @property | ||
| def global_read_token(self): | ||
| return config.get('API_GLOBAL_READ_TOKEN', None) | ||
| @property | ||
| def audience(self): | ||
| return config.get('JWT_AUDIENCE', 'https//:api.pmts.xenonnt.org') | ||
| def check_auth(self, token, allowed_roles, resource, method): | ||
| if not token: | ||
| return False | ||
| if method in ['GET', 'HEAD'] and self.global_read_token==token: | ||
| return True | ||
| try: | ||
| xeauth.certs.validate_claims(token, audience=self.audience) | ||
| except: | ||
| return False | ||
| c = xeauth.certs.extract_verified_claims(token) | ||
| roles = c.get('scope', '').split(' ') | ||
| return any([r in allowed_roles for r in roles]) |
| from xeauth.settings import config as xeconfig | ||
| import logging | ||
| log = logging.getLogger(__name__) | ||
| try: | ||
| from tornado.auth import OAuth2Mixin | ||
| from panel.auth import OAuthIDTokenLoginHandler | ||
| from panel.config import config as pnconfig | ||
| except ImportError: | ||
| class OAuthIDTokenLoginHandler: | ||
| def __init__(self, *args, **kwargs) -> None: | ||
| raise RuntimeError('panel not installed.') | ||
| class OAuth2Mixin: | ||
| def __init__(self, *args, **kwargs) -> None: | ||
| raise RuntimeError('panel not installed.') | ||
| class XenonPanelAuth(OAuthIDTokenLoginHandler, OAuth2Mixin): | ||
| _AUDIENCE = xeconfig.DEFAULT_AUDIENCE | ||
| _SCOPE = xeconfig.DEFAULT_SCOPE.split(' ') | ||
| _USER_KEY = 'email' | ||
| _EXTRA_TOKEN_PARAMS = { | ||
| 'grant_type': 'authorization_code', | ||
| } | ||
| _EXTRA_AUTHORIZE_PARAMS = { | ||
| 'grant_type': 'authorization_code', | ||
| } | ||
| _OAUTH_ACCESS_TOKEN_URL_ = 'https://{0}.auth0.com/oauth/token' | ||
| _OAUTH_AUTHORIZE_URL_ = 'https://{0}.auth0.com/authorize' | ||
| _OAUTH_USER_URL_ = 'https://{0}.auth0.com/userinfo?access_token=' | ||
| @property | ||
| def _OAUTH_ACCESS_TOKEN_URL(self): | ||
| url = pnconfig.oauth_extra_params.get('subdomain', xeconfig.AUTH0_SUBDOMAIN) | ||
| return self._OAUTH_ACCESS_TOKEN_URL_.format(url) | ||
| @property | ||
| def _OAUTH_AUTHORIZE_URL(self): | ||
| url = pnconfig.oauth_extra_params.get('subdomain', xeconfig.AUTH0_SUBDOMAIN) | ||
| return self._OAUTH_AUTHORIZE_URL_.format(url) | ||
| @property | ||
| def _OAUTH_USER_URL(self): | ||
| url = pnconfig.oauth_extra_params.get('subdomain', xeconfig.AUTH0_SUBDOMAIN) | ||
| return self._OAUTH_USER_URL_.format(url) | ||
| async def get_authenticated_user(self, redirect_uri, client_id, state, | ||
| client_secret=None, code=None): | ||
| """ | ||
| Fetches the authenticated user | ||
| Arguments | ||
| --------- | ||
| redirect_uri: (str) | ||
| The OAuth redirect URI | ||
| client_id: (str) | ||
| The OAuth client ID | ||
| state: (str) | ||
| The unguessable random string to protect against | ||
| cross-site request forgery attacks | ||
| client_secret: (str, optional) | ||
| The client secret | ||
| code: (str, optional) | ||
| The response code from the server | ||
| """ | ||
| if code: | ||
| return await self._fetch_access_token( | ||
| code, | ||
| redirect_uri, | ||
| client_id, | ||
| client_secret | ||
| ) | ||
| params = { | ||
| 'redirect_uri': redirect_uri, | ||
| 'client_id': client_id, | ||
| 'client_secret': client_secret, | ||
| 'extra_params': { | ||
| 'state': state, | ||
| }, | ||
| } | ||
| if self._SCOPE is not None: | ||
| params['scope'] = self._SCOPE | ||
| if 'scope' in pnconfig.oauth_extra_params: | ||
| params['scope'] = pnconfig.oauth_extra_params['scope'] | ||
| if self._AUDIENCE is not None: | ||
| params['extra_params']['audience'] = self._AUDIENCE | ||
| if 'audience' in pnconfig.oauth_extra_params: | ||
| params['extra_params']['audience'] = pnconfig.oauth_extra_params['audience'] | ||
| log.debug("%s making authorize request" % type(self).__name__) | ||
| self.authorize_redirect(**params) |
| import io | ||
| import sys | ||
| import param | ||
| class XeAuthStep(param.ParameterizedFunction): | ||
| auto_advance = param.Boolean(True) | ||
| prompt_response = param.Parameter(instantiate=False) | ||
| console = param.ClassSelector(io.IOBase, default=sys.stdout, | ||
| instantiate=False, pickle_default_value=False) | ||
| def perform(self, p): | ||
| pass | ||
| def prompt(self, p): | ||
| return p | ||
| def __call__(self, **params): | ||
| p = param.ParamOverrides(self, params) | ||
| p = self.prompt(p) | ||
| next = self.perform(p) | ||
| if isinstance(next, XeAuthStep) and p.auto_advance: | ||
| params = {k:v for k,v in params.items() if k in next.param.params()} | ||
| next = next(**params) | ||
| return next | ||
| import time | ||
| import param | ||
| import httpx | ||
| from .settings import config | ||
| from .token import XeToken | ||
| from .oauth import XeAuthStep | ||
| class TokenRefresh(XeAuthStep): | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_token_path = param.String(config.OAUTH_TOKEN_PATH) | ||
| access_token = param.String(readonly=True) | ||
| id_token = param.String(readonly=True) | ||
| refresh_token = param.String(readonly=True) | ||
| def perform(self, p): | ||
| with httpx.Client(base_url=self.oauth_domain, headers=p.headers) as client: | ||
| r = client.post( | ||
| p.oauth_token_path, | ||
| headers={"content-type":"application/x-www-form-urlencoded"}, | ||
| data={ | ||
| "grant_type": "refresh_token", | ||
| "refresh_token": p.refresh_token, | ||
| "client_id": p.client_id, | ||
| } | ||
| ) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params["expires"] = time.time() + params.pop("expires_in", 1e6) | ||
| params["client_id"] = p.client_id | ||
| params['oauth_domain'] = p.oauth_domain | ||
| params['oauth_token_path'] = p.oauth_token_path | ||
| return XeToken(**params) |
| import os | ||
| import param | ||
| import httpx | ||
| import time | ||
| import webbrowser | ||
| import logging | ||
| from datetime import datetime | ||
| from contextlib import contextmanager, asynccontextmanager | ||
| from .settings import config | ||
| from .utils import url_link_button, id_token_from_server_state | ||
| from .certificates import certs | ||
| from .token import XeToken | ||
| from .device_auth_flow import XeAuthCodeRequest | ||
| from .oauth import XeAuthStep | ||
| logger = logging.getLogger(__name__) | ||
| class XeAuthSession(param.Parameterized): | ||
| keyset = certs | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_code_path = param.String(config.OAUTH_CODE_PATH) | ||
| oauth_token_path = param.String(config.OAUTH_TOKEN_PATH) | ||
| auto_persist_session = param.Boolean(False) | ||
| token_file = param.String(config.TOKEN_FILE) | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| scopes = param.List([]) | ||
| audience = param.String(config.DEFAULT_AUDIENCE) | ||
| notify_email = param.String(allow_None=True) | ||
| flow = param.ClassSelector(default=XeAuthCodeRequest, class_=XeAuthCodeRequest, is_instance=False) | ||
| token = param.ClassSelector(XeToken, default=None) | ||
| state = param.Selector(["Disconnected", "Logged in", "Awaiting token", | ||
| "Checking token ready", "Token expired"], | ||
| default="Disconnected") | ||
| def __init__(self, **params): | ||
| super().__init__(**params) | ||
| if os.path.isfile(self.token_file): | ||
| self.token = XeToken.from_file(self.token_file) | ||
| if self.logged_in: | ||
| self.state = "Logged in" | ||
| def login_from_server(self): | ||
| import panel as pn | ||
| access_token = pn.state.access_token | ||
| id_token = id_token_from_server_state() | ||
| self.token = XeToken(access_token=access_token, | ||
| id_token=id_token, | ||
| ) | ||
| return self.token | ||
| @property | ||
| def scope(self): | ||
| scopes = set(config.DEFAULT_SCOPE.split(" ") + self.scopes) | ||
| return " ".join(scopes) | ||
| def login(self, open_browser=False, print_url=False, extra_headers={}, notify_email=None): | ||
| extra_fields = {} | ||
| if notify_email: | ||
| extra_fields["notify_email"] = notify_email | ||
| if self.token: | ||
| self.state = "Logged in" | ||
| else: | ||
| self.token = self.flow(self.oauth_domain, self.oauth_code_path, self.oauth_token_path, | ||
| self.client_id, self.scope, self.audience, headers=extra_headers, | ||
| extra_fields=extra_fields, | ||
| open_browser=open_browser, print_url=print_url) | ||
| if self.token: | ||
| self.state = "Logged in" | ||
| return self.token | ||
| def persist_token(self): | ||
| if self.token_file: | ||
| try: | ||
| self.token.to_file(self.token_file) | ||
| except Exception as e: | ||
| logger.error("Exception raised while persisted token: "+str(e)) | ||
| def logout(self): | ||
| self.token = None | ||
| self.state = "Disconnected" | ||
| return True | ||
| def request_token(self, extra_headers={}): | ||
| self.flow.request_token(self.oauth_domain, self.oauth_code_path, | ||
| self.client_id, self.scope, self.audience, headers=extra_headers) | ||
| self.state = "Awaiting token" | ||
| def token_ready(self, extra_headers={}): | ||
| if self.token is None: | ||
| try: | ||
| self.token = self.flow.fetch_token(self.oauth_domain, | ||
| self.oauth_token_path, headers=extra_headers) | ||
| if self.auto_persist_session: | ||
| self.persist_token() | ||
| self.state = "Logged in" | ||
| return True | ||
| except: | ||
| pass | ||
| return False | ||
| def refresh_tokens(self, extra_headers={}): | ||
| self.token.refresh_tokens(self.oauth_domain, self.oauth_token_path, | ||
| self.client_id, headers=extra_headers) | ||
| if self.auto_persist_session: | ||
| self.persist_token() | ||
| @property | ||
| def logged_in(self): | ||
| if self.token is None: | ||
| return False | ||
| if self.token.expired: | ||
| return False | ||
| return True | ||
| @property | ||
| def id_token(self): | ||
| return self.token.id_token | ||
| @property | ||
| def access_token(self): | ||
| return self.token.access_token | ||
| @property | ||
| def profile(self): | ||
| claims = self.keyset.extract_verified_claims(self.id_token) | ||
| return {k:v for k,v in claims.items() if k not in claims.REGISTERED_CLAIMS} | ||
| @property | ||
| def claims(self): | ||
| claims = self.keyset.extract_verified_claims(self.access_token) | ||
| return {k:v for k,v in claims.items() if k in claims.REGISTERED_CLAIMS} | ||
| @property | ||
| def extra_claims(self): | ||
| claims = self.keyset.extract_verified_claims(self.access_token) | ||
| return {k:v for k,v in claims.items() if k not in claims.REGISTERED_CLAIMS} | ||
| @property | ||
| def permissions(self): | ||
| claims = self.keyset.extract_verified_claims(self.access_token) | ||
| return claims.get("permissions", []) | ||
| @contextmanager | ||
| def Client(self, *args, **kwargs): | ||
| kwargs["headers"] = kwargs.get("headers", {}) | ||
| kwargs["headers"]["Authorization"] = f"Bearer {self.access_token}" | ||
| client = httpx.Client(*args, **kwargs) | ||
| try: | ||
| yield client | ||
| finally: | ||
| client.close() | ||
| @asynccontextmanager | ||
| async def AsyncClient(self, *args, **kwargs ): | ||
| kwargs["headers"] = kwargs.get("headers", {}) | ||
| kwargs["headers"]["Authorization"] = f"Bearer {self.access_token}" | ||
| client = httpx.AsyncClient(*args, **kwargs) | ||
| try: | ||
| yield client | ||
| finally: | ||
| await client.aclose() | ||
| def authorize(self): | ||
| webbrowser.open(self.flow.verification_uri_complete) | ||
| class NotebookSession(XeAuthSession): | ||
| message = param.String("") | ||
| _gui = None | ||
| _cb = None | ||
| @property | ||
| def gui(self): | ||
| import panel as pn | ||
| if self._gui is None: | ||
| self._gui = pn.panel(self._make_gui) | ||
| return self._gui | ||
| def await_token_cb(self): | ||
| if self.token_ready() and self.token: | ||
| self._cb.stop() | ||
| def login_requested(self, event): | ||
| try: | ||
| import panel as pn | ||
| self.request_token() | ||
| logger.info("Sent request...") | ||
| self._cb = pn.state.add_periodic_callback(self.await_token_cb, | ||
| 1000*self.flow.interval, | ||
| timeout=1000*max(1, int(self.flow.expires-time.time()))) | ||
| except Exception as e: | ||
| logging.error(e) | ||
| print(e) | ||
| def logged_in_gui(self): | ||
| import panel as pn | ||
| profile = self.profile | ||
| details = pn.Row( | ||
| pn.pane.PNG(profile.get("picture", config.DEFAULT_AVATAR), width=60, height=60), | ||
| pn.widgets.TextInput(name='Name', value=profile.get("name", "Unknown"), disabled=True, height=35), | ||
| pn.widgets.TextInput(name='Email', value=profile.get("email", "Unknown"), disabled=True, height=35), | ||
| height=70 | ||
| ) | ||
| token = pn.widgets.input.TextAreaInput(name='Access token', value=self.access_token, width=700, height=100) | ||
| token_props = pn.Row( | ||
| pn.widgets.TextInput(name='Scope', value=self.token.scope, disabled=True, height=35), | ||
| pn.widgets.DatetimeInput(disabled=True, name="Expiration date", | ||
| value=datetime.utcfromtimestamp(self.token.expires)), | ||
| width=700, | ||
| ) | ||
| logout = pn.widgets.Button(name="Logout", button_type='warning') | ||
| logout.on_click(lambda event: self.logout()) | ||
| return pn.Column( | ||
| details, | ||
| token, | ||
| token_props, | ||
| logout, | ||
| height=300, | ||
| width=700) | ||
| def awaiting_token_gui(self): | ||
| import panel as pn | ||
| cancel = pn.widgets.Button(name="Cancel", width=50) | ||
| cancel.on_click(lambda event: self.logout()) | ||
| authenticate = url_link_button(self.flow.verification_uri_complete, button_type='primary', width=100) | ||
| waiting_ind = pn.indicators.LoadingSpinner(value=False, width=30, height=30, align="center") | ||
| def activate(event): | ||
| waiting_ind.value = True | ||
| authenticate.on_click(activate) | ||
| return pn.Row(waiting_ind, authenticate, cancel) | ||
| def token_expired_gui(self): | ||
| import panel as pn | ||
| refresh = pn.widgets.Button(name="Renew", align="end") | ||
| refresh.on_click(lambda e: self.refresh_tokens()) | ||
| return refresh | ||
| @param.depends("state") | ||
| def _make_gui(self): | ||
| import panel as pn | ||
| status = pn.indicators.BooleanStatus(width=20, height=20, value=True, color="danger", align="center") | ||
| header = pn.Row(status, f"Status: {self.state}.") | ||
| panel = pn.Column(header) | ||
| if self.state == "Logged in": | ||
| status.color = "success" | ||
| panel.append(self.logged_in_gui()) | ||
| elif self.state == "Awaiting token": | ||
| status.color = "primary" | ||
| panel.append(self.awaiting_token_gui()) | ||
| elif self.state == "Token expired": | ||
| status.color = "warning" | ||
| return pn.panel(self.token_expired_gui()) | ||
| else: | ||
| login = pn.widgets.Button(name="Login", button_type='success', width=30) | ||
| login.on_click(self.login_requested) | ||
| panel.append(pn.Param(self.param, parameters=["scopes", "auto_persist_session"])) | ||
| panel.append(login) | ||
| return panel | ||
| def _repr_mimebundle_(self, include=None, exclude=None): | ||
| return self.gui._repr_mimebundle_(include=include, exclude=exclude) |
-116
| import param | ||
| import time | ||
| import httpx | ||
| import json | ||
| from contextlib import contextmanager, asynccontextmanager | ||
| from .settings import config | ||
| from .utils import id_token_from_server_state | ||
| from .certificates import certs | ||
| class XeToken(param.Parameterized): | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_token_path = param.String(config.OAUTH_TOKEN_PATH) | ||
| access_token = param.String(constant=True) | ||
| id_token = param.String(constant=True) | ||
| refresh_token = param.String(constant=True) | ||
| expires = param.Number(constant=True) | ||
| scope = param.String(constant=True) | ||
| token_type = param.String("Bearer", constant=True) | ||
| @property | ||
| def expired(self): | ||
| return time.time()>self.expires | ||
| @property | ||
| def profile(self): | ||
| claims = certs.extract_verified_claims(self.id_token) | ||
| return {k:v for k,v in claims.items() if k not in claims.REGISTERED_CLAIMS} | ||
| @property | ||
| def username(self): | ||
| return self.profile.get('name', 'unknown') | ||
| @property | ||
| def claims(self): | ||
| claims = certs.extract_verified_claims(self.access_token) | ||
| return {k:v for k,v in claims.items() if k in claims.REGISTERED_CLAIMS} | ||
| @property | ||
| def extra_claims(self): | ||
| claims = certs.extract_verified_claims(self.access_token) | ||
| return {k:v for k,v in claims.items() if k not in claims.REGISTERED_CLAIMS} | ||
| @property | ||
| def permissions(self): | ||
| claims = certs.extract_verified_claims(self.access_token) | ||
| return claims.get("permissions", []) | ||
| @classmethod | ||
| def from_file(cls, path): | ||
| with open(path, "r") as f: | ||
| data = json.load(f) | ||
| return cls(**data) | ||
| @classmethod | ||
| def from_panel_server(cls): | ||
| import panel as pn | ||
| access_token = pn.state.access_token | ||
| id_token = id_token_from_server_state() | ||
| token = cls(access_token=access_token, | ||
| id_token=id_token, | ||
| ) | ||
| return token | ||
| def to_file(self, path): | ||
| with open(path, "w") as f: | ||
| json.dump(self.to_dict(), f) | ||
| def to_dict(self): | ||
| return {k:v for k,v in self.param.get_param_values() if not k.startswith("_")} | ||
| def refresh(self, headers={}): | ||
| with httpx.Client(base_url=self.oauth_domain, headers=headers) as client: | ||
| r = client.post( | ||
| self.oauth_token_path, | ||
| headers={"content-type":"application/x-www-form-urlencoded"}, | ||
| data={ | ||
| "grant_type": "refresh_token", | ||
| "refresh_token": self.refresh_token, | ||
| "client_id": self.client_id, | ||
| } | ||
| ) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params["expires"] = time.time() + params.pop("expires_in", 1e6) | ||
| self.param.set_param(**params) | ||
| @contextmanager | ||
| def Client(self, *args, **kwargs): | ||
| kwargs["headers"] = kwargs.get("headers", {}) | ||
| kwargs["headers"]["Authorization"] = f"Bearer {self.access_token}" | ||
| client = httpx.Client(*args, **kwargs) | ||
| try: | ||
| yield client | ||
| finally: | ||
| client.close() | ||
| @asynccontextmanager | ||
| async def AsyncClient(self, *args, **kwargs ): | ||
| kwargs["headers"] = kwargs.get("headers", {}) | ||
| kwargs["headers"]["Authorization"] = f"Bearer {self.access_token}" | ||
| client = httpx.AsyncClient(*args, **kwargs) | ||
| try: | ||
| yield client | ||
| finally: | ||
| await client.aclose() | ||
| def __repr__(self): | ||
| return ("XeToken(" | ||
| f"user={self.profile.get('name', 'unknown')}, " | ||
| f"access_token={self.access_token})" | ||
| ) |
| import param | ||
| import httpx | ||
| import time | ||
| import getpass | ||
| from .oauth import XeAuthStep | ||
| from .token import XeToken | ||
| from .settings import config | ||
| class UserCredentialsAuth(XeAuthStep): | ||
| username = param.String(default=None) | ||
| password = param.String(default=None) | ||
| auth_url = param.String(config.OAUTH_DOMAIN.rstrip('/')+'/token') | ||
| audience = param.String(config.DEFAULT_AUDIENCE) | ||
| scopes = param.List(config.DEFAULT_SCOPE.split(' ')) | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| headers = param.Dict({'content-type': 'application/x-www-form-urlencoded'}) | ||
| def prompt(self, p): | ||
| if p.username is None: | ||
| p.username = getpass.getuser() | ||
| if p.password is None: | ||
| p.password = getpass.getpass() | ||
| return p | ||
| def perform(self, p): | ||
| data = dict( | ||
| grant_type='password', | ||
| username=p.username, | ||
| password=p.password, | ||
| audience=p.audience, | ||
| scope=' '.join(p.scopes), | ||
| client_id=p.client_id, | ||
| ) | ||
| r = httpx.post(p.auth_url, data=data, headers=p.headers) | ||
| r.raise_for_status() | ||
| kwargs = r.json() | ||
| kwargs['expires'] = time.time() + kwargs.pop('expires_in') | ||
| return XeToken(client_id=p.client_id, **kwargs) |
| import os | ||
| from .settings import config | ||
| from .user_credentials import UserCredentialsAuth | ||
| from .device_auth_flow import XeAuthCodeRequest | ||
| from .certificates import certs | ||
| user_login = UserCredentialsAuth.instance(auto_advance=True) | ||
| device_login = XeAuthCodeRequest.instance(auto_advance=True) | ||
| def login(username=None, password=None, **kwargs): | ||
| if username is None: | ||
| return device_login(**kwargs) | ||
| return user_login(username=username, | ||
| password=password, | ||
| **kwargs) | ||
| def cli_login(**kwargs): | ||
| token = login(**kwargs) | ||
| print(f"logged in as: {token.profile.get('name', 'unknown')}") | ||
| print(f"Access token: {token.access_token}") | ||
| print(f"ID token: {token.id_token}") | ||
| def validate_claims(token, **claims): | ||
| return certs.validate_claims(token, **claims) | ||
| def clear_cache(): | ||
| os.remove(config.CACHE_FILE) | ||
| def cmt_login(scopes=[], **kwargs): | ||
| if isinstance(scopes, str): | ||
| scopes = scopes.split(' ') | ||
| if not isinstance(scopes, list): | ||
| raise TypeError('scopes must be a list or string') | ||
| scopes = set(scopes) | ||
| scopes.add('read:all') | ||
| scopes = list(scopes) | ||
| audience = kwargs.pop('audience', 'https://api.cmt.xenonnt.org') | ||
| # base_url = kwargs.pop('base_url', DEFAULT_BASE_URL) | ||
| return login(audience=audience, scopes=scopes, **kwargs) | ||
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
26557
-35.43%14
-39.13%527
-39.22%