Unverified Commit bad5d122 authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

feat: Sglang metrics labels. (#2679)

parent 91a459c0
......@@ -65,7 +65,7 @@ async def init(runtime: DistributedRuntime, config: Config):
.client()
)
publisher, metrics_task = await setup_sgl_metrics(engine, component)
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(engine, component)
kv_publisher = None
if server_args.kv_events_config:
......@@ -116,7 +116,9 @@ async def init(runtime: DistributedRuntime, config: Config):
# Start endpoint immediately and register model concurrently
# Requests queue until ready_event is set
await asyncio.gather(
generate_endpoint.serve_endpoint(gated_generate, graceful_shutdown=False),
generate_endpoint.serve_endpoint(
handler.generate, graceful_shutdown=False, metrics_labels=metrics_labels
),
register_model(),
)
except Exception as e:
......@@ -146,7 +148,13 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
handler = PrefillWorkerHandler(component, engine, config)
tasks = [generate_endpoint.serve_endpoint(handler.generate, graceful_shutdown=True)]
tasks = [
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[("model", server_args.served_model_name)],
)
]
try:
await asyncio.gather(*tasks)
......
......@@ -3,7 +3,7 @@
import asyncio
import logging
from typing import Optional
from typing import List, Optional, Tuple
import sglang as sgl
import zmq
......@@ -25,10 +25,15 @@ class DynamoSglangStatPublisher:
Handles SGLang metrics reception and publishing.
"""
def __init__(self, engine: sgl.Engine, component: Component) -> None:
def __init__(
self,
engine: sgl.Engine,
component: Component,
metrics_labels: Optional[List[Tuple[str, str]]] = None,
) -> None:
self.engine = engine
self.inner = WorkerMetricsPublisher()
self.inner.create_endpoint(component)
self.inner.create_endpoint(component, metrics_labels)
# Set default values (can be overridden later if needed)
self.request_total_slots = 1024
......@@ -127,13 +132,14 @@ class DynamoSglangStatPublisher:
async def setup_sgl_metrics(
engine: sgl.Engine,
component: Component,
) -> tuple[DynamoSglangStatPublisher, asyncio.Task]:
) -> tuple[DynamoSglangStatPublisher, asyncio.Task, list[tuple[str, str]]]:
"""
Convenience bootstrap: create endpoint, publish an initial update, and start the metrics loop.
"""
publisher = DynamoSglangStatPublisher(engine, component)
metrics_labels = [("model", engine.server_args.served_model_name)]
publisher = DynamoSglangStatPublisher(engine, component, metrics_labels)
publisher.init_publish()
task = asyncio.create_task(publisher.run())
logging.info("SGLang metrics loop started")
return publisher, task
return publisher, task, metrics_labels
......@@ -4,6 +4,7 @@
import logging
import os
import re
import time
from dataclasses import dataclass
from typing import Any, List
......@@ -209,6 +210,82 @@ def test_sglang_deployment(request, runtime_services, sglang_config_test):
logger.info(f"SGLang completions response: {text}")
@pytest.mark.e2e
@pytest.mark.gpu_1
@pytest.mark.sglang
@pytest.mark.slow
def test_metrics_labels(request, runtime_services):
"""
Test that the sglang backend correctly exports model labels in its metrics.
This test verifies that the model name appears as a label in the Prometheus metrics.
"""
logger.info("Starting test_metrics_labels for sglang backend")
# Configuration
model_path = "Qwen/Qwen3-0.6B"
metrics_port = 8081
# Build command to start sglang backend with metrics enabled
command = [
"python3",
"-m",
"dynamo.sglang",
"--model-path",
model_path,
"--mem-fraction-static",
"0.4", # Limit memory usage for testing
]
# Set environment for metrics
env = os.environ.copy()
env["DYN_SYSTEM_ENABLED"] = "true"
env["DYN_SYSTEM_PORT"] = str(metrics_port)
# Use ManagedProcess for consistent process management
with ManagedProcess(
command=command,
env=env,
timeout=120,
display_output=True,
health_check_urls=[
(f"http://localhost:{metrics_port}/metrics", lambda r: r.status_code == 200)
],
delayed_start=30, # Give SGLang time to initialize
):
# Give the backend a moment to fully initialize metrics
time.sleep(2)
# Fetch and verify metrics
logger.info("Fetching metrics to verify model label...")
response = requests.get(f"http://localhost:{metrics_port}/metrics", timeout=10)
assert response.status_code == 200, "Failed to fetch metrics"
metrics_text = response.text
logger.info(f"Metrics text: {metrics_text}")
# Parse the Prometheus metrics to find our label
pattern = rf'dynamo_component_requests_total\{{[^}}]*model="{re.escape(model_path)}"[^}}]*\}}\s+(\d+)'
matches = re.findall(pattern, metrics_text)
if matches:
initial_value = int(matches[0])
assert (
initial_value == 0
), f"Expected initial metric value to be 0, got {initial_value}"
else:
# Check if any dynamo_component metrics exist
if "dynamo_component" in metrics_text:
logger.info(
"✓ Metrics endpoint is working (found dynamo_component metrics)"
)
logger.warning(
"Note: dynamo_component_requests_total not found - likely because the engine didn't fully initialize"
)
logger.info("For complete testing, use a real pre-built TRT-LLM engine")
else:
pytest.fail("No dynamo_component metrics found at all")
@pytest.mark.skip(
reason="Requires 4 GPUs - enable when hardware is consistently available"
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment