Unverified Commit 26188122 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat(planner): own AIC interpolation; fix MoE-DEP bugs in rapid mode (#8335)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 4ef28940
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Schema for the profiler → planner AIC interpolation handoff.
When the profiler runs in rapid mode, it picks parallelism configs with
AIConfigurator but does NOT run interpolation itself. Instead it serialises
this ``AICInterpolationSpec`` onto the planner's ConfigMap. At bootstrap the
planner lazy-imports ``aiconfigurator`` and runs the interpolation in-process.
"""
from typing import Literal
from pydantic import BaseModel, Field
from dynamo.planner.config.parallelization import PickedParallelConfig
class AICInterpolationSpec(BaseModel):
"""Everything the planner needs to reproduce the rapid-mode AIC sweep.
The picks come straight from AIC's picker DataFrame (via
:func:`dynamo.profiler.utils.profile_common.picked_config_from_row`) so
any AIC-valid pick is representable.
"""
hf_id: str = Field(description="HuggingFace model id, e.g. Qwen/Qwen3-32B")
system: str = Field(description="AIC system identifier, e.g. h200_sxm")
backend: Literal["trtllm", "vllm", "sglang"]
isl: int = Field(gt=0)
osl: int = Field(gt=0)
sweep_max_context_length: int = Field(gt=0)
prefill_interpolation_granularity: int = Field(gt=0)
decode_interpolation_granularity: int = Field(gt=0)
prefill_pick: PickedParallelConfig
decode_pick: PickedParallelConfig
......@@ -32,6 +32,9 @@ class VllmComponentName(ComponentName):
decode_worker_k8s_name = "VllmDecodeWorker"
decode_worker_component_name = "backend"
decode_worker_endpoint = "generate"
# Aggregated mode emits a single worker; name matches VllmWorker
# log identifier in dynamo.vllm.main.
agg_worker_k8s_name = "VllmWorker"
class SGLangComponentName(ComponentName):
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Parallelization config shared between the profiler and the planner.
``PickedParallelConfig`` stores the full ``(tp, pp, dp, moe_tp, moe_ep)`` tuple
that AIConfigurator's picker emits. Both the profiler (which picks) and the
planner (which consumes the pick to bootstrap perf models) need this type, so
it lives under ``dynamo.planner.config`` rather than the profiler tree.
It is a pydantic ``BaseModel`` so it serialises cleanly into the planner
ConfigMap as part of ``AICInterpolationSpec``.
"""
from pydantic import BaseModel, ConfigDict
class PickedParallelConfig(BaseModel):
"""Lightweight representation of a picked parallelization config.
Uses the same ``(tp, pp, dp, moe_tp, moe_ep)`` tuple that AIC's enumeration
and picking pipelines produce. Frozen so instances are hashable.
"""
model_config = ConfigDict(frozen=True)
tp: int = 1
pp: int = 1
dp: int = 1
moe_tp: int = 1
moe_ep: int = 1
@property
def num_gpus(self) -> int:
return self.tp * self.pp * self.dp
@property
def tp_size(self) -> int:
"""Effective TP for KV-head splitting (TP or TEP; 1 for DEP).
.. warning::
KV-head-split semantics ONLY. This is **NOT** the same quantity as
AIConfigurator's ``ModelConfig.tp_size`` (which is attention TP
per rank). Never pass this value into AIC kwargs — use
:func:`picked_to_aic_model_config_kwargs` instead.
"""
if self.moe_ep > 1:
return 1
if self.moe_tp > 1:
return self.moe_tp
return self.tp
def label(self) -> str:
if self.moe_ep > 1:
return f"dep{self.moe_ep}"
elif self.moe_tp > 1:
return f"tep{self.moe_tp}"
return f"tp{self.tp}"
def picked_to_aic_model_config_kwargs(p: PickedParallelConfig) -> dict[str, int]:
"""Map a ``PickedParallelConfig`` to AIConfigurator ``ModelConfig`` kwargs.
Returned keys: ``tp_size``, ``pp_size``, ``moe_tp_size``, ``moe_ep_size``,
``attention_dp_size``.
For MoE picks AIC's picker always emits
``tp × dp == moe_tp × moe_ep`` (the attention-layer GPU width matches the
MoE-layer GPU width per replica), so the mapping is simply:
* ``tp_size = p.tp`` (AIC's attention TP per rank)
* ``attention_dp_size = p.dp``
* ``moe_tp_size = p.moe_tp``
* ``moe_ep_size = p.moe_ep``
* ``pp_size = p.pp``
This satisfies AIC's MoE-only assertion
``tp_size × attention_dp_size == moe_tp_size × moe_ep_size`` by
construction. For dense picks (``moe_tp = moe_ep = 1``) the assertion
does not apply — AIC's ``BaseModel`` ignores the MoE fields.
Do **not** derive ``tp_size`` from :attr:`PickedParallelConfig.tp_size`
— that property has KV-head-split semantics that conflict with AIC's
definition (it returns 1 for DEP, which breaks the identity).
"""
return {
"tp_size": p.tp,
"pp_size": p.pp,
"moe_tp_size": p.moe_tp,
"moe_ep_size": p.moe_ep,
"attention_dp_size": p.dp,
}
......@@ -24,6 +24,7 @@ from typing import Literal, Optional
import yaml
from pydantic import BaseModel, Field, model_validator
from dynamo.planner.config.aic_interpolation_spec import AICInterpolationSpec
from dynamo.planner.config.defaults import SLAPlannerDefaults
logger = logging.getLogger(__name__)
......@@ -77,6 +78,17 @@ class PlannerConfig(BaseModel):
profile_results_dir: str = SLAPlannerDefaults.profile_results_dir
aic_interpolation: Optional[AICInterpolationSpec] = Field(
default=None,
description=(
"AIConfigurator interpolation spec populated by the profiler in "
"rapid mode. When set, the planner runs the AIC sweep in-process "
"at bootstrap and uses the resulting FPMs to seed the regression "
"models (priority 2 between the get_perf_metrics endpoint and "
"the legacy profile_results_dir file loader)."
),
)
ttft: float = SLAPlannerDefaults.ttft
itl: float = SLAPlannerDefaults.itl
......@@ -222,6 +234,15 @@ class PlannerConfig(BaseModel):
"enable_throughput_scaling is True. Throughput-based scaling "
"requires pre-deployment sweeping to profile engine performance."
)
if (
self.pre_deployment_sweeping_mode == PlannerPreDeploymentSweepMode.Rapid
and self.aic_interpolation is None
):
logger.warning(
"pre_deployment_sweeping_mode='rapid' but aic_interpolation "
"is not set; planner will fall back to profile_results_dir "
"files if the get_perf_metrics endpoint is unavailable."
)
if self.enable_load_scaling:
if self.enable_throughput_scaling:
......
......@@ -32,6 +32,7 @@ class PrefillPlanner(NativePlannerBase):
worker_info=self.prefill_worker_info,
profile_results_dir=self.config.profile_results_dir,
component_type=SubComponentType.PREFILL,
aic_spec=self.config.aic_interpolation,
)
self.state_machine.load_benchmark_fpms(prefill_fpms=fpms)
except Exception as e:
......@@ -70,6 +71,7 @@ class DecodePlanner(NativePlannerBase):
worker_info=self.decode_worker_info,
profile_results_dir=self.config.profile_results_dir,
component_type=SubComponentType.DECODE,
aic_spec=self.config.aic_interpolation,
)
self.state_machine.load_benchmark_fpms(decode_fpms=fpms)
except Exception as e:
......@@ -108,6 +110,7 @@ class AggPlanner(NativePlannerBase):
worker_info=self.decode_worker_info,
profile_results_dir=self.config.profile_results_dir,
component_type=SubComponentType.DECODE,
aic_spec=self.config.aic_interpolation,
)
self.state_machine.load_benchmark_fpms(agg_fpms=fpms)
except Exception as e:
......@@ -155,6 +158,7 @@ class DisaggPlanner(NativePlannerBase):
worker_info=worker_info,
profile_results_dir=self.config.profile_results_dir,
component_type=component,
aic_spec=self.config.aic_interpolation,
)
self.state_machine.load_benchmark_fpms(**{kwarg: fpms})
except Exception as e:
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""AIConfigurator performance estimator used by the planner (and the profiler).
This thin wrapper around the ``aiconfigurator`` SDK lets callers estimate
prefill / decode latency and KV-cache capacity for a given model + system +
backend + parallelism config without spinning up a real engine. The planner
uses it to bootstrap regression models from an AIC spec in rapid mode.
"""
import logging
from typing import Any
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
def _try_import_aiconfigurator():
# Lazy-import aiconfigurator because it's an optional dependency.
import aiconfigurator.sdk.backends.factory
import aiconfigurator.sdk.config
import aiconfigurator.sdk.inference_session
import aiconfigurator.sdk.models
import aiconfigurator.sdk.perf_database
return aiconfigurator
class AIConfiguratorPerfEstimator:
"""
This class is used to estimate the performance of a model using aiconfigurator.
An instance of this class stores information about the model, system, and backend.
Methods can be called to estimate prefill and/or decode perf for a given ISL, OSL,
batch_size, and parallelism config.
"""
def __init__(
self,
hf_id: str, # e.g. "Qwen/Qwen3-32B"
system: str, # e.g. "h200_sxm"
backend: str, # e.g. "trtllm"
):
aiconfigurator = _try_import_aiconfigurator()
logger.info("Loading aiconfigurator database. This might take a few seconds...")
version = aiconfigurator.sdk.perf_database.get_latest_database_version(
system,
backend,
)
self.database = aiconfigurator.sdk.perf_database.get_database(
system=system,
backend=backend,
version=version,
)
if not self.database:
raise ValueError(
f"Database not found for system: {system}, backend: {backend}, version: {version}"
)
logger.info("aiconfigurator database loaded.")
self.backend = aiconfigurator.sdk.backends.factory.get_backend(backend)
self.hf_id = hf_id
def _get_model(self, **model_config_kwargs):
aiconfigurator = _try_import_aiconfigurator()
# NOTE: MOE models error out unless moe_tp_size and moe_ep_size are provided.
model_config = aiconfigurator.sdk.config.ModelConfig(**model_config_kwargs)
model = aiconfigurator.sdk.models.get_model(
self.hf_id, model_config, self.backend
)
return model
def estimate_perf(
self,
isl: int,
osl: int,
batch_size: int,
mode: str = "full",
**model_config_kwargs,
) -> dict[str, Any]:
"""
Estimate the perf of this model + system + backend + ISL/OSL/model_config
using aiconfigurator.
Args:
isl: Input sequence length
osl: Output sequence length
batch_size: Batch size
mode: Indicates what perf data to estimate.
"full": Estimate prefill and decode perf.
"prefill": Only estimate context perf.
"decode": Only estimate decode perf.
**model_config_kwargs: aiconfigurator model config kwargs
(such as tp_size, moe_tp_size, etc).
Returns:
dict: Perf metrics returned by aiconfigurator
"""
aiconfigurator = _try_import_aiconfigurator()
mode_to_aic_mode = {
"full": "static",
"prefill": "static_ctx",
"decode": "static_gen",
}
if mode not in mode_to_aic_mode:
raise ValueError(
f"Invalid mode: {mode}. Must be one of {list(mode_to_aic_mode.keys())}."
)
self.runtime_config = aiconfigurator.sdk.config.RuntimeConfig(
batch_size=batch_size,
beam_width=1,
isl=isl,
osl=osl,
)
model = self._get_model(**model_config_kwargs)
session = aiconfigurator.sdk.inference_session.InferenceSession(
model, self.database, self.backend
)
summary = session.run_static(
mode=mode_to_aic_mode[mode], runtime_config=self.runtime_config, stride=32
)
summary_df = summary.get_summary_df()
# Convert pd.Dataframe to dict since there's only one row
return summary_df.to_dict(orient="records")[0]
def estimate_prefill_perf(
self,
isl: int,
**model_config_kwargs,
) -> dict[str, Any]:
"""
Estimate the perf of this model + system + backend + etc assuming it is a prefill worker.
Args:
isl: Input sequence length
**model_config_kwargs: aiconfigurator model config kwargs
(such as tp_size, moe_tp_size, etc).
Returns:
dict: Perf metrics returned by aiconfigurator
"""
return self.estimate_perf(
isl,
5, # small osl
1, # concurrency = 1
mode="prefill",
**model_config_kwargs,
)
def get_max_batch_size(
self,
isl: int,
osl: int,
**model_config_kwargs,
) -> int:
"""
Estimate the largest batch size that would fit on this GPU.
Args:
isl: Input sequence length
osl: Output sequence length
**model_config_kwargs: aiconfigurator model config kwargs
(such as tp_size, moe_tp_size, etc).
Returns:
int: Estimated largest batch size that will fit on the system.
"""
model = self._get_model(**model_config_kwargs)
def get_mem_usage(bs: int):
# TODO: _get_memory_usage might be underestimating because
# 1. it doesn't account for runtime buffers
# 2. it calculates num_tokens = isl*bs which ignores osl
return self.backend._get_memory_usage(
model, self.database, bs, 1, isl, osl
)["total"]
max_memory_gb = self.database.system_spec["gpu"]["mem_capacity"] / (1024**3)
bs = 1
if get_mem_usage(bs) > max_memory_gb:
# Model does not fit on GPU with the given model config.
return 0
# Step 1: find upper bound on batch size.
while get_mem_usage(bs) < max_memory_gb:
bs *= 2
# We know that bs // 2 will fit on GPU but bs will not.
min_bs = bs // 2
max_bs = bs
# Step 2: binary search for max batch size that fits on GPU.
while min_bs < max_bs:
test_bs = (min_bs + max_bs) // 2
if get_mem_usage(test_bs) < max_memory_gb:
# Because of the +1, the new value of min_bs might not fit on the GPU
# even though test_bs did fit. So at the end when min_bs and max_bs converge,
# we need to remember to subtract 1 from the result.
min_bs = test_bs + 1
else:
# max_bs is always a value that doesn't fit on the GPU.
max_bs = test_bs
return min_bs - 1 # see comment above
def get_max_kv_tokens(
self,
isl: int,
osl: int,
**model_config_kwargs,
) -> int:
"""
Estimate the max number of kv cache tokens that will fit on this GPU
for the given ISL, OSL, and model config.
Args:
isl: Input sequence length
osl: Output sequence length
**model_config_kwargs: aiconfigurator model config kwargs
(such as tp_size, moe_tp_size, etc).
Returns:
int: Estimated number of KV cache tokens that will fit on the system.
"""
max_concurrency = self.get_max_batch_size(isl, osl, **model_config_kwargs)
return max_concurrency * (isl + osl)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""In-process AIC interpolation sweep that bootstraps the planner's regression.
This module replaces the profiler's NPZ-based AIC interpolation for rapid
mode. It runs the sweep in the planner pod at startup and produces
``ForwardPassMetrics`` directly — no disk I/O, no ConfigMap round-trip.
The FPM conventions match the thorough-mode path so both bootstrap sources
are interchangeable from the regression models' perspective:
* **Prefill** — one FPM per sweep ISL. ``num_prefill_requests=1``,
``sum_prefill_tokens=isl``, ``wall_time = per-rank TTFT``. AIC's
``estimate_prefill_perf`` with ``batch_size=1`` and the correct
``attention_dp_size`` returns per-rank latency by construction, so no DP
scaling is needed here.
* **Decode** — one FPM per (ISL, aggregate-num-request) sample.
``num_decode_requests = aggregate_num_req``,
``sum_decode_kv_tokens = aggregate_num_req * (isl + osl/2)``,
``wall_time = per-step ITL``. Matches thorough's aggregate semantics. We
convert aggregate → per-rank before calling AIC (AIC's
``RuntimeConfig.batch_size`` is per-attention-DP-rank, per the
``TrtllmWideEPDeepSeekModel`` comment in aiconfigurator/sdk/models.py).
The 7 MoE-DEP bugs that silently corrupted the old profiler path are fixed
here: every AIC call uses :func:`picked_to_aic_model_config_kwargs` so
``moe_tp_size`` / ``moe_ep_size`` / ``attention_dp_size`` reach AIC's
``ModelConfig``; ``get_max_kv_tokens`` is scaled up by ``attention_dp_size``
to aggregate; and the decode concurrency sweep is a plain linear sweep
(the DP-multiples constraint was a thorough-mode routing requirement that
doesn't apply to a static simulator).
"""
import logging
from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
ScheduledRequestMetrics,
)
from dynamo.planner.config.aic_interpolation_spec import AICInterpolationSpec
from dynamo.planner.config.defaults import SubComponentType
from dynamo.planner.config.parallelization import (
PickedParallelConfig,
picked_to_aic_model_config_kwargs,
)
# aic_estimator itself lazy-imports aiconfigurator, so importing the wrapper
# class at module load time does NOT pull in the optional dependency —
# ImportError only materialises when the class is instantiated.
from dynamo.planner.monitoring.aic_estimator import AIConfiguratorPerfEstimator
logger = logging.getLogger(__name__)
_PREFILL_CONTEXT_MARGIN = 512 # mirror profile_prefill.py: leave room for chat template
def run_aic_interpolation(
spec: AICInterpolationSpec,
component_type: SubComponentType,
) -> list[ForwardPassMetrics]:
"""Run the AIC interpolation sweep and return synthetic FPMs.
Lazy-imports ``aiconfigurator`` — callers should catch ``ImportError``
and fall back to the file-based loader if the dependency is missing.
"""
from dynamo.planner.monitoring.aic_estimator import AIConfiguratorPerfEstimator
estimator = AIConfiguratorPerfEstimator(
hf_id=spec.hf_id,
system=spec.system,
backend=spec.backend,
)
if component_type == SubComponentType.PREFILL:
return _sweep_prefill(estimator, spec)
if component_type == SubComponentType.DECODE:
return _sweep_decode(estimator, spec)
raise ValueError(
f"Unsupported component_type for AIC interpolation: {component_type}"
)
def _sweep_prefill(
estimator: "AIConfiguratorPerfEstimator",
spec: AICInterpolationSpec,
) -> list[ForwardPassMetrics]:
"""Sweep prefill ISL, emit one FPM per point (per-rank semantics)."""
pick = spec.prefill_pick
kwargs = picked_to_aic_model_config_kwargs(pick)
max_ctx = spec.sweep_max_context_length - _PREFILL_CONTEXT_MARGIN
if max_ctx <= 100:
raise ValueError(
f"sweep_max_context_length {spec.sweep_max_context_length} is too "
f"small to profile prefill (need > {100 + _PREFILL_CONTEXT_MARGIN})"
)
step = max(1, (max_ctx - 100) // spec.prefill_interpolation_granularity)
fpms: list[ForwardPassMetrics] = []
for isl in range(100, max_ctx, step):
perf = estimator.estimate_prefill_perf(isl, **kwargs)
ttft_ms = perf.get("context_latency")
if ttft_ms is None or ttft_ms <= 0:
logger.warning(
"AIC returned invalid context_latency=%s for isl=%s; skipping",
ttft_ms,
isl,
)
continue
fpms.append(_prefill_fpm(isl, ttft_ms))
if len(fpms) < 3:
raise RuntimeError(
f"AIC prefill sweep produced only {len(fpms)} valid points; need >= 3"
)
return fpms
def _sweep_decode(
estimator: "AIConfiguratorPerfEstimator",
spec: AICInterpolationSpec,
) -> list[ForwardPassMetrics]:
"""Sweep decode (ISL, aggregate num_request), emit one FPM per point."""
pick = spec.decode_pick
kwargs = picked_to_aic_model_config_kwargs(pick)
attention_dp = max(1, pick.dp)
# get_max_kv_tokens returns per-rank (AIC's memory accounting is per-GPU).
# Each DP rank has its own KV cache, so aggregate max = per_rank × dp.
per_rank_max_kv = estimator.get_max_kv_tokens(spec.isl, spec.osl, **kwargs)
if per_rank_max_kv <= 0:
raise RuntimeError(
"AIC get_max_kv_tokens returned %s; pick does not fit on GPU"
% per_rank_max_kv
)
max_kv_tokens_aggregate = per_rank_max_kv * attention_dp
osl_sweep = 500 # mirror profile_decode.py: short OSL for stable ITL measurement
if spec.sweep_max_context_length - osl_sweep <= 100:
raise ValueError(
f"sweep_max_context_length {spec.sweep_max_context_length} is too "
f"small to profile decode (need > {100 + osl_sweep})"
)
isl_step = max(
1,
(spec.sweep_max_context_length - osl_sweep)
// spec.decode_interpolation_granularity,
)
fpms: list[ForwardPassMetrics] = []
for isl in range(100, spec.sweep_max_context_length - osl_sweep, isl_step):
ctx_len = isl + osl_sweep / 2.0
max_concurrency_aggregate = max_kv_tokens_aggregate // (isl + osl_sweep)
if max_concurrency_aggregate <= 0:
logger.warning(
"max_kv_tokens_aggregate=%s too small for isl=%s osl=%s; stopping sweep",
max_kv_tokens_aggregate,
isl,
osl_sweep,
)
break
for num_req_aggregate in _concurrency_sweep(
max_concurrency_aggregate, spec.decode_interpolation_granularity
):
# AIC RuntimeConfig.batch_size is per-attention-DP-rank.
batch_size_per_rank = max(1, num_req_aggregate // attention_dp)
perf = estimator.estimate_perf(
isl,
osl_sweep,
batch_size_per_rank,
mode="decode",
**kwargs,
)
itl_ms = perf.get("tpot")
if itl_ms is None or itl_ms <= 0:
logger.warning(
"AIC returned invalid tpot=%s for isl=%s num_req_agg=%s; skipping",
itl_ms,
isl,
num_req_aggregate,
)
continue
fpms.append(_decode_fpm(num_req_aggregate, ctx_len, itl_ms))
if len(fpms) < 3:
raise RuntimeError(
f"AIC decode sweep produced only {len(fpms)} valid points; need >= 3"
)
return fpms
def _concurrency_sweep(max_concurrency: int, granularity: int) -> list[int]:
"""Linear sweep of integer concurrency levels from 1 to ``max_concurrency``.
Unlike the thorough-mode sweep (``get_num_request_range``) we do not need
multiples of ``attention_dp_size`` here — AIC is a static simulator, not
a round-robin request router, so DP alignment is not required.
"""
if max_concurrency <= 0 or granularity <= 0:
return []
if granularity == 1:
# Single sample: take the top of the range, so the one data point is
# informative rather than the trivial concurrency=1 case.
return [max_concurrency]
if max_concurrency <= granularity:
return list(range(1, max_concurrency + 1))
step = (max_concurrency - 1) / (granularity - 1)
points = {max(1, 1 + int(round(i * step))) for i in range(granularity)}
return sorted(points)
def _prefill_fpm(isl: int, ttft_ms: float) -> ForwardPassMetrics:
"""Per-rank, single-request prefill FPM (matches thorough-mode convention)."""
return ForwardPassMetrics(
wall_time=float(ttft_ms) / 1000.0,
scheduled_requests=ScheduledRequestMetrics(
num_prefill_requests=1,
sum_prefill_tokens=int(isl),
),
)
def _decode_fpm(
num_request_aggregate: int,
ctx_len: float,
itl_ms: float,
) -> ForwardPassMetrics:
"""Aggregate-batch decode FPM (matches thorough-mode convention)."""
sum_kv = int(round(num_request_aggregate * ctx_len))
return ForwardPassMetrics(
wall_time=float(itl_ms) / 1000.0,
scheduled_requests=ScheduledRequestMetrics(
num_decode_requests=int(num_request_aggregate),
sum_decode_kv_tokens=sum_kv,
),
)
__all__ = [
"run_aic_interpolation",
"PickedParallelConfig", # re-exported for convenience in callers
]
......@@ -5,8 +5,10 @@
Priority chain:
1. Call ``get_perf_metrics`` Dynamo endpoint (PR 7779 self-benchmark)
2. Fallback: convert legacy profiler npz to synthetic FPMs
3. If both fail: raise
2. Run AIConfigurator interpolation in-process if an ``AICInterpolationSpec``
is supplied (rapid mode)
3. Convert legacy profiler NPZ / JSON to synthetic FPMs (thorough mode)
4. If all three fail: raise
"""
import asyncio
......@@ -21,6 +23,7 @@ from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
ScheduledRequestMetrics,
)
from dynamo.planner.config.aic_interpolation_spec import AICInterpolationSpec
from dynamo.planner.config.defaults import SubComponentType
from dynamo.planner.monitoring.worker_info import WorkerInfo
......@@ -33,19 +36,23 @@ async def fetch_pre_deployment_metrics(
worker_info: WorkerInfo,
profile_results_dir: Optional[str],
component_type: SubComponentType,
aic_spec: Optional[AICInterpolationSpec] = None,
) -> list[ForwardPassMetrics]:
"""Fetch pre-deployment engine perf data as an FPM list.
1. Try ``get_perf_metrics`` endpoint (PR 7779 self-benchmark)
2. Fallback: convert legacy profiler data (npz or JSON) to synthetic FPMs
3. If both fail: raise
1. Try ``get_perf_metrics`` endpoint (PR 7779 self-benchmark).
2. If ``aic_spec`` is set, run AIC interpolation in-process (rapid mode).
3. Convert legacy profiler data (NPZ or JSON) to synthetic FPMs
(thorough mode).
4. If all three fail: raise.
Args:
runtime: DistributedRuntime instance.
namespace: Dynamo namespace.
worker_info: WorkerInfo for the target component.
profile_results_dir: Path to legacy profiler npz data (fallback).
profile_results_dir: Path to legacy profiler data (last-resort fallback).
component_type: PREFILL or DECODE.
aic_spec: AIC interpolation spec from the profiler (rapid mode only).
Returns:
List of ForwardPassMetrics suitable for regression bootstrap.
......@@ -57,26 +64,55 @@ async def fetch_pre_deployment_metrics(
)
return fpms
if aic_spec is not None:
try:
fpms = _try_aic_interpolation(aic_spec, component_type)
if fpms:
logger.info(
f"Loaded {len(fpms)} FPMs from AIC interpolation "
f"({aic_spec.hf_id} on {aic_spec.system}/{aic_spec.backend})"
)
return fpms
except ImportError as e:
logger.error(
"aic_interpolation is set but aiconfigurator is not installed "
"in the planner image: %s",
e,
)
except Exception as e:
logger.warning(f"AIC interpolation failed, falling back to files: {e}")
if profile_results_dir:
try:
fpms = _convert_profiling_data_to_fpms(profile_results_dir, component_type)
if fpms:
logger.info(
f"Loaded {len(fpms)} FPMs from legacy profiler npz at {profile_results_dir}"
f"Loaded {len(fpms)} FPMs from legacy profiler data at "
f"{profile_results_dir}"
)
return fpms
except Exception as e:
logger.warning(
f"Failed to load profiling npz from {profile_results_dir}: {e}"
f"Failed to load profiling data from {profile_results_dir}: {e}"
)
raise RuntimeError(
"Failed to obtain pre-deployment performance data. "
"Either enable --benchmark-mode on the vLLM worker (get_perf_metrics endpoint) "
"or provide profiling results via --profile-results-dir."
"Failed to obtain pre-deployment performance data. Either enable the "
"get_perf_metrics endpoint on the worker, provide an aic_interpolation "
"spec (rapid mode), or supply profiling results via profile_results_dir."
)
def _try_aic_interpolation(
aic_spec: AICInterpolationSpec,
component_type: SubComponentType,
) -> list[ForwardPassMetrics]:
"""Delegate to the AIC sweep. Separated so the ImportError is catchable."""
from dynamo.planner.monitoring.aic_interpolation import run_aic_interpolation
return run_aic_interpolation(aic_spec, component_type)
async def _try_endpoint(
runtime: "object",
namespace: str,
......
......@@ -32,6 +32,7 @@ _stubs = {
},
"dynamo.common.forward_pass_metrics": {
"ForwardPassMetrics": MagicMock,
"ScheduledRequestMetrics": MagicMock,
},
}
for _mod_name, _attrs in _stubs.items():
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Unit tests for the AIC interpolation handoff types and helpers.
The ``run_aic_interpolation`` sweep itself is tested in a separate follow-up
file once that module lands; this file covers the pure-Python helpers that
don't require ``aiconfigurator`` to be installed.
"""
from unittest.mock import MagicMock, patch
import pytest
from dynamo.planner.config.aic_interpolation_spec import AICInterpolationSpec
from dynamo.planner.config.defaults import SubComponentType
from dynamo.planner.config.parallelization import (
PickedParallelConfig,
picked_to_aic_model_config_kwargs,
)
from dynamo.planner.monitoring import aic_interpolation as aic_mod
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.unit,
]
def _make_spec(
*,
prefill_pick: PickedParallelConfig,
decode_pick: PickedParallelConfig,
isl: int = 1000,
osl: int = 200,
sweep_max_context_length: int = 4096,
prefill_granularity: int = 4,
decode_granularity: int = 4,
) -> AICInterpolationSpec:
return AICInterpolationSpec(
hf_id="Qwen/Qwen3-235B-A22B-FP8",
system="h200_sxm",
backend="trtllm",
isl=isl,
osl=osl,
sweep_max_context_length=sweep_max_context_length,
prefill_interpolation_granularity=prefill_granularity,
decode_interpolation_granularity=decode_granularity,
prefill_pick=prefill_pick,
decode_pick=decode_pick,
)
def _patch_estimator(
*,
prefill_latency_ms: float = 50.0,
decode_tpot_ms: float = 20.0,
per_rank_max_kv: int = 100_000,
):
"""Patch AIConfiguratorPerfEstimator so tests don't need aiconfigurator.
``aic_interpolation`` imports the estimator class inside ``run_*`` for
lazy loading, so we patch at the source module.
"""
instance = MagicMock(name="AIConfiguratorPerfEstimator")
instance.estimate_prefill_perf.return_value = {
"context_latency": prefill_latency_ms
}
instance.estimate_perf.return_value = {"tpot": decode_tpot_ms}
instance.get_max_kv_tokens.return_value = per_rank_max_kv
cls = MagicMock(return_value=instance)
return (
patch(
"dynamo.planner.monitoring.aic_estimator.AIConfiguratorPerfEstimator",
cls,
),
instance,
)
class TestPickedToAicKwargs:
"""Verify the pick → AIC ModelConfig kwargs helper for each strategy.
The invariant AIC enforces for MoE models (aiconfigurator sdk/models.py,
~8 assertion sites) is::
tp_size * attention_dp_size == moe_tp_size * moe_ep_size
Each test case asserts both the expected kwargs and the identity.
"""
@staticmethod
def _assert_identity(kw: dict) -> None:
assert (
kw["tp_size"] * kw["attention_dp_size"]
== kw["moe_tp_size"] * kw["moe_ep_size"]
), f"AIC identity violated for {kw}"
def test_tp_only_dense(self):
# Dense 8-GPU TP pick; moe_tp/moe_ep are 1. The MoE identity does NOT
# apply to dense models — AIC's BaseModel doesn't assert it. We only
# check that tp_size carries p.tp and the MoE slots default to 1.
p = PickedParallelConfig(tp=8, pp=1, dp=1, moe_tp=1, moe_ep=1)
kw = picked_to_aic_model_config_kwargs(p)
assert kw == {
"tp_size": 8,
"pp_size": 1,
"moe_tp_size": 1,
"moe_ep_size": 1,
"attention_dp_size": 1,
}
def test_tp_only_moe(self):
# MoE TP-only on a MOE_ADDITIONAL_TP_ARCHITECTURES model, 8 GPUs.
p = PickedParallelConfig(tp=8, pp=1, dp=1, moe_tp=8, moe_ep=1)
kw = picked_to_aic_model_config_kwargs(p)
assert kw["tp_size"] == 8
assert kw["moe_tp_size"] == 8
assert kw["moe_ep_size"] == 1
assert kw["attention_dp_size"] == 1
self._assert_identity(kw)
def test_tep(self):
# TEP-8: attention and experts both sharded across 8 ranks.
p = PickedParallelConfig(tp=8, pp=1, dp=1, moe_tp=8, moe_ep=1)
kw = picked_to_aic_model_config_kwargs(p)
self._assert_identity(kw)
def test_dep(self):
# DEP-8: attention replicated across 8 DP ranks; experts split by EP=8.
p = PickedParallelConfig(tp=1, pp=1, dp=8, moe_tp=1, moe_ep=8)
kw = picked_to_aic_model_config_kwargs(p)
assert kw == {
"tp_size": 1,
"pp_size": 1,
"moe_tp_size": 1,
"moe_ep_size": 8,
"attention_dp_size": 8,
}
self._assert_identity(kw)
def test_hybrid_tep_plus_dp(self):
# Hybrid: attention TP=2 × DP=4, MoE TP=2 × EP=4, 16 total GPUs.
p = PickedParallelConfig(tp=2, pp=1, dp=4, moe_tp=2, moe_ep=4)
kw = picked_to_aic_model_config_kwargs(p)
assert kw["tp_size"] == 2
assert kw["attention_dp_size"] == 4
assert kw["moe_tp_size"] == 2
assert kw["moe_ep_size"] == 4
self._assert_identity(kw)
def test_never_uses_tp_size_property(self):
# Regression guard: the KV-head-split .tp_size returns 1 for DEP
# which would silently break AIC's identity. Confirm the helper
# does NOT derive from that property.
p = PickedParallelConfig(tp=1, dp=8, moe_ep=8)
assert p.tp_size == 1 # KV-head-split semantics
kw = picked_to_aic_model_config_kwargs(p)
# tp_size in AIC terms equals p.tp (1 here), not derived from p.tp_size
assert kw["tp_size"] == p.tp == 1
self._assert_identity(kw)
class TestAICInterpolationSpec:
def test_json_roundtrip(self):
spec = AICInterpolationSpec(
hf_id="Qwen/Qwen3-235B-A22B-FP8",
system="h200_sxm",
backend="trtllm",
isl=3000,
osl=300,
sweep_max_context_length=8192,
prefill_interpolation_granularity=16,
decode_interpolation_granularity=6,
prefill_pick=PickedParallelConfig(tp=4, pp=1, dp=4, moe_tp=1, moe_ep=4),
decode_pick=PickedParallelConfig(tp=1, pp=1, dp=8, moe_tp=2, moe_ep=4),
)
roundtrip = AICInterpolationSpec.model_validate_json(spec.model_dump_json())
assert roundtrip == spec
def test_rejects_unknown_backend(self):
with pytest.raises(ValueError):
AICInterpolationSpec(
hf_id="x",
system="h200_sxm",
backend="bogus", # type: ignore[arg-type]
isl=1,
osl=1,
sweep_max_context_length=1,
prefill_interpolation_granularity=1,
decode_interpolation_granularity=1,
prefill_pick=PickedParallelConfig(),
decode_pick=PickedParallelConfig(),
)
def test_positive_int_constraints(self):
# isl, osl, sweep_max_context_length, granularities must all be > 0.
base_kwargs = dict(
hf_id="x",
system="h200_sxm",
backend="trtllm",
sweep_max_context_length=8192,
prefill_interpolation_granularity=16,
decode_interpolation_granularity=6,
prefill_pick=PickedParallelConfig(),
decode_pick=PickedParallelConfig(),
)
with pytest.raises(ValueError):
AICInterpolationSpec(isl=0, osl=100, **base_kwargs)
with pytest.raises(ValueError):
AICInterpolationSpec(isl=100, osl=0, **base_kwargs)
class TestRunAicInterpolation:
"""Exercise the end-to-end sweep with a mocked AIC estimator.
These tests enforce the MoE-DEP correctness invariants that the old
profiler path silently violated. Each test drives a DEP-8 pick through
the sweep and asserts what reaches AIC and what reaches the FPM list.
"""
def test_prefill_sweep_shape_and_fpm_convention(self):
"""Prefill FPMs match thorough-mode: (1 req, isl tokens, per-rank ttft)."""
pick = PickedParallelConfig(tp=1, pp=1, dp=8, moe_tp=1, moe_ep=8)
spec = _make_spec(
prefill_pick=pick,
decode_pick=pick,
sweep_max_context_length=4096,
prefill_granularity=4,
)
ctx_patch, estimator = _patch_estimator(prefill_latency_ms=123.0)
with ctx_patch:
fpms = aic_mod.run_aic_interpolation(spec, SubComponentType.PREFILL)
assert len(fpms) >= 3
for fpm in fpms:
# Per-rank, single-request semantics.
assert fpm.scheduled_requests.num_prefill_requests == 1
assert fpm.scheduled_requests.sum_prefill_tokens > 0
assert fpm.wall_time == pytest.approx(0.123)
def test_prefill_passes_correct_aic_kwargs_for_dep(self):
"""DEP pick → AIC receives attention_dp_size=8, tp_size=1, etc."""
pick = PickedParallelConfig(tp=1, pp=1, dp=8, moe_tp=1, moe_ep=8)
spec = _make_spec(prefill_pick=pick, decode_pick=pick)
ctx_patch, estimator = _patch_estimator()
with ctx_patch:
aic_mod.run_aic_interpolation(spec, SubComponentType.PREFILL)
# Every call to estimate_prefill_perf must carry the full MoE kwargs
# derived from the pick — this is the fix for the 3-bug cluster that
# crashed MoE-DEP picks in the old profiler path.
for call in estimator.estimate_prefill_perf.call_args_list:
kwargs = call.kwargs
assert kwargs["tp_size"] == 1
assert kwargs["attention_dp_size"] == 8
assert kwargs["moe_tp_size"] == 1
assert kwargs["moe_ep_size"] == 8
assert (
kwargs["tp_size"] * kwargs["attention_dp_size"]
== kwargs["moe_tp_size"] * kwargs["moe_ep_size"]
)
def test_decode_batch_size_passed_per_rank(self):
"""Aggregate num_request gets divided by attention_dp_size for AIC.
AIC's RuntimeConfig.batch_size is per-attention-DP-rank (see the
TrtllmWideEPDeepSeekModel comment in aiconfigurator). Feeding the
aggregate directly would over-count MoE tokens by a factor of DP.
"""
pick = PickedParallelConfig(tp=1, pp=1, dp=8, moe_tp=2, moe_ep=4)
spec = _make_spec(
prefill_pick=pick,
decode_pick=pick,
sweep_max_context_length=2048,
decode_granularity=3,
)
# Large per-rank max_kv so the sweep fills multiple concurrency points.
ctx_patch, estimator = _patch_estimator(per_rank_max_kv=500_000)
with ctx_patch:
fpms = aic_mod.run_aic_interpolation(spec, SubComponentType.DECODE)
assert len(fpms) >= 3
# estimate_perf should always see a per-rank batch_size.
# It's positional: estimator.estimate_perf(isl, osl, batch_size, mode="decode", **kw)
for call in estimator.estimate_perf.call_args_list:
batch_size = call.args[2]
# batch_size is per-rank, so it must be <= aggregate/dp. Since
# the aggregate concurrency sweep can produce values below dp,
# we floor at 1 — but batch_size should NEVER exceed per-rank
# max, which bounds at max_kv/(isl+osl) without the *dp scale.
assert batch_size >= 1
assert call.kwargs["attention_dp_size"] == 8
assert call.kwargs["moe_tp_size"] == 2
assert call.kwargs["moe_ep_size"] == 4
# MoE identity must hold on every AIC call.
assert (
call.kwargs["tp_size"] * call.kwargs["attention_dp_size"]
== call.kwargs["moe_tp_size"] * call.kwargs["moe_ep_size"]
)
def test_decode_fpm_is_aggregate(self):
"""FPMs carry aggregate num_decode_requests, matching thorough mode."""
pick = PickedParallelConfig(tp=1, pp=1, dp=8, moe_tp=1, moe_ep=8)
spec = _make_spec(
prefill_pick=pick,
decode_pick=pick,
sweep_max_context_length=2048,
decode_granularity=3,
)
ctx_patch, _ = _patch_estimator(per_rank_max_kv=500_000)
with ctx_patch:
fpms = aic_mod.run_aic_interpolation(spec, SubComponentType.DECODE)
assert all(f.scheduled_requests.num_decode_requests >= 1 for f in fpms)
assert all(f.scheduled_requests.sum_decode_kv_tokens > 0 for f in fpms)
# sum_decode_kv_tokens should be num_decode_requests * ctx_len
# (ctx_len = isl + osl_sweep/2 where osl_sweep = 500 inside the module).
for fpm in fpms:
req = fpm.scheduled_requests.num_decode_requests
kv = fpm.scheduled_requests.sum_decode_kv_tokens
assert kv / req > 100 # some positive ctx_len
assert all(f.wall_time == pytest.approx(0.020) for f in fpms)
def test_decode_uses_aggregate_max_kv(self):
"""AIC per-rank max_kv is multiplied by attention_dp_size for sweep bound.
Regression for Bug #5: old path used per-rank max, which shrank the
sweep and left the regression under-fit on high-concurrency points.
"""
pick = PickedParallelConfig(tp=1, pp=1, dp=8, moe_tp=1, moe_ep=8)
spec = _make_spec(
prefill_pick=pick,
decode_pick=pick,
isl=1000,
osl=200,
sweep_max_context_length=2048,
decode_granularity=3,
)
# Tiny per-rank max_kv. If the sweep used per-rank, max_concurrency =
# 1200/1200 = 1 and we'd get only one point per isl. With aggregate
# (×8), max_concurrency = 9600/1200 = 8, yielding multiple points.
ctx_patch, _ = _patch_estimator(per_rank_max_kv=1200)
with ctx_patch:
fpms = aic_mod.run_aic_interpolation(spec, SubComponentType.DECODE)
# Across all isl steps we should see multiple distinct concurrency
# levels — evidence the aggregate scaling kicked in.
distinct_concurrencies = {
f.scheduled_requests.num_decode_requests for f in fpms
}
assert len(distinct_concurrencies) >= 2
class TestQwen235MoEPicks:
"""End-to-end exercise with a realistic large-MoE pick pair.
Uses a concrete Qwen-235B-A22B-FP8 pick shape that spans the full MoE
parallelism surface in one test class:
* Prefill: ``tp=4, dp=1, moe_tp=1, moe_ep=4`` — DEP on the MoE layer.
* Decode: ``tp=1, dp=8, moe_tp=2, moe_ep=4`` — hybrid TEP + attention DP.
Together they cover every AIC kwarg that a pick can carry, including the
case where ``.tp_size`` (KV-head-split property) disagrees with AIC's
attention-TP.
"""
PREFILL_PICK = PickedParallelConfig(tp=4, pp=1, dp=1, moe_tp=1, moe_ep=4)
DECODE_PICK = PickedParallelConfig(tp=1, pp=1, dp=8, moe_tp=2, moe_ep=4)
def test_prefill_pick_satisfies_aic_identity(self):
"""AIC's assertion: tp_size * attention_dp_size == moe_tp * moe_ep."""
kw = picked_to_aic_model_config_kwargs(self.PREFILL_PICK)
assert kw["tp_size"] == 4 # raw p.tp, NOT the .tp_size property
assert kw["attention_dp_size"] == 1
assert kw["moe_tp_size"] == 1
assert kw["moe_ep_size"] == 4
assert (
kw["tp_size"] * kw["attention_dp_size"]
== kw["moe_tp_size"] * kw["moe_ep_size"]
)
def test_decode_pick_satisfies_aic_identity(self):
kw = picked_to_aic_model_config_kwargs(self.DECODE_PICK)
assert kw["tp_size"] == 1
assert kw["attention_dp_size"] == 8
assert kw["moe_tp_size"] == 2
assert kw["moe_ep_size"] == 4
assert (
kw["tp_size"] * kw["attention_dp_size"]
== kw["moe_tp_size"] * kw["moe_ep_size"]
)
def test_tp_size_property_differs_from_aic_tp_size(self):
"""Documents that the KV-split property is NOT AIC's tp_size."""
# For this DEP-on-MoE pick, .tp_size returns 1 (because moe_ep > 1),
# which would violate the MoE identity. AIC's real tp_size is 4.
assert self.PREFILL_PICK.tp_size == 1 # KV-head-split semantics
kw = picked_to_aic_model_config_kwargs(self.PREFILL_PICK)
assert kw["tp_size"] == 4 # AIC semantics
assert kw["tp_size"] != self.PREFILL_PICK.tp_size
def test_end_to_end_sweep_delivers_complete_kwargs_to_aic(self):
"""Run both sweeps end-to-end and verify every AIC call is well-formed."""
spec = _make_spec(
prefill_pick=self.PREFILL_PICK,
decode_pick=self.DECODE_PICK,
isl=3000,
osl=300,
sweep_max_context_length=4096,
prefill_granularity=4,
decode_granularity=3,
)
ctx_patch, estimator = _patch_estimator(per_rank_max_kv=500_000)
with ctx_patch:
prefill_fpms = aic_mod.run_aic_interpolation(spec, SubComponentType.PREFILL)
decode_fpms = aic_mod.run_aic_interpolation(spec, SubComponentType.DECODE)
assert prefill_fpms and decode_fpms
# Every prefill AIC call must carry complete MoE kwargs and satisfy
# the parallelism identity.
for call in estimator.estimate_prefill_perf.call_args_list:
kw = call.kwargs
assert {
"tp_size",
"pp_size",
"moe_tp_size",
"moe_ep_size",
"attention_dp_size",
} <= kw.keys()
assert kw["tp_size"] == 4
assert kw["moe_ep_size"] == 4
assert kw["attention_dp_size"] == 1
assert (
kw["tp_size"] * kw["attention_dp_size"]
== kw["moe_tp_size"] * kw["moe_ep_size"]
)
for call in estimator.estimate_perf.call_args_list:
kw = call.kwargs
assert kw["tp_size"] == 1
assert kw["moe_tp_size"] == 2
assert kw["moe_ep_size"] == 4
assert kw["attention_dp_size"] == 8
assert (
kw["tp_size"] * kw["attention_dp_size"]
== kw["moe_tp_size"] * kw["moe_ep_size"]
)
class TestConcurrencySweep:
"""The AIC-path sweep drops thorough-mode's DP-multiples constraint."""
def test_small_max_uses_full_range(self):
assert aic_mod._concurrency_sweep(3, 4) == [1, 2, 3]
def test_granularity_caps_points(self):
out = aic_mod._concurrency_sweep(100, 4)
assert len(out) == 4
assert out[0] == 1
assert out[-1] == 100
assert sorted(out) == out
def test_zero_max_returns_empty(self):
assert aic_mod._concurrency_sweep(0, 4) == []
def test_granularity_one_returns_top_of_range(self):
# Before the guard this raised ZeroDivisionError: (max-1)/(1-1).
assert aic_mod._concurrency_sweep(10, 1) == [10]
assert aic_mod._concurrency_sweep(1, 1) == [1]
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Priority-chain tests for ``fetch_pre_deployment_metrics``.
Order (highest → lowest):
1. ``get_perf_metrics`` endpoint
2. AIC interpolation (if spec is set)
3. File fallback (NPZ / JSON under ``profile_results_dir``)
"""
from unittest.mock import MagicMock, patch
import pytest
from dynamo.planner.config.aic_interpolation_spec import AICInterpolationSpec
from dynamo.planner.config.defaults import SubComponentType
from dynamo.planner.config.parallelization import PickedParallelConfig
from dynamo.planner.monitoring import perf_metrics as pm
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.unit,
]
def _make_spec() -> AICInterpolationSpec:
pick = PickedParallelConfig(tp=1, dp=8, moe_tp=1, moe_ep=8)
return AICInterpolationSpec(
hf_id="x",
system="h200_sxm",
backend="trtllm",
isl=1000,
osl=200,
sweep_max_context_length=4096,
prefill_interpolation_granularity=4,
decode_interpolation_granularity=4,
prefill_pick=pick,
decode_pick=pick,
)
def _fpm():
"""Opaque sentinel FPM — identity comparison is enough for these tests."""
return object()
class TestPriorityChain:
@pytest.mark.asyncio
async def test_endpoint_wins(self):
"""When the endpoint returns FPMs, AIC and files are never consulted."""
endpoint_fpms = [_fpm(), _fpm()]
with patch.object(
pm, "_try_endpoint", return_value=endpoint_fpms
) as mock_ep, patch.object(
pm, "_try_aic_interpolation"
) as mock_aic, patch.object(
pm, "_convert_profiling_data_to_fpms"
) as mock_files:
got = await pm.fetch_pre_deployment_metrics(
runtime=MagicMock(),
namespace="dynamo",
worker_info=MagicMock(component_name="worker"),
profile_results_dir="/tmp/profile",
component_type=SubComponentType.PREFILL,
aic_spec=_make_spec(),
)
assert got is endpoint_fpms
mock_ep.assert_awaited_once()
mock_aic.assert_not_called()
mock_files.assert_not_called()
@pytest.mark.asyncio
async def test_aic_fallback_when_endpoint_empty(self):
"""Endpoint returns [] → AIC runs. Files never consulted."""
aic_fpms = [_fpm()]
with patch.object(pm, "_try_endpoint", return_value=[]), patch.object(
pm, "_try_aic_interpolation", return_value=aic_fpms
) as mock_aic, patch.object(
pm, "_convert_profiling_data_to_fpms"
) as mock_files:
got = await pm.fetch_pre_deployment_metrics(
runtime=MagicMock(),
namespace="dynamo",
worker_info=MagicMock(component_name="worker"),
profile_results_dir="/tmp/profile",
component_type=SubComponentType.PREFILL,
aic_spec=_make_spec(),
)
assert got is aic_fpms
mock_aic.assert_called_once()
mock_files.assert_not_called()
@pytest.mark.asyncio
async def test_file_fallback_when_no_spec(self):
"""Endpoint returns [] and aic_spec is None → files load."""
file_fpms = [_fpm()]
with patch.object(pm, "_try_endpoint", return_value=[]), patch.object(
pm, "_try_aic_interpolation"
) as mock_aic, patch.object(
pm, "_convert_profiling_data_to_fpms", return_value=file_fpms
) as mock_files:
got = await pm.fetch_pre_deployment_metrics(
runtime=MagicMock(),
namespace="dynamo",
worker_info=MagicMock(component_name="worker"),
profile_results_dir="/tmp/profile",
component_type=SubComponentType.DECODE,
aic_spec=None,
)
assert got is file_fpms
mock_aic.assert_not_called()
mock_files.assert_called_once()
@pytest.mark.asyncio
async def test_file_fallback_when_aic_fails(self):
"""Endpoint empty, AIC raises at runtime → files are consulted."""
file_fpms = [_fpm()]
with patch.object(pm, "_try_endpoint", return_value=[]), patch.object(
pm, "_try_aic_interpolation", side_effect=RuntimeError("aic boom")
), patch.object(
pm, "_convert_profiling_data_to_fpms", return_value=file_fpms
) as mock_files:
got = await pm.fetch_pre_deployment_metrics(
runtime=MagicMock(),
namespace="dynamo",
worker_info=MagicMock(component_name="worker"),
profile_results_dir="/tmp/profile",
component_type=SubComponentType.DECODE,
aic_spec=_make_spec(),
)
assert got is file_fpms
mock_files.assert_called_once()
@pytest.mark.asyncio
async def test_aic_missing_package_falls_through_to_files(self):
"""aiconfigurator not installed → ImportError is caught, files run."""
file_fpms = [_fpm()]
with patch.object(pm, "_try_endpoint", return_value=[]), patch.object(
pm,
"_try_aic_interpolation",
side_effect=ImportError("no module named aiconfigurator"),
), patch.object(
pm, "_convert_profiling_data_to_fpms", return_value=file_fpms
) as mock_files:
got = await pm.fetch_pre_deployment_metrics(
runtime=MagicMock(),
namespace="dynamo",
worker_info=MagicMock(component_name="worker"),
profile_results_dir="/tmp/profile",
component_type=SubComponentType.DECODE,
aic_spec=_make_spec(),
)
assert got is file_fpms
mock_files.assert_called_once()
@pytest.mark.asyncio
async def test_all_fail_raises(self):
"""No endpoint, no spec, no files → RuntimeError."""
with patch.object(pm, "_try_endpoint", return_value=[]), patch.object(
pm, "_try_aic_interpolation"
) as mock_aic, patch.object(
pm,
"_convert_profiling_data_to_fpms",
side_effect=FileNotFoundError("no npz"),
):
with pytest.raises(RuntimeError, match="Failed to obtain"):
await pm.fetch_pre_deployment_metrics(
runtime=MagicMock(),
namespace="dynamo",
worker_info=MagicMock(component_name="worker"),
profile_results_dir="/tmp/profile",
component_type=SubComponentType.PREFILL,
aic_spec=None,
)
mock_aic.assert_not_called()
......@@ -32,7 +32,10 @@ from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
)
from dynamo.profiler.utils.config_modifiers.protocol import apply_dgd_overrides
from dynamo.profiler.utils.defaults import SearchStrategy
from dynamo.profiler.utils.dgd_generation import assemble_final_config
from dynamo.profiler.utils.dgd_generation import (
assemble_final_config,
build_aic_interpolation_spec,
)
from dynamo.profiler.utils.dgdr_v1beta1_types import (
BackendType,
DynamoGraphDeploymentRequestSpec,
......@@ -371,11 +374,25 @@ async def run_profile(
)
# ---------------------------------------------------------------
# Interpolation curves — only needed when something consumes
# the per-engine performance data (throughput scaling or mocker).
# Interpolation curves — only needed when something consumes the
# per-engine performance data on disk (thorough-mode planner or
# mocker). Rapid-mode planner bootstraps AIC in-process at
# startup, so the profiler skips the NPZ sweep for that case.
# ---------------------------------------------------------------
chosen_exp = pick_result.get("chosen_exp", "")
is_disagg_config = chosen_exp not in ("agg",) and bool(chosen_exp)
# Compute max context length unconditionally — both the NPZ sweep
# (thorough, mocker) and the planner's rapid-mode AIC spec need it.
try:
model_cfg = get_model_config_from_model_path(resolve_model_path(dgdr))
sweep_max_context_length = model_cfg.get("max_position_embeddings", 0)
except Exception:
logger.warning("Could not fetch model max context length.")
sweep_max_context_length = 0
if not sweep_max_context_length:
sweep_max_context_length = isl * 2 if isl > 0 else 8192
if not ops.dry_run and dgd_config and needs_profile_data(dgdr):
ops.current_phase = ProfilingPhase.BuildingCurves
write_profiler_status(
......@@ -391,19 +408,6 @@ async def run_profile(
chosen_exp,
)
else:
try:
model_cfg = get_model_config_from_model_path(
resolve_model_path(dgdr)
)
sweep_max_context_length = model_cfg.get(
"max_position_embeddings", 0
)
except Exception:
logger.warning("Could not fetch model max context length.")
sweep_max_context_length = 0
if not sweep_max_context_length:
sweep_max_context_length = isl * 2 if isl > 0 else 8192
await run_interpolation(
dgdr,
ops,
......@@ -430,8 +434,30 @@ async def run_profile(
message="Packaging data and generating final DGD YAML",
phase=ops.current_phase,
)
aic_spec = (
build_aic_interpolation_spec(
dgdr,
best_prefill_pick=best_prefill_config,
best_decode_pick=best_decode_config,
isl=isl,
osl=osl,
sweep_max_context_length=sweep_max_context_length,
resolved_backend=resolved_backend,
system=system,
prefill_interpolation_granularity=ops.prefill_interpolation_granularity,
decode_interpolation_granularity=ops.decode_interpolation_granularity,
)
if is_disagg_config and not ops.dry_run
else None
)
final_config = assemble_final_config(
dgdr, ops, dgd_config, best_prefill_config, best_decode_config
dgdr,
ops,
dgd_config,
best_prefill_config,
best_decode_config,
aic_spec=aic_spec,
resolved_backend=resolved_backend,
)
# --- Apply DGD overrides (user-supplied partial DGD) ---
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Unit tests for the AIC-spec integration in profiler DGD generation."""
import pytest
try:
from dynamo.planner.config.aic_interpolation_spec import AICInterpolationSpec
from dynamo.planner.config.parallelization import PickedParallelConfig
from dynamo.planner.config.planner_config import (
PlannerConfig,
PlannerPreDeploymentSweepMode,
)
from dynamo.profiler.utils.dgd_generation import (
_build_planner_config,
build_aic_interpolation_spec,
enable_vllm_benchmark_mode,
)
from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec,
FeaturesSpec,
)
except ImportError as e:
pytest.skip(f"Missing dependency: {e}", allow_module_level=True)
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.unit,
]
def _dgdr(
planner: PlannerConfig | None = None,
model: str = "Qwen/Qwen3-32B",
) -> DynamoGraphDeploymentRequestSpec:
features = FeaturesSpec(planner=planner) if planner else None
return DynamoGraphDeploymentRequestSpec(model=model, features=features)
class TestBuildAICInterpolationSpec:
def _rapid_planner(self) -> PlannerConfig:
return PlannerConfig(
enable_throughput_scaling=True,
enable_load_scaling=False,
optimization_target="sla",
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
)
def test_rapid_planner_produces_spec(self):
dgdr = _dgdr(planner=self._rapid_planner())
pick = PickedParallelConfig(tp=1, dp=8, moe_tp=1, moe_ep=8)
spec = build_aic_interpolation_spec(
dgdr,
best_prefill_pick=pick,
best_decode_pick=pick,
isl=3000,
osl=300,
sweep_max_context_length=8192,
resolved_backend="trtllm",
system="h200_sxm",
prefill_interpolation_granularity=16,
decode_interpolation_granularity=6,
)
assert isinstance(spec, AICInterpolationSpec)
assert spec.hf_id == "Qwen/Qwen3-32B"
assert spec.backend == "trtllm"
assert spec.system == "h200_sxm"
assert spec.prefill_pick == pick
assert spec.decode_pick == pick
def test_thorough_planner_returns_none(self):
planner = PlannerConfig(
enable_throughput_scaling=True,
enable_load_scaling=False,
optimization_target="sla",
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Thorough,
)
dgdr = _dgdr(planner=planner)
pick = PickedParallelConfig(tp=1, dp=8, moe_ep=8)
got = build_aic_interpolation_spec(
dgdr,
best_prefill_pick=pick,
best_decode_pick=pick,
isl=3000,
osl=300,
sweep_max_context_length=8192,
resolved_backend="trtllm",
system="h200_sxm",
prefill_interpolation_granularity=16,
decode_interpolation_granularity=6,
)
assert got is None
def test_throughput_disabled_returns_none(self):
planner = PlannerConfig(
enable_throughput_scaling=False,
enable_load_scaling=True,
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
)
dgdr = _dgdr(planner=planner)
pick = PickedParallelConfig(tp=1)
got = build_aic_interpolation_spec(
dgdr,
best_prefill_pick=pick,
best_decode_pick=pick,
isl=1000,
osl=100,
sweep_max_context_length=4096,
resolved_backend="trtllm",
system="h200_sxm",
prefill_interpolation_granularity=8,
decode_interpolation_granularity=4,
)
assert got is None
def test_missing_picks_returns_none(self):
dgdr = _dgdr(planner=self._rapid_planner())
got = build_aic_interpolation_spec(
dgdr,
best_prefill_pick=None,
best_decode_pick=None,
isl=1000,
osl=100,
sweep_max_context_length=4096,
resolved_backend="trtllm",
system="h200_sxm",
prefill_interpolation_granularity=8,
decode_interpolation_granularity=4,
)
assert got is None
def test_unsupported_backend_returns_none(self):
dgdr = _dgdr(planner=self._rapid_planner())
pick = PickedParallelConfig(tp=1)
got = build_aic_interpolation_spec(
dgdr,
best_prefill_pick=pick,
best_decode_pick=pick,
isl=1000,
osl=100,
sweep_max_context_length=4096,
resolved_backend="mocker",
system="h200_sxm",
prefill_interpolation_granularity=8,
decode_interpolation_granularity=4,
)
assert got is None
def test_no_planner_returns_none(self):
dgdr = _dgdr(planner=None)
pick = PickedParallelConfig(tp=1)
got = build_aic_interpolation_spec(
dgdr,
best_prefill_pick=pick,
best_decode_pick=pick,
isl=1000,
osl=100,
sweep_max_context_length=4096,
resolved_backend="trtllm",
system="h200_sxm",
prefill_interpolation_granularity=8,
decode_interpolation_granularity=4,
)
assert got is None
class TestBuildPlannerConfigEmbedsAicSpec:
def test_spec_threads_into_planner_config(self):
planner = PlannerConfig(
enable_throughput_scaling=True,
enable_load_scaling=False,
optimization_target="sla",
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
)
dgdr = _dgdr(planner=planner)
pick = PickedParallelConfig(tp=1, dp=8, moe_ep=8)
spec = AICInterpolationSpec(
hf_id="x",
system="h200_sxm",
backend="trtllm",
isl=1000,
osl=100,
sweep_max_context_length=4096,
prefill_interpolation_granularity=4,
decode_interpolation_granularity=4,
prefill_pick=pick,
decode_pick=pick,
)
cfg = _build_planner_config(dgdr, pick, pick, aic_spec=spec)
assert cfg.aic_interpolation == spec
# Regression: num-gpu injection still works.
assert cfg.prefill_engine_num_gpu == pick.num_gpus
assert cfg.decode_engine_num_gpu == pick.num_gpus
def test_no_spec_leaves_aic_interpolation_none(self):
planner = PlannerConfig(
enable_throughput_scaling=False,
enable_load_scaling=True,
)
dgdr = _dgdr(planner=planner)
pick = PickedParallelConfig(tp=8)
cfg = _build_planner_config(dgdr, pick, pick, aic_spec=None)
assert cfg.aic_interpolation is None
class TestNeedsProfileDataRapid:
def test_rapid_planner_only_returns_false(self):
"""Planner-only rapid: no files needed; planner will use aic_spec."""
from dynamo.profiler.utils.profile_common import needs_profile_data
planner = PlannerConfig(
enable_throughput_scaling=True,
enable_load_scaling=False,
optimization_target="sla",
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
)
dgdr = _dgdr(planner=planner)
assert needs_profile_data(dgdr) is False
def test_thorough_planner_returns_true(self):
"""Thorough still needs files."""
from dynamo.profiler.utils.profile_common import needs_profile_data
planner = PlannerConfig(
enable_throughput_scaling=True,
enable_load_scaling=False,
optimization_target="sla",
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Thorough,
)
dgdr = _dgdr(planner=planner)
assert needs_profile_data(dgdr) is True
def _benchmark_mode(svc: dict) -> str | None:
env = svc.get("extraPodSpec", {}).get("mainContainer", {}).get("env", [])
for e in env:
if isinstance(e, dict) and e.get("name") == "DYN_BENCHMARK_MODE":
return e.get("value")
return None
class TestEnableVllmBenchmarkMode:
def test_disagg_sets_prefill_and_decode(self):
cfg = {
"spec": {
"services": {
"Frontend": {},
"VllmPrefillWorker": {},
"VllmDecodeWorker": {},
}
}
}
enable_vllm_benchmark_mode(cfg)
services = cfg["spec"]["services"]
assert _benchmark_mode(services["VllmPrefillWorker"]) == "prefill"
assert _benchmark_mode(services["VllmDecodeWorker"]) == "decode"
# Frontend service is untouched — no env list injected.
assert "env" not in services["Frontend"].get("extraPodSpec", {}).get(
"mainContainer", {}
)
def test_agg_sets_single_worker(self):
cfg = {"spec": {"services": {"Frontend": {}, "VllmWorker": {}}}}
enable_vllm_benchmark_mode(cfg)
assert _benchmark_mode(cfg["spec"]["services"]["VllmWorker"]) == "agg"
def test_idempotent_replaces_existing_value(self):
# Simulates a user override that sets DYN_BENCHMARK_MODE to an
# incorrect role; the helper must overwrite with the canonical value.
cfg = {
"spec": {
"services": {
"VllmDecodeWorker": {
"extraPodSpec": {
"mainContainer": {
"env": [
{"name": "SOMETHING_ELSE", "value": "keep"},
{"name": "DYN_BENCHMARK_MODE", "value": "wrong"},
]
}
}
}
}
}
}
enable_vllm_benchmark_mode(cfg)
env = cfg["spec"]["services"]["VllmDecodeWorker"]["extraPodSpec"][
"mainContainer"
]["env"]
names = [e["name"] for e in env]
assert names.count("DYN_BENCHMARK_MODE") == 1
assert _benchmark_mode(cfg["spec"]["services"]["VllmDecodeWorker"]) == "decode"
# Unrelated env vars are preserved.
assert {"name": "SOMETHING_ELSE", "value": "keep"} in env
def test_non_vllm_services_unchanged(self):
cfg = {
"spec": {
"services": {
"TRTLLMPrefillWorker": {},
"TRTLLMDecodeWorker": {},
"Frontend": {},
}
}
}
enable_vllm_benchmark_mode(cfg)
for svc in cfg["spec"]["services"].values():
assert _benchmark_mode(svc) is None
def test_preserves_unrelated_service_keys(self):
cfg = {
"spec": {
"services": {
"VllmPrefillWorker": {
"extraPodSpec": {
"mainContainer": {
"image": "nvcr.io/foo:1.0",
"args": ["--model-path", "x"],
}
}
}
}
}
}
enable_vllm_benchmark_mode(cfg)
mc = cfg["spec"]["services"]["VllmPrefillWorker"]["extraPodSpec"][
"mainContainer"
]
assert mc["image"] == "nvcr.io/foo:1.0"
assert mc["args"] == ["--model-path", "x"]
assert (
_benchmark_mode(cfg["spec"]["services"]["VllmPrefillWorker"]) == "prefill"
)
......@@ -476,13 +476,56 @@ class TestAssembleFinalConfig:
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_planner_no_mocker_returns_config_with_planner_cm(self, tmp_path):
"""Planner enabled, no mocker: result is [planner_cm, profile_cm, dgd_config]."""
def test_rapid_planner_no_mocker_skips_profile_cm(self, tmp_path):
"""Rapid + planner + no mocker: the profile-data ConfigMap is NOT emitted.
The planner runs AIC interpolation in-process at bootstrap using the
aic_interpolation spec embedded in its config, so no NPZ round-trip
is needed.
"""
dgdr = _make_dgdr(features=FeaturesSpec(planner=_make_planner()))
ops = _make_ops(tmp_path)
os.makedirs(ops.output_dir, exist_ok=True)
dgd_config = {"kind": "DGD", "spec": {"services": {}}}
planner_cm = {"kind": "ConfigMap", "metadata": {"name": "planner-cm"}}
with (
patch(
f"{_DGD_GEN}.add_planner_to_config",
return_value=planner_cm,
) as mock_planner,
patch(
f"{_DGD_GEN}.add_profile_data_to_config",
) as mock_profile,
):
result = assemble_final_config(
dgdr,
ops,
dgd_config,
PickedParallelConfig(tp=1),
PickedParallelConfig(tp=1),
)
mock_planner.assert_called_once()
mock_profile.assert_not_called()
assert result == [planner_cm, dgd_config]
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_thorough_planner_no_mocker_returns_config_with_both_cms(self, tmp_path):
"""Thorough + planner + no mocker: both planner_cm and profile_cm are emitted."""
dgdr = _make_dgdr(
searchStrategy="thorough",
features=FeaturesSpec(
planner=_make_planner(
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Thorough,
)
),
)
ops = _make_ops(tmp_path)
os.makedirs(ops.output_dir, exist_ok=True)
dgd_config = {"kind": "DGD", "spec": {"services": {}}}
planner_cm = {"kind": "ConfigMap", "metadata": {"name": "planner-cm"}}
profile_cm = {"kind": "ConfigMap", "metadata": {"name": "profile-cm"}}
with (
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Profiler-internal parallelization helpers.
``PickedParallelConfig`` has been relocated to
``dynamo.planner.config.parallelization`` so both the profiler and the planner
can share it. It is re-exported here for back-compat.
"""
import copy
import logging
from dataclasses import dataclass
from enum import Enum
from dynamo.planner.config.defaults import SubComponentType
from dynamo.planner.config.parallelization import PickedParallelConfig
from dynamo.profiler.utils.defaults import PREFILL_MAX_NUM_TOKENS
from dynamo.profiler.utils.model_info import MOE_ADDITIONAL_TP_ARCHITECTURES, ModelInfo
__all__ = [
"ParallelizationStrategy",
"ParallelizationMapping",
"PickedParallelConfig",
"get_candidate_parallel_mappings",
"apply_parallel_mapping_to_config",
]
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
......@@ -102,51 +118,6 @@ class ParallelizationMapping:
)
@dataclass(frozen=True)
class PickedParallelConfig:
"""Lightweight representation of a picked parallelization config.
Uses the same (tp, pp, dp, moe_tp, moe_ep) tuple that AIC's enumeration
and picking pipelines produce. Unlike :class:`ParallelizationMapping`,
this stores all five dimensions explicitly rather than using mutually
exclusive optional fields.
"""
tp: int = 1
pp: int = 1
dp: int = 1
moe_tp: int = 1
moe_ep: int = 1
@property
def num_gpus(self) -> int:
return self.tp * self.pp * self.dp
@property
def tp_size(self) -> int:
"""Effective TP for KV-head splitting (TP or TEP; 1 for DEP)."""
if self.moe_ep > 1:
return 1
if self.moe_tp > 1:
return self.moe_tp
return self.tp
def label(self) -> str:
if self.moe_ep > 1:
return f"dep{self.moe_ep}"
elif self.moe_tp > 1:
return f"tep{self.moe_tp}"
return f"tp{self.tp}"
def to_parallelization_mapping(self) -> ParallelizationMapping:
"""Convert to :class:`ParallelizationMapping`."""
if self.moe_ep > 1:
return ParallelizationMapping(dep=self.moe_ep)
elif self.moe_tp > 1:
return ParallelizationMapping(tep=self.moe_tp)
return ParallelizationMapping(tp=self.tp)
def _check_divisibility(
value: int | None,
divisor: int,
......
......@@ -23,8 +23,16 @@ import numpy as np
import yaml
from dynamo.common.utils.paths import get_workspace_dir
from dynamo.planner.config.backend_components import MockerComponentName
from dynamo.planner.config.planner_config import PlannerConfig
from dynamo.planner.config.aic_interpolation_spec import AICInterpolationSpec
from dynamo.planner.config.backend_components import (
MockerComponentName,
VllmComponentName,
)
from dynamo.planner.config.parallelization import PickedParallelConfig
from dynamo.planner.config.planner_config import (
PlannerConfig,
PlannerPreDeploymentSweepMode,
)
from dynamo.profiler.utils.config import DgdPlannerServiceConfig, set_argument_value
from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig,
......@@ -64,13 +72,22 @@ def assemble_final_config(
dgd_config: dict | None,
best_prefill_config=None,
best_decode_config=None,
aic_spec: Optional[AICInterpolationSpec] = None,
resolved_backend: Optional[str] = None,
) -> Any:
"""Apply Dynamo features to the picked DGD config via composable layers.
1. **Mocker** — swap the base to the mocker DGD template if enabled.
2. **Planner** — inject the Planner service + planner-config ConfigMap.
3. **Profile data** — attach interpolation-data ConfigMap when mocker
or planner-throughput is enabled.
2. **vLLM self-benchmark** — when the resolved backend is vLLM, set
``DYN_BENCHMARK_MODE`` on each worker so the ``get_perf_metrics``
endpoint is populated at runtime. The planner consumes this as
priority 1 of its bootstrap chain, superseding AIC and files.
3. **Planner** — inject the Planner service + planner-config ConfigMap.
When ``aic_spec`` is given (rapid mode), it is embedded in the
planner config so the planner runs AIC interpolation at bootstrap
if the endpoint is unavailable.
4. **Profile data** — attach interpolation-data ConfigMap when mocker
or planner-thorough is enabled.
"""
if not dgd_config:
return dgd_config
......@@ -94,7 +111,13 @@ def assemble_final_config(
else:
base = dgd_config
# Steps 2-3: layer features, collecting ConfigMaps
# Step 2: for vLLM deployments, turn on the per-worker self-benchmark so
# the get_perf_metrics endpoint is available to the planner. Mocker
# workers don't use DYN_BENCHMARK_MODE, so skip when mocker is active.
if not mocker and resolved_backend == "vllm":
enable_vllm_benchmark_mode(base)
# Steps 3-4: layer features, collecting ConfigMaps
config_maps: list[dict] = []
if planner:
......@@ -103,6 +126,7 @@ def assemble_final_config(
base,
best_prefill_mapping=best_prefill_config,
best_decode_mapping=best_decode_config,
aic_spec=aic_spec,
)
config_maps.append(planner_cm)
......@@ -117,6 +141,54 @@ def assemble_final_config(
return base
def _vllm_worker_roles() -> dict[str, str]:
"""Canonical DGD service name → DYN_BENCHMARK_MODE role.
Sourced from :class:`VllmComponentName` so we stay in sync with the
rest of the planner/profiler if the k8s service names are ever
renamed.
"""
return {
VllmComponentName.prefill_worker_k8s_name: "prefill",
VllmComponentName.decode_worker_k8s_name: "decode",
VllmComponentName.agg_worker_k8s_name: "agg",
}
def enable_vllm_benchmark_mode(config_dict: dict) -> None:
"""Set ``DYN_BENCHMARK_MODE`` on every vLLM worker in *config_dict*.
Mutates ``config_dict`` in place. Each recognised worker service
(``VllmPrefillWorker`` / ``VllmDecodeWorker`` / ``VllmWorker``) gets the
mode matching its role so its startup self-benchmark publishes
ForwardPassMetrics via the ``get_perf_metrics`` endpoint.
Idempotent: if ``DYN_BENCHMARK_MODE`` is already set (e.g. via user
overrides) the existing entry is replaced with the role-correct value.
"""
services = config_dict.get("spec", {}).get("services", {})
for svc_name, mode in _vllm_worker_roles().items():
svc = services.get(svc_name)
if svc is None:
continue
main_container = svc.setdefault("extraPodSpec", {}).setdefault(
"mainContainer", {}
)
env_list = main_container.setdefault("env", [])
# Strip any existing DYN_BENCHMARK_MODE; append canonical value.
env_list[:] = [
e
for e in env_list
if not (isinstance(e, dict) and e.get("name") == "DYN_BENCHMARK_MODE")
]
env_list.append({"name": "DYN_BENCHMARK_MODE", "value": mode})
logger.info(
"Enabled vLLM self-benchmark on service %s (DYN_BENCHMARK_MODE=%s)",
svc_name,
mode,
)
def generate_mocker_config(dgdr) -> dict:
"""Load the mocker DGD template and apply DGDR images and model paths.
......@@ -161,6 +233,7 @@ def add_planner_to_config(
config_dict: dict,
best_prefill_mapping=None,
best_decode_mapping=None,
aic_spec: Optional[AICInterpolationSpec] = None,
) -> dict:
"""Add a Planner service and its planner-config ConfigMap to *config_dict*.
......@@ -173,11 +246,15 @@ def add_planner_to_config(
config_dict: The base DGD config (real or mocker) — mutated in place.
best_prefill_mapping: Picked prefill parallel config.
best_decode_mapping: Picked decode parallel config.
aic_spec: AIC interpolation spec (rapid mode). When set, the planner
runs AIC in-process at bootstrap instead of reading NPZ files.
Returns:
The ``planner_config_cm`` ConfigMap dict.
"""
planner_cfg = _build_planner_config(dgdr, best_prefill_mapping, best_decode_mapping)
planner_cfg = _build_planner_config(
dgdr, best_prefill_mapping, best_decode_mapping, aic_spec
)
planner_cfg.profile_results_dir = PROFILE_DATA_MOUNT
planner_service = DgdPlannerServiceConfig()
......@@ -343,6 +420,7 @@ def _build_planner_config(
dgdr,
best_prefill_mapping,
best_decode_mapping,
aic_spec: Optional[AICInterpolationSpec] = None,
) -> PlannerConfig:
"""Build a PlannerConfig from the DGDR spec and picked parallel configs."""
if dgdr.features and dgdr.features.planner:
......@@ -356,9 +434,69 @@ def _build_planner_config(
if best_decode_mapping is not None:
planner_cfg.decode_engine_num_gpu = best_decode_mapping.num_gpus
if aic_spec is not None:
planner_cfg.aic_interpolation = aic_spec
return planner_cfg
def build_aic_interpolation_spec(
dgdr,
best_prefill_pick: Optional[PickedParallelConfig],
best_decode_pick: Optional[PickedParallelConfig],
isl: int,
osl: int,
sweep_max_context_length: int,
resolved_backend: str,
system: str,
prefill_interpolation_granularity: int,
decode_interpolation_granularity: int,
) -> Optional[AICInterpolationSpec]:
"""Build an ``AICInterpolationSpec`` for the planner in rapid mode.
Returns ``None`` (the planner falls through to the file-based loader) when
any of the following hold:
* planner is not enabled
* ``pre_deployment_sweeping_mode`` is not ``Rapid``
* ``throughput_scaling`` is disabled (no pre-deployment data needed)
* picks are missing
* ``resolved_backend`` is not one AIC supports as a planner bootstrap source
"""
if not is_planner_enabled(dgdr):
return None
planner = dgdr.features.planner # type: ignore[union-attr]
if not planner.enable_throughput_scaling:
return None
if planner.pre_deployment_sweeping_mode != PlannerPreDeploymentSweepMode.Rapid:
return None
if best_prefill_pick is None or best_decode_pick is None:
logger.info(
"Rapid mode but picks are missing; skipping aic_interpolation spec. "
"Planner will fall back to the file-based loader."
)
return None
if resolved_backend not in ("trtllm", "vllm", "sglang"):
logger.info(
"Rapid mode but backend %r is not supported by AIC; skipping spec.",
resolved_backend,
)
return None
return AICInterpolationSpec(
hf_id=dgdr.model,
system=system,
backend=resolved_backend,
isl=isl,
osl=osl,
sweep_max_context_length=sweep_max_context_length,
prefill_interpolation_granularity=prefill_interpolation_granularity,
decode_interpolation_granularity=decode_interpolation_granularity,
prefill_pick=best_prefill_pick,
decode_pick=best_decode_pick,
)
def _load_profiling_data(output_dir: str) -> dict:
"""Load interpolation profiling data from NPZ files."""
result: dict = {}
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Any
"""Profiler-side shim for AIConfiguratorPerfEstimator.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
The real implementation has moved to
``dynamo.planner.monitoring.aic_estimator`` so the planner can run AIC
interpolation in-process at bootstrap time. This module re-exports the
estimator for back-compat with existing profiler callers.
"""
from dynamo.planner.monitoring.aic_estimator import AIConfiguratorPerfEstimator
def _try_import_aiconfigurator():
# Lazy-import aiconfigurator because it's an optional dependency in profile_sla.py
import aiconfigurator.sdk.backends.factory
import aiconfigurator.sdk.config
import aiconfigurator.sdk.inference_session
import aiconfigurator.sdk.models
import aiconfigurator.sdk.perf_database
return aiconfigurator
class AIConfiguratorPerfEstimator:
"""
This class is used to estimate the performance of a model using aiconfigurator.
An instance of this class stores information about the model, system, and backend.
Methods can be called to estimate prefill and/or decode perf for a given ISL, OSL,
batch_size, and parallelism config.
"""
def __init__(
self,
hf_id: str, # e.g. "Qwen/Qwen3-32B"
system: str, # e.g. "h200_sxm"
backend: str, # e.g. "trtllm"
):
aiconfigurator = _try_import_aiconfigurator()
logger.info("Loading aiconfigurator database. This might take a few seconds...")
version = aiconfigurator.sdk.perf_database.get_latest_database_version(
system,
backend,
)
self.database = aiconfigurator.sdk.perf_database.get_database(
system=system,
backend=backend,
version=version,
)
if not self.database:
raise ValueError(
f"Database not found for system: {system}, backend: {backend}, version: {version}"
)
logger.info("aiconfigurator database loaded.")
self.backend = aiconfigurator.sdk.backends.factory.get_backend(backend)
self.hf_id = hf_id
def _get_model(self, **model_config_kwargs):
aiconfigurator = _try_import_aiconfigurator()
# NOTE: MOE models error out unless moe_tp_size and moe_ep_size are provided.
model_config = aiconfigurator.sdk.config.ModelConfig(**model_config_kwargs)
model = aiconfigurator.sdk.models.get_model(
self.hf_id, model_config, self.backend
)
return model
def estimate_perf(
self,
isl: int,
osl: int,
batch_size: int,
mode: str = "full",
**model_config_kwargs,
) -> dict[str, Any]:
"""
Estimate the perf of this model + system + backend + ISL/OSL/model_config
using aiconfigurator.
Args:
isl: Input sequence length
osl: Output sequence length
batch_size: Batch size
mode: Indicates what perf data to estimate.
"full": Estimate prefill and decode perf.
"prefill": Only estimate context perf.
"decode": Only estimate decode perf.
**model_config_kwargs: aiconfigurator model config kwargs
(such as tp_size, moe_tp_size, etc).
Returns:
dict: Perf metrics returned by aiconfigurator
"""
aiconfigurator = _try_import_aiconfigurator()
mode_to_aic_mode = {
"full": "static",
"prefill": "static_ctx",
"decode": "static_gen",
}
if mode not in mode_to_aic_mode:
raise ValueError(
f"Invalid mode: {mode}. Must be one of {list(mode_to_aic_mode.keys())}."
)
self.runtime_config = aiconfigurator.sdk.config.RuntimeConfig(
batch_size=batch_size,
beam_width=1,
isl=isl,
osl=osl,
)
model = self._get_model(**model_config_kwargs)
session = aiconfigurator.sdk.inference_session.InferenceSession(
model, self.database, self.backend
)
summary = session.run_static(
mode=mode_to_aic_mode[mode], runtime_config=self.runtime_config, stride=32
)
summary_df = summary.get_summary_df()
# Convert pd.Dataframe to dict since there's only one row
return summary_df.to_dict(orient="records")[0]
def estimate_prefill_perf(
self,
isl: int,
**model_config_kwargs,
) -> dict[str, Any]:
"""
Estimate the perf of this model + system + backend + etc assuming it is a prefill worker.
Args:
isl: Input sequence length
**model_config_kwargs: aiconfigurator model config kwargs
(such as tp_size, moe_tp_size, etc).
Returns:
dict: Perf metrics returned by aiconfigurator
"""
return self.estimate_perf(
isl,
5, # small osl
1, # concurrency = 1
mode="prefill",
**model_config_kwargs,
)
def get_max_batch_size(
self,
isl: int,
osl: int,
**model_config_kwargs,
) -> int:
"""
Estimate the largest batch size that would fit on this GPU.
Args:
isl: Input sequence length
osl: Output sequence length
**model_config_kwargs: aiconfigurator model config kwargs
(such as tp_size, moe_tp_size, etc).
Returns:
int: Estimated largest batch size that will fit on the system.
"""
model = self._get_model(**model_config_kwargs)
def get_mem_usage(bs: int):
# TODO: _get_memory_usage might be underestimating because
# 1. it doesn't account for runtime buffers
# 2. it calculates num_tokens = isl*bs which ignores osl
return self.backend._get_memory_usage(
model, self.database, bs, 1, isl, osl
)["total"]
max_memory_gb = self.database.system_spec["gpu"]["mem_capacity"] / (1024**3)
bs = 1
if get_mem_usage(bs) > max_memory_gb:
# Model does not fit on GPU with the given model config.
return 0
# Step 1: find upper bound on batch size.
while get_mem_usage(bs) < max_memory_gb:
bs *= 2
# We know that bs // 2 will fit on GPU but bs will not.
min_bs = bs // 2
max_bs = bs
# Step 2: binary search for max batch size that fits on GPU.
while min_bs < max_bs:
test_bs = (min_bs + max_bs) // 2
if get_mem_usage(test_bs) < max_memory_gb:
# Because of the +1, the new value of min_bs might not fit on the GPU
# even though test_bs did fit. So at the end when min_bs and max_bs converge,
# we need to remember to subtract 1 from the result.
min_bs = test_bs + 1
else:
# max_bs is always a value that doesn't fit on the GPU.
max_bs = test_bs
return min_bs - 1 # see comment above
def get_max_kv_tokens(
self,
isl: int,
osl: int,
**model_config_kwargs,
) -> int:
"""
Estimate the max number of kv cache tokens that will fit on this GPU
for the given ISL, OSL, and model config.
Args:
isl: Input sequence length
osl: Output sequence length
**model_config_kwargs: aiconfigurator model config kwargs
(such as tp_size, moe_tp_size, etc).
Returns:
int: Estimated number of KV cache tokens that will fit on the system.
"""
max_concurrency = self.get_max_batch_size(isl, osl, **model_config_kwargs)
return max_concurrency * (isl + osl)
__all__ = ["AIConfiguratorPerfEstimator"]
......@@ -22,6 +22,7 @@ from dataclasses import dataclass, field
import pandas as pd
from dynamo.planner.config.planner_config import PlannerPreDeploymentSweepMode
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
PickedParallelConfig,
)
......@@ -184,18 +185,27 @@ def is_mocker_enabled(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
def needs_profile_data(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
"""True when the DGDR requires profiling interpolation data.
"""True when the DGDR requires profiling interpolation data *at this stage*.
Profile data is consumed by mocker workers (for latency simulation)
and by the planner when throughput-based scaling is enabled.
Profile data (NPZ/JSON on disk) is consumed by:
* **Mocker workers** for latency simulation — always required when
mocker is enabled.
* **Planner** when throughput scaling is enabled — required for
thorough mode only. In rapid mode the planner now runs AIC
interpolation in-process at bootstrap (see ``aic_interpolation.py``),
so the profiler no longer emits NPZ for planner-only rapid deployments.
"""
if is_mocker_enabled(dgdr):
return True
return (
if (
dgdr.features is not None
and dgdr.features.planner is not None
and dgdr.features.planner.enable_throughput_scaling
)
):
sweep_mode = dgdr.features.planner.pre_deployment_sweeping_mode
return sweep_mode != PlannerPreDeploymentSweepMode.Rapid
return False
def determine_picking_mode(dgdr: DynamoGraphDeploymentRequestSpec) -> str:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment