// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use dynamo_llm::block_manager::{DiskStorage, PinnedStorage}; use super::*; #[allow(dead_code)] pub enum SlotPosition { /// The current position in the sequence representing all tokens that have been computed. Computed, /// The number of tokens that were ini Prefill, /// If the compute position is less than the prefill position, this will be the Prefill position; /// otherwise, it will be the Computed position All, } pub struct Slot { /// Current position in the sequence of tokens that have been computed. /// When the slot is initialized, we populate the sequence with the prefill tokens. /// However, those tokens are not yet prefilled, so they are not yet represented /// in the sequence_position. computed_position: usize, /// The number of tokens that were initially prefilled. prefill_position: usize, /// The sequence of token blocks sequence: TokenBlockSequence, /// The immutable blocks immutable: Vec>, /// The mutable blocks mutable: VecDeque>, /// Blocks to be onboarded from the host /// We must hold these blocks in the slot state until the scheduler trigger the onboarding. onboard_from_host: Option>>, /// Blocks to be onboarded from the disk /// We must hold these blocks in the slot state until the scheduler trigger the onboarding. onboard_from_disk: Option>>, /// The number of blocks cached from the device blocks_cached_from_device: usize, /// The number of blocks cached from the host blocks_cached_from_host: usize, /// The number of blocks cached from the disk blocks_cached_from_disk: usize, } impl std::fmt::Debug for Slot { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let immutable_block_ids = self .immutable .iter() .map(|b| b.block_id()) .collect::>(); let mutable_block_ids = self .mutable .iter() .map(|b| b.block_id()) .collect::>(); write!( f, "Slot(computed_position: {}, prefill_position: {}, immutable_block_ids: {:?}, mutable_block_ids: {:?})", self.computed_position, self.prefill_position, immutable_block_ids, mutable_block_ids ) } } impl Slot { /// Creates a new slot. pub fn new(tokens: Tokens, block_size: usize, salt_hash: SaltHash) -> Self { let sequence = TokenBlockSequence::new(tokens, block_size as u32, Some(salt_hash)); let prefill_position = sequence.total_tokens(); Self { computed_position: 0, prefill_position, sequence, immutable: Vec::new(), mutable: VecDeque::new(), onboard_from_host: None, onboard_from_disk: None, blocks_cached_from_device: 0, blocks_cached_from_host: 0, blocks_cached_from_disk: 0, } } pub fn first_allocation(&self) -> bool { self.immutable.is_empty() && self.mutable.is_empty() } /// Updates the sequence with the given tokens. /// These tokens will advance the computed sequence position. #[tracing::instrument(level = "debug", skip(block_pool))] pub fn apply_computed_tokens( &mut self, mut tokens_to_append: Vec, block_pool: &dyn BlockPool, ) -> Result<(), SlotError> { if tokens_to_append.is_empty() { return Ok(()); } // Check that we have sufficient capacity in mutable blocks for the tokens let available_capacity = self.mutable.len() * self.sequence.block_size() - (self.computed_position % self.sequence.block_size()); if tokens_to_append.len() > available_capacity { return Err(SlotError::from_str(&format!( "Insufficient capacity: need {} tokens but only {} available in mutable blocks", tokens_to_append.len(), available_capacity ))); } // if we are still prefilling, we don't extend the sequence, but verify the tokens match what is already present. if self.computed_position < self.prefill_position { // In chunked prefill, vLLM may combine the final prefill chunk with some decode tokens. // We need to split off the decode tokens and apply them below. let remaining_decode_tokens = if self.computed_position + tokens_to_append.len() > self.sequence.total_tokens() { tokens_to_append.split_off(self.sequence.total_tokens() - self.computed_position) } else { vec![] }; debug_assert_eq!( self.sequence .tokens_at( self.computed_position..self.computed_position + tokens_to_append.len() ) .as_ref(), &tokens_to_append, "tokens to apply do not match the sequence tokens" ); self.computed_position += tokens_to_append.len(); tracing::debug!( "applying {} prefill tokens; new computed_position: {}", tokens_to_append.len(), self.computed_position ); tokens_to_append = remaining_decode_tokens; } if !tokens_to_append.is_empty() { // if we are not prefilling, we extend the sequence and advance the sequence position. // first advance the sequence, then the position -- this covers the case where the extend fails. let count = tokens_to_append.len(); self.sequence .extend(tokens_to_append.into()) .map_err(|e| SlotError::from_str(&format!("failed to extend sequence: {:?}", e)))?; self.computed_position += count; tracing::debug!( "applied {} tokens; new computed_position: {}", count, self.computed_position ); } // determine if we need to register any blocks // if the number of blocks for the computed position is greater than the number of immutable blocks, // then we have to transition one or more of the mutable blocks to immutable. let num_blocks_to_register = (self.computed_position / self.sequence.block_size()) - self.immutable.len(); debug_assert!(num_blocks_to_register <= self.mutable.len()); if num_blocks_to_register == 0 { tracing::trace!("no blocks to register"); return Ok(()); } let mut blocks_to_register = Vec::new(); tracing::trace!("registering {} blocks", num_blocks_to_register); assert!(self.mutable.len() >= num_blocks_to_register); // create an iterator over the mutable blocks zipped with the token blocks let zipped_blocks = self .mutable .drain(0..num_blocks_to_register) .zip(self.sequence.blocks().iter().skip(self.immutable.len())); // apply the token blocks to the mutable blocks for (mut mutable_block, token_block) in zipped_blocks { mutable_block .state_mut() .apply_token_block(token_block.clone()) .map_err(|e| { SlotError::from_str(&format!("failed to apply token block: {:?}", e)) })?; blocks_to_register.push(mutable_block); } assert_eq!(blocks_to_register.len(), num_blocks_to_register); // register the mutable blocks and extend the slot's immutable blocks let immutable_blocks = block_pool .register_blocks_blocking(blocks_to_register) .map_err(|e| SlotError::from_str(&format!("failed to register blocks: {:?}", e)))?; assert_eq!(immutable_blocks.len(), num_blocks_to_register); tracing::debug!("registered {:?}", immutable_blocks); tracing::debug!("new computed_position: {}", self.computed_position); self.immutable.extend(immutable_blocks); Ok(()) } /// Initialize the slot with the device matched blocks. /// /// Note: This should only be called one time before when we first load the initial /// device matches to the sequence. This method will validate the mutable blocks are /// empty and clear the immutable blocks; we clear the immutable blocks because vLLM /// can try to apply this multiple times if the slot was unable acquire blocks for the /// remainder of the sequence. #[tracing::instrument(level = "debug")] pub fn initialize_with_device_matches( &mut self, computed_blocks: Vec>, ) -> Result<(), SlotError> { assert!(self.mutable.is_empty()); self.blocks_cached_from_device = computed_blocks.len(); self.immutable.clear(); self.apply_immutable_blocks(computed_blocks) } /// Apply immutable blocks to the slot. /// /// Note: The current compute position must match the number of tokens held by the immutable blocks. fn apply_immutable_blocks( &mut self, computed_blocks: Vec>, ) -> Result<(), SlotError> { debug_assert_eq!( self.computed_position % self.sequence.block_size(), 0, "not on a block boundary" ); debug_assert_eq!( self.computed_position / self.sequence.block_size(), self.immutable.len(), "number of computed blocks does not match the number of immutable blocks in the sequence" ); // the expected number of immutable blocks after applying the computed blocks let count = computed_blocks.len(); let expected_immutable_count = self.immutable.len() + computed_blocks.len(); // create an iterator over the mutable blocks zipped with the token blocks let zipped_blocks = self .sequence .blocks() .iter() .skip(self.immutable.len()) .zip(computed_blocks); // validate the sequence hashes of the incoming immutable computed blocks // against the sequence hashes of blocks in the sequence. for (sequence_block, computed_block) in zipped_blocks { if sequence_block.sequence_hash() != computed_block.sequence_hash() { return Err(SlotError::from_str("computed block sequence hash mismatch")); } self.computed_position += sequence_block.block_size(); self.immutable.push(computed_block); } assert_eq!( self.immutable.len(), expected_immutable_count, "did not apply the expected number of immutable blocks; expected: {}, actual: {}", expected_immutable_count, self.immutable.len() ); tracing::debug!( "applied {} immutable blocks; computed sequence position: {}", count, self.computed_position ); Ok(()) } /// Allocates space for the given number of new tokens. /// /// Returns None if unable to allocate new blocks, /// otherwise returns the block ids of the new blocks. /// /// An empty vector is returned if no new blocks are required. #[tracing::instrument(level = "debug", skip(block_pool), ret)] pub fn allocate_blocks( &mut self, num_new_tokens: usize, block_pool: &dyn BlockPool, ) -> Option> { let total_num_blocks = (self.computed_position + num_new_tokens).div_ceil(self.sequence.block_size()); let num_new_blocks = total_num_blocks - (self.immutable.len() + self.mutable.len()); if num_new_blocks == 0 { return Some(Vec::new()); } let new_blocks = block_pool.allocate_blocks_blocking(num_new_blocks).ok(); match new_blocks { Some(new_blocks) => { let block_ids = new_blocks.iter().map(|b| b.block_id()).collect(); self.mutable.extend(new_blocks); Some(block_ids) } None => None, } } /// Frees the blocks in the slot. /// This will return the blocks in reverse order so that the tail blocks are evicted first. #[tracing::instrument(level = "debug")] pub fn free_blocks(&mut self) { self.mutable.clear(); let mut immutable_blocks = std::mem::take(&mut self.immutable); immutable_blocks.reverse(); self.computed_position = 0; } /// Returns the block ids for the slot. /// We return in order the immutable blocks, then the mutable blocks. pub fn get_block_ids(&self) -> Vec { let mut block_ids = Vec::new(); block_ids.extend(self.immutable.iter().map(|b| b.block_id())); block_ids.extend(self.mutable.iter().map(|b| b.block_id())); block_ids } /// Number of tokens in the requested position. pub fn num_tokens(&self, position: SlotPosition) -> usize { match position { SlotPosition::Computed => self.computed_position, SlotPosition::Prefill => self.prefill_position, SlotPosition::All => self.sequence.total_tokens(), } } /// Sequence hashes for the requested position. pub fn sequence_hashes(&self, position: SlotPosition) -> Vec { match position { SlotPosition::Computed => { debug_assert!(self.computed_position <= self.sequence.total_tokens()); self.sequence.blocks()[0..self.computed_position] .iter() .map(|b| b.sequence_hash()) .collect() } SlotPosition::Prefill => { assert!(self.prefill_position <= self.sequence.total_tokens()); self.sequence.blocks()[0..self.prefill_position] .iter() .map(|b| b.sequence_hash()) .collect() } SlotPosition::All => self .sequence .blocks() .iter() .map(|b| b.sequence_hash()) .collect(), } } pub fn num_blocks_cached_from_device(&self) -> usize { self.blocks_cached_from_device } pub fn num_blocks_cached_from_host(&self) -> usize { self.blocks_cached_from_host } pub fn num_blocks_cached_from_disk(&self) -> usize { self.blocks_cached_from_disk } } impl Slot { #[tracing::instrument(level = "debug", skip(self, block_manager), ret)] pub fn trigger_onboard( &mut self, block_manager: &dynamo_llm::block_manager::KvBlockManager, ) -> Result<(), SlotError> { if self.onboard_from_host.is_none() && self.onboard_from_disk.is_none() { return Ok(()); } if let Some(host_blocks) = self.onboard_from_host.take() { self.blocks_cached_from_host = host_blocks.len(); self.onboard_blocks_to_slot(host_blocks, block_manager)?; } if let Some(disk_blocks) = self.onboard_from_disk.take() { self.blocks_cached_from_disk = disk_blocks.len(); self.onboard_blocks_to_slot(disk_blocks, block_manager)?; } tracing::debug!("onboarded blocks to slot {:?}", self); Ok(()) } #[tracing::instrument(level = "debug", skip(self, bm), ret)] pub fn onboard_blocks_to_slot( &mut self, offloaded_blocks: Vec>, bm: &dynamo_llm::block_manager::KvBlockManager, ) -> Result<(), SlotError> { if offloaded_blocks.len() > self.mutable.len() { return Err(SlotError::from_str( "insufficient mutable blocks to onboard", )); } let target_device_blocks = self.mutable.drain(0..offloaded_blocks.len()).collect(); let immutable_device_blocks = bm .onboard_blocks(offloaded_blocks, Some(target_device_blocks)) .blocking_recv() .unwrap() .map_err(|e| SlotError::from_str(&format!("failed to onboard blocks: {:?}", e)))?; self.apply_immutable_blocks(immutable_device_blocks)?; Ok(()) } pub fn store_onboard_blocks( &mut self, host_blocks: Vec>, disk_blocks: Vec>, ) { self.onboard_from_host = Some(host_blocks); self.onboard_from_disk = Some(disk_blocks); } } impl Drop for Slot { fn drop(&mut self) { self.free_blocks(); } } #[cfg(test)] mod tests { use super::*; use dynamo_llm::block_manager::{ block::locality::Local, block::{BasicMetadata, Blocks}, pool::{BlockPool, ManagedBlockPool}, storage::tests::{NullDeviceAllocator, NullDeviceStorage}, }; use dynamo_llm::tokens::{SaltHash, Tokens}; use std::sync::Arc; const BLOCK_SIZE: usize = 4; const SALT_HASH: SaltHash = 12345; // Test fixture providing a pre-configured block pool for testing struct TestFixture { pool: Arc>, _runtime: tokio::runtime::Runtime, } impl TestFixture { fn new() -> Self { use dynamo_llm::block_manager::layout::{FullyContiguous, LayoutConfig}; let config = LayoutConfig { num_blocks: 10, num_layers: 2, outer_dim: 1, page_size: BLOCK_SIZE, inner_dim: 128, alignment: 1, dtype_width_bytes: 2, }; let layout = FullyContiguous::allocate(config, &NullDeviceAllocator).unwrap(); let blocks = Blocks::<_, BasicMetadata>::new(layout, 42, 0) .unwrap() .into_blocks() .unwrap(); let runtime = tokio::runtime::Runtime::new().unwrap(); let pool = Arc::new( ManagedBlockPool::builder() .blocks(blocks) .async_runtime(runtime.handle().clone()) .build() .unwrap(), ); Self { pool, _runtime: runtime, } } } // Helper function to create a slot with a given token sequence fn create_slot_with_tokens(tokens: Vec) -> Slot { let token_sequence = Tokens::from(tokens); Slot::new(token_sequence, BLOCK_SIZE, SALT_HASH) } // Helper function to allocate blocks for a slot // Note: We allocate extra capacity to work around debug assertion issues fn allocate_blocks_for_slot( slot: &mut Slot, num_tokens: usize, pool: &dyn BlockPool, ) -> Option> { slot.allocate_blocks(num_tokens, pool) } // Phase 1: Foundation Test - Basic slot creation and state #[test] fn test_slot_creation_and_basic_state() { let initial_tokens = vec![1, 2, 3, 4]; let slot = create_slot_with_tokens(initial_tokens.clone()); // Verify initial state assert_eq!(slot.num_tokens(SlotPosition::Prefill), initial_tokens.len()); assert_eq!(slot.num_tokens(SlotPosition::Computed), 0); assert_eq!(slot.num_tokens(SlotPosition::All), initial_tokens.len()); // Verify slot starts with no blocks allocated assert_eq!(slot.get_block_ids().len(), 0); } // Phase 2: Edge Cases - Empty token application #[test] fn test_empty_token_application() { let fixture = TestFixture::new(); let initial_tokens = vec![1, 2, 3, 4]; let mut slot = create_slot_with_tokens(initial_tokens.clone()); // Allocate blocks for initial tokens let allocated_blocks = allocate_blocks_for_slot(&mut slot, initial_tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); assert_eq!(slot.mutable.len(), allocated_blocks.unwrap().len()); // Apply empty token list - should succeed and not change state let result = slot.apply_computed_tokens(vec![], fixture.pool.as_ref()); assert!( result.is_ok(), "Empty token application failed: {:?}", result.err() ); // State should remain unchanged assert_eq!(slot.num_tokens(SlotPosition::Computed), 0); assert_eq!(slot.num_tokens(SlotPosition::All), initial_tokens.len()); } // Phase 2: Edge Cases - Single token sequence prefill #[test] fn test_single_token_sequence() { let fixture = TestFixture::new(); let initial_tokens = vec![42]; let mut slot = create_slot_with_tokens(initial_tokens.clone()); // Verify initial state assert_eq!(slot.num_tokens(SlotPosition::Prefill), 1); assert_eq!(slot.num_tokens(SlotPosition::Computed), 0); assert_eq!(slot.num_tokens(SlotPosition::All), 1); // Allocate blocks and apply the single token let allocated_blocks = allocate_blocks_for_slot(&mut slot, initial_tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); assert_eq!(slot.mutable.len(), 1); let result = slot.apply_computed_tokens(initial_tokens, fixture.pool.as_ref()); assert!( result.is_ok(), "Single token prefill failed: {:?}", result.err() ); // After prefill, computed should match prefill assert_eq!(slot.num_tokens(SlotPosition::Computed), 1); assert_eq!(slot.num_tokens(SlotPosition::All), 1); // Single token doesn't fill the entire block (block_size=4), so it remains mutable assert_eq!( slot.mutable.len(), 1, "Single token should keep block as mutable" ); assert_eq!( slot.immutable.len(), 0, "Single token should not register any immutable blocks" ); } // Phase 3: Core Operations - Block allocation with chunked prefill #[test] fn test_block_allocation_chunked_prefill() { let fixture = TestFixture::new(); let initial_tokens = vec![1, 2, 3, 4, 5, 6, 7, 8]; // Exactly 2 blocks (BLOCK_SIZE = 4) let mut slot = create_slot_with_tokens(initial_tokens.clone()); // Initially no blocks allocated assert_eq!(slot.get_block_ids().len(), 0); // Allocate blocks for initial tokens (will include extra capacity) let allocated_blocks = allocate_blocks_for_slot(&mut slot, initial_tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); let block_ids = allocated_blocks.unwrap(); // We expect at least 2 blocks (may be more due to extra capacity) assert!( block_ids.len() >= 2, "Expected at least 2 blocks for 8 tokens, got {}", block_ids.len() ); // Verify blocks are allocated in the slot assert!(slot.get_block_ids().len() >= 2); // Complete prefill token by token to work around assertion bug for (i, token) in initial_tokens.iter().enumerate() { let result = slot.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok(), "Token {} failed: {:?}", i, result.err()); assert_eq!(slot.num_tokens(SlotPosition::Computed), i + 1); } // Verify final state assert_eq!(slot.num_tokens(SlotPosition::Computed), 8); assert_eq!(slot.num_tokens(SlotPosition::All), 8); // 8 tokens = 2 full blocks (block_size=4), all should be registered as immutable assert_eq!( slot.mutable.len(), 0, "All blocks should be registered as immutable" ); assert_eq!( slot.immutable.len(), 2, "Should have 2 immutable blocks for 8 tokens" ); } // Phase 4: Standard Workflows - Standard decode after prefill #[test] fn test_standard_decode_flow() { let fixture = TestFixture::new(); let initial_tokens = vec![1, 2, 3, 4]; let mut slot = create_slot_with_tokens(initial_tokens.clone()); // Complete prefill first let allocated_blocks = allocate_blocks_for_slot(&mut slot, initial_tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); let result = slot.apply_computed_tokens(initial_tokens.clone(), fixture.pool.as_ref()); assert!(result.is_ok(), "Prefill failed: {:?}", result.err()); // Verify prefill completed assert_eq!(slot.num_tokens(SlotPosition::Computed), 4); assert_eq!(slot.num_tokens(SlotPosition::Prefill), 4); assert_eq!(slot.num_tokens(SlotPosition::All), 4); assert_eq!(slot.mutable.len(), 0); assert_eq!(slot.immutable.len(), 1); // Now we're in decode mode - add new tokens one at a time for i in 0..5 { println!("=== Decode Pass {} ===", i); let decode_token = 100 + i as u32; // Use distinct tokens for decode // Allocate space for the new token let allocated_blocks = allocate_blocks_for_slot(&mut slot, 1, fixture.pool.as_ref()); assert!( allocated_blocks.is_some(), "Failed to allocate block for decode token {}", i ); assert_eq!(slot.mutable.len(), 1); // Apply the decode token let result = slot.apply_computed_tokens(vec![decode_token], fixture.pool.as_ref()); assert!( result.is_ok(), "Decode token {} failed: {:?}", i, result.err() ); // Verify state after each decode token let expected_total = initial_tokens.len() + i + 1; assert_eq!(slot.num_tokens(SlotPosition::Computed), expected_total); assert_eq!(slot.num_tokens(SlotPosition::All), expected_total); // Prefill count should remain unchanged assert_eq!(slot.num_tokens(SlotPosition::Prefill), 4); if expected_total.is_multiple_of(BLOCK_SIZE) { assert_eq!(slot.mutable.len(), 0); assert_eq!(slot.immutable.len(), expected_total / BLOCK_SIZE); } else { assert_eq!(slot.mutable.len(), 1); assert_eq!(slot.immutable.len(), expected_total / BLOCK_SIZE); } } // Final verification assert_eq!(slot.num_tokens(SlotPosition::Computed), 9); assert_eq!(slot.num_tokens(SlotPosition::All), 9); assert_eq!(slot.num_tokens(SlotPosition::Prefill), 4); assert_eq!(slot.mutable.len(), 1); assert_eq!(slot.immutable.len(), 2); } // Debug Assertion Bug Analysis - demonstrates the issue #[test] fn test_assertion_bug_analysis() { let fixture = TestFixture::new(); let initial_tokens = vec![1, 2]; // Small sequence let mut slot = create_slot_with_tokens(initial_tokens.clone()); // Allocate exactly what we need WITHOUT extra capacity let total_needed_blocks = initial_tokens.len().div_ceil(BLOCK_SIZE); let exact_allocation = fixture .pool .allocate_blocks_blocking(total_needed_blocks) .unwrap(); slot.mutable.extend(exact_allocation); println!("=== Debug Assertion Bug Analysis ==="); println!("tokens_to_append.len(): {}", initial_tokens.len()); println!("total_needed_blocks: {}", total_needed_blocks); println!("computed_position: {}", slot.computed_position); println!("block_size: {}", BLOCK_SIZE); println!("mutable.len(): {}", slot.mutable.len()); let remaining_in_block = slot.computed_position % BLOCK_SIZE; let assertion_rhs = remaining_in_block + slot.mutable.len(); println!("computed_position % block_size: {}", remaining_in_block); println!( "Broken assertion RHS: {} + {} = {}", remaining_in_block, slot.mutable.len(), assertion_rhs ); println!( "Assertion: {} < {} = {}", initial_tokens.len(), assertion_rhs, initial_tokens.len() < assertion_rhs ); let actual_capacity = slot.mutable.len() * BLOCK_SIZE; println!( "Actual token capacity: {} blocks × {} = {}", slot.mutable.len(), BLOCK_SIZE, actual_capacity ); println!( "Should succeed: {} <= {} = {}", initial_tokens.len(), actual_capacity, initial_tokens.len() <= actual_capacity ); // This would fail with the broken assertion, but logically should succeed // since we have enough actual capacity // Apply tokens one-by-one to avoid the assertion bug for (i, token) in initial_tokens.iter().enumerate() { let result = slot.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok(), "Token {} failed: {:?}", i, result.err()); } assert_eq!(slot.num_tokens(SlotPosition::Computed), 2); } // Phase 5: Block Caching Lifecycle - Cache miss → registration → cache hit #[test] fn test_block_caching_lifecycle() { let fixture = TestFixture::new(); let tokens = vec![1, 2, 3, 4, 5, 6, 7, 8]; // 2 full blocks let salt_hash = SALT_HASH; // === FIRST PASS: Cache Miss → Block Registration === let mut slot1 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt_hash); // Allocate blocks for first slot let allocated_blocks = allocate_blocks_for_slot(&mut slot1, tokens.len(), fixture.pool.as_ref()); assert!( allocated_blocks.is_some(), "Failed to allocate blocks for first slot" ); // Apply tokens token-by-token (work around assertion bug) for (i, token) in tokens.iter().enumerate() { let result = slot1.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!( result.is_ok(), "Token {} failed in first slot: {:?}", i, result.err() ); } // Verify first slot state assert_eq!(slot1.num_tokens(SlotPosition::Computed), 8); assert_eq!(slot1.num_tokens(SlotPosition::All), 8); // Capture sequence hashes and immutable blocks from first slot let sequence_hashes = slot1.sequence_hashes(SlotPosition::All); let first_slot_blocks = slot1.get_block_ids(); println!("=== First Pass (Cache Miss) ==="); println!("Sequence hashes: {:?}", sequence_hashes); println!("Block IDs: {:?}", first_slot_blocks); println!("Immutable blocks count: {}", slot1.immutable.len()); // At this point, blocks should be registered in the pool's cache // The immutable blocks contain the computed token data // Free the first slot (returns blocks to pool for reuse) drop(slot1); // === SECOND PASS: Cache Hit → Block Reuse === let mut slot2 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt_hash); // Verify that second slot has same sequence hashes let slot2_hashes = slot2.sequence_hashes(SlotPosition::All); assert_eq!( sequence_hashes, slot2_hashes, "Sequence hashes should match for same tokens/salt" ); // Now we do the REAL cache lookup - equivalent to get_computed_blocks() println!("=== Second Pass (Cache Hit) ==="); println!("Looking up sequence hashes: {:?}", sequence_hashes); // This is the actual cache lookup mechanism used by get_computed_blocks() let cached_blocks = fixture .pool .match_sequence_hashes_blocking(&sequence_hashes) .expect("Cache lookup failed"); println!("Cache hit! Found {} cached blocks", cached_blocks.len()); // Apply the cached blocks (this is the real cache hit path) let result = slot2.initialize_with_device_matches(cached_blocks); assert!(result.is_ok(), "Cache hit failed: {:?}", result.err()); // Verify second slot state matches first slot assert_eq!(slot2.num_tokens(SlotPosition::Computed), 8); assert_eq!(slot2.num_tokens(SlotPosition::All), 8); assert_eq!(slot2.sequence_hashes(SlotPosition::All), sequence_hashes); // Verify that we achieved the same result with cache hit vs cache miss println!("=== Verification ==="); println!("First slot final state: {} tokens", 8); println!( "Second slot final state: {} tokens", slot2.num_tokens(SlotPosition::All) ); println!("Cache hit successful: both slots have identical state"); // Key insight: apply_computed_blocks() is much faster than apply_computed_tokens() // because it skips token validation and block registration } // ============================================================================ // PHASE 3: BLOCK ID SHARING VALIDATION TESTS - The Critical Phase // ============================================================================ #[test] fn test_block_id_sharing_between_identical_slots() { let fixture = TestFixture::new(); let tokens = vec![1, 2, 3, 4, 5, 6, 7, 8]; // 2 full blocks let salt = SALT_HASH; let chunk_size = 2; // Chunked prefill size println!("=== Block ID Sharing Test (Chunked Prefill) ==="); // FIRST SLOT: Cache miss → chunked prefill → block registration let mut slot1 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt); // Process tokens in chunks with proper allocation pattern for (pass, chunk) in tokens.chunks(chunk_size).enumerate() { println!("Pass {}: Processing chunk {:?}", pass + 1, chunk); // Allocate blocks for this chunk let allocated_blocks = slot1.allocate_blocks(chunk_size, fixture.pool.as_ref()); println!(" Allocated blocks: {:?}", allocated_blocks); // Apply the chunk let result = slot1.apply_computed_tokens(chunk.to_vec(), fixture.pool.as_ref()); assert!( result.is_ok(), "Pass {} failed: {:?}", pass + 1, result.err() ); let computed_tokens = slot1.num_tokens(SlotPosition::Computed); let mutable_count = slot1.mutable.len(); let immutable_count = slot1.immutable.len(); println!( " After pass {}: computed={}, mutable={}, immutable={}", pass + 1, computed_tokens, mutable_count, immutable_count ); // Assert expected block counts for chunked prefill pattern match pass + 1 { 1 => { // Pass 1: First chunk (2 tokens) - block allocated but not full assert_eq!(computed_tokens, 2, "Pass 1: Should have 2 computed tokens"); assert_eq!( mutable_count, 1, "Pass 1: Should have 1 mutable block (partially filled)" ); assert_eq!(immutable_count, 0, "Pass 1: Should have 0 immutable blocks"); } 2 => { // Pass 2: Second chunk (4 tokens total) - first block full and registered assert_eq!(computed_tokens, 4, "Pass 2: Should have 4 computed tokens"); assert_eq!( mutable_count, 0, "Pass 2: Should have 0 mutable blocks (first block registered)" ); assert_eq!(immutable_count, 1, "Pass 2: Should have 1 immutable block"); } 3 => { // Pass 3: Third chunk (6 tokens total) - second block allocated assert_eq!(computed_tokens, 6, "Pass 3: Should have 6 computed tokens"); assert_eq!( mutable_count, 1, "Pass 3: Should have 1 mutable block (second block allocated)" ); assert_eq!(immutable_count, 1, "Pass 3: Should have 1 immutable block"); } 4 => { // Pass 4: Fourth chunk (8 tokens total) - second block full and registered assert_eq!(computed_tokens, 8, "Pass 4: Should have 8 computed tokens"); assert_eq!( mutable_count, 0, "Pass 4: Should have 0 mutable blocks (second block registered)" ); assert_eq!(immutable_count, 2, "Pass 4: Should have 2 immutable blocks"); } _ => panic!("Unexpected pass number: {}", pass + 1), } } let slot1_hashes = slot1.sequence_hashes(SlotPosition::All); let slot1_blocks = slot1.get_block_ids(); println!("Slot1 final state:"); println!(" Sequence hashes: {:?}", slot1_hashes); println!(" Block IDs: {:?}", slot1_blocks); println!( " Mutable blocks: {}, Immutable blocks: {}", slot1.mutable.len(), slot1.immutable.len() ); // SECOND SLOT: Cache hit → block reuse let mut slot2 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt); // Verify same sequence hashes let slot2_hashes = slot2.sequence_hashes(SlotPosition::All); assert_eq!( slot1_hashes, slot2_hashes, "Identical slots should have identical hashes" ); // Do cache lookup using the sequence hashes let cached_blocks = fixture .pool .match_sequence_hashes_blocking(&slot2_hashes) .expect("Cache lookup should succeed"); println!("Cache hit! Found {} cached blocks", cached_blocks.len()); // Apply cached blocks (this is the cache hit path) let result = slot2.initialize_with_device_matches(cached_blocks); assert!(result.is_ok(), "Cache hit failed: {:?}", result.err()); let slot2_blocks = slot2.get_block_ids(); println!("Slot2 final state:"); println!(" Block IDs: {:?}", slot2_blocks); println!( " Mutable blocks: {}, Immutable blocks: {}", slot2.mutable.len(), slot2.immutable.len() ); // *** THE KEY ASSERTION: Block ID sharing *** // Note: slot1 may have extra mutable blocks that haven't been registered yet // Only compare the immutable blocks that represent the actual computed tokens let slot1_immutable_blocks: Vec = slot1_blocks .iter() .take(slot1.immutable.len()) .cloned() .collect(); assert_eq!( slot1_immutable_blocks, slot2_blocks, "Slots with identical sequence hashes MUST share the same registered block IDs" ); // Verify both slots have same final state assert_eq!( slot1.num_tokens(SlotPosition::All), slot2.num_tokens(SlotPosition::All) ); assert_eq!( slot1.num_tokens(SlotPosition::Computed), slot2.num_tokens(SlotPosition::Computed) ); println!( "✅ Block ID sharing verified: both slots share immutable blocks {:?}", slot1_immutable_blocks ); } #[test] fn test_cache_hit_vs_cache_miss_workflow_comparison() { let fixture = TestFixture::new(); let tokens = vec![1, 2, 3, 4, 5, 6, 7, 8]; let salt = SALT_HASH; println!("=== Cache Hit vs Cache Miss Workflow ==="); // WORKFLOW 1: Cache Miss Path (slot1) let mut slot1 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt); let allocated_blocks = allocate_blocks_for_slot(&mut slot1, tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); let start_time = std::time::Instant::now(); // Token-by-token application (cache miss path) for token in &tokens { let result = slot1.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok()); } let cache_miss_duration = start_time.elapsed(); let slot1_blocks = slot1.get_block_ids(); let slot1_hashes = slot1.sequence_hashes(SlotPosition::All); println!("Cache miss workflow completed in {:?}", cache_miss_duration); println!(" - Applied {} tokens individually", tokens.len()); println!(" - Registered {} blocks", slot1_blocks.len()); // WORKFLOW 2: Cache Hit Path (slot2) let mut slot2 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt); let start_time = std::time::Instant::now(); // Cache lookup and batch block application (cache hit path) let cached_blocks = fixture .pool .match_sequence_hashes_blocking(&slot1_hashes) .expect("Cache lookup failed"); let result = slot2.initialize_with_device_matches(cached_blocks); assert!(result.is_ok()); let cache_hit_duration = start_time.elapsed(); let slot2_blocks = slot2.get_block_ids(); println!("Cache hit workflow completed in {:?}", cache_hit_duration); println!(" - Applied {} blocks in batch", slot2_blocks.len()); println!(" - Skipped individual token validation"); // Verify identical final state assert_eq!(slot1_blocks, slot2_blocks); assert_eq!( slot1.num_tokens(SlotPosition::All), slot2.num_tokens(SlotPosition::All) ); assert_eq!( slot1.num_tokens(SlotPosition::Computed), slot2.num_tokens(SlotPosition::Computed) ); // Cache hit should be faster (though timing can be variable in tests) println!("Performance comparison:"); println!(" - Cache miss: {:?}", cache_miss_duration); println!(" - Cache hit: {:?}", cache_hit_duration); println!("✅ Both workflows produce identical results with shared block IDs"); } #[test] fn test_mixed_cache_scenarios_with_block_sharing() { let fixture = TestFixture::new(); // Different token sequences let tokens_a = vec![1, 2, 3, 4, 5, 6, 7, 8]; let tokens_b = vec![9, 10, 11, 12, 13, 14, 15, 16]; let salt = SALT_HASH; println!("=== Mixed Cache Scenarios ==="); // Create first slot with tokens_a (cache miss) let mut slot_a1 = Slot::new(tokens_a.clone().into(), BLOCK_SIZE, salt); let allocated_blocks = allocate_blocks_for_slot(&mut slot_a1, tokens_a.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); for token in &tokens_a { let result = slot_a1.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok()); } let hashes_a = slot_a1.sequence_hashes(SlotPosition::All); let blocks_a1 = slot_a1.get_block_ids(); // Create first slot with tokens_b (cache miss) let mut slot_b1 = Slot::new(tokens_b.clone().into(), BLOCK_SIZE, salt); let allocated_blocks = allocate_blocks_for_slot(&mut slot_b1, tokens_b.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); for token in &tokens_b { let result = slot_b1.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok()); } let hashes_b = slot_b1.sequence_hashes(SlotPosition::All); let blocks_b1 = slot_b1.get_block_ids(); // Verify different sequences have different hashes and blocks assert_ne!( hashes_a, hashes_b, "Different token sequences should have different hashes" ); assert_ne!( blocks_a1, blocks_b1, "Different sequences should have different block IDs" ); println!("Setup complete:"); println!(" - Sequence A blocks: {:?}", blocks_a1); println!(" - Sequence B blocks: {:?}", blocks_b1); // Now create duplicate slots (cache hits) let mut slot_a2 = Slot::new(tokens_a.clone().into(), BLOCK_SIZE, salt); let cached_blocks_a = fixture .pool .match_sequence_hashes_blocking(&hashes_a) .expect("Cache lookup for sequence A failed"); let result = slot_a2.initialize_with_device_matches(cached_blocks_a); assert!(result.is_ok()); let mut slot_b2 = Slot::new(tokens_b.clone().into(), BLOCK_SIZE, salt); let cached_blocks_b = fixture .pool .match_sequence_hashes_blocking(&hashes_b) .expect("Cache lookup for sequence B failed"); let result = slot_b2.initialize_with_device_matches(cached_blocks_b); assert!(result.is_ok()); let blocks_a2 = slot_a2.get_block_ids(); let blocks_b2 = slot_b2.get_block_ids(); // Verify block sharing within same sequences assert_eq!(blocks_a1, blocks_a2, "Sequence A slots should share blocks"); assert_eq!(blocks_b1, blocks_b2, "Sequence B slots should share blocks"); // Verify no sharing between different sequences assert_ne!( blocks_a2, blocks_b2, "Different sequences should not share blocks" ); println!("✅ Mixed cache scenario validation:"); println!(" - A1 and A2 share blocks: {:?}", blocks_a1); println!(" - B1 and B2 share blocks: {:?}", blocks_b1); println!(" - A and B sequences use different blocks ✓"); } #[test] fn test_salt_prevents_unwanted_block_sharing() { let fixture = TestFixture::new(); let tokens = vec![1, 2, 3, 4, 5, 6, 7, 8]; let salt1 = SALT_HASH; let salt2 = SALT_HASH + 1000; // Different salt println!("=== Salt Isolation Test ==="); // Create slots with same tokens but different salts let mut slot1 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt1); let allocated_blocks = allocate_blocks_for_slot(&mut slot1, tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); for token in &tokens { let result = slot1.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok()); } let mut slot2 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt2); let allocated_blocks = allocate_blocks_for_slot(&mut slot2, tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); for token in &tokens { let result = slot2.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok()); } let hashes1 = slot1.sequence_hashes(SlotPosition::All); let hashes2 = slot2.sequence_hashes(SlotPosition::All); let blocks1 = slot1.get_block_ids(); let blocks2 = slot2.get_block_ids(); // Different salts should prevent block sharing assert_ne!( hashes1, hashes2, "Different salts should produce different hashes" ); assert_ne!( blocks1, blocks2, "Different salts should prevent block sharing" ); println!("Salt isolation verified:"); println!(" - Same tokens: {:?}", tokens); println!(" - Salt1 {} → blocks {:?}", salt1, blocks1); println!(" - Salt2 {} → blocks {:?}", salt2, blocks2); println!("✅ Different salts prevent unwanted block sharing"); } // ============================================================================ // PHASE 4: COMPLEX SCENARIOS & ERROR CONDITIONS TESTS // ============================================================================ #[test] fn test_insufficient_capacity_error_handling() { let fixture = TestFixture::new(); let initial_tokens = vec![1, 2]; // 2 tokens let mut slot = create_slot_with_tokens(initial_tokens.clone()); println!("=== Insufficient Capacity Error Test ==="); // Allocate exactly enough blocks for initial tokens (1 block for 2 tokens) let allocated_blocks = slot.allocate_blocks(2, fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); assert_eq!(allocated_blocks.unwrap().len(), 1); println!("Allocated 1 block for 2 tokens"); // Apply initial tokens successfully let result = slot.apply_computed_tokens(initial_tokens, fixture.pool.as_ref()); assert!(result.is_ok(), "Initial token application should succeed"); println!("Applied initial 2 tokens successfully"); // Validate internal state after successful application assert_eq!(slot.num_tokens(SlotPosition::Computed), 2); assert_eq!( slot.mutable.len(), 1, "Should have 1 mutable block (partially filled)" ); assert_eq!( slot.immutable.len(), 0, "Should have 0 immutable blocks (block not full)" ); println!( " Internal state after success: mutable={}, immutable={}", slot.mutable.len(), slot.immutable.len() ); // Now try to apply more tokens than available capacity let excessive_tokens = vec![3, 4, 5, 6, 7]; // 5 tokens, but only 2 slots left in block let result = slot.apply_computed_tokens(excessive_tokens, fixture.pool.as_ref()); // Should fail with clear error message assert!(result.is_err(), "Should fail with insufficient capacity"); let error_msg = format!("{:?}", result.err().unwrap()); assert!( error_msg.contains("Insufficient capacity"), "Error should mention insufficient capacity: {}", error_msg ); assert!( error_msg.contains("need 5 tokens but only 2 available"), "Error should specify exact capacity issue: {}", error_msg ); // Validate internal state is unchanged after error assert_eq!( slot.num_tokens(SlotPosition::Computed), 2, "Computed tokens should be unchanged after error" ); assert_eq!( slot.mutable.len(), 1, "Mutable block count should be unchanged after error" ); assert_eq!( slot.immutable.len(), 0, "Immutable block count should be unchanged after error" ); println!( " Internal state after error: mutable={}, immutable={} (unchanged)", slot.mutable.len(), slot.immutable.len() ); println!("✅ Insufficient capacity error handled correctly"); println!(" Error: {}", error_msg); } #[test] fn test_apply_tokens_without_allocation() { let fixture = TestFixture::new(); let tokens = vec![1, 2, 3, 4]; let mut slot = create_slot_with_tokens(tokens.clone()); println!("=== Apply Tokens Without Allocation Test ==="); // Validate initial state (no blocks allocated) assert_eq!(slot.num_tokens(SlotPosition::Computed), 0); assert_eq!(slot.mutable.len(), 0, "Should start with 0 mutable blocks"); assert_eq!( slot.immutable.len(), 0, "Should start with 0 immutable blocks" ); println!( " Initial state: mutable={}, immutable={}", slot.mutable.len(), slot.immutable.len() ); // Try to apply tokens without allocating blocks first let result = slot.apply_computed_tokens(tokens, fixture.pool.as_ref()); // Should fail because no mutable blocks are allocated assert!(result.is_err(), "Should fail without block allocation"); let error_msg = format!("{:?}", result.err().unwrap()); assert!( error_msg.contains("Insufficient capacity"), "Error should mention insufficient capacity: {}", error_msg ); assert!( error_msg.contains("need 4 tokens but only 0 available"), "Error should specify no capacity available: {}", error_msg ); // Validate state is unchanged after error assert_eq!( slot.num_tokens(SlotPosition::Computed), 0, "Computed tokens should remain 0 after error" ); assert_eq!( slot.mutable.len(), 0, "Mutable block count should remain 0 after error" ); assert_eq!( slot.immutable.len(), 0, "Immutable block count should remain 0 after error" ); println!( " State after error: mutable={}, immutable={} (unchanged)", slot.mutable.len(), slot.immutable.len() ); println!("✅ Apply without allocation error handled correctly"); println!(" Error: {}", error_msg); } #[test] fn test_progressive_token_application_with_capacity_management() { let fixture = TestFixture::new(); let mut slot = Slot::new(vec![1, 2, 3, 4, 5, 6, 7, 8].into(), BLOCK_SIZE, SALT_HASH); println!("=== Progressive Token Application Test ==="); // Apply tokens progressively, allocating capacity as needed let token_chunks = [vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]]; for (i, chunk) in token_chunks.iter().enumerate() { println!("Applying chunk {}: {:?}", i + 1, chunk); // Allocate capacity for this chunk let allocated = slot.allocate_blocks(chunk.len(), fixture.pool.as_ref()); assert!( allocated.is_some(), "Should successfully allocate for chunk {}", i + 1 ); // Apply the chunk let result = slot.apply_computed_tokens(chunk.clone(), fixture.pool.as_ref()); assert!( result.is_ok(), "Chunk {} should apply successfully: {:?}", i + 1, result.err() ); let computed = slot.num_tokens(SlotPosition::Computed); let mutable_count = slot.mutable.len(); let immutable_count = slot.immutable.len(); println!( " After chunk {}: computed={} tokens, mutable={}, immutable={}", i + 1, computed, mutable_count, immutable_count ); // Validate internal state progression (similar to chunked prefill pattern) let expected_immutable = computed / BLOCK_SIZE; let expected_mutable = if computed % BLOCK_SIZE == 0 { 0 } else { 1 }; assert_eq!( immutable_count, expected_immutable, "Chunk {}: Expected {} immutable blocks for {} computed tokens", i + 1, expected_immutable, computed ); assert!( mutable_count <= expected_mutable + 1, "Chunk {}: Mutable count {} should be <= {} (may have extra allocated)", i + 1, mutable_count, expected_mutable + 1 ); } // Verify final state assert_eq!(slot.num_tokens(SlotPosition::Computed), 8); assert_eq!(slot.num_tokens(SlotPosition::All), 8); assert_eq!( slot.immutable.len(), 2, "Should have 2 immutable blocks (8 tokens / 4 per block)" ); assert_eq!( slot.mutable.len(), 0, "Should have 0 mutable blocks (all tokens applied and registered)" ); println!("✅ Progressive token application completed successfully"); println!( " Final state: mutable={}, immutable={}", slot.mutable.len(), slot.immutable.len() ); } #[test] fn test_speculative_decode_over_allocation() { let fixture = TestFixture::new(); let initial_tokens = vec![1, 2, 3, 4]; // 1 block worth let mut slot = create_slot_with_tokens(initial_tokens.clone()); println!("=== Speculative Decode Over-Allocation Test ==="); // Complete prefill first let allocated_blocks = slot.allocate_blocks(initial_tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); let result = slot.apply_computed_tokens(initial_tokens, fixture.pool.as_ref()); assert!(result.is_ok()); println!( "Prefill completed: {} tokens", slot.num_tokens(SlotPosition::Computed) ); // Allocate capacity for speculative decode (more than we'll actually use) let speculative_capacity = 6; // Allocate for 6 tokens let allocated_blocks = slot.allocate_blocks(speculative_capacity, fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); let allocated_count = allocated_blocks.unwrap().len(); println!( "Allocated {} blocks for speculative decode", allocated_count ); // Only use partial capacity (simulate speculative decode where only some predictions are correct) let actual_decode_tokens = vec![100, 101]; // Only 2 tokens used out of 6 allocated let result = slot.apply_computed_tokens(actual_decode_tokens, fixture.pool.as_ref()); assert!(result.is_ok(), "Partial utilization should succeed"); // Verify state assert_eq!(slot.num_tokens(SlotPosition::Computed), 6); // 4 prefill + 2 decode assert_eq!(slot.num_tokens(SlotPosition::All), 6); // Validate internal state after speculative decode let expected_immutable = 6 / BLOCK_SIZE; // 6 tokens / 4 per block = 1 immutable block let remaining_computed = 6 % BLOCK_SIZE; // 6 % 4 = 2 tokens in partial block assert_eq!( slot.immutable.len(), expected_immutable, "Should have {} immutable blocks for {} computed tokens", expected_immutable, slot.num_tokens(SlotPosition::Computed) ); // Verify we still have unused mutable blocks (over-allocated) assert!( !slot.mutable.is_empty(), "Should have unused mutable blocks from over-allocation" ); // Calculate expected vs actual capacity let used_capacity_in_mutable = if remaining_computed > 0 { remaining_computed } else { 0 }; let total_mutable_capacity = slot.mutable.len() * BLOCK_SIZE; let unused_capacity = total_mutable_capacity - used_capacity_in_mutable; assert!( unused_capacity >= 4, "Should have at least 4 unused token slots from over-allocation, got {}", unused_capacity ); println!("✅ Speculative decode over-allocation handled correctly"); println!(" Used: 2 decode tokens, Allocated capacity for: 6 tokens"); println!( " Internal state: mutable={}, immutable={}", slot.mutable.len(), slot.immutable.len() ); println!( " Capacity: used {} slots, unused {} slots in mutable blocks", used_capacity_in_mutable, unused_capacity ); } #[test] fn test_mutual_exclusivity_cache_operations() { let fixture = TestFixture::new(); let tokens = vec![1, 2, 3, 4, 5, 6, 7, 8]; let salt = SALT_HASH; println!("=== Mutual Exclusivity Test ==="); // Create first slot and complete cache miss workflow let mut slot1 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt); let allocated_blocks = allocate_blocks_for_slot(&mut slot1, tokens.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); for token in &tokens { let result = slot1.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok()); } let sequence_hashes = slot1.sequence_hashes(SlotPosition::All); // Create second slot for testing mutual exclusivity let mut slot2 = Slot::new(tokens.clone().into(), BLOCK_SIZE, salt); // Get cached blocks for potential cache hit let cached_blocks = fixture .pool .match_sequence_hashes_blocking(&sequence_hashes) .expect("Cache lookup should succeed"); // Test 1: Apply cached blocks (should succeed) let result = slot2.initialize_with_device_matches(cached_blocks); assert!(result.is_ok(), "Cache hit should succeed"); // Validate internal state after cache hit assert_eq!( slot2.num_tokens(SlotPosition::Computed), 8, "Cache hit should result in 8 computed tokens" ); assert_eq!( slot2.immutable.len(), 2, "Cache hit should result in 2 immutable blocks" ); assert_eq!( slot2.mutable.len(), 0, "Cache hit should have 0 mutable blocks (all blocks cached)" ); println!("✅ Cache hit operation succeeded"); println!( " Internal state after cache hit: mutable={}, immutable={}", slot2.mutable.len(), slot2.immutable.len() ); // Test 2: Try to apply tokens after applying cached blocks (should work as decode) let additional_tokens = vec![9, 10]; // First allocate blocks for the additional tokens let allocated_blocks = slot2.allocate_blocks(additional_tokens.len(), fixture.pool.as_ref()); if allocated_blocks.is_some() { let pre_decode_mutable = slot2.mutable.len(); let _ = slot2.immutable.len(); let result = slot2.apply_computed_tokens(additional_tokens, fixture.pool.as_ref()); // This should work as decode tokens after cache hit assert!(result.is_ok(), "Decode after cache hit should work"); // Validate state after decode assert_eq!( slot2.num_tokens(SlotPosition::Computed), 10, "Should have 10 total tokens after decode" ); assert!( slot2.mutable.len() >= pre_decode_mutable, "Should have allocated new mutable blocks for decode" ); println!("✅ Decode tokens after cache hit succeeded (expected behavior)"); println!( " Internal state after decode: mutable={}, immutable={}", slot2.mutable.len(), slot2.immutable.len() ); } println!("✅ Mutual exclusivity test completed"); } #[test] fn test_zero_token_edge_cases() { let fixture = TestFixture::new(); println!("=== Zero Token Edge Cases Test ==="); // Test 1: Create slot with empty token sequence let empty_tokens: Vec = vec![]; let mut empty_slot = Slot::new(empty_tokens.into(), BLOCK_SIZE, SALT_HASH); assert_eq!(empty_slot.num_tokens(SlotPosition::All), 0); assert_eq!(empty_slot.num_tokens(SlotPosition::Prefill), 0); assert_eq!(empty_slot.num_tokens(SlotPosition::Computed), 0); // Validate initial internal state for empty slot assert_eq!( empty_slot.mutable.len(), 0, "Empty slot should have 0 mutable blocks" ); assert_eq!( empty_slot.immutable.len(), 0, "Empty slot should have 0 immutable blocks" ); println!( " Empty slot initial state: mutable={}, immutable={}", empty_slot.mutable.len(), empty_slot.immutable.len() ); // Test 2: Apply empty token list (should succeed) let result = empty_slot.apply_computed_tokens(vec![], fixture.pool.as_ref()); assert!(result.is_ok(), "Empty token application should succeed"); // Validate state unchanged after empty application assert_eq!(empty_slot.num_tokens(SlotPosition::Computed), 0); assert_eq!( empty_slot.mutable.len(), 0, "Empty application should not change mutable blocks" ); assert_eq!( empty_slot.immutable.len(), 0, "Empty application should not change immutable blocks" ); println!( " After empty application: mutable={}, immutable={} (unchanged)", empty_slot.mutable.len(), empty_slot.immutable.len() ); // Test 3: Allocate zero blocks let allocated = empty_slot.allocate_blocks(0, fixture.pool.as_ref()); assert!(allocated.is_some(), "Zero block allocation should succeed"); assert_eq!( allocated.unwrap().len(), 0, "Should return empty block list" ); // Validate state unchanged after zero allocation assert_eq!( empty_slot.mutable.len(), 0, "Zero allocation should not change mutable blocks" ); assert_eq!( empty_slot.immutable.len(), 0, "Zero allocation should not change immutable blocks" ); println!( " After zero allocation: mutable={}, immutable={} (unchanged)", empty_slot.mutable.len(), empty_slot.immutable.len() ); println!("✅ Zero token edge cases handled correctly"); } #[test] fn test_block_pool_resource_constraints() { let fixture = TestFixture::new(); let tokens = vec![1, 2, 3, 4]; println!("=== Block Pool Resource Constraints Test ==="); // Create multiple slots to potentially exhaust the pool let mut slots = Vec::new(); let mut successful_allocations = 0; // Keep allocating until we hit the pool limit for i in 0..20 { // Try to create many slots let mut slot = create_slot_with_tokens(tokens.clone()); let allocated = slot.allocate_blocks(tokens.len(), fixture.pool.as_ref()); if allocated.is_some() && !allocated.as_ref().unwrap().is_empty() { successful_allocations += 1; slots.push(slot); println!("Slot {}: Successfully allocated blocks", i); } else { println!("Slot {}: Failed to allocate blocks (pool exhausted)", i); break; } } println!( "Successfully allocated blocks for {} slots", successful_allocations ); assert!( successful_allocations > 0, "Should be able to allocate at least some blocks" ); // Try one more allocation that should fail let mut final_slot = create_slot_with_tokens(tokens.clone()); let final_allocation = final_slot.allocate_blocks(tokens.len(), fixture.pool.as_ref()); if final_allocation.is_none() || final_allocation.unwrap().is_empty() { println!("✅ Pool exhaustion handled gracefully"); } else { println!("Note: Pool had more capacity than expected"); } println!("✅ Resource constraint test completed"); } #[test] fn test_sequence_hash_mismatch_handling() { let fixture = TestFixture::new(); let tokens1 = vec![1, 2, 3, 4]; let tokens2 = vec![5, 6, 7, 8]; // Different tokens let salt = SALT_HASH; println!("=== Sequence Hash Mismatch Test ==="); // Create first slot and cache blocks let mut slot1 = Slot::new(tokens1.clone().into(), BLOCK_SIZE, salt); let allocated_blocks = allocate_blocks_for_slot(&mut slot1, tokens1.len(), fixture.pool.as_ref()); assert!(allocated_blocks.is_some()); for token in &tokens1 { let result = slot1.apply_computed_tokens(vec![*token], fixture.pool.as_ref()); assert!(result.is_ok()); } let hashes1 = slot1.sequence_hashes(SlotPosition::All); // Create second slot with different tokens let mut slot2 = Slot::new(tokens2.clone().into(), BLOCK_SIZE, salt); let hashes2 = slot2.sequence_hashes(SlotPosition::All); // Verify hashes are different assert_ne!( hashes1, hashes2, "Different tokens should have different hashes" ); // Try to apply blocks from slot1 to slot2 (should fail due to hash mismatch) let cached_blocks = fixture .pool .match_sequence_hashes_blocking(&hashes1) .expect("Should find cached blocks"); // This test documents current behavior - the system should detect hash mismatches // but the current implementation might not validate this at the slot level println!("Cached blocks from tokens1: {} blocks", cached_blocks.len()); println!("Attempting to apply to slot with different token sequence..."); // The hash mismatch detection happens in apply_computed_blocks let result = slot2.initialize_with_device_matches(cached_blocks); if result.is_err() { println!("✅ Hash mismatch correctly detected and rejected"); } else { println!("Note: Hash mismatch not detected at this level (may be validated elsewhere)"); } println!("✅ Sequence hash mismatch test completed"); } #[test] fn test_blocks_chunked_prefill_with_decode_tokens() { let fixture = TestFixture::new(); let tokens = vec![0; BLOCK_SIZE * 2]; let mut slot = Slot::new(tokens.clone().into(), BLOCK_SIZE, SALT_HASH); let allocated_blocks = slot.allocate_blocks(tokens.len() + 2, fixture.pool.as_ref()); assert_eq!(allocated_blocks.unwrap().len(), 3); slot.apply_computed_tokens(tokens[..BLOCK_SIZE].to_vec(), fixture.pool.as_ref()) .unwrap(); assert_eq!(slot.immutable.len(), 1); assert_eq!(slot.mutable.len(), 2); // Add the remaining prefill tokens along with some simulated decode tokens. let remaining_prefill_with_decode_tokens = vec![0; BLOCK_SIZE + 1]; slot.apply_computed_tokens(remaining_prefill_with_decode_tokens, fixture.pool.as_ref()) .unwrap(); assert_eq!(slot.immutable.len(), 2); assert_eq!(slot.mutable.len(), 1); } }