"tests/kvbm/common.py" did not exist on "316e8844bf96251d79bbc5b9409bb8f845a0a812"
Unverified Commit 4c648b11 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

refactor: move core logics of DPP -> AIC and support static profiling (#6285)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
Signed-off-by: default avatarHannah Zhang <hannahz@nvidia.com>
Co-authored-by: default avatarhhzhang16 <54051230+hhzhang16@users.noreply.github.com>
parent f6d4351f
......@@ -40,7 +40,7 @@ classifiers = [
]
dependencies = [
"aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@7a24afd98714af13f061cffe784d4808f5356d45",
"aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@168a948d5bc32209728fe8639191a9e0d9083d18",
"aiperf @ git+https://github.com/ai-dynamo/aiperf.git@c3fc969e9e30e9ddad35b2f613aa7c1d418f2de9",
"matplotlib",
"networkx",
......
......@@ -70,7 +70,7 @@ class SLAPlannerDefaults(BasePlannerDefaults):
kalman_r = 10.0
kalman_min_points = 5
no_correction = False # disable correction factor, might be useful under some conditions like long cold start time
no_correction = True
mode: Literal["disagg", "prefill", "decode", "agg"] = "disagg"
throughput_metrics_source = "frontend" # "frontend" | "router"
......
......@@ -133,6 +133,18 @@ class PlannerConfig(BaseModel):
"(enable_throughput_scaling or enable_load_scaling)"
)
if self.enable_throughput_scaling:
if (
self.pre_deployment_sweeping_mode is None
or self.pre_deployment_sweeping_mode
== PlannerPreDeploymentSweepMode.None_
):
raise ValueError(
"pre_deployment_sweeping_mode cannot be 'none' when "
"enable_throughput_scaling is True. Throughput-based scaling "
"requires pre-deployment sweeping to profile engine performance."
)
if self.enable_load_scaling:
# Router metrics URL is required outside kubernetes mode
if not self.load_router_metrics_url and self.environment != "kubernetes":
......@@ -212,6 +224,9 @@ class PlannerConfig(BaseModel):
return cls.model_validate(data)
def scaling_enabled(self) -> bool:
return self.enable_throughput_scaling or self.enable_load_scaling
if __name__ == "__main__":
from pathlib import Path
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Entry point for the Dynamo profiler.
Usage::
python -m dynamo.profiler --config <json string or path to json/yaml>
python -m dynamo.profiler --config '{"model": "Qwen/Qwen3-32B", ...}'
python -m dynamo.profiler --config /path/to/dgdr_spec.yaml
"""
import argparse
import asyncio
import json
import logging
import os
from pathlib import Path
import yaml
from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec
from .profile_sla import run_profile
from .utils.profile_common import (
DEFAULT_DECODE_INTERPOLATION_GRANULARITY,
DEFAULT_DEPLOYMENT_TIMEOUT,
DEFAULT_DRY_RUN,
DEFAULT_OUTPUT_DIR,
DEFAULT_PREFILL_INTERPOLATION_GRANULARITY,
ProfilerOperationalConfig,
)
logger = logging.getLogger(__name__)
def _parse_dgdr_spec(config_arg: str) -> DynamoGraphDeploymentRequestSpec:
"""Parse a DGDR spec from a CLI ``--config`` argument.
Accepts a file path (JSON/YAML) or an inline JSON string.
"""
path = Path(config_arg)
if path.is_file():
text = path.read_text()
suffix = path.suffix.lower()
if suffix in (".yaml", ".yml"):
data = yaml.safe_load(text)
else:
try:
data = json.loads(text)
except json.JSONDecodeError:
data = yaml.safe_load(text)
return DynamoGraphDeploymentRequestSpec.model_validate(data)
try:
data = json.loads(config_arg)
except json.JSONDecodeError as e:
raise ValueError(
f"--config value is neither a valid file path nor valid JSON. "
f"File not found: '{config_arg}'. JSON parse error: {e}"
) from e
return DynamoGraphDeploymentRequestSpec.model_validate(data)
def _parse_args() -> tuple[DynamoGraphDeploymentRequestSpec, ProfilerOperationalConfig]:
parser = argparse.ArgumentParser(description="Dynamo Profiler")
parser.add_argument(
"--config",
required=True,
help="DynamoGraphDeploymentRequestSpec as JSON string or path to JSON/YAML file",
)
parser.add_argument(
"--output-dir",
type=str,
default=DEFAULT_OUTPUT_DIR,
help=f"Path to the output results directory (default: {DEFAULT_OUTPUT_DIR})",
)
parser.add_argument(
"--deployment-timeout",
type=int,
default=DEFAULT_DEPLOYMENT_TIMEOUT,
help=f"Max seconds to wait for deployment readiness (default: {DEFAULT_DEPLOYMENT_TIMEOUT})",
)
parser.add_argument(
"--prefill-interpolation-granularity",
type=int,
default=DEFAULT_PREFILL_INTERPOLATION_GRANULARITY,
help=f"Number of ISL samples for prefill interpolation (default: {DEFAULT_PREFILL_INTERPOLATION_GRANULARITY})",
)
parser.add_argument(
"--decode-interpolation-granularity",
type=int,
default=DEFAULT_DECODE_INTERPOLATION_GRANULARITY,
help=f"Number of samples for decode interpolation (default: {DEFAULT_DECODE_INTERPOLATION_GRANULARITY})",
)
parser.add_argument(
"--dry-run",
action="store_true",
default=DEFAULT_DRY_RUN,
help="Skip deployments and benchmarking (dev mode)",
)
args = parser.parse_args()
dgdr = _parse_dgdr_spec(args.config)
ops = ProfilerOperationalConfig(
output_dir=args.output_dir,
deployment_timeout=args.deployment_timeout,
prefill_interpolation_granularity=args.prefill_interpolation_granularity,
decode_interpolation_granularity=args.decode_interpolation_granularity,
dry_run=args.dry_run,
)
return dgdr, ops
def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
try:
dgdr, ops = _parse_args()
except (ValueError, Exception) as e:
logger.error("Failed to parse profiler config: %s", e)
raise SystemExit(1) from e
os.makedirs(ops.output_dir, exist_ok=True)
log_file_handler = logging.FileHandler(f"{ops.output_dir}/profile_sla.log")
log_file_handler.setLevel(logging.INFO)
log_file_handler.setFormatter(
logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"%Y-%m-%d %H:%M:%S",
)
)
logging.getLogger().addHandler(log_file_handler)
asyncio.run(run_profile(dgdr, ops))
if __name__ == "__main__":
main()
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interpolation curve generation for planner pre-deployment sweeping."""
import logging
import os
import yaml
from deploy.utils.dynamo_deployment import DynamoDeploymentClient
from dynamo.planner.defaults import SubComponentType
from dynamo.planner.utils.planner_config import PlannerPreDeploymentSweepMode
from dynamo.profiler.utils.config import Config, get_service_name_by_type
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
PickedParallelConfig,
)
from dynamo.profiler.utils.defaults import EngineType
from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec
from dynamo.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from dynamo.profiler.utils.profile_common import ProfilerOperationalConfig
from dynamo.profiler.utils.profile_decode import (
profile_decode,
profile_decode_aiconfigurator,
)
from dynamo.profiler.utils.profile_prefill import (
profile_prefill,
profile_prefill_aiconfigurator,
)
logger = logging.getLogger(__name__)
async def run_interpolation(
dgdr: DynamoGraphDeploymentRequestSpec,
ops: ProfilerOperationalConfig,
disagg_config: dict,
best_prefill_config: PickedParallelConfig,
best_decode_config: PickedParallelConfig,
model: str,
system: str,
backend: str,
isl: int,
osl: int,
sweep_max_context_length: int,
deployment_clients: list,
):
"""Generate interpolation curves for the planner based on sweep mode.
Takes the output disagg DGD config and uses ``convert_config`` to strip
it down to standalone prefill / decode engines for profiling.
"""
planner_cfg = (
dgdr.features.planner if (dgdr.features and dgdr.features.planner) else None
)
sweep_mode = PlannerPreDeploymentSweepMode.None_
if planner_cfg and planner_cfg.pre_deployment_sweeping_mode:
sweep_mode = planner_cfg.pre_deployment_sweeping_mode
if sweep_mode == PlannerPreDeploymentSweepMode.None_:
logger.info(
"Planner pre-deployment sweeping is disabled — skipping interpolation."
)
return
config_modifier = CONFIG_MODIFIERS[backend]
model_name, model_path = config_modifier.get_model_name(disagg_config)
best_prefill_gpus = best_prefill_config.num_gpus
best_decode_gpus = best_decode_config.num_gpus
# --- Prefill interpolation ---
prefill_config = config_modifier.convert_config(disagg_config, EngineType.PREFILL)
work_dir = f"{ops.output_dir}/selected_prefill_interpolation"
os.makedirs(work_dir, exist_ok=True)
prefill_config_fn = f"{work_dir}/config.yaml"
with open(prefill_config_fn, "w") as f:
yaml.dump(prefill_config, f)
if sweep_mode == PlannerPreDeploymentSweepMode.Rapid:
logger.info("Using AIC simulation for prefill interpolation.")
estimator = AIConfiguratorPerfEstimator(
hf_id=model,
system=system.lower(),
backend=backend,
)
profile_prefill_aiconfigurator(
work_dir,
best_prefill_gpus,
sweep_max_context_length,
ops.prefill_interpolation_granularity,
estimator,
tp_size=best_prefill_config.tp_size,
)
elif sweep_mode == PlannerPreDeploymentSweepMode.Thorough:
logger.info("Using real GPUs for prefill interpolation.")
frontend_port = config_modifier.get_port(prefill_config)
client = DynamoDeploymentClient(
namespace=ops.k8s_namespace,
base_log_dir=work_dir,
model_name=model_name,
frontend_port=frontend_port,
deployment_name=prefill_config["metadata"]["name"],
)
deployment_clients.append(client)
await client.create_deployment(prefill_config_fn)
logger.info("Waiting for prefill interpolation deployment...")
try:
await client.wait_for_deployment_ready(timeout=ops.deployment_timeout)
except TimeoutError:
logger.error("Prefill interpolation deployment timed out, skipping.")
await client.delete_deployment()
deployment_clients.remove(client)
return
await client.get_deployment_logs()
base_url = client.get_service_url()
profile_prefill(
work_dir,
model_name,
model_path,
base_url,
best_prefill_gpus,
sweep_max_context_length,
ops.prefill_interpolation_granularity,
attention_dp_size=best_prefill_config.dp,
)
await client.delete_deployment()
deployment_clients.remove(client)
# --- Decode interpolation ---
decode_config = config_modifier.convert_config(disagg_config, EngineType.DECODE)
work_dir = f"{ops.output_dir}/selected_decode_interpolation"
os.makedirs(work_dir, exist_ok=True)
decode_config_fn = f"{work_dir}/config.yaml"
with open(decode_config_fn, "w") as f:
yaml.dump(decode_config, f)
if sweep_mode == PlannerPreDeploymentSweepMode.Rapid:
logger.info("Using AIC simulation for decode interpolation.")
estimator = AIConfiguratorPerfEstimator(
hf_id=model,
system=system.lower(),
backend=backend,
)
attention_dp_size = best_decode_config.dp
max_kv_tokens = estimator.get_max_kv_tokens(
isl,
osl,
tp_size=best_decode_config.tp_size,
)
profile_decode_aiconfigurator(
work_dir,
best_decode_gpus,
max_kv_tokens,
sweep_max_context_length,
ops.decode_interpolation_granularity,
estimator,
attention_dp_size,
tp_size=best_decode_config.tp_size,
)
elif sweep_mode == PlannerPreDeploymentSweepMode.Thorough:
logger.info("Using real GPUs for decode interpolation.")
frontend_port = config_modifier.get_port(decode_config)
client = DynamoDeploymentClient(
namespace=ops.k8s_namespace,
base_log_dir=work_dir,
model_name=model_name,
frontend_port=frontend_port,
deployment_name=decode_config["metadata"]["name"],
)
deployment_clients.append(client)
await client.create_deployment(decode_config_fn)
logger.info("Waiting for decode interpolation deployment...")
try:
await client.wait_for_deployment_ready(timeout=ops.deployment_timeout)
except TimeoutError:
logger.error("Decode interpolation deployment timed out, skipping.")
await client.delete_deployment()
deployment_clients.remove(client)
return
await client.get_deployment_logs()
attention_dp_size = best_decode_config.dp
decode_cfg = Config.model_validate(decode_config)
decode_service_name = get_service_name_by_type(
decode_cfg, backend, SubComponentType.DECODE
).lower()
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/{client.deployment_name}/{decode_service_name}/0.log",
attention_dp_size=attention_dp_size,
)
base_url = client.get_service_url()
profile_decode(
work_dir,
model_name,
model_path,
base_url,
best_decode_gpus,
max_kv_tokens,
sweep_max_context_length,
ops.decode_interpolation_granularity,
attention_dp_size,
)
await client.delete_deployment()
deployment_clients.remove(client)
......@@ -13,859 +13,410 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
"""Profiler main entry point."""
import logging
import math
import os
from dataclasses import dataclass, field
from typing import Any
import numpy as np
import yaml
from aiconfigurator.generator.enumerate import check_model_hardware_support
from aiconfigurator.sdk.utils import get_model_config_from_model_path
from deploy.utils.dynamo_deployment import (
DynamoDeploymentClient,
cleanup_remaining_deployments,
)
from dynamo.planner.defaults import SubComponentType
from dynamo.profiler.utils.aiperf import (
get_decode_itl_and_thpt_per_gpu,
get_prefill_ttft,
)
from dynamo.profiler.utils.config import Config, get_service_name_by_type
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from deploy.utils.dynamo_deployment import cleanup_remaining_deployments
from dynamo.profiler.interpolation import run_interpolation
from dynamo.profiler.rapid import run_rapid
from dynamo.profiler.thorough import run_thorough
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
ParallelizationMapping,
apply_parallel_mapping_to_config,
get_candidate_parallel_mappings,
PickedParallelConfig,
)
from dynamo.profiler.utils.defaults import EngineType, SearchStrategy
from dynamo.profiler.utils.config_modifiers.protocol import apply_dgd_overrides
from dynamo.profiler.utils.defaults import SearchStrategy
from dynamo.profiler.utils.dgd_generation import generate_dgd_config_with_planner
from dynamo.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from dynamo.profiler.utils.plot import (
plot_decode_performance,
plot_pd_joint_results,
plot_prefill_performance,
from dynamo.profiler.utils.dgdr_v1beta1_types import (
BackendType,
DynamoGraphDeploymentRequestSpec,
)
from dynamo.profiler.utils.profile_decode import (
get_num_request_range,
profile_decode,
profile_decode_aiconfigurator,
from dynamo.profiler.utils.dgdr_validate import (
run_gate_checks,
validate_dgdr_for_profiler,
)
from dynamo.profiler.utils.profile_prefill import (
profile_prefill,
profile_prefill_aiconfigurator,
from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig,
determine_picking_mode,
is_planner_enabled,
picked_config_from_row,
resolve_model_path,
warn_and_update_sla,
warn_gpu_shortage,
)
from dynamo.profiler.utils.profiler_argparse import create_profiler_parser
from dynamo.profiler.utils.profiler_status import ProfilerStatus, write_profiler_status
from dynamo.profiler.webui.select_config import (
add_profiling_error,
clear_profiling_errors,
pick_config_with_webui,
)
@dataclass
class PrefillProfileData:
"""Container for prefill profiling results."""
num_gpus: list[int] = field(default_factory=list)
ttft: list[float] = field(default_factory=list)
thpt_per_gpu: list[float] = field(default_factory=list)
parallel_mapping_labels: list[str] = field(default_factory=list)
parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)
def add_data(
self,
num_gpus: int,
ttft: float,
thpt_per_gpu: float,
parallel_mapping_label: str,
parallel_mapping: ParallelizationMapping,
) -> None:
"""Add a complete data point to the profile data."""
self.num_gpus.append(num_gpus)
self.ttft.append(ttft)
self.thpt_per_gpu.append(thpt_per_gpu)
self.parallel_mapping_labels.append(parallel_mapping_label)
self.parallel_mappings.append(parallel_mapping)
@dataclass
class DecodeProfileData:
"""Container for decode profiling results."""
num_gpus: list[int] = field(default_factory=list)
itl: list[float] = field(default_factory=list)
thpt_per_gpu: list[float] = field(default_factory=list)
concurrency: list[int] = field(default_factory=list)
kv_cache_size: list[int] = field(default_factory=list)
parallel_mapping_labels: list[str] = field(default_factory=list)
parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)
def add_data(
self,
num_gpus: int,
itl: float,
thpt_per_gpu: float,
concurrency: int,
kv_cache_size: int,
parallel_mapping_label: str,
parallel_mapping: ParallelizationMapping,
) -> None:
"""Add a complete data point to the profile data."""
self.num_gpus.append(num_gpus)
self.itl.append(itl)
self.thpt_per_gpu.append(thpt_per_gpu)
self.concurrency.append(concurrency)
self.kv_cache_size.append(kv_cache_size)
self.parallel_mapping_labels.append(parallel_mapping_label)
self.parallel_mappings.append(parallel_mapping)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
async def run_profile(args):
# List to track all created deployment clients for cleanup in case of failure
deployment_clients = []
# Clear any errors from previous profiling runs
clear_profiling_errors()
# Write initial status for external jobs to monitor
os.makedirs(args.output_dir, exist_ok=True)
write_profiler_status(
args.output_dir,
status=ProfilerStatus.RUNNING,
message="Profiler job started",
)
try:
config_modifier = CONFIG_MODIFIERS[args.backend]
with open(args.config, "r") as f:
config = yaml.safe_load(f)
if args.dgd_image:
config = config_modifier.update_image(config, args.dgd_image)
logger.debug(f"Using DGD image: {args.dgd_image}")
profile_num_gpus = [
2**i
for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
]
logger.info(f"Profiling GPU counts: {profile_num_gpus}")
os.makedirs(args.output_dir, exist_ok=True)
model_name, model_path = config_modifier.get_model_name(config)
# Determine sweep max context length: allow user-provided cap to override model's if smaller
use_specified_max_context_len = getattr(args, "max_context_length", None)
model_max_context_len = args.model_info.max_context_length
if not use_specified_max_context_len and not model_max_context_len:
raise ValueError(
"No max_context_length available from args.max_context_length or model_info from HF config"
)
elif not use_specified_max_context_len:
sweep_max_context_length = model_max_context_len
logger.info(
f"Using model's maximum context length: {model_max_context_len}"
)
elif not model_max_context_len:
sweep_max_context_length = use_specified_max_context_len
logger.info(
f"Using user-provided max_context_length: {use_specified_max_context_len}"
)
else:
sweep_max_context_length = min(
use_specified_max_context_len, model_max_context_len
)
logger.info(
f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}"
)
# Initialize AI Configurator estimator (only used when search_strategy == SearchStrategy.RAPID)
ai_configurator_perf_estimator: AIConfiguratorPerfEstimator | None = None
_CONCRETE_BACKENDS = ["trtllm", "sglang", "vllm"]
if args.search_strategy == SearchStrategy.RAPID:
# Use AI Configurator for rapid estimation
if not args.system:
raise ValueError(
"Must provide --system (hardware system, e.g. h100_sxm) when using rapid search strategy."
)
if not args.model:
raise ValueError(
"Must provide --model (HuggingFace ID) when using rapid search strategy."
def _check_auto_backend_support(model: str, system: str) -> bool:
"""
Return True if *any* concrete backend is AIC-supported for this model/system.
TODO: move this function to AIC and handle partially supported model x backend x hardware
"""
return any(
check_model_hardware_support(model, system, b) for b in _CONCRETE_BACKENDS
)
logger.info(
"Using AI Configurator to estimate performance (rapid strategy)..."
)
ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
hf_id=args.model,
system=args.system.lower(),
backend=args.backend,
)
# first profile prefill
prefill_data = PrefillProfileData()
logger.info("Profiling prefill...")
base_prefill_config = config_modifier.convert_config(
config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
)
frontend_port = config_modifier.get_port(config)
itl: float | None = None
thpt_per_gpu: float | None = None
for num_gpus in profile_num_gpus:
logger.info(f"Profiling prefill with {num_gpus} GPUs...")
candidate_mappings = get_candidate_parallel_mappings(
num_gpus,
args.model_info,
)
for mapping in candidate_mappings:
# Apply parallel mapping to config
prefill_config = apply_parallel_mapping_to_config(
base_prefill_config,
mapping,
SubComponentType.PREFILL,
config_modifier,
args.num_gpus_per_node,
)
logger.debug(f"Dynamo config: {prefill_config}")
# Work dir includes mapping label (safe chars only)
parallel_mapping_tag = (
mapping.label().replace("=", "").replace("/", "_")
)
work_dir = (
f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}"
)
os.makedirs(work_dir, exist_ok=True)
prefill_config_fn = f"{work_dir}/config.yaml"
with open(prefill_config_fn, "w") as f:
yaml.dump(prefill_config, f)
ttft = None
if args.dry_run:
logger.info("Skipping deployment creation in dry run mode")
elif (
args.search_strategy == SearchStrategy.RAPID
and ai_configurator_perf_estimator
):
logger.info("Using ai-configurator to estimate prefill latency")
perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
args.isl,
tp_size=mapping.get_tp_size(),
)
ttft = perf_dict["context_latency"]
logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
def _extract_profiler_params(dgdr: DynamoGraphDeploymentRequestSpec) -> tuple:
"""Pull all profiler parameters from dgdr and log them."""
model = dgdr.model
backend = BackendType(dgdr.backend).value.lower()
system = dgdr.hardware.gpuSku.lower()
total_gpus = dgdr.hardware.totalGpus
isl = dgdr.workload.isl
osl = dgdr.workload.osl
request_latency = dgdr.sla.e2eLatency
if request_latency is not None:
target_ttft = request_latency
target_tpot = request_latency
else:
client = DynamoDeploymentClient(
namespace=args.namespace,
base_log_dir=work_dir,
model_name=model_name,
service_name=args.service_name,
frontend_port=frontend_port,
deployment_name=prefill_config["metadata"]["name"],
)
logger.info(
f"Created client with service_name: {client.service_name}"
)
deployment_clients.append(client) # Track for cleanup
await client.create_deployment(prefill_config_fn)
logger.info("Waiting for deployment to be ready...")
try:
await client.wait_for_deployment_ready(
timeout=getattr(args, "deployment_timeout", 1800)
)
except TimeoutError:
logger.error(
f"Deployment for mapping {mapping.label()} with {num_gpus} GPUs "
f"failed to become ready within timeout during prefill profiling, skipping"
)
add_profiling_error(
f"Mapping {mapping.label()} with {num_gpus} GPUs timed out "
f"during prefill profiling"
)
logger.info("Cleaning up timed-out deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
continue
logger.info("Deployment is ready")
logger.info("Getting deployment logs...")
await client.get_deployment_logs()
target_ttft = dgdr.sla.ttft
target_tpot = dgdr.sla.itl
search_strategy = SearchStrategy(dgdr.searchStrategy)
picking_mode = determine_picking_mode(dgdr)
logger.info(
f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
)
# run ai-perf
base_url = client.get_service_url()
ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
ttft = get_prefill_ttft(
args.isl,
ai_perf_artifact_dir,
model_name,
model_path,
base_url,
attention_dp_size=mapping.get_attn_dp_size(),
)
logger.info("Cleaning up deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
logger.info("Deployment deleted")
if ttft is not None:
prefill_data.add_data(
num_gpus=num_gpus,
ttft=ttft,
thpt_per_gpu=args.isl
/ ttft
/ num_gpus
* 1000
* mapping.get_attn_dp_size(),
parallel_mapping_label=mapping.label(),
parallel_mapping=mapping,
)
# Plot the results as a 2D scatter plot
if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu:
plot_prefill_performance(prefill_data, args.ttft, args.output_dir)
# then profile decode
decode_data = DecodeProfileData()
logger.info("Profiling decode...")
base_decode_config = config_modifier.convert_config(
config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
)
for num_gpus in profile_num_gpus:
logger.info(f"Profiling decode with {num_gpus} GPUs...")
candidate_mappings = get_candidate_parallel_mappings(
num_gpus,
args.model_info,
)
for mapping in candidate_mappings:
# Apply parallel mapping to config
decode_config = apply_parallel_mapping_to_config(
base_decode_config,
mapping,
SubComponentType.DECODE,
config_modifier,
args.num_gpus_per_node,
)
logger.debug(f"Dynamo config: {decode_config}")
parallel_mapping_tag = (
mapping.label()
.replace("=", "")
.replace("/", "_") # safe chars for directory
)
work_dir = (
f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}"
)
os.makedirs(work_dir, exist_ok=True)
decode_config_fn = f"{work_dir}/config.yaml"
with open(decode_config_fn, "w") as f:
yaml.dump(decode_config, f)
if args.dry_run:
logger.info("Skipping deployment creation in dry run mode")
elif (
args.search_strategy == SearchStrategy.RAPID
and ai_configurator_perf_estimator
):
# Compute max_concurrency and max_kv_tokens to know which
# num_request to sweep over.
max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
args.isl, args.osl, tp_size=mapping.get_tp_size()
)
max_kv_tokens = max_concurrency * (args.isl + args.osl)
"Profiler config: model=%s, backend=%s, system=%s, total_gpus=%s, "
"isl=%d, osl=%d, ttft=%.1f, itl=%.1f, e2e_latency=%s, strategy=%s, picking=%s",
model,
backend,
system,
total_gpus,
isl,
osl,
target_ttft,
target_tpot,
request_latency,
search_strategy.value,
picking_mode,
)
return (
model,
backend,
system,
total_gpus,
isl,
osl,
request_latency,
target_ttft,
target_tpot,
search_strategy,
picking_mode,
)
async def _execute_strategy(
dgdr: DynamoGraphDeploymentRequestSpec,
ops: ProfilerOperationalConfig,
picking_mode: str,
aic_supported: bool,
model: str,
system: str,
backend: str,
total_gpus: int,
isl: int,
osl: int,
target_ttft: float,
target_tpot: float,
request_latency: float | None,
deployment_clients: list,
search_strategy: SearchStrategy,
) -> tuple[dict, PickedParallelConfig, PickedParallelConfig, float, float]:
"""Dispatch dry-run / RAPID / THOROUGH; extract configs; update SLA targets."""
if ops.dry_run:
logger.info("Dry run mode — skipping deployment and benchmarking.")
best_prefill_config = PickedParallelConfig(tp=1)
best_decode_config = PickedParallelConfig(tp=1)
pick_result: dict = {}
else:
client = DynamoDeploymentClient(
namespace=args.namespace,
base_log_dir=work_dir,
model_name=model_name,
service_name=args.service_name,
frontend_port=frontend_port,
deployment_name=decode_config["metadata"]["name"],
)
deployment_clients.append(client) # Track for cleanup
await client.create_deployment(decode_config_fn)
logger.info("Waiting for deployment to be ready...")
try:
await client.wait_for_deployment_ready(
timeout=getattr(args, "deployment_timeout", 1800)
)
except TimeoutError:
logger.error(
f"Deployment for mapping {mapping.label()} with {num_gpus} GPUs "
f"failed to become ready within timeout during decode profiling, skipping"
)
add_profiling_error(
f"Mapping {mapping.label()} with {num_gpus} GPUs timed out "
f"during decode profiling"
)
logger.info("Cleaning up timed-out deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
continue
logger.info("Deployment is ready")
logger.info("Getting deployment logs...")
await client.get_deployment_logs()
logger.info(
f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
)
# Compute max_concurrency and max_kv_tokens to know which
# num_request to sweep over.
attention_dp_size = mapping.get_attn_dp_size()
# Get the actual decode service name from the config
decode_cfg = Config.model_validate(decode_config)
decode_service_name = get_service_name_by_type(
decode_cfg, args.backend, SubComponentType.DECODE
).lower()
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/{client.deployment_name}/{decode_service_name}/0.log",
attention_dp_size=attention_dp_size,
)
max_concurrency = max_kv_tokens // (args.isl + args.osl)
if not args.dry_run:
attention_dp_size = mapping.get_attn_dp_size()
sweep_num_request = get_num_request_range(
attention_dp_size,
max_concurrency,
args.decode_interpolation_granularity,
)
logger.info(
f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
)
for num_request in sweep_num_request:
itl = thpt_per_gpu = None
if (
args.search_strategy == SearchStrategy.RAPID
and ai_configurator_perf_estimator
):
logger.info(
"Using ai-configurator to estimate decode latency."
)
perf_dict = ai_configurator_perf_estimator.estimate_perf(
args.isl,
args.osl,
num_request,
mode=EngineType.DECODE,
tp_size=mapping.get_tp_size(),
)
itl = perf_dict["tpot"]
thpt_per_gpu = perf_dict["tokens/s/gpu"]
logger.info(f"Estimated decode ITL: {itl:.2f}ms")
logger.info(
f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU"
if search_strategy == SearchStrategy.RAPID:
pick_result = run_rapid(
dgdr,
picking_mode,
aic_supported,
model,
system,
backend,
total_gpus,
isl,
osl,
target_ttft,
target_tpot,
request_latency,
)
else:
base_url = client.get_service_url()
ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
args.isl,
args.osl,
num_request,
ai_perf_artifact_dir,
model_name,
model_path,
base_url=base_url,
num_gpus=num_gpus,
attention_dp_size=mapping.get_attn_dp_size(),
)
if itl is not None and thpt_per_gpu is not None:
decode_data.add_data(
num_gpus=num_gpus,
itl=itl,
thpt_per_gpu=thpt_per_gpu,
concurrency=num_request,
kv_cache_size=max_kv_tokens,
parallel_mapping_label=mapping.label(),
parallel_mapping=mapping,
)
if (
not args.dry_run
and not args.search_strategy == SearchStrategy.RAPID
):
logger.info("Cleaning up deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
logger.info("Deployment deleted")
# Plot all decode results after profiling is complete
if decode_data.num_gpus:
plot_decode_performance(decode_data, args.itl, args.output_dir)
if prefill_data.num_gpus and decode_data.num_gpus:
plot_pd_joint_results(
args.isl, args.osl, prefill_data, decode_data, args.output_dir
)
pick_result = await run_thorough(
dgdr,
ops,
picking_mode,
model,
system,
backend,
total_gpus,
isl,
osl,
target_ttft,
target_tpot,
request_latency,
deployment_clients,
)
best_config_df = pick_result["best_config_df"]
best_latencies = pick_result["best_latencies"]
target_ttft, target_tpot = warn_and_update_sla(
best_latencies,
target_ttft,
target_tpot,
)
warn_gpu_shortage(picking_mode, best_latencies, total_gpus or 0)
if best_config_df is not None and not best_config_df.empty:
row = best_config_df.iloc[0]
best_prefill_config = picked_config_from_row("(p)", row)
best_decode_config = picked_config_from_row("(d)", row)
else:
best_prefill_config = PickedParallelConfig(tp=1)
best_decode_config = PickedParallelConfig(tp=1)
if args.dry_run:
logger.info("Skipping recommendations in dry run mode")
logger.info(
"Selected prefill: %s (%d GPUs, tp=%d pp=%d dp=%d moe_tp=%d moe_ep=%d), "
"decode: %s (%d GPUs, tp=%d pp=%d dp=%d moe_tp=%d moe_ep=%d)",
best_prefill_config.label(),
best_prefill_config.num_gpus,
best_prefill_config.tp,
best_prefill_config.pp,
best_prefill_config.dp,
best_prefill_config.moe_tp,
best_prefill_config.moe_ep,
best_decode_config.label(),
best_decode_config.num_gpus,
best_decode_config.tp,
best_decode_config.pp,
best_decode_config.dp,
best_decode_config.moe_tp,
best_decode_config.moe_ep,
)
return (
pick_result,
best_prefill_config,
best_decode_config,
target_ttft,
target_tpot,
)
def _assemble_final_config(
dgdr: DynamoGraphDeploymentRequestSpec,
ops: ProfilerOperationalConfig,
dgd_config: dict | None,
best_prefill_config: PickedParallelConfig,
best_decode_config: PickedParallelConfig,
) -> Any:
"""Handle mocker/planner branching and return the final DGD config."""
mocker_enabled = (
dgdr.features is not None
and dgdr.features.mocker is not None
and dgdr.features.mocker.enabled
)
if dgd_config and (is_planner_enabled(dgdr) or mocker_enabled):
dgd_config_path = f"{ops.output_dir}/picked_dgd_config.yaml"
with open(dgd_config_path, "w") as f:
yaml.safe_dump(dgd_config, f, sort_keys=False)
real_config, mocker_config = generate_dgd_config_with_planner(
dgdr=dgdr,
config_path=dgd_config_path,
output_dir=ops.output_dir if not ops.dry_run else None,
best_prefill_mapping=best_prefill_config,
best_decode_mapping=best_decode_config,
)
if mocker_enabled:
logger.info("Mocker enabled — using mocker DGD config.")
return mocker_config
return real_config
return dgd_config
def _write_final_output(ops: ProfilerOperationalConfig, final_config: Any) -> bool:
"""Write final_config.yaml and profiler status. Returns False on unrecoverable failure."""
output_file = f"{ops.output_dir}/final_config.yaml"
if not final_config:
if ops.dry_run:
logger.warning("Dry run mode — no DGD config produced (expected).")
with open(output_file, "w") as f:
yaml.safe_dump(None, f, sort_keys=False)
else:
logger.info("Analyzing results and generate recommendations...")
# Safety guards: no results → exit early with a clear message
if not prefill_data.num_gpus:
error_msg = "No prefill results produced; skipping recommendations."
error_msg = "Profiler did not produce a DGD config."
logger.error(error_msg)
add_profiling_error(error_msg)
write_profiler_status(
args.output_dir,
ops.output_dir,
status=ProfilerStatus.FAILED,
error=error_msg,
message="Profiler failed: no prefill results produced",
message=error_msg,
)
return
if args.pick_with_webui:
# select best P/D config in webUI
selected_prefill_idx, selected_decode_idx = pick_config_with_webui(
prefill_data, decode_data, args
)
# update TTFT/ITL SLA based on selected config
args.ttft = prefill_data.ttft[selected_prefill_idx]
args.itl = decode_data.itl[selected_decode_idx]
return False
else:
# automatically select P/D config within SLA with the highest throughput/GPU
# select best parallel mapping for prefill
if min(prefill_data.ttft) > args.ttft:
warning_msg = "No engine configuration satisfies the TTFT requirement, please try a smaller model or more powerful hardware"
logger.warning(warning_msg)
add_profiling_error(warning_msg)
selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
with open(output_file, "w") as f:
if isinstance(final_config, list):
yaml.safe_dump_all(final_config, f, sort_keys=False)
else:
valid_indices = [
i
for i, ttft in enumerate(prefill_data.ttft)
if ttft <= args.ttft
]
# Among valid TP sizes, select the one with highest throughput per GPU
valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices]
max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
selected_prefill_idx = max_thpt_idx
logger.info(
f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
)
yaml.safe_dump(final_config, f, sort_keys=False)
logger.info("Final DGD config saved to %s", output_file)
# select best parallel mapping for decode
if not decode_data.num_gpus:
error_msg = "No decode results produced; skipping recommendations."
logger.error(error_msg)
add_profiling_error(error_msg)
write_profiler_status(
args.output_dir,
status=ProfilerStatus.FAILED,
error=error_msg,
message="Profiler failed: no decode results produced",
)
return
if min(decode_data.itl) > args.itl:
warning_msg = "No engine configuration satisfies the ITL requirement, please try a smaller model or more powerful hardware"
logger.warning(warning_msg)
add_profiling_error(warning_msg)
selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
else:
valid_indices = [
i for i, itl in enumerate(decode_data.itl) if itl <= args.itl
]
# Among valid TP sizes, select the one with highest throughput per GPU
valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices]
max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
selected_decode_idx = max_thpt_idx
logger.info(
f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
ops.output_dir,
status=ProfilerStatus.SUCCESS,
message="Profiler completed successfully",
outputs={
"final_config": "final_config.yaml",
},
)
return True
if args.dry_run:
# use min value for prefill and decode GPU counts
prefill_data.num_gpus = [args.min_num_gpus_per_engine]
decode_data.num_gpus = [args.min_num_gpus_per_engine]
prefill_data.parallel_mappings = [
ParallelizationMapping(tp=args.min_num_gpus_per_engine)
]
decode_data.parallel_mappings = [
ParallelizationMapping(tp=args.min_num_gpus_per_engine)
]
selected_prefill_idx = 0
selected_decode_idx = 0
# interpolate ISL - TTFT with best prefill parallel mapping
best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx]
best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx]
logger.info(
f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..."
)
prefill_config = config_modifier.convert_config(
config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
)
prefill_config = apply_parallel_mapping_to_config(
prefill_config,
best_prefill_mapping,
SubComponentType.PREFILL,
config_modifier,
args.num_gpus_per_node,
)
logger.debug(f"Dynamo config: {prefill_config}")
work_dir = f"{args.output_dir}/selected_prefill_interpolation"
os.makedirs(work_dir, exist_ok=True)
prefill_config_fn = f"{work_dir}/config.yaml"
with open(prefill_config_fn, "w") as f:
yaml.dump(prefill_config, f)
if args.dry_run:
logger.info("Skipping deployment creation in dry run mode")
elif (
args.search_strategy == SearchStrategy.RAPID
and ai_configurator_perf_estimator
):
profile_prefill_aiconfigurator(
work_dir,
best_prefill_gpus, # num_gpus
sweep_max_context_length,
args.prefill_interpolation_granularity,
ai_configurator_perf_estimator,
tp_size=best_prefill_mapping.get_tp_size(),
)
else:
client = DynamoDeploymentClient(
namespace=args.namespace,
base_log_dir=work_dir,
model_name=model_name,
service_name=args.service_name,
frontend_port=frontend_port,
deployment_name=prefill_config["metadata"]["name"],
)
deployment_clients.append(client) # Track for cleanup
await client.create_deployment(prefill_config_fn)
logger.info("Waiting for deployment to be ready...")
try:
await client.wait_for_deployment_ready(
timeout=getattr(args, "deployment_timeout", 1800)
)
logger.info("Deployment is ready")
skip_profile = False
except TimeoutError:
logger.error(
"Deployment or model failed to become ready within timeout, skipping profiling"
)
skip_profile = True
async def run_profile(
dgdr: DynamoGraphDeploymentRequestSpec,
ops: ProfilerOperationalConfig | None = None,
):
"""Run the profiling pipeline.
if not skip_profile:
logger.info("Getting deployment logs...")
await client.get_deployment_logs()
logger.info(
f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
)
Args:
dgdr: The DynamoGraphDeploymentRequest spec describing the model,
hardware, workload, SLA, and feature configuration.
ops: Operational knobs (output dir, namespace, granularity, etc.).
Uses defaults when ``None``.
"""
if ops is None:
ops = ProfilerOperationalConfig()
base_url = client.get_service_url()
deployment_clients: list = []
profile_prefill(
work_dir,
model_name,
model_path,
base_url,
best_prefill_gpus,
sweep_max_context_length,
args.prefill_interpolation_granularity,
attention_dp_size=best_prefill_mapping.get_attn_dp_size(),
os.makedirs(ops.output_dir, exist_ok=True)
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message="Profiler job started",
)
logger.info("Cleaning up deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
logger.info("Deployment deleted")
# interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping
best_decode_gpus = decode_data.num_gpus[selected_decode_idx]
best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx]
logger.info(
f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..."
)
decode_config = config_modifier.convert_config(
config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
)
decode_config = apply_parallel_mapping_to_config(
decode_config,
best_decode_mapping,
SubComponentType.DECODE,
config_modifier,
args.num_gpus_per_node,
)
logger.debug(f"Dynamo config: {decode_config}")
work_dir = f"{args.output_dir}/selected_decode_interpolation"
os.makedirs(work_dir, exist_ok=True)
decode_config_fn = f"{work_dir}/config.yaml"
with open(decode_config_fn, "w") as f:
yaml.dump(decode_config, f)
if args.dry_run:
logger.info("Skipping deployment creation in dry run mode")
elif (
args.search_strategy == SearchStrategy.RAPID
and ai_configurator_perf_estimator
):
attention_dp_size = best_decode_mapping.get_attn_dp_size()
max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
args.isl, args.osl, tp_size=best_decode_mapping.get_tp_size()
)
profile_decode_aiconfigurator(
work_dir,
best_decode_gpus, # num_gpus
max_kv_tokens,
sweep_max_context_length,
args.decode_interpolation_granularity,
ai_configurator_perf_estimator,
attention_dp_size,
tp_size=best_decode_mapping.get_tp_size(),
)
try:
# Validate and normalise — after this, required fields are guaranteed non-None
validate_dgdr_for_profiler(dgdr)
(
model,
backend,
system,
total_gpus,
isl,
osl,
request_latency,
target_ttft,
target_tpot,
search_strategy,
picking_mode,
) = _extract_profiler_params(dgdr)
if backend == "auto":
aic_supported = _check_auto_backend_support(model, system)
else:
client = DynamoDeploymentClient(
namespace=args.namespace,
base_log_dir=work_dir,
model_name=model_name,
service_name=args.service_name,
frontend_port=frontend_port,
deployment_name=decode_config["metadata"]["name"],
)
deployment_clients.append(client) # Track for cleanup
await client.create_deployment(decode_config_fn)
logger.info("Waiting for deployment to be ready...")
await client.wait_for_deployment_ready()
logger.info("Deployment is ready")
logger.info("Getting deployment logs...")
await client.get_deployment_logs()
logger.info(
f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
)
attention_dp_size = best_decode_mapping.get_attn_dp_size()
# Get the actual decode service name from the config
decode_cfg = Config.model_validate(decode_config)
decode_service_name = get_service_name_by_type(
decode_cfg, args.backend, SubComponentType.DECODE
).lower()
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/{client.deployment_name}/{decode_service_name}/0.log",
attention_dp_size=attention_dp_size,
)
base_url = client.get_service_url()
profile_decode(
work_dir,
model_name,
model_path,
base_url,
best_decode_gpus,
max_kv_tokens,
aic_supported = check_model_hardware_support(model, system, backend)
run_gate_checks(dgdr, aic_supported, search_strategy, backend)
(
pick_result,
best_prefill_config,
best_decode_config,
target_ttft,
target_tpot,
) = await _execute_strategy(
dgdr,
ops,
picking_mode,
aic_supported,
model,
system,
backend,
total_gpus,
isl,
osl,
target_ttft,
target_tpot,
request_latency,
deployment_clients,
search_strategy,
)
dgd_config = pick_result.get("dgd_config") if not ops.dry_run else None
# ---------------------------------------------------------------
# Interpolation curves
# ---------------------------------------------------------------
if not ops.dry_run and is_planner_enabled(dgdr) and dgd_config:
try:
model_cfg = get_model_config_from_model_path(resolve_model_path(dgdr))
sweep_max_context_length = model_cfg.get("max_position_embeddings", 0)
except Exception:
logger.warning("Could not fetch model max context length.")
sweep_max_context_length = 0
if not sweep_max_context_length:
sweep_max_context_length = isl * 2 if isl > 0 else 8192
await run_interpolation(
dgdr,
ops,
dgd_config,
best_prefill_config,
best_decode_config,
model,
system,
backend,
isl,
osl,
sweep_max_context_length,
args.decode_interpolation_granularity,
attention_dp_size,
deployment_clients,
)
logger.info("Cleaning up deployment...")
await client.delete_deployment()
deployment_clients.remove(client)
logger.info("Deployment deleted")
# generate DGD with planner based on profiling results
config, mocker_config = generate_dgd_config_with_planner(
config_path=args.config,
config_modifier=config_modifier,
output_dir=args.output_dir,
args=args,
best_prefill_mapping=best_prefill_mapping,
best_decode_mapping=best_decode_mapping,
num_gpus_per_node=args.num_gpus_per_node,
# ---------------------------------------------------------------
# Final DGD assembly
# ---------------------------------------------------------------
final_config = _assemble_final_config(
dgdr, ops, dgd_config, best_prefill_config, best_decode_config
)
logger.debug(f"Final DGD config with planner: {config}")
# save DGD config with planner; support multi-document output when a ConfigMap is included
with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
if isinstance(config, list):
yaml.safe_dump_all(config, f, sort_keys=False)
else:
yaml.safe_dump(config, f, sort_keys=False)
# save mocker config with planner for testing purposes
logger.debug(f"Mocker config with planner: {mocker_config}")
with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f:
if isinstance(mocker_config, list):
yaml.safe_dump_all(mocker_config, f, sort_keys=False)
else:
yaml.safe_dump(mocker_config, f, sort_keys=False)
# Write success status with output files
write_profiler_status(
args.output_dir,
status=ProfilerStatus.SUCCESS,
message="Profiler completed successfully",
outputs={
"config_with_planner": "config_with_planner.yaml",
"mocker_config_with_planner": "mocker_config_with_planner.yaml",
"disagg_config": "disagg_config.yaml",
},
# --- Apply DGD overrides (user-supplied partial DGD) ---
if final_config and dgdr.overrides and dgdr.overrides.dgd:
if isinstance(final_config, list):
final_config[-1] = apply_dgd_overrides(
final_config[-1], dgdr.overrides.dgd
)
elif isinstance(final_config, dict):
final_config = apply_dgd_overrides(final_config, dgdr.overrides.dgd)
logger.info("Applied DGD overrides to the final config.")
if not _write_final_output(ops, final_config):
return
except Exception as e:
logger.exception("Profile job failed with error")
write_profiler_status(
args.output_dir,
ops.output_dir,
status=ProfilerStatus.FAILED,
error=str(e),
message=f"Profiler failed with exception: {type(e).__name__}",
)
raise
finally:
# Always clean up any remaining deployments, even if the job failed
logger.info("Performing final cleanup of any remaining deployments...")
await cleanup_remaining_deployments(deployment_clients, args.namespace)
await cleanup_remaining_deployments(deployment_clients, ops.k8s_namespace)
logger.info("Final cleanup completed.")
if __name__ == "__main__":
args = create_profiler_parser()
# setup file logging
os.makedirs(args.output_dir, exist_ok=True)
log_file_handler = logging.FileHandler(f"{args.output_dir}/profile_sla.log")
log_file_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
log_file_handler.setFormatter(formatter)
logger.addHandler(log_file_handler)
asyncio.run(run_profile(args))
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""RAPID search strategy: AIC simulation + picking + DGD generation."""
import logging
import pandas as pd
import yaml
from aiconfigurator.cli.main import _execute_task_configs, build_default_task_configs
from aiconfigurator.generator.api import (
generate_backend_artifacts,
generate_naive_config,
)
from aiconfigurator.generator.module_bridge import task_config_to_generator_config
from aiconfigurator.sdk.task import TaskConfig, TaskRunner
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec
from dynamo.profiler.utils.profile_common import derive_backend_image
logger = logging.getLogger(__name__)
def _generate_dgd_from_pick(
dgdr: DynamoGraphDeploymentRequestSpec,
best_config_df: pd.DataFrame,
chosen_exp: str,
task_configs: dict[str, TaskConfig],
) -> dict | None:
"""Generate a DGD config dict from the rank-1 picked result via AIC's generator."""
if best_config_df is None or best_config_df.empty:
return None
row = best_config_df.iloc[0]
tc = task_configs.get(chosen_exp)
# TODO: temporary workaround — when backend="auto", AIC's
# merge_experiment_results_by_mode collapses e.g. "agg_vllm" into "agg",
# but task_configs retains the original keys. Reconstruct the key from
# the winning row's backend column. Proper fix: AIC should return the
# original task config key alongside the merged chosen experiment name.
if tc is None and "backend" in row.index:
tc = task_configs.get(f"{chosen_exp}_{row['backend']}")
if tc is None:
return None
original_total_gpus = tc.total_gpus
if "total_gpus_needed" in row.index and row["total_gpus_needed"] > 0:
tc.total_gpus = int(row["total_gpus_needed"])
generator_overrides: dict = {}
k8s_overrides: dict = {}
k8s_overrides["k8s_image"] = derive_backend_image(dgdr.image, tc.backend_name)
if dgdr.modelCache:
if dgdr.modelCache.pvcName:
k8s_overrides["k8s_pvc_name"] = dgdr.modelCache.pvcName
if dgdr.modelCache.pvcMountPath:
k8s_overrides["k8s_pvc_mount_path"] = dgdr.modelCache.pvcMountPath
if dgdr.modelCache.pvcModelPath:
k8s_overrides["k8s_model_path_in_pvc"] = dgdr.modelCache.pvcModelPath
if k8s_overrides:
generator_overrides["K8sConfig"] = k8s_overrides
cfg = task_config_to_generator_config(
task_config=tc,
result_df=row,
generator_overrides=generator_overrides or None,
)
tc.total_gpus = original_total_gpus
artifacts = generate_backend_artifacts(
params=cfg,
backend=tc.backend_name,
backend_version=tc.backend_version,
use_dynamo_generator=True,
)
dgd_yaml = artifacts.get("k8s_deploy.yaml", "")
if dgd_yaml:
return yaml.safe_load(dgd_yaml)
return None
# in naive mode, use vllm as the default backend
_DEFAULT_NAIVE_BACKEND = "vllm"
def _run_naive_fallback(
dgdr: DynamoGraphDeploymentRequestSpec,
model: str,
total_gpus: int,
system: str,
backend: str,
) -> dict:
"""Handle the AIC-unsupported path via naive config generation."""
if backend == "auto":
backend = _DEFAULT_NAIVE_BACKEND
logger.info(
"Auto backend resolved to '%s' for naive fallback.",
backend,
)
logger.info(
"AIC does not support this combo — falling back to naive config generation."
)
naive_result = generate_naive_config(model, total_gpus, system, backend)
dgd_yaml = naive_result.get("artifacts", {}).get("k8s_deploy.yaml", "")
dgd_config = yaml.safe_load(dgd_yaml) if dgd_yaml else None
if dgd_config:
config_modifier = CONFIG_MODIFIERS[backend]
dgd_config = config_modifier.update_image(
dgd_config, derive_backend_image(dgdr.image, backend)
)
if dgdr.modelCache and dgdr.modelCache.pvcName:
dgd_config = config_modifier.update_model_from_pvc(
dgd_config,
model_name=model,
pvc_name=dgdr.modelCache.pvcName,
pvc_mount_path=dgdr.modelCache.pvcMountPath,
pvc_path=dgdr.modelCache.pvcModelPath or "",
)
return {
"best_config_df": pd.DataFrame(),
"best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0},
"dgd_config": dgd_config,
"chosen_exp": None,
}
def _run_autoscale_sim(
dgdr: DynamoGraphDeploymentRequestSpec,
model: str,
system: str,
backend: str,
total_gpus: int,
isl: int,
osl: int,
target_ttft: float,
target_tpot: float,
request_latency: float | None,
) -> dict:
"""Build a TaskConfig, run autoscale simulation, collect latencies, generate DGD."""
planner_cfg = dgdr.features.planner if dgdr.features else None
if planner_cfg and planner_cfg.enable_throughput_scaling:
logger.warning(
"Throughput-based scaling enabled — only disagg mode is supported."
)
task = TaskConfig(
serving_mode="disagg",
model_path=model,
system_name=system,
backend_name=backend,
total_gpus=total_gpus,
isl=isl,
osl=osl,
ttft=target_ttft,
tpot=target_tpot,
request_latency=request_latency,
)
runner = TaskRunner()
sim_result = runner.run(task, autoscale=True)
pareto_df = sim_result.get("pareto_df", pd.DataFrame())
best_latencies = {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0}
if pareto_df is not None and not pareto_df.empty:
row = pareto_df.iloc[0]
best_latencies["ttft"] = float(row.get("ttft", 0.0))
best_latencies["tpot"] = float(row.get("tpot", 0.0))
best_latencies["request_latency"] = float(row.get("request_latency", 0.0))
task_configs = {"disagg": task}
dgd_config = _generate_dgd_from_pick(dgdr, pareto_df, "disagg", task_configs)
return {
"best_config_df": pareto_df,
"best_latencies": best_latencies,
"dgd_config": dgd_config,
"chosen_exp": "disagg",
"task_configs": task_configs,
}
def _run_default_sim(
dgdr: DynamoGraphDeploymentRequestSpec,
model: str,
system: str,
backend: str,
total_gpus: int,
isl: int,
osl: int,
target_ttft: float,
target_tpot: float,
request_latency: float | None,
picking_mode: str,
) -> dict:
"""Build default task_configs, apply load_match kwargs, run simulation, generate DGD."""
task_configs = build_default_task_configs(
model_path=model,
total_gpus=total_gpus,
system=system,
backend=backend,
isl=isl,
osl=osl,
ttft=target_ttft,
tpot=target_tpot,
request_latency=request_latency,
)
load_kwargs: dict = {}
if picking_mode == "load_match" and dgdr.workload is not None:
load_kwargs["target_request_rate"] = dgdr.workload.requestRate
load_kwargs["target_concurrency"] = dgdr.workload.concurrency
load_kwargs["max_total_gpus"] = total_gpus
chosen, best_configs, _, _, best_latencies_map = _execute_task_configs(
task_configs,
mode="default",
top_n=5,
**load_kwargs,
)
best_config_df = best_configs.get(chosen, pd.DataFrame())
best_latencies = best_latencies_map.get(
chosen, {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0}
)
dgd_config = _generate_dgd_from_pick(dgdr, best_config_df, chosen, task_configs)
return {
"best_config_df": best_config_df,
"best_latencies": best_latencies,
"dgd_config": dgd_config,
"chosen_exp": chosen,
"task_configs": task_configs,
}
def run_rapid(
dgdr: DynamoGraphDeploymentRequestSpec,
picking_mode: str,
aic_supported: bool,
model: str,
system: str,
backend: str,
total_gpus: int,
isl: int,
osl: int,
target_ttft: float,
target_tpot: float,
request_latency: float | None,
) -> dict:
"""Run AIC simulation and picking. Returns a result dict with
``best_config_df``, ``best_latencies``, and ``dgd_config``.
"""
if not aic_supported:
return _run_naive_fallback(dgdr, model, total_gpus, system, backend)
if picking_mode == "autoscale":
return _run_autoscale_sim(
dgdr,
model,
system,
backend,
total_gpus,
isl,
osl,
target_ttft,
target_tpot,
request_latency,
)
return _run_default_sim(
dgdr,
model,
system,
backend,
total_gpus,
isl,
osl,
target_ttft,
target_tpot,
request_latency,
picking_mode,
)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""THOROUGH search strategy: enumerate candidates, deploy, benchmark, pick."""
import logging
import os
import pandas as pd
import yaml
from aiconfigurator.generator.enumerate import enumerate_profiling_configs
from aiconfigurator.sdk.picking import pick_autoscale, pick_default, pick_load_match
from aiconfigurator.sdk.task import TaskConfig
from deploy.utils.dynamo_deployment import DynamoDeploymentClient
from dynamo.planner.defaults import SubComponentType
from dynamo.profiler.rapid import _generate_dgd_from_pick
from dynamo.profiler.utils.aic_dataframe import (
build_decode_row,
build_disagg_df_from_static,
build_prefill_row,
make_parallel_label,
)
from dynamo.profiler.utils.aiperf import (
get_decode_itl_and_thpt_per_gpu,
get_prefill_ttft,
)
from dynamo.profiler.utils.config import Config, get_service_name_by_type
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from dynamo.profiler.utils.config_modifiers.protocol import apply_dgd_overrides
from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec,
ModelCacheSpec,
)
from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig,
derive_backend_image,
)
from dynamo.profiler.utils.profile_decode import get_num_request_range
logger = logging.getLogger(__name__)
async def _benchmark_prefill_candidates(
prefill_candidates,
ops: ProfilerOperationalConfig,
isl: int,
osl: int,
model: str,
system: str,
backend: str,
deployment_clients: list,
config_modifier,
) -> pd.DataFrame:
"""Deploy each prefill candidate, measure TTFT, return prefill_df."""
prefill_rows: list[dict] = []
for candidate in prefill_candidates:
num_gpus = candidate.num_gpus
label = make_parallel_label(
candidate.tp,
candidate.pp,
candidate.dp,
candidate.moe_tp,
candidate.moe_ep,
)
tag = label.replace("=", "").replace("/", "_")
work_dir = f"{ops.output_dir}/prefill_{num_gpus}gpus_{tag}"
os.makedirs(work_dir, exist_ok=True)
config_fn = f"{work_dir}/config.yaml"
with open(config_fn, "w") as f:
yaml.dump(candidate.dgd_config, f)
model_name, model_path = config_modifier.get_model_name(candidate.dgd_config)
frontend_port = config_modifier.get_port(candidate.dgd_config)
logger.info("Profiling prefill candidate %s with %d GPUs...", label, num_gpus)
client = DynamoDeploymentClient(
namespace=ops.k8s_namespace,
base_log_dir=work_dir,
model_name=model_name,
frontend_port=frontend_port,
deployment_name=candidate.dgd_config["metadata"]["name"],
)
deployment_clients.append(client)
await client.create_deployment(config_fn)
logger.info("Waiting for prefill deployment to be ready...")
try:
await client.wait_for_deployment_ready(timeout=ops.deployment_timeout)
except TimeoutError:
logger.error("Prefill %s with %d GPUs timed out", label, num_gpus)
await client.delete_deployment()
deployment_clients.remove(client)
continue
logger.info("Prefill deployment ready")
await client.get_deployment_logs()
base_url = client.get_service_url()
ai_perf_dir = f"{work_dir}/aiperf_isl{isl}"
ttft = get_prefill_ttft(
isl,
ai_perf_dir,
model_name,
model_path,
base_url,
attention_dp_size=candidate.dp,
)
await client.delete_deployment()
deployment_clients.remove(client)
if ttft is not None:
prefill_rows.append(
build_prefill_row(
model=model,
isl=isl,
osl=osl,
ttft=ttft,
tp=candidate.tp,
pp=candidate.pp,
dp=candidate.dp,
moe_tp=candidate.moe_tp,
moe_ep=candidate.moe_ep,
backend=backend,
system=system,
)
)
return pd.DataFrame(prefill_rows) if prefill_rows else pd.DataFrame()
async def _benchmark_decode_candidates(
decode_candidates,
ops: ProfilerOperationalConfig,
isl: int,
osl: int,
model: str,
system: str,
backend: str,
deployment_clients: list,
config_modifier,
) -> pd.DataFrame:
"""Deploy each decode candidate, sweep num_request, return decode_df."""
decode_rows: list[dict] = []
for candidate in decode_candidates:
num_gpus = candidate.num_gpus
label = make_parallel_label(
candidate.tp,
candidate.pp,
candidate.dp,
candidate.moe_tp,
candidate.moe_ep,
)
tag = label.replace("=", "").replace("/", "_")
work_dir = f"{ops.output_dir}/decode_{num_gpus}gpus_{tag}"
os.makedirs(work_dir, exist_ok=True)
config_fn = f"{work_dir}/config.yaml"
with open(config_fn, "w") as f:
yaml.dump(candidate.dgd_config, f)
model_name, model_path = config_modifier.get_model_name(candidate.dgd_config)
frontend_port = config_modifier.get_port(candidate.dgd_config)
logger.info("Profiling decode candidate %s with %d GPUs...", label, num_gpus)
client = DynamoDeploymentClient(
namespace=ops.k8s_namespace,
base_log_dir=work_dir,
model_name=model_name,
frontend_port=frontend_port,
deployment_name=candidate.dgd_config["metadata"]["name"],
)
deployment_clients.append(client)
await client.create_deployment(config_fn)
logger.info("Waiting for decode deployment to be ready...")
try:
await client.wait_for_deployment_ready(timeout=ops.deployment_timeout)
except TimeoutError:
logger.error("Decode %s with %d GPUs timed out", label, num_gpus)
await client.delete_deployment()
deployment_clients.remove(client)
continue
logger.info("Decode deployment ready")
await client.get_deployment_logs()
decode_cfg = Config.model_validate(candidate.dgd_config)
decode_service_name = get_service_name_by_type(
decode_cfg, backend, SubComponentType.DECODE
).lower()
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/{client.deployment_name}/{decode_service_name}/0.log",
attention_dp_size=candidate.dp,
)
max_concurrency = max_kv_tokens // (isl + osl)
sweep_num_request = get_num_request_range(
candidate.dp,
max_concurrency,
ops.decode_interpolation_granularity,
)
logger.info("Sweeping num_request: %s", sweep_num_request)
base_url = client.get_service_url()
for num_request in sweep_num_request:
ai_perf_dir = f"{work_dir}/aiperf_request{num_request}_isl{isl}_osl{osl}_n{num_request}"
itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
isl,
osl,
num_request,
ai_perf_dir,
model_name,
model_path,
base_url=base_url,
num_gpus=num_gpus,
attention_dp_size=candidate.dp,
)
if itl is not None and thpt_per_gpu is not None:
decode_rows.append(
build_decode_row(
tpot=itl,
thpt_per_gpu=thpt_per_gpu,
num_request=num_request,
num_gpus=num_gpus,
osl=osl,
tp=candidate.tp,
pp=candidate.pp,
dp=candidate.dp,
moe_tp=candidate.moe_tp,
moe_ep=candidate.moe_ep,
backend=backend,
system=system,
)
)
await client.delete_deployment()
deployment_clients.remove(client)
return pd.DataFrame(decode_rows) if decode_rows else pd.DataFrame()
def _pick_thorough_best_config(
prefill_df: pd.DataFrame,
decode_df: pd.DataFrame,
picking_mode: str,
target_ttft: float,
target_tpot: float,
request_latency: float | None,
total_gpus: int,
dgdr: DynamoGraphDeploymentRequestSpec,
) -> dict:
"""Dispatch to pick_autoscale / pick_load_match / pick_default, return result dict."""
if picking_mode == "autoscale":
return pick_autoscale(prefill_df, decode_df, target_ttft, target_tpot)
elif picking_mode == "load_match":
disagg_df = build_disagg_df_from_static(prefill_df, decode_df)
lm_kwargs: dict = {
"pareto_df": disagg_df,
"serving_mode": "disagg",
"top_n": 5,
}
if request_latency is not None:
lm_kwargs["target_request_latency"] = request_latency
else:
lm_kwargs["target_tpot"] = target_tpot
if dgdr.workload and dgdr.workload.requestRate is not None:
lm_kwargs["target_request_rate"] = dgdr.workload.requestRate
if dgdr.workload and dgdr.workload.concurrency is not None:
lm_kwargs["target_concurrency"] = dgdr.workload.concurrency
if total_gpus:
lm_kwargs["max_total_gpus"] = total_gpus
return pick_load_match(**lm_kwargs)
else:
disagg_df = build_disagg_df_from_static(prefill_df, decode_df)
pk_kwargs: dict = {
"pareto_df": disagg_df,
"total_gpus": total_gpus,
"serving_mode": "disagg",
"top_n": 5,
}
if request_latency is not None:
pk_kwargs["target_request_latency"] = request_latency
else:
pk_kwargs["target_tpot"] = target_tpot
return pick_default(**pk_kwargs)
async def run_thorough(
dgdr: DynamoGraphDeploymentRequestSpec,
ops: ProfilerOperationalConfig,
picking_mode: str,
model: str,
system: str,
backend: str,
total_gpus: int,
isl: int,
osl: int,
target_ttft: float,
target_tpot: float,
request_latency: float | None,
deployment_clients: list,
) -> dict:
"""Enumerate candidates, deploy + benchmark each, build DataFrames, pick."""
logger.warning("THOROUGH mode: only disagg configurations are supported.")
# --- Stage 1: Enumeration ---
model_cache = dgdr.modelCache or ModelCacheSpec()
prefill_candidates, decode_candidates = enumerate_profiling_configs(
model_path=model,
system=system,
backend=backend,
image=derive_backend_image(dgdr.image, backend),
isl=isl,
osl=osl,
num_gpus_per_node=dgdr.hardware.numGpusPerNode,
k8s_pvc_name=model_cache.pvcName,
k8s_pvc_mount_path=model_cache.pvcMountPath,
k8s_model_path_in_pvc=model_cache.pvcModelPath,
)
logger.info(
"Enumerated %d prefill candidates, %d decode candidates",
len(prefill_candidates),
len(decode_candidates),
)
if dgdr.overrides and dgdr.overrides.dgd:
for candidate in prefill_candidates:
candidate.dgd_config = apply_dgd_overrides(
candidate.dgd_config, dgdr.overrides.dgd
)
for candidate in decode_candidates:
candidate.dgd_config = apply_dgd_overrides(
candidate.dgd_config, dgdr.overrides.dgd
)
logger.info(
"Applied DGD overrides to %d prefill + %d decode candidates.",
len(prefill_candidates),
len(decode_candidates),
)
config_modifier = CONFIG_MODIFIERS[backend]
# --- Stage 2: Benchmarking ---
prefill_df = await _benchmark_prefill_candidates(
prefill_candidates,
ops,
isl,
osl,
model,
system,
backend,
deployment_clients,
config_modifier,
)
decode_df = await _benchmark_decode_candidates(
decode_candidates,
ops,
isl,
osl,
model,
system,
backend,
deployment_clients,
config_modifier,
)
# --- Stage 3: Picking ---
if prefill_df.empty:
logger.error("No prefill results produced in THOROUGH mode.")
return {
"best_config_df": pd.DataFrame(),
"best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0},
"dgd_config": None,
"chosen_exp": None,
}
if decode_df.empty:
logger.error("No decode results produced in THOROUGH mode.")
return {
"best_config_df": pd.DataFrame(),
"best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0},
"dgd_config": None,
"chosen_exp": None,
}
result = _pick_thorough_best_config(
prefill_df,
decode_df,
picking_mode,
target_ttft,
target_tpot,
request_latency,
total_gpus,
dgdr,
)
best_config_df = result.get("best_config_df", pd.DataFrame())
# --- Stage 4: DGD generation ---
task = TaskConfig(
serving_mode="disagg",
model_path=model,
system_name=system,
backend_name=backend,
total_gpus=total_gpus,
isl=isl,
osl=osl,
ttft=target_ttft,
tpot=target_tpot,
request_latency=request_latency,
)
dgd_config = _generate_dgd_from_pick(
dgdr, best_config_df, "disagg", {"disagg": task}
)
return {
"best_config_df": best_config_df,
"best_latencies": result.get(
"best_latencies", {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0}
),
"dgd_config": dgd_config,
"chosen_exp": "disagg",
}
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helpers to build AIC-compatible DataFrames from real-GPU benchmark results.
The picking functions in ``aiconfigurator.sdk.picking`` expect DataFrames
whose columns match the ``ColumnsStatic`` schema. Only a subset of columns
are actually accessed; this module populates exactly those columns.
"""
from __future__ import annotations
import pandas as pd
from aiconfigurator.sdk import common
from aiconfigurator.sdk.picking import _build_disagg_summary_dict
def make_parallel_label(tp: int, pp: int, dp: int, moe_tp: int, moe_ep: int) -> str:
"""Build the ``parallel`` label string used for dedup in picking."""
if moe_ep > 1:
return f"dep{moe_ep}"
elif moe_tp > 1:
return f"tep{moe_tp}"
else:
return f"tp{tp}"
def build_prefill_row(
*,
model: str,
isl: int,
osl: int,
ttft: float,
tp: int,
pp: int,
dp: int,
moe_tp: int,
moe_ep: int,
backend: str = "",
system: str = "",
) -> dict:
"""Build a single prefill row dict with the minimal columns needed by AIC picking.
Only columns actually accessed by ``pick_autoscale`` and
``_build_disagg_summary_dict`` are populated.
"""
num_gpus = tp * pp * dp
seq_s = 1000.0 / ttft * dp if ttft > 0 else 0.0
return {
"ttft": ttft,
"seq/s": seq_s,
"seq/s/gpu": seq_s / num_gpus if num_gpus > 0 else 0.0,
"global_bs": 1 * dp,
"parallel": make_parallel_label(tp, pp, dp, moe_tp, moe_ep),
"tp": tp,
"pp": pp,
"dp": dp,
"osl": osl,
"model": model,
"isl": isl,
"bs": 1,
"moe_tp": moe_tp,
"moe_ep": moe_ep,
"prefix": 0,
"gemm": "",
"kvcache": "",
"fmha": "",
"moe": "",
"comm": "",
"memory": "",
"backend": backend,
"version": "",
"system": system,
"power_w": 0.0,
}
def build_decode_row(
*,
tpot: float,
thpt_per_gpu: float,
num_request: int,
num_gpus: int,
osl: int,
tp: int,
pp: int,
dp: int,
moe_tp: int,
moe_ep: int,
backend: str = "",
system: str = "",
) -> dict:
"""Build a single decode row dict with the minimal columns needed by AIC picking.
Only columns actually accessed by ``pick_autoscale`` and
``_build_disagg_summary_dict`` are populated.
"""
seq_s = thpt_per_gpu * num_gpus / osl if osl > 0 else 0.0
return {
"tpot": tpot,
"seq/s": seq_s,
"seq/s/gpu": thpt_per_gpu / osl if osl > 0 else 0.0,
"global_bs": num_request,
"parallel": make_parallel_label(tp, pp, dp, moe_tp, moe_ep),
"tp": tp,
"pp": pp,
"dp": dp,
"concurrency": num_request,
"bs": num_request // dp if dp > 0 else num_request,
"tokens/s/user": 1000.0 / tpot if tpot > 0 else 0.0,
"moe_tp": moe_tp,
"moe_ep": moe_ep,
"gemm": "",
"kvcache": "",
"fmha": "",
"moe": "",
"comm": "",
"memory": "",
"backend": backend,
"version": "",
"system": system,
"power_w": 0.0,
}
def build_disagg_df_from_static(
prefill_df: pd.DataFrame,
decode_df: pd.DataFrame,
) -> pd.DataFrame:
"""Cross-product prefill x decode into a ColumnsDisagg DataFrame.
Used when calling ``pick_default`` or ``pick_load_match`` from
THOROUGH-mode benchmark results.
"""
combos: list[dict] = []
for _, p_row in prefill_df.iterrows():
for _, d_row in decode_df.iterrows():
combo = _build_disagg_summary_dict(
prefill_summary_dict=p_row.to_dict(),
prefill_num_worker=1,
decode_summary_dict=d_row.to_dict(),
decode_num_worker=1,
)
combos.append(combo)
if not combos:
return pd.DataFrame(columns=common.ColumnsDisagg)
return pd.DataFrame(combos, columns=common.ColumnsDisagg)
......@@ -102,6 +102,51 @@ class ParallelizationMapping:
)
@dataclass(frozen=True)
class PickedParallelConfig:
"""Lightweight representation of a picked parallelization config.
Uses the same (tp, pp, dp, moe_tp, moe_ep) tuple that AIC's enumeration
and picking pipelines produce. Unlike :class:`ParallelizationMapping`,
this stores all five dimensions explicitly rather than using mutually
exclusive optional fields.
"""
tp: int = 1
pp: int = 1
dp: int = 1
moe_tp: int = 1
moe_ep: int = 1
@property
def num_gpus(self) -> int:
return self.tp * self.pp * self.dp
@property
def tp_size(self) -> int:
"""Effective TP for KV-head splitting (TP or TEP; 1 for DEP)."""
if self.moe_ep > 1:
return 1
if self.moe_tp > 1:
return self.moe_tp
return self.tp
def label(self) -> str:
if self.moe_ep > 1:
return f"dep{self.moe_ep}"
elif self.moe_tp > 1:
return f"tep{self.moe_tp}"
return f"tp{self.tp}"
def to_parallelization_mapping(self) -> ParallelizationMapping:
"""Convert to :class:`ParallelizationMapping`."""
if self.moe_ep > 1:
return ParallelizationMapping(dep=self.moe_ep)
elif self.moe_tp > 1:
return ParallelizationMapping(tep=self.moe_tp)
return ParallelizationMapping(tp=self.tp)
def _check_divisibility(
value: int | None,
divisor: int,
......
......@@ -15,6 +15,7 @@
from __future__ import annotations
import copy
import logging
from typing import Any, Protocol, Tuple
......@@ -401,18 +402,9 @@ class BaseConfigModifier:
cls._ensure_spec_pvc(cfg, pvc_name)
# Mount to Frontend + prefill + decode services if present.
if "Frontend" in cfg.spec.services:
cls._ensure_service_volume_mount(
cfg.spec.services["Frontend"], pvc_name, pvc_mount_path
)
for sct in (SubComponentType.PREFILL, SubComponentType.DECODE):
svc_name = get_service_name_by_type(cfg, cls.BACKEND, sct)
if svc_name in cfg.spec.services:
cls._ensure_service_volume_mount(
cfg.spec.services[svc_name], pvc_name, pvc_mount_path
)
# Mount PVC to all services (Frontend + workers)
for svc_name, svc in cfg.spec.services.items():
cls._ensure_service_volume_mount(svc, pvc_name, pvc_mount_path)
# Patch workers + frontend with PVC model path.
cls._apply_model_update_to_cfg(
......@@ -515,12 +507,16 @@ class BaseConfigModifier:
# Update model (handles worker args + frontend patching)
effective_model_path = model_path or model_name
if pvc_name and pvc_mount_path:
# Derive pvc_path from effective_model_path by stripping the mount prefix
pvc_path = ""
if effective_model_path and effective_model_path.startswith(pvc_mount_path):
pvc_path = effective_model_path[len(pvc_mount_path) :].strip("/")
result = cls.update_model_from_pvc(
cfg.model_dump(),
model_name=model_name,
pvc_name=pvc_name,
pvc_mount_path=pvc_mount_path,
pvc_path="",
pvc_path=pvc_path,
)
else:
result = cls.update_model(
......@@ -629,3 +625,96 @@ class BaseConfigModifier:
cls._apply_worker_config(
cfg.spec.services[svc_name], agg_cli_args, agg_replicas, agg_gpus
)
# ---------------------------------------------------------------------------
# DGD override merging (module-level, backend-agnostic)
# ---------------------------------------------------------------------------
# Services whose CLI args are fully replaced by overrides.
# For engine-worker services (everything else), the main container args
# are *appended* because they contain profiler-generated sweep results.
_OVERRIDE_NON_WORKER_SERVICES = frozenset({"Frontend", "Planner"})
# The exact path suffix where profiler-generated CLI args live inside a
# service dict. Only this specific location gets append semantics.
_WORKER_ARGS_SUFFIX = ("extraPodSpec", "mainContainer", "args")
def _is_worker_main_container_args(path: list[str]) -> bool:
"""True when *path* is ``spec.services.<worker>.extraPodSpec.mainContainer.args``."""
if len(path) != 6:
return False
return (
path[0] == "spec"
and path[1] == "services"
and path[2] not in _OVERRIDE_NON_WORKER_SERVICES
and tuple(path[3:]) == _WORKER_ARGS_SUFFIX
)
def _deep_merge_overrides(
target: dict,
overrides: dict,
path: list[str],
) -> None:
"""Recursively merge *overrides* into *target* (mutates *target* in-place).
Rules:
- Dicts are merged recursively; missing intermediate keys are created.
- ``spec.services.<name>`` that does not exist in *target* is skipped
with a warning (all nested overrides under that service are dropped).
- Only ``spec.services.<worker>.extraPodSpec.mainContainer.args`` is
*appended* to the existing list (preserving profiler-generated CLI
args). ``args`` at any other path is replaced normally.
- All other leaf values replace the target value.
"""
for key, value in overrides.items():
current_path = path + [key]
# Guard: skip overrides for services that don't exist in the DGD
if (
len(current_path) == 3
and current_path[0] == "spec"
and current_path[1] == "services"
):
services = target.get("services", target) if path == ["spec"] else target
if key not in services:
logger.warning(
"Service '%s' does not exist in the generated DGD config; "
"overrides for this service will not be applied.",
key,
)
continue
if isinstance(value, dict) and isinstance(target.get(key), dict):
_deep_merge_overrides(target[key], value, current_path)
elif isinstance(value, dict) and key not in target:
target[key] = copy.deepcopy(value)
elif (
key == "args"
and isinstance(value, list)
and _is_worker_main_container_args(current_path)
):
existing = target.get(key) or []
target[key] = list(existing) + list(value)
else:
target[key] = (
copy.deepcopy(value) if isinstance(value, (dict, list)) else value
)
def apply_dgd_overrides(dgd_config: dict, overrides: dict) -> dict:
"""Deep-merge an ``overrides.dgd`` dict onto a generated DGD config.
Args:
dgd_config: The generated DynamoGraphDeployment config dict.
overrides: A partial DGD dict with the same structure. Leaf values
overwrite the corresponding keys in *dgd_config*.
Returns:
A new dict with the overrides applied (the original is not mutated).
"""
result = copy.deepcopy(dgd_config)
_deep_merge_overrides(result, overrides, path=[])
return result
......@@ -16,320 +16,229 @@
import copy
import json
import os
import uuid
from typing import Any, Optional
import numpy as np
import yaml
from dynamo.common.utils.paths import get_workspace_dir
from dynamo.planner.defaults import MockerComponentName, SubComponentType
from dynamo.planner.defaults import MockerComponentName
from dynamo.planner.utils.planner_config import PlannerConfig
from dynamo.profiler.utils.config import (
Config,
DgdPlannerServiceConfig,
set_argument_value,
)
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
ParallelizationMapping,
apply_parallel_mapping_to_config,
)
from dynamo.profiler.utils.planner_utils import build_planner_args_from_namespace
# Path to mocker disagg config relative to workspace
MOCKER_DISAGG_CONFIG_PATH = "examples/backends/mocker/deploy/disagg.yaml"
def _get_config_modifier_from_args(args):
"""Return an instantiated config modifier for args.backend."""
config_modifier_cls = CONFIG_MODIFIERS[args.backend]
return config_modifier_cls()
# ConfigMap name prefixes (a 4-char UUID suffix is appended at runtime
# so that multiple deployments in the same namespace don't collide)
PLANNER_CONFIG_PREFIX = "planner-config"
PLANNER_PROFILE_DATA_PREFIX = "planner-profile-data"
def _find_service_name_for_subcomponent(
config: Config, subcomponent: SubComponentType
) -> str:
"""Find the service name in a DGD config for a given subComponentType."""
for service_name, service_cfg in config.spec.services.items():
if getattr(service_cfg, "subComponentType", None) == subcomponent:
return service_name
raise KeyError(f"Could not find service with subComponentType={subcomponent!r}")
def _make_cm_name(prefix: str) -> str:
return f"{prefix}-{uuid.uuid4().hex[:4]}"
def _load_and_apply_mappings(
*,
def generate_dgd_config_with_planner(
dgdr,
config_path: str,
args,
config_modifier,
best_prefill_mapping: ParallelizationMapping | None,
best_decode_mapping: ParallelizationMapping | None,
num_gpus_per_node: int,
) -> Config:
"""Load a DGD config file and apply optional prefill/decode parallel mappings (single source of truth)."""
output_dir: str | None,
best_prefill_mapping=None,
best_decode_mapping=None,
) -> tuple[list[dict] | dict, list[dict] | dict]:
"""Generate DGD config with planner based on profiling results.
The ``config_path`` should point to a DGD YAML that already has the
correct parallelization and image applied (produced by AIC's generator
pipeline). This function loads it, adds the planner service (with
profiling data ConfigMap if available), and produces the final
deployable DGD.
Args:
dgdr: DynamoGraphDeploymentRequestSpec.
config_path: Path to the picked DGD YAML config file (already has
correct parallelization, replicas, and image).
output_dir: Output directory containing profiling interpolation data.
best_prefill_mapping: Picked prefill parallel config (PickedParallelConfig).
Used only for ``prefill_engine_num_gpu`` in PlannerConfig.
best_decode_mapping: Picked decode parallel config (PickedParallelConfig).
Used only for ``decode_engine_num_gpu`` in PlannerConfig.
Returns:
tuple: (dgd_config, mocker_config)
"""
with open(config_path, "r") as f:
raw = yaml.safe_load(f)
# Update container image if provided (overrides config file images)
if getattr(args, "dgd_image", None):
raw = config_modifier.update_image(raw, args.dgd_image)
config = Config.model_validate(raw)
if best_prefill_mapping is not None:
raw = apply_parallel_mapping_to_config(
raw,
# --- Build PlannerConfig ---
planner_cfg = _build_planner_config(
dgdr,
best_prefill_mapping,
SubComponentType.PREFILL,
config_modifier,
num_gpus_per_node,
is_aggregated_config=False,
)
if best_decode_mapping is not None:
raw = apply_parallel_mapping_to_config(
raw,
best_decode_mapping,
SubComponentType.DECODE,
config_modifier,
num_gpus_per_node,
is_aggregated_config=False,
)
return Config.model_validate(raw)
# --- Add planner service to DGD ---
planner_service = DgdPlannerServiceConfig()
if planner_service.extraPodSpec.mainContainer:
planner_service.extraPodSpec.mainContainer.image = dgdr.image
planner_dict = planner_service.model_dump(exclude_unset=False)
config_dict = config.model_dump(exclude_unset=False)
def build_prefill_service_config(
*,
config_path: str,
args,
best_prefill_mapping: ParallelizationMapping,
num_gpus_per_node: int = 8,
) -> tuple[str, dict]:
"""Return (service_name, service_dict) for the prefill worker after applying mapping."""
return _build_single_worker_service_config(
config_path=config_path,
args=args,
mapping=best_prefill_mapping,
subcomponent=SubComponentType.PREFILL,
num_gpus_per_node=num_gpus_per_node,
)
planner_config_cm_name = _make_cm_name(PLANNER_CONFIG_PREFIX)
profile_data_cm_name = _make_cm_name(PLANNER_PROFILE_DATA_PREFIX)
profile_data_mount = f"{get_workspace_dir()}/profiling_results"
planner_config_mount = f"{get_workspace_dir()}/planner_config"
def build_decode_service_config(
*,
config_path: str,
args,
best_decode_mapping: ParallelizationMapping,
num_gpus_per_node: int = 8,
) -> tuple[str, dict]:
"""Return (service_name, service_dict) for the decode worker after applying mapping."""
return _build_single_worker_service_config(
config_path=config_path,
args=args,
mapping=best_decode_mapping,
subcomponent=SubComponentType.DECODE,
num_gpus_per_node=num_gpus_per_node,
)
# --- ConfigMap 1: profiling interpolation data ---
profile_data_cm: Optional[dict] = None
profiling_data = _load_profiling_data(output_dir) if output_dir else {}
if profiling_data:
planner_cfg.profile_results_dir = profile_data_mount
def _build_single_worker_service_config(
*,
config_path: str,
args,
mapping: ParallelizationMapping,
subcomponent: SubComponentType,
num_gpus_per_node: int,
) -> tuple[str, dict]:
"""Shared helper for building a single worker service dict (prefill or decode)."""
config_modifier = _get_config_modifier_from_args(args)
config = _load_and_apply_mappings(
config_path=config_path,
args=args,
config_modifier=config_modifier,
best_prefill_mapping=mapping
if subcomponent == SubComponentType.PREFILL
else None,
best_decode_mapping=mapping
if subcomponent == SubComponentType.DECODE
else None,
num_gpus_per_node=num_gpus_per_node,
profile_cm_data: dict[str, str] = {}
# TODO: use enums
if profiling_data.get("prefill"):
profile_cm_data["prefill_raw_data.json"] = json.dumps(
profiling_data["prefill"]
)
service_name = _find_service_name_for_subcomponent(config, subcomponent)
config_dict = config.model_dump(exclude_unset=False)
return service_name, config_dict["spec"]["services"][service_name]
def generate_prefill_service_config_preview(
*,
config_path: str,
args,
best_prefill_mapping: ParallelizationMapping,
num_gpus_per_node: int = 8,
) -> dict:
"""Generate a prefill-only service config object for WebUI 'Show Config'."""
service_name, service_dict = build_prefill_service_config(
config_path=config_path,
args=args,
best_prefill_mapping=best_prefill_mapping,
num_gpus_per_node=num_gpus_per_node,
if profiling_data.get("decode"):
profile_cm_data["decode_raw_data.json"] = json.dumps(
profiling_data["decode"]
)
return {service_name: service_dict}
profile_data_cm = {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": profile_data_cm_name},
"data": profile_cm_data,
}
def generate_decode_service_config_preview(
*,
config_path: str,
args,
best_decode_mapping: ParallelizationMapping,
num_gpus_per_node: int = 8,
) -> dict:
"""Generate a decode-only service config object for WebUI 'Show Config'."""
service_name, service_dict = build_decode_service_config(
config_path=config_path,
args=args,
best_decode_mapping=best_decode_mapping,
num_gpus_per_node=num_gpus_per_node,
)
return {service_name: service_dict}
# --- ConfigMap 2: planner config ---
planner_config_cm = {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": planner_config_cm_name},
"data": {
"planner_config.json": planner_cfg.model_dump_json(),
},
}
# --- Mount both ConfigMaps into the planner service ---
planner_volumes = planner_dict.setdefault("extraPodSpec", {}).setdefault(
"volumes", []
)
mc_dict = planner_dict.setdefault("extraPodSpec", {}).setdefault(
"mainContainer", {}
)
mc_mounts = mc_dict.setdefault("volumeMounts", [])
def generate_prefill_decode_services_config_preview(
*,
config_path: str,
args,
best_prefill_mapping: ParallelizationMapping,
best_decode_mapping: ParallelizationMapping,
num_gpus_per_node: int = 8,
) -> dict[str, dict]:
"""Generate a (prefill+decode)-only services config object for WebUI 'Show Config'."""
config_modifier = _get_config_modifier_from_args(args)
config = _load_and_apply_mappings(
config_path=config_path,
args=args,
config_modifier=config_modifier,
best_prefill_mapping=best_prefill_mapping,
best_decode_mapping=best_decode_mapping,
num_gpus_per_node=num_gpus_per_node,
# Planner config volume
planner_volumes.append(
{
"name": planner_config_cm_name,
"configMap": {"name": planner_config_cm_name},
}
)
prefill_service_name = _find_service_name_for_subcomponent(
config, SubComponentType.PREFILL
mc_mounts.append(
{
"name": planner_config_cm_name,
"mountPath": planner_config_mount,
"readOnly": True,
}
)
decode_service_name = _find_service_name_for_subcomponent(
config, SubComponentType.DECODE
# Profiling data volume (only if data exists)
if profile_data_cm is not None:
planner_volumes.append(
{
"name": profile_data_cm_name,
"configMap": {"name": profile_data_cm_name},
}
)
config_dict = config.model_dump(exclude_unset=False)
services = {
prefill_service_name: config_dict["spec"]["services"][prefill_service_name],
decode_service_name: config_dict["spec"]["services"][decode_service_name],
mc_mounts.append(
{
"name": profile_data_cm_name,
"mountPath": profile_data_mount,
"readOnly": True,
}
return services
)
# Planner reads its config from the mounted planner-config ConfigMap
mc_args = mc_dict.setdefault("args", [])
mc_args.extend(["--config", f"{planner_config_mount}/planner_config.json"])
def generate_dgd_config_with_planner(
config_path: str,
config_modifier,
output_dir: str | None,
args,
best_prefill_mapping: ParallelizationMapping | None,
best_decode_mapping: ParallelizationMapping | None,
num_gpus_per_node: int = 8,
) -> tuple[list[dict] | dict, list[dict] | dict]:
"""Generate DGD config with planner based on profiling results.
config_dict["spec"]["services"]["Planner"] = planner_dict
Args:
config_path: Path to the YAML config file
config_modifier: Config modifier instance (e.g., SGLangConfigModifier)
output_dir: Output directory for profile results
args: Parsed arguments namespace from profile_sla
best_prefill_mapping: Parallel mapping for prefill (TP/TEP/DEP)
best_decode_mapping: Parallel mapping for decode (TP/TEP/DEP)
num_gpus_per_node: Number of GPUs per node (for TEP/DEP models)
# --- Generate mocker config ---
mocker_config = _generate_mocker_config_with_planner(
dgdr=dgdr,
profile_data_mount=profile_data_mount,
planner_config_mount=planner_config_mount,
profile_data_cm=profile_data_cm,
planner_config_cm=planner_config_cm,
planner_dict=planner_dict,
)
Returns:
tuple: (dgd_config, mocker_config) where:
- dgd_config: list[dict] | dict - If a ConfigMap is generated for planner data,
returns a list of two YAML documents [ConfigMap, DGD]; otherwise returns a single DGD dict.
- mocker_config: list[dict] | dict - Mocker DGD config with planner for testing purposes.
If a ConfigMap is generated, returns [ConfigMap, DGD]; otherwise returns a single DGD dict.
"""
# Collect all ConfigMaps + DGD into multi-doc output
config_maps = [cm for cm in [profile_data_cm, planner_config_cm] if cm is not None]
if config_maps:
dgd_config: list[dict[str, Any]] = config_maps + [config_dict]
else:
dgd_config = config_dict
config = _load_and_apply_mappings(
config_path=config_path,
args=args,
config_modifier=config_modifier,
best_prefill_mapping=best_prefill_mapping,
best_decode_mapping=best_decode_mapping,
num_gpus_per_node=num_gpus_per_node,
)
return dgd_config, mocker_config
# add the planner service
planner_config = DgdPlannerServiceConfig()
frontend_service = config.spec.services["Frontend"]
frontend_image: Optional[str] = None
if frontend_service.extraPodSpec and frontend_service.extraPodSpec.mainContainer:
frontend_image = frontend_service.extraPodSpec.mainContainer.image
if frontend_image and planner_config.extraPodSpec.mainContainer:
planner_config.extraPodSpec.mainContainer.image = frontend_image
# Build planner args dynamically from parsed arguments
# This includes shared args (ttft, itl, backend, namespace) from profile_sla
# and planner-specific args (with planner_ prefix)
planner_args = build_planner_args_from_namespace(args, prefix="planner_")
# Override profiling-specific arguments with results from profiling
# Remove and re-add to ensure correct values from profiling context
planner_args = [
arg
for arg in planner_args
if not any(
arg.startswith(f"--{key}=")
for key in [
"namespace",
"prefill-engine-num-gpu",
"decode-engine-num-gpu",
"profile-results-dir",
]
)
]
# Add arguments determined by profiling results
cm_mount_path = f"{get_workspace_dir()}/profiling_results"
def _build_planner_config(
dgdr,
best_prefill_mapping,
best_decode_mapping,
) -> PlannerConfig:
"""Build a PlannerConfig from the DGDR spec and picked parallel configs."""
if dgdr.features and dgdr.features.planner:
planner_cfg = dgdr.features.planner.model_copy(deep=True)
else:
planner_cfg = PlannerConfig()
if best_prefill_mapping is not None:
planner_args.append(
f"--prefill-engine-num-gpu={best_prefill_mapping.get_num_gpus()}"
)
planner_cfg.prefill_engine_num_gpu = best_prefill_mapping.num_gpus
if best_decode_mapping is not None:
planner_args.append(
f"--decode-engine-num-gpu={best_decode_mapping.get_num_gpus()}"
)
planner_cfg.decode_engine_num_gpu = best_decode_mapping.num_gpus
# Work with plain dicts for PodSpec/Container extras (e.g. volumes, volumeMounts)
# because those fields are stored as "extra" and aren't exposed as pydantic attributes.
planner_dict = planner_config.model_dump(exclude_unset=False)
config_dict = config.model_dump(exclude_unset=False)
return planner_cfg
config_map_obj: Optional[dict] = None
prefill_json = None
decode_json = None
if output_dir is not None:
# Build a ConfigMap from NPZ profiling outputs and mount it into the Planner
# We store data as plain JSON (lists/float/int) to avoid binary artifacts.
prefill_npz = f"{output_dir}/selected_prefill_interpolation/raw_data.npz"
decode_npz = f"{output_dir}/selected_decode_interpolation/raw_data.npz"
def _load_profiling_data(output_dir: str) -> dict:
"""Load interpolation profiling data from NPZ files."""
result: dict = {}
prefill_npz = f"{output_dir}/selected_prefill_interpolation/raw_data.npz"
try:
with np.load(prefill_npz) as p_raw:
prefill_json = {
result["prefill"] = {
"prefill_isl": p_raw["prefill_isl"].tolist(),
"prefill_ttft": p_raw["prefill_ttft"].tolist(),
"prefill_thpt_per_gpu": p_raw["prefill_thpt_per_gpu"].tolist(),
}
except FileNotFoundError:
prefill_json = None
pass
decode_npz = f"{output_dir}/selected_decode_interpolation/raw_data.npz"
try:
with np.load(decode_npz) as d_raw:
# max_kv_tokens saved as array; convert to int
max_kv_tokens = d_raw["max_kv_tokens"]
if hasattr(max_kv_tokens, "tolist"):
max_kv_tokens_val = max_kv_tokens.tolist()
# Handle [value] vs value
if isinstance(max_kv_tokens_val, list):
max_kv_tokens_val = (
int(max_kv_tokens_val[0]) if max_kv_tokens_val else 0
......@@ -339,7 +248,7 @@ def generate_dgd_config_with_planner(
else:
max_kv_tokens_val = int(max_kv_tokens)
decode_json = {
result["decode"] = {
"x_kv_usage": d_raw["x_kv_usage"].tolist(),
"y_context_length": d_raw["y_context_length"].tolist(),
"z_itl": d_raw["z_itl"].tolist(),
......@@ -347,113 +256,37 @@ def generate_dgd_config_with_planner(
"max_kv_tokens": max_kv_tokens_val,
}
except FileNotFoundError:
decode_json = None
if prefill_json is not None and decode_json is not None:
# Only override planner profile directory when we actually have data to mount.
planner_args.append(f"--profile-results-dir={cm_mount_path}")
config_map_obj = {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": "planner-profile-data"},
"data": {
"prefill_raw_data.json": json.dumps(prefill_json),
"decode_raw_data.json": json.dumps(decode_json),
},
}
# Attach the ConfigMap as a volume in the Planner service
planner_volumes = planner_dict.setdefault("extraPodSpec", {}).setdefault(
"volumes", []
)
planner_volumes.append(
{
"name": "planner-profile-data",
"configMap": {"name": "planner-profile-data"},
}
)
mc_dict = planner_dict.setdefault("extraPodSpec", {}).setdefault(
"mainContainer", {}
)
mc_mounts = mc_dict.setdefault("volumeMounts", [])
mc_mounts.append(
{
"name": "planner-profile-data",
"mountPath": cm_mount_path,
"readOnly": True,
}
)
# Attach planner args (always)
mc_dict = planner_dict.setdefault("extraPodSpec", {}).setdefault(
"mainContainer", {}
)
mc_args = mc_dict.setdefault("args", [])
mc_args.extend(planner_args)
# Finalize DGD services
config_dict["spec"]["services"]["Planner"] = planner_dict
# Generate mocker config with planner for testing purposes
mocker_config = _generate_mocker_config_with_planner(
args=args,
cm_mount_path=cm_mount_path,
config_map_obj=config_map_obj,
planner_dict=planner_dict,
)
pass
# Return multi-doc YAML (ConfigMap + DGD) when ConfigMap is created; else DGD only
dgd_config: list[dict[str, Any]] | dict[str, Any]
if config_map_obj is not None:
dgd_config = [config_map_obj, config_dict]
else:
dgd_config = config_dict
return dgd_config, mocker_config
return result
def _generate_mocker_config_with_planner(
args,
cm_mount_path: str,
config_map_obj: Optional[dict],
dgdr,
profile_data_mount: str,
planner_config_mount: str,
profile_data_cm: Optional[dict],
planner_config_cm: dict,
planner_dict: dict,
) -> list[dict] | dict:
"""Generate mocker DGD config with planner for testing purposes.
This loads the mocker disagg.yaml, updates the worker image, mounts the
profiling ConfigMap, sets --planner-profile-data for workers, and adds the planner service.
Args:
args: Parsed arguments namespace from profile_sla
cm_mount_path: Mount path for the ConfigMap containing profiling data
config_map_obj: The ConfigMap object (if created)
planner_dict: The planner service dict to reuse (includes planner image)
Returns:
list[dict] | dict: If a ConfigMap is generated, returns [ConfigMap, DGD];
otherwise returns a single DGD dict.
"""
# Load mocker disagg config
"""Generate mocker DGD config with planner for testing purposes."""
workspace_dir = get_workspace_dir()
mocker_config_path = os.path.join(workspace_dir, MOCKER_DISAGG_CONFIG_PATH)
with open(mocker_config_path, "r") as f:
mocker_config = yaml.safe_load(f)
# Update worker image if provided
if args.dgd_image:
for service_name, service_config in (
mocker_config.get("spec", {}).get("services", {}).items()
image = dgdr.image
if image:
for service_config in (
mocker_config.get("spec", {}).get("services", {}).values()
):
if service_config.get("extraPodSpec") and service_config[
"extraPodSpec"
].get("mainContainer"):
service_config["extraPodSpec"]["mainContainer"][
"image"
] = args.dgd_image
service_config["extraPodSpec"]["mainContainer"]["image"] = image
# Update worker args: --planner-profile-data (if available), --model-path, --model-name
model = dgdr.model
mocker_worker_names = [
MockerComponentName.prefill_worker_k8s_name,
MockerComponentName.decode_worker_k8s_name,
......@@ -467,63 +300,45 @@ def _generate_mocker_config_with_planner(
"mainContainer", {}
)
args_list = main_container.get("args", [])
if config_map_obj is not None:
if profile_data_cm is not None:
args_list = set_argument_value(
args_list, "--planner-profile-data", cm_mount_path
args_list, "--planner-profile-data", profile_data_mount
)
# Update model path and name if available in args
args_list = set_argument_value(args_list, "--model-path", args.model)
args_list = set_argument_value(args_list, "--model-name", args.model)
args_list = set_argument_value(args_list, "--model-path", model)
args_list = set_argument_value(args_list, "--model-name", model)
main_container["args"] = args_list
# Mount the ConfigMap if it exists
if config_map_obj is not None:
# Mount profiling data ConfigMap into mocker workers
if profile_data_cm is not None:
pd_cm_name = profile_data_cm["metadata"]["name"]
for worker_name in mocker_worker_names:
service_config = (
mocker_config.get("spec", {}).get("services", {}).get(worker_name)
)
if service_config:
extra_pod_spec = service_config.setdefault("extraPodSpec", {})
# Add volume
volumes = extra_pod_spec.setdefault("volumes", [])
volumes.append(
{
"name": "planner-profile-data",
"configMap": {"name": "planner-profile-data"},
"name": pd_cm_name,
"configMap": {"name": pd_cm_name},
}
)
# Add volume mount
main_container = extra_pod_spec.setdefault("mainContainer", {})
volume_mounts = main_container.setdefault("volumeMounts", [])
volume_mounts.append(
{
"name": "planner-profile-data",
"mountPath": cm_mount_path,
"name": pd_cm_name,
"mountPath": profile_data_mount,
"readOnly": True,
}
)
# Add planner service (reuse the same planner config but with mocker backend)
# Reuse planner service dict (already has both ConfigMaps mounted + --config arg)
mocker_planner_dict = copy.deepcopy(planner_dict)
# Planner args use --key=value format, so we need to find and replace
planner_main_container = mocker_planner_dict.get("extraPodSpec", {}).get(
"mainContainer", {}
)
planner_args = planner_main_container.get("args", [])
updated_planner_args = []
for arg in planner_args:
if arg.startswith("--backend="):
updated_planner_args.append("--backend=mocker")
else:
updated_planner_args.append(arg)
planner_main_container["args"] = updated_planner_args
mocker_config["spec"]["services"]["Planner"] = mocker_planner_dict
# Return multi-doc YAML (ConfigMap + DGD) when ConfigMap is created; else DGD only
if config_map_obj is not None:
return [config_map_obj, mocker_config]
config_maps = [cm for cm in [profile_data_cm, planner_config_cm] if cm is not None]
if config_maps:
return config_maps + [mocker_config]
return mocker_config
......@@ -223,7 +223,7 @@ class DynamoGraphDeploymentRequestSpec(BaseModel):
)
image: Optional[str] = Field(
default=None,
description='Image is the container image reference for the profiling job (frontend image). Example: "nvcr.io/nvidia/dynamo-runtime:latest" TODO: In a future MR, the operator will derive the backend inference image from the backend type automatically; backend images can be overridden via overrides.dgd.',
description='Image is the container image reference for the profiling job (frontend image). Example: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:1.0.0".',
)
modelCache: Optional[ModelCacheSpec] = Field(
default=None,
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Profiler-side validation for DynamoGraphDeploymentRequestSpec.
The auto-generated Pydantic types in ``dgdr_v1beta1_types.py`` mirror the
Go API and mark most fields as ``Optional``. The profiler requires a
stricter contract. This module validates those requirements and normalises
fields (e.g. populating defaults, resolving SLA modes) so that downstream
code can access them without ``None`` checks.
"""
from __future__ import annotations
import logging
from dynamo.planner.utils.planner_config import PlannerPreDeploymentSweepMode
from dynamo.profiler.utils.defaults import SearchStrategy
from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec,
SLASpec,
WorkloadSpec,
)
from dynamo.profiler.utils.profile_common import is_planner_enabled
logger = logging.getLogger(__name__)
def validate_dgdr_for_profiler(
dgdr: DynamoGraphDeploymentRequestSpec,
) -> DynamoGraphDeploymentRequestSpec:
"""Validate and normalise a DGDR spec for the profiler.
After this function returns successfully the caller can safely access:
- ``dgdr.image`` (str, non-empty)
- ``dgdr.hardware.gpuSku`` (str, non-empty)
- ``dgdr.hardware.numGpusPerNode`` (int > 0)
- ``dgdr.workload.isl``, ``dgdr.workload.osl`` (int)
- ``dgdr.sla.ttft``, ``dgdr.sla.itl`` (float) **or** ``dgdr.sla.e2eLatency`` (float)
without additional ``None`` guards.
The function mutates ``dgdr`` in-place (e.g. populating defaults) and
returns it for convenience.
Raises:
ValueError: If a required field is missing or invalid.
"""
_validate_required_fields(dgdr)
_validate_workload(dgdr.workload)
_validate_sla(dgdr.sla)
_validate_features(dgdr)
return dgdr
# ---------------------------------------------------------------------------
# Internal validators
# ---------------------------------------------------------------------------
def _validate_required_fields(dgdr: DynamoGraphDeploymentRequestSpec) -> None:
"""Check fields the profiler treats as required."""
if not dgdr.image:
raise ValueError("'image' is required in the DGDR spec.")
if not dgdr.hardware:
raise ValueError("'hardware' is required in the DGDR spec.")
if not dgdr.hardware.gpuSku:
raise ValueError("'hardware.gpuSku' is required in the DGDR spec.")
if not dgdr.hardware.numGpusPerNode or dgdr.hardware.numGpusPerNode <= 0:
raise ValueError("'hardware.numGpusPerNode' must be a positive integer.")
# Populate defaults for optional sub-objects so callers don't need None checks
if dgdr.workload is None:
dgdr.workload = WorkloadSpec()
if dgdr.sla is None:
dgdr.sla = SLASpec()
def _validate_workload(workload: WorkloadSpec) -> None:
"""Concurrency and requestRate are mutually exclusive."""
if workload.concurrency is not None and workload.requestRate is not None:
raise ValueError(
"Only one of 'concurrency' or 'requestRate' can be provided, not both."
)
def _validate_sla(sla: SLASpec) -> None:
"""Validate SLA targets and normalise e2eLatency mode."""
for name, val in [
("ttft", sla.ttft),
("itl", sla.itl),
("e2eLatency", sla.e2eLatency),
]:
if val is not None and val <= 0:
raise ValueError(f"SLA '{name}' must be positive (got {val}).")
has_e2e = sla.e2eLatency is not None
# When e2eLatency is provided it takes precedence — null out the per-token defaults
if has_e2e:
sla.ttft = None
sla.itl = None
return
has_ttft_itl = sla.ttft is not None and sla.itl is not None
if not has_ttft_itl:
raise ValueError(
"Either both 'ttft' and 'itl', or 'e2eLatency', must be provided in the SLA spec."
)
def run_gate_checks(
dgdr: DynamoGraphDeploymentRequestSpec,
aic_supported: bool,
search_strategy: SearchStrategy,
backend: str,
) -> None:
"""Raise ValueError or log warnings for unsupported combos.
Must be called after ``validate_dgdr_for_profiler``.
"""
if is_planner_enabled(dgdr) and not aic_supported:
model = dgdr.model
system = dgdr.hardware.gpuSku.lower()
planner_cfg = dgdr.features.planner
if planner_cfg.enable_throughput_scaling:
raise ValueError(
"Throughput-based planner scaling requires AIC support, but "
f"{model} on {system}/{backend} is not supported by AIC. "
"Use a supported model/hardware/backend combination or disable throughput scaling."
)
if (
planner_cfg.pre_deployment_sweeping_mode
== PlannerPreDeploymentSweepMode.Rapid
):
logger.warning(
"Planner pre-deployment sweeping mode is 'rapid' but AIC does not support "
"%s on %s/%s. Falling back to 'none' (no pre-deployment sweeping).",
model,
system,
backend,
)
planner_cfg.pre_deployment_sweeping_mode = (
PlannerPreDeploymentSweepMode.None_
)
if search_strategy == SearchStrategy.THOROUGH and backend == "auto":
raise ValueError(
"THOROUGH search strategy does not support 'auto' backend. "
"Please specify a concrete backend (trtllm, vllm, sglang)."
)
def _validate_features(dgdr: DynamoGraphDeploymentRequestSpec) -> None:
"""Cross-field validation for features."""
if not dgdr.features:
return
# Mocker requires pre-deployment sweeping
if dgdr.features.mocker and dgdr.features.mocker.enabled and dgdr.features.planner:
sweep_mode = dgdr.features.planner.pre_deployment_sweeping_mode
if sweep_mode is None or sweep_mode == PlannerPreDeploymentSweepMode.None_:
raise ValueError(
"pre_deployment_sweeping_mode cannot be 'none' when mocker is enabled. "
"Mocker backend requires pre-deployment sweeping to generate simulated "
"performance profiles."
)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared helpers and configuration for the profiler pipeline."""
import logging
import os
from dataclasses import dataclass
import pandas as pd
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
PickedParallelConfig,
)
from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Published container image naming conventions
# ---------------------------------------------------------------------------
# Mapping from backend name to the image-name component of the published
# backend runtime image.
# e.g. vllm → nvcr.io/nvidia/ai-dynamo/vllm-runtime:1.0.0
BACKEND_IMAGE_NAMES: dict[str, str] = {
"vllm": "vllm-runtime",
"sglang": "sglang-runtime",
"trtllm": "tensorrtllm-runtime",
}
def derive_backend_image(profiler_image: str, backend: str) -> str:
"""Derive the backend worker image from the profiler image.
Replaces the image name (the last ``/``-delimited component, before any
``:tag``) with the backend-specific runtime image name, preserving the
registry path and tag unchanged.
Examples::
derive_backend_image(
"nvcr.io/nvidia/ai-dynamo/dynamo-frontend:1.0.0", "vllm"
)
# → "nvcr.io/nvidia/ai-dynamo/vllm-runtime:1.0.0"
derive_backend_image("myregistry.io/sglang-runtime:1.0.0", "sglang")
# → "myregistry.io/sglang-runtime:1.0.0"
Args:
profiler_image: Any Docker image reference of the form
``[REGISTRY/]NAME[:TAG]``.
backend: The resolved backend type (``'vllm'``, ``'sglang'``, or
``'trtllm'``).
Returns:
The backend container image string.
Raises:
ValueError: If *backend* is not a recognised backend.
"""
backend_image_name = BACKEND_IMAGE_NAMES.get(backend)
if backend_image_name is None:
raise ValueError(
f"Cannot derive backend image for unknown backend '{backend}'. "
f"Supported backends: {list(BACKEND_IMAGE_NAMES.keys())}"
)
# Split off the last path component: "registry/path/name:tag" → "name:tag"
slash_idx = profiler_image.rfind("/")
prefix = profiler_image[: slash_idx + 1] if slash_idx >= 0 else ""
suffix = profiler_image[slash_idx + 1 :]
colon_idx = suffix.find(":")
tag = suffix[colon_idx:] if colon_idx >= 0 else ""
return f"{prefix}{backend_image_name}{tag}"
# ---------------------------------------------------------------------------
# Operational defaults not part of DynamoGraphDeploymentRequestSpec
# ---------------------------------------------------------------------------
DEFAULT_OUTPUT_DIR = "profiling_results"
DEFAULT_NAMESPACE = os.environ.get("DGDR_NAMESPACE", "dynamo-sla-profiler")
DEFAULT_DEPLOYMENT_TIMEOUT = 3600
DEFAULT_PREFILL_INTERPOLATION_GRANULARITY = 16
DEFAULT_DECODE_INTERPOLATION_GRANULARITY = 6
DEFAULT_DRY_RUN = False
@dataclass
class ProfilerOperationalConfig:
"""Operational knobs that are not part of the DGDR spec."""
output_dir: str = DEFAULT_OUTPUT_DIR
k8s_namespace: str = DEFAULT_NAMESPACE
deployment_timeout: int = DEFAULT_DEPLOYMENT_TIMEOUT
prefill_interpolation_granularity: int = DEFAULT_PREFILL_INTERPOLATION_GRANULARITY
decode_interpolation_granularity: int = DEFAULT_DECODE_INTERPOLATION_GRANULARITY
dry_run: bool = DEFAULT_DRY_RUN
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def picked_config_from_row(prefix: str, row: pd.Series) -> PickedParallelConfig:
"""Extract a PickedParallelConfig from a picked ColumnsDisagg DataFrame row."""
return PickedParallelConfig(
tp=int(row.get(f"{prefix}tp", 1)),
pp=int(row.get(f"{prefix}pp", 1)),
dp=int(row.get(f"{prefix}dp", 1)),
moe_tp=int(row.get(f"{prefix}moe_tp", 1)),
moe_ep=int(row.get(f"{prefix}moe_ep", 1)),
)
def resolve_model_path(dgdr: DynamoGraphDeploymentRequestSpec) -> str:
"""Resolve the model path, preferring local PVC mount over HF ID."""
if (
dgdr.modelCache
and dgdr.modelCache.pvcName
and dgdr.modelCache.pvcMountPath
and dgdr.modelCache.pvcModelPath
):
mount = dgdr.modelCache.pvcMountPath.rstrip("/")
sub = dgdr.modelCache.pvcModelPath.strip("/")
local_path = f"{mount}/{sub}"
if os.path.isdir(local_path):
return local_path
return dgdr.model
def is_planner_enabled(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
"""True when the DGDR spec has a planner config with scaling enabled."""
return (
dgdr.features is not None
and dgdr.features.planner is not None
and dgdr.features.planner.scaling_enabled()
)
def determine_picking_mode(dgdr: DynamoGraphDeploymentRequestSpec) -> str:
target_load_provided = dgdr.workload is not None and (
dgdr.workload.requestRate is not None or dgdr.workload.concurrency is not None
)
if is_planner_enabled(dgdr):
return "autoscale"
elif target_load_provided:
return "load_match"
return "default"
def warn_and_update_sla(
best_latencies: dict,
target_ttft: float,
target_tpot: float,
) -> tuple[float, float]:
"""Warn if SLA is unachievable; return (possibly updated) targets."""
achieved_ttft = best_latencies.get("ttft", 0.0)
achieved_tpot = best_latencies.get("tpot", 0.0)
if achieved_ttft > target_ttft:
logger.warning(
"TTFT SLA %.1fms is unachievable. Best achievable: %.1fms. Updating SLA.",
target_ttft,
achieved_ttft,
)
target_ttft = achieved_ttft
if achieved_tpot > target_tpot:
logger.warning(
"ITL SLA %.1fms is unachievable. Best achievable: %.1fms. Updating SLA.",
target_tpot,
achieved_tpot,
)
target_tpot = achieved_tpot
return target_ttft, target_tpot
def warn_gpu_shortage(
picking_mode: str,
best_latencies: dict,
total_gpus: int,
) -> None:
if picking_mode != "load_match":
return
gpus_needed = best_latencies.get("total_gpus_needed")
if gpus_needed is not None and gpus_needed > total_gpus:
logger.warning(
"Load target requires %d GPUs but only %d available. "
"Consider adding more GPUs or reducing the load target.",
gpus_needed,
total_gpus,
)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import ast
import os
from typing import Any, Dict
import yaml
from dynamo.profiler.utils.defaults import SearchStrategy
from dynamo.profiler.utils.planner_utils import add_planner_arguments_to_parser
from dynamo.profiler.utils.search_space_autogen import auto_generate_search_space
def _get(cfg: Dict[str, Any], camel: str, snake: str, default: Any = None) -> Any:
"""Get config value with camelCase preferred, snake_case fallback."""
if camel in cfg:
return cfg[camel]
return cfg.get(snake, default)
def _camel_to_snake(name: str) -> str:
"""Convert camelCase to snake_case."""
import re
# Insert underscore before uppercase letters and lowercase
s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
def parse_config_string(config_str: str) -> Dict[str, Any]:
"""Parse configuration string as Python dict literal, YAML, or JSON.
Supports multiple input formats:
1. Python dict literal: "{'engine': {'backend': 'vllm'}, 'sla': {'isl': 3000}}"
2. YAML string: "engine:\n backend: vllm\nsla:\n isl: 3000"
3. JSON string: '{"engine": {"backend": "vllm"}, "sla": {"isl": 3000}}'
Args:
config_str: Configuration string in one of the supported formats
Returns:
Dictionary containing the configuration
Raises:
ValueError: If config cannot be parsed or is not a dictionary
"""
config = None
# Try 1: Parse as Python dict literal (most direct for CLI)
try:
config = ast.literal_eval(config_str)
if isinstance(config, dict):
return config
except (ValueError, SyntaxError):
pass
# Try 2: Parse as YAML/JSON (for K8s ConfigMaps and files)
try:
config = yaml.safe_load(config_str)
if config is not None and isinstance(config, dict):
return config
except yaml.YAMLError:
pass
# If we got here, parsing failed
raise ValueError(
"Failed to parse config string. Expected Python dict literal, YAML, or JSON format. "
f"Examples:\n"
f" Python dict: \"{'engine': {'backend': 'vllm'}}\"\n"
f' YAML: "engine:\\n backend: vllm"\n'
f' JSON: \'{{"engine": {{"backend": "vllm"}}}}\''
)
def create_profiler_parser() -> argparse.Namespace:
"""
Create argument parser with support for YAML config string.
Config structure (camelCase preferred, snake_case supported for backwards compat):
outputDir: String (path to the output results directory, default: profiling_results)
deployment:
namespace: String (kubernetes namespace, default: dynamo-sla-profiler)
serviceName: String (service name, default: "")
model: String (served model name)
dgdImage: String (container image to use for DGD components (frontend, planner, workers), overrides images in config file)
deploymentTimeout: Int (maximum time to wait for deployment to become ready in seconds, default: 1800)
modelCache:
pvcName: String (name of the PVC to mount the model cache,
if not provided, model must be HF name and will download from HF, default: "")
pvcPath: String (path to the model cache in the PVC, default: "")
mountPath: String (path to the model cache in the container,
note that the PVC must be mounted to the same path for the profiling job,
default: "/opt/model-cache")
engine:
backend: String (backend type, currently support [vllm, sglang, trtllm], default: vllm)
config: String (path to the DynamoGraphDeployment config file, default: "")
maxContextLength: Int (maximum context length supported by the served model, default: 0)
isMoeModel: Boolean (enable MoE (Mixture of Experts) model support, use TEP for prefill and DEP for decode, default: False)
hardware:
minNumGpusPerEngine: Int (minimum number of GPUs per engine, default: 0)
maxNumGpusPerEngine: Int (maximum number of GPUs per engine, default: 0)
numGpusPerNode: Int (number of GPUs per node, default: 0)
gpuModel: String (GPU model, used for auto-calculating search space, default: "")
gpuVramMib: Int (GPU VRAM in MiB, used for auto-calculating search space, default: 0)
system: String (target hardware system, e.g. h100_sxm, h200_sxm, default: None)
searchStrategy: String (search strategy for profiling: 'rapid' uses AI Configurator for quick estimation, 'thorough' runs actual deployments for comprehensive results, enum: [rapid, thorough], default: rapid)
sweep:
prefillInterpolationGranularity: Int (how many samples to benchmark to interpolate TTFT under different ISL, default: 16)
decodeInterpolationGranularity: Int (how many samples to benchmark to interpolate ITL under different active kv cache size and decode context length, default: 6)
dryRun: Boolean (dry run the profile job, default: False)
pickWithWebui: Boolean (pick the best parallelization mapping using webUI, default: False)
webuiPort: Int (webUI port, default: $PROFILER_WEBUI_PORT or 8000)
sla:
isl: Int (target input sequence length, default: 3000)
osl: Int (target output sequence length, default: 500)
ttft: Float (target Time To First Token in milliseconds, default: 50)
itl: Float (target Inter Token Latency in milliseconds, default: 10)
planner: (planner arguments)
e.g., plannerMinEndpoint: 2
"""
# Step 1: Pre-parse to check if --profile-config is provided
pre_parser = argparse.ArgumentParser(add_help=False)
pre_parser.add_argument("--profile-config", type=str)
pre_args, _ = pre_parser.parse_known_args()
# Step 2: Parse config if provided
config = {}
if pre_args.profile_config:
config = parse_config_string(pre_args.profile_config)
# Step 3: Create main parser with config-aware defaults
parser = argparse.ArgumentParser(
description="Profile the TTFT and ITL of the Prefill and Decode engine with different parallelization mapping. When profiling prefill we mock/fix decode,when profiling decode we mock/fix prefill."
)
parser.add_argument(
"--profile-config",
type=str,
help="Configuration as Python dict literal, YAML, or JSON string. CLI args override config values. "
"Example: \"{'engine': {'backend': 'vllm', 'config': '/path'}, 'sla': {'isl': 3000}}\"",
)
# CLI arguments with config-aware defaults (using nested .get() for cleaner code)
parser.add_argument(
"--model",
type=str,
default=config.get("deployment", {}).get("model", ""),
help="Served model name",
)
model_cache_config = config.get("deployment", {}).get("modelCache", {})
parser.add_argument(
"--model-cache-pvc-name",
type=str,
default=model_cache_config.get("pvcName", ""),
help="Name of the PVC that contains the model weights. If not provided, args.model must be a HF model name and will download from HF",
)
parser.add_argument(
"--model-cache-pvc-path",
type=str,
default=model_cache_config.get("pvcPath", ""),
help="Path to the model cache in the PVC",
)
parser.add_argument(
"--model-cache-pvc-mount-path",
type=str,
default=model_cache_config.get("mountPath", "/opt/model-cache"),
help="Path to the model cache in the container, note that the PVC must be mounted to the same path for the profiling job",
)
deployment_cfg = config.get("deployment", {})
parser.add_argument(
"--dgd-image",
type=str,
default=_get(deployment_cfg, "dgdImage", "dgd_image", ""),
help="Container image to use for DGD components (frontend, planner, workers). Overrides images in config file.",
)
parser.add_argument(
"--deployment-timeout",
type=int,
default=_get(deployment_cfg, "deploymentTimeout", "deployment_timeout", 1800),
help="Maximum time to wait for deployment to become ready in seconds (default: 1800)",
)
parser.add_argument(
"--namespace",
type=str,
default=deployment_cfg.get("namespace", "dynamo-sla-profiler"),
help="Kubernetes namespace to deploy the DynamoGraphDeployment",
)
parser.add_argument(
"--backend",
type=str,
default=config.get("engine", {}).get("backend", "vllm"),
choices=["vllm", "sglang", "trtllm"],
help="backend type, currently support [vllm, sglang, trtllm]",
)
parser.add_argument(
"--config",
type=str,
default=config.get("engine", {}).get("config", ""),
required=False,
help="Path to the DynamoGraphDeployment config file (required, can be provided via CLI or config)",
)
parser.add_argument(
"--output-dir",
type=str,
default=_get(config, "outputDir", "output_dir", "profiling_results"),
help="Path to the output results directory",
)
hardware_cfg = config.get("hardware", {})
parser.add_argument(
"--min-num-gpus-per-engine",
type=int,
default=_get(hardware_cfg, "minNumGpusPerEngine", "min_num_gpus_per_engine", 0),
help="minimum number of GPUs per engine",
)
parser.add_argument(
"--max-num-gpus-per-engine",
type=int,
default=_get(hardware_cfg, "maxNumGpusPerEngine", "max_num_gpus_per_engine", 0),
help="maximum number of GPUs per engine",
)
parser.add_argument(
"--num-gpus-per-node",
type=int,
default=_get(hardware_cfg, "numGpusPerNode", "num_gpus_per_node", 0),
help="Number of GPUs per node",
)
parser.add_argument(
"--gpu-model",
type=str,
default=_get(hardware_cfg, "gpuModel", "gpu_model", ""),
help="GPU model name (used for auto-calculating search space)",
)
parser.add_argument(
"--gpu-vram-mib",
type=int,
default=_get(hardware_cfg, "gpuVramMib", "gpu_vram_mib", 0),
help="GPU VRAM in MiB (used for auto-calculating search space)",
)
parser.add_argument(
"--system",
type=str,
default=_get(hardware_cfg, "system", "system", None),
help="Target hardware system, e.g. h100_sxm, h200_sxm",
)
parser.add_argument(
"--isl",
type=int,
default=config.get("sla", {}).get("isl", 3000),
help="target input sequence length",
)
parser.add_argument(
"--osl",
type=int,
default=config.get("sla", {}).get("osl", 500),
help="target output sequence length",
)
parser.add_argument(
"--ttft",
type=float,
default=config.get("sla", {}).get("ttft", 50.0),
help="target Time To First Token (float, in milliseconds)",
)
parser.add_argument(
"--itl",
type=float,
default=config.get("sla", {}).get("itl", 10.0),
help="target Inter Token Latency (float, in milliseconds)",
)
# High-level profiling strategy argument
parser.add_argument(
"--search-strategy",
type=SearchStrategy,
default=SearchStrategy(
_get(config, "searchStrategy", "search_strategy", "rapid")
),
choices=list(SearchStrategy),
help="Search strategy for profiling: 'rapid' uses AI Configurator for quick estimation, 'thorough' runs actual deployments for comprehensive results",
)
# arguments used for interpolating TTFT and ITL under different ISL/OSL
engine_cfg = config.get("engine", {})
parser.add_argument(
"--max-context-length",
type=int,
default=_get(engine_cfg, "maxContextLength", "max_context_length", 0),
help="maximum context length supported by the served model",
)
sweep_cfg = config.get("sweep", {})
parser.add_argument(
"--prefill-interpolation-granularity",
type=int,
default=_get(
sweep_cfg,
"prefillInterpolationGranularity",
"prefill_interpolation_granularity",
16,
),
help="how many samples to benchmark to interpolate TTFT under different ISL",
)
parser.add_argument(
"--decode-interpolation-granularity",
type=int,
default=_get(
sweep_cfg,
"decodeInterpolationGranularity",
"decode_interpolation_granularity",
6,
),
help="how many samples to benchmark to interpolate ITL under different active kv cache size and decode context length",
)
parser.add_argument(
"--service-name",
type=str,
default=_get(deployment_cfg, "serviceName", "service_name", ""),
help="Service name for port forwarding (default: {deployment_name}-frontend)",
)
parser.add_argument(
"--dry-run",
action="store_true",
default=_get(sweep_cfg, "dryRun", "dry_run", False),
help="Dry run the profile job",
)
parser.add_argument(
"--pick-with-webui",
action="store_true",
default=_get(sweep_cfg, "pickWithWebui", "pick_with_webui", False),
help="Pick the best parallelization mapping using webUI",
)
default_webui_port = 8000
webui_port_env = os.environ.get("PROFILER_WEBUI_PORT")
if webui_port_env:
default_webui_port = int(webui_port_env)
parser.add_argument(
"--webui-port",
type=int,
default=_get(sweep_cfg, "webuiPort", "webui_port", default_webui_port),
help="WebUI port",
)
# Dynamically add all planner arguments from planner_argparse.py
add_planner_arguments_to_parser(parser, prefix="planner-")
# Set defaults for any planner arguments found in config.planner
# Normalize keys: camelCase -> snake_case, hyphens -> underscores
planner_config = config.get("planner", {})
if planner_config:
normalized_planner_config = {
_camel_to_snake(key).replace("-", "_"): value
for key, value in planner_config.items()
}
parser.set_defaults(**normalized_planner_config)
# Parse arguments
args = parser.parse_args()
# remove --profile-config from args
if hasattr(args, "profile_config"):
delattr(args, "profile_config")
# Validate required arguments
# Either --model or --config (or both) must be provided
if not args.model and not args.config:
parser.error("--model or --config is required (provide at least one)")
auto_generate_search_space(args)
return args
......@@ -12,7 +12,7 @@
# For Multimodal EPD (required for device_map="auto" in vision model loading)
accelerate
aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@7a24afd98714af13f061cffe784d4808f5356d45
aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@168a948d5bc32209728fe8639191a9e0d9083d18
aiofiles
aiperf @ git+https://github.com/ai-dynamo/aiperf.git@54cd6dc820bff8bfebc875da104e59d745e14f75
av==15.0.0
......
......@@ -594,8 +594,7 @@ spec:
image:
description: |-
Image is the container image reference for the profiling job (frontend image).
Example: "nvcr.io/nvidia/dynamo-runtime:latest"
backend type automatically; backend images can be overridden via overrides.dgd.
Example: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:1.0.0".
type: string
model:
description: |-
......
......@@ -357,9 +357,7 @@ type DynamoGraphDeploymentRequestSpec struct {
Backend BackendType `json:"backend,omitempty"`
// Image is the container image reference for the profiling job (frontend image).
// Example: "nvcr.io/nvidia/dynamo-runtime:latest"
// TODO: In a future MR, the operator will derive the backend inference image from the
// backend type automatically; backend images can be overridden via overrides.dgd.
// Example: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:1.0.0".
// +optional
Image string `json:"image,omitempty"`
......
......@@ -594,8 +594,7 @@ spec:
image:
description: |-
Image is the container image reference for the profiling job (frontend image).
Example: "nvcr.io/nvidia/dynamo-runtime:latest"
backend type automatically; backend images can be overridden via overrides.dgd.
Example: "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:1.0.0".
type: string
model:
description: |-
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment