"...git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "4fccd30f19e0b44ec4a2b076cfc33aeafdd2d72e"
Unverified commit 0f8e1a9e, authored by Yan Ru Pei, committed by GitHub
Browse files

fix(kv-router): recover initial standalone-indexer ZMQ gaps (#7463)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 00d8ea5a
...@@ -31,6 +31,14 @@ fn calculate_backoff_ms(consecutive_errors: u32) -> u64 { ...@@ -31,6 +31,14 @@ fn calculate_backoff_ms(consecutive_errors: u32) -> u64 {
const WATERMARK_UNSET: u64 = u64::MAX; const WATERMARK_UNSET: u64 = u64::MAX;
/// Returns the first sequence number that is missing between the watermark
/// `prev` and the newly received `seq`, or `None` when nothing is missing.
///
/// * When `prev` equals `WATERMARK_UNSET`, the next expected sequence number
///   is 0, so any `seq` above 0 yields a gap starting at 0.
/// * Otherwise the next expected sequence number is `prev + 1`; a gap is
///   reported only when `seq` is strictly greater than that value.
fn gap_start(prev: u64, seq: u64) -> Option<u64> {
    // `prev + 1` is only evaluated when `prev` is not the `u64::MAX` sentinel,
    // so the addition cannot overflow.
    let expected = if prev == WATERMARK_UNSET { 0 } else { prev + 1 };

    if seq > expected {
        Some(expected)
    } else {
        None
    }
}
#[expect(clippy::too_many_arguments)] #[expect(clippy::too_many_arguments)]
async fn replay_gap( async fn replay_gap(
replay_socket: &mut DealerSocket, replay_socket: &mut DealerSocket,
...@@ -323,8 +331,7 @@ async fn zmq_recv_loop( ...@@ -323,8 +331,7 @@ async fn zmq_recv_loop(
let seq = u64::from_be_bytes(seq_bytes[..8].try_into().expect("length checked above")); let seq = u64::from_be_bytes(seq_bytes[..8].try_into().expect("length checked above"));
let prev = watermark.load(Ordering::Acquire); let prev = watermark.load(Ordering::Acquire);
if prev != WATERMARK_UNSET && seq > prev + 1 { if let Some(gap_start) = gap_start(prev, seq) {
let gap_start = prev + 1;
tracing::warn!( tracing::warn!(
worker_id, worker_id,
dp_rank, dp_rank,
......
...@@ -226,10 +226,6 @@ impl ZmqKvEventSink { ...@@ -226,10 +226,6 @@ impl ZmqKvEventSink {
let zmq_msg = zeromq::ZmqMessage::try_from(frames) let zmq_msg = zeromq::ZmqMessage::try_from(frames)
.expect("Failed to create ZMQ multipart message"); .expect("Failed to create ZMQ multipart message");
if let Err(e) = pub_socket.send(zmq_msg).await {
tracing::warn!("Failed to send ZMQ KV event: {e}");
}
if router_socket.is_some() { if router_socket.is_some() {
if ring_buffer.len() >= REPLAY_BUFFER_CAPACITY { if ring_buffer.len() >= REPLAY_BUFFER_CAPACITY {
ring_buffer.pop_front(); ring_buffer.pop_front();
...@@ -237,6 +233,12 @@ impl ZmqKvEventSink { ...@@ -237,6 +233,12 @@ impl ZmqKvEventSink {
ring_buffer.push_back((seq_num, payload)); ring_buffer.push_back((seq_num, payload));
} }
// Record the batch for replay before live publish so listeners
// can recover even if the PUB send is missed or fails.
if let Err(e) = pub_socket.send(zmq_msg).await {
tracing::warn!("Failed to send ZMQ KV event: {e}");
}
seq_num += 1; seq_num += 1;
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment