Unverified Commit 0cb01b3f authored by Neelay Shah's avatar Neelay Shah Committed by GitHub
Browse files

feat: updates to structured logging (#2061)


Signed-off-by: Neelay Shah <neelays@nvidia.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent 803bfa81
......@@ -252,7 +252,7 @@ dependencies = [
"eventsource-stream",
"futures",
"rand 0.8.5",
"reqwest",
"reqwest 0.12.22",
"reqwest-eventsource",
"secrecy",
"serde",
......@@ -634,15 +634,30 @@ dependencies = [
"rayon",
]
[[package]]
name = "bit-set"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1"
dependencies = [
"bit-vec 0.6.3",
]
[[package]]
name = "bit-set"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3"
dependencies = [
"bit-vec",
"bit-vec 0.8.0",
]
[[package]]
name = "bit-vec"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
[[package]]
name = "bit-vec"
version = "0.8.0"
......@@ -736,6 +751,12 @@ version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "bytecount"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "bytemuck"
version = "1.23.1"
......@@ -943,7 +964,7 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
"nom 7.1.3",
]
[[package]]
......@@ -1906,7 +1927,7 @@ dependencies = [
"rand 0.9.1",
"rayon",
"regex",
"reqwest",
"reqwest 0.12.22",
"rmp-serde",
"rstest 0.18.2",
"rstest_reuse",
......@@ -1986,6 +2007,7 @@ dependencies = [
"figment",
"futures",
"humantime",
"jsonschema",
"local-ip-address",
"log",
"nid",
......@@ -1995,12 +2017,14 @@ dependencies = [
"prometheus",
"rand 0.9.1",
"regex",
"reqwest",
"reqwest 0.12.22",
"rstest 0.23.0",
"serde",
"serde_json",
"socket2",
"stdio-override",
"temp-env",
"tempfile",
"thiserror 2.0.12",
"tokio",
"tokio-stream",
......@@ -2246,7 +2270,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab"
dependencies = [
"futures-core",
"nom",
"nom 7.1.3",
"pin-project-lite",
]
......@@ -2271,13 +2295,23 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af9673d8203fcb076b19dfd17e38b3d4ae9f44959416ea532ce72415a6020365"
[[package]]
name = "fancy-regex"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b95f7c0680e4142284cf8b22c14a476e87d61b004a3a0861872b32ef7ead40a2"
dependencies = [
"bit-set 0.5.3",
"regex",
]
[[package]]
name = "fancy-regex"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e24cb5a94bcae1e5408b0effca5cd7172ea3c5755049c5f3af4cd283a165298"
dependencies = [
"bit-set",
"bit-set 0.8.0",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
......@@ -2406,6 +2440,16 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fraction"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3027ae1df8d41b4bed2241c8fdad4acc1e7af60c8e17743534b545e77182d678"
dependencies = [
"lazy_static",
"num",
]
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
......@@ -2846,7 +2890,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ac5654356c6f7f6116905aeaf92ab002c3d03414ada5dbe0bb2e32aa5fea173"
dependencies = [
"fancy-regex",
"fancy-regex 0.14.0",
"ggml-quants",
"indexmap 2.9.0",
"log",
......@@ -2888,6 +2932,25 @@ dependencies = [
"regex-syntax 0.8.5",
]
[[package]]
name = "h2"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http 0.2.12",
"indexmap 2.9.0",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "h2"
version = "0.4.9"
......@@ -2963,7 +3026,7 @@ dependencies = [
"base64 0.21.7",
"byteorder",
"flate2",
"nom",
"nom 7.1.3",
"num-traits",
]
......@@ -3008,7 +3071,7 @@ dependencies = [
"log",
"num_cpus",
"rand 0.8.5",
"reqwest",
"reqwest 0.12.22",
"serde",
"serde_json",
"thiserror 2.0.12",
......@@ -3152,6 +3215,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2 0.3.27",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
......@@ -3174,7 +3238,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"httparse",
......@@ -3237,7 +3301,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"socket2",
"system-configuration",
"system-configuration 0.6.1",
"tokio",
"tower-service",
"tracing",
......@@ -3567,6 +3631,15 @@ version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "iso8601"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1082f0c48f143442a1ac6122f67e360ceee130b967af4d50996e5154a45df46"
dependencies = [
"nom 8.0.0",
]
[[package]]
name = "itertools"
version = "0.10.5"
......@@ -3659,6 +3732,36 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonschema"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a071f4f7efc9a9118dfb627a0a94ef247986e1ab8606a4c806ae2b3aa3b6978"
dependencies = [
"ahash",
"anyhow",
"base64 0.21.7",
"bytecount",
"clap 4.5.40",
"fancy-regex 0.11.0",
"fraction",
"getrandom 0.2.16",
"iso8601",
"itoa",
"memchr",
"num-cmp",
"once_cell",
"parking_lot",
"percent-encoding",
"regex",
"reqwest 0.11.27",
"serde",
"serde_json",
"time",
"url",
"uuid 1.17.0",
]
[[package]]
name = "jwalk"
version = "0.8.1"
......@@ -4011,7 +4114,7 @@ dependencies = [
"futures",
"prometheus",
"rand 0.9.1",
"reqwest",
"reqwest 0.12.22",
"serde",
"serde_json",
"thiserror 2.0.12",
......@@ -4142,7 +4245,7 @@ dependencies = [
"indexmap 2.9.0",
"mistralrs-core",
"rand 0.9.1",
"reqwest",
"reqwest 0.12.22",
"serde",
"serde_json",
"tokio",
......@@ -4222,7 +4325,7 @@ dependencies = [
"rayon",
"regex",
"regex-automata 0.4.9",
"reqwest",
"reqwest 0.12.22",
"rubato",
"rust-mcp-schema",
"rustc-hash 2.1.1",
......@@ -4263,7 +4366,7 @@ dependencies = [
"async-trait",
"futures-util",
"http 1.3.1",
"reqwest",
"reqwest 0.12.22",
"rust-mcp-schema",
"serde",
"serde_json",
......@@ -4497,6 +4600,15 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nom"
version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405"
dependencies = [
"memchr",
]
[[package]]
name = "nonmax"
version = "0.5.5"
......@@ -4555,6 +4667,12 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-cmp"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa"
[[package]]
name = "num-complex"
version = "0.4.6"
......@@ -5187,8 +5305,8 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14cae93065090804185d3b75f0bf93b8eeda30c7a9b4a33d3bdb3988d6229e50"
dependencies = [
"bit-set",
"bit-vec",
"bit-set 0.8.0",
"bit-vec 0.8.0",
"bitflags 2.9.0",
"lazy_static",
"num-traits",
......@@ -5666,6 +5784,42 @@ version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
[[package]]
name = "reqwest"
version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.3.27",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.32",
"ipnet",
"js-sys",
"log",
"mime",
"once_cell",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper 0.1.2",
"system-configuration 0.5.1",
"tokio",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "reqwest"
version = "0.12.22"
......@@ -5678,7 +5832,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
......@@ -5723,9 +5877,9 @@ dependencies = [
"futures-core",
"futures-timer",
"mime",
"nom",
"nom 7.1.3",
"pin-project-lite",
"reqwest",
"reqwest 0.12.22",
"thiserror 1.0.69",
]
......@@ -6556,7 +6710,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5851699c4033c63636f7ea4cf7b7c1f1bf06d0cc03cfb42e711de5a5c46cf326"
dependencies = [
"base64 0.13.1",
"nom",
"nom 7.1.3",
"serde",
"unicode-segmentation",
]
......@@ -6573,6 +6727,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stdio-override"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cffa8a2e517b4e9f270c47e1c4120df90506d9451c1efa67e3698d66446d30ce"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]]
name = "stop-words"
version = "0.8.1"
......@@ -6879,6 +7043,17 @@ dependencies = [
"windows",
]
[[package]]
name = "system-configuration"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation 0.9.4",
"system-configuration-sys 0.5.0",
]
[[package]]
name = "system-configuration"
version = "0.6.1"
......@@ -6887,7 +7062,17 @@ checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b"
dependencies = [
"bitflags 2.9.0",
"core-foundation 0.9.4",
"system-configuration-sys",
"system-configuration-sys 0.6.0",
]
[[package]]
name = "system-configuration-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
......@@ -7113,7 +7298,7 @@ dependencies = [
"dary_heap",
"derive_builder",
"esaxx-rs",
"fancy-regex",
"fancy-regex 0.14.0",
"getrandom 0.3.2",
"hf-hub",
"itertools 0.14.0",
......@@ -7350,7 +7535,7 @@ dependencies = [
"axum 0.7.9",
"base64 0.22.1",
"bytes",
"h2",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
......@@ -8404,6 +8589,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if 1.0.0",
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"
......
......@@ -70,3 +70,6 @@ env_logger = { version = "0.11" }
reqwest = { workspace = true }
rstest = { version = "0.23.0" }
temp-env = { version = "0.3.6" , features=["async_closure"] }
stdio-override = {version= "0.2.0"}
jsonschema = {version = "0.17"}
tempfile = { workspace = true }
\ No newline at end of file
......@@ -14,6 +14,7 @@
// limitations under the License.
use crate::config::HealthStatus;
use crate::logging::TraceParent;
use crate::metrics::MetricsRegistry;
use crate::traits::DistributedRuntimeProvider;
use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router};
......@@ -25,6 +26,7 @@ use std::time::Instant;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing;
use tracing::Instrument;
/// HTTP server information containing socket address and handle
#[derive(Debug)]
......@@ -160,26 +162,35 @@ pub async fn spawn_http_server(
"/health",
get({
let state = Arc::clone(&server_state);
move || health_handler(state.clone())
move |tracing_ctx| health_handler(state, "health", tracing_ctx)
}),
)
.route(
"/live",
get({
let state = Arc::clone(&server_state);
move || health_handler(state)
move |tracing_ctx| health_handler(state, "live", tracing_ctx)
}),
)
.route(
"/metrics",
get({
let state = Arc::clone(&server_state);
move || metrics_handler(state)
move |tracing_ctx| metrics_handler(state, "metrics", tracing_ctx)
}),
)
.fallback(|| async {
tracing::info!("[fallback handler] called");
(StatusCode::NOT_FOUND, "Route not found").into_response()
.fallback(|tracing_ctx: TraceParent| {
async {
tracing::info!("[fallback handler] called");
(StatusCode::NOT_FOUND, "Route not found").into_response()
}
.instrument(tracing::trace_span!(
"fallback handler",
trace_id = tracing_ctx.trace_id,
parent_id = tracing_ctx.parent_id,
x_request_id = tracing_ctx.x_request_id,
tracestate = tracing_ctx.tracestate
))
});
let address = format!("{}:{}", host, port);
......@@ -216,8 +227,16 @@ pub async fn spawn_http_server(
}
/// Health handler
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
#[tracing::instrument(skip_all, level="trace", fields(route= %route,
trace_id = trace_parent.trace_id,
parent_id = trace_parent.parent_id,
x_request_id= trace_parent.x_request_id,
tracestate= trace_parent.tracestate))]
async fn health_handler(
state: Arc<HttpServerState>,
route: &'static str, // Used for tracing only
trace_parent: TraceParent, // Used for tracing only
) -> impl IntoResponse {
let system_health = state.drt().system_health.lock().await;
let (mut healthy, endpoints) = system_health.get_health_status();
let uptime = match state.uptime() {
......@@ -248,7 +267,16 @@ async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
}
/// Metrics handler with DistributedRuntime uptime
async fn metrics_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
#[tracing::instrument(skip_all, level="trace", fields(route= %route,
trace_id = trace_parent.trace_id,
parent_id = trace_parent.parent_id,
x_request_id = trace_parent.x_request_id,
tracestate = trace_parent.tracestate))]
async fn metrics_handler(
state: Arc<HttpServerState>,
route: &'static str, // Used for tracing only
trace_parent: TraceParent, // Used for tracing only
) -> impl IntoResponse {
// Update the uptime gauge with current value
state.update_uptime_gauge();
......@@ -281,9 +309,17 @@ async fn create_test_drt_async() -> crate::DistributedRuntime {
#[cfg(test)]
mod tests {
use super::*;
use crate::logging::tests::load_log;
use crate::metrics::MetricsRegistry;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use jsonschema::{Draft, JSONSchema};
use rstest::rstest;
use serde_json::Value;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::sync::Arc;
use stdio_override::*;
use tokio::time::{sleep, Duration};
#[tokio::test]
......@@ -358,10 +394,10 @@ dynamo_uptime_seconds{namespace=\"http_server\"} 42
}
#[rstest]
#[cfg(feature = "integration")]
#[case("ready", 200, "ready")]
#[case("notready", 503, "notready")]
#[tokio::test]
#[cfg(feature = "integration")]
async fn test_health_endpoints(
#[case] starting_health_status: &'static str,
#[case] expected_status: u16,
......@@ -375,6 +411,8 @@ dynamo_uptime_seconds{namespace=\"http_server\"} 42
// Closure call is needed here to satisfy async_with_vars
crate::logging::init();
#[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars(
[(
......@@ -402,6 +440,14 @@ dynamo_uptime_seconds{namespace=\"http_server\"} 42
("/someRandomPathNotFoundHere", 404, "Route not found"),
] {
println!("[test] Sending request to {}", path);
// Build W3C trace-context headers and attach them to the request so the
// handlers' `TraceParent` extractor has something to parse.
let traceparent_value =
    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
let mut headers = reqwest::header::HeaderMap::new();
// `from_static` is used for the values as well: both are valid literals,
// and it avoids a `?` whose error type the enclosing closure may not accept.
headers.insert(
    reqwest::header::HeaderName::from_static("traceparent"),
    reqwest::header::HeaderValue::from_static(traceparent_value),
);
headers.insert(
    reqwest::header::HeaderName::from_static("tracestate"),
    reqwest::header::HeaderValue::from_static(tracestate_value),
);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).headers(headers).send().await.unwrap();
let status = response.status();
......@@ -427,6 +473,67 @@ dynamo_uptime_seconds{namespace=\"http_server\"} 42
.await;
}
/// Smoke test: starts the HTTP server with JSONL trace logging enabled and
/// sends requests carrying `traceparent` / `tracestate` headers to the health,
/// liveness and fallback routes.
///
/// Responses are only logged, not asserted on (see the TODO below). Any error
/// raised inside the scoped-environment closure is returned from the test so
/// that it fails instead of being silently discarded.
#[tokio::test]
#[cfg(feature = "integration")]
async fn test_health_endpoint_tracing() -> Result<()> {
    use std::sync::Arc;
    use tokio::time::sleep;
    use tokio_util::sync::CancellationToken;

    // Closure call is needed here to satisfy async_with_vars
    #[allow(clippy::redundant_closure_call)]
    let outcome = temp_env::async_with_vars(
        [
            ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
            ("DYN_LOGGING_JSONL", Some("1")),
            ("DYN_LOG", Some("trace")),
        ],
        (async || {
            // TODO Add proper testing for
            // trace id and parent id
            crate::logging::init();
            let runtime = crate::Runtime::from_settings().unwrap();
            let drt = Arc::new(
                crate::DistributedRuntime::from_settings_without_discovery(runtime)
                    .await
                    .unwrap(),
            );
            let cancel_token = CancellationToken::new();
            let (addr, _) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt)
                .await
                .unwrap();
            // Give the freshly spawned server a moment before sending requests.
            sleep(std::time::Duration::from_millis(1000)).await;
            let client = reqwest::Client::new();
            for path in ["/health", "/live", "/someRandomPathNotFoundHere"] {
                let traceparent_value =
                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
                let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
                let mut headers = reqwest::header::HeaderMap::new();
                headers.insert(
                    reqwest::header::HeaderName::from_static("traceparent"),
                    reqwest::header::HeaderValue::from_str(traceparent_value)?,
                );
                headers.insert(
                    reqwest::header::HeaderName::from_static("tracestate"),
                    reqwest::header::HeaderValue::from_str(tracestate_value)?,
                );
                let url = format!("http://{}{}", addr, path);
                let response = client.get(&url).headers(headers).send().await?;
                let status = response.status();
                let body = response.text().await?;
                tracing::info!(body = body, status = status.to_string());
            }
            Ok::<(), anyhow::Error>(())
        })(),
    )
    .await;

    // Propagate the closure's result; dropping it would hide every `?` failure above.
    outcome
}
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_uptime_without_initialization() {
......
......@@ -48,6 +48,22 @@ use tracing_subscriber::EnvFilter;
use tracing_subscriber::{filter::Directive, fmt};
use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use serde_json::Value;
use std::convert::Infallible;
use std::time::Instant;
use tracing::field::Field;
use tracing::span;
use tracing::Id;
use tracing::Span;
use tracing_subscriber::field::Visit;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::SpanData;
use tracing_subscriber::Layer;
use tracing_subscriber::Registry;
use uuid::Uuid;
/// ENV used to set the log level
const FILTER_ENV: &str = "DYN_LOG";
......@@ -87,6 +103,239 @@ impl Default for LoggingConfig {
}
}
/// Produce a fresh trace ID: a random (v4) UUID rendered in its "simple"
/// form, i.e. 32 lowercase hex characters with no hyphens, which is the
/// shape W3C Trace Context expects for `trace-id`.
fn generate_trace_id() -> String {
    let random_id = Uuid::new_v4();
    random_id.simple().to_string()
}
/// Produce a fresh span ID: 16 lowercase hex characters, the shape W3C Trace
/// Context expects for `parent-id`.
fn generate_span_id() -> String {
    // The hyphen-free rendering of a UUID is 32 hex characters in byte order,
    // so its first 16 characters are exactly the first 8 bytes of the UUID.
    let mut hex = Uuid::new_v4().simple().to_string();
    hex.truncate(16);
    hex
}
/// Validate a given trace ID according to W3C Trace Context specifications.
///
/// A valid trace ID is exactly 32 lowercase hexadecimal characters
/// (`0-9`, `a-f`) and is not all zeros; the all-zero value is reserved by the
/// specification as "invalid".
///
/// Returns `true` only when every one of those conditions holds.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    let bytes = trace_id.as_bytes();
    bytes.len() == 32
        // Lowercase hex only: uppercase digits are not allowed by the spec.
        && bytes.iter().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        // Reject the reserved all-zero identifier.
        && bytes.iter().any(|&b| b != b'0')
}
/// Validate a given span ID according to W3C Trace Context specifications.
///
/// A valid span ID is exactly 16 lowercase hexadecimal characters
/// (`0-9`, `a-f`) and is not all zeros; the all-zero value is reserved by the
/// specification as "invalid".
///
/// Returns `true` only when every one of those conditions holds.
pub fn is_valid_span_id(span_id: &str) -> bool {
    let bytes = span_id.as_bytes();
    bytes.len() == 16
        // Lowercase hex only: uppercase digits are not allowed by the spec.
        && bytes.iter().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        // Reject the reserved all-zero identifier.
        && bytes.iter().any(|&b| b != b'0')
}
/// Marker type for the `tracing_subscriber` layer that attaches a
/// [`DistributedTraceContext`] to every new span (see its `Layer` impl).
pub struct DistributedTraceIdLayer;
/// Per-span distributed tracing state, stored in the span's extensions.
///
/// `Debug` is derived so the context can be inspected in logs and test
/// failures, matching [`TraceParent`].
#[derive(Debug, Clone)]
pub struct DistributedTraceContext {
    /// Trace ID shared by every span in the same trace (32 hex characters).
    trace_id: String,
    /// ID of this span (16 hex characters).
    span_id: String,
    /// Span ID of the parent, when there is one.
    parent_id: Option<String>,
    /// Opaque `tracestate` value carried along with the trace, if any.
    tracestate: Option<String>,
    /// Instant at which the context was created for the span.
    start: Instant,
    /// Instant at which the span closed; `None` while it is still open.
    end: Option<Instant>,
    /// Value of the `x_request_id` span field, if one was supplied.
    x_request_id: Option<String>,
}
/// Trace-context values pulled out of an incoming HTTP request's headers by
/// the `FromRequestParts` implementation. Every field is `None` when the
/// corresponding header is absent or unusable.
#[derive(Debug, Clone)]
pub struct TraceParent {
    /// Trace ID taken from the `traceparent` header, if it validated.
    pub trace_id: Option<String>,
    /// Parent span ID taken from the `traceparent` header, if it validated.
    pub parent_id: Option<String>,
    /// Raw value of the `tracestate` header.
    pub tracestate: Option<String>,
    /// Raw value of the `x-request-id` header.
    pub x_request_id: Option<String>,
}
/// Axum extractor: builds a [`TraceParent`] from request headers. Extraction
/// never fails; anything missing or malformed simply yields `None` fields.
impl<S> FromRequestParts<S> for TraceParent
where
    S: Send + Sync,
{
    type Rejection = Infallible;

    /// Reads `traceparent`, `tracestate` and `x-request-id` from `parts`.
    ///
    /// `traceparent` is split on `-` and must have exactly four pieces; only
    /// the second (trace ID) and third (parent span ID) are used, and both
    /// must validate or neither is kept.
    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
        let headers = &parts.headers;

        let traceparent = headers
            .get("traceparent")
            .and_then(|raw| raw.to_str().ok());
        let (trace_id, parent_id) = match traceparent {
            Some(header_str) => {
                let pieces: Vec<&str> = header_str.split('-').collect();
                match pieces.as_slice() {
                    &[_, candidate_trace_id, candidate_parent_id, _] => {
                        if is_valid_trace_id(candidate_trace_id)
                            && is_valid_span_id(candidate_parent_id)
                        {
                            (
                                Some(candidate_trace_id.to_string()),
                                Some(candidate_parent_id.to_string()),
                            )
                        } else {
                            tracing::debug!("Invalid traceparent header: {}", header_str);
                            (None, None)
                        }
                    }
                    // Wrong number of pieces: ignored without logging.
                    _ => (None, None),
                }
            }
            None => (None, None),
        };

        let tracestate = headers
            .get("tracestate")
            .and_then(|raw| raw.to_str().ok())
            .map(str::to_owned);

        // Header-name lookup is case-insensitive, so this also matches `X-Request-ID`.
        let x_request_id = match headers.get("x-request-id") {
            Some(raw) => raw.to_str().ok().map(str::to_owned),
            None => None,
        };

        Ok(TraceParent {
            trace_id,
            parent_id,
            tracestate,
            x_request_id,
        })
    }
}
/// Field visitor that collects recorded span/event fields as strings,
/// keyed by field name.
#[derive(Debug, Default)]
pub struct FieldVisitor {
    /// Field name -> recorded value (string fields verbatim, all others via
    /// their `Debug` formatting).
    pub fields: HashMap<String, String>,
}
impl Visit for FieldVisitor {
    /// Stores a string-valued field verbatim under its field name.
    fn record_str(&mut self, field: &Field, value: &str) {
        let key = field.name().to_owned();
        self.fields.insert(key, value.to_owned());
    }

    /// Stores any other field under its name using its `Debug` rendering.
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        let rendered = format!("{:?}", value);
        self.fields.insert(field.name().to_owned(), rendered);
    }
}
impl<S> Layer<S> for DistributedTraceIdLayer
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    /// Stamps the span's [`DistributedTraceContext`] with its close time.
    /// The timestamp is not read anywhere yet; it is kept for future timing use.
    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
        let Some(span) = ctx.span(&id) else {
            return;
        };
        let mut extensions = span.extensions_mut();
        if let Some(trace_context) = extensions.get_mut::<DistributedTraceContext>() {
            trace_context.end = Some(Instant::now());
        }
    }

    /// Attaches a [`DistributedTraceContext`] to every new span.
    ///
    /// IDs come from the span's own `trace_id` / `span_id` / `parent_id`
    /// fields when they validate. Without a usable `parent_id`, the trace ID,
    /// parent ID and tracestate are inherited from the current span's context.
    /// Anything still missing is generated.
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        let Some(span) = ctx.span(id) else {
            return;
        };

        let mut visitor = FieldVisitor::default();
        attrs.record(&mut visitor);
        let recorded = &visitor.fields;

        // Explicit IDs supplied as span fields; invalid ones are logged and dropped.
        let mut trace_id: Option<String> = match recorded.get("trace_id") {
            Some(candidate) if is_valid_trace_id(candidate) => Some(candidate.to_string()),
            Some(candidate) => {
                tracing::trace!("trace id '{}' is not valid! Ignoring.", candidate);
                None
            }
            None => None,
        };
        let mut span_id: Option<String> = match recorded.get("span_id") {
            Some(candidate) if is_valid_span_id(candidate) => Some(candidate.to_string()),
            Some(candidate) => {
                tracing::trace!("span id '{}' is not valid! Ignoring.", candidate);
                None
            }
            None => None,
        };
        let mut parent_id: Option<String> = match recorded.get("parent_id") {
            Some(candidate) if is_valid_span_id(candidate) => Some(candidate.to_string()),
            Some(candidate) => {
                tracing::trace!("parent id '{}' is not valid! Ignoring.", candidate);
                None
            }
            None => None,
        };
        let mut tracestate: Option<String> = recorded.get("tracestate").cloned();
        let x_request_id: Option<String> = recorded.get("x_request_id").cloned();

        // No explicit parent: inherit from the span that is current right now.
        // This also replaces any trace ID taken from the fields above.
        if parent_id.is_none() {
            let current = ctx.current_span();
            let enclosing = current.id().and_then(|current_id| ctx.span(current_id));
            if let Some(enclosing) = enclosing {
                let enclosing_ext = enclosing.extensions();
                if let Some(inherited) = enclosing_ext.get::<DistributedTraceContext>() {
                    trace_id = Some(inherited.trace_id.clone());
                    parent_id = Some(inherited.span_id.clone());
                    tracestate = inherited.tracestate.clone();
                }
            }
        }

        if trace_id.is_none() && (parent_id.is_some() || span_id.is_some()) {
            tracing::error!("parent id or span id are set but trace id is not set!");
            // Clear inconsistent IDs to maintain trace integrity
            parent_id = None;
            span_id = None;
        }

        // Fill in whatever is still missing with freshly generated IDs.
        let trace_id = trace_id.or_else(|| Some(generate_trace_id()));
        let span_id = span_id.or_else(|| Some(generate_span_id()));

        let trace_context = DistributedTraceContext {
            trace_id: trace_id.expect("Trace ID must be set"),
            span_id: span_id.expect("Span ID must be set"),
            parent_id,
            tracestate,
            start: Instant::now(),
            end: None,
            x_request_id,
        };
        let mut extensions = span.extensions_mut();
        extensions.insert(trace_context);
    }
}
/// Returns a copy of the [`DistributedTraceContext`] attached to the current
/// span, so callers can add it to outgoing distributed-tracing headers.
///
/// Yields `None` when there is no current span, when the active subscriber
/// does not downcast to a `Registry`, or when the span has no context stored
/// in its extensions.
pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
    Span::current()
        .with_subscriber(|(id, subscriber)| {
            let registry = subscriber.downcast_ref::<Registry>()?;
            let span_data = registry.span_data(id)?;
            // Bind the clone first so the extensions guard is released
            // before `span_data` goes out of scope.
            let context = span_data
                .extensions()
                .get::<DistributedTraceContext>()
                .cloned();
            context
        })
        .flatten()
}
/// Initialize the logger
pub fn init() {
INIT.call_once(setup_logging);
......@@ -94,7 +343,6 @@ pub fn init() {
#[cfg(feature = "tokio-console")]
fn setup_logging() {
// Start tokio-console server. Returns a tracing-subscriber Layer.
let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
.with_default_env()
.server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
......@@ -116,26 +364,24 @@ fn setup_logging() {
#[cfg(not(feature = "tokio-console"))]
fn setup_logging() {
let f = filters(load_config());
// The generics mean we have to repeat everything. Each builder method returns a
// specialized type.
let filter_layer = filters(load_config());
if jsonl_logging_enabled() {
// JSON logger for NIM
let l = fmt::layer()
.with_ansi(false)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.event_format(CustomJsonFormatter::new())
.with_writer(std::io::stderr)
.with_filter(f);
tracing_subscriber::registry().with(l).init();
.with_filter(filter_layer);
tracing_subscriber::registry()
.with(DistributedTraceIdLayer)
.with(l)
.init();
} else {
// Normal logging
let l = fmt::layer()
.with_ansi(!disable_ansi_logging())
.event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
.with_writer(std::io::stderr)
.with_filter(f);
.with_filter(filter_layer);
tracing_subscriber::registry().with(l).init();
}
}
......@@ -146,7 +392,6 @@ fn filters(config: LoggingConfig) -> EnvFilter {
.with_env_var(FILTER_ENV)
.from_env_lossy();
// apply the log_filters from the config files
for (module, level) in config.log_filters {
match format!("{module}={level}").parse::<Directive>() {
Ok(d) => {
......@@ -182,7 +427,6 @@ pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u
);
}
// TODO: This should be merged into the global config (rust/common/src/config.rs) once we have it
fn load_config() -> LoggingConfig {
let config_path = std::env::var(CONFIG_PATH_ENV).unwrap_or_else(|_| "".to_string());
let figment = Figment::new()
......@@ -198,9 +442,10 @@ struct JsonLog<'a> {
time: String,
level: String,
#[serde(skip_serializing_if = "Option::is_none")]
file_path: Option<&'a str>,
file: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
line_number: Option<u32>,
line: Option<u32>,
target: &'a str,
message: serde_json::Value,
#[serde(flatten)]
fields: BTreeMap<String, serde_json::Value>,
......@@ -220,11 +465,11 @@ impl TimeFormatter {
fn format_now(&self) -> String {
if self.use_local_tz {
chrono::Local::now()
.format("%Y-%m-%dT%H:%M:%S%.3f%:z")
.format("%Y-%m-%dT%H:%M:%S%.6f%:z")
.to_string()
} else {
chrono::Utc::now()
.format("%Y-%m-%dT%H:%M:%S%.3fZ")
.format("%Y-%m-%dT%H:%M:%S%.6fZ")
.to_string()
}
}
......@@ -248,6 +493,23 @@ impl CustomJsonFormatter {
}
}
use once_cell::sync::Lazy;
use regex::Regex;
/// Parses a duration string such as `1.5ms`, `250µs` or `"3s"` (optionally
/// wrapped in single or double quotes) into whole microseconds.
///
/// Recognised units are `ns`, `µs`/`us`, `ms` and `s`. The result is cut down
/// to an integer by an `as u64` cast, so any fractional microsecond is lost.
/// Returns `None` when the text does not match the expected shape or the
/// numeric part does not parse as a float.
fn parse_tracing_duration(s: &str) -> Option<u64> {
    static DURATION_PATTERN: Lazy<Regex> =
        Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());

    let parts = DURATION_PATTERN.captures(s)?;
    let magnitude = parts[1].parse::<f64>().ok()?;

    // Scale to microseconds first, then convert to an integer once.
    let micros = match &parts[2] {
        "ns" => magnitude / 1000.0,
        "µs" | "us" => magnitude,
        "ms" => magnitude * 1000.0,
        "s" => magnitude * 1_000_000.0,
        _ => return None,
    };
    Some(micros as u64)
}
impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
......@@ -260,8 +522,9 @@ where
event: &Event<'_>,
) -> std::fmt::Result {
let mut visitor = JsonVisitor::default();
let time = self.time_formatter.format_now();
event.record(&mut visitor);
let message = visitor
let mut message = visitor
.fields
.remove("message")
.unwrap_or(serde_json::Value::String("".to_string()));
......@@ -284,26 +547,109 @@ where
serde_json::Value::String(value.trim_matches('"').to_string()),
);
}
let busy_us = visitor
.fields
.remove("time.busy")
.and_then(|v| parse_tracing_duration(&v.to_string()));
let idle_us = visitor
.fields
.remove("time.idle")
.and_then(|v| parse_tracing_duration(&v.to_string()));
if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
visitor.fields.insert(
"time.busy_us".to_string(),
serde_json::Value::Number(busy_us.into()),
);
visitor.fields.insert(
"time.idle_us".to_string(),
serde_json::Value::Number(idle_us.into()),
);
visitor.fields.insert(
"time.duration_us".to_string(),
serde_json::Value::Number((busy_us + idle_us).into()),
);
}
message = match message.as_str() {
Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
_ => message.clone(),
};
visitor.fields.insert(
"span_name".to_string(),
serde_json::Value::String(span.name().to_string()),
);
}
if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
visitor.fields.insert(
"span_id".to_string(),
serde_json::Value::String(tracing_context.span_id.clone()),
);
visitor.fields.insert(
"trace_id".to_string(),
serde_json::Value::String(tracing_context.trace_id.clone()),
);
if let Some(parent_id) = tracing_context.parent_id.clone() {
visitor.fields.insert(
"parent_id".to_string(),
serde_json::Value::String(parent_id),
);
} else {
visitor.fields.remove("parent_id");
}
if let Some(tracestate) = tracing_context.tracestate.clone() {
visitor.fields.insert(
"tracestate".to_string(),
serde_json::Value::String(tracestate),
);
} else {
visitor.fields.remove("tracestate");
}
if let Some(x_request_id) = tracing_context.x_request_id.clone() {
visitor.fields.insert(
"x_request_id".to_string(),
serde_json::Value::String(x_request_id),
);
} else {
visitor.fields.remove("x_request_id");
}
} else {
tracing::error!(
"Distributed Trace Context not found, falling back to internal ids"
);
visitor.fields.insert(
"span_id".to_string(),
serde_json::Value::String(span.id().into_u64().to_string()),
);
if let Some(parent) = span.parent() {
visitor.fields.insert(
"parent_id".to_string(),
serde_json::Value::String(parent.id().into_u64().to_string()),
);
}
}
} else {
let reserved_fields = [
"trace_id",
"span_id",
"parent_id",
"span_name",
"tracestate",
];
for reserved_field in reserved_fields {
visitor.fields.remove(reserved_field);
}
}
let metadata = event.metadata();
let log = JsonLog {
level: metadata.level().to_string(),
time: self.time_formatter.format_now(),
file_path: if cfg!(debug_assertions) {
metadata.file()
} else {
None
},
line_number: if cfg!(debug_assertions) {
metadata.line()
} else {
None
},
time,
file: metadata.file(),
line: metadata.line(),
target: metadata.target(),
message,
fields: visitor.fields,
};
......@@ -312,10 +658,8 @@ where
}
}
/// Visitor that collects recorded `tracing` fields as JSON values keyed by field name.
///
/// It implements `tracing::field::Visit`; each `record_*` callback stores the value it is
/// handed into `fields`.
#[derive(Default)]
struct JsonVisitor {
// A BTreeMap keeps the keys sorted, so the fields are always emitted in the same order.
fields: BTreeMap<String, serde_json::Value>,
}
......@@ -328,10 +672,14 @@ impl tracing::field::Visit for JsonVisitor {
}
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.fields.insert(
field.name().to_string(),
serde_json::Value::String(value.to_string()),
);
if field.name() != "message" {
match serde_json::from_str::<Value>(value) {
Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
Err(_) => self.fields.insert(field.name().to_string(), value.into()),
};
} else {
self.fields.insert(field.name().to_string(), value.into());
}
}
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
......@@ -357,9 +705,314 @@ impl tracing::field::Visit for JsonVisitor {
use serde_json::value::Number;
self.fields.insert(
field.name().to_string(),
// Infinite or NaN values are not JSON numbers, replace them with 0.
// It's unlikely that we would log an inf or nan value.
serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
);
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use jsonschema::{Draft, JSONSchema};
use serde_json::Value;
use std::fs::File;
use std::io::{BufRead, BufReader};
use stdio_override::*;
use tempfile::NamedTempFile;
// JSON Schema (draft-07) that `load_log` compiles and applies to every captured log line.
//
// `file`, `level`, `line`, `message`, `target` and `time` are required. The span/trace
// identifiers, `span_name`, `tracestate` and the `time.*_us` timing fields are optional but
// type-checked (and, for the ids, pattern-checked) when present. Any other property is
// allowed.
static LOG_LINE_SCHEMA: &str = r#"
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Runtime Log Line",
"type": "object",
"required": [
"file",
"level",
"line",
"message",
"target",
"time"
],
"properties": {
"file": { "type": "string" },
"level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
"line": { "type": "integer" },
"message": { "type": "string" },
"target": { "type": "string" },
"time": { "type": "string", "format": "date-time" },
"span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
"parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
"trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
"span_name": { "type": "string" },
"time.busy_us": { "type": "integer" },
"time.duration_us": { "type": "integer" },
"time.idle_us": { "type": "integer" },
"tracestate": { "type": "string" }
},
"additionalProperties": true
}
"#;
/// Outermost span of the three-level `parent` -> `child` -> `grandchild` test hierarchy.
///
/// The span is declared with a fixed `span_id` and `trace_id`. The body then records
/// `"invalid"` for `trace_id`, `span_id` and `span_name` on the current span, emits a trace
/// event, logs the trace id reported by the distributed tracing context (when one is
/// available) as `my_trace_id`, and finally awaits `child`.
#[tracing::instrument(
    skip_all,
    fields(
        span_id = "abd16e319329445f",
        trace_id = "2adfd24468724599bb9a4990dc342288"
    )
)]
async fn parent() {
    {
        // `span_name` is not among the fields declared above; presumably recording it is a
        // no-op in `tracing` -- TODO confirm.
        let current = tracing::Span::current();
        for reserved in ["trace_id", "span_id", "span_name"] {
            current.record(reserved, "invalid");
        }
    }
    tracing::trace!(message = "parent!");
    if let Some(ctx) = get_distributed_tracing_context() {
        tracing::info!(my_trace_id = ctx.trace_id);
    }
    child().await;
}
/// Middle span of the test hierarchy.
///
/// Emits a trace event, logs the trace id reported by the distributed tracing context
/// (when one is available) as `my_trace_id`, then awaits `grandchild`.
#[tracing::instrument(skip_all)]
async fn child() {
tracing::trace!(message = "child");
// `test_json_log_capture` compares every logged `my_trace_id` with the trace id
// declared on `parent`.
if let Some(my_ctx) = get_distributed_tracing_context() {
tracing::info!(my_trace_id = my_ctx.trace_id);
}
grandchild().await;
}
/// Innermost span of the test hierarchy.
///
/// Emits a trace event and logs the trace id reported by the distributed tracing context
/// (when one is available) as `my_trace_id`.
#[tracing::instrument(skip_all)]
async fn grandchild() {
tracing::trace!(message = "grandchild");
// `test_json_log_capture` compares every logged `my_trace_id` with the trace id
// declared on `parent`.
if let Some(my_ctx) = get_distributed_tracing_context() {
tracing::info!(my_trace_id = my_ctx.trace_id);
}
}
/// Reads a JSONL log file and returns its lines as parsed JSON values.
///
/// Each line must be valid JSON and must satisfy `LOG_LINE_SCHEMA`; the first line that
/// fails either check aborts the load with an error naming the 1-based line number.
/// Every accepted line is also echoed to stdout.
///
/// # Errors
/// Returns an error if the file cannot be opened or read, if a line is not valid JSON, or
/// if a line violates the schema.
///
/// # Panics
/// Panics if `LOG_LINE_SCHEMA` itself is not valid JSON or not a valid draft-07 schema.
pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
    let schema: Value = serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
    let validator = JSONSchema::options()
        .with_draft(Draft::Draft7)
        .compile(&schema)
        .expect("Invalid schema");

    let mut entries = Vec::new();
    for (index, raw) in BufReader::new(File::open(file_name)?).lines().enumerate() {
        let raw = raw?;
        let line_no = index + 1;
        let entry: Value = serde_json::from_str(&raw)
            .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_no, e))?;
        // Collapse all schema violations for this line into a single error.
        validator.validate(&entry).map_err(|violations| {
            let joined = violations
                .map(|v| v.to_string())
                .collect::<Vec<_>>()
                .join("; ");
            anyhow!(
                "Line {}: JSON Schema Validation errors: {}",
                line_no,
                joined
            )
        })?;
        println!("{}", entry);
        entries.push(entry);
    }
    Ok(entries)
}
#[tokio::test]
async fn test_json_log_capture() -> Result<()> {
#[allow(clippy::redundant_closure_call)]
let _ = temp_env::async_with_vars(
[("DYN_LOGGING_JSONL", Some("1"))],
(async || {
let tmp_file = NamedTempFile::new().unwrap();
let file_name = tmp_file.path().to_str().unwrap();
let guard = StderrOverride::from_file(file_name)?;
init();
parent().await;
drop(guard);
let lines = load_log(file_name)?;
// 1. Validate my_trace_id matches parent's trace ID
let parent_trace_id = Uuid::parse_str("2adfd24468724599bb9a4990dc342288")
.unwrap()
.simple()
.to_string();
for log_line in &lines {
if let Some(my_trace_id) = log_line.get("my_trace_id") {
assert_eq!(
my_trace_id,
&serde_json::Value::String(parent_trace_id.clone())
);
}
}
// 2. Validate span IDs are unique for SPAN_CREATED and SPAN_CLOSED events
let mut created_span_ids: Vec<String> = Vec::new();
let mut closed_span_ids: Vec<String> = Vec::new();
for log_line in &lines {
if let Some(message) = log_line.get("message") {
match message.as_str().unwrap() {
"SPAN_CREATED" => {
if let Some(span_id) = log_line.get("span_id") {
let span_id_str = span_id.as_str().unwrap();
assert!(
created_span_ids.iter().all(|id| id != span_id_str),
"Duplicate span ID found in SPAN_CREATED: {}",
span_id_str
);
created_span_ids.push(span_id_str.to_string());
}
}
"SPAN_CLOSED" => {
if let Some(span_id) = log_line.get("span_id") {
let span_id_str = span_id.as_str().unwrap();
assert!(
closed_span_ids.iter().all(|id| id != span_id_str),
"Duplicate span ID found in SPAN_CLOSED: {}",
span_id_str
);
closed_span_ids.push(span_id_str.to_string());
}
}
_ => {}
}
}
}
// Additionally, ensure that every SPAN_CLOSED has a corresponding SPAN_CREATED
for closed_span_id in &closed_span_ids {
assert!(
created_span_ids.contains(closed_span_id),
"SPAN_CLOSED without corresponding SPAN_CREATED: {}",
closed_span_id
);
}
// 3. Validate parent span relationships
let parent_span_id = lines
.iter()
.find(|log_line| {
log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
&& log_line.get("span_name").unwrap().as_str().unwrap() == "parent"
})
.and_then(|log_line| {
log_line
.get("span_id")
.map(|s| s.as_str().unwrap().to_string())
})
.unwrap();
let child_span_id = lines
.iter()
.find(|log_line| {
log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
&& log_line.get("span_name").unwrap().as_str().unwrap() == "child"
})
.and_then(|log_line| {
log_line
.get("span_id")
.map(|s| s.as_str().unwrap().to_string())
})
.unwrap();
let _grandchild_span_id = lines
.iter()
.find(|log_line| {
log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
&& log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild"
})
.and_then(|log_line| {
log_line
.get("span_id")
.map(|s| s.as_str().unwrap().to_string())
})
.unwrap();
// Parent span has no parent_id
for log_line in &lines {
if log_line.get("span_name").unwrap().as_str().unwrap() == "parent" {
assert!(log_line.get("parent_id").is_none());
}
}
// Child span's parent_id is parent_span_id
for log_line in &lines {
if log_line.get("span_name").unwrap().as_str().unwrap() == "child" {
assert_eq!(
log_line.get("parent_id").unwrap().as_str().unwrap(),
&parent_span_id
);
}
}
// Grandchild span's parent_id is child_span_id
for log_line in &lines {
if log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild" {
assert_eq!(
log_line.get("parent_id").unwrap().as_str().unwrap(),
&child_span_id
);
}
}
// Validate duration relationships
let parent_duration = lines
.iter()
.find(|log_line| {
log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
&& log_line.get("span_name").unwrap().as_str().unwrap() == "parent"
})
.and_then(|log_line| {
log_line
.get("time.duration_us")
.map(|d| d.as_u64().unwrap())
})
.unwrap();
let child_duration = lines
.iter()
.find(|log_line| {
log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
&& log_line.get("span_name").unwrap().as_str().unwrap() == "child"
})
.and_then(|log_line| {
log_line
.get("time.duration_us")
.map(|d| d.as_u64().unwrap())
})
.unwrap();
let grandchild_duration = lines
.iter()
.find(|log_line| {
log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
&& log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild"
})
.and_then(|log_line| {
log_line
.get("time.duration_us")
.map(|d| d.as_u64().unwrap())
})
.unwrap();
assert!(
parent_duration > child_duration + grandchild_duration,
"Parent duration is not greater than the sum of child and grandchild durations"
);
assert!(
child_duration > grandchild_duration,
"Child duration is not greater than grandchild duration"
);
Ok::<(), anyhow::Error>(())
})(),
)
.await;
Ok(())
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment