"docs/vscode:/vscode.git/clone" did not exist on "525030324e3583f2485dd8f0fdb28dd3988549ff"
Unverified Commit a1b38af2 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: automatic profiling config generation (#3787)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
Signed-off-by: default avatarHongkuan Zhou <tedzhouhk@gmail.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent 94aa2a7b
...@@ -410,6 +410,14 @@ class ConfigModifierProtocol(Protocol): ...@@ -410,6 +410,14 @@ class ConfigModifierProtocol(Protocol):
) -> int: ) -> int:
... ...
@classmethod
def load_default_config(cls) -> dict:
...
@classmethod
def update_model(cls, config: dict, model_name: str) -> dict:
...
def generate_dgd_config_with_planner( def generate_dgd_config_with_planner(
config_path: str, config_path: str,
......
...@@ -5,6 +5,8 @@ import logging ...@@ -5,6 +5,8 @@ import logging
import re import re
from typing import Literal from typing import Literal
import yaml
from benchmarks.profiler.utils.config import ( from benchmarks.profiler.utils.config import (
Config, Config,
append_argument, append_argument,
...@@ -33,7 +35,43 @@ console_handler.setFormatter(formatter) ...@@ -33,7 +35,43 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler) logger.addHandler(console_handler)
DEFAULT_SGLANG_CONFIG_PATH = "components/backends/sglang/deploy/disagg.yaml"
class SGLangConfigModifier: class SGLangConfigModifier:
@classmethod
def load_default_config(cls) -> dict:
with open(DEFAULT_SGLANG_CONFIG_PATH, "r") as f:
return yaml.safe_load(f)
@classmethod
def update_model(cls, config, model_name: str) -> dict:
# change the model to serve
cfg = Config.model_validate(config)
# Update model for both prefill and decode workers
for sub_component_type in [SubComponentType.PREFILL, SubComponentType.DECODE]:
try:
worker_service = get_worker_service_from_config(
cfg, backend="sglang", sub_component_type=sub_component_type
)
args = validate_and_get_worker_args(worker_service, backend="sglang")
args = break_arguments(args)
# Update both --model-path and --served-model-name
args = set_argument_value(args, "--model-path", model_name)
args = set_argument_value(args, "--served-model-name", model_name)
worker_service.extraPodSpec.mainContainer.args = args
except (ValueError, KeyError):
# Service might not exist (e.g., in aggregated mode)
logger.debug(
f"Skipping {sub_component_type} service as it doesn't exist"
)
continue
return cfg.model_dump()
@classmethod @classmethod
def convert_config( def convert_config(
cls, cls,
......
...@@ -6,6 +6,8 @@ import logging ...@@ -6,6 +6,8 @@ import logging
import re import re
from typing import Literal from typing import Literal
import yaml
from benchmarks.profiler.utils.config import ( from benchmarks.profiler.utils.config import (
Config, Config,
append_argument, append_argument,
...@@ -14,6 +16,7 @@ from benchmarks.profiler.utils.config import ( ...@@ -14,6 +16,7 @@ from benchmarks.profiler.utils.config import (
get_worker_service_from_config, get_worker_service_from_config,
parse_override_engine_args, parse_override_engine_args,
remove_valued_arguments, remove_valued_arguments,
set_argument_value,
setup_worker_service_resources, setup_worker_service_resources,
validate_and_get_worker_args, validate_and_get_worker_args,
) )
...@@ -34,7 +37,43 @@ console_handler.setFormatter(formatter) ...@@ -34,7 +37,43 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler) logger.addHandler(console_handler)
DEFAULT_TRTLLM_CONFIG_PATH = "components/backends/trtllm/deploy/disagg.yaml"
class TrtllmConfigModifier: class TrtllmConfigModifier:
@classmethod
def load_default_config(cls) -> dict:
with open(DEFAULT_TRTLLM_CONFIG_PATH, "r") as f:
return yaml.safe_load(f)
@classmethod
def update_model(cls, config, model_name: str) -> dict:
# change the model to serve
cfg = Config.model_validate(config)
# Update model for both prefill and decode workers
for sub_component_type in [SubComponentType.PREFILL, SubComponentType.DECODE]:
try:
worker_service = get_worker_service_from_config(
cfg, backend="trtllm", sub_component_type=sub_component_type
)
args = validate_and_get_worker_args(worker_service, backend="trtllm")
args = break_arguments(args)
# Update both --model-path and --served-model-name
args = set_argument_value(args, "--model-path", model_name)
args = set_argument_value(args, "--served-model-name", model_name)
worker_service.extraPodSpec.mainContainer.args = args
except (ValueError, KeyError):
# Service might not exist (e.g., in aggregated mode)
logger.debug(
f"Skipping {sub_component_type} service as it doesn't exist"
)
continue
return cfg.model_dump()
@classmethod @classmethod
def convert_config( def convert_config(
cls, cls,
......
...@@ -4,12 +4,15 @@ ...@@ -4,12 +4,15 @@
import logging import logging
from typing import Literal from typing import Literal
import yaml
from benchmarks.profiler.utils.config import ( from benchmarks.profiler.utils.config import (
Config, Config,
append_argument, append_argument,
break_arguments, break_arguments,
get_service_name_by_type, get_service_name_by_type,
get_worker_service_from_config, get_worker_service_from_config,
set_argument_value,
setup_worker_service_resources, setup_worker_service_resources,
validate_and_get_worker_args, validate_and_get_worker_args,
) )
...@@ -30,7 +33,42 @@ console_handler.setFormatter(formatter) ...@@ -30,7 +33,42 @@ console_handler.setFormatter(formatter)
logger.addHandler(console_handler) logger.addHandler(console_handler)
DEFAULT_VLLM_CONFIG_PATH = "components/backends/vllm/deploy/disagg.yaml"
class VllmV1ConfigModifier: class VllmV1ConfigModifier:
@classmethod
def load_default_config(cls) -> dict:
with open(DEFAULT_VLLM_CONFIG_PATH, "r") as f:
return yaml.safe_load(f)
@classmethod
def update_model(cls, config, model_name: str) -> dict:
# change the model to serve
cfg = Config.model_validate(config)
# Update model for both prefill and decode workers
for sub_component_type in [SubComponentType.PREFILL, SubComponentType.DECODE]:
try:
worker_service = get_worker_service_from_config(
cfg, backend="vllm", sub_component_type=sub_component_type
)
args = validate_and_get_worker_args(worker_service, backend="vllm")
args = break_arguments(args)
# Update --model (vllm uses --model instead of --model-path and --served-model-name)
args = set_argument_value(args, "--model", model_name)
worker_service.extraPodSpec.mainContainer.args = args
except (ValueError, KeyError):
# Service might not exist (e.g., in aggregated mode)
logger.debug(
f"Skipping {sub_component_type} service as it doesn't exist"
)
continue
return cfg.model_dump()
@classmethod @classmethod
def convert_config( def convert_config(
cls, cls,
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from pathlib import Path
from typing import Optional, Union
from huggingface_hub import model_info
from transformers import AutoConfig
DTYPE_BYTES_MAP = {
"F32": 4, # FP32: 4 bytes per parameter
"BF16": 2, # BF16: 2 bytes per parameter
"F16": 2, # FP16: 2 bytes per parameter
"F8_E4M3": 1, # FP8: 1 byte per parameter
"F8_E5M2": 1, # FP8: 1 byte per parameter
"F8_E8M0": 1, # FP8: 1 byte per parameter
"I8": 1, # INT8: 1 byte per parameter
"I4": 0.5, # INT4: 0.5 bytes per parameter
}
CONTEXT_LENGTH_ATTRS = [
"max_position_embeddings", # Most common (BERT, GPT, LLaMA, etc.)
"n_positions", # GPT-2, GPT-Neo
"max_sequence_length", # Some models
"seq_length", # Some older models
"model_max_length", # Some tokenizer configs
"sliding_window", # Mistral with sliding window attention
]
# only for MLA + MoE models, treat other MoE models as dense models
MOE_ARCHITECTURES = {"DeepseekV3ForCausalLM", "DeepseekV32ForCausalLM"}
def get_local_model_weight_size(
model_path: Union[str, Path],
) -> float:
"""Return model size in MB by scanning local directory."""
model_path = Path(model_path)
if not model_path.exists():
raise FileNotFoundError(f"Model path does not exist: {model_path}")
if not model_path.is_dir():
raise ValueError(f"Model path is not a directory: {model_path}")
# Weight file extensions to look for
weight_extensions = [".safetensors", ".bin", ".pt", ".pth"]
total_size_bytes = 0
for file_path in model_path.rglob("*"):
if file_path.is_file() and any(
str(file_path).endswith(ext) for ext in weight_extensions
):
total_size_bytes += file_path.stat().st_size
return total_size_bytes / (1024**2)
def get_model_weight_size_from_hub(
model_name: str,
token: Optional[str] = None,
) -> float:
"""Return model size in MB by querying Hugging Face Hub API."""
try:
info = model_info(model_name, token=token)
# Filter for model weight files (safetensors or pytorch bin files)
# Also filter out files with None size
weight_extensions = [".safetensors", ".bin", ".pt", ".pth"]
total_size_bytes = 0
if info.siblings is not None:
for sibling in info.siblings:
if any(sibling.rfilename.endswith(ext) for ext in weight_extensions):
if sibling.size is not None:
total_size_bytes += sibling.size
# If no file sizes were available, try to estimate from safetensors metadata
if total_size_bytes == 0 and info.safetensors is not None:
# SafeTensors info gives parameter counts per dtype
for dtype, param_count in info.safetensors.parameters.items():
bytes_per_param = DTYPE_BYTES_MAP.get(
dtype, 2
) # Default to 2 bytes (FP16/BF16)
total_size_bytes += int(param_count * bytes_per_param)
return total_size_bytes / (1024**2)
except Exception as e:
raise RuntimeError(f"Failed to get model info from Hub: {e}")
def get_model_weight_size(
model_name_or_path: Union[str, Path],
) -> float:
"""Return model size in MB (auto-detects local vs HF Hub)."""
path = Path(model_name_or_path)
if path.exists() and path.is_dir():
# Local model
return get_local_model_weight_size(model_name_or_path)
else:
# HF Hub model
return get_model_weight_size_from_hub(str(model_name_or_path))
def get_model_info(
model_name_or_path: Union[str, Path],
trust_remote_code: bool = False,
) -> dict:
model_size = get_model_weight_size(model_name_or_path)
config = AutoConfig.from_pretrained(
model_name_or_path,
trust_remote_code=trust_remote_code,
)
if config.architectures[0] in MOE_ARCHITECTURES:
config.is_moe = True
else:
config.is_moe = False
# Detect max context length from config
# Different models use different attribute names for max context length
max_context_length = None
for attr in CONTEXT_LENGTH_ATTRS:
if hasattr(config, attr):
value = getattr(config, attr)
if value is not None:
max_context_length = value
break
return {
"model_size": model_size,
"is_moe": config.is_moe,
"max_context_length": max_context_length,
}
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--model", type=str, required=True)
args = parser.parse_args()
print(get_model_info(args.model))
...@@ -8,6 +8,7 @@ from typing import Any, Dict ...@@ -8,6 +8,7 @@ from typing import Any, Dict
import yaml import yaml
from benchmarks.profiler.utils.planner_utils import add_planner_arguments_to_parser from benchmarks.profiler.utils.planner_utils import add_planner_arguments_to_parser
from benchmarks.profiler.utils.search_space_autogen import auto_generate_search_space
def parse_config_string(config_str: str) -> Dict[str, Any]: def parse_config_string(config_str: str) -> Dict[str, Any]:
...@@ -64,15 +65,16 @@ def create_profiler_parser() -> argparse.Namespace: ...@@ -64,15 +65,16 @@ def create_profiler_parser() -> argparse.Namespace:
deployment: deployment:
namespace: String (kubernetes namespace, default: dynamo-sla-profiler) namespace: String (kubernetes namespace, default: dynamo-sla-profiler)
service_name: String (service name, default: "") service_name: String (service name, default: "")
model: String (model to serve, can be HF model name or local model path)
engine: engine:
backend: String (backend type, currently support [vllm, sglang, trtllm], default: vllm) backend: String (backend type, currently support [vllm, sglang, trtllm], default: vllm)
config: String (path to the DynamoGraphDeployment config file) config: String (path to the DynamoGraphDeployment config file, default: "")
max_context_length: Int (maximum context length supported by the served model, default: 16384) max_context_length: Int (maximum context length supported by the served model, default: 0)
is_moe_model: Boolean (enable MoE (Mixture of Experts) model support, use TEP for prefill and DEP for decode, default: False) is_moe_model: Boolean (enable MoE (Mixture of Experts) model support, use TEP for prefill and DEP for decode, default: False)
hardware: hardware:
min_num_gpus_per_engine: Int (minimum number of GPUs per engine, default: 1) min_num_gpus_per_engine: Int (minimum number of GPUs per engine, default: 0)
max_num_gpus_per_engine: Int (maximum number of GPUs per engine, default: 8) max_num_gpus_per_engine: Int (maximum number of GPUs per engine, default: 0)
num_gpus_per_node: Int (number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size, default: 8) num_gpus_per_node: Int (number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size, default: 0)
sweep: sweep:
skip_existing_results: Boolean (skip TP sizes that already have results in the output directory, default: False) skip_existing_results: Boolean (skip TP sizes that already have results in the output directory, default: False)
force_rerun: Boolean (force re-running all tests even if results already exist (overrides --skip-existing-results), default: False) force_rerun: Boolean (force re-running all tests even if results already exist (overrides --skip-existing-results), default: False)
...@@ -113,6 +115,12 @@ def create_profiler_parser() -> argparse.Namespace: ...@@ -113,6 +115,12 @@ def create_profiler_parser() -> argparse.Namespace:
help="Configuration as Python dict literal, YAML, or JSON string. CLI args override config values. " help="Configuration as Python dict literal, YAML, or JSON string. CLI args override config values. "
"Example: \"{'engine': {'backend': 'vllm', 'config': '/path'}, 'sla': {'isl': 3000}}\"", "Example: \"{'engine': {'backend': 'vllm', 'config': '/path'}, 'sla': {'isl': 3000}}\"",
) )
parser.add_argument(
"--model",
type=str,
default=config.get("deployment", {}).get("model", ""),
help="Model to serve, can be HF model name or local model path",
)
# CLI arguments with config-aware defaults (using nested .get() for cleaner code) # CLI arguments with config-aware defaults (using nested .get() for cleaner code)
parser.add_argument( parser.add_argument(
...@@ -144,13 +152,13 @@ def create_profiler_parser() -> argparse.Namespace: ...@@ -144,13 +152,13 @@ def create_profiler_parser() -> argparse.Namespace:
parser.add_argument( parser.add_argument(
"--min-num-gpus-per-engine", "--min-num-gpus-per-engine",
type=int, type=int,
default=config.get("hardware", {}).get("min_num_gpus_per_engine", 1), default=config.get("hardware", {}).get("min_num_gpus_per_engine", 0),
help="minimum number of GPUs per engine", help="minimum number of GPUs per engine",
) )
parser.add_argument( parser.add_argument(
"--max-num-gpus-per-engine", "--max-num-gpus-per-engine",
type=int, type=int,
default=config.get("hardware", {}).get("max_num_gpus_per_engine", 8), default=config.get("hardware", {}).get("max_num_gpus_per_engine", 0),
help="maximum number of GPUs per engine", help="maximum number of GPUs per engine",
) )
parser.add_argument( parser.add_argument(
...@@ -194,7 +202,7 @@ def create_profiler_parser() -> argparse.Namespace: ...@@ -194,7 +202,7 @@ def create_profiler_parser() -> argparse.Namespace:
parser.add_argument( parser.add_argument(
"--max-context-length", "--max-context-length",
type=int, type=int,
default=config.get("engine", {}).get("max_context_length", 16384), default=config.get("engine", {}).get("max_context_length", 0),
help="maximum context length supported by the served model", help="maximum context length supported by the served model",
) )
parser.add_argument( parser.add_argument(
...@@ -231,7 +239,7 @@ def create_profiler_parser() -> argparse.Namespace: ...@@ -231,7 +239,7 @@ def create_profiler_parser() -> argparse.Namespace:
parser.add_argument( parser.add_argument(
"--num-gpus-per-node", "--num-gpus-per-node",
type=int, type=int,
default=config.get("hardware", {}).get("num_gpus_per_node", 8), default=config.get("hardware", {}).get("num_gpus_per_node", 0),
help="Number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size", help="Number of GPUs per node for MoE models - this will be the granularity when searching for the best TEP/DEP size",
) )
...@@ -289,5 +297,9 @@ def create_profiler_parser() -> argparse.Namespace: ...@@ -289,5 +297,9 @@ def create_profiler_parser() -> argparse.Namespace:
# Validate required arguments # Validate required arguments
if not args.config: if not args.config:
parser.error("--config is required (either via CLI or profile-config)") parser.error("--config is required (either via CLI or profile-config)")
if not args.model and not args.config:
parser.error("--model or --config is required")
auto_generate_search_space(args)
return args return args
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import logging
import math
import os
import yaml
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from benchmarks.profiler.utils.model_info import get_model_info
from deploy.utils.gpu_inventory import get_gpu_summary
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
MODEL_GPU_MEM_FRAC_MAX = 0.9
MOE_MODEL_MAX_NUM_GPUS = 32
def auto_generate_search_space(args: argparse.Namespace) -> None:
config_modifier = CONFIG_MODIFIERS[
args.backend
] # args.backend is already validated in argparse
# first check if config file exists
if args.model is not None:
if not args.config:
# modify config file from default config file
logger.info("DGD config file not provided, using default config file")
config = config_modifier.load_default_config()
else:
with open(args.config, "r") as f:
config = yaml.safe_load(f)
logger.info(f"Updating model in DGD config file to {args.model}")
config = config_modifier.update_model(config, args.model)
config_fn = f"{args.output_dir}/disagg_config.yaml"
logger.info(f"Saving generated disagg DGD config for profiling to {config_fn}")
os.makedirs(args.output_dir, exist_ok=True)
with open(config_fn, "w") as f:
yaml.dump(config, f)
args.config = config_fn
# now determine the search space
if args.model is not None:
model_info = get_model_info(args.model)
gpu_info = get_gpu_summary()
logger.info(
f"Model {args.model} has size {model_info['model_size']}, is_moe={model_info['is_moe']}, and max_context_length={model_info['max_context_length']}"
)
logger.info(
f"Cluster has {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node with {gpu_info['vram']} VRAM"
)
min_gpu = math.ceil(
model_info["model_size"] / MODEL_GPU_MEM_FRAC_MAX / gpu_info["vram"] # type: ignore[operator]
)
max_gpu = (
gpu_info["gpus_per_node"] # type: ignore[misc]
if not model_info["is_moe"]
else MOE_MODEL_MAX_NUM_GPUS
)
if min_gpu > max_gpu:
error_msg = f"No valid GPU configuration found for model {args.model} on the cluster with {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info(
f"Auto-generated search space for model {args.model} on the cluster with {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node: {min_gpu} to {max_gpu}"
)
args.min_num_gpus_per_engine = min_gpu
args.max_num_gpus_per_engine = max_gpu
args.is_moe_model = model_info["is_moe"] # type: ignore[assignment]
args.max_context_length = model_info["max_context_length"] # type: ignore[assignment]
args.num_gpus_per_node = gpu_info["gpus_per_node"] # type: ignore[assignment]
return
...@@ -51,7 +51,7 @@ rules: ...@@ -51,7 +51,7 @@ rules:
# Pods - needed for listing pods by label selector and getting logs from test deployments # Pods - needed for listing pods by label selector and getting logs from test deployments
- apiGroups: [""] - apiGroups: [""]
resources: ["pods"] resources: ["pods"]
verbs: ["list", "get"] verbs: ["list", "get", "create", "delete"]
- apiGroups: [""] - apiGroups: [""]
resources: ["pods/log"] resources: ["pods/log"]
verbs: ["get"] verbs: ["get"]
...@@ -73,6 +73,21 @@ subjects: ...@@ -73,6 +73,21 @@ subjects:
- kind: ServiceAccount - kind: ServiceAccount
name: dgdr-profiling-job name: dgdr-profiling-job
namespace: {{ .Release.Namespace }} namespace: {{ .Release.Namespace }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "dynamo-operator.fullname" . }}-dgdr-profiling-nodes
labels:
{{- include "dynamo-operator.labels" . | nindent 4 }}
app.kubernetes.io/component: dgdr-profiling
rules:
# Nodes - cluster-scoped resource needed for profiling
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
{{- else }} {{- else }}
# Cluster-wide mode: ClusterRole for DGDR profiling jobs # Cluster-wide mode: ClusterRole for DGDR profiling jobs
--- ---
...@@ -100,9 +115,28 @@ rules: ...@@ -100,9 +115,28 @@ rules:
# Pods - needed for listing pods by label selector and getting logs from test deployments # Pods - needed for listing pods by label selector and getting logs from test deployments
- apiGroups: [""] - apiGroups: [""]
resources: ["pods"] resources: ["pods"]
verbs: ["list", "get"] verbs: ["list", "get", "create", "delete"]
- apiGroups: [""] - apiGroups: [""]
resources: ["pods/log"] resources: ["pods/log"]
verbs: ["get"] verbs: ["get"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
{{- end }} {{- end }}
# (Remove the trailing blank line at end of file)
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ include "dynamo-operator.fullname" . }}-dgdr-profiling-nodes
labels:
{{- include "dynamo-operator.labels" . | nindent 4 }}
app.kubernetes.io/component: dgdr-profiling
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: {{ include "dynamo-operator.fullname" . }}-dgdr-profiling-nodes
subjects:
- kind: ServiceAccount
name: dgdr-profiling-job
namespace: {{ .Release.Namespace }}
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import json
import logging
import re
import shutil
import subprocess
import time
import uuid
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional, Tuple, Union
from kubernetes import client, config
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
def run_command(cmd: List[str], capture_output: bool = True, exit_on_error: bool = True): # type: ignore
try:
return subprocess.run(cmd, capture_output=capture_output, text=True, check=True)
except subprocess.CalledProcessError as e: # pragma: no cover - passthrough
if exit_on_error:
logger.error(f"Command failed: {' '.join(cmd)}")
if e.stdout:
logger.error(e.stdout)
if e.stderr:
logger.error(e.stderr)
raise RuntimeError(f"Command failed: {' '.join(cmd)}")
raise
NVIDIA_PREFIX = "nvidia.com/"
LABEL_GPU_COUNT = f"{NVIDIA_PREFIX}gpu.count"
LABEL_GPU_PRODUCT = f"{NVIDIA_PREFIX}gpu.product"
LABEL_GPU_MEMORY = f"{NVIDIA_PREFIX}gpu.memory" # MiB per GPU
LABEL_MIG_CAPABLE = f"{NVIDIA_PREFIX}mig.capable"
@dataclass
class NodeGpuInventory:
node_name: str
gpu_count: Optional[int]
gpu_product: Optional[str]
gpu_memory_mib: Optional[int]
mig_capable: Optional[bool]
allocatable_gpu: Optional[int]
mig_resources: Dict[str, str]
def to_dict(self) -> Dict[str, Union[str, int, bool, Dict[str, str], None]]:
return asdict(self)
def _parse_int(value: Optional[str]) -> Optional[int]:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
match = re.search(r"\d+", str(value))
return int(match.group(0)) if match else None
def _bool_from_str(value: Optional[str]) -> Optional[bool]:
if value is None:
return None
s = str(value).strip().lower()
if s in {"true", "1", "yes"}:
return True
if s in {"false", "0", "no"}:
return False
return None
def _normalize_node(node: Union[client.V1Node, Dict]) -> Dict:
# Convert V1Node to dict for uniform access
if hasattr(node, "to_dict"):
return node.to_dict()
return node # assume already dict
def _extract_inventory(node_obj: Dict) -> NodeGpuInventory:
meta = node_obj.get("metadata", {})
status = node_obj.get("status", {})
labels = meta.get("labels", {}) or {}
node_name = meta.get("name", "<unknown>")
gpu_product = labels.get(LABEL_GPU_PRODUCT)
gpu_memory_mib = _parse_int(labels.get(LABEL_GPU_MEMORY))
mig_capable = _bool_from_str(labels.get(LABEL_MIG_CAPABLE))
# Prefer GFD-reported GPU count if present; otherwise use allocatable nvidia.com/gpu
gpu_count = _parse_int(labels.get(LABEL_GPU_COUNT))
alloc = status.get("allocatable", {}) or {}
alloc_gpu = _parse_int(alloc.get(f"{NVIDIA_PREFIX}gpu"))
if gpu_count is None:
gpu_count = alloc_gpu
# Collect MIG resource keys and counts if present
mig_resources: Dict[str, str] = {
k: str(v)
for k, v in alloc.items()
if isinstance(k, str)
and k.startswith(f"{NVIDIA_PREFIX}mig-")
and _parse_int(str(v))
}
return NodeGpuInventory(
node_name=node_name,
gpu_count=gpu_count,
gpu_product=gpu_product,
gpu_memory_mib=gpu_memory_mib,
mig_capable=mig_capable,
allocatable_gpu=alloc_gpu,
mig_resources=mig_resources,
)
def _list_nodes_via_client() -> List[Dict]:
# Assume running inside a Kubernetes pod with service account
try:
config.load_incluster_config()
except Exception as e:
raise RuntimeError(
f"Failed to load in-cluster Kubernetes config. Ensure this runs in a pod with a service account. Error: {e}"
)
v1 = client.CoreV1Api()
items = v1.list_node().items # type: ignore[attr-defined]
return [_normalize_node(n) for n in items]
def _list_nodes_via_kubectl() -> List[Dict]:
if not shutil.which("kubectl"):
raise RuntimeError("kubectl not found in PATH for fallback")
result = run_command(["kubectl", "get", "nodes", "-o", "json"], capture_output=True)
data = json.loads(result.stdout)
return data.get("items", [])
def collect_gpu_inventory(
prefer_client: bool = True,
) -> Tuple[List[NodeGpuInventory], str]:
sources_tried: List[str] = []
errors: List[str] = []
def _via_client() -> List[NodeGpuInventory]:
items = _list_nodes_via_client()
return [_extract_inventory(n) for n in items]
def _via_kubectl() -> List[NodeGpuInventory]:
items = _list_nodes_via_kubectl()
return [_extract_inventory(n) for n in items]
if prefer_client:
try:
sources_tried.append("kubernetes-client")
return _via_client(), ",".join(sources_tried)
except Exception as e:
errors.append(str(e))
try:
sources_tried.append("kubectl-json")
return _via_kubectl(), ",".join(sources_tried)
except Exception as e2:
errors.append(str(e2))
raise RuntimeError("Failed to list nodes: " + " | ".join(errors))
else:
try:
sources_tried.append("kubectl-json")
return _via_kubectl(), ",".join(sources_tried)
except Exception as e:
errors.append(str(e))
try:
sources_tried.append("kubernetes-client")
return _via_client(), ",".join(sources_tried)
except Exception as e2:
errors.append(str(e2))
raise RuntimeError("Failed to list nodes: " + " | ".join(errors))
def _format_gib(mib: Optional[int]) -> str:
if mib is None:
return ""
return f"{mib/1024:.1f} GiB"
def print_table(rows: List[NodeGpuInventory], show_mig: bool = False) -> None:
headers = ["NODE", "GPUS", "MODEL", "VRAM/GPU", "MIG"]
table: List[List[str]] = []
for r in rows:
mig_str = ""
if r.mig_capable is True:
if r.mig_resources:
mig_str = ",".join(
f"{k.split('/')[-1]}={v}"
for k, v in sorted(r.mig_resources.items())
)
else:
mig_str = "capable"
elif r.mig_capable is False:
mig_str = "no"
table.append(
[
r.node_name,
"" if r.gpu_count is None else str(r.gpu_count),
r.gpu_product or "",
_format_gib(r.gpu_memory_mib),
mig_str if show_mig else ("yes" if r.mig_capable else ""),
]
)
# Compute column widths
widths = [len(h) for h in headers]
for row in table:
for i, cell in enumerate(row):
widths[i] = max(widths[i], len(cell))
def _fmt_row(row: List[str]) -> str:
return " ".join(cell.ljust(widths[i]) for i, cell in enumerate(row))
logger.info(_fmt_row(headers))
logger.info(_fmt_row(["-" * w for w in widths]))
for row in table:
logger.info(_fmt_row(row))
def aggregate_valued_rows(
rows: List[NodeGpuInventory],
) -> Tuple[Optional[NodeGpuInventory], int]:
"""Aggregate rows that have meaningful GPU metadata.
Preference order when multiple distinct values exist:
1) Larger GPUs per node (gpu_count)
2) Larger VRAM per GPU (gpu_memory_mib)
Returns (selected_row_like, distinct_count).
"""
valued: List[NodeGpuInventory] = [
r for r in rows if (r.gpu_product is not None or r.gpu_memory_mib is not None)
]
if not valued:
return None, 0
# Group by (product, vram_mib)
from collections import defaultdict
groups: Dict[
Tuple[Optional[str], Optional[int]],
Dict[str, object],
] = defaultdict(lambda: {"max_gpu": 0, "rows": []})
for r in valued:
key = (r.gpu_product, r.gpu_memory_mib)
meta = groups[key]
meta["rows"].append(r) # type: ignore[attr-defined, index]
# Use known gpu_count if available for ranking
if r.gpu_count is not None:
meta["max_gpu"] = max(int(meta["max_gpu"]), int(r.gpu_count)) # type: ignore[arg-type, call-overload, index]
def sort_key(
item: Tuple[
Tuple[Optional[str], Optional[int]],
Dict[str, object],
]
):
(prod, mem_mib), meta = item
max_gpu = int(meta["max_gpu"]) # type: ignore[arg-type, call-overload, index]
mem_val = mem_mib if mem_mib is not None else -1
return (max_gpu, mem_val)
selected_key, selected_meta = sorted(groups.items(), key=sort_key, reverse=True)[0]
sel_prod, sel_mem_mib = selected_key
sel_gpu = int(selected_meta["max_gpu"]) # type: ignore[arg-type, call-overload, index]
selected = NodeGpuInventory(
node_name="<aggregate>",
gpu_count=sel_gpu if sel_gpu > 0 else None,
gpu_product=sel_prod,
gpu_memory_mib=sel_mem_mib,
mig_capable=None,
allocatable_gpu=None,
mig_resources={},
)
return selected, len(groups)
def _get_current_namespace(default: str = "default") -> str:
try:
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
return f.read().strip() or default
except Exception:
return default
def enrich_with_smi(
rows: List[NodeGpuInventory],
namespace: Optional[str] = None,
timeout_seconds: int = 180,
) -> None:
"""For nodes missing product/memory labels, schedule a short-lived pod on each node
that requests 1 GPU and runs nvidia-smi to capture model and memory.
Requires permissions: create/get/delete pods and get pods/log in the namespace.
"""
ns = namespace or _get_current_namespace()
try:
config.load_incluster_config()
except Exception:
pass
v1 = client.CoreV1Api()
for inv in rows:
if not inv.gpu_count or (
inv.gpu_product is not None and inv.gpu_memory_mib is not None
):
continue
pod_name = f"gpu-inv-smi-{uuid.uuid4().hex[:6]}"
container = client.V1Container(
name="smi",
image="nvidia/cuda:12.3.2-base-ubuntu22.04",
command=["bash", "-lc"],
args=[
"nvidia-smi --query-gpu=name,memory.total --format=csv,noheader,nounits"
],
resources=client.V1ResourceRequirements(
limits={"nvidia.com/gpu": "1", "cpu": "100m", "memory": "128Mi"},
requests={"nvidia.com/gpu": "1", "cpu": "50m", "memory": "64Mi"},
),
)
pod = client.V1Pod(
api_version="v1",
kind="Pod",
metadata=client.V1ObjectMeta(name=pod_name, namespace=ns),
spec=client.V1PodSpec(
restart_policy="Never",
node_name=inv.node_name,
containers=[container],
),
)
logs = ""
try:
v1.create_namespaced_pod(namespace=ns, body=pod)
start = time.time()
while time.time() - start < timeout_seconds:
p = v1.read_namespaced_pod(name=pod_name, namespace=ns)
phase = (p.status.phase or "").lower()
if phase in ("succeeded", "failed"):
break
time.sleep(2)
try:
logs = v1.read_namespaced_pod_log(name=pod_name, namespace=ns)
except Exception:
logs = ""
finally:
try:
v1.delete_namespaced_pod(
name=pod_name, namespace=ns, body=client.V1DeleteOptions()
)
except Exception:
pass
for line in logs.splitlines():
parts = [x.strip() for x in line.split(",")]
if len(parts) >= 2 and parts[0]:
inv.gpu_product = inv.gpu_product or parts[0]
mem_match = re.search(r"\d+", parts[1])
if mem_match:
inv.gpu_memory_mib = inv.gpu_memory_mib or int(mem_match.group(0))
break
def get_gpu_summary(
prefer_client: bool = True, enrich_smi: bool = True
) -> Dict[str, object]:
"""Return an aggregate GPU summary for the cluster.
Selection policy when multiple values exist: prefer higher GPUs per node,
then higher VRAM/GPU. Returns dict with keys: gpus_per_node, model, vram.
If model/VRAM unavailable anywhere, returns {"gpus_per_node": max_gpus, "model": "", "vram": 0}.
"""
# TODO: use proper tools (i.e., DCGM) to get GPU inventory
rows, _ = collect_gpu_inventory(prefer_client=prefer_client)
if enrich_smi:
enrich_with_smi(rows)
agg, _distinct = aggregate_valued_rows(rows)
if agg is None:
# Fallback to max GPUs only
max_gpus = 0
for r in rows:
if r.gpu_count is not None:
max_gpus = max(max_gpus, int(r.gpu_count))
return {"gpus_per_node": max_gpus, "model": "", "vram": 0}
gpus_val = int(agg.gpu_count) if agg.gpu_count is not None else 0
model_val = agg.gpu_product or ""
vram_val = int(agg.gpu_memory_mib) if agg.gpu_memory_mib is not None else 0
return {
"gpus_per_node": gpus_val,
"model": model_val,
"vram": vram_val,
}
def main() -> None:
parser = argparse.ArgumentParser(
description="Report GPU inventory per Kubernetes node (count, SKU, VRAM)."
)
parser.add_argument(
"--format",
"-o",
choices=["table", "json"],
default="table",
help="Output format",
)
parser.add_argument(
"--prefer",
choices=["client", "kubectl"],
default="client",
help="Prefer Kubernetes Python client or kubectl JSON fallback",
)
parser.add_argument(
"--show-mig",
action="store_true",
help="In table output, show MIG resource types and counts",
)
parser.add_argument(
"--enrich-smi",
action="store_true",
help="Schedule short-lived pods per node to fetch model/VRAM via nvidia-smi",
)
parser.add_argument(
"--aggregate",
action="store_true",
help="Print a single representative (GPUs per node, MODEL, VRAM/GPU). Warn if multiple values exist",
)
args = parser.parse_args()
prefer_client = args.prefer == "client"
rows, source = collect_gpu_inventory(prefer_client=prefer_client)
if args.enrich_smi:
enrich_with_smi(rows)
if args.format == "json":
payload = {
"source": source,
"items": [r.to_dict() for r in rows],
}
logger.info(json.dumps(payload, indent=2))
return
# Table output
print_table(rows, show_mig=args.show_mig)
if args.aggregate:
agg, distinct = aggregate_valued_rows(rows)
if agg is None:
logger.warning("No nodes expose MODEL/VRAM; cannot aggregate")
return
if distinct > 1:
logger.warning(
f"Multiple distinct GPU model/VRAM pairs detected across nodes: {distinct}. Showing highest GPUs per node, then highest VRAM/GPU."
)
# Print concise aggregate line
model = agg.gpu_product or ""
vram = _format_gib(agg.gpu_memory_mib)
gpus = agg.gpu_count if agg.gpu_count is not None else ""
logger.info(f"Aggregate => GPUS: {gpus} MODEL: {model} VRAM/GPU: {vram}")
if __name__ == "__main__":
main()
...@@ -128,6 +128,16 @@ spec: ...@@ -128,6 +128,16 @@ spec:
**For MoE models**, use `profile_sla_moe_job.yaml` with TEP/DEP configuration instead. **For MoE models**, use `profile_sla_moe_job.yaml` with TEP/DEP configuration instead.
### Auto-Configuration
To automatically configure the profiling job based on the hardware and model information, supply the `--model` argument to the profiling script. The following arguments will be automatically set:
- `--config`: will use the default config file (`components/backends/<backend>/deploy/disagg.yaml`) with model updated to the provided model name
- `--min-num-gpus-per-engine`: will be set to the minimum number of GPUs per engine based on the model size and hardware information
- `--max-num-gpus-per-engine`: will be set to the maximum number of GPUs per engine based on the model size and hardware information
- `--num-gpus-per-node`: will be set to the number of GPUs per node based on the hardware information
- `--is-moe-model`: will be set based on the HF config file
- `--max-context-length`: will be set to the maximum context length supported by the model based on the HF config file
### Advanced Configuration ### Advanced Configuration
- **Model caching**: For large models, create a multi-attach PVC to cache the model. See [recipes](../../recipes/README.md) for details. - **Model caching**: For large models, create a multi-attach PVC to cache the model. See [recipes](../../recipes/README.md) for details.
......
...@@ -10,6 +10,7 @@ for vllm, sglang, and trtllm backends with their respective disagg.yaml configur ...@@ -10,6 +10,7 @@ for vllm, sglang, and trtllm backends with their respective disagg.yaml configur
import sys import sys
from pathlib import Path from pathlib import Path
from unittest.mock import patch
import pytest import pytest
...@@ -18,6 +19,21 @@ project_root = Path(__file__).parent.parent.parent ...@@ -18,6 +19,21 @@ project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root)) sys.path.insert(0, str(project_root))
from benchmarks.profiler.profile_sla import run_profile # noqa: E402 from benchmarks.profiler.profile_sla import run_profile # noqa: E402
from benchmarks.profiler.utils.search_space_autogen import ( # noqa: E402
auto_generate_search_space,
)
# Override the logger fixture from conftest.py to prevent directory creation
@pytest.fixture(autouse=True)
def logger(request):
"""Override the logger fixture to prevent test directory creation.
This replaces the logger fixture from tests/conftest.py that creates
directories named after each test.
"""
# Simply do nothing - no directories created, no file handlers added
yield
class TestProfileSLADryRun: class TestProfileSLADryRun:
...@@ -188,3 +204,205 @@ class TestProfileSLADryRun: ...@@ -188,3 +204,205 @@ class TestProfileSLADryRun:
"""Test that profile_sla dry-run works for sglang backend with MoE config.""" """Test that profile_sla dry-run works for sglang backend with MoE config."""
# Run the profile in dry-run mode - should complete without errors # Run the profile in dry-run mode - should complete without errors
await run_profile(sglang_moe_args) await run_profile(sglang_moe_args)
# Example tests with mocked GPU inventory
@pytest.fixture
def mock_h100_gpu_info(self):
"""Mock GPU info for H100 80GB cluster."""
return {
"gpus_per_node": 8,
"model": "h100_sxm",
"vram": 81920, # 80GB in MiB
}
@pytest.fixture
def mock_model_info(self):
"""Mock model info for DeepSeek-R1-Distill-Llama-8B."""
return {
"model_size": 16384, # 16GB model in MiB
"is_moe": False,
"max_context_length": 16384, # 16K tokens
}
@pytest.fixture
def vllm_args_with_model_autogen(self):
"""Create arguments for vllm backend with model-based search space autogeneration."""
class Args:
def __init__(self):
self.backend = "vllm"
self.config = ""
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen
self.min_num_gpus_per_engine = 0 # Will be auto-generated
self.max_num_gpus_per_engine = 0 # Will be auto-generated
self.skip_existing_results = False
self.force_rerun = False
self.isl = 3000
self.osl = 500
self.ttft = 50
self.itl = 10
self.max_context_length = 0
self.prefill_interpolation_granularity = 16
self.decode_interpolation_granularity = 6
self.service_name = ""
self.is_moe_model = False
self.dry_run = True
self.use_ai_configurator = False
self.aic_system = None
self.aic_model_name = None
self.aic_backend = ""
self.aic_backend_version = None
self.num_gpus_per_node = None # Will be auto-generated
self.deploy_after_profile = False
return Args()
@pytest.mark.pre_merge
@pytest.mark.asyncio
@patch("benchmarks.profiler.utils.search_space_autogen.get_gpu_summary")
@patch("benchmarks.profiler.utils.search_space_autogen.get_model_info")
async def test_profile_with_autogen_search_space_h100(
self,
mock_get_model_info,
mock_get_gpu_summary,
vllm_args_with_model_autogen,
mock_h100_gpu_info,
mock_model_info,
):
"""Test profile_sla with auto-generated search space on mocked H100 cluster.
This test demonstrates how search space is auto-generated based on model
size and available GPU memory.
"""
# Configure the mocks to return the appropriate info
mock_get_model_info.return_value = mock_model_info
mock_get_gpu_summary.return_value = mock_h100_gpu_info
# Run the profile - the search space will be auto-generated
# based on the model and mocked GPU info
auto_generate_search_space(vllm_args_with_model_autogen)
await run_profile(vllm_args_with_model_autogen)
@pytest.fixture
def sglang_args_with_model_autogen(self):
"""Create arguments for sglang backend with model-based search space autogeneration."""
class Args:
def __init__(self):
self.backend = "sglang"
self.config = ""
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen
self.min_num_gpus_per_engine = 0 # Will be auto-generated
self.max_num_gpus_per_engine = 0 # Will be auto-generated
self.skip_existing_results = False
self.force_rerun = False
self.isl = 3000
self.osl = 500
self.ttft = 50
self.itl = 10
self.max_context_length = 0
self.prefill_interpolation_granularity = 16
self.decode_interpolation_granularity = 6
self.service_name = ""
self.is_moe_model = False
self.dry_run = True
self.use_ai_configurator = False
self.aic_system = None
self.aic_model_name = None
self.aic_backend = ""
self.aic_backend_version = None
self.num_gpus_per_node = None # Will be auto-generated
self.deploy_after_profile = False
return Args()
@pytest.mark.pre_merge
@pytest.mark.asyncio
@patch("benchmarks.profiler.utils.search_space_autogen.get_gpu_summary")
@patch("benchmarks.profiler.utils.search_space_autogen.get_model_info")
async def test_sglang_profile_with_autogen_search_space_h100(
self,
mock_get_model_info,
mock_get_gpu_summary,
sglang_args_with_model_autogen,
mock_h100_gpu_info,
mock_model_info,
):
"""Test profile_sla with auto-generated search space for sglang on mocked H100 cluster.
This test demonstrates how search space is auto-generated based on model
size and available GPU memory for sglang backend.
"""
# Configure the mocks to return the appropriate info
mock_get_model_info.return_value = mock_model_info
mock_get_gpu_summary.return_value = mock_h100_gpu_info
# Run the profile - the search space will be auto-generated
# based on the model and mocked GPU info
auto_generate_search_space(sglang_args_with_model_autogen)
await run_profile(sglang_args_with_model_autogen)
@pytest.fixture
def trtllm_args_with_model_autogen(self):
"""Create arguments for trtllm backend with model-based search space autogeneration."""
class Args:
def __init__(self):
self.backend = "trtllm"
self.config = ""
self.output_dir = "/tmp/test_profiling_results"
self.namespace = "test-namespace"
self.model = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" # Specify model for autogen
self.min_num_gpus_per_engine = 0 # Will be auto-generated
self.max_num_gpus_per_engine = 0 # Will be auto-generated
self.skip_existing_results = False
self.force_rerun = False
self.isl = 3000
self.osl = 500
self.ttft = 50
self.itl = 10
self.max_context_length = 0
self.prefill_interpolation_granularity = 16
self.decode_interpolation_granularity = 6
self.service_name = ""
self.is_moe_model = False
self.dry_run = True
self.use_ai_configurator = False
self.aic_system = None
self.aic_model_name = None
self.aic_backend = ""
self.aic_backend_version = None
self.num_gpus_per_node = None # Will be auto-generated
self.deploy_after_profile = False
return Args()
@pytest.mark.pre_merge
@pytest.mark.asyncio
@patch("benchmarks.profiler.utils.search_space_autogen.get_gpu_summary")
@patch("benchmarks.profiler.utils.search_space_autogen.get_model_info")
async def test_trtllm_profile_with_autogen_search_space_h100(
self,
mock_get_model_info,
mock_get_gpu_summary,
trtllm_args_with_model_autogen,
mock_h100_gpu_info,
mock_model_info,
):
"""Test profile_sla with auto-generated search space for trtllm on mocked H100 cluster.
This test demonstrates how search space is auto-generated based on model
size and available GPU memory for trtllm backend.
"""
# Configure the mocks to return the appropriate info
mock_get_model_info.return_value = mock_model_info
mock_get_gpu_summary.return_value = mock_h100_gpu_info
# Run the profile - the search space will be auto-generated
# based on the model and mocked GPU info
auto_generate_search_space(trtllm_args_with_model_autogen)
await run_profile(trtllm_args_with_model_autogen)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment