Unverified Commit e6a6a1f2 authored by Jacky's avatar Jacky Committed by GitHub
Browse files

feat: Request Migration Metrics (#5029)


Signed-off-by: default avatarJacky <18255193+kthui@users.noreply.github.com>
parent edda76b4
...@@ -109,6 +109,28 @@ The migration system is designed with several important architectural considerat ...@@ -109,6 +109,28 @@ The migration system is designed with several important architectural considerat
**Error Handling**: The migration system distinguishes between different types of failures and applies appropriate recovery strategies for each scenario. **Error Handling**: The migration system distinguishes between different types of failures and applies appropriate recovery strategies for each scenario.
## Monitoring and Metrics
The migration system exposes Prometheus metrics to monitor migration activity. These metrics are available on the frontend's `/metrics` endpoint (default port 8000):
- `dynamo_frontend_model_migration_total`: Counter tracking the total number of request migrations
- Labels:
- `model`: The model name being served
- `migration_type`: Either `new_request` (initial connection failure) or `ongoing_request` (mid-stream disconnection)
**Example metrics output:**
```
dynamo_frontend_model_migration_total{migration_type="ongoing_request",model="Qwen/Qwen3-0.6B"} 3
dynamo_frontend_model_migration_total{migration_type="new_request",model="Qwen/Qwen3-0.6B"} 1
```
These metrics can be used to:
- Monitor worker reliability and failure patterns
- Alert on excessive migration rates indicating infrastructure issues
- Track the effectiveness of fault tolerance mechanisms
For more information on Dynamo metrics, see the [Metrics documentation](../observability/metrics.md).
## Operational Impact ## Operational Impact
Request migration fundamentally changes how the system handles failures, moving from a "fail-fast" approach to a "graceful degradation" model. This architectural shift enables higher availability and better resource utilization while maintaining the same external API contract for clients. Request migration fundamentally changes how the system handles failures, moving from a "fail-fast" approach to a "graceful degradation" model. This architectural shift enables higher availability and better resource utilization while maintaining the same external API contract for clients.
...@@ -159,6 +159,7 @@ The Dynamo HTTP Frontend (`python -m dynamo.frontend`) exposes `dynamo_frontend_ ...@@ -159,6 +159,7 @@ The Dynamo HTTP Frontend (`python -m dynamo.frontend`) exposes `dynamo_frontend_
- `dynamo_frontend_request_duration_seconds`: LLM request duration (histogram) - `dynamo_frontend_request_duration_seconds`: LLM request duration (histogram)
- `dynamo_frontend_requests_total`: Total LLM requests (counter) - `dynamo_frontend_requests_total`: Total LLM requests (counter)
- `dynamo_frontend_time_to_first_token_seconds`: Time to first token (histogram) - `dynamo_frontend_time_to_first_token_seconds`: Time to first token (histogram)
- `dynamo_frontend_model_migration_total`: Total number of request migrations due to worker unavailability (counter, labels: `model`, `migration_type`)
**Access frontend metrics:** **Access frontend metrics:**
```bash ```bash
......
...@@ -358,6 +358,7 @@ use dynamo_runtime::{Runtime, distributed::DistributedConfig, traits::Distribute ...@@ -358,6 +358,7 @@ use dynamo_runtime::{Runtime, distributed::DistributedConfig, traits::Distribute
use dynamo_llm::discovery::ModelManager; use dynamo_llm::discovery::ModelManager;
use dynamo_llm::entrypoint::build_routed_pipeline; use dynamo_llm::entrypoint::build_routed_pipeline;
use dynamo_llm::http::service::metrics::Metrics;
use dynamo_llm::kv_router::KvRouterConfig; use dynamo_llm::kv_router::KvRouterConfig;
use dynamo_llm::model_card::ModelDeploymentCard; use dynamo_llm::model_card::ModelDeploymentCard;
use dynamo_llm::protocols::openai::nvext::NvExt; use dynamo_llm::protocols::openai::nvext::NvExt;
...@@ -1120,11 +1121,14 @@ pub async fn create_worker_selection_pipeline_chat( ...@@ -1120,11 +1121,14 @@ pub async fn create_worker_selection_pipeline_chat(
active_prefill_tokens_threshold: None, active_prefill_tokens_threshold: None,
enforce_disagg, enforce_disagg,
}; };
// Create metrics for migration tracking (not exposed via /metrics in C bindings)
let metrics = Arc::new(Metrics::new());
let watcher = ModelWatcher::new( let watcher = ModelWatcher::new(
component.drt().clone(), component.drt().clone(),
model_manager.clone(), model_manager.clone(),
router_config, router_config,
None, None,
metrics.clone(),
); );
let cards = watcher let cards = watcher
.cards_for_model(model_name, Some(namespace), false) .cards_for_model(model_name, Some(namespace), false)
...@@ -1225,6 +1229,7 @@ pub async fn create_worker_selection_pipeline_chat( ...@@ -1225,6 +1229,7 @@ pub async fn create_worker_selection_pipeline_chat(
hf_tokenizer, hf_tokenizer,
prefill_chooser, prefill_chooser,
enforce_disagg, enforce_disagg,
metrics,
) )
.await?; .await?;
......
...@@ -78,6 +78,10 @@ class frontend_service: ...@@ -78,6 +78,10 @@ class frontend_service:
MODEL_KV_CACHE_BLOCK_SIZE = "model_kv_cache_block_size" MODEL_KV_CACHE_BLOCK_SIZE = "model_kv_cache_block_size"
# Request migration limit for a worker serving the model (MDC) # Request migration limit for a worker serving the model (MDC)
MODEL_MIGRATION_LIMIT = "model_migration_limit" MODEL_MIGRATION_LIMIT = "model_migration_limit"
# Total number of request migrations due to worker unavailability
MODEL_MIGRATION_TOTAL = "model_migration_total"
# Label name for the type of migration
MIGRATION_TYPE_LABEL = "migration_type"
class kvbm: class kvbm:
...@@ -141,45 +145,6 @@ class name_prefix: ...@@ -141,45 +145,6 @@ class name_prefix:
FRONTEND = "dynamo_frontend" FRONTEND = "dynamo_frontend"
class nats_client:
"""NATS client metrics. DistributedRuntime contains a NATS client shared by all children)"""
# Prefix for all NATS client metrics
PREFIX = ""
# Total number of bytes received by NATS client
IN_TOTAL_BYTES = "nats_client_in_total_bytes"
# Total number of bytes sent by NATS client
OUT_OVERHEAD_BYTES = "nats_client_out_overhead_bytes"
# Total number of messages received by NATS client
IN_MESSAGES = "nats_client_in_messages"
# Total number of messages sent by NATS client
OUT_MESSAGES = "nats_client_out_messages"
# Current number of active connections for NATS client
# Note: Gauge metric measuring current connections, not cumulative total
CURRENT_CONNECTIONS = "nats_client_current_connections"
# Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting)
CONNECTION_STATE = "nats_client_connection_state"
class nats_service:
"""NATS service metrics, from the $SRV.STATS.<service_name> requests on NATS server"""
# Prefix for all NATS service metrics
PREFIX = ""
# Average processing time in milliseconds (maps to: average_processing_time in ms)
PROCESSING_MS_AVG = "nats_service_processing_ms_avg"
# Total errors across all endpoints (maps to: num_errors)
ERRORS_TOTAL = "nats_service_errors_total"
# Total requests across all endpoints (maps to: num_requests)
REQUESTS_TOTAL = "nats_service_requests_total"
# Total processing time in milliseconds (maps to: processing_time in ms)
PROCESSING_MS_TOTAL = "nats_service_processing_ms_total"
# Number of active services (derived from ServiceSet.services)
ACTIVE_SERVICES = "nats_service_active_services"
# Number of active endpoints (derived from ServiceInfo.endpoints)
ACTIVE_ENDPOINTS = "nats_service_active_endpoints"
class task_tracker: class task_tracker:
"""Task tracker Prometheus metric name suffixes""" """Task tracker Prometheus metric name suffixes"""
......
...@@ -21,6 +21,7 @@ use dynamo_runtime::{ ...@@ -21,6 +21,7 @@ use dynamo_runtime::{
use crate::{ use crate::{
backend::Backend, backend::Backend,
entrypoint::{self, EngineFactoryCallback, RouterConfig}, entrypoint::{self, EngineFactoryCallback, RouterConfig},
http::service::metrics::Metrics,
kv_router::PrefillRouter, kv_router::PrefillRouter,
model_card::ModelDeploymentCard, model_card::ModelDeploymentCard,
model_type::{ModelInput, ModelType}, model_type::{ModelInput, ModelType},
...@@ -54,6 +55,7 @@ pub struct ModelWatcher { ...@@ -54,6 +55,7 @@ pub struct ModelWatcher {
notify_on_model: Notify, notify_on_model: Notify,
model_update_tx: Option<Sender<ModelUpdate>>, model_update_tx: Option<Sender<ModelUpdate>>,
engine_factory: Option<EngineFactoryCallback>, engine_factory: Option<EngineFactoryCallback>,
metrics: Arc<Metrics>,
} }
const ALL_MODEL_TYPES: &[ModelType] = &[ const ALL_MODEL_TYPES: &[ModelType] = &[
...@@ -70,6 +72,7 @@ impl ModelWatcher { ...@@ -70,6 +72,7 @@ impl ModelWatcher {
model_manager: Arc<ModelManager>, model_manager: Arc<ModelManager>,
router_config: RouterConfig, router_config: RouterConfig,
engine_factory: Option<EngineFactoryCallback>, engine_factory: Option<EngineFactoryCallback>,
metrics: Arc<Metrics>,
) -> ModelWatcher { ) -> ModelWatcher {
Self { Self {
manager: model_manager, manager: model_manager,
...@@ -78,6 +81,7 @@ impl ModelWatcher { ...@@ -78,6 +81,7 @@ impl ModelWatcher {
notify_on_model: Notify::new(), notify_on_model: Notify::new(),
model_update_tx: None, model_update_tx: None,
engine_factory, engine_factory,
metrics,
} }
} }
...@@ -451,6 +455,7 @@ impl ModelWatcher { ...@@ -451,6 +455,7 @@ impl ModelWatcher {
tokenizer_hf.clone(), tokenizer_hf.clone(),
prefill_chooser.clone(), prefill_chooser.clone(),
self.router_config.enforce_disagg, self.router_config.enforce_disagg,
self.metrics.clone(),
) )
.await .await
.context("build_routed_pipeline")? .context("build_routed_pipeline")?
...@@ -484,6 +489,7 @@ impl ModelWatcher { ...@@ -484,6 +489,7 @@ impl ModelWatcher {
tokenizer_hf, tokenizer_hf,
prefill_chooser, prefill_chooser,
self.router_config.enforce_disagg, self.router_config.enforce_disagg,
self.metrics.clone(),
) )
.await .await
.context("build_routed_pipeline_with_preprocessor")?; .context("build_routed_pipeline_with_preprocessor")?;
......
...@@ -8,6 +8,7 @@ use crate::{ ...@@ -8,6 +8,7 @@ use crate::{
discovery::{KvWorkerMonitor, ModelManager, ModelWatcher}, discovery::{KvWorkerMonitor, ModelManager, ModelWatcher},
engines::StreamingEngineAdapter, engines::StreamingEngineAdapter,
entrypoint::{EngineConfig, RouterConfig}, entrypoint::{EngineConfig, RouterConfig},
http::service::metrics::Metrics,
kv_router::{KvPushRouter, KvRouter, PrefillRouter}, kv_router::{KvPushRouter, KvRouter, PrefillRouter},
migration::Migration, migration::Migration,
model_card::ModelDeploymentCard, model_card::ModelDeploymentCard,
...@@ -62,11 +63,14 @@ pub async fn prepare_engine( ...@@ -62,11 +63,14 @@ pub async fn prepare_engine(
model: local_model, .. model: local_model, ..
} => { } => {
let model_manager = Arc::new(ModelManager::new()); let model_manager = Arc::new(ModelManager::new());
// Create metrics for migration tracking (not exposed via /metrics in Dynamic engine mode)
let metrics = Arc::new(Metrics::new());
let watch_obj = Arc::new(ModelWatcher::new( let watch_obj = Arc::new(ModelWatcher::new(
distributed_runtime.clone(), distributed_runtime.clone(),
model_manager.clone(), model_manager.clone(),
RouterConfig::default(), RouterConfig::default(),
None, None,
metrics,
)); ));
let discovery = distributed_runtime.discovery(); let discovery = distributed_runtime.discovery();
let discovery_stream = discovery let discovery_stream = discovery
...@@ -174,6 +178,7 @@ pub async fn build_routed_pipeline<Req, Resp>( ...@@ -174,6 +178,7 @@ pub async fn build_routed_pipeline<Req, Resp>(
hf_tokenizer: tokenizers::Tokenizer, hf_tokenizer: tokenizers::Tokenizer,
prefill_chooser: Option<Arc<PrefillRouter>>, prefill_chooser: Option<Arc<PrefillRouter>>,
enforce_disagg: bool, enforce_disagg: bool,
metrics: Arc<Metrics>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>> ) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where where
Req: Data, Req: Data,
...@@ -198,6 +203,7 @@ where ...@@ -198,6 +203,7 @@ where
hf_tokenizer, hf_tokenizer,
prefill_chooser, prefill_chooser,
enforce_disagg, enforce_disagg,
metrics,
) )
.await .await
} }
...@@ -213,6 +219,7 @@ pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>( ...@@ -213,6 +219,7 @@ pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>(
hf_tokenizer: tokenizers::Tokenizer, hf_tokenizer: tokenizers::Tokenizer,
prefill_chooser: Option<Arc<PrefillRouter>>, prefill_chooser: Option<Arc<PrefillRouter>>,
enforce_disagg: bool, enforce_disagg: bool,
metrics: Arc<Metrics>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>> ) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where where
Req: Data, Req: Data,
...@@ -227,7 +234,7 @@ where ...@@ -227,7 +234,7 @@ where
let frontend = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new(); let frontend = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
let preprocessor_op = preprocessor.into_operator(); let preprocessor_op = preprocessor.into_operator();
let backend = Backend::from_tokenizer(hf_tokenizer).into_operator(); let backend = Backend::from_tokenizer(hf_tokenizer).into_operator();
let migration = Migration::from_mdc(card).into_operator(); let migration = Migration::from_mdc(card, metrics).into_operator();
// For KV routing, use the client from the chooser to ensure shared state // For KV routing, use the client from the chooser to ensure shared state
let router_client = if router_mode == RouterMode::KV { let router_client = if router_mode == RouterMode::KV {
......
...@@ -8,6 +8,7 @@ use crate::{ ...@@ -8,6 +8,7 @@ use crate::{
engines::StreamingEngineAdapter, engines::StreamingEngineAdapter,
entrypoint::{EngineConfig, RouterConfig, input::common}, entrypoint::{EngineConfig, RouterConfig, input::common},
grpc::service::kserve, grpc::service::kserve,
http::service::metrics::Metrics,
namespace::is_global_namespace, namespace::is_global_namespace,
types::openai::{ types::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse}, chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
...@@ -110,7 +111,9 @@ async fn run_watcher( ...@@ -110,7 +111,9 @@ async fn run_watcher(
router_config: RouterConfig, router_config: RouterConfig,
target_namespace: Option<String>, target_namespace: Option<String>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let watch_obj = ModelWatcher::new(runtime.clone(), model_manager, router_config, None); // Create metrics for migration tracking (not exposed via /metrics in gRPC mode)
let metrics = Arc::new(Metrics::new());
let watch_obj = ModelWatcher::new(runtime.clone(), model_manager, router_config, None, metrics);
tracing::debug!("Waiting for remote model"); tracing::debug!("Waiting for remote model");
let discovery = runtime.discovery(); let discovery = runtime.discovery();
let discovery_stream = discovery let discovery_stream = discovery
......
...@@ -205,6 +205,7 @@ async fn run_watcher( ...@@ -205,6 +205,7 @@ async fn run_watcher(
model_manager, model_manager,
router_config, router_config,
engine_factory, engine_factory,
metrics.clone(),
); );
tracing::debug!("Waiting for remote model"); tracing::debug!("Waiting for remote model");
let discovery = runtime.discovery(); let discovery = runtime.discovery();
......
...@@ -179,6 +179,7 @@ pub struct Metrics { ...@@ -179,6 +179,7 @@ pub struct Metrics {
model_context_length: IntGaugeVec, model_context_length: IntGaugeVec,
model_kv_cache_block_size: IntGaugeVec, model_kv_cache_block_size: IntGaugeVec,
model_migration_limit: IntGaugeVec, model_migration_limit: IntGaugeVec,
model_migration_total: IntCounterVec,
} }
// Inflight tracks requests from HTTP handler start until complete response is finished. // Inflight tracks requests from HTTP handler start until complete response is finished.
...@@ -507,6 +508,15 @@ impl Metrics { ...@@ -507,6 +508,15 @@ impl Metrics {
) )
.unwrap(); .unwrap();
let model_migration_total = IntCounterVec::new(
Opts::new(
frontend_metric_name(frontend_service::MODEL_MIGRATION_TOTAL),
"Total number of request migrations due to worker unavailability",
),
&["model", frontend_service::MIGRATION_TYPE_LABEL],
)
.unwrap();
Metrics { Metrics {
request_counter, request_counter,
inflight_gauge, inflight_gauge,
...@@ -525,6 +535,7 @@ impl Metrics { ...@@ -525,6 +535,7 @@ impl Metrics {
model_context_length, model_context_length,
model_kv_cache_block_size, model_kv_cache_block_size,
model_migration_limit, model_migration_limit,
model_migration_total,
} }
} }
...@@ -623,6 +634,7 @@ impl Metrics { ...@@ -623,6 +634,7 @@ impl Metrics {
registry.register(Box::new(self.model_context_length.clone()))?; registry.register(Box::new(self.model_context_length.clone()))?;
registry.register(Box::new(self.model_kv_cache_block_size.clone()))?; registry.register(Box::new(self.model_kv_cache_block_size.clone()))?;
registry.register(Box::new(self.model_migration_limit.clone()))?; registry.register(Box::new(self.model_migration_limit.clone()))?;
registry.register(Box::new(self.model_migration_total.clone()))?;
Ok(()) Ok(())
} }
...@@ -678,6 +690,34 @@ impl Metrics { ...@@ -678,6 +690,34 @@ impl Metrics {
Ok(()) Ok(())
} }
/// Increment the migration counter for a new request migration
pub fn inc_migration_new_request(&self, model: &str) {
self.model_migration_total
.with_label_values(&[model, frontend_service::migration_type::NEW_REQUEST])
.inc();
}
/// Increment the migration counter for an ongoing request migration
pub fn inc_migration_ongoing_request(&self, model: &str) {
self.model_migration_total
.with_label_values(&[model, frontend_service::migration_type::ONGOING_REQUEST])
.inc();
}
/// Get the current count of new request migrations for a model
pub fn get_migration_new_request_count(&self, model: &str) -> u64 {
self.model_migration_total
.with_label_values(&[model, frontend_service::migration_type::NEW_REQUEST])
.get()
}
/// Get the current count of ongoing request migrations for a model
pub fn get_migration_ongoing_request_count(&self, model: &str) -> u64 {
self.model_migration_total
.with_label_values(&[model, frontend_service::migration_type::ONGOING_REQUEST])
.get()
}
/// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request, /// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request,
/// and the kind of endpoint that was hit /// and the kind of endpoint that was hit
/// ///
......
...@@ -11,24 +11,24 @@ use async_nats::client::{ ...@@ -11,24 +11,24 @@ use async_nats::client::{
}; };
use crate::{ use crate::{
model_card::ModelDeploymentCard, preprocessor::BackendOutput, http::service::metrics::Metrics, model_card::ModelDeploymentCard, preprocessor::BackendOutput,
protocols::common::llm_backend::PreprocessedRequest, protocols::common::llm_backend::PreprocessedRequest,
}; };
use dynamo_runtime::{ use dynamo_runtime::pipeline::{
pipeline::{
AsyncEngineContext, AsyncEngineContextProvider, Context, ManyOut, Operator, ResponseStream, AsyncEngineContext, AsyncEngineContextProvider, Context, ManyOut, Operator, ResponseStream,
ServerStreamingEngine, SingleIn, async_trait, network::STREAM_ERR_MSG, ServerStreamingEngine, SingleIn, async_trait, network::STREAM_ERR_MSG,
},
protocols::{annotated::Annotated, maybe_error::MaybeError},
}; };
use dynamo_runtime::protocols::{annotated::Annotated, maybe_error::MaybeError};
pub struct Migration { pub struct Migration {
migration_limit: u32, migration_limit: u32,
model_name: Arc<String>,
metrics: Arc<Metrics>,
} }
impl Migration { impl Migration {
pub fn from_mdc(mdc: &ModelDeploymentCard) -> Arc<Self> { pub fn from_mdc(mdc: &ModelDeploymentCard, metrics: Arc<Metrics>) -> Arc<Self> {
tracing::debug!( tracing::debug!(
"model {} migration limit {}", "model {} migration limit {}",
mdc.display_name, mdc.display_name,
...@@ -36,6 +36,8 @@ impl Migration { ...@@ -36,6 +36,8 @@ impl Migration {
); );
Arc::new(Self { Arc::new(Self {
migration_limit: mdc.migration_limit, migration_limit: mdc.migration_limit,
model_name: Arc::new(mdc.display_name.clone()),
metrics,
}) })
} }
} }
...@@ -57,8 +59,14 @@ impl ...@@ -57,8 +59,14 @@ impl
let (preprocessed_request, context) = request.transfer(()); let (preprocessed_request, context) = request.transfer(());
let engine_ctx = context.context(); let engine_ctx = context.context();
let engine_ctx_ = engine_ctx.clone(); let engine_ctx_ = engine_ctx.clone();
let retry_manager = let retry_manager = RetryManager::build(
RetryManager::build(engine_ctx, preprocessed_request, next, self.migration_limit) engine_ctx,
preprocessed_request,
next,
self.migration_limit,
self.model_name.clone(),
self.metrics.clone(),
)
.await?; .await?;
let response_stream = stream::unfold(retry_manager, move |mut retry_manager| async move { let response_stream = stream::unfold(retry_manager, move |mut retry_manager| async move {
retry_manager retry_manager
...@@ -76,6 +84,8 @@ struct RetryManager { ...@@ -76,6 +84,8 @@ struct RetryManager {
next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<BackendOutput>>, next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<BackendOutput>>,
next_stream: Option<ManyOut<Annotated<BackendOutput>>>, next_stream: Option<ManyOut<Annotated<BackendOutput>>>,
retries_left: u32, retries_left: u32,
model_name: Arc<String>,
metrics: Arc<Metrics>,
} }
impl RetryManager { impl RetryManager {
...@@ -84,6 +94,8 @@ impl RetryManager { ...@@ -84,6 +94,8 @@ impl RetryManager {
preprocessed_request: PreprocessedRequest, preprocessed_request: PreprocessedRequest,
next: ServerStreamingEngine<PreprocessedRequest, Annotated<BackendOutput>>, next: ServerStreamingEngine<PreprocessedRequest, Annotated<BackendOutput>>,
retries_left: u32, retries_left: u32,
model_name: Arc<String>,
metrics: Arc<Metrics>,
) -> Result<Self> { ) -> Result<Self> {
let mut slf = Self { let mut slf = Self {
context, context,
...@@ -91,6 +103,8 @@ impl RetryManager { ...@@ -91,6 +103,8 @@ impl RetryManager {
next_generate: next, next_generate: next,
next_stream: None, next_stream: None,
retries_left: retries_left + 1, // +1 to account for the initial attempt retries_left: retries_left + 1, // +1 to account for the initial attempt
model_name,
metrics,
}; };
slf.new_stream().await?; slf.new_stream().await?;
Ok(slf) Ok(slf)
...@@ -114,6 +128,7 @@ impl RetryManager { ...@@ -114,6 +128,7 @@ impl RetryManager {
.any(|e| e.to_string().starts_with(STREAM_ERR_MSG)) .any(|e| e.to_string().starts_with(STREAM_ERR_MSG))
{ {
tracing::warn!("Stream disconnected... recreating stream..."); tracing::warn!("Stream disconnected... recreating stream...");
self.metrics.inc_migration_ongoing_request(&self.model_name);
if let Err(err) = self.new_stream().await { if let Err(err) = self.new_stream().await {
tracing::warn!("Cannot recreate stream: {:#}", err); tracing::warn!("Cannot recreate stream: {:#}", err);
} else { } else {
...@@ -146,6 +161,7 @@ impl RetryManager { ...@@ -146,6 +161,7 @@ impl RetryManager {
&& matches!(req_err.kind(), NatsNoResponders) && matches!(req_err.kind(), NatsNoResponders)
{ {
tracing::warn!("Creating new stream... retrying..."); tracing::warn!("Creating new stream... retrying...");
self.metrics.inc_migration_new_request(&self.model_name);
continue; continue;
} }
break; break;
...@@ -183,12 +199,15 @@ impl RetryManager { ...@@ -183,12 +199,15 @@ impl RetryManager {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::http::service::metrics::Metrics;
use crate::protocols::common::{OutputOptions, SamplingOptions, StopConditions}; use crate::protocols::common::{OutputOptions, SamplingOptions, StopConditions};
use dynamo_runtime::pipeline::AsyncEngine; use dynamo_runtime::pipeline::AsyncEngine;
use dynamo_runtime::pipeline::context::Controller; use dynamo_runtime::pipeline::context::Controller;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use tokio::sync::mpsc; use tokio::sync::mpsc;
const TEST_MODEL: &str = "test-model";
// Helper to create a mock preprocessed request // Helper to create a mock preprocessed request
fn create_mock_request(max_tokens: u32) -> PreprocessedRequest { fn create_mock_request(max_tokens: u32) -> PreprocessedRequest {
PreprocessedRequest::builder() PreprocessedRequest::builder()
...@@ -495,7 +514,15 @@ mod tests { ...@@ -495,7 +514,15 @@ mod tests {
mock_engine; mock_engine;
let ctx = Arc::new(Controller::new(context_id.clone())); let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 0) let metrics = Arc::new(Metrics::new());
let mut retry_manager = RetryManager::build(
ctx,
request,
next_generate,
0,
Arc::new(TEST_MODEL.to_string()),
metrics.clone(),
)
.await .await
.expect("Failed to build RetryManager"); .expect("Failed to build RetryManager");
...@@ -511,6 +538,9 @@ mod tests { ...@@ -511,6 +538,9 @@ mod tests {
assert_eq!(output.token_ids, vec![101 + i as u32]); // 101, 102, 103, ..., 110 assert_eq!(output.token_ids, vec![101 + i as u32]); // 101, 102, 103, ..., 110
} }
} }
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 0);
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 0);
} }
/// Test case 2: New request migration /// Test case 2: New request migration
...@@ -534,7 +564,15 @@ mod tests { ...@@ -534,7 +564,15 @@ mod tests {
mock_engine; mock_engine;
let ctx = Arc::new(Controller::new(context_id.clone())); let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 3) let metrics = Arc::new(Metrics::new());
let mut retry_manager = RetryManager::build(
ctx,
request,
next_generate,
3,
Arc::new(TEST_MODEL.to_string()),
metrics.clone(),
)
.await .await
.expect("Failed to build RetryManager"); .expect("Failed to build RetryManager");
...@@ -550,6 +588,9 @@ mod tests { ...@@ -550,6 +588,9 @@ mod tests {
assert_eq!(output.token_ids, vec![101 + i as u32]); // 101, 102, 103, ..., 110 assert_eq!(output.token_ids, vec![101 + i as u32]); // 101, 102, 103, ..., 110
} }
} }
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 1);
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 0);
} }
/// Test case 3: Ongoing request migration /// Test case 3: Ongoing request migration
...@@ -574,7 +615,15 @@ mod tests { ...@@ -574,7 +615,15 @@ mod tests {
mock_engine; mock_engine;
let ctx = Arc::new(Controller::new(context_id.clone())); let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 3) let metrics = Arc::new(Metrics::new());
let mut retry_manager = RetryManager::build(
ctx,
request,
next_generate,
3,
Arc::new(TEST_MODEL.to_string()),
metrics.clone(),
)
.await .await
.expect("Failed to build RetryManager"); .expect("Failed to build RetryManager");
...@@ -593,6 +642,9 @@ mod tests { ...@@ -593,6 +642,9 @@ mod tests {
assert_eq!(output.token_ids, vec![101 + i as u32]); // 101, 102, 103, ..., 110 assert_eq!(output.token_ids, vec![101 + i as u32]); // 101, 102, 103, ..., 110
} }
} }
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 0);
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 1);
} }
/// Test case 4: New request migration - indefinite failure /// Test case 4: New request migration - indefinite failure
...@@ -615,12 +667,24 @@ mod tests { ...@@ -615,12 +667,24 @@ mod tests {
// Should fail to build due to initial stream creation failure after exhausting all 3 retries // Should fail to build due to initial stream creation failure after exhausting all 3 retries
let ctx = Arc::new(Controller::new(context_id.clone())); let ctx = Arc::new(Controller::new(context_id.clone()));
let retry_manager_result = RetryManager::build(ctx, request, next_generate, 3).await; let metrics = Arc::new(Metrics::new());
let retry_manager_result = RetryManager::build(
ctx,
request,
next_generate,
3,
Arc::new(TEST_MODEL.to_string()),
metrics.clone(),
)
.await;
assert!(retry_manager_result.is_err()); assert!(retry_manager_result.is_err());
if let Err(error) = retry_manager_result { if let Err(error) = retry_manager_result {
assert!(error.to_string().contains("no responders")); assert!(error.to_string().contains("no responders"));
} }
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 4); // 3 retries + 1 final failure
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 0);
} }
/// Test case 5: Ongoing request migration - indefinite failure /// Test case 5: Ongoing request migration - indefinite failure
...@@ -642,7 +706,15 @@ mod tests { ...@@ -642,7 +706,15 @@ mod tests {
mock_engine; mock_engine;
let ctx = Arc::new(Controller::new(context_id.clone())); let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 3) // 3 retries let metrics = Arc::new(Metrics::new());
let mut retry_manager = RetryManager::build(
ctx,
request,
next_generate,
3,
Arc::new(TEST_MODEL.to_string()),
metrics.clone(),
) // 3 retries
.await .await
.expect("Failed to build RetryManager"); .expect("Failed to build RetryManager");
...@@ -670,6 +742,9 @@ mod tests { ...@@ -670,6 +742,9 @@ mod tests {
if let Some(error) = error_response.err() { if let Some(error) = error_response.err() {
assert!(error.to_string().contains(STREAM_ERR_MSG)); assert!(error.to_string().contains(STREAM_ERR_MSG));
} }
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 3); // 2 retries + 1 final failure
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 1); // initial ongoing failure retry
} }
/// Test case 6: Ongoing request migration - indefinite failure with stream errors /// Test case 6: Ongoing request migration - indefinite failure with stream errors
...@@ -691,7 +766,15 @@ mod tests { ...@@ -691,7 +766,15 @@ mod tests {
mock_engine; mock_engine;
let ctx = Arc::new(Controller::new(context_id.clone())); let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 3) // 3 retries let metrics = Arc::new(Metrics::new());
let mut retry_manager = RetryManager::build(
ctx,
request,
next_generate,
3,
Arc::new(TEST_MODEL.to_string()),
metrics.clone(),
) // 3 retries
.await .await
.expect("Failed to build RetryManager"); .expect("Failed to build RetryManager");
...@@ -719,6 +802,9 @@ mod tests { ...@@ -719,6 +802,9 @@ mod tests {
if let Some(error) = error_response.err() { if let Some(error) = error_response.err() {
assert!(error.to_string().contains(STREAM_ERR_MSG)); assert!(error.to_string().contains(STREAM_ERR_MSG));
} }
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 0);
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 4); // 3 retries + 1 final failure
} }
/// Test case 7: Request cancelled when creating new stream /// Test case 7: Request cancelled when creating new stream
...@@ -745,7 +831,16 @@ mod tests { ...@@ -745,7 +831,16 @@ mod tests {
ctx.stop_generating(); ctx.stop_generating();
// Should fail to build due to stopped context // Should fail to build due to stopped context
let retry_manager_result = RetryManager::build(ctx, request, next_generate, 3).await; let metrics = Arc::new(Metrics::new());
let retry_manager_result = RetryManager::build(
ctx,
request,
next_generate,
3,
Arc::new(TEST_MODEL.to_string()),
metrics.clone(),
)
.await;
assert!(retry_manager_result.is_err()); assert!(retry_manager_result.is_err());
if let Err(error) = retry_manager_result { if let Err(error) = retry_manager_result {
...@@ -755,5 +850,8 @@ mod tests { ...@@ -755,5 +850,8 @@ mod tests {
.contains(&format!("Context id {} is stopped or killed", context_id)) .contains(&format!("Context id {} is stopped or killed", context_id))
); );
} }
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 0);
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 0);
} }
} }
...@@ -340,6 +340,7 @@ mod integration_tests { ...@@ -340,6 +340,7 @@ mod integration_tests {
service.state().manager_clone(), service.state().manager_clone(),
dynamo_llm::entrypoint::RouterConfig::default(), dynamo_llm::entrypoint::RouterConfig::default(),
None, None,
service.state().metrics_clone(),
); );
// Start watching for model registrations via discovery interface // Start watching for model registrations via discovery interface
let discovery = distributed_runtime.discovery(); let discovery = distributed_runtime.discovery();
...@@ -512,6 +513,7 @@ mod integration_tests { ...@@ -512,6 +513,7 @@ mod integration_tests {
service.state().manager_clone(), service.state().manager_clone(),
dynamo_llm::entrypoint::RouterConfig::default(), dynamo_llm::entrypoint::RouterConfig::default(),
None, None,
service.state().metrics_clone(),
); );
// Get all model entries for our test model // Get all model entries for our test model
......
...@@ -147,6 +147,21 @@ pub mod frontend_service { ...@@ -147,6 +147,21 @@ pub mod frontend_service {
/// Request migration limit for a worker serving the model (MDC) /// Request migration limit for a worker serving the model (MDC)
pub const MODEL_MIGRATION_LIMIT: &str = "model_migration_limit"; pub const MODEL_MIGRATION_LIMIT: &str = "model_migration_limit";
/// Total number of request migrations due to worker unavailability
pub const MODEL_MIGRATION_TOTAL: &str = "model_migration_total";
/// Label name for the type of migration
pub const MIGRATION_TYPE_LABEL: &str = "migration_type";
/// Migration type label values
pub mod migration_type {
/// Migration during initial stream creation (NoResponders error)
pub const NEW_REQUEST: &str = "new_request";
/// Migration during ongoing request (stream disconnected)
pub const ONGOING_REQUEST: &str = "ongoing_request";
}
/// Status label values /// Status label values
pub mod status { pub mod status {
/// Value for successful requests /// Value for successful requests
......
...@@ -27,6 +27,7 @@ from .utils import ( ...@@ -27,6 +27,7 @@ from .utils import (
determine_request_receiving_worker, determine_request_receiving_worker,
start_completion_request, start_completion_request,
validate_completion_response, validate_completion_response,
verify_migration_metrics,
verify_migration_occurred, verify_migration_occurred,
) )
...@@ -203,6 +204,11 @@ def test_request_migration_sglang_worker_failure( ...@@ -203,6 +204,11 @@ def test_request_migration_sglang_worker_failure(
# Step 7: Verify migration occurred # Step 7: Verify migration occurred
verify_migration_occurred(frontend) verify_migration_occurred(frontend)
# Step 8: Verify migration metrics
verify_migration_metrics(
frontend.frontend_port, expected_ongoing_request_count=1
)
@pytest.mark.timeout(235) # 3x average @pytest.mark.timeout(235) # 3x average
@pytest.mark.skip(reason="SGLang graceful shutdown not yet implemented") @pytest.mark.skip(reason="SGLang graceful shutdown not yet implemented")
...@@ -273,6 +279,11 @@ def test_request_migration_sglang_graceful_shutdown( ...@@ -273,6 +279,11 @@ def test_request_migration_sglang_graceful_shutdown(
# Step 7: Verify migration occurred during graceful shutdown # Step 7: Verify migration occurred during graceful shutdown
verify_migration_occurred(frontend) verify_migration_occurred(frontend)
# Step 8: Verify migration metrics
verify_migration_metrics(
frontend.frontend_port, expected_ongoing_request_count=1
)
@pytest.mark.timeout(135) # 3x average @pytest.mark.timeout(135) # 3x average
def test_no_request_migration_sglang_worker_failure( def test_no_request_migration_sglang_worker_failure(
......
...@@ -27,6 +27,7 @@ from .utils import ( ...@@ -27,6 +27,7 @@ from .utils import (
determine_request_receiving_worker, determine_request_receiving_worker,
start_completion_request, start_completion_request,
validate_completion_response, validate_completion_response,
verify_migration_metrics,
verify_migration_occurred, verify_migration_occurred,
) )
...@@ -189,6 +190,11 @@ def test_request_migration_trtllm_worker_failure( ...@@ -189,6 +190,11 @@ def test_request_migration_trtllm_worker_failure(
# Step 7: Verify migration occurred # Step 7: Verify migration occurred
verify_migration_occurred(frontend) verify_migration_occurred(frontend)
# Step 8: Verify migration metrics
verify_migration_metrics(
frontend.frontend_port, expected_ongoing_request_count=1
)
@pytest.mark.timeout(290) # 3x average @pytest.mark.timeout(290) # 3x average
@pytest.mark.skip(reason="TRT-LLM graceful shutdown not yet implemented") @pytest.mark.skip(reason="TRT-LLM graceful shutdown not yet implemented")
...@@ -247,6 +253,11 @@ def test_request_migration_trtllm_graceful_shutdown( ...@@ -247,6 +253,11 @@ def test_request_migration_trtllm_graceful_shutdown(
# Step 7: Verify migration occurred during graceful shutdown # Step 7: Verify migration occurred during graceful shutdown
verify_migration_occurred(frontend) verify_migration_occurred(frontend)
# Step 8: Verify migration metrics
verify_migration_metrics(
frontend.frontend_port, expected_ongoing_request_count=1
)
@pytest.mark.timeout(185) # 3x average @pytest.mark.timeout(185) # 3x average
def test_no_request_migration_trtllm_worker_failure( def test_no_request_migration_trtllm_worker_failure(
......
...@@ -27,6 +27,7 @@ from .utils import ( ...@@ -27,6 +27,7 @@ from .utils import (
determine_request_receiving_worker, determine_request_receiving_worker,
start_completion_request, start_completion_request,
validate_completion_response, validate_completion_response,
verify_migration_metrics,
verify_migration_occurred, verify_migration_occurred,
) )
...@@ -199,6 +200,11 @@ def test_request_migration_vllm_worker_failure( ...@@ -199,6 +200,11 @@ def test_request_migration_vllm_worker_failure(
# Step 7: Verify migration occurred # Step 7: Verify migration occurred
verify_migration_occurred(frontend) verify_migration_occurred(frontend)
# Step 8: Verify migration metrics
verify_migration_metrics(
frontend.frontend_port, expected_ongoing_request_count=1
)
@pytest.mark.timeout(280) # 3x average @pytest.mark.timeout(280) # 3x average
def test_request_migration_vllm_graceful_shutdown( def test_request_migration_vllm_graceful_shutdown(
...@@ -256,6 +262,11 @@ def test_request_migration_vllm_graceful_shutdown( ...@@ -256,6 +262,11 @@ def test_request_migration_vllm_graceful_shutdown(
# Step 7: Verify migration occurred during graceful shutdown # Step 7: Verify migration occurred during graceful shutdown
verify_migration_occurred(frontend) verify_migration_occurred(frontend)
# Step 8: Verify migration metrics
verify_migration_metrics(
frontend.frontend_port, expected_ongoing_request_count=1
)
@pytest.mark.timeout(150) # 3x average @pytest.mark.timeout(150) # 3x average
def test_no_request_migration_vllm_worker_failure( def test_no_request_migration_vllm_worker_failure(
......
...@@ -209,3 +209,87 @@ def verify_migration_occurred(frontend_process: DynamoFrontendProcess) -> None: ...@@ -209,3 +209,87 @@ def verify_migration_occurred(frontend_process: DynamoFrontendProcess) -> None:
assert ( assert (
"Cannot recreate stream: " not in log_content "Cannot recreate stream: " not in log_content
), "'Cannot recreate stream: ...' error found in logs" ), "'Cannot recreate stream: ...' error found in logs"
def _parse_migration_metric(
metrics_text: str, model_name: str, migration_type: str
) -> int:
"""
Parse the migration metric value from Prometheus metrics text.
Args:
metrics_text: Raw Prometheus metrics text
model_name: The model name label value
migration_type: The migration_type label value ("ongoing_request" or "new_request")
Returns:
The metric count, or 0 if not found
"""
import re
# Match pattern like:
# dynamo_frontend_model_migration_total{migration_type="ongoing_request",model="Qwen/Qwen3-0.6B"} 1
# Labels can be in any order
pattern = rf'dynamo_frontend_model_migration_total\{{[^}}]*migration_type="{migration_type}"[^}}]*model="{re.escape(model_name)}"[^}}]*\}}\s+(\d+)'
match = re.search(pattern, metrics_text)
if match:
return int(match.group(1))
# Try with labels in reverse order
pattern = rf'dynamo_frontend_model_migration_total\{{[^}}]*model="{re.escape(model_name)}"[^}}]*migration_type="{migration_type}"[^}}]*\}}\s+(\d+)'
match = re.search(pattern, metrics_text)
if match:
return int(match.group(1))
return 0
def verify_migration_metrics(
frontend_port: int,
expected_ongoing_request_count: int = 0,
expected_new_request_count: int = 0,
) -> None:
"""
Verify migration metrics by querying the frontend's /metrics endpoint.
Args:
frontend_port: Port where the frontend is running
expected_ongoing_request_count: Expected count of ongoing_request migrations
expected_new_request_count: Expected count of new_request migrations
"""
metrics_url = f"http://localhost:{frontend_port}/metrics"
try:
response = requests.get(metrics_url, timeout=1)
response.raise_for_status()
except requests.RequestException as e:
pytest.fail(f"Failed to fetch metrics from {metrics_url}: {e}")
metrics_text = response.text
logger.info(f"Fetched metrics from {metrics_url}")
# Parse metrics to find migration counts
ongoing_count = _parse_migration_metric(
metrics_text, FAULT_TOLERANCE_MODEL_NAME, "ongoing_request"
)
new_request_count = _parse_migration_metric(
metrics_text, FAULT_TOLERANCE_MODEL_NAME, "new_request"
)
logger.info(
f"Migration metrics - ongoing_request: {ongoing_count}, new_request: {new_request_count}"
)
if expected_ongoing_request_count > 0:
assert ongoing_count >= expected_ongoing_request_count, (
f"Expected at least {expected_ongoing_request_count} ongoing_request migrations, "
f"but got {ongoing_count}"
)
if expected_new_request_count > 0:
assert new_request_count >= expected_new_request_count, (
f"Expected at least {expected_new_request_count} new_request migrations, "
f"but got {new_request_count}"
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment