Unverified Commit 8388e162 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat: vllm data parallelism multi-node support in operator (#3595)

parent f5854e17
......@@ -677,17 +677,9 @@ spec:
componentType:
description: ComponentType indicates the role of this component (for example, "main").
type: string
dynamoComponent:
description: |-
DynamoComponent selects the Dynamo component from the archive to deploy.
Typically corresponds to a component defined in the packaged Dynamo artifacts.
type: string
dynamoNamespace:
description: Dynamo namespace of the service (allows to override the Dynamo namespace of the service defined in annotations inside the Dynamo archive)
type: string
dynamoTag:
description: 'contains the tag of the DynamoComponent: for example, "my_package:MyService"'
type: string
envFromSecret:
description: |-
EnvFromSecret references a Secret whose key/value pairs will be exposed as
......
......@@ -64,14 +64,9 @@ spec:
- vllm
- trtllm
type: string
dynamoGraph:
description: |-
DynamoGraph selects the graph (workflow/topology) to deploy. This must match
a graph name packaged with the Dynamo archive.
type: string
envs:
description: |-
Envs are environment variables applied to all services in the graph unless
Envs are environment variables applied to all services in the deployment unless
overridden by service-specific configuration.
items:
description: EnvVar represents an environment variable present in a Container.
......@@ -10388,11 +10383,7 @@ spec:
type: object
type: array
type: object
description: |-
Services allows per-service overrides of the component deployment settings.
- key: name of the service defined by the DynamoComponent
- value: overrides for that service
If not set for a service, the default DynamoComponentDeployment values are used.
description: Services are the services to deploy as part of this deployment.
type: object
type: object
status:
......
......@@ -677,17 +677,9 @@ spec:
componentType:
description: ComponentType indicates the role of this component (for example, "main").
type: string
dynamoComponent:
description: |-
DynamoComponent selects the Dynamo component from the archive to deploy.
Typically corresponds to a component defined in the packaged Dynamo artifacts.
type: string
dynamoNamespace:
description: Dynamo namespace of the service (allows to override the Dynamo namespace of the service defined in annotations inside the Dynamo archive)
type: string
dynamoTag:
description: 'contains the tag of the DynamoComponent: for example, "my_package:MyService"'
type: string
envFromSecret:
description: |-
EnvFromSecret references a Secret whose key/value pairs will be exposed as
......
......@@ -64,14 +64,9 @@ spec:
- vllm
- trtllm
type: string
dynamoGraph:
description: |-
DynamoGraph selects the graph (workflow/topology) to deploy. This must match
a graph name packaged with the Dynamo archive.
type: string
envs:
description: |-
Envs are environment variables applied to all services in the graph unless
Envs are environment variables applied to all services in the deployment unless
overridden by service-specific configuration.
items:
description: EnvVar represents an environment variable present in a Container.
......@@ -10388,11 +10383,7 @@ spec:
type: object
type: array
type: object
description: |-
Services allows per-service overrides of the component deployment settings.
- key: name of the service defined by the DynamoComponent
- value: overrides for that service
If not set for a service, the default DynamoComponentDeployment values are used.
description: Services are the services to deploy as part of this deployment.
type: object
type: object
status:
......
......@@ -94,7 +94,16 @@ For larger models (typically >70B parameters) or slower storage systems, you may
For multinode deployments, the operator modifies probes based on the backend framework and node role:
#### VLLM Backend
The operator automatically selects between two deployment modes based on parallelism configuration:
**Ray-Based Mode** (when `world_size > GPUs_per_node`):
- **Worker nodes**: All probes (liveness, readiness, startup) are removed
- **Leader nodes**: All probes remain active
**Data Parallel Mode** (when `world_size × data_parallel_size > GPUs_per_node`):
- **Worker nodes**: All probes (liveness, readiness, startup) are removed
- **Leader nodes**: All probes remain active
#### SGLang Backend
- **Worker nodes**: All probes (liveness, readiness, startup) are removed
......@@ -192,7 +201,8 @@ Default container ports are configured based on component type:
## Backend-Specific Configurations
### VLLM
- **Ray Head Port**: 6379 (for multinode deployments)
- **Ray Head Port**: 6379 (for Ray-based multinode deployments)
- **Data Parallel RPC Port**: 13445 (for data parallel multinode deployments)
### SGLang
- **Distribution Init Port**: 29500 (for multinode deployments)
......
......@@ -3,7 +3,6 @@ package dynamo
import (
"fmt"
"regexp"
"strings"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
......@@ -60,56 +59,7 @@ func (b *SGLangBackend) UpdateContainer(container *corev1.Container, numberOfNod
return
}
/*
* Flag Injection Strategy for Multinode SGLang Deployments
*
* This code handles the injection of distributed training flags (--dist-init-addr, --nnodes, --node-rank)
* into container commands for multinode SGLang deployments. The complexity arises from supporting multiple
* container command patterns and ensuring proper environment variable interpretation.
*
* Two main scenarios are handled:
*
* 1. Direct Python Command (e.g., Command: ["python3"], Args: ["-m", "sglang", "..."])
* - If shell interpretation is needed (for env vars): Wrap in "sh -c" with exec
* - If no shell needed: Simply append flags to the Args array
*
* 2. Non-Python Command (e.g., Command: ["sh"], Args: ["-c", "python3 -m sglang ..."])
* - Use regex-based injection to find embedded Python+SGLang commands within args
* - Insert flags after the Python command but before any shell operators (|, &, ;)
*
* The needsShell flag indicates when environment variables require shell interpretation
*/
if len(container.Command) > 0 && isPythonCommand(container.Command[0]) {
// Direct python command case
if needsShell {
// Transform to shell wrapper for env var interpretation
fullCommand := strings.Join(container.Command, " ")
originalArgs := strings.Join(container.Args, " ")
var shellCommand string
if len(container.Args) > 0 {
// Use exec to ensure PID 1 is given to the python command
shellCommand = fmt.Sprintf("exec %s %s %s", fullCommand, originalArgs, flags)
} else {
// Use exec to ensure PID 1 is given to the python command
shellCommand = fmt.Sprintf("exec %s %s", fullCommand, flags)
}
container.Command = []string{"sh", "-c"}
container.Args = []string{shellCommand}
} else {
// Simple append to args
flagsSlice := strings.Fields(flags)
container.Args = append(container.Args, flagsSlice...)
}
} else {
// Non-python command case - try injection on each arg individually
for i, arg := range container.Args {
modifiedArg := b.injectFlagsIntoPythonCommand(arg, flags)
if modifiedArg != arg { // flags were successfully injected
container.Args[i] = modifiedArg
break // stop after first successful injection
}
}
}
injectFlagsIntoContainerCommand(container, flags, needsShell, "sglang")
}
func (b *SGLangBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32, role Role, component *v1alpha1.DynamoComponentDeploymentSharedSpec, serviceName string) {
......@@ -133,26 +83,3 @@ func (b *SGLangBackend) getMultinodeFlags(numberOfNodes int32, role Role, servic
flags := fmt.Sprintf("--dist-init-addr %s --nnodes %d --node-rank %s", distInitAddr, numberOfNodes, nodeRank)
return flags, needsShell
}
// injectFlagsIntoPythonCommand finds python sglang commands and adds flags after them
func (b *SGLangBackend) injectFlagsIntoPythonCommand(arg, flags string) string {
// Regex to match python commands that contain sglang
// Matches: python, python3, python3.11, etc. followed by sglang-related modules
pattern := `(python[0-9.]*\s+[^|&;]*sglang[^|&;]*?)(\s|$|[|&;])`
re := regexp.MustCompile(pattern)
// Replace with the command + flags + whatever comes after
result := re.ReplaceAllStringFunc(arg, func(match string) string {
// Extract the python command part and the delimiter
submatches := re.FindStringSubmatch(match)
if len(submatches) >= 3 {
pythonCmd := submatches[1]
delimiter := submatches[2]
return pythonCmd + " " + flags + delimiter
}
return match
})
return result
}
......@@ -2,15 +2,21 @@ package dynamo
import (
"fmt"
"strconv"
"strings"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const (
VLLMPort = "6379"
VLLMPort = "6379"
dataParallelRPCPort = "13445"
tensorParallelSizeFlag = "--tensor-parallel-size"
pipelineParallelSizeFlag = "--pipeline-parallel-size"
dataParallelSizeFlag = "--data-parallel-size"
)
type VLLMBackend struct{}
......@@ -64,17 +70,108 @@ func (b *VLLMBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32
// do nothing
}
// updateVLLMMultinodeArgs applies Ray-specific modifications for multinode deployments
// updateVLLMMultinodeArgs will inject Ray-specific flags for tensor parallel multinode deployments
// OR data parallel flags for data parallel multinode deployments
func updateVLLMMultinodeArgs(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer) {
expandedArgs := getExpandedArgs(container)
if needsRayDistributedLaunch(expandedArgs, container.Resources) {
injectRayDistributedLaunchFlags(container, role, serviceName, multinodeDeployer)
} else if needsDataParallelLaunch(expandedArgs, container.Resources) {
injectDataParallelLaunchFlags(container, role, serviceName, multinodeDeployer)
} else {
logger := log.Log.WithName("vllm-backend")
logger.Info("No need to inject Ray-specific or data parallel flags for multinode deployments", "args", strings.Join(container.Args, " "))
}
}
// getExpandedArgs will expand the containers args in the case where
// the args are joined together with spaces as an individual string (i.e. "python3 -m dynamo.vllm")
func getExpandedArgs(container *corev1.Container) []string {
expandedArgs := []string{}
for _, arg := range container.Args {
expandedArgs = append(expandedArgs, strings.Fields(arg)...)
}
return expandedArgs
}
func injectRayDistributedLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer) {
switch role {
case RoleLeader:
if len(container.Args) > 0 {
// Prepend ray start --head command to existing args
container.Args = []string{fmt.Sprintf("ray start --head --port=%s && %s", VLLMPort, strings.Join(container.Args, " "))}
}
fullCommand := strings.Join(container.Command, " ")
originalArgs := strings.Join(container.Args, " ")
// Prepend ray start --head command to existing args
container.Args = []string{fmt.Sprintf("ray start --head --port=%s && %s %s", VLLMPort, fullCommand, originalArgs)}
case RoleWorker:
// Worker nodes only run Ray, completely replace args
leaderHostname := multinodeDeployer.GetLeaderHostname(serviceName)
container.Args = []string{fmt.Sprintf("ray start --address=%s:%s --block", leaderHostname, VLLMPort)}
}
container.Command = []string{"sh", "-c"} // ensure cmd is a shell
}
func injectDataParallelLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer) {
expandedArgs := getExpandedArgs(container)
leaderHostname := multinodeDeployer.GetLeaderHostname(serviceName)
dataParallelSizeLocal := getContainerGPUs(container.Resources) / getWorldSize(expandedArgs)
var startRank string
switch role {
case RoleWorker:
nodeRank, _ := multinodeDeployer.GetNodeRank()
startRank = fmt.Sprintf("$(( %d * %s ))", dataParallelSizeLocal, nodeRank)
case RoleLeader:
startRank = "0" // leader start rank is always 0
default:
startRank = "0"
}
flags := []string{
"--data-parallel-address", leaderHostname,
"--data-parallel-size-local", strconv.FormatInt(dataParallelSizeLocal, 10),
"--data-parallel-rpc-port", dataParallelRPCPort,
"--data-parallel-start-rank", startRank,
}
injectFlagsIntoContainerCommand(container, strings.Join(flags, " "), true, "vllm")
}
// if world size (within DP rank) > GPU count, then we need to inject ray
// world size = tensor parallel size * pipeline parallel size
func needsRayDistributedLaunch(expandedArgs []string, resources corev1.ResourceRequirements) bool {
return getWorldSize(expandedArgs) > getContainerGPUs(resources)
}
func getWorldSize(expandedArgs []string) int64 {
tensorParallelSize := getFlagValue(expandedArgs, tensorParallelSizeFlag)
pipelineParallelSize := getFlagValue(expandedArgs, pipelineParallelSizeFlag)
return tensorParallelSize * pipelineParallelSize
}
// if world size across all DP ranks > GPU count, then we need to inject data parallel multinode coordination
func needsDataParallelLaunch(expandedArgs []string, resources corev1.ResourceRequirements) bool {
dataParallelSize := getFlagValue(expandedArgs, dataParallelSizeFlag)
return getWorldSize(expandedArgs)*dataParallelSize > getContainerGPUs(resources)
}
func getFlagValue(expandedArgs []string, flag string) int64 {
var flagValue int64 = 1
for i, arg := range expandedArgs {
if arg == flag && (i+1 < len(expandedArgs)) {
flagValue, err := strconv.ParseInt(expandedArgs[i+1], 10, 64)
if err != nil {
continue
}
return flagValue
}
}
return flagValue
}
func getContainerGPUs(resources corev1.ResourceRequirements) int64 {
var containerGPUs int64 = 1
// Requests defaults to Limits, doesn't make sense in case where Requests < Limits for gpus
for name, quantity := range resources.Limits {
if name.String() == consts.KubeResourceGPUNvidia {
containerGPUs = quantity.Value()
break
}
}
return containerGPUs
}
package dynamo
import (
"strings"
"fmt"
"reflect"
"testing"
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
func TestVLLMBackend_UpdateContainer(t *testing.T) {
backend := &VLLMBackend{}
tests := []struct {
name string
numberOfNodes int32
role Role
component *v1alpha1.DynamoComponentDeploymentSharedSpec
multinodeDeployer MultinodeDeployer
initialArgs []string
initialLivenessProbe *corev1.Probe
initialReadinessProbe *corev1.Probe
initialStartupProbe *corev1.Probe
expectedArgs []string
expectContains []string
expectNotModified bool // If true, container args should not change
expectProbesRemoved bool // If true, probes should be nil
name string
numberOfNodes int32
role Role
component *v1alpha1.DynamoComponentDeploymentSharedSpec
multinodeDeployer MultinodeDeployer
initialContainer *corev1.Container
expectedArgs []string
expectNotModified bool // If true, container args should not change
expectProbesRemoved bool // If true, probes should be nil
}{
{
name: "single node does not modify args",
......@@ -33,7 +31,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
role: RoleMain,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.vllm"},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm"}},
expectNotModified: true,
},
{
......@@ -42,8 +40,8 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
role: RoleLeader,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.vllm", "--model", "test"},
expectContains: []string{"ray start --head --port=6379 &&", "python3", "-m", "dynamo.vllm", "--model", "test"},
initialContainer: &corev1.Container{Command: []string{"python3", "-m", "dynamo.vllm"}, Args: []string{"--model", "test", tensorParallelSizeFlag, "8"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}}},
expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm --model test %s 8", VLLMPort, tensorParallelSizeFlag)},
expectProbesRemoved: true,
},
{
......@@ -52,7 +50,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.vllm", "--model", "test"},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", "--model", "test", tensorParallelSizeFlag, "8"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}}},
expectedArgs: []string{"ray start --address=$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE):6379 --block"},
expectProbesRemoved: true,
},
......@@ -62,7 +60,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
multinodeDeployer: &LWSMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.vllm"},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "8"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}}},
expectedArgs: []string{"ray start --address=$(LWS_LEADER_ADDRESS):6379 --block"},
expectProbesRemoved: true,
},
......@@ -72,7 +70,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
role: RoleLeader,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{},
initialContainer: &corev1.Container{Args: []string{}},
expectNotModified: true, // Should not modify empty args
},
{
......@@ -81,7 +79,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
role: RoleMain,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.frontend"},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.frontend"}},
expectNotModified: true,
},
}
......@@ -90,37 +88,99 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
// Create a container with initial state
container := &corev1.Container{
Args: append([]string{}, tt.initialArgs...), // Copy slice to avoid modifying original
LivenessProbe: tt.initialLivenessProbe,
ReadinessProbe: tt.initialReadinessProbe,
StartupProbe: tt.initialStartupProbe,
}
initialContainerArgs := append([]string{}, tt.initialContainer.Args...)
// Call UpdateContainer
backend.UpdateContainer(container, tt.numberOfNodes, tt.role, tt.component, "test-service", tt.multinodeDeployer)
backend.UpdateContainer(tt.initialContainer, tt.numberOfNodes, tt.role, tt.component, "test-service", tt.multinodeDeployer)
if tt.expectNotModified {
// Args should not have changed
g.Expect(container.Args).To(gomega.Equal(tt.initialArgs))
g.Expect(tt.initialContainer.Args).To(gomega.Equal(initialContainerArgs))
} else if tt.expectedArgs != nil {
// Check exact match
g.Expect(container.Args).To(gomega.Equal(tt.expectedArgs))
} else if tt.expectContains != nil {
// Check that expected strings are contained in the result
argsStr := strings.Join(container.Args, " ")
for _, expected := range tt.expectContains {
if !strings.Contains(argsStr, expected) {
t.Errorf("UpdateContainer() args = %v, should contain %s", container.Args, expected)
}
}
g.Expect(tt.initialContainer.Args).To(gomega.Equal(tt.expectedArgs))
}
if tt.expectProbesRemoved {
g.Expect(container.LivenessProbe).To(gomega.BeNil())
g.Expect(container.ReadinessProbe).To(gomega.BeNil())
g.Expect(container.StartupProbe).To(gomega.BeNil())
g.Expect(tt.initialContainer.LivenessProbe).To(gomega.BeNil())
g.Expect(tt.initialContainer.ReadinessProbe).To(gomega.BeNil())
g.Expect(tt.initialContainer.StartupProbe).To(gomega.BeNil())
}
})
}
}
func TestVLLMBackend_ShellCommandInjection(t *testing.T) {
backend := &VLLMBackend{}
tests := []struct {
name string
numberOfNodes int32
role Role
multinodeDeployer MultinodeDeployer
initialContainer *corev1.Container
expectedArgs []string
description string
}{
{
name: "single node shell command not modified",
numberOfNodes: 1,
role: RoleMain,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{"python3 -m dynamo.vllm"}},
expectedArgs: []string{"python3 -m dynamo.vllm"},
description: "Single node should not modify shell commands",
},
{
name: "multinode shell command with regex injection",
numberOfNodes: 2,
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("python3 -m dynamo.vllm %s 8", dataParallelSizeFlag)}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}}},
expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0 --data-parallel-size 8"},
description: "Shell commands should use regex injection for python commands",
},
{
name: "multinode shell command with complex pipeline",
numberOfNodes: 2,
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("echo blah | wc -l && python3 -m dynamo.vllm %s 8 && ls -al", dataParallelSizeFlag)}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}}},
expectedArgs: []string{"echo blah | wc -l && python3 -m dynamo.vllm --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0 --data-parallel-size 8 && ls -al"},
description: "Complex shell commands should inject flags only into python part",
},
{
name: "shell command with LWS deployer",
numberOfNodes: 2,
role: RoleLeader,
multinodeDeployer: &LWSMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("python3 -m dynamo.vllm %s 8", dataParallelSizeFlag)}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}}},
expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-address $(LWS_LEADER_ADDRESS) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0 --data-parallel-size 8"},
description: "LWS shell commands should use LWS variables",
},
{
name: "shell command with pipes",
numberOfNodes: 2,
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("python3 -m dynamo.vllm %s 8 | tee /tmp/log", dataParallelSizeFlag)}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}}},
expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0 --data-parallel-size 8 | tee /tmp/log"},
description: "Shell commands with pipes should inject flags before pipe",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expectedCommand := append([]string{}, tt.initialContainer.Command...)
backend.UpdateContainer(tt.initialContainer, tt.numberOfNodes, tt.role, &v1alpha1.DynamoComponentDeploymentSharedSpec{}, "test-service", tt.multinodeDeployer)
if !reflect.DeepEqual(tt.initialContainer.Args, tt.expectedArgs) {
t.Errorf("UpdateContainer() args = %v, want %v", tt.initialContainer.Args, tt.expectedArgs)
}
if !reflect.DeepEqual(tt.initialContainer.Command, expectedCommand) {
t.Errorf("UpdateContainer() should preserve shell command, got: %v, want: %v", tt.initialContainer.Command, expectedCommand)
}
})
}
......@@ -235,44 +295,64 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
name string
role Role
multinodeDeployer MultinodeDeployer
initialArgs []string
initialContainer *corev1.Container
expectedArgs []string
expectContains []string
expectNotModified bool
}{
{
name: "leader prepends ray start --head",
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.vllm"},
expectContains: []string{"ray start --head --port=6379 &&", "python3", "-m", "dynamo.vllm"},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("8")}}},
expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm %s 16", VLLMPort, tensorParallelSizeFlag)},
},
{
name: "leader prepends distributed data parallel flags",
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("8")}}},
expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 16 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 8 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0", dataParallelSizeFlag)},
},
{
name: "leader with empty args does not modify",
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{},
initialContainer: &corev1.Container{Args: []string{}},
expectNotModified: true,
},
{
name: "worker with Grove deployment",
name: "worker with ray distributed launch Grove",
role: RoleWorker,
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.vllm"},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("8")}}},
expectedArgs: []string{"ray start --address=$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE):6379 --block"},
},
{
name: "worker with LWS deployment",
name: "worker with data parallel launch Grove",
role: RoleWorker,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("8")}}},
expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 16 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 8 --data-parallel-rpc-port 13445 --data-parallel-start-rank $(( 8 * $((GROVE_PCLQ_POD_INDEX + 1)) ))", dataParallelSizeFlag)},
},
{
name: "worker with data parallel launch Grove, tp > 1",
role: RoleWorker,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "8", tensorParallelSizeFlag, "2"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("8")}}},
expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 8 %s 2 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank $(( 4 * $((GROVE_PCLQ_POD_INDEX + 1)) ))", dataParallelSizeFlag, tensorParallelSizeFlag)},
},
{
name: "worker with ray distributed launch LWS",
role: RoleWorker,
multinodeDeployer: &LWSMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.vllm"},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}, Resources: corev1.ResourceRequirements{Limits: corev1.ResourceList{corev1.ResourceName("nvidia.com/gpu"): resource.MustParse("8")}}},
expectedArgs: []string{"ray start --address=$(LWS_LEADER_ADDRESS):6379 --block"},
},
{
name: "main role does not modify args",
role: RoleMain,
multinodeDeployer: &GroveMultinodeDeployer{},
initialArgs: []string{"python3", "-m", "dynamo.frontend"},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.frontend"}},
expectNotModified: true,
},
}
......@@ -281,28 +361,16 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
// Create a container with initial args
container := &corev1.Container{
Args: append([]string{}, tt.initialArgs...), // Copy slice to avoid modifying original
}
initialContainerArgs := append([]string{}, tt.initialContainer.Args...)
// Call updateVLLMMultinodeArgs
updateVLLMMultinodeArgs(container, tt.role, "test-service", tt.multinodeDeployer)
updateVLLMMultinodeArgs(tt.initialContainer, tt.role, "test-service", tt.multinodeDeployer)
if tt.expectNotModified {
// Args should not have changed
g.Expect(container.Args).To(gomega.Equal(tt.initialArgs))
g.Expect(tt.initialContainer.Args).To(gomega.Equal(initialContainerArgs))
} else if tt.expectedArgs != nil {
// Check exact match
g.Expect(container.Args).To(gomega.Equal(tt.expectedArgs))
} else if tt.expectContains != nil {
// Check that expected strings are contained in the result
argsStr := strings.Join(container.Args, " ")
for _, expected := range tt.expectContains {
if !strings.Contains(argsStr, expected) {
t.Errorf("updateVLLMMultinodeArgs() args = %v, should contain %s", container.Args, expected)
}
}
g.Expect(tt.initialContainer.Args).To(gomega.Equal(tt.expectedArgs))
}
})
}
......
package dynamo
import (
"fmt"
"regexp"
"strings"
corev1 "k8s.io/api/core/v1"
)
/*
* Flag Injection Strategy for Multinode
*
* This code handles the injection of distributed training flags (--dist-init-addr, --nnodes, --node-rank)
* into container commands for multinode SGLang deployments. The complexity arises from supporting multiple
* container command patterns and ensuring proper environment variable interpretation.
*
* Two main scenarios are handled:
*
* 1. Direct Python Command (e.g., Command: ["python3"], Args: ["-m", "sglang", "..."])
* - If shell interpretation is needed (for env vars): Wrap in "sh -c" with exec
* - If no shell needed: Simply append flags to the Args array
*
* 2. Non-Python Command (e.g., Command: ["sh"], Args: ["-c", "python3 -m sglang ..."])
* - Use regex-based injection to find embedded Python+SGLang commands within args
* - Insert flags after the Python command but before any shell operators (|, &, ;)
*
* The needsShell flag indicates when environment variables require shell interpretation
*/
func injectFlagsIntoContainerCommand(container *corev1.Container, flags string, needsShell bool, framework string) {
if len(container.Command) > 0 && isPythonCommand(container.Command[0]) {
// Direct python command case
if needsShell {
// Transform to shell wrapper for env var interpretation
fullCommand := strings.Join(container.Command, " ")
originalArgs := strings.Join(container.Args, " ")
var shellCommand string
if len(container.Args) > 0 {
// Use exec to ensure PID 1 is given to the python command
shellCommand = fmt.Sprintf("exec %s %s %s", fullCommand, originalArgs, flags)
} else {
// Use exec to ensure PID 1 is given to the python command
shellCommand = fmt.Sprintf("exec %s %s", fullCommand, flags)
}
container.Command = []string{"sh", "-c"}
container.Args = []string{shellCommand}
} else {
// Simple append to args
flagsSlice := strings.Fields(flags)
container.Args = append(container.Args, flagsSlice...)
}
} else {
// Non-python command case - try injection on each arg individually
for i, arg := range container.Args {
modifiedArg := injectFlagsIntoPythonCommand(arg, flags, framework)
if modifiedArg != arg { // flags were successfully injected
container.Args[i] = modifiedArg
break // stop after first successful injection
}
}
}
}
func injectFlagsIntoPythonCommand(arg, flags string, framework string) string {
// Regex to match python commands that contain sglang
// Matches: python, python3, python3.11, etc. followed by sglang-related modules
pattern := fmt.Sprintf(`(python[0-9.]*\s+[^|&;]*%s[^|&;]*?)(\s|$|[|&;])`, framework)
re := regexp.MustCompile(pattern)
// Replace with the command + flags + whatever comes after
result := re.ReplaceAllStringFunc(arg, func(match string) string {
// Extract the python command part and the delimiter
submatches := re.FindStringSubmatch(match)
if len(submatches) >= 3 {
pythonCmd := submatches[1]
delimiter := submatches[2]
return pythonCmd + " " + flags + delimiter
}
return match
})
return result
}
......@@ -182,16 +182,38 @@ When you deploy a multinode workload, the Dynamo operator automatically applies
### vLLM Backend
For vLLM multinode deployments, the operator automatically configures Ray for distributed inference:
For vLLM multinode deployments, the operator automatically selects and configures the appropriate distributed execution mode based on your parallelism settings:
#### Leader Node
#### Deployment Modes
The operator automatically determines the deployment mode based on your parallelism configuration:
**1. Ray-Based Mode (Tensor/Pipeline Parallelism across nodes)**
- **When used**: When `world_size > GPUs_per_node` where `world_size = tensor_parallel_size × pipeline_parallel_size`
- **Use case**: Distributing a single model instance across multiple nodes using tensor or pipeline parallelism
**Leader Node:**
- **Ray Head**: The operator prepends `ray start --head --port=6379` to your existing command
- **Probes**: All health probes remain active (liveness, readiness, startup)
#### Worker Nodes
**Worker Nodes:**
- **Ray Worker**: The command is replaced with `ray start --address=<leader-hostname>:6379 --block`
- **Probes**: All probes (liveness, readiness, startup) are automatically removed since workers don't expose health endpoints
**2. Data Parallel Mode (Multiple model instances across nodes)**
- **When used**: When `world_size × data_parallel_size > GPUs_per_node`
- **Use case**: Running multiple independent model instances across nodes with data parallelism (e.g., MoE models with expert parallelism)
**All Nodes (Leader and Workers):**
- **Injected Flags**:
- `--data-parallel-address <leader-hostname>` - Address of the coordination server
- `--data-parallel-size-local <value>` - Number of data parallel workers per node
- `--data-parallel-rpc-port 13445` - RPC port for data parallel coordination
- `--data-parallel-start-rank <value>` - Starting rank for this node (calculated automatically)
- **Probes**: Worker probes are removed; leader probes remain active
**Note**: The operator intelligently injects these flags into your command regardless of command structure (direct Python commands or shell wrappers)
#### Compilation Cache Support
When a volume mount is configured with `useAsCompilationCache: true`, the operator automatically sets:
- **`VLLM_CACHE_ROOT`**: Environment variable pointing to the cache mount point
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment