Unverified Commit d49fc3eb authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix: purge before snapshotting (#3077)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 0373b897
......@@ -208,10 +208,15 @@ impl ApproxKvIndexer {
};
tokio::select! {
Some(request) = match_rx.recv() => {
let scores = trie.find_matches(request.sequence, false);
request.resp.send(scores).unwrap();
_ = cancel_clone.cancelled() => {
tracing::debug!("Approximate Indexer progress loop shutting down");
return;
}
Some(worker) = remove_worker_rx.recv() => {
trie.remove_worker(worker);
}
Some(result) = route_rx.recv() => {
let hashes = result.local_hashes.iter().zip(result.sequence_hashes.iter());
......@@ -239,14 +244,17 @@ impl ApproxKvIndexer {
worker: result.worker_id,
}).collect());
}
Some(worker) = remove_worker_rx.recv() => {
trie.remove_worker(worker);
}
Some(dump_req) = dump_rx.recv() => {
let events = trie.dump_tree_as_events();
let _ = dump_req.resp.send(events);
}
Some(request) = match_rx.recv() => {
let scores = trie.find_matches(request.sequence, false);
request.resp.send(scores).unwrap();
}
_ = expiry_fut => {
let expired = timer_manager.pop_expired();
......@@ -266,11 +274,6 @@ impl ApproxKvIndexer {
let _ = trie.apply_event(event);
});
}
_ = cancel_clone.cancelled() => {
tracing::debug!("Approximate Indexer progress loop shutting down");
return;
}
}
}
});
......
......@@ -797,13 +797,19 @@ impl KvIndexer {
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::debug!("KvCacheIndexer progress loop shutting down");
return;
}
Some(worker) = remove_worker_rx.recv() => {
trie.remove_worker(worker);
}
Some(req) = match_rx.recv() => {
let matches = trie.find_matches(req.sequence, req.early_exit);
let _ = req.resp.send(matches);
Some(event) = event_rx.recv() => {
let event_type = KvIndexerMetrics::get_event_type(&event.event.data);
let result = trie.apply_event(event);
metrics.increment_event_applied(event_type, result);
}
Some(dump_req) = dump_rx.recv() => {
......@@ -811,15 +817,9 @@ impl KvIndexer {
let _ = dump_req.resp.send(events);
}
_ = cancel.cancelled() => {
tracing::debug!("KvCacheIndexer progress loop shutting down");
return;
}
Some(event) = event_rx.recv() => {
let event_type = KvIndexerMetrics::get_event_type(&event.event.data);
let result = trie.apply_event(event);
metrics.increment_event_applied(event_type, result);
Some(req) = match_rx.recv() => {
let matches = trie.find_matches(req.sequence, req.early_exit);
let _ = req.resp.send(matches);
}
}
}
......@@ -1040,15 +1040,19 @@ impl KvIndexerSharded {
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::trace!("KvCacheIndexer progress loop shutting down");
return;
}
Some(worker) = shard_remove_worker_rx.recv() => {
trie.remove_worker(worker);
}
Ok(req) = shard_broadcast_rx.recv() => {
let matches = trie.find_matches(req.sequence, req.early_exit);
if let Err(e) = req.resp.send(matches).await {
tracing::trace!("Failed to send match response: {:?}", e);
}
Some(event) = shard_event_rx.recv() => {
let event_type = KvIndexerMetrics::get_event_type(&event.event.data);
let result = trie.apply_event(event);
metrics.increment_event_applied(event_type, result);
}
Some(dump_req) = shard_dump_rx.recv() => {
......@@ -1056,15 +1060,11 @@ impl KvIndexerSharded {
let _ = dump_req.resp.send(events);
}
_ = cancel.cancelled() => {
tracing::trace!("KvCacheIndexer progress loop shutting down");
return;
}
Some(event) = shard_event_rx.recv() => {
let event_type = KvIndexerMetrics::get_event_type(&event.event.data);
let result = trie.apply_event(event);
metrics.increment_event_applied(event_type, result);
Ok(req) = shard_broadcast_rx.recv() => {
let matches = trie.find_matches(req.sequence, req.early_exit);
if let Err(e) = req.resp.send(matches).await {
tracing::trace!("Failed to send match response: {:?}", e);
}
}
}
}
......
......@@ -244,7 +244,7 @@ pub async fn start_kv_router_background(
};
// Perform snapshot upload and purge
match perform_snapshot_and_purge(
match purge_then_snapshot(
&mut nats_queue,
snapshot_tx,
resources
......@@ -322,15 +322,19 @@ pub async fn start_kv_router_background(
}
/// Perform snapshot upload and purge operations
async fn perform_snapshot_and_purge(
async fn purge_then_snapshot(
nats_queue: &mut NatsQueue,
snapshot_tx: &mpsc::Sender<DumpRequest>,
resources: &SnapshotResources,
) -> anyhow::Result<()> {
// Snapshot before purge ensures we capture the current state before removing any messages.
// This guarantees the snapshot matches what has been acknowledged up to this point.
// Purge before snapshot ensures new/warm-restarted routers won't replay already-acknowledged messages.
// Since KV events are idempotent, this ordering reduces unnecessary reprocessing while maintaining
// at-least-once delivery guarantees. The snapshot will capture the clean state after purge.
// First, request a snapshot from the indexer
// First, purge acknowledged messages from the stream
nats_queue.purge_acknowledged().await?;
// Now request a snapshot from the indexer (which reflects the post-purge state)
let (resp_tx, resp_rx) = oneshot::channel();
let dump_req = DumpRequest { resp: resp_tx };
......@@ -363,8 +367,5 @@ async fn perform_snapshot_and_purge(
resources.bucket_name
);
// Now purge acknowledged messages from the stream
nats_queue.purge_acknowledged().await?;
Ok(())
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment