Unverified Commit 740130eb authored by Jonathan Tong's avatar Jonathan Tong Committed by GitHub
Browse files

feat: surface status and errors for profiler in DGDR (#6855)


Signed-off-by: default avatarJont828 <jt572@cornell.edu>
parent 1114004e
......@@ -36,6 +36,7 @@ from dynamo.profiler.utils.dgd_generation import assemble_final_config
from dynamo.profiler.utils.dgdr_v1beta1_types import (
BackendType,
DynamoGraphDeploymentRequestSpec,
ProfilingPhase,
)
from dynamo.profiler.utils.dgdr_validate import (
valid_dgdr_spec,
......@@ -183,6 +184,14 @@ async def _execute_strategy(
deployment_clients,
)
ops.current_phase = ProfilingPhase.SelectingConfig
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message="Filtering results and selecting cost-efficient configuration",
phase=ProfilingPhase.SelectingConfig,
)
best_config_df = pick_result["best_config_df"]
best_latencies = pick_result["best_latencies"]
......@@ -244,6 +253,7 @@ def _write_final_output(ops: ProfilerOperationalConfig, final_config: Any) -> bo
status=ProfilerStatus.FAILED,
error=error_msg,
message=error_msg,
phase=ProfilingPhase.GeneratingDGD,
)
return False
else:
......@@ -261,6 +271,7 @@ def _write_final_output(ops: ProfilerOperationalConfig, final_config: Any) -> bo
outputs={
"final_config": "final_config.yaml",
},
phase=ProfilingPhase.Done,
)
return True
......@@ -287,6 +298,7 @@ async def run_profile(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message="Profiler job started",
phase=ProfilingPhase.Initializing,
)
try:
......@@ -312,6 +324,14 @@ async def run_profile(
# then validate DGDR features based on AIC support
validate_dgdr_dynamo_features(dgdr, aic_supported)
ops.current_phase = ProfilingPhase.SweepingPrefill
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message="Sweeping parallelization strategies",
phase=ops.current_phase,
)
(
pick_result,
best_prefill_config,
......@@ -357,6 +377,13 @@ async def run_profile(
chosen_exp = pick_result.get("chosen_exp", "")
is_disagg_config = chosen_exp not in ("agg",) and bool(chosen_exp)
if not ops.dry_run and dgd_config and needs_profile_data(dgdr):
ops.current_phase = ProfilingPhase.BuildingCurves
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message="Building interpolation curves for planner integration",
phase=ops.current_phase,
)
if not is_disagg_config:
logger.info(
"Picked config is aggregated (chosen_exp=%r) — "
......@@ -396,6 +423,13 @@ async def run_profile(
# ---------------------------------------------------------------
# Final DGD assembly
# ---------------------------------------------------------------
ops.current_phase = ProfilingPhase.GeneratingDGD
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message="Packaging data and generating final DGD YAML",
phase=ops.current_phase,
)
final_config = assemble_final_config(
dgdr, ops, dgd_config, best_prefill_config, best_decode_config
)
......@@ -431,6 +465,7 @@ async def run_profile(
status=ProfilerStatus.FAILED,
error=str(e),
message=f"Profiler failed with exception: {type(e).__name__}",
phase=ops.current_phase,
)
raise
finally:
......
......@@ -43,6 +43,7 @@ from dynamo.profiler.utils.config_modifiers.protocol import apply_dgd_overrides
from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec,
ModelCacheSpec,
ProfilingPhase,
)
from dynamo.profiler.utils.profile_common import (
ProfilerOperationalConfig,
......@@ -51,6 +52,7 @@ from dynamo.profiler.utils.profile_common import (
inject_tolerations_into_dgd,
)
from dynamo.profiler.utils.profile_decode import get_num_request_range
from dynamo.profiler.utils.profiler_status import ProfilerStatus, write_profiler_status
logger = logging.getLogger(__name__)
......@@ -68,7 +70,8 @@ async def _benchmark_prefill_candidates(
) -> pd.DataFrame:
"""Deploy each prefill candidate, measure TTFT, return prefill_df."""
prefill_rows: list[dict] = []
for candidate in prefill_candidates:
total_prefill = len(prefill_candidates)
for idx, candidate in enumerate(prefill_candidates, 1):
num_gpus = candidate.num_gpus
label = make_parallel_label(
candidate.tp,
......@@ -88,7 +91,18 @@ async def _benchmark_prefill_candidates(
model_name, model_path = config_modifier.get_model_name(candidate.dgd_config)
frontend_port = config_modifier.get_port(candidate.dgd_config)
progress_msg = (
f"Benchmarking prefill candidate {idx}/{total_prefill}: "
f"{label} ({num_gpus} GPUs)"
)
logger.info("Profiling prefill candidate %s with %d GPUs...", label, num_gpus)
ops.current_phase = ProfilingPhase.SweepingPrefill
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message=progress_msg,
phase=ProfilingPhase.SweepingPrefill,
)
client = DynamoDeploymentClient(
namespace=ops.k8s_namespace,
......@@ -158,7 +172,8 @@ async def _benchmark_decode_candidates(
) -> pd.DataFrame:
"""Deploy each decode candidate, sweep num_request, return decode_df."""
decode_rows: list[dict] = []
for candidate in decode_candidates:
total_decode = len(decode_candidates)
for idx, candidate in enumerate(decode_candidates, 1):
num_gpus = candidate.num_gpus
label = make_parallel_label(
candidate.tp,
......@@ -178,7 +193,18 @@ async def _benchmark_decode_candidates(
model_name, model_path = config_modifier.get_model_name(candidate.dgd_config)
frontend_port = config_modifier.get_port(candidate.dgd_config)
progress_msg = (
f"Benchmarking decode candidate {idx}/{total_decode}: "
f"{label} ({num_gpus} GPUs)"
)
logger.info("Profiling decode candidate %s with %d GPUs...", label, num_gpus)
ops.current_phase = ProfilingPhase.SweepingDecode
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message=progress_msg,
phase=ProfilingPhase.SweepingDecode,
)
client = DynamoDeploymentClient(
namespace=ops.k8s_namespace,
......@@ -377,6 +403,13 @@ async def run_thorough(
config_modifier = CONFIG_MODIFIERS[backend]
# --- Stage 2: Benchmarking ---
ops.current_phase = ProfilingPhase.SweepingPrefill
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message="Sweeping parallelization strategies for prefill, measuring TTFT",
phase=ops.current_phase,
)
prefill_df = await _benchmark_prefill_candidates(
prefill_candidates,
ops,
......@@ -388,6 +421,13 @@ async def run_thorough(
deployment_clients,
config_modifier,
)
ops.current_phase = ProfilingPhase.SweepingDecode
write_profiler_status(
ops.output_dir,
status=ProfilerStatus.RUNNING,
message="Sweeping parallelization strategies for decode, measuring ITL",
phase=ops.current_phase,
)
decode_df = await _benchmark_decode_candidates(
decode_candidates,
ops,
......
......@@ -18,14 +18,17 @@
import copy
import logging
import os
from dataclasses import dataclass
from dataclasses import dataclass, field
import pandas as pd
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
PickedParallelConfig,
)
from dynamo.profiler.utils.dgdr_v1beta1_types import DynamoGraphDeploymentRequestSpec
from dynamo.profiler.utils.dgdr_v1beta1_types import (
DynamoGraphDeploymentRequestSpec,
ProfilingPhase,
)
logger = logging.getLogger(__name__)
......@@ -111,6 +114,7 @@ class ProfilerOperationalConfig:
prefill_interpolation_granularity: int = DEFAULT_PREFILL_INTERPOLATION_GRANULARITY
decode_interpolation_granularity: int = DEFAULT_DECODE_INTERPOLATION_GRANULARITY
dry_run: bool = DEFAULT_DRY_RUN
current_phase: ProfilingPhase = field(default=ProfilingPhase.Initializing)
# ---------------------------------------------------------------------------
......
......@@ -16,6 +16,8 @@ from typing import Any
import yaml
from dynamo.profiler.utils.dgdr_v1beta1_types import ProfilingPhase
logger = logging.getLogger(__name__)
......@@ -36,6 +38,7 @@ def write_profiler_status(
message: str = "",
error: str = "",
outputs: dict | None = None,
phase: ProfilingPhase | None = None,
) -> None:
"""
Write profiler status file.
......@@ -46,6 +49,8 @@ def write_profiler_status(
message: Optional status message
error: Optional error message (for failed status)
outputs: Optional dict of output files (for success status)
phase: Optional profiling sub-phase (e.g. ProfilingPhase value).
Relayed by the sidecar to the controller for kubectl visibility.
"""
status_file = os.path.join(output_dir, STATUS_FILE_NAME)
status_data: dict[str, Any] = {
......@@ -58,6 +63,8 @@ def write_profiler_status(
status_data["error"] = error
if outputs:
status_data["outputs"] = outputs
if phase:
status_data["phase"] = phase.value
try:
with open(status_file, "w") as f:
......
......@@ -480,11 +480,9 @@ spec:
type: string
- jsonPath: .status.conditions[?(@.type=="Succeeded")].reason
name: Reason
priority: 1
type: string
- jsonPath: .status.conditions[?(@.type=="Succeeded")].message
name: Message
priority: 1
type: string
- jsonPath: .status.dgdName
name: DGD
......
......@@ -6,6 +6,7 @@
*.so
*.dylib
bin/*
tilt_bin/
Dockerfile.cross
#temp files
......@@ -28,6 +29,9 @@ go.work
.idea
.vscode
*.swp
# Tilt local settings (personal overrides)
tilt-settings.local.yaml
*.swo
*~
......
......@@ -499,8 +499,8 @@ type DynamoGraphDeploymentRequestStatus struct {
// +kubebuilder:printcolumn:name="Backend",type=string,JSONPath=`.spec.backend`
// +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Profiling",type=string,JSONPath=`.status.profilingPhase`
// +kubebuilder:printcolumn:name="Reason",type=string,JSONPath=`.status.conditions[?(@.type=="Succeeded")].reason`,priority=1
// +kubebuilder:printcolumn:name="Message",type=string,JSONPath=`.status.conditions[?(@.type=="Succeeded")].message`,priority=1
// +kubebuilder:printcolumn:name="Reason",type=string,JSONPath=`.status.conditions[?(@.type=="Succeeded")].reason`
// +kubebuilder:printcolumn:name="Message",type=string,JSONPath=`.status.conditions[?(@.type=="Succeeded")].message`
// +kubebuilder:printcolumn:name="DGD",type=string,JSONPath=`.status.dgdName`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type DynamoGraphDeploymentRequest struct {
......
......@@ -480,11 +480,9 @@ spec:
type: string
- jsonPath: .status.conditions[?(@.type=="Succeeded")].reason
name: Reason
priority: 1
type: string
- jsonPath: .status.conditions[?(@.type=="Succeeded")].message
name: Message
priority: 1
type: string
- jsonPath: .status.dgdName
name: DGD
......
......@@ -130,21 +130,61 @@ const (
MessageModelCachePVCNotFound = "model cache PVC %s not found in namespace %s"
)
// shell script template for the output copier sidecar
// shell script template for the output copier sidecar.
//
// The sidecar is a continuous poller that:
// 1. During profiling: polls profiler_status.yaml every 10s, relays phase+message
// to the output ConfigMap so the controller can track sub-phase progress.
// 2. After profiler terminates: writes the final profiling output (final_config.yaml
// + profiler_status.yaml) to the same ConfigMap, preserving the phase+message keys.
const sidecarScriptTemplate = `
set -e
set -o pipefail
# Wait for profiler container to terminate (no timeout - profiling can take hours)
echo "Waiting for profiler to complete..."
STATUS_FILE="{{.OutputPath}}/profiler_status.yaml"
LAST_PHASE=""
START_TIME=$(date +%s)
LAST_PROGRESS_LOG=$START_TIME
PROGRESS_INTERVAL=300
# relay_phase: read phase+message from profiler_status.yaml and write to ConfigMap.
# Only writes when the phase changes (debounce).
relay_phase() {
if [ ! -f "$STATUS_FILE" ]; then
return
fi
PHASE=$(grep "^phase:" "$STATUS_FILE" 2>/dev/null | awk '{print $2}' | tr -d '"' | tr -d "'" || true)
MESSAGE=$(grep "^message:" "$STATUS_FILE" 2>/dev/null | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
if [ -z "$PHASE" ] || [ "$PHASE" = "$LAST_PHASE" ]; then
return
fi
echo "Phase update: $PHASE - $MESSAGE"
cat >/tmp/progress.yaml <<PEOF
apiVersion: v1
kind: ConfigMap
metadata:
name: {{.ConfigMapName}}
namespace: {{.Namespace}}
labels:
dgdr.nvidia.com/name: {{.DGDRName}}
dgdr.nvidia.com/namespace: {{.Namespace}}
nvidia.com/managed-by: dynamo-operator
data:
phase: "$PHASE"
message: "$MESSAGE"
PEOF
kubectl apply -f /tmp/progress.yaml 2>/dev/null && LAST_PHASE="$PHASE" || echo "Warning: failed to update progress ConfigMap"
}
# Main loop: poll profiler_status.yaml and wait for profiler to terminate
echo "Waiting for profiler to complete..."
while true; do
CURRENT_TIME=$(date +%s)
ELAPSED=$((CURRENT_TIME - START_TIME))
# Relay phase updates to ConfigMap
relay_phase
# Log progress every 5 minutes
if [ $((CURRENT_TIME - LAST_PROGRESS_LOG)) -ge $PROGRESS_INTERVAL ]; then
echo "Still waiting... ($(($ELAPSED / 60)) minutes elapsed)"
......@@ -157,12 +197,14 @@ while true; do
echo "Profiler terminated (ran for $(($ELAPSED / 60)) minutes)"
break
fi
sleep 5
sleep 10
done
# Final relay: pick up any last phase change written just before termination
relay_phase
# Check profiler status file (2 minute timeout)
echo "Checking profiler status..."
STATUS_FILE="{{.OutputPath}}/profiler_status.yaml"
TIMEOUT=120
CHECK_START=$(date +%s)
......@@ -206,9 +248,13 @@ case "$STATUS" in
;;
esac
echo "Creating ConfigMap..."
echo "Writing profiling output to ConfigMap..."
# Read final phase+message to preserve them alongside the profiling output
FINAL_PHASE=$(grep "^phase:" "$STATUS_FILE" 2>/dev/null | awk '{print $2}' | tr -d '"' | tr -d "'" || true)
FINAL_MESSAGE=$(grep "^message:" "$STATUS_FILE" 2>/dev/null | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
# Start building ConfigMap YAML with DGD spec
# Start building ConfigMap YAML with DGD spec + preserved phase/message
cat >/tmp/cm.yaml <<EOF
apiVersion: v1
kind: ConfigMap
......@@ -217,8 +263,11 @@ metadata:
namespace: {{.Namespace}}
labels:
dgdr.nvidia.com/name: {{.DGDRName}}
dgdr.nvidia.com/namespace: {{.Namespace}}
nvidia.com/managed-by: dynamo-operator
data:
phase: "$FINAL_PHASE"
message: "$FINAL_MESSAGE"
{{.OutputFile}}: |
EOF
sed 's/^/ /' {{.OutputPath}}/{{.OutputFile}} >> /tmp/cm.yaml
......@@ -242,6 +291,44 @@ kubectl apply -f /tmp/cm.yaml
echo "Saved profiling output to ConfigMap {{.ConfigMapName}}"
`
// profilingPhaseReason returns the condition Reason for a profiling sub-phase.
// By design, the ProfilingPhase string values are identical to the Reason values
// (e.g., ProfilingPhaseSweepingDecode = "SweepingDecode" = ProfilingReasonSweepingDecode).
func profilingPhaseReason(phase nvidiacomv1beta1.ProfilingPhase) string {
if phase == nvidiacomv1beta1.ProfilingPhaseDone {
return nvidiacomv1beta1.ProfilingReasonCompleted
}
return string(phase)
}
// profilingPhaseFailureReason returns the condition Reason for a failed profiling sub-phase.
// By convention, failure reasons are "<Phase>Failed" (e.g., "SweepingDecodeFailed").
// An empty phase yields the generic "ProfilingFailed".
func profilingPhaseFailureReason(phase nvidiacomv1beta1.ProfilingPhase) string {
if phase == "" {
return "ProfilingFailed"
}
return string(phase) + "Failed"
}
// validProfilingPhases is the set of phases the profiler sidecar may report.
var validProfilingPhases = map[nvidiacomv1beta1.ProfilingPhase]struct{}{
nvidiacomv1beta1.ProfilingPhaseInitializing: {},
nvidiacomv1beta1.ProfilingPhaseSweepingPrefill: {},
nvidiacomv1beta1.ProfilingPhaseSweepingDecode: {},
nvidiacomv1beta1.ProfilingPhaseSelectingConfig: {},
nvidiacomv1beta1.ProfilingPhaseBuildingCurves: {},
nvidiacomv1beta1.ProfilingPhaseGeneratingDGD: {},
nvidiacomv1beta1.ProfilingPhaseDone: {},
}
// isValidProfilingPhase returns true if phase is a recognized ProfilingPhase value.
func isValidProfilingPhase(phase string) bool {
_, ok := validProfilingPhases[nvidiacomv1beta1.ProfilingPhase(phase)]
return ok
}
// DynamoGraphDeploymentRequestReconciler reconciles a DynamoGraphDeploymentRequest object
type DynamoGraphDeploymentRequestReconciler struct {
client.Client
......@@ -389,9 +476,66 @@ func (r *DynamoGraphDeploymentRequestReconciler) handlePendingPhase(ctx context.
r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, MessageAICProfilingJobCreated)
}
// Update to Profiling phase — show DiscoveringHardware until the job is confirmed running.
// Update to Profiling phase — use Initializing reason to indicate the profiler is loading.
dgdr.SetProfilingPhase(nvidiacomv1beta1.ProfilingPhaseInitializing)
return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "DiscoveringHardware", MessageDiscoveringHardware)
return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, nvidiacomv1beta1.ProfilingReasonInitializing, MessageDiscoveringHardware)
}
// updateProfilingSubPhase reads the output ConfigMap and updates status.profilingPhase
// and the Profiling/Succeeded conditions. The sidecar continuously polls profiler_status.yaml
// and writes phase+message to the output ConfigMap (dgdr-output-<name>). This function
// reads those keys and copies them verbatim into the DGDR status.
func (r *DynamoGraphDeploymentRequestReconciler) updateProfilingSubPhase(
ctx context.Context,
dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
) error {
logger := log.FromContext(ctx)
outputCMName := getOutputConfigMapName(dgdr)
cm := &corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{
Name: outputCMName, Namespace: dgdr.Namespace,
}, cm); err != nil {
return nil // No output ConfigMap yet — skip
}
phase, exists := cm.Data["phase"]
if !exists || phase == "" {
return nil
}
if !isValidProfilingPhase(phase) {
return fmt.Errorf("invalid profiling phase %q in ConfigMap %s", phase, outputCMName)
}
profilingPhase := nvidiacomv1beta1.ProfilingPhase(phase)
if dgdr.Status.ProfilingPhase == profilingPhase {
return nil // No change
}
logger.Info("Profiling sub-phase updated", "phase", phase)
dgdr.SetProfilingPhase(profilingPhase)
// Reason is derived from phase; message comes from the profiler via ConfigMap.
reason := profilingPhaseReason(profilingPhase)
message := cm.Data["message"] // written by profiler, relayed by sidecar
meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
Type: nvidiacomv1beta1.ConditionTypeProfiling,
Status: metav1.ConditionFalse,
ObservedGeneration: dgdr.Generation,
Reason: reason,
Message: message,
})
meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
Type: nvidiacomv1beta1.ConditionTypeSucceeded,
Status: metav1.ConditionFalse,
ObservedGeneration: dgdr.Generation,
Reason: reason,
Message: message,
})
return r.Status().Update(ctx, dgdr)
}
// handleProfilingPhase monitors profiling progress and generates spec when complete
......@@ -399,21 +543,54 @@ func (r *DynamoGraphDeploymentRequestReconciler) handleProfilingPhase(ctx contex
logger := log.FromContext(ctx)
logger.Info("Handling profiling phase", "name", dgdr.Name)
// Check for sub-phase updates from output ConfigMap (populated by sidecar poller)
if err := r.updateProfilingSubPhase(ctx, dgdr); err != nil {
return ctrl.Result{}, err
}
// Check profiling job status (both online and offline/AIC run as Jobs)
// Note: We watch the Job via Owns(), so we'll be triggered automatically on Job changes
completed, err := r.checkProfilingJobStatus(ctx, dgdr)
if err != nil {
r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageProfilingCheckFailed, err.Error())
// Job failed - clear profiling sub-phase and transition to Failed
dgdr.ClearProfilingPhase()
return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingFailed", err.Error())
// Job failed - keep profilingPhase set so users can see where it died.
// profilingPhase is already current: set to Initializing on entry,
// then updated by updateProfilingSubPhase() above (reads output ConfigMap).
failureReason := "ProfilingFailed"
failureMessage := err.Error()
if dgdr.Status.ProfilingPhase != "" {
failureReason = profilingPhaseFailureReason(dgdr.Status.ProfilingPhase)
}
// Set phase and conditions directly so we can use sub-phase-specific failure
// reason on both Profiling and Succeeded conditions. (updatePhaseWithCondition
// would hardcode Succeeded reason to generic "Failed".)
dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
Type: nvidiacomv1beta1.ConditionTypeSucceeded,
Status: metav1.ConditionFalse,
ObservedGeneration: dgdr.Generation,
Reason: failureReason,
Message: failureMessage,
})
dgdr.AddStatusCondition(metav1.Condition{
Type: nvidiacomv1beta1.ConditionTypeProfiling,
Status: metav1.ConditionFalse,
ObservedGeneration: dgdr.Generation,
Reason: failureReason,
Message: failureMessage,
})
if err := r.Status().Update(ctx, dgdr); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
if !completed {
logger.Info("Profiling job still running", "name", dgdr.Name)
// Transition from DiscoveringHardware to ProfilingRunning once the job is confirmed active.
// Transition from Initializing to ProfilingRunning once the job is confirmed active.
cond := meta.FindStatusCondition(dgdr.Status.Conditions, nvidiacomv1beta1.ConditionTypeProfiling)
if cond != nil && cond.Reason == "DiscoveringHardware" {
if cond != nil && cond.Reason == nvidiacomv1beta1.ProfilingReasonInitializing {
return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingRunning", MessageProfilingInProgress)
}
// Don't requeue - we'll be triggered when the Job completes/fails
......@@ -1784,6 +1961,7 @@ func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manag
UpdateFunc: func(de event.UpdateEvent) bool { return true },
GenericFunc: func(ge event.GenericEvent) bool { return true },
})). // Watch Jobs created by this controller (via ownerReference)
// Watch DGDs created by this controller (via label)
Watches(
&dgdv1alpha1.DynamoGraphDeployment{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
......@@ -1809,7 +1987,41 @@ func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manag
GenericFunc: func(ge event.GenericEvent) bool { return true },
}),
).
// Watch DGDs created by this controller (via label)
// Watch output ConfigMaps for profiling sub-phase updates (via label)
Watches(
&corev1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
// Only trigger for ConfigMaps with DGDR labels (written by the sidecar)
cm := obj.(*corev1.ConfigMap)
dgdrName, hasName := cm.Labels[nvidiacomv1beta1.LabelDGDRName]
dgdrNamespace, hasNamespace := cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
if !hasName || !hasNamespace {
return nil
}
return []ctrl.Request{{
NamespacedName: types.NamespacedName{
Name: dgdrName,
Namespace: dgdrNamespace,
},
}}
}),
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(ce event.CreateEvent) bool {
labels := ce.Object.GetLabels()
_, hasName := labels[nvidiacomv1beta1.LabelDGDRName]
_, hasNamespace := labels[nvidiacomv1beta1.LabelDGDRNamespace]
return hasName && hasNamespace
},
UpdateFunc: func(ue event.UpdateEvent) bool {
labels := ue.ObjectNew.GetLabels()
_, hasName := labels[nvidiacomv1beta1.LabelDGDRName]
_, hasNamespace := labels[nvidiacomv1beta1.LabelDGDRNamespace]
return hasName && hasNamespace
},
DeleteFunc: func(de event.DeleteEvent) bool { return false },
GenericFunc: func(ge event.GenericEvent) bool { return false },
}),
).
// Set the event filter to ignore resources handled by other controllers in namespace-restricted mode
WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)).
Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeploymentRequest))
......
......@@ -240,10 +240,11 @@ class DynamoDeploymentClient:
self.deployment_spec is not None
), "Failed to load deployment specification"
# Extract component names
self.components = [
svc.lower() for svc in self.deployment_spec["spec"]["services"].keys()
]
# Extract component names (original case for label queries, lowercase for directories)
self._original_components = list(
self.deployment_spec["spec"]["services"].keys()
)
self.components = [svc.lower() for svc in self._original_components]
# Ensure name and namespace are set correctly
self.deployment_spec["metadata"]["name"] = self.deployment_name
......@@ -450,14 +451,17 @@ class DynamoDeploymentClient:
base_dir = self.base_log_dir / self.deployment_name
base_dir.mkdir(parents=True, exist_ok=True)
for component in self.components:
for component, original_name in zip(self.components, self._original_components):
component_dir = base_dir / component
component_dir.mkdir(exist_ok=True)
# List pods for this component using the selector label
# nvidia.com/selector: deployment-name-component
# Use DGD name + component name labels which are consistent across
# both Grove (PodCliqueSet) and non-Grove (DCD) deployment pathways.
# The previous nvidia.com/selector label includes a worker hash suffix
# on the DCD pathway, causing a mismatch with the expected base name.
label_selector = (
f"nvidia.com/selector={self.deployment_name}-{component.lower()}"
f"nvidia.com/dynamo-graph-deployment-name={self.deployment_name},"
f"nvidia.com/dynamo-component={original_name}"
)
pods = await self.core_api.list_namespaced_pod(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment