Unverified Commit 886506c1 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

feat: Command line flag to set request plane mode: tcp, http or nats (#4365)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent 9f3d1b90
......@@ -435,6 +435,6 @@ This is required because DistributedRuntime is a process-level singleton.
)
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, "file")
runtime = DistributedRuntime(loop, "file", "nats")
yield runtime
runtime.shutdown()
......@@ -34,7 +34,7 @@ async def distributed_runtime():
Each test gets its own runtime in a forked process to avoid singleton conflicts.
"""
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, "etcd")
runtime = DistributedRuntime(loop, "etcd", "nats")
yield runtime
runtime.shutdown()
......
......@@ -299,7 +299,6 @@ mod integration_tests {
};
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::discovery::DiscoveryQuery;
use dynamo_runtime::pipeline::RouterMode;
use std::sync::Arc;
#[tokio::test]
......
......@@ -32,7 +32,8 @@
use std::fmt;
use crate::{
config::{HealthStatus, RequestPlaneMode},
config::HealthStatus,
distributed::RequestPlaneMode,
metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
service::ServiceSet,
transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
......@@ -412,7 +413,7 @@ impl Component {
// Register metrics callback. CRITICAL: Never fail service creation for metrics issues.
// Only enable NATS service metrics collection when using NATS request plane mode
let request_plane_mode = RequestPlaneMode::get();
let request_plane_mode = self.drt.request_plane();
match request_plane_mode {
RequestPlaneMode::Nats => {
if let Err(err) = self.start_scraping_nats_service_component_metrics() {
......
......@@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;
use crate::{
component::{Endpoint, Instance, TransportType, service::EndpointStatsHandler},
config::RequestPlaneMode,
distributed::RequestPlaneMode,
pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
storage::key_value_store,
traits::DistributedRuntimeProvider,
......@@ -113,7 +113,7 @@ impl EndpointConfigBuilder {
}
// Determine request plane mode
let request_plane_mode = RequestPlaneMode::get();
let request_plane_mode = endpoint.drt().request_plane();
tracing::info!(
"Endpoint starting with request plane mode: {:?}",
request_plane_mode
......
......@@ -3,7 +3,6 @@
use super::*;
use crate::component::Component;
use crate::config::RequestPlaneMode;
use async_nats::service::Service as NatsService;
use async_nats::service::ServiceExt as _;
use derive_builder::Builder;
......
......@@ -469,75 +469,6 @@ pub fn use_local_timezone() -> bool {
env_is_truthy("DYN_LOG_USE_LOCAL_TZ")
}
/// Request plane transport mode configuration
///
/// This determines how requests are distributed from routers to workers:
/// - `Nats`: Use NATS for request distribution (default, legacy)
/// - `Http`: Use HTTP/2 for request distribution
/// - `Tcp`: Use raw TCP for request distribution with msgpack support
///
/// `Default` is derived; the `#[default]` marker on `Nats` selects the fallback variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RequestPlaneMode {
    /// Use NATS for request plane (default for backward compatibility)
    #[default]
    Nats,
    /// Use HTTP/2 for request plane
    Http,
    /// Use raw TCP for request plane with msgpack support
    Tcp,
}
impl fmt::Display for RequestPlaneMode {
    /// Writes the lowercase name of the mode: `nats`, `http` or `tcp`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Nats => "nats",
            Self::Http => "http",
            Self::Tcp => "tcp",
        };
        f.write_str(label)
    }
}
impl std::str::FromStr for RequestPlaneMode {
    type Err = anyhow::Error;

    /// Parses a request plane mode name.
    ///
    /// Matching is case-insensitive and ignores leading/trailing whitespace, so
    /// inputs such as `"TCP"` or `"tcp\n"` are accepted.
    ///
    /// # Errors
    ///
    /// Returns an error that echoes the rejected input and lists the valid
    /// options when the input is not one of `nats`, `http` or `tcp`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // Trim first: stray whitespace would otherwise make a valid name fail to
        // parse, and callers that discard the error would fall back to the default.
        match s.trim().to_lowercase().as_str() {
            "nats" => Ok(Self::Nats),
            "http" => Ok(Self::Http),
            "tcp" => Ok(Self::Tcp),
            _ => Err(anyhow::anyhow!(
                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
                s
            )),
        }
    }
}
/// Process-wide cache for the request plane mode, filled on first access.
static REQUEST_PLANE_MODE: OnceLock<RequestPlaneMode> = OnceLock::new();

impl RequestPlaneMode {
    /// Returns the cached request plane mode.
    ///
    /// The first call initializes the cache via [`Self::from_env`] (which reads
    /// `DYN_REQUEST_PLANE`); later calls return the stored value.
    pub fn get() -> Self {
        let cached: &Self = REQUEST_PLANE_MODE.get_or_init(Self::from_env);
        *cached
    }

    /// Reads the request plane mode from `DYN_REQUEST_PLANE` without caching.
    ///
    /// Falls back to the default mode when the variable cannot be read or its
    /// value does not parse.
    pub fn from_env() -> Self {
        match std::env::var("DYN_REQUEST_PLANE") {
            Ok(raw) => raw.parse().unwrap_or_default(),
            Err(_) => Self::default(),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
......@@ -741,62 +672,4 @@ mod tests {
assert!(!env_is_falsey("TEST_MISSING"));
});
}
#[test]
fn test_request_plane_mode_from_str() {
    // Every accepted spelling paired with the variant it must parse to.
    let accepted = [
        ("nats", RequestPlaneMode::Nats),
        ("http", RequestPlaneMode::Http),
        ("tcp", RequestPlaneMode::Tcp),
        ("NATS", RequestPlaneMode::Nats),
        ("HTTP", RequestPlaneMode::Http),
        ("TCP", RequestPlaneMode::Tcp),
    ];
    for (text, want) in accepted {
        assert_eq!(text.parse::<RequestPlaneMode>().unwrap(), want);
    }

    // Unknown names must be rejected.
    assert!("invalid".parse::<RequestPlaneMode>().is_err());
}
#[test]
fn test_request_plane_mode_display() {
    // Each variant paired with the text its `Display` impl must produce.
    let rendered = [
        (RequestPlaneMode::Nats, "nats"),
        (RequestPlaneMode::Http, "http"),
        (RequestPlaneMode::Tcp, "tcp"),
    ];
    for (mode, text) in rendered {
        assert_eq!(mode.to_string(), text);
    }
}
#[test]
fn test_request_plane_mode_default() {
    // NATS is the fallback mode.
    let fallback = RequestPlaneMode::default();
    assert_eq!(fallback, RequestPlaneMode::Nats);
}
#[test]
fn test_request_plane_mode_get_cached() {
    // Two consecutive reads must agree because the value is cached.
    let first = RequestPlaneMode::get();
    let second = RequestPlaneMode::get();
    assert_eq!(first, second, "Cached mode should be consistent");

    // Whatever was cached has to be one of the known variants.
    let is_known = matches!(
        first,
        RequestPlaneMode::Nats | RequestPlaneMode::Http | RequestPlaneMode::Tcp
    );
    assert!(is_known, "Mode should be a valid variant");
}
}
......@@ -23,6 +23,7 @@ use crate::SystemHealth;
use crate::runtime::Runtime;
use async_once_cell::OnceCell;
use std::fmt;
use std::sync::{Arc, OnceLock, Weak};
use tokio::sync::watch::Receiver;
......@@ -49,6 +50,7 @@ pub struct DistributedRuntime {
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
network_manager: Arc<OnceCell<Arc<crate::pipeline::network::manager::NetworkManager>>>,
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
request_plane: RequestPlaneMode,
// Service discovery client
discovery_client: Arc<dyn discovery::Discovery>,
......@@ -95,7 +97,7 @@ impl std::fmt::Debug for DistributedRuntime {
impl DistributedRuntime {
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
let (selected_kv_store, nats_config) = config.dissolve();
let (selected_kv_store, nats_config, request_plane) = config.dissolve();
let runtime_clone = runtime.clone();
......@@ -183,6 +185,7 @@ impl DistributedRuntime {
instance_sources: Arc::new(Mutex::new(HashMap::new())),
metrics_registry: crate::MetricsRegistry::new(),
system_health,
request_plane,
};
if let Some(nats_client_for_metrics) = nats_client_for_metrics {
......@@ -359,6 +362,7 @@ impl DistributedRuntime {
self.child_token(),
nats_client,
self.component_registry.clone(),
self.request_plane,
))
})
.await?;
......@@ -430,6 +434,11 @@ impl DistributedRuntime {
&self.store
}
/// Returns the request plane transport mode this runtime was constructed with.
///
/// The value is the `request_plane` entry of the `DistributedConfig` passed to `new`
/// and describes how the frontend should talk to the backend.
pub fn request_plane(&self) -> RequestPlaneMode {
self.request_plane
}
/// Returns a child cancellation token obtained from the wrapped runtime.
pub fn child_token(&self) -> CancellationToken {
self.runtime.child_token()
}
......@@ -447,6 +456,7 @@ impl DistributedRuntime {
pub struct DistributedConfig {
/// Which key-value store backend the runtime uses (for example etcd or in-memory).
pub store_backend: KeyValueStoreSelect,
/// Options for the NATS client.
pub nats_config: nats::ClientOptions,
/// Transport used for the request plane (NATS, HTTP or TCP).
pub request_plane: RequestPlaneMode,
}
impl DistributedConfig {
......@@ -454,6 +464,7 @@ impl DistributedConfig {
DistributedConfig {
store_backend: KeyValueStoreSelect::Etcd(Box::default()),
nats_config: nats::ClientOptions::default(),
request_plane: RequestPlaneMode::from_env(),
}
}
......@@ -465,10 +476,70 @@ impl DistributedConfig {
DistributedConfig {
store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)),
nats_config: nats::ClientOptions::default(),
request_plane: RequestPlaneMode::from_env(),
}
}
}
/// Request plane transport mode configuration
///
/// This determines how requests are distributed from routers to workers:
/// - `Nats`: Use NATS for request distribution (default, legacy)
/// - `Http`: Use HTTP/2 for request distribution
/// - `Tcp`: Use raw TCP for request distribution with msgpack support
///
/// `Default` is derived; the `#[default]` marker on `Nats` selects the fallback variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum RequestPlaneMode {
    /// Use NATS for request plane (default for backward compatibility)
    #[default]
    Nats,
    /// Use HTTP/2 for request plane
    Http,
    /// Use raw TCP for request plane with msgpack support
    Tcp,
}
impl fmt::Display for RequestPlaneMode {
    /// Writes the lowercase name of the mode: `nats`, `http` or `tcp`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Nats => "nats",
            Self::Http => "http",
            Self::Tcp => "tcp",
        };
        f.write_str(label)
    }
}
impl std::str::FromStr for RequestPlaneMode {
    type Err = anyhow::Error;

    /// Parses a request plane mode name.
    ///
    /// Matching is case-insensitive and ignores leading/trailing whitespace, so
    /// inputs such as `"TCP"` or `"tcp\n"` are accepted.
    ///
    /// # Errors
    ///
    /// Returns an error that echoes the rejected input and lists the valid
    /// options when the input is not one of `nats`, `http` or `tcp`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // Trim first: stray whitespace would otherwise make a valid name fail to
        // parse, and callers that discard the error would fall back to the default.
        match s.trim().to_lowercase().as_str() {
            "nats" => Ok(Self::Nats),
            "http" => Ok(Self::Http),
            "tcp" => Ok(Self::Tcp),
            _ => Err(anyhow::anyhow!(
                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
                s
            )),
        }
    }
}
impl RequestPlaneMode {
    /// Reads the request plane mode from `DYN_REQUEST_PLANE` without caching.
    ///
    /// Falls back to the default mode when the variable cannot be read or its
    /// value does not parse.
    fn from_env() -> Self {
        match std::env::var("DYN_REQUEST_PLANE") {
            Ok(raw) => raw.parse().unwrap_or_default(),
            Err(_) => Self::default(),
        }
    }
}
pub mod distributed_test_utils {
//! Common test helper functions for DistributedRuntime tests
......@@ -483,6 +554,7 @@ pub mod distributed_test_utils {
let config = super::DistributedConfig {
store_backend: KeyValueStoreSelect::Memory,
nats_config: nats::ClientOptions::default(),
request_plane: crate::distributed::RequestPlaneMode::default(),
};
super::DistributedRuntime::new(rt, config).await.unwrap()
}
......@@ -490,6 +562,7 @@ pub mod distributed_test_utils {
#[cfg(all(test, feature = "integration"))]
mod tests {
use super::RequestPlaneMode;
use super::distributed_test_utils::create_test_drt_async;
#[tokio::test]
......@@ -543,4 +616,40 @@ mod tests {
})
.await;
}
#[test]
fn test_request_plane_mode_from_str() {
    // Every accepted spelling paired with the variant it must parse to.
    let accepted = [
        ("nats", RequestPlaneMode::Nats),
        ("http", RequestPlaneMode::Http),
        ("tcp", RequestPlaneMode::Tcp),
        ("NATS", RequestPlaneMode::Nats),
        ("HTTP", RequestPlaneMode::Http),
        ("TCP", RequestPlaneMode::Tcp),
    ];
    for (text, want) in accepted {
        assert_eq!(text.parse::<RequestPlaneMode>().unwrap(), want);
    }

    // Unknown names must be rejected.
    assert!("invalid".parse::<RequestPlaneMode>().is_err());
}
#[test]
fn test_request_plane_mode_display() {
    // Each variant paired with the text its `Display` impl must produce.
    let rendered = [
        (RequestPlaneMode::Nats, "nats"),
        (RequestPlaneMode::Http, "http"),
        (RequestPlaneMode::Tcp, "tcp"),
    ];
    for (mode, text) in rendered {
        assert_eq!(mode.to_string(), text);
    }
}
}
......@@ -15,7 +15,7 @@
use super::egress::unified_client::RequestPlaneClient;
use super::ingress::unified_server::RequestPlaneServer;
use crate::config::RequestPlaneMode;
use crate::distributed::RequestPlaneMode;
use anyhow::Result;
use async_once_cell::OnceCell;
use std::sync::Arc;
......@@ -99,7 +99,7 @@ impl NetworkConfig {
///
/// ```ignore
/// // Create manager (typically done once in DistributedRuntime)
/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry);
/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry, request_plane_mode);
///
/// // Get server (lazy init, cached)
/// let server = manager.server().await?;
......@@ -136,12 +136,12 @@ impl NetworkManager {
cancellation_token: CancellationToken,
nats_client: Option<async_nats::Client>,
component_registry: crate::component::Registry,
mode: RequestPlaneMode,
) -> Arc<Self> {
let mode = RequestPlaneMode::get();
let config = NetworkConfig::from_env(nats_client);
tracing::info!(
mode = %mode,
%mode,
http_port = config.http_port,
tcp_port = config.tcp_port,
"Initializing NetworkManager"
......
......@@ -44,6 +44,7 @@ async fn test_recursive_namespace_implementation() {
let config = DistributedConfig {
store_backend: KeyValueStoreSelect::Memory,
nats_config: nats::ClientOptions::default(),
request_plane: dynamo_runtime::distributed::RequestPlaneMode::default(),
};
let distributed_runtime = DistributedRuntime::new(runtime, config).await.unwrap();
......@@ -90,6 +91,7 @@ async fn test_multiple_branches_recursive_namespaces() {
let config = DistributedConfig {
store_backend: KeyValueStoreSelect::Memory,
nats_config: nats::ClientOptions::default(),
request_plane: dynamo_runtime::distributed::RequestPlaneMode::default(),
};
let distributed_runtime = DistributedRuntime::new(runtime, config).await.unwrap();
......
......@@ -31,7 +31,7 @@ def get_runtime():
except Exception:
# If no existing runtime, create a new one
loop = asyncio.get_running_loop()
_runtime_instance = DistributedRuntime(loop, "etcd")
_runtime_instance = DistributedRuntime(loop, "etcd", "nats")
return _runtime_instance
......
......@@ -232,11 +232,12 @@ async def send_request_with_retry(url: str, payload: dict, max_retries: int = 8)
return False
def get_runtime(store_backend="etcd"):
def get_runtime(store_backend="etcd", request_plane="nats"):
"""Create a DistributedRuntime instance for testing.
Args:
store_backend: Storage backend to use ("etcd" or "file"). Defaults to "etcd".
request_plane: How frontend talks to backend ("tcp", "http" or "nats"). Defaults to "nats".
"""
try:
# Try to get running loop (works in async context)
......@@ -245,7 +246,7 @@ def get_runtime(store_backend="etcd"):
# No running loop, create a new one (sync context)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return DistributedRuntime(loop, store_backend)
return DistributedRuntime(loop, store_backend, request_plane)
async def check_nats_consumers(namespace: str, expected_count: Optional[int] = None):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment