/*
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"bytes"
	"context"
23
24
	"encoding/json"
	"errors"
25
	"fmt"
26
	"io"
27
	"strings"
28
29
30
31
32
33
34
	"text/template"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36
37
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
38
	"k8s.io/apimachinery/pkg/util/yaml"
39
	"k8s.io/client-go/tools/record"
40
	"k8s.io/utils/ptr"
41
42
43
44
45
46
47
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
48
	sigsyaml "sigs.k8s.io/yaml"
49

50
	configv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/config/v1alpha1"
51
52
	dgdv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	nvidiacomv1beta1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1beta1"
53
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
54
	commonController "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
55
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/gpu"
56
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/observability"
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
)

const (
	// Job naming
	JobNamePrefixOnline = "profile-online-"
	JobNamePrefixAIC    = "profile-aic-"

	// Container names
	ContainerNameProfiler     = "profiler"
	ContainerNameOutputCopier = "output-copier"

	// ServiceAccount
	ServiceAccountProfilingJob = "dgdr-profiling-job"

	// ConfigMap naming
	ConfigMapOutputPrefix = "dgdr-output-"

	// Annotation keys
	AnnotationAdditionalResources = "dgdr.nvidia.com/additional-resources"

	// Annotation keys for v1alpha1 round-trip compatibility.
	// The conversion layer stores v1alpha1 fields that have no v1beta1 spec equivalent
	// as annotations so the controller can still honour them for converted resources.
	AnnotationConfigMapRef = "nvidia.com/dgdr-config-map-ref"
	AnnotationOutputPVC    = "nvidia.com/dgdr-output-pvc"

	// Size limits
	MaxAnnotationSize = 250000 // ~250KB, below K8s 256KB limit

	// Sidecar image
	// NOTE(review): ":latest" is an unpinned tag, so the kubectl version used by the
	// output-copier sidecar can change underneath the operator; consider pinning.
	SidecarImage = "bitnami/kubectl:latest"

	// Volume names
	VolumeNameProfilingOutput = "profiling-output"
	VolumeNameProfilingConfig = "profiling-config"
	VolumeNameModelCache      = "model-cache"

	// Volume paths
	ProfilingOutputPath        = "/data"
	ProfilingOutputFile        = "final_config.yaml"
	ProfilingConfigMountPath   = "/config"
	ProfilingConfigDefaultKey  = "disagg.yaml"
	DefaultModelCacheMountPath = "/opt/model-cache"

	// Command line arguments
	ArgModel   = "--model"
	ArgBackend = "--backend"
	ArgTTFT    = "--ttft"
	ArgITL     = "--itl"
	ArgConfig  = "--config"

	// Messages (the *Failed entries without format verbs are used as event/condition reasons)
	MessageInitialized               = "DGDR initialized successfully"
	MessageDiscoveringHardware       = "Discovering GPU hardware and preparing profiling job"
	MessageProfilingJobCreated       = "Profiling job created"
	MessageAICProfilingJobCreated    = "AIC profiling job created"
	MessageProfilingInProgress       = "Profiling is in progress"
	MessageSpecGenerated             = "DynamoGraphDeployment spec generated successfully"
	MessageSpecAvailable             = "Generated spec is available in annotation nvidia.com/generated-dgd-spec"
	MessageDeploymentCreated         = "DynamoGraphDeployment %s created successfully"
	MessageDeploymentReady           = "DynamoGraphDeployment %s is ready"
	MessageDeploymentDegraded        = "DynamoGraphDeployment %s degraded from Ready to %s"
	MessageDeploymentDeleted         = "DGD %s was deleted. DGDR will not recreate it. Delete this DGDR and create a new one to redeploy."
	MessageInvalidState              = "Invalid state"
	MessageSpecChangeRejected        = "Cannot modify spec in phase '%s'. DynamoGraphDeploymentRequest is immutable once profiling starts. Create a new resource with a different name instead."
	MessageJobCreationFailed         = "JobCreationFailed"
	MessageDeploymentCreationFailed  = "DeploymentCreationFailed"
	MessageResultsRetrievalFailed    = "ResultsRetrievalFailed"
	MessageGenerationFailed          = "GenerationFailed"
	MessageAIConfiguratorCheckFailed = "AIConfiguratorCheckFailed"
	MessageProfilingCheckFailed      = "ProfilingCheckFailed"
	MessageConfigMapNotFound         = "ConfigMap %s not found in namespace %s"
	MessageConfigMapKeyNotFound      = "key %s not found in ConfigMap %s"
	MessageModelCachePVCNotFound     = "model cache PVC %s not found in namespace %s"
)

// sidecarScriptTemplate is the text/template source of the shell script run by
// the output-copier sidecar. It is rendered with the fields .Namespace,
// .OutputPath, .OutputFile, .ConfigMapName and .DGDRName.
//
// The script polls the pod's "profiler" container until it has terminated, then
// waits up to 120s for <OutputPath>/profiler_status.yaml and exits non-zero
// unless that file reports "status: success". On success it writes the generated
// config (plus profiler_status.yaml and webui_data.json when present) into the
// ConfigMap .ConfigMapName and applies it with kubectl.
//
// Lookups of keys in profiler_status.yaml end in "|| true": under "set -e" and
// "set -o pipefail" a grep that matches nothing would otherwise abort the script
// before its own error handling runs, and would even fail a successful run whose
// status file has no "message:" line.
//
// NOTE(review): the container name "profiler" is hard-coded in the jsonpath below
// and must stay in sync with ContainerNameProfiler.
const sidecarScriptTemplate = `
set -e
set -o pipefail

# Wait for profiler container to terminate (no timeout - profiling can take hours)
echo "Waiting for profiler to complete..."
START_TIME=$(date +%s)
LAST_PROGRESS_LOG=$START_TIME
PROGRESS_INTERVAL=300

while true; do
  CURRENT_TIME=$(date +%s)
  ELAPSED=$((CURRENT_TIME - START_TIME))

  # Log progress every 5 minutes
  if [ $((CURRENT_TIME - LAST_PROGRESS_LOG)) -ge $PROGRESS_INTERVAL ]; then
    echo "Still waiting... ($(($ELAPSED / 60)) minutes elapsed)"
    LAST_PROGRESS_LOG=$CURRENT_TIME
  fi

  # Check if profiler container terminated
  CONTAINER_STATUS=$(kubectl get pod $HOSTNAME -n {{.Namespace}} -o jsonpath='{.status.containerStatuses[?(@.name=="profiler")].state}' 2>/dev/null || echo "")
  if echo "$CONTAINER_STATUS" | grep -q "terminated"; then
    echo "Profiler terminated (ran for $(($ELAPSED / 60)) minutes)"
    break
  fi
  sleep 5
done

# Check profiler status file (2 minute timeout)
echo "Checking profiler status..."
STATUS_FILE="{{.OutputPath}}/profiler_status.yaml"
TIMEOUT=120
CHECK_START=$(date +%s)

# Wait for status file to exist
while [ ! -f "$STATUS_FILE" ]; do
  ELAPSED=$(($(date +%s) - CHECK_START))
  if [ $ELAPSED -ge $TIMEOUT ]; then
    echo "ERROR: Status file not found after ${TIMEOUT}s"
    exit 1
  fi
  sleep 2
done

# Read and parse status from YAML file
# ("|| true": a missing key must reach the explicit error below instead of tripping set -e)
STATUS=$(grep "^status:" "$STATUS_FILE" | awk '{print $2}' | tr -d '"' | tr -d "'" || true)

if [ -z "$STATUS" ]; then
  echo "ERROR: Invalid status file format"
  exit 1
fi

# Check status value
case "$STATUS" in
  success)
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
    echo "Profiler succeeded: $MESSAGE"
    ;;
  failed)
    ERROR=$(grep "^error:" "$STATUS_FILE" | sed 's/^error: *//' | tr -d '"' | tr -d "'" || true)
    MESSAGE=$(grep "^message:" "$STATUS_FILE" | sed 's/^message: *//' | tr -d '"' | tr -d "'" || true)
    echo "ERROR: Profiler failed: ${ERROR:-$MESSAGE}"
    exit 1
    ;;
  running)
    echo "ERROR: Profiler still running (unexpected)"
    exit 1
    ;;
  *)
    echo "ERROR: Unknown status: $STATUS"
    exit 1
    ;;
esac

echo "Creating ConfigMap..."

# Start building ConfigMap YAML with DGD spec
cat >/tmp/cm.yaml <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{.ConfigMapName}}
  namespace: {{.Namespace}}
  labels:
    dgdr.nvidia.com/name: {{.DGDRName}}
    nvidia.com/managed-by: dynamo-operator
data:
  {{.OutputFile}}: |
EOF
sed 's/^/    /' {{.OutputPath}}/{{.OutputFile}} >> /tmp/cm.yaml

# Add profiler status file for debugging
if [ -f {{.OutputPath}}/profiler_status.yaml ]; then
  echo "  profiler_status.yaml: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/profiler_status.yaml >> /tmp/cm.yaml
fi

# Add webui_data.json for pareto curve data (used by operator to populate status.profilingResults.pareto)
if [ -f {{.OutputPath}}/webui_data.json ]; then
  echo "  webui_data.json: |" >> /tmp/cm.yaml
  sed 's/^/    /' {{.OutputPath}}/webui_data.json >> /tmp/cm.yaml
fi

# Note: Profiling data (raw_data.npz converted to JSON) is included in the
# generated DGD YAML as a separate ConfigMap by the profiler, no need to add it here

kubectl apply -f /tmp/cm.yaml
echo "Saved profiling output to ConfigMap {{.ConfigMapName}}"
`

// DynamoGraphDeploymentRequestReconciler reconciles a DynamoGraphDeploymentRequest object
type DynamoGraphDeploymentRequestReconciler struct {
	client.Client
248
249
250
251
252
253
	APIReader         client.Reader
	Recorder          record.EventRecorder
	Config            *configv1alpha1.OperatorConfiguration
	RuntimeConfig     *commonController.RuntimeConfig
	GPUDiscoveryCache *gpu.GPUDiscoveryCache
	GPUDiscovery      *gpu.GPUDiscovery
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
	// RBACMgr handles RBAC setup for profiling jobs
	RBACManager RBACManager
}

// RBACManager abstracts provisioning of the RBAC objects used by profiling jobs.
//
// EnsureServiceAccountWithRBAC is called with the namespace the job runs in, the
// service account name, and the cluster role to bind. NOTE(review): the exact
// guarantees (idempotence, which bindings are created) live in the implementation.
type RBACManager interface {
	EnsureServiceAccountWithRBAC(
		ctx context.Context,
		targetNamespace, serviceAccountName, clusterRoleName string,
	) error
}

// GetRecorder returns the reconciler's event recorder, satisfying the
// commonController.Reconciler interface.
func (r *DynamoGraphDeploymentRequestReconciler) GetRecorder() record.EventRecorder {
	recorder := r.Recorder
	return recorder
}

// FinalizeResource implements commonController.Finalizer interface
269
func (r *DynamoGraphDeploymentRequestReconciler) FinalizeResource(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
270
271
272
273
274
275
276
277
278
279
280
281
282
	logger := log.FromContext(ctx)

	logger.Info("DGDR finalized successfully", "name", dgdr.Name)
	return nil
}

// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentrequests/finalizers,verbs=update
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
283
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
284
285
286
287
288
289
290
291
292
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

// Reconcile handles the reconciliation loop for DynamoGraphDeploymentRequest
func (r *DynamoGraphDeploymentRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling DynamoGraphDeploymentRequest", "name", req.Name, "namespace", req.Namespace)

	// Fetch the DGDR instance
293
	dgdr := &nvidiacomv1beta1.DynamoGraphDeploymentRequest{}
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
	if err := r.Get(ctx, req.NamespacedName, dgdr); err != nil {
		if apierrors.IsNotFound(err) {
			logger.Info("DGDR resource not found, ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get DGDR")
		return ctrl.Result{}, err
	}

	// Handle finalizer using common function
	finalized, err := commonController.HandleFinalizer(ctx, dgdr, r.Client, r)
	if err != nil {
		return ctrl.Result{}, err
	}
	if finalized {
		// Resource was deleted and finalized
		return ctrl.Result{}, nil
	}

	// Check for spec changes (immutability enforcement)
	if dgdr.Status.ObservedGeneration > 0 && dgdr.Status.ObservedGeneration != dgdr.Generation {
		// Spec changed after initial processing
316
317
318
319
		if dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseProfiling || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeploying ||
			dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseReady || dgdr.Status.Phase == nvidiacomv1beta1.DGDRPhaseDeployed {
			logger.Info("Spec change detected in immutable phase",
				"phase", dgdr.Status.Phase,
320
321
322
				"observedGeneration", dgdr.Status.ObservedGeneration,
				"currentGeneration", dgdr.Generation)

323
324
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonSpecChangeRejected,
				fmt.Sprintf(MessageSpecChangeRejected, dgdr.Status.Phase))
325
326

			// Keep the old observedGeneration to continue rejecting changes
327
			// No phase transition - stay in current phase with old spec
328
329
330
			return ctrl.Result{}, nil
		}
	}
331
332
333
334
335
336
337
338
339
340
341
342
343
344
	// Phase machine: handle different phases
	switch dgdr.Status.Phase {
	case nvidiacomv1beta1.DGDRPhasePending, "":
		return r.handlePendingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseProfiling:
		return r.handleProfilingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeploying:
		return r.handleDeployingPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseReady:
		return r.handleReadyPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseDeployed:
		return r.handleDeployedPhase(ctx, dgdr)
	case nvidiacomv1beta1.DGDRPhaseFailed:
		return r.handleFailedPhase(ctx, dgdr)
345
	default:
346
347
		logger.Info("Unknown phase", "phase", dgdr.Status.Phase)
		return r.updatePhaseAndRequeue(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, MessageInvalidState)
348
349
350
	}
}

351
352
353
354
// handlePendingPhase processes newly created or pending DGDR resources.
// When ObservedGeneration == 0, performs initial validation (merged from v1alpha1 Initializing state).
// Otherwise, starts the profiling process.
func (r *DynamoGraphDeploymentRequestReconciler) handlePendingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
355
356
	logger := log.FromContext(ctx)

357
358
359
	// First-time processing: validate spec (merged from handleInitialState)
	if dgdr.Status.ObservedGeneration == 0 {
		logger.Info("Handling initial validation", "name", dgdr.Name)
360

361
362
363
364
365
		// Validate the spec
		if err := r.validateSpec(ctx, dgdr); err != nil {
			r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonValidationFailed, err.Error())
			return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeValidation, metav1.ConditionFalse, nvidiacomv1beta1.EventReasonValidationFailed, err.Error())
		}
366

367
368
		// Set observedGeneration to track the spec we're processing
		dgdr.Status.ObservedGeneration = dgdr.Generation
369

370
		// Initialize status — next reconcile will discover hardware and create the profiling job.
371
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonInitialized, MessageInitialized)
372
373
374
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhasePending,
			nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse,
			"DiscoveringHardware", MessageDiscoveringHardware)
375
	}
376

377
	logger.Info("Handling pending phase", "name", dgdr.Name)
378
379
380

	// Create profiling job (online or AIC)
	if err := r.createProfilingJob(ctx, dgdr); err != nil {
381
382
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonProfilingJobFailed, err.Error())
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, MessageJobCreationFailed, err.Error())
383
384
385
	}

	// Record event with appropriate message
386
	if isOnlineProfiling(dgdr) {
387
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, MessageProfilingJobCreated)
388
	} else {
389
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonProfilingJobCreated, MessageAICProfilingJobCreated)
390
391
	}

392
	// Update to Profiling phase — show DiscoveringHardware until the job is confirmed running.
393
	dgdr.SetProfilingPhase(nvidiacomv1beta1.ProfilingPhaseInitializing)
394
	return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "DiscoveringHardware", MessageDiscoveringHardware)
395
396
}

397
398
// handleProfilingPhase monitors profiling progress and generates spec when complete
func (r *DynamoGraphDeploymentRequestReconciler) handleProfilingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
399
	logger := log.FromContext(ctx)
400
	logger.Info("Handling profiling phase", "name", dgdr.Name)
401
402
403
404
405
406

	// Check profiling job status (both online and offline/AIC run as Jobs)
	// Note: We watch the Job via Owns(), so we'll be triggered automatically on Job changes
	completed, err := r.checkProfilingJobStatus(ctx, dgdr)
	if err != nil {
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageProfilingCheckFailed, err.Error())
407
408
409
		// Job failed - clear profiling sub-phase and transition to Failed
		dgdr.ClearProfilingPhase()
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingFailed", err.Error())
410
411
412
413
	}

	if !completed {
		logger.Info("Profiling job still running", "name", dgdr.Name)
414
415
416
417
418
		// Transition from DiscoveringHardware to ProfilingRunning once the job is confirmed active.
		cond := meta.FindStatusCondition(dgdr.Status.Conditions, nvidiacomv1beta1.ConditionTypeProfiling)
		if cond != nil && cond.Reason == "DiscoveringHardware" {
			return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseProfiling, nvidiacomv1beta1.ConditionTypeProfiling, metav1.ConditionFalse, "ProfilingRunning", MessageProfilingInProgress)
		}
419
420
421
422
		// Don't requeue - we'll be triggered when the Job completes/fails
		return ctrl.Result{}, nil
	}

423
424
425
426
427
428
429
430
431
	profilingResults, dgdName, err := r.generateDGDSpec(ctx, dgdr)
	if err != nil {
		dgdr.ClearProfilingPhase()
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageGenerationFailed, err.Error())
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseFailed, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionFalse, MessageGenerationFailed, err.Error())
	}
	if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to refetch DGDR after generateDGDSpec: %w", err)
	}
432

433
	dgdr.ClearProfilingPhase()
434
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
435
		Type:               nvidiacomv1beta1.ConditionTypeProfiling,
436
437
438
439
440
		Status:             metav1.ConditionTrue,
		ObservedGeneration: dgdr.Generation,
		Reason:             "ProfilingCompleted",
		Message:            "Profiling job completed successfully",
	})
441
442
	dgdr.Status.DGDName = dgdName
	dgdr.Status.ProfilingResults = profilingResults
443

444
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
445

446
447
	// Create additional resources (ConfigMaps) immediately after profiling
	// This ensures that the `planner-profile-data` ConfigMap is available for both auto and manual deployment
448
	// v1beta1 uses the DGDR namespace for additional resources.
449
450
451
452
453
454
455
456
	targetNamespace := dgdr.Namespace
	if err := r.createAdditionalResources(ctx, dgdr, targetNamespace); err != nil {
		logger.Error(err, "Failed to create additional resources after profiling")
		// Don't fail the DGDR, just log the error - ConfigMaps can be created manually
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, "ConfigMapCreationFailed",
			fmt.Sprintf("Failed to create ConfigMaps from profiling output: %v", err))
	}

457
	// If autoApply is enabled, transition to Deploying phase
458
	if dgdr.Spec.AutoApply == nil || *dgdr.Spec.AutoApply {
459
460
		logger.Info("AutoApply enabled, transitioning to Deploying phase")
		return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseDeploying, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecGenerated)
461
462
	}

463
464
	// Otherwise, transition to Ready phase
	return r.updatePhaseWithCondition(ctx, dgdr, nvidiacomv1beta1.DGDRPhaseReady, nvidiacomv1beta1.ConditionTypeSpecGenerated, metav1.ConditionTrue, nvidiacomv1beta1.EventReasonSpecGenerated, MessageSpecAvailable)
465
466
}

467
468
// handleReadyPhase handles DGDR in Ready phase (profiling complete, spec available)
func (r *DynamoGraphDeploymentRequestReconciler) handleReadyPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
469
470
471
	logger := log.FromContext(ctx)
	logger.Info("DGDR is ready", "name", dgdr.Name)

472
473
474
475
476
477
478
479
480
	// Nothing to monitor in Ready phase - spec is available for manual application
	return ctrl.Result{}, nil
}

// handleDeployingPhase handles DGD creation and monitors deployment
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployingPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Handling deploying phase", "name", dgdr.Name)

481
	if dgdr.Spec.AutoApply != nil && !*dgdr.Spec.AutoApply {
482
483
484
485
486
		// Shouldn't be in this phase without autoApply
		logger.Info("AutoApply not enabled, transitioning to Ready")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseReady
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseReady)
		return ctrl.Result{}, r.Status().Update(ctx, dgdr)
487
488
	}

489
490
491
492
493
	if dgdr.Status.DGDName == "" {
		return r.createDGD(ctx, dgdr)
	}

	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
494
	err := r.Get(ctx, types.NamespacedName{
495
496
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
497
498
499
	}, dgd)

	if apierrors.IsNotFound(err) {
500
501
502
503
504
		// Annotation present means DGD was never created (spec ready but create not yet called).
		// Annotation absent means DGD was previously created and then manually deleted.
		if _, hasSpec := dgdr.Annotations["nvidia.com/generated-dgd-spec"]; hasSpec {
			return r.createDGD(ctx, dgdr)
		}
505
506
507
508
509
510
511
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

512
513
514
	// Check if DGD is Ready
	var condStatus metav1.ConditionStatus
	var condReason, condMessage string
515

516
517
518
519
	if dgd.Status.State == dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD is Ready, transitioning to Deployed phase")
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeployed
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeployed)
520

521
522
		r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentReady,
			fmt.Sprintf(MessageDeploymentReady, dgd.Name))
523

524
525
526
527
528
		condStatus = metav1.ConditionTrue
		condReason = nvidiacomv1beta1.EventReasonDeploymentReady
		condMessage = fmt.Sprintf(MessageDeploymentReady, dgd.Name)
	} else {
		logger.Info("DGD not yet ready", "name", dgd.Name, "state", dgd.Status.State)
529

530
531
532
		condStatus = metav1.ConditionFalse
		condReason = "DeploymentInProgress"
		condMessage = fmt.Sprintf("DGD %s is in %s state", dgd.Name, string(dgd.Status.State))
533
534
	}

535
536
537
538
539
540
541
542
	updateDeploymentInfo(dgdr, dgd)
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
		Status:  condStatus,
		Reason:  condReason,
		Message: condMessage,
	})

543
544
545
	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

546
547
// handleDeployedPhase monitors a healthy DGD and detects degradation or deletion
func (r *DynamoGraphDeploymentRequestReconciler) handleDeployedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
548
	logger := log.FromContext(ctx)
549
	logger.Info("DGDR is deployed", "name", dgdr.Name)
550

551
552
	// Check if DGD still exists and monitor its status
	dgd := &dgdv1alpha1.DynamoGraphDeployment{}
553
	err := r.Get(ctx, types.NamespacedName{
554
555
		Name:      dgdr.Status.DGDName,
		Namespace: dgdr.Namespace,
556
557
558
559
560
561
562
563
564
565
566
	}, dgd)

	if apierrors.IsNotFound(err) {
		// DGD was deleted by user
		return r.handleDGDDeleted(ctx, dgdr)
	}

	if err != nil {
		return ctrl.Result{}, err
	}

567
568
569
570
	// Check if DGD degraded from Ready
	if dgd.Status.State != dgdv1alpha1.DGDStateSuccessful {
		logger.Info("DGD degraded, transitioning back to Deploying",
			"dgdState", dgd.Status.State)
571

572
573
574
		dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseDeploying
		setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseDeploying)
		updateDeploymentInfo(dgdr, dgd)
575

576
577
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDegraded,
			fmt.Sprintf(MessageDeploymentDegraded, dgd.Name, string(dgd.Status.State)))
578
579

		meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
580
581
582
583
			Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
			Status:  metav1.ConditionFalse,
			Reason:  nvidiacomv1beta1.EventReasonDeploymentDegraded,
			Message: fmt.Sprintf("Deployment degraded to %s", string(dgd.Status.State)),
584
		})
585
586
587
588
589
590
	} else {
		// DGD is healthy — update replica info only if changed
		if !updateDeploymentInfo(dgdr, dgd) {
			// Nothing changed, skip the status write
			return ctrl.Result{}, nil
		}
591
592
593
594
595
	}

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

596
597
598
// handleDGDDeleted handles the case when auto-created DGD is deleted by user.
// In v1beta1, this transitions to Failed (DeploymentDeleted phase was removed).
func (r *DynamoGraphDeploymentRequestReconciler) handleDGDDeleted(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
599
	logger := log.FromContext(ctx)
600
	logger.Info("DGD was deleted by user, transitioning to Failed phase")
601

602
603
	dgdr.Status.Phase = nvidiacomv1beta1.DGDRPhaseFailed
	setSucceededCondition(dgdr, nvidiacomv1beta1.DGDRPhaseFailed)
604

605
606
	r.Recorder.Event(dgdr, corev1.EventTypeWarning, nvidiacomv1beta1.EventReasonDeploymentDeleted,
		fmt.Sprintf(MessageDeploymentDeleted, dgdr.Status.DGDName))
607

608
609
	dgdr.Status.DGDName = ""
	dgdr.Status.DeploymentInfo = nil
610

611
	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
612
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
613
		Status:  metav1.ConditionFalse,
614
		Reason:  nvidiacomv1beta1.EventReasonDeploymentDeleted,
615
616
617
618
619
620
621
		Message: "Deployment was deleted by user. Create a new DGDR to redeploy.",
	})

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

// createDGD creates a DynamoGraphDeployment with the generated spec
622
func (r *DynamoGraphDeploymentRequestReconciler) createDGD(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
623
624
	logger := log.FromContext(ctx)

625
626
627
628
	// Extract DGD spec from annotation (stored by generateDGDSpec)
	dgdSpecYAML, ok := dgdr.Annotations["nvidia.com/generated-dgd-spec"]
	if !ok || dgdSpecYAML == "" {
		return ctrl.Result{}, fmt.Errorf("generated DGD spec not found in annotation nvidia.com/generated-dgd-spec")
629
630
	}

631
632
633
	generatedDGD := &dgdv1alpha1.DynamoGraphDeployment{}
	if err := yaml.Unmarshal([]byte(dgdSpecYAML), generatedDGD); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to unmarshal generated deployment from annotation: %w", err)
634
635
	}

636
	// Determine DGD name and namespace from generated deployment
637
638
639
640
641
642
643
644
645
646
647
	dgdName := generatedDGD.Name
	dgdNamespace := dgdr.Namespace

	// Build labels (start with generated DGD's labels)
	labels := make(map[string]string)
	if generatedDGD.Labels != nil {
		for k, v := range generatedDGD.Labels {
			labels[k] = v
		}
	}
	// Add/override with managed labels
648
649
650
	labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
	labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
	labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
651
652
653
654
655
656
657
658
659
660

	// Build annotations (start with generated DGD's annotations)
	annotations := make(map[string]string)
	if generatedDGD.Annotations != nil {
		for k, v := range generatedDGD.Annotations {
			annotations[k] = v
		}
	}

	// Create DGD from generated deployment
661
	dgd := &dgdv1alpha1.DynamoGraphDeployment{
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
		ObjectMeta: metav1.ObjectMeta{
			Name:        dgdName,
			Namespace:   dgdNamespace,
			Labels:      labels,
			Annotations: annotations,
		},
		Spec: generatedDGD.Spec,
	}

	// Note: We don't set owner reference on DGD
	// If a DGDR is deleted, the DGD may be serving traffic and should persist independently.
	// We use labels (LabelDGDRName) to track the relationship.

	logger.Info("Creating DynamoGraphDeployment", "name", dgdName, "namespace", dgdNamespace)

	if err := r.Create(ctx, dgd); err != nil {
		if apierrors.IsAlreadyExists(err) {
			logger.Info("DGD already exists, updating status")
680
681
682
683
684
			delete(dgdr.Annotations, "nvidia.com/generated-dgd-spec")
			if updateErr := r.Update(ctx, dgdr); updateErr != nil {
				logger.Error(updateErr, "Failed to remove generated-dgd-spec annotation on IsAlreadyExists path")
				return ctrl.Result{}, updateErr
			}
685
			dgdr.Status.DGDName = dgdName
686
687
688
689
690
691
			return ctrl.Result{}, r.Status().Update(ctx, dgdr)
		}
		r.Recorder.Event(dgdr, corev1.EventTypeWarning, MessageDeploymentCreationFailed, err.Error())
		return ctrl.Result{}, err
	}

692
693
694
695
696
697
698
	delete(dgdr.Annotations, "nvidia.com/generated-dgd-spec")
	if err := r.Update(ctx, dgdr); err != nil {
		// Return the error to force a retry. The DGD was created successfully, so a
		// retry will hit the IsAlreadyExists path above and attempt cleanup again.
		return ctrl.Result{}, fmt.Errorf("failed to remove generated-dgd-spec annotation after DGD creation: %w", err)
	}

699
	// Update status
700
	dgdr.Status.DGDName = dgdName
701

702
	r.Recorder.Event(dgdr, corev1.EventTypeNormal, nvidiacomv1beta1.EventReasonDeploymentCreated,
703
704
705
		fmt.Sprintf(MessageDeploymentCreated, dgdName))

	meta.SetStatusCondition(&dgdr.Status.Conditions, metav1.Condition{
706
		Type:    nvidiacomv1beta1.ConditionTypeDeploymentReady,
707
		Status:  metav1.ConditionFalse,
708
		Reason:  nvidiacomv1beta1.EventReasonDeploymentCreated,
709
710
711
712
713
714
715
716
		Message: fmt.Sprintf("DGD %s created, waiting for Ready", dgdName),
	})

	logger.Info("DynamoGraphDeployment created successfully", "name", dgdName)

	return ctrl.Result{}, r.Status().Update(ctx, dgdr)
}

717
// createAdditionalResources creates ConfigMaps from the profiling output that should be deployed alongside the DGD
718
func (r *DynamoGraphDeploymentRequestReconciler) createAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, targetNamespace string) error {
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
	logger := log.FromContext(ctx)

	// Check if there are additional resources stored in annotations
	if dgdr.Annotations == nil {
		return nil
	}

	resourcesYAML, exists := dgdr.Annotations[AnnotationAdditionalResources]
	if !exists || resourcesYAML == "" {
		return nil
	}

	// Parse using standard Kubernetes YAML decoder
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(resourcesYAML)), 4096)
	resourceCount := 0

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			logger.Error(err, "Failed to decode resource, skipping")
			continue
		}

		if obj.GetKind() == "" {
			continue
		}

		resourceCount++

		// Only support ConfigMap for now (what profiler actually generates)
		if obj.GetKind() != "ConfigMap" {
			logger.Info("Skipping non-ConfigMap resource from profiling output", "kind", obj.GetKind(), "name", obj.GetName())
			continue
		}

		cm := &corev1.ConfigMap{}
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, cm); err != nil {
			logger.Error(err, "Failed to convert to ConfigMap", "name", obj.GetName())
			continue
		}

		// Override namespace and add tracking labels
		cm.Namespace = targetNamespace
		if cm.Labels == nil {
			cm.Labels = make(map[string]string)
		}
768
769
770
		cm.Labels[nvidiacomv1beta1.LabelDGDRName] = dgdr.Name
		cm.Labels[nvidiacomv1beta1.LabelDGDRNamespace] = dgdr.Namespace
		cm.Labels[nvidiacomv1beta1.LabelManagedBy] = nvidiacomv1beta1.LabelValueDynamoOperator
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790

		// Create the ConfigMap
		if err := r.Create(ctx, cm); err != nil {
			if apierrors.IsAlreadyExists(err) {
				logger.Info("ConfigMap already exists, skipping", "name", cm.Name)
			} else {
				return fmt.Errorf("failed to create ConfigMap %s: %w", cm.Name, err)
			}
		} else {
			logger.Info("Created ConfigMap from profiling output", "name", cm.Name, "namespace", targetNamespace)
		}
	}

	if resourceCount > 0 {
		logger.Info("Deploying additional resources from profiling output", "count", resourceCount)
	}

	return nil
}

791
792
// handleFailedPhase handles DGDR in Failed phase
func (r *DynamoGraphDeploymentRequestReconciler) handleFailedPhase(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (ctrl.Result, error) {
793
	logger := log.FromContext(ctx)
794
	logger.Info("DGDR is in failed phase", "name", dgdr.Name)
795
796
797
798
799

	// Could implement retry logic here if desired
	return ctrl.Result{}, nil
}

800
// getProfilingJobName returns the job name for a DGDR
801
func getProfilingJobName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
802
803
	// Use "profile-" prefix for all profiling jobs
	return fmt.Sprintf("profile-%s", dgdr.Name)
804
805
806
}

// getOutputConfigMapName returns the ConfigMap name for profiling output
807
func getOutputConfigMapName(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
808
809
810
	return fmt.Sprintf("%s%s", ConfigMapOutputPrefix, dgdr.Name)
}

811
812
813
// isOnlineProfiling returns true. In v1beta1, the profiler decides online vs AIC
// mode internally based on its config. The controller always uses the same label.
func isOnlineProfiling(_ *nvidiacomv1beta1.DynamoGraphDeploymentRequest) bool {
814
815
	return true
}
816

// validateSpec checks the parts of a DGDR spec the controller can verify up
// front and returns every problem found, combined with errors.Join (nil when
// all checks pass). The checks are:
//   - spec.searchStrategy "thorough" must not be paired with spec.backend "auto";
//   - a model-cache PVC named in spec.modelCache must exist in the DGDR's namespace;
//   - GPU hardware info must be available (validateGPUHardwareInfo).
//
// A PVC lookup failure other than NotFound is returned immediately, as-is,
// without the collected validation errors. The profiler validates the rest.
func (r *DynamoGraphDeploymentRequestReconciler) validateSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	spec := &dgdr.Spec
	var problems []error

	// Disallow searchStrategy: thorough with backend: auto.
	if spec.SearchStrategy == nvidiacomv1beta1.SearchStrategyThorough && spec.Backend == nvidiacomv1beta1.BackendTypeAuto {
		problems = append(problems, fmt.Errorf(
			"spec.searchStrategy %q is incompatible with spec.backend %q: set spec.backend to a specific backend (sglang, trtllm, or vllm)",
			nvidiacomv1beta1.SearchStrategyThorough,
			nvidiacomv1beta1.BackendTypeAuto,
		))
	}

	// The referenced model cache PVC must already exist.
	if mc := spec.ModelCache; mc != nil && mc.PVCName != "" {
		key := types.NamespacedName{Namespace: dgdr.Namespace, Name: mc.PVCName}
		switch err := r.Get(ctx, key, &corev1.PersistentVolumeClaim{}); {
		case err == nil:
			// PVC found; nothing to report.
		case apierrors.IsNotFound(err):
			problems = append(problems, fmt.Errorf(MessageModelCachePVCNotFound, mc.PVCName, dgdr.Namespace))
		default:
			return err
		}
	}

	if err := r.validateGPUHardwareInfo(ctx, dgdr); err != nil {
		problems = append(problems, err)
	}

	return errors.Join(problems...)
}

// validateGPUHardwareInfo ensures GPU hardware information is available for
// profiling.
//
// It succeeds when spec.hardware sets at least one of gpuSku, vramMb or
// numGpusPerNode. Otherwise:
//   - a namespace-restricted operator (Config.Namespace.Restricted set) fails
//     immediately with instructions for supplying the information;
//   - a cluster-wide operator succeeds only when DCGM-based GPU discovery works.
//     A discovery failure is logged with its classified reason before an error
//     is returned.
func (r *DynamoGraphDeploymentRequestReconciler) validateGPUHardwareInfo(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	hw := dgdr.Spec.Hardware
	if hw != nil && (hw.GPUSKU != "" || hw.VRAMMB != nil || hw.NumGPUsPerNode != nil) {
		// Manual hardware config provided; discovery is not needed.
		return nil
	}

	if r.Config.Namespace.Restricted != "" {
		return errors.New("GPU hardware info required but cannot be auto-discovered." +
			"\n\nOptions to resolve:" +
			"\n\n1. Re-enable GPU discovery (if it was disabled during Helm install):" +
			"\n   helm upgrade ... --set dynamo-operator.gpuDiscovery.enabled=true" +
			"\n\n2. Add hardware config to spec.hardware:" +
			"\n   numGpusPerNode: 8" +
			"\n   gpuSku: \"H100-SXM5-80GB\"" +
			"\n   vramMb: 81920")
	}

	if _, discoveryErr := r.GPUDiscovery.DiscoverGPUsFromDCGM(ctx, r.APIReader, r.GPUDiscoveryCache); discoveryErr != nil {
		log.FromContext(ctx).Info("GPU discovery not available",
			"reason", GetGPUDiscoveryFailureReason(discoveryErr), "error", discoveryErr.Error())
		return errors.New("GPU hardware info required but auto-discovery failed. Add spec.hardware.gpuSku, spec.hardware.vramMb, spec.hardware.numGpusPerNode")
	}

	// GPU discovery is available, validation passes.
	return nil
}

// GetGPUDiscoveryFailureReason classifies a GPU discovery error and
// returns a stable, actionable reason string suitable for structured logging.
//
// The classification is based on known error message patterns produced during:
//   - DCGM exporter pod discovery
//   - Helm-based GPU operator and DCGM discovery
//   - Metrics scraping
//   - Prometheus parsing
//
// Matching is case-insensitive. The error text is lower-cased before it is
// compared, so every pattern below must itself be written in lower case. Cases
// are tried in order and the first match wins. A nil error, or one that does
// not match any known category, yields "unknown".
func GetGPUDiscoveryFailureReason(err error) string {
	if err == nil {
		return "unknown"
	}
	errMsg := strings.ToLower(err.Error())

	switch {
	case strings.Contains(errMsg, "list pods"):
		return "failed to list DCGM exporter pods (RBAC/cluster connectivity issue)"
	case strings.Contains(errMsg, "gpu operator is not installed"):
		return "GPU Operator not installed in expected namespace"
	case strings.Contains(errMsg, "helm init failed"):
		return "failed to initialize Helm client (RBAC, kubeconfig, or Helm driver issue)"
	case strings.Contains(errMsg, "timeout waiting for dcgm exporter pods"):
		return "timeout while waiting for DCGM exporter pods to become ready"
	case strings.Contains(errMsg, "http get"):
		return "failed to reach DCGM metrics endpoint on pod (network/port issue)"
	case strings.Contains(errMsg, "metrics endpoint") &&
		strings.Contains(errMsg, "status"):
		return "DCGM pod metrics endpoint returned non-200 status"
	case strings.Contains(errMsg, "parse prometheus metrics"):
		return "failed to parse dcgm Prometheus metrics (invalid format)"
	case strings.Contains(errMsg, "no gpus detected"):
		return "no GPUs detected in dcgm metrics (GPU model or metrics missing)"
	// The pattern must be lower case because errMsg is lower-cased; a mixed-case
	// pattern here could never match.
	case strings.Contains(errMsg, "dcgm is not enabled in the gpu operator"):
		return "DCGM is not enabled in the GPU Operator (check GPU Operator configuration and permissions)"
	case strings.Contains(errMsg, "failed to scrape any dcgm exporter pod"):
		return "failed to scrape any dcgm exporter pod (check DCGM exporter pod status and network connectivity)"
	case strings.Contains(errMsg, "no gpu metrics could be parsed from any dcgm pod"):
		return "no GPU metrics could be parsed from any DCGM pod (check DCGM exporter pod status and network connectivity)"
	case strings.Contains(errMsg, "failed to create helm path"):
		return "failed to initialize Helm client (RBAC, kubeconfig, or Helm driver issue)"
	}
	return "unknown"
}

// createProfilingJob creates or updates, via SyncResource, the Kubernetes Job
// that profiles a DGDR.
//
// Before syncing the Job it:
//   - deletes the DGDR's output ConfigMap if one exists, so results from an
//     earlier profiling run cannot be mistaken for fresh ones;
//   - on cluster-wide installs (Config.Namespace.Restricted empty), ensures the
//     profiling ServiceAccount and its RBAC exist in the DGDR's namespace;
//   - fills unset spec.hardware fields from GPU discovery. A discovery failure
//     is only logged.
//
// The Job's pod contains two containers that share the profiling output volume:
//   - the profiler, which receives the DGDR spec as JSON via --config;
//   - an output-copier sidecar, rendered from sidecarScriptTemplate with the
//     output ConfigMap name.
// The output volume is an emptyDir unless a v1alpha1 round-trip annotation
// names a PVC. An optional model-cache PVC and a round-tripped profiling-config
// ConfigMap are mounted read-only into the profiler. spec.overrides.profilingJob
// is applied last.
//
// On success the Job name is stored in dgdr.Status.ProfilingJobName. This
// function does not write status to the API server.
func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	logger := log.FromContext(ctx)

	// Remove output from any previous profiling run so stale data is never reused.
	outputCMName := getOutputConfigMapName(dgdr)
	staleCM := &corev1.ConfigMap{}
	getErr := r.Get(ctx, types.NamespacedName{Namespace: dgdr.Namespace, Name: outputCMName}, staleCM)
	switch {
	case getErr == nil:
		logger.Info("Deleting existing output ConfigMap to ensure fresh profiling results", "configMap", outputCMName)
		if delErr := r.Delete(ctx, staleCM); delErr != nil && !apierrors.IsNotFound(delErr) {
			logger.Error(delErr, "Failed to delete existing output ConfigMap", "configMap", outputCMName)
			return fmt.Errorf("failed to delete existing output ConfigMap: %w", delErr)
		}
		logger.Info("Successfully deleted old output ConfigMap", "configMap", outputCMName)
	case !apierrors.IsNotFound(getErr):
		logger.Error(getErr, "Failed to check for existing output ConfigMap", "configMap", outputCMName)
		return fmt.Errorf("failed to check for existing output ConfigMap: %w", getErr)
	}

	// Only cluster-wide installations manage the profiling ServiceAccount/RBAC here.
	if clusterWide := r.Config.Namespace.Restricted == ""; clusterWide {
		rbacErr := r.RBACManager.EnsureServiceAccountWithRBAC(
			ctx,
			dgdr.Namespace,
			ServiceAccountProfilingJob,
			r.Config.RBAC.DGDRProfilingClusterRoleName,
		)
		if rbacErr != nil {
			logger.Error(rbacErr, "Failed to ensure profiling job RBAC")
			return fmt.Errorf("failed to ensure profiling job RBAC: %w", rbacErr)
		}
	}

	// Best-effort: fill unset hardware fields before the spec is marshalled below.
	if enrichErr := r.enrichHardwareFromDiscovery(ctx, dgdr); enrichErr != nil {
		logger.Info("GPU discovery not available, proceeding without enrichment", "reason", enrichErr.Error())
	}

	modified, job, err := commonController.SyncResource(ctx, r, dgdr, func(ctx context.Context) (*batchv1.Job, bool, error) {
		// The profiler receives the DGDR spec verbatim as JSON.
		specJSON, err := marshalDGDRSpec(dgdr)
		if err != nil {
			return nil, false, err
		}

		plainEnv := func(name, value string) corev1.EnvVar {
			return corev1.EnvVar{Name: name, Value: value}
		}
		env := []corev1.EnvVar{
			{
				Name: "HUGGING_FACE_HUB_TOKEN",
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{Name: "hf-token-secret"},
						Key:                  "HF_TOKEN",
					},
				},
			},
			plainEnv("NATS_SERVER", fmt.Sprintf("nats://%s-nats:4222", dgdr.Namespace)),
			plainEnv("ETCD_ENDPOINTS", fmt.Sprintf("%s-etcd:2379", dgdr.Namespace)),
			// DGDR identity, used by the profiler for ownerReferences.
			plainEnv("DGDR_NAME", dgdr.Name),
			plainEnv("DGDR_NAMESPACE", dgdr.Namespace),
			plainEnv("DGDR_UID", string(dgdr.UID)),
		}

		mounts := []corev1.VolumeMount{{Name: VolumeNameProfilingOutput, MountPath: ProfilingOutputPath}}

		modelCachePVC, modelCacheMountPath := extractModelCachePVCConfig(dgdr)
		if modelCachePVC != "" {
			logger.Info("Mounting model cache PVC to profiler pod", "pvc", modelCachePVC, "mountPath", modelCacheMountPath)
			mounts = append(mounts, corev1.VolumeMount{Name: VolumeNameModelCache, MountPath: modelCacheMountPath, ReadOnly: true})
		}

		// v1alpha1 round-trip: a ConfigMap referenced via annotation is mounted as profiling config.
		cmRef := configMapRefFromAnnotation(dgdr)
		if cmRef != nil {
			mounts = append(mounts, corev1.VolumeMount{Name: VolumeNameProfilingConfig, MountPath: ProfilingConfigMountPath, ReadOnly: true})
		}

		// The defaulting webhook normally fills spec.image; refuse to build a Job without it.
		image := dgdr.Spec.Image
		if image == "" {
			return nil, false, fmt.Errorf("spec.image is required but not set; ensure the defaulting webhook ran or set spec.image explicitly")
		}
		logger.Info("Using profiler image", "image", image)

		// --output-dir must equal ProfilingOutputPath so the sidecar finds profiler_status.yaml.
		profiler := corev1.Container{
			Name:         ContainerNameProfiler,
			Image:        image,
			Command:      []string{"python", "-m", "dynamo.profiler"},
			Args:         []string{"--config", specJSON, "--output-dir", ProfilingOutputPath},
			Env:          env,
			VolumeMounts: mounts,
			WorkingDir:   "/workspace",
		}

		tmpl, err := template.New("sidecar").Parse(sidecarScriptTemplate)
		if err != nil {
			return nil, false, fmt.Errorf("failed to parse sidecar script template: %w", err)
		}
		var script bytes.Buffer
		if err := tmpl.Execute(&script, map[string]string{
			"OutputPath":    ProfilingOutputPath,
			"OutputFile":    ProfilingOutputFile,
			"ConfigMapName": outputCMName,
			"Namespace":     dgdr.Namespace,
			"DGDRName":      dgdr.Name,
		}); err != nil {
			return nil, false, fmt.Errorf("failed to execute sidecar script template: %w", err)
		}

		sidecar := corev1.Container{
			Name:    ContainerNameOutputCopier,
			Image:   SidecarImage,
			Command: []string{"/bin/sh", "-c"},
			Args:    []string{script.String()},
			VolumeMounts: []corev1.VolumeMount{
				{Name: VolumeNameProfilingOutput, MountPath: ProfilingOutputPath, ReadOnly: true},
			},
		}

		// Output goes to an emptyDir (v1beta1 default) unless the v1alpha1 annotation names a PVC.
		outputSource := corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}
		if outputPVC := outputPVCFromAnnotation(dgdr); outputPVC != "" {
			logger.Info("Using PVC for profiling output (from v1alpha1 annotation)", "pvc", outputPVC)
			outputSource = corev1.VolumeSource{
				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: outputPVC},
			}
		}
		vols := []corev1.Volume{{Name: VolumeNameProfilingOutput, VolumeSource: outputSource}}

		if modelCachePVC != "" {
			vols = append(vols, corev1.Volume{
				Name: VolumeNameModelCache,
				VolumeSource: corev1.VolumeSource{
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: modelCachePVC, ReadOnly: true},
				},
			})
		}

		if cmRef != nil {
			sourceKey := cmRef.Key
			if sourceKey == "" {
				sourceKey = ProfilingConfigDefaultKey
			}
			vols = append(vols, corev1.Volume{
				Name: VolumeNameProfilingConfig,
				VolumeSource: corev1.VolumeSource{
					ConfigMap: &corev1.ConfigMapVolumeSource{
						LocalObjectReference: corev1.LocalObjectReference{Name: cmRef.Name},
						Items:                []corev1.KeyToPath{{Key: sourceKey, Path: ProfilingConfigDefaultKey}},
					},
				},
			})
		}

		// BackoffLimit bounds retries so a failing profiler does not loop forever.
		profilingJob := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Name:      getProfilingJobName(dgdr),
				Namespace: dgdr.Namespace,
				Labels: map[string]string{
					nvidiacomv1beta1.LabelApp:       nvidiacomv1beta1.LabelValueDynamoProfiler,
					nvidiacomv1beta1.LabelDGDR:      dgdr.Name,
					nvidiacomv1beta1.LabelManagedBy: nvidiacomv1beta1.LabelValueDynamoOperator,
				},
			},
			Spec: batchv1.JobSpec{
				BackoffLimit: ptr.To[int32](3),
				Template: corev1.PodTemplateSpec{
					Spec: corev1.PodSpec{
						ServiceAccountName: ServiceAccountProfilingJob,
						RestartPolicy:      corev1.RestartPolicyNever,
						SecurityContext: &corev1.PodSecurityContext{
							RunAsNonRoot: ptr.To(true),
							RunAsUser:    ptr.To[int64](1000),
							RunAsGroup:   ptr.To[int64](1000),
							FSGroup:      ptr.To[int64](1000),
						},
						Containers:       []corev1.Container{profiler, sidecar},
						Volumes:          vols,
						ImagePullSecrets: []corev1.LocalObjectReference{{Name: "nvcr-imagepullsecret"}},
					},
				},
			},
		}

		var overrides *batchv1.JobSpec
		if dgdr.Spec.Overrides != nil {
			overrides = dgdr.Spec.Overrides.ProfilingJob
		}
		applyProfilingJobOverrides(profilingJob, overrides)

		return profilingJob, false, nil
	})
	if err != nil {
		return err
	}

	if modified {
		logger.Info("Profiling job created/updated", "job", job.Name)
	}

	// Record the Job name on the (in-memory) status for observability.
	dgdr.Status.ProfilingJobName = job.Name
	return nil
}

// marshalDGDRSpec encodes dgdr.Spec as the JSON string handed to the profiler
// via --config. The spec is passed through verbatim, so no key mapping is
// needed.
func marshalDGDRSpec(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, error) {
	raw, err := json.Marshal(dgdr.Spec)
	if err == nil {
		return string(raw), nil
	}
	return "", fmt.Errorf("failed to marshal DGDR spec to JSON: %w", err)
}

// enrichHardwareFromDiscovery fills in spec.hardware fields the user did not
// set, using DCGM-based GPU discovery. It is called before marshalDGDRSpec().
// It mutates dgdr.Spec.Hardware in place, allocating it when nil; nothing is
// written to the API server here.
//
// Behaviour:
//   - gpuSku, vramMb and numGpusPerNode all set: returns nil without running
//     discovery, and totalGpus is left untouched.
//   - otherwise: runs discovery, even when some fields were set manually, and
//     fills only the fields that are still unset.
//       - gpuSku prefers the discovered system name and falls back to the raw
//         GPU model.
//       - totalGpus defaults to GPUs-per-node * nodes-with-GPUs, capped at 32.
//   - discovery fails: the discovery error is returned and no field is filled.
func (r *DynamoGraphDeploymentRequestReconciler) enrichHardwareFromDiscovery(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) error {
	if dgdr.Spec.Hardware == nil {
		dgdr.Spec.Hardware = &nvidiacomv1beta1.HardwareSpec{}
	}
	hw := dgdr.Spec.Hardware

	if hw.GPUSKU != "" && hw.VRAMMB != nil && hw.NumGPUsPerNode != nil {
		return nil // all fields already set by user; TotalGPUs is only filled when discovery runs
	}

	logger := log.FromContext(ctx)

	// At least one field is unset, so discovery must run: every fill below reads
	// gpuInfo. Skipping discovery for a partially specified spec.hardware would
	// leave gpuInfo nil and panic on the first missing field.
	logger.Info("Attempting GPU discovery for profiling job")
	var (
		gpuInfo *gpu.GPUInfo
		err     error
	)
	gpuInfo, err = r.GPUDiscovery.DiscoverGPUsFromDCGM(ctx, r.APIReader, r.GPUDiscoveryCache)
	if err != nil {
		// This path is expected for namespace-restricted operators without node read permissions.
		reason := GetGPUDiscoveryFailureReason(err)
		logger.Info("GPU discovery not available, using manual hardware configuration from profiling config",
			"reason", reason, "error", err.Error())
		return err
	}
	if gpuInfo == nil {
		// Defensive: never dereference a missing discovery result.
		return errors.New("GPU discovery returned no GPU information")
	}
	logger.Info("GPU discovery completed successfully",
		"gpusPerNode", gpuInfo.GPUsPerNode,
		"nodesWithGPUs", gpuInfo.NodesWithGPUs,
		"totalGpus", gpuInfo.GPUsPerNode*gpuInfo.NodesWithGPUs,
		"model", gpuInfo.Model,
		"vramMiB", gpuInfo.VRAMPerGPU,
		"system", gpuInfo.System,
		"cloudprovider", gpuInfo.CloudProvider)

	if hw.GPUSKU == "" {
		if gpuInfo.System != "" {
			hw.GPUSKU = gpuInfo.System
		} else {
			// Unknown GPU type: use raw model name; profiler will attempt naive config generation.
			hw.GPUSKU = nvidiacomv1beta1.GPUSKUType(gpuInfo.Model)
		}
	}
	if hw.VRAMMB == nil {
		vram := float64(gpuInfo.VRAMPerGPU)
		hw.VRAMMB = &vram
	}
	if hw.NumGPUsPerNode == nil {
		n := int32(gpuInfo.GPUsPerNode)
		hw.NumGPUsPerNode = &n
	}
	if hw.TotalGPUs == nil {
		// TODO: This is a temporary limit to prevent the profiler from using too many GPUs.
		// Will be removed once a fix is in the Profiler/AIC.
		const defaultMaxAutoGPUs = int32(32)

		total := int32(gpuInfo.GPUsPerNode * gpuInfo.NodesWithGPUs)
		if total > defaultMaxAutoGPUs {
			logger.Info("Capping auto-discovered TotalGPUs at default limit; set hardware.totalGpus to override",
				"discovered", total, "cap", defaultMaxAutoGPUs)
			total = defaultMaxAutoGPUs
		}
		hw.TotalGPUs = &total
	}

	return nil
}

1313
1314
1315
1316
// extractModelCachePVCConfig reads model cache PVC settings from the typed v1beta1 spec.
// Returns (pvcName, mountPath) — both empty if not configured.
func extractModelCachePVCConfig(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (string, string) {
	if dgdr.Spec.ModelCache == nil || dgdr.Spec.ModelCache.PVCName == "" {
1317
1318
		return "", ""
	}
1319
	mountPath := dgdr.Spec.ModelCache.PVCMountPath
1320
1321
1322
	if mountPath == "" {
		mountPath = DefaultModelCacheMountPath
	}
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
	return dgdr.Spec.ModelCache.PVCName, mountPath
}

// configMapKeySelector mirrors v1alpha1.ConfigMapKeySelector for annotation deserialization.
type configMapKeySelector struct {
	Name string `json:"name"`
	Key  string `json:"key,omitempty"`
}

// configMapRefFromAnnotation reads the ConfigMap reference from the round-trip annotation.
// Returns nil for native v1beta1 resources (no annotation present).
func configMapRefFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) *configMapKeySelector {
	if dgdr.Annotations == nil {
		return nil
	}
	raw, ok := dgdr.Annotations[AnnotationConfigMapRef]
	if !ok || raw == "" {
		return nil
	}
	var ref configMapKeySelector
	if err := json.Unmarshal([]byte(raw), &ref); err != nil {
		return nil
	}
	return &ref
}
1348

1349
1350
1351
1352
1353
1354
1355
// outputPVCFromAnnotation reads the output PVC name from the round-trip annotation.
// Returns "" for native v1beta1 resources (always emptyDir).
func outputPVCFromAnnotation(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) string {
	if dgdr.Annotations == nil {
		return ""
	}
	return dgdr.Annotations[AnnotationOutputPVC]
1356
1357
}

// checkProfilingJobStatus fetches the DGDR's profiling Job and walks its
// conditions in order:
//   - the first true Complete condition yields (true, nil);
//   - the first true Failed condition yields (false, error), where the error
//     carries pod-level details when getProfilingJobErrorDetails finds any.
// With neither present it returns (false, nil). Errors from fetching the Job,
// including NotFound, are returned unchanged.
func (r *DynamoGraphDeploymentRequestReconciler) checkProfilingJobStatus(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (bool, error) {
	jobName := getProfilingJobName(dgdr)

	job := &batchv1.Job{}
	if err := r.Get(ctx, types.NamespacedName{Namespace: dgdr.Namespace, Name: jobName}, job); err != nil {
		return false, err
	}

	for _, cond := range job.Status.Conditions {
		if cond.Status != corev1.ConditionTrue {
			continue
		}
		switch cond.Type {
		case batchv1.JobComplete:
			log.FromContext(ctx).Info("Profiling job completed", "job", jobName)
			return true, nil
		case batchv1.JobFailed:
			// Enrich the failure with details from the failed pod, when available.
			if details := r.getProfilingJobErrorDetails(ctx, dgdr, job); details != "" {
				return false, fmt.Errorf("profiling job failed: %s. Details: %s", cond.Message, details)
			}
			return false, fmt.Errorf("profiling job failed: %s", cond.Message)
		}
	}

	return false, nil
}

// getProfilingJobErrorDetails builds a one-line summary of why a profiling
// Job's pod failed. It lists the Job's pods via the "job-name" label. For the
// first pod in phase Failed whose profiler container is terminated or waiting,
// it describes that state; within a pod, a terminated state takes precedence.
// It returns "" when listing fails (the error is logged) or nothing matches.
func (r *DynamoGraphDeploymentRequestReconciler) getProfilingJobErrorDetails(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, job *batchv1.Job) string {
	logger := log.FromContext(ctx)

	pods := &corev1.PodList{}
	byJob := client.MatchingLabels{"job-name": job.Name}
	if err := r.List(ctx, pods, client.InNamespace(dgdr.Namespace), byJob); err != nil {
		logger.Error(err, "Failed to list pods for profiling job")
		return ""
	}

	for _, pod := range pods.Items {
		if pod.Status.Phase != corev1.PodFailed {
			continue
		}

		// Prefer a terminated profiler container: it carries the exit code.
		for _, cs := range pod.Status.ContainerStatuses {
			if cs.Name != ContainerNameProfiler || cs.State.Terminated == nil {
				continue
			}
			term := cs.State.Terminated
			summary := fmt.Sprintf("Pod: %s, Container: %s, ExitCode: %d, Reason: %s",
				pod.Name, cs.Name, term.ExitCode, term.Reason)
			if term.Message != "" {
				summary += ", Message: " + term.Message
			}
			logger.Info("Retrieved profiling job error details", "error", summary)
			return summary
		}

		// Otherwise fall back to a waiting profiler container.
		for _, cs := range pod.Status.ContainerStatuses {
			if cs.Name != ContainerNameProfiler || cs.State.Waiting == nil {
				continue
			}
			wait := cs.State.Waiting
			summary := fmt.Sprintf("Pod: %s, Container: %s, Waiting - Reason: %s, Message: %s",
				pod.Name, cs.Name, wait.Reason, wait.Message)
			logger.Info("Retrieved profiling job waiting details", "error", summary)
			return summary
		}
	}

	return ""
}

1437
1438
1439
1440
// generateDGDSpec reads profiling output from the sidecar ConfigMap, extracts the
// DynamoGraphDeployment spec and pareto configs, stores the spec in an annotation via
// r.Update, and returns the ProfilingResultsStatus and DGD name.
func (r *DynamoGraphDeploymentRequestReconciler) generateDGDSpec(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest) (*nvidiacomv1beta1.ProfilingResultsStatus, string, error) {
1441
	logger := log.FromContext(ctx)
1442
	logger.Info("Generating DGD spec from profiling results", "name", dgdr.Name, "backend", dgdr.Spec.Backend)
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453

	// Read the generated spec from ConfigMap (created by sidecar)
	outputConfigMapName := getOutputConfigMapName(dgdr)
	cm := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      outputConfigMapName,
		Namespace: dgdr.Namespace,
	}, cm)

	if err != nil {
		if apierrors.IsNotFound(err) {
1454
			return nil, "", fmt.Errorf("output ConfigMap %s not found - profiling may not have completed yet", outputConfigMapName)
1455
		}
1456
		return nil, "", fmt.Errorf("failed to get output ConfigMap: %w", err)
1457
1458
	}

1459
	// Select the right config file based on mocker feature flag
1460
1461
	// Profiler writes the selected config (real or mocker) to a single output file
	outputFile := ProfilingOutputFile
1462

1463
	// Get YAML content from ConfigMap
1464
	yamlContent, exists := cm.Data[outputFile]
1465
	if !exists {
1466
		return nil, "", fmt.Errorf("key %s not found in ConfigMap %s", outputFile, outputConfigMapName)
1467
1468
	}

1469
	logger.Info("Found profiling output in ConfigMap", "configMap", outputConfigMapName, "outputFile", outputFile, "size", len(yamlContent))
1470

1471
1472
1473
	// Extract DGD and any supporting resources from potentially multi-document YAML (ConfigMap + DGD)
	dgd, additionalResources, err := r.extractResourcesFromYAML([]byte(yamlContent))
	if err != nil {
1474
		return nil, "", fmt.Errorf("failed to extract DGD from %s: %w", outputFile, err)
1475
1476
	}

1477
1478
1479
1480
1481
	logger.Info("Parsed profiling output", "dgdName", dgd.Name, "additionalResources", len(additionalResources))

	if len(additionalResources) > 0 {
		if err := r.storeAdditionalResources(ctx, dgdr, additionalResources); err != nil {
			logger.Error(err, "Failed to store additional resources")
1482
			return nil, "", err
1483
		}
1484
1485
		// storeAdditionalResources calls r.Update internally, bumping resourceVersion.
		// Refetch so the subsequent r.Update for the spec annotation doesn't 409.
1486
		if err := r.Get(ctx, types.NamespacedName{Name: dgdr.Name, Namespace: dgdr.Namespace}, dgdr); err != nil {
1487
			return nil, "", fmt.Errorf("failed to refetch DGDR after storing additional resources: %w", err)
1488
1489
		}
	}
1490

1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
	profilingResults := &nvidiacomv1beta1.ProfilingResultsStatus{}
	if webUIData, ok := cm.Data["webui_data.json"]; ok {
		pareto, err := extractParetoFromWebUIData([]byte(webUIData))
		if err != nil {
			logger.Error(err, "Failed to parse webui_data.json; skipping pareto population")
		} else {
			profilingResults.Pareto = pareto
			logger.Info("Populated ProfilingResults.Pareto", "count", len(pareto))
		}
	}
1501

1502
	// Store the generated DGD in ProfilingResults.SelectedConfig
1503
1504
	dgdJSON, err := json.Marshal(dgd)
	if err != nil {
1505
		return nil, "", fmt.Errorf("failed to marshal generated DGD to JSON: %w", err)
1506
	}
1507
	profilingResults.SelectedConfig = &runtime.RawExtension{Raw: dgdJSON}
1508
1509
1510
1511

	// Serialize the DGD spec to an annotation so createDGD can retrieve it
	dgdBytes, err := sigsyaml.Marshal(dgd)
	if err != nil {
1512
		return nil, "", fmt.Errorf("failed to marshal generated DGD: %w", err)
1513
1514
1515
1516
1517
1518
1519
	}
	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations["nvidia.com/generated-dgd-spec"] = string(dgdBytes)

	if err := r.Update(ctx, dgdr); err != nil {
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
		return nil, "", fmt.Errorf("failed to update DGDR with generated DGD annotation: %w", err)
	}
	return profilingResults, dgd.Name, nil
}

// extractParetoFromWebUIData decodes the profiler's webui_data.json payload and
// converts each row of its cost table (cost.table.data) into a ParetoConfig.
//
// Expected row layout (at least 7 columns):
//
//	[TTFT(ms), PrefillThpt, ITL(ms), DecodeThpt, TokensPerUser, GPUHours, ActionYAML]
//
// The ActionYAML cell (index 6) holds a partial DynamoGraphDeployment YAML snippet,
// possibly preceded by '#' comment lines, which are stripped before parsing.
//
// Parsing is best effort per row. A row is skipped silently when:
//   - it is too short;
//   - its action cell is not a JSON string;
//   - its YAML does not parse;
//   - its YAML is an empty mapping.
//
// Only a top-level JSON decode failure is reported as an error. Returns (nil, nil)
// when the table has no rows.
func extractParetoFromWebUIData(data []byte) ([]nvidiacomv1beta1.ParetoConfig, error) {
	const (
		minColumns        = 7
		actionColumnIndex = 6
	)

	var webUI struct {
		Cost struct {
			Table struct {
				Data [][]json.RawMessage `json:"data"`
			} `json:"table"`
		} `json:"cost"`
	}
	if err := json.Unmarshal(data, &webUI); err != nil {
		return nil, fmt.Errorf("failed to unmarshal webui_data.json: %w", err)
	}

	table := webUI.Cost.Table.Data
	if len(table) == 0 {
		return nil, nil
	}

	// actionConfigJSON converts one table row into the JSON encoding of its action
	// snippet; ok is false when the row must be skipped.
	actionConfigJSON := func(row []json.RawMessage) (raw []byte, ok bool) {
		if len(row) < minColumns {
			return nil, false
		}

		var snippet string
		if json.Unmarshal(row[actionColumnIndex], &snippet) != nil {
			return nil, false
		}

		var fields map[string]interface{}
		if sigsyaml.Unmarshal([]byte(stripYAMLComments(snippet)), &fields) != nil || len(fields) == 0 {
			return nil, false
		}

		encoded, err := json.Marshal(fields)
		if err != nil {
			return nil, false
		}
		return encoded, true
	}

	configs := make([]nvidiacomv1beta1.ParetoConfig, 0, len(table))
	for _, row := range table {
		raw, ok := actionConfigJSON(row)
		if !ok {
			continue
		}
		configs = append(configs, nvidiacomv1beta1.ParetoConfig{
			Config: runtime.RawExtension{Raw: raw},
		})
	}
	return configs, nil
}

// stripYAMLComments drops every full-line comment from s, i.e. each line whose
// first character after any leading spaces or tabs is '#'. All other lines,
// including ones with trailing inline comments, are kept in their original order.
// The profiler prefixes action snippets with such comment lines.
//
// Caveat: the filter is purely line-based, so a line inside a YAML block scalar
// that starts with '#' is removed as well.
func stripYAMLComments(s string) string {
	var kept []string
	for _, line := range strings.Split(s, "\n") {
		trimmed := strings.TrimLeft(line, " \t")
		if strings.HasPrefix(trimmed, "#") {
			continue
		}
		kept = append(kept, line)
	}
	return strings.Join(kept, "\n")
}

1595
1596
// storeAdditionalResources marshals additional resources to YAML and stores them in DGDR annotations.
// Validates annotation size and fails gracefully if too large.
1597
func (r *DynamoGraphDeploymentRequestReconciler) storeAdditionalResources(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, resources []*unstructured.Unstructured) error {
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
	if len(resources) == 0 {
		return nil
	}

	var resourcesYAML []byte

	for i, res := range resources {
		resYAML, err := sigsyaml.Marshal(res.Object)
		if err != nil {
			return fmt.Errorf("failed to marshal resource %s/%s: %w", res.GetKind(), res.GetName(), err)
		}
		if i > 0 {
			resourcesYAML = append(resourcesYAML, []byte("\n---\n")...)
		}
		resourcesYAML = append(resourcesYAML, resYAML...)
	}

	// Validate size before storing
	if len(resourcesYAML) > MaxAnnotationSize {
		return fmt.Errorf("additional resources YAML size (%d bytes) exceeds maximum annotation size (%d bytes); "+
			"consider reducing the number of resources or storing them separately",
			len(resourcesYAML), MaxAnnotationSize)
	}

	if dgdr.Annotations == nil {
		dgdr.Annotations = make(map[string]string)
	}
	dgdr.Annotations[AnnotationAdditionalResources] = string(resourcesYAML)

	return r.Update(ctx, dgdr)
}

// extractResourcesFromYAML parses multi-document YAML from profiling output,
// extracting the DynamoGraphDeployment and any ConfigMaps that should be deployed with it.
1632
func (r *DynamoGraphDeploymentRequestReconciler) extractResourcesFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, []*unstructured.Unstructured, error) {
1633
1634
	decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(yamlContent), 4096)

1635
	var dgd *dgdv1alpha1.DynamoGraphDeployment
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
	var additionalResources []*unstructured.Unstructured

	for {
		obj := &unstructured.Unstructured{}
		if err := decoder.Decode(obj); err != nil {
			if err == io.EOF {
				break
			}
			// Skip invalid documents and continue
			continue
		}

		// Skip empty objects
		if obj.GetKind() == "" {
			continue
		}

		if obj.GetKind() == "DynamoGraphDeployment" {
1654
			dgd = &dgdv1alpha1.DynamoGraphDeployment{}
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, dgd); err != nil {
				return nil, nil, fmt.Errorf("failed to convert to DynamoGraphDeployment: %w", err)
			}
		} else {
			// Store ConfigMaps or other resources for deployment
			additionalResources = append(additionalResources, obj)
		}
	}

	if dgd == nil {
		return nil, nil, fmt.Errorf("no DynamoGraphDeployment found in YAML content")
	}

	return dgd, additionalResources, nil
}

// extractDGDFromYAML is a convenience wrapper that extracts only the DGD (used by tests)
1672
func (r *DynamoGraphDeploymentRequestReconciler) extractDGDFromYAML(yamlContent []byte) (*dgdv1alpha1.DynamoGraphDeployment, error) {
1673
1674
1675
1676
	dgd, _, err := r.extractResourcesFromYAML(yamlContent)
	return dgd, err
}

1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
// updateDeploymentInfo populates status.deploymentInfo from DGD service replica counts.
func updateDeploymentInfo(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, dgd *dgdv1alpha1.DynamoGraphDeployment) bool {
	var totalReplicas, totalAvailable int32
	for _, svc := range dgd.Status.Services {
		totalReplicas += svc.Replicas
		if svc.AvailableReplicas != nil {
			totalAvailable += *svc.AvailableReplicas
		}
	}

	// Short-circuit if nothing changed
	if cur := dgdr.Status.DeploymentInfo; cur != nil &&
		cur.Replicas != nil && *cur.Replicas == totalReplicas &&
		cur.AvailableReplicas != nil && *cur.AvailableReplicas == totalAvailable {
		return false
	}

	dgdr.Status.DeploymentInfo = &nvidiacomv1beta1.DeploymentInfoStatus{
		Replicas:          &totalReplicas,
		AvailableReplicas: &totalAvailable,
	}
	return true
}

// setSucceededCondition derives the aggregate Succeeded condition from phase and
// upserts it into dgdr.Status.Conditions via meta.SetStatusCondition, stamped with
// dgdr.Generation.
//
// The condition is True only for the Ready (spec generated) and Deployed phases.
// Every other phase, including an empty or unrecognized one, yields False.
func setSucceededCondition(dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase) {
	status, reason, message := func() (metav1.ConditionStatus, string, string) {
		switch phase {
		case nvidiacomv1beta1.DGDRPhasePending, "":
			return metav1.ConditionFalse, "Pending", "DGDR is pending"
		case nvidiacomv1beta1.DGDRPhaseProfiling:
			return metav1.ConditionFalse, "Profiling", "Profiling is in progress"
		case nvidiacomv1beta1.DGDRPhaseReady:
			return metav1.ConditionTrue, "SpecGenerated", "Profiling complete, spec available"
		case nvidiacomv1beta1.DGDRPhaseDeploying:
			return metav1.ConditionFalse, "Deploying", "Deployment is in progress"
		case nvidiacomv1beta1.DGDRPhaseDeployed:
			return metav1.ConditionTrue, "Deployed", "Deployment is healthy"
		case nvidiacomv1beta1.DGDRPhaseFailed:
			return metav1.ConditionFalse, "Failed", "DGDR has failed"
		default:
			return metav1.ConditionFalse, "Unknown", "Unknown phase"
		}
	}()

	condition := metav1.Condition{
		Type:               nvidiacomv1beta1.ConditionTypeSucceeded,
		Status:             status,
		ObservedGeneration: dgdr.Generation,
		Reason:             reason,
		Message:            message,
	}
	meta.SetStatusCondition(&dgdr.Status.Conditions, condition)
}

// updatePhaseAndRequeue sets dgdr's phase, refreshes the aggregate Succeeded
// condition, persists both through the status subresource, and asks for a requeue.
// message is only logged; it is not written to the status. On a failed status
// update the error is returned with an empty Result.
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseAndRequeue(ctx context.Context, dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest, phase nvidiacomv1beta1.DGDRPhase, message string) (ctrl.Result, error) {
	log.FromContext(ctx).Info("Updating DGDR phase", "name", dgdr.Name, "phase", phase, "message", message)

	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)

	var result ctrl.Result
	if err := r.Status().Update(ctx, dgdr); err != nil {
		return result, err
	}
	result.Requeue = true
	return result, nil
}

1744
1745
// updatePhaseWithCondition updates phase and adds/updates a condition
func (r *DynamoGraphDeploymentRequestReconciler) updatePhaseWithCondition(
1746
	ctx context.Context,
1747
1748
	dgdr *nvidiacomv1beta1.DynamoGraphDeploymentRequest,
	phase nvidiacomv1beta1.DGDRPhase,
1749
1750
1751
1752
1753
	conditionType string,
	status metav1.ConditionStatus,
	reason string,
	message string,
) (ctrl.Result, error) {
1754
1755
	dgdr.Status.Phase = phase
	setSucceededCondition(dgdr, phase)
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777

	condition := metav1.Condition{
		Type:               conditionType,
		Status:             status,
		ObservedGeneration: dgdr.Generation,
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}

	dgdr.AddStatusCondition(condition)

	if err := r.Status().Update(ctx, dgdr); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{Requeue: true}, nil
}

// SetupWithManager sets up the controller with the Manager
func (r *DynamoGraphDeploymentRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
1778
		For(&nvidiacomv1beta1.DynamoGraphDeploymentRequest{}).
1779
		Named(consts.ResourceTypeDynamoGraphDeploymentRequest).
1780
1781
1782
1783
1784
1785
1786
1787
		Owns(&batchv1.Job{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the job
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})). // Watch Jobs created by this controller (via ownerReference)
		Watches(
1788
			&dgdv1alpha1.DynamoGraphDeployment{},
1789
1790
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
				// Find DGDR by label instead of owner reference
1791
1792
1793
				dgd := obj.(*dgdv1alpha1.DynamoGraphDeployment)
				dgdrName, hasName := dgd.Labels[nvidiacomv1beta1.LabelDGDRName]
				dgdrNamespace, hasNamespace := dgd.Labels[nvidiacomv1beta1.LabelDGDRNamespace]
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
				if !hasName || !hasNamespace {
					return nil
				}
				return []ctrl.Request{{
					NamespacedName: types.NamespacedName{
						Name:      dgdrName,
						Namespace: dgdrNamespace,
					},
				}}
			}),
			builder.WithPredicates(predicate.Funcs{
				// ignore creation cause we don't want to be called again after we create the DGD
				CreateFunc:  func(ce event.CreateEvent) bool { return false },
				DeleteFunc:  func(de event.DeleteEvent) bool { return true },
				UpdateFunc:  func(ue event.UpdateEvent) bool { return true },
				GenericFunc: func(ge event.GenericEvent) bool { return true },
			}),
1811
1812
1813
1814
		).
		// Watch DGDs created by this controller (via label)
		// Set the event filter to ignore resources handled by other controllers in namespace-restricted mode
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config, r.RuntimeConfig)).
1815
		Complete(observability.NewObservedReconciler(r, consts.ResourceTypeDynamoGraphDeploymentRequest))
1816
}