Unverified commit e42746a1, authored by Michael Feil, committed by GitHub
Browse files

feat: frontend, disconnect metrics (#1) (#2953)


Signed-off-by: michaelfeil <me@michaelfeil.eu>
parent 4b3a2c1a
......@@ -52,7 +52,8 @@ pub async fn completion_response_stream(
let context = request.context();
// create the connection handles
let (mut connection_handle, stream_handle) = create_connection_monitor(context.clone()).await;
let (mut connection_handle, stream_handle) =
create_connection_monitor(context.clone(), Some(state.metrics_clone())).await;
let streaming = request.inner.stream.unwrap_or(false);
// update the request to always stream
......
......@@ -33,7 +33,7 @@ use dynamo_runtime::engine::AsyncEngineContext;
use futures::{Stream, StreamExt};
use std::sync::Arc;
use crate::http::service::metrics::InflightGuard;
use crate::http::service::metrics::{InflightGuard, Metrics};
#[derive(Clone, Copy)]
pub enum ConnectionStatus {
......@@ -99,6 +99,7 @@ impl Drop for ConnectionHandle {
/// The handles are returned in the order of the first being armed and the second being disarmed.
pub async fn create_connection_monitor(
engine_context: Arc<dyn AsyncEngineContext>,
metrics: Option<Arc<Metrics>>,
) -> (ConnectionHandle, ConnectionHandle) {
// these oneshot channels monitor possible disconnects from the client in two different scopes:
// - the local task (connection_handle)
......@@ -111,6 +112,7 @@ pub async fn create_connection_monitor(
engine_context.clone(),
connection_rx,
stream_rx,
metrics,
));
// Two handles, the first is armed, the second is disarmed
......@@ -125,11 +127,15 @@ async fn connection_monitor(
engine_context: Arc<dyn AsyncEngineContext>,
connection_rx: tokio::sync::oneshot::Receiver<ConnectionStatus>,
stream_rx: tokio::sync::oneshot::Receiver<ConnectionStatus>,
metrics: Option<Arc<Metrics>>,
) {
match connection_rx.await {
Err(_) | Ok(ConnectionStatus::ClosedUnexpectedly) => {
// the client has disconnected, no need to gracefully cancel, just kill the context
tracing::trace!("Connection closed unexpectedly; issuing cancellation");
if let Some(metrics) = &metrics {
metrics.inc_client_disconnect();
}
engine_context.kill();
}
Ok(ConnectionStatus::ClosedGracefully) => {
......@@ -141,6 +147,9 @@ async fn connection_monitor(
match stream_rx.await {
Err(_) | Ok(ConnectionStatus::ClosedUnexpectedly) => {
tracing::trace!("Stream closed unexpectedly; issuing cancellation");
if let Some(metrics) = &metrics {
metrics.inc_client_disconnect();
}
engine_context.kill();
}
Ok(ConnectionStatus::ClosedGracefully) => {
......
......@@ -18,6 +18,7 @@ use super::RouteDoc;
pub struct Metrics {
request_counter: IntCounterVec,
inflight_gauge: IntGaugeVec,
client_disconnect_gauge: prometheus::IntGauge,
request_duration: HistogramVec,
input_sequence_length: HistogramVec,
output_sequence_length: HistogramVec,
......@@ -133,6 +134,12 @@ impl Metrics {
)
.unwrap();
let client_disconnect_gauge = prometheus::IntGauge::new(
frontend_metric_name("client_disconnects"),
"Number of connections dropped by clients",
)
.unwrap();
let buckets = vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0];
let request_duration = HistogramVec::new(
......@@ -198,6 +205,7 @@ impl Metrics {
Metrics {
request_counter,
inflight_gauge,
client_disconnect_gauge,
request_duration,
input_sequence_length,
output_sequence_length,
......@@ -263,9 +271,20 @@ impl Metrics {
self.inflight_gauge.with_label_values(&[model]).dec()
}
/// Record one client disconnection by adding one to the disconnect metric.
///
/// NOTE(review): the backing metric is an `IntGauge`, yet this is the only
/// mutation visible here and it only ever increments. If nothing else
/// decrements it, a counter type may describe the value better — confirm
/// before changing, since the exported metric type would change too.
pub fn inc_client_disconnect(&self) {
    let disconnects = &self.client_disconnect_gauge;
    disconnects.inc();
}
/// Read the current value of the client-disconnect metric.
///
/// Returns the value as an `i64`, exactly as reported by the underlying
/// gauge at the moment of the call.
pub fn get_client_disconnect_count(&self) -> i64 {
    let disconnects = &self.client_disconnect_gauge;
    disconnects.get()
}
pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
registry.register(Box::new(self.request_counter.clone()))?;
registry.register(Box::new(self.inflight_gauge.clone()))?;
registry.register(Box::new(self.client_disconnect_gauge.clone()))?;
registry.register(Box::new(self.request_duration.clone()))?;
registry.register(Box::new(self.input_sequence_length.clone()))?;
registry.register(Box::new(self.output_sequence_length.clone()))?;
......
......@@ -224,7 +224,8 @@ async fn handler_completions(
let context = request.context();
// create the connection handles
let (mut connection_handle, stream_handle) = create_connection_monitor(context.clone()).await;
let (mut connection_handle, stream_handle) =
create_connection_monitor(context.clone(), Some(state.metrics_clone())).await;
// possibly long running task
// if this returns a streaming response, the stream handle will be armed and captured by the response stream
......@@ -419,7 +420,8 @@ async fn handler_chat_completions(
let context = request.context();
// create the connection handles
let (mut connection_handle, stream_handle) = create_connection_monitor(context.clone()).await;
let (mut connection_handle, stream_handle) =
create_connection_monitor(context.clone(), Some(state.metrics_clone())).await;
let response =
tokio::spawn(chat_completions(state, template, request, stream_handle).in_current_span())
......@@ -651,7 +653,8 @@ async fn handler_responses(
let context = request.context();
// create the connection handles
let (mut connection_handle, _stream_handle) = create_connection_monitor(context.clone()).await;
let (mut connection_handle, _stream_handle) =
create_connection_monitor(context.clone(), Some(state.metrics_clone())).await;
let response = tokio::spawn(responses(state, template, request).in_current_span())
.await
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment