Unverified Commit 3ee98925 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: add event plane discovery (#5614)

parent 491a2109
...@@ -214,7 +214,7 @@ impl ModelWatcher { ...@@ -214,7 +214,7 @@ impl ModelWatcher {
// Extract ModelCardInstanceId from the removal event // Extract ModelCardInstanceId from the removal event
let model_card_instance_id = match &id { let model_card_instance_id = match &id {
DiscoveryInstanceId::Model(mcid) => mcid, DiscoveryInstanceId::Model(mcid) => mcid,
DiscoveryInstanceId::Endpoint(_) => { DiscoveryInstanceId::Endpoint(_) | DiscoveryInstanceId::EventChannel(_) => {
tracing::error!( tracing::error!(
"Unexpected discovery instance type in removal (expected Model)" "Unexpected discovery instance type in removal (expected Model)"
); );
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
//! - **Runtime**: Tokio runtime configuration and system server settings //! - **Runtime**: Tokio runtime configuration and system server settings
//! - **NATS**: NATS client connection and authentication //! - **NATS**: NATS client connection and authentication
//! - **ETCD**: ETCD client connection and authentication //! - **ETCD**: ETCD client connection and authentication
//! - **Event Plane**: Event transport selection (NATS)
//! - **KVBM**: Key-Value Block Manager configuration //! - **KVBM**: Key-Value Block Manager configuration
//! - **LLM**: Language model inference configuration //! - **LLM**: Language model inference configuration
//! - **Model**: Model loading and caching //! - **Model**: Model loading and caching
...@@ -303,6 +304,15 @@ pub mod model { ...@@ -303,6 +304,15 @@ pub mod model {
} }
} }
/// Event Plane transport environment variables
pub mod event_plane {
/// Event transport selection: "zmq" or "nats". Default: "nats"
pub const DYN_EVENT_PLANE: &str = "DYN_EVENT_PLANE";
/// Event plane codec selection: "json" or "msgpack".
pub const DYN_EVENT_PLANE_CODEC: &str = "DYN_EVENT_PLANE_CODEC";
}
/// CUDA and GPU environment variables /// CUDA and GPU environment variables
pub mod cuda { pub mod cuda {
/// Path to custom CUDA fatbin file /// Path to custom CUDA fatbin file
...@@ -406,6 +416,9 @@ mod tests { ...@@ -406,6 +416,9 @@ mod tests {
model::huggingface::HF_TOKEN, model::huggingface::HF_TOKEN,
model::huggingface::HF_HUB_CACHE, model::huggingface::HF_HUB_CACHE,
model::huggingface::HF_HOME, model::huggingface::HF_HOME,
// Event Plane
event_plane::DYN_EVENT_PLANE,
event_plane::DYN_EVENT_PLANE_CODEC,
// CUDA // CUDA
cuda::DYNAMO_FATBIN_PATH, cuda::DYNAMO_FATBIN_PATH,
// Build // Build
......
...@@ -132,6 +132,21 @@ impl Discovery for KubeDiscoveryClient { ...@@ -132,6 +132,21 @@ impl Discovery for KubeDiscoveryClient {
); );
metadata.register_model_card(instance.clone())?; metadata.register_model_card(instance.clone())?;
} }
DiscoveryInstance::EventChannel {
namespace,
component,
topic,
..
} => {
tracing::info!(
"Registering event channel: namespace={}, component={}, topic={}, instance_id={:x}",
namespace,
component,
topic,
instance_id
);
metadata.register_event_channel(instance.clone())?;
}
} }
// Build and apply the CR with the updated metadata // Build and apply the CR with the updated metadata
...@@ -189,6 +204,21 @@ impl Discovery for KubeDiscoveryClient { ...@@ -189,6 +204,21 @@ impl Discovery for KubeDiscoveryClient {
); );
metadata.unregister_model_card(&instance)?; metadata.unregister_model_card(&instance)?;
} }
DiscoveryInstance::EventChannel {
namespace,
component,
topic,
..
} => {
tracing::info!(
"Unregistering event channel: namespace={}, component={}, topic={}, instance_id={:x}",
namespace,
component,
topic,
instance_id
);
metadata.unregister_event_channel(&instance)?;
}
} }
// Build and apply the CR with the updated metadata // Build and apply the CR with the updated metadata
......
...@@ -11,12 +11,14 @@ use tokio_util::sync::CancellationToken; ...@@ -11,12 +11,14 @@ use tokio_util::sync::CancellationToken;
use super::{ use super::{
Discovery, DiscoveryEvent, DiscoveryInstance, DiscoveryInstanceId, DiscoveryQuery, Discovery, DiscoveryEvent, DiscoveryInstance, DiscoveryInstanceId, DiscoveryQuery,
DiscoverySpec, DiscoveryStream, EndpointInstanceId, ModelCardInstanceId, DiscoverySpec, DiscoveryStream, EndpointInstanceId, EventChannelInstanceId,
ModelCardInstanceId,
}; };
use crate::storage::kv; use crate::storage::kv;
const INSTANCES_BUCKET: &str = "v1/instances"; const INSTANCES_BUCKET: &str = "v1/instances";
const MODELS_BUCKET: &str = "v1/mdc"; const MODELS_BUCKET: &str = "v1/mdc";
const EVENT_CHANNELS_BUCKET: &str = "v1/event_channels";
/// Discovery implementation backed by a kv::Store /// Discovery implementation backed by a kv::Store
pub struct KVStoreDiscovery { pub struct KVStoreDiscovery {
...@@ -42,6 +44,16 @@ impl KVStoreDiscovery { ...@@ -42,6 +44,16 @@ impl KVStoreDiscovery {
format!("{}/{}/{}/{:x}", namespace, component, endpoint, instance_id) format!("{}/{}/{}/{:x}", namespace, component, endpoint, instance_id)
} }
/// Build the key path for an event channel relative to bucket, not absolute)
fn event_channel_key(
namespace: &str,
component: &str,
topic: &str,
instance_id: u64,
) -> String {
format!("{}/{}/{}/{:x}", namespace, component, topic, instance_id)
}
/// Extract prefix for querying based on discovery query /// Extract prefix for querying based on discovery query
fn query_prefix(query: &DiscoveryQuery) -> String { fn query_prefix(query: &DiscoveryQuery) -> String {
match query { match query {
...@@ -82,6 +94,22 @@ impl KVStoreDiscovery { ...@@ -82,6 +94,22 @@ impl KVStoreDiscovery {
} => { } => {
format!("{}/{}/{}/{}", MODELS_BUCKET, namespace, component, endpoint) format!("{}/{}/{}/{}", MODELS_BUCKET, namespace, component, endpoint)
} }
DiscoveryQuery::EventChannels(query) => {
let mut path = EVENT_CHANNELS_BUCKET.to_string();
if let Some(ns) = &query.namespace {
path.push('/');
path.push_str(ns);
if let Some(comp) = &query.component {
path.push('/');
path.push_str(comp);
if let Some(topic) = &query.topic {
path.push('/');
path.push_str(topic);
}
}
}
path
}
} }
} }
...@@ -190,6 +218,30 @@ impl Discovery for KVStoreDiscovery { ...@@ -190,6 +218,30 @@ impl Discovery for KVStoreDiscovery {
} }
(MODELS_BUCKET, key) (MODELS_BUCKET, key)
} }
DiscoveryInstance::EventChannel {
namespace,
component,
topic,
instance_id,
..
} => {
let key = Self::event_channel_key(namespace, component, topic, *instance_id);
// TODO: bis - remove this info log
tracing::info!(
"KVStoreDiscovery::register: EventChannel bucket={}, key={}",
EVENT_CHANNELS_BUCKET,
key
);
tracing::debug!(
"KVStoreDiscovery::register: Registering event channel instance_id={}, namespace={}, component={}, topic={}, key={}",
instance_id,
namespace,
component,
topic,
key
);
(EVENT_CHANNELS_BUCKET, key)
}
}; };
// Serialize the instance // Serialize the instance
...@@ -284,6 +336,24 @@ impl Discovery for KVStoreDiscovery { ...@@ -284,6 +336,24 @@ impl Discovery for KVStoreDiscovery {
} }
(MODELS_BUCKET, key) (MODELS_BUCKET, key)
} }
DiscoveryInstance::EventChannel {
namespace,
component,
topic,
instance_id,
..
} => {
let key = Self::event_channel_key(namespace, component, topic, *instance_id);
tracing::debug!(
"KVStoreDiscovery::unregister: Unregistering event channel instance_id={}, namespace={}, component={}, topic={}, key={}",
instance_id,
namespace,
component,
topic,
key
);
(EVENT_CHANNELS_BUCKET, key)
}
}; };
// Get the bucket - if it doesn't exist, the instance is already removed from the KV store // Get the bucket - if it doesn't exist, the instance is already removed from the KV store
...@@ -307,17 +377,32 @@ impl Discovery for KVStoreDiscovery { ...@@ -307,17 +377,32 @@ impl Discovery for KVStoreDiscovery {
let prefix = Self::query_prefix(&query); let prefix = Self::query_prefix(&query);
let bucket_name = if prefix.starts_with(INSTANCES_BUCKET) { let bucket_name = if prefix.starts_with(INSTANCES_BUCKET) {
INSTANCES_BUCKET INSTANCES_BUCKET
} else if prefix.starts_with(EVENT_CHANNELS_BUCKET) {
EVENT_CHANNELS_BUCKET
} else { } else {
MODELS_BUCKET MODELS_BUCKET
}; };
// Get bucket - if it doesn't exist, return empty list // Get bucket - if it doesn't exist, return empty list
let Some(bucket) = self.store.get_bucket(bucket_name).await? else { let Some(bucket) = self.store.get_bucket(bucket_name).await? else {
tracing::info!(
"KVStoreDiscovery::list: bucket missing for query={:?}, prefix={}, bucket={}",
query,
prefix,
bucket_name
);
return Ok(Vec::new()); return Ok(Vec::new());
}; };
// Get all entries from the bucket // Get all entries from the bucket
let entries = bucket.entries().await?; let entries = bucket.entries().await?;
tracing::info!(
"KVStoreDiscovery::list: query={:?}, prefix={}, bucket={}, entries={}",
query,
prefix,
bucket_name,
entries.len()
);
// Filter by prefix and deserialize // Filter by prefix and deserialize
let mut instances = Vec::new(); let mut instances = Vec::new();
...@@ -343,6 +428,8 @@ impl Discovery for KVStoreDiscovery { ...@@ -343,6 +428,8 @@ impl Discovery for KVStoreDiscovery {
let prefix = Self::query_prefix(&query); let prefix = Self::query_prefix(&query);
let bucket_name = if prefix.starts_with(INSTANCES_BUCKET) { let bucket_name = if prefix.starts_with(INSTANCES_BUCKET) {
INSTANCES_BUCKET INSTANCES_BUCKET
} else if prefix.starts_with(EVENT_CHANNELS_BUCKET) {
EVENT_CHANNELS_BUCKET
} else { } else {
MODELS_BUCKET MODELS_BUCKET
}; };
...@@ -402,18 +489,22 @@ impl Discovery for KVStoreDiscovery { ...@@ -402,18 +489,22 @@ impl Discovery for KVStoreDiscovery {
// - Endpoints: "namespace/component/endpoint/{instance_id:x}" // - Endpoints: "namespace/component/endpoint/{instance_id:x}"
// - Models: "namespace/component/endpoint/{instance_id:x}" // - Models: "namespace/component/endpoint/{instance_id:x}"
// - LoRA models: "namespace/component/endpoint/{instance_id:x}/{lora_slug}" // - LoRA models: "namespace/component/endpoint/{instance_id:x}/{lora_slug}"
// - EventChannels: "namespace/component/{instance_id:x}"
// //
// Use strip_bucket_prefix for consistency with matches_prefix(). // Use strip_bucket_prefix for consistency with matches_prefix().
let relative_key = Self::strip_bucket_prefix(key_str, bucket_name); let relative_key = Self::strip_bucket_prefix(key_str, bucket_name);
let key_parts: Vec<&str> = relative_key.split('/').collect(); let key_parts: Vec<&str> = relative_key.split('/').collect();
// In relative key: namespace/component/endpoint/{instance_id}[/{lora_slug}] // EventChannels need 4 parts (namespace/component/topic/instance_id)
// We need at least 4 parts: namespace, component, endpoint, instance_id // Endpoints/Models need at least 4 parts
if key_parts.len() < 4 { let min_parts = 4;
if key_parts.len() < min_parts {
tracing::warn!( tracing::warn!(
key = %key_str, key = %key_str,
relative_key = %relative_key, relative_key = %relative_key,
actual_parts = key_parts.len(), actual_parts = key_parts.len(),
expected_min = min_parts,
bucket = bucket_name,
"Delete event key doesn't have enough parts" "Delete event key doesn't have enough parts"
); );
continue; continue;
...@@ -421,49 +512,75 @@ impl Discovery for KVStoreDiscovery { ...@@ -421,49 +512,75 @@ impl Discovery for KVStoreDiscovery {
let namespace = key_parts[0].to_string(); let namespace = key_parts[0].to_string();
let component = key_parts[1].to_string(); let component = key_parts[1].to_string();
let endpoint = key_parts[2].to_string();
let instance_id_hex = key_parts[3]; // Handle EventChannel (4 parts: namespace/component/topic/instance_id) vs Endpoints/Models
let id = if bucket_name == EVENT_CHANNELS_BUCKET {
match u64::from_str_radix(instance_id_hex, 16) { // EventChannel keys: namespace/component/topic/{instance_id:x}
Ok(instance_id) => { let topic = key_parts[2].to_string();
// Construct the appropriate DiscoveryInstanceId based on bucket type let instance_id_hex = key_parts[3];
let id = if bucket_name == INSTANCES_BUCKET { match u64::from_str_radix(instance_id_hex, 16) {
DiscoveryInstanceId::Endpoint(EndpointInstanceId { Ok(instance_id) => {
namespace, DiscoveryInstanceId::EventChannel(EventChannelInstanceId {
component,
endpoint,
instance_id,
})
} else {
// Model - check for LoRA suffix (5th part if present)
let model_suffix = key_parts.get(4).map(|s| s.to_string());
DiscoveryInstanceId::Model(ModelCardInstanceId {
namespace, namespace,
component, component,
endpoint, topic,
instance_id, instance_id,
model_suffix,
}) })
}; }
Err(e) => {
tracing::debug!( tracing::warn!(
"KVStoreDiscovery::list_and_watch: Emitting Removed event for {:?}, key={}", key = %key_str,
id, error = %e,
key_str instance_id_hex = %instance_id_hex,
); "Failed to parse event channel instance_id hex"
Some(DiscoveryEvent::Removed(id)) );
continue;
}
} }
Err(e) => { } else {
tracing::warn!( let endpoint = key_parts[2].to_string();
key = %key_str, let instance_id_hex = key_parts[3];
relative_key = %relative_key,
error = %e, match u64::from_str_radix(instance_id_hex, 16) {
instance_id_hex = %instance_id_hex, Ok(instance_id) => {
"Failed to parse instance_id hex from deleted key" // Construct the appropriate DiscoveryInstanceId based on bucket type
); if bucket_name == INSTANCES_BUCKET {
None DiscoveryInstanceId::Endpoint(EndpointInstanceId {
namespace,
component,
endpoint,
instance_id,
})
} else {
// Model - check for LoRA suffix (5th part if present)
let model_suffix = key_parts.get(4).map(|s| s.to_string());
DiscoveryInstanceId::Model(ModelCardInstanceId {
namespace,
component,
endpoint,
instance_id,
model_suffix,
})
}
}
Err(e) => {
tracing::warn!(
key = %key_str,
error = %e,
instance_id_hex = %instance_id_hex,
"Failed to parse instance_id hex from deleted key"
);
continue;
}
} }
} };
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Emitting Removed event for {:?}, key={}",
id,
key_str
);
Some(DiscoveryEvent::Removed(id))
} }
}; };
......
...@@ -15,6 +15,8 @@ pub struct DiscoveryMetadata { ...@@ -15,6 +15,8 @@ pub struct DiscoveryMetadata {
endpoints: HashMap<String, DiscoveryInstance>, endpoints: HashMap<String, DiscoveryInstance>,
/// Registered model card instances (key: path string from ModelCardInstanceId::to_path()) /// Registered model card instances (key: path string from ModelCardInstanceId::to_path())
model_cards: HashMap<String, DiscoveryInstance>, model_cards: HashMap<String, DiscoveryInstance>,
/// Registered event channel instances (key: path string from EventChannelInstanceId::to_path())
event_channels: HashMap<String, DiscoveryInstance>,
} }
impl DiscoveryMetadata { impl DiscoveryMetadata {
...@@ -23,6 +25,7 @@ impl DiscoveryMetadata { ...@@ -23,6 +25,7 @@ impl DiscoveryMetadata {
Self { Self {
endpoints: HashMap::new(), endpoints: HashMap::new(),
model_cards: HashMap::new(), model_cards: HashMap::new(),
event_channels: HashMap::new(),
} }
} }
...@@ -36,6 +39,9 @@ impl DiscoveryMetadata { ...@@ -36,6 +39,9 @@ impl DiscoveryMetadata {
DiscoveryInstanceId::Model(_) => { DiscoveryInstanceId::Model(_) => {
anyhow::bail!("Cannot register non-endpoint instance as endpoint") anyhow::bail!("Cannot register non-endpoint instance as endpoint")
} }
DiscoveryInstanceId::EventChannel(_) => {
anyhow::bail!("Cannot register EventChannel instance as endpoint")
}
} }
} }
...@@ -49,6 +55,9 @@ impl DiscoveryMetadata { ...@@ -49,6 +55,9 @@ impl DiscoveryMetadata {
DiscoveryInstanceId::Endpoint(_) => { DiscoveryInstanceId::Endpoint(_) => {
anyhow::bail!("Cannot register non-model-card instance as model card") anyhow::bail!("Cannot register non-model-card instance as model card")
} }
DiscoveryInstanceId::EventChannel(_) => {
anyhow::bail!("Cannot register EventChannel instance as model card")
}
} }
} }
...@@ -62,6 +71,9 @@ impl DiscoveryMetadata { ...@@ -62,6 +71,9 @@ impl DiscoveryMetadata {
DiscoveryInstanceId::Model(_) => { DiscoveryInstanceId::Model(_) => {
anyhow::bail!("Cannot unregister non-endpoint instance as endpoint") anyhow::bail!("Cannot unregister non-endpoint instance as endpoint")
} }
DiscoveryInstanceId::EventChannel(_) => {
anyhow::bail!("Cannot unregister EventChannel instance as endpoint")
}
} }
} }
...@@ -75,6 +87,41 @@ impl DiscoveryMetadata { ...@@ -75,6 +87,41 @@ impl DiscoveryMetadata {
DiscoveryInstanceId::Endpoint(_) => { DiscoveryInstanceId::Endpoint(_) => {
anyhow::bail!("Cannot unregister non-model-card instance as model card") anyhow::bail!("Cannot unregister non-model-card instance as model card")
} }
DiscoveryInstanceId::EventChannel(_) => {
anyhow::bail!("Cannot unregister EventChannel instance as model card")
}
}
}
/// Register an event channel instance
pub fn register_event_channel(&mut self, instance: DiscoveryInstance) -> Result<()> {
match instance.id() {
DiscoveryInstanceId::EventChannel(key) => {
self.event_channels.insert(key.to_path(), instance);
Ok(())
}
DiscoveryInstanceId::Endpoint(_) => {
anyhow::bail!("Cannot register Endpoint instance as event channel")
}
DiscoveryInstanceId::Model(_) => {
anyhow::bail!("Cannot register Model instance as event channel")
}
}
}
/// Unregister an event channel instance
pub fn unregister_event_channel(&mut self, instance: &DiscoveryInstance) -> Result<()> {
match instance.id() {
DiscoveryInstanceId::EventChannel(key) => {
self.event_channels.remove(&key.to_path());
Ok(())
}
DiscoveryInstanceId::Endpoint(_) => {
anyhow::bail!("Cannot unregister Endpoint instance as event channel")
}
DiscoveryInstanceId::Model(_) => {
anyhow::bail!("Cannot unregister Model instance as event channel")
}
} }
} }
...@@ -88,11 +135,17 @@ impl DiscoveryMetadata { ...@@ -88,11 +135,17 @@ impl DiscoveryMetadata {
self.model_cards.values().cloned().collect() self.model_cards.values().cloned().collect()
} }
/// Get all registered instances (endpoints and model cards) /// Get all registered event channels
pub fn get_all_event_channels(&self) -> Vec<DiscoveryInstance> {
self.event_channels.values().cloned().collect()
}
/// Get all registered instances (endpoints, model cards, and event channels)
pub fn get_all(&self) -> Vec<DiscoveryInstance> { pub fn get_all(&self) -> Vec<DiscoveryInstance> {
self.endpoints self.endpoints
.values() .values()
.chain(self.model_cards.values()) .chain(self.model_cards.values())
.chain(self.event_channels.values())
.cloned() .cloned()
.collect() .collect()
} }
...@@ -109,6 +162,9 @@ impl DiscoveryMetadata { ...@@ -109,6 +162,9 @@ impl DiscoveryMetadata {
| DiscoveryQuery::NamespacedModels { .. } | DiscoveryQuery::NamespacedModels { .. }
| DiscoveryQuery::ComponentModels { .. } | DiscoveryQuery::ComponentModels { .. }
| DiscoveryQuery::EndpointModels { .. } => self.get_all_model_cards(), | DiscoveryQuery::EndpointModels { .. } => self.get_all_model_cards(),
// EventChannel queries now return actual event channels
DiscoveryQuery::EventChannels(_) => self.get_all_event_channels(),
}; };
filter_instances(all_instances, query) filter_instances(all_instances, query)
...@@ -205,6 +261,27 @@ fn filter_instances( ...@@ -205,6 +261,27 @@ fn filter_instances(
_ => false, _ => false,
}) })
.collect(), .collect(),
// EventChannel queries - unified filtering with optional scope filters
DiscoveryQuery::EventChannels(query) => instances
.into_iter()
.filter(|inst| match inst {
DiscoveryInstance::EventChannel {
namespace: ns,
component: comp,
topic: t,
..
} => {
// Filter by namespace if specified
query.namespace.as_ref().is_none_or(|qns| qns == ns)
// Filter by component if specified
&& query.component.as_ref().is_none_or(|qc| qc == comp)
// Filter by topic if specified
&& query.topic.as_ref().is_none_or(|qt| qt == t)
}
_ => false,
})
.collect(),
} }
} }
...@@ -289,6 +366,7 @@ impl MetadataSnapshot { ...@@ -289,6 +366,7 @@ impl MetadataSnapshot {
mod tests { mod tests {
use super::*; use super::*;
use crate::component::{Instance, TransportType}; use crate::component::{Instance, TransportType};
use crate::discovery::EventChannelQuery;
#[test] #[test]
fn test_metadata_serde() { fn test_metadata_serde() {
...@@ -382,4 +460,98 @@ mod tests { ...@@ -382,4 +460,98 @@ mod tests {
assert_eq!(metadata.get_all_model_cards().len(), 2); assert_eq!(metadata.get_all_model_cards().len(), 2);
assert_eq!(metadata.get_all().len(), 5); assert_eq!(metadata.get_all().len(), 5);
} }
#[tokio::test]
async fn test_event_channel_registration() {
use crate::discovery::EventTransport;
let mut metadata = DiscoveryMetadata::new();
// Register event channels
for i in 0..3 {
let instance = DiscoveryInstance::EventChannel {
namespace: "test".to_string(),
component: "comp1".to_string(),
topic: "test-topic".to_string(),
instance_id: i,
transport: EventTransport::zmq(format!("tcp://localhost:{}", 5000 + i)),
};
metadata.register_event_channel(instance).unwrap();
}
// Test get_all_event_channels
assert_eq!(metadata.get_all_event_channels().len(), 3);
// Test get_all includes event channels
assert_eq!(metadata.get_all().len(), 3);
// Test filter by all event channels
let filtered = metadata.filter(&DiscoveryQuery::EventChannels(EventChannelQuery::all()));
assert_eq!(filtered.len(), 3);
// Test filter by component
let filtered = metadata.filter(&DiscoveryQuery::EventChannels(
EventChannelQuery::component("test", "comp1"),
));
assert_eq!(filtered.len(), 3);
// Test filter with non-matching query
let filtered = metadata.filter(&DiscoveryQuery::EventChannels(
EventChannelQuery::component("other", "comp1"),
));
assert_eq!(filtered.len(), 0);
// Test unregister
let instance = DiscoveryInstance::EventChannel {
namespace: "test".to_string(),
component: "comp1".to_string(),
topic: "test-topic".to_string(),
instance_id: 0,
transport: EventTransport::zmq("tcp://localhost:5000"),
};
metadata.unregister_event_channel(&instance).unwrap();
assert_eq!(metadata.get_all_event_channels().len(), 2);
}
#[tokio::test]
async fn test_mixed_instances() {
use crate::discovery::EventTransport;
let mut metadata = DiscoveryMetadata::new();
// Register one of each type
let endpoint = DiscoveryInstance::Endpoint(Instance {
namespace: "test".to_string(),
component: "comp1".to_string(),
endpoint: "ep1".to_string(),
instance_id: 1,
transport: TransportType::Nats("nats://localhost:4222".to_string()),
});
metadata.register_endpoint(endpoint).unwrap();
let model = DiscoveryInstance::Model {
namespace: "test".to_string(),
component: "comp1".to_string(),
endpoint: "ep1".to_string(),
instance_id: 2,
card_json: serde_json::json!({"model": "test"}),
model_suffix: None,
};
metadata.register_model_card(model).unwrap();
let event_channel = DiscoveryInstance::EventChannel {
namespace: "test".to_string(),
component: "comp1".to_string(),
topic: "test-topic".to_string(),
instance_id: 3,
transport: EventTransport::zmq("tcp://localhost:5000"),
};
metadata.register_event_channel(event_channel).unwrap();
// Verify get_all returns all three
assert_eq!(metadata.get_all().len(), 3);
assert_eq!(metadata.get_all_endpoints().len(), 1);
assert_eq!(metadata.get_all_model_cards().len(), 1);
assert_eq!(metadata.get_all_event_channels().len(), 1);
}
} }
...@@ -105,20 +105,48 @@ fn matches_query(instance: &DiscoveryInstance, query: &DiscoveryQuery) -> bool { ...@@ -105,20 +105,48 @@ fn matches_query(instance: &DiscoveryInstance, query: &DiscoveryQuery) -> bool {
}, },
) => inst_ns == namespace && inst_comp == component && inst_ep == endpoint, ) => inst_ns == namespace && inst_comp == component && inst_ep == endpoint,
// EventChannel matching - unified query
(
DiscoveryInstance::EventChannel {
namespace: inst_ns,
component: inst_comp,
topic: inst_topic,
..
},
DiscoveryQuery::EventChannels(query),
) => {
query.namespace.as_ref().is_none_or(|ns| ns == inst_ns)
&& query.component.as_ref().is_none_or(|c| c == inst_comp)
&& query.topic.as_ref().is_none_or(|t| t == inst_topic)
}
// Cross-type matches return false // Cross-type matches return false
( (
DiscoveryInstance::Endpoint(_), DiscoveryInstance::Endpoint(_),
DiscoveryQuery::AllModels DiscoveryQuery::AllModels
| DiscoveryQuery::NamespacedModels { .. } | DiscoveryQuery::NamespacedModels { .. }
| DiscoveryQuery::ComponentModels { .. } | DiscoveryQuery::ComponentModels { .. }
| DiscoveryQuery::EndpointModels { .. }, | DiscoveryQuery::EndpointModels { .. }
| DiscoveryQuery::EventChannels(_),
) => false, ) => false,
( (
DiscoveryInstance::Model { .. }, DiscoveryInstance::Model { .. },
DiscoveryQuery::AllEndpoints DiscoveryQuery::AllEndpoints
| DiscoveryQuery::NamespacedEndpoints { .. } | DiscoveryQuery::NamespacedEndpoints { .. }
| DiscoveryQuery::ComponentEndpoints { .. } | DiscoveryQuery::ComponentEndpoints { .. }
| DiscoveryQuery::Endpoint { .. }, | DiscoveryQuery::Endpoint { .. }
| DiscoveryQuery::EventChannels(_),
) => false,
(
DiscoveryInstance::EventChannel { .. },
DiscoveryQuery::AllEndpoints
| DiscoveryQuery::NamespacedEndpoints { .. }
| DiscoveryQuery::ComponentEndpoints { .. }
| DiscoveryQuery::Endpoint { .. }
| DiscoveryQuery::AllModels
| DiscoveryQuery::NamespacedModels { .. }
| DiscoveryQuery::ComponentModels { .. }
| DiscoveryQuery::EndpointModels { .. },
) => false, ) => false,
} }
} }
...@@ -261,6 +289,7 @@ mod tests { ...@@ -261,6 +289,7 @@ mod tests {
registry.instances.lock().unwrap().retain(|i| match i { registry.instances.lock().unwrap().retain(|i| match i {
DiscoveryInstance::Endpoint(inst) => inst.instance_id != 1, DiscoveryInstance::Endpoint(inst) => inst.instance_id != 1,
DiscoveryInstance::Model { instance_id, .. } => *instance_id != 1, DiscoveryInstance::Model { instance_id, .. } => *instance_id != 1,
DiscoveryInstance::EventChannel { instance_id, .. } => *instance_id != 1,
}); });
let event = stream.next().await.unwrap().unwrap(); let event = stream.next().await.unwrap().unwrap();
......
...@@ -23,6 +23,155 @@ pub mod utils; ...@@ -23,6 +23,155 @@ pub mod utils;
use crate::component::TransportType; use crate::component::TransportType;
pub use utils::watch_and_extract_field; pub use utils::watch_and_extract_field;
/// Transport kind for event plane - used for configuration and env var selection.
///
/// This enum represents the *type* of transport without connection details.
/// Use `EventTransport` when you need the full transport configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum EventTransportKind {
/// NATS Core pub/sub
#[default]
Nats,
/// ZMQ pub/sub
Zmq,
}
impl EventTransportKind {
/// Parse from environment variable `DYN_EVENT_PLANE`.
/// Returns `Nats` if not set or empty.
/// Returns error for invalid values.
pub fn from_env() -> Result<Self> {
match std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE)
.as_deref()
{
Ok("nats") | Ok("") | Err(_) => Ok(Self::Nats),
Ok("zmq") => Ok(Self::Zmq),
Ok(other) => anyhow::bail!(
"Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'",
other
),
}
}
/// Parse from environment variable, defaulting to Nats on error.
/// Logs a warning if an invalid value is encountered.
pub fn from_env_or_default() -> Self {
Self::from_env().unwrap_or_else(|e| {
tracing::warn!("{}, defaulting to NATS", e);
Self::Nats
})
}
/// Get the default codec for this transport kind.
/// NATS defaults to JSON, ZMQ defaults to MsgPack.
pub fn default_codec(&self) -> EventCodecKind {
match self {
Self::Nats => EventCodecKind::Json,
Self::Zmq => EventCodecKind::Msgpack,
}
}
}
/// Codec kind for event plane serialization.
///
/// This enum represents the serialization format for event envelopes and payloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventCodecKind {
/// JSON codec - human-readable, good for debugging
Json,
/// MessagePack codec - compact binary format
Msgpack,
}
impl EventCodecKind {
/// Parse from environment variable `DYN_EVENT_PLANE_CODEC`.
/// Returns None if not set, allowing transport to select default.
/// Returns error for invalid values.
pub fn from_env() -> Result<Option<Self>> {
match std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE_CODEC)
.as_deref()
{
Err(_) => Ok(None), // Not set
Ok("") => Ok(None), // Empty
Ok("json") => Ok(Some(Self::Json)),
Ok("msgpack") => Ok(Some(Self::Msgpack)),
Ok(other) => anyhow::bail!(
"Invalid DYN_EVENT_PLANE_CODEC value '{}'. Valid values: 'json', 'msgpack'",
other
),
}
}
/// Parse from environment variable with transport-specific default.
/// Logs a warning if an invalid value is encountered.
pub fn from_env_or_transport_default(transport: EventTransportKind) -> Self {
Self::from_env()
.unwrap_or_else(|e| {
tracing::warn!(
"{}, defaulting to {:?} for {:?}",
e,
transport.default_codec(),
transport
);
None
})
.unwrap_or_else(|| transport.default_codec())
}
}
/// Transport configuration for event plane channels.
///
/// This enum carries both the transport kind and its connection configuration.
/// Kept separate from `TransportType` (request plane) to distinguish event semantics.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", content = "config")]
pub enum EventTransport {
/// NATS Core pub/sub - subject prefix for the channel
Nats {
/// Subject prefix (e.g., "namespace.dynamo.component.backend")
subject_prefix: String,
},
/// ZMQ pub/sub - endpoint address
Zmq {
/// ZMQ endpoint (e.g., "tcp://host:port")
endpoint: String,
},
}
impl EventTransport {
/// Get the transport kind
pub fn kind(&self) -> EventTransportKind {
match self {
Self::Nats { .. } => EventTransportKind::Nats,
Self::Zmq { .. } => EventTransportKind::Zmq,
}
}
/// Create a NATS transport with the given subject prefix
pub fn nats(subject_prefix: impl Into<String>) -> Self {
Self::Nats {
subject_prefix: subject_prefix.into(),
}
}
/// Create a ZMQ transport with the given endpoint
pub fn zmq(endpoint: impl Into<String>) -> Self {
Self::Zmq {
endpoint: endpoint.into(),
}
}
/// Get the subject prefix (NATS) or endpoint (ZMQ)
pub fn address(&self) -> &str {
match self {
Self::Nats { subject_prefix } => subject_prefix,
Self::Zmq { endpoint } => endpoint,
}
}
}
/// Query key for prefix-based discovery queries /// Query key for prefix-based discovery queries
/// Supports hierarchical queries from all endpoints down to specific endpoints /// Supports hierarchical queries from all endpoints down to specific endpoints
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
...@@ -57,6 +206,74 @@ pub enum DiscoveryQuery { ...@@ -57,6 +206,74 @@ pub enum DiscoveryQuery {
component: String, component: String,
endpoint: String, endpoint: String,
}, },
/// Unified event channel query with optional scope filters
EventChannels(EventChannelQuery),
}
/// Unified query for event channels with optional scope filters
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EventChannelQuery {
/// Optional namespace filter
pub namespace: Option<String>,
/// Optional component filter (requires namespace to be meaningful)
pub component: Option<String>,
/// Optional topic filter (requires namespace and component to be meaningful)
pub topic: Option<String>,
}
impl EventChannelQuery {
/// Query all event channels (no filters)
pub fn all() -> Self {
Self {
namespace: None,
component: None,
topic: None,
}
}
/// Query event channels in a specific namespace
pub fn namespace(namespace: impl Into<String>) -> Self {
Self {
namespace: Some(namespace.into()),
component: None,
topic: None,
}
}
/// Query event channels for a specific component
pub fn component(namespace: impl Into<String>, component: impl Into<String>) -> Self {
Self {
namespace: Some(namespace.into()),
component: Some(component.into()),
topic: None,
}
}
/// Query event channels for a specific topic
pub fn topic(
namespace: impl Into<String>,
component: impl Into<String>,
topic: impl Into<String>,
) -> Self {
Self {
namespace: Some(namespace.into()),
component: Some(component.into()),
topic: Some(topic.into()),
}
}
/// Get the scope level (0=all, 1=namespace, 2=component, 3=topic)
pub fn scope_level(&self) -> u8 {
if self.topic.is_some() {
3
} else if self.component.is_some() {
2
} else if self.namespace.is_some() {
1
} else {
0
}
}
} }
/// Specification for registering objects in the discovery plane /// Specification for registering objects in the discovery plane
...@@ -83,6 +300,16 @@ pub enum DiscoverySpec { ...@@ -83,6 +300,16 @@ pub enum DiscoverySpec {
/// Key format: {namespace}/{component}/{endpoint}/{instance_id}[/{model_suffix}] /// Key format: {namespace}/{component}/{endpoint}/{instance_id}[/{model_suffix}]
model_suffix: Option<String>, model_suffix: Option<String>,
}, },
/// Event plane channel specification
/// Used for registering event publishers/subscribers for discovery
EventChannel {
namespace: String,
component: String,
/// Topic name for this channel (e.g., "kv-events", "kv-metrics")
topic: String,
/// Event transport type (NATS subject prefix or ZMQ endpoint)
transport: EventTransport,
},
} }
impl DiscoverySpec { impl DiscoverySpec {
...@@ -151,6 +378,18 @@ impl DiscoverySpec { ...@@ -151,6 +378,18 @@ impl DiscoverySpec {
card_json, card_json,
model_suffix, model_suffix,
}, },
Self::EventChannel {
namespace,
component,
topic,
transport,
} => DiscoveryInstance::EventChannel {
namespace,
component,
topic,
instance_id,
transport,
},
} }
} }
} }
...@@ -174,6 +413,16 @@ pub enum DiscoveryInstance { ...@@ -174,6 +413,16 @@ pub enum DiscoveryInstance {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
model_suffix: Option<String>, model_suffix: Option<String>,
}, },
/// Registered event channel instance for event plane pub/sub
EventChannel {
namespace: String,
component: String,
/// Topic name for this channel (e.g., "kv-events", "kv-metrics")
topic: String,
instance_id: u64,
/// Event transport type (NATS subject prefix or ZMQ endpoint)
transport: EventTransport,
},
} }
impl DiscoveryInstance { impl DiscoveryInstance {
...@@ -182,6 +431,7 @@ impl DiscoveryInstance { ...@@ -182,6 +431,7 @@ impl DiscoveryInstance {
match self { match self {
Self::Endpoint(inst) => inst.instance_id, Self::Endpoint(inst) => inst.instance_id,
Self::Model { instance_id, .. } => *instance_id, Self::Model { instance_id, .. } => *instance_id,
Self::EventChannel { instance_id, .. } => *instance_id,
} }
} }
...@@ -196,6 +446,9 @@ impl DiscoveryInstance { ...@@ -196,6 +446,9 @@ impl DiscoveryInstance {
Self::Endpoint(_) => { Self::Endpoint(_) => {
anyhow::bail!("Cannot deserialize model from Endpoint instance") anyhow::bail!("Cannot deserialize model from Endpoint instance")
} }
Self::EventChannel { .. } => {
anyhow::bail!("Cannot deserialize model from EventChannel instance")
}
} }
} }
...@@ -223,6 +476,18 @@ impl DiscoveryInstance { ...@@ -223,6 +476,18 @@ impl DiscoveryInstance {
instance_id: *instance_id, instance_id: *instance_id,
model_suffix: model_suffix.clone(), model_suffix: model_suffix.clone(),
}), }),
Self::EventChannel {
namespace,
component,
topic,
instance_id,
..
} => DiscoveryInstanceId::EventChannel(EventChannelInstanceId {
namespace: namespace.clone(),
component: component.clone(),
topic: topic.clone(),
instance_id: *instance_id,
}),
} }
} }
} }
...@@ -276,6 +541,44 @@ pub struct ModelCardInstanceId { ...@@ -276,6 +541,44 @@ pub struct ModelCardInstanceId {
pub model_suffix: Option<String>, pub model_suffix: Option<String>,
} }
/// Unique identifier for an event channel instance
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EventChannelInstanceId {
pub namespace: String,
pub component: String,
/// Topic name for this channel (e.g., "kv-events", "kv-metrics")
pub topic: String,
pub instance_id: u64,
}
impl EventChannelInstanceId {
/// Converts to a path string: `{namespace}/{component}/{topic}/{instance_id:x}`
pub fn to_path(&self) -> String {
format!(
"{}/{}/{}/{:x}",
self.namespace, self.component, self.topic, self.instance_id
)
}
/// Parses from a path string: `{namespace}/{component}/{topic}/{instance_id:x}`
pub fn from_path(path: &str) -> Result<Self> {
let parts: Vec<&str> = path.split('/').collect();
if parts.len() != 4 {
anyhow::bail!(
"Invalid EventChannelInstanceId path: expected 4 parts, got {}",
parts.len()
);
}
Ok(Self {
namespace: parts[0].to_string(),
component: parts[1].to_string(),
topic: parts[2].to_string(),
instance_id: u64::from_str_radix(parts[3], 16)
.map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
})
}
}
impl ModelCardInstanceId { impl ModelCardInstanceId {
/// Converts to a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}[/{model_suffix}]` /// Converts to a path string: `{namespace}/{component}/{endpoint}/{instance_id:x}[/{model_suffix}]`
pub fn to_path(&self) -> String { pub fn to_path(&self) -> String {
...@@ -316,6 +619,7 @@ impl ModelCardInstanceId { ...@@ -316,6 +619,7 @@ impl ModelCardInstanceId {
pub enum DiscoveryInstanceId { pub enum DiscoveryInstanceId {
Endpoint(EndpointInstanceId), Endpoint(EndpointInstanceId),
Model(ModelCardInstanceId), Model(ModelCardInstanceId),
EventChannel(EventChannelInstanceId),
} }
impl DiscoveryInstanceId { impl DiscoveryInstanceId {
...@@ -324,22 +628,34 @@ impl DiscoveryInstanceId { ...@@ -324,22 +628,34 @@ impl DiscoveryInstanceId {
match self { match self {
Self::Endpoint(eid) => eid.instance_id, Self::Endpoint(eid) => eid.instance_id,
Self::Model(mid) => mid.instance_id, Self::Model(mid) => mid.instance_id,
Self::EventChannel(ecid) => ecid.instance_id,
} }
} }
/// Extracts the EndpointInstanceId, returning an error if this is a Model variant /// Extracts the EndpointInstanceId, returning an error if this is a Model or EventChannel variant
pub fn extract_endpoint_id(&self) -> Result<&EndpointInstanceId> { pub fn extract_endpoint_id(&self) -> Result<&EndpointInstanceId> {
match self { match self {
Self::Endpoint(eid) => Ok(eid), Self::Endpoint(eid) => Ok(eid),
Self::Model(_) => anyhow::bail!("Expected Endpoint variant, got Model"), Self::Model(_) => anyhow::bail!("Expected Endpoint variant, got Model"),
Self::EventChannel(_) => anyhow::bail!("Expected Endpoint variant, got EventChannel"),
} }
} }
/// Extracts the ModelCardInstanceId, returning an error if this is an Endpoint variant /// Extracts the ModelCardInstanceId, returning an error if this is an Endpoint or EventChannel variant
pub fn extract_model_id(&self) -> Result<&ModelCardInstanceId> { pub fn extract_model_id(&self) -> Result<&ModelCardInstanceId> {
match self { match self {
Self::Model(mid) => Ok(mid), Self::Model(mid) => Ok(mid),
Self::Endpoint(_) => anyhow::bail!("Expected Model variant, got Endpoint"), Self::Endpoint(_) => anyhow::bail!("Expected Model variant, got Endpoint"),
Self::EventChannel(_) => anyhow::bail!("Expected Model variant, got EventChannel"),
}
}
/// Extracts the EventChannelInstanceId, returning an error if this is an Endpoint or Model variant
pub fn extract_event_channel_id(&self) -> Result<&EventChannelInstanceId> {
match self {
Self::EventChannel(ecid) => Ok(ecid),
Self::Endpoint(_) => anyhow::bail!("Expected EventChannel variant, got Endpoint"),
Self::Model(_) => anyhow::bail!("Expected EventChannel variant, got Model"),
} }
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment