Unverified Commit 11110303 authored by Simo Lin's avatar Simo Lin Committed by GitHub
Browse files

[router] clean up workflow logs to debug for implementation details logs (#11886)

parent 28ddfb37
...@@ -139,7 +139,7 @@ impl JobQueue { ...@@ -139,7 +139,7 @@ impl JobQueue {
pub fn new(config: JobQueueConfig, context: Weak<AppContext>) -> Arc<Self> { pub fn new(config: JobQueueConfig, context: Weak<AppContext>) -> Arc<Self> {
let (tx, rx) = mpsc::channel(config.queue_capacity); let (tx, rx) = mpsc::channel(config.queue_capacity);
info!( debug!(
"Initializing worker job queue: capacity={}, workers={}", "Initializing worker job queue: capacity={}, workers={}",
config.queue_capacity, config.worker_count config.queue_capacity, config.worker_count
); );
...@@ -194,7 +194,7 @@ impl JobQueue { ...@@ -194,7 +194,7 @@ impl JobQueue {
Ok(_) => { Ok(_) => {
let queue_depth = self.tx.max_capacity() - self.tx.capacity(); let queue_depth = self.tx.max_capacity() - self.tx.capacity();
RouterMetrics::set_job_queue_depth(queue_depth); RouterMetrics::set_job_queue_depth(queue_depth);
info!( debug!(
"Job submitted: type={}, worker={}, queue_depth={}", "Job submitted: type={}, worker={}, queue_depth={}",
job_type, worker_url, queue_depth job_type, worker_url, queue_depth
); );
...@@ -225,7 +225,7 @@ impl JobQueue { ...@@ -225,7 +225,7 @@ impl JobQueue {
context: Weak<AppContext>, context: Weak<AppContext>,
status_map: Arc<DashMap<String, JobStatus>>, status_map: Arc<DashMap<String, JobStatus>>,
) { ) {
info!("Worker job queue worker {} started", worker_id); debug!("Worker job queue worker {} started", worker_id);
loop { loop {
// Lock the receiver and try to receive a job // Lock the receiver and try to receive a job
...@@ -246,7 +246,7 @@ impl JobQueue { ...@@ -246,7 +246,7 @@ impl JobQueue {
JobStatus::processing(&job_type, &worker_url), JobStatus::processing(&job_type, &worker_url),
); );
info!( debug!(
"Worker {} processing job: type={}, worker={}", "Worker {} processing job: type={}, worker={}",
worker_id, job_type, worker_url worker_id, job_type, worker_url
); );
...@@ -289,7 +289,7 @@ impl JobQueue { ...@@ -289,7 +289,7 @@ impl JobQueue {
} }
} }
warn!("Worker job queue worker {} stopped", worker_id); debug!("Worker job queue worker {} stopped", worker_id);
} }
/// Execute a specific job /// Execute a specific job
...@@ -303,7 +303,7 @@ impl JobQueue { ...@@ -303,7 +303,7 @@ impl JobQueue {
let instance_id = Self::start_worker_workflow(engine, config, context).await?; let instance_id = Self::start_worker_workflow(engine, config, context).await?;
info!( debug!(
"Started worker registration workflow for {} (instance: {})", "Started worker registration workflow for {} (instance: {})",
config.url, instance_id config.url, instance_id
); );
...@@ -357,7 +357,7 @@ impl JobQueue { ...@@ -357,7 +357,7 @@ impl JobQueue {
} }
}; };
info!( debug!(
"Creating AddWorker jobs for {} workers from config", "Creating AddWorker jobs for {} workers from config",
workers.len() workers.len()
); );
...@@ -501,7 +501,7 @@ impl JobQueue { ...@@ -501,7 +501,7 @@ impl JobQueue {
Ok(message) => { Ok(message) => {
RouterMetrics::record_job_success(job_type); RouterMetrics::record_job_success(job_type);
status_map.remove(worker_url); status_map.remove(worker_url);
info!( debug!(
"Worker {} completed job: type={}, worker={}, duration={:.3}s, result={}", "Worker {} completed job: type={}, worker={}, duration={:.3}s, result={}",
worker_id, worker_id,
job_type, job_type,
......
...@@ -17,7 +17,7 @@ use async_trait::async_trait; ...@@ -17,7 +17,7 @@ use async_trait::async_trait;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use reqwest::Client; use reqwest::Client;
use serde_json::Value; use serde_json::Value;
use tracing::{info, warn}; use tracing::{debug, info, warn};
use crate::{ use crate::{
core::{ core::{
...@@ -202,7 +202,7 @@ impl StepExecutor for DetectConnectionModeStep { ...@@ -202,7 +202,7 @@ impl StepExecutor for DetectConnectionModeStep {
.get("worker_config") .get("worker_config")
.ok_or_else(|| WorkflowError::ContextValueNotFound("worker_config".to_string()))?; .ok_or_else(|| WorkflowError::ContextValueNotFound("worker_config".to_string()))?;
info!( debug!(
"Detecting connection mode for {} (timeout: {}s, max_attempts: {})", "Detecting connection mode for {} (timeout: {}s, max_attempts: {})",
config.url, config.health_check_timeout_secs, config.max_connection_attempts config.url, config.health_check_timeout_secs, config.max_connection_attempts
); );
...@@ -217,11 +217,11 @@ impl StepExecutor for DetectConnectionModeStep { ...@@ -217,11 +217,11 @@ impl StepExecutor for DetectConnectionModeStep {
let connection_mode = match (http_result, grpc_result) { let connection_mode = match (http_result, grpc_result) {
(Ok(_), _) => { (Ok(_), _) => {
info!("{} detected as HTTP", config.url); debug!("{} detected as HTTP", config.url);
ConnectionMode::Http ConnectionMode::Http
} }
(_, Ok(_)) => { (_, Ok(_)) => {
info!("{} detected as gRPC", config.url); debug!("{} detected as gRPC", config.url);
ConnectionMode::Grpc { port: None } ConnectionMode::Grpc { port: None }
} }
(Err(http_err), Err(grpc_err)) => { (Err(http_err), Err(grpc_err)) => {
...@@ -259,7 +259,7 @@ impl StepExecutor for DiscoverMetadataStep { ...@@ -259,7 +259,7 @@ impl StepExecutor for DiscoverMetadataStep {
.get("connection_mode") .get("connection_mode")
.ok_or_else(|| WorkflowError::ContextValueNotFound("connection_mode".to_string()))?; .ok_or_else(|| WorkflowError::ContextValueNotFound("connection_mode".to_string()))?;
info!( debug!(
"Discovering metadata for {} ({:?})", "Discovering metadata for {} ({:?})",
config.url, *connection_mode config.url, *connection_mode
); );
...@@ -275,7 +275,7 @@ impl StepExecutor for DiscoverMetadataStep { ...@@ -275,7 +275,7 @@ impl StepExecutor for DiscoverMetadataStep {
HashMap::new() HashMap::new()
}); });
info!( debug!(
"Discovered {} metadata labels for {}", "Discovered {} metadata labels for {}",
discovered_labels.len(), discovered_labels.len(),
config.url config.url
...@@ -304,14 +304,14 @@ impl StepExecutor for DiscoverDPInfoStep { ...@@ -304,14 +304,14 @@ impl StepExecutor for DiscoverDPInfoStep {
// Skip DP discovery if not DP-aware // Skip DP discovery if not DP-aware
if !config.dp_aware { if !config.dp_aware {
info!( debug!(
"Worker {} is not DP-aware, skipping DP discovery", "Worker {} is not DP-aware, skipping DP discovery",
config.url config.url
); );
return Ok(StepResult::Success); return Ok(StepResult::Success);
} }
info!("Discovering DP info for {} (DP-aware)", config.url); debug!("Discovering DP info for {} (DP-aware)", config.url);
// Get DP info from worker // Get DP info from worker
let dp_info = WorkerManager::get_dp_info(&config.url, config.api_key.as_deref()) let dp_info = WorkerManager::get_dp_info(&config.url, config.api_key.as_deref())
...@@ -321,7 +321,7 @@ impl StepExecutor for DiscoverDPInfoStep { ...@@ -321,7 +321,7 @@ impl StepExecutor for DiscoverDPInfoStep {
message: format!("Failed to get DP info: {}", e), message: format!("Failed to get DP info: {}", e),
})?; })?;
info!( debug!(
"Discovered DP size {} for {} (model: {})", "Discovered DP size {} for {} (model: {})",
dp_info.dp_size, config.url, dp_info.model_id dp_info.dp_size, config.url, dp_info.model_id
); );
...@@ -406,12 +406,12 @@ impl StepExecutor for CreateWorkerStep { ...@@ -406,12 +406,12 @@ impl StepExecutor for CreateWorkerStep {
.cloned(); .cloned();
if let Some(model_id) = derived_model_id { if let Some(model_id) = derived_model_id {
info!("Derived model_id from metadata: {}", model_id); debug!("Derived model_id from metadata: {}", model_id);
final_labels.insert("model_id".to_string(), model_id); final_labels.insert("model_id".to_string(), model_id);
} }
} }
info!( debug!(
"Creating worker {} with {} discovered + {} config = {} final labels", "Creating worker {} with {} discovered + {} config = {} final labels",
config.url, config.url,
discovered_labels.len(), discovered_labels.len(),
...@@ -471,7 +471,7 @@ impl StepExecutor for CreateWorkerStep { ...@@ -471,7 +471,7 @@ impl StepExecutor for CreateWorkerStep {
}; };
if normalized_url != config.url { if normalized_url != config.url {
info!( debug!(
"Normalized worker URL: {} -> {} ({:?})", "Normalized worker URL: {} -> {} ({:?})",
config.url, config.url,
normalized_url, normalized_url,
...@@ -486,7 +486,7 @@ impl StepExecutor for CreateWorkerStep { ...@@ -486,7 +486,7 @@ impl StepExecutor for CreateWorkerStep {
.get("dp_info") .get("dp_info")
.ok_or_else(|| WorkflowError::ContextValueNotFound("dp_info".to_string()))?; .ok_or_else(|| WorkflowError::ContextValueNotFound("dp_info".to_string()))?;
info!( debug!(
"Creating {} DP-aware workers for {} (dp_size: {})", "Creating {} DP-aware workers for {} (dp_size: {})",
dp_info.dp_size, config.url, dp_info.dp_size dp_info.dp_size, config.url, dp_info.dp_size
); );
...@@ -512,7 +512,7 @@ impl StepExecutor for CreateWorkerStep { ...@@ -512,7 +512,7 @@ impl StepExecutor for CreateWorkerStep {
worker.set_healthy(false); worker.set_healthy(false);
workers.push(worker); workers.push(worker);
info!( debug!(
"Created DP-aware worker {}@{}/{} ({:?})", "Created DP-aware worker {}@{}/{} ({:?})",
config.url, config.url,
rank, rank,
...@@ -545,7 +545,7 @@ impl StepExecutor for CreateWorkerStep { ...@@ -545,7 +545,7 @@ impl StepExecutor for CreateWorkerStep {
let worker = Arc::new(builder.build()) as Arc<dyn Worker>; let worker = Arc::new(builder.build()) as Arc<dyn Worker>;
worker.set_healthy(false); worker.set_healthy(false);
info!( debug!(
"Created worker object for {} ({:?}) with {} labels", "Created worker object for {} ({:?}) with {} labels",
config.url, config.url,
connection_mode.as_ref(), connection_mode.as_ref(),
...@@ -589,7 +589,7 @@ impl StepExecutor for RegisterWorkerStep { ...@@ -589,7 +589,7 @@ impl StepExecutor for RegisterWorkerStep {
for worker in workers.iter() { for worker in workers.iter() {
let worker_id = app_context.worker_registry.register(Arc::clone(worker)); let worker_id = app_context.worker_registry.register(Arc::clone(worker));
worker_ids.push(worker_id.clone()); worker_ids.push(worker_id.clone());
info!( debug!(
"Registered DP-aware worker {} with ID {:?}", "Registered DP-aware worker {} with ID {:?}",
config.url, worker_id config.url, worker_id
); );
...@@ -607,7 +607,7 @@ impl StepExecutor for RegisterWorkerStep { ...@@ -607,7 +607,7 @@ impl StepExecutor for RegisterWorkerStep {
.worker_registry .worker_registry
.register(Arc::clone(worker.as_ref())); .register(Arc::clone(worker.as_ref()));
info!("Registered worker {} with ID {:?}", config.url, worker_id); debug!("Registered worker {} with ID {:?}", config.url, worker_id);
context.set("worker_id", worker_id); context.set("worker_id", worker_id);
Ok(StepResult::Success) Ok(StepResult::Success)
...@@ -664,7 +664,7 @@ impl StepExecutor for UpdatePoliciesStep { ...@@ -664,7 +664,7 @@ impl StepExecutor for UpdatePoliciesStep {
} }
} }
info!( debug!(
"Updated policies for {} DP-aware workers {} (model: {})", "Updated policies for {} DP-aware workers {} (model: {})",
workers.len(), workers.len(),
config.url, config.url,
...@@ -693,7 +693,7 @@ impl StepExecutor for UpdatePoliciesStep { ...@@ -693,7 +693,7 @@ impl StepExecutor for UpdatePoliciesStep {
} }
} }
info!( debug!(
"Updated policies for worker {} (model: {})", "Updated policies for worker {} (model: {})",
config.url, model_id config.url, model_id
); );
...@@ -728,7 +728,7 @@ impl StepExecutor for ActivateWorkerStep { ...@@ -728,7 +728,7 @@ impl StepExecutor for ActivateWorkerStep {
worker.set_healthy(true); worker.set_healthy(true);
} }
info!( debug!(
"Activated {} DP-aware workers {} (marked as healthy)", "Activated {} DP-aware workers {} (marked as healthy)",
workers.len(), workers.len(),
config.url config.url
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment