Commit d688aa68 (unverified), authored by Hongkuan Zhou, committed by GitHub
Browse files

chore: better error message for planner sweeping mode (#6844)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent a60cdf59
...@@ -38,8 +38,8 @@ from dynamo.profiler.utils.dgdr_v1beta1_types import ( ...@@ -38,8 +38,8 @@ from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec, DynamoGraphDeploymentRequestSpec,
) )
from dynamo.profiler.utils.dgdr_validate import ( from dynamo.profiler.utils.dgdr_validate import (
run_gate_checks, valid_dgdr_spec,
validate_dgdr_for_profiler, validate_dgdr_dynamo_features,
) )
from dynamo.profiler.utils.profile_common import ( from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig, ProfilerOperationalConfig,
...@@ -67,6 +67,27 @@ def _check_auto_backend_support(model: str, system: str) -> bool: ...@@ -67,6 +67,27 @@ def _check_auto_backend_support(model: str, system: str) -> bool:
) )
def _needs_interpolation(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
    """Return True when interpolation data will actually be consumed.

    Only throughput-based scaling and the mocker backend use the
    per-engine performance curves produced by ``run_interpolation``.
    Load-based scaling does not require them.

    Args:
        dgdr: Request spec; only ``dgdr.features.planner`` and
            ``dgdr.features.mocker`` are inspected.

    Returns:
        True if the planner has ``enable_throughput_scaling`` set or the
        mocker is ``enabled``; False otherwise, including when
        ``dgdr.features`` is None.
    """
    features = dgdr.features
    if features is None:
        # No feature block at all: nothing can consume the curves.
        return False

    planner = features.planner
    if planner and planner.enable_throughput_scaling:
        return True

    # The mocker is only consulted when throughput scaling did not
    # already decide the answer.
    mocker = features.mocker
    return bool(mocker and mocker.enabled)
def _extract_profiler_params(dgdr: DynamoGraphDeploymentRequestSpec) -> tuple: def _extract_profiler_params(dgdr: DynamoGraphDeploymentRequestSpec) -> tuple:
"""Pull all profiler parameters from dgdr and log them.""" """Pull all profiler parameters from dgdr and log them."""
model = dgdr.model model = dgdr.model
...@@ -311,9 +332,8 @@ async def run_profile( ...@@ -311,9 +332,8 @@ async def run_profile(
) )
try: try:
# Validate and normalise — after this, required fields are guaranteed non-None # Validate DGDR spec — after this, required fields are guaranteed non-None
validate_dgdr_for_profiler(dgdr) valid_dgdr_spec(dgdr)
( (
model, model,
backend, backend,
...@@ -327,12 +347,12 @@ async def run_profile( ...@@ -327,12 +347,12 @@ async def run_profile(
search_strategy, search_strategy,
picking_mode, picking_mode,
) = _extract_profiler_params(dgdr) ) = _extract_profiler_params(dgdr)
if backend == "auto": if backend == "auto":
aic_supported = _check_auto_backend_support(model, system) aic_supported = _check_auto_backend_support(model, system)
else: else:
aic_supported = check_model_hardware_support(model, system, backend) aic_supported = check_model_hardware_support(model, system, backend)
run_gate_checks(dgdr, aic_supported, search_strategy, backend) # then validate DGDR features based on AIC support
validate_dgdr_dynamo_features(dgdr, aic_supported)
( (
pick_result, pick_result,
...@@ -361,9 +381,10 @@ async def run_profile( ...@@ -361,9 +381,10 @@ async def run_profile(
dgd_config = pick_result.get("dgd_config") if not ops.dry_run else None dgd_config = pick_result.get("dgd_config") if not ops.dry_run else None
# --------------------------------------------------------------- # ---------------------------------------------------------------
# Interpolation curves # Interpolation curves — only needed when something consumes
# the per-engine performance data (throughput scaling or mocker).
# --------------------------------------------------------------- # ---------------------------------------------------------------
if not ops.dry_run and is_planner_enabled(dgdr) and dgd_config: if not ops.dry_run and dgd_config and _needs_interpolation(dgdr):
try: try:
model_cfg = get_model_config_from_model_path(resolve_model_path(dgdr)) model_cfg = get_model_config_from_model_path(resolve_model_path(dgdr))
sweep_max_context_length = model_cfg.get("max_position_embeddings", 0) sweep_max_context_length = model_cfg.get("max_position_embeddings", 0)
......
...@@ -132,6 +132,8 @@ def _run_naive_fallback( ...@@ -132,6 +132,8 @@ def _run_naive_fallback(
pvc_mount_path=dgdr.modelCache.pvcMountPath, pvc_mount_path=dgdr.modelCache.pvcMountPath,
pvc_path=dgdr.modelCache.pvcModelPath or "", pvc_path=dgdr.modelCache.pvcModelPath or "",
) )
else:
dgd_config = config_modifier.update_model(dgd_config, model_name=model)
return { return {
"best_config_df": pd.DataFrame(), "best_config_df": pd.DataFrame(),
......
...@@ -39,7 +39,7 @@ from dynamo.profiler.utils.profile_common import is_planner_enabled ...@@ -39,7 +39,7 @@ from dynamo.profiler.utils.profile_common import is_planner_enabled
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def validate_dgdr_for_profiler( def valid_dgdr_spec(
dgdr: DynamoGraphDeploymentRequestSpec, dgdr: DynamoGraphDeploymentRequestSpec,
) -> DynamoGraphDeploymentRequestSpec: ) -> DynamoGraphDeploymentRequestSpec:
"""Validate and normalise a DGDR spec for the profiler. """Validate and normalise a DGDR spec for the profiler.
...@@ -63,7 +63,7 @@ def validate_dgdr_for_profiler( ...@@ -63,7 +63,7 @@ def validate_dgdr_for_profiler(
_validate_required_fields(dgdr) _validate_required_fields(dgdr)
_validate_workload(dgdr.workload) _validate_workload(dgdr.workload)
_validate_sla(dgdr.sla) _validate_sla(dgdr.sla)
_validate_features(dgdr) _validate_parallelization_sweeping_mode(dgdr)
return dgdr return dgdr
...@@ -124,53 +124,47 @@ def _validate_sla(sla: SLASpec) -> None: ...@@ -124,53 +124,47 @@ def _validate_sla(sla: SLASpec) -> None:
) )
def run_gate_checks( def _validate_parallelization_sweeping_mode(
dgdr: DynamoGraphDeploymentRequestSpec, dgdr: DynamoGraphDeploymentRequestSpec,
aic_supported: bool,
search_strategy: SearchStrategy,
backend: str,
) -> None: ) -> None:
"""Raise ValueError or log warnings for unsupported combos. # do not support auto backend selection for real GPU sweeping
if dgdr.searchStrategy == SearchStrategy.THOROUGH and dgdr.backend == "auto":
Must be called after ``validate_dgdr_for_profiler``.
"""
if is_planner_enabled(dgdr) and not aic_supported:
model = dgdr.model
system = dgdr.hardware.gpuSku.lower()
planner_cfg = dgdr.features.planner
if planner_cfg.enable_throughput_scaling:
raise ValueError(
"Throughput-based planner scaling requires AIC support, but "
f"{model} on {system}/{backend} is not supported by AIC. "
"Use a supported model/hardware/backend combination or disable throughput scaling."
)
if (
planner_cfg.pre_deployment_sweeping_mode
== PlannerPreDeploymentSweepMode.Rapid
):
logger.warning(
"Planner pre-deployment sweeping mode is 'rapid' but AIC does not support "
"%s on %s/%s. Falling back to 'none' (no pre-deployment sweeping).",
model,
system,
backend,
)
planner_cfg.pre_deployment_sweeping_mode = (
PlannerPreDeploymentSweepMode.None_
)
if search_strategy == SearchStrategy.THOROUGH and backend == "auto":
raise ValueError( raise ValueError(
"THOROUGH search strategy does not support 'auto' backend. " "THOROUGH search strategy does not support 'auto' backend. "
"Please specify a concrete backend (trtllm, vllm, sglang)." "Please specify a concrete backend (trtllm, vllm, sglang)."
) )
def _validate_features(dgdr: DynamoGraphDeploymentRequestSpec) -> None: def validate_dgdr_dynamo_features(
dgdr: DynamoGraphDeploymentRequestSpec, aic_supported: bool
) -> None:
"""Cross-field validation for features.""" """Cross-field validation for features."""
if not dgdr.features: if not dgdr.features:
return return
# Planner
if is_planner_enabled(dgdr):
planner_cfg = dgdr.features.planner
# throughput scaling requires in-depth profiling data
if planner_cfg.enable_throughput_scaling:
planner_sweep_mode = planner_cfg.pre_deployment_sweeping_mode
if (
planner_sweep_mode is None
or planner_sweep_mode == PlannerPreDeploymentSweepMode.None_
):
raise ValueError(
"pre_deployment_sweeping_mode in PlannerConfig cannot be 'none' when enable_throughput_scaling is enabled. "
"Throughput-based scaling requires pre-deployment sweeping to generate engine performance data."
)
elif (
planner_sweep_mode == PlannerPreDeploymentSweepMode.Rapid
and not aic_supported
):
raise ValueError(
f"AIC does not support {dgdr.model} on {dgdr.hardware.gpuSku.lower()} and {dgdr.backend}. "
"pre_deployment_sweeping_mode in PlannerConfig can only be 'thorough' when AIC does not support the model/hardware/backend combination. "
)
# Mocker requires pre-deployment sweeping # Mocker requires pre-deployment sweeping
if dgdr.features.mocker and dgdr.features.mocker.enabled and dgdr.features.planner: if dgdr.features.mocker and dgdr.features.mocker.enabled and dgdr.features.planner:
sweep_mode = dgdr.features.planner.pre_deployment_sweeping_mode sweep_mode = dgdr.features.planner.pre_deployment_sweeping_mode
......
...@@ -41,7 +41,10 @@ try: ...@@ -41,7 +41,10 @@ try:
SLASpec, SLASpec,
WorkloadSpec, WorkloadSpec,
) )
from dynamo.profiler.utils.dgdr_validate import run_gate_checks from dynamo.profiler.utils.dgdr_validate import (
valid_dgdr_spec,
validate_dgdr_dynamo_features,
)
from dynamo.profiler.utils.profile_common import ProfilerOperationalConfig from dynamo.profiler.utils.profile_common import ProfilerOperationalConfig
except ImportError as e: except ImportError as e:
pytest.skip(f"Skip (missing dependency): {e}", allow_module_level=True) pytest.skip(f"Skip (missing dependency): {e}", allow_module_level=True)
...@@ -159,123 +162,211 @@ class TestExtractProfilerParams: ...@@ -159,123 +162,211 @@ class TestExtractProfilerParams:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# run_gate_checks # valid_dgdr_spec
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class TestRunGateChecks: class TestValidDgdrSpec:
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_thorough_auto_backend_raises(self): def test_thorough_auto_backend_raises(self):
"""THOROUGH + 'auto' backend is rejected.""" """THOROUGH + 'auto' backend is rejected."""
dgdr = _make_dgdr() dgdr = _make_dgdr(searchStrategy="thorough", backend="auto")
with pytest.raises(ValueError, match="does not support 'auto' backend"): with pytest.raises(ValueError, match="does not support 'auto' backend"):
run_gate_checks( valid_dgdr_spec(dgdr)
dgdr,
aic_supported=True,
search_strategy=SearchStrategy.THOROUGH,
backend="auto",
)
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_thorough_concrete_backend_passes(self): def test_thorough_concrete_backend_passes(self):
"""THOROUGH + concrete backend is fine.""" """THOROUGH + concrete backend is fine."""
dgdr = _make_dgdr() dgdr = _make_dgdr(searchStrategy="thorough", backend="trtllm")
run_gate_checks( valid_dgdr_spec(dgdr)
dgdr,
aic_supported=True,
search_strategy=SearchStrategy.THOROUGH,
backend="trtllm",
)
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_rapid_auto_backend_passes(self): def test_rapid_auto_backend_passes(self):
"""RAPID allows 'auto' backend.""" """RAPID allows 'auto' backend."""
dgdr = _make_dgdr() dgdr = _make_dgdr(backend="auto")
run_gate_checks( valid_dgdr_spec(dgdr)
dgdr,
aic_supported=False, @pytest.mark.pre_merge
search_strategy=SearchStrategy.RAPID, @pytest.mark.gpu_0
backend="auto", def test_missing_image_raises(self):
"""image is required."""
dgdr = _make_dgdr(image="")
with pytest.raises(ValueError, match="image.*required"):
valid_dgdr_spec(dgdr)
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_missing_hardware_raises(self):
"""hardware is required."""
dgdr = _make_dgdr(hardware=None)
with pytest.raises(ValueError, match="hardware.*required"):
valid_dgdr_spec(dgdr)
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_missing_gpu_sku_raises(self):
"""hardware.gpuSku is required."""
dgdr = _make_dgdr(hardware=HardwareSpec(gpuSku="", numGpusPerNode=8))
with pytest.raises(ValueError, match="gpuSku.*required"):
valid_dgdr_spec(dgdr)
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_zero_gpus_per_node_raises(self):
"""hardware.numGpusPerNode must be positive."""
dgdr = _make_dgdr(hardware=HardwareSpec(gpuSku="h200_sxm", numGpusPerNode=0))
with pytest.raises(ValueError, match="numGpusPerNode.*positive"):
valid_dgdr_spec(dgdr)
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_none_workload_gets_default(self):
"""None workload is populated with a default WorkloadSpec."""
dgdr = _make_dgdr(workload=None)
valid_dgdr_spec(dgdr)
assert dgdr.workload is not None
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_none_sla_gets_default(self):
"""None sla is populated with a default SLASpec."""
dgdr = _make_dgdr(sla=None)
valid_dgdr_spec(dgdr)
assert dgdr.sla is not None
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_both_concurrency_and_rate_raises(self):
"""concurrency and requestRate are mutually exclusive."""
dgdr = _make_dgdr(
workload=WorkloadSpec(isl=4000, osl=1000, concurrency=10, requestRate=5.0)
) )
with pytest.raises(ValueError, match="concurrency.*requestRate"):
valid_dgdr_spec(dgdr)
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_negative_sla_ttft_raises(self):
"""Negative SLA ttft must be rejected."""
dgdr = _make_dgdr(sla=SLASpec(ttft=-1.0, itl=30.0))
with pytest.raises(ValueError, match="ttft.*positive"):
valid_dgdr_spec(dgdr)
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_no_planner_aic_unsupported_passes(self): def test_e2e_latency_clears_ttft_itl(self):
"""No planner, AIC unsupported — no error.""" """e2eLatency takes precedence and nulls out ttft/itl."""
dgdr = _make_dgdr(sla=SLASpec(ttft=None, itl=None, e2eLatency=35000.0))
valid_dgdr_spec(dgdr)
assert dgdr.sla.ttft is None
assert dgdr.sla.itl is None
assert dgdr.sla.e2eLatency == 35000.0
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_missing_ttft_and_itl_and_e2e_raises(self):
"""At least ttft+itl or e2eLatency must be provided."""
dgdr = _make_dgdr(sla=SLASpec(ttft=None, itl=None, e2eLatency=None))
with pytest.raises(ValueError, match="ttft.*itl.*e2eLatency"):
valid_dgdr_spec(dgdr)
# ---------------------------------------------------------------------------
# validate_dgdr_dynamo_features
# ---------------------------------------------------------------------------
class TestValidateDgdrDynamoFeatures:
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_no_features_passes(self):
"""No features → no error."""
dgdr = _make_dgdr() dgdr = _make_dgdr()
run_gate_checks( validate_dgdr_dynamo_features(dgdr, aic_supported=False)
dgdr,
aic_supported=False,
search_strategy=SearchStrategy.RAPID,
backend="vllm",
)
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_planner_throughput_scaling_aic_unsupported_raises(self): def test_planner_throughput_scaling_aic_unsupported_rapid_sweep_raises(self):
"""Throughput-based planner scaling requires AIC support.""" """Throughput scaling + rapid sweep + AIC unsupported is rejected."""
dgdr = _make_dgdr( dgdr = _make_dgdr(
features=FeaturesSpec( features=FeaturesSpec(
planner=_make_planner( planner=_make_planner(
enable_throughput_scaling=True, enable_throughput_scaling=True,
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
backend="vllm", backend="vllm",
) )
) )
) )
with pytest.raises( with pytest.raises(ValueError, match="AIC does not support"):
ValueError, match="Throughput-based planner scaling requires AIC support" validate_dgdr_dynamo_features(dgdr, aic_supported=False)
):
run_gate_checks(
dgdr,
aic_supported=False,
search_strategy=SearchStrategy.RAPID,
backend="vllm",
)
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_planner_rapid_sweep_aic_unsupported_mutates_to_none(self): def test_planner_throughput_scaling_aic_supported_passes(self):
"""Rapid pre-deployment sweep falls back to None when AIC is unsupported.""" """Throughput scaling + rapid sweep + AIC supported is fine."""
planner = _make_planner( planner = _make_planner(
enable_throughput_scaling=False,
enable_load_scaling=True,
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid, pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
backend="vllm",
) )
dgdr = _make_dgdr(features=FeaturesSpec(planner=planner)) dgdr = _make_dgdr(features=FeaturesSpec(planner=planner))
run_gate_checks( validate_dgdr_dynamo_features(dgdr, aic_supported=True)
dgdr,
aic_supported=False,
search_strategy=SearchStrategy.RAPID,
backend="vllm",
)
assert ( assert (
dgdr.features.planner.pre_deployment_sweeping_mode dgdr.features.planner.pre_deployment_sweeping_mode
== PlannerPreDeploymentSweepMode.None_ == PlannerPreDeploymentSweepMode.Rapid
) )
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_planner_aic_supported_no_mutation(self): def test_planner_load_scaling_only_aic_unsupported_passes(self):
"""When AIC is supported, planner config is left unchanged.""" """Load scaling only (no throughput scaling) + AIC unsupported passes."""
planner = _make_planner( planner = _make_planner(
enable_throughput_scaling=False,
enable_load_scaling=True,
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid, pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
backend="vllm",
) )
dgdr = _make_dgdr(features=FeaturesSpec(planner=planner)) dgdr = _make_dgdr(features=FeaturesSpec(planner=planner))
run_gate_checks( validate_dgdr_dynamo_features(dgdr, aic_supported=False)
dgdr,
aic_supported=True,
search_strategy=SearchStrategy.RAPID,
backend="trtllm",
)
assert ( assert (
dgdr.features.planner.pre_deployment_sweeping_mode dgdr.features.planner.pre_deployment_sweeping_mode
== PlannerPreDeploymentSweepMode.Rapid == PlannerPreDeploymentSweepMode.Rapid
) )
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_mocker_enabled_sweep_none_raises(self):
"""Mocker enabled + sweep mode None_ is rejected."""
dgdr = _make_dgdr(
features=FeaturesSpec(
planner=_make_planner(
enable_throughput_scaling=False,
enable_load_scaling=True,
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.None_,
),
mocker=MockerSpec(enabled=True),
)
)
with pytest.raises(ValueError, match="cannot be 'none'.*mocker"):
validate_dgdr_dynamo_features(dgdr, aic_supported=True)
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_mocker_enabled_sweep_rapid_passes(self):
"""Mocker enabled + sweep mode Rapid is fine."""
dgdr = _make_dgdr(
features=FeaturesSpec(
planner=_make_planner(
enable_throughput_scaling=False,
enable_load_scaling=True,
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
),
mocker=MockerSpec(enabled=True),
)
)
validate_dgdr_dynamo_features(dgdr, aic_supported=True)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# _write_final_output # _write_final_output
......
...@@ -186,9 +186,7 @@ class TestRapidUnsupported: ...@@ -186,9 +186,7 @@ class TestRapidUnsupported:
CONFIGS_DIR / "5b_rapid_unsupported_planner_throughput_error.yaml" CONFIGS_DIR / "5b_rapid_unsupported_planner_throughput_error.yaml"
) )
ops = _make_ops(tmp_path) ops = _make_ops(tmp_path)
with pytest.raises( with pytest.raises(ValueError, match="AIC does not support"):
ValueError, match="Throughput-based planner scaling requires AIC support"
):
asyncio.run(run_profile(dgdr, ops)) asyncio.run(run_profile(dgdr, ops))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment