"...git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "80141bbf2f1b8b0beaac097f94923f95773734ef"
Unverified Commit c4964f55 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat(vllm): multinode elastic EP support via Ray cluster + concurrent scale fix (#8216)


Signed-off-by: Tzu-Ling <tzulingk@nvidia.com>
parent e041ccfc
...@@ -414,6 +414,16 @@ class BaseWorkerHandler(ABC, Generic[RequestT, ResponseT]): ...@@ -414,6 +414,16 @@ class BaseWorkerHandler(ABC, Generic[RequestT, ResponseT]):
self.dp_range = get_dp_range_for_worker(self.engine_client.vllm_config) self.dp_range = get_dp_range_for_worker(self.engine_client.vllm_config)
self._quiesce_controller = VllmEngineQuiesceController(self.engine_client) self._quiesce_controller = VllmEngineQuiesceController(self.engine_client)
self._quiesce_lock = asyncio.Lock() self._quiesce_lock = asyncio.Lock()
# Guard against concurrent scale_elastic_ep calls: while one is in progress,
# later callers are rejected rather than queued. vLLM's elastic-EP
# bootstrap creates a fresh TCPStore per scale operation and stores it
# in engine_client._coord_store. When two callers race through
# _setup_elastic_ep_reconfig_bootstrap concurrently the first caller's
# store gets garbage-collected before the new Ray actor has had a chance
# to connect, causing a 300 s TCPStore timeout on the worker node.
# One handler is created per worker process (worker_factory.py), so all
# concurrent HTTP callers share this lock and only one scale operation
# can mutate _coord_store at a time.
self._scale_ep_lock = asyncio.Lock()
# Initialize InputParamManager for text-in-text-out mode # Initialize InputParamManager for text-in-text-out mode
tokenizer = None tokenizer = None
...@@ -546,52 +556,78 @@ class BaseWorkerHandler(ABC, Generic[RequestT, ResponseT]): ...@@ -546,52 +556,78 @@ class BaseWorkerHandler(ABC, Generic[RequestT, ResponseT]):
"status": "error", "status": "error",
"message": f"new_data_parallel_size must be an integer, got: {new_dp_size!r}", "message": f"new_data_parallel_size must be an integer, got: {new_dp_size!r}",
} }
if new_dp_size < 2:
logger.info(f"[ElasticEP] Scaling to new_data_parallel_size={new_dp_size}")
try:
# TODO(upstream-vllm): remove this patch once vLLM fixes
# add_dp_placement_groups in vllm/v1/engine/utils.py to use ray.nodes()
# instead of ray.util.state.list_nodes().
#
# Patch ray.util.state.list_nodes to use the GCS API instead of the
# dashboard HTTP API (127.0.0.1:8265/api/v0/nodes). The dynamo image
# installs ray core only (not ray[default]), so the dashboard HTTP server
# starts in --minimal mode with the HTTP server disabled. vLLM's
# add_dp_placement_groups calls list_nodes() which requires that HTTP
# endpoint, causing scale_elastic_ep to fail with "Failed to connect to
# API server".
#
# ray.nodes() uses the GCS gRPC channel directly (no dashboard process
# needed) and returns the same information. Imported lazily so ray is not
# required at module load time (absent in non-elastic-EP deployments).
#
# Format mapping:
# list_nodes() → objects with .node_ip and .node_id
# ray.nodes() → dicts with "NodeManagerAddress" and "NodeID"
import ray
import ray.util.state as _ray_util_state
class _NodeInfo:
__slots__ = ("node_id", "node_ip")
def __init__(self, d: dict) -> None:
self.node_ip: str = d["NodeManagerAddress"]
self.node_id: str = d["NodeID"]
_ray_util_state.list_nodes = lambda **kw: [
_NodeInfo(n) for n in ray.nodes() if n.get("Alive", False)
]
await self.engine_client.scale_elastic_ep(new_dp_size)
logger.info(f"[ElasticEP] Scaling to dp={new_dp_size} complete")
return { return {
"status": "ok", "status": "error",
"message": f"Scaled to data_parallel_size={new_dp_size}", "message": (
"new_data_parallel_size": new_dp_size, "new_data_parallel_size must be >= 2 when elastic EP/ePLB is enabled"
),
} }
except Exception as e:
logger.error(f"[ElasticEP] Scaling failed: {e}") logger.info(f"[ElasticEP] Scaling to new_data_parallel_size={new_dp_size}")
return {"status": "error", "message": str(e)}
# Early-reject if another scale is already in progress rather than
# queuing behind it: a queued caller would garbage-collect the first
# caller's TCPStore before its Ray actor connects, causing a 300 s
# timeout on the worker node.
# The locked() check followed immediately by async with is safe:
# asyncio.Lock.acquire() only suspends (yields to the event loop) when
# the lock is already held. When locked() returns False the lock is
# free, so acquire() completes synchronously — no other coroutine can
# run between the check and the acquisition.
if self._scale_ep_lock.locked():
msg = (
f"A scale_elastic_ep operation is already in progress; "
f"rejecting concurrent request for new_data_parallel_size={new_dp_size}"
)
logger.warning(f"[ElasticEP] {msg}")
return {"status": "error", "message": msg}
async with self._scale_ep_lock:
try:
# TODO(upstream-vllm): remove this patch once vLLM fixes
# add_dp_placement_groups in vllm/v1/engine/utils.py to use ray.nodes()
# instead of ray.util.state.list_nodes().
#
# Patch ray.util.state.list_nodes to use the GCS API instead of the
# dashboard HTTP API (127.0.0.1:8265/api/v0/nodes). The dynamo image
# installs ray core only (not ray[default]), so the dashboard HTTP server
# starts in --minimal mode with the HTTP server disabled. vLLM's
# add_dp_placement_groups calls list_nodes() which requires that HTTP
# endpoint, causing scale_elastic_ep to fail with "Failed to connect to
# API server".
#
# ray.nodes() uses the GCS gRPC channel directly (no dashboard process
# needed) and returns the same information. Imported lazily so ray is not
# required at module load time (absent in non-elastic-EP deployments).
#
# Format mapping:
# list_nodes() → objects with .node_ip and .node_id
# ray.nodes() → dicts with "NodeManagerAddress" and "NodeID"
import ray
import ray.util.state as _ray_util_state
class _NodeInfo:
__slots__ = ("node_id", "node_ip")
def __init__(self, d: dict) -> None:
self.node_ip: str = d["NodeManagerAddress"]
self.node_id: str = d["NodeID"]
_ray_util_state.list_nodes = lambda **kw: [
_NodeInfo(n) for n in ray.nodes() if n.get("Alive", False)
]
await self.engine_client.scale_elastic_ep(new_dp_size)
logger.info(f"[ElasticEP] Scaling to dp={new_dp_size} complete")
return {
"status": "ok",
"message": f"Scaled to data_parallel_size={new_dp_size}",
"new_data_parallel_size": new_dp_size,
}
except Exception as e:
logger.error(f"[ElasticEP] Scaling failed: {e}")
return {"status": "error", "message": str(e)}
async def wake_up(self, body: dict) -> dict: async def wake_up(self, body: dict) -> dict:
"""Wake the engine to restore GPU memory and re-register to discovery. """Wake the engine to restore GPU memory and re-register to discovery.
......
...@@ -15,11 +15,13 @@ import ( ...@@ -15,11 +15,13 @@ import (
) )
const ( const (
VLLMPort = "6379" VLLMPort = "6379"
dataParallelRPCPort = "13445" dataParallelRPCPort = "13445"
tensorParallelSizeFlag = "--tensor-parallel-size" tensorParallelSizeFlag = "--tensor-parallel-size"
pipelineParallelSizeFlag = "--pipeline-parallel-size" pipelineParallelSizeFlag = "--pipeline-parallel-size"
dataParallelSizeFlag = "--data-parallel-size" dataParallelSizeFlag = "--data-parallel-size"
dataParallelSizeLocalFlag = "--data-parallel-size-local"
enableElasticEPFlag = "--enable-elastic-ep"
) )
type VLLMBackend struct { type VLLMBackend struct {
...@@ -200,6 +202,17 @@ func (b *VLLMBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32 ...@@ -200,6 +202,17 @@ func (b *VLLMBackend) UpdatePodSpec(podSpec *corev1.PodSpec, numberOfNodes int32
return return
} }
// Elastic EP workers use a Ray cluster, not the MP coordinator. The worker
// command (injected by injectElasticEPRayLaunchFlags) already contains an
// inline health gate that polls DynamoSystemPort (9090) before joining Ray.
// The MP init container waits on VLLMMpMasterPort (29500), which never opens
// in the elastic EP path — injecting it would cause the worker to hang forever.
if component.ExtraPodSpec != nil && component.ExtraPodSpec.MainContainer != nil {
if hasFlag(getExpandedArgs(component.ExtraPodSpec.MainContainer), enableElasticEPFlag) {
return
}
}
if len(podSpec.Containers) == 0 || b.ParentGraphDeploymentName == "" { if len(podSpec.Containers) == 0 || b.ParentGraphDeploymentName == "" {
return return
} }
...@@ -253,6 +266,18 @@ func updateVLLMMultinodeArgs(container *corev1.Container, role Role, serviceName ...@@ -253,6 +266,18 @@ func updateVLLMMultinodeArgs(container *corev1.Container, role Role, serviceName
injectMpDistributedLaunchFlags(container, role, serviceName, multinodeDeployer, numberOfNodes) injectMpDistributedLaunchFlags(container, role, serviceName, multinodeDeployer, numberOfNodes)
} else if needsDistributed { } else if needsDistributed {
injectRayDistributedLaunchFlags(container, role, serviceName, multinodeDeployer) injectRayDistributedLaunchFlags(container, role, serviceName, multinodeDeployer)
} else if hasFlag(expandedArgs, enableElasticEPFlag) {
// Elastic EP requires a single Ray cluster spanning all nodes.
// The operator's RPC-based DP coordination (--data-parallel-hybrid-lb) is
// explicitly incompatible with elastic EP — vLLM raises NotImplementedError
// if both are present. Instead we set up a cross-node Ray cluster:
// Leader: ray start --head --block & <tcp-poll-ray-ready> && <vllm cmd>
// Worker: <poll /live until 200> && ray start --address=<leader>:6379 --block
// Note: --data-parallel-size-local is intentionally NOT injected. With the
// worker's health-gate delaying its Ray join until dynamo.vllm is fully ready,
// only the leader node is in the Ray cluster when create_dp_placement_groups runs,
// so vLLM naturally places all initial DP workers on the leader node.
injectElasticEPRayLaunchFlags(container, role, serviceName, multinodeDeployer)
} else if needsDataParallelMultinodeLaunch(expandedArgs, resources) { } else if needsDataParallelMultinodeLaunch(expandedArgs, resources) {
injectDataParallelLaunchFlags(container, role, serviceName, multinodeDeployer, resources, numberOfNodes) injectDataParallelLaunchFlags(container, role, serviceName, multinodeDeployer, resources, numberOfNodes)
} else { } else {
...@@ -349,6 +374,97 @@ func injectRayDistributedLaunchFlags(container *corev1.Container, role Role, ser ...@@ -349,6 +374,97 @@ func injectRayDistributedLaunchFlags(container *corev1.Container, role Role, ser
container.Command = []string{"/bin/sh", "-c"} // ensure cmd is a shell container.Command = []string{"/bin/sh", "-c"} // ensure cmd is a shell
} }
// injectElasticEPRayLaunchFlags rewrites the container so that it takes part
// in the single cross-node Ray cluster that elastic EP runs on.
//
// Background: elastic EP needs --data-parallel-backend ray so that vLLM's Ray
// executor owns the dynamic worker lifecycle. It cannot be combined with
// --data-parallel-hybrid-lb (the operator's regular multinode DP path) because
// elastic EP relies on one API server and one core client to coordinate
// scale up/down. The Ray TP/PP topology is reused instead: the leader starts
// the Ray head and runs vLLM; workers only join the cluster and offer their
// GPUs as idle resources.
//
// Resulting shell scripts (run through `/bin/sh -c`):
//
//	Leader: ray start --head --port=6379 --block & <tcp-poll-ray-ready 150×2s> && <vllm cmd>
//	Worker: <poll /live HTTP until 200> && ray start --address=<leader>:6379 --block
//
// Worker health-gate. The worker does not join Ray until GET /live on the
// leader's DynamoSystemPort (9090) answers HTTP 200. This ordering matters
// for DP placement:
//   - Port 9090 (system status server) opens early in vLLM startup, before
//     create_dp_placement_groups runs.
//   - /live answers 503 while initializing and 200 only once the engine is
//     fully ready (placement groups created, model loaded).
//   - A worker that joins before /live → 200 makes create_dp_placement_groups
//     see the GPUs of both nodes and create too many placement groups, which
//     fails with "AssertionError: Created N DP placement groups, expected dp_size".
//   - Gating on HTTP 200 guarantees the worker joins after placement groups
//     are fixed, so the leader's GPUs host all initial DP workers (warm standby).
//
// --data-parallel-size-local is deliberately NOT injected: because only the
// leader is in the Ray cluster when vLLM starts, vLLM places all
// --data-parallel-size workers on the leader by itself.
//
// Parameters:
//   - container: mutated in place; Args becomes a one-element script and
//     Command becomes {"/bin/sh", "-c"}.
//   - role: RoleLeader or RoleWorker select the script; for any other role
//     only Command is replaced and Args are left untouched.
//   - serviceName, multinodeDeployer: used to resolve the leader hostname the
//     worker polls and joins.
func injectElasticEPRayLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer) {
	// quoteAll shell-quotes each token so the original command line survives
	// being embedded in the single `sh -c` script string.
	quoteAll := func(tokens []string) []string {
		quoted := make([]string, 0, len(tokens))
		for _, token := range tokens {
			quoted = append(quoted, shellQuoteForBashC(token))
		}
		return quoted
	}

	if role == RoleLeader {
		// The user's original command + args, re-assembled as one shell command.
		originalLaunch := strings.Join(quoteAll(container.Command), " ") + " " +
			strings.Join(quoteAll(container.Args), " ")

		// Ray head runs in the background (`&`) so the script can continue.
		startRayHead := fmt.Sprintf(`ray start --head --port=%s --block & `, VLLMPort)

		// Bounded readiness poll (150 × 2 s = 5 min max). An unbounded `until`
		// loop would spin forever if `ray start --head` crashed silently or the
		// port never opened.
		waitForRayHead := fmt.Sprintf(
			`i=0; until python3 -c "import socket; s=socket.create_connection(('127.0.0.1',%s),timeout=1); s.close()" 2>/dev/null; `+
				`do i=$((i+1)); [ "$i" -ge 150 ] && { echo "ERROR: Ray head did not start within 300s" >&2; exit 1; }; sleep 2; done`,
			VLLMPort,
		)

		container.Args = []string{startRayHead + waitForRayHead + " && " + originalLaunch}
	} else if role == RoleWorker {
		leader := multinodeDeployer.GetLeaderHostname(serviceName)

		// Health-gate: poll GET /live on DynamoSystemPort (9090) until HTTP 200,
		// so the worker joins Ray only AFTER create_dp_placement_groups has run
		// on the leader. Python's urllib is used instead of curl because it is
		// always available in the image.
		//
		// Prerequisite: DYN_SYSTEM_ENABLED=true must be set on the leader pod so
		// the Dynamo system server listens on port 9090; the operator injects
		// that env var unconditionally via component_worker.go.
		//
		// The loop is bounded at 720 × 15s = 3 hours to cover large models with
		// slow disk I/O. Without a bound, a permanently broken leader would
		// leave the worker looping forever with no Kubernetes liveness probe to
		// notice (probes are removed at the UpdatePodSpec level for elastic EP
		// workers).
		waitForLeaderLive := fmt.Sprintf(
			`i=0; until python3 -c "import urllib.request; urllib.request.urlopen('http://%s:%d/live', timeout=5)" `+
				`2>/dev/null; do `+
				`i=$((i+1)); [ "$i" -ge 720 ] && { echo "ERROR: leader /live did not become ready within 3h" >&2; exit 1; }; `+
				`echo 'waiting for leader dynamo.vllm /live to return 200...'; sleep 15; done`,
			leader, commonconsts.DynamoSystemPort,
		)
		joinRay := fmt.Sprintf("ray start --address=%s:%s --block", leader, VLLMPort)

		container.Args = []string{waitForLeaderLive + " && " + joinRay}
	}

	// The args above are a single script string, so the entrypoint must be a shell.
	container.Command = []string{"/bin/sh", "-c"}
}
// hasFlag reports whether flag occurs in expandedArgs as an exact, standalone
// token. Matching is plain string equality, so a combined "--flag=value"
// token does not count as a match for "--flag". A nil or empty slice yields
// false.
func hasFlag(expandedArgs []string, flag string) bool {
	found := false
	for _, candidate := range expandedArgs {
		if candidate == flag {
			found = true
			break
		}
	}
	return found
}
func injectDataParallelLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, resources *v1alpha1.Resources, numberOfNodes int32) { func injectDataParallelLaunchFlags(container *corev1.Container, role Role, serviceName string, multinodeDeployer MultinodeDeployer, resources *v1alpha1.Resources, numberOfNodes int32) {
expandedArgs := getExpandedArgs(container) expandedArgs := getExpandedArgs(container)
leaderHostname := multinodeDeployer.GetLeaderHostname(serviceName) leaderHostname := multinodeDeployer.GetLeaderHostname(serviceName)
...@@ -367,27 +483,17 @@ func injectDataParallelLaunchFlags(container *corev1.Container, role Role, servi ...@@ -367,27 +483,17 @@ func injectDataParallelLaunchFlags(container *corev1.Container, role Role, servi
var flags []string var flags []string
needsShell := false needsShell := false
// Helper to check if flag already exists in args
hasFlag := func(flag string) bool {
for _, arg := range expandedArgs {
if arg == flag {
return true
}
}
return false
}
switch role { switch role {
case RoleLeader: case RoleLeader:
// Leader runs API server + coordinator + local engines // Leader runs API server + coordinator + local engines
// Hybrid LB mode: local DP coordination within node, Dynamo routes between nodes // Hybrid LB mode: local DP coordination within node, Dynamo routes between nodes
flags = []string{"--data-parallel-hybrid-lb"} flags = []string{"--data-parallel-hybrid-lb"}
// Only inject --data-parallel-size if not already present (avoids duplicates from profiler) // Only inject --data-parallel-size if not already present (avoids duplicates from profiler)
if !hasFlag("--data-parallel-size") { if !hasFlag(expandedArgs, dataParallelSizeFlag) {
flags = append(flags, "--data-parallel-size", strconv.FormatInt(totalDPSize, 10)) flags = append(flags, dataParallelSizeFlag, strconv.FormatInt(totalDPSize, 10))
} }
flags = append(flags, flags = append(flags,
"--data-parallel-size-local", strconv.FormatInt(dataParallelSizeLocal, 10), dataParallelSizeLocalFlag, strconv.FormatInt(dataParallelSizeLocal, 10),
"--data-parallel-start-rank", "0", "--data-parallel-start-rank", "0",
"--data-parallel-address", leaderHostname, "--data-parallel-address", leaderHostname,
"--data-parallel-rpc-port", dataParallelRPCPort, "--data-parallel-rpc-port", dataParallelRPCPort,
...@@ -402,11 +508,11 @@ func injectDataParallelLaunchFlags(container *corev1.Container, role Role, servi ...@@ -402,11 +508,11 @@ func injectDataParallelLaunchFlags(container *corev1.Container, role Role, servi
flags = []string{"--data-parallel-hybrid-lb"} flags = []string{"--data-parallel-hybrid-lb"}
// Only inject --data-parallel-size if not already present (avoids duplicates from profiler) // Only inject --data-parallel-size if not already present (avoids duplicates from profiler)
if !hasFlag("--data-parallel-size") { if !hasFlag(expandedArgs, dataParallelSizeFlag) {
flags = append(flags, "--data-parallel-size", strconv.FormatInt(totalDPSize, 10)) flags = append(flags, dataParallelSizeFlag, strconv.FormatInt(totalDPSize, 10))
} }
flags = append(flags, flags = append(flags,
"--data-parallel-size-local", strconv.FormatInt(dataParallelSizeLocal, 10), dataParallelSizeLocalFlag, strconv.FormatInt(dataParallelSizeLocal, 10),
"--data-parallel-start-rank", startRank, "--data-parallel-start-rank", startRank,
"--data-parallel-address", leaderHostname, "--data-parallel-address", leaderHostname,
"--data-parallel-rpc-port", dataParallelRPCPort, "--data-parallel-rpc-port", dataParallelRPCPort,
......
...@@ -417,6 +417,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -417,6 +417,7 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
annotations map[string]string // nil = legacy (no annotations) annotations map[string]string // nil = legacy (no annotations)
expectedArgs []string expectedArgs []string
expectNotModified bool expectNotModified bool
description string
}{ }{
{ {
name: "leader uses ray (nil annotations = legacy)", name: "leader uses ray (nil annotations = legacy)",
...@@ -555,6 +556,85 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) { ...@@ -555,6 +556,85 @@ func TestUpdateVLLMMultinodeArgs(t *testing.T) {
annotations: nil, annotations: nil,
expectNotModified: true, expectNotModified: true,
}, },
// Elastic EP tests: --enable-elastic-ep must use Ray cluster path,
// never the --data-parallel-hybrid-lb RPC path.
//
// Leader: ray start --head --port=6379 --block & <tcp-poll-ray-ready 150×2s> && <vllm cmd> (no --data-parallel-size-local injected)
// Worker: health-gate on DynamoSystemPort (9090) && ray start --address=<leader> --block
//
// The health-gate ensures the worker only joins Ray after dynamo.vllm is fully
// serving, so create_dp_placement_groups sees only the leader node and places all
// initial DP workers there (warm standby).
{
name: "elastic EP leader Grove: ray head start + vllm serve",
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{
Command: []string{"python3", "-m", "dynamo.vllm"},
Args: []string{"--model", "test", dataParallelSizeFlag, "4", "--data-parallel-backend", "ray", enableElasticEPFlag},
},
gpuCount: 2,
annotations: nil,
expectedArgs: []string{fmt.Sprintf(
`ray start --head --port=%s --block & i=0; until python3 -c "import socket; s=socket.create_connection(('127.0.0.1',%s),timeout=1); s.close()" 2>/dev/null; do i=$((i+1)); [ "$i" -ge 150 ] && { echo "ERROR: Ray head did not start within 300s" >&2; exit 1; }; sleep 2; done && python3 -m dynamo.vllm --model test %s 4 --data-parallel-backend ray %s`,
VLLMPort, VLLMPort, dataParallelSizeFlag, enableElasticEPFlag,
)},
description: "Operator prepends ray head start and TCP readiness poll; --data-parallel-hybrid-lb and --data-parallel-size-local are NOT injected (elastic EP uses Ray for GPU assignment, not the RPC path)",
},
{
name: "elastic EP worker Grove: health-gate then ray join",
role: RoleWorker,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{
Command: []string{"python3", "-m", "dynamo.vllm"},
Args: []string{"--model", "test", dataParallelSizeFlag, "4", "--data-parallel-backend", "ray", enableElasticEPFlag},
},
gpuCount: 2,
annotations: nil,
expectedArgs: []string{fmt.Sprintf(
`i=0; until python3 -c "import urllib.request; urllib.request.urlopen('http://%s:%d/live', timeout=5)" 2>/dev/null; do i=$((i+1)); [ "$i" -ge 720 ] && { echo "ERROR: leader /live did not become ready within 3h" >&2; exit 1; }; echo 'waiting for leader dynamo.vllm /live to return 200...'; sleep 15; done && ray start --address=%s:%s --block`,
"$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)",
commonconsts.DynamoSystemPort,
"$(GROVE_PCSG_NAME)-$(GROVE_PCSG_INDEX)-test-service-ldr-0.$(GROVE_HEADLESS_SERVICE)",
VLLMPort,
)},
description: "Operator replaces the entire command with a /live HTTP health-gate then ray join; vllm does NOT run on the worker (worker provides idle GPUs for warm standby, claimed on scale-up)",
},
{
name: "elastic EP leader Grove: user-specified --data-parallel-size-local preserved",
role: RoleLeader,
multinodeDeployer: &GroveMultinodeDeployer{},
initialContainer: &corev1.Container{
Command: []string{"python3", "-m", "dynamo.vllm"},
Args: []string{"--model", "test", dataParallelSizeFlag, "4", "--data-parallel-backend", "ray", enableElasticEPFlag, dataParallelSizeLocalFlag, "2"},
},
gpuCount: 2,
annotations: nil,
expectedArgs: []string{fmt.Sprintf(
`ray start --head --port=%s --block & i=0; until python3 -c "import socket; s=socket.create_connection(('127.0.0.1',%s),timeout=1); s.close()" 2>/dev/null; do i=$((i+1)); [ "$i" -ge 150 ] && { echo "ERROR: Ray head did not start within 300s" >&2; exit 1; }; sleep 2; done && python3 -m dynamo.vllm --model test %s 4 --data-parallel-backend ray %s %s 2`,
VLLMPort, VLLMPort, dataParallelSizeFlag, enableElasticEPFlag, dataParallelSizeLocalFlag,
)},
description: "Operator prepends ray head start but does not override a user-specified --data-parallel-size-local",
},
{
name: "elastic EP worker LWS: health-gate then ray join",
role: RoleWorker,
multinodeDeployer: &LWSMultinodeDeployer{},
initialContainer: &corev1.Container{
Command: []string{"python3", "-m", "dynamo.vllm"},
Args: []string{"--model", "test", dataParallelSizeFlag, "4", "--data-parallel-backend", "ray", enableElasticEPFlag},
},
gpuCount: 2,
annotations: nil,
expectedArgs: []string{fmt.Sprintf(
`i=0; until python3 -c "import urllib.request; urllib.request.urlopen('http://%s:%d/live', timeout=5)" 2>/dev/null; do i=$((i+1)); [ "$i" -ge 720 ] && { echo "ERROR: leader /live did not become ready within 3h" >&2; exit 1; }; echo 'waiting for leader dynamo.vllm /live to return 200...'; sleep 15; done && ray start --address=%s:%s --block`,
"$(LWS_LEADER_ADDRESS)",
commonconsts.DynamoSystemPort,
"$(LWS_LEADER_ADDRESS)",
VLLMPort,
)},
description: "Same as Grove worker but uses $(LWS_LEADER_ADDRESS) (kubelet-expanded) instead of the Grove-specific DNS address",
},
} }
for _, tt := range tests { for _, tt := range tests {
...@@ -708,6 +788,56 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) { ...@@ -708,6 +788,56 @@ func TestVLLMBackend_UpdatePodSpec(t *testing.T) {
expectedInitImage: "vllm:latest", expectedInitImage: "vllm:latest",
expectedLeaderHost: "${GROVE_PCSG_NAME}-${GROVE_PCSG_INDEX}-test-service-ldr-0.${GROVE_HEADLESS_SERVICE}", expectedLeaderHost: "${GROVE_PCSG_NAME}-${GROVE_PCSG_INDEX}-test-service-ldr-0.${GROVE_HEADLESS_SERVICE}",
}, },
// Elastic EP regression: UpdatePodSpec must NOT inject the MP init container
// when --enable-elastic-ep is present in ExtraPodSpec.MainContainer. The elastic
// EP path uses a Ray cluster, not the MP coordinator; the MP init container waits
// on VLLMMpMasterPort (29500) which never opens in the elastic EP path.
{
name: "elastic EP worker Grove: no MP init container injected",
numberOfNodes: 2,
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
ExtraPodSpec: &v1alpha1.ExtraPodSpec{
MainContainer: &corev1.Container{
Command: []string{"python3", "-m", "dynamo.vllm"},
Args: []string{"--model", "test", dataParallelSizeFlag, "4", "--data-parallel-backend", "ray", enableElasticEPFlag},
},
},
},
multinodeDeployer: &GroveMultinodeDeployer{},
initialPodSpec: &corev1.PodSpec{
Containers: []corev1.Container{
{Name: "main", Image: "vllm:latest"},
},
},
expectInitContainer: false,
},
{
name: "elastic EP worker LWS: no MP init container injected",
numberOfNodes: 2,
role: RoleWorker,
component: &v1alpha1.DynamoComponentDeploymentSharedSpec{
Annotations: map[string]string{
commonconsts.KubeAnnotationDynamoOperatorOriginVersion: "1.0.0",
},
ExtraPodSpec: &v1alpha1.ExtraPodSpec{
MainContainer: &corev1.Container{
Command: []string{"python3", "-m", "dynamo.vllm"},
Args: []string{"--model", "test", dataParallelSizeFlag, "4", "--data-parallel-backend", "ray", enableElasticEPFlag},
},
},
},
multinodeDeployer: &LWSMultinodeDeployer{},
initialPodSpec: &corev1.PodSpec{
Containers: []corev1.Container{
{Name: "main", Image: "vllm:v2"},
},
},
expectInitContainer: false,
},
} }
for _, tt := range tests { for _, tt := range tests {
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Multi-Node Elastic EP Scaling Deployment — 3 GPUs per node (6 total)
#
# Warm-standby topology: leader node has 2 active DP ranks; worker node's 3 GPUs
# sit idle in the shared Ray cluster, ready to be claimed on scale-up.
#
# vLLM constraint: --enable-elastic-ep requires --enable-eplb, and --enable-eplb
# requires dp>=2 (or tp>=2). dp=1 is not a valid starting point; scale DOWN to
# dp=1 crashes vLLM's _eplb_reshuffle_before_scale_down — avoid dp=1 at runtime.
#
# Topology:
# - 2 nodes × 3 GPUs each = 6 GPUs total in one Ray cluster
# - Leader pod: ray start --head && dynamo.vllm ... --data-parallel-size 2
# - Worker pod: polls leader port 9090 until dynamo.vllm is ready, then
# ray start --address=<leader>:6379 --block (no vLLM, GPUs idle)
# - scale_elastic_ep endpoint: leader pod port 9090 only
#
# Scale sequence (run_elastic_ep_scale_test_multinode_3gpu.sh):
# dp=2 → dp=3 → dp=4 → dp=5 → dp=6 → dp=5 → dp=4 → dp=3 → dp=2 → dp=4 → dp=6 → dp=2
#
# Node placement per step:
# dp=2: leader GPU 0+1 active ← baseline (2 leader GPUs)
# dp=3: leader GPU 0+1+2 ← all leader GPUs
# dp=4: leader GPU 0+1+2, worker GPU 0 ← first cross-node actor
# dp=5: leader GPU 0+1+2, worker GPU 0+1
# dp=6: leader GPU 0+1+2, worker GPU 0+1+2 ← full capacity
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: ep-mn
spec:
services:
Frontend:
componentType: frontend
replicas: 1
extraPodSpec:
nodeSelector:
karpenter.sh/nodepool: general-medium-storage
mainContainer:
image: ${IMAGE}
VllmDecodeWorker:
envFromSecret: hf-token-secret
componentType: worker
replicas: 1
multinode:
nodeCount: 2
resources:
requests:
memory: "90Gi"
gpu: "3"
limits:
memory: "180Gi"
gpu: "3"
envs:
- name: DYN_SYSTEM_ENABLED
value: "true"
- name: VLLM_ALL2ALL_BACKEND
value: "allgather_reducescatter"
- name: VLLM_USE_ELASTIC_EP
value: "1"
- name: VLLM_USE_V1
value: "1"
- name: VLLM_WORKER_MULTIPROC_METHOD
value: "spawn"
- name: VLLM_LOGGING_LEVEL
value: "INFO"
- name: HF_HUB_ENABLE_HF_TRANSFER
value: "1"
- name: HF_HOME
value: "/model-cache"
- name: NCCL_IB_DISABLE
value: "1"
- name: NCCL_SOCKET_IFNAME
value: "eth0"
extraPodSpec:
imagePullSecrets:
- name: nvcr-imagepullsecret
volumes:
- name: model-cache
persistentVolumeClaim:
claimName: model-cache
mainContainer:
image: ${IMAGE}
imagePullPolicy: Always
workingDir: /workspace/examples/backends/vllm
volumeMounts:
- name: model-cache
mountPath: /model-cache
command:
- python3
- -m
- dynamo.vllm
args:
- --model
- deepseek-ai/DeepSeek-V2-Lite
- --trust-remote-code
- --tensor-parallel-size
- "1"
- --data-parallel-size
- "2"
- --data-parallel-backend
- ray
- --gpu-memory-utilization
- "0.8"
- --max-model-len
- "4096"
- --enable-expert-parallel
- --enable-elastic-ep
- --enable-eplb
- --eplb-config.num_redundant_experts
- "0"
- --no-enable-prefix-caching
- --enforce-eager
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Multi-Node Elastic EP Scale Test — 3 GPU/node (6 GPUs total)
#
# Warm-standby topology:
# - 2 nodes × 3 GPUs each = 6 GPUs total
# - Baseline dp=2: 2 leader GPUs active, 4 GPUs idle (1 leader + 3 worker)
#
# Note: --enable-elastic-ep requires --enable-eplb, and --enable-eplb requires
# dp>=2 at startup. Scale DOWN to dp=1 crashes vLLM's _eplb_reshuffle_before_scale_down
# (vLLM bug) — dp=1 is intentionally excluded from this test sequence.
#
# Scale sequence:
# dp=2 → dp=3 → dp=4 → dp=5 → dp=6 → dp=5 → dp=4 → dp=3 → dp=2 → dp=4 → dp=6 → dp=2
#
# Node placement per step:
# dp=2: leader GPU 0+1 ← baseline
# dp=3: leader GPU 0+1+2 ← all leader GPUs used
# dp=4: leader GPU 0+1+2, worker GPU 0 ← first cross-node actor
# dp=5: leader GPU 0+1+2, worker GPU 0+1
# dp=6: leader GPU 0+1+2, worker GPU 0+1+2 ← full capacity
#
# nvidia-smi memory usage is captured from BOTH pods after every scale step so
# we can observe which node's GPUs become active as ranks are added or removed.
#
# Usage:
# ./run_elastic_ep_scale_test_multinode_3gpu.sh [NAMESPACE] [DEPLOYMENT_NAME]
#
# Defaults:
# NAMESPACE = tzulingk-multinode-elastic
# DEPLOYMENT_NAME = ep-mn
# Fail on unset variables and on failures anywhere in a pipeline. Note that
# errexit (-e) is intentionally not enabled here; individual commands check
# their own status where it matters.
set -u
set -o pipefail

# Positional arguments with their documented defaults.
NS="${1:-tzulingk-multinode-elastic}"
DEPLOYMENT_NAME="${2:-ep-mn}"
# Model name sent in inference requests; must match the deployed model.
MODEL="deepseek-ai/DeepSeek-V2-Lite"

# Echo the effective configuration, followed by a blank separator line.
printf '%s\n' \
  "Namespace: $NS" \
  "Deployment: $DEPLOYMENT_NAME" \
  "Model: $MODEL" \
  ""
# ── Pod lookup helpers ────────────────────────────────────────────────────────
# Print the names of every Running VllmDecodeWorker pod (both nodes) that
# belongs to $DEPLOYMENT_NAME in namespace $NS, all on one line.
# kubectl's stderr is discarded, so the output is simply empty when the lookup
# fails or nothing matches.
all_worker_pods() {
  local selector="nvidia.com/dynamo-component=VllmDecodeWorker,nvidia.com/dynamo-graph-deployment-name=$DEPLOYMENT_NAME"
  kubectl get pods \
    --namespace "$NS" \
    --selector "$selector" \
    --field-selector=status.phase=Running \
    --output jsonpath='{.items[*].metadata.name}' 2>/dev/null
}
# Print the name of the leader pod: the Running VllmDecodeWorker pod whose name
# sorts first (lowest-sorted name = rank-0).
# Only the leader exposes port 9090 (scale API), so scale requests go here.
# kubectl's stderr is discarded; the output is empty when no pod is found.
head_pod() {
  local selector="nvidia.com/dynamo-component=VllmDecodeWorker,nvidia.com/dynamo-graph-deployment-name=$DEPLOYMENT_NAME"
  kubectl get pods \
    --namespace "$NS" \
    --selector "$selector" \
    --field-selector=status.phase=Running \
    --sort-by='.metadata.name' \
    --output jsonpath='{.items[0].metadata.name}' 2>/dev/null
}
# Print the name of the first Running Frontend pod of $DEPLOYMENT_NAME in
# namespace $NS. kubectl's stderr is discarded; the output is empty when no
# frontend pod is found.
frontend_pod() {
  local selector="nvidia.com/dynamo-component=Frontend,nvidia.com/dynamo-graph-deployment-name=$DEPLOYMENT_NAME"
  kubectl get pods \
    --namespace "$NS" \
    --selector "$selector" \
    --field-selector=status.phase=Running \
    --output jsonpath='{.items[0].metadata.name}' 2>/dev/null
}
# Verify pods are present
INITIAL_HEAD=$(head_pod)
if [ -z "$INITIAL_HEAD" ]; then
  echo "ERROR: no running VllmDecodeWorker pod found in namespace $NS" >&2
  exit 1
fi
echo "Leader pod (at start): $INITIAL_HEAD"
echo "All worker pods: $(all_worker_pods)"
# ── Wait for leader ready ─────────────────────────────────────────────────────
echo ""
echo "=== Waiting for leader pod to be Ready ==="
# errexit is not enabled for this script, so the status of `kubectl wait` must
# be checked explicitly; otherwise a timeout would be silently ignored and the
# "Ready at" line below would be printed for a pod that never became Ready.
if ! kubectl wait pod/"$(head_pod)" -n "$NS" --for=condition=Ready --timeout=900s; then
  echo "ERROR: leader pod did not become Ready within 900s" >&2
  exit 1
fi
echo "Ready at $(date -u +%Y-%m-%dT%H:%M:%SZ)"
# ── Wait for inference endpoint ───────────────────────────────────────────────
# Use kubectl exec for all API calls — kubectl port-forward tunnels through the
# Teleport/AKS API proxy and dies after a single connection drop, never
# recovering. kubectl exec opens a fresh connection per call.
echo "=== Waiting for inference endpoint ==="
# Pre-initialise so the post-loop check is safe under `set -u` regardless of
# how the loop exits.
CODE=""
# Poll up to 60 times, 5 s apart (~5 min), until /v1/models answers HTTP 200.
for i in $(seq 1 60); do
  fpod=$(frontend_pod)
  CODE=$(kubectl exec "$fpod" -n "$NS" -- \
    curl -s -o /dev/null -w "%{http_code}" -m 5 http://localhost:8000/v1/models 2>/dev/null)
  if [ "$CODE" = "200" ]; then
    echo "Endpoint ready (checked after ~$((i * 5))s)"
    break
  fi
  sleep 5
done
if [ "$CODE" != "200" ]; then
  echo "ERROR: inference endpoint never became ready (last HTTP code: ${CODE:-none})" >&2
  exit 1
fi
# ── Helpers ───────────────────────────────────────────────────────────────────
# Print a diagnostic snapshot for every running worker pod so that it is
# visible which node currently holds active GPUs.
#
# Arguments:
#   $1  label included in each section header (e.g. "after dp=4")
#
# For each pod this prints:
#   - nvidia-smi per-GPU index / memory used / memory free / utilization (CSV)
#   - PID and command of processes whose `ps aux` line matches
#     DPMoEEngineCoreActor or RayWorkerWrapper
# Failures of individual kubectl calls are not fatal; their output (including
# stderr) is shown inline.
snapshot() {
  local tag="$1"
  local worker_pods
  worker_pods=$(all_worker_pods)
  echo ""
  # Unquoted on purpose: the pod list is whitespace-separated.
  for pod in $worker_pods; do
    node=$(kubectl get pod "$pod" --namespace "$NS" --output jsonpath='{.spec.nodeName}' 2>/dev/null)
    echo "--- nvidia-smi ($tag) pod=$pod node=$node ---"
    kubectl exec "$pod" --namespace "$NS" -- \
      nvidia-smi --query-gpu=index,memory.used,memory.free,utilization.gpu --format=csv 2>&1
    echo "--- Ray actors ($tag) pod=$pod ---"
    kubectl exec "$pod" --namespace "$NS" -- ps aux 2>&1 \
      | awk '/DPMoEEngineCoreActor|RayWorkerWrapper/{printf "PID=%-8s CMD=%s\n", $2, $11}'
  done
}
# Send one small completion request through the frontend and print the result.
#
# Arguments:
#   $1  label included in the section header (e.g. "dp=4")
#
# The request is issued with curl from inside the frontend pod via
# `kubectl exec`, which avoids depending on a persistent port-forward (those
# die after the first connection on AKS). On success the generated text and
# the nvext timing (if present) are printed; if the response cannot be parsed
# as the expected JSON, the raw response is printed instead.
# Side effect: the raw response is left in the global RESP.
infer() {
  local tag="$1"
  echo ""
  echo "--- inference ($tag) ---"
  local frontend
  frontend=$(frontend_pod)
  [ -n "$frontend" ] || {
    echo " (no frontend pod found — skipping)"
    return
  }
  # Deterministic, tiny request: temperature 0 and at most 16 tokens.
  local payload="{\"model\":\"$MODEL\",\"prompt\":\"2+2=\",\"max_tokens\":16,\"temperature\":0}"
  RESP=$(kubectl exec "$frontend" --namespace "$NS" -- \
    curl --silent --max-time 60 http://localhost:8000/v1/completions \
    --header "Content-Type: application/json" \
    --data "$payload" \
    2>&1)
  echo "$RESP" | python3 -c "
import sys, json
d = json.load(sys.stdin)
text = d['choices'][0]['text'].strip()
timing = d.get('nvext', {}).get('timing', {}).get('total_time_ms', 'n/a')
print('text:', repr(text), ' time_ms:', timing)
" 2>/dev/null || echo "raw response: $RESP"
}
# Requests an elastic-EP resize on the leader pod and verifies the result.
#
# Arguments:
#   $1  from_dp  current data-parallel size (used for logging only)
#   $2  to_dp    requested data-parallel size
#   $3  timeout  curl --max-time in seconds (default 700)
#
# Sets the globals RESP, SCALE_STATUS and SCALE_DP. Exits the script with
# status 1 when no leader pod is found, or when the response does not report
# status "ok" with new_data_parallel_size equal to to_dp. On success it takes
# a snapshot and runs one inference at the new size.
scale() {
    local from_dp="$1"
    local to_dp="$2"
    local timeout="${3:-700}"
    # Resolve the leader once so the pod that is logged is the pod that is used.
    local lpod
    lpod=$(head_pod)
    echo ""
    echo "=========================================="
    echo "SCALE dp=$from_dp → dp=$to_dp at $(date -u +%Y-%m-%dT%H:%M:%SZ)"
    echo " leader pod: $lpod"
    echo " all pods: $(all_worker_pods)"
    echo "=========================================="
    if [ -z "$lpod" ]; then
        # Fail with a clear message instead of an opaque kubectl error.
        echo "ERROR: no leader pod found; cannot scale to dp=$to_dp" >&2
        exit 1
    fi
    RESP=$(kubectl exec "$lpod" -n "$NS" -- \
        curl -s -X POST http://localhost:9090/engine/scale_elastic_ep \
        -H "Content-Type: application/json" \
        -d "{\"new_data_parallel_size\": $to_dp}" \
        --max-time "$timeout" \
        2>&1)
    echo "--- scale response ---"
    # printf instead of echo so the response is printed and parsed verbatim.
    printf '%s\n' "$RESP"
    # A response that is not valid JSON leaves both variables empty, which the
    # check below treats as a failure.
    SCALE_STATUS=$(printf '%s\n' "$RESP" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('status',''))" 2>/dev/null)
    SCALE_DP=$(printf '%s\n' "$RESP" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('new_data_parallel_size',''))" 2>/dev/null)
    if [ "$SCALE_STATUS" != "ok" ] || [ "$SCALE_DP" != "$to_dp" ]; then
        echo "ERROR: scale to dp=$to_dp failed: $RESP" >&2
        exit 1
    fi
    snapshot "after dp=$to_dp"
    infer "dp=$to_dp"
}
# ── Baseline ──────────────────────────────────────────────────────────────────
# Record the GPU/actor state and run one inference at dp=2 before any scale
# step, as a reference point for the snapshots taken after each step.
printf '\n==========================================\n'
printf 'BASELINE dp=2 at %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
printf '==========================================\n'
snapshot "baseline dp=2"
infer "dp=2"
# ── Scale sequence (11 steps) — dp=1 excluded (vLLM _eplb_reshuffle bug) ─────
# Each call is `scale FROM TO TIMEOUT_SECONDS`. scale() exits the script on the
# first failed step, so the steps run strictly in order and every step starts
# from the size the previous one reached. Scale-up steps are given a 700 s
# timeout, scale-down steps 300 s.
#
# Steps 1-4: grow one rank at a time from dp=2 to dp=6.
scale 2 3 700 # step 1: dp=2 → dp=3 (within-leader: 3rd GPU on leader)
scale 3 4 700 # step 2: dp=3 → dp=4 (cross-node: first actor on worker)
scale 4 5 700 # step 3: dp=4 → dp=5 (cross-node: 2nd actor on worker)
scale 5 6 700 # step 4: dp=5 → dp=6 (cross-node: worker fully active)
# Steps 5-8: shrink one rank at a time back to dp=2.
scale 6 5 300 # step 5: dp=6 → dp=5 (scale down: remove 1 from worker)
scale 5 4 300 # step 6: dp=5 → dp=4 (scale down)
scale 4 3 300 # step 7: dp=4 → dp=3 (scale down: worker back to idle)
scale 3 2 300 # step 8: dp=3 → dp=2 (scale down: back to baseline)
# Steps 9-11: multi-rank jumps in both directions.
scale 2 4 700 # step 9: dp=2 → dp=4 (jump up: skip dp=3)
scale 4 6 700 # step 10: dp=4 → dp=6 (jump to full capacity)
scale 6 2 300 # step 11: dp=6 → dp=2 (jump down: back to baseline)
# Only reached when every step above succeeded.
echo ""
echo "=== ALL STEPS COMPLETE at $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment