running-process

A modern subprocess.Popen wrapper with improved process management, real-time output streaming, and enhanced lifecycle control.
Features
- Real-time Output Streaming: Stream process output via queues with customizable formatting
- Thread-safe Process Management: Centralized registry for tracking and debugging active processes
- Enhanced Timeout Handling: Optional stack trace dumping for debugging hanging processes
- Process Tree Termination: Kill entire process trees including child processes (requires psutil)
- Cross-platform Support: Works on Windows (MSYS), macOS, and Linux
- Flexible Output Formatting: Protocol-based output transformation with built-in formatters
- Iterator Interface: Context-managed line-by-line iteration over process output
Quick Start
Basic Usage
from running_process import RunningProcess
process = RunningProcess(["echo", "Hello World"])
for line in process:
print(f"Output: {line}")
if process.wait() != 0:
print("Command failed!")
Advanced Features
from running_process import RunningProcess
from pathlib import Path
process = RunningProcess(
command=["python", "long_script.py"],
cwd=Path("./scripts"),
timeout=300,
enable_stack_trace=True,
check=True,
)
while process.is_running():
try:
line = process.get_next_line(timeout=1.0)
print(f"[{process.elapsed_time:.1f}s] {line}")
except TimeoutError:
print("No output for 1 second...")
continue
exit_code = process.wait()
Output Formatting
from running_process import RunningProcess, TimeDeltaFormatter
formatter = TimeDeltaFormatter()
process = RunningProcess(
["gcc", "-v", "main.c"],
output_formatter=formatter
)
class TimestampFormatter:
def begin(self): pass
def end(self): pass
def transform(self, line: str) -> str:
from datetime import datetime
timestamp = datetime.now().strftime("%H:%M:%S")
return f"[{timestamp}] {line}"
process = RunningProcess(["make"], output_formatter=TimestampFormatter())
Process Management
from running_process import RunningProcessManager
manager = RunningProcessManager.get_instance()
for proc_id, process in manager.get_all_processes():
print(f"Process {proc_id}: {process.command_str}")
manager.cleanup_finished_processes()
Installation
pip install running_process
Dependencies
This package includes psutil as a required dependency for process tree management functionality.
Architecture
The library follows a layered design with these core components:
- RunningProcess: Main class wrapping subprocess.Popen with enhanced features
- ProcessOutputReader: Dedicated threaded reader that drains process stdout/stderr
- RunningProcessManager: Thread-safe singleton registry for tracking active processes
- OutputFormatter: Protocol for transforming process output (with NullOutputFormatter and TimeDeltaFormatter implementations)
- process_utils: Utilities for process tree operations
Development
Setup
git clone https://github.com/yourusername/running-process.git
cd running-process
. ./activate.sh
Testing
./test
uv run pytest --cov=running_process tests/
Linting
./lint
uv run ruff check --fix src tests
uv run black src tests
uv run pyright src tests
API Reference
RunningProcess
The main class for managing subprocess execution:
class RunningProcess:
def __init__(
self,
command: str | list[str],
cwd: Path | None = None,
check: bool = False,
auto_run: bool = True,
shell: bool | None = None,
timeout: int | None = None,
enable_stack_trace: bool = False,
on_complete: Callable[[], None] | None = None,
output_formatter: OutputFormatter | None = None,
) -> None: ...
def get_next_line(self, timeout: float | None = None) -> str | EndOfStream: ...
def wait(self, timeout: float | None = None) -> int: ...
def kill(self) -> None: ...
def is_running(self) -> bool: ...
def drain_stdout(self) -> list[str]: ...
Key Methods
get_next_line(timeout): Get the next line of output with optional timeout
wait(timeout): Wait for process completion, returns exit code
kill(): Terminate the process (and process tree if psutil available)
is_running(): Check if process is still executing
drain_stdout(): Get all currently available output lines
ProcessOutputReader
Internal threaded reader that drains process stdout/stderr:
class ProcessOutputReader:
def __init__(
self,
proc: subprocess.Popen[Any],
shutdown: threading.Event,
output_formatter: OutputFormatter | None,
on_output: Callable[[str | EndOfStream], None],
on_end: Callable[[], None],
) -> None: ...
def run(self) -> None: ...
OutputFormatter Protocol
class OutputFormatter(Protocol):
def begin(self) -> None: ...
def transform(self, line: str) -> str: ...
def end(self) -> None: ...
Built-in implementations:
NullOutputFormatter: No-op formatter (default)
TimeDeltaFormatter: Adds elapsed time prefix to each line
License
BSD 3-Clause License
Contributing
- Fork the repository
- Create a feature branch
- Make your changes following the existing code style
- Run tests and linting:
./test && ./lint
- Submit a pull request
For bug reports and feature requests, please use the GitHub Issues page.