dynamographdeploymentrequest_controller.go 78.6 KB
Newer Older
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"bytes"
	"context"
23
24
	"encoding/json"
	"errors"
25
	"fmt"
26
	"io"
27
	"strings"
28
29
30
31
32
33
34
	"text/template"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36
37
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
38
	"k8s.io/apimachinery/pkg/util/yaml"
39
	"k8s.io/client-go/tools/record"
40
	"k8s.io/utils/ptr"
41
42
43
44
45
46
47
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
48
	sigsyaml "sigs.k8s.io/yaml"
49

50
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
51
52
	dgdv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	nvidiacomv1beta1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1beta1"
53
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
54
	commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
55
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/gpu"
56
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
)

// Names of the Kubernetes objects created for a profiling run.
const (
	// Job name prefixes, one per profiling mode.
	JobNamePrefixOnline = "profile-online-"
	JobNamePrefixAIC    = "profile-aic-"

	// Containers inside the profiling pod.
	ContainerNameProfiler     = "profiler"
	ContainerNameOutputCopier = "output-copier"

	// ServiceAccount name for the profiling job.
	ServiceAccountProfilingJob = "dgdr-profiling-job"

	// Prefix of the output ConfigMap name.
	ConfigMapOutputPrefix = "dgdr-output-"
)

// Annotation keys and limits.
const (
	// Annotation keys
	AnnotationAdditionalResources = "dgdr.nvidia.com/additional-resources"

	// Annotation keys for v1alpha1 round-trip compatibility.
	// The conversion layer stores v1alpha1 fields that have no v1beta1 spec equivalent
	// as annotations so the controller can still honour them for converted resources.
	AnnotationConfigMapRef = "nvidia.com/dgdr-config-map-ref"
	AnnotationOutputPVC    = "nvidia.com/dgdr-output-pvc"

	// Size limits
	MaxAnnotationSize = 250_000 // ~250KB, below K8s 256KB limit
)

// Image, volumes and mount paths of the profiling pod.
const (
	// Sidecar image
	SidecarImage = "bitnami/kubectl:latest"

	// Volume names
	VolumeNameProfilingOutput = "profiling-output"
	VolumeNameProfilingConfig = "profiling-config"
	VolumeNameModelCache      = "model-cache"

	// Volume paths
	ProfilingOutputPath        = "/data"
	ProfilingOutputFile        = "final_config.yaml"
	ProfilingConfigMountPath   = "/config"
	ProfilingConfigDefaultKey  = "disagg.yaml"
	DefaultModelCacheMountPath = "/opt/model-cache"
)

// Command line arguments
const (
	ArgModel   = "--model"
	ArgBackend = "--backend"
	ArgTTFT    = "--ttft"
	ArgITL     = "--itl"
	ArgConfig  = "--config"
)

// Human-readable messages used in events and status conditions.
// Entries containing %s are fmt format strings.
const (
	MessageInitialized            = "DGDR initialized successfully"
	MessageDiscoveringHardware    = "Discovering GPU hardware and preparing profiling job"
	MessageProfilingJobCreated    = "Profiling job created"
	MessageAICProfilingJobCreated = "AIC profiling job created"
	MessageProfilingInProgress    = "Profiling is in progress"
	MessageSpecGenerated          = "DynamoGraphDeployment spec generated successfully"
	MessageSpecAvailable          = "Generated spec is available in annotation nvidia.com/generated-dgd-spec"
	MessageDeploymentCreated      = "DynamoGraphDeployment %s created successfully"
	MessageDeploymentReady        = "DynamoGraphDeployment %s is ready"
	MessageDeploymentDegraded     = "DynamoGraphDeployment %s degraded from Ready to %s"
	MessageDeploymentDeleted      = "DGD %s was deleted. DGDR will not recreate it. Delete this DGDR and create a new one to redeploy."
	MessageInvalidState           = "Invalid state"
	MessageSpecChangeRejected     = "Cannot modify spec in phase '%s'. DynamoGraphDeploymentRequest is immutable once profiling starts. Create a new resource with a different name instead."
)

// CamelCase failure identifiers. Despite the Message prefix, the visible
// callers pass several of these as the event/condition reason argument.
const (
	MessageJobCreationFailed         = "JobCreationFailed"
	MessageDeploymentCreationFailed  = "DeploymentCreationFailed"
	MessageResultsRetrievalFailed    = "ResultsRetrievalFailed"
	MessageGenerationFailed          = "GenerationFailed"
	MessageAIConfiguratorCheckFailed = "AIConfiguratorCheckFailed"
	MessageProfilingCheckFailed      = "ProfilingCheckFailed"
)

// Format strings for "not found" errors.
const (
	MessageConfigMapNotFound     = "ConfigMap %s not found in namespace %s"
	MessageConfigMapKeyNotFound  = "key %s not found in ConfigMap %s"
	MessageModelCachePVCNotFound = "model cache PVC %s not found in namespace %s"
)

133
134
135
136
137
138
139
// sidecarScriptTemplate is the shell script template for the output copier sidecar.
// It is a text/template; the actions it references are .OutputPath, .OutputFile,
// .ConfigMapName, .Namespace, .DGDRName and .DGDRuid.
//
// The sidecar is a continuous poller that:
//  1. During profiling: polls profiler_status.yaml every 10s, relays phase+message
//     to the output ConfigMap so the controller can track sub-phase progress.
//  2. After profiler terminates: writes the final profiling output (final_config.yaml
//     + profiler_status.yaml) to the same ConfigMap, preserving the phase+message keys.
//
// The script runs with "set -e" and "set -o pipefail", so every
// "VAR=$(grep ... | ...)" substitution ends in "|| true": a missing key must
// yield an empty variable (handled by the explicit checks that follow) rather
// than aborting the script before it can report the problem.
const sidecarScriptTemplate = `
set -e
set -o pipefail

STATUS_FILE="{{.OutputPath}}/profiler_status.yaml"
LAST_PHASE=""
START_TIME=$(date +%s)
LAST_PROGRESS_LOG=$START_TIME
PROGRESS_INTERVAL=300

# relay_phase: read phase+message from profiler_status.yaml and write to ConfigMap.
# Only writes when the phase changes (debounce).
relay_phase() {
  if [ ! -f "$STATUS_FILE" ]; then
    return
  fi
  PHASE=$(grep "^phase:" "$STATUS_FILE" 2>/dev/null | awk '{print $2}' | tr -d '"' | tr -d "'" || true)
  MESSAGE=$(grep "^message:" "$STATUS_FILE" 2>/dev/null | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
  if [ -z "$PHASE" ] || [ "$PHASE" = "$LAST_PHASE" ]; then
    return
  fi
  echo "Phase update: $PHASE - $MESSAGE"
  cat >/tmp/progress.yaml <<PEOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{.ConfigMapName}}
  namespace: {{.Namespace}}
  labels:
    dgdr.nvidia.com/name: {{.DGDRName}}
    dgdr.nvidia.com/namespace: {{.Namespace}}
    nvidia.com/managed-by: dynamo-operator
  ownerReferences:
  - apiVersion: nvidia.com/v1beta1
    kind: DynamoGraphDeploymentRequest
    name: {{.DGDRName}}
    uid: {{.DGDRuid}}
    blockOwnerDeletion: true
    controller: true
data:
  phase: "$PHASE"
  message: "$MESSAGE"
PEOF
  kubectl apply -f /tmp/progress.yaml 2>/dev/null && LAST_PHASE="$PHASE" || echo "Warning: failed to update progress ConfigMap"
}

# Main loop: poll profiler_status.yaml and wait for profiler to terminate
echo "Waiting for profiler to complete..."
while true; do
  CURRENT_TIME=$(date +%s)
  ELAPSED=$((CURRENT_TIME - START_TIME))

  # Relay phase updates to ConfigMap
  relay_phase

  # Log progress every 5 minutes
  if [ $((CURRENT_TIME - LAST_PROGRESS_LOG)) -ge $PROGRESS_INTERVAL ]; then
    echo "Still waiting... ($(($ELAPSED / 60)) minutes elapsed)"
    LAST_PROGRESS_LOG=$CURRENT_TIME
  fi

  # Check if profiler container terminated
  CONTAINER_STATUS=$(kubectl get pod $HOSTNAME -n {{.Namespace}} -o jsonpath='{.status.containerStatuses[?(@.name=="profiler")].state}' 2>/dev/null || echo "")
  if echo "$CONTAINER_STATUS" | grep -q "terminated"; then
    echo "Profiler terminated (ran for $(($ELAPSED / 60)) minutes)"
    break
  fi
  sleep 10
done

# Final relay: pick up any last phase change written just before termination
relay_phase

# Check profiler status file (2 minute timeout)
echo "Checking profiler status..."
TIMEOUT=120
CHECK_START=$(date +%s)

# Wait for status file to exist
while [ ! -f "$STATUS_FILE" ]; do
  ELAPSED=$(($(date +%s) - CHECK_START))
  if [ $ELAPSED -ge $TIMEOUT ]; then
    echo "ERROR: Status file not found after ${TIMEOUT}s"
    exit 1
  fi
  sleep 2
done

# Read and parse status from YAML file.
# "|| true" keeps set -e/pipefail from aborting when the key is absent, so the
# explicit empty check below can report the problem.
STATUS=$(grep "^status:" "$STATUS_FILE" | awk '{print $2}' | tr -d '"' | tr -d "'" || true)

if [ -z "$STATUS" ]; then
  echo "ERROR: Invalid status file format"
  exit 1
fi

# Check status value
case "$STATUS" in
  success)
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
    echo "Profiler succeeded: $MESSAGE"
    ;;
  failed)
    ERROR=$(grep "^error:" "$STATUS_FILE" | sed 's/^error: *//' | tr -d '"' | tr -d "'" || true)
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
    echo "ERROR: Profiler failed: ${ERROR:-$MESSAGE}"
    exit 1
    ;;
  running)
    echo "ERROR: Profiler still running (unexpected)"
    exit 1
    ;;
  *)
    echo "ERROR: Unknown status: $STATUS"
    exit 1
    ;;
esac

echo "Writing profiling output to ConfigMap..."

# Read final phase+message to preserve them alongside the profiling output
FINAL_PHASE=$(grep "^phase:" "$STATUS_FILE" 2>/dev/null | awk '{print $2}' | tr -d '"' | tr -d "'" || true)
FINAL_MESSAGE=$(grep "^message:" "$STATUS_FILE" 2>/dev/null | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)

# Start building ConfigMap YAML with DGD spec + preserved phase/message
cat >/tmp/cm.yaml <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{.ConfigMapName}}
  namespace: {{.Namespace}}
  labels:
    dgdr.nvidia.com/name: {{.DGDRName}}
    dgdr.nvidia.com/namespace: {{.Namespace}}
    nvidia.com/managed-by: dynamo-operator
  ownerReferences:
  - apiVersion: nvidia.com/v1beta1
    kind: DynamoGraphDeploymentRequest
    name: {{.DGDRName}}
    uid: {{.DGDRuid}}
    blockOwnerDeletion: true
    controller: true
data:
  phase: "$FINAL_PHASE"
  message: "$FINAL_MESSAGE"
  {{.OutputFile}}: |
EOF
sed 's/^/    /' {{.OutputPath}}/{{.OutputFile}} >> /tmp/cm.yaml

# Add profiler status file for debugging
if [ -f {{.OutputPath}}/profiler_status.yaml ]; then
  echo "  profiler_status.yaml: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/profiler_status.yaml >> /tmp/cm.yaml
fi

# Add webui_data.json for pareto curve data (used by operator to populate status.profilingResults.pareto)
if [ -f {{.OutputPath}}/webui_data.json ]; then
  echo "  webui_data.json: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/webui_data.json >> /tmp/cm.yaml
fi

# Note: Profiling data (raw_data.npz converted to JSON) is included in the
# generated DGD YAML as a separate ConfigMap by the profiler, no need to add it here

kubectl apply -f /tmp/cm.yaml
echo "Saved profiling output to ConfigMap {{.ConfigMapName}}"
`

308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
// profilingPhaseReason maps a profiling sub-phase to the Reason string used on
// status conditions. Every phase is used verbatim as its reason, except
// ProfilingPhaseDone, which is reported as ProfilingReasonCompleted.
func profilingPhaseReason(phase nvidiacomv1beta1.ProfilingPhase) string {
	switch phase {
	case nvidiacomv1beta1.ProfilingPhaseDone:
		return nvidiacomv1beta1.ProfilingReasonCompleted
	default:
		return string(phase)
	}
}

// profilingPhaseFailureReason builds the condition Reason for a profiling run
// that failed in the given sub-phase: the phase name with a "Failed" suffix
// (e.g. "SweepingDecodeFailed"). When no sub-phase is known (empty phase) the
// generic "ProfilingFailed" is returned.
func profilingPhaseFailureReason(phase nvidiacomv1beta1.ProfilingPhase) string {
	if name := string(phase); name != "" {
		return name + "Failed"
	}
	return "ProfilingFailed"
}

// validProfilingPhases is the set of phases the profiler sidecar may report.
// It is used as a membership set (empty-struct values) by isValidProfilingPhase
// to reject unknown phase strings read from the output ConfigMap.
var validProfilingPhases = map[nvidiacomv1beta1.ProfilingPhase]struct{}{
	nvidiacomv1beta1.ProfilingPhaseInitializing:    {},
	nvidiacomv1beta1.ProfilingPhaseSweepingPrefill: {},
	nvidiacomv1beta1.ProfilingPhaseSweepingDecode:  {},
	nvidiacomv1beta1.ProfilingPhaseSelectingConfig: {},
	nvidiacomv1beta1.ProfilingPhaseBuildingCurves:  {},
	nvidiacomv1beta1.ProfilingPhaseGeneratingDGD:   {},
	nvidiacomv1beta1.ProfilingPhaseDone:            {},
}

// isValidProfilingPhase returns true if phase is a recognized ProfilingPhase value.
func isValidProfilingPhase(phase string) bool {
	_, ok := validProfilingPhases[nvidiacomv1beta1.ProfilingPhase(phase)]
	return ok
}

346
347
348
// DynamoGraphDeploymentRequestReconciler reconciles a DynamoGraphDeploymentRequest object
type DynamoGraphDeploymentRequestReconciler struct {
	client.Client
349
350
351
352
353
354
	APIReader         client.Reader
	Recorder          record.EventRecorder
	Config            *configv1alpha1.OperatorConfiguration
	RuntimeConfig     *commonController.RuntimeConfig
	GPUDiscoveryCache *gpu.GPUDiscoveryCache
	GPUDiscovery      *gpu.GPUDiscovery
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
	// RBACMgr handles RBAC setup for profiling jobs
	RBACManager RBACManager
}

// RBACManager is the interface the reconciler uses to manage RBAC resources.
//
// EnsureServiceAccountWithRBAC takes the target namespace, the ServiceAccount
// name and a ClusterRole name, and returns an error on failure.
// NOTE(review): presumably it creates the ServiceAccount in targetNamespace and
// binds it to clusterRoleName — confirm against the implementation.
type RBACManager interface {
	EnsureServiceAccountWithRBAC(ctx context.Context, targetNamespace, serviceAccountName, clusterRoleName string) error
}

// GetRecorder implements the commonController.Reconciler interface.
// It returns the reconciler's event recorder (the Recorder field).
func (r *DynamoGraphDeploymentRequestReconciler) GetRecorder() record.EventRecorder {
	return r.Recorder
}

// FinalizeResource implements commonController.Finalizer interface
370
func (r *DynamoGraphDeploymentRequestReconciler) FinalizeResource(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
371
372
373
374
375
376
377
378
379
380
381
382
383
	logger := log.FromContext(ctx)

	logger.Info("DGDR finalized successfully", "name", dgdr.Name)
	return nil
}

// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/finalizers,verbs=update
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
384
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
385
386
387
388
389
390
391
392
393
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

// Reconcile handles the reconciliation loop for DynamoGraphDeploymentRequest
func (r *DynamoGraphDeploymentRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling DynamoGraphDeploymentRequest", "name", req.Name, "namespace", req.Namespace)

	// Fetch the DGDR instance
394
	dgdr := &nvidiacomv1beta1.DynamoGraphDeploymentRequest{}
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
	if err := r.Get(ctx, req.NamespacedName, dgdr); err != nil {
		if apierrors.IsNotFound(err) {
			logger.Info("DGDR resource not found, ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get DGDR")
		return ctrl.Result{}, err
	}

	// Handle finalizer using common function
	finalized, err := commonController.HandleFinalizer(ctx, dgdr, r.Client, r)
	if err != nil {
		return ctrl.Result{}, err
	}
	if finalized {
		// Resource was deleted and finalized
		return ctrl.Result{}, nil
	}

	// Check for spec changes (immutability enforcement)
	if dgdr.Status.ObservedGeneration > 0 && dgdr.Status.ObservedGeneration != dgdr.Generation {
		// Spec changed after initial processing
417
418
419
420
		if dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseProfiling || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeploying ||
			dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseReady || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeployed {
			logger.Info("Spec change detected in immutable phase",
				"phase", dgdr.Status.Phase,
421
422
423
				"observedGeneration", dgdr.Status.ObservedGeneration,
				"currentGeneration", dgdr.Generation)

424
425
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonSpecChangeRejected,
				fmt.Sprintf(MessageSpecChangeRejected, dgdr.Status.Phase))
426
427

			// Keep the old observedGeneration to continue rejecting changes
428
			// No phase transition - stay in current phase with old spec
429
430
431
			return ctrl.Result{}, nil
		}
	}
432
433
434
435
436
437
438
439
440
441
442
443
444
445
	// Phase machine: handle different phases
	switch dgdr.Status.Phase {
	case nvidiacomv1beta1.DGDRPhasePending, "":
		return r.handlePendingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		return r.handleProfilingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		return r.handleDeployingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseReady:
		return r.handleReadyPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		return r.handleDeployedPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseFailed:
		return r.handleFailedPhase(ctx, dgdr)
446
	default:
447
448
		logger.Info("Unknown phase", "phase", dgdr.Status.Phase)
		return r.updatePhaseAndRequeue(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, MessageInvalidState)
449
450
451
	}
}

452
453
454
455
// handlePendingPhase processes a DGDR that is new or still Pending.
//
// First pass (ObservedGeneration == 0): validates the spec. On failure the DGDR
// moves to Failed with a Validation condition; on success ObservedGeneration is
// recorded and the DGDR stays Pending so the next reconcile can start profiling.
//
// Later passes: creates the profiling job (online or AIC), emits the matching
// event, sets the profiling sub-phase to Initializing and moves to Profiling.
// A job creation error moves the DGDR to Failed.
func (r *DynamoGraphDeploymentRequestReconciler) handlePendingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	if dgdr.Status.ObservedGeneration == 0 {
		logger.Info("Handling initial validation", "name", dgdr.Name)

		if validationErr := r.validateSpec(ctx, dgdr); validationErr != nil {
			msg := validationErr.Error()
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonValidationFailed, msg)
			return r.updatePhaseWithCondition(ctx, dgdr,
				nvidiacomv1beta1.DGDRPhaseFailed,
				nvidiacomv1beta1.ConditionTypeValidation, metav1.ConditionFalse,
				nvidiacomv1beta1.EventReasonValidationFailed, msg)
		}

		// Remember which spec generation is being processed.
		dgdr.Status.ObservedGeneration = dgdr.Generation

		// Stay Pending — the next reconcile discovers hardware and creates the profiling job.
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonInitialized, MessageInitialized)
		return r.updatePhaseWithCondition(ctx, dgdr,
			nvidiacomv1beta1.DGDRPhasePending,
			nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
			"DiscoveringHardware", MessageDiscoveringHardware)
	}

	logger.Info("Handling pending phase", "name", dgdr.Name)

	// Create the profiling job (online or AIC).
	if jobErr := r.createProfilingJob(ctx, dgdr); jobErr != nil {
		msg := jobErr.Error()
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonProfilingJobFailed, msg)
		return r.updatePhaseWithCondition(ctx, dgdr,
			nvidiacomv1beta1.DGDRPhaseFailed,
			nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
			MessageJobCreationFailed, msg)
	}

	// The event message depends on the profiling mode.
	createdMsg := MessageAICProfilingJobCreated
	if isOnlineProfiling(dgdr) {
		createdMsg = MessageProfilingJobCreated
	}
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, createdMsg)

	// Enter Profiling with the Initializing reason to indicate the profiler is loading.
	dgdr.SetProfilingPhase(nvidiacomv1beta1.ProfilingPhaseInitializing)
	return r.updatePhaseWithCondition(ctx, dgdr,
		nvidiacomv1beta1.DGDRPhaseProfiling,
		nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
		nvidiacomv1beta1.ProfilingReasonInitializing, MessageDiscoveringHardware)
}

// updateProfilingSubPhase reads the output ConfigMap and updates status.profilingPhase
// and the Profiling/Succeeded conditions. The sidecar continuously polls profiler_status.yaml
// and writes phase+message to the output ConfigMap (dgdr-output-<name>). This function
// reads those keys and copies them verbatim into the DGDR status.
//
// It returns nil without touching the status when the ConfigMap does not exist
// yet, when it carries no "phase" key (or an empty one), or when the reported
// phase equals the one already recorded. It returns an error when the ConfigMap
// cannot be read for any reason other than NotFound, when the reported phase is
// not a recognized ProfilingPhase, or when the status update fails.
func (r *DynamoGraphDeploymentRequestReconciler) updateProfilingSubPhase(
	ctx context.Context,
	dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
) error {
	logger := log.FromContext(ctx)
	outputCMName := getOutputConfigMapName(dgdr)

	cm := &corev1.ConfigMap{}
	key := types.NamespacedName{Name: outputCMName, Namespace: dgdr.Namespace}
	if err := r.Get(ctx, key, cm); err != nil {
		if apierrors.IsNotFound(err) {
			return nil // No output ConfigMap yet — skip
		}
		// Anything else (transient API failure, RBAC problem, ...) must not be
		// mistaken for "not created yet"; surface it so the reconcile is retried.
		return fmt.Errorf("getting output ConfigMap %s/%s: %w", dgdr.Namespace, outputCMName, err)
	}

	phase, exists := cm.Data["phase"]
	if !exists || phase == "" {
		return nil
	}

	if !isValidProfilingPhase(phase) {
		return fmt.Errorf("invalid profiling phase %q in ConfigMap %s", phase, outputCMName)
	}

	profilingPhase := nvidiacomv1beta1.ProfilingPhase(phase)
	if dgdr.Status.ProfilingPhase == profilingPhase {
		return nil // No change
	}

	logger.Info("Profiling sub-phase updated", "phase", phase)
	dgdr.SetProfilingPhase(profilingPhase)

	// Reason is derived from phase; message comes from the profiler via ConfigMap.
	reason := profilingPhaseReason(profilingPhase)
	message := cm.Data["message"] // written by profiler, relayed by sidecar

	// Both conditions carry the same reason/message while profiling is in progress.
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeProfiling,
		Status:             metav1.ConditionFalse,
		ObservedGeneration: dgdr.Generation,
		Reason:             reason,
		Message:            message,
	})
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
		Status:             metav1.ConditionFalse,
		ObservedGeneration: dgdr.Generation,
		Reason:             reason,
		Message:            message,
	})

	return r.Status().Update(ctx, dgdr)
}

555
556
// handleProfilingPhase monitors profiling progress and generates spec when complete
func (r *DynamoGraphDeploymentRequestReconciler) handleProfilingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
557
	logger := log.FromContext(ctx)
558
	logger.Info("Handling profiling phase", "name", dgdr.Name)
559

560
561
562
563
564
	// Check for sub-phase updates from output ConfigMap (populated by sidecar poller)
	if err := r.updateProfilingSubPhase(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}

565
566
567
568
569
	// Check profiling job status (both online and offline/AIC run as Jobs)
	// Note: We watch the Job via Owns(), so we'll be triggered automatically on Job changes
	completed, err := r.checkProfilingJobStatus(ctx, dgdr)
	if err != nil {
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageProfilingCheckFailed, err.Error())
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
		// Job failed - keep profilingPhase set so users can see where it died.
		// profilingPhase is already current: set to Initializing on entry,
		// then updated by updateProfilingSubPhase() above (reads output ConfigMap).
		failureReason := "ProfilingFailed"
		failureMessage := err.Error()
		if dgdr.Status.ProfilingPhase != "" {
			failureReason = profilingPhaseFailureReason(dgdr.Status.ProfilingPhase)
		}

		// Set phase and conditions directly so we can use sub-phase-specific failure
		// reason on both Profiling and Succeeded conditions. (updatePhaseWithCondition
		// would hardcode Succeeded reason to generic "Failed".)
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
			Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
			Status:             metav1.ConditionFalse,
			ObservedGeneration: dgdr.Generation,
			Reason:             failureReason,
			Message:            failureMessage,
		})
		dgdr.AddStatusCondition(metav1.Condition{
			Type:               nvidiacomv1beta1.ConditionTypeProfiling,
			Status:             metav1.ConditionFalse,
			ObservedGeneration: dgdr.Generation,
			Reason:             failureReason,
			Message:            failureMessage,
		})
		if err := r.Status().Update(ctx, dgdr); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
601
602
603
604
	}

	if !completed {
		logger.Info("Profiling job still running", "name", dgdr.Name)
605
		// Transition from Initializing to ProfilingRunning once the job is confirmed active.
606
		cond := meta.FindStatusCondition(dgdr.Status.Conditions, nvidiacomv1beta1.ConditionTypeProfiling)
607
		if cond != nil && cond.Reason == nvidiacomv1beta1.ProfilingReasonInitializing {
608
609
			return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingRunning", MessageProfilingInProgress)
		}
610
611
612
613
		// Don't requeue - we'll be triggered when the Job completes/fails
		return ctrl.Result{}, nil
	}

614
615
616
617
618
619
620
621
622
	profilingResults, dgdName, err := r.generateDGDSpec(ctx, dgdr)
	if err != nil {
		dgdr.ClearProfilingPhase()
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageGenerationFailed, err.Error())
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionFalse, MessageGenerationFailed, err.Error())
	}
	if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to refetch DGDR after generateDGDSpec: %w", err)
	}
623

624
	dgdr.ClearProfilingPhase()
625
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
626
		Type:               nvidiacomv1beta1.ConditionTypeProfiling,
627
628
629
630
631
		Status:             metav1.ConditionTrue,
		ObservedGeneration: dgdr.Generation,
		Reason:             "ProfilingCompleted",
		Message:            "Profiling job completed successfully",
	})
632
633
	dgdr.Status.DGDName = dgdName
	dgdr.Status.ProfilingResults = profilingResults
634

635
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
636

637
638
	// Create additional resources (ConfigMaps) immediately after profiling
	// This ensures that the `planner-profile-data` ConfigMap is available for both auto and manual deployment
639
	// v1beta1 uses the DGDR namespace for additional resources.
640
641
642
643
644
645
646
647
	targetNamespace := dgdr.Namespace
	if err := r.createAdditionalResources(ctx, dgdr, targetNamespace); err != nil {
		logger.Error(err, "Failed to create additional resources after profiling")
		// Don't fail the DGDR, just log the error - ConfigMaps can be created manually
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, "ConfigMapCreationFailed",
			fmt.Sprintf("Failed to create ConfigMaps from profiling output: %v", err))
	}

648
	// If autoApply is enabled, transition to Deploying phase
649
	if dgdr.Spec.AutoApply == nil || *dgdr.Spec.AutoApply {
650
651
		logger.Info("AutoApply enabled, transitioning to Deploying phase")
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseDeploying, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
652
653
	}

654
655
	// Otherwise, transition to Ready phase
	return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseReady, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecAvailable)
656
657
}

658
659
// handleReadyPhase handles a DGDR in the Ready phase (profiling complete, spec
// available). There is nothing to monitor in this phase — the spec is left for
// manual application — so the method only logs and returns an empty result.
func (r *DynamoGraphDeploymentRequestReconciler) handleReadyPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	log.FromContext(ctx).Info("DGDR is ready", "name", dgdr.Name)
	return ctrl.Result{}, nil
}

// handleDeployingPhase handles DGD creation and monitors deployment
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Handling deploying phase", "name", dgdr.Name)

672
	if dgdr.Spec.AutoApply != nil && !*dgdr.Spec.AutoApply {
673
674
675
676
677
		// Shouldn't be in this phase without autoApply
		logger.Info("AutoApply not enabled, transitioning to Ready")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseReady
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseReady)
		return ctrl.Result{}, r.Status().Update(ctx, dgdr)
678
679
	}

680
681
682
683
684
	if dgdr.Status.DGDName == "" {
		return r.createDGD(ctx, dgdr)
	}

	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
685
	err := r.Get(ctx, types.NamespacedName{
686
687
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
688
689
690
	}, dgd)

	if apierrors.IsNotFound(err) {
691
692
693
694
695
		// Annotation present means DGD was never created (spec ready but create not yet called).
		// Annotation absent means DGD was previously created and then manually deleted.
		if _, hasSpec := dgdr.Annotations["nvidia.com/generated-dgd-spec"]; hasSpec {
			return r.createDGD(ctx, dgdr)
		}
696
697
698
699
700
701
702
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

703
704
705
	// Check if DGD is Ready
	var condStatus metav1.ConditionStatus
	var condReason, condMessage string
706

707
708
709
710
	if dgd.Status.State == dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD is Ready, transitioning to Deployed phase")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeployed
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeployed)
711

712
713
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentReady,
			fmt.Sprintf(MessageDeploymentReady, dgd.Name))
714

715
716
717
718
719
		condStatus = metav1.ConditionTrue
		condReason = nvidiacomv1beta1.EventReasonDeploymentReady
		condMessage = fmt.Sprintf(MessageDeploymentReady, dgd.Name)
	} else {
		logger.Info("DGD not yet ready", "name", dgd.Name, "state", dgd.Status.State)
720

721
722
723
		condStatus = metav1.ConditionFalse
		condReason = "DeploymentInProgress"
		condMessage = fmt.Sprintf("DGD %s is in %s state", dgd.Name, string(dgd.Status.State))
724
725
	}

726
727
728
729
730
731
732
733
	updateDeploymentInfo(dgdr, dgd)
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
		Status:  condStatus,
		Reason:  condReason,
		Message: condMessage,
	})

734
735
736
	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

737
738
// handleDeployedPhase monitors a healthy DGD and detects degradation or deletion
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
739
	logger := log.FromContext(ctx)
740
	logger.Info("DGDR is deployed", "name", dgdr.Name)
741

742
743
	// Check if DGD still exists and monitor its status
	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
744
	err := r.Get(ctx, types.NamespacedName{
745
746
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
747
748
749
750
751
752
753
754
755
756
757
	}, dgd)

	if apierrors.IsNotFound(err) {
		// DGD was deleted by user
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

758
759
760
761
	// Check if DGD degraded from Ready
	if dgd.Status.State != dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD degraded, transitioning back to Deploying",
			"dgdState", dgd.Status.State)
762

763
764
765
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeploying
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeploying)
		updateDeploymentInfo(dgdr, dgd)
766

767
768
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDegraded,
			fmt.Sprintf(MessageDeploymentDegraded, dgd.Name, string(dgd.Status.State)))
769
770

		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
771
772
773
774
			Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
			Status:  metav1.ConditionFalse,
			Reason:  nvidiacomv1beta1.EventReasonDeploymentDegraded,
			Message: fmt.Sprintf("Deployment degraded to %s", string(dgd.Status.State)),
775
		})
776
777
778
779
780
781
	} else {
		// DGD is healthy — update replica info only if changed
		if !updateDeploymentInfo(dgdr, dgd) {
			// Nothing changed, skip the status write
			return ctrl.Result{}, nil
		}
782
783
784
785
786
	}

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

787
788
789
// handleDGDDeleted handles the case when auto-created DGD is deleted by user.
// In v1beta1, this transitions to Failed (DeploymentDeleted phase was removed).
func (r *DynamoGraphDeploymentRequestReconciler) handleDGDDeleted(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
790
	logger := log.FromContext(ctx)
791
	logger.Info("DGD was deleted by user, transitioning to Failed phase")
792

793
794
	dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
	setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseFailed)
795

796
797
	r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDeleted,
		fmt.Sprintf(MessageDeploymentDeleted, dgdr.Status.DGDName))
798

799
800
	dgdr.Status.DGDName = ""
	dgdr.Status.DeploymentInfo = nil
801

802
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
803
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
804
		Status:  metav1.ConditionFalse,
805
		Reason:  nvidiacomv1beta1.EventReasonDeploymentDeleted,
806
807
808
809
810
811
812
		Message: "Deployment was deleted by user. Create a new DGDR to redeploy.",
	})

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

// createDGD creates a DynamoGraphDeployment with the generated spec
813
func (r *DynamoGraphDeploymentRequestReconciler) createDGD(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
814
815
	logger := log.FromContext(ctx)

816
817
818
819
	// Extract DGD spec from annotation (stored by generateDGDSpec)
	dgdSpecYAML, ok := dgdr.Annotations["nvidia.com/generated-dgd-spec"]
	if !ok || dgdSpecYAML == "" {
		return ctrl.Result{}, fmt.Errorf("generated DGD spec not found in annotation nvidia.com/generated-dgd-spec")
820
821
	}

822
823
824
	generatedDGD := &dgdv1alpha1.DynamoGraphDeployment{}
	if err := yaml.Unmarshal([]byte(dgdSpecYAML), generatedDGD); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to unmarshal generated deployment from annotation: %w", err)
825
826
	}

827
	// Determine DGD name and namespace from generated deployment
828
829
830
831
832
833
834
835
836
837
838
	dgdName := generatedDGD.Name
	dgdNamespace := dgdr.Namespace

	// Build labels (start with generated DGD's labels)
	labels := make(map[string]string)
	if generatedDGD.Labels != nil {
		for k, v := range generatedDGD.Labels {
			labels[k] = v
		}
	}
	// Add/override with managed labels
839
840
841
	labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
	labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
	labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
842
843
844
845
846
847
848
849
850
851

	// Build annotations (start with generated DGD's annotations)
	annotations := make(map[string]string)
	if generatedDGD.Annotations != nil {
		for k, v := range generatedDGD.Annotations {
			annotations[k] = v
		}
	}

	// Create DGD from generated deployment
852
	dgd := &dgdv1alpha1.DynamoGraphDeployment{
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
		ObjectMeta: metav1.ObjectMeta{
			Name:        dgdName,
			Namespace:   dgdNamespace,
			Labels:      labels,
			Annotations: annotations,
		},
		Spec: generatedDGD.Spec,
	}

	// Note: We don't set owner reference on DGD
	// If a DGDR is deleted, the DGD may be serving traffic and should persist independently.
	// We use labels (LabelDGDRName) to track the relationship.

	logger.Info("Creating DynamoGraphDeployment", "name", dgdName, "namespace", dgdNamespace)

	if err := r.Create(ctx, dgd); err != nil {
		if apierrors.IsAlreadyExists(err) {
			logger.Info("DGD already exists, updating status")
871
872
873
874
875
			delete(dgdr.Annotations, "nvidia.com/generated-dgd-spec")
			if updateErr := r.Update(ctx, dgdr); updateErr != nil {
				logger.Error(updateErr, "Failed to remove generated-dgd-spec annotation on IsAlreadyExists path")
				return ctrl.Result{}, updateErr
			}
876
			dgdr.Status.DGDName = dgdName
877
878
879
880
881
882
			return ctrl.Result{}, r.Status().Update(ctx, dgdr)
		}
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageDeploymentCreationFailed, err.Error())
		return ctrl.Result{}, err
	}

883
884
885
886
887
888
889
	delete(dgdr.Annotations, "nvidia.com/generated-dgd-spec")
	if err := r.Update(ctx, dgdr); err != nil {
		// Return the error to force a retry. The DGD was created successfully, so a
		// retry will hit the IsAlreadyExists path above and attempt cleanup again.
		return ctrl.Result{}, fmt.Errorf("failed to remove generated-dgd-spec annotation after DGD creation: %w", err)
	}

890
	// Update status
891
	dgdr.Status.DGDName = dgdName
892

893
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentCreated,
894
895
896
		fmt.Sprintf(MessageDeploymentCreated, dgdName))

	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
897
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
898
		Status:  metav1.ConditionFalse,
899
		Reason:  nvidiacomv1beta1.EventReasonDeploymentCreated,
900
901
902
903
904
905
906
907
		Message: fmt.Sprintf("DGD %s created, waiting for Ready", dgdName),
	})

	logger.Info("DynamoGraphDeployment created successfully", "name", dgdName)

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

908
// createAdditionalResources creates ConfigMaps from the profiling output that should be deployed alongside the DGD
909
func (r *DynamoGraphDeploymentRequestReconciler) createAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, targetNamespace string) error {
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
	logger := log.FromContext(ctx)

	// Check if there are additional resources stored in annotations
	if dgdr.Annotations == nil {
		return nil
	}

	resourcesYAML, exists := dgdr.Annotations[AnnotationAdditionalResources]
	if !exists || resourcesYAML == "" {
		return nil
	}

	// Parse using standard Kubernetes YAML decoder
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(resourcesYAML)), 4096)
	resourceCount := 0

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			logger.Error(err, "Failed to decode resource, skipping")
			continue
		}

		if obj.GetKind() == "" {
			continue
		}

		resourceCount++

		// Only support ConfigMap for now (what profiler actually generates)
		if obj.GetKind() != "ConfigMap" {
			logger.Info("Skipping non-ConfigMap resource from profiling output", "kind", obj.GetKind(), "name", obj.GetName())
			continue
		}

		cm := &corev1.ConfigMap{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, cm); err != nil {
			logger.Error(err, "Failed to convert to ConfigMap", "name", obj.GetName())
			continue
		}

		// Override namespace and add tracking labels
		cm.Namespace = targetNamespace
		if cm.Labels == nil {
			cm.Labels = make(map[string]string)
		}
959
960
961
		cm.Labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
		cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
		cm.Labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
962

963
964
965
966
967
968
		// Use SyncResource to create/update the ConfigMap with owner reference and change detection
		_, _, err := commonController.SyncResource(ctx, r, dgdr, func(ctx context.Context) (*corev1.ConfigMap, bool, error) {
			return cm, false, nil
		})
		if err != nil {
			return fmt.Errorf("failed to sync ConfigMap %s: %w", cm.Name, err)
969
		}
970
		logger.Info("Synced ConfigMap from profiling output", "name", cm.Name, "namespace", targetNamespace)
971
972
973
974
975
976
977
978
979
	}

	if resourceCount > 0 {
		logger.Info("Deploying additional resources from profiling output", "count", resourceCount)
	}

	return nil
}

980
981
// handleFailedPhase is the reconcile step for a DGDR in the Failed phase.
// It only logs the state and returns an empty result with no error.
func (r *DynamoGraphDeploymentRequestReconciler) handleFailedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	log.FromContext(ctx).Info("DGDR is in failed phase", "name", dgdr.Name)

	// No retry is attempted here; retry logic could be added at this point if desired.
	return ctrl.Result{}, nil
}

989
// getProfilingJobName returns the job name for a DGDR
990
func getProfilingJobName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
991
992
	// Use "profile-" prefix for all profiling jobs
	return fmt.Sprintf("profile-%s", dgdr.Name)
993
994
995
}

// getOutputConfigMapName returns the ConfigMap name for profiling output
996
func getOutputConfigMapName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
997
998
999
	return fmt.Sprintf("%s%s", ConfigMapOutputPrefix, dgdr.Name)
}

1000
1001
1002
// isOnlineProfiling always reports true. In v1beta1 the profiler decides
// between online and AIC mode internally based on its config, so the
// controller uses the same label regardless of the DGDR passed in.
func isOnlineProfiling(_ *nvidiacomv1beta1.DynamoGraphDeploymentRequest) bool {
	// The argument is deliberately ignored; the answer does not depend on it.
	const alwaysOnline = true
	return alwaysOnline
}
1005

1006
// validateSpec validates the DGDR spec
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
func (r *DynamoGraphDeploymentRequestReconciler) validateSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	var errs []error

	// Disallow searchStrategy: thorough with backend: auto.
	if dgdr.Spec.SearchStrategy == nvidiacomv1beta1.SearchStrategyThorough &&
		dgdr.Spec.Backend == nvidiacomv1beta1.BackendTypeAuto {
		errs = append(errs, fmt.Errorf(
			"spec.searchStrategy %q is incompatible with spec.backend %q: set spec.backend to a specific backend (sglang, trtllm, or vllm)",
			nvidiacomv1beta1.SearchStrategyThorough,
			nvidiacomv1beta1.BackendTypeAuto,
		))
1018
1019
	}

1020
	// Validate model cache PVC if provided
1021
	if dgdr.Spec.ModelCache != nil && dgdr.Spec.ModelCache.PVCName != "" {
1022
1023
		pvc := &corev1.PersistentVolumeClaim{}
		err := r.Get(ctx, types.NamespacedName{
1024
			Name:      dgdr.Spec.ModelCache.PVCName,
1025
1026
1027
1028
1029
			Namespace: dgdr.Namespace,
		}, pvc)

		if err != nil {
			if apierrors.IsNotFound(err) {
1030
1031
1032
				errs = append(errs, fmt.Errorf(MessageModelCachePVCNotFound, dgdr.Spec.ModelCache.PVCName, dgdr.Namespace))
			} else {
				return err
1033
1034
1035
1036
			}
		}
	}

1037
	if err := r.validateGPUHardwareInfo(ctx, dgdr); err != nil {
1038
		errs = append(errs, err)
1039
1040
	}

1041
	// The profiler will validate the rest of the configuration
1042
	return errors.Join(errs...)
1043
1044
1045
}

// validateGPUHardwareInfo ensures GPU hardware information is available when required for profiling
1046
func (r *DynamoGraphDeploymentRequestReconciler) validateGPUHardwareInfo(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
1047
1048
	logger := log.FromContext(ctx)

1049
1050
1051
1052
	// Check if user provided hardware info in the typed spec
	hasManualConfig := dgdr.Spec.Hardware != nil && (dgdr.Spec.Hardware.GPUSKU != "" ||
		dgdr.Spec.Hardware.VRAMMB != nil ||
		dgdr.Spec.Hardware.NumGPUsPerNode != nil)
1053

1054
1055
	// If manual config is provided, validation passes
	if hasManualConfig {
1056
1057
1058
		return nil
	}

1059
	isNamespaceScoped := r.Config.Namespace.Restricted != ""
1060
	if isNamespaceScoped {
1061
1062
		return fmt.Errorf(
			"GPU hardware info required but cannot be auto-discovered." +
1063
1064
1065
				"\n\nOptions to resolve:" +
				"\n\n1. Re-enable GPU discovery (if it was disabled during Helm install):" +
				"\n   helm upgrade ... --set dynamo-operator.gpuDiscovery.enabled=true" +
1066
1067
1068
1069
				"\n\n2. Add hardware config to spec.hardware:" +
				"\n   numGpusPerNode: 8" +
				"\n   gpuSku: \"H100-SXM5-80GB\"" +
				"\n   vramMb: 81920")
1070
1071
	}

1072
1073
1074
1075
1076
1077
1078
1079
	_, err := r.GPUDiscovery.DiscoverGPUsFromDCGM(ctx, r.APIReader, r.GPUDiscoveryCache)
	if err == nil {
		// GPU discovery is available, validation passes
		return nil
	}
	// Refine the logger message
	reason := GetGPUDiscoveryFailureReason(err)
	logger.Info("GPU discovery not available", "reason", reason, "error", err.Error())
1080
	return fmt.Errorf("GPU hardware info required but auto-discovery failed. Add spec.hardware.gpuSku, spec.hardware.vramMb, spec.hardware.numGpusPerNode")
1081
1082
}

1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
// GetGPUDiscoveryFailureReason classifies a GPU discovery error and
// returns a stable, actionable reason string suitable for structured logging.
//
// The classification is based on known error message patterns produced during:
//   - DCGM exporter pod discovery
//   - Helm-based GPU operator and DCGM discovery
//   - Metrics scraping
//   - Prometheus parsing
//
// Matching is case-insensitive: the error message is lower-cased once, so every
// pattern below must itself be written in lower case. Patterns are tried in
// order and the first match wins.
//
// If err is nil or does not match any known category, "unknown" is returned.
func GetGPUDiscoveryFailureReason(err error) string {
	if err == nil {
		return "unknown"
	}
	errMsg := strings.ToLower(err.Error())

	switch {
	case strings.Contains(errMsg, "list pods"):
		return "failed to list DCGM exporter pods (RBAC/cluster connectivity issue)"
	case strings.Contains(errMsg, "gpu operator is not installed"):
		return "GPU Operator not installed in expected namespace"
	case strings.Contains(errMsg, "helm init failed"):
		return "failed to initialize Helm client (RBAC, kubeconfig, or Helm driver issue)"
	case strings.Contains(errMsg, "timeout waiting for dcgm exporter pods"):
		return "timeout while waiting for DCGM exporter pods to become ready"
	case strings.Contains(errMsg, "http get"):
		return "failed to reach DCGM metrics endpoint on pod (network/port issue)"
	case strings.Contains(errMsg, "metrics endpoint") &&
		strings.Contains(errMsg, "status"):
		return "DCGM pod metrics endpoint returned non-200 status"
	case strings.Contains(errMsg, "parse prometheus metrics"):
		return "failed to parse dcgm Prometheus metrics (invalid format)"
	case strings.Contains(errMsg, "no gpus detected"):
		return "no GPUs detected in dcgm metrics (GPU model or metrics missing)"
	// The pattern is lower case on purpose: a mixed-case pattern can never
	// match the lower-cased message.
	case strings.Contains(errMsg, "dcgm is not enabled in the gpu operator"):
		return "DCGM is not enabled in the GPU Operator (check GPU Operator configuration and permissions)"
	case strings.Contains(errMsg, "failed to scrape any dcgm exporter pod"):
		return "failed to scrape any dcgm exporter pod (check DCGM exporter pod status and network connectivity)"
	case strings.Contains(errMsg, "no gpu metrics could be parsed from any dcgm pod"):
		return "no GPU metrics could be parsed from any DCGM pod (check DCGM exporter pod status and network connectivity)"
	case strings.Contains(errMsg, "failed to create helm path"):
		return "failed to initialize Helm client (RBAC, kubeconfig, or Helm driver issue)"
	}
	return "unknown"
}

1129
// createProfilingJob creates a Kubernetes Job for profiling using SyncResource
1130
func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
1131
1132
	logger := log.FromContext(ctx)

1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
	// Delete any existing output ConfigMap to ensure fresh profiling results
	// This prevents using stale data from previous profiling runs
	outputConfigMapName := getOutputConfigMapName(dgdr)
	existingCM := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, existingCM)
	if err == nil {
		// ConfigMap exists, delete it
		logger.Info("Deleting existing output ConfigMap to ensure fresh profiling results", "configMap", outputConfigMapName)
		if err := r.Delete(ctx, existingCM); err != nil && !apierrors.IsNotFound(err) {
			logger.Error(err, "Failed to delete existing output ConfigMap", "configMap", outputConfigMapName)
			return fmt.Errorf("failed to delete existing output ConfigMap: %w", err)
		}
		logger.Info("Successfully deleted old output ConfigMap", "configMap", outputConfigMapName)
	} else if !apierrors.IsNotFound(err) {
		// Unexpected error checking for ConfigMap
		logger.Error(err, "Failed to check for existing output ConfigMap", "configMap", outputConfigMapName)
		return fmt.Errorf("failed to check for existing output ConfigMap: %w", err)
	}

	// Ensure profiling job RBAC exists (only for cluster-wide installation)
1156
	if r.Config.Namespace.Restricted == "" {
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
		if err := r.RBACManager.EnsureServiceAccountWithRBAC(
			ctx,
			dgdr.Namespace,
			ServiceAccountProfilingJob,
			r.Config.RBAC.DGDRProfilingClusterRoleName,
		); err != nil {
			logger.Error(err, "Failed to ensure profiling job RBAC")
			return fmt.Errorf("failed to ensure profiling job RBAC: %w", err)
		}
	}

1168
1169
1170
1171
	// Enrich hardware from GPU discovery before marshalling the spec.
	// This fills in gpuSku, vramMb, numGpusPerNode if the user didn't set them.
	if err := r.enrichHardwareFromDiscovery(ctx, dgdr); err != nil {
		logger.Info("GPU discovery not available, proceeding without enrichment", "reason", err.Error())
1172
1173
	}

1174
1175
1176
1177
1178
	// Use SyncResource to create/update the job
	modified, job, err := commonController.SyncResource(ctx, r, dgdr, func(ctx context.Context) (*batchv1.Job, bool, error) {
		jobName := getProfilingJobName(dgdr)
		outputConfigMapName := getOutputConfigMapName(dgdr)

1179
1180
		// Marshal the DGDR spec to JSON — the profiler receives the spec verbatim
		specJSON, err := marshalDGDRSpec(dgdr)
1181
		if err != nil {
1182
			return nil, false, err
1183
		}
1184
1185

		// Common environment variables
1186
		profilerEnv := []corev1.EnvVar{
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
			{
				Name: "HUGGING_FACE_HUB_TOKEN",
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: "hf-token-secret",
						},
						Key: "HF_TOKEN",
					},
				},
			},
			{
				Name:  "NATS_SERVER",
				Value: fmt.Sprintf("nats://%s-nats:4222", dgdr.Namespace),
			},
			{
				Name:  "ETCD_ENDPOINTS",
				Value: fmt.Sprintf("%s-etcd:2379", dgdr.Namespace),
			},
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
			// DGDR metadata for setting ownerReferences
			{
				Name:  "DGDR_NAME",
				Value: dgdr.Name,
			},
			{
				Name:  "DGDR_NAMESPACE",
				Value: dgdr.Namespace,
			},
			{
				Name:  "DGDR_UID",
				Value: string(dgdr.UID),
			},
1219
1220
		}

1221
		// Build volume mounts
1222
1223
1224
1225
1226
1227
1228
		volumeMounts := []corev1.VolumeMount{
			{
				Name:      VolumeNameProfilingOutput,
				MountPath: ProfilingOutputPath,
			},
		}

1229
		// Add model cache PVC mount if configured
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
		modelCachePVC, modelCacheMountPath := extractModelCachePVCConfig(dgdr)
		if modelCachePVC != "" {
			logger.Info("Mounting model cache PVC to profiler pod", "pvc", modelCachePVC, "mountPath", modelCacheMountPath)
			volumeMounts = append(volumeMounts, corev1.VolumeMount{
				Name:      VolumeNameModelCache,
				MountPath: modelCacheMountPath,
				ReadOnly:  true,
			})
		}

1240
1241
1242
1243
1244
1245
1246
1247
		// v1alpha1 round-trip: mount ConfigMap if referenced via annotation
		cmRef := configMapRefFromAnnotation(dgdr)
		if cmRef != nil {
			volumeMounts = append(volumeMounts, corev1.VolumeMount{
				Name:      VolumeNameProfilingConfig,
				MountPath: ProfilingConfigMountPath,
				ReadOnly:  true,
			})
1248
1249
		}

1250
		// Profiler args: pass the DGDR spec as JSON via --config
1251
1252
		// --output-dir must match ProfilingOutputPath so the sidecar can find profiler_status.yaml
		profilerArgs := []string{"--config", specJSON, "--output-dir", ProfilingOutputPath}
1253

1254
1255
		// Use image from spec; the defaulting webhook fills this in for production builds.
		// Guard against empty image in case the webhook didn't run (e.g. local dev builds).
1256
		imageName := dgdr.Spec.Image
1257
1258
1259
		if imageName == "" {
			return nil, false, fmt.Errorf("spec.image is required but not set; ensure the defaulting webhook ran or set spec.image explicitly")
		}
1260
1261
		logger.Info("Using profiler image", "image", imageName)

1262
		profilerContainer := corev1.Container{
1263
1264
			Name:         ContainerNameProfiler,
			Image:        imageName,
1265
			Command:      []string{"python", "-m", "dynamo.profiler"},
1266
			Args:         profilerArgs,
1267
1268
			Env:          profilerEnv,
			VolumeMounts: volumeMounts,
1269
			WorkingDir:   "/workspace",
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
		}

		// Generate sidecar script from template
		tmpl, err := template.New("sidecar").Parse(sidecarScriptTemplate)
		if err != nil {
			return nil, false, fmt.Errorf("failed to parse sidecar script template: %w", err)
		}

		var scriptBuf bytes.Buffer
		err = tmpl.Execute(&scriptBuf, map[string]string{
1280
1281
1282
1283
1284
			"OutputPath":    ProfilingOutputPath,
			"OutputFile":    ProfilingOutputFile,
			"ConfigMapName": outputConfigMapName,
			"Namespace":     dgdr.Namespace,
			"DGDRName":      dgdr.Name,
1285
			"DGDRuid":       string(dgdr.UID),
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
		})
		if err != nil {
			return nil, false, fmt.Errorf("failed to execute sidecar script template: %w", err)
		}

		sidecarContainer := corev1.Container{
			Name:    ContainerNameOutputCopier,
			Image:   SidecarImage,
			Command: []string{"/bin/sh", "-c"},
			Args:    []string{scriptBuf.String()},
			VolumeMounts: []corev1.VolumeMount{{
				Name:      VolumeNameProfilingOutput,
				MountPath: ProfilingOutputPath,
				ReadOnly:  true,
			}},
		}

1303
1304
		// Use PVC for profiling output if round-tripped v1alpha1 annotation is present,
		// otherwise use emptyDir (v1beta1 default).
1305
		var profilingOutputVolume corev1.Volume
1306
1307
		if outputPVC := outputPVCFromAnnotation(dgdr); outputPVC != "" {
			logger.Info("Using PVC for profiling output (from v1alpha1 annotation)", "pvc", outputPVC)
1308
1309
1310
1311
			profilingOutputVolume = corev1.Volume{
				Name: VolumeNameProfilingOutput,
				VolumeSource: corev1.VolumeSource{
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1312
						ClaimName: outputPVC,
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
					},
				},
			}
		} else {
			profilingOutputVolume = corev1.Volume{
				Name: VolumeNameProfilingOutput,
				VolumeSource: corev1.VolumeSource{
					EmptyDir: &corev1.EmptyDirVolumeSource{},
				},
			}
		}
		volumes := []corev1.Volume{profilingOutputVolume}
1325

1326
1327
		// Add model cache PVC volume if configured
		if modelCachePVC != "" {
1328
			volumes = append(volumes, corev1.Volume{
1329
				Name: VolumeNameModelCache,
1330
				VolumeSource: corev1.VolumeSource{
1331
1332
1333
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
						ClaimName: modelCachePVC,
						ReadOnly:  true,
1334
1335
1336
1337
1338
					},
				},
			})
		}

1339
1340
1341
1342
1343
1344
		// v1alpha1 round-trip: add ConfigMap volume if referenced via annotation
		if cmRef != nil {
			cmKey := cmRef.Key
			if cmKey == "" {
				cmKey = ProfilingConfigDefaultKey
			}
1345
			volumes = append(volumes, corev1.Volume{
1346
				Name: VolumeNameProfilingConfig,
1347
				VolumeSource: corev1.VolumeSource{
1348
1349
1350
1351
1352
1353
1354
1355
					ConfigMap: &corev1.ConfigMapVolumeSource{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: cmRef.Name,
						},
						Items: []corev1.KeyToPath{{
							Key:  cmKey,
							Path: ProfilingConfigDefaultKey,
						}},
1356
1357
1358
1359
1360
					},
				},
			})
		}

1361
1362
1363
		// Limit retries to prevent infinite loop
		backoffLimit := int32(3)

1364
1365
1366
1367
		podSpec := corev1.PodSpec{
			ServiceAccountName: ServiceAccountProfilingJob,
			RestartPolicy:      corev1.RestartPolicyNever,
			SecurityContext: &corev1.PodSecurityContext{
1368
1369
1370
1371
				RunAsNonRoot: ptr.To(true),
				RunAsUser:    ptr.To[int64](1000),
				RunAsGroup:   ptr.To[int64](1000),
				FSGroup:      ptr.To[int64](1000),
1372
1373
1374
1375
1376
1377
1378
1379
			},
			Containers: []corev1.Container{profilerContainer, sidecarContainer},
			Volumes:    volumes,
			ImagePullSecrets: []corev1.LocalObjectReference{
				{Name: "nvcr-imagepullsecret"},
			},
		}

1380
1381
1382
1383
1384
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Name:      jobName,
				Namespace: dgdr.Namespace,
				Labels: map[string]string{
1385
1386
1387
					nvidiacomv1beta1.LabelApp:       nvidiacomv1beta1.LabelValueDynamoProfiler,
					nvidiacomv1beta1.LabelDGDR:      dgdr.Name,
					nvidiacomv1beta1.LabelManagedBy: nvidiacomv1beta1.LabelValueDynamoOperator,
1388
1389
1390
1391
1392
				},
			},
			Spec: batchv1.JobSpec{
				BackoffLimit: &backoffLimit,
				Template: corev1.PodTemplateSpec{
1393
					Spec: podSpec,
1394
1395
1396
1397
				},
			},
		}

1398
1399
1400
1401
1402
		var jobOverrides *batchv1.JobSpec
		if dgdr.Spec.Overrides != nil {
			jobOverrides = dgdr.Spec.Overrides.ProfilingJob
		}
		applyProfilingJobOverrides(job, jobOverrides)
1403

1404
1405
1406
1407
1408
1409
1410
1411
		return job, false, nil
	})

	if err != nil {
		return err
	}

	if modified {
1412
		logger.Info("Profiling job created/updated", "job", job.Name)
1413
1414
	}

1415
1416
1417
	// Store the job name in status for observability
	dgdr.Status.ProfilingJobName = job.Name

1418
1419
1420
	return nil
}

1421
1422
1423
1424
1425
1426
// marshalDGDRSpec encodes dgdr.Spec as JSON and returns it as a string. This is
// the value passed to the profiler via --config: the spec is handed over
// verbatim, with no bespoke key mapping. A marshalling failure is returned
// wrapped, together with an empty string.
func marshalDGDRSpec(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, error) {
	encoded, marshalErr := json.Marshal(dgdr.Spec)
	if marshalErr == nil {
		return string(encoded), nil
	}
	return "", fmt.Errorf("failed to marshal DGDR spec to JSON: %w", marshalErr)
}
1430

1431
1432
1433
1434
1435
1436
1437
// enrichHardwareFromDiscovery fills in hardware fields that the user didn't set.
// Called before marshalDGDRSpec(). Mutates dgdr.Spec.Hardware in-place (memory only, not persisted).
func (r *DynamoGraphDeploymentRequestReconciler) enrichHardwareFromDiscovery(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	if dgdr.Spec.Hardware == nil {
		dgdr.Spec.Hardware = &nvidiacomv1beta1.HardwareSpec{}
	}
	hw := dgdr.Spec.Hardware
1438

1439
	if hw.GPUSKU != "" && hw.VRAMMB != nil && hw.NumGPUsPerNode != nil {
hhzhang16's avatar
hhzhang16 committed
1440
		return nil // all fields already set by user; TotalGPUs is filled below when discovery runs
1441
1442
	}

1443
	var gpuInfo *gpu.GPUInfo
1444
	logger := log.FromContext(ctx)
1445
1446
1447
1448
1449
	// Check if user provided hardware info in the typed spec
	hasManualConfig := dgdr.Spec.Hardware != nil && (dgdr.Spec.Hardware.GPUSKU != "" ||
		dgdr.Spec.Hardware.VRAMMB != nil ||
		dgdr.Spec.Hardware.NumGPUsPerNode != nil)
	if !hasManualConfig {
1450

1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
		logger.Info("Attempting GPU discovery for profiling job")
		discoveredInfo, err := r.GPUDiscovery.DiscoverGPUsFromDCGM(ctx, r.APIReader, r.GPUDiscoveryCache)
		if err != nil {
			// This path is expected for namespace-restricted operators without node read permissions
			// Refine the logger message
			reason := GetGPUDiscoveryFailureReason(err)
			logger.Info("GPU discovery not available, using manual hardware configuration from profiling config",
				"reason", reason, "error", err.Error())
			return err
		} else {
			gpuInfo = discoveredInfo
			logger.Info("GPU discovery completed successfully",
				"gpusPerNode", gpuInfo.GPUsPerNode,
				"nodesWithGPUs", gpuInfo.NodesWithGPUs,
				"totalGpus", gpuInfo.GPUsPerNode*gpuInfo.NodesWithGPUs,
				"model", gpuInfo.Model,
				"vramMiB", gpuInfo.VRAMPerGPU,
				"system", gpuInfo.System,
				"cloudprovider", gpuInfo.CloudProvider)
		}
	}
1472
	if hw.GPUSKU == "" {
1473
1474
1475
1476
1477
1478
		if gpuInfo.System != "" {
			hw.GPUSKU = gpuInfo.System
		} else {
			// Unknown GPU type: use raw model name; profiler will attempt naive config generation.
			hw.GPUSKU = nvidiacomv1beta1.GPUSKUType(gpuInfo.Model)
		}
1479
1480
1481
1482
	}
	if hw.VRAMMB == nil {
		vram := float64(gpuInfo.VRAMPerGPU)
		hw.VRAMMB = &vram
1483
	}
1484
1485
1486
1487
	if hw.NumGPUsPerNode == nil {
		n := int32(gpuInfo.GPUsPerNode)
		hw.NumGPUsPerNode = &n
	}
hhzhang16's avatar
hhzhang16 committed
1488
	if hw.TotalGPUs == nil {
1489
1490
1491
		// TODO: This is a temporary limit to prevent the profiler from using too many GPUs.
		// Will be removed once a fix is in the Profiler/AIC.
		const defaultMaxAutoGPUs = int32(32)
hhzhang16's avatar
hhzhang16 committed
1492
		total := int32(gpuInfo.GPUsPerNode * gpuInfo.NodesWithGPUs)
1493
1494
1495
1496
1497
		if total > defaultMaxAutoGPUs {
			logger.Info("Capping auto-discovered TotalGPUs at default limit; set hardware.totalGpus to override",
				"discovered", total, "cap", defaultMaxAutoGPUs)
			total = defaultMaxAutoGPUs
		}
hhzhang16's avatar
hhzhang16 committed
1498
1499
		hw.TotalGPUs = &total
	}
1500
1501
	return nil
}
1502

1503
1504
1505
1506
// extractModelCachePVCConfig returns the model cache PVC name and mount path
// from the typed v1beta1 spec. Both values are empty when no model cache PVC is
// configured; an unset mount path falls back to DefaultModelCacheMountPath.
func extractModelCachePVCConfig(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, string) {
	cache := dgdr.Spec.ModelCache
	if cache == nil || cache.PVCName == "" {
		return "", ""
	}
	if cache.PVCMountPath != "" {
		return cache.PVCName, cache.PVCMountPath
	}
	return cache.PVCName, DefaultModelCacheMountPath
}

// configMapKeySelector mirrors v1alpha1.ConfigMapKeySelector for annotation deserialization.
// It is the JSON shape that configMapRefFromAnnotation decodes from the
// AnnotationConfigMapRef annotation value.
type configMapKeySelector struct {
	// Name is read from the "name" JSON field.
	Name string `json:"name"`
	// Key is read from the optional "key" JSON field (omitted when empty on encode).
	Key  string `json:"key,omitempty"`
}

// configMapRefFromAnnotation decodes the ConfigMap reference carried in the
// round-trip annotation. It returns nil when the annotation is absent or empty
// (native v1beta1 resources) and also when the annotation value is not valid JSON.
func configMapRefFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) *configMapKeySelector {
	// Indexing a nil map yields the zero value, so a missing annotation map and
	// a missing key are both covered by the empty-string check.
	encoded := dgdr.Annotations[AnnotationConfigMapRef]
	if encoded == "" {
		return nil
	}
	selector := new(configMapKeySelector)
	if json.Unmarshal([]byte(encoded), selector) != nil {
		return nil
	}
	return selector
}
1538

1539
1540
1541
1542
1543
1544
1545
// outputPVCFromAnnotation returns the output PVC name stored in the round-trip
// annotation, or "" when the annotation is not present (native v1beta1
// resources, which always use an emptyDir).
func outputPVCFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
	if annotations := dgdr.Annotations; annotations != nil {
		return annotations[AnnotationOutputPVC]
	}
	return ""
}

1548
// checkProfilingJobStatus checks if the profiling job has completed
1549
func (r *DynamoGraphDeploymentRequestReconciler) checkProfilingJobStatus(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (bool, error) {
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
	logger := log.FromContext(ctx)
	jobName := getProfilingJobName(dgdr)

	job := &batchv1.Job{}
	if err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: dgdr.Namespace}, job); err != nil {
		return false, err
	}

	// Check job conditions
	for _, condition := range job.Status.Conditions {
		if condition.Type == batchv1.JobComplete && condition.Status == corev1.ConditionTrue {
			logger.Info("Profiling job completed", "job", jobName)
			return true, nil
		}
		if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue {
1565
1566
1567
1568
1569
			// Get detailed error from pod logs
			detailedError := r.getProfilingJobErrorDetails(ctx, dgdr, job)
			if detailedError != "" {
				return false, fmt.Errorf("profiling job failed: %s. Details: %s", condition.Message, detailedError)
			}
1570
1571
1572
1573
1574
1575
1576
			return false, fmt.Errorf("profiling job failed: %s", condition.Message)
		}
	}

	return false, nil
}

1577
// getProfilingJobErrorDetails retrieves detailed error information from failed profiling job pods
1578
func (r *DynamoGraphDeploymentRequestReconciler) getProfilingJobErrorDetails(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, job *batchv1.Job) string {
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
	logger := log.FromContext(ctx)

	// List pods owned by this job
	podList := &corev1.PodList{}
	labelSelector := client.MatchingLabels{
		"job-name": job.Name,
	}

	if err := r.List(ctx, podList, client.InNamespace(dgdr.Namespace), labelSelector); err != nil {
		logger.Error(err, "Failed to list pods for profiling job")
		return ""
	}

	// Look for failed pods and extract error details
	for _, pod := range podList.Items {
		// Check pod phase and container statuses
		if pod.Status.Phase == corev1.PodFailed {
			// Get profiler container status (first container)
			for _, containerStatus := range pod.Status.ContainerStatuses {
				if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Terminated != nil {
					terminated := containerStatus.State.Terminated
					// Construct detailed error message
					errorMsg := fmt.Sprintf("Pod: %s, Container: %s, ExitCode: %d, Reason: %s",
						pod.Name, containerStatus.Name, terminated.ExitCode, terminated.Reason)
					if terminated.Message != "" {
						errorMsg += fmt.Sprintf(", Message: %s", terminated.Message)
					}
					logger.Info("Retrieved profiling job error details", "error", errorMsg)
					return errorMsg
				}
			}

			// If no terminated state found, check waiting state
			for _, containerStatus := range pod.Status.ContainerStatuses {
				if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Waiting != nil {
					waiting := containerStatus.State.Waiting
					errorMsg := fmt.Sprintf("Pod: %s, Container: %s, Waiting - Reason: %s, Message: %s",
						pod.Name, containerStatus.Name, waiting.Reason, waiting.Message)
					logger.Info("Retrieved profiling job waiting details", "error", errorMsg)
					return errorMsg
				}
			}
		}
	}

	return ""
}

1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
// computeDGDName picks the Kubernetes name for the DGD owned by a DGDR.
//
// An explicit name supplied through spec.overrides.dgd.metadata.name wins and
// is returned verbatim. In every other case (no override, unparsable override,
// or empty name) the result is the DGDR name plus a "-dgd" suffix, which keeps
// names unique even when two DGDRs have identical specs and would otherwise
// share the same profiler-generated name (e.g. "vllm-agg").
func computeDGDName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
	fallback := dgdr.Name + "-dgd"

	overrides := dgdr.Spec.Overrides
	if overrides == nil || overrides.DGD == nil || len(overrides.DGD.Raw) == 0 {
		return fallback
	}

	// Decode only the metadata.name field of the raw override document.
	var override struct {
		Metadata struct {
			Name string `json:"name"`
		} `json:"metadata"`
	}
	if err := json.Unmarshal(overrides.DGD.Raw, &override); err != nil || override.Metadata.Name == "" {
		return fallback
	}
	return override.Metadata.Name
}

1646
1647
1648
1649
// generateDGDSpec reads profiling output from the sidecar ConfigMap, extracts the
// DynamoGraphDeployment spec and pareto configs, stores the spec in an annotation via
// r.Update, and returns the ProfilingResultsStatus and DGD name.
//
// Steps, in order:
//  1. Fetch the output ConfigMap and read the ProfilingOutputFile key.
//  2. Parse the (possibly multi-document) YAML into a DGD plus additional resources.
//  3. Rename the DGD to the DGDR-scoped name from computeDGDName.
//  4. Persist additional resources (if any) in an annotation, then refetch the DGDR.
//  5. Populate Pareto from "webui_data.json" when present (parse failures are logged and skipped).
//  6. Store the DGD as JSON in SelectedConfig and as YAML in the
//     "nvidia.com/generated-dgd-spec" annotation, then update the DGDR.
//
// Returns an error when the ConfigMap or key is missing, the YAML contains no
// DGD, marshaling fails, or any API call fails.
func (r *DynamoGraphDeploymentRequestReconciler) generateDGDSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (*nvidiacomv1beta1.ProfilingResultsStatus, string, error) {
	logger := log.FromContext(ctx)
	logger.Info("Generating DGD spec from profiling results", "name", dgdr.Name, "backend", dgdr.Spec.Backend)

	// Read the generated spec from ConfigMap (created by sidecar)
	outputConfigMapName := getOutputConfigMapName(dgdr)
	cm := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, cm)

	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil, "", fmt.Errorf("output ConfigMap %s not found - profiling may not have completed yet", outputConfigMapName)
		}
		return nil, "", fmt.Errorf("failed to get output ConfigMap: %w", err)
	}

	// Profiler writes the selected config (real or mocker) to a single output file
	outputFile := ProfilingOutputFile

	// Get YAML content from ConfigMap
	yamlContent, exists := cm.Data[outputFile]
	if !exists {
		return nil, "", fmt.Errorf("key %s not found in ConfigMap %s", outputFile, outputConfigMapName)
	}

	logger.Info("Found profiling output in ConfigMap", "configMap", outputConfigMapName, "outputFile", outputFile, "size", len(yamlContent))

	// Extract DGD and any supporting resources from potentially multi-document YAML (ConfigMap + DGD)
	dgd, additionalResources, err := r.extractResourcesFromYAML([]byte(yamlContent))
	if err != nil {
		return nil, "", fmt.Errorf("failed to extract DGD from %s: %w", outputFile, err)
	}

	// Override the profiler-generated name with a DGDR-scoped unique name.
	// The profiler emits a static topology-derived name (e.g. "vllm-agg") which
	// collides when multiple DGDRs share identical specs. Derive the name from
	// DGDR identity instead, respecting an explicit override if the user set one.
	// The profiler's name is captured first so the log line below reports what
	// the profiler actually emitted rather than the already-overridden value.
	profilerDGDName := dgd.Name
	dgd.Name = computeDGDName(dgdr)

	logger.Info("Parsed profiling output", "profilerDGDName", profilerDGDName, "dgdName", dgd.Name, "additionalResources", len(additionalResources))

	if len(additionalResources) > 0 {
		if err := r.storeAdditionalResources(ctx, dgdr, additionalResources); err != nil {
			logger.Error(err, "Failed to store additional resources")
			return nil, "", err
		}
		// storeAdditionalResources calls r.Update internally, bumping resourceVersion.
		// Refetch so the subsequent r.Update for the spec annotation doesn't 409.
		if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
			return nil, "", fmt.Errorf("failed to refetch DGDR after storing additional resources: %w", err)
		}
	}

	profilingResults := &nvidiacomv1beta1.ProfilingResultsStatus{}
	if webUIData, ok := cm.Data["webui_data.json"]; ok {
		// Pareto data is optional: a parse failure is logged and does not fail the reconcile.
		pareto, err := extractParetoFromWebUIData([]byte(webUIData))
		if err != nil {
			logger.Error(err, "Failed to parse webui_data.json; skipping pareto population")
		} else {
			profilingResults.Pareto = pareto
			logger.Info("Populated ProfilingResults.Pareto", "count", len(pareto))
		}
	}

	// Store the generated DGD in ProfilingResults.SelectedConfig
	dgdJSON, err := json.Marshal(dgd)
	if err != nil {
		return nil, "", fmt.Errorf("failed to marshal generated DGD to JSON: %w", err)
	}
	profilingResults.SelectedConfig = &runtime.RawExtension{Raw: dgdJSON}

	// Serialize the DGD spec to an annotation so createDGD can retrieve it
	dgdBytes, err := sigsyaml.Marshal(dgd)
	if err != nil {
		return nil, "", fmt.Errorf("failed to marshal generated DGD: %w", err)
	}
	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations["nvidia.com/generated-dgd-spec"] = string(dgdBytes)

	if err := r.Update(ctx, dgdr); err != nil {
		return nil, "", fmt.Errorf("failed to update DGDR with generated DGD annotation: %w", err)
	}
	return profilingResults, dgd.Name, nil
}

// extractParetoFromWebUIData decodes webui_data.json and collects the
// Pareto-optimal deployment configurations listed in its cost table. The last
// column of each row ("Action") holds a partial DynamoGraphDeployment YAML
// snippet, which is converted to JSON and wrapped in a ParetoConfig.
//
// Rows that are too short, whose action cell is not a JSON string, whose YAML
// does not parse, or that decode to an empty object are skipped silently.
// Returns (nil, nil) when the table has no rows, and an error only when the
// top-level document is not valid JSON.
func extractParetoFromWebUIData(data []byte) ([]nvidiacomv1beta1.ParetoConfig, error) {
	// Row layout: [TTFT(ms), PrefillThpt, ITL(ms), DecodeThpt, TokensPerUser, GPUHours, ActionYAML]
	const (
		actionColumnIndex = 6
		minColumns        = actionColumnIndex + 1
	)

	var doc struct {
		Cost struct {
			Table struct {
				Data [][]json.RawMessage `json:"data"`
			} `json:"table"`
		} `json:"cost"`
	}
	if err := json.Unmarshal(data, &doc); err != nil {
		return nil, fmt.Errorf("failed to unmarshal webui_data.json: %w", err)
	}

	table := doc.Cost.Table.Data
	if len(table) == 0 {
		return nil, nil
	}

	configs := make([]nvidiacomv1beta1.ParetoConfig, 0, len(table))
	for _, cells := range table {
		if len(cells) < minColumns {
			continue
		}

		// The action cell is a JSON string containing YAML.
		var snippet string
		if json.Unmarshal(cells[actionColumnIndex], &snippet) != nil {
			continue
		}

		var parsed map[string]interface{}
		if sigsyaml.Unmarshal([]byte(stripYAMLComments(snippet)), &parsed) != nil {
			continue
		}
		if len(parsed) == 0 {
			continue
		}

		encoded, err := json.Marshal(parsed)
		if err != nil {
			continue
		}
		configs = append(configs, nvidiacomv1beta1.ParetoConfig{
			Config: runtime.RawExtension{Raw: encoded},
		})
	}

	return configs, nil
}

// stripYAMLComments drops every line whose first non-blank character (after
// leading spaces and tabs) is '#', and returns the remaining lines joined with
// "\n" in their original order. The profiler prefixes action snippets with such
// comment lines. Trailing "# ..." text on a content line is left untouched.
func stripYAMLComments(s string) string {
	var b strings.Builder
	b.Grow(len(s))

	wrote := false
	for remaining, more := s, true; more; {
		var line string
		// Cut yields the same line sequence as splitting on "\n", including a
		// final empty line when s ends with a newline.
		line, remaining, more = strings.Cut(remaining, "\n")
		if strings.HasPrefix(strings.TrimLeft(line, " \t"), "#") {
			continue
		}
		if wrote {
			b.WriteByte('\n')
		}
		b.WriteString(line)
		wrote = true
	}
	return b.String()
}

1810
1811
// storeAdditionalResources marshals additional resources to YAML and stores them in DGDR annotations.
// Validates annotation size and fails gracefully if too large.
1812
func (r *DynamoGraphDeploymentRequestReconciler) storeAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, resources []*unstructured.Unstructured) error {
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
	if len(resources) == 0 {
		return nil
	}

	var resourcesYAML []byte

	for i, res := range resources {
		resYAML, err := sigsyaml.Marshal(res.Object)
		if err != nil {
			return fmt.Errorf("failed to marshal resource %s/%s: %w", res.GetKind(), res.GetName(), err)
		}
		if i > 0 {
			resourcesYAML = append(resourcesYAML, []byte("\n---\n")...)
		}
		resourcesYAML = append(resourcesYAML, resYAML...)
	}

	// Validate size before storing
	if len(resourcesYAML) > MaxAnnotationSize {
		return fmt.Errorf("additional resources YAML size (%d bytes) exceeds maximum annotation size (%d bytes); "+
			"consider reducing the number of resources or storing them separately",
			len(resourcesYAML), MaxAnnotationSize)
	}

	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations[AnnotationAdditionalResources] = string(resourcesYAML)

	return r.Update(ctx, dgdr)
}

// extractResourcesFromYAML parses multi-document YAML from profiling output,
// extracting the DynamoGraphDeployment and any ConfigMaps that should be deployed with it.
1847
func (r *DynamoGraphDeploymentRequestReconciler) extractResourcesFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, []*unstructured.Unstructured, error) {
1848
1849
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(yamlContent), 4096)

1850
	var dgd *dgdv1alpha1.DynamoGraphDeployment
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
	var additionalResources []*unstructured.Unstructured

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			// Skip invalid documents and continue
			continue
		}

		// Skip empty objects
		if obj.GetKind() == "" {
			continue
		}

		if obj.GetKind() == "DynamoGraphDeployment" {
1869
			dgd = &dgdv1alpha1.DynamoGraphDeployment{}
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, dgd); err != nil {
				return nil, nil, fmt.Errorf("failed to convert to DynamoGraphDeployment: %w", err)
			}
		} else {
			// Store ConfigMaps or other resources for deployment
			additionalResources = append(additionalResources, obj)
		}
	}

	if dgd == nil {
		return nil, nil, fmt.Errorf("no DynamoGraphDeployment found in YAML content")
	}

	return dgd, additionalResources, nil
}

// extractDGDFromYAML is a convenience wrapper that extracts only the DGD (used by tests)
1887
func (r *DynamoGraphDeploymentRequestReconciler) extractDGDFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, error) {
1888
1889
1890
1891
	dgd, _, err := r.extractResourcesFromYAML(yamlContent)
	return dgd, err
}

1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
// updateDeploymentInfo refreshes status.deploymentInfo on the DGDR with the
// replica totals summed over every service in the DGD status. Services without
// an AvailableReplicas value contribute nothing to the available total.
//
// It returns true when the status was rewritten and false when the stored
// totals already match the freshly computed ones.
func updateDeploymentInfo(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, dgd *dgdv1alpha1.DynamoGraphDeployment) bool {
	var desired, available int32
	for _, service := range dgd.Status.Services {
		desired += service.Replicas
		if ready := service.AvailableReplicas; ready != nil {
			available += *ready
		}
	}

	// Nothing to do when both stored totals are present and already equal.
	info := dgdr.Status.DeploymentInfo
	unchanged := info != nil &&
		info.Replicas != nil && info.AvailableReplicas != nil &&
		*info.Replicas == desired && *info.AvailableReplicas == available
	if unchanged {
		return false
	}

	dgdr.Status.DeploymentInfo = &nvidiacomv1beta1.DeploymentInfoStatus{
		Replicas:          &desired,
		AvailableReplicas: &available,
	}
	return true
}

// setSucceededCondition records the aggregate Succeeded condition that
// corresponds to the given phase. Only the Ready and Deployed phases map to a
// true condition; every other phase, including unrecognized ones, maps to false.
func setSucceededCondition(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase) {
	// Start from the values used for an unrecognized phase and refine below.
	cond := metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
		Status:             metav1.ConditionFalse,
		ObservedGeneration: dgdr.Generation,
		Reason:             "Unknown",
		Message:            "Unknown phase",
	}

	switch phase {
	case nvidiacomv1beta1.DGDRPhasePending, "":
		cond.Reason, cond.Message = "Pending", "DGDR is pending"
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		cond.Reason, cond.Message = "Profiling", "Profiling is in progress"
	case nvidiacomv1beta1.DGDRPhaseReady:
		cond.Status = metav1.ConditionTrue
		cond.Reason, cond.Message = "SpecGenerated", "Profiling complete, spec available"
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		cond.Reason, cond.Message = "Deploying", "Deployment is in progress"
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		cond.Status = metav1.ConditionTrue
		cond.Reason, cond.Message = "Deployed", "Deployment is healthy"
	case nvidiacomv1beta1.DGDRPhaseFailed:
		cond.Reason, cond.Message = "Failed", "DGDR has failed"
	}

	meta.SetStatusCondition(&dgdr.Status.Conditions, cond)
}

// updatePhaseAndRequeue sets the DGDR phase together with the matching
// Succeeded condition, writes the status subresource, and asks for an
// immediate requeue. The message is only logged. A failed status update is
// returned as-is with an empty result.
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseAndRequeue(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase, message string) (ctrl.Result, error) {
	log.FromContext(ctx).Info("Updating DGDR phase", "name", dgdr.Name, "phase", phase, "message", message)

	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)

	updateErr := r.Status().Update(ctx, dgdr)
	if updateErr == nil {
		return ctrl.Result{Requeue: true}, nil
	}
	return ctrl.Result{}, updateErr
}

1959
1960
// updatePhaseWithCondition updates phase and adds/updates a condition
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseWithCondition(
1961
	ctx context.Context,
1962
1963
	dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
	phase nvidiacomv1beta1.DGDRPhase,
1964
1965
1966
1967
1968
	conditionType string,
	status metav1.ConditionStatus,
	reason string,
	message string,
) (ctrl.Result, error) {
1969
1970
	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992

	condition := metav1.Condition{
		Type:               conditionType,
		Status:             status,
		ObservedGeneration: dgdr.Generation,
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}

	dgdr.AddStatusCondition(condition)

	if err := r.Status().Update(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{Requeue: true}, nil
}

// SetupWithManager sets up the controller with the Manager
func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
1993
		For(&nvidiacomv1beta1.DynamoGraphDeploymentRequest{}).
1994
		Named(consts.ResourceTypeDynamoGraphDeploymentRequest).
1995
1996
1997
1998
1999
2000
2001
		Owns(&batchv1.Job{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the job
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})). // Watch Jobs created by this controller (via ownerReference)
2002
		// Watch DGDs created by this controller (via label)
2003
		Watches(
2004
			&dgdv1alpha1.DynamoGraphDeployment{},
2005
2006
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				// Find DGDR by label instead of owner reference
2007
2008
2009
				dgd := obj.(*dgdv1alpha1.DynamoGraphDeployment)
				dgdrName, hasName := dgd.Labels[nvidiacomv1beta1.LabelDGDRName]
				dgdrNamespace, hasNamespace := dgd.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
				if !hasName || !hasNamespace {
					return nil
				}
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{
						Name:      dgdrName,
						Namespace: dgdrNamespace,
					},
				}}
			}),
			builder.WithPredicates(predicate.Funcs{
				// ignore creation cause we don't want to be called again after we create the DGD
				CreateFunc:  func(ce event.CreateEvent) bool { return false },
				DeleteFunc:  func(de event.DeleteEvent) bool { return true },
				UpdateFunc:  func(ue event.UpdateEvent) bool { return true },
				GenericFunc: func(ge event.GenericEvent) bool { return true },
			}),
2027
		).
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
		// Watch output ConfigMaps for profiling sub-phase updates (via label)
		Watches(
			&corev1.ConfigMap{},
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				// Only trigger for ConfigMaps with DGDR labels (written by the sidecar)
				cm := obj.(*corev1.ConfigMap)
				dgdrName, hasName := cm.Labels[nvidiacomv1beta1.LabelDGDRName]
				dgdrNamespace, hasNamespace := cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
				if !hasName || !hasNamespace {
					return nil
				}
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{
						Name:      dgdrName,
						Namespace: dgdrNamespace,
					},
				}}
			}),
			builder.WithPredicates(predicate.Funcs{
				CreateFunc: func(ce event.CreateEvent) bool {
					labels := ce.Object.GetLabels()
					_, hasName := labels[nvidiacomv1beta1.LabelDGDRName]
					_, hasNamespace := labels[nvidiacomv1beta1.LabelDGDRNamespace]
					return hasName && hasNamespace
				},
				UpdateFunc: func(ue event.UpdateEvent) bool {
					labels := ue.ObjectNew.GetLabels()
					_, hasName := labels[nvidiacomv1beta1.LabelDGDRName]
					_, hasNamespace := labels[nvidiacomv1beta1.LabelDGDRNamespace]
					return hasName && hasNamespace
				},
				DeleteFunc:  func(de event.DeleteEvent) bool { return false },
				GenericFunc: func(ge event.GenericEvent) bool { return false },
			}),
		).
2063
2064
		// Set the event filter to ignore resources handled by other controllers in namespace-restricted mode
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)).
2065
		Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeploymentRequest))
2066
}