# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Profiler main entry point.""" import logging import os from typing import Any import yaml from aiconfigurator.generator.enumerate import check_model_hardware_support from aiconfigurator.sdk.utils import get_model_config_from_model_path from deploy.utils.dynamo_deployment import cleanup_remaining_deployments from dynamo.profiler.interpolation import run_interpolation from dynamo.profiler.rapid import run_rapid from dynamo.profiler.thorough import run_thorough from dynamo.profiler.utils.config_modifiers.parallelization_mapping import ( PickedParallelConfig, ) from dynamo.profiler.utils.config_modifiers.protocol import apply_dgd_overrides from dynamo.profiler.utils.defaults import SearchStrategy from dynamo.profiler.utils.dgd_generation import ( assemble_final_config, build_aic_interpolation_spec, ) from dynamo.profiler.utils.dgdr_v1beta1_types import ( BackendType, DynamoGraphDeploymentRequestSpec, ProfilingPhase, ) from dynamo.profiler.utils.dgdr_validate import ( valid_dgdr_spec, validate_dgdr_dynamo_features, ) from dynamo.profiler.utils.profile_common import ( ProfilerOperationalConfig, determine_picking_mode, get_profiling_job_tolerations, inject_tolerations_into_dgd, needs_profile_data, picked_config_from_row, resolve_model_path, warn_and_update_sla, warn_gpu_shortage, ) from dynamo.profiler.utils.profiler_status import ProfilerStatus, write_profiler_status logger = logging.getLogger(__name__) _CONCRETE_BACKENDS = ["trtllm", "sglang", "vllm"] def _apply_tolerations_to_final_config(final_config: Any, tolerations: list) -> Any: """Apply tolerations to a final DGD config (dict or multi-doc list).""" if not tolerations or not final_config: return final_config if isinstance(final_config, list): result = list(final_config) result[-1] = inject_tolerations_into_dgd(result[-1], tolerations) return result return inject_tolerations_into_dgd(final_config, tolerations) def _check_auto_backend_support(model: str, system: str) -> bool: """ Return True if *any* concrete backend is AIC-supported for this model/system. TODO: move this function to AIC and handle partially supported model x backend x hardware """ return any( check_model_hardware_support(model, system, b) for b in _CONCRETE_BACKENDS ) def _extract_profiler_params(dgdr: DynamoGraphDeploymentRequestSpec) -> tuple: """Pull all profiler parameters from dgdr and log them.""" model = dgdr.model backend = BackendType(dgdr.backend).value.lower() system = dgdr.hardware.gpuSku.lower() total_gpus = dgdr.hardware.totalGpus isl = dgdr.workload.isl osl = dgdr.workload.osl request_latency = dgdr.sla.e2eLatency if request_latency is not None: target_ttft = request_latency target_tpot = request_latency else: target_ttft = dgdr.sla.ttft target_tpot = dgdr.sla.itl search_strategy = SearchStrategy(dgdr.searchStrategy) picking_mode = determine_picking_mode(dgdr) logger.info( "Profiler config: model=%s, backend=%s, system=%s, total_gpus=%s, " "isl=%d, osl=%d, ttft=%.1f, itl=%.1f, e2e_latency=%s, strategy=%s, picking=%s", model, backend, system, total_gpus, isl, osl, target_ttft, target_tpot, request_latency, search_strategy.value, picking_mode, ) return ( model, backend, system, total_gpus, isl, osl, request_latency, target_ttft, target_tpot, search_strategy, picking_mode, ) async def _execute_strategy( dgdr: DynamoGraphDeploymentRequestSpec, ops: ProfilerOperationalConfig, picking_mode: str, aic_supported: bool, model: str, system: str, backend: str, total_gpus: int, isl: int, osl: int, target_ttft: float, target_tpot: float, request_latency: float | None, deployment_clients: list, search_strategy: SearchStrategy, ) -> tuple[dict, PickedParallelConfig, PickedParallelConfig, float, float]: """Dispatch dry-run / RAPID / THOROUGH; extract configs; update SLA targets.""" if ops.dry_run: logger.info("Dry run mode — skipping deployment and benchmarking.") best_prefill_config = PickedParallelConfig(tp=1) best_decode_config = PickedParallelConfig(tp=1) pick_result: dict = {} else: if search_strategy == SearchStrategy.RAPID: pick_result = run_rapid( dgdr, picking_mode, aic_supported, model, system, backend, total_gpus, isl, osl, target_ttft, target_tpot, request_latency, ) else: pick_result = await run_thorough( dgdr, ops, picking_mode, model, system, backend, total_gpus, isl, osl, target_ttft, target_tpot, request_latency, deployment_clients, ) ops.current_phase = ProfilingPhase.SelectingConfig write_profiler_status( ops.output_dir, status=ProfilerStatus.RUNNING, message="Filtering results and selecting cost-efficient configuration", phase=ProfilingPhase.SelectingConfig, ) best_config_df = pick_result["best_config_df"] best_latencies = pick_result["best_latencies"] target_ttft, target_tpot = warn_and_update_sla( best_latencies, target_ttft, target_tpot, ) warn_gpu_shortage(picking_mode, best_latencies, total_gpus or 0) if best_config_df is not None and not best_config_df.empty: row = best_config_df.iloc[0] best_prefill_config = picked_config_from_row("(p)", row) best_decode_config = picked_config_from_row("(d)", row) else: best_prefill_config = PickedParallelConfig(tp=1) best_decode_config = PickedParallelConfig(tp=1) logger.info( "Selected prefill: %s (%d GPUs, tp=%d pp=%d dp=%d moe_tp=%d moe_ep=%d), " "decode: %s (%d GPUs, tp=%d pp=%d dp=%d moe_tp=%d moe_ep=%d)", best_prefill_config.label(), best_prefill_config.num_gpus, best_prefill_config.tp, best_prefill_config.pp, best_prefill_config.dp, best_prefill_config.moe_tp, best_prefill_config.moe_ep, best_decode_config.label(), best_decode_config.num_gpus, best_decode_config.tp, best_decode_config.pp, best_decode_config.dp, best_decode_config.moe_tp, best_decode_config.moe_ep, ) return ( pick_result, best_prefill_config, best_decode_config, target_ttft, target_tpot, ) def _write_final_output(ops: ProfilerOperationalConfig, final_config: Any) -> bool: """Write final_config.yaml and profiler status. Returns False on unrecoverable failure.""" output_file = f"{ops.output_dir}/final_config.yaml" if not final_config: if ops.dry_run: logger.warning("Dry run mode — no DGD config produced (expected).") with open(output_file, "w") as f: yaml.safe_dump(None, f, sort_keys=False) else: error_msg = "Profiler did not produce a DGD config." logger.error(error_msg) write_profiler_status( ops.output_dir, status=ProfilerStatus.FAILED, error=error_msg, message=error_msg, phase=ProfilingPhase.GeneratingDGD, ) return False else: with open(output_file, "w") as f: if isinstance(final_config, list): yaml.safe_dump_all(final_config, f, sort_keys=False) else: yaml.safe_dump(final_config, f, sort_keys=False) logger.info("Final DGD config saved to %s", output_file) write_profiler_status( ops.output_dir, status=ProfilerStatus.SUCCESS, message="Profiler completed successfully", outputs={ "final_config": "final_config.yaml", }, phase=ProfilingPhase.Done, ) return True async def run_profile( dgdr: DynamoGraphDeploymentRequestSpec, ops: ProfilerOperationalConfig | None = None, ) -> None: """Run the profiling pipeline. Args: dgdr: The DynamoGraphDeploymentRequest spec describing the model, hardware, workload, SLA, and feature configuration. ops: Operational knobs (output dir, namespace, granularity, etc.). Uses defaults when ``None``. """ if ops is None: ops = ProfilerOperationalConfig() deployment_clients: list = [] os.makedirs(ops.output_dir, exist_ok=True) write_profiler_status( ops.output_dir, status=ProfilerStatus.RUNNING, message="Profiler job started", phase=ProfilingPhase.Initializing, ) try: # Validate DGDR spec — after this, required fields are guaranteed non-None valid_dgdr_spec(dgdr) ( model, backend, system, total_gpus, isl, osl, request_latency, target_ttft, target_tpot, search_strategy, picking_mode, ) = _extract_profiler_params(dgdr) if backend == "auto": aic_supported = _check_auto_backend_support(model, system) else: aic_supported = check_model_hardware_support(model, system, backend) # then validate DGDR features based on AIC support validate_dgdr_dynamo_features(dgdr, aic_supported) ops.current_phase = ProfilingPhase.SweepingPrefill write_profiler_status( ops.output_dir, status=ProfilerStatus.RUNNING, message="Sweeping parallelization strategies", phase=ops.current_phase, ) ( pick_result, best_prefill_config, best_decode_config, target_ttft, target_tpot, ) = await _execute_strategy( dgdr, ops, picking_mode, aic_supported, model, system, backend, total_gpus, isl, osl, target_ttft, target_tpot, request_latency, deployment_clients, search_strategy, ) dgd_config = pick_result.get("dgd_config") if not ops.dry_run else None resolved_backend = pick_result.get("resolved_backend", backend) if dgd_config and dgdr.overrides and dgdr.overrides.dgd: dgd_config = apply_dgd_overrides(dgd_config, dgdr.overrides.dgd) logger.info("Applied DGD overrides to the picked DGD config.") job_tolerations = get_profiling_job_tolerations(dgdr) if job_tolerations and dgd_config: dgd_config = inject_tolerations_into_dgd(dgd_config, job_tolerations) logger.debug( "Propagated %d profiling-job toleration(s) to the picked DGD config.", len(job_tolerations), ) # --------------------------------------------------------------- # Interpolation curves — only needed when something consumes the # per-engine performance data on disk (thorough-mode planner or # mocker). Rapid-mode planner bootstraps AIC in-process at # startup, so the profiler skips the NPZ sweep for that case. # --------------------------------------------------------------- chosen_exp = pick_result.get("chosen_exp", "") is_disagg_config = chosen_exp not in ("agg",) and bool(chosen_exp) # Compute max context length unconditionally — both the NPZ sweep # (thorough, mocker) and the planner's rapid-mode AIC spec need it. try: model_cfg = get_model_config_from_model_path(resolve_model_path(dgdr)) sweep_max_context_length = model_cfg.get("max_position_embeddings", 0) except Exception: logger.warning("Could not fetch model max context length.") sweep_max_context_length = 0 if not sweep_max_context_length: sweep_max_context_length = isl * 2 if isl > 0 else 8192 if not ops.dry_run and dgd_config and needs_profile_data(dgdr): ops.current_phase = ProfilingPhase.BuildingCurves write_profiler_status( ops.output_dir, status=ProfilerStatus.RUNNING, message="Building interpolation curves for planner integration", phase=ops.current_phase, ) if not is_disagg_config: # TODO: agg + throughput-scaling has no profiling-data # fallback today. The NPZ sweep (thorough) and the AIC # spec (rapid, see build_aic_interpolation_spec) are both # shaped around prefill + decode picks. For agg picks the # planner currently falls back to DYN_BENCHMARK_MODE at # runtime only. Extend AICInterpolationSpec and # run_interpolation to carry an agg_pick so both paths # work for aggregated deployments too. logger.info( "Picked config is aggregated (chosen_exp=%r) — " "skipping interpolation (requires disaggregated config).", chosen_exp, ) else: await run_interpolation( dgdr, ops, dgd_config, best_prefill_config, best_decode_config, resolved_backend, sweep_max_context_length, deployment_clients, job_tolerations=job_tolerations, ) # --------------------------------------------------------------- # Final DGD assembly # --------------------------------------------------------------- ops.current_phase = ProfilingPhase.GeneratingDGD write_profiler_status( ops.output_dir, status=ProfilerStatus.RUNNING, message="Packaging data and generating final DGD YAML", phase=ops.current_phase, ) aic_spec = ( build_aic_interpolation_spec( dgdr, best_prefill_pick=best_prefill_config, best_decode_pick=best_decode_config, isl=isl, osl=osl, sweep_max_context_length=sweep_max_context_length, resolved_backend=resolved_backend, system=system, prefill_interpolation_granularity=ops.prefill_interpolation_granularity, decode_interpolation_granularity=ops.decode_interpolation_granularity, ) if is_disagg_config and not ops.dry_run else None ) final_config = assemble_final_config( dgdr, ops, dgd_config, best_prefill_config, best_decode_config, aic_spec=aic_spec, resolved_backend=resolved_backend, ) # --- Apply DGD overrides (user-supplied partial DGD) --- if final_config and dgdr.overrides and dgdr.overrides.dgd: if isinstance(final_config, list): final_config[-1] = apply_dgd_overrides( final_config[-1], dgdr.overrides.dgd ) elif isinstance(final_config, dict): final_config = apply_dgd_overrides(final_config, dgdr.overrides.dgd) logger.info("Applied DGD overrides to the final config.") # Propagate profiling-job tolerations to the final DGD (covers any # services added by assemble_final_config, e.g. Planner). if job_tolerations and final_config: final_config = _apply_tolerations_to_final_config( final_config, job_tolerations ) logger.debug( "Propagated %d profiling-job toleration(s) to the final DGD config.", len(job_tolerations), ) if not _write_final_output(ops, final_config): return except Exception as e: logger.exception("Profile job failed with error") write_profiler_status( ops.output_dir, status=ProfilerStatus.FAILED, error=str(e), message=f"Profiler failed with exception: {type(e).__name__}", phase=ops.current_phase, ) raise finally: logger.info("Performing final cleanup of any remaining deployments...") await cleanup_remaining_deployments(deployment_clients, ops.k8s_namespace) logger.info("Final cleanup completed.")