Unverified Commit 06939770 authored by Neelay Shah's avatar Neelay Shah Committed by GitHub
Browse files

feat: add span event logging and fix trace context extraction (#5400)


Co-authored-by: default avatarClaude Opus 4.5 <noreply@anthropic.com>
parent 971c3069
......@@ -8,15 +8,16 @@ SPDX-License-Identifier: Apache-2.0
## Overview
Dynamo provides structured logging in both text as well as JSONL. When
JSONL is enabled logs additionally contain `span` creation and exit
events as well as support for `trace_id` and `span_id` fields for
distributed tracing.
JSONL is enabled, logs support `trace_id` and `span_id` fields for
distributed tracing. Span creation and exit events can be optionally
enabled via the `DYN_LOGGING_SPAN_EVENTS` environment variable.
## Environment Variables
| Variable | Description | Default | Example |
|----------|-------------|---------|---------|
| `DYN_LOGGING_JSONL` | Enable JSONL logging format | `false` | `true` |
| `DYN_LOGGING_SPAN_EVENTS` | Enable span entry/close event logging (`SPAN_FIRST_ENTRY`, `SPAN_CLOSED` messages) | `false` | `true` |
| `DYN_LOG` | Log levels per target `<default_level>,<module_path>=<level>,<module_path>=<level>` | `info` | `DYN_LOG=info,dynamo_runtime::system_status_server:trace` |
| `DYN_LOG_USE_LOCAL_TZ` | Use local timezone for timestamps (default is UTC) | `false` | `true` |
| `DYN_LOGGING_CONFIG_PATH` | Path to custom TOML logging configuration | none | `/path/to/config.toml` |
......
......@@ -473,6 +473,11 @@ pub fn use_local_timezone() -> bool {
env_is_truthy(environment_names::logging::DYN_LOG_USE_LOCAL_TZ)
}
/// Returns true if `DYN_LOGGING_SPAN_EVENTS` is set to a truthy value.
pub fn span_events_enabled() -> bool {
env_is_truthy(environment_names::logging::DYN_LOGGING_SPAN_EVENTS)
}
#[cfg(test)]
mod tests {
use super::*;
......
......@@ -37,6 +37,9 @@ pub mod logging {
/// Use local timezone for logging timestamps (default is UTC)
pub const DYN_LOG_USE_LOCAL_TZ: &str = "DYN_LOG_USE_LOCAL_TZ";
/// Enable span event logging (create/close events)
pub const DYN_LOGGING_SPAN_EVENTS: &str = "DYN_LOGGING_SPAN_EVENTS";
/// OTLP (OpenTelemetry Protocol) tracing configuration
pub mod otlp {
/// Enable OTLP trace exporting (set to "1" to enable)
......@@ -347,6 +350,7 @@ mod tests {
logging::DYN_LOGGING_JSONL,
logging::DYN_SDK_DISABLE_ANSI_LOGGING,
logging::DYN_LOG_USE_LOCAL_TZ,
logging::DYN_LOGGING_SPAN_EVENTS,
logging::otlp::OTEL_EXPORT_ENABLED,
logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
logging::otlp::OTEL_SERVICE_NAME,
......
......@@ -47,7 +47,7 @@ use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::{filter::Directive, fmt};
use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
use crate::config::{disable_ansi_logging, jsonl_logging_enabled, span_events_enabled};
use async_nats::{HeaderMap, HeaderValue};
use axum::extract::FromRequestParts;
use axum::http;
......@@ -186,6 +186,22 @@ struct PendingDistributedTraceContext {
x_dynamo_request_id: Option<String>,
}
/// Macro to emit a tracing event at a dynamic level with a custom target.
macro_rules! emit_at_level {
($level:expr, target: $target:expr, $($arg:tt)*) => {
// tracing::event! requires a compile-time constant level, so we must match
// on the runtime level and use a literal Level constant in each arm.
// See: https://github.com/tokio-rs/tracing/issues/2730
match $level {
&tracing::Level::ERROR => tracing::event!(target: $target, tracing::Level::ERROR, $($arg)*),
&tracing::Level::WARN => tracing::event!(target: $target, tracing::Level::WARN, $($arg)*),
&tracing::Level::INFO => tracing::event!(target: $target, tracing::Level::INFO, $($arg)*),
&tracing::Level::DEBUG => tracing::event!(target: $target, tracing::Level::DEBUG, $($arg)*),
&tracing::Level::TRACE => tracing::event!(target: $target, tracing::Level::TRACE, $($arg)*),
}
};
}
impl DistributedTraceContext {
/// Create a traceparent string from the context
pub fn create_traceparent(&self) -> String {
......@@ -811,7 +827,7 @@ where
panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
}
// Re-acquire mutable borrow to insert the finalized context
let span_level = span.metadata().level();
let mut extensions = span.extensions_mut();
extensions.insert(DistributedTraceContext {
trace_id: trace_id.expect("Trace ID must be set"),
......@@ -823,6 +839,14 @@ where
x_request_id,
x_dynamo_request_id,
});
drop(extensions);
// Emit SPAN_FIRST_ENTRY event. This only runs if the span passed the layer's filter
// (on_enter is not called for filtered-out spans), so no additional check needed.
if span_events_enabled() {
emit_at_level!(span_level, target: "span_event", message = "SPAN_FIRST_ENTRY");
}
}
}
}
......@@ -881,8 +905,14 @@ fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
let otel_filter_layer = filters(load_config());
if jsonl_logging_enabled() {
let span_events = if span_events_enabled() {
FmtSpan::CLOSE
} else {
FmtSpan::NONE
};
let l = fmt::layer()
.with_ansi(false)
.with_span_events(span_events)
.event_format(CustomJsonFormatter::new())
.with_writer(std::io::stderr)
.with_filter(fmt_filter_layer);
......@@ -981,6 +1011,13 @@ fn filters(config: LoggingConfig) -> EnvFilter {
}
}
}
// When span events are enabled, allow "span_event" target at all levels
// This ensures SPAN_FIRST_ENTRY events pass the filter when emitted from on_enter
if span_events_enabled() {
filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
}
filter_layer
}
......@@ -1025,7 +1062,7 @@ struct JsonLog<'a> {
file: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
line: Option<u32>,
target: &'a str,
target: String,
message: serde_json::Value,
#[serde(flatten)]
fields: BTreeMap<String, serde_json::Value>,
......@@ -1114,6 +1151,8 @@ where
.remove("message")
.unwrap_or(serde_json::Value::String("".to_string()));
let mut target_override: Option<String> = None;
let current_span = event
.parent()
.and_then(|id| ctx.span(id))
......@@ -1157,11 +1196,14 @@ where
);
}
message = match message.as_str() {
Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
_ => message.clone(),
};
let is_span_created = message.as_str() == Some("SPAN_FIRST_ENTRY");
let is_span_closed = message.as_str() == Some("close");
if is_span_created || is_span_closed {
target_override = Some(span.metadata().target().to_string());
if is_span_closed {
message = serde_json::Value::String("SPAN_CLOSED".to_string());
}
}
visitor.fields.insert(
"span_name".to_string(),
......@@ -1243,7 +1285,7 @@ where
time,
file: metadata.file(),
line: metadata.line(),
target: metadata.target(),
target: target_override.unwrap_or_else(|| metadata.target().to_string()),
message,
fields: visitor.fields,
};
......@@ -1618,4 +1660,228 @@ pub mod tests {
.await;
Ok(())
}
// Test functions at different log levels for filtering tests
#[tracing::instrument(level = "debug", skip_all)]
async fn debug_level_span() {
tracing::debug!("inside debug span");
}
#[tracing::instrument(level = "info", skip_all)]
async fn info_level_span() {
tracing::info!("inside info span");
}
#[tracing::instrument(level = "warn", skip_all)]
async fn warn_level_span() {
tracing::warn!("inside warn span");
}
// Span from a different target - should be FILTERED OUT at info level
// because the filter is warn,dynamo_runtime::logging::tests=debug
#[tracing::instrument(level = "info", target = "other_module", skip_all)]
async fn other_target_info_span() {
tracing::info!(target: "other_module", "inside other target span");
}
/// Comprehensive test for span events covering:
/// - SPAN_FIRST_ENTRY and SPAN_CLOSED event emission
/// - Trace context (trace_id, span_id) in span events
/// - Timing information in SPAN_CLOSED events
/// - Level-based filtering (positive: allowed levels pass, negative: filtered levels blocked)
/// - Target-based filtering (spans from allowed targets pass even at lower levels)
///
/// This test runs in a subprocess to ensure logging is initialized with our specific
/// filter settings (DYN_LOG=warn,dynamo_runtime::logging::tests=debug), avoiding
/// interference from other tests that may have initialized logging first.
#[test]
fn test_span_events() {
use std::process::Command;
// Run cargo test for the subprocess test with specific env vars
let output = Command::new("cargo")
.args([
"test",
"-p",
"dynamo-runtime",
"test_span_events_subprocess",
"--",
"--exact",
"--nocapture",
])
.env("DYN_LOGGING_JSONL", "1")
.env("DYN_LOGGING_SPAN_EVENTS", "1")
.env("DYN_LOG", "warn,dynamo_runtime::logging::tests=debug")
.output()
.expect("Failed to execute subprocess test");
// Print output for debugging
if !output.status.success() {
eprintln!(
"=== STDOUT ===\n{}",
String::from_utf8_lossy(&output.stdout)
);
eprintln!(
"=== STDERR ===\n{}",
String::from_utf8_lossy(&output.stderr)
);
}
assert!(
output.status.success(),
"Subprocess test failed with exit code: {:?}",
output.status.code()
);
}
/// Subprocess test that performs the actual span event validation.
/// This is called by test_span_events in a separate process with controlled env vars.
#[tokio::test]
async fn test_span_events_subprocess() -> Result<()> {
// Skip if not running as subprocess (env vars not set)
if std::env::var("DYN_LOGGING_SPAN_EVENTS").is_err() {
return Ok(());
}
let tmp_file = NamedTempFile::new().unwrap();
let file_name = tmp_file.path().to_str().unwrap();
let guard = StderrOverride::from_file(file_name)?;
init();
// Run parent/child/grandchild spans (all INFO level by default)
parent().await;
// Run spans at explicit levels from our test module
debug_level_span().await;
info_level_span().await;
warn_level_span().await;
// Run span from different target (should be filtered out)
other_target_info_span().await;
drop(guard);
let lines = load_log(file_name)?;
// Helper to check if a span event exists
let has_span_event = |msg: &str, span_name: &str| {
lines.iter().any(|log| {
log.get("message").and_then(|v| v.as_str()) == Some(msg)
&& log.get("span_name").and_then(|v| v.as_str()) == Some(span_name)
})
};
// Helper to get span events
let get_span_events = |msg: &str| -> Vec<&serde_json::Value> {
lines
.iter()
.filter(|log| log.get("message").and_then(|v| v.as_str()) == Some(msg))
.collect()
};
// === Test 1: SPAN_FIRST_ENTRY events have required fields ===
let span_created_events = get_span_events("SPAN_FIRST_ENTRY");
for event in &span_created_events {
// Must have span_name
assert!(
event.get("span_name").is_some(),
"SPAN_FIRST_ENTRY must have span_name"
);
// Must have valid trace_id (format check)
let trace_id = event
.get("trace_id")
.and_then(|v| v.as_str())
.expect("SPAN_FIRST_ENTRY must have trace_id");
assert!(
trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
"SPAN_FIRST_ENTRY must have valid trace_id format"
);
// Must have valid span_id
let span_id = event
.get("span_id")
.and_then(|v| v.as_str())
.expect("SPAN_FIRST_ENTRY must have span_id");
assert!(
is_valid_span_id(span_id),
"SPAN_FIRST_ENTRY must have valid span_id"
);
}
// === Test 2: SPAN_CLOSED events have timing info ===
let span_closed_events = get_span_events("SPAN_CLOSED");
for event in &span_closed_events {
assert!(
event.get("span_name").is_some(),
"SPAN_CLOSED must have span_name"
);
assert!(
event.get("time.busy_us").is_some()
|| event.get("time.idle_us").is_some()
|| event.get("time.duration_us").is_some(),
"SPAN_CLOSED must have timing information"
);
// Must have valid trace_id
let trace_id = event
.get("trace_id")
.and_then(|v| v.as_str())
.expect("SPAN_CLOSED must have trace_id");
assert!(
trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
"SPAN_CLOSED must have valid trace_id format"
);
}
// === Test 3: Target-based filtering (positive) ===
// Spans from dynamo_runtime::logging::tests should pass at ALL levels
// because the target is allowed at debug level
assert!(
has_span_event("SPAN_FIRST_ENTRY", "debug_level_span"),
"DEBUG span from allowed target MUST pass (target=debug filter)"
);
assert!(
has_span_event("SPAN_FIRST_ENTRY", "info_level_span"),
"INFO span from allowed target MUST pass (target=debug filter)"
);
assert!(
has_span_event("SPAN_FIRST_ENTRY", "warn_level_span"),
"WARN span from allowed target MUST pass (target=debug filter)"
);
// parent/child/grandchild are INFO level from allowed target - should pass
assert!(
has_span_event("SPAN_FIRST_ENTRY", "parent"),
"parent span (INFO) from allowed target MUST pass"
);
assert!(
has_span_event("SPAN_FIRST_ENTRY", "child"),
"child span (INFO) from allowed target MUST pass"
);
assert!(
has_span_event("SPAN_FIRST_ENTRY", "grandchild"),
"grandchild span (INFO) from allowed target MUST pass"
);
// === Test 4: Level-based filtering (negative) ===
// Verify spans from OTHER targets at debug/info level are filtered out
assert!(
!has_span_event("SPAN_FIRST_ENTRY", "other_target_info_span"),
"INFO span from non-allowed target (other_module) MUST be filtered out"
);
// Also verify no spans from other targets appear at debug/info level
for event in &span_created_events {
let target = event.get("target").and_then(|v| v.as_str()).unwrap_or("");
let level = event.get("level").and_then(|v| v.as_str()).unwrap_or("");
// If level is DEBUG or INFO, target must be our test module
if level == "DEBUG" || level == "INFO" {
assert!(
target.contains("dynamo_runtime::logging::tests"),
"DEBUG/INFO span must be from allowed target, got target={target}"
);
}
}
Ok(())
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment