Unverified Commit e94f3444 authored by julienmancuso's avatar julienmancuso Committed by GitHub
Browse files

fix: read 'workers' to set deployments 'replicas' (#1040)

parent e06fd7d2
......@@ -33,6 +33,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
dynamoCommon "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
commonController "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
......@@ -164,6 +165,11 @@ func (r *DynamoGraphDeploymentReconciler) Reconcile(ctx context.Context, req ctr
logger.Error(err, fmt.Sprintf("Failed to update the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
return ctrl.Result{}, err
}
err = overrideWithDynDeploymentConfig(ctx, deployment)
if err != nil {
logger.Error(err, fmt.Sprintf("Failed to override the component config with the %v env var", DYN_DEPLOYMENT_CONFIG_ENV_VAR))
return ctrl.Result{}, err
}
}
// reconcile the dynamoComponent
......@@ -308,6 +314,62 @@ func updateDynDeploymentConfig(dynamoDeploymentComponent *nvidiacomv1alpha1.Dyna
return nil
}
// overrideWithDynDeploymentConfig applies the per-service overrides carried by
// the DYN_DEPLOYMENT_CONFIG_ENV_VAR environment variable to the component spec.
//
// Only the first environment variable with that name is considered. Its value
// is parsed and the entry keyed by the component's ServiceName is looked up:
//   - ServiceArgs.Workers, when set, replaces Spec.Replicas;
//   - ServiceArgs.Resources, when set, overwrites the matching fields of both
//     Spec.Resources.Requests and Spec.Resources.Limits, allocating any of
//     those structures that are still nil. Fields that are not overridden keep
//     their current value.
//
// The component is mutated in place. An error is returned only when the
// environment variable value cannot be parsed.
func overrideWithDynDeploymentConfig(ctx context.Context, dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment) error {
	for _, envVar := range dynamoDeploymentComponent.Spec.Envs {
		if envVar.Name != DYN_DEPLOYMENT_CONFIG_ENV_VAR {
			continue
		}

		// From here on every path returns: later variables with the same name are ignored.
		parsed, err := dynamo.ParseDynDeploymentConfig(ctx, []byte(envVar.Value))
		if err != nil {
			return fmt.Errorf("failed to parse %v: %w", DYN_DEPLOYMENT_CONFIG_ENV_VAR, err)
		}

		serviceConfig := parsed[dynamoDeploymentComponent.Spec.ServiceName]
		if serviceConfig == nil || serviceConfig.ServiceArgs == nil {
			return nil
		}
		serviceArgs := serviceConfig.ServiceArgs

		if serviceArgs.Workers != nil {
			dynamoDeploymentComponent.Spec.Replicas = serviceArgs.Workers
		}

		overrides := serviceArgs.Resources
		if overrides == nil {
			return nil
		}

		// Make sure there is a requests and a limits item to write into.
		if dynamoDeploymentComponent.Spec.Resources == nil {
			dynamoDeploymentComponent.Spec.Resources = &dynamoCommon.Resources{}
		}
		target := dynamoDeploymentComponent.Spec.Resources
		if target.Requests == nil {
			target.Requests = &dynamoCommon.ResourceItem{}
		}
		if target.Limits == nil {
			target.Limits = &dynamoCommon.ResourceItem{}
		}

		// Requests and limits always receive the same overridden values.
		for _, item := range []*dynamoCommon.ResourceItem{target.Requests, target.Limits} {
			if overrides.GPU != nil {
				item.GPU = *overrides.GPU
			}
			if overrides.CPU != nil {
				item.CPU = *overrides.CPU
			}
			if overrides.Memory != nil {
				item.Memory = *overrides.Memory
			}
			if overrides.Custom != nil {
				item.Custom = overrides.Custom
			}
		}
		return nil
	}
	return nil
}
func (r *DynamoGraphDeploymentReconciler) FinalizeResource(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
// for now doing nothing
return nil
......
......@@ -18,10 +18,12 @@
package controller
import (
"context"
"reflect"
"sort"
"testing"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/dynamo/common"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/bsm/gomega"
corev1 "k8s.io/api/core/v1"
......@@ -204,3 +206,171 @@ func Test_updateDynDeploymentConfig(t *testing.T) {
})
}
}
func Test_overrideWithDynDeploymentConfig(t *testing.T) {
type args struct {
ctx context.Context
dynamoDeploymentComponent *nvidiacomv1alpha1.DynamoComponentDeployment
}
tests := []struct {
name string
args args
wantErr bool
expected *nvidiacomv1alpha1.DynamoComponentDeployment
}{
{
name: "no env var",
args: args{
ctx: context.Background(),
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Replicas: &[]int32{1}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
wantErr: false,
expected: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Replicas: &[]int32{1}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
{
name: "override workers and resources",
args: args{
ctx: context.Background(),
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"CPU":"2", "Memory":"2Gi", "GPU":"2"}}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{1}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
wantErr: false,
expected: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"CPU":"2", "Memory":"2Gi", "GPU":"2"}}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{3}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "2",
Memory: "2Gi",
GPU: "2",
},
Limits: &common.ResourceItem{
CPU: "2",
Memory: "2Gi",
GPU: "2",
},
},
},
},
},
},
{
name: "override subset of resources",
args: args{
ctx: context.Background(),
dynamoDeploymentComponent: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"GPU":"2"}}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{1}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "1",
},
},
},
},
},
},
wantErr: false,
expected: &nvidiacomv1alpha1.DynamoComponentDeployment{
Spec: nvidiacomv1alpha1.DynamoComponentDeploymentSpec{
DynamoComponentDeploymentSharedSpec: nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "Frontend",
Envs: []corev1.EnvVar{
{
Name: "DYN_DEPLOYMENT_CONFIG",
Value: `{"Frontend":{"port":8080,"ServiceArgs":{"Workers":3, "Resources":{"GPU":"2"}}},"Planner":{"environment":"kubernetes"}}`,
},
},
Replicas: &[]int32{3}[0],
Resources: &common.Resources{
Requests: &common.ResourceItem{
CPU: "1",
Memory: "1Gi",
GPU: "2",
},
Limits: &common.ResourceItem{
CPU: "",
Memory: "",
GPU: "2",
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
if err := overrideWithDynDeploymentConfig(tt.args.ctx, tt.args.dynamoDeploymentComponent); (err != nil) != tt.wantErr {
t.Errorf("overrideWithDynDeploymentConfig() error = %v, wantErr %v", err, tt.wantErr)
}
g.Expect(tt.args.dynamoDeploymentComponent).To(gomega.Equal(tt.expected))
})
}
}
......@@ -20,6 +20,7 @@ package dynamo
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
......@@ -56,10 +57,10 @@ type DynamoConfig struct {
}
// Resources describes the compute resources configured for a service.
//
// CPU, Memory and GPU are pointers so that a value that was not provided (nil)
// can be told apart from one that was explicitly set, which lets callers
// override only the fields that are present. The struct can be decoded from
// both YAML and JSON using the same lower-case keys.
type Resources struct {
	CPU    *string           `yaml:"cpu,omitempty" json:"cpu,omitempty"`
	Memory *string           `yaml:"memory,omitempty" json:"memory,omitempty"`
	GPU    *string           `yaml:"gpu,omitempty" json:"gpu,omitempty"`
	Custom map[string]string `yaml:"custom,omitempty" json:"custom,omitempty"`
}
type Traffic struct {
......@@ -78,6 +79,7 @@ type Config struct {
Autoscaling *Autoscaling `yaml:"autoscaling,omitempty"`
HttpExposed bool `yaml:"http_exposed,omitempty"`
ApiEndpoints []string `yaml:"api_endpoints,omitempty"`
Workers *int32 `yaml:"workers,omitempty"`
}
type ServiceConfig struct {
......@@ -86,6 +88,19 @@ type ServiceConfig struct {
Config Config `yaml:"config"`
}
// DynDeploymentConfig is the decoded form of the deployment configuration:
// a map from service name to the settings supplied for that service.
type DynDeploymentConfig = map[string]*DynDeploymentServiceConfig
// DynDeploymentServiceConfig represents the configuration for a specific
// service inside a DynDeploymentConfig.
type DynDeploymentServiceConfig struct {
// ServiceArgs is read from the "ServiceArgs" JSON key and stays nil when the key is absent.
ServiceArgs *ServiceArgs `json:"ServiceArgs,omitempty"`
}
// ServiceArgs represents the arguments that can be passed to any service.
// Both fields are pointers, so a nil value means the argument was not provided.
type ServiceArgs struct {
// Workers is the requested number of workers for the service.
Workers *int32 `json:"workers,omitempty"`
// Resources holds the resource settings requested for the service.
Resources *Resources `json:"resources,omitempty"`
}
func (s ServiceConfig) GetNamespace() *string {
if s.Config.Dynamo == nil || s.Config.Dynamo.Namespace == "" {
return nil
......@@ -220,6 +235,12 @@ func ParseDynamoGraphConfig(ctx context.Context, yamlContent *bytes.Buffer) (*Dy
return &config, err
}
// ParseDynDeploymentConfig decodes jsonContent, a JSON object keyed by service
// name, into a DynDeploymentConfig.
//
// ctx is accepted for signature consistency with the other parsers and is not
// used by the decoding itself.
//
// On failure the decoding error is returned unchanged and the returned config
// is nil: json.Unmarshal may leave the destination partially filled when it
// reports an error, and that partial state must not leak to callers.
func ParseDynDeploymentConfig(ctx context.Context, jsonContent []byte) (DynDeploymentConfig, error) {
	var config DynDeploymentConfig
	if err := json.Unmarshal(jsonContent, &config); err != nil {
		return nil, err
	}
	return config, nil
}
func GetDynamoGraphConfig(ctx context.Context, dynamoDeployment *v1alpha1.DynamoGraphDeployment, recorder EventRecorder) (*DynamoGraphConfig, error) {
dynamoGraphDownloadURL, err := RetrieveDynamoGraphDownloadURL(ctx, dynamoDeployment, recorder)
if err != nil {
......@@ -244,6 +265,7 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD
deployment.Spec.DynamoTag = config.DynamoTag
deployment.Spec.DynamoComponent = parentDynamoGraphDeployment.Spec.DynamoGraph
deployment.Spec.ServiceName = service.Name
deployment.Spec.Replicas = service.Config.Workers
labels := make(map[string]string)
// add the labels in the spec in order to label all sub-resources
deployment.Spec.Labels = labels
......@@ -281,18 +303,24 @@ func GenerateDynamoComponentsDeployments(ctx context.Context, parentDynamoGraphD
if service.Config.Resources != nil {
deployment.Spec.Resources = &common.Resources{
Requests: &common.ResourceItem{
CPU: service.Config.Resources.CPU,
Memory: service.Config.Resources.Memory,
GPU: service.Config.Resources.GPU,
Custom: service.Config.Resources.Custom,
},
Limits: &common.ResourceItem{
CPU: service.Config.Resources.CPU,
Memory: service.Config.Resources.Memory,
GPU: service.Config.Resources.GPU,
Custom: service.Config.Resources.Custom,
},
}
if service.Config.Resources.CPU != nil {
deployment.Spec.Resources.Requests.CPU = *service.Config.Resources.CPU
deployment.Spec.Resources.Limits.CPU = *service.Config.Resources.CPU
}
if service.Config.Resources.Memory != nil {
deployment.Spec.Resources.Requests.Memory = *service.Config.Resources.Memory
deployment.Spec.Resources.Limits.Memory = *service.Config.Resources.Memory
}
if service.Config.Resources.GPU != nil {
deployment.Spec.Resources.Requests.GPU = *service.Config.Resources.GPU
deployment.Spec.Resources.Limits.GPU = *service.Config.Resources.GPU
}
}
deployment.Spec.Autoscaling = &v1alpha1.Autoscaling{
Enabled: false,
......
......@@ -65,15 +65,16 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) {
Name: "service1",
},
Resources: &Resources{
CPU: "1",
Memory: "1Gi",
GPU: "0",
CPU: &[]string{"1"}[0],
Memory: &[]string{"1Gi"}[0],
GPU: &[]string{"0"}[0],
Custom: map[string]string{},
},
Autoscaling: &Autoscaling{
MinReplicas: 1,
MaxReplicas: 5,
},
Workers: &[]int32{3}[0],
},
},
{
......@@ -105,6 +106,7 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) {
DynamoComponentDeploymentSharedSpec: v1alpha1.DynamoComponentDeploymentSharedSpec{
ServiceName: "service1",
DynamoNamespace: &[]string{"default"}[0],
Replicas: &[]int32{3}[0],
Resources: &compounaiCommon.Resources{
Requests: &compounaiCommon.ResourceItem{
CPU: "1",
......@@ -184,9 +186,9 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) {
Config: Config{
HttpExposed: true,
Resources: &Resources{
CPU: "1",
Memory: "1Gi",
GPU: "0",
CPU: &[]string{"1"}[0],
Memory: &[]string{"1Gi"}[0],
GPU: &[]string{"0"}[0],
Custom: map[string]string{},
},
Autoscaling: &Autoscaling{
......@@ -328,9 +330,9 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) {
Config: Config{
HttpExposed: true,
Resources: &Resources{
CPU: "1",
Memory: "1Gi",
GPU: "0",
CPU: &[]string{"1"}[0],
Memory: &[]string{"1Gi"}[0],
GPU: &[]string{"0"}[0],
Custom: map[string]string{},
},
Autoscaling: &Autoscaling{
......@@ -479,9 +481,9 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) {
Name: "service1",
},
Resources: &Resources{
CPU: "1",
Memory: "1Gi",
GPU: "0",
CPU: &[]string{"1"}[0],
Memory: &[]string{"1Gi"}[0],
GPU: &[]string{"0"}[0],
Custom: map[string]string{},
},
Autoscaling: &Autoscaling{
......@@ -532,9 +534,9 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) {
ComponentType: ComponentTypePlanner,
},
Resources: &Resources{
CPU: "1",
Memory: "1Gi",
GPU: "0",
CPU: &[]string{"1"}[0],
Memory: &[]string{"1Gi"}[0],
GPU: &[]string{"0"}[0],
Custom: map[string]string{},
},
},
......@@ -613,9 +615,9 @@ func TestGenerateDynamoComponentsDeployments(t *testing.T) {
Name: "service1",
},
Resources: &Resources{
CPU: "1",
Memory: "1Gi",
GPU: "0",
CPU: &[]string{"1"}[0],
Memory: &[]string{"1Gi"}[0],
GPU: &[]string{"0"}[0],
Custom: map[string]string{},
},
Autoscaling: &Autoscaling{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment