Unverified commit cd7b1492, authored by Olga Andreeva and committed by GitHub
Browse files

test: Adding three basic KVBM integration tests for GitHub Actions CI (#5019)

parent 85b5c808
...@@ -59,11 +59,7 @@ class ApiTester: ...@@ -59,11 +59,7 @@ class ApiTester:
self.base_url = ( self.base_url = (
base_url or os.environ.get("DYNAMO_API_BASE_URL") or "http://localhost:8000" base_url or os.environ.get("DYNAMO_API_BASE_URL") or "http://localhost:8000"
) )
self.model_id = ( self.model_id = model_id or os.environ.get("KVBM_MODEL_ID") or "Qwen/Qwen3-0.6B"
model_id
or os.environ.get("KVBM_MODEL_ID")
or "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
)
def make_request( def make_request(
self, self,
...@@ -553,6 +549,112 @@ def tester(llm_server): ...@@ -553,6 +549,112 @@ def tester(llm_server):
return t return t
@pytest.fixture(scope="function")
def llm_server_kvbm(request, runtime_services):
    """Start a vLLM server with the KVBM connector and configurable cache sizes.

    Usage in test files:

        @pytest.mark.parametrize("llm_server_kvbm",
            [{"cpu_blocks": 100, "gpu_blocks": 10}], indirect=True)
        def test_example(llm_server_kvbm):
            ...

    Args:
        request: pytest request object. ``request.param`` is optional; when
            present it is a dict that may carry ``cpu_blocks`` (default 100)
            and ``gpu_blocks`` (default 10). Passing ``None`` for either
            value disables the corresponding override.
        runtime_services: Fixture dependency; it is requested but not used
            directly in this body.

    Yields:
        An object exposing ``base_url``, ``server_type``, ``cpu_cache_blocks``,
        ``gpu_cache_blocks``, ``port`` and ``proc``.

    The test is skipped when vLLM is not importable (including the case where
    only TensorRT-LLM is installed).
    """
    import importlib.util
    import os
    import time

    from tests.utils.managed_process import ManagedProcess

    # Get cache configuration from request.param (absent when the fixture is
    # used without indirect parametrization).
    params = getattr(request, "param", {})
    cpu_blocks = params.get("cpu_blocks", 100)
    gpu_blocks = params.get("gpu_blocks", 10)

    # Only vLLM is supported here; every other environment skips the test.
    if importlib.util.find_spec("vllm") is None:
        if importlib.util.find_spec("tensorrt_llm") is not None:
            pytest.skip("TensorRT-LLM tests are disabled for this test")
        pytest.skip(
            "Neither vllm nor tensorrt_llm module is available in the current environment."
        )
    server_type = ServerType.vllm

    # Build vLLM command
    port = 8000
    # Single source for the metrics port: it is exported to the server via
    # the environment and polled by the health check below.
    metrics_port = 6880
    model = os.environ.get("KVBM_MODEL_ID", "Qwen/Qwen3-0.6B")
    command = [
        "vllm",
        "serve",
        "--block-size",
        "16",
        "--port",
        str(port),
        "--kv-transfer-config",
        '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "kvbm.vllm_integration.connector"}',
        model,
        "--max-model-len",
        "8000",  # Required to fit on L4 GPU with smaller models
    ]

    # GPU blocks override
    if gpu_blocks is not None:
        command.extend(["--num-gpu-blocks-override", str(gpu_blocks)])

    # Set up environment
    env = os.environ.copy()
    env.update(
        {
            "RUST_BACKTRACE": "1",
            "VLLM_SERVER_DEV_MODE": "1",
            "DYN_LOG": "debug",
            "DYN_KVBM_METRICS": "true",
            "DYN_KVBM_METRICS_PORT": str(metrics_port),
            # DynamoConnector connection settings
            "NATS_SERVER": "nats://localhost:4222",
            "ETCD_ENDPOINTS": "http://localhost:2379",
        }
    )

    # CPU cache blocks override via env
    if cpu_blocks is not None:
        env["DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS"] = str(cpu_blocks)

    # Start server with ManagedProcess
    timeout = int(os.environ.get("KVBM_SERVER_START_TIMEOUT", "600"))
    log_dir = f"{request.node.name}_vllm"

    with ManagedProcess(
        command=command,
        env=env,
        health_check_ports=[port, metrics_port],  # vLLM server + KVBM metrics
        timeout=timeout,
        display_output=True,
        terminate_existing=True,
        stragglers=["vllm"],
        straggler_commands=["vllm serve"],
        log_dir=log_dir,
    ) as proc:
        # Give KVBM connector extra time to fully initialize
        print("Waiting 5 seconds for KVBM connector to fully initialize...")
        time.sleep(5)

        # Create wrapper object for compatibility with existing test code
        class ServerWrapper:
            """Wrapper to maintain compatibility with LLMServerManager interface."""

            def __init__(self):
                self.base_url = f"http://localhost:{port}"
                self.server_type = server_type
                self.cpu_cache_blocks = cpu_blocks
                self.gpu_cache_blocks = gpu_blocks
                self.port = port
                self.proc = proc

        yield ServerWrapper()
class TestDeterminism: class TestDeterminism:
"""Test class for determinism validation.""" """Test class for determinism validation."""
...@@ -684,3 +786,94 @@ class TestDeterminism: ...@@ -684,3 +786,94 @@ class TestDeterminism:
assert ( assert (
success_rate >= success_rate_threshold success_rate >= success_rate_threshold
), f"Model is not deterministic across cache reset: {total_failed} comparisons failed, success rate {success_rate:.1%} lower than expected {success_rate_threshold*100}%" ), f"Model is not deterministic across cache reset: {total_failed} comparisons failed, success rate {success_rate:.1%} lower than expected {success_rate_threshold*100}%"
# ============================================================================
# KVBM Test Helper Functions
# ============================================================================
# Note: KVBM fixtures are in conftest.py for automatic pytest discovery
# Port of the KVBM metrics endpoint on localhost; used as the default target
# of fetch_kvbm_metrics().
KVBM_METRICS_PORT = 6880
def parse_kvbm_metrics(metrics_text: str) -> dict:
    """Parse the tracked KVBM metrics out of Prometheus text-format output.

    Only un-labelled samples whose name exactly matches one of the tracked
    metrics are returned; comments, blank lines and every other metric are
    ignored. If a metric appears more than once, the last sample wins.

    Args:
        metrics_text: Raw Prometheus metrics text.

    Returns:
        Dictionary mapping metric names to integer values. Values written in
        float notation (e.g. ``12.0`` or ``1.2e+01``) are accepted and
        truncated to ``int``.

    Raises:
        ValueError: If a tracked metric carries a value that is not a finite
            number.
    """
    import math

    tracked_names = frozenset(
        (
            "kvbm_offload_blocks_d2h",
            "kvbm_onboard_blocks_h2d",
            "kvbm_offload_blocks_h2d",
            "kvbm_onboard_blocks_d2d",
            "kvbm_matched_tokens",
        )
    )

    metrics = {}
    for raw_line in metrics_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue

        # A sample line is "<name> <value> [<timestamp>]", separated by any
        # run of spaces or tabs.
        parts = line.split()
        if len(parts) < 2 or parts[0] not in tracked_names:
            continue

        name, value_text = parts[0], parts[1]
        try:
            metrics[name] = int(value_text)
        except ValueError:
            # Prometheus sample values are floats on the wire; fall back to
            # float parsing so "12.0" does not crash the test helper.
            as_float = float(value_text)
            if not math.isfinite(as_float):
                raise ValueError(
                    f"Metric {name} has non-finite value {value_text!r}"
                ) from None
            metrics[name] = int(as_float)
    return metrics
def fetch_kvbm_metrics(port: int = KVBM_METRICS_PORT, timeout: int = 10) -> dict:
    """Fetch and parse KVBM metrics from the local metrics endpoint.

    Args:
        port: Metrics server port on localhost (default: KVBM_METRICS_PORT).
        timeout: Request timeout in seconds.

    Returns:
        Dictionary of parsed metrics, as produced by parse_kvbm_metrics().

    Raises:
        RuntimeError: If the metrics endpoint is unreachable, times out, or
            answers with a non-200 status code.
    """
    url = f"http://localhost:{port}/metrics"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as err:
        # Translate transport failures so the documented RuntimeError contract
        # actually holds; the original exception stays attached as the cause.
        raise RuntimeError(
            f"Metrics endpoint {url} is unreachable: {err}. "
            "Metrics server may not have started."
        ) from err

    if response.status_code != 200:
        raise RuntimeError(
            f"Metrics endpoint returned status {response.status_code}. "
            "Metrics server may not have started."
        )
    return parse_kvbm_metrics(response.text)
def assert_deterministic(
    response1: str,
    response2: str,
    test_name: str = "",
    label1: str = "Response 1",
    label2: str = "Response 2",
) -> None:
    """Check that two responses are exactly equal and report the outcome.

    A verdict line and both labelled responses are always printed; when the
    responses differ the current test is additionally failed.

    Args:
        response1: First response text.
        response2: Second response text.
        test_name: Name of the test, used in the printed and failure messages.
        label1: Label printed in front of the first response.
        label2: Label printed in front of the second response.

    Raises:
        pytest.fail: If the two responses are not equal.
    """
    identical = response1 == response2

    if identical:
        verdict = f" ✓ PASS: {test_name} responses are deterministic"
    else:
        verdict = f" ✗ FAIL: {test_name} responses differ"
    print(verdict)

    # Both outcomes echo the two responses in the same order.
    for label, text in ((label1, response1), (label2, response2)):
        print(f" {label}: {text}")

    if not identical:
        pytest.fail(
            f"{test_name}: Responses not deterministic\n"
            f"{label1}: {response1}\n"
            f"{label2}: {response2}"
        )
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
KVBM (KV Block Manager) integration tests for vLLM.
These tests validate core KVBM functionality:
1. Offload/Onboard: Request offloads to CPU, cache reset, re-request triggers onboarding
2. Eviction: GPU cache fills, blocks evicted, later retrieved without corruption
3. Determinism: Responses remain identical across offload/onboard/eviction cycles
"""
import pytest
import requests
from .common import llm_server_kvbm # noqa: F401, F811
from .common import DeterminismTester, assert_deterministic, fetch_kvbm_metrics
# Test configuration
# Lower bound asserted on kvbm_offload_blocks_d2h after the first full-story
# request in test_gpu_cache_eviction.
MIN_OFFLOAD_BLOCKS = 12  # Minimum blocks expected for Qwen3-0.6B with test prompts
# Passed as max_tokens to every tester.make_request() call in this module.
MAX_TOKENS = 15  # Max tokens to generate in test responses
# Shared test prompt (Aeldora story)
# The text is sent to the server as a prompt (whole, prefixed, or sliced).
# NOTE(review): the final sentence ends abruptly ("...lost familt clue is
# hidden."). Editing the text changes the prompt and presumably the number of
# cache blocks it occupies - confirm against MIN_OFFLOAD_BLOCKS before fixing.
AELDORA_STORY = (
    "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, "
    "lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria "
    "was buried beneath the shifting sands of time, lost to the world for centuries. You are "
    "an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled "
    "upon an ancient map hinting at secrets that Aeloria holds a secret so profound that it has "
    "the potential to reshape the very fabric of reality. Your journey will take you through "
    "treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: "
    "Character Background: Develop a detailed background for your character. Describe their "
    "motivations for seeking out Aeloria, their skills and weaknesses, and any personal "
    "connections to the ancient city or its legends. Are they driven by a quest for knowledge, "
    "a search for lost familt clue is hidden."
)
# Test markers
# Module-level pytestmark: pytest applies these markers to every test in this
# file.
pytestmark = [
    pytest.mark.kvbm,
    pytest.mark.e2e,
    pytest.mark.gpu_1,
    pytest.mark.vllm,
    pytest.mark.pre_merge,
]
# Helper functions
def print_test_header(title: str) -> None:
    """Print *title* framed by two 70-character ``=`` rules, after a blank line."""
    rule = "=" * 70
    print(f"\n{rule}", title, rule, sep="\n")
def print_phase(phase_num: int, description: str) -> None:
    """Print a one-line banner announcing a numbered test phase.

    Args:
        phase_num: Phase number shown in the banner.
        description: Short description shown after the phase number.
    """
    banner = f"\n=== Phase {phase_num}: {description} ==="
    print(banner)
def check_kvbm_metrics(phase_name: str) -> dict[str, int]:
    """Fetch the KVBM metrics, print the two transfer counters and return them.

    Args:
        phase_name: Name of the test phase, used only in the log header.

    Returns:
        Dictionary with exactly two keys, each defaulting to 0 when the
        metric is absent from the fetched data:
        - kvbm_offload_blocks_d2h: Blocks offloaded from GPU to CPU
        - kvbm_onboard_blocks_h2d: Blocks onboarded from CPU to GPU
    """
    print(f"\n--- Checking KVBM metrics after {phase_name} ---")
    fetched = fetch_kvbm_metrics()

    reported_names = ("kvbm_offload_blocks_d2h", "kvbm_onboard_blocks_h2d")
    summary = {name: fetched.get(name, 0) for name in reported_names}

    for name, count in summary.items():
        print(f" {name}: {count}")
    return summary
def reset_cache(base_url: str) -> None:
    """Reset the GPU prefix cache via the server's ``/reset_prefix_cache`` route.

    The reset is best-effort: an HTTP or transport failure is reported as a
    printed warning and does not raise, so the calling test continues.

    Args:
        base_url: Base URL of the server, without a trailing slash.
    """
    print("Resetting prefix cache...")
    try:
        response = requests.post(f"{base_url}/reset_prefix_cache", timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        # Only request failures are tolerated; unrelated programming errors
        # are no longer swallowed by a blanket handler.
        print(f"Warning: Cache reset failed: {e}")
    else:
        print("Cache reset successful")
# Fixtures
@pytest.fixture(scope="function")
def tester(llm_server_kvbm):  # noqa: F811
    """Return a DeterminismTester pointed at the KVBM-enabled server fixture."""
    server = llm_server_kvbm
    bound_tester = DeterminismTester(
        base_url=server.base_url,
        server_type=server.server_type,
    )
    return bound_tester
# Tests
def test_offload_and_onboard(tester, llm_server_kvbm):  # noqa: F811
    """
    Exercise the offload → cache reset → onboard cycle and compare outputs.

    Checks, in order:
    - the first request reports offloaded blocks and no onboarded blocks
    - after a prefix-cache reset, repeating the request reports onboarded blocks
    - the response text is identical before and after the reset
    """
    print_test_header("OFFLOAD AND ONBOARD TEST")

    # Only the first ~400 characters of the story are sent, for a smaller
    # cache footprint.
    story_prefix = AELDORA_STORY[:400]
    preview = story_prefix[:80]

    # Phase 1: the initial request should be offloaded to the CPU cache.
    print_phase(1, "Initial request (expect offload to CPU)")
    print(f"Sending request: {preview}...")
    first_reply = tester.make_request(story_prefix, max_tokens=MAX_TOKENS)
    print(f"Response 1: {first_reply}")

    after_first = check_kvbm_metrics("Phase 1")
    offloaded = after_first["kvbm_offload_blocks_d2h"]
    onboarded = after_first["kvbm_onboard_blocks_h2d"]
    assert (
        offloaded > 0
    ), "Phase 1: No blocks offloaded. KVBM may not be triggering offloads."
    assert onboarded == 0, f"Phase 1: Expected 0 onboarded blocks, got {onboarded}"
    print(f"✓ Phase 1: {offloaded} blocks offloaded")

    # Phase 2: drop the GPU prefix cache.
    print_phase(2, "Clean up GPU cache")
    reset_cache(llm_server_kvbm.base_url)

    # Phase 3: the identical request should now be onboarded from CPU.
    print_phase(3, "Re-send same request (expect onboard from CPU)")
    print(f"Sending same request: {preview}...")
    second_reply = tester.make_request(story_prefix, max_tokens=MAX_TOKENS)
    print(f"Response 2: {second_reply}")

    after_second = check_kvbm_metrics("Phase 3")
    onboarded = after_second["kvbm_onboard_blocks_h2d"]
    assert (
        onboarded > 0
    ), "Phase 3: No blocks onboarded. Expected CPU→GPU transfer after cache reset."
    print(f"✓ Phase 3: {onboarded} blocks onboarded from CPU")

    # Both replies must match exactly.
    print_test_header("DETERMINISM VERIFICATION")
    assert_deterministic(
        first_reply,
        second_reply,
        test_name="Offload/Onboard",
        label1="Initial response",
        label2="After cache reset",
    )
    print("\n=== TEST PASSED ===")
@pytest.mark.parametrize(
    "llm_server_kvbm", [{"cpu_blocks": 200, "gpu_blocks": 20}], indirect=True
)
def test_gpu_cache_eviction(tester, llm_server_kvbm):  # noqa: F811
    """
    Exercise GPU cache eviction using the offload/onboard counters.

    Checks, in order:
    - the first prompt offloads at least MIN_OFFLOAD_BLOCKS blocks and onboards none
    - a second, different prompt increases the offload counter
    - re-sending the first prompt reports onboarded blocks
    """
    print_test_header("GPU CACHE EVICTION TEST")
    print(f"GPU blocks: {llm_server_kvbm.gpu_cache_blocks}")
    print(f"CPU blocks: {llm_server_kvbm.cpu_cache_blocks}")

    # The second prompt is the full story behind a different lead-in sentence.
    story = AELDORA_STORY
    prefixed_story = (
        "Read the following entry from the ancient scrolls of Aeloria: " + AELDORA_STORY
    )

    # Phase 1: the first request should trigger an offload.
    print_phase(1, "Send first request")
    print(f"Prompt 1: {story[:80]}...")
    tester.make_request(story, max_tokens=MAX_TOKENS)

    first_snapshot = check_kvbm_metrics("Phase 1")
    offloaded_p1 = first_snapshot["kvbm_offload_blocks_d2h"]
    onboarded_p1 = first_snapshot["kvbm_onboard_blocks_h2d"]
    assert offloaded_p1 >= MIN_OFFLOAD_BLOCKS, (
        f"Phase 1: Expected >= {MIN_OFFLOAD_BLOCKS} blocks offloaded, "
        f"got {offloaded_p1}"
    )
    assert onboarded_p1 == 0, f"Phase 1: Expected 0 onboarded, got {onboarded_p1}"
    print(f"✓ Phase 1: {offloaded_p1} blocks offloaded")

    # Phase 2: the second request may push the first one out of the GPU cache.
    print_phase(2, "Send second request (may evict first from GPU)")
    print(f"Prompt 2: {prefixed_story[:80]}...")
    tester.make_request(prefixed_story, max_tokens=MAX_TOKENS)

    second_snapshot = check_kvbm_metrics("Phase 2")
    offloaded_p2 = second_snapshot["kvbm_offload_blocks_d2h"]
    assert offloaded_p2 > offloaded_p1, (
        f"Phase 2: Expected additional offloads, got {offloaded_p2} "
        f"(was {offloaded_p1})"
    )
    additional_offloads = offloaded_p2 - offloaded_p1
    print(f"✓ Phase 2: {additional_offloads} additional blocks offloaded")

    # Phase 3: asking for the first prompt again should onboard from CPU.
    print_phase(3, "Re-request first prompt (verify onboarding)")
    print(f"Re-sending Prompt 1: {story[:80]}...")
    tester.make_request(story, max_tokens=MAX_TOKENS)

    third_snapshot = check_kvbm_metrics("Phase 3")
    onboarded_p3 = third_snapshot["kvbm_onboard_blocks_h2d"]
    assert (
        onboarded_p3 > 0
    ), "Phase 3: No blocks onboarded. Expected CPU→GPU retrieval after eviction."
    print(f"✓ Phase 3: {onboarded_p3} blocks onboarded")
    print("✓ Eviction mechanics verified: offload → eviction → onboard")
    print("\n=== TEST PASSED ===")
@pytest.mark.parametrize(
    "llm_server_kvbm", [{"cpu_blocks": 200, "gpu_blocks": 20}], indirect=True
)
def test_onboarding_determinism(tester, llm_server_kvbm):  # noqa: F811
    """
    Compare responses across repeated re-requests of two alternating prompts.

    Each prompt is sent three times, alternating with the other one. The
    second and third responses for each prompt are compared with each other
    (onboarded vs onboarded, not initial vs onboarded) and must be identical.
    Metrics are printed after every phase but are not asserted on here.
    """
    print_test_header("ONBOARDING DETERMINISM TEST")
    print(f"GPU blocks: {llm_server_kvbm.gpu_cache_blocks}")
    print(f"CPU blocks: {llm_server_kvbm.cpu_cache_blocks}")

    # The second prompt is the full story behind a different lead-in sentence.
    story = AELDORA_STORY
    prefixed_story = (
        "Read the following entry from the ancient scrolls of Aeloria: " + AELDORA_STORY
    )
    story_preview = story[:80]
    prefixed_preview = prefixed_story[:80]

    # Phase 1: initial request for prompt 1 (response not kept).
    print_phase(1, "Send first request")
    print(f"Prompt 1: {story_preview}...")
    tester.make_request(story, max_tokens=MAX_TOKENS)
    check_kvbm_metrics("Phase 1")

    # Phase 2: initial request for prompt 2 (response not kept).
    print_phase(2, "Send second request (may evict first from GPU)")
    print(f"Prompt 2: {prefixed_preview}...")
    tester.make_request(prefixed_story, max_tokens=MAX_TOKENS)
    check_kvbm_metrics("Phase 2")

    # Phase 3: prompt 1, first onboard cycle.
    print_phase(3, "Re-request Prompt 1 (first onboard cycle)")
    print(f"Re-sending Prompt 1: {story_preview}...")
    story_reply_cycle1 = tester.make_request(story, max_tokens=MAX_TOKENS)
    print(f"Response 1 (first onboard): {story_reply_cycle1}")
    check_kvbm_metrics("Phase 3")

    # Phase 4: prompt 2, first onboard cycle.
    print_phase(4, "Re-request Prompt 2 (first onboard cycle)")
    print(f"Re-sending Prompt 2: {prefixed_preview}...")
    prefixed_reply_cycle1 = tester.make_request(prefixed_story, max_tokens=MAX_TOKENS)
    print(f"Response 2 (first onboard): {prefixed_reply_cycle1}")
    check_kvbm_metrics("Phase 4")

    # Phase 5: prompt 1, second onboard cycle.
    print_phase(5, "Re-request Prompt 1 (second onboard cycle)")
    print(f"Re-sending Prompt 1 (third time): {story_preview}...")
    story_reply_cycle2 = tester.make_request(story, max_tokens=MAX_TOKENS)
    print(f"Response 1 (second onboard): {story_reply_cycle2}")
    check_kvbm_metrics("Phase 5")

    # Phase 6: prompt 2, second onboard cycle.
    print_phase(6, "Re-request Prompt 2 (second onboard cycle)")
    print(f"Re-sending Prompt 2 (third time): {prefixed_preview}...")
    prefixed_reply_cycle2 = tester.make_request(prefixed_story, max_tokens=MAX_TOKENS)
    print(f"Response 2 (second onboard): {prefixed_reply_cycle2}")
    check_kvbm_metrics("Phase 6")

    # Each prompt's two onboard-cycle responses must match exactly.
    print_test_header("DETERMINISM VERIFICATION")

    print("\nComparing Prompt 1: First onboard vs Second onboard")
    assert_deterministic(
        story_reply_cycle1,
        story_reply_cycle2,
        test_name="Prompt 1 onboarding determinism",
        label1="First onboard (Phase 3)",
        label2="Second onboard (Phase 5)",
    )

    print("\nComparing Prompt 2: First onboard vs Second onboard")
    assert_deterministic(
        prefixed_reply_cycle1,
        prefixed_reply_cycle2,
        test_name="Prompt 2 onboarding determinism",
        label1="First onboard (Phase 4)",
        label2="Second onboard (Phase 6)",
    )
    print("\n=== TEST PASSED ===")
# Allow running this file directly: invokes pytest on this module with
# verbose output (-v) and output capturing disabled (-s).
if __name__ == "__main__":
    pytest.main([__file__, "-v", "-s"])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment