Unverified Commit b407b419 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

fix: resolve HTTP port collisions when multiple workers share a process (#7063)

parent 6945d47e
......@@ -240,10 +240,13 @@ fn build_transport_type_inner(
match mode {
RequestPlaneMode::Http => {
let http_host = crate::utils::get_http_rpc_host_from_env();
// If a fixed port is explicitly configured, use it directly.
// Otherwise, use the actual bound port (set by HTTP server after binding when port 0 is used).
let http_port = std::env::var("DYN_HTTP_RPC_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.unwrap_or(8888);
.filter(|&p| p != 0)
.unwrap_or(crate::pipeline::network::manager::get_actual_http_rpc_port()?);
let rpc_root =
std::env::var("DYN_HTTP_RPC_ROOT_PATH").unwrap_or_else(|_| "/v1/rpc".to_string());
......@@ -261,6 +264,7 @@ fn build_transport_type_inner(
let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.filter(|&p| p != 0)
.unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
// Include instance_id and endpoint name for proper TCP routing.
......@@ -293,18 +297,26 @@ pub async fn build_transport_type(
) -> Result<TransportType> {
let mode = endpoint.drt().request_plane();
if mode == RequestPlaneMode::Tcp {
// Only force server init when we *don't* have a valid explicit port.
let has_fixed_port = std::env::var("DYN_TCP_RPC_PORT")
// For TCP and HTTP with OS-assigned ports, we must ensure the server is initialized
// (bound to a port) before we can construct a correct transport address.
let has_fixed_port = match mode {
RequestPlaneMode::Tcp => std::env::var("DYN_TCP_RPC_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.is_some();
.filter(|&p| p != 0)
.is_some(),
RequestPlaneMode::Http => std::env::var("DYN_HTTP_RPC_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.filter(|&p| p != 0)
.is_some(),
RequestPlaneMode::Nats => true, // NATS doesn't need port init
};
if !has_fixed_port {
// Ensure request plane server is initialized before building transport.
let _ = endpoint.drt().request_plane_server().await?;
}
}
build_transport_type_inner(mode, endpoint_id, connection_id)
}
......
......@@ -23,7 +23,7 @@ use hyper_util::service::TowerToHyperService;
use parking_lot::Mutex;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Notify;
use tokio::sync::{Notify, RwLock};
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;
use tracing::Instrument;
......@@ -38,6 +38,7 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION");
pub struct SharedHttpServer {
handlers: Arc<DashMap<String, Arc<EndpointHandler>>>,
bind_addr: SocketAddr,
actual_addr: RwLock<Option<SocketAddr>>,
cancellation_token: CancellationToken,
}
......@@ -58,10 +59,16 @@ impl SharedHttpServer {
Arc::new(Self {
handlers: Arc::new(DashMap::new()),
bind_addr,
actual_addr: RwLock::new(None),
cancellation_token,
})
}
/// Get the actual bound address (after `bind_and_start` resolves).
pub fn actual_address(&self) -> Option<SocketAddr> {
self.actual_addr.try_read().ok().and_then(|g| *g)
}
/// Register an endpoint handler with this server
#[allow(clippy::too_many_arguments)]
pub async fn register_endpoint(
......@@ -129,8 +136,10 @@ impl SharedHttpServer {
}
}
/// Start the shared HTTP server
pub async fn start(self: Arc<Self>) -> Result<()> {
/// Bind the TCP listener and start the accept loop.
///
/// Returns the actual bound `SocketAddr` (important when binding to port 0).
pub async fn bind_and_start(self: Arc<Self>) -> Result<SocketAddr> {
let rpc_root_path = std::env::var("DYN_HTTP_RPC_ROOT_PATH")
.unwrap_or_else(|_| DEFAULT_RPC_ROOT_PATH.to_string());
let route_pattern = format!("{}/{{*endpoint}}", rpc_root_path);
......@@ -140,15 +149,23 @@ impl SharedHttpServer {
.layer(TraceLayer::new_for_http())
.with_state(self.clone());
let listener = tokio::net::TcpListener::bind(&self.bind_addr).await?;
let actual_addr = listener.local_addr()?;
tracing::info!(
"Starting shared HTTP/2 endpoint server on {} at path {}/:endpoint",
self.bind_addr,
rpc_root_path
requested = %self.bind_addr,
actual = %actual_addr,
rpc_root = %rpc_root_path,
"HTTP/2 endpoint server bound"
);
let listener = tokio::net::TcpListener::bind(&self.bind_addr).await?;
// Store the actual address so `address()` returns the real port.
*self.actual_addr.write().await = Some(actual_addr);
let cancellation_token = self.cancellation_token.clone();
// Spawn the accept loop in the background.
tokio::spawn(async move {
loop {
tokio::select! {
accept_result = listener.accept() => {
......@@ -158,13 +175,10 @@ impl SharedHttpServer {
let cancel_clone = cancellation_token.clone();
tokio::spawn(async move {
// Create HTTP/2 connection builder with prior knowledge
let http2_builder = Http2Builder::new(TokioExecutor::new());
let io = TokioIo::new(stream);
let tower_service = app_clone.into_service();
// Wrap Tower service for Hyper compatibility
let hyper_service = TowerToHyperService::new(tower_service);
tokio::select! {
......@@ -186,10 +200,13 @@ impl SharedHttpServer {
}
_ = cancellation_token.cancelled() => {
tracing::info!("SharedHttpServer received cancellation signal, shutting down");
return Ok(());
return;
}
}
}
});
Ok(actual_addr)
}
/// Wait for all inflight requests across all endpoints
......@@ -332,7 +349,8 @@ impl super::unified_server::RequestPlaneServer for SharedHttpServer {
}
fn address(&self) -> String {
format!("http://{}:{}", self.bind_addr.ip(), self.bind_addr.port())
let addr = self.actual_address().unwrap_or(self.bind_addr);
format!("http://{}:{}", addr.ip(), addr.port())
}
fn transport_name(&self) -> &'static str {
......@@ -374,4 +392,69 @@ mod tests {
let server = SharedHttpServer::new(bind_addr, token);
assert!(server.handlers.is_empty());
}
#[tokio::test]
async fn test_bind_and_start_assigns_os_port() {
use std::net::{IpAddr, Ipv4Addr};
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let token = CancellationToken::new();
let server = SharedHttpServer::new(bind_addr, token.clone());
let actual_addr = server.clone().bind_and_start().await.unwrap();
// OS should assign a non-zero port
assert_ne!(actual_addr.port(), 0);
// actual_address() should return the real bound address
assert_eq!(server.actual_address(), Some(actual_addr));
// address() should contain the real port
let addr_str =
<SharedHttpServer as super::unified_server::RequestPlaneServer>::address(&*server);
assert!(addr_str.contains(&actual_addr.port().to_string()));
token.cancel();
}
#[tokio::test]
async fn test_two_servers_get_different_ports() {
use std::net::{IpAddr, Ipv4Addr};
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let token1 = CancellationToken::new();
let token2 = CancellationToken::new();
let server1 = SharedHttpServer::new(addr, token1.clone());
let server2 = SharedHttpServer::new(addr, token2.clone());
let actual1 = server1.clone().bind_and_start().await.unwrap();
let actual2 = server2.clone().bind_and_start().await.unwrap();
// Two servers binding to port 0 must get different ports
assert_ne!(actual1.port(), actual2.port());
token1.cancel();
token2.cancel();
}
#[tokio::test]
async fn test_bind_and_start_with_explicit_port() {
use std::net::{IpAddr, Ipv4Addr};
// First bind to port 0 to get a free port
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let free_port = listener.local_addr().unwrap().port();
drop(listener); // Release the port
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), free_port);
let token = CancellationToken::new();
let server = SharedHttpServer::new(bind_addr, token.clone());
let actual_addr = server.clone().bind_and_start().await.unwrap();
// When binding to an explicit port, actual should match
assert_eq!(actual_addr.port(), free_port);
token.cancel();
}
}
......@@ -27,6 +27,10 @@ use tokio_util::sync::CancellationToken;
/// Uses OnceLock since the port is set once when the server binds and never changes.
static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
/// Global storage for the actual HTTP RPC port after binding.
/// Uses OnceLock since the port is set once when the server binds and never changes.
static ACTUAL_HTTP_RPC_PORT: OnceLock<u16> = OnceLock::new();
/// Global storage for the shared TCP server instance.
///
/// When multiple workers run in the same process, they must share a single TCP server
......@@ -38,6 +42,14 @@ static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
tokio::sync::OnceCell::const_new();
/// Global storage for the shared HTTP server instance.
///
/// Same rationale as GLOBAL_TCP_SERVER: multiple workers in the same process must share
/// a single HTTP server so that all endpoints are registered on the same port.
static GLOBAL_HTTP_SERVER: tokio::sync::OnceCell<
Arc<super::ingress::http_endpoint::SharedHttpServer>,
> = tokio::sync::OnceCell::const_new();
/// Process-wide cancellation token for the global TCP server.
///
/// This token is independent of any individual runtime's cancellation token so that
......@@ -46,6 +58,10 @@ static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
static GLOBAL_TCP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
std::sync::LazyLock::new(CancellationToken::new);
/// Process-wide cancellation token for the global HTTP server.
static GLOBAL_HTTP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
std::sync::LazyLock::new(CancellationToken::new);
/// Get the actual TCP RPC port that the server is listening on.
pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| {
......@@ -69,12 +85,36 @@ fn set_actual_tcp_rpc_port(port: u16) {
}
}
/// Get the actual HTTP RPC port that the server is listening on.
pub fn get_actual_http_rpc_port() -> anyhow::Result<u16> {
ACTUAL_HTTP_RPC_PORT.get().copied().ok_or_else(|| {
tracing::error!(
"HTTP RPC port not set - request_plane_server() must be called before get_actual_http_rpc_port()"
);
anyhow::anyhow!(
"HTTP RPC port not initialized. This is not expected."
)
})
}
/// Set the actual HTTP RPC port (called internally after server binds).
fn set_actual_http_rpc_port(port: u16) {
if let Err(existing) = ACTUAL_HTTP_RPC_PORT.set(port) {
tracing::warn!(
existing_port = existing,
new_port = port,
"HTTP RPC port already set, ignoring new value"
);
}
}
/// Network configuration loaded from environment variables
#[derive(Clone)]
struct NetworkConfig {
// HTTP server configuration
http_host: String,
http_port: u16,
/// HTTP port to bind to. If None, the OS will assign a free port.
http_port: Option<u16>,
http_rpc_root: String,
// TCP server configuration
......@@ -99,12 +139,12 @@ impl NetworkConfig {
fn from_env(nats_client: Option<async_nats::Client>) -> Self {
Self {
// HTTP server configuration
// If DYN_HTTP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
http_host: std::env::var("DYN_HTTP_RPC_HOST")
.unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
http_port: std::env::var("DYN_HTTP_RPC_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(8888),
.and_then(|p| p.parse().ok()),
http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
.unwrap_or_else(|_| "/v1/rpc".to_string()),
......@@ -191,10 +231,14 @@ impl NetworkManager {
match mode {
RequestPlaneMode::Http => {
let port_display = config
.http_port
.map(|p| p.to_string())
.unwrap_or_else(|| "OS-assigned".to_string());
tracing::info!(
%mode,
host = %config.http_host,
port = config.http_port,
port = %port_display,
rpc_root = %config.http_rpc_root,
"Initializing NetworkManager with HTTP request plane"
);
......@@ -296,27 +340,42 @@ impl NetworkManager {
async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
use super::ingress::http_endpoint::SharedHttpServer;
let bind_addr = format!("{}:{}", self.config.http_host, self.config.http_port)
// Use the global HTTP server to ensure all workers in the same process share
// a single server. This is critical for correct endpoint routing.
let server = GLOBAL_HTTP_SERVER
.get_or_try_init(|| async {
// Use configured port if specified, otherwise use port 0 (OS assigns free port)
let port = self.config.http_port.unwrap_or(0);
let bind_addr = format!("{}:{}", self.config.http_host, port)
.parse()
.map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;
tracing::info!(
bind_addr = %bind_addr,
port_source = if self.config.http_port.is_some() { "DYN_HTTP_RPC_PORT" } else { "OS-assigned" },
rpc_root = %self.config.http_rpc_root,
"Creating HTTP request plane server"
);
let server = SharedHttpServer::new(bind_addr, self.cancellation_token.clone());
let server = SharedHttpServer::new(bind_addr, GLOBAL_HTTP_SERVER_TOKEN.clone());
// Start server in background
let server_clone = server.clone();
tokio::spawn(async move {
if let Err(e) = server_clone.start().await {
tracing::error!("HTTP request plane server error: {e}");
}
});
// Bind and start server, getting the actual bound address
let actual_addr = server.clone().bind_and_start().await?;
// Store the actual bound port globally so build_transport_type() can access it
set_actual_http_rpc_port(actual_addr.port());
Ok(server as Arc<dyn RequestPlaneServer>)
tracing::info!(
actual_addr = %actual_addr,
actual_port = actual_addr.port(),
"HTTP request plane server started"
);
Ok::<_, anyhow::Error>(server)
})
.await?;
Ok(server.clone() as Arc<dyn RequestPlaneServer>)
}
async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment