Unverified Commit fd035b19 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat: Add gpu info to the tests when we kill a process. (#6552)


Signed-off-by: default avatarTzu-Ling <tzulingk@nvidia.com>
parent c6d8f225
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
import asyncio import asyncio
import logging import logging
import re import re
import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum, auto from enum import Enum, auto
...@@ -28,6 +29,22 @@ from tests.utils.managed_deployment import DeploymentSpec, ManagedDeployment ...@@ -28,6 +29,22 @@ from tests.utils.managed_deployment import DeploymentSpec, ManagedDeployment
if TYPE_CHECKING: if TYPE_CHECKING:
from tests.fault_tolerance.deploy.base_checker import BaseChecker from tests.fault_tolerance.deploy.base_checker import BaseChecker
logger = logging.getLogger(__name__)
# Lazy import to avoid kubernetes dependency during module import
def _get_gpu_helpers():
"""Lazily import GPU helper functions to avoid kubernetes dependency at module level."""
from kubernetes.client.rest import ApiException
from tests.fault_tolerance.hardware.fault_injection_service.helpers import (
get_available_gpu_ids,
get_gpu_info,
get_processes_on_gpu,
)
return get_available_gpu_ids, get_gpu_info, get_processes_on_gpu, ApiException
# Import checker factory (actual import, not TYPE_CHECKING) # Import checker factory (actual import, not TYPE_CHECKING)
def _get_checkers_for_scenario( def _get_checkers_for_scenario(
...@@ -149,7 +166,9 @@ class Load: ...@@ -149,7 +166,9 @@ class Load:
max_retries: int = 3 # Increased for fault tolerance max_retries: int = 3 # Increased for fault tolerance
sla: Optional[float] = None sla: Optional[float] = None
client_type: str = "aiperf" # "aiperf" or "legacy" client_type: str = "aiperf" # "aiperf" or "legacy"
max_request_rate: float = 1.0 # Rate limiting for legacy client (requests/sec) max_request_rate: float = (
1.0 # Rate limiting (requests/sec) for both AI-Perf and legacy clients
)
success_threshold: float = 90.0 # Success rate threshold for tests success_threshold: float = 90.0 # Success rate threshold for tests
# For mixed token testing (overflow + recovery) # For mixed token testing (overflow + recovery)
...@@ -274,6 +293,364 @@ class TerminateProcessFailure(Failure): ...@@ -274,6 +293,364 @@ class TerminateProcessFailure(Failure):
self.process_name = process_name self.process_name = process_name
self.signal = signal self.signal = signal
def _log_process_list(self, pod):
"""Log filtered process list from ps aux."""
*_, ApiException = _get_gpu_helpers()
try:
result = pod.exec(["ps", "aux"])
if result.returncode != 0:
logger.warning(f"ps aux command exited with code {result.returncode}")
return
ps_output = result.stdout.decode() if result.stdout else ""
lines = ps_output.split("\n")
relevant_processes = [
line
for line in lines[1:]
if any(
keyword in line.lower() for keyword in ["python", "vllm", "dynamo"]
)
]
# Log as single block to avoid [TEST] prefix on each line
output_lines = ["\n--- Process List (ps aux) ---", lines[0]] # Header
output_lines.extend(relevant_processes)
logger.info("\n".join(output_lines))
except ApiException as e:
logger.warning(f"Kubernetes API error getting ps aux: {e}")
except Exception:
logger.exception("Unexpected error getting process list")
def _get_process_details_string(self, pod, pid: int) -> str:
"""Get detailed information for a specific PID as a string."""
*_, ApiException = _get_gpu_helpers()
try:
ps_result = pod.exec(["ps", "-p", str(pid), "-o", "pid,comm,args"])
if ps_result.returncode != 0:
return ""
ps_line = ps_result.stdout.decode().strip()
ps_lines = ps_line.split("\n")
if len(ps_lines) > 1:
return f" PID {pid}: {ps_lines[1]}"
return ""
except ApiException:
# Process may not exist or API unavailable - expected during termination
return ""
except Exception:
# Unexpected error (AttributeError, IndexError, UnicodeDecodeError, etc.)
logger.exception(f"Unexpected error getting process details for PID {pid}")
return ""
def _log_gpu_discovery_info(self, pod):
"""Log GPU information using gpu_discovery utilities."""
try:
(
get_available_gpu_ids,
get_gpu_info,
get_processes_on_gpu,
ApiException,
) = _get_gpu_helpers()
gpu_ids = get_available_gpu_ids(pod)
if not gpu_ids:
logger.warning("No GPUs found in pod")
return
# Build output as single message
output_lines = [
"\n--- GPU Information ---",
f"Available GPUs: {gpu_ids}",
"\n--- Per-GPU Process Mapping (from query-compute-apps) ---",
]
for gpu_id in gpu_ids:
gpu_info_lines = self._get_single_gpu_info(pod, gpu_id)
output_lines.extend(gpu_info_lines)
logger.info("\n".join(output_lines))
except ApiException as e:
logger.warning(f"Kubernetes API error getting GPU information: {e}")
except Exception:
logger.exception("Unexpected error getting GPU information")
def _get_single_gpu_info(self, pod, gpu_id: int) -> list[str]:
"""Get information for a single GPU as list of strings."""
(
get_available_gpu_ids,
get_gpu_info,
get_processes_on_gpu,
) = _get_gpu_helpers()
lines = []
gpu_info = get_gpu_info(pod, gpu_id)
if gpu_info:
lines.append(
f"\nGPU {gpu_id}: {gpu_info.get('name', 'Unknown')} "
f"(Memory: {gpu_info.get('memory_total', 'Unknown')})"
)
else:
lines.append(f"\nGPU {gpu_id}:")
pids = get_processes_on_gpu(pod, gpu_id)
if pids:
lines.append(f" Processes (PIDs): {pids}")
for pid in pids:
proc_details = self._get_process_details_string(pod, pid)
if proc_details:
lines.append(proc_details)
else:
lines.append(
" No processes running (note: small memory footprints may not appear)"
)
return lines
def _parse_nvidia_smi_process_line(self, line: str):
"""Parse a single line from nvidia-smi processes section.
Returns:
Tuple of (gpu_id, pid, process_name, memory) or None if parsing fails
"""
parts = [p.strip() for p in line.split("|") if p.strip()]
if not parts:
return None
fields = parts[0].split()
if len(fields) < 6:
return None
try:
gpu_id = fields[0]
pid = fields[3]
process_name = " ".join(fields[5:-1])
memory = fields[-1]
return (gpu_id, pid, process_name, memory)
except (ValueError, IndexError):
return None
def _log_nvidia_smi_output(self, pod):
"""Log complete nvidia-smi output with parsed process mapping."""
*_, ApiException = _get_gpu_helpers()
try:
result = pod.exec(["nvidia-smi"])
if result.returncode != 0:
logger.warning(
f"nvidia-smi command exited with code {result.returncode}"
)
return
gpu_status = result.stdout.decode() if result.stdout else ""
output_lines = [
"\n--- Complete GPU->Process Mapping (from full nvidia-smi) ---"
]
if "Processes:" in gpu_status:
output_lines.extend(self._get_parsed_nvidia_smi_processes(gpu_status))
output_lines.append("\n--- Full nvidia-smi Output (for reference) ---")
output_lines.append(gpu_status)
logger.info("\n".join(output_lines))
except ApiException as e:
logger.warning(f"Kubernetes API error getting nvidia-smi: {e}")
except Exception:
logger.exception("Unexpected error getting nvidia-smi output")
def _get_parsed_nvidia_smi_processes(self, gpu_status: str) -> list[str]:
"""Parse nvidia-smi processes section and return as list of strings."""
lines = ["GPU -> PID -> Process Name -> Memory:"]
try:
processes_section = gpu_status.split("Processes:")[1]
processes_lines = processes_section.split("\n")
for line in processes_lines:
if "MiB" in line and "|" in line:
parsed = self._parse_nvidia_smi_process_line(line)
if parsed:
gpu_id, pid, process_name, memory = parsed
lines.append(
f" GPU {gpu_id}: PID {pid} ({process_name}) - {memory}"
)
except (IndexError, ValueError) as e:
# Expected if nvidia-smi output format is unexpected
logger.debug(f"Failed to parse nvidia-smi processes: {e}")
except Exception:
# Unexpected error - should be investigated
logger.exception("Unexpected error parsing nvidia-smi processes")
return lines
def _log_pod_diagnostics(self, pod, phase: str):
"""Log comprehensive pod diagnostics including process list, GPU info, and nvidia-smi."""
logger.info(
f"\n{'=' * 80}\nPOD DIAGNOSTICS - {phase}\nPod: {pod.name}\n{'=' * 80}"
)
self._log_process_list(pod)
self._log_gpu_discovery_info(pod)
self._log_nvidia_smi_output(pod)
logger.info("=" * 80)
def _wait_for_pod_ready(
self,
pod,
max_wait: int = 120,
poll_interval: int = 1,
) -> Optional[int]:
"""Poll for pod to become ready and return elapsed time or None if timeout.
Checks Kubernetes pod readiness (readiness probe passes). Clients perform
their own service health checks independently.
Args:
pod: Kubernetes pod to check
max_wait: Maximum seconds to wait (default: 120)
poll_interval: Seconds between polls (default: 1)
Returns:
Elapsed seconds when pod becomes ready, or None if timeout
"""
*_, ApiException = _get_gpu_helpers()
for elapsed in range(max_wait):
time.sleep(poll_interval)
try:
pod.refresh()
if pod.ready():
actual_elapsed = (elapsed + 1) * poll_interval
logger.info(
f"Pod '{pod.name}' became ready after ~{actual_elapsed}s"
)
return actual_elapsed
except ApiException as e:
logger.debug(f"Kubernetes API error checking pod status: {e}")
except Exception as e:
logger.exception(
f"Unexpected error checking pod readiness for {pod.name}: {e}"
)
raise
logger.warning(f"Pod '{pod.name}' did not become ready within {max_wait}s")
return None
def _check_frontend_health_after_restart(
self,
deployment,
service_name: str,
base_status: str,
) -> str:
"""Check Frontend service health after a pod restart.
Args:
deployment: ManagedDeployment instance
service_name: Name of the service that was restarted
base_status: Base status string (e.g., "ready after 102s")
Returns:
Updated status string with Frontend health check result
"""
from tests.fault_tolerance.deploy.client import get_frontend_port
from tests.utils.client import wait_for_model_availability
logger.info(
f"Checking Frontend service health (after {service_name} pod restart)..."
)
pod_ports = {} # Temporary dict for port forward tracking
try:
logger.info("Getting frontend pod and setting up port forward...")
frontend_pod_name, local_port, frontend_pod = get_frontend_port(
managed_deployment=deployment,
client_index=0, # Use first frontend pod
deployment_spec=deployment.deployment_spec,
pod_ports=pod_ports,
logger=logger,
)
if not frontend_pod_name or not local_port:
logger.warning("Failed to get frontend port forward")
return f"{base_status}, Frontend port forward failed"
# Get model from deployment spec
model = self._get_model_from_deployment_spec(deployment, service_name)
endpoint = getattr(
deployment.deployment_spec, "_endpoint", "/v1/chat/completions"
)
logger.info(
f"Checking model '{model}' availability at localhost:{local_port}..."
)
url = f"http://localhost:{local_port}"
service_healthy = wait_for_model_availability(
url=url,
endpoint=endpoint,
model=model,
logger=logger,
)
if service_healthy:
logger.info("Frontend service health check passed")
return f"{base_status}, Frontend healthy"
else:
logger.warning("Frontend service health check failed")
return f"{base_status}, Frontend health check failed"
except Exception as e:
logger.exception(f"Error checking Frontend health: {e}")
return f"{base_status}, Frontend health check error"
finally:
# Clean up port forwards
for pf_name, port_forward in pod_ports.items():
try:
port_forward.stop()
except Exception as e:
logger.warning(f"Error stopping port forward: {e}")
def _get_model_from_deployment_spec(
self,
deployment,
service_name: str,
) -> str:
"""Get model name from deployment spec.
Tries to get model from the terminated service, otherwise uses default.
Args:
deployment: ManagedDeployment instance
service_name: Name of the service that was terminated
Returns:
Model name (always returns a value, uses default as fallback)
"""
logger.info(f"Attempting to get model from terminated service '{service_name}'")
try:
terminated_service_spec = deployment.deployment_spec[service_name]
model = terminated_service_spec.model
if model:
logger.info(
f"Got model '{model}' from terminated service '{service_name}'"
)
return model
except (KeyError, AttributeError) as e:
logger.info(f"Could not get model from {service_name}: {e}")
# Fallback to default
model = "Qwen/Qwen3-0.6B"
logger.info(f"Using default model: {model}")
return model
async def execute( async def execute(
self, deployment: ManagedDeployment, logger: logging.Logger self, deployment: ManagedDeployment, logger: logging.Logger
) -> list[str]: ) -> list[str]:
...@@ -282,6 +659,9 @@ class TerminateProcessFailure(Failure): ...@@ -282,6 +659,9 @@ class TerminateProcessFailure(Failure):
pod_names: list[str] = [] pod_names: list[str] = []
for service_name, pods in service_pod_dict.items(): for service_name, pods in service_pod_dict.items():
for pod in pods: for pod in pods:
# Log diagnostics before termination
self._log_pod_diagnostics(pod, "BEFORE PROCESS TERMINATION")
processes = deployment.get_processes(pod) processes = deployment.get_processes(pod)
for process in processes: for process in processes:
if self.process_name in process.command: if self.process_name in process.command:
...@@ -289,6 +669,28 @@ class TerminateProcessFailure(Failure): ...@@ -289,6 +669,28 @@ class TerminateProcessFailure(Failure):
f"Terminating {service_name} pod {pod} Pid {process.pid} Command {process.command}" f"Terminating {service_name} pod {pod} Pid {process.pid} Command {process.command}"
) )
process.kill(self.signal) process.kill(self.signal)
# Wait for pod to recover after process termination
logger.info(
f"\nWaiting for pod '{pod.name}' to become ready (max {120}s)..."
)
elapsed = self._wait_for_pod_ready(pod)
if not elapsed:
restart_status = f"timeout after {120}s"
self._log_pod_diagnostics(pod, f"AFTER RESTART ({restart_status})")
pod_names.append(pod.name)
continue
# Check Frontend service health after pod is ready
restart_status = self._check_frontend_health_after_restart(
deployment=deployment,
service_name=service_name,
base_status=f"ready after {elapsed}s",
)
self._log_pod_diagnostics(pod, f"AFTER RESTART ({restart_status})")
pod_names.append(pod.name) pod_names.append(pod.name)
return pod_names return pod_names
......
...@@ -9,7 +9,7 @@ import re ...@@ -9,7 +9,7 @@ import re
import signal import signal
from contextlib import contextmanager from contextlib import contextmanager
from multiprocessing.context import SpawnProcess from multiprocessing.context import SpawnProcess
from typing import Any, Optional from typing import Any
import pytest import pytest
...@@ -29,6 +29,61 @@ from tests.utils.managed_deployment import DeploymentSpec, ManagedDeployment ...@@ -29,6 +29,61 @@ from tests.utils.managed_deployment import DeploymentSpec, ManagedDeployment
from tests.utils.test_output import resolve_test_output_path from tests.utils.test_output import resolve_test_output_path
def get_model_from_deployment(
deployment_spec: DeploymentSpec,
scenario: Scenario = None,
service_name: str = None,
) -> str:
"""Get model name from deployment spec.
Args:
deployment_spec: Deployment specification
scenario: Optional Scenario object with backend and model info
service_name: Optional specific service to get model from
Returns:
Model name (never None, falls back to default)
"""
# If scenario specifies a model, use that
if scenario and scenario.model:
return scenario.model
# Try to get model from specified service
if service_name:
try:
service_spec = deployment_spec[service_name]
if service_spec and service_spec.model:
return service_spec.model
except (KeyError, AttributeError):
pass
# Get model from backend-specific worker (if scenario provided)
if scenario:
try:
if scenario.backend == "vllm":
return deployment_spec["VllmDecodeWorker"].model
elif scenario.backend == "sglang":
return deployment_spec["decode"].model
elif scenario.backend == "trtllm":
# Determine deployment type from scenario deployment name
if (
"agg" in deployment_spec.name
and "disagg" not in deployment_spec.name
):
return deployment_spec["TRTLLMWorker"].model
else:
return deployment_spec["TRTLLMDecodeWorker"].model
except (KeyError, AttributeError) as e:
logging.warning(
f"Could not get model from backend-specific worker "
f"(backend={scenario.backend}): {e}"
)
# Fallback to default
logging.info("Using default model: Qwen/Qwen3-0.6B")
return "Qwen/Qwen3-0.6B"
@pytest.fixture @pytest.fixture
def scenario(scenario_name, client_type): def scenario(scenario_name, client_type):
"""Get scenario and optionally override client type from command line. """Get scenario and optionally override client type from command line.
...@@ -460,32 +515,9 @@ async def test_fault_scenario( ...@@ -460,32 +515,9 @@ async def test_fault_scenario(
if image: if image:
scenario.deployment.set_image(image) scenario.deployment.set_image(image)
model: Optional[str] = None # Get model using helper function and ensure it's set on all services
if scenario.model: model = get_model_from_deployment(scenario.deployment, scenario)
scenario.deployment.set_model(scenario.model) scenario.deployment.set_model(model) # Set model on all services including Frontend
model = scenario.model
else:
# Get model from the appropriate worker based on backend
try:
if scenario.backend == "vllm":
model = scenario.deployment["VllmDecodeWorker"].model
elif scenario.backend == "sglang":
model = scenario.deployment["decode"].model
elif scenario.backend == "trtllm":
# Determine deployment type from scenario deployment name
if (
"agg" in scenario.deployment.name
and "disagg" not in scenario.deployment.name
):
model = scenario.deployment["TRTLLMWorker"].model
else:
model = scenario.deployment["TRTLLMDecodeWorker"].model
else:
model = None
except (KeyError, AttributeError):
model = None
# Fallback to default if still None
model = model or "Qwen/Qwen3-0.6B"
scenario.deployment.set_logging(True, "info") scenario.deployment.set_logging(True, "info")
...@@ -501,7 +533,7 @@ async def test_fault_scenario( ...@@ -501,7 +533,7 @@ async def test_fault_scenario(
with _clients( with _clients(
logger, logger,
request.node.name, resolve_test_output_path(request.node.name),
scenario.deployment, scenario.deployment,
namespace, namespace,
model, model,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment