dynamographdeployment_controller.go 14.1 KB
Newer Older
Neelay Shah's avatar
Neelay Shah committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"context"
22
	"fmt"
Neelay Shah's avatar
Neelay Shah committed
23

24
25
26
27
	grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
Neelay Shah's avatar
Neelay Shah committed
28
29
30
31
32
33
34
35
36
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

37
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
38
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
39
40
	commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
Neelay Shah's avatar
Neelay Shah committed
41
42
)

// The string types below populate the DynamoGraphDeployment status during
// reconciliation.
type (
	// State is the overall state written via SetState on the deployment.
	State string
	// Reason is the machine-readable reason of the "Ready" status condition.
	Reason string
	// Message is the human-readable message of the "Ready" status condition.
	Message string
)

// States reported on a DynamoGraphDeployment.
const (
	// FailedState is reported when reconciliation returned an error.
	FailedState State = "failed"
	// ReadyState is reported when every reconciled resource is ready; it is
	// the only state that sets the "Ready" condition to True.
	ReadyState State = "successful"
	// PendingState is reported while some resources are not ready yet.
	PendingState State = "pending"
)

// etcdStorage abstracts deletion of keys from etcd.
// NOTE(review): no user of this interface is visible in this file; confirm it
// is still referenced elsewhere in the package before relying on it.
type etcdStorage interface {
	// DeleteKeys presumably removes every key stored under keyPrefix —
	// confirm against the concrete implementation.
	DeleteKeys(ctx context.Context, keyPrefix string) error
}

57
58
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct {
Neelay Shah's avatar
Neelay Shah committed
59
	client.Client
60
61
62
	Config                commonController.Config
	Recorder              record.EventRecorder
	DockerSecretRetriever dockerSecretRetriever
Neelay Shah's avatar
Neelay Shah committed
63
64
}

65
66
67
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
68
// +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
Neelay Shah's avatar
Neelay Shah committed
69
70
71
72

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
73
// the DynamoGraphDeployment object against the actual cluster state, and then
Neelay Shah's avatar
Neelay Shah committed
74
75
76
77
78
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
79
func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Neelay Shah's avatar
Neelay Shah committed
80
81
82
	logger := log.FromContext(ctx)

	var err error
83
84
85
	reason := Reason("undefined")
	message := Message("")
	state := PendingState
Neelay Shah's avatar
Neelay Shah committed
86
87
	readyStatus := metav1.ConditionFalse
	// retrieve the CRD
88
	dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{}
Neelay Shah's avatar
Neelay Shah committed
89
90
91
92
93
94
95
96
97
98
	if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	if err != nil {
		// not found, nothing to do
		return ctrl.Result{}, nil
	}

	defer func() {
		if err != nil {
99
100
			state = FailedState
			message = Message(err.Error())
101
			logger.Error(err, "Reconciliation failed")
Neelay Shah's avatar
Neelay Shah committed
102
		}
103
104
105
106
		dynamoDeployment.SetState(string(state))
		if state == ReadyState {
			readyStatus = metav1.ConditionTrue
		}
Neelay Shah's avatar
Neelay Shah committed
107
		// update the CRD status condition
108
109
110
		dynamoDeployment.AddStatusCondition(metav1.Condition{
			Type:               "Ready",
			Status:             readyStatus,
111
112
			Reason:             string(reason),
			Message:            string(message),
113
114
			LastTransitionTime: metav1.Now(),
		})
Neelay Shah's avatar
Neelay Shah committed
115
116
117
118
119
120
121
		err = r.Status().Update(ctx, dynamoDeployment)
		if err != nil {
			logger.Error(err, "Unable to update the CRD status", "crd", req.NamespacedName)
		}
		logger.Info("Reconciliation done")
	}()

122
123
	deleted, err := commonController.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
	if err != nil {
124
		logger.Error(err, "failed to handle the finalizer")
125
126
127
128
129
130
		reason = "failed_to_handle_the_finalizer"
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}
131
	state, reason, message, err = r.reconcileResources(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
132
	if err != nil {
133
134
		logger.Error(err, "failed to reconcile the resources")
		reason = "failed_to_reconcile_the_resources"
Neelay Shah's avatar
Neelay Shah committed
135
136
		return ctrl.Result{}, err
	}
137
138
	return ctrl.Result{}, nil
}
Neelay Shah's avatar
Neelay Shah committed
139

// Resource is the minimal view of a reconciled object needed to aggregate
// readiness; checkResourcesReadiness reports the names of resources that are
// not ready.
type Resource interface {
	// GetName returns the object's name, used when reporting it as not ready.
	GetName() string
	// IsReady reports whether the object has reached its ready state.
	IsReady() bool
}

145
146
func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	logger := log.FromContext(ctx)
147
	if r.Config.Grove.Enabled {
148
149
150
151
		// check if explicit opt out of grove
		if dynamoDeployment.Annotations[consts.KubeAnnotationEnableGrove] == consts.KubeLabelValueFalse {
			logger.Info("Grove is explicitly disabled for this deployment, skipping grove resources reconciliation")
			return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
152
		}
153
		return r.reconcileGroveResources(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
154
	}
155
	return r.reconcileDynamoComponentsDeployments(ctx, dynamoDeployment)
Neelay Shah's avatar
Neelay Shah committed
156
157
158

}

159
160
161
162
163
164
165
func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	logger := log.FromContext(ctx)
	// generate the dynamoComponentsDeployments from the config
	groveGangSet, err := dynamo.GenerateGrovePodGangSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever)
	if err != nil {
		logger.Error(err, "failed to generate the Grove GangSet")
		return "", "", "", fmt.Errorf("failed to generate the Grove GangSet: %w", err)
166
	}
167
168
169
170
171
172
	_, syncedGroveGangSet, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*grovev1alpha1.PodGangSet, bool, error) {
		return groveGangSet, false, nil
	})
	if err != nil {
		logger.Error(err, "failed to sync the Grove GangSet")
		return "", "", "", fmt.Errorf("failed to sync the Grove GangSet: %w", err)
173
	}
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
	groveGangSetAsResource := commonController.WrapResource(syncedGroveGangSet, func() bool {
		if syncedGroveGangSet.Status.LastOperation != nil && syncedGroveGangSet.Status.LastOperation.State == grovev1alpha1.LastOperationStateSucceeded {
			return true
		}
		return false
	})
	resources := []Resource{groveGangSetAsResource}
	for componentName, component := range dynamoDeployment.Spec.Services {
		if component.ComponentType == consts.ComponentTypeMain {
			// generate the main component service
			mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace)
			if err != nil {
				logger.Error(err, "failed to generate the main component service")
				return "", "", "", fmt.Errorf("failed to generate the main component service: %w", err)
			}
			_, syncedMainComponentService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
				return mainComponentService, false, nil
			})
			if err != nil {
				logger.Error(err, "failed to sync the main component service")
				return "", "", "", fmt.Errorf("failed to sync the main component service: %w", err)
			}
			mainComponentServiceAsResource := commonController.WrapResource(syncedMainComponentService, func() bool {
				return true
			})
			resources = append(resources, mainComponentServiceAsResource)
			// generate the main component ingress
			ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
			if component.Ingress != nil {
				ingressSpec = *component.Ingress
			}
			mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
			if err != nil {
				logger.Error(err, "failed to generate the main component ingress")
				return "", "", "", fmt.Errorf("failed to generate the main component ingress: %w", err)
			}
			_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
				if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
					logger.Info("Ingress is not enabled")
					return mainComponentIngress, true, nil
				}
				return mainComponentIngress, false, nil
			})
			if err != nil {
				logger.Error(err, "failed to sync the main component ingress")
				return "", "", "", fmt.Errorf("failed to sync the main component ingress: %w", err)
			}
			resources = append(resources, commonController.WrapResource(syncedMainComponentIngress, func() bool {
				return true
			}))
			// generate the main component virtual service
			mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
			if err != nil {
				logger.Error(err, "failed to generate the main component virtual service")
				return "", "", "", fmt.Errorf("failed to generate the main component virtual service: %w", err)
			}
			_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
				vsEnabled := ingressSpec.Enabled && ingressSpec.UseVirtualService && ingressSpec.VirtualServiceGateway != nil
				if !vsEnabled {
					logger.Info("VirtualService is not enabled")
					return mainComponentVirtualService, true, nil
				}
				return mainComponentVirtualService, false, nil
			})
			if err != nil {
				logger.Error(err, "failed to sync the main component virtual service")
				return "", "", "", fmt.Errorf("failed to sync the main component virtual service: %w", err)
			}
			resources = append(resources, commonController.WrapResource(syncedMainComponentVirtualService, func() bool {
				return true
			}))
245
246
		}
	}
247
248
249
250
251
252
253
254
255
	return r.checkResourcesReadiness(resources)
}

// checkResourcesReadiness aggregates the readiness of resources.
//
// It returns ReadyState when every resource reports ready. Otherwise it returns
// PendingState with a message that counts and lists the names of the resources
// that are not ready. The returned error is always nil.
func (r *DynamoGraphDeploymentReconciler) checkResourcesReadiness(resources []Resource) (State, Reason, Message, error) {
	var pending []string
	for i := range resources {
		if resources[i].IsReady() {
			continue
		}
		pending = append(pending, resources[i].GetName())
	}

	if len(pending) > 0 {
		summary := fmt.Sprintf("%d resources not ready: %v", len(pending), pending)
		return PendingState, "some_resources_are_not_ready", Message(summary), nil
	}
	return ReadyState, "all_resources_are_ready", Message("All resources are ready"), nil
}

263
264
265
266
267
268
269
270
271
272
func (r *DynamoGraphDeploymentReconciler) reconcileDynamoComponentsDeployments(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
	resources := []Resource{}
	logger := log.FromContext(ctx)

	// generate the dynamoComponentsDeployments from the config
	defaultIngressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
	dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, &defaultIngressSpec)
	if err != nil {
		logger.Error(err, "failed to generate the DynamoComponentsDeployments")
		return "", "", "", fmt.Errorf("failed to generate the DynamoComponentsDeployments: %w", err)
273
	}
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288

	// reconcile the dynamoComponentsDeployments
	for serviceName, dynamoComponentDeployment := range dynamoComponentsDeployments {
		logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dynamoComponentDeployment)
		_, dynamoComponentDeployment, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) {
			return dynamoComponentDeployment, false, nil
		})
		if err != nil {
			logger.Error(err, "failed to sync the DynamoComponentDeployment")
			return "", "", "", fmt.Errorf("failed to sync the DynamoComponentDeployment: %w", err)
		}
		resources = append(resources, dynamoComponentDeployment)
	}

	return r.checkResourcesReadiness(resources)
289
290
}

291
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
292
293
294
295
	// for now doing nothing
	return nil
}

Neelay Shah's avatar
Neelay Shah committed
296
// SetupWithManager sets up the controller with the Manager.
297
func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
298
	ctrlBuilder := ctrl.NewControllerManagedBy(mgr).
299
		For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates(
300
301
			predicate.GenerationChangedPredicate{},
		)).
302
303
		Named("dynamographdeployment").
		Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{
Neelay Shah's avatar
Neelay Shah committed
304
305
306
307
308
309
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
310
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config))
311
	if r.Config.Grove.Enabled {
312
313
314
315
316
317
318
319
320
		ctrlBuilder = ctrlBuilder.Owns(&grovev1alpha1.PodGangSet{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the pod gang set
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		}))
	}
	return ctrlBuilder.Complete(r)
Neelay Shah's avatar
Neelay Shah committed
321
}
322
323
324
325

// GetRecorder returns the event recorder configured on the reconciler.
// NOTE(review): presumably consumed by controller_common helpers that receive
// the reconciler (e.g. HandleFinalizer/SyncResource) — confirm.
func (r *DynamoGraphDeploymentReconciler) GetRecorder() record.EventRecorder {
	return r.Recorder
}