running-process
Advanced tools
| """Line iterator module. | ||
| This module contains the _RunningProcessLineIterator class for iterating over | ||
| process output lines in a context-managed way. | ||
| """ | ||
| from collections.abc import Iterator | ||
| from contextlib import AbstractContextManager | ||
| from typing import TYPE_CHECKING, Any | ||
| if TYPE_CHECKING: | ||
| from running_process.running_process import RunningProcess | ||
| from running_process.process_output_reader import EndOfStream | ||
| class _RunningProcessLineIterator(AbstractContextManager[Iterator[str]], Iterator[str]): | ||
| """Context-managed iterator over a RunningProcess's output lines. | ||
| Yields only strings (never None). Stops on EndOfStream or when a per-line | ||
| timeout elapses. | ||
| """ | ||
| def __init__(self, rp: "RunningProcess", timeout: float | None) -> None: | ||
| self._rp = rp | ||
| self._timeout = timeout | ||
| # Context manager protocol | ||
| def __enter__(self) -> "_RunningProcessLineIterator": | ||
| return self | ||
| def __exit__( | ||
| self, | ||
| exc_type: type[BaseException] | None, | ||
| exc: BaseException | None, | ||
| tb: Any | None, | ||
| ) -> bool: | ||
| # Do not suppress exceptions | ||
| return False | ||
| # Iterator protocol | ||
| def __iter__(self) -> Iterator[str]: | ||
| return self | ||
| def __next__(self) -> str: | ||
| next_item: str | EndOfStream = self._rp.get_next_line(timeout=self._timeout) | ||
| if isinstance(next_item, EndOfStream): | ||
| raise StopIteration | ||
| # Must be a string by contract | ||
| return next_item |
| """Process output reader module. | ||
| This module contains the ProcessOutputReader class for handling subprocess output | ||
| in a dedicated thread to prevent blocking issues. | ||
| """ | ||
| import _thread | ||
| import logging | ||
| import threading | ||
| import time | ||
| import traceback | ||
| import warnings | ||
| from collections.abc import Callable | ||
| from subprocess import Popen | ||
| from typing import Any | ||
| from running_process.output_formatter import NullOutputFormatter, OutputFormatter | ||
| logger = logging.getLogger(__name__) | ||
| class EndOfStream: | ||
| """Sentinel used to indicate end-of-stream from the reader.""" | ||
| class ProcessOutputReader: | ||
| """Dedicated reader that drains a process's stdout and enqueues lines. | ||
| This keeps the stdout pipe drained to prevent blocking and forwards | ||
| transformed, non-empty lines to the provided output queue. It also invokes | ||
| lifecycle callbacks for timing/unregister behaviors. | ||
| """ | ||
| def __init__( | ||
| self, | ||
| proc: Popen[Any], | ||
| shutdown: threading.Event, | ||
| output_formatter: OutputFormatter | None, | ||
| on_output: Callable[[str | EndOfStream], None], | ||
| on_end: Callable[[], None], | ||
| ) -> None: | ||
| output_formatter = output_formatter or NullOutputFormatter() | ||
| self._proc = proc | ||
| self._shutdown = shutdown | ||
| self._output_formatter = output_formatter | ||
| self._on_output = on_output | ||
| self._on_end = on_end | ||
| self.last_stdout_ts: float | None = None | ||
| self._eos_emitted: bool = False | ||
| def _emit_eos_once(self) -> None: | ||
| """Ensure EndOfStream is only forwarded a single time.""" | ||
| if not self._eos_emitted: | ||
| self._eos_emitted = True | ||
| self._on_output(EndOfStream()) | ||
| def _initialize_formatter(self) -> None: | ||
| """Initialize the output formatter.""" | ||
| try: | ||
| self._output_formatter.begin() | ||
| except (AttributeError, TypeError, ValueError, RuntimeError) as e: | ||
| formatter_error_msg = f"Output formatter begin() failed: {e}" | ||
| warnings.warn(formatter_error_msg, stacklevel=2) | ||
| def _process_stdout_lines(self) -> None: | ||
| """Process stdout lines and forward them to output.""" | ||
| assert self._proc.stdout is not None | ||
| for line in self._proc.stdout: | ||
| self.last_stdout_ts = time.time() | ||
| if self._shutdown.is_set(): | ||
| break | ||
| line_stripped = line.rstrip() | ||
| if not line_stripped: | ||
| continue | ||
| transformed_line = self._output_formatter.transform(line_stripped) | ||
| self._on_output(transformed_line) | ||
| def _handle_keyboard_interrupt(self) -> None: | ||
| """Handle KeyboardInterrupt in reader thread.""" | ||
| # Per project rules, handle interrupts in threads explicitly | ||
| thread_id = threading.current_thread().ident | ||
| thread_name = threading.current_thread().name | ||
| logger.warning("Thread %s (%s) caught KeyboardInterrupt", thread_id, thread_name) | ||
| logger.warning("Stack trace for thread %s:", thread_id) | ||
| traceback.print_exc() | ||
| # Try to ensure child process is terminated promptly | ||
| try: | ||
| self._proc.kill() | ||
| except (ProcessLookupError, PermissionError, OSError) as kill_error: | ||
| logger.warning("Failed to kill process: %s", kill_error) | ||
| # Propagate to main thread and re-raise | ||
| _thread.interrupt_main() | ||
| # EOF | ||
| self._emit_eos_once() | ||
| def _handle_io_error(self, e: ValueError | OSError) -> None: | ||
| """Handle IO errors during stdout reading.""" | ||
| # Normal shutdown scenarios include closed file descriptors. | ||
| error_str = str(e) | ||
| if any(msg in error_str for msg in ["closed file", "Bad file descriptor"]): | ||
| closed_file_msg = f"Output reader encountered closed file: {e}" | ||
| warnings.warn(closed_file_msg, stacklevel=2) | ||
| else: | ||
| logger.warning("Output reader encountered error: %s", e) | ||
| def _cleanup_stdout(self) -> None: | ||
| """Close stdout stream safely.""" | ||
| if self._proc.stdout and not self._proc.stdout.closed: | ||
| try: | ||
| self._proc.stdout.close() | ||
| except (ValueError, OSError) as err: | ||
| reader_error_msg = f"Output reader encountered error: {err}" | ||
| warnings.warn(reader_error_msg, stacklevel=2) | ||
| def _finalize_formatter(self) -> None: | ||
| """Finalize the output formatter.""" | ||
| try: | ||
| self._output_formatter.end() | ||
| except (AttributeError, TypeError, ValueError, RuntimeError) as e: | ||
| formatter_end_error_msg = f"Output formatter end() failed: {e}" | ||
| warnings.warn(formatter_end_error_msg, stacklevel=2) | ||
| def _run_with_error_handling(self) -> None: | ||
| """Run stdout processing with error handling.""" | ||
| try: | ||
| self._process_stdout_lines() | ||
| except KeyboardInterrupt: | ||
| self._handle_keyboard_interrupt() | ||
| raise | ||
| except (ValueError, OSError) as e: | ||
| self._handle_io_error(e) | ||
| finally: | ||
| # Signal end-of-stream to consumers exactly once | ||
| self._emit_eos_once() | ||
| def _perform_final_cleanup(self) -> None: | ||
| """Perform final cleanup operations.""" | ||
| # Cleanup stream and invoke completion callback | ||
| self._cleanup_stdout() | ||
| # Notify parent for timing/unregistration | ||
| try: | ||
| self._on_end() | ||
| finally: | ||
| # End formatter lifecycle within the reader context | ||
| self._finalize_formatter() | ||
| def run(self) -> None: | ||
| """Continuously read stdout lines and forward them until EOF or shutdown.""" | ||
| try: | ||
| # Begin formatter lifecycle within the reader context | ||
| self._initialize_formatter() | ||
| self._run_with_error_handling() | ||
| finally: | ||
| self._perform_final_cleanup() |
| """Process watcher module. | ||
| This module contains the ProcessWatcher class for monitoring subprocess execution | ||
| in a background thread. | ||
| """ | ||
| import _thread | ||
| import contextlib | ||
| import logging | ||
| import subprocess | ||
| import threading | ||
| import time | ||
| import traceback | ||
| from typing import TYPE_CHECKING | ||
| if TYPE_CHECKING: | ||
| from running_process.running_process import RunningProcess | ||
| logger = logging.getLogger(__name__) | ||
| class ProcessWatcher: | ||
| """Background watcher that polls a process until it terminates.""" | ||
| def __init__(self, running_process: "RunningProcess") -> None: | ||
| self._rp = running_process | ||
| self._thread: threading.Thread | None = None | ||
| def start(self) -> None: | ||
| name: str = "RPWatcher" | ||
| with contextlib.suppress(AttributeError, TypeError): | ||
| if self._rp.proc is not None: | ||
| name = f"RPWatcher-{self._rp.proc.pid}" | ||
| self._thread = threading.Thread(target=self._run, name=name, daemon=True) | ||
| self._thread.start() | ||
| def _run(self) -> None: | ||
| thread_id = threading.current_thread().ident | ||
| thread_name = threading.current_thread().name | ||
| try: | ||
| while not self._rp.shutdown.is_set(): | ||
| # Enforce per-process timeout independently of wait() | ||
| if ( | ||
| self._rp.timeout is not None | ||
| and self._rp.start_time is not None | ||
| and (time.time() - self._rp.start_time) > self._rp.timeout | ||
| ): | ||
| logger.warning( | ||
| "Process timeout after %s seconds (watcher), killing: %s", | ||
| self._rp.timeout, | ||
| self._rp.command, | ||
| ) | ||
| # Execute user-provided timeout callback if available | ||
| if self._rp.on_timeout is not None: | ||
| try: | ||
| process_info = self._rp._create_process_info() # noqa: SLF001 | ||
| self._rp.on_timeout(process_info) | ||
| except (AttributeError, TypeError, ValueError, RuntimeError) as e: | ||
| logger.warning("Watcher timeout callback failed: %s", e) | ||
| self._rp.kill() | ||
| break | ||
| rc: int | None = self._rp.poll() | ||
| if rc is not None: | ||
| break | ||
| time.sleep(0.1) | ||
| except KeyboardInterrupt: | ||
| logger.warning("Thread %s (%s) caught KeyboardInterrupt", thread_id, thread_name) | ||
| logger.warning("Stack trace for thread %s:", thread_id) | ||
| traceback.print_exc() | ||
| _thread.interrupt_main() | ||
| raise | ||
| except (OSError, subprocess.SubprocessError, RuntimeError) as e: | ||
| # Surface unexpected errors and keep behavior consistent | ||
| logger.warning("Watcher thread error in %s: %s", thread_name, e) | ||
| traceback.print_exc() | ||
| @property | ||
| def thread(self) -> threading.Thread | None: | ||
| return self._thread |
| """Private subprocess runner module. | ||
| This module contains the private implementation of subprocess.run() replacement | ||
| using RunningProcess as the backend. | ||
| """ | ||
| import subprocess | ||
| from collections.abc import Callable | ||
| from pathlib import Path | ||
| from typing import TYPE_CHECKING | ||
| if TYPE_CHECKING: | ||
| from running_process.running_process import ProcessInfo | ||
| def execute_subprocess_run( | ||
| command: str | list[str], | ||
| cwd: Path | None, | ||
| check: bool, | ||
| timeout: int, | ||
| on_timeout: Callable[["ProcessInfo"], None] | None = None, | ||
| ) -> subprocess.CompletedProcess[str]: | ||
| """ | ||
| Execute a command with robust stdout handling, emulating subprocess.run(). | ||
| Uses RunningProcess as the backend to provide: | ||
| - Continuous stdout streaming to prevent pipe blocking | ||
| - Merged stderr into stdout for unified output | ||
| - Timeout protection with optional stack trace dumping | ||
| - Standard subprocess.CompletedProcess return value | ||
| Args: | ||
| command: Command to execute as string or list of arguments. | ||
| cwd: Working directory for command execution. Required parameter. | ||
| check: If True, raise CalledProcessError for non-zero exit codes. | ||
| timeout: Maximum execution time in seconds. | ||
| on_timeout: Callback function executed when process times out. | ||
| Returns: | ||
| CompletedProcess with combined stdout and process return code. | ||
| stderr field is None since it's merged into stdout. | ||
| Raises: | ||
| RuntimeError: If process times out (wraps TimeoutError). | ||
| CalledProcessError: If check=True and process exits with non-zero code. | ||
| """ | ||
| # Import here to avoid circular imports during module load | ||
| from running_process.running_process import RunningProcess # noqa: PLC0415 | ||
| # Use RunningProcess for robust stdout pumping with merged stderr | ||
| proc = RunningProcess( | ||
| command=command, | ||
| cwd=cwd, | ||
| check=False, | ||
| auto_run=True, | ||
| timeout=timeout, | ||
| on_timeout=on_timeout, | ||
| on_complete=None, | ||
| output_formatter=None, | ||
| ) | ||
| try: | ||
| return_code: int = proc.wait() | ||
| except KeyboardInterrupt: | ||
| # Propagate interrupt behavior consistent with subprocess.run | ||
| raise | ||
| except TimeoutError as e: | ||
| # Align with subprocess.TimeoutExpired semantics by raising a CalledProcessError-like | ||
| # error with available output. Using TimeoutError here is consistent with internal RP. | ||
| error_message = f"CRITICAL: Process timed out after {timeout} seconds: {command}" | ||
| raise RuntimeError(error_message) from e | ||
| combined_stdout: str = proc.stdout | ||
| # Construct CompletedProcess (stderr is merged into stdout by design) | ||
| completed = subprocess.CompletedProcess( | ||
| args=command, | ||
| returncode=return_code, | ||
| stdout=combined_stdout, | ||
| stderr=None, | ||
| ) | ||
| if check and return_code != 0: | ||
| # Raise the standard exception with captured output | ||
| raise subprocess.CalledProcessError( | ||
| returncode=return_code, | ||
| cmd=command, | ||
| output=combined_stdout, | ||
| stderr=None, | ||
| ) | ||
| return completed |
+31
-15
| Metadata-Version: 2.4 | ||
| Name: running_process | ||
| Version: 1.0.0 | ||
| Version: 1.0.1 | ||
| Summary: A modern subprocess.Popen wrapper with improved process management | ||
@@ -108,7 +108,6 @@ Project-URL: Homepage, https://github.com/yourusername/running-process | ||
| ```python | ||
| from running_process import RunningProcess | ||
| from running_process.output_formatter import create_sketch_path_formatter | ||
| from running_process import RunningProcess, TimeDeltaFormatter | ||
| # Use built-in path formatter | ||
| formatter = create_sketch_path_formatter("MyProject") | ||
| # Use built-in time delta formatter | ||
| formatter = TimeDeltaFormatter() | ||
| process = RunningProcess( | ||
@@ -151,13 +150,8 @@ ["gcc", "-v", "main.c"], | ||
| ```bash | ||
| pip install running-process | ||
| pip install running_process | ||
| ``` | ||
| ### Optional Dependencies | ||
| ### Dependencies | ||
| For process tree termination support: | ||
| ```bash | ||
| pip install running-process[psutil] | ||
| # or | ||
| pip install psutil | ||
| ``` | ||
| This package includes `psutil` as a required dependency for process tree management functionality. | ||
@@ -171,4 +165,4 @@ ## Architecture | ||
| - **RunningProcessManager**: Thread-safe singleton registry for tracking active processes | ||
| - **OutputFormatter**: Protocol for transforming process output with built-in implementations | ||
| - **process_utils**: Utilities for process tree operations (requires optional psutil dependency) | ||
| - **OutputFormatter**: Protocol for transforming process output (with NullOutputFormatter and TimeDeltaFormatter implementations) | ||
| - **process_utils**: Utilities for process tree operations | ||
@@ -246,2 +240,20 @@ ## Development | ||
| ### ProcessOutputReader | ||
| Internal threaded reader that drains process stdout/stderr: | ||
| ```python | ||
| class ProcessOutputReader: | ||
| def __init__( | ||
| self, | ||
| proc: subprocess.Popen[Any], | ||
| shutdown: threading.Event, | ||
| output_formatter: OutputFormatter | None, | ||
| on_output: Callable[[str | EndOfStream], None], | ||
| on_end: Callable[[], None], | ||
| ) -> None: ... | ||
| def run(self) -> None: ... # Thread entry point | ||
| ``` | ||
| ### OutputFormatter Protocol | ||
@@ -256,2 +268,6 @@ | ||
| Built-in implementations: | ||
| - `NullOutputFormatter`: No-op formatter (default) | ||
| - `TimeDeltaFormatter`: Adds elapsed time prefix to each line | ||
| ## License | ||
@@ -258,0 +274,0 @@ |
+9
-5
| running_process/__init__.py,sha256=gTVGqReRvViI2XHrk9ahRSkAxm_bmeFgu3EvXuFgQAE,670 | ||
| running_process/line_iterator.py,sha256=lbsjssk0yKjCRCb2knOuU5C2hbvdqKEgDjv2hPwIqrQ,1544 | ||
| running_process/output_formatter.py,sha256=ie8gRQSZRGpBcNuZt5ns-yK6DDjO_SzAsiQAqLo71D0,1917 | ||
| running_process/process_output_reader.py,sha256=k7tRcqOFdl6H_KAPlAHyVLhMMfvbiXInqLjUQTejEzk,6022 | ||
| running_process/process_utils.py,sha256=fNCdxfebjzhK-4t6K6x6Vhil7luIHpVQIVZJAQa98u8,2255 | ||
| running_process/running_process.py,sha256=l7tT5eO9y0wS_nGy-M1gDgeKDV6Ay8BeYXFRW1Dj2fA,44882 | ||
| running_process/process_watcher.py,sha256=i8YpgJlQVvY0NbuEbs2N39HP9kITb0uyd0I2i29u6sg,3110 | ||
| running_process/running_process.py,sha256=1CsxvKZgzoA1aFdexU-l6cinrBZ_YygIE65g4kb_68I,36444 | ||
| running_process/running_process_manager.py,sha256=xx_kmXw9j-hjOI_pRrlTKdc9yekKBVQHhUq1eDpBGJU,2467 | ||
| running_process/subprocess_runner.py,sha256=O_Wwe2vrXWnYkiNEpauNLre3PqNO3uDogrRz0JCkH7M,3223 | ||
| running_process/assets/example.txt,sha256=lTBovRjiz0_TgtAtbA1C5hNi2ffbqnNPqkKg6UiKCT8,54 | ||
| running_process-1.0.0.dist-info/METADATA,sha256=L1KTugkLpEmU-f2V44h2RS93SrTzChd0id-lCEvw6KI,7969 | ||
| running_process-1.0.0.dist-info/WHEEL,sha256=qtCwoSJWgHk21S1Kb4ihdzI2rlJ1ZKaIurTj_ngOhyQ,87 | ||
| running_process-1.0.0.dist-info/licenses/LICENSE,sha256=b6pOoifSXiUaz_lDS84vWlG3fr4yUKwB8fzkrH9R8bQ,1064 | ||
| running_process-1.0.0.dist-info/RECORD,, | ||
| running_process-1.0.1.dist-info/METADATA,sha256=qhA7xeu5DqbT76ZYNDuydLta5PmK2yYhh76scQORh-M,8465 | ||
| running_process-1.0.1.dist-info/WHEEL,sha256=qtCwoSJWgHk21S1Kb4ihdzI2rlJ1ZKaIurTj_ngOhyQ,87 | ||
| running_process-1.0.1.dist-info/licenses/LICENSE,sha256=b6pOoifSXiUaz_lDS84vWlG3fr4yUKwB8fzkrH9R8bQ,1064 | ||
| running_process-1.0.1.dist-info/RECORD,, |
@@ -14,3 +14,3 @@ """Enhanced subprocess execution with timeout protection, output streaming, and process tree management. | ||
| process = RunningProcess(["ls", "-la"], auto_run=False) | ||
| process.run() | ||
| process.start() | ||
| exit_code = process.wait() | ||
@@ -106,6 +106,4 @@ ``` | ||
| import time | ||
| import traceback | ||
| import warnings | ||
| from collections.abc import Callable, Iterator | ||
| from contextlib import AbstractContextManager | ||
| from collections.abc import Callable | ||
| from dataclasses import dataclass | ||
@@ -116,5 +114,9 @@ from pathlib import Path | ||
| from running_process.line_iterator import _RunningProcessLineIterator | ||
| from running_process.output_formatter import NullOutputFormatter, OutputFormatter | ||
| from running_process.process_output_reader import EndOfStream, ProcessOutputReader | ||
| from running_process.process_utils import kill_process_tree | ||
| from running_process.process_watcher import ProcessWatcher | ||
| from running_process.running_process_manager import RunningProcessManagerSingleton | ||
| from running_process.subprocess_runner import execute_subprocess_run | ||
@@ -166,244 +168,5 @@ # Create module-level logger | ||
| class EndOfStream: | ||
| """Sentinel used to indicate end-of-stream from the reader.""" | ||
| # Console UTF-8 configuration is now handled globally in ci/__init__.py | ||
| class ProcessOutputReader: | ||
| """Dedicated reader that drains a process's stdout and enqueues lines. | ||
| This keeps the stdout pipe drained to prevent blocking and forwards | ||
| transformed, non-empty lines to the provided output queue. It also invokes | ||
| lifecycle callbacks for timing/unregister behaviors. | ||
| """ | ||
| def __init__( | ||
| self, | ||
| proc: subprocess.Popen[Any], | ||
| shutdown: threading.Event, | ||
| output_formatter: OutputFormatter | None, | ||
| on_output: Callable[[str | EndOfStream], None], | ||
| on_end: Callable[[], None], | ||
| ) -> None: | ||
| output_formatter = output_formatter or NullOutputFormatter() | ||
| self._proc = proc | ||
| self._shutdown = shutdown | ||
| self._output_formatter = output_formatter | ||
| self._on_output = on_output | ||
| self._on_end = on_end | ||
| self.last_stdout_ts: float | None = None | ||
| self._eos_emitted: bool = False | ||
| def _emit_eos_once(self) -> None: | ||
| """Ensure EndOfStream is only forwarded a single time.""" | ||
| if not self._eos_emitted: | ||
| self._eos_emitted = True | ||
| self._on_output(EndOfStream()) | ||
| def _initialize_formatter(self) -> None: | ||
| """Initialize the output formatter.""" | ||
| try: | ||
| self._output_formatter.begin() | ||
| except (AttributeError, TypeError, ValueError, RuntimeError) as e: | ||
| formatter_error_msg = f"Output formatter begin() failed: {e}" | ||
| warnings.warn(formatter_error_msg, stacklevel=2) | ||
| def _process_stdout_lines(self) -> None: | ||
| """Process stdout lines and forward them to output.""" | ||
| assert self._proc.stdout is not None | ||
| for line in self._proc.stdout: | ||
| self.last_stdout_ts = time.time() | ||
| if self._shutdown.is_set(): | ||
| break | ||
| line_stripped = line.rstrip() | ||
| if not line_stripped: | ||
| continue | ||
| transformed_line = self._output_formatter.transform(line_stripped) | ||
| self._on_output(transformed_line) | ||
| def _handle_keyboard_interrupt(self) -> None: | ||
| """Handle KeyboardInterrupt in reader thread.""" | ||
| # Per project rules, handle interrupts in threads explicitly | ||
| thread_id = threading.current_thread().ident | ||
| thread_name = threading.current_thread().name | ||
| logger.warning("Thread %s (%s) caught KeyboardInterrupt", thread_id, thread_name) | ||
| logger.warning("Stack trace for thread %s:", thread_id) | ||
| traceback.print_exc() | ||
| # Try to ensure child process is terminated promptly | ||
| try: | ||
| self._proc.kill() | ||
| except (ProcessLookupError, PermissionError, OSError) as kill_error: | ||
| logger.warning("Failed to kill process: %s", kill_error) | ||
| # Propagate to main thread and re-raise | ||
| _thread.interrupt_main() | ||
| # EOF | ||
| self._emit_eos_once() | ||
| def _handle_io_error(self, e: ValueError | OSError) -> None: | ||
| """Handle IO errors during stdout reading.""" | ||
| # Normal shutdown scenarios include closed file descriptors. | ||
| error_str = str(e) | ||
| if any(msg in error_str for msg in ["closed file", "Bad file descriptor"]): | ||
| closed_file_msg = f"Output reader encountered closed file: {e}" | ||
| warnings.warn(closed_file_msg, stacklevel=2) | ||
| else: | ||
| logger.warning("Output reader encountered error: %s", e) | ||
| def _cleanup_stdout(self) -> None: | ||
| """Close stdout stream safely.""" | ||
| if self._proc.stdout and not self._proc.stdout.closed: | ||
| try: | ||
| self._proc.stdout.close() | ||
| except (ValueError, OSError) as err: | ||
| reader_error_msg = f"Output reader encountered error: {err}" | ||
| warnings.warn(reader_error_msg, stacklevel=2) | ||
| def _finalize_formatter(self) -> None: | ||
| """Finalize the output formatter.""" | ||
| try: | ||
| self._output_formatter.end() | ||
| except (AttributeError, TypeError, ValueError, RuntimeError) as e: | ||
| formatter_end_error_msg = f"Output formatter end() failed: {e}" | ||
| warnings.warn(formatter_end_error_msg, stacklevel=2) | ||
| def _run_with_error_handling(self) -> None: | ||
| """Run stdout processing with error handling.""" | ||
| try: | ||
| self._process_stdout_lines() | ||
| except KeyboardInterrupt: | ||
| self._handle_keyboard_interrupt() | ||
| raise | ||
| except (ValueError, OSError) as e: | ||
| self._handle_io_error(e) | ||
| finally: | ||
| # Signal end-of-stream to consumers exactly once | ||
| self._emit_eos_once() | ||
| def _perform_final_cleanup(self) -> None: | ||
| """Perform final cleanup operations.""" | ||
| # Cleanup stream and invoke completion callback | ||
| self._cleanup_stdout() | ||
| # Notify parent for timing/unregistration | ||
| try: | ||
| self._on_end() | ||
| finally: | ||
| # End formatter lifecycle within the reader context | ||
| self._finalize_formatter() | ||
| def run(self) -> None: | ||
| """Continuously read stdout lines and forward them until EOF or shutdown.""" | ||
| try: | ||
| # Begin formatter lifecycle within the reader context | ||
| self._initialize_formatter() | ||
| self._run_with_error_handling() | ||
| finally: | ||
| self._perform_final_cleanup() | ||
| class ProcessWatcher: | ||
| """Background watcher that polls a process until it terminates.""" | ||
| def __init__(self, running_process: "RunningProcess") -> None: | ||
| self._rp = running_process | ||
| self._thread: threading.Thread | None = None | ||
| def start(self) -> None: | ||
| name: str = "RPWatcher" | ||
| with contextlib.suppress(AttributeError, TypeError): | ||
| if self._rp.proc is not None: | ||
| name = f"RPWatcher-{self._rp.proc.pid}" | ||
| self._thread = threading.Thread(target=self._run, name=name, daemon=True) | ||
| self._thread.start() | ||
| def _run(self) -> None: | ||
| thread_id = threading.current_thread().ident | ||
| thread_name = threading.current_thread().name | ||
| try: | ||
| while not self._rp.shutdown.is_set(): | ||
| # Enforce per-process timeout independently of wait() | ||
| if ( | ||
| self._rp.timeout is not None | ||
| and self._rp.start_time is not None | ||
| and (time.time() - self._rp.start_time) > self._rp.timeout | ||
| ): | ||
| logger.warning( | ||
| "Process timeout after %s seconds (watcher), killing: %s", | ||
| self._rp.timeout, | ||
| self._rp.command, | ||
| ) | ||
| # Execute user-provided timeout callback if available | ||
| if self._rp.on_timeout is not None: | ||
| try: | ||
| process_info = self._rp._create_process_info() # noqa: SLF001 | ||
| self._rp.on_timeout(process_info) | ||
| except (AttributeError, TypeError, ValueError, RuntimeError) as e: | ||
| logger.warning("Watcher timeout callback failed: %s", e) | ||
| self._rp.kill() | ||
| break | ||
| rc: int | None = self._rp.poll() | ||
| if rc is not None: | ||
| break | ||
| time.sleep(0.1) | ||
| except KeyboardInterrupt: | ||
| logger.warning("Thread %s (%s) caught KeyboardInterrupt", thread_id, thread_name) | ||
| logger.warning("Stack trace for thread %s:", thread_id) | ||
| traceback.print_exc() | ||
| _thread.interrupt_main() | ||
| raise | ||
| except (OSError, subprocess.SubprocessError, RuntimeError) as e: | ||
| # Surface unexpected errors and keep behavior consistent | ||
| logger.warning("Watcher thread error in %s: %s", thread_name, e) | ||
| traceback.print_exc() | ||
| @property | ||
| def thread(self) -> threading.Thread | None: | ||
| return self._thread | ||
| class _RunningProcessLineIterator(AbstractContextManager[Iterator[str]], Iterator[str]): | ||
| """Context-managed iterator over a RunningProcess's output lines. | ||
| Yields only strings (never None). Stops on EndOfStream or when a per-line | ||
| timeout elapses. | ||
| """ | ||
| def __init__(self, rp: "RunningProcess", timeout: float | None) -> None: | ||
| self._rp = rp | ||
| self._timeout = timeout | ||
| # Context manager protocol | ||
| def __enter__(self) -> "_RunningProcessLineIterator": | ||
| return self | ||
| def __exit__( | ||
| self, | ||
| exc_type: type[BaseException] | None, | ||
| exc: BaseException | None, | ||
| tb: Any | None, | ||
| ) -> bool: | ||
| # Do not suppress exceptions | ||
| return False | ||
| # Iterator protocol | ||
| def __iter__(self) -> Iterator[str]: | ||
| return self | ||
| def __next__(self) -> str: | ||
| next_item: str | EndOfStream = self._rp.get_next_line(timeout=self._timeout) | ||
| if isinstance(next_item, EndOfStream): | ||
| raise StopIteration | ||
| # Must be a string by contract | ||
| return next_item | ||
| class RunningProcess: | ||
@@ -465,2 +228,13 @@ """ | ||
| shell = any(part in shell_meta for part in command) | ||
| # Validate shell metacharacters when shell=False | ||
| if shell is False and isinstance(command, list): | ||
| shell_meta = {"&&", "||", "|", ";", ">", "<", "2>", "&"} | ||
| found_meta = [part for part in command if part in shell_meta] | ||
| if found_meta: | ||
| error_message = ( | ||
| f"Shell metacharacters {found_meta} found in command but shell=False. " | ||
| f"Either set shell=True or remove shell metacharacters from the command." | ||
| ) | ||
| raise ValueError(error_message) | ||
| self.command = command | ||
@@ -473,4 +247,3 @@ self.shell: bool = shell | ||
| self.check = check | ||
| # Force auto_run to False if NO_PARALLEL is set | ||
| self.auto_run = False if os.environ.get("NO_PARALLEL") else auto_run | ||
| self.auto_run = auto_run | ||
| self.timeout = timeout | ||
@@ -489,3 +262,3 @@ self.on_timeout = on_timeout | ||
| if auto_run: | ||
| self.run() | ||
| self.start() | ||
@@ -658,3 +431,3 @@ def get_command_str(self) -> str: | ||
| def run(self) -> None: | ||
| def start(self) -> None: | ||
| """ | ||
@@ -1106,3 +879,38 @@ Execute the command and stream its output to the queue. | ||
| @staticmethod | ||
| def run( | ||
| command: str | list[str], | ||
| cwd: Path | None, | ||
| check: bool, | ||
| timeout: int, | ||
| on_timeout: Callable[[ProcessInfo], None] | None = None, | ||
| ) -> subprocess.CompletedProcess[str]: | ||
| """Public static accessor for subprocess.run() replacement. | ||
| Execute a command with robust stdout handling, emulating subprocess.run(). | ||
| Uses RunningProcess as the backend to provide: | ||
| - Continuous stdout streaming to prevent pipe blocking | ||
| - Merged stderr into stdout for unified output | ||
| - Timeout protection with optional stack trace dumping | ||
| - Standard subprocess.CompletedProcess return value | ||
| Args: | ||
| command: Command to execute as string or list of arguments. | ||
| cwd: Working directory for command execution. Required parameter. | ||
| check: If True, raise CalledProcessError for non-zero exit codes. | ||
| timeout: Maximum execution time in seconds. | ||
| on_timeout: Callback function executed when process times out. | ||
| Returns: | ||
| CompletedProcess with combined stdout and process return code. | ||
| stderr field is None since it's merged into stdout. | ||
| Raises: | ||
| RuntimeError: If process times out (wraps TimeoutError). | ||
| CalledProcessError: If check=True and process exits with non-zero code. | ||
| """ | ||
| return execute_subprocess_run(command, cwd, check, timeout, on_timeout) | ||
| # NOTE: RunningProcessManager and its singleton live in running_process_manager.py | ||
@@ -1118,10 +926,5 @@ | ||
| ) -> subprocess.CompletedProcess[str]: | ||
| """ | ||
| Execute a command with robust stdout handling, emulating subprocess.run(). | ||
| """Execute a command with robust stdout handling, emulating subprocess.run(). | ||
| Uses RunningProcess as the backend to provide: | ||
| - Continuous stdout streaming to prevent pipe blocking | ||
| - Merged stderr into stdout for unified output | ||
| - Timeout protection with optional stack trace dumping | ||
| - Standard subprocess.CompletedProcess return value | ||
| This is a backward compatibility wrapper for the static method RunningProcess.run(). | ||
@@ -1143,44 +946,2 @@ Args: | ||
| """ | ||
| # Use RunningProcess for robust stdout pumping with merged stderr | ||
| proc = RunningProcess( | ||
| command=command, | ||
| cwd=cwd, | ||
| check=False, | ||
| auto_run=True, | ||
| timeout=timeout, | ||
| on_timeout=on_timeout, | ||
| on_complete=None, | ||
| output_formatter=None, | ||
| ) | ||
| try: | ||
| return_code: int = proc.wait() | ||
| except KeyboardInterrupt: | ||
| # Propagate interrupt behavior consistent with subprocess.run | ||
| raise | ||
| except TimeoutError as e: | ||
| # Align with subprocess.TimeoutExpired semantics by raising a CalledProcessError-like | ||
| # error with available output. Using TimeoutError here is consistent with internal RP. | ||
| error_message = f"CRITICAL: Process timed out after {timeout} seconds: {command}" | ||
| raise RuntimeError(error_message) from e | ||
| combined_stdout: str = proc.stdout | ||
| # Construct CompletedProcess (stderr is merged into stdout by design) | ||
| completed = subprocess.CompletedProcess( | ||
| args=command, | ||
| returncode=return_code, | ||
| stdout=combined_stdout, | ||
| stderr=None, | ||
| ) | ||
| if check and return_code != 0: | ||
| # Raise the standard exception with captured output | ||
| raise subprocess.CalledProcessError( | ||
| returncode=return_code, | ||
| cmd=command, | ||
| output=combined_stdout, | ||
| stderr=None, | ||
| ) | ||
| return completed | ||
| return RunningProcess.run(command, cwd, check, timeout, on_timeout) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.