flops-utils
Advanced tools
| import os | ||
| import sys | ||
| from flops_utils.logging import logger | ||
| DOCKER_HOST_IP_LINUX = "172.17.0.1" | ||
| _ERROR_MESSAGE = ( | ||
| "Terminating. Make sure to set the environment variables first. Missing: " | ||
| ) | ||
| def get_env_var(name: str, default: str = "") -> str: | ||
| env_var = os.environ.get(name) or default | ||
| if env_var is None: | ||
| logger.fatal(f"{_ERROR_MESSAGE}'{name}'") | ||
| sys.exit(1) | ||
| return env_var |
| # Used to abstract away concrete uses of MLflow model flavors. | ||
| # Based on the provided flavor from the FLOps SLA the specific MLflow model flavor will be used. | ||
| import os | ||
| import sys | ||
| from flops_utils.logging import logger | ||
| from flops_utils.types import MLModelFlavor | ||
| def get_ml_model_flavor(): | ||
| match MLModelFlavor(os.environ.get("ML_MODEL_FLAVOR")): | ||
| case MLModelFlavor.KERAS: | ||
| import mlflow.keras # type: ignore | ||
| return mlflow.keras | ||
| case MLModelFlavor.PYTORCH: | ||
| import mlflow.pytorch # type: ignore | ||
| return mlflow.pytorch | ||
| case MLModelFlavor.SKLEARN: | ||
| import mlflow.sklearn # type: ignore | ||
| return mlflow.sklearn | ||
| case _: | ||
| logger.exception("Provided MLModelFlavor is not supported yet.") | ||
| sys.exit(1) |
| from .flops_learner_files_proxy import load_dataset |
| # Note: This proxy is used to provide ML repo developers/users with stub FLOps Learner components. | ||
| # E.g. The ML repo developer does not have access to any data of the worker nodes yet. | ||
| # This data will be fetched by the Learner's data_loading from the Data Manager Sidecar. | ||
| # This data_loading is part of the Learner image and should be abstracted away from the ML repo. | ||
| # To be able to include the data_loading methods in the ML repo code these mocks are provided. | ||
| # These mocks will be replaced with the real implementation during the FLOps image build process. | ||
| import sys | ||
| import datasets # type: ignore | ||
| from flops_utils.logging import logger | ||
| def load_dataset() -> datasets.Dataset: | ||
| """Loads the data from the co-located ml-data-server from the learner node. | ||
| Returns a single dataset that encompasses all matching found data from the server. | ||
| This dataset is in "Arrow" format. | ||
| """ | ||
| try: | ||
| from data_loading import load_data_from_ml_data_server # type: ignore | ||
| return load_data_from_ml_data_server() | ||
| except ImportError: | ||
| logger.exception("The data_loading file was not found.") | ||
| sys.exit(1) |
| # Note: This proxy is used to handle ml repo files | ||
| # which get injected during the image build. | ||
| # I.e. these files are not yet present. | ||
| # Additionally it handles exceptions and helps linters, etc to work normally. | ||
| import sys | ||
| from flops_utils.logging import logger | ||
| def get_model_manager(): | ||
| try: | ||
| from model_manager import ModelManager # type: ignore | ||
| return ModelManager() | ||
| except ImportError: | ||
| logger.exception("A ML repo file was not found.") | ||
| sys.exit(1) |
| import dataclasses | ||
| from flops_utils.types import CustomEnum | ||
| class Target(CustomEnum): | ||
| FLOPS_MANAGER = "flops_manager" | ||
| PROJECT_OBSERVER = "project_observer" | ||
| class Subject(CustomEnum): | ||
| PROJECT_OBSERVER = "project_observer" | ||
| FL_ACTORS_IMAGE_BUILDER = "fl_actors_image_builder" | ||
| TRAINED_MODEL_IMAGE_BUILDER = "trained_model_image_builder" | ||
| AGGREGATOR = "aggregator" | ||
| LEARNER = "learner" | ||
| class Status(CustomEnum): | ||
| FAILED = "failed" | ||
| SUCCESS = "success" | ||
| @dataclasses.dataclass | ||
| class Topic: | ||
| subject: Subject | ||
| status: Status | ||
| target: Target = Target.FLOPS_MANAGER | ||
| def __str__(self) -> str: | ||
| return f"{self.target}/{self.subject}/{self.status}" | ||
| def find_matching_supported_topic(self) -> "SupportedTopic": | ||
| for topic in SupportedTopic: | ||
| if str(self) == topic.value: | ||
| return topic | ||
| raise ValueError(f"'{str(self)}' has no matching supported topic.") | ||
| class SupportedTopic(CustomEnum): | ||
| PROJECT_OBSERVER_FAILED = str( | ||
| Topic(subject=Subject.PROJECT_OBSERVER, status=Status.FAILED) | ||
| ) | ||
| FL_ACTORS_IMAGE_BUILDER_SUCCESS = str( | ||
| Topic(subject=Subject.FL_ACTORS_IMAGE_BUILDER, status=Status.SUCCESS) | ||
| ) | ||
| FL_ACTORS_IMAGE_BUILDER_FAILED = str( | ||
| Topic(subject=Subject.FL_ACTORS_IMAGE_BUILDER, status=Status.FAILED) | ||
| ) | ||
| TRAINED_MODEL_IMAGE_BUILDER_SUCCESS = str( | ||
| Topic(subject=Subject.TRAINED_MODEL_IMAGE_BUILDER, status=Status.SUCCESS) | ||
| ) | ||
| TRAINED_MODEL_IMAGE_BUILDER_FAILED = str( | ||
| Topic(subject=Subject.TRAINED_MODEL_IMAGE_BUILDER, status=Status.FAILED) | ||
| ) | ||
| AGGREGATOR_SUCCESS = str(Topic(subject=Subject.AGGREGATOR, status=Status.SUCCESS)) | ||
| AGGREGATOR_FAILED = str(Topic(subject=Subject.AGGREGATOR, status=Status.FAILED)) | ||
| LEARNER_FAILED = str(Topic(subject=Subject.LEARNER, status=Status.FAILED)) |
| import shlex | ||
| import subprocess | ||
| def run_in_shell( | ||
| shell_cmd: str, | ||
| capture_output=True, | ||
| check=True, | ||
| text=False, | ||
| ) -> subprocess.CompletedProcess[bytes]: | ||
| return subprocess.run( | ||
| shlex.split(shell_cmd), capture_output=capture_output, check=check, text=text | ||
| ) |
@@ -35,3 +35,3 @@ # Reference: | ||
| def format(self, record): | ||
| log_fmt = self.FORMATS.get(record.levelno) | ||
| log_fmt = self.FORMATS.get(record.levelno) # type: ignore | ||
| formatter = logging.Formatter(log_fmt) | ||
@@ -38,0 +38,0 @@ return formatter.format(record) |
@@ -6,3 +6,5 @@ import json | ||
| import paho.mqtt.client as paho_mqtt | ||
| from flops_utils.logging import logger | ||
| from flops_utils.mqtt_topics import SupportedTopic | ||
@@ -23,5 +25,5 @@ | ||
| mqtt_ip: str, | ||
| topic: str, | ||
| topic: SupportedTopic, | ||
| msg_payload: dict = {}, | ||
| error_msg: str = None, | ||
| error_msg: str = "", | ||
| mqtt_port: int = 9027, | ||
@@ -34,9 +36,9 @@ ) -> None: | ||
| "flops_project_id": flops_project_id, | ||
| **({"error_msg": error_msg} if error_msg is not None else {}), | ||
| **({"error_msg": error_msg} if error_msg else {}), | ||
| } | ||
| ) | ||
| mqtt_client = paho_mqtt.Client(paho_mqtt.CallbackAPIVersion.VERSION1) | ||
| mqtt_client = paho_mqtt.Client(paho_mqtt.CallbackAPIVersion.VERSION1) # type: ignore | ||
| mqtt_client.connect(mqtt_ip, mqtt_port) | ||
| mqtt_client.publish( | ||
| topic=topic, | ||
| topic=topic.value, | ||
| payload=json.dumps(payload), | ||
@@ -43,0 +45,0 @@ # Note: qos=2 should be used, however qos=2 does not work for some reason. |
| from dataclasses import dataclass, field | ||
| from datetime import datetime | ||
| from typing import Dict, Union | ||
| from datetime import datetime, timedelta | ||
| from typing import Dict, Optional, Union | ||
@@ -10,3 +10,3 @@ | ||
| human_readable: bool = False, | ||
| ) -> Union[float, str]: | ||
| ) -> Union[timedelta, str]: | ||
| duration = end - start | ||
@@ -28,3 +28,3 @@ if not human_readable: | ||
| start_time: datetime = field(default_factory=datetime.now, init=False) | ||
| end_time: datetime = field(default=None, init=False) | ||
| end_time: Optional[datetime] = field(default=None, init=False) | ||
@@ -35,3 +35,3 @@ def end_time_frame(self) -> datetime: | ||
| def get_duration(self, human_readable: bool = False) -> Union[float, str]: | ||
| def get_duration(self, human_readable: bool = False) -> Union[timedelta, str]: | ||
| return _get_duration( | ||
@@ -75,3 +75,3 @@ start=self.start_time, | ||
| human_readable: bool = False, | ||
| ) -> Union[float, str]: | ||
| ) -> Union[timedelta, str]: | ||
| return _get_duration( | ||
@@ -78,0 +78,0 @@ start=self.time_stamps[timestamp_a_name], |
+19
-2
@@ -1,8 +0,13 @@ | ||
| from enum import Enum | ||
| import enum | ||
| class CustomEnum(enum.Enum): | ||
| def __str__(self) -> str: | ||
| return self.value | ||
| # These flavors are a subset of MLflow's model flavors. | ||
| # They are used to decide which MLflow model flavor to use. | ||
| # https://mlflow.org/docs/latest/models.html#built-in-model-flavors | ||
| class MLModelFlavor(str, Enum): | ||
| class MLModelFlavor(str, enum.Enum): | ||
| SKLEARN = "sklearn" # Scikit-learn | ||
@@ -13,1 +18,13 @@ PYTORCH = "pytorch" | ||
| # This list can be further expanded. | ||
| class FLOpsMode(str, enum.Enum): | ||
| CLASSIC = "classic" | ||
| HIERARCHICAL = "hierarchical" | ||
| class AggregatorType(enum.Enum): | ||
| CLASSIC_AGGREGATOR = "CLASSIC_AGGREGATOR" | ||
| ROOT_AGGREGATOR = "ROOT_AGGREGATOR" | ||
| CLUSTER_AGGREGATOR = "CLUSTER_AGGREGATOR" |
+5
-2
| Metadata-Version: 2.1 | ||
| Name: flops_utils | ||
| Version: 0.4.7 | ||
| Version: 0.5.1 | ||
| Summary: A library containing commong utilities for FLOps | ||
@@ -15,3 +15,6 @@ Author: Alexander Malyuk | ||
| TODO | ||
| # TODO move to the mono FLOps repo once it is public ! | ||
| # tmp_flops_utils | ||
| Will be moved to the Oakestra FLOps Repo once it becomes public | ||
+4
-4
| [tool.poetry] | ||
| name = "flops_utils" | ||
| version = "0.4.7" | ||
| version = "0.5.1" | ||
| description = "A library containing commong utilities for FLOps" | ||
@@ -18,6 +18,6 @@ authors = ["Alexander Malyuk <malyuk.alexander1999@gmail.com>"] | ||
| packages = ["flops_utils/*"] | ||
| #packages = ["flops_utils/*"] | ||
| line-length = 100 | ||
| [tool.black] | ||
| #line-length = 100 | ||
| #[tool.black] |
+4
-1
@@ -1,1 +0,4 @@ | ||
| TODO | ||
| # TODO move to the mono FLOps repo once it is public ! | ||
| # tmp_flops_utils | ||
| Will be moved to the Oakestra FLOps Repo once it becomes public |
| # Note: This wrapper is used to provide ML repo developers/users with mock FLOps Learner components. | ||
| # E.g. The ML repo developer does not have access to any data of the worker nodes yet. | ||
| # This data will be fetched by the Learner's data_loading from the Data Manager Sidecar. | ||
| # This data_loading is part of the Learner image and should be abstracted away from the ML repo. | ||
| # To be able to include the data_loading methods in the ML repo code these mocks are provided. | ||
| # These mocks will be replaced with the real implementation during the FLOps image build process. | ||
| import sys | ||
| from flops_utils.logging import logger | ||
| def load_ml_data(): | ||
| try: | ||
| from data_loading import load_data_from_worker_node # type: ignore | ||
| return load_data_from_worker_node() | ||
| except ImportError: | ||
| logger.exception("The data_loading file was not found.") | ||
| sys.exit(1) |
| # Used to abstract away concrete uses of MLflow model flavors. | ||
| # Based on the provided flavor from the FLOps SLA the specific MLflow model flavor will be used. | ||
| import os | ||
| import sys | ||
| from flops_utils.logging import logger | ||
| from flops_utils.types import MLModelFlavor | ||
| def get_ml_model_flavor(): | ||
| match MLModelFlavor(os.environ.get("ML_MODEL_FLAVOR")): | ||
| case MLModelFlavor.KERAS: | ||
| import mlflow.keras | ||
| return mlflow.keras | ||
| case MLModelFlavor.PYTORCH: | ||
| import mlflow.pytorch | ||
| return mlflow.pytorch | ||
| case MLModelFlavor.SKLEARN: | ||
| import mlflow.sklearn | ||
| return mlflow.sklearn | ||
| case _: | ||
| logger.exception("Provided MLModelFlavor is not supported yet.") | ||
| sys.exit(1) |
| # Note: This wrapper is used to handle ml repo files | ||
| # which get injected during the image build. | ||
| # I.e. these files are not yet present. | ||
| # Additionally it handles exceptions and helps linters, etc to work normally. | ||
| import sys | ||
| from flops_utils.logging import logger | ||
| def get_model_manager(): | ||
| try: | ||
| from model_manager import ModelManager # type: ignore | ||
| return ModelManager() | ||
| except ImportError: | ||
| logger.exception("A ML repo file was not found.") | ||
| sys.exit(1) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
15931
30.08%15
36.36%368
30.5%