dynamographdeployment_controller.go 12.8 KB
Newer Older
Neelay Shah's avatar
Neelay Shah committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"context"
22
	"encoding/json"
23
	"fmt"
Neelay Shah's avatar
Neelay Shah committed
24
25

	"dario.cat/mergo"
26
	corev1 "k8s.io/api/core/v1"
Neelay Shah's avatar
Neelay Shah committed
27
28
29
30
31
32
33
34
35
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

36
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
37
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
38
39
	commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
Neelay Shah's avatar
Neelay Shah committed
40
41
42
43
44
45
)

// Values passed to SetState on a DynamoGraphDeployment by Reconcile:
//   - FailedState when the reconciliation returned an error,
//   - ReadyState once every generated DynamoComponentDeployment reports ready,
//   - PendingState while at least one of them is not ready yet.
const (
	FailedState  = "failed"
	ReadyState   = "successful"
	PendingState = "pending"
)

// DYN_DEPLOYMENT_CONFIG_ENV_VAR is the name of the environment variable that
// carries the JSON service configuration; updateDynDeploymentConfig rewrites
// the main component's "port" entry inside it. The ALL_CAPS spelling is kept
// as-is because other code may reference this identifier.
const DYN_DEPLOYMENT_CONFIG_ENV_VAR = "DYN_DEPLOYMENT_CONFIG"

50
51
52
53
// etcdStorage is the single etcd operation this package declares a need for:
// deleting keys, presumably every key under keyPrefix (inferred from the
// parameter name). NOTE(review): no use is visible in this part of the file;
// confirm it is still referenced elsewhere.
type etcdStorage interface {
	DeleteKeys(ctx context.Context, keyPrefix string) error
}

54
55
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct {
Neelay Shah's avatar
Neelay Shah committed
56
	client.Client
57
58
59
60
61
62
	Config                     commonController.Config
	Recorder                   record.EventRecorder
	VirtualServiceGateway      string
	IngressControllerClassName string
	IngressControllerTLSSecret string
	IngressHostSuffix          string
Neelay Shah's avatar
Neelay Shah committed
63
64
}

65
66
67
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For a DynamoGraphDeployment it:
//  1. handles the object's finalizer, returning early when HandleFinalizer
//     reports the object as deleted;
//  2. loads the graph config and generates one DynamoComponentDeployment per
//     service, merging in the matching Spec.Services overrides (override
//     wins) and the graph-wide Spec.Envs, and rewriting the port in
//     DYN_DEPLOYMENT_CONFIG for the main component;
//  3. syncs the shared DynamoComponent and stops (Ready=False) until it is
//     ready;
//  4. syncs every DynamoComponentDeployment and sets the state to
//     ReadyState or PendingState depending on whether all of them are ready.
//
// A deferred hook writes the outcome to the object's "Ready" status condition
// on every exit path after the object has been fetched. On error, it also sets
// the state to FailedState and uses the error text as the condition message.
// The exception is the path taken after a successful deletion, where there is
// no status left to maintain.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// The deferred status hook below reads these variables. Every failure must
	// be assigned to this err, never to a shadowing `err :=` inside a nested
	// scope. Otherwise the hook reports the failure with an empty message and
	// does not mark the deployment as failed.
	var err error
	var deleted bool
	reason := "undefined"
	message := ""
	readyStatus := metav1.ConditionFalse

	// Retrieve the CR. NotFound means it is already gone: nothing to do.
	dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{}
	if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	defer func() {
		// Once the finalizer has been handled for a deleted object there is no
		// status left to maintain; a write would presumably only fail with
		// NotFound and produce a spurious error log.
		if deleted && err == nil {
			logger.Info("Reconciliation done")
			return
		}
		if err != nil {
			dynamoDeployment.SetState(FailedState)
			message = err.Error()
			logger.Error(err, "Reconciliation failed")
		}
		// update the CRD status condition
		dynamoDeployment.AddStatusCondition(metav1.Condition{
			Type:               "Ready",
			Status:             readyStatus,
			Reason:             reason,
			Message:            message,
			LastTransitionTime: metav1.Now(),
		})
		if updateErr := r.Status().Update(ctx, dynamoDeployment); updateErr != nil {
			logger.Error(updateErr, "Unable to update the CRD status", "crd", req.NamespacedName)
		}
		logger.Info("Reconciliation done")
	}()

	deleted, err = commonController.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
	if err != nil {
		logger.Error(err, "failed to handle the finalizer")
		reason = "failed_to_handle_the_finalizer"
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}

	// fetch the dynamoGraphConfig
	dynamoGraphConfig, err := dynamo.GetDynamoGraphConfig(ctx, dynamoDeployment, r.Recorder)
	if err != nil {
		logger.Error(err, "failed to get the DynamoGraphConfig")
		reason = "failed_to_get_the_DynamoGraphConfig"
		return ctrl.Result{}, err
	}

	// generate the dynamoComponentsDeployments from the config
	dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, dynamoGraphConfig, r.generateDefaultIngressSpec(dynamoDeployment))
	if err != nil {
		logger.Error(err, "failed to generate the DynamoComponentsDeployments")
		reason = "failed_to_generate_the_DynamoComponentsDeployments"
		return ctrl.Result{}, err
	}

	// Merge the generated deployments with the per-service overrides from the
	// CRD (the CRD wins) and publish the ingress endpoint if one is enabled.
	for serviceName, deployment := range dynamoComponentsDeployments {
		if override, ok := dynamoDeployment.Spec.Services[serviceName]; ok {
			err = mergo.Merge(&deployment.Spec.DynamoComponentDeploymentSharedSpec, override.DynamoComponentDeploymentSharedSpec, mergo.WithOverride)
			if err != nil {
				logger.Error(err, "failed to merge the DynamoComponentsDeployments")
				reason = "failed_to_merge_the_DynamoComponentsDeployments"
				return ctrl.Result{}, err
			}
		}
		if deployment.Spec.Ingress.Enabled {
			dynamoDeployment.SetEndpointStatus(r.isEndpointSecured(), getIngressHost(deployment.Spec.Ingress))
		}
	}

	// Set common env vars on each of the dynamoComponentsDeployments
	for _, deployment := range dynamoComponentsDeployments {
		if len(dynamoDeployment.Spec.Envs) > 0 {
			deployment.Spec.Envs = mergeEnvs(dynamoDeployment.Spec.Envs, deployment.Spec.Envs)
		}
		if err = updateDynDeploymentConfig(deployment, consts.DynamoServicePort); err != nil {
			logger.Error(err, fmt.Sprintf("Failed to update the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
			reason = "failed_to_update_the_DYN_DEPLOYMENT_CONFIG_env_var"
			return ctrl.Result{}, err
		}
	}

	// reconcile the dynamoComponent
	// for now we use the same component for all the services and we differentiate them by the service name when launching the component
	dynamoComponent := &nvidiacomv1alpha1.DynamoComponent{
		ObjectMeta: metav1.ObjectMeta{
			Name:      getK8sName(dynamoDeployment.Spec.DynamoGraph),
			Namespace: dynamoDeployment.Namespace,
		},
		Spec: nvidiacomv1alpha1.DynamoComponentSpec{
			DynamoComponent: dynamoDeployment.Spec.DynamoGraph,
		},
	}
	_, dynamoComponent, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponent, bool, error) {
		return dynamoComponent, false, nil
	})
	if err != nil {
		logger.Error(err, "failed to sync the DynamoComponent")
		reason = "failed_to_sync_the_DynamoComponent"
		return ctrl.Result{}, err
	}
	if !dynamoComponent.IsReady() {
		logger.Info("The DynamoComponent is not ready")
		reason = "dynamoComponent_is_not_ready"
		message = "The DynamoComponent is not ready"
		readyStatus = metav1.ConditionFalse
		return ctrl.Result{}, nil
	}

	notReadyDeployments := []string{}
	// reconcile the dynamoComponentsDeployments
	for serviceName, dynamoComponentDeployment := range dynamoComponentsDeployments {
		logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dynamoComponentDeployment)
		_, dynamoComponentDeployment, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) {
			return dynamoComponentDeployment, false, nil
		})
		if err != nil {
			logger.Error(err, "failed to sync the DynamoComponentDeployment")
			reason = "failed_to_sync_the_DynamoComponentDeployment"
			return ctrl.Result{}, err
		}
		if !dynamoComponentDeployment.Status.IsReady() {
			notReadyDeployments = append(notReadyDeployments, dynamoComponentDeployment.Name)
		}
	}

	if len(notReadyDeployments) == 0 {
		dynamoDeployment.SetState(ReadyState)
		reason = "all_deployments_are_ready"
		message = "All deployments are ready"
		readyStatus = metav1.ConditionTrue
	} else {
		reason = "some_deployments_are_not_ready"
		message = fmt.Sprintf("The following deployments are not ready: %v", notReadyDeployments)
		dynamoDeployment.SetState(PendingState)
	}

	return ctrl.Result{}, nil
}

227
// generateDefaultIngressSpec builds the operator-wide default ingress settings
// for dynamoDeployment from the reconciler's ingress configuration.
//
// Ingress is enabled when either a VirtualService gateway or an ingress
// controller class is configured, and the host defaults to the deployment
// name. Each optional setting is populated only when the corresponding
// reconciler field is non-empty.
//
// Every pointer in the returned spec points at a fresh per-call copy, never
// at a field of the reconciler itself. Reconcile later merges the generated
// deployment spec with per-CR overrides using mergo.WithOverride, which may
// write through non-nil pointers. An aliased pointer would let one CR's
// override rewrite the reconciler's configuration for every later reconcile.
func (r *DynamoGraphDeploymentReconciler) generateDefaultIngressSpec(dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) *nvidiacomv1alpha1.IngressSpec {
	res := &nvidiacomv1alpha1.IngressSpec{
		Enabled:           r.VirtualServiceGateway != "" || r.IngressControllerClassName != "",
		Host:              dynamoDeployment.Name,
		UseVirtualService: r.VirtualServiceGateway != "",
	}
	// Each `x := r.Field` below is a new variable per call, so taking its
	// address never aliases the reconciler's own field.
	if className := r.IngressControllerClassName; className != "" {
		res.IngressControllerClassName = &className
	}
	if r.IngressControllerTLSSecret != "" {
		res.TLS = &nvidiacomv1alpha1.IngressTLSSpec{
			SecretName: r.IngressControllerTLSSecret,
		}
	}
	if hostSuffix := r.IngressHostSuffix; hostSuffix != "" {
		res.HostSuffix = &hostSuffix
	}
	if gateway := r.VirtualServiceGateway; gateway != "" {
		res.VirtualServiceGateway = &gateway
	}
	return res
}

250
// isEndpointSecured reports whether the published ingress endpoint is served
// over TLS. That is the case when a VirtualService gateway is configured and
// the operator config says VirtualServices support HTTPS, or when an ingress
// TLS secret is configured.
func (r *DynamoGraphDeploymentReconciler) isEndpointSecured() bool {
	httpsViaGateway := r.VirtualServiceGateway != "" && r.Config.VirtualServiceSupportsHTTPS
	httpsViaIngressTLS := r.IngressControllerTLSSecret != ""
	return httpsViaGateway || httpsViaIngressTLS
}

257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
// mergeEnvs merges graph-wide environment variables (common) with a service's
// own variables (specific). A service-specific entry wins when both define
// the same name.
//
// The result is deterministic. Variables appear in the order they are first
// seen, walking common and then specific. An overridden variable keeps the
// position of its first occurrence but takes the later value.
//
// Order matters for two reasons:
//   - Reconcile writes the merged list into the generated
//     DynamoComponentDeployment spec on every pass. A map-iteration order
//     would differ between passes, so the spec would look changed each time,
//     presumably causing needless updates.
//   - Kubernetes only expands $(VAR) references to variables defined earlier
//     in the list.
//
// The returned slice is always non-nil.
func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
	merged := make([]corev1.EnvVar, 0, len(common)+len(specific))
	// position of each variable name inside merged
	indexByName := make(map[string]int, len(common)+len(specific))

	add := func(env corev1.EnvVar) {
		if i, seen := indexByName[env.Name]; seen {
			merged[i] = env // later definition overrides, keeping the slot
			return
		}
		indexByName[env.Name] = len(merged)
		merged = append(merged, env)
	}

	for _, env := range common {
		add(env)
	}
	for _, env := range specific {
		add(env)
	}
	return merged
}

278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
// updateDynDeploymentConfig rewrites the port of the component's own service
// inside the DYN_DEPLOYMENT_CONFIG env var, and only for the main component.
//
// The env var value is parsed as a JSON object. If it has an object entry
// keyed by the component's ServiceName, and that entry already contains a
// "port" key, the port is replaced with newPort. The object is then
// re-serialized into the env var; this happens even when no port was changed.
// Only the first DYN_DEPLOYMENT_CONFIG entry is touched. Non-main components
// and components without the env var are left unchanged.
//
// It returns an error if the value is not valid JSON for a
// map[string]any or cannot be marshalled back.
//
// NOTE(review): decoding into map[string]any turns JSON numbers into float64
// and json.Marshal emits map keys in sorted order, so the rewritten value may
// differ textually from the input beyond the port itself.
func updateDynDeploymentConfig(dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment, newPort int) error {
	if !dynamoDeploymentComponent.IsMainComponent() {
		return nil
	}

	envs := dynamoDeploymentComponent.Spec.Envs
	target := -1
	for idx := range envs {
		if envs[idx].Name == DYN_DEPLOYMENT_CONFIG_ENV_VAR {
			target = idx
			break
		}
	}
	if target < 0 {
		return nil
	}

	var parsed map[string]any
	if err := json.Unmarshal([]byte(envs[target].Value), &parsed); err != nil {
		return fmt.Errorf("failed to unmarshal %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
	}

	// Only overwrite a port that the service config already declares.
	if svc, isObject := parsed[dynamoDeploymentComponent.Spec.ServiceName].(map[string]any); isObject {
		if _, hasPort := svc["port"]; hasPort {
			svc["port"] = newPort
		}
	}

	encoded, err := json.Marshal(parsed)
	if err != nil {
		return fmt.Errorf("failed to marshal updated config: %w", err)
	}
	// envs shares its backing array with Spec.Envs, so this writes the spec.
	envs[target].Value = string(encoded)
	return nil
}

311
// FinalizeResource is the deletion hook for a DynamoGraphDeployment. The
// reconciler is passed to commonController.HandleFinalizer, which presumably
// invokes this method; verify against that helper. It currently performs no
// cleanup and always succeeds.
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(_ context.Context, _ *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	// Intentionally empty: there are no external resources to release yet.
	return nil
}

Neelay Shah's avatar
Neelay Shah committed
316
// SetupWithManager sets up the controller with the Manager.
317
func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
Neelay Shah's avatar
Neelay Shah committed
318
	return ctrl.NewControllerManagedBy(mgr).
319
		For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates(
320
321
			predicate.GenerationChangedPredicate{},
		)).
322
323
		Named("dynamographdeployment").
		Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{
Neelay Shah's avatar
Neelay Shah committed
324
325
326
327
328
329
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
330
331
332
333
334
335
336
		Owns(&nvidiacomv1alpha1.DynamoComponent{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
Neelay Shah's avatar
Neelay Shah committed
337
338
339
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config)).
		Complete(r)
}
340
341
342
343

// GetRecorder returns the reconciler's Kubernetes event recorder. The
// accessor-style name is kept because the reconciler is handed to shared
// helpers (e.g. commonController.HandleFinalizer, SyncResource) that
// presumably expect it; confirm before renaming.
func (r *DynamoGraphDeploymentReconciler) GetRecorder() record.EventRecorder {
	recorder := r.Recorder
	return recorder
}