xeauth
Advanced tools
| import time | ||
| import param | ||
| import httpx | ||
| from .settings import config | ||
| from .token import XeToken | ||
| from .oauth import XeAuthStep | ||
| class XeTokenRequest(XeAuthStep): | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_token_path = param.String(config.OAUTH_TOKEN_PATH) | ||
| user_code = param.String() | ||
| device_code = param.String() | ||
| client_id = param.String() | ||
| headers = param.Dict() | ||
| verification_uri = param.String() | ||
| verification_uri_complete = param.String() | ||
| expires = param.Number() | ||
| interval = param.Number(5) | ||
| open_browser = param.Boolean(True) | ||
| def prompt(self, p): | ||
| print(f'Please visit the following URL to complete ' | ||
| f'the login: {self.verification_uri_complete}', file=p.console) | ||
| if p.open_browser: | ||
| import webbrowser | ||
| webbrowser.open(self.verification_uri_complete) | ||
| return p | ||
| def perform(self, p): | ||
| while True: | ||
| if time.time()>p.expires: | ||
| raise TimeoutError("Device code hase expired but not yet authorized.") | ||
| try: | ||
| s = self.fetch_token(p.oauth_domain, p.oauth_token_path, | ||
| p.device_code, p.client_id, headers=p.headers) | ||
| return s | ||
| except Exception as e: | ||
| time.sleep(p.interval) | ||
| def fetch_token(self, oauth_domain, oauth_token_path, device_code, client_id, headers={}): | ||
| with httpx.Client(base_url=oauth_domain, headers=headers) as client: | ||
| r = client.post( | ||
| oauth_token_path, | ||
| data={ | ||
| "grant_type": "urn:ietf:params:oauth:grant-type:device_code", | ||
| "device_code": device_code, | ||
| "client_id": client_id, | ||
| }, | ||
| headers={"content-type": "application/x-www-form-urlencoded"}, | ||
| ) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params["expires"] = time.time() + params.pop("expires_in", 1e6) | ||
| params["client_id"] = self.client_id | ||
| params['oauth_domain'] = oauth_domain | ||
| params['oauth_token_path'] = oauth_token_path | ||
| return XeToken(**params) | ||
| class XeAuthCodeRequest(XeAuthStep): | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_code_path = param.String(config.OAUTH_CODE_PATH) | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| scopes = param.List(config.DEFAULT_SCOPE.split(' ')) | ||
| audience = param.String(config.DEFAULT_AUDIENCE) | ||
| extra_fields = param.Dict({}) | ||
| headers = param.Dict({}) | ||
| @property | ||
| def scope_str(self): | ||
| return ' '.join(self.scopes) | ||
| def perform(self, p): | ||
| data = { | ||
| "client_id": p.client_id, | ||
| "scope": ' '.join(p.scopes), | ||
| "audience": p.audience, | ||
| } | ||
| data.update(p.extra_fields) | ||
| with httpx.Client(base_url=p.oauth_domain, headers=p.headers) as client: | ||
| r = client.post( | ||
| p.oauth_code_path, | ||
| data=data, | ||
| headers={"content-type": "application/x-www-form-urlencoded"}) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params['expires'] = time.time() + params.pop("expires_in", 1) | ||
| params['oauth_domain'] = p.oauth_domain | ||
| params['client_id'] = p.client_id | ||
| return XeTokenRequest.instance(**params) | ||
| import time | ||
| import param | ||
| import httpx | ||
| from .settings import config | ||
| from .token import XeToken | ||
| from .oauth import XeAuthStep | ||
| class TokenRefresh(XeAuthStep): | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_token_path = param.String(config.OAUTH_TOKEN_PATH) | ||
| access_token = param.String(readonly=True) | ||
| id_token = param.String(readonly=True) | ||
| refresh_token = param.String(readonly=True) | ||
| def perform(self, p): | ||
| with httpx.Client(base_url=self.oauth_domain, headers=p.headers) as client: | ||
| r = client.post( | ||
| p.oauth_token_path, | ||
| headers={"content-type":"application/x-www-form-urlencoded"}, | ||
| data={ | ||
| "grant_type": "refresh_token", | ||
| "refresh_token": p.refresh_token, | ||
| "client_id": p.client_id, | ||
| } | ||
| ) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params["expires"] = time.time() + params.pop("expires_in", 1e6) | ||
| params["client_id"] = p.client_id | ||
| params['oauth_domain'] = p.oauth_domain | ||
| params['oauth_token_path'] = p.oauth_token_path | ||
| return XeToken(**params) |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: xeauth | ||
| Version: 0.1.19 | ||
| Version: 0.1.20 | ||
| Summary: Top-level package for xeauth. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/jmosbacher/xeauth |
+1
-1
| [tool] | ||
| [tool.poetry] | ||
| name = "xeauth" | ||
| version = "0.1.19" | ||
| version = "0.1.20" | ||
| homepage = "https://github.com/jmosbacher/xeauth" | ||
@@ -6,0 +6,0 @@ description = "Top-level package for xeauth." |
+1
-1
@@ -24,3 +24,3 @@ # -*- coding: utf-8 -*- | ||
| 'name': 'xeauth', | ||
| 'version': '0.1.19', | ||
| 'version': '0.1.20', | ||
| 'description': 'Top-level package for xeauth.', | ||
@@ -27,0 +27,0 @@ 'long_description': '======\nxeauth\n======\n\n\n.. image:: https://img.shields.io/pypi/v/xeauth.svg\n :target: https://pypi.python.org/pypi/xeauth\n\n.. image:: https://img.shields.io/travis/jmosbacher/xeauth.svg\n :target: https://travis-ci.com/jmosbacher/xeauth\n\n.. image:: https://readthedocs.org/projects/xeauth/badge/?version=latest\n :target: https://xeauth.readthedocs.io/en/latest/?badge=latest\n :alt: Documentation Status\n\n\n\n\nAuthentication client for the Xenon edark matter experiment.\n\n\n* Free software: MIT\n* Documentation: https://xeauth.readthedocs.io.\n\n\nFeatures\n--------\n\n* TODO\n\nCredits\n-------\n\nThis package was created with Cookiecutter_ and the `briggySmalls/cookiecutter-pypackage`_ project template.\n\n.. _Cookiecutter: https://github.com/audreyr/cookiecutter\n.. _`briggySmalls/cookiecutter-pypackage`: https://github.com/briggySmalls/cookiecutter-pypackage\n', |
@@ -8,2 +8,2 @@ """Top-level package for xeauth.""" | ||
| __email__ = 'joe.mosbacher@gmail.com' | ||
| __version__ = '0.1.19' | ||
| __version__ = '0.1.20' |
| from warnings import warn | ||
| import param | ||
| import panel as pn | ||
| from ..oauth import NotebookSession | ||
| from ..session import NotebookSession | ||
| try: | ||
@@ -8,0 +7,0 @@ from eve_panel.auth import EveAuthBase |
+5
-163
@@ -1,12 +0,6 @@ | ||
| import os | ||
| import io | ||
| import sys | ||
| import time | ||
| import param | ||
| import httpx | ||
| from .settings import config | ||
| from .token import XeToken | ||
| class XeAuthStep(param.ParameterizedFunction): | ||
@@ -22,9 +16,11 @@ auto_advance = param.Boolean(True) | ||
| def prompt(self, p): | ||
| pass | ||
| return p | ||
| def __call__(self, **params): | ||
| p = param.ParamOverrides(self, params) | ||
| prompt_response = self.prompt(p) | ||
| p['prompt_response'] = prompt_response | ||
| p = self.prompt(p) | ||
| next = self.perform(p) | ||
| if isinstance(next, XeAuthStep) and p.auto_advance: | ||
@@ -35,155 +31,1 @@ params = {k:v for k,v in params.items() if k in next.param.params()} | ||
| class XeTokenRequest(XeAuthStep): | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_token_path = param.String(config.OAUTH_TOKEN_PATH) | ||
| user_code = param.String() | ||
| device_code = param.String() | ||
| client_id = param.String() | ||
| headers = param.Dict() | ||
| verification_uri = param.String() | ||
| verification_uri_complete = param.String() | ||
| expires = param.Number() | ||
| interval = param.Number(5) | ||
| open_browser = param.Boolean(True) | ||
| def prompt(self, p): | ||
| print(f'Please visit the following URL to complete ' | ||
| f'the login: {self.verification_uri_complete}', file=p.console) | ||
| if p.open_browser: | ||
| import webbrowser | ||
| webbrowser.open(self.verification_uri_complete) | ||
| def perform(self, p): | ||
| while True: | ||
| if time.time()>p.expires: | ||
| raise TimeoutError("Device code hase expired but not yet authorized.") | ||
| try: | ||
| s = self.fetch_token(p.oauth_domain, p.oauth_token_path, | ||
| p.device_code, p.client_id, headers=p.headers) | ||
| return s | ||
| except Exception as e: | ||
| time.sleep(p.interval) | ||
| def fetch_token(self, oauth_domain, oauth_token_path, device_code, client_id, headers={}): | ||
| with httpx.Client(base_url=oauth_domain, headers=headers) as client: | ||
| r = client.post( | ||
| oauth_token_path, | ||
| data={ | ||
| "grant_type": "urn:ietf:params:oauth:grant-type:device_code", | ||
| "device_code": device_code, | ||
| "client_id": client_id, | ||
| }, | ||
| headers={"content-type": "application/x-www-form-urlencoded"}, | ||
| ) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params["expires"] = time.time() + params.pop("expires_in", 1e6) | ||
| params["client_id"] = self.client_id | ||
| params['oauth_domain'] = oauth_domain | ||
| params['oauth_token_path'] = oauth_token_path | ||
| return XeToken(**params) | ||
| class XeAuthCodeRequest(XeAuthStep): | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_code_path = param.String(config.OAUTH_CODE_PATH) | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| scopes = param.List(config.DEFAULT_SCOPE.split(' ')) | ||
| audience = param.String(config.DEFAULT_AUDIENCE) | ||
| extra_fields = param.Dict({}) | ||
| headers = param.Dict({}) | ||
| @property | ||
| def scope_str(self): | ||
| return ' '.join(self.scopes) | ||
| def prompt(self, p): | ||
| pass | ||
| def perform(self, p): | ||
| data = { | ||
| "client_id": p.client_id, | ||
| "scope": ' '.join(p.scopes), | ||
| "audience": p.audience, | ||
| } | ||
| data.update(p.extra_fields) | ||
| with httpx.Client(base_url=p.oauth_domain, headers=p.headers) as client: | ||
| r = client.post( | ||
| p.oauth_code_path, | ||
| data=data, | ||
| headers={"content-type": "application/x-www-form-urlencoded"}) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params['expires'] = time.time() + params.pop("expires_in", 1) | ||
| params['oauth_domain'] = p.oauth_domain | ||
| params['client_id'] = p.client_id | ||
| return XeTokenRequest.instance(**params) | ||
| class TokenRefresh(XeAuthStep): | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| oauth_domain = param.String(config.OAUTH_DOMAIN) | ||
| oauth_token_path = param.String(config.OAUTH_TOKEN_PATH) | ||
| access_token = param.String(readonly=True) | ||
| id_token = param.String(readonly=True) | ||
| refresh_token = param.String(readonly=True) | ||
| def perform(self, p): | ||
| with httpx.Client(base_url=self.oauth_domain, headers=p.headers) as client: | ||
| r = client.post( | ||
| p.oauth_token_path, | ||
| headers={"content-type":"application/x-www-form-urlencoded"}, | ||
| data={ | ||
| "grant_type": "refresh_token", | ||
| "refresh_token": p.refresh_token, | ||
| "client_id": p.client_id, | ||
| } | ||
| ) | ||
| r.raise_for_status() | ||
| params = r.json() | ||
| params["expires"] = time.time() + params.pop("expires_in", 1e6) | ||
| params["client_id"] = p.client_id | ||
| params['oauth_domain'] = p.oauth_domain | ||
| params['oauth_token_path'] = p.oauth_token_path | ||
| return XeToken(**params) | ||
| class UserCredentialsAuth(XeAuthStep): | ||
| username = param.String() | ||
| password = param.String() | ||
| auth_url = param.String(config.OAUTH_DOMAIN.rstrip('/')+'/token') | ||
| audience = param.String(config.DEFAULT_AUDIENCE) | ||
| scopes = param.List(config.DEFAULT_SCOPE.split(' ')) | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| headers = param.Dict({'content-type': 'application/x-www-form-urlencoded'}) | ||
| def perform(self, p): | ||
| data = dict( | ||
| grant_type='password', | ||
| username=p.username, | ||
| password=p.password, | ||
| audience=p.audience, | ||
| scope=' '.join(p.scope), | ||
| client_id=p.client_id, | ||
| ) | ||
| r = httpx.post(p.auth_url, data=data, headers=p.headers) | ||
| r.raise_for_status() | ||
| kwargs = r.json() | ||
| kwargs['expires'] = time.time() + kwargs.pop('expires_in') | ||
| return XeToken(client_id=p.client_id, **kwargs) | ||
@@ -7,3 +7,2 @@ import os | ||
| import logging | ||
| import panel as pn | ||
@@ -73,3 +72,3 @@ from datetime import datetime | ||
| else: | ||
| self.token = self.flow.perform(self.oauth_domain, self.oauth_code_path, self.oauth_token_path, | ||
| self.token = self.flow(self.oauth_domain, self.oauth_code_path, self.oauth_token_path, | ||
| self.client_id, self.scope, self.audience, headers=extra_headers, | ||
@@ -187,2 +186,3 @@ extra_fields=extra_fields, | ||
| def gui(self): | ||
| import panel as pn | ||
| if self._gui is None: | ||
@@ -198,2 +198,3 @@ self._gui = pn.panel(self._make_gui) | ||
| try: | ||
| import panel as pn | ||
| self.request_token() | ||
@@ -209,2 +210,3 @@ logger.info("Sent request...") | ||
| def logged_in_gui(self): | ||
| import panel as pn | ||
| profile = self.profile | ||
@@ -211,0 +213,0 @@ details = pn.Row( |
+0
-1
@@ -11,3 +11,2 @@ import param | ||
| class XeToken(param.Parameterized): | ||
@@ -14,0 +13,0 @@ client_id = param.String(config.DEFAULT_CLIENT_ID) |
@@ -1,33 +0,40 @@ | ||
| # import param | ||
| # import httpx | ||
| # import time | ||
| # from .settings import config | ||
| # from .token import XeToken | ||
| import param | ||
| import httpx | ||
| import time | ||
| import getpass | ||
| from .oauth import XeAuthStep | ||
| from .token import XeToken | ||
| from .settings import config | ||
| # class UserCredentialsAuth(param.Parameterized): | ||
| # AUTH_URL = param.String(config.OAUTH_DOMAIN.rstrip('/')+'/token') | ||
| # audience = param.String(config.DEFAULT_AUDIENCE) | ||
| # scope = param.String(config.DEFAULT_SCOPE) | ||
| # client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| # headers = param.Dict({'content-type': 'application/x-www-form-urlencoded'}) | ||
| class UserCredentialsAuth(XeAuthStep): | ||
| username = param.String(default=None) | ||
| password = param.String(default=None) | ||
| # def login(self, username, password, audience=None, scope=None): | ||
| # if scope is None: | ||
| # scope = self.scope | ||
| # if audience is None: | ||
| # audience = self.audience | ||
| auth_url = param.String(config.OAUTH_DOMAIN.rstrip('/')+'/token') | ||
| audience = param.String(config.DEFAULT_AUDIENCE) | ||
| scopes = param.List(config.DEFAULT_SCOPE.split(' ')) | ||
| client_id = param.String(config.DEFAULT_CLIENT_ID) | ||
| headers = param.Dict({'content-type': 'application/x-www-form-urlencoded'}) | ||
| # data = dict( | ||
| # grant_type='password', | ||
| # username=username, | ||
| # password=password, | ||
| # audience=audience, | ||
| # scope=scope, | ||
| # client_id=self.client_id, | ||
| # ) | ||
| # r = httpx.post(self.AUTH_URL, data=data, headers=self.headers) | ||
| # r.raise_for_status() | ||
| # kwargs = r.json() | ||
| # kwargs['expires'] = time.time() + kwargs.pop('expires_in') | ||
| # return XeToken(client_id=self.client_id, **kwargs) | ||
| def prompt(self, p): | ||
| if p.username is None: | ||
| p.username = getpass.getuser() | ||
| if p.password is None: | ||
| p.password = getpass.getpass() | ||
| return p | ||
| def perform(self, p): | ||
| data = dict( | ||
| grant_type='password', | ||
| username=p.username, | ||
| password=p.password, | ||
| audience=p.audience, | ||
| scope=' '.join(p.scopes), | ||
| client_id=p.client_id, | ||
| ) | ||
| r = httpx.post(p.auth_url, data=data, headers=p.headers) | ||
| r.raise_for_status() | ||
| kwargs = r.json() | ||
| kwargs['expires'] = time.time() + kwargs.pop('expires_in') | ||
| return XeToken(client_id=p.client_id, **kwargs) |
+14
-6
| import os | ||
| import panel as pn | ||
| import getpass | ||
| from .settings import config | ||
| # from .oauth import XeAuthSession, NotebookSession, UserCredentialsAuth | ||
| # from .user_credentials import UserCredentialsAuth | ||
| from .oauth import UserCredentialsAuth, XeAuthCodeRequest | ||
| from .user_credentials import UserCredentialsAuth | ||
| from .device_auth_flow import XeAuthCodeRequest | ||
| from .certificates import certs | ||
@@ -16,5 +13,15 @@ | ||
| login = XeAuthCodeRequest.instance(auto_advance=True) | ||
| device_login = XeAuthCodeRequest.instance(auto_advance=True) | ||
| def login(username=None, password=None, **kwargs): | ||
| if username is None: | ||
| return device_login(**kwargs) | ||
| return user_login(username=username, | ||
| password=password, | ||
| **kwargs) | ||
| def cli_login(**kwargs): | ||
@@ -44,1 +51,2 @@ token = login(**kwargs) | ||
| return login(audience=audience, scopes=scopes, **kwargs) | ||
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
23
9.52%40979
-1.37%865
-0.69%