Unverified Commit ef535edb authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat: Trtllm metric_labels. (#2666)

parent b9640e5c
......@@ -268,16 +268,20 @@ async def init(runtime: DistributedRuntime, config: Config):
kv_listener = runtime.namespace(config.namespace).component(
config.component
)
metrics_labels = [("model", config.served_model_name)]
async with get_publisher(
component,
engine,
kv_listener,
int(endpoint.lease_id()),
config.kv_block_size,
metrics_labels,
) as publisher:
handler_config.publisher = publisher
handler = RequestHandlerFactory().get_request_handler(handler_config)
await endpoint.serve_endpoint(handler.generate)
await endpoint.serve_endpoint(
handler.generate, metrics_labels=metrics_labels
)
else:
handler = RequestHandlerFactory().get_request_handler(handler_config)
await endpoint.serve_endpoint(handler.generate)
......
......@@ -111,13 +111,16 @@ class Publisher:
A class to retrieve stats and kv cache events from TRTLLM engine and publish them to the metrics and events publishers.
"""
def __init__(self, component, engine, kv_listener, worker_id, kv_block_size):
def __init__(
self, component, engine, kv_listener, worker_id, kv_block_size, metrics_labels
):
self.component = component
self.engine = engine
self.kv_listener = kv_listener
self.worker_id = worker_id
self.kv_block_size = kv_block_size
self.max_window_size = None
self.metrics_labels = metrics_labels
# The first few kv events from the model engine are always "created" type events.
# Use these events to capture the max_window_size of the model.
......@@ -140,7 +143,9 @@ class Publisher:
if self.metrics_publisher is None:
logging.error("KV metrics publisher not initialized!")
return
await self.metrics_publisher.create_endpoint(self.component)
await self.metrics_publisher.create_endpoint(
self.component, self.metrics_labels
)
def initialize(self):
# Setup the metrics publisher
......@@ -447,8 +452,12 @@ class Publisher:
@asynccontextmanager
async def get_publisher(component, engine, kv_listener, worker_id, kv_block_size):
publisher = Publisher(component, engine, kv_listener, worker_id, kv_block_size)
async def get_publisher(
component, engine, kv_listener, worker_id, kv_block_size, metrics_labels
):
publisher = Publisher(
component, engine, kv_listener, worker_id, kv_block_size, metrics_labels
)
try:
publisher.initialize()
yield publisher
......
......@@ -9,6 +9,7 @@ from typing import (
Dict,
List,
Optional,
Tuple,
Union,
)
......@@ -216,7 +217,7 @@ class Endpoint:
...
async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True) -> None:
async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None) -> None:
"""
Serve an endpoint discoverable by all connected clients at
`{{ namespace }}/components/{{ component_name }}/endpoints/{{ endpoint_name }}`
......@@ -224,6 +225,7 @@ class Endpoint:
Args:
handler: The request handler function
graceful_shutdown: Whether to wait for inflight requests to complete during shutdown (default: True)
metrics_labels: Optional list of metrics labels to add to the metrics
"""
...
......@@ -438,7 +440,7 @@ class WorkerMetricsPublisher:
Create a `WorkerMetricsPublisher` object
"""
def create_endpoint(self, component: Component) -> None:
def create_endpoint(self, component: Component, metrics_labels: Optional[List[Tuple[str, str]]] = None) -> None:
"""
Similar to Component.create_service, but only service created through
this method will interact with KV router of the same component.
......
......@@ -174,3 +174,164 @@ def test_deployment(trtllm_config_test, request, runtime_services):
url, payload=request_body, timeout=config.timeout - elapsed
)
server_process.check_response(payload, response, response_handler)
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.trtllm_marker
@pytest.mark.slow
def test_metrics_labels(request, runtime_services):
"""
Test that the trtllm backend correctly exports model labels in its metrics.
This test uses the --extra-engine-args flag with agg.yaml configuration
to start the backend without needing a pre-built TensorRT-LLM engine.
Prerequisites:
- etcd and NATS must be running (docker compose -f deploy/docker-compose.yml up -d)
- The test runs from the trtllm directory to access engine_configs/agg.yaml
"""
import os
import re
import subprocess
import threading
import requests
logger = logging.getLogger(request.node.name)
logger.info("Starting test_metrics_labels")
# Use the exact configuration that works for the user
model_path = "Qwen/Qwen3-0.6B"
served_model_name = "Qwen/Qwen3-0.6B"
agg_engine_args = "engine_configs/agg.yaml"
metrics_port = 8081
timeout = 60
# Change to the trtllm directory where engine_configs/agg.yaml exists
working_directory = os.path.abspath("components/backends/trtllm")
# Build command using the user's working command
command = [
"python3",
"-m",
"dynamo.trtllm",
"--model-path",
model_path,
"--served-model-name",
served_model_name,
"--extra-engine-args",
agg_engine_args,
"--max-seq-len",
"100",
"--max-num-tokens",
"100",
"--publish-events-and-metrics",
]
# Set environment for metrics
env = os.environ.copy()
env["DYN_SYSTEM_ENABLED"] = "true"
env["DYN_SYSTEM_PORT"] = str(metrics_port)
# Start the backend process
logger.info(f"Starting trtllm backend with model: {served_model_name}")
logger.info(f"Command: {' '.join(command)}")
logger.info(f"Working directory: {working_directory}")
process = subprocess.Popen(
command,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
cwd=working_directory,
)
try:
# Start a thread to capture and log output
output_lines = []
def log_output():
if process.stdout is None:
logger.warning("Process stdout is None, cannot capture output")
return
for line in process.stdout:
line = line.strip()
if line:
output_lines.append(line)
logger.info(f"[TRTLLM] {line}")
output_thread = threading.Thread(target=log_output)
output_thread.daemon = True
output_thread.start()
# Wait for metrics endpoint to be ready
metrics_url = f"http://localhost:{metrics_port}/metrics"
start_time = time.time()
while time.time() - start_time < timeout:
# Check if process has died
if process.poll() is not None:
logger.error(f"Process exited with code: {process.returncode}")
logger.error("Last 20 output lines:\n" + "\n".join(output_lines[-20:]))
pytest.fail(
f"trtllm backend process died with exit code {process.returncode}"
)
try:
response = requests.get(metrics_url, timeout=5)
if response.status_code == 200:
logger.info("Metrics endpoint is ready")
break
except requests.RequestException as e:
logger.debug(f"Metrics not ready yet: {e}")
time.sleep(2)
else:
logger.error("Last 50 output lines:\n" + "\n".join(output_lines[-50:]))
pytest.fail(
f"Metrics endpoint did not become available within {timeout} seconds"
)
# Check that the metrics include the model label
response = requests.get(metrics_url)
assert response.status_code == 200, "Failed to fetch metrics"
metrics_text = response.text
logger.info(f"Metrics text: {metrics_text}")
# With the --extra-engine-args flag pointing to agg.yaml,
# the backend should be able to start properly and register endpoints.
# Let's check for the dynamo_component_requests_total metric with our model label.
# Parse the Prometheus metrics to find our label
pattern = rf'dynamo_component_requests_total\{{[^}}]*model="{re.escape(served_model_name)}"[^}}]*\}}\s+(\d+)'
matches = re.findall(pattern, metrics_text)
if matches:
initial_value = int(matches[0])
assert (
initial_value == 0
), f"Expected initial metric value to be 0, got {initial_value}"
else:
# Check if any dynamo_component metrics exist
if "dynamo_component" in metrics_text:
logger.info(
"✓ Metrics endpoint is working (found dynamo_component metrics)"
)
logger.warning(
"Note: dynamo_component_requests_total not found - likely because dummy engine didn't fully initialize"
)
logger.info("For complete testing, use a real pre-built TRT-LLM engine")
else:
pytest.fail("No dynamo_component metrics found at all")
finally:
# Clean up
logger.info("Terminating backend process")
process.terminate()
try:
process.wait(timeout=10)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment