mlopscli
Advanced tools
import time

# Per-run artifacts root. The timestamp is captured once at import time, so all
# steps of a single CLI invocation share the same run directory.
# Fix: use '-' instead of ':' as the time separator — colons are not legal in
# Windows file names, so the previous '%H:%M:%S' format produced unusable paths.
ARTIFACTS_DIRECTORY = f"mlops_artifacts/run_{time.strftime('%Y-%m-%dT%H-%M-%S')}/"
+25
-2
| # mlopscli/cli.py | ||
| import typer | ||
| from pathlib import Path | ||
| from mlopscli.runner import run_pipeline | ||
| from mlopscli.runner import run_pipeline, dry_run_pipeline | ||
| import shutil | ||
@@ -19,6 +19,9 @@ | ||
| ), | ||
| observe: bool = typer.Option( | ||
| False, "--observe", help="Enable compute usage tracking per step" | ||
| ), | ||
| ): | ||
| """Runs the jobs as a pipeline and renders DAG if requested.""" | ||
| typer.echo(f"Running job: {job_name}") | ||
| run_pipeline(job_name, job_config, render_dag) | ||
| run_pipeline(job_name, job_config, render_dag, observe) | ||
@@ -53,3 +56,23 @@ | ||
@app.command()
def dry_run(
    job_name: str = typer.Option(
        ..., "--job", help="Job name (e.g., prepare_train_pipeline)"
    ),
    job_config: Path = typer.Option(..., "--job_config", help="Path to job_order.yaml"),
):
    """Simulate running the jobs and show the execution flow without actually running them."""
    # NOTE: the docstring above doubles as the CLI help text shown by typer.
    typer.echo(f"Simulating job: {job_name}")
    # Delegates all validation/reporting to the runner module.
    dry_run_pipeline(job_name, job_config)
@app.command()
def dashboard():
    """Launch the MLOps dashboard to monitor submitted jobs."""
    # Imported lazily so the base CLI works without the optional 'dashboard' extra.
    import subprocess

    # NOTE(review): the script path is resolved relative to the current working
    # directory, so this only works when invoked from the repository root —
    # confirm whether it should be resolved relative to the installed package.
    try:
        subprocess.run(["streamlit", "run", "dashboard/dashboard.py"])
    except FileNotFoundError:
        # streamlit binary not on PATH; it ships in the optional extra.
        typer.echo(
            "streamlit is not installed. Install it with: pip install 'mlopscli[dashboard]'"
        )


if __name__ == "__main__":
    app()
@@ -5,16 +5,14 @@ # mlopscli/dag_visualizer.py | ||
| from pathlib import Path | ||
| from mlopscli.constants import ARTIFACTS_DIRECTORY | ||
| def visualize_dag(pipeline_name: str, steps: dict, output_path=None): | ||
| # Set default output path | ||
| if output_path is None: | ||
| output_path = f"mlops_artifacts/{pipeline_name}_dag.png" | ||
| output_path = f"{ARTIFACTS_DIRECTORY}/{pipeline_name}_dag.png" | ||
| # Ensure artifacts folder exists | ||
| artifacts_dir = Path(output_path).parent | ||
| artifacts_dir.mkdir(parents=True, exist_ok=True) | ||
| # Ensure output directory exists | ||
| Path(output_path).parent.mkdir(parents=True, exist_ok=True) | ||
| G = nx.DiGraph() | ||
| # Add nodes and edges | ||
| for step_name, step_info in steps.items(): | ||
@@ -25,9 +23,6 @@ G.add_node(step_name) | ||
| # Simple linear topological sort to arrange nodes | ||
| ordered_steps = list(nx.topological_sort(G)) | ||
| pos = { | ||
| step: (i, 0) for i, step in enumerate(ordered_steps) | ||
| } # x=i, y=0 for horizontal layout | ||
| # Use spring layout for a more natural-looking DAG | ||
| pos = nx.spring_layout(G, k=1.2, iterations=100, seed=42) | ||
| plt.figure(figsize=(2.5 * len(steps), 2.5)) | ||
| plt.figure(figsize=(2.5 * len(steps), 3)) | ||
| nx.draw( | ||
@@ -42,2 +37,4 @@ G, | ||
| edge_color="gray", | ||
| arrowstyle="-|>", | ||
| arrowsize=20, | ||
| ) | ||
@@ -44,0 +41,0 @@ |
+112
-38
| # mlopscli/pipeline.py | ||
| import subprocess | ||
| from pathlib import Path | ||
| from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED | ||
| from threading import Thread | ||
| import networkx as nx | ||
| import psutil, time, json | ||
| from mlopscli.constants import ARTIFACTS_DIRECTORY | ||
def topological_sort(steps: dict):
    """Return the step definitions of *steps* in dependency order.

    Kahn's algorithm over the ``depends_on`` lists. Returns a list of the step
    dicts (values of *steps*), ordered so every step appears after all of its
    dependencies.

    Raises:
        ValueError: if the dependency graph contains a cycle.
    """
    from collections import defaultdict, deque

    graph = defaultdict(list)    # dependency name -> list of dependent step names
    indegree = defaultdict(int)  # step name -> number of unmet dependencies
    for name, step in steps.items():
        for dep in step.get("depends_on", []):
            graph[dep].append(name)
            indegree[name] += 1

    # Seed with every step that has no dependencies.
    queue = deque([name for name in steps if indegree[name] == 0])
    sorted_steps = []
    while queue:
        node = queue.popleft()
        sorted_steps.append(steps[node])
        for neighbor in graph[node]:
            indegree[neighbor] -= 1
            if indegree[neighbor] == 0:
                queue.append(neighbor)

    # Any step left unprocessed implies a cycle in the dependency graph.
    if len(sorted_steps) != len(steps):
        raise ValueError("Cycle detected in job dependencies!")
    return sorted_steps


def write_metadata(run_dir, job_name, steps, timestamp):
    """Writes the metadata to a JSON file in the run directory.

    Args:
        run_dir: Path-like run directory (must support the ``/`` operator).
        job_name: name of the executed job.
        steps: per-step metadata records to persist.
        timestamp: run timestamp string recorded alongside the steps.
    """
    metadata = {"job_name": job_name, "timestamp": timestamp, "steps": steps}
    metadata_path = run_dir / "metadata.json"
    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=4)


def monitor_process(proc, metrics):
    """Sample memory/CPU usage of *proc* until it exits.

    Appends samples while the child is alive and stores the series into
    ``metrics["memory"]`` (MB) and ``metrics["cpu"]`` (percent). Intended to be
    run in a background thread alongside the subprocess.
    """
    p = psutil.Process(proc.pid)
    mem_usage = []
    cpu_usage = []
    while proc.poll() is None:
        try:
            mem = p.memory_info().rss / (1024 * 1024)  # in MB
            # cpu_percent(interval=0.2) also paces the sampling loop.
            cpu = p.cpu_percent(interval=0.2)
            mem_usage.append(mem)
            cpu_usage.append(cpu)
        except psutil.NoSuchProcess:
            # Process exited between the poll() check and the sample; stop quietly.
            break
    metrics["memory"] = mem_usage
    metrics["cpu"] = cpu_usage
| def setup_virtualenv(step_name: str, requirements_path: str): | ||
@@ -64,7 +67,23 @@ """Creates and sets up a virtual environment for the given step.""" | ||
| def execute_scripts(steps_dict): | ||
| """Execute the job steps in the correct order (topologically sorted).""" | ||
| sorted_steps = topological_sort(steps_dict) | ||
def build_dependency_graph(steps_dict):
    """Build a networkx DiGraph of the job steps.

    Each step becomes a node (with its definition attached as the ``info``
    attribute); each ``depends_on`` entry becomes an edge pointing from the
    dependency to the dependent step, so a topological order of the graph is a
    valid execution order.
    """
    G = nx.DiGraph()
    for step_name, step_info in steps_dict.items():
        G.add_node(step_name, info=step_info)
        for dep in step_info.get("depends_on", []):
            G.add_edge(dep, step_name)
    return G
| for step in sorted_steps: | ||
| def execute_scripts(job_name, steps_dict, max_workers=4, observe=False): | ||
| timestamp = time.strftime("%Y-%m-%dT%H:%M:%S") | ||
| G = build_dependency_graph(steps_dict) | ||
| completed = set() | ||
| running_futures = {} | ||
| run_dir = Path(ARTIFACTS_DIRECTORY) | ||
| run_dir.mkdir(parents=True, exist_ok=True) | ||
| steps_metadata = [] | ||
| def run_step(step_name, observe): | ||
| start = time.time() | ||
| step = steps_dict[step_name] | ||
| name = step["name"] | ||
@@ -75,4 +94,2 @@ script = step["script"] | ||
| print(f"\n๐ง Running step: {name} ({script})") | ||
| # Check if the script exists | ||
| script_path = Path(script) | ||
@@ -82,17 +99,74 @@ if not script_path.exists(): | ||
| # Set up the virtual environment for this step and get the python executable | ||
| python_exe = setup_virtualenv(name, requirements) | ||
| # Run the script in the virtual environment | ||
| result = subprocess.run( | ||
| [str(python_exe), str(script_path)], capture_output=True, text=True | ||
| ) | ||
| if observe: | ||
| metrics = {} | ||
| result = subprocess.Popen([str(python_exe), str(script_path)]) | ||
| monitor = Thread(target=monitor_process, args=(result, metrics)) | ||
| monitor.start() | ||
| result.wait() | ||
| monitor.join() | ||
| else: | ||
| result = subprocess.run( | ||
| [str(python_exe), str(script_path)], | ||
| capture_output=True, | ||
| text=True, | ||
| ) | ||
| # Handle the result | ||
| end = time.time() | ||
| duration = end - start | ||
| step_metadata = { | ||
| "name": name, | ||
| "status": "success" if result.returncode == 0 else "failed", | ||
| "duration": duration, | ||
| "script": str(script_path), | ||
| } | ||
| steps_metadata.append(step_metadata) | ||
| if result.returncode != 0: | ||
| print(f"โ Step '{name}' failed.") | ||
| print(result.stderr) | ||
| break | ||
| raise RuntimeError(f"โ Step '{name}' failed:\n{result.stderr}") | ||
| print(f"โ Step '{name}' completed in {duration:.2f}s") | ||
| if observe: | ||
| peak_mem = max(metrics["memory"]) if metrics["memory"] else 0 | ||
| avg_cpu = sum(metrics["cpu"]) / len(metrics["cpu"]) if metrics["cpu"] else 0 | ||
| print(f"๐ง Peak memory for step {name}: {peak_mem:.2f} MB") | ||
| print(f"โ๏ธ Avg CPU for step {name}: {avg_cpu:.2f}%") | ||
| else: | ||
| print(f"โ Step '{name}' completed.") | ||
| print(result.stdout) | ||
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | ||
| while len(completed) < len(G.nodes): | ||
| # Get new ready steps (not completed, not already running) | ||
| ready_steps = [ | ||
| node | ||
| for node in G.nodes | ||
| if node not in completed | ||
| and node not in running_futures.values() | ||
| and all(pred in completed for pred in G.predecessors(node)) | ||
| ] | ||
| # Submit new ready steps | ||
| for step_name in ready_steps: | ||
| future = executor.submit(run_step, step_name, observe) | ||
| running_futures[future] = step_name | ||
| if not running_futures: | ||
| raise RuntimeError("โ ๏ธ Deadlock detected or no runnable steps found.") | ||
| # Process completed futures | ||
| done, _ = wait(running_futures.keys(), return_when=FIRST_COMPLETED) | ||
| for future in done: | ||
| step_name = running_futures.pop(future) | ||
| try: | ||
| future.result() | ||
| completed.add(step_name) | ||
| except Exception as e: | ||
| print(f"โ Error in step '{step_name}': {e}") | ||
| executor.shutdown(wait=False) | ||
| return # Exit early on error | ||
| write_metadata(run_dir, job_name, steps_metadata, timestamp) |
+68
-3
@@ -7,4 +7,69 @@ # mlopscli/runner.py | ||
| from pathlib import Path | ||
| def run_pipeline(job_name: str, job_config: Path, render_dag: bool = True): | ||
def dry_run_pipeline(job_name: str, job_config: Path):
    """Simulate running the pipeline steps without actually executing them.

    Prints each step's configuration and validates that every script path,
    optional requirements file, and declared dependency exists. Nothing is
    executed; a summary of problems (if any) is printed at the end.
    """
    _, steps = load_job_config(job_config)

    print(f"\n🧪 Dry Run for Pipeline: {job_name}")
    print(f"📋 Pipeline Configuration: {job_config.resolve()}")

    # Names declared in the config; every dependency must refer to one of these.
    step_names = {step.get("name") for step in steps.values()}
    missing_dependencies = []
    missing_scripts = []
    missing_requirements = []

    for step in steps.values():
        name = step.get("name")
        script = step.get("script")
        requirements = step.get("requirements", None)
        depends_on = step.get("depends_on", [])

        print(f"\n🔍 Step: {name}")
        print(f"  📄 Script: {script}")
        print(f"  📦 Requirements: {requirements or 'N/A'}")
        print(f"  🔗 Depends on: {', '.join(depends_on) if depends_on else 'None'}")

        # Validate script path. A step without a 'script' key is reported as a
        # missing script instead of crashing on Path(None).
        if not script or not Path(script).exists():
            missing_scripts.append((name, script))

        # Validate requirements path (if provided)
        if requirements and not Path(requirements).exists():
            missing_requirements.append((name, requirements))

        # Validate depends_on steps
        for dep in depends_on:
            if dep not in step_names:
                missing_dependencies.append((name, dep))

    if missing_scripts or missing_requirements or missing_dependencies:
        print("\n❌ Issues detected during dry run:")
        if missing_scripts:
            print("\n🚫 Missing Scripts:")
            for step, path in missing_scripts:
                print(f"  - Step '{step}': script path not found -> {path}")
        if missing_requirements:
            print("\n🚫 Missing Requirements:")
            for step, path in missing_requirements:
                print(f"  - Step '{step}': requirements path not found -> {path}")
        if missing_dependencies:
            print("\n🚫 Invalid Dependencies:")
            for step, dep in missing_dependencies:
                print(f"  - Step '{step}' depends on undefined step '{dep}'")
        print("\n🔥 Dry run failed due to the above errors.")
    else:
        print(
            "\n✅ Dry run completed successfully. All paths and dependencies look good!"
        )
def run_pipeline(
    job_name: str, job_config: Path, render_dag: bool = True, observe: bool = False
):
    """Load the job config, optionally render its DAG, then execute the steps.

    Args:
        job_name: job identifier, used for the DAG image name and run metadata.
        job_config: path to the job_order.yaml describing the steps.
        render_dag: when True, write a visualization of the step graph.
        observe: when True, track per-step compute usage during execution.
    """
    pipeline_name, step_dict = load_job_config(job_config)
    print(f"Starting pipeline: {pipeline_name} under job {job_name}")
    if render_dag:
        # The DAG image is named after the job so repeated runs are distinguishable.
        visualize_dag(job_name, step_dict)
    execute_scripts(job_name=job_name, steps_dict=step_dict, observe=observe)
+8
-6
| Metadata-Version: 2.3 | ||
| Name: mlopscli | ||
| Version: 0.1.0 | ||
| Version: 0.1.1 | ||
| Summary: CLI to turn DS scripts to composable pipelines. | ||
@@ -12,5 +12,9 @@ Author: Himanshu Bajpai | ||
| Classifier: Programming Language :: Python :: 3.13 | ||
| Provides-Extra: dashboard | ||
| Requires-Dist: bumpversion (>=0.6.0) | ||
| Requires-Dist: matplotlib (>=3.10.1) | ||
| Requires-Dist: networkx (>=3.4.2) | ||
| Requires-Dist: psutil (>=6.0.0) | ||
| Requires-Dist: pyyaml (>=6.0.2) | ||
| Requires-Dist: streamlit (>=1.34.0) ; extra == "dashboard" | ||
| Requires-Dist: typer (>=0.15.2) | ||
@@ -26,7 +30,5 @@ Description-Content-Type: text/markdown | ||
| 1. Write your scripts (`data_prep.py`, `train_model.py`, `evaluate_model.py`) | ||
| 2. Define them in a `job_order.yaml` | ||
| 3. Run: | ||
| 2. Define them in a `job_order.yaml` with the dependencies. | ||
| 3. Install the mlops CLI: `pip install mlopscli` | ||
| 4. Run the command: `mlopscli execute --job prepare_train_pipeline --job_config job_order.yaml` | ||
| ```bash | ||
| python -m mlopscli.cli --job "prepare_train_pipeline" --job_config job_examples/job_order.yaml | ||
+9
-2
| [project] | ||
| name = "mlopscli" | ||
| version = "0.1.0" | ||
| version = "0.1.1" | ||
| description = "CLI to turn DS scripts to composable pipelines." | ||
@@ -14,5 +14,12 @@ authors = [ | ||
| "networkx>=3.4.2", | ||
| "matplotlib>=3.10.1" | ||
| "matplotlib>=3.10.1", | ||
| "bumpversion>=0.6.0", | ||
| "psutil>=6.0.0" | ||
| ] | ||
| [project.optional-dependencies] | ||
| dashboard = [ | ||
| "streamlit>=1.34.0", | ||
| ] | ||
| [tool.poetry.scripts] | ||
@@ -19,0 +26,0 @@ mlopscli = "mlopscli.cli:app" |
+3
-5
@@ -8,6 +8,4 @@ # mlopscli ๐ | ||
| 1. Write your scripts (`data_prep.py`, `train_model.py`, `evaluate_model.py`) | ||
| 2. Define them in a `job_order.yaml` | ||
| 3. Run: | ||
| ```bash | ||
| python -m mlopscli.cli --job "prepare_train_pipeline" --job_config job_examples/job_order.yaml | ||
| 2. Define them in a `job_order.yaml` with the dependencies. | ||
| 3. Install the mlops CLI: `pip install mlopscli` | ||
| 4. Run the command: `mlopscli execute --job prepare_train_pipeline --job_config job_order.yaml` |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
15645
69.04%9
12.5%328
67.35%