dynamocomponentdeployment_controller.go 53.1 KB
Newer Older
1
/*
 * SPDX-FileCopyrightText: Copyright (c) 2022 Atalaya Tech. Inc
 * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
 */

package controller

import (
	"context"
	"fmt"
	"os"
	"sort"
	"strconv"
	"strings"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"emperror.dev/errors"
38
39
40
41
42
43
44
	dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/schemas"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/config"
	commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
	"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
	commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
45
46
47
48
49
50
51
52
53
54
55
56
57
58
	"github.com/huandu/xstrings"
	istioNetworking "istio.io/api/networking/v1beta1"
	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/ptr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
59
	"sigs.k8s.io/controller-runtime/pkg/event"
60
61
62
63
64
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Controller-wide names and annotation keys.
//
// The KubeAnnotationEnable* keys are looked up in
// DynamoComponentDeployment.Spec.Annotations by the checkIf*Enabled helpers;
// they are considered set when their value equals commonconsts.KubeLabelValueTrue.
const (
	DefaultClusterName                                   = "default"
	DefaultServiceAccountName                            = "default"
	KubeValueNameSharedMemory                            = "shared-memory"
	KubeAnnotationDeploymentStrategy                     = "nvidia.com/deployment-strategy"
	KubeAnnotationEnableStealingTrafficDebugMode         = "nvidia.com/enable-stealing-traffic-debug-mode"
	KubeAnnotationEnableDebugMode                        = "nvidia.com/enable-debug-mode"
	KubeAnnotationEnableDebugPodReceiveProductionTraffic = "nvidia.com/enable-debug-pod-receive-production-traffic"
	DeploymentTargetTypeProduction                       = "production"
	DeploymentTargetTypeDebug                            = "debug"
	HeaderNameDebug                                      = "X-Nvidia-Debug"
	DefaultIngressSuffix                                 = "local"
	KubernetesDeploymentStrategy                         = "kubernetes"
)

79
80
// DynamoComponentDeploymentReconciler reconciles a DynamoComponentDeployment object
type DynamoComponentDeploymentReconciler struct {
81
	client.Client
82
83
84
85
86
87
	Recorder          record.EventRecorder
	Config            controller_common.Config
	NatsAddr          string
	EtcdAddr          string
	EtcdStorage       etcdStorage
	UseVirtualService bool
88
89
}

90
91
92
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=nvidia.com,resources=dynamocomponentdeployments/finalizers,verbs=update
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingressclasses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.istio.io,resources=virtualservices,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;create;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
110
// the DynamoComponentDeployment object against the actual cluster state, and then
111
112
113
114
115
116
117
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.2/pkg/reconcile
//
//nolint:gocyclo,nakedret
118
func (r *DynamoComponentDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
119
120
	logs := log.FromContext(ctx)

121
122
	dynamoComponentDeployment := &v1alpha1.DynamoComponentDeployment{}
	err = r.Get(ctx, req.NamespacedName, dynamoComponentDeployment)
123
124
125
126
	if err != nil {
		if k8serrors.IsNotFound(err) {
			// Object not found, return.  Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
127
			logs.Info("DynamoComponentDeployment resource not found. Ignoring since object must be deleted.")
128
129
130
131
			err = nil
			return
		}
		// Error reading the object - requeue the request.
132
		logs.Error(err, "Failed to get DynamoComponentDeployment.")
133
134
135
		return
	}

136
	logs = logs.WithValues("dynamoComponentDeployment", dynamoComponentDeployment.Name, "namespace", dynamoComponentDeployment.Namespace)
137

138
	deleted, err := commonController.HandleFinalizer(ctx, dynamoComponentDeployment, r.Client, r)
139
140
141
142
143
144
145
146
	if err != nil {
		logs.Error(err, "Failed to handle finalizer")
		return ctrl.Result{}, err
	}
	if deleted {
		return ctrl.Result{}, nil
	}

147
148
149
150
151
	if len(dynamoComponentDeployment.Status.Conditions) == 0 {
		logs.Info("Starting to reconcile DynamoComponentDeployment")
		logs.Info("Initializing DynamoComponentDeployment status")
		r.Recorder.Event(dynamoComponentDeployment, corev1.EventTypeNormal, "Reconciling", "Starting to reconcile DynamoComponentDeployment")
		dynamoComponentDeployment, err = r.setStatusConditions(ctx, req,
152
			metav1.Condition{
153
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
154
155
				Status:  metav1.ConditionUnknown,
				Reason:  "Reconciling",
156
				Message: "Starting to reconcile DynamoComponentDeployment",
157
158
			},
			metav1.Condition{
159
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeDynamoComponentReady,
160
161
				Status:  metav1.ConditionUnknown,
				Reason:  "Reconciling",
162
				Message: "Starting to reconcile DynamoComponentDeployment",
163
164
165
166
167
168
169
170
171
172
173
			},
		)
		if err != nil {
			return
		}
	}

	defer func() {
		if err == nil {
			return
		}
174
175
		logs.Error(err, "Failed to reconcile DynamoComponentDeployment.")
		r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeWarning, "ReconcileError", "Failed to reconcile DynamoComponentDeployment: %v", err)
176
177
		_, err = r.setStatusConditions(ctx, req,
			metav1.Condition{
178
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
179
180
				Status:  metav1.ConditionFalse,
				Reason:  "Reconciling",
181
				Message: fmt.Sprintf("Failed to reconcile DynamoComponentDeployment: %v", err),
182
183
184
185
186
187
188
			},
		)
		if err != nil {
			return
		}
	}()

189
	// retrieve the dynamo component
190
	dynamoComponentCR := &v1alpha1.DynamoComponent{}
191
192
193
	err = r.Get(ctx, types.NamespacedName{Name: getK8sName(dynamoComponentDeployment.Spec.DynamoComponent), Namespace: dynamoComponentDeployment.Namespace}, dynamoComponentCR)
	if err != nil {
		logs.Error(err, "Failed to get DynamoComponent")
194
195
		return
	}
196
197
198
199
200

	// check if the component is ready
	if dynamoComponentCR.IsReady() {
		logs.Info(fmt.Sprintf("DynamoComponent %s ready", dynamoComponentDeployment.Spec.DynamoComponent))
		r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "GetDynamoComponent", "DynamoComponent %s is ready", dynamoComponentDeployment.Spec.DynamoComponent)
201
		dynamoComponentDeployment, err = r.setStatusConditions(ctx, req,
202
			metav1.Condition{
203
204
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeDynamoComponentReady,
				Status:  metav1.ConditionTrue,
205
				Reason:  "Reconciling",
206
				Message: "DynamoComponent is ready",
207
208
209
210
211
			},
		)
		if err != nil {
			return
		}
212
213
214
215
	} else {
		logs.Info(fmt.Sprintf("DynamoComponent %s not ready", dynamoComponentDeployment.Spec.DynamoComponent))
		r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeWarning, "GetDynamoComponent", "DynamoComponent %s is not ready", dynamoComponentDeployment.Spec.DynamoComponent)
		_, err_ := r.setStatusConditions(ctx, req,
216
			metav1.Condition{
217
218
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeDynamoComponentReady,
				Status:  metav1.ConditionFalse,
219
				Reason:  "Reconciling",
220
				Message: "DynamoComponent not ready",
221
222
			},
			metav1.Condition{
223
224
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
				Status:  metav1.ConditionFalse,
225
				Reason:  "Reconciling",
226
				Message: "DynamoComponent not ready",
227
228
			},
		)
229
230
		err = err_
		return
231
232
233
234
235
	}

	modified := false

	// Reconcile PVC
236
	_, err = r.reconcilePVC(ctx, dynamoComponentDeployment)
237
238
239
240
241
242
	if err != nil {
		logs.Error(err, "Unable to create PVC", "crd", req.NamespacedName)
		return ctrl.Result{}, err
	}

	// create or update api-server deployment
243
	modified_, deployment, err := r.createOrUpdateOrDeleteDeployments(ctx, generateResourceOption{
244
245
		dynamoComponentDeployment: dynamoComponentDeployment,
		dynamoComponent:           dynamoComponentCR,
246
247
248
249
250
251
252
253
254
255
	})
	if err != nil {
		return
	}

	if modified_ {
		modified = true
	}

	// create or update api-server hpa
256
257
258
259
260
261
	modified_, _, err = commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*autoscalingv2.HorizontalPodAutoscaler, bool, error) {
		return r.generateHPA(generateResourceOption{
			dynamoComponentDeployment: dynamoComponentDeployment,
			dynamoComponent:           dynamoComponentCR,
		})
	})
262
263
264
265
266
267
268
269
270
	if err != nil {
		return
	}

	if modified_ {
		modified = true
	}

	// create or update api-server service
271
	modified_, err = r.createOrUpdateOrDeleteServices(ctx, generateResourceOption{
272
273
		dynamoComponentDeployment: dynamoComponentDeployment,
		dynamoComponent:           dynamoComponentCR,
274
275
276
277
278
279
280
281
282
283
	})
	if err != nil {
		return
	}

	if modified_ {
		modified = true
	}

	// create or update api-server ingresses
284
	modified_, err = r.createOrUpdateOrDeleteIngress(ctx, generateResourceOption{
285
286
		dynamoComponentDeployment: dynamoComponentDeployment,
		dynamoComponent:           dynamoComponentCR,
287
	})
288
289
290
291
292
293
294
295
296
	if err != nil {
		return
	}

	if modified_ {
		modified = true
	}

	if !modified {
297
		r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "UpdateDynamoGraphDeployment", "No changes to dynamo deployment %s", dynamoComponentDeployment.Name)
298
299
300
	}

	logs.Info("Finished reconciling.")
301
	r.Recorder.Eventf(dynamoComponentDeployment, corev1.EventTypeNormal, "Update", "All resources updated!")
302
	err = r.computeAvailableStatusCondition(ctx, req, deployment)
303
304
305
	return
}

306
// FinalizeResource performs cleanup for a DynamoComponentDeployment that is
// being deleted (the reconciler is handed to commonController.HandleFinalizer
// in Reconcile, presumably as the finalizer callback — confirm there).
//
// When both Spec.ServiceName and a non-empty Spec.DynamoNamespace are set, it
// deletes the etcd keys under "/<dynamoNamespace>/components/<serviceName>"
// via r.EtcdStorage.DeleteKeys and returns any error from that call.
// Otherwise there is nothing to clean up and it returns nil.
func (r *DynamoComponentDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) error {
	logger := log.FromContext(ctx)
	logger.Info("Finalizing the DynamoComponentDeployment", "dynamoComponentDeployment", dynamoComponentDeployment)

	spec := &dynamoComponentDeployment.Spec
	if spec.ServiceName == "" || spec.DynamoNamespace == nil || *spec.DynamoNamespace == "" {
		return nil
	}

	service, dynamoNamespace := spec.ServiceName, *spec.DynamoNamespace
	logger.Info("Deleting the etcd keys for the service", "service", service, "dynamoNamespace", dynamoNamespace)
	keyPrefix := fmt.Sprintf("/%s/components/%s", dynamoNamespace, service)
	if deleteErr := r.EtcdStorage.DeleteKeys(ctx, keyPrefix); deleteErr != nil {
		logger.Error(deleteErr, "Failed to delete the etcd keys for the service", "service", service, "dynamoNamespace", dynamoNamespace)
		return deleteErr
	}
	return nil
}

320
func (r *DynamoComponentDeploymentReconciler) computeAvailableStatusCondition(ctx context.Context, req ctrl.Request, deployment *appsv1.Deployment) error {
321
322
323
324
325
	logs := log.FromContext(ctx)
	if IsDeploymentReady(deployment) {
		logs.Info("Deployment is ready. Setting available status condition to true.")
		_, err := r.setStatusConditions(ctx, req,
			metav1.Condition{
326
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
327
328
329
330
331
332
333
334
335
336
				Status:  metav1.ConditionTrue,
				Reason:  "DeploymentReady",
				Message: "Deployment is ready",
			},
		)
		return err
	} else {
		logs.Info("Deployment is not ready. Setting available status condition to false.")
		_, err := r.setStatusConditions(ctx, req,
			metav1.Condition{
337
				Type:    v1alpha1.DynamoGraphDeploymentConditionTypeAvailable,
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
				Status:  metav1.ConditionFalse,
				Reason:  "DeploymentNotReady",
				Message: "Deployment is not ready",
			},
		)
		return err
	}
}

// IsDeploymentReady reports whether a Kubernetes Deployment is fully rolled out
// and available.
//
// A nil or paused Deployment is never ready. A Deployment whose desired replica
// count is zero (Spec.Replicas defaults to 1 when unset) is always ready.
// Otherwise the Deployment controller must have observed the current
// generation, all desired replicas must be updated and available, and the
// DeploymentAvailable condition must be True.
func IsDeploymentReady(deployment *appsv1.Deployment) bool {
	if deployment == nil || deployment.Spec.Paused {
		return false
	}

	want := int32(1)
	if replicas := deployment.Spec.Replicas; replicas != nil {
		want = *replicas
	}
	if want == 0 {
		return true
	}

	st := deployment.Status
	rolledOut := st.ObservedGeneration >= deployment.Generation &&
		st.UpdatedReplicas >= want &&
		st.AvailableReplicas >= want
	if !rolledOut {
		return false
	}

	// Defer to Kubernetes' own availability assessment as the final word.
	for i := range st.Conditions {
		c := &st.Conditions[i]
		if c.Type == appsv1.DeploymentAvailable && c.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

388
// reconcilePVC ensures the PVC described by crd.Spec.PVC exists.
//
// It returns (nil, nil) when the spec declares no PVC. An existing PVC is
// returned as-is; this function never updates it. A missing PVC is created,
// owned by crd through a controller reference, only when pvcConfig.Create is
// true. Otherwise the NotFound error from the lookup is returned.
func (r *DynamoComponentDeploymentReconciler) reconcilePVC(ctx context.Context, crd *v1alpha1.DynamoComponentDeployment) (*corev1.PersistentVolumeClaim, error) {
	logger := log.FromContext(ctx)
	if crd.Spec.PVC == nil {
		return nil, nil
	}
	pvcConfig := *crd.Spec.PVC
	pvc := &corev1.PersistentVolumeClaim{}
	pvcName := types.NamespacedName{Name: getPvcName(crd, pvcConfig.Name), Namespace: crd.GetNamespace()}

	err := r.Get(ctx, pvcName, pvc)
	if err == nil {
		return pvc, nil
	}
	if !k8serrors.IsNotFound(err) {
		logger.Error(err, "Unable to retrieve PVC", "crd", crd.GetName())
		return nil, err
	}

	// The PVC does not exist: only create it when the spec explicitly opts in.
	if pvcConfig.Create == nil || !*pvcConfig.Create {
		// Log the name that was looked up; pvc is still the empty object
		// allocated above, so pvc.Name would be "".
		logger.Error(err, "Unknown PVC", "pvc", pvcName)
		return nil, err
	}

	pvc = constructPVC(crd, pvcConfig)
	if err := controllerutil.SetControllerReference(crd, pvc, r.Client.Scheme()); err != nil {
		logger.Error(err, "Failed to set controller reference", "pvc", pvc.Name)
		return nil, err
	}
	if err := r.Create(ctx, pvc); err != nil {
		logger.Error(err, "Failed to create pvc", "pvc", pvc.Name)
		return nil, err
	}
	logger.Info("PVC created", "pvc", pvcName)
	return pvc, nil
}

423
424
// setStatusConditions merges conditions into the status of the
// DynamoComponentDeployment identified by req and returns a freshly fetched copy.
//
// Each attempt re-fetches the object, applies every condition with
// meta.SetStatusCondition and writes the status subresource. Update conflicts
// are retried for up to maxAttempts attempts in total, pausing briefly between
// attempts; the pause is abandoned if ctx is cancelled. Any other update error,
// or running out of attempts, is returned wrapped. The returned object pointer
// is non-nil even when an error is returned.
func (r *DynamoComponentDeploymentReconciler) setStatusConditions(ctx context.Context, req ctrl.Request, conditions ...metav1.Condition) (dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, err error) {
	const (
		maxAttempts        = 3
		conflictRetryDelay = 100 * time.Millisecond
	)
	dynamoComponentDeployment = &v1alpha1.DynamoComponentDeployment{}
	for attempt := 1; ; attempt++ {
		if err = r.Get(ctx, req.NamespacedName, dynamoComponentDeployment); err != nil {
			err = errors.Wrap(err, "Failed to re-fetch DynamoComponentDeployment")
			return
		}
		for _, condition := range conditions {
			meta.SetStatusCondition(&dynamoComponentDeployment.Status.Conditions, condition)
		}
		err = r.Status().Update(ctx, dynamoComponentDeployment)
		if err == nil || !k8serrors.IsConflict(err) || attempt >= maxAttempts {
			break
		}
		// Conflict: the object changed after we read it. Pause briefly, then
		// re-fetch and retry, unless the caller's context is cancelled.
		select {
		case <-ctx.Done():
			err = errors.Wrap(ctx.Err(), "Failed to update DynamoComponentDeployment status")
			return
		case <-time.After(conflictRetryDelay):
		}
	}
	if err != nil {
		err = errors.Wrap(err, "Failed to update DynamoComponentDeployment status")
		return
	}
	if err = r.Get(ctx, req.NamespacedName, dynamoComponentDeployment); err != nil {
		err = errors.Wrap(err, "Failed to re-fetch DynamoComponentDeployment")
		return
	}
	return
}

//nolint:nakedret
456
457
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteDeployments(ctx context.Context, opt generateResourceOption) (modified bool, depl *appsv1.Deployment, err error) {
	containsStealingTrafficDebugModeEnabled := checkIfContainsStealingTrafficDebugModeEnabled(opt.dynamoComponentDeployment)
458
	// create the main deployment
459
460
461
462
463
464
465
466
	modified, depl, err = commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*appsv1.Deployment, bool, error) {
		return r.generateDeployment(ctx, generateResourceOption{
			dynamoComponentDeployment:               opt.dynamoComponentDeployment,
			dynamoComponent:                         opt.dynamoComponent,
			isStealingTrafficDebugModeEnabled:       false,
			containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
		})
	})
467
468
469
470
	if err != nil {
		err = errors.Wrap(err, "create or update deployment")
		return
	}
471
	// create the debug deployment
472
473
474
475
476
477
478
479
	modified2, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*appsv1.Deployment, bool, error) {
		return r.generateDeployment(ctx, generateResourceOption{
			dynamoComponentDeployment:               opt.dynamoComponentDeployment,
			dynamoComponent:                         opt.dynamoComponent,
			isStealingTrafficDebugModeEnabled:       true,
			containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
		})
	})
480
481
	if err != nil {
		err = errors.Wrap(err, "create or update debug deployment")
482
	}
483
	modified = modified || modified2
484
485
486
	return
}

487
488
// getResourceAnnotations returns Spec.Annotations of the given
// DynamoComponentDeployment, or a new empty map when they are nil.
// When non-nil, the returned map is the spec's own map, not a copy, so
// callers must not mutate it.
func getResourceAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string {
	if annotations := dynamoComponentDeployment.Spec.Annotations; annotations != nil {
		return annotations
	}
	return map[string]string{}
}

func checkIfIsDebugModeEnabled(annotations map[string]string) bool {
	if annotations == nil {
		return false
	}

501
	return annotations[KubeAnnotationEnableDebugMode] == commonconsts.KubeLabelValueTrue
502
503
504
505
506
507
508
}

func checkIfIsStealingTrafficDebugModeEnabled(annotations map[string]string) bool {
	if annotations == nil {
		return false
	}

509
	return annotations[KubeAnnotationEnableStealingTrafficDebugMode] == commonconsts.KubeLabelValueTrue
510
511
512
513
514
515
516
}

func checkIfIsDebugPodReceiveProductionTrafficEnabled(annotations map[string]string) bool {
	if annotations == nil {
		return false
	}

517
	return annotations[KubeAnnotationEnableDebugPodReceiveProductionTraffic] == commonconsts.KubeLabelValueTrue
518
519
}

520
521
// checkIfContainsStealingTrafficDebugModeEnabled reports whether the
// DynamoComponentDeployment's own Spec.Annotations enable the stealing-traffic
// debug mode (see checkIfIsStealingTrafficDebugModeEnabled).
func checkIfContainsStealingTrafficDebugModeEnabled(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) bool {
	specAnnotations := dynamoComponentDeployment.Spec.Annotations
	return checkIfIsStealingTrafficDebugModeEnabled(specAnnotations)
}

//nolint:nakedret
525
526
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteServices(ctx context.Context, opt generateResourceOption) (modified bool, err error) {
	resourceAnnotations := getResourceAnnotations(opt.dynamoComponentDeployment)
527
	isDebugPodReceiveProductionTrafficEnabled := checkIfIsDebugPodReceiveProductionTrafficEnabled(resourceAnnotations)
528
	containsStealingTrafficDebugModeEnabled := checkIfContainsStealingTrafficDebugModeEnabled(opt.dynamoComponentDeployment)
529
	// main generic service
530
531
532
533
534
535
536
537
538
539
	modified, _, err = commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
		return r.generateService(ctx, generateResourceOption{
			dynamoComponentDeployment:               opt.dynamoComponentDeployment,
			dynamoComponent:                         opt.dynamoComponent,
			isStealingTrafficDebugModeEnabled:       false,
			isDebugPodReceiveProductionTraffic:      isDebugPodReceiveProductionTrafficEnabled,
			containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
			isGenericService:                        true,
		})
	})
540
541
542
543
	if err != nil {
		return
	}

544
	// debug production service (if enabled)
545
546
547
548
549
550
551
552
553
554
	modified_, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
		return r.generateService(ctx, generateResourceOption{
			dynamoComponentDeployment:               opt.dynamoComponentDeployment,
			dynamoComponent:                         opt.dynamoComponent,
			isStealingTrafficDebugModeEnabled:       false,
			isDebugPodReceiveProductionTraffic:      isDebugPodReceiveProductionTrafficEnabled,
			containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
			isGenericService:                        false,
		})
	})
555
556
557
	if err != nil {
		return
	}
558
559
	modified = modified || modified_
	// debug service (if enabled)
560
561
562
563
564
565
566
567
568
569
	modified_, _, err = commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
		return r.generateService(ctx, generateResourceOption{
			dynamoComponentDeployment:               opt.dynamoComponentDeployment,
			dynamoComponent:                         opt.dynamoComponent,
			isStealingTrafficDebugModeEnabled:       true,
			isDebugPodReceiveProductionTraffic:      isDebugPodReceiveProductionTrafficEnabled,
			containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
			isGenericService:                        false,
		})
	})
570
	if err != nil {
571
572
		return
	}
573
	modified = modified || modified_
574
575
576
	return
}

577
// createOrUpdateOrDeleteIngress syncs the networking/v1 Ingress and then the
// Istio VirtualService for the component, and returns whether either changed.
// If the Ingress sync fails, the VirtualService is not touched.
//
// NOTE(review): the VirtualService is synced (created or deleted) on every
// call regardless of the reconciler's UseVirtualService field; confirm that
// SyncResource copes with clusters where the Istio VirtualService CRD is absent.
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteIngress(ctx context.Context, opt generateResourceOption) (modified bool, err error) {
	ingressModified, _, ingressErr := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
		return r.generateIngress(ctx, opt)
	})
	if ingressErr != nil {
		return ingressModified, ingressErr
	}

	vsModified, _, vsErr := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
		return r.generateVirtualService(ctx, opt)
	})
	if vsErr != nil {
		return ingressModified, vsErr
	}
	return ingressModified || vsModified, nil
}

594
func (r *DynamoComponentDeploymentReconciler) generateIngress(ctx context.Context, opt generateResourceOption) (*networkingv1.Ingress, bool, error) {
595
	log := log.FromContext(ctx)
596
597
598
599
	log.Info("Starting generateIngress")

	ingress := &networkingv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{
600
601
			Name:      opt.dynamoComponentDeployment.Name,
			Namespace: opt.dynamoComponentDeployment.Namespace,
602
603
		},
	}
604

605
	if !opt.dynamoComponentDeployment.Spec.Ingress.Enabled || opt.dynamoComponentDeployment.Spec.Ingress.IngressControllerClassName == nil {
606
607
		log.Info("Ingress is not enabled")
		return ingress, true, nil
608
	}
609
	host := getIngressHost(opt.dynamoComponentDeployment.Spec.Ingress)
610
611

	ingress.Spec = networkingv1.IngressSpec{
612
		IngressClassName: opt.dynamoComponentDeployment.Spec.Ingress.IngressControllerClassName,
613
614
		Rules: []networkingv1.IngressRule{
			{
615
				Host: host,
616
617
618
619
620
621
622
623
				IngressRuleValue: networkingv1.IngressRuleValue{
					HTTP: &networkingv1.HTTPIngressRuleValue{
						Paths: []networkingv1.HTTPIngressPath{
							{
								Path:     "/",
								PathType: &[]networkingv1.PathType{networkingv1.PathTypePrefix}[0],
								Backend: networkingv1.IngressBackend{
									Service: &networkingv1.IngressServiceBackend{
624
										Name: opt.dynamoComponentDeployment.Name,
625
										Port: networkingv1.ServiceBackendPort{
626
											Number: commonconsts.DynamoServicePort,
627
628
629
630
631
632
633
634
635
										},
									},
								},
							},
						},
					},
				},
			},
		},
636
	}
637

638
	if opt.dynamoComponentDeployment.Spec.Ingress.TLS != nil {
639
640
641
		ingress.Spec.TLS = []networkingv1.IngressTLS{
			{
				Hosts:      []string{host},
642
				SecretName: opt.dynamoComponentDeployment.Spec.Ingress.TLS.SecretName,
643
644
645
646
			},
		}
	}

647
648
649
	return ingress, false, nil
}

650
func (r *DynamoComponentDeploymentReconciler) generateVirtualService(ctx context.Context, opt generateResourceOption) (*networkingv1beta1.VirtualService, bool, error) {
651
652
653
	log := log.FromContext(ctx)
	log.Info("Starting generateVirtualService")

654
655
	vs := &networkingv1beta1.VirtualService{
		ObjectMeta: metav1.ObjectMeta{
656
657
			Name:      opt.dynamoComponentDeployment.Name,
			Namespace: opt.dynamoComponentDeployment.Namespace,
658
		},
659
660
	}

661
	vsEnabled := opt.dynamoComponentDeployment.Spec.Ingress.Enabled && opt.dynamoComponentDeployment.Spec.Ingress.UseVirtualService && opt.dynamoComponentDeployment.Spec.Ingress.VirtualServiceGateway != nil
662
663
664
665
666
667
668
	if !vsEnabled {
		log.Info("VirtualService is not enabled")
		return vs, true, nil
	}

	vs.Spec = istioNetworking.VirtualService{
		Hosts: []string{
669
			getIngressHost(opt.dynamoComponentDeployment.Spec.Ingress),
670
		},
671
		Gateways: []string{*opt.dynamoComponentDeployment.Spec.Ingress.VirtualServiceGateway},
672
673
674
675
676
677
		Http: []*istioNetworking.HTTPRoute{
			{
				Match: []*istioNetworking.HTTPMatchRequest{
					{
						Uri: &istioNetworking.StringMatch{
							MatchType: &istioNetworking.StringMatch_Prefix{Prefix: "/"},
678
679
						},
					},
680
681
682
683
				},
				Route: []*istioNetworking.HTTPRouteDestination{
					{
						Destination: &istioNetworking.Destination{
684
							Host: opt.dynamoComponentDeployment.Name,
685
							Port: &istioNetworking.PortSelector{
686
								Number: commonconsts.DynamoServicePort,
687
688
689
690
691
692
693
							},
						},
					},
				},
			},
		},
	}
694
	return vs, false, nil
695
696
}

697
// getKubeName returns the Kubernetes object name for the component: the
// DynamoComponentDeployment's name, with a "-d" suffix for the debug variant.
func (r *DynamoComponentDeploymentReconciler) getKubeName(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, _ *v1alpha1.DynamoComponent, debug bool) string {
	name := dynamoComponentDeployment.Name
	if debug {
		name += "-d"
	}
	return name
}

704
// getServiceName returns the DynamoComponentDeployment's name suffixed with
// "-d" for the debug Service or "-p" for the production Service.
func (r *DynamoComponentDeploymentReconciler) getServiceName(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, _ *v1alpha1.DynamoComponent, debug bool) string {
	suffix := "-p"
	if debug {
		suffix = "-d"
	}
	return dynamoComponentDeployment.Name + suffix
}

714
715
// getGenericServiceName returns the name of the generic Service, which is the
// non-debug Kubernetes name from getKubeName.
func (r *DynamoComponentDeploymentReconciler) getGenericServiceName(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, dynamoComponent *v1alpha1.DynamoComponent) string {
	const debugVariant = false
	return r.getKubeName(dynamoComponentDeployment, dynamoComponent, debugVariant)
}

718
func (r *DynamoComponentDeploymentReconciler) getKubeLabels(_ *v1alpha1.DynamoComponentDeployment, dynamoComponent *v1alpha1.DynamoComponent) map[string]string {
719
	labels := map[string]string{
720
		commonconsts.KubeLabelDynamoComponent: dynamoComponent.Name,
721
	}
722
	labels[commonconsts.KubeLabelDynamoComponentType] = commonconsts.DynamoApiServerComponentName
723
724
725
	return labels
}

726
727
func (r *DynamoComponentDeploymentReconciler) getKubeAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, dynamoComponent *v1alpha1.DynamoComponent) map[string]string {
	dynamoComponentRepositoryName, dynamoComponentVersion := getDynamoComponentRepositoryNameAndDynamoComponentVersion(dynamoComponent)
728
	annotations := map[string]string{
729
730
		commonconsts.KubeAnnotationDynamoRepository: dynamoComponentRepositoryName,
		commonconsts.KubeAnnotationDynamoVersion:    dynamoComponentVersion,
731
732
	}
	var extraAnnotations map[string]string
733
734
	if dynamoComponentDeployment.Spec.ExtraPodMetadata != nil {
		extraAnnotations = dynamoComponentDeployment.Spec.ExtraPodMetadata.Annotations
735
736
737
738
739
740
741
742
743
744
	} else {
		extraAnnotations = map[string]string{}
	}
	for k, v := range extraAnnotations {
		annotations[k] = v
	}
	return annotations
}

//nolint:nakedret
745
746
func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Context, opt generateResourceOption) (kubeDeployment *appsv1.Deployment, toDelete bool, err error) {
	kubeNs := opt.dynamoComponentDeployment.Namespace
747

748
	labels := r.getKubeLabels(opt.dynamoComponentDeployment, opt.dynamoComponent)
749

750
	annotations := r.getKubeAnnotations(opt.dynamoComponentDeployment, opt.dynamoComponent)
751

752
	kubeName := r.getKubeName(opt.dynamoComponentDeployment, opt.dynamoComponent, opt.isStealingTrafficDebugModeEnabled)
753

754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
	kubeDeployment = &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:        kubeName,
			Namespace:   kubeNs,
			Labels:      labels,
			Annotations: annotations,
		},
	}

	if opt.isStealingTrafficDebugModeEnabled && !opt.containsStealingTrafficDebugModeEnabled {
		// if stealing traffic debug mode is enabked but disabled in the deployment, we need to delete the deployment
		return kubeDeployment, true, nil
	}

	// nolint: gosimple
	podTemplateSpec, err := r.generatePodTemplateSpec(ctx, opt)
	if err != nil {
		return
	}

774
775
776
777
778
779
780
781
782
783
784
	defaultMaxSurge := intstr.FromString("25%")
	defaultMaxUnavailable := intstr.FromString("25%")

	strategy := appsv1.DeploymentStrategy{
		Type: appsv1.RollingUpdateDeploymentStrategyType,
		RollingUpdate: &appsv1.RollingUpdateDeployment{
			MaxSurge:       &defaultMaxSurge,
			MaxUnavailable: &defaultMaxUnavailable,
		},
	}

785
	resourceAnnotations := getResourceAnnotations(opt.dynamoComponentDeployment)
786
787
	strategyStr := resourceAnnotations[KubeAnnotationDeploymentStrategy]
	if strategyStr != "" {
788
		strategyType := schemas.DeploymentStrategy(strategyStr)
789
		switch strategyType {
790
		case schemas.DeploymentStrategyRollingUpdate:
791
792
793
794
795
796
797
			strategy = appsv1.DeploymentStrategy{
				Type: appsv1.RollingUpdateDeploymentStrategyType,
				RollingUpdate: &appsv1.RollingUpdateDeployment{
					MaxSurge:       &defaultMaxSurge,
					MaxUnavailable: &defaultMaxUnavailable,
				},
			}
798
		case schemas.DeploymentStrategyRecreate:
799
800
801
			strategy = appsv1.DeploymentStrategy{
				Type: appsv1.RecreateDeploymentStrategyType,
			}
802
		case schemas.DeploymentStrategyRampedSlowRollout:
803
804
805
806
807
808
809
			strategy = appsv1.DeploymentStrategy{
				Type: appsv1.RollingUpdateDeploymentStrategyType,
				RollingUpdate: &appsv1.RollingUpdateDeployment{
					MaxSurge:       &[]intstr.IntOrString{intstr.FromInt(1)}[0],
					MaxUnavailable: &[]intstr.IntOrString{intstr.FromInt(0)}[0],
				},
			}
810
		case schemas.DeploymentStrategyBestEffortControlledRollout:
811
812
813
814
815
816
817
818
819
820
821
			strategy = appsv1.DeploymentStrategy{
				Type: appsv1.RollingUpdateDeploymentStrategyType,
				RollingUpdate: &appsv1.RollingUpdateDeployment{
					MaxSurge:       &[]intstr.IntOrString{intstr.FromInt(0)}[0],
					MaxUnavailable: &[]intstr.IntOrString{intstr.FromString("20%")}[0],
				},
			}
		}
	}

	var replicas *int32
822
	replicas = opt.dynamoComponentDeployment.Spec.Replicas
823
824
825
826
	if opt.isStealingTrafficDebugModeEnabled {
		replicas = &[]int32{int32(1)}[0]
	}

827
828
829
830
	kubeDeployment.Spec = appsv1.DeploymentSpec{
		Replicas: replicas,
		Selector: &metav1.LabelSelector{
			MatchLabels: map[string]string{
831
				commonconsts.KubeLabelDynamoSelector: kubeName,
832
833
			},
		},
834
835
		Template: *podTemplateSpec,
		Strategy: strategy,
836
837
838
839
840
	}

	return
}

841
type generateResourceOption struct {
842
843
	dynamoComponentDeployment               *v1alpha1.DynamoComponentDeployment
	dynamoComponent                         *v1alpha1.DynamoComponent
844
845
846
847
848
	isStealingTrafficDebugModeEnabled       bool
	containsStealingTrafficDebugModeEnabled bool
	isDebugPodReceiveProductionTraffic      bool
	isGenericService                        bool
}
849

850
func (r *DynamoComponentDeploymentReconciler) generateHPA(opt generateResourceOption) (*autoscalingv2.HorizontalPodAutoscaler, bool, error) {
851
	labels := r.getKubeLabels(opt.dynamoComponentDeployment, opt.dynamoComponent)
852

853
	annotations := r.getKubeAnnotations(opt.dynamoComponentDeployment, opt.dynamoComponent)
854

855
	kubeName := r.getKubeName(opt.dynamoComponentDeployment, opt.dynamoComponent, false)
856

857
	kubeNs := opt.dynamoComponentDeployment.Namespace
858

859
	hpaConf := opt.dynamoComponentDeployment.Spec.Autoscaling
860
861
862
863
864
865
866
867

	kubeHpa := &autoscalingv2.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{
			Name:        kubeName,
			Namespace:   kubeNs,
			Labels:      labels,
			Annotations: annotations,
		},
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
	}

	if hpaConf == nil || !hpaConf.Enabled {
		// if hpa is not enabled, we need to delete the hpa
		return kubeHpa, true, nil
	}

	minReplica := int32(hpaConf.MinReplicas)

	kubeHpa.Spec = autoscalingv2.HorizontalPodAutoscalerSpec{
		MinReplicas: &minReplica,
		MaxReplicas: int32(hpaConf.MaxReplicas),
		ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
			APIVersion: "apps/v1",
			Kind:       "Deployment",
			Name:       kubeName,
884
		},
885
		Metrics: hpaConf.Metrics,
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
	}

	if len(kubeHpa.Spec.Metrics) == 0 {
		averageUtilization := int32(commonconsts.HPACPUDefaultAverageUtilization)
		kubeHpa.Spec.Metrics = []autoscalingv2.MetricSpec{
			{
				Type: autoscalingv2.ResourceMetricSourceType,
				Resource: &autoscalingv2.ResourceMetricSource{
					Name: corev1.ResourceCPU,
					Target: autoscalingv2.MetricTarget{
						Type:               autoscalingv2.UtilizationMetricType,
						AverageUtilization: &averageUtilization,
					},
				},
			},
		}
	}

904
	return kubeHpa, false, nil
905
906
}

907
908
// getDynamoComponentRepositoryNameAndDynamoComponentVersion splits the
// DynamoComponent reference at its first ':' into repository name and version.
// Without a ':' the whole string is the repository name and the version is "".
func getDynamoComponentRepositoryNameAndDynamoComponentVersion(dynamoComponent *v1alpha1.DynamoComponent) (repositoryName string, version string) {
	repositoryName, version, _ = strings.Cut(dynamoComponent.Spec.DynamoComponent, ":")
	return repositoryName, version
}

//nolint:gocyclo,nakedret
914
915
func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx context.Context, opt generateResourceOption) (podTemplateSpec *corev1.PodTemplateSpec, err error) {
	podLabels := r.getKubeLabels(opt.dynamoComponentDeployment, opt.dynamoComponent)
916
	if opt.isStealingTrafficDebugModeEnabled {
917
		podLabels[commonconsts.KubeLabelDynamoDeploymentTargetType] = DeploymentTargetTypeDebug
918
919
	}

920
	podAnnotations := r.getKubeAnnotations(opt.dynamoComponentDeployment, opt.dynamoComponent)
921

922
	kubeName := r.getKubeName(opt.dynamoComponentDeployment, opt.dynamoComponent, opt.isStealingTrafficDebugModeEnabled)
923

924
	containerPort := commonconsts.DynamoServicePort
925
926
927
928

	var envs []corev1.EnvVar
	envsSeen := make(map[string]struct{})

929
930
	resourceAnnotations := opt.dynamoComponentDeployment.Spec.Annotations
	specEnvs := opt.dynamoComponentDeployment.Spec.Envs
931
932
933
934
935
936
937
938
939
940
941
942
943
944

	if resourceAnnotations == nil {
		resourceAnnotations = make(map[string]string)
	}

	isDebugModeEnabled := checkIfIsDebugModeEnabled(resourceAnnotations)

	if specEnvs != nil {
		envs = make([]corev1.EnvVar, 0, len(specEnvs)+1)

		for _, env := range specEnvs {
			if _, ok := envsSeen[env.Name]; ok {
				continue
			}
945
			if env.Name == commonconsts.EnvDynamoServicePort {
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
				// nolint: gosec
				containerPort, err = strconv.Atoi(env.Value)
				if err != nil {
					return nil, errors.Wrapf(err, "invalid port value %s", env.Value)
				}
			}
			envsSeen[env.Name] = struct{}{}
			envs = append(envs, corev1.EnvVar{
				Name:  env.Name,
				Value: env.Value,
			})
		}
	}

	defaultEnvs := []corev1.EnvVar{
		{
962
			Name:  commonconsts.EnvDynamoServicePort,
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
			Value: fmt.Sprintf("%d", containerPort),
		},
	}

	if r.NatsAddr != "" {
		defaultEnvs = append(defaultEnvs, corev1.EnvVar{
			Name:  "NATS_SERVER",
			Value: r.NatsAddr,
		})
	}

	if r.EtcdAddr != "" {
		defaultEnvs = append(defaultEnvs, corev1.EnvVar{
			Name:  "ETCD_ENDPOINTS",
			Value: r.EtcdAddr,
		})
	}

	for _, env := range defaultEnvs {
		if _, ok := envsSeen[env.Name]; !ok {
			envs = append(envs, env)
		}
	}

987
	var livenessProbe *corev1.Probe
988
989
	if opt.dynamoComponentDeployment.Spec.LivenessProbe != nil {
		livenessProbe = opt.dynamoComponentDeployment.Spec.LivenessProbe
990
991
	}

992
	var readinessProbe *corev1.Probe
993
994
	if opt.dynamoComponentDeployment.Spec.ReadinessProbe != nil {
		readinessProbe = opt.dynamoComponentDeployment.Spec.ReadinessProbe
995
996
997
998
999
1000
1001
	}

	volumes := make([]corev1.Volume, 0)
	volumeMounts := make([]corev1.VolumeMount, 0)

	args := make([]string, 0)

1002
	args = append(args, "cd", "src", "&&", "uv", "run", "dynamo", "serve")
1003

1004
1005
	// todo : remove this line when https://github.com/ai-dynamo/dynamo/issues/345 is fixed
	enableDependsOption := false
1006
	if len(opt.dynamoComponentDeployment.Spec.ExternalServices) > 0 && enableDependsOption {
1007
		serviceSuffix := fmt.Sprintf("%s.svc.cluster.local:%d", opt.dynamoComponentDeployment.Namespace, containerPort)
1008
		keys := make([]string, 0, len(opt.dynamoComponentDeployment.Spec.ExternalServices))
1009

1010
		for key := range opt.dynamoComponentDeployment.Spec.ExternalServices {
1011
1012
1013
1014
1015
			keys = append(keys, key)
		}

		sort.Strings(keys)
		for _, key := range keys {
1016
			service := opt.dynamoComponentDeployment.Spec.ExternalServices[key]
1017
1018
1019
1020
1021

			// Check if DeploymentSelectorKey is not "name"
			if service.DeploymentSelectorKey == "name" {
				dependsFlag := fmt.Sprintf("--depends \"%s=http://%s.%s\"", key, service.DeploymentSelectorValue, serviceSuffix)
				args = append(args, dependsFlag)
1022
1023
			} else if service.DeploymentSelectorKey == "dynamo" {
				dependsFlag := fmt.Sprintf("--depends \"%s=dynamo://%s\"", key, service.DeploymentSelectorValue)
1024
1025
				args = append(args, dependsFlag)
			} else {
1026
				return nil, errors.Errorf("DeploymentSelectorKey '%s' not supported. Only 'name' and 'dynamo' are supported", service.DeploymentSelectorKey)
1027
1028
1029
1030
			}
		}
	}

1031
1032
1033
1034
1035
	if opt.dynamoComponentDeployment.Spec.ServiceName != "" {
		args = append(args, []string{"--service-name", opt.dynamoComponentDeployment.Spec.ServiceName}...)
		args = append(args, opt.dynamoComponentDeployment.Spec.DynamoTag)
		if opt.dynamoComponentDeployment.Spec.DynamoNamespace != nil && *opt.dynamoComponentDeployment.Spec.DynamoNamespace != "" {
			args = append(args, fmt.Sprintf("--%s.ServiceArgs.dynamo.namespace=%s", opt.dynamoComponentDeployment.Spec.ServiceName, *opt.dynamoComponentDeployment.Spec.DynamoNamespace))
1036
		}
1037
		args = append(args, fmt.Sprintf("--%s.environment=%s", opt.dynamoComponentDeployment.Spec.ServiceName, KubernetesDeploymentStrategy))
1038
1039
	}

1040
1041
	if len(opt.dynamoComponentDeployment.Spec.Envs) > 0 {
		for _, env := range opt.dynamoComponentDeployment.Spec.Envs {
1042
1043
1044
1045
1046
1047
			if env.Name == "DYNAMO_CONFIG_PATH" {
				args = append(args, "-f", env.Value)
			}
		}
	}

1048
	dynamoResources := opt.dynamoComponentDeployment.Spec.Resources
1049

1050
	resources, err := getResourcesConfig(dynamoResources)
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
	if err != nil {
		err = errors.Wrap(err, "failed to get resources config")
		return nil, err
	}

	sharedMemorySizeLimit := resource.MustParse("64Mi")
	memoryLimit := resources.Limits[corev1.ResourceMemory]
	if !memoryLimit.IsZero() {
		sharedMemorySizeLimit.SetMilli(memoryLimit.MilliValue() / 2)
	}

	volumes = append(volumes, corev1.Volume{
		Name: KubeValueNameSharedMemory,
		VolumeSource: corev1.VolumeSource{
			EmptyDir: &corev1.EmptyDirVolumeSource{
				Medium:    corev1.StorageMediumMemory,
				SizeLimit: &sharedMemorySizeLimit,
			},
		},
	})
	volumeMounts = append(volumeMounts, corev1.VolumeMount{
		Name:      KubeValueNameSharedMemory,
		MountPath: "/dev/shm",
	})
1075
	if opt.dynamoComponentDeployment.Spec.PVC != nil {
1076
		volumes = append(volumes, corev1.Volume{
1077
			Name: getPvcName(opt.dynamoComponentDeployment, opt.dynamoComponentDeployment.Spec.PVC.Name),
1078
1079
			VolumeSource: corev1.VolumeSource{
				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
1080
					ClaimName: getPvcName(opt.dynamoComponentDeployment, opt.dynamoComponentDeployment.Spec.PVC.Name),
1081
1082
1083
1084
				},
			},
		})
		volumeMounts = append(volumeMounts, corev1.VolumeMount{
1085
1086
			Name:      getPvcName(opt.dynamoComponentDeployment, opt.dynamoComponentDeployment.Spec.PVC.Name),
			MountPath: *opt.dynamoComponentDeployment.Spec.PVC.MountPoint,
1087
1088
1089
		})
	}

1090
1091
1092
1093
	imageName := opt.dynamoComponent.GetImage()
	if imageName == "" {
		return nil, errors.Errorf("image is not ready for component %s", opt.dynamoComponent.Name)
	}
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133

	var securityContext *corev1.SecurityContext
	var mainContainerSecurityContext *corev1.SecurityContext

	enableRestrictedSecurityContext := os.Getenv("ENABLE_RESTRICTED_SECURITY_CONTEXT") == "true"
	if enableRestrictedSecurityContext {
		securityContext = &corev1.SecurityContext{
			AllowPrivilegeEscalation: ptr.To(false),
			RunAsNonRoot:             ptr.To(true),
			RunAsUser:                ptr.To(int64(1000)),
			RunAsGroup:               ptr.To(int64(1000)),
			SeccompProfile: &corev1.SeccompProfile{
				Type: corev1.SeccompProfileTypeRuntimeDefault,
			},
			Capabilities: &corev1.Capabilities{
				Drop: []corev1.Capability{"ALL"},
			},
		}
		mainContainerSecurityContext = securityContext.DeepCopy()
		mainContainerSecurityContext.RunAsUser = ptr.To(int64(1034))
	}

	containers := make([]corev1.Container, 0, 2)

	// TODO: Temporarily disabling probes
	container := corev1.Container{
		Name:           "main",
		Image:          imageName,
		Command:        []string{"sh", "-c"},
		Args:           []string{strings.Join(args, " ")},
		LivenessProbe:  livenessProbe,
		ReadinessProbe: readinessProbe,
		Resources:      resources,
		Env:            envs,
		TTY:            true,
		Stdin:          true,
		VolumeMounts:   volumeMounts,
		Ports: []corev1.ContainerPort{
			{
				Protocol:      corev1.ProtocolTCP,
1134
				Name:          commonconsts.DynamoContainerPortName,
1135
1136
1137
1138
1139
1140
				ContainerPort: int32(containerPort), // nolint: gosec
			},
		},
		SecurityContext: mainContainerSecurityContext,
	}

1141
	if opt.dynamoComponentDeployment.Spec.EnvFromSecret != nil {
1142
1143
1144
1145
		container.EnvFrom = []corev1.EnvFromSource{
			{
				SecretRef: &corev1.SecretEnvSource{
					LocalObjectReference: corev1.LocalObjectReference{
1146
						Name: *opt.dynamoComponentDeployment.Spec.EnvFromSecret,
1147
1148
1149
1150
1151
1152
					},
				},
			},
		}
	}

1153
	if resourceAnnotations["nvidia.com/enable-container-privileged"] == commonconsts.KubeLabelValueTrue {
1154
1155
1156
1157
1158
1159
		if container.SecurityContext == nil {
			container.SecurityContext = &corev1.SecurityContext{}
		}
		container.SecurityContext.Privileged = &[]bool{true}[0]
	}

1160
	if resourceAnnotations["nvidia.com/enable-container-ptrace"] == commonconsts.KubeLabelValueTrue {
1161
1162
1163
1164
1165
1166
1167
1168
		if container.SecurityContext == nil {
			container.SecurityContext = &corev1.SecurityContext{}
		}
		container.SecurityContext.Capabilities = &corev1.Capabilities{
			Add: []corev1.Capability{"SYS_PTRACE"},
		}
	}

1169
	if resourceAnnotations["nvidia.com/run-container-as-root"] == commonconsts.KubeLabelValueTrue {
1170
1171
1172
1173
1174
1175
1176
1177
		if container.SecurityContext == nil {
			container.SecurityContext = &corev1.SecurityContext{}
		}
		container.SecurityContext.RunAsUser = &[]int64{0}[0]
	}

	containers = append(containers, container)

1178
	debuggerImage := "python:3.12-slim"
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
	debuggerImage_ := os.Getenv("INTERNAL_IMAGES_DEBUGGER")
	if debuggerImage_ != "" {
		debuggerImage = debuggerImage_
	}

	if opt.isStealingTrafficDebugModeEnabled || isDebugModeEnabled {
		containers = append(containers, corev1.Container{
			Name:  "debugger",
			Image: debuggerImage,
			Command: []string{
				"sleep",
				"infinity",
			},
			SecurityContext: &corev1.SecurityContext{
				Capabilities: &corev1.Capabilities{
					Add: []corev1.Capability{"SYS_PTRACE"},
				},
			},
			Resources: corev1.ResourceRequirements{
				Requests: corev1.ResourceList{
					corev1.ResourceCPU:    resource.MustParse("100m"),
					corev1.ResourceMemory: resource.MustParse("100Mi"),
				},
				Limits: corev1.ResourceList{
					corev1.ResourceCPU:    resource.MustParse("1000m"),
					corev1.ResourceMemory: resource.MustParse("1000Mi"),
				},
			},
			Stdin: true,
			TTY:   true,
		})
	}

1212
	podLabels[commonconsts.KubeLabelDynamoSelector] = kubeName
1213
1214
1215
1216
1217
1218

	podSpec := corev1.PodSpec{
		Containers: containers,
		Volumes:    volumes,
	}

1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
	podSpec.ImagePullSecrets = []corev1.LocalObjectReference{
		{
			Name: config.GetDockerRegistryConfig().SecretName,
		},
	}
	if opt.dynamoComponent.Spec.DockerConfigJSONSecretName != "" {
		podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, corev1.LocalObjectReference{
			Name: opt.dynamoComponent.Spec.DockerConfigJSONSecretName,
		})
	}
	podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, opt.dynamoComponent.Spec.ImagePullSecrets...)
1230

1231
	extraPodMetadata := opt.dynamoComponentDeployment.Spec.ExtraPodMetadata
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242

	if extraPodMetadata != nil {
		for k, v := range extraPodMetadata.Annotations {
			podAnnotations[k] = v
		}

		for k, v := range extraPodMetadata.Labels {
			podLabels[k] = v
		}
	}

1243
	extraPodSpec := opt.dynamoComponentDeployment.Spec.ExtraPodSpec
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256

	if extraPodSpec != nil {
		podSpec.SchedulerName = extraPodSpec.SchedulerName
		podSpec.NodeSelector = extraPodSpec.NodeSelector
		podSpec.Affinity = extraPodSpec.Affinity
		podSpec.Tolerations = extraPodSpec.Tolerations
		podSpec.TopologySpreadConstraints = extraPodSpec.TopologySpreadConstraints
		podSpec.Containers = append(podSpec.Containers, extraPodSpec.Containers...)
		podSpec.ServiceAccountName = extraPodSpec.ServiceAccountName
	}

	if podSpec.ServiceAccountName == "" {
		serviceAccounts := &corev1.ServiceAccountList{}
1257
		err = r.List(ctx, serviceAccounts, client.InNamespace(opt.dynamoComponentDeployment.Namespace), client.MatchingLabels{
1258
			commonconsts.KubeLabelDynamoDeploymentPod: commonconsts.KubeLabelValueTrue,
1259
1260
		})
		if err != nil {
1261
			err = errors.Wrapf(err, "failed to list service accounts in namespace %s", opt.dynamoComponentDeployment.Namespace)
1262
1263
1264
1265
1266
1267
1268
1269
1270
			return
		}
		if len(serviceAccounts.Items) > 0 {
			podSpec.ServiceAccountName = serviceAccounts.Items[0].Name
		} else {
			podSpec.ServiceAccountName = DefaultServiceAccountName
		}
	}

1271
	if resourceAnnotations["nvidia.com/enable-host-ipc"] == commonconsts.KubeLabelValueTrue {
1272
1273
1274
		podSpec.HostIPC = true
	}

1275
	if resourceAnnotations["nvidia.com/enable-host-network"] == commonconsts.KubeLabelValueTrue {
1276
1277
1278
		podSpec.HostNetwork = true
	}

1279
	if resourceAnnotations["nvidia.com/enable-host-pid"] == commonconsts.KubeLabelValueTrue {
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
		podSpec.HostPID = true
	}

	if opt.isStealingTrafficDebugModeEnabled || isDebugModeEnabled {
		podSpec.ShareProcessNamespace = &[]bool{true}[0]
	}

	podTemplateSpec = &corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      podLabels,
			Annotations: podAnnotations,
		},
		Spec: podSpec,
	}

	return
}

Neelay Shah's avatar
Neelay Shah committed
1298
// getResourcesConfig converts the Dynamo resource spec into Kubernetes
// ResourceRequirements.
//
// It starts from defaults and overlays every non-empty value from resources.
// The defaults are 300m CPU / 500Mi memory for requests and 500m CPU / 1Gi
// memory for limits. The overlaid values are:
//   - CPU, memory and GPU limits (GPU maps to commonconsts.KubeResourceGPUNvidia);
//   - CPU and memory requests;
//   - every Custom entry on either side. Custom values are always parsed, even
//     when empty.
//
// A nil resources yields the defaults unchanged.
//
// Kubernetes rejects a container whose request exceeds its limit. So when a
// user-supplied CPU/memory value conflicts with a default on the other side,
// the default is adjusted to match the user value. Examples: a 256Mi memory
// limit against the 500Mi default request, or a 2-CPU request against the
// 500m default limit. Conflicts between two user-supplied values are left
// untouched for the API server to report.
//
// An error is returned if any quantity fails to parse; the partially merged
// requirements are returned alongside it.
func getResourcesConfig(resources *dynamoCommon.Resources) (corev1.ResourceRequirements, error) {
	currentResources := corev1.ResourceRequirements{
		Requests: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("300m"),
			corev1.ResourceMemory: resource.MustParse("500Mi"),
		},
		Limits: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("500m"),
			corev1.ResourceMemory: resource.MustParse("1Gi"),
		},
	}
	if resources == nil {
		return currentResources, nil
	}

	// Both lists are non-nil (seeded with defaults above), so they can be
	// written directly.
	limitList, requestList := currentResources.Limits, currentResources.Requests
	userLimits := make(map[corev1.ResourceName]bool)
	userRequests := make(map[corev1.ResourceName]bool)

	// parse stores value under name in list and records in userSet that the
	// user supplied it; what names the field in the error message.
	parse := func(list corev1.ResourceList, userSet map[corev1.ResourceName]bool, name corev1.ResourceName, value, what string) error {
		q, err := resource.ParseQuantity(value)
		if err != nil {
			return errors.Wrapf(err, "parse %s quantity", what)
		}
		list[name] = q
		userSet[name] = true
		return nil
	}
	// parseOptional is parse for the named fields, where "" means "not set".
	parseOptional := func(list corev1.ResourceList, userSet map[corev1.ResourceName]bool, name corev1.ResourceName, value, what string) error {
		if value == "" {
			return nil
		}
		return parse(list, userSet, name, value, what)
	}

	if limits := resources.Limits; limits != nil {
		if err := parseOptional(limitList, userLimits, corev1.ResourceCPU, limits.CPU, "limits cpu"); err != nil {
			return currentResources, err
		}
		if err := parseOptional(limitList, userLimits, corev1.ResourceMemory, limits.Memory, "limits memory"); err != nil {
			return currentResources, err
		}
		if err := parseOptional(limitList, userLimits, commonconsts.KubeResourceGPUNvidia, limits.GPU, "limits gpu"); err != nil {
			return currentResources, err
		}
		for k, v := range limits.Custom {
			if err := parse(limitList, userLimits, corev1.ResourceName(k), v, "limits "+string(k)); err != nil {
				return currentResources, err
			}
		}
	}
	if requests := resources.Requests; requests != nil {
		if err := parseOptional(requestList, userRequests, corev1.ResourceCPU, requests.CPU, "requests cpu"); err != nil {
			return currentResources, err
		}
		if err := parseOptional(requestList, userRequests, corev1.ResourceMemory, requests.Memory, "requests memory"); err != nil {
			return currentResources, err
		}
		for k, v := range requests.Custom {
			if err := parse(requestList, userRequests, corev1.ResourceName(k), v, "requests "+string(k)); err != nil {
				return currentResources, err
			}
		}
	}

	// Only CPU and memory have defaults, so only they can conflict with one.
	for _, name := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
		req, hasReq := requestList[name]
		lim, hasLim := limitList[name]
		if !hasReq || !hasLim || req.Cmp(lim) <= 0 {
			continue
		}
		switch {
		case userLimits[name] && !userRequests[name]:
			// User lowered the limit below the default request.
			requestList[name] = lim.DeepCopy()
		case userRequests[name] && !userLimits[name]:
			// User raised the request above the default limit.
			limitList[name] = req.DeepCopy()
		}
	}
	return currentResources, nil
}

//nolint:nakedret
1392
func (r *DynamoComponentDeploymentReconciler) generateService(_ context.Context, opt generateResourceOption) (kubeService *corev1.Service, toDelete bool, err error) {
1393
1394
	var kubeName string
	if opt.isGenericService {
1395
		kubeName = r.getGenericServiceName(opt.dynamoComponentDeployment, opt.dynamoComponent)
1396
	} else {
1397
		kubeName = r.getServiceName(opt.dynamoComponentDeployment, opt.dynamoComponent, opt.isStealingTrafficDebugModeEnabled)
1398
1399
	}

1400
	kubeNs := opt.dynamoComponentDeployment.Namespace
1401
1402
1403
1404
1405
1406
1407
1408

	kubeService = &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      kubeName,
			Namespace: kubeNs,
		},
	}

1409
1410
	if !opt.dynamoComponentDeployment.IsMainComponent() || (!opt.isGenericService && !opt.containsStealingTrafficDebugModeEnabled) {
		// if it's not the main component or if it's not a generic service and not contains stealing traffic debug mode enabled, we don't need to create the service
1411
1412
1413
		return kubeService, true, nil
	}

1414
	labels := r.getKubeLabels(opt.dynamoComponentDeployment, opt.dynamoComponent)
1415
1416
1417
1418
1419
1420
1421
1422

	selector := make(map[string]string)

	for k, v := range labels {
		selector[k] = v
	}

	if opt.isStealingTrafficDebugModeEnabled {
1423
		selector[commonconsts.KubeLabelDynamoDeploymentTargetType] = DeploymentTargetTypeDebug
1424
1425
	}

1426
	targetPort := intstr.FromString(commonconsts.DynamoContainerPortName)
1427
1428
1429
1430
1431

	spec := corev1.ServiceSpec{
		Selector: selector,
		Ports: []corev1.ServicePort{
			{
1432
1433
				Name:       commonconsts.DynamoServicePortName,
				Port:       commonconsts.DynamoServicePort,
1434
1435
1436
1437
1438
1439
				TargetPort: targetPort,
				Protocol:   corev1.ProtocolTCP,
			},
		},
	}

1440
	annotations := r.getKubeAnnotations(opt.dynamoComponentDeployment, opt.dynamoComponent)
1441

1442
1443
1444
	kubeService.ObjectMeta.Annotations = annotations
	kubeService.ObjectMeta.Labels = labels
	kubeService.Spec = spec
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466

	return
}

// TLSModeOpt selects how TLS is configured for an IngressConfig.
type TLSModeOpt string

// Supported TLSModeOpt values.
const (
	// TLSModeNone disables TLS.
	TLSModeNone TLSModeOpt = "none"
	// TLSModeAuto requests automatic TLS. NOTE(review): how certificates are
	// provisioned is not visible here — confirm against the ingress code.
	TLSModeAuto TLSModeOpt = "auto"
	// TLSModeStatic uses a pre-existing secret, presumably the one named by
	// IngressConfig.StaticTLSSecretName — confirm against the ingress code.
	TLSModeStatic TLSModeOpt = "static"
)

// IngressConfig holds the settings used when exposing a component through a
// networking/v1 Ingress. NOTE(review): its consumers are outside this section;
// the field notes below are inferred from names and types — confirm.
type IngressConfig struct {
	// ClassName is the ingress class; nil presumably leaves it unset.
	ClassName           *string
	// Annotations are extra annotations for the Ingress object.
	Annotations         map[string]string
	// Path is the HTTP path routed to the component.
	Path                string
	// PathType is the networking/v1 path-matching type for Path.
	PathType            networkingv1.PathType
	// TLSMode selects how TLS is configured (see TLSModeOpt).
	TLSMode             TLSModeOpt
	// StaticTLSSecretName presumably names the TLS secret used with TLSModeStatic.
	StaticTLSSecretName string
}

// SetupWithManager sets up the controller with the Manager.
1467
func (r *DynamoComponentDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
1468
1469

	m := ctrl.NewControllerManagedBy(mgr).
1470
		For(&v1alpha1.DynamoComponentDeployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
1471
1472
1473
1474
1475
1476
1477
		Owns(&appsv1.Deployment{}, builder.WithPredicates(predicate.Funcs{
			// ignore creation cause we don't want to be called again after we create the deployment
			CreateFunc:  func(ce event.CreateEvent) bool { return false },
			DeleteFunc:  func(de event.DeleteEvent) bool { return true },
			UpdateFunc:  func(de event.UpdateEvent) bool { return true },
			GenericFunc: func(ge event.GenericEvent) bool { return true },
		})).
1478
1479
1480
		Owns(&corev1.Service{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&networkingv1.Ingress{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&corev1.PersistentVolumeClaim{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
1481
		WithEventFilter(controller_common.EphemeralDeploymentEventFilter(r.Config))
1482

1483
	if r.UseVirtualService {
1484
1485
		m.Owns(&networkingv1beta1.VirtualService{}, builder.WithPredicates(predicate.GenerationChangedPredicate{}))
	}
1486
1487
1488
	m.Owns(&autoscalingv2.HorizontalPodAutoscaler{})
	return m.Complete(r)
}
1489
1490
1491
1492

// GetRecorder returns the event recorder held by this reconciler.
func (r *DynamoComponentDeploymentReconciler) GetRecorder() record.EventRecorder {
	recorder := r.Recorder
	return recorder
}