Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

xeauth

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

xeauth - npm Package Compare versions

Comparing version
0.1.19
to
0.1.20
+106
xeauth/device_auth_flow.py
import time
import param
import httpx
from .settings import config
from .token import XeToken
from .oauth import XeAuthStep
class XeTokenRequest(XeAuthStep):
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_token_path = param.String(config.OAUTH_TOKEN_PATH)
user_code = param.String()
device_code = param.String()
client_id = param.String()
headers = param.Dict()
verification_uri = param.String()
verification_uri_complete = param.String()
expires = param.Number()
interval = param.Number(5)
open_browser = param.Boolean(True)
def prompt(self, p):
print(f'Please visit the following URL to complete '
f'the login: {self.verification_uri_complete}', file=p.console)
if p.open_browser:
import webbrowser
webbrowser.open(self.verification_uri_complete)
return p
def perform(self, p):
while True:
if time.time()>p.expires:
raise TimeoutError("Device code hase expired but not yet authorized.")
try:
s = self.fetch_token(p.oauth_domain, p.oauth_token_path,
p.device_code, p.client_id, headers=p.headers)
return s
except Exception as e:
time.sleep(p.interval)
def fetch_token(self, oauth_domain, oauth_token_path, device_code, client_id, headers={}):
with httpx.Client(base_url=oauth_domain, headers=headers) as client:
r = client.post(
oauth_token_path,
data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code,
"client_id": client_id,
},
headers={"content-type": "application/x-www-form-urlencoded"},
)
r.raise_for_status()
params = r.json()
params["expires"] = time.time() + params.pop("expires_in", 1e6)
params["client_id"] = self.client_id
params['oauth_domain'] = oauth_domain
params['oauth_token_path'] = oauth_token_path
return XeToken(**params)
class XeAuthCodeRequest(XeAuthStep):
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_code_path = param.String(config.OAUTH_CODE_PATH)
client_id = param.String(config.DEFAULT_CLIENT_ID)
scopes = param.List(config.DEFAULT_SCOPE.split(' '))
audience = param.String(config.DEFAULT_AUDIENCE)
extra_fields = param.Dict({})
headers = param.Dict({})
@property
def scope_str(self):
return ' '.join(self.scopes)
def perform(self, p):
data = {
"client_id": p.client_id,
"scope": ' '.join(p.scopes),
"audience": p.audience,
}
data.update(p.extra_fields)
with httpx.Client(base_url=p.oauth_domain, headers=p.headers) as client:
r = client.post(
p.oauth_code_path,
data=data,
headers={"content-type": "application/x-www-form-urlencoded"})
r.raise_for_status()
params = r.json()
params['expires'] = time.time() + params.pop("expires_in", 1)
params['oauth_domain'] = p.oauth_domain
params['client_id'] = p.client_id
return XeTokenRequest.instance(**params)
import time
import param
import httpx
from .settings import config
from .token import XeToken
from .oauth import XeAuthStep
class TokenRefresh(XeAuthStep):
client_id = param.String(config.DEFAULT_CLIENT_ID)
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_token_path = param.String(config.OAUTH_TOKEN_PATH)
access_token = param.String(readonly=True)
id_token = param.String(readonly=True)
refresh_token = param.String(readonly=True)
def perform(self, p):
with httpx.Client(base_url=self.oauth_domain, headers=p.headers) as client:
r = client.post(
p.oauth_token_path,
headers={"content-type":"application/x-www-form-urlencoded"},
data={
"grant_type": "refresh_token",
"refresh_token": p.refresh_token,
"client_id": p.client_id,
}
)
r.raise_for_status()
params = r.json()
params["expires"] = time.time() + params.pop("expires_in", 1e6)
params["client_id"] = p.client_id
params['oauth_domain'] = p.oauth_domain
params['oauth_token_path'] = p.oauth_token_path
return XeToken(**params)
+1
-1
Metadata-Version: 2.1
Name: xeauth
Version: 0.1.19
Version: 0.1.20
Summary: Top-level package for xeauth.

@@ -5,0 +5,0 @@ Home-page: https://github.com/jmosbacher/xeauth

[tool]
[tool.poetry]
name = "xeauth"
version = "0.1.19"
version = "0.1.20"
homepage = "https://github.com/jmosbacher/xeauth"

@@ -6,0 +6,0 @@ description = "Top-level package for xeauth."

@@ -24,3 +24,3 @@ # -*- coding: utf-8 -*-

'name': 'xeauth',
'version': '0.1.19',
'version': '0.1.20',
'description': 'Top-level package for xeauth.',

@@ -27,0 +27,0 @@ 'long_description': '======\nxeauth\n======\n\n\n.. image:: https://img.shields.io/pypi/v/xeauth.svg\n :target: https://pypi.python.org/pypi/xeauth\n\n.. image:: https://img.shields.io/travis/jmosbacher/xeauth.svg\n :target: https://travis-ci.com/jmosbacher/xeauth\n\n.. image:: https://readthedocs.org/projects/xeauth/badge/?version=latest\n :target: https://xeauth.readthedocs.io/en/latest/?badge=latest\n :alt: Documentation Status\n\n\n\n\nAuthentication client for the Xenon edark matter experiment.\n\n\n* Free software: MIT\n* Documentation: https://xeauth.readthedocs.io.\n\n\nFeatures\n--------\n\n* TODO\n\nCredits\n-------\n\nThis package was created with Cookiecutter_ and the `briggySmalls/cookiecutter-pypackage`_ project template.\n\n.. _Cookiecutter: https://github.com/audreyr/cookiecutter\n.. _`briggySmalls/cookiecutter-pypackage`: https://github.com/briggySmalls/cookiecutter-pypackage\n',

@@ -8,2 +8,2 @@ """Top-level package for xeauth."""

__email__ = 'joe.mosbacher@gmail.com'
__version__ = '0.1.19'
__version__ = '0.1.20'
from warnings import warn
import param
import panel as pn
from ..oauth import NotebookSession
from ..session import NotebookSession
try:

@@ -8,0 +7,0 @@ from eve_panel.auth import EveAuthBase

@@ -1,12 +0,6 @@

import os
import io
import sys
import time
import param
import httpx
from .settings import config
from .token import XeToken
class XeAuthStep(param.ParameterizedFunction):

@@ -22,9 +16,11 @@ auto_advance = param.Boolean(True)

def prompt(self, p):
pass
return p
def __call__(self, **params):
p = param.ParamOverrides(self, params)
prompt_response = self.prompt(p)
p['prompt_response'] = prompt_response
p = self.prompt(p)
next = self.perform(p)
if isinstance(next, XeAuthStep) and p.auto_advance:

@@ -35,155 +31,1 @@ params = {k:v for k,v in params.items() if k in next.param.params()}

class XeTokenRequest(XeAuthStep):
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_token_path = param.String(config.OAUTH_TOKEN_PATH)
user_code = param.String()
device_code = param.String()
client_id = param.String()
headers = param.Dict()
verification_uri = param.String()
verification_uri_complete = param.String()
expires = param.Number()
interval = param.Number(5)
open_browser = param.Boolean(True)
def prompt(self, p):
print(f'Please visit the following URL to complete '
f'the login: {self.verification_uri_complete}', file=p.console)
if p.open_browser:
import webbrowser
webbrowser.open(self.verification_uri_complete)
def perform(self, p):
while True:
if time.time()>p.expires:
raise TimeoutError("Device code hase expired but not yet authorized.")
try:
s = self.fetch_token(p.oauth_domain, p.oauth_token_path,
p.device_code, p.client_id, headers=p.headers)
return s
except Exception as e:
time.sleep(p.interval)
def fetch_token(self, oauth_domain, oauth_token_path, device_code, client_id, headers={}):
with httpx.Client(base_url=oauth_domain, headers=headers) as client:
r = client.post(
oauth_token_path,
data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code,
"client_id": client_id,
},
headers={"content-type": "application/x-www-form-urlencoded"},
)
r.raise_for_status()
params = r.json()
params["expires"] = time.time() + params.pop("expires_in", 1e6)
params["client_id"] = self.client_id
params['oauth_domain'] = oauth_domain
params['oauth_token_path'] = oauth_token_path
return XeToken(**params)
class XeAuthCodeRequest(XeAuthStep):
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_code_path = param.String(config.OAUTH_CODE_PATH)
client_id = param.String(config.DEFAULT_CLIENT_ID)
scopes = param.List(config.DEFAULT_SCOPE.split(' '))
audience = param.String(config.DEFAULT_AUDIENCE)
extra_fields = param.Dict({})
headers = param.Dict({})
@property
def scope_str(self):
return ' '.join(self.scopes)
def prompt(self, p):
pass
def perform(self, p):
data = {
"client_id": p.client_id,
"scope": ' '.join(p.scopes),
"audience": p.audience,
}
data.update(p.extra_fields)
with httpx.Client(base_url=p.oauth_domain, headers=p.headers) as client:
r = client.post(
p.oauth_code_path,
data=data,
headers={"content-type": "application/x-www-form-urlencoded"})
r.raise_for_status()
params = r.json()
params['expires'] = time.time() + params.pop("expires_in", 1)
params['oauth_domain'] = p.oauth_domain
params['client_id'] = p.client_id
return XeTokenRequest.instance(**params)
class TokenRefresh(XeAuthStep):
client_id = param.String(config.DEFAULT_CLIENT_ID)
oauth_domain = param.String(config.OAUTH_DOMAIN)
oauth_token_path = param.String(config.OAUTH_TOKEN_PATH)
access_token = param.String(readonly=True)
id_token = param.String(readonly=True)
refresh_token = param.String(readonly=True)
def perform(self, p):
with httpx.Client(base_url=self.oauth_domain, headers=p.headers) as client:
r = client.post(
p.oauth_token_path,
headers={"content-type":"application/x-www-form-urlencoded"},
data={
"grant_type": "refresh_token",
"refresh_token": p.refresh_token,
"client_id": p.client_id,
}
)
r.raise_for_status()
params = r.json()
params["expires"] = time.time() + params.pop("expires_in", 1e6)
params["client_id"] = p.client_id
params['oauth_domain'] = p.oauth_domain
params['oauth_token_path'] = p.oauth_token_path
return XeToken(**params)
class UserCredentialsAuth(XeAuthStep):
username = param.String()
password = param.String()
auth_url = param.String(config.OAUTH_DOMAIN.rstrip('/')+'/token')
audience = param.String(config.DEFAULT_AUDIENCE)
scopes = param.List(config.DEFAULT_SCOPE.split(' '))
client_id = param.String(config.DEFAULT_CLIENT_ID)
headers = param.Dict({'content-type': 'application/x-www-form-urlencoded'})
def perform(self, p):
data = dict(
grant_type='password',
username=p.username,
password=p.password,
audience=p.audience,
scope=' '.join(p.scope),
client_id=p.client_id,
)
r = httpx.post(p.auth_url, data=data, headers=p.headers)
r.raise_for_status()
kwargs = r.json()
kwargs['expires'] = time.time() + kwargs.pop('expires_in')
return XeToken(client_id=p.client_id, **kwargs)

@@ -7,3 +7,2 @@ import os

import logging
import panel as pn

@@ -73,3 +72,3 @@ from datetime import datetime

else:
self.token = self.flow.perform(self.oauth_domain, self.oauth_code_path, self.oauth_token_path,
self.token = self.flow(self.oauth_domain, self.oauth_code_path, self.oauth_token_path,
self.client_id, self.scope, self.audience, headers=extra_headers,

@@ -187,2 +186,3 @@ extra_fields=extra_fields,

def gui(self):
import panel as pn
if self._gui is None:

@@ -198,2 +198,3 @@ self._gui = pn.panel(self._make_gui)

try:
import panel as pn
self.request_token()

@@ -209,2 +210,3 @@ logger.info("Sent request...")

def logged_in_gui(self):
import panel as pn
profile = self.profile

@@ -211,0 +213,0 @@ details = pn.Row(

@@ -11,3 +11,2 @@ import param

class XeToken(param.Parameterized):

@@ -14,0 +13,0 @@ client_id = param.String(config.DEFAULT_CLIENT_ID)

@@ -1,33 +0,40 @@

# import param
# import httpx
# import time
# from .settings import config
# from .token import XeToken
import param
import httpx
import time
import getpass
from .oauth import XeAuthStep
from .token import XeToken
from .settings import config
# class UserCredentialsAuth(param.Parameterized):
# AUTH_URL = param.String(config.OAUTH_DOMAIN.rstrip('/')+'/token')
# audience = param.String(config.DEFAULT_AUDIENCE)
# scope = param.String(config.DEFAULT_SCOPE)
# client_id = param.String(config.DEFAULT_CLIENT_ID)
# headers = param.Dict({'content-type': 'application/x-www-form-urlencoded'})
class UserCredentialsAuth(XeAuthStep):
username = param.String(default=None)
password = param.String(default=None)
# def login(self, username, password, audience=None, scope=None):
# if scope is None:
# scope = self.scope
# if audience is None:
# audience = self.audience
auth_url = param.String(config.OAUTH_DOMAIN.rstrip('/')+'/token')
audience = param.String(config.DEFAULT_AUDIENCE)
scopes = param.List(config.DEFAULT_SCOPE.split(' '))
client_id = param.String(config.DEFAULT_CLIENT_ID)
headers = param.Dict({'content-type': 'application/x-www-form-urlencoded'})
# data = dict(
# grant_type='password',
# username=username,
# password=password,
# audience=audience,
# scope=scope,
# client_id=self.client_id,
# )
# r = httpx.post(self.AUTH_URL, data=data, headers=self.headers)
# r.raise_for_status()
# kwargs = r.json()
# kwargs['expires'] = time.time() + kwargs.pop('expires_in')
# return XeToken(client_id=self.client_id, **kwargs)
def prompt(self, p):
if p.username is None:
p.username = getpass.getuser()
if p.password is None:
p.password = getpass.getpass()
return p
def perform(self, p):
data = dict(
grant_type='password',
username=p.username,
password=p.password,
audience=p.audience,
scope=' '.join(p.scopes),
client_id=p.client_id,
)
r = httpx.post(p.auth_url, data=data, headers=p.headers)
r.raise_for_status()
kwargs = r.json()
kwargs['expires'] = time.time() + kwargs.pop('expires_in')
return XeToken(client_id=p.client_id, **kwargs)
import os
import panel as pn
import getpass
from .settings import config
# from .oauth import XeAuthSession, NotebookSession, UserCredentialsAuth
# from .user_credentials import UserCredentialsAuth
from .oauth import UserCredentialsAuth, XeAuthCodeRequest
from .user_credentials import UserCredentialsAuth
from .device_auth_flow import XeAuthCodeRequest
from .certificates import certs

@@ -16,5 +13,15 @@

login = XeAuthCodeRequest.instance(auto_advance=True)
device_login = XeAuthCodeRequest.instance(auto_advance=True)
def login(username=None, password=None, **kwargs):
if username is None:
return device_login(**kwargs)
return user_login(username=username,
password=password,
**kwargs)
def cli_login(**kwargs):

@@ -44,1 +51,2 @@ token = login(**kwargs)

return login(audience=audience, scopes=scopes, **kwargs)