Unverified Commit 2eb9e0dd authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix: harden KVBM integration tests with dynamic timeouts and metrics (#6105)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 67d00b24
...@@ -1311,7 +1311,14 @@ def fetch_kvbm_metrics(port: int, timeout: int = 10) -> dict: ...@@ -1311,7 +1311,14 @@ def fetch_kvbm_metrics(port: int, timeout: int = 10) -> dict:
Raises: Raises:
RuntimeError: If metrics endpoint is unreachable or returns error RuntimeError: If metrics endpoint is unreachable or returns error
""" """
response = requests.get(f"http://localhost:{port}/metrics", timeout=timeout) url = f"http://localhost:{port}/metrics"
try:
response = requests.get(url, timeout=timeout)
except requests.exceptions.ConnectionError as err:
raise RuntimeError(
f"Metrics endpoint {url} refused connection. "
f"Check that DYN_KVBM_METRICS_PORT={port} matches the running server."
) from err
if response.status_code != 200: if response.status_code != 200:
raise RuntimeError( raise RuntimeError(
f"Metrics endpoint returned status {response.status_code}. " f"Metrics endpoint returned status {response.status_code}. "
......
...@@ -118,7 +118,7 @@ def tester(llm_server_kvbm): # noqa: F811 ...@@ -118,7 +118,7 @@ def tester(llm_server_kvbm): # noqa: F811
], ],
indirect=True, indirect=True,
) )
@pytest.mark.xfail(reason="Test currently failing and blocking CI") @pytest.mark.timeout(140) # 4x measured (~34s), rounded up
def test_chunked_prefill_offload(tester, llm_server_kvbm): # noqa: F811 def test_chunked_prefill_offload(tester, llm_server_kvbm): # noqa: F811
""" """
Validate that chunked prefill blocks are offloaded. Validate that chunked prefill blocks are offloaded.
......
...@@ -508,6 +508,7 @@ class TestConsolidatorRouterE2E: ...@@ -508,6 +508,7 @@ class TestConsolidatorRouterE2E:
logger.info(f"Concurrent requests: {successes}/{num_requests} succeeded") logger.info(f"Concurrent requests: {successes}/{num_requests} succeeded")
return successes, results return successes, results
@pytest.mark.timeout(150) # 4x measured (~37s), rounded up
def test_basic_consolidator_flow(self, tester, llm_worker, frontend_server): def test_basic_consolidator_flow(self, tester, llm_worker, frontend_server):
""" """
Test basic consolidator flow: Test basic consolidator flow:
...@@ -551,6 +552,7 @@ class TestConsolidatorRouterE2E: ...@@ -551,6 +552,7 @@ class TestConsolidatorRouterE2E:
logger.info(f"Basic consolidator flow test passed ({engine.upper()})") logger.info(f"Basic consolidator flow test passed ({engine.upper()})")
@pytest.mark.timeout(170) # 4x measured (~41s), rounded up
def test_consolidator_handles_concurrent_requests( def test_consolidator_handles_concurrent_requests(
self, tester, llm_worker, frontend_server self, tester, llm_worker, frontend_server
): ):
...@@ -591,6 +593,7 @@ class TestConsolidatorRouterE2E: ...@@ -591,6 +593,7 @@ class TestConsolidatorRouterE2E:
logger.info(f"Concurrent request handling test passed ({engine.upper()})") logger.info(f"Concurrent request handling test passed ({engine.upper()})")
@pytest.mark.timeout(180) # 4x measured (~44s), rounded up
def test_store_deduplication_across_sources( def test_store_deduplication_across_sources(
self, tester, llm_worker, frontend_server self, tester, llm_worker, frontend_server
): ):
...@@ -686,6 +689,7 @@ class TestConsolidatorRouterE2E: ...@@ -686,6 +689,7 @@ class TestConsolidatorRouterE2E:
logger.info(f"STORE deduplication test passed ({engine.upper()})") logger.info(f"STORE deduplication test passed ({engine.upper()})")
@pytest.mark.timeout(340) # 4x measured (~85s), rounded up
@pytest.mark.parametrize("engine_type", AVAILABLE_ENGINES) @pytest.mark.parametrize("engine_type", AVAILABLE_ENGINES)
def test_remove_deduplication_across_sources( def test_remove_deduplication_across_sources(
self, test_directory, runtime_services, engine_type self, test_directory, runtime_services, engine_type
......
...@@ -12,6 +12,12 @@ fixed seed and temperature=0. ...@@ -12,6 +12,12 @@ fixed seed and temperature=0.
The expected results should be 100% match between the two cases. Compared to The expected results should be 100% match between the two cases. Compared to
disaggregated mode, aggregated mode has less randomness chances. disaggregated mode, aggregated mode has less randomness chances.
These tests are slow by default (~368s and ~601s). For faster runs with
fewer iterations, run the following command (expected to finish in ~58s + ~152s):
KVBM_MAX_ITERATIONS=2 KVBM_NUM_ITERATIONS=2 KVBM_REQUEST_DELAY=2 \
pytest tests/kvbm_integration/test_determinism_agg.py -v --tb=short
""" """
import logging import logging
...@@ -36,6 +42,20 @@ from .common import check_module_available ...@@ -36,6 +42,20 @@ from .common import check_module_available
HAS_VLLM_BENCH = check_module_available("vllm") HAS_VLLM_BENCH = check_module_available("vllm")
# KVBM env vars that drive test duration (used to compute timeouts below).
_KVBM_MAX_ITERATIONS = int(os.environ.get("KVBM_MAX_ITERATIONS", "100"))
_KVBM_NUM_ITERATIONS = int(os.environ.get("KVBM_NUM_ITERATIONS", "15"))
_KVBM_REQUEST_DELAY = int(os.environ.get("KVBM_REQUEST_DELAY", "30"))
# Compute timeouts from the same env vars that control test duration.
# test_determinism_agg_with_cache_reset: runs warmup + 2 phases of KVBM_MAX_ITERATIONS,
# each iteration ~4s (request + overhead), plus ~50s setup/teardown.
_CACHE_RESET_TIMEOUT = 2 * (_KVBM_MAX_ITERATIONS * 4 + 50)
# test_concurrent_determinism_under_load: dominated by
# (KVBM_NUM_ITERATIONS - 1) * KVBM_REQUEST_DELAY seconds of sleep,
# plus ~150s overhead (server startup, benchmark ramp, teardown).
_CONCURRENT_TIMEOUT = 2 * ((_KVBM_NUM_ITERATIONS - 1) * _KVBM_REQUEST_DELAY + 150)
# Test markers to align with repository conventions # Test markers to align with repository conventions
# Todo: enable the rest when kvbm is built in the ci # Todo: enable the rest when kvbm is built in the ci
pytestmark = [ pytestmark = [
...@@ -70,6 +90,8 @@ class LLMServerManager: ...@@ -70,6 +90,8 @@ class LLMServerManager:
self.port = allocate_port(start_port=8000) self.port = allocate_port(start_port=8000)
self.port_allocated = True # Port allocated by us, must deallocate self.port_allocated = True # Port allocated by us, must deallocate
self.base_url = base_url or f"http://localhost:{self.port}" self.base_url = base_url or f"http://localhost:{self.port}"
self.metrics_port = allocate_port(start_port=6880)
self.metrics_port_allocated = True
self.process: Optional[subprocess.Popen] = None self.process: Optional[subprocess.Popen] = None
self.cpu_cache_blocks = cpu_cache_blocks self.cpu_cache_blocks = cpu_cache_blocks
self.gpu_cache_blocks = gpu_cache_blocks self.gpu_cache_blocks = gpu_cache_blocks
...@@ -97,7 +119,7 @@ class LLMServerManager: ...@@ -97,7 +119,7 @@ class LLMServerManager:
"ETCD_ENDPOINTS": "http://localhost:2379", "ETCD_ENDPOINTS": "http://localhost:2379",
# Enable KVBM metrics for monitoring offload/onboard # Enable KVBM metrics for monitoring offload/onboard
"DYN_KVBM_METRICS": "true", "DYN_KVBM_METRICS": "true",
"DYN_KVBM_METRICS_PORT": "6880", "DYN_KVBM_METRICS_PORT": str(self.metrics_port),
# Enable vLLM batch invariant for deterministic batching # Enable vLLM batch invariant for deterministic batching
"VLLM_BATCH_INVARIANT": "1", "VLLM_BATCH_INVARIANT": "1",
"VLLM_ATTENTION_BACKEND": "FLASH_ATTN", "VLLM_ATTENTION_BACKEND": "FLASH_ATTN",
...@@ -241,7 +263,16 @@ class LLMServerManager: ...@@ -241,7 +263,16 @@ class LLMServerManager:
start_time = time.time() start_time = time.time()
while time.time() - start_time < timeout: while time.time() - start_time < timeout:
if self.is_server_running(): if self.is_server_running():
return True # Verify metrics endpoint is reachable (fail fast on wrong port)
try:
requests.get(
f"http://localhost:{self.metrics_port}/metrics", timeout=5
)
return True
except requests.exceptions.RequestException:
print(
f"Warning: server healthy but metrics port {self.metrics_port} not reachable yet"
)
if self.process.poll() is not None: if self.process.poll() is not None:
# Process exited, wait for tee thread to finish # Process exited, wait for tee thread to finish
for t in self._tee_threads: for t in self._tee_threads:
...@@ -274,10 +305,13 @@ class LLMServerManager: ...@@ -274,10 +305,13 @@ class LLMServerManager:
self._tee_threads = [] self._tee_threads = []
self._close_log_files() self._close_log_files()
# Deallocate port if we allocated it # Deallocate ports if we allocated them
if self.port_allocated: if self.port_allocated:
deallocate_port(self.port) deallocate_port(self.port)
self.port_allocated = False self.port_allocated = False
if self.metrics_port_allocated:
deallocate_port(self.metrics_port)
self.metrics_port_allocated = False
def _close_log_files(self): def _close_log_files(self):
if self.server_stdout_file: if self.server_stdout_file:
...@@ -425,6 +459,9 @@ class TestDeterminismAgg(BaseTestDeterminism): ...@@ -425,6 +459,9 @@ class TestDeterminismAgg(BaseTestDeterminism):
indirect=True, indirect=True,
) )
@pytest.mark.kvbm @pytest.mark.kvbm
@pytest.mark.timeout(
_CACHE_RESET_TIMEOUT
) # ~368s actual measured on 32-core machine
def test_determinism_agg_with_cache_reset( def test_determinism_agg_with_cache_reset(
self, tester, llm_server, runtime_services self, tester, llm_server, runtime_services
): ):
...@@ -448,6 +485,9 @@ class TestDeterminismAgg(BaseTestDeterminism): ...@@ -448,6 +485,9 @@ class TestDeterminismAgg(BaseTestDeterminism):
@pytest.mark.skipif( @pytest.mark.skipif(
not HAS_VLLM_BENCH, reason="requires vllm bench (vllm module not found)" not HAS_VLLM_BENCH, reason="requires vllm bench (vllm module not found)"
) )
@pytest.mark.timeout(
_CONCURRENT_TIMEOUT
) # ~601s actual measured on 32-core machine
def test_concurrent_determinism_under_load( def test_concurrent_determinism_under_load(
self, tester, llm_server, runtime_services self, tester, llm_server, runtime_services
): ):
......
...@@ -114,7 +114,7 @@ def tester(llm_server_kvbm): # noqa: F811 ...@@ -114,7 +114,7 @@ def tester(llm_server_kvbm): # noqa: F811
# Tests # Tests
@pytest.mark.parametrize("llm_server_kvbm", [{"model": KVBM_TEST_MODEL}], indirect=True) @pytest.mark.parametrize("llm_server_kvbm", [{"model": KVBM_TEST_MODEL}], indirect=True)
@pytest.mark.timeout(110) # 3x measured time (36.31s), rounded up @pytest.mark.timeout(170) # 4x measured (~41s), rounded up
def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811 def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811
""" """
Test offload → cache reset → onboard cycle with determinism verification. Test offload → cache reset → onboard cycle with determinism verification.
...@@ -181,7 +181,7 @@ def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811 ...@@ -181,7 +181,7 @@ def test_offload_and_onboard(tester, llm_server_kvbm): # noqa: F811
[{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}], [{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}],
indirect=True, indirect=True,
) )
@pytest.mark.timeout(190) # 3x measured time (63.39s), rounded up @pytest.mark.timeout(170) # 4x measured (~42s), rounded up
def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811 def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
""" """
Test GPU cache eviction mechanics. Test GPU cache eviction mechanics.
...@@ -256,7 +256,7 @@ def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811 ...@@ -256,7 +256,7 @@ def test_gpu_cache_eviction(tester, llm_server_kvbm): # noqa: F811
[{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}], [{"cpu_blocks": 200, "gpu_blocks": 20, "model": KVBM_TEST_MODEL}],
indirect=True, indirect=True,
) )
@pytest.mark.timeout(107) # 3x measured time (35.40s), rounded up @pytest.mark.timeout(160) # 4x measured (~39s), rounded up
def test_onboarding_determinism(tester, llm_server_kvbm): # noqa: F811 def test_onboarding_determinism(tester, llm_server_kvbm): # noqa: F811
""" """
Test onboarding determinism under eviction scenario. Test onboarding determinism under eviction scenario.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment