Unverified Commit f962c7d6 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

test: e2e test of two Router replicas with mockers (#2324)

parent 9630693e
......@@ -144,7 +144,9 @@ def test_mocker_kv_router(request, runtime_services):
# Use async to send requests concurrently for better performance
asyncio.run(
send_concurrent_requests(
f"http://localhost:{frontend_port}/v1/chat/completions",
[
f"http://localhost:{frontend_port}/v1/chat/completions"
], # Pass as list
test_payload,
NUM_REQUESTS,
)
......@@ -164,6 +166,93 @@ def test_mocker_kv_router(request, runtime_services):
os.unlink(mocker_args_file)
@pytest.mark.pre_merge
def test_mocker_two_kv_router(request, runtime_services):
"""
Test with two KV routers and multiple mocker engine instances.
Alternates requests between the two routers to test load distribution.
"""
# runtime_services starts etcd and nats
logger.info("Starting mocker two KV router test")
# Create mocker args file
mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
mocker_args_file = os.path.join(request.node.name, "mocker_args.json")
with open(mocker_args_file, "w") as f:
json.dump(mocker_args, f)
# Start mocker instances
mocker_processes = []
kv_routers = []
try:
# Start two KV routers (frontend) on ports 8091 and 8092
router_ports = [PORT + 1, PORT + 2] # 8091 and 8092
for port in router_ports:
logger.info(f"Starting KV router frontend on port {port}")
kv_router = KVRouterProcess(request, port)
kv_router.__enter__()
kv_routers.append(kv_router)
for i in range(NUM_MOCKERS):
# Use unique endpoints for each mocker
endpoint = "dyn://test-namespace.mocker.generate"
logger.info(f"Starting mocker instance {i} on endpoint {endpoint}")
mocker = MockerProcess(request, endpoint, mocker_args_file)
mocker_processes.append(mocker)
# Start all mockers
for mocker in mocker_processes:
mocker.__enter__()
# Send test requests
test_payload = {
"model": MODEL_NAME,
"messages": [
{
"role": "user",
"content": "In a quiet meadow tucked between rolling hills, a plump gray rabbit nibbled on clover beneath the shade of a gnarled oak tree. Its ears twitched at the faint rustle of leaves, but it remained calm, confident in the safety of its burrow just a few hops away. The late afternoon sun warmed its fur, and tiny dust motes danced in the golden light as bees hummed lazily nearby. Though the rabbit lived a simple life, every day was an adventure of scents, shadows, and snacks—an endless search for the tastiest patch of greens and the softest spot to nap.",
}
],
"stream": True,
"max_tokens": 10,
}
# Build URLs for both routers
router_urls = [
f"http://localhost:{port}/v1/chat/completions" for port in router_ports
]
# Use async to send requests concurrently, alternating between routers
asyncio.run(
send_concurrent_requests(
router_urls,
test_payload,
NUM_REQUESTS,
)
)
logger.info(
f"Successfully completed {NUM_REQUESTS} requests across {len(router_ports)} routers"
)
finally:
# Clean up routers
for kv_router in kv_routers:
kv_router.__exit__(None, None, None)
# Clean up mockers
for mocker in mocker_processes:
mocker.__exit__(None, None, None)
if os.path.exists(mocker_args_file):
os.unlink(mocker_args_file)
async def send_request_with_retry(url: str, payload: dict, max_retries: int = 4):
"""Send a single request with exponential backoff retry"""
wait_time = 1 # Start with 1 second
......@@ -192,20 +281,25 @@ async def send_request_with_retry(url: str, payload: dict, max_retries: int = 4)
return False
async def send_concurrent_requests(url: str, payload: dict, num_requests: int):
"""Send multiple requests concurrently and verify responses"""
async def send_concurrent_requests(urls: list, payload: dict, num_requests: int):
"""Send multiple requests concurrently, alternating between URLs if multiple provided"""
# First, send a test request with retry to ensure the system is ready
logger.info("Sending initial test request with retry...")
if not await send_request_with_retry(url, payload):
raise RuntimeError("Failed to connect after multiple retries")
# First, send test requests with retry to ensure all systems are ready
for i, url in enumerate(urls):
logger.info(f"Sending initial test request to URL {i} ({url}) with retry...")
if not await send_request_with_retry(url, payload):
raise RuntimeError(f"Failed to connect to URL {i} after multiple retries")
async def send_single_request(session: aiohttp.ClientSession, request_id: int):
# Alternate between URLs based on request_id
url = urls[request_id % len(urls)]
url_index = request_id % len(urls)
try:
async with session.post(url, json=payload) as response:
if response.status != 200:
logger.error(
f"Request {request_id} failed with status {response.status}"
f"Request {request_id} to URL {url_index} failed with status {response.status}"
)
return False
......@@ -216,12 +310,14 @@ async def send_concurrent_requests(url: str, payload: dict, num_requests: int):
chunks.append(line)
logger.debug(
f"Request {request_id} completed with {len(chunks)} chunks"
f"Request {request_id} to URL {url_index} completed with {len(chunks)} chunks"
)
return True
except Exception as e:
logger.error(f"Request {request_id} failed with error: {e}")
logger.error(
f"Request {request_id} to URL {url_index} failed with error: {e}"
)
return False
# Send all requests at once
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment