Unverified Commit 1e828573 authored by Andreas Karatzas's avatar Andreas Karatzas Committed by GitHub
Browse files

[CI][Metrics] Stabilize tests with polling and subprocess guards (#34566)



test_abort_metrics_reset is flaky due to hardware-dependent
fixed sleeps: replace fixed sleeps with polling.

test_metrics_exist_run_batch passes even when the engine crashes
on startup (false positive): add subprocess lifecycle guards.
Signed-off-by: default avatarAndreas Karatzas <akaratza@amd.com>
parent a5ccc85c
......@@ -17,6 +17,7 @@ from transformers import AutoTokenizer
from tests.conftest import LocalAssetServer
from tests.utils import RemoteOpenAIServer
from vllm import version
from vllm.utils.network_utils import get_open_port
MODELS = {
"text": "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
......@@ -315,14 +316,26 @@ async def test_abort_metrics_reset(
client.completions.create(
model=model_name,
prompt=prompt_ids,
max_tokens=100, # Long generation to give time to abort
max_tokens=500, # Long generation to give time to abort
temperature=0.0,
)
)
tasks.append(task)
# Wait a bit for requests to start processing
await asyncio.sleep(0.5)
# Poll until we see running requests rather than using a fixed sleep,
# since generation speed varies across hardware.
try:
await _poll_until(
lambda: _get_running_metrics_from_api(server)[0] > 0,
timeout=10.0,
interval=0.1,
description="running_requests > 0",
)
except TimeoutError:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
pytest.fail("Requests never appeared as running in metrics")
# Check that we have running requests
running_requests, waiting_requests, kv_cache_usage = _get_running_metrics_from_api(
......@@ -336,13 +349,15 @@ async def test_abort_metrics_reset(
# Cancel all tasks to abort the requests
for task in tasks:
task.cancel()
# Wait for cancellations to be processed
await asyncio.sleep(1.0)
# Check that metrics have reset to zero
response = requests.get(server.url_for("metrics"))
assert response.status_code == HTTPStatus.OK
await asyncio.gather(*tasks, return_exceptions=True)
# Poll until metrics reset rather than using a fixed sleep
await _poll_until(
lambda: _get_running_metrics_from_api(server) == (0, 0, 0),
timeout=10.0,
interval=0.2,
description="gauge metrics back to zero",
)
# Verify running and waiting requests counts and KV cache usage are zero
running_requests_after, waiting_requests_after, kv_cache_usage_after = (
......@@ -360,6 +375,18 @@ async def test_abort_metrics_reset(
)
async def _poll_until(
predicate, *, timeout: float, interval: float = 0.5, description: str = "condition"
):
"""Poll until predicate() returns True, or raise TimeoutError."""
start = time.time()
while time.time() - start < timeout:
if predicate():
return
await asyncio.sleep(interval)
raise TimeoutError(f"Timed out after {timeout}s waiting for: {description}")
def _get_running_metrics_from_api(server: RemoteOpenAIServer):
"""Return (running_count, waiting_count, kv_cache_usage)"""
......@@ -399,7 +426,7 @@ def test_metrics_exist_run_batch():
input_batch = """{"custom_id": "request-0", "method": "POST", "url": "/v1/embeddings", "body": {"model": "intfloat/multilingual-e5-small", "input": "You are a helpful assistant."}}""" # noqa: E501
base_url = "0.0.0.0"
port = "8001"
port = str(get_open_port())
server_url = f"http://{base_url}:{port}"
with (
......@@ -427,17 +454,32 @@ def test_metrics_exist_run_batch():
],
)
def is_server_up(url):
try:
def is_server_up(url):
try:
response = requests.get(url)
return response.status_code == 200
except requests.ConnectionError:
return False
start = time.time()
timeout = 120
while not is_server_up(server_url):
if proc.poll() is not None:
pytest.fail(
f"Batch process exited early with returncode={proc.returncode}"
)
if time.time() - start > timeout:
pytest.fail("Batch server did not start within timeout")
time.sleep(1)
response = requests.get(server_url + "/metrics")
assert response.status_code == HTTPStatus.OK
finally:
proc.terminate()
try:
response = requests.get(url)
return response.status_code == 200
except requests.ConnectionError:
return False
while not is_server_up(server_url):
time.sleep(1)
response = requests.get(server_url + "/metrics")
assert response.status_code == HTTPStatus.OK
proc.wait()
proc.wait(timeout=15)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=5)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment