Unverified Commit 8dd6369e authored by Julien Mancuso's avatar Julien Mancuso Committed by GitHub
Browse files

feat: use vllm multiprocessing in multinode scenario (#6191)


Signed-off-by: default avatarJulien Mancuso <jmancuso@nvidia.com>
parent f391553e
...@@ -90,6 +90,17 @@ const ( ...@@ -90,6 +90,17 @@ const (
// Records which operator version created the resource, enabling version-gated behavior changes. // Records which operator version created the resource, enabling version-gated behavior changes.
KubeAnnotationDynamoOperatorOriginVersion = "nvidia.com/dynamo-operator-origin-version" KubeAnnotationDynamoOperatorOriginVersion = "nvidia.com/dynamo-operator-origin-version"
// vLLM distributed executor backend override annotation.
// Users can set this on a DGD to explicitly choose "mp" or "ray" for multi-node vLLM deployments.
// When present, takes priority over the version-based default.
KubeAnnotationVLLMDistributedExecutorBackend = "nvidia.com/vllm-distributed-executor-backend"
// VLLMMpMasterPort is the default port for vLLM multiprocessing coordination between nodes.
VLLMMpMasterPort = "29500"
// VLLMNixlSideChannelHostEnvVar is the env var that tells vLLM which host IP to use for the NIXL side channel.
VLLMNixlSideChannelHostEnvVar = "VLLM_NIXL_SIDE_CHANNEL_HOST"
// Metrics related constants // Metrics related constants
KubeAnnotationEnableMetrics = "nvidia.com/enable-metrics" // User-provided annotation to control metrics KubeAnnotationEnableMetrics = "nvidia.com/enable-metrics" // User-provided annotation to control metrics
KubeLabelMetricsEnabled = "nvidia.com/metrics-enabled" // Controller-managed label for pod selection KubeLabelMetricsEnabled = "nvidia.com/metrics-enabled" // Controller-managed label for pod selection
......
...@@ -62,7 +62,7 @@ func (b *SGLangBackend) UpdateContainer(container *corev1.Container, numberOfNod ...@@ -62,7 +62,7 @@ func (b *SGLangBackend) UpdateContainer(container *corev1.Container, numberOfNod
injectFlagsIntoContainerCommand(container, flags, needsShell, "sglang") injectFlagsIntoContainerCommand(container, flags, needsShell, "sglang")
} }
func (b *SGLangBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string) { func (b *SGLangBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) {
// do nothing // do nothing
} }
......
...@@ -17,24 +17,6 @@ type TRTLLMBackend struct { ...@@ -17,24 +17,6 @@ type TRTLLMBackend struct {
MpiRunSecretName string MpiRunSecretName string
} }
// shellQuoteForBashC quotes a string for safe use inside a single-quoted bash -c '...' command.
// Since the outer context is already single-quoted, we CANNOT use single quotes here.
// Instead, we use double quotes and escape characters that are special inside double quotes:
// backslash, double quote, dollar sign, and backtick.
// Any embedded single quotes are also escaped (\') since the result lives inside bash -c '...'.
func shellQuoteForBashC(s string) string {
if strings.ContainsAny(s, " \t\n'\"\\{}[]$`!") {
escaped := s
escaped = strings.ReplaceAll(escaped, `\`, `\\`) // must be first
escaped = strings.ReplaceAll(escaped, `"`, `\"`)
escaped = strings.ReplaceAll(escaped, `$`, `\$`)
escaped = strings.ReplaceAll(escaped, "`", "\\`")
escaped = strings.ReplaceAll(escaped, "'", `'"'"'`) // end single-quote, literal ', restart single-quote
return `"` + escaped + `"`
}
return s
}
func (b *TRTLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) { func (b *TRTLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) {
// Check for volumeMounts with useAsCompilationCache=true // Check for volumeMounts with useAsCompilationCache=true
for _, volumeMount := range component.VolumeMounts { for _, volumeMount := range component.VolumeMounts {
...@@ -92,7 +74,7 @@ func (b *TRTLLMBackend) UpdateContainer(container *corev1.Container, numberOfNod ...@@ -92,7 +74,7 @@ func (b *TRTLLMBackend) UpdateContainer(container *corev1.Container, numberOfNod
} }
} }
func (b *TRTLLMBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string) { func (b *TRTLLMBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) {
// Add SSH keypair volume for TRTLLM multinode deployments // Add SSH keypair volume for TRTLLM multinode deployments
if numberOfNodes > 1 { if numberOfNodes > 1 {
sshVolume := corev1.Volume{ sshVolume := corev1.Volume{
......
...@@ -448,7 +448,7 @@ func TestTRTLLMBackend_UpdatePodSpec(t *testing.T) { ...@@ -448,7 +448,7 @@ func TestTRTLLMBackend_UpdatePodSpec(t *testing.T) {
component := &v1alpha1.DynamoComponentDeploymentSharedSpec{} component := &v1alpha1.DynamoComponentDeploymentSharedSpec{}
// Call UpdatePodSpec // Call UpdatePodSpec
backend.UpdatePodSpec(podSpec, tt.numberOfNodes, tt.role, component, "test-service") backend.UpdatePodSpec(podSpec, tt.numberOfNodes, tt.role, component, "test-service", tt.multinodeDeployer)
// Check volume count // Check volume count
if len(podSpec.Volumes) != tt.expectedVolumeCount { if len(podSpec.Volumes) != tt.expectedVolumeCount {
......
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"strings" "strings"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/featuregate"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log"
) )
...@@ -25,7 +27,18 @@ func (b *VLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes ...@@ -25,7 +27,18 @@ func (b *VLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes
if isMultinode { if isMultinode {
// Apply multinode-specific argument modifications // Apply multinode-specific argument modifications
updateVLLMMultinodeArgs(container, role, serviceName, multinodeDeployer, component.Resources, numberOfNodes) updateVLLMMultinodeArgs(container, role, serviceName, multinodeDeployer, component.Resources, numberOfNodes, component.Annotations)
if shouldUseMpBackend(component.Annotations) {
container.Env = append(container.Env, corev1.EnvVar{
Name: commonconsts.VLLMNixlSideChannelHostEnvVar,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
})
}
// Remove probes for multinode worker and leader // Remove probes for multinode worker and leader
if role == RoleWorker { if role == RoleWorker {
...@@ -65,21 +78,64 @@ func (b *VLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes ...@@ -65,21 +78,64 @@ func (b *VLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes
} }
} }
func (b *VLLMBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string) { func (b *VLLMBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) {
// do nothing if numberOfNodes <= 1 || role != RoleWorker || !shouldUseMpBackend(component.Annotations) {
return
}
if len(podSpec.Containers) == 0 {
return
}
leaderHostname := multinodeDeployer.GetLeaderHostname(serviceName)
mainImage := podSpec.Containers[0].Image
waitScript := fmt.Sprintf(`import socket, time
host, port = "%s", %s
print(f"Waiting for leader master port at {host}:{port}...", flush=True)
start = time.monotonic()
last_status = start
last_err = ""
while True:
try:
s = socket.create_connection((host, port), timeout=2)
s.close()
elapsed = time.monotonic() - start
print(f"Leader master port ready (waited {elapsed:.1f}s)", flush=True)
break
except Exception as e:
last_err = f"{type(e).__name__}: {e}"
now = time.monotonic()
if now - last_status >= 30:
print(f"Still waiting for {host}:{port}... ({now - start:.0f}s elapsed, last error: {last_err})", flush=True)
last_status = now
time.sleep(2)
`, leaderHostname, commonconsts.VLLMMpMasterPort)
initContainer := corev1.Container{
Name: "wait-for-leader-mp",
Image: mainImage,
Command: []string{"python3", "-c", waitScript},
}
podSpec.InitContainers = append(podSpec.InitContainers, initContainer)
} }
// updateVLLMMultinodeArgs will inject Ray-specific flags for tensor parallel multinode deployments // updateVLLMMultinodeArgs dispatches to the appropriate injection function based on
// OR data parallel flags for data parallel multinode deployments // parallelism strategy (TP/PP distributed vs data-parallel) and executor backend (mp vs ray).
func updateVLLMMultinodeArgs(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, resources *v1alpha1.Resources, numberOfNodes int32) { func updateVLLMMultinodeArgs(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, resources *v1alpha1.Resources, numberOfNodes int32, annotations map[string]string) {
expandedArgs := getExpandedArgs(container) expandedArgs := getExpandedArgs(container)
if needsRayDistributedLaunch(expandedArgs, resources) { needsDistributed := needsTensorParallelMultinodeLaunch(expandedArgs, resources)
if needsDistributed && shouldUseMpBackend(annotations) {
injectMpDistributedLaunchFlags(container, role, serviceName, multinodeDeployer, numberOfNodes)
} else if needsDistributed {
injectRayDistributedLaunchFlags(container, role, serviceName, multinodeDeployer) injectRayDistributedLaunchFlags(container, role, serviceName, multinodeDeployer)
} else if needsDataParallelLaunch(expandedArgs, resources) { } else if needsDataParallelMultinodeLaunch(expandedArgs, resources) {
injectDataParallelLaunchFlags(container, role, serviceName, multinodeDeployer, resources, numberOfNodes) injectDataParallelLaunchFlags(container, role, serviceName, multinodeDeployer, resources, numberOfNodes)
} else { } else {
logger := log.Log.WithName("vllm-backend") logger := log.Log.WithName("vllm-backend")
logger.Info("No need to inject Ray-specific or data parallel flags for multinode deployments", "args", strings.Join(container.Args, " ")) logger.Info("No need to inject tensor or data parallel flags for multinode deployments", "args", strings.Join(container.Args, " "))
} }
} }
...@@ -93,6 +149,61 @@ func getExpandedArgs(container *corev1.Container) []string { ...@@ -93,6 +149,61 @@ func getExpandedArgs(container *corev1.Container) []string {
return expandedArgs return expandedArgs
} }
// shouldUseMpBackend determines whether to use multiprocessing (mp) or Ray for vLLM
// multi-node distributed launches.
//
// Decision logic:
// 1. Explicit override annotation takes priority (user set "mp" or "ray")
// 2. Operator origin version feature gate: uses featuregate.VLLMMultiprocessing
func shouldUseMpBackend(annotations map[string]string) bool {
logger := log.Log.WithName("vllm-backend")
// Step 1: Check explicit override
if override, exists := annotations[commonconsts.KubeAnnotationVLLMDistributedExecutorBackend]; exists {
switch strings.ToLower(override) {
case "mp":
logger.Info("Using mp backend (explicit override)")
return true
case "ray":
logger.Info("Using ray backend (explicit override)")
return false
default:
logger.Info("Ignoring invalid vllm-distributed-executor-backend annotation value, falling through to version check",
"value", override)
}
}
// Step 2: Check operator origin version gate
return featuregate.VLLMMultiprocessing.IsEnabled(annotations)
}
// injectMpDistributedLaunchFlags injects vLLM multiprocessing flags for multi-node TP/PP deployments.
//
// Leader: runs the original vLLM command with --distributed-executor-backend mp,
// --nnodes, --node-rank 0, --master-addr, --master-port
//
// Worker: runs the same vLLM command with --headless, --node-rank <rank>, and the same
// coordination flags. An init container (injected via UpdatePodSpec) handles waiting for
// the leader's master port before the worker's main container starts.
func injectMpDistributedLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, numberOfNodes int32) {
leaderHostname := multinodeDeployer.GetLeaderHostname(serviceName)
mpFlags := fmt.Sprintf("--distributed-executor-backend mp --nnodes %d --master-addr %s --master-port %s",
numberOfNodes, leaderHostname, commonconsts.VLLMMpMasterPort)
needsShell := false
switch role {
case RoleLeader:
mpFlags += " --node-rank 0"
case RoleWorker:
nodeRank, needsShellForRank := multinodeDeployer.GetNodeRank()
needsShell = needsShellForRank
mpFlags += fmt.Sprintf(" --node-rank %s --headless", nodeRank)
}
injectFlagsIntoContainerCommand(container, mpFlags, needsShell, "vllm")
}
func injectRayDistributedLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer) { func injectRayDistributedLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer) {
switch role { switch role {
case RoleLeader: case RoleLeader:
...@@ -179,9 +290,9 @@ func injectDataParallelLaunchFlags(container *corev1.Container, role Role, servi ...@@ -179,9 +290,9 @@ func injectDataParallelLaunchFlags(container *corev1.Container, role Role, servi
injectFlagsIntoContainerCommand(container, strings.Join(flags, " "), needsShell, "vllm") injectFlagsIntoContainerCommand(container, strings.Join(flags, " "), needsShell, "vllm")
} }
// if world size (within DP rank) > GPU count, then we need to inject ray // needsMultinodeDistributedLaunch returns true when the model's world size (TP * PP)
// world size = tensor parallel size * pipeline parallel size // exceeds the GPU count of a single node, requiring multi-node distribution (via mp or ray).
func needsRayDistributedLaunch(expandedArgs []string, resources *v1alpha1.Resources) bool { func needsTensorParallelMultinodeLaunch(expandedArgs []string, resources *v1alpha1.Resources) bool {
containerGPUs := getContainerGPUs(resources) containerGPUs := getContainerGPUs(resources)
if containerGPUs == 0 { if containerGPUs == 0 {
return false return false
...@@ -196,7 +307,7 @@ func getWorldSize(expandedArgs []string) int64 { ...@@ -196,7 +307,7 @@ func getWorldSize(expandedArgs []string) int64 {
} }
// if world size across all DP ranks > GPU count, then we need to inject data parallel multinode coordination // if world size across all DP ranks > GPU count, then we need to inject data parallel multinode coordination
func needsDataParallelLaunch(expandedArgs []string, resources *v1alpha1.Resources) bool { func needsDataParallelMultinodeLaunch(expandedArgs []string, resources *v1alpha1.Resources) bool {
dataParallelSize := getFlagValue(expandedArgs, dataParallelSizeFlag) dataParallelSize := getFlagValue(expandedArgs, dataParallelSizeFlag)
containerGPUs := getContainerGPUs(resources) containerGPUs := getContainerGPUs(resources)
if containerGPUs == 0 { if containerGPUs == 0 {
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"testing" "testing"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
commonconsts "github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"github.com/onsi/gomega" "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
) )
...@@ -37,7 +38,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) { ...@@ -37,7 +38,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
expectNotModified: true, expectNotModified: true,
}, },
{ {
name: "multinode leader prepends ray start --head", name: "multinode leader uses ray (no annotations = legacy)",
numberOfNodes: 3, numberOfNodes: 3,
role: RoleLeader, role: RoleLeader,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{}, component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
...@@ -48,7 +49,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) { ...@@ -48,7 +49,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
expectProbesRemoved: true, expectProbesRemoved: true,
}, },
{ {
name: "multinode worker replaces args with ray start --block", name: "multinode worker uses ray (no annotations = legacy)",
numberOfNodes: 3, numberOfNodes: 3,
role: RoleWorker, role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{}, component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
...@@ -59,7 +60,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) { ...@@ -59,7 +60,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
expectProbesRemoved: true, expectProbesRemoved: true,
}, },
{ {
name: "multinode worker with LWS deployment type", name: "multinode worker with LWS deployment type (no annotations = legacy ray)",
numberOfNodes: 2, numberOfNodes: 2,
role: RoleWorker, role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{}, component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
...@@ -89,6 +90,69 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) { ...@@ -89,6 +90,69 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
gpuCount: 0, gpuCount: 0,
expectNotModified: true, expectNotModified: true,
}, },
{
name: "multinode leader uses mp (origin version >= threshold)",
numberOfNodes: 2,
role: RoleLeader,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8,
expectedArgs: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16", "--distributed-executor-backend", "mp", "--nnodes", "2", "--master-addr", "$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)", "--master-port", commonconsts.VLLMMpMasterPort, "--node-rank", "0"},
expectProbesRemoved: true,
},
{
name: "multinode worker uses mp (origin version >= threshold) Grove",
numberOfNodes: 2,
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8,
expectedArgs: []string{fmt.Sprintf(
"exec python3 -m dynamo.vllm %s 16 --distributed-executor-backend mp --nnodes 2 --master-addr $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --master-port %s --node-rank $((GROVE_PCLQ_POD_INDEX + 1)) --headless",
tensorParallelSizeFlag, commonconsts.VLLMMpMasterPort)},
expectProbesRemoved: true,
},
{
name: "multinode leader uses ray (explicit override despite new version)",
numberOfNodes: 2,
role: RoleLeader,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "ray",
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3", "-m", "dynamo.vllm"}, Args: []string{"--model", "test", tensorParallelSizeFlag, "8"}},
gpuCount: 4,
expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm --model test %s 8 --distributed-executor-backend ray", VLLMPort, tensorParallelSizeFlag)},
expectProbesRemoved: true,
},
{
name: "multinode leader uses mp (explicit override on legacy DGD)",
numberOfNodes: 2,
role: RoleLeader,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "mp",
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8,
expectedArgs: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16", "--distributed-executor-backend", "mp", "--nnodes", "2", "--master-addr", "$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)", "--master-port", commonconsts.VLLMMpMasterPort, "--node-rank", "0"},
expectProbesRemoved: true,
},
} }
for _, tt := range tests { for _, tt := range tests {
...@@ -328,24 +392,64 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -328,24 +392,64 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
role Role role Role
multinodeDeployer MultinodeDeployer multinodeDeployer MultinodeDeployer
initialContainer *corev1.Container initialContainer *corev1.Container
gpuCount int64 // GPU count for the test case gpuCount int64
annotations map[string]string // nil = legacy (no annotations)
expectedArgs []string expectedArgs []string
expectNotModified bool expectNotModified bool
}{ }{
{ {
name: "leader prepends ray start --head", name: "leader uses ray (nil annotations = legacy)",
role: RoleLeader, role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{}, multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}}, initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8, gpuCount: 8,
annotations: nil,
expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm %s 16 --distributed-executor-backend ray", VLLMPort, tensorParallelSizeFlag)}, expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm %s 16 --distributed-executor-backend ray", VLLMPort, tensorParallelSizeFlag)},
}, },
{ {
name: "leader prepends distributed data parallel flags", name: "leader uses mp (origin version >= threshold)",
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8,
annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
expectedArgs: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16", "--distributed-executor-backend", "mp", "--nnodes", "2", "--master-addr", "$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)", "--master-port", commonconsts.VLLMMpMasterPort, "--node-rank", "0"},
},
{
name: "worker uses mp (origin version >= threshold) Grove",
role: RoleWorker,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8,
annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
expectedArgs: []string{fmt.Sprintf(
"exec python3 -m dynamo.vllm %s 16 --distributed-executor-backend mp --nnodes 2 --master-addr $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --master-port %s --node-rank $((GROVE_PCLQ_POD_INDEX + 1)) --headless",
tensorParallelSizeFlag, commonconsts.VLLMMpMasterPort)},
},
{
name: "worker uses mp (origin version >= threshold) LWS",
role: RoleWorker,
multinodeDeployer: &LWSMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8,
annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
expectedArgs: []string{fmt.Sprintf(
"exec python3 -m dynamo.vllm %s 16 --distributed-executor-backend mp --nnodes 2 --master-addr $LWS_LEADER_ADDRESS --master-port %s --node-rank $(LWS_WORKER_INDEX) --headless",
tensorParallelSizeFlag, commonconsts.VLLMMpMasterPort)},
},
{
name: "leader prepends distributed data parallel flags (annotations don't affect DP path)",
role: RoleLeader, role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{}, multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}}, initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}},
gpuCount: 8, gpuCount: 8,
annotations: nil,
expectedArgs: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16", "--data-parallel-hybrid-lb", "--data-parallel-size-local", "8", "--data-parallel-start-rank", "0", "--data-parallel-address", "$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)", "--data-parallel-rpc-port", "13445"}, expectedArgs: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16", "--data-parallel-hybrid-lb", "--data-parallel-size-local", "8", "--data-parallel-start-rank", "0", "--data-parallel-address", "$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)", "--data-parallel-rpc-port", "13445"},
}, },
{ {
...@@ -354,14 +458,16 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -354,14 +458,16 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
multinodeDeployer: &GroveMultinodeDeployer{}, multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Args: []string{}}, initialContainer: &corev1.Container{Args: []string{}},
gpuCount: 0, gpuCount: 0,
annotations: nil,
expectNotModified: true, expectNotModified: true,
}, },
{ {
name: "worker with ray distributed launch Grove", name: "worker with ray distributed launch Grove (nil annotations)",
role: RoleWorker, role: RoleWorker,
multinodeDeployer: &GroveMultinodeDeployer{}, multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}}, initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8, gpuCount: 8,
annotations: nil,
expectedArgs: []string{"ray start --address=$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE):6379 --block"}, expectedArgs: []string{"ray start --address=$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE):6379 --block"},
}, },
{ {
...@@ -370,6 +476,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -370,6 +476,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
multinodeDeployer: &GroveMultinodeDeployer{}, multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}}, initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}},
gpuCount: 8, gpuCount: 8,
annotations: nil,
expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 16 --data-parallel-hybrid-lb --data-parallel-size-local 8 --data-parallel-start-rank $(( 8 * $((GROVE_PCLQ_POD_INDEX + 1)) )) --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445", dataParallelSizeFlag)}, expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 16 --data-parallel-hybrid-lb --data-parallel-size-local 8 --data-parallel-start-rank $(( 8 * $((GROVE_PCLQ_POD_INDEX + 1)) )) --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445", dataParallelSizeFlag)},
}, },
{ {
...@@ -378,14 +485,16 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -378,14 +485,16 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
multinodeDeployer: &GroveMultinodeDeployer{}, multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "8", tensorParallelSizeFlag, "2"}}, initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "8", tensorParallelSizeFlag, "2"}},
gpuCount: 8, gpuCount: 8,
annotations: nil,
expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 8 %s 2 --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank $(( 4 * $((GROVE_PCLQ_POD_INDEX + 1)) )) --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445", dataParallelSizeFlag, tensorParallelSizeFlag)}, expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 8 %s 2 --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank $(( 4 * $((GROVE_PCLQ_POD_INDEX + 1)) )) --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445", dataParallelSizeFlag, tensorParallelSizeFlag)},
}, },
{ {
name: "worker with ray distributed launch LWS", name: "worker with ray distributed launch LWS (nil annotations)",
role: RoleWorker, role: RoleWorker,
multinodeDeployer: &LWSMultinodeDeployer{}, multinodeDeployer: &LWSMultinodeDeployer{},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}}, initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8, gpuCount: 8,
annotations: nil,
expectedArgs: []string{"ray start --address=$LWS_LEADER_ADDRESS:6379 --block"}, expectedArgs: []string{"ray start --address=$LWS_LEADER_ADDRESS:6379 --block"},
}, },
{ {
...@@ -394,6 +503,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -394,6 +503,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
multinodeDeployer: &GroveMultinodeDeployer{}, multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.frontend"}}, initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.frontend"}},
gpuCount: 0, gpuCount: 0,
annotations: nil,
expectNotModified: true, expectNotModified: true,
}, },
} }
...@@ -414,8 +524,8 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -414,8 +524,8 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
} }
} }
// Call updateVLLMMultinodeArgs // Call updateVLLMMultinodeArgs with annotations
updateVLLMMultinodeArgs(tt.initialContainer, tt.role, "test-service", tt.multinodeDeployer, resources, 2) updateVLLMMultinodeArgs(tt.initialContainer, tt.role, "test-service", tt.multinodeDeployer, resources, 2, tt.annotations)
if tt.expectNotModified { if tt.expectNotModified {
// Args should not have changed // Args should not have changed
...@@ -427,3 +537,236 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -427,3 +537,236 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
}) })
} }
} }
func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
backend := &VLLMBackend{}
tests := []struct {
name string
numberOfNodes int32
role Role
component *v1alpha1.DynamoComponentDeploymentSharedSpec
multinodeDeployer MultinodeDeployer
initialPodSpec *corev1.PodSpec
expectInitContainer bool
expectedInitName string
expectedInitImage string
expectedInitCommandLen int
expectWaitScriptContent string
}{
{
name: "mp worker with Grove deployer injects init container",
numberOfNodes: 2,
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialPodSpec: &corev1.PodSpec{
Containers: []corev1.Container{
{Name: "main", Image: "vllm:latest"},
},
},
expectInitContainer: true,
expectedInitName: "wait-for-leader-mp",
expectedInitImage: "vllm:latest",
expectedInitCommandLen: 3,
expectWaitScriptContent: "$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)",
},
{
name: "mp worker with LWS deployer injects init container",
numberOfNodes: 2,
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
},
multinodeDeployer: &LWSMultinodeDeployer{},
initialPodSpec: &corev1.PodSpec{
Containers: []corev1.Container{
{Name: "main", Image: "vllm:v2"},
},
},
expectInitContainer: true,
expectedInitName: "wait-for-leader-mp",
expectedInitImage: "vllm:v2",
expectedInitCommandLen: 3,
expectWaitScriptContent: "$LWS_LEADER_ADDRESS",
},
{
name: "mp leader does not inject init container",
numberOfNodes: 2,
role: RoleLeader,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialPodSpec: &corev1.PodSpec{
Containers: []corev1.Container{
{Name: "main", Image: "vllm:latest"},
},
},
expectInitContainer: false,
},
{
name: "ray worker does not inject init container (legacy)",
numberOfNodes: 2,
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
multinodeDeployer: &GroveMultinodeDeployer{},
initialPodSpec: &corev1.PodSpec{
Containers: []corev1.Container{
{Name: "main", Image: "vllm:latest"},
},
},
expectInitContainer: false,
},
{
name: "single node does not inject init container",
numberOfNodes: 1,
role: RoleMain,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialPodSpec: &corev1.PodSpec{
Containers: []corev1.Container{
{Name: "main", Image: "vllm:latest"},
},
},
expectInitContainer: false,
},
{
name: "mp worker preserves existing init containers",
numberOfNodes: 2,
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialPodSpec: &corev1.PodSpec{
InitContainers: []corev1.Container{
{Name: "existing-init", Image: "busybox"},
},
Containers: []corev1.Container{
{Name: "main", Image: "vllm:latest"},
},
},
expectInitContainer: true,
expectedInitName: "wait-for-leader-mp",
expectedInitImage: "vllm:latest",
expectedInitCommandLen: 3,
expectWaitScriptContent: "$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
initialInitCount := len(tt.initialPodSpec.InitContainers)
backend.UpdatePodSpec(tt.initialPodSpec, tt.numberOfNodes, tt.role, tt.component, "test-service", tt.multinodeDeployer)
if tt.expectInitContainer {
g.Expect(len(tt.initialPodSpec.InitContainers)).To(gomega.Equal(initialInitCount + 1))
injected := tt.initialPodSpec.InitContainers[len(tt.initialPodSpec.InitContainers)-1]
g.Expect(injected.Name).To(gomega.Equal(tt.expectedInitName))
g.Expect(injected.Image).To(gomega.Equal(tt.expectedInitImage))
g.Expect(len(injected.Command)).To(gomega.Equal(tt.expectedInitCommandLen))
g.Expect(injected.Command[0]).To(gomega.Equal("python3"))
g.Expect(injected.Command[1]).To(gomega.Equal("-c"))
g.Expect(injected.Command[2]).To(gomega.ContainSubstring(tt.expectWaitScriptContent))
g.Expect(injected.Command[2]).To(gomega.ContainSubstring("socket.create_connection"))
g.Expect(injected.Command[2]).To(gomega.ContainSubstring(commonconsts.VLLMMpMasterPort))
} else {
g.Expect(len(tt.initialPodSpec.InitContainers)).To(gomega.Equal(initialInitCount))
}
})
}
}
func TestShouldUseMpBackend(t *testing.T) {
// Version-based gate behavior is tested in featuregate.TestOperatorOriginFeatureGate_IsEnabled.
// These tests focus on the explicit override logic and its interaction with the feature gate.
tests := []struct {
name string
annotations map[string]string
want bool
}{
{
name: "nil annotations = legacy = ray (delegates to feature gate)",
annotations: nil,
want: false,
},
{
name: "empty annotations = legacy = ray (delegates to feature gate)",
annotations: map[string]string{},
want: false,
},
{
name: "explicit override mp takes priority over version",
annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "0.1.0",
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "mp",
},
want: true,
},
{
name: "explicit override ray takes priority over version",
annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "ray",
},
want: false,
},
{
name: "explicit override mp (no origin version)",
annotations: map[string]string{
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "mp",
},
want: true,
},
{
name: "explicit override with invalid value falls through to feature gate",
annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "invalid",
},
want: true, // invalid override ignored, version >= threshold via feature gate
},
{
name: "explicit override case insensitive MP",
annotations: map[string]string{
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "MP",
},
want: true,
},
{
name: "explicit override case insensitive Ray",
annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
commonconsts.KubeAnnotationVLLMDistributedExecutorBackend: "RAY",
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := shouldUseMpBackend(tt.annotations)
if got != tt.want {
t.Errorf("shouldUseMpBackend() = %v, want %v", got, tt.want)
}
})
}
}
...@@ -867,7 +867,7 @@ func ParseBackendFramework(framework string) (BackendFramework, error) { ...@@ -867,7 +867,7 @@ func ParseBackendFramework(framework string) (BackendFramework, error) {
// Each backend (SGLang, VLLM, etc.) implements this interface // Each backend (SGLang, VLLM, etc.) implements this interface
type Backend interface { type Backend interface {
UpdateContainer(container *corev1.Container, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) UpdateContainer(container *corev1.Container, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer)
UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer)
} }
// NoopBackend does no processing - used for non-worker components like frontend, planner, router // NoopBackend does no processing - used for non-worker components like frontend, planner, router
...@@ -877,7 +877,7 @@ func (b *NoopBackend) UpdateContainer(container *corev1.Container, numberOfNodes ...@@ -877,7 +877,7 @@ func (b *NoopBackend) UpdateContainer(container *corev1.Container, numberOfNodes
// No-op: frontend, planner, router, etc. don't need backend-specific processing // No-op: frontend, planner, router, etc. don't need backend-specific processing
} }
func (b *NoopBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string) { func (b *NoopBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string, multinodeDeployer MultinodeDeployer) {
// No-op: frontend, planner, router, etc. don't need backend-specific processing // No-op: frontend, planner, router, etc. don't need backend-specific processing
} }
...@@ -1166,7 +1166,7 @@ func GenerateBasePodSpec( ...@@ -1166,7 +1166,7 @@ func GenerateBasePodSpec(
podSpec.Volumes = append(podSpec.Volumes, volumes...) podSpec.Volumes = append(podSpec.Volumes, volumes...)
podSpec.ImagePullSecrets = controller_common.AppendUniqueImagePullSecrets(podSpec.ImagePullSecrets, imagePullSecrets) podSpec.ImagePullSecrets = controller_common.AppendUniqueImagePullSecrets(podSpec.ImagePullSecrets, imagePullSecrets)
backend.UpdatePodSpec(&podSpec, numberOfNodes, role, component, serviceName) backend.UpdatePodSpec(&podSpec, numberOfNodes, role, component, serviceName, multinodeDeployer)
// Inject checkpoint configuration if enabled // Inject checkpoint configuration if enabled
// This handles ALL checkpoint-related modifications: // This handles ALL checkpoint-related modifications:
......
...@@ -27,19 +27,39 @@ import ( ...@@ -27,19 +27,39 @@ import (
* *
* The needsShell flag indicates when environment variables require shell interpretation * The needsShell flag indicates when environment variables require shell interpretation
*/ */
// shellQuoteForBashC quotes a string so it survives shell interpretation inside sh -c.
// Simple args (flags, paths) pass through unchanged; args containing special characters
// (JSON, env vars, spaces, quotes) are wrapped in double quotes with inner escaping.
func shellQuoteForBashC(s string) string {
if strings.ContainsAny(s, " \t\n'\"\\{}[]$`!") {
escaped := s
escaped = strings.ReplaceAll(escaped, `\`, `\\`) // must be first
escaped = strings.ReplaceAll(escaped, `"`, `\"`)
escaped = strings.ReplaceAll(escaped, `$`, `\$`)
escaped = strings.ReplaceAll(escaped, "`", "\\`")
escaped = strings.ReplaceAll(escaped, "'", `'"'"'`)
return `"` + escaped + `"`
}
return s
}
func injectFlagsIntoContainerCommand(container *corev1.Container, flags string, needsShell bool, framework string) { func injectFlagsIntoContainerCommand(container *corev1.Container, flags string, needsShell bool, framework string) {
if len(container.Command) > 0 && isPythonCommand(container.Command[0]) { if len(container.Command) > 0 && isPythonCommand(container.Command[0]) {
// Direct python command case // Direct python command case
if needsShell { if needsShell {
// Transform to shell wrapper for env var interpretation // Transform to shell wrapper for env var interpretation.
// Quote each original arg individually so JSON and other special
// characters survive shell interpretation.
fullCommand := strings.Join(container.Command, " ") fullCommand := strings.Join(container.Command, " ")
originalArgs := strings.Join(container.Args, " ") quotedArgs := make([]string, len(container.Args))
for i, arg := range container.Args {
quotedArgs[i] = shellQuoteForBashC(arg)
}
originalArgs := strings.Join(quotedArgs, " ")
var shellCommand string var shellCommand string
if len(container.Args) > 0 { if len(container.Args) > 0 {
// Use exec to ensure PID 1 is given to the python command
shellCommand = fmt.Sprintf("exec %s %s %s", fullCommand, originalArgs, flags) shellCommand = fmt.Sprintf("exec %s %s %s", fullCommand, originalArgs, flags)
} else { } else {
// Use exec to ensure PID 1 is given to the python command
shellCommand = fmt.Sprintf("exec %s %s", fullCommand, flags) shellCommand = fmt.Sprintf("exec %s %s", fullCommand, flags)
} }
container.Command = []string{"sh", "-c"} container.Command = []string{"sh", "-c"}
......
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package featuregate
import semver "github.com/Masterminds/semver/v3"
// Feature gates gated on the operator origin version (the operator version that
// first reconciled / created the DGD resource).
var (
// VLLMMultiprocessing gates the use of vLLM native multiprocessing (mp)
// instead of Ray for multi-node deployments. Enabled for DGDs originally
// created by operator >= 1.0.0-dev.
VLLMMultiprocessing = OperatorOriginFeatureGate{
Name: "VLLMMultiprocessing",
MinOriginVersion: *semver.MustParse("1.0.0-dev"),
}
)
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package featuregate
import (
semver "github.com/Masterminds/semver/v3"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// OperatorOriginFeatureGate represents a feature gated on the operator version
// that originally created the DGD (nvidia.com/dynamo-operator-origin-version).
//
// The origin version is stamped by the mutating webhook at CREATE time and never
// changes afterwards. This allows the operator to introduce new default behaviors
// for newly created resources while preserving backward compatibility for existing ones.
//
// When the annotation is absent (pre-upgrade DGD), IsEnabled returns false
// to preserve backward compatibility.
type OperatorOriginFeatureGate struct {
Name string // Human-readable feature name (for logging)
MinOriginVersion semver.Version // Minimum origin version required (semver)
}
// IsEnabled returns true if the origin version in annotations meets or exceeds
// the gate's MinOriginVersion threshold.
//
// Returns false when:
// - annotations is nil (no metadata)
// - origin version annotation is absent (pre-upgrade DGD)
// - origin version is not valid semver
// - origin version < MinOriginVersion
func (fg OperatorOriginFeatureGate) IsEnabled(annotations map[string]string) bool {
logger := log.Log.WithName("featuregate").WithValues("feature", fg.Name)
originVersion, exists := annotations[consts.KubeAnnotationDynamoOperatorOriginVersion]
if !exists {
logger.V(1).Info("No operator origin version annotation, feature disabled (backward compat)")
return false
}
version, err := semver.NewVersion(originVersion)
if err != nil {
logger.Info("Invalid origin version, feature disabled",
"version", originVersion, "error", err.Error())
return false
}
enabled := version.Compare(&fg.MinOriginVersion) >= 0
logger.V(1).Info("Feature gate evaluated",
"originVersion", originVersion,
"threshold", fg.MinOriginVersion,
"enabled", enabled)
return enabled
}
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package featuregate
import (
"testing"
semver "github.com/Masterminds/semver/v3"
"github.com/ai-dynamo/dynamo/deploy/operator/internal/consts"
)
func TestOperatorOriginFeatureGate_IsEnabled(t *testing.T) {
gate := OperatorOriginFeatureGate{
Name: "TestFeature",
MinOriginVersion: *semver.MustParse("1.0.0-dev"),
}
tests := []struct {
name string
annotations map[string]string
want bool
}{
{
name: "nil annotations = disabled (backward compat)",
annotations: nil,
want: false,
},
{
name: "empty annotations = disabled (backward compat)",
annotations: map[string]string{},
want: false,
},
{
name: "origin version below threshold = disabled",
annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "0.9.0",
},
want: false,
},
{
name: "origin version at threshold = enabled",
annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0-dev",
},
want: true,
},
{
name: "origin version above threshold (release > pre-release) = enabled",
annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
want: true,
},
{
name: "origin version well above threshold = enabled",
annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "2.0.0",
},
want: true,
},
{
name: "pre-release below threshold = disabled",
annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "0.9.0-dev",
},
want: false,
},
{
name: "invalid origin version = disabled (graceful fallback)",
annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "not-a-version",
},
want: false,
},
{
name: "fallback version 0.0.0-unknown = disabled",
annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "0.0.0-unknown",
},
want: false,
},
{
name: "unrelated annotations without origin version = disabled",
annotations: map[string]string{
"some.other/annotation": "value",
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := gate.IsEnabled(tt.annotations)
if got != tt.want {
t.Errorf("IsEnabled() = %v, want %v", got, tt.want)
}
})
}
}
func TestOperatorOriginFeatureGate_DifferentThresholds(t *testing.T) {
annotations := map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "0.9.0",
}
tests := []struct {
name string
minOriginVersion semver.Version
want bool
}{
{
name: "threshold below origin = enabled",
minOriginVersion: *semver.MustParse("0.8.0"),
want: true,
},
{
name: "threshold equal to origin = enabled",
minOriginVersion: *semver.MustParse("0.9.0"),
want: true,
},
{
name: "threshold above origin = disabled",
minOriginVersion: *semver.MustParse("1.0.0"),
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gate := OperatorOriginFeatureGate{
Name: "TestFeature",
MinOriginVersion: tt.minOriginVersion,
}
got := gate.IsEnabled(annotations)
if got != tt.want {
t.Errorf("IsEnabled() with threshold %v = %v, want %v", tt.minOriginVersion, got, tt.want)
}
})
}
}
...@@ -441,6 +441,17 @@ func (v *DynamoGraphDeploymentValidator) validateAnnotations() error { ...@@ -441,6 +441,17 @@ func (v *DynamoGraphDeploymentValidator) validateAnnotations() error {
} }
} }
// Validate vLLM distributed executor backend override
if value, exists := annotations[consts.KubeAnnotationVLLMDistributedExecutorBackend]; exists {
switch strings.ToLower(value) {
case "mp", "ray":
// valid
default:
errs = append(errs, fmt.Errorf("annotation %s has invalid value %q: must be \"mp\" or \"ray\"",
consts.KubeAnnotationVLLMDistributedExecutorBackend, value))
}
}
return errors.Join(errs...) return errors.Join(errs...)
} }
......
...@@ -677,6 +677,79 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) { ...@@ -677,6 +677,79 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
wantErr: false, wantErr: false,
}, },
// Annotation validation test cases // Annotation validation test cases
{
name: "valid annotation vllm-distributed-executor-backend=mp",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationVLLMDistributedExecutorBackend: "mp",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
},
},
wantErr: false,
},
{
name: "valid annotation vllm-distributed-executor-backend=ray",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationVLLMDistributedExecutorBackend: "ray",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
},
},
wantErr: false,
},
{
name: "valid annotation vllm-distributed-executor-backend case insensitive MP",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationVLLMDistributedExecutorBackend: "MP",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
},
},
wantErr: false,
},
{
name: "invalid annotation vllm-distributed-executor-backend",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationVLLMDistributedExecutorBackend: "invalid",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
},
},
wantErr: true,
errMsg: `annotation nvidia.com/vllm-distributed-executor-backend has invalid value "invalid": must be "mp" or "ray"`,
},
{ {
name: "no annotations is valid", name: "no annotations is valid",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{ deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
...@@ -765,6 +838,46 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) { ...@@ -765,6 +838,46 @@ func TestDynamoGraphDeploymentValidator_Validate(t *testing.T) {
wantErr: true, wantErr: true,
errMsg: `annotation nvidia.com/dynamo-operator-origin-version has invalid value "not-a-version": must be valid semver`, errMsg: `annotation nvidia.com/dynamo-operator-origin-version has invalid value "not-a-version": must be valid semver`,
}, },
{
name: "both annotations valid",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
consts.KubeAnnotationVLLMDistributedExecutorBackend: "mp",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
},
},
wantErr: false,
},
{
name: "both annotations invalid",
deployment: &nvidiacomv1alpha1.DynamoGraphDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-graph",
Namespace: "default",
Annotations: map[string]string{
consts.KubeAnnotationDynamoOperatorOriginVersion: "bad",
consts.KubeAnnotationVLLMDistributedExecutorBackend: "invalid",
},
},
Spec: nvidiacomv1alpha1.DynamoGraphDeploymentSpec{
Services: map[string]*nvidiacomv1alpha1.DynamoComponentDeploymentSharedSpec{
"main": {},
},
},
},
wantErr: true,
errMsg: "annotation nvidia.com/dynamo-operator-origin-version has invalid value \"bad\": must be valid semver\nannotation nvidia.com/vllm-distributed-executor-backend has invalid value \"invalid\": must be \"mp\" or \"ray\"",
errContains: true,
},
} }
for _, tt := range tests { for _, tt := range tests {
......
...@@ -43,6 +43,12 @@ spec: ...@@ -43,6 +43,12 @@ spec:
- Qwen/Qwen3-0.6B - Qwen/Qwen3-0.6B
- --tensor-parallel-size - --tensor-parallel-size
- "2" - "2"
- --is-decode-worker
- --connector
- none
- --kv-transfer-config
- '{"kv_connector": "NixlConnector", "kv_role": "kv_both", "engine_id":
"vllm-disagg-decode-engine-0abc123"}'
prefill: prefill:
envFromSecret: hf-token-secret envFromSecret: hf-token-secret
componentType: worker componentType: worker
...@@ -63,6 +69,12 @@ spec: ...@@ -63,6 +69,12 @@ spec:
args: args:
- --model - --model
- Qwen/Qwen3-0.6B - Qwen/Qwen3-0.6B
- --is-prefill-worker
- --tensor-parallel-size - --tensor-parallel-size
- "2" - "2"
- --is-prefill-worker
- --connector
- none
- --kv-transfer-config
- '{"kv_connector": "NixlConnector", "kv_role": "kv_both", "engine_id":
"vllm-disagg-prefill-engine-0abc123"}'
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment