"docs/reference/support-matrix.md" did not exist on "ea391f38b7df20062db3b1e9c1fa91002e2c88f0"
Unverified Commit 9fa5450c authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: add HTTP queue metrics for NIM frontend request tracking (#2914)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 02a22cbc
......@@ -36,7 +36,7 @@ As of Q2 2025, Dynamo HTTP Frontend metrics are exposed when you build container
### Available Metrics
#### Component Metrics
#### Backend Component Metrics
The core Dynamo backend system automatically exposes metrics with the `dynamo_component_*` prefix for all components that use the `DistributedRuntime` framework:
......@@ -47,6 +47,19 @@ The core Dynamo backend system automatically exposes metrics with the `dynamo_co
- `dynamo_component_response_bytes_total`: Total bytes sent in responses (counter)
- `dynamo_component_system_uptime_seconds`: DistributedRuntime uptime (gauge)
#### KV Router Statistics (kvstats)
KV router statistics are automatically exposed by LLM workers and KV router components with the `dynamo_component_kvstats_*` prefix. These metrics provide insights into GPU memory usage and cache efficiency:
- `dynamo_component_kvstats_active_blocks`: Number of active KV cache blocks currently in use (gauge)
- `dynamo_component_kvstats_total_blocks`: Total number of KV cache blocks available (gauge)
- `dynamo_component_kvstats_gpu_cache_usage_percent`: GPU cache usage as a percentage (0.0-1.0) (gauge)
- `dynamo_component_kvstats_gpu_prefix_cache_hit_rate`: GPU prefix cache hit rate as a percentage (0.0-1.0) (gauge)
These metrics are published by:
- **LLM Workers**: vLLM and TRT-LLM backends publish these metrics through their respective publishers
- **KV Router**: The KV router component aggregates and exposes these metrics for load balancing decisions
#### Specialized Component Metrics
Some components expose additional metrics specific to their functionality:
......@@ -57,7 +70,8 @@ Some components expose additional metrics specific to their functionality:
When using Dynamo HTTP Frontend (`--framework VLLM` or `--framework TRTLLM`), these metrics are automatically exposed with the `dynamo_frontend_*` prefix and include `model` labels containing the model name:
- `dynamo_frontend_inflight_requests`: Inflight requests (gauge)
- `dynamo_frontend_inflight_requests_total`: Inflight requests (gauge)
- `dynamo_frontend_queued_requests_total`: Number of requests in HTTP processing queue (gauge)
- `dynamo_frontend_input_sequence_tokens`: Input sequence length (histogram)
- `dynamo_frontend_inter_token_latency_seconds`: Inter-token latency (histogram)
- `dynamo_frontend_output_sequence_tokens`: Output sequence length (histogram)
......@@ -65,6 +79,71 @@ When using Dynamo HTTP Frontend (`--framework VLLM` or `--framework TRTLLM`), th
- `dynamo_frontend_requests_total`: Total LLM requests (counter)
- `dynamo_frontend_time_to_first_token_seconds`: Time to first token (histogram)
**Note**: The `dynamo_frontend_inflight_requests_total` metric tracks requests from HTTP handler start until the complete response is finished, while `dynamo_frontend_queued_requests_total` tracks requests from HTTP handler start until first token generation begins (including prefill time). HTTP queue time is a subset of inflight time.
#### Request Processing Flow
This section explains the distinction between two key metrics used to track request processing:
1. **Inflight**: Tracks requests from HTTP handler start until the complete response is finished
2. **HTTP Queue**: Tracks requests from HTTP handler start until first token generation begins (including prefill time)
**Example Request Flow:**
```
curl -s localhost:8000/v1/completions -H "Content-Type: application/json" -d '{
"model": "Qwen/Qwen3-0.6B",
"prompt": "Hello let's talk about LLMs",
"stream": false,
"max_tokens": 1000
}'
```
**Timeline:**
```
Timeline: 0, 1, ...
Client ────> Frontend:8000 ────────────────────> Dynamo component/backend (vLLM, SGLang, TRT)
│request start │received │
| | |
│ ├──> start prefill ──> first token ──> |last token
│ │ (not impl) | |
├─────actual HTTP queue¹ ──────────┘ │ |
│ │ │
├─────implemented HTTP queue ─────────────────────────────┘ |
│ │
└─────────────────────────────────── Inflight ────────────────────────────┘
```
**Concurrency Example:**
Suppose the backend allows 3 concurrent requests and there are 10 clients continuously hitting the frontend:
- All 10 requests will be counted as inflight (from start until complete response)
- 7 requests will be in HTTP queue most of the time
- 3 requests will be actively processed (between first token and last token)
**Testing Setup:**
Try launching a frontend and a Mocker backend that allows 3 concurrent requests:
```bash
$ python -m dynamo.frontend --http-port 8000
$ python -m dynamo.mocker --model-path Qwen/Qwen3-0.6B --max-num-seqs 3
# Launch your 10 concurrent clients here
# Then check the queued_requests_total and inflight_requests_total metrics from the frontend:
$ curl -s localhost:8000/metrics|grep -v '^#'|grep -E 'queue|inflight'
dynamo_frontend_queued_requests_total{model="qwen/qwen3-0.6b"} 7
dynamo_frontend_inflight_requests_total{model="qwen/qwen3-0.6b"} 10
```
**Real setup using vLLM (instead of Mocker):**
```bash
$ python -m dynamo.vllm --model Qwen/Qwen3-0.6B \
--enforce-eager --no-enable-prefix-caching --max-num-seqs 3
```
**Key Differences:**
- **Inflight**: Measures total request lifetime including processing time
- **HTTP Queue**: Measures queuing time before processing begins (including prefill time)
- **HTTP Queue ≤ Inflight** (HTTP queue is a subset of inflight time)
¹ **TODO**: Implement the "actual" HTTP queue metric that tracks from request start until first token generation begins, rather than the current implementation that tracks until first token is received by the frontend
### Required Files
The following configuration files should be present in this directory:
......@@ -76,6 +155,35 @@ The following configuration files should be present in this directory:
- [grafana_dashboards/grafana-dcgm-metrics.json](./grafana_dashboards/grafana-dcgm-metrics.json): Contains Grafana dashboard configuration for DCGM GPU metrics
- [grafana_dashboards/grafana-llm-metrics.json](./grafana_dashboards/grafana-llm-metrics.json): This file, which is being phased out, contains the Grafana dashboard configuration for LLM-specific metrics. It requires an additional `metrics` component to operate concurrently. A new version is under development.
### Metric Name Constants
The [prometheus_names.rs](../../lib/runtime/src/metrics/prometheus_names.rs) module provides centralized Prometheus metric name constants and sanitization utilities for the Dynamo metrics system. This module ensures consistency across all components and prevents metric name duplication.
#### Key Features
- **Centralized Constants**: All Prometheus metric names are defined as constants to avoid duplication and typos
- **Automatic Sanitization**: Functions to sanitize metric and label names according to Prometheus naming rules
- **Component Organization**: Metric names are organized by component (frontend, work_handler, nats_client, etc.)
- **Validation Arrays**: Arrays of metric names for iteration and validation purposes
#### Metric Name Prefixes
- `dynamo_component_*`: Core component metrics (requests, latency, bytes, etc.)
- `dynamo_frontend_*`: Frontend service metrics (LLM HTTP service)
- `nats_client_*`: NATS client connection and message metrics
- `nats_service_*`: NATS service statistics metrics
- `kvstats_*`: KV cache statistics from LLM workers
#### Sanitization Functions
The module provides functions to ensure metric and label names comply with Prometheus naming conventions:
- `sanitize_prometheus_name()`: Sanitizes metric names (allows colons and `__`)
- `sanitize_prometheus_label()`: Sanitizes label names (no colons, no `__` prefix)
- `build_component_metric_name()`: Builds full component metric names with proper prefixing
This centralized approach ensures all Dynamo components use consistent, valid Prometheus metric names without manual coordination.
## Getting Started
### Prerequisites
......
......@@ -254,6 +254,15 @@ impl ModelManager {
.and_then(|config| config.tool_call_parser.clone())
.map(|parser| parser.to_string())
}
/// Creates parsing options with tool call parser and reasoning parser for the specified model.
/// Currently reasoning parser is not implemented (returns None).
pub fn get_parsing_options(&self, model: &str) -> crate::protocols::openai::ParsingOptions {
let tool_call_parser = self.get_model_tool_call_parser(model);
let reasoning_parser = None; // TODO: Implement reasoning parser
crate::protocols::openai::ParsingOptions::new(tool_call_parser, reasoning_parser)
}
}
pub struct ModelEngines<E> {
......
......@@ -21,7 +21,7 @@ use tokio::task::JoinHandle;
use tokio_stream::{Stream, StreamExt};
use tokio_util::sync::CancellationToken;
use crate::grpc::service::openai::{completion_response_stream, get_parsing_options};
use crate::grpc::service::openai::completion_response_stream;
use tonic::{Request, Response, Status, transport::Server};
use crate::protocols::openai::completions::{
......@@ -207,7 +207,7 @@ impl GrpcInferenceService for KserveService {
}
let model = completion_request.inner.model.clone();
let parsing_options = get_parsing_options(self.state.manager(), &model);
let parsing_options = self.state.manager.get_parsing_options(&model);
let stream = completion_response_stream(self.state_clone(), completion_request).await?;
......@@ -277,7 +277,7 @@ impl GrpcInferenceService for KserveService {
}
let model = completion_request.inner.model.clone();
let parsing_options = get_parsing_options(state.manager(), &model);
let parsing_options = state.manager.get_parsing_options(&model);
let streaming = completion_request.inner.stream.unwrap_or(false);
......
......@@ -9,10 +9,8 @@ use dynamo_runtime::{
use futures::{Stream, StreamExt, stream};
use std::sync::Arc;
use crate::discovery::ModelManager;
use crate::protocols::openai::{
ParsingOptions,
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
use crate::protocols::openai::completions::{
NvCreateCompletionRequest, NvCreateCompletionResponse,
};
use crate::types::Annotated;
......@@ -21,9 +19,8 @@ use super::kserve;
// [gluo NOTE] These are common utilities that should be shared between frontends
use crate::http::service::{
disconnect::{ConnectionHandle, create_connection_monitor},
metrics::{Endpoint, ResponseMetricCollector},
metrics::{Endpoint, InflightGuard, process_response_and_observe_metrics},
};
use crate::{http::service::metrics::InflightGuard, preprocessor::LLMMetricAnnotation};
use tonic::Status;
......@@ -72,6 +69,8 @@ pub async fn completion_response_stream(
.get_completions_engine(model)
.map_err(|_| Status::not_found("model not found"))?;
let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
let inflight_guard =
state
.metrics_clone()
......@@ -112,9 +111,15 @@ pub async fn completion_response_stream(
// apply any annotations to the front of the stream
let stream = stream::iter(annotations).chain(stream);
// Tap on the stream to collect response metrics
// Tap on the stream to collect response metrics and handle http_queue_guard
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| {
process_metrics_only(response, &mut response_collector);
// Calls observe_response() on each token - drops http_queue_guard on first token
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
});
let stream = grpc_monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
......@@ -166,18 +171,8 @@ pub fn grpc_monitor_for_disconnects<T>(
}
}
fn process_metrics_only<T>(
annotated: &Annotated<T>,
response_collector: &mut ResponseMetricCollector,
) {
// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
}
}
/// Get the request ID from a primary source, or lastly create a new one if not present
// TODO: Similar function exists in lib/llm/src/http/service/openai.rs but with different signature and more complex logic (distributed tracing, headers)
fn get_or_create_request_id(primary: Option<&str>) -> String {
// Try to get the request ID from the primary source
if let Some(primary) = primary
......@@ -190,10 +185,3 @@ fn get_or_create_request_id(primary: Option<&str>) -> String {
let uuid = uuid::Uuid::new_v4();
uuid.to_string()
}
pub fn get_parsing_options(manager: &ModelManager, model: &str) -> ParsingOptions {
let tool_call_parser = manager.get_model_tool_call_parser(model);
let reasoning_parser = None; // TODO: Implement reasoning parser
ParsingOptions::new(tool_call_parser, reasoning_parser)
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
use axum::{
Router,
extract::State,
http::StatusCode,
response::{IntoResponse, sse::Event},
routing::get,
};
use dynamo_runtime::metrics::prometheus_names::{
frontend_service, name_prefix, sanitize_frontend_prometheus_prefix,
};
use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
use serde::Serialize;
use std::{
sync::Arc,
time::{Duration, Instant},
......@@ -19,6 +26,7 @@ pub struct Metrics {
request_counter: IntCounterVec,
inflight_gauge: IntGaugeVec,
client_disconnect_gauge: prometheus::IntGauge,
http_queue_gauge: IntGaugeVec,
request_duration: HistogramVec,
input_sequence_length: HistogramVec,
output_sequence_length: HistogramVec,
......@@ -26,6 +34,18 @@ pub struct Metrics {
inter_token_latency: HistogramVec,
}
// Inflight tracks requests from HTTP handler start until complete response is finished.
// HTTP queue tracks requests from HTTP handler start until first token generation begins (including prefill time).
// HTTP queue time is a subset of inflight time. For detailed explanation, see:
// deploy/metrics/README.md - "Request Processing Flow" section
/// RAII object for HTTP queue gauge
/// Tracks requests from HTTP handler start until metrics processing begins
pub struct HttpQueueGuard {
metrics: Arc<Metrics>,
model: String,
}
/// RAII object for inflight gauge and request counters
/// If this object is dropped without calling `mark_ok`, then the request will increment
/// the request counter with the `status` label with [`frontend_service::status::ERROR`]; otherwise, it will increment
......@@ -65,6 +85,7 @@ pub enum RequestType {
}
/// Status
#[derive(PartialEq)]
pub enum Status {
Success,
Error,
......@@ -127,7 +148,7 @@ impl Metrics {
let inflight_gauge = IntGaugeVec::new(
Opts::new(
frontend_metric_name(frontend_service::INFLIGHT_REQUESTS),
frontend_metric_name(frontend_service::INFLIGHT_REQUESTS_TOTAL),
"Number of inflight requests",
),
&["model"],
......@@ -140,6 +161,15 @@ impl Metrics {
)
.unwrap();
let http_queue_gauge = IntGaugeVec::new(
Opts::new(
frontend_metric_name(frontend_service::QUEUED_REQUESTS_TOTAL),
"Number of requests in HTTP processing queue",
),
&["model"],
)
.unwrap();
let buckets = vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0];
let request_duration = HistogramVec::new(
......@@ -206,6 +236,7 @@ impl Metrics {
request_counter,
inflight_gauge,
client_disconnect_gauge,
http_queue_gauge,
request_duration,
input_sequence_length,
output_sequence_length,
......@@ -281,10 +312,19 @@ impl Metrics {
self.client_disconnect_gauge.get()
}
fn inc_http_queue_gauge(&self, model: &str) {
self.http_queue_gauge.with_label_values(&[model]).inc()
}
fn dec_http_queue_gauge(&self, model: &str) {
self.http_queue_gauge.with_label_values(&[model]).dec()
}
pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
registry.register(Box::new(self.request_counter.clone()))?;
registry.register(Box::new(self.inflight_gauge.clone()))?;
registry.register(Box::new(self.client_disconnect_gauge.clone()))?;
registry.register(Box::new(self.http_queue_gauge.clone()))?;
registry.register(Box::new(self.request_duration.clone()))?;
registry.register(Box::new(self.input_sequence_length.clone()))?;
registry.register(Box::new(self.output_sequence_length.clone()))?;
......@@ -298,6 +338,13 @@ impl Metrics {
///
/// The [`InflightGuard`] is an RAII object will handle incrementing the inflight gauge and
/// request counters.
///
/// # Metrics Distinction
///
/// This method creates an inflight guard t tracks requests actively being processed by the LLM engine.
/// This is distinct from [`HttpQueueGuard`] which tracks requests from HTTP handler start until
/// first token generation (including prefill time). The separation allows monitoring both HTTP processing queue time
/// and actual LLM processing time.
pub fn create_inflight_guard(
self: Arc<Self>,
model: &str,
......@@ -322,6 +369,30 @@ impl Metrics {
pub fn create_response_collector(self: Arc<Self>, model: &str) -> ResponseMetricCollector {
ResponseMetricCollector::new(self, model.to_string().to_lowercase())
}
/// Create a new [`HttpQueueGuard`] for tracking HTTP processing queue
///
/// This guard tracks requests from HTTP handler start until first token generation,
/// providing visibility into HTTP processing queue time before actual LLM processing begins.
pub fn create_http_queue_guard(self: Arc<Self>, model: &str) -> HttpQueueGuard {
HttpQueueGuard::new(self, model.to_string().to_lowercase())
}
}
impl HttpQueueGuard {
fn new(metrics: Arc<Metrics>, model: String) -> Self {
// Increment the HTTP queue gauge when the guard is created
metrics.inc_http_queue_gauge(&model);
HttpQueueGuard { metrics, model }
}
}
impl Drop for HttpQueueGuard {
fn drop(&mut self) {
// Decrement the HTTP queue gauge when the guard is dropped
self.metrics.dec_http_queue_gauge(&self.model);
}
}
impl InflightGuard {
......@@ -355,6 +426,8 @@ impl InflightGuard {
impl Drop for InflightGuard {
fn drop(&mut self) {
let duration = self.timer.elapsed().as_secs_f64();
// Decrement the gauge when the guard is dropped
self.metrics.dec_inflight_gauge(&self.model);
......@@ -372,7 +445,7 @@ impl Drop for InflightGuard {
self.metrics
.request_duration
.with_label_values(&[&self.model])
.observe(self.timer.elapsed().as_secs_f64());
.observe(duration);
}
}
......@@ -433,6 +506,11 @@ impl ResponseMetricCollector {
self.osl = osl;
}
/// Check if this will be the first token (before calling observe_response)
pub fn is_first_token(&self) -> bool {
self.is_first_token
}
/// Observe a response with input sequence length and number of new tokens
pub fn observe_response(&mut self, isl: usize, num_tokens: usize) {
if num_tokens == 0 {
......@@ -486,6 +564,103 @@ impl Drop for ResponseMetricCollector {
}
}
/// Process streaming metrics for annotated responses
///
/// This function handles metrics collection and http_queue_guard management for streaming responses.
/// It observes the current output sequence length, drops the http_queue_guard on the first token,
/// and records response metrics.
pub fn process_response_and_observe_metrics<T>(
annotated: &crate::types::Annotated<T>,
response_collector: &mut ResponseMetricCollector,
http_queue_guard: &mut Option<HttpQueueGuard>,
) {
use crate::preprocessor::LLMMetricAnnotation;
// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
// Drop http_queue_guard on first token for non-streaming (same as streaming)
if response_collector.is_first_token()
&& metrics.chunk_tokens > 0
&& let Some(guard) = http_queue_guard.take()
{
drop(guard);
}
response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
}
}
/// Event converter wrapper for streaming responses
pub struct EventConverter<T>(pub crate::types::Annotated<T>);
impl<T> From<crate::types::Annotated<T>> for EventConverter<T> {
fn from(annotated: crate::types::Annotated<T>) -> Self {
EventConverter(annotated)
}
}
/// Process streaming response with event conversion for SSE
///
/// This function handles metrics collection, http_queue_guard management, and converts
/// annotated responses to SSE events for streaming responses.
pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
annotated: EventConverter<T>,
response_collector: &mut ResponseMetricCollector,
http_queue_guard: &mut Option<HttpQueueGuard>,
) -> Result<Event, axum::Error> {
use crate::preprocessor::LLMMetricAnnotation;
let mut annotated = annotated.0;
// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
// Drop http_queue_guard on first token for streaming
if response_collector.is_first_token()
&& metrics.chunk_tokens > 0
&& let Some(guard) = http_queue_guard.take()
{
drop(guard);
}
response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
// Chomp the LLMMetricAnnotation so it's not returned in the response stream
// TODO: add a flag to control what is returned in the SSE stream
if annotated.event.as_deref() == Some(crate::preprocessor::ANNOTATION_LLM_METRICS) {
annotated.event = None;
annotated.comment = None;
}
}
let mut event = Event::default();
if let Some(data) = annotated.data {
event = event.json_data(data)?;
}
if let Some(msg) = annotated.event {
if msg == "error" {
let msgs = annotated
.comment
.unwrap_or_else(|| vec!["unspecified error".to_string()]);
return Err(axum::Error::new(msgs.join(" -- ")));
}
event = event.event(msg);
}
if let Some(comments) = annotated.comment {
for comment in comments {
event = event.comment(comment);
}
}
Ok(event)
}
/// Create a new router with the given path
pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Router) {
let registry = Arc::new(registry);
......
......@@ -13,7 +13,7 @@ use axum::{
http::{HeaderMap, StatusCode},
response::{
IntoResponse, Response,
sse::{Event, KeepAlive, Sse},
sse::{KeepAlive, Sse},
},
routing::{get, post},
};
......@@ -28,13 +28,15 @@ use super::{
RouteDoc,
disconnect::{ConnectionHandle, create_connection_monitor, monitor_for_disconnects},
error::HttpError,
metrics::{Endpoint, ResponseMetricCollector},
metrics::{
Endpoint, EventConverter, process_response_and_observe_metrics,
process_response_using_event_converter_and_observe_metrics,
},
service_v2,
};
use crate::engines::ValidateRequest;
use crate::protocols::openai::chat_completions::aggregator::ChatCompletionAggregator;
use crate::protocols::openai::{
ParsingOptions,
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionResponse},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse},
......@@ -42,7 +44,6 @@ use crate::protocols::openai::{
};
use crate::request_template::RequestTemplate;
use crate::types::Annotated;
use crate::{discovery::ModelManager, preprocessor::LLMMetricAnnotation};
use dynamo_runtime::logging::get_distributed_tracing_context;
use tracing::Instrument;
......@@ -165,6 +166,7 @@ impl From<HttpError> for ErrorMessage {
}
/// Get the request ID from a primary source, or next from the headers, or lastly create a new one if not present
// TODO: Similar function exists in lib/llm/src/grpc/service/openai.rs but with different signature and simpler logic
fn get_or_create_request_id(primary: Option<&str>, headers: &HeaderMap) -> String {
// Try to get request id from trace context
if let Some(trace_context) = get_distributed_tracing_context()
......@@ -196,13 +198,6 @@ fn get_or_create_request_id(primary: Option<&str>, headers: &HeaderMap) -> Strin
uuid.to_string()
}
fn get_parsing_options(manager: &ModelManager, model: &str) -> ParsingOptions {
let tool_call_parser = manager.get_model_tool_call_parser(model);
let reasoning_parser = None; // TODO: Implement reasoning parser
ParsingOptions::new(tool_call_parser, reasoning_parser)
}
/// OpenAI Completions Request Handler
///
/// This method will handle the incoming request for the `/v1/completions endpoint`. The endpoint is a "source"
......@@ -262,34 +257,37 @@ async fn completions(
// todo - decide on default
let streaming = request.inner.stream.unwrap_or(false);
// todo - make the protocols be optional for model name
// todo - when optional, if none, apply a default
let model = request.inner.model.clone();
// Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
// update the request to always stream
let request = request.map(|mut req| {
req.inner.stream = Some(true);
req
});
// todo - make the protocols be optional for model name
// todo - when optional, if none, apply a default
let model = &request.inner.model;
// todo - error handling should be more robust
let engine = state
.manager()
.get_completions_engine(model)
.get_completions_engine(&model)
.map_err(|_| ErrorMessage::model_not_found())?;
let parsing_options = get_parsing_options(state.manager(), model);
let parsing_options = state.manager().get_parsing_options(&model);
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(model, Endpoint::Completions, streaming);
let mut response_collector = state.metrics_clone().create_response_collector(model);
let mut response_collector = state.metrics_clone().create_response_collector(&model);
// prepare to process any annotations
let annotations = request.annotations();
// Create inflight_guard before calling engine to ensure errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Completions, streaming);
// issue the generate call on the engine
let stream = engine
.generate(request)
......@@ -320,8 +318,15 @@ async fn completions(
let stream = stream::iter(annotations).chain(stream);
if streaming {
// For streaming, we'll drop the http_queue_guard on the first token
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.map(move |response| {
process_event_converter(EventConverter::from(response), &mut response_collector)
// Calls observe_response() on each token
process_response_using_event_converter_and_observe_metrics(
EventConverter::from(response),
&mut response_collector,
&mut http_queue_guard,
)
});
let stream = monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
......@@ -334,8 +339,14 @@ async fn completions(
Ok(sse_stream.into_response())
} else {
// Tap the stream to collect metrics for non-streaming requests without altering items
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| {
process_metrics_only(response, &mut response_collector);
// Calls observe_response() on each token - drops http_queue_guard on first token
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
});
let response = NvCreateCompletionResponse::from_annotated_stream(stream, parsing_options)
......@@ -374,6 +385,9 @@ async fn embeddings(
// todo - when optional, if none, apply a default
let model = &request.inner.model;
// Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
// todo - error handling should be more robust
let engine = state
.manager()
......@@ -386,12 +400,25 @@ async fn embeddings(
.metrics_clone()
.create_inflight_guard(model, Endpoint::Embeddings, streaming);
let mut response_collector = state.metrics_clone().create_response_collector(model);
// issue the generate call on the engine
let stream = engine
.generate(request)
.await
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate embeddings"))?;
// Process stream to collect metrics and drop http_queue_guard on first token
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| {
// Calls observe_response() on each token - drops http_queue_guard on first token
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
});
// Embeddings are typically returned as a single response (non-streaming)
// so we fold the stream into a single response
let response = NvCreateEmbeddingResponse::from_annotated_stream(stream)
......@@ -499,27 +526,30 @@ async fn chat_completions(
// todo - make the protocols be optional for model name
// todo - when optional, if none, apply a default
let model = &request.inner.model;
// todo - determine the proper error code for when a request model is not present
let model = request.inner.model.clone();
// Create HTTP queue guard after template resolution so labels are correct
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
tracing::trace!("Getting chat completions engine for model: {}", model);
let engine = state
.manager()
.get_chat_completions_engine(model)
.get_chat_completions_engine(&model)
.map_err(|_| ErrorMessage::model_not_found())?;
let parsing_options = get_parsing_options(state.manager(), model);
let parsing_options = state.manager().get_parsing_options(&model);
let mut response_collector = state.metrics_clone().create_response_collector(&model);
let annotations = request.annotations();
// Create inflight_guard before calling engine to ensure errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(model, Endpoint::ChatCompletions, streaming);
let mut response_collector = state.metrics_clone().create_response_collector(model);
tracing::trace!("Issuing generate call for chat completions");
let annotations = request.annotations();
.create_inflight_guard(&model, Endpoint::ChatCompletions, streaming);
// issue the generate call on the engine
let stream = engine
......@@ -551,10 +581,16 @@ async fn chat_completions(
// note - we might do this as part of the post processing set to make it more generic
if streaming {
stream_handle.arm();
stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.map(move |response| {
process_event_converter(EventConverter::from(response), &mut response_collector)
// Calls observe_response() on each token
process_response_using_event_converter_and_observe_metrics(
EventConverter::from(response),
&mut response_collector,
&mut http_queue_guard,
)
});
let stream = monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
......@@ -566,8 +602,14 @@ async fn chat_completions(
Ok(sse_stream.into_response())
} else {
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| {
process_metrics_only(response, &mut response_collector);
// Calls observe_response() on each token - drops http_queue_guard on first token
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
});
let response =
......@@ -711,6 +753,10 @@ async fn responses(
// return a 503 if the service is not ready
check_ready(&state)?;
// Create http_queue_guard early - tracks time waiting to be processed
let model = request.inner.model.clone();
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
// Handle unsupported fields - if Some(resp) is returned by validate_unsupported_fields,
// then a field was used that is unsupported. We will log an error message
// and early return a 501 NOT_IMPLEMENTED status code. Otherwise, proceeed.
......@@ -760,23 +806,16 @@ async fn responses(
request
});
let model = &request.inner.model;
tracing::trace!("Getting chat completions engine for model: {}", model);
let engine = state
.manager()
.get_chat_completions_engine(model)
.get_chat_completions_engine(&model)
.map_err(|_| ErrorMessage::model_not_found())?;
let parsing_options = get_parsing_options(state.manager(), model);
let parsing_options = state.manager().get_parsing_options(&model);
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(model, Endpoint::Responses, false);
let _response_collector = state.metrics_clone().create_response_collector(model);
let mut response_collector = state.metrics_clone().create_response_collector(&model);
tracing::trace!("Issuing generate call for chat completions");
......@@ -786,6 +825,23 @@ async fn responses(
.await
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate completions"))?;
// Create inflight_guard now that actual processing has begun
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Responses, false);
// Process stream to collect metrics and drop http_queue_guard on first token
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| {
// Calls observe_response() on each token - drops http_queue_guard on first token
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
});
// TODO: handle streaming, currently just unary
let response =
NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options.clone())
......@@ -988,69 +1044,6 @@ struct ModelListing {
owned_by: String,
}
struct EventConverter<T>(Annotated<T>);
impl<T> From<Annotated<T>> for EventConverter<T> {
fn from(annotated: Annotated<T>) -> Self {
EventConverter(annotated)
}
}
fn process_metrics_only<T>(
annotated: &Annotated<T>,
response_collector: &mut ResponseMetricCollector,
) {
// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
}
}
fn process_event_converter<T: Serialize>(
annotated: EventConverter<T>,
response_collector: &mut ResponseMetricCollector,
) -> Result<Event, axum::Error> {
let mut annotated = annotated.0;
// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
// Chomp the LLMMetricAnnotation so it's not returned in the response stream
// TODO: add a flag to control what is returned in the SSE stream
if annotated.event.as_deref() == Some(crate::preprocessor::ANNOTATION_LLM_METRICS) {
annotated.event = None;
annotated.comment = None;
}
}
let mut event = Event::default();
if let Some(data) = annotated.data {
event = event.json_data(data)?;
}
if let Some(msg) = annotated.event {
if msg == "error" {
let msgs = annotated
.comment
.unwrap_or_else(|| vec!["unspecified error".to_string()]);
return Err(axum::Error::new(msgs.join(" -- ")));
}
event = event.event(msg);
}
if let Some(comments) = annotated.comment {
for comment in comments {
event = event.comment(comment);
}
}
Ok(event)
}
/// Create an Axum [`Router`] for the OpenAI API Completions endpoint
/// If not path is provided, the default path is `/v1/completions`
pub fn completions_router(
......
......@@ -6,6 +6,34 @@
//! This module provides centralized Prometheus metric name constants and sanitization functions
//! for various components to ensure consistency and avoid duplication across the codebase.
//!
//! ## Naming Conventions
//!
//! All metric names should follow: `{prefix}_{name}_{suffix}`
//!
//! **Prefix**: Component identifier (`dynamo_component_`, `dynamo_frontend_`, etc.)
//! **Name**: Descriptive snake_case name indicating what is measured
//! **Suffix**:
//! - Units: `_seconds`, `_bytes`, `_ms`, `_percent`
//! - Counters: `_total` (not `total_` prefix)
//! - Note: Do not use `_counter`, `_gauge`, `_time`, or `_size` in Prometheus names (too vague)
//!
//! **Common Transformations**:
//! - ❌ `_counter` → ✅ `_total`
//! - ❌ `_time` → ✅ `_seconds`, `_ms`, `_hours`, `_duration_seconds`
//! - ❌ `_size` → ✅ `_bytes`, `_total`, `_length`
//! - ❌ `_gauge` → ✅ (no suffix needed for current values)
//! - ❌ `_rate` → ✅ `_per_second`, `_per_minute`
//!
//! **Examples**:
//! - ✅ `dynamo_frontend_requests_total` - Total request counter (not `incoming_requests`)
//! - ✅ `dynamo_frontend_request_duration_seconds` - Request duration histogram (not `response_time`)
//! - ✅ `dynamo_component_errors_total` - Total error counter (not `total_errors`)
//! - ✅ `dynamo_component_memory_usage_bytes` - Memory usage gauge
//! - ✅ `dynamo_frontend_inflight_requests_total` - Current inflight requests gauge
//! - ✅ `nats_client_connection_duration_ms` - Connection time in milliseconds
//! - ✅ `dynamo_component_cpu_usage_percent` - CPU usage percentage
//! - ✅ `dynamo_frontend_tokens_per_second` - Token generation rate
//!
//! ## Key Differences: Prometheus Metric Names vs Prometheus Label Names
//!
//! **Metric names**: Allow colons and `__` anywhere. **Label names**: No colons, no `__` prefix.
......@@ -45,8 +73,11 @@ pub mod frontend_service {
/// Total number of LLM requests processed
pub const REQUESTS_TOTAL: &str = "requests_total";
/// Number of inflight requests
pub const INFLIGHT_REQUESTS: &str = "inflight_requests";
/// Number of requests waiting in HTTP queue before receiving the first response.
pub const QUEUED_REQUESTS_TOTAL: &str = "queued_requests_total";
/// Number of inflight requests going to the engine (vLLM, SGLang, ...)
pub const INFLIGHT_REQUESTS_TOTAL: &str = "inflight_requests_total";
/// Duration of LLM requests
pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment