"docs/guides/vscode:/vscode.git/clone" did not exist on "c6d66bc37d89dee33dd91d9dafea75d823ea8567"
Unverified Commit 53f3d2af authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: replace polling with event-driven metrics updates (#3207)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent da7d1a33
...@@ -80,6 +80,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul ...@@ -80,6 +80,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
router_config.busy_threshold, router_config.busy_threshold,
target_namespace, target_namespace,
Arc::new(http_service.clone()), Arc::new(http_service.clone()),
http_service.state().metrics_clone(),
) )
.await?; .await?;
} }
...@@ -217,7 +218,11 @@ async fn run_watcher( ...@@ -217,7 +218,11 @@ async fn run_watcher(
busy_threshold: Option<f64>, busy_threshold: Option<f64>,
target_namespace: Option<String>, target_namespace: Option<String>,
http_service: Arc<HttpService>, http_service: Arc<HttpService>,
metrics: Arc<crate::http::service::metrics::Metrics>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Clone model_manager before it's moved into ModelWatcher
let model_manager_clone = model_manager.clone();
let mut watch_obj = ModelWatcher::new( let mut watch_obj = ModelWatcher::new(
runtime, runtime,
model_manager, model_manager,
...@@ -234,11 +239,22 @@ async fn run_watcher( ...@@ -234,11 +239,22 @@ async fn run_watcher(
watch_obj.set_notify_on_model_update(tx); watch_obj.set_notify_on_model_update(tx);
// Spawn a task to watch for model type changes and update HTTP service endpoints // Spawn a task to watch for model type changes and update HTTP service endpoints and metrics
let _endpoint_enabler_task = tokio::spawn(async move { let _endpoint_enabler_task = tokio::spawn(async move {
while let Some(model_type) = rx.recv().await { while let Some(model_type) = rx.recv().await {
tracing::debug!("Received model type update: {:?}", model_type); tracing::debug!("Received model type update: {:?}", model_type);
// Update HTTP endpoints (existing functionality)
update_http_endpoints(http_service.clone(), model_type); update_http_endpoints(http_service.clone(), model_type);
// Update metrics (only for added models)
update_model_metrics(
model_type,
model_manager_clone.clone(),
metrics.clone(),
Some(etcd_client.clone()),
)
.await;
} }
}); });
...@@ -271,3 +287,46 @@ fn update_http_endpoints(service: Arc<HttpService>, model_type: ModelUpdate) { ...@@ -271,3 +287,46 @@ fn update_http_endpoints(service: Arc<HttpService>, model_type: ModelUpdate) {
} }
} }
} }
/// Updates metrics for model type changes
async fn update_model_metrics(
model_type: ModelUpdate,
model_manager: Arc<ModelManager>,
metrics: Arc<crate::http::service::metrics::Metrics>,
etcd_client: Option<etcd::Client>,
) {
match model_type {
ModelUpdate::Added(model_type) => {
tracing::debug!("Updating metrics for added model type: {:?}", model_type);
// Get all model entries and update metrics for matching types
let model_entries = model_manager.get_model_entries();
for entry in model_entries {
if entry.model_type == model_type {
// Update runtime config metrics if available
if let Some(runtime_config) = &entry.runtime_config {
metrics.update_runtime_config_metrics(&entry.name, runtime_config);
}
// Update MDC metrics if etcd is available
if let Some(ref etcd) = etcd_client
&& let Err(e) = metrics
.update_metrics_from_model_entry_with_mdc(&entry, etcd)
.await
{
tracing::debug!(
model = %entry.name,
error = %e,
"Failed to update MDC metrics for newly added model"
);
}
}
}
}
ModelUpdate::Removed(model_type) => {
tracing::debug!("Model type removed: {:?}", model_type);
// Note: Metrics are typically not removed to preserve historical data
// This matches the behavior in the polling task
}
}
}
...@@ -472,37 +472,6 @@ impl Metrics { ...@@ -472,37 +472,6 @@ impl Metrics {
} }
} }
/// Update model deployment card metrics for a model
/// This should be called when model deployment card information is available
pub fn update_mdc_metrics(
&self,
model_name: &str,
context_length: u32,
kv_cache_block_size: u32,
migration_limit: u32,
) {
self.model_context_length
.with_label_values(&[model_name])
.set(context_length as i64);
self.model_kv_cache_block_size
.with_label_values(&[model_name])
.set(kv_cache_block_size as i64);
self.model_migration_limit
.with_label_values(&[model_name])
.set(migration_limit as i64);
}
/// Update metrics from a ModelEntry
/// This is a convenience method that extracts runtime config from a ModelEntry
/// and updates the appropriate metrics
pub fn update_metrics_from_model_entry(&self, model_entry: &ModelEntry) {
if let Some(runtime_config) = &model_entry.runtime_config {
self.update_runtime_config_metrics(&model_entry.name, runtime_config);
}
}
/// Update metrics from a ModelEntry and its ModelDeploymentCard /// Update metrics from a ModelEntry and its ModelDeploymentCard
/// This updates both runtime config metrics and MDC-specific metrics /// This updates both runtime config metrics and MDC-specific metrics
pub async fn update_metrics_from_model_entry_with_mdc( pub async fn update_metrics_from_model_entry_with_mdc(
...@@ -525,12 +494,19 @@ impl Metrics { ...@@ -525,12 +494,19 @@ impl Metrics {
.await .await
{ {
Ok(Some(mdc)) => { Ok(Some(mdc)) => {
self.update_mdc_metrics( // Inline MDC metrics update
&model_entry.name, self.model_context_length
mdc.context_length, .with_label_values(&[&model_entry.name])
mdc.kv_cache_block_size, .set(mdc.context_length as i64);
mdc.migration_limit,
); self.model_kv_cache_block_size
.with_label_values(&[&model_entry.name])
.set(mdc.kv_cache_block_size as i64);
self.model_migration_limit
.with_label_values(&[&model_entry.name])
.set(mdc.migration_limit as i64);
tracing::debug!( tracing::debug!(
model = %model_entry.name, model = %model_entry.name,
"Successfully updated MDC metrics" "Successfully updated MDC metrics"
...@@ -554,110 +530,6 @@ impl Metrics { ...@@ -554,110 +530,6 @@ impl Metrics {
Ok(()) Ok(())
} }
/// Start a background task that periodically updates runtime config metrics
///
/// ## Why Polling is Required
///
/// Polling is necessary because new models may come online at any time through the distributed
/// discovery system. The ModelManager is continuously updated as workers register/deregister
/// with etcd, and we need to periodically check for these changes to expose their metrics.
///
/// ## Behavior
///
/// - Polls the ModelManager for current models and updates metrics accordingly
/// - Models are never removed from metrics to preserve historical data
/// - If multiple model instances have the same name, only the first instance's metrics are used
/// - Subsequent instances with duplicate names will be skipped
///
/// ## MDC (Model Deployment Card) Behavior
///
/// Currently, we don't overwrite an MDC. The first worker to start wins, and we assume
/// that all other workers claiming to serve that model really are using the same configuration.
/// Later, every worker will have its own MDC, and the frontend will validate that they
/// checksum the same. For right now, you can assume they have the same MDC, because
/// they aren't allowed to change it.
///
/// The task will run until the provided cancellation token is cancelled.
pub fn start_runtime_config_polling_task(
metrics: Arc<Self>,
manager: Arc<crate::discovery::ModelManager>,
etcd_client: Option<dynamo_runtime::transports::etcd::Client>,
poll_interval: Duration,
cancel_token: tokio_util::sync::CancellationToken,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(poll_interval);
let mut known_models = std::collections::HashSet::new();
tracing::info!(
interval_secs = poll_interval.as_secs(),
"Starting runtime config metrics polling task (metrics never removed)"
);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
tracing::info!("Runtime config metrics polling task cancelled");
break;
}
_ = interval.tick() => {
// Continue with polling logic
}
}
// Get current model entries from the manager
let current_entries = manager.get_model_entries();
let mut current_models = std::collections::HashSet::new();
// Note: If multiple model instances have the same name, only the first instance's config metrics are recorded.
// Subsequent instances with duplicate names will be skipped for config updates.
// This is based on the assumption that all workers serving the same model have identical
// configuration values (MDC content, runtime config, etc.). This assumption holds because
// workers are not allowed to change their configuration after registration.
// Update configuration metrics for current models
for entry in current_entries {
// Skip config processing if we've already seen this model name
if !current_models.insert(entry.name.clone()) {
tracing::debug!(
model_name = %entry.name,
endpoint = ?entry.endpoint_id,
"Skipping duplicate model instance - only first instance config metrics are recorded"
);
continue;
}
// Update runtime config metrics if available
if let Some(runtime_config) = &entry.runtime_config {
metrics.update_runtime_config_metrics(&entry.name, runtime_config);
}
// Optionally load MDC for additional metrics if etcd is available
if let Some(ref etcd) = etcd_client
&& let Err(e) = metrics
.update_metrics_from_model_entry_with_mdc(&entry, etcd)
.await
{
tracing::debug!(
model = %entry.name,
error = %e,
"Failed to update MDC metrics (this is normal if MDC is not available)"
);
}
}
// Update our known models set
known_models.extend(current_models.iter().cloned());
tracing::trace!(
active_models = current_models.len(),
total_known_models = known_models.len(),
"Updated runtime config metrics for active models"
);
}
})
}
/// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request, /// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request,
/// and the kind of endpoint that was hit /// and the kind of endpoint that was hit
/// ///
......
...@@ -133,9 +133,6 @@ pub struct HttpService { ...@@ -133,9 +133,6 @@ pub struct HttpService {
tls_cert_path: Option<PathBuf>, tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>, tls_key_path: Option<PathBuf>,
route_docs: Vec<RouteDoc>, route_docs: Vec<RouteDoc>,
// Metrics polling configuration
etcd_client: Option<dynamo_runtime::transports::etcd::Client>,
} }
#[derive(Clone, Builder)] #[derive(Clone, Builder)]
...@@ -204,22 +201,6 @@ impl HttpService { ...@@ -204,22 +201,6 @@ impl HttpService {
let protocol = if self.enable_tls { "HTTPS" } else { "HTTP" }; let protocol = if self.enable_tls { "HTTPS" } else { "HTTP" };
tracing::info!(protocol, address, "Starting HTTP(S) service"); tracing::info!(protocol, address, "Starting HTTP(S) service");
// Start background task to poll runtime config metrics with proper cancellation
let poll_interval_secs = std::env::var("DYN_HTTP_SVC_CONFIG_METRICS_POLL_INTERVAL_SECS")
.ok()
.and_then(|s| s.parse::<f64>().ok())
.filter(|&secs| secs > 0.0) // Guard against zero or negative values
.unwrap_or(8.0);
let poll_interval = Duration::from_secs_f64(poll_interval_secs);
let _polling_task = super::metrics::Metrics::start_runtime_config_polling_task(
self.state.metrics_clone(),
self.state.manager_clone(),
self.etcd_client.clone(),
poll_interval,
cancel_token.child_token(),
);
let router = self.router.clone(); let router = self.router.clone();
let observer = cancel_token.child_token(); let observer = cancel_token.child_token();
...@@ -313,8 +294,8 @@ impl HttpServiceConfigBuilder { ...@@ -313,8 +294,8 @@ impl HttpServiceConfigBuilder {
let config: HttpServiceConfig = self.build_internal()?; let config: HttpServiceConfig = self.build_internal()?;
let model_manager = Arc::new(ModelManager::new()); let model_manager = Arc::new(ModelManager::new());
let etcd_client = config.etcd_client.clone(); let etcd_client = config.etcd_client;
let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client)); let state = Arc::new(State::new_with_etcd(model_manager, etcd_client));
state state
.flags .flags
...@@ -366,7 +347,6 @@ impl HttpServiceConfigBuilder { ...@@ -366,7 +347,6 @@ impl HttpServiceConfigBuilder {
tls_cert_path: config.tls_cert_path, tls_cert_path: config.tls_cert_path,
tls_key_path: config.tls_key_path, tls_key_path: config.tls_key_path,
route_docs: all_docs, route_docs: all_docs,
etcd_client,
}) })
} }
......
...@@ -293,19 +293,18 @@ async fn test_metrics_with_mock_model() { ...@@ -293,19 +293,18 @@ async fn test_metrics_with_mock_model() {
mod integration_tests { mod integration_tests {
use super::*; use super::*;
use dynamo_llm::{ use dynamo_llm::{
discovery::ModelEntry, engines::make_echo_engine, entrypoint::EngineConfig, discovery::{ModelEntry, ModelWatcher},
engines::make_echo_engine,
entrypoint::EngineConfig,
local_model::LocalModelBuilder, local_model::LocalModelBuilder,
}; };
use dynamo_runtime::DistributedRuntime; use dynamo_runtime::DistributedRuntime;
use std::sync::Arc;
#[tokio::test] #[tokio::test]
#[ignore = "Requires etcd and distributed runtime"] #[ignore = "Requires etcd and distributed runtime"]
async fn test_metrics_with_mdc_registration() { async fn test_metrics_with_mdc_registration() {
// Integration test for metrics collection with full MDC registration (like real model servers) // Integration test for metrics collection with full MDC registration (like real model servers)
temp_env::async_with_vars([
(METRICS_PREFIX_ENV, None::<&str>),
("DYN_HTTP_SVC_CONFIG_METRICS_POLL_INTERVAL_SECS", Some("0.6")), // Fast polling for tests (600ms)
], async {
let port = get_random_port().await; let port = get_random_port().await;
// Create distributed runtime (required for MDC registration) // Create distributed runtime (required for MDC registration)
...@@ -331,13 +330,12 @@ mod integration_tests { ...@@ -331,13 +330,12 @@ mod integration_tests {
let service = HttpService::builder() let service = HttpService::builder()
.port(port) .port(port)
.enable_chat_endpoints(true) .enable_chat_endpoints(true)
.with_etcd_client(distributed_runtime.etcd_client())
.build() .build()
.unwrap(); .unwrap();
// Set up model watcher to discover models from etcd (like production) // Set up model watcher to discover models from etcd (like production)
// This is crucial for the polling task to find model entries // This is crucial for the polling task to find model entries
use dynamo_llm::discovery::{ModelWatcher, MODEL_ROOT_PATH}; use dynamo_llm::discovery::{MODEL_ROOT_PATH, ModelWatcher};
use dynamo_runtime::pipeline::RouterMode; use dynamo_runtime::pipeline::RouterMode;
let model_watcher = ModelWatcher::new( let model_watcher = ModelWatcher::new(
...@@ -350,14 +348,16 @@ mod integration_tests { ...@@ -350,14 +348,16 @@ mod integration_tests {
// Start watching etcd for model registrations // Start watching etcd for model registrations
if let Some(etcd_client) = distributed_runtime.etcd_client() { if let Some(etcd_client) = distributed_runtime.etcd_client() {
let models_watcher = etcd_client.kv_get_and_watch_prefix(MODEL_ROOT_PATH).await.unwrap(); let models_watcher = etcd_client
.kv_get_and_watch_prefix(MODEL_ROOT_PATH)
.await
.unwrap();
let (_prefix, _watcher, receiver) = models_watcher.dissolve(); let (_prefix, _watcher, receiver) = models_watcher.dissolve();
// Spawn watcher task to discover models from etcd // Spawn watcher task to discover models from etcd
let _watcher_task = tokio::spawn(async move { let _watcher_task = tokio::spawn(async move {
model_watcher.watch(receiver, None).await; model_watcher.watch(receiver, None).await;
}); });
} }
// Set up the engine following the StaticFull pattern from http.rs // Set up the engine following the StaticFull pattern from http.rs
...@@ -387,7 +387,6 @@ mod integration_tests { ...@@ -387,7 +387,6 @@ mod integration_tests {
.await .await
.unwrap(); .unwrap();
// Start the HTTP service // Start the HTTP service
let token = CancellationToken::new(); let token = CancellationToken::new();
let cancel_token = token.clone(); let cancel_token = token.clone();
...@@ -397,35 +396,15 @@ mod integration_tests { ...@@ -397,35 +396,15 @@ mod integration_tests {
// Wait for service to be ready // Wait for service to be ready
wait_for_metrics_ready(port).await; wait_for_metrics_ready(port).await;
// Wait for MDC registration to complete by checking if the model appears
// This simulates the real polling that happens in production
let start = tokio::time::Instant::now();
let timeout = Duration::from_secs(10);
loop {
if start.elapsed() > timeout {
break; // Continue with test even if MDC metrics aren't ready
}
// Check if our model is registered in the manager (indicates MDC registration completed)
let model_service_name = model.service_name();
if manager.has_model_any(model_service_name) {
tracing::info!("MDC registration completed for {}", model_service_name);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Give a bit more time for background metrics collection // Give a bit more time for background metrics collection
tokio::time::sleep(Duration::from_millis(200)).await; tokio::time::sleep(Duration::from_secs(5)).await;
let client = reqwest::Client::new(); let client = reqwest::Client::new();
// Create a chat completion request // Create a chat completion request
let message = dynamo_async_openai::types::ChatCompletionRequestMessage::User( let message = dynamo_async_openai::types::ChatCompletionRequestMessage::User(
dynamo_async_openai::types::ChatCompletionRequestUserMessage { dynamo_async_openai::types::ChatCompletionRequestUserMessage {
content: content: dynamo_async_openai::types::ChatCompletionRequestUserMessageContent::Text(
dynamo_async_openai::types::ChatCompletionRequestUserMessageContent::Text(
"Hello, MDC model!".to_string(), "Hello, MDC model!".to_string(),
), ),
name: None, name: None,
...@@ -457,8 +436,8 @@ mod integration_tests { ...@@ -457,8 +436,8 @@ mod integration_tests {
// Consume the response stream to complete the request // Consume the response stream to complete the request
let _response_body = response.bytes().await.unwrap(); let _response_body = response.bytes().await.unwrap();
// Wait for the fast polling interval (600ms) for MDC metrics // Wait for the fast polling interval (50ms) for MDC metrics
tokio::time::sleep(Duration::from_millis(5)).await; tokio::time::sleep(Duration::from_millis(50)).await;
// Fetch and verify metrics // Fetch and verify metrics
let metrics_response = client let metrics_response = client
...@@ -485,8 +464,11 @@ mod integration_tests { ...@@ -485,8 +464,11 @@ mod integration_tests {
// Assert MDC-based model configuration metrics are present // Assert MDC-based model configuration metrics are present
// These MUST be present for the test to pass // These MUST be present for the test to pass
assert!(metrics_body.contains("dynamo_frontend_model_context_length"), assert!(
"MDC metrics not found! Metrics body: {}", metrics_body); metrics_body.contains("dynamo_frontend_model_context_length"),
"MDC metrics not found! Metrics body: {}",
metrics_body
);
assert!(metrics_body.contains("dynamo_frontend_model_kv_cache_block_size")); assert!(metrics_body.contains("dynamo_frontend_model_kv_cache_block_size"));
assert!(metrics_body.contains("dynamo_frontend_model_migration_limit")); assert!(metrics_body.contains("dynamo_frontend_model_migration_limit"));
...@@ -497,7 +479,6 @@ mod integration_tests { ...@@ -497,7 +479,6 @@ mod integration_tests {
// - dynamo_frontend_model_max_num_seqs (requires actual batching config from real engines) // - dynamo_frontend_model_max_num_seqs (requires actual batching config from real engines)
// - dynamo_frontend_model_max_num_batched_tokens (requires actual batching config from real engines) // - dynamo_frontend_model_max_num_batched_tokens (requires actual batching config from real engines)
// Verify specific request counter incremented // Verify specific request counter incremented
assert!(metrics_body.contains("endpoint=\"chat_completions\"")); assert!(metrics_body.contains("endpoint=\"chat_completions\""));
assert!(metrics_body.contains("request_type=\"stream\"")); assert!(metrics_body.contains("request_type=\"stream\""));
...@@ -505,66 +486,70 @@ mod integration_tests { ...@@ -505,66 +486,70 @@ mod integration_tests {
// Now test the complete lifecycle: remove the model from etcd // Now test the complete lifecycle: remove the model from etcd
// Get all model entries to find the one we need to delete // Remove the model using the cleaner ModelWatcher approach
if let Some(etcd_client) = distributed_runtime.etcd_client() { if let Some(etcd_client) = distributed_runtime.etcd_client() {
let kvs = etcd_client.kv_get_prefix("models").await.unwrap(); // Use ModelWatcher to find and remove the model (following ModelWatcher::handle_delete pattern)
let watcher = ModelWatcher::new(
// Find our model's etcd key distributed_runtime.clone(),
let mut model_key_to_delete = None; service.state().manager_clone(),
for kv in kvs { RouterMode::RoundRobin,
if let Ok(model_entry) = serde_json::from_slice::<ModelEntry>(kv.value()) None,
&& model_entry.name == "test-mdc-model" None,
{ );
model_key_to_delete = Some(kv.key_str().unwrap().to_string());
break;
}
}
if let Some(key) = model_key_to_delete {
etcd_client.kv_delete(key.as_str(), None).await.unwrap();
// Poll every 80ms for up to 2 seconds to check when worker count drops to 0
let start_time = tokio::time::Instant::now(); // Get all model entries for our test model
let timeout = Duration::from_millis(2000); let model_entries = watcher.entries_for_model("test-mdc-model").await.unwrap();
let mut worker_count_dropped = false;
while start_time.elapsed() < timeout { if !model_entries.is_empty() {
// Check if the model was removed from the manager // For each model entry, we need to find its etcd key and remove it
let has_model = manager.has_model_any(model.service_name()); // This follows the same pattern as ModelWatcher::handle_delete
for model_entry in model_entries {
// Find the etcd key for this specific model entry
// etcd keys follow pattern: "models/{UUID}"
// Example: "models/11dff335-316d-4c9f-8229-88ad8e8dac5e"
let kvs = etcd_client.kv_get_prefix("models").await.unwrap();
// Fetch current metrics // Find the key by matching ModelEntry JSON structure:
let metrics_response = client // {
.get(format!("http://localhost:{}/metrics", port)) // "name": "test-mdc-model",
.send() // "endpoint": { "namespace": "...", "component": "...", "name": "..." },
.await // "model_type": "Chat",
.unwrap(); // "runtime_config": { ... },
// "model_input": "Text"
// }
let key = kvs
.iter()
.find(|kv| {
serde_json::from_slice::<ModelEntry>(kv.value())
.map(|entry| {
entry.name == model_entry.name
&& entry.endpoint_id == model_entry.endpoint_id
})
.unwrap_or(false)
})
.map(|kv| kv.key_str().unwrap().to_string());
if metrics_response.status().is_success() { if let Some(key) = key {
// Remove from ModelManager first (this returns the ModelEntry)
if let Some(_removed_entry) = manager.remove_model_entry(&key) {
// Remove engines (following ModelWatcher::handle_delete pattern)
manager
.remove_chat_completions_model(&model_entry.name)
.ok();
manager.remove_completions_model(&model_entry.name).ok();
manager.remove_embeddings_model(&model_entry.name).ok();
manager.remove_tensor_model(&model_entry.name).ok();
// Since model_workers metric was removed, just check if model is gone from manager // Then delete from etcd
if !has_model { etcd_client.kv_delete(key.as_str(), None).await.unwrap();
worker_count_dropped = true;
break;
} }
} }
tokio::time::sleep(Duration::from_millis(80)).await;
} }
// Assert that model was removed from manager
assert!(worker_count_dropped,
"Model should be removed from manager after etcd removal and polling cycles");
} else {
} }
} }
// Clean up // Clean up
cancel_token.cancel(); cancel_token.cancel();
task.await.unwrap().unwrap(); task.await.unwrap().unwrap();
})
.await;
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment