Unverified Commit f6c34e07 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: otel tracings for routing overheads and metrics (#6194)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 2be83be2
...@@ -20,6 +20,7 @@ use dynamo_runtime::{ ...@@ -20,6 +20,7 @@ use dynamo_runtime::{
}; };
use futures::stream; use futures::stream;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tracing::Instrument;
use validator::Validate; use validator::Validate;
// Re-export from dynamo-kv-router crate // Re-export from dynamo-kv-router crate
...@@ -383,18 +384,25 @@ impl KvRouter { ...@@ -383,18 +384,25 @@ impl KvRouter {
let isl_tokens = tokens.len(); let isl_tokens = tokens.len();
let block_hashes = compute_block_hash_for_seq(tokens, self.block_size, None); let block_hashes = tracing::info_span!("kv_router.compute_block_hashes")
.in_scope(|| compute_block_hash_for_seq(tokens, self.block_size, None));
let hash_elapsed = start.elapsed(); let hash_elapsed = start.elapsed();
let overlap_scores = self.indexer.find_matches(block_hashes).await?; let overlap_scores = self
.indexer
.find_matches(block_hashes)
.instrument(tracing::info_span!("kv_router.find_matches"))
.await?;
let find_matches_elapsed = start.elapsed(); let find_matches_elapsed = start.elapsed();
// Compute seq_hashes only if scheduler needs it for active blocks tracking // Compute seq_hashes only if scheduler needs it for active blocks tracking
let maybe_seq_hashes = self.kv_router_config.compute_seq_hashes_for_tracking( let maybe_seq_hashes = tracing::info_span!("kv_router.compute_seq_hashes").in_scope(|| {
tokens, self.kv_router_config.compute_seq_hashes_for_tracking(
self.block_size, tokens,
router_config_override, self.block_size,
); router_config_override,
)
});
let seq_hash_elapsed = start.elapsed(); let seq_hash_elapsed = start.elapsed();
let best_worker = self let best_worker = self
...@@ -409,6 +417,7 @@ impl KvRouter { ...@@ -409,6 +417,7 @@ impl KvRouter {
lora_name, lora_name,
priority_jump, priority_jump,
) )
.instrument(tracing::info_span!("kv_router.schedule"))
.await?; .await?;
let total_elapsed = start.elapsed(); let total_elapsed = start.elapsed();
......
...@@ -287,7 +287,6 @@ impl PrefillRouter { ...@@ -287,7 +287,6 @@ impl PrefillRouter {
.unwrap_or(0.0); .unwrap_or(0.0);
match self match self
.query_prefill_worker(&req.token_ids, false, lora_name, priority_jump) .query_prefill_worker(&req.token_ids, false, lora_name, priority_jump)
.instrument(tracing::info_span!("query_prefill_worker"))
.await .await
{ {
Ok((worker_id, dp_rank)) => (worker_id, dp_rank), Ok((worker_id, dp_rank)) => (worker_id, dp_rank),
...@@ -619,7 +618,6 @@ impl ...@@ -619,7 +618,6 @@ impl
}) })
} }
} }
.instrument(tracing::info_span!("prefill_routing"))
.await; .await;
// Abort if cancelled during prefill // Abort if cancelled during prefill
......
...@@ -13,6 +13,7 @@ use dynamo_runtime::{ ...@@ -13,6 +13,7 @@ use dynamo_runtime::{
}; };
use futures::stream::{self, StreamExt}; use futures::stream::{self, StreamExt};
use serde_json::json; use serde_json::json;
use tracing::Instrument;
use crate::{ use crate::{
kv_router::{ kv_router::{
...@@ -239,6 +240,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -239,6 +240,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
let block_size = self.chooser.block_size() as usize; let block_size = self.chooser.block_size() as usize;
let selection = self let selection = self
.select_worker(&context_id, &request, phase, is_query_only) .select_worker(&context_id, &request, phase, is_query_only)
.instrument(tracing::info_span!("kv_router.select_worker"))
.await?; .await?;
let WorkerSelection { let WorkerSelection {
instance_id, instance_id,
...@@ -328,7 +330,18 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -328,7 +330,18 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
} }
let chooser = self.chooser.clone(); let chooser = self.chooser.clone();
let mut response_stream = self.inner.direct(updated_request, instance_id).await?; let mut response_stream = self
.inner
.direct(updated_request, instance_id)
.instrument(tracing::info_span!(
"kv_router.route_request",
request_id = %context_id,
worker_id = instance_id,
dp_rank = dp_rank,
overlap_blocks = overlap_amount,
phase = ?phase,
))
.await?;
let stream_context = response_stream.context(); let stream_context = response_stream.context();
let context_for_monitoring = stream_context.clone(); let context_for_monitoring = stream_context.clone();
......
...@@ -27,6 +27,7 @@ use std::{ ...@@ -27,6 +27,7 @@ use std::{
}, },
}; };
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tracing::Instrument;
/// Trait for monitoring worker load and determining busy state. /// Trait for monitoring worker load and determining busy state.
/// Implementations can define custom load metrics and busy thresholds. /// Implementations can define custom load metrics and busy thresholds.
...@@ -310,6 +311,18 @@ where ...@@ -310,6 +311,18 @@ where
instance_id: u64, instance_id: u64,
request: SingleIn<T>, request: SingleIn<T>,
) -> anyhow::Result<ManyOut<U>> { ) -> anyhow::Result<ManyOut<U>> {
let request_id = request.id().to_string();
let route_span = if matches!(self.router_mode, RouterMode::KV) {
tracing::Span::none()
} else {
tracing::info_span!(
"router.route_request",
request_id = %request_id,
worker_id = instance_id,
router_mode = ?self.router_mode,
)
};
// Check if all workers are busy (only if busy threshold is set and fault detection enabled) // Check if all workers are busy (only if busy threshold is set and fault detection enabled)
if self.fault_detection_enabled && self.busy_threshold.is_some() { if self.fault_detection_enabled && self.busy_threshold.is_some() {
let free_instances = self.client.instance_ids_free(); let free_instances = self.client.instance_ids_free();
...@@ -373,7 +386,11 @@ where ...@@ -373,7 +386,11 @@ where
let request = request.map(|req| AddressedRequest::new(req, address)); let request = request.map(|req| AddressedRequest::new(req, address));
let stream: anyhow::Result<ManyOut<U>> = self.addressed.generate(request).await; let stream: anyhow::Result<ManyOut<U>> = self
.addressed
.generate(request)
.instrument(route_span)
.await;
match stream { match stream {
Ok(stream) => { Ok(stream) => {
if !self.fault_detection_enabled { if !self.fault_detection_enabled {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment