/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"context"
22
	"fmt"
Neelay Shah's avatar
Neelay Shah committed
23

24
25
26
27
	grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
Neelay Shah's avatar
Neelay Shah committed
28
29
30
31
32
33
34
35
36
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

37
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
38
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
39
40
	commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
Neelay Shah's avatar
Neelay Shah committed
41
42
)

43
44
45
46
type (
	// State is the coarse outcome of one reconciliation. It is written to the
	// DynamoGraphDeployment via SetState.
	State string

	// Reason is the machine-readable cause stored in the "Ready" status condition.
	Reason string

	// Message is the human-readable text stored in the "Ready" status condition.
	Message string
)

// Possible reconciliation states. Note that the ready state is serialized as
// "successful", not "ready".
const (
	FailedState  State = "failed"
	ReadyState   State = "successful"
	PendingState State = "pending"
)

53
54
55
56
// etcdStorage is a small abstraction over a key/value store that can drop
// every key sharing a prefix. It is not referenced by the code visible in this
// file. NOTE(review): presumably consumed elsewhere in the package; confirm
// before removing.
type etcdStorage interface {
	// DeleteKeys removes the keys under prefix. NOTE(review): the exact
	// semantics (recursive? missing prefix an error?) depend on the implementation.
	DeleteKeys(ctx context.Context, prefix string) error
}

57
58
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct {
Neelay Shah's avatar
Neelay Shah committed
59
	client.Client
60
61
62
	Config                commonController.Config
	Recorder              record.EventRecorder
	DockerSecretRetriever dockerSecretRetriever
Neelay Shah's avatar
Neelay Shah committed
63
64
}

65
66
67
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
68
// +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
Neelay Shah's avatar
Neelay Shah committed
69
70
71
72

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
73
// the DynamoGraphDeployment object against the actual cluster state, and then
Neelay Shah's avatar
Neelay Shah committed
74
75
76
77
78
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
79
func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Neelay Shah's avatar
Neelay Shah committed
80
81
82
	logger := log.FromContext(ctx)

	var err error
83
84
85
	reason := Reason("undefined")
	message := Message("")
	state := PendingState
Neelay Shah's avatar
Neelay Shah committed
86
87
	readyStatus := metav1.ConditionFalse
	// retrieve the CRD
88
	dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{}
Neelay Shah's avatar
Neelay Shah committed
89
90
91
92
93
94
95
96
97
98
	if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	if err != nil {
		// not found, nothing to do
		return ctrl.Result{}, nil
	}

	defer func() {
		if err != nil {
99
100
			state = FailedState
			message = Message(err.Error())
101
			logger.Error(err, "Reconciliation failed")
Neelay Shah's avatar
Neelay Shah committed
102
		}
103
104
105
106
		dynamoDeployment.SetState(string(state))
		if state == ReadyState {
			readyStatus = metav1.ConditionTrue
		}
Neelay Shah's avatar
Neelay Shah committed
107
		// update the CRD status condition
108
109
110
		dynamoDeployment.AddStatusCondition(metav1.Condition{
			Type:               "Ready",
			Status:             readyStatus,
111
112
			Reason:             string(reason),
			Message:            string(message),
113
114
			LastTransitionTime: metav1.Now(),
		})
Neelay Shah's avatar
Neelay Shah committed
115
116
117
118
119
120
121
		err = r.Status().Update(ctx, dynamoDeployment)
		if err != nil {
			logger.Error(err, "Unable to update the CRD status", "crd", req.NamespacedName)
		}
		logger.Info("Reconciliation done")
	}()

122
123
	deleted, err := commonController.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
	if err != nil {
124
		logger.Error(err, "failed to handle the finalizer")
125
126
127
128
129
130
		reason = "failed_to_handle_the_finalizer"
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}
131
	state, reason, message, err = r.reconcileResources(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
132
	if err != nil {
133
134
		logger.Error(err, "failed to reconcile the resources")
		reason = "failed_to_reconcile_the_resources"
Neelay Shah's avatar
Neelay Shah committed
135
136
		return ctrl.Result{}, err
	}
137
138
	return ctrl.Result{}, nil
}
Neelay Shah's avatar
Neelay Shah committed
139

140
141
142
143
// Resource is the minimal view of an owned object needed to aggregate
// readiness. The name is what gets listed in the "not ready" status message.
type Resource interface {
	// GetName returns the object's name.
	GetName() string
	// IsReady reports whether the object should be considered ready.
	IsReady() bool
}
Neelay Shah's avatar
Neelay Shah committed
144

145
146
func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	logger := log.FromContext(ctx)
147
	if r.Config.Grove.Enabled {
148
149
150
151
		// check if explicit opt out of grove
		if dynamoDeployment.Annotations[consts.KubeAnnotationEnableGrove] == consts.KubeLabelValueFalse {
			logger.Info("Grove is explicitly disabled for this deployment, skipping grove resources reconciliation")
			return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
152
		}
153
		return r.reconcileGroveResources(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
154
	}
155
	return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
156
157
158

}

159
160
161
162
163
164
165
// reconcileGroveResources reconciles a DynamoGraphDeployment through Grove.
//
// It generates a single Grove PodGangSet for the whole graph and syncs it as a
// child of dynamoDeployment. Then, for every service whose ComponentType is
// consts.ComponentTypeMain, it also syncs the objects below:
//   - a Service for the component;
//   - an Ingress. It uses the per-component Ingress spec if set, otherwise the
//     default derived from the operator's IngressConfig. When the spec is
//     disabled or has no ingress controller class, the generator reports true
//     as its second value (presumably "delete").
//   - an Istio VirtualService, only when the operator is configured to use
//     them. The generator likewise reports true when the spec does not
//     enable it.
//
// The PodGangSet counts as ready once Grove reports its last operation as
// succeeded; the networking objects always count as ready. The aggregated
// result comes from checkResourcesReadiness. Any generation or sync failure is
// returned as a wrapped error together with an empty state/reason/message.
func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	logger := log.FromContext(ctx)

	// generate the Grove PodGangSet from the graph spec
	groveGangSet, err := dynamo.GenerateGrovePodGangSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever)
	if err != nil {
		logger.Error(err, "failed to generate the Grove GangSet")
		return "", "", "", fmt.Errorf("failed to generate the Grove GangSet: %w", err)
	}
	_, syncedGroveGangSet, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*grovev1alpha1.PodGangSet, bool, error) {
		return groveGangSet, false, nil
	})
	if err != nil {
		logger.Error(err, "failed to sync the Grove GangSet")
		return "", "", "", fmt.Errorf("failed to sync the Grove GangSet: %w", err)
	}
	groveGangSetAsResource := commonController.WrapResource(syncedGroveGangSet, func() bool {
		lastOperation := syncedGroveGangSet.Status.LastOperation
		return lastOperation != nil && lastOperation.State == grovev1alpha1.LastOperationStateSucceeded
	})
	resources := []Resource{groveGangSetAsResource}

	// Networking objects expose no readiness signal; treat them as ready once synced.
	alwaysReady := func() bool { return true }

	for componentName, component := range dynamoDeployment.Spec.Services {
		if component.ComponentType != consts.ComponentTypeMain {
			continue
		}
		componentResourceName := dynamo.GetDynamoComponentName(dynamoDeployment, componentName)

		// generate the main component service
		mainComponentService, err := dynamo.GenerateComponentService(ctx, componentResourceName, dynamoDeployment.Namespace)
		if err != nil {
			logger.Error(err, "failed to generate the main component service")
			return "", "", "", fmt.Errorf("failed to generate the main component service: %w", err)
		}
		_, syncedMainComponentService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
			return mainComponentService, false, nil
		})
		if err != nil {
			logger.Error(err, "failed to sync the main component service")
			return "", "", "", fmt.Errorf("failed to sync the main component service: %w", err)
		}
		resources = append(resources, commonController.WrapResource(syncedMainComponentService, alwaysReady))

		// generate the main component ingress; a per-component spec overrides the default
		ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
		if component.Ingress != nil {
			ingressSpec = *component.Ingress
		}
		// GenerateComponentIngress returns no error, so there is nothing to check here.
		mainComponentIngress := dynamo.GenerateComponentIngress(ctx, componentResourceName, dynamoDeployment.Namespace, ingressSpec)
		_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
			if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
				logger.Info("Ingress is not enabled")
				return mainComponentIngress, true, nil
			}
			return mainComponentIngress, false, nil
		})
		if err != nil {
			logger.Error(err, "failed to sync the main component ingress")
			return "", "", "", fmt.Errorf("failed to sync the main component ingress: %w", err)
		}
		resources = append(resources, commonController.WrapResource(syncedMainComponentIngress, alwaysReady))

		// generate the main component virtual service, only when the operator uses them
		if !r.Config.IngressConfig.UseVirtualService() {
			continue
		}
		// GenerateComponentVirtualService returns no error either.
		mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, componentResourceName, dynamoDeployment.Namespace, ingressSpec)
		_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
			if !ingressSpec.IsVirtualServiceEnabled() {
				logger.Info("VirtualService is not enabled")
				return mainComponentVirtualService, true, nil
			}
			return mainComponentVirtualService, false, nil
		})
		if err != nil {
			logger.Error(err, "failed to sync the main component virtual service")
			return "", "", "", fmt.Errorf("failed to sync the main component virtual service: %w", err)
		}
		resources = append(resources, commonController.WrapResource(syncedMainComponentVirtualService, alwaysReady))
	}

	return r.checkResourcesReadiness(resources)
}

// checkResourcesReadiness folds the readiness of resources into a single
// result:
//   - ReadyState when every resource reports IsReady, which includes the empty
//     case;
//   - otherwise PendingState, with a message that counts and lists the names
//     of the resources still not ready.
//
// The error result is always nil.
func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) (State, Reason, Message, error) {
	var pending []string
	for _, res := range resources {
		if res.IsReady() {
			continue
		}
		pending = append(pending, res.GetName())
	}

	if len(pending) > 0 {
		summary := fmt.Sprintf("%d resources not ready: %v", len(pending), pending)
		return PendingState, "some_resources_are_not_ready", Message(summary), nil
	}
	return ReadyState, "all_resources_are_ready", Message("All resources are ready"), nil
}

264
265
266
267
268
269
270
271
272
273
// reconcileDynamoComponentsDeployments is the non-Grove reconciliation path.
//
// It expands the graph into DynamoComponentDeployments, passing the default
// ingress spec derived from the operator's IngressConfig. It then syncs each
// one as a child of dynamoDeployment and reports the aggregated readiness via
// checkResourcesReadiness. Generation or sync failures are returned as wrapped
// errors with an empty state/reason/message.
func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	logger := log.FromContext(ctx)

	// generate the desired DynamoComponentDeployments from the graph spec
	defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
	desired, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec)
	if err != nil {
		logger.Error(err, "failed to generate the DynamoComponentsDeployments")
		return "", "", "", fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err)
	}

	resources := make([]Resource, 0, len(desired))
	for serviceName, dcd := range desired {
		logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dcd)
		_, synced, syncErr := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) {
			return dcd, false, nil
		})
		if syncErr != nil {
			logger.Error(syncErr, "failed to sync the DynamoComponentDeployment")
			return "", "", "", fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", syncErr)
		}
		// readiness is judged on the object as it exists in the cluster after the sync
		resources = append(resources, synced)
	}

	return r.checkResourcesReadiness(resources)
}

292
// FinalizeResource performs deletion-time cleanup for a DynamoGraphDeployment.
// It currently has nothing to clean up and always returns nil.
//
// NOTE(review): presumably invoked by commonController.HandleFinalizer, which
// receives this reconciler. Owned children are expected to be garbage-collected
// through their owner references; confirm before relying on that.
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	// intentionally a no-op for now
	return nil
}

Neelay Shah's avatar
Neelay Shah committed
297
// SetupWithManager sets up the controller with the Manager.
298
func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
299
	ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
300
		For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates(
301
302
			predicate.GenerationChangedPredicate{},
		)).
303
304
		Named("dynamographdeployment").
		Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{
Neelay Shah's avatar
Neelay Shah committed
305
306
307
308
309
310
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
311
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config))
312
	if r.Config.Grove.Enabled {
313
314
315
316
317
318
319
320
321
		ctrlBuilder = ctrlBuilder.Owns(&grovev1alpha1.PodGangSet{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the pod gang set
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		}))
	}
	return ctrlBuilder.Complete(r)
Neelay Shah's avatar
Neelay Shah committed
322
}
323
324
325
326

// GetRecorder exposes the reconciler's Kubernetes event recorder.
// NOTE(review): presumably needed to satisfy an interface expected by the
// shared controller helpers (this reconciler is passed to HandleFinalizer and
// SyncResource); confirm.
func (r *DynamoGraphDeploymentReconciler) GetRecorder() record.EventRecorder {
	recorder := r.Recorder
	return recorder
}