"lib/bindings/vscode:/vscode.git/clone" did not exist on "533b8cee1c0490dedb3b4f6b97553f8dd53c0c80"
dynamographdeployment_controller.go 22.9 KB
Newer Older
Neelay Shah's avatar
Neelay Shah committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"context"
22
	"fmt"
23
	"strings"
Neelay Shah's avatar
Neelay Shah committed
24

25
	grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
26
27
	"k8s.io/apimachinery/pkg/api/errors"

28
29
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/secret"

30
31
32
	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
Neelay Shah's avatar
Neelay Shah committed
33
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34
	"k8s.io/apimachinery/pkg/runtime/schema"
35
	"k8s.io/apimachinery/pkg/types"
36
	"k8s.io/client-go/scale"
Neelay Shah's avatar
Neelay Shah committed
37
38
39
40
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
41
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Neelay Shah's avatar
Neelay Shah committed
42
43
44
45
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

46
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
47
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
48
49
	commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
Neelay Shah's avatar
Neelay Shah committed
50
51
)

52
53
54
55
type State string
type Reason string
type Message string

Neelay Shah's avatar
Neelay Shah committed
56
const (
57
58
59
	FailedState  State = "failed"
	ReadyState   State = "successful"
	PendingState State = "pending"
Neelay Shah's avatar
Neelay Shah committed
60
61
)

62
63
64
65
type etcdStorage interface {
	DeleteKeys(ctx context.Context, prefix string) error
}

66
67
68
69
70
// rbacManager interface for managing RBAC resources
type rbacManager interface {
	EnsureServiceAccountWithRBAC(ctx context.Context, targetNamespace, serviceAccountName, clusterRoleName string) error
}

71
72
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct {
Neelay Shah's avatar
Neelay Shah committed
73
	client.Client
74
75
76
	Config                commonController.Config
	Recorder              record.EventRecorder
	DockerSecretRetriever dockerSecretRetriever
77
	ScaleClient           scale.ScalesGetter
78
	MPISecretReplicator   *secret.SecretReplicator
79
	RBACManager           rbacManager
Neelay Shah's avatar
Neelay Shah committed
80
81
}

82
83
84
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
85
// +kubebuilder:rbac:groups=grove.io,resources=podcliquesets,verbs=get;list;watch;create;update;patch;delete
86
87
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
88
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list
Neelay Shah's avatar
Neelay Shah committed
89
90
91
92

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
93
// the DynamoGraphDeployment object against the actual cluster state, and then
Neelay Shah's avatar
Neelay Shah committed
94
95
96
97
98
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
99
func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Neelay Shah's avatar
Neelay Shah committed
100
101
102
	logger := log.FromContext(ctx)

	var err error
103
104
105
	reason := Reason("undefined")
	message := Message("")
	state := PendingState
Neelay Shah's avatar
Neelay Shah committed
106
	// retrieve the CRD
107
	dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{}
Neelay Shah's avatar
Neelay Shah committed
108
109
110
111
112
113
	if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	defer func() {
		if err != nil {
114
115
			state = FailedState
			message = Message(err.Error())
116
			logger.Error(err, "Reconciliation failed")
Neelay Shah's avatar
Neelay Shah committed
117
		}
118
		dynamoDeployment.SetState(string(state))
119
120

		readyStatus := metav1.ConditionFalse
121
122
123
		if state == ReadyState {
			readyStatus = metav1.ConditionTrue
		}
124
125

		// Update Ready condition
126
127
128
		dynamoDeployment.AddStatusCondition(metav1.Condition{
			Type:               "Ready",
			Status:             readyStatus,
129
130
			Reason:             string(reason),
			Message:            string(message),
131
132
			LastTransitionTime: metav1.Now(),
		})
133

Neelay Shah's avatar
Neelay Shah committed
134
135
		err = r.Status().Update(ctx, dynamoDeployment)
		if err != nil {
136
			logger.Error(err, "Unable to update the CRD status", "crd", req.NamespacedName, "state", state, "reason", reason, "message", message)
Neelay Shah's avatar
Neelay Shah committed
137
138
139
140
		}
		logger.Info("Reconciliation done")
	}()

141
142
	deleted, err := commonController.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
	if err != nil {
143
		logger.Error(err, "failed to handle the finalizer")
144
145
146
147
148
149
		reason = "failed_to_handle_the_finalizer"
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}
150
	state, reason, message, err = r.reconcileResources(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
151
	if err != nil {
152
153
		logger.Error(err, "failed to reconcile the resources")
		reason = "failed_to_reconcile_the_resources"
Neelay Shah's avatar
Neelay Shah committed
154
155
		return ctrl.Result{}, err
	}
156
157
	return ctrl.Result{}, nil
}
Neelay Shah's avatar
Neelay Shah committed
158

159
type Resource interface {
160
	IsReady() (ready bool, reason string)
161
162
	GetName() string
}
Neelay Shah's avatar
Neelay Shah committed
163

164
165
func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	logger := log.FromContext(ctx)
166

167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
	// Ensure planner RBAC exists in cluster-wide mode
	if r.Config.RestrictedNamespace == "" {
		if r.RBACManager == nil {
			return "", "", "", fmt.Errorf("RBAC manager not initialized in cluster-wide mode")
		}
		if r.Config.RBAC.PlannerClusterRoleName == "" {
			return "", "", "", fmt.Errorf("planner ClusterRole name is required in cluster-wide mode")
		}
		if err := r.RBACManager.EnsureServiceAccountWithRBAC(
			ctx,
			dynamoDeployment.Namespace,
			consts.PlannerServiceAccountName,
			r.Config.RBAC.PlannerClusterRoleName,
		); err != nil {
			logger.Error(err, "Failed to ensure planner RBAC")
			return "", "", "", fmt.Errorf("failed to ensure planner RBAC: %w", err)
		}
	}

186
187
188
189
190
191
192
	// Reconcile top-level PVCs first
	err := r.reconcilePVCs(ctx, dynamoDeployment)
	if err != nil {
		logger.Error(err, "Failed to reconcile top-level PVCs")
		return "", "", "", fmt.Errorf("failed to reconcile top-level PVCs: %w", err)
	}

193
194
195
196
197
198
199
200
201
202
203
	// Orchestrator selection via single boolean annotation: nvidia.com/enable-grove
	// Unset or not "false": Grove if available; else component mode
	// "false": component mode (multinode -> LWS; single-node -> standard)
	enableGrove := true
	if dynamoDeployment.Annotations != nil && strings.ToLower(dynamoDeployment.Annotations[consts.KubeAnnotationEnableGrove]) == consts.KubeLabelValueFalse {
		enableGrove = false
	}

	// Determine if any service is multinode
	hasMultinode := dynamoDeployment.HasAnyMultinodeService()

204
205
206
207
208
209
210
211
212
	// Always ensure MPI SSH secret is available in this namespace
	if r.MPISecretReplicator != nil {
		err := r.MPISecretReplicator.Replicate(ctx, dynamoDeployment.Namespace)
		if err != nil {
			logger.Error(err, "Failed to replicate MPI secret", "namespace", dynamoDeployment.Namespace)
			return "", "", "", fmt.Errorf("failed to replicate MPI secret: %w", err)
		}
	}

213
214
	if enableGrove && r.Config.Grove.Enabled {
		logger.Info("Reconciling Grove resources", "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled, "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled)
215
		return r.reconcileGroveResources(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
216
	}
217
218
219
220
221
222
	if hasMultinode && !r.Config.LWS.Enabled {
		err := fmt.Errorf("no multinode orchestrator available")
		logger.Error(err, err.Error(), "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled)
		return "", "", "", err
	}
	logger.Info("Reconciling Dynamo components deployments", "hasMultinode", hasMultinode, "lwsEnabled", r.Config.LWS.Enabled, "enableGrove", enableGrove, "groveEnabled", r.Config.Grove.Enabled)
223
	return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
224
225
226

}

227
228
229
230
231
232
233
// scaleGroveResource scales a Grove resource using the generic scaling function
func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context, resourceName, namespace string, newReplicas int32, resourceType string) error {
	logger := log.FromContext(ctx)
	// Determine the GroupVersionResource based on resource type
	var gvr schema.GroupVersionResource
	switch resourceType {
	case "PodClique":
234
		gvr = consts.PodCliqueGVR
235
	case "PodCliqueScalingGroup":
236
		gvr = consts.PodCliqueScalingGroupGVR
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
	default:
		return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
	}

	// Use the generic scaling function
	err := commonController.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas)
	if err != nil {
		if errors.IsNotFound(err) {
			// Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously
			logger.V(1).Info("Grove resource not found yet, skipping scaling for now - will retry on next reconciliation", "gvr", gvr, "name", resourceName, "namespace", namespace)
			return nil
		}
	}
	return err
}

// reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes
func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	logger := log.FromContext(ctx)
	logger.V(1).Info("Reconciling Grove scaling operations")

	replicaIndex := 0
	for serviceName, component := range dynamoDeployment.Spec.Services {
		// Skip if replicas are not specified
		if component.Replicas == nil {
			continue
		}

		numberOfNodes := component.GetNumberOfNodes()
		isMultinode := numberOfNodes > 1

		if isMultinode {
			// Scale PodCliqueScalingGroup for multinode services
			// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
			resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
			err := r.scaleGroveResource(ctx,
				resourceName,
				dynamoDeployment.Namespace,
				*component.Replicas,
				"PodCliqueScalingGroup")
			if err != nil {
				logger.Error(err, "Failed to scale PodCliqueScalingGroup", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
				return fmt.Errorf("failed to scale PodCliqueScalingGroup %s: %w", resourceName, err)
			}
		} else {
			// Scale individual PodClique for single-node services
			// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
			resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
			err := r.scaleGroveResource(ctx,
				resourceName,
				dynamoDeployment.Namespace,
				*component.Replicas,
				"PodClique")
			if err != nil {
				logger.Error(err, "Failed to scale PodClique", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
				return fmt.Errorf("failed to scale PodClique %s: %w", resourceName, err)
			}
		}
	}

	logger.V(1).Info("Successfully reconciled Grove scaling operations")
	return nil
}

301
302
303
func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	logger := log.FromContext(ctx)
	// generate the dynamoComponentsDeployments from the config
304
	groveGangSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever)
305
306
307
	if err != nil {
		logger.Error(err, "failed to generate the Grove GangSet")
		return "", "", "", fmt.Errorf("failed to generate the Grove GangSet: %w", err)
308
	}
309
	_, syncedGroveGangSet, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*grovev1alpha1.PodCliqueSet, bool, error) {
310
311
312
313
314
		return groveGangSet, false, nil
	})
	if err != nil {
		logger.Error(err, "failed to sync the Grove GangSet")
		return "", "", "", fmt.Errorf("failed to sync the Grove GangSet: %w", err)
315
	}
316
317
318
319
320
321
322
323
324
325
326
	groveGangSetAsResource := commonController.WrapResource(
		syncedGroveGangSet,
		func() (bool, string) {
			// Grove readiness: all underlying PodCliques and PodCliqueScalingGroups have replicas == availableReplicas
			allComponentsReady, reason := dynamo.EvaluateAllComponentsReady(ctx, r.Client, dynamoDeployment)
			if !allComponentsReady {
				return false, reason
			}
			return true, ""
		},
	)
327
328
329
330

	// Handle Grove scaling operations after structural changes
	if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
		logger.Error(err, "failed to reconcile Grove scaling")
331
		return "", "", "", fmt.Errorf("failed to reconcile Grove scaling: %w", err)
332
333
	}

334
335
	resources := []Resource{groveGangSetAsResource}
	for componentName, component := range dynamoDeployment.Spec.Services {
336
		if component.ComponentType == consts.ComponentTypeFrontend {
337
338
339
340
341
342
343
344
345
346
347
348
349
			// generate the main component service
			mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace)
			if err != nil {
				logger.Error(err, "failed to generate the main component service")
				return "", "", "", fmt.Errorf("failed to generate the main component service: %w", err)
			}
			_, syncedMainComponentService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
				return mainComponentService, false, nil
			})
			if err != nil {
				logger.Error(err, "failed to sync the main component service")
				return "", "", "", fmt.Errorf("failed to sync the main component service: %w", err)
			}
350
351
352
353
			mainComponentServiceAsResource := commonController.WrapResource(syncedMainComponentService,
				func() (bool, string) {
					return true, ""
				})
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
			resources = append(resources, mainComponentServiceAsResource)
			// generate the main component ingress
			ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
			if component.Ingress != nil {
				ingressSpec = *component.Ingress
			}
			mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
			_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
				if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
					logger.Info("Ingress is not enabled")
					return mainComponentIngress, true, nil
				}
				return mainComponentIngress, false, nil
			})
			if err != nil {
				logger.Error(err, "failed to sync the main component ingress")
				return "", "", "", fmt.Errorf("failed to sync the main component ingress: %w", err)
			}
372
373
374
375
			resources = append(resources, commonController.WrapResource(syncedMainComponentIngress,
				func() (bool, string) {
					return true, ""
				}))
376
			// generate the main component virtual service
377
378
379
380
381
382
383
384
385
386
387
388
389
			if r.Config.IngressConfig.UseVirtualService() {
				mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
				_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
					if !ingressSpec.IsVirtualServiceEnabled() {
						logger.Info("VirtualService is not enabled")
						return mainComponentVirtualService, true, nil
					}
					return mainComponentVirtualService, false, nil
				})
				if err != nil {
					logger.Error(err, "failed to sync the main component virtual service")
					return "", "", "", fmt.Errorf("failed to sync the main component virtual service: %w", err)
				}
390
391
392
393
				resources = append(resources, commonController.WrapResource(syncedMainComponentVirtualService,
					func() (bool, string) {
						return true, ""
					}))
394
			}
395
396
		}
	}
397
398
399
400
	return r.checkResourcesReadiness(resources)
}

func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) (State, Reason, Message, error) {
401
	var notReadyReasons []string
402
	notReadyResources := []string{}
403

404
	for _, resource := range resources {
405
406
		ready, reason := resource.IsReady()
		if !ready {
407
			notReadyResources = append(notReadyResources, resource.GetName())
408
			notReadyReasons = append(notReadyReasons, fmt.Sprintf("%s: %s", resource.GetName(), reason))
409
		}
410
	}
411

412
413
	if len(notReadyResources) == 0 {
		return ReadyState, "all_resources_are_ready", Message("All resources are ready"), nil
414
	}
415
	return PendingState, "some_resources_are_not_ready", Message(fmt.Sprintf("Resources not ready: %s", strings.Join(notReadyReasons, "; "))), nil
416
417
}

418
419
420
421
422
423
424
425
426
427
func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	resources := []Resource{}
	logger := log.FromContext(ctx)

	// generate the dynamoComponentsDeployments from the config
	defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
	dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec)
	if err != nil {
		logger.Error(err, "failed to generate the DynamoComponentsDeployments")
		return "", "", "", fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err)
428
	}
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443

	// reconcile the dynamoComponentsDeployments
	for serviceName, dynamoComponentDeployment := range dynamoComponentsDeployments {
		logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dynamoComponentDeployment)
		_, dynamoComponentDeployment, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) {
			return dynamoComponentDeployment, false, nil
		})
		if err != nil {
			logger.Error(err, "failed to sync the DynamoComponentDeployment")
			return "", "", "", fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", err)
		}
		resources = append(resources, dynamoComponentDeployment)
	}

	return r.checkResourcesReadiness(resources)
444
445
}

446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
// reconcilePVC reconciles a single top-level PVC defined in the DynamoGraphDeployment spec
func (r *DynamoGraphDeploymentReconciler) reconcilePVC(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment, pvcName string, pvcConfig nvidiacomv1alpha1.PVC) (*corev1.PersistentVolumeClaim, error) {
	logger := log.FromContext(ctx)

	pvc := &corev1.PersistentVolumeClaim{}
	pvcNamespacedName := types.NamespacedName{Name: pvcName, Namespace: dynamoDeployment.Namespace}
	err := r.Get(ctx, pvcNamespacedName, pvc)
	if err != nil && client.IgnoreNotFound(err) != nil {
		logger.Error(err, "Unable to retrieve top-level PVC", "pvcName", pvcName)
		return nil, err
	}

	// If PVC does not exist, create a new one
	if err != nil {
		if pvcConfig.Create == nil || !*pvcConfig.Create {
			logger.Error(err, "Top-level PVC does not exist and create is not enabled", "pvcName", pvcName)
			return nil, err
		}

		pvc = constructPVC(dynamoDeployment, pvcConfig)
		if err := controllerutil.SetControllerReference(dynamoDeployment, pvc, r.Client.Scheme()); err != nil {
			logger.Error(err, "Failed to set controller reference for top-level PVC", "pvcName", pvcName)
			return nil, err
		}

		err = r.Create(ctx, pvc)
		if err != nil {
			logger.Error(err, "Failed to create top-level PVC", "pvcName", pvcName)
			return nil, err
		}
		logger.Info("Top-level PVC created", "pvcName", pvcName, "namespace", dynamoDeployment.Namespace)
	}

	return pvc, nil
}

// reconcilePVCs reconciles all top-level PVCs defined in the DynamoGraphDeployment spec
func (r *DynamoGraphDeploymentReconciler) reconcilePVCs(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	logger := log.FromContext(ctx)

	if dynamoDeployment.Spec.PVCs == nil {
		return nil
	}

	for _, pvcConfig := range dynamoDeployment.Spec.PVCs {
		if pvcConfig.Name == nil || *pvcConfig.Name == "" {
			logger.Error(nil, "PVC not reconcilable: name is required", "pvcConfig", pvcConfig)
			continue
		}

		pvcName := *pvcConfig.Name
		logger.Info("Reconciling top-level PVC", "pvcName", pvcName, "namespace", dynamoDeployment.Namespace)

		_, err := r.reconcilePVC(ctx, dynamoDeployment, pvcName, pvcConfig)
		if err != nil {
			return err
		}
	}

	return nil
}

508
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
509
510
511
512
	// for now doing nothing
	return nil
}

Neelay Shah's avatar
Neelay Shah committed
513
// SetupWithManager sets up the controller with the Manager.
514
func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
515
	ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
516
		For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates(
517
518
			predicate.GenerationChangedPredicate{},
		)).
519
520
		Named("dynamographdeployment").
		Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{
Neelay Shah's avatar
Neelay Shah committed
521
522
523
524
525
526
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
527
528
529
530
531
532
533
		Owns(&corev1.PersistentVolumeClaim{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the PVC
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
534
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config))
535
	if r.Config.Grove.Enabled {
536
		ctrlBuilder = ctrlBuilder.Owns(&grovev1alpha1.PodCliqueSet{}, builder.WithPredicates(predicate.Funcs{
537
538
539
540
541
542
543
544
			// ignore creation cause we don't want to be called again after we create the pod gang set
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		}))
	}
	return ctrlBuilder.Complete(r)
Neelay Shah's avatar
Neelay Shah committed
545
}
546
547
548
549

func (r *DynamoGraphDeploymentReconciler) GetRecorder() record.EventRecorder {
	return r.Recorder
}