dynamographdeployment_controller.go 36.1 KB
Newer Older
Neelay Shah's avatar
Neelay Shah committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"context"
22
	"fmt"
23
	"sort"
24
	"strings"
Neelay Shah's avatar
Neelay Shah committed
25

26
	grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
27
28
	"k8s.io/apimachinery/pkg/api/errors"

29
30
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/discovery"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/secret"
31

32
33
34
	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
Neelay Shah's avatar
Neelay Shah committed
35
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
	"k8s.io/apimachinery/pkg/runtime/schema"
37
	"k8s.io/apimachinery/pkg/types"
38
	"k8s.io/client-go/scale"
Neelay Shah's avatar
Neelay Shah committed
39
40
41
42
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
43
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Neelay Shah's avatar
Neelay Shah committed
44
	"sigs.k8s.io/controller-runtime/pkg/event"
45
	"sigs.k8s.io/controller-runtime/pkg/handler"
Neelay Shah's avatar
Neelay Shah committed
46
47
48
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

49
50
51
52
53
54
	"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
	commoncontroller "github.com/ai-dynamo/dynamo/deploy/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/operator/internal/dynamo"
	webhookvalidation "github.com/ai-dynamo/dynamo/deploy/operator/internal/webhook/validation"
55
	rbacv1 "k8s.io/api/rbac/v1"
Neelay Shah's avatar
Neelay Shah committed
56
57
)

58
59
60
61
// Status bookkeeping types written onto a DynamoGraphDeployment by Reconcile.
type (
	// State is the coarse lifecycle state stored via SetState on the DGD status.
	State string
	// Reason is a short identifier used as the "Ready" condition reason.
	Reason string
	// Message is human-readable text used as the "Ready" condition message.
	Message string
)

// Possible State values. Note that the ready state serializes as "successful".
const (
	FailedState  State = "failed"     // reconciliation errored or the spec failed validation
	ReadyState   State = "successful" // every tracked child resource reported ready
	PendingState State = "pending"    // initial value; also used while resources are not yet ready
)

68
69
70
71
// etcdStorage is the narrow etcd dependency of the reconciler. It is not
// referenced in the visible part of this file.
// NOTE(review): judging by the name, DeleteKeys removes every key under
// prefix — confirm against the implementation.
type etcdStorage interface {
	DeleteKeys(ctx context.Context, prefix string) error
}

// rbacManager interface for managing RBAC resources.
//
// reconcileResources uses it in cluster-wide mode to provision the planner
// ServiceAccount in the DGD namespace against the configured planner
// ClusterRole. Presumably the implementation creates the ServiceAccount and
// binds it to clusterRoleName; confirm against the implementation.
type rbacManager interface {
	EnsureServiceAccountWithRBAC(ctx context.Context, targetNamespace, serviceAccountName, clusterRoleName string) error
}

77
78
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct {
Neelay Shah's avatar
Neelay Shah committed
79
	client.Client
80
	Config                commoncontroller.Config
81
82
	Recorder              record.EventRecorder
	DockerSecretRetriever dockerSecretRetriever
83
	ScaleClient           scale.ScalesGetter
84
	MPISecretReplicator   *secret.SecretReplicator
85
	RBACManager           rbacManager
Neelay Shah's avatar
Neelay Shah committed
86
87
}

88
89
90
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
91
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeploymentscalingadapters,verbs=get;list;watch;create;update;patch;delete
92
// +kubebuilder:rbac:groups=grove.io,resources=podcliquesets,verbs=get;list;watch;create;update;patch;delete
93
94
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
95
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list
Neelay Shah's avatar
Neelay Shah committed
96
97
98
99

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
100
// the DynamoGraphDeployment object against the actual cluster state, and then
Neelay Shah's avatar
Neelay Shah committed
101
102
103
104
105
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
106
func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
Neelay Shah's avatar
Neelay Shah committed
107
108
	logger := log.FromContext(ctx)

109
110
111
	reason := Reason("undefined")
	message := Message("")
	state := PendingState
Neelay Shah's avatar
Neelay Shah committed
112
	// retrieve the CRD
113
	dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{}
Neelay Shah's avatar
Neelay Shah committed
114
115
116
117
118
	if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	defer func() {
119
120
121
122
123
124
		// Skip status update if DGD is being deleted
		if !dynamoDeployment.GetDeletionTimestamp().IsZero() {
			logger.Info("Reconciliation done - skipping status update for deleted resource")
			return
		}

Neelay Shah's avatar
Neelay Shah committed
125
		if err != nil {
126
127
			state = FailedState
			message = Message(err.Error())
128
			logger.Error(err, "Reconciliation failed")
Neelay Shah's avatar
Neelay Shah committed
129
		}
130
		dynamoDeployment.SetState(string(state))
131
132

		readyStatus := metav1.ConditionFalse
133
134
135
		if state == ReadyState {
			readyStatus = metav1.ConditionTrue
		}
136
137

		// Update Ready condition
138
139
140
		dynamoDeployment.AddStatusCondition(metav1.Condition{
			Type:               "Ready",
			Status:             readyStatus,
141
142
			Reason:             string(reason),
			Message:            string(message),
143
144
			LastTransitionTime: metav1.Now(),
		})
145

146
147
148
149
150
151
152
		updateErr := r.Status().Update(ctx, dynamoDeployment)
		if updateErr != nil {
			logger.Error(updateErr, "Unable to update the CRD status", "crd", req.NamespacedName, "state", state, "reason", reason, "message", message)
			// Set err to trigger requeue
			if err == nil {
				err = updateErr
			}
Neelay Shah's avatar
Neelay Shah committed
153
154
155
156
		}
		logger.Info("Reconciliation done")
	}()

157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
	// Validate the DynamoGraphDeployment spec (defense in depth - only when webhooks are disabled)
	if !r.Config.WebhooksEnabled {
		validator := webhookvalidation.NewDynamoGraphDeploymentValidator(dynamoDeployment)
		if _, validationErr := validator.Validate(); validationErr != nil {
			logger.Error(validationErr, "DynamoGraphDeployment validation failed, refusing to reconcile")

			// Set validation error state and reason (defer will update status)
			state = FailedState
			reason = Reason("ValidationFailed")
			message = Message(fmt.Sprintf("Validation failed: %v", validationErr))

			// Record event for visibility
			r.Recorder.Event(dynamoDeployment, corev1.EventTypeWarning, "ValidationFailed", validationErr.Error())

			// Don't requeue - user must fix the spec
			logger.Info("DynamoGraphDeployment is invalid, not reconciling until spec is fixed")

			// Return without error so defer updates status but doesn't requeue
			return ctrl.Result{}, nil
		}
	}

179
	deleted, err := commoncontroller.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
180
	if err != nil {
181
		logger.Error(err, "failed to handle the finalizer")
182
183
184
185
186
187
		reason = "failed_to_handle_the_finalizer"
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}
188
189
190
191
192
193
	reconcileResult, err := r.reconcileResources(ctx, dynamoDeployment)
	state = reconcileResult.State
	reason = reconcileResult.Reason
	message = reconcileResult.Message
	dynamoDeployment.Status.Services = reconcileResult.ServiceStatus

Neelay Shah's avatar
Neelay Shah committed
194
	if err != nil {
195
196
		logger.Error(err, "failed to reconcile the resources")
		reason = "failed_to_reconcile_the_resources"
Neelay Shah's avatar
Neelay Shah committed
197
198
		return ctrl.Result{}, err
	}
199
200
	return ctrl.Result{}, nil
}
Neelay Shah's avatar
Neelay Shah committed
201

202
type Resource interface {
203
	IsReady() (ready bool, reason string)
204
	GetName() string
205
206
207
208
209
210
211
212
	GetServiceStatuses() map[string]v1alpha1.ServiceReplicaStatus
}

type ReconcileResult struct {
	State         State
	Reason        Reason
	Message       Message
	ServiceStatus map[string]nvidiacomv1alpha1.ServiceReplicaStatus
213
}
Neelay Shah's avatar
Neelay Shah committed
214

215
func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) {
216
	logger := log.FromContext(ctx)
217

218
219
220
	// Ensure planner RBAC exists in cluster-wide mode
	if r.Config.RestrictedNamespace == "" {
		if r.RBACManager == nil {
221
			return ReconcileResult{}, fmt.Errorf("RBAC manager not initialized in cluster-wide mode")
222
223
		}
		if r.Config.RBAC.PlannerClusterRoleName == "" {
224
			return ReconcileResult{}, fmt.Errorf("planner ClusterRole name is required in cluster-wide mode")
225
226
227
228
229
230
231
232
		}
		if err := r.RBACManager.EnsureServiceAccountWithRBAC(
			ctx,
			dynamoDeployment.Namespace,
			consts.PlannerServiceAccountName,
			r.Config.RBAC.PlannerClusterRoleName,
		); err != nil {
			logger.Error(err, "Failed to ensure planner RBAC")
233
			return ReconcileResult{}, fmt.Errorf("failed to ensure planner RBAC: %w", err)
234
235
236
		}
	}

237
238
239
240
	// Reconcile top-level PVCs first
	err := r.reconcilePVCs(ctx, dynamoDeployment)
	if err != nil {
		logger.Error(err, "Failed to reconcile top-level PVCs")
241
		return ReconcileResult{}, fmt.Errorf("failed to reconcile top-level PVCs: %w", err)
242
243
	}

244
245
246
247
	// Reconcile DynamoGraphDeploymentScalingAdapters for each service
	err = r.reconcileScalingAdapters(ctx, dynamoDeployment)
	if err != nil {
		logger.Error(err, "Failed to reconcile scaling adapters")
248
		return ReconcileResult{}, fmt.Errorf("failed to reconcile scaling adapters: %w", err)
249
250
	}

251
252
253
254
	// Reconcile the SA, Role and RoleBinding if k8s discovery is enabled
	err = r.reconcileK8sDiscoveryResources(ctx, dynamoDeployment)
	if err != nil {
		logger.Error(err, "Failed to reconcile K8s discovery resources")
255
		return ReconcileResult{}, fmt.Errorf("failed to reconcile K8s discovery resources: %w", err)
256
257
	}

258
259
260
261
262
263
264
265
266
267
268
	// Orchestrator selection via single boolean annotation: nvidia.com/enable-grove
	// Unset or not "false": Grove if available; else component mode
	// "false": component mode (multinode -> LWS; single-node -> standard)
	enableGrove := true
	if dynamoDeployment.Annotations != nil && strings.ToLower(dynamoDeployment.Annotations[consts.KubeAnnotationEnableGrove]) == consts.KubeLabelValueFalse {
		enableGrove = false
	}

	// Determine if any service is multinode
	hasMultinode := dynamoDeployment.HasAnyMultinodeService()

269
270
271
272
273
	// Always ensure MPI SSH secret is available in this namespace
	if r.MPISecretReplicator != nil {
		err := r.MPISecretReplicator.Replicate(ctx, dynamoDeployment.Namespace)
		if err != nil {
			logger.Error(err, "Failed to replicate MPI secret", "namespace", dynamoDeployment.Namespace)
274
			return ReconcileResult{}, fmt.Errorf("failed to replicate MPI secret: %w", err)
275
276
277
		}
	}

278
279
	if enableGrove && r.Config.Grove.Enabled {
		logger.Info("Reconciling Grove resources", "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled, "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled)
280
		return r.reconcileGroveResources(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
281
	}
282
283
284
	if hasMultinode && !r.Config.LWS.Enabled {
		err := fmt.Errorf("no multinode orchestrator available")
		logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled)
285
		return ReconcileResult{}, fmt.Errorf("failed to reconcile Dynamo components deployments: %w", err)
286
287
	}
	logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled)
288
	return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
289
290
291

}

292
293
294
295
296
297
298
// scaleGroveResource sets the replica count of a Grove PodClique or
// PodCliqueScalingGroup through the generic scale helper.
//
// resourceType must be "PodClique" or "PodCliqueScalingGroup"; anything else is
// an error. A NotFound result is not an error: Grove creates these objects
// asynchronously after the PodCliqueSet is synced, so scaling is skipped
// (logged at V(1)) and retried on a later reconciliation.
func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context, resourceName, namespace string, newReplicas int32, resourceType string) error {
	gvr, known := map[string]schema.GroupVersionResource{
		"PodClique":             consts.PodCliqueGVR,
		"PodCliqueScalingGroup": consts.PodCliqueScalingGroupGVR,
	}[resourceType]
	if !known {
		return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
	}

	scaleErr := commoncontroller.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas)
	// IsNotFound(nil) is false, so a successful call falls through to return nil.
	if errors.IsNotFound(scaleErr) {
		log.FromContext(ctx).V(1).Info("Grove resource not found yet, skipping scaling for now - will retry on next reconciliation", "gvr", gvr, "name", resourceName, "namespace", namespace)
		return nil
	}
	return scaleErr
}

318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
// reconcileGrovePodCliqueSet generates the Grove PodCliqueSet for the DGD and
// syncs it to the cluster. It returns the synced object wrapped as a Resource.
// The wrapper's readiness and per-service replica statuses come from
// dynamo.GetComponentReadinessAndServiceReplicaStatuses. Per the original
// intent, that means all underlying PodCliques and PodCliqueScalingGroups have
// replicas == availableReplicas.
func (r *DynamoGraphDeploymentReconciler) reconcileGrovePodCliqueSet(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (*commoncontroller.Resource, error) {
	logger := log.FromContext(ctx)

	desired, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever)
	if err != nil {
		logger.Error(err, "failed to generate the Grove GangSet")
		return nil, fmt.Errorf("failed to generate the Grove GangSet: %w", err)
	}

	_, synced, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(context.Context) (*grovev1alpha1.PodCliqueSet, bool, error) {
		return desired, false, nil
	})
	if err != nil {
		logger.Error(err, "failed to sync the Grove GangSet")
		return nil, fmt.Errorf("failed to sync the Grove GangSet: %w", err)
	}

	// A ready PodCliqueSet reports an empty reason; otherwise the helper's reason is kept.
	readiness := func() (bool, string, map[string]v1alpha1.ServiceReplicaStatus) {
		ready, notReadyReason, statuses := dynamo.GetComponentReadinessAndServiceReplicaStatuses(ctx, r.Client, dynamoDeployment)
		if ready {
			return true, "", statuses
		}
		return false, notReadyReason, statuses
	}

	asResource, err := commoncontroller.NewResourceWithServiceStatuses(synced, readiness)
	if err != nil {
		logger.Error(err, "failed to create the Grove PodClique Set resource")
		return nil, fmt.Errorf("failed to create the Grove PodClique Set resource: %w", err)
	}
	return asResource, nil
}

352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
// reconcileGroveScaling applies each service's requested replica count to the
// Grove object backing it. Multinode services (more than one node) use a
// PodCliqueScalingGroup; single-node services use a PodClique. Services with
// no explicit replica count are skipped. The first scaling failure aborts and
// is returned wrapped.
func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	logger := log.FromContext(ctx)
	logger.V(1).Info("Reconciling Grove scaling operations")

	// Only objects of PodCliqueSet replica 0 are addressed.
	const replicaIndex = 0

	for serviceName, component := range dynamoDeployment.Spec.Services {
		if component.Replicas == nil {
			continue
		}
		replicas := *component.Replicas

		kind := "PodClique"
		if component.GetNumberOfNodes() > 1 {
			kind = "PodCliqueScalingGroup"
		}

		// Grove naming pattern: {DGD name}-{replica index}-{lower-cased service name}.
		resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
		if err := r.scaleGroveResource(ctx, resourceName, dynamoDeployment.Namespace, replicas, kind); err != nil {
			logger.Error(err, "Failed to scale "+kind, "serviceName", serviceName, "resourceName", resourceName, "replicas", replicas)
			return fmt.Errorf("failed to scale %s %s: %w", kind, resourceName, err)
		}
	}

	logger.V(1).Info("Successfully reconciled Grove scaling operations")
	return nil
}

400
// reconcileGroveResources reconciles a DGD using Grove as the orchestrator.
//
// In order, it:
//  1. syncs the Grove PodCliqueSet generated from the DGD;
//  2. applies each service's replica count to the backing Grove objects;
//  3. reconciles the headless Services used for model endpoint discovery;
//  4. per service, syncs the component Service and, for frontend services,
//     the Ingress plus an Istio VirtualService when the operator's ingress
//     config uses VirtualServices. Every service gets a component Service when
//     Kubernetes discovery is enabled; otherwise only the frontend does.
//
// Services are visited in sorted name order, so the first failure and the
// emitted logs are stable across reconciliations (Go map iteration order is
// randomized). The result aggregates the readiness of the PodCliqueSet and of
// every synced object.
func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) {
	logger := log.FromContext(ctx)

	grovePodCliqueSetAsResource, err := r.reconcileGrovePodCliqueSet(ctx, dynamoDeployment)
	if err != nil {
		logger.Error(err, "failed to reconcile the Grove PodClique Set")
		return ReconcileResult{}, fmt.Errorf("failed to reconcile the Grove PodClique Set: %w", err)
	}

	// Handle Grove scaling operations after structural changes.
	if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
		logger.Error(err, "failed to reconcile Grove scaling")
		return ReconcileResult{}, fmt.Errorf("failed to reconcile Grove scaling: %w", err)
	}

	// Reconcile headless services for model endpoint discovery.
	if err := dynamo.ReconcileModelServicesForComponents(
		ctx,
		r,
		dynamoDeployment,
		dynamoDeployment.Spec.Services,
		dynamoDeployment.Namespace,
	); err != nil {
		logger.Error(err, "failed to reconcile model services")
		return ReconcileResult{}, fmt.Errorf("failed to reconcile model services: %w", err)
	}

	// Services, Ingresses and VirtualServices count as ready once synced.
	alwaysReady := func() (bool, string) { return true, "" }

	// Loop-invariant: depends only on operator config and the DGD annotations.
	isK8sDiscoveryEnabled := r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations)

	componentNames := make([]string, 0, len(dynamoDeployment.Spec.Services))
	for componentName := range dynamoDeployment.Spec.Services {
		componentNames = append(componentNames, componentName)
	}
	sort.Strings(componentNames)

	resources := []Resource{grovePodCliqueSetAsResource}
	for _, componentName := range componentNames {
		component := dynamoDeployment.Spec.Services[componentName]
		isFrontend := component.ComponentType == consts.ComponentTypeFrontend

		// With k8s discovery every component gets a Service; otherwise only the frontend.
		if isK8sDiscoveryEnabled || isFrontend {
			mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamoDeployment, component, componentName, isK8sDiscoveryEnabled)
			if err != nil {
				logger.Error(err, "failed to generate the main component service")
				return ReconcileResult{}, fmt.Errorf("failed to generate the main component service: %w", err)
			}
			_, syncedMainComponentService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
				return mainComponentService, false, nil
			})
			if err != nil {
				logger.Error(err, "failed to sync the main component service")
				return ReconcileResult{}, fmt.Errorf("failed to sync the main component service: %w", err)
			}
			if syncedMainComponentService != nil {
				mainComponentServiceAsResource, err := commoncontroller.NewResource(syncedMainComponentService, alwaysReady)
				if err != nil {
					// Wrapping failed, not syncing; report it like the ingress/virtual service paths.
					return ReconcileResult{}, fmt.Errorf("failed to create the main component service resource: %w", err)
				}
				resources = append(resources, mainComponentServiceAsResource)
			}
		}

		if !isFrontend {
			continue
		}

		dynamoComponentName := dynamo.GetDynamoComponentName(dynamoDeployment, componentName)

		// The component's own ingress spec, if set, replaces the operator default.
		ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
		if component.Ingress != nil {
			ingressSpec = *component.Ingress
		}
		mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamoComponentName, dynamoDeployment.Namespace, ingressSpec)
		_, syncedMainComponentIngress, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
			// Returning toDelete=true removes an Ingress that is disabled or has no controller class.
			if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
				logger.Info("Ingress is not enabled")
				return mainComponentIngress, true, nil
			}
			return mainComponentIngress, false, nil
		})
		if err != nil {
			logger.Error(err, "failed to sync the main component ingress")
			return ReconcileResult{}, fmt.Errorf("failed to sync the main component ingress: %w", err)
		}
		if syncedMainComponentIngress != nil {
			mainComponentIngressAsResource, err := commoncontroller.NewResource(syncedMainComponentIngress, alwaysReady)
			if err != nil {
				return ReconcileResult{}, fmt.Errorf("failed to create the main component ingress resource: %w", err)
			}
			resources = append(resources, mainComponentIngressAsResource)
		}

		if !r.Config.IngressConfig.UseVirtualService() {
			continue
		}
		mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamoComponentName, dynamoDeployment.Namespace, ingressSpec)
		_, syncedMainComponentVirtualService, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
			if !ingressSpec.IsVirtualServiceEnabled() {
				logger.Info("VirtualService is not enabled")
				return mainComponentVirtualService, true, nil
			}
			return mainComponentVirtualService, false, nil
		})
		if err != nil {
			logger.Error(err, "failed to sync the main component virtual service")
			return ReconcileResult{}, fmt.Errorf("failed to sync the main component virtual service: %w", err)
		}
		if syncedMainComponentVirtualService != nil {
			mainComponentVirtualServiceAsResource, err := commoncontroller.NewResource(syncedMainComponentVirtualService, alwaysReady)
			if err != nil {
				return ReconcileResult{}, fmt.Errorf("failed to create the main component virtual service resource: %w", err)
			}
			resources = append(resources, mainComponentVirtualServiceAsResource)
		}
	}
	return r.checkResourcesReadiness(resources), nil
}

516
517
518
519
520
521
func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) ReconcileResult {
	// Sort resources by name to ensure deterministic ordering
	sort.Slice(resources, func(i, j int) bool {
		return resources[i].GetName() < resources[j].GetName()
	})

522
	var notReadyReasons []string
523
	notReadyResources := []string{}
524
	serviceStatuses := make(map[string]v1alpha1.ServiceReplicaStatus)
525
	for _, resource := range resources {
526
		ready, reason := resource.IsReady()
527
528
529
530
531
532

		resourceServiceStatuses := resource.GetServiceStatuses()
		for serviceName, serviceStatus := range resourceServiceStatuses {
			serviceStatuses[serviceName] = serviceStatus
		}

533
		if !ready {
534
			notReadyResources = append(notReadyResources, resource.GetName())
535
			notReadyReasons = append(notReadyReasons, fmt.Sprintf("%s: %s", resource.GetName(), reason))
536
		}
537
	}
538

539
	if len(notReadyResources) == 0 {
540
541
542
543
544
545
546
547
548
549
550
551
		return ReconcileResult{
			State:         ReadyState,
			Reason:        "all_resources_are_ready",
			Message:       Message("All resources are ready"),
			ServiceStatus: serviceStatuses,
		}
	}
	return ReconcileResult{
		State:         PendingState,
		Reason:        "some_resources_are_not_ready",
		Message:       Message(fmt.Sprintf("Resources not ready: %s", strings.Join(notReadyReasons, "; "))),
		ServiceStatus: serviceStatuses,
552
553
554
	}
}

555
// reconcileDynamoComponentsDeployments reconciles a DGD without Grove. It
// generates one DynamoComponentDeployment per service, using the
// operator-level default ingress spec, and syncs each one. It returns the
// aggregated readiness of the synced objects. The first sync failure aborts
// the loop.
func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (ReconcileResult, error) {
	logger := log.FromContext(ctx)

	defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
	desiredByService, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec)
	if err != nil {
		logger.Error(err, "failed to generate the DynamoComponentsDeployments")
		return ReconcileResult{}, fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err)
	}

	resources := make([]Resource, 0, len(desiredByService))
	for serviceName, desired := range desiredByService {
		logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", desired)
		_, synced, syncErr := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) {
			return desired, false, nil
		})
		if syncErr != nil {
			logger.Error(syncErr, "failed to sync the DynamoComponentDeployment")
			return ReconcileResult{}, fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", syncErr)
		}
		resources = append(resources, synced)
	}

	return r.checkResourcesReadiness(resources), nil
}

583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
// reconcilePVC makes sure the top-level PVC pvcName exists in the DGD namespace.
//
// If the PVC exists, it is returned unchanged; it is never updated. If it is
// missing and pvcConfig.Create is true, it is built with constructPVC, owned
// by the DGD through a controller reference, and created. If it is missing
// and Create is unset or false, the NotFound error from the lookup is
// returned. Any other lookup error is returned as-is.
func (r *DynamoGraphDeploymentReconciler) reconcilePVC(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, pvcName string, pvcConfig nvidiacomv1alpha1.PVC) (*corev1.PersistentVolumeClaim, error) {
	logger := log.FromContext(ctx)

	existing := &corev1.PersistentVolumeClaim{}
	key := types.NamespacedName{Name: pvcName, Namespace: dynamoDeployment.Namespace}
	getErr := r.Get(ctx, key, existing)
	switch {
	case getErr == nil:
		return existing, nil
	case !errors.IsNotFound(getErr):
		logger.Error(getErr, "Unable to retrieve top-level PVC", "pvcName", pvcName)
		return nil, getErr
	case pvcConfig.Create == nil || !*pvcConfig.Create:
		logger.Error(getErr, "Top-level PVC does not exist and create is not enabled", "pvcName", pvcName)
		return nil, getErr
	}

	created := constructPVC(dynamoDeployment, pvcConfig)
	if err := controllerutil.SetControllerReference(dynamoDeployment, created, r.Client.Scheme()); err != nil {
		logger.Error(err, "Failed to set controller reference for top-level PVC", "pvcName", pvcName)
		return nil, err
	}
	if err := r.Create(ctx, created); err != nil {
		logger.Error(err, "Failed to create top-level PVC", "pvcName", pvcName)
		return nil, err
	}
	logger.Info("Top-level PVC created", "pvcName", pvcName, "namespace", dynamoDeployment.Namespace)
	return created, nil
}

619
620
621
622
623
624
625
626
627
628
629
// reconcileK8sDiscoveryResources syncs the ServiceAccount, Role and RoleBinding
// used for Kubernetes-based discovery, in that order. It does so only when
// discovery is enabled for this DGD, as decided by the operator config and
// the DGD annotations; otherwise it does nothing. The first sync failure is
// returned wrapped.
func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	logger := log.FromContext(ctx)

	if !r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations) {
		logger.Info("K8s discovery is not enabled")
		return nil
	}
	logger.Info("K8s discovery is enabled")

	dgdName, dgdNamespace := dynamoDeployment.Name, dynamoDeployment.Namespace

	serviceAccount := discovery.GetK8sDiscoveryServiceAccount(dgdName, dgdNamespace)
	_, _, saErr := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(context.Context) (*corev1.ServiceAccount, bool, error) {
		return serviceAccount, false, nil
	})
	if saErr != nil {
		logger.Error(saErr, "failed to sync the k8s discovery service account")
		return fmt.Errorf("failed to sync the k8s discovery service account: %w", saErr)
	}

	role := discovery.GetK8sDiscoveryRole(dgdName, dgdNamespace)
	_, _, roleErr := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(context.Context) (*rbacv1.Role, bool, error) {
		return role, false, nil
	})
	if roleErr != nil {
		logger.Error(roleErr, "failed to sync the k8s discovery role")
		return fmt.Errorf("failed to sync the k8s discovery role: %w", roleErr)
	}

	roleBinding := discovery.GetK8sDiscoveryRoleBinding(dgdName, dgdNamespace)
	_, _, bindingErr := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(context.Context) (*rbacv1.RoleBinding, bool, error) {
		return roleBinding, false, nil
	})
	if bindingErr != nil {
		logger.Error(bindingErr, "failed to sync the k8s discovery role binding")
		return fmt.Errorf("failed to sync the k8s discovery role binding: %w", bindingErr)
	}

	return nil
}

660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
// reconcilePVCs reconciles every top-level PVC listed in dynamoDeployment.Spec.PVCs,
// stopping at the first failure. Entries without a name cannot be reconciled;
// they are logged and skipped.
func (r *DynamoGraphDeploymentReconciler) reconcilePVCs(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	logger := log.FromContext(ctx)

	// Ranging over a nil collection is a no-op, so no separate nil check is needed.
	for _, pvcConfig := range dynamoDeployment.Spec.PVCs {
		if pvcConfig.Name == nil || *pvcConfig.Name == "" {
			logger.Error(nil, "PVC not reconcilable: name is required", "pvcConfig", pvcConfig)
			continue
		}

		pvcName := *pvcConfig.Name
		logger.Info("Reconciling top-level PVC", "pvcName", pvcName, "namespace", dynamoDeployment.Namespace)
		if _, err := r.reconcilePVC(ctx, dynamoDeployment, pvcName, pvcConfig); err != nil {
			return err
		}
	}
	return nil
}

686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
// reconcileScalingAdapters ensures a DynamoGraphDeploymentScalingAdapter exists for each service in the DGD
// that has scaling adapter enabled (default). Services with scalingAdapter.disable=true will not have a DGDSA.
// This enables pluggable autoscaling via HPA, KEDA, or Planner.
//
// The work happens in two passes:
//  1. For every entry in spec.services, SyncResource creates or updates the adapter. It deletes
//     the adapter instead when the service sets scalingAdapter.disable=true (toDelete flag).
//  2. Adapters labelled with this DGD's name whose service no longer appears in spec.services
//     are deleted as orphans, and an "AdapterDeleted" event is recorded for each one actually
//     removed.
//
// The first error stops processing. It is logged and returned wrapped with context.
func (r *DynamoGraphDeploymentReconciler) reconcileScalingAdapters(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	logger := log.FromContext(ctx)

	// Pass 1: create/update/delete one adapter per declared service.
	for serviceName, component := range dynamoDeployment.Spec.Services {
		scalingAdapterDisabled := component.ScalingAdapter != nil && component.ScalingAdapter.Disable

		// Seed the adapter with the service's replica count (default 1 when unset).
		currentReplicas := int32(1)
		if component.Replicas != nil {
			currentReplicas = *component.Replicas
		}

		_, _, err := commoncontroller.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter, bool, error) {
			adapterName := generateAdapterName(dynamoDeployment.Name, serviceName)
			adapter := &nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter{
				ObjectMeta: metav1.ObjectMeta{
					Name:      adapterName,
					Namespace: dynamoDeployment.Namespace,
					Labels: map[string]string{
						consts.KubeLabelDynamoGraphDeploymentName: dynamoDeployment.Name,
						consts.KubeLabelDynamoComponent:           serviceName,
					},
				},
				Spec: nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapterSpec{
					Replicas: currentReplicas,
					DGDRef: nvidiacomv1alpha1.DynamoGraphDeploymentServiceRef{
						Name:        dynamoDeployment.Name,
						ServiceName: serviceName,
					},
				},
			}
			// toDelete=true makes SyncResource remove an existing adapter for a disabled service.
			return adapter, scalingAdapterDisabled, nil
		})
		if err != nil {
			logger.Error(err, "Failed to sync DynamoGraphDeploymentScalingAdapter", "service", serviceName)
			return fmt.Errorf("syncing scaling adapter for service %q: %w", serviceName, err)
		}
	}

	// Pass 2: clean up adapters for services that were removed from the DGD entirely.
	adapterList := &nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapterList{}
	if err := r.List(ctx, adapterList,
		client.InNamespace(dynamoDeployment.Namespace),
		client.MatchingLabels{consts.KubeLabelDynamoGraphDeploymentName: dynamoDeployment.Name},
	); err != nil {
		logger.Error(err, "Failed to list DynamoGraphDeploymentScalingAdapters")
		return fmt.Errorf("listing scaling adapters: %w", err)
	}

	for i := range adapterList.Items {
		adapter := &adapterList.Items[i]
		serviceName := adapter.Spec.DGDRef.ServiceName

		if _, exists := dynamoDeployment.Spec.Services[serviceName]; exists {
			continue
		}

		logger.Info("Deleting orphaned DynamoGraphDeploymentScalingAdapter", "adapter", adapter.Name, "service", serviceName)
		if err := r.Delete(ctx, adapter); err != nil {
			if errors.IsNotFound(err) {
				// Already gone (removed by another actor); nothing was deleted here,
				// so don't record a misleading "AdapterDeleted" event.
				continue
			}
			logger.Error(err, "Failed to delete orphaned adapter", "adapter", adapter.Name)
			return fmt.Errorf("deleting orphaned scaling adapter %q: %w", adapter.Name, err)
		}
		r.Recorder.Eventf(dynamoDeployment, corev1.EventTypeNormal, "AdapterDeleted",
			"Deleted orphaned scaling adapter %s for removed service %s", adapter.Name, serviceName)
	}

	return nil
}

// generateAdapterName derives the DynamoGraphDeploymentScalingAdapter name for one service of a
// DGD as "<dgdName>-<serviceName in lower case>". Only the service part is lowercased, because
// service names may contain upper-case letters that Kubernetes DNS subdomain names forbid;
// dgdName is used verbatim.
func generateAdapterName(dgdName, serviceName string) string {
	lowerService := strings.ToLower(serviceName)
	return fmt.Sprintf("%s-%s", dgdName, lowerService)
}

769
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
770
771
772
773
	// for now doing nothing
	return nil
}

Neelay Shah's avatar
Neelay Shah committed
774
// SetupWithManager sets up the controller with the Manager.
775
func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
776
	ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
777
		For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates(
778
779
			predicate.GenerationChangedPredicate{},
		)).
780
781
		Named("dynamographdeployment").
		Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{
Neelay Shah's avatar
Neelay Shah committed
782
783
784
785
786
787
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
788
789
790
791
792
793
794
		Owns(&nvidiacomv1alpha1.DynamoGraphDeploymentScalingAdapter{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the adapter
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return false }, // Adapter updates are handled by adapter controller
			GenericFunc: func(ge event.GenericEvent) bool { return false },
		})).
795
796
797
798
799
800
801
		Owns(&corev1.PersistentVolumeClaim{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the PVC
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
802
		WithEventFilter(commoncontroller.EphemeralDeploymentEventFilter(r.Config))
803
	if r.Config.Grove.Enabled {
804
		ctrlBuilder = ctrlBuilder.Owns(&grovev1alpha1.PodCliqueSet{}, builder.WithPredicates(predicate.Funcs{
805
806
807
808
809
			// ignore creation cause we don't want to be called again after we create the pod gang set
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
		})).
			// Watch PodClique resources - only on status changes
			// Note: We don't need to watch PodCliqueScalingGroup because it's just a container
			// for PodCliques. The actual status changes happen at the PodClique level.
			Watches(
				&grovev1alpha1.PodClique{},
				handler.EnqueueRequestsFromMapFunc(r.mapPodCliqueToRequests),
				builder.WithPredicates(predicate.Funcs{
					CreateFunc: func(ce event.CreateEvent) bool { return false },
					DeleteFunc: func(de event.DeleteEvent) bool { return false },
					UpdateFunc: func(ue event.UpdateEvent) bool {
						// Only trigger on status changes (readyReplicas or replicas)
						oldPC, okOld := ue.ObjectOld.(*grovev1alpha1.PodClique)
						newPC, okNew := ue.ObjectNew.(*grovev1alpha1.PodClique)
						if !okOld || !okNew {
							return false
						}
						// Trigger if readyReplicas or replicas changed
						return oldPC.Status.ReadyReplicas != newPC.Status.ReadyReplicas ||
							oldPC.Spec.Replicas != newPC.Spec.Replicas
					},
					GenericFunc: func(ge event.GenericEvent) bool { return false },
				}),
			)
834
835
	}
	return ctrlBuilder.Complete(r)
Neelay Shah's avatar
Neelay Shah committed
836
}
837
838
839
840

func (r *DynamoGraphDeploymentReconciler) GetRecorder() record.EventRecorder {
	return r.Recorder
}
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865

// mapPodCliqueToRequests turns a PodClique event into a reconcile request for the
// DynamoGraphDeployment it belongs to.
//
// The owning DGD is identified only from the PodClique's consts.KubeLabelDynamoGraphDeploymentName
// label and is assumed to live in the PodClique's namespace, so no API lookups are needed.
// Non-PodClique objects yield no requests. PodCliques with a missing or empty label also yield
// no requests; that case is logged at V(1).
func (r *DynamoGraphDeploymentReconciler) mapPodCliqueToRequests(ctx context.Context, obj client.Object) []ctrl.Request {
	pc, isPodClique := obj.(*grovev1alpha1.PodClique)
	if !isPodClique {
		return nil
	}

	// A missing key and an empty value both read as "", so a single check covers both cases.
	owner := pc.GetLabels()[consts.KubeLabelDynamoGraphDeploymentName]
	if owner == "" {
		log.FromContext(ctx).V(1).Info("PodClique missing DGD label",
			"podClique", pc.Name,
			"namespace", pc.Namespace)
		return nil
	}

	req := ctrl.Request{
		NamespacedName: types.NamespacedName{Namespace: pc.Namespace, Name: owner},
	}
	return []ctrl.Request{req}
}