Unverified commit 679d9f14, authored by Neelay Shah and committed by GitHub
Browse files

feat: request lifecycle logging + metrics capture [DIS-1643] (#7840)


Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 59d32d25
......@@ -88,10 +88,12 @@ pub async fn completion_response_stream(
let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
let inflight_guard =
state
.metrics_clone()
.create_inflight_guard(model, Endpoint::Completions, streaming);
let inflight_guard = state.metrics_clone().create_inflight_guard(
model,
Endpoint::Completions,
streaming,
&request_id,
);
let mut response_collector = state.metrics_clone().create_response_collector(model);
......
......@@ -89,10 +89,12 @@ pub async fn tensor_response_stream(
let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
let inflight_guard =
state
.metrics_clone()
.create_inflight_guard(model, Endpoint::Tensor, streaming);
let inflight_guard = state.metrics_clone().create_inflight_guard(
model,
Endpoint::Tensor,
streaming,
&request_id,
);
let mut response_collector = state.metrics_clone().create_response_collector(model);
......
......@@ -273,6 +273,14 @@ async fn anthropic_messages(
let mut response_collector = state.metrics_clone().create_response_collector(&model);
// Create inflight_guard early to ensure all errors are counted
let mut inflight_guard = state.metrics_clone().create_inflight_guard(
&model,
Endpoint::AnthropicMessages,
streaming,
request.id(),
);
tracing::trace!("Issuing generate call for Anthropic messages");
let engine_stream = engine.generate(request).await.map_err(|e| {
......@@ -305,11 +313,6 @@ async fn anthropic_messages(
Box<dyn futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>,
> = Box::pin(engine_stream);
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::AnthropicMessages, streaming);
if streaming {
stream_handle.arm();
......
......@@ -135,7 +135,7 @@ async fn connection_monitor(
match connection_rx.await {
Err(_) | Ok(ConnectionStatus::ClosedUnexpectedly) => {
// the client has disconnected, no need to gracefully cancel, just kill the context
tracing::trace!("Connection closed unexpectedly; issuing cancellation");
tracing::warn!("Connection closed unexpectedly; issuing cancellation");
if let Some(metrics) = &metrics {
metrics.inc_client_disconnect();
metrics.inc_cancellation(&cancellation_labels);
......@@ -150,7 +150,7 @@ async fn connection_monitor(
match stream_rx.await {
Err(_) | Ok(ConnectionStatus::ClosedUnexpectedly) => {
tracing::trace!("Stream closed unexpectedly; issuing cancellation");
tracing::warn!("Stream closed unexpectedly; issuing cancellation");
if let Some(metrics) = &metrics {
metrics.inc_client_disconnect();
metrics.inc_cancellation(&cancellation_labels);
......@@ -211,9 +211,19 @@ pub fn monitor_for_disconnects(
}
}
_ = context.stopped() => {
tracing::trace!("Context stopped; breaking stream");
// Mark as cancelled when context is stopped (client disconnect or timeout)
inflight_guard.mark_error(ErrorType::Cancelled);
// Token counts (input_tokens, output_tokens) are recorded on
// the enclosing span by ResponseMetricCollector::Drop.
tracing::warn!(
request_id = %inflight_guard.request_id(),
model = %inflight_guard.model(),
endpoint = %inflight_guard.endpoint(),
request_type = %inflight_guard.request_type(),
error_type = "cancelled",
elapsed_ms = %inflight_guard.elapsed_ms(),
"request cancelled"
);
break;
}
}
......
......@@ -283,6 +283,7 @@ pub struct InflightGuard {
status: Status,
error_type: ErrorType,
timer: Instant,
request_id: String,
}
/// Requests will be logged by the type of endpoint hit
......@@ -371,6 +372,10 @@ pub struct ResponseMetricCollector {
// be computed.
last_response_time: Option<Duration>,
osl: usize,
isl: usize,
ttft_ms: Option<f64>,
itl_sum_secs: f64,
itl_count: u64,
// we track if cached_tokens has been observed to ensure we only increment once per request
cached_tokens_observed: bool,
// we track if tokenize latency has been observed to ensure we only increment once per request
......@@ -914,6 +919,7 @@ impl Metrics {
model: &str,
endpoint: Endpoint,
streaming: bool,
request_id: &str,
) -> InflightGuard {
let request_type = if streaming {
RequestType::Stream
......@@ -926,6 +932,7 @@ impl Metrics {
model.to_string().to_lowercase(),
endpoint,
request_type,
request_id.to_string(),
)
}
......@@ -965,14 +972,21 @@ impl InflightGuard {
model: String,
endpoint: Endpoint,
request_type: RequestType,
request_id: String,
) -> Self {
// Start the timer
let timer = Instant::now();
// Increment the inflight gauge when the guard is created
metrics.inc_inflight_gauge(&model);
// Return the RAII Guard
tracing::Span::current().record("model", model.as_str());
tracing::info!(
request_id = %request_id,
model = %model,
endpoint = %endpoint,
request_type = %request_type,
"request received"
);
InflightGuard {
metrics,
model,
......@@ -981,9 +995,29 @@ impl InflightGuard {
status: Status::Error,
error_type: ErrorType::Internal,
timer,
request_id,
}
}
/// Returns the request identifier this guard was constructed with.
pub fn request_id(&self) -> &str {
    self.request_id.as_str()
}
/// Returns the model name stored on this guard.
pub fn model(&self) -> &str {
    self.model.as_str()
}
pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
}
pub fn request_type(&self) -> &RequestType {
&self.request_type
}
pub fn error_type(&self) -> &ErrorType {
&self.error_type
}
/// Returns the whole milliseconds elapsed since this guard's timer was started.
pub fn elapsed_ms(&self) -> u128 {
    let since_start = self.timer.elapsed();
    since_start.as_millis()
}
pub(crate) fn mark_ok(&mut self) {
self.status = Status::Success;
self.error_type = ErrorType::None;
......@@ -998,13 +1032,7 @@ impl InflightGuard {
impl Drop for InflightGuard {
fn drop(&mut self) {
let duration = self.timer.elapsed().as_secs_f64();
// Decrement the gauge when the guard is dropped
self.metrics.dec_inflight_gauge(&self.model);
// the frequency on incrementing the full request counter is relatively low
// if we were incrementing the counter on every forward pass, we'd use static CounterVec or
// discrete counter object without the more costly lookup required for the following calls
self.metrics.inc_request_counter(
&self.model,
&self.endpoint,
......@@ -1012,12 +1040,48 @@ impl Drop for InflightGuard {
&self.status,
&self.error_type,
);
// Record the duration of the request
self.metrics
.request_duration
.with_label_values(&[&self.model])
.observe(duration);
let elapsed_ms = self.timer.elapsed().as_millis();
let status_str = self.status.as_str();
match self.status {
Status::Error => {
let detail = match self.error_type {
ErrorType::Cancelled => "cancelled before completion",
ErrorType::Internal => "internal server error during processing",
ErrorType::Validation => "invalid request parameters",
ErrorType::NotFound => "model or resource not found",
ErrorType::Overload => "service overloaded or rate limited",
ErrorType::NotImplemented => "requested feature not implemented",
ErrorType::None => "unknown error",
};
tracing::error!(
request_id = %self.request_id,
model = %self.model,
endpoint = %self.endpoint,
request_type = %self.request_type,
status = %status_str,
error_type = %self.error_type,
error_detail = %detail,
elapsed_ms = %elapsed_ms,
"request completed"
);
}
Status::Success => {
tracing::info!(
request_id = %self.request_id,
model = %self.model,
endpoint = %self.endpoint,
request_type = %self.request_type,
status = %status_str,
elapsed_ms = %elapsed_ms,
"request completed"
);
}
}
}
}
......@@ -1062,6 +1126,12 @@ impl RequestType {
}
}
impl std::fmt::Display for RequestType {
    /// Writes the label returned by `as_str`.
    ///
    /// Goes through `Formatter::pad` rather than `write_str` so that width,
    /// fill, alignment and precision flags (e.g. `{:>8}`) are honoured, the
    /// same way `Display` for `str` behaves. Plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}
impl Status {
pub fn as_str(&self) -> &'static str {
match self {
......@@ -1085,6 +1155,12 @@ impl ErrorType {
}
}
impl std::fmt::Display for ErrorType {
    /// Writes the label returned by `as_str`.
    ///
    /// Goes through `Formatter::pad` rather than `write_str` so that width,
    /// fill, alignment and precision flags (e.g. `{:<12}`) are honoured, the
    /// same way `Display` for `str` behaves. Plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}
impl ResponseMetricCollector {
fn new(metrics: Arc<Metrics>, model: String) -> Self {
ResponseMetricCollector {
......@@ -1094,6 +1170,10 @@ impl ResponseMetricCollector {
last_response_time: None,
start_time: Instant::now(),
osl: 0,
isl: 0,
ttft_ms: None,
itl_sum_secs: 0.0,
itl_count: 0,
cached_tokens_observed: false,
tokenize_latency_observed: false,
detokenize_latency_total: Duration::ZERO,
......@@ -1194,6 +1274,9 @@ impl ResponseMetricCollector {
return;
}
// Store ISL for span recording on drop
self.isl = isl;
// Increment the real-time output tokens counter
self.metrics
.output_tokens_counter
......@@ -1205,8 +1288,9 @@ impl ResponseMetricCollector {
// we use the full response time as TTFT and ignore the ITL
self.is_first_token = false;
// Publish TTFT
// Publish TTFT and store for span recording
let ttft = self.start_time.elapsed().as_secs_f64();
self.ttft_ms = Some(ttft * 1000.0);
self.metrics
.time_to_first_token
.with_label_values(&[&self.model])
......@@ -1247,6 +1331,8 @@ impl ResponseMetricCollector {
if let Some(last_response_time) = self.last_response_time {
let response_duration = current_duration - last_response_time;
let itl = response_duration.as_secs_f64() / num_tokens as f64;
self.itl_sum_secs += itl * num_tokens as f64;
self.itl_count += num_tokens as u64;
for _ in 0..num_tokens {
self.metrics
.inter_token_latency
......@@ -1292,6 +1378,25 @@ impl Drop for ResponseMetricCollector {
.output_sequence_length
.with_label_values(&[&self.model])
.observe(self.osl as f64);
// Record request summary on the enclosing span.
// InflightGuard::Drop and on_response logs will inherit these.
let span = tracing::Span::current();
span.record("input_tokens", self.isl as u32);
span.record("output_tokens", self.osl as u32);
if let Some(ttft_ms) = self.ttft_ms {
span.record("ttft_ms", format!("{:.2}", ttft_ms).as_str());
}
if self.itl_count > 0 {
let avg_ms = (self.itl_sum_secs / self.itl_count as f64) * 1000.0;
span.record("avg_itl_ms", format!("{:.2}", avg_ms).as_str());
}
if let Some(worker_id) = self.prefill_worker_id {
span.record("prefill_worker_id", worker_id);
}
if let Some(worker_id) = self.decode_worker_id {
span.record("decode_worker_id", worker_id);
}
}
}
......@@ -2087,7 +2192,7 @@ mod tests {
let mut guard =
metrics
.clone()
.create_inflight_guard(model, Endpoint::ChatCompletions, false);
.create_inflight_guard(model, Endpoint::ChatCompletions, false, "");
guard.mark_ok();
} // guard drops here
......@@ -2117,7 +2222,7 @@ mod tests {
let mut guard =
metrics
.clone()
.create_inflight_guard(model, Endpoint::ChatCompletions, false);
.create_inflight_guard(model, Endpoint::ChatCompletions, false, "");
guard.mark_error(ErrorType::Validation);
} // guard drops here
......@@ -2147,7 +2252,7 @@ mod tests {
let _guard =
metrics
.clone()
.create_inflight_guard(model, Endpoint::ChatCompletions, false);
.create_inflight_guard(model, Endpoint::ChatCompletions, false, "");
// Don't call mark_ok() or mark_error() - simulate panic/unhandled error
} // guard drops with default error_type=Internal
......@@ -2187,7 +2292,7 @@ mod tests {
for error_type in &error_types {
let mut guard = metrics
.clone()
.create_inflight_guard(model, endpoint, false);
.create_inflight_guard(model, endpoint, false, "");
guard.mark_error(error_type.clone());
drop(guard);
}
......@@ -2226,7 +2331,7 @@ mod tests {
let mut guard =
metrics
.clone()
.create_inflight_guard(model, Endpoint::ChatCompletions, false);
.create_inflight_guard(model, Endpoint::ChatCompletions, false, "");
guard.mark_error(ErrorType::Validation);
drop(guard);
}
......@@ -2235,7 +2340,7 @@ mod tests {
let mut guard =
metrics
.clone()
.create_inflight_guard(model, Endpoint::Completions, false);
.create_inflight_guard(model, Endpoint::Completions, false, "");
guard.mark_error(ErrorType::Internal);
drop(guard);
}
......@@ -2244,7 +2349,7 @@ mod tests {
let mut guard =
metrics
.clone()
.create_inflight_guard(model, Endpoint::Embeddings, false);
.create_inflight_guard(model, Endpoint::Embeddings, false, "");
guard.mark_ok();
drop(guard);
}
......
......@@ -443,10 +443,12 @@ async fn completions_single(
let model = request.inner.model.clone();
// Create inflight_guard early to ensure all errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Completions, streaming);
let mut inflight_guard = state.metrics_clone().create_inflight_guard(
&model,
Endpoint::Completions,
streaming,
&request_id,
);
// Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
......@@ -577,10 +579,12 @@ async fn completions_batch(
let model = request.inner.model.clone();
// Create inflight_guard early to ensure all errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Completions, streaming);
let mut inflight_guard = state.metrics_clone().create_inflight_guard(
&model,
Endpoint::Completions,
streaming,
&request_id,
);
// Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
......@@ -753,10 +757,12 @@ async fn embeddings(
let model = &request.inner.model;
// Create inflight_guard early to ensure all errors are counted
let mut inflight =
state
.metrics_clone()
.create_inflight_guard(model, Endpoint::Embeddings, streaming);
let mut inflight = state.metrics_clone().create_inflight_guard(
model,
Endpoint::Embeddings,
streaming,
&request_id,
);
// Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
......@@ -1124,10 +1130,12 @@ async fn chat_completions(
tracing::trace!("Received chat completions request: {:?}", request.content());
// Create inflight_guard early to ensure all errors (including validation) are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::ChatCompletions, streaming);
let mut inflight_guard = state.metrics_clone().create_inflight_guard(
&model,
Endpoint::ChatCompletions,
streaming,
&request_id,
);
// Handle unsupported fields - if Some(resp) is returned by
// validate_chat_completion_unsupported_fields,
......@@ -1499,10 +1507,12 @@ async fn responses(
// Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Responses, streaming);
let mut inflight_guard = state.metrics_clone().create_inflight_guard(
&model,
Endpoint::Responses,
streaming,
request.id(),
);
// Handle unsupported fields - if Some(resp) is returned by validate_unsupported_fields,
// then a field was used that is unsupported. We will log an error message
......@@ -1949,10 +1959,12 @@ async fn images(
.map_err(|_| ErrorMessage::model_not_found())?;
// this will increment the inflight gauge for the model
let mut inflight =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Images, streaming);
let mut inflight = state.metrics_clone().create_inflight_guard(
&model,
Endpoint::Images,
streaming,
&request_id,
);
let mut response_collector = state.metrics_clone().create_response_collector(&model);
......@@ -2033,10 +2045,12 @@ async fn videos(
.map_err(|_| ErrorMessage::model_not_found())?;
// this will increment the inflight gauge for the model
let mut inflight =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Videos, streaming);
let mut inflight = state.metrics_clone().create_inflight_guard(
&model,
Endpoint::Videos,
streaming,
&request_id,
);
let mut response_collector = state.metrics_clone().create_response_collector(&model);
......@@ -2095,9 +2109,10 @@ async fn video_stream(
.get_videos_engine(&model)
.map_err(|_| ErrorMessage::model_not_found())?;
let mut inflight = state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Videos, true);
let mut inflight =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Videos, true, request.id());
let mut response_collector = state.metrics_clone().create_response_collector(&model);
......@@ -2261,10 +2276,12 @@ async fn audio_speech(
.get_audios_engine(&model)
.map_err(|_| ErrorMessage::model_not_found())?;
let mut inflight =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Audios, streaming);
let mut inflight = state.metrics_clone().create_inflight_guard(
&model,
Endpoint::Audios,
streaming,
&request_id,
);
let mut response_collector = state.metrics_clone().create_response_collector(&model);
......
......@@ -78,6 +78,7 @@ async fn test_metrics_prefix_default() {
"test-model",
Endpoint::ChatCompletions,
false,
"",
);
}
......@@ -117,6 +118,7 @@ async fn test_metrics_prefix_custom() {
"test-model",
Endpoint::ChatCompletions,
true,
"",
);
}
......@@ -151,6 +153,7 @@ async fn test_metrics_prefix_sanitized() {
"test-model",
Endpoint::ChatCompletions,
true,
"",
);
}
......
......@@ -358,7 +358,11 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
#[async_trait]
pub trait PushWorkHandler: Send + Sync {
async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError>;
async fn handle_payload(
&self,
payload: Bytes,
request_id: Option<String>,
) -> Result<(), PipelineError>;
/// Add metrics to the handler
fn add_metrics(
......
......@@ -241,6 +241,7 @@ where
// Prepare trace headers using shared helper
let mut headers = std::collections::HashMap::new();
inject_trace_headers_into_map(&mut headers);
headers.insert("request-id".to_string(), request_id.clone());
// Stamp send time right before the transport write so the network
// transit metric excludes serialization/encoding overhead.
......
......@@ -253,7 +253,7 @@ async fn handle_shared_request(
tokio::spawn(async move {
tracing::trace!(instance_id, "handling new HTTP request");
let result = service_handler
.handle_payload(body)
.handle_payload(body, traceparent.request_id.clone())
.instrument(tracing::info_span!(
"handle_payload",
component = component_name.as_ref(),
......
......@@ -105,10 +105,23 @@ impl PushEndpoint {
tracing::info_span!(target: "request_span", "handle_payload")
};
// Extract request_id from headers before passing payload
let request_id = req
.message
.headers
.as_ref()
.and_then(|h| h.get("request-id").map(|v| v.to_string()))
.or_else(|| {
req.message
.headers
.as_ref()
.and_then(|h| h.get("x-dynamo-request-id").map(|v| v.to_string()))
});
tokio::spawn(async move {
tracing::trace!(instance_id, "handling new request");
let result = ingress
.handle_payload(req.message.payload)
.handle_payload(req.message.payload, request_id)
.instrument(span)
.await;
match result {
......
......@@ -111,17 +111,23 @@ impl WorkHandlerMetrics {
}
}
// RAII guard to ensure inflight gauge is decremented and request duration is observed on all code paths.
// RAII guard to ensure inflight gauge is decremented, request duration is observed,
// and lifecycle logs are emitted on all code paths.
struct RequestMetricsGuard {
    // Gauge that is decremented once when the guard is dropped.
    inflight_requests: prometheus::IntGauge,
    // Histogram that receives the time elapsed since `start_time`, in seconds, on drop.
    request_duration: prometheus::Histogram,
    // Instant from which the request duration is measured.
    start_time: Instant,
    // When present, a "request completed" log line carrying this id is emitted on drop;
    // when `None`, no completion log is written.
    request_id: Option<String>,
}
impl Drop for RequestMetricsGuard {
    /// Settles the per-request metrics and, if the request carried an id,
    /// logs its completion.
    fn drop(&mut self) {
        // Release the in-flight slot first, then record how long the request took.
        self.inflight_requests.dec();
        let elapsed_secs = self.start_time.elapsed().as_secs_f64();
        self.request_duration.observe(elapsed_secs);

        // Requests that arrived without an id produce no completion log.
        if let Some(request_id) = self.request_id.as_deref() {
            tracing::info!(request_id = %request_id, "request completed");
        }
    }
}
......@@ -149,7 +155,11 @@ where
Ok(())
}
async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> {
async fn handle_payload(
&self,
payload: Bytes,
request_id: Option<String>,
) -> Result<(), PipelineError> {
let t2_wallclock_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
......@@ -161,10 +171,14 @@ where
m.request_counter.inc();
m.inflight_requests.inc();
m.request_bytes.inc_by(payload.len() as u64);
if let Some(rid) = &request_id {
tracing::info!(request_id = %rid, "request received");
}
RequestMetricsGuard {
inflight_requests: m.inflight_requests.clone(),
request_duration: m.request_duration.clone(),
start_time,
request_id: request_id.clone(),
}
});
......@@ -357,6 +371,7 @@ where
}
// Ensure the metrics guard is not dropped until the end of the function.
// Drop fires "request completed" log via RAII.
drop(_inflight_guard);
Ok(())
......
......@@ -209,9 +209,15 @@ impl SharedTcpServer {
work_item.instance_id,
);
let request_id = work_item
.headers
.get("request-id")
.or_else(|| work_item.headers.get("x-dynamo-request-id"))
.cloned();
let result = work_item
.service_handler
.handle_payload(work_item.payload)
.handle_payload(work_item.payload, request_id)
.instrument(span)
.await;
......@@ -657,7 +663,11 @@ mod tests {
#[async_trait]
impl PushWorkHandler for SlowMockHandler {
async fn handle_payload(&self, _payload: Bytes) -> Result<(), PipelineError> {
async fn handle_payload(
&self,
_payload: Bytes,
_request_id: Option<String>,
) -> Result<(), PipelineError> {
self.request_in_flight.store(true, Ordering::SeqCst);
self.request_started.notify_one();
......@@ -738,7 +748,7 @@ mod tests {
let handler = handler.clone();
async move {
let payload = Bytes::from("test payload");
handler.handle_payload(payload).await
handler.handle_payload(payload, None).await
}
});
......@@ -861,7 +871,11 @@ mod tests {
#[async_trait]
impl PushWorkHandler for ConcurrencyTrackingHandler {
async fn handle_payload(&self, _payload: Bytes) -> Result<(), PipelineError> {
async fn handle_payload(
&self,
_payload: Bytes,
_request_id: Option<String>,
) -> Result<(), PipelineError> {
// Increment concurrent count
let current = self.concurrent_count.fetch_add(1, Ordering::SeqCst) + 1;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment