"git@developer.sourcefind.cn:change/sglang.git" did not exist on "9effeb5bddf2e58fb35f274f0c03162b079db781"
Unverified commit 36bfddec, authored by Tony Lu, committed by GitHub
Browse files

[router] add metrics for worker and policy (#8971)


Signed-off-by: Tony Lu <tonyluj@gmail.com>
parent 91e2f902
use super::{CircuitBreaker, CircuitBreakerConfig, WorkerError, WorkerResult}; use super::{CircuitBreaker, CircuitBreakerConfig, WorkerError, WorkerResult};
use crate::metrics::RouterMetrics;
use async_trait::async_trait; use async_trait::async_trait;
use futures; use futures;
use serde_json; use serde_json;
...@@ -259,6 +260,7 @@ impl Worker for BasicWorker { ...@@ -259,6 +260,7 @@ impl Worker for BasicWorker {
fn set_healthy(&self, healthy: bool) { fn set_healthy(&self, healthy: bool) {
self.healthy.store(healthy, Ordering::Release); self.healthy.store(healthy, Ordering::Release);
RouterMetrics::set_worker_health(self.url(), healthy);
} }
async fn check_health_async(&self) -> WorkerResult<()> { async fn check_health_async(&self) -> WorkerResult<()> {
......
...@@ -181,6 +181,7 @@ impl LoadBalancingPolicy for CacheAwarePolicy { ...@@ -181,6 +181,7 @@ impl LoadBalancingPolicy for CacheAwarePolicy {
// Increment processed counter // Increment processed counter
workers[min_load_idx].increment_processed(); workers[min_load_idx].increment_processed();
RouterMetrics::record_processed_request(workers[min_load_idx].url()); RouterMetrics::record_processed_request(workers[min_load_idx].url());
RouterMetrics::record_policy_decision(self.name(), workers[min_load_idx].url());
return Some(min_load_idx); return Some(min_load_idx);
} }
......
...@@ -90,6 +90,7 @@ impl LoadBalancingPolicy for PowerOfTwoPolicy { ...@@ -90,6 +90,7 @@ impl LoadBalancingPolicy for PowerOfTwoPolicy {
// Increment processed counter // Increment processed counter
workers[selected_idx].increment_processed(); workers[selected_idx].increment_processed();
RouterMetrics::record_processed_request(workers[selected_idx].url()); RouterMetrics::record_processed_request(workers[selected_idx].url());
RouterMetrics::record_policy_decision(self.name(), workers[selected_idx].url());
Some(selected_idx) Some(selected_idx)
} }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
use super::{get_healthy_worker_indices, LoadBalancingPolicy}; use super::{get_healthy_worker_indices, LoadBalancingPolicy};
use crate::core::Worker; use crate::core::Worker;
use crate::metrics::RouterMetrics;
use rand::Rng; use rand::Rng;
/// Random selection policy /// Random selection policy
...@@ -30,6 +31,10 @@ impl LoadBalancingPolicy for RandomPolicy { ...@@ -30,6 +31,10 @@ impl LoadBalancingPolicy for RandomPolicy {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let random_idx = rng.gen_range(0..healthy_indices.len()); let random_idx = rng.gen_range(0..healthy_indices.len());
let worker = workers[healthy_indices[random_idx]].url();
RouterMetrics::record_processed_request(worker);
RouterMetrics::record_policy_decision(self.name(), worker);
Some(healthy_indices[random_idx]) Some(healthy_indices[random_idx])
} }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
use super::{get_healthy_worker_indices, LoadBalancingPolicy}; use super::{get_healthy_worker_indices, LoadBalancingPolicy};
use crate::core::Worker; use crate::core::Worker;
use crate::metrics::RouterMetrics;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
/// Round-robin selection policy /// Round-robin selection policy
...@@ -35,7 +36,10 @@ impl LoadBalancingPolicy for RoundRobinPolicy { ...@@ -35,7 +36,10 @@ impl LoadBalancingPolicy for RoundRobinPolicy {
// Get and increment counter atomically // Get and increment counter atomically
let count = self.counter.fetch_add(1, Ordering::Relaxed); let count = self.counter.fetch_add(1, Ordering::Relaxed);
let selected_idx = count % healthy_indices.len(); let selected_idx = count % healthy_indices.len();
let worker = workers[healthy_indices[selected_idx]].url();
RouterMetrics::record_processed_request(worker);
RouterMetrics::record_policy_decision(self.name(), worker);
Some(healthy_indices[selected_idx]) Some(healthy_indices[selected_idx])
} }
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment