Unverified commit 55a949cb, authored by Hongkuan Zhou, committed by GitHub
Browse files

feat(profiler): wire mocker-rapid to direct AIC flags, drop profiler AIC interp (#8455)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent cc583b2f
......@@ -285,6 +285,16 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
default=None,
help="AIC system name (e.g., 'h200_sxm'). Used with --aic-perf-model.",
)
parser.add_argument(
"--aic-backend",
type=str,
default=None,
choices=["vllm", "sglang", "trtllm"],
help="AIC backend name used for perf database lookups. When unset, "
"falls back to --engine-type. Set this to decouple the AIC perf model "
"from the simulated engine type (e.g. simulate with vllm while using "
"trtllm AIC data).",
)
parser.add_argument(
"--aic-backend-version",
type=str,
......
......@@ -58,7 +58,11 @@ def build_mocker_engine_args(args: argparse.Namespace) -> MockEngineArgs:
aic_moe_ep_size = None
aic_attention_dp_size = None
if getattr(args, "aic_perf_model", False):
aic_backend = getattr(args, "engine_type", None) or "vllm"
aic_backend = (
getattr(args, "aic_backend", None)
or getattr(args, "engine_type", None)
or "vllm"
)
aic_system = getattr(args, "aic_system", None)
aic_backend_version = getattr(args, "aic_backend_version", None)
aic_tp_size = getattr(args, "aic_tp_size", None)
......
......@@ -53,6 +53,7 @@ def make_args(**overrides):
"sglang_schedule_conservativeness": None,
"aic_perf_model": False,
"aic_system": None,
"aic_backend": None,
"aic_backend_version": None,
"aic_tp_size": None,
"aic_moe_tp_size": None,
......@@ -239,6 +240,22 @@ def test_build_mocker_engine_args_preserves_cli_mapped_fields(tmp_path):
assert "has_perf_model" not in payload
def test_aic_backend_override_decouples_from_engine_type():
    """An explicit ``aic_backend`` must not be replaced by ``engine_type``.

    Builds mocker engine args with ``engine_type="vllm"`` and
    ``aic_backend="trtllm"`` and checks that the serialized payload keeps
    the two values distinct.
    """
    overrides = {
        "engine_type": "vllm",
        "aic_perf_model": True,
        "aic_system": "h200_sxm",
        "aic_backend": "trtllm",
        "aic_tp_size": 4,
    }
    mock_engine_args = CONFIG.build_mocker_engine_args(make_args(**overrides))
    dumped = json.loads(mock_engine_args.dump_json())

    # The simulated engine stays vllm; only the AIC lookup backend differs.
    assert dumped["engine_type"] == "vllm"
    assert dumped["aic_backend"] == "trtllm"
def test_mock_engine_args_from_json_ignores_legacy_has_perf_model_field():
payload = {
"engine_type": "vllm",
......
......@@ -13,7 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interpolation curve generation for planner pre-deployment sweeping."""
"""Real-GPU interpolation curve generation for planner thorough-mode sweeps.
Rapid-mode interpolation is no longer generated by the profiler: the planner
runs AIConfigurator in-process at bootstrap (``planner/monitoring/aic_interpolation.py``)
and the mocker pulls AIC perf data at runtime via ``--aic-perf-model`` flags
injected by :func:`dynamo.profiler.utils.dgd_generation.generate_mocker_config`.
This module only handles the thorough path (real GPUs → NPZ on disk).
"""
import logging
import os
......@@ -30,19 +37,12 @@ from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
)
from dynamo.profiler.utils.defaults import EngineType
from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec
from dynamo.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig,
inject_tolerations_into_dgd,
)
from dynamo.profiler.utils.profile_decode import (
profile_decode,
profile_decode_aiconfigurator,
)
from dynamo.profiler.utils.profile_prefill import (
profile_prefill,
profile_prefill_aiconfigurator,
)
from dynamo.profiler.utils.profile_decode import profile_decode
from dynamo.profiler.utils.profile_prefill import profile_prefill
logger = logging.getLogger(__name__)
......@@ -53,19 +53,17 @@ async def run_interpolation(
disagg_config: dict,
best_prefill_config: PickedParallelConfig,
best_decode_config: PickedParallelConfig,
model: str,
system: str,
backend: str,
isl: int,
osl: int,
sweep_max_context_length: int,
deployment_clients: list[DynamoDeploymentClient],
job_tolerations: list | None = None,
) -> None:
"""Generate interpolation curves for the planner based on sweep mode.
"""Generate real-GPU interpolation curves for thorough-mode deployments.
Takes the output disagg DGD config and uses ``convert_config`` to strip
it down to standalone prefill / decode engines for profiling.
it down to standalone prefill / decode engines for profiling. Rapid mode
short-circuits here because its interpolation is now handled by the
planner (AIC in-process) and the mocker (``--aic-perf-model`` at runtime).
"""
planner_cfg = (
dgdr.features.planner if (dgdr.features and dgdr.features.planner) else None
......@@ -74,9 +72,11 @@ async def run_interpolation(
if planner_cfg and planner_cfg.pre_deployment_sweeping_mode:
sweep_mode = planner_cfg.pre_deployment_sweeping_mode
if sweep_mode == PlannerPreDeploymentSweepMode.None_:
if sweep_mode != PlannerPreDeploymentSweepMode.Thorough:
logger.info(
"Planner pre-deployment sweeping is disabled — skipping interpolation."
"Skipping real-GPU interpolation for sweep_mode=%s; rapid-mode "
"consumers (planner, mocker) use AIC at runtime.",
sweep_mode,
)
return
......@@ -97,22 +97,6 @@ async def run_interpolation(
with open(prefill_config_fn, "w") as f:
yaml.dump(prefill_config, f)
if sweep_mode == PlannerPreDeploymentSweepMode.Rapid:
logger.info("Using AIC simulation for prefill interpolation.")
estimator = AIConfiguratorPerfEstimator(
hf_id=model,
system=system.lower(),
backend=backend,
)
profile_prefill_aiconfigurator(
work_dir,
best_prefill_gpus,
sweep_max_context_length,
ops.prefill_interpolation_granularity,
estimator,
tp_size=best_prefill_config.tp_size,
)
elif sweep_mode == PlannerPreDeploymentSweepMode.Thorough:
logger.info("Using real GPUs for prefill interpolation.")
frontend_port = config_modifier.get_port(prefill_config)
client = DynamoDeploymentClient(
......@@ -161,30 +145,6 @@ async def run_interpolation(
with open(decode_config_fn, "w") as f:
yaml.dump(decode_config, f)
if sweep_mode == PlannerPreDeploymentSweepMode.Rapid:
logger.info("Using AIC simulation for decode interpolation.")
estimator = AIConfiguratorPerfEstimator(
hf_id=model,
system=system.lower(),
backend=backend,
)
attention_dp_size = best_decode_config.dp
max_kv_tokens = estimator.get_max_kv_tokens(
isl,
osl,
tp_size=best_decode_config.tp_size,
)
profile_decode_aiconfigurator(
work_dir,
best_decode_gpus,
max_kv_tokens,
sweep_max_context_length,
ops.decode_interpolation_granularity,
estimator,
attention_dp_size,
tp_size=best_decode_config.tp_size,
)
elif sweep_mode == PlannerPreDeploymentSweepMode.Thorough:
logger.info("Using real GPUs for decode interpolation.")
frontend_port = config_modifier.get_port(decode_config)
client = DynamoDeploymentClient(
......
......@@ -414,11 +414,7 @@ async def run_profile(
dgd_config,
best_prefill_config,
best_decode_config,
model,
system,
resolved_backend,
isl,
osl,
sweep_max_context_length,
deployment_clients,
job_tolerations=job_tolerations,
......
......@@ -14,12 +14,14 @@ try:
)
from dynamo.profiler.utils.dgd_generation import (
_build_planner_config,
_inject_mocker_aic_args,
build_aic_interpolation_spec,
enable_vllm_benchmark_mode,
)
from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec,
FeaturesSpec,
MockerSpec,
)
except ImportError as e:
pytest.skip(f"Missing dependency: {e}", allow_module_level=True)
......@@ -34,8 +36,14 @@ pytestmark = [
def _dgdr(
    planner: PlannerConfig | None = None,
    model: str = "Qwen/Qwen3-32B",
    mocker_enabled: bool = False,
) -> DynamoGraphDeploymentRequestSpec:
    """Build a minimal DGDR spec for the tests in this module.

    Args:
        planner: Planner config to attach, or ``None`` for no planner.
        model: Model identifier stored on the spec.
        mocker_enabled: When true, attach ``MockerSpec(enabled=True)``.

    Returns:
        A spec whose ``features`` is ``None`` unless a planner was given or
        the mocker was requested.
    """
    # ``features`` is computed exactly once; the earlier planner-only
    # assignment was a dead store that built a throwaway FeaturesSpec.
    features = None
    if planner is not None or mocker_enabled:
        features = FeaturesSpec(
            planner=planner,
            mocker=MockerSpec(enabled=True) if mocker_enabled else None,
        )
    return DynamoGraphDeploymentRequestSpec(model=model, features=features)
......@@ -165,6 +173,74 @@ class TestBuildAICInterpolationSpec:
)
assert got is None
def test_mocker_rapid_without_throughput_scaling_produces_spec(self):
    """Rapid mode with the mocker enabled yields a spec even when the
    planner has throughput scaling turned off, so the ``--aic-*`` flags can
    still be injected on the mocker worker args."""
    rapid_planner = PlannerConfig(
        enable_throughput_scaling=False,
        enable_load_scaling=True,
        pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
    )
    request = _dgdr(planner=rapid_planner, mocker_enabled=True)
    # The same pick is used for both the prefill and the decode side.
    shared_pick = PickedParallelConfig(tp=1, dp=8, moe_ep=8)

    built = build_aic_interpolation_spec(
        request,
        best_prefill_pick=shared_pick,
        best_decode_pick=shared_pick,
        isl=1000,
        osl=100,
        sweep_max_context_length=4096,
        resolved_backend="trtllm",
        system="h200_sxm",
        prefill_interpolation_granularity=8,
        decode_interpolation_granularity=4,
    )

    assert isinstance(built, AICInterpolationSpec)
    assert built.backend == "trtllm"
class TestInjectMockerAicArgs:
    """Checks the ``--aic-*`` flags that ``_inject_mocker_aic_args`` emits."""

    @staticmethod
    def _flag_value(argv: list, flag: str):
        """Return the token that directly follows ``flag`` in ``argv``."""
        return argv[argv.index(flag) + 1]

    def _spec(self, backend: str = "trtllm") -> AICInterpolationSpec:
        """Build a spec whose prefill and decode picks are the same object."""
        shared_pick = PickedParallelConfig(tp=1, dp=8, moe_tp=1, moe_ep=8)
        return AICInterpolationSpec(
            hf_id="Qwen/Qwen3-235B",
            system="h200_sxm",
            backend=backend,
            isl=1000,
            osl=100,
            sweep_max_context_length=4096,
            prefill_interpolation_granularity=8,
            decode_interpolation_granularity=4,
            prefill_pick=shared_pick,
            decode_pick=shared_pick,
        )

    def test_injects_all_required_flags(self):
        spec = self._spec("trtllm")
        base_args = [
            "--model-path",
            "Qwen/Qwen3-235B",
            "--disaggregation-mode",
            "prefill",
        ]
        out = _inject_mocker_aic_args(base_args, spec, spec.prefill_pick)

        assert "--aic-perf-model" in out
        assert self._flag_value(out, "--aic-backend") == "trtllm"
        assert self._flag_value(out, "--aic-system") == "h200_sxm"
        assert self._flag_value(out, "--aic-tp-size") == "1"
        assert self._flag_value(out, "--aic-moe-tp-size") == "1"
        assert self._flag_value(out, "--aic-moe-ep-size") == "8"
        assert self._flag_value(out, "--aic-attention-dp-size") == "8"
        # trtllm is not a mocker engine_type; leave --engine-type alone.
        assert "--engine-type" not in out

    def test_matches_engine_type_for_vllm(self):
        spec = self._spec("vllm")
        out = _inject_mocker_aic_args([], spec, spec.prefill_pick)

        assert self._flag_value(out, "--engine-type") == "vllm"
        assert self._flag_value(out, "--aic-backend") == "vllm"

    def test_matches_engine_type_for_sglang(self):
        spec = self._spec("sglang")
        out = _inject_mocker_aic_args([], spec, spec.decode_pick)

        assert self._flag_value(out, "--engine-type") == "sglang"
        assert self._flag_value(out, "--aic-backend") == "sglang"
class TestBuildPlannerConfigEmbedsAicSpec:
def test_spec_threads_into_planner_config(self):
......@@ -232,6 +308,32 @@ class TestNeedsProfileDataRapid:
dgdr = _dgdr(planner=planner)
assert needs_profile_data(dgdr) is True
def test_mocker_rapid_returns_false(self):
    """Mocker + rapid: ``needs_profile_data`` reports that no NPZ profile
    data is needed, because the mocker pulls AIC perf data at runtime."""
    from dynamo.profiler.utils.profile_common import needs_profile_data

    rapid_planner = PlannerConfig(
        enable_throughput_scaling=True,
        enable_load_scaling=False,
        optimization_target="sla",
        pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
    )
    request = _dgdr(planner=rapid_planner, mocker_enabled=True)

    assert needs_profile_data(request) is False
def test_mocker_thorough_returns_true(self):
    """Mocker + thorough: ``needs_profile_data`` reports that profile data
    is required, since the mocker consumes the real-GPU NPZ."""
    from dynamo.profiler.utils.profile_common import needs_profile_data

    thorough_planner = PlannerConfig(
        enable_throughput_scaling=True,
        enable_load_scaling=False,
        optimization_target="sla",
        pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Thorough,
    )
    request = _dgdr(planner=thorough_planner, mocker_enabled=True)

    assert needs_profile_data(request) is True
def _benchmark_mode(svc: dict) -> str | None:
env = svc.get("extraPodSpec", {}).get("mainContainer", {}).get("env", [])
......
......@@ -552,8 +552,10 @@ class TestAssembleFinalConfig:
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_mocker_plus_planner_uses_mocker_base(self, tmp_path):
"""Mocker + planner: mocker base is created first, then planner layered."""
def test_mocker_plus_planner_rapid_skips_profile_cm(self, tmp_path):
"""Mocker + planner + rapid: mocker base is created first, then planner
layered. The profile-data ConfigMap is NOT emitted because the mocker
pulls AIC perf data at runtime via --aic-perf-model flags."""
dgdr = _make_dgdr(
features=FeaturesSpec(
planner=_make_planner(),
......@@ -565,7 +567,6 @@ class TestAssembleFinalConfig:
dgd_config = {"kind": "DGD"}
mocker_base = {"kind": "MockerDGD", "spec": {"services": {}}}
planner_cm = {"kind": "ConfigMap", "metadata": {"name": "planner-cm"}}
profile_cm = {"kind": "ConfigMap", "metadata": {"name": "profile-cm"}}
with (
patch(
......@@ -578,8 +579,7 @@ class TestAssembleFinalConfig:
) as mock_planner,
patch(
f"{_DGD_GEN}.add_profile_data_to_config",
return_value=profile_cm,
),
) as mock_profile,
):
result = assemble_final_config(
dgdr,
......@@ -591,7 +591,53 @@ class TestAssembleFinalConfig:
mock_mocker.assert_called_once()
mock_planner.assert_called_once()
mock_profile.assert_not_called()
assert mock_planner.call_args.args[1] is mocker_base
assert result == [planner_cm, mocker_base]
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_mocker_plus_planner_thorough_keeps_profile_cm(self, tmp_path):
    """Mocker + planner + thorough: the profile-data ConfigMap is emitted
    and sits between the planner ConfigMap and the mocker base."""
    thorough_features = FeaturesSpec(
        planner=_make_planner(
            pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Thorough,
        ),
        mocker=MockerSpec(enabled=True),
    )
    dgdr = _make_dgdr(features=thorough_features)
    ops = _make_ops(tmp_path)
    os.makedirs(ops.output_dir, exist_ok=True)

    # Stub payloads returned by the three patched generation helpers.
    dgd_config = {"kind": "DGD"}
    mocker_base = {"kind": "MockerDGD", "spec": {"services": {}}}
    planner_cm = {"kind": "ConfigMap", "metadata": {"name": "planner-cm"}}
    profile_cm = {"kind": "ConfigMap", "metadata": {"name": "profile-cm"}}

    with (
        patch(f"{_DGD_GEN}.generate_mocker_config", return_value=mocker_base),
        patch(f"{_DGD_GEN}.add_planner_to_config", return_value=planner_cm),
        patch(
            f"{_DGD_GEN}.add_profile_data_to_config",
            return_value=profile_cm,
        ) as mock_profile,
    ):
        prefill_pick = PickedParallelConfig(tp=1)
        decode_pick = PickedParallelConfig(tp=1)
        assembled = assemble_final_config(
            dgdr,
            ops,
            dgd_config,
            prefill_pick,
            decode_pick,
        )

    mock_profile.assert_called_once()
    assert assembled == [planner_cm, profile_cm, mocker_base]
@pytest.mark.pre_merge
......@@ -1058,9 +1104,9 @@ class TestRunProfileSkipsInterpolationForAggConfig:
# run_interpolation must be called, and with the resolved 'vllm' backend, not 'auto'
mock_interp.assert_called_once()
call_kwargs = mock_interp.call_args
# backend is the 8th positional argument (index 7)
# backend is the 6th positional argument (index 5)
called_backend = (
call_kwargs.args[7]
call_kwargs.args[5]
if call_kwargs.args
else call_kwargs.kwargs.get("backend")
)
......
......@@ -28,7 +28,10 @@ from dynamo.planner.config.backend_components import (
MockerComponentName,
VllmComponentName,
)
from dynamo.planner.config.parallelization import PickedParallelConfig
from dynamo.planner.config.parallelization import (
PickedParallelConfig,
picked_to_aic_model_config_kwargs,
)
from dynamo.planner.config.planner_config import (
PlannerConfig,
PlannerPreDeploymentSweepMode,
......@@ -107,7 +110,7 @@ def assemble_final_config(
# Step 1: choose base config
if mocker:
logger.info("Mocker enabled — using mocker DGD as base.")
base = generate_mocker_config(dgdr)
base = generate_mocker_config(dgdr, aic_spec=aic_spec)
else:
base = dgd_config
......@@ -189,9 +192,16 @@ def enable_vllm_benchmark_mode(config_dict: dict) -> None:
)
def generate_mocker_config(dgdr) -> dict:
def generate_mocker_config(
dgdr, aic_spec: Optional[AICInterpolationSpec] = None
) -> dict:
"""Load the mocker DGD template and apply DGDR images and model paths.
When ``aic_spec`` is provided (planner-rapid with an AIC-supported backend),
inject ``--aic-perf-model`` plus related flags onto the prefill/decode
workers so each mocker pod pulls its latency model directly from the
AIConfigurator SDK at runtime — no NPZ round-trip through the profiler.
Returns:
The mocker DGD config dict (no planner, no ConfigMaps).
"""
......@@ -212,6 +222,7 @@ def generate_mocker_config(dgdr) -> dict:
service_config["extraPodSpec"]["mainContainer"]["image"] = image
model = dgdr.model
aic_workers = _mocker_aic_worker_picks(aic_spec)
for worker_name in _mocker_worker_names():
service_config = (
mocker_config.get("spec", {}).get("services", {}).get(worker_name)
......@@ -223,11 +234,56 @@ def generate_mocker_config(dgdr) -> dict:
args_list = main_container.get("args", [])
args_list = set_argument_value(args_list, "--model-path", model)
args_list = set_argument_value(args_list, "--model-name", model)
pick = aic_workers.get(worker_name) if aic_workers else None
if pick is not None and aic_spec is not None:
args_list = _inject_mocker_aic_args(args_list, aic_spec, pick)
main_container["args"] = args_list
return mocker_config
def _mocker_aic_worker_picks(
    aic_spec: Optional[AICInterpolationSpec],
) -> Optional[dict[str, PickedParallelConfig]]:
    """Map each mocker worker's k8s name to its parallelization pick.

    Returns ``None`` when no ``aic_spec`` is given; otherwise a two-entry
    dict pairing the prefill worker name with ``aic_spec.prefill_pick`` and
    the decode worker name with ``aic_spec.decode_pick``.
    """
    if aic_spec is not None:
        picks_by_worker = {
            MockerComponentName.prefill_worker_k8s_name: aic_spec.prefill_pick,
            MockerComponentName.decode_worker_k8s_name: aic_spec.decode_pick,
        }
        return picks_by_worker
    return None
def _inject_mocker_aic_args(
    args_list: list,
    aic_spec: AICInterpolationSpec,
    pick: PickedParallelConfig,
) -> list:
    """Inject ``--aic-*`` flags onto a single mocker worker's args list.

    The mocker simulates vllm/sglang scheduling; for trtllm AIC data we keep
    the default ``--engine-type`` and only override ``--aic-backend`` so the
    perf-model lookups point at the correct database.

    Args:
        args_list: Existing worker CLI args. The caller's list is not
            modified; a new list is returned.
        aic_spec: Source of the ``--aic-backend`` and ``--aic-system`` values.
        pick: Parallelization pick, converted to AIC sizes through
            ``picked_to_aic_model_config_kwargs``.

    Returns:
        The args list with ``--aic-perf-model`` present and the ``--aic-*``
        values set. ``--engine-type`` is also set when the backend is
        ``vllm`` or ``sglang``.

    Raises:
        KeyError: If the converted kwargs lack one of the expected size keys.
    """
    kwargs = picked_to_aic_model_config_kwargs(pick)

    # Copy first so appending the boolean flag below never mutates the
    # list object owned by the caller.
    args_list = list(args_list)
    if "--aic-perf-model" not in args_list:
        args_list.append("--aic-perf-model")

    args_list = set_argument_value(args_list, "--aic-backend", aic_spec.backend)
    args_list = set_argument_value(args_list, "--aic-system", aic_spec.system)

    size_flags = (
        ("--aic-tp-size", "tp_size"),
        ("--aic-moe-tp-size", "moe_tp_size"),
        ("--aic-moe-ep-size", "moe_ep_size"),
        ("--aic-attention-dp-size", "attention_dp_size"),
    )
    for flag, key in size_flags:
        size = kwargs[key]
        if size is None:
            # An unset size must not become the literal token "None".
            # NOTE(review): assumes the mocker CLI treats an omitted flag as
            # unset — confirm against parse_args.
            continue
        args_list = set_argument_value(args_list, flag, str(size))

    if aic_spec.backend in ("vllm", "sglang"):
        args_list = set_argument_value(args_list, "--engine-type", aic_spec.backend)
    return args_list
def add_planner_to_config(
dgdr,
config_dict: dict,
......@@ -452,28 +508,37 @@ def build_aic_interpolation_spec(
prefill_interpolation_granularity: int,
decode_interpolation_granularity: int,
) -> Optional[AICInterpolationSpec]:
"""Build an ``AICInterpolationSpec`` for the planner in rapid mode.
"""Build an ``AICInterpolationSpec`` for rapid-mode AIC consumers.
Returns ``None`` (the planner falls through to the file-based loader) when
any of the following hold:
Consumed by both the planner (to bootstrap perf models in-process) and
the mocker (via ``--aic-perf-model`` flags injected into worker args).
Returns ``None`` when any of the following hold:
* planner is not enabled
* no AIC consumer needs it — planner is disabled or has
``enable_throughput_scaling=False``, **and** mocker is disabled
* ``pre_deployment_sweeping_mode`` is not ``Rapid``
* ``throughput_scaling`` is disabled (no pre-deployment data needed)
* picks are missing
* ``resolved_backend`` is not one AIC supports as a planner bootstrap source
* ``resolved_backend`` is not one AIC supports
"""
if not is_planner_enabled(dgdr):
return None
planner = dgdr.features.planner # type: ignore[union-attr]
if not planner.enable_throughput_scaling:
planner = (
dgdr.features.planner # type: ignore[union-attr]
if dgdr.features is not None and dgdr.features.planner is not None
else None
)
mocker_enabled = is_mocker_enabled(dgdr)
planner_needs_aic = (
is_planner_enabled(dgdr)
and planner is not None
and planner.enable_throughput_scaling
)
if not planner_needs_aic and not mocker_enabled:
return None
if planner.pre_deployment_sweeping_mode != PlannerPreDeploymentSweepMode.Rapid:
sweep_mode = planner.pre_deployment_sweeping_mode if planner is not None else None
if sweep_mode != PlannerPreDeploymentSweepMode.Rapid:
return None
if best_prefill_pick is None or best_decode_pick is None:
logger.info(
"Rapid mode but picks are missing; skipping aic_interpolation spec. "
"Planner will fall back to the file-based loader."
"Rapid mode but picks are missing; skipping aic_interpolation spec."
)
return None
if resolved_backend not in ("trtllm", "vllm", "sglang"):
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Profiler-side shim for AIConfiguratorPerfEstimator.
The real implementation has moved to
``dynamo.planner.monitoring.aic_estimator`` so the planner can run AIC
interpolation in-process at bootstrap time. This module re-exports the
estimator for back-compat with existing profiler callers.
"""
from dynamo.planner.monitoring.aic_estimator import AIConfiguratorPerfEstimator
__all__ = ["AIConfiguratorPerfEstimator"]
......@@ -189,22 +189,29 @@ def needs_profile_data(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
Profile data (NPZ/JSON on disk) is consumed by:
* **Mocker workers** for latency simulation — always required when
mocker is enabled.
* **Mocker workers** for latency simulation — required for thorough
mode. In rapid mode the mocker pulls latency data directly from the
AIConfigurator SDK via ``--aic-perf-model`` flags injected by the
profiler, so no NPZ is emitted.
* **Planner** when throughput scaling is enabled — required for
thorough mode only. In rapid mode the planner now runs AIC
interpolation in-process at bootstrap (see ``aic_interpolation.py``),
so the profiler no longer emits NPZ for planner-only rapid deployments.
thorough mode only. In rapid mode the planner runs AIC interpolation
in-process at bootstrap (see ``aic_interpolation.py``), so the
profiler no longer emits NPZ for planner rapid deployments either.
"""
sweep_mode = (
dgdr.features.planner.pre_deployment_sweeping_mode
if dgdr.features is not None and dgdr.features.planner is not None
else None
)
is_rapid = sweep_mode == PlannerPreDeploymentSweepMode.Rapid
if is_mocker_enabled(dgdr):
return True
return not is_rapid
if (
dgdr.features is not None
and dgdr.features.planner is not None
and dgdr.features.planner.enable_throughput_scaling
):
sweep_mode = dgdr.features.planner.pre_deployment_sweeping_mode
return sweep_mode != PlannerPreDeploymentSweepMode.Rapid
return not is_rapid
return False
......
......@@ -2,13 +2,12 @@
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Any, Callable, Optional, Tuple
from typing import Callable, Optional, Tuple
import numpy as np
from dynamo.profiler.utils.aiperf import get_decode_itl_and_thpt_per_gpu
from dynamo.profiler.utils.defaults import DECODE_MAX_CONCURRENCY
from dynamo.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from dynamo.profiler.utils.plot import plot_decode_3d_surface
logger = logging.getLogger(__name__)
......@@ -137,34 +136,3 @@ def profile_decode(
get_itl_and_thpt_per_gpu,
attention_dp_size,
)
def profile_decode_aiconfigurator(
    work_dir: str,
    num_gpus: int,
    max_kv_tokens: int,
    max_context_length: int,
    interpolation_granularity: int,
    ai_configurator_perf_estimator: AIConfiguratorPerfEstimator,
    attention_dp_size: int,
    **model_config_kwargs: Any,
) -> None:
    """Run the decode profiling helper with estimator-backed measurements.

    Each data point comes from ``estimate_perf(..., mode="decode")`` on the
    supplied estimator. Extra keyword arguments are forwarded to every
    estimator call.
    """

    def _estimate_itl_and_thpt(isl, osl, num_request):
        # Reduce the estimator's result dict to the (ITL, per-GPU
        # throughput) pair that the helper's callback is expected to return.
        estimate = ai_configurator_perf_estimator.estimate_perf(
            isl,
            osl,
            num_request,
            mode="decode",
            **model_config_kwargs,
        )
        itl = estimate["tpot"]
        thpt_per_gpu = estimate["tokens/s/gpu"]
        return itl, thpt_per_gpu

    return _profile_decode_helper(
        work_dir,
        num_gpus,
        max_kv_tokens,
        max_context_length,
        interpolation_granularity,
        _estimate_itl_and_thpt,
        attention_dp_size,
    )
......@@ -2,12 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Any, Callable, Optional
from typing import Callable, Optional
import numpy as np
from dynamo.profiler.utils.aiperf import get_prefill_ttft
from dynamo.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from dynamo.profiler.utils.plot import plot_prefill_interpolation
logger = logging.getLogger(__name__)
......@@ -110,30 +109,3 @@ def profile_prefill(
get_ttft,
attention_dp_size=attention_dp_size,
)
def profile_prefill_aiconfigurator(
    work_dir: str,
    num_gpus: int,
    max_context_length: int,
    interpolation_granularity: int,
    ai_configurator_perf_estimator: AIConfiguratorPerfEstimator,
    **model_config_kwargs: Any,
) -> None:
    """Run the prefill profiling helper with estimator-backed TTFT values.

    Each TTFT sample is the ``context_latency`` entry returned by
    ``estimate_prefill_perf`` on the supplied estimator. Extra keyword
    arguments are forwarded to every estimator call.
    """

    def get_ttft(isl):
        perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
            isl,
            **model_config_kwargs,
        )
        ttft = perf_dict["context_latency"]
        # Lazy %-args: the message is only formatted when INFO is enabled,
        # and renders the same text as the previous f-string.
        logger.info("Estimated prefill TTFT: %.2fms", ttft)
        return ttft

    return _profile_prefill_helper(
        work_dir,
        num_gpus,
        max_context_length,
        interpolation_granularity,
        get_ttft,
    )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment