Unverified Commit 5fe0476e authored by Tanmay Verma's avatar Tanmay Verma Committed by GitHub
Browse files

feat: Add metrics endpoint to kserve gRPC service (#4400)

parent 826eea05
...@@ -81,9 +81,18 @@ pub async fn run( ...@@ -81,9 +81,18 @@ pub async fn run(
grpc_service grpc_service
} }
}; };
grpc_service
.run(distributed_runtime.primary_token()) // Run both HTTP (for metrics) and gRPC servers concurrently
.await?; let http_service = grpc_service.http_service().clone();
let shutdown_token = distributed_runtime.primary_token();
// Wait for both servers to complete, propagating the first error if any occurs
// Both tasks should run indefinitely until cancelled by the shutdown token
tokio::try_join!(
grpc_service.run(shutdown_token.clone()),
http_service.run(shutdown_token)
)?;
distributed_runtime.shutdown(); // Cancel primary token distributed_runtime.shutdown(); // Cancel primary token
Ok(()) Ok(())
} }
......
...@@ -8,7 +8,7 @@ use crate::grpc::service::kserve::inference::DataType; ...@@ -8,7 +8,7 @@ use crate::grpc::service::kserve::inference::DataType;
use crate::grpc::service::kserve::inference::ModelInput; use crate::grpc::service::kserve::inference::ModelInput;
use crate::grpc::service::kserve::inference::ModelOutput; use crate::grpc::service::kserve::inference::ModelOutput;
use crate::http::service::Metrics; use crate::http::service::Metrics;
use crate::http::service::metrics; use crate::http::service::service_v2 as http_service;
use crate::discovery::ModelManager; use crate::discovery::ModelManager;
use crate::local_model::runtime_config::ModelRuntimeConfig; use crate::local_model::runtime_config::ModelRuntimeConfig;
...@@ -42,20 +42,29 @@ use inference::{ ...@@ -42,20 +42,29 @@ use inference::{
use prost::Message; use prost::Message;
/// [gluo TODO] 'metrics' are for HTTP service and there is HTTP endpoint /// gRPC service state - shares metrics with HTTP service for unified metrics collection
/// for it as part of HTTP service. Should we always start HTTP service up
/// for non-inference?
pub struct State { pub struct State {
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
manager: Arc<ModelManager>, manager: Arc<ModelManager>,
} }
#[derive(Default, Builder)]
#[builder(
pattern = "owned",
build_fn(private, name = "build_internal"),
name = "StateBuilder",
vis = "pub"
)]
pub(crate) struct StateConfig {
#[builder(default, setter(strip_option))]
metrics: Option<Arc<Metrics>>,
#[builder(default, setter(strip_option))]
manager: Option<Arc<ModelManager>>,
}
impl State { impl State {
pub fn new(manager: Arc<ModelManager>) -> Self { pub fn builder() -> StateBuilder {
Self { StateBuilder::default()
manager,
metrics: Arc::new(Metrics::default()),
}
} }
/// Get the Prometheus [`Metrics`] object which tracks request counts and inflight requests /// Get the Prometheus [`Metrics`] object which tracks request counts and inflight requests
...@@ -76,11 +85,29 @@ impl State { ...@@ -76,11 +85,29 @@ impl State {
} }
} }
impl StateBuilder {
pub fn build(self) -> Result<State, anyhow::Error> {
let config = self.build_internal()?;
Ok(State {
manager: config
.manager
.unwrap_or_else(|| Arc::new(ModelManager::new())),
metrics: config
.metrics
.unwrap_or_else(|| Arc::new(Metrics::default())),
})
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct KserveService { pub struct KserveService {
// The state we share with every request handler // The state we share with every request handler
state: Arc<State>, state: Arc<State>,
// HTTP service for metrics endpoint
http_service: http_service::HttpService,
port: u16, port: u16,
host: String, host: String,
request_template: Option<RequestTemplate>, request_template: Option<RequestTemplate>,
...@@ -97,6 +124,12 @@ pub struct KserveServiceConfig { ...@@ -97,6 +124,12 @@ pub struct KserveServiceConfig {
#[builder(default = "None")] #[builder(default = "None")]
request_template: Option<RequestTemplate>, request_template: Option<RequestTemplate>,
#[builder(default = "8788")]
http_metrics_port: u16,
#[builder(setter(into), default = "String::from(\"0.0.0.0\")")]
http_metrics_host: String,
} }
impl KserveService { impl KserveService {
...@@ -116,6 +149,10 @@ impl KserveService { ...@@ -116,6 +149,10 @@ impl KserveService {
self.state().manager() self.state().manager()
} }
pub fn http_service(&self) -> &http_service::HttpService {
&self.http_service
}
pub async fn spawn(&self, cancel_token: CancellationToken) -> JoinHandle<Result<()>> { pub async fn spawn(&self, cancel_token: CancellationToken) -> JoinHandle<Result<()>> {
let this = self.clone(); let this = self.clone();
tokio::spawn(async move { this.run(cancel_token).await }) tokio::spawn(async move { this.run(cancel_token).await })
...@@ -140,15 +177,29 @@ impl KserveServiceConfigBuilder { ...@@ -140,15 +177,29 @@ impl KserveServiceConfigBuilder {
pub fn build(self) -> Result<KserveService, anyhow::Error> { pub fn build(self) -> Result<KserveService, anyhow::Error> {
let config: KserveServiceConfig = self.build_internal()?; let config: KserveServiceConfig = self.build_internal()?;
let model_manager = Arc::new(ModelManager::new()); // Create HTTP service with only non-inference endpoints (metrics, health, models list)
let state = Arc::new(State::new(model_manager)); // This provides the metrics endpoint and shared metrics object
let http_service = http_service::HttpService::builder()
// enable prometheus metrics .port(config.http_metrics_port)
let registry = metrics::Registry::new(); .host(config.http_metrics_host.clone())
state.metrics_clone().register(&registry)?; // Disable all inference endpoints - only use for metrics/health
.enable_chat_endpoints(false)
.enable_cmpl_endpoints(false)
.enable_embeddings_endpoints(false)
.enable_responses_endpoints(false)
.build()?;
// Share the HTTP service's model manager and metrics object with gRPC state
let state = Arc::new(
State::builder()
.manager(http_service.state().manager_clone())
.metrics(http_service.state().metrics_clone())
.build()?,
);
Ok(KserveService { Ok(KserveService {
state, state,
http_service,
port: config.port, port: config.port,
host: config.host, host: config.host,
request_template: config.request_template, request_template: config.request_template,
......
...@@ -15,11 +15,11 @@ use crate::types::Annotated; ...@@ -15,11 +15,11 @@ use crate::types::Annotated;
use super::kserve; use super::kserve;
// [gluo NOTE] These are common utilities that should be shared between frontends // [gluo NOTE] These are common utilities that should be shared between frontends
use crate::http::service::metrics::InflightGuard;
use crate::http::service::{ use crate::http::service::{
disconnect::{ConnectionHandle, create_connection_monitor}, disconnect::{ConnectionHandle, create_connection_monitor},
metrics::{Endpoint, ResponseMetricCollector}, metrics::{Endpoint, process_response_and_observe_metrics},
}; };
use crate::{http::service::metrics::InflightGuard, preprocessor::LLMMetricAnnotation};
use crate::protocols::tensor; use crate::protocols::tensor;
use crate::protocols::tensor::{ use crate::protocols::tensor::{
...@@ -76,6 +76,8 @@ pub async fn tensor_response_stream( ...@@ -76,6 +76,8 @@ pub async fn tensor_response_stream(
.get_tensor_engine(model) .get_tensor_engine(model)
.map_err(|_| Status::not_found("model not found"))?; .map_err(|_| Status::not_found("model not found"))?;
let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
let inflight_guard = let inflight_guard =
state state
.metrics_clone() .metrics_clone()
...@@ -115,9 +117,15 @@ pub async fn tensor_response_stream( ...@@ -115,9 +117,15 @@ pub async fn tensor_response_stream(
// apply any annotations to the front of the stream // apply any annotations to the front of the stream
let stream = stream::iter(annotations).chain(stream); let stream = stream::iter(annotations).chain(stream);
// Tap on the stream to collect response metrics // Tap on the stream to collect response metrics and handle http_queue_guard
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| { let stream = stream.inspect(move |response| {
process_metrics_only(response, &mut response_collector); // Calls observe_response() on each token - drops http_queue_guard on first token
process_response_and_observe_metrics(
response,
&mut response_collector,
&mut http_queue_guard,
);
}); });
let stream = grpc_monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle); let stream = grpc_monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
...@@ -170,17 +178,6 @@ pub fn grpc_monitor_for_disconnects<T>( ...@@ -170,17 +178,6 @@ pub fn grpc_monitor_for_disconnects<T>(
} }
} }
fn process_metrics_only<T>(
annotated: &Annotated<T>,
response_collector: &mut ResponseMetricCollector,
) {
// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
}
}
/// Get the request ID from a primary source, or lastly create a new one if not present /// Get the request ID from a primary source, or lastly create a new one if not present
fn get_or_create_request_id(primary: Option<&str>) -> String { fn get_or_create_request_id(primary: Option<&str>) -> String {
// Try to get the request ID from the primary source // Try to get the request ID from the primary source
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
#[path = "common/ports.rs"]
mod ports;
pub mod kserve_test { pub mod kserve_test {
// For using gRPC client for test // For using gRPC client for test
pub mod inference { pub mod inference {
...@@ -41,6 +44,7 @@ pub mod kserve_test { ...@@ -41,6 +44,7 @@ pub mod kserve_test {
use tokio::time::timeout; use tokio::time::timeout;
use tonic::{Request, Response, transport::Channel}; use tonic::{Request, Response, transport::Channel};
use crate::ports::get_random_port;
use dynamo_async_openai::types::Prompt; use dynamo_async_openai::types::Prompt;
use prost::Message; use prost::Message;
...@@ -1794,4 +1798,177 @@ pub mod kserve_test { ...@@ -1794,4 +1798,177 @@ pub mod kserve_test {
"Expected error message about missing value" "Expected error message about missing value"
); );
} }
async fn wait_for_http_ready(port: u16, timeout_secs: u64) {
let client = reqwest::Client::new();
let start = tokio::time::Instant::now();
let timeout = Duration::from_secs(timeout_secs);
let url = format!("http://localhost:{}/metrics", port);
loop {
match client.get(&url).send().await {
Ok(_) => return,
Err(_) if start.elapsed() < timeout => {
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(e) => panic!("HTTP service failed to start within timeout: {}", e),
}
}
}
fn assert_metric_value(metrics: &str, model_name: &str, endpoint: &str, expected_count: u32) {
// Find the metric line that contains both the model and endpoint labels
let metric_line = metrics
.lines()
.find(|line| {
line.starts_with("dynamo_frontend_requests_total{")
&& line.contains(&format!("model=\"{}\"", model_name))
&& line.contains(&format!("endpoint=\"{}\"", endpoint))
})
.unwrap_or_else(|| {
panic!(
"Could not find metric for model='{}' endpoint='{}' in metrics:\n{}",
model_name, endpoint, metrics
)
});
let value_str = metric_line.split_whitespace().last().unwrap_or("0");
let actual_count: u32 = value_str.parse().unwrap_or(0);
assert_eq!(
actual_count, expected_count,
"Expected {} requests for model='{}' endpoint='{}', but found {}",
expected_count, model_name, endpoint, actual_count
);
}
#[tokio::test]
async fn test_kserve_grpc_metrics_endpoint() {
let grpc_port = get_random_port().await;
let http_metrics_port = get_random_port().await;
let service = KserveService::builder()
.port(grpc_port)
.http_metrics_port(http_metrics_port)
.build()
.unwrap();
let state = service.state_clone();
let manager = state.manager();
// Register completion model
let mut card = ModelDeploymentCard::with_name_only("test_model");
card.model_type = ModelType::Completions;
card.model_input = ModelInput::Text;
manager
.add_completions_model("test_model", card.mdcsum(), Arc::new(SplitEngine {}))
.unwrap();
manager.save_model_card("test_model", card).unwrap();
// Register tensor model
let mut tensor_card = ModelDeploymentCard::with_name_only("test_tensor_model");
tensor_card.model_type = ModelType::TensorBased;
tensor_card.model_input = ModelInput::Tensor;
tensor_card.runtime_config = ModelRuntimeConfig {
tensor_model_config: Some(tensor::TensorModelConfig {
name: "test_tensor_model".to_string(),
inputs: vec![tensor::TensorMetadata {
name: "input".to_string(),
data_type: tensor::DataType::Int32,
shape: vec![1],
parameters: Default::default(),
}],
outputs: vec![tensor::TensorMetadata {
name: "output".to_string(),
data_type: tensor::DataType::Int32,
shape: vec![1],
parameters: Default::default(),
}],
triton_model_config: None,
}),
..Default::default()
};
manager
.add_tensor_model(
"test_tensor_model",
tensor_card.mdcsum(),
Arc::new(TensorEngine {}),
)
.unwrap();
manager
.save_model_card("test_tensor_model", tensor_card)
.unwrap();
// Start services
let cancel_token = CancellationToken::new();
let grpc_task = service.spawn(cancel_token.clone()).await;
let http_task = service.http_service().spawn(cancel_token.clone()).await;
// Wait for services to be ready
let mut grpc_client = get_ready_client(grpc_port, 10).await;
wait_for_http_ready(http_metrics_port, 10).await;
// Test completion model
grpc_client
.model_infer(Request::new(ModelInferRequest {
model_name: "test_model".into(),
model_version: "1".into(),
id: "test-metrics".into(),
inputs: vec![inference::model_infer_request::InferInputTensor {
name: "text_input".into(),
datatype: "BYTES".into(),
shape: vec![1],
contents: Some(inference::InferTensorContents {
bytes_contents: vec!["test input".into()],
..Default::default()
}),
..Default::default()
}],
..Default::default()
}))
.await
.expect("Inference request failed");
// Test tensor model
grpc_client
.model_infer(Request::new(ModelInferRequest {
model_name: "test_tensor_model".into(),
model_version: "1".into(),
id: "test-tensor-metrics".into(),
inputs: vec![inference::model_infer_request::InferInputTensor {
name: "input".into(),
datatype: "INT32".into(),
shape: vec![1],
contents: Some(inference::InferTensorContents {
int_contents: vec![42],
..Default::default()
}),
..Default::default()
}],
..Default::default()
}))
.await
.expect("Tensor inference request failed");
// Verify metrics are exposed via HTTP endpoint
let metrics_url = format!("http://localhost:{}/metrics", http_metrics_port);
let metrics_body = reqwest::get(&metrics_url)
.await
.expect("Failed to fetch metrics")
.text()
.await
.unwrap();
// Verify metrics are present and have correct values
assert!(
metrics_body.contains("dynamo_frontend_inflight_requests"),
"Metrics should contain inflight gauge"
);
assert_metric_value(&metrics_body, "test_model", "completions", 1);
assert_metric_value(&metrics_body, "test_tensor_model", "tensor", 1);
// Clean up
cancel_token.cancel();
let _ = tokio::join!(grpc_task, http_task);
}
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment