Unverified Commit d461631b authored by Karen Chung's avatar Karen Chung Committed by GitHub
Browse files

test: Router CI tests with vLLM engine (#3948)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
Co-authored-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 872900f1
......@@ -234,3 +234,22 @@ def runtime_services(request):
with NatsServer(request) as nats_process:
with EtcdServer(request) as etcd_process:
yield nats_process, etcd_process
@pytest.fixture
def file_storage_backend():
"""Fixture that sets up and tears down file storage backend.
Creates a temporary directory for file-based KV storage and sets
the DYN_FILE_KV environment variable. Cleans up after the test.
"""
with tempfile.TemporaryDirectory() as tmpdir:
old_env = os.environ.get("DYN_FILE_KV")
os.environ["DYN_FILE_KV"] = tmpdir
logging.info(f"Set up file storage backend in: {tmpdir}")
yield tmpdir
# Cleanup
if old_env is not None:
os.environ["DYN_FILE_KV"] = old_env
else:
os.environ.pop("DYN_FILE_KV", None)
......@@ -6,7 +6,6 @@
import logging
import os
import shutil
import tempfile
import numpy as np
import pytest
......@@ -91,25 +90,6 @@ def extract_params(param_map) -> dict:
return result
@pytest.fixture
def file_storage_backend():
"""Fixture that sets up and tears down file storage backend.
Creates a temporary directory for file-based KV storage and sets
the DYN_FILE_KV environment variable. Cleans up after the test.
"""
with tempfile.TemporaryDirectory() as tmpdir:
old_env = os.environ.get("DYN_FILE_KV")
os.environ["DYN_FILE_KV"] = tmpdir
logger.info(f"Set up file storage backend in: {tmpdir}")
yield tmpdir
# Cleanup
if old_env is not None:
os.environ["DYN_FILE_KV"] = old_env
else:
os.environ.pop("DYN_FILE_KV", None)
@pytest.mark.e2e
@pytest.mark.pre_merge
@pytest.mark.parametrize(
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import json
import logging
import random
import string
from typing import Any, Optional
import aiohttp
import nats
from dynamo._core import DistributedRuntime, KvPushRouter, KvRouterConfig
from tests.utils.managed_process import ManagedProcess
logger = logging.getLogger(__name__)
NUM_REQUESTS = 100
BLOCK_SIZE = 16
########################################################
# Helper Classes
########################################################
class KVRouterProcess(ManagedProcess):
"""Manages the KV router process using dynamo.frontend"""
def __init__(
self, request, block_size: int, frontend_port: int, store_backend: str = "etcd"
):
command = [
"python3",
"-m",
"dynamo.frontend",
"--kv-cache-block-size",
str(block_size),
"--router-mode",
"kv",
"--http-port",
str(frontend_port),
"--store-kv",
store_backend,
]
super().__init__(
command=command,
timeout=60,
display_output=True,
health_check_ports=[frontend_port],
health_check_urls=[
(f"http://localhost:{frontend_port}/v1/models", self._check_ready)
],
log_dir=request.node.name,
terminate_existing=False,
)
self.port = frontend_port
def _check_ready(self, response):
"""Check if KV router is ready"""
return response.status_code == 200
def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
def generate_random_suffix() -> str:
"""Generate a 10-character random alphabetic suffix for namespace isolation."""
return "".join(random.choices(string.ascii_lowercase, k=10)) # noqa: S311
########################################################
# Utility functions
########################################################
async def wait_for_frontend_ready(
frontend_url: str, expected_num_workers: int = 2, timeout: int = 120
):
"""Wait for backend worker(s) to be ready via the HTTP frontend (OpenAI API).
This function performs a two-phase readiness check through the frontend HTTP server:
1. Polls GET /v1/models until at least one model is registered (workers connected)
2. Sends a test POST to /v1/chat/completions to verify the request pipeline is functional
Use this when testing through the HTTP frontend server (dynamo.frontend).
For direct Python API testing with KvPushRouter, use wait_for_workers_ready() instead.
Args:
frontend_url: Base URL of the frontend HTTP server (e.g., "http://localhost:8000")
expected_num_workers: Number of workers to wait for (currently logs but doesn't enforce)
timeout: Maximum time to wait in seconds for both phases combined
Raises:
TimeoutError: If workers don't register or pipeline doesn't become ready within timeout
aiohttp.ClientError: If HTTP requests fail unexpectedly
"""
models_url = f"{frontend_url}/v1/models"
chat_url = f"{frontend_url}/v1/chat/completions"
start_time = asyncio.get_event_loop().time()
logger.info(
f"Waiting for {expected_num_workers} workers to register on HTTP frontend (timeout={timeout}s)..."
)
# Phase 1: Wait for models to appear in /v1/models
model_name = None
while True:
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed > timeout:
raise TimeoutError(
f"Timeout waiting for vLLM workers. Waited {elapsed:.1f}s, no workers registered."
)
try:
async with aiohttp.ClientSession() as session:
async with session.get(models_url) as response:
if response.status == 200:
data = await response.json()
models = data.get("data", [])
if len(models) > 0:
model_name = models[0].get("id")
logger.info(
f"Workers registered. Found {len(models)} model(s): {[m.get('id') for m in models]}"
)
break
else:
logger.debug(
f"No models registered yet (elapsed: {elapsed:.1f}s)"
)
except Exception as e:
logger.debug(f"Error checking models endpoint: {e}")
# Wait before next poll
await asyncio.sleep(2)
# Phase 2: Wait for chat completions pipeline to be ready
logger.info("Waiting for chat completions pipeline to be built...")
test_payload = {
"model": model_name,
"messages": [{"role": "user", "content": "test"}],
"max_tokens": 1,
"stream": False,
}
while True:
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed > timeout:
raise TimeoutError(
f"Timeout waiting for chat completions pipeline. Waited {elapsed:.1f}s."
)
try:
async with aiohttp.ClientSession() as session:
async with session.post(chat_url, json=test_payload) as response:
if response.status == 200:
logger.info("Chat completions pipeline ready!")
return
else:
logger.debug(
f"Chat completions not ready yet, status {response.status} (elapsed: {elapsed:.1f}s)"
)
except Exception as e:
logger.debug(f"Error testing chat completions: {e}")
# Wait before next poll
await asyncio.sleep(2)
async def wait_for_workers_ready(
endpoint,
router: KvPushRouter,
expected_num_workers: int,
model_name: str,
) -> list[int]:
"""Wait for workers to be ready and return their instance IDs.
Supports mocker and vLLM workers.
This function polls the endpoint's client for instance IDs until the expected
number of workers are available, then sends a warmup request to verify they
can handle requests.
Args:
endpoint: The endpoint object to get the client from
router: The KvPushRouter to use for sending warmup requests
expected_num_workers: Number of workers to wait for
Returns:
Sorted list of unique instance IDs (ints).
Raises:
AssertionError: If workers don't become ready or warmup request fails.
"""
logger.info("Waiting for workers to be ready")
# Get the client from the endpoint
client = await endpoint.client()
# Poll for instance IDs until we have the expected number
instance_ids: list[int] = []
max_wait_time = 60 # seconds
start_time = asyncio.get_running_loop().time()
while len(instance_ids) < expected_num_workers:
instance_ids = client.instance_ids()
logger.info(f"Found {len(instance_ids)} instance(s): {instance_ids}")
if len(instance_ids) >= expected_num_workers:
break
# Check timeout
if asyncio.get_running_loop().time() - start_time > max_wait_time:
raise AssertionError(
f"Timeout waiting for workers. Found {len(instance_ids)} instance(s), expected {expected_num_workers}"
)
# Wait 1 second before polling again
await asyncio.sleep(1.0)
# Send a warmup request to verify workers can handle requests
test_token_ids = [random.randint(1, 10000) for _ in range(4)]
logger.info(f"Sending warmup request with {len(test_token_ids)} tokens")
try:
await send_request_via_python_kv_router(
kv_python_router=router,
model_name=model_name,
token_ids=test_token_ids,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True,
"max_tokens": 2,
},
)
except Exception as e:
raise AssertionError(f"Warmup request failed: {e}")
logger.info(f"All {len(instance_ids)} workers are ready")
return sorted(instance_ids)
async def send_request_with_retry(url: str, payload: dict, max_retries: int = 8):
"""Send a single request with exponential backoff retry"""
wait_time = 1 # Start with 1 second
for attempt in range(max_retries + 1):
await asyncio.sleep(wait_time)
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as response:
if response.status == 200:
# Read the response to ensure it's valid
async for _ in response.content:
pass
logger.debug(
f"First request succeeded on attempt {attempt + 1}"
)
return True
else:
logger.warning(
f"Attempt {attempt + 1} failed with status {response.status}"
)
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
if attempt < max_retries:
wait_time *= 2 # Double the wait time
return False
def get_runtime(store_backend="etcd", request_plane="nats"):
"""Create a DistributedRuntime instance for testing.
Args:
store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
request_plane: How frontend talks to backend ("tcp", "http" or "nats"). Defaults to "nats".
"""
try:
# Try to get running loop (works in async context)
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop, create a new one (sync context)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return DistributedRuntime(loop, store_backend, request_plane)
async def check_nats_consumers(namespace: str, expected_count: Optional[int] = None):
"""Check NATS consumers for the KV events stream.
Args:
namespace: The namespace to check consumers for
expected_count: Optional expected number of consumers. If provided, asserts if count doesn't match.
Returns:
List of consumer names
"""
component_subject = f"namespace.{namespace}.component.mocker"
slugified = component_subject.lower().replace(".", "-").replace("_", "-")
stream_name = f"{slugified}-kv-events"
logger.info(f"Checking consumers for stream: {stream_name}")
nc = await nats.connect("nats://localhost:4222")
try:
js = nc.jetstream()
consumer_infos = await js.consumers_info(stream_name)
consumer_names = [info.name for info in consumer_infos]
logger.info(f"Found {len(consumer_names)} consumers: {consumer_names}")
# Log detailed consumer info
for info in consumer_infos:
logger.info(
f"Consumer {info.name}: "
f"num_pending={info.num_pending}, "
f"num_ack_pending={info.num_ack_pending}, "
f"ack_floor={info.ack_floor}, "
f"delivered={info.delivered}"
)
if expected_count is not None:
assert (
len(consumer_names) == expected_count
), f"Expected {expected_count} durable consumers, found {len(consumer_names)}: {consumer_names}"
logger.info(f"✓ Verified {expected_count} durable consumers exist")
return consumer_names
finally:
await nc.close()
async def send_inflight_requests(urls: list, payload: dict, num_requests: int):
"""Send multiple requests concurrently, alternating between URLs if multiple provided"""
# First, send test requests with retry to ensure all systems are ready
for i, url in enumerate(urls):
logger.info(f"Sending initial test request to URL {i} ({url}) with retry...")
if not await send_request_with_retry(url, payload):
raise RuntimeError(f"Failed to connect to URL {i} after multiple retries")
async def send_single_request(session: aiohttp.ClientSession, request_id: int):
# Alternate between URLs based on request_id
url = urls[request_id % len(urls)]
url_index = request_id % len(urls)
try:
async with session.post(url, json=payload) as response:
if response.status != 200:
logger.error(
f"Request {request_id} to URL {url_index} failed with status {response.status}"
)
return False
# For streaming responses, read the entire stream
chunks = []
async for line in response.content:
if line:
chunks.append(line)
logger.debug(
f"Request {request_id} to URL {url_index} completed with {len(chunks)} chunks"
)
return True
except Exception as e:
logger.error(
f"Request {request_id} to URL {url_index} failed with error: {e}"
)
return False
# Send all requests at once
async with aiohttp.ClientSession() as session:
tasks = [send_single_request(session, i) for i in range(num_requests)]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = sum(1 for r in results if r if r is True)
failed = num_requests - successful
logger.info(f"Completed all requests: {successful} successful, {failed} failed")
assert (
successful == num_requests
), f"Expected {num_requests} successful requests, got {successful}"
logger.info(f"All {num_requests} requests completed successfully")
async def send_request_via_python_kv_router(
kv_python_router: KvPushRouter,
model_name: str,
token_ids: list,
initial_wait: float,
max_retries: int,
stop_conditions: Optional[dict] = None,
sampling_options: Optional[dict] = None,
output_options: Optional[dict] = None,
router_config_override: Optional[dict] = None,
worker_id: Optional[
int
] = None, # If None, Router will select the best available worker
dp_rank: Optional[int] = None, # Data parallel rank (defaults to 0)
) -> bool:
"""Send a request to the specified worker instance.
Returns True if workers respond, otherwise raises or returns False.
"""
wait_time = initial_wait
log_message = (
f"worker with worker_id={worker_id}"
if worker_id is not None
else "the best available worker"
)
# Retry loop sending request to worker with exponential backoff
for attempt in range(max_retries + 1):
try:
logger.debug(f"Sending request to {log_message} (attempt {attempt + 1})")
stream = await kv_python_router.generate(
token_ids=token_ids,
model=model_name,
stop_conditions=stop_conditions,
sampling_options=sampling_options,
output_options=output_options,
router_config_override=router_config_override,
worker_id=worker_id,
dp_rank=dp_rank,
)
if stream is not None:
logger.debug(f"Request succeeded on attempt {attempt + 1}")
break
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
if attempt < max_retries:
await asyncio.sleep(wait_time)
wait_time *= 2
else:
raise RuntimeError(
f"Failed to connect to workers after {max_retries + 1} attempts"
) from e
# Collect tokens from the SSE stream
generated_tokens = []
async for response in stream:
if isinstance(response, dict):
# Check if response has token_ids
if "token_ids" in response:
tokens = response["token_ids"]
if isinstance(tokens, list):
generated_tokens.extend(tokens)
logger.debug(f"Received {len(tokens)} tokens: {tokens}")
# Check for finish reason
if "finish_reason" in response:
logger.debug(
f"Stream finished with reason: {response['finish_reason']}"
)
# Verify if expected number of tokens are generated if max_tokens specified and ignore_eos is True
logger.debug(f"Total generated tokens: {len(generated_tokens)}")
if (
stop_conditions
and "max_tokens" in stop_conditions
and "ignore_eos" in stop_conditions
and stop_conditions["ignore_eos"]
):
max_tokens = int(stop_conditions["max_tokens"])
assert len(generated_tokens) == max_tokens, (
f"Expected exactly {max_tokens} tokens but got {len(generated_tokens)}. "
f"Tokens: {generated_tokens}"
)
logger.debug(
f"Successfully verified {max_tokens} tokens generated as expected via KvPushRouter with ignore_eos=True"
)
return True
return False
########################################################
# Test templates
########################################################
def _test_router_basic(
engine_workers,
block_size: int,
request,
frontend_port: int,
test_payload: dict,
num_requests: int,
wait_for_frontend: bool = False,
frontend_timeout: int = 180,
store_backend: str = "etcd",
):
"""Basic router test: start router, wait for workers (optional) and send concurrent requests via HTTP frontend.
Assumes engine_workers are already initialized. This function manages router lifecycle.
This is a shared test implementation for both mocker and vLLM workers.
The key difference is that vLLM workers need time to load models and register,
so they require wait_for_frontend=True.
Args:
engine_workers: Backend workers (mocker/vllm) already initialized with __enter__()
block_size: Block size for KV cache
request: Pytest request fixture for managing resources
frontend_port: Port to start the frontend HTTP server on
test_payload: Test payload to send to /v1/chat/completions
num_requests: Number of concurrent requests to send
wait_for_frontend: If True, poll /v1/models and /v1/chat/completions until ready
frontend_timeout: Timeout for frontend readiness check (only used if wait_for_frontend=True)
store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
Raises:
AssertionError: If requests fail or frontend doesn't become ready
TimeoutError: If frontend doesn't become ready within timeout
"""
try:
# Start KV router frontend
logger.info(f"Starting KV router frontend on port {frontend_port}")
kv_router = KVRouterProcess(request, block_size, frontend_port, store_backend)
kv_router.__enter__()
frontend_url = f"http://localhost:{frontend_port}"
# Wait for workers to register with frontend if needed (vLLM requires this)
if wait_for_frontend:
logger.info("Waiting for workers to register with frontend...")
asyncio.run(
wait_for_frontend_ready(
frontend_url=frontend_url,
expected_num_workers=engine_workers.num_workers,
timeout=frontend_timeout,
)
)
# Send concurrent requests to the frontend
logger.info(f"Sending {num_requests} concurrent requests to frontend...")
asyncio.run(
send_inflight_requests(
[f"{frontend_url}/v1/chat/completions"],
test_payload,
num_requests,
)
)
logger.info(f"Successfully completed {num_requests} requests")
finally:
if "kv_router" in locals():
kv_router.__exit__(None, None, None)
def _test_router_two_routers(
engine_workers,
block_size: int,
request,
router_ports: list[int],
test_payload: dict,
num_requests: int,
store_backend: str = "etcd",
):
"""Test two KV routers with alternating requests and consumer lifecycle verification.
Assumes engine_workers are already initialized. This function manages router lifecycle.
This test:
1. Starts two KV routers on different ports
2. Sends requests alternating between the two routers
3. Verifies that both routers create durable consumers
4. Verifies consumers are cleaned up when routers exit
Args:
engine_workers: Backend workers (mocker/vllm) already initialized with __enter__()
block_size: Block size for KV cache
request: Pytest request fixture for managing resources
router_ports: List of two port numbers for the routers (e.g., [8091, 8092])
test_payload: Test payload to send to /v1/chat/completions
num_requests: Number of concurrent requests to send
store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
Raises:
AssertionError: If consumer lifecycle verification fails
"""
import nats
kv_routers = []
try:
# Start two KV routers on different ports
for port in router_ports:
logger.info(f"Starting KV router frontend on port {port}")
kv_router = KVRouterProcess(request, block_size, port, store_backend)
kv_router.__enter__()
kv_routers.append(kv_router)
# Build URLs for both routers
router_urls = [
f"http://localhost:{port}/v1/chat/completions" for port in router_ports
]
# Send requests concurrently, alternating between routers
asyncio.run(
send_inflight_requests(
router_urls,
test_payload,
num_requests,
)
)
logger.info(
f"Successfully completed {num_requests} requests across {len(router_ports)} routers"
)
# Verify durable consumers lifecycle
async def verify_consumer_lifecycle():
logger.info("Verifying durable consumers lifecycle")
# Construct the stream name from the workers namespace
component_subject = f"namespace.{engine_workers.namespace}.component.{engine_workers.component_name}"
slugified = component_subject.lower().replace(".", "-").replace("_", "-")
stream_name = f"{slugified}-kv-events"
logger.info(f"Checking consumers for stream: {stream_name}")
# Connect to NATS and list consumers
nc = await nats.connect("nats://localhost:4222")
try:
js = nc.jetstream()
# List consumers - should have 2 (one for each router process)
consumer_infos = await js.consumers_info(stream_name)
consumer_names = [info.name for info in consumer_infos]
logger.info(f"Found {len(consumer_names)} consumers: {consumer_names}")
assert (
len(consumer_names) == 2
), f"Expected 2 durable consumers (one per router), found {len(consumer_names)}: {consumer_names}"
logger.info("✓ Verified 2 durable consumers exist (one per router)")
# Kill the first router process
logger.info(f"Killing first router on port {router_ports[0]}")
kv_routers[0].__exit__(None, None, None)
# Poll until one consumer remains (up to 5s)
for _ in range(25):
consumer_infos = await js.consumers_info(stream_name)
if len(list(consumer_infos)) == 1:
break
await asyncio.sleep(0.2)
# Verify only 1 consumer remains
consumer_names = [info.name for info in consumer_infos]
logger.info(
f"After killing router1, found {len(consumer_names)} consumers: {consumer_names}"
)
assert (
len(consumer_names) == 1
), f"Expected 1 durable consumer after killing router1, found {len(consumer_names)}: {consumer_names}"
logger.info(
"✓ Verified 1 durable consumer remains after killing first router"
)
# Kill the second router process
logger.info(f"Killing second router on port {router_ports[1]}")
kv_routers[1].__exit__(None, None, None)
# Poll until no consumers remain (up to 5s)
for _ in range(25):
consumer_infos = await js.consumers_info(stream_name)
if len(list(consumer_infos)) == 0:
break
await asyncio.sleep(0.2)
consumer_names = [info.name for info in consumer_infos]
logger.info(
f"After killing router2, found {len(consumer_names)} consumers: {consumer_names}"
)
assert (
len(consumer_names) == 0
), f"Expected 0 durable consumers after killing both routers, found {len(consumer_names)}: {consumer_names}"
logger.info(
"✓ Verified 0 durable consumers remain after killing both routers"
)
finally:
await nc.close()
# Run consumer lifecycle verification
asyncio.run(verify_consumer_lifecycle())
# Clear the kv_routers list since we've already cleaned them up
kv_routers = []
finally:
# Clean up any remaining routers (in case of error before consumer verification)
for kv_router in kv_routers:
kv_router.__exit__(None, None, None)
def _test_python_router_bindings(
engine_workers,
endpoint,
block_size: int,
model_name: str,
num_workers: int,
):
"""Test KvPushRouter Python bindings with token streaming and config overrides.
Assumes engine_workers are already initialized. This test creates a KvPushRouter
Python object and sends three test requests to verify:
1. Token streaming with full router config overrides (overlap_score_weight, router_temperature)
2. Token streaming without any overrides (uses default config)
3. Token streaming with partial override (only router_temperature)
All requests use ignore_eos=True with varying max_tokens to test token generation control.
Args:
engine_workers: Backend workers (mocker/vllm) already initialized with __enter__()
endpoint: Dynamo endpoint for the workers
block_size: Block size for KV cache
model_name: Model name to use for requests
num_workers: Expected number of workers
Raises:
AssertionError: If requests fail or router doesn't work correctly
"""
# Create KvRouterConfig with default settings
kv_router_config = KvRouterConfig()
# Create KvPushRouter Python object
kv_push_router = KvPushRouter(
endpoint=endpoint,
block_size=block_size,
kv_router_config=kv_router_config,
)
logger.info("Created KvPushRouter Python object")
# Wait for workers to be ready
asyncio.run(
wait_for_workers_ready(endpoint, kv_push_router, num_workers, model_name)
)
# Generate random token IDs (100 to 200 tokens)
num_input_tokens = random.randint(100, 200)
token_ids = [random.randint(1, 10000) for _ in range(num_input_tokens)]
# Set up override parameters
router_config_override = {
"overlap_score_weight": 0.5, # Override the default weight
"router_temperature": 0.5, # Override the default temperature
}
logger.info(f"Generated {num_input_tokens} random token IDs")
# Test with full overrides
logger.info(f"Testing with full router config overrides: {router_config_override}")
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
model_name=model_name,
token_ids=token_ids,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 20, # Generate exactly 20 tokens
},
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
router_config_override=router_config_override,
)
)
# Test without overrides
logger.info("Testing without router config overrides")
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
model_name=model_name,
token_ids=token_ids[:50], # Use fewer tokens for second test,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 10, # Generate exactly 10 tokens for the second test
},
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
# No router_config_override this time
)
)
# Test with partial override (only temperature)
partial_override = {"router_temperature": 0.1}
logger.info(f"Testing with partial router config overrides: {partial_override}")
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
model_name=model_name,
token_ids=token_ids[:30], # Use fewer tokens for third test,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 5, # Generate exactly 5 tokens for the third test
},
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
router_config_override=partial_override,
)
)
logger.info("KvPushRouter bindings test completed successfully")
def _test_router_query_instance_id(
engine_workers,
block_size: int,
request,
frontend_port: int,
test_payload: dict,
store_backend: str = "etcd",
):
"""Test query_instance_id annotation returns worker_instance_id and token_data without routing.
Assumes engine_workers are already initialized. This function manages router lifecycle.
This tests the early return optimization where a request with 'nvext.annotations': ['query_instance_id']
receives metadata without waiting for model generation. The router should:
1. NOT route the request to a worker for generation
2. Return worker_instance_id as an SSE event (which worker would handle it)
3. Return token_data as an SSE event (the tokenized input)
4. Terminate the stream with [DONE]
This is useful for clients that want to know which worker will handle a request before
committing to the full generation (e.g., for request routing decisions).
Args:
engine_workers: Backend workers (mocker/vllm) already initialized with __enter__()
block_size: Block size for KV cache
request: Pytest request fixture for managing resources
frontend_port: Port for the frontend HTTP server
test_payload: Base test payload to send to /v1/chat/completions
store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
Raises:
AssertionError: If annotation response structure is incorrect or contains generation content
"""
import aiohttp
try:
# Start KV router (frontend)
logger.info(f"Starting KV router frontend on port {frontend_port}")
kv_router = KVRouterProcess(request, block_size, frontend_port, store_backend)
kv_router.__enter__()
url = f"http://localhost:{frontend_port}/v1/chat/completions"
# Send a warming request first to ensure system is ready
logger.info("Sending warming request without annotations...")
asyncio.run(send_request_with_retry(url, test_payload))
# Test payload with query_instance_id annotation
annotated_payload = {
**test_payload,
"nvext": {"annotations": ["query_instance_id"]},
}
async def test_annotation_response():
"""Send request with query_instance_id and validate response structure"""
async with aiohttp.ClientSession() as session:
logger.info("Sending request with query_instance_id annotation...")
async with session.post(url, json=annotated_payload) as response:
assert (
response.status == 200
), f"Expected 200 but got {response.status}"
# Collect all response chunks
response_chunks = []
async for chunk in response.content:
if chunk:
chunk_str = chunk.decode("utf-8", errors="replace")
response_chunks.append(chunk_str)
full_response = "".join(response_chunks)
logger.info(
f"Full SSE response ({len(full_response)} bytes):\n{full_response}"
)
# Parse and validate the response structure
events = []
sse_parts = full_response.split("\n\n")
for part in sse_parts:
part = part.strip()
if not part:
continue
if part.startswith("event:"):
lines = part.split("\n")
event_line = next(
(line for line in lines if line.startswith("event:")),
None,
)
data_line = next(
(
line
for line in lines
if line.startswith("data:") or line.startswith(":")
),
None,
)
if event_line and data_line:
event_type = event_line.split(":", 1)[1].strip()
if data_line.startswith("data:"):
data_value = data_line.split(":", 1)[1].strip()
else:
data_value = data_line.split(":", 1)[1].strip()
events.append((event_type, data_value))
elif part.startswith("data:"):
data_value = part.split(":", 1)[1].strip()
logger.info(f"Parsed events: {events}")
# Validate worker_instance_id event
worker_event = next(
(e for e in events if e[0] == "worker_instance_id"), None
)
assert (
worker_event is not None
), f"Missing worker_instance_id event in: {events}"
# Validate token_data event
token_event = next(
(e for e in events if e[0] == "token_data"), None
)
assert (
token_event is not None
), f"Missing token_data event in: {events}"
token_data_str = token_event[1].strip('"')
try:
token_list = json.loads(token_data_str)
except json.JSONDecodeError as e:
raise AssertionError(
f"token_data is not valid JSON: {token_data_str}, error: {e}"
)
assert isinstance(
token_list, list
), f"token_data should be a list, got: {type(token_list)}"
assert (
len(token_list) > 0
), f"token_data should not be empty: {token_list}"
assert all(
isinstance(token, int) for token in token_list
), f"All tokens should be integers: {token_list}"
logger.info(
f"Valid token_data with {len(token_list)} tokens: {token_list[:10]}{'...' if len(token_list) > 10 else ''}"
)
# Validate that no actual generation happened (should only be metadata)
# This proves the early return worked correctly
generation_indicators = [
"choices",
"content",
"delta",
"finish_reason",
]
for indicator in generation_indicators:
assert (
indicator not in full_response.lower()
), f"Found generation indicator '{indicator}' - request should not have been routed to worker"
logger.info(
"No generation content found - early return worked correctly"
)
return {
"worker_instance_id": worker_event[1].strip('"'),
"token_count": len(token_list),
"tokens": token_list,
}
result = asyncio.run(test_annotation_response())
logger.info("Successfully validated query_instance_id annotation response:")
logger.info(f"Worker ID: {result['worker_instance_id']}")
logger.info(f"Token count: {result['token_count']}")
finally:
if "kv_router" in locals():
kv_router.__exit__(None, None, None)
def _test_router_overload_503(
engine_workers,
block_size: int,
request,
frontend_port: int,
test_payload: dict,
busy_threshold: float = 0.2,
):
"""Test that KV router returns 503 when all workers are busy.
Assumes engine_workers are already initialized. This function manages router lifecycle.
Uses limited resources to intentionally trigger the overload condition.
Args:
engine_workers: Backend workers (mocker/vllm) already initialized with __enter__()
block_size: Block size for KV cache (should be small to exhaust quickly, e.g. 4)
request: Pytest request fixture for managing resources
frontend_port: Port for the frontend HTTP server
test_payload: Base test payload to send to /v1/chat/completions
busy_threshold: Busy threshold for the router (default 0.2)
Raises:
AssertionError: If 503 response is not received when expected
"""
import aiohttp
from tests.utils.managed_process import ManagedProcess
try:
logger.info(
f"Starting KV router frontend on port {frontend_port} with limited resources"
)
# Custom command for router with limited block size
command = [
"python",
"-m",
"dynamo.frontend",
"--busy-threshold",
str(busy_threshold),
"--kv-cache-block-size",
str(block_size),
"--router-mode",
"kv",
"--http-port",
str(frontend_port),
]
kv_router = ManagedProcess(
command=command,
timeout=60,
display_output=True,
health_check_ports=[frontend_port],
health_check_urls=[
(
f"http://localhost:{frontend_port}/v1/models",
lambda r: r.status_code == 200,
)
],
log_dir=request.node.name,
terminate_existing=False,
)
kv_router.__enter__()
url = f"http://localhost:{frontend_port}/v1/chat/completions"
# Custom payload for 503 test with more tokens to consume resources
test_payload_503 = {
**test_payload,
"max_tokens": 50, # Longer output to consume more blocks
}
# First, send one request with retry to ensure system is ready
logger.info("Sending initial request to ensure system is ready...")
asyncio.run(send_inflight_requests([url], test_payload_503, 1))
# Now send 50 concurrent requests to exhaust resources, then verify 503
logger.info("Sending 50 concurrent requests to exhaust resources...")
async def exhaust_resources_and_verify_503():
async with aiohttp.ClientSession() as session:
# Start 50 long-running requests concurrently
tasks = []
for i in range(50):
# Create unique shuffled content for each request
content_words = test_payload["messages"][0]["content"].split()
random.shuffle(content_words)
shuffled_content = " ".join(content_words)
# Create unique payload for this request
unique_payload = {
**test_payload,
"max_tokens": 50,
"messages": [
{**test_payload["messages"][0], "content": shuffled_content}
],
}
async def send_long_request(req_id, payload):
try:
async with session.post(url, json=payload) as response:
if response.status == 200:
# Don't read the response fully, just hold the connection
await asyncio.sleep(
10
) # Hold connection for 10 seconds
return True
else:
logger.info(
f"Request {req_id} got status {response.status}"
)
return False
except Exception as e:
logger.info(f"Request {req_id} failed: {e}")
return False
tasks.append(
asyncio.create_task(send_long_request(i, unique_payload))
)
# Wait briefly to ensure requests are in-flight
await asyncio.sleep(0.2)
# Now send one more request that should get 503
logger.info("Sending additional request that should receive 503...")
try:
async with session.post(url, json=test_payload_503) as response:
status_code = response.status
if status_code == 503:
body = await response.json()
logger.info(f"Got expected 503 response: {body}")
assert "Service temporarily unavailable" in body.get(
"error", ""
) or "All workers are busy" in body.get(
"error", ""
), f"Expected service overload error message, got: {body}"
return True
else:
logger.error(f"Expected 503 but got {status_code}")
if status_code == 200:
logger.error(
"Request unexpectedly succeeded when it should have been rejected"
)
return False
except Exception as e:
logger.error(f"Failed to send overload test request: {e}")
return False
finally:
# Cancel all background tasks
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
# Run the test
success = asyncio.run(exhaust_resources_and_verify_503())
assert success, "Failed to verify 503 response when resources are exhausted"
logger.info("Successfully verified 503 response when all workers are busy")
finally:
if "kv_router" in locals():
kv_router.__exit__(None, None, None)
def _test_router_indexers_sync(
engine_workers,
block_size: int,
model_name: str,
num_workers: int,
store_backend: str = "etcd",
):
"""Test that two KV routers have synchronized indexer states after processing requests.
Assumes engine_workers are already initialized. This test:
1. Creates first KvPushRouter (with its own runtime) and sends 25 requests (triggers snapshot at threshold=20)
2. Creates second KvPushRouter (with its own runtime, should sync from NATS snapshot)
3. Sends 25 requests to second router
4. Verifies NATS object store contains the snapshot
5. Dumps states from both routers and compares them (should be identical)
This validates that the snapshot mechanism works and routers can sync state from NATS.
Args:
engine_workers: Backend workers (mocker/vllm) already initialized with __enter__()
block_size: Block size for KV cache
model_name: Model name to use for requests
num_workers: Expected number of workers
store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
Raises:
AssertionError: If router states don't synchronize correctly or snapshot is missing
"""
import nats
# Use async to manage the test flow
async def test_sync():
# Create KvRouterConfig with lower snapshot threshold for testing
kv_router_config = KvRouterConfig(router_snapshot_threshold=20)
async def send_requests_to_router(router, num_requests, router_name, endpoint):
# Now send the actual requests
tasks = []
for i in range(num_requests):
# Generate random token IDs for each request
logger.debug(f"Sending request {i + 1}/{num_requests} to {router_name}")
# Generate 30 random tokens
request_tokens = [random.randint(1, 10000) for _ in range(30)]
# Send request to mocker via the router
tasks.append(
asyncio.create_task(
send_request_via_python_kv_router(
kv_python_router=router,
model_name=model_name,
token_ids=request_tokens,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 10, # Generate exactly 10 tokens
},
)
)
)
# Wait for all requests to complete
results = await asyncio.gather(*tasks)
successful = sum(1 for r in results if r)
logger.info(
f"Completed {successful}/{num_requests} requests for {router_name}"
)
return successful
# Create first runtime and endpoint for router 1
logger.info("Creating first KV router with its own runtime")
runtime1 = get_runtime(store_backend)
namespace1 = runtime1.namespace(engine_workers.namespace)
component1 = namespace1.component(engine_workers.component_name)
endpoint1 = component1.endpoint("generate")
kv_push_router1 = KvPushRouter(
endpoint=endpoint1,
block_size=block_size,
kv_router_config=kv_router_config,
)
# Wait for workers to be ready
await wait_for_workers_ready(
endpoint1, kv_push_router1, num_workers, model_name
)
# Send 25 requests to first router
logger.info("Sending 25 requests to first router")
# Send requests to first router
successful1 = await send_requests_to_router(
kv_push_router1, 25, "Router 1", endpoint1
)
assert (
successful1 == 25
), f"Expected 25 successful requests to router 1, got {successful1}"
# Wait for a second before creating the second router
logger.info("Waiting for 1 second before creating second router")
await asyncio.sleep(1)
# Create second runtime and endpoint for router 2
logger.info("Creating second KV router with its own runtime")
runtime2 = get_runtime(store_backend)
namespace2 = runtime2.namespace(engine_workers.namespace)
component2 = namespace2.component(engine_workers.component_name)
endpoint2 = component2.endpoint("generate")
kv_push_router2 = KvPushRouter(
endpoint=endpoint2,
block_size=block_size,
kv_router_config=kv_router_config,
)
# Send 25 requests to second router with initial retry loop
logger.info("Sending 25 requests to second router")
successful2 = await send_requests_to_router(
kv_push_router2, 25, "Router 2", endpoint2
)
assert (
successful2 == 25
), f"Expected 25 successful requests to router 2, got {successful2}"
# Wait for all requests to complete (they should already be complete from gather)
# Wait another 1 second for internal synchronization
logger.info("Waiting for final synchronization")
await asyncio.sleep(1)
# Verify NATS object store bucket was created with snapshot
# Mirror the Rust bucket naming logic from subscriber.rs:
# component.subject() -> "namespace.{ns}.component.{comp}"
# then slugify (convert dots to dashes, lowercase, etc) and append "-radix-bucket"
component_subject = f"namespace.{engine_workers.namespace}.component.{engine_workers.component_name}"
slugified = component_subject.lower().replace(".", "-").replace("_", "-")
expected_bucket = f"{slugified}-radix-bucket"
expected_file = "radix-state"
logger.info(f"Verifying NATS object store bucket exists: {expected_bucket}")
snapshot_verified = False
try:
# Connect to NATS and check object store
nc = await nats.connect("nats://localhost:4222")
try:
js = nc.jetstream()
obj_store = await js.object_store(expected_bucket)
# Try to get the expected file
try:
result = await obj_store.get(expected_file)
logger.info(
f"✓ Snapshot file '{expected_file}' found in bucket '{expected_bucket}' "
f"(size: {len(result.data) if result.data else 0} bytes)"
)
snapshot_verified = True
except Exception as e:
logger.error(
f"Snapshot file '{expected_file}' not found in bucket '{expected_bucket}': {e}"
)
finally:
await nc.close()
except Exception as e:
logger.error(f"Error checking NATS object store: {e}")
# Assert that snapshot was created (threshold=20, sent 25 requests)
if not snapshot_verified:
assert False, (
f"Expected snapshot to be created in bucket '{expected_bucket}' with file '{expected_file}'. "
f"Router sent 25 requests with snapshot_threshold=20, so snapshot should have been triggered."
)
# Dump states from both routers
logger.info("Dumping states from both routers")
state1_json = await kv_push_router1.dump_events()
state2_json = await kv_push_router2.dump_events()
# Parse JSON strings for comparison
state1 = json.loads(state1_json)
state2 = json.loads(state2_json)
# Sort both states for comparison (order might differ due to HashMap iteration and sharding)
def sort_key(event):
data = event["event"]["data"]["stored"]
blocks = data["blocks"]
first_block = blocks[0]
return (
event["worker_id"],
first_block["tokens_hash"],
data["parent_hash"],
)
sorted_state1 = sorted(state1, key=sort_key)
sorted_state2 = sorted(state2, key=sort_key)
# Verify they are equal
logger.info(f"Router 1 has {len(sorted_state1)} events")
logger.info(f"Router 2 has {len(sorted_state2)} events")
# Compare states one by one and only show differences
if len(sorted_state1) != len(sorted_state2):
logger.error(
f"Router 1 has {len(sorted_state1)} events, Router 2 has {len(sorted_state2)} events"
)
assert False, "Router states have different numbers of events"
differences = []
for i, (state1_item, state2_item) in enumerate(
zip(sorted_state1, sorted_state2)
):
# Create copies without event_id for comparison
item1_compare = state1_item.copy()
item2_compare = state2_item.copy()
# Remove event_id from the nested event structure
if "event" in item1_compare and "event_id" in item1_compare["event"]:
del item1_compare["event"]["event_id"]
if "event" in item2_compare and "event_id" in item2_compare["event"]:
del item2_compare["event"]["event_id"]
if item1_compare != item2_compare:
differences.append(
{
"index": i,
"router1_state": state1_item,
"router2_state": state2_item,
}
)
# If there are differences, format them for easier debugging
if differences:
error_msg = (
f"Router states are not equal. Found {len(differences)} differences:\n"
)
for diff in differences:
error_msg += f"\nDifference at index {diff['index']}:\n"
error_msg += (
f"Router 1: {json.dumps(diff['router1_state'], indent=2)}\n"
)
error_msg += (
f"Router 2: {json.dumps(diff['router2_state'], indent=2)}\n"
)
error_msg += "-" * 80 + "\n"
assert False, error_msg
logger.info("Successfully verified that both router states are equal")
# Verify NATS consumers are created (while routers are still alive)
logger.info("Verifying NATS consumers exist for both routers")
component_subject = f"namespace.{engine_workers.namespace}.component.{engine_workers.component_name}"
slugified = component_subject.lower().replace(".", "-").replace("_", "-")
stream_name = f"{slugified}-kv-events"
nc = await nats.connect("nats://localhost:4222")
try:
js = nc.jetstream()
consumer_infos = await js.consumers_info(stream_name)
consumer_names = [info.name for info in consumer_infos]
logger.info(f"Found {len(consumer_names)} consumers: {consumer_names}")
assert len(consumer_names) == 2, (
f"Expected 2 durable consumers (one per router), "
f"found {len(consumer_names)}: {consumer_names}"
)
logger.info("✓ Verified 2 durable consumers exist (one per router)")
finally:
await nc.close()
# Run the async test
asyncio.run(test_sync())
logger.info("Indexers sync test completed successfully")
def _test_router_decisions(
engine_workers,
endpoint,
model_name: str,
request,
test_dp_rank: bool = False,
):
"""Validate KV cache prefix reuse and worker routing by sending progressive requests with overlapping prefixes.
Assumes engine workers are already initialized. Sends 4 progressive requests where each extends
the previous tokens by BLOCK_SIZE. The first request is forced to a specific worker (and optionally
dp_rank), and subsequent requests should naturally route to the same worker due to prefix reuse.
Args:
engine_workers: MockerProcess or VLLMProcess instance (already initialized with __enter__())
endpoint: Endpoint of the engine workers
model_name: Name of the model
request: Pytest request fixture
test_dp_rank: If True, also forces and validates dp_rank routing (for data parallel setups)
Raises:
AssertionError: If routing decisions don't follow KV cache prefix reuse as expected
"""
# Create KvRouterConfig with lower snapshot threshold for testing
kv_router_config = KvRouterConfig(router_snapshot_threshold=20)
kv_push_router = KvPushRouter(
endpoint=endpoint,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
)
# Use async to manage the test flow
async def test_sync():
# Wait for workers to be ready and get their instance IDs
worker_ids = await wait_for_workers_ready(
endpoint,
kv_push_router,
expected_num_workers=engine_workers.num_workers,
model_name=model_name,
)
logger.info(f"Workers ready: {worker_ids}")
# Use the first worker_id for forced routing
forced_worker_id = worker_ids[0]
forced_dp_rank = 1 if test_dp_rank else None
if test_dp_rank:
logger.info(
f"Will force first request to worker_id={forced_worker_id}, dp_rank={forced_dp_rank}"
)
else:
logger.info(f"Will force first request to worker_id={forced_worker_id}")
# Send 4 progressive requests with overlapping prefixes
cumulative_tokens = []
for i in range(4):
# Add BLOCK_SIZE new random tokens
new_tokens = [random.randint(1, 10000) for _ in range(BLOCK_SIZE)]
cumulative_tokens.extend(new_tokens)
# Force first request to specific worker_id (and dp_rank if testing DP), let subsequent requests follow naturally
worker_id_override = forced_worker_id if i == 0 else None
dp_rank_override = forced_dp_rank if i == 0 and test_dp_rank else None
log_msg = (
f"Sending request {i + 1}/4 with {len(cumulative_tokens)} tokens "
f"(added {len(new_tokens)} new tokens)"
)
if worker_id_override is not None:
if test_dp_rank:
log_msg += f" - FORCING worker_id={worker_id_override}, dp_rank={dp_rank_override}"
else:
log_msg += f" - FORCING worker_id={worker_id_override}"
logger.info(log_msg)
await send_request_via_python_kv_router(
kv_python_router=kv_push_router,
model_name=model_name,
token_ids=cumulative_tokens.copy(),
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 2, # Generate exactly 2 tokens
},
worker_id=worker_id_override,
dp_rank=dp_rank_override,
)
# Wait a bit between requests
await asyncio.sleep(0.5)
# Wait for final synchronization (especially important for DP)
if test_dp_rank:
await asyncio.sleep(1)
# Dump events from the router
events_json = await kv_push_router.dump_events()
return events_json, forced_worker_id, forced_dp_rank
# Run the async test
events_json, expected_worker_id, expected_dp_rank = asyncio.run(test_sync())
# Parse events and count by worker routing key (worker_id or (worker_id, dp_rank))
events = json.loads(events_json)
if test_dp_rank:
# Group by (worker_id, dp_rank) tuple for DP testing
events_by_key_dp: dict[tuple[int, int], list[Any]] = {}
for event in events:
worker_id = event.get("worker_id")
dp_rank = event.get("event", {}).get("dp_rank", 0)
key = (worker_id, dp_rank)
if key not in events_by_key_dp:
events_by_key_dp[key] = []
events_by_key_dp[key].append(event)
logger.info(
f"Events by (worker_id, dp_rank): {[(key, len(evts)) for key, evts in events_by_key_dp.items()]}"
)
# Verify: All but one routing key should have no events (due to prefix reuse)
keys_with_events_dp = [
key for key, evts in events_by_key_dp.items() if len(evts) > 0
]
assert len(keys_with_events_dp) == 1, (
f"Expected exactly 1 (worker_id, dp_rank) to have events (due to prefix reuse), "
f"but found {len(keys_with_events_dp)} with events: {keys_with_events_dp}"
)
# Verify: The routing key with events should have exactly 4 events (one per request)
active_key_dp = keys_with_events_dp[0]
num_events = len(events_by_key_dp[active_key_dp])
assert num_events == 4, (
f"Expected (worker_id, dp_rank) {active_key_dp} to have exactly 4 events, "
f"but found {num_events} events"
)
# Verify: Routing should match the forced values
active_worker_id, active_dp_rank = active_key_dp
assert active_worker_id == expected_worker_id, (
f"Expected all events to have worker_id={expected_worker_id} (forced in first request), "
f"but found worker_id={active_worker_id}"
)
assert active_dp_rank == expected_dp_rank, (
f"Expected all events to have dp_rank={expected_dp_rank} (forced in first request), "
f"but found dp_rank={active_dp_rank}"
)
logger.info(
f"Successfully verified: Worker {active_worker_id} dp_rank {active_dp_rank} handled all 4 requests with prefix reuse. "
f"All events correctly routed to worker_id={expected_worker_id}, dp_rank={expected_dp_rank} as expected. "
f"KV events synchronized correctly."
)
else:
# Group by worker_id only for multiple workers testing
events_by_key_single: dict[int, list] = {}
for event in events:
worker_id = event.get("worker_id")
if worker_id not in events_by_key_single:
events_by_key_single[worker_id] = []
events_by_key_single[worker_id].append(event)
logger.info(
f"Events by worker_id: {[(key, len(evts)) for key, evts in events_by_key_single.items()]}"
)
# Verify: All but one routing key should have no events (due to prefix reuse)
keys_with_events_single = [
key for key, evts in events_by_key_single.items() if len(evts) > 0
]
assert len(keys_with_events_single) == 1, (
f"Expected exactly 1 worker_id to have events (due to prefix reuse), "
f"but found {len(keys_with_events_single)} with events: {keys_with_events_single}"
)
# Verify: The routing key with events should have exactly 4 events (one per request)
active_worker_id = keys_with_events_single[0]
num_events = len(events_by_key_single[active_worker_id])
assert num_events == 4, (
f"Expected worker_id {active_worker_id} to have exactly 4 events, "
f"but found {num_events} events"
)
# Verify: Routing should match the forced values
assert active_worker_id == expected_worker_id, (
f"Expected all events to have worker_id={expected_worker_id} (forced in first request), "
f"but found worker_id={active_worker_id}"
)
logger.info(
f"Successfully verified: Worker {active_worker_id} handled all 4 requests with prefix reuse. "
f"All events correctly routed to worker_id={expected_worker_id} as expected. "
f"KV events synchronized correctly."
)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import json
import logging
import os
import random
import string
import tempfile
from typing import Any, Dict, Optional
import aiohttp
import nats
import pytest
from dynamo._core import DistributedRuntime, KvPushRouter, KvRouterConfig
from tests.router.common import ( # utilities
_test_python_router_bindings,
_test_router_basic,
_test_router_decisions,
_test_router_indexers_sync,
_test_router_overload_503,
_test_router_query_instance_id,
_test_router_two_routers,
generate_random_suffix,
get_runtime,
)
from tests.utils.constants import ROUTER_MODEL_NAME
from tests.utils.managed_process import ManagedProcess
pytestmark = pytest.mark.pre_merge
logger = logging.getLogger(__name__)
@pytest.fixture
def file_storage_backend():
"""Fixture that sets up and tears down file storage backend.
Creates a temporary directory for file-based KV storage and sets
the DYN_FILE_KV environment variable. Cleans up after the test.
"""
with tempfile.TemporaryDirectory() as tmpdir:
old_env = os.environ.get("DYN_FILE_KV")
os.environ["DYN_FILE_KV"] = tmpdir
logger.info(f"Set up file storage backend in: {tmpdir}")
yield tmpdir
# Cleanup
if old_env is not None:
os.environ["DYN_FILE_KV"] = old_env
else:
os.environ.pop("DYN_FILE_KV", None)
logger = logging.getLogger(__name__)
MODEL_NAME = ROUTER_MODEL_NAME
NUM_MOCKERS = 2
BLOCK_SIZE = 16
SPEEDUP_RATIO = 10.0
PORTS = [
8011,
8022,
] # Frontend ports: use PORTS[0] for single router, PORTS for multi-router
NUM_REQUESTS = 100
PORT = 8090 # Starting port for mocker instances
def generate_random_suffix() -> str:
"""Generate a 10-character random alphabetic suffix for namespace isolation."""
return "".join(random.choices(string.ascii_lowercase, k=10))
BLOCK_SIZE = 16
# Shared test payload for all tests
......@@ -82,8 +64,10 @@ class MockerProcess:
# Generate a unique namespace suffix shared by all mockers
namespace_suffix = generate_random_suffix()
self.namespace = f"test-namespace-{namespace_suffix}"
self.endpoint = f"dyn://{self.namespace}.mocker.generate"
self.component_name = "mocker"
self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
self.num_mockers = num_mockers
self.num_workers = self.num_mockers # for compatibility with common.py
self.mocker_processes = []
# Default mocker args if not provided
......@@ -163,353 +147,6 @@ class MockerProcess:
process.__exit__(exc_type, exc_val, exc_tb)
class KVRouterProcess(ManagedProcess):
"""Manages the KV router process using dynamo.frontend"""
def __init__(self, request, frontend_port: int, store_backend: str = "etcd"):
command = [
"python",
"-m",
"dynamo.frontend",
"--kv-cache-block-size",
str(BLOCK_SIZE),
"--router-mode",
"kv",
"--http-port",
str(frontend_port),
"--store-kv",
store_backend,
]
super().__init__(
command=command,
timeout=60,
display_output=True,
health_check_ports=[frontend_port],
health_check_urls=[
(f"http://localhost:{frontend_port}/v1/models", self._check_ready)
],
log_dir=request.node.name,
terminate_existing=False,
)
self.port = frontend_port
def _check_ready(self, response):
"""Check if KV router is ready"""
return response.status_code == 200
def __exit__(self, exc_type, exc_val, exc_tb):
super().__exit__(exc_type, exc_val, exc_tb)
async def send_request_with_retry(url: str, payload: dict, max_retries: int = 8):
"""Send a single request with exponential backoff retry"""
wait_time = 1 # Start with 1 second
for attempt in range(max_retries + 1):
await asyncio.sleep(wait_time)
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as response:
if response.status == 200:
# Read the response to ensure it's valid
async for _ in response.content:
pass
logger.debug(
f"First request succeeded on attempt {attempt + 1}"
)
return True
else:
logger.warning(
f"Attempt {attempt + 1} failed with status {response.status}"
)
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
if attempt < max_retries:
wait_time *= 2 # Double the wait time
return False
def get_runtime(store_backend="etcd", request_plane="nats"):
"""Create a DistributedRuntime instance for testing.
Args:
store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
request_plane: How frontend talks to backend ("tcp", "http" or "nats). Defaults to "nats".
"""
try:
# Try to get running loop (works in async context)
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop, create a new one (sync context)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return DistributedRuntime(loop, store_backend, request_plane)
async def check_nats_consumers(namespace: str, expected_count: Optional[int] = None):
"""Check NATS consumers for the KV events stream.
Args:
namespace: The namespace to check consumers for
expected_count: Optional expected number of consumers. If provided, logs an error if count doesn't match.
Returns:
List of consumer names
"""
component_subject = f"namespace.{namespace}.component.mocker"
slugified = component_subject.lower().replace(".", "-").replace("_", "-")
stream_name = f"{slugified}-kv-events"
logger.info(f"Checking consumers for stream: {stream_name}")
nc = await nats.connect("nats://localhost:4222")
try:
js = nc.jetstream()
consumer_infos = await js.consumers_info(stream_name)
consumer_names = [info.name for info in consumer_infos]
logger.info(f"Found {len(consumer_names)} consumers: {consumer_names}")
# Log detailed consumer info
for info in consumer_infos:
logger.info(
f"Consumer {info.name}: "
f"num_pending={info.num_pending}, "
f"num_ack_pending={info.num_ack_pending}, "
f"ack_floor={info.ack_floor}, "
f"delivered={info.delivered}"
)
if expected_count is not None:
assert (
len(consumer_names) == expected_count
), f"Expected {expected_count} durable consumers, found {len(consumer_names)}: {consumer_names}"
logger.info(f"✓ Verified {expected_count} durable consumers exist")
return consumer_names
finally:
await nc.close()
async def send_inflight_requests(urls: list, payload: dict, num_requests: int):
"""Send multiple requests concurrently, alternating between URLs if multiple provided"""
# First, send test requests with retry to ensure all systems are ready
for i, url in enumerate(urls):
logger.info(f"Sending initial test request to URL {i} ({url}) with retry...")
if not await send_request_with_retry(url, payload):
raise RuntimeError(f"Failed to connect to URL {i} after multiple retries")
async def send_single_request(session: aiohttp.ClientSession, request_id: int):
# Alternate between URLs based on request_id
url = urls[request_id % len(urls)]
url_index = request_id % len(urls)
try:
async with session.post(url, json=payload) as response:
if response.status != 200:
logger.error(
f"Request {request_id} to URL {url_index} failed with status {response.status}"
)
return False
# For streaming responses, read the entire stream
chunks = []
async for line in response.content:
if line:
chunks.append(line)
logger.debug(
f"Request {request_id} to URL {url_index} completed with {len(chunks)} chunks"
)
return True
except Exception as e:
logger.error(
f"Request {request_id} to URL {url_index} failed with error: {e}"
)
return False
# Send all requests at once
async with aiohttp.ClientSession() as session:
tasks = [send_single_request(session, i) for i in range(num_requests)]
results = await asyncio.gather(*tasks)
successful = sum(1 for r in results if r)
failed = sum(1 for r in results if not r)
logger.info(f"Completed all requests: {successful} successful, {failed} failed")
assert (
successful == num_requests
), f"Expected {num_requests} successful requests, got {successful}"
logger.info(f"All {num_requests} requests completed successfully")
async def send_request_via_python_kv_router(
kv_python_router: KvPushRouter,
token_ids: list,
initial_wait: float,
max_retries: int,
stop_conditions: Optional[dict] = None,
sampling_options: Optional[dict] = None,
output_options: Optional[dict] = None,
router_config_override: Optional[dict] = None,
worker_id: Optional[
int
] = None, # If None, Router will select the best available worker
dp_rank: Optional[int] = None, # Data parallel rank (defaults to 0)
):
"""Send a request to the specified mocker instance.
Returns True if mockers respond, otherwise raises or returns False.
"""
wait_time = initial_wait
log_message = (
f"the mocker with worker_id={worker_id}"
if worker_id is not None
else "the best available mocker"
)
# Retry loop sending reuqest to mocker worker with exponential backoff
for attempt in range(max_retries + 1):
try:
logger.debug(f"Sending request to {log_message} (attempt {attempt + 1})")
stream = await kv_python_router.generate(
token_ids=token_ids,
model=MODEL_NAME,
stop_conditions=stop_conditions,
sampling_options=sampling_options,
output_options=output_options,
router_config_override=router_config_override,
worker_id=worker_id,
dp_rank=dp_rank,
)
if stream is not None:
logger.debug(f"Request succeeded on attempt {attempt + 1}")
break
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
if attempt < max_retries:
await asyncio.sleep(wait_time)
wait_time *= 2
else:
raise RuntimeError(
f"Failed to connect to mockers after {max_retries + 1} attempts: {e}"
)
# Collect tokens from the SSE stream
generated_tokens = []
async for response in stream:
if isinstance(response, dict):
# Check if response has token_ids
if "token_ids" in response:
tokens = response["token_ids"]
if isinstance(tokens, list):
generated_tokens.extend(tokens)
logger.debug(f"Received {len(tokens)} tokens: {tokens}")
# Check for finish reason
if "finish_reason" in response:
logger.debug(
f"Stream finished with reason: {response['finish_reason']}"
)
# Verify if expected number of tokens are generated if max_tokens specified and ignore_eos is True
logger.debug(f"Total generated tokens: {len(generated_tokens)}")
if (
stop_conditions
and "max_tokens" in stop_conditions
and "ignore_eos" in stop_conditions
and stop_conditions["ignore_eos"]
):
max_tokens = int(stop_conditions["max_tokens"])
assert len(generated_tokens) == max_tokens, (
f"Expected exactly {max_tokens} tokens but got {len(generated_tokens)}. "
f"Tokens: {generated_tokens}"
)
logger.debug(
f"Successfully verified {max_tokens} tokens generated as expected via KvPushRouter with ignore_eos=True"
)
return True
return False
async def wait_for_mockers_ready(
endpoint, router: KvPushRouter, expected_num_workers: int = NUM_MOCKERS
) -> list[int]:
"""Wait for mocker workers to be ready and return their instance IDs.
This function polls the endpoint's client for instance IDs until the expected
number of workers are available, then sends a warmup request to verify they
can handle requests.
Args:
endpoint: The endpoint object to get the client from
router: The KvPushRouter to use for sending warmup requests
expected_num_workers: Number of workers to wait for (default: NUM_MOCKERS)
Returns:
Sorted list of unique instance IDs (ints).
Raises:
AssertionError: If workers don't become ready or warmup request fails.
"""
logger.info("Waiting for mockers to be ready")
# Get the client from the endpoint
client = await endpoint.client()
# Poll for instance IDs until we have the expected number
instance_ids: list[int] = []
max_wait_time = 60 # seconds
start_time = asyncio.get_event_loop().time()
while len(instance_ids) < expected_num_workers:
instance_ids = client.instance_ids()
logger.info(f"Found {len(instance_ids)} instance(s): {instance_ids}")
if len(instance_ids) >= expected_num_workers:
break
# Check timeout
if asyncio.get_event_loop().time() - start_time > max_wait_time:
raise AssertionError(
f"Timeout waiting for workers. Found {len(instance_ids)} instance(s), expected {expected_num_workers}"
)
# Wait 1 second before polling again
await asyncio.sleep(1.0)
# Send a warmup request to verify workers can handle requests
test_token_ids = [random.randint(1, 10000) for _ in range(4)]
logger.info(f"Sending warmup request with {len(test_token_ids)} tokens")
try:
await send_request_via_python_kv_router(
kv_python_router=router,
token_ids=test_token_ids,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True,
"max_tokens": 2,
},
)
except Exception as e:
raise AssertionError(f"Warmup request failed: {e}")
logger.info(f"All {len(instance_ids)} workers are ready")
return sorted(instance_ids)
@pytest.mark.pre_merge
@pytest.mark.model(MODEL_NAME)
def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
......@@ -525,13 +162,6 @@ def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
try:
# Start KV router (frontend)
frontend_port = PORT
logger.info(f"Starting KV router frontend on port {frontend_port}")
kv_router = KVRouterProcess(request, frontend_port)
kv_router.__enter__()
# Start mocker instances with the new CLI interface
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(
......@@ -540,24 +170,18 @@ def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__()
# Use async to send requests concurrently for better performance
asyncio.run(
send_inflight_requests(
[
f"http://localhost:{frontend_port}/v1/chat/completions"
], # Pass as list
TEST_PAYLOAD,
NUM_REQUESTS,
)
# Run basic router test (starts router internally, mocker workers don't need frontend readiness check)
_test_router_basic(
engine_workers=mockers,
block_size=BLOCK_SIZE,
request=request,
frontend_port=PORTS[0],
test_payload=TEST_PAYLOAD,
num_requests=NUM_REQUESTS,
wait_for_frontend=False, # Mocker workers are fast, no need to wait
)
logger.info(f"Successfully completed {NUM_REQUESTS} requests")
finally:
# Clean up
if "kv_router" in locals():
kv_router.__exit__(None, None, None)
if "mockers" in locals():
mockers.__exit__(None, None, None)
......@@ -583,21 +207,10 @@ def test_mocker_two_kv_router(
f"Starting mocker two KV router test with {store_backend} storage backend"
)
# Create mocker args dictionary: FixtureRequest: tuple[NatsServer, EtcdServer]: NoneType
# Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
kv_routers = []
try:
# Start two KV routers (frontend) on ports 8091 and 8092
router_ports = [PORT + 1, PORT + 2] # 8091 and 8092
for port in router_ports:
logger.info(f"Starting KV router frontend on port {port}")
kv_router = KVRouterProcess(request, port, store_backend)
kv_router.__enter__()
kv_routers.append(kv_router)
# Start mocker instances with the new CLI interface
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(
......@@ -609,69 +222,18 @@ def test_mocker_two_kv_router(
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__()
# Build URLs for both routers
router_urls = [
f"http://localhost:{port}/v1/chat/completions" for port in router_ports
]
# Use async to send requests concurrently, alternating between routers
asyncio.run(
send_inflight_requests(
router_urls,
TEST_PAYLOAD,
NUM_REQUESTS,
)
)
logger.info(
f"Successfully completed {NUM_REQUESTS} requests across {len(router_ports)} routers"
)
# Verify durable consumers lifecycle
async def verify_consumer_lifecycle():
logger.info("Verifying durable consumers lifecycle")
# Check initial consumer count - should have 2 (one for each router process)
await check_nats_consumers(mockers.namespace, expected_count=2)
# Kill the first router process
logger.info(f"Killing first router on port {router_ports[0]}")
kv_routers[0].__exit__(None, None, None)
# Wait for cleanup to happen (consumer deletion is triggered by etcd watch)
await asyncio.sleep(1)
# Verify only 1 consumer remains
await check_nats_consumers(mockers.namespace, expected_count=1)
logger.info(
"✓ Verified 1 durable consumer remains after killing first router"
)
# Kill the second router process
logger.info(f"Killing second router on port {router_ports[1]}")
kv_routers[1].__exit__(None, None, None)
# Wait for cleanup to happen
await asyncio.sleep(1)
# Verify no consumers remain
await check_nats_consumers(mockers.namespace, expected_count=0)
logger.info(
"✓ Verified 0 durable consumers remain after killing both routers"
# Run two-router test (starts KV routers internally and manages their lifecycle)
_test_router_two_routers(
engine_workers=mockers,
block_size=BLOCK_SIZE,
request=request,
router_ports=PORTS,
test_payload=TEST_PAYLOAD,
num_requests=NUM_REQUESTS,
store_backend=store_backend,
)
# Run consumer lifecycle verification
asyncio.run(verify_consumer_lifecycle())
# Clear the kv_routers list since we've already cleaned them up
kv_routers = []
finally:
# Clean up any remaining routers (in case of error before consumer verification)
for kv_router in kv_routers:
kv_router.__exit__(None, None, None)
# Clean up mockers
if "mockers" in locals():
mockers.__exit__(None, None, None)
......@@ -682,12 +244,7 @@ def test_mocker_two_kv_router(
def test_mocker_kv_router_overload_503(
request, runtime_services, predownload_tokenizers
):
"""
Test that KV router returns 503 when all workers are busy.
This test uses limited resources to intentionally trigger the overload condition.
"""
# runtime_services starts etcd and nats
"""Test that KV router returns 503 when mocker workers are overloaded."""
logger.info("Starting mocker KV router overload test for 503 status")
# Create mocker args dictionary with limited resources
mocker_args = {
......@@ -697,149 +254,24 @@ def test_mocker_kv_router_overload_503(
}
try:
# Start KV router (frontend) with limited block size
frontend_port = PORT + 10 # Use different port to avoid conflicts
logger.info(
f"Starting KV router frontend on port {frontend_port} with limited resources"
)
# Custom command for router with limited block size
command = [
"python",
"-m",
"dynamo.frontend",
"--busy-threshold",
"0.2",
"--kv-cache-block-size",
"4", # Match the mocker's block size
"--router-mode",
"kv",
"--http-port",
str(frontend_port),
]
kv_router = ManagedProcess(
command=command,
timeout=60,
display_output=True,
health_check_ports=[frontend_port],
health_check_urls=[
(
f"http://localhost:{frontend_port}/v1/models",
lambda r: r.status_code == 200,
)
],
log_dir=request.node.name,
terminate_existing=False,
)
kv_router.__enter__()
# Start single mocker instance with limited resources using the new CLI interface
# Start single mocker instance with limited resources
logger.info("Starting single mocker instance with limited resources")
mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=1)
logger.info(f"Mocker using endpoint: {mockers.endpoint}")
mockers.__enter__()
url = f"http://localhost:{frontend_port}/v1/chat/completions"
# Custom payload for 503 test with more tokens to consume resources
test_payload_503 = {
**TEST_PAYLOAD,
"max_tokens": 50, # Longer output to consume more blocks
}
# First, send one request with retry to ensure system is ready
logger.info("Sending initial request to ensure system is ready...")
asyncio.run(send_inflight_requests([url], test_payload_503, 1))
# Now send 50 concurrent requests to exhaust resources, then verify 503
logger.info("Sending 50 concurrent requests to exhaust resources...")
async def exhaust_resources_and_verify_503():
async with aiohttp.ClientSession() as session:
# Start 50 long-running requests concurrently
tasks = []
for i in range(50):
# Create unique shuffled content for each request
content_words = TEST_PAYLOAD["messages"][0]["content"].split()
random.shuffle(content_words)
shuffled_content = " ".join(content_words)
# Create unique payload for this request
unique_payload = {
**TEST_PAYLOAD,
"max_tokens": 50,
"messages": [
{**TEST_PAYLOAD["messages"][0], "content": shuffled_content}
],
}
async def send_long_request(req_id, payload):
try:
async with session.post(url, json=payload) as response:
if response.status == 200:
# Don't read the response fully, just hold the connection
await asyncio.sleep(
10
) # Hold connection for 10 seconds
return True
else:
logger.info(
f"Request {req_id} got status {response.status}"
)
return False
except Exception as e:
logger.info(f"Request {req_id} failed: {e}")
return False
tasks.append(
asyncio.create_task(send_long_request(i, unique_payload))
# Run overload 503 test
frontend_port = PORTS[0] + 10 # Use different port to avoid conflicts
_test_router_overload_503(
engine_workers=mockers,
block_size=4, # Match the mocker's block size
request=request,
frontend_port=frontend_port,
test_payload=TEST_PAYLOAD,
busy_threshold=0.2,
)
# Wait briefly to ensure requests are in-flight
await asyncio.sleep(0.2)
# Now send one more request that should get 503
logger.info("Sending additional request that should receive 503...")
try:
async with session.post(url, json=test_payload_503) as response:
status_code = response.status
if status_code == 503:
body = await response.json()
logger.info(f"Got expected 503 response: {body}")
assert "Service temporarily unavailable" in body.get(
"error", ""
) or "All workers are busy" in body.get(
"error", ""
), f"Expected service overload error message, got: {body}"
return True
else:
logger.error(f"Expected 503 but got {status_code}")
if status_code == 200:
logger.error(
"Request unexpectedly succeeded when it should have been rejected"
)
return False
except Exception as e:
logger.error(f"Failed to send overload test request: {e}")
return False
finally:
# Cancel all background tasks
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
# Run the test
success = asyncio.run(exhaust_resources_and_verify_503())
assert success, "Failed to verify 503 response when resources are exhausted"
logger.info("Successfully verified 503 response when all workers are busy")
finally:
# Clean up
if "kv_router" in locals():
kv_router.__exit__(None, None, None)
if "mockers" in locals():
mockers.__exit__(None, None, None)
......@@ -847,20 +279,12 @@ def test_mocker_kv_router_overload_503(
@pytest.mark.pre_merge
@pytest.mark.model(MODEL_NAME)
def test_kv_push_router_bindings(request, runtime_services, predownload_tokenizers):
"""
Test KvPushRouter Python bindings with mocker engines.
This test creates KvPushRouter as a Python object and verifies
token streaming with ignore_eos=True and max_tokens=20.
"""
# runtime_services starts etcd and nats
"""Test KvPushRouter Python bindings with mocker engines."""
logger.info("Starting KvPushRouter bindings test")
# Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
try:
# Start mocker instances with the new CLI interface
# Start mocker instances
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
......@@ -870,108 +294,20 @@ def test_kv_push_router_bindings(request, runtime_services, predownload_tokenize
# Get runtime and create endpoint
runtime = get_runtime()
# Use the namespace from the mockers
namespace = runtime.namespace(mockers.namespace)
component = namespace.component("mocker")
component = namespace.component(mockers.component_name)
endpoint = component.endpoint("generate")
# Create KvRouterConfig with default settings
kv_router_config = KvRouterConfig()
# Create KvPushRouter Python object
kv_push_router = KvPushRouter(
# Run Python router bindings test
_test_python_router_bindings(
engine_workers=mockers,
endpoint=endpoint,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
model_name=MODEL_NAME,
num_workers=NUM_MOCKERS,
)
logger.info("Created KvPushRouter Python object")
# Wait for mockers to be ready
asyncio.run(wait_for_mockers_ready(endpoint, kv_push_router))
# Generate random token IDs (100 to 200 tokens)
num_input_tokens = random.randint(100, 200)
token_ids = [random.randint(1, 10000) for _ in range(num_input_tokens)]
# Set up override parameters
router_config_override = {
"overlap_score_weight": 0.5, # Override the default weight
"router_temperature": 0.5, # Override the default temperature
}
logger.info(f"Generated {num_input_tokens} random token IDs")
# Test with full overrides
logger.info(
f"Testing with full router config overrides: {router_config_override}"
)
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=token_ids,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 20, # Generate exactly 20 tokens
},
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
router_config_override=router_config_override,
)
)
# Test without overrides
logger.info("Testing without router config overrides")
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=token_ids[:50], # Use fewer tokens for second test,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 10, # Generate exactly 10 tokens for the second test
},
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
# No router_config_override this time
)
)
# Test with partial override (only temperature)
partial_override = {"router_temperature": 0.1}
logger.info(f"Testing with partial router config overrides: {partial_override}")
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=token_ids[:30], # Use fewer tokens for third test,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 5, # Generate exactly 5 tokens for the third test
},
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
router_config_override=partial_override,
)
)
logger.info("KvPushRouter bindings test completed successfully")
finally:
# Clean up mockers
if "mockers" in locals():
mockers.__exit__(None, None, None)
......@@ -995,11 +331,11 @@ def test_indexers_sync(
# runtime_services starts etcd and nats
logger.info(f"Starting indexers sync test with {store_backend} storage backend")
# Create mocker args dicti: FixtureRequestonary: tuple[NatsServer, EtcdServer]: NoneType
# Create mocker args dictionary
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
try:
# Start mocker instances with the new CLI interface
# Start mocker instances
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(
request,
......@@ -1008,234 +344,21 @@ def test_indexers_sync(
store_backend=store_backend,
)
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
# Initialize mockers
mockers.__enter__()
# Use async to manage the test flow
async def test_sync():
# Create SEPARATE runtimes for each router to ensure independence
# This is especially important for file storage backend where connection_id
# would otherwise be shared between routers
runtime1 = get_runtime(store_backend)
runtime2 = get_runtime(store_backend)
# Use the namespace from the mockers for both runtimes
namespace1 = runtime1.namespace(mockers.namespace)
component1 = namespace1.component("mocker")
endpoint1 = component1.endpoint("generate")
namespace2 = runtime2.namespace(mockers.namespace)
component2 = namespace2.component("mocker")
endpoint2 = component2.endpoint("generate")
# Create KvRouterConfig with lower snapshot threshold for testing
kv_router_config = KvRouterConfig(router_snapshot_threshold=20)
async def send_requests_to_router(router, num_requests, router_name):
# Now send the actual requests
tasks = []
for i in range(num_requests):
# Generate random token IDs for each request
logger.debug(
f"Sending request {i + 1}/{num_requests} to {router_name}"
)
# Generate 30 random tokens
request_tokens = [random.randint(1, 10000) for _ in range(30)]
# Send request to mocker via the router
tasks.append(
asyncio.create_task(
send_request_via_python_kv_router(
kv_python_router=router,
token_ids=request_tokens,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 10, # Generate exactly 10 tokens
},
)
)
)
# Wait for all requests to complete
results = await asyncio.gather(*tasks)
successful = sum(1 for r in results if r)
logger.info(
f"Completed {successful}/{num_requests} requests for {router_name}"
)
return successful
# Launch first router
logger.info("Creating first KV router")
kv_push_router1 = KvPushRouter(
endpoint=endpoint1,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
)
# Wait for mockers to be ready
await wait_for_mockers_ready(endpoint1, kv_push_router1)
# Send 25 requests to first router
logger.info("Sending 25 requests to first router")
# Send requests to first router
successful1 = await send_requests_to_router(kv_push_router1, 25, "Router 1")
assert (
successful1 == 25
), f"Expected 25 successful requests to router 1, got {successful1}"
# Wait for a second before creating the second router
logger.info("Waiting for 1 second before creating second router")
await asyncio.sleep(2)
# Launch second router - will automatically sync with the first router's state
logger.info("Creating second KV router")
kv_push_router2 = KvPushRouter(
endpoint=endpoint2,
# Use the common test implementation (creates its own runtimes for each router)
# Note: Consumer verification is done inside _test_router_indexers_sync while routers are alive
_test_router_indexers_sync(
engine_workers=mockers,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
)
# Send 25 requests to second router with initial retry loop
logger.info("Sending 25 requests to second router")
successful2 = await send_requests_to_router(kv_push_router2, 25, "Router 2")
assert (
successful2 == 25
), f"Expected 25 successful requests to router 2, got {successful2}"
# Wait for all requests to complete (they should already be complete from gather)
# Wait another 1 second for internal synchronization
logger.info("Waiting for final synchronization")
await asyncio.sleep(1)
# Check NATS consumers to verify both routers have separate consumers
await check_nats_consumers(mockers.namespace, expected_count=2)
# Verify NATS object store bucket was created with snapshot
# Mirror the Rust bucket naming logic from subscriber.rs:
# component.subject() -> "namespace.{ns}.component.{comp}"
# then slugify (convert dots to dashes, lowercase, etc) and append "-radix-bucket"
component_subject = f"namespace.{mockers.namespace}.component.mocker"
slugified = component_subject.lower().replace(".", "-").replace("_", "-")
expected_bucket = f"{slugified}-radix-bucket"
expected_file = "radix-state"
logger.info(f"Verifying NATS object store bucket exists: {expected_bucket}")
snapshot_verified = False
try:
# Connect to NATS and check object store
nc = await nats.connect("nats://localhost:4222")
try:
js = nc.jetstream()
obj_store = await js.object_store(expected_bucket)
# Try to get the expected file
try:
result = await obj_store.get(expected_file)
logger.info(
f"✓ Snapshot file '{expected_file}' found in bucket '{expected_bucket}' "
f"(size: {len(result.data) if result.data else 0} bytes)"
)
snapshot_verified = True
except Exception as e:
logger.error(
f"Snapshot file '{expected_file}' not found in bucket '{expected_bucket}': {e}"
)
finally:
await nc.close()
except Exception as e:
logger.error(f"Error checking NATS object store: {e}")
# Assert that snapshot was created (threshold=20, sent 25 requests)
if not snapshot_verified:
assert False, (
f"Expected snapshot to be created in bucket '{expected_bucket}' with file '{expected_file}'. "
f"Router sent 25 requests with snapshot_threshold=20, so snapshot should have been triggered."
)
# Dump states from both routers
logger.info("Dumping states from both routers")
state1_json = await kv_push_router1.dump_events()
state2_json = await kv_push_router2.dump_events()
# Parse JSON strings for comparison
state1 = json.loads(state1_json)
state2 = json.loads(state2_json)
# Sort both states for comparison (order might differ due to HashMap iteration and sharding)
def sort_key(event):
data = event["event"]["data"]["stored"]
blocks = data["blocks"]
first_block = blocks[0]
return (
event["worker_id"],
first_block["tokens_hash"],
data["parent_hash"],
)
sorted_state1 = sorted(state1, key=sort_key)
sorted_state2 = sorted(state2, key=sort_key)
# Verify they are equal
logger.info(f"Router 1 has {len(sorted_state1)} events")
logger.info(f"Router 2 has {len(sorted_state2)} events")
# Compare states one by one and only show differences
if len(sorted_state1) != len(sorted_state2):
logger.error(
f"Router 1 has {len(sorted_state1)} events, Router 2 has {len(sorted_state2)} events"
)
assert False, "Router states have different numbers of events"
differences = []
for i, (state1_item, state2_item) in enumerate(
zip(sorted_state1, sorted_state2)
):
# Create copies without event_id for comparison
item1_compare = state1_item.copy()
item2_compare = state2_item.copy()
# Remove event_id from the nested event structure
if "event" in item1_compare and "event_id" in item1_compare["event"]:
del item1_compare["event"]["event_id"]
if "event" in item2_compare and "event_id" in item2_compare["event"]:
del item2_compare["event"]["event_id"]
if item1_compare != item2_compare:
differences.append(
{
"index": i,
"router1_state": state1_item,
"router2_state": state2_item,
}
)
# If there are differences, format them for easier debugging
if differences:
error_msg = f"Router states are not equal. Found {len(differences)} differences:\n"
for diff in differences:
error_msg += f"\nDifference at index {diff['index']}:\n"
error_msg += (
f"Router 1: {json.dumps(diff['router1_state'], indent=2)}\n"
)
error_msg += (
f"Router 2: {json.dumps(diff['router2_state'], indent=2)}\n"
model_name=MODEL_NAME,
num_workers=NUM_MOCKERS,
store_backend=store_backend,
)
error_msg += "-" * 80 + "\n"
assert False, error_msg
logger.info("Successfully verified that both router states are equal")
# Run the async test
asyncio.run(test_sync())
logger.info("Indexers sync test completed successfully")
finally:
# Clean up mockers
if "mockers" in locals():
mockers.__exit__(None, None, None)
......@@ -1245,38 +368,13 @@ def test_indexers_sync(
def test_query_instance_id_returns_worker_and_tokens(
request, runtime_services, predownload_tokenizers
):
"""
Test that the KV router correctly handles query_instance_id annotation.
When a request includes 'nvext.annotations': ['query_instance_id'], the router should:
1. NOT route the request to a worker immediately
2. Return worker_instance_id as an SSE event
3. Return token_data as an SSE event containing the request tokens
4. Term: FixtureRequestinate the stream w: tuple[NatsServer, EtcdServer]ith [DONE]: NoneType
This tests the specific code block:
if query_instance_id {
let instance_id_str = instance_id.to_string();
let response = Annotated::from_annotation("worker_instance_id", &instance_id_str)?;
let response_tokens = Annotated::from_annotation("token_data", &request.token_ids)?;
let stream = stream::iter(vec![response, response_tokens]);
return Ok(ResponseStream::new(Box::pin(stream), stream_context));
}
"""
"""Test query_instance_id annotation with mocker engines."""
logger.info("Starting KV router query_instance_id annotation test")
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
os.makedirs(request.node.name, exist_ok=True)
try:
# Start KV router (frontend)
frontend_port = PORT + 30 # Use unique port to avoid conflicts
logger.info(f"Starting KV router frontend on port {frontend_port}")
kv_router = KVRouterProcess(request, frontend_port)
kv_router.__enter__()
# Start multiple mocker engines to ensure worker selection logic
# Start mocker instances
logger.info(f"Starting {NUM_MOCKERS} mocker instances")
mockers = MockerProcess(
request, mocker_args=mocker_args, num_mockers=NUM_MOCKERS
......@@ -1284,147 +382,17 @@ def test_query_instance_id_returns_worker_and_tokens(
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__()
url = f"http://localhost:{frontend_port}/v1/chat/completions"
# Send a warming request first to ensure system is ready
logger.info("Sending warming request without annotations...")
asyncio.run(send_request_with_retry(url, TEST_PAYLOAD))
# Test payload with query_instance_id annotation
annotated_payload = {
**TEST_PAYLOAD,
"nvext": {"annotations": ["query_instance_id"]},
}
async def test_annotation_response():
"""Send request with query_instance_id and validate response structure"""
async with aiohttp.ClientSession() as session:
logger.info("Sending request with query_instance_id annotation...")
async with session.post(url, json=annotated_payload) as response:
assert (
response.status == 200
), f"Expected 200 but got {response.status}"
# Collect all response chunks
response_chunks = []
async for chunk in response.content:
if chunk:
chunk_str = chunk.decode("utf-8", errors="replace")
response_chunks.append(chunk_str)
full_response = "".join(response_chunks)
logger.info(
f"Full SSE response ({len(full_response)} bytes):\n{full_response}"
)
# Parse and validate the response structure
events = []
sse_parts = full_response.split("\n\n")
for part in sse_parts:
part = part.strip()
if not part:
continue
if part.startswith("event:"):
lines = part.split("\n")
event_line = next(
(line for line in lines if line.startswith("event:")),
None,
)
data_line = next(
(
line
for line in lines
if line.startswith("data:") or line.startswith(":")
),
None,
)
if event_line and data_line:
event_type = event_line.split(":", 1)[1].strip()
if data_line.startswith("data:"):
data_value = data_line.split(":", 1)[1].strip()
else:
data_value = data_line.split(":", 1)[1].strip()
events.append((event_type, data_value))
elif part.startswith("data:"):
data_value = part.split(":", 1)[1].strip()
logger.info(f"Parsed events: {events}")
# Validate worker_instance_id event
worker_event = next(
(e for e in events if e[0] == "worker_instance_id"), None
)
assert (
worker_event is not None
), f"Missing worker_instance_id event in: {events}"
# Validate token_data event
token_event = next(
(e for e in events if e[0] == "token_data"), None
)
assert (
token_event is not None
), f"Missing token_data event in: {events}"
token_data_str = token_event[1].strip('"')
try:
token_list = json.loads(token_data_str)
except json.JSONDecodeError as e:
raise AssertionError(
f"token_data is not valid JSON: {token_data_str}, error: {e}"
)
assert isinstance(
token_list, list
), f"token_data should be a list, got: {type(token_list)}"
assert (
len(token_list) > 0
), f"token_data should not be empty: {token_list}"
assert all(
isinstance(token, int) for token in token_list
), f"All tokens should be integers: {token_list}"
logger.info(
f"Valid token_data with {len(token_list)} tokens: {token_list[:10]}{'...' if len(token_list) > 10 else ''}"
)
# Validate that no actual generation happened (should only be metadata)
# This proves the early return worked correctly
generation_indicators = [
"choices",
"content",
"delta",
"finish_reason",
]
for indicator in generation_indicators:
assert (
indicator not in full_response.lower()
), f"Found generation indicator '{indicator}' - request should not have been routed to worker"
logger.info(
"No generation content found - early return worked correctly"
# Run query_instance_id annotation test
frontend_port = PORTS[0] + 30 # Use unique port to avoid conflicts
_test_router_query_instance_id(
engine_workers=mockers,
block_size=BLOCK_SIZE,
request=request,
frontend_port=frontend_port,
test_payload=TEST_PAYLOAD,
)
return {
"worker_instance_id": worker_event[1].strip('"'),
"token_count": len(token_list),
"tokens": token_list,
}
result = asyncio.run(test_annotation_response())
logger.info("Successfully validated query_instance_id annotation response:")
logger.info(f"Worker ID: {result['worker_instance_id']}")
logger.info(f"Token count: {result['token_count']}")
finally:
if "kv_router" in locals():
kv_router.__exit__(None, None, None)
if "mockers" in locals():
mockers.__exit__(None, None, None)
......@@ -1432,21 +400,7 @@ def test_query_instance_id_returns_worker_and_tokens(
@pytest.mark.pre_merge
@pytest.mark.model(MODEL_NAME)
def test_router_decisions(request, runtime_services, predownload_tokenizers):
"""Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes.
Flow:
- Start two mocker workers, each with dp_size=4 (8 total dp ranks).
- Wait for workers to be ready.
- Send 4 progressive requests, each extending the previous tokens:
* Request 1: BLOCK_SIZE random tokens (forced to specific worker_id and dp_rank=1)
* Request 2: Request 1 tokens + BLOCK_SIZE new random tokens (naturally routed)
* Request 3: Request 2 tokens + BLOCK_SIZE new random tokens (naturally routed)
* Request 4: Request 3 tokens + BLOCK_SIZE new random tokens (naturally routed)
- Dump events from router and verify:
* All but one (worker_id, dp_rank) should have no events (due to prefix reuse)
* The (worker_id, dp_rank) with events should have exactly 4 events (one per request)
* All events should be on the forced (worker_id, dp_rank=1) (verifying forced routing and prefix reuse)
"""
"""Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes."""
# runtime_services starts etcd and nats
logger.info("Starting test router prefix reuse and KV events synchronization")
......@@ -1459,12 +413,12 @@ def test_router_decisions(request, runtime_services, predownload_tokenizers):
}
try:
# Start 2 mocker instances, each with dp_size=4 (8 total dp ranks)
logger.info(
"Starting 2 mocker instances with dp_size=4 each (8 total dp ranks)"
)
mockers = MockerProcess(request, mocker_args=mocker_args, num_mockers=2)
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
# Initialize mockers
mockers.__enter__()
......@@ -1475,131 +429,10 @@ def test_router_decisions(request, runtime_services, predownload_tokenizers):
component = namespace.component("mocker")
endpoint = component.endpoint("generate")
# Create KvRouterConfig with lower snapshot threshold for testing
kv_router_config = KvRouterConfig(router_snapshot_threshold=20)
kv_push_router = KvPushRouter(
endpoint=endpoint,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
)
# Use async to manage the test flow
async def test_sync():
# Wait for workers to be ready and get their instance IDs
mocker_worker_ids = await wait_for_mockers_ready(
endpoint, kv_push_router, expected_num_workers=2
)
logger.info(f"Workers ready: {mocker_worker_ids}")
# Use the first worker_id for forced routing
forced_worker_id = mocker_worker_ids[0]
forced_dp_rank = 1
logger.info(
f"Will force first request to worker_id={forced_worker_id}, dp_rank={forced_dp_rank}"
)
# Send 4 progressive requests with overlapping prefixes
cumulative_tokens = []
for i in range(4):
# Add BLOCK_SIZE new random tokens
new_tokens = [random.randint(1, 10000) for _ in range(BLOCK_SIZE)]
cumulative_tokens.extend(new_tokens)
# Force first request to specific worker_id and dp_rank=1, let subsequent requests follow naturally
worker_id_override = forced_worker_id if i == 0 else None
dp_rank_override = forced_dp_rank if i == 0 else None
logger.info(
f"Sending request {i + 1}/4 with {len(cumulative_tokens)} tokens "
f"(added {len(new_tokens)} new tokens)"
f"{f' - FORCING worker_id={worker_id_override}, dp_rank={dp_rank_override}' if worker_id_override is not None else ''}"
)
await send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=cumulative_tokens.copy(),
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 2, # Generate exactly 2 tokens
},
worker_id=worker_id_override,
dp_rank=dp_rank_override,
)
# Wait a bit between requests
await asyncio.sleep(0.5)
# Wait for final synchronization
await asyncio.sleep(1)
# Dump events from the router
events_json = await kv_push_router.dump_events()
return events_json, forced_worker_id, forced_dp_rank
# Run the async test
events_json, expected_worker_id, expected_dp_rank = asyncio.run(test_sync())
# Parse events and count by (worker_id, dp_rank)
events = json.loads(events_json)
events_by_worker_dp: dict[tuple[int, int], list[Any]] = {}
for event in events:
worker_id = event.get("worker_id")
# Extract dp_rank from the event's KvCacheEvent
dp_rank = event.get("event", {}).get("dp_rank", 0)
key = (worker_id, dp_rank)
if key not in events_by_worker_dp:
events_by_worker_dp[key] = []
events_by_worker_dp[key].append(event)
logger.info(
f"Events by (worker_id, dp_rank): {[(key, len(evts)) for key, evts in events_by_worker_dp.items()]}"
)
# Verify: All but one (worker_id, dp_rank) should have no events
workers_with_events = [
key for key, evts in events_by_worker_dp.items() if len(evts) > 0
]
assert len(workers_with_events) == 1, (
f"Expected exactly 1 (worker_id, dp_rank) to have events (due to prefix reuse), "
f"but found {len(workers_with_events)} with events: {workers_with_events}"
)
# Verify: The (worker_id, dp_rank) with events should have exactly 4 events
active_worker_dp = workers_with_events[0]
num_events = len(events_by_worker_dp[active_worker_dp])
assert num_events == 4, (
f"Expected (worker_id, dp_rank) {active_worker_dp} to have exactly 4 events, "
f"but found {num_events} events"
)
# Verify: Both worker_id and dp_rank should match the forced values
active_worker_id = active_worker_dp[0]
active_dp_rank = active_worker_dp[1]
assert active_worker_id == expected_worker_id, (
f"Expected all events to have worker_id={expected_worker_id} (forced in first request), "
f"but found worker_id={active_worker_id}"
)
assert active_dp_rank == expected_dp_rank, (
f"Expected all events to have dp_rank={expected_dp_rank} (forced in first request), "
f"but found dp_rank={active_dp_rank}"
)
logger.info(
f"Successfully verified: Worker {active_worker_id} dp_rank {active_dp_rank} handled all 4 requests with prefix reuse. "
f"All events correctly routed to worker_id={expected_worker_id}, dp_rank={expected_dp_rank} as expected. "
f"KV events synchronized correctly."
_test_router_decisions(
mockers, endpoint, MODEL_NAME, request, test_dp_rank=True
)
finally:
# Clean up mockers
if "mockers" in locals():
mockers.__exit__(None, None, None)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import time
from typing import Any, Dict, Optional
import pytest
from tests.router.common import ( # utilities
_test_router_basic,
_test_router_decisions,
generate_random_suffix,
get_runtime,
)
from tests.utils.managed_process import ManagedProcess
logger = logging.getLogger(__name__)
MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
SPEEDUP_RATIO = 10.0
PORTS = [
8011,
8022,
] # Frontend ports: use PORTS[0] for single router, PORTS for multi-router
NUM_REQUESTS = 10
BLOCK_SIZE = 16
# Shared test payload for all tests
TEST_PAYLOAD: Dict[str, Any] = {
"model": MODEL_NAME,
"messages": [
{
"role": "user",
"content": "In a quiet meadow tucked between rolling hills, a plump gray rabbit nibbled on clover beneath the shade of a gnarled oak tree. Its ears twitched at the faint rustle of leaves, but it remained calm, confident in the safety of its burrow just a few hops away. The late afternoon sun warmed its fur, and tiny dust motes danced in the golden light as bees hummed lazily nearby. Though the rabbit lived a simple life, every day was an adventure of scents, shadows, and snacks—an endless search for the tastiest patch of greens and the softest spot to nap.",
}
],
"stream": True,
"max_tokens": 10,
}
class VLLMProcess:
"""Manages vLLM workers using dynamo.vllm (HTTP API + KV events).
This is a drop-in replacement for MockerProcess that uses real vLLM workers.
The key difference: dynamo.vllm automatically handles:
- HTTP API serving
- KV cache event publishing (ZMQ → NATS bridge)
- Integration with dynamo.frontend router
"""
def __init__(
self,
request,
vllm_args: Optional[Dict[str, Any]] = None,
num_workers: int = 2,
single_gpu: bool = False,
data_parallel_size: Optional[int] = None,
):
"""Initialize vLLM workers with dynamo integration.
Args:
request: pytest request fixture for log directory
vllm_args: Configuration dict with keys:
- block_size: KV cache block size (default: 16)
- model: Model name/path (default: TinyLlama-1.1B)
- gpu_memory_utilization: GPU memory fraction per worker (default: 0.9)
- max_model_len: Maximum sequence length (optional)
- speedup_ratio: IGNORED (vLLM runs at real speed)
num_workers: Number of vLLM worker processes
single_gpu: If True, all workers share GPU 0 (requires gpu_memory_utilization < 1.0/num_workers)
data_parallel_size: If set, enables data parallelism with this many ranks (num_workers must equal data_parallel_size)
"""
# Generate unique namespace for isolation
namespace_suffix = generate_random_suffix()
self.namespace = f"test-namespace-{namespace_suffix}"
self.component_name = "backend"
self.endpoint = f"dyn://{self.namespace}.{self.component_name}.generate"
self.num_workers = num_workers
self.worker_processes = []
if vllm_args is None:
vllm_args = {}
block_size = vllm_args.get("block_size", BLOCK_SIZE)
model = vllm_args.get("model", MODEL_NAME)
gpu_memory_utilization = vllm_args.get("gpu_memory_utilization", 0.9)
max_model_len = vllm_args.get("max_model_len")
self.model_name = model
# Create vLLM worker processes
# Matches test.sh behavior:
# - When data_parallel_size is set, launch one process per DP rank
# - Each process gets --data-parallel-rank and --data-parallel-size
# - Each process runs on its own GPU via CUDA_VISIBLE_DEVICES
# - --connector nixl enables KV cache transfer between ranks
for worker_idx in range(num_workers):
# Calculate GPU device for this process
if single_gpu:
# Force all processes to GPU 0 (for single-GPU testing)
gpu_device = "0"
elif data_parallel_size is not None:
# Worker sees dp_rank GPUs (each DP rank gets its own GPU)
worker_start_gpu = worker_idx * data_parallel_size
gpu_device = ",".join(
str(i)
for i in range(
worker_start_gpu, worker_start_gpu + data_parallel_size
)
)
else:
# No DP; worker sees one GPU
gpu_device = str(worker_idx)
command = [
"python3",
"-m",
"dynamo.vllm",
"--model",
model,
"--block-size",
str(block_size),
"--enforce-eager", # Disable CUDA graphs for faster startup
"--gpu-memory-utilization",
str(gpu_memory_utilization),
]
# Add optional max_model_len if specified
if max_model_len is not None:
command.extend(["--max-model-len", str(max_model_len)])
if data_parallel_size is not None:
# Add DP configuration for external load balancing
# See: https://docs.vllm.ai/en/v0.10.0/serving/data_parallel_deployment.html#external-load-balancing
command.extend(
[
"--data-parallel-size",
str(data_parallel_size),
# "--data-parallel-address", "127.0.0.1", # Required for DP coordination
# "--data-parallel-rpc-port", "13345", # RPC port for DP coordination
# "--connector", "nixl", # Required for KV transfer between DP ranks
]
)
env = os.environ.copy() # Copy parent environment
env.update(
{
"CUDA_VISIBLE_DEVICES": gpu_device,
"DYN_NAMESPACE": self.namespace,
"PYTHONHASHSEED": "0", # for deterministic event id's
}
)
# Create managed process for the worker
process = ManagedProcess(
command=command,
env=env,
timeout=120, # Allow time for model loading
display_output=True,
health_check_ports=[],
health_check_urls=[],
log_dir=request.node.name,
terminate_existing=False,
)
self.worker_processes.append(process)
if data_parallel_size is not None:
logger.info(
f"Created {data_parallel_size} DP ranks per worker on GPU(s) {gpu_device} "
f"(gpu_memory_utilization={gpu_memory_utilization}) "
f"with endpoint: {self.endpoint}"
)
else:
logger.info(
f"Created vLLM worker {worker_idx} on GPU {gpu_device} "
f"(gpu_memory_utilization={gpu_memory_utilization}) "
f"with endpoint: {self.endpoint}"
)
def __enter__(self):
"""Start all vLLM worker processes with sequential initialization.
Workers are started sequentially with a delay between each to avoid
NIXL/UCX resource contention during initialization. This prevents
UCX shared memory handle allocation failures when multiple workers
try to initialize simultaneously on the same GPU.
"""
logger.info(
f"[VLLMProcess] Starting {len(self.worker_processes)} worker processes sequentially..."
)
# Start each process sequentially, waiting for NIXL initialization before next
for i, process in enumerate(self.worker_processes):
logger.info(f"[VLLMProcess] Starting vLLM worker {i}...")
try:
# Manually initialize the process without blocking on health checks
process._logger = logging.getLogger(process.__class__.__name__)
process._command_name = process.command[0]
os.makedirs(process.log_dir, exist_ok=True)
log_name = f"{process._command_name}.log.txt"
process._log_path = os.path.join(process.log_dir, log_name)
if process.data_dir:
process._remove_directory(process.data_dir)
process._terminate_existing()
logger.info(
f"[VLLMProcess] Launching process {i} (pid will be assigned)..."
)
process._start_process() # Start the process but don't wait
logger.info(
f"[VLLMProcess] Worker {i} launched with PID: {process.proc.pid if process.proc else 'unknown'}"
)
time.sleep(process.delayed_start)
# Wait for NIXL initialization before starting next worker
# This prevents UCX shared memory contention
if i < len(self.worker_processes) - 1:
nixl_init_delay = 5 # seconds
logger.info(
f"[VLLMProcess] Waiting {nixl_init_delay}s for worker {i} to initialize NIXL before starting next worker..."
)
time.sleep(nixl_init_delay)
except Exception:
logger.exception(f"[VLLMProcess] Failed to start worker {i}")
# Clean up on failure
try:
process.__exit__(None, None, None)
except Exception as cleanup_err:
logger.warning(f"[VLLMProcess] Error during cleanup: {cleanup_err}")
raise
logger.info(
f"[VLLMProcess] All {len(self.worker_processes)} workers launched with sequential initialization."
)
logger.info("[VLLMProcess] Waiting for health checks to complete...")
# Now wait for health checks for all processes
for i, process in enumerate(self.worker_processes):
logger.info(f"[VLLMProcess] Checking health for worker {i}...")
try:
elapsed = process._check_ports(process.timeout)
process._check_urls(process.timeout - elapsed)
process._check_funcs(process.timeout - elapsed)
logger.info(f"[VLLMProcess] Worker {i} health checks passed")
except Exception:
logger.error(f"[VLLMProcess] Worker {i} health check failed")
# Clean up all processes on failure
self.__exit__(None, None, None)
raise
logger.info(
"[VLLMProcess] All workers started successfully and passed health checks!"
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Stop all vLLM worker processes gracefully."""
for i, process in enumerate(self.worker_processes):
logger.info(f"Stopping vLLM worker {i}")
process.__exit__(exc_type, exc_val, exc_tb)
# Add delay to ensure full cleanup of NATS/ETCD/ZMQ resources
# This prevents test isolation issues when running multiple tests
logger.info("Waiting for vLLM worker resources to fully clean up...")
time.sleep(2)
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.vllm
@pytest.mark.skip(reason="All vLLM tests disabled for now")
@pytest.mark.model(MODEL_NAME)
def test_vllm_kv_router_basic(request, runtime_services, predownload_tokenizers):
"""
Quick e2e sanity test for KV router with vLLM engine instances.
"""
# runtime_services starts etcd and nats
N_VLLM_WORKERS = 2
logger.info(f"Starting vLLM KV router test with {N_VLLM_WORKERS} workers")
vllm_args = {
"block_size": BLOCK_SIZE,
"model": MODEL_NAME,
"gpu_memory_utilization": 0.35,
"max_model_len": 1024, # Limit context length to reduce KV cache size
}
try:
# Start vLLM workers
logger.info(f"Starting {N_VLLM_WORKERS} vLLM workers")
vllm_workers = VLLMProcess(
request,
vllm_args=vllm_args,
num_workers=N_VLLM_WORKERS,
single_gpu=True, # fit workers into one GPU
)
logger.info(f"All vLLM workers using namespace: {vllm_workers.namespace}")
vllm_workers.__enter__()
# Run basic router test (starts router internally, vLLM workers need frontend readiness check)
_test_router_basic(
engine_workers=vllm_workers,
block_size=BLOCK_SIZE,
request=request,
frontend_port=PORTS[0],
test_payload=TEST_PAYLOAD,
num_requests=NUM_REQUESTS,
wait_for_frontend=True, # vLLM workers need time to load models
frontend_timeout=180, # 3 minutes should be plenty for TinyLlama
store_backend="etcd", # Explicit for clarity
)
finally:
if "vllm_workers" in locals():
vllm_workers.__exit__(None, None, None)
@pytest.mark.e2e
@pytest.mark.vllm
@pytest.mark.gpu_1
@pytest.mark.skip(reason="All vLLM tests disabled for now")
@pytest.mark.model(MODEL_NAME)
def test_router_decisions_vllm_multiple_workers(
request, runtime_services, predownload_tokenizers
):
# runtime_services starts etcd and nats
logger.info("Starting vLLM router prefix reuse test with two workers")
# Create vLLM args - one worker with dp_size=2, sharing GPU 0
vllm_args = {
"block_size": BLOCK_SIZE,
"model": MODEL_NAME,
"gpu_memory_utilization": 0.35,
"max_model_len": 1024, # Limit context length to reduce KV cache size
}
N_WORKERS = 2
try:
# Start 2 worker processes (dp_rank 0 and dp_rank 1) on the same GPU
logger.info(
"Starting 2 vLLM worker processes with dp_size=2 on single GPU (gpu_memory_utilization=0.35, max_model_len=1024)"
)
vllm_workers = VLLMProcess(
request,
vllm_args=vllm_args,
num_workers=N_WORKERS, # One worker process with dp_size=2
single_gpu=True, # Worker uses GPU 0
)
logger.info(f"All vLLM workers using namespace: {vllm_workers.namespace}")
# Initialize vLLM workers
vllm_workers.__enter__()
# Get runtime and create endpoint
runtime = get_runtime()
namespace = runtime.namespace(vllm_workers.namespace)
component = namespace.component("backend")
endpoint = component.endpoint("generate")
_test_router_decisions(
vllm_workers, endpoint, MODEL_NAME, request, test_dp_rank=False
)
finally:
# Clean up vLLM workers
if "vllm_workers" in locals():
vllm_workers.__exit__(None, None, None)
@pytest.mark.e2e
@pytest.mark.vllm
@pytest.mark.gpu_2
@pytest.mark.skip(reason="All vLLM tests disabled for now")
@pytest.mark.model(MODEL_NAME)
def test_router_decisions_vllm_dp(request, runtime_services, predownload_tokenizers):
"""Validate KV cache prefix reuse with vLLM by sending progressive requests with overlapping prefixes.
Same flow as test_router_decisions_vllm_multiple_workers; force first request to (worker_id, dp_rank=1).
Dump events from router and verify:
* All but one (worker_id, dp_rank) should have no events (due to prefix reuse)
* The (worker_id, dp_rank) with events should have exactly 4 events (one per request)
* All events should be on the forced (worker_id, dp_rank=1) (verifying forced routing and prefix reuse)
"""
# Create vLLM args - one worker with dp_size=2, sharing GPU 0
vllm_args = {
"block_size": BLOCK_SIZE,
"model": MODEL_NAME,
"gpu_memory_utilization": 0.35,
"max_model_len": 1024, # Limit context length to reduce KV cache size
}
N_WORKERS = 1
DP_SIZE = 2
try:
logger.info(
"Starting 2 vLLM DP ranks (dp_size=2) on single GPU (gpu_memory_utilization=0.35, max_model_len=1024)"
)
vllm_workers = VLLMProcess(
request,
vllm_args=vllm_args,
num_workers=N_WORKERS, # Ignored when data_parallel_size is set
single_gpu=False,
data_parallel_size=DP_SIZE, # Creates DP_SIZE processes (one per rank)
)
logger.info(f"All vLLM workers using namespace: {vllm_workers.namespace}")
vllm_workers.__enter__()
# Get runtime and create endpoint
runtime = get_runtime()
# Use the namespace from the vLLM workers
namespace = runtime.namespace(vllm_workers.namespace)
component = namespace.component("backend") # endpoint is backend.generate
endpoint = component.endpoint("generate")
_test_router_decisions(
vllm_workers, endpoint, MODEL_NAME, request, test_dp_rank=True
)
finally:
# Clean up vLLM workers
if "vllm_workers" in locals():
vllm_workers.__exit__(None, None, None)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment