Unverified commit bc76247d, authored by Biswa Panda, committed by GitHub
Browse files

fix: add recv timeout for zmq transport (#5685)

parent 9c71f286
...@@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex}; ...@@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex};
/// Default ZMQ HWM is 1000, which limits scalability. /// Default ZMQ HWM is 1000, which limits scalability.
const ZMQ_SNDHWM: i32 = 100_000; // Send buffer: 100K messages const ZMQ_SNDHWM: i32 = 100_000; // Send buffer: 100K messages
const ZMQ_RCVHWM: i32 = 100_000; // Receive buffer: 100K messages const ZMQ_RCVHWM: i32 = 100_000; // Receive buffer: 100K messages
const ZMQ_RCVTIMEOUT_MS: i32 = 100; // Receive timeout: 100ms (avoids blocking forever)
use super::codec::MsgpackCodec; use super::codec::MsgpackCodec;
use super::frame::Frame; use super::frame::Frame;
...@@ -244,8 +245,8 @@ impl ZmqSubTransport { ...@@ -244,8 +245,8 @@ impl ZmqSubTransport {
// Configure High Water Mark for better scalability // Configure High Water Mark for better scalability
socket.set_rcvhwm(ZMQ_RCVHWM)?; socket.set_rcvhwm(ZMQ_RCVHWM)?;
// Set receive timeout to -1 (blocking) // Set receive timeout to avoid blocking forever (fixes test hangs)
socket.set_rcvtimeo(-1)?; socket.set_rcvtimeo(ZMQ_RCVTIMEOUT_MS)?;
// Connect to endpoint // Connect to endpoint
socket.connect(&endpoint_owned)?; socket.connect(&endpoint_owned)?;
...@@ -307,8 +308,8 @@ impl ZmqSubTransport { ...@@ -307,8 +308,8 @@ impl ZmqSubTransport {
// Configure High Water Mark for better scalability // Configure High Water Mark for better scalability
socket.set_rcvhwm(ZMQ_RCVHWM)?; socket.set_rcvhwm(ZMQ_RCVHWM)?;
// Set receive timeout to -1 (blocking) // Set receive timeout to avoid blocking forever (fixes test hangs)
socket.set_rcvtimeo(-1)?; socket.set_rcvtimeo(ZMQ_RCVTIMEOUT_MS)?;
// Connect to all endpoints // Connect to all endpoints
for endpoint in &endpoints_owned { for endpoint in &endpoints_owned {
...@@ -350,6 +351,7 @@ impl ZmqSubTransport { ...@@ -350,6 +351,7 @@ impl ZmqSubTransport {
/// ///
/// This task holds the socket lock only briefly during each recv operation, /// This task holds the socket lock only briefly during each recv operation,
/// allowing multiple subscribers to receive concurrently via broadcast channel. /// allowing multiple subscribers to receive concurrently via broadcast channel.
/// Uses finite timeout to avoid blocking forever (fixes test hangs from ZMQ "slow joiner" problem).
fn start_socket_pump( fn start_socket_pump(
socket: Arc<Mutex<zmq::Socket>>, socket: Arc<Mutex<zmq::Socket>>,
broadcast_tx: tokio::sync::broadcast::Sender<Bytes>, broadcast_tx: tokio::sync::broadcast::Sender<Bytes>,
...@@ -358,12 +360,16 @@ impl ZmqSubTransport { ...@@ -358,12 +360,16 @@ impl ZmqSubTransport {
loop { loop {
// Receive multipart message in blocking task: [topic, publisher_id, sequence, frame_bytes] // Receive multipart message in blocking task: [topic, publisher_id, sequence, frame_bytes]
let socket_clone = Arc::clone(&socket); let socket_clone = Arc::clone(&socket);
let result = let result = tokio::task::spawn_blocking(
tokio::task::spawn_blocking(move || -> Result<(Vec<u8>, u64, u64, Vec<u8>)> { move || -> Result<Option<(Vec<u8>, u64, u64, Vec<u8>)>> {
let socket = socket_clone.lock().unwrap(); let socket = socket_clone.lock().unwrap();
// Receive topic frame // Receive topic frame (may timeout with EAGAIN)
let topic = socket.recv_bytes(0)?; let topic = match socket.recv_bytes(0) {
Ok(data) => data,
Err(zmq::Error::EAGAIN) => return Ok(None), // Timeout, retry
Err(e) => return Err(e.into()),
};
// Receive publisher_id frame (8 bytes, u64 big-endian) // Receive publisher_id frame (8 bytes, u64 big-endian)
let publisher_id_bytes = socket.recv_bytes(0)?; let publisher_id_bytes = socket.recv_bytes(0)?;
...@@ -389,12 +395,13 @@ impl ZmqSubTransport { ...@@ -389,12 +395,13 @@ impl ZmqSubTransport {
// Receive data frame // Receive data frame
let data = socket.recv_bytes(0)?; let data = socket.recv_bytes(0)?;
Ok((topic, publisher_id, sequence, data)) Ok(Some((topic, publisher_id, sequence, data)))
}) },
.await; )
.await;
match result { match result {
Ok(Ok((_topic, publisher_id, sequence, frame_bytes))) => { Ok(Ok(Some((_topic, publisher_id, sequence, frame_bytes)))) => {
// Log dedup metadata for debugging // Log dedup metadata for debugging
tracing::trace!( tracing::trace!(
publisher_id = publisher_id, publisher_id = publisher_id,
...@@ -416,6 +423,10 @@ impl ZmqSubTransport { ...@@ -416,6 +423,10 @@ impl ZmqSubTransport {
} }
} }
} }
Ok(Ok(None)) => {
// Timeout (EAGAIN), continue polling
continue;
}
Ok(Err(e)) => { Ok(Err(e)) => {
tracing::error!(error = %e, "ZMQ receive error in socket pump"); tracing::error!(error = %e, "ZMQ receive error in socket pump");
break; break;
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment