Unverified Commit c78b5901 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore(runtime): Do not expose etcd lease ID (#3915)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent a79122c6
...@@ -123,10 +123,8 @@ class DynamoSglangPublisher: ...@@ -123,10 +123,8 @@ class DynamoSglangPublisher:
ep = kv_events.get("endpoint") ep = kv_events.get("endpoint")
zmq_ep = ep.replace("*", get_local_ip_auto()) if ep else None zmq_ep = ep.replace("*", get_local_ip_auto()) if ep else None
lease_id = self.generate_endpoint.lease_id()
zmq_config = ZmqKvEventPublisherConfig( zmq_config = ZmqKvEventPublisherConfig(
worker_id=lease_id, worker_id=self.generate_endpoint.connection_id(),
kv_block_size=self.server_args.page_size, kv_block_size=self.server_args.page_size,
zmq_endpoint=zmq_ep, zmq_endpoint=zmq_ep,
) )
......
...@@ -421,7 +421,7 @@ async def init(runtime: DistributedRuntime, config: Config): ...@@ -421,7 +421,7 @@ async def init(runtime: DistributedRuntime, config: Config):
component, component,
engine, engine,
kv_listener, kv_listener,
int(endpoint.lease_id()), int(endpoint.connection_id()),
config.kv_block_size, config.kv_block_size,
metrics_labels, metrics_labels,
) as publisher: ) as publisher:
......
...@@ -135,7 +135,7 @@ def setup_kv_event_publisher( ...@@ -135,7 +135,7 @@ def setup_kv_event_publisher(
).replace("*", "127.0.0.1") ).replace("*", "127.0.0.1")
zmq_config = ZmqKvEventPublisherConfig( zmq_config = ZmqKvEventPublisherConfig(
worker_id=generate_endpoint.lease_id(), worker_id=generate_endpoint.connection_id(),
kv_block_size=vllm_config.cache_config.block_size, kv_block_size=vllm_config.cache_config.block_size,
zmq_endpoint=zmq_endpoint, zmq_endpoint=zmq_endpoint,
) )
......
...@@ -163,7 +163,7 @@ class VllmBaseWorker: ...@@ -163,7 +163,7 @@ class VllmBaseWorker:
).replace("*", "127.0.0.1") ).replace("*", "127.0.0.1")
zmq_config = ZmqKvEventPublisherConfig( zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(), worker_id=endpoint.connection_id(),
kv_block_size=vllm_config.cache_config.block_size, kv_block_size=vllm_config.cache_config.block_size,
zmq_endpoint=zmq_endpoint, zmq_endpoint=zmq_endpoint,
) )
......
...@@ -773,12 +773,9 @@ impl Endpoint { ...@@ -773,12 +773,9 @@ impl Endpoint {
}) })
} }
fn lease_id(&self) -> u64 { // Opaque unique ID for this worker. May change over worker lifetime.
self.inner fn connection_id(&self) -> u64 {
.drt() self.inner.drt().connection_id()
.primary_lease()
.map(|l| l.id())
.unwrap_or(0)
} }
/// Get a RuntimeMetrics helper for creating Prometheus metrics /// Get a RuntimeMetrics helper for creating Prometheus metrics
......
...@@ -225,12 +225,7 @@ impl BlockManager { ...@@ -225,12 +225,7 @@ impl BlockManager {
self._controller = Some(Arc::new(controller)); self._controller = Some(Arc::new(controller));
let instance_id = component let instance_id = component.inner.drt().connection_id();
.inner
.drt()
.primary_lease()
.map(|lease| lease.id())
.ok_or_else(|| to_pyerr(anyhow::anyhow!("no instance id")))?;
tracing::info!( tracing::info!(
"Dynamo KVBM Controller: {}.{}:{}", "Dynamo KVBM Controller: {}.{}:{}",
......
...@@ -991,14 +991,7 @@ async fn create_kv_router_from_endpoint( ...@@ -991,14 +991,7 @@ async fn create_kv_router_from_endpoint(
// Get component from endpoint // Get component from endpoint
let component = endpoint.inner.component(); let component = endpoint.inner.component();
// Verify we're not in static mode // Create ModelManager and use it to create KvRouter (ensures registration)
if component.drt().primary_lease().is_none() {
return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Failed to get primary lease: Cannot KV route static workers",
));
}
// Create ModelManager and use it to create KvRouter (ensures etcd registration)
let model_manager = Arc::new(llm_rs::discovery::ModelManager::new()); let model_manager = Arc::new(llm_rs::discovery::ModelManager::new());
let kv_router = model_manager let kv_router = model_manager
.kv_chooser_for(component, block_size as u32, kv_router_config) .kv_chooser_for(component, block_size as u32, kv_router_config)
......
...@@ -157,9 +157,9 @@ class Endpoint: ...@@ -157,9 +157,9 @@ class Endpoint:
""" """
... ...
async def lease_id(self) -> int: def connection_id(self) -> int:
""" """
Return primary lease id. Currently, cannot set a different lease id. Opaque unique ID for this worker. May change over worker lifetime.
""" """
... ...
......
...@@ -112,7 +112,7 @@ mod tests { ...@@ -112,7 +112,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
let worker_id = drt.primary_lease().unwrap().id(); let worker_id = drt.connection_id();
let block_manager = create_reference_block_manager_with_counts(8, 16, 0).await; let block_manager = create_reference_block_manager_with_counts(8, 16, 0).await;
......
...@@ -677,14 +677,7 @@ impl KvbmWorker { ...@@ -677,14 +677,7 @@ impl KvbmWorker {
bytes_per_block: usize, bytes_per_block: usize,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let drt = config.drt.clone(); let drt = config.drt.clone();
let worker_id = drt.connection_id() as usize;
let worker_id = drt
.primary_lease()
.ok_or(anyhow::anyhow!(
"unable to get primary lease; check that drt is not static"
))?
.id() as usize;
// Readiness gating for ping // Readiness gating for ping
let state = Arc::new(WorkerState::new()); let state = Arc::new(WorkerState::new());
......
...@@ -224,13 +224,7 @@ impl KvRouter { ...@@ -224,13 +224,7 @@ impl KvRouter {
consumer_uuid: String, consumer_uuid: String,
) -> Result<Self> { ) -> Result<Self> {
let kv_router_config = kv_router_config.unwrap_or_default(); let kv_router_config = kv_router_config.unwrap_or_default();
let cancellation_token = component.drt().primary_token();
let cancellation_token = component
.drt()
.primary_lease()
.expect("Cannot KV route static workers")
.primary_token();
let generate_endpoint = component.endpoint("generate"); let generate_endpoint = component.endpoint("generate");
let client = generate_endpoint.client().await?; let client = generate_endpoint.client().await?;
......
...@@ -784,15 +784,7 @@ impl WorkerMetricsPublisher { ...@@ -784,15 +784,7 @@ impl WorkerMetricsPublisher {
} }
pub async fn create_endpoint(&self, component: Component) -> Result<()> { pub async fn create_endpoint(&self, component: Component) -> Result<()> {
let worker_id = component let worker_id = component.drt().connection_id();
.drt()
.primary_lease()
.map(|lease| lease.id())
.unwrap_or_else(|| {
tracing::warn!("Component is static, assuming worker_id of 0");
0
});
self.start_nats_metrics_publishing(component.namespace().clone(), worker_id); self.start_nats_metrics_publishing(component.namespace().clone(), worker_id);
Ok(()) Ok(())
} }
......
...@@ -246,12 +246,7 @@ impl MockVllmEngine { ...@@ -246,12 +246,7 @@ impl MockVllmEngine {
tracing::debug!("Component found for KV events publishing"); tracing::debug!("Component found for KV events publishing");
tracing::debug!("Getting worker_id"); tracing::debug!("Getting worker_id");
let worker_id = comp let worker_id = comp.drt().connection_id();
.drt()
.primary_lease()
.expect("Cannot publish KV events without lease") // ← This will PANIC on static!
.id();
// let worker_id = 0;
tracing::debug!("Worker_id set to: {worker_id}"); tracing::debug!("Worker_id set to: {worker_id}");
tracing::debug!("Creating KV event publisher"); tracing::debug!("Creating KV event publisher");
......
...@@ -216,7 +216,7 @@ pub mod llm_kvbm { ...@@ -216,7 +216,7 @@ pub mod llm_kvbm {
impl DynamoKvbmRuntimeConfigBuilder { impl DynamoKvbmRuntimeConfigBuilder {
pub fn build(self) -> Result<kvbm::config::KvManagerRuntimeConfig> { pub fn build(self) -> Result<kvbm::config::KvManagerRuntimeConfig> {
let (runtime, nixl) = self.build_internal()?.dissolve(); let (runtime, nixl) = self.build_internal()?.dissolve();
let worker_id = runtime.primary_lease().unwrap().id(); let worker_id = runtime.connection_id();
Ok(kvbm::config::KvManagerRuntimeConfig::builder() Ok(kvbm::config::KvManagerRuntimeConfig::builder()
.worker_id(worker_id) .worker_id(worker_id)
.cancellation_token(runtime.primary_token().child_token()) .cancellation_token(runtime.primary_token().child_token())
...@@ -247,7 +247,7 @@ pub mod llm_kvbm { ...@@ -247,7 +247,7 @@ pub mod llm_kvbm {
impl DynamoEventManager { impl DynamoEventManager {
pub fn new(component: Arc<KVBMDynamoRuntimeComponent>) -> Self { pub fn new(component: Arc<KVBMDynamoRuntimeComponent>) -> Self {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let worker_id = component.drt().primary_lease().unwrap().id(); let worker_id = component.drt().connection_id();
component.drt().runtime().secondary().spawn(async move { component.drt().runtime().secondary().spawn(async move {
worker_task(component, rx).await; worker_task(component, rx).await;
}); });
......
...@@ -33,7 +33,6 @@ use std::fmt; ...@@ -33,7 +33,6 @@ use std::fmt;
use crate::{ use crate::{
config::HealthStatus, config::HealthStatus,
discovery::Lease,
metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names}, metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
service::ServiceSet, service::ServiceSet,
transports::etcd::{ETCD_ROOT_PATH, EtcdPath}, transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
......
...@@ -15,12 +15,6 @@ pub struct EndpointConfig { ...@@ -15,12 +15,6 @@ pub struct EndpointConfig {
#[builder(private)] #[builder(private)]
endpoint: Endpoint, endpoint: Endpoint,
// todo: move lease to component/service
/// Lease
#[educe(Debug(ignore))]
#[builder(default)]
lease: Option<Lease>,
/// Endpoint handler /// Endpoint handler
#[educe(Debug(ignore))] #[educe(Debug(ignore))]
handler: Arc<dyn PushWorkHandler>, handler: Arc<dyn PushWorkHandler>,
...@@ -61,19 +55,17 @@ impl EndpointConfigBuilder { ...@@ -61,19 +55,17 @@ impl EndpointConfigBuilder {
pub async fn start(self) -> Result<()> { pub async fn start(self) -> Result<()> {
let ( let (
endpoint, endpoint,
lease,
handler, handler,
stats_handler, stats_handler,
metrics_labels, metrics_labels,
graceful_shutdown, graceful_shutdown,
health_check_payload, health_check_payload,
) = self.build_internal()?.dissolve(); ) = self.build_internal()?.dissolve();
let lease = lease.or(endpoint.drt().primary_lease()); let connection_id = endpoint.drt().connection_id();
let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
tracing::debug!( tracing::debug!(
"Starting endpoint: {}", "Starting endpoint: {}",
endpoint.etcd_path_with_lease_id(lease_id) endpoint.etcd_path_with_lease_id(connection_id)
); );
let service_name = endpoint.component.service_name(); let service_name = endpoint.component.service_name();
...@@ -107,25 +99,26 @@ impl EndpointConfigBuilder { ...@@ -107,25 +99,26 @@ impl EndpointConfigBuilder {
if let Some(stats_handler) = stats_handler { if let Some(stats_handler) = stats_handler {
handler_map handler_map
.lock() .lock()
.insert(endpoint.subject_to(lease_id), stats_handler); .insert(endpoint.subject_to(connection_id), stats_handler);
} }
// creates an endpoint for the service // creates an endpoint for the service
let service_endpoint = group let service_endpoint = group
.endpoint(&endpoint.name_with_id(lease_id)) .endpoint(&endpoint.name_with_id(connection_id))
.await .await
.map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?; .map_err(|e| anyhow::anyhow!("Failed to start endpoint: {e}"))?;
// Create a token that responds to both runtime shutdown and lease expiration // This creates a child token of the runtime's endpoint_shutdown_token. That token is
let runtime_shutdown_token = endpoint.drt().child_token(); // cancelled first as part of graceful shutdown. See Runtime::shutdown.
let endpoint_shutdown_token = endpoint.drt().child_token();
// Extract all values needed from endpoint before any spawns // Extract all values needed from endpoint before any spawns
let namespace_name = endpoint.component.namespace.name.clone(); let namespace_name = endpoint.component.namespace.name.clone();
let component_name = endpoint.component.name.clone(); let component_name = endpoint.component.name.clone();
let endpoint_name = endpoint.name.clone(); let endpoint_name = endpoint.name.clone();
let system_health = endpoint.drt().system_health.clone(); let system_health = endpoint.drt().system_health.clone();
let subject = endpoint.subject_to(lease_id); let subject = endpoint.subject_to(connection_id);
let etcd_path = endpoint.etcd_path_with_lease_id(lease_id); let etcd_path = endpoint.etcd_path_with_lease_id(connection_id);
let etcd_client = endpoint.component.drt.etcd_client.clone(); let etcd_client = endpoint.component.drt.etcd_client.clone();
// Register health check target in SystemHealth if provided // Register health check target in SystemHealth if provided
...@@ -134,7 +127,7 @@ impl EndpointConfigBuilder { ...@@ -134,7 +127,7 @@ impl EndpointConfigBuilder {
component: component_name.clone(), component: component_name.clone(),
endpoint: endpoint_name.clone(), endpoint: endpoint_name.clone(),
namespace: namespace_name.clone(), namespace: namespace_name.clone(),
instance_id: lease_id, instance_id: connection_id,
transport: TransportType::NatsTcp(subject.clone()), transport: TransportType::NatsTcp(subject.clone()),
}; };
tracing::debug!(endpoint_name = %endpoint_name, "Registering endpoint health check target"); tracing::debug!(endpoint_name = %endpoint_name, "Registering endpoint health check target");
...@@ -149,29 +142,6 @@ impl EndpointConfigBuilder { ...@@ -149,29 +142,6 @@ impl EndpointConfigBuilder {
} }
} }
let cancel_token = if let Some(lease) = lease.as_ref() {
// Create a new token that will be cancelled when EITHER the lease expires OR runtime shutdown occurs
let combined_token = CancellationToken::new();
let combined_for_select = combined_token.clone();
let lease_token = lease.child_token();
// Use secondary runtime for this lightweight monitoring task
endpoint.drt().runtime().secondary().spawn(async move {
tokio::select! {
_ = lease_token.cancelled() => {
tracing::trace!("Lease cancelled, triggering endpoint shutdown");
}
_ = runtime_shutdown_token.cancelled() => {
tracing::trace!("Runtime shutdown triggered, cancelling endpoint");
}
}
combined_for_select.cancel();
});
combined_token
} else {
// No lease, just use runtime shutdown token
runtime_shutdown_token
};
// Register with graceful shutdown tracker if needed // Register with graceful shutdown tracker if needed
if graceful_shutdown { if graceful_shutdown {
tracing::debug!( tracing::debug!(
...@@ -186,12 +156,11 @@ impl EndpointConfigBuilder { ...@@ -186,12 +156,11 @@ impl EndpointConfigBuilder {
let push_endpoint = PushEndpoint::builder() let push_endpoint = PushEndpoint::builder()
.service_handler(handler) .service_handler(handler)
.cancellation_token(cancel_token.clone()) .cancellation_token(endpoint_shutdown_token.clone())
.graceful_shutdown(graceful_shutdown) .graceful_shutdown(graceful_shutdown)
.build() .build()
.map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?; .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;
// launch in primary runtime
let tracker_clone = if graceful_shutdown { let tracker_clone = if graceful_shutdown {
Some(endpoint.drt().graceful_shutdown_tracker()) Some(endpoint.drt().graceful_shutdown_tracker())
} else { } else {
...@@ -210,7 +179,7 @@ impl EndpointConfigBuilder { ...@@ -210,7 +179,7 @@ impl EndpointConfigBuilder {
namespace_name_for_task, namespace_name_for_task,
component_name_for_task, component_name_for_task,
endpoint_name_for_task, endpoint_name_for_task,
lease_id, connection_id,
system_health, system_health,
) )
.await; .await;
...@@ -231,7 +200,7 @@ impl EndpointConfigBuilder { ...@@ -231,7 +200,7 @@ impl EndpointConfigBuilder {
component: component_name.clone(), component: component_name.clone(),
endpoint: endpoint_name.clone(), endpoint: endpoint_name.clone(),
namespace: namespace_name.clone(), namespace: namespace_name.clone(),
instance_id: lease_id, instance_id: connection_id,
transport: TransportType::NatsTcp(subject), transport: TransportType::NatsTcp(subject),
}; };
...@@ -239,7 +208,7 @@ impl EndpointConfigBuilder { ...@@ -239,7 +208,7 @@ impl EndpointConfigBuilder {
if let Some(etcd_client) = &etcd_client if let Some(etcd_client) = &etcd_client
&& let Err(e) = etcd_client && let Err(e) = etcd_client
.kv_create(&etcd_path, info, Some(lease_id)) .kv_create(&etcd_path, info, Some(connection_id))
.await .await
{ {
tracing::error!( tracing::error!(
...@@ -248,7 +217,7 @@ impl EndpointConfigBuilder { ...@@ -248,7 +217,7 @@ impl EndpointConfigBuilder {
error = %e, error = %e,
"Unable to register service for discovery" "Unable to register service for discovery"
); );
cancel_token.cancel(); endpoint_shutdown_token.cancel();
return Err(error!( return Err(error!(
"Unable to register service for discovery. Check discovery service status" "Unable to register service for discovery. Check discovery service status"
)); ));
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::{Result, transports::etcd};
pub use etcd::Lease;
pub struct DiscoveryClient {
namespace: String,
etcd_client: etcd::Client,
}
impl DiscoveryClient {
/// Create a new [`DiscoveryClient`]
///
/// This will establish a connection to the etcd server, create a primary lease,
/// and spawn a task to keep the lease alive and tie the lifetime of the [`Runtime`]
/// to the lease.
///
/// If the lease expires, the [`Runtime`] will be shutdown.
/// If the [`Runtime`] is shutdown, the lease will be revoked.
pub(crate) fn new(namespace: String, etcd_client: etcd::Client) -> Self {
DiscoveryClient {
namespace,
etcd_client,
}
}
/// Get the primary lease ID
pub fn primary_lease_id(&self) -> u64 {
self.etcd_client.lease_id()
}
/// Create a [`Lease`] with a given time-to-live (TTL).
/// This [`Lease`] will be tied to the [`crate::Runtime`], but has its own independent [`crate::CancellationToken`].
pub async fn create_lease(&self, ttl: u64) -> Result<Lease> {
self.etcd_client.create_lease(ttl).await
}
}
...@@ -9,7 +9,6 @@ use crate::transports::nats::DRTNatsClientPrometheusMetrics; ...@@ -9,7 +9,6 @@ use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{ use crate::{
ErrorContext, ErrorContext,
component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace}, component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
discovery::DiscoveryClient,
metrics::PrometheusUpdateCallback, metrics::PrometheusUpdateCallback,
metrics::{MetricsHierarchy, MetricsRegistry}, metrics::{MetricsHierarchy, MetricsRegistry},
service::ServiceClient, service::ServiceClient,
...@@ -211,10 +210,8 @@ impl DistributedRuntime { ...@@ -211,10 +210,8 @@ impl DistributedRuntime {
self.runtime.primary_token() self.runtime.primary_token()
} }
/// The etcd lease all our components will be attached to. pub fn connection_id(&self) -> u64 {
/// Not available for static workers. self.store.connection_id()
pub fn primary_lease(&self) -> Option<etcd::Lease> {
self.etcd_client.as_ref().map(|c| c.primary_lease())
} }
pub fn shutdown(&self) { pub fn shutdown(&self) {
...@@ -226,27 +223,6 @@ impl DistributedRuntime { ...@@ -226,27 +223,6 @@ impl DistributedRuntime {
Namespace::new(self.clone(), name.into(), self.is_static) Namespace::new(self.clone(), name.into(), self.is_static)
} }
// /// Create a [`Component`]
// pub fn component(
// &self,
// name: impl Into<String>,
// namespace: impl Into<String>,
// ) -> Result<Component> {
// Ok(ComponentBuilder::from_runtime(self.clone())
// .name(name.into())
// .namespace(namespace.into())
// .build()?)
// }
pub(crate) fn discovery_client(&self, namespace: impl Into<String>) -> DiscoveryClient {
DiscoveryClient::new(
namespace.into(),
self.etcd_client
.clone()
.expect("Attempt to get discovery_client on static DistributedRuntime"),
)
}
pub(crate) fn service_client(&self) -> Option<ServiceClient> { pub(crate) fn service_client(&self) -> Option<ServiceClient> {
self.nats_client().map(|nc| ServiceClient::new(nc.clone())) self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
} }
......
...@@ -22,7 +22,6 @@ pub use config::RuntimeConfig; ...@@ -22,7 +22,6 @@ pub use config::RuntimeConfig;
pub mod component; pub mod component;
pub mod compute; pub mod compute;
pub mod discovery;
pub mod engine; pub mod engine;
pub mod health_check; pub mod health_check;
pub mod system_status_server; pub mod system_status_server;
......
...@@ -174,7 +174,7 @@ impl EtcdBucket { ...@@ -174,7 +174,7 @@ impl EtcdBucket {
tracing::trace!("etcd create: {k}"); tracing::trace!("etcd create: {k}");
// Use atomic transaction to check and create in one operation // Use atomic transaction to check and create in one operation
let put_options = PutOptions::new().with_lease(self.client.primary_lease().id() as i64); let put_options = PutOptions::new().with_lease(self.client.lease_id() as i64);
// Build transaction that creates key only if it doesn't exist // Build transaction that creates key only if it doesn't exist
let txn = Txn::new() let txn = Txn::new()
...@@ -243,7 +243,7 @@ impl EtcdBucket { ...@@ -243,7 +243,7 @@ impl EtcdBucket {
} }
let put_options = PutOptions::new() let put_options = PutOptions::new()
.with_lease(self.client.primary_lease().id() as i64) .with_lease(self.client.lease_id() as i64)
.with_prev_key(); .with_prev_key();
let mut put_resp = self let mut put_resp = self
.client .client
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment