Unverified Commit 382a26f0 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: type ZmqMessage (#5689)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 4636ecaf
...@@ -32,6 +32,15 @@ use super::frame::Frame; ...@@ -32,6 +32,15 @@ use super::frame::Frame;
use super::transport::{EventTransportRx, EventTransportTx, WireStream}; use super::transport::{EventTransportRx, EventTransportTx, WireStream};
use crate::discovery::EventTransportKind; use crate::discovery::EventTransportKind;
/// Parts of a received ZMQ multipart message.
struct ZmqMessage {
#[allow(dead_code)]
topic: Vec<u8>,
publisher_id: u64,
sequence: u64,
data: Vec<u8>,
}
/// ZMQ PUB transport for publishing events. /// ZMQ PUB transport for publishing events.
/// ///
/// Uses raw zmq::Socket with configured HWM for better scalability. /// Uses raw zmq::Socket with configured HWM for better scalability.
...@@ -360,48 +369,55 @@ impl ZmqSubTransport { ...@@ -360,48 +369,55 @@ impl ZmqSubTransport {
loop { loop {
// Receive multipart message in blocking task: [topic, publisher_id, sequence, frame_bytes] // Receive multipart message in blocking task: [topic, publisher_id, sequence, frame_bytes]
let socket_clone = Arc::clone(&socket); let socket_clone = Arc::clone(&socket);
let result = tokio::task::spawn_blocking( let result = tokio::task::spawn_blocking(move || -> Result<Option<ZmqMessage>> {
move || -> Result<Option<(Vec<u8>, u64, u64, Vec<u8>)>> { let socket = socket_clone.lock().unwrap();
let socket = socket_clone.lock().unwrap();
// Receive topic frame (may timeout with EAGAIN)
// Receive topic frame (may timeout with EAGAIN) let topic = match socket.recv_bytes(0) {
let topic = match socket.recv_bytes(0) { Ok(data) => data,
Ok(data) => data, Err(zmq::Error::EAGAIN) => return Ok(None), // Timeout, retry
Err(zmq::Error::EAGAIN) => return Ok(None), // Timeout, retry Err(e) => return Err(e.into()),
Err(e) => return Err(e.into()), };
};
// Receive publisher_id frame (8 bytes, u64 big-endian)
// Receive publisher_id frame (8 bytes, u64 big-endian) let publisher_id_bytes = socket.recv_bytes(0)?;
let publisher_id_bytes = socket.recv_bytes(0)?; if publisher_id_bytes.len() != 8 {
if publisher_id_bytes.len() != 8 { anyhow::bail!(
anyhow::bail!( "Invalid publisher_id frame: expected 8 bytes, got {}",
"Invalid publisher_id frame: expected 8 bytes, got {}", publisher_id_bytes.len()
publisher_id_bytes.len() );
); }
} let publisher_id = u64::from_be_bytes(publisher_id_bytes.try_into().unwrap());
let publisher_id =
u64::from_be_bytes(publisher_id_bytes.try_into().unwrap()); // Receive sequence frame (8 bytes, u64 big-endian)
let sequence_bytes = socket.recv_bytes(0)?;
// Receive sequence frame (8 bytes, u64 big-endian) if sequence_bytes.len() != 8 {
let sequence_bytes = socket.recv_bytes(0)?; anyhow::bail!(
if sequence_bytes.len() != 8 { "Invalid sequence frame: expected 8 bytes, got {}",
anyhow::bail!( sequence_bytes.len()
"Invalid sequence frame: expected 8 bytes, got {}", );
sequence_bytes.len() }
); let sequence = u64::from_be_bytes(sequence_bytes.try_into().unwrap());
}
let sequence = u64::from_be_bytes(sequence_bytes.try_into().unwrap()); // Receive data frame
let data = socket.recv_bytes(0)?;
// Receive data frame
let data = socket.recv_bytes(0)?; Ok(Some(ZmqMessage {
topic,
Ok(Some((topic, publisher_id, sequence, data))) publisher_id,
}, sequence,
) data,
}))
})
.await; .await;
match result { match result {
Ok(Ok(Some((_topic, publisher_id, sequence, frame_bytes)))) => { Ok(Ok(Some(ZmqMessage {
publisher_id,
sequence,
data: frame_bytes,
..
}))) => {
// Log dedup metadata for debugging // Log dedup metadata for debugging
tracing::trace!( tracing::trace!(
publisher_id = publisher_id, publisher_id = publisher_id,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment