Unverified Commit 227a0e71 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

refactor: use comment filed in annotated to pass metric-related information (#1385)

parent 3363d8b6
...@@ -418,9 +418,6 @@ impl ...@@ -418,9 +418,6 @@ impl
id: None, id: None,
data: Some(delta), data: Some(delta),
event: None, event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None, comment: None,
}; };
yield ann; yield ann;
...@@ -585,9 +582,6 @@ impl AsyncEngine<SingleIn<NvCreateCompletionRequest>, ManyOut<Annotated<Completi ...@@ -585,9 +582,6 @@ impl AsyncEngine<SingleIn<NvCreateCompletionRequest>, ManyOut<Annotated<Completi
id: None, id: None,
data: Some(inner), data: Some(inner),
event: None, event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None, comment: None,
}; };
yield ann; yield ann;
......
...@@ -202,7 +202,7 @@ impl ...@@ -202,7 +202,7 @@ impl
let response = NvCreateChatCompletionStreamResponse { let response = NvCreateChatCompletionStreamResponse {
inner, inner,
}; };
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, chunk_tokens: None, input_tokens: None, output_tokens: None, comment: None }; yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
id += 1; id += 1;
} }
...@@ -210,7 +210,7 @@ impl ...@@ -210,7 +210,7 @@ impl
let response = NvCreateChatCompletionStreamResponse { let response = NvCreateChatCompletionStreamResponse {
inner, inner,
}; };
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, chunk_tokens: None, input_tokens: None, output_tokens: None, comment: None }; yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
}; };
Ok(ResponseStream::new(Box::pin(output), ctx)) Ok(ResponseStream::new(Box::pin(output), ctx))
...@@ -234,11 +234,11 @@ impl AsyncEngine<SingleIn<NvCreateCompletionRequest>, ManyOut<Annotated<Completi ...@@ -234,11 +234,11 @@ impl AsyncEngine<SingleIn<NvCreateCompletionRequest>, ManyOut<Annotated<Completi
for c in chars_string.chars() { for c in chars_string.chars() {
tokio::time::sleep(*TOKEN_ECHO_DELAY).await; tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let response = deltas.create_choice(0, Some(c.to_string()), None); let response = deltas.create_choice(0, Some(c.to_string()), None);
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, chunk_tokens: None, input_tokens: None, output_tokens: None, comment: None }; yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
id += 1; id += 1;
} }
let response = deltas.create_choice(0, None, Some("stop".to_string())); let response = deltas.create_choice(0, None, Some("stop".to_string()));
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, chunk_tokens: None, input_tokens: None, output_tokens: None, comment: None }; yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
}; };
......
...@@ -27,6 +27,7 @@ use super::{ ...@@ -27,6 +27,7 @@ use super::{
service_v2, RouteDoc, service_v2, RouteDoc,
}; };
use crate::preprocessor::LLMMetricAnnotation;
use crate::protocols::openai::embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse}; use crate::protocols::openai::embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse};
use crate::protocols::openai::{ use crate::protocols::openai::{
chat_completions::NvCreateChatCompletionResponse, completions::CompletionResponse, chat_completions::NvCreateChatCompletionResponse, completions::CompletionResponse,
...@@ -500,6 +501,12 @@ fn process_event_converter<T: Serialize>( ...@@ -500,6 +501,12 @@ fn process_event_converter<T: Serialize>(
) -> Result<Event, axum::Error> { ) -> Result<Event, axum::Error> {
let annotated = annotated.0; let annotated = annotated.0;
// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_response(metrics.input_tokens, metrics.chunk_tokens);
}
let mut event = Event::default(); let mut event = Event::default();
if let Some(data) = annotated.data { if let Some(data) = annotated.data {
...@@ -516,16 +523,6 @@ fn process_event_converter<T: Serialize>( ...@@ -516,16 +523,6 @@ fn process_event_converter<T: Serialize>(
event = event.event(msg); event = event.event(msg);
} }
if let Some(osl) = annotated.output_tokens {
response_collector.observe_current_osl(osl);
}
if let Some(isl) = annotated.input_tokens {
if let Some(chunk_tokens) = annotated.chunk_tokens {
response_collector.observe_response(isl, chunk_tokens);
}
}
if let Some(comments) = annotated.comment { if let Some(comments) = annotated.comment {
for comment in comments { for comment in comments {
event = event.comment(comment); event = event.comment(comment);
......
...@@ -59,6 +59,41 @@ pub use crate::protocols::common::llm_backend::{BackendOutput, PreprocessedReque ...@@ -59,6 +59,41 @@ pub use crate::protocols::common::llm_backend::{BackendOutput, PreprocessedReque
pub const ANNOTATION_FORMATTED_PROMPT: &str = "formatted_prompt"; pub const ANNOTATION_FORMATTED_PROMPT: &str = "formatted_prompt";
pub const ANNOTATION_TOKEN_IDS: &str = "token_ids"; pub const ANNOTATION_TOKEN_IDS: &str = "token_ids";
pub const ANNOTATION_LLM_METRICS: &str = "llm_metrics";
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LLMMetricAnnotation {
pub input_tokens: usize,
pub output_tokens: usize,
pub chunk_tokens: usize,
}
impl LLMMetricAnnotation {
/// Convert this metrics struct to an Annotated event
pub fn to_annotation<T>(&self) -> Result<Annotated<T>, serde_json::Error> {
Annotated::from_annotation(ANNOTATION_LLM_METRICS, self)
}
/// Extract LLM metrics from an Annotated event, if present
pub fn from_annotation<T>(
annotation: &Annotated<T>,
) -> Result<Option<LLMMetricAnnotation>, Box<dyn std::error::Error>> {
if annotation.event.is_none() {
return Ok(None);
}
if annotation.event.as_ref().unwrap() != ANNOTATION_LLM_METRICS {
return Ok(None);
}
let comments = annotation
.comment
.as_ref()
.ok_or("missing comments block")?;
if comments.len() != 1 {
return Err("malformed comments block - expected exactly 1 comment".into());
}
let metrics: LLMMetricAnnotation = serde_json::from_str(&comments[0])?;
Ok(Some(metrics))
}
}
pub struct OpenAIPreprocessor { pub struct OpenAIPreprocessor {
mdcsum: String, mdcsum: String,
...@@ -251,9 +286,20 @@ impl OpenAIPreprocessor { ...@@ -251,9 +286,20 @@ impl OpenAIPreprocessor {
.map_err(|e| e.to_string()) .map_err(|e| e.to_string())
}); });
response.chunk_tokens = Some(chunk_tokens); // Create LLM metrics annotation
response.input_tokens = Some(isl); let llm_metrics = LLMMetricAnnotation {
response.output_tokens = Some(current_osl); input_tokens: isl,
output_tokens: current_osl,
chunk_tokens,
};
if let Ok(metrics_annotated) = llm_metrics.to_annotation::<()>() {
// Only set event if not already set to avoid overriding existing events (like errors)
if response.event.is_none() {
response.event = metrics_annotated.event;
}
response.comment = metrics_annotated.comment;
}
tracing::trace!( tracing::trace!(
request_id = inner.context.id(), request_id = inner.context.id(),
......
...@@ -118,9 +118,6 @@ where ...@@ -118,9 +118,6 @@ where
data, data,
id: value.id, id: value.id,
event: value.event, event: value.event,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: value.comments, comment: value.comments,
}) })
} }
......
...@@ -284,9 +284,6 @@ mod tests { ...@@ -284,9 +284,6 @@ mod tests {
data: Some(data), data: Some(data),
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None, comment: None,
} }
} }
...@@ -430,9 +427,6 @@ mod tests { ...@@ -430,9 +427,6 @@ mod tests {
data: Some(data), data: Some(data),
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None, comment: None,
}; };
let stream = Box::pin(stream::iter(vec![annotated_delta])); let stream = Box::pin(stream::iter(vec![annotated_delta]));
......
...@@ -205,9 +205,6 @@ mod tests { ...@@ -205,9 +205,6 @@ mod tests {
}), }),
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None, comment: None,
} }
} }
...@@ -317,9 +314,6 @@ mod tests { ...@@ -317,9 +314,6 @@ mod tests {
}), }),
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None, comment: None,
}; };
......
...@@ -37,12 +37,6 @@ pub struct Annotated<R> { ...@@ -37,12 +37,6 @@ pub struct Annotated<R> {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub event: Option<String>, pub event: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub chunk_tokens: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub input_tokens: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub output_tokens: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<Vec<String>>, pub comment: Option<Vec<String>>,
} }
...@@ -53,9 +47,6 @@ impl<R> Annotated<R> { ...@@ -53,9 +47,6 @@ impl<R> Annotated<R> {
data: None, data: None,
id: None, id: None,
event: Some("error".to_string()), event: Some("error".to_string()),
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: Some(vec![error]), comment: Some(vec![error]),
} }
} }
...@@ -66,9 +57,6 @@ impl<R> Annotated<R> { ...@@ -66,9 +57,6 @@ impl<R> Annotated<R> {
data: Some(data), data: Some(data),
id: None, id: None,
event: None, event: None,
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: None, comment: None,
} }
} }
...@@ -84,9 +72,6 @@ impl<R> Annotated<R> { ...@@ -84,9 +72,6 @@ impl<R> Annotated<R> {
data: None, data: None,
id: None, id: None,
event: Some(name.into()), event: Some(name.into()),
chunk_tokens: None,
input_tokens: None,
output_tokens: None,
comment: Some(vec![serde_json::to_string(value)?]), comment: Some(vec![serde_json::to_string(value)?]),
}) })
} }
...@@ -122,9 +107,6 @@ impl<R> Annotated<R> { ...@@ -122,9 +107,6 @@ impl<R> Annotated<R> {
data, data,
id: self.id, id: self.id,
event: self.event, event: self.event,
chunk_tokens: self.chunk_tokens,
input_tokens: self.input_tokens,
output_tokens: self.output_tokens,
comment: self.comment, comment: self.comment,
} }
} }
...@@ -140,9 +122,6 @@ impl<R> Annotated<R> { ...@@ -140,9 +122,6 @@ impl<R> Annotated<R> {
data, data,
id: self.id, id: self.id,
event: self.event, event: self.event,
chunk_tokens: self.chunk_tokens,
input_tokens: self.input_tokens,
output_tokens: self.output_tokens,
comment: self.comment, comment: self.comment,
}, },
Err(e) => Annotated::from_error(e), Err(e) => Annotated::from_error(e),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment