Unverified Commit f3d3a8b3 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix(kv-router): coalesce worker recovery dumps (#8021)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent a1f230e9
...@@ -378,11 +378,7 @@ impl KvIndexer { ...@@ -378,11 +378,7 @@ impl KvIndexer {
self.event_tx.clone() self.event_tx.clone()
} }
/// Get a sender for dump requests (snapshot events). #[cfg(test)]
///
/// ### Returns
///
/// A `mpsc::Sender` for `DumpRequest`s.
pub fn snapshot_event_sender(&self) -> mpsc::Sender<DumpRequest> { pub fn snapshot_event_sender(&self) -> mpsc::Sender<DumpRequest> {
self.dump_tx.clone() self.dump_tx.clone()
} }
......
...@@ -7,19 +7,198 @@ use std::{ ...@@ -7,19 +7,198 @@ use std::{
}; };
use async_trait::async_trait; use async_trait::async_trait;
use tokio::sync::mpsc; use tokio::sync::futures::OwnedNotified;
use tokio::sync::{Mutex as AsyncMutex, Notify, mpsc};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use super::{ use super::{
DumpRequest, GetWorkersRequest, KvIndexer, KvIndexerInterface, KvIndexerMetrics, KvRouterError, GetWorkersRequest, KvIndexer, KvIndexerInterface, KvIndexerMetrics, KvRouterError,
WorkerKvQueryResponse, WorkerKvQueryResponse,
}; };
use crate::protocols::*; use crate::protocols::*;
#[cfg(test)]
use super::DumpRequest;
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(test)]
use tokio::time::Duration;
// ------------------------------------------------- // -------------------------------------------------
// Decentralized router: LocalKvIndexer for workers // Decentralized router: LocalKvIndexer for workers
// ------------------------------------------------- // -------------------------------------------------
#[derive(Clone)]
struct CachedRecoverySnapshot {
events: Arc<Vec<RouterEvent>>,
base_last_event_id: u64,
last_event_id: u64,
}
impl CachedRecoverySnapshot {
fn into_response(self) -> WorkerKvQueryResponse {
WorkerKvQueryResponse::TreeDump {
events: self.events.as_ref().clone(),
last_event_id: self.last_event_id,
}
}
}
#[derive(Clone)]
struct InFlightRecoveryBuild {
generation: u64,
notify: Arc<Notify>,
}
#[derive(Default)]
struct RecoveryCacheState {
generation: u64,
cached: Option<CachedRecoverySnapshot>,
building: Option<InFlightRecoveryBuild>,
}
struct RecoverySnapshotCache {
state: AsyncMutex<RecoveryCacheState>,
}
enum DumpPlan {
Immediate(WorkerKvQueryResponse),
RequiresDump { last_event_id: u64 },
}
enum CacheReuseDecision {
ReturnExact(CachedRecoverySnapshot),
ReturnExtended(WorkerKvQueryResponse),
WaitForBuilder(OwnedNotified),
BuildFresh {
build: InFlightRecoveryBuild,
last_event_id: u64,
},
}
enum TailAppendSafety {
ExactHit,
Safe {
last_event_id: u64,
tail: Vec<RouterEvent>,
},
Invalidate,
}
enum BuildTaskResult {
Response(WorkerKvQueryResponse),
StaleGeneration,
}
struct FreshDumpOutput {
response: WorkerKvQueryResponse,
snapshot: Option<CachedRecoverySnapshot>,
}
impl RecoverySnapshotCache {
fn new() -> Self {
Self {
state: AsyncMutex::new(RecoveryCacheState::default()),
}
}
async fn decide_reuse_or_build<F>(
&self,
fallback_last_event_id: u64,
current_last_event_id: Option<u64>,
assess_tail_append_safety: F,
) -> CacheReuseDecision
where
F: FnOnce(&CachedRecoverySnapshot) -> TailAppendSafety,
{
let mut cache_state = self.state.lock().await;
if let Some(cached) = cache_state.cached.clone() {
match assess_tail_append_safety(&cached) {
TailAppendSafety::ExactHit => return CacheReuseDecision::ReturnExact(cached),
TailAppendSafety::Safe {
last_event_id,
tail,
} => {
let mut events = cached.events.as_ref().clone();
events.extend(tail);
let shared_events = Arc::new(events);
cache_state.cached = Some(CachedRecoverySnapshot {
events: shared_events.clone(),
base_last_event_id: cached.base_last_event_id,
last_event_id,
});
return CacheReuseDecision::ReturnExtended(WorkerKvQueryResponse::TreeDump {
events: shared_events.as_ref().clone(),
last_event_id,
});
}
TailAppendSafety::Invalidate => {
cache_state.cached = None;
}
}
}
if let Some(build) = cache_state.building.clone() {
return CacheReuseDecision::WaitForBuilder(build.notify.notified_owned());
}
let build = InFlightRecoveryBuild {
generation: cache_state.generation,
notify: Arc::new(Notify::new()),
};
let last_event_id = current_last_event_id.unwrap_or(fallback_last_event_id);
cache_state.building = Some(build.clone());
CacheReuseDecision::BuildFresh {
build,
last_event_id,
}
}
async fn finish_build(
&self,
build: &InFlightRecoveryBuild,
build_output: FreshDumpOutput,
) -> BuildTaskResult {
let mut cache_state = self.state.lock().await;
let is_current_build = cache_state
.building
.as_ref()
.is_some_and(|inflight| inflight.generation == build.generation);
let generation_matches = cache_state.generation == build.generation;
if is_current_build {
cache_state.building = None;
}
if !is_current_build || !generation_matches {
return BuildTaskResult::StaleGeneration;
}
if let Some(snapshot) = build_output.snapshot {
cache_state.cached = Some(snapshot);
}
BuildTaskResult::Response(build_output.response)
}
async fn clear_build_if_current(&self, generation: u64) {
let mut cache_state = self.state.lock().await;
if cache_state
.building
.as_ref()
.is_some_and(|inflight| inflight.generation == generation)
{
cache_state.building = None;
}
}
async fn invalidate(&self) {
let mut cache_state = self.state.lock().await;
cache_state.generation = cache_state.generation.saturating_add(1);
cache_state.cached = None;
}
}
/// A thin wrapper around KvIndexer that buffers recent events /// A thin wrapper around KvIndexer that buffers recent events
/// (e.g. which may be queued by router upon startup) /// (e.g. which may be queued by router upon startup)
/// ///
...@@ -28,8 +207,16 @@ pub struct LocalKvIndexer { ...@@ -28,8 +207,16 @@ pub struct LocalKvIndexer {
indexer: KvIndexer, indexer: KvIndexer,
/// Circular buffer of recent events /// Circular buffer of recent events
pub(super) event_buffer: Mutex<VecDeque<RouterEvent>>, pub(super) event_buffer: Mutex<VecDeque<RouterEvent>>,
/// Coordinates single-flight tree dumps and the cached recovery snapshot.
/// This stays separate from `event_buffer` so dump wait/build state can be
/// managed on the async path without holding the buffer lock across `.await`.
recovery_cache: Arc<RecoverySnapshotCache>,
/// Maximum number of events to keep in buffer /// Maximum number of events to keep in buffer
max_buffer_size: usize, // Router sets this to WORKER_KV_INDEXER_BUFFER_SIZE max_buffer_size: usize, // Router sets this to WORKER_KV_INDEXER_BUFFER_SIZE
#[cfg(test)]
dump_build_count: AtomicUsize,
#[cfg(test)]
dump_build_delay: Mutex<Option<Duration>>,
} }
impl LocalKvIndexer { impl LocalKvIndexer {
...@@ -43,35 +230,34 @@ impl LocalKvIndexer { ...@@ -43,35 +230,34 @@ impl LocalKvIndexer {
Self { Self {
indexer: KvIndexer::new(token, kv_block_size, metrics), indexer: KvIndexer::new(token, kv_block_size, metrics),
event_buffer: Mutex::new(VecDeque::with_capacity(max_buffer_size)), event_buffer: Mutex::new(VecDeque::with_capacity(max_buffer_size)),
recovery_cache: Arc::new(RecoverySnapshotCache::new()),
max_buffer_size, max_buffer_size,
#[cfg(test)]
dump_build_count: AtomicUsize::new(0),
#[cfg(test)]
dump_build_delay: Mutex::new(None),
} }
} }
/// Get all buffered events (oldest first). #[cfg(test)]
pub fn get_all_events_in_buffer(&self) -> Vec<RouterEvent> { pub fn get_all_events_in_buffer(&self) -> Vec<RouterEvent> {
let buffer = self.event_buffer.lock().unwrap(); let buffer = self.event_buffer.lock().unwrap();
buffer.iter().cloned().collect() buffer.iter().cloned().collect()
} }
/// Build a tree dump response with the given `last_event_id`.
async fn tree_dump_response(&self, last_event_id: u64) -> WorkerKvQueryResponse {
let events = self.dump_events().await.unwrap_or_default();
WorkerKvQueryResponse::TreeDump {
events,
last_event_id,
}
}
/// Query events by ID range, returning events in `[start_id, end_id]` (both inclusive). /// Query events by ID range, returning events in `[start_id, end_id]` (both inclusive).
/// ///
/// ### Arguments /// ### Arguments
/// ///
/// * `start_id` - Starting event ID (inclusive). If `None`, dumps entire tree. /// * `start_id` - Starting event ID (inclusive). If `None`, dumps entire tree.
/// * `end_id` - Ending event ID (inclusive). If `None`, returns up to newest available. /// * `end_id` - Ending event ID (inclusive). Used for validation and
/// `TooNew` responses; successful buffer-backed responses may still
/// return through the newest buffered event.
/// ///
/// ### Returns /// ### Returns
/// ///
/// - `Events`: Buffered events with original IDs (when range is within buffer) /// - `Events`: Buffered events with original IDs from `start_id` through the
/// current buffered tail, plus the buffered `last_event_id`
/// - `TreeDump`: Full tree dump with synthetic IDs and the worker's latest real event ID (when range is too old or unspecified) /// - `TreeDump`: Full tree dump with synthetic IDs and the worker's latest real event ID (when range is too old or unspecified)
/// - `TooNew`: Error when requested range is newer than available data /// - `TooNew`: Error when requested range is newer than available data
/// - `InvalidRange`: Error when end_id < start_id /// - `InvalidRange`: Error when end_id < start_id
...@@ -80,154 +266,313 @@ impl LocalKvIndexer { ...@@ -80,154 +266,313 @@ impl LocalKvIndexer {
start_id: Option<u64>, start_id: Option<u64>,
end_id: Option<u64>, end_id: Option<u64>,
) -> WorkerKvQueryResponse { ) -> WorkerKvQueryResponse {
// Validate range if both specified match self.classify_query(start_id, end_id) {
DumpPlan::Immediate(response) => response,
DumpPlan::RequiresDump { last_event_id } => {
self.get_cached_or_fresh_dump(last_event_id).await
}
}
}
/// Record an event in the buffer
fn record_event(&self, event: RouterEvent) -> bool {
let mut buffer = self.event_buffer.lock().unwrap();
let mut detected_gap = false;
// Check that event id is consecutive to last one
if let Some(last_event) = buffer.back()
&& event.event.event_id != last_event.event.event_id + 1
{
detected_gap = true;
let expected = last_event.event.event_id + 1;
tracing::error!(
worker_id = event.worker_id,
expected,
got = event.event.event_id,
"Non-consecutive KV event id; buffer may have gaps"
);
}
tracing::debug!(
"Recorded event {:?} in buffer, now size is {}",
event,
buffer.len()
);
// Add to back
buffer.push_back(event);
// Remove from front if over capacity (circular buffer behavior)
while buffer.len() > self.max_buffer_size {
buffer.pop_front();
}
detected_gap
}
/// Apply event with buffering.
///
/// This forwards the event to the underlying indexer and records it on success.
pub async fn apply_event_with_buffer(&self, event: RouterEvent) -> Result<(), KvRouterError> {
// Forward to underlying indexer
let result = self
.indexer
.event_sender()
.send(event.clone())
.await
.map_err(|_| KvRouterError::IndexerOffline);
if result.is_ok() {
let should_invalidate = matches!(event.event.data, KvCacheEventData::Cleared);
let detected_gap = self.record_event(event);
if should_invalidate || detected_gap {
self.recovery_cache.invalidate().await;
}
}
result
}
#[cfg(test)]
pub fn buffer_len(&self) -> usize {
let buffer = self.event_buffer.lock().unwrap();
buffer.len()
}
fn classify_query(&self, start_id: Option<u64>, end_id: Option<u64>) -> DumpPlan {
if let (Some(s), Some(e)) = (start_id, end_id) if let (Some(s), Some(e)) = (start_id, end_id)
&& e < s && e < s
{ {
tracing::warn!(start_id = s, end_id = e, "Invalid range: end_id < start_id"); tracing::warn!(start_id = s, end_id = e, "Invalid range: end_id < start_id");
return WorkerKvQueryResponse::InvalidRange { return DumpPlan::Immediate(WorkerKvQueryResponse::InvalidRange {
start_id: s, start_id: s,
end_id: e, end_id: e,
}; });
} }
// Get buffer state let buffer = self.event_buffer.lock().unwrap();
let (first_id, last_id) = { let (first_id, last_id) = if buffer.is_empty() {
let buffer = self.event_buffer.lock().unwrap(); (None, None)
if buffer.is_empty() { } else {
(None, None) (
} else { Some(buffer.front().unwrap().event.event_id),
( Some(buffer.back().unwrap().event.event_id),
Some(buffer.front().unwrap().event.event_id), )
Some(buffer.back().unwrap().event.event_id),
)
}
}; };
// If no start_id specified, dump entire tree
if start_id.is_none() { if start_id.is_none() {
tracing::debug!("No start_id specified, dumping entire tree"); tracing::debug!("No start_id specified, dumping entire tree");
return self.tree_dump_response(last_id.unwrap_or(0)).await; return DumpPlan::RequiresDump {
last_event_id: last_id.unwrap_or(0),
};
} }
let start_id = start_id.unwrap(); let start_id = start_id.expect("checked above");
let end_id = end_id.unwrap_or_else(|| last_id.unwrap_or(start_id)); let end_id = end_id.unwrap_or_else(|| last_id.unwrap_or(start_id));
// Check for empty buffer
let Some(first_buffered) = first_id else { let Some(first_buffered) = first_id else {
tracing::debug!("Buffer empty, dumping entire tree"); tracing::debug!("Buffer empty, dumping entire tree");
return self.tree_dump_response(0).await; return DumpPlan::RequiresDump { last_event_id: 0 };
}; };
let last_buffered = last_id.unwrap(); let last_buffered = last_id.expect("buffer non-empty");
// Check if request is too new
if start_id > last_buffered { if start_id > last_buffered {
tracing::warn!( tracing::warn!(
start_id, start_id,
last_buffered, last_buffered,
"Requested start_id is newer than buffer" "Requested start_id is newer than buffer"
); );
return WorkerKvQueryResponse::TooNew { return DumpPlan::Immediate(WorkerKvQueryResponse::TooNew {
requested_start: Some(start_id), requested_start: Some(start_id),
requested_end: Some(end_id), requested_end: Some(end_id),
newest_available: last_buffered, newest_available: last_buffered,
}; });
} }
// Check if start_id is too old (before buffer) -> tree dump
if start_id < first_buffered { if start_id < first_buffered {
tracing::info!( tracing::info!(
start_id, start_id,
first_buffered, first_buffered,
"Requested start_id is older than buffer, dumping entire tree" "Requested start_id is older than buffer, dumping entire tree"
); );
return self.tree_dump_response(last_buffered).await; return DumpPlan::RequiresDump {
last_event_id: last_buffered,
};
} }
// Serve from buffer let start_idx = match buffer.binary_search_by_key(&start_id, |event| event.event.event_id) {
let buffer = self.event_buffer.lock().unwrap();
let start_idx = match buffer.binary_search_by_key(&start_id, |e| e.event.event_id) {
Ok(idx) => idx, Ok(idx) => idx,
Err(insertion_point) => insertion_point, Err(insertion_point) => insertion_point,
}; };
let events = buffer.iter().skip(start_idx).cloned().collect();
// Clamp end_id to buffer bounds DumpPlan::Immediate(WorkerKvQueryResponse::Events {
let clamped_end_id = end_id.min(last_buffered); events,
let end_idx = match buffer.binary_search_by_key(&clamped_end_id, |e| e.event.event_id) { last_event_id: last_buffered,
Ok(idx) => idx + 1, // Include the matched element })
Err(insertion_point) => insertion_point, }
};
let events: Vec<RouterEvent> = buffer
.iter()
.skip(start_idx)
.take(end_idx.saturating_sub(start_idx))
.cloned()
.collect();
WorkerKvQueryResponse::Events(events) async fn get_cached_or_fresh_dump(&self, fallback_last_event_id: u64) -> WorkerKvQueryResponse {
loop {
let decision = self
.recovery_cache
.decide_reuse_or_build(
fallback_last_event_id,
self.current_buffer_last_event_id(),
|cached| self.assess_tail_append_safety(cached),
)
.await;
match decision {
CacheReuseDecision::ReturnExact(snapshot) => return snapshot.into_response(),
CacheReuseDecision::ReturnExtended(response) => return response,
CacheReuseDecision::WaitForBuilder(waiter) => waiter.await,
CacheReuseDecision::BuildFresh {
build,
last_event_id,
} => {
let notify = build.notify.clone();
let generation = build.generation;
let build_handle = self.spawn_dump_build(build, last_event_id);
match build_handle.await {
Ok(BuildTaskResult::Response(response)) => return response,
Ok(BuildTaskResult::StaleGeneration) => continue,
Err(error) => {
tracing::warn!("Recovery cache build task failed: {error}");
self.recovery_cache.clear_build_if_current(generation).await;
notify.notify_waiters();
return WorkerKvQueryResponse::TreeDump {
events: Vec::new(),
last_event_id,
};
}
}
}
}
}
} }
/// Record an event in the buffer fn assess_tail_append_safety(&self, cached: &CachedRecoverySnapshot) -> TailAppendSafety {
fn record_event(&self, event: RouterEvent) { let append_budget = self.recovery_cache_append_budget();
let mut buffer = self.event_buffer.lock().unwrap(); let buffer = self.event_buffer.lock().unwrap();
let Some(first_buffered) = buffer.front().map(|event| event.event.event_id) else {
return if cached.last_event_id == 0 {
TailAppendSafety::ExactHit
} else {
TailAppendSafety::Invalidate
};
};
let last_buffered = buffer.back().unwrap().event.event_id;
// Check that event id is consecutive to last one if last_buffered <= cached.last_event_id {
if let Some(last_event) = buffer.back() return TailAppendSafety::ExactHit;
&& event.event.event_id != last_event.event.event_id + 1
{
let expected = last_event.event.event_id + 1;
tracing::error!(
worker_id = event.worker_id,
expected,
got = event.event.event_id,
"Non-consecutive KV event id; buffer may have gaps"
);
} }
tracing::debug!(
"Recorded event {:?} in buffer, now size is {}",
event,
buffer.len()
);
// Add to back let appended_since_base = last_buffered.saturating_sub(cached.base_last_event_id);
buffer.push_back(event); if appended_since_base > append_budget {
return TailAppendSafety::Invalidate;
}
// Remove from front if over capacity (circular buffer behavior) let next_event_id = cached.last_event_id.saturating_add(1);
while buffer.len() > self.max_buffer_size { if next_event_id < first_buffered {
buffer.pop_front(); return TailAppendSafety::Invalidate;
} }
}
/// Apply event with buffering. let start_idx =
/// match buffer.binary_search_by_key(&next_event_id, |event| event.event.event_id) {
/// This forwards the event to the underlying indexer and records it on success. Ok(idx) => idx,
pub async fn apply_event_with_buffer(&self, event: RouterEvent) -> Result<(), KvRouterError> { Err(insertion_point) => insertion_point,
// Forward to underlying indexer };
let result = self
.indexer let mut tail = Vec::with_capacity(buffer.len().saturating_sub(start_idx));
.event_sender() for event in buffer.iter().skip(start_idx) {
.send(event.clone()) match event.event.data {
.await KvCacheEventData::Stored(_) | KvCacheEventData::Removed(_) => {
.map_err(|_| KvRouterError::IndexerOffline); tail.push(event.clone());
if result.is_ok() { }
self.record_event(event); _ => {
return TailAppendSafety::Invalidate;
}
}
} }
result TailAppendSafety::Safe {
last_event_id: last_buffered,
tail,
}
} }
/// Clear the event buffer. fn recovery_cache_append_budget(&self) -> u64 {
pub fn clear_buffer(&self) { (self.max_buffer_size / 2) as u64
let mut buffer = self.event_buffer.lock().unwrap();
buffer.clear();
} }
/// Get the current buffer size. fn current_buffer_last_event_id(&self) -> Option<u64> {
pub fn buffer_len(&self) -> usize { self.event_buffer
let buffer = self.event_buffer.lock().unwrap(); .lock()
buffer.len() .unwrap()
.back()
.map(|event| event.event.event_id)
}
fn spawn_dump_build(
&self,
build: InFlightRecoveryBuild,
last_event_id: u64,
) -> tokio::task::JoinHandle<BuildTaskResult> {
let indexer = self.indexer.clone();
let recovery_cache = self.recovery_cache.clone();
#[cfg(test)]
let build_delay = *self.dump_build_delay.lock().unwrap();
#[cfg(test)]
self.dump_build_count.fetch_add(1, Ordering::Relaxed);
tokio::spawn(async move {
#[cfg(test)]
if let Some(delay) = build_delay {
tokio::time::sleep(delay).await;
}
let build_output = Self::build_fresh_dump(indexer, last_event_id).await;
let notify = build.notify.clone();
let result = recovery_cache.finish_build(&build, build_output).await;
notify.notify_waiters();
result
})
}
async fn build_fresh_dump(indexer: KvIndexer, last_event_id: u64) -> FreshDumpOutput {
match indexer.dump_events().await {
Ok(events) => FreshDumpOutput {
response: WorkerKvQueryResponse::TreeDump {
events: events.clone(),
last_event_id,
},
snapshot: Some(CachedRecoverySnapshot {
events: Arc::new(events),
base_last_event_id: last_event_id,
last_event_id,
}),
},
Err(error) => {
tracing::warn!("Failed to build recovery dump: {error}");
FreshDumpOutput {
response: WorkerKvQueryResponse::TreeDump {
events: Vec::new(),
last_event_id,
},
snapshot: None,
}
}
}
}
#[cfg(test)]
pub(crate) fn dump_build_count(&self) -> usize {
self.dump_build_count.load(Ordering::Relaxed)
}
#[cfg(test)]
pub(crate) fn set_dump_build_delay(&self, delay: Option<Duration>) {
*self.dump_build_delay.lock().unwrap() = delay;
} }
// Delegation methods to underlying KvIndexer // Delegation methods to underlying KvIndexer
...@@ -236,7 +581,7 @@ impl LocalKvIndexer { ...@@ -236,7 +581,7 @@ impl LocalKvIndexer {
self.indexer.event_sender() self.indexer.event_sender()
} }
/// Get a sender for dump requests (snapshot events). #[cfg(test)]
pub fn snapshot_event_sender(&self) -> mpsc::Sender<DumpRequest> { pub fn snapshot_event_sender(&self) -> mpsc::Sender<DumpRequest> {
self.indexer.snapshot_event_sender() self.indexer.snapshot_event_sender()
} }
...@@ -250,11 +595,6 @@ impl LocalKvIndexer { ...@@ -250,11 +595,6 @@ impl LocalKvIndexer {
pub fn get_workers_sender(&self) -> mpsc::Sender<GetWorkersRequest> { pub fn get_workers_sender(&self) -> mpsc::Sender<GetWorkersRequest> {
self.indexer.get_workers_sender() self.indexer.get_workers_sender()
} }
/// Get the KV block size.
pub fn block_size(&self) -> u32 {
self.indexer.block_size()
}
} }
// Implement KvIndexerInterface by delegating to the underlying indexer // Implement KvIndexerInterface by delegating to the underlying indexer
...@@ -287,6 +627,10 @@ impl KvIndexerInterface for LocalKvIndexer { ...@@ -287,6 +627,10 @@ impl KvIndexerInterface for LocalKvIndexer {
let _ = self.indexer.remove_worker_sender().send(worker).await; let _ = self.indexer.remove_worker_sender().send(worker).await;
} }
async fn remove_worker_dp_rank(&self, worker: WorkerId, dp_rank: DpRank) {
KvIndexerInterface::remove_worker_dp_rank(&self.indexer, worker, dp_rank).await;
}
fn shutdown(&self) { fn shutdown(&self) {
self.indexer.shutdown(); self.indexer.shutdown();
} }
......
...@@ -2094,6 +2094,52 @@ mod local_indexer_tests { ...@@ -2094,6 +2094,52 @@ mod local_indexer_tests {
use super::*; use super::*;
use rstest_reuse::apply; use rstest_reuse::apply;
fn make_local_store_event(event_id: u64, block_hash: u64) -> RouterEvent {
RouterEvent::new(
0,
KvCacheEvent {
event_id,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None,
blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(block_hash),
tokens_hash: LocalBlockHash(block_hash),
mm_extra_info: None,
}],
}),
dp_rank: 0,
},
)
}
fn make_local_remove_event(event_id: u64, block_hashes: &[u64]) -> RouterEvent {
RouterEvent::new(
0,
KvCacheEvent {
event_id,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: block_hashes
.iter()
.copied()
.map(ExternalSequenceBlockHash)
.collect(),
}),
dp_rank: 0,
},
)
}
fn make_local_clear_event(event_id: u64) -> RouterEvent {
RouterEvent::new(
0,
KvCacheEvent {
event_id,
data: KvCacheEventData::Cleared,
dp_rank: 0,
},
)
}
#[tokio::test] #[tokio::test]
async fn test_local_indexer_slice_within_range() { async fn test_local_indexer_slice_within_range() {
let indexer = make_local_indexer_with_events(&[1, 2, 3, 4, 5]); let indexer = make_local_indexer_with_events(&[1, 2, 3, 4, 5]);
...@@ -2101,33 +2147,44 @@ mod local_indexer_tests { ...@@ -2101,33 +2147,44 @@ mod local_indexer_tests {
// Helper to extract events from response // Helper to extract events from response
let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> { let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> {
match resp { match resp {
WorkerKvQueryResponse::Events(e) => e, WorkerKvQueryResponse::Events { events: e, .. } => e,
WorkerKvQueryResponse::TreeDump { events: e, .. } => e, WorkerKvQueryResponse::TreeDump { events: e, .. } => e,
_ => panic!("Unexpected response type"), _ => panic!("Unexpected response type"),
} }
}; };
let extract_last_event_id = |resp: &WorkerKvQueryResponse| -> Option<u64> {
match resp {
WorkerKvQueryResponse::Events { last_event_id, .. } => Some(*last_event_id),
WorkerKvQueryResponse::TreeDump { last_event_id, .. } => Some(*last_event_id),
_ => None,
}
};
let get_ids = |events: Vec<RouterEvent>| -> Vec<u64> { let get_ids = |events: Vec<RouterEvent>| -> Vec<u64> {
events.iter().map(|e| e.event.event_id).collect() events.iter().map(|e| e.event.event_id).collect()
}; };
// Test get_events_in_id_range (buffer queries) // Test get_events_in_id_range (buffer queries)
// Range is [start, end] inclusive // Buffer hits now return the contiguous suffix through the buffered tail.
let result = indexer.get_events_in_id_range(Some(2), Some(4)).await; let result = indexer.get_events_in_id_range(Some(2), Some(4)).await;
let ids = get_ids(extract_events(result)); let ids = get_ids(extract_events(result.clone()));
assert_eq!(ids, vec![2, 3, 4]); // inclusive range [2, 4] assert_eq!(ids, vec![2, 3, 4, 5]);
assert_eq!(extract_last_event_id(&result), Some(5));
let result = indexer.get_events_in_id_range(Some(2), Some(6)).await; let result = indexer.get_events_in_id_range(Some(2), Some(6)).await;
let ids = get_ids(extract_events(result)); let ids = get_ids(extract_events(result.clone()));
assert_eq!(ids, vec![2, 3, 4, 5]); // clamp end to buffer max assert_eq!(ids, vec![2, 3, 4, 5]); // clamp end to buffer max
assert_eq!(extract_last_event_id(&result), Some(5));
// start_id=0 is before buffer (first is 1), so should trigger tree dump // start_id=0 is before buffer (first is 1), so should trigger tree dump
let result = indexer.get_events_in_id_range(Some(0), Some(4)).await; let result = indexer.get_events_in_id_range(Some(0), Some(4)).await;
assert!(matches!(result, WorkerKvQueryResponse::TreeDump { .. })); assert!(matches!(result, WorkerKvQueryResponse::TreeDump { .. }));
let result = indexer.get_events_in_id_range(Some(3), Some(3)).await; let result = indexer.get_events_in_id_range(Some(3), Some(3)).await;
let ids = get_ids(extract_events(result)); let ids = get_ids(extract_events(result.clone()));
assert_eq!(ids, vec![3]); // single element when start == end assert_eq!(ids, vec![3, 4, 5]);
assert_eq!(extract_last_event_id(&result), Some(5));
// Invalid range: end < start // Invalid range: end < start
let result = indexer.get_events_in_id_range(Some(5), Some(2)).await; let result = indexer.get_events_in_id_range(Some(5), Some(2)).await;
...@@ -2176,12 +2233,20 @@ mod local_indexer_tests { ...@@ -2176,12 +2233,20 @@ mod local_indexer_tests {
let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> { let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> {
match resp { match resp {
WorkerKvQueryResponse::Events(e) => e, WorkerKvQueryResponse::Events { events: e, .. } => e,
WorkerKvQueryResponse::TreeDump { events: e, .. } => e, WorkerKvQueryResponse::TreeDump { events: e, .. } => e,
_ => panic!("Unexpected response type: {:?}", resp), _ => panic!("Unexpected response type: {:?}", resp),
} }
}; };
let extract_last_event_id = |resp: &WorkerKvQueryResponse| -> Option<u64> {
match resp {
WorkerKvQueryResponse::Events { last_event_id, .. } => Some(*last_event_id),
WorkerKvQueryResponse::TreeDump { last_event_id, .. } => Some(*last_event_id),
_ => None,
}
};
let get_ids = |events: Vec<RouterEvent>| -> Vec<u64> { let get_ids = |events: Vec<RouterEvent>| -> Vec<u64> {
events.iter().map(|e| e.event.event_id).collect() events.iter().map(|e| e.event.event_id).collect()
}; };
...@@ -2192,10 +2257,25 @@ mod local_indexer_tests { ...@@ -2192,10 +2257,25 @@ mod local_indexer_tests {
// Buffer path tests // Buffer path tests
let result = indexer.get_events_in_id_range(Some(11), None).await; let result = indexer.get_events_in_id_range(Some(11), None).await;
assert_eq!(get_ids(extract_events(result)), vec![11, 12, 13, 14]); assert_eq!(
get_ids(extract_events(result.clone())),
vec![11, 12, 13, 14]
);
assert_eq!(extract_last_event_id(&result), Some(14));
let result = indexer.get_events_in_id_range(Some(10), Some(14)).await; let result = indexer.get_events_in_id_range(Some(10), Some(14)).await;
assert_eq!(get_ids(extract_events(result)), vec![10, 11, 12, 13, 14]); assert_eq!(
get_ids(extract_events(result.clone())),
vec![10, 11, 12, 13, 14]
);
assert_eq!(extract_last_event_id(&result), Some(14));
let result = indexer.get_events_in_id_range(Some(11), Some(12)).await;
assert_eq!(
get_ids(extract_events(result.clone())),
vec![11, 12, 13, 14]
);
assert_eq!(extract_last_event_id(&result), Some(14));
// Tree dump path tests // Tree dump path tests
let result = indexer.get_events_in_id_range(None, None).await; let result = indexer.get_events_in_id_range(None, None).await;
...@@ -2340,16 +2420,23 @@ mod local_indexer_tests { ...@@ -2340,16 +2420,23 @@ mod local_indexer_tests {
assert_eq!(buffered_events[0].worker_id, worker_id); assert_eq!(buffered_events[0].worker_id, worker_id);
// Test serialization round-trip // Test serialization round-trip
let response = WorkerKvQueryResponse::Events(buffered_events); let response = WorkerKvQueryResponse::Events {
events: buffered_events,
last_event_id: 1,
};
let serialized = serde_json::to_vec(&response).unwrap(); let serialized = serde_json::to_vec(&response).unwrap();
let deserialized: WorkerKvQueryResponse = serde_json::from_slice(&serialized).unwrap(); let deserialized: WorkerKvQueryResponse = serde_json::from_slice(&serialized).unwrap();
let events = match deserialized { let (events, last_event_id) = match deserialized {
WorkerKvQueryResponse::Events(e) => e, WorkerKvQueryResponse::Events {
events,
last_event_id,
} => (events, last_event_id),
_ => panic!("Expected Events variant"), _ => panic!("Expected Events variant"),
}; };
assert_eq!(events.len(), 1); assert_eq!(events.len(), 1);
assert_eq!(events[0].worker_id, worker_id); assert_eq!(events[0].worker_id, worker_id);
assert_eq!(last_event_id, 1);
} }
#[tokio::test] #[tokio::test]
...@@ -2397,6 +2484,281 @@ mod local_indexer_tests { ...@@ -2397,6 +2484,281 @@ mod local_indexer_tests {
} }
} }
#[tokio::test]
async fn test_local_indexer_remove_worker_dp_rank_only_clears_target_rank() {
let local_indexer = LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
5,
);
local_indexer
.apply_event_with_buffer(make_store_event_with_dp_rank(7, &[101], 0))
.await
.unwrap();
local_indexer
.apply_event_with_buffer(make_store_event_with_dp_rank(7, &[202], 1))
.await
.unwrap();
local_indexer.flush().await;
local_indexer.remove_worker_dp_rank(7, 0).await;
local_indexer.flush().await;
let events = local_indexer.dump_events().await.unwrap();
let mut rank0 = events
.iter()
.filter(|event| event.worker_id == 7 && event.event.dp_rank == 0)
.collect::<Vec<_>>();
let mut rank1 = events
.iter()
.filter(|event| event.worker_id == 7 && event.event.dp_rank == 1)
.collect::<Vec<_>>();
rank0.sort_by_key(|event| event.event.event_id);
rank1.sort_by_key(|event| event.event.event_id);
assert!(rank0.is_empty());
assert_eq!(rank1.len(), 1);
assert!(matches!(
&rank1[0].event.data,
KvCacheEventData::Stored(data)
if data.blocks.first().map(|block| block.block_hash.0) == Some(202)
));
}
#[tokio::test]
async fn test_local_indexer_coalesces_concurrent_tree_dumps() {
let indexer = Arc::new(LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
5,
));
indexer.set_dump_build_delay(Some(Duration::from_millis(50)));
let first = {
let indexer = indexer.clone();
tokio::spawn(async move { indexer.get_events_in_id_range(None, None).await })
};
tokio::time::sleep(Duration::from_millis(10)).await;
let second = {
let indexer = indexer.clone();
tokio::spawn(async move { indexer.get_events_in_id_range(None, None).await })
};
let first = first.await.unwrap();
let second = second.await.unwrap();
assert!(matches!(first, WorkerKvQueryResponse::TreeDump { .. }));
assert!(matches!(second, WorkerKvQueryResponse::TreeDump { .. }));
assert_eq!(indexer.dump_build_count(), 1);
}
#[tokio::test(start_paused = true)]
async fn test_local_indexer_reuses_cached_tree_dump_without_time_expiry() {
let indexer = LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
5,
);
indexer
.apply_event_with_buffer(make_local_store_event(1, 101))
.await
.unwrap();
indexer.flush().await;
let first = indexer.get_events_in_id_range(None, None).await;
time::advance(Duration::from_secs(60)).await;
let second = indexer.get_events_in_id_range(None, None).await;
assert!(matches!(first, WorkerKvQueryResponse::TreeDump { .. }));
assert!(matches!(second, WorkerKvQueryResponse::TreeDump { .. }));
assert_eq!(indexer.dump_build_count(), 1);
}
#[tokio::test]
async fn test_local_indexer_rebuilds_when_cumulative_append_budget_exceeded() {
let indexer = LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
5,
);
indexer
.apply_event_with_buffer(make_local_store_event(1, 101))
.await
.unwrap();
indexer.flush().await;
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 1);
indexer
.apply_event_with_buffer(make_local_store_event(2, 202))
.await
.unwrap();
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 1);
indexer
.apply_event_with_buffer(make_local_store_event(3, 303))
.await
.unwrap();
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 1);
indexer
.apply_event_with_buffer(make_local_store_event(4, 404))
.await
.unwrap();
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 2);
}
#[tokio::test]
async fn test_local_indexer_appends_safe_tail_to_cached_dump() {
let indexer = LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
5,
);
indexer
.apply_event_with_buffer(make_local_store_event(1, 101))
.await
.unwrap();
indexer.flush().await;
let first = indexer.get_events_in_id_range(None, None).await;
assert!(matches!(first, WorkerKvQueryResponse::TreeDump { .. }));
assert_eq!(indexer.dump_build_count(), 1);
indexer
.apply_event_with_buffer(make_local_remove_event(2, &[101]))
.await
.unwrap();
match indexer.get_events_in_id_range(None, None).await {
WorkerKvQueryResponse::TreeDump {
events,
last_event_id,
} => {
assert_eq!(last_event_id, 2);
assert!(events.iter().any(|event| event.event.event_id == 2));
assert!(
events
.iter()
.any(|event| matches!(event.event.data, KvCacheEventData::Removed(_)))
);
}
other => panic!("Expected TreeDump, got: {other:?}"),
}
assert_eq!(indexer.dump_build_count(), 1);
}
#[tokio::test]
async fn test_local_indexer_invalidates_cache_on_clear() {
let indexer = LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
5,
);
indexer
.apply_event_with_buffer(make_local_store_event(1, 101))
.await
.unwrap();
indexer.flush().await;
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 1);
indexer
.apply_event_with_buffer(make_local_clear_event(2))
.await
.unwrap();
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 2);
}
#[tokio::test]
async fn test_local_indexer_invalidates_cache_on_event_gap() {
let indexer = LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
5,
);
indexer
.apply_event_with_buffer(make_local_store_event(1, 101))
.await
.unwrap();
indexer.flush().await;
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 1);
indexer
.apply_event_with_buffer(make_local_store_event(3, 303))
.await
.unwrap();
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 2);
}
#[tokio::test]
async fn test_local_indexer_invalidates_cache_on_missing_tail_coverage() {
let indexer = LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
1,
);
indexer
.apply_event_with_buffer(make_local_store_event(1, 101))
.await
.unwrap();
indexer.flush().await;
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 1);
indexer
.apply_event_with_buffer(make_local_store_event(2, 202))
.await
.unwrap();
indexer
.apply_event_with_buffer(make_local_store_event(3, 303))
.await
.unwrap();
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 2);
}
#[tokio::test]
async fn test_local_indexer_failed_dump_is_not_cached() {
let indexer = LocalKvIndexer::new(
CancellationToken::new(),
4,
Arc::new(KvIndexerMetrics::new_unregistered()),
5,
);
let dump_tx = indexer.snapshot_event_sender();
indexer.shutdown();
dump_tx.closed().await;
let _ = indexer.get_events_in_id_range(None, None).await;
let _ = indexer.get_events_in_id_range(None, None).await;
assert_eq!(indexer.dump_build_count(), 2);
}
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_apply_events_idempotent(variant: &str) { async fn test_apply_events_idempotent(variant: &str) {
......
...@@ -47,15 +47,23 @@ pub struct WorkerKvQueryRequest { ...@@ -47,15 +47,23 @@ pub struct WorkerKvQueryRequest {
/// Start event ID (inclusive). If `None`, dumps entire tree. /// Start event ID (inclusive). If `None`, dumps entire tree.
pub start_event_id: Option<u64>, pub start_event_id: Option<u64>,
/// End event ID (inclusive). If `None`, returns up to newest available. /// End event ID (inclusive). Used for validation and `TooNew` responses.
/// Successful buffer-backed recovery may still return through the current
/// newest buffered event.
pub end_event_id: Option<u64>, pub end_event_id: Option<u64>,
} }
/// Response from a worker's local KV indexer. /// Response from a worker's local KV indexer.
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum WorkerKvQueryResponse { pub enum WorkerKvQueryResponse {
/// Events served from the circular buffer (with original event IDs) /// Events served from the circular buffer (with original event IDs),
Events(Vec<RouterEvent>), /// always covering the requested `start_event_id` through the current
/// buffered tail. `last_event_id` is taken from the same buffer snapshot
/// and should be used as the recovery watermark after applying the batch.
Events {
events: Vec<RouterEvent>,
last_event_id: u64,
},
/// Full tree dump (with synthetic 0-indexed event IDs). /// Full tree dump (with synthetic 0-indexed event IDs).
/// Includes `last_event_id`: the newest real event ID in the worker's buffer /// Includes `last_event_id`: the newest real event ID in the worker's buffer
/// at the time of the dump, so the caller can set its tracking cursor correctly. /// at the time of the dump, so the caller can set its tracking cursor correctly.
......
...@@ -11,7 +11,8 @@ use dynamo_kv_router::{ ...@@ -11,7 +11,8 @@ use dynamo_kv_router::{
config::KvRouterConfig, config::KvRouterConfig,
indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics, KvRouterError}, indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics, KvRouterError},
protocols::{ protocols::{
LocalBlockHash, OverlapScores, RouterEvent, TokensWithHashes, WorkerId, WorkerWithDpRank, DpRank, LocalBlockHash, OverlapScores, RouterEvent, TokensWithHashes, WorkerId,
WorkerWithDpRank,
}, },
}; };
use dynamo_runtime::{component::Component, traits::DistributedRuntimeProvider}; use dynamo_runtime::{component::Component, traits::DistributedRuntimeProvider};
...@@ -207,6 +208,18 @@ impl Indexer { ...@@ -207,6 +208,18 @@ impl Indexer {
} }
} }
pub(crate) async fn remove_worker_dp_rank(&self, worker_id: WorkerId, dp_rank: DpRank) {
match self {
Self::KvIndexer(indexer) => {
KvIndexerInterface::remove_worker_dp_rank(indexer, worker_id, dp_rank).await;
}
Self::Concurrent(tpi) => {
KvIndexerInterface::remove_worker_dp_rank(tpi.as_ref(), worker_id, dp_rank).await;
}
Self::Remote(_) | Self::None => {}
}
}
pub(crate) async fn get_workers(&self) -> Vec<WorkerId> { pub(crate) async fn get_workers(&self) -> Vec<WorkerId> {
match self { match self {
Self::KvIndexer(indexer) => { Self::KvIndexer(indexer) => {
......
...@@ -352,6 +352,18 @@ impl WorkerQueryClient { ...@@ -352,6 +352,18 @@ impl WorkerQueryClient {
self.indexer.apply_event(event).await; self.indexer.apply_event(event).await;
} }
async fn apply_tree_dump_replace_locked(
&self,
worker_id: WorkerId,
dp_rank: DpRank,
events: Vec<RouterEvent>,
) {
self.indexer.remove_worker_dp_rank(worker_id, dp_rank).await;
for event in events {
self.indexer.apply_event(event).await;
}
}
pub(crate) async fn handle_live_event(self: &Arc<Self>, event: RouterEvent) { pub(crate) async fn handle_live_event(self: &Arc<Self>, event: RouterEvent) {
let worker_id = event.worker_id; let worker_id = event.worker_id;
let dp_rank = event.event.dp_rank; let dp_rank = event.event.dp_rank;
...@@ -486,7 +498,10 @@ impl WorkerQueryClient { ...@@ -486,7 +498,10 @@ impl WorkerQueryClient {
let mut saw_clear = false; let mut saw_clear = false;
match result { match result {
Ok(WorkerKvQueryResponse::Events(events)) => { Ok(WorkerKvQueryResponse::Events {
events,
last_event_id,
}) => {
tracing::debug!( tracing::debug!(
"Got {count} buffered events from worker {} dp_rank {}", "Got {count} buffered events from worker {} dp_rank {}",
key.0, key.0,
...@@ -505,6 +520,7 @@ impl WorkerQueryClient { ...@@ -505,6 +520,7 @@ impl WorkerQueryClient {
self.indexer.apply_event(event).await; self.indexer.apply_event(event).await;
new_cursor = new_cursor.advance_to(event_id); new_cursor = new_cursor.advance_to(event_id);
} }
new_cursor = new_cursor.advance_to(last_event_id);
successful_response = true; successful_response = true;
} }
Ok(WorkerKvQueryResponse::TreeDump { Ok(WorkerKvQueryResponse::TreeDump {
...@@ -518,9 +534,8 @@ impl WorkerQueryClient { ...@@ -518,9 +534,8 @@ impl WorkerQueryClient {
events.len(), events.len(),
last_event_id last_event_id
); );
for event in events { self.apply_tree_dump_replace_locked(key.0, key.1, events)
self.indexer.apply_event(event).await; .await;
}
new_cursor = new_cursor.advance_to(last_event_id); new_cursor = new_cursor.advance_to(last_event_id);
successful_response = true; successful_response = true;
} }
...@@ -889,6 +904,25 @@ mod tests { ...@@ -889,6 +904,25 @@ mod tests {
hashes hashes
} }
fn stored_block_hashes_for(
events: &[RouterEvent],
worker_id: WorkerId,
dp_rank: DpRank,
) -> Vec<u64> {
let mut hashes = events
.iter()
.filter(|event| event.worker_id == worker_id && event.event.dp_rank == dp_rank)
.filter_map(|event| match &event.event.data {
KvCacheEventData::Stored(data) => {
data.blocks.first().map(|block| block.block_hash.0)
}
_ => None,
})
.collect::<Vec<_>>();
hashes.sort_unstable();
hashes
}
async fn wait_for<F>(mut check: F) async fn wait_for<F>(mut check: F)
where where
F: FnMut() -> bool, F: FnMut() -> bool,
...@@ -958,9 +992,13 @@ mod tests { ...@@ -958,9 +992,13 @@ mod tests {
.expect("response stream should yield one item"); .expect("response stream should yield one item");
match response { match response {
WorkerKvQueryResponse::Events(events) => { WorkerKvQueryResponse::Events {
events,
last_event_id,
} => {
assert_eq!(events.len(), 1); assert_eq!(events.len(), 1);
assert_eq!(events[0].event.event_id, 1); assert_eq!(events[0].event.event_id, 1);
assert_eq!(last_event_id, 1);
} }
other => panic!("Unexpected response: {other:?}"), other => panic!("Unexpected response: {other:?}"),
} }
...@@ -1022,9 +1060,10 @@ mod tests { ...@@ -1022,9 +1060,10 @@ mod tests {
MockQueryAction { MockQueryAction {
started: Some(first_started.clone()), started: Some(first_started.clone()),
release: Some(first_release.clone()), release: Some(first_release.clone()),
response: Ok(WorkerKvQueryResponse::Events( response: Ok(WorkerKvQueryResponse::Events {
(11..=15).map(|id| make_store_event(1, 0, id)).collect(), events: (11..=15).map(|id| make_store_event(1, 0, id)).collect(),
)), last_event_id: 15,
}),
}, },
); );
transport.push_action( transport.push_action(
...@@ -1032,9 +1071,10 @@ mod tests { ...@@ -1032,9 +1071,10 @@ mod tests {
MockQueryAction { MockQueryAction {
started: None, started: None,
release: None, release: None,
response: Ok(WorkerKvQueryResponse::Events( response: Ok(WorkerKvQueryResponse::Events {
(16..=18).map(|id| make_store_event(1, 0, id)).collect(), events: (16..=18).map(|id| make_store_event(1, 0, id)).collect(),
)), last_event_id: 18,
}),
}, },
); );
...@@ -1085,10 +1125,10 @@ mod tests { ...@@ -1085,10 +1125,10 @@ mod tests {
MockQueryAction { MockQueryAction {
started: None, started: None,
release: None, release: None,
response: Ok(WorkerKvQueryResponse::Events(vec![ response: Ok(WorkerKvQueryResponse::Events {
make_store_event(1, 0, 12), events: vec![make_store_event(1, 0, 12), make_store_event(1, 0, 13)],
make_store_event(1, 0, 13), last_event_id: 13,
])), }),
}, },
); );
...@@ -1124,6 +1164,141 @@ mod tests { ...@@ -1124,6 +1164,141 @@ mod tests {
assert_eq!(stored_block_hashes(&events), vec![11, 12, 13]); assert_eq!(stored_block_hashes(&events), vec![11, 12, 13]);
} }
#[tokio::test]
async fn test_initial_restore_tree_dump_with_safe_tail_advances_cursor() {
let (client, transport, kv_indexer) = make_test_client("initial-restore-safe-tail").await;
let key = (1, 0);
transport.push_action(
key,
MockQueryAction {
started: None,
release: None,
response: Ok(WorkerKvQueryResponse::TreeDump {
events: vec![make_store_event(1, 0, 0), make_store_event(1, 0, 11)],
last_event_id: 11,
}),
},
);
client.handle_discovered_worker(1, 0).await;
wait_for(|| {
rank_state_matches(&client, key, |state| {
state.last_applied_id() == Some(11) && !state.recovery_inflight
})
})
.await;
kv_indexer.flush().await;
let events = kv_indexer.dump_events().await.unwrap();
assert_eq!(stored_block_hashes(&events), vec![0, 11]);
assert_eq!(transport.call_count(), 1);
}
#[tokio::test]
async fn test_tree_dump_replaces_stale_state_for_recovered_rank() {
let (client, transport, kv_indexer) = make_test_client("tree-dump-replaces-rank").await;
let key = (1, 0);
kv_indexer.apply_event(make_store_event(1, 0, 90)).await;
kv_indexer.apply_event(make_store_event(1, 0, 91)).await;
kv_indexer.flush().await;
transport.push_action(
key,
MockQueryAction {
started: None,
release: None,
response: Ok(WorkerKvQueryResponse::TreeDump {
events: vec![make_store_event(1, 0, 11)],
last_event_id: 11,
}),
},
);
client.handle_discovered_worker(1, 0).await;
wait_for(|| {
rank_state_matches(&client, key, |state| {
state.last_applied_id() == Some(11) && !state.recovery_inflight
})
})
.await;
kv_indexer.flush().await;
let events = kv_indexer.dump_events().await.unwrap();
assert_eq!(stored_block_hashes_for(&events, 1, 0), vec![11]);
}
#[tokio::test]
async fn test_tree_dump_recovery_does_not_clear_other_dp_ranks() {
let (client, transport, kv_indexer) = make_test_client("tree-dump-preserves-sibling").await;
let key = (1, 0);
kv_indexer.apply_event(make_store_event(1, 0, 90)).await;
kv_indexer.apply_event(make_store_event(1, 1, 77)).await;
kv_indexer.flush().await;
transport.push_action(
key,
MockQueryAction {
started: None,
release: None,
response: Ok(WorkerKvQueryResponse::TreeDump {
events: vec![make_store_event(1, 0, 11)],
last_event_id: 11,
}),
},
);
client.handle_discovered_worker(1, 0).await;
wait_for(|| {
rank_state_matches(&client, key, |state| {
state.last_applied_id() == Some(11) && !state.recovery_inflight
})
})
.await;
kv_indexer.flush().await;
let events = kv_indexer.dump_events().await.unwrap();
assert_eq!(stored_block_hashes_for(&events, 1, 0), vec![11]);
assert_eq!(stored_block_hashes_for(&events, 1, 1), vec![77]);
}
#[tokio::test]
async fn test_empty_tree_dump_clears_only_recovered_rank() {
let (client, transport, kv_indexer) = make_test_client("tree-dump-empty-clears-rank").await;
let key = (1, 0);
kv_indexer.apply_event(make_store_event(1, 0, 90)).await;
kv_indexer.apply_event(make_store_event(1, 1, 77)).await;
kv_indexer.flush().await;
transport.push_action(
key,
MockQueryAction {
started: None,
release: None,
response: Ok(WorkerKvQueryResponse::TreeDump {
events: vec![],
last_event_id: 11,
}),
},
);
client.handle_discovered_worker(1, 0).await;
wait_for(|| {
rank_state_matches(&client, key, |state| {
state.last_applied_id() == Some(11) && !state.recovery_inflight
})
})
.await;
kv_indexer.flush().await;
let events = kv_indexer.dump_events().await.unwrap();
assert!(stored_block_hashes_for(&events, 1, 0).is_empty());
assert_eq!(stored_block_hashes_for(&events, 1, 1), vec![77]);
}
#[tokio::test] #[tokio::test]
async fn test_live_event_for_other_worker_is_not_blocked_by_inflight_recovery() { async fn test_live_event_for_other_worker_is_not_blocked_by_inflight_recovery() {
let (client, transport, kv_indexer) = make_test_client("live-concurrency").await; let (client, transport, kv_indexer) = make_test_client("live-concurrency").await;
...@@ -1148,11 +1323,14 @@ mod tests { ...@@ -1148,11 +1323,14 @@ mod tests {
MockQueryAction { MockQueryAction {
started: Some(started.clone()), started: Some(started.clone()),
release: Some(release.clone()), release: Some(release.clone()),
response: Ok(WorkerKvQueryResponse::Events(vec![ response: Ok(WorkerKvQueryResponse::Events {
make_store_event(1, 0, 11), events: vec![
make_store_event(1, 0, 12), make_store_event(1, 0, 11),
make_store_event(1, 0, 13), make_store_event(1, 0, 12),
])), make_store_event(1, 0, 13),
],
last_event_id: 13,
}),
}, },
); );
...@@ -1193,10 +1371,10 @@ mod tests { ...@@ -1193,10 +1371,10 @@ mod tests {
MockQueryAction { MockQueryAction {
started: Some(started.clone()), started: Some(started.clone()),
release: Some(release.clone()), release: Some(release.clone()),
response: Ok(WorkerKvQueryResponse::Events(vec![ response: Ok(WorkerKvQueryResponse::Events {
make_store_event(1, 0, 11), events: vec![make_store_event(1, 0, 11), make_store_event(1, 0, 12)],
make_store_event(1, 0, 12), last_event_id: 12,
])), }),
}, },
); );
...@@ -1231,11 +1409,14 @@ mod tests { ...@@ -1231,11 +1409,14 @@ mod tests {
MockQueryAction { MockQueryAction {
started: Some(started.clone()), started: Some(started.clone()),
release: Some(release.clone()), release: Some(release.clone()),
response: Ok(WorkerKvQueryResponse::Events(vec![ response: Ok(WorkerKvQueryResponse::Events {
make_store_event(1, 0, 11), events: vec![
make_store_event(1, 0, 12), make_store_event(1, 0, 11),
make_store_event(1, 0, 13), make_store_event(1, 0, 12),
])), make_store_event(1, 0, 13),
],
last_event_id: 13,
}),
}, },
); );
client.handle_live_event(make_store_event(1, 0, 13)).await; client.handle_live_event(make_store_event(1, 0, 13)).await;
...@@ -1285,11 +1466,14 @@ mod tests { ...@@ -1285,11 +1466,14 @@ mod tests {
MockQueryAction { MockQueryAction {
started: None, started: None,
release: None, release: None,
response: Ok(WorkerKvQueryResponse::Events(vec![ response: Ok(WorkerKvQueryResponse::Events {
make_store_event(1, 0, 11), events: vec![
make_clear_event(1, 0, 12), make_store_event(1, 0, 11),
make_store_event(1, 0, 13), make_clear_event(1, 0, 12),
])), make_store_event(1, 0, 13),
],
last_event_id: 13,
}),
}, },
); );
......
...@@ -502,7 +502,9 @@ mod tests_startup_helpers { ...@@ -502,7 +502,9 @@ mod tests_startup_helpers {
use super::*; use super::*;
use crate::utils::zmq::{bind_pub_socket, send_multipart}; use crate::utils::zmq::{bind_pub_socket, send_multipart};
use bytes::Bytes; use bytes::Bytes;
use dynamo_kv_router::indexer::{GetWorkersRequest, KvIndexer, KvIndexerInterface}; use dynamo_kv_router::indexer::{
GetWorkersRequest, KvIndexer, KvIndexerInterface, WorkerKvQueryResponse,
};
use dynamo_kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash}; use dynamo_kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
...@@ -1152,8 +1154,12 @@ mod tests_startup_helpers { ...@@ -1152,8 +1154,12 @@ mod tests_startup_helpers {
); );
// assert: Worker's local indexer buffered event // assert: Worker's local indexer buffered event
let buffered = local_indexer_1.get_all_events_in_buffer(); match local_indexer_1.get_events_in_id_range(Some(1), None).await {
assert_eq!(buffered.len(), 1, "Local indexer should buffer 1 event"); WorkerKvQueryResponse::Events { events, .. } => {
assert_eq!(events.len(), 1, "Local indexer should buffer 1 event");
}
other => panic!("Expected buffered events, got {other:?}"),
}
// === STEP 2 & 3: Simulate Outage - Stop forwarding to router === // === STEP 2 & 3: Simulate Outage - Stop forwarding to router ===
let event_2 = KvCacheEvent { let event_2 = KvCacheEvent {
...@@ -1192,12 +1198,16 @@ mod tests_startup_helpers { ...@@ -1192,12 +1198,16 @@ mod tests_startup_helpers {
} }
// assert: Worker's local indexer has both events // assert: Worker's local indexer has both events
let buffered = local_indexer_1.get_all_events_in_buffer(); match local_indexer_1.get_events_in_id_range(Some(1), None).await {
assert_eq!( WorkerKvQueryResponse::Events { events, .. } => {
buffered.len(), assert_eq!(
2, events.len(),
"Local indexer should have both events during outage" 2,
); "Local indexer should have both events during outage"
);
}
other => panic!("Expected buffered events, got {other:?}"),
}
// assert: Router DOESN'T have event_2 // assert: Router DOESN'T have event_2
let block_hashes_2 = vec![LocalBlockHash(200), LocalBlockHash(202)]; let block_hashes_2 = vec![LocalBlockHash(200), LocalBlockHash(202)];
...@@ -1223,7 +1233,7 @@ mod tests_startup_helpers { ...@@ -1223,7 +1233,7 @@ mod tests_startup_helpers {
.get_events_in_id_range(Some(last_known_id + 1), None) .get_events_in_id_range(Some(last_known_id + 1), None)
.await; .await;
let missed_events = match response { let missed_events = match response {
dynamo_kv_router::indexer::WorkerKvQueryResponse::Events(e) => e, dynamo_kv_router::indexer::WorkerKvQueryResponse::Events { events: e, .. } => e,
dynamo_kv_router::indexer::WorkerKvQueryResponse::TreeDump { events: e, .. } => e, dynamo_kv_router::indexer::WorkerKvQueryResponse::TreeDump { events: e, .. } => e,
dynamo_kv_router::indexer::WorkerKvQueryResponse::Error(message) => { dynamo_kv_router::indexer::WorkerKvQueryResponse::Error(message) => {
panic!("Unexpected error response: {message}") panic!("Unexpected error response: {message}")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment