Unverified Commit d5befaab authored by blarson-b10's avatar blarson-b10 Committed by GitHub
Browse files

fix: remove store block recursion in favor of loop + don't try to detect loops (#5497)


Signed-off-by: default avatarBrian Larson <brian.larson@baseten.co>
parent 050906b5
......@@ -322,7 +322,7 @@ impl RadixTree {
// find the parent block - if the parent exists it must be on our worker, if not,
// we check the radix tree's root to find it.
// this is the single most expensive lookup
let current = match op.parent_hash {
let mut current = match op.parent_hash {
Some(parent) => match worker_lookup.get(&parent) {
Some(current) => current.clone(),
None => {
......@@ -340,20 +340,8 @@ impl RadixTree {
None => self.root.clone(),
};
fn process_blocks(
parent: SharedRadixBlock,
blocks: &[KvCacheStoredBlockData],
worker: WorkerWithDpRank,
worker_lookup: &mut HashMap<ExternalSequenceBlockHash, SharedRadixBlock>,
id: u64,
) -> Result<(), KvCacheEventError> {
if blocks.is_empty() {
return Ok(());
}
let mut parent_mut = parent.borrow_mut();
let block_data = &blocks[0];
for block_data in op.blocks {
let mut parent_mut = current.borrow_mut();
let child = match parent_mut.children.get(&block_data.tokens_hash) {
Some(block) => block.clone(),
None => {
......@@ -372,10 +360,10 @@ impl RadixTree {
}
};
// Update child and check for cycles
// Update child and check for self referential blocks
{
// Try to borrow the child mutably - if it fails, it's already borrowed
// in the ancestor chain (parent_mut is alive + all ancestors in recursive stack)
// which means a self referencing block.
let mut child_mut = match child.try_borrow_mut() {
Ok(b) => b,
Err(_) => {
......@@ -384,7 +372,7 @@ impl RadixTree {
dp_rank = worker.dp_rank,
id,
block_hash = ?block_data.block_hash,
"Detected cycle in store event (block already in parent chain); rejecting sequence"
"Detected self referencing block in store event; rejecting sequence"
);
return Err(KvCacheEventError::InvalidBlockSequence);
}
......@@ -397,16 +385,18 @@ impl RadixTree {
// add the block to the worker_id lookup table
worker_lookup.insert(block_data.block_hash, child.clone());
// Recurse with the child and remaining blocks
process_blocks(child, &blocks[1..], worker, worker_lookup, id)
}
// drop child so we can shift current to this block
drop(parent_mut);
process_blocks(current, &op.blocks, worker, worker_lookup, id)
current = child;
}
Ok(())
}
KvCacheEventData::Removed(remove) => {
// tracing::trace!(id, "KV Remove Operation: {:?}", op);
// let mut worker_lookup = self.lookup.get(&worker_id).expect("Worker not found");
let mut kv_cache_err: Option<KvCacheEventError> = None;
for block in remove.block_hashes {
// entry in radix tree
// a small optimization would be to get the next block from the reduced set of children
......@@ -422,7 +412,13 @@ impl RadixTree {
block_hash = ?block,
"Failed to find block to remove; skipping remove operation"
);
return Err(KvCacheEventError::BlockNotFound);
// Kv cache removed events may be batched; we should try to apply all
// operations in the batch before returning an error. Return the first
// error.
if kv_cache_err.is_none() {
kv_cache_err = Some(KvCacheEventError::BlockNotFound);
}
continue;
}
};
......@@ -435,7 +431,11 @@ impl RadixTree {
// remove the block from the lookup table
worker_lookup.remove(&block);
}
Ok(())
if let Some(err) = kv_cache_err {
Err(err)
} else {
Ok(())
}
}
KvCacheEventData::Cleared => {
self.clear_all_blocks(worker.worker_id);
......@@ -2382,7 +2382,8 @@ mod tests {
));
// Parent appears in blocks: parent=1, blocks=[1, 2, 3]
// This should be rejected as block 1 (hash 100) is the parent
// This should be rejected as block 1 (hash 100) is the parent - this is
// a self referencing block.
trie.apply_event(create_store_event(worker_0, 4, vec![1], None))
.unwrap();
let result = trie.apply_event(create_store_event(
......@@ -2395,19 +2396,6 @@ mod tests {
result.unwrap_err(),
KvCacheEventError::InvalidBlockSequence
));
// Block appears twice in sequence: parent=1, blocks=[2, 3, 2]
// Block 2 appears at positions 0 and 2, creating a cycle
let result = trie.apply_event(create_store_event(
worker_0,
6,
vec![2, 3, 2],
Some(ExternalSequenceBlockHash(100)),
));
assert!(matches!(
result.unwrap_err(),
KvCacheEventError::InvalidBlockSequence
));
}
#[test]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment