Unverified Commit f7f2fb26 authored by Kyle McGill's avatar Kyle McGill Committed by GitHub
Browse files

fix: Fix consolidator tests by computing the number of requests needed to...

fix: Fix consolidator tests by computing the number of requests needed to trigger an expected pass (#5220)
parent c92a422b
......@@ -86,9 +86,7 @@ def create_trtllm_config(test_directory: Path) -> Path:
"enable_partial_reuse": False,
"free_gpu_memory_fraction": 0.01,
},
"build_config": {
"max_seq_len": 4096,
},
"max_seq_len": 4096,
"kv_connector_config": {
"connector_module": "kvbm.trtllm_integration.connector",
"connector_scheduler_class": "DynamoKVBMConnectorLeader",
......@@ -100,6 +98,102 @@ def create_trtllm_config(test_directory: Path) -> Path:
return config_path
def compute_deduplication_test_params(
g1_gpu_blocks: int,
g2_cpu_blocks: int,
g3_disk_blocks: int,
block_size: int = 16,
gpu_utilization_per_request: float = 0.6,
offload_overflow_factor: float = 100.0,
) -> dict:
"""
Compute max_tokens and num_requests for the deduplication test.
Given fixed block allocations for each storage tier, compute test parameters
that ensure the test lands in the "Goldilocks zone" where:
1. Each request fits within GPU cache (with safety margin)
2. Total tokens generated overflow all tiers
3. Deduplication behavior is observable (blocks exist in multiple sources)
4. Blocks are eventually removed from ALL sources (triggering REMOVE events)
The key insight is that blocks are replicated across GPU, CPU, and Disk
simultaneously. To trigger "last source" removal, a block must be evicted
from ALL three tiers. The bottleneck is the smallest offload cache (CPU or
Disk), not total capacity.
Args:
g1_gpu_blocks: Number of GPU cache blocks (--num-gpu-blocks-override)
g2_cpu_blocks: Number of CPU cache blocks (DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS)
g3_disk_blocks: Number of disk cache blocks (DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS)
block_size: Tokens per block (default 16 for vLLM)
gpu_utilization_per_request: Fraction of GPU capacity to use per request (default 0.6)
- Too high (>0.8): requests may cause immediate evictions
- Too low (<0.3): too many requests needed, slow test
offload_overflow_factor: How many times to overflow the smallest offload
cache (default 100.0). High value needed because:
- Blocks persist in offload caches longer than GPU
- Offloading is asynchronous - not all blocks make it
- Need enough volume to ensure blocks cycle through all tiers
Returns:
dict with:
- max_tokens: Maximum tokens per request
- num_requests: Number of requests to send
- blocks_per_request: Estimated blocks consumed per request
- total_capacity_blocks: Total blocks across all tiers
- total_blocks_generated: Expected total blocks generated
- min_offload_cache: The smallest offload cache (bottleneck)
Example:
>>> config = compute_deduplication_test_params(
... g1_gpu_blocks=30,
... g2_cpu_blocks=10,
... g3_disk_blocks=10,
... )
>>> config
{
'max_tokens': 288,
'num_requests': 56,
'blocks_per_request': 18,
'total_capacity_blocks': 50,
'total_blocks_generated': 1008,
'min_offload_cache': 10,
}
"""
from math import ceil
# 1. max_tokens: fits in GPU with margin
# Use gpu_utilization to leave headroom for the model to process without
# triggering evictions mid-generation
max_blocks_per_request = max(1, int(g1_gpu_blocks * gpu_utilization_per_request))
max_tokens = max_blocks_per_request * block_size
# 2. num_requests: overflow the smallest offload cache many times
# The bottleneck for "last source" removal is the smallest offload cache.
# Blocks must be evicted from GPU AND both CPU and Disk.
# Since blocks are replicated to all tiers, we need to cycle through
# the smallest cache many times to ensure blocks get evicted from ALL sources.
min_offload_cache = min(g2_cpu_blocks, g3_disk_blocks)
total_capacity_blocks = g1_gpu_blocks + g2_cpu_blocks + g3_disk_blocks
# Generate enough unique blocks to overflow the smallest offload cache
# many times, ensuring blocks eventually get evicted from all sources
unique_blocks_needed = int(min_offload_cache * offload_overflow_factor)
num_requests = max(1, ceil(unique_blocks_needed / max_blocks_per_request))
# Calculate expected total blocks generated
total_blocks_generated = num_requests * max_blocks_per_request
return {
"max_tokens": max_tokens,
"num_requests": num_requests,
"blocks_per_request": max_blocks_per_request,
"total_capacity_blocks": total_capacity_blocks,
"total_blocks_generated": total_blocks_generated,
"min_offload_cache": min_offload_cache,
}
def extract_consolidator_stats(log_path: Path, engine: str = "vllm") -> dict:
"""Extract consolidator event statistics from engine logs."""
stats = {
......@@ -668,6 +762,18 @@ class TestConsolidatorRouterE2E:
# Start worker with constrained GPU blocks but larger KVBM blocks
model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")
# Fixed cache tier sizes
g1_gpu_blocks = 10 # Very small GPU cache to force evictions
g2_cpu_blocks = 5 # Smaller than GPU but large enough to retain blocks
g3_disk_blocks = 5 # Smaller than GPU but large enough to retain blocks
# Compute optimal test parameters for this configuration
test_params = compute_deduplication_test_params(
g1_gpu_blocks=g1_gpu_blocks,
g2_cpu_blocks=g2_cpu_blocks,
g3_disk_blocks=g3_disk_blocks,
)
# Build command based on engine type
if engine == "vllm":
worker_command = [
......@@ -681,7 +787,7 @@ class TestConsolidatorRouterE2E:
"--enforce-eager",
"--enable-prefix-caching",
"--num-gpu-blocks-override",
"30", # Very small GPU cache to force evictions
str(g1_gpu_blocks),
]
else: # trtllm
# Create TensorRT-LLM config file with KVBM connector
......@@ -710,8 +816,8 @@ class TestConsolidatorRouterE2E:
"RUST_BACKTRACE": "1",
"NATS_SERVER": "nats://localhost:4222",
"ETCD_ENDPOINTS": "http://localhost:2379",
"DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS": "50", # Larger than GPU but still constrained
"DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS": "50",
"DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS": str(g2_cpu_blocks),
"DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS": str(g3_disk_blocks),
"DYN_LOG": "debug",
}
)
......@@ -759,15 +865,60 @@ class TestConsolidatorRouterE2E:
# Phase 1: Send requests to fill GPU cache
logger.info("Phase 1: Filling GPU cache with diverse prompts")
num_requests = 100
for i in range(num_requests):
prompt = f"Tell me a unique story about topic {i}. Make it very long and detailed with many paragraphs."
response = tester.send_chat_request(
messages=[{"role": "user", "content": prompt}],
max_tokens=100, # Increase tokens to use more blocks per request
)
assert "content" in response["choices"][0]["message"]
logger.info(f"Request {i+1}/{num_requests} completed")
num_requests = test_params["num_requests"]
max_tokens = test_params["max_tokens"]
concurrency = 2 # Parallel requests to create cache pressure
logger.info(
f"Computed test params: max_tokens={max_tokens}, "
f"num_requests={num_requests}, concurrency={concurrency}, "
f"blocks_per_request={test_params['blocks_per_request']}, "
f"total_capacity={test_params['total_capacity_blocks']} blocks, "
f"min_offload_cache={test_params['min_offload_cache']} blocks, "
f"total_blocks_generated={test_params['total_blocks_generated']}"
)
# Use parallel requests to create concurrent cache pressure
# This forces the GPU cache to actually fill up (multiple requests
# holding blocks simultaneously) rather than recycling blocks
# between serial requests
from concurrent.futures import ThreadPoolExecutor, as_completed
def send_request(request_idx: int) -> tuple[int, bool]:
"""Send a single request and return (index, success)."""
prompt = f"Tell me a unique story about topic {request_idx}. Make it very long and detailed with many paragraphs."
try:
response = tester.send_chat_request(
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens,
)
except requests.RequestException:
logger.exception(f"Request {request_idx} failed")
return (request_idx, False)
else:
success = "content" in response["choices"][0]["message"]
return (request_idx, success)
completed_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = {
executor.submit(send_request, i): i for i in range(num_requests)
}
for future in as_completed(futures):
_request_idx, success = future.result()
if success:
completed_count += 1
else:
failed_count += 1
if completed_count % 10 == 0 or completed_count == num_requests:
logger.info(
f"Progress: {completed_count}/{num_requests} completed"
)
logger.info(
f"All requests finished: {completed_count} succeeded, {failed_count} failed"
)
assert failed_count == 0, f"{failed_count} requests failed"
# Wait for requests to complete and blocks to be freed
# With GUARANTEED_NO_EVICT, blocks are freed when requests complete (not evicted)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment