Unverified commit 7c1a6238, authored by Hongkuan Zhou, committed by GitHub
Browse files

fix: refactor profiler's dgd generation workflow to correctly generate mocker config (#6848)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent 01baf4a3
...@@ -32,7 +32,7 @@ from dynamo.profiler.utils.config_modifiers.parallelization_mapping import ( ...@@ -32,7 +32,7 @@ from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
) )
from dynamo.profiler.utils.config_modifiers.protocol import apply_dgd_overrides from dynamo.profiler.utils.config_modifiers.protocol import apply_dgd_overrides
from dynamo.profiler.utils.defaults import SearchStrategy from dynamo.profiler.utils.defaults import SearchStrategy
from dynamo.profiler.utils.dgd_generation import generate_dgd_config_with_planner from dynamo.profiler.utils.dgd_generation import assemble_final_config
from dynamo.profiler.utils.dgdr_v1beta1_types import ( from dynamo.profiler.utils.dgdr_v1beta1_types import (
BackendType, BackendType,
DynamoGraphDeploymentRequestSpec, DynamoGraphDeploymentRequestSpec,
...@@ -44,7 +44,7 @@ from dynamo.profiler.utils.dgdr_validate import ( ...@@ -44,7 +44,7 @@ from dynamo.profiler.utils.dgdr_validate import (
from dynamo.profiler.utils.profile_common import ( from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig, ProfilerOperationalConfig,
determine_picking_mode, determine_picking_mode,
is_planner_enabled, needs_profile_data,
picked_config_from_row, picked_config_from_row,
resolve_model_path, resolve_model_path,
warn_and_update_sla, warn_and_update_sla,
...@@ -67,27 +67,6 @@ def _check_auto_backend_support(model: str, system: str) -> bool: ...@@ -67,27 +67,6 @@ def _check_auto_backend_support(model: str, system: str) -> bool:
) )
def _needs_interpolation(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
"""True when interpolation data will actually be consumed.
Only throughput-based scaling and the mocker backend use the
per-engine performance curves produced by ``run_interpolation``.
Load-based scaling does not require them.
"""
if dgdr.features is None:
return False
planner = dgdr.features.planner
if planner and planner.enable_throughput_scaling:
return True
mocker = dgdr.features.mocker
if mocker and mocker.enabled:
return True
return False
def _extract_profiler_params(dgdr: DynamoGraphDeploymentRequestSpec) -> tuple: def _extract_profiler_params(dgdr: DynamoGraphDeploymentRequestSpec) -> tuple:
"""Pull all profiler parameters from dgdr and log them.""" """Pull all profiler parameters from dgdr and log them."""
model = dgdr.model model = dgdr.model
...@@ -236,40 +215,6 @@ async def _execute_strategy( ...@@ -236,40 +215,6 @@ async def _execute_strategy(
) )
def _assemble_final_config(
dgdr: DynamoGraphDeploymentRequestSpec,
ops: ProfilerOperationalConfig,
dgd_config: dict | None,
best_prefill_config: PickedParallelConfig,
best_decode_config: PickedParallelConfig,
) -> Any:
"""Handle mocker/planner branching and return the final DGD config."""
mocker_enabled = (
dgdr.features is not None
and dgdr.features.mocker is not None
and dgdr.features.mocker.enabled
)
if dgd_config and (is_planner_enabled(dgdr) or mocker_enabled):
dgd_config_path = f"{ops.output_dir}/picked_dgd_config.yaml"
with open(dgd_config_path, "w") as f:
yaml.safe_dump(dgd_config, f, sort_keys=False)
real_config, mocker_config = generate_dgd_config_with_planner(
dgdr=dgdr,
config_path=dgd_config_path,
output_dir=ops.output_dir if not ops.dry_run else None,
best_prefill_mapping=best_prefill_config,
best_decode_mapping=best_decode_config,
)
if mocker_enabled:
logger.info("Mocker enabled — using mocker DGD config.")
return mocker_config
return real_config
return dgd_config
def _write_final_output(ops: ProfilerOperationalConfig, final_config: Any) -> bool: def _write_final_output(ops: ProfilerOperationalConfig, final_config: Any) -> bool:
"""Write final_config.yaml and profiler status. Returns False on unrecoverable failure.""" """Write final_config.yaml and profiler status. Returns False on unrecoverable failure."""
output_file = f"{ops.output_dir}/final_config.yaml" output_file = f"{ops.output_dir}/final_config.yaml"
...@@ -384,7 +329,7 @@ async def run_profile( ...@@ -384,7 +329,7 @@ async def run_profile(
# Interpolation curves — only needed when something consumes # Interpolation curves — only needed when something consumes
# the per-engine performance data (throughput scaling or mocker). # the per-engine performance data (throughput scaling or mocker).
# --------------------------------------------------------------- # ---------------------------------------------------------------
if not ops.dry_run and dgd_config and _needs_interpolation(dgdr): if not ops.dry_run and dgd_config and needs_profile_data(dgdr):
try: try:
model_cfg = get_model_config_from_model_path(resolve_model_path(dgdr)) model_cfg = get_model_config_from_model_path(resolve_model_path(dgdr))
sweep_max_context_length = model_cfg.get("max_position_embeddings", 0) sweep_max_context_length = model_cfg.get("max_position_embeddings", 0)
...@@ -412,7 +357,7 @@ async def run_profile( ...@@ -412,7 +357,7 @@ async def run_profile(
# --------------------------------------------------------------- # ---------------------------------------------------------------
# Final DGD assembly # Final DGD assembly
# --------------------------------------------------------------- # ---------------------------------------------------------------
final_config = _assemble_final_config( final_config = assemble_final_config(
dgdr, ops, dgd_config, best_prefill_config, best_decode_config dgdr, ops, dgd_config, best_prefill_config, best_decode_config
) )
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy
import json import json
import logging
import os import os
import uuid import uuid
from typing import Any, Optional from typing import Any, Optional
...@@ -25,12 +25,16 @@ import yaml ...@@ -25,12 +25,16 @@ import yaml
from dynamo.common.utils.paths import get_workspace_dir from dynamo.common.utils.paths import get_workspace_dir
from dynamo.planner.defaults import MockerComponentName from dynamo.planner.defaults import MockerComponentName
from dynamo.planner.utils.planner_config import PlannerConfig from dynamo.planner.utils.planner_config import PlannerConfig
from dynamo.profiler.utils.config import ( from dynamo.profiler.utils.config import DgdPlannerServiceConfig, set_argument_value
Config, from dynamo.profiler.utils.profile_common import (
DgdPlannerServiceConfig, ProfilerOperationalConfig,
set_argument_value, is_mocker_enabled,
is_planner_enabled,
needs_profile_data,
) )
logger = logging.getLogger(__name__)
# Path to mocker disagg config relative to workspace # Path to mocker disagg config relative to workspace
MOCKER_DISAGG_CONFIG_PATH = "examples/backends/mocker/deploy/disagg.yaml" MOCKER_DISAGG_CONFIG_PATH = "examples/backends/mocker/deploy/disagg.yaml"
...@@ -39,90 +43,151 @@ MOCKER_DISAGG_CONFIG_PATH = "examples/backends/mocker/deploy/disagg.yaml" ...@@ -39,90 +43,151 @@ MOCKER_DISAGG_CONFIG_PATH = "examples/backends/mocker/deploy/disagg.yaml"
PLANNER_CONFIG_PREFIX = "planner-config" PLANNER_CONFIG_PREFIX = "planner-config"
PLANNER_PROFILE_DATA_PREFIX = "planner-profile-data" PLANNER_PROFILE_DATA_PREFIX = "planner-profile-data"
# Well-known mount paths inside pods
PROFILE_DATA_MOUNT = f"{get_workspace_dir()}/profiling_results"
PLANNER_CONFIG_MOUNT = f"{get_workspace_dir()}/planner_config"
def _make_cm_name(prefix: str) -> str: def _make_cm_name(prefix: str) -> str:
return f"{prefix}-{uuid.uuid4().hex[:4]}" return f"{prefix}-{uuid.uuid4().hex[:4]}"
def generate_dgd_config_with_planner( # ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def assemble_final_config(
dgdr, dgdr,
config_path: str, ops: ProfilerOperationalConfig,
output_dir: str | None, dgd_config: dict | None,
best_prefill_config=None,
best_decode_config=None,
) -> Any:
"""Apply Dynamo features to the picked DGD config via composable layers.
1. **Mocker** — swap the base to the mocker DGD template if enabled.
2. **Planner** — inject the Planner service + planner-config ConfigMap.
3. **Profile data** — attach interpolation-data ConfigMap when mocker
or planner-throughput is enabled.
"""
if not dgd_config:
return dgd_config
mocker = is_mocker_enabled(dgdr)
planner = is_planner_enabled(dgdr)
profile = needs_profile_data(dgdr)
if not mocker and not planner:
return dgd_config
# Save picked config for auditing
dgd_config_path = f"{ops.output_dir}/picked_dgd_config.yaml"
with open(dgd_config_path, "w") as f:
yaml.safe_dump(dgd_config, f, sort_keys=False)
# Step 1: choose base config
if mocker:
logger.info("Mocker enabled — using mocker DGD as base.")
base = generate_mocker_config(dgdr)
else:
base = dgd_config
# Steps 2-3: layer features, collecting ConfigMaps
config_maps: list[dict] = []
if planner:
planner_cm = add_planner_to_config(
dgdr,
base,
best_prefill_mapping=best_prefill_config,
best_decode_mapping=best_decode_config,
)
config_maps.append(planner_cm)
if profile:
output_dir = ops.output_dir if not ops.dry_run else None
profile_cm = add_profile_data_to_config(base, output_dir)
if profile_cm:
config_maps.append(profile_cm)
if config_maps:
return config_maps + [base]
return base
def generate_mocker_config(dgdr) -> dict:
    """Load the mocker DGD template and apply the DGDR image and model.

    The template is read from ``MOCKER_DISAGG_CONFIG_PATH`` under the
    workspace directory. When ``dgdr.image`` is set, it replaces the image of
    every service that already has a ``mainContainer``. For each mocker worker
    service present in the template, ``--model-path`` and ``--model-name`` are
    set to ``dgdr.model``.

    Args:
        dgdr: Deployment request; ``image`` and ``model`` are read.

    Returns:
        The mocker DGD config dict (no planner, no ConfigMaps).
    """
    template_path = os.path.join(get_workspace_dir(), MOCKER_DISAGG_CONFIG_PATH)
    with open(template_path, "r", encoding="utf-8") as f:
        mocker_config = yaml.safe_load(f)

    services = mocker_config.get("spec", {}).get("services", {})

    image = dgdr.image
    if image:
        for service_config in services.values():
            # Only override images on services that already define a container.
            main_container = (service_config.get("extraPodSpec") or {}).get(
                "mainContainer"
            )
            if main_container:
                main_container["image"] = image

    model = dgdr.model
    for worker_name in _mocker_worker_names():
        service_config = services.get(worker_name)
        if not service_config:
            continue
        # setdefault (not get) so the container dict stays attached to the
        # service; with get(..., {}) a missing key would yield a throwaway
        # dict and the args written below would be silently lost.
        main_container = service_config.setdefault("extraPodSpec", {}).setdefault(
            "mainContainer", {}
        )
        args_list = main_container.get("args", [])
        args_list = set_argument_value(args_list, "--model-path", model)
        args_list = set_argument_value(args_list, "--model-name", model)
        main_container["args"] = args_list

    return mocker_config
def add_planner_to_config(
dgdr,
config_dict: dict,
best_prefill_mapping=None, best_prefill_mapping=None,
best_decode_mapping=None, best_decode_mapping=None,
) -> tuple[list[dict] | dict, list[dict] | dict]: ) -> dict:
"""Generate DGD config with planner based on profiling results. """Add a Planner service and its planner-config ConfigMap to *config_dict*.
The ``config_path`` should point to a DGD YAML that already has the The planner's ``profile_results_dir`` is always set to the well-known
correct parallelization and image applied (produced by AIC's generator mount path so the pod knows where to look when profile data is
pipeline). This function loads it, adds the planner service (with mounted separately by :func:`add_profile_data_to_config`.
profiling data ConfigMap if available), and produces the final
deployable DGD.
Args: Args:
dgdr: DynamoGraphDeploymentRequestSpec. dgdr: DynamoGraphDeploymentRequestSpec.
config_path: Path to the picked DGD YAML config file (already has config_dict: The base DGD config (real or mocker) — mutated in place.
correct parallelization, replicas, and image). best_prefill_mapping: Picked prefill parallel config.
output_dir: Output directory containing profiling interpolation data. best_decode_mapping: Picked decode parallel config.
best_prefill_mapping: Picked prefill parallel config (PickedParallelConfig).
Used only for ``prefill_engine_num_gpu`` in PlannerConfig.
best_decode_mapping: Picked decode parallel config (PickedParallelConfig).
Used only for ``decode_engine_num_gpu`` in PlannerConfig.
Returns: Returns:
tuple: (dgd_config, mocker_config) The ``planner_config_cm`` ConfigMap dict.
""" """
with open(config_path, "r") as f: planner_cfg = _build_planner_config(dgdr, best_prefill_mapping, best_decode_mapping)
raw = yaml.safe_load(f) planner_cfg.profile_results_dir = PROFILE_DATA_MOUNT
config = Config.model_validate(raw)
# --- Build PlannerConfig ---
planner_cfg = _build_planner_config(
dgdr,
best_prefill_mapping,
best_decode_mapping,
)
# --- Add planner service to DGD ---
planner_service = DgdPlannerServiceConfig() planner_service = DgdPlannerServiceConfig()
if planner_service.extraPodSpec.mainContainer: if planner_service.extraPodSpec.mainContainer:
planner_service.extraPodSpec.mainContainer.image = dgdr.image planner_service.extraPodSpec.mainContainer.image = dgdr.image
planner_dict = planner_service.model_dump(exclude_unset=False) planner_dict = planner_service.model_dump(exclude_unset=False)
config_dict = config.model_dump(exclude_unset=False)
planner_config_cm_name = _make_cm_name(PLANNER_CONFIG_PREFIX) planner_config_cm_name = _make_cm_name(PLANNER_CONFIG_PREFIX)
profile_data_cm_name = _make_cm_name(PLANNER_PROFILE_DATA_PREFIX)
profile_data_mount = f"{get_workspace_dir()}/profiling_results"
planner_config_mount = f"{get_workspace_dir()}/planner_config"
# --- ConfigMap 1: profiling interpolation data ---
profile_data_cm: Optional[dict] = None
profiling_data = _load_profiling_data(output_dir) if output_dir else {}
if profiling_data:
planner_cfg.profile_results_dir = profile_data_mount
profile_cm_data: dict[str, str] = {}
# TODO: use enums
if profiling_data.get("prefill"):
profile_cm_data["prefill_raw_data.json"] = json.dumps(
profiling_data["prefill"]
)
if profiling_data.get("decode"):
profile_cm_data["decode_raw_data.json"] = json.dumps(
profiling_data["decode"]
)
profile_data_cm = { # --- ConfigMap: planner config ---
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": profile_data_cm_name},
"data": profile_cm_data,
}
# --- ConfigMap 2: planner config ---
planner_config_cm = { planner_config_cm = {
"apiVersion": "v1", "apiVersion": "v1",
"kind": "ConfigMap", "kind": "ConfigMap",
...@@ -132,7 +197,7 @@ def generate_dgd_config_with_planner( ...@@ -132,7 +197,7 @@ def generate_dgd_config_with_planner(
}, },
} }
# --- Mount both ConfigMaps into the planner service --- # --- Mount planner-config ConfigMap into the planner service ---
planner_volumes = planner_dict.setdefault("extraPodSpec", {}).setdefault( planner_volumes = planner_dict.setdefault("extraPodSpec", {}).setdefault(
"volumes", [] "volumes", []
) )
...@@ -141,7 +206,6 @@ def generate_dgd_config_with_planner( ...@@ -141,7 +206,6 @@ def generate_dgd_config_with_planner(
) )
mc_mounts = mc_dict.setdefault("volumeMounts", []) mc_mounts = mc_dict.setdefault("volumeMounts", [])
# Planner config volume
planner_volumes.append( planner_volumes.append(
{ {
"name": planner_config_cm_name, "name": planner_config_cm_name,
...@@ -151,51 +215,116 @@ def generate_dgd_config_with_planner( ...@@ -151,51 +215,116 @@ def generate_dgd_config_with_planner(
mc_mounts.append( mc_mounts.append(
{ {
"name": planner_config_cm_name, "name": planner_config_cm_name,
"mountPath": planner_config_mount, "mountPath": PLANNER_CONFIG_MOUNT,
"readOnly": True, "readOnly": True,
} }
) )
# Profiling data volume (only if data exists)
if profile_data_cm is not None:
planner_volumes.append(
{
"name": profile_data_cm_name,
"configMap": {"name": profile_data_cm_name},
}
)
mc_mounts.append(
{
"name": profile_data_cm_name,
"mountPath": profile_data_mount,
"readOnly": True,
}
)
# Planner reads its config from the mounted planner-config ConfigMap
mc_args = mc_dict.setdefault("args", []) mc_args = mc_dict.setdefault("args", [])
mc_args.extend(["--config", f"{planner_config_mount}/planner_config.json"]) mc_args.extend(["--config", f"{PLANNER_CONFIG_MOUNT}/planner_config.json"])
config_dict["spec"]["services"]["Planner"] = planner_dict config_dict["spec"]["services"]["Planner"] = planner_dict
# --- Generate mocker config --- return planner_config_cm
mocker_config = _generate_mocker_config_with_planner(
dgdr=dgdr,
profile_data_mount=profile_data_mount,
planner_config_mount=planner_config_mount,
profile_data_cm=profile_data_cm,
planner_config_cm=planner_config_cm,
planner_dict=planner_dict,
)
# Collect all ConfigMaps + DGD into multi-doc output
config_maps = [cm for cm in [profile_data_cm, planner_config_cm] if cm is not None]
if config_maps:
dgd_config: list[dict[str, Any]] = config_maps + [config_dict]
else:
dgd_config = config_dict
return dgd_config, mocker_config def add_profile_data_to_config(
config_dict: dict,
output_dir: str | None,
) -> Optional[dict]:
"""Create a profile-data ConfigMap and mount it into consumers in *config_dict*.
Consumers are auto-detected:
- The **Planner** service (if present) gets the volume mounted.
- **Mocker workers** (if present) get the volume mounted and
``--planner-profile-data`` set.
Args:
config_dict: The DGD config dict — mutated in place.
output_dir: Directory containing profiling interpolation NPZ files.
Returns:
The ``profile_data_cm`` ConfigMap dict, or ``None`` if no profiling
data was found.
"""
profiling_data = _load_profiling_data(output_dir) if output_dir else {}
if not profiling_data:
return None
profile_data_cm_name = _make_cm_name(PLANNER_PROFILE_DATA_PREFIX)
profile_cm_data: dict[str, str] = {}
# TODO: use enums
if profiling_data.get("prefill"):
profile_cm_data["prefill_raw_data.json"] = json.dumps(profiling_data["prefill"])
if profiling_data.get("decode"):
profile_cm_data["decode_raw_data.json"] = json.dumps(profiling_data["decode"])
profile_data_cm = {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": profile_data_cm_name},
"data": profile_cm_data,
}
# Mount into Planner service if it exists
planner_svc = config_dict.get("spec", {}).get("services", {}).get("Planner")
if planner_svc is not None:
_mount_volume_into_service(
planner_svc, profile_data_cm_name, PROFILE_DATA_MOUNT
)
# Mount into mocker workers if they exist
services = config_dict.get("spec", {}).get("services", {})
for worker_name in _mocker_worker_names():
worker_svc = services.get(worker_name)
if worker_svc is not None:
main_container = worker_svc.get("extraPodSpec", {}).get("mainContainer", {})
args_list = main_container.get("args", [])
args_list = set_argument_value(
args_list, "--planner-profile-data", PROFILE_DATA_MOUNT
)
main_container["args"] = args_list
_mount_volume_into_service(
worker_svc, profile_data_cm_name, PROFILE_DATA_MOUNT
)
return profile_data_cm
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _mocker_worker_names() -> list[str]:
    """Return the mocker prefill and decode worker service names, in that order."""
    prefill_name = MockerComponentName.prefill_worker_k8s_name
    decode_name = MockerComponentName.decode_worker_k8s_name
    return [prefill_name, decode_name]
def _mount_volume_into_service(
service_dict: dict, cm_name: str, mount_path: str
) -> None:
"""Add a ConfigMap volume + volumeMount to a service's extraPodSpec."""
extra_pod_spec = service_dict.setdefault("extraPodSpec", {})
volumes = extra_pod_spec.setdefault("volumes", [])
volumes.append(
{
"name": cm_name,
"configMap": {"name": cm_name},
}
)
main_container = extra_pod_spec.setdefault("mainContainer", {})
volume_mounts = main_container.setdefault("volumeMounts", [])
volume_mounts.append(
{
"name": cm_name,
"mountPath": mount_path,
"readOnly": True,
}
)
def _build_planner_config( def _build_planner_config(
...@@ -259,86 +388,3 @@ def _load_profiling_data(output_dir: str) -> dict: ...@@ -259,86 +388,3 @@ def _load_profiling_data(output_dir: str) -> dict:
pass pass
return result return result
def _generate_mocker_config_with_planner(
    dgdr,
    profile_data_mount: str,
    planner_config_mount: str,
    profile_data_cm: Optional[dict],
    planner_config_cm: dict,
    planner_dict: dict,
) -> list[dict] | dict:
    """Generate mocker DGD config with planner for testing purposes.

    Loads the mocker template from ``MOCKER_DISAGG_CONFIG_PATH``, applies the
    DGDR image and model, mounts the profiling-data ConfigMap into the mocker
    workers when one is supplied, and adds a deep copy of ``planner_dict`` as
    the ``Planner`` service.

    Args:
        dgdr: Deployment request; ``image`` and ``model`` are read.
        profile_data_mount: Mount path for the profiling data; also passed to
            the workers as ``--planner-profile-data``.
        planner_config_mount: Not referenced in this function; kept so the
            signature stays unchanged.
        profile_data_cm: Profiling-data ConfigMap, or ``None`` if there is none.
        planner_config_cm: Planner-config ConfigMap to emit with the DGD.
        planner_dict: Planner service dict to copy into the mocker DGD.

    Returns:
        ``[*config_maps, mocker_config]`` when at least one ConfigMap is not
        ``None``, otherwise the bare mocker config dict.
    """
    template_path = os.path.join(get_workspace_dir(), MOCKER_DISAGG_CONFIG_PATH)
    with open(template_path, "r", encoding="utf-8") as f:
        mocker_config = yaml.safe_load(f)

    services = mocker_config.get("spec", {}).get("services", {})

    image = dgdr.image
    if image:
        for service_config in services.values():
            # Only override images on services that already define a container.
            main_container = (service_config.get("extraPodSpec") or {}).get(
                "mainContainer"
            )
            if main_container:
                main_container["image"] = image

    model = dgdr.model
    pd_cm_name = (
        profile_data_cm["metadata"]["name"] if profile_data_cm is not None else None
    )
    mocker_worker_names = [
        MockerComponentName.prefill_worker_k8s_name,
        MockerComponentName.decode_worker_k8s_name,
    ]
    for worker_name in mocker_worker_names:
        service_config = services.get(worker_name)
        if not service_config:
            continue

        # setdefault (not get) keeps these dicts attached to the service, so
        # the args written below are never lost on a throwaway dict.
        extra_pod_spec = service_config.setdefault("extraPodSpec", {})
        main_container = extra_pod_spec.setdefault("mainContainer", {})

        args_list = main_container.get("args", [])
        if profile_data_cm is not None:
            args_list = set_argument_value(
                args_list, "--planner-profile-data", profile_data_mount
            )
        args_list = set_argument_value(args_list, "--model-path", model)
        args_list = set_argument_value(args_list, "--model-name", model)
        main_container["args"] = args_list

        # Mount profiling data ConfigMap into the mocker worker
        if pd_cm_name is not None:
            extra_pod_spec.setdefault("volumes", []).append(
                {
                    "name": pd_cm_name,
                    "configMap": {"name": pd_cm_name},
                }
            )
            main_container.setdefault("volumeMounts", []).append(
                {
                    "name": pd_cm_name,
                    "mountPath": profile_data_mount,
                    "readOnly": True,
                }
            )

    # Deep copy so later edits to the mocker DGD cannot leak into the caller's
    # planner service dict.
    mocker_config["spec"]["services"]["Planner"] = copy.deepcopy(planner_dict)

    config_maps = [cm for cm in [profile_data_cm, planner_config_cm] if cm is not None]
    if config_maps:
        return config_maps + [mocker_config]
    return mocker_config
...@@ -153,6 +153,30 @@ def is_planner_enabled(dgdr: DynamoGraphDeploymentRequestSpec) -> bool: ...@@ -153,6 +153,30 @@ def is_planner_enabled(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
) )
def is_mocker_enabled(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
    """Report whether the DGDR spec explicitly turns the mocker on.

    Only an ``enabled`` value that is exactly ``True`` counts; a missing
    features section, a missing mocker section, or any other ``enabled``
    value yields ``False``.
    """
    features = dgdr.features
    if features is None:
        return False
    mocker_spec = features.mocker
    if mocker_spec is None:
        return False
    return mocker_spec.enabled is True
def needs_profile_data(dgdr: DynamoGraphDeploymentRequestSpec) -> bool:
    """Report whether the DGDR requires profiling interpolation data.

    Profile data is consumed by mocker workers (for latency simulation)
    and by the planner when throughput-based scaling is enabled.

    Args:
        dgdr: Deployment request whose ``features`` section is inspected.

    Returns:
        ``True`` if the mocker is enabled or the planner has
        ``enable_throughput_scaling`` set to a truthy value, else ``False``.
    """
    if is_mocker_enabled(dgdr):
        return True

    features = dgdr.features
    if features is None or features.planner is None:
        return False
    # Coerce explicitly: a bare ``and`` chain would hand back the raw attribute
    # (possibly None), which breaks the declared ``bool`` return type.
    return bool(features.planner.enable_throughput_scaling)
def determine_picking_mode(dgdr: DynamoGraphDeploymentRequestSpec) -> str: def determine_picking_mode(dgdr: DynamoGraphDeploymentRequestSpec) -> str:
target_load_provided = dgdr.workload is not None and ( target_load_provided = dgdr.workload is not None and (
dgdr.workload.requestRate is not None or dgdr.workload.concurrency is not None dgdr.workload.requestRate is not None or dgdr.workload.concurrency is not None
......
...@@ -38,18 +38,18 @@ flowchart TD ...@@ -38,18 +38,18 @@ flowchart TD
Pick --> DGDGen Pick --> DGDGen
Naive --> DGDGen Naive --> DGDGen
DGDGen --> PlannerCheck{"Planner\nenabled?"} DGDGen --> Interpolation["Interpolation\nCurves"]
PlannerCheck -->|yes| Interpolation["Interpolation\nCurves"]
PlannerCheck -->|no| MockerCheck Interpolation --> MockerCheck{mocker?}
MockerCheck -->|yes| MockerBase["generate_mocker_config()"]
Interpolation --> AddPlanner["Add Planner\nService + ConfigMaps"] MockerCheck -->|no| PlannerCheck
AddPlanner --> MockerCheck{"Mocker\nenabled?"} MockerBase --> PlannerCheck{planner?}
PlannerCheck -->|yes| AddPlanner["add_planner_to_config()"]
MockerCheck -->|yes| Mocker["Output Mocker DGD"] PlannerCheck -->|no| ProfileCheck
MockerCheck -->|no| RealDGD["Output Real DGD"] AddPlanner --> ProfileCheck{"needs profile data?\n(mocker or throughput\nplanner enabled)"}
ProfileCheck -->|yes| AddProfile["add_profile_data_to_config()"]
Mocker --> Final["final_config.yaml"] ProfileCheck -->|no| Final
RealDGD --> Final AddProfile --> Final["final_config.yaml"]
``` ```
### Stage-by-stage walkthrough ### Stage-by-stage walkthrough
...@@ -64,9 +64,14 @@ flowchart TD ...@@ -64,9 +64,14 @@ flowchart TD
4. **DGD Generation**: The picked configuration is rendered into a complete DGD YAML via AIC's generator pipeline, including correct parallelization, replica counts, container image, and PVC mounts. 4. **DGD Generation**: The picked configuration is rendered into a complete DGD YAML via AIC's generator pipeline, including correct parallelization, replica counts, container image, and PVC mounts.
5. **Interpolation** (planner only): When the planner is enabled, the profiler generates detailed performance interpolation curves — TTFT vs ISL for prefill, ITL vs KV-cache utilization for decode. These are saved into ConfigMaps for the planner to use at runtime. 5. **Interpolation** (throughput planner/mocker): When planner throughput-based scaling or the mocker is enabled, the profiler generates detailed performance interpolation curves — TTFT vs ISL for prefill, ITL vs KV-cache utilization for decode. These are stored as NPZ files and later packaged into a ConfigMap during final assembly.
6. **Final Assembly** (3 composable layers):
1. **Mocker base**: If mocker is enabled, the base DGD is swapped for the mocker DGD template (`generate_mocker_config`). Otherwise the AIC-picked DGD is kept.
2. **Planner service**: If the planner is enabled, the Planner pod and its planner-config ConfigMap are injected into the DGD (`add_planner_to_config`).
3. **Profile data**: If mocker is enabled or planner throughput-based scaling is enabled, the interpolation data ConfigMap is created and mounted into all consumers — the Planner service and/or mocker workers (`add_profile_data_to_config`).
6. **Final Assembly**: The planner service is added to the DGD if enabled. If mocker is enabled, the mocker DGD is used instead of real workers. The result is written to `final_config.yaml`. The result is written to `final_config.yaml`.
## Search Strategies ## Search Strategies
......
...@@ -25,7 +25,6 @@ try: ...@@ -25,7 +25,6 @@ try:
PlannerPreDeploymentSweepMode, PlannerPreDeploymentSweepMode,
) )
from dynamo.profiler.profile_sla import ( from dynamo.profiler.profile_sla import (
_assemble_final_config,
_extract_profiler_params, _extract_profiler_params,
_write_final_output, _write_final_output,
) )
...@@ -33,6 +32,7 @@ try: ...@@ -33,6 +32,7 @@ try:
PickedParallelConfig, PickedParallelConfig,
) )
from dynamo.profiler.utils.defaults import SearchStrategy from dynamo.profiler.utils.defaults import SearchStrategy
from dynamo.profiler.utils.dgd_generation import assemble_final_config
from dynamo.profiler.utils.dgdr_v1beta1_types import ( from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec, DynamoGraphDeploymentRequestSpec,
FeaturesSpec, FeaturesSpec,
...@@ -427,9 +427,11 @@ class TestWriteFinalOutput: ...@@ -427,9 +427,11 @@ class TestWriteFinalOutput:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# _assemble_final_config # assemble_final_config
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_DGD_GEN = "dynamo.profiler.utils.dgd_generation"
class TestAssembleFinalConfig: class TestAssembleFinalConfig:
@pytest.mark.pre_merge @pytest.mark.pre_merge
...@@ -439,7 +441,7 @@ class TestAssembleFinalConfig: ...@@ -439,7 +441,7 @@ class TestAssembleFinalConfig:
ops = _make_ops(tmp_path) ops = _make_ops(tmp_path)
dgd_config = {"kind": "DynamoGraphDeployment"} dgd_config = {"kind": "DynamoGraphDeployment"}
result = _assemble_final_config( result = assemble_final_config(
dgdr, dgdr,
ops, ops,
dgd_config, dgd_config,
...@@ -455,7 +457,7 @@ class TestAssembleFinalConfig: ...@@ -455,7 +457,7 @@ class TestAssembleFinalConfig:
dgdr = _make_dgdr() dgdr = _make_dgdr()
ops = _make_ops(tmp_path) ops = _make_ops(tmp_path)
result = _assemble_final_config( result = assemble_final_config(
dgdr, dgdr,
ops, ops,
None, None,
...@@ -467,19 +469,26 @@ class TestAssembleFinalConfig: ...@@ -467,19 +469,26 @@ class TestAssembleFinalConfig:
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_planner_no_mocker_returns_real_config(self, tmp_path): def test_planner_no_mocker_returns_config_with_planner_cm(self, tmp_path):
"""Planner enabled, no mocker: result is [planner_cm, profile_cm, dgd_config]."""
dgdr = _make_dgdr(features=FeaturesSpec(planner=_make_planner())) dgdr = _make_dgdr(features=FeaturesSpec(planner=_make_planner()))
ops = _make_ops(tmp_path) ops = _make_ops(tmp_path)
os.makedirs(ops.output_dir, exist_ok=True) os.makedirs(ops.output_dir, exist_ok=True)
dgd_config = {"kind": "DGD"} dgd_config = {"kind": "DGD", "spec": {"services": {}}}
real_cfg = {"kind": "real"} planner_cm = {"kind": "ConfigMap", "metadata": {"name": "planner-cm"}}
mocker_cfg = {"kind": "mocker"} profile_cm = {"kind": "ConfigMap", "metadata": {"name": "profile-cm"}}
with patch( with (
"dynamo.profiler.profile_sla.generate_dgd_config_with_planner", patch(
return_value=(real_cfg, mocker_cfg), f"{_DGD_GEN}.add_planner_to_config",
return_value=planner_cm,
) as mock_planner,
patch(
f"{_DGD_GEN}.add_profile_data_to_config",
return_value=profile_cm,
) as mock_profile,
): ):
result = _assemble_final_config( result = assemble_final_config(
dgdr, dgdr,
ops, ops,
dgd_config, dgd_config,
...@@ -487,11 +496,14 @@ class TestAssembleFinalConfig: ...@@ -487,11 +496,14 @@ class TestAssembleFinalConfig:
PickedParallelConfig(tp=1), PickedParallelConfig(tp=1),
) )
assert result is real_cfg mock_planner.assert_called_once()
mock_profile.assert_called_once()
assert result == [planner_cm, profile_cm, dgd_config]
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_0 @pytest.mark.gpu_0
def test_mocker_enabled_returns_mocker_config(self, tmp_path): def test_mocker_plus_planner_uses_mocker_base(self, tmp_path):
"""Mocker + planner: mocker base is created first, then planner layered."""
dgdr = _make_dgdr( dgdr = _make_dgdr(
features=FeaturesSpec( features=FeaturesSpec(
planner=_make_planner(), planner=_make_planner(),
...@@ -501,14 +513,63 @@ class TestAssembleFinalConfig: ...@@ -501,14 +513,63 @@ class TestAssembleFinalConfig:
ops = _make_ops(tmp_path) ops = _make_ops(tmp_path)
os.makedirs(ops.output_dir, exist_ok=True) os.makedirs(ops.output_dir, exist_ok=True)
dgd_config = {"kind": "DGD"} dgd_config = {"kind": "DGD"}
real_cfg = {"kind": "real"} mocker_base = {"kind": "MockerDGD", "spec": {"services": {}}}
mocker_cfg = {"kind": "mocker"} planner_cm = {"kind": "ConfigMap", "metadata": {"name": "planner-cm"}}
profile_cm = {"kind": "ConfigMap", "metadata": {"name": "profile-cm"}}
with (
patch(
f"{_DGD_GEN}.generate_mocker_config",
return_value=mocker_base,
) as mock_mocker,
patch(
f"{_DGD_GEN}.add_planner_to_config",
return_value=planner_cm,
) as mock_planner,
patch(
f"{_DGD_GEN}.add_profile_data_to_config",
return_value=profile_cm,
),
):
result = assemble_final_config(
dgdr,
ops,
dgd_config,
PickedParallelConfig(tp=1),
PickedParallelConfig(tp=1),
)
with patch( mock_mocker.assert_called_once()
"dynamo.profiler.profile_sla.generate_dgd_config_with_planner", mock_planner.assert_called_once()
return_value=(real_cfg, mocker_cfg), assert mock_planner.call_args.args[1] is mocker_base
assert result == [planner_cm, profile_cm, mocker_base]
@pytest.mark.pre_merge
@pytest.mark.gpu_0
def test_mocker_only_no_planner_returns_mocker_config(self, tmp_path):
"""Mocker-only (no planner): generate_mocker_config is called,
add_planner_to_config is not, profile data is still attached."""
dgdr = _make_dgdr(features=FeaturesSpec(mocker=MockerSpec(enabled=True)))
ops = _make_ops(tmp_path)
os.makedirs(ops.output_dir, exist_ok=True)
dgd_config = {"kind": "DGD"}
mocker_base = {"kind": "MockerDGD", "spec": {"services": {}}}
profile_cm = {"kind": "ConfigMap", "metadata": {"name": "profile-cm"}}
with (
patch(
f"{_DGD_GEN}.generate_mocker_config",
return_value=mocker_base,
) as mock_mocker,
patch(
f"{_DGD_GEN}.add_planner_to_config",
) as mock_planner,
patch(
f"{_DGD_GEN}.add_profile_data_to_config",
return_value=profile_cm,
) as mock_profile,
): ):
result = _assemble_final_config( result = assemble_final_config(
dgdr, dgdr,
ops, ops,
dgd_config, dgd_config,
...@@ -516,4 +577,7 @@ class TestAssembleFinalConfig: ...@@ -516,4 +577,7 @@ class TestAssembleFinalConfig:
PickedParallelConfig(tp=1), PickedParallelConfig(tp=1),
) )
assert result is mocker_cfg mock_mocker.assert_called_once()
mock_planner.assert_not_called()
mock_profile.assert_called_once()
assert result == [profile_cm, mocker_base]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment