Unverified commit 94ad5081, authored by mohammedabdulwahhab and committed by GitHub
Browse files

fix: introduce service discovery interface (1/n) (#3937)


Signed-off-by: mohammedabdulwahhab <furkhan324@berkeley.edu>
parent bfb2574f
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use super::{
DiscoveryClient, DiscoveryEvent, DiscoveryInstance, DiscoveryKey, DiscoverySpec,
DiscoveryStream,
};
use crate::Result;
use async_trait::async_trait;
use std::sync::{Arc, Mutex};
/// Shared in-memory registry backing the mock discovery implementation.
///
/// The instance list sits behind an `Arc<Mutex<..>>`, so every clone of a
/// registry (the type derives `Clone`) reads and writes the same list.
#[derive(Clone, Default)]
pub struct SharedMockRegistry {
    // Every instance pushed by a client that holds this registry.
    instances: Arc<Mutex<Vec<DiscoveryInstance>>>,
}
impl SharedMockRegistry {
    /// Builds a registry that holds no instances yet.
    ///
    /// Equivalent to the derived `Default` implementation.
    pub fn new() -> Self {
        Default::default()
    }
}
/// Mock implementation of `DiscoveryClient` for testing.
///
/// We can potentially remove this once `KeyValueDiscoveryClient` is implemented.
pub struct MockDiscoveryClient {
    // Identifier reported by `instance_id()` and stamped onto every instance
    // this client registers.
    instance_id: u64,
    // Registry that `register` appends to and `list_and_watch` polls.
    registry: SharedMockRegistry,
}
impl MockDiscoveryClient {
    /// Creates a mock client bound to `registry`.
    ///
    /// When `instance_id` is `Some`, that value is used verbatim. When it is
    /// `None`, the next value of a process-wide counter (starting at 1) is
    /// taken instead. Explicit ids are not checked against generated ones.
    pub fn new(instance_id: Option<u64>, registry: SharedMockRegistry) -> Self {
        use std::sync::atomic::{AtomicU64, Ordering};

        // Source of generated ids; shared by every client in the process.
        static NEXT_GENERATED_ID: AtomicU64 = AtomicU64::new(1);

        let instance_id = match instance_id {
            Some(explicit) => explicit,
            None => NEXT_GENERATED_ID.fetch_add(1, Ordering::SeqCst),
        };

        Self {
            instance_id,
            registry,
        }
    }
}
/// Reports whether a registered `instance` falls under the query `key`.
///
/// Each key variant constrains a prefix of the instance's
/// namespace / component / endpoint triple; `AllEndpoints` constrains nothing.
/// The instance id never takes part in the comparison.
fn matches_key(instance: &DiscoveryInstance, key: &DiscoveryKey) -> bool {
    // Pull the instance's coordinates out once, then compare against the key.
    let DiscoveryInstance::Endpoint {
        namespace: have_ns,
        component: have_comp,
        endpoint: have_ep,
        ..
    } = instance;

    match key {
        DiscoveryKey::AllEndpoints => true,
        DiscoveryKey::NamespacedEndpoints { namespace } => have_ns == namespace,
        DiscoveryKey::ComponentEndpoints {
            namespace,
            component,
        } => (have_ns, have_comp) == (namespace, component),
        DiscoveryKey::Endpoint {
            namespace,
            component,
            endpoint,
        } => (have_ns, have_comp, have_ep) == (namespace, component, endpoint),
    }
}
#[async_trait]
impl DiscoveryClient for MockDiscoveryClient {
fn instance_id(&self) -> u64 {
self.instance_id
}
async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance> {
let instance = spec.with_instance_id(self.instance_id);
self.registry
.instances
.lock()
.unwrap()
.push(instance.clone());
Ok(instance)
}
async fn list_and_watch(&self, key: DiscoveryKey) -> Result<DiscoveryStream> {
use std::collections::HashSet;
let registry = self.registry.clone();
let stream = async_stream::stream! {
let mut known_instances = HashSet::new();
loop {
let current: Vec<_> = {
let instances = registry.instances.lock().unwrap();
instances
.iter()
.filter(|instance| matches_key(instance, &key))
.cloned()
.collect()
};
let current_ids: HashSet<_> = current.iter().map(|i| {
match i {
DiscoveryInstance::Endpoint { instance_id, .. } => *instance_id,
}
}).collect();
// Emit Added events for new instances
for instance in current {
let id = match &instance {
DiscoveryInstance::Endpoint { instance_id, .. } => *instance_id,
};
if known_instances.insert(id) {
yield Ok(DiscoveryEvent::Added(instance));
}
}
// Emit Removed events for instances that are gone
for id in known_instances.difference(&current_ids).cloned().collect::<Vec<_>>() {
yield Ok(DiscoveryEvent::Removed(id));
known_instances.remove(&id);
}
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
};
Ok(Box::pin(stream))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::time::Duration;

    /// Upper bound on the wait for a single event. The mock polls every 10 ms,
    /// so hitting this limit means the expected event was never emitted.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Pulls the next event off `stream`, failing the test instead of hanging
    /// forever when nothing arrives within `EVENT_TIMEOUT`.
    async fn next_event(stream: &mut DiscoveryStream) -> DiscoveryEvent {
        tokio::time::timeout(EVENT_TIMEOUT, stream.next())
            .await
            .expect("timed out waiting for a discovery event")
            .expect("discovery stream ended unexpectedly")
            .expect("discovery stream yielded an error")
    }

    /// Two clients sharing one registry: registrations show up as `Added`
    /// events in order, and dropping an instance from the registry shows up
    /// as a `Removed` event carrying its id.
    #[tokio::test]
    async fn test_mock_discovery_add_and_remove() {
        let registry = SharedMockRegistry::new();
        let client1 = MockDiscoveryClient::new(Some(1), registry.clone());
        let client2 = MockDiscoveryClient::new(Some(2), registry.clone());

        let spec = DiscoverySpec::Endpoint {
            namespace: "test-ns".to_string(),
            component: "test-comp".to_string(),
            endpoint: "test-ep".to_string(),
        };
        let key = DiscoveryKey::Endpoint {
            namespace: "test-ns".to_string(),
            component: "test-comp".to_string(),
            endpoint: "test-ep".to_string(),
        };

        // Start watching
        let mut stream = client1.list_and_watch(key).await.unwrap();

        // Add first instance
        client1.register(spec.clone()).await.unwrap();
        match next_event(&mut stream).await {
            DiscoveryEvent::Added(DiscoveryInstance::Endpoint { instance_id, .. }) => {
                assert_eq!(instance_id, 1);
            }
            other => panic!("Expected Added event for instance-1, got {:?}", other),
        }

        // Add second instance
        client2.register(spec).await.unwrap();
        match next_event(&mut stream).await {
            DiscoveryEvent::Added(DiscoveryInstance::Endpoint { instance_id, .. }) => {
                assert_eq!(instance_id, 2);
            }
            other => panic!("Expected Added event for instance-2, got {:?}", other),
        }

        // Remove first instance
        registry.instances.lock().unwrap().retain(|i| match i {
            DiscoveryInstance::Endpoint { instance_id, .. } => *instance_id != 1,
        });
        match next_event(&mut stream).await {
            DiscoveryEvent::Removed(instance_id) => {
                assert_eq!(instance_id, 1);
            }
            other => panic!("Expected Removed event for instance-1, got {:?}", other),
        }
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::Result;
use async_trait::async_trait;
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
mod mock;
pub use mock::{MockDiscoveryClient, SharedMockRegistry};
/// Query key for prefix-based discovery queries.
///
/// Supports hierarchical queries, from every endpoint in the system down to a
/// single named endpoint. Each variant fixes one more level of the
/// namespace / component / endpoint hierarchy than the one before it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DiscoveryKey {
    /// Query all endpoints in the system.
    AllEndpoints,
    /// Query all endpoints in a specific namespace.
    NamespacedEndpoints { namespace: String },
    /// Query all endpoints of one component within a namespace.
    ComponentEndpoints {
        namespace: String,
        component: String,
    },
    /// Query a specific endpoint, identified by namespace, component and endpoint name.
    Endpoint {
        namespace: String,
        component: String,
        endpoint: String,
    },
    // TODO: Extend to support ModelCard queries:
    // - AllModels
    // - NamespacedModels { namespace }
    // - ComponentModels { namespace, component }
    // - Model { namespace, component, model_name }
}
/// Specification for registering objects in the discovery plane.
///
/// This is the input to the `register()` operation. It carries no instance id;
/// one is attached via `with_instance_id` to obtain a `DiscoveryInstance`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DiscoverySpec {
    /// Endpoint specification for registration.
    Endpoint {
        namespace: String,
        component: String,
        endpoint: String,
    },
    // TODO: Add ModelCard variant:
    // - ModelCard { namespace, component, model_name, card: ModelDeploymentCard }
}
impl DiscoverySpec {
    /// Consumes the spec and pairs it with `instance_id`, producing the
    /// corresponding `DiscoveryInstance` variant with all fields moved over
    /// unchanged.
    pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
        // The spec currently has a single variant, so this pattern is
        // irrefutable; adding a variant will turn it into a compile error here.
        let Self::Endpoint {
            namespace,
            component,
            endpoint,
        } = self;

        DiscoveryInstance::Endpoint {
            namespace,
            component,
            endpoint,
            instance_id,
        }
    }
}
/// Registered instances in the discovery plane.
///
/// Represents objects that have been successfully registered and therefore
/// carry an instance id. The `#[serde(tag = "type")]` attribute selects serde's
/// internally tagged representation, so the variant name is serialized under a
/// `type` field alongside the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum DiscoveryInstance {
    /// Registered endpoint instance.
    Endpoint {
        namespace: String,
        component: String,
        endpoint: String,
        // Id of the worker/client that registered this endpoint.
        instance_id: u64,
    },
    // TODO: Add ModelCard variant:
    // - ModelCard { namespace, component, model_name, instance_id, card: ModelDeploymentCard }
}
/// Events emitted by the discovery client watch stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
    /// A new instance was added; carries the full registered instance.
    Added(DiscoveryInstance),
    /// An instance was removed; carries only its `instance_id`.
    Removed(u64),
}
/// Stream type for discovery events.
///
/// A pinned, boxed, `Send` trait object whose items are
/// `Result<DiscoveryEvent>`, so individual items can report failures.
pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
/// Discovery client trait for service discovery across different backends.
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait DiscoveryClient: Send + Sync {
    /// Returns a unique identifier for this worker (e.g. the lease id when using etcd, or a
    /// generated id for an in-memory store).
    /// Discovery objects created by this worker will be associated with this id.
    fn instance_id(&self) -> u64;
    /// Registers an object in the discovery plane under this client's instance id and
    /// returns the resulting registered instance.
    async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
    /// Returns a stream of discovery events (Added/Removed) for the given discovery key.
    async fn list_and_watch(&self, key: DiscoveryKey) -> Result<DiscoveryStream>;
}
...@@ -9,6 +9,7 @@ use crate::transports::nats::DRTNatsClientPrometheusMetrics; ...@@ -9,6 +9,7 @@ use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{ use crate::{
ErrorContext, ErrorContext,
component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace}, component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
discovery::DiscoveryClient,
metrics::PrometheusUpdateCallback, metrics::PrometheusUpdateCallback,
metrics::{MetricsHierarchy, MetricsRegistry}, metrics::{MetricsHierarchy, MetricsRegistry},
service::ServiceClient, service::ServiceClient,
...@@ -83,6 +84,14 @@ impl DistributedRuntime { ...@@ -83,6 +84,14 @@ impl DistributedRuntime {
let nats_client_for_metrics = nats_client.clone(); let nats_client_for_metrics = nats_client.clone();
// Initialize discovery client with mock implementation
// TODO: Replace MockDiscoveryClient with KeyValueStoreDiscoveryClient or KubeDiscoveryClient
let discovery_client = {
use crate::discovery::{MockDiscoveryClient, SharedMockRegistry};
let registry = SharedMockRegistry::new();
Arc::new(MockDiscoveryClient::new(None, registry)) as Arc<dyn DiscoveryClient>
};
let distributed_runtime = Self { let distributed_runtime = Self {
runtime, runtime,
etcd_client, etcd_client,
...@@ -90,6 +99,7 @@ impl DistributedRuntime { ...@@ -90,6 +99,7 @@ impl DistributedRuntime {
nats_client, nats_client,
tcp_server: Arc::new(OnceCell::new()), tcp_server: Arc::new(OnceCell::new()),
system_status_server: Arc::new(OnceLock::new()), system_status_server: Arc::new(OnceLock::new()),
discovery_client,
component_registry: component::Registry::new(), component_registry: component::Registry::new(),
is_static, is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())), instance_sources: Arc::new(Mutex::new(HashMap::new())),
...@@ -223,6 +233,11 @@ impl DistributedRuntime { ...@@ -223,6 +233,11 @@ impl DistributedRuntime {
Namespace::new(self.clone(), name.into(), self.is_static) Namespace::new(self.clone(), name.into(), self.is_static)
} }
/// Accessor intended to expose the runtime's discovery client.
///
/// Currently a stub: it unconditionally returns an error and does not read the
/// `discovery_client` field, which at this point holds a `MockDiscoveryClient`.
///
/// TODO: Return discovery client when KeyValueDiscoveryClient or KubeDiscoveryClient is implemented
pub fn discovery_client(&self) -> Result<Arc<dyn DiscoveryClient>> {
    Err(error!("Discovery client not implemented!"))
}
pub(crate) fn service_client(&self) -> Option<ServiceClient> { pub(crate) fn service_client(&self) -> Option<ServiceClient> {
self.nats_client().map(|nc| ServiceClient::new(nc.clone())) self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
} }
......
...@@ -22,6 +22,7 @@ pub use config::RuntimeConfig; ...@@ -22,6 +22,7 @@ pub use config::RuntimeConfig;
pub mod component; pub mod component;
pub mod compute; pub mod compute;
pub mod discovery;
pub mod engine; pub mod engine;
pub mod health_check; pub mod health_check;
pub mod system_status_server; pub mod system_status_server;
...@@ -95,6 +96,9 @@ pub struct DistributedRuntime { ...@@ -95,6 +96,9 @@ pub struct DistributedRuntime {
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>, tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>, system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
// Service discovery client
discovery_client: Arc<dyn discovery::DiscoveryClient>,
// local registry for components // local registry for components
// the registry allows us to use share runtime resources across instances of the same component object. // the registry allows us to use share runtime resources across instances of the same component object.
// take for example two instances of a client to the same remote component. The registry allows us to use // take for example two instances of a client to the same remote component. The registry allows us to use
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment