Unverified Commit 99ab75b8 authored by Olga Andreeva's avatar Olga Andreeva Committed by GitHub
Browse files

test: Resolving wrong model for testing (#5135)


Signed-off-by: default avatarOlga Andreeva <oandreeva@nvidia.com>
Signed-off-by: default avatarOlga Andreeva <124622579+oandreeva-nv@users.noreply.github.com>
Co-authored-by: default avatarGuanLuo <41310872+GuanLuo@users.noreply.github.com>
parent 2b157d89
...@@ -54,7 +54,7 @@ Environment variables control server settings and test load: ...@@ -54,7 +54,7 @@ Environment variables control server settings and test load:
- `--num-gpu-blocks-override` is applied when `gpu_blocks` is parametrized - `--num-gpu-blocks-override` is applied when `gpu_blocks` is parametrized
- Request/test parameters - Request/test parameters
- `KVBM_MAX_TOKENS` (default: `48`) - `KVBM_MAX_TOKENS` (default: `48`) - single integer for max tokens per request
- `KVBM_SEED` (default: `42`) - `KVBM_SEED` (default: `42`)
- `KVBM_MAX_ITERATIONS` (default: `500`) - `KVBM_MAX_ITERATIONS` (default: `500`)
- `KVBM_WORD_COUNT` (default: `200`) - `KVBM_WORD_COUNT` (default: `200`)
...@@ -64,19 +64,18 @@ Environment variables control server settings and test load: ...@@ -64,19 +64,18 @@ Environment variables control server settings and test load:
- `KVBM_HTTP_TIMEOUT` (default: `30` seconds) - `KVBM_HTTP_TIMEOUT` (default: `30` seconds)
- `KVBM_SHAKESPEARE_URL` (default: MIT OCW Shakespeare text) - `KVBM_SHAKESPEARE_URL` (default: MIT OCW Shakespeare text)
- Concurrent testing - Concurrent testing (only for `test_concurrent_determinism_with_ifeval`)
- `KVBM_CONCURRENT_REQUESTS` (default: `"3"` - comma-separated list for parametrization of max concurrent workers) - `KVBM_CONCURRENT_REQUESTS` (default: `3`) - comma-separated list for parametrization of max concurrent workers
- `KVBM_MAX_TOKENS` (default: `"10"` - comma-separated list for parametrization of max_tokens in concurrent tests) - `KVBM_IFEVAL_PROMPTS` (default: `120`) - comma-separated list for parametrization of number of IFEval prompts
- `KVBM_IFEVAL_PROMPTS` (default: `"120"` - comma-separated list for parametrization of number of IFEval prompts to use)
Example: ### Example
```bash ```bash
KVBM_MODEL_ID=Qwen/Qwen3-0.6B \ KVBM_MODEL_ID=deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
KVBM_CPU_BLOCKS=12000 \ KVBM_CPU_BLOCKS=10000 \
KVBM_MAX_ITERATIONS=100 \ KVBM_MAX_ITERATIONS=100 \
KVBM_MAX_TOKENS=48 \
KVBM_CONCURRENT_REQUESTS="10,25,50" \ KVBM_CONCURRENT_REQUESTS="10,25,50" \
KVBM_MAX_TOKENS="48,128,256" \
KVBM_IFEVAL_PROMPTS="50,120,200" \ KVBM_IFEVAL_PROMPTS="50,120,200" \
pytest -v -m "kvbm" -s pytest -v -m "kvbm" -s
``` ```
......
...@@ -59,7 +59,9 @@ class ApiTester: ...@@ -59,7 +59,9 @@ class ApiTester:
self.base_url = ( self.base_url = (
base_url or os.environ.get("DYNAMO_API_BASE_URL") or "http://localhost:8000" base_url or os.environ.get("DYNAMO_API_BASE_URL") or "http://localhost:8000"
) )
self.model_id = model_id or os.environ.get("KVBM_MODEL_ID") or "Qwen/Qwen3-0.6B" self.model_id = model_id or os.environ.get(
"KVBM_MODEL_ID", "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
)
def make_request( def make_request(
self, self,
...@@ -555,7 +557,7 @@ def llm_server_kvbm(request, runtime_services): ...@@ -555,7 +557,7 @@ def llm_server_kvbm(request, runtime_services):
Usage in test files: Usage in test files:
@pytest.mark.parametrize("llm_server_kvbm", @pytest.mark.parametrize("llm_server_kvbm",
[{"cpu_blocks": 100, "gpu_blocks": 10}], indirect=True) [{"cpu_blocks": 100, "gpu_blocks": 10, "model": "Qwen/Qwen3-0.6B"}], indirect=True)
def test_example(llm_server_kvbm): def test_example(llm_server_kvbm):
... ...
""" """
...@@ -565,10 +567,14 @@ def llm_server_kvbm(request, runtime_services): ...@@ -565,10 +567,14 @@ def llm_server_kvbm(request, runtime_services):
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
# Get cache configuration from request.param # Get configuration from request.param
params = getattr(request, "param", {}) params = getattr(request, "param", {})
cpu_blocks = params.get("cpu_blocks", 100) cpu_blocks = params.get("cpu_blocks", 100)
gpu_blocks = params.get("gpu_blocks", 10) gpu_blocks = params.get("gpu_blocks", 10)
model = params.get(
"model",
os.environ.get("KVBM_MODEL_ID", "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"),
)
# Detect available server type # Detect available server type
if importlib.util.find_spec("vllm") is not None: if importlib.util.find_spec("vllm") is not None:
...@@ -583,7 +589,6 @@ def llm_server_kvbm(request, runtime_services): ...@@ -583,7 +589,6 @@ def llm_server_kvbm(request, runtime_services):
# Build vLLM command # Build vLLM command
port = 8000 port = 8000
model = os.environ.get("KVBM_MODEL_ID", "Qwen/Qwen3-0.6B")
command = [ command = [
"vllm", "vllm",
"serve", "serve",
......
...@@ -78,7 +78,7 @@ def test_directory(request): ...@@ -78,7 +78,7 @@ def test_directory(request):
def create_trtllm_config(test_directory: Path) -> Path: def create_trtllm_config(test_directory: Path) -> Path:
"""Create TensorRT-LLM config YAML file with KVBM connector configuration.""" """Create TensorRT-LLM config YAML file with KVBM connector configuration."""
config_path = test_directory / "trtllm_config.yaml" config_path = Path(os.path.join(test_directory, "trtllm_config.yaml"))
config = { config = {
"backend": "pytorch", "backend": "pytorch",
"cuda_graph_config": None, "cuda_graph_config": None,
...@@ -218,9 +218,8 @@ def frontend_server(test_directory, runtime_services): ...@@ -218,9 +218,8 @@ def frontend_server(test_directory, runtime_services):
) )
# Create separate log directory for frontend to avoid conflicts with vllm # Create separate log directory for frontend to avoid conflicts with vllm
frontend_log_dir = test_directory / "frontend" frontend_log_dir = Path(os.path.join(test_directory, "frontend")).absolute()
frontend_log_dir.mkdir(parents=True, exist_ok=True) frontend_log_dir.mkdir(parents=True, exist_ok=True)
log_file = frontend_log_dir / "python.log.txt"
# Create managed process and start via context manager # Create managed process and start via context manager
with ManagedProcess( with ManagedProcess(
...@@ -230,9 +229,11 @@ def frontend_server(test_directory, runtime_services): ...@@ -230,9 +229,11 @@ def frontend_server(test_directory, runtime_services):
timeout=120, # Increased timeout for frontend+router initialization timeout=120, # Increased timeout for frontend+router initialization
working_dir=str(test_directory), working_dir=str(test_directory),
display_output=False, display_output=False,
log_dir=str(frontend_log_dir), # Separate log directory log_dir=str(frontend_log_dir), # Absolute path keeps logs in test directory
) as frontend_process: ) as frontend_process:
logger.info(f"Frontend started on port {FRONTEND_PORT}") # Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
log_file = Path(frontend_process._log_path)
logger.info(f"Frontend started on port {FRONTEND_PORT}, log file: {log_file}")
yield { yield {
"process": frontend_process, "process": frontend_process,
...@@ -309,9 +310,8 @@ def llm_worker(frontend_server, test_directory, runtime_services, engine_type): ...@@ -309,9 +310,8 @@ def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081" env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"
# Create separate log directory for worker to avoid conflicts with frontend # Create separate log directory for worker to avoid conflicts with frontend
worker_log_dir = test_directory / engine worker_log_dir = Path(os.path.join(test_directory, engine)).absolute()
worker_log_dir.mkdir(parents=True, exist_ok=True) worker_log_dir.mkdir(parents=True, exist_ok=True)
log_file = worker_log_dir / "python.log.txt"
# Create managed process and start via context manager # Create managed process and start via context manager
with ManagedProcess( with ManagedProcess(
...@@ -321,9 +321,13 @@ def llm_worker(frontend_server, test_directory, runtime_services, engine_type): ...@@ -321,9 +321,13 @@ def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
timeout=300, # Increased timeout for model loading and consolidator init timeout=300, # Increased timeout for model loading and consolidator init
working_dir=str(test_directory), working_dir=str(test_directory),
display_output=False, display_output=False,
log_dir=str(worker_log_dir), # Separate log directory log_dir=str(worker_log_dir), # Absolute path keeps logs in test directory
terminate_existing=False, terminate_existing=False,
) as worker_process: ) as worker_process:
# Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
log_file = Path(worker_process._log_path)
logger.info(f"Worker log file: {log_file}")
logger.info( logger.info(
f"Waiting for {engine.upper()} worker and consolidator to initialize..." f"Waiting for {engine.upper()} worker and consolidator to initialize..."
) )
...@@ -645,9 +649,8 @@ class TestConsolidatorRouterE2E: ...@@ -645,9 +649,8 @@ class TestConsolidatorRouterE2E:
} }
) )
frontend_log_dir = test_directory / "frontend" frontend_log_dir = Path(os.path.join(test_directory, "frontend")).absolute()
frontend_log_dir.mkdir(parents=True, exist_ok=True) frontend_log_dir.mkdir(parents=True, exist_ok=True)
frontend_log = frontend_log_dir / "python.log.txt"
with ManagedProcess( with ManagedProcess(
command=frontend_command, command=frontend_command,
...@@ -656,8 +659,10 @@ class TestConsolidatorRouterE2E: ...@@ -656,8 +659,10 @@ class TestConsolidatorRouterE2E:
timeout=120, timeout=120,
working_dir=str(test_directory), working_dir=str(test_directory),
display_output=False, display_output=False,
log_dir=str(frontend_log_dir), log_dir=str(frontend_log_dir), # Absolute path keeps logs in test directory
) as _frontend_process: ) as _frontend_process:
# Get actual log file path from ManagedProcess
frontend_log = Path(_frontend_process._log_path)
logger.info(f"Frontend started on port {FRONTEND_PORT}") logger.info(f"Frontend started on port {FRONTEND_PORT}")
# Start worker with constrained GPU blocks but larger KVBM blocks # Start worker with constrained GPU blocks but larger KVBM blocks
...@@ -715,9 +720,8 @@ class TestConsolidatorRouterE2E: ...@@ -715,9 +720,8 @@ class TestConsolidatorRouterE2E:
if engine == "trtllm": if engine == "trtllm":
worker_env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081" worker_env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"
worker_log_dir = test_directory / engine worker_log_dir = Path(os.path.join(test_directory, engine)).absolute()
worker_log_dir.mkdir(parents=True, exist_ok=True) worker_log_dir.mkdir(parents=True, exist_ok=True)
worker_log = worker_log_dir / "python.log.txt"
with ManagedProcess( with ManagedProcess(
command=worker_command, command=worker_command,
...@@ -726,9 +730,13 @@ class TestConsolidatorRouterE2E: ...@@ -726,9 +730,13 @@ class TestConsolidatorRouterE2E:
timeout=300, timeout=300,
working_dir=str(test_directory), working_dir=str(test_directory),
display_output=False, display_output=False,
log_dir=str(worker_log_dir), log_dir=str(
worker_log_dir
), # Absolute path keeps logs in test directory
terminate_existing=False, terminate_existing=False,
) as _worker_process: ) as _worker_process:
# Get actual log file path from ManagedProcess (it may modify log_dir to use temp directory)
worker_log = Path(_worker_process._log_path)
logger.info(f"Waiting for {engine.upper()} worker to initialize...") logger.info(f"Waiting for {engine.upper()} worker to initialize...")
# Wait for worker to register with frontend # Wait for worker to register with frontend
......
...@@ -364,7 +364,8 @@ def llm_server(request, runtime_services): ...@@ -364,7 +364,8 @@ def llm_server(request, runtime_services):
def tester(llm_server): def tester(llm_server):
"""Create determinism tester bound to the running server's base URL.""" """Create determinism tester bound to the running server's base URL."""
t = AggDeterminismTester( t = AggDeterminismTester(
base_url=llm_server.base_url, server_type=llm_server.server_type base_url=llm_server.base_url,
server_type=llm_server.server_type,
) )
t.download_shakespeare_text() t.download_shakespeare_text()
return t return t
...@@ -420,7 +421,7 @@ class TestDeterminismAgg(BaseTestDeterminism): ...@@ -420,7 +421,7 @@ class TestDeterminismAgg(BaseTestDeterminism):
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
"max_tokens", "max_tokens",
[int(x) for x in os.environ.get("KVBM_MAX_TOKENS", "10").split(",")], [int(os.environ.get("KVBM_MAX_TOKENS", "48"))],
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
"num_prompts", "num_prompts",
...@@ -441,12 +442,7 @@ class TestDeterminismAgg(BaseTestDeterminism): ...@@ -441,12 +442,7 @@ class TestDeterminismAgg(BaseTestDeterminism):
print("CONCURRENT DETERMINISM TEST WITH IFEVAL") print("CONCURRENT DETERMINISM TEST WITH IFEVAL")
print("=" * 70) print("=" * 70)
# Override max_tokens for this test iteration print(f"Using max_tokens={max_tokens} (from KVBM_MAX_TOKENS)")
original_max_tokens = os.environ.get("KVBM_MAX_TOKENS")
os.environ["KVBM_MAX_TOKENS"] = str(max_tokens)
print(
f"Using KVBM_MAX_TOKENS={max_tokens} (parametrized, original: {original_max_tokens or '48'})"
)
# Configuration comes from parametrize # Configuration comes from parametrize
print( print(
...@@ -602,12 +598,6 @@ class TestDeterminismAgg(BaseTestDeterminism): ...@@ -602,12 +598,6 @@ class TestDeterminismAgg(BaseTestDeterminism):
print(f"Success rate: {success_rate:.1%}") print(f"Success rate: {success_rate:.1%}")
print(f"Concurrent requests: {num_concurrent}") print(f"Concurrent requests: {num_concurrent}")
# Restore original max_tokens setting
if original_max_tokens is not None:
os.environ["KVBM_MAX_TOKENS"] = original_max_tokens
else:
os.environ.pop("KVBM_MAX_TOKENS", None)
assert ( assert (
success_rate == 1.0 success_rate == 1.0
), f"Determinism failed: {deterministic_count}/{total_compared} prompts deterministic" ), f"Determinism failed: {deterministic_count}/{total_compared} prompts deterministic"
......
...@@ -445,7 +445,8 @@ def llm_server(request, runtime_services): ...@@ -445,7 +445,8 @@ def llm_server(request, runtime_services):
def tester(llm_server): def tester(llm_server):
"""Create determinism tester bound to the running server's base URL.""" """Create determinism tester bound to the running server's base URL."""
t = DisaggDeterminismTester( t = DisaggDeterminismTester(
base_url=llm_server.base_url, server_type=llm_server.server_type base_url=llm_server.base_url,
server_type=llm_server.server_type,
) )
t.download_shakespeare_text() t.download_shakespeare_text()
return t return t
......
...@@ -14,7 +14,7 @@ These tests validate core KVBM functionality: ...@@ -14,7 +14,7 @@ These tests validate core KVBM functionality:
import pytest import pytest
import requests import requests
from .common import llm_server_kvbm # noqa: F401, F811 from .common import llm_server_kvbm # noqa: F401
from .common import DeterminismTester, assert_deterministic, fetch_kvbm_metrics from .common import DeterminismTester, assert_deterministic, fetch_kvbm_metrics
# Test configuration # Test configuration
...@@ -96,17 +96,23 @@ def reset_cache(base_url: str) -> None: ...@@ -96,17 +96,23 @@ def reset_cache(base_url: str) -> None:
print(f"Warning: Cache reset failed: {e}") print(f"Warning: Cache reset failed: {e}")
# Model used for test_kvbm tests (smaller model for faster CI)
KVBM_TEST_MODEL = "Qwen/Qwen3-0.6B"
# Fixtures # Fixtures
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def tester(llm_server_kvbm): # noqa: F811 def tester(llm_server_kvbm): # noqa: F811
"""Create tester bound to the KVBM-enabled server.""" """Create tester bound to the KVBM-enabled server."""
return DeterminismTester( return DeterminismTester(
base_url=llm_server_kvbm.base_url, base_url=llm_server_kvbm.base_url,
model_id=KVBM_TEST_MODEL,
server_type=llm_server_kvbm.server_type, server_type=llm_server_kvbm.server_type,
) )
# Tests # Tests
@pytest.mark.parametrize("llm_server_kvbm", [{"model": KVBM_TEST_MODEL}], indirect=True)
def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811 def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811
""" """
Test offload → cache reset → onboard cycle with determinism verification. Test offload → cache reset → onboard cycle with determinism verification.
...@@ -169,7 +175,9 @@ def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811 ...@@ -169,7 +175,9 @@ def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811
@pytest.mark.parametrize( @pytest.mark.parametrize(
"llm_server_kvbm", [{"cpu_blocks": 200, "gpu_blocks": 20}], indirect=True "llm_server_kvbm",
[{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}],
indirect=True,
) )
def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811 def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
""" """
...@@ -241,7 +249,9 @@ def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811 ...@@ -241,7 +249,9 @@ def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
@pytest.mark.parametrize( @pytest.mark.parametrize(
"llm_server_kvbm", [{"cpu_blocks": 200, "gpu_blocks": 20}], indirect=True "llm_server_kvbm",
[{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}],
indirect=True,
) )
def test_onboarding_determinism(tester, llm_server_kvbm): # noqa: F811 def test_onboarding_determinism(tester, llm_server_kvbm): # noqa: F811
""" """
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment