Unverified commit e49834c9, authored by Hongkuan Zhou and committed by GitHub
Browse files

refactor: preview config in profiler webui use planner dgd generation logic (#4940)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent cceeb8e3
...@@ -59,7 +59,7 @@ from deploy.utils.dynamo_deployment import ( ...@@ -59,7 +59,7 @@ from deploy.utils.dynamo_deployment import (
DynamoDeploymentClient, DynamoDeploymentClient,
cleanup_remaining_deployments, cleanup_remaining_deployments,
) )
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES from dynamo.planner.defaults import WORKER_COMPONENT_NAMES, SubComponentType
@dataclass @dataclass
...@@ -239,7 +239,7 @@ async def run_profile(args): ...@@ -239,7 +239,7 @@ async def run_profile(args):
prefill_config = apply_parallel_mapping_to_config( prefill_config = apply_parallel_mapping_to_config(
base_prefill_config, base_prefill_config,
mapping, mapping,
EngineType.PREFILL, SubComponentType.PREFILL,
config_modifier, config_modifier,
args.num_gpus_per_node, args.num_gpus_per_node,
) )
...@@ -344,7 +344,7 @@ async def run_profile(args): ...@@ -344,7 +344,7 @@ async def run_profile(args):
decode_config = apply_parallel_mapping_to_config( decode_config = apply_parallel_mapping_to_config(
base_decode_config, base_decode_config,
mapping, mapping,
EngineType.DECODE, SubComponentType.DECODE,
config_modifier, config_modifier,
args.num_gpus_per_node, args.num_gpus_per_node,
) )
...@@ -493,6 +493,9 @@ async def run_profile(args): ...@@ -493,6 +493,9 @@ async def run_profile(args):
selected_prefill_idx, selected_decode_idx = pick_config_with_webui( selected_prefill_idx, selected_decode_idx = pick_config_with_webui(
prefill_data, decode_data, args prefill_data, decode_data, args
) )
# update TTFT/ITL SLA based on selected config
args.ttft = prefill_data.ttft[selected_prefill_idx]
args.itl = decode_data.itl[selected_decode_idx]
else: else:
# automatically select P/D config within SLA with the highest throughput/GPU # automatically select P/D config within SLA with the highest throughput/GPU
# select best parallel mapping for prefill # select best parallel mapping for prefill
...@@ -563,7 +566,7 @@ async def run_profile(args): ...@@ -563,7 +566,7 @@ async def run_profile(args):
prefill_config = apply_parallel_mapping_to_config( prefill_config = apply_parallel_mapping_to_config(
prefill_config, prefill_config,
best_prefill_mapping, best_prefill_mapping,
EngineType.PREFILL, SubComponentType.PREFILL,
config_modifier, config_modifier,
args.num_gpus_per_node, args.num_gpus_per_node,
) )
...@@ -647,7 +650,7 @@ async def run_profile(args): ...@@ -647,7 +650,7 @@ async def run_profile(args):
decode_config = apply_parallel_mapping_to_config( decode_config = apply_parallel_mapping_to_config(
decode_config, decode_config,
best_decode_mapping, best_decode_mapping,
EngineType.DECODE, SubComponentType.DECODE,
config_modifier, config_modifier,
args.num_gpus_per_node, args.num_gpus_per_node,
) )
...@@ -738,17 +741,17 @@ async def run_profile(args): ...@@ -738,17 +741,17 @@ async def run_profile(args):
# save DGD config with planner; support multi-document output when a ConfigMap is included # save DGD config with planner; support multi-document output when a ConfigMap is included
with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f: with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
if isinstance(config, list): if isinstance(config, list):
yaml.dump_all(config, f) yaml.safe_dump_all(config, f, sort_keys=False)
else: else:
yaml.dump(config, f) yaml.safe_dump(config, f, sort_keys=False)
# save mocker config with planner for testing purposes # save mocker config with planner for testing purposes
logger.debug(f"Mocker config with planner: {mocker_config}") logger.debug(f"Mocker config with planner: {mocker_config}")
with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f: with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f:
if isinstance(mocker_config, list): if isinstance(mocker_config, list):
yaml.dump_all(mocker_config, f) yaml.safe_dump_all(mocker_config, f, sort_keys=False)
else: else:
yaml.dump(mocker_config, f) yaml.safe_dump(mocker_config, f, sort_keys=False)
except Exception as e: except Exception as e:
logger.error(f"Profile job failed with error: {e}") logger.error(f"Profile job failed with error: {e}")
......
...@@ -6,11 +6,12 @@ import logging ...@@ -6,11 +6,12 @@ import logging
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from benchmarks.profiler.utils.defaults import PREFILL_MAX_NUM_TOKENS, EngineType from benchmarks.profiler.utils.defaults import PREFILL_MAX_NUM_TOKENS
from benchmarks.profiler.utils.model_info import ( from benchmarks.profiler.utils.model_info import (
MOE_ADDITIONAL_TP_ARCHITECTURES, MOE_ADDITIONAL_TP_ARCHITECTURES,
ModelInfo, ModelInfo,
) )
from dynamo.planner.defaults import SubComponentType
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
...@@ -222,27 +223,44 @@ def get_candidate_parallel_mappings( ...@@ -222,27 +223,44 @@ def get_candidate_parallel_mappings(
def apply_parallel_mapping_to_config( def apply_parallel_mapping_to_config(
base_config: dict, base_config: dict,
mapping: ParallelizationMapping, mapping: ParallelizationMapping,
phase: str, phase: SubComponentType,
config_modifier, config_modifier,
num_gpus_per_node: int | None, num_gpus_per_node: int | None,
is_aggregated_config: bool = True,
) -> dict: ) -> dict:
cfg = copy.deepcopy(base_config) cfg = copy.deepcopy(base_config)
# In aggregated configs (used for profiling individual phases), the worker service we mutate
# is always the decode worker (prefill is converted to decode in convert_config()).
# In disaggregated configs (final DGD), we mutate the service matching the requested phase.
component_type = SubComponentType.DECODE if is_aggregated_config else phase
if mapping.tp is not None: if mapping.tp is not None:
cfg = config_modifier.set_config_tp_size(cfg, mapping.tp) cfg = config_modifier.set_config_tp_size(cfg, mapping.tp, component_type)
elif mapping.tep is not None: elif mapping.tep is not None:
cfg = config_modifier.set_config_tep_size(cfg, mapping.tep, num_gpus_per_node) cfg = config_modifier.set_config_tep_size(
cfg, mapping.tep, num_gpus_per_node, component_type
)
elif mapping.dep is not None: elif mapping.dep is not None:
cfg = config_modifier.set_config_dep_size(cfg, mapping.dep, num_gpus_per_node) cfg = config_modifier.set_config_dep_size(
cfg, mapping.dep, num_gpus_per_node, component_type
)
else: else:
raise ValueError(f"Invalid mapping: {mapping.label()}") raise ValueError(f"Invalid mapping: {mapping.label()}")
# for prefill,set batch size to attention_dp_size # For prefill, set batch size to attention_dp_size
# (this assume prompt is long enough to saturate the GPU, which is usually valid in disagg) # (this assume prompt is long enough to saturate the GPU, which is usually valid in disagg)
if phase == EngineType.PREFILL: if phase == SubComponentType.PREFILL:
prefill_component_type = (
SubComponentType.DECODE
if is_aggregated_config
else SubComponentType.PREFILL
)
cfg = config_modifier.set_prefill_config( cfg = config_modifier.set_prefill_config(
cfg, cfg,
max_batch_size=mapping.get_attn_dp_size(), max_batch_size=mapping.get_attn_dp_size(),
# max num tokens is shared by all attention dp ranks # max num tokens is shared by all attention dp ranks
max_num_tokens=PREFILL_MAX_NUM_TOKENS * mapping.get_attn_dp_size(), max_num_tokens=PREFILL_MAX_NUM_TOKENS * mapping.get_attn_dp_size(),
component_type=prefill_component_type,
) )
return cfg return cfg
...@@ -64,7 +64,11 @@ class ConfigModifierProtocol(Protocol): ...@@ -64,7 +64,11 @@ class ConfigModifierProtocol(Protocol):
@classmethod @classmethod
def set_prefill_config( def set_prefill_config(
cls, config: dict, max_batch_size: int, max_num_tokens: int cls,
config: dict,
max_batch_size: int,
max_num_tokens: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict: ) -> dict:
... ...
......
...@@ -379,7 +379,11 @@ class SGLangConfigModifier: ...@@ -379,7 +379,11 @@ class SGLangConfigModifier:
@classmethod @classmethod
def set_prefill_config( def set_prefill_config(
cls, config: dict, max_batch_size: int, max_num_tokens: int cls,
config: dict,
max_batch_size: int,
max_num_tokens: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict: ) -> dict:
""" """
Configure prefill-related limits for aggregated prefill runs. Configure prefill-related limits for aggregated prefill runs.
...@@ -388,7 +392,7 @@ class SGLangConfigModifier: ...@@ -388,7 +392,7 @@ class SGLangConfigModifier:
""" """
cfg = Config.model_validate(config) cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config( worker_service = get_worker_service_from_config(
cfg, backend="sglang", sub_component_type=SubComponentType.DECODE cfg, backend="sglang", sub_component_type=component_type
) )
args = validate_and_get_worker_args(worker_service, backend="sglang") args = validate_and_get_worker_args(worker_service, backend="sglang")
args = break_arguments(args) args = break_arguments(args)
......
...@@ -350,7 +350,11 @@ class TrtllmConfigModifier: ...@@ -350,7 +350,11 @@ class TrtllmConfigModifier:
@classmethod @classmethod
def set_prefill_config( def set_prefill_config(
cls, config: dict, max_batch_size: int, max_num_tokens: int cls,
config: dict,
max_batch_size: int,
max_num_tokens: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict: ) -> dict:
""" """
Configure prefill-related limits for aggregated prefill runs. Configure prefill-related limits for aggregated prefill runs.
...@@ -360,7 +364,7 @@ class TrtllmConfigModifier: ...@@ -360,7 +364,7 @@ class TrtllmConfigModifier:
""" """
cfg = Config.model_validate(config) cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config( worker_service = get_worker_service_from_config(
cfg, backend="trtllm", sub_component_type=SubComponentType.DECODE cfg, backend="trtllm", sub_component_type=component_type
) )
args = validate_and_get_worker_args(worker_service, backend="trtllm") args = validate_and_get_worker_args(worker_service, backend="trtllm")
args = break_arguments(args) args = break_arguments(args)
......
...@@ -307,7 +307,11 @@ class VllmV1ConfigModifier: ...@@ -307,7 +307,11 @@ class VllmV1ConfigModifier:
@classmethod @classmethod
def set_prefill_config( def set_prefill_config(
cls, config: dict, max_batch_size: int, max_num_tokens: int cls,
config: dict,
max_batch_size: int,
max_num_tokens: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict: ) -> dict:
""" """
Configure prefill-related limits for aggregated prefill runs. Configure prefill-related limits for aggregated prefill runs.
...@@ -316,7 +320,7 @@ class VllmV1ConfigModifier: ...@@ -316,7 +320,7 @@ class VllmV1ConfigModifier:
""" """
cfg = Config.model_validate(config) cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config( worker_service = get_worker_service_from_config(
cfg, backend="vllm", sub_component_type=SubComponentType.DECODE cfg, backend="vllm", sub_component_type=component_type
) )
args = validate_and_get_worker_args(worker_service, backend="vllm") args = validate_and_get_worker_args(worker_service, backend="vllm")
args = break_arguments(args) args = break_arguments(args)
......
...@@ -26,8 +26,10 @@ from benchmarks.profiler.utils.config import ( ...@@ -26,8 +26,10 @@ from benchmarks.profiler.utils.config import (
DgdPlannerServiceConfig, DgdPlannerServiceConfig,
set_argument_value, set_argument_value,
) )
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import ( from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
ParallelizationMapping, ParallelizationMapping,
apply_parallel_mapping_to_config,
) )
from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace
from dynamo.common.utils.paths import get_workspace_dir from dynamo.common.utils.paths import get_workspace_dir
...@@ -37,14 +39,196 @@ from dynamo.planner.defaults import MockerComponentName, SubComponentType ...@@ -37,14 +39,196 @@ from dynamo.planner.defaults import MockerComponentName, SubComponentType
MOCKER_DISAGG_CONFIG_PATH = "examples/backends/mocker/deploy/disagg.yaml" MOCKER_DISAGG_CONFIG_PATH = "examples/backends/mocker/deploy/disagg.yaml"
def generate_dgd_config_with_planner( def _get_config_modifier_from_args(args):
"""Return an instantiated config modifier for args.backend."""
config_modifier_cls = CONFIG_MODIFIERS[args.backend]
return config_modifier_cls()
def _find_service_name_for_subcomponent(
    config: Config, subcomponent: SubComponentType
) -> str:
    """Return the name of the first service tagged with ``subcomponent``.

    Services are scanned in the iteration order of ``config.spec.services``;
    a service matches when its ``subComponentType`` attribute (treated as
    ``None`` when absent) compares equal to ``subcomponent``.

    Raises:
        KeyError: if no service in the config matches.
    """
    # A private sentinel distinguishes "no match" from any legitimate key.
    no_match = object()
    matching_names = (
        name
        for name, service in config.spec.services.items()
        if getattr(service, "subComponentType", None) == subcomponent
    )
    found = next(matching_names, no_match)
    if found is no_match:
        raise KeyError(f"Could not find service with subComponentType={subcomponent!r}")
    return found
def _load_and_apply_mappings(
*,
config_path: str, config_path: str,
args,
config_modifier, config_modifier,
output_dir: str, best_prefill_mapping: ParallelizationMapping | None,
best_decode_mapping: ParallelizationMapping | None,
num_gpus_per_node: int,
) -> Config:
"""Load a DGD config file and apply optional prefill/decode parallel mappings (single source of truth)."""
with open(config_path, "r") as f:
raw = yaml.safe_load(f)
# Update container image if provided (overrides config file images)
if getattr(args, "dgd_image", None):
raw = config_modifier.update_image(raw, args.dgd_image)
if best_prefill_mapping is not None:
raw = apply_parallel_mapping_to_config(
raw,
best_prefill_mapping,
SubComponentType.PREFILL,
config_modifier,
num_gpus_per_node,
is_aggregated_config=False,
)
if best_decode_mapping is not None:
raw = apply_parallel_mapping_to_config(
raw,
best_decode_mapping,
SubComponentType.DECODE,
config_modifier,
num_gpus_per_node,
is_aggregated_config=False,
)
return Config.model_validate(raw)
def build_prefill_service_config(
    *,
    config_path: str,
    args,
    best_prefill_mapping: ParallelizationMapping,
    num_gpus_per_node: int = 8,
) -> tuple[str, dict]:
    """Build the prefill worker's service entry with its mapping applied.

    Thin keyword-only wrapper around ``_build_single_worker_service_config``
    that pins the subcomponent to ``SubComponentType.PREFILL``.

    Returns:
        A ``(service_name, service_dict)`` pair for the prefill worker.
    """
    prefill_service = _build_single_worker_service_config(
        subcomponent=SubComponentType.PREFILL,
        mapping=best_prefill_mapping,
        config_path=config_path,
        args=args,
        num_gpus_per_node=num_gpus_per_node,
    )
    return prefill_service
def build_decode_service_config(
    *,
    config_path: str,
    args,
    best_decode_mapping: ParallelizationMapping,
    num_gpus_per_node: int = 8,
) -> tuple[str, dict]:
    """Build the decode worker's service entry with its mapping applied.

    Thin keyword-only wrapper around ``_build_single_worker_service_config``
    that pins the subcomponent to ``SubComponentType.DECODE``.

    Returns:
        A ``(service_name, service_dict)`` pair for the decode worker.
    """
    decode_service = _build_single_worker_service_config(
        subcomponent=SubComponentType.DECODE,
        mapping=best_decode_mapping,
        config_path=config_path,
        args=args,
        num_gpus_per_node=num_gpus_per_node,
    )
    return decode_service
def _build_single_worker_service_config(
    *,
    config_path: str,
    args,
    mapping: ParallelizationMapping,
    subcomponent: SubComponentType,
    num_gpus_per_node: int,
) -> tuple[str, dict]:
    """Build the service dict for one worker (prefill or decode).

    The config at ``config_path`` is loaded through
    ``_load_and_apply_mappings`` with ``mapping`` routed only to the side that
    matches ``subcomponent``; the other side receives ``None``. The validated
    config is then dumped to a plain dict and the matching service extracted.

    Returns:
        A ``(service_name, service_dict)`` pair for the requested worker.
    """
    modifier = _get_config_modifier_from_args(args)

    # Route the mapping to exactly the side being built.
    prefill_mapping = None
    decode_mapping = None
    if subcomponent == SubComponentType.PREFILL:
        prefill_mapping = mapping
    if subcomponent == SubComponentType.DECODE:
        decode_mapping = mapping

    loaded = _load_and_apply_mappings(
        config_path=config_path,
        args=args,
        config_modifier=modifier,
        best_prefill_mapping=prefill_mapping,
        best_decode_mapping=decode_mapping,
        num_gpus_per_node=num_gpus_per_node,
    )
    worker_name = _find_service_name_for_subcomponent(loaded, subcomponent)
    all_services = loaded.model_dump(exclude_unset=False)["spec"]["services"]
    return worker_name, all_services[worker_name]
def generate_prefill_service_config_preview(
    *,
    config_path: str,
    args,
    best_prefill_mapping: ParallelizationMapping,
    num_gpus_per_node: int = 8,
) -> dict:
    """Produce the prefill-only service preview shown by WebUI 'Show Config'.

    Returns:
        A single-entry dict mapping the prefill service's name to the service
        dict returned by ``build_prefill_service_config``.
    """
    name, service = build_prefill_service_config(
        config_path=config_path,
        args=args,
        best_prefill_mapping=best_prefill_mapping,
        num_gpus_per_node=num_gpus_per_node,
    )
    preview = {}
    preview[name] = service
    return preview
def generate_decode_service_config_preview(
    *,
    config_path: str,
    args,
    best_decode_mapping: ParallelizationMapping,
    num_gpus_per_node: int = 8,
) -> dict:
    """Produce the decode-only service preview shown by WebUI 'Show Config'.

    Returns:
        A single-entry dict mapping the decode service's name to the service
        dict returned by ``build_decode_service_config``.
    """
    name, service = build_decode_service_config(
        config_path=config_path,
        args=args,
        best_decode_mapping=best_decode_mapping,
        num_gpus_per_node=num_gpus_per_node,
    )
    preview = {}
    preview[name] = service
    return preview
def generate_prefill_decode_services_config_preview(
*,
config_path: str,
args, args,
best_prefill_mapping: ParallelizationMapping, best_prefill_mapping: ParallelizationMapping,
best_decode_mapping: ParallelizationMapping, best_decode_mapping: ParallelizationMapping,
num_gpus_per_node: int = 8, num_gpus_per_node: int = 8,
) -> dict[str, dict]:
"""Generate a (prefill+decode)-only services config object for WebUI 'Show Config'."""
config_modifier = _get_config_modifier_from_args(args)
config = _load_and_apply_mappings(
config_path=config_path,
args=args,
config_modifier=config_modifier,
best_prefill_mapping=best_prefill_mapping,
best_decode_mapping=best_decode_mapping,
num_gpus_per_node=num_gpus_per_node,
)
prefill_service_name = _find_service_name_for_subcomponent(
config, SubComponentType.PREFILL
)
decode_service_name = _find_service_name_for_subcomponent(
config, SubComponentType.DECODE
)
config_dict = config.model_dump(exclude_unset=False)
services = {
prefill_service_name: config_dict["spec"]["services"][prefill_service_name],
decode_service_name: config_dict["spec"]["services"][decode_service_name],
}
return services
def generate_dgd_config_with_planner(
config_path: str,
config_modifier,
output_dir: str | None,
args,
best_prefill_mapping: ParallelizationMapping | None,
best_decode_mapping: ParallelizationMapping | None,
num_gpus_per_node: int = 8,
) -> tuple[list[dict] | dict, list[dict] | dict]: ) -> tuple[list[dict] | dict, list[dict] | dict]:
"""Generate DGD config with planner based on profiling results. """Generate DGD config with planner based on profiling results.
...@@ -65,62 +249,14 @@ def generate_dgd_config_with_planner( ...@@ -65,62 +249,14 @@ def generate_dgd_config_with_planner(
If a ConfigMap is generated, returns [ConfigMap, DGD]; otherwise returns a single DGD dict. If a ConfigMap is generated, returns [ConfigMap, DGD]; otherwise returns a single DGD dict.
""" """
# Load config from file config = _load_and_apply_mappings(
with open(config_path, "r") as f: config_path=config_path,
config = yaml.safe_load(f) args=args,
config_modifier=config_modifier,
# Update container image if provided best_prefill_mapping=best_prefill_mapping,
# This overrides the default image in the config file for all DGD components best_decode_mapping=best_decode_mapping,
if args.dgd_image: num_gpus_per_node=num_gpus_per_node,
config = config_modifier.update_image(config, args.dgd_image) )
# Apply prefill parallelization based on the actual mapping used in profiling
if best_prefill_mapping.tp is not None:
# Dense model or TP for prefill
config = config_modifier.set_config_tp_size(
config, best_prefill_mapping.tp, SubComponentType.PREFILL
)
elif best_prefill_mapping.tep is not None:
# MoE model with TEP for prefill
config = config_modifier.set_config_tep_size(
config,
best_prefill_mapping.tep,
num_gpus_per_node,
SubComponentType.PREFILL,
)
elif best_prefill_mapping.dep is not None:
# MoE model with DEP for prefill
config = config_modifier.set_config_dep_size(
config,
best_prefill_mapping.dep,
num_gpus_per_node,
SubComponentType.PREFILL,
)
# Apply decode parallelization based on the actual mapping used in profiling
if best_decode_mapping.tp is not None:
# Dense model or TP for decode
config = config_modifier.set_config_tp_size(
config, best_decode_mapping.tp, SubComponentType.DECODE
)
elif best_decode_mapping.tep is not None:
# MoE model with TEP for decode
config = config_modifier.set_config_tep_size(
config,
best_decode_mapping.tep,
num_gpus_per_node,
SubComponentType.DECODE,
)
elif best_decode_mapping.dep is not None:
# MoE model with DEP for decode
config = config_modifier.set_config_dep_size(
config,
best_decode_mapping.dep,
num_gpus_per_node,
SubComponentType.DECODE,
)
config = Config.model_validate(config)
# add the planner service # add the planner service
planner_config = DgdPlannerServiceConfig() planner_config = DgdPlannerServiceConfig()
...@@ -157,66 +293,69 @@ def generate_dgd_config_with_planner( ...@@ -157,66 +293,69 @@ def generate_dgd_config_with_planner(
# Add arguments determined by profiling results # Add arguments determined by profiling results
cm_mount_path = f"{get_workspace_dir()}/profiling_results" cm_mount_path = f"{get_workspace_dir()}/profiling_results"
planner_args.extend( if best_prefill_mapping is not None:
[ planner_args.append(
f"--prefill-engine-num-gpu={best_prefill_mapping.get_num_gpus()}", f"--prefill-engine-num-gpu={best_prefill_mapping.get_num_gpus()}"
f"--decode-engine-num-gpu={best_decode_mapping.get_num_gpus()}", )
f"--profile-results-dir={cm_mount_path}", if best_decode_mapping is not None:
] planner_args.append(
) f"--decode-engine-num-gpu={best_decode_mapping.get_num_gpus()}"
)
if ( # Work with plain dicts for PodSpec/Container extras (e.g. volumes, volumeMounts)
planner_config.extraPodSpec.mainContainer # because those fields are stored as "extra" and aren't exposed as pydantic attributes.
and planner_config.extraPodSpec.mainContainer.args is not None
):
planner_config.extraPodSpec.mainContainer.args.extend(planner_args)
# Convert planner config to dict first, then the entire config to dict
planner_dict = planner_config.model_dump(exclude_unset=False) planner_dict = planner_config.model_dump(exclude_unset=False)
config_dict = config.model_dump(exclude_unset=False) config_dict = config.model_dump(exclude_unset=False)
# Build a ConfigMap from NPZ profiling outputs and mount it into the Planner
# We store data as plain JSON (lists/float/int) to avoid binary artifacts.
prefill_npz = f"{output_dir}/selected_prefill_interpolation/raw_data.npz"
decode_npz = f"{output_dir}/selected_decode_interpolation/raw_data.npz"
config_map_obj: Optional[dict] = None config_map_obj: Optional[dict] = None
try: prefill_json = None
with np.load(prefill_npz) as p_raw: decode_json = None
prefill_json = { if output_dir is not None:
"prefill_isl": p_raw["prefill_isl"].tolist(), # Build a ConfigMap from NPZ profiling outputs and mount it into the Planner
"prefill_ttft": p_raw["prefill_ttft"].tolist(), # We store data as plain JSON (lists/float/int) to avoid binary artifacts.
"prefill_thpt_per_gpu": p_raw["prefill_thpt_per_gpu"].tolist(), prefill_npz = f"{output_dir}/selected_prefill_interpolation/raw_data.npz"
} decode_npz = f"{output_dir}/selected_decode_interpolation/raw_data.npz"
except FileNotFoundError:
prefill_json = None try:
with np.load(prefill_npz) as p_raw:
try: prefill_json = {
with np.load(decode_npz) as d_raw: "prefill_isl": p_raw["prefill_isl"].tolist(),
# max_kv_tokens saved as array; convert to int "prefill_ttft": p_raw["prefill_ttft"].tolist(),
max_kv_tokens = d_raw["max_kv_tokens"] "prefill_thpt_per_gpu": p_raw["prefill_thpt_per_gpu"].tolist(),
if hasattr(max_kv_tokens, "tolist"): }
max_kv_tokens_val = max_kv_tokens.tolist() except FileNotFoundError:
# Handle [value] vs value prefill_json = None
if isinstance(max_kv_tokens_val, list):
max_kv_tokens_val = ( try:
int(max_kv_tokens_val[0]) if max_kv_tokens_val else 0 with np.load(decode_npz) as d_raw:
) # max_kv_tokens saved as array; convert to int
max_kv_tokens = d_raw["max_kv_tokens"]
if hasattr(max_kv_tokens, "tolist"):
max_kv_tokens_val = max_kv_tokens.tolist()
# Handle [value] vs value
if isinstance(max_kv_tokens_val, list):
max_kv_tokens_val = (
int(max_kv_tokens_val[0]) if max_kv_tokens_val else 0
)
else:
max_kv_tokens_val = int(max_kv_tokens_val)
else: else:
max_kv_tokens_val = int(max_kv_tokens_val) max_kv_tokens_val = int(max_kv_tokens)
else:
max_kv_tokens_val = int(max_kv_tokens) decode_json = {
"x_kv_usage": d_raw["x_kv_usage"].tolist(),
decode_json = { "y_context_length": d_raw["y_context_length"].tolist(),
"x_kv_usage": d_raw["x_kv_usage"].tolist(), "z_itl": d_raw["z_itl"].tolist(),
"y_context_length": d_raw["y_context_length"].tolist(), "z_thpt_per_gpu": d_raw["z_thpt_per_gpu"].tolist(),
"z_itl": d_raw["z_itl"].tolist(), "max_kv_tokens": max_kv_tokens_val,
"z_thpt_per_gpu": d_raw["z_thpt_per_gpu"].tolist(), }
"max_kv_tokens": max_kv_tokens_val, except FileNotFoundError:
} decode_json = None
except FileNotFoundError:
decode_json = None
if prefill_json is not None and decode_json is not None: if prefill_json is not None and decode_json is not None:
# Only override planner profile directory when we actually have data to mount.
planner_args.append(f"--profile-results-dir={cm_mount_path}")
config_map_obj = { config_map_obj = {
"apiVersion": "v1", "apiVersion": "v1",
"kind": "ConfigMap", "kind": "ConfigMap",
...@@ -249,6 +388,13 @@ def generate_dgd_config_with_planner( ...@@ -249,6 +388,13 @@ def generate_dgd_config_with_planner(
} }
) )
# Attach planner args (always)
mc_dict = planner_dict.setdefault("extraPodSpec", {}).setdefault(
"mainContainer", {}
)
mc_args = mc_dict.setdefault("args", [])
mc_args.extend(planner_args)
# Finalize DGD services # Finalize DGD services
config_dict["spec"]["services"]["Planner"] = planner_dict config_dict["spec"]["services"]["Planner"] = planner_dict
...@@ -310,7 +456,7 @@ def _generate_mocker_config_with_planner( ...@@ -310,7 +456,7 @@ def _generate_mocker_config_with_planner(
"image" "image"
] = args.dgd_image ] = args.dgd_image
# Update worker args: --planner-profile-data, --model-path, --model-name # Update worker args: --planner-profile-data (if available), --model-path, --model-name
mocker_worker_names = [ mocker_worker_names = [
MockerComponentName.prefill_worker_k8s_name, MockerComponentName.prefill_worker_k8s_name,
MockerComponentName.decode_worker_k8s_name, MockerComponentName.decode_worker_k8s_name,
...@@ -324,9 +470,10 @@ def _generate_mocker_config_with_planner( ...@@ -324,9 +470,10 @@ def _generate_mocker_config_with_planner(
"mainContainer", {} "mainContainer", {}
) )
args_list = main_container.get("args", []) args_list = main_container.get("args", [])
args_list = set_argument_value( if config_map_obj is not None:
args_list, "--planner-profile-data", cm_mount_path args_list = set_argument_value(
) args_list, "--planner-profile-data", cm_mount_path
)
# Update model path and name if available in args # Update model path and name if available in args
args_list = set_argument_value(args_list, "--model-path", args.model) args_list = set_argument_value(args_list, "--model-path", args.model)
args_list = set_argument_value(args_list, "--model-name", args.model) args_list = set_argument_value(args_list, "--model-name", args.model)
......
...@@ -20,6 +20,11 @@ from aiconfigurator.webapp.components.profiling import ( ...@@ -20,6 +20,11 @@ from aiconfigurator.webapp.components.profiling import (
load_profiling_javascript, load_profiling_javascript,
) )
from benchmarks.profiler.utils.dgd_generation import (
generate_decode_service_config_preview,
generate_prefill_decode_services_config_preview,
generate_prefill_service_config_preview,
)
from benchmarks.profiler.utils.pareto import compute_pareto from benchmarks.profiler.utils.pareto import compute_pareto
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -56,149 +61,87 @@ def clear_profiling_errors() -> None: ...@@ -56,149 +61,87 @@ def clear_profiling_errors() -> None:
_profiling_errors.clear() _profiling_errors.clear()
def generate_dgd_worker_config_yaml( def dump_yaml_with_header(header_lines: list[str], obj: dict) -> str:
parallel_mapping, """Dump YAML with a leading comment header (used for WebUI config previews)."""
engine_type: str, header = "\n".join(header_lines + ["#"])
model: str | None = None, body = yaml.safe_dump(obj, sort_keys=False)
backend: str | None = None, return f"{header}\n{body}"
ttft_or_itl: float | None = None,
thpt_per_gpu: float | None = None,
) -> str:
"""
Generate a DGD worker service config snippet for display in the WebUI.
Uses ParallelizationMapping.label() for display and shows the service structure
that would be used in the final DynamoGraphDeployment.
Args: def _maybe_add_model_backend_header_lines(header_lines: list[str], args) -> None:
parallel_mapping: ParallelizationMapping instance model = getattr(args, "model", None)
engine_type: "prefill" or "decode" backend = getattr(args, "backend", None)
model: Model name/path if model:
backend: Backend name (sglang, vllm, trtllm) header_lines.append(f"# Model: {model}")
ttft_or_itl: TTFT (prefill) or ITL (decode) in ms if backend:
thpt_per_gpu: Throughput per GPU in tokens/s/GPU header_lines.append(f"# Backend: {backend}")
Returns:
YAML string representation of the DGD worker config
"""
num_gpus = parallel_mapping.get_num_gpus()
# Build the worker config in DGD style
# Note: Actual args vary by backend; this shows the structure
worker_config = {
"componentType": "worker",
"subComponentType": engine_type,
"replicas": 1,
"resources": {
"limits": {
"gpu": str(num_gpus),
}
},
}
# Build header comments with profiling metadata def build_single_service_preview_header_lines(
*,
service_name: str,
engine_type: str,
mapping,
ttft_or_itl_ms: float | None,
thpt_per_gpu: float | None,
args,
) -> list[str]:
header_lines = [ header_lines = [
"# DynamoGraphDeployment Worker Config", "# DynamoGraphDeployment Service Config Preview",
f"# Service: {service_name}",
f"# Engine: {engine_type}", f"# Engine: {engine_type}",
f"# Num GPUs: {num_gpus}", f"# Num GPUs: {mapping.get_num_gpus()}",
f"# Parallelization: {parallel_mapping.label()}", f"# Parallelization: {mapping.label()}",
] ]
if engine_type == "prefill" and ttft_or_itl_ms is not None:
if engine_type == "prefill" and ttft_or_itl is not None: header_lines.append(f"# Profiled TTFT: {round(ttft_or_itl_ms, 2)} ms")
header_lines.append(f"# Profiled TTFT: {round(ttft_or_itl, 2)} ms") if engine_type == "decode" and ttft_or_itl_ms is not None:
elif engine_type == "decode" and ttft_or_itl is not None: header_lines.append(f"# Profiled ITL: {round(ttft_or_itl_ms, 2)} ms")
header_lines.append(f"# Profiled ITL: {round(ttft_or_itl, 2)} ms")
if thpt_per_gpu is not None: if thpt_per_gpu is not None:
header_lines.append( header_lines.append(
f"# Profiled Throughput: {round(thpt_per_gpu, 2)} tokens/s/GPU" f"# Profiled Throughput: {round(thpt_per_gpu, 2)} tokens/s/GPU"
) )
_maybe_add_model_backend_header_lines(header_lines, args)
if model: header_lines.append(
header_lines.append(f"# Model: {model}") "# Note: This is a service-only preview. Final config includes planner."
if backend:
header_lines.append(f"# Backend: {backend}")
header_lines.append("#")
header_lines.append("# Note: Final config generated after selection includes")
header_lines.append("# backend-specific args and planner configuration.")
# Add the actual config
service_name = f"{engine_type.capitalize()}Worker"
body = yaml.dump(
{service_name: worker_config}, default_flow_style=False, sort_keys=False
) )
return header_lines
return "\n".join(header_lines) + "\n" + body
def generate_dgd_config_yaml_for_display( def build_two_service_preview_header_lines(
*,
prefill_service_name: str,
decode_service_name: str,
prefill_mapping, prefill_mapping,
decode_mapping, decode_mapping,
model: str | None = None, prefill_ttft_ms: float | None,
backend: str | None = None, prefill_thpt_per_gpu: float | None,
) -> str: decode_itl_ms: float | None,
""" decode_thpt_per_gpu: float | None,
Generate a DGD config snippet for display in the WebUI. args,
) -> list[str]:
This shows the combined prefill + decode DynamoGraphDeployment structure.
Uses ParallelizationMapping.label() for parallelization info.
Args:
prefill_mapping: ParallelizationMapping for prefill
decode_mapping: ParallelizationMapping for decode
model: Model name/path
backend: Backend name
Returns:
YAML string representation of the DGD configuration
"""
prefill_gpus = prefill_mapping.get_num_gpus()
decode_gpus = decode_mapping.get_num_gpus()
# Build DGD-style config showing the service structure
config = {
"apiVersion": "nvidia.com/v1alpha1",
"kind": "DynamoGraphDeployment",
"spec": {
"services": {
"PrefillWorker": {
"componentType": "worker",
"subComponentType": "prefill",
"replicas": 1,
"resources": {
"limits": {"gpu": str(prefill_gpus)},
},
},
"DecodeWorker": {
"componentType": "worker",
"subComponentType": "decode",
"replicas": 1,
"resources": {
"limits": {"gpu": str(decode_gpus)},
},
},
}
},
}
# Build header comments with parallelization and model info
header_lines = [ header_lines = [
"# DynamoGraphDeployment Configuration Preview", "# DynamoGraphDeployment Services Config Preview",
f"# Prefill: {prefill_gpus} GPU(s), {prefill_mapping.label()}", f"# Prefill service: {prefill_service_name} ({prefill_mapping.get_num_gpus()} GPU(s), {prefill_mapping.label()})",
f"# Decode: {decode_gpus} GPU(s), {decode_mapping.label()}", f"# Decode service: {decode_service_name} ({decode_mapping.get_num_gpus()} GPU(s), {decode_mapping.label()})",
] ]
if model: if prefill_ttft_ms is not None:
header_lines.append(f"# Model: {model}") header_lines.append(f"# Profiled TTFT: {round(prefill_ttft_ms, 2)} ms")
if backend: if decode_itl_ms is not None:
header_lines.append(f"# Backend: {backend}") header_lines.append(f"# Profiled ITL: {round(decode_itl_ms, 2)} ms")
header_lines.append("#") if prefill_thpt_per_gpu is not None:
header_lines.append("# Full config with planner saved to: config_with_planner.yaml") header_lines.append(
f"# Profiled Prefill Throughput: {round(prefill_thpt_per_gpu, 2)} tokens/s/GPU"
header = "\n".join(header_lines) )
body = yaml.dump(config, default_flow_style=False, sort_keys=False) if decode_thpt_per_gpu is not None:
header_lines.append(
return f"{header}\n{body}" f"# Profiled Decode Throughput: {round(decode_thpt_per_gpu, 2)} tokens/s/GPU"
)
_maybe_add_model_backend_header_lines(header_lines, args)
header_lines.append(
"# Note: This is a services-only preview. Final config includes planner."
)
return header_lines
class PlotType(str, Enum): class PlotType(str, Enum):
...@@ -326,15 +269,22 @@ def populate_prefill_data(data, prefill_data, args): ...@@ -326,15 +269,22 @@ def populate_prefill_data(data, prefill_data, args):
prefill_data.parallel_mappings, prefill_data.parallel_mappings,
) )
): ):
# Generate DGD worker config YAML for display config_obj = generate_prefill_service_config_preview(
config_yaml = generate_dgd_worker_config_yaml( config_path=args.config,
parallel_mapping=mapping, args=args,
best_prefill_mapping=mapping,
num_gpus_per_node=getattr(args, "num_gpus_per_node", 8),
)
service_name = next(iter(config_obj.keys()))
header_lines = build_single_service_preview_header_lines(
service_name=service_name,
engine_type="prefill", engine_type="prefill",
model=getattr(args, "model", None), mapping=mapping,
backend=getattr(args, "backend", None), ttft_or_itl_ms=ttft,
ttft_or_itl=ttft,
thpt_per_gpu=thpt, thpt_per_gpu=thpt,
args=args,
) )
config_yaml = dump_yaml_with_header(header_lines, config_obj)
table_data.append([gpu, round(ttft, 2), round(thpt, 2), config_yaml]) table_data.append([gpu, round(ttft, 2), round(thpt, 2), config_yaml])
data[PlotType.PREFILL]["table"]["data"] = table_data data[PlotType.PREFILL]["table"]["data"] = table_data
...@@ -383,15 +333,22 @@ def populate_decode_data(data, decode_data, args): ...@@ -383,15 +333,22 @@ def populate_decode_data(data, decode_data, args):
decode_data.parallel_mappings, decode_data.parallel_mappings,
) )
): ):
# Generate DGD worker config YAML for display config_obj = generate_decode_service_config_preview(
config_yaml = generate_dgd_worker_config_yaml( config_path=args.config,
parallel_mapping=mapping, args=args,
best_decode_mapping=mapping,
num_gpus_per_node=getattr(args, "num_gpus_per_node", 8),
)
service_name = next(iter(config_obj.keys()))
header_lines = build_single_service_preview_header_lines(
service_name=service_name,
engine_type="decode", engine_type="decode",
model=getattr(args, "model", None), mapping=mapping,
backend=getattr(args, "backend", None), ttft_or_itl_ms=itl,
ttft_or_itl=itl,
thpt_per_gpu=thpt, thpt_per_gpu=thpt,
args=args,
) )
config_yaml = dump_yaml_with_header(header_lines, config_obj)
table_data.append([gpu, round(itl, 2), round(thpt, 2), config_yaml]) table_data.append([gpu, round(itl, 2), round(thpt, 2), config_yaml])
data[PlotType.DECODE]["table"]["data"] = table_data data[PlotType.DECODE]["table"]["data"] = table_data
...@@ -468,13 +425,32 @@ def populate_cost_data( ...@@ -468,13 +425,32 @@ def populate_cost_data(
# Store mapping from cost table row to original indices # Store mapping from cost table row to original indices
cost_index_mapping[table_idx] = (orig_prefill_idx, orig_decode_idx) cost_index_mapping[table_idx] = (orig_prefill_idx, orig_decode_idx)
# Generate DGD config YAML for display services_obj = generate_prefill_decode_services_config_preview(
config_yaml = generate_dgd_config_yaml_for_display( config_path=args.config,
args=args,
best_prefill_mapping=prefill_mapping,
best_decode_mapping=decode_mapping,
num_gpus_per_node=getattr(args, "num_gpus_per_node", 8),
)
# Determine service names (backend-dependent)
service_names = list(services_obj.keys())
# Prefer stable names by picking based on subComponentType if present; fallback to insertion order.
prefill_service_name = service_names[0]
decode_service_name = (
service_names[1] if len(service_names) > 1 else service_names[0]
)
header_lines = build_two_service_preview_header_lines(
prefill_service_name=prefill_service_name,
decode_service_name=decode_service_name,
prefill_mapping=prefill_mapping, prefill_mapping=prefill_mapping,
decode_mapping=decode_mapping, decode_mapping=decode_mapping,
model=getattr(args, "model", None), prefill_ttft_ms=float(_p_ttft),
backend=getattr(args, "backend", None), prefill_thpt_per_gpu=float(_p_thpt),
decode_itl_ms=float(_d_itl),
decode_thpt_per_gpu=float(_d_thpt),
args=args,
) )
config_yaml = dump_yaml_with_header(header_lines, services_obj)
# Add to table data (GPU hours, not cost - frontend handles cost conversion) # Add to table data (GPU hours, not cost - frontend handles cost conversion)
table_data.append( table_data.append(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment