Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

running-process

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

running-process - npm Package Compare versions

Comparing version
1.0.0
to
1.0.1
+52
running_process/line_iterator.py
"""Line iterator module.
This module contains the _RunningProcessLineIterator class for iterating over
process output lines in a context-managed way.
"""
from collections.abc import Iterator
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from running_process.running_process import RunningProcess
from running_process.process_output_reader import EndOfStream
class _RunningProcessLineIterator(AbstractContextManager[Iterator[str]], Iterator[str]):
"""Context-managed iterator over a RunningProcess's output lines.
Yields only strings (never None). Stops on EndOfStream or when a per-line
timeout elapses.
"""
def __init__(self, rp: "RunningProcess", timeout: float | None) -> None:
self._rp = rp
self._timeout = timeout
# Context manager protocol
def __enter__(self) -> "_RunningProcessLineIterator":
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: Any | None,
) -> bool:
# Do not suppress exceptions
return False
# Iterator protocol
def __iter__(self) -> Iterator[str]:
return self
def __next__(self) -> str:
next_item: str | EndOfStream = self._rp.get_next_line(timeout=self._timeout)
if isinstance(next_item, EndOfStream):
raise StopIteration
# Must be a string by contract
return next_item
"""Process output reader module.
This module contains the ProcessOutputReader class for handling subprocess output
in a dedicated thread to prevent blocking issues.
"""
import _thread
import logging
import threading
import time
import traceback
import warnings
from collections.abc import Callable
from subprocess import Popen
from typing import Any
from running_process.output_formatter import NullOutputFormatter, OutputFormatter
logger = logging.getLogger(__name__)
class EndOfStream:
"""Sentinel used to indicate end-of-stream from the reader."""
class ProcessOutputReader:
"""Dedicated reader that drains a process's stdout and enqueues lines.
This keeps the stdout pipe drained to prevent blocking and forwards
transformed, non-empty lines to the provided output queue. It also invokes
lifecycle callbacks for timing/unregister behaviors.
"""
def __init__(
self,
proc: Popen[Any],
shutdown: threading.Event,
output_formatter: OutputFormatter | None,
on_output: Callable[[str | EndOfStream], None],
on_end: Callable[[], None],
) -> None:
output_formatter = output_formatter or NullOutputFormatter()
self._proc = proc
self._shutdown = shutdown
self._output_formatter = output_formatter
self._on_output = on_output
self._on_end = on_end
self.last_stdout_ts: float | None = None
self._eos_emitted: bool = False
def _emit_eos_once(self) -> None:
"""Ensure EndOfStream is only forwarded a single time."""
if not self._eos_emitted:
self._eos_emitted = True
self._on_output(EndOfStream())
def _initialize_formatter(self) -> None:
"""Initialize the output formatter."""
try:
self._output_formatter.begin()
except (AttributeError, TypeError, ValueError, RuntimeError) as e:
formatter_error_msg = f"Output formatter begin() failed: {e}"
warnings.warn(formatter_error_msg, stacklevel=2)
def _process_stdout_lines(self) -> None:
"""Process stdout lines and forward them to output."""
assert self._proc.stdout is not None
for line in self._proc.stdout:
self.last_stdout_ts = time.time()
if self._shutdown.is_set():
break
line_stripped = line.rstrip()
if not line_stripped:
continue
transformed_line = self._output_formatter.transform(line_stripped)
self._on_output(transformed_line)
def _handle_keyboard_interrupt(self) -> None:
"""Handle KeyboardInterrupt in reader thread."""
# Per project rules, handle interrupts in threads explicitly
thread_id = threading.current_thread().ident
thread_name = threading.current_thread().name
logger.warning("Thread %s (%s) caught KeyboardInterrupt", thread_id, thread_name)
logger.warning("Stack trace for thread %s:", thread_id)
traceback.print_exc()
# Try to ensure child process is terminated promptly
try:
self._proc.kill()
except (ProcessLookupError, PermissionError, OSError) as kill_error:
logger.warning("Failed to kill process: %s", kill_error)
# Propagate to main thread and re-raise
_thread.interrupt_main()
# EOF
self._emit_eos_once()
def _handle_io_error(self, e: ValueError | OSError) -> None:
"""Handle IO errors during stdout reading."""
# Normal shutdown scenarios include closed file descriptors.
error_str = str(e)
if any(msg in error_str for msg in ["closed file", "Bad file descriptor"]):
closed_file_msg = f"Output reader encountered closed file: {e}"
warnings.warn(closed_file_msg, stacklevel=2)
else:
logger.warning("Output reader encountered error: %s", e)
def _cleanup_stdout(self) -> None:
"""Close stdout stream safely."""
if self._proc.stdout and not self._proc.stdout.closed:
try:
self._proc.stdout.close()
except (ValueError, OSError) as err:
reader_error_msg = f"Output reader encountered error: {err}"
warnings.warn(reader_error_msg, stacklevel=2)
def _finalize_formatter(self) -> None:
"""Finalize the output formatter."""
try:
self._output_formatter.end()
except (AttributeError, TypeError, ValueError, RuntimeError) as e:
formatter_end_error_msg = f"Output formatter end() failed: {e}"
warnings.warn(formatter_end_error_msg, stacklevel=2)
def _run_with_error_handling(self) -> None:
"""Run stdout processing with error handling."""
try:
self._process_stdout_lines()
except KeyboardInterrupt:
self._handle_keyboard_interrupt()
raise
except (ValueError, OSError) as e:
self._handle_io_error(e)
finally:
# Signal end-of-stream to consumers exactly once
self._emit_eos_once()
def _perform_final_cleanup(self) -> None:
"""Perform final cleanup operations."""
# Cleanup stream and invoke completion callback
self._cleanup_stdout()
# Notify parent for timing/unregistration
try:
self._on_end()
finally:
# End formatter lifecycle within the reader context
self._finalize_formatter()
def run(self) -> None:
"""Continuously read stdout lines and forward them until EOF or shutdown."""
try:
# Begin formatter lifecycle within the reader context
self._initialize_formatter()
self._run_with_error_handling()
finally:
self._perform_final_cleanup()
"""Process watcher module.
This module contains the ProcessWatcher class for monitoring subprocess execution
in a background thread.
"""
import _thread
import contextlib
import logging
import subprocess
import threading
import time
import traceback
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from running_process.running_process import RunningProcess
logger = logging.getLogger(__name__)
class ProcessWatcher:
"""Background watcher that polls a process until it terminates."""
def __init__(self, running_process: "RunningProcess") -> None:
self._rp = running_process
self._thread: threading.Thread | None = None
def start(self) -> None:
name: str = "RPWatcher"
with contextlib.suppress(AttributeError, TypeError):
if self._rp.proc is not None:
name = f"RPWatcher-{self._rp.proc.pid}"
self._thread = threading.Thread(target=self._run, name=name, daemon=True)
self._thread.start()
def _run(self) -> None:
thread_id = threading.current_thread().ident
thread_name = threading.current_thread().name
try:
while not self._rp.shutdown.is_set():
# Enforce per-process timeout independently of wait()
if (
self._rp.timeout is not None
and self._rp.start_time is not None
and (time.time() - self._rp.start_time) > self._rp.timeout
):
logger.warning(
"Process timeout after %s seconds (watcher), killing: %s",
self._rp.timeout,
self._rp.command,
)
# Execute user-provided timeout callback if available
if self._rp.on_timeout is not None:
try:
process_info = self._rp._create_process_info() # noqa: SLF001
self._rp.on_timeout(process_info)
except (AttributeError, TypeError, ValueError, RuntimeError) as e:
logger.warning("Watcher timeout callback failed: %s", e)
self._rp.kill()
break
rc: int | None = self._rp.poll()
if rc is not None:
break
time.sleep(0.1)
except KeyboardInterrupt:
logger.warning("Thread %s (%s) caught KeyboardInterrupt", thread_id, thread_name)
logger.warning("Stack trace for thread %s:", thread_id)
traceback.print_exc()
_thread.interrupt_main()
raise
except (OSError, subprocess.SubprocessError, RuntimeError) as e:
# Surface unexpected errors and keep behavior consistent
logger.warning("Watcher thread error in %s: %s", thread_name, e)
traceback.print_exc()
@property
def thread(self) -> threading.Thread | None:
return self._thread
"""Private subprocess runner module.
This module contains the private implementation of subprocess.run() replacement
using RunningProcess as the backend.
"""
import subprocess
from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from running_process.running_process import ProcessInfo
def execute_subprocess_run(
command: str | list[str],
cwd: Path | None,
check: bool,
timeout: int,
on_timeout: Callable[["ProcessInfo"], None] | None = None,
) -> subprocess.CompletedProcess[str]:
"""
Execute a command with robust stdout handling, emulating subprocess.run().
Uses RunningProcess as the backend to provide:
- Continuous stdout streaming to prevent pipe blocking
- Merged stderr into stdout for unified output
- Timeout protection with optional stack trace dumping
- Standard subprocess.CompletedProcess return value
Args:
command: Command to execute as string or list of arguments.
cwd: Working directory for command execution. Required parameter.
check: If True, raise CalledProcessError for non-zero exit codes.
timeout: Maximum execution time in seconds.
on_timeout: Callback function executed when process times out.
Returns:
CompletedProcess with combined stdout and process return code.
stderr field is None since it's merged into stdout.
Raises:
RuntimeError: If process times out (wraps TimeoutError).
CalledProcessError: If check=True and process exits with non-zero code.
"""
# Import here to avoid circular imports during module load
from running_process.running_process import RunningProcess # noqa: PLC0415
# Use RunningProcess for robust stdout pumping with merged stderr
proc = RunningProcess(
command=command,
cwd=cwd,
check=False,
auto_run=True,
timeout=timeout,
on_timeout=on_timeout,
on_complete=None,
output_formatter=None,
)
try:
return_code: int = proc.wait()
except KeyboardInterrupt:
# Propagate interrupt behavior consistent with subprocess.run
raise
except TimeoutError as e:
# Align with subprocess.TimeoutExpired semantics by raising a CalledProcessError-like
# error with available output. Using TimeoutError here is consistent with internal RP.
error_message = f"CRITICAL: Process timed out after {timeout} seconds: {command}"
raise RuntimeError(error_message) from e
combined_stdout: str = proc.stdout
# Construct CompletedProcess (stderr is merged into stdout by design)
completed = subprocess.CompletedProcess(
args=command,
returncode=return_code,
stdout=combined_stdout,
stderr=None,
)
if check and return_code != 0:
# Raise the standard exception with captured output
raise subprocess.CalledProcessError(
returncode=return_code,
cmd=command,
output=combined_stdout,
stderr=None,
)
return completed
+31
-15
Metadata-Version: 2.4
Name: running_process
Version: 1.0.0
Version: 1.0.1
Summary: A modern subprocess.Popen wrapper with improved process management

@@ -108,7 +108,6 @@ Project-URL: Homepage, https://github.com/yourusername/running-process

```python
from running_process import RunningProcess
from running_process.output_formatter import create_sketch_path_formatter
from running_process import RunningProcess, TimeDeltaFormatter
# Use built-in path formatter
formatter = create_sketch_path_formatter("MyProject")
# Use built-in time delta formatter
formatter = TimeDeltaFormatter()
process = RunningProcess(

@@ -151,13 +150,8 @@ ["gcc", "-v", "main.c"],

```bash
pip install running-process
pip install running_process
```
### Optional Dependencies
### Dependencies
For process tree termination support:
```bash
pip install running-process[psutil]
# or
pip install psutil
```
This package includes `psutil` as a required dependency for process tree management functionality.

@@ -171,4 +165,4 @@ ## Architecture

- **RunningProcessManager**: Thread-safe singleton registry for tracking active processes
- **OutputFormatter**: Protocol for transforming process output with built-in implementations
- **process_utils**: Utilities for process tree operations (requires optional psutil dependency)
- **OutputFormatter**: Protocol for transforming process output (with NullOutputFormatter and TimeDeltaFormatter implementations)
- **process_utils**: Utilities for process tree operations

@@ -246,2 +240,20 @@ ## Development

### ProcessOutputReader
Internal threaded reader that drains process stdout/stderr:
```python
class ProcessOutputReader:
def __init__(
self,
proc: subprocess.Popen[Any],
shutdown: threading.Event,
output_formatter: OutputFormatter | None,
on_output: Callable[[str | EndOfStream], None],
on_end: Callable[[], None],
) -> None: ...
def run(self) -> None: ... # Thread entry point
```
### OutputFormatter Protocol

@@ -256,2 +268,6 @@

Built-in implementations:
- `NullOutputFormatter`: No-op formatter (default)
- `TimeDeltaFormatter`: Adds elapsed time prefix to each line
## License

@@ -258,0 +274,0 @@

+9
-5
running_process/__init__.py,sha256=gTVGqReRvViI2XHrk9ahRSkAxm_bmeFgu3EvXuFgQAE,670
running_process/line_iterator.py,sha256=lbsjssk0yKjCRCb2knOuU5C2hbvdqKEgDjv2hPwIqrQ,1544
running_process/output_formatter.py,sha256=ie8gRQSZRGpBcNuZt5ns-yK6DDjO_SzAsiQAqLo71D0,1917
running_process/process_output_reader.py,sha256=k7tRcqOFdl6H_KAPlAHyVLhMMfvbiXInqLjUQTejEzk,6022
running_process/process_utils.py,sha256=fNCdxfebjzhK-4t6K6x6Vhil7luIHpVQIVZJAQa98u8,2255
running_process/running_process.py,sha256=l7tT5eO9y0wS_nGy-M1gDgeKDV6Ay8BeYXFRW1Dj2fA,44882
running_process/process_watcher.py,sha256=i8YpgJlQVvY0NbuEbs2N39HP9kITb0uyd0I2i29u6sg,3110
running_process/running_process.py,sha256=1CsxvKZgzoA1aFdexU-l6cinrBZ_YygIE65g4kb_68I,36444
running_process/running_process_manager.py,sha256=xx_kmXw9j-hjOI_pRrlTKdc9yekKBVQHhUq1eDpBGJU,2467
running_process/subprocess_runner.py,sha256=O_Wwe2vrXWnYkiNEpauNLre3PqNO3uDogrRz0JCkH7M,3223
running_process/assets/example.txt,sha256=lTBovRjiz0_TgtAtbA1C5hNi2ffbqnNPqkKg6UiKCT8,54
running_process-1.0.0.dist-info/METADATA,sha256=L1KTugkLpEmU-f2V44h2RS93SrTzChd0id-lCEvw6KI,7969
running_process-1.0.0.dist-info/WHEEL,sha256=qtCwoSJWgHk21S1Kb4ihdzI2rlJ1ZKaIurTj_ngOhyQ,87
running_process-1.0.0.dist-info/licenses/LICENSE,sha256=b6pOoifSXiUaz_lDS84vWlG3fr4yUKwB8fzkrH9R8bQ,1064
running_process-1.0.0.dist-info/RECORD,,
running_process-1.0.1.dist-info/METADATA,sha256=qhA7xeu5DqbT76ZYNDuydLta5PmK2yYhh76scQORh-M,8465
running_process-1.0.1.dist-info/WHEEL,sha256=qtCwoSJWgHk21S1Kb4ihdzI2rlJ1ZKaIurTj_ngOhyQ,87
running_process-1.0.1.dist-info/licenses/LICENSE,sha256=b6pOoifSXiUaz_lDS84vWlG3fr4yUKwB8fzkrH9R8bQ,1064
running_process-1.0.1.dist-info/RECORD,,

@@ -14,3 +14,3 @@ """Enhanced subprocess execution with timeout protection, output streaming, and process tree management.

process = RunningProcess(["ls", "-la"], auto_run=False)
process.run()
process.start()
exit_code = process.wait()

@@ -106,6 +106,4 @@ ```

import time
import traceback
import warnings
from collections.abc import Callable, Iterator
from contextlib import AbstractContextManager
from collections.abc import Callable
from dataclasses import dataclass

@@ -116,5 +114,9 @@ from pathlib import Path

from running_process.line_iterator import _RunningProcessLineIterator
from running_process.output_formatter import NullOutputFormatter, OutputFormatter
from running_process.process_output_reader import EndOfStream, ProcessOutputReader
from running_process.process_utils import kill_process_tree
from running_process.process_watcher import ProcessWatcher
from running_process.running_process_manager import RunningProcessManagerSingleton
from running_process.subprocess_runner import execute_subprocess_run

@@ -166,244 +168,5 @@ # Create module-level logger

class EndOfStream:
"""Sentinel used to indicate end-of-stream from the reader."""
# Console UTF-8 configuration is now handled globally in ci/__init__.py
class ProcessOutputReader:
"""Dedicated reader that drains a process's stdout and enqueues lines.
This keeps the stdout pipe drained to prevent blocking and forwards
transformed, non-empty lines to the provided output queue. It also invokes
lifecycle callbacks for timing/unregister behaviors.
"""
def __init__(
self,
proc: subprocess.Popen[Any],
shutdown: threading.Event,
output_formatter: OutputFormatter | None,
on_output: Callable[[str | EndOfStream], None],
on_end: Callable[[], None],
) -> None:
output_formatter = output_formatter or NullOutputFormatter()
self._proc = proc
self._shutdown = shutdown
self._output_formatter = output_formatter
self._on_output = on_output
self._on_end = on_end
self.last_stdout_ts: float | None = None
self._eos_emitted: bool = False
def _emit_eos_once(self) -> None:
"""Ensure EndOfStream is only forwarded a single time."""
if not self._eos_emitted:
self._eos_emitted = True
self._on_output(EndOfStream())
def _initialize_formatter(self) -> None:
"""Initialize the output formatter."""
try:
self._output_formatter.begin()
except (AttributeError, TypeError, ValueError, RuntimeError) as e:
formatter_error_msg = f"Output formatter begin() failed: {e}"
warnings.warn(formatter_error_msg, stacklevel=2)
def _process_stdout_lines(self) -> None:
"""Process stdout lines and forward them to output."""
assert self._proc.stdout is not None
for line in self._proc.stdout:
self.last_stdout_ts = time.time()
if self._shutdown.is_set():
break
line_stripped = line.rstrip()
if not line_stripped:
continue
transformed_line = self._output_formatter.transform(line_stripped)
self._on_output(transformed_line)
def _handle_keyboard_interrupt(self) -> None:
"""Handle KeyboardInterrupt in reader thread."""
# Per project rules, handle interrupts in threads explicitly
thread_id = threading.current_thread().ident
thread_name = threading.current_thread().name
logger.warning("Thread %s (%s) caught KeyboardInterrupt", thread_id, thread_name)
logger.warning("Stack trace for thread %s:", thread_id)
traceback.print_exc()
# Try to ensure child process is terminated promptly
try:
self._proc.kill()
except (ProcessLookupError, PermissionError, OSError) as kill_error:
logger.warning("Failed to kill process: %s", kill_error)
# Propagate to main thread and re-raise
_thread.interrupt_main()
# EOF
self._emit_eos_once()
def _handle_io_error(self, e: ValueError | OSError) -> None:
"""Handle IO errors during stdout reading."""
# Normal shutdown scenarios include closed file descriptors.
error_str = str(e)
if any(msg in error_str for msg in ["closed file", "Bad file descriptor"]):
closed_file_msg = f"Output reader encountered closed file: {e}"
warnings.warn(closed_file_msg, stacklevel=2)
else:
logger.warning("Output reader encountered error: %s", e)
def _cleanup_stdout(self) -> None:
"""Close stdout stream safely."""
if self._proc.stdout and not self._proc.stdout.closed:
try:
self._proc.stdout.close()
except (ValueError, OSError) as err:
reader_error_msg = f"Output reader encountered error: {err}"
warnings.warn(reader_error_msg, stacklevel=2)
def _finalize_formatter(self) -> None:
"""Finalize the output formatter."""
try:
self._output_formatter.end()
except (AttributeError, TypeError, ValueError, RuntimeError) as e:
formatter_end_error_msg = f"Output formatter end() failed: {e}"
warnings.warn(formatter_end_error_msg, stacklevel=2)
def _run_with_error_handling(self) -> None:
"""Run stdout processing with error handling."""
try:
self._process_stdout_lines()
except KeyboardInterrupt:
self._handle_keyboard_interrupt()
raise
except (ValueError, OSError) as e:
self._handle_io_error(e)
finally:
# Signal end-of-stream to consumers exactly once
self._emit_eos_once()
def _perform_final_cleanup(self) -> None:
"""Perform final cleanup operations."""
# Cleanup stream and invoke completion callback
self._cleanup_stdout()
# Notify parent for timing/unregistration
try:
self._on_end()
finally:
# End formatter lifecycle within the reader context
self._finalize_formatter()
def run(self) -> None:
"""Continuously read stdout lines and forward them until EOF or shutdown."""
try:
# Begin formatter lifecycle within the reader context
self._initialize_formatter()
self._run_with_error_handling()
finally:
self._perform_final_cleanup()
class ProcessWatcher:
"""Background watcher that polls a process until it terminates."""
def __init__(self, running_process: "RunningProcess") -> None:
self._rp = running_process
self._thread: threading.Thread | None = None
def start(self) -> None:
name: str = "RPWatcher"
with contextlib.suppress(AttributeError, TypeError):
if self._rp.proc is not None:
name = f"RPWatcher-{self._rp.proc.pid}"
self._thread = threading.Thread(target=self._run, name=name, daemon=True)
self._thread.start()
def _run(self) -> None:
thread_id = threading.current_thread().ident
thread_name = threading.current_thread().name
try:
while not self._rp.shutdown.is_set():
# Enforce per-process timeout independently of wait()
if (
self._rp.timeout is not None
and self._rp.start_time is not None
and (time.time() - self._rp.start_time) > self._rp.timeout
):
logger.warning(
"Process timeout after %s seconds (watcher), killing: %s",
self._rp.timeout,
self._rp.command,
)
# Execute user-provided timeout callback if available
if self._rp.on_timeout is not None:
try:
process_info = self._rp._create_process_info() # noqa: SLF001
self._rp.on_timeout(process_info)
except (AttributeError, TypeError, ValueError, RuntimeError) as e:
logger.warning("Watcher timeout callback failed: %s", e)
self._rp.kill()
break
rc: int | None = self._rp.poll()
if rc is not None:
break
time.sleep(0.1)
except KeyboardInterrupt:
logger.warning("Thread %s (%s) caught KeyboardInterrupt", thread_id, thread_name)
logger.warning("Stack trace for thread %s:", thread_id)
traceback.print_exc()
_thread.interrupt_main()
raise
except (OSError, subprocess.SubprocessError, RuntimeError) as e:
# Surface unexpected errors and keep behavior consistent
logger.warning("Watcher thread error in %s: %s", thread_name, e)
traceback.print_exc()
@property
def thread(self) -> threading.Thread | None:
return self._thread
class _RunningProcessLineIterator(AbstractContextManager[Iterator[str]], Iterator[str]):
"""Context-managed iterator over a RunningProcess's output lines.
Yields only strings (never None). Stops on EndOfStream or when a per-line
timeout elapses.
"""
def __init__(self, rp: "RunningProcess", timeout: float | None) -> None:
self._rp = rp
self._timeout = timeout
# Context manager protocol
def __enter__(self) -> "_RunningProcessLineIterator":
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: Any | None,
) -> bool:
# Do not suppress exceptions
return False
# Iterator protocol
def __iter__(self) -> Iterator[str]:
return self
def __next__(self) -> str:
next_item: str | EndOfStream = self._rp.get_next_line(timeout=self._timeout)
if isinstance(next_item, EndOfStream):
raise StopIteration
# Must be a string by contract
return next_item
class RunningProcess:

@@ -465,2 +228,13 @@ """

shell = any(part in shell_meta for part in command)
# Validate shell metacharacters when shell=False
if shell is False and isinstance(command, list):
shell_meta = {"&&", "||", "|", ";", ">", "<", "2>", "&"}
found_meta = [part for part in command if part in shell_meta]
if found_meta:
error_message = (
f"Shell metacharacters {found_meta} found in command but shell=False. "
f"Either set shell=True or remove shell metacharacters from the command."
)
raise ValueError(error_message)
self.command = command

@@ -473,4 +247,3 @@ self.shell: bool = shell

self.check = check
# Force auto_run to False if NO_PARALLEL is set
self.auto_run = False if os.environ.get("NO_PARALLEL") else auto_run
self.auto_run = auto_run
self.timeout = timeout

@@ -489,3 +262,3 @@ self.on_timeout = on_timeout

if auto_run:
self.run()
self.start()

@@ -658,3 +431,3 @@ def get_command_str(self) -> str:

def run(self) -> None:
def start(self) -> None:
"""

@@ -1106,3 +879,38 @@ Execute the command and stream its output to the queue.

@staticmethod
def run(
command: str | list[str],
cwd: Path | None,
check: bool,
timeout: int,
on_timeout: Callable[[ProcessInfo], None] | None = None,
) -> subprocess.CompletedProcess[str]:
"""Public static accessor for subprocess.run() replacement.
Execute a command with robust stdout handling, emulating subprocess.run().
Uses RunningProcess as the backend to provide:
- Continuous stdout streaming to prevent pipe blocking
- Merged stderr into stdout for unified output
- Timeout protection with optional stack trace dumping
- Standard subprocess.CompletedProcess return value
Args:
command: Command to execute as string or list of arguments.
cwd: Working directory for command execution. Required parameter.
check: If True, raise CalledProcessError for non-zero exit codes.
timeout: Maximum execution time in seconds.
on_timeout: Callback function executed when process times out.
Returns:
CompletedProcess with combined stdout and process return code.
stderr field is None since it's merged into stdout.
Raises:
RuntimeError: If process times out (wraps TimeoutError).
CalledProcessError: If check=True and process exits with non-zero code.
"""
return execute_subprocess_run(command, cwd, check, timeout, on_timeout)
# NOTE: RunningProcessManager and its singleton live in running_process_manager.py

@@ -1118,10 +926,5 @@

) -> subprocess.CompletedProcess[str]:
"""
Execute a command with robust stdout handling, emulating subprocess.run().
"""Execute a command with robust stdout handling, emulating subprocess.run().
Uses RunningProcess as the backend to provide:
- Continuous stdout streaming to prevent pipe blocking
- Merged stderr into stdout for unified output
- Timeout protection with optional stack trace dumping
- Standard subprocess.CompletedProcess return value
This is a backward compatibility wrapper for the static method RunningProcess.run().

@@ -1143,44 +946,2 @@ Args:

"""
# Use RunningProcess for robust stdout pumping with merged stderr
proc = RunningProcess(
command=command,
cwd=cwd,
check=False,
auto_run=True,
timeout=timeout,
on_timeout=on_timeout,
on_complete=None,
output_formatter=None,
)
try:
return_code: int = proc.wait()
except KeyboardInterrupt:
# Propagate interrupt behavior consistent with subprocess.run
raise
except TimeoutError as e:
# Align with subprocess.TimeoutExpired semantics by raising a CalledProcessError-like
# error with available output. Using TimeoutError here is consistent with internal RP.
error_message = f"CRITICAL: Process timed out after {timeout} seconds: {command}"
raise RuntimeError(error_message) from e
combined_stdout: str = proc.stdout
# Construct CompletedProcess (stderr is merged into stdout by design)
completed = subprocess.CompletedProcess(
args=command,
returncode=return_code,
stdout=combined_stdout,
stderr=None,
)
if check and return_code != 0:
# Raise the standard exception with captured output
raise subprocess.CalledProcessError(
returncode=return_code,
cmd=command,
output=combined_stdout,
stderr=None,
)
return completed
return RunningProcess.run(command, cwd, check, timeout, on_timeout)