Unverified Commit f6ed01b1 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore: Replace ServiceConfigBuilder with add_stats_service (#3736)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent a7fed329
...@@ -664,10 +664,11 @@ impl Component { ...@@ -664,10 +664,11 @@ impl Component {
}) })
} }
/// NATS specific stats/metrics call
fn create_service<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> { fn create_service<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let builder = self.inner.service_builder(); let mut inner = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
let _ = builder.create().await.map_err(to_pyerr)?; inner.add_stats_service().await.map_err(to_pyerr)?;
Ok(()) Ok(())
}) })
} }
......
...@@ -39,16 +39,17 @@ pub struct Controller<Locality: LocalityProvider, Metadata: BlockMetadata> { ...@@ -39,16 +39,17 @@ pub struct Controller<Locality: LocalityProvider, Metadata: BlockMetadata> {
impl<Locality: LocalityProvider, Metadata: BlockMetadata> Controller<Locality, Metadata> { impl<Locality: LocalityProvider, Metadata: BlockMetadata> Controller<Locality, Metadata> {
pub async fn new( pub async fn new(
block_manager: KvBlockManager<Locality, Metadata>, block_manager: KvBlockManager<Locality, Metadata>,
component: dynamo_runtime::component::Component, mut component: dynamo_runtime::component::Component,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let service = component.service_builder().create().await?; component.add_stats_service().await?;
let handler = ControllerHandler::new(block_manager.clone()); let handler = ControllerHandler::new(block_manager.clone());
let engine = Ingress::for_engine(handler.clone())?; let engine = Ingress::for_engine(handler.clone())?;
let component_clone = component.clone();
let reset_task = CriticalTaskExecutionHandle::new( let reset_task = CriticalTaskExecutionHandle::new(
|_cancel_token| async move { |_cancel_token| async move {
service component_clone
.endpoint("controller") .endpoint("controller")
.endpoint_builder() .endpoint_builder()
.handler(engine) .handler(engine)
......
...@@ -38,8 +38,7 @@ pub async fn run( ...@@ -38,8 +38,7 @@ pub async fn run(
// We can only make the NATS service if we have NATS // We can only make the NATS service if we have NATS
if distributed_runtime.nats_client().is_some() { if distributed_runtime.nats_client().is_some() {
// TODO fix in next PR, ServiceConfigBuilder is silly component.add_stats_service().await?;
component = component.service_builder().create().await?;
} }
let endpoint = component.endpoint(&endpoint_id.name); let endpoint = component.endpoint(&endpoint_id.name);
......
...@@ -939,11 +939,8 @@ mod tests { ...@@ -939,11 +939,8 @@ mod tests {
// Create namespace and shared component for both seq_managers // Create namespace and shared component for both seq_managers
let namespace = distributed.namespace("test_cross_instance_sync")?; let namespace = distributed.namespace("test_cross_instance_sync")?;
let component = namespace let mut component = namespace.component("sequences")?;
.component("sequences")? component.add_stats_service().await?;
.service_builder()
.create()
.await?;
// Create multi-worker sequence managers with: // Create multi-worker sequence managers with:
// - Worker 0 with dp_size=2 (dp_ranks 0 and 1) // - Worker 0 with dp_size=2 (dp_ranks 0 and 1)
...@@ -1108,11 +1105,8 @@ mod tests { ...@@ -1108,11 +1105,8 @@ mod tests {
// Create namespace and shared component for both seq_managers // Create namespace and shared component for both seq_managers
let namespace = distributed.namespace("test_no_token_seq_sync")?; let namespace = distributed.namespace("test_no_token_seq_sync")?;
let component = namespace let mut component = namespace.component("sequences")?;
.component("sequences")? component.add_stats_service().await?;
.service_builder()
.create()
.await?;
// Create multi-worker sequence managers with ALL workers [0, 1, 2] // Create multi-worker sequence managers with ALL workers [0, 1, 2]
// Both use the same component to ensure event synchronization works // Both use the same component to ensure event synchronization works
......
...@@ -539,12 +539,8 @@ mod integration_tests { ...@@ -539,12 +539,8 @@ mod integration_tests {
tracing::info!("✓ Runtime and distributed runtime created"); tracing::info!("✓ Runtime and distributed runtime created");
// Create component for MockVllmEngine (needed for publishers) // Create component for MockVllmEngine (needed for publishers)
let test_component = distributed let mut test_component = distributed.namespace("test")?.component(MOCKER_COMPONENT)?;
.namespace("test")? test_component.add_stats_service().await?;
.component(MOCKER_COMPONENT)?
.service_builder()
.create()
.await?;
tracing::info!("✓ Test component created"); tracing::info!("✓ Test component created");
// Create MockVllmEngine WITH component (enables publishers) // Create MockVllmEngine WITH component (enables publishers)
......
...@@ -54,12 +54,9 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> { ...@@ -54,12 +54,9 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
// // make the ingress discoverable via a component service // // make the ingress discoverable via a component service
// // we must first create a service, then we can attach one more more endpoints // // we must first create a service, then we can attach one more more endpoints
runtime let mut component = runtime.namespace(DEFAULT_NAMESPACE)?.component("backend")?;
.namespace(DEFAULT_NAMESPACE)? component.add_stats_service().await?;
.component("backend")? component
.service_builder()
.create()
.await?
.endpoint("generate") .endpoint("generate")
.endpoint_builder() .endpoint_builder()
.handler(ingress) .handler(ingress)
......
...@@ -56,12 +56,9 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> { ...@@ -56,12 +56,9 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
// make the ingress discoverable via a component service // make the ingress discoverable via a component service
// we must first create a service, then we can attach one more more endpoints // we must first create a service, then we can attach one more more endpoints
runtime let mut component = runtime.namespace(DEFAULT_NAMESPACE)?.component("backend")?;
.namespace(DEFAULT_NAMESPACE)? component.add_stats_service().await?;
.component("backend")? component
.service_builder()
.create()
.await?
.endpoint("generate") .endpoint("generate")
.endpoint_builder() .endpoint_builder()
.stats_handler(|stats| { .stats_handler(|stats| {
......
...@@ -88,13 +88,11 @@ impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for Reques ...@@ -88,13 +88,11 @@ impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for Reques
pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Result<()> { pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Result<()> {
let endpoint_name = endpoint_name.unwrap_or(DEFAULT_ENDPOINT); let endpoint_name = endpoint_name.unwrap_or(DEFAULT_ENDPOINT);
let endpoint = drt let mut component = drt
.namespace(DEFAULT_NAMESPACE)? .namespace(DEFAULT_NAMESPACE)?
.component(DEFAULT_COMPONENT)? .component(DEFAULT_COMPONENT)?;
.service_builder() component.add_stats_service().await?;
.create() let endpoint = component.endpoint(endpoint_name);
.await?
.endpoint(endpoint_name);
// Create custom metrics for system stats // Create custom metrics for system stats
let system_metrics = let system_metrics =
......
...@@ -371,8 +371,48 @@ impl Component { ...@@ -371,8 +371,48 @@ impl Component {
unimplemented!("collect_stats") unimplemented!("collect_stats")
} }
pub fn service_builder(&self) -> service::ServiceConfigBuilder { pub async fn add_stats_service(&mut self) -> anyhow::Result<()> {
service::ServiceConfigBuilder::from_component(self.clone()) let service_name = self.service_name();
// Pre-check to save cost of creating the service, but don't hold the lock
if self
.drt
.component_registry
.inner
.lock()
.await
.services
.contains_key(&service_name)
{
anyhow::bail!("Service {service_name} already exists");
}
let Some(nats_client) = self.drt.nats_client() else {
anyhow::bail!("Cannot create NATS service without NATS.");
};
let description = None;
let (nats_service, stats_reg) =
service::build_nats_service(nats_client, self, description).await?;
let mut guard = self.drt.component_registry.inner.lock().await;
if !guard.services.contains_key(&service_name) {
// Normal case
guard.services.insert(service_name.clone(), nats_service);
guard.stats_handlers.insert(service_name.clone(), stats_reg);
drop(guard);
} else {
drop(guard);
let _ = nats_service.stop().await;
return Err(anyhow::anyhow!(
"Service create race for {service_name}, now already exists"
));
}
// Register metrics callback. CRITICAL: Never fail service creation for metrics issues.
if let Err(err) = self.start_scraping_nats_service_component_metrics() {
tracing::debug!(service_name, error = %err, "Metrics registration failed");
}
Ok(())
} }
} }
......
...@@ -12,8 +12,6 @@ use crate::component::Component; ...@@ -12,8 +12,6 @@ use crate::component::Component;
pub use super::endpoint::EndpointStats; pub use super::endpoint::EndpointStats;
use educe::Educe;
type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>; type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>;
pub type StatsHandler = pub type StatsHandler =
Box<dyn FnMut(String, EndpointStats) -> serde_json::Value + Send + Sync + 'static>; Box<dyn FnMut(String, EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
...@@ -23,71 +21,7 @@ pub type EndpointStatsHandler = ...@@ -23,71 +21,7 @@ pub type EndpointStatsHandler =
pub const PROJECT_NAME: &str = "Dynamo"; pub const PROJECT_NAME: &str = "Dynamo";
const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION"); const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
#[derive(Educe, Builder, Dissolve)] pub async fn build_nats_service(
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct ServiceConfig {
#[builder(private)]
component: Component,
/// Description
#[builder(default)]
description: Option<String>,
}
impl ServiceConfigBuilder {
/// Create the [`Component`]'s service and store it in the registry.
pub async fn create(self) -> anyhow::Result<Component> {
let (component, description) = self.build_internal()?.dissolve();
let service_name = component.service_name();
// Pre-check to save cost of creating the service, but don't hold the lock
if component
.drt
.component_registry
.inner
.lock()
.await
.services
.contains_key(&service_name)
{
anyhow::bail!("Service {service_name} already exists");
}
let Some(nats_client) = component.drt.nats_client() else {
anyhow::bail!("Cannot create NATS service without NATS.");
};
let (nats_service, stats_reg) =
build_nats_service(nats_client, &component, description).await?;
let mut guard = component.drt.component_registry.inner.lock().await;
if !guard.services.contains_key(&service_name) {
// Normal case
guard.services.insert(service_name.clone(), nats_service);
guard.stats_handlers.insert(service_name, stats_reg);
drop(guard);
} else {
drop(guard);
let _ = nats_service.stop().await;
return Err(anyhow::anyhow!(
"Service create race for {service_name}, now already exists"
));
}
// Register metrics callback. CRITICAL: Never fail service creation for metrics issues.
if let Err(err) = component.start_scraping_nats_service_component_metrics() {
tracing::debug!(
"Metrics registration failed for '{}': {}",
component.service_name(),
err
);
}
Ok(component)
}
}
async fn build_nats_service(
nats_client: &crate::transports::nats::Client, nats_client: &crate::transports::nats::Client,
component: &Component, component: &Component,
description: Option<String>, description: Option<String>,
...@@ -123,9 +57,3 @@ async fn build_nats_service( ...@@ -123,9 +57,3 @@ async fn build_nats_service(
Ok((nats_service, stats_handler_registry_clone)) Ok((nats_service, stats_handler_registry_clone))
} }
impl ServiceConfigBuilder {
pub(crate) fn from_component(component: Component) -> Self {
Self::default().component(component)
}
}
...@@ -1347,17 +1347,17 @@ mod test_metricsregistry_nats { ...@@ -1347,17 +1347,17 @@ mod test_metricsregistry_nats {
// Setup real DRT and registry using the test-friendly constructor // Setup real DRT and registry using the test-friendly constructor
let drt = create_test_drt_async().await; let drt = create_test_drt_async().await;
// Create a namespace and components from the DRT // Create a namespace and component from the DRT
let namespace = drt.namespace("ns789").unwrap(); let namespace = drt.namespace("ns789").unwrap();
let components = namespace.component("comp789").unwrap(); let mut component = namespace.component("comp789").unwrap();
// Create a service to trigger metrics callback registration // Create a service to trigger metrics callback registration
let _service = components.service_builder().create().await.unwrap(); component.add_stats_service().await.unwrap();
// Get components output which should include NATS client metrics // Get component output which should include NATS client metrics
// Additional checks for NATS client metrics (without checking specific values) // Additional checks for NATS client metrics (without checking specific values)
let component_nats_metrics = let component_nats_metrics =
super::test_helpers::extract_nats_lines(&components.prometheus_expfmt().unwrap()); super::test_helpers::extract_nats_lines(&component.prometheus_expfmt().unwrap());
println!( println!(
"Component NATS metrics count: {}", "Component NATS metrics count: {}",
component_nats_metrics.len() component_nats_metrics.len()
...@@ -1371,7 +1371,7 @@ mod test_metricsregistry_nats { ...@@ -1371,7 +1371,7 @@ mod test_metricsregistry_nats {
// Check for specific NATS client metric names (without values) // Check for specific NATS client metric names (without values)
let component_metrics = let component_metrics =
super::test_helpers::extract_metrics(&components.prometheus_expfmt().unwrap()); super::test_helpers::extract_metrics(&component.prometheus_expfmt().unwrap());
let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
.iter() .iter()
.map(|line| { .map(|line| {
...@@ -1457,12 +1457,15 @@ mod test_metricsregistry_nats { ...@@ -1457,12 +1457,15 @@ mod test_metricsregistry_nats {
let runtime = Runtime::from_current()?; let runtime = Runtime::from_current()?;
let drt = DistributedRuntime::from_settings(runtime.clone()).await?; let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
let namespace = drt.namespace("ns123").unwrap(); let namespace = drt.namespace("ns123").unwrap();
let component = namespace.component("comp123").unwrap(); let mut component = namespace.component("comp123").unwrap();
let ingress = Ingress::for_engine(MessageHandler::new()).unwrap(); let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();
let _backend_handle = tokio::spawn(async move { let _backend_handle = tokio::spawn(async move {
let service = component.service_builder().create().await.unwrap(); component.add_stats_service().await.unwrap();
let endpoint = service.endpoint("echo").endpoint_builder().handler(ingress); let endpoint = component
.endpoint("echo")
.endpoint_builder()
.handler(ingress);
endpoint.start().await.unwrap(); endpoint.start().await.unwrap();
}); });
......
...@@ -615,7 +615,7 @@ mod integration_tests { ...@@ -615,7 +615,7 @@ mod integration_tests {
// Now create a namespace, component, and endpoint to make the system healthy // Now create a namespace, component, and endpoint to make the system healthy
let namespace = drt.namespace("ns1234").unwrap(); let namespace = drt.namespace("ns1234").unwrap();
let component = namespace.component("comp1234").unwrap(); let mut component = namespace.component("comp1234").unwrap();
// Create a simple test handler // Create a simple test handler
use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn}; use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
...@@ -641,12 +641,8 @@ mod integration_tests { ...@@ -641,12 +641,8 @@ mod integration_tests {
// Start the service and endpoint with a health check payload // Start the service and endpoint with a health check payload
// This will automatically register the endpoint for health monitoring // This will automatically register the endpoint for health monitoring
tokio::spawn(async move { tokio::spawn(async move {
let _ = component component.add_stats_service().await.unwrap();
.service_builder() let _ = component.endpoint(ENDPOINT_NAME)
.create()
.await
.unwrap()
.endpoint(ENDPOINT_NAME)
.endpoint_builder() .endpoint_builder()
.handler(ingress) .handler(ingress)
.health_check_payload(serde_json::json!({ .health_check_payload(serde_json::json!({
......
...@@ -116,12 +116,9 @@ mod integration { ...@@ -116,12 +116,9 @@ mod integration {
// // make the ingress discoverable via a component service // // make the ingress discoverable via a component service
// // we must first create a service, then we can attach one more more endpoints // // we must first create a service, then we can attach one more more endpoints
runtime let mut component = runtime.namespace(DEFAULT_NAMESPACE)?.component("backend")?;
.namespace(DEFAULT_NAMESPACE)? component.add_stats_service().await?;
.component("backend")? component
.service_builder()
.create()
.await?
.endpoint("generate") .endpoint("generate")
.endpoint_builder() .endpoint_builder()
.handler(ingress) .handler(ingress)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment