Unverified Commit efec27f9 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat: etcd-less operator updates (#4214)

parent afccc9d4
......@@ -102,3 +102,13 @@ Validation for configuration consistency
{{- end -}}
{{- end -}}
{{- end -}}
{{/*
Validation for discoveryBackend configuration.
An empty or unset value is accepted (the operator then defaults to ETCD), as is
"kubernetes". Any other value aborts rendering with a validation error.
*/}}
{{- define "dynamo-operator.validateDiscoveryBackend" -}}
{{- /* Normalise unset/null and non-string values to a string so the comparisons and the %s verb below never see nil. */ -}}
{{- $discoveryBackend := .Values.discoveryBackend | default "" | toString -}}
{{- if and (ne $discoveryBackend "") (ne $discoveryBackend "kubernetes") -}}
{{- fail (printf "VALIDATION ERROR: discoveryBackend must be either an empty string (defaults to ETCD) or 'kubernetes'. Got: '%s'" $discoveryBackend) -}}
{{- end -}}
{{- end -}}
......@@ -16,6 +16,7 @@
{{/* Validate installation to prevent conflicts */}}
{{- include "dynamo-operator.validateClusterWideInstallation" . -}}
{{- include "dynamo-operator.validateConfiguration" . -}}
{{- include "dynamo-operator.validateDiscoveryBackend" . -}}
---
apiVersion: apps/v1
......@@ -131,6 +132,9 @@ spec:
- --dgdr-profiling-cluster-role-name={{ include "dynamo-operator.fullname" . }}-dgdr-profiling
- --planner-cluster-role-name={{ include "dynamo-operator.fullname" . }}-planner
{{- end }}
{{- if .Values.discoveryBackend }}
- --discovery-backend={{ .Values.discoveryBackend }}
{{- end }}
{{- if .Values.namespaceRestriction.enabled }}
{{- if .Values.namespaceRestriction.lease }}
- --namespace-scope-lease-duration={{ .Values.namespaceRestriction.lease.duration }}
......
......@@ -42,6 +42,8 @@ dynamo-operator:
# Interval for renewing the namespace scope marker lease (namespace-restricted mode only). The namespace-restricted operator renews its lease at this interval to signal it's still running.
renewInterval: 10s
# -- The Dynamo discovery backend to use. When empty (the default), ETCD is used for discovery. Set to "kubernetes" to use the Kubernetes API for service discovery.
discoveryBackend: ""
# Controller manager configuration
controllerManager:
......
......@@ -146,6 +146,7 @@ func main() {
var namespaceScopeLeaseDuration time.Duration
var namespaceScopeLeaseRenewInterval time.Duration
var operatorVersion string
var discoveryBackend string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
......@@ -194,6 +195,8 @@ func main() {
"Interval for renewing namespace scope marker lease (namespace-restricted mode only)")
flag.StringVar(&operatorVersion, "operator-version", "unknown",
"Version of the operator (used in lease holder identity)")
flag.StringVar(&discoveryBackend, "discovery-backend", "",
"Discovery backend to use: empty string (default, uses ETCD) or 'kubernetes' (uses Kubernetes API)")
opts := zap.Options{
Development: true,
}
......@@ -205,6 +208,17 @@ func main() {
os.Exit(1)
}
// Validate the --discovery-backend flag: only the empty string (ETCD, the
// default) and "kubernetes" are accepted; anything else aborts startup.
if discoveryBackend != "" && discoveryBackend != "kubernetes" {
	// The message names the flag exactly as it is registered ("discovery-backend").
	setupLog.Error(nil, "invalid discovery-backend value, must be empty string or 'kubernetes'", "value", discoveryBackend)
	os.Exit(1)
}
// Log the effective backend so the startup log shows which discovery mode is active.
if discoveryBackend != "" {
	setupLog.Info("Discovery backend configured", "backend", discoveryBackend)
} else {
	setupLog.Info("Discovery backend configured", "backend", "etcd (default)")
}
// Validate modelExpressURL if provided
if modelExpressURL != "" {
if _, err := url.Parse(modelExpressURL); err != nil {
......@@ -253,6 +267,7 @@ func main() {
PlannerClusterRoleName: plannerClusterRoleName,
DGDRProfilingClusterRoleName: dgdrProfilingClusterRoleName,
},
DiscoveryBackend: discoveryBackend,
}
mainCtx := ctrl.SetupSignalHandler()
......
......@@ -31,6 +31,7 @@ const (
KubeAnnotationEnableGrove = "nvidia.com/enable-grove"
KubeAnnotationDisableImagePullSecretDiscovery = "nvidia.com/disable-image-pull-secret-discovery"
KubeAnnotationDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend"
KubeLabelDynamoGraphDeploymentName = "nvidia.com/dynamo-graph-deployment-name"
KubeLabelDynamoComponent = "nvidia.com/dynamo-component"
......@@ -41,6 +42,7 @@ const (
KubeLabelDynamoBaseModel = "nvidia.com/dynamo-base-model"
KubeLabelDynamoBaseModelHash = "nvidia.com/dynamo-base-model-hash"
KubeAnnotationDynamoBaseModel = "nvidia.com/dynamo-base-model"
KubeLabelDynamoDiscoveryBackend = "nvidia.com/dynamo-discovery-backend"
KubeLabelValueFalse = "false"
KubeLabelValueTrue = "true"
......@@ -50,12 +52,17 @@ const (
KubeResourceGPUNvidia = "nvidia.com/gpu"
DynamoDeploymentConfigEnvVar = "DYN_DEPLOYMENT_CONFIG"
DynamoNamespaceEnvVar = "DYN_NAMESPACE"
DynamoComponentEnvVar = "DYN_COMPONENT"
DynamoDiscoveryBackendEnvVar = "DYN_DISCOVERY_BACKEND"
GlobalDynamoNamespace = "dynamo"
ComponentTypePlanner = "planner"
ComponentTypeFrontend = "frontend"
ComponentTypeWorker = "worker"
ComponentTypePrefill = "prefill"
ComponentTypeDecode = "decode"
ComponentTypeDefault = "default"
PlannerServiceAccountName = "planner-serviceaccount"
......
......@@ -63,14 +63,7 @@ const (
KubeAnnotationEnableStealingTrafficDebugMode = "nvidia.com/enable-stealing-traffic-debug-mode"
KubeAnnotationEnableDebugMode = "nvidia.com/enable-debug-mode"
KubeAnnotationEnableDebugPodReceiveProductionTraffic = "nvidia.com/enable-debug-pod-receive-production-traffic"
DeploymentTargetTypeProduction = "production"
DeploymentTargetTypeDebug = "debug"
HeaderNameDebug = "X-Nvidia-Debug"
KubernetesDeploymentStrategy = "kubernetes"
DeploymentTypeStandard = "standard"
DeploymentTypeMultinodeGrove = "multinode-grove"
ComponentTypePlanner = "Planner"
)
// DynamoComponentDeploymentReconciler reconciles a DynamoComponentDeployment object
......@@ -1276,40 +1269,56 @@ func (r *DynamoComponentDeploymentReconciler) generateService(opt generateResour
},
}
if !opt.dynamoComponentDeployment.IsFrontendComponent() || (!opt.isGenericService && !opt.containsStealingTrafficDebugModeEnabled) {
isK8sDiscovery := r.Config.IsK8sDiscoveryEnabled(opt.dynamoComponentDeployment.Spec.Annotations)
// if discovery backend is k8s we want to create a service for each component
// else, only create for the frontend component
if !opt.isGenericService && !opt.containsStealingTrafficDebugModeEnabled && !(isK8sDiscovery || opt.dynamoComponentDeployment.IsFrontendComponent()) {
// if it's not the main component or if it's not a generic service and not contains stealing traffic debug mode enabled, we don't need to create the service
return kubeService, true, nil
}
labels := r.getKubeLabels(opt.dynamoComponentDeployment)
selector := make(map[string]string)
for k, v := range labels {
selector[k] = v
if opt.dynamoComponentDeployment.Spec.DynamoNamespace == nil {
return nil, false, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", opt.dynamoComponentDeployment.Name)
}
// If using LeaderWorkerSet, modify selector to only target leaders
selector := map[string]string{
commonconsts.KubeLabelDynamoComponentType: opt.dynamoComponentDeployment.Spec.ComponentType,
commonconsts.KubeLabelDynamoNamespace: *opt.dynamoComponentDeployment.Spec.DynamoNamespace,
}
// // If using LeaderWorkerSet, modify selector to only target leaders
if opt.dynamoComponentDeployment.IsMultinode() {
selector["role"] = "leader"
}
if opt.isStealingTrafficDebugModeEnabled {
selector[commonconsts.KubeLabelDynamoDeploymentTargetType] = DeploymentTargetTypeDebug
}
if isK8sDiscovery {
labels[commonconsts.KubeLabelDynamoDiscoveryBackend] = "kubernetes"
}
targetPort := intstr.FromString(commonconsts.DynamoContainerPortName)
var servicePort corev1.ServicePort
if opt.dynamoComponentDeployment.IsFrontendComponent() {
servicePort = corev1.ServicePort{
Name: commonconsts.DynamoServicePortName,
Port: commonconsts.DynamoServicePort,
TargetPort: intstr.FromString(commonconsts.DynamoContainerPortName),
Protocol: corev1.ProtocolTCP,
}
} else { // TODO: only for worker components
servicePort = corev1.ServicePort{
Name: commonconsts.DynamoSystemPortName,
Port: commonconsts.DynamoSystemPort,
TargetPort: intstr.FromString(commonconsts.DynamoSystemPortName),
Protocol: corev1.ProtocolTCP,
}
}
spec := corev1.ServiceSpec{
Selector: selector,
Ports: []corev1.ServicePort{
{
Name: commonconsts.DynamoServicePortName,
Port: commonconsts.DynamoServicePort,
TargetPort: targetPort,
Protocol: corev1.ProtocolTCP,
},
},
Ports: []corev1.ServicePort{servicePort},
}
annotations := r.getKubeAnnotations(opt.dynamoComponentDeployment)
......
......@@ -824,11 +824,22 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
Command: []string{"/bin/sh", "-c"},
Args: []string{"ray start --head --port=6379 && some dynamo command --tensor-parallel-size 4 --pipeline-parallel-size 1"},
Env: []corev1.EnvVar{
{Name: "DYN_NAMESPACE", Value: "default"},
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "default"},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-lws-deploy"},
{Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"},
{Name: "DYN_SYSTEM_PORT", Value: "9090"},
{Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: "[\"generate\"]"},
{Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
}},
{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
}},
{Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"},
{Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"},
},
......@@ -937,11 +948,22 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
Command: []string{"/bin/sh", "-c"},
Args: []string{"ray start --address=$(LWS_LEADER_ADDRESS):6379 --block"},
Env: []corev1.EnvVar{
{Name: "DYN_NAMESPACE", Value: "default"},
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "default"},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-lws-deploy"},
{Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "default"},
{Name: "DYN_SYSTEM_PORT", Value: "9090"},
{Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Value: "[\"generate\"]"},
{Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
}},
{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
}},
{Name: "TEST_ENV_FROM_DYNAMO_COMPONENT_DEPLOYMENT_SPEC", Value: "test_value_from_dynamo_component_deployment_spec"},
{Name: "TEST_ENV_FROM_EXTRA_POD_SPEC", Value: "test_value_from_extra_pod_spec"},
},
......
......@@ -25,6 +25,7 @@ import (
grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/discovery"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/secret"
networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
......@@ -48,6 +49,7 @@ import (
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
rbacv1 "k8s.io/api/rbac/v1"
)
type State string
......@@ -200,6 +202,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context
return "", "", "", fmt.Errorf("failed to reconcile top-level PVCs: %w", err)
}
// Reconcile the SA, Role and RoleBinding if k8s discovery is enabled
err = r.reconcileK8sDiscoveryResources(ctx, dynamoDeployment)
if err != nil {
logger.Error(err, "Failed to reconcile K8s discovery resources")
return "", "", "", fmt.Errorf("failed to reconcile K8s discovery resources: %w", err)
}
// Orchestrator selection via single boolean annotation: nvidia.com/enable-grove
// Unset or not "false": Grove if available; else component mode
// "false": component mode (multinode -> LWS; single-node -> standard)
......@@ -310,6 +319,7 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Cont
func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
logger := log.FromContext(ctx)
// generate the dynamoComponentsDeployments from the config
groveGangSet, err := dynamo.GenerateGrovePodCliqueSet(ctx, dynamoDeployment, r.Config, r.DockerSecretRetriever)
if err != nil {
......@@ -355,9 +365,11 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
resources := []Resource{groveGangSetAsResource}
for componentName, component := range dynamoDeployment.Spec.Services {
if component.ComponentType == consts.ComponentTypeFrontend {
// generate the main component service
mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace)
// if k8s discovery is enabled, create a service for each component
// else, only create for the frontend component
if r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations) || component.ComponentType == consts.ComponentTypeFrontend {
mainComponentService, err := dynamo.GenerateComponentService(ctx, dynamoDeployment, component, componentName)
if err != nil {
logger.Error(err, "failed to generate the main component service")
return "", "", "", fmt.Errorf("failed to generate the main component service: %w", err)
......@@ -374,6 +386,9 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
return true, ""
})
resources = append(resources, mainComponentServiceAsResource)
}
if component.ComponentType == consts.ComponentTypeFrontend {
// generate the main component ingress
ingressSpec := dynamo.GenerateDefaultIngressSpec(dynamoDeployment, r.Config.IngressConfig)
if component.Ingress != nil {
......@@ -501,6 +516,47 @@ func (r *DynamoGraphDeploymentReconciler) reconcilePVC(ctx context.Context, dyna
return pvc, nil
}
// reconcileK8sDiscoveryResources syncs the ServiceAccount, Role and RoleBinding
// produced by the discovery package for the given DynamoGraphDeployment.
//
// When K8s discovery is not enabled for the deployment's annotations it only
// logs that fact and returns nil. Otherwise the three resources are synced in
// order (ServiceAccount, Role, RoleBinding); the first failure is logged and
// returned wrapped.
func (r *DynamoGraphDeploymentReconciler) reconcileK8sDiscoveryResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
	logger := log.FromContext(ctx)

	// Guard clause: nothing to reconcile unless the kubernetes backend is selected.
	if !r.Config.IsK8sDiscoveryEnabled(dynamoDeployment.Annotations) {
		logger.Info("K8s discovery is not enabled")
		return nil
	}
	logger.Info("K8s discovery is enabled")

	sa := discovery.GetK8sDiscoveryServiceAccount(dynamoDeployment.Name, dynamoDeployment.Namespace)
	if _, _, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*corev1.ServiceAccount, bool, error) {
		return sa, false, nil
	}); err != nil {
		logger.Error(err, "failed to sync the k8s discovery service account")
		return fmt.Errorf("failed to sync the k8s discovery service account: %w", err)
	}

	discoveryRole := discovery.GetK8sDiscoveryRole(dynamoDeployment.Name, dynamoDeployment.Namespace)
	if _, _, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*rbacv1.Role, bool, error) {
		return discoveryRole, false, nil
	}); err != nil {
		logger.Error(err, "failed to sync the k8s discovery role")
		return fmt.Errorf("failed to sync the k8s discovery role: %w", err)
	}

	binding := discovery.GetK8sDiscoveryRoleBinding(dynamoDeployment.Name, dynamoDeployment.Namespace)
	if _, _, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*rbacv1.RoleBinding, bool, error) {
		return binding, false, nil
	}); err != nil {
		logger.Error(err, "failed to sync the k8s discovery role binding")
		return fmt.Errorf("failed to sync the k8s discovery role binding: %w", err)
	}

	return nil
}
// reconcilePVCs reconciles all top-level PVCs defined in the DynamoGraphDeployment spec
func (r *DynamoGraphDeploymentReconciler) reconcilePVCs(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
logger := log.FromContext(ctx)
......
......@@ -22,6 +22,7 @@ import (
"strings"
"time"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/discovery"
ctrl "sigs.k8s.io/controller-runtime"
......@@ -75,6 +76,9 @@ type Config struct {
RBAC RBACConfig
// ExcludedNamespaces is a thread-safe set of namespaces to exclude (cluster-wide mode only)
ExcludedNamespaces ExcludedNamespacesInterface
// DiscoveryBackend is the discovery backend to use. By default, will rely on ETCD for discovery. Can be set to "kubernetes" to use Kubernetes API for service discovery.
DiscoveryBackend string
}
// RBACConfig holds configuration for RBAC management
......@@ -153,6 +157,19 @@ func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName
return false
}
// IsK8sDiscoveryEnabled reports whether the discovery backend resolved by
// GetDiscoveryBackend for the given annotations is "kubernetes".
//
// For a DynamoGraphDeployment (DGD), pass in the metadata annotations.
// For a DynamoComponentDeployment (DCD), pass in the spec annotations.
func (c Config) IsK8sDiscoveryEnabled(annotations map[string]string) bool {
	backend := c.GetDiscoveryBackend(annotations)
	return backend == "kubernetes"
}
// GetDiscoveryBackend returns the discovery backend that applies to a resource
// carrying the given annotations. If the
// commonconsts.KubeAnnotationDynamoDiscoveryBackend key is present, its value
// is returned as-is (even when empty); otherwise the operator-wide
// Config.DiscoveryBackend is returned.
func (c Config) GetDiscoveryBackend(annotations map[string]string) string {
	override, overridden := annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend]
	if !overridden {
		return c.DiscoveryBackend
	}
	return override
}
func EphemeralDeploymentEventFilter(config Config) predicate.Predicate {
return predicate.NewPredicateFuncs(func(o client.Object) bool {
l := log.FromContext(context.Background())
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package discovery
import (
"fmt"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Shared string constants for the RBAC objects built in this package.
const (
// kindServiceAccount is the subject kind used in the discovery RoleBinding.
kindServiceAccount = "ServiceAccount"
// apiGroupRBAC is the API group set on the RoleBinding's RoleRef.
apiGroupRBAC = "rbac.authorization.k8s.io"
// apiGroupCore is the (empty) core API group used in the Role rule for "endpoints".
apiGroupCore = ""
)
// GetK8sDiscoveryServiceAccountName returns the name of the K8s discovery
// ServiceAccount for the given DynamoGraphDeployment name: the DGD name
// followed by "-k8s-service-discovery".
func GetK8sDiscoveryServiceAccountName(dgdName string) string {
	const suffix = "k8s-service-discovery"
	return fmt.Sprintf("%s-%s", dgdName, suffix)
}
// GetK8sDiscoveryServiceAccount builds the ServiceAccount object used for K8s
// discovery by the DynamoGraphDeployment named dgdName in the given namespace.
// The object is named via GetK8sDiscoveryServiceAccountName and carries the
// app.kubernetes.io managed-by/component/name labels.
func GetK8sDiscoveryServiceAccount(dgdName string, namespace string) *corev1.ServiceAccount {
	saName := GetK8sDiscoveryServiceAccountName(dgdName)

	objectMeta := metav1.ObjectMeta{
		Name:      saName,
		Namespace: namespace,
		Labels: map[string]string{
			"app.kubernetes.io/managed-by": "dynamo-operator",
			"app.kubernetes.io/component":  "rbac",
			"app.kubernetes.io/name":       saName,
		},
	}

	return &corev1.ServiceAccount{ObjectMeta: objectMeta}
}
// GetK8sDiscoveryRole builds the namespaced Role for K8s discovery of the
// DynamoGraphDeployment named dgdName. The Role is named after the discovery
// ServiceAccount with a "-role" suffix and grants get/list/watch on core
// "endpoints" and on "endpointslices" in the discovery.k8s.io API group.
func GetK8sDiscoveryRole(dgdName string, namespace string) *rbacv1.Role {
	saName := GetK8sDiscoveryServiceAccountName(dgdName)

	objectMeta := metav1.ObjectMeta{
		Name:      saName + "-role",
		Namespace: namespace,
		Labels: map[string]string{
			"app.kubernetes.io/managed-by": "dynamo-operator",
			"app.kubernetes.io/component":  "rbac",
			// The name label carries the ServiceAccount name, not the Role name.
			"app.kubernetes.io/name": saName,
		},
	}

	endpointsRule := rbacv1.PolicyRule{
		APIGroups: []string{apiGroupCore},
		Resources: []string{"endpoints"},
		Verbs:     []string{"get", "list", "watch"},
	}
	endpointSlicesRule := rbacv1.PolicyRule{
		APIGroups: []string{"discovery.k8s.io"},
		Resources: []string{"endpointslices"},
		Verbs:     []string{"get", "list", "watch"},
	}

	return &rbacv1.Role{
		ObjectMeta: objectMeta,
		Rules:      []rbacv1.PolicyRule{endpointsRule, endpointSlicesRule},
	}
}
// GetK8sDiscoveryRoleBinding builds the RoleBinding that binds the K8s
// discovery ServiceAccount of the DynamoGraphDeployment named dgdName to the
// Role named "<service-account-name>-role" in the given namespace. The binding
// itself is named "<service-account-name>-binding".
func GetK8sDiscoveryRoleBinding(dgdName, namespace string) *rbacv1.RoleBinding {
	saName := GetK8sDiscoveryServiceAccountName(dgdName)

	objectMeta := metav1.ObjectMeta{
		Name:      saName + "-binding",
		Namespace: namespace,
		Labels: map[string]string{
			"app.kubernetes.io/managed-by": "dynamo-operator",
			"app.kubernetes.io/component":  "rbac",
			// The name label carries the ServiceAccount name, not the binding name.
			"app.kubernetes.io/name": saName,
		},
	}

	subject := rbacv1.Subject{
		Kind:      kindServiceAccount,
		Name:      saName,
		Namespace: namespace,
	}

	roleRef := rbacv1.RoleRef{
		APIGroup: apiGroupRBAC,
		Kind:     "Role",
		Name:     saName + "-role",
	}

	return &rbacv1.RoleBinding{
		ObjectMeta: objectMeta,
		Subjects:   []rbacv1.Subject{subject},
		RoleRef:    roleRef,
	}
}
......@@ -27,7 +27,7 @@ func ComponentDefaultsFactory(componentType string) ComponentDefaults {
switch componentType {
case commonconsts.ComponentTypeFrontend:
return NewFrontendDefaults()
case commonconsts.ComponentTypeWorker:
case commonconsts.ComponentTypeWorker, commonconsts.ComponentTypePrefill, commonconsts.ComponentTypeDecode:
return NewWorkerDefaults()
case commonconsts.ComponentTypePlanner:
return NewPlannerDefaults()
......@@ -42,8 +42,10 @@ type BaseComponentDefaults struct{}
// ComponentContext carries the per-component values used when building a
// component's base container.
type ComponentContext struct {
// numberOfNodes is the node count supplied to generateComponentContext.
numberOfNodes int32
// DynamoNamespace is exported to the container as the DYN_NAMESPACE env var.
DynamoNamespace string
// ComponentType is exported to the container as the DYN_COMPONENT env var.
ComponentType string
// ParentGraphDeploymentName is exported as DYN_PARENT_DGD_K8S_NAME.
ParentGraphDeploymentName string
// ParentGraphDeploymentNamespace is exported as DYN_PARENT_DGD_K8S_NAMESPACE.
ParentGraphDeploymentNamespace string
// DiscoveryBackend is exported as DYN_DISCOVERY_BACKEND, but only when non-empty.
DiscoveryBackend string
}
func (b *BaseComponentDefaults) GetBaseContainer(context ComponentContext) (corev1.Container, error) {
......@@ -71,9 +73,13 @@ func (b *BaseComponentDefaults) getCommonContainer(context ComponentContext) cor
}
container.Env = []corev1.EnvVar{
{
Name: "DYN_NAMESPACE",
Name: commonconsts.DynamoNamespaceEnvVar,
Value: context.DynamoNamespace,
},
{
Name: commonconsts.DynamoComponentEnvVar,
Value: context.ComponentType,
},
{
Name: "DYN_PARENT_DGD_K8S_NAME",
Value: context.ParentGraphDeploymentName,
......@@ -82,6 +88,29 @@ func (b *BaseComponentDefaults) getCommonContainer(context ComponentContext) cor
Name: "DYN_PARENT_DGD_K8S_NAMESPACE",
Value: context.ParentGraphDeploymentNamespace,
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "POD_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
}
if context.DiscoveryBackend != "" {
container.Env = append(container.Env, corev1.EnvVar{
Name: commonconsts.DynamoDiscoveryBackendEnvVar,
Value: context.DiscoveryBackend,
})
}
return container
......
......@@ -18,29 +18,24 @@ func TestPlannerDefaults_GetBaseContainer(t *testing.T) {
type fields struct {
BaseComponentDefaults *BaseComponentDefaults
}
type args struct {
numberOfNodes int32
parentGraphDeploymentName string
parentGraphDeploymentNamespace string
dynamoNamespace string
}
tests := []struct {
name string
fields fields
args args
want corev1.Container
wantErr bool
name string
fields fields
componentContext ComponentContext
want corev1.Container
wantErr bool
}{
{
name: "test",
fields: fields{
BaseComponentDefaults: &BaseComponentDefaults{},
},
args: args{
componentContext: ComponentContext{
numberOfNodes: 1,
parentGraphDeploymentName: "name",
parentGraphDeploymentNamespace: "namespace",
dynamoNamespace: "dynamo-namespace",
ParentGraphDeploymentName: "name",
ParentGraphDeploymentNamespace: "namespace",
DynamoNamespace: "dynamo-namespace",
ComponentType: commonconsts.ComponentTypePlanner,
},
want: corev1.Container{
Name: commonconsts.MainContainerName,
......@@ -52,9 +47,23 @@ func TestPlannerDefaults_GetBaseContainer(t *testing.T) {
{Name: commonconsts.DynamoMetricsPortName, ContainerPort: commonconsts.DynamoPlannerMetricsPort, Protocol: corev1.ProtocolTCP},
},
Env: []corev1.EnvVar{
{Name: "DYN_NAMESPACE", Value: "dynamo-namespace"},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "dynamo-namespace"},
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypePlanner},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "name"},
{Name: "DYN_PARENT_DGD_K8S_NAMESPACE", Value: "namespace"},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
}},
{Name: "PLANNER_PROMETHEUS_PORT", Value: fmt.Sprintf("%d", commonconsts.DynamoPlannerMetricsPort)},
},
},
......@@ -65,12 +74,7 @@ func TestPlannerDefaults_GetBaseContainer(t *testing.T) {
p := &PlannerDefaults{
BaseComponentDefaults: tt.fields.BaseComponentDefaults,
}
got, err := p.GetBaseContainer(ComponentContext{
numberOfNodes: tt.args.numberOfNodes,
ParentGraphDeploymentName: tt.args.parentGraphDeploymentName,
ParentGraphDeploymentNamespace: tt.args.parentGraphDeploymentNamespace,
DynamoNamespace: tt.args.dynamoNamespace,
})
got, err := p.GetBaseContainer(tt.componentContext)
if (err != nil) != tt.wantErr {
t.Errorf("PlannerDefaults.GetBaseContainer() error = %v, wantErr %v", err, tt.wantErr)
return
......
......@@ -38,6 +38,7 @@ import (
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/discovery"
"github.com/imdario/mergo"
networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
......@@ -135,12 +136,15 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD
// Propagate metrics annotation from parent deployment if present
if parentDynamoGraphDeployment.Annotations != nil {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
}
if val, exists := parentDynamoGraphDeployment.Annotations[commonconsts.KubeAnnotationEnableMetrics]; exists {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
}
deployment.Spec.Annotations[commonconsts.KubeAnnotationEnableMetrics] = val
}
if val, exists := parentDynamoGraphDeployment.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend]; exists {
deployment.Spec.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend] = val
}
}
if component.ComponentType == commonconsts.ComponentTypePlanner {
......@@ -392,24 +396,39 @@ func getCliqueStartupDependencies(
return nil
}
func GenerateComponentService(ctx context.Context, componentName, componentNamespace string) (*corev1.Service, error) {
func GenerateComponentService(ctx context.Context, dynamoDeployment *v1alpha1.DynamoGraphDeployment, component *v1alpha1.DynamoComponentDeploymentSharedSpec, componentName string) (*corev1.Service, error) {
if component.DynamoNamespace == nil {
return nil, fmt.Errorf("expected DynamoComponentDeployment %s to have a dynamoNamespace", componentName)
}
componentName = GetDynamoComponentName(dynamoDeployment, componentName)
var servicePort corev1.ServicePort
if component.ComponentType == commonconsts.ComponentTypeFrontend {
servicePort = corev1.ServicePort{
Name: commonconsts.DynamoServicePortName,
Port: commonconsts.DynamoServicePort,
TargetPort: intstr.FromString(commonconsts.DynamoContainerPortName),
Protocol: corev1.ProtocolTCP,
}
} else {
servicePort = corev1.ServicePort{
Name: commonconsts.DynamoSystemPortName,
Port: commonconsts.DynamoSystemPort,
TargetPort: intstr.FromString(commonconsts.DynamoSystemPortName),
Protocol: corev1.ProtocolTCP,
}
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: componentName,
Namespace: componentNamespace,
Namespace: dynamoDeployment.Namespace,
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
commonconsts.KubeLabelDynamoSelector: componentName,
},
Ports: []corev1.ServicePort{
{
Name: commonconsts.DynamoServicePortName,
Port: commonconsts.DynamoServicePort,
TargetPort: intstr.FromString(commonconsts.DynamoContainerPortName),
Protocol: corev1.ProtocolTCP,
},
commonconsts.KubeLabelDynamoComponentType: component.ComponentType,
commonconsts.KubeLabelDynamoNamespace: *component.DynamoNamespace,
},
Ports: []corev1.ServicePort{servicePort},
},
}
return service, nil
......@@ -631,7 +650,9 @@ func MultinodeDeployerFactory(multinodeDeploymentType commonconsts.MultinodeDepl
// isWorkerComponent checks if a component is a worker that needs backend framework detection
func isWorkerComponent(componentType string) bool {
return componentType == commonconsts.ComponentTypeWorker
return componentType == commonconsts.ComponentTypeWorker ||
componentType == commonconsts.ComponentTypePrefill ||
componentType == commonconsts.ComponentTypeDecode
}
// addStandardEnvVars adds the standard environment variables that are common to both Grove and Controller
......@@ -685,7 +706,7 @@ func GenerateBasePodSpec(
serviceName string,
) (*corev1.PodSpec, error) {
// Start with base container generated per component type
componentContext := generateComponentContext(component, parentGraphDeploymentName, namespace, numberOfNodes)
componentContext := generateComponentContext(component, parentGraphDeploymentName, namespace, numberOfNodes, controllerConfig.GetDiscoveryBackend(component.Annotations))
componentDefaults := ComponentDefaultsFactory(component.ComponentType)
container, err := componentDefaults.GetBaseContainer(componentContext)
if err != nil {
......@@ -833,6 +854,11 @@ func GenerateBasePodSpec(
return nil, fmt.Errorf("failed to merge extraPodSpec: %w", err)
}
}
if controllerConfig.IsK8sDiscoveryEnabled(component.Annotations) {
podSpec.ServiceAccountName = discovery.GetK8sDiscoveryServiceAccountName(parentGraphDeploymentName)
}
podSpec.Containers = append(podSpec.Containers, container)
podSpec.Volumes = append(podSpec.Volumes, volumes...)
podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, imagePullSecrets...)
......@@ -852,11 +878,13 @@ func setMetricsLabels(labels map[string]string, dynamoGraphDeployment *v1alpha1.
labels[commonconsts.KubeLabelMetricsEnabled] = commonconsts.KubeLabelValueTrue
}
func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentSharedSpec, parentGraphDeploymentName string, namespace string, numberOfNodes int32) ComponentContext {
func generateComponentContext(component *v1alpha1.DynamoComponentDeploymentSharedSpec, parentGraphDeploymentName string, namespace string, numberOfNodes int32, discoveryBackend string) ComponentContext {
componentContext := ComponentContext{
numberOfNodes: numberOfNodes,
ComponentType: component.ComponentType,
ParentGraphDeploymentName: parentGraphDeploymentName,
ParentGraphDeploymentNamespace: namespace,
DiscoveryBackend: discoveryBackend,
}
if component.DynamoNamespace != nil {
componentContext.DynamoNamespace = *component.DynamoNamespace
......@@ -915,6 +943,8 @@ func GenerateGrovePodCliqueSet(
}
}
discoveryBackend := controllerConfig.GetDiscoveryBackend(dynamoDeployment.Annotations)
var scalingGroups []grovev1alpha1.PodCliqueScalingGroupConfig
for serviceName, component := range dynamoDeployment.Spec.Services {
dynamoNamespace := getDynamoNamespace(dynamoDeployment, component)
......@@ -925,6 +955,13 @@ func GenerateGrovePodCliqueSet(
return nil, fmt.Errorf("failed to determine backend framework for service %s: %w", serviceName, err)
}
if discoveryBackend != "" {
if component.Annotations == nil {
component.Annotations = make(map[string]string)
}
component.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend] = discoveryBackend
}
numberOfNodes := component.GetNumberOfNodes()
isMultinode := numberOfNodes > 1
roles := expandRolesForService(serviceName, component.Replicas, numberOfNodes)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment