Unverified Commit b97fde10 authored by hhzhang16's avatar hhzhang16 Committed by GitHub
Browse files

fix: propagate tolerations and cap auto-discovered GPUs (#6947)


Signed-off-by: default avatarHannah Zhang <hannahz@nvidia.com>
parent 503d042a
......@@ -44,6 +44,8 @@ from dynamo.profiler.utils.dgdr_validate import (
from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig,
determine_picking_mode,
get_profiling_job_tolerations,
inject_tolerations_into_dgd,
needs_profile_data,
picked_config_from_row,
resolve_model_path,
......@@ -57,6 +59,17 @@ logger = logging.getLogger(__name__)
_CONCRETE_BACKENDS = ["trtllm", "sglang", "vllm"]
def _apply_tolerations_to_final_config(final_config: Any, tolerations: list) -> Any:
"""Apply tolerations to a final DGD config (dict or multi-doc list)."""
if not tolerations or not final_config:
return final_config
if isinstance(final_config, list):
result = list(final_config)
result[-1] = inject_tolerations_into_dgd(result[-1], tolerations)
return result
return inject_tolerations_into_dgd(final_config, tolerations)
def _check_auto_backend_support(model: str, system: str) -> bool:
"""
Return True if *any* concrete backend is AIC-supported for this model/system.
......@@ -370,6 +383,18 @@ async def run_profile(
elif isinstance(final_config, dict):
final_config = apply_dgd_overrides(final_config, dgdr.overrides.dgd)
logger.info("Applied DGD overrides to the final config.")
# Propagate profiling-job tolerations to the final DGD
job_tolerations = get_profiling_job_tolerations(dgdr)
if job_tolerations and final_config:
final_config = _apply_tolerations_to_final_config(
final_config, job_tolerations
)
logger.debug(
"Propagated %d profiling-job toleration(s) to the final DGD config.",
len(job_tolerations),
)
if not _write_final_output(ops, final_config):
return
......
......@@ -47,6 +47,8 @@ from dynamo.profiler.utils.dgdr_v1beta1_types import (
from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig,
derive_backend_image,
get_profiling_job_tolerations,
inject_tolerations_into_dgd,
)
from dynamo.profiler.utils.profile_decode import get_num_request_range
......@@ -354,6 +356,24 @@ async def run_thorough(
len(decode_candidates),
)
# Propagate profiling-job tolerations to candidate DGDs
job_tolerations = get_profiling_job_tolerations(dgdr)
if job_tolerations:
for candidate in prefill_candidates:
candidate.dgd_config = inject_tolerations_into_dgd(
candidate.dgd_config, job_tolerations
)
for candidate in decode_candidates:
candidate.dgd_config = inject_tolerations_into_dgd(
candidate.dgd_config, job_tolerations
)
logger.debug(
"Propagated %d profiling-job toleration(s) to %d prefill + %d decode candidates.",
len(job_tolerations),
len(prefill_candidates),
len(decode_candidates),
)
config_modifier = CONFIG_MODIFIERS[backend]
# --- Stage 2: Benchmarking ---
......
......@@ -15,6 +15,7 @@
"""Shared helpers and configuration for the profiler pipeline."""
import copy
import logging
import os
from dataclasses import dataclass
......@@ -231,3 +232,35 @@ def warn_gpu_shortage(
gpus_needed,
total_gpus,
)
def get_profiling_job_tolerations(dgdr: DynamoGraphDeploymentRequestSpec) -> list:
"""Return tolerations from overrides.profilingJob.template.spec.tolerations."""
try:
if dgdr.overrides is None or dgdr.overrides.profilingJob is None:
return []
return (
dgdr.overrides.profilingJob.get("template", {})
.get("spec", {})
.get("tolerations", [])
)
except (AttributeError, KeyError):
return []
def inject_tolerations_into_dgd(dgd_config: dict, tolerations: list) -> dict:
"""Add tolerations to every service's extraPodSpec in a DGD config dict.
Tolerations already present in a service are preserved; only new entries
(by identity) are appended. Returns a deep copy with tolerations applied.
"""
result = copy.deepcopy(dgd_config)
for _svc_name, svc in result.get("spec", {}).get("services", {}).items():
if not isinstance(svc, dict):
continue
eps = svc.setdefault("extraPodSpec", {})
existing = eps.get("tolerations", [])
new_entries = [t for t in tolerations if t not in existing]
if new_entries:
eps["tolerations"] = list(existing) + new_entries
return result
......@@ -106,6 +106,7 @@ const (
// Messages
MessageInitialized = "DGDR initialized successfully"
MessageDiscoveringHardware = "Discovering GPU hardware and preparing profiling job"
MessageProfilingJobCreated = "Profiling job created"
MessageAICProfilingJobCreated = "AIC profiling job created"
MessageProfilingInProgress = "Profiling is in progress"
......@@ -358,9 +359,11 @@ func (r *DynamoGraphDeploymentRequestReconciler) handlePendingPhase(ctx context.
// Set observedGeneration to track the spec we're processing
dgdr.Status.ObservedGeneration = dgdr.Generation
// Initialize status
// Initialize status — next reconcile will discover hardware and create the profiling job.
r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonInitialized, MessageInitialized)
return r.updatePhaseAndRequeue(ctx, dgdr, nvidiacomv1beta1.DGDRPhasePending, MessageInitialized)
return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhasePending,
nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
"DiscoveringHardware", MessageDiscoveringHardware)
}
logger.Info("Handling pending phase", "name", dgdr.Name)
......@@ -378,9 +381,9 @@ func (r *DynamoGraphDeploymentRequestReconciler) handlePendingPhase(ctx context.
r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, MessageAICProfilingJobCreated)
}
// Update to Profiling phase with Running status
// Update to Profiling phase — show DiscoveringHardware until the job is confirmed running.
dgdr.SetProfilingPhase(nvidiacomv1beta1.ProfilingPhaseInitializing)
return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingRunning", MessageProfilingInProgress)
return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "DiscoveringHardware", MessageDiscoveringHardware)
}
// handleProfilingPhase monitors profiling progress and generates spec when complete
......@@ -400,6 +403,11 @@ func (r *DynamoGraphDeploymentRequestReconciler) handleProfilingPhase(ctx contex
if !completed {
logger.Info("Profiling job still running", "name", dgdr.Name)
// Transition from DiscoveringHardware to ProfilingRunning once the job is confirmed active.
cond := meta.FindStatusCondition(dgdr.Status.Conditions, nvidiacomv1beta1.ConditionTypeProfiling)
if cond != nil && cond.Reason == "DiscoveringHardware" {
return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingRunning", MessageProfilingInProgress)
}
// Don't requeue - we'll be triggered when the Job completes/fails
return ctrl.Result{}, nil
}
......@@ -1197,7 +1205,15 @@ func (r *DynamoGraphDeploymentRequestReconciler) enrichHardwareFromDiscovery(ctx
hw.NumGPUsPerNode = &n
}
if hw.TotalGPUs == nil {
// TODO: This is a temporary limit to prevent the profiler from using too many GPUs.
// Will be removed once a fix is in the Profiler/AIC.
const defaultMaxAutoGPUs = int32(32)
total := int32(gpuInfo.GPUsPerNode * gpuInfo.NodesWithGPUs)
if total > defaultMaxAutoGPUs {
logger.Info("Capping auto-discovered TotalGPUs at default limit; set hardware.totalGpus to override",
"discovered", total, "cap", defaultMaxAutoGPUs)
total = defaultMaxAutoGPUs
}
hw.TotalGPUs = &total
}
return nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment