Unverified Commit ffa6e84a authored by Michael Feil's avatar Michael Feil Committed by GitHub
Browse files

feat: garbage collection tcp machine + read from shared place (#8420)


Signed-off-by: default avatarmichaelfeil <63565275+michaelfeil@users.noreply.github.com>
parent 73e0a2f2
...@@ -96,6 +96,7 @@ Additional TCP-specific environment variables: ...@@ -96,6 +96,7 @@ Additional TCP-specific environment variables:
- `DYN_TCP_RPC_HOST`: Server host address (default: auto-detected) - `DYN_TCP_RPC_HOST`: Server host address (default: auto-detected)
- `DYN_TCP_RPC_PORT`: Server port. If not set, the OS assigns a free port automatically (recommended for most deployments). Set explicitly only if you need a specific port for firewall rules. - `DYN_TCP_RPC_PORT`: Server port. If not set, the OS assigns a free port automatically (recommended for most deployments). Set explicitly only if you need a specific port for firewall rules.
- `DYN_TCP_MAX_MESSAGE_SIZE`: Maximum message size for TCP client (default: 32MB) - `DYN_TCP_MAX_MESSAGE_SIZE`: Maximum message size for TCP client (default: 32MB)
- `DYN_TCP_SHRINK_MESSAGE_SIZE`: Threshold for shrinking the zero-copy decoder buffer back to initial size after processing large messages (default: 8MB, max: DYN_TCP_MAX_MESSAGE_SIZE)
- `DYN_TCP_REQUEST_TIMEOUT`: Request timeout for TCP client (default: 10 seconds) - `DYN_TCP_REQUEST_TIMEOUT`: Request timeout for TCP client (default: 10 seconds)
- `DYN_TCP_POOL_SIZE`: Connection pool size for TCP client (default: 50) - `DYN_TCP_POOL_SIZE`: Connection pool size for TCP client (default: 50)
- `DYN_TCP_CONNECT_TIMEOUT`: Connect timeout for TCP client (default: 3 seconds) - `DYN_TCP_CONNECT_TIMEOUT`: Connect timeout for TCP client (default: 3 seconds)
......
...@@ -31,12 +31,26 @@ use super::{ ...@@ -31,12 +31,26 @@ use super::{
AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO, SegmentSource, AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO, SegmentSource,
ServiceBackend, ServiceEngine, SingleIn, Source, context, ServiceBackend, ServiceEngine, SingleIn, Source, context,
}; };
use ingress::push_handler::WorkHandlerMetrics;
// Add Prometheus metrics types
use crate::metrics::MetricsHierarchy; use crate::metrics::MetricsHierarchy;
use ingress::push_handler::WorkHandlerMetrics;
use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge}; use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
/// Shared default maximum TCP message size across request-plane components.
pub(crate) const DEFAULT_TCP_MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024;
static TCP_MAX_MESSAGE_SIZE: OnceLock<usize> = OnceLock::new();
/// Read the configured TCP max message size once and share it across client,
/// server, and zero-copy decoder code paths.
pub(crate) fn get_tcp_max_message_size() -> usize {
*TCP_MAX_MESSAGE_SIZE.get_or_init(|| {
std::env::var("DYN_TCP_MAX_MESSAGE_SIZE")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(DEFAULT_TCP_MAX_MESSAGE_SIZE)
})
}
pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {} pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {} impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {}
......
...@@ -9,30 +9,77 @@ ...@@ -9,30 +9,77 @@
//! 3. Splitting off exact message sizes (zero-copy via Bytes::split_to) //! 3. Splitting off exact message sizes (zero-copy via Bytes::split_to)
//! 4. Returning Arc-counted Bytes that can be cloned cheaply //! 4. Returning Arc-counted Bytes that can be cloned cheaply
use crate::pipeline::network::get_tcp_max_message_size;
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use std::io; use std::io;
use std::sync::OnceLock;
use tokio::io::{AsyncRead, AsyncReadExt}; use tokio::io::{AsyncRead, AsyncReadExt};
/// Maximum message size (32MB default, configurable via env)
const MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024; // 32MB
const INITIAL_BUFFER_SIZE: usize = 262144; // 256KB const INITIAL_BUFFER_SIZE: usize = 262144; // 256KB
const DEFAULT_SHRINK_SIZE: usize = 8 * 1024 * 1024; // 8MB
static SHRINK_MESSAGE_SIZE: OnceLock<usize> = OnceLock::new();
/// Get the shrink message size threshold.
fn get_shrink_message_size() -> usize {
*SHRINK_MESSAGE_SIZE.get_or_init(|| {
let max_size = get_tcp_max_message_size();
// Check for environment variable override
let env_result = std::env::var("DYN_TCP_SHRINK_MESSAGE_SIZE");
let env_shrink_size = env_result.as_ref().ok().and_then(|s| {
s.parse::<usize>().ok().or_else(|| {
tracing::warn!(
env_var = "DYN_TCP_SHRINK_MESSAGE_SIZE",
value = %s,
"Invalid value for DYN_TCP_SHRINK_MESSAGE_SIZE, using default"
);
None
})
});
let resolved = resolve_shrink_message_size(max_size, env_shrink_size);
// Warn if the configured value was clamped
if let Some(configured) = env_shrink_size
&& configured != resolved
{
tracing::warn!(
configured_size = configured,
resolved_size = resolved,
max_size = max_size,
initial_buffer_size = INITIAL_BUFFER_SIZE,
"DYN_TCP_SHRINK_MESSAGE_SIZE was clamped to valid range. Note the size is in bytes."
);
}
resolved
})
}
/// Resolve the shrink message size threshold based on configuration and constraints.
///
fn resolve_shrink_message_size(max_size: usize, env_shrink_size: Option<usize>) -> usize {
let configured_size = env_shrink_size.unwrap_or(DEFAULT_SHRINK_SIZE);
fn get_max_message_size() -> usize { // Clamp to valid range: [INITIAL_BUFFER_SIZE, max_size]
std::env::var("DYN_TCP_MAX_MESSAGE_SIZE") configured_size
.ok() .min(max_size) // Don't exceed max message size
.and_then(|s| s.parse::<usize>().ok()) .max(INITIAL_BUFFER_SIZE) // Don't go below initial buffer size
.unwrap_or(MAX_MESSAGE_SIZE)
} }
/// Zero-copy streaming decoder that reuses buffers /// Zero-copy streaming decoder that reuses buffers
/// ///
/// This decoder maintains an internal buffer and only allocates when necessary. /// This decoder maintains an internal buffer and only allocates when necessary.
/// Messages are returned as Arc-counted Bytes slices, making cloning extremely cheap. /// Messages are returned as Arc-counted Bytes slices, making cloning extremely cheap.
/// The reusable buffer resets back to INITIAL_BUFFER_SIZE only when unread data
/// is empty and capacity exceeds DYN_TCP_SHRINK_MESSAGE_SIZE.
pub struct ZeroCopyTcpDecoder { pub struct ZeroCopyTcpDecoder {
/// Reusable read buffer - grows as needed but never shrinks /// Reusable read buffer - grows as needed, shrinks when empty and oversized
read_buffer: BytesMut, read_buffer: BytesMut,
/// Maximum allowed message size /// Maximum allowed message size
max_message_size: usize, max_message_size: usize,
/// Threshold for shrinking buffer back to initial size when empty
shrink_threshold: usize,
} }
impl ZeroCopyTcpDecoder { impl ZeroCopyTcpDecoder {
...@@ -45,7 +92,8 @@ impl ZeroCopyTcpDecoder { ...@@ -45,7 +92,8 @@ impl ZeroCopyTcpDecoder {
pub fn with_capacity(capacity: usize) -> Self { pub fn with_capacity(capacity: usize) -> Self {
Self { Self {
read_buffer: BytesMut::with_capacity(capacity), read_buffer: BytesMut::with_capacity(capacity),
max_message_size: get_max_message_size(), max_message_size: get_tcp_max_message_size(),
shrink_threshold: get_shrink_message_size(),
} }
} }
...@@ -167,6 +215,11 @@ impl ZeroCopyTcpDecoder { ...@@ -167,6 +215,11 @@ impl ZeroCopyTcpDecoder {
// split_to() just advances the internal pointer, doesn't allocate or copy // split_to() just advances the internal pointer, doesn't allocate or copy
let message_bytes = self.read_buffer.split_to(total_len).freeze(); let message_bytes = self.read_buffer.split_to(total_len).freeze();
// Shrink buffer if it grew too large and is now empty, could be optimized with lock-free buffer pool in the future.
if self.read_buffer.is_empty() && self.read_buffer.capacity() > self.shrink_threshold {
self.read_buffer = BytesMut::with_capacity(INITIAL_BUFFER_SIZE);
}
// Return zero-copy message wrapper // Return zero-copy message wrapper
Ok(TcpRequestMessageZeroCopy::new(message_bytes)) Ok(TcpRequestMessageZeroCopy::new(message_bytes))
} }
...@@ -303,6 +356,64 @@ mod tests { ...@@ -303,6 +356,64 @@ mod tests {
use super::*; use super::*;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
#[test]
fn test_resolve_shrink_message_size_edge_cases() {
// Test case: max_size = 10MB (larger than DEFAULT_SHRINK_SIZE)
// Should return DEFAULT_SHRINK_SIZE (8MB) since env is None
let max_size_10mb = 10 * 1024 * 1024;
let result = resolve_shrink_message_size(max_size_10mb, None);
assert_eq!(
result, DEFAULT_SHRINK_SIZE,
"10MB max should return default 8MB"
);
// Test case: max_size < DEFAULT_SHRINK_SIZE
// Should return max_size (capped by .min())
let max_size_1mb = 1024 * 1024;
let result = resolve_shrink_message_size(max_size_1mb, None);
assert_eq!(result, max_size_1mb, "1MB max should be capped to 1MB");
// Test case: max_size = DEFAULT_SHRINK_SIZE
// Should return DEFAULT_SHRINK_SIZE (exact match)
let result = resolve_shrink_message_size(DEFAULT_SHRINK_SIZE, None);
assert_eq!(
result, DEFAULT_SHRINK_SIZE,
"exact match should return default"
);
// Test case: env_shrink_size provided and within bounds
let env_size = 2 * 1024 * 1024; // 2MB
let result = resolve_shrink_message_size(max_size_10mb, Some(env_size));
assert_eq!(
result, env_size,
"env var should be used when within bounds"
);
// Test case: env_shrink_size exceeds max_size
let env_size_large = 20 * 1024 * 1024; // 20MB
let result = resolve_shrink_message_size(max_size_10mb, Some(env_size_large));
assert_eq!(
result, max_size_10mb,
"env var should be capped to max_size"
);
// Test case: env_shrink_size below INITIAL_BUFFER_SIZE
let env_size_small = 100 * 1024; // 100KB < 256KB
let result = resolve_shrink_message_size(max_size_10mb, Some(env_size_small));
assert_eq!(
result, INITIAL_BUFFER_SIZE,
"env var should be clamped to INITIAL_BUFFER_SIZE"
);
// Test case: max_size below INITIAL_BUFFER_SIZE
let max_size_small = 100 * 1024; // 100KB < 256KB
let result = resolve_shrink_message_size(max_size_small, None);
assert_eq!(
result, INITIAL_BUFFER_SIZE,
"result should be clamped to INITIAL_BUFFER_SIZE"
);
}
#[tokio::test] #[tokio::test]
async fn test_zero_copy_decoder_basic() { async fn test_zero_copy_decoder_basic() {
// Create a test message with headers // Create a test message with headers
...@@ -500,4 +611,55 @@ mod tests { ...@@ -500,4 +611,55 @@ mod tests {
assert_eq!(msg.headers().len(), 1); assert_eq!(msg.headers().len(), 1);
assert_eq!(msg.headers().get("x-test-header").unwrap(), "test-value"); assert_eq!(msg.headers().get("x-test-header").unwrap(), "test-value");
} }
#[tokio::test]
async fn test_zero_copy_decoder_buffer_shrinking() {
// Test that buffer shrinks back after reading a large message.
// Uses small sizes to avoid env var dependencies and keep test fast.
let endpoint = "test/endpoint";
let small_payload = b"small";
// Use 1MB payload with 512KB shrink threshold
let large_payload = vec![0x42u8; 1024 * 1024]; // 1MB
fn make_message(endpoint: &str, payload: &[u8]) -> Vec<u8> {
let mut message = Vec::new();
message.extend_from_slice(&(endpoint.len() as u16).to_be_bytes());
message.extend_from_slice(endpoint.as_bytes());
message.extend_from_slice(&(0u16).to_be_bytes()); // empty headers
message.extend_from_slice(&(payload.len() as u32).to_be_bytes());
message.extend_from_slice(payload);
message
}
// Create decoder with explicit settings to avoid env var dependencies
let mut decoder = ZeroCopyTcpDecoder::with_capacity(INITIAL_BUFFER_SIZE);
decoder.max_message_size = 2 * 1024 * 1024; // 2MB max
decoder.shrink_threshold = 512 * 1024; // 512KB shrink threshold
assert!(decoder.buffer_capacity() <= INITIAL_BUFFER_SIZE);
// Read large message - buffer grows during read, then shrinks after split_to()
let large_message = make_message(endpoint, &large_payload);
let mut reader = &large_message[..];
decoder.read_message(&mut reader).await.unwrap();
// After reading, buffer should have shrunk back because:
// - The buffer grew to ~1MB to hold the message
// - 1MB >= 512KB shrink threshold, so it triggers the shrink
assert!(
decoder.buffer_capacity() <= INITIAL_BUFFER_SIZE,
"buffer should shrink after large message, got capacity {}",
decoder.buffer_capacity()
);
assert!(
decoder.buffered_len() == 0,
"buffer should be empty after read"
);
// Read small message - should work fine with shrunk buffer
let small_message = make_message(endpoint, small_payload);
let mut reader = &small_message[..];
let msg = decoder.read_message(&mut reader).await.unwrap();
assert_eq!(msg.payload().as_ref(), small_payload);
}
} }
...@@ -14,6 +14,7 @@ use super::unified_client::{ClientStats, Headers, RequestPlaneClient}; ...@@ -14,6 +14,7 @@ use super::unified_client::{ClientStats, Headers, RequestPlaneClient};
use crate::metrics::transport_metrics::{ use crate::metrics::transport_metrics::{
TCP_BYTES_RECEIVED_TOTAL, TCP_BYTES_SENT_TOTAL, TCP_ERRORS_TOTAL, TCP_BYTES_RECEIVED_TOTAL, TCP_BYTES_SENT_TOTAL, TCP_ERRORS_TOTAL,
}; };
use crate::pipeline::network::get_tcp_max_message_size;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
...@@ -60,9 +61,6 @@ const DEFAULT_GLOBAL_CONNECT_LIMIT: usize = 64; ...@@ -60,9 +61,6 @@ const DEFAULT_GLOBAL_CONNECT_LIMIT: usize = 64;
/// Default idle host TTL in seconds before cleanup /// Default idle host TTL in seconds before cleanup
const DEFAULT_HOST_IDLE_TTL_SECS: u64 = 300; const DEFAULT_HOST_IDLE_TTL_SECS: u64 = 300;
/// Default maximum message size for TCP client (32 MB)
const DEFAULT_MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024;
/// Spin loop limit before falling back to async Notify in writer task /// Spin loop limit before falling back to async Notify in writer task
const WRITER_SPIN_LIMIT: u32 = 64; const WRITER_SPIN_LIMIT: u32 = 64;
...@@ -71,14 +69,6 @@ const WRITER_SPIN_LIMIT: u32 = 64; ...@@ -71,14 +69,6 @@ const WRITER_SPIN_LIMIT: u32 = 64;
/// stays at the high-water mark for subsequent batches (amortised zero allocation). /// stays at the high-water mark for subsequent batches (amortised zero allocation).
const WRITER_INITIAL_BUF_CAPACITY: usize = 256 * 1024; const WRITER_INITIAL_BUF_CAPACITY: usize = 256 * 1024;
/// Get maximum message size from environment or use default
fn get_max_message_size() -> usize {
std::env::var("DYN_TCP_MAX_MESSAGE_SIZE")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_MESSAGE_SIZE)
}
/// Check if latency tracing is enabled via environment /// Check if latency tracing is enabled via environment
fn latency_trace_enabled() -> bool { fn latency_trace_enabled() -> bool {
std::env::var("DYN_TCP_LATENCY_TRACE") std::env::var("DYN_TCP_LATENCY_TRACE")
...@@ -593,7 +583,7 @@ impl TcpConnection { ...@@ -593,7 +583,7 @@ impl TcpConnection {
) -> Result<()> { ) -> Result<()> {
use crate::pipeline::network::codec::TcpResponseCodec; use crate::pipeline::network::codec::TcpResponseCodec;
let max_message_size = get_max_message_size(); let max_message_size = get_tcp_max_message_size();
let codec = TcpResponseCodec::new(Some(max_message_size)); let codec = TcpResponseCodec::new(Some(max_message_size));
let mut framed = FramedRead::new(read_half, codec); let mut framed = FramedRead::new(read_half, codec);
......
...@@ -22,9 +22,6 @@ use tokio_util::bytes::BytesMut; ...@@ -22,9 +22,6 @@ use tokio_util::bytes::BytesMut;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::Instrument; use tracing::Instrument;
/// Default maximum message size for TCP server (32 MB)
const DEFAULT_MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024;
/// Default worker pool size for TCP request handling /// Default worker pool size for TCP request handling
const DEFAULT_WORKER_POOL_SIZE: usize = 1500; const DEFAULT_WORKER_POOL_SIZE: usize = 1500;
...@@ -32,14 +29,6 @@ const DEFAULT_WORKER_POOL_SIZE: usize = 1500; ...@@ -32,14 +29,6 @@ const DEFAULT_WORKER_POOL_SIZE: usize = 1500;
/// this is 4X the worker pool size to handle burst traffic /// this is 4X the worker pool size to handle burst traffic
const DEFAULT_WORK_QUEUE_SIZE: usize = 6000; const DEFAULT_WORK_QUEUE_SIZE: usize = 6000;
/// Get maximum message size from environment or use default
fn get_max_message_size() -> usize {
std::env::var("DYN_TCP_MAX_MESSAGE_SIZE")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(DEFAULT_MAX_MESSAGE_SIZE)
}
/// Get worker pool size from environment or use default /// Get worker pool size from environment or use default
fn get_worker_pool_size() -> usize { fn get_worker_pool_size() -> usize {
std::env::var("DYN_TCP_WORKER_POOL_SIZE") std::env::var("DYN_TCP_WORKER_POOL_SIZE")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment