Unverified commit f1bcb175, authored by Graham King, committed by GitHub
Browse files

feat: Add metric tokenizer_latency_ms (#6092)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent d8feb93c
......@@ -57,6 +57,8 @@ class frontend_service:
OUTPUT_SEQUENCE_TOKENS = "output_sequence_tokens"
# Number of cached tokens (prefix cache hits) per request
CACHED_TOKENS = "cached_tokens"
# Tokenizer latency in milliseconds
TOKENIZER_LATENCY_MS = "tokenizer_latency_ms"
# Total number of output tokens generated (counter that updates in real-time)
OUTPUT_TOKENS_TOTAL = "output_tokens_total"
# Time to first token in seconds
......@@ -98,6 +100,8 @@ class frontend_service:
WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS = "worker_last_inter_token_latency_seconds"
# Label name for the type of migration
MIGRATION_TYPE_LABEL = "migration_type"
# Label name for tokenizer operation
OPERATION_LABEL = "operation"
class kvbm:
......
......@@ -234,6 +234,7 @@ pub struct Metrics {
input_sequence_length: HistogramVec,
output_sequence_length: HistogramVec,
cached_tokens: HistogramVec,
tokenizer_latency: HistogramVec,
output_tokens_counter: IntCounterVec,
time_to_first_token: HistogramVec,
inter_token_latency: HistogramVec,
......@@ -327,6 +328,8 @@ pub struct ResponseMetricCollector {
osl: usize,
// we track if cached_tokens has been observed to ensure we only increment once per request
cached_tokens_observed: bool,
// Tracks whether tokenizer latency has already been observed, so it is recorded only once per request
tokenizer_latency_observed: bool,
// Prefill worker info for TTFT attribution (set from LLMMetricAnnotation)
prefill_worker_id: Option<u64>,
prefill_dp_rank: Option<u32>,
......@@ -356,6 +359,7 @@ impl Metrics {
/// - `{prefix}_request_duration_seconds` - HistogramVec for the duration of requests
/// - `{prefix}_input_sequence_tokens` - HistogramVec for input sequence length in tokens
/// - `{prefix}_output_sequence_tokens` - HistogramVec for output sequence length in tokens
/// - `{prefix}_tokenizer_latency_ms` - HistogramVec for tokenizer latency in milliseconds
/// - `{prefix}_output_tokens_total` - IntCounterVec for total output tokens generated (real-time updates)
/// - `{prefix}_time_to_first_token_seconds` - HistogramVec for time to first token in seconds
/// - `{prefix}_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds
......@@ -531,6 +535,18 @@ impl Metrics {
)
.unwrap();
let tokenizer_latency = HistogramVec::new(
HistogramOpts::new(
frontend_metric_name(frontend_service::TOKENIZER_LATENCY_MS),
"Tokenizer latency in milliseconds",
)
.buckets(vec![
0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0,
]),
&[frontend_service::OPERATION_LABEL],
)
.unwrap();
// Runtime configuration metrics
// Note: Some of these metrics represent counter-like values from source systems,
// but are implemented as gauges because they are copied/synchronized from upstream
......@@ -607,6 +623,7 @@ impl Metrics {
input_sequence_length,
output_sequence_length,
cached_tokens,
tokenizer_latency,
output_tokens_counter,
time_to_first_token,
inter_token_latency,
......@@ -704,6 +721,7 @@ impl Metrics {
registry.register(Box::new(self.input_sequence_length.clone()))?;
registry.register(Box::new(self.output_sequence_length.clone()))?;
registry.register(Box::new(self.cached_tokens.clone()))?;
registry.register(Box::new(self.tokenizer_latency.clone()))?;
registry.register(Box::new(self.output_tokens_counter.clone()))?;
registry.register(Box::new(self.time_to_first_token.clone()))?;
registry.register(Box::new(self.inter_token_latency.clone()))?;
......@@ -969,6 +987,7 @@ impl ResponseMetricCollector {
start_time: Instant::now(),
osl: 0,
cached_tokens_observed: false,
tokenizer_latency_observed: false,
prefill_worker_id: None,
prefill_dp_rank: None,
prefill_worker_type: None,
......@@ -1033,6 +1052,19 @@ impl ResponseMetricCollector {
}
}
/// Observe tokenizer latency in milliseconds, once per request.
pub fn observe_tokenizer_latency(&mut self, tokenizer_latency: Option<Duration>) {
if let Some(latency) = tokenizer_latency
&& !self.tokenizer_latency_observed
{
self.tokenizer_latency_observed = true;
self.metrics
.tokenizer_latency
.with_label_values(&[frontend_service::operation::TOKENIZE])
.observe(latency.as_secs_f64() * 1000.0);
}
}
/// Observe a response with input sequence length and number of new tokens
pub fn observe_response(&mut self, isl: usize, num_tokens: usize) {
if num_tokens == 0 {
......@@ -1147,6 +1179,7 @@ pub fn process_response_and_observe_metrics<T>(
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_cached_tokens(metrics.cached_tokens);
response_collector.observe_tokenizer_latency(metrics.tokenizer_latency);
response_collector.set_worker_info(
metrics.prefill_worker_id,
metrics.prefill_dp_rank,
......@@ -1196,6 +1229,7 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_cached_tokens(metrics.cached_tokens);
response_collector.observe_tokenizer_latency(metrics.tokenizer_latency);
response_collector.set_worker_info(
metrics.prefill_worker_id,
metrics.prefill_dp_rank,
......@@ -1676,6 +1710,7 @@ mod tests {
let model = "test-model";
let expected_metric_name = "dynamo_frontend_cached_tokens";
let expected_tokenizer_metric_name = "dynamo_frontend_tokenizer_latency_ms";
let mut collector = metrics.clone().create_response_collector(model);
// Create a metrics annotation event (event without SSE data payload)
......@@ -1700,6 +1735,7 @@ mod tests {
decode_worker_id: None,
decode_dp_rank: None,
decode_worker_type: None,
tokenizer_latency: Some(Duration::from_millis(8)),
};
let annotation = llm_metrics.to_annotation::<()>().unwrap();
......@@ -1729,6 +1765,17 @@ mod tests {
.get_sample_count(),
1
);
let histogram_family = metric_families
.iter()
.find(|mf| mf.name() == expected_tokenizer_metric_name)
.expect("histogram should be registered");
assert_eq!(
histogram_family.get_metric()[0]
.get_histogram()
.get_sample_count(),
1
);
}
#[test]
......@@ -1742,6 +1789,7 @@ mod tests {
let model = "test-model";
let expected_metric_name = "dynamo_frontend_cached_tokens";
let expected_tokenizer_metric_name = "dynamo_frontend_tokenizer_latency_ms";
let mut collector = metrics.clone().create_response_collector(model);
// Create a metrics annotation event
......@@ -1765,6 +1813,7 @@ mod tests {
decode_worker_id: None,
decode_dp_rank: None,
decode_worker_type: None,
tokenizer_latency: Some(Duration::from_millis(8)),
};
let annotation = llm_metrics.to_annotation::<()>().unwrap();
......@@ -1787,5 +1836,16 @@ mod tests {
.get_sample_count(),
1
);
let histogram_family = metric_families
.iter()
.find(|mf| mf.name() == expected_tokenizer_metric_name)
.expect("histogram should be registered");
assert_eq!(
histogram_family.get_metric()[0]
.get_histogram()
.get_sample_count(),
1
);
}
}
......@@ -23,6 +23,7 @@ use dynamo_async_openai::types::{
use futures::Stream;
use futures::stream::{self, StreamExt};
use prompt::OAIPromptFormatter;
use std::time::{Duration, Instant};
use std::{collections::HashMap, pin::Pin, sync::Arc};
use tracing;
......@@ -33,6 +34,7 @@ use crate::preprocessor::prompt::OAIChatLikeRequest;
use crate::protocols::common::preprocessor::{
MultimodalData, MultimodalDataMap, PreprocessedRequestBuilder, RoutingHints,
};
use crate::protocols::common::timing::RequestTracker;
use crate::tokenizers::Encoding;
use dynamo_parsers::{ReasoningParser, ReasoningParserType};
......@@ -92,6 +94,8 @@ pub struct LLMMetricAnnotation {
/// Stored at routing time to avoid expensive MDC lookup when updating ITL metrics.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub decode_worker_type: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tokenizer_latency: Option<Duration>,
}
impl LLMMetricAnnotation {
......@@ -212,13 +216,14 @@ impl OpenAIPreprocessor {
>(
&self,
request: &R,
tracker: Option<&RequestTracker>,
) -> Result<(PreprocessedRequest, HashMap<String, String>)> {
let mut builder = self.builder(request)?;
let formatted_prompt = self
.apply_template(request)
.with_context(|| "Failed to apply prompt template")?;
let annotations = self
.gather_tokens(request, &mut builder, formatted_prompt)
.gather_tokens(request, &mut builder, formatted_prompt, tracker)
.with_context(|| "Failed to gather tokens")?;
self.gather_multi_modal_data(request, &mut builder)
.await
......@@ -422,6 +427,7 @@ impl OpenAIPreprocessor {
request: &R,
builder: &mut PreprocessedRequestBuilder,
formatted_prompt: Option<String>,
tracker: Option<&RequestTracker>,
) -> Result<HashMap<String, String>> {
let mut annotations = HashMap::new();
// match request type before any conversion/processing
......@@ -480,12 +486,12 @@ impl OpenAIPreprocessor {
tracing::warn!(
"backend_instance_id provided but no token_data; tokenizing prompt"
);
let encoding = self.tokenizer.encode(&prompt)?;
let encoding = self.encode_with_timing(&prompt, tracker)?;
(encoding.token_ids().to_vec(), false)
}
} else {
// No backend_instance_id provided, continue the normal flow.
let encoding = self.tokenizer.encode(&prompt)?;
let encoding = self.encode_with_timing(&prompt, tracker)?;
(encoding.token_ids().to_vec(), false)
};
......@@ -502,7 +508,7 @@ impl OpenAIPreprocessor {
}
TextInput::Batch(texts) => {
if texts.len() == 1 {
let encoding = self.tokenizer.encode(&texts[0])?;
let encoding = self.encode_with_timing(&texts[0], tracker)?;
builder.token_ids(encoding.token_ids().to_vec());
} else {
bail!(
......@@ -518,6 +524,19 @@ impl OpenAIPreprocessor {
Ok(annotations)
}
/// Tokenize `prompt` and, when a `tracker` is supplied, report to it how long
/// the encode call took.
///
/// # Errors
///
/// Propagates any error returned by the tokenizer's `encode`; in that case no
/// latency is reported to the tracker.
fn encode_with_timing(
    &self,
    prompt: &str,
    tracker: Option<&RequestTracker>,
) -> anyhow::Result<Encoding> {
    let started_at = Instant::now();
    let encoded = self.tokenizer.encode(prompt)?;

    // Without a tracker there is nowhere to report the timing.
    let Some(request_tracker) = tracker else {
        return Ok(encoded);
    };

    request_tracker.record_tokenizer_latency(started_at.elapsed());
    Ok(encoded)
}
/// Preprocess an embedding request, handling both text and token ID inputs.
///
/// For text inputs, tokenizes the text using the configured tokenizer.
......@@ -703,6 +722,7 @@ impl OpenAIPreprocessor {
decode_worker_id,
decode_dp_rank,
decode_worker_type,
tokenizer_latency: tracker.as_ref().and_then(|t| t.tokenizer_latency()),
};
if let Ok(metrics_annotated) = llm_metrics.to_annotation::<()>() {
......@@ -763,6 +783,7 @@ impl OpenAIPreprocessor {
decode_worker_id,
decode_dp_rank,
decode_worker_type,
tokenizer_latency: tracker.as_ref().and_then(|t| t.tokenizer_latency()),
};
// Create annotation string
......@@ -1024,12 +1045,15 @@ impl
// create a response generator
let response_generator = request.response_generator(context.id().to_string());
let tracker = response_generator.tracker();
// convert the chat completion request to a common completion request
let (mut common_request, annotations) = self.preprocess_request(&request).await?;
let (mut common_request, annotations) = self
.preprocess_request(&request, tracker.as_deref())
.await?;
// Attach the timing tracker to the request so downstream components can record metrics
common_request.tracker = response_generator.tracker();
common_request.tracker = tracker;
let mut response_generator = Box::new(response_generator);
......@@ -1176,6 +1200,7 @@ impl
// create a response generator
let response_generator = request.response_generator(context.id().to_string());
let mut response_generator = Box::new(response_generator);
let tracker = response_generator.tracker();
// convert the chat completion request to a common completion request
let mut builder = self.builder(&request)?;
......@@ -1188,7 +1213,7 @@ impl
HashMap::new()
} else {
// Normal path: tokenize the prompt
self.gather_tokens(&request, &mut builder, None)?
self.gather_tokens(&request, &mut builder, None, tracker.as_deref())?
};
// Gather multimodal data (works with both embeddings and text prompts)
......@@ -1197,7 +1222,7 @@ impl
let mut common_request = builder.build()?;
// Attach the timing tracker to the request so downstream components can record metrics
common_request.tracker = response_generator.tracker();
common_request.tracker = tracker;
// Update ISL only for text prompts (embeddings get sequence length from tensor shape)
if common_request.prompt_embeds.is_none() {
......
......@@ -6,11 +6,14 @@
//! This module provides [`RequestTracker`] for tracking timing and routing information
//! that can be returned to clients via the `nvext` response field.
use std::sync::{
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{
sync::{
Arc, OnceLock,
atomic::{AtomicU32, AtomicU64, Ordering},
},
time::Duration,
};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
......@@ -127,6 +130,9 @@ pub struct RequestTracker {
/// This prevents race conditions in the bootstrap optimization path where prefill
/// runs in background and needs to complete record_worker before phase changes.
phase_semaphore: Arc<Semaphore>,
/// How long it took to tokenize the input
tokenizer_latency: OnceLock<Duration>,
}
impl RequestTracker {
......@@ -154,6 +160,7 @@ impl RequestTracker {
decode_worker_type: OnceLock::new(),
phase: Mutex::new(RequestPhase::Aggregated),
phase_semaphore: Arc::new(Semaphore::new(1)),
tokenizer_latency: OnceLock::new(),
}
}
......@@ -388,6 +395,14 @@ impl RequestTracker {
}
}
/// Store the tokenizer latency `l` for this request.
///
/// The value lives in a `OnceLock`, and the result of `set` is deliberately
/// discarded: if a latency has already been stored, this call leaves the
/// existing value in place.
pub fn record_tokenizer_latency(&self, l: Duration) {
let _ = self.tokenizer_latency.set(l);
}
/// Return the stored tokenizer latency, or `None` if no latency has been
/// stored in this tracker yet.
pub fn tokenizer_latency(&self) -> Option<Duration> {
self.tokenizer_latency.get().copied()
}
/// Get worker ID information if any worker IDs have been recorded.
pub fn get_worker_info(&self) -> Option<WorkerIdInfo> {
let prefill = self.prefill_worker_id();
......
......@@ -501,7 +501,7 @@ pub mod openai_preprocessor_tests {
let oai_preprocessor = OpenAIPreprocessor::new(mdc.clone()).unwrap();
let request = Request::from(SINGLE_CHAT_MESSAGE, None, None, mdc.slug().to_string());
let preprocessed_request = oai_preprocessor
.preprocess_request(&request)
.preprocess_request(&request, None)
.await
.unwrap()
.0;
......@@ -583,7 +583,10 @@ async fn test_media_url_passthrough(#[case] media_chunks: &[(&str, usize)]) {
let message = build_message("Test multimodal content", media_chunks);
let request = Request::from(&message, None, None, mdc.slug().to_string());
let (preprocessed, _annotations) = preprocessor.preprocess_request(&request).await.unwrap();
let (preprocessed, _annotations) = preprocessor
.preprocess_request(&request, None)
.await
.unwrap();
// Verify multimodal data handling
if media_chunks.is_empty() {
......
......@@ -128,6 +128,9 @@ pub mod frontend_service {
/// Number of cached tokens (prefix cache hits) per request
pub const CACHED_TOKENS: &str = "cached_tokens";
/// Tokenizer latency in milliseconds
pub const TOKENIZER_LATENCY_MS: &str = "tokenizer_latency_ms";
/// Total number of output tokens generated (counter that updates in real-time)
pub const OUTPUT_TOKENS_TOTAL: &str = "output_tokens_total";
......@@ -188,6 +191,19 @@ pub mod frontend_service {
/// Label name for the type of migration
pub const MIGRATION_TYPE_LABEL: &str = "migration_type";
/// Label name for tokenizer operation
pub const OPERATION_LABEL: &str = "operation";
/// Operation label values for the tokenizer latency metric.
pub mod operation {
/// Tokenization operation.
pub const TOKENIZE: &str = "tokenize";
/// Detokenization operation.
///
/// Currently unused; it is intended to be added in a follow-up change.
pub const DETOKENIZE: &str = "detokenize";
}
/// Migration type label values
pub mod migration_type {
/// Migration during initial stream creation (NoResponders error)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment