Unverified commit 44986bf5, authored by Keiven C and committed by GitHub
Browse files

test: remove hardcoded ports and add timeouts to KVBM tests (#5855)


Signed-off-by: Keiven Chang <keivenc@nvidia.com>
Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent 11cefc36
......@@ -247,7 +247,9 @@ markers = [
"usefixtures: use fixtures",
"parametrize: parameterized test",
"filterwarnings: filter warnings",
"asyncio: asyncio test marker"
"asyncio: asyncio test marker",
# Third-party plugin markers
"timeout: test timeout in seconds (pytest-timeout plugin)",
]
# Linting/formatting
......
......@@ -60,6 +60,8 @@ def pytest_configure(config):
"custom_build: marks tests that require custom builds or special setup (e.g., MoE models)",
"k8s: marks tests as requiring Kubernetes",
"fault_tolerance: marks tests as fault tolerance tests",
# Third-party plugin markers
"timeout: test timeout in seconds (pytest-timeout plugin)",
]
for marker in markers:
config.addinivalue_line("markers", marker)
......
......@@ -22,6 +22,8 @@ from typing import Dict, List, Optional, Tuple
import pytest
import requests
from tests.utils.port_utils import allocate_port, deallocate_port
# ============================================================================
# Module Availability Checks
# ============================================================================
......@@ -156,16 +158,26 @@ def check_logs_for_patterns(
class ApiTester:
"""Base class for making API requests to LLM endpoints."""
"""Base class for making API requests to LLM endpoints.
Note: base_url should be provided explicitly. The default fallback to localhost:8000
is deprecated and should not be relied upon for new tests.
"""
def __init__(
self,
base_url: Optional[str] = None,
model_id: Optional[str] = None,
):
self.base_url = (
base_url or os.environ.get("DYNAMO_API_BASE_URL") or "http://localhost:8000"
if base_url is None:
# Fallback chain: env var or error (no hardcoded default)
base_url = os.environ.get("DYNAMO_API_BASE_URL")
if base_url is None:
raise ValueError(
"base_url must be provided explicitly or set DYNAMO_API_BASE_URL environment variable. "
"Hardcoded default ports are not supported for pytest-xdist compatibility."
)
self.base_url = base_url
self.model_id = model_id or os.environ.get(
"KVBM_MODEL_ID", "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
)
......@@ -498,7 +510,7 @@ def tester(llm_server):
@pytest.fixture(scope="function")
def llm_server_kvbm(request, runtime_services):
def llm_server_kvbm(request, runtime_services_dynamic_ports):
"""Start LLM server with configurable cache sizes for KVBM testing.
Usage in test files:
......@@ -521,6 +533,9 @@ def llm_server_kvbm(request, runtime_services):
os.environ.get("KVBM_MODEL_ID", "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"),
)
# Unpack NATS and etcd processes from runtime_services_dynamic_ports
nats_process, etcd_process = runtime_services_dynamic_ports
# Detect available server type
if check_module_available("vllm"):
server_type = ServerType.vllm
......@@ -532,8 +547,25 @@ def llm_server_kvbm(request, runtime_services):
"Neither vllm nor tensorrt_llm module is available in the current environment."
)
# Use dynamic port allocation to avoid conflicts (pytest-xdist safe)
# Note: ZMQ ports are allocated in lower range due to i16 limit (max 32767) in port_utils
port = allocate_port(start_port=8000)
metrics_port = allocate_port(start_port=6880)
zmq_pub_port = allocate_port(start_port=20001) # Lower range instead of 56001
zmq_ack_port = allocate_port(start_port=20002) # Lower range instead of 56002
print(
f"Allocated dynamic ports - vLLM: {port}, Metrics: {metrics_port}, "
f"ZMQ Pub: {zmq_pub_port}, ZMQ Ack: {zmq_ack_port}, "
f"NATS: {nats_process.port}, etcd: {etcd_process.port}"
)
# Build vLLM command
port = 8000
# TODO: For parallel execution on single GPU with pytest-xdist, add dynamic GPU memory allocation:
# 1. Detect parallel execution: worker_count = os.environ.get("PYTEST_XDIST_WORKER_COUNT")
# 2. Calculate fraction: gpu_memory_fraction = 0.85 / int(worker_count) if worker_count else 0.9
# 3. Add to command: "--gpu-memory-utilization", str(gpu_memory_fraction)
# Example: 2 workers → 42.5% each, 3 workers → 28.3% each
# Trade-off: Smaller GPU memory per worker = smaller KV cache = more CPU offloads
command = [
"vllm",
"serve",
......@@ -553,6 +585,7 @@ def llm_server_kvbm(request, runtime_services):
command.extend(["--num-gpu-blocks-override", str(gpu_blocks)])
# Set up environment
# Note: NATS_SERVER and ETCD_ENDPOINTS are already set by runtime_services_dynamic_ports fixture
env = os.environ.copy()
env.update(
{
......@@ -560,10 +593,11 @@ def llm_server_kvbm(request, runtime_services):
"VLLM_SERVER_DEV_MODE": "1",
"DYN_LOG": "debug",
"DYN_KVBM_METRICS": "true",
"DYN_KVBM_METRICS_PORT": "6880",
# DynamoConnector connection settings
"NATS_SERVER": "nats://localhost:4222",
"ETCD_ENDPOINTS": "http://localhost:2379",
"DYN_KVBM_METRICS_PORT": str(metrics_port),
"DYN_KVBM_LEADER_ZMQ_PUB_PORT": str(zmq_pub_port),
"DYN_KVBM_LEADER_ZMQ_ACK_PORT": str(zmq_ack_port),
# DynamoConnector will use NATS_SERVER and ETCD_ENDPOINTS from environment
# (already set by runtime_services_dynamic_ports fixture)
}
)
......@@ -575,15 +609,44 @@ def llm_server_kvbm(request, runtime_services):
timeout = int(os.environ.get("KVBM_SERVER_START_TIMEOUT", "600"))
log_dir = f"{request.node.name}_vllm"
# Port-specific cleanup: Only kill vLLM processes using OUR allocated ports.
# This makes the fixture safe for pytest-xdist parallel execution.
# Check for processes listening on our specific ports before starting.
import psutil
from tests.utils.managed_process import terminate_process_tree
_logger = __import__("logging").getLogger(__name__)
for check_port in [port, metrics_port]:
for proc in psutil.process_iter(["pid", "name", "cmdline"]):
try:
# Check if process is listening on our port
connections = proc.connections(kind="inet")
for conn in connections:
if conn.laddr.port == check_port and conn.status == "LISTEN":
_logger.info(
f"Terminating existing process {proc.name()} (PID {proc.pid}) "
f"listening on port {check_port}"
)
terminate_process_tree(proc.pid, _logger)
break
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
except Exception:
pass # Continue cleanup even if one process fails
# SAFETY: Do NOT use terminate_existing=True or stragglers=["vllm"] here.
# Those kill ALL vLLM processes system-wide, breaking parallel test execution.
# Port-based cleanup above is targeted and xdist-safe.
with ManagedProcess(
command=command,
env=env,
health_check_ports=[port, 6880], # vLLM server + KVBM metrics
health_check_ports=[port, metrics_port], # vLLM server + KVBM metrics
timeout=timeout,
display_output=True,
terminate_existing=True,
stragglers=["vllm"],
straggler_commands=["vllm serve"],
terminate_existing=False, # Port-based cleanup done above instead
stragglers=[], # Empty - we handle cleanup manually per port
straggler_commands=[], # Empty - we handle cleanup manually per port
log_dir=log_dir,
) as proc:
# Give KVBM connector extra time to fully initialize
......@@ -600,9 +663,21 @@ def llm_server_kvbm(request, runtime_services):
self.cpu_cache_blocks = cpu_blocks
self.gpu_cache_blocks = gpu_blocks
self.port = port
self.metrics_port = metrics_port
self.proc = proc
try:
yield ServerWrapper()
finally:
# Clean up allocated ports
deallocate_port(port)
deallocate_port(metrics_port)
deallocate_port(zmq_pub_port)
deallocate_port(zmq_ack_port)
print(
f"Deallocated ports - vLLM: {port}, Metrics: {metrics_port}, "
f"ZMQ Pub: {zmq_pub_port}, ZMQ Ack: {zmq_ack_port}"
)
class TestDeterminism:
......@@ -1183,8 +1258,6 @@ class TestDeterminism:
# ============================================================================
# Note: KVBM fixtures are in conftest.py for automatic pytest discovery
KVBM_METRICS_PORT = 6880
def parse_kvbm_metrics(metrics_text: str) -> dict:
"""Parse KVBM metrics from Prometheus format.
......@@ -1213,19 +1286,25 @@ def parse_kvbm_metrics(metrics_text: str) -> dict:
return metrics
def fetch_kvbm_metrics(port: int = KVBM_METRICS_PORT, timeout: int = 10) -> dict:
def fetch_kvbm_metrics(port: Optional[int] = None, timeout: int = 10) -> dict:
"""Fetch and parse KVBM metrics from the metrics endpoint.
Args:
port: Metrics server port (default: 6880)
port: Metrics server port (required for dynamic port allocation)
timeout: Request timeout in seconds
Returns:
Dictionary of parsed metrics
Raises:
ValueError: If port is not provided
RuntimeError: If metrics endpoint is unreachable or returns error
"""
if port is None:
raise ValueError(
"port must be provided explicitly. "
"Hardcoded default port is not supported for pytest-xdist compatibility."
)
response = requests.get(f"http://localhost:{port}/metrics", timeout=timeout)
if response.status_code != 200:
raise RuntimeError(
......
......@@ -17,7 +17,6 @@ disaggregated mode, aggregated mode has less randomness chances.
import logging
import os
import signal
import socket
import subprocess
import sys
import threading
......@@ -29,6 +28,8 @@ from typing import Any, Dict, List, Optional, TextIO
import pytest
import requests
from tests.utils.port_utils import allocate_port, deallocate_port
from .common import DeterminismTester, ServerType
from .common import TestDeterminism as BaseTestDeterminism
from .common import check_module_available
......@@ -45,16 +46,6 @@ pytestmark = [
]
def _find_free_port() -> int:
"""Find a free port by binding to port 0 and letting the OS assign one."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
return port
class LLMServerManager:
"""Manages LLM server lifecycle for determinism testing."""
......@@ -68,13 +59,16 @@ class LLMServerManager:
server_type: Optional[str] = ServerType.vllm,
):
self.server_type = server_type
# Use provided port, env var, or find a free port to avoid conflicts
# Use provided port, env var, or allocate a dynamic port to avoid conflicts
if port is not None:
self.port = port
self.port_allocated = False # Port provided by caller, don't deallocate
elif os.environ.get("KVBM_SERVER_PORT"):
self.port = int(os.environ["KVBM_SERVER_PORT"])
self.port_allocated = False # Port from env var, don't deallocate
else:
self.port = _find_free_port()
self.port = allocate_port(start_port=8000)
self.port_allocated = True # Port allocated by us, must deallocate
self.base_url = base_url or f"http://localhost:{self.port}"
self.process: Optional[subprocess.Popen] = None
self.cpu_cache_blocks = cpu_cache_blocks
......@@ -280,6 +274,11 @@ class LLMServerManager:
self._tee_threads = []
self._close_log_files()
# Deallocate port if we allocated it
if self.port_allocated:
deallocate_port(self.port)
self.port_allocated = False
def _close_log_files(self):
if self.server_stdout_file:
self.server_stdout_file.write(
......
......@@ -59,11 +59,12 @@ def print_phase(phase_num: int, description: str) -> None:
print(f"\n=== Phase {phase_num}: {description} ===")
def check_kvbm_metrics(phase_name: str) -> dict[str, int]:
def check_kvbm_metrics(phase_name: str, metrics_port: int) -> dict[str, int]:
"""Fetch and display KVBM metrics.
Args:
phase_name: Name of the test phase for logging
metrics_port: Port number for the KVBM metrics endpoint
Returns:
Dictionary containing KVBM metrics with keys:
......@@ -71,7 +72,7 @@ def check_kvbm_metrics(phase_name: str) -> dict[str, int]:
- kvbm_onboard_blocks_h2d: Blocks onboarded from CPU to GPU
"""
print(f"\n--- Checking KVBM metrics after {phase_name} ---")
metrics = fetch_kvbm_metrics()
metrics = fetch_kvbm_metrics(port=metrics_port)
offload_d2h = metrics.get("kvbm_offload_blocks_d2h", 0)
onboard_h2d = metrics.get("kvbm_onboard_blocks_h2d", 0)
......@@ -113,6 +114,7 @@ def tester(llm_server_kvbm): # noqa: F811
# Tests
@pytest.mark.parametrize("llm_server_kvbm", [{"model": KVBM_TEST_MODEL}], indirect=True)
@pytest.mark.timeout(110) # 3x measured time (36.31s), rounded up
def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811
"""
Test offload → cache reset → onboard cycle with determinism verification.
......@@ -135,7 +137,7 @@ def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811
response_1 = tester.make_request(prompt, max_tokens=MAX_TOKENS)
print(f"Response 1: {response_1}")
metrics = check_kvbm_metrics("Phase 1")
metrics = check_kvbm_metrics("Phase 1", llm_server_kvbm.metrics_port)
assert (
metrics["kvbm_offload_blocks_d2h"] > 0
), "Phase 1: No blocks offloaded. KVBM may not be triggering offloads."
......@@ -155,7 +157,7 @@ def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811
response_2 = tester.make_request(prompt, max_tokens=MAX_TOKENS)
print(f"Response 2: {response_2}")
metrics = check_kvbm_metrics("Phase 3")
metrics = check_kvbm_metrics("Phase 3", llm_server_kvbm.metrics_port)
assert (
metrics["kvbm_onboard_blocks_h2d"] > 0
), "Phase 3: No blocks onboarded. Expected CPU→GPU transfer after cache reset."
......@@ -179,6 +181,7 @@ def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811
[{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}],
indirect=True,
)
@pytest.mark.timeout(190) # 3x measured time (63.39s), rounded up
def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
"""
Test GPU cache eviction mechanics.
......@@ -204,7 +207,7 @@ def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
tester.make_request(prompt_1, max_tokens=MAX_TOKENS)
metrics_p1 = check_kvbm_metrics("Phase 1")
metrics_p1 = check_kvbm_metrics("Phase 1", llm_server_kvbm.metrics_port)
assert metrics_p1["kvbm_offload_blocks_d2h"] >= MIN_OFFLOAD_BLOCKS, (
f"Phase 1: Expected >= {MIN_OFFLOAD_BLOCKS} blocks offloaded, "
f"got {metrics_p1['kvbm_offload_blocks_d2h']}"
......@@ -220,7 +223,7 @@ def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
tester.make_request(prompt_2, max_tokens=MAX_TOKENS)
metrics_p2 = check_kvbm_metrics("Phase 2")
metrics_p2 = check_kvbm_metrics("Phase 2", llm_server_kvbm.metrics_port)
assert (
metrics_p2["kvbm_offload_blocks_d2h"] > metrics_p1["kvbm_offload_blocks_d2h"]
), (
......@@ -238,7 +241,7 @@ def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
tester.make_request(prompt_1, max_tokens=MAX_TOKENS)
metrics_p3 = check_kvbm_metrics("Phase 3")
metrics_p3 = check_kvbm_metrics("Phase 3", llm_server_kvbm.metrics_port)
assert (
metrics_p3["kvbm_onboard_blocks_h2d"] > 0
), "Phase 3: No blocks onboarded. Expected CPU→GPU retrieval after eviction."
......@@ -253,6 +256,7 @@ def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
[{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}],
indirect=True,
)
@pytest.mark.timeout(107) # 3x measured time (35.40s), rounded up
def test_onboarding_determinism(tester, llm_server_kvbm): # noqa: F811
"""
Test onboarding determinism under eviction scenario.
......@@ -276,41 +280,41 @@ def test_onboarding_determinism(tester, llm_server_kvbm): # noqa: F811
print_phase(1, "Send first request")
print(f"Prompt 1: {prompt_1[:80]}...")
tester.make_request(prompt_1, max_tokens=MAX_TOKENS)
check_kvbm_metrics("Phase 1")
check_kvbm_metrics("Phase 1", llm_server_kvbm.metrics_port)
# Phase 2: Second request (may evict first from GPU)
print_phase(2, "Send second request (may evict first from GPU)")
print(f"Prompt 2: {prompt_2[:80]}...")
tester.make_request(prompt_2, max_tokens=MAX_TOKENS)
check_kvbm_metrics("Phase 2")
check_kvbm_metrics("Phase 2", llm_server_kvbm.metrics_port)
# Phase 3: Re-request prompt 1 (first onboard cycle)
print_phase(3, "Re-request Prompt 1 (first onboard cycle)")
print(f"Re-sending Prompt 1: {prompt_1[:80]}...")
response_1_first_onboard = tester.make_request(prompt_1, max_tokens=MAX_TOKENS)
print(f"Response 1 (first onboard): {response_1_first_onboard}")
check_kvbm_metrics("Phase 3")
check_kvbm_metrics("Phase 3", llm_server_kvbm.metrics_port)
# Phase 4: Re-request prompt 2 (first onboard cycle)
print_phase(4, "Re-request Prompt 2 (first onboard cycle)")
print(f"Re-sending Prompt 2: {prompt_2[:80]}...")
response_2_first_onboard = tester.make_request(prompt_2, max_tokens=MAX_TOKENS)
print(f"Response 2 (first onboard): {response_2_first_onboard}")
check_kvbm_metrics("Phase 4")
check_kvbm_metrics("Phase 4", llm_server_kvbm.metrics_port)
# Phase 5: Re-request prompt 1 (second onboard cycle)
print_phase(5, "Re-request Prompt 1 (second onboard cycle)")
print(f"Re-sending Prompt 1 (third time): {prompt_1[:80]}...")
response_1_second_onboard = tester.make_request(prompt_1, max_tokens=MAX_TOKENS)
print(f"Response 1 (second onboard): {response_1_second_onboard}")
check_kvbm_metrics("Phase 5")
check_kvbm_metrics("Phase 5", llm_server_kvbm.metrics_port)
# Phase 6: Re-request prompt 2 (second onboard cycle)
print_phase(6, "Re-request Prompt 2 (second onboard cycle)")
print(f"Re-sending Prompt 2 (third time): {prompt_2[:80]}...")
response_2_second_onboard = tester.make_request(prompt_2, max_tokens=MAX_TOKENS)
print(f"Response 2 (second onboard): {response_2_second_onboard}")
check_kvbm_metrics("Phase 6")
check_kvbm_metrics("Phase 6", llm_server_kvbm.metrics_port)
# Verify determinism between onboarded requests
print_test_header("DETERMINISM VERIFICATION")
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment