Unverified commit 50818575, authored by hhzhang16, committed by GitHub
Browse files

fix: propagate resolved backend and skip interpolation for aggregated configs (#7106)


Signed-off-by: Hannah Zhang <hannahz@nvidia.com>
parent 0d5df9f6
...@@ -337,35 +337,49 @@ async def run_profile( ...@@ -337,35 +337,49 @@ async def run_profile(
) )
dgd_config = pick_result.get("dgd_config") if not ops.dry_run else None dgd_config = pick_result.get("dgd_config") if not ops.dry_run else None
resolved_backend = pick_result.get("resolved_backend", backend)
# --------------------------------------------------------------- # ---------------------------------------------------------------
# Interpolation curves — only needed when something consumes # Interpolation curves — only needed when something consumes
# the per-engine performance data (throughput scaling or mocker). # the per-engine performance data (throughput scaling or mocker).
# --------------------------------------------------------------- # ---------------------------------------------------------------
chosen_exp = pick_result.get("chosen_exp", "")
is_disagg_config = chosen_exp not in ("agg",) and bool(chosen_exp)
if not ops.dry_run and dgd_config and needs_profile_data(dgdr): if not ops.dry_run and dgd_config and needs_profile_data(dgdr):
try: if not is_disagg_config:
model_cfg = get_model_config_from_model_path(resolve_model_path(dgdr)) logger.info(
sweep_max_context_length = model_cfg.get("max_position_embeddings", 0) "Picked config is aggregated (chosen_exp=%r) — "
except Exception: "skipping interpolation (requires disaggregated config).",
logger.warning("Could not fetch model max context length.") chosen_exp,
sweep_max_context_length = 0 )
if not sweep_max_context_length: else:
sweep_max_context_length = isl * 2 if isl > 0 else 8192 try:
model_cfg = get_model_config_from_model_path(
await run_interpolation( resolve_model_path(dgdr)
dgdr, )
ops, sweep_max_context_length = model_cfg.get(
dgd_config, "max_position_embeddings", 0
best_prefill_config, )
best_decode_config, except Exception:
model, logger.warning("Could not fetch model max context length.")
system, sweep_max_context_length = 0
backend, if not sweep_max_context_length:
isl, sweep_max_context_length = isl * 2 if isl > 0 else 8192
osl,
sweep_max_context_length, await run_interpolation(
deployment_clients, dgdr,
) ops,
dgd_config,
best_prefill_config,
best_decode_config,
model,
system,
resolved_backend,
isl,
osl,
sweep_max_context_length,
deployment_clients,
)
# --------------------------------------------------------------- # ---------------------------------------------------------------
# Final DGD assembly # Final DGD assembly
......
...@@ -26,7 +26,10 @@ from aiconfigurator.generator.naive import build_naive_generator_params ...@@ -26,7 +26,10 @@ from aiconfigurator.generator.naive import build_naive_generator_params
from aiconfigurator.sdk.task import TaskConfig, TaskRunner from aiconfigurator.sdk.task import TaskConfig, TaskRunner
from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec
from dynamo.profiler.utils.profile_common import derive_backend_image from dynamo.profiler.utils.profile_common import (
derive_backend_image,
needs_profile_data,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -149,6 +152,7 @@ def _run_naive_fallback( ...@@ -149,6 +152,7 @@ def _run_naive_fallback(
"best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0}, "best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0},
"dgd_config": dgd_config, "dgd_config": dgd_config,
"chosen_exp": "agg", "chosen_exp": "agg",
"resolved_backend": backend,
} }
...@@ -243,6 +247,28 @@ def _run_default_sim( ...@@ -243,6 +247,28 @@ def _run_default_sim(
**load_kwargs, **load_kwargs,
) )
# When interpolation data is needed (mocker or throughput-scaling), a
# disaggregated config is required. If AIC picked an aggregated config,
# override to the best available disaggregated alternative so that
# run_interpolation() can run successfully downstream.
if chosen == "agg" and needs_profile_data(dgdr):
disagg_key = next(
(k for k in best_configs if "disagg" in k and not best_configs[k].empty),
None,
)
if disagg_key:
logger.info(
"AIC picked aggregated config but interpolation data is required — "
"overriding to '%s' to support mocker/throughput-scaling.",
disagg_key,
)
chosen = disagg_key
else:
logger.warning(
"AIC picked aggregated config and no disaggregated alternative "
"is available; interpolation data will be skipped."
)
best_config_df = best_configs.get(chosen, pd.DataFrame()) best_config_df = best_configs.get(chosen, pd.DataFrame())
best_latencies = best_latencies_map.get( best_latencies = best_latencies_map.get(
chosen, {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0} chosen, {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0}
......
...@@ -581,3 +581,355 @@ class TestAssembleFinalConfig: ...@@ -581,3 +581,355 @@ class TestAssembleFinalConfig:
mock_planner.assert_not_called() mock_planner.assert_not_called()
mock_profile.assert_called_once() mock_profile.assert_called_once()
assert result == [profile_cm, mocker_base] assert result == [profile_cm, mocker_base]
# ---------------------------------------------------------------------------
# Regression tests: naive fallback resolved_backend propagation (bug fix)
# ---------------------------------------------------------------------------
class TestNaiveFallbackResolvedBackend:
    """Regression tests for the 'auto' backend KeyError bug.

    When backend='auto', _run_naive_fallback resolves it to a concrete
    backend (e.g. 'vllm') and must expose that in the result dict so
    run_profile() can pass the concrete name to run_interpolation().
    """

    @staticmethod
    def _invoke_naive_fallback(backend):
        """Call ``_run_naive_fallback`` for *backend* with generation stubbed out.

        ``build_naive_generator_params`` and ``generate_backend_artifacts`` are
        patched to return empty dicts so that only the shape of the returned
        result dict is exercised. The calling test is skipped when the
        profiler package cannot be imported.

        Args:
            backend: Backend name used both for the DGDR spec and for the
                ``backend`` keyword of ``_run_naive_fallback``.

        Returns:
            The dict returned by ``_run_naive_fallback``.
        """
        try:
            from unittest.mock import patch

            from dynamo.profiler.rapid import _run_naive_fallback
        except ImportError as e:
            pytest.skip(f"Missing dependency: {e}")

        dgdr = _make_dgdr(backend=backend)
        with (
            patch(
                "dynamo.profiler.rapid.build_naive_generator_params",
                return_value={},
            ),
            patch(
                "dynamo.profiler.rapid.generate_backend_artifacts",
                return_value={},
            ),
        ):
            return _run_naive_fallback(
                dgdr,
                model="Qwen/Qwen3-32B",
                total_gpus=8,
                system="h200_sxm",
                backend=backend,
            )

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    @pytest.mark.parallel
    def test_naive_fallback_resolved_backend_auto(self):
        """_run_naive_fallback sets 'resolved_backend' to the concrete backend
        when the input backend is 'auto'."""
        result = self._invoke_naive_fallback("auto")

        # The resolved backend must be a concrete name, not 'auto'
        assert (
            "resolved_backend" in result
        ), "result dict must contain 'resolved_backend' key"
        resolved = result["resolved_backend"]
        assert (
            resolved != "auto"
        ), f"resolved_backend must not be 'auto', got {resolved!r}"
        assert resolved in (
            "vllm",
            "sglang",
            "trtllm",
        ), f"resolved_backend must be a concrete backend, got {resolved!r}"

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    @pytest.mark.parallel
    def test_naive_fallback_resolved_backend_concrete(self):
        """_run_naive_fallback preserves the concrete backend in 'resolved_backend'
        when a concrete backend (e.g. 'vllm') is passed directly."""
        result = self._invoke_naive_fallback("vllm")
        assert result.get("resolved_backend") == "vllm"

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    @pytest.mark.parallel
    def test_naive_fallback_chosen_exp_is_agg(self):
        """_run_naive_fallback always returns chosen_exp='agg' (aggregated config)."""
        result = self._invoke_naive_fallback("vllm")
        assert result.get("chosen_exp") == "agg"
# ---------------------------------------------------------------------------
# Regression tests: run_profile skips interpolation for aggregated configs
# ---------------------------------------------------------------------------
class TestRunProfileSkipsInterpolationForAggConfig:
    """Regression tests for the aggregated-config crash in run_interpolation.

    When the picked DGD config is aggregated (chosen_exp='agg'), run_profile()
    must not call run_interpolation(), which only works with disaggregated
    configs that have separate prefill and decode services.
    """

    @staticmethod
    def _make_pick_result(dgd_config, chosen_exp):
        """Build a strategy pick result whose backend was resolved to 'vllm'."""
        return {
            "best_config_df": None,
            "best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0},
            "dgd_config": dgd_config,
            "chosen_exp": chosen_exp,
            "resolved_backend": "vllm",
        }

    @staticmethod
    def _run_profile_with_pick_result(tmp_path, pick_result):
        """Drive ``run_profile`` with every collaborator patched out.

        The DGDR requests backend 'auto' with throughput scaling enabled, and
        ``_execute_strategy`` is stubbed to hand back *pick_result*. The
        calling test is skipped when a profiler/planner module is missing.

        Args:
            tmp_path: pytest temporary directory passed to ``_make_ops``.
            pick_result: Dict returned as the first element of the stubbed
                ``_execute_strategy`` result; its ``"dgd_config"`` entry is
                also used as the stubbed ``assemble_final_config`` result.

        Returns:
            The ``AsyncMock`` that replaced ``run_interpolation``, so callers
            can assert on how (or whether) it was awaited.
        """
        try:
            import asyncio
            from unittest.mock import AsyncMock, patch

            from dynamo.planner.utils.planner_config import (
                PlannerPreDeploymentSweepMode,
            )
            from dynamo.profiler.profile_sla import run_profile

            # Imported inside the guard so a missing module skips the test
            # instead of erroring while the patch stack is being built.
            from dynamo.profiler.utils.defaults import SearchStrategy
            from dynamo.profiler.utils.dgdr_v1beta1_types import FeaturesSpec
        except ImportError as e:
            pytest.skip(f"Missing dependency: {e}")

        dgdr = _make_dgdr(
            backend="auto",
            features=FeaturesSpec(
                planner=_make_planner(
                    enable_throughput_scaling=True,
                    pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
                )
            ),
        )
        ops = _make_ops(tmp_path)
        os.makedirs(ops.output_dir, exist_ok=True)

        profile_sla = "dynamo.profiler.profile_sla"
        # NOTE(review): positional layout must match what run_profile unpacks
        # from _extract_profiler_params — confirm against that helper.
        profiler_params = (
            "Qwen/Qwen3-32B",
            "auto",
            "h200_sxm",
            8,
            4000,
            1000,
            None,
            2000.0,
            50.0,
            SearchStrategy.RAPID,
            "autoscale",
        )
        strategy_result = (
            pick_result,
            PickedParallelConfig(tp=1),
            PickedParallelConfig(tp=1),
            2000.0,
            50.0,
        )

        with (
            patch(
                f"{profile_sla}._extract_profiler_params",
                return_value=profiler_params,
            ),
            patch(f"{profile_sla}.check_model_hardware_support", return_value=False),
            patch(f"{profile_sla}._check_auto_backend_support", return_value=False),
            patch(f"{profile_sla}.validate_dgdr_dynamo_features"),
            patch(
                f"{profile_sla}._execute_strategy",
                new_callable=AsyncMock,
                return_value=strategy_result,
            ),
            patch(
                f"{profile_sla}.run_interpolation", new_callable=AsyncMock
            ) as mock_interp,
            patch(
                f"{profile_sla}.assemble_final_config",
                return_value=pick_result["dgd_config"],
            ),
            # Force the "profile data is needed" condition so that only the
            # agg/disagg decision controls whether interpolation runs.
            patch(f"{profile_sla}.needs_profile_data", return_value=True),
            # Model config lookup fails on purpose; run_profile is expected to
            # fall back to a default sweep context length.
            patch(
                f"{profile_sla}.get_model_config_from_model_path",
                side_effect=Exception("no model"),
            ),
            patch(
                f"{profile_sla}.cleanup_remaining_deployments", new_callable=AsyncMock
            ),
            patch(f"{profile_sla}.valid_dgdr_spec"),
        ):
            asyncio.run(run_profile(dgdr, ops))

        return mock_interp

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    @pytest.mark.parallel
    def test_run_profile_skips_interpolation_when_agg(self, tmp_path):
        """run_profile skips run_interpolation when chosen_exp='agg'."""
        # Simulate naive fallback result: agg config, resolved backend
        agg_dgd = {
            "metadata": {"name": "vllm-agg"},
            "spec": {"services": {"Frontend": {}, "VllmWorker": {}}},
        }
        pick_result = self._make_pick_result(agg_dgd, "agg")

        mock_interp = self._run_profile_with_pick_result(tmp_path, pick_result)

        # run_interpolation must NOT have been called for agg configs
        mock_interp.assert_not_called()

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    @pytest.mark.parallel
    def test_run_profile_calls_interpolation_with_resolved_backend_for_disagg(
        self, tmp_path
    ):
        """run_profile passes the concrete resolved backend (not 'auto') to
        run_interpolation when the picked config is disaggregated."""
        # Simulate AIC disagg result with 'auto' resolved to 'vllm'
        disagg_dgd = {
            "metadata": {"name": "vllm-disagg"},
            "spec": {
                "services": {
                    "Frontend": {},
                    "VllmPrefillWorker": {},
                    "VllmDecodeWorker": {},
                }
            },
        }
        pick_result = self._make_pick_result(disagg_dgd, "disagg")

        mock_interp = self._run_profile_with_pick_result(tmp_path, pick_result)

        # run_interpolation must be called, and with the resolved 'vllm' backend, not 'auto'
        mock_interp.assert_called_once()
        interp_call = mock_interp.call_args
        # backend is the 8th positional argument (index 7); fall back to the
        # keyword form when fewer positionals were passed, so a signature
        # change yields a readable assertion failure instead of an IndexError.
        called_backend = (
            interp_call.args[7]
            if len(interp_call.args) > 7
            else interp_call.kwargs.get("backend")
        )
        assert (
            called_backend == "vllm"
        ), f"run_interpolation must be called with resolved backend 'vllm', got {called_backend!r}"
...@@ -23,7 +23,9 @@ try: ...@@ -23,7 +23,9 @@ try:
from dynamo.profiler.rapid import _run_default_sim, _run_naive_fallback from dynamo.profiler.rapid import _run_default_sim, _run_naive_fallback
from dynamo.profiler.utils.dgdr_v1beta1_types import ( from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec, DynamoGraphDeploymentRequestSpec,
FeaturesSpec,
HardwareSpec, HardwareSpec,
MockerSpec,
ModelCacheSpec, ModelCacheSpec,
SLASpec, SLASpec,
WorkloadSpec, WorkloadSpec,
...@@ -331,3 +333,85 @@ class TestRunDefaultSim: ...@@ -331,3 +333,85 @@ class TestRunDefaultSim:
assert result["best_latencies"]["ttft"] == 123.0 assert result["best_latencies"]["ttft"] == 123.0
assert result["best_latencies"]["tpot"] == 7.0 assert result["best_latencies"]["tpot"] == 7.0
# ---------------------------------------------------------------------------
# Force-disagg when interpolation data is needed
# ---------------------------------------------------------------------------
class TestRunDefaultSimForceDisagg:
    """Checks the agg -> disagg override performed by ``_run_default_sim``.

    If the DGDR needs interpolation data (mocker or throughput-scaling) and
    AIC selects an aggregated config, the selection is expected to switch to
    an available disaggregated config; otherwise the pick is left alone.
    """

    def _call_default_sim(self, dgdr, execute_return_value):
        """Invoke ``_run_default_sim`` with task building, execution and DGD
        generation stubbed; execution yields *execute_return_value*."""
        stub_task_configs = patch(
            "dynamo.profiler.rapid.build_default_task_configs", return_value={}
        )
        stub_execution = patch(
            "dynamo.profiler.rapid._execute_task_configs",
            return_value=execute_return_value,
        )
        stub_dgd_generation = patch(
            "dynamo.profiler.rapid._generate_dgd_from_pick", return_value=None
        )
        with stub_task_configs, stub_execution, stub_dgd_generation:
            outcome = _run_default_sim(
                dgdr,
                "Qwen/Qwen3-32B",
                "h200_sxm",
                "trtllm",
                8,
                4000,
                1000,
                2000.0,
                50.0,
                None,
                "default",
            )
        return outcome

    def _both_configs(self, chosen="agg"):
        """Build an execution result offering both an agg and a disagg config."""
        experiments = ("agg", "disagg")
        frames = {name: pd.DataFrame([{"tp(p)": 1}]) for name in experiments}
        # A single latency dict is shared by both experiments.
        shared_latencies = {"ttft": 100.0, "tpot": 10.0, "request_latency": 0.0}
        latency_map = dict.fromkeys(experiments, shared_latencies)
        return (chosen, frames, None, None, latency_map)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_mocker_enabled_agg_picked_overrides_to_disagg(self):
        """Mocker on + agg pick: the chosen experiment becomes disagg."""
        mocker_dgdr = _make_dgdr(features=FeaturesSpec(mocker=MockerSpec(enabled=True)))
        outcome = self._call_default_sim(
            mocker_dgdr, self._both_configs(chosen="agg")
        )
        assert outcome["chosen_exp"] == "disagg"

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_no_profile_data_needed_agg_pick_preserved(self):
        """No interpolation data required: the agg pick stays agg."""
        plain_dgdr = _make_dgdr()  # no mocker, no throughput scaling
        outcome = self._call_default_sim(plain_dgdr, self._both_configs(chosen="agg"))
        assert outcome["chosen_exp"] == "agg"

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_mocker_enabled_disagg_picked_unchanged(self):
        """Mocker on + disagg already picked: nothing is overridden."""
        mocker_dgdr = _make_dgdr(features=FeaturesSpec(mocker=MockerSpec(enabled=True)))
        outcome = self._call_default_sim(
            mocker_dgdr, self._both_configs(chosen="disagg")
        )
        assert outcome["chosen_exp"] == "disagg"

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_mocker_enabled_agg_only_available_keeps_agg(self):
        """Mocker on + agg pick + no disagg candidate: agg is kept."""
        mocker_dgdr = _make_dgdr(features=FeaturesSpec(mocker=MockerSpec(enabled=True)))
        only_frames = {"agg": pd.DataFrame([{"tp(p)": 1}])}
        only_latencies = {
            "agg": {"ttft": 100.0, "tpot": 10.0, "request_latency": 0.0}
        }
        agg_only = ("agg", only_frames, None, None, only_latencies)
        outcome = self._call_default_sim(mocker_dgdr, agg_only)
        assert outcome["chosen_exp"] == "agg"
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment