PyHackTheBox
Advanced tools
+94
-44
@@ -60,4 +60,5 @@ from __future__ import annotations | ||
| _user: Optional["User"] = None | ||
| _access_token: str | ||
| _refresh_token: str | ||
| _access_token: Optional[str] | ||
| _refresh_token: Optional[str] | ||
| _app_token: Optional[str] | ||
| _api_base: str | ||
@@ -102,5 +103,10 @@ challenge_cooldown: int = 0 | ||
| # i.e. we're running a test | ||
| if jwt_expired(self._access_token): | ||
| self._refresh_access_token() | ||
| headers['Authorization'] = "Bearer " + self._access_token | ||
| if self._app_token is not None: | ||
| headers['Authorization'] = "Bearer " + self._app_token | ||
| elif self._access_token is not None and self._refresh_token is not None: | ||
| if jwt_expired(self._access_token): | ||
| self._refresh_access_token() | ||
| headers['Authorization'] = "Bearer " + self._access_token | ||
| else: | ||
| raise AuthenticationException("No authentication tokens available") | ||
| while True: | ||
@@ -129,3 +135,4 @@ if not json_data and not data: | ||
| def __init__(self, email: Optional[str] = None, password: Optional[str] = None, otp: Optional[str | int] = None, | ||
| cache: Optional[str] = None, api_base: str = API_BASE, remember: Optional[bool] = False): | ||
| cache: Optional[str] = None, api_base: str = API_BASE, remember: Optional[bool] = False, | ||
| app_token: Optional[str] = None): | ||
| """ | ||
@@ -144,2 +151,3 @@ Authenticates to the API. | ||
| remember: Whether to create a long-lasting 'remember me' token | ||
| app_token: Authenticate using a provided App Token | ||
| """ | ||
@@ -149,3 +157,3 @@ self._api_base = api_base | ||
| if self.load_from_cache(cache) is False: | ||
| self.do_login(email, password, otp, remember) | ||
| self.do_login(email, password, otp, remember, app_token) | ||
| self.dump_to_cache(cache) | ||
@@ -155,3 +163,3 @@ # Make sure we dump our current tokens out when we exit | ||
| else: | ||
| self.do_login(email, password, otp, remember) | ||
| self.do_login(email, password, otp, remember, app_token) | ||
@@ -169,10 +177,12 @@ def load_from_cache(self, cache: str) -> bool: | ||
| data = json.load(f) | ||
| self._access_token = data['access_token'] | ||
| self._refresh_token = data['refresh_token'] | ||
| if jwt_expired(self._access_token): | ||
| try: | ||
| self._refresh_access_token() | ||
| # Our refresh token is also invalid, we must log in again | ||
| except AuthenticationException: | ||
| return False | ||
| self._access_token = data.get('access_token') | ||
| self._refresh_token = data.get('refresh_token') | ||
| self._app_token = data.get('app_token') | ||
| if self._access_token is not None: | ||
| if jwt_expired(self._access_token): | ||
| try: | ||
| self._refresh_access_token() | ||
| # Our refresh token is also invalid, we must log in again | ||
| except AuthenticationException: | ||
| return False | ||
| return True | ||
@@ -189,38 +199,44 @@ | ||
| "access_token": self._access_token, | ||
| "refresh_token": self._refresh_token | ||
| "refresh_token": self._refresh_token, | ||
| "app_token": self._app_token | ||
| }, f) | ||
| def do_login(self, email: Optional[str] = None, password: Optional[str] = None, otp: Optional[str | int] = None, | ||
| remember: Optional[bool] = False): | ||
| remember: Optional[bool] = False, app_token: Optional[str] = None): | ||
| """ | ||
| Authenticates against the API. If credentials are not provided, they will be prompted for. | ||
| """ | ||
| if email is None: | ||
| email = input("Email: ") | ||
| if password is None: | ||
| password = getpass.getpass() | ||
| data = cast(dict, self.do_request("login", json_data={ | ||
| "email": email, "password": password, "remember": remember | ||
| }, authorized=False)) | ||
| msg = data['message'] | ||
| self._app_token = app_token | ||
| if app_token is not None: | ||
| self._access_token = self._refresh_token = None | ||
| else: | ||
| if email is None: | ||
| email = input("Email: ") | ||
| if password is None: | ||
| password = getpass.getpass() | ||
| self._access_token = msg.get('access_token') | ||
| if self._access_token is None: | ||
| raise ApiError(f"Failed to get access token: {msg}") | ||
| self._refresh_token = msg.get('refresh_token') | ||
| if self._refresh_token is None: | ||
| raise ApiError(f"Failed to get refresh token: {msg}") | ||
| if data['message']['is2FAEnabled'] is True: | ||
| if otp is None: | ||
| otp = input("OTP: ") | ||
| if type(otp) == int: | ||
| # Optimistically try and create a string | ||
| otp = f"{otp:06d}" | ||
| resp = cast(dict, self.do_request("2fa/login", json_data={ | ||
| "one_time_password": otp | ||
| })) | ||
| if "correct" not in resp['message']: | ||
| raise IncorrectOTPException | ||
| data = cast(dict, self.do_request("login", json_data={ | ||
| "email": email, "password": password, "remember": remember | ||
| }, authorized=False)) | ||
| msg = data['message'] | ||
| self._access_token = msg.get('access_token') | ||
| if self._access_token is None: | ||
| raise ApiError(f"Failed to get access token: {msg}") | ||
| self._refresh_token = msg.get('refresh_token') | ||
| if self._refresh_token is None: | ||
| raise ApiError(f"Failed to get refresh token: {msg}") | ||
| if data['message']['is2FAEnabled'] is True: | ||
| if otp is None: | ||
| otp = input("OTP: ") | ||
| if type(otp) == int: | ||
| # Optimistically try and create a string | ||
| otp = f"{otp:06d}" | ||
| resp = cast(dict, self.do_request("2fa/login", json_data={ | ||
| "one_time_password": otp | ||
| })) | ||
| if "correct" not in resp['message']: | ||
| raise IncorrectOTPException | ||
| # noinspection PyUnresolvedReferences | ||
@@ -254,2 +270,33 @@ def search(self, search_term: str) -> "Search": | ||
| # noinspection PyUnresolvedReferences | ||
| def get_todo_machines(self, limit: int = None) -> List[int]: | ||
| """ | ||
| Retrieve a list of `Machine` ID's from the API based on the users todo list | ||
| Args: | ||
| limit: The maximum number to fetch | ||
| Returns: A list of `Machine` ID's | ||
| """ | ||
| data = cast(dict, self.do_request("home/user/todo"))['data']['machines'][:limit] | ||
| return [m['id'] for m in data] | ||
| # noinspection PyUnresolvedReferences | ||
| def get_active_machine(self) -> Optional[Machine]: | ||
| """ | ||
| Retrieve `Machine` currently assigned to user | ||
| Returns: The `Machine` currently assigned (or active) to user | ||
| """ | ||
| from .machine import Machine | ||
| info = cast(dict, self.do_request(f"machine/active"))['info'] | ||
| if info: | ||
| return self.get_machine(info['id']) | ||
| return None | ||
| # noinspection PyUnresolvedReferences | ||
| def get_machines(self, limit: int = None, retired: bool = False) -> List["Machine"]: | ||
@@ -272,3 +319,6 @@ """ | ||
| data = cast(dict, self.do_request("machine/list/retired"))['info'][:limit] | ||
| return [Machine(m, self, summary=True) for m in data] | ||
| machines = [Machine(m, self, summary=True) for m in data] | ||
| for machine in machines: | ||
| machine.retired = retired | ||
| return machines | ||
@@ -275,0 +325,0 @@ # noinspection PyUnresolvedReferences |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: PyHackTheBox | ||
| Version: 0.5.4 | ||
| Version: 0.5.5 | ||
| Summary: A wrapper for the Hack The Box API. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/clubby789/htb-api |
| Metadata-Version: 2.1 | ||
| Name: PyHackTheBox | ||
| Version: 0.5.4 | ||
| Version: 0.5.5 | ||
| Summary: A wrapper for the Hack The Box API. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/clubby789/htb-api |
+1
-1
@@ -11,3 +11,3 @@ import setuptools | ||
| name="PyHackTheBox", | ||
| version="0.5.4", | ||
| version="0.5.5", | ||
| author="clubby789@github.com", | ||
@@ -14,0 +14,0 @@ author_email="clubby789@gmail.com", |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
76930
2.58%1751
2.28%