/*
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"bytes"
	"context"
23
24
	"encoding/json"
	"errors"
25
	"fmt"
26
	"io"
27
28
29
30
31
32
33
	"text/template"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
35
36
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
37
	"k8s.io/apimachinery/pkg/util/yaml"
38
	"k8s.io/client-go/tools/record"
39
	"k8s.io/utils/ptr"
40
41
42
43
44
45
46
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
47
	sigsyaml "sigs.k8s.io/yaml"
48

49
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
50
51
	dgdv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	nvidiacomv1beta1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1beta1"
52
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
53
	commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
54
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/gpu"
55
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
)

// Names of the profiling Job and the objects it relies on.
const (
	// Job name prefixes.
	// NOTE(review): getProfilingJobName builds names with a plain "profile-"
	// prefix rather than these; confirm where these are still used.
	JobNamePrefixOnline = "profile-online-"
	JobNamePrefixAIC    = "profile-aic-"

	// Containers inside the profiling pod.
	ContainerNameProfiler     = "profiler"
	ContainerNameOutputCopier = "output-copier"

	// ServiceAccountProfilingJob is the ServiceAccount name used for profiling Jobs.
	ServiceAccountProfilingJob = "dgdr-profiling-job"

	// ConfigMapOutputPrefix prefixes the name of the ConfigMap that receives profiling output.
	ConfigMapOutputPrefix = "dgdr-output-"
)

// Annotation keys read from DynamoGraphDeploymentRequest objects.
const (
	// AnnotationAdditionalResources carries a YAML document stream of extra
	// resources (ConfigMaps) taken from the profiling output.
	AnnotationAdditionalResources = "dgdr.nvidia.com/additional-resources"

	// v1alpha1 round-trip keys: the conversion layer stores v1alpha1 fields that
	// have no v1beta1 spec equivalent under these keys so the controller can
	// still honour them for converted resources.
	AnnotationConfigMapRef = "nvidia.com/dgdr-config-map-ref"
	AnnotationOutputPVC    = "nvidia.com/dgdr-output-pvc"
)

// MaxAnnotationSize caps annotation payloads at ~250KB, leaving headroom below
// the 256KB Kubernetes limit on an object's annotations.
const MaxAnnotationSize = 250000

// SidecarImage is the container image for the output-copier sidecar.
// NOTE(review): the floating ":latest" tag makes the sidecar non-reproducible; consider pinning it.
const SidecarImage = "bitnami/kubectl:latest"

// Volumes mounted into the profiling pod, and the paths/files used on them.
const (
	VolumeNameProfilingOutput = "profiling-output"
	VolumeNameProfilingConfig = "profiling-config"
	VolumeNameModelCache      = "model-cache"

	ProfilingOutputPath        = "/data"
	ProfilingOutputFile        = "final_config.yaml"
	ProfilingConfigMountPath   = "/config"
	ProfilingConfigDefaultKey  = "disagg.yaml"
	DefaultModelCacheMountPath = "/opt/model-cache"
)

// Profiler command-line flags.
const (
	ArgModel   = "--model"
	ArgBackend = "--backend"
	ArgTTFT    = "--ttft"
	ArgITL     = "--itl"
	ArgConfig  = "--config"
)

// Human-readable event and condition messages. Entries containing %s are
// fmt format strings.
const (
	MessageInitialized            = "DGDR initialized successfully"
	MessageProfilingJobCreated    = "Profiling job created"
	MessageAICProfilingJobCreated = "AIC profiling job created"
	MessageProfilingInProgress    = "Profiling is in progress"
	MessageSpecGenerated          = "DynamoGraphDeployment spec generated successfully"
	MessageSpecAvailable          = "Generated spec is available in annotation nvidia.com/generated-dgd-spec"
	MessageDeploymentCreated      = "DynamoGraphDeployment %s created successfully"
	MessageDeploymentReady        = "DynamoGraphDeployment %s is ready"
	MessageDeploymentDegraded     = "DynamoGraphDeployment %s degraded from Ready to %s"
	MessageDeploymentDeleted      = "DGD %s was deleted. DGDR will not recreate it. Delete this DGDR and create a new one to redeploy."
	MessageInvalidState           = "Invalid state"
	MessageSpecChangeRejected     = "Cannot modify spec in phase '%s'. DynamoGraphDeploymentRequest is immutable once profiling starts. Create a new resource with a different name instead."
)

// CamelCase identifiers used as event and condition reasons (despite the
// Message prefix).
const (
	MessageJobCreationFailed         = "JobCreationFailed"
	MessageDeploymentCreationFailed  = "DeploymentCreationFailed"
	MessageResultsRetrievalFailed    = "ResultsRetrievalFailed"
	MessageGenerationFailed          = "GenerationFailed"
	MessageAIConfiguratorCheckFailed = "AIConfiguratorCheckFailed"
	MessageProfilingCheckFailed      = "ProfilingCheckFailed"
)

// Format strings for missing-input errors.
const (
	MessageConfigMapNotFound     = "ConfigMap %s not found in namespace %s"
	MessageConfigMapKeyNotFound  = "key %s not found in ConfigMap %s"
	MessageModelCachePVCNotFound = "model cache PVC %s not found in namespace %s"
)

// sidecarScriptTemplate is the Go template for the shell script run by the
// output-copier sidecar. It expects the fields .Namespace, .OutputPath,
// .OutputFile, .ConfigMapName and .DGDRName.
//
// The script:
//  1. polls the pod's own status (via $HOSTNAME) until the "profiler" container
//     has terminated, with no timeout;
//  2. waits up to 120s for <OutputPath>/profiler_status.yaml and exits non-zero
//     unless its status is "success";
//  3. writes a ConfigMap named .ConfigMapName containing .OutputFile (and the
//     status file, when present) and applies it with kubectl.
//
// Every line of the template ends up in the executed script, and `set -e` is in
// effect. Stray non-script lines (such as bare numbers) therefore must never
// appear in it: each would run as a command and abort the copier.
// The jsonpath filter hard-codes "profiler", which must match ContainerNameProfiler.
const sidecarScriptTemplate = `
set -e
set -o pipefail

# Wait for profiler container to terminate (no timeout - profiling can take hours)
echo "Waiting for profiler to complete..."
START_TIME=$(date +%s)
LAST_PROGRESS_LOG=$START_TIME
PROGRESS_INTERVAL=300

while true; do
  CURRENT_TIME=$(date +%s)
  ELAPSED=$((CURRENT_TIME - START_TIME))

  # Log progress every 5 minutes
  if [ $((CURRENT_TIME - LAST_PROGRESS_LOG)) -ge $PROGRESS_INTERVAL ]; then
    echo "Still waiting... ($(($ELAPSED / 60)) minutes elapsed)"
    LAST_PROGRESS_LOG=$CURRENT_TIME
  fi

  # Check if profiler container terminated
  CONTAINER_STATUS=$(kubectl get pod "$HOSTNAME" -n {{.Namespace}} -o jsonpath='{.status.containerStatuses[?(@.name=="profiler")].state}' 2>/dev/null || echo "")
  if echo "$CONTAINER_STATUS" | grep -q "terminated"; then
    echo "Profiler terminated (ran for $(($ELAPSED / 60)) minutes)"
    break
  fi
  sleep 5
done

# Check profiler status file (2 minute timeout)
echo "Checking profiler status..."
STATUS_FILE="{{.OutputPath}}/profiler_status.yaml"
TIMEOUT=120
CHECK_START=$(date +%s)

# Wait for status file to exist
while [ ! -f "$STATUS_FILE" ]; do
  ELAPSED=$(($(date +%s) - CHECK_START))
  if [ $ELAPSED -ge $TIMEOUT ]; then
    echo "ERROR: Status file not found after ${TIMEOUT}s"
    exit 1
  fi
  sleep 2
done

# Read and parse status from YAML file
STATUS=$(grep "^status:" "$STATUS_FILE" | awk '{print $2}' | tr -d '"' | tr -d "'")

if [ -z "$STATUS" ]; then
  echo "ERROR: Invalid status file format"
  exit 1
fi

# Check status value
case "$STATUS" in
  success)
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'")
    echo "Profiler succeeded: $MESSAGE"
    ;;
  failed)
    ERROR=$(grep "^error:" "$STATUS_FILE" | sed 's/^error: *//' | tr -d '"' | tr -d "'")
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'")
    echo "ERROR: Profiler failed: ${ERROR:-$MESSAGE}"
    exit 1
    ;;
  running)
    echo "ERROR: Profiler still running (unexpected)"
    exit 1
    ;;
  *)
    echo "ERROR: Unknown status: $STATUS"
    exit 1
    ;;
esac

echo "Creating ConfigMap..."

# Start building ConfigMap YAML with DGD spec
cat >/tmp/cm.yaml <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{.ConfigMapName}}
  namespace: {{.Namespace}}
  labels:
    dgdr.nvidia.com/name: {{.DGDRName}}
    nvidia.com/managed-by: dynamo-operator
data:
  {{.OutputFile}}: |
EOF
sed 's/^/    /' {{.OutputPath}}/{{.OutputFile}} >> /tmp/cm.yaml

# Add profiler status file for debugging
if [ -f {{.OutputPath}}/profiler_status.yaml ]; then
  echo "  profiler_status.yaml: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/profiler_status.yaml >> /tmp/cm.yaml
fi

# Note: Profiling data (raw_data.npz converted to JSON) is included in the
# generated DGD YAML as a separate ConfigMap by the profiler, no need to add it here

kubectl apply -f /tmp/cm.yaml
echo "Saved profiling output to ConfigMap {{.ConfigMapName}}"
`

// DynamoGraphDeploymentRequestReconciler reconciles a DynamoGraphDeploymentRequest object
type DynamoGraphDeploymentRequestReconciler struct {
	client.Client
240
	APIReader     client.Reader
241
242
243
	Recorder      record.EventRecorder
	Config        *configv1alpha1.OperatorConfiguration
	RuntimeConfig *commonController.RuntimeConfig
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259

	// RBACMgr handles RBAC setup for profiling jobs
	RBACManager RBACManager
}

// RBACManager abstracts provisioning of the RBAC objects that profiling Jobs
// depend on, so the reconciler can be tested with a fake.
type RBACManager interface {
	// EnsureServiceAccountWithRBAC makes sure serviceAccountName is usable in
	// targetNamespace with the permissions of clusterRoleName.
	// NOTE(review): exact semantics (create vs. update, binding kind) live in the
	// implementation; confirm there.
	EnsureServiceAccountWithRBAC(ctx context.Context, targetNamespace, serviceAccountName, clusterRoleName string) error
}

// GetRecorder returns the EventRecorder this reconciler emits DGDR events with.
// It satisfies the commonController.Reconciler interface.
func (r *DynamoGraphDeploymentRequestReconciler) GetRecorder() record.EventRecorder {
	recorder := r.Recorder
	return recorder
}

// FinalizeResource implements commonController.Finalizer interface
260
func (r *DynamoGraphDeploymentRequestReconciler) FinalizeResource(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
261
262
263
264
265
266
267
268
269
270
271
272
273
	logger := log.FromContext(ctx)

	logger.Info("DGDR finalized successfully", "name", dgdr.Name)
	return nil
}

// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/finalizers,verbs=update
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
274
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
275
276
277
278
279
280
281
282
283
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

// Reconcile handles the reconciliation loop for DynamoGraphDeploymentRequest
func (r *DynamoGraphDeploymentRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling DynamoGraphDeploymentRequest", "name", req.Name, "namespace", req.Namespace)

	// Fetch the DGDR instance
284
	dgdr := &nvidiacomv1beta1.DynamoGraphDeploymentRequest{}
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
	if err := r.Get(ctx, req.NamespacedName, dgdr); err != nil {
		if apierrors.IsNotFound(err) {
			logger.Info("DGDR resource not found, ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get DGDR")
		return ctrl.Result{}, err
	}

	// Handle finalizer using common function
	finalized, err := commonController.HandleFinalizer(ctx, dgdr, r.Client, r)
	if err != nil {
		return ctrl.Result{}, err
	}
	if finalized {
		// Resource was deleted and finalized
		return ctrl.Result{}, nil
	}

	// Check for spec changes (immutability enforcement)
	if dgdr.Status.ObservedGeneration > 0 && dgdr.Status.ObservedGeneration != dgdr.Generation {
		// Spec changed after initial processing
307
308
309
310
		if dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseProfiling || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeploying ||
			dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseReady || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeployed {
			logger.Info("Spec change detected in immutable phase",
				"phase", dgdr.Status.Phase,
311
312
313
				"observedGeneration", dgdr.Status.ObservedGeneration,
				"currentGeneration", dgdr.Generation)

314
315
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonSpecChangeRejected,
				fmt.Sprintf(MessageSpecChangeRejected, dgdr.Status.Phase))
316
317

			// Keep the old observedGeneration to continue rejecting changes
318
			// No phase transition - stay in current phase with old spec
319
320
321
			return ctrl.Result{}, nil
		}
	}
322
323
324
325
326
327
328
329
330
331
332
333
334
335
	// Phase machine: handle different phases
	switch dgdr.Status.Phase {
	case nvidiacomv1beta1.DGDRPhasePending, "":
		return r.handlePendingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		return r.handleProfilingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		return r.handleDeployingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseReady:
		return r.handleReadyPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		return r.handleDeployedPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseFailed:
		return r.handleFailedPhase(ctx, dgdr)
336
	default:
337
338
		logger.Info("Unknown phase", "phase", dgdr.Status.Phase)
		return r.updatePhaseAndRequeue(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, MessageInvalidState)
339
340
341
	}
}

342
343
344
345
// handlePendingPhase processes newly created or pending DGDR resources.
// When ObservedGeneration == 0, performs initial validation (merged from v1alpha1 Initializing state).
// Otherwise, starts the profiling process.
func (r *DynamoGraphDeploymentRequestReconciler) handlePendingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
346
347
	logger := log.FromContext(ctx)

348
349
350
	// First-time processing: validate spec (merged from handleInitialState)
	if dgdr.Status.ObservedGeneration == 0 {
		logger.Info("Handling initial validation", "name", dgdr.Name)
351

352
353
354
355
356
		// Validate the spec
		if err := r.validateSpec(ctx, dgdr); err != nil {
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonValidationFailed, err.Error())
			return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeValidation, metav1.ConditionFalse, nvidiacomv1beta1.EventReasonValidationFailed, err.Error())
		}
357

358
359
		// Set observedGeneration to track the spec we're processing
		dgdr.Status.ObservedGeneration = dgdr.Generation
360

361
362
363
364
		// Initialize status
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonInitialized, MessageInitialized)
		return r.updatePhaseAndRequeue(ctx, dgdr, nvidiacomv1beta1.DGDRPhasePending, MessageInitialized)
	}
365

366
	logger.Info("Handling pending phase", "name", dgdr.Name)
367
368
369

	// Create profiling job (online or AIC)
	if err := r.createProfilingJob(ctx, dgdr); err != nil {
370
371
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonProfilingJobFailed, err.Error())
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, MessageJobCreationFailed, err.Error())
372
373
374
	}

	// Record event with appropriate message
375
	if isOnlineProfiling(dgdr) {
376
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, MessageProfilingJobCreated)
377
	} else {
378
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, MessageAICProfilingJobCreated)
379
380
	}

381
382
383
	// Update to Profiling phase with Running status
	dgdr.SetProfilingPhase(nvidiacomv1beta1.ProfilingPhaseInitializing)
	return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingRunning", MessageProfilingInProgress)
384
385
}

386
387
// handleProfilingPhase monitors profiling progress and generates spec when complete
func (r *DynamoGraphDeploymentRequestReconciler) handleProfilingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
388
	logger := log.FromContext(ctx)
389
	logger.Info("Handling profiling phase", "name", dgdr.Name)
390
391
392
393
394
395

	// Check profiling job status (both online and offline/AIC run as Jobs)
	// Note: We watch the Job via Owns(), so we'll be triggered automatically on Job changes
	completed, err := r.checkProfilingJobStatus(ctx, dgdr)
	if err != nil {
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageProfilingCheckFailed, err.Error())
396
397
398
		// Job failed - clear profiling sub-phase and transition to Failed
		dgdr.ClearProfilingPhase()
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingFailed", err.Error())
399
400
401
402
403
404
405
406
	}

	if !completed {
		logger.Info("Profiling job still running", "name", dgdr.Name)
		// Don't requeue - we'll be triggered when the Job completes/fails
		return ctrl.Result{}, nil
	}

407
408
409
	// Profiling complete — clear the profiling sub-phase
	dgdr.ClearProfilingPhase()

410
411
	// Mark profiling as completed successfully
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
412
		Type:               nvidiacomv1beta1.ConditionTypeProfiling,
413
414
415
416
417
418
419
420
421
		Status:             metav1.ConditionTrue,
		ObservedGeneration: dgdr.Generation,
		Reason:             "ProfilingCompleted",
		Message:            "Profiling job completed successfully",
	})

	// Retrieve profiling results and generate spec
	if err := r.generateDGDSpec(ctx, dgdr); err != nil {
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageGenerationFailed, err.Error())
422
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionFalse, MessageGenerationFailed, err.Error())
423
424
425
	}

	// Record spec generation event
426
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
427

428
429
	// Create additional resources (ConfigMaps) immediately after profiling
	// This ensures that the `planner-profile-data` ConfigMap is available for both auto and manual deployment
430
	// v1beta1 uses the DGDR namespace for additional resources.
431
432
433
434
435
436
437
438
	targetNamespace := dgdr.Namespace
	if err := r.createAdditionalResources(ctx, dgdr, targetNamespace); err != nil {
		logger.Error(err, "Failed to create additional resources after profiling")
		// Don't fail the DGDR, just log the error - ConfigMaps can be created manually
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, "ConfigMapCreationFailed",
			fmt.Sprintf("Failed to create ConfigMaps from profiling output: %v", err))
	}

439
	// If autoApply is enabled, transition to Deploying phase
440
	if dgdr.Spec.AutoApply == nil || *dgdr.Spec.AutoApply {
441
442
		logger.Info("AutoApply enabled, transitioning to Deploying phase")
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseDeploying, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
443
444
	}

445
446
	// Otherwise, transition to Ready phase
	return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseReady, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecAvailable)
447
448
}

449
450
// handleReadyPhase handles DGDR in Ready phase (profiling complete, spec available)
func (r *DynamoGraphDeploymentRequestReconciler) handleReadyPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
451
452
453
	logger := log.FromContext(ctx)
	logger.Info("DGDR is ready", "name", dgdr.Name)

454
455
456
457
458
459
460
461
462
	// Nothing to monitor in Ready phase - spec is available for manual application
	return ctrl.Result{}, nil
}

// handleDeployingPhase handles DGD creation and monitors deployment
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Handling deploying phase", "name", dgdr.Name)

463
	if dgdr.Spec.AutoApply != nil && !*dgdr.Spec.AutoApply {
464
465
466
467
468
		// Shouldn't be in this phase without autoApply
		logger.Info("AutoApply not enabled, transitioning to Ready")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseReady
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseReady)
		return ctrl.Result{}, r.Status().Update(ctx, dgdr)
469
470
	}

471
472
473
474
475
476
477
	// Check if we need to create DGD
	if dgdr.Status.DGDName == "" {
		return r.createDGD(ctx, dgdr)
	}

	// DGD was already created, check its status
	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
478
	err := r.Get(ctx, types.NamespacedName{
479
480
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
481
482
483
484
485
486
487
488
489
490
491
	}, dgd)

	if apierrors.IsNotFound(err) {
		// DGD was deleted by user
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

492
493
494
	// Check if DGD is Ready
	var condStatus metav1.ConditionStatus
	var condReason, condMessage string
495

496
497
498
499
	if dgd.Status.State == dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD is Ready, transitioning to Deployed phase")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeployed
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeployed)
500

501
502
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentReady,
			fmt.Sprintf(MessageDeploymentReady, dgd.Name))
503

504
505
506
507
508
		condStatus = metav1.ConditionTrue
		condReason = nvidiacomv1beta1.EventReasonDeploymentReady
		condMessage = fmt.Sprintf(MessageDeploymentReady, dgd.Name)
	} else {
		logger.Info("DGD not yet ready", "name", dgd.Name, "state", dgd.Status.State)
509

510
511
512
		condStatus = metav1.ConditionFalse
		condReason = "DeploymentInProgress"
		condMessage = fmt.Sprintf("DGD %s is in %s state", dgd.Name, string(dgd.Status.State))
513
514
	}

515
516
517
518
519
520
521
522
	updateDeploymentInfo(dgdr, dgd)
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
		Status:  condStatus,
		Reason:  condReason,
		Message: condMessage,
	})

523
524
525
	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

526
527
// handleDeployedPhase monitors a healthy DGD and detects degradation or deletion
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
528
	logger := log.FromContext(ctx)
529
	logger.Info("DGDR is deployed", "name", dgdr.Name)
530

531
532
	// Check if DGD still exists and monitor its status
	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
533
	err := r.Get(ctx, types.NamespacedName{
534
535
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
536
537
538
539
540
541
542
543
544
545
546
	}, dgd)

	if apierrors.IsNotFound(err) {
		// DGD was deleted by user
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

547
548
549
550
	// Check if DGD degraded from Ready
	if dgd.Status.State != dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD degraded, transitioning back to Deploying",
			"dgdState", dgd.Status.State)
551

552
553
554
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeploying
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeploying)
		updateDeploymentInfo(dgdr, dgd)
555

556
557
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDegraded,
			fmt.Sprintf(MessageDeploymentDegraded, dgd.Name, string(dgd.Status.State)))
558
559

		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
560
561
562
563
			Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
			Status:  metav1.ConditionFalse,
			Reason:  nvidiacomv1beta1.EventReasonDeploymentDegraded,
			Message: fmt.Sprintf("Deployment degraded to %s", string(dgd.Status.State)),
564
		})
565
566
567
568
569
570
	} else {
		// DGD is healthy — update replica info only if changed
		if !updateDeploymentInfo(dgdr, dgd) {
			// Nothing changed, skip the status write
			return ctrl.Result{}, nil
		}
571
572
573
574
575
	}

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

576
577
578
// handleDGDDeleted handles the case when auto-created DGD is deleted by user.
// In v1beta1, this transitions to Failed (DeploymentDeleted phase was removed).
func (r *DynamoGraphDeploymentRequestReconciler) handleDGDDeleted(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
579
	logger := log.FromContext(ctx)
580
	logger.Info("DGD was deleted by user, transitioning to Failed phase")
581

582
583
	dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
	setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseFailed)
584

585
586
	r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDeleted,
		fmt.Sprintf(MessageDeploymentDeleted, dgdr.Status.DGDName))
587

588
589
	dgdr.Status.DGDName = ""
	dgdr.Status.DeploymentInfo = nil
590

591
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
592
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
593
		Status:  metav1.ConditionFalse,
594
		Reason:  nvidiacomv1beta1.EventReasonDeploymentDeleted,
595
596
597
598
599
600
601
		Message: "Deployment was deleted by user. Create a new DGDR to redeploy.",
	})

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

// createDGD creates a DynamoGraphDeployment with the generated spec
602
func (r *DynamoGraphDeploymentRequestReconciler) createDGD(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
603
604
	logger := log.FromContext(ctx)

605
606
607
608
	// Extract DGD spec from annotation (stored by generateDGDSpec)
	dgdSpecYAML, ok := dgdr.Annotations["nvidia.com/generated-dgd-spec"]
	if !ok || dgdSpecYAML == "" {
		return ctrl.Result{}, fmt.Errorf("generated DGD spec not found in annotation nvidia.com/generated-dgd-spec")
609
610
	}

611
612
613
	generatedDGD := &dgdv1alpha1.DynamoGraphDeployment{}
	if err := yaml.Unmarshal([]byte(dgdSpecYAML), generatedDGD); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to unmarshal generated deployment from annotation: %w", err)
614
615
	}

616
	// Determine DGD name and namespace from generated deployment
617
618
619
620
621
622
623
624
625
626
627
	dgdName := generatedDGD.Name
	dgdNamespace := dgdr.Namespace

	// Build labels (start with generated DGD's labels)
	labels := make(map[string]string)
	if generatedDGD.Labels != nil {
		for k, v := range generatedDGD.Labels {
			labels[k] = v
		}
	}
	// Add/override with managed labels
628
629
630
	labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
	labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
	labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
631
632
633
634
635
636
637
638
639
640

	// Build annotations (start with generated DGD's annotations)
	annotations := make(map[string]string)
	if generatedDGD.Annotations != nil {
		for k, v := range generatedDGD.Annotations {
			annotations[k] = v
		}
	}

	// Create DGD from generated deployment
641
	dgd := &dgdv1alpha1.DynamoGraphDeployment{
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
		ObjectMeta: metav1.ObjectMeta{
			Name:        dgdName,
			Namespace:   dgdNamespace,
			Labels:      labels,
			Annotations: annotations,
		},
		Spec: generatedDGD.Spec,
	}

	// Note: We don't set owner reference on DGD
	// If a DGDR is deleted, the DGD may be serving traffic and should persist independently.
	// We use labels (LabelDGDRName) to track the relationship.

	logger.Info("Creating DynamoGraphDeployment", "name", dgdName, "namespace", dgdNamespace)

	if err := r.Create(ctx, dgd); err != nil {
		if apierrors.IsAlreadyExists(err) {
			// DGD already exists, just update status
			logger.Info("DGD already exists, updating status")
661
			dgdr.Status.DGDName = dgdName
662
663
664
665
666
667
668
			return ctrl.Result{}, r.Status().Update(ctx, dgdr)
		}
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageDeploymentCreationFailed, err.Error())
		return ctrl.Result{}, err
	}

	// Update status
669
	dgdr.Status.DGDName = dgdName
670

671
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentCreated,
672
673
674
		fmt.Sprintf(MessageDeploymentCreated, dgdName))

	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
675
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
676
		Status:  metav1.ConditionFalse,
677
		Reason:  nvidiacomv1beta1.EventReasonDeploymentCreated,
678
679
680
681
682
683
684
685
		Message: fmt.Sprintf("DGD %s created, waiting for Ready", dgdName),
	})

	logger.Info("DynamoGraphDeployment created successfully", "name", dgdName)

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

686
// createAdditionalResources creates ConfigMaps from the profiling output that should be deployed alongside the DGD
687
func (r *DynamoGraphDeploymentRequestReconciler) createAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, targetNamespace string) error {
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
	logger := log.FromContext(ctx)

	// Check if there are additional resources stored in annotations
	if dgdr.Annotations == nil {
		return nil
	}

	resourcesYAML, exists := dgdr.Annotations[AnnotationAdditionalResources]
	if !exists || resourcesYAML == "" {
		return nil
	}

	// Parse using standard Kubernetes YAML decoder
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(resourcesYAML)), 4096)
	resourceCount := 0

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			logger.Error(err, "Failed to decode resource, skipping")
			continue
		}

		if obj.GetKind() == "" {
			continue
		}

		resourceCount++

		// Only support ConfigMap for now (what profiler actually generates)
		if obj.GetKind() != "ConfigMap" {
			logger.Info("Skipping non-ConfigMap resource from profiling output", "kind", obj.GetKind(), "name", obj.GetName())
			continue
		}

		cm := &corev1.ConfigMap{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, cm); err != nil {
			logger.Error(err, "Failed to convert to ConfigMap", "name", obj.GetName())
			continue
		}

		// Override namespace and add tracking labels
		cm.Namespace = targetNamespace
		if cm.Labels == nil {
			cm.Labels = make(map[string]string)
		}
737
738
739
		cm.Labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
		cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
		cm.Labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759

		// Create the ConfigMap
		if err := r.Create(ctx, cm); err != nil {
			if apierrors.IsAlreadyExists(err) {
				logger.Info("ConfigMap already exists, skipping", "name", cm.Name)
			} else {
				return fmt.Errorf("failed to create ConfigMap %s: %w", cm.Name, err)
			}
		} else {
			logger.Info("Created ConfigMap from profiling output", "name", cm.Name, "namespace", targetNamespace)
		}
	}

	if resourceCount > 0 {
		logger.Info("Deploying additional resources from profiling output", "count", resourceCount)
	}

	return nil
}

760
761
// handleFailedPhase is the reconcile step for a DGDR whose phase is Failed.
// It only logs the state: no automatic retry is attempted, so the returned
// result never requeues and the returned error is always nil.
func (r *DynamoGraphDeploymentRequestReconciler) handleFailedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	log.FromContext(ctx).Info("DGDR is in failed phase", "name", dgdr.Name)

	// Retry logic could be added here in the future if desired.
	var result ctrl.Result
	return result, nil
}

769
// getProfilingJobName returns the job name for a DGDR
770
func getProfilingJobName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
771
772
	// Use "profile-" prefix for all profiling jobs
	return fmt.Sprintf("profile-%s", dgdr.Name)
773
774
775
}

// getOutputConfigMapName returns the ConfigMap name for profiling output
776
func getOutputConfigMapName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
777
778
779
	return fmt.Sprintf("%s%s", ConfigMapOutputPrefix, dgdr.Name)
}

780
781
782
// isOnlineProfiling always reports true. Under v1beta1 the profiler itself
// chooses between online and AIC mode from its configuration, so the controller
// applies the same label for every DGDR; the argument is intentionally ignored.
func isOnlineProfiling(_ *nvidiacomv1beta1.DynamoGraphDeploymentRequest) bool {
	const alwaysOnline = true
	return alwaysOnline
}
785

786
// validateSpec validates the DGDR spec
787
788
789
790
791
792
793
794
795
796
797
func (r *DynamoGraphDeploymentRequestReconciler) validateSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	var errs []error

	// Disallow searchStrategy: thorough with backend: auto.
	if dgdr.Spec.SearchStrategy == nvidiacomv1beta1.SearchStrategyThorough &&
		dgdr.Spec.Backend == nvidiacomv1beta1.BackendTypeAuto {
		errs = append(errs, fmt.Errorf(
			"spec.searchStrategy %q is incompatible with spec.backend %q: set spec.backend to a specific backend (sglang, trtllm, or vllm)",
			nvidiacomv1beta1.SearchStrategyThorough,
			nvidiacomv1beta1.BackendTypeAuto,
		))
798
799
	}

800
	// Validate model cache PVC if provided
801
	if dgdr.Spec.ModelCache != nil && dgdr.Spec.ModelCache.PVCName != "" {
802
803
		pvc := &corev1.PersistentVolumeClaim{}
		err := r.Get(ctx, types.NamespacedName{
804
			Name:      dgdr.Spec.ModelCache.PVCName,
805
806
807
808
809
			Namespace: dgdr.Namespace,
		}, pvc)

		if err != nil {
			if apierrors.IsNotFound(err) {
810
811
812
				errs = append(errs, fmt.Errorf(MessageModelCachePVCNotFound, dgdr.Spec.ModelCache.PVCName, dgdr.Namespace))
			} else {
				return err
813
814
815
816
			}
		}
	}

817
	if err := r.validateGPUHardwareInfo(ctx, dgdr); err != nil {
818
		errs = append(errs, err)
819
820
	}

821
	// The profiler will validate the rest of the configuration
822
	return errors.Join(errs...)
823
824
825
}

// validateGPUHardwareInfo ensures GPU hardware information is available when required for profiling
826
func (r *DynamoGraphDeploymentRequestReconciler) validateGPUHardwareInfo(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
827
828
	logger := log.FromContext(ctx)

829
830
831
832
	// Check if user provided hardware info in the typed spec
	hasManualConfig := dgdr.Spec.Hardware != nil && (dgdr.Spec.Hardware.GPUSKU != "" ||
		dgdr.Spec.Hardware.VRAMMB != nil ||
		dgdr.Spec.Hardware.NumGPUsPerNode != nil)
833

834
835
	// If manual config is provided, validation passes
	if hasManualConfig {
836
837
838
		return nil
	}

839
	_, err := gpu.DiscoverGPUs(ctx, r.APIReader)
840
841
842
843
844
845
846
	if err == nil {
		// GPU discovery is available, validation passes
		return nil
	}

	logger.Info("GPU discovery not available", "reason", err.Error())

847
	isNamespaceScoped := r.Config.Namespace.Restricted != ""
848
	if isNamespaceScoped {
849
850
		return fmt.Errorf(
			"GPU hardware info required but cannot be auto-discovered." +
851
852
853
				"\n\nOptions to resolve:" +
				"\n\n1. Re-enable GPU discovery (if it was disabled during Helm install):" +
				"\n   helm upgrade ... --set dynamo-operator.gpuDiscovery.enabled=true" +
854
855
856
857
				"\n\n2. Add hardware config to spec.hardware:" +
				"\n   numGpusPerNode: 8" +
				"\n   gpuSku: \"H100-SXM5-80GB\"" +
				"\n   vramMb: 81920")
858
859
	}

860
	return fmt.Errorf("GPU hardware info required but auto-discovery failed. Add spec.hardware.gpuSku, spec.hardware.vramMb, spec.hardware.numGpusPerNode")
861
862
}

863
// createProfilingJob creates a Kubernetes Job for profiling using SyncResource
864
func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
865
866
	logger := log.FromContext(ctx)

867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
	// Delete any existing output ConfigMap to ensure fresh profiling results
	// This prevents using stale data from previous profiling runs
	outputConfigMapName := getOutputConfigMapName(dgdr)
	existingCM := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, existingCM)
	if err == nil {
		// ConfigMap exists, delete it
		logger.Info("Deleting existing output ConfigMap to ensure fresh profiling results", "configMap", outputConfigMapName)
		if err := r.Delete(ctx, existingCM); err != nil && !apierrors.IsNotFound(err) {
			logger.Error(err, "Failed to delete existing output ConfigMap", "configMap", outputConfigMapName)
			return fmt.Errorf("failed to delete existing output ConfigMap: %w", err)
		}
		logger.Info("Successfully deleted old output ConfigMap", "configMap", outputConfigMapName)
	} else if !apierrors.IsNotFound(err) {
		// Unexpected error checking for ConfigMap
		logger.Error(err, "Failed to check for existing output ConfigMap", "configMap", outputConfigMapName)
		return fmt.Errorf("failed to check for existing output ConfigMap: %w", err)
	}

	// Ensure profiling job RBAC exists (only for cluster-wide installation)
890
	if r.Config.Namespace.Restricted == "" {
891
892
893
894
895
896
897
898
899
900
901
		if err := r.RBACManager.EnsureServiceAccountWithRBAC(
			ctx,
			dgdr.Namespace,
			ServiceAccountProfilingJob,
			r.Config.RBAC.DGDRProfilingClusterRoleName,
		); err != nil {
			logger.Error(err, "Failed to ensure profiling job RBAC")
			return fmt.Errorf("failed to ensure profiling job RBAC: %w", err)
		}
	}

902
903
904
905
	// Enrich hardware from GPU discovery before marshalling the spec.
	// This fills in gpuSku, vramMb, numGpusPerNode if the user didn't set them.
	if err := r.enrichHardwareFromDiscovery(ctx, dgdr); err != nil {
		logger.Info("GPU discovery not available, proceeding without enrichment", "reason", err.Error())
906
907
	}

908
909
910
911
912
	// Use SyncResource to create/update the job
	modified, job, err := commonController.SyncResource(ctx, r, dgdr, func(ctx context.Context) (*batchv1.Job, bool, error) {
		jobName := getProfilingJobName(dgdr)
		outputConfigMapName := getOutputConfigMapName(dgdr)

913
914
		// Marshal the DGDR spec to JSON — the profiler receives the spec verbatim
		specJSON, err := marshalDGDRSpec(dgdr)
915
		if err != nil {
916
			return nil, false, err
917
		}
918
919

		// Common environment variables
920
		profilerEnv := []corev1.EnvVar{
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
			{
				Name: "HUGGING_FACE_HUB_TOKEN",
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: "hf-token-secret",
						},
						Key: "HF_TOKEN",
					},
				},
			},
			{
				Name:  "NATS_SERVER",
				Value: fmt.Sprintf("nats://%s-nats:4222", dgdr.Namespace),
			},
			{
				Name:  "ETCD_ENDPOINTS",
				Value: fmt.Sprintf("%s-etcd:2379", dgdr.Namespace),
			},
940
941
942
943
944
945
946
947
948
949
950
951
952
			// DGDR metadata for setting ownerReferences
			{
				Name:  "DGDR_NAME",
				Value: dgdr.Name,
			},
			{
				Name:  "DGDR_NAMESPACE",
				Value: dgdr.Namespace,
			},
			{
				Name:  "DGDR_UID",
				Value: string(dgdr.UID),
			},
953
954
		}

955
		// Build volume mounts
956
957
958
959
960
961
962
		volumeMounts := []corev1.VolumeMount{
			{
				Name:      VolumeNameProfilingOutput,
				MountPath: ProfilingOutputPath,
			},
		}

963
		// Add model cache PVC mount if configured
964
965
966
967
968
969
970
971
972
973
		modelCachePVC, modelCacheMountPath := extractModelCachePVCConfig(dgdr)
		if modelCachePVC != "" {
			logger.Info("Mounting model cache PVC to profiler pod", "pvc", modelCachePVC, "mountPath", modelCacheMountPath)
			volumeMounts = append(volumeMounts, corev1.VolumeMount{
				Name:      VolumeNameModelCache,
				MountPath: modelCacheMountPath,
				ReadOnly:  true,
			})
		}

974
975
976
977
978
979
980
981
		// v1alpha1 round-trip: mount ConfigMap if referenced via annotation
		cmRef := configMapRefFromAnnotation(dgdr)
		if cmRef != nil {
			volumeMounts = append(volumeMounts, corev1.VolumeMount{
				Name:      VolumeNameProfilingConfig,
				MountPath: ProfilingConfigMountPath,
				ReadOnly:  true,
			})
982
983
		}

984
		// Profiler args: pass the DGDR spec as JSON via --config
985
986
		// --output-dir must match ProfilingOutputPath so the sidecar can find profiler_status.yaml
		profilerArgs := []string{"--config", specJSON, "--output-dir", ProfilingOutputPath}
987

988
989
		// Use image from spec; the defaulting webhook fills this in for production builds.
		// Guard against empty image in case the webhook didn't run (e.g. local dev builds).
990
		imageName := dgdr.Spec.Image
991
992
993
		if imageName == "" {
			return nil, false, fmt.Errorf("spec.image is required but not set; ensure the defaulting webhook ran or set spec.image explicitly")
		}
994
995
		logger.Info("Using profiler image", "image", imageName)

996
		profilerContainer := corev1.Container{
997
998
			Name:         ContainerNameProfiler,
			Image:        imageName,
999
			Command:      []string{"python", "-m", "dynamo.profiler"},
1000
			Args:         profilerArgs,
1001
1002
			Env:          profilerEnv,
			VolumeMounts: volumeMounts,
1003
			WorkingDir:   "/workspace",
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
		}

		// Generate sidecar script from template
		tmpl, err := template.New("sidecar").Parse(sidecarScriptTemplate)
		if err != nil {
			return nil, false, fmt.Errorf("failed to parse sidecar script template: %w", err)
		}

		var scriptBuf bytes.Buffer
		err = tmpl.Execute(&scriptBuf, map[string]string{
1014
1015
1016
1017
1018
			"OutputPath":    ProfilingOutputPath,
			"OutputFile":    ProfilingOutputFile,
			"ConfigMapName": outputConfigMapName,
			"Namespace":     dgdr.Namespace,
			"DGDRName":      dgdr.Name,
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
		})
		if err != nil {
			return nil, false, fmt.Errorf("failed to execute sidecar script template: %w", err)
		}

		sidecarContainer := corev1.Container{
			Name:    ContainerNameOutputCopier,
			Image:   SidecarImage,
			Command: []string{"/bin/sh", "-c"},
			Args:    []string{scriptBuf.String()},
			VolumeMounts: []corev1.VolumeMount{{
				Name:      VolumeNameProfilingOutput,
				MountPath: ProfilingOutputPath,
				ReadOnly:  true,
			}},
		}

1036
1037
		// Use PVC for profiling output if round-tripped v1alpha1 annotation is present,
		// otherwise use emptyDir (v1beta1 default).
1038
		var profilingOutputVolume corev1.Volume
1039
1040
		if outputPVC := outputPVCFromAnnotation(dgdr); outputPVC != "" {
			logger.Info("Using PVC for profiling output (from v1alpha1 annotation)", "pvc", outputPVC)
1041
1042
1043
1044
			profilingOutputVolume = corev1.Volume{
				Name: VolumeNameProfilingOutput,
				VolumeSource: corev1.VolumeSource{
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1045
						ClaimName: outputPVC,
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
					},
				},
			}
		} else {
			profilingOutputVolume = corev1.Volume{
				Name: VolumeNameProfilingOutput,
				VolumeSource: corev1.VolumeSource{
					EmptyDir: &corev1.EmptyDirVolumeSource{},
				},
			}
		}
		volumes := []corev1.Volume{profilingOutputVolume}
1058

1059
1060
		// Add model cache PVC volume if configured
		if modelCachePVC != "" {
1061
			volumes = append(volumes, corev1.Volume{
1062
				Name: VolumeNameModelCache,
1063
				VolumeSource: corev1.VolumeSource{
1064
1065
1066
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
						ClaimName: modelCachePVC,
						ReadOnly:  true,
1067
1068
1069
1070
1071
					},
				},
			})
		}

1072
1073
1074
1075
1076
1077
		// v1alpha1 round-trip: add ConfigMap volume if referenced via annotation
		if cmRef != nil {
			cmKey := cmRef.Key
			if cmKey == "" {
				cmKey = ProfilingConfigDefaultKey
			}
1078
			volumes = append(volumes, corev1.Volume{
1079
				Name: VolumeNameProfilingConfig,
1080
				VolumeSource: corev1.VolumeSource{
1081
1082
1083
1084
1085
1086
1087
1088
					ConfigMap: &corev1.ConfigMapVolumeSource{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: cmRef.Name,
						},
						Items: []corev1.KeyToPath{{
							Key:  cmKey,
							Path: ProfilingConfigDefaultKey,
						}},
1089
1090
1091
1092
1093
					},
				},
			})
		}

1094
1095
1096
		// Limit retries to prevent infinite loop
		backoffLimit := int32(3)

1097
1098
1099
1100
		podSpec := corev1.PodSpec{
			ServiceAccountName: ServiceAccountProfilingJob,
			RestartPolicy:      corev1.RestartPolicyNever,
			SecurityContext: &corev1.PodSecurityContext{
1101
1102
1103
1104
				RunAsNonRoot: ptr.To(true),
				RunAsUser:    ptr.To[int64](1000),
				RunAsGroup:   ptr.To[int64](1000),
				FSGroup:      ptr.To[int64](1000),
1105
1106
1107
1108
1109
1110
1111
1112
			},
			Containers: []corev1.Container{profilerContainer, sidecarContainer},
			Volumes:    volumes,
			ImagePullSecrets: []corev1.LocalObjectReference{
				{Name: "nvcr-imagepullsecret"},
			},
		}

1113
1114
1115
1116
1117
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Name:      jobName,
				Namespace: dgdr.Namespace,
				Labels: map[string]string{
1118
1119
1120
					nvidiacomv1beta1.LabelApp:       nvidiacomv1beta1.LabelValueDynamoProfiler,
					nvidiacomv1beta1.LabelDGDR:      dgdr.Name,
					nvidiacomv1beta1.LabelManagedBy: nvidiacomv1beta1.LabelValueDynamoOperator,
1121
1122
1123
1124
1125
				},
			},
			Spec: batchv1.JobSpec{
				BackoffLimit: &backoffLimit,
				Template: corev1.PodTemplateSpec{
1126
					Spec: podSpec,
1127
1128
1129
1130
				},
			},
		}

1131
1132
1133
1134
1135
		var jobOverrides *batchv1.JobSpec
		if dgdr.Spec.Overrides != nil {
			jobOverrides = dgdr.Spec.Overrides.ProfilingJob
		}
		applyProfilingJobOverrides(job, jobOverrides)
1136

1137
1138
1139
1140
1141
1142
1143
1144
		return job, false, nil
	})

	if err != nil {
		return err
	}

	if modified {
1145
		logger.Info("Profiling job created/updated", "job", job.Name)
1146
1147
	}

1148
1149
1150
	// Store the job name in status for observability
	dgdr.Status.ProfilingJobName = job.Name

1151
1152
1153
	return nil
}

1154
1155
1156
1157
1158
1159
// marshalDGDRSpec serializes dgdr.Spec to a JSON string. The profiler receives
// this string unchanged through its --config flag; no key translation is done.
func marshalDGDRSpec(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, error) {
	raw, marshalErr := json.Marshal(dgdr.Spec)
	if marshalErr != nil {
		return "", fmt.Errorf("failed to marshal DGDR spec to JSON: %w", marshalErr)
	}
	return string(raw), nil
}
1163

1164
1165
1166
1167
1168
1169
1170
1171
// enrichHardwareFromDiscovery fills in hardware fields that the user didn't set.
// Called before marshalDGDRSpec(). Mutates dgdr.Spec.Hardware in-place (memory only, not persisted).
func (r *DynamoGraphDeploymentRequestReconciler) enrichHardwareFromDiscovery(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	if dgdr.Spec.Hardware == nil {
		dgdr.Spec.Hardware = &nvidiacomv1beta1.HardwareSpec{}
	}
	hw := dgdr.Spec.Hardware
	if hw.GPUSKU != "" && hw.VRAMMB != nil && hw.NumGPUsPerNode != nil {
hhzhang16's avatar
hhzhang16 committed
1172
		return nil // all fields already set by user; TotalGPUs is filled below when discovery runs
1173
1174
	}

1175
	gpuInfo, err := gpu.DiscoverGPUs(ctx, r.APIReader)
1176
1177
	if err != nil {
		return err
1178
1179
	}

1180
1181
1182
	logger := log.FromContext(ctx)
	logger.Info("GPU discovery completed successfully",
		"gpusPerNode", gpuInfo.GPUsPerNode,
hhzhang16's avatar
hhzhang16 committed
1183
1184
		"nodesWithGPUs", gpuInfo.NodesWithGPUs,
		"totalGpus", gpuInfo.GPUsPerNode*gpuInfo.NodesWithGPUs,
1185
1186
1187
1188
1189
1190
1191
1192
1193
		"model", gpuInfo.Model,
		"vramMiB", gpuInfo.VRAMPerGPU)

	if hw.GPUSKU == "" {
		hw.GPUSKU = gpuInfo.Model
	}
	if hw.VRAMMB == nil {
		vram := float64(gpuInfo.VRAMPerGPU)
		hw.VRAMMB = &vram
1194
	}
1195
1196
1197
1198
	if hw.NumGPUsPerNode == nil {
		n := int32(gpuInfo.GPUsPerNode)
		hw.NumGPUsPerNode = &n
	}
hhzhang16's avatar
hhzhang16 committed
1199
1200
1201
1202
	if hw.TotalGPUs == nil {
		total := int32(gpuInfo.GPUsPerNode * gpuInfo.NodesWithGPUs)
		hw.TotalGPUs = &total
	}
1203
1204
	return nil
}
1205

1206
1207
1208
1209
// extractModelCachePVCConfig returns the model-cache PVC name and its mount path,
// both taken from spec.modelCache. The mount path falls back to
// DefaultModelCacheMountPath when unset. Both results are empty strings when no
// PVC is configured.
func extractModelCachePVCConfig(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, string) {
	mc := dgdr.Spec.ModelCache
	if mc == nil || mc.PVCName == "" {
		return "", ""
	}

	mountPath := DefaultModelCacheMountPath
	if mc.PVCMountPath != "" {
		mountPath = mc.PVCMountPath
	}
	return mc.PVCName, mountPath
}

// configMapKeySelector is a local mirror of v1alpha1.ConfigMapKeySelector. It
// exists only to decode the JSON stored in the AnnotationConfigMapRef
// round-trip annotation. Name identifies the ConfigMap; the optional Key
// selects an entry in it, and callers substitute a default key when it is empty.
type configMapKeySelector struct {
	Name string `json:"name"`
	Key  string `json:"key,omitempty"`
}

// configMapRefFromAnnotation decodes the ConfigMap reference stored, as JSON, in
// the AnnotationConfigMapRef round-trip annotation.
//
// It returns nil in each of these cases:
//   - the annotation is absent or empty (native v1beta1 resources);
//   - its value does not decode into a configMapKeySelector;
//   - the decoded reference has no name (e.g. the value "null" or "{}").
//
// A nameless reference cannot back a ConfigMap volume, so it is treated as
// absent instead of producing a Job with an unusable volume.
func configMapRefFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) *configMapKeySelector {
	// Indexing a nil annotations map is safe and yields "".
	raw := dgdr.Annotations[AnnotationConfigMapRef]
	if raw == "" {
		return nil
	}

	var ref configMapKeySelector
	if err := json.Unmarshal([]byte(raw), &ref); err != nil {
		// Best effort: a malformed annotation is treated as no reference.
		return nil
	}
	if ref.Name == "" {
		return nil
	}
	return &ref
}
1241

1242
1243
1244
1245
1246
1247
1248
// outputPVCFromAnnotation returns the PVC name stored in the AnnotationOutputPVC
// round-trip annotation. It returns "" when the annotation is absent, which is
// always the case for native v1beta1 resources (they use an emptyDir instead).
func outputPVCFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
	// A nil annotations map is safe to index and yields "".
	pvcName := dgdr.Annotations[AnnotationOutputPVC]
	return pvcName
}

1251
// checkProfilingJobStatus checks if the profiling job has completed
1252
func (r *DynamoGraphDeploymentRequestReconciler) checkProfilingJobStatus(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (bool, error) {
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
	logger := log.FromContext(ctx)
	jobName := getProfilingJobName(dgdr)

	job := &batchv1.Job{}
	if err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: dgdr.Namespace}, job); err != nil {
		return false, err
	}

	// Check job conditions
	for _, condition := range job.Status.Conditions {
		if condition.Type == batchv1.JobComplete && condition.Status == corev1.ConditionTrue {
			logger.Info("Profiling job completed", "job", jobName)
			return true, nil
		}
		if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue {
1268
1269
1270
1271
1272
			// Get detailed error from pod logs
			detailedError := r.getProfilingJobErrorDetails(ctx, dgdr, job)
			if detailedError != "" {
				return false, fmt.Errorf("profiling job failed: %s. Details: %s", condition.Message, detailedError)
			}
1273
1274
1275
1276
1277
1278
1279
			return false, fmt.Errorf("profiling job failed: %s", condition.Message)
		}
	}

	return false, nil
}

1280
// getProfilingJobErrorDetails retrieves detailed error information from failed profiling job pods
1281
func (r *DynamoGraphDeploymentRequestReconciler) getProfilingJobErrorDetails(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, job *batchv1.Job) string {
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
	logger := log.FromContext(ctx)

	// List pods owned by this job
	podList := &corev1.PodList{}
	labelSelector := client.MatchingLabels{
		"job-name": job.Name,
	}

	if err := r.List(ctx, podList, client.InNamespace(dgdr.Namespace), labelSelector); err != nil {
		logger.Error(err, "Failed to list pods for profiling job")
		return ""
	}

	// Look for failed pods and extract error details
	for _, pod := range podList.Items {
		// Check pod phase and container statuses
		if pod.Status.Phase == corev1.PodFailed {
			// Get profiler container status (first container)
			for _, containerStatus := range pod.Status.ContainerStatuses {
				if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Terminated != nil {
					terminated := containerStatus.State.Terminated
					// Construct detailed error message
					errorMsg := fmt.Sprintf("Pod: %s, Container: %s, ExitCode: %d, Reason: %s",
						pod.Name, containerStatus.Name, terminated.ExitCode, terminated.Reason)
					if terminated.Message != "" {
						errorMsg += fmt.Sprintf(", Message: %s", terminated.Message)
					}
					logger.Info("Retrieved profiling job error details", "error", errorMsg)
					return errorMsg
				}
			}

			// If no terminated state found, check waiting state
			for _, containerStatus := range pod.Status.ContainerStatuses {
				if containerStatus.Name == ContainerNameProfiler && containerStatus.State.Waiting != nil {
					waiting := containerStatus.State.Waiting
					errorMsg := fmt.Sprintf("Pod: %s, Container: %s, Waiting - Reason: %s, Message: %s",
						pod.Name, containerStatus.Name, waiting.Reason, waiting.Message)
					logger.Info("Retrieved profiling job waiting details", "error", errorMsg)
					return errorMsg
				}
			}
		}
	}

	return ""
}

1330
// generateDGDSpec generates DGD spec from profiling results (online or offline/AIC)
1331
func (r *DynamoGraphDeploymentRequestReconciler) generateDGDSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
1332
	logger := log.FromContext(ctx)
1333
	logger.Info("Generating DGD spec from profiling results", "name", dgdr.Name, "backend", dgdr.Spec.Backend)
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349

	// Read the generated spec from ConfigMap (created by sidecar)
	outputConfigMapName := getOutputConfigMapName(dgdr)
	cm := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, cm)

	if err != nil {
		if apierrors.IsNotFound(err) {
			return fmt.Errorf("output ConfigMap %s not found - profiling may not have completed yet", outputConfigMapName)
		}
		return fmt.Errorf("failed to get output ConfigMap: %w", err)
	}

1350
	// Select the right config file based on mocker feature flag
1351
1352
	// Profiler writes the selected config (real or mocker) to a single output file
	outputFile := ProfilingOutputFile
1353

1354
	// Get YAML content from ConfigMap
1355
	yamlContent, exists := cm.Data[outputFile]
1356
	if !exists {
1357
		return fmt.Errorf("key %s not found in ConfigMap %s", outputFile, outputConfigMapName)
1358
1359
	}

1360
	logger.Info("Found profiling output in ConfigMap", "configMap", outputConfigMapName, "outputFile", outputFile, "size", len(yamlContent))
1361

1362
1363
1364
	// Extract DGD and any supporting resources from potentially multi-document YAML (ConfigMap + DGD)
	dgd, additionalResources, err := r.extractResourcesFromYAML([]byte(yamlContent))
	if err != nil {
1365
		return fmt.Errorf("failed to extract DGD from %s: %w", outputFile, err)
1366
1367
	}

1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
	logger.Info("Parsed profiling output", "dgdName", dgd.Name, "additionalResources", len(additionalResources))

	// Store additional resources (ConfigMaps) in annotations first
	if len(additionalResources) > 0 {
		if err := r.storeAdditionalResources(ctx, dgdr, additionalResources); err != nil {
			logger.Error(err, "Failed to store additional resources")
			return err
		}
		// Refetch the DGDR after updating annotations to get the latest resourceVersion
		if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
			return fmt.Errorf("failed to refetch DGDR after storing annotations: %w", err)
		}
	}
1381

1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
	// Store the generated DGD name in status and cache the spec in an annotation for createDGD
	dgdr.Status.DGDName = dgd.Name

	// Store the generated DGD in ProfilingResults.SelectedConfig for status visibility
	dgdJSON, err := json.Marshal(dgd)
	if err != nil {
		return fmt.Errorf("failed to marshal generated DGD to JSON: %w", err)
	}
	if dgdr.Status.ProfilingResults == nil {
		dgdr.Status.ProfilingResults = &nvidiacomv1beta1.ProfilingResultsStatus{}
	}
	dgdr.Status.ProfilingResults.SelectedConfig = &runtime.RawExtension{Raw: dgdJSON}

	// Serialize the DGD spec to an annotation so createDGD can retrieve it
	dgdBytes, err := sigsyaml.Marshal(dgd)
	if err != nil {
		return fmt.Errorf("failed to marshal generated DGD: %w", err)
	}
	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations["nvidia.com/generated-dgd-spec"] = string(dgdBytes)

	// Update the object (annotations are on the object, not status)
	if err := r.Update(ctx, dgdr); err != nil {
		return fmt.Errorf("failed to update DGDR with generated DGD annotation: %w", err)
	}

	// Refetch the DGDR after the annotation update to get the latest resourceVersion
	// and avoid conflicts with concurrent modifications before updating status.
	if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
		return fmt.Errorf("failed to refetch DGDR after annotation update: %w", err)
1414
1415
1416
1417
1418
	}

	return r.Status().Update(ctx, dgdr)
}

1419
1420
// storeAdditionalResources marshals additional resources to YAML and stores them in DGDR annotations.
// Validates annotation size and fails gracefully if too large.
1421
func (r *DynamoGraphDeploymentRequestReconciler) storeAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, resources []*unstructured.Unstructured) error {
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
	if len(resources) == 0 {
		return nil
	}

	var resourcesYAML []byte

	for i, res := range resources {
		resYAML, err := sigsyaml.Marshal(res.Object)
		if err != nil {
			return fmt.Errorf("failed to marshal resource %s/%s: %w", res.GetKind(), res.GetName(), err)
		}
		if i > 0 {
			resourcesYAML = append(resourcesYAML, []byte("\n---\n")...)
		}
		resourcesYAML = append(resourcesYAML, resYAML...)
	}

	// Validate size before storing
	if len(resourcesYAML) > MaxAnnotationSize {
		return fmt.Errorf("additional resources YAML size (%d bytes) exceeds maximum annotation size (%d bytes); "+
			"consider reducing the number of resources or storing them separately",
			len(resourcesYAML), MaxAnnotationSize)
	}

	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations[AnnotationAdditionalResources] = string(resourcesYAML)

	return r.Update(ctx, dgdr)
}

// extractResourcesFromYAML parses multi-document YAML from profiling output,
// extracting the DynamoGraphDeployment and any ConfigMaps that should be deployed with it.
1456
func (r *DynamoGraphDeploymentRequestReconciler) extractResourcesFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, []*unstructured.Unstructured, error) {
1457
1458
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(yamlContent), 4096)

1459
	var dgd *dgdv1alpha1.DynamoGraphDeployment
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
	var additionalResources []*unstructured.Unstructured

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			// Skip invalid documents and continue
			continue
		}

		// Skip empty objects
		if obj.GetKind() == "" {
			continue
		}

		if obj.GetKind() == "DynamoGraphDeployment" {
1478
			dgd = &dgdv1alpha1.DynamoGraphDeployment{}
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, dgd); err != nil {
				return nil, nil, fmt.Errorf("failed to convert to DynamoGraphDeployment: %w", err)
			}
		} else {
			// Store ConfigMaps or other resources for deployment
			additionalResources = append(additionalResources, obj)
		}
	}

	if dgd == nil {
		return nil, nil, fmt.Errorf("no DynamoGraphDeployment found in YAML content")
	}

	return dgd, additionalResources, nil
}

// extractDGDFromYAML is a convenience wrapper that extracts only the DGD (used by tests)
1496
func (r *DynamoGraphDeploymentRequestReconciler) extractDGDFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, error) {
1497
1498
1499
1500
	dgd, _, err := r.extractResourcesFromYAML(yamlContent)
	return dgd, err
}

1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
// updateDeploymentInfo sums Replicas and AvailableReplicas over all services in
// the DGD status and stores the totals in dgdr.Status.DeploymentInfo. A service
// without an AvailableReplicas value contributes 0 to the available total.
//
// It returns true when DeploymentInfo was (re)written. It returns false, leaving
// dgdr untouched, when the stored totals already match.
func updateDeploymentInfo(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, dgd *dgdv1alpha1.DynamoGraphDeployment) bool {
	var replicas, available int32
	for _, svc := range dgd.Status.Services {
		replicas += svc.Replicas
		if svc.AvailableReplicas != nil {
			available += *svc.AvailableReplicas
		}
	}

	if info := dgdr.Status.DeploymentInfo; info != nil {
		unchanged := info.Replicas != nil && *info.Replicas == replicas &&
			info.AvailableReplicas != nil && *info.AvailableReplicas == available
		if unchanged {
			return false
		}
	}

	dgdr.Status.DeploymentInfo = &nvidiacomv1beta1.DeploymentInfoStatus{
		Replicas:          &replicas,
		AvailableReplicas: &available,
	}
	return true
}

// setSucceededCondition sets the aggregate Succeeded condition from phase:
//   - True only for Ready ("SpecGenerated") and Deployed;
//   - False for every other phase, with a reason and message naming it;
//   - an unrecognized phase is reported as "Unknown", and an empty phase is
//     treated as Pending.
//
// The condition carries the DGDR's current generation as ObservedGeneration.
func setSucceededCondition(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase) {
	cond := metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
		Status:             metav1.ConditionFalse,
		ObservedGeneration: dgdr.Generation,
	}

	switch phase {
	case "", nvidiacomv1beta1.DGDRPhasePending:
		cond.Reason, cond.Message = "Pending", "DGDR is pending"
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		cond.Reason, cond.Message = "Profiling", "Profiling is in progress"
	case nvidiacomv1beta1.DGDRPhaseReady:
		cond.Status = metav1.ConditionTrue
		cond.Reason, cond.Message = "SpecGenerated", "Profiling complete, spec available"
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		cond.Reason, cond.Message = "Deploying", "Deployment is in progress"
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		cond.Status = metav1.ConditionTrue
		cond.Reason, cond.Message = "Deployed", "Deployment is healthy"
	case nvidiacomv1beta1.DGDRPhaseFailed:
		cond.Reason, cond.Message = "Failed", "DGDR has failed"
	default:
		cond.Reason, cond.Message = "Unknown", "Unknown phase"
	}

	meta.SetStatusCondition(&dgdr.Status.Conditions, cond)
}

// updatePhaseAndRequeue moves the DGDR to phase, refreshes the aggregate
// Succeeded condition to match, and persists the change through the status
// subresource.
//
// message is only logged; it is not written to the DGDR status. If the status
// write fails, the error is returned with an empty Result. On success the
// reconciler asks to be requeued.
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseAndRequeue(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase, message string) (ctrl.Result, error) {
	log.FromContext(ctx).Info("Updating DGDR phase", "name", dgdr.Name, "phase", phase, "message", message)

	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)

	err := r.Status().Update(ctx, dgdr)
	if err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{Requeue: true}, nil
}

1568
1569
// updatePhaseWithCondition updates phase and adds/updates a condition
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseWithCondition(
1570
	ctx context.Context,
1571
1572
	dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
	phase nvidiacomv1beta1.DGDRPhase,
1573
1574
1575
1576
1577
	conditionType string,
	status metav1.ConditionStatus,
	reason string,
	message string,
) (ctrl.Result, error) {
1578
1579
	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601

	condition := metav1.Condition{
		Type:               conditionType,
		Status:             status,
		ObservedGeneration: dgdr.Generation,
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}

	dgdr.AddStatusCondition(condition)

	if err := r.Status().Update(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{Requeue: true}, nil
}

// SetupWithManager sets up the controller with the Manager
func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
1602
		For(&nvidiacomv1beta1.DynamoGraphDeploymentRequest{}).
1603
		Named(consts.ResourceTypeDynamoGraphDeploymentRequest).
1604
1605
1606
1607
1608
1609
1610
1611
		Owns(&batchv1.Job{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the job
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})). // Watch Jobs created by this controller (via ownerReference)
		Watches(
1612
			&dgdv1alpha1.DynamoGraphDeployment{},
1613
1614
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				// Find DGDR by label instead of owner reference
1615
1616
1617
				dgd := obj.(*dgdv1alpha1.DynamoGraphDeployment)
				dgdrName, hasName := dgd.Labels[nvidiacomv1beta1.LabelDGDRName]
				dgdrNamespace, hasNamespace := dgd.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
				if !hasName || !hasNamespace {
					return nil
				}
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{
						Name:      dgdrName,
						Namespace: dgdrNamespace,
					},
				}}
			}),
			builder.WithPredicates(predicate.Funcs{
				// ignore creation cause we don't want to be called again after we create the DGD
				CreateFunc:  func(ce event.CreateEvent) bool { return false },
				DeleteFunc:  func(de event.DeleteEvent) bool { return true },
				UpdateFunc:  func(ue event.UpdateEvent) bool { return true },
				GenericFunc: func(ge event.GenericEvent) bool { return true },
			}),
1635
1636
1637
1638
		).
		// Watch DGDs created by this controller (via label)
		// Set the event filter to ignore resources handled by other controllers in namespace-restricted mode
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)).
1639
		Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeploymentRequest))
1640
}