Unverified commit aea57cae, authored by jh-nv and committed by GitHub
Browse files

fix: fix test process termination logic to properly trigger graceful shutdown (#5775)

parent ef1078e4
...@@ -175,6 +175,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -175,6 +175,7 @@ class DynamoWorkerProcess(ManagedProcess):
display_output=True, display_output=True,
terminate_existing=False, terminate_existing=False,
log_dir=log_dir, log_dir=log_dir,
display_name=worker_id,
) )
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
......
...@@ -171,6 +171,7 @@ class DynamoWorkerProcess(ManagedProcess): ...@@ -171,6 +171,7 @@ class DynamoWorkerProcess(ManagedProcess):
stragglers=["VLLM::EngineCore"], stragglers=["VLLM::EngineCore"],
straggler_commands=["-m dynamo.vllm"], straggler_commands=["-m dynamo.vllm"],
log_dir=log_dir, log_dir=log_dir,
display_name=worker_id,
) )
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
......
...@@ -38,6 +38,7 @@ class DynamoFrontendProcess(BaseDynamoFrontendProcess): ...@@ -38,6 +38,7 @@ class DynamoFrontendProcess(BaseDynamoFrontendProcess):
extra_args=extra_args if extra_args else None, extra_args=extra_args if extra_args else None,
extra_env=extra_env, extra_env=extra_env,
terminate_existing=False, terminate_existing=False,
display_name="frontend",
) )
......
...@@ -35,28 +35,38 @@ def terminate_process(process, logger=logging.getLogger(), immediate_kill=False) ...@@ -35,28 +35,38 @@ def terminate_process(process, logger=logging.getLogger(), immediate_kill=False)
def terminate_process_tree(
    pid, logger=logging.getLogger(), immediate_kill=False, timeout=10
):
    """Terminate the process ``pid`` and all of its descendants.

    The parent is signalled first so that it gets a chance to run its own
    graceful-shutdown path (which may itself stop its children). Descendants
    that are still around afterwards are signalled, and anything that
    outlives ``timeout`` is escalated to an immediate kill.

    Args:
        pid: PID of the root process of the tree.
        logger: Logger passed through to ``terminate_process`` and used for
            the timeout warning.
        immediate_kill: Forwarded to ``terminate_process`` for the first
            round of signals; the escalation round always passes ``True``.
        timeout: Seconds to wait in each waiting phase (parent exit, whole
            tree exit, post-escalation exit).

    Returns:
        None. Returns silently if ``pid`` no longer exists.

    NOTE(review): ``terminate_process`` is defined elsewhere in this module;
    this function assumes it tolerates processes that have already exited --
    confirm.
    """
    try:
        parent = psutil.Process(pid)
        # 1. Snapshot children before signaling the parent: once the parent
        #    is gone its descendants can no longer be discovered through it.
        #    Kept inside the guard because the parent may exit between the
        #    lookup and the snapshot.
        children = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        # Process already terminated
        return

    # 2. Terminate parent first (graceful)
    terminate_process(parent, logger, immediate_kill=immediate_kill)

    # 3. Wait for parent to exit
    try:
        parent.wait(timeout=timeout)
    except psutil.TimeoutExpired:
        logger.warning("Parent process did not exit within timeout")
        terminate_process(parent, logger, immediate_kill=True)

    # 4. Terminate children if still alive
    for child in children:
        terminate_process(child, logger, immediate_kill=immediate_kill)

    # 5. Wait for all processes to exit
    _, alive = psutil.wait_procs([parent, *children], timeout=timeout)

    # 6. Escalate remaining alive processes if needed
    if alive:
        for proc in alive:
            terminate_process(proc, logger, immediate_kill=True)
        psutil.wait_procs(alive, timeout=timeout)
@dataclass @dataclass
...@@ -75,6 +85,7 @@ class ManagedProcess: ...@@ -75,6 +85,7 @@ class ManagedProcess:
stragglers: List[str] = field(default_factory=list) stragglers: List[str] = field(default_factory=list)
straggler_commands: List[str] = field(default_factory=list) straggler_commands: List[str] = field(default_factory=list)
log_dir: str = os.getcwd() log_dir: str = os.getcwd()
display_name: Optional[str] = None
# Ensure attributes exist even if startup fails early # Ensure attributes exist even if startup fails early
proc: Optional[subprocess.Popen] = None proc: Optional[subprocess.Popen] = None
...@@ -107,6 +118,10 @@ class ManagedProcess: ...@@ -107,6 +118,10 @@ class ManagedProcess:
def __enter__(self): def __enter__(self):
try: try:
self._logger = logging.getLogger(self.__class__.__name__) self._logger = logging.getLogger(self.__class__.__name__)
# self._command_name = self.command[0]
if self.display_name:
self._command_name = self.display_name
else:
self._command_name = self.command[0] self._command_name = self.command[0]
# Keep test logs out of the git working tree: many tests pass a relative # Keep test logs out of the git working tree: many tests pass a relative
...@@ -610,6 +625,7 @@ class DynamoFrontendProcess(ManagedProcess): ...@@ -610,6 +625,7 @@ class DynamoFrontendProcess(ManagedProcess):
extra_env: Optional[dict[str, str]] = None, extra_env: Optional[dict[str, str]] = None,
# Default to false so pytest-xdist workers don't kill each other's frontends. # Default to false so pytest-xdist workers don't kill each other's frontends.
terminate_existing: bool = False, terminate_existing: bool = False,
display_name: Optional[str] = None,
): ):
# TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to # TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to
# use this shared implementation (and delete the copies): # use this shared implementation (and delete the copies):
...@@ -659,6 +675,7 @@ class DynamoFrontendProcess(ManagedProcess): ...@@ -659,6 +675,7 @@ class DynamoFrontendProcess(ManagedProcess):
display_output=True, display_output=True,
terminate_existing=terminate_existing, terminate_existing=terminate_existing,
log_dir=log_dir, log_dir=log_dir,
display_name=display_name,
) )
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment