Unverified commit e55ebec5, authored by Keiven C and committed by GitHub
Browse files

chore: rename terminate_existing to terminate_all_matching_process_names (#5923)


Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent 4d9e64a2
...@@ -54,7 +54,7 @@ class EchoTensorWorkerProcess(ManagedProcess): ...@@ -54,7 +54,7 @@ class EchoTensorWorkerProcess(ManagedProcess):
timeout=300, timeout=300,
display_output=True, display_output=True,
log_dir=log_dir, log_dir=log_dir,
terminate_existing=False, terminate_all_matching_process_names=False,
) )
......
...@@ -107,7 +107,7 @@ class VllmPromptEmbedsWorkerProcess(ManagedProcess): ...@@ -107,7 +107,7 @@ class VllmPromptEmbedsWorkerProcess(ManagedProcess):
], ],
timeout=500, timeout=500,
display_output=True, display_output=True,
terminate_existing=False, terminate_all_matching_process_names=False,
stragglers=["VLLM::EngineCore"], stragglers=["VLLM::EngineCore"],
straggler_commands=["-m dynamo.vllm"], straggler_commands=["-m dynamo.vllm"],
log_dir=log_dir, log_dir=log_dir,
...@@ -151,7 +151,7 @@ def start_services( ...@@ -151,7 +151,7 @@ def start_services(
with DynamoFrontendProcess( with DynamoFrontendProcess(
request, request,
frontend_port=frontend_port, frontend_port=frontend_port,
terminate_existing=False, terminate_all_matching_process_names=False,
extra_args=["--store-kv", "file", "--request-plane", "tcp"], extra_args=["--store-kv", "file", "--request-plane", "tcp"],
): ):
logger.info("Frontend started for prompt embeds tests") logger.info("Frontend started for prompt embeds tests")
......
...@@ -118,7 +118,7 @@ class VllmWorkerProcess(ManagedProcess): ...@@ -118,7 +118,7 @@ class VllmWorkerProcess(ManagedProcess):
], ],
timeout=500, timeout=500,
display_output=True, display_output=True,
terminate_existing=False, terminate_all_matching_process_names=False,
stragglers=["VLLM::EngineCore"], stragglers=["VLLM::EngineCore"],
straggler_commands=["-m dynamo.vllm"], straggler_commands=["-m dynamo.vllm"],
log_dir=log_dir, log_dir=log_dir,
...@@ -180,7 +180,7 @@ def start_services( ...@@ -180,7 +180,7 @@ def start_services(
# If the frontend hits a Rust panic, enabling backtraces makes failures diagnosable # If the frontend hits a Rust panic, enabling backtraces makes failures diagnosable
# from CI logs without needing to repro locally. # from CI logs without needing to repro locally.
# extra_env={"RUST_BACKTRACE": "1", "TOKIO_BACKTRACE": "1"}, # extra_env={"RUST_BACKTRACE": "1", "TOKIO_BACKTRACE": "1"},
terminate_existing=False, terminate_all_matching_process_names=False,
): ):
logger.info("Frontend started for tests") logger.info("Frontend started for tests")
with VllmWorkerProcess( with VllmWorkerProcess(
......
...@@ -635,7 +635,7 @@ def llm_server_kvbm(request, runtime_services_dynamic_ports): ...@@ -635,7 +635,7 @@ def llm_server_kvbm(request, runtime_services_dynamic_ports):
except Exception: except Exception:
pass # Continue cleanup even if one process fails pass # Continue cleanup even if one process fails
# SAFETY: Do NOT use terminate_existing=True or stragglers=["vllm"] here. # SAFETY: Do NOT use terminate_all_matching_process_names=True or stragglers=["vllm"] here.
# Those kill ALL vLLM processes system-wide, breaking parallel test execution. # Those kill ALL vLLM processes system-wide, breaking parallel test execution.
# Port-based cleanup above is targeted and xdist-safe. # Port-based cleanup above is targeted and xdist-safe.
with ManagedProcess( with ManagedProcess(
...@@ -644,7 +644,7 @@ def llm_server_kvbm(request, runtime_services_dynamic_ports): ...@@ -644,7 +644,7 @@ def llm_server_kvbm(request, runtime_services_dynamic_ports):
health_check_ports=[port, metrics_port], # vLLM server + KVBM metrics health_check_ports=[port, metrics_port], # vLLM server + KVBM metrics
timeout=timeout, timeout=timeout,
display_output=True, display_output=True,
terminate_existing=False, # Port-based cleanup done above instead terminate_all_matching_process_names=False, # Port-based cleanup done above instead
stragglers=[], # Empty - we handle cleanup manually per port stragglers=[], # Empty - we handle cleanup manually per port
straggler_commands=[], # Empty - we handle cleanup manually per port straggler_commands=[], # Empty - we handle cleanup manually per port
log_dir=log_dir, log_dir=log_dir,
......
...@@ -312,7 +312,7 @@ def frontend_server(test_directory, runtime_services): ...@@ -312,7 +312,7 @@ def frontend_server(test_directory, runtime_services):
working_dir=str(test_directory), working_dir=str(test_directory),
display_output=False, display_output=False,
log_dir=str(frontend_log_dir), # Absolute path keeps logs in test directory log_dir=str(frontend_log_dir), # Absolute path keeps logs in test directory
terminate_existing=False, # Don't kill nats-server/etcd started by runtime_services terminate_all_matching_process_names=False, # Don't kill nats-server/etcd started by runtime_services
) as frontend_process: ) as frontend_process:
# Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory) # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
log_file = Path(frontend_process._log_path) log_file = Path(frontend_process._log_path)
...@@ -405,7 +405,7 @@ def llm_worker(frontend_server, test_directory, runtime_services, engine_type): ...@@ -405,7 +405,7 @@ def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
working_dir=str(test_directory), working_dir=str(test_directory),
display_output=False, display_output=False,
log_dir=str(worker_log_dir), # Absolute path keeps logs in test directory log_dir=str(worker_log_dir), # Absolute path keeps logs in test directory
terminate_existing=False, terminate_all_matching_process_names=False,
) as worker_process: ) as worker_process:
# Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory) # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
log_file = Path(worker_process._log_path) log_file = Path(worker_process._log_path)
...@@ -743,7 +743,7 @@ class TestConsolidatorRouterE2E: ...@@ -743,7 +743,7 @@ class TestConsolidatorRouterE2E:
working_dir=str(test_directory), working_dir=str(test_directory),
display_output=False, display_output=False,
log_dir=str(frontend_log_dir), # Absolute path keeps logs in test directory log_dir=str(frontend_log_dir), # Absolute path keeps logs in test directory
terminate_existing=False, # Don't kill nats-server/etcd started by runtime_services terminate_all_matching_process_names=False, # Don't kill nats-server/etcd started by runtime_services
) as _frontend_process: ) as _frontend_process:
# Get actual log file path from ManagedProcess # Get actual log file path from ManagedProcess
frontend_log = Path(_frontend_process._log_path) frontend_log = Path(_frontend_process._log_path)
...@@ -829,7 +829,7 @@ class TestConsolidatorRouterE2E: ...@@ -829,7 +829,7 @@ class TestConsolidatorRouterE2E:
log_dir=str( log_dir=str(
worker_log_dir worker_log_dir
), # Absolute path keeps logs in test directory ), # Absolute path keeps logs in test directory
terminate_existing=False, terminate_all_matching_process_names=False,
) as _worker_process: ) as _worker_process:
# Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory) # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
worker_log = Path(_worker_process._log_path) worker_log = Path(_worker_process._log_path)
......
...@@ -88,7 +88,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -88,7 +88,7 @@ class DynamoWorkerProcess(ManagedProcess):
], ],
timeout=300, timeout=300,
display_output=True, display_output=True,
terminate_existing=False, terminate_all_matching_process_names=False,
log_dir=log_dir, log_dir=log_dir,
) )
......
...@@ -94,7 +94,7 @@ class KVRouterProcess(ManagedProcess): ...@@ -94,7 +94,7 @@ class KVRouterProcess(ManagedProcess):
(f"http://localhost:{frontend_port}/v1/models", self._check_ready) (f"http://localhost:{frontend_port}/v1/models", self._check_ready)
], ],
log_dir=request.node.name, log_dir=request.node.name,
terminate_existing=False, terminate_all_matching_process_names=False,
) )
self.port = frontend_port self.port = frontend_port
...@@ -1222,7 +1222,7 @@ def _test_router_overload_503( ...@@ -1222,7 +1222,7 @@ def _test_router_overload_503(
) )
], ],
log_dir=request.node.name, log_dir=request.node.name,
terminate_existing=False, terminate_all_matching_process_names=False,
) )
kv_router.__enter__() kv_router.__enter__()
......
...@@ -208,7 +208,7 @@ class MockerProcess: ...@@ -208,7 +208,7 @@ class MockerProcess:
health_check_ports=[], health_check_ports=[],
health_check_urls=[], health_check_urls=[],
log_dir=request.node.name, log_dir=request.node.name,
terminate_existing=False, terminate_all_matching_process_names=False,
) )
logger.info( logger.info(
f"Created mocker process with {num_mockers} worker(s), endpoint: {self.endpoint}" f"Created mocker process with {num_mockers} worker(s), endpoint: {self.endpoint}"
...@@ -294,7 +294,7 @@ class DisaggMockerProcess: ...@@ -294,7 +294,7 @@ class DisaggMockerProcess:
health_check_ports=[], health_check_ports=[],
health_check_urls=[], health_check_urls=[],
log_dir=request.node.name, log_dir=request.node.name,
terminate_existing=False, terminate_all_matching_process_names=False,
) )
logger.info( logger.info(
f"Created {worker_type} mocker process with {num_mockers} worker(s), " f"Created {worker_type} mocker process with {num_mockers} worker(s), "
......
...@@ -209,7 +209,7 @@ class SGLangProcess: ...@@ -209,7 +209,7 @@ class SGLangProcess:
health_check_ports=[], health_check_ports=[],
health_check_urls=[], health_check_urls=[],
log_dir=request.node.name, log_dir=request.node.name,
terminate_existing=False, terminate_all_matching_process_names=False,
) )
self.worker_processes.append(process) self.worker_processes.append(process)
if data_parallel_size is not None: if data_parallel_size is not None:
...@@ -251,7 +251,7 @@ class SGLangProcess: ...@@ -251,7 +251,7 @@ class SGLangProcess:
if process.data_dir: if process.data_dir:
process._remove_directory(process.data_dir) process._remove_directory(process.data_dir)
process._terminate_existing() process._terminate_all_matching_process_names()
logger.info( logger.info(
f"[SGLangProcess] Launching process {i} (pid will be assigned)..." f"[SGLangProcess] Launching process {i} (pid will be assigned)..."
) )
......
...@@ -189,7 +189,7 @@ class TRTLLMProcess: ...@@ -189,7 +189,7 @@ class TRTLLMProcess:
health_check_ports=[], health_check_ports=[],
health_check_urls=[], health_check_urls=[],
log_dir=request.node.name, log_dir=request.node.name,
terminate_existing=False, terminate_all_matching_process_names=False,
) )
self.worker_processes.append(process) self.worker_processes.append(process)
logger.info( logger.info(
...@@ -224,7 +224,7 @@ class TRTLLMProcess: ...@@ -224,7 +224,7 @@ class TRTLLMProcess:
if process.data_dir: if process.data_dir:
process._remove_directory(process.data_dir) process._remove_directory(process.data_dir)
process._terminate_existing() process._terminate_all_matching_process_names()
logger.info( logger.info(
f"[TRTLLMProcess] Launching process {i} (pid will be assigned)..." f"[TRTLLMProcess] Launching process {i} (pid will be assigned)..."
) )
......
...@@ -226,7 +226,7 @@ class VLLMProcess: ...@@ -226,7 +226,7 @@ class VLLMProcess:
health_check_ports=[], health_check_ports=[],
health_check_urls=[], health_check_urls=[],
log_dir=request.node.name, log_dir=request.node.name,
terminate_existing=False, terminate_all_matching_process_names=False,
) )
self.worker_processes.append(process) self.worker_processes.append(process)
if data_parallel_size is not None: if data_parallel_size is not None:
...@@ -268,7 +268,7 @@ class VLLMProcess: ...@@ -268,7 +268,7 @@ class VLLMProcess:
if process.data_dir: if process.data_dir:
process._remove_directory(process.data_dir) process._remove_directory(process.data_dir)
process._terminate_existing() process._terminate_all_matching_process_names()
logger.info( logger.info(
f"[VLLMProcess] Launching process {i} (pid will be assigned)..." f"[VLLMProcess] Launching process {i} (pid will be assigned)..."
) )
......
...@@ -187,7 +187,7 @@ class EngineProcess(ManagedProcess): ...@@ -187,7 +187,7 @@ class EngineProcess(ManagedProcess):
), ),
], ],
delayed_start=config.delayed_start, delayed_start=config.delayed_start,
terminate_existing=False, terminate_all_matching_process_names=False,
stragglers=config.stragglers, stragglers=config.stragglers,
log_dir=request.node.name, log_dir=request.node.name,
) )
......
...@@ -21,13 +21,20 @@ from tests.utils.port_utils import allocate_port, deallocate_port ...@@ -21,13 +21,20 @@ from tests.utils.port_utils import allocate_port, deallocate_port
def terminate_process(process, logger=logging.getLogger(), immediate_kill=False): def terminate_process(process, logger=logging.getLogger(), immediate_kill=False):
"""Terminate a single process.
Kill Strategy:
- immediate_kill=False: Send SIGTERM (graceful, signal 15)
- immediate_kill=True: Send SIGKILL (force, signal 9, aka kill -9)
"""
try: try:
logger.info("Terminating PID: %s name: %s", process.pid, process.name()) logger.info("Terminating PID: %s name: %s", process.pid, process.name())
if immediate_kill: if immediate_kill:
logger.info("Sending Kill: %s %s", process.pid, process.name()) logger.info("Sending SIGKILL (kill -9): %s %s", process.pid, process.name())
process.kill() process.kill() # SIGKILL (9)
else: else:
process.terminate() logger.info("Sending SIGTERM (kill): %s %s", process.pid, process.name())
process.terminate() # SIGTERM (15)
except psutil.AccessDenied: except psutil.AccessDenied:
logger.warning("Access denied for PID %s", process.pid) logger.warning("Access denied for PID %s", process.pid)
except psutil.NoSuchProcess: except psutil.NoSuchProcess:
...@@ -37,6 +44,24 @@ def terminate_process(process, logger=logging.getLogger(), immediate_kill=False) ...@@ -37,6 +44,24 @@ def terminate_process(process, logger=logging.getLogger(), immediate_kill=False)
def terminate_process_tree( def terminate_process_tree(
pid, logger=logging.getLogger(), immediate_kill=False, timeout=10 pid, logger=logging.getLogger(), immediate_kill=False, timeout=10
): ):
"""Terminate a process and all its children.
Kill Sequence:
==============
1. Snapshot all children (recursive)
2. Send SIGTERM to parent IMMEDIATELY (no delay)
3. Wait up to `timeout` seconds (default 10s) for parent to exit
4. If parent still alive after timeout: Send SIGKILL to parent (force)
5. Send SIGTERM to all children IMMEDIATELY
6. Wait up to `timeout` seconds for all processes to exit
7. If any still alive after timeout: Send SIGKILL to remaining (force)
Timeout Parameter:
- Controls how long to WAIT AFTER SIGTERM before escalating to SIGKILL
- NOT a delay before sending SIGTERM (SIGTERM is sent immediately)
Summary: SIGTERM (immediate) → wait → SIGKILL if still alive
"""
try: try:
parent = psutil.Process(pid) parent = psutil.Process(pid)
except psutil.NoSuchProcess: except psutil.NoSuchProcess:
...@@ -71,6 +96,32 @@ def terminate_process_tree( ...@@ -71,6 +96,32 @@ def terminate_process_tree(
@dataclass @dataclass
class ManagedProcess: class ManagedProcess:
"""Manages a subprocess with health checks and automatic cleanup.
Parallel Execution Safety:
-------------------------
For pytest-xdist or parallel test execution, use terminate_all_matching_process_names=False:
# ❌ NOT SAFE for parallel tests (kills ALL vllm processes system-wide):
ManagedProcess(
command=["vllm", "serve", "--port", "8000"],
terminate_all_matching_process_names=True, # Kills all vllm, including other tests!
stragglers=["vllm"],
)
# ✅ SAFE for parallel tests (only kills what we launch):
ManagedProcess(
command=["vllm", "serve", "--port", "8000"],
terminate_all_matching_process_names=False, # Don't kill other processes
)
With terminate_all_matching_process_names=False and dynamic/randomized ports:
- Each test gets unique ports, so zombies from previous runs won't conflict
- ManagedProcess only terminates the process it launched (via self.proc.pid)
- No risk of killing other tests' processes or developer's manual processes
- Pytest will clean up any remaining processes when the test process exits
"""
command: List[str] command: List[str]
env: Optional[dict] = None env: Optional[dict] = None
health_check_ports: List[int] = field(default_factory=list) health_check_ports: List[int] = field(default_factory=list)
...@@ -81,7 +132,11 @@ class ManagedProcess: ...@@ -81,7 +132,11 @@ class ManagedProcess:
working_dir: Optional[str] = None working_dir: Optional[str] = None
display_output: bool = False display_output: bool = False
data_dir: Optional[str] = None data_dir: Optional[str] = None
terminate_existing: bool = True # WARNING: terminate_all_matching_process_names=True is NOT safe for pytest-xdist
# Kills ALL processes system-wide with matching name (other tests, dev processes, etc.)
# Use False for parallel tests with dynamic ports to only kill launched process.
# TODO: Change default to False once all tests use dynamic ports
terminate_all_matching_process_names: bool = True
stragglers: List[str] = field(default_factory=list) stragglers: List[str] = field(default_factory=list)
straggler_commands: List[str] = field(default_factory=list) straggler_commands: List[str] = field(default_factory=list)
log_dir: str = os.getcwd() log_dir: str = os.getcwd()
...@@ -141,7 +196,7 @@ class ManagedProcess: ...@@ -141,7 +196,7 @@ class ManagedProcess:
if self.data_dir: if self.data_dir:
self._remove_directory(self.data_dir) self._remove_directory(self.data_dir)
self._terminate_existing() self._terminate_all_matching_process_names() # Name-based cleanup (NOT xdist safe)
self._start_process() self._start_process()
time.sleep(self.delayed_start) time.sleep(self.delayed_start)
elapsed = self._check_ports(self.timeout) elapsed = self._check_ports(self.timeout)
...@@ -206,6 +261,29 @@ class ManagedProcess: ...@@ -206,6 +261,29 @@ class ManagedProcess:
self._logger.error("Error during straggler cleanup: %s", e) self._logger.error("Error during straggler cleanup: %s", e)
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
"""Cleanup: Terminate launched processes.
Termination Strategy (Graceful → Escalate to Force):
=====================================================
1. Send SIGTERM to process group immediately (no delay before SIGTERM)
2. Wait up to 2s (poll every 0.1s) for processes to exit
3. If still alive after 2s: Send SIGKILL (force kill)
4. Terminate individual processes (self.proc, tee, sed):
- Send SIGTERM immediately (no delay)
- Wait up to 10s for exit
- If still alive after 10s: Send SIGKILL (force kill)
5. Clean up straggler processes (if configured)
Signal Details:
- SIGTERM (15): Graceful - allows cleanup handlers to run
- SIGKILL (9): Force kill - immediate, cannot be caught
Timeouts:
- The 2s (process group) and 10s (individual process) waits above are how long to WAIT AFTER SIGTERM before escalating to SIGKILL
- They are NOT a delay before sending SIGTERM (SIGTERM is sent immediately)
This ALWAYS runs regardless of terminate_all_matching_process_names setting.
"""
try: try:
self._terminate_process_group() self._terminate_process_group()
...@@ -295,14 +373,25 @@ class ManagedProcess: ...@@ -295,14 +373,25 @@ class ManagedProcess:
def _terminate_process_group(self, timeout: float = 2.0): def _terminate_process_group(self, timeout: float = 2.0):
"""Terminate the entire process group/session started for the child. """Terminate the entire process group/session started for the child.
This catches cases where the launcher shell exits and its children are reparented, Kill Sequence:
leaving no parent PID to traverse, but they remain in the same process group. ==============
1. Send SIGTERM to entire process group IMMEDIATELY (no delay)
2. Wait up to `timeout` seconds (default 2s), polling every 0.1s
3. If still alive after timeout: Send SIGKILL (force kill, immediate)
Timeout Parameter:
- Controls how long to WAIT AFTER SIGTERM before escalating to SIGKILL
- NOT a delay before sending SIGTERM (SIGTERM is sent immediately)
Process groups catch cases where the launcher shell exits and its
children are reparented, leaving no parent PID to traverse, but they
remain in the same process group.
""" """
if self._pgid is None: if self._pgid is None:
return return
try: try:
self._logger.info("Terminating process group: %s", self._pgid) self._logger.info("Terminating process group: %s", self._pgid)
os.killpg(self._pgid, signal.SIGTERM) os.killpg(self._pgid, signal.SIGTERM) # Step 1: Graceful SIGTERM
except ProcessLookupError: except ProcessLookupError:
return return
except Exception as e: except Exception as e:
...@@ -311,13 +400,13 @@ class ManagedProcess: ...@@ -311,13 +400,13 @@ class ManagedProcess:
) )
return return
# Poll for process exit instead of fixed sleep to minimize teardown time # Step 2: Poll for process exit instead of fixed sleep to minimize teardown time
poll_interval = 0.1 poll_interval = 0.1
elapsed = 0.0 elapsed = 0.0
while elapsed < timeout: while elapsed < timeout:
try: try:
# Check if any process in the group is still alive # Check if any process in the group is still alive
os.killpg(self._pgid, 0) # Signal 0 = check existence os.killpg(self._pgid, 0) # Signal 0 = check existence (no kill)
except ProcessLookupError: except ProcessLookupError:
# Process group no longer exists - done # Process group no longer exists - done
return return
...@@ -327,9 +416,9 @@ class ManagedProcess: ...@@ -327,9 +416,9 @@ class ManagedProcess:
time.sleep(poll_interval) time.sleep(poll_interval)
elapsed += poll_interval elapsed += poll_interval
# Force kill if anything remains after timeout # Step 3: Force kill if anything remains after timeout
try: try:
os.killpg(self._pgid, signal.SIGKILL) os.killpg(self._pgid, signal.SIGKILL) # SIGKILL (kill -9) - immediate
except ProcessLookupError: except ProcessLookupError:
pass pass
except Exception as e: except Exception as e:
...@@ -554,8 +643,19 @@ class ManagedProcess: ...@@ -554,8 +643,19 @@ class ManagedProcess:
) )
raise RuntimeError("FAILED: Custom health check") raise RuntimeError("FAILED: Custom health check")
def _terminate_existing(self): def _terminate_all_matching_process_names(self):
if self.terminate_existing: # WARNING: This method is NOT pytest-xdist safe! Kills ALL matching processes system-wide.
# DANGER: When terminate_all_matching_process_names=True, this kills:
# - Processes with matching name (self._command_name)
# - Processes in stragglers list
# - Processes with matching command substring (straggler_commands)
# It does NOT check:
# - Port numbers (kills processes on ALL ports, not just ours)
# - Process ownership (kills other tests' processes)
# - Working directory (kills unrelated processes)
# RECOMMENDED: Use terminate_all_matching_process_names=False for parallel tests.
# ManagedProcess already only kills what it launched (self.proc.pid) on exit.
if self.terminate_all_matching_process_names:
for proc in psutil.process_iter(["name", "cmdline"]): for proc in psutil.process_iter(["name", "cmdline"]):
try: try:
if ( if (
...@@ -624,7 +724,7 @@ class DynamoFrontendProcess(ManagedProcess): ...@@ -624,7 +724,7 @@ class DynamoFrontendProcess(ManagedProcess):
extra_args: Optional[list[str]] = None, extra_args: Optional[list[str]] = None,
extra_env: Optional[dict[str, str]] = None, extra_env: Optional[dict[str, str]] = None,
# Default to false so pytest-xdist workers don't kill each other's frontends. # Default to false so pytest-xdist workers don't kill each other's frontends.
terminate_existing: bool = False, terminate_all_matching_process_names: bool = False,
display_name: Optional[str] = None, display_name: Optional[str] = None,
): ):
# TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to # TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to
...@@ -673,7 +773,7 @@ class DynamoFrontendProcess(ManagedProcess): ...@@ -673,7 +773,7 @@ class DynamoFrontendProcess(ManagedProcess):
command=command, command=command,
env=env, env=env,
display_output=True, display_output=True,
terminate_existing=terminate_existing, terminate_all_matching_process_names=terminate_all_matching_process_names,
log_dir=log_dir, log_dir=log_dir,
display_name=display_name, display_name=display_name,
) )
...@@ -695,10 +795,12 @@ class DynamoFrontendProcess(ManagedProcess): ...@@ -695,10 +795,12 @@ class DynamoFrontendProcess(ManagedProcess):
def main(): def main():
# NOTE: This entrypoint is for manual testing/debugging of `ManagedProcess` only. # NOTE: This entrypoint is for manual testing/debugging of `ManagedProcess` only.
# It is not used by the pytest suite. # It is not used by the pytest suite.
# Example: Safe for parallel tests with terminate_all_matching_process_names=False
with ManagedProcess( with ManagedProcess(
command=["python", "-m", "dynamo.frontend"], command=["python", "-m", "dynamo.frontend", "--port", "8000"],
display_output=True, display_output=True,
terminate_existing=True, terminate_all_matching_process_names=False, # ✅ Safe - only kills what we launch
health_check_ports=[8000], health_check_ports=[8000],
health_check_urls=["http://localhost:8000/v1/models"], health_check_urls=["http://localhost:8000/v1/models"],
timeout=10, timeout=10,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment