New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm. Details →
Socket
Book a Demo | Sign in
Socket

mlopscli

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mlopscli - pypi Package Compare versions

Comparing version
0.1.0
to
0.1.1
+3
mlopscli/constants.py
# mlopscli/constants.py — shared constant for artifact storage.
import time

# Root directory for all artifacts of the current run. The timestamp is
# evaluated once, at import time, so every step of a single CLI invocation
# shares the same run directory.
# NOTE(review): the '%H:%M:%S' portion embeds ':' characters, which are
# invalid in Windows file paths — confirm whether Windows support is needed.
ARTIFACTS_DIRECTORY = f"mlops_artifacts/run_{time.strftime('%Y-%m-%dT%H:%M:%S')}/"
+25
-2
# mlopscli/cli.py
import typer
from pathlib import Path
from mlopscli.runner import run_pipeline
from mlopscli.runner import run_pipeline, dry_run_pipeline
import shutil

@@ -19,6 +19,9 @@

),
observe: bool = typer.Option(
False, "--observe", help="Enable compute usage tracking per step"
),
):
"""Runs the jobs as a pipeline and renders DAG if requested."""
typer.echo(f"Running job: {job_name}")
run_pipeline(job_name, job_config, render_dag)
run_pipeline(job_name, job_config, render_dag, observe)

@@ -53,3 +56,23 @@

@app.command()
def dry_run(
    job_name: str = typer.Option(
        ..., "--job", help="Job name (e.g., prepare_train_pipeline)"
    ),
    job_config: Path = typer.Option(..., "--job_config", help="Path to job_order.yaml"),
):
    """Simulate running the jobs and show the execution flow without actually running them."""
    typer.echo(f"Simulating job: {job_name}")
    # All validation (script paths, requirements files, dependency names)
    # is delegated to the runner's dry-run implementation.
    dry_run_pipeline(job_name, job_config)
@app.command()
def dashboard():
    """Launch the MLOps dashboard to monitor submitted jobs."""
    # Local import: subprocess is only needed by this command.
    import subprocess

    # NOTE(review): "dashboard/dashboard.py" is resolved relative to the
    # current working directory — confirm this works for a pip-installed
    # package rather than only a source checkout.
    subprocess.run(["streamlit", "run", "dashboard/dashboard.py"])


if __name__ == "__main__":
    app()
+9
-12

@@ -5,16 +5,14 @@ # mlopscli/dag_visualizer.py

from pathlib import Path
from mlopscli.constants import ARTIFACTS_DIRECTORY
def visualize_dag(pipeline_name: str, steps: dict, output_path=None):
# Set default output path
if output_path is None:
output_path = f"mlops_artifacts/{pipeline_name}_dag.png"
output_path = f"{ARTIFACTS_DIRECTORY}/{pipeline_name}_dag.png"
# Ensure artifacts folder exists
artifacts_dir = Path(output_path).parent
artifacts_dir.mkdir(parents=True, exist_ok=True)
# Ensure output directory exists
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
G = nx.DiGraph()
# Add nodes and edges
for step_name, step_info in steps.items():

@@ -25,9 +23,6 @@ G.add_node(step_name)

# Simple linear topological sort to arrange nodes
ordered_steps = list(nx.topological_sort(G))
pos = {
step: (i, 0) for i, step in enumerate(ordered_steps)
} # x=i, y=0 for horizontal layout
# Use spring layout for a more natural-looking DAG
pos = nx.spring_layout(G, k=1.2, iterations=100, seed=42)
plt.figure(figsize=(2.5 * len(steps), 2.5))
plt.figure(figsize=(2.5 * len(steps), 3))
nx.draw(

@@ -42,2 +37,4 @@ G,

edge_color="gray",
arrowstyle="-|>",
arrowsize=20,
)

@@ -44,0 +41,0 @@

# mlopscli/pipeline.py
import subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from threading import Thread
import networkx as nx
import psutil, time, json
from mlopscli.constants import ARTIFACTS_DIRECTORY
def topological_sort(steps: dict):
    """Return the step dicts of ``steps`` in dependency order (Kahn's algorithm).

    Args:
        steps: mapping of step name -> step config dict; each config may list
            prerequisite step names under ``depends_on``.

    Returns:
        List of step config dicts ordered so every step follows its dependencies.

    Raises:
        ValueError: if the dependency graph contains a cycle.
    """
    from collections import defaultdict, deque

    graph = defaultdict(list)
    indegree = defaultdict(int)
    for name, step in steps.items():
        for dep in step.get("depends_on", []):
            graph[dep].append(name)
            indegree[name] += 1

    # Seed the queue with every step that has no unmet dependencies.
    queue = deque([name for name in steps if indegree[name] == 0])
    sorted_steps = []
    while queue:
        node = queue.popleft()
        sorted_steps.append(steps[node])
        for neighbor in graph[node]:
            indegree[neighbor] -= 1
            if indegree[neighbor] == 0:
                queue.append(neighbor)

    # A cycle leaves at least one step with positive indegree forever.
    if len(sorted_steps) != len(steps):
        raise ValueError("Cycle detected in job dependencies!")
    return sorted_steps


def write_metadata(run_dir, job_name, steps, timestamp):
    """Writes the metadata to a JSON file in the run directory."""
    metadata = {"job_name": job_name, "timestamp": timestamp, "steps": steps}
    metadata_path = run_dir / "metadata.json"
    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=4)


def monitor_process(proc, metrics):
    """Sample memory (MB) and CPU (%) of ``proc`` until it exits.

    Results are stored in ``metrics["memory"]`` and ``metrics["cpu"]``;
    intended to run in a background thread alongside ``proc``.
    """
    p = psutil.Process(proc.pid)
    mem_usage = []
    cpu_usage = []
    while proc.poll() is None:
        try:
            mem = p.memory_info().rss / (1024 * 1024)  # in MB
            cpu = p.cpu_percent(interval=0.2)
            mem_usage.append(mem)
            cpu_usage.append(cpu)
        except psutil.NoSuchProcess:
            # Process exited between poll() and sampling — stop quietly.
            break
    metrics["memory"] = mem_usage
    metrics["cpu"] = cpu_usage
def setup_virtualenv(step_name: str, requirements_path: str):

@@ -64,7 +67,23 @@ """Creates and sets up a virtual environment for the given step."""

def execute_scripts(steps_dict):
"""Execute the job steps in the correct order (topologically sorted)."""
sorted_steps = topological_sort(steps_dict)
def build_dependency_graph(steps_dict):
    """Build a directed dependency graph of the job steps.

    Each step becomes a node (its config stored as the ``info`` attribute);
    each ``depends_on`` entry becomes an edge dependency -> dependent, so a
    topological order of the graph is a valid execution order.
    """
    G = nx.DiGraph()
    for step_name, step_info in steps_dict.items():
        G.add_node(step_name, info=step_info)
        for dep in step_info.get("depends_on", []):
            G.add_edge(dep, step_name)
    return G
for step in sorted_steps:
def execute_scripts(job_name, steps_dict, max_workers=4, observe=False):
timestamp = time.strftime("%Y-%m-%dT%H:%M:%S")
G = build_dependency_graph(steps_dict)
completed = set()
running_futures = {}
run_dir = Path(ARTIFACTS_DIRECTORY)
run_dir.mkdir(parents=True, exist_ok=True)
steps_metadata = []
def run_step(step_name, observe):
start = time.time()
step = steps_dict[step_name]
name = step["name"]

@@ -75,4 +94,2 @@ script = step["script"]

print(f"\n๐Ÿ”ง Running step: {name} ({script})")
# Check if the script exists
script_path = Path(script)

@@ -82,17 +99,74 @@ if not script_path.exists():

# Set up the virtual environment for this step and get the python executable
python_exe = setup_virtualenv(name, requirements)
# Run the script in the virtual environment
result = subprocess.run(
[str(python_exe), str(script_path)], capture_output=True, text=True
)
if observe:
metrics = {}
result = subprocess.Popen([str(python_exe), str(script_path)])
monitor = Thread(target=monitor_process, args=(result, metrics))
monitor.start()
result.wait()
monitor.join()
else:
result = subprocess.run(
[str(python_exe), str(script_path)],
capture_output=True,
text=True,
)
# Handle the result
end = time.time()
duration = end - start
step_metadata = {
"name": name,
"status": "success" if result.returncode == 0 else "failed",
"duration": duration,
"script": str(script_path),
}
steps_metadata.append(step_metadata)
if result.returncode != 0:
print(f"โŒ Step '{name}' failed.")
print(result.stderr)
break
raise RuntimeError(f"โŒ Step '{name}' failed:\n{result.stderr}")
print(f"โœ… Step '{name}' completed in {duration:.2f}s")
if observe:
peak_mem = max(metrics["memory"]) if metrics["memory"] else 0
avg_cpu = sum(metrics["cpu"]) / len(metrics["cpu"]) if metrics["cpu"] else 0
print(f"๐Ÿง  Peak memory for step {name}: {peak_mem:.2f} MB")
print(f"โš™๏ธ Avg CPU for step {name}: {avg_cpu:.2f}%")
else:
print(f"โœ… Step '{name}' completed.")
print(result.stdout)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
while len(completed) < len(G.nodes):
# Get new ready steps (not completed, not already running)
ready_steps = [
node
for node in G.nodes
if node not in completed
and node not in running_futures.values()
and all(pred in completed for pred in G.predecessors(node))
]
# Submit new ready steps
for step_name in ready_steps:
future = executor.submit(run_step, step_name, observe)
running_futures[future] = step_name
if not running_futures:
raise RuntimeError("โš ๏ธ Deadlock detected or no runnable steps found.")
# Process completed futures
done, _ = wait(running_futures.keys(), return_when=FIRST_COMPLETED)
for future in done:
step_name = running_futures.pop(future)
try:
future.result()
completed.add(step_name)
except Exception as e:
print(f"โŒ Error in step '{step_name}': {e}")
executor.shutdown(wait=False)
return # Exit early on error
write_metadata(run_dir, job_name, steps_metadata, timestamp)

@@ -7,4 +7,69 @@ # mlopscli/runner.py

from pathlib import Path
def run_pipeline(job_name: str, job_config: Path, render_dag: bool = True):
def dry_run_pipeline(job_name: str, job_config: Path):
    """Simulate running the pipeline steps without actually executing them."""
    _, steps = load_job_config(job_config)
    print(f"\n🧪 Dry Run for Pipeline: {job_name}")
    print(f"📄 Pipeline Configuration: {job_config.resolve()}")
    step_names = {step.get("name") for step in steps.values()}
    missing_dependencies = []
    missing_scripts = []
    missing_requirements = []
    for step in steps.values():
        name = step.get("name")
        script = step.get("script")
        requirements = step.get("requirements", None)
        depends_on = step.get("depends_on", [])
        print(f"\n🔍 Step: {name}")
        print(f" 📜 Script: {script}")
        print(f" 📦 Requirements: {requirements or 'N/A'}")
        print(f" 🔗 Depends on: {', '.join(depends_on) if depends_on else 'None'}")
        # Validate script path
        if not Path(script).exists():
            missing_scripts.append((name, script))
        # Validate requirements path (if provided)
        if requirements and not Path(requirements).exists():
            missing_requirements.append((name, requirements))
        # Validate depends_on steps
        for dep in depends_on:
            if dep not in step_names:
                missing_dependencies.append((name, dep))
    if missing_scripts or missing_requirements or missing_dependencies:
        print("\n❌ Issues detected during dry run:")
        if missing_scripts:
            print("\n🚫 Missing Scripts:")
            for step, path in missing_scripts:
                print(f" - Step '{step}': script path not found -> {path}")
        if missing_requirements:
            print("\n🚫 Missing Requirements:")
            for step, path in missing_requirements:
                print(f" - Step '{step}': requirements path not found -> {path}")
        if missing_dependencies:
            print("\n🚫 Invalid Dependencies:")
            for step, dep in missing_dependencies:
                print(f" - Step '{step}' depends on undefined step '{dep}'")
        print("\n💥 Dry run failed due to the above errors.")
    else:
        print(
            "\n✅ Dry run completed successfully. All paths and dependencies look good!"
        )
def run_pipeline(
    job_name: str, job_config: Path, render_dag: bool = True, observe: bool = False
):
    """Load the job config and execute the pipeline steps.

    Args:
        job_name: name of the job to run (used for the DAG image and metadata).
        job_config: path to the job_order.yaml configuration file.
        render_dag: when True, render the pipeline DAG before execution.
        observe: when True, track per-step compute usage during execution.
    """
    pipeline_name, step_dict = load_job_config(job_config)
    print(f"Starting pipeline: {pipeline_name} under job {job_name}")
    if render_dag:
        # 0.1.1 names the rendered DAG after the job rather than the pipeline.
        visualize_dag(job_name, step_dict)
    execute_scripts(job_name=job_name, steps_dict=step_dict, observe=observe)
Metadata-Version: 2.3
Name: mlopscli
Version: 0.1.0
Version: 0.1.1
Summary: CLI to turn DS scripts to composable pipelines.

@@ -12,5 +12,9 @@ Author: Himanshu Bajpai

Classifier: Programming Language :: Python :: 3.13
Provides-Extra: dashboard
Requires-Dist: bumpversion (>=0.6.0)
Requires-Dist: matplotlib (>=3.10.1)
Requires-Dist: networkx (>=3.4.2)
Requires-Dist: psutil (>=6.0.0)
Requires-Dist: pyyaml (>=6.0.2)
Requires-Dist: streamlit (>=1.34.0) ; extra == "dashboard"
Requires-Dist: typer (>=0.15.2)

@@ -26,7 +30,5 @@ Description-Content-Type: text/markdown

1. Write your scripts (`data_prep.py`, `train_model.py`, `evaluate_model.py`)
2. Define them in a `job_order.yaml`
3. Run:
2. Define them in a `job_order.yaml` with the dependencies.
3. Install the mlops CLI: `pip install mlopscli`
4. Run the command: `mlopscli execute --job prepare_train_pipeline --job_config job_order.yaml`
```bash
python -m mlopscli.cli --job "prepare_train_pipeline" --job_config job_examples/job_order.yaml
[project]
name = "mlopscli"
version = "0.1.0"
version = "0.1.1"
description = "CLI to turn DS scripts to composable pipelines."

@@ -14,5 +14,12 @@ authors = [

"networkx>=3.4.2",
"matplotlib>=3.10.1"
"matplotlib>=3.10.1",
"bumpversion>=0.6.0",
"psutil>=6.0.0"
]
[project.optional-dependencies]
dashboard = [
"streamlit>=1.34.0",
]
[tool.poetry.scripts]

@@ -19,0 +26,0 @@ mlopscli = "mlopscli.cli:app"

@@ -8,6 +8,4 @@ # mlopscli ๐Ÿš€

1. Write your scripts (`data_prep.py`, `train_model.py`, `evaluate_model.py`)
2. Define them in a `job_order.yaml`
3. Run:
```bash
python -m mlopscli.cli --job "prepare_train_pipeline" --job_config job_examples/job_order.yaml
2. Define them in a `job_order.yaml` with the dependencies.
3. Install the mlops CLI: `pip install mlopscli`
4. Run the command: `mlopscli execute --job prepare_train_pipeline --job_config job_order.yaml`