Unverified commit 4f2cbec0, authored by Vladislav Nosivskoy and committed by GitHub
Browse files

fix: guard KV-indexer against self-referential blocks (#4395)


Signed-off-by: Vladislav Nosivskoy <vladnosiv@gmail.com>
Signed-off-by: PeaBrane <yanrpei@gmail.com>
Co-authored-by: PeaBrane <yanrpei@gmail.com>
parent c65f1ac0
...@@ -81,6 +81,9 @@ pub enum KvCacheEventError { ...@@ -81,6 +81,9 @@ pub enum KvCacheEventError {
#[error("Failed to find block")] #[error("Failed to find block")]
BlockNotFound, BlockNotFound,
#[error("Invalid block sequence")]
InvalidBlockSequence,
} }
/// A shared reference to a [`RadixBlock`]. /// A shared reference to a [`RadixBlock`].
...@@ -238,8 +241,6 @@ pub struct RadixTree { ...@@ -238,8 +241,6 @@ pub struct RadixTree {
lookup: HashMap<WorkerWithDpRank, HashMap<ExternalSequenceBlockHash, SharedRadixBlock>>, lookup: HashMap<WorkerWithDpRank, HashMap<ExternalSequenceBlockHash, SharedRadixBlock>>,
/// The time buffer the radix tree should check when considering frequence of block accesses /// The time buffer the radix tree should check when considering frequence of block accesses
expiration_duration: Option<Duration>, expiration_duration: Option<Duration>,
/// The tree current size.
current_size: usize,
} }
impl Default for RadixTree { impl Default for RadixTree {
...@@ -259,7 +260,6 @@ impl RadixTree { ...@@ -259,7 +260,6 @@ impl RadixTree {
root: Rc::new(RefCell::new(RadixBlock::new())), root: Rc::new(RefCell::new(RadixBlock::new())),
lookup: HashMap::new(), lookup: HashMap::new(),
expiration_duration, expiration_duration,
current_size: 0,
} }
} }
...@@ -361,11 +361,7 @@ impl RadixTree { ...@@ -361,11 +361,7 @@ impl RadixTree {
// we check the radix tree's root to find it. // we check the radix tree's root to find it.
// this is the single most expensive lookup // this is the single most expensive lookup
let current = match op.parent_hash { let current = match op.parent_hash {
Some(parent) => worker_lookup.get(&parent), Some(parent) => match worker_lookup.get(&parent) {
None => Some(&self.root),
};
let mut current = match current {
Some(current) => current.clone(), Some(current) => current.clone(),
None => { None => {
tracing::warn!( tracing::warn!(
...@@ -378,46 +374,72 @@ impl RadixTree { ...@@ -378,46 +374,72 @@ impl RadixTree {
); );
return Err(KvCacheEventError::ParentBlockNotFound); return Err(KvCacheEventError::ParentBlockNotFound);
} }
},
None => self.root.clone(),
}; };
for block_id in op.blocks { fn process_blocks(
let mut inner = current.borrow_mut(); parent: SharedRadixBlock,
let block = match inner.children.get(&block_id.tokens_hash) { blocks: &[KvCacheStoredBlockData],
worker: WorkerWithDpRank,
worker_lookup: &mut HashMap<ExternalSequenceBlockHash, SharedRadixBlock>,
id: u64,
) -> Result<(), KvCacheEventError> {
if blocks.is_empty() {
return Ok(());
}
let mut parent_mut = parent.borrow_mut();
let block_data = &blocks[0];
let child = match parent_mut.children.get(&block_data.tokens_hash) {
Some(block) => block.clone(), Some(block) => block.clone(),
None => { None => {
// create new block - automatically added to the lookup table // create new block - automatically added to the lookup table
let new_block = worker_lookup let new_block = worker_lookup
.get(&block_id.block_hash) .get(&block_data.block_hash)
.cloned() .cloned()
.unwrap_or_else(|| Rc::new(RefCell::new(RadixBlock::new()))); .unwrap_or_else(|| Rc::new(RefCell::new(RadixBlock::new())));
// insert into radix tree // insert into radix tree
inner parent_mut
.children .children
.insert(block_id.tokens_hash, new_block.clone()); .insert(block_data.tokens_hash, new_block.clone());
// increment the current size when creating a new block
self.current_size = self.current_size.saturating_add(1);
new_block new_block
} }
}; };
// Update child and check for cycles
{
// Try to borrow the child mutably - if it fails, it's already borrowed
// in the ancestor chain (parent_mut is alive + all ancestors in recursive stack)
let mut child_mut = match child.try_borrow_mut() {
Ok(b) => b,
Err(_) => {
tracing::warn!(
worker_id = worker.worker_id.to_string(),
dp_rank = worker.dp_rank,
id,
block_hash = ?block_data.block_hash,
"Detected cycle in store event (block already in parent chain); rejecting sequence"
);
return Err(KvCacheEventError::InvalidBlockSequence);
}
};
// add our worker to the block with its external hash // add our worker to the block with its external hash
block child_mut.workers.insert(worker, block_data.block_hash);
.borrow_mut() }
.workers
.insert(worker, block_id.block_hash);
// add the block to the worker_id lookup table // add the block to the worker_id lookup table
worker_lookup.insert(block_id.block_hash, block.clone()); worker_lookup.insert(block_data.block_hash, child.clone());
// drop inner so we can shift current to this block
drop(inner);
current = block; // Recurse with the child and remaining blocks
process_blocks(child, &blocks[1..], worker, worker_lookup, id)
} }
Ok(())
process_blocks(current, &op.blocks, worker, worker_lookup, id)
} }
KvCacheEventData::Removed(remove) => { KvCacheEventData::Removed(remove) => {
// tracing::trace!(id, "KV Remove Operation: {:?}", op); // tracing::trace!(id, "KV Remove Operation: {:?}", op);
...@@ -447,9 +469,6 @@ impl RadixTree { ...@@ -447,9 +469,6 @@ impl RadixTree {
if guard.workers.is_empty() { if guard.workers.is_empty() {
// if no workers are using this block, that is true for all children // if no workers are using this block, that is true for all children
guard.children.clear(); guard.children.clear();
// Decrement the current size when removing the last worker from a node
self.current_size = self.current_size.saturating_sub(1);
} }
// remove the block from the lookup table // remove the block from the lookup table
worker_lookup.remove(&block); worker_lookup.remove(&block);
...@@ -482,9 +501,6 @@ impl RadixTree { ...@@ -482,9 +501,6 @@ impl RadixTree {
// If no workers are using this block, that is true for all children // If no workers are using this block, that is true for all children
if block.borrow().workers.is_empty() { if block.borrow().workers.is_empty() {
block.borrow_mut().children.clear(); block.borrow_mut().children.clear();
// Decrement the current size when removing the last worker from a node
self.current_size = self.current_size.saturating_sub(1);
} }
}); });
...@@ -587,7 +603,7 @@ impl RadixTree { ...@@ -587,7 +603,7 @@ impl RadixTree {
} }
pub fn current_size(&self) -> usize { pub fn current_size(&self) -> usize {
self.current_size self.lookup.values().map(|m| m.len()).sum()
} }
} }
...@@ -602,6 +618,7 @@ pub struct KvIndexerMetrics { ...@@ -602,6 +618,7 @@ pub struct KvIndexerMetrics {
pub const METRIC_STATUS_OK: &str = "ok"; pub const METRIC_STATUS_OK: &str = "ok";
pub const METRIC_STATUS_PARENT_NOT_FOUND: &str = "parent_block_not_found"; pub const METRIC_STATUS_PARENT_NOT_FOUND: &str = "parent_block_not_found";
pub const METRIC_STATUS_BLOCK_NOT_FOUND: &str = "block_not_found"; pub const METRIC_STATUS_BLOCK_NOT_FOUND: &str = "block_not_found";
pub const METRIC_STATUS_INVALID_BLOCK: &str = "invalid_block";
/// Metric event labels. /// Metric event labels.
pub const METRIC_EVENT_STORED: &str = "stored"; pub const METRIC_EVENT_STORED: &str = "stored";
...@@ -674,6 +691,7 @@ impl KvIndexerMetrics { ...@@ -674,6 +691,7 @@ impl KvIndexerMetrics {
let error_label = match e { let error_label = match e {
KvCacheEventError::ParentBlockNotFound => METRIC_STATUS_PARENT_NOT_FOUND, KvCacheEventError::ParentBlockNotFound => METRIC_STATUS_PARENT_NOT_FOUND,
KvCacheEventError::BlockNotFound => METRIC_STATUS_BLOCK_NOT_FOUND, KvCacheEventError::BlockNotFound => METRIC_STATUS_BLOCK_NOT_FOUND,
KvCacheEventError::InvalidBlockSequence => METRIC_STATUS_INVALID_BLOCK,
}; };
self.kv_cache_events_applied self.kv_cache_events_applied
.with_label_values(&[event_type, error_label]) .with_label_values(&[event_type, error_label])
...@@ -1732,6 +1750,34 @@ mod tests { ...@@ -1732,6 +1750,34 @@ mod tests {
result.unwrap_err(), result.unwrap_err(),
KvCacheEventError::BlockNotFound KvCacheEventError::BlockNotFound
)); ));
// Parent appears in blocks: parent=1, blocks=[1, 2, 3]
// This should be rejected as block 1 (hash 100) is the parent
trie.apply_event(create_store_event(worker_0, 4, vec![1], None))
.unwrap();
let result = trie.apply_event(create_store_event(
worker_0,
5,
vec![1, 2, 3],
Some(ExternalSequenceBlockHash(100)),
));
assert!(matches!(
result.unwrap_err(),
KvCacheEventError::InvalidBlockSequence
));
// Block appears twice in sequence: parent=1, blocks=[2, 3, 2]
// Block 2 appears at positions 0 and 2, creating a cycle
let result = trie.apply_event(create_store_event(
worker_0,
6,
vec![2, 3, 2],
Some(ExternalSequenceBlockHash(100)),
));
assert!(matches!(
result.unwrap_err(),
KvCacheEventError::InvalidBlockSequence
));
} }
#[test] #[test]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment