"deploy/vscode:/vscode.git/clone" did not exist on "6c539fbdacaa8d752695ca50e44fd2c96f04fdcc"
Commit 13058cbd (unverified), authored by Neelay Shah, committed by GitHub
Browse files

feat: trace infrastructure — spans + request ID propagation [DIS-1643] (#7733)


Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 658b0d5a
......@@ -124,7 +124,8 @@ async fn handler_anthropic_messages(
}
// Create request context
let request_id = get_or_create_request_id(None, &headers);
let request_id = get_or_create_request_id(&headers)
.map_err(|msg| anthropic_error(StatusCode::BAD_REQUEST, "invalid_request_error", &msg))?;
let streaming = request.stream;
let cancellation_labels = CancellationLabels {
model: request.model.clone(),
......
......@@ -291,37 +291,46 @@ pub async fn smart_json_error_middleware(request: Request<Body>, next: Next) ->
}
}
/// Get the request ID from a primary source, or next from the headers, or lastly create a new one if not present
// TODO: Similar function exists in lib/llm/src/grpc/service/openai.rs but with different signature and simpler logic
pub(super) fn get_or_create_request_id(primary: Option<&str>, headers: &HeaderMap) -> String {
// Try to get request id from trace context
if let Some(trace_context) = get_distributed_tracing_context()
&& let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id
{
return x_dynamo_request_id;
/// Validate the `x-dynamo-request-id` header and return the request ID.
///
/// Returns `Err(message)` if the header is present but invalid (not UTF-8 or not a UUID).
/// The caller is responsible for converting the error message into the appropriate HTTP
/// error format (OpenAI vs Anthropic).
///
/// The request ID comes from the trace context — `make_inference_request_span()` guarantees it by
/// generating a UUID when the client doesn't provide one.
pub(super) fn get_or_create_request_id(headers: &HeaderMap) -> Result<String, String> {
// Validate and extract x-dynamo-request-id header if present.
// Returns error for non-UTF-8 or non-UUID values.
let validated_header = if let Some(raw) = headers.get(DYNAMO_REQUEST_ID_HEADER) {
match raw.to_str() {
Err(_) => {
return Err(format!(
"{}{} header must be a valid UTF-8 string",
VALIDATION_PREFIX, DYNAMO_REQUEST_ID_HEADER
));
}
// Try to get the request ID from the primary source
if let Some(primary) = primary
&& let Ok(uuid) = uuid::Uuid::parse_str(primary)
{
return uuid.to_string();
Ok(s) if uuid::Uuid::parse_str(s).is_err() => {
return Err(format!(
"{}{} header must be a valid UUID, got: {}",
VALIDATION_PREFIX, DYNAMO_REQUEST_ID_HEADER, s
));
}
// Try to get the request ID header as a string slice
let request_id_opt = headers
.get(DYNAMO_REQUEST_ID_HEADER)
.and_then(|h| h.to_str().ok());
// Try to parse the request ID as a UUID, or generate a new one if missing/invalid
let uuid = match request_id_opt {
Some(request_id) => {
uuid::Uuid::parse_str(request_id).unwrap_or_else(|_| uuid::Uuid::new_v4())
Ok(s) => Some(s.to_string()),
}
None => uuid::Uuid::new_v4(),
} else {
None
};
uuid.to_string()
// Prefer trace context (set by make_inference_request_span via DistributedTraceIdLayer)
if let Some(trace_context) = get_distributed_tracing_context()
&& let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id
{
return Ok(x_dynamo_request_id);
}
// Fallback: use validated header value, or generate new UUID
Ok(validated_header.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()))
}
/// OpenAI Completions Request Handler
......@@ -343,7 +352,12 @@ async fn handler_completions(
request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers);
// create the context for the request
let request_id = get_or_create_request_id(request.inner.user.as_deref(), &headers);
let request_id = get_or_create_request_id(&headers).map_err(|msg| {
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let streaming = request.inner.stream.unwrap_or(false);
let cancellation_labels = CancellationLabels {
model: request.inner.model.clone(),
......@@ -723,7 +737,12 @@ async fn embeddings(
// return a 503 if the service is not ready
check_ready(&state)?;
let request_id = get_or_create_request_id(request.inner.user.as_deref(), &headers);
let request_id = get_or_create_request_id(&headers).map_err(|msg| {
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id);
let request_id = request.id().to_string();
......@@ -801,7 +820,12 @@ async fn handler_chat_completions(
request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers);
// create the context for the request
let request_id = get_or_create_request_id(request.inner.user.as_deref(), &headers);
let request_id = get_or_create_request_id(&headers).map_err(|msg| {
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let streaming = request.inner.stream.unwrap_or(false);
let cancellation_labels = CancellationLabels {
model: request.inner.model.clone(),
......@@ -1410,7 +1434,12 @@ async fn handler_responses(
request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers);
// create the context for the request
let request_id = get_or_create_request_id(None, &headers);
let request_id = get_or_create_request_id(&headers).map_err(|msg| {
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let streaming = request.inner.stream.unwrap_or(false);
let cancellation_labels = CancellationLabels {
model: request.inner.model.clone().unwrap_or_default(),
......@@ -1902,7 +1931,12 @@ async fn images(
// return a 503 if the service is not ready
check_ready(&state)?;
let request_id = get_or_create_request_id(request.inner.user.as_deref(), &headers);
let request_id = get_or_create_request_id(&headers).map_err(|msg| {
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id);
let request_id = request.id().to_string();
......@@ -1995,7 +2029,12 @@ async fn videos(
// return a 503 if the service is not ready
check_ready(&state)?;
let request_id = get_or_create_request_id(request.user.as_deref(), &headers);
let request_id = get_or_create_request_id(&headers).map_err(|msg| {
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id);
let request_id = request.id().to_string();
......@@ -2066,7 +2105,12 @@ async fn video_stream(
) -> Result<Response, ErrorResponse> {
check_ready(&state)?;
let request_id = get_or_create_request_id(request.user.as_deref(), &headers);
let request_id = get_or_create_request_id(&headers).map_err(|msg| {
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id);
let model = request.model.clone();
......@@ -2220,7 +2264,12 @@ async fn audio_speech(
check_ready(&state)?;
let response_format = request.response_format.clone();
let request_id = get_or_create_request_id(request.user.as_deref(), &headers);
let request_id = get_or_create_request_id(&headers).map_err(|msg| {
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id);
let request_id = request.id().to_string();
......
......@@ -28,7 +28,7 @@ use derive_builder::Builder;
use dynamo_runtime::config::env_is_truthy;
use dynamo_runtime::config::environment_names::llm as env_llm;
use dynamo_runtime::discovery::Discovery;
use dynamo_runtime::logging::make_request_span;
use dynamo_runtime::logging::make_inference_request_span;
use dynamo_runtime::metrics::{
frontend_perf::ensure_frontend_perf_metrics_registered_prometheus,
request_plane::ensure_request_plane_metrics_registered_prometheus,
......@@ -526,7 +526,7 @@ impl HttpServiceConfigBuilder {
// Add on_response callback for logging response status code
router = router.layer(
TraceLayer::new_for_http()
.make_span_with(make_request_span)
.make_span_with(make_inference_request_span)
.on_response(
|response: &Response<Body>, latency: Duration, _span: &tracing::Span| {
let status = response.status();
......
......@@ -288,8 +288,12 @@ impl TraceParent {
}
}
// Takes Axum request and returning a span
pub fn make_request_span<B>(req: &Request<B>) -> Span {
/// Create a span for inference request endpoints (completions, chat, embeddings, etc.).
///
/// Uses `target: "request_span"` which is always allowed through the DYN_LOG filter
/// (via `request_span=trace` directive in `filters()`). This ensures request context
/// (request_id, model, trace_id) is always available on log events.
pub fn make_inference_request_span<B>(req: &Request<B>) -> Span {
let method = req.method();
let uri = req.uri();
let version = format!("{:?}", req.version());
......@@ -297,7 +301,15 @@ pub fn make_request_span<B>(req: &Request<B>) -> Span {
let otel_context = extract_otel_context_from_http_headers(req.headers());
// Generate a request ID if the client didn't provide one so that workers
// (which read x_dynamo_request_id from the span/trace context) always
// have a consistent ID to correlate with.
let x_dynamo_request_id = trace_parent
.x_dynamo_request_id
.unwrap_or_else(|| Uuid::new_v4().to_string());
let span = tracing::info_span!(
target: "request_span",
"http-request",
method = %method,
uri = %uri,
......@@ -305,7 +317,10 @@ pub fn make_request_span<B>(req: &Request<B>) -> Span {
trace_id = trace_parent.trace_id,
parent_id = trace_parent.parent_id,
x_request_id = trace_parent.x_request_id,
x_dynamo_request_id = trace_parent.x_dynamo_request_id,
x_dynamo_request_id = %x_dynamo_request_id,
model = tracing::field::Empty,
input_tokens = tracing::field::Empty,
output_tokens = tracing::field::Empty,
);
if let Some(context) = otel_context {
......@@ -315,6 +330,21 @@ pub fn make_request_span<B>(req: &Request<B>) -> Span {
span
}
/// Build the tracing span attached to requests hitting system endpoints
/// (health, metrics, models, etc.).
///
/// The span is named `http-request`, is created at DEBUG level under the
/// `system_span` target, and records two fields taken from the request:
/// `method` and `uri`, both captured through their `Display` output.
///
/// The `system_span` target follows normal DYN_LOG filtering — these endpoints
/// are polled frequently and don't need to be visible at INFO level.
pub fn make_system_request_span<B>(req: &Request<B>) -> Span {
    tracing::debug_span!(
        target: "system_span",
        "http-request",
        method = %req.method(),
        uri = %req.uri(),
    )
}
/// Extract OpenTelemetry context from HTTP headers for distributed tracing
fn extract_otel_context_from_http_headers(
headers: &http::HeaderMap,
......@@ -364,6 +394,7 @@ pub fn make_handle_payload_span(
if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
let span = tracing::info_span!(
target: "request_span",
"handle_payload",
trace_id = trace_id.as_str(),
parent_id = parent_id.as_str(),
......@@ -382,6 +413,7 @@ pub fn make_handle_payload_span(
span
} else {
tracing::info_span!(
target: "request_span",
"handle_payload",
x_request_id = trace_parent.x_request_id,
x_dynamo_request_id = trace_parent.x_dynamo_request_id,
......@@ -409,6 +441,7 @@ pub fn make_handle_payload_span_from_tcp_headers(
if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
let span = tracing::info_span!(
target: "request_span",
"handle_payload",
trace_id = trace_id.as_str(),
parent_id = parent_id.as_str(),
......@@ -427,6 +460,7 @@ pub fn make_handle_payload_span_from_tcp_headers(
span
} else {
tracing::info_span!(
target: "request_span",
"handle_payload",
x_request_id = x_request_id,
x_dynamo_request_id = x_dynamo_request_id,
......@@ -1043,6 +1077,12 @@ fn filters(config: LoggingConfig) -> EnvFilter {
filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
}
// Always allow infrastructure request spans regardless of DYN_LOG level.
// This ensures request context (request_id, model, trace_id) is always
// available on log events, even when DYN_LOG=error or DYN_LOG=warn.
// Can be overridden via DYN_LOG=request_span=<level> if needed.
filter_layer = filter_layer.add_directive("request_span=trace".parse().unwrap());
filter_layer
}
......
......@@ -102,7 +102,7 @@ impl PushEndpoint {
instance_id,
)
} else {
tracing::info_span!("handle_payload")
tracing::info_span!(target: "request_span", "handle_payload")
};
tokio::spawn(async move {
......
......@@ -8,7 +8,7 @@ use crate::config::HealthStatus;
use crate::config::environment_names::logging as env_logging;
use crate::config::environment_names::runtime::canary as env_canary;
use crate::config::environment_names::runtime::system as env_system;
use crate::logging::make_request_span;
use crate::logging::make_system_request_span;
use crate::metrics::MetricsHierarchy;
use crate::traits::DistributedRuntimeProvider;
use axum::{
......@@ -221,7 +221,7 @@ pub async fn spawn_system_status_server(
tracing::info!("[fallback handler] called");
(StatusCode::NOT_FOUND, "Route not found").into_response()
})
.layer(TraceLayer::new_for_http().make_span_with(make_request_span));
.layer(TraceLayer::new_for_http().make_span_with(make_system_request_span));
let address = format!("{}:{}", host, port);
tracing::info!("[spawn_system_status_server] binding to: {address}");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment