"...controller/dynamographdeploymentrequest_controller.go" did not exist on "6f708832acfea3a869b8522726f17e16476ad3ee"
dynamographdeploymentrequest_controller.go 76.9 KB
Newer Older
1
/*
2
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"bytes"
	"context"
23
24
	"encoding/json"
	"errors"
25
	"fmt"
26
	"io"
27
	"strings"
28
29
30
31
32
33
34
	"text/template"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36
37
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
38
	"k8s.io/apimachinery/pkg/util/yaml"
39
	"k8s.io/client-go/tools/record"
40
	"k8s.io/utils/ptr"
41
42
43
44
45
46
47
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
48
	sigsyaml "sigs.k8s.io/yaml"
49

50
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
51
52
	dgdv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	nvidiacomv1beta1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1beta1"
53
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
54
	commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
55
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/gpu"
56
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
)

const (
	// Job naming: prefixes for the profiling Job name, one per profiling mode
	// (online vs. AIC).
	JobNamePrefixOnline = "profile-online-"
	JobNamePrefixAIC    = "profile-aic-"

	// Container names used inside the profiling Job's pod.
	// The sidecar script looks up the "profiler" container by this name when
	// checking whether it has terminated.
	ContainerNameProfiler     = "profiler"
	ContainerNameOutputCopier = "output-copier"

	// ServiceAccount name for the profiling Job.
	ServiceAccountProfilingJob = "dgdr-profiling-job"

	// ConfigMap naming: prefix of the output ConfigMap (dgdr-output-<name>).
	ConfigMapOutputPrefix = "dgdr-output-"

74
75
76
	// Annotation keys
	AnnotationAdditionalResources = "dgdr.nvidia.com/additional-resources"

77
78
79
80
81
82
	// Annotation keys for v1alpha1 round-trip compatibility.
	// The conversion layer stores v1alpha1 fields that have no v1beta1 spec equivalent
	// as annotations so the controller can still honour them for converted resources.
	AnnotationConfigMapRef = "nvidia.com/dgdr-config-map-ref"
	AnnotationOutputPVC    = "nvidia.com/dgdr-output-pvc"

83
84
85
	// Size limits
	MaxAnnotationSize = 250000 // ~250KB, below K8s 256KB limit

86
87
88
89
90
	// Sidecar image
	// NOTE(review): this references the mutable "latest" tag, so the image that
	// actually runs can change between pulls — consider pinning a version.
	SidecarImage = "bitnami/kubectl:latest"

	// Volume names
	VolumeNameProfilingOutput = "profiling-output"
91
	VolumeNameProfilingConfig = "profiling-config"
92
	VolumeNameModelCache      = "model-cache"
93
94

	// Volume paths
95
	ProfilingOutputPath        = "/data"
96
	ProfilingOutputFile        = "final_config.yaml"
97
98
	ProfilingConfigMountPath   = "/config"
	ProfilingConfigDefaultKey  = "disagg.yaml"
99
	DefaultModelCacheMountPath = "/opt/model-cache"
100
101
102
103
104
105
106
107
108
109

	// Command line arguments
	ArgModel   = "--model"
	ArgBackend = "--backend"
	ArgTTFT    = "--ttft"
	ArgITL     = "--itl"
	ArgConfig  = "--config"

	// Messages
	// Human-readable texts for events and status conditions. Entries containing
	// %s are format strings and must be expanded with fmt.Sprintf before use.
	MessageInitialized               = "DGDR initialized successfully"
110
	MessageDiscoveringHardware       = "Discovering GPU hardware and preparing profiling job"
111
112
113
114
	MessageProfilingJobCreated       = "Profiling job created"
	MessageAICProfilingJobCreated    = "AIC profiling job created"
	MessageProfilingInProgress       = "Profiling is in progress"
	MessageSpecGenerated             = "DynamoGraphDeployment spec generated successfully"
115
	MessageSpecAvailable             = "Generated spec is available in annotation nvidia.com/generated-dgd-spec"
116
117
118
119
120
	MessageDeploymentCreated         = "DynamoGraphDeployment %s created successfully"
	MessageDeploymentReady           = "DynamoGraphDeployment %s is ready"
	MessageDeploymentDegraded        = "DynamoGraphDeployment %s degraded from Ready to %s"
	MessageDeploymentDeleted         = "DGD %s was deleted. DGDR will not recreate it. Delete this DGDR and create a new one to redeploy."
	MessageInvalidState              = "Invalid state"
121
	MessageSpecChangeRejected        = "Cannot modify spec in phase '%s'. DynamoGraphDeploymentRequest is immutable once profiling starts. Create a new resource with a different name instead."
122
123
124
125
126
127
128
129
	// Despite the Message prefix, the CamelCase values below are passed as
	// event/condition *reasons* by the handlers in this file (e.g.
	// MessageJobCreationFailed, MessageProfilingCheckFailed, MessageGenerationFailed).
	MessageJobCreationFailed         = "JobCreationFailed"
	MessageDeploymentCreationFailed  = "DeploymentCreationFailed"
	MessageResultsRetrievalFailed    = "ResultsRetrievalFailed"
	MessageGenerationFailed          = "GenerationFailed"
	MessageAIConfiguratorCheckFailed = "AIConfiguratorCheckFailed"
	MessageProfilingCheckFailed      = "ProfilingCheckFailed"
	// Format strings for lookup failures.
	MessageConfigMapNotFound         = "ConfigMap %s not found in namespace %s"
	MessageConfigMapKeyNotFound      = "key %s not found in ConfigMap %s"
130
	MessageModelCachePVCNotFound     = "model cache PVC %s not found in namespace %s"
131
132
)

133
134
135
136
137
138
139
// sidecarScriptTemplate is the shell script template for the output copier sidecar.
//
// The sidecar is a continuous poller that:
//  1. During profiling: polls profiler_status.yaml every 10s and relays phase+message
//     to the output ConfigMap so the controller can track sub-phase progress.
//  2. After the profiler terminates: verifies the reported status, then writes the final
//     profiling output (final_config.yaml, profiler_status.yaml and, when present,
//     webui_data.json) to the same ConfigMap, preserving the phase+message keys.
140
141
142
const sidecarScriptTemplate = `
set -e
set -o pipefail
143

144
145
STATUS_FILE="{{.OutputPath}}/profiler_status.yaml"
LAST_PHASE=""
146
147
148
149
START_TIME=$(date +%s)
LAST_PROGRESS_LOG=$START_TIME
PROGRESS_INTERVAL=300

150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# relay_phase: read phase+message from profiler_status.yaml and write to ConfigMap.
# Only writes when the phase changes (debounce).
relay_phase() {
  if [ ! -f "$STATUS_FILE" ]; then
    return
  fi
  PHASE=$(grep "^phase:" "$STATUS_FILE" 2>/dev/null | awk '{print $2}' | tr -d '"' | tr -d "'" || true)
  MESSAGE=$(grep "^message:" "$STATUS_FILE" 2>/dev/null | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
  if [ -z "$PHASE" ] || [ "$PHASE" = "$LAST_PHASE" ]; then
    return
  fi
  echo "Phase update: $PHASE - $MESSAGE"
  cat >/tmp/progress.yaml <<PEOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{.ConfigMapName}}
  namespace: {{.Namespace}}
  labels:
    dgdr.nvidia.com/name: {{.DGDRName}}
    dgdr.nvidia.com/namespace: {{.Namespace}}
    nvidia.com/managed-by: dynamo-operator
data:
  phase: "$PHASE"
  message: "$MESSAGE"
PEOF
  kubectl apply -f /tmp/progress.yaml 2>/dev/null && LAST_PHASE="$PHASE" || echo "Warning: failed to update progress ConfigMap"
}

# Main loop: poll profiler_status.yaml and wait for profiler to terminate
echo "Waiting for profiler to complete..."
181
while true; do
182
183
184
  CURRENT_TIME=$(date +%s)
  ELAPSED=$((CURRENT_TIME - START_TIME))

185
186
187
  # Relay phase updates to ConfigMap
  relay_phase

188
189
190
191
192
193
194
195
196
197
  # Log progress every 5 minutes
  if [ $((CURRENT_TIME - LAST_PROGRESS_LOG)) -ge $PROGRESS_INTERVAL ]; then
    echo "Still waiting... ($(($ELAPSED / 60)) minutes elapsed)"
    LAST_PROGRESS_LOG=$CURRENT_TIME
  fi

  # Check if profiler container terminated
  CONTAINER_STATUS=$(kubectl get pod $HOSTNAME -n {{.Namespace}} -o jsonpath='{.status.containerStatuses[?(@.name=="profiler")].state}' 2>/dev/null || echo "")
  if echo "$CONTAINER_STATUS" | grep -q "terminated"; then
    echo "Profiler terminated (ran for $(($ELAPSED / 60)) minutes)"
198
199
    break
  fi
200
  sleep 10
201
202
done

203
204
205
# Final relay: pick up any last phase change written just before termination
relay_phase

206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# Check profiler status file (2 minute timeout)
echo "Checking profiler status..."
TIMEOUT=120
CHECK_START=$(date +%s)

# Wait for status file to exist
while [ ! -f "$STATUS_FILE" ]; do
  ELAPSED=$(($(date +%s) - CHECK_START))
  if [ $ELAPSED -ge $TIMEOUT ]; then
    echo "ERROR: Status file not found after ${TIMEOUT}s"
    exit 1
  fi
  sleep 2
done

# Read and parse status from YAML file
STATUS=$(grep "^status:" "$STATUS_FILE" | awk '{print $2}' | tr -d '"' | tr -d "'")

if [ -z "$STATUS" ]; then
  echo "ERROR: Invalid status file format"
  exit 1
fi

# Check status value
case "$STATUS" in
  success)
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'")
    echo "Profiler succeeded: $MESSAGE"
    ;;
  failed)
    ERROR=$(grep "^error:" "$STATUS_FILE" | sed 's/^error: *//' | tr -d '"' | tr -d "'")
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'")
    echo "ERROR: Profiler failed: ${ERROR:-$MESSAGE}"
    exit 1
    ;;
  running)
    echo "ERROR: Profiler still running (unexpected)"
    exit 1
    ;;
  *)
    echo "ERROR: Unknown status: $STATUS"
    exit 1
    ;;
esac

251
252
253
254
255
echo "Writing profiling output to ConfigMap..."

# Read final phase+message to preserve them alongside the profiling output
FINAL_PHASE=$(grep "^phase:" "$STATUS_FILE" 2>/dev/null | awk '{print $2}' | tr -d '"' | tr -d "'" || true)
FINAL_MESSAGE=$(grep "^message:" "$STATUS_FILE" 2>/dev/null | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
256

257
# Start building ConfigMap YAML with DGD spec + preserved phase/message
258
259
260
261
262
263
264
265
cat >/tmp/cm.yaml <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{.ConfigMapName}}
  namespace: {{.Namespace}}
  labels:
    dgdr.nvidia.com/name: {{.DGDRName}}
266
    dgdr.nvidia.com/namespace: {{.Namespace}}
267
268
    nvidia.com/managed-by: dynamo-operator
data:
269
270
  phase: "$FINAL_PHASE"
  message: "$FINAL_MESSAGE"
271
272
273
274
  {{.OutputFile}}: |
EOF
sed 's/^/    /' {{.OutputPath}}/{{.OutputFile}} >> /tmp/cm.yaml

275
276
277
278
279
280
# Add profiler status file for debugging
if [ -f {{.OutputPath}}/profiler_status.yaml ]; then
  echo "  profiler_status.yaml: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/profiler_status.yaml >> /tmp/cm.yaml
fi

281
282
283
284
285
286
# Add webui_data.json for pareto curve data (used by operator to populate status.profilingResults.pareto)
if [ -f {{.OutputPath}}/webui_data.json ]; then
  echo "  webui_data.json: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/webui_data.json >> /tmp/cm.yaml
fi

287
288
# Note: Profiling data (raw_data.npz converted to JSON) is included in the
# generated DGD YAML as a separate ConfigMap by the profiler, no need to add it here
289
290
291
292
293

kubectl apply -f /tmp/cm.yaml
echo "Saved profiling output to ConfigMap {{.ConfigMapName}}"
`

294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
// profilingPhaseReason maps a profiling sub-phase to the condition Reason that
// should be reported for it.
//
// Every phase except Done uses its own string value as the Reason (for example
// ProfilingPhaseSweepingDecode = "SweepingDecode" = ProfilingReasonSweepingDecode).
// Done is the single exception and is reported as ProfilingReasonCompleted.
func profilingPhaseReason(phase nvidiacomv1beta1.ProfilingPhase) string {
	switch phase {
	case nvidiacomv1beta1.ProfilingPhaseDone:
		return nvidiacomv1beta1.ProfilingReasonCompleted
	default:
		return string(phase)
	}
}

// profilingPhaseFailureReason builds the condition Reason used when profiling
// fails during the given sub-phase.
//
// The Reason is the phase name with a "Failed" suffix (e.g. "SweepingDecodeFailed").
// When no sub-phase is known (empty phase) the generic "ProfilingFailed" is returned.
func profilingPhaseFailureReason(phase nvidiacomv1beta1.ProfilingPhase) string {
	name := string(phase)
	if len(name) == 0 {
		return "ProfilingFailed"
	}
	return name + "Failed"
}

// validProfilingPhases holds every phase value the profiler sidecar is allowed
// to report. It is used as a set: only key membership matters.
var validProfilingPhases = func() map[nvidiacomv1beta1.ProfilingPhase]struct{} {
	known := []nvidiacomv1beta1.ProfilingPhase{
		nvidiacomv1beta1.ProfilingPhaseInitializing,
		nvidiacomv1beta1.ProfilingPhaseSweepingPrefill,
		nvidiacomv1beta1.ProfilingPhaseSweepingDecode,
		nvidiacomv1beta1.ProfilingPhaseSelectingConfig,
		nvidiacomv1beta1.ProfilingPhaseBuildingCurves,
		nvidiacomv1beta1.ProfilingPhaseGeneratingDGD,
		nvidiacomv1beta1.ProfilingPhaseDone,
	}
	set := make(map[nvidiacomv1beta1.ProfilingPhase]struct{}, len(known))
	for _, p := range known {
		set[p] = struct{}{}
	}
	return set
}()

// isValidProfilingPhase reports whether phase names one of the entries in
// validProfilingPhases. The comparison is exact (case-sensitive).
func isValidProfilingPhase(phase string) bool {
	if _, known := validProfilingPhases[nvidiacomv1beta1.ProfilingPhase(phase)]; known {
		return true
	}
	return false
}

332
333
334
// DynamoGraphDeploymentRequestReconciler reconciles a DynamoGraphDeploymentRequest object.
// It embeds client.Client, which the handlers in this file use for Get and
// Status().Update calls.
type DynamoGraphDeploymentRequestReconciler struct {
	client.Client
335
336
337
338
339
340
	// APIReader is an additional read-only client.
	// NOTE(review): presumably a non-cached reader — confirm where it is wired up.
	APIReader         client.Reader
	// Recorder emits Kubernetes events for the DGDR; also returned by GetRecorder.
	Recorder          record.EventRecorder
	// Config and RuntimeConfig carry operator configuration.
	Config            *configv1alpha1.OperatorConfiguration
	RuntimeConfig     *commonController.RuntimeConfig
	// GPUDiscoveryCache and GPUDiscovery are the GPU hardware discovery helpers.
	GPUDiscoveryCache *gpu.GPUDiscoveryCache
	GPUDiscovery      *gpu.GPUDiscovery
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
	// RBACManager handles RBAC setup for profiling jobs
	RBACManager RBACManager
}

// RBACManager abstracts the management of RBAC resources needed by the
// reconciler, so the concrete implementation can be injected.
type RBACManager interface {
	// EnsureServiceAccountWithRBAC takes the namespace to operate in, the
	// ServiceAccount name and the ClusterRole name, and returns an error on failure.
	// NOTE(review): presumably creates the ServiceAccount and binds it to the
	// ClusterRole if missing — confirm against the implementation.
	EnsureServiceAccountWithRBAC(
		ctx context.Context,
		targetNamespace, serviceAccountName, clusterRoleName string,
	) error
}

// GetRecorder implements commonController.Reconciler interface.
// It returns the reconciler's Recorder field unchanged.
func (r *DynamoGraphDeploymentRequestReconciler) GetRecorder() record.EventRecorder {
	return r.Recorder
}

// FinalizeResource implements commonController.Finalizer interface
356
func (r *DynamoGraphDeploymentRequestReconciler) FinalizeResource(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
357
358
359
360
361
362
363
364
365
366
367
368
369
	logger := log.FromContext(ctx)

	logger.Info("DGDR finalized successfully", "name", dgdr.Name)
	return nil
}

// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/finalizers,verbs=update
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
370
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
371
372
373
374
375
376
377
378
379
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

// Reconcile handles the reconciliation loop for DynamoGraphDeploymentRequest
func (r *DynamoGraphDeploymentRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling DynamoGraphDeploymentRequest", "name", req.Name, "namespace", req.Namespace)

	// Fetch the DGDR instance
380
	dgdr := &nvidiacomv1beta1.DynamoGraphDeploymentRequest{}
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
	if err := r.Get(ctx, req.NamespacedName, dgdr); err != nil {
		if apierrors.IsNotFound(err) {
			logger.Info("DGDR resource not found, ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get DGDR")
		return ctrl.Result{}, err
	}

	// Handle finalizer using common function
	finalized, err := commonController.HandleFinalizer(ctx, dgdr, r.Client, r)
	if err != nil {
		return ctrl.Result{}, err
	}
	if finalized {
		// Resource was deleted and finalized
		return ctrl.Result{}, nil
	}

	// Check for spec changes (immutability enforcement)
	if dgdr.Status.ObservedGeneration > 0 && dgdr.Status.ObservedGeneration != dgdr.Generation {
		// Spec changed after initial processing
403
404
405
406
		if dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseProfiling || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeploying ||
			dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseReady || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeployed {
			logger.Info("Spec change detected in immutable phase",
				"phase", dgdr.Status.Phase,
407
408
409
				"observedGeneration", dgdr.Status.ObservedGeneration,
				"currentGeneration", dgdr.Generation)

410
411
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonSpecChangeRejected,
				fmt.Sprintf(MessageSpecChangeRejected, dgdr.Status.Phase))
412
413

			// Keep the old observedGeneration to continue rejecting changes
414
			// No phase transition - stay in current phase with old spec
415
416
417
			return ctrl.Result{}, nil
		}
	}
418
419
420
421
422
423
424
425
426
427
428
429
430
431
	// Phase machine: handle different phases
	switch dgdr.Status.Phase {
	case nvidiacomv1beta1.DGDRPhasePending, "":
		return r.handlePendingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		return r.handleProfilingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		return r.handleDeployingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseReady:
		return r.handleReadyPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		return r.handleDeployedPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseFailed:
		return r.handleFailedPhase(ctx, dgdr)
432
	default:
433
434
		logger.Info("Unknown phase", "phase", dgdr.Status.Phase)
		return r.updatePhaseAndRequeue(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, MessageInvalidState)
435
436
437
	}
}

438
439
440
441
// handlePendingPhase handles a DGDR that is new or still Pending.
//
// It works in two passes:
//   - First pass (ObservedGeneration == 0): validate the spec (this is the former
//     v1alpha1 Initializing state), record the generation being processed and
//     stay in Pending. A validation error moves the DGDR to Failed.
//   - Later passes: create the profiling job and move to the Profiling phase.
//     A job-creation error moves the DGDR to Failed.
func (r *DynamoGraphDeploymentRequestReconciler) handlePendingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	if dgdr.Status.ObservedGeneration == 0 {
		logger.Info("Handling initial validation", "name", dgdr.Name)

		if validationErr := r.validateSpec(ctx, dgdr); validationErr != nil {
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonValidationFailed, validationErr.Error())
			return r.updatePhaseWithCondition(
				ctx, dgdr,
				nvidiacomv1beta1.DGDRPhaseFailed,
				nvidiacomv1beta1.ConditionTypeValidation, metav1.ConditionFalse,
				nvidiacomv1beta1.EventReasonValidationFailed, validationErr.Error(),
			)
		}

		// Remember which spec generation is being processed.
		dgdr.Status.ObservedGeneration = dgdr.Generation

		// Status is initialized here; hardware discovery and job creation happen
		// on the next reconcile.
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonInitialized, MessageInitialized)
		return r.updatePhaseWithCondition(
			ctx, dgdr,
			nvidiacomv1beta1.DGDRPhasePending,
			nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
			"DiscoveringHardware", MessageDiscoveringHardware,
		)
	}

	logger.Info("Handling pending phase", "name", dgdr.Name)

	// Create the profiling job (online or AIC).
	if jobErr := r.createProfilingJob(ctx, dgdr); jobErr != nil {
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonProfilingJobFailed, jobErr.Error())
		return r.updatePhaseWithCondition(
			ctx, dgdr,
			nvidiacomv1beta1.DGDRPhaseFailed,
			nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
			MessageJobCreationFailed, jobErr.Error(),
		)
	}

	// The event text depends on which kind of profiling job was created.
	createdMessage := MessageAICProfilingJobCreated
	if isOnlineProfiling(dgdr) {
		createdMessage = MessageProfilingJobCreated
	}
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, createdMessage)

	// Enter Profiling with the Initializing sub-phase/reason, signalling that the
	// profiler is still loading.
	dgdr.SetProfilingPhase(nvidiacomv1beta1.ProfilingPhaseInitializing)
	return r.updatePhaseWithCondition(
		ctx, dgdr,
		nvidiacomv1beta1.DGDRPhaseProfiling,
		nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
		nvidiacomv1beta1.ProfilingReasonInitializing, MessageDiscoveringHardware,
	)
}

// updateProfilingSubPhase reads the output ConfigMap and updates status.profilingPhase
// and the Profiling/Succeeded conditions. The sidecar continuously polls profiler_status.yaml
// and writes phase+message to the output ConfigMap (dgdr-output-<name>). This function
// reads those keys and copies them into the DGDR status.
//
// It returns nil without touching the status when:
//   - the output ConfigMap does not exist yet,
//   - the ConfigMap has no (or an empty) "phase" key, or
//   - the reported phase equals the phase already stored in the status.
//
// It returns an error when the ConfigMap lookup fails for any reason other than
// NotFound, when the reported phase is not a recognized ProfilingPhase, or when
// the status update fails.
func (r *DynamoGraphDeploymentRequestReconciler) updateProfilingSubPhase(
	ctx context.Context,
	dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
) error {
	logger := log.FromContext(ctx)
	outputCMName := getOutputConfigMapName(dgdr)

	cm := &corev1.ConfigMap{}
	cmKey := types.NamespacedName{Name: outputCMName, Namespace: dgdr.Namespace}
	if err := r.Get(ctx, cmKey, cm); err != nil {
		if apierrors.IsNotFound(err) {
			return nil // No output ConfigMap yet — skip
		}
		// Any other failure (API unavailable, RBAC, ...) must not be mistaken for
		// "ConfigMap not written yet"; surface it so the reconcile is retried.
		return fmt.Errorf("getting output ConfigMap %s/%s: %w", dgdr.Namespace, outputCMName, err)
	}

	phase, exists := cm.Data["phase"]
	if !exists || phase == "" {
		return nil
	}

	if !isValidProfilingPhase(phase) {
		return fmt.Errorf("invalid profiling phase %q in ConfigMap %s", phase, outputCMName)
	}

	profilingPhase := nvidiacomv1beta1.ProfilingPhase(phase)
	if dgdr.Status.ProfilingPhase == profilingPhase {
		return nil // No change
	}

	logger.Info("Profiling sub-phase updated", "phase", phase)
	dgdr.SetProfilingPhase(profilingPhase)

	// Reason is derived from phase; message comes from the profiler via ConfigMap.
	reason := profilingPhaseReason(profilingPhase)
	message := cm.Data["message"] // written by profiler, relayed by sidecar

	// Both conditions carry the same sub-phase reason and message.
	for _, condType := range []string{
		nvidiacomv1beta1.ConditionTypeProfiling,
		nvidiacomv1beta1.ConditionTypeSucceeded,
	} {
		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
			Type:               condType,
			Status:             metav1.ConditionFalse,
			ObservedGeneration: dgdr.Generation,
			Reason:             reason,
			Message:            message,
		})
	}

	return r.Status().Update(ctx, dgdr)
}

541
542
// handleProfilingPhase monitors profiling progress and generates spec when complete
func (r *DynamoGraphDeploymentRequestReconciler) handleProfilingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
543
	logger := log.FromContext(ctx)
544
	logger.Info("Handling profiling phase", "name", dgdr.Name)
545

546
547
548
549
550
	// Check for sub-phase updates from output ConfigMap (populated by sidecar poller)
	if err := r.updateProfilingSubPhase(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}

551
552
553
554
555
	// Check profiling job status (both online and offline/AIC run as Jobs)
	// Note: We watch the Job via Owns(), so we'll be triggered automatically on Job changes
	completed, err := r.checkProfilingJobStatus(ctx, dgdr)
	if err != nil {
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageProfilingCheckFailed, err.Error())
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
		// Job failed - keep profilingPhase set so users can see where it died.
		// profilingPhase is already current: set to Initializing on entry,
		// then updated by updateProfilingSubPhase() above (reads output ConfigMap).
		failureReason := "ProfilingFailed"
		failureMessage := err.Error()
		if dgdr.Status.ProfilingPhase != "" {
			failureReason = profilingPhaseFailureReason(dgdr.Status.ProfilingPhase)
		}

		// Set phase and conditions directly so we can use sub-phase-specific failure
		// reason on both Profiling and Succeeded conditions. (updatePhaseWithCondition
		// would hardcode Succeeded reason to generic "Failed".)
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
			Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
			Status:             metav1.ConditionFalse,
			ObservedGeneration: dgdr.Generation,
			Reason:             failureReason,
			Message:            failureMessage,
		})
		dgdr.AddStatusCondition(metav1.Condition{
			Type:               nvidiacomv1beta1.ConditionTypeProfiling,
			Status:             metav1.ConditionFalse,
			ObservedGeneration: dgdr.Generation,
			Reason:             failureReason,
			Message:            failureMessage,
		})
		if err := r.Status().Update(ctx, dgdr); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
587
588
589
590
	}

	if !completed {
		logger.Info("Profiling job still running", "name", dgdr.Name)
591
		// Transition from Initializing to ProfilingRunning once the job is confirmed active.
592
		cond := meta.FindStatusCondition(dgdr.Status.Conditions, nvidiacomv1beta1.ConditionTypeProfiling)
593
		if cond != nil && cond.Reason == nvidiacomv1beta1.ProfilingReasonInitializing {
594
595
			return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingRunning", MessageProfilingInProgress)
		}
596
597
598
599
		// Don't requeue - we'll be triggered when the Job completes/fails
		return ctrl.Result{}, nil
	}

600
601
602
603
604
605
606
607
608
	profilingResults, dgdName, err := r.generateDGDSpec(ctx, dgdr)
	if err != nil {
		dgdr.ClearProfilingPhase()
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageGenerationFailed, err.Error())
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionFalse, MessageGenerationFailed, err.Error())
	}
	if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to refetch DGDR after generateDGDSpec: %w", err)
	}
609

610
	dgdr.ClearProfilingPhase()
611
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
612
		Type:               nvidiacomv1beta1.ConditionTypeProfiling,
613
614
615
616
617
		Status:             metav1.ConditionTrue,
		ObservedGeneration: dgdr.Generation,
		Reason:             "ProfilingCompleted",
		Message:            "Profiling job completed successfully",
	})
618
619
	dgdr.Status.DGDName = dgdName
	dgdr.Status.ProfilingResults = profilingResults
620

621
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
622

623
624
	// Create additional resources (ConfigMaps) immediately after profiling
	// This ensures that the `planner-profile-data` ConfigMap is available for both auto and manual deployment
625
	// v1beta1 uses the DGDR namespace for additional resources.
626
627
628
629
630
631
632
633
	targetNamespace := dgdr.Namespace
	if err := r.createAdditionalResources(ctx, dgdr, targetNamespace); err != nil {
		logger.Error(err, "Failed to create additional resources after profiling")
		// Don't fail the DGDR, just log the error - ConfigMaps can be created manually
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, "ConfigMapCreationFailed",
			fmt.Sprintf("Failed to create ConfigMaps from profiling output: %v", err))
	}

634
	// If autoApply is enabled, transition to Deploying phase
635
	if dgdr.Spec.AutoApply == nil || *dgdr.Spec.AutoApply {
636
637
		logger.Info("AutoApply enabled, transitioning to Deploying phase")
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseDeploying, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
638
639
	}

640
641
	// Otherwise, transition to Ready phase
	return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseReady, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecAvailable)
642
643
}

644
645
// handleReadyPhase handles a DGDR whose profiling is complete and whose spec is
// available for manual application. There is nothing to monitor in this phase,
// so the method only logs and returns without requeueing.
func (r *DynamoGraphDeploymentRequestReconciler) handleReadyPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	log.FromContext(ctx).Info("DGDR is ready", "name", dgdr.Name)
	return ctrl.Result{}, nil
}

// handleDeployingPhase handles DGD creation and monitors deployment
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Handling deploying phase", "name", dgdr.Name)

658
	if dgdr.Spec.AutoApply != nil && !*dgdr.Spec.AutoApply {
659
660
661
662
663
		// Shouldn't be in this phase without autoApply
		logger.Info("AutoApply not enabled, transitioning to Ready")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseReady
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseReady)
		return ctrl.Result{}, r.Status().Update(ctx, dgdr)
664
665
	}

666
667
668
669
670
	if dgdr.Status.DGDName == "" {
		return r.createDGD(ctx, dgdr)
	}

	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
671
	err := r.Get(ctx, types.NamespacedName{
672
673
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
674
675
676
	}, dgd)

	if apierrors.IsNotFound(err) {
677
678
679
680
681
		// Annotation present means DGD was never created (spec ready but create not yet called).
		// Annotation absent means DGD was previously created and then manually deleted.
		if _, hasSpec := dgdr.Annotations["nvidia.com/generated-dgd-spec"]; hasSpec {
			return r.createDGD(ctx, dgdr)
		}
682
683
684
685
686
687
688
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

689
690
691
	// Check if DGD is Ready
	var condStatus metav1.ConditionStatus
	var condReason, condMessage string
692

693
694
695
696
	if dgd.Status.State == dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD is Ready, transitioning to Deployed phase")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeployed
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeployed)
697

698
699
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentReady,
			fmt.Sprintf(MessageDeploymentReady, dgd.Name))
700

701
702
703
704
705
		condStatus = metav1.ConditionTrue
		condReason = nvidiacomv1beta1.EventReasonDeploymentReady
		condMessage = fmt.Sprintf(MessageDeploymentReady, dgd.Name)
	} else {
		logger.Info("DGD not yet ready", "name", dgd.Name, "state", dgd.Status.State)
706

707
708
709
		condStatus = metav1.ConditionFalse
		condReason = "DeploymentInProgress"
		condMessage = fmt.Sprintf("DGD %s is in %s state", dgd.Name, string(dgd.Status.State))
710
711
	}

712
713
714
715
716
717
718
719
	updateDeploymentInfo(dgdr, dgd)
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
		Status:  condStatus,
		Reason:  condReason,
		Message: condMessage,
	})

720
721
722
	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

723
724
// handleDeployedPhase monitors a healthy DGD and detects degradation or deletion
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
725
	logger := log.FromContext(ctx)
726
	logger.Info("DGDR is deployed", "name", dgdr.Name)
727

728
729
	// Check if DGD still exists and monitor its status
	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
730
	err := r.Get(ctx, types.NamespacedName{
731
732
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
733
734
735
736
737
738
739
740
741
742
743
	}, dgd)

	if apierrors.IsNotFound(err) {
		// DGD was deleted by user
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

744
745
746
747
	// Check if DGD degraded from Ready
	if dgd.Status.State != dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD degraded, transitioning back to Deploying",
			"dgdState", dgd.Status.State)
748

749
750
751
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeploying
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeploying)
		updateDeploymentInfo(dgdr, dgd)
752

753
754
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDegraded,
			fmt.Sprintf(MessageDeploymentDegraded, dgd.Name, string(dgd.Status.State)))
755
756

		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
757
758
759
760
			Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
			Status:  metav1.ConditionFalse,
			Reason:  nvidiacomv1beta1.EventReasonDeploymentDegraded,
			Message: fmt.Sprintf("Deployment degraded to %s", string(dgd.Status.State)),
761
		})
762
763
764
765
766
767
	} else {
		// DGD is healthy — update replica info only if changed
		if !updateDeploymentInfo(dgdr, dgd) {
			// Nothing changed, skip the status write
			return ctrl.Result{}, nil
		}
768
769
770
771
772
	}

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

773
774
775
// handleDGDDeleted handles the case when auto-created DGD is deleted by user.
// In v1beta1, this transitions to Failed (DeploymentDeleted phase was removed).
func (r *DynamoGraphDeploymentRequestReconciler) handleDGDDeleted(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
776
	logger := log.FromContext(ctx)
777
	logger.Info("DGD was deleted by user, transitioning to Failed phase")
778

779
780
	dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
	setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseFailed)
781

782
783
	r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDeleted,
		fmt.Sprintf(MessageDeploymentDeleted, dgdr.Status.DGDName))
784

785
786
	dgdr.Status.DGDName = ""
	dgdr.Status.DeploymentInfo = nil
787

788
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
789
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
790
		Status:  metav1.ConditionFalse,
791
		Reason:  nvidiacomv1beta1.EventReasonDeploymentDeleted,
792
793
794
795
796
797
798
		Message: "Deployment was deleted by user. Create a new DGDR to redeploy.",
	})

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

// createDGD creates a DynamoGraphDeployment with the generated spec
799
func (r *DynamoGraphDeploymentRequestReconciler) createDGD(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
800
801
	logger := log.FromContext(ctx)

802
803
804
805
	// Extract DGD spec from annotation (stored by generateDGDSpec)
	dgdSpecYAML, ok := dgdr.Annotations["nvidia.com/generated-dgd-spec"]
	if !ok || dgdSpecYAML == "" {
		return ctrl.Result{}, fmt.Errorf("generated DGD spec not found in annotation nvidia.com/generated-dgd-spec")
806
807
	}

808
809
810
	generatedDGD := &dgdv1alpha1.DynamoGraphDeployment{}
	if err := yaml.Unmarshal([]byte(dgdSpecYAML), generatedDGD); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to unmarshal generated deployment from annotation: %w", err)
811
812
	}

813
	// Determine DGD name and namespace from generated deployment
814
815
816
817
818
819
820
821
822
823
824
	dgdName := generatedDGD.Name
	dgdNamespace := dgdr.Namespace

	// Build labels (start with generated DGD's labels)
	labels := make(map[string]string)
	if generatedDGD.Labels != nil {
		for k, v := range generatedDGD.Labels {
			labels[k] = v
		}
	}
	// Add/override with managed labels
825
826
827
	labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
	labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
	labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
828
829
830
831
832
833
834
835
836
837

	// Build annotations (start with generated DGD's annotations)
	annotations := make(map[string]string)
	if generatedDGD.Annotations != nil {
		for k, v := range generatedDGD.Annotations {
			annotations[k] = v
		}
	}

	// Create DGD from generated deployment
838
	dgd := &dgdv1alpha1.DynamoGraphDeployment{
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
		ObjectMeta: metav1.ObjectMeta{
			Name:        dgdName,
			Namespace:   dgdNamespace,
			Labels:      labels,
			Annotations: annotations,
		},
		Spec: generatedDGD.Spec,
	}

	// Note: We don't set owner reference on DGD
	// If a DGDR is deleted, the DGD may be serving traffic and should persist independently.
	// We use labels (LabelDGDRName) to track the relationship.

	logger.Info("Creating DynamoGraphDeployment", "name", dgdName, "namespace", dgdNamespace)

	if err := r.Create(ctx, dgd); err != nil {
		if apierrors.IsAlreadyExists(err) {
			logger.Info("DGD already exists, updating status")
857
858
859
860
861
			delete(dgdr.Annotations, "nvidia.com/generated-dgd-spec")
			if updateErr := r.Update(ctx, dgdr); updateErr != nil {
				logger.Error(updateErr, "Failed to remove generated-dgd-spec annotation on IsAlreadyExists path")
				return ctrl.Result{}, updateErr
			}
862
			dgdr.Status.DGDName = dgdName
863
864
865
866
867
868
			return ctrl.Result{}, r.Status().Update(ctx, dgdr)
		}
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageDeploymentCreationFailed, err.Error())
		return ctrl.Result{}, err
	}

869
870
871
872
873
874
875
	delete(dgdr.Annotations, "nvidia.com/generated-dgd-spec")
	if err := r.Update(ctx, dgdr); err != nil {
		// Return the error to force a retry. The DGD was created successfully, so a
		// retry will hit the IsAlreadyExists path above and attempt cleanup again.
		return ctrl.Result{}, fmt.Errorf("failed to remove generated-dgd-spec annotation after DGD creation: %w", err)
	}

876
	// Update status
877
	dgdr.Status.DGDName = dgdName
878

879
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentCreated,
880
881
882
		fmt.Sprintf(MessageDeploymentCreated, dgdName))

	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
883
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
884
		Status:  metav1.ConditionFalse,
885
		Reason:  nvidiacomv1beta1.EventReasonDeploymentCreated,
886
887
888
889
890
891
892
893
		Message: fmt.Sprintf("DGD %s created, waiting for Ready", dgdName),
	})

	logger.Info("DynamoGraphDeployment created successfully", "name", dgdName)

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

894
// createAdditionalResources creates ConfigMaps from the profiling output that should be deployed alongside the DGD
895
func (r *DynamoGraphDeploymentRequestReconciler) createAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, targetNamespace string) error {
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
	logger := log.FromContext(ctx)

	// Check if there are additional resources stored in annotations
	if dgdr.Annotations == nil {
		return nil
	}

	resourcesYAML, exists := dgdr.Annotations[AnnotationAdditionalResources]
	if !exists || resourcesYAML == "" {
		return nil
	}

	// Parse using standard Kubernetes YAML decoder
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(resourcesYAML)), 4096)
	resourceCount := 0

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			logger.Error(err, "Failed to decode resource, skipping")
			continue
		}

		if obj.GetKind() == "" {
			continue
		}

		resourceCount++

		// Only support ConfigMap for now (what profiler actually generates)
		if obj.GetKind() != "ConfigMap" {
			logger.Info("Skipping non-ConfigMap resource from profiling output", "kind", obj.GetKind(), "name", obj.GetName())
			continue
		}

		cm := &corev1.ConfigMap{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, cm); err != nil {
			logger.Error(err, "Failed to convert to ConfigMap", "name", obj.GetName())
			continue
		}

		// Override namespace and add tracking labels
		cm.Namespace = targetNamespace
		if cm.Labels == nil {
			cm.Labels = make(map[string]string)
		}
945
946
947
		cm.Labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
		cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
		cm.Labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967

		// Create the ConfigMap
		if err := r.Create(ctx, cm); err != nil {
			if apierrors.IsAlreadyExists(err) {
				logger.Info("ConfigMap already exists, skipping", "name", cm.Name)
			} else {
				return fmt.Errorf("failed to create ConfigMap %s: %w", cm.Name, err)
			}
		} else {
			logger.Info("Created ConfigMap from profiling output", "name", cm.Name, "namespace", targetNamespace)
		}
	}

	if resourceCount > 0 {
		logger.Info("Deploying additional resources from profiling output", "count", resourceCount)
	}

	return nil
}

968
969
// handleFailedPhase is the reconcile step for a DGDR in the Failed phase.
// It only logs the fact and returns an empty result with no error.
func (r *DynamoGraphDeploymentRequestReconciler) handleFailedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	log.FromContext(ctx).Info("DGDR is in failed phase", "name", dgdr.Name)

	// Retry logic could be added here if desired; none is implemented.
	return ctrl.Result{}, nil
}

977
// getProfilingJobName returns the job name for a DGDR
978
func getProfilingJobName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
979
980
	// Use "profile-" prefix for all profiling jobs
	return fmt.Sprintf("profile-%s", dgdr.Name)
981
982
983
}

// getOutputConfigMapName returns the ConfigMap name for profiling output
984
func getOutputConfigMapName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
985
986
987
	return fmt.Sprintf("%s%s", ConfigMapOutputPrefix, dgdr.Name)
}

988
989
990
// isOnlineProfiling always reports true and ignores its argument. In v1beta1 the
// profiler itself chooses between online and AIC mode from its config, so the
// controller uses the same label in every case.
func isOnlineProfiling(_ *nvidiacomv1beta1.DynamoGraphDeploymentRequest) bool {
	const online = true
	return online
}
993

994
// validateSpec validates the DGDR spec
995
996
997
998
999
1000
1001
1002
1003
1004
1005
func (r *DynamoGraphDeploymentRequestReconciler) validateSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	var errs []error

	// Disallow searchStrategy: thorough with backend: auto.
	if dgdr.Spec.SearchStrategy == nvidiacomv1beta1.SearchStrategyThorough &&
		dgdr.Spec.Backend == nvidiacomv1beta1.BackendTypeAuto {
		errs = append(errs, fmt.Errorf(
			"spec.searchStrategy %q is incompatible with spec.backend %q: set spec.backend to a specific backend (sglang, trtllm, or vllm)",
			nvidiacomv1beta1.SearchStrategyThorough,
			nvidiacomv1beta1.BackendTypeAuto,
		))
1006
1007
	}

1008
	// Validate model cache PVC if provided
1009
	if dgdr.Spec.ModelCache != nil && dgdr.Spec.ModelCache.PVCName != "" {
1010
1011
		pvc := &corev1.PersistentVolumeClaim{}
		err := r.Get(ctx, types.NamespacedName{
1012
			Name:      dgdr.Spec.ModelCache.PVCName,
1013
1014
1015
1016
1017
			Namespace: dgdr.Namespace,
		}, pvc)

		if err != nil {
			if apierrors.IsNotFound(err) {
1018
1019
1020
				errs = append(errs, fmt.Errorf(MessageModelCachePVCNotFound, dgdr.Spec.ModelCache.PVCName, dgdr.Namespace))
			} else {
				return err
1021
1022
1023
1024
			}
		}
	}

1025
	if err := r.validateGPUHardwareInfo(ctx, dgdr); err != nil {
1026
		errs = append(errs, err)
1027
1028
	}

1029
	// The profiler will validate the rest of the configuration
1030
	return errors.Join(errs...)
1031
1032
1033
}

// validateGPUHardwareInfo ensures GPU hardware information is available when required for profiling
1034
func (r *DynamoGraphDeploymentRequestReconciler) validateGPUHardwareInfo(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
1035
1036
	logger := log.FromContext(ctx)

1037
1038
1039
1040
	// Check if user provided hardware info in the typed spec
	hasManualConfig := dgdr.Spec.Hardware != nil && (dgdr.Spec.Hardware.GPUSKU != "" ||
		dgdr.Spec.Hardware.VRAMMB != nil ||
		dgdr.Spec.Hardware.NumGPUsPerNode != nil)
1041

1042
1043
	// If manual config is provided, validation passes
	if hasManualConfig {
1044
1045
1046
		return nil
	}

1047
	isNamespaceScoped := r.Config.Namespace.Restricted != ""
1048
	if isNamespaceScoped {
1049
1050
		return fmt.Errorf(
			"GPU hardware info required but cannot be auto-discovered." +
1051
1052
1053
				"\n\nOptions to resolve:" +
				"\n\n1. Re-enable GPU discovery (if it was disabled during Helm install):" +
				"\n   helm upgrade ... --set dynamo-operator.gpuDiscovery.enabled=true" +
1054
1055
1056
1057
				"\n\n2. Add hardware config to spec.hardware:" +
				"\n   numGpusPerNode: 8" +
				"\n   gpuSku: \"H100-SXM5-80GB\"" +
				"\n   vramMb: 81920")
1058
1059
	}

1060
1061
1062
1063
1064
1065
1066
1067
	_, err := r.GPUDiscovery.DiscoverGPUsFromDCGM(ctx, r.APIReader, r.GPUDiscoveryCache)
	if err == nil {
		// GPU discovery is available, validation passes
		return nil
	}
	// Refine the logger message
	reason := GetGPUDiscoveryFailureReason(err)
	logger.Info("GPU discovery not available", "reason", reason, "error", err.Error())
1068
	return fmt.Errorf("GPU hardware info required but auto-discovery failed. Add spec.hardware.gpuSku, spec.hardware.vramMb, spec.hardware.numGpusPerNode")
1069
1070
}

1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
// GetGPUDiscoveryFailureReason classifies a GPU discovery error and
// returns a stable, actionable reason string suitable for structured logging.
//
// The classification is based on known error message patterns produced during:
//   - DCGM exporter pod discovery
//   - Helm-based GPU operator and DCGM discovery
//   - Metrics scraping
//   - Prometheus parsing
//
// Matching is case-insensitive: the error text is lowercased once, so every
// pattern in the switch below MUST be written in lowercase or it can never match.
// Cases are evaluated top to bottom and the first match wins.
//
// If err is nil or does not match any known category, "unknown" is returned.
func GetGPUDiscoveryFailureReason(err error) string {
	if err == nil {
		return "unknown"
	}
	errMsg := strings.ToLower(err.Error())

	switch {
	case strings.Contains(errMsg, "list pods"):
		return "failed to list DCGM exporter pods (RBAC/cluster connectivity issue)"
	case strings.Contains(errMsg, "gpu operator is not installed"):
		return "GPU Operator not installed in expected namespace"
	case strings.Contains(errMsg, "helm init failed"):
		return "failed to initialize Helm client (RBAC, kubeconfig, or Helm driver issue)"
	case strings.Contains(errMsg, "timeout waiting for dcgm exporter pods"):
		return "timeout while waiting for DCGM exporter pods to become ready"
	case strings.Contains(errMsg, "http get"):
		return "failed to reach DCGM metrics endpoint on pod (network/port issue)"
	case strings.Contains(errMsg, "metrics endpoint") &&
		strings.Contains(errMsg, "status"):
		return "DCGM pod metrics endpoint returned non-200 status"
	case strings.Contains(errMsg, "parse prometheus metrics"):
		return "failed to parse dcgm Prometheus metrics (invalid format)"
	case strings.Contains(errMsg, "no gpus detected"):
		return "no GPUs detected in dcgm metrics (GPU model or metrics missing)"
	// The pattern used to contain "GPU Operator" in mixed case, which could never
	// match the lowercased message; it is lowercase now.
	case strings.Contains(errMsg, "dcgm is not enabled in the gpu operator"):
		return "DCGM is not enabled in the GPU Operator (check GPU Operator configuration and permissions)"
	case strings.Contains(errMsg, "failed to scrape any dcgm exporter pod"):
		return "failed to scrape any dcgm exporter pod (check DCGM exporter pod status and network connectivity)"
	case strings.Contains(errMsg, "no gpu metrics could be parsed from any dcgm pod"):
		return "no GPU metrics could be parsed from any DCGM pod (check DCGM exporter pod status and network connectivity)"
	case strings.Contains(errMsg, "failed to create helm path"):
		return "failed to initialize Helm client (RBAC, kubeconfig, or Helm driver issue)"
	}
	return "unknown"
}

1117
// createProfilingJob creates a Kubernetes Job for profiling using SyncResource
1118
func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
1119
1120
	logger := log.FromContext(ctx)

1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
	// Delete any existing output ConfigMap to ensure fresh profiling results
	// This prevents using stale data from previous profiling runs
	outputConfigMapName := getOutputConfigMapName(dgdr)
	existingCM := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, existingCM)
	if err == nil {
		// ConfigMap exists, delete it
		logger.Info("Deleting existing output ConfigMap to ensure fresh profiling results", "configMap", outputConfigMapName)
		if err := r.Delete(ctx, existingCM); err != nil && !apierrors.IsNotFound(err) {
			logger.Error(err, "Failed to delete existing output ConfigMap", "configMap", outputConfigMapName)
			return fmt.Errorf("failed to delete existing output ConfigMap: %w", err)
		}
		logger.Info("Successfully deleted old output ConfigMap", "configMap", outputConfigMapName)
	} else if !apierrors.IsNotFound(err) {
		// Unexpected error checking for ConfigMap
		logger.Error(err, "Failed to check for existing output ConfigMap", "configMap", outputConfigMapName)
		return fmt.Errorf("failed to check for existing output ConfigMap: %w", err)
	}

	// Ensure profiling job RBAC exists (only for cluster-wide installation)
1144
	if r.Config.Namespace.Restricted == "" {
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
		if err := r.RBACManager.EnsureServiceAccountWithRBAC(
			ctx,
			dgdr.Namespace,
			ServiceAccountProfilingJob,
			r.Config.RBAC.DGDRProfilingClusterRoleName,
		); err != nil {
			logger.Error(err, "Failed to ensure profiling job RBAC")
			return fmt.Errorf("failed to ensure profiling job RBAC: %w", err)
		}
	}

1156
1157
1158
1159
	// Enrich hardware from GPU discovery before marshalling the spec.
	// This fills in gpuSku, vramMb, numGpusPerNode if the user didn't set them.
	if err := r.enrichHardwareFromDiscovery(ctx, dgdr); err != nil {
		logger.Info("GPU discovery not available, proceeding without enrichment", "reason", err.Error())
1160
1161
	}

1162
1163
1164
1165
1166
	// Use SyncResource to create/update the job
	modified, job, err := commonController.SyncResource(ctx, r, dgdr, func(ctx context.Context) (*batchv1.Job, bool, error) {
		jobName := getProfilingJobName(dgdr)
		outputConfigMapName := getOutputConfigMapName(dgdr)

1167
1168
		// Marshal the DGDR spec to JSON — the profiler receives the spec verbatim
		specJSON, err := marshalDGDRSpec(dgdr)
1169
		if err != nil {
1170
			return nil, false, err
1171
		}
1172
1173

		// Common environment variables
1174
		profilerEnv := []corev1.EnvVar{
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
			{
				Name: "HUGGING_FACE_HUB_TOKEN",
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: "hf-token-secret",
						},
						Key: "HF_TOKEN",
					},
				},
			},
			{
				Name:  "NATS_SERVER",
				Value: fmt.Sprintf("nats://%s-nats:4222", dgdr.Namespace),
			},
			{
				Name:  "ETCD_ENDPOINTS",
				Value: fmt.Sprintf("%s-etcd:2379", dgdr.Namespace),
			},
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
			// DGDR metadata for setting ownerReferences
			{
				Name:  "DGDR_NAME",
				Value: dgdr.Name,
			},
			{
				Name:  "DGDR_NAMESPACE",
				Value: dgdr.Namespace,
			},
			{
				Name:  "DGDR_UID",
				Value: string(dgdr.UID),
			},
1207
1208
		}

1209
		// Build volume mounts
1210
1211
1212
1213
1214
1215
1216
		volumeMounts := []corev1.VolumeMount{
			{
				Name:      VolumeNameProfilingOutput,
				MountPath: ProfilingOutputPath,
			},
		}

1217
		// Add model cache PVC mount if configured
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
		modelCachePVC, modelCacheMountPath := extractModelCachePVCConfig(dgdr)
		if modelCachePVC != "" {
			logger.Info("Mounting model cache PVC to profiler pod", "pvc", modelCachePVC, "mountPath", modelCacheMountPath)
			volumeMounts = append(volumeMounts, corev1.VolumeMount{
				Name:      VolumeNameModelCache,
				MountPath: modelCacheMountPath,
				ReadOnly:  true,
			})
		}

1228
1229
1230
1231
1232
1233
1234
1235
		// v1alpha1 round-trip: mount ConfigMap if referenced via annotation
		cmRef := configMapRefFromAnnotation(dgdr)
		if cmRef != nil {
			volumeMounts = append(volumeMounts, corev1.VolumeMount{
				Name:      VolumeNameProfilingConfig,
				MountPath: ProfilingConfigMountPath,
				ReadOnly:  true,
			})
1236
1237
		}

1238
		// Profiler args: pass the DGDR spec as JSON via --config
1239
1240
		// --output-dir must match ProfilingOutputPath so the sidecar can find profiler_status.yaml
		profilerArgs := []string{"--config", specJSON, "--output-dir", ProfilingOutputPath}
1241

1242
1243
		// Use image from spec; the defaulting webhook fills this in for production builds.
		// Guard against empty image in case the webhook didn't run (e.g. local dev builds).
1244
		imageName := dgdr.Spec.Image
1245
1246
1247
		if imageName == "" {
			return nil, false, fmt.Errorf("spec.image is required but not set; ensure the defaulting webhook ran or set spec.image explicitly")
		}
1248
1249
		logger.Info("Using profiler image", "image", imageName)

1250
		profilerContainer := corev1.Container{
1251
1252
			Name:         ContainerNameProfiler,
			Image:        imageName,
1253
			Command:      []string{"python", "-m", "dynamo.profiler"},
1254
			Args:         profilerArgs,
1255
1256
			Env:          profilerEnv,
			VolumeMounts: volumeMounts,
1257
			WorkingDir:   "/workspace",
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
		}

		// Generate sidecar script from template
		tmpl, err := template.New("sidecar").Parse(sidecarScriptTemplate)
		if err != nil {
			return nil, false, fmt.Errorf("failed to parse sidecar script template: %w", err)
		}

		var scriptBuf bytes.Buffer
		err = tmpl.Execute(&scriptBuf, map[string]string{
1268
1269
1270
1271
1272
			"OutputPath":    ProfilingOutputPath,
			"OutputFile":    ProfilingOutputFile,
			"ConfigMapName": outputConfigMapName,
			"Namespace":     dgdr.Namespace,
			"DGDRName":      dgdr.Name,
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
		})
		if err != nil {
			return nil, false, fmt.Errorf("failed to execute sidecar script template: %w", err)
		}

		sidecarContainer := corev1.Container{
			Name:    ContainerNameOutputCopier,
			Image:   SidecarImage,
			Command: []string{"/bin/sh", "-c"},
			Args:    []string{scriptBuf.String()},
			VolumeMounts: []corev1.VolumeMount{{
				Name:      VolumeNameProfilingOutput,
				MountPath: ProfilingOutputPath,
				ReadOnly:  true,
			}},
		}

1290
1291
		// Use PVC for profiling output if round-tripped v1alpha1 annotation is present,
		// otherwise use emptyDir (v1beta1 default).
1292
		var profilingOutputVolume corev1.Volume
1293
1294
		if outputPVC := outputPVCFromAnnotation(dgdr); outputPVC != "" {
			logger.Info("Using PVC for profiling output (from v1alpha1 annotation)", "pvc", outputPVC)
1295
1296
1297
1298
			profilingOutputVolume = corev1.Volume{
				Name: VolumeNameProfilingOutput,
				VolumeSource: corev1.VolumeSource{
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1299
						ClaimName: outputPVC,
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
					},
				},
			}
		} else {
			profilingOutputVolume = corev1.Volume{
				Name: VolumeNameProfilingOutput,
				VolumeSource: corev1.VolumeSource{
					EmptyDir: &corev1.EmptyDirVolumeSource{},
				},
			}
		}
		volumes := []corev1.Volume{profilingOutputVolume}
1312

1313
1314
		// Add model cache PVC volume if configured
		if modelCachePVC != "" {
1315
			volumes = append(volumes, corev1.Volume{
1316
				Name: VolumeNameModelCache,
1317
				VolumeSource: corev1.VolumeSource{
1318
1319
1320
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
						ClaimName: modelCachePVC,
						ReadOnly:  true,
1321
1322
1323
1324
1325
					},
				},
			})
		}

1326
1327
1328
1329
1330
1331
		// v1alpha1 round-trip: add ConfigMap volume if referenced via annotation
		if cmRef != nil {
			cmKey := cmRef.Key
			if cmKey == "" {
				cmKey = ProfilingConfigDefaultKey
			}
1332
			volumes = append(volumes, corev1.Volume{
1333
				Name: VolumeNameProfilingConfig,
1334
				VolumeSource: corev1.VolumeSource{
1335
1336
1337
1338
1339
1340
1341
1342
					ConfigMap: &corev1.ConfigMapVolumeSource{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: cmRef.Name,
						},
						Items: []corev1.KeyToPath{{
							Key:  cmKey,
							Path: ProfilingConfigDefaultKey,
						}},
1343
1344
1345
1346
1347
					},
				},
			})
		}

1348
1349
1350
		// Limit retries to prevent infinite loop
		backoffLimit := int32(3)

1351
1352
1353
1354
		podSpec := corev1.PodSpec{
			ServiceAccountName: ServiceAccountProfilingJob,
			RestartPolicy:      corev1.RestartPolicyNever,
			SecurityContext: &corev1.PodSecurityContext{
1355
1356
1357
1358
				RunAsNonRoot: ptr.To(true),
				RunAsUser:    ptr.To[int64](1000),
				RunAsGroup:   ptr.To[int64](1000),
				FSGroup:      ptr.To[int64](1000),
1359
1360
1361
1362
1363
1364
1365
1366
			},
			Containers: []corev1.Container{profilerContainer, sidecarContainer},
			Volumes:    volumes,
			ImagePullSecrets: []corev1.LocalObjectReference{
				{Name: "nvcr-imagepullsecret"},
			},
		}

1367
1368
1369
1370
1371
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Name:      jobName,
				Namespace: dgdr.Namespace,
				Labels: map[string]string{
1372
1373
1374
					nvidiacomv1beta1.LabelApp:       nvidiacomv1beta1.LabelValueDynamoProfiler,
					nvidiacomv1beta1.LabelDGDR:      dgdr.Name,
					nvidiacomv1beta1.LabelManagedBy: nvidiacomv1beta1.LabelValueDynamoOperator,
1375
1376
1377
1378
1379
				},
			},
			Spec: batchv1.JobSpec{
				BackoffLimit: &backoffLimit,
				Template: corev1.PodTemplateSpec{
1380
					Spec: podSpec,
1381
1382
1383
1384
				},
			},
		}

1385
1386
1387
1388
1389
		var jobOverrides *batchv1.JobSpec
		if dgdr.Spec.Overrides != nil {
			jobOverrides = dgdr.Spec.Overrides.ProfilingJob
		}
		applyProfilingJobOverrides(job, jobOverrides)
1390

1391
1392
1393
1394
1395
1396
1397
1398
		return job, false, nil
	})

	if err != nil {
		return err
	}

	if modified {
1399
		logger.Info("Profiling job created/updated", "job", job.Name)
1400
1401
	}

1402
1403
1404
	// Store the job name in status for observability
	dgdr.Status.ProfilingJobName = job.Name

1405
1406
1407
	return nil
}

1408
1409
1410
1411
1412
1413
// marshalDGDRSpec serializes dgdr.Spec to the JSON string that is passed to the
// profiler via --config. The spec is encoded as-is, with no key remapping.
// A marshalling failure is returned wrapped, together with an empty string.
func marshalDGDRSpec(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, error) {
	encoded, marshalErr := json.Marshal(dgdr.Spec)
	if marshalErr != nil {
		return "", fmt.Errorf("failed to marshal DGDR spec to JSON: %w", marshalErr)
	}

	return string(encoded), nil
}
1417

1418
1419
1420
1421
1422
1423
1424
// enrichHardwareFromDiscovery fills in hardware fields that the user didn't set.
// Called before marshalDGDRSpec(). Mutates dgdr.Spec.Hardware in-place (memory only, not persisted).
func (r *DynamoGraphDeploymentRequestReconciler) enrichHardwareFromDiscovery(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	if dgdr.Spec.Hardware == nil {
		dgdr.Spec.Hardware = &nvidiacomv1beta1.HardwareSpec{}
	}
	hw := dgdr.Spec.Hardware
1425

1426
	if hw.GPUSKU != "" && hw.VRAMMB != nil && hw.NumGPUsPerNode != nil {
hhzhang16's avatar
hhzhang16 committed
1427
		return nil // all fields already set by user; TotalGPUs is filled below when discovery runs
1428
1429
	}

1430
	var gpuInfo *gpu.GPUInfo
1431
	logger := log.FromContext(ctx)
1432
1433
1434
1435
1436
	// Check if user provided hardware info in the typed spec
	hasManualConfig := dgdr.Spec.Hardware != nil && (dgdr.Spec.Hardware.GPUSKU != "" ||
		dgdr.Spec.Hardware.VRAMMB != nil ||
		dgdr.Spec.Hardware.NumGPUsPerNode != nil)
	if !hasManualConfig {
1437

1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
		logger.Info("Attempting GPU discovery for profiling job")
		discoveredInfo, err := r.GPUDiscovery.DiscoverGPUsFromDCGM(ctx, r.APIReader, r.GPUDiscoveryCache)
		if err != nil {
			// This path is expected for namespace-restricted operators without node read permissions
			// Refine the logger message
			reason := GetGPUDiscoveryFailureReason(err)
			logger.Info("GPU discovery not available, using manual hardware configuration from profiling config",
				"reason", reason, "error", err.Error())
			return err
		} else {
			gpuInfo = discoveredInfo
			logger.Info("GPU discovery completed successfully",
				"gpusPerNode", gpuInfo.GPUsPerNode,
				"nodesWithGPUs", gpuInfo.NodesWithGPUs,
				"totalGpus", gpuInfo.GPUsPerNode*gpuInfo.NodesWithGPUs,
				"model", gpuInfo.Model,
				"vramMiB", gpuInfo.VRAMPerGPU,
				"system", gpuInfo.System,
				"cloudprovider", gpuInfo.CloudProvider)
		}
	}
1459
	if hw.GPUSKU == "" {
1460
1461
1462
1463
1464
1465
		if gpuInfo.System != "" {
			hw.GPUSKU = gpuInfo.System
		} else {
			// Unknown GPU type: use raw model name; profiler will attempt naive config generation.
			hw.GPUSKU = nvidiacomv1beta1.GPUSKUType(gpuInfo.Model)
		}
1466
1467
1468
1469
	}
	if hw.VRAMMB == nil {
		vram := float64(gpuInfo.VRAMPerGPU)
		hw.VRAMMB = &vram
1470
	}
1471
1472
1473
1474
	if hw.NumGPUsPerNode == nil {
		n := int32(gpuInfo.GPUsPerNode)
		hw.NumGPUsPerNode = &n
	}
hhzhang16's avatar
hhzhang16 committed
1475
	if hw.TotalGPUs == nil {
1476
1477
1478
		// TODO: This is a temporary limit to prevent the profiler from using too many GPUs.
		// Will be removed once a fix is in the Profiler/AIC.
		const defaultMaxAutoGPUs = int32(32)
hhzhang16's avatar
hhzhang16 committed
1479
		total := int32(gpuInfo.GPUsPerNode * gpuInfo.NodesWithGPUs)
1480
1481
1482
1483
1484
		if total > defaultMaxAutoGPUs {
			logger.Info("Capping auto-discovered TotalGPUs at default limit; set hardware.totalGpus to override",
				"discovered", total, "cap", defaultMaxAutoGPUs)
			total = defaultMaxAutoGPUs
		}
hhzhang16's avatar
hhzhang16 committed
1485
1486
		hw.TotalGPUs = &total
	}
1487
1488
	return nil
}
1489

1490
1491
1492
1493
// extractModelCachePVCConfig returns the model cache PVC name and mount path from
// the typed v1beta1 spec. Both results are empty when no model cache PVC is
// configured. When a PVC is named without a mount path, the path defaults to
// DefaultModelCacheMountPath.
func extractModelCachePVCConfig(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, string) {
	cache := dgdr.Spec.ModelCache
	if cache == nil || cache.PVCName == "" {
		return "", ""
	}

	if cache.PVCMountPath != "" {
		return cache.PVCName, cache.PVCMountPath
	}
	return cache.PVCName, DefaultModelCacheMountPath
}

// configMapKeySelector mirrors v1alpha1.ConfigMapKeySelector for annotation deserialization.
// It is the target type that configMapRefFromAnnotation decodes the JSON annotation value into.
type configMapKeySelector struct {
	// Name is read from the "name" JSON field (presumably the ConfigMap name — confirm against v1alpha1).
	Name string `json:"name"`
	// Key is read from the optional "key" JSON field; it is omitted on encoding when empty.
	Key  string `json:"key,omitempty"`
}

// configMapRefFromAnnotation decodes the ConfigMap reference stored as JSON in the
// AnnotationConfigMapRef round-trip annotation.
//
// It returns nil when the annotation is absent or empty (the case for native v1beta1
// resources) and also when the annotation value is not valid JSON for configMapKeySelector.
func configMapRefFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) *configMapKeySelector {
	// Indexing a nil map is safe and yields "", so "no annotations", "key missing" and
	// "empty value" are all covered by a single emptiness check.
	encoded := dgdr.Annotations[AnnotationConfigMapRef]
	if encoded == "" {
		return nil
	}

	selector := new(configMapKeySelector)
	if json.Unmarshal([]byte(encoded), selector) != nil {
		return nil
	}
	return selector
}
1525

1526
1527
1528
1529
1530
1531
1532
// outputPVCFromAnnotation reads the output PVC name from the AnnotationOutputPVC
// round-trip annotation.
//
// It returns "" when the annotation is not present, which is the case for native
// v1beta1 resources (always emptyDir).
func outputPVCFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
	// Indexing a nil map yields the zero value, so no explicit nil check is needed.
	return dgdr.Annotations[AnnotationOutputPVC]
}

1535
// checkProfilingJobStatus checks if the profiling job has completed
1536
func (r *DynamoGraphDeploymentRequestReconciler) checkProfilingJobStatus(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (bool, error) {
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
	logger := log.FromContext(ctx)
	jobName := getProfilingJobName(dgdr)

	job := &batchv1.Job{}
	if err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: dgdr.Namespace}, job); err != nil {
		return false, err
	}

	// Check job conditions
	for _, condition := range job.Status.Conditions {
		if condition.Type == batchv1.JobComplete && condition.Status == corev1.ConditionTrue {
			logger.Info("Profiling job completed", "job", jobName)
			return true, nil
		}
		if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue {
1552
1553
1554
1555
1556
			// Get detailed error from pod logs
			detailedError := r.getProfilingJobErrorDetails(ctx, dgdr, job)
			if detailedError != "" {
				return false, fmt.Errorf("profiling job failed: %s. Details: %s", condition.Message, detailedError)
			}
1557
1558
1559
1560
1561
1562
1563
			return false, fmt.Errorf("profiling job failed: %s", condition.Message)
		}
	}

	return false, nil
}

1564
// getProfilingJobErrorDetails retrieves detailed error information from failed profiling job pods
1565
func (r *DynamoGraphDeploymentRequestReconciler) getProfilingJobErrorDetails(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, job *batchv1.Job) string {
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
	logger := log.FromContext(ctx)

	// List pods owned by this job
	podList := &corev1.PodList{}
	labelSelector := client.MatchingLabels{
		"job-name": job.Name,
	}

	if err := r.List(ctx, podList, client.InNamespace(dgdr.Namespace), labelSelector); err != nil {
		logger.Error(err, "Failed to list pods for profiling job")
		return ""
	}

	// Look for failed pods and extract error details
	for _, pod := range podList.Items {
		// Check pod phase and container statuses
		if pod.Status.Phase == corev1.PodFailed {
			// Get profiler container status (first container)
			for _, containerStatus := range pod.Status.ContainerStatuses {
				if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Terminated != nil {
					terminated := containerStatus.State.Terminated
					// Construct detailed error message
					errorMsg := fmt.Sprintf("Pod: %s, Container: %s, ExitCode: %d, Reason: %s",
						pod.Name, containerStatus.Name, terminated.ExitCode, terminated.Reason)
					if terminated.Message != "" {
						errorMsg += fmt.Sprintf(", Message: %s", terminated.Message)
					}
					logger.Info("Retrieved profiling job error details", "error", errorMsg)
					return errorMsg
				}
			}

			// If no terminated state found, check waiting state
			for _, containerStatus := range pod.Status.ContainerStatuses {
				if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Waiting != nil {
					waiting := containerStatus.State.Waiting
					errorMsg := fmt.Sprintf("Pod: %s, Container: %s, Waiting - Reason: %s, Message: %s",
						pod.Name, containerStatus.Name, waiting.Reason, waiting.Message)
					logger.Info("Retrieved profiling job waiting details", "error", errorMsg)
					return errorMsg
				}
			}
		}
	}

	return ""
}

1614
1615
1616
1617
// generateDGDSpec reads profiling output from the sidecar ConfigMap, extracts the
// DynamoGraphDeployment spec and pareto configs, stores the spec in an annotation via
// r.Update, and returns the ProfilingResultsStatus and DGD name.
//
// Steps, in order:
//  1. Fetch the output ConfigMap (getOutputConfigMapName) from dgdr's namespace.
//  2. Parse the ProfilingOutputFile key as (possibly multi-document) YAML into a DGD
//     plus any additional resources.
//  3. If there are additional resources, persist them with storeAdditionalResources
//     and refetch dgdr so the later update uses a fresh resourceVersion.
//  4. Populate ProfilingResults.Pareto from the optional "webui_data.json" key; a parse
//     failure there is logged and does not fail the call.
//  5. Store the DGD as JSON in ProfilingResults.SelectedConfig and as YAML in the
//     "nvidia.com/generated-dgd-spec" annotation, then r.Update the DGDR.
//
// On any error the returned status is nil and the name is empty.
func (r *DynamoGraphDeploymentRequestReconciler) generateDGDSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (*nvidiacomv1beta1.ProfilingResultsStatus, string, error) {
	logger := log.FromContext(ctx)
	logger.Info("Generating DGD spec from profiling results", "name", dgdr.Name, "backend", dgdr.Spec.Backend)

	// Read the generated spec from ConfigMap (created by sidecar)
	outputConfigMapName := getOutputConfigMapName(dgdr)
	cm := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, cm)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil, "", fmt.Errorf("output ConfigMap %s not found - profiling may not have completed yet", outputConfigMapName)
		}
		return nil, "", fmt.Errorf("failed to get output ConfigMap: %w", err)
	}

	// Profiler writes the selected config (real or mocker) to a single output file
	outputFile := ProfilingOutputFile

	// Get YAML content from ConfigMap
	yamlContent, exists := cm.Data[outputFile]
	if !exists {
		return nil, "", fmt.Errorf("key %s not found in ConfigMap %s", outputFile, outputConfigMapName)
	}

	logger.Info("Found profiling output in ConfigMap", "configMap", outputConfigMapName, "outputFile", outputFile, "size", len(yamlContent))

	// Extract DGD and any supporting resources from potentially multi-document YAML (ConfigMap + DGD)
	dgd, additionalResources, err := r.extractResourcesFromYAML([]byte(yamlContent))
	if err != nil {
		return nil, "", fmt.Errorf("failed to extract DGD from %s: %w", outputFile, err)
	}

	logger.Info("Parsed profiling output", "dgdName", dgd.Name, "additionalResources", len(additionalResources))

	if len(additionalResources) > 0 {
		if err := r.storeAdditionalResources(ctx, dgdr, additionalResources); err != nil {
			logger.Error(err, "Failed to store additional resources")
			return nil, "", err
		}
		// storeAdditionalResources calls r.Update internally, bumping resourceVersion.
		// Refetch so the subsequent r.Update for the spec annotation doesn't 409.
		if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
			return nil, "", fmt.Errorf("failed to refetch DGDR after storing additional resources: %w", err)
		}
	}

	// Pareto data is optional: a missing key is ignored and a malformed one is only logged.
	profilingResults := &nvidiacomv1beta1.ProfilingResultsStatus{}
	if webUIData, ok := cm.Data["webui_data.json"]; ok {
		pareto, err := extractParetoFromWebUIData([]byte(webUIData))
		if err != nil {
			logger.Error(err, "Failed to parse webui_data.json; skipping pareto population")
		} else {
			profilingResults.Pareto = pareto
			logger.Info("Populated ProfilingResults.Pareto", "count", len(pareto))
		}
	}

	// Store the generated DGD in ProfilingResults.SelectedConfig
	dgdJSON, err := json.Marshal(dgd)
	if err != nil {
		return nil, "", fmt.Errorf("failed to marshal generated DGD to JSON: %w", err)
	}
	profilingResults.SelectedConfig = &runtime.RawExtension{Raw: dgdJSON}

	// Serialize the DGD spec to an annotation so createDGD can retrieve it
	dgdBytes, err := sigsyaml.Marshal(dgd)
	if err != nil {
		return nil, "", fmt.Errorf("failed to marshal generated DGD: %w", err)
	}
	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations["nvidia.com/generated-dgd-spec"] = string(dgdBytes)

	if err := r.Update(ctx, dgdr); err != nil {
		return nil, "", fmt.Errorf("failed to update DGDR with generated DGD annotation: %w", err)
	}
	return profilingResults, dgd.Name, nil
}

// extractParetoFromWebUIData parses webui_data.json and returns all Pareto-optimal
// deployment configurations from the cost table (cost.table.data). Each row's last
// column ("Action") is a partial DynamoGraphDeployment YAML snippet.
//
// Parsing is best-effort per row: a row is silently skipped when it has fewer than
// seven columns, when its action cell is not a JSON string, when the snippet (with
// comment lines stripped) is not valid YAML, when the snippet decodes to an empty
// object, or when it cannot be re-encoded as JSON.
//
// Returns (nil, nil) when the table has no rows, and an error only when the document
// as a whole is not valid JSON.
func extractParetoFromWebUIData(data []byte) ([]nvidiacomv1beta1.ParetoConfig, error) {
	// Cells are kept as raw JSON so only the action column needs to be decoded.
	var webUI struct {
		Cost struct {
			Table struct {
				Data [][]json.RawMessage `json:"data"`
			} `json:"table"`
		} `json:"cost"`
	}
	if err := json.Unmarshal(data, &webUI); err != nil {
		return nil, fmt.Errorf("failed to unmarshal webui_data.json: %w", err)
	}

	rows := webUI.Cost.Table.Data
	if len(rows) == 0 {
		return nil, nil
	}

	// Schema: [TTFT(ms), PrefillThpt, ITL(ms), DecodeThpt, TokensPerUser, GPUHours, ActionYAML]
	const (
		minColumns        = 7
		actionColumnIndex = 6
	)

	pareto := make([]nvidiacomv1beta1.ParetoConfig, 0, len(rows))
	for _, row := range rows {
		if len(row) < minColumns {
			continue
		}

		// The action cell is a JSON string whose content is YAML.
		var actionYAML string
		if err := json.Unmarshal(row[actionColumnIndex], &actionYAML); err != nil {
			continue
		}

		var configObj map[string]any
		if err := sigsyaml.Unmarshal([]byte(stripYAMLComments(actionYAML)), &configObj); err != nil {
			continue
		}
		if len(configObj) == 0 {
			continue
		}

		configJSON, err := json.Marshal(configObj)
		if err != nil {
			continue
		}

		pareto = append(pareto, nvidiacomv1beta1.ParetoConfig{
			Config: runtime.RawExtension{Raw: configJSON},
		})
	}

	return pareto, nil
}

// stripYAMLComments removes comment lines (lines whose first non-whitespace character
// is '#', where whitespace means spaces and tabs) from a YAML string. The profiler
// prefixes action snippets with comment lines.
//
// Only whole-line comments are removed; trailing comments such as "a: 1 # note" are
// left untouched. All remaining lines, including blank ones, keep their content and
// relative order, and are joined with "\n".
func stripYAMLComments(s string) string {
	// Fast path: without any '#' there is nothing to strip, so skip the split/join.
	if !strings.Contains(s, "#") {
		return s
	}

	lines := strings.Split(s, "\n")
	kept := lines[:0] // reuse backing array; write index always <= range read index
	for _, line := range lines {
		if strings.HasPrefix(strings.TrimLeft(line, " \t"), "#") {
			continue
		}
		kept = append(kept, line)
	}
	return strings.Join(kept, "\n")
}

1772
1773
// storeAdditionalResources marshals additional resources to YAML and stores them in DGDR annotations.
// Validates annotation size and fails gracefully if too large.
1774
func (r *DynamoGraphDeploymentRequestReconciler) storeAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, resources []*unstructured.Unstructured) error {
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
	if len(resources) == 0 {
		return nil
	}

	var resourcesYAML []byte

	for i, res := range resources {
		resYAML, err := sigsyaml.Marshal(res.Object)
		if err != nil {
			return fmt.Errorf("failed to marshal resource %s/%s: %w", res.GetKind(), res.GetName(), err)
		}
		if i > 0 {
			resourcesYAML = append(resourcesYAML, []byte("\n---\n")...)
		}
		resourcesYAML = append(resourcesYAML, resYAML...)
	}

	// Validate size before storing
	if len(resourcesYAML) > MaxAnnotationSize {
		return fmt.Errorf("additional resources YAML size (%d bytes) exceeds maximum annotation size (%d bytes); "+
			"consider reducing the number of resources or storing them separately",
			len(resourcesYAML), MaxAnnotationSize)
	}

	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations[AnnotationAdditionalResources] = string(resourcesYAML)

	return r.Update(ctx, dgdr)
}

// extractResourcesFromYAML parses multi-document YAML from profiling output,
// extracting the DynamoGraphDeployment and any ConfigMaps that should be deployed with it.
1809
func (r *DynamoGraphDeploymentRequestReconciler) extractResourcesFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, []*unstructured.Unstructured, error) {
1810
1811
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(yamlContent), 4096)

1812
	var dgd *dgdv1alpha1.DynamoGraphDeployment
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
	var additionalResources []*unstructured.Unstructured

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			// Skip invalid documents and continue
			continue
		}

		// Skip empty objects
		if obj.GetKind() == "" {
			continue
		}

		if obj.GetKind() == "DynamoGraphDeployment" {
1831
			dgd = &dgdv1alpha1.DynamoGraphDeployment{}
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, dgd); err != nil {
				return nil, nil, fmt.Errorf("failed to convert to DynamoGraphDeployment: %w", err)
			}
		} else {
			// Store ConfigMaps or other resources for deployment
			additionalResources = append(additionalResources, obj)
		}
	}

	if dgd == nil {
		return nil, nil, fmt.Errorf("no DynamoGraphDeployment found in YAML content")
	}

	return dgd, additionalResources, nil
}

// extractDGDFromYAML is a convenience wrapper that extracts only the DGD (used by tests)
1849
func (r *DynamoGraphDeploymentRequestReconciler) extractDGDFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, error) {
1850
1851
1852
1853
	dgd, _, err := r.extractResourcesFromYAML(yamlContent)
	return dgd, err
}

1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
// updateDeploymentInfo recomputes status.deploymentInfo on dgdr from the per-service
// replica counts in dgd.Status.Services.
//
// Replicas is the sum of every service's Replicas; AvailableReplicas is the sum of
// every non-nil AvailableReplicas. It returns true when dgdr.Status.DeploymentInfo was
// replaced, and false when the existing values already match the computed totals.
func updateDeploymentInfo(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, dgd *dgdv1alpha1.DynamoGraphDeployment) bool {
	var desired, available int32
	for _, service := range dgd.Status.Services {
		desired += service.Replicas
		if avail := service.AvailableReplicas; avail != nil {
			available += *avail
		}
	}

	// Nothing to do when both totals are already recorded with the same values.
	existing := dgdr.Status.DeploymentInfo
	upToDate := existing != nil &&
		existing.Replicas != nil && *existing.Replicas == desired &&
		existing.AvailableReplicas != nil && *existing.AvailableReplicas == available
	if upToDate {
		return false
	}

	info := &nvidiacomv1beta1.DeploymentInfoStatus{}
	info.Replicas = &desired
	info.AvailableReplicas = &available
	dgdr.Status.DeploymentInfo = info
	return true
}

// setSucceededCondition writes the aggregate Succeeded condition that corresponds to
// the given phase into dgdr.Status.Conditions.
//
// The condition is True only for the Ready and Deployed phases; every other phase,
// including the empty phase and unrecognized values, yields False. The condition's
// ObservedGeneration is taken from dgdr.Generation.
func setSucceededCondition(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase) {
	// Start from False and flip to True only for the two successful phases.
	succeeded := metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
		Status:             metav1.ConditionFalse,
		ObservedGeneration: dgdr.Generation,
	}

	switch phase {
	case nvidiacomv1beta1.DGDRPhasePending, "":
		succeeded.Reason, succeeded.Message = "Pending", "DGDR is pending"
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		succeeded.Reason, succeeded.Message = "Profiling", "Profiling is in progress"
	case nvidiacomv1beta1.DGDRPhaseReady:
		succeeded.Status = metav1.ConditionTrue
		succeeded.Reason, succeeded.Message = "SpecGenerated", "Profiling complete, spec available"
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		succeeded.Reason, succeeded.Message = "Deploying", "Deployment is in progress"
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		succeeded.Status = metav1.ConditionTrue
		succeeded.Reason, succeeded.Message = "Deployed", "Deployment is healthy"
	case nvidiacomv1beta1.DGDRPhaseFailed:
		succeeded.Reason, succeeded.Message = "Failed", "DGDR has failed"
	default:
		succeeded.Reason, succeeded.Message = "Unknown", "Unknown phase"
	}

	meta.SetStatusCondition(&dgdr.Status.Conditions, succeeded)
}

// updatePhaseAndRequeue sets the DGDR phase, refreshes the aggregate Succeeded
// condition for that phase, and persists the status via the status subresource.
//
// message is only logged; it is not written to the status. On success the returned
// result requests an immediate requeue; on failure the status update error is returned.
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseAndRequeue(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase, message string) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Updating DGDR phase", "name", dgdr.Name, "phase", phase, "message", message)

	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)

	if err := r.Status().Update(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{Requeue: true}, nil
}

1921
1922
// updatePhaseWithCondition updates phase and adds/updates a condition
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseWithCondition(
1923
	ctx context.Context,
1924
1925
	dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
	phase nvidiacomv1beta1.DGDRPhase,
1926
1927
1928
1929
1930
	conditionType string,
	status metav1.ConditionStatus,
	reason string,
	message string,
) (ctrl.Result, error) {
1931
1932
	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954

	condition := metav1.Condition{
		Type:               conditionType,
		Status:             status,
		ObservedGeneration: dgdr.Generation,
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}

	dgdr.AddStatusCondition(condition)

	if err := r.Status().Update(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{Requeue: true}, nil
}

// SetupWithManager sets up the controller with the Manager
func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
1955
		For(&nvidiacomv1beta1.DynamoGraphDeploymentRequest{}).
1956
		Named(consts.ResourceTypeDynamoGraphDeploymentRequest).
1957
1958
1959
1960
1961
1962
1963
		Owns(&batchv1.Job{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the job
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})). // Watch Jobs created by this controller (via ownerReference)
1964
		// Watch DGDs created by this controller (via label)
1965
		Watches(
1966
			&dgdv1alpha1.DynamoGraphDeployment{},
1967
1968
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				// Find DGDR by label instead of owner reference
1969
1970
1971
				dgd := obj.(*dgdv1alpha1.DynamoGraphDeployment)
				dgdrName, hasName := dgd.Labels[nvidiacomv1beta1.LabelDGDRName]
				dgdrNamespace, hasNamespace := dgd.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
				if !hasName || !hasNamespace {
					return nil
				}
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{
						Name:      dgdrName,
						Namespace: dgdrNamespace,
					},
				}}
			}),
			builder.WithPredicates(predicate.Funcs{
				// ignore creation cause we don't want to be called again after we create the DGD
				CreateFunc:  func(ce event.CreateEvent) bool { return false },
				DeleteFunc:  func(de event.DeleteEvent) bool { return true },
				UpdateFunc:  func(ue event.UpdateEvent) bool { return true },
				GenericFunc: func(ge event.GenericEvent) bool { return true },
			}),
1989
		).
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
		// Watch output ConfigMaps for profiling sub-phase updates (via label)
		Watches(
			&corev1.ConfigMap{},
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				// Only trigger for ConfigMaps with DGDR labels (written by the sidecar)
				cm := obj.(*corev1.ConfigMap)
				dgdrName, hasName := cm.Labels[nvidiacomv1beta1.LabelDGDRName]
				dgdrNamespace, hasNamespace := cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
				if !hasName || !hasNamespace {
					return nil
				}
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{
						Name:      dgdrName,
						Namespace: dgdrNamespace,
					},
				}}
			}),
			builder.WithPredicates(predicate.Funcs{
				CreateFunc: func(ce event.CreateEvent) bool {
					labels := ce.Object.GetLabels()
					_, hasName := labels[nvidiacomv1beta1.LabelDGDRName]
					_, hasNamespace := labels[nvidiacomv1beta1.LabelDGDRNamespace]
					return hasName && hasNamespace
				},
				UpdateFunc: func(ue event.UpdateEvent) bool {
					labels := ue.ObjectNew.GetLabels()
					_, hasName := labels[nvidiacomv1beta1.LabelDGDRName]
					_, hasNamespace := labels[nvidiacomv1beta1.LabelDGDRNamespace]
					return hasName && hasNamespace
				},
				DeleteFunc:  func(de event.DeleteEvent) bool { return false },
				GenericFunc: func(ge event.GenericEvent) bool { return false },
			}),
		).
2025
2026
		// Set the event filter to ignore resources handled by other controllers in namespace-restricted mode
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)).
2027
		Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeploymentRequest))
2028
}