Unverified Commit 51ca5527 authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

fix: take into account number of workers from config (#1365)

parent 7ca0faa8
......@@ -23,6 +23,7 @@ import (
"strings"
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
......@@ -166,3 +167,25 @@ func (s *DynamoComponentDeployment) SetSpec(spec any) {
// IsMainComponent reports whether Spec.DynamoTag ends with Spec.ServiceName,
// which is how this type identifies the main component.
func (s *DynamoComponentDeployment) IsMainComponent() bool {
	tag, service := s.Spec.DynamoTag, s.Spec.ServiceName
	return strings.HasSuffix(tag, service)
}
// GetDynamoDeploymentConfig returns, as bytes, the value of the first entry in
// Spec.Envs whose name is commonconsts.DynamoDeploymentConfigEnvVar.
// It returns nil when no such entry exists.
func (s *DynamoComponentDeployment) GetDynamoDeploymentConfig() []byte {
	envs := s.Spec.Envs
	for i := range envs {
		if envs[i].Name != commonconsts.DynamoDeploymentConfigEnvVar {
			continue
		}
		return []byte(envs[i].Value)
	}
	return nil
}
// SetDynamoDeploymentConfig stores config as the value of the env var named
// commonconsts.DynamoDeploymentConfigEnvVar. The first existing entry with that
// name is updated in place; if there is none, a new entry is appended to
// Spec.Envs.
func (s *DynamoComponentDeployment) SetDynamoDeploymentConfig(config []byte) {
	value := string(config)
	for i := range s.Spec.Envs {
		if s.Spec.Envs[i].Name == commonconsts.DynamoDeploymentConfigEnvVar {
			s.Spec.Envs[i].Value = value
			return
		}
	}
	entry := corev1.EnvVar{
		Name:  commonconsts.DynamoDeploymentConfigEnvVar,
		Value: value,
	}
	s.Spec.Envs = append(s.Spec.Envs, entry)
}
......@@ -20,8 +20,11 @@
package v1alpha1
import (
"reflect"
"testing"
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
......@@ -76,3 +79,134 @@ func TestDynamoComponentDeployment_IsMainComponent(t *testing.T) {
})
}
}
func TestDynamoComponentDeployment_GetDynamoDeploymentConfig(t *testing.T) {
type fields struct {
TypeMeta metav1.TypeMeta
ObjectMeta metav1.ObjectMeta
Spec DynamoComponentDeploymentSpec
Status DynamoComponentDeploymentStatus
}
tests := []struct {
name string
fields fields
want []byte
}{
{
name: "no config",
fields: fields{
Spec: DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
Envs: []corev1.EnvVar{},
},
},
},
want: nil,
},
{
name: "with config",
fields: fields{
Spec: DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
Envs: []corev1.EnvVar{
{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
},
},
},
},
want: []byte(`{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &DynamoComponentDeployment{
TypeMeta: tt.fields.TypeMeta,
ObjectMeta: tt.fields.ObjectMeta,
Spec: tt.fields.Spec,
Status: tt.fields.Status,
}
if got := s.GetDynamoDeploymentConfig(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("DynamoComponentDeployment.GetDynamoDeploymentConfig() = %v, want %v", got, tt.want)
}
})
}
}
func TestDynamoComponentDeployment_SetDynamoDeploymentConfig(t *testing.T) {
type fields struct {
TypeMeta metav1.TypeMeta
ObjectMeta metav1.ObjectMeta
Spec DynamoComponentDeploymentSpec
Status DynamoComponentDeploymentStatus
}
type args struct {
config []byte
}
tests := []struct {
name string
fields fields
args args
want []corev1.EnvVar
}{
{
name: "no config",
fields: fields{
Spec: DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
Envs: nil,
},
},
},
args: args{
config: []byte(`{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`),
},
want: []corev1.EnvVar{
{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
},
},
{
name: "with config",
fields: fields{
Spec: DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: DynamoComponentDeploymentSharedSpec{
Envs: []corev1.EnvVar{
{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
},
},
},
},
args: args{
config: []byte(`{"Frontend":{"port":9000},"Planner":{"environment":"kubernetes"}}`),
},
want: []corev1.EnvVar{
{
Name: commonconsts.DynamoDeploymentConfigEnvVar,
Value: `{"Frontend":{"port":9000},"Planner":{"environment":"kubernetes"}}`,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &DynamoComponentDeployment{
TypeMeta: tt.fields.TypeMeta,
ObjectMeta: tt.fields.ObjectMeta,
Spec: tt.fields.Spec,
Status: tt.fields.Status,
}
s.SetDynamoDeploymentConfig(tt.args.config)
if !reflect.DeepEqual(s.Spec.DynamoComponentDeploymentSharedSpec.Envs, tt.want) {
t.Errorf("DynamoComponentDeployment.SetDynamoDeploymentConfig() = %v, want %v", s.Spec.DynamoComponentDeploymentSharedSpec.Envs, tt.want)
}
})
}
}
......@@ -67,4 +67,6 @@ const (
KubeAnnotationDynamoComponentHash = "nvidia.com/dynamo-request-hash"
KubeAnnotationDynamoComponentImageBuiderHash = "nvidia.com/dynamo-request-image-builder-hash"
KubeAnnotationDynamoComponentStorageNS = "nvidia.com/dynamo-storage-namespace"
DynamoDeploymentConfigEnvVar = "DYN_DEPLOYMENT_CONFIG"
)
......@@ -19,11 +19,8 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"dario.cat/mergo"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
......@@ -33,9 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/dynamo"
)
......@@ -44,8 +39,6 @@ const (
FailedState = "failed"
ReadyState = "successful"
PendingState = "pending"
DYN_DEPLOYMENT_CONFIG_ENV_VAR = "DYN_DEPLOYMENT_CONFIG"
)
type etcdStorage interface {
......@@ -141,37 +134,12 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
}
// merge the dynamoComponentsDeployments with the dynamoComponentsDeployments from the CRD
for serviceName, deployment := range dynamoComponentsDeployments {
if _, ok := dynamoDeployment.Spec.Services[serviceName]; ok {
err := mergo.Merge(&deployment.Spec.DynamoComponentDeploymentSharedSpec, dynamoDeployment.Spec.Services[serviceName].DynamoComponentDeploymentSharedSpec, mergo.WithOverride)
if err != nil {
logger.Error(err, "failed to merge the DynamoComponentsDeployments")
reason = "failed_to_merge_the_DynamoComponentsDeployments"
return ctrl.Result{}, err
}
}
for _, deployment := range dynamoComponentsDeployments {
if deployment.Spec.Ingress.Enabled {
dynamoDeployment.SetEndpointStatus(r.isEndpointSecured(), getIngressHost(deployment.Spec.Ingress))
}
}
// Set common env vars on each of the dynamoComponentsDeployments
for _, deployment := range dynamoComponentsDeployments {
if len(dynamoDeployment.Spec.Envs) > 0 {
deployment.Spec.Envs = mergeEnvs(dynamoDeployment.Spec.Envs, deployment.Spec.Envs)
}
err := updateDynDeploymentConfig(deployment, consts.DynamoServicePort)
if err != nil {
logger.Error(err, fmt.Sprintf("Failed to update the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
return ctrl.Result{}, err
}
err = overrideWithDynDeploymentConfig(ctx, deployment)
if err != nil {
logger.Error(err, fmt.Sprintf("Failed to override the component config with the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
return ctrl.Result{}, err
}
}
// reconcile the dynamoComponent
// for now we use the same component for all the services and we differentiate them by the service name when launching the component
dynamoComponent := &nvidiacomv1alpha1.DynamoComponent{
......@@ -260,121 +228,6 @@ func (r *DynamoGraphDeploymentReconciler) isEndpointSecured() bool {
return r.IngressControllerTLSSecret != ""
}
// mergeEnvs combines the common env vars with the service-specific ones.
// When both lists define the same name, the service-specific entry wins; when a
// single list repeats a name, the last occurrence wins.
//
// The result order is deterministic: each name keeps the position of its first
// occurrence, scanning common first and then specific. Building the result by
// ranging over a map would yield a different order on every call, because Go
// map iteration order is unspecified.
func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
	merged := make([]corev1.EnvVar, 0, len(common)+len(specific))
	// position maps an env var name to its index in merged.
	position := make(map[string]int, len(common)+len(specific))
	for _, envs := range [][]corev1.EnvVar{common, specific} {
		for _, env := range envs {
			if i, seen := position[env.Name]; seen {
				// A later definition replaces the earlier one in place.
				merged[i] = env
				continue
			}
			position[env.Name] = len(merged)
			merged = append(merged, env)
		}
	}
	return merged
}
// updateDynDeploymentConfig rewrites the DYN_DEPLOYMENT_CONFIG env var of the
// given component so that the "port" entry of its own service section equals
// newPort.
//
// Nothing happens unless the component is the main component and carries the
// env var. The port is only replaced when the service section is a JSON object
// that already has a "port" key; the JSON is re-marshalled and stored either way.
// An error is returned when the env var value cannot be unmarshalled or the
// updated config cannot be marshalled.
func updateDynDeploymentConfig(dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment, newPort int) error {
	if !dynamoDeploymentComponent.IsMainComponent() {
		return nil
	}
	envs := dynamoDeploymentComponent.Spec.Envs
	for idx := range envs {
		if envs[idx].Name != DYN_DEPLOYMENT_CONFIG_ENV_VAR {
			continue
		}

		var config map[string]any
		if err := json.Unmarshal([]byte(envs[idx].Value), &config); err != nil {
			return fmt.Errorf("failed to unmarshal %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
		}

		// Only touch an existing "port" key of this service's section.
		section, isObject := config[dynamoDeploymentComponent.Spec.ServiceName].(map[string]any)
		if isObject {
			if _, hasPort := section["port"]; hasPort {
				section["port"] = newPort
			}
		}

		encoded, err := json.Marshal(config)
		if err != nil {
			return fmt.Errorf("failed to marshal updated config: %w", err)
		}
		dynamoDeploymentComponent.Spec.Envs[idx].Value = string(encoded)

		// Only the first matching env var is processed.
		return nil
	}
	return nil
}
// overrideWithDynDeploymentConfig applies the settings that the
// DYN_DEPLOYMENT_CONFIG env var holds for this component's service to the
// component spec.
//
// Only the first env var with that name is considered. From the service's
// ServiceArgs it copies:
//   - Workers into Spec.Replicas, but only when Spec.Replicas is not already set;
//   - Resources (GPU, CPU, Memory, Custom) into both Spec.Resources.Requests and
//     Spec.Resources.Limits, creating those structures when missing, and then
//     calls dynamo.SetLwsAnnotations.
//
// An error is returned when the config cannot be parsed or when
// dynamo.SetLwsAnnotations fails.
func overrideWithDynDeploymentConfig(ctx context.Context, dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment) error {
	spec := &dynamoDeploymentComponent.Spec
	for _, env := range spec.Envs {
		if env.Name != DYN_DEPLOYMENT_CONFIG_ENV_VAR {
			continue
		}

		parsed, err := dynamo.ParseDynDeploymentConfig(ctx, []byte(env.Value))
		if err != nil {
			return fmt.Errorf("failed to parse %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
		}
		serviceConfig := parsed[spec.ServiceName]
		if serviceConfig == nil || serviceConfig.ServiceArgs == nil {
			return nil
		}
		serviceArgs := serviceConfig.ServiceArgs

		// Replicas set in the CRD are the source of truth; the config only
		// fills them in when the CRD leaves them unset.
		if serviceArgs.Workers != nil && spec.Replicas == nil {
			spec.Replicas = serviceArgs.Workers
		}

		if serviceArgs.Resources == nil {
			return nil
		}

		// Make sure both resource items exist before writing into them.
		if spec.Resources == nil {
			spec.Resources = &dynamoCommon.Resources{}
		}
		if spec.Resources.Requests == nil {
			spec.Resources.Requests = &dynamoCommon.ResourceItem{}
		}
		if spec.Resources.Limits == nil {
			spec.Resources.Limits = &dynamoCommon.ResourceItem{}
		}
		requests, limits := spec.Resources.Requests, spec.Resources.Limits

		overrides := serviceArgs.Resources
		if overrides.GPU != nil {
			requests.GPU, limits.GPU = *overrides.GPU, *overrides.GPU
		}
		if overrides.CPU != nil {
			requests.CPU, limits.CPU = *overrides.CPU, *overrides.CPU
		}
		if overrides.Memory != nil {
			requests.Memory, limits.Memory = *overrides.Memory, *overrides.Memory
		}
		if overrides.Custom != nil {
			requests.Custom, limits.Custom = overrides.Custom, overrides.Custom
		}

		return dynamo.SetLwsAnnotations(serviceArgs, dynamoDeploymentComponent)
	}
	return nil
}
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
// for now doing nothing
return nil
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"context"
"reflect"
"sort"
"testing"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/bsm/gomega"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
)
// Test_mergeEnvs checks that mergeEnvs returns the union of both lists and
// that a service-specific entry replaces a common one with the same name.
// The result is sorted by name before comparison.
func Test_mergeEnvs(t *testing.T) {
	env := func(name, value string) corev1.EnvVar {
		return corev1.EnvVar{Name: name, Value: value}
	}

	cases := []struct {
		name     string
		common   []corev1.EnvVar
		specific []corev1.EnvVar
		want     []corev1.EnvVar
	}{
		{
			name:     "no_common_envs",
			common:   []corev1.EnvVar{},
			specific: []corev1.EnvVar{env("FOO", "BAR")},
			want:     []corev1.EnvVar{env("FOO", "BAR")},
		},
		{
			name:     "no_specific_envs",
			common:   []corev1.EnvVar{env("FOO", "BAR")},
			specific: []corev1.EnvVar{},
			want:     []corev1.EnvVar{env("FOO", "BAR")},
		},
		{
			name:     "common_and_specific_envs",
			common:   []corev1.EnvVar{env("FOO", "BAR")},
			specific: []corev1.EnvVar{env("BAZ", "QUX")},
			want:     []corev1.EnvVar{env("BAZ", "QUX"), env("FOO", "BAR")},
		},
		{
			name:     "common_and_specific_envs_with_same_name",
			common:   []corev1.EnvVar{env("FOO", "BAR")},
			specific: []corev1.EnvVar{env("FOO", "QUX")},
			want:     []corev1.EnvVar{env("FOO", "QUX")},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := mergeEnvs(tc.common, tc.specific)
			byName := func(a, b int) bool { return got[a].Name < got[b].Name }
			sort.Slice(got, byName)
			if reflect.DeepEqual(got, tc.want) {
				return
			}
			t.Errorf("mergeEnvs() = %v, want %v", got, tc.want)
		})
	}
}
func Test_updateDynDeploymentConfig(t *testing.T) {
type args struct {
dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment
newPort int
}
tests := []struct {
name string
args args
want []corev1.EnvVar
wantErr bool
}{
{
name: "main component",
args: args{
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoTag: "graphs.agg:Frontend",
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
{
Name: "OTHER",
Value: `value`,
},
},
},
},
},
newPort: 3000,
},
want: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":3000},"Planner":{"environment":"kubernetes"}}`,
},
{
Name: "OTHER",
Value: `value`,
},
},
wantErr: false,
},
{
name: "not main component",
args: args{
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoTag: "graphs.agg:Frontend",
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Other",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
{
Name: "OTHER",
Value: `value`,
},
},
},
},
},
newPort: 3000,
},
want: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080},"Planner":{"environment":"kubernetes"}}`,
},
{
Name: "OTHER",
Value: `value`,
},
},
wantErr: false,
},
{
name: "no DYN_DEPLOYMENT_CONFIG env variable",
args: args{
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoTag: "graphs.agg:Frontend",
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "OTHER",
Value: `value`,
},
},
},
},
},
newPort: 8080,
},
want: []corev1.EnvVar{
{
Name: "OTHER",
Value: `value`,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := updateDynDeploymentConfig(tt.args.dynamoDeploymentComponent, tt.args.newPort)
if (err != nil) != tt.wantErr {
t.Errorf("updateDynDeploymentConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
g := gomega.NewGomegaWithT(t)
g.Expect(tt.args.dynamoDeploymentComponent.Spec.Envs).To(gomega.Equal(tt.want))
})
}
}
func Test_overrideWithDynDeploymentConfig(t *testing.T) {
type args struct {
ctx context.Context
dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment
}
tests := []struct {
name string
args args
wantErr bool
expected *nvidiacomv1alpha1.DynamoComponentDeployment
}{
{
name: "no env var",
args: args{
ctx: context.Background(),
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Replicas: &[]int32{1}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
wantErr: false,
expected: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Replicas: &[]int32{1}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
{
name: "override workers and resources",
args: args{
ctx: context.Background(),
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"CPU":"2", "Memory":"2Gi", "GPU":"2"}}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: nil,
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
wantErr: false,
expected: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"CPU":"2", "Memory":"2Gi", "GPU":"2"}}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{3}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "2",
Memory: "2Gi",
GPU: "2",
},
Limits: &common.ResourceItem{
CPU: "2",
Memory: "2Gi",
GPU: "2",
},
},
},
},
},
},
{
name: "override workers and resources with gpusPerNode",
args: args{
ctx: context.Background(),
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"CPU":"2", "Memory":"2Gi", "GPU":"8"}, "total_gpus":16}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: nil,
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
wantErr: false,
expected: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"CPU":"2", "Memory":"2Gi", "GPU":"8"}, "total_gpus":16}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{3}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "2",
Memory: "2Gi",
GPU: "8",
},
Limits: &common.ResourceItem{
CPU: "2",
Memory: "2Gi",
GPU: "8",
},
},
Annotations: map[string]string{
"nvidia.com/deployment-type": "leader-worker",
"nvidia.com/lws-size": "2",
},
},
},
},
},
{
name: "override subset of resources",
args: args{
ctx: context.Background(),
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"GPU":"2"}}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: nil,
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
wantErr: false,
expected: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"GPU":"2"}}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{3}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "2",
},
Limits: &common.ResourceItem{
CPU: "",
Memory: "",
GPU: "2",
},
},
},
},
},
},
{
name: "do not override replicas if explicitly set in the CRD !",
args: args{
ctx: context.Background(),
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{1}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
wantErr: false,
expected: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{1}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := overrideWithDynDeploymentConfig(tt.args.ctx, tt.args.dynamoDeploymentComponent); (err != nil) != tt.wantErr {
t.Errorf("overrideWithDynDeploymentConfig() error = %v, wantErr %v", err, tt.wantErr)
}
if diff := cmp.Diff(tt.args.dynamoDeploymentComponent, tt.expected); diff != "" {
t.Errorf("overrideWithDynDeploymentConfig() mismatch (-want +got):\n%s", diff)
}
})
}
}
......@@ -27,6 +27,7 @@ import (
"strconv"
"strings"
"dario.cat/mergo"
"emperror.dev/errors"
apiStoreClient "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/api_store_client"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
......@@ -367,6 +368,32 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD
deployment.Spec.Autoscaling.MinReplicas = service.Config.Autoscaling.MinReplicas
deployment.Spec.Autoscaling.MaxReplicas = service.Config.Autoscaling.MaxReplicas
}
// override the component config with the component config that is in the parent deployment
if configOverride, ok := parentDynamoGraphDeployment.Spec.Services[service.Name]; ok {
err := mergo.Merge(&deployment.Spec.DynamoComponentDeploymentSharedSpec, configOverride.DynamoComponentDeploymentSharedSpec, mergo.WithOverride)
if err != nil {
return nil, err
}
}
// merge the envs from the parent deployment with the envs from the service
if len(parentDynamoGraphDeployment.Spec.Envs) > 0 {
deployment.Spec.Envs = mergeEnvs(parentDynamoGraphDeployment.Spec.Envs, deployment.Spec.Envs)
}
err := updateDynDeploymentConfig(deployment, commonconsts.DynamoServicePort)
if err != nil {
return nil, err
}
err = overrideWithDynDeploymentConfig(ctx, deployment)
if err != nil {
return nil, err
}
// we only override the replicas if it is not set in the CRD.
// replicas, if set in the CRD must always be the source of truth.
if parentSpec, ok := parentDynamoGraphDeployment.Spec.Services[service.Name]; ok {
if parentSpec.DynamoComponentDeploymentSharedSpec.Replicas != nil {
deployment.Spec.Replicas = parentSpec.DynamoComponentDeploymentSharedSpec.Replicas
}
}
deployments[service.Name] = deployment
}
for _, service := range config.Services {
......@@ -396,3 +423,107 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD
}
return deployments, nil
}
// updateDynDeploymentConfig sets the "port" entry of the component's own
// service section in its deployment config to newPort.
//
// Nothing happens unless the component is the main component and has a
// deployment config. The port is written only when the service section is a
// JSON object; the config is re-marshalled and stored back either way.
// An error is returned when the config cannot be unmarshalled or marshalled.
func updateDynDeploymentConfig(dynamoDeploymentComponent *v1alpha1.DynamoComponentDeployment, newPort int) error {
	if !dynamoDeploymentComponent.IsMainComponent() {
		return nil
	}
	raw := dynamoDeploymentComponent.GetDynamoDeploymentConfig()
	if raw == nil {
		return nil
	}

	var config map[string]any
	if err := json.Unmarshal(raw, &config); err != nil {
		return fmt.Errorf("failed to unmarshal %v: %w", commonconsts.DynamoDeploymentConfigEnvVar, err)
	}

	// The type assertion skips a missing section and any non-object value.
	section, isObject := config[dynamoDeploymentComponent.Spec.ServiceName].(map[string]any)
	if isObject {
		section["port"] = newPort
	}

	encoded, err := json.Marshal(config)
	if err != nil {
		return fmt.Errorf("failed to marshal updated config: %w", err)
	}
	dynamoDeploymentComponent.SetDynamoDeploymentConfig(encoded)
	return nil
}
// overrideWithDynDeploymentConfig applies the settings that the component's
// deployment config holds for its own service to the component spec.
//
// From the service's ServiceArgs it copies:
//   - Workers into Spec.Replicas;
//   - Resources (GPU, CPU, Memory, Custom) into both Spec.Resources.Requests and
//     Spec.Resources.Limits, creating those structures when missing, and then
//     calls SetLwsAnnotations.
//
// It is a no-op when the component has no deployment config or the config has
// no entry for the service. An error is returned when the config cannot be
// parsed or when SetLwsAnnotations fails.
func overrideWithDynDeploymentConfig(ctx context.Context, dynamoDeploymentComponent *v1alpha1.DynamoComponentDeployment) error {
	raw := dynamoDeploymentComponent.GetDynamoDeploymentConfig()
	if raw == nil {
		return nil
	}
	parsed, err := ParseDynDeploymentConfig(ctx, raw)
	if err != nil {
		return fmt.Errorf("failed to parse %v: %w", commonconsts.DynamoDeploymentConfigEnvVar, err)
	}

	spec := &dynamoDeploymentComponent.Spec
	serviceConfig := parsed[spec.ServiceName]
	if serviceConfig == nil || serviceConfig.ServiceArgs == nil {
		return nil
	}
	serviceArgs := serviceConfig.ServiceArgs

	if serviceArgs.Workers != nil {
		spec.Replicas = serviceArgs.Workers
	}
	if serviceArgs.Resources == nil {
		return nil
	}

	// Make sure both resource items exist before writing into them.
	if spec.Resources == nil {
		spec.Resources = &common.Resources{}
	}
	if spec.Resources.Requests == nil {
		spec.Resources.Requests = &common.ResourceItem{}
	}
	if spec.Resources.Limits == nil {
		spec.Resources.Limits = &common.ResourceItem{}
	}
	requests, limits := spec.Resources.Requests, spec.Resources.Limits

	overrides := serviceArgs.Resources
	if overrides.GPU != nil {
		requests.GPU, limits.GPU = *overrides.GPU, *overrides.GPU
	}
	if overrides.CPU != nil {
		requests.CPU, limits.CPU = *overrides.CPU, *overrides.CPU
	}
	if overrides.Memory != nil {
		requests.Memory, limits.Memory = *overrides.Memory, *overrides.Memory
	}
	if overrides.Custom != nil {
		requests.Custom, limits.Custom = overrides.Custom, overrides.Custom
	}

	return SetLwsAnnotations(serviceArgs, dynamoDeploymentComponent)
}
// mergeEnvs combines the common env vars with the service-specific ones.
// When both lists define the same name, the service-specific entry wins; when a
// single list repeats a name, the last occurrence wins.
//
// The result order is deterministic: each name keeps the position of its first
// occurrence, scanning common first and then specific. Building the result by
// ranging over a map would yield a different order on every call, because Go
// map iteration order is unspecified.
func mergeEnvs(common, specific []corev1.EnvVar) []corev1.EnvVar {
	merged := make([]corev1.EnvVar, 0, len(common)+len(specific))
	// position maps an env var name to its index in merged.
	position := make(map[string]int, len(common)+len(specific))
	for _, envs := range [][]corev1.EnvVar{common, specific} {
		for _, env := range envs {
			if i, seen := position[env.Name]; seen {
				// A later definition replaces the earlier one in place.
				merged[i] = env
				continue
			}
			position[env.Name] = len(merged)
			merged = append(merged, env)
		}
	}
	return merged
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment