"launch/vscode:/vscode.git/clone" did not exist on "ae4e96a2bcdcbdf7ebc09dd79bc1e5d6ac381319"
Unverified Commit 011a200a authored by nv-oviya's avatar nv-oviya Committed by GitHub
Browse files

feat(fault-injection): Add core testing helper utilities (#4040)

parent 78934278
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Fault tolerance testing helper utilities.
This package provides reusable utilities for testing fault tolerance scenarios.
"""
# Public API of this package: the names listed here are the ones re-exported
# by the relative imports that follow this list.
__all__ = [
"InferenceLoadTester",
"get_inference_endpoint",
"NodeOperations",
"PodOperations",
]
from .inference_testing import InferenceLoadTester, get_inference_endpoint
from .k8s_operations import NodeOperations, PodOperations
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Inference load testing utilities for fault tolerance tests.
Provides continuous load generation and statistics tracking for
validating inference availability during fault injection scenarios.
Supports both local (port-forwarded) and in-cluster execution.
"""
import os
import threading
import time
from typing import Dict, List, Optional
import requests
def get_inference_endpoint(
    deployment_name: str, namespace: str, local_port: int = 8000
) -> str:
    """
    Build the completions URL appropriate for where the caller is running.

    The presence of the ``KUBERNETES_SERVICE_HOST`` environment variable
    (with any value, even an empty one) is treated as "running inside a
    cluster".

    Args:
        deployment_name: Name of the deployment (used as the service host name)
        namespace: Kubernetes namespace of the deployment
        local_port: Port of the local port-forward, used only outside a
            cluster (default: 8000)

    Returns:
        The ``/v1/completions`` URL: cluster-internal service DNS on port 80
        when in-cluster, otherwise ``localhost`` on ``local_port``.
    """
    # Outside a cluster the service is reached through a port-forward.
    if "KUBERNETES_SERVICE_HOST" not in os.environ:
        return f"http://localhost:{local_port}/v1/completions"

    # Inside a cluster, address the service by its internal DNS name.
    service_host = f"{deployment_name}.{namespace}.svc.cluster.local"
    return f"http://{service_host}:80/v1/completions"
class InferenceLoadTester:
    """Continuous inference load generator for fault tolerance testing.

    A single daemon thread posts a fixed completion request to ``endpoint``
    at a regular interval and appends one result dict per request to
    ``self.results``. Access to ``self.results`` is guarded by ``self.lock``.
    """

    def __init__(self, endpoint: str, model_name: str, timeout: int = 30):
        """
        Initialize the inference load tester.

        Args:
            endpoint: Inference endpoint URL (e.g., "http://localhost:8000/v1/completions")
            model_name: Model name to use in requests
            timeout: Request timeout in seconds (default: 30)
        """
        self.endpoint = endpoint
        self.model_name = model_name
        self.timeout = timeout
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self.results: List[Dict] = []
        self.lock = threading.Lock()
        # Set by stop() so the worker wakes from its inter-request wait
        # immediately; start() replaces it with a fresh event for each run.
        self._stop_event = threading.Event()

    def send_inference_request(self, prompt: str = "Hello, world!") -> Dict:
        """
        Send a single inference request and return result.

        Never raises: every failure is reported through the returned dict.

        Args:
            prompt: Text prompt for inference

        Returns:
            Dict with keys: success, status_code, latency, timestamp, error.
            ``error`` is None on success and a non-empty string (at most 200
            characters of response body / exception text) on failure.
        """
        # Taken before the try block so every handler can compute a latency.
        start_time = time.time()
        try:
            response = requests.post(
                self.endpoint,
                json={
                    "model": self.model_name,
                    "prompt": prompt,
                    "max_tokens": 50,
                    "temperature": 0.7,
                },
                timeout=self.timeout,
            )
            latency = time.time() - start_time
            succeeded = response.status_code == 200
            error = None
            if not succeeded:
                # An empty body would give a falsy error that get_stats()
                # filters out, so fall back to the status code.
                error = response.text[:200] or f"HTTP {response.status_code}"
            return {
                "success": succeeded,
                "status_code": response.status_code,
                "latency": latency,
                "timestamp": time.time(),
                "error": error,
            }
        except requests.exceptions.Timeout:
            return {
                "success": False,
                "status_code": None,
                "latency": self.timeout,
                "timestamp": time.time(),
                "error": "Request timeout",
            }
        except Exception as e:
            return {
                "success": False,
                "status_code": None,
                "latency": time.time() - start_time,
                "timestamp": time.time(),
                # Some exceptions have an empty message; keep the error truthy.
                "error": str(e)[:200] or type(e).__name__,
            }

    def _load_loop(
        self, interval: float = 2.0, stop_event: Optional[threading.Event] = None
    ):
        """
        Background loop sending requests at specified interval.

        Args:
            interval: Seconds to wait between requests (default: 2.0)
            stop_event: Event that ends this loop when set; defaults to the
                tester's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            result = self.send_inference_request()
            with self.lock:
                self.results.append(result)
            # Event.wait returns True as soon as stop() sets the event, so a
            # long interval does not delay shutdown the way time.sleep would.
            if stop_event.wait(interval):
                break

    def start(self, interval: float = 2.0):
        """
        Start sending inference requests in background.

        Does nothing if the tester is already running. Previously collected
        results are discarded.

        Args:
            interval: Seconds between requests (default: 2.0)
        """
        if self.running:
            return
        # A fresh event per run: a worker from an earlier run that outlived
        # stop()'s join timeout still holds its own, already-set event and
        # therefore exits instead of resuming alongside the new worker.
        stop_event = threading.Event()
        self._stop_event = stop_event
        with self.lock:
            self.results = []
        self.running = True
        self.thread = threading.Thread(
            target=self._load_loop, args=(interval, stop_event), daemon=True
        )
        self.thread.start()

    def stop(self) -> List[Dict]:
        """
        Stop sending requests and return results.

        Waits up to 5 seconds for the worker thread to finish; a request that
        is still in flight after that is not waited for.

        Returns:
            List of all request results (a copy)
        """
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)
        with self.lock:
            return self.results.copy()

    def get_stats(self) -> Dict:
        """
        Get statistics for current results.

        Returns:
            Dict with keys: total, success, failed, success_rate (percent),
            avg_latency (mean latency of successful requests only, 0.0 when
            there are none), errors (first five non-empty error strings)
        """
        with self.lock:
            snapshot = list(self.results)

        if not snapshot:
            return {
                "total": 0,
                "success": 0,
                "failed": 0,
                "success_rate": 0.0,
                "avg_latency": 0.0,
                "errors": [],
            }

        total = len(snapshot)
        successful_latencies = [r["latency"] for r in snapshot if r["success"]]
        success = len(successful_latencies)
        # max(..., 1) avoids dividing by zero when every request failed.
        avg_latency = sum(successful_latencies) / max(success, 1)
        return {
            "total": total,
            "success": success,
            "failed": total - success,
            "success_rate": (success / total) * 100,
            "avg_latency": avg_latency,
            "errors": [r["error"] for r in snapshot if r["error"]][:5],
        }
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Kubernetes operations helpers for fault tolerance testing.
Provides utilities for managing nodes, pods, and deployments
during fault injection scenarios.
"""
import logging
import time
from typing import Dict, List, Optional
from kubernetes import client
from kubernetes.client.rest import ApiException
logger = logging.getLogger(__name__)
class NodeOperations:
    """Helper for node-level Kubernetes operations."""

    def __init__(self, k8s_core: "client.CoreV1Api"):
        """
        Initialize node operations helper.

        Args:
            k8s_core: Kubernetes CoreV1Api client
        """
        self.k8s_core = k8s_core
        # Schedulable state each node had before this helper first cordoned
        # it (True means it was already unschedulable).
        self._original_node_states: Dict[str, bool] = {}

    def cordon_node(self, node_name: str, reason: str = "fault-injection-test") -> bool:
        """
        Cordon a node (make it unschedulable).

        Stores the node's original schedulable state before cordoning,
        which can be restored later with uncordon_node. Cordoning a node
        that is already tracked keeps the state recorded the first time.

        Args:
            node_name: Name of the node to cordon
            reason: Reason for cordoning (used in label)

        Returns:
            True if the node reads back as unschedulable, False otherwise
            (including on any API error, which is logged)
        """
        try:
            node = self.k8s_core.read_node(node_name)
            # setdefault: a repeated cordon must not overwrite the genuine
            # pre-test state with the "unschedulable" state we created.
            self._original_node_states.setdefault(
                node_name, bool(node.spec.unschedulable)
            )
            self.k8s_core.patch_node(
                node_name,
                {
                    "spec": {"unschedulable": True},
                    "metadata": {
                        "labels": {
                            "test.fault-injection/cordoned": "true",
                            "test.fault-injection/reason": reason,
                        }
                    },
                },
            )
            # Verify cordon took effect
            node = self.k8s_core.read_node(node_name)
            return node.spec.unschedulable is True
        except Exception as e:
            logger.error(f"Failed to cordon node {node_name}: {e}")
            return False

    def uncordon_node(
        self, node_name: str, restore_previous_state: bool = False
    ) -> bool:
        """
        Uncordon a node (make it schedulable again).

        Also removes the fault-injection labels added by cordon_node. The
        stored original state is forgotten only after the patch succeeds, so
        a failed call can be retried with the same outcome.

        Args:
            node_name: Name of the node to uncordon
            restore_previous_state: If True, restore the node to its original
                schedulable state before cordoning. If False (default), always
                make the node schedulable (unschedulable=False).

        Returns:
            True if successful, False otherwise
        """
        try:
            target_unschedulable = False
            if restore_previous_state:
                # Untracked nodes fall back to the default: schedulable.
                target_unschedulable = self._original_node_states.get(
                    node_name, False
                )
            self.k8s_core.patch_node(
                node_name,
                {
                    "spec": {"unschedulable": target_unschedulable},
                    "metadata": {
                        "labels": {
                            # None removes the label in a merge patch.
                            "test.fault-injection/cordoned": None,
                            "test.fault-injection/reason": None,
                        }
                    },
                },
            )
            # Drop the record in both modes so a later cordon captures the
            # node's then-current state instead of a stale one.
            self._original_node_states.pop(node_name, None)
            return True
        except Exception as e:
            logger.error(f"Failed to uncordon node {node_name}: {e}")
            return False

    def is_node_cordoned(self, node_name: str) -> bool:
        """
        Check if a node is cordoned (unschedulable).

        Returns False when the node cannot be read.
        """
        try:
            node = self.k8s_core.read_node(node_name)
            return bool(node.spec.unschedulable)
        except Exception:
            return False

    def _find_driver_pod(
        self,
        node_name: str,
        namespace: str,
        label_selector: str,
        created_after=None,
    ):
        """Return the first matching pod on node_name (optionally newer than created_after), or None."""
        pods = self.k8s_core.list_namespaced_pod(
            namespace=namespace, label_selector=label_selector
        )
        for pod in pods.items:
            if pod.spec.node_name != node_name:
                continue
            if (
                created_after is not None
                and not pod.metadata.creation_timestamp > created_after
            ):
                continue
            return pod
        return None

    @staticmethod
    def _pod_is_ready(pod) -> bool:
        """Return True when the pod is Running and reports every container ready."""
        statuses = pod.status.container_statuses
        # A Running pod without container statuses is not treated as ready.
        return (
            pod.status.phase == "Running"
            and bool(statuses)
            and all(container.ready for container in statuses)
        )

    def restart_gpu_driver(
        self,
        node_name: str,
        wait_timeout: int = 300,
        namespace: str = "gpu-operator",
        label_selector: str = "app=nvidia-driver-daemonset",
        poll_interval: float = 5,
        settle_seconds: float = 30,
    ) -> bool:
        """
        Restart the NVIDIA GPU driver pod on a specific node.

        Deletes the driver pod found on the node (grace period 0) and polls
        until a newer pod with the same labels on that node is ready.

        Simulates NVSentinel fault-remediation-module behavior:
        - Reference: fault-remediation-module/pkg/reconciler/reconciler.go:184-232
        - Action: COMPONENT_RESET for XID 79

        Args:
            node_name: Name of the node to restart GPU driver on
            wait_timeout: Max seconds to wait for driver to be ready (default: 300)
            namespace: Namespace of the driver pods (default: "gpu-operator")
            label_selector: Label selector identifying driver pods
                (default: "app=nvidia-driver-daemonset")
            poll_interval: Seconds between readiness polls (default: 5)
            settle_seconds: Extra seconds to wait after the pod is ready,
                for GPU initialization (default: 30)

        Returns:
            True if driver restart succeeded, False otherwise
        """
        try:
            old_pod = self._find_driver_pod(node_name, namespace, label_selector)
            if old_pod is None:
                logger.error(f"No GPU driver pod found on node {node_name}")
                return False
            target_pod = old_pod.metadata.name
            logger.info(f"Found driver pod: {target_pod}")

            # The replacement is recognised by having a later creation time.
            old_creation_time = old_pod.metadata.creation_timestamp

            # Delete the pod to force restart
            logger.info("Deleting pod to trigger restart...")
            self.k8s_core.delete_namespaced_pod(
                name=target_pod, namespace=namespace, grace_period_seconds=0
            )

            logger.info(
                f"Waiting for new driver pod to be ready (max {wait_timeout}s)..."
            )
            start_time = time.time()
            while time.time() - start_time < wait_timeout:
                try:
                    # List again: the replacement pod has a different name.
                    pod = self._find_driver_pod(
                        node_name,
                        namespace,
                        label_selector,
                        created_after=old_creation_time,
                    )
                    if pod is not None and self._pod_is_ready(pod):
                        elapsed = int(time.time() - start_time)
                        logger.info(
                            f"New driver pod ready: {pod.metadata.name} (took {elapsed}s)"
                        )
                        # Wait a bit more for GPU initialization
                        logger.info(
                            f"Waiting additional {settle_seconds}s for GPU initialization..."
                        )
                        time.sleep(settle_seconds)
                        logger.info("GPU driver restarted successfully")
                        return True
                except Exception as e:
                    # Polling is best-effort, but a persistent failure should
                    # be visible in the logs rather than silently retried.
                    logger.warning(
                        f"Error while polling driver pod on {node_name}: {e}"
                    )
                remaining = wait_timeout - (time.time() - start_time)
                if remaining <= 0:
                    break
                time.sleep(max(0.0, min(poll_interval, remaining)))

            logger.error(f"GPU driver pod did not become ready within {wait_timeout}s")
            return False
        except Exception as e:
            logger.error(f"Failed to restart GPU driver: {e}")
            return False
class PodOperations:
    """Helper for pod-level Kubernetes operations."""

    def __init__(self, k8s_core: "client.CoreV1Api"):
        """
        Initialize pod operations helper.

        Args:
            k8s_core: Kubernetes CoreV1Api client
        """
        self.k8s_core = k8s_core

    @staticmethod
    def _is_pod_ready(pod) -> bool:
        """Return True when the pod is Running and reports every container ready."""
        statuses = pod.status.container_statuses
        return (
            pod.status.phase == "Running"
            and bool(statuses)
            and all(container.ready for container in statuses)
        )

    @staticmethod
    def _describe_container_state(container_status) -> str:
        """Summarise one container status as a short state string."""
        state = container_status.state
        if state is None:
            # No state reported; do not let one pod break the whole listing.
            return "Unknown"
        if state.waiting:
            return state.waiting.reason
        if state.terminated:
            return f"Terminated ({state.terminated.reason})"
        if state.running:
            return "Running"
        return "Unknown"

    def drain_pods(
        self, namespace: str, label_selector: str, node_name: Optional[str] = None
    ) -> int:
        """
        Drain (delete) pods matching selector, optionally filtered by node.

        Each pod is deleted with a grace period of 0. Pods that are already
        gone (HTTP 404) are skipped silently; other per-pod API errors are
        logged and do not stop the drain.

        Simulates NVSentinel node-drainer-module behavior:
        - Reference: node-drainer-module/pkg/informers/informers.go:471-535

        Args:
            namespace: Kubernetes namespace
            label_selector: Label selector for pods to drain
            node_name: If provided, only drain pods on this node

        Returns:
            Number of pods successfully drained (0 if listing pods failed)
        """
        try:
            field_selector = f"spec.nodeName={node_name}" if node_name else None
            pods = self.k8s_core.list_namespaced_pod(
                namespace=namespace,
                label_selector=label_selector,
                field_selector=field_selector,
            )
            drained_count = 0
            for pod in pods.items:
                try:
                    self.k8s_core.delete_namespaced_pod(
                        name=pod.metadata.name,
                        namespace=namespace,
                        grace_period_seconds=0,
                    )
                    logger.info(f"Evicted: {pod.metadata.name}")
                    drained_count += 1
                except ApiException as e:
                    if e.status != 404:  # Ignore if already deleted
                        logger.warning(f"Failed to evict {pod.metadata.name}: {e}")
            return drained_count
        except Exception as e:
            logger.error(f"Failed to drain pods: {e}")
            return 0

    def get_pod_distribution(
        self, namespace: str, label_selector: str
    ) -> Dict[str, int]:
        """
        Get distribution of pods across nodes.

        Only pods whose phase is "Running" are counted.

        Args:
            namespace: Kubernetes namespace
            label_selector: Label selector for pods

        Returns:
            Dict mapping node names to pod counts (empty on API error)
        """
        try:
            pods = self.k8s_core.list_namespaced_pod(
                namespace=namespace, label_selector=label_selector
            )
            distribution: Dict[str, int] = {}
            for pod in pods.items:
                if pod.status.phase != "Running":
                    continue
                node = pod.spec.node_name
                distribution[node] = distribution.get(node, 0) + 1
            return distribution
        except Exception as e:
            logger.error(f"Failed to get pod distribution: {e}")
            return {}

    def wait_for_pods_ready(
        self,
        namespace: str,
        label_selector: str,
        expected_count: int,
        timeout: int = 900,
        exclude_node: Optional[str] = None,
        poll_interval: float = 30,
    ) -> bool:
        """
        Wait for pods to become ready.

        A pod counts as ready when its phase is "Running" and every reported
        container is ready. Errors while listing pods are logged and retried.

        Args:
            namespace: Kubernetes namespace
            label_selector: Label selector for pods
            expected_count: Expected number of ready pods
            timeout: Max seconds to wait (default: 900 = 15 minutes)
            exclude_node: If provided, only count pods NOT on this node
            poll_interval: Seconds between checks (default: 30)

        Returns:
            True if expected pods became ready, False if timeout
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                pods = self.k8s_core.list_namespaced_pod(
                    namespace=namespace, label_selector=label_selector
                )
                ready_count = sum(
                    1
                    for pod in pods.items
                    if not (exclude_node and pod.spec.node_name == exclude_node)
                    and self._is_pod_ready(pod)
                )
                elapsed = int(time.time() - start_time)
                logger.debug(f"{elapsed}s: {ready_count}/{expected_count} ready")
                if ready_count >= expected_count:
                    logger.info(f"All {expected_count} pods ready after {elapsed}s!")
                    return True
            except Exception as e:
                logger.warning(f"Error checking pods: {e}")
            # Never sleep past the deadline: a fixed sleep would overshoot
            # any timeout shorter than the poll interval.
            remaining = timeout - (time.time() - start_time)
            if remaining <= 0:
                break
            time.sleep(max(0.0, min(poll_interval, remaining)))
        return False

    def get_pod_status_details(
        self, namespace: str, label_selector: str, node_name: Optional[str] = None
    ) -> List[Dict]:
        """
        Get detailed status for each pod.

        The state is derived from the pod's first container status only; pods
        without container statuses report "<phase> (no container status)".

        Args:
            namespace: Kubernetes namespace
            label_selector: Label selector for pods
            node_name: If provided, only get pods on this node

        Returns:
            List of dicts with keys: name (pod name), node (node name), and
            state (pod state); empty on API error
        """
        try:
            field_selector = f"spec.nodeName={node_name}" if node_name else None
            pods = self.k8s_core.list_namespaced_pod(
                namespace=namespace,
                label_selector=label_selector,
                field_selector=field_selector,
            )
            details = []
            for pod in pods.items:
                statuses = pod.status.container_statuses
                if statuses:
                    state = self._describe_container_state(statuses[0])
                else:
                    state = f"{pod.status.phase} (no container status)"
                details.append(
                    {
                        "name": pod.metadata.name,
                        "node": pod.spec.node_name,
                        "state": state,
                    }
                )
            return details
        except Exception as e:
            logger.error(f"Failed to get pod status details: {e}")
            return []
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment