Unverified commit d94c2ea2, authored by Hongkuan Zhou, committed by GitHub
Browse files

fix: honor per-node GPU topology in DGD builder (DYN-2544) (#7868)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent eac5e463
...@@ -40,7 +40,7 @@ classifiers = [ ...@@ -40,7 +40,7 @@ classifiers = [
] ]
dependencies = [ dependencies = [
"aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@5d419f99d60fdae0d3911cba06a9b571f3b2965c", "aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@1fe8084c817194827706947cc175107ee9e6ea25",
"aiperf==0.6.0", "aiperf==0.6.0",
"matplotlib", "matplotlib",
"networkx", "networkx",
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Case 12: Thorough MoE sweep for Qwen3-30B-A3B on SGLang.
# Regression test for DYN-2544: small MoE models should not generate
# candidate DGDs exceeding cluster GPU capacity (totalGpus=16).
model: "Qwen/Qwen3-30B-A3B"
backend: sglang
image: "nvcr.io/nvidia/ai-dynamo/sglang-runtime:1.0.0"
hardware:
gpuSku: b200_sxm
totalGpus: 16
numGpusPerNode: 8
workload:
isl: 3000
osl: 150
sla:
ttft: 200.0
itl: 20.0
searchStrategy: thorough
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# Case 4: AIC unsupported model, rapid, without planner # Case 4: AIC unsupported model, rapid, without planner
# l40s + vllm has no disagg support in AIC # l40s + vllm has no disagg support in AIC
model: "Qwen/Qwen3-32B" model: "Qwen/Qwen2.5-0.5B"
backend: vllm backend: vllm
image: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:latest" image: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:latest"
hardware: hardware:
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# Case 5: AIC unsupported model, rapid, with planner # Case 5: AIC unsupported model, rapid, with planner
# Should raise error if throughput scaling is enabled (AIC unsupported). # Should raise error if throughput scaling is enabled (AIC unsupported).
# This config uses load-based scaling only to test the non-error path. # This config uses load-based scaling only to test the non-error path.
model: "Qwen/Qwen3-32B" model: "Qwen/Qwen2.5-0.5B"
backend: vllm backend: vllm
image: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:latest" image: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:latest"
hardware: hardware:
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# Case 5b: AIC unsupported model, rapid, with planner + throughput scaling # Case 5b: AIC unsupported model, rapid, with planner + throughput scaling
# This should FAIL with a ValueError because throughput-based planner # This should FAIL with a ValueError because throughput-based planner
# requires AIC support. # requires AIC support.
model: "meta-llama/Llama-3.1-8B" model: "Qwen/Qwen2.5-0.5B"
backend: vllm backend: vllm
image: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:latest" image: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:latest"
hardware: hardware:
......
...@@ -555,3 +555,46 @@ class TestThoroughMockedOverrides: ...@@ -555,3 +555,46 @@ class TestThoroughMockedOverrides:
secret_names = [s["name"] for s in secrets] secret_names = [s["name"] for s in secrets]
assert "my-registry-secret" in secret_names assert "my-registry-secret" in secret_names
assert "nvcr-pull-secret" in secret_names assert "nvcr-pull-secret" in secret_names
class TestThoroughMoEGpuBudget:
    """DYN-2544: MoE thorough candidates must not exceed cluster GPU budget."""

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_thorough_moe_qwen3_30b_candidates_within_budget(self, tmp_path):
        """Case 12: Qwen3-30B-A3B on 16 GPUs (2x8) should not produce >8 GPU candidates.

        The model is small enough to fit on a single 8-GPU node, so wideEP
        should be disabled and all candidate DGDs should request at most
        numGpusPerNode GPUs per worker.
        """
        dgdr = _load_dgdr(CONFIGS_DIR / "12_thorough_moe_qwen3_30b_a3b_sglang.yaml")
        ops = _make_ops(tmp_path)
        _run_mocked_thorough(dgdr, ops, "sglang")

        # Both the final config and the per-candidate directories are read
        # from the same results directory, so resolve it once.
        results_dir = tmp_path / "profiling_results"
        output = results_dir / "final_config.yaml"
        assert output.exists(), "final_config.yaml should exist"
        config = yaml.safe_load(output.read_text())
        assert config and "spec" in config

        num_gpus_per_node = dgdr.hardware.numGpusPerNode
        # NOTE(review): if no sub-directory contains a config.yaml this loop
        # asserts nothing -- confirm the mocked run always emits candidates.
        for candidate_dir in results_dir.iterdir():
            if not candidate_dir.is_dir():
                continue
            cfg_file = candidate_dir / "config.yaml"
            if not cfg_file.exists():
                continue
            candidate = yaml.safe_load(cfg_file.read_text())
            if not candidate or "spec" not in candidate:
                continue

            # dict.get(key, default) only falls back when the key is absent;
            # a key that is present with a null value yields None.  Normalize
            # with ``or`` at every level so a null entry is treated as
            # "no GPUs requested" instead of raising AttributeError/TypeError.
            services = candidate["spec"].get("services") or {}
            for svc_name, svc in services.items():
                # Frontend and Planner are skipped; the per-node limit is
                # only checked on the remaining services.
                if svc_name in ("Frontend", "Planner"):
                    continue
                limits = (svc.get("resources") or {}).get("limits") or {}
                gpu_limit = int(limits.get("gpu") or 0)
                assert gpu_limit <= num_gpus_per_node, (
                    f"Candidate {candidate_dir.name} service {svc_name} requests "
                    f"{gpu_limit} GPUs but numGpusPerNode is {num_gpus_per_node}"
                )
...@@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, patch ...@@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, patch
import pytest import pytest
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import ( from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
PickedParallelConfig, PickedParallelConfig,
) )
...@@ -29,6 +30,31 @@ pytestmark = [ ...@@ -29,6 +30,31 @@ pytestmark = [
] ]
def test_build_dgd_config_shapes_multinode_worker_resources() -> None:
    """A 16-GPU decode worker on 8-GPU nodes gets an 8-GPU limit and nodeCount 2."""
    build_kwargs = {
        "mode": "disagg",
        "model_name": "Qwen/Qwen3-30B-A3B",
        "image": "nvcr.io/nvidia/ai-dynamo/sglang-runtime:1.0.0",
        "prefill_cli_args": ["--max-running-requests", "1"],
        "prefill_replicas": 1,
        "prefill_gpus": 1,
        "decode_cli_args": ["--data-parallel-size", "16"],
        "decode_replicas": 1,
        "decode_gpus": 16,
        "num_gpus_per_node": 8,
    }
    dgd_config = CONFIG_MODIFIERS["sglang"].build_dgd_config(**build_kwargs)

    # Locate the decode worker by its subComponentType rather than by name.
    all_services = dgd_config["spec"]["services"]
    decode_service = next(
        candidate
        for candidate in all_services.values()
        if candidate.get("subComponentType") == "decode"
    )

    # The GPU limit is stored as a string in the generated config.
    gpu_limit = decode_service["resources"]["limits"]["gpu"]
    assert gpu_limit == "8"
    multinode = decode_service["multinode"]
    assert multinode == {"nodeCount": 2}
def test_apply_dgd_overrides_strips_envelope() -> None: def test_apply_dgd_overrides_strips_envelope() -> None:
"""Envelope fields are stripped; nested payload keys are deep-merged.""" """Envelope fields are stripped; nested payload keys are deep-merged."""
dgd_config = { dgd_config = {
......
...@@ -24,11 +24,11 @@ from dynamo.profiler.utils.config import ( ...@@ -24,11 +24,11 @@ from dynamo.profiler.utils.config import (
Config, Config,
Container, Container,
PodSpec, PodSpec,
ServiceResources,
break_arguments, break_arguments,
get_service_name_by_type, get_service_name_by_type,
sanitize_cli_args, sanitize_cli_args,
set_argument_value, set_argument_value,
setup_worker_service_resources,
update_image, update_image,
) )
from dynamo.profiler.utils.defaults import EngineType from dynamo.profiler.utils.defaults import EngineType
...@@ -124,6 +124,29 @@ class ConfigModifierProtocol(Protocol): ...@@ -124,6 +124,29 @@ class ConfigModifierProtocol(Protocol):
) -> dict: ) -> dict:
... ...
@classmethod
def build_dgd_config(
    cls,
    mode: str,
    model_name: str,
    image: str,
    prefill_cli_args: list[str] | None = None,
    prefill_replicas: int = 1,
    prefill_gpus: int = 1,
    decode_cli_args: list[str] | None = None,
    decode_replicas: int = 1,
    decode_gpus: int = 1,
    agg_cli_args: list[str] | None = None,
    agg_replicas: int = 1,
    agg_gpus: int = 1,
    namespace: str | None = None,
    model_path: str | None = None,
    pvc_name: str | None = None,
    pvc_mount_path: str | None = None,
    num_gpus_per_node: int | None = None,
) -> dict:
    """Build a complete DynamoGraphDeployment (DGD) config dict.

    Protocol stub: this only declares the signature that config modifiers
    are expected to provide; it has no behavior of its own.

    Args:
        mode: Deployment mode (e.g. "disagg").
        model_name: Name of the model to deploy.
        image: Container image to use.
        prefill_cli_args: CLI args for the prefill worker.
        prefill_replicas: Replica count for the prefill worker.
        prefill_gpus: GPU count for the prefill worker.
        decode_cli_args: CLI args for the decode worker.
        decode_replicas: Replica count for the decode worker.
        decode_gpus: GPU count for the decode worker.
        agg_cli_args: CLI args for the aggregated worker.
        agg_replicas: Replica count for the aggregated worker.
        agg_gpus: GPU count for the aggregated worker.
        namespace: Optional namespace (semantics defined by implementers).
        model_path: Model path if different from model_name (e.g. PVC path).
        pvc_name: PVC claim name for model cache (optional).
        pvc_mount_path: PVC mount path (optional).
        num_gpus_per_node: GPUs per physical node. When provided, worker
            GPU limits are capped per node and multinode.nodeCount is set
            for workers that span multiple nodes.

    Returns:
        Complete DGD config dict ready for YAML serialization.
    """
    ...
class BaseConfigModifier: class BaseConfigModifier:
""" """
...@@ -445,6 +468,7 @@ class BaseConfigModifier: ...@@ -445,6 +468,7 @@ class BaseConfigModifier:
model_path: str | None = None, model_path: str | None = None,
pvc_name: str | None = None, pvc_name: str | None = None,
pvc_mount_path: str | None = None, pvc_mount_path: str | None = None,
num_gpus_per_node: int | None = None,
) -> dict: ) -> dict:
""" """
Build a complete DynamoGraphDeployment config by loading a base YAML Build a complete DynamoGraphDeployment config by loading a base YAML
...@@ -471,6 +495,9 @@ class BaseConfigModifier: ...@@ -471,6 +495,9 @@ class BaseConfigModifier:
model_path: Model path if different from model_name (e.g. PVC path) model_path: Model path if different from model_name (e.g. PVC path)
pvc_name: PVC claim name for model cache (optional) pvc_name: PVC claim name for model cache (optional)
pvc_mount_path: PVC mount path (optional) pvc_mount_path: PVC mount path (optional)
num_gpus_per_node: GPUs per physical node. When provided, worker
GPU limits are capped per node and multinode.nodeCount is set
for workers that span multiple nodes.
Returns: Returns:
Complete DGD config dict ready for YAML serialization Complete DGD config dict ready for YAML serialization
...@@ -502,6 +529,7 @@ class BaseConfigModifier: ...@@ -502,6 +529,7 @@ class BaseConfigModifier:
decode_cli_args=decode_cli_args or [], decode_cli_args=decode_cli_args or [],
decode_replicas=decode_replicas, decode_replicas=decode_replicas,
decode_gpus=decode_gpus, decode_gpus=decode_gpus,
num_gpus_per_node=num_gpus_per_node,
) )
else: else:
cls._apply_agg_worker( cls._apply_agg_worker(
...@@ -509,6 +537,7 @@ class BaseConfigModifier: ...@@ -509,6 +537,7 @@ class BaseConfigModifier:
agg_cli_args=agg_cli_args or [], agg_cli_args=agg_cli_args or [],
agg_replicas=agg_replicas, agg_replicas=agg_replicas,
agg_gpus=agg_gpus, agg_gpus=agg_gpus,
num_gpus_per_node=num_gpus_per_node,
) )
# Update model (handles worker args + frontend patching) # Update model (handles worker args + frontend patching)
...@@ -558,15 +587,11 @@ class BaseConfigModifier: ...@@ -558,15 +587,11 @@ class BaseConfigModifier:
cli_args: list[str], cli_args: list[str],
replicas: int, replicas: int,
gpus: int, gpus: int,
num_gpus_per_node: int | None = None,
) -> None: ) -> None:
"""Apply CLI args, replicas, and GPU resources to a single worker service.""" """Apply CLI args, replicas, and GPU resources to a single worker service."""
service.replicas = replicas service.replicas = replicas
setup_worker_service_resources(service, gpus, num_gpus_per_node)
if service.resources is None:
service.resources = ServiceResources()
if service.resources.limits is None:
service.resources.limits = {}
service.resources.limits["gpu"] = str(gpus)
if service.extraPodSpec and service.extraPodSpec.mainContainer: if service.extraPodSpec and service.extraPodSpec.mainContainer:
service.extraPodSpec.mainContainer.args = sanitize_cli_args(list(cli_args)) service.extraPodSpec.mainContainer.args = sanitize_cli_args(list(cli_args))
...@@ -581,6 +606,7 @@ class BaseConfigModifier: ...@@ -581,6 +606,7 @@ class BaseConfigModifier:
decode_cli_args: list[str], decode_cli_args: list[str],
decode_replicas: int, decode_replicas: int,
decode_gpus: int, decode_gpus: int,
num_gpus_per_node: int | None = None,
) -> None: ) -> None:
"""Apply CLI args, replicas, and GPU resources to disagg worker services.""" """Apply CLI args, replicas, and GPU resources to disagg worker services."""
for sct, cli_args, replicas, gpus in [ for sct, cli_args, replicas, gpus in [
...@@ -601,7 +627,11 @@ class BaseConfigModifier: ...@@ -601,7 +627,11 @@ class BaseConfigModifier:
) )
continue continue
cls._apply_worker_config( cls._apply_worker_config(
cfg.spec.services[svc_name], cli_args, replicas, gpus cfg.spec.services[svc_name],
cli_args,
replicas,
gpus,
num_gpus_per_node=num_gpus_per_node,
) )
@classmethod @classmethod
...@@ -611,6 +641,7 @@ class BaseConfigModifier: ...@@ -611,6 +641,7 @@ class BaseConfigModifier:
agg_cli_args: list[str], agg_cli_args: list[str],
agg_replicas: int, agg_replicas: int,
agg_gpus: int, agg_gpus: int,
num_gpus_per_node: int | None = None,
) -> None: ) -> None:
"""Apply CLI args, replicas, and GPU resources to the agg worker service. """Apply CLI args, replicas, and GPU resources to the agg worker service.
...@@ -630,7 +661,11 @@ class BaseConfigModifier: ...@@ -630,7 +661,11 @@ class BaseConfigModifier:
logger.warning("Could not find worker service for agg mode") logger.warning("Could not find worker service for agg mode")
return return
cls._apply_worker_config( cls._apply_worker_config(
cfg.spec.services[svc_name], agg_cli_args, agg_replicas, agg_gpus cfg.spec.services[svc_name],
agg_cli_args,
agg_replicas,
agg_gpus,
num_gpus_per_node=num_gpus_per_node,
) )
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# Dependencies required by the planner, profiler, and global_planner services. # Dependencies required by the planner, profiler, and global_planner services.
aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@5d419f99d60fdae0d3911cba06a9b571f3b2965c aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@1fe8084c817194827706947cc175107ee9e6ea25
aiofiles<=25.1.0 aiofiles<=25.1.0
filterpy==1.4.5 filterpy==1.4.5
kubernetes==32.0.1 kubernetes==32.0.1
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment