// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use super::*; use reuse::AvailableBlocks; /// Manages the reservation and priority reuse of kv blocks for a single storage type, /// e.g. a GPU, host memory. pub struct KvStorageManager { available_blocks: AvailableBlocks, inflight_blocks: ReservedBlocks, block_size: usize, } impl KvStorageManager { pub async fn new(block_size: usize) -> Self { Self { available_blocks: AvailableBlocks::new().await, inflight_blocks: ReservedBlocks::new(block_size), block_size, } } pub async fn prepare_prefill_sequence(&mut self, tokens: Tokens) -> Result { log::debug!("adding request with {} tokens", tokens.len()); let seq = tokens.into_sequence(self.block_size); let (blocks, tail_block) = seq.into_parts(); log::debug!( "request translates to {} blocks; remaining tokens: {}", blocks.len(), tail_block.tokens().len() ); // first match blocks to inflight blocks let mut inflight_blocks = self.inflight_blocks.match_token_blocks(&blocks)?; log::debug!("matched {} inflight blocks", inflight_blocks.len()); // shift the blocks to the left by the number of inflight blocks let unmatched_blocks = &blocks[inflight_blocks.len()..]; let unmatched_hashes = unmatched_blocks .iter() .map(|b| b.sequence_hash()) .collect::>(); // match the remaining blocks to freed gpu blocks (available_blocks) let unregistered_blocks = self.available_blocks.match_blocks(unmatched_hashes).await?; log::debug!("matched {} freed blocks", unregistered_blocks.len()); // the blocks from the freed blocks pool must be registered as inflight blocks // todo - we might have to register the list of unregistered blocks as a single transaction for block in unregistered_blocks { inflight_blocks.push(self.inflight_blocks.register(block)?); } // the remaining blocks are the unmatched blocks let remaining_blocks = blocks.into_iter().skip(inflight_blocks.len()).collect(); Ok(PrefillMatched { inflight_blocks, remaining_blocks, tail_block, }) } pub async fn prepare_prefill_offload( &mut self, matched: PrefillMatched, ) -> Result { let (inflight_blocks, remaining_blocks, tail_block) = matched.dissolve(); let mut blocks_to_reuse = self .available_blocks .take_blocks(remaining_blocks.len() as u32 + 1) .await?; if blocks_to_reuse.len() != remaining_blocks.len() + 1 { raise!( "expected {} blocks, got {}", remaining_blocks.len() + 1, blocks_to_reuse.len() ); } // update the blocks_to_reuse with the token block from remaining_blocks let complete_prefill_blocks: Vec = remaining_blocks .into_iter() .map(|b| { let mut block = blocks_to_reuse.pop().unwrap(); block.update_token_block(b); block }) .collect(); assert_eq!(blocks_to_reuse.len(), 1); let tail_kv_block = blocks_to_reuse.pop().unwrap(); let tail_prefill_block = PartialKvBlock { token_block: tail_block, kv_block: tail_kv_block, }; Ok(PrefillOffload { inflight_blocks, complete_prefill_blocks, tail_prefill_block, }) } } #[derive(Dissolve)] pub struct PartialKvBlock { token_block: PartialTokenBlock, kv_block: UniqueBlock, } #[derive(Dissolve)] pub struct PrefillMatched { inflight_blocks: Vec, remaining_blocks: Vec, tail_block: PartialTokenBlock, } #[derive(Dissolve)] pub struct PrefillOffload { inflight_blocks: Vec, complete_prefill_blocks: Vec, tail_prefill_block: PartialKvBlock, } // #[cfg(test)] // mod tests { // use super::*; // use dynamo_runtime::logging::init; // #[tokio::test] // async fn test() { // init(); // let mut manager = KvStorageManager::new(2); // for _ in 0..100 { // manager.available_blocks.insert(KvBlock::default()); // } // let tokens = Tokens::from([0_i32, 1, 2, 3, 4, 5, 6, 7, 8].as_ref()); // // this is good for the scheduler to make a local decision as it now knows how many // // net-new blocks need to be prefilled // let sequence = manager.prepare_prefill_sequence(tokens).unwrap(); // assert_eq!(sequence.inflight_blocks.len(), 0); // assert_eq!(sequence.remaining_blocks.len(), 4); // } // }