Unverified Commit 5e4a339a authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: support TEP/DEP for DSR1 in SGLang MoE Planner (#4203)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
Signed-off-by: default avatarHongkuan Zhou <tedzhouhk@gmail.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent a91e6348
......@@ -22,7 +22,10 @@ from dataclasses import dataclass, field
import numpy as np
import yaml
from benchmarks.profiler.utils.aiperf import benchmark_decode, benchmark_prefill
from benchmarks.profiler.utils.aiperf import (
get_decode_itl_and_thpt_per_gpu,
get_prefill_ttft,
)
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
ParallelizationMapping,
......@@ -135,7 +138,7 @@ async def run_profile(args):
# Log MoE model support
if args.model_info.is_moe:
logger.info(
"MoE (Mixture of Experts) model profiling, sweeping TEP size for prefill and DEP size for decode"
"MoE (Mixture of Experts) model profiling, sweeping TEP/DEP size for prefill and decode"
)
assert args.backend in [
"sglang"
......@@ -145,7 +148,7 @@ async def run_profile(args):
), "MoE model is not supported in ai-configurator"
else:
logger.info(
"Standard dense model profiling, sweeping TP size for both prefill and decode"
"Dense model profiling, sweeping TP size for prefill and decode"
)
config_modifier = CONFIG_MODIFIERS[args.backend]
......@@ -155,7 +158,7 @@ async def run_profile(args):
if args.dgd_image:
config = config_modifier.update_image(config, args.dgd_image)
logger.info(f"Using DGD image: {args.dgd_image}")
logger.debug(f"Using DGD image: {args.dgd_image}")
profile_num_gpus = [
2**i
......@@ -210,7 +213,7 @@ async def run_profile(args):
"Must provide --aic-backend-version when using --use-ai-configurator."
)
logger.info("Will use aiconfigurator to estimate perf.")
logger.info("Using aiconfigurator to estimate performance...")
ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
args.aic_model_name,
args.aic_system.lower(),
......@@ -220,7 +223,7 @@ async def run_profile(args):
else:
if args.aic_system or args.aic_model_name or args.aic_backend_version:
logger.warning(
"Will ignore --aic-system, --aic-model-name, and/or --backend-version "
"Ignoring --aic-system, --aic-model-name, and/or --backend-version "
"when not using --use-ai-configurator."
)
......@@ -248,7 +251,7 @@ async def run_profile(args):
config_modifier,
args.num_gpus_per_node,
)
logger.info(f"Dynamo config: {prefill_config}")
logger.debug(f"Dynamo config: {prefill_config}")
# Work dir includes mapping label (safe chars only)
parallel_mapping_tag = (
......@@ -267,10 +270,10 @@ async def run_profile(args):
if args.dry_run:
logger.info("Skipping deployment creation in dry run mode")
elif args.use_ai_configurator:
logger.info("Using ai-configurator to estimate prefill latency.")
logger.info("Using ai-configurator to estimate prefill latency")
perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
args.isl,
tp_size=(mapping.tp or num_gpus),
tp_size=mapping.get_tp_size(),
)
ttft = perf_dict["context_latency"]
logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
......@@ -301,15 +304,14 @@ async def run_profile(args):
# run ai-perf
base_url = client.get_service_url()
ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
aiperf_result = benchmark_prefill(
ttft = get_prefill_ttft(
args.isl,
ai_perf_artifact_dir,
model_name,
model_name,
base_url=base_url,
base_url,
attention_dp_size=mapping.get_attn_dp_size(),
)
if aiperf_result is not None:
ttft = aiperf_result["time_to_first_token"]["avg"]
logger.info("Cleaning up deployment...")
await client.delete_deployment()
......@@ -320,7 +322,11 @@ async def run_profile(args):
prefill_data.add_data(
num_gpus=num_gpus,
ttft=ttft,
thpt_per_gpu=args.isl / ttft / num_gpus * 1000,
thpt_per_gpu=args.isl
/ ttft
/ num_gpus
* 1000
* mapping.get_attn_dp_size(),
parallel_mapping_label=mapping.label(),
parallel_mapping=mapping,
)
......@@ -350,7 +356,7 @@ async def run_profile(args):
config_modifier,
args.num_gpus_per_node,
)
logger.info(f"Dynamo config: {decode_config}")
logger.debug(f"Dynamo config: {decode_config}")
parallel_mapping_tag = (
mapping.label()
......@@ -373,7 +379,7 @@ async def run_profile(args):
# Compute max_concurrency and max_kv_tokens to know which
# num_request to sweep over.
max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
args.isl, args.osl, tp_size=(mapping.tp or num_gpus)
args.isl, args.osl, tp_size=mapping.get_tp_size()
)
max_kv_tokens = max_concurrency * (args.isl + args.osl)
......@@ -400,7 +406,7 @@ async def run_profile(args):
# Compute max_concurrency and max_kv_tokens to know which
# num_request to sweep over.
attention_dp_size = mapping.get_attn_dp_size(num_gpus)
attention_dp_size = mapping.get_attn_dp_size()
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
attention_dp_size=attention_dp_size,
......@@ -408,7 +414,7 @@ async def run_profile(args):
max_concurrency = max_kv_tokens // (args.isl + args.osl)
if not args.dry_run:
attention_dp_size = mapping.get_attn_dp_size(num_gpus)
attention_dp_size = mapping.get_attn_dp_size()
sweep_num_request = get_num_request_range(
attention_dp_size,
max_concurrency,
......@@ -429,7 +435,7 @@ async def run_profile(args):
args.osl,
num_request,
mode=EngineType.DECODE,
tp_size=(mapping.tp or num_gpus),
tp_size=mapping.get_tp_size(),
)
itl = perf_dict["tpot"]
......@@ -441,7 +447,7 @@ async def run_profile(args):
else:
base_url = client.get_service_url()
ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
aiperf_result = benchmark_decode(
itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
args.isl,
args.osl,
num_request,
......@@ -449,12 +455,7 @@ async def run_profile(args):
model_name,
model_name,
base_url=base_url,
)
if aiperf_result is not None:
itl = aiperf_result["inter_token_latency"]["avg"]
thpt_per_gpu = (
aiperf_result["output_token_throughput"]["avg"]
/ num_gpus
num_gpus=num_gpus,
)
if itl is not None and thpt_per_gpu is not None:
......@@ -493,8 +494,8 @@ async def run_profile(args):
# select best parallel mapping for prefill
if min(prefill_data.ttft) > args.ttft:
logger.info(
"No TP size satisfies the TTFT requirement, please try a smaller model or a more powerful GPU SKU"
logger.warning(
"No engine configuration satisfies the TTFT requirement, please try a smaller model or more powerful hardware"
)
selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
else:
......@@ -514,8 +515,8 @@ async def run_profile(args):
logger.error("No decode results produced; skipping recommendations.")
return
if min(decode_data.itl) > args.itl:
logger.info(
"No TP size satisfies the ITL requirement, please try a smaller model or a more powerful GPU SKU"
logger.warning(
"No engine configuration satisfies the ITL requirement, please try a smaller model or more powerful hardware"
)
selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
else:
......@@ -559,7 +560,7 @@ async def run_profile(args):
config_modifier,
args.num_gpus_per_node,
)
logger.info(f"Dynamo config: {prefill_config}")
logger.debug(f"Dynamo config: {prefill_config}")
work_dir = f"{args.output_dir}/selected_prefill_interpolation"
os.makedirs(work_dir, exist_ok=True)
......@@ -577,7 +578,7 @@ async def run_profile(args):
sweep_max_context_length,
args.prefill_interpolation_granularity,
ai_configurator_perf_estimator,
tp_size=(best_prefill_mapping.tp or best_prefill_gpus),
tp_size=best_prefill_mapping.get_tp_size(),
)
else:
client = DynamoDeploymentClient(
......@@ -619,6 +620,7 @@ async def run_profile(args):
best_prefill_gpus,
sweep_max_context_length,
args.prefill_interpolation_granularity,
attention_dp_size=best_prefill_mapping.get_attn_dp_size(),
)
logger.info("Cleaning up deployment...")
......@@ -642,7 +644,7 @@ async def run_profile(args):
config_modifier,
args.num_gpus_per_node,
)
logger.info(f"Dynamo config: {decode_config}")
logger.debug(f"Dynamo config: {decode_config}")
work_dir = f"{args.output_dir}/selected_decode_interpolation"
os.makedirs(work_dir, exist_ok=True)
......@@ -654,9 +656,9 @@ async def run_profile(args):
if args.dry_run:
logger.info("Skipping deployment creation in dry run mode")
elif args.use_ai_configurator:
attention_dp_size = best_decode_mapping.get_attn_dp_size(best_decode_gpus)
attention_dp_size = best_decode_mapping.get_attn_dp_size()
max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
args.isl, args.osl, tp_size=(best_decode_mapping.tp or best_decode_gpus)
args.isl, args.osl, tp_size=best_decode_mapping.get_tp_size()
)
profile_decode_aiconfigurator(
work_dir,
......@@ -666,7 +668,7 @@ async def run_profile(args):
args.decode_interpolation_granularity,
ai_configurator_perf_estimator,
attention_dp_size,
tp_size=(best_decode_mapping.tp or best_decode_gpus),
tp_size=best_decode_mapping.get_tp_size(),
)
else:
client = DynamoDeploymentClient(
......@@ -689,7 +691,7 @@ async def run_profile(args):
f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
)
attention_dp_size = best_decode_mapping.get_attn_dp_size(best_decode_gpus)
attention_dp_size = best_decode_mapping.get_attn_dp_size()
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
attention_dp_size=attention_dp_size,
......@@ -725,7 +727,7 @@ async def run_profile(args):
is_moe_model=args.model_info.is_moe,
num_gpus_per_node=args.num_gpus_per_node,
)
logger.info(f"Final DGD config with planner: {config}")
logger.debug(f"Final DGD config with planner: {config}")
# save DGD config with planner; support multi-document output when a ConfigMap is included
with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
......
......@@ -18,6 +18,7 @@ import logging
import os
import random
import subprocess
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
......@@ -36,6 +37,7 @@ def _get_common_aiperf_cmd(
model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
tokenizer="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
base_url="http://localhost:8000",
warmup_request_count: int = 3,
):
return [
"aiperf",
......@@ -56,11 +58,13 @@ def _get_common_aiperf_cmd(
"--extra-inputs",
'{"nvext":{"ignore_eos":true}}',
"--warmup-request-count",
"3",
str(warmup_request_count),
"--artifact-dir",
artifact_dir,
"--random-seed",
str(seed),
"--request-timeout-seconds",
"1800",
]
......@@ -72,6 +76,9 @@ def get_prefill_aiperf_cmd(
tokenizer="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
osl=5,
base_url="http://localhost:8000",
concurrency: int = 1,
request_count: int = 1,
warmup_request_count: int = 3,
):
return _get_common_aiperf_cmd(
artifact_dir,
......@@ -79,6 +86,7 @@ def get_prefill_aiperf_cmd(
model,
tokenizer,
base_url,
warmup_request_count=warmup_request_count,
) + [
"--synthetic-input-tokens-mean",
str(isl),
......@@ -93,9 +101,9 @@ def get_prefill_aiperf_cmd(
"--extra-inputs",
f"min_tokens:{osl}",
"--concurrency",
"1",
str(concurrency),
"--request-count",
"1",
str(request_count),
]
......@@ -157,6 +165,9 @@ def benchmark_prefill(
model_name,
tokenizer,
base_url="http://localhost:8000",
concurrency: int = 1,
request_count: int = 1,
warmup_request_count: int = 3,
):
logger.info(f"Running aiperf with isl {isl}")
aiperf_cmd = get_prefill_aiperf_cmd(
......@@ -165,9 +176,12 @@ def benchmark_prefill(
model=model_name,
tokenizer=tokenizer,
base_url=base_url,
concurrency=concurrency,
request_count=request_count,
warmup_request_count=warmup_request_count,
)
print(f"aiperf cmd: {aiperf_cmd}")
# import pdb; pdb.set_trace()
logger.debug(f"aiperf cmd: {aiperf_cmd}")
aiperf_process = subprocess.Popen(
aiperf_cmd,
stdout=subprocess.PIPE,
......@@ -177,7 +191,7 @@ def benchmark_prefill(
stdout, stderr = aiperf_process.communicate()
if aiperf_process.returncode == 0:
logger.info("AIperf profiling completed successfully")
logger.info(stdout)
logger.debug(stdout)
aiperf_result = get_aiperf_result(aiperf_artifact_dir)
return aiperf_result
else:
......@@ -186,6 +200,98 @@ def benchmark_prefill(
return None
def get_prefill_ttft(
isl: int,
aiperf_artifact_dir: str,
model_name: str,
tokenizer: str,
base_url: str = "http://localhost:8000",
attention_dp_size: int = 1,
attn_dp_num_req_ratio: int = 4,
) -> Optional[float]:
"""
Run prefill benchmark and extract TTFT (ms). Returns None on failure.
If attention_dp_size > 1 (DEP), send attn_dp_size * attn_dp_num_req_ratio concurrent requests (single burst),
then compute TTFT as (max TTFT across burst) / attn_dp_num_req_ratio.
attn_dp_num_req_ratio defaults to 4 rounds to account for the error margin caused
by the first batch being launched too early without enough requests.
"""
# DEP-aware measurement (waves of size attention_dp_size)
if attention_dp_size > 1:
total_concurrency = attention_dp_size * attn_dp_num_req_ratio
logger.info(
f"DEP prefill measurement: isl={isl}, attn_dp={attention_dp_size}, attn_dp_num_req_ratio={attn_dp_num_req_ratio}, "
f"total_concurrency={total_concurrency}"
)
# Run aiperf with the requested concurrency; allow normal warmup behavior
aiperf_result = benchmark_prefill(
isl,
aiperf_artifact_dir,
model_name,
tokenizer,
base_url=base_url,
concurrency=total_concurrency,
request_count=total_concurrency,
)
try:
max_ttft = float(aiperf_result["time_to_first_token"]["max"])
return max_ttft / float(attn_dp_num_req_ratio)
except (KeyError, TypeError, ValueError):
logger.warning(
"Failed to extract max TTFT from AIPerf result for DEP prefill"
)
return None
# Default path (non-DEP): use AIPerf's TTFT metric
aiperf_result = benchmark_prefill(
isl,
aiperf_artifact_dir,
model_name,
tokenizer,
base_url=base_url,
)
try:
return float(aiperf_result["time_to_first_token"]["avg"])
except (KeyError, TypeError, ValueError):
logger.warning("Failed to extract TTFT from AIPerf result")
return None
def get_decode_itl_and_thpt_per_gpu(
isl: int,
osl: int,
num_request: int,
aiperf_artifact_dir: str,
model_name: str,
tokenizer: str,
base_url: str = "http://localhost:8000",
num_gpus: int = 1,
) -> Tuple[Optional[float], Optional[float]]:
"""
Run decode benchmark and extract (ITL ms, throughput per GPU).
Returns (None, None) on failure.
"""
aiperf_result = benchmark_decode(
isl,
osl,
num_request,
aiperf_artifact_dir,
model_name,
tokenizer,
base_url=base_url,
)
if aiperf_result is None:
return None, None
try:
itl = float(aiperf_result["inter_token_latency"]["avg"])
thpt_total = float(aiperf_result["output_token_throughput"]["avg"])
thpt_per_gpu = thpt_total / max(num_gpus, 1)
return itl, thpt_per_gpu
except (KeyError, TypeError, ValueError):
logger.warning("Failed to extract decode metrics from AIPerf result")
return None, None
def benchmark_decode(
isl,
osl,
......@@ -238,7 +344,7 @@ def benchmark_decode(
stdout, stderr = aiperf_process.communicate()
if aiperf_process.returncode == 0:
logger.info("AIperf profiling completed successfully")
logger.info(stdout)
logger.debug(stdout)
aiperf_result = get_aiperf_result(aiperf_artifact_dir)
return aiperf_result
else:
......
......@@ -6,7 +6,7 @@ import logging
from dataclasses import dataclass
from enum import Enum
from benchmarks.profiler.utils.defaults import EngineType
from benchmarks.profiler.utils.defaults import PREFILL_MAX_NUM_TOKENS, EngineType
from benchmarks.profiler.utils.model_info import ModelInfo
logger = logging.getLogger(__name__)
......@@ -69,21 +69,19 @@ class ParallelizationMapping:
return self.dep
return 1 # TP has expert split of 1
def get_attn_dp_size(self, num_gpus: int) -> int:
def get_attn_dp_size(self) -> int:
"""
Get the attention data parallelism size.
DEP uses data parallelism for attention (returns num_gpus).
DEP uses data parallelism for attention (returns dep size).
TP and TEP don't use data parallelism for attention (returns 1).
Args:
num_gpus: Total number of GPUs being used
None
Returns:
The attention data parallelism size
"""
if self.dep is not None:
return num_gpus
return 1 # TP and TEP have attention DP size of 1
return self.dep if self.dep is not None else 1 # TP and TEP → 1
def _check_divisibility(
......@@ -169,7 +167,10 @@ def get_candidate_parallel_mappings(
candidates: list[ParallelizationMapping] = []
if is_moe:
if phase == EngineType.PREFILL:
candidates = [ParallelizationMapping(tep=num_gpus)]
candidates = [
ParallelizationMapping(tep=num_gpus),
ParallelizationMapping(dep=num_gpus),
]
elif phase == EngineType.DECODE:
candidates = [
ParallelizationMapping(dep=num_gpus),
......@@ -212,10 +213,19 @@ def apply_parallel_mapping_to_config(
cfg = copy.deepcopy(base_config)
if mapping.tp is not None:
cfg = config_modifier.set_config_tp_size(cfg, mapping.tp)
elif phase == EngineType.PREFILL and mapping.tep is not None:
elif mapping.tep is not None:
cfg = config_modifier.set_config_tep_size(cfg, mapping.tep, num_gpus_per_node)
elif phase == EngineType.DECODE and mapping.dep is not None:
elif mapping.dep is not None:
cfg = config_modifier.set_config_dep_size(cfg, mapping.dep, num_gpus_per_node)
else:
pass
raise ValueError(f"Invalid mapping: {mapping.label()}")
# for prefill,set batch size to attention_dp_size
# (this assume prompt is long enough to saturate the GPU, which is usually valid in disagg)
if phase == EngineType.PREFILL:
cfg = config_modifier.set_prefill_config(
cfg,
max_batch_size=mapping.get_attn_dp_size(),
max_num_tokens=PREFILL_MAX_NUM_TOKENS,
)
return cfg
......@@ -62,6 +62,12 @@ class ConfigModifierProtocol(Protocol):
def get_model_name(cls, config: dict) -> str:
...
@classmethod
def set_prefill_config(
cls, config: dict, max_batch_size: int, max_num_tokens: int
) -> dict:
...
@classmethod
def get_port(cls, config: dict) -> int:
...
......
......@@ -353,3 +353,28 @@ class SGLangConfigModifier:
except Exception as e:
logger.warning(f"Failed to parse KV cache size from log file. Error: {e}")
return 0
@classmethod
def set_prefill_config(
cls, config: dict, max_batch_size: int, max_num_tokens: int
) -> dict:
"""
Configure prefill-related limits for aggregated prefill runs.
- Batch size is applied as server concurrency.
- Max tokens is applied as a total token cap to avoid chunked prefill.
"""
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="sglang", sub_component_type=SubComponentType.DECODE
)
args = validate_and_get_worker_args(worker_service, backend="sglang")
args = break_arguments(args)
# Set max concurrency to control effective batch size
args = set_argument_value(args, "--max-running-requests", str(max_batch_size))
# Cap total tokens processed in a batch to avoid chunked prefill
args = set_argument_value(args, "--chunked-prefill-size", str(max_num_tokens))
worker_service.extraPodSpec.mainContainer.args = args
return cfg.model_dump()
......@@ -347,3 +347,30 @@ class TrtllmConfigModifier:
"Could not find KV cache size in TRT-LLM logs, using default value of 100000"
)
return 100000 # Default fallback value for TRT-LLM
@classmethod
def set_prefill_config(
cls, config: dict, max_batch_size: int, max_num_tokens: int
) -> dict:
"""
Configure prefill-related limits for aggregated prefill runs.
For TRT-LLM we set these via --override-engine-args JSON:
- max_batch_size
- max_num_tokens
"""
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="trtllm", sub_component_type=SubComponentType.DECODE
)
args = validate_and_get_worker_args(worker_service, backend="trtllm")
args = break_arguments(args)
# Parse existing override-engine-args (if any) and update
override_dict, args = parse_override_engine_args(args)
override_dict["max_batch_size"] = int(max_batch_size)
override_dict["max_num_tokens"] = int(max_num_tokens)
override_str = json.dumps(override_dict)
args = append_argument(args, ["--override-engine-args", override_str])
worker_service.extraPodSpec.mainContainer.args = args
return cfg.model_dump()
......@@ -303,3 +303,27 @@ class VllmV1ConfigModifier:
f"Failed to parse KV cache size from line: {line}. Error: {e}"
)
return 0
@classmethod
def set_prefill_config(
cls, config: dict, max_batch_size: int, max_num_tokens: int
) -> dict:
"""
Configure prefill-related limits for aggregated prefill runs.
vLLM uses --max-num-seqs to limit concurrency and
--max-num-batched-tokens to cap total tokens per step.
"""
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(
cfg, backend="vllm", sub_component_type=SubComponentType.DECODE
)
args = validate_and_get_worker_args(worker_service, backend="vllm")
args = break_arguments(args)
# Concurrency / batch size
args = set_argument_value(args, "--max-num-seqs", str(max_batch_size))
# Token cap per step
args = set_argument_value(args, "--max-num-batched-tokens", str(max_num_tokens))
worker_service.extraPodSpec.mainContainer.args = args
return cfg.model_dump()
......@@ -22,6 +22,9 @@ DYNAMO_RUN_DEFAULT_PORT = 8000
# for MoE models with attn-dp, we might hit this limit
DECODE_MAX_CONCURRENCY = 2000
# set a prefill maximum number of tokens to 32768 to avoid chunked prefill but not too large to cause activation tensor too large
PREFILL_MAX_NUM_TOKENS = 32768
class EngineType(str, Enum):
PREFILL = "prefill"
......
......@@ -6,7 +6,7 @@ from typing import Callable, Optional, Tuple
import numpy as np
from benchmarks.profiler.utils.aiperf import benchmark_decode
from benchmarks.profiler.utils.aiperf import get_decode_itl_and_thpt_per_gpu
from benchmarks.profiler.utils.defaults import DECODE_MAX_CONCURRENCY
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from benchmarks.profiler.utils.plot import plot_decode_3d_surface
......@@ -114,7 +114,7 @@ def profile_decode(
):
def get_itl_and_thpt_per_gpu(isl, osl, num_request):
ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{isl}_osl{osl}_n{num_request}"
aiperf_result = benchmark_decode(
return get_decode_itl_and_thpt_per_gpu(
isl,
osl,
num_request,
......@@ -122,12 +122,8 @@ def profile_decode(
model_name,
tokenizer,
base_url=url,
num_gpus=num_gpus,
)
if aiperf_result is not None:
itl = aiperf_result["inter_token_latency"]["avg"]
thpt_per_gpu = aiperf_result["output_token_throughput"]["avg"] / num_gpus
return itl, thpt_per_gpu
return None, None
return _profile_decode_helper(
work_dir,
......
......@@ -6,7 +6,7 @@ from typing import Callable, Optional
import numpy as np
from benchmarks.profiler.utils.aiperf import benchmark_prefill
from benchmarks.profiler.utils.aiperf import get_prefill_ttft
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from benchmarks.profiler.utils.plot import plot_prefill_interpolation
......@@ -27,6 +27,7 @@ def _profile_prefill_helper(
max_context_length,
interpolation_granularity,
get_ttft: Callable[[int], Optional[float]],
attention_dp_size: int = 1,
):
prefill_isl = []
prefill_ttft = []
......@@ -47,7 +48,9 @@ def _profile_prefill_helper(
if ttft is not None:
prefill_isl.append(isl)
prefill_ttft.append(ttft)
prefill_thpt_per_gpu.append(isl / ttft / num_gpus * 1000)
prefill_thpt_per_gpu.append(
isl / ttft / num_gpus * 1000 * attention_dp_size
)
# Interpolate prefill_ttft vs prefill_isl with quadratic function (y=ax^2+bx+c)
if len(prefill_isl) > 2:
......@@ -86,19 +89,20 @@ def profile_prefill(
num_gpus,
max_context_length,
interpolation_granularity,
attention_dp_size: int = 1,
attn_dp_num_req_ratio: int = 4,
):
def get_ttft(isl):
ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{isl}"
aiperf_result = benchmark_prefill(
return get_prefill_ttft(
isl,
ai_perf_artifact_dir,
model_name,
tokenizer,
base_url=url,
attention_dp_size=attention_dp_size,
attn_dp_num_req_ratio=attn_dp_num_req_ratio,
)
if aiperf_result is not None:
return aiperf_result["time_to_first_token"]["avg"]
return None
return _profile_prefill_helper(
work_dir,
......@@ -106,6 +110,7 @@ def profile_prefill(
max_context_length,
interpolation_granularity,
get_ttft,
attention_dp_size=attention_dp_size,
)
......
......@@ -4,7 +4,7 @@
accelerate==1.6.0
aiconfigurator @ git+https://github.com/ai-dynamo/aiconfigurator.git@11b6d821f1fbb34300bb0ed4945f647e89fb411a
aiofiles
aiperf @ git+https://github.com/ai-dynamo/aiperf.git@e8f69abf180ff9ea96de9f9a8c955df8c024625b
aiperf @ git+https://github.com/ai-dynamo/aiperf.git@16dad7c02fcd959ba96823d7bfe7e681e5d5b41d
av==15.0.0
fastapi==0.120.1
ftfy
......
......@@ -82,8 +82,12 @@ This feature is only available with cluster-scoped operators (`namespaceRestrict
1. **Hardware Setup**: Uses defaults or user-specified hardware configuration. Optionally, cluster-scoped operators can enable automatic GPU discovery to detect specifications from cluster nodes.
2. **Identify Sweep Ranges**: Automatically determine minimum and maximum number of GPUs per engine. Minimum is determined by the model size and GPU VRAM. Maximum is set to one node for dense model and 4 nodes for MoE models.
3. **Parallelization Mapping Sweep**: Use the input ISL and OSL, test the performance of the engines with different parallelization mappings. For dense models, we test different TP sizes for both prefill and decode. For MoE models, we test different TEP sizes for prefill and DEP sizes for decode.
- **Prefill**: For prefill, since there is no in-flight batching (assume isl is long enough to saturate the GPU), we directly measure the TTFT for a request with given isl without kv-reusing. For example, the below plot shows the prefill parallelization mapping sweep results for H100 for deepseek-ai/DeepSeek-R1-Distill-Llama-8B.
3. **Parallelization Mapping Sweep**: Use the input ISL and OSL, test the performance of the engines with different parallelization mappings.
- For dense models, we test different TP sizes for both prefill and decode.
- For MoE models (SGLang), we evaluate both TEP and DEP as candidates for prefill and decode.
- **Prefill**:
- TP/TEP: We measure TTFT with batch size = 1 (assuming ISL is long enough to saturate compute) without KV reuse.
- DEP: Attention uses data parallelism. We send a single burst with total concurrency `attention_dp_size × attn_dp_num_req_ratio` (defaults to 4) and compute the reported TTFT as `time_to_first_token.max / attn_dp_num_req_ratio` from the AIPerf summary of that burst. This stabilizes measurements when the first batch may launch before all requests arrive.
![Prefill Performance](../images/h100_prefill_performance.png)
- **Decode**: Since the ITL (or iteration time) is relevant with how many requests are in-flight, we measure the ITL under different number of in-flight requests. The range of the number of in-flight requests is from 1 to the maximum number of requests that the kv cache of the engine can hold. To measure the ITL without being affected by piggy-backed prefill requests, the script will enable kv-reuse and warm up the engine by issuing the same prompts before measuring the ITL. Since the kv cache is sufficient for all the requests, it can hold the kv cache of the pre-computed prompts and skip the prefill phase when measuring the ITL. However, for MoE models, this is not guaranteed because the kv cache in different attention DP ranks is different. We are working on framework-side change to fix this issue. For example, the below plot shows the decode parallelization mapping sweep results for H100 for deepseek-ai/DeepSeek-R1-Distill-Llama-8B.
![Decode Performance](../images/h100_decode_performance.png)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment