Unverified Commit 83e259a7 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: pre-deployment profiling automatically generates and deploys optimized...


feat: pre-deployment profiling automatically generates and deploys optimized DGD with planner (#3441)
Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
parent 43ea602a
......@@ -101,6 +101,7 @@ TensorRT-LLM
# Benchmarks
benchmarks/results
profiling_results*
# Direnv
.envrc
\ No newline at end of file
.envrc
......@@ -22,9 +22,14 @@ import os
import numpy as np
import yaml
from benchmarks.profiler.utils.config import CONFIG_MODIFIERS, WORKER_COMPONENT_NAMES
from benchmarks.profiler.utils.config import (
CONFIG_MODIFIERS,
WORKER_COMPONENT_NAMES,
generate_dgd_config_with_planner,
)
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from benchmarks.profiler.utils.genai_perf import benchmark_decode, benchmark_prefill
from benchmarks.profiler.utils.planner_utils import add_planner_arguments_to_parser
from benchmarks.profiler.utils.plot import (
plot_decode_performance,
plot_prefill_performance,
......@@ -64,6 +69,10 @@ async def run_profile(args):
# List to track all created deployment clients for cleanup in case of failure
deployment_clients = []
# Inherit aic_backend from backend if not explicitly set
if not args.aic_backend:
args.aic_backend = args.backend
try:
# Log MoE model support
if args.is_moe_model:
......@@ -132,20 +141,20 @@ async def run_profile(args):
raise ValueError(
"Must provide --aic-model-name when using --use-ai-configurator."
)
if not args.backend_version:
if not args.aic_backend_version:
raise ValueError(
"Must provide --backend-version when using --use-ai-configurator."
"Must provide --aic-backend-version when using --use-ai-configurator."
)
logger.info("Will use aiconfigurator to estimate perf.")
ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
args.aic_model_name,
args.aic_system.lower(),
args.backend,
args.backend_version,
args.aic_backend,
args.aic_backend_version,
)
else:
if args.aic_system or args.aic_model_name or args.backend_version:
if args.aic_system or args.aic_model_name or args.aic_backend_version:
logger.warning(
"Will ignore --aic-system, --aic-model-name, and/or --backend-version "
"when not using --use-ai-configurator."
......@@ -705,6 +714,23 @@ async def run_profile(args):
deployment_clients.remove(client)
logger.info("Deployment deleted")
# generate DGD with planner based on profiling results
config = generate_dgd_config_with_planner(
config_path=args.config,
config_modifier=config_modifier,
best_prefill_gpus=best_prefill_gpus,
best_decode_gpus=best_decode_gpus,
output_dir=args.output_dir,
args=args,
is_moe_model=args.is_moe_model,
num_gpus_per_node=args.num_gpus_per_node,
)
logger.info(f"Final DGD config with planner: {config}")
# save DGD config with planner
with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
yaml.dump(config, f)
except Exception as e:
logger.error(f"Profile job failed with error: {e}")
raise
......@@ -714,6 +740,21 @@ async def run_profile(args):
await cleanup_remaining_deployments(deployment_clients, args.namespace)
logger.info("Final cleanup completed.")
# deploy the optimized DGD with planner
if args.deploy_after_profile and not args.dry_run:
logger.info("Deploying the optimized DGD with planner...")
# TODO: check conflicts for dynamo namespace and DGD name
# TODO: handle deployment errors and propagate proper error messages to users
client = DynamoDeploymentClient(
namespace=args.namespace,
base_log_dir=f"{args.output_dir}/final_deployment",
model_name=model_name,
service_name=args.service_name,
frontend_port=frontend_port,
deployment_name=config["metadata"]["name"],
)
await client.create_deployment(f"{args.output_dir}/config_with_planner.yaml")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
......@@ -778,7 +819,8 @@ if __name__ == "__main__":
parser.add_argument(
"--itl", type=int, default=10, help="target Inter Token Latency in ms"
)
# below are arguments used for interpolating TTFT and ITL under different ISL/OSL
# arguments used for interpolating TTFT and ITL under different ISL/OSL
parser.add_argument(
"--max-context-length",
type=int,
......@@ -820,6 +862,17 @@ if __name__ == "__main__":
default=8,
help="Number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size",
)
# arguments for dgd config generation and deployment
parser.add_argument(
"--deploy-after-profile",
action="store_true",
help="deploy the optimized DGD with planner",
)
# Dynamically add all planner arguments from planner_argparse.py
add_planner_arguments_to_parser(parser, prefix="planner-")
# arguments if using aiconfigurator
parser.add_argument(
"--use-ai-configurator",
action="store_true",
......@@ -836,7 +889,13 @@ if __name__ == "__main__":
help="aiconfigurator name of the target model (e.g. QWEN3_32B, DEEPSEEK_V3)",
)
parser.add_argument(
"--backend-version",
"--aic-backend",
type=str,
default="",
help="aiconfigurator backend of the target model, if not provided, will use args.backend",
)
parser.add_argument(
"--aic-backend-version",
type=str,
help="Specify backend version when using aiconfigurator to estimate perf.",
)
......
......@@ -20,12 +20,14 @@ import re
import shlex
from typing import Literal, Optional, Protocol
import yaml
from pydantic import BaseModel
from benchmarks.profiler.utils.defaults import (
DEFAULT_MODEL_NAME,
DYNAMO_RUN_DEFAULT_PORT,
)
from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES, SubComponentType
logger = logging.getLogger(__name__)
......@@ -39,7 +41,15 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class VolumeMount(BaseModel):
name: str = "dynamo-pvc"
mountPoint: str = "/data"
class Container(BaseModel):
image: Optional[str] = None
workingDir: Optional[str] = None
command: Optional[list[str]] = None
args: Optional[list[str]] = None
model_config = {"extra": "allow"}
......@@ -67,12 +77,21 @@ class Services(BaseModel):
model_config = {"extra": "allow"}
class PVCConfig(BaseModel):
name: str = "dynamo-pvc"
create: Optional[bool] = False
model_config = {"extra": "allow"}
class Spec(BaseModel):
services: dict[str, Service]
pvcs: Optional[list[PVCConfig]] = None
model_config = {"extra": "allow"}
class Metadata(BaseModel):
name: str
model_config = {"extra": "allow"}
class Config(BaseModel):
......@@ -85,6 +104,22 @@ class MultinodeConfig(BaseModel):
nodeCount: int
class DgdPlannerServiceConfig(BaseModel):
dynamoNamespace: str = "dynamo" # placeholder
componentType: str = "planner"
replicas: int = 1
volumeMounts: list[VolumeMount] = [VolumeMount()]
extraPodSpec: PodSpec = PodSpec(
mainContainer=Container(
image="my-registry/dynamo-runtime:my-tag", # placeholder
workingDir="/workspace/components/src/dynamo/planner",
command=["python3", "-m", "planner_sla"],
args=[],
)
)
model_config = {"extra": "allow"}
def break_arguments(args: list[str] | None) -> list[str]:
ans: list[str] = []
if args is None:
......@@ -342,18 +377,31 @@ class ConfigModifierProtocol(Protocol):
...
@classmethod
def set_config_tp_size(cls, config: dict, tp_size: int) -> dict:
def set_config_tp_size(
cls,
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
@classmethod
def set_config_tep_size(
cls, config: dict, tep_size: int, num_gpus_per_node: int
cls,
config: dict,
tep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
@classmethod
def set_config_dep_size(
cls, config: dict, dep_size: int, num_gpus_per_node: int
cls,
config: dict,
dep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
) -> dict:
...
......@@ -473,9 +521,16 @@ class VllmV1ConfigModifier:
return cfg.model_dump()
@classmethod
def set_config_tp_size(cls, config: dict, tp_size: int):
def set_config_tp_size(
cls,
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(config, backend="vllm")
worker_service = get_worker_service_from_config(
config, backend="vllm", sub_component_type=component_type
)
# Set up resources
setup_worker_service_resources(worker_service, tp_size)
......@@ -495,13 +550,25 @@ class VllmV1ConfigModifier:
return cfg.model_dump()
@classmethod
def set_config_tep_size(cls, config: dict, tep_size: int, num_gpus_per_node: int):
def set_config_tep_size(
cls,
config: dict,
tep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
raise NotImplementedError(
"TEP (Tensor Expert Parallelism) is not implemented for VLLM backend"
)
@classmethod
def set_config_dep_size(cls, config: dict, dep_size: int, num_gpus_per_node: int):
def set_config_dep_size(
cls,
config: dict,
dep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
raise NotImplementedError(
"DEP (Data Expert Parallelism) is not implemented for VLLM backend"
)
......@@ -692,9 +759,16 @@ class SGLangConfigModifier:
return cfg.model_dump()
@classmethod
def set_config_tp_size(cls, config: dict, tp_size: int):
def set_config_tp_size(
cls,
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(config, backend="sglang")
worker_service = get_worker_service_from_config(
config, backend="sglang", sub_component_type=component_type
)
# Set up resources
setup_worker_service_resources(worker_service, tp_size)
......@@ -709,9 +783,17 @@ class SGLangConfigModifier:
return cfg.model_dump()
@classmethod
def set_config_tep_size(cls, config: dict, tep_size: int, num_gpus_per_node: int):
def set_config_tep_size(
cls,
config: dict,
tep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(config, backend="sglang")
worker_service = get_worker_service_from_config(
config, backend="sglang", sub_component_type=component_type
)
# Set up resources with multinode configuration
setup_worker_service_resources(worker_service, tep_size, num_gpus_per_node)
......@@ -736,9 +818,17 @@ class SGLangConfigModifier:
return cfg.model_dump()
@classmethod
def set_config_dep_size(cls, config: dict, dep_size: int, num_gpus_per_node: int):
def set_config_dep_size(
cls,
config: dict,
dep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
cfg = Config.model_validate(config)
worker_service = get_worker_service_from_config(config, backend="sglang")
worker_service = get_worker_service_from_config(
config, backend="sglang", sub_component_type=component_type
)
# Set up resources with multinode configuration
setup_worker_service_resources(worker_service, dep_size, num_gpus_per_node)
......@@ -966,12 +1056,19 @@ class TrtllmConfigModifier:
return cfg.model_dump()
@classmethod
def set_config_tp_size(cls, config: dict, tp_size: int):
def set_config_tp_size(
cls,
config: dict,
tp_size: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
cfg = Config.model_validate(config)
# Get the worker service using helper function
# This assumes convert_config has been called, so the service is named decode_worker_k8s_name
worker_service = get_worker_service_from_config(config, backend="trtllm")
worker_service = get_worker_service_from_config(
config, backend="trtllm", sub_component_type=component_type
)
# Set up resources
setup_worker_service_resources(worker_service, tp_size)
......@@ -996,13 +1093,25 @@ class TrtllmConfigModifier:
return cfg.model_dump()
@classmethod
def set_config_tep_size(cls, config: dict, tep_size: int, num_gpus_per_node: int):
def set_config_tep_size(
cls,
config: dict,
tep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
raise NotImplementedError(
"TEP (Tensor Expert Parallelism) is not implemented for TrtLLM backend"
)
@classmethod
def set_config_dep_size(cls, config: dict, dep_size: int, num_gpus_per_node: int):
def set_config_dep_size(
cls,
config: dict,
dep_size: int,
num_gpus_per_node: int,
component_type: SubComponentType = SubComponentType.DECODE,
):
raise NotImplementedError(
"DEP (Data Expert Parallelism) is not implemented for TrtLLM backend"
)
......@@ -1083,5 +1192,118 @@ CONFIG_MODIFIERS: dict[str, type[ConfigModifierProtocol]] = {
"trtllm": TrtllmConfigModifier,
}
def generate_dgd_config_with_planner(
config_path: str,
config_modifier,
best_prefill_gpus: int,
best_decode_gpus: int,
output_dir: str,
args,
is_moe_model: bool = False,
num_gpus_per_node: int = 8,
):
"""Generate DGD config with planner based on profiling results.
Args:
config_path: Path to the YAML config file
config_modifier: Config modifier instance (e.g., SGLangConfigModifier)
best_prefill_gpus: Number of GPUs for prefill engine
best_decode_gpus: Number of GPUs for decode engine
output_dir: Output directory for profile results
args: Parsed arguments namespace from profile_sla
is_moe_model: Whether this is an MoE model
num_gpus_per_node: Number of GPUs per node (for MoE models)
Returns:
dict: Final DGD config with planner service configured
"""
# Load config from file
with open(config_path, "r") as f:
config = yaml.safe_load(f)
if not is_moe_model:
# dense model, use TP for both prefill and decode
config = config_modifier.set_config_tp_size(
config, best_prefill_gpus, SubComponentType.PREFILL
)
config = config_modifier.set_config_tp_size(
config, best_decode_gpus, SubComponentType.DECODE
)
else:
# MoE model, use TEP for prefill and DEP for decode
config = config_modifier.set_config_tep_size(
config,
best_prefill_gpus,
num_gpus_per_node,
SubComponentType.PREFILL,
)
config = config_modifier.set_config_dep_size(
config,
best_decode_gpus,
num_gpus_per_node,
SubComponentType.DECODE,
)
config = Config.model_validate(config)
# add PVC config if not present
if not config.spec.pvcs:
config.spec.pvcs = [PVCConfig()]
# add the planner service
planner_config = DgdPlannerServiceConfig()
frontend_service = config.spec.services["Frontend"]
planner_config.dynamoNamespace = getattr(frontend_service, "dynamoNamespace", "dynamo") # type: ignore[attr-defined]
if frontend_service.extraPodSpec and frontend_service.extraPodSpec.mainContainer:
frontend_image = frontend_service.extraPodSpec.mainContainer.image
if frontend_image and planner_config.extraPodSpec.mainContainer:
planner_config.extraPodSpec.mainContainer.image = frontend_image
# Build planner args dynamically from parsed arguments
# This includes shared args (ttft, itl, backend, namespace) from profile_sla
# and planner-specific args (with planner_ prefix)
planner_args = build_planner_args_from_namespace(args, prefix="planner_")
# Override profiling-specific arguments with results from profiling
# Remove and re-add to ensure correct values from profiling context
planner_args = [
arg
for arg in planner_args
if not any(
arg.startswith(f"--{key}=")
for key in [
"namespace",
"prefill-engine-num-gpu",
"decode-engine-num-gpu",
"profile-results-dir",
]
)
]
# Add arguments determined by profiling results
frontend_namespace = getattr(config.spec.services["Frontend"], "dynamoNamespace", "dynamo") # type: ignore[attr-defined]
planner_args.extend(
[
f"--namespace={frontend_namespace}",
f"--prefill-engine-num-gpu={best_prefill_gpus}",
f"--decode-engine-num-gpu={best_decode_gpus}",
f"--profile-results-dir={output_dir}",
]
)
if (
planner_config.extraPodSpec.mainContainer
and planner_config.extraPodSpec.mainContainer.args is not None
):
planner_config.extraPodSpec.mainContainer.args.extend(planner_args)
# Convert planner config to dict first, then the entire config to dict
planner_dict = planner_config.model_dump(exclude_unset=False)
config_dict = config.model_dump(exclude_unset=False)
config_dict["spec"]["services"]["Planner"] = planner_dict
return config_dict
# Re-export WORKER_COMPONENT_NAMES for profile_sla.py
__all__ = ["CONFIG_MODIFIERS", "WORKER_COMPONENT_NAMES"]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from dynamo.planner.utils.planner_argparse import create_sla_planner_parser
def _get_action_type(action: argparse.Action) -> str | None:
"""
Extract action type string from an argparse Action object.
Args:
action: The argparse Action object
Returns:
Action type string ('store_true', 'store_false', 'store_const') or None
"""
action_class_name = type(action).__name__
if action_class_name == "_StoreTrueAction":
return "store_true"
elif action_class_name == "_StoreFalseAction":
return "store_false"
elif action_class_name == "_StoreConstAction":
return "store_const"
return None
def _build_action_kwargs(
action: argparse.Action, action_type: str | None, prefix: str
) -> dict:
"""
Build kwargs dictionary for add_argument based on action type.
Args:
action: The argparse Action object
action_type: The action type string ('store_true', 'store_false', etc.)
prefix: Prefix for the destination name
Returns:
Dictionary of kwargs for add_argument
"""
kwargs = {
"dest": f"{prefix.replace('-', '_')}{action.dest}",
"default": action.default,
"help": action.help,
}
# Add action type if specified
if action_type is not None:
kwargs["action"] = action_type
# For store_true/store_false, don't add type, nargs, metavar, const
# For other actions, add them if they're set
if action_type not in ["store_true", "store_false"]:
if action.type is not None:
kwargs["type"] = action.type
if action.nargs is not None:
kwargs["nargs"] = action.nargs
if action.metavar is not None:
kwargs["metavar"] = action.metavar
if action.choices is not None:
kwargs["choices"] = action.choices
if action_type == "store_const" and action.const is not None:
kwargs["const"] = action.const
return kwargs
def _format_arg_for_command_line(arg_name: str, value) -> list[str]:
"""
Format an argument name and value for command line usage.
Args:
arg_name: The argument name (without dashes)
value: The argument value
Returns:
List of command-line argument strings (empty list if value is None or False bool)
"""
if value is None:
return []
if isinstance(value, bool):
# For boolean flags, only add if True
if value:
return [f"--{arg_name}"]
return []
else:
# For valued arguments
return [f"--{arg_name}={value}"]
def _collect_args_from_namespace(
args: argparse.Namespace, arg_names: list[str], prefix_to_strip: str = ""
) -> list[str]:
"""
Collect and format command-line arguments from a namespace for given attribute names.
Args:
args: The argparse Namespace containing parsed arguments
arg_names: List of attribute names to collect from the namespace
prefix_to_strip: Optional prefix to remove from attribute names before formatting
Returns:
List of formatted command-line argument strings
"""
result = []
for attr_name in sorted(arg_names): # sorted for consistent ordering
value = getattr(args, attr_name)
# Strip prefix and convert to command-line argument name
if prefix_to_strip and attr_name.startswith(prefix_to_strip):
arg_name = attr_name[len(prefix_to_strip) :].replace("_", "-")
else:
arg_name = attr_name.replace("_", "-")
result.extend(_format_arg_for_command_line(arg_name, value))
return result
def add_planner_arguments_to_parser(
parser: argparse.ArgumentParser, prefix: str = "planner-"
):
"""
Dynamically add planner arguments from create_sla_planner_parser() to the given parser.
Only adds arguments that don't already exist in the parser (without prefix).
Args:
parser: The ArgumentParser to add arguments to
prefix: Prefix to add to planner argument names to avoid conflicts
"""
# Create a temporary planner parser to extract its arguments
planner_parser = create_sla_planner_parser()
# Get existing argument names in the parser (without dashes)
existing_dests = {action.dest for action in parser._actions}
# Add a group for planner arguments
planner_group = parser.add_argument_group(
"planner arguments",
"Arguments that will be passed to the planner service (only showing args not already in profile_sla)",
)
# Iterate through planner parser actions and add them with prefix
for action in planner_parser._actions:
# Skip help and positional arguments
if action.dest in ["help"] or not action.option_strings:
continue
# Skip if this argument already exists in the main parser (without prefix)
if action.dest in existing_dests:
continue
# Create new option strings with prefix
new_option_strings = [
f"--{prefix}{opt.lstrip('-')}" for opt in action.option_strings
]
# Determine the action type and build kwargs
action_type = _get_action_type(action)
kwargs = _build_action_kwargs(action, action_type, prefix)
planner_group.add_argument(*new_option_strings, **kwargs)
def build_planner_args_from_namespace(
args: argparse.Namespace, prefix: str = "planner_"
) -> list[str]:
"""
Build planner command-line arguments from parsed args namespace.
Automatically detects shared arguments between profile_sla and planner,
and uses profile_sla values for those.
Args:
args: Parsed arguments namespace
prefix: Prefix used for planner arguments
Returns:
List of planner command-line arguments
"""
planner_args = []
# Auto-detect shared arguments by comparing planner parser with args namespace
planner_parser = create_sla_planner_parser()
planner_arg_dests = {
action.dest
for action in planner_parser._actions
if action.dest != "help" and action.option_strings
}
# Find arguments in args namespace that match planner arguments (not prefixed)
# These are shared arguments that should come from profile_sla
shared_arg_dests = {dest for dest in planner_arg_dests if hasattr(args, dest)}
# Add shared arguments from profile_sla (without prefix)
planner_args.extend(_collect_args_from_namespace(args, list(shared_arg_dests)))
# Get all planner-prefixed attributes from args (planner-specific only)
prefixed_attrs = [attr for attr in dir(args) if attr.startswith(prefix)]
planner_args.extend(
_collect_args_from_namespace(args, prefixed_attrs, prefix_to_strip=prefix)
)
return planner_args
......@@ -31,7 +31,7 @@ spec:
extraPodSpec:
mainContainer:
image: my-registry/sglang-runtime:my-tag
workingDir: /workspace/components/planner/src/dynamo/planner
workingDir: /workspace/components/src/dynamo/planner
command:
- python3
- -m
......
......@@ -30,7 +30,7 @@ spec:
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
workingDir: /workspace/components/planner/src/dynamo/planner
workingDir: /workspace/components/src/dynamo/planner
command:
- python3
- -m
......
......@@ -8,7 +8,6 @@ metadata:
labels:
app: pvc-access
spec:
activeDeadlineSeconds: 3000 # Auto-terminate after ~50 minutes; Pod will be Failed (not auto-deleted)
securityContext:
runAsNonRoot: true
runAsUser: 1000
......
......@@ -128,10 +128,13 @@ spec:
**For MoE models**, use `profile_sla_moe_job.yaml` with TEP/DEP configuration instead.
If you want to automatically deploy the optimized DGD with planner after profiling, add `--deploy-after-profile` to the profiling job. It will deploy the DGD with the engine of the optimized parallelization mapping found for the SLA targets.
### Advanced Configuration
- **Model caching**: For large models, create a multi-attach PVC to cache the model. See [recipes](../../recipes/README.md) for details.
- **Custom configurations**: Use the manifest injector to place custom DGD configurations in the PVC.
- **Custom disaggregated configurations**: Use the manifest injector to place custom DGD configurations in the PVC.
- **Planner Config Passthrough**: To specify custom planner configurations (e.g., `adjustment-interval` or `load-predictor`) in the generated or deployed DGD config, add a `planner-` prefix to the argument. For example, to specify `--adjustment-interval=60` in SLA planner, add `--planner-adjustment-interval=60` arg to the profiling job.
- **Resource allocation**: Modify the job YAML to adjust GPU and memory requirements.
### Viewing Profiling Results
......@@ -168,9 +171,10 @@ The profiling results directory contains the following structure:
│ ├── raw_data.npz # Prefill interpolation data
│ ├── prefill_ttft_interpolation.png # TTFT vs ISL plot
│ └── prefill_throughput_interpolation.png # Throughput vs ISL plot
└── selected_decode_interpolation/
├── raw_data.npz # Decode interpolation data
└── decode_tp{best_tp}.png # 3D ITL surface plot
├── selected_decode_interpolation/
│ ├── raw_data.npz # Decode interpolation data
│ └── decode_tp{best_tp}.png # 3D ITL surface plot
└── config_with_planner.yaml # Generated DGD config with planner
```
#### Viewing Performance Plots
......@@ -272,11 +276,12 @@ Example command for TensorRT-LLM:
```bash
python3 -m benchmarks.profiler.profile_sla \
--config ./components/backends/trtllm/deploy/disagg.yaml \
--backend trtllm \
--use-ai-configurator \
--aic-system h200_sxm \
--aic-model-name QWEN3_32B \
--backend trtllm \
--backend-version 0.20.0 \
--aic-backend trtllm \ # optional, will use --backend if not provided
--aic-backend-version 0.20.0 \
--isl 3000 \
--osl 150 \
--ttft 0.2 \
......
......@@ -34,7 +34,16 @@ flowchart TD
style B fill:#fff8e1
```
## Phase 1: Pre-Deployment Profiling (REQUIRED)
## Prerequisites
Before deploying the SLA planner, ensure:
- **Dynamo platform installed** (see [Installation Guide](/docs/kubernetes/installation_guide.md))
- **[kube-prometheus-stack](/docs/kubernetes/metrics.md) installed and running.** By default, the prometheus server is not deployed in the `monitoring` namespace. If it is deployed to a different namespace, set `dynamo-operator.dynamo.metrics.prometheusEndpoint="http://prometheus-kube-prometheus-prometheus.<namespace>.svc.cluster.local:9090"`.
- **Benchmarking resources setup** (see [Kubernetes utilities for Dynamo Benchmarking and Profiling](../../deploy/utils/README.md)) The script will create a `dynamo-pvc` with `ReadWriteMany` access, if your cluster's default storageClassName does not allow `ReadWriteMany`, you need to specify a different storageClassName in `pvc.yaml`.
## Pre-Deployment Profiling
Deploying planner starts with running pre-deployment profiling.
> [!WARNING]
> **MANDATORY**: Pre-deployment profiling must be completed before deploying SLA planner. This process analyzes your model's performance characteristics to determine optimal tensor parallelism configurations and scaling parameters.
......@@ -87,10 +96,13 @@ spec:
- "20" # target ITL is 20ms
- --backend
- <vllm/sglang>
- --deploy-after-profile
```
For MoE models, edit `$DYNAMO_HOME/benchmarks/profiler/deploy/profile_sla_moe_job.yaml` instead.
To automatically deploy the optimized DGD with planner after profiling, add `--deploy-after-profile` to the profiling job. It will deploy the DGD with the engine of the optimized parallelization mapping found for the SLA targets.
### Step 1.4: Run Profiling
Set the container image and config path:
......@@ -138,33 +150,14 @@ For detailed information about the output structure, performance plots, and how
```
Suggested prefill TP:4 (TTFT 48.37 ms, throughput 15505.23 tokens/s/GPU)
Suggested decode TP:4 (ITL 4.83 ms, throughput 51.22 tokens/s/GPU)
...
Final DGD config with planner: {...}
Deploying the optimized DGD with planner...
```
## Phase 2: Deploy SLA Planner
### Step 2.1: Verify Prerequisites
Before deploying the SLA planner, ensure:
- **Pre-deployment profiling completed successfully** (from Phase 1)
- **Profiling results saved to `dynamo-pvc` PVC**
- **[kube-prometheus-stack](/docs/kubernetes/metrics.md) installed and running.** By default, the prometheus server is not deployed in the `monitoring` namespace. If it is deployed to a different namespace, set `dynamo-operator.dynamo.metrics.prometheusEndpoint="http://prometheus-kube-prometheus-prometheus.<namespace>.svc.cluster.local:9090"`.
- **Dynamo platform installed** (see [Installation Guide](/docs/kubernetes/installation_guide.md))
- **Prefill and decode workers use the best parallelization mapping from profiling**
### Step 2.2: Deploy the System
We use vllm as the backend engine in this guide. SLA planner also supports SGLang and TensorRT-LLM.
### Step 1.7: Wait for Deployment to be Ready
```bash
# Apply the disaggregated planner deployment
kubectl apply -f components/backends/vllm/deploy/disagg_planner.yaml -n $NAMESPACE # for vllm
kubectl apply -f components/backends/sglang/deploy/disagg_planner.yaml -n $NAMESPACE # for sglang
kubectl apply -f components/backends/trtllm/deploy/disagg_planner.yaml -n $NAMESPACE # for trtllm
# Check deployment status
kubectl get pods -n $NAMESPACE
```
......@@ -176,7 +169,7 @@ vllm-disagg-planner-backend-* 1/1 Running
vllm-disagg-planner-prefill-* 1/1 Running
```
### Step 2.3: Test the System
### Step 1.8: Test the System
```bash
# Port forward to frontend
......@@ -198,7 +191,7 @@ curl -N http://localhost:8000/v1/chat/completions \
}'
```
### Step 2.4: Monitor Scaling
### Step 1.9: Monitor Scaling
```bash
# Check planner logs for scaling decisions
......@@ -213,7 +206,7 @@ Observed ttft: X.XXXs itl: X.XXXs
Number of prefill workers: 1, number of decode workers: 1
```
## Phase 3: Production Readiness
## Production Readiness
### Monitoring Metrics
......
......@@ -77,7 +77,7 @@ spec:
extraPodSpec:
mainContainer:
image: my-registry/vllm-runtime:my-tag
workingDir: /workspace/components/planner/src/dynamo/planner
workingDir: /workspace/components/src/dynamo/planner
ports:
- name: metrics
containerPort: 9085
......
......@@ -42,7 +42,7 @@ spec:
extraPodSpec:
mainContainer:
image: my-registry/vllm-runtime:my-tag
workingDir: /workspace/components/planner/src/dynamo/planner
workingDir: /workspace/components/src/dynamo/planner
ports:
- name: metrics
containerPort: 9085
......
......@@ -48,15 +48,17 @@ class TestProfileSlaAiconfigurator:
self.use_ai_configurator = True
self.aic_system = "h200_sxm"
self.aic_model_name = "QWEN3_32B"
self.backend_version = "0.20.0"
self.aic_backend = ""
self.aic_backend_version = "0.20.0"
self.num_gpus_per_node = 8
self.deploy_after_profile = False
return Args()
@pytest.mark.pre_merge
@pytest.mark.asyncio
@pytest.mark.parametrize(
"missing_arg", ["aic_system", "aic_model_name", "backend_version"]
"missing_arg", ["aic_system", "aic_model_name", "aic_backend_version"]
)
async def test_aiconfigurator_missing_args(self, trtllm_args, missing_arg):
# Check that validation error happens when a required arg is missing.
......@@ -71,7 +73,7 @@ class TestProfileSlaAiconfigurator:
[
# these values don't exist in the aiconfigurator database.
("aic_system", "fake_gpu_system"),
("backend_version", "0.1.0"),
("aic_backend_version", "0.1.0"),
],
)
async def test_aiconfiguator_no_data(self, trtllm_args, arg_name, bad_value):
......@@ -89,7 +91,7 @@ class TestProfileSlaAiconfigurator:
@pytest.mark.asyncio
@pytest.mark.parametrize(
"backend, backend_version",
"backend, aic_backend_version",
[
("trtllm", "0.20.0"),
("trtllm", "1.0.0rc3"),
......@@ -97,10 +99,10 @@ class TestProfileSlaAiconfigurator:
)
@pytest.mark.parametrize("model_name", ["QWEN3_32B", "GPT_7B", "LLAMA3.1_405B"])
async def test_trtllm_aiconfigurator_many(
self, trtllm_args, model_name, backend, backend_version
self, trtllm_args, model_name, backend, aic_backend_version
):
# Test that profile_sla works with a variety of backend versions and model names.
trtllm_args.aic_model_name = model_name
trtllm_args.backend = backend
trtllm_args.backend_version = backend_version
trtllm_args.aic_backend_version = aic_backend_version
await run_profile(trtllm_args)
......@@ -50,8 +50,10 @@ class TestProfileSLADryRun:
self.use_ai_configurator = False
self.aic_system = None
self.aic_model_name = None
self.backend_version = None
self.aic_backend = ""
self.aic_backend_version = None
self.num_gpus_per_node = 8
self.deploy_after_profile = False
return Args()
......@@ -82,8 +84,10 @@ class TestProfileSLADryRun:
self.use_ai_configurator = False
self.aic_system = None
self.aic_model_name = None
self.backend_version = None
self.aic_backend = ""
self.aic_backend_version = None
self.num_gpus_per_node = 8
self.deploy_after_profile = False
return Args()
......@@ -128,8 +132,10 @@ class TestProfileSLADryRun:
self.use_ai_configurator = False
self.aic_system = None
self.aic_model_name = None
self.backend_version = None
self.aic_backend = ""
self.aic_backend_version = None
self.num_gpus_per_node = 8
self.deploy_after_profile = False
return Args()
......@@ -169,8 +175,10 @@ class TestProfileSLADryRun:
self.use_ai_configurator = False
self.aic_system = None
self.aic_model_name = None
self.backend_version = None
self.aic_backend = ""
self.aic_backend_version = None
self.num_gpus_per_node = 8
self.deploy_after_profile = False
return Args()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment