Unverified Commit 9321c2e5 authored by Jacky's avatar Jacky Committed by GitHub
Browse files

test: Explicit None token thresholds disable Router initiated Rejection (#8477)


Signed-off-by: default avatarJacky <18255193+kthui@users.noreply.github.com>
parent 2a95ef63
......@@ -1063,6 +1063,192 @@ def _test_router_overload_503(
)
def _test_router_threshold_none_disables_rejection(
engine_workers,
block_size: int,
request,
frontend_port: int,
test_payload: dict,
num_requests: int = 4,
):
"""Test that explicit CLI "None" thresholds disable busy-based rejection end to end.
Assumes engine_workers are already initialized. This function manages router lifecycle.
Starts the frontend with literal CLI "None" values for all three threshold knobs,
verifies the /busy_threshold API reports nulls, then sends overload-shaped traffic and
confirms no request is rejected with 503 and the frontend rejection metric stays at 0.
Args:
engine_workers: Backend workers (mocker/vllm) already initialized with __enter__()
block_size: Block size for KV cache
request: Pytest request fixture for managing resources
frontend_port: Port for the frontend HTTP server
test_payload: Base test payload to send to /v1/chat/completions
num_requests: Number of concurrent requests to send under load
Raises:
AssertionError: If thresholds are not null or any request is rejected/fails
"""
logger.info(
"Starting KV router frontend on port %s with explicit CLI None thresholds",
frontend_port,
)
with KVRouterProcess(
request=request,
block_size=block_size,
frontend_port=frontend_port,
namespace=engine_workers.namespace,
blocks_threshold="None",
tokens_threshold="None",
tokens_threshold_frac="None",
):
frontend_url = f"http://localhost:{frontend_port}"
chat_url = f"{frontend_url}/v1/chat/completions"
busy_threshold_url = f"{frontend_url}/busy_threshold"
model_name = test_payload.get("model", "test-model")
load_payload = {
**test_payload,
"max_tokens": 50,
}
logger.info("Waiting for frontend readiness before explicit-None test...")
asyncio.run(
wait_for_frontend_ready(
frontend_url=frontend_url,
expected_num_workers=1,
timeout=60,
)
)
async def verify_thresholds_and_send_load():
async with aiohttp.ClientSession() as session:
logger.info(
"Checking /busy_threshold reports nulls for explicit CLI None thresholds"
)
async with session.post(
busy_threshold_url,
json={"model": model_name},
) as response:
assert (
response.status == 200
), f"POST /busy_threshold (get) failed with status {response.status}"
data = await response.json()
assert (
data.get("active_decode_blocks_threshold") is None
), f"Expected active_decode_blocks_threshold=None: {data}"
assert (
data.get("active_prefill_tokens_threshold") is None
), f"Expected active_prefill_tokens_threshold=None: {data}"
assert (
data.get("active_prefill_tokens_threshold_frac") is None
), f"Expected active_prefill_tokens_threshold_frac=None: {data}"
logger.info(
"POST /busy_threshold returned expected null thresholds: %s",
data,
)
logger.info(
"Launching overload-shaped traffic with explicit None thresholds..."
)
stop_event = asyncio.Event()
response_statuses = asyncio.Queue()
async with aiohttp.ClientSession() as session:
async def send_request(req_id: int, payload: dict) -> int:
try:
async with session.post(chat_url, json=payload) as response:
if response.status == 200:
logger.info(
"Request %s accepted without rejection", req_id
)
await response_statuses.put(response.status)
await stop_event.wait()
return response.status
body = await response.text()
logger.info(
"Request %s got unexpected status %s: %s",
req_id,
response.status,
body,
)
await response_statuses.put(response.status)
return response.status
except asyncio.CancelledError:
raise
except Exception as exc:
logger.info("Request %s failed with error: %s", req_id, exc)
await response_statuses.put(exc)
raise
tasks = []
try:
for i in range(num_requests):
content_words = test_payload["messages"][0]["content"].split()
random.shuffle(content_words)
unique_payload = {
**load_payload,
"messages": [
{
**test_payload["messages"][0],
"content": " ".join(content_words),
}
],
}
tasks.append(
asyncio.create_task(send_request(i, unique_payload))
)
await asyncio.sleep(0.1)
finally:
initial_results = [
await response_statuses.get() for _ in range(num_requests)
]
stop_event.set()
done = await asyncio.gather(*tasks, return_exceptions=True)
for result in initial_results + done:
if isinstance(result, Exception):
raise result
return done
results = asyncio.run(verify_thresholds_and_send_load())
num_succeeded = sum(1 for status in results if status == 200)
num_rejected = sum(1 for status in results if status == 503)
num_other = sum(1 for status in results if status not in (200, 503))
logger.info(
"Results with explicit None thresholds: %s succeeded, %s rejected (503), %s other",
num_succeeded,
num_rejected,
num_other,
)
assert num_rejected == 0, f"Expected 0 rejections, but got {num_rejected}"
assert (
num_other == 0
), f"Expected only 200 or 503 responses, but got {num_other} other"
assert num_succeeded > 0, "Expected at least one successful request"
assert (
num_succeeded == num_requests
), f"Expected {num_requests} successful requests, got {num_succeeded}"
_verify_frontend_rejection_metrics(
frontend_port,
model_name,
"chat_completions",
0,
)
logger.info(
"Explicit CLI None thresholds disabled busy rejection as expected: %s successes, metrics clean",
num_succeeded,
)
async def _zmq_replay_cycle(
phase: int,
router,
......
......@@ -31,6 +31,7 @@ from tests.router.common import (
_test_router_indexers_sync,
_test_router_overload_503,
_test_router_query_instance_id,
_test_router_threshold_none_disables_rejection,
_test_router_two_routers,
)
from tests.router.helper import (
......@@ -977,6 +978,38 @@ def test_mocker_kv_router_overload_503(
)
@pytest.mark.parametrize(
"durable_kv_events", [False], ids=["nondurable"], indirect=True
) # Use NATS Core (local indexer)
@pytest.mark.timeout(45)
def test_mocker_kv_router_threshold_none_disables_rejection(
request, runtime_services_dynamic_ports, predownload_tokenizers, durable_kv_events
):
"""Test that explicit CLI None thresholds disable KV router overload rejection."""
logger.info("Starting mocker KV router explicit-None threshold test")
mocker_args = {
"speedup_ratio": 0.01,
"block_size": 4,
"num_gpu_blocks": 64,
"durable_kv_events": durable_kv_events,
}
with MockerProcess(request, mocker_args=mocker_args, num_mockers=1) as mockers:
logger.info("Starting single mocker instance with limited resources")
logger.info(f"Mocker using endpoint: {mockers.endpoint}")
frontend_port = get_unique_ports(request, num_ports=1)[0]
_test_router_threshold_none_disables_rejection(
engine_workers=mockers,
block_size=4,
request=request,
frontend_port=frontend_port,
test_payload=TEST_PAYLOAD,
num_requests=4,
)
@pytest.mark.timeout(90) # bumped for xdist contention (was 22s; ~7.10s serial avg)
@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True)
@pytest.mark.parametrize(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment