Unverified Commit 17dcffe8 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore(runtime): Make nats_client private, refactor NATS stats scraping (#4591)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent c90e3dff
...@@ -269,131 +269,6 @@ impl Component { ...@@ -269,131 +269,6 @@ impl Component {
instances.sort(); instances.sort();
Ok(instances) Ok(instances)
} }
/// Scrape ServiceSet, which contains NATS stats as well as user defined stats
/// embedded in data field of ServiceInfo.
async fn scrape_stats(&self, timeout: Duration) -> anyhow::Result<ServiceSet> {
// Debug: scraping stats for component
let service_name = self.service_name();
let Some(service_client) = self
.drt()
.nats_client()
.map(|nc| ServiceClient::new(nc.clone()))
else {
anyhow::bail!("ServiceSet is gathered via NATS, do not call this in non-NATS setups.");
};
service_client
.collect_services(&service_name, timeout)
.await
}
/// Add Prometheus metrics for this component's NATS service stats.
///
/// Starts a background task that periodically requests service statistics from NATS
/// and updates the corresponding Prometheus metrics. The first scrape happens immediately,
/// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS),
/// which should be near or smaller than typical Prometheus scraping intervals to ensure
/// metrics are fresh when Prometheus collects them.
fn start_scraping_nats_service_component_metrics(&self) -> anyhow::Result<()> {
const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); // Should be <= Prometheus scrape interval
// If there is another component with the same service name, this will fail.
let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
let component_clone = self.clone();
// Start a background task that scrapes stats every 5 seconds
let m = component_metrics.clone();
let c = component_clone.clone();
// Use the DRT's runtime handle to spawn the background task.
// We cannot use regular `tokio::spawn` here because:
// 1. This method may be called from contexts without an active Tokio runtime
// (e.g., tests that create a DRT in a blocking context)
// 2. Tests often create a temporary runtime just to build the DRT, then drop it
// 3. `tokio::spawn` requires being called from within a runtime context
// By using the DRT's own runtime handle, we ensure the task runs in the
// correct runtime that will persist for the lifetime of the component.
c.drt().runtime().secondary().spawn(async move {
let timeout = std::time::Duration::from_millis(500);
let mut interval = tokio::time::interval(MAX_WAIT_MS);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
match c.scrape_stats(timeout).await {
Ok(service_set) => {
m.update_from_service_set(&service_set);
}
Err(err) => {
tracing::error!(
"Background scrape failed for {}: {}",
c.service_name(),
err
);
m.reset_to_zeros();
}
}
interval.tick().await;
}
});
Ok(())
}
// Gather NATS metrics
async fn add_stats_service(&mut self) -> anyhow::Result<()> {
let service_name = self.service_name();
// Pre-check to save cost of creating the service, but don't hold the lock
if self
.drt
.component_registry()
.inner
.lock()
.await
.services
.contains_key(&service_name)
{
// The NATS service is per component, but it is called from `serve_endpoint`, and there
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
tracing::trace!("Service {service_name} already exists");
return Ok(());
}
let Some(nats_client) = self.drt.nats_client() else {
anyhow::bail!("Cannot create NATS service without NATS.");
};
let description = None;
let (nats_service, stats_reg) =
service::build_nats_service(nats_client, self, description).await?;
let mut guard = self.drt.component_registry().inner.lock().await;
if !guard.services.contains_key(&service_name) {
// Normal case
guard.services.insert(service_name.clone(), nats_service);
guard.stats_handlers.insert(service_name.clone(), stats_reg);
tracing::info!("Added NATS / stats service {service_name}");
drop(guard);
} else {
drop(guard);
let _ = nats_service.stop().await;
// The NATS service is per component, but it is called from `serve_endpoint`, and there
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
return Ok(());
}
if let Err(err) = self.start_scraping_nats_service_component_metrics() {
tracing::debug!(
"Metrics registration failed for '{}': {}",
self.service_name(),
err
);
}
Ok(())
}
} }
impl ComponentBuilder { impl ComponentBuilder {
...@@ -404,14 +279,9 @@ impl ComponentBuilder { ...@@ -404,14 +279,9 @@ impl ComponentBuilder {
pub fn build(self) -> Result<Component, anyhow::Error> { pub fn build(self) -> Result<Component, anyhow::Error> {
let component = self.build_internal()?; let component = self.build_internal()?;
// If this component is using NATS, gather it's metrics // If this component is using NATS, gather it's metrics
if component.drt().request_plane().is_nats() { let drt = component.drt();
let mut c = component.clone(); if drt.request_plane().is_nats() {
// Start in the background to isolate the async, and because we don't need it yet drt.start_stats_service(component.clone());
component.drt().runtime().secondary().spawn(async move {
if let Err(err) = c.add_stats_service().await {
tracing::error!(error = %err, component = c.service_name(), "Failed starting stats service");
}
});
} }
Ok(component) Ok(component)
} }
......
...@@ -32,10 +32,9 @@ impl EventPublisher for Component { ...@@ -32,10 +32,9 @@ impl EventPublisher for Component {
bytes: Vec<u8>, bytes: Vec<u8>,
) -> Result<()> { ) -> Result<()> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref()); let subject = format!("{}.{}", self.subject(), event_name.as_ref());
let Some(nats_client) = self.drt().nats_client() else { self.drt()
anyhow::bail!("KV router's EventPublisher requires NATS"); .kv_router_nats_publish(subject, bytes.into())
}; .await?;
nats_client.client().publish(subject, bytes.into()).await?;
Ok(()) Ok(())
} }
} }
...@@ -47,10 +46,7 @@ impl EventSubscriber for Component { ...@@ -47,10 +46,7 @@ impl EventSubscriber for Component {
event_name: impl AsRef<str> + Send + Sync, event_name: impl AsRef<str> + Send + Sync,
) -> Result<async_nats::Subscriber> { ) -> Result<async_nats::Subscriber> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref()); let subject = format!("{}.{}", self.subject(), event_name.as_ref());
let Some(nats_client) = self.drt().nats_client() else { Ok(self.drt().kv_router_nats_subscribe(subject).await?)
anyhow::bail!("KV router's EventSubscriber requires NATS");
};
Ok(nats_client.client().subscribe(subject).await?)
} }
async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>( async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>(
......
...@@ -35,10 +35,9 @@ impl EventPublisher for Namespace { ...@@ -35,10 +35,9 @@ impl EventPublisher for Namespace {
bytes: Vec<u8>, bytes: Vec<u8>,
) -> Result<()> { ) -> Result<()> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref()); let subject = format!("{}.{}", self.subject(), event_name.as_ref());
let Some(nats_client) = self.drt().nats_client() else { self.drt()
anyhow::bail!("KV router's Namespace EventPublisher requires NATS"); .kv_router_nats_publish(subject, bytes.into())
}; .await?;
nats_client.client().publish(subject, bytes.into()).await?;
Ok(()) Ok(())
} }
} }
...@@ -50,10 +49,7 @@ impl EventSubscriber for Namespace { ...@@ -50,10 +49,7 @@ impl EventSubscriber for Namespace {
event_name: impl AsRef<str> + Send + Sync, event_name: impl AsRef<str> + Send + Sync,
) -> Result<async_nats::Subscriber> { ) -> Result<async_nats::Subscriber> {
let subject = format!("{}.{}", self.subject(), event_name.as_ref()); let subject = format!("{}.{}", self.subject(), event_name.as_ref());
let Some(nats_client) = self.drt().nats_client() else { Ok(self.drt().kv_router_nats_subscribe(subject).await?)
anyhow::bail!("KV router's Namespace EventSubscriber requires NATS");
};
Ok(nats_client.client().subscribe(subject).await?)
} }
async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>( async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>(
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
use crate::component::{Component, Instance}; use crate::component::{Component, Instance};
use crate::pipeline::PipelineError; use crate::pipeline::PipelineError;
use crate::pipeline::network::manager::NetworkManager;
use crate::service::{ComponentNatsServerPrometheusMetrics, ServiceClient, ServiceSet};
use crate::storage::key_value_store::{ use crate::storage::key_value_store::{
EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect, EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
MemoryStore, MemoryStore,
...@@ -21,9 +23,12 @@ use super::utils::GracefulShutdownTracker; ...@@ -21,9 +23,12 @@ use super::utils::GracefulShutdownTracker;
use crate::SystemHealth; use crate::SystemHealth;
use crate::runtime::Runtime; use crate::runtime::Runtime;
// Used instead of std::cell::OnceCell because get_or_try_init there is nightly
use async_once_cell::OnceCell; use async_once_cell::OnceCell;
use std::fmt; use std::fmt;
use std::sync::{Arc, OnceLock, Weak}; use std::sync::{Arc, OnceLock, Weak};
use std::time::Duration;
use tokio::sync::watch::Receiver; use tokio::sync::watch::Receiver;
use anyhow::Result; use anyhow::Result;
...@@ -44,8 +49,8 @@ pub struct DistributedRuntime { ...@@ -44,8 +49,8 @@ pub struct DistributedRuntime {
nats_client: Option<transports::nats::Client>, nats_client: Option<transports::nats::Client>,
store: KeyValueStoreManager, store: KeyValueStoreManager,
network_manager: Arc<NetworkManager>,
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>, tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
network_manager: Arc<OnceCell<Arc<crate::pipeline::network::manager::NetworkManager>>>,
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>, system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
request_plane: RequestPlaneMode, request_plane: RequestPlaneMode,
...@@ -168,18 +173,27 @@ impl DistributedRuntime { ...@@ -168,18 +173,27 @@ impl DistributedRuntime {
} }
}; };
let component_registry = component::Registry::new();
let nats_client_for_metrics = nats_client.clone(); let nats_client_for_metrics = nats_client.clone();
// NetworkManager for request plane
let network_manager = NetworkManager::new(
runtime.child_token(),
nats_client.clone().map(|c| c.client().clone()),
component_registry.clone(),
request_plane,
);
let distributed_runtime = Self { let distributed_runtime = Self {
runtime, runtime,
store, store,
network_manager: Arc::new(network_manager),
nats_client, nats_client,
tcp_server: Arc::new(OnceCell::new()), tcp_server: Arc::new(OnceCell::new()),
network_manager: Arc::new(OnceCell::new()),
system_status_server: Arc::new(OnceLock::new()), system_status_server: Arc::new(OnceLock::new()),
discovery_client, discovery_client,
discovery_metadata, discovery_metadata,
component_registry: component::Registry::new(), component_registry,
instance_sources: Arc::new(Mutex::new(HashMap::new())), instance_sources: Arc::new(Mutex::new(HashMap::new())),
metrics_registry: crate::MetricsRegistry::new(), metrics_registry: crate::MetricsRegistry::new(),
system_health, system_health,
...@@ -336,32 +350,12 @@ impl DistributedRuntime { ...@@ -336,32 +350,12 @@ impl DistributedRuntime {
.clone()) .clone())
} }
/// Get the network manager (lazy initialization) /// Get the network manager
/// ///
/// The network manager consolidates all network configuration and provides /// The network manager consolidates all network configuration and provides
/// unified access to request plane servers and clients. /// unified access to request plane servers and clients.
pub async fn network_manager( pub fn network_manager(&self) -> Arc<NetworkManager> {
&self, self.network_manager.clone()
) -> Result<Arc<crate::pipeline::network::manager::NetworkManager>> {
use crate::pipeline::network::manager::NetworkManager;
let manager = self
.network_manager
.get_or_try_init(async {
// Get NATS client if available
let nats_client = self.nats_client().map(|c| c.client().clone());
// NetworkManager handles all config reading and mode selection
anyhow::Ok(NetworkManager::new(
self.child_token(),
nats_client,
self.component_registry.clone(),
self.request_plane,
))
})
.await?;
Ok(manager.clone())
} }
/// Get the request plane server (convenience method) /// Get the request plane server (convenience method)
...@@ -371,12 +365,7 @@ impl DistributedRuntime { ...@@ -371,12 +365,7 @@ impl DistributedRuntime {
&self, &self,
) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>> ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
{ {
let manager = self.network_manager().await?; self.network_manager().server().await
manager.server().await
}
pub(crate) fn nats_client(&self) -> Option<&nats::Client> {
self.nats_client.as_ref()
} }
/// Get system status server information if available /// Get system status server information if available
...@@ -408,6 +397,151 @@ impl DistributedRuntime { ...@@ -408,6 +397,151 @@ impl DistributedRuntime {
pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> { pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
self.instance_sources.clone() self.instance_sources.clone()
} }
/// TODO: This is a temporary KV router measure for component/component.rs EventPublisher impl for
/// Component, to allow it to publish to NATS. KV Router is the only user.
pub(crate) async fn kv_router_nats_publish(
&self,
subject: String,
payload: bytes::Bytes,
) -> anyhow::Result<()> {
let Some(nats_client) = self.nats_client.as_ref() else {
anyhow::bail!("KV router's EventPublisher requires NATS");
};
Ok(nats_client.client().publish(subject, payload).await?)
}
/// TODO: This is a temporary KV router measure for component/component.rs EventSubscriber impl for
/// Component, to allow it to subscribe to NATS. KV Router is the only user.
pub(crate) async fn kv_router_nats_subscribe(
&self,
subject: String,
) -> Result<async_nats::Subscriber> {
let Some(nats_client) = self.nats_client.as_ref() else {
anyhow::bail!("KV router's EventSubscriber requires NATS");
};
Ok(nats_client.client().subscribe(subject).await?)
}
/// Start NATS metrics service in the background to isolate the async,
/// and because we don't need it yet.
/// TODO: This and the things it calls should be in a nats module somewhere.
pub fn start_stats_service(&self, component: Component) {
let drt = self.clone();
self.runtime().secondary().spawn(async move {
let service_name = component.service_name();
if let Err(err) = drt.add_stats_service(component).await {
tracing::error!(error = %err, component = service_name, "Failed starting stats service");
}
});
}
/// Gather NATS metrics
async fn add_stats_service(&self, component: Component) -> anyhow::Result<()> {
let service_name = component.service_name();
// Pre-check to save cost of creating the service, but don't hold the lock
if self
.component_registry()
.inner
.lock()
.await
.services
.contains_key(&service_name)
{
// The NATS service is per component, but it is called from `serve_endpoint`, and there
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
tracing::trace!("Service {service_name} already exists");
return Ok(());
}
let Some(nats_client) = self.nats_client.as_ref() else {
anyhow::bail!("Cannot create NATS service without NATS.");
};
let description = None;
let (nats_service, stats_reg) =
crate::component::service::build_nats_service(nats_client, &component, description)
.await?;
let mut guard = self.component_registry().inner.lock().await;
if !guard.services.contains_key(&service_name) {
// Normal case
guard.services.insert(service_name.clone(), nats_service);
guard.stats_handlers.insert(service_name.clone(), stats_reg);
tracing::info!("Added NATS / stats service {service_name}");
drop(guard);
} else {
drop(guard);
let _ = nats_service.stop().await;
// The NATS service is per component, but it is called from `serve_endpoint`, and there
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
// TODO: Is this still true?
return Ok(());
}
let cancel_token = self.primary_token();
let service_client = self
.nats_client
.as_ref()
.map(|nc| ServiceClient::new(nc.clone()))
.ok_or_else(|| {
anyhow::anyhow!("Stats service requires NATS client to collect service metrics.")
})?;
// If there is another component with the same service name, this will fail.
let component_metrics = ComponentNatsServerPrometheusMetrics::new(&component)?;
self.runtime().secondary().spawn(nats_metrics_worker(
cancel_token,
service_client,
component_metrics,
component,
));
Ok(())
}
}
/// Add Prometheus metrics for this component's NATS service stats.
///
/// Starts a background task that periodically requests service statistics from NATS
/// and updates the corresponding Prometheus metrics. The first scrape happens immediately,
/// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS),
/// which should be near or smaller than typical Prometheus scraping intervals to ensure
/// metrics are fresh when Prometheus collects them.
async fn nats_metrics_worker(
cancel_token: CancellationToken,
service_client: ServiceClient,
component_metrics: ComponentNatsServerPrometheusMetrics,
component: Component,
) {
const MAX_WAIT_MS: Duration = Duration::from_millis(9800); // Should be <= Prometheus scrape interval
let timeout = Duration::from_millis(500);
let mut interval = tokio::time::interval(MAX_WAIT_MS);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let service_name = component.service_name();
loop {
tokio::select! {
result = service_client.collect_services(&service_name, timeout) => {
match result {
Ok(service_set) => {
component_metrics.update_from_service_set(&service_set);
}
Err(err) => {
tracing::error!("Background scrape failed for {service_name}: {err}",);
component_metrics.reset_to_zeros();
}
}
}
_ = cancel_token.cancelled() => {
tracing::trace!("nats_metrics_worker stopped");
break;
}
}
interval.tick().await;
}
} }
#[derive(Dissolve)] #[derive(Dissolve)]
......
...@@ -90,7 +90,7 @@ impl RouterMode { ...@@ -90,7 +90,7 @@ impl RouterMode {
async fn addressed_router(endpoint: &Endpoint) -> anyhow::Result<Arc<AddressedPushRouter>> { async fn addressed_router(endpoint: &Endpoint) -> anyhow::Result<Arc<AddressedPushRouter>> {
// Get network manager and create client (no mode checks!) // Get network manager and create client (no mode checks!)
let manager = endpoint.drt().network_manager().await?; let manager = endpoint.drt().network_manager();
let req_client = manager.create_client()?; let req_client = manager.create_client()?;
let resp_transport = endpoint.drt().tcp_server().await?; let resp_transport = endpoint.drt().tcp_server().await?;
......
...@@ -137,7 +137,7 @@ impl NetworkManager { ...@@ -137,7 +137,7 @@ impl NetworkManager {
nats_client: Option<async_nats::Client>, nats_client: Option<async_nats::Client>,
component_registry: crate::component::Registry, component_registry: crate::component::Registry,
mode: RequestPlaneMode, mode: RequestPlaneMode,
) -> Arc<Self> { ) -> Self {
let config = NetworkConfig::from_env(nats_client); let config = NetworkConfig::from_env(nats_client);
tracing::info!( tracing::info!(
...@@ -147,13 +147,13 @@ impl NetworkManager { ...@@ -147,13 +147,13 @@ impl NetworkManager {
"Initializing NetworkManager" "Initializing NetworkManager"
); );
Arc::new(Self { Self {
mode, mode,
config, config,
server: Arc::new(OnceCell::new()), server: Arc::new(OnceCell::new()),
cancellation_token, cancellation_token,
component_registry, component_registry,
}) }
} }
/// Get or create the request plane server /// Get or create the request plane server
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment