dynamographdeployment_controller.go 15.8 KB
Newer Older
Neelay Shah's avatar
Neelay Shah committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"context"
22
	"encoding/json"
23
	"fmt"
Neelay Shah's avatar
Neelay Shah committed
24
25

	"dario.cat/mergo"
26
	corev1 "k8s.io/api/core/v1"
Neelay Shah's avatar
Neelay Shah committed
27
28
29
30
31
32
33
34
35
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

36
	dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
37
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
38
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
39
40
	commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
Neelay Shah's avatar
Neelay Shah committed
41
42
43
44
45
46
)

const (
	// FailedState is the state set on the DynamoGraphDeployment when a
	// reconciliation ends with an error.
	FailedState = "failed"
	// ReadyState is the state set once every DynamoComponentDeployment
	// reports ready.
	ReadyState = "successful"
	// PendingState is the state set while at least one
	// DynamoComponentDeployment is not ready yet.
	PendingState = "pending"

	// DYN_DEPLOYMENT_CONFIG_ENV_VAR is the name of the environment variable
	// whose value is a JSON document keyed by service name.
	DYN_DEPLOYMENT_CONFIG_ENV_VAR = "DYN_DEPLOYMENT_CONFIG"
)

51
52
53
54
// etcdStorage is the minimal storage interface exposing prefix-based key
// deletion.
// NOTE(review): it is not referenced anywhere in the visible part of this
// file — confirm it is still needed.
type etcdStorage interface {
	// DeleteKeys presumably removes every key starting with prefix; the exact
	// semantics depend on the implementation — verify against it.
	DeleteKeys(ctx context.Context, prefix string) error
}

55
56
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct {
Neelay Shah's avatar
Neelay Shah committed
57
	client.Client
58
59
60
61
62
63
	Config                     commonController.Config
	Recorder                   record.EventRecorder
	VirtualServiceGateway      string
	IngressControllerClassName string
	IngressControllerTLSSecret string
	IngressHostSuffix          string
Neelay Shah's avatar
Neelay Shah committed
64
65
}

66
67
68
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
Neelay Shah's avatar
Neelay Shah committed
69
70
71
72

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
73
// the DynamoGraphDeployment object against the actual cluster state, and then
Neelay Shah's avatar
Neelay Shah committed
74
75
76
77
78
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
79
func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Neelay Shah's avatar
Neelay Shah committed
80
81
82
83
	logger := log.FromContext(ctx)

	var err error
	reason := "undefined"
84
	message := ""
Neelay Shah's avatar
Neelay Shah committed
85
86
	readyStatus := metav1.ConditionFalse
	// retrieve the CRD
87
	dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{}
Neelay Shah's avatar
Neelay Shah committed
88
89
90
91
92
93
94
95
96
97
98
99
	if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	if err != nil {
		// not found, nothing to do
		return ctrl.Result{}, nil
	}

	defer func() {
		if err != nil {
			dynamoDeployment.SetState(FailedState)
			message = err.Error()
100
			logger.Error(err, "Reconciliation failed")
Neelay Shah's avatar
Neelay Shah committed
101
102
		}
		// update the CRD status condition
103
104
105
106
107
108
109
		dynamoDeployment.AddStatusCondition(metav1.Condition{
			Type:               "Ready",
			Status:             readyStatus,
			Reason:             reason,
			Message:            message,
			LastTransitionTime: metav1.Now(),
		})
Neelay Shah's avatar
Neelay Shah committed
110
111
112
113
114
115
116
		err = r.Status().Update(ctx, dynamoDeployment)
		if err != nil {
			logger.Error(err, "Unable to update the CRD status", "crd", req.NamespacedName)
		}
		logger.Info("Reconciliation done")
	}()

117
118
	deleted, err := commonController.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
	if err != nil {
119
		logger.Error(err, "failed to handle the finalizer")
120
121
122
123
124
125
126
		reason = "failed_to_handle_the_finalizer"
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}

127
128
	// fetch the dynamoGraphConfig
	dynamoGraphConfig, err := dynamo.GetDynamoGraphConfig(ctx, dynamoDeployment, r.Recorder)
Neelay Shah's avatar
Neelay Shah committed
129
	if err != nil {
130
		logger.Error(err, "failed to get the DynamoGraphConfig")
131
		reason = "failed_to_get_the_DynamoGraphConfig"
Neelay Shah's avatar
Neelay Shah committed
132
133
134
		return ctrl.Result{}, err
	}

135
136
	// generate the dynamoComponentsDeployments from the config
	dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, dynamoGraphConfig, r.generateDefaultIngressSpec(dynamoDeployment))
Neelay Shah's avatar
Neelay Shah committed
137
	if err != nil {
138
		logger.Error(err, "failed to generate the DynamoComponentsDeployments")
139
		reason = "failed_to_generate_the_DynamoComponentsDeployments"
Neelay Shah's avatar
Neelay Shah committed
140
141
142
		return ctrl.Result{}, err
	}

143
144
	// merge the dynamoComponentsDeployments with the dynamoComponentsDeployments from the CRD
	for serviceName, deployment := range dynamoComponentsDeployments {
Neelay Shah's avatar
Neelay Shah committed
145
		if _, ok := dynamoDeployment.Spec.Services[serviceName]; ok {
146
			err := mergo.Merge(&deployment.Spec.DynamoComponentDeploymentSharedSpec, dynamoDeployment.Spec.Services[serviceName].DynamoComponentDeploymentSharedSpec, mergo.WithOverride)
Neelay Shah's avatar
Neelay Shah committed
147
			if err != nil {
148
				logger.Error(err, "failed to merge the DynamoComponentsDeployments")
149
				reason = "failed_to_merge_the_DynamoComponentsDeployments"
Neelay Shah's avatar
Neelay Shah committed
150
151
152
				return ctrl.Result{}, err
			}
		}
153
		if deployment.Spec.Ingress.Enabled {
154
			dynamoDeployment.SetEndpointStatus(r.isEndpointSecured(), getIngressHost(deployment.Spec.Ingress))
155
		}
Neelay Shah's avatar
Neelay Shah committed
156
157
	}

158
159
	// Set common env vars on each of the dynamoComponentsDeployments
	for _, deployment := range dynamoComponentsDeployments {
160
161
162
		if len(dynamoDeployment.Spec.Envs) > 0 {
			deployment.Spec.Envs = mergeEnvs(dynamoDeployment.Spec.Envs, deployment.Spec.Envs)
		}
163
164
165
166
167
		err := updateDynDeploymentConfig(deployment, consts.DynamoServicePort)
		if err != nil {
			logger.Error(err, fmt.Sprintf("Failed to update the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
			return ctrl.Result{}, err
		}
168
169
170
171
172
		err = overrideWithDynDeploymentConfig(ctx, deployment)
		if err != nil {
			logger.Error(err, fmt.Sprintf("Failed to override the component config with the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
			return ctrl.Result{}, err
		}
173
174
	}

175
	// reconcile the dynamoComponent
176
	// for now we use the same component for all the services and we differentiate them by the service name when launching the component
177
	dynamoComponent := &nvidiacomv1alpha1.DynamoComponent{
Neelay Shah's avatar
Neelay Shah committed
178
		ObjectMeta: metav1.ObjectMeta{
179
			Name:      getK8sName(dynamoDeployment.Spec.DynamoGraph),
Neelay Shah's avatar
Neelay Shah committed
180
181
			Namespace: dynamoDeployment.Namespace,
		},
182
		Spec: nvidiacomv1alpha1.DynamoComponentSpec{
183
			DynamoComponent: dynamoDeployment.Spec.DynamoGraph,
Neelay Shah's avatar
Neelay Shah committed
184
185
		},
	}
186
187
188
	_, dynamoComponent, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponent, bool, error) {
		return dynamoComponent, false, nil
	})
Neelay Shah's avatar
Neelay Shah committed
189
	if err != nil {
190
		logger.Error(err, "failed to sync the DynamoComponent")
191
		reason = "failed_to_sync_the_DynamoComponent"
Neelay Shah's avatar
Neelay Shah committed
192
193
		return ctrl.Result{}, err
	}
194
195
196
197
198
199
200
	if !dynamoComponent.IsReady() {
		logger.Info("The DynamoComponent is not ready")
		reason = "dynamoComponent_is_not_ready"
		message = "The DynamoComponent is not ready"
		readyStatus = metav1.ConditionFalse
		return ctrl.Result{}, nil
	}
Neelay Shah's avatar
Neelay Shah committed
201

202
	notReadyDeployments := []string{}
203
204
	// reconcile the dynamoComponentsDeployments
	for serviceName, dynamoComponentDeployment := range dynamoComponentsDeployments {
205
		logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dynamoComponentDeployment)
206
207
208
		_, dynamoComponentDeployment, err = commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*nvidiacomv1alpha1.DynamoComponentDeployment, bool, error) {
			return dynamoComponentDeployment, false, nil
		})
Neelay Shah's avatar
Neelay Shah committed
209
		if err != nil {
210
			logger.Error(err, "failed to sync the DynamoComponentDeployment")
211
			reason = "failed_to_sync_the_DynamoComponentDeployment"
Neelay Shah's avatar
Neelay Shah committed
212
213
			return ctrl.Result{}, err
		}
214
215
		if !dynamoComponentDeployment.Status.IsReady() {
			notReadyDeployments = append(notReadyDeployments, dynamoComponentDeployment.Name)
Neelay Shah's avatar
Neelay Shah committed
216
217
		}
	}
218
	if len(notReadyDeployments) == 0 {
Neelay Shah's avatar
Neelay Shah committed
219
		dynamoDeployment.SetState(ReadyState)
220
221
		reason = "all_deployments_are_ready"
		message = "All deployments are ready"
Neelay Shah's avatar
Neelay Shah committed
222
223
		readyStatus = metav1.ConditionTrue
	} else {
224
225
		reason = "some_deployments_are_not_ready"
		message = fmt.Sprintf("The following deployments are not ready: %v", notReadyDeployments)
Neelay Shah's avatar
Neelay Shah committed
226
227
228
229
230
231
232
		dynamoDeployment.SetState(PendingState)
	}

	return ctrl.Result{}, nil

}

233
// generateDefaultIngressSpec builds the default IngressSpec for the given
// DynamoGraphDeployment from the reconciler's ingress settings.
//
// Ingress is enabled when either a virtual service gateway or an ingress
// controller class name is configured; the host is the deployment's name.
// Optional fields are only set when the matching reconciler setting is
// non-empty. The pointer fields point at the reconciler's own fields.
func (r *DynamoGraphDeploymentReconciler) generateDefaultIngressSpec(dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) *nvidiacomv1alpha1.IngressSpec {
	useVirtualService := r.VirtualServiceGateway != ""
	useIngressController := r.IngressControllerClassName != ""

	spec := &nvidiacomv1alpha1.IngressSpec{
		Enabled:           useVirtualService || useIngressController,
		Host:              dynamoDeployment.Name,
		UseVirtualService: useVirtualService,
	}
	if useIngressController {
		spec.IngressControllerClassName = &r.IngressControllerClassName
	}
	if r.IngressControllerTLSSecret != "" {
		spec.TLS = &nvidiacomv1alpha1.IngressTLSSpec{
			SecretName: r.IngressControllerTLSSecret,
		}
	}
	if r.IngressHostSuffix != "" {
		spec.HostSuffix = &r.IngressHostSuffix
	}
	if useVirtualService {
		spec.VirtualServiceGateway = &r.VirtualServiceGateway
	}
	return spec
}

256
// isEndpointSecured reports whether the exposed endpoint is served over TLS:
// either a virtual service gateway is configured and the operator config says
// virtual services support HTTPS, or an ingress TLS secret is configured.
func (r *DynamoGraphDeploymentReconciler) isEndpointSecured() bool {
	virtualServiceHTTPS := r.VirtualServiceGateway != "" && r.Config.VirtualServiceSupportsHTTPS
	return virtualServiceHTTPS || r.IngressControllerTLSSecret != ""
}

263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
// mergeEnvs merges the common environment variables with the service-specific
// ones; on a name clash the service-specific variable wins.
//
// The result order is deterministic: variables keep the position of their
// first occurrence, with all of common first and then the names that only
// appear in specific. A stable order matters because the slice ends up in a
// generated resource spec (a random order would make the spec differ on every
// reconcile) and because Kubernetes only expands $(VAR) references to
// variables defined earlier in the list.
//
// The returned slice is always non-nil and contains each name exactly once.
func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
	merged := make([]corev1.EnvVar, 0, len(common)+len(specific))
	// position maps an env var name to its index in merged.
	position := make(map[string]int, len(common)+len(specific))

	// common is processed first so that specific entries override it.
	for _, group := range [][]corev1.EnvVar{common, specific} {
		for _, env := range group {
			if idx, seen := position[env.Name]; seen {
				// A later definition replaces the earlier one in place.
				merged[idx] = env
				continue
			}
			position[env.Name] = len(merged)
			merged = append(merged, env)
		}
	}
	return merged
}

284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
// updateDynDeploymentConfig rewrites the DYN_DEPLOYMENT_CONFIG env var of the
// given dynamoDeploymentComponent when it is the main component.
//
// The env var value is decoded as a JSON object; if the entry for the
// component's service name is an object that already has a "port" key, that
// port is replaced by newPort. The document is then re-encoded and stored
// back, whether or not a port was changed. Only the first env var with that
// name is processed. Components that are not the main component, or that do
// not carry the env var, are left untouched.
//
// An error is returned when the value cannot be decoded or re-encoded.
func updateDynDeploymentConfig(dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment, newPort int) error {
	if !dynamoDeploymentComponent.IsMainComponent() {
		return nil
	}

	envs := dynamoDeploymentComponent.Spec.Envs
	for idx := range envs {
		if envs[idx].Name != DYN_DEPLOYMENT_CONFIG_ENV_VAR {
			continue
		}

		var parsed map[string]any
		if err := json.Unmarshal([]byte(envs[idx].Value), &parsed); err != nil {
			return fmt.Errorf("failed to unmarshal %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
		}

		// Only touch the port when the service section exists, is an object
		// and already declares one.
		section, isObject := parsed[dynamoDeploymentComponent.Spec.ServiceName].(map[string]any)
		if isObject {
			if _, hasPort := section["port"]; hasPort {
				section["port"] = newPort
			}
		}

		encoded, err := json.Marshal(parsed)
		if err != nil {
			return fmt.Errorf("failed to marshal updated config: %w", err)
		}

		// envs shares its backing array with Spec.Envs, so this updates the component.
		envs[idx].Value = string(encoded)
		return nil
	}
	return nil
}

317
318
319
320
321
322
323
324
325
// overrideWithDynDeploymentConfig applies the settings found in the
// DYN_DEPLOYMENT_CONFIG env var of dynamoDeploymentComponent to its spec.
//
// Only the first env var with that name is considered. From the entry
// matching the component's service name:
//   - ServiceArgs.Workers becomes the replica count, but only when the spec
//     does not already set one;
//   - ServiceArgs.Resources (GPU, CPU, Memory, Custom), when present, is
//     written to both the requests and the limits of the spec, creating the
//     missing containers on the way, and the LWS annotations are then set.
//
// An error is returned when the env var cannot be parsed or when setting the
// LWS annotations fails.
func overrideWithDynDeploymentConfig(ctx context.Context, dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment) error {
	for _, env := range dynamoDeploymentComponent.Spec.Envs {
		if env.Name != DYN_DEPLOYMENT_CONFIG_ENV_VAR {
			continue
		}

		dynDeploymentConfig, err := dynamo.ParseDynDeploymentConfig(ctx, []byte(env.Value))
		if err != nil {
			return fmt.Errorf("failed to parse %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
		}

		componentDynConfig := dynDeploymentConfig[dynamoDeploymentComponent.Spec.ServiceName]
		if componentDynConfig == nil || componentDynConfig.ServiceArgs == nil {
			// nothing to override for this service
			return nil
		}
		serviceArgs := componentDynConfig.ServiceArgs

		if serviceArgs.Workers != nil && dynamoDeploymentComponent.Spec.Replicas == nil {
			// we only override the replicas if it is not set in the CRD.
			// replicas, if set in the CRD, must always be the source of truth.
			dynamoDeploymentComponent.Spec.Replicas = serviceArgs.Workers
		}

		if serviceArgs.Resources == nil {
			return nil
		}

		// make sure the requests and limits containers exist, reusing the ones already set
		if dynamoDeploymentComponent.Spec.Resources == nil {
			dynamoDeploymentComponent.Spec.Resources = &dynamoCommon.Resources{}
		}
		if dynamoDeploymentComponent.Spec.Resources.Requests == nil {
			dynamoDeploymentComponent.Spec.Resources.Requests = &dynamoCommon.ResourceItem{}
		}
		if dynamoDeploymentComponent.Spec.Resources.Limits == nil {
			dynamoDeploymentComponent.Spec.Resources.Limits = &dynamoCommon.ResourceItem{}
		}
		requests := dynamoDeploymentComponent.Spec.Resources.Requests
		limits := dynamoDeploymentComponent.Spec.Resources.Limits

		// every configured resource is applied to both requests and limits
		if gpu := serviceArgs.Resources.GPU; gpu != nil {
			requests.GPU = *gpu
			limits.GPU = *gpu
		}
		if cpu := serviceArgs.Resources.CPU; cpu != nil {
			requests.CPU = *cpu
			limits.CPU = *cpu
		}
		if memory := serviceArgs.Resources.Memory; memory != nil {
			requests.Memory = *memory
			limits.Memory = *memory
		}
		if serviceArgs.Resources.Custom != nil {
			requests.Custom = serviceArgs.Resources.Custom
			limits.Custom = serviceArgs.Resources.Custom
		}

		if err := dynamo.SetLwsAnnotations(serviceArgs, dynamoDeploymentComponent); err != nil {
			return err
		}
		return nil
	}
	return nil
}

378
// FinalizeResource is the cleanup hook for a DynamoGraphDeployment; the
// reconciler passes itself to the finalizer handling in Reconcile.
// It currently performs no cleanup and always returns nil.
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	// for now doing nothing
	return nil
}

Neelay Shah's avatar
Neelay Shah committed
383
// SetupWithManager sets up the controller with the Manager.
384
func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
Neelay Shah's avatar
Neelay Shah committed
385
	return ctrl.NewControllerManagedBy(mgr).
386
		For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates(
387
388
			predicate.GenerationChangedPredicate{},
		)).
389
390
		Named("dynamographdeployment").
		Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{
Neelay Shah's avatar
Neelay Shah committed
391
392
393
394
395
396
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
397
398
399
400
401
402
403
		Owns(&nvidiacomv1alpha1.DynamoComponent{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
Neelay Shah's avatar
Neelay Shah committed
404
405
406
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config)).
		Complete(r)
}
407
408
409
410

// GetRecorder returns the event recorder configured on the reconciler.
func (r *DynamoGraphDeploymentReconciler) GetRecorder() record.EventRecorder {
	return r.Recorder
}