Unverified Commit 5652d670 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: support MQA + MoE (Qwen3 MoE) TEP/DEP in Planner Profiler (#4612)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
parent 81ad38ef
...@@ -135,19 +135,6 @@ async def run_profile(args): ...@@ -135,19 +135,6 @@ async def run_profile(args):
args.aic_backend = args.backend args.aic_backend = args.backend
try: try:
# Log MoE model support
if args.model_info.is_moe:
logger.info(
"MoE (Mixture of Experts) model profiling, sweeping TEP/DEP size for prefill and decode"
)
assert args.backend in [
"sglang"
], "MoE model support is only available for SGLang"
else:
logger.info(
"Dense model profiling, sweeping TP size for prefill and decode"
)
config_modifier = CONFIG_MODIFIERS[args.backend] config_modifier = CONFIG_MODIFIERS[args.backend]
with open(args.config, "r") as f: with open(args.config, "r") as f:
...@@ -162,11 +149,7 @@ async def run_profile(args): ...@@ -162,11 +149,7 @@ async def run_profile(args):
for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1) for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
] ]
if args.model_info.is_moe: logger.info(f"Profiling GPU counts: {profile_num_gpus}")
logger.info(f"Profiling MoE GPU counts (TEP/DEP): {profile_num_gpus}")
else:
logger.info(f"Profiling dense model GPU counts (TP): {profile_num_gpus}")
os.makedirs(args.output_dir, exist_ok=True) os.makedirs(args.output_dir, exist_ok=True)
model_name = config_modifier.get_model_name(config) model_name = config_modifier.get_model_name(config)
...@@ -722,11 +705,10 @@ async def run_profile(args): ...@@ -722,11 +705,10 @@ async def run_profile(args):
config = generate_dgd_config_with_planner( config = generate_dgd_config_with_planner(
config_path=args.config, config_path=args.config,
config_modifier=config_modifier, config_modifier=config_modifier,
best_prefill_gpus=best_prefill_gpus,
best_decode_gpus=best_decode_gpus,
output_dir=args.output_dir, output_dir=args.output_dir,
args=args, args=args,
is_moe_model=args.model_info.is_moe, best_prefill_mapping=best_prefill_mapping,
best_decode_mapping=best_decode_mapping,
num_gpus_per_node=args.num_gpus_per_node, num_gpus_per_node=args.num_gpus_per_node,
) )
logger.debug(f"Final DGD config with planner: {config}") logger.debug(f"Final DGD config with planner: {config}")
......
...@@ -49,8 +49,8 @@ class PodSpec(BaseModel): ...@@ -49,8 +49,8 @@ class PodSpec(BaseModel):
class ServiceResources(BaseModel): class ServiceResources(BaseModel):
requests: Optional[dict[str, str]] = None requests: Optional[dict[str, str | dict]] = None
limits: Optional[dict[str, str]] = None limits: Optional[dict[str, str | dict]] = None
class Service(BaseModel): class Service(BaseModel):
...@@ -298,21 +298,37 @@ def setup_worker_service_resources( ...@@ -298,21 +298,37 @@ def setup_worker_service_resources(
if worker_service.resources is None: if worker_service.resources is None:
worker_service.resources = ServiceResources() worker_service.resources = ServiceResources()
# Ensure requests exists # Ensure limits exists
if worker_service.resources.requests is None: if worker_service.resources.limits is None:
worker_service.resources.requests = {} worker_service.resources.limits = {}
# Set GPU requests # Calculate GPU value
gpu_value = ( gpu_value = (
min(gpu_count, num_gpus_per_node) min(gpu_count, num_gpus_per_node)
if num_gpus_per_node is not None if num_gpus_per_node is not None
else gpu_count else gpu_count
) )
worker_service.resources.requests["gpu"] = str(gpu_value)
# Update limits if they exist def _update_resource_dict(resource_dict: dict[str, str], gpu_value: int):
if worker_service.resources.limits is not None: """Helper function to update gpu and custom rdma/ib fields in a resource dictionary.
worker_service.resources.limits["gpu"] = str(gpu_value)
Args:
resource_dict: The resource dictionary (either limits or requests) to update
gpu_value: The GPU value to set
"""
resource_dict["gpu"] = str(gpu_value)
# also update custom rdma/ib if it exists (some cluster requires this)
if "custom" in resource_dict:
if isinstance(resource_dict["custom"], dict):
if "rdma/ib" in resource_dict["custom"]:
resource_dict["custom"]["rdma/ib"] = str(gpu_value)
# Update limits
_update_resource_dict(worker_service.resources.limits, gpu_value)
# Also update requests if they exist
if worker_service.resources.requests is not None:
_update_resource_dict(worker_service.resources.requests, gpu_value)
def validate_and_get_worker_args(worker_service, backend): def validate_and_get_worker_args(worker_service, backend):
......
...@@ -7,7 +7,10 @@ from dataclasses import dataclass ...@@ -7,7 +7,10 @@ from dataclasses import dataclass
from enum import Enum from enum import Enum
from benchmarks.profiler.utils.defaults import PREFILL_MAX_NUM_TOKENS, EngineType from benchmarks.profiler.utils.defaults import PREFILL_MAX_NUM_TOKENS, EngineType
from benchmarks.profiler.utils.model_info import ModelInfo from benchmarks.profiler.utils.model_info import (
MOE_ADDITIONAL_TP_ARCHITECTURES,
ModelInfo,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
...@@ -83,6 +86,23 @@ class ParallelizationMapping: ...@@ -83,6 +86,23 @@ class ParallelizationMapping:
""" """
return self.dep if self.dep is not None else 1 # TP and TEP → 1 return self.dep if self.dep is not None else 1 # TP and TEP → 1
def get_num_gpus(self) -> int:
"""
Get the total number of GPUs for this parallelization mapping.
Returns:
The total number of GPUs used
"""
if self.tp is not None:
return self.tp
if self.tep is not None:
return self.tep
if self.dep is not None:
return self.dep
raise ValueError(
"Invalid ParallelizationMapping: no parallelization strategy set"
)
def _check_divisibility( def _check_divisibility(
value: int | None, value: int | None,
...@@ -166,16 +186,12 @@ def get_candidate_parallel_mappings( ...@@ -166,16 +186,12 @@ def get_candidate_parallel_mappings(
candidates: list[ParallelizationMapping] = [] candidates: list[ParallelizationMapping] = []
if is_moe: if is_moe:
if phase == EngineType.PREFILL:
candidates = [ candidates = [
ParallelizationMapping(tep=num_gpus), ParallelizationMapping(tep=num_gpus),
ParallelizationMapping(dep=num_gpus), ParallelizationMapping(dep=num_gpus),
] ]
elif phase == EngineType.DECODE: if model_info.architecture in MOE_ADDITIONAL_TP_ARCHITECTURES:
candidates = [ candidates.append(ParallelizationMapping(tp=num_gpus))
ParallelizationMapping(dep=num_gpus),
ParallelizationMapping(tep=num_gpus),
]
else: else:
candidates = [ParallelizationMapping(tp=num_gpus)] candidates = [ParallelizationMapping(tp=num_gpus)]
......
...@@ -205,6 +205,18 @@ class SGLangConfigModifier: ...@@ -205,6 +205,18 @@ class SGLangConfigModifier:
# Set --tp argument # Set --tp argument
args = set_argument_value(args, "--tp", str(tp_size)) args = set_argument_value(args, "--tp", str(tp_size))
args = remove_valued_arguments(args, "--tp-size")
args = remove_valued_arguments(args, "--tensor-parallel-size")
# Remove --ep if present
args = remove_valued_arguments(args, "--ep")
args = remove_valued_arguments(args, "--ep-size")
args = remove_valued_arguments(args, "--expert-parallel-size")
# remove --dp if present
args = remove_valued_arguments(args, "--dp")
args = remove_valued_arguments(args, "--dp-size")
args = remove_valued_arguments(args, "--data-parallel-size")
worker_service.extraPodSpec.mainContainer.args = args worker_service.extraPodSpec.mainContainer.args = args
return cfg.model_dump() return cfg.model_dump()
...@@ -230,12 +242,18 @@ class SGLangConfigModifier: ...@@ -230,12 +242,18 @@ class SGLangConfigModifier:
# 1. Set --tp=tep_size, if not present add it # 1. Set --tp=tep_size, if not present add it
args = set_argument_value(args, "--tp", str(tep_size)) args = set_argument_value(args, "--tp", str(tep_size))
args = remove_valued_arguments(args, "--tp-size")
args = remove_valued_arguments(args, "--tensor-parallel-size")
# 2. Set --ep-size=tep_size, if not present add it # 2. Set --ep=tep_size, if not present add it
args = set_argument_value(args, "--ep-size", str(tep_size)) args = set_argument_value(args, "--ep", str(tep_size))
args = remove_valued_arguments(args, "--ep-size")
args = remove_valued_arguments(args, "--expert-parallel-size")
# 3. Remove --dp if present # 3. Remove --dp if present
args = remove_valued_arguments(args, "--dp") args = remove_valued_arguments(args, "--dp")
args = remove_valued_arguments(args, "--dp-size")
args = remove_valued_arguments(args, "--data-parallel-size")
# 4. Remove --enable-dp-attention if present # 4. Remove --enable-dp-attention if present
if "--enable-dp-attention" in args: if "--enable-dp-attention" in args:
...@@ -265,16 +283,21 @@ class SGLangConfigModifier: ...@@ -265,16 +283,21 @@ class SGLangConfigModifier:
# 1. Set --tp=dep_size # 1. Set --tp=dep_size
args = set_argument_value(args, "--tp", str(dep_size)) args = set_argument_value(args, "--tp", str(dep_size))
args = remove_valued_arguments(args, "--tp-size")
args = remove_valued_arguments(args, "--tensor-parallel-size")
# 2. Set --dp=dep_size (data parallelism across experts) # 2. Set --dp=dep_size (data parallelism across experts)
args = set_argument_value(args, "--dp", str(dep_size)) args = set_argument_value(args, "--dp", str(dep_size))
args = remove_valued_arguments(args, "--dp-size")
args = remove_valued_arguments(args, "--data-parallel-size")
# 3. Enable --enable-dp-attention # 3. Enable --enable-dp-attention
if "--enable-dp-attention" not in args:
args = append_argument(args, "--enable-dp-attention") args = append_argument(args, "--enable-dp-attention")
# 4. Set --ep-size=dep_size (expert parallelism size) # 4. Set --ep=dep_size (expert parallelism size)
args = set_argument_value(args, "--ep-size", str(dep_size)) args = set_argument_value(args, "--ep", str(dep_size))
args = remove_valued_arguments(args, "--ep-size")
args = remove_valued_arguments(args, "--expert-parallel-size")
worker_service.extraPodSpec.mainContainer.args = args worker_service.extraPodSpec.mainContainer.args = args
return cfg.model_dump() return cfg.model_dump()
......
...@@ -20,6 +20,9 @@ import numpy as np ...@@ -20,6 +20,9 @@ import numpy as np
import yaml import yaml
from benchmarks.profiler.utils.config import Config, DgdPlannerServiceConfig from benchmarks.profiler.utils.config import Config, DgdPlannerServiceConfig
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
ParallelizationMapping,
)
from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace
from dynamo.common.utils.paths import get_workspace_dir from dynamo.common.utils.paths import get_workspace_dir
from dynamo.planner.defaults import SubComponentType from dynamo.planner.defaults import SubComponentType
...@@ -28,11 +31,10 @@ from dynamo.planner.defaults import SubComponentType ...@@ -28,11 +31,10 @@ from dynamo.planner.defaults import SubComponentType
def generate_dgd_config_with_planner( def generate_dgd_config_with_planner(
config_path: str, config_path: str,
config_modifier, config_modifier,
best_prefill_gpus: int,
best_decode_gpus: int,
output_dir: str, output_dir: str,
args, args,
is_moe_model: bool = False, best_prefill_mapping: ParallelizationMapping,
best_decode_mapping: ParallelizationMapping,
num_gpus_per_node: int = 8, num_gpus_per_node: int = 8,
): ):
"""Generate DGD config with planner based on profiling results. """Generate DGD config with planner based on profiling results.
...@@ -40,12 +42,11 @@ def generate_dgd_config_with_planner( ...@@ -40,12 +42,11 @@ def generate_dgd_config_with_planner(
Args: Args:
config_path: Path to the YAML config file config_path: Path to the YAML config file
config_modifier: Config modifier instance (e.g., SGLangConfigModifier) config_modifier: Config modifier instance (e.g., SGLangConfigModifier)
best_prefill_gpus: Number of GPUs for prefill engine
best_decode_gpus: Number of GPUs for decode engine
output_dir: Output directory for profile results output_dir: Output directory for profile results
args: Parsed arguments namespace from profile_sla args: Parsed arguments namespace from profile_sla
is_moe_model: Whether this is an MoE model best_prefill_mapping: Parallel mapping for prefill (TP/TEP/DEP)
num_gpus_per_node: Number of GPUs per node (for MoE models) best_decode_mapping: Parallel mapping for decode (TP/TEP/DEP)
num_gpus_per_node: Number of GPUs per node (for TEP/DEP models)
Returns: Returns:
list[dict] | dict: If a ConfigMap is generated for planner data, returns a list list[dict] | dict: If a ConfigMap is generated for planner data, returns a list
...@@ -61,28 +62,52 @@ def generate_dgd_config_with_planner( ...@@ -61,28 +62,52 @@ def generate_dgd_config_with_planner(
if args.dgd_image: if args.dgd_image:
config = config_modifier.update_image(config, args.dgd_image) config = config_modifier.update_image(config, args.dgd_image)
if not is_moe_model: # Apply prefill parallelization based on the actual mapping used in profiling
# dense model, use TP for both prefill and decode if best_prefill_mapping.tp is not None:
# Dense model or TP for prefill
config = config_modifier.set_config_tp_size( config = config_modifier.set_config_tp_size(
config, best_prefill_gpus, SubComponentType.PREFILL config, best_prefill_mapping.tp, SubComponentType.PREFILL
) )
elif best_prefill_mapping.tep is not None:
# MoE model with TEP for prefill
config = config_modifier.set_config_tep_size(
config,
best_prefill_mapping.tep,
num_gpus_per_node,
SubComponentType.PREFILL,
)
elif best_prefill_mapping.dep is not None:
# MoE model with DEP for prefill
config = config_modifier.set_config_dep_size(
config,
best_prefill_mapping.dep,
num_gpus_per_node,
SubComponentType.PREFILL,
)
# Apply decode parallelization based on the actual mapping used in profiling
if best_decode_mapping.tp is not None:
# Dense model or TP for decode
config = config_modifier.set_config_tp_size( config = config_modifier.set_config_tp_size(
config, best_decode_gpus, SubComponentType.DECODE config, best_decode_mapping.tp, SubComponentType.DECODE
) )
else: elif best_decode_mapping.tep is not None:
# MoE model, use TEP for prefill and DEP for decode # MoE model with TEP for decode
config = config_modifier.set_config_tep_size( config = config_modifier.set_config_tep_size(
config, config,
best_prefill_gpus, best_decode_mapping.tep,
num_gpus_per_node, num_gpus_per_node,
SubComponentType.PREFILL, SubComponentType.DECODE,
) )
elif best_decode_mapping.dep is not None:
# MoE model with DEP for decode
config = config_modifier.set_config_dep_size( config = config_modifier.set_config_dep_size(
config, config,
best_decode_gpus, best_decode_mapping.dep,
num_gpus_per_node, num_gpus_per_node,
SubComponentType.DECODE, SubComponentType.DECODE,
) )
config = Config.model_validate(config) config = Config.model_validate(config)
# add the planner service # add the planner service
...@@ -121,8 +146,8 @@ def generate_dgd_config_with_planner( ...@@ -121,8 +146,8 @@ def generate_dgd_config_with_planner(
planner_args.extend( planner_args.extend(
[ [
f"--namespace={frontend_namespace}", f"--namespace={frontend_namespace}",
f"--prefill-engine-num-gpu={best_prefill_gpus}", f"--prefill-engine-num-gpu={best_prefill_mapping.get_num_gpus()}",
f"--decode-engine-num-gpu={best_decode_gpus}", f"--decode-engine-num-gpu={best_decode_mapping.get_num_gpus()}",
f"--profile-results-dir={cm_mount_path}", f"--profile-results-dir={cm_mount_path}",
] ]
) )
......
...@@ -28,8 +28,16 @@ CONTEXT_LENGTH_ATTRS = [ ...@@ -28,8 +28,16 @@ CONTEXT_LENGTH_ATTRS = [
"sliding_window", # Mistral with sliding window attention "sliding_window", # Mistral with sliding window attention
] ]
# only for MLA + MoE models, treat other MoE models as dense models # supported MoE architectures
MOE_ARCHITECTURES = {"DeepseekV3ForCausalLM", "DeepseekV32ForCausalLM"} MOE_ARCHITECTURES = {
"DeepseekV3ForCausalLM",
"DeepseekV32ForCausalLM", # MLA + MoE
"Qwen3MoeForCausalLM", # GQA + MoE
}
# MoE architectures that sweeps TP additionally to TEP/DEP
MOE_ADDITIONAL_TP_ARCHITECTURES = {
"Qwen3MoeForCausalLM", # GQA + MoE
}
def get_local_model_weight_size( def get_local_model_weight_size(
......
...@@ -21,15 +21,20 @@ This document covers: ...@@ -21,15 +21,20 @@ This document covers:
## Support Matrix ## Support Matrix
| Backend | Dense Models (P:TP, D:TP) | MoE Models (P:TEP, D:DEP) | | Backend | Dense Models | MoE Models |
|---------|-------------|------------| |---------|-------------|------------|
| vLLM | ✅ | 🚧 | | vLLM | ✅ | 🚧 |
| SGLang | ✅ | ✅ | | SGLang | ✅ | ✅ |
| TensorRT-LLM | ✅ | 🚧 | | TensorRT-LLM | ✅ | 🚧 |
Specifically, the profiler sweeps over the following parallelization mapping for prefill and decode:
| Model Architecture | Prefill Parallelization Mapping | Decode Parallelization Mapping |
|---------|-------------|------------|
| MLA+MoE (DeepseekV3ForCausalLM, DeepseekV32ForCausalLM) | TEP, DEP | TEP, DEP |
| GQA+MoE (Qwen3MoeForCausalLM) | TP, TEP, DEP | TP, TEP, DEP |
| Other Models | TP | TP |
> [!NOTE] > [!NOTE]
> - We only support multi-node engines for MoE models.
> - For MoE models, we currently only support deepseek-style MLA+MoE models. For other MoE models like GQA+MoE, please use the dense mode (sweep over TP sizes) instead.
> - Exact model x parallelization mapping support is dependent on the backend. The profiler does not guarantee that the recommended P/D engine configuration is supported and bug-free by the backend. > - Exact model x parallelization mapping support is dependent on the backend. The profiler does not guarantee that the recommended P/D engine configuration is supported and bug-free by the backend.
## Using DGDR for Profiling (Recommended) ## Using DGDR for Profiling (Recommended)
...@@ -269,7 +274,7 @@ profilingConfig: ...@@ -269,7 +274,7 @@ profilingConfig:
**When to use:** **When to use:**
- **min_num_gpus_per_engine**: Skip small TP sizes if your model is large - **min_num_gpus_per_engine**: Skip small TP sizes if your model is large
- **max_num_gpus_per_engine**: Limit search space or work around constraints (e.g., [AIC attention heads](#ai-configurator-attention-head-constraint-error)) - **max_num_gpus_per_engine**: Limit search space or work around constraints (e.g., [AIC attention heads](#ai-configurator-attention-head-constraint-error))
- **num_gpus_per_node**: Required for MoE models with TEP/DEP sizing - **num_gpus_per_node**: Determine the upper bound of number of GPUs per node for dense models and configure Grove for multi-node MoE engines.
- **gpu_type**: Informational, auto-detected by controller - **gpu_type**: Informational, auto-detected by controller
> [!TIP] > [!TIP]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment