Unverified commit d83854b5, authored by Tzu-Ling Kan, committed by GitHub
Browse files

refactor: Rewrite test_metrics_labels to use TRTLLMProcess. (#2794)


Signed-off-by: tzulingk@nvidia.com <tzulingk@nvidia.com>
parent 882ae1b4
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Environment variables with defaults
# A non-empty value supplied by the caller is kept; otherwise the default
# shown here is assigned. All four variables are then exported so the child
# processes started by this script can read them.
: "${MODEL_PATH:=Qwen/Qwen3-0.6B}"
: "${SERVED_MODEL_NAME:=Qwen/Qwen3-0.6B}"
: "${AGG_ENGINE_ARGS:=engine_configs/agg.yaml}"
: "${MODALITY:=text}"
export MODEL_PATH SERVED_MODEL_NAME AGG_ENGINE_ARGS MODALITY
# Setup cleanup trap
# Stop the background process whose PID is stored in DYNAMO_PID and reap it.
# Intended for use as a trap handler, so it must never fail: errors from
# kill/wait (e.g. the process already exited) are deliberately ignored.
cleanup() {
    echo "Cleaning up background processes..."
    # DYNAMO_PID is only set once the background process has been launched.
    # If the handler fires earlier, skip kill/wait: an empty operand would make
    # `kill` error out and turn `wait` into "wait for every background job".
    if [[ -n "${DYNAMO_PID:-}" ]]; then
        kill "$DYNAMO_PID" 2>/dev/null || true
        wait "$DYNAMO_PID" 2>/dev/null || true
    fi
    echo "Cleanup complete."
}
# Invoke cleanup on normal exit as well as on INT/TERM signals.
trap cleanup EXIT INT TERM

# Run clear_namespace
python3 utils/clear_namespace.py --namespace dynamo

# Run frontend in the background and remember its PID for cleanup
python3 -m dynamo.frontend --http-port 8000 &
DYNAMO_PID=$!

# Run worker in the foreground.
# The DYN_SYSTEM_* assignments are prefixed to the command, so they are set
# only in the worker's environment, not in this shell.
worker_args=(
    --model-path "$MODEL_PATH"
    --served-model-name "$SERVED_MODEL_NAME"
    --modality "$MODALITY"
    --extra-engine-args "$AGG_ENGINE_ARGS"
    --publish-events-and-metrics
)
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 \
    python3 -m dynamo.trtllm "${worker_args[@]}"
......@@ -12,6 +12,7 @@ from tests.serve.common import EngineConfig, create_payload_for_config
from tests.utils.deployment_graph import (
chat_completions_response_handler,
completions_response_handler,
metrics_handler,
)
from tests.utils.engine_process import EngineProcess
......@@ -28,6 +29,7 @@ class TRTLLMProcess(EngineProcess):
def __init__(self, config: TRTLLMConfig, request):
self.port = 8000
self.backend_metrics_port = 8081
self.config = config
self.dir = config.directory
script_path = os.path.join(self.dir, "launch", config.script_name)
......@@ -76,12 +78,17 @@ def run_trtllm_test_case(config: TRTLLMConfig, request) -> None:
)
for _ in range(payload.repeat_count):
elapsed = time.time() - start_time
response = server_process.send_request(
url, payload=request_body, timeout=config.timeout - elapsed
)
server_process.check_response(payload, response, response_handler)
if endpoint == "metrics":
response = server_process.get_metrics(
server_process.backend_metrics_port
)
response_handler(response)
else:
elapsed = time.time() - start_time
response = server_process.send_request(
url, payload=request_body, timeout=config.timeout - elapsed
)
server_process.check_response(payload, response, response_handler)
# trtllm test configurations
......@@ -137,6 +144,18 @@ trtllm_configs = {
],
model="Qwen/Qwen3-0.6B",
),
"aggregated_metrics": TRTLLMConfig(
name="aggregated_metrics",
directory="/workspace/components/backends/trtllm",
script_name="agg_metrics.sh",
marks=[pytest.mark.gpu_1, pytest.mark.trtllm_marker],
endpoints=[
"v1/chat/completions",
"metrics",
], # Make a request to make sure the model is loaded and metrics are published.
response_handlers=[chat_completions_response_handler, metrics_handler],
model="Qwen/Qwen3-0.6B",
),
}
......@@ -169,173 +188,6 @@ def test_deployment(trtllm_config_test, request, runtime_services):
run_trtllm_test_case(config, request)
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.trtllm_marker
@pytest.mark.slow
def test_metrics_labels(request, runtime_services):
    """
    Test that the trtllm backend correctly exports model labels in its metrics.

    This test uses the --extra-engine-args flag with agg.yaml configuration
    to start the backend.

    The test runs from the trtllm directory to access engine_configs/agg.yaml

    Flow:
      1. Launch ``python3 -m dynamo.trtllm`` as a subprocess with
         DYN_SYSTEM_ENABLED / DYN_SYSTEM_PORT set in its environment.
      2. Poll ``/metrics`` on that port until it answers 200, the process
         dies, or the startup timeout elapses.
      3. Fetch the metrics once more and look for
         ``dynamo_component_requests_total`` carrying the served model label.
      4. Always terminate (and, if needed, kill) the subprocess.

    Args:
        request: pytest ``request`` fixture; only ``request.node.name`` is
            read, to name the logger.
        runtime_services: fixture requested for its setup; not referenced in
            the body.
    """
    import re
    import subprocess
    import threading

    import requests

    logger = logging.getLogger(request.node.name)
    logger.info("Starting test_metrics_labels")

    # Use the exact configuration that works for the user
    model_path = "Qwen/Qwen3-0.6B"
    served_model_name = "Qwen/Qwen3-0.6B"
    agg_engine_args = "engine_configs/agg.yaml"
    metrics_port = 8081
    timeout = 60
    # Per-request HTTP timeout (seconds). Applied to every metrics fetch so a
    # stalled endpoint cannot hang the test indefinitely.
    request_timeout = 5

    # Calculate the path to the trtllm directory from the test file location
    test_dir = os.path.dirname(os.path.abspath(__file__))
    # Go up two levels from tests/serve/
    project_root = os.path.dirname(os.path.dirname(test_dir))
    working_directory = os.path.join(project_root, "components", "backends", "trtllm")

    # Verify the engine config file exists
    engine_config_path = os.path.join(working_directory, agg_engine_args)
    if not os.path.exists(engine_config_path):
        pytest.fail(f"Engine config file not found at: {engine_config_path}")
    logger.info(f"Using engine config from: {engine_config_path}")

    # Build command using the user's working command
    command = [
        "python3",
        "-m",
        "dynamo.trtllm",
        "--model-path",
        model_path,
        "--served-model-name",
        served_model_name,
        "--extra-engine-args",
        agg_engine_args,
        "--max-seq-len",
        "100",
        "--max-num-tokens",
        "100",
        "--publish-events-and-metrics",
    ]

    # Set environment for metrics
    env = os.environ.copy()
    env["DYN_SYSTEM_ENABLED"] = "true"
    env["DYN_SYSTEM_PORT"] = str(metrics_port)

    # Start the backend process
    logger.info(f"Starting trtllm backend with model: {served_model_name}")
    logger.info(f"Command: {' '.join(command)}")
    logger.info(f"Working directory: {working_directory}")
    process = subprocess.Popen(
        command,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        cwd=working_directory,
    )

    try:
        # Start a thread to capture and log output
        output_lines = []

        def log_output():
            if process.stdout is None:
                logger.warning("Process stdout is None, cannot capture output")
                return
            for line in process.stdout:
                line = line.strip()
                if line:
                    output_lines.append(line)
                    logger.info(f"[TRTLLM] {line}")

        # Daemon thread: it must not keep the interpreter alive if the
        # subprocess pipe never closes.
        output_thread = threading.Thread(target=log_output)
        output_thread.daemon = True
        output_thread.start()

        # Wait for metrics endpoint to be ready
        metrics_url = f"http://localhost:{metrics_port}/metrics"
        start_time = time.time()
        while time.time() - start_time < timeout:
            # Check if process has died
            if process.poll() is not None:
                logger.error(f"Process exited with code: {process.returncode}")
                logger.error("Last 20 output lines:\n" + "\n".join(output_lines[-20:]))
                pytest.fail(
                    f"trtllm backend process died with exit code {process.returncode}"
                )
            try:
                response = requests.get(metrics_url, timeout=request_timeout)
                if response.status_code == 200:
                    logger.info("Metrics endpoint is ready")
                    break
            except requests.RequestException as e:
                logger.debug(f"Metrics not ready yet: {e}")
            time.sleep(2)
        else:
            # while/else: reached only when the loop ran out of time without
            # hitting the `break` above.
            logger.error("Last 50 output lines:\n" + "\n".join(output_lines[-50:]))
            pytest.fail(
                f"Metrics endpoint did not become available within {timeout} seconds"
            )

        # Check that the metrics include the model label
        response = requests.get(metrics_url, timeout=request_timeout)
        assert response.status_code == 200, "Failed to fetch metrics"
        metrics_text = response.text
        logger.info(f"Metrics text: {metrics_text}")

        # With the --extra-engine-args flag pointing to agg.yaml,
        # the backend should be able to start properly and register endpoints.
        # Let's check for the dynamo_component_requests_total metric with our model label.
        # Parse the Prometheus metrics to find our label
        pattern = rf'dynamo_component_requests_total\{{[^}}]*model="{re.escape(served_model_name)}"[^}}]*\}}\s+(\d+)'
        matches = re.findall(pattern, metrics_text)
        if matches:
            # No request has been sent by this test, so the counter is
            # expected to still be at zero.
            initial_value = int(matches[0])
            assert (
                initial_value == 0
            ), f"Expected initial metric value to be 0, got {initial_value}"
        else:
            # Check if any dynamo_component metrics exist
            if "dynamo_component" in metrics_text:
                logger.info(
                    "✓ Metrics endpoint is working (found dynamo_component metrics)"
                )
                logger.warning(
                    "Note: dynamo_component_requests_total not found - likely because dummy engine didn't fully initialize"
                )
                logger.info("For complete testing, use a real pre-built TRT-LLM engine")
            else:
                pytest.fail("No dynamo_component metrics found at all")
    finally:
        # Clean up
        logger.info("Terminating backend process")
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # Graceful termination timed out; force-kill and reap.
            process.kill()
            process.wait()
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.trtllm_marker
......
......@@ -88,3 +88,34 @@ def completions_response_handler(response):
assert len(result["choices"]) > 0, "Empty choices in response"
assert "text" in result["choices"][0], "Missing 'text' in first choice"
return result["choices"][0]["text"]
def metrics_handler(response):
    """Validate a metrics response and return its text.

    The response body must contain at least one
    ``dynamo_component_requests_total`` sample that carries a ``model``
    label, and at least one such sample must have a count greater than zero.

    Args:
        response: object exposing the metrics body as ``response.text``.

    Returns:
        The metrics text, unchanged.

    Raises:
        AssertionError: if no labelled sample is present, or if every
            labelled sample has a count of 0.
    """
    import logging
    import re

    log = logging.getLogger(__name__)
    body = response.text

    # Check for any model label in dynamo_component_requests_total metric
    counter_re = r'dynamo_component_requests_total\{[^}]*model="[^"]*"[^}]*\}\s+(\d+)'
    counts = [int(raw) for raw in re.findall(counter_re, body)]
    if not counts:
        raise AssertionError(
            "Metric 'dynamo_component_requests_total' with model label not found in metrics output"
        )

    # Since we send a request first, the counter should be > 0
    first_positive = next((count for count in counts if count > 0), None)
    if first_positive is None:
        raise AssertionError(
            "dynamo_component_requests_total exists but has count of 0 - request was not tracked"
        )

    log.info(f"Found dynamo_component_requests_total with count: {first_positive}")
    return body
......@@ -32,6 +32,23 @@ class EngineProcess(ManagedProcess):
except Exception:
return False
def get_metrics(self, port=8081, timeout: float = 10.0):
    """Curl the metrics endpoint and return the response.

    Args:
        port: localhost port whose ``/metrics`` path is requested.
        timeout: seconds passed to ``requests.get`` as its timeout. Defaults
            to 10, the value that was previously hard-coded.

    Returns:
        The ``requests.Response`` from the GET. The status code is logged but
        not checked here; callers decide what counts as success.

    Raises:
        requests.RequestException: re-raised after logging when the request
            itself fails.
    """
    import requests

    metrics_url = f"http://localhost:{port}/metrics"
    logger.info(f"Curling metrics endpoint: {metrics_url}")
    try:
        # Only the network call sits in the try block, so the handler cannot
        # mask an unrelated error.
        response = requests.get(metrics_url, timeout=timeout)
    except requests.RequestException as e:
        logger.error(f"Failed to curl metrics endpoint: {e}")
        raise
    logger.info(f"Metrics endpoint responded with status: {response.status_code}")
    return response
def send_request(
self, url: str, payload: Dict[str, Any], timeout: float = 30.0
) -> requests.Response:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment