"deploy/vscode:/vscode.git/clone" did not exist on "5bbbeae3596eb6b0babe66be352497dd8cd88cf8"
Unverified Commit 4ad739dd authored by MatejKosec's avatar MatejKosec Committed by GitHub
Browse files

fix: prevent aiperf pipe hang in planner scaling test (#6099)

- Replace PIPE-based stdout/stderr capture with direct file output in LoadGenerator.generate_load() to prevent orphaned aiperf child processes from blocking communicate() indefinitely.
- Add start_new_session=True so that os.killpg() can kill the entire process tree on timeout (not just the main process).
- Add a unit test validating the process-group kill on timeout.
- Fixes DYN-2086.
parent ae09b929
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Unit test for LoadGenerator subprocess management (DYN-2086).
Validates that aiperf timeouts kill the entire process group via os.killpg,
preventing orphaned child processes from holding pipe FDs and causing hangs.
"""
import asyncio
import signal
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from utils.load_generator import LoadGenerator
# Module-level pytest markers: pytest applies every marker in `pytestmark`
# to each test collected from this file.
# NOTE(review): the meaning of the individual markers (gpu_0, pre_merge,
# unit, planner) comes from the project's pytest configuration — confirm there.
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.unit,
pytest.mark.planner,
]
def test_timeout_kills_process_group(tmp_path):
    """On timeout, the entire process group must be killed via os.killpg.

    The subprocess launch, ``asyncio.wait_for`` and ``os.killpg`` are all
    mocked, so no real process is started or signalled. The test verifies:

    * ``generate_load`` raises ``RuntimeError`` matching "timed out";
    * ``os.killpg`` is called exactly once with the child's pid and SIGKILL;
    * the child was launched with ``start_new_session=True`` -- without it
      the child is not a process-group leader and its pid is not a valid
      group id for ``os.killpg``;
    * the killed process is reaped by awaiting ``proc.wait()``.

    Args:
        tmp_path: pytest-provided temporary directory, passed (as ``str``)
            to ``generate_load`` as the artifact directory.
    """
    target_pid = 99999
    generator = LoadGenerator()

    # Stand-in for the asyncio subprocess handle returned by
    # create_subprocess_exec; wait() must be awaitable.
    mock_proc = MagicMock()
    mock_proc.pid = target_pid
    mock_proc.returncode = -9
    mock_proc.wait = AsyncMock(return_value=-9)

    async def fake_exec(*args, **kwargs):
        return mock_proc

    async def fake_wait_for(coro, timeout=None):
        # Close the un-awaited coroutine so Python does not emit a
        # "coroutine was never awaited" RuntimeWarning, then simulate the
        # timeout immediately instead of actually waiting.
        if hasattr(coro, "close"):
            coro.close()
        raise asyncio.TimeoutError()

    async def _run():
        with (
            patch(
                "asyncio.create_subprocess_exec", side_effect=fake_exec
            ) as mock_exec,
            patch("asyncio.wait_for", side_effect=fake_wait_for),
            patch("os.killpg") as mock_killpg,
        ):
            with pytest.raises(RuntimeError, match="timed out"):
                await generator.generate_load(1.0, 1, str(tmp_path))

        mock_killpg.assert_called_once_with(target_pid, signal.SIGKILL)

        # killpg(proc.pid) only targets the right group when the child was
        # made a session/group leader at launch.
        mock_exec.assert_called_once()
        assert mock_exec.call_args.kwargs.get("start_new_session") is True

        # The timed-out process must be reaped after the kill. The first
        # wait() coroutine was closed un-awaited by fake_wait_for, so any
        # recorded await comes from the post-kill cleanup.
        mock_proc.wait.assert_awaited()

    asyncio.run(_run())
...@@ -13,6 +13,7 @@ import asyncio ...@@ -13,6 +13,7 @@ import asyncio
import json import json
import logging import logging
import os import os
import signal
import tempfile import tempfile
import time import time
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
...@@ -134,34 +135,38 @@ class LoadGenerator: ...@@ -134,34 +135,38 @@ class LoadGenerator:
start_time = time.time() start_time = time.time()
# More generous timeout for high-load tests - allow 2x duration + 2 minutes buffer # More generous timeout for high-load tests - allow 2x duration + 2 minutes buffer
timeout = max(duration_sec * 2 + 120, int(duration_sec * 2.5)) timeout = max(duration_sec * 2 + 120, int(duration_sec * 2.5))
# Write stdout/stderr to files instead of using PIPE. aiperf may fork
# child processes that inherit pipe FDs; if those children outlive aiperf,
# communicate() blocks forever waiting for EOF. File-based output avoids
# this entirely. We also run aiperf in its own process group so that
# os.killpg() can clean up the entire tree on timeout.
stdout_path = os.path.join(artifact_dir, "aiperf.stdout.log")
stderr_path = os.path.join(artifact_dir, "aiperf.stderr.log")
try: try:
with open(stdout_path, "wb") as stdout_f, open(
stderr_path, "wb"
) as stderr_f:
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(
*cmd, *cmd,
stdout=asyncio.subprocess.PIPE, stdout=stdout_f,
stderr=asyncio.subprocess.PIPE, stderr=stderr_f,
start_new_session=True,
) )
try: try:
stdout, stderr = await asyncio.wait_for( await asyncio.wait_for(proc.wait(), timeout=timeout)
proc.communicate(), timeout=timeout
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
proc.kill() try:
await proc.communicate() os.killpg(proc.pid, signal.SIGKILL)
except ProcessLookupError:
pass
await proc.wait()
logger.error("aiperf timed out") logger.error("aiperf timed out")
raise RuntimeError("Load generation timed out") raise RuntimeError("Load generation timed out")
end_time = time.time() end_time = time.time()
actual_duration = end_time - start_time actual_duration = end_time - start_time
# Persist logs for debugging
try:
with open(os.path.join(artifact_dir, "aiperf.stdout.log"), "wb") as f:
f.write(stdout or b"")
with open(os.path.join(artifact_dir, "aiperf.stderr.log"), "wb") as f:
f.write(stderr or b"")
except Exception:
pass
if proc.returncode == 0: if proc.returncode == 0:
logger.info("Load generation completed successfully") logger.info("Load generation completed successfully")
logger.info(f"Actual duration: {actual_duration:.2f}s") logger.info(f"Actual duration: {actual_duration:.2f}s")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment