"launch/vscode:/vscode.git/clone" did not exist on "99cd9d85a943de0eba06fba89c0e7b5311ea45bb"
Commit 666cf87b authored by Ryan McCormick's avatar Ryan McCormick Committed by GitHub
Browse files

feat: Support prometheus push gateway for use cases behind a firewall (#64)

parent e7233b2d
...@@ -129,7 +129,7 @@ dependencies = [ ...@@ -129,7 +129,7 @@ dependencies = [
"ring", "ring",
"rustls-native-certs 0.7.3", "rustls-native-certs 0.7.3",
"rustls-pemfile 2.2.0", "rustls-pemfile 2.2.0",
"rustls-webpki", "rustls-webpki 0.102.8",
"serde", "serde",
"serde_json", "serde_json",
"serde_nanos", "serde_nanos",
...@@ -137,7 +137,7 @@ dependencies = [ ...@@ -137,7 +137,7 @@ dependencies = [
"thiserror 1.0.69", "thiserror 1.0.69",
"time", "time",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls 0.26.2",
"tokio-util", "tokio-util",
"tokio-websockets", "tokio-websockets",
"tracing", "tracing",
...@@ -1683,6 +1683,20 @@ dependencies = [ ...@@ -1683,6 +1683,20 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "hyper-rustls"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
dependencies = [
"futures-util",
"http 0.2.12",
"hyper 0.14.32",
"rustls 0.21.12",
"tokio",
"tokio-rustls 0.24.1",
]
[[package]] [[package]]
name = "hyper-rustls" name = "hyper-rustls"
version = "0.27.5" version = "0.27.5"
...@@ -1693,11 +1707,11 @@ dependencies = [ ...@@ -1693,11 +1707,11 @@ dependencies = [
"http 1.2.0", "http 1.2.0",
"hyper 1.6.0", "hyper 1.6.0",
"hyper-util", "hyper-util",
"rustls", "rustls 0.23.23",
"rustls-native-certs 0.8.1", "rustls-native-certs 0.8.1",
"rustls-pki-types", "rustls-pki-types",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls 0.26.2",
"tower-service", "tower-service",
] ]
...@@ -3026,7 +3040,7 @@ dependencies = [ ...@@ -3026,7 +3040,7 @@ dependencies = [
"quinn-proto", "quinn-proto",
"quinn-udp", "quinn-udp",
"rustc-hash 2.1.1", "rustc-hash 2.1.1",
"rustls", "rustls 0.23.23",
"socket2", "socket2",
"thiserror 2.0.12", "thiserror 2.0.12",
"tokio", "tokio",
...@@ -3044,7 +3058,7 @@ dependencies = [ ...@@ -3044,7 +3058,7 @@ dependencies = [
"rand", "rand",
"ring", "ring",
"rustc-hash 2.1.1", "rustc-hash 2.1.1",
"rustls", "rustls 0.23.23",
"rustls-pki-types", "rustls-pki-types",
"slab", "slab",
"thiserror 2.0.12", "thiserror 2.0.12",
...@@ -3216,6 +3230,7 @@ dependencies = [ ...@@ -3216,6 +3230,7 @@ dependencies = [
"http 0.2.12", "http 0.2.12",
"http-body 0.4.6", "http-body 0.4.6",
"hyper 0.14.32", "hyper 0.14.32",
"hyper-rustls 0.24.2",
"hyper-tls", "hyper-tls",
"ipnet", "ipnet",
"js-sys", "js-sys",
...@@ -3225,6 +3240,7 @@ dependencies = [ ...@@ -3225,6 +3240,7 @@ dependencies = [
"once_cell", "once_cell",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"rustls 0.21.12",
"rustls-pemfile 1.0.4", "rustls-pemfile 1.0.4",
"serde", "serde",
"serde_json", "serde_json",
...@@ -3233,11 +3249,13 @@ dependencies = [ ...@@ -3233,11 +3249,13 @@ dependencies = [
"system-configuration", "system-configuration",
"tokio", "tokio",
"tokio-native-tls", "tokio-native-tls",
"tokio-rustls 0.24.1",
"tower-service", "tower-service",
"url", "url",
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures", "wasm-bindgen-futures",
"web-sys", "web-sys",
"webpki-roots 0.25.4",
"winreg", "winreg",
] ]
...@@ -3255,7 +3273,7 @@ dependencies = [ ...@@ -3255,7 +3273,7 @@ dependencies = [
"http-body 1.0.1", "http-body 1.0.1",
"http-body-util", "http-body-util",
"hyper 1.6.0", "hyper 1.6.0",
"hyper-rustls", "hyper-rustls 0.27.5",
"hyper-util", "hyper-util",
"ipnet", "ipnet",
"js-sys", "js-sys",
...@@ -3266,7 +3284,7 @@ dependencies = [ ...@@ -3266,7 +3284,7 @@ dependencies = [
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"quinn", "quinn",
"rustls", "rustls 0.23.23",
"rustls-native-certs 0.8.1", "rustls-native-certs 0.8.1",
"rustls-pemfile 2.2.0", "rustls-pemfile 2.2.0",
"rustls-pki-types", "rustls-pki-types",
...@@ -3275,7 +3293,7 @@ dependencies = [ ...@@ -3275,7 +3293,7 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"sync_wrapper 1.0.2", "sync_wrapper 1.0.2",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls 0.26.2",
"tokio-util", "tokio-util",
"tower 0.5.2", "tower 0.5.2",
"tower-service", "tower-service",
...@@ -3357,6 +3375,18 @@ dependencies = [ ...@@ -3357,6 +3375,18 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "rustls"
version = "0.21.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
dependencies = [
"log",
"ring",
"rustls-webpki 0.101.7",
"sct",
]
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.23.23" version = "0.23.23"
...@@ -3367,7 +3397,7 @@ dependencies = [ ...@@ -3367,7 +3397,7 @@ dependencies = [
"once_cell", "once_cell",
"ring", "ring",
"rustls-pki-types", "rustls-pki-types",
"rustls-webpki", "rustls-webpki 0.102.8",
"subtle", "subtle",
"zeroize", "zeroize",
] ]
...@@ -3424,6 +3454,16 @@ dependencies = [ ...@@ -3424,6 +3454,16 @@ dependencies = [
"web-time", "web-time",
] ]
[[package]]
name = "rustls-webpki"
version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring",
"untrusted",
]
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
version = "0.102.8" version = "0.102.8"
...@@ -3471,6 +3511,16 @@ version = "1.2.0" ...@@ -3471,6 +3511,16 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sct"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring",
"untrusted",
]
[[package]] [[package]]
name = "secrecy" name = "secrecy"
version = "0.10.3" version = "0.10.3"
...@@ -4054,13 +4104,23 @@ dependencies = [ ...@@ -4054,13 +4104,23 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls 0.21.12",
"tokio",
]
[[package]] [[package]]
name = "tokio-rustls" name = "tokio-rustls"
version = "0.26.2" version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [ dependencies = [
"rustls", "rustls 0.23.23",
"tokio", "tokio",
] ]
...@@ -4105,7 +4165,7 @@ dependencies = [ ...@@ -4105,7 +4165,7 @@ dependencies = [
"rustls-native-certs 0.8.1", "rustls-native-certs 0.8.1",
"rustls-pki-types", "rustls-pki-types",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls 0.26.2",
"tokio-util", "tokio-util",
] ]
...@@ -4444,12 +4504,12 @@ dependencies = [ ...@@ -4444,12 +4504,12 @@ dependencies = [
"log", "log",
"native-tls", "native-tls",
"once_cell", "once_cell",
"rustls", "rustls 0.23.23",
"rustls-pki-types", "rustls-pki-types",
"serde", "serde",
"serde_json", "serde_json",
"url", "url",
"webpki-roots", "webpki-roots 0.26.8",
] ]
[[package]] [[package]]
...@@ -4689,6 +4749,12 @@ dependencies = [ ...@@ -4689,6 +4749,12 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "webpki-roots"
version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
[[package]] [[package]]
name = "webpki-roots" name = "webpki-roots"
version = "0.26.8" version = "0.26.8"
......
...@@ -43,6 +43,4 @@ opentelemetry-prometheus = "0.13" ...@@ -43,6 +43,4 @@ opentelemetry-prometheus = "0.13"
prometheus = "0.13" prometheus = "0.13"
rand = "0.8" rand = "0.8"
axum = "0.6" axum = "0.6"
reqwest = { version = "0.11", features = ["json", "rustls-tls"] }
[dev-dependencies] \ No newline at end of file
reqwest = { version = "0.11", features = ["blocking"] }
...@@ -12,7 +12,7 @@ This will: ...@@ -12,7 +12,7 @@ This will:
For example: For example:
```bash ```bash
# For more details, try DYN_LOG=debug # For more details, try DYN_LOG=debug
DYN_LOG=info cargo run --bin metrics -- --namespace dynamo --component backend --endpoint generate DYN_LOG=info metrics --namespace dynamo --component backend --endpoint generate
# 2025-02-26T18:45:05.467026Z INFO metrics: Creating unique instance of Metrics at dynamo/components/metrics/instance # 2025-02-26T18:45:05.467026Z INFO metrics: Creating unique instance of Metrics at dynamo/components/metrics/instance
# 2025-02-26T18:45:05.472146Z INFO metrics: Scraping service dynamo_backend_720278f8 and filtering on subject dynamo_backend_720278f8.generate # 2025-02-26T18:45:05.472146Z INFO metrics: Scraping service dynamo_backend_720278f8 and filtering on subject dynamo_backend_720278f8.generate
...@@ -27,13 +27,39 @@ With no matching endpoints running to collect stats from, you should see warning ...@@ -27,13 +27,39 @@ With no matching endpoints running to collect stats from, you should see warning
After a matching endpoint gets started, you should see the warnings stop After a matching endpoint gets started, you should see the warnings stop
when the endpoint gets automatically discovered. when the endpoint gets automatically discovered.
When stats are found from target endpoints, the metrics component will ## Building/Running from Source
aggregate them and publish them to a prometheus server running on `localhost:9091/metrics` by default:
For easy iteration while making edits to the metrics component, you can use `cargo run`
to build and run with your local changes:
```bash
DYN_LOG=info cargo run --bin metrics -- --namespace dynamo --component backend --endpoint generate
``` ```
2025-02-28T04:05:58.077901Z INFO metrics: Aggregated metrics: ProcessedEndpoints { endpoints: [Endpoint { name: "worker-7587884888253033398", subject: "dynamo_backend_720278f8.generate-694d951a80e06bb6", data: ForwardPassMetrics { request_active_slots: 58, request_total_slots: 100, kv_active_blocks: 77, kv_total_blocks: 100 } }, Endpoint { name: "worker-7587884888253033401", subject: "dynamo_backend_720278f8.generate-694d951a80e06bb9", data: ForwardPassMetrics { request_active_slots: 71, request_total_slots: 100, kv_active_blocks: 29, kv_total_blocks: 100 } }], worker_ids: [7587884888253033398, 7587884888253033401], load_avg: 53.0, load_std: 24.0 }
## Metrics Collection Modes
The metrics component supports two modes for exposing metrics in a Prometheus format:
### Pull Mode (Default)
When running in pull mode (the default), the metrics component will expose a Prometheus metrics endpoint on the specified host and port that a Prometheus server or curl client can pull from:
```bash
# Start metrics server on default host (0.0.0.0) and port (9091)
DYN_LOG=info metrics --component backend --endpoint generate
# Or specify a custom port
DYN_LOG=info metrics --component backend --endpoint generate --port 9092
# Or specify a custom host and port
DYN_LOG=info metrics --component backend --endpoint generate --host 127.0.0.1 --port 9092
``` ```
To see the metrics being published in prometheus format, you can run: In pull mode:
- The `--host` parameter must be a valid IPv4 or IPv6 address (e.g., "0.0.0.0", "127.0.0.1")
- The `--port` parameter specifies which port the HTTP server will listen on
You can then query the metrics using:
```bash ```bash
curl localhost:9091/metrics curl localhost:9091/metrics
...@@ -47,7 +73,44 @@ curl localhost:9091/metrics ...@@ -47,7 +73,44 @@ curl localhost:9091/metrics
# llm_kv_blocks_total{component="backend",endpoint="generate",worker_id="7587884888253033401"} 100 # llm_kv_blocks_total{component="backend",endpoint="generate",worker_id="7587884888253033401"} 100
``` ```
## Mock Worker ### Push Mode
For ephemeral or batch jobs, or when metrics need to be pushed through a firewall, you can use Push mode. In this mode, the metrics component will periodically push metrics to an externally hosted Prometheus PushGateway:
Start a prometheus push gateway service via docker:
```bash
docker run --rm -d -p 9091:9091 --name pushgateway prom/pushgateway
```
Start the metrics component in `--push` mode, specifying the host and port of your PushGateway:
```bash
# Push metrics to a Prometheus PushGateway every --push-interval seconds
DYN_LOG=info metrics \
--component backend \
--endpoint generate \
--host 127.0.0.1 \
--port 9091 \
--push
```
When using Push mode:
- The `--host` parameter specifies be the IP address of the PushGateway
- The `--port` parameter specifies the port of the PushGateway
- The push interval can be configured with `--push-interval` (default: 2 seconds)
- A default job name of "dynamo_metrics" is used for the Prometheus job label
- Metrics persist in the PushGateway until explicitly deleted
- Prometheus should be configured to scrape the PushGateway with `honor_labels: true`
To view the metrics hosted on the PushGateway:
```bash
# View all metrics
# curl http://<pushgateway_ip>:<pushgateway_port>/metrics
curl 127.0.0.1:9091/metrics
```
## Workers
### Mock Worker
For convenience and debugging, there is a mock worker that registers a mock `StatsHandler` For convenience and debugging, there is a mock worker that registers a mock `StatsHandler`
with the `endpoint` and publishes mock `KvHitRateEvent`s on `namespace/kv-hit-rate`. with the `endpoint` and publishes mock `KvHitRateEvent`s on `namespace/kv-hit-rate`.
...@@ -60,6 +123,13 @@ DYN_LOG=info cargo run --bin mock_worker ...@@ -60,6 +123,13 @@ DYN_LOG=info cargo run --bin mock_worker
**NOTE**: When using the mock worker, the data from the stats handler and the **NOTE**: When using the mock worker, the data from the stats handler and the
events will be random and shouldn't be expected to correlate with each other. events will be random and shouldn't be expected to correlate with each other.
## Real Worker ### Real Worker
See the KV Routing example in `examples/python_rs/llm/vllm`. See the KV Routing example in `examples/python_rs/llm/vllm`.
Start the `metrics` component with the corresponding namespace/component/endpoint that the
KV Routing example is using (NOTE: `load_metrics` endpoint is currently a hard-coded value
internally for the ForwardPassMetrics StatsHandler), for example:
```
DYN_LOG=info metrics --namespace dynamo --component vllm --endpoint load_metrics
```
...@@ -14,17 +14,131 @@ ...@@ -14,17 +14,131 @@
// limitations under the License. // limitations under the License.
//! Library functions for the metrics application. //! Library functions for the metrics application.
//!
//! This library provides functionality to expose Prometheus metrics either through a local HTTP server
//! or by pushing to a Prometheus PushGateway.
//!
//! # Examples
//!
//! ## Using the metrics pull mode
//! ```no_run
//! use metrics::{PrometheusMetricsCollector, MetricsMode};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut collector = PrometheusMetricsCollector::new()?;
//!
//! // Start a metrics server with default values
//! collector.start(MetricsMode::default())?;
//!
//! // Or explicitly specify values
//! collector.start(MetricsMode::Pull {
//! host: "127.0.0.1".to_string(),
//! port: 9090,
//! })?;
//!
//! // Or use the convenience constructor
//! collector.start(MetricsMode::new_pull())?;
//!
//! // Your application code here
//! tokio::signal::ctrl_c().await?;
//!
//! // Stop the metrics server gracefully
//! collector.stop();
//! Ok(())
//! }
//! ```
//!
//! ## Using the Push mode
//! ```no_run
//! use metrics::{PrometheusMetricsCollector, MetricsMode};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let mut collector = PrometheusMetricsCollector::new()?;
//!
//! // Start pushing metrics to a Prometheus PushGateway with default values
//! collector.start(MetricsMode::new_push())?;
//!
//! // Or explicitly specify values
//! collector.start(MetricsMode::Push {
//! host: "127.0.0.1".to_string(),
//! port: 9091,
//! job: "custom_job".to_string(),
//! interval: 5, // Push every 5 seconds
//! })?;
//!
//! // Your application code here
//! tokio::signal::ctrl_c().await?;
//!
//! // Stop pushing metrics gracefully
//! collector.stop();
//! Ok(())
//! }
use axum::{routing::get, Router}; use axum::{routing::get, Router};
use prometheus::{register_counter_vec, register_gauge_vec}; use prometheus::{register_counter_vec, register_gauge_vec, Encoder, TextEncoder};
use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration as StdDuration;
use dynamo_llm::kv_router::protocols::ForwardPassMetrics; use dynamo_llm::kv_router::protocols::ForwardPassMetrics;
use dynamo_llm::kv_router::scheduler::Endpoint; use dynamo_llm::kv_router::scheduler::Endpoint;
use dynamo_llm::kv_router::scoring::ProcessedEndpoints; use dynamo_llm::kv_router::scoring::ProcessedEndpoints;
use dynamo_runtime::{distributed::Component, service::EndpointInfo, utils::Duration, Result}; use dynamo_runtime::{
distributed::Component, error, service::EndpointInfo, utils::Duration, Result,
};
/// Configuration for metrics collection mode
#[derive(Debug, Clone)]
pub enum MetricsMode {
/// Host a Prometheus metrics server for pull-based collection
Pull {
/// Host to listen on (e.g. "0.0.0.0")
host: String,
/// Port to listen on (e.g. 9091)
port: u16,
},
/// Push to a Prometheus PushGateway
Push {
/// PushGateway host (e.g. "http://localhost")
host: String,
/// PushGateway port (e.g. 9091)
port: u16,
/// Job name for the metrics
job: String,
/// Push interval in seconds
interval: u64,
},
}
impl Default for MetricsMode {
fn default() -> Self {
Self::new_pull()
}
}
impl MetricsMode {
/// Create a new Pull mode with default values
pub fn new_pull() -> Self {
Self::Pull {
host: "0.0.0.0".to_string(),
port: 9091,
}
}
/// Create a new Push mode with default values
pub fn new_push() -> Self {
Self::Push {
host: "127.0.0.1".to_string(),
port: 9091,
job: "dynamo_metrics".to_string(),
interval: 2,
}
}
}
/// Configuration for LLM worker load capacity metrics /// Configuration for LLM worker load capacity metrics
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
...@@ -51,28 +165,53 @@ pub struct StatsWithData { ...@@ -51,28 +165,53 @@ pub struct StatsWithData {
pub data: serde_json::Value, pub data: serde_json::Value,
} }
/// Prometheus metrics server for exposing metrics /// Metrics collector for exposing metrics to prometheus/grafana
pub struct PrometheusMetricsServer { pub struct PrometheusMetricsCollector {
metrics: PrometheusMetrics, metrics: PrometheusMetrics,
mode: Option<MetricsMode>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
} }
impl PrometheusMetricsServer { impl PrometheusMetricsCollector {
/// Initialize the metrics server
pub fn new() -> Result<Self> { pub fn new() -> Result<Self> {
Ok(Self { Ok(Self {
metrics: PrometheusMetrics::new()?, metrics: PrometheusMetrics::new()?,
mode: None,
shutdown_tx: None,
}) })
} }
/// Start the metrics server on the specified port /// Start metrics collection with the specified mode
pub fn start(&mut self, port: u16) { pub fn start(&mut self, mode: MetricsMode) -> Result<()> {
// Store the mode
self.mode = Some(mode.clone());
match mode {
MetricsMode::Pull { host, port } => self.start_pull_mode(host, port),
MetricsMode::Push {
host,
port,
job,
interval,
} => self.start_push_mode(host, port, job, interval),
}
}
/// Stop metrics collection
pub fn stop(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
}
/// Start a metrics server for pull-based collection on the specified port
fn start_pull_mode(&mut self, host: String, port: u16) -> Result<()> {
// Create an axum router with a metrics endpoint // Create an axum router with a metrics endpoint
let app = Router::new().route( let app = Router::new().route(
"/metrics", "/metrics",
get(|| async { get(|| async {
// Gather and encode metrics // Gather and encode metrics
use prometheus::Encoder; let encoder = TextEncoder::new();
let encoder = prometheus::TextEncoder::new();
let mut buffer = Vec::new(); let mut buffer = Vec::new();
encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
String::from_utf8(buffer).unwrap() String::from_utf8(buffer).unwrap()
...@@ -80,17 +219,120 @@ impl PrometheusMetricsServer { ...@@ -80,17 +219,120 @@ impl PrometheusMetricsServer {
); );
// Create a socket address to listen on // Create a socket address to listen on
let addr = SocketAddr::from(([0, 0, 0, 0], port)); let ip_addr = host.parse().map_err(|e| {
error!("Failed to parse host '{}' as IP address: {}. Use a valid IPv4 or IPv6 address (e.g. '0.0.0.0' or '127.0.0.1')", host, e)
})?;
let addr = SocketAddr::new(ip_addr, port);
// Create shutdown channel
let (tx, rx) = tokio::sync::oneshot::channel();
self.shutdown_tx = Some(tx);
// Try to bind to the address first to fail early if it's not available
let server = match axum::Server::try_bind(&addr) {
Ok(server) => server,
Err(e) => {
return Err(error!(
"Failed to bind to address {}: {}. The port may be in use.",
addr, e
));
}
};
// Spawn the server in a background task // Spawn the server in a background task
tokio::spawn(async move { tokio::spawn(async move {
axum::Server::bind(&addr) let server = server.serve(app.into_make_service());
.serve(app.into_make_service())
// Create a future that completes when shutdown signal is received
let shutdown_future = async {
rx.await.ok();
};
// Run the server with graceful shutdown
tokio::select! {
result = server => {
if let Err(e) = result {
tracing::error!("Metrics server error: {}", e);
}
},
_ = shutdown_future => {
tracing::info!("Metrics server shutting down gracefully");
},
}
});
tracing::info!("Prometheus metrics server started at {addr}/metrics");
Ok(())
}
/// Start pushing metrics to a Prometheus PushGateway
fn start_push_mode(
&mut self,
host: String,
port: u16,
job: String,
interval: u64,
) -> Result<()> {
// Create shutdown channel
let (tx, mut rx) = tokio::sync::oneshot::channel();
self.shutdown_tx = Some(tx);
// Create HTTP client
let client = Client::new();
let url = format!("http://{host}:{port}/metrics/job/{job}");
let url_clone = url.clone();
let interval_duration = StdDuration::from_secs(interval);
// Spawn background task to periodically push metrics
tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
loop {
tokio::select! {
_ = interval.tick() => {
// Gather and encode metrics
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) {
tracing::error!("Failed to encode metrics: {}", e);
continue;
}
// Push metrics to the gateway
match client.post(&url)
.header("Content-Type", encoder.format_type())
.body(buffer)
.send()
.await .await
.unwrap(); {
Ok(response) => {
if response.status().is_success() {
tracing::debug!("Successfully pushed metrics to PushGateway");
} else {
tracing::error!(
"Failed to push metrics to PushGateway. Status: {}, Error: {:?}",
response.status(),
response.text().await
);
}
}
Err(e) => {
tracing::error!("Failed to push metrics to PushGateway: {}", e);
}
}
}
_ = &mut rx => {
tracing::info!("Stopping metrics push task");
break;
}
}
}
}); });
tracing::info!("Prometheus metrics server started at {addr:?}/metrics"); tracing::info!(
"Started pushing metrics to PushGateway at '{url_clone}' with job name '{job}'"
);
Ok(())
} }
/// Update metrics with current values /// Update metrics with current values
......
...@@ -41,13 +41,17 @@ use std::sync::Arc; ...@@ -41,13 +41,17 @@ use std::sync::Arc;
// Import from our library // Import from our library
use metrics::{ use metrics::{
collect_endpoints, extract_metrics, postprocess_metrics, LLMWorkerLoadCapacityConfig, collect_endpoints, extract_metrics, postprocess_metrics, LLMWorkerLoadCapacityConfig,
PrometheusMetricsServer, MetricsMode, PrometheusMetricsCollector,
}; };
/// CLI arguments for the metrics application /// CLI arguments for the metrics application
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
struct Args { struct Args {
/// Namespace to operate in and subscribe to events on
#[arg(long, env = "DYN_NAMESPACE", default_value = "dynamo")]
namespace: String,
/// Component to scrape metrics from /// Component to scrape metrics from
#[arg(long)] #[arg(long)]
component: String, component: String,
...@@ -56,13 +60,33 @@ struct Args { ...@@ -56,13 +60,33 @@ struct Args {
#[arg(long)] #[arg(long)]
endpoint: String, endpoint: String,
/// Namespace to operate in /// Polling interval in seconds for scraping dynamo endpoint stats (minimum 1 second)
#[arg(long, env = "DYN_NAMESPACE", default_value = "dynamo")]
namespace: String,
/// Polling interval in seconds (minimum 1 second)
#[arg(long, default_value = "2")] #[arg(long, default_value = "2")]
poll_interval: u64, poll_interval: u64,
/// Host for serving or pushing prometheus metrics (default: 0.0.0.0)
#[arg(
long,
default_value = "0.0.0.0",
help_heading = "Prometheus Metrics Config"
)]
host: String,
/// Port to run the Prometheus metrics server on (default: 9091)
#[arg(
long,
default_value = "9091",
help_heading = "Prometheus Metrics Config"
)]
port: u16,
/// Push metrics to an external Prometheus Pushgateway instead of hosting them in-process
#[arg(long, help_heading = "Prometheus Metrics Config")]
push: bool,
/// Push interval in seconds, when using push mode (minimum 1 second, default: 2)
#[arg(long, default_value = "2", help_heading = "Prometheus Metrics Config")]
push_interval: u64,
} }
fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> { fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> {
...@@ -78,6 +102,10 @@ fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> { ...@@ -78,6 +102,10 @@ fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> {
return Err(error!("Polling interval must be at least 1 second")); return Err(error!("Polling interval must be at least 1 second"));
} }
if args.push && args.push_interval < 1 {
return Err(error!("Push interval must be at least 1 second"));
}
Ok(LLMWorkerLoadCapacityConfig { Ok(LLMWorkerLoadCapacityConfig {
component_name: args.component.clone(), component_name: args.component.clone(),
endpoint_name: args.endpoint.clone(), endpoint_name: args.endpoint.clone(),
...@@ -116,24 +144,35 @@ async fn app(runtime: Runtime) -> Result<()> { ...@@ -116,24 +144,35 @@ async fn app(runtime: Runtime) -> Result<()> {
let token = drt.primary_lease().child_token(); let token = drt.primary_lease().child_token();
let event_name = format!("l2c.{}.{}", config.component_name, config.endpoint_name); let event_name = format!("l2c.{}.{}", config.component_name, config.endpoint_name);
// TODO: Make metrics host/port configurable // Initialize Prometheus metrics with the selected mode
// Initialize Prometheus metrics and start server let metrics_collector = PrometheusMetricsCollector::new()?;
let metrics_server = PrometheusMetricsServer::new()?; let metrics_collector = Arc::new(tokio::sync::Mutex::new(metrics_collector));
// Metrics will be updated concurrently, so protect it with a mutex:
// - Main loop: Collect and process ForwardPassMetrics at an interval from endpoint stats handlers // Start metrics collection in the selected mode
// - Subscription task: Collect and process KVHitRateEvent metrics from the KV router as they are published let metrics_mode = if args.push {
let metrics_server = Arc::new(tokio::sync::Mutex::new(metrics_server)); MetricsMode::Push {
metrics_server.lock().await.start(9091); host: args.host,
port: args.port,
job: "dynamo_push_metrics".to_string(),
interval: args.push_interval,
}
} else {
MetricsMode::Pull {
host: args.host,
port: args.port,
}
};
metrics_collector.lock().await.start(metrics_mode)?;
// Subscribe to KV hit rate events // Subscribe to KV hit rate events
let kv_hit_rate_subject = KV_HIT_RATE_SUBJECT; let kv_hit_rate_subject = KV_HIT_RATE_SUBJECT;
tracing::info!("Subscribing to KV hit rate events on subject: {kv_hit_rate_subject}"); tracing::info!("Subscribing to KV hit rate events on subject: {kv_hit_rate_subject}");
// Clone the metrics server and config for the subscription task // Clone fields for the event subscription task
let metrics_server_clone = metrics_server.clone();
let config_clone = config.clone(); let config_clone = config.clone();
// Clone the namespace for the subscription task
let namespace_clone = namespace.clone(); let namespace_clone = namespace.clone();
let metrics_collector_clone = metrics_collector.clone();
// Spawn a task to handle KV hit rate events // Spawn a task to handle KV hit rate events
tokio::spawn(async move { tokio::spawn(async move {
...@@ -156,7 +195,7 @@ async fn app(runtime: Runtime) -> Result<()> { ...@@ -156,7 +195,7 @@ async fn app(runtime: Runtime) -> Result<()> {
); );
// Update metrics with the event data // Update metrics with the event data
let mut metrics = metrics_server_clone.lock().await; let mut metrics = metrics_collector_clone.lock().await;
metrics.update_kv_hit_rate( metrics.update_kv_hit_rate(
&config_clone, &config_clone,
event.worker_id, event.worker_id,
...@@ -190,7 +229,7 @@ async fn app(runtime: Runtime) -> Result<()> { ...@@ -190,7 +229,7 @@ async fn app(runtime: Runtime) -> Result<()> {
tracing::debug!("Aggregated metrics: {processed:?}"); tracing::debug!("Aggregated metrics: {processed:?}");
// Update Prometheus metrics // Update Prometheus metrics
metrics_server.lock().await.update(&config, &processed); metrics_collector.lock().await.update(&config, &processed);
// TODO: Enable KV Routers to subscribe to metrics events published here // TODO: Enable KV Routers to subscribe to metrics events published here
// for a single view of the aggregated metrics, as opposed to the current // for a single view of the aggregated metrics, as opposed to the current
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment