Commit 41b357d8 (unverified), authored by Hongkuan Zhou, committed by GitHub
Browse files

fix: use profiler DGD gen route in naive fallback mode (#7099)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent 8f3188d3
...@@ -20,20 +20,35 @@ import logging ...@@ -20,20 +20,35 @@ import logging
import pandas as pd import pandas as pd
import yaml import yaml
from aiconfigurator.cli.main import _execute_task_configs, build_default_task_configs from aiconfigurator.cli.main import _execute_task_configs, build_default_task_configs
from aiconfigurator.generator.api import ( from aiconfigurator.generator.api import generate_backend_artifacts
generate_backend_artifacts,
generate_naive_config,
)
from aiconfigurator.generator.module_bridge import task_config_to_generator_config from aiconfigurator.generator.module_bridge import task_config_to_generator_config
from aiconfigurator.generator.naive import build_naive_generator_params
from aiconfigurator.sdk.task import TaskConfig, TaskRunner from aiconfigurator.sdk.task import TaskConfig, TaskRunner
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec
from dynamo.profiler.utils.profile_common import derive_backend_image from dynamo.profiler.utils.profile_common import derive_backend_image
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _build_k8s_overrides(
    dgdr: DynamoGraphDeploymentRequestSpec,
    backend: str,
) -> dict:
    """Collect the Kubernetes override values carried by a DGDR spec.

    The image entry is always present and is whatever
    ``derive_backend_image(dgdr.image, backend)`` returns. PVC entries are
    added only for ``dgdr.modelCache`` fields that hold a truthy value.

    Args:
        dgdr: Request spec supplying ``image`` and an optional ``modelCache``.
        backend: Backend name forwarded to ``derive_backend_image``.

    Returns:
        A new dict containing ``k8s_image`` and, where set,
        ``k8s_pvc_name``, ``k8s_pvc_mount_path`` and
        ``k8s_model_path_in_pvc``.
    """
    result: dict = {"k8s_image": derive_backend_image(dgdr.image, backend)}

    cache = dgdr.modelCache
    if not cache:
        # No model cache configured: the image is the only override.
        return result

    # (override key, value from the spec) pairs, in insertion order.
    pvc_fields = (
        ("k8s_pvc_name", cache.pvcName),
        ("k8s_pvc_mount_path", cache.pvcMountPath),
        ("k8s_model_path_in_pvc", cache.pvcModelPath),
    )
    result.update({key: value for key, value in pvc_fields if value})
    return result
def _generate_dgd_from_pick( def _generate_dgd_from_pick(
dgdr: DynamoGraphDeploymentRequestSpec, dgdr: DynamoGraphDeploymentRequestSpec,
best_config_df: pd.DataFrame, best_config_df: pd.DataFrame,
...@@ -61,24 +76,11 @@ def _generate_dgd_from_pick( ...@@ -61,24 +76,11 @@ def _generate_dgd_from_pick(
if "total_gpus_needed" in row.index and row["total_gpus_needed"] > 0: if "total_gpus_needed" in row.index and row["total_gpus_needed"] > 0:
tc.total_gpus = int(row["total_gpus_needed"]) tc.total_gpus = int(row["total_gpus_needed"])
generator_overrides: dict = {} k8s_overrides = _build_k8s_overrides(dgdr, tc.backend_name)
k8s_overrides: dict = {}
k8s_overrides["k8s_image"] = derive_backend_image(dgdr.image, tc.backend_name)
if dgdr.modelCache:
if dgdr.modelCache.pvcName:
k8s_overrides["k8s_pvc_name"] = dgdr.modelCache.pvcName
if dgdr.modelCache.pvcMountPath:
k8s_overrides["k8s_pvc_mount_path"] = dgdr.modelCache.pvcMountPath
if dgdr.modelCache.pvcModelPath:
k8s_overrides["k8s_model_path_in_pvc"] = dgdr.modelCache.pvcModelPath
if k8s_overrides:
generator_overrides["K8sConfig"] = k8s_overrides
cfg = task_config_to_generator_config( cfg = task_config_to_generator_config(
task_config=tc, task_config=tc,
result_df=row, result_df=row,
generator_overrides=generator_overrides or None, generator_overrides={"K8sConfig": k8s_overrides} if k8s_overrides else None,
) )
tc.total_gpus = original_total_gpus tc.total_gpus = original_total_gpus
...@@ -105,7 +107,13 @@ def _run_naive_fallback( ...@@ -105,7 +107,13 @@ def _run_naive_fallback(
system: str, system: str,
backend: str, backend: str,
) -> dict: ) -> dict:
"""Handle the AIC-unsupported path via naive config generation.""" """Handle the AIC-unsupported path via naive config generation.
Builds naive generator params (CLI args, parallelism) and then
assembles the DGD via ``build_dgd_config`` — the same route used
by the normal simulation path — so the output always uses the
clean base DGD YAMLs with actual ``command``/``args`` arrays.
"""
if backend == "auto": if backend == "auto":
backend = _DEFAULT_NAIVE_BACKEND backend = _DEFAULT_NAIVE_BACKEND
logger.info( logger.info(
...@@ -115,31 +123,32 @@ def _run_naive_fallback( ...@@ -115,31 +123,32 @@ def _run_naive_fallback(
logger.info( logger.info(
"AIC does not support this combo — falling back to naive config generation." "AIC does not support this combo — falling back to naive config generation."
) )
naive_result = generate_naive_config(model, total_gpus, system, backend)
dgd_yaml = naive_result.get("artifacts", {}).get("k8s_deploy.yaml", "") generator_params = build_naive_generator_params(
dgd_config = yaml.safe_load(dgd_yaml) if dgd_yaml else None
if dgd_config:
config_modifier = CONFIG_MODIFIERS[backend]
dgd_config = config_modifier.update_image(
dgd_config, derive_backend_image(dgdr.image, backend)
)
if dgdr.modelCache and dgdr.modelCache.pvcName:
dgd_config = config_modifier.update_model_from_pvc(
dgd_config,
model_name=model, model_name=model,
pvc_name=dgdr.modelCache.pvcName, total_gpus=total_gpus,
pvc_mount_path=dgdr.modelCache.pvcMountPath, system_name=system,
pvc_path=dgdr.modelCache.pvcModelPath or "", backend_name=backend,
) )
else:
dgd_config = config_modifier.update_model(dgd_config, model_name=model) k8s_overrides = _build_k8s_overrides(dgdr, backend)
generator_params.setdefault("K8sConfig", {}).update(k8s_overrides)
# Generate DGD through the dynamo config modifier (build_dgd_config),
# which loads the clean base YAML and produces proper command/args arrays.
artifacts = generate_backend_artifacts(
params=generator_params,
backend=backend,
use_dynamo_generator=True,
)
dgd_yaml = artifacts.get("k8s_deploy.yaml", "")
dgd_config = yaml.safe_load(dgd_yaml) if dgd_yaml else None
return { return {
"best_config_df": pd.DataFrame(), "best_config_df": pd.DataFrame(),
"best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0}, "best_latencies": {"ttft": 0.0, "tpot": 0.0, "request_latency": 0.0},
"dgd_config": dgd_config, "dgd_config": dgd_config,
"chosen_exp": "agg", # AIC's naive route always generate agg config "chosen_exp": "agg",
} }
......
...@@ -8,9 +8,10 @@ helpers (_run_autoscale_sim) require the full AIC stack and are covered by ...@@ -8,9 +8,10 @@ helpers (_run_autoscale_sim) require the full AIC stack and are covered by
the end-to-end test suite. the end-to-end test suite.
""" """
import copy
import sys import sys
from pathlib import Path from pathlib import Path
from unittest.mock import MagicMock, patch from unittest.mock import patch
import pandas as pd import pandas as pd
import pytest import pytest
...@@ -49,27 +50,35 @@ def _make_dgdr(**overrides) -> DynamoGraphDeploymentRequestSpec: ...@@ -49,27 +50,35 @@ def _make_dgdr(**overrides) -> DynamoGraphDeploymentRequestSpec:
return DynamoGraphDeploymentRequestSpec(**base) return DynamoGraphDeploymentRequestSpec(**base)
def _fake_modifier(update_image_return=None):
m = MagicMock()
m.update_image.return_value = update_image_return or {"kind": "DGD"}
m.update_model_from_pvc.return_value = {"kind": "DGD"}
return m
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# _run_naive_fallback # _run_naive_fallback
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_FAKE_GENERATOR_PARAMS: dict = {"params": {"agg": {}}, "K8sConfig": {}}
class TestRunNaiveFallback: class TestRunNaiveFallback:
"""Tests for the naive fallback path.
The naive path calls build_naive_generator_params to compute CLI args /
parallelism, then generate_backend_artifacts(use_dynamo_generator=True)
to assemble the DGD via the config modifier system.
"""
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_returns_expected_structure(self): def test_returns_expected_structure(self):
"""Result always has the four required keys with zeroed latencies.""" """Result always has the four required keys with zeroed latencies."""
dgdr = _make_dgdr() dgdr = _make_dgdr()
with patch( with (
"dynamo.profiler.rapid.generate_naive_config", patch(
return_value={"artifacts": {}}, "dynamo.profiler.rapid.build_naive_generator_params",
return_value=copy.deepcopy(_FAKE_GENERATOR_PARAMS),
),
patch(
"dynamo.profiler.rapid.generate_backend_artifacts",
return_value={},
),
): ):
result = _run_naive_fallback(dgdr, "Qwen/Qwen3-32B", 4, "l40s", "vllm") result = _run_naive_fallback(dgdr, "Qwen/Qwen3-32B", 4, "l40s", "vllm")
...@@ -93,17 +102,23 @@ class TestRunNaiveFallback: ...@@ -93,17 +102,23 @@ class TestRunNaiveFallback:
def test_empty_artifacts_yields_none_dgd_config(self): def test_empty_artifacts_yields_none_dgd_config(self):
"""No k8s_deploy.yaml in artifacts → dgd_config is None.""" """No k8s_deploy.yaml in artifacts → dgd_config is None."""
dgdr = _make_dgdr() dgdr = _make_dgdr()
with patch( with (
"dynamo.profiler.rapid.generate_naive_config", patch(
return_value={"artifacts": {}}, "dynamo.profiler.rapid.build_naive_generator_params",
return_value=copy.deepcopy(_FAKE_GENERATOR_PARAMS),
),
patch(
"dynamo.profiler.rapid.generate_backend_artifacts",
return_value={},
),
): ):
result = _run_naive_fallback(dgdr, "Qwen/Qwen3-32B", 4, "l40s", "vllm") result = _run_naive_fallback(dgdr, "Qwen/Qwen3-32B", 4, "l40s", "vllm")
assert result["dgd_config"] is None assert result["dgd_config"] is None
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_with_pvc_calls_update_model_from_pvc(self): def test_with_pvc_passes_pvc_overrides(self):
"""When modelCache.pvcName is set, update_model_from_pvc is called.""" """When modelCache.pvcName is set, PVC overrides are injected into generator params."""
dgdr = _make_dgdr( dgdr = _make_dgdr(
modelCache=ModelCacheSpec( modelCache=ModelCacheSpec(
pvcName="model-cache", pvcName="model-cache",
...@@ -111,34 +126,59 @@ class TestRunNaiveFallback: ...@@ -111,34 +126,59 @@ class TestRunNaiveFallback:
pvcMountPath="/opt/model-cache", pvcMountPath="/opt/model-cache",
) )
) )
fake_modifier = _fake_modifier() captured_params = {}
def fake_generate(params, backend, use_dynamo_generator=False):
captured_params.update(params)
return {
"k8s_deploy.yaml": "kind: DGD\nmetadata:\n name: test\nspec:\n services: {}"
}
with ( with (
patch( patch(
"dynamo.profiler.rapid.generate_naive_config", "dynamo.profiler.rapid.build_naive_generator_params",
return_value={"artifacts": {"k8s_deploy.yaml": "kind: DGD"}}, return_value=copy.deepcopy(_FAKE_GENERATOR_PARAMS),
),
patch(
"dynamo.profiler.rapid.generate_backend_artifacts",
side_effect=fake_generate,
), ),
patch("dynamo.profiler.rapid.CONFIG_MODIFIERS", {"vllm": fake_modifier}),
): ):
_run_naive_fallback(dgdr, "Qwen/Qwen3-32B", 4, "l40s", "vllm") _run_naive_fallback(dgdr, "Qwen/Qwen3-32B", 4, "l40s", "vllm")
fake_modifier.update_model_from_pvc.assert_called_once() k8s = captured_params.get("K8sConfig", {})
assert k8s.get("k8s_pvc_name") == "model-cache"
assert k8s.get("k8s_pvc_mount_path") == "/opt/model-cache"
assert k8s.get("k8s_model_path_in_pvc") == "/model/qwen"
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_without_pvc_skips_update_model_from_pvc(self): def test_without_pvc_has_no_pvc_overrides(self):
"""When no modelCache, update_model_from_pvc is not called.""" """When no modelCache, PVC keys are absent from generator params."""
dgdr = _make_dgdr() dgdr = _make_dgdr()
fake_modifier = _fake_modifier() captured_params = {}
def fake_generate(params, backend, use_dynamo_generator=False):
captured_params.update(params)
return {
"k8s_deploy.yaml": "kind: DGD\nmetadata:\n name: test\nspec:\n services: {}"
}
with ( with (
patch( patch(
"dynamo.profiler.rapid.generate_naive_config", "dynamo.profiler.rapid.build_naive_generator_params",
return_value={"artifacts": {"k8s_deploy.yaml": "kind: DGD"}}, return_value=copy.deepcopy(_FAKE_GENERATOR_PARAMS),
),
patch(
"dynamo.profiler.rapid.generate_backend_artifacts",
side_effect=fake_generate,
), ),
patch("dynamo.profiler.rapid.CONFIG_MODIFIERS", {"vllm": fake_modifier}),
): ):
_run_naive_fallback(dgdr, "Qwen/Qwen3-32B", 4, "l40s", "vllm") _run_naive_fallback(dgdr, "Qwen/Qwen3-32B", 4, "l40s", "vllm")
fake_modifier.update_model_from_pvc.assert_not_called() k8s = captured_params.get("K8sConfig", {})
assert "k8s_pvc_name" not in k8s
assert "k8s_pvc_mount_path" not in k8s
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment