"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "50cb927050ff929e5ace52c28b6bd2762c58c0d4"
Unverified Commit 55c6525f authored by jthomson04's avatar jthomson04 Committed by GitHub
Browse files

feat: Improved offload queueing and block eviction ordering (#1425)

parent a1aea900
...@@ -181,7 +181,17 @@ impl<S: Storage, M: BlockMetadata> Block<S, M> { ...@@ -181,7 +181,17 @@ impl<S: Storage, M: BlockMetadata> Block<S, M> {
BlockState::Complete(state) => Ok(state.token_block().sequence_hash()), BlockState::Complete(state) => Ok(state.token_block().sequence_hash()),
BlockState::Registered(state, _) => Ok(state.sequence_hash()), BlockState::Registered(state, _) => Ok(state.sequence_hash()),
_ => Err(BlockError::InvalidState( _ => Err(BlockError::InvalidState(
"Block is not complete".to_string(), "Block is not complete nor registered.".to_string(),
)),
}
}
pub fn parent_sequence_hash(&self) -> Result<Option<SequenceHash>, BlockError> {
match self.state() {
BlockState::Complete(state) => Ok(state.token_block().parent_sequence_hash()),
BlockState::Registered(state, _) => Ok(state.parent_sequence_hash()),
_ => Err(BlockError::InvalidState(
"Block is not complete nor registered.".to_string(),
)), )),
} }
} }
...@@ -605,6 +615,9 @@ pub(crate) fn layout_to_blocks<S: Storage, M: BlockMetadata>( ...@@ -605,6 +615,9 @@ pub(crate) fn layout_to_blocks<S: Storage, M: BlockMetadata>(
pub struct MutableBlock<S: Storage, M: BlockMetadata> { pub struct MutableBlock<S: Storage, M: BlockMetadata> {
block: Option<Block<S, M>>, block: Option<Block<S, M>>,
return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, M>>, return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, M>>,
// Use to track parent relationship, as well as ensure that parents of registered blocks stay
// alive as long as the child is alive.
parent: Option<Arc<MutableBlock<S, M>>>,
} }
impl<S: Storage + NixlDescriptor, M: BlockMetadata> WritableBlock for MutableBlock<S, M> { impl<S: Storage + NixlDescriptor, M: BlockMetadata> WritableBlock for MutableBlock<S, M> {
...@@ -626,8 +639,13 @@ impl<S: Storage, M: BlockMetadata> MutableBlock<S, M> { ...@@ -626,8 +639,13 @@ impl<S: Storage, M: BlockMetadata> MutableBlock<S, M> {
Self { Self {
block: Some(block), block: Some(block),
return_tx, return_tx,
parent: None,
} }
} }
pub fn set_parent(&mut self, parent: Arc<MutableBlock<S, M>>) {
self.parent = Some(parent);
}
} }
impl<S: Storage, M: BlockMetadata> std::fmt::Debug for MutableBlock<S, M> { impl<S: Storage, M: BlockMetadata> std::fmt::Debug for MutableBlock<S, M> {
...@@ -769,7 +787,7 @@ impl<S: Storage, M: BlockMetadata> ImmutableBlock<S, M> { ...@@ -769,7 +787,7 @@ impl<S: Storage, M: BlockMetadata> ImmutableBlock<S, M> {
Self { block } Self { block }
} }
pub fn mutable_block(&self) -> &Arc<MutableBlock<S, M>> { pub(crate) fn mutable_block(&self) -> &Arc<MutableBlock<S, M>> {
&self.block &self.block
} }
} }
......
...@@ -309,15 +309,18 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -309,15 +309,18 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
} }
} }
let target_blocks = match target_pool.allocate_blocks(1).await { let target_block = 'target_block: {
Ok(blocks) => blocks, if let Ok(blocks) = target_pool.allocate_blocks(1).await {
Err(_) => { if let Some(block) = blocks.into_iter().next() {
tracing::warn!("Target pool full. Skipping offload. This should only ever happen with very small pool sizes."); break 'target_block Some(block);
continue; }
} }
tracing::warn!("Target pool full. Skipping offload. This should only ever happen with very small pool sizes.");
None
}; };
if let Some(target_block) = target_blocks.into_iter().next() { if let Some(target_block) = target_block {
pool_metrics.counter("offload_processed").inc(); pool_metrics.counter("offload_processed").inc();
transfer_manager transfer_manager
.enqueue_transfer(PendingTransfer::new( .enqueue_transfer(PendingTransfer::new(
...@@ -533,7 +536,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -533,7 +536,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
} }
#[cfg(all(test, feature = "testing-cuda"))] #[cfg(all(test, feature = "testing-cuda"))]
mod tests { pub mod tests {
use super::*; use super::*;
use crate::block_manager::block::test_utils::get_private_token; use crate::block_manager::block::test_utils::get_private_token;
...@@ -550,6 +553,7 @@ mod tests { ...@@ -550,6 +553,7 @@ mod tests {
}, },
DType, LayoutConfig, DType, LayoutConfig,
}; };
use crate::tokens::{TokenBlockSequence, Tokens};
use nixl_sys::{MemoryRegion, NixlDescriptor}; use nixl_sys::{MemoryRegion, NixlDescriptor};
use aligned_vec::avec; use aligned_vec::avec;
...@@ -580,7 +584,7 @@ mod tests { ...@@ -580,7 +584,7 @@ mod tests {
}; };
} }
fn build_pools( pub fn build_pools(
device_blocks: usize, device_blocks: usize,
host_blocks: Option<usize>, host_blocks: Option<usize>,
disk_blocks: Option<usize>, disk_blocks: Option<usize>,
...@@ -1387,4 +1391,122 @@ mod tests { ...@@ -1387,4 +1391,122 @@ mod tests {
Ok(()) Ok(())
} }
#[tokio::test]
async fn test_offload_evict_order() -> Result<()> {
let (offload_manager, device_pool, host_pool, _) = build_pools(4, Some(4), None, None)?;
let device_pool = device_pool.as_ref().unwrap();
let host_pool = host_pool.as_ref().unwrap();
let tokens = vec![0_u32; BLOCK_SIZE * 4];
let token_blocks = TokenBlockSequence::new(Tokens::from(tokens), 4, None);
assert_eq!(token_blocks.blocks().len(), 4);
let mut mutable_blocks = Vec::new();
let mut sequence_hashes = Vec::new();
for token_block in token_blocks.blocks() {
let mut mutable_block = device_pool
.allocate_blocks(1)
.await?
.into_iter()
.next()
.unwrap();
mutable_block.apply_token_block(token_block.clone())?;
sequence_hashes.push(mutable_block.sequence_hash()?);
mutable_blocks.push(mutable_block);
}
let immutable_blocks = device_pool.register_blocks(mutable_blocks).await?;
for block in &immutable_blocks {
offload_manager.offload(block, 0).await?;
}
// Wait for offloads.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Allocate 2 blocks on the host.
let _host_blocks = host_pool.allocate_blocks(2).await?;
// Check the existing blocks.
assert_eq!(
host_pool
.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
2
);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let _host_blocks2 = host_pool.allocate_blocks(1).await?;
// Now there should only be the first block on host.
assert_eq!(
host_pool
.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
1
);
Ok(())
}
#[tokio::test]
async fn test_onboard_evict_order() -> Result<()> {
let (offload_manager, device_pool, host_pool, _) = build_pools(4, Some(4), None, None)?;
let device_pool = device_pool.as_ref().unwrap();
let host_pool = host_pool.as_ref().unwrap();
let tokens = vec![0_u32; BLOCK_SIZE * 4];
let token_blocks = TokenBlockSequence::new(Tokens::from(tokens), 4, None);
assert_eq!(token_blocks.blocks().len(), 4);
let mut mutable_blocks = Vec::new();
let mut sequence_hashes = Vec::new();
for token_block in token_blocks.blocks() {
let mut block = host_pool
.allocate_blocks(1)
.await?
.into_iter()
.next()
.unwrap();
block.apply_token_block(token_block.clone())?;
sequence_hashes.push(block.sequence_hash()?);
mutable_blocks.push(block);
}
let immutable_blocks = host_pool.register_blocks(mutable_blocks).await?;
let _ = offload_manager.onboard(immutable_blocks).await?;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let _device_blocks = device_pool.allocate_blocks(2).await?;
assert_eq!(
device_pool
.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
2
);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let _device_blocks2 = device_pool.allocate_blocks(1).await?;
assert_eq!(
device_pool
.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
1
);
Ok(())
}
} }
...@@ -501,12 +501,16 @@ struct ProgressEngine<S: Storage, M: BlockMetadata> { ...@@ -501,12 +501,16 @@ struct ProgressEngine<S: Storage, M: BlockMetadata> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::block_manager::block::BlockExt;
use super::super::block::{BasicMetadata, Blocks}; use super::super::block::{BasicMetadata, Blocks};
use super::super::layout::tests::setup_layout; use super::super::layout::{tests::setup_layout, FullyContiguous, LayoutConfig};
use super::*; use super::*;
use crate::block_manager::block::BlockExt;
use crate::block_manager::DType;
use crate::tokens::{TokenBlockSequence, Tokens};
use crate::block_manager::storage::tests::{NullDeviceAllocator, NullDeviceStorage};
/// Helper method to build a [`BlockPool`] with a [`ProgressEngine`] for unit testing /// Helper method to build a [`BlockPool`] with a [`ProgressEngine`] for unit testing
impl<S: Storage, M: BlockMetadata> BlockPoolArgsBuilder<S, M> { impl<S: Storage, M: BlockMetadata> BlockPoolArgsBuilder<S, M> {
fn build_with_progress_engine( fn build_with_progress_engine(
...@@ -673,4 +677,303 @@ mod tests { ...@@ -673,4 +677,303 @@ mod tests {
assert_eq!(matched.len(), 1); assert_eq!(matched.len(), 1);
assert_eq!(matched[0].sequence_hash().unwrap(), sequence_hash); assert_eq!(matched[0].sequence_hash().unwrap(), sequence_hash);
} }
async fn create_blocks<S: Storage, M: BlockMetadata>(
pool: &BlockPool<S, M>,
num_blocks: usize,
) -> anyhow::Result<(Vec<ImmutableBlock<S, M>>, Vec<SequenceHash>)> {
let tokens = vec![0; num_blocks * 4];
let token_blocks = TokenBlockSequence::new(Tokens::from(tokens), 4, None);
assert_eq!(token_blocks.blocks().len(), num_blocks);
let mut sequence_hashes = Vec::new();
let mut mutable_blocks = Vec::new();
for token_block in token_blocks.blocks().iter() {
let mut block = pool.allocate_blocks(1).await?.pop().unwrap();
block.apply_token_block(token_block.clone())?;
sequence_hashes.push(block.sequence_hash().unwrap());
mutable_blocks.push(block);
}
let immutable_blocks = pool.register_blocks(mutable_blocks).await?;
Ok((immutable_blocks, sequence_hashes))
}
async fn make_simple_pool(
num_blocks: usize,
) -> anyhow::Result<BlockPool<NullDeviceStorage, BasicMetadata>> {
let config = LayoutConfig {
num_blocks,
num_layers: 1,
outer_dim: 1,
page_size: 4,
inner_dim: 1024,
alignment: 1,
dtype: DType::FP16,
};
let layout = FullyContiguous::<NullDeviceStorage>::allocate(config, &NullDeviceAllocator)?;
let blocks = Blocks::<_, BasicMetadata>::new(layout, 42, 0)?.into_blocks()?;
let pool = BlockPool::builder().blocks(blocks).build()?;
Ok(pool)
}
/// A test that ensures that we only ever evict leaves from the inactive pool.
#[tokio::test]
async fn test_block_pool_evict_leaves() -> anyhow::Result<()> {
let pool = make_simple_pool(4).await?;
let (_, sequence_hashes) = create_blocks(&pool, 4).await?;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Allocate 1 block. This should evict the leaf of our allocated sequence.
pool.allocate_blocks(1).await?;
// The leaf should be evicted, so we should have 3 matches.
let matched = pool
.match_sequence_hashes(sequence_hashes.as_slice())
.await?;
assert_eq!(matched.len(), 3);
drop(matched);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Allocate 2 blocks. This should get the previously allocated block, as well as one more leaf.
pool.allocate_blocks(2).await.unwrap();
// The next leaf should be evicted, so we should have 2 matches.
let matched = pool
.match_sequence_hashes(sequence_hashes.as_slice())
.await?;
assert_eq!(matched.len(), 2);
drop(matched);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// If we allocate all the blocks, the entire remaining sequence should be evicted.
let blocks = pool.allocate_blocks(4).await?;
assert_eq!(blocks.len(), 4);
Ok(())
}
/// When a block has two children, we need to ensure that we evict both children before
/// adding the parent to the leaf set.
#[tokio::test]
async fn test_block_pool_parent_child() -> anyhow::Result<()> {
let pool = make_simple_pool(3).await?;
let tokens = vec![1, 2, 3, 4, 5];
let sequence = TokenBlockSequence::new(Tokens::from(tokens.clone()), 4, None);
// Create a root block, with two child blocks.
let mut root_block = pool.allocate_blocks(1).await?.pop().unwrap();
root_block.apply_token_block(sequence.blocks().first().unwrap().clone())?;
let root_block_hash = root_block.sequence_hash().unwrap();
let mut child_blocks = Vec::new();
let mut child_block_hashes = Vec::new();
for i in 0..2 {
// Create a new token sequence using the common prefix.
let mut tokens = tokens.clone();
for _ in 0..4 {
tokens.push(i);
}
let seq = TokenBlockSequence::new(Tokens::from(tokens), 4, None);
// Allocate and apply the suffix to the child block.
let mut child_block = pool.allocate_blocks(1).await?.pop().unwrap();
child_block.apply_token_block(seq.blocks()[1].clone())?;
child_block_hashes.push(child_block.sequence_hash().unwrap());
child_blocks.push(child_block);
}
// Register the children first. This can happen with offloading.
let child_blocks = pool.register_blocks(child_blocks).await?;
// After the children are registered, we can register the root block.
let root_block = pool.register_blocks(vec![root_block]).await?;
// Drop both of them.
drop(root_block);
drop(child_blocks);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Allocate two new blocks, which should evict both children.
pool.allocate_blocks(2).await?;
// Now, the root block should be the only block left.
for child_block_hash in child_block_hashes {
let matched = pool.match_sequence_hashes(&[child_block_hash]).await?;
assert_eq!(matched.len(), 0);
}
// Check that the root block remains.
let matched = pool.match_sequence_hashes(&[root_block_hash]).await?;
assert_eq!(matched.len(), 1);
Ok(())
}
/// When offloading, it's possible that the tail of a sequence in a pool is evicted before
/// the entire sequence can be offloaded. This can happen in the following case:
///
/// Assume a sequence of 4 blocks: [0, 1, 2, 3]
/// 1. Blocks 0, 1, and 2 are offloaded to host memory.
/// 2. Block 2 is evicted from the host.
/// 3. Block 3 is offloaded to host memory.
/// Now, the contents of the cache are [0, 1] and [3].
/// We need to treat these as two separate sequences.
#[tokio::test]
async fn test_block_pool_fragmentation() -> anyhow::Result<()> {
let pool = make_simple_pool(4).await?;
let tokens = vec![0; 16];
let token_blocks = TokenBlockSequence::new(Tokens::from(tokens), 4, None);
assert_eq!(token_blocks.blocks().len(), 4);
let mut sequence_hashes = Vec::new();
// Allocate and register the first 3 blocks.
for block in token_blocks.blocks()[..3].iter() {
let mut mutable_block = pool.allocate_blocks(1).await?.pop().unwrap();
mutable_block.apply_token_block(block.clone())?;
sequence_hashes.push(mutable_block.sequence_hash()?);
let _ = pool.register_blocks(vec![mutable_block]).await?;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Allocate 2 blocks. This should take the remaining uninitialized block as well as the
// tail of the currently registered sequence.
let _ = pool.allocate_blocks(2).await?;
assert_eq!(
pool.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
2
);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Allocate 1 more block for the leaf of the sequence.
let mut mutable_block = pool.allocate_blocks(1).await?.into_iter().next().unwrap();
mutable_block.apply_token_block(token_blocks.blocks()[3].clone())?;
let _ = pool.register_blocks(vec![mutable_block]).await?;
// We should still only match the first 2 blocks, since the 3rd block has been evicted.
assert_eq!(
pool.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
2
);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Now, we should be able to allocate all 4 blocks.
let _ = pool.allocate_blocks(4).await?;
Ok(())
}
/// Matching an entire sequence (moving it to the active pool), and returning it
/// should not affect the parent-child relationships of the blocks.
#[tokio::test]
async fn test_block_pool_match_return() -> anyhow::Result<()> {
let pool = make_simple_pool(4).await?;
let (_, sequence_hashes) = create_blocks(&pool, 4).await?;
// We match the root of the sequence (moving it to the active pool), then
// immediately return it.
assert_eq!(
pool.match_sequence_hashes(vec![sequence_hashes[0]].as_slice())
.await?
.len(),
1
);
let _alloc_blocks1 = pool.allocate_blocks(3).await?;
// Allocating 3 blocks should evict all but the root of the sequence.
assert_eq!(
pool.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
1
);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let _alloc_blocks2 = pool.allocate_blocks(1).await?;
// Now, allocating one more block should evict the root.
assert_eq!(
pool.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
0
);
Ok(())
}
/// When we move a suffix of a sequence to the active pool (like what happens when onboarding),
/// then return it to the inactive pool, we need to ensure that the parent-child relationships
/// are still correct, and that the temporary leaf in the inactive pool can't be evicted.
#[tokio::test]
async fn test_block_pool_match_partial() -> anyhow::Result<()> {
let pool = make_simple_pool(4).await?;
let (_, sequence_hashes) = create_blocks(&pool, 4).await?;
// Assert that all 4 blocks are in the pool.
assert_eq!(
pool.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
4
);
// Now, we match only the last 2 blocks
let matched_suffix = pool.match_sequence_hashes(&sequence_hashes[2..]).await?;
assert_eq!(matched_suffix.len(), 2);
// This allocation should fail. Although there are 2 inactive blocks, the leaf is in the active pool.
let new_alloc_block = pool.allocate_blocks(1).await?;
assert_eq!(new_alloc_block.len(), 0);
// Now, drop the leaf, and return it to the inactive pool.
drop(matched_suffix);
// All 4 blocks should still be in the pool.
assert_eq!(
pool.match_sequence_hashes(sequence_hashes.as_slice())
.await?
.len(),
4
);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(())
}
} }
...@@ -29,7 +29,7 @@ impl<S: Storage, M: BlockMetadata> ActiveBlockPool<S, M> { ...@@ -29,7 +29,7 @@ impl<S: Storage, M: BlockMetadata> ActiveBlockPool<S, M> {
pub fn register( pub fn register(
&mut self, &mut self,
block: MutableBlock<S, M>, mut block: MutableBlock<S, M>,
) -> Result<ImmutableBlock<S, M>, BlockPoolError> { ) -> Result<ImmutableBlock<S, M>, BlockPoolError> {
if !block.state().is_registered() { if !block.state().is_registered() {
return Err(BlockPoolError::InvalidMutableBlock( return Err(BlockPoolError::InvalidMutableBlock(
...@@ -41,6 +41,14 @@ impl<S: Storage, M: BlockMetadata> ActiveBlockPool<S, M> { ...@@ -41,6 +41,14 @@ impl<S: Storage, M: BlockMetadata> ActiveBlockPool<S, M> {
BlockPoolError::InvalidMutableBlock("block has no sequence hash".to_string()) BlockPoolError::InvalidMutableBlock("block has no sequence hash".to_string())
})?; })?;
// Set the parent of the block if it has one.
// This is needed to ensure the lifetime of the parent is at least as long as the child.
if let Ok(Some(parent)) = block.parent_sequence_hash() {
if let Some(parent_block) = self.match_sequence_hash(parent) {
block.set_parent(parent_block.mutable_block().clone());
}
}
let shared = Arc::new(block); let shared = Arc::new(block);
match self.map.entry(sequence_hash) { match self.map.entry(sequence_hash) {
......
...@@ -16,15 +16,20 @@ ...@@ -16,15 +16,20 @@
use crate::block_manager::block::BlockState; use crate::block_manager::block::BlockState;
use super::*; use super::*;
use std::collections::HashSet;
use tracing::instrument; use tracing::instrument;
#[derive(Default)] #[derive(Default)]
pub struct InactiveBlockPool<S: Storage, M: BlockMetadata> { pub struct InactiveBlockPool<S: Storage, M: BlockMetadata> {
// Direct lookup by sequence_hash // Direct lookup by sequence_hash.
lookup_map: HashMap<SequenceHash, Block<S, M>>, lookup_map: HashMap<SequenceHash, Block<S, M>>,
// Ordered by timestamp (oldest first) // A priority ordering for the leaf nodes.
priority_set: BTreeSet<PriorityKey<M>>, // Leaf nodes are defined as blocks that have no children in the inactive pool.
leaf_set: BTreeSet<PriorityKey<M>>,
// Mapping from parents to their children.
parent_children: HashMap<SequenceHash, HashSet<SequenceHash>>,
// Fully Uninitialized // Fully Uninitialized
uninitialized_set: VecDeque<Block<S, M>>, uninitialized_set: VecDeque<Block<S, M>>,
...@@ -45,7 +50,8 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> { ...@@ -45,7 +50,8 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> {
pub(crate) fn new() -> Self { pub(crate) fn new() -> Self {
Self { Self {
lookup_map: HashMap::new(), lookup_map: HashMap::new(),
priority_set: BTreeSet::new(), leaf_set: BTreeSet::new(),
parent_children: HashMap::new(),
uninitialized_set: VecDeque::new(), uninitialized_set: VecDeque::new(),
return_tick: 0, return_tick: 0,
total_blocks: 0, total_blocks: 0,
...@@ -75,12 +81,11 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> { ...@@ -75,12 +81,11 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> {
/// Inserts a block into the pool using its sequence hash for potential reuse. /// Inserts a block into the pool using its sequence hash for potential reuse.
/// ///
/// If an entry with the same priority key already exists in the [`priority_set`],
/// the block is reset and moved to the [`uninitialized_set`].
/// If an entry with the same sequence hash already exists in the [`lookup_map`] /// If an entry with the same sequence hash already exists in the [`lookup_map`]
/// (but not the priority set - indicating an inconsistency), the block is reset /// the block is reset and moved to the [`uninitialized_set`].
/// and moved to the [`uninitialized_set`]. /// Otherwise, the block is added to the [`lookup_map`].
/// Otherwise, the block is added to both the [`lookup_map`] and the [`priority_set`]. /// If there are no children of the block, it is added to the [`leaf_set`].
/// If the parent of the block is in the [`leaf_set`], it is removed from the [`leaf_set`].
/// ///
/// # Arguments /// # Arguments
/// ///
...@@ -89,22 +94,35 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> { ...@@ -89,22 +94,35 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> {
#[instrument(level = "trace", skip(self, block), fields(sequence_hash = ?sequence_hash))] #[instrument(level = "trace", skip(self, block), fields(sequence_hash = ?sequence_hash))]
fn insert_with_sequence_hash(&mut self, block: Block<S, M>, sequence_hash: SequenceHash) { fn insert_with_sequence_hash(&mut self, block: Block<S, M>, sequence_hash: SequenceHash) {
let priority_key = PriorityKey::new(block.metadata().clone(), sequence_hash); let priority_key = PriorityKey::new(block.metadata().clone(), sequence_hash);
if self.priority_set.contains(&priority_key) { if self.lookup_map.contains_key(&sequence_hash) {
tracing::trace!("multiple entries with the same priority key, resetting block and inserting into uninitialized set"); tracing::trace!("multiple entries with the same sequence hash, resetting block and inserting into uninitialized set");
let mut block = block; let mut block = block;
block.reset(); block.reset();
self.uninitialized_set.push_back(block); self.uninitialized_set.push_back(block);
} else if let std::collections::hash_map::Entry::Vacant(e) =
self.lookup_map.entry(sequence_hash)
{
tracing::trace!("inserting block to map and priority set");
self.priority_set.insert(priority_key);
e.insert(block);
} else { } else {
tracing::trace!("multiple entries in lookup map with the same sequence hash, inserting into uninitialized set"); tracing::trace!("inserting block to map and priority set");
let mut block = block;
block.reset(); if let Ok(Some(parent)) = block.parent_sequence_hash() {
self.uninitialized_set.push_back(block); // Add the entry for the parent->child link.
self.parent_children
.entry(parent)
.or_default()
.insert(sequence_hash);
// If the parent is currently in the inactive pool, remove it from the leaf set.
if let Some(parent_block) = self.lookup_map.get_mut(&parent) {
self.leaf_set
.remove(&PriorityKey::new(parent_block.metadata().clone(), parent));
}
}
// Create the entry for the block in the lookup map.
self.lookup_map.insert(sequence_hash, block);
// If the block has no children, it is a leaf.
if !self.parent_children.contains_key(&sequence_hash) {
self.leaf_set.insert(priority_key);
}
} }
} }
...@@ -206,7 +224,7 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> { ...@@ -206,7 +224,7 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> {
/// Returns multiple blocks to the pool. /// Returns multiple blocks to the pool.
/// ///
/// Iterates through the blocks in reverse order (tail to head) and calls /// Iterates through the blocks in order and calls
/// `return_block` for each one. /// `return_block` for each one.
/// ///
/// # Arguments /// # Arguments
...@@ -217,7 +235,7 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> { ...@@ -217,7 +235,7 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> {
let count = blocks.len(); let count = blocks.len();
tracing::debug!(count, "Returning blocks to pool"); tracing::debug!(count, "Returning blocks to pool");
// return the block to the pool from tail to head // return the block to the pool from tail to head
for (i, block) in blocks.into_iter().rev().enumerate() { for (i, block) in blocks.into_iter().enumerate() {
tracing::trace!(current = i + 1, total = count, "Returning block"); tracing::trace!(current = i + 1, total = count, "Returning block");
// Note: return_block has its own instrumentation // Note: return_block has its own instrumentation
self.return_block(block); self.return_block(block);
...@@ -225,7 +243,7 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> { ...@@ -225,7 +243,7 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> {
} }
/// Attempts to remove and return a block associated with the given sequence hash /// Attempts to remove and return a block associated with the given sequence hash
/// from the [`lookup_map`] and [`priority_set`]. /// from the [`lookup_map`] and [`leaf_set`].
/// ///
/// # Arguments /// # Arguments
/// ///
...@@ -238,9 +256,10 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> { ...@@ -238,9 +256,10 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> {
fn take_with_sequence_hash(&mut self, sequence_hash: SequenceHash) -> Option<Block<S, M>> { fn take_with_sequence_hash(&mut self, sequence_hash: SequenceHash) -> Option<Block<S, M>> {
match self.lookup_map.remove(&sequence_hash) { match self.lookup_map.remove(&sequence_hash) {
Some(block) => { Some(block) => {
// Remove from priority set // Remove from leaf set, if it exists.
let priority_key = PriorityKey::new(block.metadata().clone(), sequence_hash); self.leaf_set
self.priority_set.remove(&priority_key); .remove(&PriorityKey::new(block.metadata().clone(), sequence_hash));
Some(block) Some(block)
} }
None => None, None => None,
...@@ -366,12 +385,39 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> { ...@@ -366,12 +385,39 @@ impl<S: Storage, M: BlockMetadata> InactiveBlockPool<S, M> {
return Some(block); return Some(block);
} }
// if we have blocks in the priority set, pop the first (it's sorted by priority) // if we have blocks in the leaf set, pop the first (it's sorted by priority)
// a fatal error will occur if the block is not found in the lookup map // a fatal error will occur if the block is not found in the lookup map
if let Some(key) = self.priority_set.pop_first() { if let Some(key) = self.leaf_set.pop_first() {
tracing::trace!("Acquired priority/registered block map; resetting block"); tracing::trace!("Acquired priority/registered block map; resetting block");
match self.lookup_map.remove(&key.sequence_hash()) { match self.lookup_map.remove(&key.sequence_hash()) {
Some(mut block) => { Some(mut block) => {
if let Some(children) = self.parent_children.get(&key.sequence_hash()) {
panic!(
"Block has {} inactive children, but should have none.",
children.len()
);
}
if let Ok(Some(parent)) = block.parent_sequence_hash() {
let is_leaf = match self.parent_children.get_mut(&parent) {
Some(children) => {
children.remove(&key.sequence_hash());
children.is_empty()
}
None => true,
};
if is_leaf {
self.parent_children.remove(&parent);
if let Some(parent_block) = self.lookup_map.get(&parent) {
self.leaf_set.insert(PriorityKey::new(
parent_block.metadata().clone(),
parent,
));
}
}
}
block.reset(); block.reset();
self.return_tick += 1; self.return_tick += 1;
block.metadata_on_acquired(self.return_tick); block.metadata_on_acquired(self.return_tick);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment