Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoSign in
Socket

flops-utils

Package Overview
Dependencies
Maintainers
1
Versions
38
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

flops-utils - pypi Package Compare versions

Comparing version
0.4.7
to
0.5.1
+18
flops_utils/env_vars.py
import os
import sys
from flops_utils.logging import logger
DOCKER_HOST_IP_LINUX = "172.17.0.1"
_ERROR_MESSAGE = (
"Terminating. Make sure to set the environment variables first. Missing: "
)
def get_env_var(name: str, default: str = "") -> str:
env_var = os.environ.get(name) or default
if env_var is None:
logger.fatal(f"{_ERROR_MESSAGE}'{name}'")
sys.exit(1)
return env_var
# Used to abstract away concrete uses of MLflow model flavors.
# Based on the provided flavor from the FLOps SLA the specific MLflow model flavor will be used.
import os
import sys
from flops_utils.logging import logger
from flops_utils.types import MLModelFlavor
def get_ml_model_flavor():
match MLModelFlavor(os.environ.get("ML_MODEL_FLAVOR")):
case MLModelFlavor.KERAS:
import mlflow.keras # type: ignore
return mlflow.keras
case MLModelFlavor.PYTORCH:
import mlflow.pytorch # type: ignore
return mlflow.pytorch
case MLModelFlavor.SKLEARN:
import mlflow.sklearn # type: ignore
return mlflow.sklearn
case _:
logger.exception("Provided MLModelFlavor is not supported yet.")
sys.exit(1)
from .flops_learner_files_proxy import load_dataset
# Note: This proxy is used to provide ML repo developers/users with stub FLOps Learner components.
# E.g. The ML repo developer does not have access to any data of the worker nodes yet.
# This data will be fetched by the Learner's data_loading from the Data Manager Sidecar.
# This data_loading is part of the Learner image and should be abstracted away from the ML repo.
# To be able to include the data_loading methods in the ML repo code these mocks are provided.
# These mocks will be replaced with the real implementation during the FLOps image build process.
import sys
import datasets # type: ignore
from flops_utils.logging import logger
def load_dataset() -> datasets.Dataset:
"""Loads the data from the co-located ml-data-server from the learner node.
Returns a single dataset that encompasses all matching found data from the server.
This dataset is in "Arrow" format.
"""
try:
from data_loading import load_data_from_ml_data_server # type: ignore
return load_data_from_ml_data_server()
except ImportError:
logger.exception("The data_loading file was not found.")
sys.exit(1)
# Note: This proxy is used to handle ml repo files
# which get injected during the image build.
# I.e. these files are not yet present.
# Additionally it handles exceptions and helps linters, etc to work normally.
import sys
from flops_utils.logging import logger
def get_model_manager():
try:
from model_manager import ModelManager # type: ignore
return ModelManager()
except ImportError:
logger.exception("A ML repo file was not found.")
sys.exit(1)
import dataclasses
from flops_utils.types import CustomEnum
class Target(CustomEnum):
FLOPS_MANAGER = "flops_manager"
PROJECT_OBSERVER = "project_observer"
class Subject(CustomEnum):
PROJECT_OBSERVER = "project_observer"
FL_ACTORS_IMAGE_BUILDER = "fl_actors_image_builder"
TRAINED_MODEL_IMAGE_BUILDER = "trained_model_image_builder"
AGGREGATOR = "aggregator"
LEARNER = "learner"
class Status(CustomEnum):
FAILED = "failed"
SUCCESS = "success"
@dataclasses.dataclass
class Topic:
subject: Subject
status: Status
target: Target = Target.FLOPS_MANAGER
def __str__(self) -> str:
return f"{self.target}/{self.subject}/{self.status}"
def find_matching_supported_topic(self) -> "SupportedTopic":
for topic in SupportedTopic:
if str(self) == topic.value:
return topic
raise ValueError(f"'{str(self)}' has no matching supported topic.")
class SupportedTopic(CustomEnum):
PROJECT_OBSERVER_FAILED = str(
Topic(subject=Subject.PROJECT_OBSERVER, status=Status.FAILED)
)
FL_ACTORS_IMAGE_BUILDER_SUCCESS = str(
Topic(subject=Subject.FL_ACTORS_IMAGE_BUILDER, status=Status.SUCCESS)
)
FL_ACTORS_IMAGE_BUILDER_FAILED = str(
Topic(subject=Subject.FL_ACTORS_IMAGE_BUILDER, status=Status.FAILED)
)
TRAINED_MODEL_IMAGE_BUILDER_SUCCESS = str(
Topic(subject=Subject.TRAINED_MODEL_IMAGE_BUILDER, status=Status.SUCCESS)
)
TRAINED_MODEL_IMAGE_BUILDER_FAILED = str(
Topic(subject=Subject.TRAINED_MODEL_IMAGE_BUILDER, status=Status.FAILED)
)
AGGREGATOR_SUCCESS = str(Topic(subject=Subject.AGGREGATOR, status=Status.SUCCESS))
AGGREGATOR_FAILED = str(Topic(subject=Subject.AGGREGATOR, status=Status.FAILED))
LEARNER_FAILED = str(Topic(subject=Subject.LEARNER, status=Status.FAILED))
import shlex
import subprocess
def run_in_shell(
shell_cmd: str,
capture_output=True,
check=True,
text=False,
) -> subprocess.CompletedProcess[bytes]:
return subprocess.run(
shlex.split(shell_cmd), capture_output=capture_output, check=check, text=text
)
+1
-1

@@ -35,3 +35,3 @@ # Reference:

def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
log_fmt = self.FORMATS.get(record.levelno) # type: ignore
formatter = logging.Formatter(log_fmt)

@@ -38,0 +38,0 @@ return formatter.format(record)

@@ -6,3 +6,5 @@ import json

import paho.mqtt.client as paho_mqtt
from flops_utils.logging import logger
from flops_utils.mqtt_topics import SupportedTopic

@@ -23,5 +25,5 @@

mqtt_ip: str,
topic: str,
topic: SupportedTopic,
msg_payload: dict = {},
error_msg: str = None,
error_msg: str = "",
mqtt_port: int = 9027,

@@ -34,9 +36,9 @@ ) -> None:

"flops_project_id": flops_project_id,
**({"error_msg": error_msg} if error_msg is not None else {}),
**({"error_msg": error_msg} if error_msg else {}),
}
)
mqtt_client = paho_mqtt.Client(paho_mqtt.CallbackAPIVersion.VERSION1)
mqtt_client = paho_mqtt.Client(paho_mqtt.CallbackAPIVersion.VERSION1) # type: ignore
mqtt_client.connect(mqtt_ip, mqtt_port)
mqtt_client.publish(
topic=topic,
topic=topic.value,
payload=json.dumps(payload),

@@ -43,0 +45,0 @@ # Note: qos=2 should be used, however qos=2 does not work for some reason.

from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Union
from datetime import datetime, timedelta
from typing import Dict, Optional, Union

@@ -10,3 +10,3 @@

human_readable: bool = False,
) -> Union[float, str]:
) -> Union[timedelta, str]:
duration = end - start

@@ -28,3 +28,3 @@ if not human_readable:

start_time: datetime = field(default_factory=datetime.now, init=False)
end_time: datetime = field(default=None, init=False)
end_time: Optional[datetime] = field(default=None, init=False)

@@ -35,3 +35,3 @@ def end_time_frame(self) -> datetime:

def get_duration(self, human_readable: bool = False) -> Union[float, str]:
def get_duration(self, human_readable: bool = False) -> Union[timedelta, str]:
return _get_duration(

@@ -75,3 +75,3 @@ start=self.start_time,

human_readable: bool = False,
) -> Union[float, str]:
) -> Union[timedelta, str]:
return _get_duration(

@@ -78,0 +78,0 @@ start=self.time_stamps[timestamp_a_name],

@@ -1,8 +0,13 @@

from enum import Enum
import enum
class CustomEnum(enum.Enum):
def __str__(self) -> str:
return self.value
# These flavors are a subset of MLflow's model flavors.
# They are used to decide which MLflow model flavor to use.
# https://mlflow.org/docs/latest/models.html#built-in-model-flavors
class MLModelFlavor(str, Enum):
class MLModelFlavor(str, enum.Enum):
SKLEARN = "sklearn" # Scikit-learn

@@ -13,1 +18,13 @@ PYTORCH = "pytorch"

# This list can be further expanded.
class FLOpsMode(str, enum.Enum):
CLASSIC = "classic"
HIERARCHICAL = "hierarchical"
class AggregatorType(enum.Enum):
CLASSIC_AGGREGATOR = "CLASSIC_AGGREGATOR"
ROOT_AGGREGATOR = "ROOT_AGGREGATOR"
CLUSTER_AGGREGATOR = "CLUSTER_AGGREGATOR"
Metadata-Version: 2.1
Name: flops_utils
Version: 0.4.7
Version: 0.5.1
Summary: A library containing commong utilities for FLOps

@@ -15,3 +15,6 @@ Author: Alexander Malyuk

TODO
# TODO move to the mono FLOps repo once it is public !
# tmp_flops_utils
Will be moved to the Oakestra FLOps Repo once it becomes public
[tool.poetry]
name = "flops_utils"
version = "0.4.7"
version = "0.5.1"
description = "A library containing commong utilities for FLOps"

@@ -18,6 +18,6 @@ authors = ["Alexander Malyuk <malyuk.alexander1999@gmail.com>"]

packages = ["flops_utils/*"]
#packages = ["flops_utils/*"]
line-length = 100
[tool.black]
#line-length = 100
#[tool.black]

@@ -1,1 +0,4 @@

TODO
# TODO move to the mono FLOps repo once it is public !
# tmp_flops_utils
Will be moved to the Oakestra FLOps Repo once it becomes public
# Note: This wrapper is used to provide ML repo developers/users with mock FLOps Learner components.
# E.g. The ML repo developer does not have access to any data of the worker nodes yet.
# This data will be fetched by the Learner's data_loading from the Data Manager Sidecar.
# This data_loading is part of the Learner image and should be abstracted away from the ML repo.
# To be able to include the data_loading methods in the ML repo code these mocks are provided.
# These mocks will be replaced with the real implementation during the FLOps image build process.
import sys
from flops_utils.logging import logger
def load_ml_data():
try:
from data_loading import load_data_from_worker_node # type: ignore
return load_data_from_worker_node()
except ImportError:
logger.exception("The data_loading file was not found.")
sys.exit(1)
# Used to abstract away concrete uses of MLflow model flavors.
# Based on the provided flavor from the FLOps SLA the specific MLflow model flavor will be used.
import os
import sys
from flops_utils.logging import logger
from flops_utils.types import MLModelFlavor
def get_ml_model_flavor():
match MLModelFlavor(os.environ.get("ML_MODEL_FLAVOR")):
case MLModelFlavor.KERAS:
import mlflow.keras
return mlflow.keras
case MLModelFlavor.PYTORCH:
import mlflow.pytorch
return mlflow.pytorch
case MLModelFlavor.SKLEARN:
import mlflow.sklearn
return mlflow.sklearn
case _:
logger.exception("Provided MLModelFlavor is not supported yet.")
sys.exit(1)
# Note: This wrapper is used to handle ml repo files
# which get injected during the image build.
# I.e. these files are not yet present.
# Additionally it handles exceptions and helps linters, etc to work normally.
import sys
from flops_utils.logging import logger
def get_model_manager():
try:
from model_manager import ModelManager # type: ignore
return ModelManager()
except ImportError:
logger.exception("A ML repo file was not found.")
sys.exit(1)