Unverified Commit 1a399799 authored by Chao Yang's avatar Chao Yang Committed by GitHub
Browse files

Sgl-router Prometheus metrics endpoint and usage track metrics (#6537)

parent 022012aa
...@@ -30,6 +30,9 @@ tracing-appender = "0.2.3" ...@@ -30,6 +30,9 @@ tracing-appender = "0.2.3"
kube = { version = "0.88.1", features = ["runtime", "derive"] } kube = { version = "0.88.1", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.21.0", features = ["v1_29"] } k8s-openapi = { version = "0.21.0", features = ["v1_29"] }
futures = "0.3" futures = "0.3"
# Added for metrics
metrics = "0.24.2"
metrics-exporter-prometheus = "0.17.0"
[profile.release] [profile.release]
lto = "thin" lto = "thin"
codegen-units = 1 codegen-units = 1
...@@ -81,6 +81,18 @@ router = Router( ...@@ -81,6 +81,18 @@ router = Router(
Use the `--verbose` flag with the CLI for more detailed logs. Use the `--verbose` flag with the CLI for more detailed logs.
### Metrics
SGL Router exposes a Prometheus HTTP scrape endpoint for monitoring, which by default listens at 127.0.0.1:29000.
To change the endpoint to listen on all network interfaces and set the port to 9000, configure the following options when launching the router:
```
python -m sglang_router.launch_router \
--worker-urls http://localhost:8080 http://localhost:8081 \
--prometheus-host 0.0.0.0 \
--prometheus-port 9000
```
### Kubernetes Service Discovery ### Kubernetes Service Discovery
SGL Router supports automatic service discovery for worker nodes in Kubernetes environments. When enabled, the router will automatically: SGL Router supports automatic service discovery for worker nodes in Kubernetes environments. When enabled, the router will automatically:
......
...@@ -48,6 +48,9 @@ class RouterArgs: ...@@ -48,6 +48,9 @@ class RouterArgs:
selector: Dict[str, str] = dataclasses.field(default_factory=dict) selector: Dict[str, str] = dataclasses.field(default_factory=dict)
service_discovery_port: int = 80 service_discovery_port: int = 80
service_discovery_namespace: Optional[str] = None service_discovery_namespace: Optional[str] = None
# Prometheus configuration
prometheus_port: Optional[int] = None
prometheus_host: Optional[str] = None
@staticmethod @staticmethod
def add_cli_args( def add_cli_args(
...@@ -176,6 +179,19 @@ class RouterArgs: ...@@ -176,6 +179,19 @@ class RouterArgs:
type=str, type=str,
help="Kubernetes namespace to watch for pods. If not provided, watches all namespaces (requires cluster-wide permissions)", help="Kubernetes namespace to watch for pods. If not provided, watches all namespaces (requires cluster-wide permissions)",
) )
# Prometheus configuration
parser.add_argument(
f"--{prefix}prometheus-port",
type=int,
default=29000,
help="Port to expose Prometheus metrics. If not specified, Prometheus metrics are disabled",
)
parser.add_argument(
f"--{prefix}prometheus-host",
type=str,
default="127.0.0.1",
help="Host address to bind the Prometheus metrics server",
)
@classmethod @classmethod
def from_cli_args( def from_cli_args(
...@@ -215,6 +231,8 @@ class RouterArgs: ...@@ -215,6 +231,8 @@ class RouterArgs:
service_discovery_namespace=getattr( service_discovery_namespace=getattr(
args, f"{prefix}service_discovery_namespace", None args, f"{prefix}service_discovery_namespace", None
), ),
prometheus_port=getattr(args, f"{prefix}prometheus_port", None),
prometheus_host=getattr(args, f"{prefix}prometheus_host", None),
) )
@staticmethod @staticmethod
...@@ -278,6 +296,8 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]: ...@@ -278,6 +296,8 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]:
selector=router_args.selector, selector=router_args.selector,
service_discovery_port=router_args.service_discovery_port, service_discovery_port=router_args.service_discovery_port,
service_discovery_namespace=router_args.service_discovery_namespace, service_discovery_namespace=router_args.service_discovery_namespace,
prometheus_port=router_args.prometheus_port,
prometheus_host=router_args.prometheus_host,
) )
router.start() router.start()
......
...@@ -40,6 +40,8 @@ class Router: ...@@ -40,6 +40,8 @@ class Router:
worker URLs using this port. Default: 80 worker URLs using this port. Default: 80
service_discovery_namespace: Kubernetes namespace to watch for pods. If not provided, service_discovery_namespace: Kubernetes namespace to watch for pods. If not provided,
watches pods across all namespaces (requires cluster-wide permissions). Default: None watches pods across all namespaces (requires cluster-wide permissions). Default: None
prometheus_port: Port to expose Prometheus metrics. Default: None
prometheus_host: Host address to bind the Prometheus metrics server. Default: None
""" """
def __init__( def __init__(
...@@ -62,6 +64,8 @@ class Router: ...@@ -62,6 +64,8 @@ class Router:
selector: Dict[str, str] = None, selector: Dict[str, str] = None,
service_discovery_port: int = 80, service_discovery_port: int = 80,
service_discovery_namespace: Optional[str] = None, service_discovery_namespace: Optional[str] = None,
prometheus_port: Optional[int] = None,
prometheus_host: Optional[str] = None,
): ):
if selector is None: if selector is None:
selector = {} selector = {}
...@@ -85,6 +89,8 @@ class Router: ...@@ -85,6 +89,8 @@ class Router:
selector=selector, selector=selector,
service_discovery_port=service_discovery_port, service_discovery_port=service_discovery_port,
service_discovery_namespace=service_discovery_namespace, service_discovery_namespace=service_discovery_namespace,
prometheus_port=prometheus_port,
prometheus_host=prometheus_host,
) )
def start(self) -> None: def start(self) -> None:
......
...@@ -28,6 +28,8 @@ def popen_launch_router( ...@@ -28,6 +28,8 @@ def popen_launch_router(
selector: list = None, selector: list = None,
service_discovery_port: int = 80, service_discovery_port: int = 80,
service_discovery_namespace: str = None, service_discovery_namespace: str = None,
prometheus_port: int = None,
prometheus_host: str = None,
): ):
""" """
Launch the router server process. Launch the router server process.
...@@ -45,6 +47,8 @@ def popen_launch_router( ...@@ -45,6 +47,8 @@ def popen_launch_router(
selector: List of label selectors in format ["key1=value1", "key2=value2"] selector: List of label selectors in format ["key1=value1", "key2=value2"]
service_discovery_port: Port to use for service discovery service_discovery_port: Port to use for service discovery
service_discovery_namespace: Kubernetes namespace to watch for pods. If None, watches all namespaces. service_discovery_namespace: Kubernetes namespace to watch for pods. If None, watches all namespaces.
prometheus_port: Port to expose Prometheus metrics. If None, the flag is omitted and the router's default port (29000) is used.
prometheus_host: Host address to bind the Prometheus metrics server.
""" """
_, host, port = base_url.split(":") _, host, port = base_url.split(":")
host = host[2:] host = host[2:]
...@@ -87,6 +91,12 @@ def popen_launch_router( ...@@ -87,6 +91,12 @@ def popen_launch_router(
["--router-service-discovery-namespace", service_discovery_namespace] ["--router-service-discovery-namespace", service_discovery_namespace]
) )
if prometheus_port is not None:
command.extend(["--router-prometheus-port", str(prometheus_port)])
if prometheus_host is not None:
command.extend(["--router-prometheus-host", prometheus_host])
if log_dir is not None: if log_dir is not None:
command.extend(["--log-dir", log_dir]) command.extend(["--log-dir", log_dir])
......
use pyo3::prelude::*; use pyo3::prelude::*;
pub mod logging; pub mod logging;
use std::collections::HashMap; use std::collections::HashMap;
pub mod prometheus;
pub mod router; pub mod router;
pub mod server; pub mod server;
pub mod service_discovery; pub mod service_discovery;
pub mod tree; pub mod tree;
use crate::prometheus::PrometheusConfig;
#[pyclass(eq)] #[pyclass(eq)]
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
...@@ -35,6 +37,8 @@ struct Router { ...@@ -35,6 +37,8 @@ struct Router {
selector: HashMap<String, String>, selector: HashMap<String, String>,
service_discovery_port: u16, service_discovery_port: u16,
service_discovery_namespace: Option<String>, service_discovery_namespace: Option<String>,
prometheus_port: Option<u16>,
prometheus_host: Option<String>,
} }
#[pymethods] #[pymethods]
...@@ -58,7 +62,9 @@ impl Router { ...@@ -58,7 +62,9 @@ impl Router {
service_discovery = false, service_discovery = false,
selector = HashMap::new(), selector = HashMap::new(),
service_discovery_port = 80, service_discovery_port = 80,
service_discovery_namespace = None service_discovery_namespace = None,
prometheus_port = None,
prometheus_host = None
))] ))]
fn new( fn new(
worker_urls: Vec<String>, worker_urls: Vec<String>,
...@@ -79,6 +85,8 @@ impl Router { ...@@ -79,6 +85,8 @@ impl Router {
selector: HashMap<String, String>, selector: HashMap<String, String>,
service_discovery_port: u16, service_discovery_port: u16,
service_discovery_namespace: Option<String>, service_discovery_namespace: Option<String>,
prometheus_port: Option<u16>,
prometheus_host: Option<String>,
) -> PyResult<Self> { ) -> PyResult<Self> {
Ok(Router { Ok(Router {
host, host,
...@@ -99,6 +107,8 @@ impl Router { ...@@ -99,6 +107,8 @@ impl Router {
selector, selector,
service_discovery_port, service_discovery_port,
service_discovery_namespace, service_discovery_namespace,
prometheus_port,
prometheus_host,
}) })
} }
...@@ -136,6 +146,15 @@ impl Router { ...@@ -136,6 +146,15 @@ impl Router {
None None
}; };
// Always build a Prometheus config; unset port/host fall back to 127.0.0.1:29000
let prometheus_config = Some(PrometheusConfig {
port: self.prometheus_port.unwrap_or(29000),
host: self
.prometheus_host
.clone()
.unwrap_or_else(|| "127.0.0.1".to_string()),
});
actix_web::rt::System::new().block_on(async move { actix_web::rt::System::new().block_on(async move {
server::startup(server::ServerConfig { server::startup(server::ServerConfig {
host: self.host.clone(), host: self.host.clone(),
...@@ -146,6 +165,7 @@ impl Router { ...@@ -146,6 +165,7 @@ impl Router {
max_payload_size: self.max_payload_size, max_payload_size: self.max_payload_size,
log_dir: self.log_dir.clone(), log_dir: self.log_dir.clone(),
service_discovery_config, service_discovery_config,
prometheus_config,
}) })
.await .await
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
......
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
/// Listener settings for the Prometheus metrics scrape endpoint.
#[derive(Debug, Clone)]
pub struct PrometheusConfig {
    /// TCP port the metrics HTTP listener binds to.
    pub port: u16,
    /// Address the metrics HTTP listener binds to, given as an IP address string.
    pub host: String,
}

impl Default for PrometheusConfig {
    /// Returns the documented default endpoint, `127.0.0.1:29000`.
    ///
    /// The default is loopback-only so that metrics are never exposed on
    /// external interfaces unless the operator explicitly opts in (for
    /// example by setting the host to `0.0.0.0`).
    fn default() -> Self {
        Self {
            port: 29000,
            host: "127.0.0.1".to_string(),
        }
    }
}
/// Installs the global Prometheus metrics recorder and starts its HTTP scrape
/// listener on `config.host:config.port`.
///
/// `config.host` must be an IP address literal. If it cannot be parsed (for
/// example a hostname such as `localhost`), a warning is written to stderr and
/// the listener binds to the loopback address instead, so a typo can never
/// expose the metrics endpoint on every network interface.
///
/// # Panics
///
/// Panics if the histogram bucket configuration is rejected or if the
/// exporter cannot be installed.
pub fn start_prometheus(config: PrometheusConfig) {
    // The router names its latency histograms `*_duration_seconds`
    // (Prometheus naming convention: the unit is the final suffix), so the
    // matcher must include the unit or the custom buckets are never applied.
    let duration_matcher = Matcher::Suffix(String::from("duration_seconds"));
    // Bucket upper bounds in seconds, from 1 ms up to 4 minutes.
    let duration_bucket = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0, 45.0,
        60.0, 90.0, 120.0, 180.0, 240.0,
    ];

    let ip_addr = match config.host.parse::<IpAddr>() {
        Ok(addr) => addr,
        Err(err) => {
            // Fail closed: an unparsable host must not widen exposure to 0.0.0.0.
            eprintln!(
                "Invalid Prometheus host '{}' ({}); falling back to 127.0.0.1",
                config.host, err
            );
            IpAddr::V4(Ipv4Addr::LOCALHOST)
        }
    };
    let socket_addr = SocketAddr::new(ip_addr, config.port);

    PrometheusBuilder::new()
        .with_http_listener(socket_addr)
        .upkeep_timeout(Duration::from_secs(5 * 60))
        .set_buckets_for_metric(duration_matcher, &duration_bucket)
        .expect("failed to set duration bucket")
        .install()
        .expect("failed to install Prometheus metrics exporter");
}
use crate::tree::Tree; use crate::tree::Tree;
use ::metrics::{counter, gauge, histogram};
use actix_web::http::header::{HeaderValue, CONTENT_TYPE}; use actix_web::http::header::{HeaderValue, CONTENT_TYPE};
use actix_web::{HttpRequest, HttpResponse}; use actix_web::{HttpRequest, HttpResponse};
use bytes::Bytes; use bytes::Bytes;
...@@ -10,6 +11,7 @@ use std::sync::atomic::AtomicUsize; ...@@ -10,6 +11,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use std::time::Instant;
use tokio; use tokio;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
...@@ -135,6 +137,9 @@ pub enum PolicyConfig { ...@@ -135,6 +137,9 @@ pub enum PolicyConfig {
impl Router { impl Router {
pub fn new(worker_urls: Vec<String>, policy_config: PolicyConfig) -> Result<Self, String> { pub fn new(worker_urls: Vec<String>, policy_config: PolicyConfig) -> Result<Self, String> {
// Update active workers gauge
gauge!("sgl_router_active_workers").set(worker_urls.len() as f64);
// Get timeout and interval from policy config // Get timeout and interval from policy config
let (timeout_secs, interval_secs) = match &policy_config { let (timeout_secs, interval_secs) = match &policy_config {
PolicyConfig::RandomConfig { PolicyConfig::RandomConfig {
...@@ -328,6 +333,7 @@ impl Router { ...@@ -328,6 +333,7 @@ impl Router {
route: &str, route: &str,
req: &HttpRequest, req: &HttpRequest,
) -> HttpResponse { ) -> HttpResponse {
let start = Instant::now();
let mut request_builder = client.get(format!("{}{}", worker_url, route)); let mut request_builder = client.get(format!("{}{}", worker_url, route));
// Copy all headers from original request except for /health because it does not need authorization // Copy all headers from original request except for /health because it does not need authorization
...@@ -337,7 +343,7 @@ impl Router { ...@@ -337,7 +343,7 @@ impl Router {
} }
} }
match request_builder.send().await { let response = match request_builder.send().await {
Ok(res) => { Ok(res) => {
let status = actix_web::http::StatusCode::from_u16(res.status().as_u16()) let status = actix_web::http::StatusCode::from_u16(res.status().as_u16())
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR);
...@@ -352,7 +358,21 @@ impl Router { ...@@ -352,7 +358,21 @@ impl Router {
"Failed to send request to worker {}: {}", "Failed to send request to worker {}: {}",
worker_url, e worker_url, e
)), )),
};
// Record request metrics
if route != "/health" {
let duration = start.elapsed();
counter!("sgl_router_requests_total", "route" => route.to_string()).increment(1);
histogram!("sgl_router_request_duration_seconds", "route" => route.to_string())
.record(duration.as_secs_f64());
if !response.status().is_success() {
counter!("sgl_router_request_errors_total", "route" => route.to_string())
.increment(1);
}
} }
response
} }
pub async fn route_to_first( pub async fn route_to_first(
...@@ -510,6 +530,10 @@ impl Router { ...@@ -510,6 +530,10 @@ impl Router {
max_load, min_load, running_queue max_load, min_load, running_queue
); );
counter!("sgl_router_load_balancing_events_total").increment(1);
gauge!("sgl_router_max_load").set(max_load as f64);
gauge!("sgl_router_min_load").set(min_load as f64);
// Use shortest queue routing when load is imbalanced // Use shortest queue routing when load is imbalanced
running_queue running_queue
.iter() .iter()
...@@ -523,8 +547,10 @@ impl Router { ...@@ -523,8 +547,10 @@ impl Router {
matched_text.chars().count() as f32 / text.chars().count() as f32; matched_text.chars().count() as f32 / text.chars().count() as f32;
if matched_rate > *cache_threshold { if matched_rate > *cache_threshold {
counter!("sgl_router_cache_hits_total").increment(1);
matched_worker.to_string() matched_worker.to_string()
} else { } else {
counter!("sgl_router_cache_misses_total").increment(1);
tree.get_smallest_tenant() tree.get_smallest_tenant()
} }
}; };
...@@ -537,6 +563,11 @@ impl Router { ...@@ -537,6 +563,11 @@ impl Router {
.unwrap() .unwrap()
.get_mut(&selected_url) .get_mut(&selected_url)
.unwrap() += 1; .unwrap() += 1;
gauge!("sgl_router_running_requests", "worker" => selected_url.to_string())
.set(*running_queue.get(&selected_url).unwrap() as f64);
counter!("sgl_router_processed_requests_total", "worker" => selected_url.to_string()).increment(1);
tree.insert(&text, &selected_url); tree.insert(&text, &selected_url);
selected_url selected_url
...@@ -636,6 +667,7 @@ impl Router { ...@@ -636,6 +667,7 @@ impl Router {
body: &Bytes, body: &Bytes,
route: &str, route: &str,
) -> HttpResponse { ) -> HttpResponse {
let start = Instant::now();
const MAX_REQUEST_RETRIES: u32 = 3; const MAX_REQUEST_RETRIES: u32 = 3;
const MAX_TOTAL_RETRIES: u32 = 6; const MAX_TOTAL_RETRIES: u32 = 6;
let mut total_retries = 0; let mut total_retries = 0;
...@@ -648,18 +680,24 @@ impl Router { ...@@ -648,18 +680,24 @@ impl Router {
while request_retries < MAX_REQUEST_RETRIES { while request_retries < MAX_REQUEST_RETRIES {
if total_retries >= 1 { if total_retries >= 1 {
info!("Retrying request after {} failed attempts", total_retries); info!("Retrying request after {} failed attempts", total_retries);
counter!("sgl_router_retries_total", "route" => route.to_string()).increment(1);
} }
let response = self let response = self
.send_generate_request(client, req, body, route, &worker_url) .send_generate_request(client, req, body, route, &worker_url)
.await; .await;
if response.status().is_success() { if response.status().is_success() {
let duration = start.elapsed();
histogram!("sgl_router_generate_duration_seconds", "route" => route.to_string()).record(duration.as_secs_f64());
return response; return response;
} else { } else {
// if the worker is healthy, it means the request is bad, so return the error response // if the worker is healthy, it means the request is bad, so return the error response
let health_response = let health_response =
self.send_request(client, &worker_url, "/health", req).await; self.send_request(client, &worker_url, "/health", req).await;
if health_response.status().is_success() { if health_response.status().is_success() {
counter!("sgl_router_request_errors_total", "route" => route.to_string())
.increment(1);
return response; return response;
} }
} }
...@@ -682,6 +720,7 @@ impl Router { ...@@ -682,6 +720,7 @@ impl Router {
} }
} }
counter!("sgl_router_request_errors_total", "route" => route.to_string()).increment(1);
HttpResponse::InternalServerError().body("All retry attempts failed") HttpResponse::InternalServerError().body("All retry attempts failed")
} }
...@@ -733,6 +772,7 @@ impl Router { ...@@ -733,6 +772,7 @@ impl Router {
} }
info!("Added worker: {}", worker_url); info!("Added worker: {}", worker_url);
urls.push(worker_url.to_string()); urls.push(worker_url.to_string());
gauge!("sgl_router_active_workers").set(urls.len() as f64);
} }
} }
...@@ -804,6 +844,7 @@ impl Router { ...@@ -804,6 +844,7 @@ impl Router {
if let Some(index) = urls.iter().position(|url| url == &worker_url) { if let Some(index) = urls.iter().position(|url| url == &worker_url) {
urls.remove(index); urls.remove(index);
info!("Removed worker: {}", worker_url); info!("Removed worker: {}", worker_url);
gauge!("sgl_router_active_workers").set(urls.len() as f64);
} else { } else {
warn!("Worker {} not found, skipping removal", worker_url); warn!("Worker {} not found, skipping removal", worker_url);
return; return;
......
use crate::logging::{self, LoggingConfig}; use crate::logging::{self, LoggingConfig};
use crate::prometheus::{self, PrometheusConfig};
use crate::router::PolicyConfig; use crate::router::PolicyConfig;
use crate::router::Router; use crate::router::Router;
use crate::service_discovery::{start_service_discovery, ServiceDiscoveryConfig}; use crate::service_discovery::{start_service_discovery, ServiceDiscoveryConfig};
...@@ -161,6 +162,7 @@ pub struct ServerConfig { ...@@ -161,6 +162,7 @@ pub struct ServerConfig {
pub max_payload_size: usize, pub max_payload_size: usize,
pub log_dir: Option<String>, pub log_dir: Option<String>,
pub service_discovery_config: Option<ServiceDiscoveryConfig>, pub service_discovery_config: Option<ServiceDiscoveryConfig>,
pub prometheus_config: Option<PrometheusConfig>,
} }
pub async fn startup(config: ServerConfig) -> std::io::Result<()> { pub async fn startup(config: ServerConfig) -> std::io::Result<()> {
...@@ -184,6 +186,17 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> { ...@@ -184,6 +186,17 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> {
None None
}; };
// Initialize prometheus metrics exporter
if let Some(prometheus_config) = config.prometheus_config {
info!(
"🚧 Initializing Prometheus metrics on {}:{}",
prometheus_config.host, prometheus_config.port
);
prometheus::start_prometheus(prometheus_config);
} else {
info!("🚧 Prometheus metrics disabled");
}
info!("🚧 Initializing router on {}:{}", config.host, config.port); info!("🚧 Initializing router on {}:{}", config.host, config.port);
info!("🚧 Initializing workers on {:?}", config.worker_urls); info!("🚧 Initializing workers on {:?}", config.worker_urls);
info!("🚧 Policy Config: {:?}", config.policy_config); info!("🚧 Policy Config: {:?}", config.policy_config);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment