Unverified Commit 81c27803 authored by mohammedabdulwahhab's avatar mohammedabdulwahhab Committed by GitHub
Browse files

fix: operator defaults (#2398)


Signed-off-by: default avatarmohammedabdulwahhab <furkhan324@berkeley.edu>
parent 9ddb3efd
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package dynamo
import (
"fmt"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
)
// FrontendDefaults implements ComponentDefaults for Frontend components
type FrontendDefaults struct {
*BaseComponentDefaults
}
func NewFrontendDefaults() *FrontendDefaults {
return &FrontendDefaults{&BaseComponentDefaults{}}
}
func (f *FrontendDefaults) GetBaseContainer(numberOfNodes int32) (corev1.Container, error) {
// Frontend doesn't need backend-specific config
container := f.getCommonContainer()
// Add HTTP port
container.Ports = []corev1.ContainerPort{
{
Protocol: corev1.ProtocolTCP,
Name: commonconsts.DynamoContainerPortName,
ContainerPort: int32(commonconsts.DynamoServicePort),
},
}
// Add frontend-specific defaults
container.LivenessProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/health",
Port: intstr.FromString(commonconsts.DynamoContainerPortName),
},
},
InitialDelaySeconds: 60,
PeriodSeconds: 60,
TimeoutSeconds: 30,
FailureThreshold: 10,
}
container.ReadinessProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{
"/bin/sh",
"-c",
"curl -s http://localhost:${DYNAMO_PORT}/health | jq -e \".status == \\\"healthy\\\"\"",
},
},
},
InitialDelaySeconds: 60,
PeriodSeconds: 60,
TimeoutSeconds: 30,
FailureThreshold: 10,
}
container.Resources = corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
}
// Add standard environment variables
container.Env = []corev1.EnvVar{
{
Name: commonconsts.EnvDynamoServicePort,
Value: fmt.Sprintf("%d", commonconsts.DynamoServicePort),
},
}
return container, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package dynamo
import (
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
// PlannerDefaults implements ComponentDefaults for Planner components
type PlannerDefaults struct {
*BaseComponentDefaults
}
func NewPlannerDefaults() *PlannerDefaults {
return &PlannerDefaults{&BaseComponentDefaults{}}
}
func (p *PlannerDefaults) GetBaseContainer(numberOfNodes int32) (corev1.Container, error) {
container := p.getCommonContainer()
// Add planner-specific defaults
container.Resources = corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
}
return container, nil
}
func (p *PlannerDefaults) GetBasePodSpec(numberOfNodes int32) (corev1.PodSpec, error) {
podSpec := corev1.PodSpec{
ServiceAccountName: commonconsts.PlannerServiceAccountName,
}
return podSpec, nil
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package dynamo
import (
"fmt"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
)
// WorkerDefaults implements ComponentDefaults for Worker components
type WorkerDefaults struct {
*BaseComponentDefaults
}
func NewWorkerDefaults() *WorkerDefaults {
return &WorkerDefaults{&BaseComponentDefaults{}}
}
func (w *WorkerDefaults) GetBaseContainer(numberOfNodes int32) (corev1.Container, error) {
container := w.getCommonContainer()
// Add system port
container.Ports = []corev1.ContainerPort{
{
Protocol: corev1.ProtocolTCP,
Name: commonconsts.DynamoSystemPortName,
ContainerPort: int32(commonconsts.DynamoSystemPort),
},
}
// Add worker base defaults
container.Resources = corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("10"),
corev1.ResourceMemory: resource.MustParse("20Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("10"),
corev1.ResourceMemory: resource.MustParse("20Gi"),
"nvidia.com/gpu": resource.MustParse("1"),
},
}
container.LivenessProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/live",
Port: intstr.FromString(commonconsts.DynamoSystemPortName),
},
},
PeriodSeconds: 5,
TimeoutSeconds: 30,
FailureThreshold: 1,
}
container.ReadinessProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/health",
Port: intstr.FromString(commonconsts.DynamoSystemPortName),
},
},
PeriodSeconds: 10,
TimeoutSeconds: 30,
FailureThreshold: 60,
}
container.StartupProbe = &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/live",
Port: intstr.FromString(commonconsts.DynamoSystemPortName),
},
},
PeriodSeconds: 10,
TimeoutSeconds: 5,
FailureThreshold: 60,
}
container.Env = []corev1.EnvVar{
{
Name: "DYN_SYSTEM_ENABLED",
Value: "true",
},
{
Name: "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
Value: "[\"generate\"]",
},
{
Name: "DYN_SYSTEM_PORT",
Value: fmt.Sprintf("%d", commonconsts.DynamoSystemPort),
},
}
return container, nil
}
...@@ -21,6 +21,7 @@ import ( ...@@ -21,6 +21,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"maps"
"regexp" "regexp"
"sort" "sort"
"strconv" "strconv"
...@@ -191,7 +192,7 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD ...@@ -191,7 +192,7 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD
// finally set the service account name // finally set the service account name
deployment.Spec.ExtraPodSpec.PodSpec.ServiceAccountName = commonconsts.PlannerServiceAccountName deployment.Spec.ExtraPodSpec.PodSpec.ServiceAccountName = commonconsts.PlannerServiceAccountName
} }
if deployment.IsMainComponent() && defaultIngressSpec != nil && deployment.Spec.Ingress == nil { if deployment.IsFrontendComponent() && defaultIngressSpec != nil && deployment.Spec.Ingress == nil {
deployment.Spec.Ingress = defaultIngressSpec deployment.Spec.Ingress = defaultIngressSpec
} }
// merge the envs from the parent deployment with the envs from the service // merge the envs from the parent deployment with the envs from the service
...@@ -219,7 +220,7 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD ...@@ -219,7 +220,7 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD
// updateDynDeploymentConfig updates the runtime config object for the given dynamoDeploymentComponent // updateDynDeploymentConfig updates the runtime config object for the given dynamoDeploymentComponent
// It updates the port for the given service (if it is the main component) // It updates the port for the given service (if it is the main component)
func updateDynDeploymentConfig(dynamoDeploymentComponent *v1alpha1.DynamoComponentDeployment, newPort int) error { func updateDynDeploymentConfig(dynamoDeploymentComponent *v1alpha1.DynamoComponentDeployment, newPort int) error {
if dynamoDeploymentComponent.IsMainComponent() { if dynamoDeploymentComponent.IsFrontendComponent() {
dynamoDeploymentConfig := dynamoDeploymentComponent.GetDynamoDeploymentConfig() dynamoDeploymentConfig := dynamoDeploymentComponent.GetDynamoDeploymentConfig()
if dynamoDeploymentConfig != nil { if dynamoDeploymentConfig != nil {
var config map[string]any var config map[string]any
...@@ -668,11 +669,6 @@ func isWorkerComponent(componentType string) bool { ...@@ -668,11 +669,6 @@ func isWorkerComponent(componentType string) bool {
// addStandardEnvVars adds the standard environment variables that are common to both Grove and Controller // addStandardEnvVars adds the standard environment variables that are common to both Grove and Controller
func addStandardEnvVars(container *corev1.Container, controllerConfig controller_common.Config) { func addStandardEnvVars(container *corev1.Container, controllerConfig controller_common.Config) {
container.Env = append(container.Env, corev1.EnvVar{
Name: commonconsts.EnvDynamoServicePort,
Value: fmt.Sprintf("%d", commonconsts.DynamoServicePort),
})
if controllerConfig.NatsAddress != "" { if controllerConfig.NatsAddress != "" {
container.Env = append(container.Env, corev1.EnvVar{ container.Env = append(container.Env, corev1.EnvVar{
Name: "NATS_SERVER", Name: "NATS_SERVER",
...@@ -702,47 +698,60 @@ func GenerateBasePodSpec( ...@@ -702,47 +698,60 @@ func GenerateBasePodSpec(
multinodeDeploymentType commonconsts.MultinodeDeploymentType, multinodeDeploymentType commonconsts.MultinodeDeploymentType,
serviceName string, serviceName string,
) (corev1.PodSpec, error) { ) (corev1.PodSpec, error) {
container := corev1.Container{ // Start with base container generated per component type
Name: "main", componentDefaults := ComponentDefaultsFactory(component.ComponentType, numberOfNodes)
LivenessProbe: component.LivenessProbe, container, err := componentDefaults.GetBaseContainer(numberOfNodes)
ReadinessProbe: component.ReadinessProbe, if err != nil {
Env: component.Envs, return corev1.PodSpec{}, fmt.Errorf("failed to get base container: %w", err)
Ports: []corev1.ContainerPort{
{
Protocol: corev1.ProtocolTCP,
Name: commonconsts.DynamoContainerPortName,
ContainerPort: int32(commonconsts.DynamoServicePort),
},
},
}
// Add system port for worker components
if component.ComponentType == commonconsts.ComponentTypeWorker {
container.Ports = append(container.Ports, corev1.ContainerPort{
Protocol: corev1.ProtocolTCP,
Name: commonconsts.DynamoSystemPortName,
ContainerPort: int32(commonconsts.DynamoSystemPort),
})
} }
// First merge the mainContainer from extraPodSpec to get the base command and args
if component.ExtraPodSpec != nil && component.ExtraPodSpec.MainContainer != nil { if component.ExtraPodSpec != nil && component.ExtraPodSpec.MainContainer != nil {
main := component.ExtraPodSpec.MainContainer.DeepCopy() main := component.ExtraPodSpec.MainContainer.DeepCopy()
if main != nil { if main != nil {
// merge the extraPodSpec from the parent deployment with the extraPodSpec from the service // merge the extraPodSpec from the parent deployment with the extraPodSpec from the service
err := mergo.Merge(&container, *main, mergo.WithOverride) err = mergo.Merge(&container, *main, mergo.WithOverride)
if err != nil { if err != nil {
return corev1.PodSpec{}, fmt.Errorf("failed to merge extraPodSpec: %w", err) return corev1.PodSpec{}, fmt.Errorf("failed to merge extraPodSpec: %w", err)
} }
// main container fields that require special handling
container.Env = MergeEnvs(component.Envs, container.Env) container.Env = MergeEnvs(component.Envs, container.Env)
// Note: startup probe does not have its own top level field so it must be passed in extraPodSpec.MainContainer
// We want to overwrite entirely if provided rather than merge
if main.StartupProbe != nil {
container.StartupProbe = main.StartupProbe
}
} }
} }
resourcesConfig, err := controller_common.GetResourcesConfig(component.Resources) // Merge probes entirely if they are passed (no partial merge)
if component.LivenessProbe != nil {
container.LivenessProbe = component.LivenessProbe.DeepCopy()
}
if component.ReadinessProbe != nil {
container.ReadinessProbe = component.ReadinessProbe.DeepCopy()
}
overrideResources, err := controller_common.GetResourcesConfig(component.Resources)
if err != nil { if err != nil {
return corev1.PodSpec{}, fmt.Errorf("failed to get resources config: %w", err) return corev1.PodSpec{}, fmt.Errorf("failed to get resources config: %w", err)
} }
if resourcesConfig != nil { // Requests
container.Resources = *resourcesConfig if overrideResources != nil && len(overrideResources.Requests) > 0 {
if container.Resources.Requests == nil {
container.Resources.Requests = corev1.ResourceList{}
}
maps.Copy(container.Resources.Requests, overrideResources.Requests)
} }
// Limits
if overrideResources != nil && len(overrideResources.Limits) > 0 {
if container.Resources.Limits == nil {
container.Resources.Limits = corev1.ResourceList{}
}
maps.Copy(container.Resources.Limits, overrideResources.Limits)
}
imagePullSecrets := []corev1.LocalObjectReference{} imagePullSecrets := []corev1.LocalObjectReference{}
if secretsRetriever != nil && component.ExtraPodSpec != nil && component.ExtraPodSpec.MainContainer != nil && component.ExtraPodSpec.MainContainer.Image != "" { if secretsRetriever != nil && component.ExtraPodSpec != nil && component.ExtraPodSpec.MainContainer != nil && component.ExtraPodSpec.MainContainer.Image != "" {
secretsName, err := secretsRetriever.GetSecrets(namespace, component.ExtraPodSpec.MainContainer.Image) secretsName, err := secretsRetriever.GetSecrets(namespace, component.ExtraPodSpec.MainContainer.Image)
...@@ -780,15 +789,26 @@ func GenerateBasePodSpec( ...@@ -780,15 +789,26 @@ func GenerateBasePodSpec(
shmVolume, shmVolumeMount := generateSharedMemoryVolumeAndMount(&container.Resources) shmVolume, shmVolumeMount := generateSharedMemoryVolumeAndMount(&container.Resources)
volumes = append(volumes, shmVolume) volumes = append(volumes, shmVolume)
container.VolumeMounts = append(container.VolumeMounts, shmVolumeMount) container.VolumeMounts = append(container.VolumeMounts, shmVolumeMount)
// Apply backend-specific container modifications // Apply backend-specific container modifications
backend := BackendFactory(backendFramework) backend := BackendFactory(backendFramework)
if backend == nil { if backend == nil {
return corev1.PodSpec{}, fmt.Errorf("unsupported backend framework: %s", backendFramework) return corev1.PodSpec{}, fmt.Errorf("unsupported backend framework: %s", backendFramework)
} }
backend.UpdateContainer(&container, numberOfNodes, role, component, multinodeDeploymentType, serviceName) backend.UpdateContainer(&container, numberOfNodes, role, component, multinodeDeploymentType, serviceName)
var podSpec corev1.PodSpec
// get base podspec from component
podSpec, err := componentDefaults.GetBasePodSpec(numberOfNodes)
if err != nil {
return corev1.PodSpec{}, fmt.Errorf("failed to get base podspec: %w", err)
}
if component.ExtraPodSpec != nil && component.ExtraPodSpec.PodSpec != nil { if component.ExtraPodSpec != nil && component.ExtraPodSpec.PodSpec != nil {
podSpec = *component.ExtraPodSpec.PodSpec.DeepCopy() // merge extraPodSpec PodSpec with base podspec
err := mergo.Merge(&podSpec, component.ExtraPodSpec.PodSpec.DeepCopy(), mergo.WithOverride)
if err != nil {
return corev1.PodSpec{}, fmt.Errorf("failed to merge extraPodSpec: %w", err)
}
} }
podSpec.Containers = append(podSpec.Containers, container) podSpec.Containers = append(podSpec.Containers, container)
podSpec.Volumes = append(podSpec.Volumes, volumes...) podSpec.Volumes = append(podSpec.Volumes, volumes...)
......
...@@ -27,7 +27,7 @@ spec: ...@@ -27,7 +27,7 @@ spec:
timeoutSeconds: 2 timeoutSeconds: 2
failureThreshold: 3 failureThreshold: 3
dynamoNamespace: hello-world dynamoNamespace: hello-world
componentType: main componentType: frontend
replicas: 1 replicas: 1
resources: resources:
requests: requests:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment