PyHackTheBox
Advanced tools
| API_BASE = "https://www.hackthebox.eu/api/v4/" | ||
| USER_AGENT = "htb-api/0.4.4" | ||
| USER_AGENT = "htb-api/0.5.0" | ||
| DOWNLOAD_COOLDOWN = 30 |
@@ -6,2 +6,6 @@ class HtbException(Exception): | ||
| class ApiError(HtbException): | ||
| """The API responded in an unexpected way""" | ||
| class AuthenticationException(HtbException): | ||
@@ -78,1 +82,6 @@ """An error authenticating to the API""" | ||
| super().__init__(message) | ||
| class CacheException(HtbException): | ||
| """There was an issue with the token cache""" | ||
| pass |
+98
-29
| from __future__ import annotations | ||
| import atexit | ||
| import base64 | ||
| import getpass | ||
| import json | ||
| import os | ||
| import time | ||
@@ -11,4 +14,4 @@ from typing import List, Callable, Union | ||
| from .constants import API_BASE, USER_AGENT | ||
| from .errors import AuthenticationException, MissingPasswordException, MissingEmailException, NotFoundException, \ | ||
| MissingOTPException, IncorrectOTPException | ||
| from .errors import AuthenticationException, NotFoundException, \ | ||
| IncorrectOTPException, ApiError | ||
@@ -61,9 +64,12 @@ | ||
| r = requests.post(self._api_base + "login/refresh", json={ | ||
| "refresh_token": self._refresh_token | ||
| }, headers=headers) | ||
| "refresh_token": self._refresh_token | ||
| }, headers=headers) | ||
| data = r.json()['message'] | ||
| if data == "Unauthenticated": | ||
| raise AuthenticationException | ||
| self._access_token = data['access_token'] | ||
| self._refresh_token = data['refresh_token'] | ||
| def do_request(self, endpoint, json_data=None, data=None, authorized=True, download=False, post=False) -> Union[dict, bytes]: | ||
| def do_request(self, endpoint, json_data=None, data=None, authorized=True, download=False, post=False) -> Union[ | ||
| dict, bytes]: | ||
| """ | ||
@@ -111,29 +117,92 @@ | ||
| def __init__(self, email: str = None, password: str = None, otp: str|int = None, api_base: str = API_BASE): | ||
| def __init__(self, email: str = None, password: str = None, otp: str | int = None, | ||
| cache: str = None, api_base: str = API_BASE): | ||
| """ | ||
| Authenticates to the API. | ||
| If `cache` is set, the client will attempt to load access tokens from the given path. If they cannot be found, | ||
| or are expired, normal API authentication will take place, and the tokens will be dumped to the file for the | ||
| next launch. | ||
| Args: | ||
| email: The authenticating user's email address | ||
| password: The authenticating user's password | ||
| otp: The current OTP of the user, if 2FA is enabled | ||
| cache: The path to load/store access tokens from/to | ||
| """ | ||
| self._api_base = api_base | ||
| if not password and not email: | ||
| print("Must give an authentication method") | ||
| raise AuthenticationException | ||
| elif password and not email: | ||
| raise MissingEmailException | ||
| elif email and not password: | ||
| raise MissingPasswordException | ||
| if cache is not None: | ||
| if self.load_from_cache(cache) is False: | ||
| self.do_login(email, password, otp) | ||
| self.dump_to_cache(cache) | ||
| # Make sure we dump our current tokens out when we exit | ||
| atexit.register(self.dump_to_cache, cache) | ||
| else: | ||
| data = self.do_request("login", json_data={ | ||
| "email": email, "password": password | ||
| }, authorized=False) | ||
| self._access_token = data['message']['access_token'] | ||
| self._refresh_token = data['message']['refresh_token'] | ||
| if data['message']['is2FAEnabled'] is True: | ||
| if otp is None: | ||
| raise MissingOTPException | ||
| if type(otp) == int: | ||
| # Optimistically try and create a string | ||
| otp = f"{otp:06d}" | ||
| resp = self.do_request("2fa/login", json_data={ | ||
| "one_time_password": otp | ||
| }) | ||
| if "correct" not in resp['message']: | ||
| raise IncorrectOTPException | ||
| self.do_login(email, password, otp) | ||
| def load_from_cache(self, cache: str) -> bool: | ||
| """ | ||
| Args: | ||
| cache: The cache file path | ||
| Returns: Whether loading from the cache was successful | ||
| """ | ||
| if not os.path.exists(cache): | ||
| return False | ||
| with open(cache, 'r') as f: | ||
| data = json.load(f) | ||
| self._access_token = data['access_token'] | ||
| self._refresh_token = data['refresh_token'] | ||
| if jwt_expired(self._access_token): | ||
| try: | ||
| self._refresh_access_token() | ||
| # Our refresh token is also invalid, we must log in again | ||
| except AuthenticationException: | ||
| return False | ||
| return True | ||
| def dump_to_cache(self, cache): | ||
| """ | ||
| Dumps the current access and refresh tokens to a file | ||
| Args: | ||
| cache: The path to the cache file | ||
| """ | ||
| with open(cache, 'w') as f: | ||
| json.dump({ | ||
| "access_token": self._access_token, | ||
| "refresh_token": self._refresh_token | ||
| }, f) | ||
| def do_login(self, email: str = None, password: str = None, otp: str | int = None): | ||
| """ | ||
| Authenticates against the API. If credentials are not provided, they will be prompted for. | ||
| """ | ||
| if email is None: | ||
| email = input("Email: ") | ||
| if password is None: | ||
| password = getpass.getpass() | ||
| data = self.do_request("login", json_data={ | ||
| "email": email, "password": password | ||
| }, authorized=False) | ||
| msg = data['message'] | ||
| self._access_token = msg.get('access_token') | ||
| if self._access_token is None: | ||
| raise ApiError(f"Failed to get access token: {msg}") | ||
| self._refresh_token = msg.get('refresh_token') | ||
| if self._refresh_token is None: | ||
| raise ApiError(f"Failed to get refresh token: {msg}") | ||
| if data['message']['is2FAEnabled'] is True: | ||
| if otp is None: | ||
| otp = input("OTP: ") | ||
| if type(otp) == int: | ||
| # Optimistically try and create a string | ||
| otp = f"{otp:06d}" | ||
| resp = self.do_request("2fa/login", json_data={ | ||
| "one_time_password": otp | ||
| }) | ||
| if "correct" not in resp['message']: | ||
| raise IncorrectOTPException | ||
| # noinspection PyUnresolvedReferences | ||
@@ -140,0 +209,0 @@ def search(self, search_term: str) -> "Search": |
@@ -29,3 +29,3 @@ from datetime import datetime, timedelta | ||
| difficulty: The difficulty of the machine | ||
| ip: The IP address of the machine | ||
| :noindex: ip: The IP address of the machine | ||
@@ -160,3 +160,3 @@ active: Whether the Machine is active | ||
| data = self._client.do_request("vm/spawn", json_data={"machine_id": self.id}) | ||
| if "Machine deployed" in data.get("message"): | ||
| if "Machine deployed" in data.get("message") or "You have been assigned" in data.get("message"): | ||
| ip = self._client.do_request(f"machine/profile/{self.id}")["info"]["ip"] | ||
@@ -163,0 +163,0 @@ server = self._client.get_current_vpn_server() |
@@ -28,5 +28,10 @@ """ | ||
| friendly_name: Friendly name of the server | ||
| Example: ``'US Free 1'`` | ||
| current_clients: The number of currently connected clients | ||
| location: The physical location of the server | ||
| Example: ``'US'`` | ||
| """ | ||
@@ -51,2 +56,5 @@ | ||
| def __str__(self): | ||
| return f"{self.friendly_name}" | ||
| def switch(self) -> bool: | ||
@@ -53,0 +61,0 @@ """ |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: PyHackTheBox | ||
| Version: 0.4.4 | ||
| Version: 0.5.0 | ||
| Summary: A wrapper for the Hack The Box API. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/clubby789/htb-api |
| Metadata-Version: 2.1 | ||
| Name: PyHackTheBox | ||
| Version: 0.4.4 | ||
| Version: 0.5.0 | ||
| Summary: A wrapper for the Hack The Box API. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/clubby789/htb-api |
+1
-1
@@ -11,3 +11,3 @@ import setuptools | ||
| name="PyHackTheBox", | ||
| version="0.4.4", | ||
| version="0.5.0", | ||
| author="clubby789@github.com", | ||
@@ -14,0 +14,0 @@ author_email="clubby789@gmail.com", |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
73844
3.75%1685
4.33%