Unverified Commit 031dc589 authored by Indrajit Bhosale's avatar Indrajit Bhosale Committed by GitHub
Browse files

test: TRTLLM K8s FT tests (#3368)


Signed-off-by: default avatarIndrajit Bhosale <iamindrajitb@gmail.com>
parent a4c8f5ab
......@@ -29,6 +29,11 @@ WORKER_MAP = {
"decode": "decode",
"prefill": "prefill",
},
"trtllm": {
"decode": "TRTLLMDecodeWorker",
"decode_agg": "TRTLLMWorker", # Aggregated uses different name
"prefill": "TRTLLMPrefillWorker",
},
}
# Process ready patterns for recovery detection
......@@ -49,6 +54,16 @@ WORKER_READY_PATTERNS: Dict[str, Pattern] = {
"prefill": re.compile(
r"Model registration succeeded|Prefill worker handler initialized|Worker handler initialized"
),
# TensorRT-LLM workers
"TRTLLMWorker": re.compile(
r"TrtllmWorker for (?P<model_name>.*?) has been initialized|Model registration succeeded"
),
"TRTLLMDecodeWorker": re.compile(
r"TrtllmWorker for (?P<model_name>.*?) has been initialized|Model registration succeeded"
),
"TRTLLMPrefillWorker": re.compile(
r"TrtllmWorker for (?P<model_name>.*?) has been initialized|Model registration succeeded"
),
}
......@@ -119,8 +134,14 @@ def _set_replicas(deployment_spec, backend, deploy_type, replicas):
spec["Frontend"].replicas = replicas
if backend in WORKER_MAP:
# For trtllm agg deployments, use different worker name
if backend == "trtllm" and deploy_type == "agg":
decode_worker = WORKER_MAP[backend]["decode_agg"]
else:
decode_worker = WORKER_MAP[backend]["decode"]
# always scale decode
spec[WORKER_MAP[backend]["decode"]].replicas = replicas
spec[decode_worker].replicas = replicas
# scale prefill only for disagg
if deploy_type == "disagg":
spec[WORKER_MAP[backend]["prefill"]].replicas = replicas
......@@ -131,7 +152,11 @@ def _set_tensor_parallel(deployment_spec, backend, deploy_type, tp_size):
spec = deployment_spec["spec"]
if backend in WORKER_MAP:
decode_worker = WORKER_MAP[backend]["decode"]
# For trtllm agg deployments, use different worker name
if backend == "trtllm" and deploy_type == "agg":
decode_worker = WORKER_MAP[backend]["decode_agg"]
else:
decode_worker = WORKER_MAP[backend]["decode"]
prefill_worker = WORKER_MAP[backend]["prefill"]
if deploy_type == "agg":
......@@ -200,6 +225,7 @@ def _create_deployments_for_backend(backend):
deployment_specs = {}
deployment_specs.update(_create_deployments_for_backend("vllm"))
deployment_specs.update(_create_deployments_for_backend("sglang"))
deployment_specs.update(_create_deployments_for_backend("trtllm"))
# Each failure scenaro contains a list of failure injections
......@@ -212,10 +238,21 @@ deployment_specs.update(_create_deployments_for_backend("sglang"))
# "prefill_worker": [Failure(30, "VllmPrefillWorker", "dynamo.vllm", "SIGKILL")],
#
# terminates 1 prefill worker after 30 seconds
def _create_backend_failures(backend):
"""Generate backend-specific failure scenarios."""
def _create_backend_failures(backend, deploy_type="disagg"):
"""Generate backend-specific failure scenarios.
Args:
backend: Backend type (vllm, sglang, trtllm)
deploy_type: Deployment type (agg or disagg)
"""
workers = WORKER_MAP[backend]
decode_worker = workers["decode"]
# Use correct worker name based on deployment type
if backend == "trtllm" and deploy_type == "agg":
decode_worker = workers["decode_agg"]
else:
decode_worker = workers["decode"]
prefill_worker = workers["prefill"]
process_name = f"dynamo.{backend}"
......@@ -249,6 +286,13 @@ def _create_backend_failures(backend):
failures["sglang_prefill_detokenizer"] = [
Failure(30, prefill_worker, "sglang::detokenizer", "SIGKILL")
]
elif backend == "trtllm":
failures["trtllm_decode_engine_core"] = [
Failure(30, decode_worker, "TRTLLM::EngineCore", "SIGKILL")
]
failures["trtllm_prefill_engine_core"] = [
Failure(30, prefill_worker, "TRTLLM::EngineCore", "SIGKILL")
]
return failures
......@@ -263,27 +307,36 @@ model = None
scenarios = {}
# Map of backend to failure definitions
backend_failure_map = {
"vllm": _create_backend_failures("vllm"),
"sglang": _create_backend_failures("sglang"),
}
# Map of backend+deploy_type to failure definitions
backend_failure_map = {}
for backend in ["vllm", "sglang", "trtllm"]:
backend_failure_map[f"{backend}_agg"] = _create_backend_failures(backend, "agg")
backend_failure_map[f"{backend}_disagg"] = _create_backend_failures(
backend, "disagg"
)
for deployment_name, deployment_info in deployment_specs.items():
backend = deployment_info["backend"]
# Validate backend
if backend not in backend_failure_map:
# Determine deployment type from deployment name
deploy_type = (
"agg"
if ("agg" in deployment_name and "disagg" not in deployment_name)
else "disagg"
)
# Get the appropriate failure set for this backend+deploy_type
failure_map_key = f"{backend}_{deploy_type}"
if failure_map_key not in backend_failure_map:
raise ValueError(
f"Unsupported backend: {backend}. Supported backends are: {list(backend_failure_map.keys())}"
f"Unsupported backend+deploy_type: {failure_map_key}. Available: {list(backend_failure_map.keys())}"
)
# Get the appropriate failure set for this backend
failure_set = backend_failure_map[backend]
failure_set = backend_failure_map[failure_map_key]
for failure_name, failure in failure_set.items():
# Skip prefill failures for aggregated deployments
if "prefill" in failure_name and "disagg" not in deployment_name:
if "prefill" in failure_name and deploy_type == "agg":
continue
scenario_name = f"{deployment_name}-{failure_name}"
......
......@@ -153,6 +153,15 @@ async def test_fault_scenario(
model = scenario.deployment["VllmDecodeWorker"].model
elif scenario.backend == "sglang":
model = scenario.deployment["decode"].model
elif scenario.backend == "trtllm":
# Determine deployment type from scenario deployment name
if (
"agg" in scenario.deployment.name
and "disagg" not in scenario.deployment.name
):
model = scenario.deployment["TRTLLMWorker"].model
else:
model = scenario.deployment["TRTLLMDecodeWorker"].model
else:
model = None
except (KeyError, AttributeError):
......
......@@ -97,7 +97,10 @@ class ServiceSpec:
else:
return
self._spec["extraPodSpec"]["mainContainer"]["args"] = [" ".join(parts)]
# Store args as a list of separate strings for proper command-line parsing
# WRONG: [" ".join(parts)] creates ["--model Qwen/Qwen3-0.6B"] (single string)
# RIGHT: parts creates ["--model", "Qwen/Qwen3-0.6B"] (separate strings)
self._spec["extraPodSpec"]["mainContainer"]["args"] = parts
# ----- GPUs -----
@property
......@@ -158,7 +161,12 @@ class ServiceSpec:
# Add new argument
parts.extend(["--tensor-parallel-size", str(value)])
self._spec["extraPodSpec"]["mainContainer"]["args"] = [" ".join(parts)]
# Store args as a list of separate strings for proper command-line parsing
# When TP > 1, this setter is called and adds --tensor-parallel-size to args.
# WRONG: [" ".join(parts)] would create ["--model Qwen/Qwen3-0.6B --tensor-parallel-size 2"]
# causing argparse to fail with "IndexError: list index out of range"
# RIGHT: parts creates ["--model", "Qwen/Qwen3-0.6B", "--tensor-parallel-size", "2"]
self._spec["extraPodSpec"]["mainContainer"]["args"] = parts
# Auto-adjust GPU count to match tensor parallel size
self.gpus = value
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment