Unverified commit d3706221, authored by Hongkuan Zhou, committed by GitHub
Browse files

feat: use configmap instead of PVC for profiling results (#3981)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
Signed-off-by: Hannah Zhang <hannahz@nvidia.com>
Co-authored-by: Hannah Zhang <hannahz@nvidia.com>
parent 51c103b2
...@@ -22,8 +22,8 @@ import numpy as np ...@@ -22,8 +22,8 @@ import numpy as np
import yaml import yaml
from benchmarks.profiler.utils.aiperf import benchmark_decode, benchmark_prefill from benchmarks.profiler.utils.aiperf import benchmark_decode, benchmark_prefill
from benchmarks.profiler.utils.config import generate_dgd_config_with_planner
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
from benchmarks.profiler.utils.plot import ( from benchmarks.profiler.utils.plot import (
plot_decode_performance, plot_decode_performance,
...@@ -92,7 +92,6 @@ async def run_profile(args): ...@@ -92,7 +92,6 @@ async def run_profile(args):
with open(args.config, "r") as f: with open(args.config, "r") as f:
config = yaml.safe_load(f) config = yaml.safe_load(f)
config = config_modifier.update_model(config, args.model)
if args.dgd_image: if args.dgd_image:
config = config_modifier.update_image(config, args.dgd_image) config = config_modifier.update_image(config, args.dgd_image)
logger.info(f"Using DGD image: {args.dgd_image}") logger.info(f"Using DGD image: {args.dgd_image}")
...@@ -741,8 +740,11 @@ async def run_profile(args): ...@@ -741,8 +740,11 @@ async def run_profile(args):
) )
logger.info(f"Final DGD config with planner: {config}") logger.info(f"Final DGD config with planner: {config}")
# save DGD config with planner # save DGD config with planner; support multi-document output when a ConfigMap is included
with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f: with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
if isinstance(config, list):
yaml.dump_all(config, f)
else:
yaml.dump(config, f) yaml.dump(config, f)
except Exception as e: except Exception as e:
......
...@@ -19,10 +19,8 @@ import math ...@@ -19,10 +19,8 @@ import math
import shlex import shlex
from typing import Literal, Optional, Protocol from typing import Literal, Optional, Protocol
import yaml
from pydantic import BaseModel from pydantic import BaseModel
from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace
from dynamo.common.utils.paths import get_workspace_dir from dynamo.common.utils.paths import get_workspace_dir
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES, SubComponentType from dynamo.planner.defaults import WORKER_COMPONENT_NAMES, SubComponentType
...@@ -104,7 +102,8 @@ class DgdPlannerServiceConfig(BaseModel): ...@@ -104,7 +102,8 @@ class DgdPlannerServiceConfig(BaseModel):
dynamoNamespace: str = "dynamo" # placeholder dynamoNamespace: str = "dynamo" # placeholder
componentType: str = "planner" componentType: str = "planner"
replicas: int = 1 replicas: int = 1
volumeMounts: list[VolumeMount] = [VolumeMount()] # Do not attach PVC; we'll mount a ConfigMap for planner data instead.
volumeMounts: list[VolumeMount] = []
extraPodSpec: PodSpec = PodSpec( extraPodSpec: PodSpec = PodSpec(
mainContainer=Container( mainContainer=Container(
image="my-registry/dynamo-runtime:my-tag", # placeholder image="my-registry/dynamo-runtime:my-tag", # placeholder
...@@ -445,124 +444,3 @@ class ConfigModifierProtocol(Protocol): ...@@ -445,124 +444,3 @@ class ConfigModifierProtocol(Protocol):
@classmethod @classmethod
def update_image(cls, config: dict, image: str) -> dict: def update_image(cls, config: dict, image: str) -> dict:
... ...
def generate_dgd_config_with_planner(
    config_path: str,
    config_modifier,
    best_prefill_gpus: int,
    best_decode_gpus: int,
    output_dir: str,
    args,
    is_moe_model: bool = False,
    num_gpus_per_node: int = 8,
):
    """Build the final DGD config, including a Planner service, from profiling results.

    Args:
        config_path: Path to the YAML config file to start from.
        config_modifier: Config modifier instance (e.g., SGLangConfigModifier).
        best_prefill_gpus: Number of GPUs selected for the prefill engine.
        best_decode_gpus: Number of GPUs selected for the decode engine.
        output_dir: Output directory for profile results; passed to the planner
            as ``--profile-results-dir``.
        args: Parsed arguments namespace from profile_sla.
        is_moe_model: Whether this is an MoE model.
        num_gpus_per_node: Number of GPUs per node (used for MoE models only).

    Returns:
        dict: Final DGD config with the planner service stored under
        ``spec.services.Planner``.
    """
    with open(config_path, "r") as config_file:
        raw_config = yaml.safe_load(config_file)

    # The model named in the profiling args wins over the default in the config file.
    raw_config = config_modifier.update_model(raw_config, args.model)

    # An explicitly requested image overrides the image of every DGD component.
    if args.dgd_image:
        raw_config = config_modifier.update_image(raw_config, args.dgd_image)

    if is_moe_model:
        # MoE model: TEP for prefill, DEP for decode.
        raw_config = config_modifier.set_config_tep_size(
            raw_config,
            best_prefill_gpus,
            num_gpus_per_node,
            SubComponentType.PREFILL,
        )
        raw_config = config_modifier.set_config_dep_size(
            raw_config,
            best_decode_gpus,
            num_gpus_per_node,
            SubComponentType.DECODE,
        )
    else:
        # Dense model: plain TP for both engines.
        raw_config = config_modifier.set_config_tp_size(
            raw_config, best_prefill_gpus, SubComponentType.PREFILL
        )
        raw_config = config_modifier.set_config_tp_size(
            raw_config, best_decode_gpus, SubComponentType.DECODE
        )

    dgd = Config.model_validate(raw_config)

    # Make sure a PVC entry exists.
    if not dgd.spec.pvcs:
        dgd.spec.pvcs = [PVCConfig()]

    # The planner service inherits namespace and image from the Frontend service.
    planner = DgdPlannerServiceConfig()
    frontend = dgd.spec.services["Frontend"]
    planner.dynamoNamespace = getattr(frontend, "dynamoNamespace", "dynamo")  # type: ignore[attr-defined]
    frontend_pod = frontend.extraPodSpec
    if frontend_pod and frontend_pod.mainContainer:
        image = frontend_pod.mainContainer.image
        if image and planner.extraPodSpec.mainContainer:
            planner.extraPodSpec.mainContainer.image = image

    # Planner args come from the parsed namespace: shared args (ttft, itl,
    # backend, namespace) plus the planner-specific ones carrying the
    # ``planner_`` prefix. Flags whose values are dictated by the profiling
    # run are dropped here and re-added below.
    profiling_owned_flags = tuple(
        f"--{key}="
        for key in (
            "namespace",
            "prefill-engine-num-gpu",
            "decode-engine-num-gpu",
            "profile-results-dir",
        )
    )
    planner_args = [
        flag
        for flag in build_planner_args_from_namespace(args, prefix="planner_")
        if not flag.startswith(profiling_owned_flags)
    ]

    frontend_namespace = getattr(frontend, "dynamoNamespace", "dynamo")  # type: ignore[attr-defined]
    planner_args += [
        f"--namespace={frontend_namespace}",
        f"--prefill-engine-num-gpu={best_prefill_gpus}",
        f"--decode-engine-num-gpu={best_decode_gpus}",
        f"--profile-results-dir={output_dir}",
    ]

    planner_container = planner.extraPodSpec.mainContainer
    if planner_container and planner_container.args is not None:
        planner_container.args.extend(planner_args)

    # Dump the planner first, then the whole config, and splice the planner in.
    planner_dump = planner.model_dump(exclude_unset=False)
    final_config = dgd.model_dump(exclude_unset=False)
    final_config["spec"]["services"]["Planner"] = planner_dump
    return final_config
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import Optional
import numpy as np
import yaml
from benchmarks.profiler.utils.config import Config, DgdPlannerServiceConfig
from benchmarks.profiler.utils.planner_utils import build_planner_args_from_namespace
from dynamo.common.utils.paths import get_workspace_dir
from dynamo.planner.defaults import SubComponentType
def _load_prefill_profile(npz_path: str) -> Optional[dict]:
    """Return the prefill interpolation arrays in ``npz_path`` as JSON-ready lists, or None if the file is missing."""
    try:
        with np.load(npz_path) as raw:
            return {
                "prefill_isl": raw["prefill_isl"].tolist(),
                "prefill_ttft": raw["prefill_ttft"].tolist(),
                "prefill_thpt_per_gpu": raw["prefill_thpt_per_gpu"].tolist(),
            }
    except FileNotFoundError:
        return None


def _load_decode_profile(npz_path: str) -> Optional[dict]:
    """Return the decode interpolation arrays in ``npz_path`` as JSON-ready values, or None if the file is missing."""
    try:
        with np.load(npz_path) as raw:
            # max_kv_tokens is stored as an array; flattening handles both the
            # 0-d and the [value] layouts, and an empty array maps to 0.
            flat_max_kv = np.asarray(raw["max_kv_tokens"]).reshape(-1)
            max_kv_tokens = int(flat_max_kv[0]) if flat_max_kv.size else 0
            return {
                "x_kv_usage": raw["x_kv_usage"].tolist(),
                "y_context_length": raw["y_context_length"].tolist(),
                "z_itl": raw["z_itl"].tolist(),
                "z_thpt_per_gpu": raw["z_thpt_per_gpu"].tolist(),
                "max_kv_tokens": max_kv_tokens,
            }
    except FileNotFoundError:
        return None


def _mount_configmap_in_planner(
    planner_dict: dict, config_map_name: str, mount_path: str
) -> None:
    """Add a ConfigMap volume and a read-only mount for it to the dumped planner service dict (in place)."""
    # ``or`` (rather than setdefault) also replaces keys that were dumped with
    # an explicit None value, which would otherwise break the .append() below.
    pod_spec = planner_dict.get("extraPodSpec") or {}
    planner_dict["extraPodSpec"] = pod_spec

    volumes = pod_spec.get("volumes") or []
    pod_spec["volumes"] = volumes
    volumes.append(
        {
            "name": config_map_name,
            "configMap": {"name": config_map_name},
        }
    )

    main_container = pod_spec.get("mainContainer") or {}
    pod_spec["mainContainer"] = main_container
    volume_mounts = main_container.get("volumeMounts") or []
    main_container["volumeMounts"] = volume_mounts
    volume_mounts.append(
        {
            "name": config_map_name,
            "mountPath": mount_path,
            "readOnly": True,
        }
    )


def generate_dgd_config_with_planner(
    config_path: str,
    config_modifier,
    best_prefill_gpus: int,
    best_decode_gpus: int,
    output_dir: str,
    args,
    is_moe_model: bool = False,
    num_gpus_per_node: int = 8,
):
    """Generate DGD config with planner based on profiling results.

    The planner reads its profiling data from a ConfigMap mounted at
    ``<workspace>/profiling_results`` instead of a PVC. The ConfigMap is built
    from the two ``raw_data.npz`` files under ``output_dir`` and is only
    emitted when both files exist.

    Args:
        config_path: Path to the YAML config file
        config_modifier: Config modifier instance (e.g., SGLangConfigModifier)
        best_prefill_gpus: Number of GPUs for prefill engine
        best_decode_gpus: Number of GPUs for decode engine
        output_dir: Output directory for profile results
        args: Parsed arguments namespace from profile_sla
        is_moe_model: Whether this is an MoE model
        num_gpus_per_node: Number of GPUs per node (for MoE models)

    Returns:
        list[dict] | dict: If a ConfigMap is generated for planner data, returns a list
        of two YAML documents [ConfigMap, DGD]; otherwise returns a single DGD dict.
    """
    import logging

    # NOTE(review): the name is fixed, so two DGDs generated into the same
    # namespace would share (or collide on) this ConfigMap - confirm intended.
    config_map_name = "planner-profile-data"

    # Load config from file
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)

    # Update container image if provided
    # This overrides the default image in the config file for all DGD components
    if args.dgd_image:
        config = config_modifier.update_image(config, args.dgd_image)

    if not is_moe_model:
        # dense model, use TP for both prefill and decode
        config = config_modifier.set_config_tp_size(
            config, best_prefill_gpus, SubComponentType.PREFILL
        )
        config = config_modifier.set_config_tp_size(
            config, best_decode_gpus, SubComponentType.DECODE
        )
    else:
        # MoE model, use TEP for prefill and DEP for decode
        config = config_modifier.set_config_tep_size(
            config,
            best_prefill_gpus,
            num_gpus_per_node,
            SubComponentType.PREFILL,
        )
        config = config_modifier.set_config_dep_size(
            config,
            best_decode_gpus,
            num_gpus_per_node,
            SubComponentType.DECODE,
        )
    config = Config.model_validate(config)

    # add the planner service; it inherits namespace and image from the Frontend
    planner_config = DgdPlannerServiceConfig()
    frontend_service = config.spec.services["Frontend"]
    frontend_namespace = getattr(frontend_service, "dynamoNamespace", "dynamo")  # type: ignore[attr-defined]
    planner_config.dynamoNamespace = frontend_namespace
    if frontend_service.extraPodSpec and frontend_service.extraPodSpec.mainContainer:
        frontend_image = frontend_service.extraPodSpec.mainContainer.image
        if frontend_image and planner_config.extraPodSpec.mainContainer:
            planner_config.extraPodSpec.mainContainer.image = frontend_image

    # Build planner args dynamically from parsed arguments
    # This includes shared args (ttft, itl, backend, namespace) from profile_sla
    # and planner-specific args (with planner_ prefix)
    planner_args = build_planner_args_from_namespace(args, prefix="planner_")

    # Override profiling-specific arguments with results from profiling
    # Remove and re-add to ensure correct values from profiling context
    overridden_prefixes = tuple(
        f"--{key}="
        for key in (
            "namespace",
            "prefill-engine-num-gpu",
            "decode-engine-num-gpu",
            "profile-results-dir",
        )
    )
    planner_args = [
        arg for arg in planner_args if not arg.startswith(overridden_prefixes)
    ]

    # Add arguments determined by profiling results
    cm_mount_path = f"{get_workspace_dir()}/profiling_results"
    planner_args.extend(
        [
            f"--namespace={frontend_namespace}",
            f"--prefill-engine-num-gpu={best_prefill_gpus}",
            f"--decode-engine-num-gpu={best_decode_gpus}",
            f"--profile-results-dir={cm_mount_path}",
        ]
    )
    if (
        planner_config.extraPodSpec.mainContainer
        and planner_config.extraPodSpec.mainContainer.args is not None
    ):
        planner_config.extraPodSpec.mainContainer.args.extend(planner_args)

    # Convert planner config to dict first, then the entire config to dict
    planner_dict = planner_config.model_dump(exclude_unset=False)
    config_dict = config.model_dump(exclude_unset=False)

    # Build a ConfigMap from NPZ profiling outputs and mount it into the Planner
    # We store data as plain JSON (lists/float/int) to avoid binary artifacts.
    prefill_json = _load_prefill_profile(
        f"{output_dir}/selected_prefill_interpolation/raw_data.npz"
    )
    decode_json = _load_decode_profile(
        f"{output_dir}/selected_decode_interpolation/raw_data.npz"
    )

    config_map_obj: Optional[dict] = None
    if prefill_json is not None and decode_json is not None:
        config_map_obj = {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {"name": config_map_name},
            "data": {
                "prefill_raw_data.json": json.dumps(prefill_json),
                "decode_raw_data.json": json.dumps(decode_json),
            },
        }
        _mount_configmap_in_planner(planner_dict, config_map_name, cm_mount_path)
    else:
        # The planner is still pointed at cm_mount_path, but nothing will be
        # mounted there; make that visible instead of failing silently.
        logging.getLogger(__name__).warning(
            "Profiling NPZ data missing under %s (prefill found: %s, decode found: %s); "
            "generating DGD without the %s ConfigMap",
            output_dir,
            prefill_json is not None,
            decode_json is not None,
            config_map_name,
        )

    # Finalize DGD services
    config_dict["spec"]["services"]["Planner"] = planner_dict

    # Return multi-doc YAML (ConfigMap + DGD) when ConfigMap is created; else DGD only
    if config_map_obj is not None:
        return [config_map_obj, config_dict]
    return config_dict
...@@ -32,7 +32,7 @@ def auto_generate_search_space(args: argparse.Namespace) -> None: ...@@ -32,7 +32,7 @@ def auto_generate_search_space(args: argparse.Namespace) -> None:
] # args.backend is already validated in argparse ] # args.backend is already validated in argparse
# first check if config file exists # first check if config file exists
if args.model is not None: if args.model:
if not args.config: if not args.config:
# modify config file from default config file # modify config file from default config file
logger.info("DGD config file not provided, using default config file") logger.info("DGD config file not provided, using default config file")
...@@ -56,9 +56,10 @@ def auto_generate_search_space(args: argparse.Namespace) -> None: ...@@ -56,9 +56,10 @@ def auto_generate_search_space(args: argparse.Namespace) -> None:
args.config = config_fn args.config = config_fn
# now determine the search space # now determine the search space
if args.model is not None: model_info = None
if args.model:
logger.info(f"Getting model info for {args.model}...")
model_info = get_model_info(args.model) model_info = get_model_info(args.model)
gpu_info = get_gpu_summary()
num_experts_str = ( num_experts_str = (
f", num_experts={model_info['num_experts']}" f", num_experts={model_info['num_experts']}"
...@@ -68,10 +69,29 @@ def auto_generate_search_space(args: argparse.Namespace) -> None: ...@@ -68,10 +69,29 @@ def auto_generate_search_space(args: argparse.Namespace) -> None:
logger.info( logger.info(
f"Model {args.model} has size {model_info['model_size']}, is_moe={model_info['is_moe']}, and max_context_length={model_info['max_context_length']}{num_experts_str}" f"Model {args.model} has size {model_info['model_size']}, is_moe={model_info['is_moe']}, and max_context_length={model_info['max_context_length']}{num_experts_str}"
) )
args.is_moe_model = model_info["is_moe"] # type: ignore[assignment]
args.max_context_length = model_info["max_context_length"] # type: ignore[assignment]
if (
args.min_num_gpus_per_engine == 0
or args.max_num_gpus_per_engine == 0
or args.num_gpus_per_node == 0
):
if not args.model:
# TODO: get model info provided DGD config
error_msg = "No model provided, cannot auto-generate GPU search space. Please provide `--model` or GPU info"
logger.error(error_msg)
raise RuntimeError(error_msg)
logger.info("Getting GPU info from k8s cluster...")
gpu_info = get_gpu_summary()
logger.info( logger.info(
f"Cluster has {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node with {gpu_info['vram']} VRAM" f"Cluster has {gpu_info['gpus_per_node']}x{gpu_info['model']} GPUs per node with {gpu_info['vram']} VRAM"
) )
# model_info should be set by now (checked above), but mypy needs explicit verification
assert model_info is not None, "model_info must be set when model is provided"
min_gpu = math.ceil( min_gpu = math.ceil(
model_info["model_size"] / MODEL_GPU_MEM_FRAC_MAX / gpu_info["vram"] # type: ignore[operator] model_info["model_size"] / MODEL_GPU_MEM_FRAC_MAX / gpu_info["vram"] # type: ignore[operator]
) )
...@@ -90,8 +110,6 @@ def auto_generate_search_space(args: argparse.Namespace) -> None: ...@@ -90,8 +110,6 @@ def auto_generate_search_space(args: argparse.Namespace) -> None:
) )
args.min_num_gpus_per_engine = min_gpu args.min_num_gpus_per_engine = min_gpu
args.max_num_gpus_per_engine = max_gpu args.max_num_gpus_per_engine = max_gpu
args.is_moe_model = model_info["is_moe"] # type: ignore[assignment]
args.max_context_length = model_info["max_context_length"] # type: ignore[assignment]
args.num_gpus_per_node = gpu_info["gpus_per_node"] # type: ignore[assignment] args.num_gpus_per_node = gpu_info["gpus_per_node"] # type: ignore[assignment]
args.num_experts = model_info.get("num_experts") # type: ignore[assignment] args.num_experts = model_info.get("num_experts") # type: ignore[assignment]
......
...@@ -14,7 +14,9 @@ ...@@ -14,7 +14,9 @@
# limitations under the License. # limitations under the License.
import json
import logging import logging
import os
from typing import Optional from typing import Optional
import numpy as np import numpy as np
...@@ -53,9 +55,18 @@ class PrefillInterpolator: ...@@ -53,9 +55,18 @@ class PrefillInterpolator:
self.prefill_isl = raw_data["prefill_isl"] self.prefill_isl = raw_data["prefill_isl"]
self.prefill_ttft = raw_data["prefill_ttft"] # in milliseconds self.prefill_ttft = raw_data["prefill_ttft"] # in milliseconds
self.prefill_thpt_per_gpu = raw_data["prefill_thpt_per_gpu"] self.prefill_thpt_per_gpu = raw_data["prefill_thpt_per_gpu"]
except FileNotFoundError:
# Fallback to JSON provided via ConfigMap mounted at profile_results_dir
json_fn = os.path.join(profile_results_dir, "prefill_raw_data.json")
try:
with open(json_fn, "r") as f:
data = json.load(f)
self.prefill_isl = np.array(data["prefill_isl"]) # type: ignore[index]
self.prefill_ttft = np.array(data["prefill_ttft"]) # type: ignore[index]
self.prefill_thpt_per_gpu = np.array(data["prefill_thpt_per_gpu"]) # type: ignore[index]
except FileNotFoundError: except FileNotFoundError:
logger.error( logger.error(
f"Prefill interpolation file not found: {prefill_npz_fn}\n" f"Prefill interpolation files not found: {prefill_npz_fn} and {json_fn}\n"
f"{MISSING_PROFILING_DATA_ERROR_MESSAGE}" f"{MISSING_PROFILING_DATA_ERROR_MESSAGE}"
) )
exit(1) exit(1)
...@@ -110,9 +121,20 @@ class DecodeInterpolator: ...@@ -110,9 +121,20 @@ class DecodeInterpolator:
self.z_itl = raw_data["z_itl"] self.z_itl = raw_data["z_itl"]
self.z_thpt_per_gpu = raw_data["z_thpt_per_gpu"] self.z_thpt_per_gpu = raw_data["z_thpt_per_gpu"]
self.max_kv_tokens = raw_data["max_kv_tokens"][0] self.max_kv_tokens = raw_data["max_kv_tokens"][0]
except FileNotFoundError:
# Fallback to JSON provided via ConfigMap mounted at profile_results_dir
json_fn = os.path.join(profile_results_dir, "decode_raw_data.json")
try:
with open(json_fn, "r") as f:
data = json.load(f)
self.x_kv_usage = np.array(data["x_kv_usage"]) # type: ignore[index]
self.y_context_length = np.array(data["y_context_length"]) # type: ignore[index]
self.z_itl = np.array(data["z_itl"]) # type: ignore[index]
self.z_thpt_per_gpu = np.array(data["z_thpt_per_gpu"]) # type: ignore[index]
self.max_kv_tokens = int(data["max_kv_tokens"]) # type: ignore[index]
except FileNotFoundError: except FileNotFoundError:
logger.error( logger.error(
f"Decode interpolation file not found: {decode_npz_fn}\n" f"Decode interpolation files not found: {decode_npz_fn} and {json_fn}\n"
f"{MISSING_PROFILING_DATA_ERROR_MESSAGE}" f"{MISSING_PROFILING_DATA_ERROR_MESSAGE}"
) )
exit(1) exit(1)
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"text/template" "text/template"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
...@@ -30,8 +31,10 @@ import ( ...@@ -30,8 +31,10 @@ import (
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/builder"
...@@ -40,7 +43,7 @@ import ( ...@@ -40,7 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/yaml" sigsyaml "sigs.k8s.io/yaml"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common" commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
...@@ -101,6 +104,12 @@ const ( ...@@ -101,6 +104,12 @@ const (
// ConfigMap naming // ConfigMap naming
ConfigMapOutputPrefix = "dgdr-output-" ConfigMapOutputPrefix = "dgdr-output-"
// Annotation keys
AnnotationAdditionalResources = "dgdr.nvidia.com/additional-resources"
// Size limits
MaxAnnotationSize = 250000 // ~250KB, below K8s 256KB limit
// Sidecar image // Sidecar image
SidecarImage = "bitnami/kubectl:latest" SidecarImage = "bitnami/kubectl:latest"
...@@ -470,6 +479,20 @@ func (r *DynamoGraphDeploymentRequestReconciler) handleDeployingState(ctx contex ...@@ -470,6 +479,20 @@ func (r *DynamoGraphDeploymentRequestReconciler) handleDeployingState(ctx contex
// Check if we need to create DGD // Check if we need to create DGD
if dgdr.Status.Deployment == nil || !dgdr.Status.Deployment.Created { if dgdr.Status.Deployment == nil || !dgdr.Status.Deployment.Created {
// Determine target namespace for deployment
targetNamespace := dgdr.Namespace
if dgdr.Spec.DeploymentOverrides != nil && dgdr.Spec.DeploymentOverrides.Namespace != "" {
targetNamespace = dgdr.Spec.DeploymentOverrides.Namespace
}
// Deploy additional resources (ConfigMaps) from the profiling output first
if err := r.createAdditionalResources(ctx, dgdr, targetNamespace); err != nil {
logger.Error(err, "Failed to create additional resources")
r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageDeploymentCreationFailed,
fmt.Sprintf("Failed to create additional resources: %v", err))
return ctrl.Result{}, err
}
return r.createDGD(ctx, dgdr) return r.createDGD(ctx, dgdr)
} }
...@@ -667,6 +690,80 @@ func (r *DynamoGraphDeploymentRequestReconciler) createDGD(ctx context.Context, ...@@ -667,6 +690,80 @@ func (r *DynamoGraphDeploymentRequestReconciler) createDGD(ctx context.Context,
return ctrl.Result{}, r.Status().Update(ctx, dgdr) return ctrl.Result{}, r.Status().Update(ctx, dgdr)
} }
// createAdditionalResources creates the ConfigMaps stored in the DGDR's
// additional-resources annotation so they exist before the DGD is deployed.
//
// Each YAML document in the annotation is decoded in turn. Documents without a
// kind and documents that are not ConfigMaps are skipped. Every ConfigMap is
// forced into targetNamespace and labelled with the owning DGDR before it is
// created; a ConfigMap that already exists is left untouched.
//
// It returns an error when a ConfigMap cannot be created (other than
// AlreadyExists) or when the annotation stream cannot be decoded any further.
func (r *DynamoGraphDeploymentRequestReconciler) createAdditionalResources(ctx context.Context, dgdr *nvidiacomv1alpha1.DynamoGraphDeploymentRequest, targetNamespace string) error {
	logger := log.FromContext(ctx)

	// Indexing a nil map yields the zero value, so this also covers a DGDR
	// that has no annotations at all.
	resourcesYAML := dgdr.Annotations[AnnotationAdditionalResources]
	if resourcesYAML == "" {
		return nil
	}

	// Parse using standard Kubernetes YAML decoder
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(resourcesYAML)), 4096)

	resourceCount := 0
	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			// Only a per-document YAML error is safe to skip.
			// NOTE(review): assumes the decoder has already consumed the bad
			// document when it reports YAMLSyntaxError - confirm against the
			// vendored apimachinery version.
			var yamlErr yaml.YAMLSyntaxError
			if errors.As(err, &yamlErr) {
				logger.Error(err, "Failed to decode resource, skipping")
				continue
			}
			// Any other error (e.g. a JSON stream syntax error) may be returned
			// again on every call; retrying would spin forever.
			return fmt.Errorf("failed to decode additional resources: %w", err)
		}

		if obj.GetKind() == "" {
			continue
		}

		resourceCount++

		// Only support ConfigMap for now (what profiler actually generates)
		if obj.GetKind() != "ConfigMap" {
			logger.Info("Skipping non-ConfigMap resource from profiling output", "kind", obj.GetKind(), "name", obj.GetName())
			continue
		}

		cm := &corev1.ConfigMap{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, cm); err != nil {
			logger.Error(err, "Failed to convert to ConfigMap", "name", obj.GetName())
			continue
		}

		// Override namespace and add tracking labels
		cm.Namespace = targetNamespace
		if cm.Labels == nil {
			cm.Labels = make(map[string]string)
		}
		cm.Labels[LabelDGDRName] = dgdr.Name
		cm.Labels[LabelDGDRNamespace] = dgdr.Namespace
		cm.Labels[LabelManagedBy] = LabelValueDynamoOperator

		// Create the ConfigMap
		if err := r.Create(ctx, cm); err != nil {
			if !apierrors.IsAlreadyExists(err) {
				return fmt.Errorf("failed to create ConfigMap %s: %w", cm.Name, err)
			}
			// NOTE(review): an existing ConfigMap keeps its old data; confirm
			// that stale profiling data from an earlier run is acceptable.
			logger.Info("ConfigMap already exists, skipping", "name", cm.Name)
			continue
		}
		logger.Info("Created ConfigMap from profiling output", "name", cm.Name, "namespace", targetNamespace)
	}

	if resourceCount > 0 {
		logger.Info("Deploying additional resources from profiling output", "count", resourceCount)
	}

	return nil
}
// handleFailedState handles DGDR in Failed state // handleFailedState handles DGDR in Failed state
func (r *DynamoGraphDeploymentRequestReconciler) handleFailedState(ctx context.Context, dgdr *nvidiacomv1alpha1.DynamoGraphDeploymentRequest) (ctrl.Result, error) { func (r *DynamoGraphDeploymentRequestReconciler) handleFailedState(ctx context.Context, dgdr *nvidiacomv1alpha1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
...@@ -879,7 +976,7 @@ func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context. ...@@ -879,7 +976,7 @@ func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context.
} }
// Serialize config to YAML for passing to profiler // Serialize config to YAML for passing to profiler
configYAML, err := yaml.Marshal(config) configYAML, err := sigsyaml.Marshal(config)
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to marshal profiling config to YAML: %w", err) return nil, false, fmt.Errorf("failed to marshal profiling config to YAML: %w", err)
} }
...@@ -1186,28 +1283,117 @@ func (r *DynamoGraphDeploymentRequestReconciler) generateDGDSpec(ctx context.Con ...@@ -1186,28 +1283,117 @@ func (r *DynamoGraphDeploymentRequestReconciler) generateDGDSpec(ctx context.Con
logger.Info("Found profiling output in ConfigMap", "configMap", outputConfigMapName, "size", len(yamlContent)) logger.Info("Found profiling output in ConfigMap", "configMap", outputConfigMapName, "size", len(yamlContent))
// Parse YAML into full DynamoGraphDeployment object first to validate and get name // Extract DGD and any supporting resources from potentially multi-document YAML (ConfigMap + DGD)
dgd := &nvidiacomv1alpha1.DynamoGraphDeployment{} dgd, additionalResources, err := r.extractResourcesFromYAML([]byte(yamlContent))
if err := yaml.Unmarshal([]byte(yamlContent), dgd); err != nil { if err != nil {
return fmt.Errorf("failed to parse %s: %w", ProfilingOutputFile, err) return fmt.Errorf("failed to extract DGD from %s: %w", ProfilingOutputFile, err)
} }
logger.Info("Parsed DGD from ConfigMap", "dgdName", dgd.Name) logger.Info("Parsed profiling output", "dgdName", dgd.Name, "additionalResources", len(additionalResources))
// Store additional resources (ConfigMaps) in annotations first
if len(additionalResources) > 0 {
if err := r.storeAdditionalResources(ctx, dgdr, additionalResources); err != nil {
logger.Error(err, "Failed to store additional resources")
return err
}
// Refetch the DGDR after updating annotations to get the latest resourceVersion
if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
return fmt.Errorf("failed to refetch DGDR after storing annotations: %w", err)
}
}
// Store as RawExtension (need to marshal to JSON as RawExtension expects JSON) // Store the generated DGD in status
// This preserves all fields including metadata
dgdr.Status.GeneratedDeployment = &runtime.RawExtension{ dgdr.Status.GeneratedDeployment = &runtime.RawExtension{
Object: dgd, Object: dgd,
} }
// Set profiling results reference
dgdr.Status.ProfilingResults = fmt.Sprintf("configmap/%s", outputConfigMapName) dgdr.Status.ProfilingResults = fmt.Sprintf("configmap/%s", outputConfigMapName)
logger.Info("Successfully generated DGD from profiling output", "dgdName", dgd.Name)
return r.Status().Update(ctx, dgdr) return r.Status().Update(ctx, dgdr)
} }
// storeAdditionalResources serializes the given resources into one
// multi-document YAML string, stores it under the additional-resources
// annotation of the DGDR and persists the DGDR with Update.
//
// Nothing is stored when resources is empty. An error is returned when a
// resource cannot be marshalled, when the combined YAML is larger than
// MaxAnnotationSize, or when the update call fails.
func (r *DynamoGraphDeploymentRequestReconciler) storeAdditionalResources(ctx context.Context, dgdr *nvidiacomv1alpha1.DynamoGraphDeploymentRequest, resources []*unstructured.Unstructured) error {
	if len(resources) == 0 {
		return nil
	}

	const docSeparator = "\n---\n"

	var combined []byte
	for idx := range resources {
		current := resources[idx]
		doc, marshalErr := sigsyaml.Marshal(current.Object)
		if marshalErr != nil {
			return fmt.Errorf("failed to marshal resource %s/%s: %w", current.GetKind(), current.GetName(), marshalErr)
		}
		// Documents after the first are preceded by a YAML document separator.
		if idx != 0 {
			combined = append(combined, docSeparator...)
		}
		combined = append(combined, doc...)
	}

	// Reject oversized payloads before touching the object.
	if size := len(combined); size > MaxAnnotationSize {
		return fmt.Errorf("additional resources YAML size (%d bytes) exceeds maximum annotation size (%d bytes); "+
			"consider reducing the number of resources or storing them separately",
			size, MaxAnnotationSize)
	}

	annotations := dgdr.Annotations
	if annotations == nil {
		annotations = make(map[string]string)
		dgdr.Annotations = annotations
	}
	annotations[AnnotationAdditionalResources] = string(combined)

	return r.Update(ctx, dgdr)
}
// extractResourcesFromYAML parses (possibly multi-document) YAML from the
// profiling output and splits it into the DynamoGraphDeployment and every
// other resource (e.g. ConfigMaps) that should be deployed with it.
//
// Documents without a kind are ignored, and documents that fail to decode as
// YAML are skipped. If more than one DynamoGraphDeployment is present, the last
// one wins. An error is returned when no DynamoGraphDeployment is found, when
// the DGD document cannot be converted to the typed object, or when the stream
// cannot be decoded any further.
func (r *DynamoGraphDeploymentRequestReconciler) extractResourcesFromYAML(yamlContent []byte) (*nvidiacomv1alpha1.DynamoGraphDeployment, []*unstructured.Unstructured, error) {
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(yamlContent), 4096)

	var dgd *nvidiacomv1alpha1.DynamoGraphDeployment
	var additionalResources []*unstructured.Unstructured

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			// Skip invalid YAML documents and continue with the next one.
			// NOTE(review): assumes the decoder has already consumed the bad
			// document when it reports YAMLSyntaxError - confirm against the
			// vendored apimachinery version.
			var yamlErr yaml.YAMLSyntaxError
			if errors.As(err, &yamlErr) {
				continue
			}
			// Any other error (e.g. a JSON stream syntax error) may be returned
			// again on every call; retrying would spin forever.
			return nil, nil, fmt.Errorf("failed to decode profiling output: %w", err)
		}

		// Skip empty objects
		if obj.GetKind() == "" {
			continue
		}

		if obj.GetKind() == "DynamoGraphDeployment" {
			dgd = &nvidiacomv1alpha1.DynamoGraphDeployment{}
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, dgd); err != nil {
				return nil, nil, fmt.Errorf("failed to convert to DynamoGraphDeployment: %w", err)
			}
			continue
		}

		// Store ConfigMaps or other resources for deployment
		additionalResources = append(additionalResources, obj)
	}

	if dgd == nil {
		return nil, nil, errors.New("no DynamoGraphDeployment found in YAML content")
	}

	return dgd, additionalResources, nil
}
// extractDGDFromYAML returns only the DynamoGraphDeployment contained in
// yamlContent. It delegates to extractResourcesFromYAML and drops the additional
// resources; it is a convenience wrapper used by tests.
func (r *DynamoGraphDeploymentRequestReconciler) extractDGDFromYAML(yamlContent []byte) (*nvidiacomv1alpha1.DynamoGraphDeployment, error) {
	deployment, _, extractErr := r.extractResourcesFromYAML(yamlContent)
	return deployment, extractErr
}
// updateStateAndRequeue updates the DGDR state and requeues // updateStateAndRequeue updates the DGDR state and requeues
func (r *DynamoGraphDeploymentRequestReconciler) updateStateAndRequeue(ctx context.Context, dgdr *nvidiacomv1alpha1.DynamoGraphDeploymentRequest, state, _ string) (ctrl.Result, error) { func (r *DynamoGraphDeploymentRequestReconciler) updateStateAndRequeue(ctx context.Context, dgdr *nvidiacomv1alpha1.DynamoGraphDeploymentRequest, state, _ string) (ctrl.Result, error) {
dgdr.Status.State = state dgdr.Status.State = state
......
...@@ -1243,4 +1243,175 @@ var _ = Describe("DGDR Error Handling", func() { ...@@ -1243,4 +1243,175 @@ var _ = Describe("DGDR Error Handling", func() {
Expect(condition.Message).Should(ContainSubstring("profiling job failed")) Expect(condition.Message).Should(ContainSubstring("profiling job failed"))
}) })
}) })
Context("When parsing multi-document YAML", func() {
// Checks that extractDGDFromYAML finds the DGD when a ConfigMap document
// precedes it in the stream; the wrapper discards the ConfigMap.
It("Should extract DGD from ConfigMap + DGD YAML", func() {
// Multi-document YAML with ConfigMap first, then DGD
multiDocYAML := `---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-config
namespace: default
data:
some-data: "value"
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: test-dgd
namespace: default
spec:
backendFramework: vllm
services: {}`
dgd, err := reconciler.extractDGDFromYAML([]byte(multiDocYAML))
// The returned object must be the DGD document, with its kind, name and
// backend framework populated.
Expect(err).NotTo(HaveOccurred())
Expect(dgd).NotTo(BeNil())
Expect(dgd.Kind).Should(Equal("DynamoGraphDeployment"))
Expect(dgd.Name).Should(Equal("test-dgd"))
Expect(dgd.Spec.BackendFramework).Should(Equal("vllm"))
})
// Checks that a lone DGD document with no "---" separator is still parsed.
It("Should extract DGD from single-document YAML", func() {
// Single document YAML without separator
singleDocYAML := `apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: test-dgd-single
namespace: default
spec:
backendFramework: vllm
services: {}`
dgd, err := reconciler.extractDGDFromYAML([]byte(singleDocYAML))
// Parsing must succeed and yield the DGD by kind and name.
Expect(err).NotTo(HaveOccurred())
Expect(dgd).NotTo(BeNil())
Expect(dgd.Kind).Should(Equal("DynamoGraphDeployment"))
Expect(dgd.Name).Should(Equal("test-dgd-single"))
})
// Checks that document order does not matter: the DGD is found even when it
// comes before the ConfigMap.
It("Should handle DGD + ConfigMap order (DGD first)", func() {
// Multi-document YAML with DGD first, then ConfigMap
multiDocYAML := `---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: test-dgd-first
namespace: default
spec:
backendFramework: vllm
services: {}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-config
namespace: default
data:
some-data: "value"`
dgd, err := reconciler.extractDGDFromYAML([]byte(multiDocYAML))
// The trailing ConfigMap must not replace or hide the DGD.
Expect(err).NotTo(HaveOccurred())
Expect(dgd).NotTo(BeNil())
Expect(dgd.Kind).Should(Equal("DynamoGraphDeployment"))
Expect(dgd.Name).Should(Equal("test-dgd-first"))
})
// Checks the failure path: input without any DynamoGraphDeployment document
// must produce an error rather than a nil DGD with no error.
It("Should return error when no DGD found", func() {
// YAML with only ConfigMap
configMapOnlyYAML := `---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-config
namespace: default
data:
some-data: "value"`
_, err := reconciler.extractDGDFromYAML([]byte(configMapOnlyYAML))
// The error message is expected to name the missing resource kind.
Expect(err).To(HaveOccurred())
Expect(err.Error()).Should(ContainSubstring("no DynamoGraphDeployment found"))
})
// Checks that a leading "---" separator before the only document does not
// prevent the DGD from being found.
It("Should handle YAML with leading separator", func() {
// YAML starting with --- separator
yamlWithLeadingSeparator := `---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: test-dgd-leading
namespace: default
spec:
backendFramework: vllm
services: {}`
dgd, err := reconciler.extractDGDFromYAML([]byte(yamlWithLeadingSeparator))
// Parsing must succeed and return the DGD by name.
Expect(err).NotTo(HaveOccurred())
Expect(dgd).NotTo(BeNil())
Expect(dgd.Name).Should(Equal("test-dgd-leading"))
})
// Checks extractResourcesFromYAML directly: the DGD is returned as the typed
// object and the ConfigMap is returned as the single additional resource.
It("Should extract DGD and additional resources correctly", func() {
// Multi-document YAML with ConfigMap and DGD
multiDocYAML := `---
apiVersion: v1
kind: ConfigMap
metadata:
name: model-config
namespace: default
data:
model.json: '{"name": "test-model"}'
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: test-dgd
namespace: default
spec:
backendFramework: vllm
services: {}`
dgd, additionalResources, err := reconciler.extractResourcesFromYAML([]byte(multiDocYAML))
Expect(err).NotTo(HaveOccurred())
Expect(dgd).NotTo(BeNil())
Expect(dgd.Name).Should(Equal("test-dgd"))
// Exactly one non-DGD resource is expected, identified by kind and name.
Expect(additionalResources).To(HaveLen(1))
Expect(additionalResources[0].GetKind()).Should(Equal("ConfigMap"))
Expect(additionalResources[0].GetName()).Should(Equal("model-config"))
})
// Checks that every non-DGD document is collected and that the additional
// resources keep the order in which they appear in the input.
It("Should handle multiple additional resources", func() {
// Multi-document YAML with multiple ConfigMaps and DGD
multiDocYAML := `---
apiVersion: v1
kind: ConfigMap
metadata:
name: config1
data:
key1: value1
---
apiVersion: v1
kind: ConfigMap
metadata:
name: config2
data:
key2: value2
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: test-dgd
spec:
backendFramework: vllm
services: {}`
dgd, additionalResources, err := reconciler.extractResourcesFromYAML([]byte(multiDocYAML))
Expect(err).NotTo(HaveOccurred())
Expect(dgd).NotTo(BeNil())
// Both ConfigMaps must be present, in document order.
Expect(additionalResources).To(HaveLen(2))
Expect(additionalResources[0].GetName()).Should(Equal("config1"))
Expect(additionalResources[1].GetName()).Should(Equal("config2"))
})
})
}) })
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment