Unverified Commit 67273aba authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

test: reenable router + vllm tests (#4746)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent d5800311
......@@ -30,7 +30,6 @@ pytestmark = [
pytest.mark.pre_merge,
pytest.mark.gpu_0,
pytest.mark.integration,
pytest.mark.parallel,
pytest.mark.model(MODEL_NAME),
]
NUM_MOCKERS = 2
......@@ -287,6 +286,7 @@ class DisaggMockerProcess:
self._process.__exit__(exc_type, exc_val, exc_tb)
@pytest.mark.parallel
def test_mocker_kv_router(request, runtime_services_session, predownload_tokenizers):
"""
Test KV router with multiple mocker engine instances.
......@@ -326,6 +326,7 @@ def test_mocker_kv_router(request, runtime_services_session, predownload_tokeniz
mockers.__exit__(None, None, None)
@pytest.mark.parallel
@pytest.mark.parametrize("store_backend", ["etcd", "file"])
def test_mocker_two_kv_router(
request,
......@@ -381,6 +382,7 @@ def test_mocker_two_kv_router(
mockers.__exit__(None, None, None)
@pytest.mark.parallel
@pytest.mark.skip(reason="Flaky, temporarily disabled")
def test_mocker_kv_router_overload_503(
request, runtime_services_session, predownload_tokenizers
......@@ -419,6 +421,7 @@ def test_mocker_kv_router_overload_503(
mockers.__exit__(None, None, None)
@pytest.mark.parallel
def test_kv_push_router_bindings(
request, runtime_services_session, predownload_tokenizers
):
......@@ -504,6 +507,7 @@ def test_indexers_sync(
mockers.__exit__(None, None, None)
@pytest.mark.parallel
def test_query_instance_id_returns_worker_and_tokens(
request, runtime_services_session, predownload_tokenizers
):
......@@ -538,6 +542,7 @@ def test_query_instance_id_returns_worker_and_tokens(
mockers.__exit__(None, None, None)
@pytest.mark.parallel
def test_router_decisions(request, runtime_services_session, predownload_tokenizers):
"""Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes."""
......@@ -577,6 +582,7 @@ def test_router_decisions(request, runtime_services_session, predownload_tokeniz
mockers.__exit__(None, None, None)
@pytest.mark.parallel
def test_router_disagg_decisions(
request, runtime_services_session, predownload_tokenizers
):
......@@ -642,6 +648,7 @@ def test_router_disagg_decisions(
prefill_workers.__exit__(None, None, None)
@pytest.mark.parallel
def test_busy_threshold_endpoint(
request, runtime_services_session, predownload_tokenizers
):
......
......@@ -10,6 +10,7 @@ import pytest
from tests.router.common import ( # utilities
_test_router_basic,
_test_router_decisions,
_test_router_indexers_sync,
generate_random_suffix,
get_runtime,
)
......@@ -20,7 +21,6 @@ logger = logging.getLogger(__name__)
MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
pytestmark = [
pytest.mark.pre_merge,
pytest.mark.e2e,
pytest.mark.vllm,
pytest.mark.model(MODEL_NAME),
......@@ -46,6 +46,16 @@ TEST_PAYLOAD: Dict[str, Any] = {
"max_tokens": 10,
}
# Shared vLLM configuration for all tests
# gpu_memory_utilization limits actual VRAM allocation (required for multi-worker on same GPU)
VLLM_ARGS: Dict[str, Any] = {
"block_size": BLOCK_SIZE,
"model": MODEL_NAME,
"gpu_memory_utilization": 0.4, # Limit VRAM allocation per worker
"max_model_len": 1024, # Limit context length to reduce KV cache size
"enforce_eager": True, # Disable CUDA graphs for faster startup & lower memory
}
class VLLMProcess:
"""Manages vLLM workers using dynamo.vllm (HTTP API + KV events).
......@@ -72,11 +82,12 @@ class VLLMProcess:
vllm_args: Configuration dict with keys:
- block_size: KV cache block size (default: 16)
- model: Model name/path (default: TinyLlama-1.1B)
- gpu_memory_utilization: GPU memory fraction per worker (default: 0.9)
- gpu_memory_utilization: Fraction of GPU memory to allocate (optional)
- num_gpu_blocks_override: Cap on number of KV cache blocks (optional)
- max_model_len: Maximum sequence length (optional)
- speedup_ratio: IGNORED (vLLM runs at real speed)
- enforce_eager: Disable CUDA graphs (default: False)
num_workers: Number of vLLM worker processes
single_gpu: If True, all workers share GPU 0 (requires gpu_memory_utilization < 1.0/num_workers)
single_gpu: If True, all workers share GPU 0
data_parallel_size: If set, enables data parallelism with this many ranks (num_workers must equal data_parallel_size)
"""
# Generate unique namespace for isolation
......@@ -92,8 +103,10 @@ class VLLMProcess:
block_size = vllm_args.get("block_size", BLOCK_SIZE)
model = vllm_args.get("model", MODEL_NAME)
gpu_memory_utilization = vllm_args.get("gpu_memory_utilization", 0.9)
gpu_memory_utilization = vllm_args.get("gpu_memory_utilization")
num_gpu_blocks_override = vllm_args.get("num_gpu_blocks_override")
max_model_len = vllm_args.get("max_model_len")
enforce_eager = vllm_args.get("enforce_eager", False)
self.model_name = model
......@@ -130,15 +143,28 @@ class VLLMProcess:
model,
"--block-size",
str(block_size),
"--enforce-eager", # Disable CUDA graphs for faster startup
"--gpu-memory-utilization",
str(gpu_memory_utilization),
]
# Disable CUDA graphs for faster startup & lower memory
if enforce_eager:
command.append("--enforce-eager")
# Limit VRAM allocation (required for multi-worker on same GPU)
if gpu_memory_utilization is not None:
command.extend(
["--gpu-memory-utilization", str(gpu_memory_utilization)]
)
# Add optional max_model_len if specified
if max_model_len is not None:
command.extend(["--max-model-len", str(max_model_len)])
# Cap block count for predictable KV cache behavior
if num_gpu_blocks_override is not None:
command.extend(
["--num-gpu-blocks-override", str(num_gpu_blocks_override)]
)
if data_parallel_size is not None:
# Add DP configuration for external load balancing
# See: https://docs.vllm.ai/en/v0.10.0/serving/data_parallel_deployment.html#external-load-balancing
......@@ -157,6 +183,8 @@ class VLLMProcess:
{
"CUDA_VISIBLE_DEVICES": gpu_device,
"DYN_NAMESPACE": self.namespace,
"DYN_VLLM_KV_EVENT_PORT": str(20080 + worker_idx),
"VLLM_NIXL_SIDE_CHANNEL_PORT": str(20090 + worker_idx),
"PYTHONHASHSEED": "0", # for deterministic event id's
}
)
......@@ -176,13 +204,13 @@ class VLLMProcess:
if data_parallel_size is not None:
logger.info(
f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} "
f"(gpu_memory_utilization={gpu_memory_utilization}) "
f"(gpu_mem={gpu_memory_utilization}) "
f"with endpoint: {self.endpoint}"
)
else:
logger.info(
f"Created vLLM worker {worker_idx} on GPU {gpu_device} "
f"(gpu_memory_utilization={gpu_memory_utilization}) "
f"(gpu_mem={gpu_memory_utilization}) "
f"with endpoint: {self.endpoint}"
)
......@@ -276,9 +304,11 @@ class VLLMProcess:
time.sleep(2)
@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.skip(reason="All vLLM tests disabled for now")
def test_vllm_kv_router_basic(request, runtime_services, predownload_tokenizers):
def test_vllm_kv_router_basic(
request, runtime_services, predownload_models, set_ucx_tls_no_mm
):
"""
Quick e2e sanity test for KV router with vLLM engine instances.
"""
......@@ -287,19 +317,12 @@ def test_vllm_kv_router_basic(request, runtime_services, predownload_tokenizers)
N_VLLM_WORKERS = 2
logger.info(f"Starting vLLM KV router test with {N_VLLM_WORKERS} workers")
vllm_args = {
"block_size": BLOCK_SIZE,
"model": MODEL_NAME,
"gpu_memory_utilization": 0.35,
"max_model_len": 1024, # Limit context length to reduce KV cache size
}
try:
# Start vLLM workers
logger.info(f"Starting {N_VLLM_WORKERS} vLLM workers")
vllm_workers = VLLMProcess(
request,
vllm_args=vllm_args,
vllm_args=VLLM_ARGS,
num_workers=N_VLLM_WORKERS,
single_gpu=True, # fit workers into one GPU
)
......@@ -323,32 +346,22 @@ def test_vllm_kv_router_basic(request, runtime_services, predownload_tokenizers)
vllm_workers.__exit__(None, None, None)
@pytest.mark.pre_merge
@pytest.mark.gpu_1
@pytest.mark.skip(reason="All vLLM tests disabled for now")
def test_router_decisions_vllm_multiple_workers(
request, runtime_services, predownload_tokenizers
request, runtime_services, predownload_models, set_ucx_tls_no_mm
):
# runtime_services starts etcd and nats
logger.info("Starting vLLM router prefix reuse test with two workers")
# Create vLLM args - one worker with dp_size=2, sharing GPU 0
vllm_args = {
"block_size": BLOCK_SIZE,
"model": MODEL_NAME,
"gpu_memory_utilization": 0.35,
"max_model_len": 1024, # Limit context length to reduce KV cache size
}
N_WORKERS = 2
try:
# Start 2 worker processes (dp_rank 0 and dp_rank 1) on the same GPU
logger.info(
"Starting 2 vLLM worker processes with dp_size=2 on single GPU (gpu_memory_utilization=0.35, max_model_len=1024)"
)
# Start 2 worker processes on the same GPU
logger.info("Starting 2 vLLM worker processes on single GPU (gpu_mem=0.4)")
vllm_workers = VLLMProcess(
request,
vllm_args=vllm_args,
num_workers=N_WORKERS, # One worker process with dp_size=2
vllm_args=VLLM_ARGS,
num_workers=N_WORKERS,
single_gpu=True, # Worker uses GPU 0
)
logger.info(f"All vLLM workers using namespace: {vllm_workers.namespace}")
......@@ -373,8 +386,9 @@ def test_router_decisions_vllm_multiple_workers(
@pytest.mark.gpu_2
@pytest.mark.skip(reason="All vLLM tests disabled for now")
def test_router_decisions_vllm_dp(request, runtime_services, predownload_tokenizers):
def test_router_decisions_vllm_dp(
request, runtime_services, predownload_models, set_ucx_tls_no_mm
):
"""Validate KV cache prefix reuse with vLLM by sending progressive requests with overlapping prefixes.
Same flow as test_router_decisions_vllm_multiple_workers; force first request to (worker_id, dp_rank=1).
Dump events from router and verify:
......@@ -382,23 +396,14 @@ def test_router_decisions_vllm_dp(request, runtime_services, predownload_tokeniz
* The (worker_id, dp_rank) with events should have exactly 4 events (one per request)
* All events should be on the forced (worker_id, dp_rank=1) (verifying forced routing and prefix reuse)
"""
# Create vLLM args - one worker with dp_size=2, sharing GPU 0
vllm_args = {
"block_size": BLOCK_SIZE,
"model": MODEL_NAME,
"gpu_memory_utilization": 0.35,
"max_model_len": 1024, # Limit context length to reduce KV cache size
}
N_WORKERS = 1
DP_SIZE = 2
try:
logger.info(
"Starting 2 vLLM DP ranks (dp_size=2) on single GPU (gpu_memory_utilization=0.35, max_model_len=1024)"
)
logger.info("Starting 2 vLLM DP ranks (dp_size=2) (gpu_mem=0.4)")
vllm_workers = VLLMProcess(
request,
vllm_args=vllm_args,
vllm_args=VLLM_ARGS,
num_workers=N_WORKERS, # Ignored when data_parallel_size is set
single_gpu=False,
data_parallel_size=DP_SIZE, # Creates DP_SIZE processes (one per rank)
......@@ -421,3 +426,44 @@ def test_router_decisions_vllm_dp(request, runtime_services, predownload_tokeniz
# Clean up vLLM workers
if "vllm_workers" in locals():
vllm_workers.__exit__(None, None, None)
@pytest.mark.pre_merge
@pytest.mark.gpu_1
def test_vllm_indexers_sync(
request, runtime_services, predownload_models, set_ucx_tls_no_mm
):
"""
Test that two KV routers have synchronized indexer states after processing requests
with vLLM workers. This test verifies that both routers converge to the same internal state.
"""
logger.info("Starting vLLM indexers sync test")
N_VLLM_WORKERS = 2
try:
# Start vLLM workers
logger.info(f"Starting {N_VLLM_WORKERS} vLLM workers")
vllm_workers = VLLMProcess(
request,
vllm_args=VLLM_ARGS,
num_workers=N_VLLM_WORKERS,
single_gpu=True, # fit workers into one GPU
)
logger.info(f"All vLLM workers using namespace: {vllm_workers.namespace}")
vllm_workers.__enter__()
# Use the common test implementation (creates its own runtimes for each router)
# Note: Consumer verification is done inside _test_router_indexers_sync while routers are alive
_test_router_indexers_sync(
engine_workers=vllm_workers,
block_size=BLOCK_SIZE,
model_name=MODEL_NAME,
num_workers=N_VLLM_WORKERS,
store_backend="etcd",
)
logger.info("vLLM indexers sync test completed successfully")
finally:
if "vllm_workers" in locals():
vllm_workers.__exit__(None, None, None)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment