Unverified Commit 28044799 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

refactor: frontend/test_vllm.py to use shared dynamic ports (#5013)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 6df20955
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
"""End-to-end tests covering reasoning effort behaviour.""" """End-to-end tests covering reasoning effort behaviour.
Runtime note:
- `python -m pytest tests/frontend/test_vllm.py -v` took ~228s (3m48s) wall time.
- Measured on: Ubuntu 24.04.2, Intel(R) Core(TM) i9-14900K (32 CPUs), NVIDIA RTX 6000 Ada Generation (1 warmup run + 1 measured run).
- Expect variance depending on model cache state, compilation warmup, and system load.
"""
from __future__ import annotations from __future__ import annotations
import logging import logging
import os import os
import shutil import shutil
from typing import Any, Dict, Optional, Tuple from typing import Any, Dict, Generator, Optional, Tuple
import pytest import pytest
import requests import requests
from tests.conftest import EtcdServer, NatsServer
from tests.utils.constants import GPT_OSS from tests.utils.constants import GPT_OSS
from tests.utils.managed_process import ManagedProcess from tests.utils.managed_process import DynamoFrontendProcess, ManagedProcess
from tests.utils.payloads import check_models_api from tests.utils.payloads import check_models_api
from tests.utils.port_utils import ServicePorts
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -62,40 +68,20 @@ SYSTEM_HEALTH_TOOL = { ...@@ -62,40 +68,20 @@ SYSTEM_HEALTH_TOOL = {
} }
class DynamoFrontendProcess(ManagedProcess):
"""Process manager for Dynamo frontend"""
def __init__(self, request):
command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"]
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
env = os.environ.copy()
env.pop("DYN_SYSTEM_PORT", None)
log_dir = f"{request.node.name}_frontend"
# Clean up any existing log directory from previous runs
try:
shutil.rmtree(log_dir)
logger.info(f"Cleaned up existing log directory: {log_dir}")
except FileNotFoundError:
# Directory doesn't exist, which is fine
pass
super().__init__(
command=command,
env=env,
display_output=True,
terminate_existing=True,
log_dir=log_dir,
)
class VllmWorkerProcess(ManagedProcess): class VllmWorkerProcess(ManagedProcess):
"""Vllm Worker process for GPT-OSS model.""" """Vllm Worker process for GPT-OSS model."""
def __init__(self, request, worker_id: str = "vllm-worker"): def __init__(
self,
request,
*,
frontend_port: int,
system_port: int,
worker_id: str = "vllm-worker",
):
self.worker_id = worker_id self.worker_id = worker_id
self.frontend_port = int(frontend_port)
self.system_port = int(system_port)
command = [ command = [
"python3", "python3",
...@@ -114,7 +100,7 @@ class VllmWorkerProcess(ManagedProcess): ...@@ -114,7 +100,7 @@ class VllmWorkerProcess(ManagedProcess):
env = os.environ.copy() env = os.environ.copy()
env["DYN_LOG"] = "debug" env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
env["DYN_SYSTEM_PORT"] = "8083" env["DYN_SYSTEM_PORT"] = str(self.system_port)
log_dir = f"{request.node.name}_{worker_id}" log_dir = f"{request.node.name}_{worker_id}"
...@@ -127,8 +113,8 @@ class VllmWorkerProcess(ManagedProcess): ...@@ -127,8 +113,8 @@ class VllmWorkerProcess(ManagedProcess):
command=command, command=command,
env=env, env=env,
health_check_urls=[ health_check_urls=[
("http://localhost:8000/v1/models", check_models_api), (f"http://localhost:{self.frontend_port}/v1/models", check_models_api),
("http://localhost:8083/health", self.is_ready), (f"http://localhost:{self.system_port}/health", self.is_ready),
], ],
timeout=500, timeout=500,
display_output=True, display_output=True,
...@@ -155,13 +141,15 @@ class VllmWorkerProcess(ManagedProcess): ...@@ -155,13 +141,15 @@ class VllmWorkerProcess(ManagedProcess):
def _send_chat_request( def _send_chat_request(
payload: Dict[str, Any], payload: Dict[str, Any],
*,
base_url: str,
timeout: int = 180, timeout: int = 180,
) -> requests.Response: ) -> requests.Response:
"""Send a chat completion request with a specific payload.""" """Send a chat completion request with a specific payload."""
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
response = requests.post( response = requests.post(
"http://localhost:8000/v1/chat/completions", f"{base_url}/v1/chat/completions",
headers=headers, headers=headers,
json=payload, json=payload,
timeout=timeout, timeout=timeout,
...@@ -169,22 +157,39 @@ def _send_chat_request( ...@@ -169,22 +157,39 @@ def _send_chat_request(
return response return response
@pytest.fixture(scope="module") @pytest.fixture(scope="function")
def runtime_services(request): def start_services(
"""Module-scoped runtime services for this test file.""" request, runtime_services_dynamic_ports, dynamo_dynamic_ports: ServicePorts
with NatsServer(request) as nats_process: ) -> Generator[ServicePorts, None, None]:
with EtcdServer(request) as etcd_process: """Start frontend and worker processes for this test.
yield nats_process, etcd_process
`runtime_services_dynamic_ports` ensures NATS/etcd run on per-test ports and sets
NATS_SERVER/ETCD_ENDPOINTS env vars for Dynamo to discover them.
@pytest.fixture(scope="module")
def start_services(request, runtime_services): This fixture also *returns the exact ports used to launch the services* so tests
"""Start frontend and worker processes once for this module's tests.""" cannot accidentally construct requests against a different `dynamo_dynamic_ports`
with DynamoFrontendProcess(request): instance (e.g., if fixture scopes/usage are changed in the future).
"""
_ = runtime_services_dynamic_ports
frontend_port = dynamo_dynamic_ports.frontend_port
system_port = dynamo_dynamic_ports.system_ports[0]
with DynamoFrontendProcess(
request,
frontend_port=frontend_port,
# Optional debugging (not enabled on main):
# If the frontend hits a Rust panic, enabling backtraces makes failures diagnosable
# from CI logs without needing to repro locally.
# extra_env={"RUST_BACKTRACE": "1", "TOKIO_BACKTRACE": "1"},
terminate_existing=False,
):
logger.info("Frontend started for tests") logger.info("Frontend started for tests")
with VllmWorkerProcess(request): with VllmWorkerProcess(
request,
frontend_port=frontend_port,
system_port=system_port,
):
logger.info("Vllm Worker started for tests") logger.info("Vllm Worker started for tests")
yield yield dynamo_dynamic_ports
def _extract_reasoning_metrics(data: Dict[str, Any]) -> Tuple[str, Optional[int]]: def _extract_reasoning_metrics(data: Dict[str, Any]) -> Tuple[str, Optional[int]]:
...@@ -217,9 +222,11 @@ def _validate_chat_response(response: requests.Response) -> Dict[str, Any]: ...@@ -217,9 +222,11 @@ def _validate_chat_response(response: requests.Response) -> Dict[str, Any]:
return response_json return response_json
@pytest.mark.usefixtures("start_services") @pytest.mark.timeout(240) # ~3x measured total (~70s/test), rounded up
@pytest.mark.post_merge @pytest.mark.post_merge
def test_reasoning_effort(request, runtime_services, predownload_models) -> None: def test_reasoning_effort(
request, start_services: ServicePorts, predownload_models
) -> None:
"""High reasoning effort should yield more detailed reasoning than low effort.""" """High reasoning effort should yield more detailed reasoning than low effort."""
prompt = ( prompt = (
...@@ -252,12 +259,13 @@ def test_reasoning_effort(request, runtime_services, predownload_models) -> None ...@@ -252,12 +259,13 @@ def test_reasoning_effort(request, runtime_services, predownload_models) -> None
"chat_template_args": {"reasoning_effort": "low"}, "chat_template_args": {"reasoning_effort": "low"},
} }
high_response = _send_chat_request(high_payload) base_url = f"http://localhost:{start_services.frontend_port}"
high_response = _send_chat_request(high_payload, base_url=base_url)
high_reasoning_text, high_reasoning_tokens = _extract_reasoning_metrics( high_reasoning_text, high_reasoning_tokens = _extract_reasoning_metrics(
_validate_chat_response(high_response) _validate_chat_response(high_response)
) )
low_response = _send_chat_request(low_payload) low_response = _send_chat_request(low_payload, base_url=base_url)
low_reasoning_text, low_reasoning_tokens = _extract_reasoning_metrics( low_reasoning_text, low_reasoning_tokens = _extract_reasoning_metrics(
_validate_chat_response(low_response) _validate_chat_response(low_response)
) )
...@@ -280,9 +288,11 @@ def test_reasoning_effort(request, runtime_services, predownload_models) -> None ...@@ -280,9 +288,11 @@ def test_reasoning_effort(request, runtime_services, predownload_models) -> None
) )
@pytest.mark.usefixtures("start_services") @pytest.mark.timeout(180) # ~3x measured total (~50s/test), rounded up
@pytest.mark.post_merge @pytest.mark.post_merge
def test_tool_calling(request, runtime_services, predownload_models) -> None: def test_tool_calling(
request, start_services: ServicePorts, predownload_models
) -> None:
"""Test tool calling functionality with weather and system health tools.""" """Test tool calling functionality with weather and system health tools."""
payload = { payload = {
...@@ -302,7 +312,8 @@ def test_tool_calling(request, runtime_services, predownload_models) -> None: ...@@ -302,7 +312,8 @@ def test_tool_calling(request, runtime_services, predownload_models) -> None:
"response_format": {"type": "text"}, "response_format": {"type": "text"},
} }
response = _send_chat_request(payload) base_url = f"http://localhost:{start_services.frontend_port}"
response = _send_chat_request(payload, base_url=base_url)
response_data = _validate_chat_response(response) response_data = _validate_chat_response(response)
logger.info("Tool call response: %s", response_data) logger.info("Tool call response: %s", response_data)
...@@ -319,10 +330,10 @@ def test_tool_calling(request, runtime_services, predownload_models) -> None: ...@@ -319,10 +330,10 @@ def test_tool_calling(request, runtime_services, predownload_models) -> None:
), "Expected get_current_weather tool to be called" ), "Expected get_current_weather tool to be called"
@pytest.mark.usefixtures("start_services") @pytest.mark.timeout(180) # ~3x measured total (~50s/test), rounded up
@pytest.mark.nightly @pytest.mark.nightly
def test_tool_calling_second_round( def test_tool_calling_second_round(
request, runtime_services, predownload_models request, start_services: ServicePorts, predownload_models
) -> None: ) -> None:
"""Test tool calling with a follow-up message containing assistant's prior tool calls.""" """Test tool calling with a follow-up message containing assistant's prior tool calls."""
...@@ -364,7 +375,8 @@ def test_tool_calling_second_round( ...@@ -364,7 +375,8 @@ def test_tool_calling_second_round(
"response_format": {"type": "text"}, "response_format": {"type": "text"},
} }
response = _send_chat_request(payload) base_url = f"http://localhost:{start_services.frontend_port}"
response = _send_chat_request(payload, base_url=base_url)
response_data = _validate_chat_response(response) response_data = _validate_chat_response(response)
logger.info("Tool call second round response: %s", response_data) logger.info("Tool call second round response: %s", response_data)
...@@ -382,9 +394,9 @@ def test_tool_calling_second_round( ...@@ -382,9 +394,9 @@ def test_tool_calling_second_round(
), "Expected response to include temperature information from tool call result (20°C)" ), "Expected response to include temperature information from tool call result (20°C)"
@pytest.mark.usefixtures("start_services") @pytest.mark.timeout(180) # ~3x measured total (~57s/test), rounded up
@pytest.mark.nightly @pytest.mark.nightly
def test_reasoning(request, runtime_services, predownload_models) -> None: def test_reasoning(request, start_services: ServicePorts, predownload_models) -> None:
"""Test reasoning functionality with a mathematical problem.""" """Test reasoning functionality with a mathematical problem."""
payload = { payload = {
...@@ -402,7 +414,8 @@ def test_reasoning(request, runtime_services, predownload_models) -> None: ...@@ -402,7 +414,8 @@ def test_reasoning(request, runtime_services, predownload_models) -> None:
"max_tokens": 2000, "max_tokens": 2000,
} }
response = _send_chat_request(payload) base_url = f"http://localhost:{start_services.frontend_port}"
response = _send_chat_request(payload, base_url=base_url)
response_data = _validate_chat_response(response) response_data = _validate_chat_response(response)
logger.info("Reasoning response: %s", response_data) logger.info("Reasoning response: %s", response_data)
......
...@@ -600,11 +600,6 @@ class DynamoFrontendProcess(ManagedProcess): ...@@ -600,11 +600,6 @@ class DynamoFrontendProcess(ManagedProcess):
): ):
# TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to # TODO: Refactor remaining duplicate "DynamoFrontendProcess" helpers in tests to
# use this shared implementation (and delete the copies): # use this shared implementation (and delete the copies):
# - tests/frontend/test_vllm.py
# - tests/router/common.py
# - tests/router/test_router_e2e_with_vllm.py
# - tests/router/test_router_e2e_with_sglang.py
# - tests/router/test_router_e2e_with_trtllm.py
# - tests/fault_tolerance/cancellation/utils.py # - tests/fault_tolerance/cancellation/utils.py
# - tests/fault_tolerance/migration/utils.py # - tests/fault_tolerance/migration/utils.py
# - tests/fault_tolerance/etcd_ha/utils.py # - tests/fault_tolerance/etcd_ha/utils.py
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment