Commit 6979710a (unverified), authored by MatejKosec, committed by GitHub

fix(operator): vLLM multi-node support for TP and DP (#5006)

parent fc776f7d
@@ -143,9 +143,10 @@ For multinode deployments, the operator modifies probes based on the backend fra
 The operator automatically selects between two deployment modes based on parallelism configuration:
-**Ray-Based Mode** (when `world_size > GPUs_per_node`):
-- **Worker nodes**: All probes (liveness, readiness, startup) are removed
-- **Leader nodes**: All probes remain active
+**Tensor/Pipeline Parallel Mode** (when `world_size > GPUs_per_node`):
+- Uses Ray for distributed execution (`--distributed-executor-backend ray`)
+- **Leader nodes**: Starts Ray head and runs vLLM; all probes remain active
+- **Worker nodes**: Run Ray agents only; all probes (liveness, readiness, startup) are removed
 **Data Parallel Mode** (when `world_size × data_parallel_size > GPUs_per_node`):
 - **Worker nodes**: All probes (liveness, readiness, startup) are removed
@@ -247,7 +248,7 @@ Default container ports are configured based on component type:
 ## Backend-Specific Configurations
 ### VLLM
-- **Ray Head Port**: 6379 (for Ray-based multinode deployments)
+- **Ray Head Port**: 6379 (for Ray cluster coordination in multinode TP/PP deployments)
 - **Data Parallel RPC Port**: 13445 (for data parallel multinode deployments)
 ### SGLang
......
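The mode selection described in the documentation hunk above can be sketched in Go as follows. This is a minimal illustration under stated assumptions, not the operator's code: `Mode`, its constants, and `selectMode` are hypothetical names, and the order of the checks (Ray first, then data parallel) follows the `if / else if` chain in `updateVLLMMultinodeArgs` shown later in this commit.

```go
package main

import "fmt"

// Mode is a hypothetical label for the deployment mode; the operator
// itself does not expose a type with this name.
type Mode string

const (
	ModeNoInjection Mode = "no-injection"
	ModeRayTPPP     Mode = "tensor/pipeline-parallel (Ray)"
	ModeDataPar     Mode = "data-parallel"
)

// selectMode applies the documented conditions:
//   - world_size > GPUs_per_node                      -> Ray (TP/PP) mode
//   - world_size * data_parallel_size > GPUs_per_node -> data parallel mode
//
// where world_size = tensor_parallel_size * pipeline_parallel_size.
func selectMode(tp, pp, dp, gpusPerNode int64) Mode {
	worldSize := tp * pp
	switch {
	case worldSize > gpusPerNode:
		return ModeRayTPPP
	case worldSize*dp > gpusPerNode:
		return ModeDataPar
	default:
		return ModeNoInjection
	}
}

func main() {
	fmt.Println(selectMode(8, 1, 1, 4)) // one engine needs 8 GPUs, node has 4
	fmt.Println(selectMode(1, 1, 8, 4)) // 8 single-GPU engines, node has 4
	fmt.Println(selectMode(2, 1, 2, 8)) // everything fits on one node
}
```

The first check wins when both conditions hold, so a model that does not fit on one node is always handled by the Ray path in this sketch.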
@@ -825,7 +825,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
 Name: commonconsts.MainContainerName,
 Image: "test-image:latest",
 Command: []string{"/bin/sh", "-c"},
-Args: []string{"ray start --head --port=6379 && some dynamo command --tensor-parallel-size 4 --pipeline-parallel-size 1"},
+Args: []string{"ray start --head --port=6379 && some dynamo command --tensor-parallel-size 4 --pipeline-parallel-size 1 --distributed-executor-backend ray"},
 Env: []corev1.EnvVar{
     {Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker},
     {Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
......
@@ -25,7 +25,7 @@ func (b *VLLMBackend) UpdateContainer(container *corev1.Container, numberOfNodes
     if isMultinode {
         // Apply multinode-specific argument modifications
-        updateVLLMMultinodeArgs(container, role, serviceName, multinodeDeployer, component.Resources)
+        updateVLLMMultinodeArgs(container, role, serviceName, multinodeDeployer, component.Resources, numberOfNodes)
         // Remove probes for multinode worker and leader
         if role == RoleWorker {
@@ -71,12 +71,12 @@ func (b *VLLMBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32
 // updateVLLMMultinodeArgs will inject Ray-specific flags for tensor parallel multinode deployments
 // OR data parallel flags for data parallel multinode deployments
-func updateVLLMMultinodeArgs(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, resources *v1alpha1.Resources) {
+func updateVLLMMultinodeArgs(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, resources *v1alpha1.Resources, numberOfNodes int32) {
     expandedArgs := getExpandedArgs(container)
     if needsRayDistributedLaunch(expandedArgs, resources) {
         injectRayDistributedLaunchFlags(container, role, serviceName, multinodeDeployer)
     } else if needsDataParallelLaunch(expandedArgs, resources) {
-        injectDataParallelLaunchFlags(container, role, serviceName, multinodeDeployer, resources)
+        injectDataParallelLaunchFlags(container, role, serviceName, multinodeDeployer, resources, numberOfNodes)
     } else {
         logger := log.Log.WithName("vllm-backend")
         logger.Info("No need to inject Ray-specific or data parallel flags for multinode deployments", "args", strings.Join(container.Args, " "))
@@ -98,37 +98,85 @@ func injectRayDistributedLaunchFlags(container *corev1.Container, role Role, ser
     case RoleLeader:
         fullCommand := strings.Join(container.Command, " ")
         originalArgs := strings.Join(container.Args, " ")
-        // Prepend ray start --head command to existing args
-        container.Args = []string{fmt.Sprintf("ray start --head --port=%s && %s %s", VLLMPort, fullCommand, originalArgs)}
+        // Use Ray executor for multi-node vLLM deployments.
+        // vLLM will create a placement group spanning all Ray nodes and spawn workers automatically.
+        // DO NOT pass --nnodes or --node-rank - these are only for mp backend.
+        // The Ray executor handles multi-node distribution via placement groups.
+        vllmMultinodeFlags := "--distributed-executor-backend ray"
+        container.Args = []string{fmt.Sprintf("ray start --head --port=%s && %s %s %s", VLLMPort, fullCommand, originalArgs, vllmMultinodeFlags)}
     case RoleWorker:
-        // Worker nodes only run Ray, completely replace args
+        // Worker nodes only run Ray agent - vLLM on leader will spawn Ray actors on workers
         leaderHostname := multinodeDeployer.GetLeaderHostname(serviceName)
         container.Args = []string{fmt.Sprintf("ray start --address=%s:%s --block", leaderHostname, VLLMPort)}
     }
     container.Command = []string{"/bin/sh", "-c"} // ensure cmd is a shell
 }
-func injectDataParallelLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, resources *v1alpha1.Resources) {
+func injectDataParallelLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, resources *v1alpha1.Resources, numberOfNodes int32) {
     expandedArgs := getExpandedArgs(container)
     leaderHostname := multinodeDeployer.GetLeaderHostname(serviceName)
-    dataParallelSizeLocal := getContainerGPUs(resources) / getWorldSize(expandedArgs)
-    var startRank string
+    // Calculate engines per node
+    containerGPUs := getContainerGPUs(resources)
+    worldSize := getWorldSize(expandedArgs) // TP * PP per engine
+    dataParallelSizeLocal := containerGPUs / worldSize
+    // Get total DP size from args, or calculate from nodes
+    totalDPSize := getFlagValue(expandedArgs, dataParallelSizeFlag)
+    if totalDPSize == 1 {
+        totalDPSize = dataParallelSizeLocal * int64(numberOfNodes)
+    }
+    var flags []string
+    needsShell := false
+    // Helper to check if flag already exists in args
+    hasFlag := func(flag string) bool {
+        for _, arg := range expandedArgs {
+            if arg == flag {
+                return true
+            }
+        }
+        return false
+    }
     switch role {
+    case RoleLeader:
+        // Leader runs API server + coordinator + local engines
+        // Hybrid LB mode: local DP coordination within node, Dynamo routes between nodes
+        flags = []string{"--data-parallel-hybrid-lb"}
+        // Only inject --data-parallel-size if not already present (avoids duplicates from profiler)
+        if !hasFlag("--data-parallel-size") {
+            flags = append(flags, "--data-parallel-size", strconv.FormatInt(totalDPSize, 10))
+        }
+        flags = append(flags,
+            "--data-parallel-size-local", strconv.FormatInt(dataParallelSizeLocal, 10),
+            "--data-parallel-start-rank", "0",
+            "--data-parallel-address", leaderHostname,
+            "--data-parallel-rpc-port", dataParallelRPCPort,
+        )
     case RoleWorker:
+        // Worker runs API server + coordinator + local engines on its node
+        // Hybrid LB mode: local DP coordination within node, Dynamo routes between nodes
         nodeRank, _ := multinodeDeployer.GetNodeRank()
-        startRank = fmt.Sprintf("$(( %d * %s ))", dataParallelSizeLocal, nodeRank)
-    case RoleLeader:
-        startRank = "0" // leader start rank is always 0
-    default:
-        startRank = "0"
-    }
-    flags := []string{
-        "--data-parallel-address", leaderHostname,
-        "--data-parallel-size-local", strconv.FormatInt(dataParallelSizeLocal, 10),
-        "--data-parallel-rpc-port", dataParallelRPCPort,
-        "--data-parallel-start-rank", startRank,
+        startRank := fmt.Sprintf("$(( %d * %s ))", dataParallelSizeLocal, nodeRank)
+        needsShell = true // Need shell for arithmetic expansion
+        flags = []string{"--data-parallel-hybrid-lb"}
+        // Only inject --data-parallel-size if not already present (avoids duplicates from profiler)
+        if !hasFlag("--data-parallel-size") {
+            flags = append(flags, "--data-parallel-size", strconv.FormatInt(totalDPSize, 10))
+        }
+        flags = append(flags,
+            "--data-parallel-size-local", strconv.FormatInt(dataParallelSizeLocal, 10),
+            "--data-parallel-start-rank", startRank,
+            "--data-parallel-address", leaderHostname,
+            "--data-parallel-rpc-port", dataParallelRPCPort,
+        )
     }
-    injectFlagsIntoContainerCommand(container, strings.Join(flags, " "), true, "vllm")
+    injectFlagsIntoContainerCommand(container, strings.Join(flags, " "), needsShell, "vllm")
 }
 // if world size (within DP rank) > GPU count, then we need to inject ray
......
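The data parallel arithmetic in `injectDataParallelLaunchFlags` above can be worked through with a small Go sketch. The names `planDP`, `dpLayout`, and `startRank` are hypothetical and exist only for this illustration. The sketch also assumes an integer node rank, whereas the operator emits a shell arithmetic expression (for example `$(( 4 * $((GROVE_PCLQ_POD_INDEX + 1)) ))`) that is evaluated inside the container. As in the diff, a parsed `--data-parallel-size` of 1 is treated as "derive the total from the node count".

```go
package main

import "fmt"

// dpLayout holds the two sizes the operator passes to vLLM:
// engines per node and engines in total. Hypothetical type.
type dpLayout struct {
	SizeLocal int64 // value for --data-parallel-size-local
	Total     int64 // value for --data-parallel-size
}

// planDP reproduces the arithmetic from the diff:
//   sizeLocal = containerGPUs / worldSize   (worldSize = TP * PP per engine)
//   total     = value from args, or sizeLocal * numberOfNodes when the
//               parsed value is 1.
func planDP(containerGPUs, worldSize, dpSizeFromArgs, numberOfNodes int64) dpLayout {
	sizeLocal := containerGPUs / worldSize
	total := dpSizeFromArgs
	if total == 1 {
		total = sizeLocal * numberOfNodes
	}
	return dpLayout{SizeLocal: sizeLocal, Total: total}
}

// startRank returns the first DP rank hosted on a node. The leader has
// node rank 0, so its start rank is always 0.
func startRank(sizeLocal, nodeRank int64) int64 {
	return sizeLocal * nodeRank
}

func main() {
	// 8 GPUs per node, TP=2, --data-parallel-size 8, 2 nodes.
	layout := planDP(8, 2, 8, 2)
	fmt.Println(layout.SizeLocal, layout.Total)
	fmt.Println(startRank(layout.SizeLocal, 0)) // leader
	fmt.Println(startRank(layout.SizeLocal, 1)) // first worker
}
```

With 8 GPUs per node and TP=2, each node hosts 4 engines, which matches the `--data-parallel-size-local 4` expectation in the "tp > 1" test case of this commit.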
@@ -44,7 +44,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
     multinodeDeployer: &GroveMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"python3", "-m", "dynamo.vllm"}, Args: []string{"--model", "test", tensorParallelSizeFlag, "8"}},
     gpuCount: 4,
-    expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm --model test %s 8", VLLMPort, tensorParallelSizeFlag)},
+    expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm --model test %s 8 --distributed-executor-backend ray", VLLMPort, tensorParallelSizeFlag)},
     expectProbesRemoved: true,
 },
 {
@@ -156,7 +156,7 @@ func TestVLLMBackend_ShellCommandInjection(t *testing.T) {
     multinodeDeployer: &GroveMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("python3 -m dynamo.vllm %s 8", dataParallelSizeFlag)}},
     gpuCount: 4,
-    expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0 --data-parallel-size 8"},
+    expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank 0 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445 --data-parallel-size 8"},
     description: "Shell commands should use regex injection for python commands",
 },
 {
@@ -166,7 +166,7 @@ func TestVLLMBackend_ShellCommandInjection(t *testing.T) {
     multinodeDeployer: &GroveMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("echo blah | wc -l && python3 -m dynamo.vllm %s 8 && ls -al", dataParallelSizeFlag)}},
     gpuCount: 4,
-    expectedArgs: []string{"echo blah | wc -l && python3 -m dynamo.vllm --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0 --data-parallel-size 8 && ls -al"},
+    expectedArgs: []string{"echo blah | wc -l && python3 -m dynamo.vllm --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank 0 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445 --data-parallel-size 8 && ls -al"},
     description: "Complex shell commands should inject flags only into python part",
 },
 {
@@ -176,7 +176,7 @@ func TestVLLMBackend_ShellCommandInjection(t *testing.T) {
     multinodeDeployer: &LWSMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("python3 -m dynamo.vllm %s 8", dataParallelSizeFlag)}},
     gpuCount: 4,
-    expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-address $LWS_LEADER_ADDRESS --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0 --data-parallel-size 8"},
+    expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank 0 --data-parallel-address $LWS_LEADER_ADDRESS --data-parallel-rpc-port 13445 --data-parallel-size 8"},
     description: "LWS shell commands should use LWS variables",
 },
 {
@@ -186,7 +186,7 @@ func TestVLLMBackend_ShellCommandInjection(t *testing.T) {
     multinodeDeployer: &GroveMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("python3 -m dynamo.vllm %s 8 | tee /tmp/log", dataParallelSizeFlag)}},
     gpuCount: 4,
-    expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0 --data-parallel-size 8 | tee /tmp/log"},
+    expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank 0 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445 --data-parallel-size 8 | tee /tmp/log"},
     description: "Shell commands with pipes should inject flags before pipe",
 },
 }
@@ -338,7 +338,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
     multinodeDeployer: &GroveMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
     gpuCount: 8,
-    expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm %s 16", VLLMPort, tensorParallelSizeFlag)},
+    expectedArgs: []string{fmt.Sprintf("ray start --head --port=%s && python3 -m dynamo.vllm %s 16 --distributed-executor-backend ray", VLLMPort, tensorParallelSizeFlag)},
 },
 {
     name: "leader prepends distributed data parallel flags",
@@ -346,7 +346,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
     multinodeDeployer: &GroveMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}},
     gpuCount: 8,
-    expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 16 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 8 --data-parallel-rpc-port 13445 --data-parallel-start-rank 0", dataParallelSizeFlag)},
+    expectedArgs: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16", "--data-parallel-hybrid-lb", "--data-parallel-size-local", "8", "--data-parallel-start-rank", "0", "--data-parallel-address", "$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)", "--data-parallel-rpc-port", "13445"},
 },
 {
     name: "leader with empty args does not modify",
@@ -370,7 +370,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
     multinodeDeployer: &GroveMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}},
     gpuCount: 8,
-    expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 16 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 8 --data-parallel-rpc-port 13445 --data-parallel-start-rank $(( 8 * $((GROVE_PCLQ_POD_INDEX + 1)) ))", dataParallelSizeFlag)},
+    expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 16 --data-parallel-hybrid-lb --data-parallel-size-local 8 --data-parallel-start-rank $(( 8 * $((GROVE_PCLQ_POD_INDEX + 1)) )) --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445", dataParallelSizeFlag)},
 },
 {
     name: "worker with data parallel launch Grove, tp > 1",
@@ -378,7 +378,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
     multinodeDeployer: &GroveMultinodeDeployer{},
     initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "8", tensorParallelSizeFlag, "2"}},
     gpuCount: 8,
-    expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 8 %s 2 --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-size-local 4 --data-parallel-rpc-port 13445 --data-parallel-start-rank $(( 4 * $((GROVE_PCLQ_POD_INDEX + 1)) ))", dataParallelSizeFlag, tensorParallelSizeFlag)},
+    expectedArgs: []string{fmt.Sprintf("exec python3 -m dynamo.vllm %s 8 %s 2 --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank $(( 4 * $((GROVE_PCLQ_POD_INDEX + 1)) )) --data-parallel-address $(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE) --data-parallel-rpc-port 13445", dataParallelSizeFlag, tensorParallelSizeFlag)},
 },
 {
     name: "worker with ray distributed launch LWS",
@@ -415,7 +415,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
 }
 // Call updateVLLMMultinodeArgs
-updateVLLMMultinodeArgs(tt.initialContainer, tt.role, "test-service", tt.multinodeDeployer, resources)
+updateVLLMMultinodeArgs(tt.initialContainer, tt.role, "test-service", tt.multinodeDeployer, resources, 2)
 if tt.expectNotModified {
     // Args should not have changed
......
@@ -2898,7 +2898,7 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
     "-c",
 },
 Args: []string{
-    "ray start --head --port=6379 && python3 -m dynamo.vllm --custom-flag custom-value --tensor-parallel-size 4 --pipeline-parallel-size 1",
+    "ray start --head --port=6379 && python3 -m dynamo.vllm --custom-flag custom-value --tensor-parallel-size 4 --pipeline-parallel-size 1 --distributed-executor-backend ray",
 },
 Ports: []corev1.ContainerPort{
     {
......
@@ -26,7 +26,7 @@ type GroveMultinodeDeployer struct {
 }
 func (d *GroveMultinodeDeployer) GetLeaderHostname(serviceName string) string {
-    return fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-0.$(GROVE_HEADLESS_SERVICE)", serviceName, commonconsts.GroveRoleSuffixLeader)
+    return fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-0.$(GROVE_HEADLESS_SERVICE)", strings.ToLower(serviceName), commonconsts.GroveRoleSuffixLeader)
 }
 func (d *GroveMultinodeDeployer) GetNodeRank() (string, bool) {
@@ -46,7 +46,7 @@ func (d *GroveMultinodeDeployer) GetHostNames(serviceName string, numberOfNodes
     // Add worker hostnames
     for i := int32(0); i < numberOfNodes-1; i++ {
         workerHostname := fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-%d.$(GROVE_HEADLESS_SERVICE)",
-            serviceName, commonconsts.GroveRoleSuffixWorker, i)
+            strings.ToLower(serviceName), commonconsts.GroveRoleSuffixWorker, i)
         hostnames = append(hostnames, workerHostname)
     }
     return hostnames
......
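The effect of the `strings.ToLower(serviceName)` change in the Grove deployer can be seen in a small standalone sketch. The function `leaderHostname` and the constant `leaderSuffix` are hypothetical stand-ins for `GetLeaderHostname` and `commonconsts.GroveRoleSuffixLeader`; the value `ldr` is inferred from the `-ldr-0` hostnames in this commit's test expectations. The commit does not state its motivation, so the reading that a mixed-case service name would otherwise produce a hostname that differs from the lowercase pod name is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// leaderSuffix is an assumed value, taken from the "-ldr-0" seen in the
// expected args of the tests in this commit.
const leaderSuffix = "ldr"

// leaderHostname builds the leader address template with the service name
// lowercased. The $(...) placeholders are left for Kubernetes to expand
// from the container's environment variables.
func leaderHostname(serviceName string) string {
	return fmt.Sprintf("$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-%s-%s-0.$(GROVE_HEADLESS_SERVICE)",
		strings.ToLower(serviceName), leaderSuffix)
}

func main() {
	// A mixed-case service name yields the same hostname as its lowercase form.
	fmt.Println(leaderHostname("Test-Service"))
	fmt.Println(leaderHostname("test-service"))
}
```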
@@ -964,9 +964,10 @@ For multinode deployments, the operator modifies probes based on the backend fra
 The operator automatically selects between two deployment modes based on parallelism configuration:
-**Ray-Based Mode** (when `world_size > GPUs_per_node`):
-- **Worker nodes**: All probes (liveness, readiness, startup) are removed
-- **Leader nodes**: All probes remain active
+**Tensor/Pipeline Parallel Mode** (when `world_size > GPUs_per_node`):
+- Uses Ray for distributed execution (`--distributed-executor-backend ray`)
+- **Leader nodes**: Starts Ray head and runs vLLM; all probes remain active
+- **Worker nodes**: Run Ray agents only; all probes (liveness, readiness, startup) are removed
 **Data Parallel Mode** (when `world_size × data_parallel_size > GPUs_per_node`):
 - **Worker nodes**: All probes (liveness, readiness, startup) are removed
@@ -1068,7 +1069,7 @@ Default container ports are configured based on component type:
 ## Backend-Specific Configurations
 ### VLLM
-- **Ray Head Port**: 6379 (for Ray-based multinode deployments)
+- **Ray Head Port**: 6379 (for Ray cluster coordination in multinode TP/PP deployments)
 - **Data Parallel RPC Port**: 13445 (for data parallel multinode deployments)
 ### SGLang
......
@@ -190,17 +190,23 @@ For vLLM multinode deployments, the operator automatically selects and configure
 The operator automatically determines the deployment mode based on your parallelism configuration:
-**1. Ray-Based Mode (Tensor/Pipeline Parallelism across nodes)**
+**1. Tensor/Pipeline Parallelism Mode (Single model across nodes)**
 - **When used**: When `world_size > GPUs_per_node` where `world_size = tensor_parallel_size × pipeline_parallel_size`
 - **Use case**: Distributing a single model instance across multiple nodes using tensor or pipeline parallelism
+The operator uses Ray for multi-node tensor/pipeline parallel deployments. Ray provides automatic placement group management and worker spawning across nodes.
 **Leader Node:**
-- **Ray Head**: The operator prepends `ray start --head --port=6379` to your existing command
+- **Command**: `ray start --head --port=6379 && <original-vllm-command> --distributed-executor-backend ray`
+- **Behavior**: Starts Ray head node, then runs vLLM which creates a placement group spanning all Ray workers
 - **Probes**: All health probes remain active (liveness, readiness, startup)
 **Worker Nodes:**
-- **Ray Worker**: The command is replaced with `ray start --address=<leader-hostname>:6379 --block`
-- **Probes**: All probes (liveness, readiness, startup) are automatically removed since workers don't expose health endpoints
+- **Command**: `ray start --address=<leader-hostname>:6379 --block`
+- **Behavior**: Joins Ray cluster and blocks; vLLM on leader spawns Ray actors to these workers
+- **Probes**: All probes (liveness, readiness, startup) are automatically removed
+> **Note**: vLLM's Ray executor automatically creates a placement group and spawns workers across the cluster. The `--nnodes` flag is NOT used with Ray - it's only compatible with the `mp` backend.
 **2. Data Parallel Mode (Multiple model instances across nodes)**
 - **When used**: When `world_size × data_parallel_size > GPUs_per_node`
@@ -216,6 +222,18 @@ The operator automatically determines the deployment mode based on your parallel
 **Note**: The operator intelligently injects these flags into your command regardless of command structure (direct Python commands or shell wrappers)
+#### Why Ray for Multi-Node TP/PP?
+vLLM supports two distributed executor backends: `ray` and `mp`. For multi-node deployments:
+- **Ray executor**: vLLM creates a placement group and spawns Ray actors across the cluster. Workers don't run vLLM directly - the leader's vLLM process manages everything.
+- **mp executor**: Each node must run its own vLLM process with `--nnodes`, `--node-rank`, `--master-addr`, `--master-port`. This approach is more complex to orchestrate.
+The Dynamo operator uses Ray because:
+1. It aligns with vLLM's official multi-node documentation (see `multi-node-serving.sh`)
+2. Simpler orchestration - only the leader runs vLLM, workers just need Ray agents
+3. vLLM automatically handles placement group creation and worker management
 #### Compilation Cache Support
 When a volume mount is configured with `useAsCompilationCache: true`, the operator automatically sets:
 - **`VLLM_CACHE_ROOT`**: Environment variable pointing to the cache mount point
......
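The leader and worker commands documented for the Tensor/Pipeline Parallelism mode can be reproduced with a short Go sketch. It follows the string construction visible in `injectRayDistributedLaunchFlags` in this commit, but `leaderArgs`, `workerArgs`, and `rayPort` are hypothetical names for illustration, and the worker hostname in the example is a placeholder rather than a real Grove or LWS address.

```go
package main

import (
	"fmt"
	"strings"
)

// rayPort is the Ray head port documented for vLLM multinode deployments.
const rayPort = "6379"

// leaderArgs joins the original command and args into one shell line that
// starts the Ray head and then runs vLLM with the Ray executor backend.
func leaderArgs(command, args []string) []string {
	return []string{fmt.Sprintf("ray start --head --port=%s && %s %s --distributed-executor-backend ray",
		rayPort, strings.Join(command, " "), strings.Join(args, " "))}
}

// workerArgs replaces the worker's args entirely: workers only join the
// Ray cluster and block, and vLLM on the leader spawns actors on them.
func workerArgs(leaderHost string) []string {
	return []string{fmt.Sprintf("ray start --address=%s:%s --block", leaderHost, rayPort)}
}

func main() {
	cmd := []string{"python3", "-m", "dynamo.vllm"}
	args := []string{"--model", "test", "--tensor-parallel-size", "8"}
	fmt.Println(leaderArgs(cmd, args)[0])
	fmt.Println(workerArgs("leader-0.example-svc")[0])
}
```

In the operator, both roles then have their container command set to `/bin/sh -c` so that the single joined string is interpreted by a shell.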
@@ -78,7 +78,7 @@
 **Time-To-First-Token (TTFT)** - The latency from receiving a request to generating the first output token.
 ## V
-**vLLM** - High-throughput LLM serving engine with Ray distributed support and PagedAttention.
+**vLLM** - High-throughput LLM serving engine with distributed tensor/pipeline parallelism and PagedAttention.
 ## W
 **Wide Expert Parallelism (WideEP)** - Mixture-of-Experts deployment strategy that spreads experts across many GPUs (e.g., 64-way EP) so each GPU hosts only a few experts.
......