dynamographdeploymentrequest_controller.go 78.2 KB
Newer Older
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"bytes"
	"context"
23
24
	"encoding/json"
	"errors"
25
	"fmt"
26
	"io"
27
	"strings"
28
29
30
31
32
33
34
	"text/template"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36
37
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
38
	"k8s.io/apimachinery/pkg/util/yaml"
39
	"k8s.io/client-go/tools/record"
40
	"k8s.io/utils/ptr"
41
42
43
44
45
46
47
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
48
	sigsyaml "sigs.k8s.io/yaml"
49

50
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
51
52
	dgdv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	nvidiacomv1beta1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1beta1"
53
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
54
	commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
55
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/gpu"
56
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
)

// Names, paths, CLI flags and user-facing strings shared by the
// DynamoGraphDeploymentRequest (DGDR) controller and its profiling jobs.
const (
	// Job naming: prefixes for the two profiling job flavours (online and AIC).
	JobNamePrefixOnline = "profile-online-"
	JobNamePrefixAIC    = "profile-aic-"

	// Container names used inside the profiling job pod.
	// The sidecar script looks the profiler container up by the literal name "profiler".
	ContainerNameProfiler     = "profiler"
	ContainerNameOutputCopier = "output-copier"

	// ServiceAccount
	ServiceAccountProfilingJob = "dgdr-profiling-job"

	// ConfigMap naming
	ConfigMapOutputPrefix = "dgdr-output-"

	// Annotation keys
	AnnotationAdditionalResources = "dgdr.nvidia.com/additional-resources"

	// Annotation keys for v1alpha1 round-trip compatibility.
	// The conversion layer stores v1alpha1 fields that have no v1beta1 spec equivalent
	// as annotations so the controller can still honour them for converted resources.
	AnnotationConfigMapRef = "nvidia.com/dgdr-config-map-ref"
	AnnotationOutputPVC    = "nvidia.com/dgdr-output-pvc"

	// Size limits
	MaxAnnotationSize = 250000 // ~250KB, below K8s 256KB limit

	// Sidecar image
	// NOTE(review): ":latest" is a floating tag; consider pinning a version.
	SidecarImage = "bitnami/kubectl:latest"

	// Volume names
	VolumeNameProfilingOutput = "profiling-output"
	VolumeNameProfilingConfig = "profiling-config"
	VolumeNameModelCache      = "model-cache"

	// Volume paths
	ProfilingOutputPath        = "/data"
	ProfilingOutputFile        = "final_config.yaml"
	ProfilingConfigMountPath   = "/config"
	ProfilingConfigDefaultKey  = "disagg.yaml"
	DefaultModelCacheMountPath = "/opt/model-cache"

	// Command line arguments
	ArgModel   = "--model"
	ArgBackend = "--backend"
	ArgTTFT    = "--ttft"
	ArgITL     = "--itl"
	ArgConfig  = "--config"

	// Messages. Entries containing %s are fmt format strings.
	MessageInitialized            = "DGDR initialized successfully"
	MessageDiscoveringHardware    = "Discovering GPU hardware and preparing profiling job"
	MessageProfilingJobCreated    = "Profiling job created"
	MessageAICProfilingJobCreated = "AIC profiling job created"
	MessageProfilingInProgress    = "Profiling is in progress"
	MessageSpecGenerated          = "DynamoGraphDeployment spec generated successfully"
	MessageSpecAvailable          = "Generated spec is available in annotation nvidia.com/generated-dgd-spec"
	MessageDeploymentCreated      = "DynamoGraphDeployment %s created successfully"
	MessageDeploymentReady        = "DynamoGraphDeployment %s is ready"
	MessageDeploymentDegraded     = "DynamoGraphDeployment %s degraded from Ready to %s"
	MessageDeploymentDeleted      = "DGD %s was deleted. DGDR will not recreate it. Delete this DGDR and create a new one to redeploy."
	MessageInvalidState           = "Invalid state"
	MessageSpecChangeRejected     = "Cannot modify spec in phase '%s'. DynamoGraphDeploymentRequest is immutable once profiling starts. Create a new resource with a different name instead."

	// The CamelCase values below are passed as event/condition Reason strings
	// by this controller (e.g. MessageJobCreationFailed, MessageGenerationFailed).
	MessageJobCreationFailed         = "JobCreationFailed"
	MessageDeploymentCreationFailed  = "DeploymentCreationFailed"
	MessageResultsRetrievalFailed    = "ResultsRetrievalFailed"
	MessageGenerationFailed          = "GenerationFailed"
	MessageAIConfiguratorCheckFailed = "AIConfiguratorCheckFailed"
	MessageProfilingCheckFailed      = "ProfilingCheckFailed"

	// Error message formats for missing resources.
	MessageConfigMapNotFound     = "ConfigMap %s not found in namespace %s"
	MessageConfigMapKeyNotFound  = "key %s not found in ConfigMap %s"
	MessageModelCachePVCNotFound = "model cache PVC %s not found in namespace %s"
)

133
134
135
136
137
138
139
// sidecarScriptTemplate is the text/template source of the shell script run by
// the output copier sidecar.
//
// Template fields referenced: OutputPath, OutputFile, ConfigMapName, Namespace,
// DGDRName and DGDRuid.
//
// The sidecar is a continuous poller that:
//  1. During profiling: polls profiler_status.yaml every 10s, relays phase+message
//     to the output ConfigMap so the controller can track sub-phase progress.
//  2. After profiler terminates: writes the final profiling output (final_config.yaml
//     + profiler_status.yaml) to the same ConfigMap, preserving the phase+message keys.
//
// The script runs with `set -e` and `set -o pipefail`, so every grep pipeline
// whose "no match" outcome is handled explicitly afterwards is suffixed with
// `|| true`; otherwise a missing key would abort the script before its own
// error reporting (or its success path) could run.
const sidecarScriptTemplate = `
set -e
set -o pipefail

STATUS_FILE="{{.OutputPath}}/profiler_status.yaml"
LAST_PHASE=""
START_TIME=$(date +%s)
LAST_PROGRESS_LOG=$START_TIME
PROGRESS_INTERVAL=300

# relay_phase: read phase+message from profiler_status.yaml and write to ConfigMap.
# Only writes when the phase changes (debounce).
relay_phase() {
  if [ ! -f "$STATUS_FILE" ]; then
    return
  fi
  PHASE=$(grep "^phase:" "$STATUS_FILE" 2>/dev/null | awk '{print $2}' | tr -d '"' | tr -d "'" || true)
  MESSAGE=$(grep "^message:" "$STATUS_FILE" 2>/dev/null | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
  if [ -z "$PHASE" ] || [ "$PHASE" = "$LAST_PHASE" ]; then
    return
  fi
  echo "Phase update: $PHASE - $MESSAGE"
  cat >/tmp/progress.yaml <<PEOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{.ConfigMapName}}
  namespace: {{.Namespace}}
  labels:
    dgdr.nvidia.com/name: {{.DGDRName}}
    dgdr.nvidia.com/namespace: {{.Namespace}}
    nvidia.com/managed-by: dynamo-operator
  ownerReferences:
  - apiVersion: nvidia.com/v1beta1
    kind: DynamoGraphDeploymentRequest
    name: {{.DGDRName}}
    uid: {{.DGDRuid}}
    blockOwnerDeletion: true
    controller: true
data:
  phase: "$PHASE"
  message: "$MESSAGE"
PEOF
  kubectl apply -f /tmp/progress.yaml 2>/dev/null && LAST_PHASE="$PHASE" || echo "Warning: failed to update progress ConfigMap"
}

# Main loop: poll profiler_status.yaml and wait for profiler to terminate
echo "Waiting for profiler to complete..."
while true; do
  CURRENT_TIME=$(date +%s)
  ELAPSED=$((CURRENT_TIME - START_TIME))

  # Relay phase updates to ConfigMap
  relay_phase

  # Log progress every 5 minutes
  if [ $((CURRENT_TIME - LAST_PROGRESS_LOG)) -ge $PROGRESS_INTERVAL ]; then
    echo "Still waiting... ($(($ELAPSED / 60)) minutes elapsed)"
    LAST_PROGRESS_LOG=$CURRENT_TIME
  fi

  # Check if profiler container terminated
  CONTAINER_STATUS=$(kubectl get pod $HOSTNAME -n {{.Namespace}} -o jsonpath='{.status.containerStatuses[?(@.name=="profiler")].state}' 2>/dev/null || echo "")
  if echo "$CONTAINER_STATUS" | grep -q "terminated"; then
    echo "Profiler terminated (ran for $(($ELAPSED / 60)) minutes)"
    break
  fi
  sleep 10
done

# Final relay: pick up any last phase change written just before termination
relay_phase

# Check profiler status file (2 minute timeout)
echo "Checking profiler status..."
TIMEOUT=120
CHECK_START=$(date +%s)

# Wait for status file to exist
while [ ! -f "$STATUS_FILE" ]; do
  ELAPSED=$(($(date +%s) - CHECK_START))
  if [ $ELAPSED -ge $TIMEOUT ]; then
    echo "ERROR: Status file not found after ${TIMEOUT}s"
    exit 1
  fi
  sleep 2
done

# Read and parse status from YAML file.
# "|| true" keeps errexit/pipefail from aborting on a missing key so the
# explicit checks below can report the problem.
STATUS=$(grep "^status:" "$STATUS_FILE" | awk '{print $2}' | tr -d '"' | tr -d "'" || true)

if [ -z "$STATUS" ]; then
  echo "ERROR: Invalid status file format"
  exit 1
fi

# Check status value
case "$STATUS" in
  success)
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
    echo "Profiler succeeded: $MESSAGE"
    ;;
  failed)
    ERROR=$(grep "^error:" "$STATUS_FILE" | sed 's/^error: *//' | tr -d '"' | tr -d "'" || true)
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
    echo "ERROR: Profiler failed: ${ERROR:-$MESSAGE}"
    exit 1
    ;;
  running)
    echo "ERROR: Profiler still running (unexpected)"
    exit 1
    ;;
  *)
    echo "ERROR: Unknown status: $STATUS"
    exit 1
    ;;
esac

echo "Writing profiling output to ConfigMap..."

# Read final phase+message to preserve them alongside the profiling output
FINAL_PHASE=$(grep "^phase:" "$STATUS_FILE" 2>/dev/null | awk '{print $2}' | tr -d '"' | tr -d "'" || true)
FINAL_MESSAGE=$(grep "^message:" "$STATUS_FILE" 2>/dev/null | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)

# Start building ConfigMap YAML with DGD spec + preserved phase/message
cat >/tmp/cm.yaml <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{.ConfigMapName}}
  namespace: {{.Namespace}}
  labels:
    dgdr.nvidia.com/name: {{.DGDRName}}
    dgdr.nvidia.com/namespace: {{.Namespace}}
    nvidia.com/managed-by: dynamo-operator
  ownerReferences:
  - apiVersion: nvidia.com/v1beta1
    kind: DynamoGraphDeploymentRequest
    name: {{.DGDRName}}
    uid: {{.DGDRuid}}
    blockOwnerDeletion: true
    controller: true
data:
  phase: "$FINAL_PHASE"
  message: "$FINAL_MESSAGE"
  {{.OutputFile}}: |
EOF
sed 's/^/    /' {{.OutputPath}}/{{.OutputFile}} >> /tmp/cm.yaml

# Add profiler status file for debugging
if [ -f {{.OutputPath}}/profiler_status.yaml ]; then
  echo "  profiler_status.yaml: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/profiler_status.yaml >> /tmp/cm.yaml
fi

# Add webui_data.json for pareto curve data (used by operator to populate status.profilingResults.pareto)
if [ -f {{.OutputPath}}/webui_data.json ]; then
  echo "  webui_data.json: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/webui_data.json >> /tmp/cm.yaml
fi

# Note: Profiling data (raw_data.npz converted to JSON) is included in the
# generated DGD YAML as a separate ConfigMap by the profiler, no need to add it here

kubectl apply -f /tmp/cm.yaml
echo "Saved profiling output to ConfigMap {{.ConfigMapName}}"
`

308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
// profilingPhaseReason maps a profiling sub-phase to the Reason string placed
// on status conditions. The Done phase is reported as ProfilingReasonCompleted;
// every other phase uses its own string value as the Reason
// (e.g., ProfilingPhaseSweepingDecode = "SweepingDecode").
func profilingPhaseReason(phase nvidiacomv1beta1.ProfilingPhase) string {
	switch phase {
	case nvidiacomv1beta1.ProfilingPhaseDone:
		return nvidiacomv1beta1.ProfilingReasonCompleted
	default:
		return string(phase)
	}
}

// profilingPhaseFailureReason builds the condition Reason for a profiling run
// that failed during the given sub-phase: the phase name followed by "Failed"
// (e.g., "SweepingDecodeFailed"). When no sub-phase is known (empty phase) the
// generic "ProfilingFailed" is returned.
func profilingPhaseFailureReason(phase nvidiacomv1beta1.ProfilingPhase) string {
	if len(phase) > 0 {
		return string(phase) + "Failed"
	}
	return "ProfilingFailed"
}

// validProfilingPhases is the set of phases the profiler sidecar may report.
// It is used as a lookup set (empty-struct values) by isValidProfilingPhase to
// reject unknown phase strings read from the output ConfigMap.
var validProfilingPhases = map[nvidiacomv1beta1.ProfilingPhase]struct{}{
	nvidiacomv1beta1.ProfilingPhaseInitializing:    {},
	nvidiacomv1beta1.ProfilingPhaseSweepingPrefill: {},
	nvidiacomv1beta1.ProfilingPhaseSweepingDecode:  {},
	nvidiacomv1beta1.ProfilingPhaseSelectingConfig: {},
	nvidiacomv1beta1.ProfilingPhaseBuildingCurves:  {},
	nvidiacomv1beta1.ProfilingPhaseGeneratingDGD:   {},
	nvidiacomv1beta1.ProfilingPhaseDone:            {},
}

// isValidProfilingPhase reports whether the raw phase string is one of the
// entries in validProfilingPhases.
func isValidProfilingPhase(phase string) bool {
	if _, known := validProfilingPhases[nvidiacomv1beta1.ProfilingPhase(phase)]; known {
		return true
	}
	return false
}

346
347
348
// DynamoGraphDeploymentRequestReconciler reconciles a DynamoGraphDeploymentRequest object
type DynamoGraphDeploymentRequestReconciler struct {
	client.Client
349
350
351
352
353
354
	APIReader         client.Reader
	Recorder          record.EventRecorder
	Config            *configv1alpha1.OperatorConfiguration
	RuntimeConfig     *commonController.RuntimeConfig
	GPUDiscoveryCache *gpu.GPUDiscoveryCache
	GPUDiscovery      *gpu.GPUDiscovery
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
	// RBACMgr handles RBAC setup for profiling jobs
	RBACManager RBACManager
}

// RBACManager is the interface the reconciler uses for managing RBAC resources.
// It lets the RBAC implementation be supplied from outside this file.
type RBACManager interface {
	// EnsureServiceAccountWithRBAC returns an error if the setup fails.
	// NOTE(review): presumably ensures serviceAccountName exists in
	// targetNamespace and is bound to clusterRoleName — confirm against the
	// implementation, which is not in this file section.
	EnsureServiceAccountWithRBAC(ctx context.Context, targetNamespace, serviceAccountName, clusterRoleName string) error
}

// GetRecorder exposes the reconciler's event recorder, satisfying the
// commonController.Reconciler interface.
func (r *DynamoGraphDeploymentRequestReconciler) GetRecorder() record.EventRecorder {
	recorder := r.Recorder
	return recorder
}

// FinalizeResource implements commonController.Finalizer interface
370
func (r *DynamoGraphDeploymentRequestReconciler) FinalizeResource(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
371
372
373
374
375
376
377
378
379
380
381
382
383
	logger := log.FromContext(ctx)

	logger.Info("DGDR finalized successfully", "name", dgdr.Name)
	return nil
}

// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/finalizers,verbs=update
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
384
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
385
386
387
388
389
390
391
392
393
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

// Reconcile handles the reconciliation loop for DynamoGraphDeploymentRequest
func (r *DynamoGraphDeploymentRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling DynamoGraphDeploymentRequest", "name", req.Name, "namespace", req.Namespace)

	// Fetch the DGDR instance
394
	dgdr := &nvidiacomv1beta1.DynamoGraphDeploymentRequest{}
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
	if err := r.Get(ctx, req.NamespacedName, dgdr); err != nil {
		if apierrors.IsNotFound(err) {
			logger.Info("DGDR resource not found, ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get DGDR")
		return ctrl.Result{}, err
	}

	// Handle finalizer using common function
	finalized, err := commonController.HandleFinalizer(ctx, dgdr, r.Client, r)
	if err != nil {
		return ctrl.Result{}, err
	}
	if finalized {
		// Resource was deleted and finalized
		return ctrl.Result{}, nil
	}

	// Check for spec changes (immutability enforcement)
	if dgdr.Status.ObservedGeneration > 0 && dgdr.Status.ObservedGeneration != dgdr.Generation {
		// Spec changed after initial processing
417
418
419
420
		if dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseProfiling || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeploying ||
			dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseReady || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeployed {
			logger.Info("Spec change detected in immutable phase",
				"phase", dgdr.Status.Phase,
421
422
423
				"observedGeneration", dgdr.Status.ObservedGeneration,
				"currentGeneration", dgdr.Generation)

424
425
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonSpecChangeRejected,
				fmt.Sprintf(MessageSpecChangeRejected, dgdr.Status.Phase))
426
427

			// Keep the old observedGeneration to continue rejecting changes
428
			// No phase transition - stay in current phase with old spec
429
430
431
			return ctrl.Result{}, nil
		}
	}
432
433
434
435
436
437
438
439
440
441
442
443
444
445
	// Phase machine: handle different phases
	switch dgdr.Status.Phase {
	case nvidiacomv1beta1.DGDRPhasePending, "":
		return r.handlePendingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		return r.handleProfilingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		return r.handleDeployingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseReady:
		return r.handleReadyPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		return r.handleDeployedPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseFailed:
		return r.handleFailedPhase(ctx, dgdr)
446
	default:
447
448
		logger.Info("Unknown phase", "phase", dgdr.Status.Phase)
		return r.updatePhaseAndRequeue(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, MessageInvalidState)
449
450
451
	}
}

452
453
454
455
// handlePendingPhase handles a DGDR whose phase is Pending (or still empty).
//
// It works in two passes, keyed on status.observedGeneration:
//   - observedGeneration == 0 (first pass): validate the spec. On failure the
//     DGDR moves to Failed with a Validation condition. On success the current
//     generation is recorded and the DGDR stays Pending with a
//     "DiscoveringHardware" Profiling condition.
//   - otherwise: create the profiling job (online or AIC), emit the matching
//     event, set the profiling sub-phase to Initializing and move to Profiling.
//     A job-creation error moves the DGDR to Failed.
func (r *DynamoGraphDeploymentRequestReconciler) handlePendingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// First-time processing: validate spec (merged from handleInitialState)
	if firstPass := dgdr.Status.ObservedGeneration == 0; firstPass {
		logger.Info("Handling initial validation", "name", dgdr.Name)

		validationErr := r.validateSpec(ctx, dgdr)
		if validationErr != nil {
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonValidationFailed, validationErr.Error())
			return r.updatePhaseWithCondition(
				ctx, dgdr,
				nvidiacomv1beta1.DGDRPhaseFailed,
				nvidiacomv1beta1.ConditionTypeValidation, metav1.ConditionFalse,
				nvidiacomv1beta1.EventReasonValidationFailed, validationErr.Error(),
			)
		}

		// Remember which spec generation is being processed.
		dgdr.Status.ObservedGeneration = dgdr.Generation

		// Stay in Pending; the next reconcile discovers hardware and creates the profiling job.
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonInitialized, MessageInitialized)
		return r.updatePhaseWithCondition(
			ctx, dgdr,
			nvidiacomv1beta1.DGDRPhasePending,
			nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
			"DiscoveringHardware", MessageDiscoveringHardware,
		)
	}

	logger.Info("Handling pending phase", "name", dgdr.Name)

	// Create profiling job (online or AIC)
	if err := r.createProfilingJob(ctx, dgdr); err != nil {
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonProfilingJobFailed, err.Error())
		return r.updatePhaseWithCondition(
			ctx, dgdr,
			nvidiacomv1beta1.DGDRPhaseFailed,
			nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
			MessageJobCreationFailed, err.Error(),
		)
	}

	// Pick the event message that matches the kind of job that was created.
	jobCreatedMessage := MessageAICProfilingJobCreated
	if isOnlineProfiling(dgdr) {
		jobCreatedMessage = MessageProfilingJobCreated
	}
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, jobCreatedMessage)

	// Enter the Profiling phase; the Initializing reason signals the profiler is loading.
	dgdr.SetProfilingPhase(nvidiacomv1beta1.ProfilingPhaseInitializing)
	return r.updatePhaseWithCondition(
		ctx, dgdr,
		nvidiacomv1beta1.DGDRPhaseProfiling,
		nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
		nvidiacomv1beta1.ProfilingReasonInitializing, MessageDiscoveringHardware,
	)
}

// updateProfilingSubPhase reads the output ConfigMap and updates status.profilingPhase
// and the Profiling/Succeeded conditions. The sidecar continuously polls profiler_status.yaml
// and writes phase+message to the output ConfigMap (dgdr-output-<name>). This function
// reads those keys and copies them into the DGDR status.
//
// Returns nil without touching the status when the ConfigMap does not exist
// yet, when it carries no phase, or when the phase is unchanged. Returns an
// error when the ConfigMap cannot be read for any reason other than NotFound,
// when the reported phase is not a recognised ProfilingPhase, or when the
// status update fails.
func (r *DynamoGraphDeploymentRequestReconciler) updateProfilingSubPhase(
	ctx context.Context,
	dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
) error {
	logger := log.FromContext(ctx)
	outputCMName := getOutputConfigMapName(dgdr)

	cm := &corev1.ConfigMap{}
	cmKey := types.NamespacedName{Name: outputCMName, Namespace: dgdr.Namespace}
	if err := r.Get(ctx, cmKey, cm); err != nil {
		if apierrors.IsNotFound(err) {
			return nil // No output ConfigMap yet — skip
		}
		// Anything else (API/cache failure, RBAC) is a real error: surface it so
		// the reconcile is retried instead of silently skipping the sub-phase.
		return fmt.Errorf("getting output ConfigMap %s/%s: %w", dgdr.Namespace, outputCMName, err)
	}

	// A missing key reads as "", so one check covers both "absent" and "empty".
	phase := cm.Data["phase"]
	if phase == "" {
		return nil
	}

	if !isValidProfilingPhase(phase) {
		return fmt.Errorf("invalid profiling phase %q in ConfigMap %s", phase, outputCMName)
	}

	profilingPhase := nvidiacomv1beta1.ProfilingPhase(phase)
	if dgdr.Status.ProfilingPhase == profilingPhase {
		return nil // No change
	}

	logger.Info("Profiling sub-phase updated", "phase", phase)
	dgdr.SetProfilingPhase(profilingPhase)

	// Reason is derived from phase; message comes from the profiler via ConfigMap.
	reason := profilingPhaseReason(profilingPhase)
	message := cm.Data["message"] // written by profiler, relayed by sidecar

	// Both conditions stay False while profiling is in flight and carry the
	// same reason/message.
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeProfiling,
		Status:             metav1.ConditionFalse,
		ObservedGeneration: dgdr.Generation,
		Reason:             reason,
		Message:            message,
	})
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
		Status:             metav1.ConditionFalse,
		ObservedGeneration: dgdr.Generation,
		Reason:             reason,
		Message:            message,
	})

	return r.Status().Update(ctx, dgdr)
}

555
556
// handleProfilingPhase monitors profiling progress and generates spec when complete
func (r *DynamoGraphDeploymentRequestReconciler) handleProfilingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
557
	logger := log.FromContext(ctx)
558
	logger.Info("Handling profiling phase", "name", dgdr.Name)
559

560
561
562
563
564
	// Check for sub-phase updates from output ConfigMap (populated by sidecar poller)
	if err := r.updateProfilingSubPhase(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}

565
566
567
568
569
	// Check profiling job status (both online and offline/AIC run as Jobs)
	// Note: We watch the Job via Owns(), so we'll be triggered automatically on Job changes
	completed, err := r.checkProfilingJobStatus(ctx, dgdr)
	if err != nil {
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageProfilingCheckFailed, err.Error())
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
		// Job failed - keep profilingPhase set so users can see where it died.
		// profilingPhase is already current: set to Initializing on entry,
		// then updated by updateProfilingSubPhase() above (reads output ConfigMap).
		failureReason := "ProfilingFailed"
		failureMessage := err.Error()
		if dgdr.Status.ProfilingPhase != "" {
			failureReason = profilingPhaseFailureReason(dgdr.Status.ProfilingPhase)
		}

		// Set phase and conditions directly so we can use sub-phase-specific failure
		// reason on both Profiling and Succeeded conditions. (updatePhaseWithCondition
		// would hardcode Succeeded reason to generic "Failed".)
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
			Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
			Status:             metav1.ConditionFalse,
			ObservedGeneration: dgdr.Generation,
			Reason:             failureReason,
			Message:            failureMessage,
		})
		dgdr.AddStatusCondition(metav1.Condition{
			Type:               nvidiacomv1beta1.ConditionTypeProfiling,
			Status:             metav1.ConditionFalse,
			ObservedGeneration: dgdr.Generation,
			Reason:             failureReason,
			Message:            failureMessage,
		})
		if err := r.Status().Update(ctx, dgdr); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
601
602
603
604
	}

	if !completed {
		logger.Info("Profiling job still running", "name", dgdr.Name)
605
		// Transition from Initializing to ProfilingRunning once the job is confirmed active.
606
		cond := meta.FindStatusCondition(dgdr.Status.Conditions, nvidiacomv1beta1.ConditionTypeProfiling)
607
		if cond != nil && cond.Reason == nvidiacomv1beta1.ProfilingReasonInitializing {
608
609
			return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingRunning", MessageProfilingInProgress)
		}
610
611
612
613
		// Don't requeue - we'll be triggered when the Job completes/fails
		return ctrl.Result{}, nil
	}

614
615
616
617
618
619
620
621
622
	profilingResults, dgdName, err := r.generateDGDSpec(ctx, dgdr)
	if err != nil {
		dgdr.ClearProfilingPhase()
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageGenerationFailed, err.Error())
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionFalse, MessageGenerationFailed, err.Error())
	}
	if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to refetch DGDR after generateDGDSpec: %w", err)
	}
623

624
	dgdr.ClearProfilingPhase()
625
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
626
		Type:               nvidiacomv1beta1.ConditionTypeProfiling,
627
628
629
630
631
		Status:             metav1.ConditionTrue,
		ObservedGeneration: dgdr.Generation,
		Reason:             "ProfilingCompleted",
		Message:            "Profiling job completed successfully",
	})
632
633
	dgdr.Status.DGDName = dgdName
	dgdr.Status.ProfilingResults = profilingResults
634

635
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
636

637
638
	// Create additional resources (ConfigMaps) immediately after profiling
	// This ensures that the `planner-profile-data` ConfigMap is available for both auto and manual deployment
639
	// v1beta1 uses the DGDR namespace for additional resources.
640
641
642
643
644
645
646
647
	targetNamespace := dgdr.Namespace
	if err := r.createAdditionalResources(ctx, dgdr, targetNamespace); err != nil {
		logger.Error(err, "Failed to create additional resources after profiling")
		// Don't fail the DGDR, just log the error - ConfigMaps can be created manually
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, "ConfigMapCreationFailed",
			fmt.Sprintf("Failed to create ConfigMaps from profiling output: %v", err))
	}

648
	// If autoApply is enabled, transition to Deploying phase
649
	if dgdr.Spec.AutoApply == nil || *dgdr.Spec.AutoApply {
650
651
		logger.Info("AutoApply enabled, transitioning to Deploying phase")
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseDeploying, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
652
653
	}

654
655
	// Otherwise, transition to Ready phase
	return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseReady, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecAvailable)
656
657
}

658
659
// handleReadyPhase handles a DGDR in the Ready phase (profiling complete, spec
// available). There is nothing to monitor here — the generated spec is waiting
// for manual application — so it only logs and returns an empty result.
func (r *DynamoGraphDeploymentRequestReconciler) handleReadyPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	log.FromContext(ctx).Info("DGDR is ready", "name", dgdr.Name)

	return ctrl.Result{}, nil
}

// handleDeployingPhase handles DGD creation and monitors deployment
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Handling deploying phase", "name", dgdr.Name)

672
	if dgdr.Spec.AutoApply != nil && !*dgdr.Spec.AutoApply {
673
674
675
676
677
		// Shouldn't be in this phase without autoApply
		logger.Info("AutoApply not enabled, transitioning to Ready")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseReady
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseReady)
		return ctrl.Result{}, r.Status().Update(ctx, dgdr)
678
679
	}

680
681
682
683
684
	if dgdr.Status.DGDName == "" {
		return r.createDGD(ctx, dgdr)
	}

	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
685
	err := r.Get(ctx, types.NamespacedName{
686
687
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
688
689
690
	}, dgd)

	if apierrors.IsNotFound(err) {
691
692
693
694
695
		// Annotation present means DGD was never created (spec ready but create not yet called).
		// Annotation absent means DGD was previously created and then manually deleted.
		if _, hasSpec := dgdr.Annotations["nvidia.com/generated-dgd-spec"]; hasSpec {
			return r.createDGD(ctx, dgdr)
		}
696
697
698
699
700
701
702
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

703
704
705
	// Check if DGD is Ready
	var condStatus metav1.ConditionStatus
	var condReason, condMessage string
706

707
708
709
710
	if dgd.Status.State == dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD is Ready, transitioning to Deployed phase")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeployed
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeployed)
711

712
713
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentReady,
			fmt.Sprintf(MessageDeploymentReady, dgd.Name))
714

715
716
717
718
719
		condStatus = metav1.ConditionTrue
		condReason = nvidiacomv1beta1.EventReasonDeploymentReady
		condMessage = fmt.Sprintf(MessageDeploymentReady, dgd.Name)
	} else {
		logger.Info("DGD not yet ready", "name", dgd.Name, "state", dgd.Status.State)
720

721
722
723
		condStatus = metav1.ConditionFalse
		condReason = "DeploymentInProgress"
		condMessage = fmt.Sprintf("DGD %s is in %s state", dgd.Name, string(dgd.Status.State))
724
725
	}

726
727
728
729
730
731
732
733
	updateDeploymentInfo(dgdr, dgd)
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
		Status:  condStatus,
		Reason:  condReason,
		Message: condMessage,
	})

734
735
736
	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

737
738
// handleDeployedPhase monitors a healthy DGD and detects degradation or deletion
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
739
	logger := log.FromContext(ctx)
740
	logger.Info("DGDR is deployed", "name", dgdr.Name)
741

742
743
	// Check if DGD still exists and monitor its status
	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
744
	err := r.Get(ctx, types.NamespacedName{
745
746
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
747
748
749
750
751
752
753
754
755
756
757
	}, dgd)

	if apierrors.IsNotFound(err) {
		// DGD was deleted by user
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

758
759
760
761
	// Check if DGD degraded from Ready
	if dgd.Status.State != dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD degraded, transitioning back to Deploying",
			"dgdState", dgd.Status.State)
762

763
764
765
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeploying
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeploying)
		updateDeploymentInfo(dgdr, dgd)
766

767
768
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDegraded,
			fmt.Sprintf(MessageDeploymentDegraded, dgd.Name, string(dgd.Status.State)))
769
770

		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
771
772
773
774
			Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
			Status:  metav1.ConditionFalse,
			Reason:  nvidiacomv1beta1.EventReasonDeploymentDegraded,
			Message: fmt.Sprintf("Deployment degraded to %s", string(dgd.Status.State)),
775
		})
776
777
778
779
780
781
	} else {
		// DGD is healthy — update replica info only if changed
		if !updateDeploymentInfo(dgdr, dgd) {
			// Nothing changed, skip the status write
			return ctrl.Result{}, nil
		}
782
783
784
785
786
	}

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

787
788
789
// handleDGDDeleted handles the case when auto-created DGD is deleted by user.
// In v1beta1, this transitions to Failed (DeploymentDeleted phase was removed).
func (r *DynamoGraphDeploymentRequestReconciler) handleDGDDeleted(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
790
	logger := log.FromContext(ctx)
791
	logger.Info("DGD was deleted by user, transitioning to Failed phase")
792

793
794
	dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
	setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseFailed)
795

796
797
	r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDeleted,
		fmt.Sprintf(MessageDeploymentDeleted, dgdr.Status.DGDName))
798

799
800
	dgdr.Status.DGDName = ""
	dgdr.Status.DeploymentInfo = nil
801

802
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
803
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
804
		Status:  metav1.ConditionFalse,
805
		Reason:  nvidiacomv1beta1.EventReasonDeploymentDeleted,
806
807
808
809
810
811
812
		Message: "Deployment was deleted by user. Create a new DGDR to redeploy.",
	})

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

// createDGD creates a DynamoGraphDeployment with the generated spec
813
func (r *DynamoGraphDeploymentRequestReconciler) createDGD(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
814
815
	logger := log.FromContext(ctx)

816
817
818
819
	// Extract DGD spec from annotation (stored by generateDGDSpec)
	dgdSpecYAML, ok := dgdr.Annotations["nvidia.com/generated-dgd-spec"]
	if !ok || dgdSpecYAML == "" {
		return ctrl.Result{}, fmt.Errorf("generated DGD spec not found in annotation nvidia.com/generated-dgd-spec")
820
821
	}

822
823
824
	generatedDGD := &dgdv1alpha1.DynamoGraphDeployment{}
	if err := yaml.Unmarshal([]byte(dgdSpecYAML), generatedDGD); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to unmarshal generated deployment from annotation: %w", err)
825
826
	}

827
	// Determine DGD name and namespace from generated deployment
828
829
830
831
832
833
834
835
836
837
838
	dgdName := generatedDGD.Name
	dgdNamespace := dgdr.Namespace

	// Build labels (start with generated DGD's labels)
	labels := make(map[string]string)
	if generatedDGD.Labels != nil {
		for k, v := range generatedDGD.Labels {
			labels[k] = v
		}
	}
	// Add/override with managed labels
839
840
841
	labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
	labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
	labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
842
843
844
845
846
847
848
849
850
851

	// Build annotations (start with generated DGD's annotations)
	annotations := make(map[string]string)
	if generatedDGD.Annotations != nil {
		for k, v := range generatedDGD.Annotations {
			annotations[k] = v
		}
	}

	// Create DGD from generated deployment
852
	dgd := &dgdv1alpha1.DynamoGraphDeployment{
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
		ObjectMeta: metav1.ObjectMeta{
			Name:        dgdName,
			Namespace:   dgdNamespace,
			Labels:      labels,
			Annotations: annotations,
		},
		Spec: generatedDGD.Spec,
	}

	// Note: We don't set owner reference on DGD
	// If a DGDR is deleted, the DGD may be serving traffic and should persist independently.
	// We use labels (LabelDGDRName) to track the relationship.

	logger.Info("Creating DynamoGraphDeployment", "name", dgdName, "namespace", dgdNamespace)

	if err := r.Create(ctx, dgd); err != nil {
		if apierrors.IsAlreadyExists(err) {
			logger.Info("DGD already exists, updating status")
871
872
873
874
875
			delete(dgdr.Annotations, "nvidia.com/generated-dgd-spec")
			if updateErr := r.Update(ctx, dgdr); updateErr != nil {
				logger.Error(updateErr, "Failed to remove generated-dgd-spec annotation on IsAlreadyExists path")
				return ctrl.Result{}, updateErr
			}
876
			dgdr.Status.DGDName = dgdName
877
878
879
880
881
882
			return ctrl.Result{}, r.Status().Update(ctx, dgdr)
		}
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageDeploymentCreationFailed, err.Error())
		return ctrl.Result{}, err
	}

883
884
885
886
887
888
889
	delete(dgdr.Annotations, "nvidia.com/generated-dgd-spec")
	if err := r.Update(ctx, dgdr); err != nil {
		// Return the error to force a retry. The DGD was created successfully, so a
		// retry will hit the IsAlreadyExists path above and attempt cleanup again.
		return ctrl.Result{}, fmt.Errorf("failed to remove generated-dgd-spec annotation after DGD creation: %w", err)
	}

890
	// Update status
891
	dgdr.Status.DGDName = dgdName
892

893
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentCreated,
894
895
896
		fmt.Sprintf(MessageDeploymentCreated, dgdName))

	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
897
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
898
		Status:  metav1.ConditionFalse,
899
		Reason:  nvidiacomv1beta1.EventReasonDeploymentCreated,
900
901
902
903
904
905
906
907
		Message: fmt.Sprintf("DGD %s created, waiting for Ready", dgdName),
	})

	logger.Info("DynamoGraphDeployment created successfully", "name", dgdName)

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

908
// createAdditionalResources creates ConfigMaps from the profiling output that should be deployed alongside the DGD
909
func (r *DynamoGraphDeploymentRequestReconciler) createAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, targetNamespace string) error {
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
	logger := log.FromContext(ctx)

	// Check if there are additional resources stored in annotations
	if dgdr.Annotations == nil {
		return nil
	}

	resourcesYAML, exists := dgdr.Annotations[AnnotationAdditionalResources]
	if !exists || resourcesYAML == "" {
		return nil
	}

	// Parse using standard Kubernetes YAML decoder
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(resourcesYAML)), 4096)
	resourceCount := 0

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			logger.Error(err, "Failed to decode resource, skipping")
			continue
		}

		if obj.GetKind() == "" {
			continue
		}

		resourceCount++

		// Only support ConfigMap for now (what profiler actually generates)
		if obj.GetKind() != "ConfigMap" {
			logger.Info("Skipping non-ConfigMap resource from profiling output", "kind", obj.GetKind(), "name", obj.GetName())
			continue
		}

		cm := &corev1.ConfigMap{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, cm); err != nil {
			logger.Error(err, "Failed to convert to ConfigMap", "name", obj.GetName())
			continue
		}

		// Override namespace and add tracking labels
		cm.Namespace = targetNamespace
		if cm.Labels == nil {
			cm.Labels = make(map[string]string)
		}
959
960
961
		cm.Labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
		cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
		cm.Labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
962

963
964
965
966
967
968
		// Use SyncResource to create/update the ConfigMap with owner reference and change detection
		_, _, err := commonController.SyncResource(ctx, r, dgdr, func(ctx context.Context) (*corev1.ConfigMap, bool, error) {
			return cm, false, nil
		})
		if err != nil {
			return fmt.Errorf("failed to sync ConfigMap %s: %w", cm.Name, err)
969
		}
970
		logger.Info("Synced ConfigMap from profiling output", "name", cm.Name, "namespace", targetNamespace)
971
972
973
974
975
976
977
978
979
	}

	if resourceCount > 0 {
		logger.Info("Deploying additional resources from profiling output", "count", resourceCount)
	}

	return nil
}

980
981
// handleFailedPhase handles DGDR in Failed phase
func (r *DynamoGraphDeploymentRequestReconciler) handleFailedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
982
	logger := log.FromContext(ctx)
983
	logger.Info("DGDR is in failed phase", "name", dgdr.Name)
984
985
986
987
988

	// Could implement retry logic here if desired
	return ctrl.Result{}, nil
}

989
// getProfilingJobName returns the job name for a DGDR
990
func getProfilingJobName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
991
992
	// Use "profile-" prefix for all profiling jobs
	return fmt.Sprintf("profile-%s", dgdr.Name)
993
994
995
}

// getOutputConfigMapName returns the ConfigMap name for profiling output
996
func getOutputConfigMapName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
997
998
999
	return fmt.Sprintf("%s%s", ConfigMapOutputPrefix, dgdr.Name)
}

1000
1001
1002
// isOnlineProfiling returns true. In v1beta1, the profiler decides online vs AIC
// mode internally based on its config. The controller always uses the same label.
func isOnlineProfiling(_ *nvidiacomv1beta1.DynamoGraphDeploymentRequest) bool {
1003
1004
	return true
}
1005

1006
// validateSpec validates the DGDR spec
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
func (r *DynamoGraphDeploymentRequestReconciler) validateSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	var errs []error

	// Disallow searchStrategy: thorough with backend: auto.
	if dgdr.Spec.SearchStrategy == nvidiacomv1beta1.SearchStrategyThorough &&
		dgdr.Spec.Backend == nvidiacomv1beta1.BackendTypeAuto {
		errs = append(errs, fmt.Errorf(
			"spec.searchStrategy %q is incompatible with spec.backend %q: set spec.backend to a specific backend (sglang, trtllm, or vllm)",
			nvidiacomv1beta1.SearchStrategyThorough,
			nvidiacomv1beta1.BackendTypeAuto,
		))
1018
1019
	}

1020
	// Validate model cache PVC if provided
1021
	if dgdr.Spec.ModelCache != nil && dgdr.Spec.ModelCache.PVCName != "" {
1022
1023
		pvc := &corev1.PersistentVolumeClaim{}
		err := r.Get(ctx, types.NamespacedName{
1024
			Name:      dgdr.Spec.ModelCache.PVCName,
1025
1026
1027
1028
1029
			Namespace: dgdr.Namespace,
		}, pvc)

		if err != nil {
			if apierrors.IsNotFound(err) {
1030
1031
1032
				errs = append(errs, fmt.Errorf(MessageModelCachePVCNotFound, dgdr.Spec.ModelCache.PVCName, dgdr.Namespace))
			} else {
				return err
1033
1034
1035
1036
			}
		}
	}

1037
	if err := r.validateGPUHardwareInfo(ctx, dgdr); err != nil {
1038
		errs = append(errs, err)
1039
1040
	}

1041
	// The profiler will validate the rest of the configuration
1042
	return errors.Join(errs...)
1043
1044
1045
}

// validateGPUHardwareInfo ensures GPU hardware information is available when required for profiling
1046
func (r *DynamoGraphDeploymentRequestReconciler) validateGPUHardwareInfo(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
1047
1048
	logger := log.FromContext(ctx)

1049
1050
1051
	// All four hardware fields set — discovery not needed.
	hw := dgdr.Spec.Hardware
	if hw != nil && hw.GPUSKU != "" && hw.VRAMMB != nil && hw.NumGPUsPerNode != nil && hw.TotalGPUs != nil {
1052
1053
1054
		return nil
	}

1055
1056
1057
	// Try DCGM discovery. In namespace-scoped mode this requires a ClusterRole
	// granting pod list/get (provisioned by the Helm chart when
	// gpuDiscovery.enabled=true).
1058
1059
1060
1061
	_, err := r.GPUDiscovery.DiscoverGPUsFromDCGM(ctx, r.APIReader, r.GPUDiscoveryCache)
	if err == nil {
		return nil
	}
1062

1063
1064
	reason := GetGPUDiscoveryFailureReason(err)
	logger.Info("GPU discovery not available", "reason", reason, "error", err.Error())
1065
	return fmt.Errorf("GPU hardware info required but auto-discovery failed. Add spec.hardware.gpuSku, spec.hardware.vramMb, spec.hardware.numGpusPerNode, spec.hardware.totalGpus")
1066
1067
}

1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
// GetGPUDiscoveryFailureReason classifies a GPU discovery error and
// returns a stable, actionable reason string suitable for structured logging.
//
// The classification is based on known error message patterns produced during:
//   - DCGM exporter pod discovery
//   - Helm-based GPU operator and DCGM discovery
//   - Metrics scraping
//   - Prometheus parsing
//
// Matching is case-insensitive: the error message is lowercased once and
// compared against lowercase patterns. Cases are evaluated top to bottom and
// the first match wins.
//
// If err is nil or does not match any known category, "unknown" is returned.
func GetGPUDiscoveryFailureReason(err error) string {
	if err == nil {
		return "unknown"
	}
	// Every pattern below must be written in lowercase, otherwise it can never
	// match the lowercased message.
	errMsg := strings.ToLower(err.Error())

	switch {
	case strings.Contains(errMsg, "list pods"):
		return "failed to list DCGM exporter pods (RBAC/cluster connectivity issue)"
	case strings.Contains(errMsg, "gpu operator is not installed"):
		return "GPU Operator not installed in expected namespace"
	case strings.Contains(errMsg, "helm init failed"):
		return "failed to initialize Helm client (RBAC, kubeconfig, or Helm driver issue)"
	case strings.Contains(errMsg, "timeout waiting for dcgm exporter pods"):
		return "timeout while waiting for DCGM exporter pods to become ready"
	case strings.Contains(errMsg, "http get"):
		return "failed to reach DCGM metrics endpoint on pod (network/port issue)"
	case strings.Contains(errMsg, "metrics endpoint") &&
		strings.Contains(errMsg, "status"):
		return "DCGM pod metrics endpoint returned non-200 status"
	case strings.Contains(errMsg, "parse prometheus metrics"):
		return "failed to parse dcgm Prometheus metrics (invalid format)"
	case strings.Contains(errMsg, "no gpus detected"):
		return "no GPUs detected in dcgm metrics (GPU model or metrics missing)"
	case strings.Contains(errMsg, "dcgm is not enabled in the gpu operator"):
		return "DCGM is not enabled in the GPU Operator (check GPU Operator configuration and permissions)"
	case strings.Contains(errMsg, "failed to scrape any dcgm exporter pod"):
		return "failed to scrape any dcgm exporter pod (check DCGM exporter pod status and network connectivity)"
	case strings.Contains(errMsg, "no gpu metrics could be parsed from any dcgm pod"):
		return "no GPU metrics could be parsed from any DCGM pod (check DCGM exporter pod status and network connectivity)"
	case strings.Contains(errMsg, "failed to create helm path"):
		return "failed to initialize Helm client (RBAC, kubeconfig, or Helm driver issue)"
	}
	return "unknown"
}

1114
// createProfilingJob creates a Kubernetes Job for profiling using SyncResource
1115
func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
1116
1117
	logger := log.FromContext(ctx)

1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
	// Delete any existing output ConfigMap to ensure fresh profiling results
	// This prevents using stale data from previous profiling runs
	outputConfigMapName := getOutputConfigMapName(dgdr)
	existingCM := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, existingCM)
	if err == nil {
		// ConfigMap exists, delete it
		logger.Info("Deleting existing output ConfigMap to ensure fresh profiling results", "configMap", outputConfigMapName)
		if err := r.Delete(ctx, existingCM); err != nil && !apierrors.IsNotFound(err) {
			logger.Error(err, "Failed to delete existing output ConfigMap", "configMap", outputConfigMapName)
			return fmt.Errorf("failed to delete existing output ConfigMap: %w", err)
		}
		logger.Info("Successfully deleted old output ConfigMap", "configMap", outputConfigMapName)
	} else if !apierrors.IsNotFound(err) {
		// Unexpected error checking for ConfigMap
		logger.Error(err, "Failed to check for existing output ConfigMap", "configMap", outputConfigMapName)
		return fmt.Errorf("failed to check for existing output ConfigMap: %w", err)
	}

	// Ensure profiling job RBAC exists (only for cluster-wide installation)
1141
	if r.Config.Namespace.Restricted == "" {
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
		if err := r.RBACManager.EnsureServiceAccountWithRBAC(
			ctx,
			dgdr.Namespace,
			ServiceAccountProfilingJob,
			r.Config.RBAC.DGDRProfilingClusterRoleName,
		); err != nil {
			logger.Error(err, "Failed to ensure profiling job RBAC")
			return fmt.Errorf("failed to ensure profiling job RBAC: %w", err)
		}
	}

1153
	// Enrich hardware from GPU discovery before marshalling the spec.
1154
	// This fills in any missing hardware fields (gpuSku, vramMb, numGpusPerNode, totalGpus).
1155
	if err := r.enrichHardwareFromDiscovery(ctx, dgdr); err != nil {
1156
		return err
1157
1158
	}

1159
1160
1161
1162
1163
	// Use SyncResource to create/update the job
	modified, job, err := commonController.SyncResource(ctx, r, dgdr, func(ctx context.Context) (*batchv1.Job, bool, error) {
		jobName := getProfilingJobName(dgdr)
		outputConfigMapName := getOutputConfigMapName(dgdr)

1164
1165
		// Marshal the DGDR spec to JSON — the profiler receives the spec verbatim
		specJSON, err := marshalDGDRSpec(dgdr)
1166
		if err != nil {
1167
			return nil, false, err
1168
		}
1169
1170

		// Common environment variables
1171
		profilerEnv := []corev1.EnvVar{
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
			{
				Name: "HUGGING_FACE_HUB_TOKEN",
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: "hf-token-secret",
						},
						Key: "HF_TOKEN",
					},
				},
			},
			{
				Name:  "NATS_SERVER",
				Value: fmt.Sprintf("nats://%s-nats:4222", dgdr.Namespace),
			},
			{
				Name:  "ETCD_ENDPOINTS",
				Value: fmt.Sprintf("%s-etcd:2379", dgdr.Namespace),
			},
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
			// DGDR metadata for setting ownerReferences
			{
				Name:  "DGDR_NAME",
				Value: dgdr.Name,
			},
			{
				Name:  "DGDR_NAMESPACE",
				Value: dgdr.Namespace,
			},
			{
				Name:  "DGDR_UID",
				Value: string(dgdr.UID),
			},
1204
1205
		}

1206
		// Build volume mounts
1207
1208
1209
1210
1211
1212
1213
		volumeMounts := []corev1.VolumeMount{
			{
				Name:      VolumeNameProfilingOutput,
				MountPath: ProfilingOutputPath,
			},
		}

1214
		// Add model cache PVC mount if configured
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
		modelCachePVC, modelCacheMountPath := extractModelCachePVCConfig(dgdr)
		if modelCachePVC != "" {
			logger.Info("Mounting model cache PVC to profiler pod", "pvc", modelCachePVC, "mountPath", modelCacheMountPath)
			volumeMounts = append(volumeMounts, corev1.VolumeMount{
				Name:      VolumeNameModelCache,
				MountPath: modelCacheMountPath,
				ReadOnly:  true,
			})
		}

1225
1226
1227
1228
1229
1230
1231
1232
		// v1alpha1 round-trip: mount ConfigMap if referenced via annotation
		cmRef := configMapRefFromAnnotation(dgdr)
		if cmRef != nil {
			volumeMounts = append(volumeMounts, corev1.VolumeMount{
				Name:      VolumeNameProfilingConfig,
				MountPath: ProfilingConfigMountPath,
				ReadOnly:  true,
			})
1233
1234
		}

1235
		// Profiler args: pass the DGDR spec as JSON via --config
1236
1237
		// --output-dir must match ProfilingOutputPath so the sidecar can find profiler_status.yaml
		profilerArgs := []string{"--config", specJSON, "--output-dir", ProfilingOutputPath}
1238

1239
1240
		// Use image from spec; the defaulting webhook fills this in for production builds.
		// Guard against empty image in case the webhook didn't run (e.g. local dev builds).
1241
1242
1243
1244
1245
1246
1247
		//
		// Starting with Dynamo 1.1.0, the profiler's runtime dependencies
		// (kubernetes_asyncio, pmdarima, prophet, aiconfigurator, ...) live in the
		// dedicated dynamo-planner image, not in backend runtime or frontend images.
		// Users on 1.1.0+ must set spec.image to a planner image
		// (e.g. nvcr.io/nvidia/ai-dynamo/dynamo-planner:<version>); earlier versions
		// can continue using the frontend/backend image they were using before.
1248
		imageName := dgdr.Spec.Image
1249
1250
1251
		if imageName == "" {
			return nil, false, fmt.Errorf("spec.image is required but not set; ensure the defaulting webhook ran or set spec.image explicitly")
		}
1252
1253
		logger.Info("Using profiler image", "image", imageName)

1254
		profilerContainer := corev1.Container{
1255
1256
			Name:         ContainerNameProfiler,
			Image:        imageName,
1257
			Command:      []string{"python", "-m", "dynamo.profiler"},
1258
			Args:         profilerArgs,
1259
1260
			Env:          profilerEnv,
			VolumeMounts: volumeMounts,
1261
			WorkingDir:   "/workspace",
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
		}

		// Generate sidecar script from template
		tmpl, err := template.New("sidecar").Parse(sidecarScriptTemplate)
		if err != nil {
			return nil, false, fmt.Errorf("failed to parse sidecar script template: %w", err)
		}

		var scriptBuf bytes.Buffer
		err = tmpl.Execute(&scriptBuf, map[string]string{
1272
1273
1274
1275
1276
			"OutputPath":    ProfilingOutputPath,
			"OutputFile":    ProfilingOutputFile,
			"ConfigMapName": outputConfigMapName,
			"Namespace":     dgdr.Namespace,
			"DGDRName":      dgdr.Name,
1277
			"DGDRuid":       string(dgdr.UID),
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
		})
		if err != nil {
			return nil, false, fmt.Errorf("failed to execute sidecar script template: %w", err)
		}

		sidecarContainer := corev1.Container{
			Name:    ContainerNameOutputCopier,
			Image:   SidecarImage,
			Command: []string{"/bin/sh", "-c"},
			Args:    []string{scriptBuf.String()},
			VolumeMounts: []corev1.VolumeMount{{
				Name:      VolumeNameProfilingOutput,
				MountPath: ProfilingOutputPath,
				ReadOnly:  true,
			}},
		}

1295
1296
		// Use PVC for profiling output if round-tripped v1alpha1 annotation is present,
		// otherwise use emptyDir (v1beta1 default).
1297
		var profilingOutputVolume corev1.Volume
1298
1299
		if outputPVC := outputPVCFromAnnotation(dgdr); outputPVC != "" {
			logger.Info("Using PVC for profiling output (from v1alpha1 annotation)", "pvc", outputPVC)
1300
1301
1302
1303
			profilingOutputVolume = corev1.Volume{
				Name: VolumeNameProfilingOutput,
				VolumeSource: corev1.VolumeSource{
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1304
						ClaimName: outputPVC,
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
					},
				},
			}
		} else {
			profilingOutputVolume = corev1.Volume{
				Name: VolumeNameProfilingOutput,
				VolumeSource: corev1.VolumeSource{
					EmptyDir: &corev1.EmptyDirVolumeSource{},
				},
			}
		}
		volumes := []corev1.Volume{profilingOutputVolume}
1317

1318
1319
		// Add model cache PVC volume if configured
		if modelCachePVC != "" {
1320
			volumes = append(volumes, corev1.Volume{
1321
				Name: VolumeNameModelCache,
1322
				VolumeSource: corev1.VolumeSource{
1323
1324
1325
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
						ClaimName: modelCachePVC,
						ReadOnly:  true,
1326
1327
1328
1329
1330
					},
				},
			})
		}

1331
1332
1333
1334
1335
1336
		// v1alpha1 round-trip: add ConfigMap volume if referenced via annotation
		if cmRef != nil {
			cmKey := cmRef.Key
			if cmKey == "" {
				cmKey = ProfilingConfigDefaultKey
			}
1337
			volumes = append(volumes, corev1.Volume{
1338
				Name: VolumeNameProfilingConfig,
1339
				VolumeSource: corev1.VolumeSource{
1340
1341
1342
1343
1344
1345
1346
1347
					ConfigMap: &corev1.ConfigMapVolumeSource{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: cmRef.Name,
						},
						Items: []corev1.KeyToPath{{
							Key:  cmKey,
							Path: ProfilingConfigDefaultKey,
						}},
1348
1349
1350
1351
1352
					},
				},
			})
		}

1353
1354
1355
		// Limit retries to prevent infinite loop
		backoffLimit := int32(3)

1356
1357
1358
1359
		podSpec := corev1.PodSpec{
			ServiceAccountName: ServiceAccountProfilingJob,
			RestartPolicy:      corev1.RestartPolicyNever,
			SecurityContext: &corev1.PodSecurityContext{
1360
1361
1362
1363
				RunAsNonRoot: ptr.To(true),
				RunAsUser:    ptr.To[int64](1000),
				RunAsGroup:   ptr.To[int64](1000),
				FSGroup:      ptr.To[int64](1000),
1364
1365
1366
1367
1368
1369
1370
1371
			},
			Containers: []corev1.Container{profilerContainer, sidecarContainer},
			Volumes:    volumes,
			ImagePullSecrets: []corev1.LocalObjectReference{
				{Name: "nvcr-imagepullsecret"},
			},
		}

1372
1373
1374
1375
1376
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Name:      jobName,
				Namespace: dgdr.Namespace,
				Labels: map[string]string{
1377
1378
1379
					nvidiacomv1beta1.LabelApp:       nvidiacomv1beta1.LabelValueDynamoProfiler,
					nvidiacomv1beta1.LabelDGDR:      dgdr.Name,
					nvidiacomv1beta1.LabelManagedBy: nvidiacomv1beta1.LabelValueDynamoOperator,
1380
1381
1382
1383
1384
				},
			},
			Spec: batchv1.JobSpec{
				BackoffLimit: &backoffLimit,
				Template: corev1.PodTemplateSpec{
1385
					Spec: podSpec,
1386
1387
1388
1389
				},
			},
		}

1390
1391
1392
1393
1394
		var jobOverrides *batchv1.JobSpec
		if dgdr.Spec.Overrides != nil {
			jobOverrides = dgdr.Spec.Overrides.ProfilingJob
		}
		applyProfilingJobOverrides(job, jobOverrides)
1395

1396
1397
1398
1399
1400
1401
1402
1403
		return job, false, nil
	})

	if err != nil {
		return err
	}

	if modified {
1404
		logger.Info("Profiling job created/updated", "job", job.Name)
1405
1406
	}

1407
1408
1409
	// Store the job name in status for observability
	dgdr.Status.ProfilingJobName = job.Name

1410
1411
1412
	return nil
}

1413
1414
1415
1416
1417
1418
// marshalDGDRSpec produces the JSON string passed to the profiler via --config.
// The profiler receives the DGDR spec verbatim — no bespoke key mapping needed.
func marshalDGDRSpec(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, error) {
	specJSON, err := json.Marshal(dgdr.Spec)
	if err != nil {
		return "", fmt.Errorf("failed to marshal DGDR spec to JSON: %w", err)
1419
	}
1420
1421
	return string(specJSON), nil
}
1422

1423
1424
1425
1426
1427
1428
1429
// enrichHardwareFromDiscovery fills in hardware fields that the user didn't set.
// Called before marshalDGDRSpec(). Mutates dgdr.Spec.Hardware in-place (memory only, not persisted).
func (r *DynamoGraphDeploymentRequestReconciler) enrichHardwareFromDiscovery(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	if dgdr.Spec.Hardware == nil {
		dgdr.Spec.Hardware = &nvidiacomv1beta1.HardwareSpec{}
	}
	hw := dgdr.Spec.Hardware
1430

1431
1432
1433
	// All fields already provided — nothing to discover.
	if hw.GPUSKU != "" && hw.VRAMMB != nil && hw.NumGPUsPerNode != nil && hw.TotalGPUs != nil {
		return nil
1434
1435
	}

1436
	// Run discovery to fill in any fields the user didn't set.
1437
	logger := log.FromContext(ctx)
1438
1439
1440
1441
1442
1443
	logger.Info("Attempting GPU discovery for profiling job")
	gpuInfo, err := r.GPUDiscovery.DiscoverGPUsFromDCGMFiltered(ctx, r.APIReader, r.GPUDiscoveryCache, hw.GPUSKU)
	if err != nil {
		reason := GetGPUDiscoveryFailureReason(err)
		logger.Info("GPU discovery not available", "reason", reason, "error", err.Error())
		return fmt.Errorf("GPU hardware info required but auto-discovery failed. Add spec.hardware.gpuSku, spec.hardware.vramMb, spec.hardware.numGpusPerNode, spec.hardware.totalGpus")
1444
	}
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454

	logger.Info("GPU discovery completed successfully",
		"gpusPerNode", gpuInfo.GPUsPerNode,
		"nodesWithGPUs", gpuInfo.NodesWithGPUs,
		"totalGpus", gpuInfo.GPUsPerNode*gpuInfo.NodesWithGPUs,
		"model", gpuInfo.Model,
		"vramMiB", gpuInfo.VRAMPerGPU,
		"system", gpuInfo.System,
		"cloudprovider", gpuInfo.CloudProvider)

1455
	if hw.GPUSKU == "" {
1456
1457
1458
		inferred := gpu.InferHardwareSystem(gpuInfo.Model)
		switch {
		case gpuInfo.System != "":
1459
			hw.GPUSKU = gpuInfo.System
1460
1461
1462
		case inferred != "":
			hw.GPUSKU = inferred
		default:
1463
1464
			hw.GPUSKU = nvidiacomv1beta1.GPUSKUType(gpuInfo.Model)
		}
1465
1466
1467
1468
	}
	if hw.VRAMMB == nil {
		vram := float64(gpuInfo.VRAMPerGPU)
		hw.VRAMMB = &vram
1469
	}
1470
1471
1472
1473
	if hw.NumGPUsPerNode == nil {
		n := int32(gpuInfo.GPUsPerNode)
		hw.NumGPUsPerNode = &n
	}
hhzhang16's avatar
hhzhang16 committed
1474
	if hw.TotalGPUs == nil {
1475
1476
1477
		// TODO: This is a temporary limit to prevent the profiler from using too many GPUs.
		// Will be removed once a fix is in the Profiler/AIC.
		const defaultMaxAutoGPUs = int32(32)
hhzhang16's avatar
hhzhang16 committed
1478
		total := int32(gpuInfo.GPUsPerNode * gpuInfo.NodesWithGPUs)
1479
1480
1481
1482
1483
		if total > defaultMaxAutoGPUs {
			logger.Info("Capping auto-discovered TotalGPUs at default limit; set hardware.totalGpus to override",
				"discovered", total, "cap", defaultMaxAutoGPUs)
			total = defaultMaxAutoGPUs
		}
hhzhang16's avatar
hhzhang16 committed
1484
1485
		hw.TotalGPUs = &total
	}
1486
1487
	return nil
}
1488

1489
1490
1491
1492
// extractModelCachePVCConfig reads model cache PVC settings from the typed v1beta1 spec.
// Returns (pvcName, mountPath) — both empty if no model cache or no PVC name is
// configured. When a PVC is configured without a mount path,
// DefaultModelCacheMountPath is used.
func extractModelCachePVCConfig(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, string) {
	cache := dgdr.Spec.ModelCache
	if cache == nil || cache.PVCName == "" {
		return "", ""
	}
	if cache.PVCMountPath != "" {
		return cache.PVCName, cache.PVCMountPath
	}
	return cache.PVCName, DefaultModelCacheMountPath
}

// configMapKeySelector mirrors v1alpha1.ConfigMapKeySelector for annotation deserialization.
// It is the JSON decode target for the value of the AnnotationConfigMapRef annotation.
type configMapKeySelector struct {
	// Name is read from the "name" JSON field.
	Name string `json:"name"`
	// Key is read from the "key" JSON field, which may be omitted.
	Key  string `json:"key,omitempty"`
}

// configMapRefFromAnnotation decodes the ConfigMap reference stored in the
// round-trip annotation (AnnotationConfigMapRef).
// It returns nil when the annotation is absent or empty (native v1beta1
// resources carry no such annotation) and also when its value is not valid JSON.
func configMapRefFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) *configMapKeySelector {
	// Indexing a nil map yields the zero value, so a missing annotations map,
	// a missing key and an empty value are all covered by this one check.
	encoded := dgdr.Annotations[AnnotationConfigMapRef]
	if encoded == "" {
		return nil
	}

	selector := new(configMapKeySelector)
	if json.Unmarshal([]byte(encoded), selector) != nil {
		return nil
	}
	return selector
}
1524

1525
1526
1527
1528
1529
1530
1531
// outputPVCFromAnnotation reads the output PVC name from the round-trip
// annotation (AnnotationOutputPVC).
// Returns "" when the annotation is not present, as is the case for native
// v1beta1 resources (always emptyDir).
func outputPVCFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
	// Reading from a nil map is safe in Go and reports the key as missing.
	if pvcName, ok := dgdr.Annotations[AnnotationOutputPVC]; ok {
		return pvcName
	}
	return ""
}

1534
// checkProfilingJobStatus checks if the profiling job has completed
1535
func (r *DynamoGraphDeploymentRequestReconciler) checkProfilingJobStatus(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (bool, error) {
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
	logger := log.FromContext(ctx)
	jobName := getProfilingJobName(dgdr)

	job := &batchv1.Job{}
	if err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: dgdr.Namespace}, job); err != nil {
		return false, err
	}

	// Check job conditions
	for _, condition := range job.Status.Conditions {
		if condition.Type == batchv1.JobComplete && condition.Status == corev1.ConditionTrue {
			logger.Info("Profiling job completed", "job", jobName)
			return true, nil
		}
		if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue {
1551
1552
1553
1554
1555
			// Get detailed error from pod logs
			detailedError := r.getProfilingJobErrorDetails(ctx, dgdr, job)
			if detailedError != "" {
				return false, fmt.Errorf("profiling job failed: %s. Details: %s", condition.Message, detailedError)
			}
1556
1557
1558
1559
1560
1561
1562
			return false, fmt.Errorf("profiling job failed: %s", condition.Message)
		}
	}

	return false, nil
}

1563
// getProfilingJobErrorDetails retrieves detailed error information from failed profiling job pods
1564
func (r *DynamoGraphDeploymentRequestReconciler) getProfilingJobErrorDetails(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, job *batchv1.Job) string {
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
	logger := log.FromContext(ctx)

	// List pods owned by this job
	podList := &corev1.PodList{}
	labelSelector := client.MatchingLabels{
		"job-name": job.Name,
	}

	if err := r.List(ctx, podList, client.InNamespace(dgdr.Namespace), labelSelector); err != nil {
		logger.Error(err, "Failed to list pods for profiling job")
		return ""
	}

	// Look for failed pods and extract error details
	for _, pod := range podList.Items {
		// Check pod phase and container statuses
		if pod.Status.Phase == corev1.PodFailed {
			// Get profiler container status (first container)
			for _, containerStatus := range pod.Status.ContainerStatuses {
				if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Terminated != nil {
					terminated := containerStatus.State.Terminated
					// Construct detailed error message
					errorMsg := fmt.Sprintf("Pod: %s, Container: %s, ExitCode: %d, Reason: %s",
						pod.Name, containerStatus.Name, terminated.ExitCode, terminated.Reason)
					if terminated.Message != "" {
						errorMsg += fmt.Sprintf(", Message: %s", terminated.Message)
					}
					logger.Info("Retrieved profiling job error details", "error", errorMsg)
					return errorMsg
				}
			}

			// If no terminated state found, check waiting state
			for _, containerStatus := range pod.Status.ContainerStatuses {
				if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Waiting != nil {
					waiting := containerStatus.State.Waiting
					errorMsg := fmt.Sprintf("Pod: %s, Container: %s, Waiting - Reason: %s, Message: %s",
						pod.Name, containerStatus.Name, waiting.Reason, waiting.Message)
					logger.Info("Retrieved profiling job waiting details", "error", errorMsg)
					return errorMsg
				}
			}
		}
	}

	return ""
}

1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
// computeDGDName picks the Kubernetes name for the DGD owned by a DGDR.
//
// An explicit name supplied through spec.overrides.dgd.metadata.name wins and is
// returned unchanged. In every other case (no override, unparsable override, or
// empty name) the result is the DGDR's own name with a "-dgd" suffix. Deriving
// the name from the DGDR keeps it unique even when two DGDRs have identical
// specs, which would otherwise both yield the same profiler-generated name
// (e.g. "vllm-agg").
func computeDGDName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
	fallback := dgdr.Name + "-dgd"

	overrides := dgdr.Spec.Overrides
	if overrides == nil || overrides.DGD == nil || len(overrides.DGD.Raw) == 0 {
		return fallback
	}

	// Decode only the metadata.name field of the raw override document.
	var override struct {
		Metadata struct {
			Name string `json:"name"`
		} `json:"metadata"`
	}
	if err := json.Unmarshal(overrides.DGD.Raw, &override); err != nil || override.Metadata.Name == "" {
		return fallback
	}
	return override.Metadata.Name
}

1632
1633
1634
1635
// generateDGDSpec reads profiling output from the sidecar ConfigMap, extracts the
// DynamoGraphDeployment spec and pareto configs, stores the spec in an annotation via
// r.Update, and returns the ProfilingResultsStatus and DGD name.
//
// Steps:
//  1. Fetch the output ConfigMap (getOutputConfigMapName) and read ProfilingOutputFile.
//  2. Parse the DGD and any additional resources from the (possibly multi-document) YAML.
//  3. Replace the profiler-generated DGD name with the one from computeDGDName.
//  4. Persist additional resources (if any) in an annotation, then refetch the DGDR.
//  5. Populate Pareto configs from "webui_data.json" when present; parse failures
//     there are logged and do not fail the call.
//  6. Store the DGD as JSON in SelectedConfig and as YAML in the
//     "nvidia.com/generated-dgd-spec" annotation, then update the DGDR.
//
// On any error the returned results and name are nil/empty.
func (r *DynamoGraphDeploymentRequestReconciler) generateDGDSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (*nvidiacomv1beta1.ProfilingResultsStatus, string, error) {
	logger := log.FromContext(ctx)
	logger.Info("Generating DGD spec from profiling results", "name", dgdr.Name, "backend", dgdr.Spec.Backend)

	// Read the generated spec from ConfigMap (created by sidecar)
	outputConfigMapName := getOutputConfigMapName(dgdr)
	cm := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, cm)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil, "", fmt.Errorf("output ConfigMap %s not found - profiling may not have completed yet", outputConfigMapName)
		}
		return nil, "", fmt.Errorf("failed to get output ConfigMap: %w", err)
	}

	// Profiler writes the selected config (real or mocker) to a single output file
	outputFile := ProfilingOutputFile

	// Get YAML content from ConfigMap
	yamlContent, exists := cm.Data[outputFile]
	if !exists {
		return nil, "", fmt.Errorf("key %s not found in ConfigMap %s", outputFile, outputConfigMapName)
	}

	logger.Info("Found profiling output in ConfigMap", "configMap", outputConfigMapName, "outputFile", outputFile, "size", len(yamlContent))

	// Extract DGD and any supporting resources from potentially multi-document YAML (ConfigMap + DGD)
	dgd, additionalResources, err := r.extractResourcesFromYAML([]byte(yamlContent))
	if err != nil {
		return nil, "", fmt.Errorf("failed to extract DGD from %s: %w", outputFile, err)
	}

	// Override the profiler-generated name with a DGDR-scoped unique name.
	// The profiler emits a static topology-derived name (e.g. "vllm-agg") which
	// collides when multiple DGDRs share identical specs. Derive the name from
	// DGDR identity instead, respecting an explicit override if the user set one.
	// The profiler's name is captured first so the log line below reports it
	// accurately alongside the name actually used.
	profilerDGDName := dgd.Name
	dgd.Name = computeDGDName(dgdr)

	logger.Info("Parsed profiling output", "profilerDGDName", profilerDGDName, "dgdName", dgd.Name, "additionalResources", len(additionalResources))

	if len(additionalResources) > 0 {
		if err := r.storeAdditionalResources(ctx, dgdr, additionalResources); err != nil {
			logger.Error(err, "Failed to store additional resources")
			return nil, "", err
		}
		// storeAdditionalResources calls r.Update internally, bumping resourceVersion.
		// Refetch so the subsequent r.Update for the spec annotation doesn't 409.
		if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
			return nil, "", fmt.Errorf("failed to refetch DGDR after storing additional resources: %w", err)
		}
	}

	profilingResults := &nvidiacomv1beta1.ProfilingResultsStatus{}
	if webUIData, ok := cm.Data["webui_data.json"]; ok {
		pareto, err := extractParetoFromWebUIData([]byte(webUIData))
		if err != nil {
			// Pareto data is optional: log and continue without it.
			logger.Error(err, "Failed to parse webui_data.json; skipping pareto population")
		} else {
			profilingResults.Pareto = pareto
			logger.Info("Populated ProfilingResults.Pareto", "count", len(pareto))
		}
	}

	// Store the generated DGD in ProfilingResults.SelectedConfig
	dgdJSON, err := json.Marshal(dgd)
	if err != nil {
		return nil, "", fmt.Errorf("failed to marshal generated DGD to JSON: %w", err)
	}
	profilingResults.SelectedConfig = &runtime.RawExtension{Raw: dgdJSON}

	// Serialize the DGD spec to an annotation so createDGD can retrieve it
	dgdBytes, err := sigsyaml.Marshal(dgd)
	if err != nil {
		return nil, "", fmt.Errorf("failed to marshal generated DGD: %w", err)
	}
	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations["nvidia.com/generated-dgd-spec"] = string(dgdBytes)

	if err := r.Update(ctx, dgdr); err != nil {
		return nil, "", fmt.Errorf("failed to update DGDR with generated DGD annotation: %w", err)
	}
	return profilingResults, dgd.Name, nil
}

// extractParetoFromWebUIData parses webui_data.json and returns all Pareto-optimal
// deployment configurations from the cost table. Each row's last column ("Action")
// is a partial DynamoGraphDeployment YAML snippet.
//
// Rows are read from cost.table.data. A row is skipped (not an error) when it
// has fewer than seven columns, when its action cell is not a JSON string, when
// the snippet is not valid YAML, or when the snippet decodes to an empty object.
// It returns (nil, nil) when the table has no rows, and an error only when the
// document as a whole is not valid JSON.
func extractParetoFromWebUIData(data []byte) ([]nvidiacomv1beta1.ParetoConfig, error) {
	var webUI struct {
		Cost struct {
			Table struct {
				Data [][]json.RawMessage `json:"data"`
			} `json:"table"`
		} `json:"cost"`
	}
	if err := json.Unmarshal(data, &webUI); err != nil {
		return nil, fmt.Errorf("failed to unmarshal webui_data.json: %w", err)
	}

	rows := webUI.Cost.Table.Data
	if len(rows) == 0 {
		return nil, nil
	}

	// Schema: [TTFT(ms), PrefillThpt, ITL(ms), DecodeThpt, TokensPerUser, GPUHours, ActionYAML]
	const (
		minColumns        = 7
		actionColumnIndex = 6
	)

	// decodeAction converts one action cell (a JSON string holding YAML) into
	// JSON bytes; ok is false when the cell should be skipped.
	decodeAction := func(cell json.RawMessage) (encoded []byte, ok bool) {
		var snippet string
		if err := json.Unmarshal(cell, &snippet); err != nil {
			return nil, false
		}

		// The profiler prefixes snippets with comment lines; drop them before parsing.
		var config map[string]any
		if err := sigsyaml.Unmarshal([]byte(stripYAMLComments(snippet)), &config); err != nil {
			return nil, false
		}
		if len(config) == 0 {
			return nil, false
		}

		encoded, err := json.Marshal(config)
		if err != nil {
			return nil, false
		}
		return encoded, true
	}

	pareto := make([]nvidiacomv1beta1.ParetoConfig, 0, len(rows))
	for _, row := range rows {
		if len(row) < minColumns {
			continue
		}
		configJSON, ok := decodeAction(row[actionColumnIndex])
		if !ok {
			continue
		}
		pareto = append(pareto, nvidiacomv1beta1.ParetoConfig{
			Config: runtime.RawExtension{Raw: configJSON},
		})
	}

	return pareto, nil
}

// stripYAMLComments removes comment lines (lines whose first character after
// leading spaces and tabs is '#') from a YAML string. The profiler prefixes
// action snippets with comment lines.
//
// Only whole-line comments are removed; a '#' that follows other content on a
// line is left untouched. Remaining lines are rejoined with "\n", so the result
// is "" when every line was a comment.
func stripYAMLComments(s string) string {
	lines := strings.Split(s, "\n")
	kept := make([]string, 0, len(lines))
	for _, line := range lines {
		if strings.HasPrefix(strings.TrimLeft(line, " \t"), "#") {
			continue
		}
		kept = append(kept, line)
	}
	return strings.Join(kept, "\n")
}

1796
1797
// storeAdditionalResources marshals additional resources to YAML and stores them in DGDR annotations.
// Validates annotation size and fails gracefully if too large.
1798
func (r *DynamoGraphDeploymentRequestReconciler) storeAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, resources []*unstructured.Unstructured) error {
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
	if len(resources) == 0 {
		return nil
	}

	var resourcesYAML []byte

	for i, res := range resources {
		resYAML, err := sigsyaml.Marshal(res.Object)
		if err != nil {
			return fmt.Errorf("failed to marshal resource %s/%s: %w", res.GetKind(), res.GetName(), err)
		}
		if i > 0 {
			resourcesYAML = append(resourcesYAML, []byte("\n---\n")...)
		}
		resourcesYAML = append(resourcesYAML, resYAML...)
	}

	// Validate size before storing
	if len(resourcesYAML) > MaxAnnotationSize {
		return fmt.Errorf("additional resources YAML size (%d bytes) exceeds maximum annotation size (%d bytes); "+
			"consider reducing the number of resources or storing them separately",
			len(resourcesYAML), MaxAnnotationSize)
	}

	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations[AnnotationAdditionalResources] = string(resourcesYAML)

	return r.Update(ctx, dgdr)
}

// extractResourcesFromYAML parses multi-document YAML from profiling output,
// extracting the DynamoGraphDeployment and any ConfigMaps that should be deployed with it.
1833
func (r *DynamoGraphDeploymentRequestReconciler) extractResourcesFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, []*unstructured.Unstructured, error) {
1834
1835
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(yamlContent), 4096)

1836
	var dgd *dgdv1alpha1.DynamoGraphDeployment
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
	var additionalResources []*unstructured.Unstructured

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			// Skip invalid documents and continue
			continue
		}

		// Skip empty objects
		if obj.GetKind() == "" {
			continue
		}

		if obj.GetKind() == "DynamoGraphDeployment" {
1855
			dgd = &dgdv1alpha1.DynamoGraphDeployment{}
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, dgd); err != nil {
				return nil, nil, fmt.Errorf("failed to convert to DynamoGraphDeployment: %w", err)
			}
		} else {
			// Store ConfigMaps or other resources for deployment
			additionalResources = append(additionalResources, obj)
		}
	}

	if dgd == nil {
		return nil, nil, fmt.Errorf("no DynamoGraphDeployment found in YAML content")
	}

	return dgd, additionalResources, nil
}

// extractDGDFromYAML is a convenience wrapper that extracts only the DGD (used by tests)
1873
func (r *DynamoGraphDeploymentRequestReconciler) extractDGDFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, error) {
1874
1875
1876
1877
	dgd, _, err := r.extractResourcesFromYAML(yamlContent)
	return dgd, err
}

1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
// updateDeploymentInfo refreshes status.deploymentInfo on the DGDR with replica
// counts summed across all services in the DGD status.
//
// Services that do not report AvailableReplicas contribute nothing to the
// available total. It returns true when the status was rewritten and false when
// the stored counts already match the freshly computed ones.
func updateDeploymentInfo(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, dgd *dgdv1alpha1.DynamoGraphDeployment) bool {
	var replicas, available int32
	for _, service := range dgd.Status.Services {
		replicas += service.Replicas
		if service.AvailableReplicas == nil {
			continue
		}
		available += *service.AvailableReplicas
	}

	// Nothing to do when both stored counters exist and are already up to date.
	if info := dgdr.Status.DeploymentInfo; info != nil && info.Replicas != nil && info.AvailableReplicas != nil {
		if *info.Replicas == replicas && *info.AvailableReplicas == available {
			return false
		}
	}

	dgdr.Status.DeploymentInfo = &nvidiacomv1beta1.DeploymentInfoStatus{
		Replicas:          &replicas,
		AvailableReplicas: &available,
	}
	return true
}

// setSucceededCondition records the aggregate Succeeded condition that
// corresponds to the given phase on dgdr.Status.Conditions.
//
// The condition is True only for the Ready and Deployed phases; every other
// phase, including the empty and unrecognized ones, maps to False with a
// phase-specific reason and message.
func setSucceededCondition(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase) {
	// Start from the common case (not yet succeeded) and let the phase refine it.
	cond := metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
		Status:             metav1.ConditionFalse,
		ObservedGeneration: dgdr.Generation,
	}

	switch phase {
	case nvidiacomv1beta1.DGDRPhasePending, "":
		cond.Reason, cond.Message = "Pending", "DGDR is pending"
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		cond.Reason, cond.Message = "Profiling", "Profiling is in progress"
	case nvidiacomv1beta1.DGDRPhaseReady:
		cond.Status = metav1.ConditionTrue
		cond.Reason, cond.Message = "SpecGenerated", "Profiling complete, spec available"
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		cond.Reason, cond.Message = "Deploying", "Deployment is in progress"
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		cond.Status = metav1.ConditionTrue
		cond.Reason, cond.Message = "Deployed", "Deployment is healthy"
	case nvidiacomv1beta1.DGDRPhaseFailed:
		cond.Reason, cond.Message = "Failed", "DGDR has failed"
	default:
		cond.Reason, cond.Message = "Unknown", "Unknown phase"
	}

	meta.SetStatusCondition(&dgdr.Status.Conditions, cond)
}

// updatePhaseAndRequeue updates the DGDR phase and requeues.
//
// It sets status.phase, refreshes the aggregate Succeeded condition for that
// phase, and writes the status subresource. The message is only logged; it is
// not stored on the object. On success the result requests an immediate
// requeue; on failure the status update error is returned.
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseAndRequeue(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase, message string) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Updating DGDR phase", "name", dgdr.Name, "phase", phase, "message", message)

	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)

	if err := r.Status().Update(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{Requeue: true}, nil
}

1945
1946
// updatePhaseWithCondition updates phase and adds/updates a condition
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseWithCondition(
1947
	ctx context.Context,
1948
1949
	dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
	phase nvidiacomv1beta1.DGDRPhase,
1950
1951
1952
1953
1954
	conditionType string,
	status metav1.ConditionStatus,
	reason string,
	message string,
) (ctrl.Result, error) {
1955
1956
	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978

	condition := metav1.Condition{
		Type:               conditionType,
		Status:             status,
		ObservedGeneration: dgdr.Generation,
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}

	dgdr.AddStatusCondition(condition)

	if err := r.Status().Update(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{Requeue: true}, nil
}

// SetupWithManager sets up the controller with the Manager
func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
1979
		For(&nvidiacomv1beta1.DynamoGraphDeploymentRequest{}).
1980
		Named(consts.ResourceTypeDynamoGraphDeploymentRequest).
1981
1982
1983
1984
1985
1986
1987
		Owns(&batchv1.Job{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the job
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})). // Watch Jobs created by this controller (via ownerReference)
1988
		// Watch DGDs created by this controller (via label)
1989
		Watches(
1990
			&dgdv1alpha1.DynamoGraphDeployment{},
1991
1992
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				// Find DGDR by label instead of owner reference
1993
1994
1995
				dgd := obj.(*dgdv1alpha1.DynamoGraphDeployment)
				dgdrName, hasName := dgd.Labels[nvidiacomv1beta1.LabelDGDRName]
				dgdrNamespace, hasNamespace := dgd.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
				if !hasName || !hasNamespace {
					return nil
				}
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{
						Name:      dgdrName,
						Namespace: dgdrNamespace,
					},
				}}
			}),
			builder.WithPredicates(predicate.Funcs{
				// ignore creation cause we don't want to be called again after we create the DGD
				CreateFunc:  func(ce event.CreateEvent) bool { return false },
				DeleteFunc:  func(de event.DeleteEvent) bool { return true },
				UpdateFunc:  func(ue event.UpdateEvent) bool { return true },
				GenericFunc: func(ge event.GenericEvent) bool { return true },
			}),
2013
		).
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
		// Watch output ConfigMaps for profiling sub-phase updates (via label)
		Watches(
			&corev1.ConfigMap{},
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				// Only trigger for ConfigMaps with DGDR labels (written by the sidecar)
				cm := obj.(*corev1.ConfigMap)
				dgdrName, hasName := cm.Labels[nvidiacomv1beta1.LabelDGDRName]
				dgdrNamespace, hasNamespace := cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
				if !hasName || !hasNamespace {
					return nil
				}
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{
						Name:      dgdrName,
						Namespace: dgdrNamespace,
					},
				}}
			}),
			builder.WithPredicates(predicate.Funcs{
				CreateFunc: func(ce event.CreateEvent) bool {
					labels := ce.Object.GetLabels()
					_, hasName := labels[nvidiacomv1beta1.LabelDGDRName]
					_, hasNamespace := labels[nvidiacomv1beta1.LabelDGDRNamespace]
					return hasName && hasNamespace
				},
				UpdateFunc: func(ue event.UpdateEvent) bool {
					labels := ue.ObjectNew.GetLabels()
					_, hasName := labels[nvidiacomv1beta1.LabelDGDRName]
					_, hasNamespace := labels[nvidiacomv1beta1.LabelDGDRNamespace]
					return hasName && hasNamespace
				},
				DeleteFunc:  func(de event.DeleteEvent) bool { return false },
				GenericFunc: func(ge event.GenericEvent) bool { return false },
			}),
		).
2049
2050
		// Set the event filter to ignore resources handled by other controllers in namespace-restricted mode
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)).
2051
		Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeploymentRequest))
2052
}