Unverified Commit da7d6724 authored by Julien Mancuso's avatar Julien Mancuso Committed by GitHub
Browse files

fix(operator): emit Kubernetes syntax from LWS deployer so kubelet expands...

fix(operator): emit Kubernetes  syntax from LWS deployer so kubelet expands the leader hostname in direct-python container args (#8369)
parent 9054c882
...@@ -910,7 +910,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing. ...@@ -910,7 +910,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
Name: commonconsts.MainContainerName, Name: commonconsts.MainContainerName,
Image: "test-image:latest", Image: "test-image:latest",
Command: []string{"/bin/sh", "-c"}, Command: []string{"/bin/sh", "-c"},
Args: []string{"ray start --address=$LWS_LEADER_ADDRESS:6379 --block"}, Args: []string{"ray start --address=$(LWS_LEADER_ADDRESS):6379 --block"},
Env: []corev1.EnvVar{ Env: []corev1.EnvVar{
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker}, {Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker},
{Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"}, {Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
......
...@@ -3,7 +3,6 @@ package dynamo ...@@ -3,7 +3,6 @@ package dynamo
import ( import (
"fmt" "fmt"
"regexp" "regexp"
"strings"
"github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1" "github.com/ai-dynamo/dynamo/deploy/operator/api/v1alpha1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
...@@ -77,7 +76,6 @@ func (b *SGLangBackend) getMultinodeFlags(numberOfNodes int32, role Role, servic ...@@ -77,7 +76,6 @@ func (b *SGLangBackend) getMultinodeFlags(numberOfNodes int32, role Role, servic
if role == RoleLeader { if role == RoleLeader {
nodeRank = "0" nodeRank = "0"
needsShell = false needsShell = false
leaderHostname = convertIfShellVar(leaderHostname)
} else { } else {
nodeRank, needsShell = multinodeDeployer.GetNodeRank() nodeRank, needsShell = multinodeDeployer.GetNodeRank()
} }
...@@ -86,19 +84,3 @@ func (b *SGLangBackend) getMultinodeFlags(numberOfNodes int32, role Role, servic ...@@ -86,19 +84,3 @@ func (b *SGLangBackend) getMultinodeFlags(numberOfNodes int32, role Role, servic
flags := fmt.Sprintf("--dist-init-addr %s --nnodes %d --node-rank %s", distInitAddr, numberOfNodes, nodeRank) flags := fmt.Sprintf("--dist-init-addr %s --nnodes %d --node-rank %s", distInitAddr, numberOfNodes, nodeRank)
return flags, needsShell return flags, needsShell
} }
// Match a string representing a shell variable, such as $ABC
var shellVarRe = regexp.MustCompile(`^\$([A-Za-z_][A-Za-z0-9_]*)$`)
// convertIfShellVar convert shell variable $ABC to $(ABC)
func convertIfShellVar(s string) string {
if strings.HasPrefix(s, "$(") && strings.HasSuffix(s, ")") {
return s
}
if match := shellVarRe.FindStringSubmatch(s); len(match) > 1 {
return "$(" + match[1] + ")"
}
return s
}
...@@ -92,6 +92,34 @@ func TestSGLangBackend_PythonCommandInjection(t *testing.T) { ...@@ -92,6 +92,34 @@ func TestSGLangBackend_PythonCommandInjection(t *testing.T) {
expectedArgs: []string{"-m", "dynamo.sglang", "--model", "llama", "--dist-init-addr", "leader.example.com:29500", "--nnodes", "2", "--node-rank", "1"}, expectedArgs: []string{"-m", "dynamo.sglang", "--model", "llama", "--dist-init-addr", "leader.example.com:29500", "--nnodes", "2", "--node-rank", "1"},
description: "Direct python command with simple deployer should append flags", description: "Direct python command with simple deployer should append flags",
}, },
{
// LWS worker returns $(LWS_WORKER_INDEX) with needsShell=false, so
// flags are appended directly to Args and the kubelet expands both
// $(LWS_LEADER_ADDRESS) and $(LWS_WORKER_INDEX) before the container
// starts - no sh -c wrapper.
name: "python command LWS worker - direct append (kubelet expansion)",
numberOfNodes: 2,
role: RoleWorker,
multinodeDeployer: &LWSMultinodeDeployer{},
initialCommand: []string{"python3"},
initialArgs: []string{"-m", "dynamo.sglang", "--model", "llama"},
expectedCommand: []string{"python3"},
expectedArgs: []string{"-m", "dynamo.sglang", "--model", "llama", "--dist-init-addr", "$(LWS_LEADER_ADDRESS):29500", "--nnodes", "2", "--node-rank", "$(LWS_WORKER_INDEX)"},
description: "LWS worker with direct python command should append flags without sh -c wrapping",
},
{
// LWS leader uses rank 0 (plain integer literal) so needsShell is
// false for both leader and worker roles.
name: "python command LWS leader - direct append",
numberOfNodes: 2,
role: RoleLeader,
multinodeDeployer: &LWSMultinodeDeployer{},
initialCommand: []string{"python3"},
initialArgs: []string{"-m", "dynamo.sglang"},
expectedCommand: []string{"python3"},
expectedArgs: []string{"-m", "dynamo.sglang", "--dist-init-addr", "$(LWS_LEADER_ADDRESS):29500", "--nnodes", "2", "--node-rank", "0"},
description: "LWS leader with direct python command should append flags with kubelet-expanded leader hostname",
},
{ {
name: "python command shell deployer - shell wrapping", name: "python command shell deployer - shell wrapping",
numberOfNodes: 2, numberOfNodes: 2,
......
...@@ -215,7 +215,7 @@ func TestTRTLLMBackend_UpdateContainer(t *testing.T) { ...@@ -215,7 +215,7 @@ func TestTRTLLMBackend_UpdateContainer(t *testing.T) {
{Name: mpiRunSecretName, MountPath: "/ssh-pk", ReadOnly: true}, {Name: mpiRunSecretName, MountPath: "/ssh-pk", ReadOnly: true},
}, },
expectedCommand: []string{"/bin/sh", "-c"}, expectedCommand: []string{"/bin/sh", "-c"},
expectedArgs: []string{"mkdir -p $HOME/.ssh && ls -la /ssh-pk/ && cp /ssh-pk/private.key $HOME/.ssh/id_rsa && cp /ssh-pk/private.key.pub $HOME/.ssh/id_rsa.pub && cp /ssh-pk/private.key.pub $HOME/.ssh/authorized_keys && chmod 600 $HOME/.ssh/id_rsa $HOME/.ssh/authorized_keys && chmod 644 $HOME/.ssh/id_rsa.pub && printf 'Host *\\nIdentityFile '$HOME'/.ssh/id_rsa\\nStrictHostKeyChecking no\\nPort 2222\\n' > $HOME/.ssh/config && TIMEOUT=300; START_TIME=$(date +%s); for worker in $(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./'); do echo \"Waiting for DNS: $worker\"; until getent hosts $worker >/dev/null 2>&1; do CURRENT_TIME=$(date +%s); if [ $((CURRENT_TIME - START_TIME)) -gt $TIMEOUT ]; then echo \"ERROR: Timeout waiting for DNS: $worker\"; exit 1; fi; echo \"DNS not ready for $worker, retrying...\"; sleep 2; done; echo \"✓ DNS resolved: $worker\"; done; echo \"All workers DNS ready\" && mpirun $([ \"$(id -u)\" = \"0\" ] && echo --allow-run-as-root) --oversubscribe -n 2 -H $LWS_LEADER_ADDRESS,$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./') --mca pml ob1 --mca plm_rsh_args \"-p 2222 -o StrictHostKeyChecking=no -i $HOME/.ssh/id_rsa\" -x CUDA_VISIBLE_DEVICES -x HF_DATASETS_CACHE -x HF_ENDPOINT -x HF_HOME -x HF_TOKEN -x HOME -x HUGGING_FACE_HUB_TOKEN -x LD_LIBRARY_PATH -x MODEL_PATH -x NCCL_DEBUG -x NCCL_IB_DISABLE -x NCCL_P2P_DISABLE -x OMPI_MCA_orte_keep_fqdn_hostnames -x PATH -x PYTHONPATH -x TENSORRT_LLM_CACHE_DIR -x TOKENIZERS_PARALLELISM -x TRANSFORMERS_CACHE -x TRTLLM_USE_UCX_KVCACHE -x USER bash -c 'trtllm-llmapi-launch python3 --model test'"}, expectedArgs: []string{"mkdir -p $HOME/.ssh && ls -la /ssh-pk/ && cp /ssh-pk/private.key $HOME/.ssh/id_rsa && cp /ssh-pk/private.key.pub $HOME/.ssh/id_rsa.pub && cp /ssh-pk/private.key.pub $HOME/.ssh/authorized_keys && chmod 600 $HOME/.ssh/id_rsa $HOME/.ssh/authorized_keys && chmod 644 $HOME/.ssh/id_rsa.pub && printf 'Host *\\nIdentityFile '$HOME'/.ssh/id_rsa\\nStrictHostKeyChecking no\\nPort 2222\\n' > $HOME/.ssh/config && TIMEOUT=300; START_TIME=$(date +%s); for worker in $(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./'); do echo \"Waiting for DNS: $worker\"; until getent hosts $worker >/dev/null 2>&1; do CURRENT_TIME=$(date +%s); if [ $((CURRENT_TIME - START_TIME)) -gt $TIMEOUT ]; then echo \"ERROR: Timeout waiting for DNS: $worker\"; exit 1; fi; echo \"DNS not ready for $worker, retrying...\"; sleep 2; done; echo \"✓ DNS resolved: $worker\"; done; echo \"All workers DNS ready\" && mpirun $([ \"$(id -u)\" = \"0\" ] && echo --allow-run-as-root) --oversubscribe -n 2 -H $(LWS_LEADER_ADDRESS),$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./') --mca pml ob1 --mca plm_rsh_args \"-p 2222 -o StrictHostKeyChecking=no -i $HOME/.ssh/id_rsa\" -x CUDA_VISIBLE_DEVICES -x HF_DATASETS_CACHE -x HF_ENDPOINT -x HF_HOME -x HF_TOKEN -x HOME -x HUGGING_FACE_HUB_TOKEN -x LD_LIBRARY_PATH -x MODEL_PATH -x NCCL_DEBUG -x NCCL_IB_DISABLE -x NCCL_P2P_DISABLE -x OMPI_MCA_orte_keep_fqdn_hostnames -x PATH -x PYTHONPATH -x TENSORRT_LLM_CACHE_DIR -x TOKENIZERS_PARALLELISM -x TRANSFORMERS_CACHE -x TRTLLM_USE_UCX_KVCACHE -x USER bash -c 'trtllm-llmapi-launch python3 --model test'"},
expectedEnv: []corev1.EnvVar{ expectedEnv: []corev1.EnvVar{
{Name: "OMPI_MCA_orte_keep_fqdn_hostnames", Value: "1"}, {Name: "OMPI_MCA_orte_keep_fqdn_hostnames", Value: "1"},
}, },
...@@ -512,7 +512,7 @@ func TestTRTLLMBackend_hostNamesList(t *testing.T) { ...@@ -512,7 +512,7 @@ func TestTRTLLMBackend_hostNamesList(t *testing.T) {
multinodeDeployer: &LWSMultinodeDeployer{}, multinodeDeployer: &LWSMultinodeDeployer{},
serviceName: "test-service", serviceName: "test-service",
expectedContains: []string{ expectedContains: []string{
"$LWS_LEADER_ADDRESS", "$(LWS_LEADER_ADDRESS)",
"$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./')", "$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./')",
}, },
}, },
...@@ -535,7 +535,7 @@ func TestTRTLLMBackend_hostNamesList(t *testing.T) { ...@@ -535,7 +535,7 @@ func TestTRTLLMBackend_hostNamesList(t *testing.T) {
multinodeDeployer: &LWSMultinodeDeployer{}, multinodeDeployer: &LWSMultinodeDeployer{},
serviceName: "worker", serviceName: "worker",
expectedContains: []string{ expectedContains: []string{
"$LWS_LEADER_ADDRESS", "$(LWS_LEADER_ADDRESS)",
"$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./')", "$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./')",
"$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-2\\./')", "$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-2\\./')",
"$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-3\\./')", "$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-3\\./')",
...@@ -652,7 +652,7 @@ func TestTRTLLMBackend_setupLeaderContainer(t *testing.T) { ...@@ -652,7 +652,7 @@ func TestTRTLLMBackend_setupLeaderContainer(t *testing.T) {
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{}, component: &v1alpha1.DynamoComponentDeploymentSharedSpec{},
initialArgs: []string{}, initialArgs: []string{},
initialCommand: []string{"python", "-m", "worker"}, initialCommand: []string{"python", "-m", "worker"},
expected: "mkdir -p $HOME/.ssh && ls -la /ssh-pk/ && cp /ssh-pk/private.key $HOME/.ssh/id_rsa && cp /ssh-pk/private.key.pub $HOME/.ssh/id_rsa.pub && cp /ssh-pk/private.key.pub $HOME/.ssh/authorized_keys && chmod 600 $HOME/.ssh/id_rsa $HOME/.ssh/authorized_keys && chmod 644 $HOME/.ssh/id_rsa.pub && printf 'Host *\\nIdentityFile '$HOME'/.ssh/id_rsa\\nStrictHostKeyChecking no\\nPort 2222\\n' > $HOME/.ssh/config && TIMEOUT=300; START_TIME=$(date +%s); for worker in $(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./'); do echo \"Waiting for DNS: $worker\"; until getent hosts $worker >/dev/null 2>&1; do CURRENT_TIME=$(date +%s); if [ $((CURRENT_TIME - START_TIME)) -gt $TIMEOUT ]; then echo \"ERROR: Timeout waiting for DNS: $worker\"; exit 1; fi; echo \"DNS not ready for $worker, retrying...\"; sleep 2; done; echo \"✓ DNS resolved: $worker\"; done; echo \"All workers DNS ready\" && mpirun $([ \"$(id -u)\" = \"0\" ] && echo --allow-run-as-root) --oversubscribe -n 0 -H $LWS_LEADER_ADDRESS,$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./') --mca pml ob1 --mca plm_rsh_args \"-p 2222 -o StrictHostKeyChecking=no -i $HOME/.ssh/id_rsa\" -x CUDA_VISIBLE_DEVICES -x HF_DATASETS_CACHE -x HF_ENDPOINT -x HF_HOME -x HF_TOKEN -x HOME -x HUGGING_FACE_HUB_TOKEN -x LD_LIBRARY_PATH -x MODEL_PATH -x NCCL_DEBUG -x NCCL_IB_DISABLE -x NCCL_P2P_DISABLE -x PATH -x PYTHONPATH -x TENSORRT_LLM_CACHE_DIR -x TOKENIZERS_PARALLELISM -x TRANSFORMERS_CACHE -x TRTLLM_USE_UCX_KVCACHE -x USER bash -c 'trtllm-llmapi-launch python -m worker'", expected: "mkdir -p $HOME/.ssh && ls -la /ssh-pk/ && cp /ssh-pk/private.key $HOME/.ssh/id_rsa && cp /ssh-pk/private.key.pub $HOME/.ssh/id_rsa.pub && cp /ssh-pk/private.key.pub $HOME/.ssh/authorized_keys && chmod 600 $HOME/.ssh/id_rsa $HOME/.ssh/authorized_keys && chmod 644 $HOME/.ssh/id_rsa.pub && printf 'Host *\\nIdentityFile '$HOME'/.ssh/id_rsa\\nStrictHostKeyChecking no\\nPort 2222\\n' > $HOME/.ssh/config && TIMEOUT=300; START_TIME=$(date +%s); for worker in $(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./'); do echo \"Waiting for DNS: $worker\"; until getent hosts $worker >/dev/null 2>&1; do CURRENT_TIME=$(date +%s); if [ $((CURRENT_TIME - START_TIME)) -gt $TIMEOUT ]; then echo \"ERROR: Timeout waiting for DNS: $worker\"; exit 1; fi; echo \"DNS not ready for $worker, retrying...\"; sleep 2; done; echo \"✓ DNS resolved: $worker\"; done; echo \"All workers DNS ready\" && mpirun $([ \"$(id -u)\" = \"0\" ] && echo --allow-run-as-root) --oversubscribe -n 0 -H $(LWS_LEADER_ADDRESS),$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-1\\./') --mca pml ob1 --mca plm_rsh_args \"-p 2222 -o StrictHostKeyChecking=no -i $HOME/.ssh/id_rsa\" -x CUDA_VISIBLE_DEVICES -x HF_DATASETS_CACHE -x HF_ENDPOINT -x HF_HOME -x HF_TOKEN -x HOME -x HUGGING_FACE_HUB_TOKEN -x LD_LIBRARY_PATH -x MODEL_PATH -x NCCL_DEBUG -x NCCL_IB_DISABLE -x NCCL_P2P_DISABLE -x PATH -x PYTHONPATH -x TENSORRT_LLM_CACHE_DIR -x TOKENIZERS_PARALLELISM -x TRANSFORMERS_CACHE -x TRTLLM_USE_UCX_KVCACHE -x USER bash -c 'trtllm-llmapi-launch python -m worker'",
}, },
{ {
name: "Leader with both command and args (shell command - args take precedence)", name: "Leader with both command and args (shell command - args take precedence)",
......
...@@ -88,7 +88,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) { ...@@ -88,7 +88,7 @@ func TestVLLMBackend_UpdateContainer(t *testing.T) {
multinodeDeployer: &LWSMultinodeDeployer{}, multinodeDeployer: &LWSMultinodeDeployer{},
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "8"}}, initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "8"}},
gpuCount: 4, gpuCount: 4,
expectedArgs: []string{"ray start --address=$LWS_LEADER_ADDRESS:6379 --block"}, expectedArgs: []string{"ray start --address=$(LWS_LEADER_ADDRESS):6379 --block"},
expectProbesRemoved: true, expectProbesRemoved: true,
}, },
{ {
...@@ -261,7 +261,7 @@ func TestVLLMBackend_ShellCommandInjection(t *testing.T) { ...@@ -261,7 +261,7 @@ func TestVLLMBackend_ShellCommandInjection(t *testing.T) {
multinodeDeployer: &LWSMultinodeDeployer{}, multinodeDeployer: &LWSMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("python3 -m dynamo.vllm %s 8", dataParallelSizeFlag)}}, initialContainer: &corev1.Container{Command: []string{"sh", "-c"}, Args: []string{fmt.Sprintf("python3 -m dynamo.vllm %s 8", dataParallelSizeFlag)}},
gpuCount: 4, gpuCount: 4,
expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank 0 --data-parallel-address $LWS_LEADER_ADDRESS --data-parallel-rpc-port 13445 --data-parallel-size 8"}, expectedArgs: []string{"python3 -m dynamo.vllm --data-parallel-hybrid-lb --data-parallel-size-local 4 --data-parallel-start-rank 0 --data-parallel-address $(LWS_LEADER_ADDRESS) --data-parallel-rpc-port 13445 --data-parallel-size 8"},
description: "LWS shell commands should use LWS variables", description: "LWS shell commands should use LWS variables",
}, },
{ {
...@@ -452,6 +452,9 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -452,6 +452,9 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
tensorParallelSizeFlag, commonconsts.VLLMMpMasterPort)}, tensorParallelSizeFlag, commonconsts.VLLMMpMasterPort)},
}, },
{ {
// LWS worker: $(LWS_LEADER_ADDRESS) and $(LWS_WORKER_INDEX) are both
// kubelet-expanded, so flags are appended directly to Args without an
// sh -c wrapper.
name: "worker uses mp (origin version >= threshold) LWS", name: "worker uses mp (origin version >= threshold) LWS",
role: RoleWorker, role: RoleWorker,
multinodeDeployer: &LWSMultinodeDeployer{}, multinodeDeployer: &LWSMultinodeDeployer{},
...@@ -460,9 +463,34 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -460,9 +463,34 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
annotations: map[string]string{ annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0", commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
}, },
expectedArgs: []string{fmt.Sprintf( expectedArgs: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16", "--distributed-executor-backend", "mp", "--nnodes", "2", "--master-addr", "$(LWS_LEADER_ADDRESS)", "--master-port", commonconsts.VLLMMpMasterPort, "--node-rank", "$(LWS_WORKER_INDEX)", "--headless"},
"exec python3 -m dynamo.vllm %s 16 --distributed-executor-backend mp --nnodes 2 --master-addr $LWS_LEADER_ADDRESS --master-port %s --node-rank $(LWS_WORKER_INDEX) --headless", },
tensorParallelSizeFlag, commonconsts.VLLMMpMasterPort)}, {
// Regression test: LWS leader with direct python command must emit
// Kubernetes $(LWS_LEADER_ADDRESS) syntax so the kubelet expands it
// from the LWS-injected env var. Emitting the bare shell $VAR causes
// vLLM to receive the literal string and fail to resolve the leader.
name: "leader uses mp (origin version >= threshold) LWS",
role: RoleLeader,
multinodeDeployer: &LWSMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8,
annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
expectedArgs: []string{"-m", "dynamo.vllm", tensorParallelSizeFlag, "16", "--distributed-executor-backend", "mp", "--nnodes", "2", "--master-addr", "$(LWS_LEADER_ADDRESS)", "--master-port", commonconsts.VLLMMpMasterPort, "--node-rank", "0"},
},
{
// Regression test: LWS leader on the data-parallel path. Same bug
// class as the MP leader case above - bare $LWS_LEADER_ADDRESS would
// not be expanded by K8s, so we emit $(LWS_LEADER_ADDRESS) instead.
name: "leader with data parallel launch LWS",
role: RoleLeader,
multinodeDeployer: &LWSMultinodeDeployer{},
initialContainer: &corev1.Container{Command: []string{"python3"}, Args: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16"}},
gpuCount: 8,
annotations: nil,
expectedArgs: []string{"-m", "dynamo.vllm", dataParallelSizeFlag, "16", "--data-parallel-hybrid-lb", "--data-parallel-size-local", "8", "--data-parallel-start-rank", "0", "--data-parallel-address", "$(LWS_LEADER_ADDRESS)", "--data-parallel-rpc-port", "13445"},
}, },
{ {
name: "leader prepends distributed data parallel flags (annotations don't affect DP path)", name: "leader prepends distributed data parallel flags (annotations don't affect DP path)",
...@@ -516,7 +544,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -516,7 +544,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}}, initialContainer: &corev1.Container{Args: []string{"python3", "-m", "dynamo.vllm", tensorParallelSizeFlag, "16"}},
gpuCount: 8, gpuCount: 8,
annotations: nil, annotations: nil,
expectedArgs: []string{"ray start --address=$LWS_LEADER_ADDRESS:6379 --block"}, expectedArgs: []string{"ray start --address=$(LWS_LEADER_ADDRESS):6379 --block"},
}, },
{ {
name: "main role does not modify args", name: "main role does not modify args",
...@@ -609,7 +637,7 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) { ...@@ -609,7 +637,7 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
}, },
expectInitContainer: true, expectInitContainer: true,
expectedInitImage: "vllm:v2", expectedInitImage: "vllm:v2",
expectedLeaderHost: "$LWS_LEADER_ADDRESS", expectedLeaderHost: "${LWS_LEADER_ADDRESS}",
}, },
{ {
name: "mp leader does not inject init container", name: "mp leader does not inject init container",
......
...@@ -6,13 +6,26 @@ type LWSMultinodeDeployer struct { ...@@ -6,13 +6,26 @@ type LWSMultinodeDeployer struct {
MultinodeDeployer MultinodeDeployer
} }
// GetLeaderHostname returns the leader address in Kubernetes env-var
// expansion syntax. LWS injects LWS_LEADER_ADDRESS into every pod of a
// LeaderWorkerSet, and the kubelet substitutes $(VAR) references in
// container Args/Command before the container starts, which means the
// same string works whether flags are appended directly to a python
// command or wrapped in sh -c. Returning the bare shell form
// ($LWS_LEADER_ADDRESS) would be passed literally to direct-python
// commands and break distributed init.
func (d *LWSMultinodeDeployer) GetLeaderHostname(serviceName string) string { func (d *LWSMultinodeDeployer) GetLeaderHostname(serviceName string) string {
return "$LWS_LEADER_ADDRESS" return "$(LWS_LEADER_ADDRESS)"
} }
// GetNodeRank returns the current pod's rank within its LWS group in
// Kubernetes env-var expansion syntax. needsShell is false because
// $(LWS_WORKER_INDEX) is substituted by the kubelet in container
// Args/Command before the container starts, so no sh -c wrapper is
// required. This contrasts with Grove, which returns a shell
// arithmetic expression and therefore does need shell interpretation.
func (d *LWSMultinodeDeployer) GetNodeRank() (string, bool) { func (d *LWSMultinodeDeployer) GetNodeRank() (string, bool) {
// This requires shell expansion for variable substitution return "$(LWS_WORKER_INDEX)", false
return "$(LWS_WORKER_INDEX)", true
} }
func (d *LWSMultinodeDeployer) NeedsDNSWait() bool { func (d *LWSMultinodeDeployer) NeedsDNSWait() bool {
...@@ -20,17 +33,34 @@ func (d *LWSMultinodeDeployer) NeedsDNSWait() bool { ...@@ -20,17 +33,34 @@ func (d *LWSMultinodeDeployer) NeedsDNSWait() bool {
return true return true
} }
// GetHostNames returns hostnames for every pod in the LWS group.
//
// The returned slice intentionally mixes two expansion contexts:
//
// - hostnames[0] is the leader address in Kubernetes env-var syntax
// ($(LWS_LEADER_ADDRESS)), which the kubelet substitutes in container
// Args/Command before the container starts.
// - hostnames[1..] are derived from $LWS_LEADER_ADDRESS via a shell
// command substitution ($(echo "$LWS_LEADER_ADDRESS" | sed ...)) because
// LWS does not expose per-worker hostnames as env vars; the index has to
// be spliced into the leader hostname at runtime.
//
// Because worker entries use shell command substitution, callers MUST feed
// the returned values through a shell (e.g. via sh -c / mpirun), not into
// container Args directly - otherwise workers would be handed the literal
// `$(echo ... | sed ...)` string. The current TRT-LLM mpirun launcher is
// the only consumer and already runs inside a shell, so this is safe.
//
// LWS only provides LWS_LEADER_ADDRESS, LWS_GROUP_SIZE, and LWS_WORKER_INDEX.
// LWS_LEADER_ADDRESS format: <lws-name>-<group-index>-<leader-pod-index>.<service-name>.<namespace>
// Example: trtllm-disagg-tp8-decode-0-0.trtllm-disagg-tp8-decode-0.jsm
// Worker pods append their index: trtllm-disagg-tp8-decode-0-0-1, trtllm-disagg-tp8-decode-0-0-2, etc.
// We derive worker addresses by inserting -{i} before the first dot using sed.
func (d *LWSMultinodeDeployer) GetHostNames(serviceName string, numberOfNodes int32) []string { func (d *LWSMultinodeDeployer) GetHostNames(serviceName string, numberOfNodes int32) []string {
hostnames := make([]string, numberOfNodes) hostnames := make([]string, numberOfNodes)
hostnames[0] = d.GetLeaderHostname(serviceName) hostnames[0] = d.GetLeaderHostname(serviceName)
// LWS only provides LWS_LEADER_ADDRESS, LWS_GROUP_SIZE, and LWS_WORKER_INDEX
// LWS_LEADER_ADDRESS format: <lws-name>-<group-index>-<leader-pod-index>.<service-name>.<namespace>
// Example: trtllm-disagg-tp8-decode-0-0.trtllm-disagg-tp8-decode-0.jsm
// Worker pods append their index: trtllm-disagg-tp8-decode-0-0-1, trtllm-disagg-tp8-decode-0-0-2, etc.
// We derive worker addresses by inserting -{i} before the first dot
for i := int32(1); i < numberOfNodes; i++ { for i := int32(1); i < numberOfNodes; i++ {
// Use sed to replace first "." with "-{i}." to append worker index
hostnames[i] = fmt.Sprintf("$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-%d\\./')", i) hostnames[i] = fmt.Sprintf("$(echo \"$LWS_LEADER_ADDRESS\" | sed 's/\\./-%d\\./')", i)
} }
return hostnames return hostnames
......
...@@ -15,18 +15,29 @@ import ( ...@@ -15,18 +15,29 @@ import (
* into container commands for multinode SGLang deployments. The complexity arises from supporting multiple * into container commands for multinode SGLang deployments. The complexity arises from supporting multiple
* container command patterns and ensuring proper environment variable interpretation. * container command patterns and ensuring proper environment variable interpretation.
* *
* All MultinodeDeployer implementations MUST return Kubernetes env-var
* expansion syntax ("$(VAR)") from GetLeaderHostname / GetNodeRank. The
* kubelet substitutes those references in container Args/Command before the
* container starts, so plain $(VAR) references never require a shell wrapper.
* Shell wrapping (`sh -c`) is only needed for shell-only constructs that the
* kubelet does not evaluate - e.g. arithmetic expansion `$(( ... ))` or
* command substitution - which is signaled by the `needsShell` bool returned
* from GetNodeRank (Grove's `$((GROVE_PCLQ_POD_INDEX + 1))` is the canonical
* example).
*
* Two main scenarios are handled: * Two main scenarios are handled:
* *
* 1. Direct Python Command (e.g., Command: ["python3"], Args: ["-m", "sglang", "..."]) * 1. Direct Python Command (e.g., Command: ["python3"], Args: ["-m", "sglang", "..."])
* - If shell interpretation is needed (for env vars): Wrap in "sh -c" with exec * - If needsShell is true (shell-only expression such as arithmetic): wrap
* - If no shell needed: Simply append flags to the Args array * the command in "sh -c" with exec so the shell evaluates the expression.
* - Otherwise: simply append flags to the Args array; the kubelet expands
* any $(VAR) references itself.
* *
* 2. Non-Python Command (e.g., Command: ["sh"], Args: ["-c", "python3 -m sglang ..."]) * 2. Non-Python Command (e.g., Command: ["sh"], Args: ["-c", "python3 -m sglang ..."])
* - Use regex-based injection to find embedded Python+SGLang commands within args * - Use regex-based injection to find embedded Python+SGLang commands within args
* - Insert flags after the Python command but before any shell operators (|, &, ;) * - Insert flags after the Python command but before any shell operators (|, &, ;)
*
* The needsShell flag indicates when environment variables require shell interpretation
*/ */
// shellQuoteForBashC quotes a string so it survives shell interpretation inside sh -c. // shellQuoteForBashC quotes a string so it survives shell interpretation inside sh -c.
// Simple args (flags, paths) pass through unchanged; args containing special characters // Simple args (flags, paths) pass through unchanged; args containing special characters
// (JSON, env vars, spaces, quotes) are wrapped in double quotes with inner escaping. // (JSON, env vars, spaces, quotes) are wrapped in double quotes with inner escaping.
...@@ -69,7 +80,6 @@ func injectFlagsIntoContainerCommand(container *corev1.Container, flags string, ...@@ -69,7 +80,6 @@ func injectFlagsIntoContainerCommand(container *corev1.Container, flags string,
container.Command = []string{"sh", "-c"} container.Command = []string{"sh", "-c"}
container.Args = []string{shellCommand} container.Args = []string{shellCommand}
} else { } else {
// Simple append to args
flagsSlice := strings.Fields(flags) flagsSlice := strings.Fields(flags)
container.Args = append(container.Args, flagsSlice...) container.Args = append(container.Args, flagsSlice...)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment