Unverified Commit 7750ed1a authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: add parallelization filters (#4144)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
Signed-off-by: default avatarHongkuan Zhou <tedzhouhk@gmail.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent e1b0edb9
......@@ -15,10 +15,6 @@ spec:
profilingConfig:
profilerImage: "nvcr.io/nvidia/ai-dynamo/sglang-runtime:0.6.1"
config:
# Engine configuration
engine:
is_moe_model: true # Enable MoE model support (uses TEP/DEP instead of TP)
# Sweep/profiling configuration
sweep:
# Standard online profiling (not using AI Configurator)
......
......@@ -5,6 +5,7 @@ import argparse
import logging
import os
from benchmarks.profiler.utils.defaults import EngineType
from benchmarks.profiler.utils.profile_decode import profile_decode
from benchmarks.profiler.utils.profile_prefill import profile_prefill
......@@ -91,7 +92,11 @@ if __name__ == "__main__":
os.makedirs(args.work_dir, exist_ok=True)
if args.tokenizer_path == "":
args.tokenizer_path = args.model_name
if args.mode == "prefill":
# Convert string mode to EngineType
mode = EngineType(args.mode)
if mode == EngineType.PREFILL:
profile_prefill(
args.work_dir,
args.model_name,
......@@ -101,7 +106,7 @@ if __name__ == "__main__":
args.max_context_length,
args.interpolation_granularity,
)
elif args.mode == "decode":
elif mode == EngineType.DECODE:
assert args.max_kv_tokens > 0, "max_kv_tokens must be provided for decode"
profile_decode(
args.work_dir,
......@@ -115,4 +120,4 @@ if __name__ == "__main__":
args.attention_dp_size,
)
else:
raise ValueError(f"Invalid mode: {args.mode}")
raise ValueError(f"Invalid mode: {mode}")
This diff is collapsed.
......@@ -17,7 +17,7 @@ import json
import logging
import math
import shlex
from typing import Literal, Optional, Protocol
from typing import Optional
from pydantic import BaseModel
......@@ -378,69 +378,3 @@ def update_image(config: dict, image: str) -> dict:
logger.debug(f"Updated image for {service_name} to {image}")
return cfg.model_dump()
class ConfigModifierProtocol(Protocol):
@classmethod
def convert_config(
cls,
config: dict,
target: Literal["prefill", "decode"],
is_moe_model: bool = False,
) -> dict:
...
@classmethod
def set_config_tp_size(
cls,
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
@classmethod
def set_config_tep_size(
cls,
config: dict,
tep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
@classmethod
def set_config_dep_size(
cls,
config: dict,
dep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
@classmethod
def get_model_name(cls, config: dict) -> str:
...
@classmethod
def get_port(cls, config: dict) -> int:
...
@classmethod
def get_kv_cache_size_from_dynamo_log(
cls, dynamo_log_fn: str, attention_dp_size: int = 1
) -> int:
...
@classmethod
def load_default_config(cls) -> dict:
...
@classmethod
def update_model(cls, config: dict, model_name: str) -> dict:
...
@classmethod
def update_image(cls, config: dict, image: str) -> dict:
...
......@@ -16,7 +16,9 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from benchmarks.profiler.utils.config import ConfigModifierProtocol
from benchmarks.profiler.utils.config_modifiers.protocol import (
ConfigModifierProtocol,
)
from benchmarks.profiler.utils.config_modifiers.sglang import SGLangConfigModifier
from benchmarks.profiler.utils.config_modifiers.trtllm import TrtllmConfigModifier
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import copy
import logging
from dataclasses import dataclass
from enum import Enum
from benchmarks.profiler.utils.defaults import EngineType
from benchmarks.profiler.utils.model_info import ModelInfo
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class ParallelizationStrategy(Enum):
"""Enum for parallelization strategy types."""
TP = "TP"
TEP = "TEP"
DEP = "DEP"
@dataclass(frozen=True)
class ParallelizationMapping:
"""
Represents parallelization mapping of configs
"""
tp: int | None = None
tep: int | None = None
dep: int | None = None
def label(self) -> str:
if self.tp is not None:
return f"{ParallelizationStrategy.TP.value}={self.tp}"
if self.tep is not None:
return f"{ParallelizationStrategy.TEP.value}={self.tep}"
if self.dep is not None:
return f"{ParallelizationStrategy.DEP.value}={self.dep}"
return "default"
def get_tp_size(self) -> int:
"""
Get the effective TP size for KV heads splitting.
Both TP and TEP split KV heads, DEP doesn't (returns 1).
"""
if self.tp is not None:
return self.tp
if self.tep is not None:
return self.tep
return 1 # DEP has TP split of 1
def get_expert_split(self) -> int:
"""
Get the effective expert split size.
Both TEP and DEP split experts, TP doesn't (returns 1).
"""
if self.tep is not None:
return self.tep
if self.dep is not None:
return self.dep
return 1 # TP has expert split of 1
def get_attn_dp_size(self, num_gpus: int) -> int:
"""
Get the attention data parallelism size.
DEP uses data parallelism for attention (returns num_gpus).
TP and TEP don't use data parallelism for attention (returns 1).
Args:
num_gpus: Total number of GPUs being used
Returns:
The attention data parallelism size
"""
if self.dep is not None:
return num_gpus
return 1 # TP and TEP have attention DP size of 1
def _check_divisibility(
value: int | None,
divisor: int,
value_name: str,
divisor_name: str,
mapping_label: str,
) -> bool:
"""
Check if value is divisible by divisor.
Returns True if valid (or value is None), False if invalid.
Args:
value: The value to check (e.g., num_kv_heads, num_experts)
divisor: The divisor to check against
value_name: Name of the value for error messages
divisor_name: Name of the divisor for error messages (e.g., "tp_size", "expert_split")
mapping_label: Label of the mapping for error messages
"""
if value is None:
logger.warning(
f"Skipping {value_name} divisibility check for {mapping_label}: {value_name} is unknown"
)
return True
if divisor > 1 and int(value) % divisor != 0:
logger.warning(
f"Invalid mapping {mapping_label}: {value_name}={value} not divisible by {divisor_name}={divisor}"
)
return False
return True
def _validate_intermediate_size(
mapping: ParallelizationMapping,
intermediate_size: int | None,
quant_block: int | None,
) -> bool:
"""
Validate intermediate size and quantization block for TP and TEP strategies.
Checks:
- intermediate_size % tp_size == 0
- (intermediate_size // tp_size) divides quant_block (if quant_block is known)
"""
tp_size = mapping.get_tp_size()
# Check basic divisibility
if not _check_divisibility(
intermediate_size, tp_size, "intermediate_size", "tp_size", mapping.label()
):
return False
# Additional check for quantization block constraint
if intermediate_size is not None and quant_block is not None and tp_size > 1:
per_shard = int(intermediate_size) // tp_size
if not _check_divisibility(
per_shard, quant_block, "per_shard", "quant_block", mapping.label()
):
return False
return True
def get_candidate_parallel_mappings(
num_gpus: int, model_info: ModelInfo, phase: str
) -> list[ParallelizationMapping]:
"""
Return a list of candidate parallelization mappings for a given GPU count and phase,
verified against model properties.
Verification rules:
- TP and TEP must divide num_kv_heads (if available)
- TEP and DEP must divide num_experts (if available)
"""
is_moe = bool(model_info.is_moe)
num_kv_heads = model_info.num_kv_heads
num_experts = model_info.num_experts
intermediate_size = model_info.intermediate_size
quant_block = model_info.quantization_block_size
candidates: list[ParallelizationMapping] = []
if is_moe:
if phase == EngineType.PREFILL:
candidates = [ParallelizationMapping(tep=num_gpus)]
elif phase == EngineType.DECODE:
candidates = [
ParallelizationMapping(dep=num_gpus),
ParallelizationMapping(tep=num_gpus),
]
else:
candidates = [ParallelizationMapping(tp=num_gpus)]
# Verify candidates against model constraints
verified: list[ParallelizationMapping] = []
for m in candidates:
# Check KV heads divisibility
if not _check_divisibility(
num_kv_heads, m.get_tp_size(), "num_kv_heads", "tp_size", m.label()
):
continue
# Check experts divisibility
if not _check_divisibility(
num_experts, m.get_expert_split(), "num_experts", "expert_split", m.label()
):
continue
# Check intermediate size and quantization block
if not _validate_intermediate_size(m, intermediate_size, quant_block):
continue
verified.append(m)
return verified
def apply_parallel_mapping_to_config(
base_config: dict,
mapping: ParallelizationMapping,
phase: str,
config_modifier,
num_gpus_per_node: int | None,
) -> dict:
cfg = copy.deepcopy(base_config)
if mapping.tp is not None:
cfg = config_modifier.set_config_tp_size(cfg, mapping.tp)
elif phase == EngineType.PREFILL and mapping.tep is not None:
cfg = config_modifier.set_config_tep_size(cfg, mapping.tep, num_gpus_per_node)
elif phase == EngineType.DECODE and mapping.dep is not None:
cfg = config_modifier.set_config_dep_size(cfg, mapping.dep, num_gpus_per_node)
else:
pass
return cfg
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Protocol
from benchmarks.profiler.utils.defaults import EngineType
from dynamo.planner.defaults import SubComponentType
class ConfigModifierProtocol(Protocol):
@classmethod
def convert_config(
cls,
config: dict,
target: EngineType,
is_moe_model: bool = False,
) -> dict:
...
@classmethod
def set_config_tp_size(
cls,
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
@classmethod
def set_config_tep_size(
cls,
config: dict,
tep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
@classmethod
def set_config_dep_size(
cls,
config: dict,
dep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
@classmethod
def get_model_name(cls, config: dict) -> str:
...
@classmethod
def get_port(cls, config: dict) -> int:
...
@classmethod
def get_kv_cache_size_from_dynamo_log(
cls, dynamo_log_fn: str, attention_dp_size: int = 1
) -> int:
...
@classmethod
def load_default_config(cls) -> dict:
...
@classmethod
def update_model(cls, config: dict, model_name: str) -> dict:
...
@classmethod
def update_image(cls, config: dict, image: str) -> dict:
...
......@@ -3,7 +3,6 @@
import logging
import re
from typing import Literal
import yaml
......@@ -22,6 +21,7 @@ from benchmarks.profiler.utils.config import (
from benchmarks.profiler.utils.defaults import (
DEFAULT_MODEL_NAME,
DYNAMO_RUN_DEFAULT_PORT,
EngineType,
)
from dynamo.planner.defaults import SubComponentType
......@@ -82,7 +82,7 @@ class SGLangConfigModifier:
def convert_config(
cls,
config: dict,
target: Literal["prefill", "decode"],
target: EngineType,
is_moe_model: bool = False,
) -> dict:
cfg = Config.model_validate(config)
......@@ -94,7 +94,7 @@ class SGLangConfigModifier:
if "Planner" in cfg.spec.services:
del cfg.spec.services["Planner"]
if target == "prefill":
if target == EngineType.PREFILL:
# Get service names by inferring from subComponentType first
prefill_service_name = get_service_name_by_type(
cfg, "sglang", SubComponentType.PREFILL
......@@ -131,7 +131,7 @@ class SGLangConfigModifier:
worker_service.extraPodSpec.mainContainer.args = args
elif target == "decode":
elif target == EngineType.DECODE:
# Get service names by inferring from subComponentType first
prefill_service_name = get_service_name_by_type(
cfg, "sglang", SubComponentType.PREFILL
......@@ -292,6 +292,12 @@ class SGLangConfigModifier:
return DEFAULT_MODEL_NAME
args = break_arguments(args)
# Check for --model-path first (primary argument for SGLang)
for i, arg in enumerate(args):
if arg == "--model-path" and i + 1 < len(args):
return args[i + 1]
# Fall back to --served-model-name if --model-path not found
for i, arg in enumerate(args):
if arg == "--served-model-name" and i + 1 < len(args):
return args[i + 1]
......
......@@ -4,7 +4,6 @@
import json
import logging
import re
from typing import Literal
import yaml
......@@ -24,6 +23,7 @@ from benchmarks.profiler.utils.config import (
from benchmarks.profiler.utils.defaults import (
DEFAULT_MODEL_NAME,
DYNAMO_RUN_DEFAULT_PORT,
EngineType,
)
from dynamo.planner.defaults import SubComponentType
......@@ -84,7 +84,7 @@ class TrtllmConfigModifier:
def convert_config(
cls,
config: dict,
target: Literal["prefill", "decode"],
target: EngineType,
is_moe_model: bool = False,
) -> dict:
if is_moe_model:
......@@ -101,7 +101,7 @@ class TrtllmConfigModifier:
if "Planner" in cfg.spec.services:
del cfg.spec.services["Planner"]
if target == "prefill":
if target == EngineType.PREFILL:
# Get service names by inferring from subComponentType first
prefill_service_name = get_service_name_by_type(
cfg, "trtllm", SubComponentType.PREFILL
......@@ -157,7 +157,7 @@ class TrtllmConfigModifier:
worker_service.extraPodSpec.mainContainer.args = args
elif target == "decode":
elif target == EngineType.DECODE:
# Get service names by inferring from subComponentType first
prefill_service_name = get_service_name_by_type(
cfg, "trtllm", SubComponentType.PREFILL
......
......@@ -2,7 +2,6 @@
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Literal
import yaml
......@@ -20,6 +19,7 @@ from benchmarks.profiler.utils.config import (
from benchmarks.profiler.utils.defaults import (
DEFAULT_MODEL_NAME,
DYNAMO_RUN_DEFAULT_PORT,
EngineType,
)
from dynamo.planner.defaults import SubComponentType
......@@ -79,7 +79,7 @@ class VllmV1ConfigModifier:
def convert_config(
cls,
config: dict,
target: Literal["prefill", "decode"],
target: EngineType,
is_moe_model: bool = False,
) -> dict:
if is_moe_model:
......@@ -96,7 +96,7 @@ class VllmV1ConfigModifier:
if "Planner" in cfg.spec.services:
del cfg.spec.services["Planner"]
if target == "prefill":
if target == EngineType.PREFILL:
# Get service names by inferring from subComponentType first
prefill_service_name = get_service_name_by_type(
cfg, "vllm", SubComponentType.PREFILL
......@@ -133,7 +133,7 @@ class VllmV1ConfigModifier:
worker_service.extraPodSpec.mainContainer.args = args
elif target == "decode":
elif target == EngineType.DECODE:
# Get service names by inferring from subComponentType first
prefill_service_name = get_service_name_by_type(
cfg, "vllm", SubComponentType.PREFILL
......
......@@ -13,9 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
DEFAULT_MODEL_NAME = "Qwen/Qwen3-0.6B"
DYNAMO_RUN_DEFAULT_PORT = 8000
# set a decode maximum concurrency due to limits of profiling tools
# for MoE models with attn-dp, we might hit this limit
DECODE_MAX_CONCURRENCY = 2000
class EngineType(str, Enum):
PREFILL = "prefill"
DECODE = "decode"
......@@ -5,6 +5,7 @@ from pathlib import Path
from typing import Optional, Union
from huggingface_hub import model_info
from pydantic import BaseModel
from transformers import AutoConfig
DTYPE_BYTES_MAP = {
......@@ -103,10 +104,21 @@ def get_model_weight_size(
return get_model_weight_size_from_hub(str(model_name_or_path))
class ModelInfo(BaseModel):
model_size: float
architecture: str
is_moe: bool
max_context_length: Optional[int] = None
num_experts: Optional[int] = None
intermediate_size: Optional[int] = None
num_kv_heads: Optional[int] = None
quantization_block_size: Optional[int] = None
def get_model_info(
model_name_or_path: Union[str, Path],
trust_remote_code: bool = False,
) -> dict:
) -> ModelInfo:
model_size = get_model_weight_size(model_name_or_path)
config = AutoConfig.from_pretrained(
......@@ -114,10 +126,11 @@ def get_model_info(
trust_remote_code=trust_remote_code,
)
if config.architectures[0] in MOE_ARCHITECTURES:
config.is_moe = True
architecture = config.architectures[0]
if architecture in MOE_ARCHITECTURES:
is_moe = True
else:
config.is_moe = False
is_moe = False
# Detect max context length from config
# Different models use different attribute names for max context length
......@@ -132,7 +145,7 @@ def get_model_info(
# Detect number of experts for MoE models
# Different models use different attribute names
num_experts = None
if config.is_moe:
if is_moe:
expert_attrs = [
"n_routed_experts", # DeepSeek V3/R1
"num_local_experts", # Mixtral, Qwen
......@@ -145,12 +158,78 @@ def get_model_info(
num_experts = value
break
return {
"model_size": model_size,
"is_moe": config.is_moe,
"max_context_length": max_context_length,
"num_experts": num_experts,
}
# Detect intermediate size (FFN hidden dimension)
intermediate_size = None
intermediate_attrs = [
"intermediate_size", # Most common (BERT, LLaMA, etc.)
"ffn_dim", # Some transformer models
]
for attr in intermediate_attrs:
if hasattr(config, attr):
value = getattr(config, attr)
if value is not None:
intermediate_size = value
break
# Detect number of key-value heads (for GQA)
num_kv_heads = None
kv_head_attrs = [
"num_key_value_heads", # LLaMA 2/3, Mistral, etc.
"num_kv_heads", # Alternative name
]
for attr in kv_head_attrs:
if hasattr(config, attr):
value = getattr(config, attr)
if value is not None:
num_kv_heads = value
break
# If not found, check if it equals num_attention_heads (standard MHA)
if num_kv_heads is None and hasattr(config, "num_attention_heads"):
num_kv_heads = config.num_attention_heads
# Detect quantization block size
quantization_block_size = None
if hasattr(config, "quantization_config"):
quant_config = config.quantization_config
if isinstance(quant_config, dict):
# Check for common quantization block size attributes
quantization_block_size = (
quant_config.get("weight_block_size")
or quant_config.get("block_size")
or quant_config.get("group_size")
or quant_config.get("q_group_size")
)
elif quant_config is not None:
# Handle object-based quantization config
for attr in [
"weight_block_size",
"block_size",
"group_size",
"q_group_size",
]:
if hasattr(quant_config, attr):
value = getattr(quant_config, attr)
if value is not None:
quantization_block_size = value
break
# Handle case where block size is a list (e.g., [128, 128] for [input, output] block sizes)
if (
isinstance(quantization_block_size, list)
and len(quantization_block_size) > 0
):
quantization_block_size = max(quantization_block_size)
return ModelInfo(
model_size=model_size,
architecture=architecture,
is_moe=is_moe,
max_context_length=max_context_length,
num_experts=num_experts,
intermediate_size=intermediate_size,
num_kv_heads=num_kv_heads,
quantization_block_size=quantization_block_size,
)
if __name__ == "__main__":
......
......@@ -14,6 +14,7 @@
# limitations under the License.
import logging
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
......@@ -33,22 +34,27 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
def plot_prefill_performance(prefill_results, target_ttft, output_dir):
def plot_prefill_performance(prefill_data, target_ttft, output_dir):
"""
Plot prefill performance as a 2D scatter plot with GPU count annotations.
Plot prefill performance as a 2D scatter plot with GPU count and mapping annotations.
Args:
prefill_results: tuple of (prefill_num_gpu, prefill_ttft, prefill_thpt_per_gpu)
prefill_data: PrefillProfileData instance containing profiling results
target_ttft: target TTFT value for the vertical line
output_dir: directory to save the plot
"""
prefill_num_gpu, prefill_ttft, prefill_thpt_per_gpu = prefill_results
plt.figure(figsize=(10, 6))
plt.scatter(prefill_ttft, prefill_thpt_per_gpu, s=100)
for i, num_gpu in enumerate(prefill_num_gpu):
plt.scatter(prefill_data.ttft, prefill_data.thpt_per_gpu, s=100)
for i, num_gpu in enumerate(prefill_data.num_gpus):
label_suffix = (
f" [{prefill_data.parallel_mapping_labels[i]}]"
if prefill_data.parallel_mapping_labels
and i < len(prefill_data.parallel_mapping_labels)
else ""
)
plt.annotate(
f"{num_gpu} GPU(s)",
(prefill_ttft[i], prefill_thpt_per_gpu[i]),
f"{num_gpu} GPU(s){label_suffix}",
(prefill_data.ttft[i], prefill_data.thpt_per_gpu[i]),
xytext=(10, 0),
textcoords="offset points",
fontsize=10,
......@@ -70,19 +76,46 @@ def plot_prefill_performance(prefill_results, target_ttft, output_dir):
plt.close()
def plot_decode_performance(decode_results, target_itl, output_dir):
def plot_decode_performance(decode_data, target_itl, output_dir):
"""
Plot decode performance with multiple GPU count lines.
Args:
decode_results: list of tuples (num_gpu, itl_list, thpt_per_gpu_list)
decode_data: DecodeProfileData instance containing profiling results
target_itl: target ITL value for the vertical line
output_dir: directory to save the plot
"""
plt.figure(figsize=(10, 6))
for num_gpu, itl_list, thpt_per_gpu_list in decode_results:
plt.plot(itl_list, thpt_per_gpu_list, label=f"{num_gpu} GPU(s)")
# Group data by (num_gpus, parallel_mapping_label) combination
grouped_data: defaultdict[tuple[int, str], dict[str, list[float]]] = defaultdict(
lambda: {"itl": [], "thpt": []}
)
for i in range(len(decode_data.num_gpus)):
num_gpu = decode_data.num_gpus[i]
label = (
decode_data.parallel_mapping_labels[i]
if decode_data.parallel_mapping_labels
else ""
)
key = (num_gpu, label)
grouped_data[key]["itl"].append(decode_data.itl[i])
grouped_data[key]["thpt"].append(decode_data.thpt_per_gpu[i])
# Plot each group as a line
for (num_gpu, parallel_mapping_label), data in sorted(grouped_data.items()):
if parallel_mapping_label:
label = f"{num_gpu} GPU(s) [{parallel_mapping_label}]"
else:
label = f"{num_gpu} GPU(s)"
# Sort by ITL for proper line plotting
sorted_pairs = sorted(zip(data["itl"], data["thpt"]))
itl_sorted = [x[0] for x in sorted_pairs]
thpt_sorted = [x[1] for x in sorted_pairs]
plt.plot(itl_sorted, thpt_sorted, label=label, marker="o")
plt.axvline(
x=target_itl, color="r", linestyle="--", label=f"Target ITL: {target_itl} ms"
......@@ -253,18 +286,24 @@ def plot_decode_3d_surface(
plt.close()
def plot_pd_joint_results(isl, osl, prefill_results, decode_results, output_dir):
def plot_pd_joint_results(isl, osl, prefill_data, decode_data, output_dir):
"""
Plot joint prefill and decode results showing cost per 1000 requests under different SLA.
Args:
isl: input sequence length
osl: output sequence length
prefill_data: PrefillProfileData instance containing profiling results
decode_data: DecodeProfileData instance containing profiling results
output_dir: directory to save the plot
"""
GPU_COST_PER_HOUR = 3.0 # $3/hour
# compute pareto front for prefill
p_ttft, p_thpt = compute_pareto(prefill_results[1], prefill_results[2])
p_ttft, p_thpt = compute_pareto(prefill_data.ttft, prefill_data.thpt_per_gpu)
# compute pareto front for decode
_d_itl, _d_thpt = [], []
for _d_result in decode_results:
_d_itl.extend(_d_result[1])
_d_thpt.extend(_d_result[2])
d_itl, d_thpt = compute_pareto(_d_itl, _d_thpt)
d_itl, d_thpt = compute_pareto(decode_data.itl, decode_data.thpt_per_gpu)
# convert to cost per thousand requests
p_ttft = np.array(p_ttft)
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import json
import logging
import os
import re
from typing import List, Optional, Tuple
logger = logging.getLogger(__name__)
def check_prefill_results_exist(output_dir: str, tp_size: int, isl: int) -> bool:
"""Check if prefill results already exist for a given TP size."""
work_dir = f"{output_dir}/prefill_tp{tp_size}"
result_file = f"{work_dir}/aiperf_isl{isl}/*/profile_export_aiperf.json"
# Check if the work directory exists
if not os.path.exists(work_dir):
return False
# Look for the aiperf result file
result_files = glob.glob(result_file)
if not result_files:
return False
# Verify the result file has valid data
try:
with open(result_files[0], "r") as f:
data = json.load(f)
# Check if it has the required metrics
if "time_to_first_token" in data and "avg" in data["time_to_first_token"]:
logger.info(
f"Found existing prefill results for TP{tp_size} at {result_files[0]}"
)
return True
except (json.JSONDecodeError, KeyError, FileNotFoundError):
pass
return False
def check_decode_results_exist(
output_dir: str, tp_size: int, isl: int, osl: int
) -> bool:
"""Check if decode results already exist for a given TP size."""
work_dir = f"{output_dir}/decode_tp{tp_size}"
# Check if the work directory exists
if not os.path.exists(work_dir):
return False
# Look for at least one decode result file
result_pattern = (
f"{work_dir}/aiperf_request*_isl{isl}_osl{osl}_n*/*/profile_export_aiperf.json"
)
result_files = glob.glob(result_pattern)
if not result_files:
return False
# Verify at least one result file has valid data
try:
with open(result_files[0], "r") as f:
data = json.load(f)
# Check if it has the required metrics
if "inter_token_latency" in data and "avg" in data["inter_token_latency"]:
logger.info(
f"Found existing decode results for TP{tp_size} at {result_files[0]} (and {len(result_files)-1} others)"
)
return True
except (json.JSONDecodeError, KeyError, FileNotFoundError):
pass
return False
def load_existing_prefill_results(
output_dir: str, tp_size: int, isl: int
) -> Tuple[Optional[float], Optional[float]]:
"""Load existing prefill results from disk."""
work_dir = f"{output_dir}/prefill_tp{tp_size}"
result_file = f"{work_dir}/aiperf_isl{isl}/*/profile_export_aiperf.json"
result_files = glob.glob(result_file)
if result_files:
try:
with open(result_files[0], "r") as f:
data = json.load(f)
ttft = data["time_to_first_token"]["avg"]
thpt_per_gpu = isl / ttft / tp_size * 1000
return ttft, thpt_per_gpu
except (json.JSONDecodeError, KeyError, FileNotFoundError):
pass
return None, None
def load_existing_decode_results(
output_dir: str, tp_size: int, isl: int, osl: int
) -> List[Tuple[float, float, int]]:
"""Load existing decode results from disk."""
work_dir = f"{output_dir}/decode_tp{tp_size}"
result_pattern = (
f"{work_dir}/aiperf_request*_isl{isl}_osl{osl}_n*/*/profile_export_aiperf.json"
)
result_files = glob.glob(result_pattern)
decode_results = []
for result_file in result_files:
try:
with open(result_file, "r") as f:
data = json.load(f)
itl = data["inter_token_latency"]["avg"]
thpt_per_gpu = data["output_token_throughput"]["avg"] / tp_size
# Extract concurrency from filename
match = re.search(r"aiperf_request(\d+)_", result_file)
if match:
concurrency = int(match.group(1))
decode_results.append((itl, thpt_per_gpu, concurrency))
except (json.JSONDecodeError, KeyError, FileNotFoundError):
continue
return decode_results
......@@ -76,8 +76,6 @@ def create_profiler_parser() -> argparse.Namespace:
max_num_gpus_per_engine: Int (maximum number of GPUs per engine, default: 0)
num_gpus_per_node: Int (number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size, default: 0)
sweep:
skip_existing_results: Boolean (skip TP sizes that already have results in the output directory, default: False)
force_rerun: Boolean (force re-running all tests even if results already exist (overrides --skip-existing-results), default: False)
prefill_interpolation_granularity: Int (how many samples to benchmark to interpolate TTFT under different ISL, default: 16)
decode_interpolation_granularity: Int (how many samples to benchmark to interpolate ITL under different active kv cache size and decode context length, default: 6)
use_ai_configurator: Boolean (use ai-configurator to estimate benchmarking results instead of running actual deployment, default: False)
......@@ -158,26 +156,20 @@ def create_profiler_parser() -> argparse.Namespace:
parser.add_argument(
"--min-num-gpus-per-engine",
type=int,
default=config.get("hardware", {}).get("min_num_gpus_per_engine", 1),
default=config.get("hardware", {}).get("min_num_gpus_per_engine", 0),
help="minimum number of GPUs per engine",
)
parser.add_argument(
"--max-num-gpus-per-engine",
type=int,
default=config.get("hardware", {}).get("max_num_gpus_per_engine", 8),
default=config.get("hardware", {}).get("max_num_gpus_per_engine", 0),
help="maximum number of GPUs per engine",
)
parser.add_argument(
"--skip-existing-results",
action="store_true",
default=config.get("sweep", {}).get("skip_existing_results", False),
help="Skip TP sizes that already have results in the output directory",
)
parser.add_argument(
"--force-rerun",
action="store_true",
default=config.get("sweep", {}).get("force_rerun", False),
help="Force re-running all tests even if results already exist (overrides --skip-existing-results)",
"--num-gpus-per-node",
type=int,
default=config.get("hardware", {}).get("num_gpus_per_node", 0),
help="Number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size",
)
parser.add_argument(
"--isl",
......@@ -235,19 +227,6 @@ def create_profiler_parser() -> argparse.Namespace:
default=config.get("sweep", {}).get("dry_run", False),
help="Dry run the profile job",
)
parser.add_argument(
"--is-moe-model",
action="store_true",
dest="is_moe_model",
default=config.get("engine", {}).get("is_moe_model", False),
help="Enable MoE (Mixture of Experts) model support, use TEP for prefill and DEP for decode",
)
parser.add_argument(
"--num-gpus-per-node",
type=int,
default=config.get("hardware", {}).get("num_gpus_per_node", 8),
help="Number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size",
)
parser.add_argument(
"--enable-gpu-discovery",
action="store_true",
......@@ -311,9 +290,5 @@ def create_profiler_parser() -> argparse.Namespace:
if not args.model and not args.config:
parser.error("--model or --config is required (provide at least one)")
# Run auto-generation if GPU discovery is enabled
# This will override any manually specified hardware parameters
if args.enable_gpu_discovery:
auto_generate_search_space(args)
return args
......@@ -9,7 +9,7 @@ import os
import yaml
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from benchmarks.profiler.utils.model_info import get_model_info
from benchmarks.profiler.utils.model_info import ModelInfo, get_model_info
from deploy.utils.gpu_inventory import get_gpu_summary
logger = logging.getLogger(__name__)
......@@ -23,7 +23,9 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
MODEL_GPU_MEM_FRAC_MAX = 0.9
MOE_MODEL_MAX_NUM_GPUS = 32
# for MoE models, we sweep up to number of GPUs that can hold 8x the model weights
MOE_MODEL_MAX_NUM_GPU_FACTOR = 8
def auto_generate_search_space(args: argparse.Namespace) -> None:
......@@ -31,17 +33,16 @@ def auto_generate_search_space(args: argparse.Namespace) -> None:
args.backend
] # args.backend is already validated in argparse
# first check if config file exists
if args.model:
# first get the config
if not args.config:
# modify config file from default config file
logger.info("DGD config file not provided, using default config file")
config = config_modifier.load_default_config()
else:
with open(args.config, "r") as f:
config = yaml.safe_load(f)
if args.model:
logger.info(f"Updating model in DGD config file to {args.model}")
config = config_modifier.update_model(config, args.model)
if args.dgd_image:
......@@ -55,23 +56,26 @@ def auto_generate_search_space(args: argparse.Namespace) -> None:
yaml.dump(config, f)
args.config = config_fn
# now determine the search space
model_info = None
if args.model:
# get model info and update args
model_info: ModelInfo | None = None
if not args.model:
# get the model name from config
args.model = config_modifier.get_model_name(config)
logger.info(f"Getting model info for {args.model}...")
model_info = get_model_info(args.model)
num_experts_str = (
f", num_experts={model_info['num_experts']}"
if model_info.get("num_experts")
f", num_experts={model_info.num_experts}"
if model_info.num_experts is not None
else ""
)
logger.info(
f"Model {args.model} has size {model_info['model_size']}, is_moe={model_info['is_moe']}, and max_context_length={model_info['max_context_length']}{num_experts_str}"
f"Model {args.model} has size {model_info.model_size}, is_moe={model_info.is_moe}, and max_context_length={model_info.max_context_length}{num_experts_str}"
)
args.is_moe_model = model_info["is_moe"] # type: ignore[assignment]
args.max_context_length = model_info["max_context_length"] # type: ignore[assignment]
args.model_info = model_info
# now determine the search space
if args.enable_gpu_discovery:
if (
args.min_num_gpus_per_engine == 0
or args.max_num_gpus_per_engine == 0
......@@ -90,16 +94,20 @@ def auto_generate_search_space(args: argparse.Namespace) -> None:
)
# model_info should be set by now (checked above), but mypy needs explicit verification
assert model_info is not None, "model_info must be set when model is provided"
assert (
model_info is not None
), "model_info must be set when model is provided"
vram_mib = int(gpu_info["vram"]) # type: ignore[call-overload]
gpus_per_node = int(gpu_info["gpus_per_node"]) # type: ignore[call-overload]
min_gpu = math.ceil(
model_info["model_size"] / MODEL_GPU_MEM_FRAC_MAX / gpu_info["vram"] # type: ignore[operator]
)
max_gpu = (
gpu_info["gpus_per_node"] # type: ignore[misc]
if not model_info["is_moe"]
else MOE_MODEL_MAX_NUM_GPUS
model_info.model_size / MODEL_GPU_MEM_FRAC_MAX / vram_mib
)
if not model_info.is_moe:
max_gpu = gpus_per_node
else:
max_gpu = max(min_gpu * MOE_MODEL_MAX_NUM_GPU_FACTOR, gpus_per_node)
if min_gpu > max_gpu:
error_msg = f"No valid GPU configuration found for model {args.model} on the cluster with {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node"
logger.error(error_msg)
......@@ -110,7 +118,22 @@ def auto_generate_search_space(args: argparse.Namespace) -> None:
)
args.min_num_gpus_per_engine = min_gpu
args.max_num_gpus_per_engine = max_gpu
args.num_gpus_per_node = gpu_info["gpus_per_node"] # type: ignore[assignment]
args.num_experts = model_info.get("num_experts") # type: ignore[assignment]
args.num_gpus_per_node = gpus_per_node # type: ignore[assignment]
else:
# use default values for GPUs
if args.min_num_gpus_per_engine == 0:
logger.warning(
"GPU discover is disabled and min_num_gpus_per_engine is not specified, setting to 1"
)
args.min_num_gpus_per_engine = 1
if args.max_num_gpus_per_engine == 0:
logger.warning(
"GPU discover is disabled and max_num_gpus_per_engine is not specified, setting to 4"
)
args.max_num_gpus_per_engine = 4
if args.num_gpus_per_node == 0:
logger.warning(
"GPU discover is disabled and num_gpus_per_node is not specified, setting to 8"
)
args.num_gpus_per_node = 8
return
......@@ -37,8 +37,7 @@ spec:
# Engine configuration
engine:
max_context_length: 16384 # Maximum context length supported by the model
is_moe_model: false # Enable MoE model support (uses TEP/DEP instead of TP)
max_context_length: 16384 # will override max context length of the model if provided
# Hardware configuration
hardware:
......
......@@ -852,7 +852,7 @@ var _ = Describe("DGDR Helper Functions", func() {
ProfilingConfig: nvidiacomv1alpha1.ProfilingConfigSpec{
Config: createTestConfig(map[string]interface{}{
"sweep": map[string]interface{}{
"force_rerun": true,
"prefill_interpolation_granularity": 16,
},
}),
},
......
......@@ -315,7 +315,8 @@ profilingConfig:
# Profiling sweep settings (optional)
sweep:
force_rerun: false
prefill_interpolation_granularity: 16 # Number of samples for prefill ISL sweep
decode_interpolation_granularity: 6 # Number of samples for decode sweep
```
> **Note**: `engine.config` is a **file path** to a DGD YAML file, not inline configuration. Use ConfigMapRef (recommended) or leave it unset to auto-generate.
......
......@@ -18,6 +18,19 @@ project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from benchmarks.profiler.profile_sla import run_profile # noqa: E402
from benchmarks.profiler.utils.model_info import ModelInfo # noqa: E402
# Override the logger fixture from conftest.py to prevent directory creation
@pytest.fixture(autouse=True)
def logger(request):
"""Override the logger fixture to prevent test directory creation.
This replaces the logger fixture from tests/conftest.py that creates
directories named after each test.
"""
# Simply do nothing - no directories created, no file handlers added
yield
class TestProfileSlaAiconfigurator:
......@@ -41,11 +54,9 @@ class TestProfileSlaAiconfigurator:
self.osl = 500
self.ttft = 50
self.itl = 10
self.max_context_length = 16384
self.prefill_interpolation_granularity = 16
self.decode_interpolation_granularity = 6
self.service_name = ""
self.is_moe_model = False
self.dry_run = False
self.use_ai_configurator = True
self.aic_system = "h200_sxm"
......@@ -54,6 +65,13 @@ class TestProfileSlaAiconfigurator:
self.aic_backend_version = "0.20.0"
self.num_gpus_per_node = 8
self.deploy_after_profile = False
# Provide minimal model_info to avoid HF queries
self.model_info = ModelInfo(
model_size=16384.0,
architecture="TestArchitecture",
is_moe=False,
max_context_length=16384,
)
return Args()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment