Unverified Commit daa4fb04 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat(vllm): elastic EP scaling — ray.nodes() patch, scale_elastic_ep route,...


feat(vllm): elastic EP scaling — ray.nodes() patch, scale_elastic_ep route, null-tolerant discovery (#7481)
Signed-off-by: default avatartzulingk <tzulingk@nvidia.com>
parent 8c2a4681
......@@ -15,6 +15,8 @@ from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, Final, Generic, Optional, TypeVar
import ray
import ray.util.state as _ray_util_state
import torch
from vllm.config import VllmConfig
from vllm.inputs import EmbedsPrompt, TextPrompt, TokensPrompt
......@@ -56,6 +58,36 @@ from .multimodal_utils.hash_utils import compute_mm_uuids_from_images
from .multimodal_utils.model import construct_qwen_decode_mm_data, is_qwen_vl_model
from .multimodal_utils.prefill_worker_utils import MultiModalEmbeddingLoader
# TODO(upstream-vllm): remove this patch once vLLM fixes add_dp_placement_groups in
# vllm/v1/engine/utils.py to use ray.nodes() instead of ray.util.state.list_nodes().
#
# Patch ray.util.state.list_nodes to use the GCS API instead of the dashboard HTTP
# API (127.0.0.1:8265/api/v0/nodes). The dynamo image installs ray core only (not
# ray[default]), so the dashboard HTTP server starts in --minimal mode with the HTTP
# server disabled. vLLM's add_dp_placement_groups calls list_nodes() which requires
# that HTTP endpoint, causing scale_elastic_ep to fail with "Failed to connect to
# API server".
#
# ray.nodes() uses the GCS gRPC channel directly (no dashboard process needed) and
# returns the same information. This patch makes elastic EP scaling self-contained.
#
# Format mapping:
# list_nodes() → objects with .node_ip and .node_id
# ray.nodes() → dicts with "NodeManagerAddress" and "NodeID"
class _NodeInfo:
    """Attribute-style view of a single ``ray.nodes()`` entry.

    ``ray.nodes()`` yields plain dicts, while callers of
    ``ray.util.state.list_nodes()`` read ``.node_ip`` and ``.node_id``
    attributes. This adapter exposes exactly those two attributes.

    Args:
        d: One node dict as returned by ``ray.nodes()``. Must contain the
            ``"NodeManagerAddress"`` and ``"NodeID"`` keys.

    Raises:
        KeyError: If either required key is missing from ``d``.
    """

    # Only two fields are ever stored; slots keep instances small and make
    # accidental attribute typos fail loudly.
    __slots__ = ("node_ip", "node_id")

    def __init__(self, d: dict) -> None:
        self.node_ip: str = d["NodeManagerAddress"]
        self.node_id: str = d["NodeID"]

    def __repr__(self) -> str:
        # Makes node lists readable in logs and debuggers.
        return (
            f"{type(self).__name__}"
            f"(node_ip={self.node_ip!r}, node_id={self.node_id!r})"
        )
def _list_alive_nodes(*args: Any, **kwargs: Any) -> list:
    """Drop-in replacement for ``ray.util.state.list_nodes`` backed by ``ray.nodes()``.

    All positional and keyword arguments are accepted and ignored, so callers
    written against the original function (e.g. passing ``filters=...``) keep
    working regardless of how they pass their arguments. Only nodes whose
    ``"Alive"`` entry is truthy are returned.

    Returns:
        A list of ``_NodeInfo`` objects, one per alive node.
    """
    return [_NodeInfo(node) for node in ray.nodes() if node.get("Alive", False)]


# Module-level monkeypatch: every later lookup of
# ``ray.util.state.list_nodes`` through the module attribute gets the
# ray.nodes()-backed implementation above.
_ray_util_state.list_nodes = _list_alive_nodes
# Multimodal data dictionary keys
IMAGE_URL_KEY: Final = "image_url"
VIDEO_URL_KEY: Final = "video_url"
......@@ -495,6 +527,47 @@ class BaseWorkerHandler(ABC, Generic[RequestT, ResponseT]):
logger.error(f"Failed to sleep engine: {e}")
return {"status": "error", "message": str(e)}
async def scale_elastic_ep(self, body: dict) -> dict:
    """Scale the elastic expert-parallelism data-parallel size live.

    Args:
        body: Dict with required 'new_data_parallel_size' key. The value must
            be a positive integer, or a string / integral float that converts
            to one without loss (``"4"`` and ``4.0`` are accepted, ``2.5``,
            ``True`` and ``0`` are rejected).

    Example::

        {"new_data_parallel_size": 4}

    The vLLM Ray DP backend will spin up / tear down DP workers on the GPUs
    already reserved by the pod, then hot-swap the expert routing table.
    No pod restart is needed.

    Returns:
        A dict with ``"status"`` (``"ok"`` or ``"error"``) and ``"message"``.
        On success it also carries ``"new_data_parallel_size"``. Validation
        and engine failures are reported through the returned dict rather
        than raised.
    """
    body = body or {}
    if not isinstance(body, dict):
        return {
            "status": "error",
            "message": f"Request body must be a JSON object, got: {type(body).__name__}",
        }

    requested = body.get("new_data_parallel_size")
    if requested is None:
        return {
            "status": "error",
            "message": "Missing required field: new_data_parallel_size",
        }

    not_an_integer = {
        "status": "error",
        "message": f"new_data_parallel_size must be an integer, got: {requested!r}",
    }
    # bool is a subclass of int; int(True) == 1 would silently scale to dp=1.
    if isinstance(requested, bool):
        return not_an_integer
    try:
        new_dp_size = int(requested)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers float("inf"); ValueError covers NaN and bad strings.
        return not_an_integer
    # int() truncates floats; refuse values such as 2.5 instead of scaling to 2.
    if isinstance(requested, float) and requested != new_dp_size:
        return not_an_integer
    if new_dp_size < 1:
        return {
            "status": "error",
            "message": f"new_data_parallel_size must be >= 1, got: {new_dp_size}",
        }

    logger.info(f"[ElasticEP] Scaling to new_data_parallel_size={new_dp_size}")
    try:
        await self.engine_client.scale_elastic_ep(new_dp_size)
    except Exception as e:
        # Route boundary: report any engine failure to the caller as an error
        # payload, and keep the traceback in the log for diagnosis.
        logger.exception(f"[ElasticEP] Scaling failed: {e}")
        return {"status": "error", "message": str(e)}

    logger.info(f"[ElasticEP] Scaling to dp={new_dp_size} complete")
    return {
        "status": "ok",
        "message": f"Scaled to data_parallel_size={new_dp_size}",
        "new_data_parallel_size": new_dp_size,
    }
async def wake_up(self, body: dict) -> dict:
"""Wake the engine to restore GPU memory and re-register to discovery.
......
......@@ -609,7 +609,20 @@ async def register_vllm_model(
f"Getting engine runtime configuration metadata from vLLM engine for {model_type}..."
)
runtime_values = get_engine_cache_info(engine_client)
runtime_config.total_kv_blocks = runtime_values["num_gpu_blocks"]
num_gpu_blocks = runtime_values["num_gpu_blocks"]
if num_gpu_blocks is None:
# TODO(upstream-vllm): remove this workaround once vLLM propagates
# num_gpu_blocks from Ray DP workers back to the main-process vllm_config.
# With Ray-based data-parallel backend, num_gpu_blocks is computed inside
# Ray worker processes and is never written back to the main-process
# vllm_config. Use 0 as a sentinel so the Rust runtime can still register
# the model; KV-cache capacity metrics will be unavailable in this mode.
logging.warning(
"num_gpu_blocks is None (expected when using --data-parallel-backend ray). "
"Setting total_kv_blocks=0 for model registration."
)
num_gpu_blocks = 0
runtime_config.total_kv_blocks = num_gpu_blocks
runtime_config.max_num_seqs = runtime_values["max_num_seqs"]
runtime_config.max_num_batched_tokens = runtime_values["max_num_batched_tokens"]
# Decode workers don't create the WorkerKvQuery endpoint, so don't advertise local indexer
......
......@@ -498,7 +498,10 @@ class WorkerFactory:
# Register sleep/wake_up engine routes
runtime.register_engine_route("sleep", handler.sleep)
runtime.register_engine_route("wake_up", handler.wake_up)
logger.info("Registered engine routes: /engine/sleep, /engine/wake_up")
runtime.register_engine_route("scale_elastic_ep", handler.scale_elastic_ep)
logger.info(
"Registered engine routes: /engine/sleep, /engine/wake_up, /engine/scale_elastic_ep"
)
# Parse endpoint types from --endpoint-types flag
model_type = parse_endpoint_types(config.endpoint_types)
......@@ -702,7 +705,10 @@ class WorkerFactory:
# Register sleep/wake_up engine routes
runtime.register_engine_route("sleep", handler.sleep)
runtime.register_engine_route("wake_up", handler.wake_up)
logger.info("Registered engine routes: /engine/sleep, /engine/wake_up")
runtime.register_engine_route("scale_elastic_ep", handler.scale_elastic_ep)
logger.info(
"Registered engine routes: /engine/sleep, /engine/wake_up, /engine/scale_elastic_ep"
)
shutdown_endpoints[:] = [generate_endpoint, clear_endpoint]
......
......@@ -2,20 +2,45 @@
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use serde::Deserialize as _;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use super::{DiscoveryInstance, DiscoveryInstanceId, DiscoveryQuery};
/// Field deserializer that turns an explicit JSON `null` into `T::default()`.
///
/// Any non-null value is deserialized as a regular `T`. A field that is
/// absent altogether is not handled here; pair this helper with
/// `#[serde(default)]` on the field to cover that case as well.
///
/// Why this exists: Kubernetes Server-Side Apply with `schema = "disabled"`
/// can persist an empty object `{}` as `null` for nested free-form fields.
/// Without this helper the daemon fails to deserialize the
/// `DynamoWorkerMetadata` CR, the worker is left out of the
/// `MetadataSnapshot` (i.e. it is invisible to service discovery),
/// `KubeDiscoveryClient::list` returns 0 instances and every inference
/// request gets a 404.
///
/// A concrete trigger is vLLM elastic EP scaling: `scale_elastic_ep`
/// reinitializes the event plane sockets, which calls
/// `unregister_event_channel()` and leaves `event_channels` as an empty map
/// `{}`. SSA then writes it back as `null`. The problem is not tied to one
/// transport; it applies to any event plane implementation.
fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
    D: serde::Deserializer<'de>,
    T: Default + serde::Deserialize<'de>,
{
    // Parse as `Option<T>` so that `null` becomes `None` instead of an error.
    let parsed = <Option<T>>::deserialize(deserializer)?;
    match parsed {
        Some(value) => Ok(value),
        None => Ok(T::default()),
    }
}
/// Metadata stored on each pod and exposed via HTTP endpoint
/// This struct holds all discovery registrations for this pod instance
///
/// Every map field is deserialized leniently: `default` supplies an empty map
/// when the key is absent, and `deserialize_null_default` supplies an empty
/// map when the key is present but holds JSON `null`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DiscoveryMetadata {
/// Registered endpoint instances (key: path string from EndpointInstanceId::to_path())
// Absent key or `null` both deserialize to an empty map.
#[serde(default, deserialize_with = "deserialize_null_default")]
endpoints: HashMap<String, DiscoveryInstance>,
/// Registered model card instances (key: path string from ModelCardInstanceId::to_path())
// Absent key or `null` both deserialize to an empty map.
#[serde(default, deserialize_with = "deserialize_null_default")]
model_cards: HashMap<String, DiscoveryInstance>,
/// Registered event channel instances (key: path string from EventChannelInstanceId::to_path())
// Absent key or `null` both deserialize to an empty map.
#[serde(default, deserialize_with = "deserialize_null_default")]
event_channels: HashMap<String, DiscoveryInstance>,
}
......
......@@ -404,6 +404,8 @@ module = [
"gradio.*",
"kubernetes_asyncio",
"kubernetes_asyncio.*",
"ray",
"ray.*",
"pydantic_core",
"aiconfigurator",
"aiconfigurator.*",
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Elastic EP Scaling Demo Deployment
#
# Demonstrates dynamic expert parallelism scaling via the /engine/scale_elastic_ep engine route.
# Pod requests 4 GPUs up front. Starts with data-parallel-size=2 (2 active DP workers,
# 2 GPUs used). Scale up to dp=4 activates the remaining 2 GPUs without pod restart.
#
# Requirements:
# - 4 GPUs available on a single node (tp=1 per DP rank, no cross-node needed)
# - data-parallel-backend=ray (required for elastic EP)
# - Model: deepseek-ai/DeepSeek-V2-Lite (small MoE, ~16B, fits in 4×A100/H100)
#
# Scaling endpoint: POST /engine/scale_elastic_ep {"new_data_parallel_size": <int>}
# - Scale up: {"new_data_parallel_size": 4}
# - Scale down: {"new_data_parallel_size": 2}
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: vllm-elastic-ep-demo
spec:
services:
Frontend:
componentType: frontend
replicas: 1
extraPodSpec:
mainContainer:
# Update tag to match VllmDecodeWorker image below
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
VllmDecodeWorker:
envFromSecret: hf-token-secret
componentType: worker
# Single pod with 4 GPUs — elastic EP scales DP workers within this pod via Ray
replicas: 1
resources:
requests:
memory: "60Gi"
# Reserve all 4 GPUs upfront so scale-up can activate them without rescheduling
gpu: "4"
limits:
memory: "120Gi"
gpu: "4"
envs:
- name: DYN_SYSTEM_ENABLED
value: "true"
# allgather_reducescatter: no PPLX infrastructure required, works for demo
# Switch to "pplx" for production MoE workloads with PPLX cluster
- name: VLLM_ALL2ALL_BACKEND
value: "allgather_reducescatter"
- name: VLLM_USE_ELASTIC_EP
value: "1"
- name: VLLM_USE_V1
value: "1"
- name: VLLM_WORKER_MULTIPROC_METHOD
value: "spawn"
# Expose all 4 GPUs to the process — Ray will schedule DP workers on them
- name: CUDA_VISIBLE_DEVICES
value: "0,1,2,3"
- name: VLLM_LOGGING_LEVEL
value: "INFO"
# Speed up HF downloads and improve 429 rate-limit resilience
- name: HF_HUB_ENABLE_HF_TRANSFER
value: "1"
# Point HF cache to the PVC so the model is loaded from local storage
- name: HF_HOME
value: "/model-cache"
extraPodSpec:
imagePullSecrets:
- name: nvcr-imagepullsecret
volumes:
- name: model-cache
persistentVolumeClaim:
claimName: model-cache
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
imagePullPolicy: Always
workingDir: /workspace/examples/backends/vllm
volumeMounts:
- name: model-cache
mountPath: /model-cache
command:
- python3
- -m
- dynamo.vllm
args:
- --model
- deepseek-ai/DeepSeek-V2-Lite
- --trust-remote-code
# tp=1: each DP rank uses 1 GPU. With dp=2 → 2 GPUs active at start.
# After scale to dp=4 → all 4 GPUs active.
- --tensor-parallel-size
- "1"
# Initial DP size = 2. Scale up to 4 via the /engine/scale_elastic_ep route.
- --data-parallel-size
- "2"
# REQUIRED for elastic EP — Ray backend manages dynamic worker lifecycle
- --data-parallel-backend
- ray
- --gpu-memory-utilization
- "0.8"
- --max-model-len
- "4096"
- --enable-expert-parallel
- --enable-elastic-ep
- --enable-eplb
# 0 redundant experts: keeps expert count divisible across dp ranks
- --eplb-config.num_redundant_experts
- "0"
- --no-enable-prefix-caching
- --enforce-eager
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Elastic EP Scaling Regression Test
#
# Runs a full 6-step scale sequence on a live elastic EP deployment:
# Baseline (dp=2) → dp=3 → dp=4 → dp=3 → dp=2 → dp=4 → dp=2
#
# After each step captures:
# - Full API request + response
# - nvidia-smi GPU memory for all 4 GPUs
# - ps aux PIDs for Ray actors (DPMoEEngineCoreActor, RayWorkerWrapper)
# - Live inference result with latency
#
# Usage:
# ./run_elastic_ep_scale_test.sh [NAMESPACE] [DEPLOYMENT_NAME]
#
# Defaults:
# NAMESPACE = default
# DEPLOYMENT_NAME = vllm-elastic-ep-demo (accepted but currently unused; pods are selected by component label)
#
# Prerequisites:
# - kubectl configured and pointing at the right cluster
# - Deployment already applied (see moe_elastic_ep_demo.yaml)
# - Ports 8001 and 8002 free on localhost
# Fail on unset variables and on failures inside pipelines. Note that
# errexit (-e) is intentionally not enabled here.
set -u
set -o pipefail

# Positional arguments, each with a default.
NS=${1:-default}
DEPLOYMENT_NAME=${2:-vllm-elastic-ep-demo}
MODEL='deepseek-ai/DeepSeek-V2-Lite'

# Show the effective settings, followed by one blank line.
printf 'Namespace: %s\n' "$NS"
printf 'Model: %s\n' "$MODEL"
printf '\n'
# ── Pod lookup helpers ────────────────────────────────────────────────────────
# Always re-resolved from the cluster so the script handles pod restarts
# and works regardless of the randomly-generated pod name suffix.
# Print the name of the first Running VllmDecodeWorker pod in $NS.
# Prints nothing when no such pod exists (kubectl errors are silenced).
worker_pod() {
  local component_selector="nvidia.com/dynamo-component=VllmDecodeWorker"
  kubectl get pods \
    --namespace "$NS" \
    --selector "$component_selector" \
    --field-selector=status.phase=Running \
    --output jsonpath='{.items[0].metadata.name}' 2>/dev/null
}
# Print the name of the first Running Frontend pod in $NS.
# Prints nothing when no such pod exists (kubectl errors are silenced).
frontend_pod() {
  local component_selector="nvidia.com/dynamo-component=Frontend"
  kubectl get pods \
    --namespace "$NS" \
    --selector "$component_selector" \
    --field-selector=status.phase=Running \
    --output jsonpath='{.items[0].metadata.name}' 2>/dev/null
}
# Verify at least one worker pod exists before proceeding
# Abort early when the lookup comes back empty; nothing below can work
# without a running worker pod.
INITIAL_POD="$(worker_pod)"
[ -n "$INITIAL_POD" ] || {
  echo "ERROR: no running VllmDecodeWorker pod found in namespace $NS" >&2
  exit 1
}
printf 'Worker pod (at start): %s\n' "$INITIAL_POD"
# ── Wait for pod ready ────────────────────────────────────────────────────────
echo "=== Waiting for worker pod to be Ready ==="
# errexit is not enabled, so a failed or timed-out wait must be checked
# explicitly; otherwise the script would report "Ready" and keep going
# against a pod that is not serving.
if ! kubectl wait pod/"$(worker_pod)" -n "$NS" --for=condition=Ready --timeout=900s; then
  echo "ERROR: worker pod did not become Ready within 900s (namespace $NS)" >&2
  exit 1
fi
echo "Ready at $(date -u +%Y-%m-%dT%H:%M:%SZ)"
# ── Port-forwards ─────────────────────────────────────────────────────────────
# Remove stale port-forwards left behind by a previous run.
pkill -f "port-forward.*8001:9090" 2>/dev/null || true
pkill -f "port-forward.*8002:8000" 2>/dev/null || true
sleep 2

# Resolve both pods before forwarding so that an empty lookup fails with a
# clear message instead of an invalid "pod/" argument.
PF_WORKER_POD=$(worker_pod)
PF_FRONTEND_POD=$(frontend_pod)
if [ -z "$PF_WORKER_POD" ] || [ -z "$PF_FRONTEND_POD" ]; then
  echo "ERROR: cannot port-forward: worker pod='$PF_WORKER_POD' frontend pod='$PF_FRONTEND_POD' (namespace $NS)" >&2
  exit 1
fi

# Make sure the background port-forwards are torn down on every exit path,
# including Ctrl-C and SIGTERM, not only when the script runs to completion.
PF_ENGINE=""
PF_FRONTEND=""
stop_port_forwards() {
  # Unquoted on purpose: PIDs that are still empty expand to nothing.
  kill $PF_ENGINE $PF_FRONTEND 2>/dev/null || true
}
trap stop_port_forwards EXIT
trap 'exit 130' INT
trap 'exit 143' TERM

kubectl port-forward pod/"$PF_WORKER_POD" 8001:9090 -n "$NS" &
PF_ENGINE=$!
kubectl port-forward pod/"$PF_FRONTEND_POD" 8002:8000 -n "$NS" &
PF_FRONTEND=$!
echo "Port-forwards: engine=$PF_ENGINE frontend=$PF_FRONTEND"
# Give kubectl a moment to establish the tunnels.
sleep 5
# ── Wait for inference endpoint ───────────────────────────────────────────────
echo "=== Waiting for inference endpoint ==="
# Poll /v1/models through the frontend port-forward: up to 60 attempts,
# 5 seconds apart.
ENDPOINT_READY=0
for i in $(seq 1 60); do
  CODE=$(curl -s -o /dev/null -w "%{http_code}" -m 5 http://localhost:8002/v1/models 2>/dev/null)
  if [ "$CODE" = "200" ]; then
    echo "Endpoint ready (checked after ~$((i * 5))s)"
    ENDPOINT_READY=1
    break
  fi
  sleep 5
done
# Without this check the script would run every scale step against an
# endpoint that never came up and only produce misleading output.
if [ "$ENDPOINT_READY" -ne 1 ]; then
  echo "ERROR: inference endpoint not ready after 60 attempts (last HTTP code: ${CODE:-none})" >&2
  # Unquoted on purpose: unset/empty PIDs expand to nothing.
  kill ${PF_ENGINE:-} ${PF_FRONTEND:-} 2>/dev/null || true
  exit 1
fi
# ── Helpers ───────────────────────────────────────────────────────────────────
# Print per-GPU memory/utilization and the PIDs of the Ray actor processes
# inside the current worker pod.
#   $1 - label shown in the section headers
snapshot() {
  local tag="$1"
  local target_pod
  target_pod="$(worker_pod)"

  printf '\n--- nvidia-smi (%s) ---\n' "$tag"
  kubectl exec "$target_pod" -n "$NS" -- \
    nvidia-smi --query-gpu=index,memory.used,utilization.gpu --format=csv,noheader 2>&1

  printf -- '--- Ray processes (%s) ---\n' "$tag"
  # Keep only the DP engine-core actors and Ray worker wrappers.
  kubectl exec "$target_pod" -n "$NS" -- ps aux 2>&1 \
    | awk '/DPMoEEngineCoreActor|RayWorkerWrapper/{printf "PID=%-8s CMD=%s\n", $2, $11}'
}
# Run one completion request through the frontend and print the generated
# text with its reported latency (or the raw response if it cannot be parsed).
#   $1 - label shown in the section header
infer() {
  local label="$1"
  local pod event_channels resp
  pod=$(worker_pod)
  echo ""
  echo "--- inference ($label) ---"
  # Patch CRD if event_channels became null after scale (known Rust serde bug,
  # fixed in lib/runtime/src/discovery/metadata.rs)
  event_channels=$(kubectl get dynamoworkermetadata "$pod" -n "$NS" \
    -o jsonpath='{.spec.data.event_channels}' 2>/dev/null)
  if [ "$event_channels" = "null" ] || [ -z "$event_channels" ]; then
    # Only claim the workaround was applied when the patch really succeeded;
    # an empty value can also mean the lookup itself failed.
    if kubectl patch dynamoworkermetadata "$pod" -n "$NS" \
      --type=merge -p '{"spec":{"data":{"event_channels":{}}}}' 2>/dev/null; then
      echo "(patched event_channels: null → {} — workaround for discovery 404)"
      sleep 3
    else
      echo "WARNING: could not patch event_channels on dynamoworkermetadata/$pod" >&2
    fi
  fi
  resp=$(curl -s -m 30 http://localhost:8002/v1/completions \
    -H "Content-Type: application/json" \
    -d "{\"model\":\"$MODEL\",\"prompt\":\"2+2=\",\"max_tokens\":5,\"temperature\":0}")
  # Fall back to the raw body when the response is not the expected JSON.
  echo "$resp" | python3 -c "
import sys, json
d = json.load(sys.stdin)
print('text:', repr(d['choices'][0]['text'].strip()), ' time_ms:', d['nvext']['timing']['total_time_ms'])
" 2>/dev/null || echo "response: $resp"
}
# Request a DP resize through the engine route, then capture a GPU/process
# snapshot and run one inference.
#   $1 - current dp size (used for display only)
#   $2 - target dp size
#   $3 - curl timeout in seconds (default: 700)
scale() {
  local from_dp="$1"
  local to_dp="$2"
  local timeout="${3:-700}"
  local resp curl_rc
  echo ""
  echo "=========================================="
  echo "SCALE dp=$from_dp → dp=$to_dp at $(date -u +%Y-%m-%dT%H:%M:%SZ)"
  echo " worker pod: $(worker_pod)"
  echo "=========================================="
  echo "--- request: POST /engine/scale_elastic_ep {\"new_data_parallel_size\": $to_dp} ---"
  resp=$(curl -s -X POST http://localhost:8001/engine/scale_elastic_ep \
    -H "Content-Type: application/json" \
    -d "{\"new_data_parallel_size\": $to_dp}" \
    --max-time "$timeout")
  curl_rc=$?
  echo "--- response ---"
  echo "$resp"
  # curl -s hides transport errors and timeouts; surface them so an empty
  # response is not mistaken for a successful scale.
  if [ "$curl_rc" -ne 0 ]; then
    echo "WARNING: scale request dp=$from_dp → dp=$to_dp failed (curl exit code $curl_rc)" >&2
  fi
  # Still capture state after a failed request; that output is what makes the
  # failure diagnosable.
  snapshot "after dp=$to_dp"
  infer "dp=$to_dp"
}
# ── Baseline ──────────────────────────────────────────────────────────────────
printf '\n'
printf '%s\n' "=========================================="
printf '%s\n' "BASELINE dp=2 at $(date -u +%Y-%m-%dT%H:%M:%SZ)"
printf '%s\n' "=========================================="
snapshot "baseline dp=2"
infer "dp=2"

# ── 6 scale steps ─────────────────────────────────────────────────────────────
# Each entry is "<from_dp> <to_dp> <timeout_seconds>".
SCALE_PLAN=(
  "2 3 700" # step 1: dp=2 → dp=3
  "3 4 700" # step 2: dp=3 → dp=4
  "4 3 300" # step 3: dp=4 → dp=3
  "3 2 300" # step 4: dp=3 → dp=2
  "2 4 700" # step 5: dp=2 → dp=4 (scale-up after scale-down — known regression)
  "4 2 300" # step 6: dp=4 → dp=2
)
for plan_entry in "${SCALE_PLAN[@]}"; do
  read -r plan_from plan_to plan_timeout <<<"$plan_entry"
  scale "$plan_from" "$plan_to" "$plan_timeout"
done

printf '\n'
printf '%s\n' "=== ALL STEPS COMPLETE at $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
# Tear down the background port-forwards started earlier.
kill $PF_ENGINE $PF_FRONTEND 2>/dev/null || true
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment