Unverified Commit c376655f authored by Neelay Shah's avatar Neelay Shah Committed by GitHub
Browse files

refactor: unify on_response, rename request_id fields, deprecate x-dynamo-request-id (#7834)


Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 2b906504
...@@ -124,8 +124,7 @@ async fn handler_anthropic_messages( ...@@ -124,8 +124,7 @@ async fn handler_anthropic_messages(
} }
// Create request context // Create request context
let request_id = get_or_create_request_id(&headers) let request_id = get_or_create_request_id(&headers);
.map_err(|msg| anthropic_error(StatusCode::BAD_REQUEST, "invalid_request_error", &msg))?;
let streaming = request.stream; let streaming = request.stream;
let cancellation_labels = CancellationLabels { let cancellation_labels = CancellationLabels {
model: request.model.clone(), model: request.model.clone(),
......
...@@ -291,30 +291,39 @@ pub async fn smart_json_error_middleware(request: Request<Body>, next: Next) -> ...@@ -291,30 +291,39 @@ pub async fn smart_json_error_middleware(request: Request<Body>, next: Next) ->
} }
} }
/// Validate the `x-dynamo-request-id` header and return the request ID. /// Return the request ID for the current request.
/// ///
/// Returns `Err(message)` if the header is present but invalid (not UTF-8 or not a UUID). /// The canonical request ID is set by `make_inference_request_span()` and stored
/// The caller is responsible for converting the error message into the appropriate HTTP /// in the `DistributedTraceContext` via `DistributedTraceIdLayer`. This function
/// error format (OpenAI vs Anthropic). /// retrieves it, falling back to a validated `x-dynamo-request-id` header value
/// (deprecated, DEP #7812) or a new UUID.
/// ///
/// The request ID comes from the trace context — `make_inference_request_span()` guarantees it by /// **Deprecation (DEP #7812):** The `x-dynamo-request-id` header is deprecated.
/// generating a UUID when the client doesn't provide one. /// Clients should rely on server-generated request IDs instead of supplying their own.
pub(super) fn get_or_create_request_id(headers: &HeaderMap) -> Result<String, String> { pub(super) fn get_or_create_request_id(headers: &HeaderMap) -> String {
// Validate and extract x-dynamo-request-id header if present. // Validate x-dynamo-request-id header if present, warn on invalid values.
// Returns error for non-UTF-8 or non-UUID values. // DEP #7812: x-dynamo-request-id is deprecated — clients should rely on
// server-generated request IDs instead of supplying their own.
let validated_header = if let Some(raw) = headers.get(DYNAMO_REQUEST_ID_HEADER) { let validated_header = if let Some(raw) = headers.get(DYNAMO_REQUEST_ID_HEADER) {
tracing::warn!(
"{} header is deprecated (DEP #7812); server-generated request IDs should be used instead",
DYNAMO_REQUEST_ID_HEADER
);
match raw.to_str() { match raw.to_str() {
Err(_) => { Err(_) => {
return Err(format!( tracing::warn!(
"{}{} header must be a valid UTF-8 string", "{} header must be a valid UTF-8 string",
VALIDATION_PREFIX, DYNAMO_REQUEST_ID_HEADER DYNAMO_REQUEST_ID_HEADER
)); );
None
} }
Ok(s) if uuid::Uuid::parse_str(s).is_err() => { Ok(s) if uuid::Uuid::parse_str(s).is_err() => {
return Err(format!( tracing::warn!(
"{}{} header must be a valid UUID, got: {}", "{} header must be a valid UUID, got: {}",
VALIDATION_PREFIX, DYNAMO_REQUEST_ID_HEADER, s DYNAMO_REQUEST_ID_HEADER,
)); s
);
None
} }
Ok(s) => Some(s.to_string()), Ok(s) => Some(s.to_string()),
} }
...@@ -324,13 +333,13 @@ pub(super) fn get_or_create_request_id(headers: &HeaderMap) -> Result<String, St ...@@ -324,13 +333,13 @@ pub(super) fn get_or_create_request_id(headers: &HeaderMap) -> Result<String, St
// Prefer trace context (set by make_inference_request_span via DistributedTraceIdLayer) // Prefer trace context (set by make_inference_request_span via DistributedTraceIdLayer)
if let Some(trace_context) = get_distributed_tracing_context() if let Some(trace_context) = get_distributed_tracing_context()
&& let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id && let Some(request_id) = trace_context.request_id
{ {
return Ok(x_dynamo_request_id); return request_id;
} }
// Fallback: use validated header value, or generate new UUID // Fallback: use validated header for backwards compat, or generate new UUID
Ok(validated_header.unwrap_or_else(|| uuid::Uuid::new_v4().to_string())) validated_header.unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
} }
/// OpenAI Completions Request Handler /// OpenAI Completions Request Handler
...@@ -352,12 +361,7 @@ async fn handler_completions( ...@@ -352,12 +361,7 @@ async fn handler_completions(
request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers); request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers);
// create the context for the request // create the context for the request
let request_id = get_or_create_request_id(&headers).map_err(|msg| { let request_id = get_or_create_request_id(&headers);
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let streaming = request.inner.stream.unwrap_or(false); let streaming = request.inner.stream.unwrap_or(false);
let cancellation_labels = CancellationLabels { let cancellation_labels = CancellationLabels {
model: request.inner.model.clone(), model: request.inner.model.clone(),
...@@ -737,12 +741,7 @@ async fn embeddings( ...@@ -737,12 +741,7 @@ async fn embeddings(
// return a 503 if the service is not ready // return a 503 if the service is not ready
check_ready(&state)?; check_ready(&state)?;
let request_id = get_or_create_request_id(&headers).map_err(|msg| { let request_id = get_or_create_request_id(&headers);
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id); let request = Context::with_id(request, request_id);
let request_id = request.id().to_string(); let request_id = request.id().to_string();
...@@ -820,12 +819,7 @@ async fn handler_chat_completions( ...@@ -820,12 +819,7 @@ async fn handler_chat_completions(
request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers); request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers);
// create the context for the request // create the context for the request
let request_id = get_or_create_request_id(&headers).map_err(|msg| { let request_id = get_or_create_request_id(&headers);
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let streaming = request.inner.stream.unwrap_or(false); let streaming = request.inner.stream.unwrap_or(false);
let cancellation_labels = CancellationLabels { let cancellation_labels = CancellationLabels {
model: request.inner.model.clone(), model: request.inner.model.clone(),
...@@ -1434,12 +1428,7 @@ async fn handler_responses( ...@@ -1434,12 +1428,7 @@ async fn handler_responses(
request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers); request.nvext = apply_header_routing_overrides(request.nvext.take(), &headers);
// create the context for the request // create the context for the request
let request_id = get_or_create_request_id(&headers).map_err(|msg| { let request_id = get_or_create_request_id(&headers);
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let streaming = request.inner.stream.unwrap_or(false); let streaming = request.inner.stream.unwrap_or(false);
let cancellation_labels = CancellationLabels { let cancellation_labels = CancellationLabels {
model: request.inner.model.clone().unwrap_or_default(), model: request.inner.model.clone().unwrap_or_default(),
...@@ -1931,12 +1920,7 @@ async fn images( ...@@ -1931,12 +1920,7 @@ async fn images(
// return a 503 if the service is not ready // return a 503 if the service is not ready
check_ready(&state)?; check_ready(&state)?;
let request_id = get_or_create_request_id(&headers).map_err(|msg| { let request_id = get_or_create_request_id(&headers);
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id); let request = Context::with_id(request, request_id);
let request_id = request.id().to_string(); let request_id = request.id().to_string();
...@@ -2029,12 +2013,7 @@ async fn videos( ...@@ -2029,12 +2013,7 @@ async fn videos(
// return a 503 if the service is not ready // return a 503 if the service is not ready
check_ready(&state)?; check_ready(&state)?;
let request_id = get_or_create_request_id(&headers).map_err(|msg| { let request_id = get_or_create_request_id(&headers);
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id); let request = Context::with_id(request, request_id);
let request_id = request.id().to_string(); let request_id = request.id().to_string();
...@@ -2105,12 +2084,7 @@ async fn video_stream( ...@@ -2105,12 +2084,7 @@ async fn video_stream(
) -> Result<Response, ErrorResponse> { ) -> Result<Response, ErrorResponse> {
check_ready(&state)?; check_ready(&state)?;
let request_id = get_or_create_request_id(&headers).map_err(|msg| { let request_id = get_or_create_request_id(&headers);
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id); let request = Context::with_id(request, request_id);
let model = request.model.clone(); let model = request.model.clone();
...@@ -2264,12 +2238,7 @@ async fn audio_speech( ...@@ -2264,12 +2238,7 @@ async fn audio_speech(
check_ready(&state)?; check_ready(&state)?;
let response_format = request.response_format.clone(); let response_format = request.response_format.clone();
let request_id = get_or_create_request_id(&headers).map_err(|msg| { let request_id = get_or_create_request_id(&headers);
ErrorMessage::from_http_error(HttpError {
code: 400,
message: msg,
})
})?;
let request = Context::with_id(request, request_id); let request = Context::with_id(request, request_id);
let request_id = request.id().to_string(); let request_id = request.id().to_string();
......
...@@ -28,7 +28,7 @@ use derive_builder::Builder; ...@@ -28,7 +28,7 @@ use derive_builder::Builder;
use dynamo_runtime::config::env_is_truthy; use dynamo_runtime::config::env_is_truthy;
use dynamo_runtime::config::environment_names::llm as env_llm; use dynamo_runtime::config::environment_names::llm as env_llm;
use dynamo_runtime::discovery::Discovery; use dynamo_runtime::discovery::Discovery;
use dynamo_runtime::logging::make_inference_request_span; use dynamo_runtime::logging::{make_inference_request_span, make_system_request_span};
use dynamo_runtime::metrics::{ use dynamo_runtime::metrics::{
frontend_perf::ensure_frontend_perf_metrics_registered_prometheus, frontend_perf::ensure_frontend_perf_metrics_registered_prometheus,
request_plane::ensure_request_plane_metrics_registered_prometheus, request_plane::ensure_request_plane_metrics_registered_prometheus,
...@@ -40,6 +40,19 @@ use tokio::task::JoinHandle; ...@@ -40,6 +40,19 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
/// Middleware that echoes `x-request-id` from request to response headers.
async fn echo_request_id_header(
request: axum::extract::Request,
next: axum::middleware::Next,
) -> axum::response::Response {
let x_request_id = request.headers().get("x-request-id").cloned();
let mut response = next.run(request).await;
if let Some(value) = x_request_id {
response.headers_mut().insert("x-request-id", value);
}
response
}
/// HTTP service shared state /// HTTP service shared state
pub struct State { pub struct State {
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
...@@ -491,11 +504,21 @@ impl HttpServiceConfigBuilder { ...@@ -491,11 +504,21 @@ impl HttpServiceConfigBuilder {
tracing::warn!("Failed to register transport metrics: {}", e); tracing::warn!("Failed to register transport metrics: {}", e);
} }
let mut router = axum::Router::new();
let mut all_docs = Vec::new(); let mut all_docs = Vec::new();
let mut routes = vec![ // Shared on_response callback for both system and inference routes
let on_response = |response: &Response<Body>, latency: Duration, _span: &tracing::Span| {
let status = response.status();
let latency_ms = latency.as_millis();
if status.is_server_error() || status.is_client_error() {
tracing::error!(status = %status.as_u16(), latency_ms = %latency_ms, "http response sent");
} else {
tracing::info!(status = %status.as_u16(), latency_ms = %latency_ms, "http response sent");
}
};
// System routes (health, metrics, models) — debug-level spans
let system_routes = vec![
metrics::router( metrics::router(
registry, registry,
var(HTTP_SVC_METRICS_PATH_ENV).ok(), var(HTTP_SVC_METRICS_PATH_ENV).ok(),
...@@ -506,55 +529,42 @@ impl HttpServiceConfigBuilder { ...@@ -506,55 +529,42 @@ impl HttpServiceConfigBuilder {
super::health::live_check_router(state.clone(), var(HTTP_SVC_LIVE_PATH_ENV).ok()), super::health::live_check_router(state.clone(), var(HTTP_SVC_LIVE_PATH_ENV).ok()),
super::busy_threshold::busy_threshold_router(state.clone(), None), super::busy_threshold::busy_threshold_router(state.clone(), None),
]; ];
let mut system_router = axum::Router::new();
for (route_docs, route) in system_routes {
system_router = system_router.merge(route);
all_docs.extend(route_docs);
}
// Inference routes (completions, chat, embeddings, etc.) — info-level spans
let endpoint_routes = let endpoint_routes =
HttpServiceConfigBuilder::get_endpoints_router(state.clone(), &config.request_template); HttpServiceConfigBuilder::get_endpoints_router(state.clone(), &config.request_template);
routes.extend(endpoint_routes); let mut inference_router = axum::Router::new();
for (route_docs, route) in routes { for (route_docs, route) in endpoint_routes {
router = router.merge(route); inference_router = inference_router.merge(route);
all_docs.extend(route_docs); all_docs.extend(route_docs);
} }
inference_router = inference_router.layer(
TraceLayer::new_for_http()
.make_span_with(make_inference_request_span)
.on_response(on_response),
);
// Add OpenAPI documentation routes (must be after all other routes so it can document them) // OpenAPI documentation routes (system)
// Note: The path parameter is currently unused as SwaggerUi requires static paths
let (openapi_docs, openapi_route) = let (openapi_docs, openapi_route) =
super::openapi_docs::openapi_router(all_docs.clone(), None); super::openapi_docs::openapi_router(all_docs.clone(), None);
router = router.merge(openapi_route); system_router = system_router.merge(openapi_route);
all_docs.extend(openapi_docs); all_docs.extend(openapi_docs);
// Add span for tracing system_router = system_router.layer(
// Add on_response callback for logging response status code
router = router.layer(
TraceLayer::new_for_http() TraceLayer::new_for_http()
.make_span_with(make_inference_request_span) .make_span_with(make_system_request_span)
.on_response( .on_response(on_response),
|response: &Response<Body>, latency: Duration, _span: &tracing::Span| {
let status = response.status();
let latency_ms = latency.as_millis();
if status.is_server_error() {
tracing::error!(
status = %status.as_u16(),
latency_ms = %latency_ms,
"request completed with server error"
);
} else if status.is_client_error() {
tracing::warn!(
status = %status.as_u16(),
latency_ms = %latency_ms,
"request completed with client request error"
);
} else {
tracing::debug!(
status = %status.as_u16(),
latency_ms = %latency_ms,
"request completed"
);
}
},
),
); );
let router = system_router.merge(inference_router);
// Echo x-request-id from request to response headers for client correlation
let router = router.layer(axum::middleware::from_fn(echo_request_id_header));
Ok(HttpService { Ok(HttpService {
state, state,
router, router,
......
...@@ -173,7 +173,7 @@ pub struct DistributedTraceContext { ...@@ -173,7 +173,7 @@ pub struct DistributedTraceContext {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub x_request_id: Option<String>, pub x_request_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub x_dynamo_request_id: Option<String>, pub request_id: Option<String>,
} }
/// Pending context data collected in on_new_span, to be finalized in on_enter /// Pending context data collected in on_new_span, to be finalized in on_enter
...@@ -184,7 +184,7 @@ struct PendingDistributedTraceContext { ...@@ -184,7 +184,7 @@ struct PendingDistributedTraceContext {
parent_id: Option<String>, parent_id: Option<String>,
tracestate: Option<String>, tracestate: Option<String>,
x_request_id: Option<String>, x_request_id: Option<String>,
x_dynamo_request_id: Option<String>, request_id: Option<String>,
} }
/// Macro to emit a tracing event at a dynamic level with a custom target. /// Macro to emit a tracing event at a dynamic level with a custom target.
...@@ -232,7 +232,7 @@ pub struct TraceParent { ...@@ -232,7 +232,7 @@ pub struct TraceParent {
pub parent_id: Option<String>, pub parent_id: Option<String>,
pub tracestate: Option<String>, pub tracestate: Option<String>,
pub x_request_id: Option<String>, pub x_request_id: Option<String>,
pub x_dynamo_request_id: Option<String>, pub request_id: Option<String>,
} }
pub trait GenericHeaders { pub trait GenericHeaders {
...@@ -257,7 +257,7 @@ impl TraceParent { ...@@ -257,7 +257,7 @@ impl TraceParent {
let mut parent_id = None; let mut parent_id = None;
let mut tracestate = None; let mut tracestate = None;
let mut x_request_id = None; let mut x_request_id = None;
let mut x_dynamo_request_id = None; let mut request_id = None;
if let Some(header_value) = headers.get("traceparent") { if let Some(header_value) = headers.get("traceparent") {
(trace_id, parent_id) = parse_traceparent(header_value); (trace_id, parent_id) = parse_traceparent(header_value);
...@@ -271,19 +271,20 @@ impl TraceParent { ...@@ -271,19 +271,20 @@ impl TraceParent {
tracestate = Some(header_value.to_string()); tracestate = Some(header_value.to_string());
} }
if let Some(header_value) = headers.get("x-dynamo-request-id") { // Read request-id from internal headers, with fallback to deprecated x-dynamo-request-id
x_dynamo_request_id = Some(header_value.to_string()); if let Some(header_value) = headers.get("request-id") {
request_id = Some(header_value.to_string());
} else if let Some(header_value) = headers.get("x-dynamo-request-id") {
request_id = Some(header_value.to_string());
} }
// Validate UUID format let request_id = request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
let x_dynamo_request_id =
x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
TraceParent { TraceParent {
trace_id, trace_id,
parent_id, parent_id,
tracestate, tracestate,
x_request_id, x_request_id,
x_dynamo_request_id, request_id,
} }
} }
} }
...@@ -301,11 +302,11 @@ pub fn make_inference_request_span<B>(req: &Request<B>) -> Span { ...@@ -301,11 +302,11 @@ pub fn make_inference_request_span<B>(req: &Request<B>) -> Span {
let otel_context = extract_otel_context_from_http_headers(req.headers()); let otel_context = extract_otel_context_from_http_headers(req.headers());
// Generate a request ID if the client didn't provide one so that workers // Ensure every inference request has a request_id on the span.
// (which read x_dynamo_request_id from the span/trace context) always // This is the single source of truth — workers and get_or_create_request_id
// have a consistent ID to correlate with. // read it back via DistributedTraceIdLayer.
let x_dynamo_request_id = trace_parent let request_id = trace_parent
.x_dynamo_request_id .request_id
.unwrap_or_else(|| Uuid::new_v4().to_string()); .unwrap_or_else(|| Uuid::new_v4().to_string());
let span = tracing::info_span!( let span = tracing::info_span!(
...@@ -317,10 +318,14 @@ pub fn make_inference_request_span<B>(req: &Request<B>) -> Span { ...@@ -317,10 +318,14 @@ pub fn make_inference_request_span<B>(req: &Request<B>) -> Span {
trace_id = trace_parent.trace_id, trace_id = trace_parent.trace_id,
parent_id = trace_parent.parent_id, parent_id = trace_parent.parent_id,
x_request_id = trace_parent.x_request_id, x_request_id = trace_parent.x_request_id,
x_dynamo_request_id = %x_dynamo_request_id, request_id = %request_id,
model = tracing::field::Empty, model = tracing::field::Empty,
input_tokens = tracing::field::Empty, input_tokens = tracing::field::Empty,
output_tokens = tracing::field::Empty, output_tokens = tracing::field::Empty,
ttft_ms = tracing::field::Empty,
avg_itl_ms = tracing::field::Empty,
prefill_worker_id = tracing::field::Empty,
decode_worker_id = tracing::field::Empty,
); );
if let Some(context) = otel_context { if let Some(context) = otel_context {
...@@ -330,19 +335,48 @@ pub fn make_inference_request_span<B>(req: &Request<B>) -> Span { ...@@ -330,19 +335,48 @@ pub fn make_inference_request_span<B>(req: &Request<B>) -> Span {
span span
} }
/// Create a span for system endpoints (health, metrics, models, etc.). /// Create a span for system endpoints (health, metrics, models, engine, loras, etc.).
/// ///
/// Uses `target: "system_span"` which follows normal DYN_LOG filtering — these /// Same structure as `make_inference_request_span` but uses `target: "system_span"`
/// endpoints are polled frequently and don't need to be visible at INFO level. /// which follows normal DYN_LOG filtering (debug level by default). The inference
/// span target `request_span` is always-on via a `request_span=trace` directive;
/// system spans are not, keeping high-frequency polling endpoints quiet.
pub fn make_system_request_span<B>(req: &Request<B>) -> Span { pub fn make_system_request_span<B>(req: &Request<B>) -> Span {
let method = req.method(); let method = req.method();
let uri = req.uri(); let uri = req.uri();
tracing::debug_span!( let version = format!("{:?}", req.version());
let trace_parent = TraceParent::from_headers(req.headers());
let otel_context = extract_otel_context_from_http_headers(req.headers());
// Ensure every system request has a request_id on the span.
let request_id = trace_parent
.request_id
.unwrap_or_else(|| Uuid::new_v4().to_string());
let span = tracing::debug_span!(
target: "system_span", target: "system_span",
"http-request", "http-request",
method = %method, method = %method,
uri = %uri, uri = %uri,
) version = %version,
trace_id = trace_parent.trace_id,
parent_id = trace_parent.parent_id,
x_request_id = trace_parent.x_request_id,
request_id = %request_id,
model = tracing::field::Empty,
input_tokens = tracing::field::Empty,
output_tokens = tracing::field::Empty,
ttft_ms = tracing::field::Empty,
avg_itl_ms = tracing::field::Empty,
prefill_worker_id = tracing::field::Empty,
decode_worker_id = tracing::field::Empty,
);
if let Some(context) = otel_context {
let _ = span.set_parent(context);
}
span
} }
/// Extract OpenTelemetry context from HTTP headers for distributed tracing /// Extract OpenTelemetry context from HTTP headers for distributed tracing
...@@ -399,7 +433,7 @@ pub fn make_handle_payload_span( ...@@ -399,7 +433,7 @@ pub fn make_handle_payload_span(
trace_id = trace_id.as_str(), trace_id = trace_id.as_str(),
parent_id = parent_id.as_str(), parent_id = parent_id.as_str(),
x_request_id = trace_parent.x_request_id, x_request_id = trace_parent.x_request_id,
x_dynamo_request_id = trace_parent.x_dynamo_request_id, request_id = trace_parent.request_id,
tracestate = trace_parent.tracestate, tracestate = trace_parent.tracestate,
component = component, component = component,
endpoint = endpoint, endpoint = endpoint,
...@@ -416,7 +450,7 @@ pub fn make_handle_payload_span( ...@@ -416,7 +450,7 @@ pub fn make_handle_payload_span(
target: "request_span", target: "request_span",
"handle_payload", "handle_payload",
x_request_id = trace_parent.x_request_id, x_request_id = trace_parent.x_request_id,
x_dynamo_request_id = trace_parent.x_dynamo_request_id, request_id = trace_parent.request_id,
tracestate = trace_parent.tracestate, tracestate = trace_parent.tracestate,
component = component, component = component,
endpoint = endpoint, endpoint = endpoint,
...@@ -436,7 +470,11 @@ pub fn make_handle_payload_span_from_tcp_headers( ...@@ -436,7 +470,11 @@ pub fn make_handle_payload_span_from_tcp_headers(
) -> Span { ) -> Span {
let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers); let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
let x_request_id = headers.get("x-request-id").cloned(); let x_request_id = headers.get("x-request-id").cloned();
let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned(); let request_id = headers
.get("request-id")
.or_else(|| headers.get("x-dynamo-request-id"))
.filter(|id| uuid::Uuid::parse_str(id).is_ok())
.cloned();
let tracestate = headers.get("tracestate").cloned(); let tracestate = headers.get("tracestate").cloned();
if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) { if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
...@@ -446,7 +484,7 @@ pub fn make_handle_payload_span_from_tcp_headers( ...@@ -446,7 +484,7 @@ pub fn make_handle_payload_span_from_tcp_headers(
trace_id = trace_id.as_str(), trace_id = trace_id.as_str(),
parent_id = parent_id.as_str(), parent_id = parent_id.as_str(),
x_request_id = x_request_id, x_request_id = x_request_id,
x_dynamo_request_id = x_dynamo_request_id, request_id = request_id,
tracestate = tracestate, tracestate = tracestate,
component = component, component = component,
endpoint = endpoint, endpoint = endpoint,
...@@ -463,7 +501,7 @@ pub fn make_handle_payload_span_from_tcp_headers( ...@@ -463,7 +501,7 @@ pub fn make_handle_payload_span_from_tcp_headers(
target: "request_span", target: "request_span",
"handle_payload", "handle_payload",
x_request_id = x_request_id, x_request_id = x_request_id,
x_dynamo_request_id = x_dynamo_request_id, request_id = request_id,
tracestate = tracestate, tracestate = tracestate,
component = component, component = component,
endpoint = endpoint, endpoint = endpoint,
...@@ -599,8 +637,8 @@ pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<Str ...@@ -599,8 +637,8 @@ pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<Str
if let Some(x_request_id) = trace_context.x_request_id { if let Some(x_request_id) = trace_context.x_request_id {
headers.insert("x-request-id".to_string(), x_request_id); headers.insert("x-request-id".to_string(), x_request_id);
} }
if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id { if let Some(request_id) = trace_context.request_id {
headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id); headers.insert("request-id".to_string(), request_id);
} }
} }
} }
...@@ -632,8 +670,6 @@ pub fn make_client_request_span( ...@@ -632,8 +670,6 @@ pub fn make_client_request_span(
trace_id = ctx.trace_id.as_str(), trace_id = ctx.trace_id.as_str(),
parent_id = ctx.span_id.as_str(), parent_id = ctx.span_id.as_str(),
x_request_id = ctx.x_request_id.as_deref(), x_request_id = ctx.x_request_id.as_deref(),
x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
// tracestate = ctx.tracestate.as_deref(),
) )
} else { } else {
tracing::info_span!( tracing::info_span!(
...@@ -643,8 +679,6 @@ pub fn make_client_request_span( ...@@ -643,8 +679,6 @@ pub fn make_client_request_span(
trace_id = ctx.trace_id.as_str(), trace_id = ctx.trace_id.as_str(),
parent_id = ctx.span_id.as_str(), parent_id = ctx.span_id.as_str(),
x_request_id = ctx.x_request_id.as_deref(), x_request_id = ctx.x_request_id.as_deref(),
x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
// tracestate = ctx.tracestate.as_deref(),
) )
}; };
...@@ -711,7 +745,7 @@ where ...@@ -711,7 +745,7 @@ where
let mut parent_id: Option<String> = None; let mut parent_id: Option<String> = None;
let mut span_id: Option<String> = None; let mut span_id: Option<String> = None;
let mut x_request_id: Option<String> = None; let mut x_request_id: Option<String> = None;
let mut x_dynamo_request_id: Option<String> = None; let mut request_id: Option<String> = None;
let mut tracestate: Option<String> = None; let mut tracestate: Option<String> = None;
let mut visitor = FieldVisitor::default(); let mut visitor = FieldVisitor::default();
attrs.record(&mut visitor); attrs.record(&mut visitor);
...@@ -753,9 +787,11 @@ where ...@@ -753,9 +787,11 @@ where
x_request_id = Some(x_request_id_input.to_string()); x_request_id = Some(x_request_id_input.to_string());
} }
// Extract x_dynamo_request_id // Extract request_id (with backward compat for x_dynamo_request_id)
if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") { if let Some(request_id_input) = visitor.fields.get("request_id") {
x_dynamo_request_id = Some(x_request_id_input.to_string()); request_id = Some(request_id_input.to_string());
} else if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
request_id = Some(x_request_id_input.to_string());
} }
// Inherit trace context from parent span if available // Inherit trace context from parent span if available
...@@ -787,7 +823,7 @@ where ...@@ -787,7 +823,7 @@ where
parent_id, parent_id,
tracestate, tracestate,
x_request_id, x_request_id,
x_dynamo_request_id, request_id,
}); });
} }
} }
...@@ -820,7 +856,7 @@ where ...@@ -820,7 +856,7 @@ where
let parent_id = pending.parent_id; let parent_id = pending.parent_id;
let tracestate = pending.tracestate; let tracestate = pending.tracestate;
let x_request_id = pending.x_request_id; let x_request_id = pending.x_request_id;
let x_dynamo_request_id = pending.x_dynamo_request_id; let request_id = pending.request_id;
// Try to extract from OtelData if not already set // Try to extract from OtelData if not already set
// Need to drop extensions_mut to get immutable borrow for OtelData // Need to drop extensions_mut to get immutable borrow for OtelData
...@@ -872,7 +908,7 @@ where ...@@ -872,7 +908,7 @@ where
start: Some(Instant::now()), start: Some(Instant::now()),
end: None, end: None,
x_request_id, x_request_id,
x_dynamo_request_id, request_id,
}); });
drop(extensions); drop(extensions);
...@@ -1309,14 +1345,16 @@ where ...@@ -1309,14 +1345,16 @@ where
visitor.fields.remove("x_request_id"); visitor.fields.remove("x_request_id");
} }
if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() { if let Some(request_id) = tracing_context.request_id.clone() {
visitor.fields.insert( visitor.fields.insert(
"x_dynamo_request_id".to_string(), "request_id".to_string(),
serde_json::Value::String(x_dynamo_request_id), serde_json::Value::String(request_id),
); );
} else { } else {
visitor.fields.remove("x_dynamo_request_id"); visitor.fields.remove("request_id");
} }
// Remove old field name if present
visitor.fields.remove("x_dynamo_request_id");
} else { } else {
tracing::error!( tracing::error!(
"Distributed Trace Context not found, falling back to internal ids" "Distributed Trace Context not found, falling back to internal ids"
......
...@@ -263,7 +263,7 @@ async fn handle_shared_request( ...@@ -263,7 +263,7 @@ async fn handle_shared_request(
trace_id = traceparent.trace_id, trace_id = traceparent.trace_id,
parent_id = traceparent.parent_id, parent_id = traceparent.parent_id,
x_request_id = traceparent.x_request_id, x_request_id = traceparent.x_request_id,
x_dynamo_request_id = traceparent.x_dynamo_request_id, request_id = traceparent.request_id,
tracestate = traceparent.tracestate tracestate = traceparent.tracestate
)) ))
.await; .await;
...@@ -308,10 +308,19 @@ impl TraceParent { ...@@ -308,10 +308,19 @@ impl TraceParent {
traceparent.x_request_id = Some(s.to_string()); traceparent.x_request_id = Some(s.to_string());
} }
if let Some(value) = headers.get("x-dynamo-request-id") // Read request-id from internal headers, with fallback to deprecated x-dynamo-request-id
&& let Ok(s) = value.to_str() if let Some(s) = headers
.get("request-id")
.and_then(|v| v.to_str().ok())
.filter(|s| uuid::Uuid::parse_str(s).is_ok())
{
traceparent.request_id = Some(s.to_string());
} else if let Some(s) = headers
.get("x-dynamo-request-id")
.and_then(|v| v.to_str().ok())
.filter(|s| uuid::Uuid::parse_str(s).is_ok())
{ {
traceparent.x_dynamo_request_id = Some(s.to_string()); traceparent.request_id = Some(s.to_string());
} }
traceparent traceparent
...@@ -374,13 +383,19 @@ mod tests { ...@@ -374,13 +383,19 @@ mod tests {
headers.insert("traceparent", "test-trace-id".parse().unwrap()); headers.insert("traceparent", "test-trace-id".parse().unwrap());
headers.insert("tracestate", "test-state".parse().unwrap()); headers.insert("tracestate", "test-state".parse().unwrap());
headers.insert("x-request-id", "req-123".parse().unwrap()); headers.insert("x-request-id", "req-123".parse().unwrap());
headers.insert("x-dynamo-request-id", "dyn-456".parse().unwrap()); headers.insert(
"x-dynamo-request-id",
"550e8400-e29b-41d4-a716-446655440000".parse().unwrap(),
);
let traceparent = TraceParent::from_axum_headers(&headers); let traceparent = TraceParent::from_axum_headers(&headers);
assert_eq!(traceparent.trace_id, Some("test-trace-id".to_string())); assert_eq!(traceparent.trace_id, Some("test-trace-id".to_string()));
assert_eq!(traceparent.tracestate, Some("test-state".to_string())); assert_eq!(traceparent.tracestate, Some("test-state".to_string()));
assert_eq!(traceparent.x_request_id, Some("req-123".to_string())); assert_eq!(traceparent.x_request_id, Some("req-123".to_string()));
assert_eq!(traceparent.x_dynamo_request_id, Some("dyn-456".to_string())); assert_eq!(
traceparent.request_id,
Some("550e8400-e29b-41d4-a716-446655440000".to_string())
);
} }
#[test] #[test]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment