Unverified Commit dba51eed authored by Julien Mancuso's avatar Julien Mancuso Committed by GitHub
Browse files

fix: propagate vllm-distributed-executor-backend annotation from DGD metadata to backends (#6692)


Signed-off-by: Julien Mancuso <jmancuso@nvidia.com>
parent 4ffa1082
...@@ -334,22 +334,7 @@ func generateSingleDCD( ...@@ -334,22 +334,7 @@ func generateSingleDCD(
labels[commonconsts.KubeLabelDynamoWorkerHash] = rollingUpdateCtx.NewWorkerHash labels[commonconsts.KubeLabelDynamoWorkerHash] = rollingUpdateCtx.NewWorkerHash
} }
// Propagate metrics annotation from parent deployment if present propagateDGDAnnotations(parentDGD.GetAnnotations(), &deployment.Spec.DynamoComponentDeploymentSharedSpec)
if parentDGD.Annotations != nil {
if deployment.Spec.Annotations == nil {
deployment.Spec.Annotations = make(map[string]string)
}
if val, exists := parentDGD.Annotations[commonconsts.KubeAnnotationEnableMetrics]; exists {
deployment.Spec.Annotations[commonconsts.KubeAnnotationEnableMetrics] = val
}
if val, exists := parentDGD.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend]; exists {
deployment.Spec.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend] = val
}
// Propagate operator origin version for version-gated behavior in backends
if val, exists := parentDGD.Annotations[commonconsts.KubeAnnotationDynamoOperatorOriginVersion]; exists {
deployment.Spec.Annotations[commonconsts.KubeAnnotationDynamoOperatorOriginVersion] = val
}
}
// Apply restart annotation if this service should be restarted. // Apply restart annotation if this service should be restarted.
if restartState.ShouldAnnotateService(componentName) { if restartState.ShouldAnnotateService(componentName) {
...@@ -1236,6 +1221,9 @@ func GeneratePodSpecForComponent( ...@@ -1236,6 +1221,9 @@ func GeneratePodSpecForComponent(
if len(dynamoDeployment.Spec.Envs) > 0 { if len(dynamoDeployment.Spec.Envs) > 0 {
component.Envs = MergeEnvs(dynamoDeployment.Spec.Envs, component.Envs) component.Envs = MergeEnvs(dynamoDeployment.Spec.Envs, component.Envs)
} }
propagateDGDAnnotations(dynamoDeployment.GetAnnotations(), component)
podSpec, err := GenerateBasePodSpec(component, backendFramework, secretsRetriever, dynamoDeployment.Name, dynamoDeployment.Namespace, role, numberOfNodes, operatorConfig, multinodeDeploymentType, serviceName, checkpointInfo) podSpec, err := GenerateBasePodSpec(component, backendFramework, secretsRetriever, dynamoDeployment.Name, dynamoDeployment.Namespace, role, numberOfNodes, operatorConfig, multinodeDeploymentType, serviceName, checkpointInfo)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -1243,6 +1231,32 @@ func GeneratePodSpecForComponent( ...@@ -1243,6 +1231,32 @@ func GeneratePodSpecForComponent(
return podSpec, nil return podSpec, nil
} }
// dgdPropagatedAnnotationKeys is the allow-list of DGD metadata annotation keys
// that propagateDGDAnnotations copies onto component-level annotations (for
// both the DCD/controller and Grove paths).
//
// Only the keys listed here are propagated; any other DGD annotation is left
// alone. A key that is already set on the component (service level) keeps its
// value and is never overwritten by the DGD-level one.
var dgdPropagatedAnnotationKeys = []string{
commonconsts.KubeAnnotationEnableMetrics,
commonconsts.KubeAnnotationDynamoDiscoveryBackend,
commonconsts.KubeAnnotationDynamoOperatorOriginVersion,
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend,
}
// propagateDGDAnnotations fills in component annotations from the DGD-level
// annotations for every key listed in dgdPropagatedAnnotationKeys, so that
// downstream logic can read them from one place.
//
// A key already present on the component is left untouched (the service-level
// value wins). The component's annotation map is allocated only when there is
// actually a value to write; DGD annotations outside the propagated key list
// are ignored.
func propagateDGDAnnotations(dgdAnnotations map[string]string, component *v1alpha1.DynamoComponentDeploymentSharedSpec) {
	for _, annotationKey := range dgdPropagatedAnnotationKeys {
		dgdValue, setOnDGD := dgdAnnotations[annotationKey]
		if !setOnDGD {
			continue
		}
		// Reading from a nil map is safe, so the precedence check can run
		// before the map is allocated.
		if _, setOnService := component.Annotations[annotationKey]; setOnService {
			continue
		}
		if component.Annotations == nil {
			component.Annotations = make(map[string]string)
		}
		component.Annotations[annotationKey] = dgdValue
	}
}
// GenerateGrovePodCliqueSet generates a Grove PodCliqueSet for the given deployment, supporting both single-node and multinode cases. // GenerateGrovePodCliqueSet generates a Grove PodCliqueSet for the given deployment, supporting both single-node and multinode cases.
func GenerateGrovePodCliqueSet( func GenerateGrovePodCliqueSet(
ctx context.Context, ctx context.Context,
...@@ -1295,14 +1309,6 @@ func GenerateGrovePodCliqueSet( ...@@ -1295,14 +1309,6 @@ func GenerateGrovePodCliqueSet(
component.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend] = string(discoveryBackend) component.Annotations[commonconsts.KubeAnnotationDynamoDiscoveryBackend] = string(discoveryBackend)
} }
// Propagate operator origin version for version-gated behavior in backends
if val, exists := dynamoDeployment.Annotations[commonconsts.KubeAnnotationDynamoOperatorOriginVersion]; exists {
if component.Annotations == nil {
component.Annotations = make(map[string]string)
}
component.Annotations[commonconsts.KubeAnnotationDynamoOperatorOriginVersion] = val
}
// Get checkpoint info for this service if available // Get checkpoint info for this service if available
var checkpointInfo *checkpoint.CheckpointInfo var checkpointInfo *checkpoint.CheckpointInfo
if checkpointInfoByService != nil { if checkpointInfoByService != nil {
......
...@@ -6966,3 +6966,77 @@ func TestFrontendDefaults_NamespacePrefixEnvVar(t *testing.T) { ...@@ -6966,3 +6966,77 @@ func TestFrontendDefaults_NamespacePrefixEnvVar(t *testing.T) {
} }
assert.True(t, found, "DYN_NAMESPACE_PREFIX should be set on frontend") assert.True(t, found, "DYN_NAMESPACE_PREFIX should be set on frontend")
} }
// TestPropagateDGDAnnotations exercises propagateDGDAnnotations with a table of
// DGD-level and service-level annotation combinations.
//
// Each case compares the component's complete annotation map against the
// expectation, so a missing key, a wrong value, or an unexpected extra key all
// fail the test. A nil expectation means the component must end up with no
// annotations at all.
func TestPropagateDGDAnnotations(t *testing.T) {
	tests := []struct {
		name               string
		dgdAnnotations     map[string]string
		serviceAnnotations map[string]string
		expectedAnnotation map[string]string
	}{
		{
			name: "DGD annotation propagates to empty service annotations",
			dgdAnnotations: map[string]string{
				commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "ray",
			},
			serviceAnnotations: nil,
			expectedAnnotation: map[string]string{
				commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "ray",
			},
		},
		{
			name: "service-level annotation takes precedence over DGD",
			dgdAnnotations: map[string]string{
				commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "ray",
			},
			serviceAnnotations: map[string]string{
				commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "mp",
			},
			expectedAnnotation: map[string]string{
				commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "mp",
			},
		},
		{
			name:               "no DGD annotation, no service annotation",
			dgdAnnotations:     nil,
			serviceAnnotations: nil,
			expectedAnnotation: nil,
		},
		{
			name: "origin version also propagates",
			dgdAnnotations: map[string]string{
				commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
			},
			serviceAnnotations: nil,
			expectedAnnotation: map[string]string{
				commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
			},
		},
		{
			name: "unrelated DGD annotations are not propagated",
			dgdAnnotations: map[string]string{
				"some-other-annotation": "value",
			},
			serviceAnnotations: nil,
			expectedAnnotation: nil,
		},
		{
			// Covers the path where the service map already exists but does
			// not hold the propagated key: existing entries must survive.
			name: "existing unrelated service annotations are preserved",
			dgdAnnotations: map[string]string{
				commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "ray",
			},
			serviceAnnotations: map[string]string{
				"some-service-annotation": "keep-me",
			},
			expectedAnnotation: map[string]string{
				"some-service-annotation":                                 "keep-me",
				commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "ray",
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			component := &v1alpha1.DynamoComponentDeploymentSharedSpec{
				Annotations: tt.serviceAnnotations,
			}

			propagateDGDAnnotations(tt.dgdAnnotations, component)

			if len(tt.expectedAnnotation) == 0 {
				// Empty accepts both a nil and an allocated-but-empty map.
				assert.Empty(t, component.Annotations, "expected no annotations")
				return
			}
			// Compare the whole map so unexpected extra keys are caught too.
			assert.Equal(t, tt.expectedAnnotation, component.Annotations)
		})
	}
}
...@@ -20,6 +20,7 @@ package validation ...@@ -20,6 +20,7 @@ package validation
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts" "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
...@@ -112,6 +113,11 @@ func (v *SharedSpecValidator) Validate(ctx context.Context) (admission.Warnings, ...@@ -112,6 +113,11 @@ func (v *SharedSpecValidator) Validate(ctx context.Context) (admission.Warnings,
v.fieldPath)) v.fieldPath))
} }
// Validate service-level annotations
if err := v.validateServiceAnnotations(); err != nil {
return nil, err
}
// Validate EPP-specific constraints // Validate EPP-specific constraints
if err := v.validateEPPConfig(ctx); err != nil { if err := v.validateEPPConfig(ctx); err != nil {
return nil, err return nil, err
...@@ -224,3 +230,20 @@ func (v *SharedSpecValidator) checkInferencePoolAPIAvailability(ctx context.Cont ...@@ -224,3 +230,20 @@ func (v *SharedSpecValidator) checkInferencePoolAPIAvailability(ctx context.Cont
return nil return nil
} }
// validateServiceAnnotations checks the values of known annotations on the
// service-level spec.
//
// Currently only the vLLM distributed executor backend annotation is checked:
// when present, its value must be "mp" or "ray" (compared case-insensitively).
// A spec without that annotation, or without any annotations, is valid.
// Returns an error naming the field path, the annotation key and the rejected
// value; otherwise nil.
func (v *SharedSpecValidator) validateServiceAnnotations() error {
	annotationKey := consts.KubeAnnotationVLLMDistributedExecutorBackend

	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	backend, present := v.spec.Annotations[annotationKey]
	if !present {
		return nil
	}

	if normalized := strings.ToLower(backend); normalized == "mp" || normalized == "ray" {
		return nil
	}

	return fmt.Errorf("%s.annotations[%s] has invalid value %q: must be \"mp\" or \"ray\"",
		v.fieldPath, annotationKey, backend)
}
...@@ -215,6 +215,40 @@ func TestSharedSpecValidator_Validate(t *testing.T) { ...@@ -215,6 +215,40 @@ func TestSharedSpecValidator_Validate(t *testing.T) {
wantErr: true, wantErr: true,
errMsg: "spec.services[main].replicas must be non-negative", errMsg: "spec.services[main].replicas must be non-negative",
}, },
{
name: "valid service annotation vllm-distributed-executor-backend=ray",
spec: &nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
"nvidia.com/vllm-distributed-executor-backend": "ray",
},
},
fieldPath: "spec.services[decode]",
calculatedNamespace: "default-my-dgd",
wantErr: false,
},
{
name: "valid service annotation vllm-distributed-executor-backend=mp",
spec: &nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
"nvidia.com/vllm-distributed-executor-backend": "mp",
},
},
fieldPath: "spec.services[decode]",
calculatedNamespace: "default-my-dgd",
wantErr: false,
},
{
name: "invalid service annotation vllm-distributed-executor-backend",
spec: &nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
"nvidia.com/vllm-distributed-executor-backend": "invalid",
},
},
fieldPath: "spec.services[decode]",
calculatedNamespace: "default-my-dgd",
wantErr: true,
errMsg: `spec.services[decode].annotations[nvidia.com/vllm-distributed-executor-backend] has invalid value "invalid": must be "mp" or "ray"`,
},
} }
for _, tt := range tests { for _, tt := range tests {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment