// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; use async_trait::async_trait; use futures::Stream; use serde::{Deserialize, Serialize}; use std::pin::Pin; use tokio_util::sync::CancellationToken; mod metadata; pub use metadata::{DiscoveryMetadata, MetadataSnapshot}; mod mock; pub use mock::{MockDiscovery, SharedMockRegistry}; mod kv_store; pub use kv_store::KVStoreDiscovery; mod kube; pub use kube::{KubeDiscoveryClient, hash_pod_name}; pub mod utils; use crate::component::TransportType; pub use utils::watch_and_extract_field; /// Query key for prefix-based discovery queries /// Supports hierarchical queries from all endpoints down to specific endpoints #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum DiscoveryQuery { /// Query all endpoints in the system AllEndpoints, /// Query all endpoints in a specific namespace NamespacedEndpoints { namespace: String, }, /// Query all endpoints in a namespace/component ComponentEndpoints { namespace: String, component: String, }, /// Query a specific endpoint Endpoint { namespace: String, component: String, endpoint: String, }, AllModels, NamespacedModels { namespace: String, }, ComponentModels { namespace: String, component: String, }, EndpointModels { namespace: String, component: String, endpoint: String, }, } /// Specification for registering objects in the discovery plane /// Represents the input to the register() operation #[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoverySpec { /// Endpoint specification for registration Endpoint { namespace: String, component: String, endpoint: String, /// Transport type and routing information transport: TransportType, }, Model { namespace: String, component: String, endpoint: String, /// ModelDeploymentCard serialized as JSON /// This allows lib/runtime to remain independent of lib/llm types /// DiscoverySpec.from_model() and DiscoveryInstance.deserialize_model() are ergonomic helpers to create and deserialize the model card. card_json: serde_json::Value, }, } impl DiscoverySpec { /// Creates a Model discovery spec from a serializable type /// The card will be serialized to JSON to avoid cross-crate dependencies pub fn from_model( namespace: String, component: String, endpoint: String, card: &T, ) -> Result where T: Serialize, { let card_json = serde_json::to_value(card)?; Ok(Self::Model { namespace, component, endpoint, card_json, }) } /// Attaches an instance ID to create a DiscoveryInstance pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance { match self { Self::Endpoint { namespace, component, endpoint, transport, } => DiscoveryInstance::Endpoint(crate::component::Instance { namespace, component, endpoint, instance_id, transport, }), Self::Model { namespace, component, endpoint, card_json, } => DiscoveryInstance::Model { namespace, component, endpoint, instance_id, card_json, }, } } } /// Registered instances in the discovery plane /// Represents objects that have been successfully registered with an instance ID #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(tag = "type")] pub enum DiscoveryInstance { /// Registered endpoint instance - wraps the component::Instance directly Endpoint(crate::component::Instance), Model { namespace: String, component: String, endpoint: String, instance_id: u64, /// ModelDeploymentCard serialized as JSON /// This allows lib/runtime to remain independent of lib/llm types card_json: serde_json::Value, }, } impl DiscoveryInstance { /// Returns the instance ID for this discovery instance pub fn instance_id(&self) -> u64 { match self { Self::Endpoint(inst) => inst.instance_id, Self::Model { instance_id, .. } => *instance_id, } } /// Deserializes the model JSON into the specified type T /// Returns an error if this is not a Model instance or if deserialization fails pub fn deserialize_model(&self) -> Result where T: for<'de> Deserialize<'de>, { match self { Self::Model { card_json, .. } => Ok(serde_json::from_value(card_json.clone())?), Self::Endpoint(_) => { anyhow::bail!("Cannot deserialize model from Endpoint instance") } } } } /// Events emitted by the discovery watch stream #[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoveryEvent { /// A new instance was added Added(DiscoveryInstance), /// An instance was removed (identified by instance_id) Removed(u64), } /// Stream type for discovery events pub type DiscoveryStream = Pin> + Send>>; /// Discovery trait for service discovery across different backends #[async_trait] pub trait Discovery: Send + Sync { /// Returns a unique identifier for this worker (e.g lease id if using etcd or generated id for memory store) /// Discovery objects created by this worker will be associated with this id. fn instance_id(&self) -> u64; /// Registers an object in the discovery plane with the instance id async fn register(&self, spec: DiscoverySpec) -> Result; /// Unregisters an instance from the discovery plane async fn unregister(&self, instance: DiscoveryInstance) -> Result<()>; /// Returns a list of currently registered instances for the given discovery query /// This is a one-time snapshot without watching for changes async fn list(&self, query: DiscoveryQuery) -> Result>; /// Returns a stream of discovery events (Added/Removed) for the given discovery query /// The optional cancellation token can be used to stop the watch stream async fn list_and_watch( &self, query: DiscoveryQuery, cancel_token: Option, ) -> Result; }