Unverified Commit 82794761 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat(perf): add pipeline instrumentation and feature gated NVTX markers (#6746)

parent ba9a8a9f
...@@ -2136,6 +2136,7 @@ dependencies = [ ...@@ -2136,6 +2136,7 @@ dependencies = [
"chrono", "chrono",
"console-subscriber", "console-subscriber",
"criterion 0.5.1", "criterion 0.5.1",
"cudarc",
"dashmap 6.1.0", "dashmap 6.1.0",
"derive-getters", "derive-getters",
"derive_builder", "derive_builder",
......
...@@ -151,3 +151,10 @@ insta.opt-level = 3 ...@@ -151,3 +151,10 @@ insta.opt-level = 3
# These make the build much slower but shrink the binary, and could help performance # These make the build much slower but shrink the binary, and could help performance
codegen-units = 1 codegen-units = 1
lto = "thin" lto = "thin"
# Profiling profile: release-like but retains debug symbols for perf/flamegraph/Nsight.
# Build: cargo build --profile profiling --features nvtx
[profile.profiling]
inherits = "release"
debug = true
strip = false
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Mirrors the root .cargo/config.toml so this standalone workspace
# gets the same build flags (tokio_unstable is required for Tokio metrics APIs).
[build]
rustflags = ["--cfg", "tokio_unstable"]
...@@ -1801,6 +1801,7 @@ dependencies = [ ...@@ -1801,6 +1801,7 @@ dependencies = [
"blake3", "blake3",
"bytes", "bytes",
"chrono", "chrono",
"cudarc",
"dashmap 6.1.0", "dashmap 6.1.0",
"derive-getters", "derive-getters",
"derive_builder", "derive_builder",
......
...@@ -27,6 +27,7 @@ media-ffmpeg = ["dynamo-llm/media-ffmpeg"] ...@@ -27,6 +27,7 @@ media-ffmpeg = ["dynamo-llm/media-ffmpeg"]
kv-indexer = ["dep:clap", "dep:tracing-subscriber"] kv-indexer = ["dep:clap", "dep:tracing-subscriber"]
kv-indexer-runtime = ["kv-indexer", "dynamo-kv-router/indexer-runtime"] kv-indexer-runtime = ["kv-indexer", "dynamo-kv-router/indexer-runtime"]
kv-indexer-metrics = ["kv-indexer", "dynamo-kv-router/metrics"] kv-indexer-metrics = ["kv-indexer", "dynamo-kv-router/metrics"]
nvtx = ["dynamo-runtime/nvtx"]
[dependencies] [dependencies]
dynamo-runtime = { path = "../../runtime" } dynamo-runtime = { path = "../../runtime" }
...@@ -79,3 +80,8 @@ dynamo-llm = { path = "../../llm" } ...@@ -79,3 +80,8 @@ dynamo-llm = { path = "../../llm" }
dynamo-llm = { path = "../../llm", default-features = false } dynamo-llm = { path = "../../llm", default-features = false }
[dev-dependencies] [dev-dependencies]
[profile.profiling]
inherits = "release"
debug = true
strip = false
...@@ -51,8 +51,10 @@ class frontend_perf: ...@@ -51,8 +51,10 @@ class frontend_perf:
TOKENIZE_SECONDS = "tokenize_seconds" TOKENIZE_SECONDS = "tokenize_seconds"
# Template application time in preprocessor # Template application time in preprocessor
TEMPLATE_SECONDS = "template_seconds" TEMPLATE_SECONDS = "template_seconds"
# Per-token detokenization cost (microseconds) # Cumulative detokenization time (microseconds); pair with DETOKENIZE_TOKEN_COUNT
DETOKENIZE_PER_TOKEN_US = "detokenize_per_token_us" DETOKENIZE_TOTAL_US = "detokenize_total_us"
# Total tokens detokenized; use rate(total_us)/rate(count) for per-token average
DETOKENIZE_TOKEN_COUNT = "detokenize_token_count"
# Event loop delay canary (sleep 10ms, measure drift) # Event loop delay canary (sleep 10ms, measure drift)
EVENT_LOOP_DELAY_SECONDS = "event_loop_delay_seconds" EVENT_LOOP_DELAY_SECONDS = "event_loop_delay_seconds"
# Count of event loop stalls (delay > 5ms) # Count of event loop stalls (delay > 5ms)
...@@ -245,16 +247,18 @@ class name_prefix: ...@@ -245,16 +247,18 @@ class name_prefix:
FRONTEND = "dynamo_frontend" FRONTEND = "dynamo_frontend"
# Prefix for KV router metrics (used with router_id label) # Prefix for KV router metrics (used with router_id label)
ROUTER = "dynamo_router" ROUTER = "dynamo_router"
# Prefix for request-plane (transport-agnostic) metrics at AddressedPushRouter
REQUEST_PLANE = "dynamo_request_plane"
# Prefix for tokio runtime metrics # Prefix for tokio runtime metrics
TOKIO = "dynamo_tokio" TOKIO = "dynamo_tokio"
# Prefix for standalone KV indexer metrics # Prefix for standalone KV indexer metrics
KVINDEXER = "dynamo_kvindexer" KVINDEXER = "dynamo_kvindexer"
# Prefix for request-plane metrics at AddressedPushRouter
REQUEST_PLANE = "dynamo_request_plane"
# Prefix for transport-layer metrics (TCP / NATS) # Prefix for transport-layer metrics (TCP / NATS)
TRANSPORT = "dynamo_transport" TRANSPORT = "dynamo_transport"
# Prefix for work-handler transport breakdown metrics (backend side) # Prefix for work-handler transport breakdown metrics (backend side)
WORK_HANDLER = "dynamo_work_handler" WORK_HANDLER = "dynamo_work_handler"
# Prefix for routing overhead metrics (raw Prometheus, not component-scoped)
ROUTING_OVERHEAD = "dynamo_routing_overhead"
class request_plane: class request_plane:
......
...@@ -22,6 +22,8 @@ testing-cuda = ["dep:cudarc", "dynamo-memory/testing-cuda"] ...@@ -22,6 +22,8 @@ testing-cuda = ["dep:cudarc", "dynamo-memory/testing-cuda"]
testing-nixl = ["dep:nixl-sys", "dynamo-memory/testing-nixl"] testing-nixl = ["dep:nixl-sys", "dynamo-memory/testing-nixl"]
testing-etcd = [] testing-etcd = []
block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"] block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:nix", "dep:aligned-vec"]
# Forward the NVTX feature to dynamo-runtime (build with --features nvtx or dynamo-llm/nvtx)
nvtx = ["dynamo-runtime/nvtx"]
block-manager-bench = ["block-manager", "testing-full", "dep:clap", "dep:indicatif"] block-manager-bench = ["block-manager", "testing-full", "dep:clap", "dep:indicatif"]
cuda = ["dep:cudarc"] cuda = ["dep:cudarc"]
integration = ["dynamo-runtime/integration"] integration = ["dynamo-runtime/integration"]
......
...@@ -21,6 +21,7 @@ use anyhow::Result; ...@@ -21,6 +21,7 @@ use anyhow::Result;
use futures::stream::{self, StreamExt}; use futures::stream::{self, StreamExt};
use crate::model_card::ModelDeploymentCard; use crate::model_card::ModelDeploymentCard;
use dynamo_runtime::dynamo_nvtx_range;
use dynamo_runtime::{ use dynamo_runtime::{
pipeline::{ pipeline::{
AsyncEngineContextProvider, ManyOut, Operator, ResponseStream, ServerStreamingEngine, AsyncEngineContextProvider, ManyOut, Operator, ResponseStream, ServerStreamingEngine,
...@@ -468,9 +469,13 @@ impl Decoder { ...@@ -468,9 +469,13 @@ impl Decoder {
// decode the token // decode the token
let detokenize_start = Instant::now(); let detokenize_start = Instant::now();
let token = self.decode_stream.step(token_id)?; let token = {
let _nvtx = dynamo_nvtx_range!("detokenize");
self.decode_stream.step(token_id)?
};
let detokenize_elapsed = detokenize_start.elapsed();
if let Some(tracker) = &self.tracker { if let Some(tracker) = &self.tracker {
tracker.record_detokenize_latency(detokenize_start.elapsed()); tracker.record_detokenize_latency(detokenize_elapsed);
} }
// stop conditions to not apply until the minimum number of tokens have been generated // stop conditions to not apply until the minimum number of tokens have been generated
......
...@@ -53,7 +53,6 @@ pub async fn run( ...@@ -53,7 +53,6 @@ pub async fn run(
http_service_builder.cancel_token(Some(distributed_runtime.primary_token())); http_service_builder.cancel_token(Some(distributed_runtime.primary_token()));
http_service_builder = http_service_builder =
http_service_builder.with_request_template(engine_config.local_model().request_template()); http_service_builder.with_request_template(engine_config.local_model().request_template());
// Inject the DRT's metrics registry so that component-scoped metrics // Inject the DRT's metrics registry so that component-scoped metrics
// (e.g. KvIndexerMetrics) are exposed (default port 8000 if not overridden). // (e.g. KvIndexerMetrics) are exposed (default port 8000 if not overridden).
http_service_builder = http_service_builder =
......
...@@ -29,6 +29,12 @@ use dynamo_runtime::config::env_is_truthy; ...@@ -29,6 +29,12 @@ use dynamo_runtime::config::env_is_truthy;
use dynamo_runtime::config::environment_names::llm as env_llm; use dynamo_runtime::config::environment_names::llm as env_llm;
use dynamo_runtime::discovery::Discovery; use dynamo_runtime::discovery::Discovery;
use dynamo_runtime::logging::make_request_span; use dynamo_runtime::logging::make_request_span;
use dynamo_runtime::metrics::{
frontend_perf::ensure_frontend_perf_metrics_registered_prometheus,
request_plane::ensure_request_plane_metrics_registered_prometheus,
tokio_perf::{ensure_tokio_perf_metrics_registered_prometheus, tokio_metrics_and_canary_loop},
transport_metrics::ensure_transport_metrics_registered_prometheus,
};
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
...@@ -306,9 +312,14 @@ impl HttpService { ...@@ -306,9 +312,14 @@ impl HttpService {
.handle(handle.clone()) .handle(handle.clone())
.serve(router.into_make_service()); .serve(router.into_make_service());
// Spawn canary after all fallible startup so it won't leak on early errors
tokio::spawn(tokio_metrics_and_canary_loop(cancel_token.clone()));
tokio::select! { tokio::select! {
result = server => { result = server => {
result.map_err(|e| anyhow::anyhow!("HTTPS server error: {}", e))?; let result = result.map_err(|e| anyhow::anyhow!("HTTPS server error: {}", e));
cancel_token.cancel();
result?;
} }
_ = observer.cancelled() => { _ = observer.cancelled() => {
state_cancel.cancel(); state_cancel.cancel();
...@@ -341,6 +352,9 @@ impl HttpService { ...@@ -341,6 +352,9 @@ impl HttpService {
} }
})?; })?;
// Spawn canary after all fallible startup so it won't leak on early errors
tokio::spawn(tokio_metrics_and_canary_loop(cancel_token.clone()));
axum::serve(listener, router) axum::serve(listener, router)
.with_graceful_shutdown(async move { .with_graceful_shutdown(async move {
observer.cancelled_owned().await; observer.cancelled_owned().await;
...@@ -353,6 +367,7 @@ impl HttpService { ...@@ -353,6 +367,7 @@ impl HttpService {
}) })
.await .await
.inspect_err(|_| cancel_token.cancel())?; .inspect_err(|_| cancel_token.cancel())?;
cancel_token.cancel();
} }
Ok(()) Ok(())
...@@ -461,6 +476,19 @@ impl HttpServiceConfigBuilder { ...@@ -461,6 +476,19 @@ impl HttpServiceConfigBuilder {
} }
} }
if let Err(e) = ensure_request_plane_metrics_registered_prometheus(&registry) {
tracing::warn!("Failed to register request-plane metrics: {}", e);
}
if let Err(e) = ensure_frontend_perf_metrics_registered_prometheus(&registry) {
tracing::warn!("Failed to register frontend perf metrics: {}", e);
}
if let Err(e) = ensure_tokio_perf_metrics_registered_prometheus(&registry) {
tracing::warn!("Failed to register tokio perf metrics: {}", e);
}
if let Err(e) = ensure_transport_metrics_registered_prometheus(&registry) {
tracing::warn!("Failed to register transport metrics: {}", e);
}
let mut router = axum::Router::new(); let mut router = axum::Router::new();
let mut all_docs = Vec::new(); let mut all_docs = Vec::new();
......
...@@ -6,6 +6,7 @@ use std::sync::Arc; ...@@ -6,6 +6,7 @@ use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use dynamo_kv_router::protocols::{TokensWithHashes, WorkerWithDpRank}; use dynamo_kv_router::protocols::{TokensWithHashes, WorkerWithDpRank};
use dynamo_runtime::{ use dynamo_runtime::{
dynamo_nvtx_range,
pipeline::{ pipeline::{
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
SingleIn, async_trait, SingleIn, async_trait,
...@@ -222,6 +223,7 @@ impl KvPushRouter { ...@@ -222,6 +223,7 @@ impl KvPushRouter {
phase: RequestPhase, phase: RequestPhase,
is_query_only: bool, is_query_only: bool,
) -> Result<WorkerSelection, Error> { ) -> Result<WorkerSelection, Error> {
let _nvtx_select = dynamo_nvtx_range!("route.select_worker");
let routing = request.routing.as_ref(); let routing = request.routing.as_ref();
let lora_name = routing.and_then(|r| r.lora_name.clone()); let lora_name = routing.and_then(|r| r.lora_name.clone());
let priority_jump = routing.and_then(|r| r.priority_jump).unwrap_or(0.0); let priority_jump = routing.and_then(|r| r.priority_jump).unwrap_or(0.0);
...@@ -242,6 +244,7 @@ impl KvPushRouter { ...@@ -242,6 +244,7 @@ impl KvPushRouter {
}; };
let Some(id) = preselected_id else { let Some(id) = preselected_id else {
let _nvtx_kv = dynamo_nvtx_range!("route.kv_match");
let (best_worker, overlap_amount) = self let (best_worker, overlap_amount) = self
.chooser .chooser
.find_best_match( .find_best_match(
......
...@@ -27,6 +27,12 @@ use futures::Stream; ...@@ -27,6 +27,12 @@ use futures::Stream;
use futures::stream::{self, StreamExt}; use futures::stream::{self, StreamExt};
use prompt::OAIPromptFormatter; use prompt::OAIPromptFormatter;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use dynamo_runtime::dynamo_nvtx_range;
use dynamo_runtime::metrics::frontend_perf::{
DETOKENIZE_TOKEN_COUNT, DETOKENIZE_TOTAL_US, STAGE_DURATION_SECONDS, TEMPLATE_SECONDS,
TOKENIZE_SECONDS,
};
use std::{collections::HashMap, pin::Pin, sync::Arc}; use std::{collections::HashMap, pin::Pin, sync::Arc};
use tracing; use tracing;
...@@ -228,10 +234,16 @@ impl OpenAIPreprocessor { ...@@ -228,10 +234,16 @@ impl OpenAIPreprocessor {
request: &R, request: &R,
tracker: Option<&RequestTracker>, tracker: Option<&RequestTracker>,
) -> Result<(PreprocessedRequest, HashMap<String, String>, bool)> { ) -> Result<(PreprocessedRequest, HashMap<String, String>, bool)> {
let preprocess_start = Instant::now();
let mut builder = self.builder(request)?; let mut builder = self.builder(request)?;
let formatted_prompt = self
.apply_template(request) let template_start = Instant::now();
.with_context(|| "Failed to apply prompt template")?; let formatted_prompt = {
let _nvtx = dynamo_nvtx_range!("preprocess.template");
self.apply_template(request)
.with_context(|| "Failed to apply prompt template")?
};
TEMPLATE_SECONDS.observe(template_start.elapsed().as_secs_f64());
// Check if the chat template injected a reasoning start token at the end // Check if the chat template injected a reasoning start token at the end
// of the prompt (e.g., Qwen3.5 appends `<think>\n` when enable_thinking // of the prompt (e.g., Qwen3.5 appends `<think>\n` when enable_thinking
...@@ -241,13 +253,22 @@ impl OpenAIPreprocessor { ...@@ -241,13 +253,22 @@ impl OpenAIPreprocessor {
.as_ref() .as_ref()
.is_some_and(|p| p.trim_end().ends_with("<think>")); .is_some_and(|p| p.trim_end().ends_with("<think>"));
let annotations = self let tokenize_start = Instant::now();
.gather_tokens(request, &mut builder, formatted_prompt.clone(), tracker) let annotations = {
.with_context(|| "Failed to gather tokens")?; let _nvtx = dynamo_nvtx_range!("preprocess.tokenize");
self.gather_tokens(request, &mut builder, formatted_prompt.clone(), tracker)
.with_context(|| "Failed to gather tokens")?
};
TOKENIZE_SECONDS.observe(tokenize_start.elapsed().as_secs_f64());
self.gather_multi_modal_data(request, &mut builder, formatted_prompt) self.gather_multi_modal_data(request, &mut builder, formatted_prompt)
.await .await
.with_context(|| "Failed to gather multimodal data")?; .with_context(|| "Failed to gather multimodal data")?;
STAGE_DURATION_SECONDS
.with_label_values(&["preprocess"])
.observe(preprocess_start.elapsed().as_secs_f64());
Ok((builder.build()?, annotations, prompt_injected_reasoning)) Ok((builder.build()?, annotations, prompt_injected_reasoning))
} }
...@@ -872,6 +893,15 @@ impl OpenAIPreprocessor { ...@@ -872,6 +893,15 @@ impl OpenAIPreprocessor {
detokenize_count: tracker.as_ref().map(|t| t.detokenize_count()), detokenize_count: tracker.as_ref().map(|t| t.detokenize_count()),
}; };
// Flush per-request detokenize accumulators to global Prometheus counters
// (once per request instead of per-token).
if let Some(t) = tracker.as_ref() {
if let Some(total) = t.detokenize_total_latency() {
DETOKENIZE_TOTAL_US.inc_by(total.as_micros() as f64);
}
DETOKENIZE_TOKEN_COUNT.inc_by(t.detokenize_count() as f64);
}
if let Ok(metrics_annotated) = llm_metrics.to_annotation::<()>() { if let Ok(metrics_annotated) = llm_metrics.to_annotation::<()>() {
// Only set event if not already set to avoid overriding existing events (like errors) // Only set event if not already set to avoid overriding existing events (like errors)
if response.event.is_none() { if response.event.is_none() {
...@@ -937,6 +967,15 @@ impl OpenAIPreprocessor { ...@@ -937,6 +967,15 @@ impl OpenAIPreprocessor {
detokenize_count: tracker.as_ref().map(|t| t.detokenize_count()), detokenize_count: tracker.as_ref().map(|t| t.detokenize_count()),
}; };
// Flush per-request detokenize accumulators to global Prometheus counters
// (once per request instead of per-token).
if let Some(t) = tracker.as_ref() {
if let Some(total) = t.detokenize_total_latency() {
DETOKENIZE_TOTAL_US.inc_by(total.as_micros() as f64);
}
DETOKENIZE_TOKEN_COUNT.inc_by(t.detokenize_count() as f64);
}
// Create annotation string // Create annotation string
let annotation = llm_metrics.to_annotation::<()>().unwrap_or_else(|e| { let annotation = llm_metrics.to_annotation::<()>().unwrap_or_else(|e| {
tracing::warn!("Failed to serialize metrics: {}", e); tracing::warn!("Failed to serialize metrics: {}", e);
......
...@@ -19,6 +19,9 @@ testing-etcd = [] # Tests that require an active ETCD server ...@@ -19,6 +19,9 @@ testing-etcd = [] # Tests that require an active ETCD server
tokio-console = ["dep:console-subscriber", "tokio/tracing"] tokio-console = ["dep:console-subscriber", "tokio/tracing"]
compute-validation = [] # Enable validation and timing for compute macros compute-validation = [] # Enable validation and timing for compute macros
tcp-low-latency = [] # Enable Linux-specific TCP optimizations (TCP_QUICKACK, SO_BUSY_POLL) tcp-low-latency = [] # Enable Linux-specific TCP optimizations (TCP_QUICKACK, SO_BUSY_POLL)
# NVTX timeline annotations for Nsight Systems (compile-time off; also gated at runtime by DYN_ENABLE_RUST_NVTX).
# Overhead: feature off = zero; feature on, env off = ~1ns AtomicBool load; feature on, env on = ~50ns/annotation.
nvtx = ["dep:cudarc"]
[dependencies] [dependencies]
# Use workspace dependencies where available # Use workspace dependencies where available
...@@ -66,6 +69,8 @@ url = { workspace = true } ...@@ -66,6 +69,8 @@ url = { workspace = true }
validator = { workspace = true } validator = { workspace = true }
xxhash-rust = { workspace = true } xxhash-rust = { workspace = true }
cudarc = { workspace = true, features = ["nvtx"], optional = true }
arc-swap = { version = "1" } arc-swap = { version = "1" }
async-once-cell = { version = "0.5.4" } async-once-cell = { version = "0.5.4" }
bincode = { version = "1" } bincode = { version = "1" }
......
...@@ -34,6 +34,7 @@ pub mod distributed; ...@@ -34,6 +34,7 @@ pub mod distributed;
pub mod instances; pub mod instances;
pub mod logging; pub mod logging;
pub mod metrics; pub mod metrics;
pub mod nvtx;
pub mod pipeline; pub mod pipeline;
pub mod prelude; pub mod prelude;
pub mod protocols; pub mod protocols;
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
//! Used by both runtime (route, transport_roundtrip) and llm (preprocess, postprocess, tokenize, template, detokenize). //! Used by both runtime (route, transport_roundtrip) and llm (preprocess, postprocess, tokenize, template, detokenize).
use once_cell::sync::{Lazy, OnceCell}; use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Histogram, HistogramOpts, HistogramVec, Registry}; use prometheus::{Counter, Histogram, HistogramOpts, HistogramVec, Opts, Registry};
use super::prometheus_names::{frontend_perf, name_prefix}; use super::prometheus_names::{frontend_perf, name_prefix};
use crate::MetricsRegistry; use crate::MetricsRegistry;
...@@ -57,18 +57,23 @@ pub static TEMPLATE_SECONDS: Lazy<Histogram> = Lazy::new(|| { ...@@ -57,18 +57,23 @@ pub static TEMPLATE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
.expect("template_seconds histogram") .expect("template_seconds histogram")
}); });
/// Per-token detokenization cost (microseconds). /// Cumulative detokenization time across all tokens (microseconds).
pub static DETOKENIZE_PER_TOKEN_US: Lazy<Histogram> = Lazy::new(|| { /// Use `rate(total) / rate(count)` in Prometheus to derive per-token average.
Histogram::with_opts( pub static DETOKENIZE_TOTAL_US: Lazy<Counter> = Lazy::new(|| {
HistogramOpts::new( Counter::with_opts(Opts::new(
frontend_metric_name(frontend_perf::DETOKENIZE_PER_TOKEN_US), frontend_metric_name(frontend_perf::DETOKENIZE_TOTAL_US),
"Detokenization cost per token (microseconds)", "Cumulative detokenization time (microseconds)",
) ))
.buckets(vec![ .expect("detokenize_total_us counter")
1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, });
]),
) /// Total number of tokens detokenized.
.expect("detokenize_per_token_us histogram") pub static DETOKENIZE_TOKEN_COUNT: Lazy<Counter> = Lazy::new(|| {
Counter::with_opts(Opts::new(
frontend_metric_name(frontend_perf::DETOKENIZE_TOKEN_COUNT),
"Total tokens detokenized",
))
.expect("detokenize_token_count counter")
}); });
/// Guards idempotency for the `MetricsRegistry` registration path. /// Guards idempotency for the `MetricsRegistry` registration path.
...@@ -88,7 +93,10 @@ pub fn ensure_frontend_perf_metrics_registered(registry: &MetricsRegistry) { ...@@ -88,7 +93,10 @@ pub fn ensure_frontend_perf_metrics_registered(registry: &MetricsRegistry) {
registry.add_metric(Box::new(TOKENIZE_SECONDS.clone())).ok(); registry.add_metric(Box::new(TOKENIZE_SECONDS.clone())).ok();
registry.add_metric(Box::new(TEMPLATE_SECONDS.clone())).ok(); registry.add_metric(Box::new(TEMPLATE_SECONDS.clone())).ok();
registry registry
.add_metric(Box::new(DETOKENIZE_PER_TOKEN_US.clone())) .add_metric(Box::new(DETOKENIZE_TOTAL_US.clone()))
.ok();
registry
.add_metric(Box::new(DETOKENIZE_TOKEN_COUNT.clone()))
.ok(); .ok();
}); });
} }
...@@ -104,7 +112,8 @@ pub fn ensure_frontend_perf_metrics_registered_prometheus( ...@@ -104,7 +112,8 @@ pub fn ensure_frontend_perf_metrics_registered_prometheus(
registry.register(Box::new(STAGE_DURATION_SECONDS.clone()))?; registry.register(Box::new(STAGE_DURATION_SECONDS.clone()))?;
registry.register(Box::new(TOKENIZE_SECONDS.clone()))?; registry.register(Box::new(TOKENIZE_SECONDS.clone()))?;
registry.register(Box::new(TEMPLATE_SECONDS.clone()))?; registry.register(Box::new(TEMPLATE_SECONDS.clone()))?;
registry.register(Box::new(DETOKENIZE_PER_TOKEN_US.clone()))?; registry.register(Box::new(DETOKENIZE_TOTAL_US.clone()))?;
registry.register(Box::new(DETOKENIZE_TOKEN_COUNT.clone()))?;
let _ = PROMETHEUS_REGISTERED.set(()); let _ = PROMETHEUS_REGISTERED.set(());
Ok(()) Ok(())
} }
...@@ -61,31 +61,42 @@ ...@@ -61,31 +61,42 @@
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use regex::Regex; use regex::Regex;
/// Metric name prefixes used across the metrics system /// Metric name prefixes used across the metrics system.
pub mod name_prefix { pub mod name_prefix {
/// Prefix for all Prometheus metric names. /// Prefix for component-scoped metrics, auto-labeled with namespace/endpoint.
pub const COMPONENT: &str = "dynamo_component"; pub const COMPONENT: &str = "dynamo_component";
/// Prefix for frontend service metrics /// Prefix for frontend HTTP service metrics (requests, TTFT, ITL, disconnects).
pub const FRONTEND: &str = "dynamo_frontend"; pub const FRONTEND: &str = "dynamo_frontend";
/// Prefix for KV router metrics (used with router_id label) /// Prefix for KV router instance metrics (carries `router_id` label).
pub const ROUTER: &str = "dynamo_router"; pub const ROUTER: &str = "dynamo_router";
/// Prefix for tokio runtime metrics // Note: REQUEST_PLANE vs TRANSPORT: REQUEST_PLANE measures *what requests do* (latency,
pub const TOKIO: &str = "dynamo_tokio"; // concurrency) and is transport-agnostic. TRANSPORT measures *how the wire behaves*
// (bytes transferred, protocol errors) and is protocol-specific (TCP/NATS).
/// Prefix for standalone KV indexer metrics /// Prefix for standalone KV indexer metrics
pub const KVINDEXER: &str = "dynamo_kvindexer"; pub const KVINDEXER: &str = "dynamo_kvindexer";
/// Prefix for request-plane metrics at AddressedPushRouter /// Prefix for request-plane metrics at AddressedPushRouter.
/// Transport-agnostic: measures request lifecycle latency and concurrency
/// (queue → send → roundtrip TTFT, inflight gauge).
pub const REQUEST_PLANE: &str = "dynamo_request_plane"; pub const REQUEST_PLANE: &str = "dynamo_request_plane";
/// Prefix for transport-layer metrics (TCP / NATS) /// Prefix for transport-layer metrics (TCP / NATS).
/// Protocol-specific: measures wire-level health (bytes sent/received, error counts).
pub const TRANSPORT: &str = "dynamo_transport"; pub const TRANSPORT: &str = "dynamo_transport";
/// Prefix for work-handler transport breakdown metrics (backend side) /// Prefix for work-handler transport breakdown metrics (backend side)
pub const WORK_HANDLER: &str = "dynamo_work_handler"; pub const WORK_HANDLER: &str = "dynamo_work_handler";
/// Prefix for tokio runtime metrics (poll times, queue depths, stalls).
pub const TOKIO: &str = "dynamo_tokio";
/// Prefix for per-phase routing overhead latency (hashing, scheduling).
/// Raw Prometheus, not component-scoped.
pub const ROUTING_OVERHEAD: &str = "dynamo_routing_overhead";
} }
/// Automatically inserted Prometheus label names used across the metrics system /// Automatically inserted Prometheus label names used across the metrics system
...@@ -507,8 +518,10 @@ pub mod frontend_perf { ...@@ -507,8 +518,10 @@ pub mod frontend_perf {
pub const TOKENIZE_SECONDS: &str = "tokenize_seconds"; pub const TOKENIZE_SECONDS: &str = "tokenize_seconds";
/// Template application time in preprocessor /// Template application time in preprocessor
pub const TEMPLATE_SECONDS: &str = "template_seconds"; pub const TEMPLATE_SECONDS: &str = "template_seconds";
/// Per-token detokenization cost (microseconds) /// Cumulative detokenization time (microseconds); pair with DETOKENIZE_TOKEN_COUNT
pub const DETOKENIZE_PER_TOKEN_US: &str = "detokenize_per_token_us"; pub const DETOKENIZE_TOTAL_US: &str = "detokenize_total_us";
/// Total tokens detokenized; use rate(total_us)/rate(count) for per-token average
pub const DETOKENIZE_TOKEN_COUNT: &str = "detokenize_token_count";
/// Event loop delay canary (sleep 10ms, measure drift) /// Event loop delay canary (sleep 10ms, measure drift)
pub const EVENT_LOOP_DELAY_SECONDS: &str = "event_loop_delay_seconds"; pub const EVENT_LOOP_DELAY_SECONDS: &str = "event_loop_delay_seconds";
/// Count of event loop stalls (delay > 5ms) /// Count of event loop stalls (delay > 5ms)
......
...@@ -8,6 +8,7 @@ use prometheus::{Counter, Gauge, Histogram, HistogramOpts, IntCounterVec, IntGau ...@@ -8,6 +8,7 @@ use prometheus::{Counter, Gauge, Histogram, HistogramOpts, IntCounterVec, IntGau
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio_util::sync::CancellationToken;
use super::prometheus_names::{frontend_perf, name_prefix, tokio_perf as names}; use super::prometheus_names::{frontend_perf, name_prefix, tokio_perf as names};
use crate::MetricsRegistry; use crate::MetricsRegistry;
...@@ -246,7 +247,8 @@ pub fn ensure_tokio_perf_metrics_registered_prometheus( ...@@ -246,7 +247,8 @@ pub fn ensure_tokio_perf_metrics_registered_prometheus(
/// Run the tokio metrics collector (1s interval) and event-loop canary. /// Run the tokio metrics collector (1s interval) and event-loop canary.
/// Spawn this on the runtime you want to monitor (e.g. primary handle). /// Spawn this on the runtime you want to monitor (e.g. primary handle).
pub async fn tokio_metrics_and_canary_loop() { /// The loop exits cleanly when `cancel` is triggered.
pub async fn tokio_metrics_and_canary_loop(cancel: CancellationToken) {
let canary_interval = Duration::from_millis(10); let canary_interval = Duration::from_millis(10);
let stall_threshold = Duration::from_millis(5); let stall_threshold = Duration::from_millis(5);
let collect_interval = Duration::from_secs(1); let collect_interval = Duration::from_secs(1);
...@@ -254,7 +256,13 @@ pub async fn tokio_metrics_and_canary_loop() { ...@@ -254,7 +256,13 @@ pub async fn tokio_metrics_and_canary_loop() {
let mut prev_counters = PrevWorkerCounters::new(); let mut prev_counters = PrevWorkerCounters::new();
loop { loop {
let start = Instant::now(); let start = Instant::now();
tokio::time::sleep(canary_interval).await; tokio::select! {
_ = tokio::time::sleep(canary_interval) => {}
_ = cancel.cancelled() => {
tracing::debug!("tokio metrics and canary loop shutting down");
return;
}
}
let delay = start.elapsed().saturating_sub(canary_interval); let delay = start.elapsed().saturating_sub(canary_interval);
EVENT_LOOP_DELAY_SECONDS.observe(delay.as_secs_f64()); EVENT_LOOP_DELAY_SECONDS.observe(delay.as_secs_f64());
if delay > stall_threshold { if delay > stall_threshold {
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! NVTX timeline-annotation helpers for Nsight Systems profiling.
//!
//! Delegates to [`cudarc::nvtx`] for the actual NVTX calls.
//!
//! # Gating (two-level)
//!
//! | Cargo feature `nvtx` | `DYN_ENABLE_RUST_NVTX` env | Effect |
//! |----------------------|----------------------------|-------------------------------------------|
//! | off (default) | any | macros compile to nothing; zero overhead |
//! | on | unset or any other value | one `Relaxed` load per site (~1 ns) |
//! | on | `1` / `true` / `yes` / `on` (case-insensitive) | cudarc NVTX calls (~50 ns/annotation) |
//!
//! # Usage
//!
//! ```rust,ignore
//! let _r = dynamo_nvtx_range!("preprocess.tokenize"); // RAII — pops at scope end
//! dynamo_nvtx_push!("codec.encode");
//! dynamo_nvtx_pop!();
//! dynamo_nvtx_name_thread!("tokio-worker-0");
//! ```
//!
//! # Build
//!
//! ```bash
//! cargo build --profile profiling --features nvtx
//! ```
//! Requires `libnvToolsExt.so` at runtime (CUDA Toolkit or NVHPC).
#[cfg(feature = "nvtx")]
use std::sync::atomic::{AtomicBool, Ordering};
/// Process-wide runtime switch for NVTX annotations.
///
/// Starts as `false`; [`init`] stores the value derived from `DYN_ENABLE_RUST_NVTX`.
/// Every annotation helper in this module reads it with a `Relaxed` load before
/// touching NVTX. Only exists when the `nvtx` Cargo feature is compiled in.
#[cfg(feature = "nvtx")]
static NVTX_ENABLED: AtomicBool = AtomicBool::new(false);
// ── Public API ───────────────────────────────────────────────────────────────
/// Latch the `DYN_ENABLE_RUST_NVTX` environment variable into the runtime NVTX switch.
///
/// The variable counts as "on" when its value is `1`, `true`, `yes` or `on`
/// (compared case-insensitively). If it is unset, unreadable, or holds any other
/// value, annotations stay disabled. An info-level log line is emitted only when
/// annotations end up enabled.
///
/// Call this once during startup, before the first annotation macro runs;
/// annotations issued earlier observe the initial (disabled) state.
/// The function body is empty when the `nvtx` Cargo feature is off.
pub fn init() {
    #[cfg(feature = "nvtx")]
    {
        // Accepted spellings of "enabled"; matching ignores ASCII case.
        const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

        let requested = match std::env::var("DYN_ENABLE_RUST_NVTX") {
            Ok(raw) => TRUTHY.iter().any(|&token| raw.eq_ignore_ascii_case(token)),
            Err(_) => false,
        };

        NVTX_ENABLED.store(requested, Ordering::Relaxed);
        if requested {
            tracing::info!("NVTX annotations enabled (DYN_ENABLE_RUST_NVTX)");
        }
    }
}
/// Reports whether NVTX annotations are currently live.
///
/// With the `nvtx` Cargo feature compiled in, this is the current value of the
/// runtime switch stored by [`init`] (a single `Relaxed` atomic load).
/// Without the feature it is always `false`.
#[inline(always)]
pub fn enabled() -> bool {
    #[cfg(feature = "nvtx")]
    let active = NVTX_ENABLED.load(Ordering::Relaxed);
    #[cfg(not(feature = "nvtx"))]
    let active = false;
    active
}
/// Opens an NVTX range called `name` on the calling thread's range stack.
///
/// The range is pushed only when the `nvtx` Cargo feature is compiled in and the
/// runtime switch is on; otherwise the call does nothing. This is the function
/// behind the `dynamo_nvtx_push!` macro.
#[inline(always)]
pub fn push_impl(name: &str) {
    // Touch `name` in every configuration so it never counts as unused.
    let _ = name;
    #[cfg(feature = "nvtx")]
    {
        let nvtx_on = NVTX_ENABLED.load(Ordering::Relaxed);
        if nvtx_on {
            cudarc::nvtx::result::range_push(name);
        }
    }
}
/// Closes the innermost NVTX range on the calling thread's range stack.
///
/// The pop is issued only when the `nvtx` Cargo feature is compiled in and the
/// runtime switch is on at the time of the call; otherwise nothing happens.
/// This is the function behind the `dynamo_nvtx_pop!` macro.
#[inline(always)]
pub fn pop_impl() {
    #[cfg(feature = "nvtx")]
    {
        if !NVTX_ENABLED.load(Ordering::Relaxed) {
            return;
        }
        cudarc::nvtx::result::range_pop();
    }
}
/// Attaches `name` to the current OS thread for display in the Nsight Systems timeline.
///
/// Does nothing unless the `nvtx` Cargo feature is compiled in and the runtime
/// switch is on. On Linux the thread is identified by its kernel thread id
/// (`gettid`); on every other target the id passed to NVTX is `0`.
#[inline(always)]
pub fn name_current_thread_impl(name: &str) {
    // Touch `name` in every configuration so it never counts as unused.
    let _ = name;
    #[cfg(feature = "nvtx")]
    {
        if !NVTX_ENABLED.load(Ordering::Relaxed) {
            return;
        }

        // SAFETY: `SYS_gettid` takes no arguments and reads or writes no caller
        // memory; it only returns the calling thread's id.
        #[cfg(target_os = "linux")]
        let os_thread_id = unsafe { libc::syscall(libc::SYS_gettid) as u32 };
        // NOTE(review): 0 looks like a placeholder rather than a real thread id on
        // non-Linux targets — confirm this is the intended behaviour there.
        #[cfg(not(target_os = "linux"))]
        let os_thread_id = 0u32;

        cudarc::nvtx::result::name_os_thread(os_thread_id, name);
    }
}
// ── RAII guard ───────────────────────────────────────────────────────────────
/// Scope guard that closes an NVTX range when it is dropped.
///
/// Construct with [`dynamo_nvtx_range!`] and bind the result to a named
/// variable (`let _r = ...`). A guard that is not bound is dropped at the end
/// of the statement, which closes the range before any work is measured;
/// `#[must_use]` turns that mistake into a compiler warning.
///
/// The push and pop operate on the calling thread's range stack, so the guard
/// is meant to be dropped on the thread that created it.
/// NOTE(review): a guard kept alive across an `.await` may be dropped on a
/// different worker thread on a multi-threaded executor — confirm call sites.
#[cfg(feature = "nvtx")]
#[must_use = "the NVTX range closes as soon as the guard is dropped; bind it with `let _guard = ...`"]
pub struct NvtxRangeGuard {
    /// `true` when `new` actually pushed a range, so `drop` pops only what
    /// was pushed even if the runtime switch changes in between.
    active: bool,
}

/// Zero-sized no-op guard used when the `nvtx` feature is off.
#[cfg(not(feature = "nvtx"))]
#[must_use = "the NVTX range closes as soon as the guard is dropped; bind it with `let _guard = ...`"]
pub struct NvtxRangeGuard;

impl NvtxRangeGuard {
    /// Push a range called `name` if annotations are enabled and return the
    /// guard that will pop it. Prefer the [`dynamo_nvtx_range!`] macro.
    #[doc(hidden)]
    pub fn new(name: &str) -> Self {
        #[cfg(feature = "nvtx")]
        {
            // Snapshot the switch once; `drop` relies on this value rather
            // than re-reading the global.
            let active = NVTX_ENABLED.load(Ordering::Relaxed);
            if active {
                cudarc::nvtx::result::range_push(name);
            }
            NvtxRangeGuard { active }
        }
        #[cfg(not(feature = "nvtx"))]
        {
            let _ = name;
            NvtxRangeGuard {}
        }
    }
}

#[cfg(feature = "nvtx")]
impl Drop for NvtxRangeGuard {
    /// Pop the range, but only if `new` pushed one.
    fn drop(&mut self) {
        if self.active {
            cudarc::nvtx::result::range_pop();
        }
    }
}

// Intentionally empty. Presumably kept so the type implements `Drop` in both
// configurations and explicit `drop(guard)` calls at call sites do not trip
// "drop of a type without Drop" lints — TODO confirm.
#[cfg(not(feature = "nvtx"))]
impl Drop for NvtxRangeGuard {
    fn drop(&mut self) {}
}
// ── Macros ───────────────────────────────────────────────────────────────────
/// Open a named NVTX range on the calling thread's range stack.
///
/// Expands to a call to `$crate::nvtx::push_impl`, which does nothing when the
/// `nvtx` Cargo feature is off or annotations are disabled at runtime. The
/// argument expression is still evaluated in that case, so pass a cheap
/// `&str` (ideally a literal). Pair every push with a `dynamo_nvtx_pop!`.
#[macro_export]
macro_rules! dynamo_nvtx_push {
    ($label:expr) => {
        $crate::nvtx::push_impl($label)
    };
}
/// Close the innermost NVTX range on the calling thread's range stack.
///
/// Expands to a call to `$crate::nvtx::pop_impl`, which does nothing when the
/// `nvtx` Cargo feature is off or annotations are disabled at runtime.
/// Intended to be paired with an earlier `dynamo_nvtx_push!` on the same thread.
#[macro_export]
macro_rules! dynamo_nvtx_pop {
() => {
$crate::nvtx::pop_impl()
};
}
/// Open a named NVTX range that is closed automatically when the returned
/// guard goes out of scope.
///
/// ```rust,ignore
/// let _r = dynamo_nvtx_range!("preprocess.tokenize");
/// // ... measured work ...
/// // `_r` is dropped here and the range closes
/// ```
///
/// Expands to `$crate::nvtx::NvtxRangeGuard::new(..)`. With the `nvtx` Cargo
/// feature off the guard is a zero-sized no-op, but the argument expression
/// is still evaluated, so pass a cheap `&str` (ideally a literal).
#[macro_export]
macro_rules! dynamo_nvtx_range {
    ($label:expr) => {
        $crate::nvtx::NvtxRangeGuard::new($label)
    };
}
/// Give the calling OS thread a name in the Nsight Systems timeline.
///
/// Expands to a call to `$crate::nvtx::name_current_thread_impl`, which does
/// nothing when the `nvtx` Cargo feature is off or annotations are disabled
/// at runtime. The argument expression is still evaluated in that case.
#[macro_export]
macro_rules! dynamo_nvtx_name_thread {
    ($label:expr) => {
        $crate::nvtx::name_current_thread_impl($label)
    };
}
...@@ -278,6 +278,11 @@ struct RequestControlMessage { ...@@ -278,6 +278,11 @@ struct RequestControlMessage {
request_type: RequestType, request_type: RequestType,
response_type: ResponseType, response_type: ResponseType,
connection_info: ConnectionInfo, connection_info: ConnectionInfo,
/// Wall-clock send timestamp (nanos since UNIX epoch) for transport latency breakdown.
/// Uses `SystemTime` so accuracy depends on NTP sync between frontend and backend hosts.
/// Reliable for single-machine profiling; treat cross-host values as approximate.
#[serde(default, skip_serializing_if = "Option::is_none")]
frontend_send_ts_ns: Option<u64>,
} }
pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> { pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
...@@ -310,6 +315,11 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> { ...@@ -310,6 +315,11 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
let metrics = WorkHandlerMetrics::from_endpoint(endpoint, metrics_labels) let metrics = WorkHandlerMetrics::from_endpoint(endpoint, metrics_labels)
.map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?;
// Register global transport breakdown metrics (idempotent)
crate::metrics::work_handler_perf::ensure_work_handler_perf_metrics_registered(
endpoint.get_metrics_registry(),
);
self.metrics self.metrics
.set(Arc::new(metrics)) .set(Arc::new(metrics))
.map_err(|_| anyhow::anyhow!("Metrics already set")) .map_err(|_| anyhow::anyhow!("Metrics already set"))
......
...@@ -2,12 +2,19 @@ ...@@ -2,12 +2,19 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use super::unified_client::RequestPlaneClient; use super::unified_client::RequestPlaneClient;
use super::*; use super::*;
use crate::dynamo_nvtx_range;
use crate::engine::{AsyncEngine, AsyncEngineContextProvider, Data}; use crate::engine::{AsyncEngine, AsyncEngineContextProvider, Data};
use crate::error::{DynamoError, ErrorType}; use crate::error::{DynamoError, ErrorType};
use crate::logging::inject_trace_headers_into_map; use crate::logging::inject_trace_headers_into_map;
use crate::metrics::frontend_perf::STAGE_DURATION_SECONDS;
use crate::metrics::request_plane::{
REQUEST_PLANE_INFLIGHT, REQUEST_PLANE_QUEUE_SECONDS, REQUEST_PLANE_ROUNDTRIP_TTFT_SECONDS,
REQUEST_PLANE_SEND_SECONDS,
};
use crate::pipeline::network::ConnectionInfo; use crate::pipeline::network::ConnectionInfo;
use crate::pipeline::network::NetworkStreamWrapper; use crate::pipeline::network::NetworkStreamWrapper;
use crate::pipeline::network::PendingConnections; use crate::pipeline::network::PendingConnections;
...@@ -19,8 +26,11 @@ use crate::pipeline::{ManyOut, PipelineError, ResponseStream, SingleIn}; ...@@ -19,8 +26,11 @@ use crate::pipeline::{ManyOut, PipelineError, ResponseStream, SingleIn};
use crate::protocols::maybe_error::MaybeError; use crate::protocols::maybe_error::MaybeError;
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use futures::stream::Stream;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::{StreamExt, StreamNotifyClose, wrappers::ReceiverStream}; use tokio_stream::{StreamExt, StreamNotifyClose, wrappers::ReceiverStream};
use tracing::Instrument; use tracing::Instrument;
...@@ -44,6 +54,60 @@ struct RequestControlMessage { ...@@ -44,6 +54,60 @@ struct RequestControlMessage {
request_type: RequestType, request_type: RequestType,
response_type: ResponseType, response_type: ResponseType,
connection_info: ConnectionInfo, connection_info: ConnectionInfo,
/// Wall-clock send timestamp (nanos since UNIX epoch) for transport latency breakdown.
/// Uses `SystemTime` so accuracy depends on NTP sync between frontend and backend hosts.
/// Reliable for single-machine profiling; treat cross-host values as approximate.
#[serde(default, skip_serializing_if = "Option::is_none")]
frontend_send_ts_ns: Option<u64>,
}
/// Drop guard that keeps `REQUEST_PLANE_INFLIGHT` balanced on early exits.
///
/// The caller increments the gauge and then creates this guard. If the guard
/// is dropped while still armed — for example because a `?` returned early
/// before the response stream was wrapped in `InflightDecStream` — it
/// decrements the gauge. Once `InflightDecStream` owns the decrement, hand the
/// responsibility over with [`InflightGuard::disarm`].
struct InflightGuard {
    /// `true` while this guard is still responsible for the decrement.
    armed: bool,
}

impl InflightGuard {
    /// Create an armed guard. The gauge itself is not touched here.
    fn new() -> Self {
        InflightGuard { armed: true }
    }

    /// Consume the guard without decrementing the gauge. Call this when
    /// `InflightDecStream` takes over responsibility for the decrement.
    fn disarm(self) {
        let mut guard = self;
        guard.armed = false;
        // `guard` is dropped at the end of this scope; `Drop` sees
        // `armed == false` and leaves the gauge alone.
    }
}

impl Drop for InflightGuard {
    /// Decrement the gauge unless the guard was disarmed.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        REQUEST_PLANE_INFLIGHT.dec();
    }
}
/// Stream wrapper whose `Drop` impl decrements the request-plane inflight
/// gauge, so the gauge is released when the response stream is dropped.
///
/// As a stream it is transparent: items and the size hint come straight from
/// `inner`.
struct InflightDecStream<S> {
    /// The wrapped response stream.
    inner: S,
}

impl<S> Stream for InflightDecStream<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    /// Delegate polling to the wrapped stream. `S: Unpin` makes re-pinning
    /// the field with `Pin::new` sound without any `unsafe`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.inner).poll_next(cx)
    }

    /// Forward the wrapped stream's bounds; the trait default would report
    /// `(0, None)` and hide them from downstream adapters.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
impl<S> Drop for InflightDecStream<S> {
fn drop(&mut self) {
REQUEST_PLANE_INFLIGHT.dec();
}
} }
pub struct AddressedRequest<T> { pub struct AddressedRequest<T> {
...@@ -92,6 +156,10 @@ where ...@@ -92,6 +156,10 @@ where
U: Data + for<'de> Deserialize<'de> + MaybeError, U: Data + for<'de> Deserialize<'de> + MaybeError,
{ {
async fn generate(&self, request: SingleIn<AddressedRequest<T>>) -> Result<ManyOut<U>, Error> { async fn generate(&self, request: SingleIn<AddressedRequest<T>>) -> Result<ManyOut<U>, Error> {
let queue_start = Instant::now();
REQUEST_PLANE_INFLIGHT.inc();
let inflight_guard = InflightGuard::new();
let request_id = request.context().id().to_string(); let request_id = request.context().id().to_string();
let (addressed_request, context) = request.transfer(()); let (addressed_request, context) = request.transfer(());
let (request, address) = addressed_request.into_parts(); let (request, address) = addressed_request.into_parts();
...@@ -130,6 +198,7 @@ where ...@@ -130,6 +198,7 @@ where
request_type: RequestType::SingleIn, request_type: RequestType::SingleIn,
response_type: ResponseType::ManyOut, response_type: ResponseType::ManyOut,
connection_info, connection_info,
frontend_send_ts_ns: None,
}; };
// next build the two part message where we package the connection info and the request into // next build the two part message where we package the connection info and the request into
...@@ -151,7 +220,13 @@ where ...@@ -151,7 +220,13 @@ where
// or it should take a two part message directly // or it should take a two part message directly
// todo - update this // todo - update this
let codec = TwoPartCodec::default(); let codec = TwoPartCodec::default();
let buffer = codec.encode_message(msg)?; let buffer = {
let _nvtx = dynamo_nvtx_range!("codec.encode");
codec.encode_message(msg)?
};
REQUEST_PLANE_QUEUE_SECONDS.observe(queue_start.elapsed().as_secs_f64());
let tx_start = Instant::now();
// TRANSPORT ABSTRACT REQUIRED - END HERE // TRANSPORT ABSTRACT REQUIRED - END HERE
...@@ -167,25 +242,47 @@ where ...@@ -167,25 +242,47 @@ where
let mut headers = std::collections::HashMap::new(); let mut headers = std::collections::HashMap::new();
inject_trace_headers_into_map(&mut headers); inject_trace_headers_into_map(&mut headers);
// Send request (works for all transport types) // Stamp send time right before the transport write so the network
// transit metric excludes serialization/encoding overhead.
let send_ts_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
headers.insert("x-frontend-send-ts-ns".to_string(), send_ts_ns.to_string());
// Phase A: Frontend → Backend (network + queue + ack)
let _nvtx_send = dynamo_nvtx_range!("transport.tcp.send");
let _response = self let _response = self
.req_client .req_client
.send_request(address, buffer, headers) .send_request(address, buffer, headers)
.await?; .await?;
drop(_nvtx_send);
REQUEST_PLANE_SEND_SECONDS.observe(tx_start.elapsed().as_secs_f64());
let _nvtx_wait = dynamo_nvtx_range!("transport.tcp.wait_backend");
tracing::trace!(request_id, "awaiting transport handshake"); tracing::trace!(request_id, "awaiting transport handshake");
let response_stream = response_stream_provider let response_stream = response_stream_provider
.await .await
.map_err(|_| PipelineError::DetachedStreamReceiver)? .map_err(|_| PipelineError::DetachedStreamReceiver)?
.map_err(PipelineError::ConnectionFailed)?; .map_err(PipelineError::ConnectionFailed)?;
drop(_nvtx_wait);
// TODO: Detect end-of-stream using Server-Sent Events (SSE) // TODO: Detect end-of-stream using Server-Sent Events (SSE)
let mut is_complete_final = false; let mut is_complete_final = false;
let mut first_response = true;
let stream = tokio_stream::StreamNotifyClose::new( let stream = tokio_stream::StreamNotifyClose::new(
tokio_stream::wrappers::ReceiverStream::new(response_stream.rx), tokio_stream::wrappers::ReceiverStream::new(response_stream.rx),
) )
.filter_map(move |res| { .filter_map(move |res| {
if let Some(res_bytes) = res { if let Some(res_bytes) = res {
if first_response {
first_response = false;
let roundtrip_ttft = tx_start.elapsed().as_secs_f64();
REQUEST_PLANE_ROUNDTRIP_TTFT_SECONDS.observe(roundtrip_ttft);
STAGE_DURATION_SECONDS
.with_label_values(&["transport_roundtrip"])
.observe(queue_start.elapsed().as_secs_f64());
}
if is_complete_final { if is_complete_final {
let err = DynamoError::msg( let err = DynamoError::msg(
"Response received after generation ended - this should never happen", "Response received after generation ended - this should never happen",
...@@ -234,6 +331,8 @@ where ...@@ -234,6 +331,8 @@ where
} }
}); });
inflight_guard.disarm();
let stream = InflightDecStream { inner: stream };
Ok(ResponseStream::new(Box::pin(stream), engine_ctx)) Ok(ResponseStream::new(Box::pin(stream), engine_ctx))
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment