Unverified Commit 64a30d3d authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat: use os assigned free port for tcp rpc (#4891)

parent 33249945
......@@ -108,13 +108,15 @@ export DYN_REQUEST_PLANE=tcp
# Optional: Configure TCP server host and port
export DYN_TCP_RPC_HOST=0.0.0.0 # Default host
export DYN_TCP_RPC_PORT=9999 # Default port
# export DYN_TCP_RPC_PORT=9999 # Optional: specify a fixed port
# Run your Dynamo service
DYN_REQUEST_PLANE=tcp python -m dynamo.frontend --http-port=8000 &
DYN_REQUEST_PLANE=tcp python -m dynamo.vllm --model Qwen/Qwen3-0.6B
```
**Note:** By default, TCP uses an OS-assigned free port (port 0). This is ideal for environments where multiple services may run on the same machine or when you want to avoid port conflicts. If you need a specific port (e.g., for firewall rules), set `DYN_TCP_RPC_PORT` explicitly.
**When to use TCP:**
- Simple deployments with direct service-to-service communication (e.g. frontend to backend)
- Minimal infrastructure requirements (no NATS needed)
......@@ -124,7 +126,7 @@ DYN_REQUEST_PLANE=tcp python -m dynamo.vllm --model Qwen/Qwen3-0.6B
Additional TCP-specific environment variables:
- `DYN_TCP_RPC_HOST`: Server host address (default: auto-detected)
- `DYN_TCP_RPC_PORT`: Server port (default: 9999)
- `DYN_TCP_RPC_PORT`: Server port. If not set, the OS assigns a free port automatically (recommended for most deployments). Set explicitly only if you need a specific port for firewall rules.
- `DYN_TCP_MAX_MESSAGE_SIZE`: Maximum message size for TCP client (default: 32MB)
- `DYN_TCP_REQUEST_TIMEOUT`: Request timeout for TCP client (default: 10 seconds)
- `DYN_TCP_POOL_SIZE`: Connection pool size for TCP client (default: 50)
......@@ -228,7 +230,7 @@ Request plane configuration is loaded from environment variables at startup and
1. Stop your Dynamo services
2. Set environment variable `DYN_REQUEST_PLANE=tcp`
3. Optionally configure TCP-specific settings (`DYN_TCP_RPC_PORT`, etc.)
3. Optionally configure TCP-specific settings (e.g., `DYN_TCP_RPC_HOST`). Note: `DYN_TCP_RPC_PORT` is optional; if not set, an OS-assigned free port is used automatically.
4. Restart your services
......@@ -279,7 +281,7 @@ curl http://localhost:8000/v1/chat/completions \
**Symptoms:** Server fails to start due to "address already in use"
**Solutions:**
- TCP default port: 9999 (adjust environment variable `DYN_TCP_RPC_PORT`)
- TCP: By default, TCP uses an OS-assigned free port, so port conflicts should be rare. If you explicitly set `DYN_TCP_RPC_PORT` to a specific port and get conflicts, either change the port or remove the setting to use automatic port assignment.
- HTTP default port: 8888 (adjust environment variable `DYN_HTTP_RPC_PORT`)
## Performance Considerations
......
......@@ -330,12 +330,11 @@ impl ModelManager {
// Register router via discovery mechanism
let discovery = endpoint.component().drt().discovery();
let instance_id = discovery.instance_id();
let request_plane_mode = endpoint.drt().request_plane();
// Build transport for router endpoint based on request plane mode
// Use KV_ROUTER_COMPONENT as the component name to distinguish from the generate endpoint's component
let router_endpoint_id = router_endpoint_id(endpoint.id().namespace);
let transport = build_transport_type(request_plane_mode, &router_endpoint_id, instance_id);
let transport = build_transport_type(endpoint, &router_endpoint_id, instance_id).await?;
let discovery_spec = DiscoverySpec::Endpoint {
namespace: router_endpoint_id.namespace.clone(),
......
......@@ -89,30 +89,6 @@ impl EndpointConfigBuilder {
let request_plane_mode = endpoint.drt().request_plane();
tracing::info!("Endpoint starting with request plane mode: {request_plane_mode}",);
// Register health check target in SystemHealth if provided
if let Some(health_check_payload) = &health_check_payload {
// Build transport based on request plane mode
let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id);
let instance = Instance {
component: endpoint_id.component.clone(),
endpoint: endpoint_id.name.clone(),
namespace: endpoint_id.namespace.clone(),
instance_id: connection_id,
transport,
};
tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
let guard = system_health.lock();
guard.register_health_check_target(
&endpoint.name,
instance,
health_check_payload.clone(),
);
if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
handler.set_endpoint_health_check_notifier(notifier)?;
}
}
// Register with graceful shutdown tracker if needed
if graceful_shutdown {
tracing::debug!(
......@@ -137,9 +113,33 @@ impl EndpointConfigBuilder {
let component_name_for_task = endpoint_id.component.clone();
let endpoint_name_for_task = endpoint_id.name.clone();
// Get the unified request plane server (works for all transport types)
// Get the unified request plane server
let server = endpoint.drt().request_plane_server().await?;
// Register health check target in SystemHealth if provided
if let Some(health_check_payload) = &health_check_payload {
// Build transport based on request plane mode
let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
let instance = Instance {
component: endpoint_id.component.clone(),
endpoint: endpoint_id.name.clone(),
namespace: endpoint_id.namespace.clone(),
instance_id: connection_id,
transport,
};
tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
let guard = system_health.lock();
guard.register_health_check_target(
&endpoint.name,
instance,
health_check_payload.clone(),
);
if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
handler.set_endpoint_health_check_notifier(notifier)?;
}
}
tracing::info!(
endpoint = %endpoint_name_for_task,
transport = server.transport_name(),
......@@ -198,7 +198,7 @@ impl EndpointConfigBuilder {
let discovery = endpoint.drt().discovery();
// Build transport for discovery service based on request plane mode
let transport = build_transport_type(request_plane_mode, &endpoint_id, connection_id);
let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
namespace: endpoint_id.namespace.clone(),
......@@ -232,11 +232,14 @@ impl EndpointConfigBuilder {
/// - HTTP: Uses full URL path including endpoint name (e.g., http://host:port/v1/rpc/endpoint_name)
/// - TCP: Includes endpoint name for routing (e.g., host:port/endpoint_name)
/// - NATS: Uses subject-based addressing (unique per endpoint)
pub fn build_transport_type(
///
/// # Errors
/// Returns an error if TCP mode is used but the TCP server hasn't been started yet.
fn build_transport_type_inner(
mode: RequestPlaneMode,
endpoint_id: &EndpointId,
connection_id: u64,
) -> TransportType {
) -> Result<TransportType> {
match mode {
RequestPlaneMode::Http => {
let http_host = crate::utils::get_http_rpc_host_from_env();
......@@ -252,23 +255,54 @@ pub fn build_transport_type(
endpoint_id.name
);
TransportType::Http(http_endpoint)
Ok(TransportType::Http(http_endpoint))
}
RequestPlaneMode::Tcp => {
    let tcp_host = crate::utils::get_tcp_rpc_host_from_env();

    // A valid, explicitly configured port is used as-is, with no dependency on the
    // TCP server having been bound yet. Only when no usable fixed port is configured
    // do we consult the port recorded by the server after binding (port 0 case).
    //
    // This is a `match` rather than `unwrap_or(get_actual_tcp_rpc_port()?)` on purpose:
    // `unwrap_or` evaluates its argument eagerly, so the lookup (and its `?`) would run,
    // and fail, even when a fixed port is configured but the server is not yet started.
    let configured_port = std::env::var("DYN_TCP_RPC_PORT")
        .ok()
        .and_then(|p| p.parse::<u16>().ok());
    let tcp_port = match configured_port {
        Some(port) => port,
        None => crate::pipeline::network::manager::get_actual_tcp_rpc_port()?,
    };

    // Include endpoint name for proper TCP routing
    // TCP client parses this format and adds x-endpoint-path header for server-side routing
    let tcp_endpoint = format!("{}:{}/{}", tcp_host, tcp_port, endpoint_id.name);
    Ok(TransportType::Tcp(tcp_endpoint))
}
RequestPlaneMode::Nats => {
TransportType::Nats(nats::instance_subject(endpoint_id, connection_id))
RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
endpoint_id,
connection_id,
))),
}
}
/// Build the transport descriptor for `endpoint_id`, initializing the request plane
/// server first when the TCP address depends on it.
///
/// The request plane mode is read from the endpoint's runtime. In TCP mode, when
/// `DYN_TCP_RPC_PORT` is unset or does not parse as a `u16`, the port is only known after
/// the server has bound, so the request plane server is awaited before the transport is
/// built. In every other case the transport is built directly.
///
/// # Errors
/// Propagates failures from request plane server initialization and from the internal
/// transport builder.
pub async fn build_transport_type(
    endpoint: &Endpoint,
    endpoint_id: &EndpointId,
    connection_id: u64,
) -> Result<TransportType> {
    let mode = endpoint.drt().request_plane();

    // True when the environment supplies a port that parses as a valid `u16`.
    let fixed_port_configured = || {
        std::env::var("DYN_TCP_RPC_PORT")
            .map(|raw| raw.parse::<u16>().is_ok())
            .unwrap_or(false)
    };

    // The environment is only consulted in TCP mode (short-circuit on the mode check).
    if mode == RequestPlaneMode::Tcp && !fixed_port_configured() {
        // Force the server to initialize so the bound port is available to the builder.
        let _ = endpoint.drt().request_plane_server().await?;
    }

    build_transport_type_inner(mode, endpoint_id, connection_id)
}
......@@ -11,7 +11,7 @@ use crate::pipeline::network::PushWorkHandler;
use anyhow::Result;
use bytes::Bytes;
use dashmap::DashMap;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
......@@ -36,7 +36,10 @@ fn get_max_message_size() -> usize {
/// Shared TCP server that handles multiple endpoints on a single port
pub struct SharedTcpServer {
/// Registered endpoint handlers; the map is shared (via `Arc`) with the task spawned
/// for each accepted connection.
// NOTE(review): the `String` key is presumably the endpoint name/path - confirm against
// `register_endpoint`.
handlers: Arc<DashMap<String, Arc<EndpointHandler>>>,
/// The address to bind to (may have port 0 for OS-assigned port)
bind_addr: SocketAddr,
/// The actual bound address (populated after bind_and_start, contains actual port)
actual_addr: RwLock<Option<SocketAddr>>,
/// Token observed by the accept loop; once cancelled, the loop stops accepting
/// connections and returns.
cancellation_token: CancellationToken,
}
......@@ -55,11 +58,83 @@ impl SharedTcpServer {
/// Create a server configured to listen on `bind_addr`.
///
/// Nothing is bound here: the handler map starts empty and the actual bound address
/// stays `None` until the listener has been bound.
pub fn new(bind_addr: SocketAddr, cancellation_token: CancellationToken) -> Arc<Self> {
    let server = Self {
        cancellation_token,
        // Requested address; its port may be 0 when DYN_TCP_RPC_PORT is not specified.
        bind_addr,
        // Filled in after binding, once the OS has settled on the real port.
        actual_addr: RwLock::new(None),
        handlers: Arc::new(DashMap::new()),
    };
    Arc::new(server)
}
/// Bind the listener and serve connections from a background task.
///
/// The socket is bound to the configured address before anything is spawned, so a bind
/// failure is reported to the caller. When the configured port is 0 the OS picks a free
/// port. The resulting address is recorded on the server (see `actual_address()`) and
/// also returned.
///
/// # Errors
/// Returns an error if binding fails or the bound address cannot be read back.
pub async fn bind_and_start(self: Arc<Self>) -> Result<SocketAddr> {
    let requested = self.bind_addr;
    tracing::info!("Binding TCP server to {}", requested);

    let listener = TcpListener::bind(&requested).await?;
    let bound = listener.local_addr()?;

    tracing::info!(
        requested = %requested,
        actual = %bound,
        "TCP server bound successfully"
    );

    // Publish the real address before the accept loop starts running.
    *self.actual_addr.write() = Some(bound);

    // We own this `Arc`, so it can move straight into the background task.
    tokio::spawn(async move {
        self.accept_loop(listener).await;
    });

    Ok(bound)
}
/// Address the listener is actually bound to.
///
/// `None` until `bind_and_start` has recorded the bound address.
pub fn actual_address(&self) -> Option<SocketAddr> {
    let bound = self.actual_addr.read();
    *bound
}
/// Internal accept loop - runs after binding
async fn accept_loop(self: Arc<Self>, listener: TcpListener) {
let cancellation_token = self.cancellation_token.clone();
loop {
tokio::select! {
accept_result = listener.accept() => {
match accept_result {
Ok((stream, peer_addr)) => {
tracing::trace!("Accepted TCP connection from {}", peer_addr);
let handlers = self.handlers.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_connection(stream, handlers).await {
tracing::error!("TCP connection error: {}", e);
}
});
}
Err(e) => {
tracing::error!("Failed to accept TCP connection: {}", e);
}
}
}
_ = cancellation_token.cancelled() => {
tracing::info!("SharedTcpServer received cancellation signal, shutting down");
return;
}
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn register_endpoint(
&self,
......@@ -93,7 +168,7 @@ impl SharedTcpServer {
tracing::info!(
"Registered endpoint '{}' with shared TCP server on {}",
endpoint_name,
self.bind_addr
self.actual_address().unwrap_or(self.bind_addr)
);
Ok(())
......@@ -129,37 +204,16 @@ impl SharedTcpServer {
}
}
/// Start the server (legacy method - prefer bind_and_start for new code).
///
/// This method is kept for backwards compatibility. It binds and starts
/// the server but doesn't return the actual bound address.
pub async fn start(self: Arc<Self>) -> Result<()> {
tracing::info!("Starting shared TCP server on {}", self.bind_addr);
let listener = TcpListener::bind(&self.bind_addr).await?;
let cancellation_token = self.cancellation_token.clone();
loop {
tokio::select! {
accept_result = listener.accept() => {
match accept_result {
Ok((stream, peer_addr)) => {
tracing::trace!("Accepted TCP connection from {}", peer_addr);
let handlers = self.handlers.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_connection(stream, handlers).await {
tracing::debug!("TCP connection error: {}", e);
}
});
}
Err(e) => {
tracing::error!("Failed to accept TCP connection: {}", e);
}
}
}
_ = cancellation_token.cancelled() => {
tracing::info!("SharedTcpServer received cancellation signal, shutting down");
return Ok(());
}
}
}
let cancel_token = self.cancellation_token.clone();
self.bind_and_start().await?;
// Wait for cancellation (the accept loop runs in background)
cancel_token.cancelled().await;
Ok(())
}
async fn handle_connection(
......@@ -378,7 +432,10 @@ impl super::unified_server::RequestPlaneServer for SharedTcpServer {
}
fn address(&self) -> String {
format!("tcp://{}:{}", self.bind_addr.ip(), self.bind_addr.port())
// Return actual bound address if available (after bind_and_start),
// otherwise fall back to configured bind address
let addr = self.actual_address().unwrap_or(self.bind_addr);
format!("tcp://{}:{}", addr.ip(), addr.port())
}
fn transport_name(&self) -> &'static str {
......
......@@ -19,8 +19,36 @@ use crate::distributed::RequestPlaneMode;
use anyhow::Result;
use async_once_cell::OnceCell;
use std::sync::Arc;
use std::sync::OnceLock;
use tokio_util::sync::CancellationToken;
/// Global storage for the actual TCP RPC port after binding.
/// Uses OnceLock since the port is set once when the server binds and never changes.
static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
/// Port the TCP RPC server actually bound to, as recorded after binding.
///
/// # Errors
/// Returns an error (and logs at error level) if no port has been recorded yet.
pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
    match ACTUAL_TCP_RPC_PORT.get() {
        Some(port) => Ok(*port),
        None => {
            tracing::error!(
                "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
            );
            Err(anyhow::anyhow!(
                "TCP RPC port not initialized. This is not expected."
            ))
        }
    }
}
/// Record the port the TCP RPC server bound to (called internally after the server binds).
///
/// The port can only be recorded once. A later call leaves the stored value untouched and
/// logs a warning carrying both the stored port and the port that was ignored.
fn set_actual_tcp_rpc_port(port: u16) {
    if ACTUAL_TCP_RPC_PORT.set(port).is_ok() {
        return;
    }

    // `OnceLock::set` hands back the value it *rejected* on failure, not the one already
    // stored, so the existing port has to be read from the cell itself.
    if let Some(existing) = ACTUAL_TCP_RPC_PORT.get().copied() {
        tracing::warn!(
            existing_port = existing,
            new_port = port,
            "TCP RPC port already set, ignoring new value"
        );
    }
}
/// Network configuration loaded from environment variables
#[derive(Clone)]
struct NetworkConfig {
......@@ -31,7 +59,8 @@ struct NetworkConfig {
// TCP server configuration
tcp_host: String,
tcp_port: u16,
/// TCP port to bind to. If None, the OS will assign a free port.
tcp_port: Option<u16>,
// HTTP client configuration
http_client_config: super::egress::http_router::Http2Config,
......@@ -60,12 +89,12 @@ impl NetworkConfig {
.unwrap_or_else(|_| "/v1/rpc".to_string()),
// TCP server configuration
// If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
tcp_host: std::env::var("DYN_TCP_RPC_HOST")
.unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
tcp_port: std::env::var("DYN_TCP_RPC_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(9999),
.and_then(|p| p.parse().ok()),
// HTTP client configuration (reads DYN_HTTP2_* env vars)
http_client_config: super::egress::http_router::Http2Config::from_env(),
......@@ -140,12 +169,35 @@ impl NetworkManager {
) -> Self {
let config = NetworkConfig::from_env(nats_client);
tracing::info!(
%mode,
http_port = config.http_port,
tcp_port = config.tcp_port,
"Initializing NetworkManager"
);
match mode {
RequestPlaneMode::Http => {
tracing::info!(
%mode,
host = %config.http_host,
port = config.http_port,
rpc_root = %config.http_rpc_root,
"Initializing NetworkManager with HTTP request plane"
);
}
RequestPlaneMode::Tcp => {
let port_display = config
.tcp_port
.map(|p| p.to_string())
.unwrap_or_else(|| "OS-assigned".to_string());
tracing::info!(
%mode,
host = %config.tcp_host,
port = %port_display,
"Initializing NetworkManager with TCP request plane"
);
}
RequestPlaneMode::Nats => {
tracing::info!(
%mode,
"Initializing NetworkManager with NATS request plane"
);
}
}
Self {
mode,
......@@ -250,24 +302,31 @@ impl NetworkManager {
async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
use super::ingress::shared_tcp_endpoint::SharedTcpServer;
let bind_addr = format!("{}:{}", self.config.tcp_host, self.config.tcp_port)
// Use configured port if specified, otherwise use port 0 (OS assigns free port)
let port = self.config.tcp_port.unwrap_or(0);
let bind_addr = format!("{}:{}", self.config.tcp_host, port)
.parse()
.map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
tracing::info!(
bind_addr = %bind_addr,
port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
"Creating TCP request plane server"
);
let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone());
// Start server in background
let server_clone = server.clone();
tokio::spawn(async move {
if let Err(e) = server_clone.start().await {
tracing::error!("TCP request plane server error: {}", e);
}
});
// Bind and start server, getting the actual bound address
let actual_addr = server.clone().bind_and_start().await?;
// Store the actual bound port globally so build_transport_type() can access it
set_actual_tcp_rpc_port(actual_addr.port());
tracing::info!(
actual_addr = %actual_addr,
actual_port = actual_addr.port(),
"TCP request plane server started"
);
Ok(server as Arc<dyn RequestPlaneServer>)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment