"vscode:/vscode.git/clone" did not exist on "cc583b2fe14a7d606f1ccc302887e059c5ba63d8"
Unverified commit dad4237d, authored by Julien Mancuso, committed by GitHub
Browse files

chore: remove DCD debug features and consolidate k8s service generation (#6397)


Signed-off-by: Julien Mancuso <jmancuso@nvidia.com>
parent 0688d584
...@@ -115,8 +115,6 @@ The chart includes built-in validation to prevent all operator conflicts: ...@@ -115,8 +115,6 @@ The chart includes built-in validation to prevent all operator conflicts:
| dynamo-operator.controllerManager.manager.args[1] | string | `"--metrics-bind-address=127.0.0.1:8080"` | Metrics endpoint for Prometheus scraping (localhost only for security) | | dynamo-operator.controllerManager.manager.args[1] | string | `"--metrics-bind-address=127.0.0.1:8080"` | Metrics endpoint for Prometheus scraping (localhost only for security) |
| dynamo-operator.imagePullSecrets | list | `[]` | Secrets for pulling private container images | | dynamo-operator.imagePullSecrets | list | `[]` | Secrets for pulling private container images |
| dynamo-operator.dynamo.groveTerminationDelay | string | `"4h"` | How long to wait before forcefully terminating Grove instances | | dynamo-operator.dynamo.groveTerminationDelay | string | `"4h"` | How long to wait before forcefully terminating Grove instances |
| dynamo-operator.dynamo.internalImages.debugger | string | `"python:3.12-slim"` | Debugger image for troubleshooting deployments |
| dynamo-operator.dynamo.enableRestrictedSecurityContext | bool | `false` | Whether to enable restricted security contexts for enhanced security |
| dynamo-operator.dynamo.dockerRegistry.useKubernetesSecret | bool | `false` | Whether to use Kubernetes secrets for registry authentication | | dynamo-operator.dynamo.dockerRegistry.useKubernetesSecret | bool | `false` | Whether to use Kubernetes secrets for registry authentication |
| dynamo-operator.dynamo.dockerRegistry.server | string | `nil` | Docker registry server URL | | dynamo-operator.dynamo.dockerRegistry.server | string | `nil` | Docker registry server URL |
| dynamo-operator.dynamo.dockerRegistry.username | string | `nil` | Registry username | | dynamo-operator.dynamo.dockerRegistry.username | string | `nil` | Registry username |
......
...@@ -177,9 +177,6 @@ spec: ...@@ -177,9 +177,6 @@ spec:
env: env:
- name: KUBERNETES_CLUSTER_DOMAIN - name: KUBERNETES_CLUSTER_DOMAIN
value: {{ quote .Values.kubernetesClusterDomain }} value: {{ quote .Values.kubernetesClusterDomain }}
envFrom:
- secretRef:
name: dynamo-deployment-env
imagePullPolicy: {{ .Values.controllerManager.manager.image.pullPolicy | quote }} imagePullPolicy: {{ .Values.controllerManager.manager.image.pullPolicy | quote }}
image: {{ .Values.controllerManager.manager.image.repository }}:{{ .Values.controllerManager.manager.image.tag image: {{ .Values.controllerManager.manager.image.repository }}:{{ .Values.controllerManager.manager.image.tag
| default .Chart.AppVersion }} | default .Chart.AppVersion }}
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Opaque Secret named dynamo-deployment-env, created in the release namespace.
# Its stringData entries are environment-variable style settings derived from
# the chart's .Values.dynamo section.
apiVersion: v1
kind: Secret
metadata:
name: dynamo-deployment-env
namespace: {{ .Release.Namespace }}
labels:
{{- include "dynamo-operator.labels" . | nindent 4 }}
type: Opaque
stringData:
# Debugger image from .Values.dynamo.internalImages.debugger; always emitted.
INTERNAL_IMAGES_DEBUGGER: {{ .Values.dynamo.internalImages.debugger | quote }}
# Emitted only when .Values.dynamo.enableRestrictedSecurityContext is truthy.
{{- if .Values.dynamo.enableRestrictedSecurityContext }}
ENABLE_RESTRICTED_SECURITY_CONTEXT: "true"
{{- end }}
# Emitted only when .Values.dynamo.dynamoIngressSuffix is set to a non-empty value.
{{- if .Values.dynamo.dynamoIngressSuffix }}
DYNAMO_INGRESS_SUFFIX: {{ .Values.dynamo.dynamoIngressSuffix | quote }}
{{- end }}
\ No newline at end of file
...@@ -109,11 +109,6 @@ dynamo: ...@@ -109,11 +109,6 @@ dynamo:
groveTerminationDelay: 15m groveTerminationDelay: 15m
internalImages:
debugger: python:3.12-slim
enableRestrictedSecurityContext: false
dockerRegistry: dockerRegistry:
server: '' server: ''
# set to true if you want to use the kubernetes secret for the registry credentials # set to true if you want to use the kubernetes secret for the registry credentials
......
...@@ -96,14 +96,6 @@ dynamo-operator: ...@@ -96,14 +96,6 @@ dynamo-operator:
# -- How long to wait before forcefully terminating Grove instances # -- How long to wait before forcefully terminating Grove instances
groveTerminationDelay: 4h groveTerminationDelay: 4h
# Internal utility images used by the platform
internalImages:
# -- Debugger image for troubleshooting deployments
debugger: python:3.12-slim
# -- Whether to enable restricted security contexts for enhanced security
enableRestrictedSecurityContext: false
# Docker registry configuration for private repositories # Docker registry configuration for private repositories
dockerRegistry: dockerRegistry:
# -- Whether to use Kubernetes secrets for registry authentication # -- Whether to use Kubernetes secrets for registry authentication
......
...@@ -43,18 +43,17 @@ const ( ...@@ -43,18 +43,17 @@ const (
KubeAnnotationDisableImagePullSecretDiscovery = "nvidia.com/disable-image-pull-secret-discovery" KubeAnnotationDisableImagePullSecretDiscovery = "nvidia.com/disable-image-pull-secret-discovery"
KubeAnnotationDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend" KubeAnnotationDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend"
KubeLabelDynamoGraphDeploymentName = "nvidia.com/dynamo-graph-deployment-name" KubeLabelDynamoGraphDeploymentName = "nvidia.com/dynamo-graph-deployment-name"
KubeLabelDynamoComponent = "nvidia.com/dynamo-component" KubeLabelDynamoComponent = "nvidia.com/dynamo-component"
KubeLabelDynamoNamespace = "nvidia.com/dynamo-namespace" KubeLabelDynamoNamespace = "nvidia.com/dynamo-namespace"
KubeLabelDynamoDeploymentTargetType = "nvidia.com/dynamo-deployment-target-type" KubeLabelDynamoComponentType = "nvidia.com/dynamo-component-type"
KubeLabelDynamoComponentType = "nvidia.com/dynamo-component-type" KubeLabelDynamoSubComponentType = "nvidia.com/dynamo-sub-component-type"
KubeLabelDynamoSubComponentType = "nvidia.com/dynamo-sub-component-type" KubeLabelDynamoBaseModel = "nvidia.com/dynamo-base-model"
KubeLabelDynamoBaseModel = "nvidia.com/dynamo-base-model" KubeLabelDynamoBaseModelHash = "nvidia.com/dynamo-base-model-hash"
KubeLabelDynamoBaseModelHash = "nvidia.com/dynamo-base-model-hash" KubeAnnotationDynamoBaseModel = "nvidia.com/dynamo-base-model"
KubeAnnotationDynamoBaseModel = "nvidia.com/dynamo-base-model" KubeLabelDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend"
KubeLabelDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend" KubeLabelDynamoDiscoveryEnabled = "nvidia.com/dynamo-discovery-enabled"
KubeLabelDynamoDiscoveryEnabled = "nvidia.com/dynamo-discovery-enabled" KubeLabelDynamoWorkerHash = "nvidia.com/dynamo-worker-hash"
KubeLabelDynamoWorkerHash = "nvidia.com/dynamo-worker-hash"
KubeLabelValueFalse = "false" KubeLabelValueFalse = "false"
KubeLabelValueTrue = "true" KubeLabelValueTrue = "true"
......
...@@ -23,7 +23,6 @@ import ( ...@@ -23,7 +23,6 @@ import (
"context" "context"
"fmt" "fmt"
"maps" "maps"
"os"
"slices" "slices"
"time" "time"
...@@ -45,7 +44,6 @@ import ( ...@@ -45,7 +44,6 @@ import (
networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1" networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
...@@ -61,15 +59,11 @@ import ( ...@@ -61,15 +59,11 @@ import (
) )
const ( const (
DefaultClusterName = "default" DefaultClusterName = "default"
DefaultServiceAccountName = "default" DefaultServiceAccountName = "default"
KubeAnnotationDeploymentStrategy = "nvidia.com/deployment-strategy" KubeAnnotationDeploymentStrategy = "nvidia.com/deployment-strategy"
KubeAnnotationDeploymentRollingUpdateMaxSurge = "nvidia.com/deployment-rolling-update-max-surge" KubeAnnotationDeploymentRollingUpdateMaxSurge = "nvidia.com/deployment-rolling-update-max-surge"
KubeAnnotationDeploymentRollingUpdateMaxUnavailable = "nvidia.com/deployment-rolling-update-max-unavailable" KubeAnnotationDeploymentRollingUpdateMaxUnavailable = "nvidia.com/deployment-rolling-update-max-unavailable"
KubeAnnotationEnableStealingTrafficDebugMode = "nvidia.com/enable-stealing-traffic-debug-mode"
KubeAnnotationEnableDebugMode = "nvidia.com/enable-debug-mode"
KubeAnnotationEnableDebugPodReceiveProductionTraffic = "nvidia.com/enable-debug-pod-receive-production-traffic"
DeploymentTargetTypeDebug = "debug"
) )
// DynamoComponentDeploymentReconciler reconciles a DynamoComponentDeployment object // DynamoComponentDeploymentReconciler reconciles a DynamoComponentDeployment object
...@@ -360,10 +354,8 @@ func (r *DynamoComponentDeploymentReconciler) reconcileLeaderWorkerSetResources( ...@@ -360,10 +354,8 @@ func (r *DynamoComponentDeploymentReconciler) reconcileLeaderWorkerSetResources(
for i := range int(desiredReplicas) { for i := range int(desiredReplicas) {
volcanoPodGroupModified, _, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*volcanov1beta1.PodGroup, bool, error) { volcanoPodGroupModified, _, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*volcanov1beta1.PodGroup, bool, error) {
return r.generateVolcanoPodGroup(ctx, generateResourceOption{ return r.generateVolcanoPodGroup(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment, dynamoComponentDeployment: dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: false, instanceID: &i,
containsStealingTrafficDebugModeEnabled: false,
instanceID: &i,
}) })
}) })
if err != nil { if err != nil {
...@@ -372,10 +364,8 @@ func (r *DynamoComponentDeploymentReconciler) reconcileLeaderWorkerSetResources( ...@@ -372,10 +364,8 @@ func (r *DynamoComponentDeploymentReconciler) reconcileLeaderWorkerSetResources(
leaderWorkerSetModified, lwsObj, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*leaderworkersetv1.LeaderWorkerSet, bool, error) { leaderWorkerSetModified, lwsObj, err := commonController.SyncResource(ctx, r, dynamoComponentDeployment, func(ctx context.Context) (*leaderworkersetv1.LeaderWorkerSet, bool, error) {
return r.generateLeaderWorkerSet(ctx, generateResourceOption{ return r.generateLeaderWorkerSet(ctx, generateResourceOption{
dynamoComponentDeployment: dynamoComponentDeployment, dynamoComponentDeployment: dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: false, instanceID: &i,
containsStealingTrafficDebugModeEnabled: false,
instanceID: &i,
}) })
}) })
if err != nil { if err != nil {
...@@ -389,10 +379,8 @@ func (r *DynamoComponentDeploymentReconciler) reconcileLeaderWorkerSetResources( ...@@ -389,10 +379,8 @@ func (r *DynamoComponentDeploymentReconciler) reconcileLeaderWorkerSetResources(
} }
// Clean up any excess LeaderWorkerSets (if replicas were decreased) // Clean up any excess LeaderWorkerSets (if replicas were decreased)
baseKubeName := r.getKubeName(dynamoComponentDeployment, false)
for i := int(desiredReplicas); ; i++ { for i := int(desiredReplicas); ; i++ {
// Try to find a LeaderWorkerSet with the next index nextLWSName := lwsInstanceName(dynamoComponentDeployment, i)
nextLWSName := fmt.Sprintf("%s-%d", baseKubeName, i)
lwsToDelete := &leaderworkersetv1.LeaderWorkerSet{} lwsToDelete := &leaderworkersetv1.LeaderWorkerSet{}
err := r.Get(ctx, types.NamespacedName{ err := r.Get(ctx, types.NamespacedName{
Name: nextLWSName, Name: nextLWSName,
...@@ -578,8 +566,7 @@ func (r *DynamoComponentDeploymentReconciler) generateVolcanoPodGroup(ctx contex ...@@ -578,8 +566,7 @@ func (r *DynamoComponentDeploymentReconciler) generateVolcanoPodGroup(ctx contex
return nil, false, fmt.Errorf("generateVolcanoPodGroup: instanceID cannot be negative, got %d", instanceID) return nil, false, fmt.Errorf("generateVolcanoPodGroup: instanceID cannot be negative, got %d", instanceID)
} }
podGroupName := r.getKubeName(opt.dynamoComponentDeployment, opt.isStealingTrafficDebugModeEnabled) podGroupName := lwsInstanceName(opt.dynamoComponentDeployment, instanceID)
podGroupName = fmt.Sprintf("%s-%d", podGroupName, instanceID)
kubeNs := opt.dynamoComponentDeployment.Namespace kubeNs := opt.dynamoComponentDeployment.Namespace
...@@ -705,8 +692,7 @@ func (r *DynamoComponentDeploymentReconciler) generateLeaderWorkerSet(ctx contex ...@@ -705,8 +692,7 @@ func (r *DynamoComponentDeploymentReconciler) generateLeaderWorkerSet(ctx contex
return nil, false, fmt.Errorf("generateLeaderWorkerSet: instanceID cannot be negative, got %d", instanceID) return nil, false, fmt.Errorf("generateLeaderWorkerSet: instanceID cannot be negative, got %d", instanceID)
} }
kubeName := r.getKubeName(opt.dynamoComponentDeployment, opt.isStealingTrafficDebugModeEnabled) kubeName := lwsInstanceName(opt.dynamoComponentDeployment, instanceID)
kubeName = fmt.Sprintf("%s-%d", kubeName, instanceID)
kubeNs := opt.dynamoComponentDeployment.Namespace kubeNs := opt.dynamoComponentDeployment.Namespace
labels := r.getKubeLabels(opt.dynamoComponentDeployment) labels := r.getKubeLabels(opt.dynamoComponentDeployment)
...@@ -759,6 +745,10 @@ func (r *DynamoComponentDeploymentReconciler) generateLeaderWorkerSet(ctx contex ...@@ -759,6 +745,10 @@ func (r *DynamoComponentDeploymentReconciler) generateLeaderWorkerSet(ctx contex
return leaderWorkerSet, false, nil return leaderWorkerSet, false, nil
} }
// lwsInstanceName builds the per-instance resource name for a
// DynamoComponentDeployment: the deployment's own name followed by a dash and
// the decimal instanceID (for example "worker-0"). The same name is used for
// the LeaderWorkerSet and the Volcano PodGroup of that instance.
func lwsInstanceName(dcd *v1alpha1.DynamoComponentDeployment, instanceID int) string {
	suffix := fmt.Sprintf("-%d", instanceID)
	return dcd.Name + suffix
}
func (r *DynamoComponentDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) error { func (r *DynamoComponentDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) error {
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
logger.Info("Finalizing the DynamoComponentDeployment", "dynamoComponentDeployment", dynamoComponentDeployment) logger.Info("Finalizing the DynamoComponentDeployment", "dynamoComponentDeployment", dynamoComponentDeployment)
...@@ -841,34 +831,14 @@ func (r *DynamoComponentDeploymentReconciler) setStatusConditions(ctx context.Co ...@@ -841,34 +831,14 @@ func (r *DynamoComponentDeploymentReconciler) setStatusConditions(ctx context.Co
return return
} }
//nolint:nakedret func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteDeployments(ctx context.Context, opt generateResourceOption) (bool, *appsv1.Deployment, error) {
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteDeployments(ctx context.Context, opt generateResourceOption) (modified bool, depl *appsv1.Deployment, err error) { modified, depl, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*appsv1.Deployment, bool, error) {
containsStealingTrafficDebugModeEnabled := checkIfContainsStealingTrafficDebugModeEnabled(opt.dynamoComponentDeployment) return r.generateDeployment(ctx, opt)
// create the main deployment
modified, depl, err = commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*appsv1.Deployment, bool, error) {
return r.generateDeployment(ctx, generateResourceOption{
dynamoComponentDeployment: opt.dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: false,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
})
}) })
if err != nil { if err != nil {
err = errors.Wrap(err, "create or update deployment") return false, nil, errors.Wrap(err, "create or update deployment")
return
} }
// create the debug deployment return modified, depl, nil
modified2, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*appsv1.Deployment, bool, error) {
return r.generateDeployment(ctx, generateResourceOption{
dynamoComponentDeployment: opt.dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: true,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
})
})
if err != nil {
err = errors.Wrap(err, "create or update debug deployment")
}
modified = modified || modified2
return
} }
func getResourceAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string { func getResourceAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string {
...@@ -880,82 +850,14 @@ func getResourceAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentD ...@@ -880,82 +850,14 @@ func getResourceAnnotations(dynamoComponentDeployment *v1alpha1.DynamoComponentD
return resourceAnnotations return resourceAnnotations
} }
func checkIfIsDebugModeEnabled(annotations map[string]string) bool { func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteServices(ctx context.Context, opt generateResourceOption) (bool, error) {
if annotations == nil { modified, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
return false return r.generateService(opt)
}
return annotations[KubeAnnotationEnableDebugMode] == commonconsts.KubeLabelValueTrue
}
func checkIfIsStealingTrafficDebugModeEnabled(annotations map[string]string) bool {
if annotations == nil {
return false
}
return annotations[KubeAnnotationEnableStealingTrafficDebugMode] == commonconsts.KubeLabelValueTrue
}
func checkIfIsDebugPodReceiveProductionTrafficEnabled(annotations map[string]string) bool {
if annotations == nil {
return false
}
return annotations[KubeAnnotationEnableDebugPodReceiveProductionTraffic] == commonconsts.KubeLabelValueTrue
}
func checkIfContainsStealingTrafficDebugModeEnabled(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) bool {
return checkIfIsStealingTrafficDebugModeEnabled(dynamoComponentDeployment.Spec.Annotations)
}
//nolint:nakedret
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteServices(ctx context.Context, opt generateResourceOption) (modified bool, err error) {
resourceAnnotations := getResourceAnnotations(opt.dynamoComponentDeployment)
isDebugPodReceiveProductionTrafficEnabled := checkIfIsDebugPodReceiveProductionTrafficEnabled(resourceAnnotations)
containsStealingTrafficDebugModeEnabled := checkIfContainsStealingTrafficDebugModeEnabled(opt.dynamoComponentDeployment)
// main generic service
modified, _, err = commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
return r.generateService(generateResourceOption{
dynamoComponentDeployment: opt.dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: false,
isDebugPodReceiveProductionTraffic: isDebugPodReceiveProductionTrafficEnabled,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
isGenericService: true,
})
})
if err != nil {
return
}
// debug production service (if enabled)
modified_, _, err := commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
return r.generateService(generateResourceOption{
dynamoComponentDeployment: opt.dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: false,
isDebugPodReceiveProductionTraffic: isDebugPodReceiveProductionTrafficEnabled,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
isGenericService: false,
})
})
if err != nil {
return
}
modified = modified || modified_
// debug service (if enabled)
modified_, _, err = commonController.SyncResource(ctx, r, opt.dynamoComponentDeployment, func(ctx context.Context) (*corev1.Service, bool, error) {
return r.generateService(generateResourceOption{
dynamoComponentDeployment: opt.dynamoComponentDeployment,
isStealingTrafficDebugModeEnabled: true,
isDebugPodReceiveProductionTraffic: isDebugPodReceiveProductionTrafficEnabled,
containsStealingTrafficDebugModeEnabled: containsStealingTrafficDebugModeEnabled,
isGenericService: false,
})
}) })
if err != nil { if err != nil {
return return false, err
} }
modified = modified || modified_ return modified, nil
return
} }
func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteIngress(ctx context.Context, opt generateResourceOption) (bool, error) { func (r *DynamoComponentDeploymentReconciler) createOrUpdateOrDeleteIngress(ctx context.Context, opt generateResourceOption) (bool, error) {
...@@ -1013,27 +915,6 @@ func (r *DynamoComponentDeploymentReconciler) generateVirtualService(ctx context ...@@ -1013,27 +915,6 @@ func (r *DynamoComponentDeploymentReconciler) generateVirtualService(ctx context
return dynamo.GenerateComponentVirtualService(ctx, opt.dynamoComponentDeployment.Name, opt.dynamoComponentDeployment.Namespace, *opt.dynamoComponentDeployment.Spec.Ingress), false, nil return dynamo.GenerateComponentVirtualService(ctx, opt.dynamoComponentDeployment.Name, opt.dynamoComponentDeployment.Namespace, *opt.dynamoComponentDeployment.Spec.Ingress), false, nil
} }
func (r *DynamoComponentDeploymentReconciler) getKubeName(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, debug bool) string {
if debug {
return fmt.Sprintf("%s-d", dynamoComponentDeployment.Name)
}
return dynamoComponentDeployment.Name
}
func (r *DynamoComponentDeploymentReconciler) getServiceName(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment, debug bool) string {
var kubeName string
if debug {
kubeName = fmt.Sprintf("%s-d", dynamoComponentDeployment.Name)
} else {
kubeName = fmt.Sprintf("%s-p", dynamoComponentDeployment.Name)
}
return kubeName
}
func (r *DynamoComponentDeploymentReconciler) getGenericServiceName(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) string {
return r.getKubeName(dynamoComponentDeployment, false)
}
func (r *DynamoComponentDeploymentReconciler) getKubeLabels(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string { func (r *DynamoComponentDeploymentReconciler) getKubeLabels(dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment) map[string]string {
labels := map[string]string{} labels := map[string]string{}
if dynamoComponentDeployment != nil { if dynamoComponentDeployment != nil {
...@@ -1070,7 +951,7 @@ func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Con ...@@ -1070,7 +951,7 @@ func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Con
annotations := r.getKubeAnnotations(opt.dynamoComponentDeployment) annotations := r.getKubeAnnotations(opt.dynamoComponentDeployment)
kubeName := r.getKubeName(opt.dynamoComponentDeployment, opt.isStealingTrafficDebugModeEnabled) kubeName := opt.dynamoComponentDeployment.Name
kubeDeployment = &appsv1.Deployment{ kubeDeployment = &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
...@@ -1081,11 +962,6 @@ func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Con ...@@ -1081,11 +962,6 @@ func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Con
}, },
} }
if opt.isStealingTrafficDebugModeEnabled && !opt.containsStealingTrafficDebugModeEnabled {
// if stealing traffic debug mode is enabled but disabled in the deployment, we need to delete the deployment
return kubeDeployment, true, nil
}
// nolint: gosimple // nolint: gosimple
podTemplateSpec, err := r.generatePodTemplateSpec(ctx, opt, dynamo.RoleMain) podTemplateSpec, err := r.generatePodTemplateSpec(ctx, opt, dynamo.RoleMain)
if err != nil { if err != nil {
...@@ -1122,14 +998,8 @@ func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Con ...@@ -1122,14 +998,8 @@ func (r *DynamoComponentDeploymentReconciler) generateDeployment(ctx context.Con
} }
} }
var replicas *int32
replicas = opt.dynamoComponentDeployment.Spec.Replicas
if opt.isStealingTrafficDebugModeEnabled {
replicas = &[]int32{int32(1)}[0]
}
kubeDeployment.Spec = appsv1.DeploymentSpec{ kubeDeployment.Spec = appsv1.DeploymentSpec{
Replicas: replicas, Replicas: opt.dynamoComponentDeployment.Spec.Replicas,
Selector: &metav1.LabelSelector{ Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{ MatchLabels: map[string]string{
commonconsts.KubeLabelDynamoSelector: kubeName, commonconsts.KubeLabelDynamoSelector: kubeName,
...@@ -1157,20 +1027,13 @@ func getDeploymentRollingUpdateMaxSurgeAndMaxUnavailable(annotations map[string] ...@@ -1157,20 +1027,13 @@ func getDeploymentRollingUpdateMaxSurgeAndMaxUnavailable(annotations map[string]
} }
type generateResourceOption struct { type generateResourceOption struct {
dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment dynamoComponentDeployment *v1alpha1.DynamoComponentDeployment
isStealingTrafficDebugModeEnabled bool instanceID *int
containsStealingTrafficDebugModeEnabled bool
isDebugPodReceiveProductionTraffic bool
isGenericService bool
instanceID *int
} }
//nolint:gocyclo,nakedret //nolint:gocyclo,nakedret
func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx context.Context, opt generateResourceOption, role dynamo.Role) (podTemplateSpec *corev1.PodTemplateSpec, err error) { func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx context.Context, opt generateResourceOption, role dynamo.Role) (podTemplateSpec *corev1.PodTemplateSpec, err error) {
podLabels := r.getKubeLabels(opt.dynamoComponentDeployment) podLabels := r.getKubeLabels(opt.dynamoComponentDeployment)
if opt.isStealingTrafficDebugModeEnabled {
podLabels[commonconsts.KubeLabelDynamoDeploymentTargetType] = DeploymentTargetTypeDebug
}
// Convert user-provided metrics annotation into controller-managed label // Convert user-provided metrics annotation into controller-managed label
// By default (no annotation), metrics are enabled // By default (no annotation), metrics are enabled
...@@ -1200,7 +1063,7 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex ...@@ -1200,7 +1063,7 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
podAnnotations := make(map[string]string) podAnnotations := make(map[string]string)
kubeName := r.getKubeName(opt.dynamoComponentDeployment, opt.isStealingTrafficDebugModeEnabled) kubeName := opt.dynamoComponentDeployment.Name
resourceAnnotations := opt.dynamoComponentDeployment.Spec.Annotations resourceAnnotations := opt.dynamoComponentDeployment.Spec.Annotations
...@@ -1208,8 +1071,6 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex ...@@ -1208,8 +1071,6 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
resourceAnnotations = make(map[string]string) resourceAnnotations = make(map[string]string)
} }
isDebugModeEnabled := checkIfIsDebugModeEnabled(resourceAnnotations)
// Resolve checkpoint for this component // Resolve checkpoint for this component
var checkpointInfo *checkpoint.CheckpointInfo var checkpointInfo *checkpoint.CheckpointInfo
if opt.dynamoComponentDeployment.Spec.Checkpoint != nil && opt.dynamoComponentDeployment.Spec.Checkpoint.Enabled { if opt.dynamoComponentDeployment.Spec.Checkpoint != nil && opt.dynamoComponentDeployment.Spec.Checkpoint.Enabled {
...@@ -1231,40 +1092,6 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex ...@@ -1231,40 +1092,6 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
return nil, errors.New("no containers found in base pod spec") return nil, errors.New("no containers found in base pod spec")
} }
debuggerImage := "python:3.12-slim"
debuggerImage_ := os.Getenv("INTERNAL_IMAGES_DEBUGGER")
if debuggerImage_ != "" {
debuggerImage = debuggerImage_
}
if opt.isStealingTrafficDebugModeEnabled || isDebugModeEnabled {
podSpec.Containers = append(podSpec.Containers, corev1.Container{
Name: "debugger",
Image: debuggerImage,
Command: []string{
"sleep",
"infinity",
},
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Add: []corev1.Capability{"SYS_PTRACE"},
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
corev1.ResourceMemory: resource.MustParse("100Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
corev1.ResourceMemory: resource.MustParse("1000Mi"),
},
},
Stdin: true,
TTY: true,
})
}
podLabels[commonconsts.KubeLabelDynamoSelector] = kubeName podLabels[commonconsts.KubeLabelDynamoSelector] = kubeName
extraPodMetadata := opt.dynamoComponentDeployment.Spec.ExtraPodMetadata extraPodMetadata := opt.dynamoComponentDeployment.Spec.ExtraPodMetadata
...@@ -1296,10 +1123,6 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex ...@@ -1296,10 +1123,6 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
} }
} }
if opt.isStealingTrafficDebugModeEnabled || isDebugModeEnabled {
podSpec.ShareProcessNamespace = &[]bool{true}[0]
}
podTemplateSpec = &corev1.PodTemplateSpec{ podTemplateSpec = &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: podLabels, Labels: podLabels,
...@@ -1312,112 +1135,42 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex ...@@ -1312,112 +1135,42 @@ func (r *DynamoComponentDeploymentReconciler) generatePodTemplateSpec(ctx contex
} }
func (r *DynamoComponentDeploymentReconciler) generateService(opt generateResourceOption) (*corev1.Service, bool, error) { func (r *DynamoComponentDeploymentReconciler) generateService(opt generateResourceOption) (*corev1.Service, bool, error) {
var kubeName string dcd := opt.dynamoComponentDeployment
if opt.isGenericService {
kubeName = r.getGenericServiceName(opt.dynamoComponentDeployment)
} else {
kubeName = r.getServiceName(opt.dynamoComponentDeployment, opt.isStealingTrafficDebugModeEnabled)
}
kubeNs := opt.dynamoComponentDeployment.Namespace deleteStub := &corev1.Service{
kubeService := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: kubeName, Name: dcd.Name,
Namespace: kubeNs, Namespace: dcd.Namespace,
}, },
} }
isK8sDiscovery := r.Config.IsK8sDiscoveryEnabled(opt.dynamoComponentDeployment.Spec.Annotations) isK8sDiscovery := r.Config.IsK8sDiscoveryEnabled(dcd.Spec.Annotations)
// if discovery backend is k8s we want to create a service for each component if !(isK8sDiscovery || dcd.IsFrontendComponent()) {
// else, only create for the frontend component return deleteStub, true, nil
if !opt.isGenericService && !opt.containsStealingTrafficDebugModeEnabled && !(isK8sDiscovery || opt.dynamoComponentDeployment.IsFrontendComponent()) {
// if it's not the main component or if it's not a generic service and not contains stealing traffic debug mode enabled, we don't need to create the service
return kubeService, true, nil
} }
labels := r.getKubeLabels(opt.dynamoComponentDeployment) if dcd.Spec.DynamoNamespace == nil {
return nil, false, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", dcd.Name)
if opt.dynamoComponentDeployment.Spec.DynamoNamespace == nil {
return nil, false, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", opt.dynamoComponentDeployment.Name)
} }
selector := map[string]string{ svc, err := dynamo.GenerateComponentService(dynamo.ComponentServiceParams{
commonconsts.KubeLabelDynamoComponentType: opt.dynamoComponentDeployment.Spec.ComponentType, // e.g. "worker" ServiceName: dcd.Name,
commonconsts.KubeLabelDynamoNamespace: *opt.dynamoComponentDeployment.Spec.DynamoNamespace, // result of ComputeDynamoNamespace(k8sNamespace, dgdName) Namespace: dcd.Namespace,
// The original user provided component name (the service map key, e.g. "VllmDecodeWorker" in the DGD). ComponentType: dcd.Spec.ComponentType,
// Needed to disambiguate amongst distinct components with the same component type within a DGD (e.g prefill/decode workers). DynamoNamespace: *dcd.Spec.DynamoNamespace,
commonconsts.KubeLabelDynamoComponent: opt.dynamoComponentDeployment.Spec.ServiceName, ComponentName: dcd.Spec.ServiceName,
} Labels: r.getKubeLabels(dcd),
// // If using LeaderWorkerSet, modify selector to only target leaders Annotations: r.getKubeAnnotations(dcd),
if opt.dynamoComponentDeployment.IsMultinode() { IsK8sDiscovery: isK8sDiscovery,
selector["role"] = "leader" })
} if err != nil {
if opt.isStealingTrafficDebugModeEnabled { return nil, false, err
selector[commonconsts.KubeLabelDynamoDeploymentTargetType] = DeploymentTargetTypeDebug
}
if isK8sDiscovery {
labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes"
labels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue
}
var servicePort corev1.ServicePort
switch opt.dynamoComponentDeployment.Spec.ComponentType {
case commonconsts.ComponentTypeFrontend:
servicePort = corev1.ServicePort{
Name: commonconsts.DynamoServicePortName,
Port: commonconsts.DynamoServicePort,
TargetPort: intstr.FromString(commonconsts.DynamoContainerPortName),
Protocol: corev1.ProtocolTCP,
}
case commonconsts.ComponentTypeEPP:
// EPP exposes the gRPC endpoint for InferencePool communication
servicePort = corev1.ServicePort{
Name: commonconsts.EPPGRPCPortName,
Port: commonconsts.EPPGRPCPort,
TargetPort: intstr.FromInt(commonconsts.EPPGRPCPort),
Protocol: corev1.ProtocolTCP,
}
default:
// Worker and other components use the system port for metrics/health
servicePort = corev1.ServicePort{
Name: commonconsts.DynamoSystemPortName,
Port: commonconsts.DynamoSystemPort,
TargetPort: intstr.FromString(commonconsts.DynamoSystemPortName),
Protocol: corev1.ProtocolTCP,
}
} }
if dcd.IsMultinode() {
spec := corev1.ServiceSpec{ svc.Spec.Selector["role"] = "leader"
Selector: selector,
Ports: []corev1.ServicePort{servicePort},
} }
return svc, false, nil
annotations := r.getKubeAnnotations(opt.dynamoComponentDeployment)
kubeService.ObjectMeta.Annotations = annotations
kubeService.ObjectMeta.Labels = labels
kubeService.Spec = spec
return kubeService, false, nil
}
type TLSModeOpt string
const (
TLSModeNone TLSModeOpt = "none"
TLSModeAuto TLSModeOpt = "auto"
TLSModeStatic TLSModeOpt = "static"
)
type IngressConfig struct {
ClassName *string
Annotations map[string]string
Path string
PathType networkingv1.PathType
TLSMode TLSModeOpt
StaticTLSSecretName string
} }
// SetupWithManager sets up the controller with the Manager. // SetupWithManager sets up the controller with the Manager.
......
...@@ -629,7 +629,18 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co ...@@ -629,7 +629,18 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
// else, only create for the frontend component // else, only create for the frontend component
isK8sDiscoveryEnabled := r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations) isK8sDiscoveryEnabled := r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations)
if isK8sDiscoveryEnabled || component.ComponentType == consts.ComponentTypeFrontend { if isK8sDiscoveryEnabled || component.ComponentType == consts.ComponentTypeFrontend {
mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamoDeployment, component, componentName, isK8sDiscoveryEnabled) if component.DynamoNamespace == nil {
return ReconcileResult{}, fmt.Errorf("expected component %s to have a dynamoNamespace", componentName)
}
mainComponentService, err := dynamo.GenerateComponentService(dynamo.ComponentServiceParams{
ServiceName: dynamo.GetDCDResourceName(dynamoDeployment, componentName, ""),
Namespace: dynamoDeployment.Namespace,
ComponentType: component.ComponentType,
DynamoNamespace: *component.DynamoNamespace,
ComponentName: componentName,
Labels: component.Labels,
IsK8sDiscovery: isK8sDiscoveryEnabled,
})
if err != nil { if err != nil {
logger.Error(err, "failed to generate the main component service") logger.Error(err, "failed to generate the main component service")
return ReconcileResult{}, fmt.Errorf("failed to generate the main component service: %w", err) return ReconcileResult{}, fmt.Errorf("failed to generate the main component service: %w", err)
......
...@@ -612,15 +612,23 @@ func getCliqueStartupDependencies( ...@@ -612,15 +612,23 @@ func getCliqueStartupDependencies(
return nil return nil
} }
func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.DynamoGraphDeployment, component *v1alpha1.DynamoComponentDeploymentSharedSpec, componentName string, isK8sDiscoveryEnabled bool) (*corev1.Service, error) { // ComponentServiceParams contains all the fields needed to generate a Kubernetes
if component.DynamoNamespace == nil { // Service for a Dynamo component, independent of whether the caller is the DGD
return nil, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", componentName) // (Grove) or DCD controller.
} type ComponentServiceParams struct {
// DNS-safe service resource name: "{dgd-name}-{lowercase(componentName)}" ServiceName string
kubeServiceName := GetDCDResourceName(dynamoDeployment, componentName, "") Namespace string
ComponentType string
DynamoNamespace string
ComponentName string // original user-provided name, used in selector
Labels map[string]string
Annotations map[string]string
IsK8sDiscovery bool
}
func GenerateComponentService(params ComponentServiceParams) (*corev1.Service, error) {
var servicePort corev1.ServicePort var servicePort corev1.ServicePort
switch component.ComponentType { switch params.ComponentType {
case commonconsts.ComponentTypeFrontend: case commonconsts.ComponentTypeFrontend:
servicePort = corev1.ServicePort{ servicePort = corev1.ServicePort{
Name: commonconsts.DynamoServicePortName, Name: commonconsts.DynamoServicePortName,
...@@ -629,7 +637,6 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy ...@@ -629,7 +637,6 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy
Protocol: corev1.ProtocolTCP, Protocol: corev1.ProtocolTCP,
} }
case commonconsts.ComponentTypeEPP: case commonconsts.ComponentTypeEPP:
// EPP only exposes the gRPC endpoint for InferencePool communication
servicePort = corev1.ServicePort{ servicePort = corev1.ServicePort{
Name: commonconsts.EPPGRPCPortName, Name: commonconsts.EPPGRPCPortName,
Port: commonconsts.EPPGRPCPort, Port: commonconsts.EPPGRPCPort,
...@@ -646,33 +653,36 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy ...@@ -646,33 +653,36 @@ func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.Dy
} }
} }
// Start with user-defined labels from component.Labels
labels := make(map[string]string) labels := make(map[string]string)
for k, v := range component.Labels { for k, v := range params.Labels {
labels[k] = v labels[k] = v
} }
if params.IsK8sDiscovery {
// Add k8s discovery labels (these take precedence over user labels)
if isK8sDiscoveryEnabled {
labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes" labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes"
labels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue labels[commonconsts.KubeLabelDynamoDiscoveryEnabled] = commonconsts.KubeLabelValueTrue
} }
selector := map[string]string{
commonconsts.KubeLabelDynamoComponentType: params.ComponentType,
commonconsts.KubeLabelDynamoNamespace: params.DynamoNamespace,
commonconsts.KubeLabelDynamoComponent: params.ComponentName,
}
annotations := make(map[string]string)
for k, v := range params.Annotations {
annotations[k] = v
}
service := &corev1.Service{ service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: kubeServiceName, Name: params.ServiceName,
Namespace: dynamoDeployment.Namespace, Namespace: params.Namespace,
Labels: labels, Labels: labels,
Annotations: annotations,
}, },
Spec: corev1.ServiceSpec{ Spec: corev1.ServiceSpec{
Selector: map[string]string{ Selector: selector,
commonconsts.KubeLabelDynamoComponentType: component.ComponentType, // e.g "worker" Ports: []corev1.ServicePort{servicePort},
commonconsts.KubeLabelDynamoNamespace: *component.DynamoNamespace, // result of ComputeDynamoNamespace(k8sNamespace, dgdName)
// The original user provided component name (the service map key, e.g. "VllmDecodeWorker" in the DGD).
// Needed to disambiguate amongst distinct components with the same component type within a DGD (e.g prefill/decode workers).
commonconsts.KubeLabelDynamoComponent: componentName,
},
Ports: []corev1.ServicePort{servicePort},
}, },
} }
return service, nil return service, nil
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment