Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

xeauth

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

xeauth - npm Package Compare versions

Comparing version
0.1.22
to
0.2.0
+86
xeauth/admin.py
import requests
from .settings import config
def get_active_usernames():
import utilix
users_db = utilix.xent_collection(collection="users")
query = {
"active": {"$in": ["True", "true", True, 1]},
"github": {"$nin": ["", None]},
}
projection = {"github": 1, "_id": 0}
user_docs = users_db.find(query, projection)
return [doc["github"] for doc in user_docs]
def get_group_usernames(groupname):
import utilix
users_db = utilix.xent_collection(collection="users")
query = {
"active": {"$in": ["True", "true", True, 1]},
"github": {"$nin": ["", None]},
"groups": groupname,
}
projection = {"github": 1, "groups": 1, "_id": 0}
user_docs = users_db.find(query, projection)
return [doc["github"] for doc in user_docs if groupname in doc["groups"]]
def get_user_keys(username, token=config.GITHUB_TOKEN, validate=True):
headers = {}
if token is not None:
headers["Authorization"] = f"Bearer {token}"
r = requests.get(
f"https://api.github.com/users/{username}/gpg_keys", headers=headers
)
if not r.ok:
return []
keys = r.json()
if not isinstance(keys, list):
return []
if isinstance(keys, list):
return [key for key in keys if key["can_encrypt_storage"] and key["raw_key"]]
return []
def iter_valid_keys(token=config.GITHUB_TOKEN):
usernames = get_active_usernames()
for username in usernames:
for key in get_user_keys(username, token=token):
if not key["can_encrypt_storage"]:
continue
if not key["raw_key"]:
continue
key["username"] = username
yield key
def get_all_valid_keys(token=config.GITHUB_TOKEN):
keys = list(iter_valid_keys(token=token))
return keys
def iter_valid_user_keys(token=config.GITHUB_TOKEN):
usernames = get_active_usernames()
for username in usernames:
keys = get_user_keys(username, token=token)
keys = [key for key in keys if key["can_encrypt_storage"] and key["raw_key"]]
yield username, keys
def import_key(keydata, gnuhome=None):
import gnupg
gpg = gnupg.GPG(homedir=gnuhome)
gpg.import_keys(keydata)
from contextlib import contextmanager
import httpx
import param
import time
from rich.console import Console
from .settings import config
class GitHubApi(param.Parameterized):
API_URL = 'https://api.github.com'
oauth_token = param.String()
@contextmanager
def Client(self, *args, **kwargs):
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.oauth_token}"
kwargs["headers"]["Accept"] = "application/json"
client = httpx.Client(*args, base_url=self.API_URL, **kwargs)
try:
yield client
finally:
client.close()
def get(self, path, *args, **kwargs):
with self.Client() as client:
response = client.get(path, *args, **kwargs)
response.raise_for_status()
return response.json()
def post(self, path, *args, **kwargs):
with self.Client() as client:
response = client.post(path, *args, **kwargs)
response.raise_for_status()
return response.json()
@property
def profile(self):
return self.get('/user')
@property
def username(self):
return self.profile.get('login')
@property
def organizations(self):
return [org.get('login') for org in self.get('/user/orgs')]
@property
def teams(self):
orgs = self.organizations
teams = []
for org in orgs:
teams.extend([team.get('name') for team in self.get(f'/orgs/{org}/teams')])
return teams
@property
def repositories(self):
return [repo.get('name') for repo in self.get('/user/repos')]
@property
def starred_repositories(self):
return [repo.get('name') for repo in self.get('/user/starred')]
@property
def followers(self):
return [user.get('login') for user in self.get('/user/followers')]
@property
def gpg_keys(self):
return self.get('/user/gpg_keys')
class GitHubDeviceCode(param.Parameterized):
"""GitHub device code authentication.
"""
base_url = param.String(default='https://github.com/login', doc='Github auth URL')
client_id = param.String(doc='GitHub App client ID', default=config.DEFAULT_CLIENT_ID)
device_code = param.String(doc='GitHub device code')
user_code = param.String(doc='GitHub user code')
verification_uri = param.String(doc='GitHub verification URI')
expires = param.Number(doc='Expiration time of the device code')
interval = param.Integer(doc='Interval between polling requests')
@classmethod
def from_response_data(cls, client_id, data):
return cls(**data)
def open_browser(self):
import webbrowser
webbrowser.open(self.verification_uri)
@property
def prompt(self):
return f'Go to {self.verification_uri} and enter the code: {self.user_code}'
def await_token(self):
while True:
if time.time() >= self.expires:
raise Exception("Authentication timed out. Please try again.")
access_token = self.check_for_access_token()
if access_token is not None:
return access_token
time.sleep(self.interval)
def check_for_access_token(self):
with httpx.Client(base_url=self.base_url) as client:
response = client.post(
'/oauth/access_token',
data = {
'client_id': self.client_id,
'device_code': self.device_code,
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
},
headers = {"Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
return data.get("access_token", None)
class GithubAuth(param.Parameterized):
BASE_URL = 'https://github.com/login'
DEFAULT_SCOPES = ("read:org", "read:user")
oauth_token = param.String()
@classmethod
def get_device_code(cls, client_id=None, scopes=None):
if client_id is None:
client_id = config.DEFAULT_CLIENT_ID
if client_id is None:
raise ValueError("client_id must be provided")
if scopes is None:
scopes = cls.DEFAULT_SCOPES
data = {'client_id': client_id}
if scopes is not None:
data['scope'] = ' '.join(scopes)
with httpx.Client(base_url=cls.BASE_URL) as client:
response = client.post(
'/device/code',
data=data,
headers = {"Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
data['expires'] = time.time() + data.pop('expires_in', 900)
data['client_id'] = client_id
data['base_url'] = cls.BASE_URL
return GitHubDeviceCode(**data)
@classmethod
def device_login(cls, client_id, scopes=None, console=None):
if console is None:
console = Console()
code = cls.get_device_code(client_id, scopes=scopes)
console.print(code.prompt)
token = code.await_token()
return cls(client_id=client_id, oauth_token=token)
@contextmanager
def Client(self, *args, **kwargs):
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.oauth_token}"
kwargs["headers"]["Accept"] = "application/json"
client = httpx.Client(*args, **kwargs)
try:
yield client
finally:
client.close()
@property
def api(self):
return GitHubApi(oauth_token=self.oauth_token)
@property
def xenonnt_member(self):
return "XENONnT" in self.api.organizations
@property
def xenon1t_member(self):
return "XENON1T" in self.api.organizations
@property
def xenon_member(self):
return self.xenonnt_member or self.xenon1t_member
import param
from .github import GithubAuth
from .settings import config
class XenonUser(param.Parameterized):
"""
XenonUser is a class that represents a user of the Xenon platform.
"""
username = param.String(doc="The Xenon username of the user.")
email = param.String(doc="The email address of the user.")
full_name = param.String(doc="The full name of the user.", default=None)
github = param.String(doc="The Github username of the user.", default=None)
cell = param.String(doc="The cell phone number of the user.", default=None)
groups = param.List(doc="The groups the user is a member of.", default=[])
institute = param.String(doc="The institute the user belongs to.", default=None)
picture_url = param.String(doc="The URL of the user's picture.", default=None)
lngs_ldap_email = param.String(doc="The email address of the user.", default=None)
lngs_ldap_cn = param.String(doc="The common name of the user.", default=None)
lngs_ldap_uid = param.String(doc="The UID of the user.", default=None)
active = param.Boolean(doc="Whether the user is active.", default=None)
first_name = param.String(doc="The first name of the user.", default=None)
last_name = param.String(doc="The last name of the user.", default=None)
github_orgs = param.List(doc="The Github organizations the user is a member of.", default=[])
github_teams = param.List(doc="The Github teams the user is a member of.", default=[])
@classmethod
def from_github_token(cls, token):
"""
Creates a XenonUser from a Github token.
Args:
token (str): The Github token.
Returns:
XenonUser: The XenonUser.
"""
users_db = config.mongo_collection('users')
api = GithubAuth(oauth_token=token).api
github_user = api.username
teams = api.teams
orgs = api.organizations
data = users_db.find_one({'github': github_user})
if data is None:
raise ValueError(f'User {github_user} not found in Xenon database.')
data['full_name'] = data.get('name', data.get('first_name', '') + ' ' + data.get('last_name', ''))
data['active'] = data.get('active', False) in [True, 'True', 'true']
data['github_teams'] = teams
data['github_orgs'] = orgs
data = {k: v for k,v in data.items() if k in cls.param.params()}
return cls(**data)
+8
-4
Metadata-Version: 2.1
Name: xeauth
Version: 0.1.22
Version: 0.2.0
Summary: Top-level package for xeauth.

@@ -9,3 +9,3 @@ Home-page: https://github.com/jmosbacher/xeauth

Author-email: joe.mosbacher@gmail.com
Requires-Python: >=3.8,<3.11
Requires-Python: >=3.7,<4
Classifier: Development Status :: 2 - Pre-Alpha

@@ -16,10 +16,14 @@ Classifier: Intended Audience :: Developers

Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Requires-Dist: Authlib (>=1.0.0,<2.0.0)
Requires-Dist: appdirs (>=1.4.4,<2.0.0)
Requires-Dist: click
Requires-Dist: gnupg (>=2.3.1,<3.0.0)
Requires-Dist: httpx (>=0.19,<0.23)
Requires-Dist: param (>=1.12.0,<2.0.0)
Requires-Dist: rich (>=13.1.0,<14.0.0)
Description-Content-Type: text/x-rst

@@ -26,0 +30,0 @@

[tool]
[tool.poetry]
name = "xeauth"
version = "0.1.22"
version = "0.2.0"
homepage = "https://github.com/jmosbacher/xeauth"

@@ -25,8 +25,9 @@ description = "Top-level package for xeauth."

[tool.poetry.dependencies]
python = ">=3.8,<3.11"
python = ">=3.7,<4"
click = "*"
Authlib = "^1.0.0"
httpx = ">=0.19,<0.23"
appdirs = "^1.4.4"
param = "^1.12.0"
gnupg = "^2.3.1"
rich = "^13.1.0"

@@ -33,0 +34,0 @@ [tool.poetry.dev-dependencies]

@@ -5,3 +5,3 @@ # -*- coding: utf-8 -*-

packages = \
['tests', 'xeauth', 'xeauth.integrations']
['tests', 'xeauth']

@@ -12,7 +12,8 @@ package_data = \

install_requires = \
['Authlib>=1.0.0,<2.0.0',
'appdirs>=1.4.4,<2.0.0',
['appdirs>=1.4.4,<2.0.0',
'click',
'gnupg>=2.3.1,<3.0.0',
'httpx>=0.19,<0.23',
'param>=1.12.0,<2.0.0']
'param>=1.12.0,<2.0.0',
'rich>=13.1.0,<14.0.0']

@@ -26,3 +27,3 @@ entry_points = \

'name': 'xeauth',
'version': '0.1.22',
'version': '0.2.0',
'description': 'Top-level package for xeauth.',

@@ -32,4 +33,4 @@ 'long_description': '======\nxeauth\n======\n\n\n.. image:: https://img.shields.io/pypi/v/xeauth.svg\n :target: https://pypi.python.org/pypi/xeauth\n\n.. image:: https://img.shields.io/travis/jmosbacher/xeauth.svg\n :target: https://travis-ci.com/jmosbacher/xeauth\n\n.. image:: https://readthedocs.org/projects/xeauth/badge/?version=latest\n :target: https://xeauth.readthedocs.io/en/latest/?badge=latest\n :alt: Documentation Status\n\n\n\n\nAuthentication client for the Xenon edark matter experiment.\n\n\n* Free software: MIT\n* Documentation: https://xeauth.readthedocs.io.\n\n\nFeatures\n--------\n\n* TODO\n\nCredits\n-------\n\nThis package was created with Cookiecutter_ and the `briggySmalls/cookiecutter-pypackage`_ project template.\n\n.. _Cookiecutter: https://github.com/audreyr/cookiecutter\n.. _`briggySmalls/cookiecutter-pypackage`: https://github.com/briggySmalls/cookiecutter-pypackage\n',

'author_email': 'joe.mosbacher@gmail.com',
'maintainer': None,
'maintainer_email': None,
'maintainer': 'None',
'maintainer_email': 'None',
'url': 'https://github.com/jmosbacher/xeauth',

@@ -40,3 +41,3 @@ 'packages': packages,

'entry_points': entry_points,
'python_requires': '>=3.8,<3.11',
'python_requires': '>=3.7,<4',
}

@@ -43,0 +44,0 @@

@@ -33,5 +33,5 @@ #!/usr/bin/env python

assert result.exit_code == 0
assert 'xeauth.cli.main' in result.output
help_result = runner.invoke(cli.main, ['--help'])
assert "xeauth.cli.main" in result.output
help_result = runner.invoke(cli.main, ["--help"])
assert help_result.exit_code == 0
assert '--help Show this message and exit.' in help_result.output
assert "--help Show this message and exit." in help_result.output
"""Top-level package for xeauth."""
from .xeauth import *
from .settings import config
from . import admin, github, user, utils
__all__ = ["config", "admin", "github", "user", "utils"]
__author__ = """Yossi Mosbacher"""
__email__ = 'joe.mosbacher@gmail.com'
__version__ = '0.1.22'
__email__ = "joe.mosbacher@gmail.com"
__version__ = "0.2.0"
"""Console script for xeauth."""
import os
import sys
import xeauth
import click
import httpx
import pathlib
from rich.console import Console
from rich.progress import track
from rich.live import Live
from rich.table import Table
@click.command()
@click.group()
def main():
"""Console script for xeauth."""
click.echo("Replace this message by putting your code into "
"xeauth.cli.main")
click.echo("See click documentation at https://click.palletsprojects.com/")
return 0
with click.get_current_context() as ctx:
if ctx.invoked_subcommand is None:
click.echo(ctx.get_help())
@click.command()
@main.group()
def secrets():
with click.get_current_context() as ctx:
if ctx.invoked_subcommand is None:
click.echo(ctx.get_help())
@main.group()
def admin():
with click.get_current_context() as ctx:
if ctx.invoked_subcommand is None:
click.echo(ctx.get_help())
@main.command()
def login():
xeauth.cli_login()
token = xeauth.cli_login()
token.to_file(xeauth.config.TOKEN_FILE)
click.echo(f"Token saved to: {xeauth.config.TOKEN_FILE}")
@secrets.command()
@click.option("--name", prompt="Name", help="Your full name.")
@click.option(
"--email", prompt="Email", help="Your email address as it is registered on github."
)
@click.option("--passphrase", prompt="Passphrase", help="Your passphrase for the key.")
@click.option("--gnu-home", default=None, help="Your gnu home directory.")
def key_gen(name, email, passphrase, gnu_home):
HELP_LINK = "https://docs.github.com/en/authentication/managing-commit-signature-verification/adding-a-gpg-key-to-your-github-account"
key = xeauth.utils.new_gpg_key(name, email, passphrase, gnu_home)
click.echo("Key generated:")
click.echo(key)
click.echo(f"Please see {HELP_LINK} for help adding the key to github.")
@admin.command()
@click.option("--filename", default=None)
def list_usernames(filename):
console = Console()
with console.status("Fetching active user list..."):
usernames = xeauth.admin.get_active_usernames()
if filename is not None:
with console.status("Saving user list to file..."):
with open(filename, "w") as f:
for username in usernames:
f.write(f"{username}\n")
click.echo(f"Active usernames written to file {filename}.")
else:
click.echo("Active usernames:")
for username in usernames:
click.echo(username)
@admin.command()
@click.option(
"--token",
prompt="Github token",
default=xeauth.config.GITHUB_TOKEN,
help="A valid github API token.",
)
@click.option("--path",
prompt="Destination folder",
default="~/xenon_user_keys",
)
@click.option("--import_keys", is_flag=True,
default=True,
)
@click.option("--gnuhome",
prompt="GPG home path",
default="~/.gnupg",
)
def fetch_keys(token, path, import_keys, gnuhome):
if path:
path = pathlib.Path(path)
path = path.expanduser()
console = Console()
with console.status("Fetching active user list..."):
usernames = xeauth.admin.get_active_usernames()
table = Table()
table.add_column("Username")
table.add_column("Public Key")
with Live(table, refresh_per_second=4, console=console):
for username in track(usernames, description="Fetching keys..."):
keys = xeauth.admin.get_user_keys(username, token=token)
for key in keys:
table.add_row(username, key["raw_key"])
key["username"] = username
folder = path / key["username"]
if not folder.exists():
folder.mkdir(parents=True)
filename = folder / key["key_id"]
with open(filename, "w") as f:
f.write(key["raw_key"])
if import_keys:
xeauth.admin.import_key(key['raw_key'], gnuhome=gnuhome)
console.print(f"Valid keys written to folder {path.absolute()}.")
if __name__ == "__main__":
sys.exit(main()) # pragma: no cover
main() # pragma: no cover

@@ -6,5 +6,6 @@ import param

from appdirs import AppDirs
import unittest
DIRS = AppDirs("xeauth", "XENON")
DIRS = AppDirs("xeauth", "xenon")
CACHE_DIR = DIRS.user_cache_dir

@@ -14,36 +15,33 @@ if not os.path.isdir(CACHE_DIR):

DEFAULT_TOKEN_FILE = os.path.join(CACHE_DIR, f"{getpass.getuser()}_xetoken.json")
class ConfigParameter(param.Parameter):
__slots__ = ["env_prefix", "klass"]
class EnvConfigurable(param.Parameterized):
"""
A class that can be used to configure its parameters from the environment.
"""
def __init__(self, klass, env_prefix="", **kwargs):
super().__init__(**kwargs)
self.env_prefix = env_prefix
self.klass = klass
def _set_names(self, attrib_name):
env_name = attrib_name.upper()
env_name = self.env_prefix.upper() + "_" + env_name
if os.getenv(env_name, ""):
env = os.getenv(env_name, "")
try:
env = json.loads(env)
except Exception as e:
pass
self.default = self.klass(env)
super()._set_names(attrib_name)
@classmethod
def from_env(cls, prefix="", **overrides):
params = {}
for name in cls.param.params():
env_name = "_".join([prefix.upper() + name.upper()])
val = os.getenv(env_name, None)
if val:
try:
val = json.loads(val)
except Exception as e:
pass
params[name] = val
class Config(param.Parameterized):
OAUTH_DOMAIN = ConfigParameter(str, env_prefix="xeauth", default="https://xenon-experiment.eu.auth0.com/oauth")
OAUTH_CODE_PATH = ConfigParameter(str, env_prefix="xeauth", default="/device/code")
OAUTH_TOKEN_PATH = ConfigParameter(str, env_prefix="xeauth", default="/token")
OAUTH_CERT_PATH = ConfigParameter(str, env_prefix="xeauth", default="/.well-known/jwks.json")
AUTH0_SUBDOMAIN = ConfigParameter(str, env_prefix="xeauth", default="xenon-experiment.eu")
DEFAULT_CLIENT_ID = ConfigParameter(str, env_prefix="xeauth", default="EC3adX50KdNHQuEmib30GCRDTFDibMK7")
DEFAULT_AUDIENCE = ConfigParameter(str, env_prefix="xeauth", default="https://users.xenonnt.org")
DEFAULT_SCOPE = ConfigParameter(str, env_prefix="xeauth", default="openid profile email offline_access")
DEBUG = ConfigParameter(bool, env_prefix="xeauth", default=False)
params.update(overrides)
return cls(**params)
class Config(EnvConfigurable):
DEFAULT_CLIENT_ID = param.String(default="4a7b1485afcfcfa45271")
MONGO_URI = param.String(default="mongodb://localhost:27017")
MONGO_USER = param.String(default="")
MONGO_PASSWORD = param.String(default="")
DEBUG = param.Boolean(default=False)
MAX_LOG_SIZE = 20

@@ -53,5 +51,20 @@ MAX_MESSAGES = 3

GUI_WIDTH = 600
DEFAULT_AVATAR = "http://www.sibberhuuske.nl/wp-content/uploads/2016/10/default-avatar.png"
TOKEN_FILE = ConfigParameter(str, env_prefix="xeauth", default=DEFAULT_TOKEN_FILE)
DEFAULT_AVATAR = (
"http://www.sibberhuuske.nl/wp-content/uploads/2016/10/default-avatar.png"
)
TOKEN_FILE = param.String(default=DEFAULT_TOKEN_FILE)
GITHUB_TOKEN = param.String(default="")
def mongo_collection(self, collection_name, database='xenonnt'):
try:
from utilix import xent_collection
return xent_collection(collection_name, database=database)
except:
from pymongo import MongoClient
client = MongoClient(self.MONGO_URI)
db = client[database]
if self.MONGO_USER and self.MONGO_PASSWORD:
db.authenticate(self.MONGO_USER, self.MONGO_PASSWORD)
return db[collection_name]
config = Config()
config = Config.from_env(prefix="XEAUTH")

@@ -1,3 +0,1 @@

def url_link_button(url, label="Authenticate", **kwargs):

@@ -15,6 +13,6 @@ import panel as pn

id_token = pn.state.cookies.get('id_token')
id_token = pn.state.cookies.get("id_token")
if id_token is None or pn.config.cookie_secret is None:
return None
id_token = decode_signed_value(pn.config.cookie_secret, 'id_token', id_token)
id_token = decode_signed_value(pn.config.cookie_secret, "id_token", id_token)
if pn.state.encryption is None:

@@ -24,2 +22,24 @@ id_token = id_token

id_token = pn.state.encryption.decrypt(id_token)
return id_token.decode()
return id_token.decode()
def new_gpg_key(name, email, passphrase="", gnu_home=None):
import gnupg
gpg = gnupg.GPG(gnupghome=gnu_home)
arg = gpg.gen_key_input(
key_type="RSA",
key_length=2048,
name_real=name,
passphrase=passphrase,
name_email=email,
)
key = gpg.gen_key(arg)
if not key.status == "ok":
raise RuntimeError("Unable to create key.")
public_key = gpg.export_keys(key.fingerprint, False)
return public_key
import param
import httpx
import authlib
from authlib import jose
from authlib.jose import KeySet
import time
from contextlib import contextmanager
import logging
from .settings import config
logger = logging.getLogger(__name__)
class XeKeySet(param.Parameterized):
oauth_domain = param.String(config.OAUTH_DOMAIN)
cert_path = param.String(config.OAUTH_CERT_PATH)
_keyset = param.ClassSelector(KeySet, default=KeySet({}))
_keys_timestamp = param.Number(0)
_keys_ttl = param.Number(300)
def fetch_keys(self, headers={}):
with httpx.Client(base_url=self.oauth_domain, headers=headers) as client:
r = client.get(self.cert_path)
r.raise_for_status()
keys = r.json()
self._keyset = jose.JsonWebKey.import_key_set(keys)
def extract_claims(self, token):
header_str = authlib.common.encoding.urlsafe_b64decode(token.split(".")[0].encode()).decode('utf-8')
header = authlib.common.encoding.json_loads(header_str)
key = self.find_by_kid(header["kid"])
return jose.jwt.decode(token, key)
def extract_verified_claims(self, token, options={}):
try:
claims = self.extract_claims(token)
claims.options = options
claims.validate()
return claims
except Exception as e:
logger.error(f"Exception raised while validating claims: {e}")
return jose.JWTClaims("", "")
def validate_claims(self, token, **required_claims):
options = {k: {"value": v, "essential": True} for k,v in required_claims.items()}
claims = self.extract_claims(token)
claims.options = options
claims.validate()
def find_by_kid(self, kid):
if kid not in self._keyset.keys:
self.fetch_keys()
return self._keyset.find_by_kid(kid)
def __getitem__(self, kid):
return self.find_by_kid(kid)
certs = XeKeySet()
import time
import param
import httpx
from .settings import config
from .token import XeToken
from .oauth import XeAuthStep
class XeTokenRequest(XeAuthStep):
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_token_path = param.String(config.OAUTH_TOKEN_PATH)
user_code = param.String()
device_code = param.String()
client_id = param.String()
headers = param.Dict()
verification_uri = param.String()
verification_uri_complete = param.String()
expires = param.Number()
interval = param.Number(5)
open_browser = param.Boolean(True)
def prompt(self, p):
print(f'Please visit the following URL to complete '
f'the login: {self.verification_uri_complete}', file=p.console)
if p.open_browser:
import webbrowser
webbrowser.open(self.verification_uri_complete)
return p
def perform(self, p):
while True:
if time.time()>p.expires:
raise TimeoutError("Device code hase expired but not yet authorized.")
try:
s = self.fetch_token(p.oauth_domain, p.oauth_token_path,
p.device_code, p.client_id, headers=p.headers)
return s
except Exception as e:
time.sleep(p.interval)
def fetch_token(self, oauth_domain, oauth_token_path, device_code, client_id, headers={}):
with httpx.Client(base_url=oauth_domain, headers=headers) as client:
r = client.post(
oauth_token_path,
data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code,
"client_id": client_id,
},
headers={"content-type": "application/x-www-form-urlencoded"},
)
r.raise_for_status()
params = r.json()
params["expires"] = time.time() + params.pop("expires_in", 1e6)
params["client_id"] = self.client_id
params['oauth_domain'] = oauth_domain
params['oauth_token_path'] = oauth_token_path
return XeToken(**params)
class XeAuthCodeRequest(XeAuthStep):
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_code_path = param.String(config.OAUTH_CODE_PATH)
client_id = param.String(config.DEFAULT_CLIENT_ID)
scopes = param.List(config.DEFAULT_SCOPE.split(' '))
audience = param.String(config.DEFAULT_AUDIENCE)
extra_fields = param.Dict({})
headers = param.Dict({})
@property
def scope_str(self):
return ' '.join(self.scopes)
def perform(self, p):
data = {
"client_id": p.client_id,
"scope": ' '.join(p.scopes),
"audience": p.audience,
}
data.update(p.extra_fields)
with httpx.Client(base_url=p.oauth_domain, headers=p.headers) as client:
r = client.post(
p.oauth_code_path,
data=data,
headers={"content-type": "application/x-www-form-urlencoded"})
r.raise_for_status()
params = r.json()
params['expires'] = time.time() + params.pop("expires_in", 1)
params['oauth_domain'] = p.oauth_domain
params['client_id'] = p.client_id
return XeTokenRequest.instance(**params)
from .eve_panel import XenonEveAuth
from .eve_server import XeTokenAuth
from warnings import warn
import param
from ..session import NotebookSession
try:
from eve_panel.auth import EveAuthBase
except ImportError:
class EveAuthBase:
def __init__(self) -> None:
raise RuntimeError('eve_panel not installed.')
class XenonEveAuth(NotebookSession, EveAuthBase):
# session = param.ClassSelector(NotebookSession, default=NotebookSession())
def get_headers(self):
"""Generate auth headers for HTTP requests.
Returns:
dict: Auth related headers to be included in all requests.
"""
if self.logged_in:
return {"Authorization": f"Bearer {self.access_token}"}
else:
return {}
def login(self, notify_email=None):
"""perform any actions required to aquire credentials.
Returns:
bool: whether login was successful
"""
self.login_requested(None)
self.authorize()
def set_credentials(self, **credentials):
"""Set the access credentials manually.
"""
for k,v in credentials.items():
if k in ['access_token', "id_token", "refresh_token", "expires"]:
setattr(self.token, k, v)
else:
setattr(self, k, v)
def credentials_view(self):
return self.gui
import xeauth
try:
from eve.auth import TokenAuth
from eve.utils import config
except ImportError:
class TokenAuth:
def __init__(self, *args, **kwargs):
raise RuntimeError('eve not installed.')
class XeTokenAuth(TokenAuth):
@property
def global_read_token(self):
return config.get('API_GLOBAL_READ_TOKEN', None)
@property
def audience(self):
return config.get('JWT_AUDIENCE', 'https//:api.pmts.xenonnt.org')
def check_auth(self, token, allowed_roles, resource, method):
if not token:
return False
if method in ['GET', 'HEAD'] and self.global_read_token==token:
return True
try:
xeauth.certs.validate_claims(token, audience=self.audience)
except:
return False
c = xeauth.certs.extract_verified_claims(token)
roles = c.get('scope', '').split(' ')
return any([r in allowed_roles for r in roles])
from xeauth.settings import config as xeconfig
import logging
log = logging.getLogger(__name__)
try:
from tornado.auth import OAuth2Mixin
from panel.auth import OAuthIDTokenLoginHandler
from panel.config import config as pnconfig
except ImportError:
class OAuthIDTokenLoginHandler:
def __init__(self, *args, **kwargs) -> None:
raise RuntimeError('panel not installed.')
class OAuth2Mixin:
def __init__(self, *args, **kwargs) -> None:
raise RuntimeError('panel not installed.')
class XenonPanelAuth(OAuthIDTokenLoginHandler, OAuth2Mixin):
_AUDIENCE = xeconfig.DEFAULT_AUDIENCE
_SCOPE = xeconfig.DEFAULT_SCOPE.split(' ')
_USER_KEY = 'email'
_EXTRA_TOKEN_PARAMS = {
'grant_type': 'authorization_code',
}
_EXTRA_AUTHORIZE_PARAMS = {
'grant_type': 'authorization_code',
}
_OAUTH_ACCESS_TOKEN_URL_ = 'https://{0}.auth0.com/oauth/token'
_OAUTH_AUTHORIZE_URL_ = 'https://{0}.auth0.com/authorize'
_OAUTH_USER_URL_ = 'https://{0}.auth0.com/userinfo?access_token='
@property
def _OAUTH_ACCESS_TOKEN_URL(self):
url = pnconfig.oauth_extra_params.get('subdomain', xeconfig.AUTH0_SUBDOMAIN)
return self._OAUTH_ACCESS_TOKEN_URL_.format(url)
@property
def _OAUTH_AUTHORIZE_URL(self):
url = pnconfig.oauth_extra_params.get('subdomain', xeconfig.AUTH0_SUBDOMAIN)
return self._OAUTH_AUTHORIZE_URL_.format(url)
@property
def _OAUTH_USER_URL(self):
url = pnconfig.oauth_extra_params.get('subdomain', xeconfig.AUTH0_SUBDOMAIN)
return self._OAUTH_USER_URL_.format(url)
async def get_authenticated_user(self, redirect_uri, client_id, state,
client_secret=None, code=None):
"""
Fetches the authenticated user
Arguments
---------
redirect_uri: (str)
The OAuth redirect URI
client_id: (str)
The OAuth client ID
state: (str)
The unguessable random string to protect against
cross-site request forgery attacks
client_secret: (str, optional)
The client secret
code: (str, optional)
The response code from the server
"""
if code:
return await self._fetch_access_token(
code,
redirect_uri,
client_id,
client_secret
)
params = {
'redirect_uri': redirect_uri,
'client_id': client_id,
'client_secret': client_secret,
'extra_params': {
'state': state,
},
}
if self._SCOPE is not None:
params['scope'] = self._SCOPE
if 'scope' in pnconfig.oauth_extra_params:
params['scope'] = pnconfig.oauth_extra_params['scope']
if self._AUDIENCE is not None:
params['extra_params']['audience'] = self._AUDIENCE
if 'audience' in pnconfig.oauth_extra_params:
params['extra_params']['audience'] = pnconfig.oauth_extra_params['audience']
log.debug("%s making authorize request" % type(self).__name__)
self.authorize_redirect(**params)
import io
import sys
import param
class XeAuthStep(param.ParameterizedFunction):
auto_advance = param.Boolean(True)
prompt_response = param.Parameter(instantiate=False)
console = param.ClassSelector(io.IOBase, default=sys.stdout,
instantiate=False, pickle_default_value=False)
def perform(self, p):
pass
def prompt(self, p):
return p
def __call__(self, **params):
p = param.ParamOverrides(self, params)
p = self.prompt(p)
next = self.perform(p)
if isinstance(next, XeAuthStep) and p.auto_advance:
params = {k:v for k,v in params.items() if k in next.param.params()}
next = next(**params)
return next
import time
import param
import httpx
from .settings import config
from .token import XeToken
from .oauth import XeAuthStep
class TokenRefresh(XeAuthStep):
client_id = param.String(config.DEFAULT_CLIENT_ID)
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_token_path = param.String(config.OAUTH_TOKEN_PATH)
access_token = param.String(readonly=True)
id_token = param.String(readonly=True)
refresh_token = param.String(readonly=True)
def perform(self, p):
with httpx.Client(base_url=self.oauth_domain, headers=p.headers) as client:
r = client.post(
p.oauth_token_path,
headers={"content-type":"application/x-www-form-urlencoded"},
data={
"grant_type": "refresh_token",
"refresh_token": p.refresh_token,
"client_id": p.client_id,
}
)
r.raise_for_status()
params = r.json()
params["expires"] = time.time() + params.pop("expires_in", 1e6)
params["client_id"] = p.client_id
params['oauth_domain'] = p.oauth_domain
params['oauth_token_path'] = p.oauth_token_path
return XeToken(**params)
import os
import param
import httpx
import time
import webbrowser
import logging
from datetime import datetime
from contextlib import contextmanager, asynccontextmanager
from .settings import config
from .utils import url_link_button, id_token_from_server_state
from .certificates import certs
from .token import XeToken
from .device_auth_flow import XeAuthCodeRequest
from .oauth import XeAuthStep
logger = logging.getLogger(__name__)
class XeAuthSession(param.Parameterized):
keyset = certs
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_code_path = param.String(config.OAUTH_CODE_PATH)
oauth_token_path = param.String(config.OAUTH_TOKEN_PATH)
auto_persist_session = param.Boolean(False)
token_file = param.String(config.TOKEN_FILE)
client_id = param.String(config.DEFAULT_CLIENT_ID)
scopes = param.List([])
audience = param.String(config.DEFAULT_AUDIENCE)
notify_email = param.String(allow_None=True)
flow = param.ClassSelector(default=XeAuthCodeRequest, class_=XeAuthCodeRequest, is_instance=False)
token = param.ClassSelector(XeToken, default=None)
state = param.Selector(["Disconnected", "Logged in", "Awaiting token",
"Checking token ready", "Token expired"],
default="Disconnected")
def __init__(self, **params):
super().__init__(**params)
if os.path.isfile(self.token_file):
self.token = XeToken.from_file(self.token_file)
if self.logged_in:
self.state = "Logged in"
def login_from_server(self):
import panel as pn
access_token = pn.state.access_token
id_token = id_token_from_server_state()
self.token = XeToken(access_token=access_token,
id_token=id_token,
)
return self.token
@property
def scope(self):
scopes = set(config.DEFAULT_SCOPE.split(" ") + self.scopes)
return " ".join(scopes)
def login(self, open_browser=False, print_url=False, extra_headers={}, notify_email=None):
extra_fields = {}
if notify_email:
extra_fields["notify_email"] = notify_email
if self.token:
self.state = "Logged in"
else:
self.token = self.flow(self.oauth_domain, self.oauth_code_path, self.oauth_token_path,
self.client_id, self.scope, self.audience, headers=extra_headers,
extra_fields=extra_fields,
open_browser=open_browser, print_url=print_url)
if self.token:
self.state = "Logged in"
return self.token
def persist_token(self):
if self.token_file:
try:
self.token.to_file(self.token_file)
except Exception as e:
logger.error("Exception raised while persisted token: "+str(e))
def logout(self):
self.token = None
self.state = "Disconnected"
return True
def request_token(self, extra_headers={}):
self.flow.request_token(self.oauth_domain, self.oauth_code_path,
self.client_id, self.scope, self.audience, headers=extra_headers)
self.state = "Awaiting token"
def token_ready(self, extra_headers={}):
if self.token is None:
try:
self.token = self.flow.fetch_token(self.oauth_domain,
self.oauth_token_path, headers=extra_headers)
if self.auto_persist_session:
self.persist_token()
self.state = "Logged in"
return True
except:
pass
return False
def refresh_tokens(self, extra_headers={}):
self.token.refresh_tokens(self.oauth_domain, self.oauth_token_path,
self.client_id, headers=extra_headers)
if self.auto_persist_session:
self.persist_token()
@property
def logged_in(self):
if self.token is None:
return False
if self.token.expired:
return False
return True
@property
def id_token(self):
return self.token.id_token
@property
def access_token(self):
return self.token.access_token
@property
def profile(self):
claims = self.keyset.extract_verified_claims(self.id_token)
return {k:v for k,v in claims.items() if k not in claims.REGISTERED_CLAIMS}
@property
def claims(self):
claims = self.keyset.extract_verified_claims(self.access_token)
return {k:v for k,v in claims.items() if k in claims.REGISTERED_CLAIMS}
@property
def extra_claims(self):
claims = self.keyset.extract_verified_claims(self.access_token)
return {k:v for k,v in claims.items() if k not in claims.REGISTERED_CLAIMS}
@property
def permissions(self):
claims = self.keyset.extract_verified_claims(self.access_token)
return claims.get("permissions", [])
@contextmanager
def Client(self, *args, **kwargs):
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.access_token}"
client = httpx.Client(*args, **kwargs)
try:
yield client
finally:
client.close()
@asynccontextmanager
async def AsyncClient(self, *args, **kwargs ):
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.access_token}"
client = httpx.AsyncClient(*args, **kwargs)
try:
yield client
finally:
await client.aclose()
def authorize(self):
webbrowser.open(self.flow.verification_uri_complete)
class NotebookSession(XeAuthSession):
message = param.String("")
_gui = None
_cb = None
@property
def gui(self):
import panel as pn
if self._gui is None:
self._gui = pn.panel(self._make_gui)
return self._gui
def await_token_cb(self):
if self.token_ready() and self.token:
self._cb.stop()
def login_requested(self, event):
try:
import panel as pn
self.request_token()
logger.info("Sent request...")
self._cb = pn.state.add_periodic_callback(self.await_token_cb,
1000*self.flow.interval,
timeout=1000*max(1, int(self.flow.expires-time.time())))
except Exception as e:
logging.error(e)
print(e)
def logged_in_gui(self):
import panel as pn
profile = self.profile
details = pn.Row(
pn.pane.PNG(profile.get("picture", config.DEFAULT_AVATAR), width=60, height=60),
pn.widgets.TextInput(name='Name', value=profile.get("name", "Unknown"), disabled=True, height=35),
pn.widgets.TextInput(name='Email', value=profile.get("email", "Unknown"), disabled=True, height=35),
height=70
)
token = pn.widgets.input.TextAreaInput(name='Access token', value=self.access_token, width=700, height=100)
token_props = pn.Row(
pn.widgets.TextInput(name='Scope', value=self.token.scope, disabled=True, height=35),
pn.widgets.DatetimeInput(disabled=True, name="Expiration date",
value=datetime.utcfromtimestamp(self.token.expires)),
width=700,
)
logout = pn.widgets.Button(name="Logout", button_type='warning')
logout.on_click(lambda event: self.logout())
return pn.Column(
details,
token,
token_props,
logout,
height=300,
width=700)
def awaiting_token_gui(self):
import panel as pn
cancel = pn.widgets.Button(name="Cancel", width=50)
cancel.on_click(lambda event: self.logout())
authenticate = url_link_button(self.flow.verification_uri_complete, button_type='primary', width=100)
waiting_ind = pn.indicators.LoadingSpinner(value=False, width=30, height=30, align="center")
def activate(event):
waiting_ind.value = True
authenticate.on_click(activate)
return pn.Row(waiting_ind, authenticate, cancel)
def token_expired_gui(self):
import panel as pn
refresh = pn.widgets.Button(name="Renew", align="end")
refresh.on_click(lambda e: self.refresh_tokens())
return refresh
@param.depends("state")
def _make_gui(self):
import panel as pn
status = pn.indicators.BooleanStatus(width=20, height=20, value=True, color="danger", align="center")
header = pn.Row(status, f"Status: {self.state}.")
panel = pn.Column(header)
if self.state == "Logged in":
status.color = "success"
panel.append(self.logged_in_gui())
elif self.state == "Awaiting token":
status.color = "primary"
panel.append(self.awaiting_token_gui())
elif self.state == "Token expired":
status.color = "warning"
return pn.panel(self.token_expired_gui())
else:
login = pn.widgets.Button(name="Login", button_type='success', width=30)
login.on_click(self.login_requested)
panel.append(pn.Param(self.param, parameters=["scopes", "auto_persist_session"]))
panel.append(login)
return panel
def _repr_mimebundle_(self, include=None, exclude=None):
return self.gui._repr_mimebundle_(include=include, exclude=exclude)
import param
import time
import httpx
import json
from contextlib import contextmanager, asynccontextmanager
from .settings import config
from .utils import id_token_from_server_state
from .certificates import certs
class XeToken(param.Parameterized):
client_id = param.String(config.DEFAULT_CLIENT_ID)
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_token_path = param.String(config.OAUTH_TOKEN_PATH)
access_token = param.String(constant=True)
id_token = param.String(constant=True)
refresh_token = param.String(constant=True)
expires = param.Number(constant=True)
scope = param.String(constant=True)
token_type = param.String("Bearer", constant=True)
@property
def expired(self):
return time.time()>self.expires
@property
def profile(self):
claims = certs.extract_verified_claims(self.id_token)
return {k:v for k,v in claims.items() if k not in claims.REGISTERED_CLAIMS}
@property
def username(self):
return self.profile.get('name', 'unknown')
@property
def claims(self):
claims = certs.extract_verified_claims(self.access_token)
return {k:v for k,v in claims.items() if k in claims.REGISTERED_CLAIMS}
@property
def extra_claims(self):
claims = certs.extract_verified_claims(self.access_token)
return {k:v for k,v in claims.items() if k not in claims.REGISTERED_CLAIMS}
@property
def permissions(self):
claims = certs.extract_verified_claims(self.access_token)
return claims.get("permissions", [])
@classmethod
def from_file(cls, path):
with open(path, "r") as f:
data = json.load(f)
return cls(**data)
@classmethod
def from_panel_server(cls):
import panel as pn
access_token = pn.state.access_token
id_token = id_token_from_server_state()
token = cls(access_token=access_token,
id_token=id_token,
)
return token
def to_file(self, path):
with open(path, "w") as f:
json.dump(self.to_dict(), f)
def to_dict(self):
return {k:v for k,v in self.param.get_param_values() if not k.startswith("_")}
def refresh(self, headers={}):
with httpx.Client(base_url=self.oauth_domain, headers=headers) as client:
r = client.post(
self.oauth_token_path,
headers={"content-type":"application/x-www-form-urlencoded"},
data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": self.client_id,
}
)
r.raise_for_status()
params = r.json()
params["expires"] = time.time() + params.pop("expires_in", 1e6)
self.param.set_param(**params)
@contextmanager
def Client(self, *args, **kwargs):
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.access_token}"
client = httpx.Client(*args, **kwargs)
try:
yield client
finally:
client.close()
@asynccontextmanager
async def AsyncClient(self, *args, **kwargs ):
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.access_token}"
client = httpx.AsyncClient(*args, **kwargs)
try:
yield client
finally:
await client.aclose()
def __repr__(self):
return ("XeToken("
f"user={self.profile.get('name', 'unknown')}, "
f"access_token={self.access_token})"
)
import param
import httpx
import time
import getpass
from .oauth import XeAuthStep
from .token import XeToken
from .settings import config
class UserCredentialsAuth(XeAuthStep):
username = param.String(default=None)
password = param.String(default=None)
auth_url = param.String(config.OAUTH_DOMAIN.rstrip('/')+'/token')
audience = param.String(config.DEFAULT_AUDIENCE)
scopes = param.List(config.DEFAULT_SCOPE.split(' '))
client_id = param.String(config.DEFAULT_CLIENT_ID)
headers = param.Dict({'content-type': 'application/x-www-form-urlencoded'})
def prompt(self, p):
if p.username is None:
p.username = getpass.getuser()
if p.password is None:
p.password = getpass.getpass()
return p
def perform(self, p):
data = dict(
grant_type='password',
username=p.username,
password=p.password,
audience=p.audience,
scope=' '.join(p.scopes),
client_id=p.client_id,
)
r = httpx.post(p.auth_url, data=data, headers=p.headers)
r.raise_for_status()
kwargs = r.json()
kwargs['expires'] = time.time() + kwargs.pop('expires_in')
return XeToken(client_id=p.client_id, **kwargs)
import os
from .settings import config
from .user_credentials import UserCredentialsAuth
from .device_auth_flow import XeAuthCodeRequest
from .certificates import certs
user_login = UserCredentialsAuth.instance(auto_advance=True)
device_login = XeAuthCodeRequest.instance(auto_advance=True)
def login(username=None, password=None, **kwargs):
if username is None:
return device_login(**kwargs)
return user_login(username=username,
password=password,
**kwargs)
def cli_login(**kwargs):
token = login(**kwargs)
print(f"logged in as: {token.profile.get('name', 'unknown')}")
print(f"Access token: {token.access_token}")
print(f"ID token: {token.id_token}")
def validate_claims(token, **claims):
return certs.validate_claims(token, **claims)
def clear_cache():
os.remove(config.CACHE_FILE)
def cmt_login(scopes=[], **kwargs):
if isinstance(scopes, str):
scopes = scopes.split(' ')
if not isinstance(scopes, list):
raise TypeError('scopes must be a list or string')
scopes = set(scopes)
scopes.add('read:all')
scopes = list(scopes)
audience = kwargs.pop('audience', 'https://api.cmt.xenonnt.org')
# base_url = kwargs.pop('base_url', DEFAULT_BASE_URL)
return login(audience=audience, scopes=scopes, **kwargs)