Unverified Commit e5a8628f authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: add a hierarchical Prometheus MetricsRegistry trait for...


feat: add a hierarchical Prometheus MetricsRegistry trait for DistributedRuntime, Namespace, Components, and Endpoint (#2008)
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarRyan Olson <rolson@nvidia.com>
parent 20c5daf3
...@@ -49,7 +49,6 @@ hf-hub = { version = "0.4.2", default-features = false, features = ["tokio", "ru ...@@ -49,7 +49,6 @@ hf-hub = { version = "0.4.2", default-features = false, features = ["tokio", "ru
humantime = { version = "2.2.0" } humantime = { version = "2.2.0" }
libc = { version = "0.2" } libc = { version = "0.2" }
oneshot = { version = "0.1.11", features = ["std", "async"] } oneshot = { version = "0.1.11", features = ["std", "async"] }
opentelemetry = { version = "0.27" }
prometheus = { version = "0.14" } prometheus = { version = "0.14" }
rand = { version = "0.9.0" } rand = { version = "0.9.0" }
reqwest = { version = "0.12.22", default-features = false, features = ["json", "stream", "rustls-tls"] } reqwest = { version = "0.12.22", default-features = false, features = ["json", "stream", "rustls-tls"] }
......
...@@ -173,6 +173,7 @@ async fn app(runtime: Runtime) -> Result<()> { ...@@ -173,6 +173,7 @@ async fn app(runtime: Runtime) -> Result<()> {
let namespace_clone = namespace.clone(); let namespace_clone = namespace.clone();
let metrics_collector_clone = metrics_collector.clone(); let metrics_collector_clone = metrics_collector.clone();
// Note: Subscribing to KVHitRateEvent for illustration purposes. They're not used in production.
// Spawn a task to handle KV hit rate events // Spawn a task to handle KV hit rate events
tokio::spawn(async move { tokio::spawn(async move {
match namespace_clone.subscribe(kv_hit_rate_subject).await { match namespace_clone.subscribe(kv_hit_rate_subject).await {
......
...@@ -2567,6 +2567,18 @@ dependencies = [ ...@@ -2567,6 +2567,18 @@ dependencies = [
"version-compare", "version-compare",
] ]
[[package]]
name = "system_metrics"
version = "0.3.2"
dependencies = [
"dynamo-runtime",
"futures",
"prometheus",
"serde",
"serde_json",
"tokio",
]
[[package]] [[package]]
name = "target-lexicon" name = "target-lexicon"
version = "0.12.16" version = "0.12.16"
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
members = [ members = [
"hello_world", "hello_world",
"service_metrics", "service_metrics",
"system_metrics",
] ]
resolver = "3" resolver = "3"
...@@ -32,3 +33,4 @@ repository = "https://github.com/ai-dynamo/dynamo.git" ...@@ -32,3 +33,4 @@ repository = "https://github.com/ai-dynamo/dynamo.git"
[workspace.dependencies] [workspace.dependencies]
# local or crates.io # local or crates.io
dynamo-runtime = { path = "../" } dynamo-runtime = { path = "../" }
prometheus = { workspace = true }
...@@ -45,6 +45,9 @@ async fn app(runtime: Runtime) -> Result<()> { ...@@ -45,6 +45,9 @@ async fn app(runtime: Runtime) -> Result<()> {
println!("{:?}", resp); println!("{:?}", resp);
} }
// This is just an illustration to invoke the server's stats_registry(<action>), where
// the action currently increments the `service_requests_total` metric. You can validate
// the result by running `curl http://localhost:8000/metrics`
let service_set = component.scrape_stats(Duration::from_millis(100)).await?; let service_set = component.scrape_stats(Duration::from_millis(100)).await?;
println!("{:?}", service_set); println!("{:?}", service_set);
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "system_metrics"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[dependencies]
dynamo-runtime = { workspace = true }
# third-party
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
tokio = { version = "1", features = ["full"] }
prometheus = { version = "0.14" }
# System Metrics Example
Demonstrates custom metrics and monitoring in Dynamo Runtime using Prometheus.
## Overview
- Automatic hierarchical labeling: Runtime automatically adds `namespace``component``endpoint` labels
- Uses existing Prometheus implementations
- HTTP metrics endpoint automatically added
## Quick Start
### Build
```bash
cd lib/runtime/examples/system_metrics
cargo build
```
### Run Server
```bash
export DYN_LOG=1 DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8000
cargo run --bin system_server
```
### Run Client
```bash
cargo run --bin system_client
```
Note: Running the client will increment `service_requests_total`.
### View Metrics
```bash
curl http://localhost:8000/metrics
```
Example output:
```
# HELP service_request_duration_seconds Time spent processing requests
# TYPE service_request_duration_seconds histogram
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.005"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.01"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.025"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.05"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.1"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.25"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.5"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="1"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="2.5"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="5"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="10"} 2
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="+Inf"} 2
service_request_duration_seconds_sum{component="component",endpoint="endpoint",namespace="system",service="backend"} 0.000022239000000000002
service_request_duration_seconds_count{component="component",endpoint="endpoint",namespace="system",service="backend"} 2
# HELP service_requests_total Total number of requests processed
# TYPE service_requests_total counter
service_requests_total{component="component",endpoint="endpoint",namespace="system",service="backend"} 2
# HELP uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE uptime_seconds gauge
uptime_seconds{namespace="http_server"} 725.997013676
```
## Configuration
| Variable | Description | Default |
|----------|-------------|---------|
| `DYN_LOG` | Enable logging | `0` |
| `DYN_SYSTEM_ENABLED` | Enable system metrics | `false` |
| `DYN_SYSTEM_PORT` | HTTP server port | `8000` |
## Metrics
- `service_requests_total`: Request counter
- `service_request_duration_seconds`: Request duration histogram
- `uptime_seconds`: Server uptime gauge
This provides automatic context and grouping for all metrics without manual configuration.
## Troubleshooting
- **Port in use**: Change `DYN_SYSTEM_PORT`
- **Connection refused**: Ensure server is running first
- **No metrics**: Verify `DYN_SYSTEM_ENABLED=true`
\ No newline at end of file
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::StreamExt;
use system_metrics::DEFAULT_NAMESPACE;
use dynamo_runtime::{
logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
DistributedRuntime, Result, Runtime, Worker,
};
fn main() -> Result<()> {
logging::init();
let worker = Worker::from_settings()?;
worker.execute(app)
}
async fn app(runtime: Runtime) -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
let namespace = distributed.namespace(DEFAULT_NAMESPACE)?;
let component = namespace.component("component")?;
let client = component.endpoint("endpoint").client().await?;
client.wait_for_instances().await?;
let router =
PushRouter::<String, Annotated<String>>::from_client(client, Default::default()).await?;
let mut stream = router.random("hello world".to_string().into()).await?;
while let Some(resp) = stream.next().await {
println!("{:?}", resp);
}
let service_set = component.scrape_stats(Duration::from_millis(100)).await?;
println!("{:?}", service_set);
runtime.shutdown();
Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use system_metrics::{MyStats, DEFAULT_NAMESPACE};
use dynamo_runtime::{
logging,
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
},
protocols::annotated::Annotated,
stream, DistributedRuntime, Result, Runtime, Worker,
};
use prometheus::{Counter, Histogram};
use std::sync::Arc;
/// Service metrics struct using the metric classes from metrics.rs
pub struct MySystemStatsMetrics {
pub request_counter: Arc<Counter>,
pub request_duration: Arc<Histogram>,
}
impl MySystemStatsMetrics {
/// Create a new ServiceMetrics instance using the metric backend
pub fn new<R: MetricsRegistry>(
metrics_registry: Arc<R>,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let request_counter = metrics_registry.create_counter(
"service_requests_total",
"Total number of requests processed",
&[("service", "backend")],
)?;
let request_duration = metrics_registry.create_histogram(
"service_request_duration_seconds",
"Time spent processing requests",
&[("service", "backend")],
None,
)?;
Ok(Self {
request_counter,
request_duration,
})
}
}
fn main() -> Result<()> {
logging::init();
let worker = Worker::from_settings()?;
worker.execute(app)
}
async fn app(runtime: Runtime) -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
backend(distributed).await
}
struct RequestHandler {
metrics: Arc<MySystemStatsMetrics>,
}
impl RequestHandler {
fn new(metrics: Arc<MySystemStatsMetrics>) -> Arc<Self> {
Arc::new(Self { metrics })
}
}
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
let start_time = std::time::Instant::now();
// Record request start
self.metrics.request_counter.inc();
let (data, ctx) = input.into_parts();
let chars = data
.chars()
.map(|c| Annotated::from_data(c.to_string()))
.collect::<Vec<_>>();
let stream = stream::iter(chars);
// Record request duration
let duration = start_time.elapsed();
self.metrics
.request_duration
.observe(duration.as_secs_f64());
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
}
async fn backend(drt: DistributedRuntime) -> Result<()> {
let endpoint = drt
.namespace(DEFAULT_NAMESPACE)?
.component("component")?
.service_builder()
.create()
.await?
.endpoint("endpoint");
// make the ingress discoverable via a component service
// we must first create a service, then we can attach one more more endpoints
// attach an ingress to an engine, with the RequestHandler using the metrics struct
let endpoint_metrics = Arc::new(
MySystemStatsMetrics::new(Arc::new(endpoint.clone()))
.map_err(|e| Error::msg(e.to_string()))?,
);
let ingress = Ingress::for_engine(RequestHandler::new(endpoint_metrics.clone()))?;
endpoint
.endpoint_builder()
.stats_handler(|_stats| {
println!("Stats handler called with stats: {:?}", _stats);
let stats = MyStats { val: 10 };
serde_json::to_value(stats).unwrap()
})
.handler(ingress)
.start()
.await?;
Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
pub const DEFAULT_NAMESPACE: &str = "system";
#[derive(Serialize, Deserialize)]
// Dummy Stats object to demonstrate how to attach a custom stats handler
pub struct MyStats {
pub val: u32,
}
...@@ -29,7 +29,9 @@ ...@@ -29,7 +29,9 @@
//! //!
//! TODO: Top-level Overview of Endpoints/Functions //! TODO: Top-level Overview of Endpoints/Functions
use crate::{discovery::Lease, service::ServiceSet, transports::etcd::EtcdPath}; use crate::{
discovery::Lease, metrics::MetricsRegistry, service::ServiceSet, transports::etcd::EtcdPath,
};
use super::{ use super::{
error, error,
...@@ -168,6 +170,20 @@ impl RuntimeProvider for Component { ...@@ -168,6 +170,20 @@ impl RuntimeProvider for Component {
} }
} }
impl MetricsRegistry for Component {
fn basename(&self) -> String {
self.name.clone()
}
fn parent_hierarchy(&self) -> Vec<String> {
[
self.namespace.parent_hierarchy(),
vec![self.namespace.basename()],
]
.concat()
}
}
impl Component { impl Component {
/// The component part of an instance path in etcd. /// The component part of an instance path in etcd.
pub fn etcd_root(&self) -> String { pub fn etcd_root(&self) -> String {
...@@ -300,6 +316,20 @@ impl RuntimeProvider for Endpoint { ...@@ -300,6 +316,20 @@ impl RuntimeProvider for Endpoint {
} }
} }
impl MetricsRegistry for Endpoint {
fn basename(&self) -> String {
self.name.clone()
}
fn parent_hierarchy(&self) -> Vec<String> {
[
self.component.parent_hierarchy(),
vec![self.component.basename()],
]
.concat()
}
}
impl Endpoint { impl Endpoint {
pub fn id(&self) -> EndpointId { pub fn id(&self) -> EndpointId {
EndpointId { EndpointId {
......
...@@ -19,7 +19,7 @@ use futures::stream::StreamExt; ...@@ -19,7 +19,7 @@ use futures::stream::StreamExt;
use futures::{Stream, TryStreamExt}; use futures::{Stream, TryStreamExt};
use super::*; use super::*;
use crate::metrics::MetricsRegistry;
use crate::traits::events::{EventPublisher, EventSubscriber}; use crate::traits::events::{EventPublisher, EventSubscriber};
#[async_trait] #[async_trait]
...@@ -78,6 +78,16 @@ impl EventSubscriber for Namespace { ...@@ -78,6 +78,16 @@ impl EventSubscriber for Namespace {
} }
} }
impl MetricsRegistry for Namespace {
fn basename(&self) -> String {
self.name.clone()
}
fn parent_hierarchy(&self) -> Vec<String> {
vec![self.drt().basename()]
}
}
#[cfg(feature = "integration")] #[cfg(feature = "integration")]
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
......
...@@ -17,6 +17,7 @@ pub use crate::component::Component; ...@@ -17,6 +17,7 @@ pub use crate::component::Component;
use crate::{ use crate::{
component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace}, component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
discovery::DiscoveryClient, discovery::DiscoveryClient,
metrics::MetricsRegistry,
service::ServiceClient, service::ServiceClient,
transports::{etcd, nats, tcp}, transports::{etcd, nats, tcp},
ErrorContext, ErrorContext,
...@@ -30,6 +31,16 @@ use std::collections::HashMap; ...@@ -30,6 +31,16 @@ use std::collections::HashMap;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
impl MetricsRegistry for DistributedRuntime {
fn basename(&self) -> String {
"".to_string() // drt has no basename. Basename only begins with the Namespace.
}
fn parent_hierarchy(&self) -> Vec<String> {
vec![] // drt is the root, so no parent hierarchy
}
}
impl DistributedRuntime { impl DistributedRuntime {
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> { pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
let secondary = runtime.secondary(); let secondary = runtime.secondary();
...@@ -65,6 +76,16 @@ impl DistributedRuntime { ...@@ -65,6 +76,16 @@ impl DistributedRuntime {
}) })
.await??; .await??;
// Start HTTP server for health and metrics if enabled in configuration
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
// IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
// This is because after moving, runtime is no longer accessible in this scope (ownership rules).
let cancel_token = if config.system_server_enabled() {
Some(runtime.clone().child_token())
} else {
None
};
let distributed_runtime = Self { let distributed_runtime = Self {
runtime, runtime,
etcd_client, etcd_client,
...@@ -73,24 +94,27 @@ impl DistributedRuntime { ...@@ -73,24 +94,27 @@ impl DistributedRuntime {
component_registry: component::Registry::new(), component_registry: component::Registry::new(),
is_static, is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())), instance_sources: Arc::new(Mutex::new(HashMap::new())),
start_time: std::time::Instant::now(), prometheus_registries_by_prefix: Arc::new(std::sync::Mutex::new(HashMap::<
String,
prometheus::Registry,
>::new())),
}; };
// Start HTTP server for health and metrics (if enabled) // Start HTTP server if enabled
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); if let Some(cancel_token) = cancel_token {
if config.system_server_enabled() { let host = config.system_host.clone();
let drt_arc = Arc::new(distributed_runtime.clone()); let port = config.system_port;
let runtime_clone = distributed_runtime.runtime.clone();
// spawn_http_server spawns its own background task: // Start HTTP server (it spawns its own task internally)
match crate::http_server::spawn_http_server( match crate::http_server::spawn_http_server(
&config.system_host, &host,
config.system_port, port,
runtime_clone.child_token(), cancel_token,
drt_arc, Arc::new(distributed_runtime.clone()),
) )
.await .await
{ {
Ok((addr, _handle)) => { Ok((addr, _)) => {
tracing::info!("HTTP server started successfully on {}", addr); tracing::info!("HTTP server started successfully on {}", addr);
} }
Err(e) => { Err(e) => {
...@@ -191,11 +215,6 @@ impl DistributedRuntime { ...@@ -191,11 +215,6 @@ impl DistributedRuntime {
pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> { pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
self.instance_sources.clone() self.instance_sources.clone()
} }
/// Get the uptime of this DistributedRuntime in seconds
pub fn uptime(&self) -> std::time::Duration {
self.start_time.elapsed()
}
} }
#[derive(Dissolve)] #[derive(Dissolve)]
......
...@@ -13,77 +13,106 @@ ...@@ -13,77 +13,106 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use crate::metrics::MetricsRegistry;
use crate::traits::DistributedRuntimeProvider;
use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router}; use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router};
use prometheus::{
proto, register_gauge_with_registry, Encoder, Gauge, Opts, Registry, TextEncoder,
};
use std::sync::Arc; use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing; use tracing;
/// Runtime metrics for HTTP server pub struct HttpMetricsRegistry {
pub struct RuntimeMetrics { pub drt: Arc<crate::DistributedRuntime>,
uptime_gauge: Gauge,
} }
impl RuntimeMetrics { impl crate::traits::DistributedRuntimeProvider for HttpMetricsRegistry {
pub fn new(metrics_registry: &Arc<Registry>) -> anyhow::Result<Arc<Self>> { fn drt(&self) -> &crate::DistributedRuntime {
let uptime_opts = Opts::new( &self.drt
"uptime_seconds", }
"Total uptime of the DistributedRuntime in seconds", }
)
.namespace("dynamo")
.subsystem("runtime");
let uptime_gauge = register_gauge_with_registry!(uptime_opts, metrics_registry)?;
Ok(Arc::new(Self { uptime_gauge })) impl MetricsRegistry for HttpMetricsRegistry {
fn basename(&self) -> String {
"http_server".to_string()
} }
pub fn update_uptime(&self, uptime_seconds: f64) { fn parent_hierarchy(&self) -> Vec<String> {
self.uptime_gauge.set(uptime_seconds); [self.drt().parent_hierarchy(), vec![self.drt().basename()]].concat()
} }
} }
/// HTTP server state containing pre-created metrics /// HTTP server state containing metrics and uptime tracking
pub struct HttpServerState { pub struct HttpServerState {
drt: Arc<crate::DistributedRuntime>, // global drt registry is for printing out the entire Prometheus format output
registry: Arc<Registry>, root_drt: Arc<crate::DistributedRuntime>,
runtime_metrics: Arc<RuntimeMetrics>, start_time: OnceLock<Instant>,
uptime_gauge: Arc<prometheus::Gauge>,
} }
impl HttpServerState { impl HttpServerState {
/// Create new HTTP server state with pre-created metrics /// Create new HTTP server state with the provided metrics registry
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> { pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
let registry = Arc::new(Registry::new()); let http_metrics_registry = Arc::new(HttpMetricsRegistry { drt: drt.clone() });
let uptime_gauge = http_metrics_registry.as_ref().create_gauge(
"uptime_seconds",
"Total uptime of the DistributedRuntime in seconds",
&[],
)?;
let state = Self {
root_drt: drt,
start_time: OnceLock::new(),
uptime_gauge,
};
Ok(state)
}
/// Initialize the start time (can only be called once)
pub fn initialize_start_time(&self) -> Result<(), &'static str> {
self.start_time
.set(Instant::now())
.map_err(|_| "Start time already initialized")
}
pub fn uptime(&self) -> Result<std::time::Duration, &'static str> {
self.start_time
.get()
.ok_or("Start time not initialized")
.map(|start_time| start_time.elapsed())
}
// Create runtime metrics /// Get a reference to the distributed runtime
let runtime_metrics = RuntimeMetrics::new(&registry)?; pub fn drt(&self) -> &crate::DistributedRuntime {
&self.root_drt
}
Ok(Self { /// Update the uptime gauge with current value
drt, pub fn update_uptime_gauge(&self) {
registry, if let Ok(uptime) = self.uptime() {
runtime_metrics, let uptime_seconds = uptime.as_secs_f64();
}) self.uptime_gauge.set(uptime_seconds);
} else {
tracing::warn!("Failed to update uptime gauge: start time not initialized");
}
} }
} }
/// Start HTTP server with DistributedRuntime support /// Start HTTP server with metrics support
pub async fn spawn_http_server( pub async fn spawn_http_server(
host: &str, host: &str,
port: u16, port: u16,
cancel_token: CancellationToken, cancel_token: CancellationToken,
drt: Arc<crate::DistributedRuntime>, drt: Arc<crate::DistributedRuntime>,
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
tracing::info!( // Create HTTP server state with the provided metrics registry
"[spawn_http_server] called with host={}, port={}",
host,
port
);
// Create HTTP server state with pre-created metrics
let server_state = Arc::new(HttpServerState::new(drt)?); let server_state = Arc::new(HttpServerState::new(drt)?);
// Initialize the start time
server_state
.initialize_start_time()
.map_err(|e| anyhow::anyhow!("Failed to initialize start time: {}", e))?;
let app = Router::new() let app = Router::new()
.route( .route(
"/health", "/health",
...@@ -146,48 +175,57 @@ pub async fn spawn_http_server( ...@@ -146,48 +175,57 @@ pub async fn spawn_http_server(
/// Health handler /// Health handler
async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse { async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
tracing::info!("[health_handler] called"); match state.uptime() {
let uptime = state.drt.uptime(); Ok(uptime) => {
let response = format!("OK\nUptime: {} seconds\n", uptime.as_secs()); let response = format!("OK\nUptime: {} seconds\n", uptime.as_secs());
(StatusCode::OK, response) (StatusCode::OK, response)
}
Err(e) => {
tracing::error!("Failed to get uptime: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get uptime".to_string(),
)
}
}
} }
/// Metrics handler with DistributedRuntime uptime /// Metrics handler with DistributedRuntime uptime
async fn metrics_handler(state: Arc<HttpServerState>) -> impl IntoResponse { async fn metrics_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
// Update the uptime gauge with current value // Update the uptime gauge with current value
let uptime_seconds = state.drt.uptime().as_secs_f64(); state.update_uptime_gauge();
state.runtime_metrics.update_uptime(uptime_seconds);
// Gather metrics from the registry // Get metrics from the registry
let metric_families = state.registry.gather(); match state.drt().prometheus_metrics_fmt() {
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
match encoder.encode(&metric_families, &mut buffer) {
Ok(()) => match String::from_utf8(buffer) {
Ok(response) => (StatusCode::OK, response), Ok(response) => (StatusCode::OK, response),
Err(e) => { Err(e) => {
tracing::error!("Failed to encode metrics as UTF-8: {}", e); tracing::error!("Failed to get metrics from registry: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to encode metrics as UTF-8".to_string(),
)
}
},
Err(e) => {
tracing::error!("Failed to encode metrics: {}", e);
( (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
"Failed to encode metrics".to_string(), "Failed to get metrics".to_string(),
) )
} }
} }
} }
// Regular tests: cargo test http_server --lib
// Integration tests: cargo test http_server --lib --features integration
#[cfg(test)]
/// Helper function to create a DRT instance for async testing
/// Uses the test-friendly constructor without discovery
async fn create_test_drt_async() -> crate::DistributedRuntime {
let rt = crate::Runtime::from_current().unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt)
.await
.unwrap()
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::metrics::MetricsRegistry;
use std::sync::Arc;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
#[tokio::test] #[tokio::test]
...@@ -220,66 +258,68 @@ mod tests { ...@@ -220,66 +258,68 @@ mod tests {
); );
} }
#[cfg(feature = "integration")]
#[tokio::test] #[tokio::test]
async fn test_runtime_metrics_creation() { async fn test_runtime_metrics_initialization_and_namespace() {
// Test RuntimeMetrics creation and functionality // Test that metrics have correct namespace
let registry = Arc::new(Registry::new()); let drt = create_test_drt_async().await;
let runtime_metrics = RuntimeMetrics::new(&registry).unwrap(); let runtime_metrics = HttpServerState::new(Arc::new(drt)).unwrap();
// Wait a bit to ensure uptime is measurable
tokio::time::sleep(Duration::from_millis(10)).await;
// Test updating uptime // Initialize start time
let uptime_seconds = 123.456; runtime_metrics.initialize_start_time().unwrap();
runtime_metrics.update_uptime(uptime_seconds);
// Gather metrics from the registry runtime_metrics.uptime_gauge.set(42.0);
let metric_families = registry.gather();
let encoder = TextEncoder::new(); let response = runtime_metrics.drt().prometheus_metrics_fmt().unwrap();
let mut buffer = Vec::new(); println!("Full metrics response:\n{}", response);
encoder.encode(&metric_families, &mut buffer).unwrap();
let response = String::from_utf8(buffer).unwrap(); let expected = "\
assert!(response.contains("dynamo_runtime_uptime_seconds")); # HELP uptime_seconds Total uptime of the DistributedRuntime in seconds
assert!(response.contains("123.456")); # TYPE uptime_seconds gauge
uptime_seconds{namespace=\"http_server\"} 42
";
assert_eq!(response, expected);
} }
#[cfg(feature = "integration")]
#[tokio::test] #[tokio::test]
async fn test_runtime_metrics_namespace() { async fn test_start_time_initialization() {
// Test that metrics have correct namespace // Test that start time can only be initialized once
let registry = Arc::new(Registry::new()); let drt = create_test_drt_async().await;
let runtime_metrics = RuntimeMetrics::new(&registry).unwrap(); let runtime_metrics = HttpServerState::new(Arc::new(drt)).unwrap();
runtime_metrics.update_uptime(42.0); // First initialization should succeed
assert!(runtime_metrics.initialize_start_time().is_ok());
let metric_families = registry.gather(); // Second initialization should fail
let encoder = TextEncoder::new(); assert!(runtime_metrics.initialize_start_time().is_err());
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
let response = String::from_utf8(buffer).unwrap(); // Uptime should work after initialization
// Check for the full metric name with namespace and subsystem let _uptime = runtime_metrics.uptime().unwrap();
assert!(response.contains("dynamo_runtime_uptime_seconds")); // If we get here, uptime calculation works correctly
assert!(response.contains("Total uptime of the DistributedRuntime in seconds"));
} }
/* #[cfg(feature = "integration")]
#[tokio::test]
async fn test_uptime_without_initialization() {
// Test that uptime returns an error if start time is not initialized
let drt = create_test_drt_async().await;
let runtime_metrics = HttpServerState::new(Arc::new(drt)).unwrap();
// This should return an error because start time is not initialized
let result = runtime_metrics.uptime();
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Start time not initialized");
}
#[cfg(feature = "integration")]
#[tokio::test] #[tokio::test]
async fn test_spawn_http_server_endpoints() { async fn test_spawn_http_server_endpoints() {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use reqwest for HTTP requests // use reqwest for HTTP requests
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap(),
);
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let (addr, server_handle) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt) let drt = create_test_drt_async().await;
let (addr, server_handle) =
spawn_http_server("127.0.0.1", 0, cancel_token.clone(), Arc::new(drt))
.await .await
.unwrap(); .unwrap();
println!("[test] Waiting for server to start..."); println!("[test] Waiting for server to start...");
...@@ -324,5 +364,36 @@ mod tests { ...@@ -324,5 +364,36 @@ mod tests {
} }
} }
} }
*/
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_http_server_basic_functionality() {
// Test basic HTTP server functionality without requiring etcd
let cancel_token = CancellationToken::new();
let cancel_token_for_server = cancel_token.clone();
// Test basic HTTP server lifecycle
let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));
// start HTTP server
let server_handle = tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let _ = axum::serve(listener, app)
.with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
.await;
});
// wait for a while to let the server start
sleep(Duration::from_millis(100)).await;
// cancel token
cancel_token.cancel();
// wait for the server to shut down
let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
assert!(
result.is_ok(),
"HTTP server should shut down when cancel token is cancelled"
);
}
} }
...@@ -38,6 +38,7 @@ pub mod discovery; ...@@ -38,6 +38,7 @@ pub mod discovery;
pub mod engine; pub mod engine;
pub mod http_server; pub mod http_server;
pub mod logging; pub mod logging;
pub mod metrics;
pub mod pipeline; pub mod pipeline;
pub mod prelude; pub mod prelude;
pub mod protocols; pub mod protocols;
...@@ -99,6 +100,6 @@ pub struct DistributedRuntime { ...@@ -99,6 +100,6 @@ pub struct DistributedRuntime {
instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>, instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
// Start time for tracking uptime // This map associates metric prefixes with their corresponding Prometheus registries.
start_time: std::time::Instant, prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
} }
This diff is collapsed.
...@@ -31,3 +31,14 @@ impl RuntimeProvider for DistributedRuntime { ...@@ -31,3 +31,14 @@ impl RuntimeProvider for DistributedRuntime {
&self.runtime &self.runtime
} }
} }
// This implementation is required because:
// 1. MetricsRegistry has a supertrait bound: `MetricsRegistry: Send + Sync + DistributedRuntimeProvider`
// 2. DistributedRuntime implements MetricsRegistry (in distributed.rs)
// 3. Therefore, DistributedRuntime must implement DistributedRuntimeProvider to satisfy the trait bound
// 4. This enables DistributedRuntime to serve as both a provider (of itself) and a metrics registry
impl DistributedRuntimeProvider for DistributedRuntime {
fn drt(&self) -> &DistributedRuntime {
self
}
}
...@@ -616,7 +616,7 @@ mod tests { ...@@ -616,7 +616,7 @@ mod tests {
fn test_ectd_client() { fn test_ectd_client() {
let rt = Runtime::from_settings().unwrap(); let rt = Runtime::from_settings().unwrap();
let rt_clone = rt.clone(); let rt_clone = rt.clone();
let config = DistributedConfig::from_settings(); let config = DistributedConfig::from_settings(false);
rt_clone.primary().block_on(async move { rt_clone.primary().block_on(async move {
let drt = DistributedRuntime::new(rt, config).await.unwrap(); let drt = DistributedRuntime::new(rt, config).await.unwrap();
...@@ -628,8 +628,11 @@ mod tests { ...@@ -628,8 +628,11 @@ mod tests {
let key = "__integration_test_key"; let key = "__integration_test_key";
let value = b"test_value"; let value = b"test_value";
let client = drt.etcd_client(); let client = drt.etcd_client().expect("etcd client should be available");
let lease_id = drt.primary_lease().id(); let lease_id = drt
.primary_lease()
.expect("primary lease should be available")
.id();
// Create the key // Create the key
let result = client let result = client
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment