// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 //! Messenger transport and discovery configuration. use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use anyhow::{Context, Result, bail}; use serde::{Deserialize, Serialize}; use validator::Validate; use crate::discovery::DiscoveryConfig; /// Messenger configuration combining backend and discovery settings. #[derive(Debug, Clone, Default, Serialize, Deserialize, Validate)] pub struct MessengerConfig { #[validate(nested)] pub backend: MessengerBackendConfig, /// Discovery configuration. None = discovery disabled. #[serde(default)] pub discovery: Option, } impl MessengerConfig { /// Build a Messenger instance from this configuration. /// /// This creates: /// 1. A TCP transport bound to the configured address /// 2. A discovery backend based on the configured type (if any) /// 3. A Messenger instance combining both pub async fn build_messenger(&self) -> Result> { use std::net::TcpListener; use std::sync::Arc; use velo::Messenger; use velo::backend::tcp::TcpTransportBuilder; // 1. Build TCP transport // Pre-bind listener to get OS-assigned port (if port is 0) let bind_addr = self.backend.resolve_bind_addr()?; let listener = TcpListener::bind(bind_addr) .with_context(|| format!("Failed to bind TCP listener to {}", bind_addr))?; // Extract actual bound address (with real port if OS-assigned) let actual_addr = listener .local_addr() .context("Failed to get local address from listener")?; tracing::info!("Built TCP transport bound to {}", actual_addr); // Build transport using from_listener to use the actual port let tcp_transport = TcpTransportBuilder::new() .from_listener(listener)? .build() .context("Failed to build TCP transport")?; let tcp_transport = Arc::new(tcp_transport); // 2. Build discovery backend based on configuration let mut builder = Messenger::builder().add_transport(tcp_transport); if let Some(discovery_config) = &self.discovery { match discovery_config { DiscoveryConfig::Etcd(_cfg) => { bail!("Etcd discovery not yet supported in velo"); } DiscoveryConfig::P2p(_cfg) => { bail!("P2P discovery not yet supported in velo"); } DiscoveryConfig::Filesystem(cfg) => { use velo::discovery::FilesystemPeerDiscovery; let peer_discovery = FilesystemPeerDiscovery::new(&cfg.path) .context("Failed to build filesystem discovery")?; builder = builder.discovery(Arc::new(peer_discovery)); tracing::info!("Built filesystem discovery from: {:?}", cfg.path); } } } // 3. Build Messenger let messenger = builder.build().await.context("Failed to build Messenger")?; Ok(messenger) } } /// Messenger backend (transport) configuration. #[derive(Debug, Clone, Default, Serialize, Deserialize, Validate)] pub struct MessengerBackendConfig { /// IP address to bind (mutually exclusive with tcp_interface). /// e.g., "0.0.0.0" or "192.168.1.100" pub tcp_addr: Option, /// Network interface to bind (mutually exclusive with tcp_addr). /// e.g., "eth0", "ens192" pub tcp_interface: Option, /// TCP port to bind. 0 means OS-assigned (ephemeral port). #[serde(default)] pub tcp_port: u16, } impl MessengerBackendConfig { /// Resolve the bind address from either interface name or explicit address. /// /// Returns error if both tcp_addr and tcp_interface are specified. pub fn resolve_bind_addr(&self) -> Result { let ip = match (&self.tcp_addr, &self.tcp_interface) { (Some(_), Some(_)) => { bail!("tcp_addr and tcp_interface are mutually exclusive") } (Some(addr), None) => addr .parse::() .with_context(|| format!("Invalid IP address: {}", addr))?, (None, Some(iface)) => get_interface_ip(iface) .with_context(|| format!("Failed to get IP for interface: {}", iface))?, (None, None) => IpAddr::V4(Ipv4Addr::UNSPECIFIED), }; Ok(SocketAddr::new(ip, self.tcp_port)) } } /// Get the IP address for a network interface. fn get_interface_ip(interface_name: &str) -> Result { use nix::ifaddrs::getifaddrs; let addrs = getifaddrs().context("Failed to get interface addresses")?; for ifaddr in addrs { if ifaddr.interface_name == interface_name && let Some(addr) = ifaddr.address { // Prefer IPv4 addresses if let Some(sockaddr) = addr.as_sockaddr_in() { return Ok(IpAddr::V4(sockaddr.ip())); } // Fall back to IPv6 if no IPv4 if let Some(sockaddr) = addr.as_sockaddr_in6() { return Ok(IpAddr::V6(sockaddr.ip())); } } } bail!("No IP address found for interface: {}", interface_name) } #[cfg(test)] mod tests { use super::*; #[test] fn test_default_backend_config() { let config = MessengerBackendConfig::default(); assert!(config.tcp_addr.is_none()); assert!(config.tcp_interface.is_none()); assert_eq!(config.tcp_port, 0); } #[test] fn test_resolve_bind_addr_default() { let config = MessengerBackendConfig::default(); let addr = config.resolve_bind_addr().unwrap(); assert_eq!(addr.ip(), IpAddr::V4(Ipv4Addr::UNSPECIFIED)); assert_eq!(addr.port(), 0); } #[test] fn test_resolve_bind_addr_explicit() { let config = MessengerBackendConfig { tcp_addr: Some("192.168.1.100".to_string()), tcp_interface: None, tcp_port: 8080, }; let addr = config.resolve_bind_addr().unwrap(); assert_eq!(addr.ip(), IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100))); assert_eq!(addr.port(), 8080); } #[test] fn test_resolve_bind_addr_mutual_exclusivity() { let config = MessengerBackendConfig { tcp_addr: Some("0.0.0.0".to_string()), tcp_interface: Some("eth0".to_string()), tcp_port: 0, }; let result = config.resolve_bind_addr(); assert!(result.is_err()); assert!( result .unwrap_err() .to_string() .contains("mutually exclusive") ); } }