"lib/runtime/Cargo.lock" did not exist on "8588e33a464d9f82d6ad93a433590a3bc3ff92de"
dynamographdeployment_controller.go 10.9 KB
Newer Older
Neelay Shah's avatar
Neelay Shah committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package controller

import (
	"context"
22
	"fmt"
Neelay Shah's avatar
Neelay Shah committed
23
24

	"dario.cat/mergo"
25
	corev1 "k8s.io/api/core/v1"
Neelay Shah's avatar
Neelay Shah committed
26
27
28
29
30
31
32
33
34
35
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

36
37
38
	nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
	commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
Neelay Shah's avatar
Neelay Shah committed
39
40
41
42
43
44
45
46
)

// State values written to a DynamoGraphDeployment through SetState by Reconcile.
const (
	// FailedState is set by Reconcile's deferred status handler when the
	// reconciliation ended with an error.
	FailedState  = "failed"
	// ReadyState is set once every DynamoComponentDeployment reports ready.
	// Note that the stored value is "successful", not "ready".
	ReadyState   = "successful"
	// PendingState is set while at least one DynamoComponentDeployment is not
	// ready yet.
	PendingState = "pending"
)

47
48
49
50
// etcdStorage is the minimal storage abstraction declared for this controller.
// NOTE(review): nothing in the visible part of this file references it —
// confirm whether it is still needed.
type etcdStorage interface {
	// DeleteKeys takes a context and a key prefix and reports failure through
	// the returned error; presumably it removes every key stored under prefix —
	// TODO confirm against the implementation.
	DeleteKeys(ctx context.Context, prefix string) error
}

51
52
// DynamoGraphDeploymentReconciler reconciles a DynamoGraphDeployment object
type DynamoGraphDeploymentReconciler struct {
Neelay Shah's avatar
Neelay Shah committed
53
	client.Client
54
55
56
57
58
59
60
	Scheme                     *runtime.Scheme
	Config                     commonController.Config
	Recorder                   record.EventRecorder
	VirtualServiceGateway      string
	IngressControllerClassName string
	IngressControllerTLSSecret string
	IngressHostSuffix          string
Neelay Shah's avatar
Neelay Shah committed
61
62
}

63
64
65
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
Neelay Shah's avatar
Neelay Shah committed
66
67
68
69

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
70
// the DynamoGraphDeployment object against the actual cluster state, and then
Neelay Shah's avatar
Neelay Shah committed
71
72
73
74
75
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
76
func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Neelay Shah's avatar
Neelay Shah committed
77
78
79
80
	logger := log.FromContext(ctx)

	var err error
	reason := "undefined"
81
	message := ""
Neelay Shah's avatar
Neelay Shah committed
82
83
	readyStatus := metav1.ConditionFalse
	// retrieve the CRD
84
	dynamoDeployment := &nvidiacomv1alpha1.DynamoGraphDeployment{}
Neelay Shah's avatar
Neelay Shah committed
85
86
87
88
89
90
91
92
93
94
95
96
97
98
	if err = r.Get(ctx, req.NamespacedName, dynamoDeployment); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	if err != nil {
		// not found, nothing to do
		return ctrl.Result{}, nil
	}

	defer func() {
		if err != nil {
			dynamoDeployment.SetState(FailedState)
			message = err.Error()
		}
		// update the CRD status condition
99
100
101
102
103
104
105
		dynamoDeployment.AddStatusCondition(metav1.Condition{
			Type:               "Ready",
			Status:             readyStatus,
			Reason:             reason,
			Message:            message,
			LastTransitionTime: metav1.Now(),
		})
Neelay Shah's avatar
Neelay Shah committed
106
107
108
109
110
111
112
		err = r.Status().Update(ctx, dynamoDeployment)
		if err != nil {
			logger.Error(err, "Unable to update the CRD status", "crd", req.NamespacedName)
		}
		logger.Info("Reconciliation done")
	}()

113
114
115
116
117
118
119
120
121
	deleted, err := commonController.HandleFinalizer(ctx, dynamoDeployment, r.Client, r)
	if err != nil {
		reason = "failed_to_handle_the_finalizer"
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}

122
123
	// fetch the dynamoGraphConfig
	dynamoGraphConfig, err := dynamo.GetDynamoGraphConfig(ctx, dynamoDeployment, r.Recorder)
Neelay Shah's avatar
Neelay Shah committed
124
	if err != nil {
125
		reason = "failed_to_get_the_DynamoGraphConfig"
Neelay Shah's avatar
Neelay Shah committed
126
127
128
		return ctrl.Result{}, err
	}

129
130
	// generate the dynamoComponentsDeployments from the config
	dynamoComponentsDeployments, err := dynamo.GenerateDynamoComponentsDeployments(ctx, dynamoDeployment, dynamoGraphConfig, r.generateDefaultIngressSpec(dynamoDeployment))
Neelay Shah's avatar
Neelay Shah committed
131
	if err != nil {
132
		reason = "failed_to_generate_the_DynamoComponentsDeployments"
Neelay Shah's avatar
Neelay Shah committed
133
134
135
		return ctrl.Result{}, err
	}

136
137
	// merge the dynamoComponentsDeployments with the dynamoComponentsDeployments from the CRD
	for serviceName, deployment := range dynamoComponentsDeployments {
Neelay Shah's avatar
Neelay Shah committed
138
		if _, ok := dynamoDeployment.Spec.Services[serviceName]; ok {
139
			err := mergo.Merge(&deployment.Spec.DynamoComponentDeploymentSharedSpec, dynamoDeployment.Spec.Services[serviceName].DynamoComponentDeploymentSharedSpec, mergo.WithOverride)
Neelay Shah's avatar
Neelay Shah committed
140
			if err != nil {
141
				reason = "failed_to_merge_the_DynamoComponentsDeployments"
Neelay Shah's avatar
Neelay Shah committed
142
143
144
				return ctrl.Result{}, err
			}
		}
145
		if deployment.Spec.Ingress.Enabled {
146
			dynamoDeployment.SetEndpointStatus(r.isEndpointSecured(), getIngressHost(deployment.Spec.Ingress))
147
		}
Neelay Shah's avatar
Neelay Shah committed
148
149
	}

150
151
	// Set common env vars on each of the dynamoComponentsDeployments
	for _, deployment := range dynamoComponentsDeployments {
152
153
154
155
156
		if len(dynamoDeployment.Spec.Envs) > 0 {
			deployment.Spec.Envs = mergeEnvs(dynamoDeployment.Spec.Envs, deployment.Spec.Envs)
		}
	}

157
	// reconcile the dynamoComponent
158
	// for now we use the same component for all the services and we differentiate them by the service name when launching the component
159
	dynamoComponent := &nvidiacomv1alpha1.DynamoComponent{
Neelay Shah's avatar
Neelay Shah committed
160
		ObjectMeta: metav1.ObjectMeta{
161
			Name:      getK8sName(dynamoDeployment.Spec.DynamoGraph),
Neelay Shah's avatar
Neelay Shah committed
162
163
			Namespace: dynamoDeployment.Namespace,
		},
164
		Spec: nvidiacomv1alpha1.DynamoComponentSpec{
165
			DynamoComponent: dynamoDeployment.Spec.DynamoGraph,
Neelay Shah's avatar
Neelay Shah committed
166
167
		},
	}
168
169
	if err := ctrl.SetControllerReference(dynamoDeployment, dynamoComponent, r.Scheme); err != nil {
		reason = "failed_to_set_the_controller_reference_for_the_DynamoComponent"
Neelay Shah's avatar
Neelay Shah committed
170
171
		return ctrl.Result{}, err
	}
172
	dynamoComponent, err = commonController.SyncResource(ctx, r.Client, dynamoComponent, false)
Neelay Shah's avatar
Neelay Shah committed
173
	if err != nil {
174
		reason = "failed_to_sync_the_DynamoComponent"
Neelay Shah's avatar
Neelay Shah committed
175
176
		return ctrl.Result{}, err
	}
177
178
179
180
181
182
183
	if !dynamoComponent.IsReady() {
		logger.Info("The DynamoComponent is not ready")
		reason = "dynamoComponent_is_not_ready"
		message = "The DynamoComponent is not ready"
		readyStatus = metav1.ConditionFalse
		return ctrl.Result{}, nil
	}
Neelay Shah's avatar
Neelay Shah committed
184

185
	notReadyDeployments := []string{}
186
187
	// reconcile the dynamoComponentsDeployments
	for serviceName, dynamoComponentDeployment := range dynamoComponentsDeployments {
188
		logger.Info("Reconciling the DynamoComponentDeployment", "serviceName", serviceName, "dynamoComponentDeployment", dynamoComponentDeployment)
189
190
		if err := ctrl.SetControllerReference(dynamoDeployment, dynamoComponentDeployment, r.Scheme); err != nil {
			reason = "failed_to_set_the_controller_reference_for_the_DynamoComponentDeployment"
Neelay Shah's avatar
Neelay Shah committed
191
192
			return ctrl.Result{}, err
		}
193
		dynamoComponentDeployment, err = commonController.SyncResource(ctx, r.Client, dynamoComponentDeployment, false)
Neelay Shah's avatar
Neelay Shah committed
194
		if err != nil {
195
			reason = "failed_to_sync_the_DynamoComponentDeployment"
Neelay Shah's avatar
Neelay Shah committed
196
197
			return ctrl.Result{}, err
		}
198
199
		if !dynamoComponentDeployment.Status.IsReady() {
			notReadyDeployments = append(notReadyDeployments, dynamoComponentDeployment.Name)
Neelay Shah's avatar
Neelay Shah committed
200
201
		}
	}
202
	if len(notReadyDeployments) == 0 {
Neelay Shah's avatar
Neelay Shah committed
203
		dynamoDeployment.SetState(ReadyState)
204
205
		reason = "all_deployments_are_ready"
		message = "All deployments are ready"
Neelay Shah's avatar
Neelay Shah committed
206
207
		readyStatus = metav1.ConditionTrue
	} else {
208
209
		reason = "some_deployments_are_not_ready"
		message = fmt.Sprintf("The following deployments are not ready: %v", notReadyDeployments)
Neelay Shah's avatar
Neelay Shah committed
210
211
212
213
214
215
216
		dynamoDeployment.SetState(PendingState)
	}

	return ctrl.Result{}, nil

}

217
// generateDefaultIngressSpec builds the ingress spec used as the default for
// the services of dynamoDeployment, derived from the reconciler's ingress
// settings.
//
// The ingress is enabled when either a virtual service gateway or an ingress
// controller class name is configured, and the host is the deployment's name.
// Optional fields are only populated when the matching setting is non-empty.
//
// Every pointer field refers to a per-call copy of the setting rather than to
// the reconciler's own field, so a write through the returned spec cannot
// alter the controller configuration shared by later reconciliations.
func (r *DynamoGraphDeploymentReconciler) generateDefaultIngressSpec(dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) *nvidiacomv1alpha1.IngressSpec {
	res := &nvidiacomv1alpha1.IngressSpec{
		Enabled:           r.VirtualServiceGateway != "" || r.IngressControllerClassName != "",
		Host:              dynamoDeployment.Name,
		UseVirtualService: r.VirtualServiceGateway != "",
	}
	if className := r.IngressControllerClassName; className != "" {
		res.IngressControllerClassName = &className
	}
	if r.IngressControllerTLSSecret != "" {
		res.TLS = &nvidiacomv1alpha1.IngressTLSSpec{
			SecretName: r.IngressControllerTLSSecret,
		}
	}
	if hostSuffix := r.IngressHostSuffix; hostSuffix != "" {
		res.HostSuffix = &hostSuffix
	}
	if gateway := r.VirtualServiceGateway; gateway != "" {
		res.VirtualServiceGateway = &gateway
	}
	return res
}

240
// isEndpointSecured reports whether the exposed endpoint is secured: either a
// virtual service gateway is configured and the operator config says virtual
// services support HTTPS, or an ingress controller TLS secret is configured.
func (r *DynamoGraphDeploymentReconciler) isEndpointSecured() bool {
	httpsViaVirtualService := r.VirtualServiceGateway != "" && r.Config.VirtualServiceSupportsHTTPS
	return httpsViaVirtualService || r.IngressControllerTLSSecret != ""
}

247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// mergeEnvs combines the common environment variables with the
// service-specific ones, keyed by variable name.
//
// When a name occurs more than once the last occurrence wins, with specific
// entries taking precedence over common ones. The result is deterministic:
// each name keeps the position of its first occurrence, common entries first
// and then the names that only appear in specific. A stable order matters
// because the slice ends up in a resource spec, where a reshuffled list would
// look like a change on every reconciliation.
//
// The returned slice is always non-nil and never aliases the inputs.
func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
	capacity := len(common) + len(specific)
	merged := make([]corev1.EnvVar, 0, capacity)
	// position maps a variable name to its index in merged.
	position := make(map[string]int, capacity)

	for _, envs := range [][]corev1.EnvVar{common, specific} {
		for _, env := range envs {
			if i, seen := position[env.Name]; seen {
				// Override in place so the variable keeps its original slot.
				merged[i] = env
				continue
			}
			position[env.Name] = len(merged)
			merged = append(merged, env)
		}
	}
	return merged
}

268
// FinalizeResource is the finalization hook of the reconciler. Reconcile
// passes the reconciler itself to commonController.HandleFinalizer, which
// presumably calls this method before the finalizer is removed — confirm
// against HandleFinalizer.
//
// There is currently nothing to clean up, so it always returns nil.
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	// for now doing nothing
	return nil
}

Neelay Shah's avatar
Neelay Shah committed
273
// SetupWithManager sets up the controller with the Manager.
274
func (r *DynamoGraphDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
Neelay Shah's avatar
Neelay Shah committed
275
	return ctrl.NewControllerManagedBy(mgr).
276
		For(&nvidiacomv1alpha1.DynamoGraphDeployment{}, builder.WithPredicates(
277
278
			predicate.GenerationChangedPredicate{},
		)).
279
280
		Named("dynamographdeployment").
		Owns(&nvidiacomv1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.Funcs{
Neelay Shah's avatar
Neelay Shah committed
281
282
283
284
285
286
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
287
288
289
290
291
292
293
		Owns(&nvidiacomv1alpha1.DynamoComponent{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
Neelay Shah's avatar
Neelay Shah committed
294
295
296
		WithEventFilter(commonController.EphemeralDeploymentEventFilter(r.Config)).
		Complete(r)
}