Unverified Commit 78934278 authored by nv-oviya's avatar nv-oviya Committed by GitHub
Browse files

feat(fault-injection): Add CUDA fault injection library foundation (#4038)


Signed-off-by: default avatarnv-oviya <oseeniraj@nvidia.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent 2e5e68b4
# Compiled library
*.so
*.o
*.a
# Build artifacts
*.dylib
*.dll
# Editor files
*.swp
*~
.DS_Store
# Test outputs
test_output.txt
# Makefile for CUDA Intercept Library
# Simulates various XID errors via LD_PRELOAD
#
# GCC Version: Requires gcc 4.8+ (any modern gcc works)
# Tested with gcc 7.5, 9.4, 11.x, 13.x
# Uses standard C99 features only
.PHONY: all clean test help
# Compiler settings
CC = gcc
CFLAGS = -fPIC -Wall -Wextra -std=c99
LDFLAGS = -shared
LDLIBS = -ldl
TARGET = cuda_intercept.so
SOURCE = cuda_intercept.c
all: $(TARGET)
$(TARGET): $(SOURCE)
@echo "Building CUDA fault injection library..."
$(CC) $(CFLAGS) $(SOURCE) $(LDFLAGS) -o $(TARGET) $(LDLIBS)
@echo "✓ Built: $(TARGET)"
@echo ""
@echo "Usage:"
@echo " LD_PRELOAD=./$(TARGET) python -c 'import torch; print(torch.cuda.is_available())'"
@echo ""
clean:
@echo "Cleaning..."
rm -f $(TARGET)
@echo "✓ Cleaned"
test: $(TARGET)
@echo "Testing CUDA fault injection..."
@echo ""
@echo "Test 1: With fault injection enabled (default)"
@CUDA_FAULT_INJECTION_ENABLED=1 LD_PRELOAD=./$(TARGET) python3 -c 'import torch; print("CUDA available:", torch.cuda.is_available()); print("Device count:", torch.cuda.device_count())' || echo "✓ Expected failure"
@echo ""
@echo "Test 2: With fault injection disabled"
@CUDA_FAULT_INJECTION_ENABLED=0 LD_PRELOAD=./$(TARGET) python3 -c 'import torch; print("CUDA available:", torch.cuda.is_available()); print("Device count:", torch.cuda.device_count())'
@echo ""
help:
@echo "CUDA Fault Injection Library - Makefile"
@echo ""
@echo "Targets:"
@echo " make - Build the library"
@echo " make test - Run basic tests"
@echo " make clean - Remove built files"
@echo " make help - Show this help"
@echo ""
@echo "Usage in tests:"
@echo " export LD_PRELOAD=/path/to/fake_cuda_xid79.so"
@echo " export CUDA_FAULT_INJECTION_ENABLED=1"
@echo " python -m vllm.entrypoints.api_server"
# CUDA Fault Injection - Test Library
**Purpose**: Safely simulate GPU failures (XID errors) in tests without breaking real hardware.
> **⚠️ Note**: This directory contains the **C library source code** only. The library is **compiled in-pod** during Kubernetes tests for Linux compatibility. You do **not** need to build it locally unless doing standalone local testing.
## What This Does
Makes CUDA calls return error codes to simulate various GPU failures. Uses LD_PRELOAD to intercept CUDA library calls.
```
Pod calls cudaMalloc() → LD_PRELOAD intercepts → Returns error → Pod crashes
```
**Result**: Realistic GPU failure testing without hardware damage.
## Scope
This library simulates **software/orchestration-level failures** that occur when GPU hardware becomes inaccessible or unusable:
-**In scope**: CUDA API failures due to GPU becoming unavailable (XID errors, device not found, ECC errors)
-**Use case**: Testing Kubernetes pod rescheduling, inference failover, recovery orchestration
-**Out of scope**: Bit-level Silent Data Corruption (SDC), compute errors, incorrect results
-**Not modeled**: General GPU faulting phenomena at the computation/memory level
**Note**: SDC detection mechanisms will not trigger with this approach, as we intercept at the CUDA API layer, not at the hardware/computation layer.
## Supported XID Errors
| XID | Description | CUDA Error | Use Case |
|-----|-------------|------------|----------|
| **79** | GPU fell off bus | `CUDA_ERROR_NO_DEVICE` | Most common, node-level failure |
| **48** | Double-bit ECC error | `CUDA_ERROR_ECC_UNCORRECTABLE` | Memory corruption |
| **94** | Contained ECC error | `CUDA_ERROR_ECC_UNCORRECTABLE` | Recoverable memory error |
| **95** | Uncontained error | `CUDA_ERROR_UNKNOWN` | Fatal GPU error |
| **43** | GPU stopped responding | `CUDA_ERROR_LAUNCH_TIMEOUT` | Hung kernel |
| **74** | NVLink error | `CUDA_ERROR_PEER_ACCESS_UNSUPPORTED` | Multi-GPU communication failure |
## Files in This Directory
| File | Purpose |
|------|---------|
| `cuda_intercept.c` | C library source that intercepts CUDA calls |
| `inject_into_pods.py` | Helper functions for patching Kubernetes deployments |
| `Makefile` | Builds the `.so` library locally (optional, for standalone testing) |
## Prerequisites
- **gcc compiler** (for building the library)
- **kubectl** with cluster access
- Python packages: `kubernetes`, `requests`
- No local compilation needed (compiled in-pod)
### For Standalone Local Testing (Optional)
- **gcc** (version 7.5+ recommended, any modern gcc works)
- **CUDA development headers** (optional, uses runtime API only)
## Writing Your Own Test
### Import Helper Functions
```python
import sys
from pathlib import Path
# Add cuda-fault-injection to path
cuda_injection_dir = Path(__file__).parent.parent / "cuda-fault-injection"
sys.path.insert(0, str(cuda_injection_dir))
from inject_into_pods import (
create_cuda_fault_configmap, # Step 1: Create ConfigMap with library source
patch_deployment_env, # Step 2: Patch deployment to use it
delete_cuda_fault_configmap # Cleanup: Remove ConfigMap
)
```
\ No newline at end of file
/*
* CUDA Intercept Library
*
* This library intercepts CUDA calls and returns appropriate error codes
* to simulate various GPU failures (XIDs).
*
* Supported XID types (set via CUDA_XID_TYPE environment variable):
* 79 - GPU fell off bus (CUDA_ERROR_NO_DEVICE) - DEFAULT
* 48 - Double-bit ECC error (CUDA_ERROR_ECC_UNCORRECTABLE)
* 94 - Contained ECC error (CUDA_ERROR_ECC_UNCORRECTABLE)
* 95 - Uncontained error (CUDA_ERROR_UNKNOWN)
* 43 - GPU stopped responding (CUDA_ERROR_LAUNCH_TIMEOUT)
* 74 - NVLink error (CUDA_ERROR_PEER_ACCESS_UNSUPPORTED)
*
* Compile:
* gcc -shared -fPIC -ldl cuda_intercept.c -o cuda_intercept.so
*
* Use:
* export CUDA_FAULT_INJECTION_ENABLED=1
* export CUDA_XID_TYPE=79 # or 48, 94, 95, 43, 74
* LD_PRELOAD=/path/to/cuda_intercept.so python -m vllm.entrypoints.api_server
*/
#include <dlfcn.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef int cudaError_t;
typedef struct cudaDeviceProp_st {
char name[256];
size_t totalGlobalMem;
// ... other fields (we don't need them)
} cudaDeviceProp;
// CUDA error codes (from cuda_runtime_api.h)
#define cudaSuccess 0
#define cudaErrorNoDevice 100 // XID 79: GPU fell off bus
#define cudaErrorEccUncorrectable 214 // XID 48, 94: ECC errors
#define cudaErrorUnknown 999 // XID 95: Uncontained error
#define cudaErrorLaunchTimeout 6 // XID 43: GPU stopped responding
#define cudaErrorPeerAccessUnsupported 217 // XID 74: NVLink error
// XID error type mapping
typedef struct {
int xid;
cudaError_t cuda_error;
const char* description;
} xid_mapping_t;
static const xid_mapping_t xid_mappings[] = {
{79, cudaErrorNoDevice, "GPU fell off bus"},
{48, cudaErrorEccUncorrectable, "Double-bit ECC error"},
{94, cudaErrorEccUncorrectable, "Contained ECC error"},
{95, cudaErrorUnknown, "Uncontained error"},
{43, cudaErrorLaunchTimeout, "GPU stopped responding"},
{74, cudaErrorPeerAccessUnsupported, "NVLink error"},
{0, 0, NULL} // Sentinel
};
// Get XID type and corresponding CUDA error
static void
get_fault_config(int* inject, int* xid_type, cudaError_t* error_code)
{
static int initialized = 0;
static int cached_inject = 0;
static int cached_xid = 79; // Default to XID 79
static cudaError_t cached_error = cudaErrorNoDevice;
if (!initialized) {
// Check if injection is enabled
char* env = getenv("CUDA_FAULT_INJECTION_ENABLED");
if (env) {
cached_inject = (strcmp(env, "1") == 0 || strcmp(env, "true") == 0);
}
// Get XID type
char* xid_env = getenv("CUDA_XID_TYPE");
if (xid_env) {
cached_xid = atoi(xid_env);
// Find corresponding CUDA error
int found = 0;
for (int i = 0; xid_mappings[i].description != NULL; i++) {
if (xid_mappings[i].xid == cached_xid) {
cached_error = xid_mappings[i].cuda_error;
fprintf(
stderr, "[CUDA FAULT INJECTION] ENABLED - Simulating XID %d (%s)\n", cached_xid,
xid_mappings[i].description);
found = 1;
break;
}
}
if (!found) {
fprintf(stderr, "[CUDA FAULT INJECTION] WARNING: Unknown XID %d, defaulting to XID 79\n", cached_xid);
cached_xid = 79;
cached_error = cudaErrorNoDevice;
}
} else {
fprintf(
stderr, "[CUDA FAULT INJECTION] %s (default: XID 79 - GPU fell off bus)\n",
cached_inject ? "ENABLED" : "DISABLED");
}
initialized = 1;
}
*inject = cached_inject;
*xid_type = cached_xid;
*error_code = cached_error;
}
// Check if fault should be injected
static int
should_inject_fault()
{
int inject, xid;
cudaError_t error;
get_fault_config(&inject, &xid, &error);
return inject;
}
// Get the error code to return
static cudaError_t
get_error_code()
{
int inject, xid;
cudaError_t error;
get_fault_config(&inject, &xid, &error);
return error;
}
// Log helper
static void
log_intercept(const char* func_name, cudaError_t error_code)
{
if (should_inject_fault()) {
int inject, xid;
cudaError_t err;
get_fault_config(&inject, &xid, &err);
fprintf(stderr, "[XID %d SIM] %s() intercepted -> error %d\n", xid, func_name, error_code);
}
}
// Intercept: Get device count
cudaError_t
cudaGetDeviceCount(int* count)
{
if (should_inject_fault()) {
cudaError_t error = get_error_code();
log_intercept("cudaGetDeviceCount", error);
if (count)
*count = 0;
return error;
}
// If disabled, call real function
typedef cudaError_t (*real_func_t)(int*);
real_func_t real_func = (real_func_t)dlsym(RTLD_NEXT, "cudaGetDeviceCount");
if (real_func) {
return real_func(count);
}
return cudaErrorNoDevice;
}
// Intercept: Set device
cudaError_t
cudaSetDevice(int device)
{
if (should_inject_fault()) {
cudaError_t error = get_error_code();
log_intercept("cudaSetDevice", error);
return error;
}
typedef cudaError_t (*real_func_t)(int);
real_func_t real_func = (real_func_t)dlsym(RTLD_NEXT, "cudaSetDevice");
if (real_func) {
return real_func(device);
}
return cudaErrorNoDevice;
}
// Intercept: Get device
cudaError_t
cudaGetDevice(int* device)
{
if (should_inject_fault()) {
cudaError_t error = get_error_code();
log_intercept("cudaGetDevice", error);
return error;
}
typedef cudaError_t (*real_func_t)(int*);
real_func_t real_func = (real_func_t)dlsym(RTLD_NEXT, "cudaGetDevice");
if (real_func) {
return real_func(device);
}
return cudaErrorNoDevice;
}
// Intercept: Malloc
cudaError_t
cudaMalloc(void** devPtr, size_t size)
{
if (should_inject_fault()) {
cudaError_t error = get_error_code();
log_intercept("cudaMalloc", error);
return error;
}
typedef cudaError_t (*real_func_t)(void**, size_t);
real_func_t real_func = (real_func_t)dlsym(RTLD_NEXT, "cudaMalloc");
if (real_func) {
return real_func(devPtr, size);
}
return cudaErrorNoDevice;
}
// Intercept: Free
cudaError_t
cudaFree(void* devPtr)
{
if (should_inject_fault()) {
cudaError_t error = get_error_code();
log_intercept("cudaFree", error);
return error;
}
typedef cudaError_t (*real_func_t)(void*);
real_func_t real_func = (real_func_t)dlsym(RTLD_NEXT, "cudaFree");
if (real_func) {
return real_func(devPtr);
}
return cudaErrorNoDevice;
}
// Intercept: Memcpy
cudaError_t
cudaMemcpy(void* dst, const void* src, size_t count, int kind)
{
if (should_inject_fault()) {
cudaError_t error = get_error_code();
log_intercept("cudaMemcpy", error);
return error;
}
typedef cudaError_t (*real_func_t)(void*, const void*, size_t, int);
real_func_t real_func = (real_func_t)dlsym(RTLD_NEXT, "cudaMemcpy");
if (real_func) {
return real_func(dst, src, count, kind);
}
return cudaErrorNoDevice;
}
// Intercept: Device synchronize
cudaError_t
cudaDeviceSynchronize(void)
{
if (should_inject_fault()) {
cudaError_t error = get_error_code();
log_intercept("cudaDeviceSynchronize", error);
return error;
}
typedef cudaError_t (*real_func_t)(void);
real_func_t real_func = (real_func_t)dlsym(RTLD_NEXT, "cudaDeviceSynchronize");
if (real_func) {
return real_func();
}
return cudaErrorNoDevice;
}
// Intercept: Get device properties
cudaError_t
cudaGetDeviceProperties(cudaDeviceProp* prop, int device)
{
if (should_inject_fault()) {
cudaError_t error = get_error_code();
log_intercept("cudaGetDeviceProperties", error);
return error;
}
typedef cudaError_t (*real_func_t)(cudaDeviceProp*, int);
real_func_t real_func = (real_func_t)dlsym(RTLD_NEXT, "cudaGetDeviceProperties");
if (real_func) {
return real_func(prop, device);
}
return cudaErrorNoDevice;
}
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Inject CUDA Intercept Library into Running vLLM Pods
This script:
1. Copies cuda_intercept.so into target pods
2. Restarts pods with LD_PRELOAD environment variable set
3. Monitors pod crash/restart behavior
Usage:
# Inject into all vLLM worker pods
python inject_into_pods.py --deployment vllm-v1-disagg-router --namespace dynamo-oviya
# Inject into specific node
python inject_into_pods.py --deployment vllm-v1-disagg-router --node aks-a100a-36888584-vmss000003
# Remove injection (restore normal operation)
python inject_into_pods.py --deployment vllm-v1-disagg-router --remove
"""
import argparse
import os
import subprocess
import sys
import time
from pathlib import Path
from kubernetes import client, config
from kubernetes.client.rest import ApiException
def get_library_path():
"""Get path to the compiled library."""
script_dir = Path(__file__).parent
lib_path = script_dir / "cuda_intercept.so"
if not lib_path.exists():
print(f" Library not found: {lib_path}")
print(" Run: make")
sys.exit(1)
return lib_path
def copy_library_to_pod(
pod_name, namespace, lib_path, target_path="/tmp/cuda_intercept.so"
):
"""Copy library to pod using kubectl cp."""
print(f"[→] Copying library to {pod_name}...")
cmd = ["kubectl", "cp", str(lib_path), f"{namespace}/{pod_name}:{target_path}"]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f" ⚠ Failed to copy: {result.stderr}")
return False
print(f" ✓ Copied to {target_path}")
return True
def create_cuda_fault_configmap(namespace, lib_path=None):
"""Create ConfigMap with CUDA fault injection library source code."""
if lib_path is None:
lib_path = get_library_path()
print("[→] Creating ConfigMap with CUDA fault library source...")
try:
# Get the source file path
lib_dir = os.path.dirname(lib_path)
source_file = os.path.join(lib_dir, "cuda_intercept.c")
if not os.path.exists(source_file):
print(f" Source file not found: {source_file}")
return False
# Read source code
with open(source_file, "r") as f:
source_code = f.read()
print(f" → Source code size: {len(source_code)} bytes")
# Create ConfigMap with source code
# We'll compile it in the init container to ensure Linux compatibility
core_api = client.CoreV1Api()
configmap = client.V1ConfigMap(
metadata=client.V1ObjectMeta(name="cuda-fault-injection-lib"),
data={"cuda_intercept.c": source_code},
)
# Delete existing if present
try:
core_api.delete_namespaced_config_map("cuda-fault-injection-lib", namespace)
print(" → Deleted existing ConfigMap")
time.sleep(2)
except ApiException as e:
if e.status != 404:
print(f" ⚠ Failed to delete existing ConfigMap: {e}")
# Create new
core_api.create_namespaced_config_map(namespace, configmap)
print("[✓] ConfigMap created: cuda-fault-injection-lib")
print(" → Source code will be compiled in init container (Linux-compatible)")
return True
except Exception as e:
print(f" Failed to create ConfigMap: {e}")
import traceback
traceback.print_exc()
return False
def delete_cuda_fault_configmap(namespace):
"""Delete the CUDA fault injection ConfigMap."""
try:
core_api = client.CoreV1Api()
core_api.delete_namespaced_config_map("cuda-fault-injection-lib", namespace)
print("[✓] ConfigMap deleted: cuda-fault-injection-lib")
return True
except ApiException as e:
if e.status == 404:
# ConfigMap doesn't exist - treat as success
print(" ℹ ConfigMap not found (already deleted or never created)")
return True
else:
print(f" ⚠ Failed to delete ConfigMap: {e.reason}")
return False
except Exception as e:
print(f" ⚠ Unexpected error deleting ConfigMap: {e}")
return False
def _patch_service_for_injection(
service, enable, new_envs, use_configmap, target_node, lib_path
):
"""Apply injection patches to a single service.
Args:
service: The service object to patch
enable: Whether to enable or disable injection
new_envs: List of environment variables to add when enabling
use_configmap: Whether to use ConfigMap for library distribution
target_node: Node to pin pods to (or None)
lib_path: Path to the library in the container
"""
# Ensure extraPodSpec exists
if "extraPodSpec" not in service:
service["extraPodSpec"] = {}
if "mainContainer" not in service["extraPodSpec"]:
service["extraPodSpec"]["mainContainer"] = {}
if "env" not in service["extraPodSpec"]["mainContainer"]:
service["extraPodSpec"]["mainContainer"]["env"] = []
# Remove existing LD_PRELOAD, CUDA_FAULT_INJECTION_ENABLED, and CUDA_XID_TYPE if present
service["extraPodSpec"]["mainContainer"]["env"] = [
env
for env in service["extraPodSpec"]["mainContainer"]["env"]
if env.get("name")
not in [
"LD_PRELOAD",
"CUDA_FAULT_INJECTION_ENABLED",
"CUDA_XID_TYPE",
]
]
# Add new environment variables if enabling
if enable:
service["extraPodSpec"]["mainContainer"]["env"].extend(new_envs)
# Handle ConfigMap volume mount
if use_configmap and enable:
# Add emptyDir volume for decoded library
if "volumes" not in service["extraPodSpec"]:
service["extraPodSpec"]["volumes"] = []
# Remove existing volumes if present
service["extraPodSpec"]["volumes"] = [
v
for v in service["extraPodSpec"]["volumes"]
if v.get("name") not in ["cuda-fault-lib-source", "cuda-fault-lib"]
]
# Add ConfigMap volume (source - base64 encoded)
service["extraPodSpec"]["volumes"].append(
{
"name": "cuda-fault-lib-source",
"configMap": {"name": "cuda-fault-injection-lib"},
}
)
# Add emptyDir volume (destination - decoded binary)
service["extraPodSpec"]["volumes"].append(
{"name": "cuda-fault-lib", "emptyDir": {}}
)
# Add init container to decode base64
if "initContainers" not in service["extraPodSpec"]:
service["extraPodSpec"]["initContainers"] = []
# Remove existing init container if present
service["extraPodSpec"]["initContainers"] = [
ic
for ic in service["extraPodSpec"]["initContainers"]
if ic.get("name") not in ["decode-cuda-fault-lib", "compile-cuda-fault-lib"]
]
# Add init container to compile the library
service["extraPodSpec"]["initContainers"].append(
{
"name": "compile-cuda-fault-lib",
"image": "gcc:latest",
"command": ["sh", "-c"],
"args": [
"gcc -shared -fPIC -Wall -Wextra /source/cuda_intercept.c -o /dest/cuda_intercept.so -ldl && "
"chmod 755 /dest/cuda_intercept.so && "
"echo 'Compiled CUDA fault library for Linux' && "
"ls -lh /dest/cuda_intercept.so && "
"file /dest/cuda_intercept.so"
],
"volumeMounts": [
{
"name": "cuda-fault-lib-source",
"mountPath": "/source",
"readOnly": True,
},
{"name": "cuda-fault-lib", "mountPath": "/dest"},
],
}
)
# Add volume mount to main container
if "volumeMounts" not in service["extraPodSpec"]["mainContainer"]:
service["extraPodSpec"]["mainContainer"]["volumeMounts"] = []
# Remove existing mount if present
service["extraPodSpec"]["mainContainer"]["volumeMounts"] = [
vm
for vm in service["extraPodSpec"]["mainContainer"]["volumeMounts"]
if vm.get("name") != "cuda-fault-lib"
]
# Add mount
service["extraPodSpec"]["mainContainer"]["volumeMounts"].append(
{
"name": "cuda-fault-lib",
"mountPath": "/cuda-fault",
"readOnly": True,
}
)
print(" ✓ Added init container to compile library")
print(" ✓ Added ConfigMap volume mount")
# Add node affinity to pin pods to target node (simulates real XID 79 behavior)
if target_node and enable:
if "affinity" not in service["extraPodSpec"]:
service["extraPodSpec"]["affinity"] = {}
service["extraPodSpec"]["affinity"]["nodeAffinity"] = {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "kubernetes.io/hostname",
"operator": "In",
"values": [target_node],
}
]
}
]
}
}
print(f" ✓ Added node affinity to pin pods to {target_node}")
elif not enable:
# Remove ConfigMap volume and mount when disabling
if "volumes" in service["extraPodSpec"]:
service["extraPodSpec"]["volumes"] = [
v
for v in service["extraPodSpec"]["volumes"]
if v.get("name") not in ["cuda-fault-lib", "cuda-fault-lib-source"]
]
if "volumeMounts" in service["extraPodSpec"].get("mainContainer", {}):
service["extraPodSpec"]["mainContainer"]["volumeMounts"] = [
vm
for vm in service["extraPodSpec"]["mainContainer"]["volumeMounts"]
if vm.get("name") != "cuda-fault-lib"
]
# Remove init container
if "initContainers" in service["extraPodSpec"]:
service["extraPodSpec"]["initContainers"] = [
ic
for ic in service["extraPodSpec"]["initContainers"]
if ic.get("name")
not in [
"decode-cuda-fault-lib",
"compile-cuda-fault-lib",
]
]
# Remove node affinity
# Must explicitly set to None and ensure it's in the patch
# (simply deleting the key doesn't remove it from K8s)
service["extraPodSpec"]["affinity"] = None
print(" ✓ Removed node affinity")
def patch_deployment_env(
deployment_name,
namespace,
enable=True,
use_configmap=True,
target_node=None,
xid_type=79,
):
"""Patch deployment to add/remove LD_PRELOAD environment variable.
Args:
deployment_name: Name of the deployment
namespace: Kubernetes namespace
enable: Whether to enable (True) or disable (False) CUDA fault injection
use_configmap: Whether to use ConfigMap for library distribution
target_node: If provided, adds node affinity to pin pods to this node
(simulates real XID where pods crash on the faulty node)
xid_type: XID error type to simulate (79, 48, 94, 95, 43, 74). Default: 79
"""
custom_api = client.CustomObjectsApi()
apps_api = client.AppsV1Api()
# Try DynamoGraphDeployment first
is_dgd = False
dgd = None
try:
print(f" → Attempting to get DynamoGraphDeployment: {deployment_name}")
dgd = custom_api.get_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=namespace,
plural="dynamographdeployments",
name=deployment_name,
)
is_dgd = True
print(" ✓ Found DynamoGraphDeployment")
except ApiException as e:
if e.status == 404:
# Not a DynamoGraphDeployment, will try standard Deployment below
print(
" → DynamoGraphDeployment not found (404), will try standard Deployment"
)
is_dgd = False
else:
print(f" Failed to get DynamoGraphDeployment: {e}")
return False
except Exception as e:
print(f" Unexpected error getting DynamoGraphDeployment: {e}")
return False
if is_dgd:
# Process DynamoGraphDeployment
try:
print(" → Processing DynamoGraphDeployment...")
# Type guard: ensure dgd is not None
assert dgd is not None, "dgd should not be None when is_dgd is True"
# Determine library path based on ConfigMap usage
lib_path = (
"/cuda-fault/cuda_intercept.so"
if use_configmap
else "/tmp/cuda_intercept.so"
)
# Prepare environment variables
new_envs = []
if enable:
new_envs = [
{"name": "LD_PRELOAD", "value": lib_path},
{"name": "CUDA_FAULT_INJECTION_ENABLED", "value": "1"},
{"name": "CUDA_XID_TYPE", "value": str(xid_type)},
]
# Patch worker services (VllmDecodeWorker and VllmPrefillWorker)
services_to_patch = ["VllmDecodeWorker", "VllmPrefillWorker"]
patched_services = []
spec = dgd.get("spec", {})
services = spec.get("services", {}) if spec else {}
available_services = list(services.keys())
print(f" → Available services: {available_services}")
for service_name in services_to_patch:
if service_name in services:
print(f" → Patching service: {service_name}")
service = services[service_name]
_patch_service_for_injection(
service, enable, new_envs, use_configmap, target_node, lib_path
)
patched_services.append(service_name)
print(" → Applying patch to DynamoGraphDeployment...")
# Apply the patch with retry logic for 409 Conflicts
max_retries = 5
for attempt in range(max_retries):
try:
if attempt > 0:
print(f" → Retry attempt {attempt + 1}/{max_retries}")
time.sleep(0.5 * attempt) # Exponential backoff
# Re-fetch the latest version
dgd = custom_api.get_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=namespace,
plural="dynamographdeployments",
name=deployment_name,
)
# Re-apply the patches to fresh copy
for service_name in patched_services:
if service_name in dgd.get("spec", {}).get("services", {}):
service = dgd["spec"]["services"][service_name]
_patch_service_for_injection(
service,
enable,
new_envs,
use_configmap,
target_node,
lib_path,
)
custom_api.patch_namespaced_custom_object(
group="nvidia.com",
version="v1alpha1",
namespace=namespace,
plural="dynamographdeployments",
name=deployment_name,
body=dgd,
)
break # Success!
except ApiException as e:
if e.status == 409 and attempt < max_retries - 1:
# Conflict - resource was modified by controller
print(" ⚠ Conflict (409) - resource modified, retrying...")
continue
else:
# Not a conflict or out of retries
raise
# Primary patch with affinity: None is sufficient
# (Removed secondary JSON patch - caused 400 BadRequest)
action = "enabled" if enable else "disabled"
print(f"[✓] DynamoGraphDeployment patched - CUDA fault injection {action}")
print(f" Services patched: {', '.join(patched_services)}")
if use_configmap and enable:
print(f" Library mounted at: {lib_path}")
return True
except ApiException as e:
print(f" Failed to patch DynamoGraphDeployment (ApiException): {e}")
print(f" Status: {e.status}")
print(f" Reason: {e.reason}")
return False
except Exception as e:
print(
f" Failed to patch DynamoGraphDeployment (Exception): {type(e).__name__}: {e}"
)
import traceback
traceback.print_exc()
return False
# Try standard Deployment
print(" → Not a DynamoGraphDeployment, trying standard Deployment...")
try:
deployment = apps_api.read_namespaced_deployment(deployment_name, namespace)
except ApiException as e:
print(f" Failed to read deployment {deployment_name}: {e}")
return False
# Find all containers and patch their env
for container in deployment.spec.template.spec.containers:
if container.env is None:
container.env = []
# Remove existing LD_PRELOAD, CUDA_FAULT_INJECTION_ENABLED, and CUDA_XID_TYPE
container.env = [
e
for e in container.env
if e.name
not in ["LD_PRELOAD", "CUDA_FAULT_INJECTION_ENABLED", "CUDA_XID_TYPE"]
]
if enable:
# Add new env vars
container.env.append(
client.V1EnvVar(name="LD_PRELOAD", value="/tmp/cuda_intercept.so")
)
container.env.append(
client.V1EnvVar(name="CUDA_FAULT_INJECTION_ENABLED", value="1")
)
container.env.append(
client.V1EnvVar(name="CUDA_XID_TYPE", value=str(xid_type))
)
try:
apps_api.patch_namespaced_deployment(deployment_name, namespace, deployment)
action = "enabled" if enable else "disabled"
print(f"[✓] Deployment patched - CUDA fault injection {action}")
return True
except ApiException as e:
print(f" Failed to patch deployment: {e}")
return False
def get_worker_pods(deployment_name, namespace, node_name=None):
"""Get worker pods for a deployment.
Supports both DynamoGraphDeployment and standard Kubernetes Deployments.
First tries DynamoGraphDeployment-specific labels, then falls back to
standard Deployment labels.
Args:
deployment_name: Name of the deployment
namespace: Kubernetes namespace
node_name: Optional node name to filter by
Returns:
List of pod objects
"""
core_api = client.CoreV1Api()
apps_api = client.AppsV1Api()
field_selector = f"spec.nodeName={node_name}" if node_name else None
# Try DynamoGraphDeployment-specific labels first
dgd_label_selector = f"nvidia.com/dynamo-graph-deployment-name={deployment_name},nvidia.com/dynamo-component-type=worker"
try:
pods = core_api.list_namespaced_pod(
namespace=namespace,
label_selector=dgd_label_selector,
field_selector=field_selector,
)
if pods.items:
return pods.items
except ApiException as e:
print(f" Warning: Failed to list pods with DynamoGraphDeployment labels: {e}")
# Fallback: Try standard Deployment
# Get the deployment to find its label selector
try:
deployment = apps_api.read_namespaced_deployment(deployment_name, namespace)
# Extract match labels from deployment spec
if deployment.spec.selector and deployment.spec.selector.match_labels:
label_parts = [
f"{k}={v}" for k, v in deployment.spec.selector.match_labels.items()
]
std_label_selector = ",".join(label_parts)
pods = core_api.list_namespaced_pod(
namespace=namespace,
label_selector=std_label_selector,
field_selector=field_selector,
)
if pods.items:
return pods.items
except ApiException as e:
print(f" Warning: Failed to list pods with standard Deployment labels: {e}")
# If both methods failed, return empty list
print(f" No pods found for deployment {deployment_name}")
return []
def inject_into_running_pods(deployment_name, namespace, node_name=None):
"""Inject library into currently running pods (temporary - lost on restart)."""
print("\n" + "=" * 80)
print("Method 1: Inject into Running Pods (Temporary)")
print("=" * 80)
print("⚠ This method is TEMPORARY - injection lost when pods restart")
print(" Use Method 2 (deployment patch) for persistent injection")
print()
lib_path = get_library_path()
pods = get_worker_pods(deployment_name, namespace, node_name)
if not pods:
print(f" No worker pods found for deployment {deployment_name}")
return False
print(f"[→] Found {len(pods)} worker pods")
success_count = 0
for pod in pods:
pod_name = pod.metadata.name
print(f"\n[→] Processing pod: {pod_name}")
# Copy library
if copy_library_to_pod(pod_name, namespace, lib_path):
success_count += 1
print(" ✓ Library injected")
print(" ⚠ To activate, pod must be restarted with LD_PRELOAD set")
else:
print(" ✗ Failed to inject")
print(f"\n[→] Injected into {success_count}/{len(pods)} pods")
print("\n⚠ IMPORTANT: Pods must be restarted with LD_PRELOAD environment variable!")
print(" Use Method 2 (--patch-deployment) to set LD_PRELOAD and restart pods")
return success_count > 0
def inject_via_deployment_patch(
deployment_name, namespace, target_node=None, xid_type=79
):
"""Inject by patching deployment and rolling restart (persistent).
Args:
deployment_name: Name of the deployment
namespace: Kubernetes namespace
target_node: Optional node to pin pods to (simulates real XID behavior)
xid_type: XID error type to simulate (79, 48, 94, 95, 43, 74)
"""
print("\n" + "=" * 80)
print("Method 2: Patch Deployment (Persistent)")
print("=" * 80)
print("This method:")
print(" 1. Creates ConfigMap with library source")
print(" 2. Copies library to existing pods")
print(" 3. Patches deployment to set LD_PRELOAD")
print(" 4. Triggers rolling restart")
print(" 5. New pods will have CUDA fault injection enabled")
print()
lib_path = get_library_path()
# Step 1: Create ConfigMap with library source
print("[1/4] Creating ConfigMap with library source...")
if not create_cuda_fault_configmap(namespace, lib_path):
print(" Failed to create ConfigMap")
return False
# Step 2: Copy library to all current pods
print("\n[2/4] Copying library to existing pods...")
pods = get_worker_pods(deployment_name, namespace)
if not pods:
print(" No worker pods found")
return False
for pod in pods:
copy_library_to_pod(pod.metadata.name, namespace, lib_path)
# Step 3: Patch deployment
print("\n[3/4] Patching deployment to enable CUDA fault injection...")
if not patch_deployment_env(
deployment_name,
namespace,
enable=True,
target_node=target_node,
xid_type=xid_type,
):
return False
# Step 4: Trigger rolling restart
print("\n[4/4] Triggering pod restart...")
# For DynamoGraphDeployment, we need to delete pods to trigger restart
# (kubectl rollout restart doesn't work with custom resources)
core_api = client.CoreV1Api()
print(
f"[→] Deleting {len(pods)} worker pods to trigger restart with new env vars..."
)
for pod in pods:
try:
core_api.delete_namespaced_pod(
name=pod.metadata.name,
namespace=namespace,
grace_period_seconds=30, # Graceful shutdown
)
print(f" ✓ Deleted: {pod.metadata.name}")
except ApiException as e:
print(f" ✗ Failed to delete {pod.metadata.name}: {e}")
print("\n[→] Waiting for new pods to start with CUDA fault injection...")
time.sleep(10)
# Wait for pods to be recreated
max_wait = 300 # 5 minutes
start_time = time.time()
while time.time() - start_time < max_wait:
new_pods = get_worker_pods(deployment_name, namespace)
# Check if we have the expected number of pods
if len(new_pods) >= len(pods):
# Check if all pods are running
all_running = all(pod.status.phase == "Running" for pod in new_pods)
if all_running:
print(f"\n[✓] All {len(new_pods)} pods restarted successfully")
print("[✓] CUDA fault injection is now ACTIVE")
print("\n⚠ Pods will crash with 'No CUDA-capable device' error")
print(" This simulates XID 79 (GPU falls off bus)")
return True
time.sleep(5)
print("\n⚠ Timeout waiting for pods to restart")
return False
def remove_injection(deployment_name, namespace):
"""Remove CUDA fault injection from deployment."""
print("\n" + "=" * 80)
print("Removing CUDA Fault Injection")
print("=" * 80)
print("[→] Removing LD_PRELOAD from deployment...")
if not patch_deployment_env(deployment_name, namespace, enable=False):
return False
print("\n[→] Deleting ConfigMap...")
delete_cuda_fault_configmap(namespace)
print("\n[→] Deleting pods to restore normal operation...")
# Get current pods
pods = get_worker_pods(deployment_name, namespace)
if not pods:
print("⚠ No worker pods found")
return True
# Delete pods to trigger restart without fault injection
core_api = client.CoreV1Api()
for pod in pods:
try:
core_api.delete_namespaced_pod(
name=pod.metadata.name, namespace=namespace, grace_period_seconds=30
)
print(f" ✓ Deleted: {pod.metadata.name}")
except ApiException as e:
print(f" ✗ Failed to delete {pod.metadata.name}: {e}")
print("[✓] Injection removed - pods will restart normally")
return True
def verify_injection(deployment_name, namespace):
"""Verify that CUDA fault injection is active in pods."""
print("\n" + "=" * 80)
print("Verifying CUDA Fault Injection")
print("=" * 80)
pods = get_worker_pods(deployment_name, namespace)
if not pods:
print(" No pods found")
return
for pod in pods[:3]: # Check first 3 pods
pod_name = pod.metadata.name
print(f"\n[→] Checking pod: {pod_name}")
# Check if library exists (try both /tmp and /cuda-fault paths)
found_lib = False
for lib_path in ["/tmp/cuda_intercept.so", "/cuda-fault/cuda_intercept.so"]:
cmd = [
"kubectl",
"exec",
"-n",
namespace,
pod_name,
"--",
"ls",
"-lh",
lib_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f" ✓ Library present at {lib_path}: {result.stdout.strip()}")
found_lib = True
break
if not found_lib:
print(" ✗ Library not found at /tmp or /cuda-fault")
# Check environment variables (check for both paths)
cmd = ["kubectl", "exec", "-n", namespace, pod_name, "--", "env"]
result = subprocess.run(cmd, capture_output=True, text=True)
if (
"LD_PRELOAD=/tmp/cuda_intercept.so" in result.stdout
or "LD_PRELOAD=/cuda-fault/cuda_intercept.so" in result.stdout
):
print(" ✓ LD_PRELOAD set correctly")
else:
print(" ✗ LD_PRELOAD not set")
if "CUDA_FAULT_INJECTION_ENABLED=1" in result.stdout:
print(" ✓ Fault injection enabled")
else:
print(" ✗ Fault injection not enabled")
# Check pod logs for injection messages
cmd = ["kubectl", "logs", "-n", namespace, pod_name, "--tail=50"]
result = subprocess.run(cmd, capture_output=True, text=True)
if (
"[CUDA FAULT INJECTION] ENABLED" in result.stderr
or "[CUDA FAULT INJECTION] ENABLED" in result.stdout
):
print(" ✓ Fault injection active (found in logs)")
elif "CrashLoopBackOff" in pod.status.phase or pod.status.container_statuses:
if any(
cs.state.waiting and "CrashLoopBackOff" in cs.state.waiting.reason
for cs in pod.status.container_statuses or []
):
print(" ✓ Pod crashing (expected with fault injection)")
else:
print(" ? Cannot confirm injection status from logs")
def main():
parser = argparse.ArgumentParser(
description="Inject CUDA fault library into vLLM pods",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Patch deployment (persistent, recommended)
python inject_into_pods.py --deployment vllm-v1-disagg-router --namespace dynamo-oviya --patch-deployment
# Simulate different XID error types
python inject_into_pods.py --deployment vllm-v1-disagg-router --patch-deployment --xid-type 48
python inject_into_pods.py --deployment vllm-v1-disagg-router --patch-deployment --xid-type 94
# Pin to specific node (simulates real XID behavior)
python inject_into_pods.py --deployment vllm-v1-disagg-router --patch-deployment --node aks-a100a-36888584-vmss000003
# Inject into running pods on specific node (temporary)
python inject_into_pods.py --deployment vllm-v1-disagg-router --node aks-a100a-36888584-vmss000003
# Remove injection
python inject_into_pods.py --deployment vllm-v1-disagg-router --namespace dynamo-oviya --remove
# Verify injection
python inject_into_pods.py --deployment vllm-v1-disagg-router --namespace dynamo-oviya --verify
""",
)
parser.add_argument(
"--deployment",
required=True,
help="Deployment name (e.g., vllm-v1-disagg-router)",
)
parser.add_argument(
"--namespace", default="dynamo-oviya", help="Kubernetes namespace"
)
parser.add_argument("--node", help="Target specific node (optional)")
parser.add_argument(
"--patch-deployment", action="store_true", help="Patch deployment (persistent)"
)
parser.add_argument(
"--xid-type",
type=int,
default=79,
choices=[79, 48, 94, 95, 43, 74],
help="XID error type to simulate (default: 79)",
)
parser.add_argument("--remove", action="store_true", help="Remove injection")
parser.add_argument("--verify", action="store_true", help="Verify injection status")
args = parser.parse_args()
# Load kubeconfig
try:
config.load_kube_config()
except Exception:
print(" Failed to load kubeconfig")
sys.exit(1)
print("=" * 80)
print("CUDA Fault Injection - Pod Injector")
print("=" * 80)
print(f"Deployment: {args.deployment}")
print(f"Namespace: {args.namespace}")
if args.node:
print(f"Node: {args.node}")
if not args.remove and not args.verify:
print(f"XID Type: {args.xid_type}")
print()
if args.verify:
verify_injection(args.deployment, args.namespace)
elif args.remove:
remove_injection(args.deployment, args.namespace)
elif args.patch_deployment:
inject_via_deployment_patch(
args.deployment, args.namespace, args.node, args.xid_type
)
else:
inject_into_running_pods(args.deployment, args.namespace, args.node)
if __name__ == "__main__":
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment