Unverified Commit 224f63f5 authored by Olga Andreeva's avatar Olga Andreeva Committed by GitHub
Browse files

test: Fixing kvbm tests/ adding concurrency test (#5474)

parent 199d11f5
...@@ -309,7 +309,7 @@ impl Worker for KvConnectorWorker { ...@@ -309,7 +309,7 @@ impl Worker for KvConnectorWorker {
if self.layers_complete == self.kv_cache_layers.len() { if self.layers_complete == self.kv_cache_layers.len() {
let offloading_operations = std::mem::take(&mut self.offloading_operations); let offloading_operations = std::mem::take(&mut self.offloading_operations);
tracing::info!( tracing::trace!(
iteration = self.iteration, iteration = self.iteration,
num_operations = offloading_operations.len(), num_operations = offloading_operations.len(),
"All layers complete, enqueuing {} offload operations", "All layers complete, enqueuing {} offload operations",
......
...@@ -233,6 +233,7 @@ markers = [ ...@@ -233,6 +233,7 @@ markers = [
"router: marks tests for router component", "router: marks tests for router component",
"planner: marks tests for planner component", "planner: marks tests for planner component",
"kvbm: marks tests for KV behavior and model determinism", "kvbm: marks tests for KV behavior and model determinism",
"kvbm_concurrency: marks concurrency stress tests for KVBM (runs separately)",
"model: model id used by a test or parameter", "model: model id used by a test or parameter",
"custom_build: marks tests that require custom builds or special setup (e.g., MoE models)", "custom_build: marks tests that require custom builds or special setup (e.g., MoE models)",
"k8s: marks tests as requiring Kubernetes", "k8s: marks tests as requiring Kubernetes",
......
...@@ -55,6 +55,7 @@ def pytest_configure(config): ...@@ -55,6 +55,7 @@ def pytest_configure(config):
"planner: marks tests for planner component", "planner: marks tests for planner component",
"kvbm: marks tests for KV behavior and model determinism", "kvbm: marks tests for KV behavior and model determinism",
"kvbm_v2: marks tests using KVBM V2", "kvbm_v2: marks tests using KVBM V2",
"kvbm_concurrency: marks concurrency stress tests for KVBM (runs separately)",
"model: model id used by a test or parameter", "model: model id used by a test or parameter",
"custom_build: marks tests that require custom builds or special setup (e.g., MoE models)", "custom_build: marks tests that require custom builds or special setup (e.g., MoE models)",
"k8s: marks tests as requiring Kubernetes", "k8s: marks tests as requiring Kubernetes",
......
...@@ -3,11 +3,23 @@ ...@@ -3,11 +3,23 @@
"""Unit tests to verify KVBM package and wheels are properly installed.""" """Unit tests to verify KVBM package and wheels are properly installed."""
import importlib.util
import subprocess import subprocess
import pytest import pytest
def _is_sglang_installed() -> bool:
"""Check if sglang is installed (KVBM is not available in sglang images)."""
return importlib.util.find_spec("sglang") is not None
# Skip all KVBM tests if running in sglang environment (sglang doesn't have KVBM)
pytestmark = pytest.mark.skipif(
_is_sglang_installed(), reason="KVBM is not available in sglang images"
)
# Helper functions for KVBM verification # Helper functions for KVBM verification
def _check_kvbm_wheel_exists(): def _check_kvbm_wheel_exists():
"""Helper to verify KVBM wheel file exists in expected location.""" """Helper to verify KVBM wheel file exists in expected location."""
......
...@@ -9,11 +9,12 @@ This module contains shared classes and functions used by both ...@@ -9,11 +9,12 @@ This module contains shared classes and functions used by both
aggregated and disaggregated determinism tests. aggregated and disaggregated determinism tests.
""" """
import importlib.util
import os import os
import re import re
import time import time
from collections import defaultdict from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed from difflib import SequenceMatcher
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
...@@ -21,6 +22,112 @@ from typing import Dict, List, Optional, Tuple ...@@ -21,6 +22,112 @@ from typing import Dict, List, Optional, Tuple
import pytest import pytest
import requests import requests
# ============================================================================
# Module Availability Checks
# ============================================================================
def check_module_available(module_name: str) -> bool:
"""Check if a Python module is available and importable.
This function first checks if the module spec can be found, then attempts
to actually import it to ensure it's functional.
Args:
module_name: Name of the module to check (e.g., "vllm", "tensorrt_llm")
Returns:
True if the module is available and importable, False otherwise
Example:
>>> has_vllm = check_module_available("vllm")
>>> has_trtllm = check_module_available("tensorrt_llm")
"""
if importlib.util.find_spec(module_name) is None:
return False
try:
importlib.import_module(module_name)
return True
except ImportError:
return False
def calculate_semantic_similarity(text1: str, text2: str) -> float:
"""
Calculate semantic similarity between two texts using character-level matching.
Returns a similarity ratio between 0 and 1:
- 1.0 = exact match
- 0.8+ = semantically equivalent (minor word changes)
- <0.7 = significantly different
"""
matcher = SequenceMatcher(None, text1, text2)
return matcher.ratio()
def are_semantically_equivalent(
text1: str,
text2: str,
min_similarity: float = 0.75,
prefix_exact_match_ratio: float = 0.5,
) -> tuple:
"""
Check if two texts are semantically equivalent.
Checks both overall similarity and prefix matching to ensure early tokens
are deterministic (where FP errors haven't accumulated).
Args:
text1: First text (baseline)
text2: Second text (response to compare)
min_similarity: Minimum similarity ratio (0-1) to consider equivalent
prefix_exact_match_ratio: Ratio of text that must exactly match from start
Returns:
(is_equivalent, similarity_score, reason)
"""
# Calculate overall similarity
similarity = calculate_semantic_similarity(text1, text2)
# Check prefix match (first X% must be exact to avoid early divergence)
prefix_len = int(min(len(text1), len(text2)) * prefix_exact_match_ratio)
prefix_match = text1[:prefix_len] == text2[:prefix_len]
if similarity >= min_similarity:
if prefix_match:
return (
True,
similarity,
f"Semantically equivalent ({similarity:.1%} similar, prefix matches)",
)
else:
return (
False,
similarity,
f"High similarity but early divergence (prefix mismatch at {prefix_len} chars)",
)
else:
return (False, similarity, f"Low similarity ({similarity:.1%})")
def load_prompt_from_file(prompt_path: Path) -> Optional[str]:
"""Load and preprocess prompt from file.
Args:
prompt_path: Path to the prompt file
Returns:
Cleaned prompt content, or None if file doesn't exist
"""
if not prompt_path.exists():
return None
with open(prompt_path, "r", encoding="utf-8") as f:
# Strip SPDX license header lines (start with #)
lines = f.readlines()
content_lines = [line for line in lines if not line.startswith("#")]
return "".join(content_lines).strip()
def check_logs_for_patterns( def check_logs_for_patterns(
log_path: Path, patterns: List[str], process_name: str log_path: Path, patterns: List[str], process_name: str
...@@ -146,7 +253,7 @@ class DeterminismTester(ApiTester): ...@@ -146,7 +253,7 @@ class DeterminismTester(ApiTester):
self.server_type = server_type self.server_type = server_type
self.shakespeare_file = Path("t8.shakespeare.txt") self.shakespeare_file = Path("t8.shakespeare.txt")
self.max_iterations = int(os.environ.get("KVBM_MAX_ITERATIONS", "500")) self.max_iterations = int(os.environ.get("KVBM_MAX_ITERATIONS", "100"))
self.word_count = int(os.environ.get("KVBM_WORD_COUNT", "200")) self.word_count = int(os.environ.get("KVBM_WORD_COUNT", "200"))
# Test intervals # Test intervals
...@@ -278,38 +385,6 @@ class DeterminismTester(ApiTester): ...@@ -278,38 +385,6 @@ class DeterminismTester(ApiTester):
end_word = min(start_word + self.word_count, len(words)) end_word = min(start_word + self.word_count, len(words))
return " ".join(words[start_word:end_word]) return " ".join(words[start_word:end_word])
def download_ifeval_dataset(self) -> List[str]:
"""Download and extract all prompts from IFEval dataset."""
try:
from datasets import load_dataset
print("Loading complete IFEval dataset...")
dataset = load_dataset("google/IFEval", split="train")
# Extract all prompts from the dataset
prompts = []
for example in dataset:
# IFEval has 'prompt' field with the instruction
if "prompt" in example:
prompt_text = example["prompt"].strip()
if prompt_text: # Only skip empty prompts
prompts.append(prompt_text)
print(f"Loaded {len(prompts)} prompts from complete IFEval dataset")
return prompts
except ImportError:
print(
"Warning: datasets library not available, falling back to default prompts"
)
return self.control_sequences + self.random_sequences
except Exception as e:
print(
f"Warning: Failed to load IFEval dataset ({e}), falling back to default prompts"
)
return self.control_sequences + self.random_sequences
def run_test_iterations(self): def run_test_iterations(self):
"""Run the test iterations with comprehensive warmup.""" """Run the test iterations with comprehensive warmup."""
# Perform initial warmup before testing # Perform initial warmup before testing
...@@ -405,141 +480,6 @@ class DeterminismTester(ApiTester): ...@@ -405,141 +480,6 @@ class DeterminismTester(ApiTester):
return passed, failed return passed, failed
def test_concurrent_determinism(
self, prompts: List[str], num_workers: int = 4, requests_per_prompt: int = 3
) -> bool:
"""Test determinism with concurrent requests to the same prompts."""
print("\n=== CONCURRENT DETERMINISM TEST ===")
print(f"Workers: {num_workers}, Requests per prompt: {requests_per_prompt}")
# Prepare test data: each prompt will get multiple concurrent requests
test_tasks = []
for i, prompt in enumerate(prompts):
for req_num in range(requests_per_prompt):
test_tasks.append(
{
"prompt_idx": i,
"prompt": prompt,
"request_id": f"p{i}_r{req_num}",
}
)
print(f"Total concurrent requests: {len(test_tasks)}")
# Storage for responses grouped by prompt
concurrent_responses: Dict[int, List[Tuple[str, str]]] = defaultdict(list)
def make_concurrent_request(task):
"""Worker function for concurrent requests."""
try:
response = self.make_request(task["prompt"])
return {
"prompt_idx": task["prompt_idx"],
"request_id": task["request_id"],
"response": response,
"success": True,
"error": None,
}
except Exception as e:
return {
"prompt_idx": task["prompt_idx"],
"request_id": task["request_id"],
"response": None,
"success": False,
"error": str(e),
}
# Execute concurrent requests
print("Executing concurrent requests...")
start_time = time.time()
with ThreadPoolExecutor(max_workers=num_workers) as executor:
# Submit all tasks
future_to_task = {
executor.submit(make_concurrent_request, task): task
for task in test_tasks
}
# Collect results
completed = 0
failed = 0
for future in as_completed(future_to_task):
result = future.result()
completed += 1
if result["success"]:
concurrent_responses[result["prompt_idx"]].append(
(result["request_id"], result["response"])
)
if completed % 10 == 0:
print(f" Completed: {completed}/{len(test_tasks)}")
else:
failed += 1
print(f" Failed request {result['request_id']}: {result['error']}")
elapsed = time.time() - start_time
print(
f"Completed {completed} requests in {elapsed:.2f}s ({completed/elapsed:.1f} req/s)"
)
print(f"Failed requests: {failed}")
# Analyze concurrent determinism
print("\n=== CONCURRENT DETERMINISM ANALYSIS ===")
total_prompts_tested = 0
deterministic_prompts = 0
for prompt_idx, responses in concurrent_responses.items():
if len(responses) < 2:
print(
f"Prompt {prompt_idx}: Only {len(responses)} response(s), skipping"
)
continue
total_prompts_tested += 1
prompt_text = prompts[prompt_idx]
print(f"\nPrompt {prompt_idx}: {prompt_text[:50]}...")
print(f"Concurrent responses: {len(responses)}")
# Extract just the response text
response_texts = [resp[1] for resp in responses]
request_ids = [resp[0] for resp in responses]
# Check if all responses are identical
reference_response = response_texts[0]
mismatches = []
for req_id, response_text in zip(request_ids[1:], response_texts[1:]):
if response_text != reference_response:
mismatches.append((req_id, response_text))
if not mismatches:
print(
f" DETERMINISTIC: All {len(responses)} concurrent responses identical"
)
print(f" Response: {reference_response}")
deterministic_prompts += 1
else:
print(f" NON-DETERMINISTIC: {len(mismatches)} different responses")
print(f" Reference ({request_ids[0]}): {reference_response}")
for req_id, diff_response in mismatches:
print(f" Different ({req_id}): {diff_response}")
# Final assessment
success_rate = (
deterministic_prompts / total_prompts_tested
if total_prompts_tested > 0
else 0
)
print("\n=== FINAL CONCURRENT DETERMINISM RESULT ===")
print(f"Prompts tested: {total_prompts_tested}")
print(f"Deterministic: {deterministic_prompts}")
print(f"Non-deterministic: {total_prompts_tested - deterministic_prompts}")
print(f"Success rate: {success_rate:.1%}")
print(f"Concurrency level: {num_workers} workers")
print(f"Request rate: {completed/elapsed:.1f} req/s")
return success_rate == 1.0
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def tester(llm_server): def tester(llm_server):
...@@ -561,7 +501,6 @@ def llm_server_kvbm(request, runtime_services): ...@@ -561,7 +501,6 @@ def llm_server_kvbm(request, runtime_services):
def test_example(llm_server_kvbm): def test_example(llm_server_kvbm):
... ...
""" """
import importlib.util
import os import os
import time import time
...@@ -577,9 +516,9 @@ def llm_server_kvbm(request, runtime_services): ...@@ -577,9 +516,9 @@ def llm_server_kvbm(request, runtime_services):
) )
# Detect available server type # Detect available server type
if importlib.util.find_spec("vllm") is not None: if check_module_available("vllm"):
server_type = ServerType.vllm server_type = ServerType.vllm
elif importlib.util.find_spec("tensorrt_llm") is not None: elif check_module_available("tensorrt_llm"):
server_type = ServerType.trtllm server_type = ServerType.trtllm
pytest.skip("TensorRT-LLM tests are disabled for this test") pytest.skip("TensorRT-LLM tests are disabled for this test")
else: else:
...@@ -663,6 +602,446 @@ def llm_server_kvbm(request, runtime_services): ...@@ -663,6 +602,446 @@ def llm_server_kvbm(request, runtime_services):
class TestDeterminism: class TestDeterminism:
"""Test class for determinism validation.""" """Test class for determinism validation."""
def _establish_baseline(self, tester, prompt: str, max_tokens: int) -> str:
"""Establish baseline response: warmup -> clear cache -> baseline."""
print("\n" + "=" * 70)
print("ESTABLISHING BASELINE (warmup -> clear cache -> baseline)")
print("=" * 70)
# Step 1: Warmup
print("\nStep 1: Warmup request...")
try:
warmup_response = tester.make_request(
prompt, max_tokens=max_tokens, temperature=0, seed=42
)
print(f"Warmup response: {warmup_response}")
except Exception as e:
pytest.fail(f"Warmup request failed: {e}")
# Step 2: Clear cache
print("\nStep 2: Clearing cache...")
try:
tester.reset_prefix_cache()
print("Cache cleared successfully")
except Exception as e:
print(f"Warning: Cache reset failed: {e}")
# Step 3: Baseline request
print("\nStep 3: Baseline request (after cache clear)...")
try:
baseline_response = tester.make_request(
prompt, max_tokens=max_tokens, temperature=0, seed=42
)
print(f"Baseline response: {baseline_response}")
print("\n✓ Baseline established")
print("=" * 70)
return baseline_response
except Exception as e:
pytest.fail(f"Baseline request failed: {e}")
def _start_benchmark(self, llm_server) -> tuple:
"""Start vllm bench in background.
Returns:
tuple: (process, file_handle, log_path)
"""
import subprocess
model = os.environ.get(
"KVBM_MODEL_ID", "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
)
bench_cmd = [
"vllm",
"bench",
"serve",
"--backend",
"vllm",
"--model",
model,
"--base-url",
llm_server.base_url,
"--dataset-name",
"random",
"--random-input-len",
"4000",
"--random-output-len",
"180",
"--max-concurrency",
"7",
"--num-prompts",
"2000",
]
print(f"\nStarting vllm bench: {' '.join(bench_cmd)}")
bench_log = os.path.join(str(Path(".")), "vllm_bench_semantic.log")
bench_file = open(bench_log, "w")
bench_process = subprocess.Popen(
bench_cmd,
stdout=bench_file,
stderr=subprocess.STDOUT,
env=os.environ.copy(),
)
return bench_process, bench_file, bench_log
def _wait_for_benchmark_activity(self, initial_offload: int) -> bool:
"""Wait for benchmark to start creating offload activity.
Args:
initial_offload: Initial offload block count to compare against
Returns:
bool: True if benchmark activity detected, False otherwise
"""
print("\nWaiting for benchmark to start and create memory pressure...")
max_wait = int(os.environ.get("KVBM_BENCH_STARTUP_WAIT", "120"))
for wait_iteration in range(max_wait // 5):
time.sleep(5)
elapsed = (wait_iteration + 1) * 5
try:
current_metrics = fetch_kvbm_metrics()
current_offload = current_metrics.get("kvbm_offload_blocks_d2h", 0)
if current_offload > initial_offload:
offload_delta = current_offload - initial_offload
print(
f" Benchmark activity detected after {elapsed}s ({offload_delta} blocks offloaded)"
)
print("Waiting additional 10s for benchmark to fully ramp up...")
time.sleep(10)
return True
else:
print(f" Waiting... ({elapsed}s elapsed, no offload activity yet)")
except Exception as e:
print(f" Waiting... ({elapsed}s elapsed, metrics check failed: {e})")
print(f" Warning: No benchmark activity detected after {max_wait}s")
return False
def _compare_with_baseline(
self, response: str, baseline: str, min_similarity: float, request_num: int
) -> dict:
"""Compare response with baseline. Returns comparison result dict.
Returns a dict with keys:
- exact_match: bool - True if response exactly matches baseline
- semantic_match: bool - True if semantically equivalent (includes exact matches)
- similarity: float - Similarity score 0.0-1.0
- reason: str - Explanation of the result
- diverge_pos: int - Character position where divergence starts (if not matching)
- approx_token: int - Approximate token position of divergence
- context_before: str - Text context before divergence point
- baseline_continues: str - How baseline continues after divergence
- response_continues: str - How response continues after divergence
- request_num: int - Request number
- response: str - Full response text
- baseline: str - Full baseline text
"""
result = {
"request_num": request_num,
"exact_match": False,
"semantic_match": False,
"similarity": 0.0,
"reason": "",
"response": response,
"baseline": baseline,
}
# Check for exact match
if response == baseline:
result["exact_match"] = True
result["semantic_match"] = True
result["similarity"] = 1.0
result["reason"] = "Exact match"
return result
# Check semantic equivalence
is_equivalent, similarity, reason = are_semantically_equivalent(
baseline, response, min_similarity=min_similarity
)
result["similarity"] = similarity
result["reason"] = reason
if is_equivalent:
result["semantic_match"] = True
else:
# Find divergence point for reporting
diverge_pos = 0
for j, (c1, c2) in enumerate(zip(baseline, response)):
if c1 != c2:
diverge_pos = j
break
else:
diverge_pos = min(len(baseline), len(response))
approx_token = diverge_pos // 4
result["diverge_pos"] = diverge_pos
result["approx_token"] = approx_token
result["context_before"] = baseline[max(0, diverge_pos - 30) : diverge_pos]
result["baseline_continues"] = baseline[diverge_pos : diverge_pos + 50]
result["response_continues"] = response[diverge_pos : diverge_pos + 50]
return result
def _report_results(
self,
num_requests: int,
exact_matches: int,
semantic_matches: int,
mismatches: list,
):
"""Print final test results."""
print("\n" + "=" * 70)
print("SEMANTIC DETERMINISM RESULTS")
print("=" * 70)
print(f"Total requests: {num_requests}")
print(
f"Exact matches: {exact_matches}/{num_requests} ({exact_matches/num_requests:.1%})"
)
print(
f"Semantic matches: {semantic_matches}/{num_requests} ({semantic_matches/num_requests:.1%})"
)
print(
f"Semantic divergence: {len(mismatches)}/{num_requests} ({len(mismatches)/num_requests:.1%})"
)
if mismatches:
print(f"\n{'='*70}")
print(f"NON-DETERMINISTIC RESPONSES ({len(mismatches)} total):")
print(f"{'='*70}")
for mismatch in mismatches:
req_num = mismatch["request_num"]
if "error" in mismatch:
print(f"\nRequest {req_num}: FAILED - {mismatch['error']}")
else:
print(
f"\nRequest {req_num}: MISMATCH (similarity: {mismatch.get('similarity', 0):.1%})"
)
print(f" Baseline: {mismatch.get('baseline', '')[:150]}...")
print(f" Got: {mismatch.get('response', '')[:150]}...")
semantic_success_rate = (semantic_matches / num_requests) * 100
min_success_rate = 80.0
print(f"\n{'='*70}")
print(f"SEMANTIC SUCCESS RATE: {semantic_success_rate:.1f}%")
print(f"{'='*70}")
print(f"Failed requests: {[m['request_num'] for m in mismatches]}")
if semantic_success_rate < min_success_rate:
pytest.fail(
f"Semantic determinism test failed!\n"
f"Semantic match rate: {semantic_success_rate:.1f}% (< {min_success_rate:.0f}%)\n"
f"This indicates significant non-determinism beyond FP precision effects"
)
else:
print(
f"TEST PASSED - SEMANTICALLY DETERMINISTIC (>= {min_success_rate:.0f}%)"
)
else:
print(f"\n{'='*70}")
print("TEST PASSED - ALL RESPONSES SEMANTICALLY EQUIVALENT")
print(f"{'='*70}")
print(
f"Exact matches: {exact_matches}/{num_requests} ({exact_matches/num_requests:.1%})"
)
def _show_final_kvbm_stats(self, initial_offload: int):
"""Display final KVBM metrics and compare with initial state.
Args:
initial_offload: Initial offload block count to compare against
Raises:
pytest.fail: If no offload activity was detected during the test
"""
print(f"\n{'='*70}")
print("FINAL KVBM STATS")
print(f"{'='*70}")
try:
final_metrics = fetch_kvbm_metrics()
final_offload = final_metrics.get("kvbm_offload_blocks_d2h", 0)
final_onboard = final_metrics.get("kvbm_onboard_blocks_h2d", 0)
offload_delta = final_offload - initial_offload
print(f"Initial offload: {initial_offload} blocks")
print(f"Final offload: {final_offload} blocks")
print(f"Total offloaded: {offload_delta} blocks")
print(f"Total onboarded: {final_onboard} blocks")
if offload_delta > 0:
print(
f"\n KVBM offload activity detected: {offload_delta} blocks offloaded"
)
else:
pytest.fail(
f"No offload activity detected during test.\n"
f"Initial offload: {initial_offload} blocks, Final offload: {final_offload} blocks.\n"
f"Test requires memory pressure to properly validate determinism under load."
)
if final_onboard > 0:
print(
f" KVBM onboard activity detected: {final_onboard} blocks onboarded"
)
else:
pytest.fail(
f"No onboard activity detected during test.\n"
f"Final onboard: {final_onboard} blocks.\n"
f"Test requires KV cache onboarding to properly validate determinism under load."
)
except Exception as e:
print(f"Could not fetch final metrics: {e}")
def base_test_spanish_prompt_determinism_under_load(
self, tester, llm_server, runtime_services, spanish_prompt_path: Path
):
"""Base implementation: send Spanish prompt repeatedly while vllm bench runs.
Tests determinism under high concurrency load. Reproduces bugs where responses
can become corrupted or non-deterministic under memory pressure.
Args:
tester: DeterminismTester instance
llm_server: LLM server manager
runtime_services: Runtime services fixture
spanish_prompt_path: Path to the Spanish prompt file
"""
import subprocess
print("\n" + "=" * 70)
print("DETERMINISM TEST UNDER HIGH CONCURRENCY LOAD")
print("=" * 70)
# Load prompt
prompt = load_prompt_from_file(spanish_prompt_path)
if prompt is None:
pytest.fail(f"Prompt not found at {spanish_prompt_path}")
# Test parameters
num_requests = int(os.environ.get("KVBM_NUM_ITERATIONS", "15"))
delay_seconds = int(os.environ.get("KVBM_REQUEST_DELAY", "30"))
max_tokens = int(os.environ.get("KVBM_MAX_TOKENS", "80"))
min_similarity = float(os.environ.get("KVBM_MIN_SIMILARITY", "0.75"))
print("\nTest configuration:")
print(f" Requests: {num_requests}")
print(f" Delay: {delay_seconds}s")
print(f" Max tokens: {max_tokens}")
print(f" Min semantic similarity: {min_similarity:.0%}")
# Establish baseline
baseline_response = self._establish_baseline(tester, prompt, max_tokens)
# Start benchmark
bench_process, bench_file, bench_log = self._start_benchmark(llm_server)
try:
# Check initial metrics
print("\nChecking initial KVBM metrics...")
try:
initial_metrics = fetch_kvbm_metrics()
initial_offload = initial_metrics.get("kvbm_offload_blocks_d2h", 0)
print(f"Initial offload: {initial_offload} blocks")
except Exception as e:
print(f"Could not fetch initial metrics: {e}")
initial_offload = 0
# Wait for benchmark activity
benchmark_started = self._wait_for_benchmark_activity(initial_offload)
if not benchmark_started:
pytest.fail(
"Benchmark failed to start or create offload activity. "
"Test cannot proceed without memory pressure to properly test determinism under load."
)
print("Waiting additional 10s for benchmark to fully ramp up...")
time.sleep(10)
# Send requests and track results
print(f"\n{'='*70}")
print(f"SENDING {num_requests} REQUESTS (comparing against baseline)")
print(f"{'='*70}")
responses = []
mismatches = []
exact_matches = 0
semantic_matches = 0
for i in range(num_requests):
print(f"\n--- Request {i+1}/{num_requests} ---")
try:
response = tester.make_request(
prompt, max_tokens=max_tokens, temperature=0, seed=42
)
responses.append(response)
print(f"Response: {response}")
# Compare with baseline
comparison = self._compare_with_baseline(
response, baseline_response, min_similarity, i + 1
)
if comparison["exact_match"]:
print("✓ EXACT MATCH (100% deterministic)")
exact_matches += 1
semantic_matches += 1
elif comparison["semantic_match"]:
print(
f"✓ SEMANTICALLY EQUIVALENT ({comparison['similarity']:.1%} similar)"
)
print(f" {comparison['reason']}")
semantic_matches += 1
else:
print(
f"✗ SEMANTIC DIVERGENCE ({comparison['similarity']:.1%} similar)"
)
print(f" {comparison['reason']}")
print(
f" Divergence at char {comparison['diverge_pos']} (~token {comparison['approx_token']})"
)
print(f" Context before: ...{comparison['context_before']}")
print(
f" Baseline continues: {comparison['baseline_continues']}..."
)
print(
f" Response continues: {comparison['response_continues']}..."
)
mismatches.append(comparison)
except Exception as e:
print(f"Request failed: {e}")
responses.append(None)
mismatches.append({"request_num": i + 1, "error": str(e)})
# Wait before next request
if i < num_requests - 1:
print(f"Waiting {delay_seconds}s...")
time.sleep(delay_seconds)
# Report results
self._report_results(
num_requests, exact_matches, semantic_matches, mismatches
)
# Show final KVBM stats
self._show_final_kvbm_stats(initial_offload)
finally:
print("\nStopping benchmark...")
try:
bench_process.terminate()
bench_process.wait(timeout=10)
except subprocess.TimeoutExpired:
bench_process.kill()
bench_process.wait()
bench_file.close()
print(f"Benchmark log: {bench_log}")
def base_test_determinism_with_cache_reset( def base_test_determinism_with_cache_reset(
self, tester, llm_server, runtime_services, success_rate_threshold=1.0 self, tester, llm_server, runtime_services, success_rate_threshold=1.0
): ):
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
Divus es un hombre alto, de cabello dividido entre blanco y negro. Viste un largo abrigo confeccionado con pieles sintéticas y proyecta una presencia elegante y autoritaria. Es un profesor sumamente estricto, exigente y perfeccionista. No tolera equivocaciones en las evaluaciones prácticas y toma con la máxima seriedad la formación de sus alumnos. Aunque su carácter puede resultar intimidante, es un docente comprometido que se involucra activamente en el avance académico de quienes demuestran potencial. Es reconocido por ofrecer tutorías personalizadas a los estudiantes destacados, asegurándose de que alcancen el nivel más alto. Valora por encima de todo la puntualidad, la precisión técnica y la claridad conceptual, y espera que sus alumnos mantengan la misma disciplina tanto dentro como fuera del aula.\n\nHistorial de Divus Crewel:\nRespetado dentro de la academia por su elevado porcentaje de aprobados en exámenes de nivel avanzado, Divus ha formado a numerosos estudiantes que luego brillaron en campos técnicos y estratégicos. Su método se caracteriza por ser riguroso y directo, priorizando la observación minuciosa, la corrección inmediata de fallos y la práctica estructurada hasta alcanzar la excelencia. Ha participado en distintos comités de evaluación y es conocido por diseñar pruebas exigentes que reproducen situaciones reales. Aunque rara vez ofrece halagos explícitos, registra cuidadosamente el progreso individual y adapta su enseñanza según las fortalezas y debilidades que detecta.\n\nEscenario del rol:\nDivus se preparaba para aplicar un examen práctico presencial. Tú y tu hermano mayor no estaban convencidos de abandonar su habitación, pues solían realizar las prácticas remotamente desde su dormitorio en Ignihyde, utilizando sistemas avanzados y simulaciones virtuales. Consideraban innecesario el desplazamiento, pero la normativa del curso exigía una evaluación presencial.\n\nMientras Divus distribuía el material del examen —carpetas, instrumentos de medición y hojas de instrucciones detalladas—, nota tu presencia entre los participantes. Se detiene por un instante, revisa la lista y se acerca con un interés profesional.\n\n”Oh… parece que tenemos a una estudiante que normalmente trabaja de forma remota participando en persona.”\n\nSu tono es firme, aunque teñido de curiosidad. Aún no estaba familiarizado contigo en persona, por lo que adopta una actitud objetiva y profesional. Al revisar tu expediente académico, observa tus calificaciones previas, los comentarios de otros profesores y los registros de prácticas exitosas. Su expresión cambia apenas al notar tu constancia y rapidez de aprendizaje.\n\nDivus cierra la carpeta con cuidado y adopta una postura más formal, evaluándote con atención.\n\n”Tus resultados anteriores son consistentes. No obstante, el objetivo de esta prueba es evaluar tu desempeño en un entorno controlado, con recursos limitados y bajo presión de tiempo. Aquí se miden no solo los conocimientos, sino también el criterio, el método y la capacidad de adaptación.”\n\nDa unos pasos hacia la mesa y señala el material dispuesto, asegurándose de que todo esté perfectamente alineado.\n\n”Lee las instrucciones con atención antes de comenzar. Cada procedimiento tiene una razón específica. No se aceptarán improvisaciones fuera de los parámetros establecidos.”\n\nHace una breve pausa, observando a los demás estudiantes, y luego vuelve a dirigir la mirada hacia ti.\n\n”Si completas la prueba con resultados satisfactorios, consideraré ofrecerte tutorías adicionales. No son un privilegio común; exigen constancia y disposición para aceptar correcciones directas.”\n\nEl ambiente se vuelve silencioso mientras Divus da la señal de inicio. Permanece atento, recorriendo el aula con pasos medidos, observando técnicas, anotando tiempos y registrando mentalmente las conductas de cada estudiante. En más de una ocasión, dirige una mirada analítica hacia tu estación de trabajo, evaluando tu método sin intervenir.\n\nEjemplo de interacción:\n\n{{char}}:\nObserva tus apuntes y dispositivos con atención.\n”Veo que aplicas un enfoque sistemático. Eso minimiza errores y mejora la eficiencia.”\n\n{{user}}:\n”Gracias, profesor.”\n\n{{char}}:\n”Mantén ese nivel de concentración. La constancia es tan valiosa como el conocimiento.”\n\n\nContinuación del escenario:\nMientras la evaluación progresa, Divus revisa algunos resultados preliminares. Anota observaciones en su cuaderno, marcando discretamente los puntos fuertes y las áreas a perfeccionar de cada alumno. Su mente opera de forma analítica, comparando métodos, tiempos de ejecución y decisiones técnicas. Solo interviene para recordar una regla general o ajustar el ritmo de la clase cuando lo considera pertinente.\n\nAl concluir el tiempo designado, Divus da la orden de finalizar. Recoge el material con precisión, revisando que todo esté completo y correctamente identificado. Luego se coloca al frente, cruzando los brazos mientras examina al grupo.\n\n”La evaluación ha terminado. Los resultados serán analizados con detenimiento. No esperen comentarios inmediatos; prefiero revisar cada procedimiento con calma.”\n\nSu mirada se detiene en ti brevemente, sin mostrar emoción, pero registrando tu desempeño.\n\n”Aquellos que demuestren constancia y criterio recibirán indicaciones adicionales en los próximos días. El aprendizaje no se detiene en una sola prueba.”\n\nDivus abandona el aula con paso firme, dejando un ambiente de reflexión y expectativa. Para él, cada examen es una herramienta para perfeccionar el talento, y ya planea los siguientes pasos del curso.\n\nNo escribas como {{user}} ni asumas su reacción o respuesta. Espera la respuesta de {{user}} antes de continuar.\n
\ No newline at end of file
...@@ -13,7 +13,6 @@ This test validates that: ...@@ -13,7 +13,6 @@ This test validates that:
""" """
import concurrent.futures import concurrent.futures
import importlib.util
import logging import logging
import os import os
import re import re
...@@ -27,22 +26,11 @@ import yaml ...@@ -27,22 +26,11 @@ import yaml
from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import ManagedProcess
# Check if engines are available and build list of available engines # Check if engines are available and build list of available engines
# Use find_spec first (fast check), then verify import works (functional check) from .common import check_module_available
def _check_engine_available(module_name: str) -> bool:
"""Check if an engine module is available and importable."""
if importlib.util.find_spec(module_name) is None:
return False
try:
importlib.import_module(module_name)
return True
except ImportError:
return False
HAS_VLLM = _check_engine_available("vllm") HAS_VLLM = check_module_available("vllm")
HAS_TRTLLM = _check_engine_available("tensorrt_llm") HAS_TRTLLM = check_module_available("tensorrt_llm")
# Build list of available engines for parameterization # Build list of available engines for parameterization
AVAILABLE_ENGINES = [] AVAILABLE_ENGINES = []
......
...@@ -11,6 +11,8 @@ when given the same inputs with fixed seed and temperature=0. ...@@ -11,6 +11,8 @@ when given the same inputs with fixed seed and temperature=0.
The test uses comprehensive server warmup (sending all test prompts The test uses comprehensive server warmup (sending all test prompts
before validation) to avoid server initialization effects that could before validation) to avoid server initialization effects that could
impact determinism measurements. impact determinism measurements.
This is a TensorRTLLM only test.
""" """
import logging import logging
...@@ -24,8 +26,13 @@ from tests.utils.engine_process import FRONTEND_PORT ...@@ -24,8 +26,13 @@ from tests.utils.engine_process import FRONTEND_PORT
from tests.utils.managed_process import DynamoFrontendProcess, ManagedProcess from tests.utils.managed_process import DynamoFrontendProcess, ManagedProcess
from tests.utils.payloads import check_models_api from tests.utils.payloads import check_models_api
from .common import check_module_available
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
HAS_TRTLLM = check_module_available("tensorrt_llm")
# Just need a model to show the config works rather than any stress of the system. # Just need a model to show the config works rather than any stress of the system.
MODEL_PATH = "TinyLlama/TinyLlama-1.1B-Chat-v1.0" MODEL_PATH = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
SERVED_MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0" SERVED_MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
...@@ -151,6 +158,7 @@ def send_completion_request( ...@@ -151,6 +158,7 @@ def send_completion_request(
@pytest.mark.nightly @pytest.mark.nightly
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.skipif(not HAS_TRTLLM, reason="requires tensorrt_llm")
def test_kvbm_without_cuda_graph_enabled(request, runtime_services): def test_kvbm_without_cuda_graph_enabled(request, runtime_services):
""" """
End-to-end test for TRTLLM worker with cuda_graph_config not defined and End-to-end test for TRTLLM worker with cuda_graph_config not defined and
...@@ -187,6 +195,7 @@ def test_kvbm_without_cuda_graph_enabled(request, runtime_services): ...@@ -187,6 +195,7 @@ def test_kvbm_without_cuda_graph_enabled(request, runtime_services):
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.nightly @pytest.mark.nightly
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.skipif(not HAS_TRTLLM, reason="requires tensorrt_llm")
def test_kvbm_with_cuda_graph_enabled(request, runtime_services): def test_kvbm_with_cuda_graph_enabled(request, runtime_services):
""" """
End-to-end test for TRTLLM worker with cuda_graph_config defined and End-to-end test for TRTLLM worker with cuda_graph_config defined and
......
...@@ -14,27 +14,30 @@ The expected results should be 100% match between the two cases. Compared to ...@@ -14,27 +14,30 @@ The expected results should be 100% match between the two cases. Compared to
disaggregated mode, aggregated mode has less randomness chances. disaggregated mode, aggregated mode has less randomness chances.
""" """
import importlib.util
import logging import logging
import os import os
import signal import signal
import socket
import subprocess import subprocess
import sys
import threading
import time import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional, TextIO from typing import Any, Dict, List, Optional, TextIO
import pytest import pytest
import requests import requests
from .common import DeterminismTester, ServerType from .common import DeterminismTester, ServerType
from .common import TestDeterminism as BaseTestDeterminism from .common import TestDeterminism as BaseTestDeterminism
from .common import check_module_available
HAS_VLLM_BENCH = check_module_available("vllm")
# Test markers to align with repository conventions # Test markers to align with repository conventions
# Todo: enable the rest when kvbm is built in the ci # Todo: enable the rest when kvbm is built in the ci
pytestmark = [ pytestmark = [
pytest.mark.kvbm,
pytest.mark.e2e, pytest.mark.e2e,
pytest.mark.slow, pytest.mark.slow,
pytest.mark.gpu_1, pytest.mark.gpu_1,
...@@ -42,6 +45,16 @@ pytestmark = [ ...@@ -42,6 +45,16 @@ pytestmark = [
] ]
def _find_free_port() -> int:
"""Find a free port by binding to port 0 and letting the OS assign one."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
return port
class LLMServerManager: class LLMServerManager:
"""Manages LLM server lifecycle for determinism testing.""" """Manages LLM server lifecycle for determinism testing."""
...@@ -55,7 +68,13 @@ class LLMServerManager: ...@@ -55,7 +68,13 @@ class LLMServerManager:
server_type: Optional[str] = ServerType.vllm, server_type: Optional[str] = ServerType.vllm,
): ):
self.server_type = server_type self.server_type = server_type
self.port = port or int(os.environ.get("KVBM_SERVER_PORT", "8000")) # Use provided port, env var, or find a free port to avoid conflicts
if port is not None:
self.port = port
elif os.environ.get("KVBM_SERVER_PORT"):
self.port = int(os.environ["KVBM_SERVER_PORT"])
else:
self.port = _find_free_port()
self.base_url = base_url or f"http://localhost:{self.port}" self.base_url = base_url or f"http://localhost:{self.port}"
self.process: Optional[subprocess.Popen] = None self.process: Optional[subprocess.Popen] = None
self.cpu_cache_blocks = cpu_cache_blocks self.cpu_cache_blocks = cpu_cache_blocks
...@@ -72,7 +91,7 @@ class LLMServerManager: ...@@ -72,7 +91,7 @@ class LLMServerManager:
self.log_dir / f"{self.server_type}_server_{config_str}_{timestamp}.log" self.log_dir / f"{self.server_type}_server_{config_str}_{timestamp}.log"
) )
self.server_stdout_file: Optional[TextIO] = None self.server_stdout_file: Optional[TextIO] = None
self.server_stderr_file: Optional[TextIO] = None self._tee_threads: List[threading.Thread] = []
# Environment for the process # Environment for the process
self.env = os.environ.copy() self.env = os.environ.copy()
...@@ -82,6 +101,12 @@ class LLMServerManager: ...@@ -82,6 +101,12 @@ class LLMServerManager:
# DynamoConnector connection settings # DynamoConnector connection settings
"NATS_SERVER": "nats://localhost:4222", "NATS_SERVER": "nats://localhost:4222",
"ETCD_ENDPOINTS": "http://localhost:2379", "ETCD_ENDPOINTS": "http://localhost:2379",
# Enable KVBM metrics for monitoring offload/onboard
"DYN_KVBM_METRICS": "true",
"DYN_KVBM_METRICS_PORT": "6880",
# Enable vLLM batch invariant for deterministic batching
"VLLM_BATCH_INVARIANT": "1",
"VLLM_ATTENTION_BACKEND": "FLASH_ATTN",
} }
) )
...@@ -164,40 +189,69 @@ class LLMServerManager: ...@@ -164,40 +189,69 @@ class LLMServerManager:
with open(config_path, "w") as f: with open(config_path, "w") as f:
yaml.dump(llm_api_config, f, default_flow_style=False, sort_keys=False) yaml.dump(llm_api_config, f, default_flow_style=False, sort_keys=False)
def _tee_output(self, pipe: Any, log_file: TextIO, prefix: str) -> None:
"""Read from pipe and write to both log file and stdout (tee)."""
try:
for line in iter(pipe.readline, ""):
if not line:
break
# Write to log file
log_file.write(line)
log_file.flush()
# Write to stdout with prefix
sys.stdout.write(f"[{prefix}] {line}")
sys.stdout.flush()
except (ValueError, OSError):
pass # Pipe closed
finally:
pipe.close()
def start_server(self, timeout: int = 300) -> bool: def start_server(self, timeout: int = 300) -> bool:
"""Start LLM server and wait for readiness.""" """Start LLM server and wait for readiness."""
if self.is_server_running(): if self.is_server_running():
self.stop_server() self.stop_server()
time.sleep(2) time.sleep(2)
# Open log files # Open log file (combined stdout+stderr)
self.server_stdout_file = open( self.server_stdout_file = open(self.server_log_file.with_suffix(".log"), "w")
self.server_log_file.with_suffix(".stdout.log"), "w"
) # Write header
self.server_stderr_file = open( header = f"=== {self.server_type} Server Started at {datetime.now()} ===\nCommand: {' '.join(self.server_cmd)}\n"
self.server_log_file.with_suffix(".stderr.log"), "w" self.server_stdout_file.write(header)
) self.server_stdout_file.flush()
if self.server_stdout_file is not None: print(f"[{self.server_type}] {header}", end="")
self.server_stdout_file.write(
f"=== {self.server_type} Server Started at {datetime.now()} ===\nCommand: {' '.join(self.server_cmd)}\n"
)
self.server_stdout_file.flush()
# Launch # Launch with pipe, redirect stderr to stdout
self.process = subprocess.Popen( self.process = subprocess.Popen(
self.server_cmd, self.server_cmd,
stdout=self.server_stdout_file, stdout=subprocess.PIPE,
stderr=self.server_stderr_file, stderr=subprocess.STDOUT, # Redirect stderr to stdout
env=self.env, env=self.env,
preexec_fn=os.setsid, preexec_fn=os.setsid,
text=True,
bufsize=1, # Line buffered
) )
# Start tee thread for combined output
self._tee_threads = [
threading.Thread(
target=self._tee_output,
args=(self.process.stdout, self.server_stdout_file, self.server_type),
daemon=True,
),
]
for t in self._tee_threads:
t.start()
# Wait for health # Wait for health
start_time = time.time() start_time = time.time()
while time.time() - start_time < timeout: while time.time() - start_time < timeout:
if self.is_server_running(): if self.is_server_running():
return True return True
if self.process.poll() is not None: if self.process.poll() is not None:
# Process exited, wait for tee thread to finish
for t in self._tee_threads:
t.join(timeout=2)
self._close_log_files() self._close_log_files()
return False return False
time.sleep(5) time.sleep(5)
...@@ -220,6 +274,10 @@ class LLMServerManager: ...@@ -220,6 +274,10 @@ class LLMServerManager:
pass pass
finally: finally:
self.process = None self.process = None
# Wait for tee threads to finish
for t in self._tee_threads:
t.join(timeout=2)
self._tee_threads = []
self._close_log_files() self._close_log_files()
def _close_log_files(self): def _close_log_files(self):
...@@ -229,9 +287,6 @@ class LLMServerManager: ...@@ -229,9 +287,6 @@ class LLMServerManager:
) )
self.server_stdout_file.close() self.server_stdout_file.close()
self.server_stdout_file = None self.server_stdout_file = None
if self.server_stderr_file:
self.server_stderr_file.close()
self.server_stderr_file = None
def is_server_running(self) -> bool: def is_server_running(self) -> bool:
try: try:
...@@ -318,9 +373,9 @@ def llm_server(request, runtime_services): ...@@ -318,9 +373,9 @@ def llm_server(request, runtime_services):
# Put logs in the per-test directory set up by tests/conftest.py # Put logs in the per-test directory set up by tests/conftest.py
log_dir = Path(request.node.name) log_dir = Path(request.node.name)
if importlib.util.find_spec("vllm") is not None: if check_module_available("vllm"):
server_type = ServerType.vllm server_type = ServerType.vllm
elif importlib.util.find_spec("tensorrt_llm") is not None: elif check_module_available("tensorrt_llm"):
server_type = ServerType.trtllm server_type = ServerType.trtllm
else: else:
raise Exception( raise Exception(
...@@ -363,10 +418,14 @@ class TestDeterminismAgg(BaseTestDeterminism): ...@@ -363,10 +418,14 @@ class TestDeterminismAgg(BaseTestDeterminism):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"llm_server", "llm_server",
[ [
{"cpu_blocks": int(os.environ.get("KVBM_CPU_BLOCKS", "10000"))}, {
"cpu_blocks": int(os.environ.get("KVBM_CPU_BLOCKS", "10000")),
"gpu_blocks": int(os.environ.get("KVBM_GPU_BLOCKS", "2048")),
},
], ],
indirect=True, indirect=True,
) )
@pytest.mark.kvbm
def test_determinism_agg_with_cache_reset( def test_determinism_agg_with_cache_reset(
self, tester, llm_server, runtime_services self, tester, llm_server, runtime_services
): ):
...@@ -379,197 +438,38 @@ class TestDeterminismAgg(BaseTestDeterminism): ...@@ -379,197 +438,38 @@ class TestDeterminismAgg(BaseTestDeterminism):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"llm_server", "llm_server",
[ [
{"cpu_blocks": int(os.environ.get("KVBM_CPU_BLOCKS", "20000"))}, {
"cpu_blocks": int(os.environ.get("KVBM_CPU_BLOCKS", "30000")),
"gpu_blocks": int(os.environ.get("KVBM_GPU_BLOCKS", "2048")),
},
], ],
indirect=True, indirect=True,
) )
@pytest.mark.parametrize( @pytest.mark.kvbm_concurrency
"num_concurrent", @pytest.mark.skipif(
[int(x) for x in os.environ.get("KVBM_CONCURRENT_REQUESTS", "3").split(",")], not HAS_VLLM_BENCH, reason="requires vllm bench (vllm module not found)"
) )
@pytest.mark.parametrize( @pytest.mark.xfail(
"max_tokens", reason="Known issue, fixed in PR: https://github.com/ai-dynamo/dynamo/pull/5475",
[int(os.environ.get("KVBM_MAX_TOKENS", "48"))], run=True,
) )
@pytest.mark.parametrize( def test_concurrent_determinism_under_load(
"num_prompts", self, tester, llm_server, runtime_services
[int(x) for x in os.environ.get("KVBM_IFEVAL_PROMPTS", "120").split(",")],
)
@pytest.mark.skip(reason="Flaky test: DIS-665")
def test_concurrent_determinism_with_ifeval(
self,
tester,
llm_server,
runtime_services,
num_concurrent,
max_tokens,
num_prompts,
): ):
"""Simple concurrent determinism test: send IFEval prompts concurrently, with cache reset.""" """Test Spanish prompt determinism under high concurrency load.
print("\n" + "=" * 70)
print("CONCURRENT DETERMINISM TEST WITH IFEVAL")
print("=" * 70)
print(f"Using max_tokens={max_tokens} (from KVBM_MAX_TOKENS)") Reproduces the bug where Spanish responses become English or corrupted.
"""
# Get the Spanish prompt path relative to this test file
spanish_prompt_path = Path(
os.path.join(os.path.dirname(__file__), "es_prompt.txt")
).absolute()
# Configuration comes from parametrize # Call the base class implementation
print( super().base_test_spanish_prompt_determinism_under_load(
f"Configuration: {num_concurrent} concurrent requests, {max_tokens} max tokens" tester, llm_server, runtime_services, spanish_prompt_path
)
# Load IFEval prompts
ifeval_prompts = tester.download_ifeval_dataset()
if not ifeval_prompts:
pytest.skip("IFEval dataset not available")
# Use parametrized number of IFEval prompts
test_prompts = ifeval_prompts[:num_prompts]
print(
f"Using {len(test_prompts)} IFEval prompts for concurrent testing (parametrized: {num_prompts})"
)
print(f"Concurrency level: {num_concurrent} simultaneous requests")
# Show sample prompts
print("\nSample prompts:")
for i, prompt in enumerate(test_prompts[:3]):
print(f" {i+1}. {prompt[:80]}{'...' if len(prompt) > 80 else ''}")
if len(test_prompts) > 3:
print(f" ... and {len(test_prompts) - 3} more")
def run_concurrent_test(phase_name, do_warmup=False):
"""Run one phase of concurrent testing."""
print(f"\n=== {phase_name} ===")
if do_warmup:
# KV Cache warmup - send ALL test prompts to compute KV caches
print(
f"Warming up KV caches with all {len(test_prompts)} test prompts..."
)
warmup_failed = 0
for i, prompt in enumerate(test_prompts):
if (
i % 5 == 0 or i == len(test_prompts) - 1
): # Progress every 5 prompts
print(f" Warmup progress: {i+1}/{len(test_prompts)}")
try:
tester.make_request(prompt)
except Exception as e:
warmup_failed += 1
if warmup_failed <= 3: # Show first few failures
print(f" Warmup failed for prompt {i}: {e}")
if warmup_failed > 0:
print(
f"Warmup completed with {warmup_failed} failures out of {len(test_prompts)} prompts"
)
else:
print(
f"Warmup completed successfully - all {len(test_prompts)} KV caches computed"
)
# Wait for 10 seconds to make sure all transfers are complete
time.sleep(10)
else:
print("Skipping warmup (already done in previous phase)")
# Run concurrent requests
print(
f"Sending {len(test_prompts)} requests with {num_concurrent} max concurrent..."
)
start_time = time.time()
def make_request_wrapper(prompt_and_idx):
idx, prompt = prompt_and_idx
try:
response = tester.make_request(prompt)
return {
"idx": idx,
"prompt": prompt,
"response": response,
"success": True,
}
except Exception as e:
return {
"idx": idx,
"prompt": prompt,
"error": str(e),
"success": False,
}
# Execute all requests concurrently
with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
results = list(
executor.map(make_request_wrapper, enumerate(test_prompts))
)
elapsed = time.time() - start_time
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
print(
f"Completed in {elapsed:.2f}s - Success: {len(successful)}, Failed: {len(failed)}"
)
if failed:
for fail in failed[:3]: # Show first few failures
print(f" Failed: {fail['error']}")
return successful
# Phase 1: Before cache reset
results_before = run_concurrent_test(
"PHASE 1: BEFORE CACHE RESET", do_warmup=True
) )
# Reset cache
print("\n" + "=" * 50)
print("RESETTING CACHE")
print("=" * 50)
tester.reset_prefix_cache()
# Phase 2: After cache reset
results_after = run_concurrent_test("PHASE 2: AFTER CACHE RESET")
# Compare results between phases
print("\n" + "=" * 70)
print("DETERMINISM ANALYSIS")
print("=" * 70)
# Create lookup for before results
before_responses = {r["idx"]: r["response"] for r in results_before}
after_responses = {r["idx"]: r["response"] for r in results_after}
deterministic_count = 0
total_compared = 0
for idx in before_responses:
if idx in after_responses:
total_compared += 1
before_resp = before_responses[idx]
after_resp = after_responses[idx]
if before_resp == after_resp:
deterministic_count += 1
print(f" Prompt {idx}: DETERMINISTIC")
else:
print(f" Prompt {idx}: NON-DETERMINISTIC")
print(f" Before: {before_resp}")
print(f" After: {after_resp}")
# Final assessment
success_rate = deterministic_count / total_compared if total_compared > 0 else 0
print("\n=== FINAL RESULT ===")
print(f"Prompts compared: {total_compared}")
print(f"Deterministic: {deterministic_count}")
print(f"Success rate: {success_rate:.1%}")
print(f"Concurrent requests: {num_concurrent}")
assert (
success_rate == 1.0
), f"Determinism failed: {deterministic_count}/{total_compared} prompts deterministic"
if __name__ == "__main__": if __name__ == "__main__":
# Allow running as script # Allow running as script
......
...@@ -15,7 +15,6 @@ Compared to aggregated mode, disaggregated mode has some known randomness. ...@@ -15,7 +15,6 @@ Compared to aggregated mode, disaggregated mode has some known randomness.
Example reference: https://github.com/vllm-project/vllm/issues/7779#issuecomment-2304967870 Example reference: https://github.com/vllm-project/vllm/issues/7779#issuecomment-2304967870
""" """
import importlib.util
import logging import logging
import os import os
import signal import signal
...@@ -32,6 +31,7 @@ import yaml ...@@ -32,6 +31,7 @@ import yaml
from .common import DeterminismTester, ServerType from .common import DeterminismTester, ServerType
from .common import TestDeterminism as BaseTestDeterminism from .common import TestDeterminism as BaseTestDeterminism
from .common import check_module_available
# Test markers to align with repository conventions # Test markers to align with repository conventions
# Todo: enable the rest when kvbm is built in the ci # Todo: enable the rest when kvbm is built in the ci
...@@ -507,9 +507,9 @@ def llm_server(request, runtime_services): ...@@ -507,9 +507,9 @@ def llm_server(request, runtime_services):
# Put logs in the per-test directory set up by tests/conftest.py # Put logs in the per-test directory set up by tests/conftest.py
log_dir = Path(request.node.name) log_dir = Path(request.node.name)
if importlib.util.find_spec("vllm") is not None: if check_module_available("vllm"):
server_type = ServerType.vllm server_type = ServerType.vllm
elif importlib.util.find_spec("tensorrt_llm") is not None: elif check_module_available("tensorrt_llm"):
server_type = ServerType.trtllm server_type = ServerType.trtllm
else: else:
pytest.skip("vllm module is not available in the current environment.") pytest.skip("vllm module is not available in the current environment.")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment