"lib/bench/vscode:/vscode.git/clone" did not exist on "9f3b7b330db03848d03b9a14f94930af26a3761f"
Unverified Commit 6654a57d authored by Yuewei Na's avatar Yuewei Na Committed by GitHub
Browse files

fix: prevent device_blocks double-add in TRT-LLM KVBM connector (#6406)


Signed-off-by: default avatarYuewei Na <nv-yna@users.noreply.github.com>
Co-authored-by: default avatarYuewei Na <nv-yna@users.noreply.github.com>
parent 6d3b92f0
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::{any::Any, cmp::max, sync::Arc};
use std::{
any::Any,
cmp::max,
collections::{HashMap, HashSet},
sync::Arc,
};
use dynamo_llm::{
block_manager::{
......@@ -347,8 +352,9 @@ pub struct VllmConnectorSlot {
/// The number of blocks cached from the disk
tokens_cached_from_disk: usize,
/// Phantom data to ensure the storage type is correct.
block_manager: VllmBlockManager,
/// Block manager for device pool operations (cache lookup, onboarding).
/// `None` only in unit tests where these operations are not exercised.
block_manager: Option<VllmBlockManager>,
block_size: usize,
......@@ -384,6 +390,10 @@ pub struct VllmConnectorSlot {
/// Block index where offload was terminated due to priority filtering.
/// When Some, no further blocks will be offloaded to ensure global contiguity.
offload_terminated_at_block: Option<usize>,
/// Stored block priorities from previous apply_scheduler_output calls.
/// Used as fallback when priorities=None in subsequent chunked prefill iterations.
stored_block_priorities: HashMap<BlockId, u32>,
}
impl VllmConnectorSlot {
......@@ -404,7 +414,7 @@ impl VllmConnectorSlot {
Self {
request_id,
sequence,
block_manager,
block_manager: Some(block_manager),
block_size,
xfer_tx,
// default values
......@@ -424,9 +434,53 @@ impl VllmConnectorSlot {
cache_stats,
offload_min_priority,
offload_terminated_at_block: None,
stored_block_priorities: HashMap::new(),
}
}
#[cfg(test)]
fn new_for_test(
request_id: String,
tokens: Tokens,
salt_hash: SaltHash,
block_size: usize,
xfer_tx: mpsc::UnboundedSender<LocalTransferRequest>,
cache_stats: Arc<CacheStatsTracker>,
offload_min_priority: u32,
) -> Self {
assert!(!tokens.is_empty(), "tokens must be non-empty");
let sequence = TokenBlockSequence::new(tokens, block_size as u32, Some(salt_hash));
Self {
request_id,
sequence,
block_manager: None,
block_size,
xfer_tx,
state: SlotState::Initialized,
iteration_first_scheduled: None,
current_position: 0,
evaluated_blocks: 0,
device_blocks: Vec::new(),
staging_from_host: None,
staging_from_disk: None,
pending_operations: None,
tokens_cached_from_device: 0,
tokens_cached_from_host: 0,
tokens_cached_from_disk: 0,
performed_cache_lookup: false,
total_blocks_queried: 0,
cache_stats,
offload_min_priority,
offload_terminated_at_block: None,
stored_block_priorities: HashMap::new(),
}
}
#[cfg(test)]
fn device_blocks_snapshot(&self) -> &[BlockId] {
&self.device_blocks
}
fn mark_as_skipped_prefill(&mut self) -> Result<(), SlotError> {
if self.state != SlotState::Prefilling {
return Err(SlotError::InvalidState(format!(
......@@ -502,6 +556,7 @@ impl Slot for VllmConnectorSlot {
self.performed_cache_lookup = false;
self.total_blocks_queried = 0;
self.offload_terminated_at_block = None;
self.stored_block_priorities.clear();
}
fn reset(&mut self) {
......@@ -543,6 +598,20 @@ impl Slot for VllmConnectorSlot {
num_scheduled_tokens: usize,
priorities: Option<&[u32]>,
) -> Result<(), SlotError> {
tracing::debug!(
"ENTRY: apply_scheduler_output: req={}, tokens.len={}, block_ids.len={}, computed={}, scheduled={}, \
has_priorities={}, current_pos={}, evaluated_blocks={}, device_blocks_len={}",
self.request_id,
tokens.len(),
block_ids.len(),
num_computed_tokens,
num_scheduled_tokens,
priorities.is_some(),
self.current_position,
self.evaluated_blocks,
self.device_blocks.len()
);
// Validate contract: priorities must match block_ids length when provided
if let Some(prios) = priorities {
assert_eq!(
......@@ -570,10 +639,82 @@ impl Slot for VllmConnectorSlot {
self.current_position = max(self.current_position, num_computed_tokens);
self.evaluated_blocks = max(self.evaluated_blocks, num_computed_tokens / self.block_size);
// apply new block_ids
// Apply new block_ids with suffix/prefix overlap contract.
// Block IDs are unique, so we use an O(N) algorithm:
// 1. Locate block_ids[0] in device_blocks via rposition (at most one match).
// 2. Verify device_blocks[pos..] == block_ids[..suffix_len] (the overlap).
// 3. Extend with only the non-overlapping tail block_ids[overlap_len..].
//
// Valid cases:
// [3,4,5] + [6,7] → [3,4,5,6,7] (no overlap)
// [3,4,5,6,7] + [6,7,8,9] → [3,4,5,6,7,8,9] (suffix/prefix overlap of 2)
// [3,4,5] + [3,4,5] → [3,4,5] (full overlap)
//
// Invalid (panics):
// [3,4,5,6,7] + [6,8,9] → block 6 found but suffix doesn't match prefix
if !block_ids.is_empty() {
tracing::debug!("assigning {} new device blocks slot", block_ids.len());
self.device_blocks.extend(block_ids);
let overlap_len = if let Some(pos) = self
.device_blocks
.iter()
.rposition(|&id| id == block_ids[0])
{
let suffix_len = self.device_blocks.len() - pos;
assert!(
suffix_len <= block_ids.len()
&& self.device_blocks[pos..] == block_ids[..suffix_len],
"device_blocks contract violation: block_ids[0]={} found at \
device_blocks[{}] but device_blocks[{}..] != block_ids[..{}] \
(device_blocks={:?}, block_ids={:?})",
block_ids[0],
pos,
pos,
suffix_len,
self.device_blocks,
block_ids
);
suffix_len
} else {
0
};
let new_ids = &block_ids[overlap_len..];
if !new_ids.is_empty() {
// Validate: no block in the non-overlapping portion should already exist.
let existing: HashSet<BlockId> = self.device_blocks.iter().copied().collect();
for id in new_ids {
assert!(
!existing.contains(id),
"device_blocks contract violation: block {} already in device_blocks \
but not part of suffix/prefix overlap (overlap_len={}, \
device_blocks={:?}, block_ids={:?})",
id,
overlap_len,
self.device_blocks,
block_ids
);
}
self.device_blocks.extend_from_slice(new_ids);
}
if overlap_len > 0 {
tracing::debug!(
"DEDUP: suffix/prefix overlap of {} block_ids for req={}, extended with {} new",
overlap_len,
self.request_id,
new_ids.len()
);
}
}
// Store block→priority mapping for use in subsequent chunked prefill iterations.
// In chunked prefill, new_request (chunk 1) carries priorities for ALL blocks,
// but cached_request (chunk 2+) has priorities=None. Storing here lets us look up
// priorities for blocks evaluated in later chunks.
if let Some(prios) = priorities {
for (block_id, priority) in block_ids.iter().zip(prios.iter()) {
self.stored_block_priorities.insert(*block_id, *priority);
}
}
// Early exit if offload has been permanently terminated.
......@@ -640,38 +781,14 @@ impl Slot for VllmConnectorSlot {
.copied()
.collect();
// Get candidate priorities from the priorities parameter.
// When priorities are provided, extract priorities for candidate blocks.
let candidate_priorities: Vec<u32> = if let Some(prios) = priorities {
let new_blocks_start = self.device_blocks.len() - block_ids.len();
let candidate_start = self.evaluated_blocks;
if candidate_start >= new_blocks_start {
let prio_offset = candidate_start - new_blocks_start;
debug_assert!(
prio_offset + num_candidate_blocks <= prios.len(),
"prio_offset ({}) + num_candidate_blocks ({}) > prios.len() ({}); \
candidate_start={}, new_blocks_start={}, device_blocks.len()={}, block_ids.len()={}",
prio_offset,
num_candidate_blocks,
prios.len(),
candidate_start,
new_blocks_start,
self.device_blocks.len(),
block_ids.len()
);
prios
// Look up candidate priorities from stored_block_priorities.
// The HashMap is populated above (lines 700-706) for every block_id
// that arrives with priorities. This replaces fragile offset-based indexing
// that assumed block_ids was appended verbatim to device_blocks.
let candidate_priorities: Vec<u32> = candidate_block_ids
.iter()
.skip(prio_offset)
.take(num_candidate_blocks)
.copied()
.collect()
} else {
vec![0; num_candidate_blocks]
}
} else {
vec![0; num_candidate_blocks]
};
.map(|id| self.stored_block_priorities.get(id).copied().unwrap_or(0))
.collect();
assert_eq!(
candidate_block_ids.len(),
......@@ -691,6 +808,17 @@ impl Slot for VllmConnectorSlot {
num_candidate_blocks
};
tracing::debug!(
"OFFLOAD_DECISION: req={}, num_candidate={}, num_to_offload={}, threshold={}, \
candidate_block_ids={:?}, candidate_priorities={:?}",
self.request_id,
num_candidate_blocks,
num_blocks_to_offload,
self.offload_min_priority,
&candidate_block_ids,
&candidate_priorities
);
if num_blocks_to_offload > 0 {
if self.offload_min_priority > 0 {
tracing::debug!(
......@@ -863,7 +991,7 @@ impl Slot for VllmConnectorSlot {
tracing::info!("slot is in the Preempted state; we get another chance to match");
}
let block_size = self.block_manager.block_size();
let block_size = self.block_size;
let num_computed_blocks = num_computed_tokens / block_size;
debug_assert!(num_computed_tokens.is_multiple_of(block_size));
......@@ -918,7 +1046,8 @@ impl Slot for VllmConnectorSlot {
let mut host_blocks = self
.block_manager
.host()
.as_ref()
.and_then(|bm| bm.host())
.map(|host| host.match_sequence_hashes_blocking(blocks_to_lookup))
.transpose()?
.unwrap_or_default();
......@@ -932,7 +1061,8 @@ impl Slot for VllmConnectorSlot {
// start at host offset
let mut disk_blocks = self
.block_manager
.disk()
.as_ref()
.and_then(|bm| bm.disk())
.map(|disk| disk.match_sequence_hashes_blocking(&sequence_hashes[search_offset..]))
.transpose()?
.unwrap_or_default();
......@@ -1743,3 +1873,398 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> AnyBlocks for AnyImmutab
self.block_ids()
}
}
#[cfg(test)]
mod connector_tests {
use super::*;
use crate::block_manager::cache_stats::CacheStatsTracker;
use dynamo_llm::tokens::{SaltHash, Tokens};
use std::sync::Arc;
use tokio::sync::mpsc;
const BLOCK_SIZE: usize = 32;
const SALT_HASH: SaltHash = 12345;
/// Creates a test slot with `num_tokens` tokens and the given priority threshold.
/// Returns the slot and the receiving end of the transfer channel for inspecting offload requests.
fn create_test_slot(
num_tokens: usize,
offload_min_priority: u32,
) -> (
VllmConnectorSlot,
mpsc::UnboundedReceiver<LocalTransferRequest>,
) {
let tokens: Vec<u32> = (1..=num_tokens as u32).collect();
let (xfer_tx, xfer_rx) = mpsc::unbounded_channel();
let cache_stats = Arc::new(CacheStatsTracker::new(None));
let slot = VllmConnectorSlot::new_for_test(
"test-req".to_string(),
Tokens::from(tokens),
SALT_HASH,
BLOCK_SIZE,
xfer_tx,
cache_stats,
offload_min_priority,
);
(slot, xfer_rx)
}
/// Generates block IDs starting from `start`.
fn block_ids(start: usize, count: usize) -> Vec<usize> {
(start..start + count).collect()
}
/// Drains all pending offload requests from the channel and returns their block IDs.
fn drain_offload_block_ids(
rx: &mut mpsc::UnboundedReceiver<LocalTransferRequest>,
) -> Vec<Vec<usize>> {
let mut results = Vec::new();
while let Ok(req) = rx.try_recv() {
if let LocalTransferRequest::Offload(offload) = req {
results.push(offload.block_ids);
}
}
results
}
// ---------------------------------------------------------------
// Test 1: vLLM pattern — append_mutable then apply with empty blocks
// ---------------------------------------------------------------
#[test]
fn test_vllm_pattern_no_double_add() {
let num_tokens = 96; // 3 blocks of 32
let (mut slot, _rx) = create_test_slot(num_tokens, 0);
let blocks = block_ids(100, 3);
// Step 1: append_mutable (from update_state_after_alloc)
slot.append_mutable_device_blocks(&blocks).unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 3);
// Step 2: apply_scheduler_output with empty blocks (vLLM pattern)
slot.apply_scheduler_output(&[], &[], 0, num_tokens, None)
.unwrap();
// device_blocks should still be exactly 3 — no double-add
assert_eq!(slot.num_device_blocks_allocated(), 3);
}
// ---------------------------------------------------------------
// Test 2: TRT-LLM pattern — append_mutable then apply with SAME blocks
// The dedup guard must prevent device_blocks from doubling.
// ---------------------------------------------------------------
#[test]
fn test_trtllm_pattern_no_double_add() {
let num_tokens = 96; // 3 blocks of 32
let (mut slot, _rx) = create_test_slot(num_tokens, 0);
let blocks = block_ids(100, 3);
// Step 1: append_mutable (from update_state_after_alloc)
slot.append_mutable_device_blocks(&blocks).unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 3);
// Step 2: apply_scheduler_output with THE SAME blocks (TRT-LLM pattern)
// Without the dedup guard, this doubles device_blocks to len=6.
slot.apply_scheduler_output(&[], &blocks, 0, num_tokens, None)
.unwrap();
// device_blocks must still be exactly 3 — dedup guard prevented the double-add
assert_eq!(slot.num_device_blocks_allocated(), 3);
}
// ---------------------------------------------------------------
// Test 3: Decode adds a new block correctly
// ---------------------------------------------------------------
#[test]
fn test_decode_block_added_correctly() {
let num_tokens = 96; // 3 blocks
let (mut slot, _rx) = create_test_slot(num_tokens, 0);
let prefill_blocks = block_ids(100, 3);
// Prefill: append + apply with empty blocks (vLLM pattern)
slot.append_mutable_device_blocks(&prefill_blocks).unwrap();
slot.apply_scheduler_output(&[], &[], 0, num_tokens, None)
.unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 3);
// Decode: new block at boundary (token 96 = block 3)
let decode_block = block_ids(200, 1);
let decode_token: Vec<u32> = vec![9999];
slot.apply_scheduler_output(&decode_token, &decode_block, 95, 1, None)
.unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 4);
}
// ---------------------------------------------------------------
// Test 4: Multiple append_mutable calls accumulate
// ---------------------------------------------------------------
#[test]
fn test_append_mutable_is_additive() {
let (mut slot, _rx) = create_test_slot(128, 0);
slot.append_mutable_device_blocks(&block_ids(100, 2))
.unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 2);
slot.append_mutable_device_blocks(&block_ids(200, 1))
.unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 3);
}
// ---------------------------------------------------------------
// Test 5: Offload sends correct block IDs through channel
// ---------------------------------------------------------------
#[test]
fn test_offload_sends_correct_block_ids() {
let num_tokens = 96; // 3 blocks
let (mut slot, mut rx) = create_test_slot(num_tokens, 0);
let blocks = block_ids(100, 3);
// Prefill: append blocks, then apply with num_scheduled_tokens=96.
// All 3 blocks (96/32) become offload candidates since evaluated_blocks starts at 0.
// Empty tokens → Prefilling state, and next_position(96) == total_tokens(96)
// so the early-return does not fire and offload proceeds.
slot.append_mutable_device_blocks(&blocks).unwrap();
slot.apply_scheduler_output(&[], &[], 0, num_tokens, None)
.unwrap();
let offloads = drain_offload_block_ids(&mut rx);
assert_eq!(offloads.len(), 1, "expected exactly one offload batch");
assert_eq!(offloads[0], vec![100, 101, 102]);
}
// ---------------------------------------------------------------
// Test 6: Priority filtering offloads only blocks above threshold
// ---------------------------------------------------------------
#[test]
fn test_priority_filtering_offloads_correct_count() {
let num_tokens = 96; // 3 blocks
let (mut slot, mut rx) = create_test_slot(num_tokens, 30);
let blocks = block_ids(100, 3);
let priorities: Vec<u32> = vec![80, 80, 10]; // 2 above threshold, 1 below
// Use the TRT-LLM pattern: append_mutable first, then apply with same blocks + priorities.
// The dedup guard prevents the double-add, but priorities are still processed.
slot.append_mutable_device_blocks(&blocks).unwrap();
slot.apply_scheduler_output(&[], &blocks, 0, num_tokens, Some(&priorities))
.unwrap();
// device_blocks should be 3 (dedup prevented doubling)
assert_eq!(slot.num_device_blocks_allocated(), 3);
let offloads = drain_offload_block_ids(&mut rx);
assert_eq!(offloads.len(), 1);
// Only blocks with priority >= 30 should be offloaded (first 2)
assert_eq!(offloads[0], vec![100, 101]);
}
// ---------------------------------------------------------------
// Test 7: Priority filtering terminates further offloading
// ---------------------------------------------------------------
#[test]
fn test_priority_filtering_terminates_offload() {
let num_tokens = 128; // 4 blocks
let (mut slot, mut rx) = create_test_slot(num_tokens, 30);
let blocks = block_ids(100, 4);
// First 2 high, then 2 low — offload terminates at block 2
let priorities: Vec<u32> = vec![80, 80, 10, 10];
slot.append_mutable_device_blocks(&blocks).unwrap();
slot.apply_scheduler_output(&[], &blocks, 0, num_tokens, Some(&priorities))
.unwrap();
let offloads = drain_offload_block_ids(&mut rx);
assert_eq!(offloads[0], vec![100, 101]); // only 2 offloaded
// Now simulate a decode iteration that crosses a block boundary.
// Because offload was terminated, no further offloading should happen.
let decode_block = block_ids(200, 1);
let decode_token: Vec<u32> = vec![9999];
slot.apply_scheduler_output(&decode_token, &decode_block, 127, 1, None)
.unwrap();
let further_offloads = drain_offload_block_ids(&mut rx);
assert!(
further_offloads.is_empty(),
"no offloads should occur after termination"
);
}
// ---------------------------------------------------------------
// Test 8: evaluated_blocks advances correctly across iterations
// ---------------------------------------------------------------
#[test]
fn test_evaluated_blocks_advances_correctly() {
// 128 tokens = 4 blocks. We'll process in 2 chunks of 64 tokens.
let num_tokens = 128;
let (mut slot, mut rx) = create_test_slot(num_tokens, 0);
let blocks = block_ids(100, 4);
slot.append_mutable_device_blocks(&blocks).unwrap();
// Chunk 1: schedule first 64 tokens → evaluates blocks 0,1
slot.apply_scheduler_output(&[], &[], 0, 64, None).unwrap();
let offloads_1 = drain_offload_block_ids(&mut rx);
assert_eq!(offloads_1.len(), 1);
assert_eq!(offloads_1[0], vec![100, 101]); // blocks 0,1
// Chunk 2: schedule next 64 tokens → evaluates blocks 2,3
// (uses cached_request pattern: empty tokens, empty blocks)
slot.apply_scheduler_output(&[], &[], 64, 64, None).unwrap();
let offloads_2 = drain_offload_block_ids(&mut rx);
assert_eq!(offloads_2.len(), 1);
assert_eq!(offloads_2[0], vec![102, 103]); // blocks 2,3
assert_eq!(slot.num_device_blocks_allocated(), 4);
}
// ---------------------------------------------------------------
// Test 9: Partial overlap dedup — suffix/prefix contract
// [10,11,12] + apply([12,13]) → [10,11,12,13]
// ---------------------------------------------------------------
#[test]
fn test_partial_overlap_dedup() {
let num_tokens = 128; // 4 blocks
let (mut slot, _rx) = create_test_slot(num_tokens, 0);
// Step 1: append_mutable with first 3 blocks
slot.append_mutable_device_blocks(&[10, 11, 12]).unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 3);
// Step 2: apply_scheduler_output with overlapping blocks [12, 13].
// Suffix [12] of device_blocks matches prefix [12] of block_ids.
// Only block 13 is new and gets appended.
slot.apply_scheduler_output(&[], &[12, 13], 0, 128, None)
.unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 4);
assert_eq!(slot.device_blocks_snapshot(), &[10, 11, 12, 13]);
}
// ---------------------------------------------------------------
// Test 10: Priorities work correctly with partial overlap dedup.
// Chunk 1 provides priorities for blocks 10-12, chunk 2
// overlaps on block 12 and adds block 13 with low priority.
// ---------------------------------------------------------------
#[test]
fn test_partial_overlap_with_priorities() {
let num_tokens = 128; // 4 blocks
let (mut slot, mut rx) = create_test_slot(num_tokens, 30);
// Chunk 1: 3 blocks, all high priority, schedule 96 tokens
slot.append_mutable_device_blocks(&[10, 11, 12]).unwrap();
slot.apply_scheduler_output(&[], &[10, 11, 12], 0, 96, Some(&[80, 80, 80]))
.unwrap();
let offloads_1 = drain_offload_block_ids(&mut rx);
assert_eq!(offloads_1.len(), 1);
assert_eq!(offloads_1[0], vec![10, 11, 12]);
assert_eq!(slot.num_device_blocks_allocated(), 3);
// Chunk 2: block 13 is new. append_mutable adds it.
// apply receives [12, 13] with priorities [80, 10].
// Dedup: suffix [12] matches prefix [12], overlap=1, extends with [13].
// But block 13 was already added by append_mutable, so the new_ids
// validation would fail — unless append_mutable already put it there.
// Actually: append_mutable added 13, so device_blocks = [10,11,12,13].
// Then apply receives [12,13]: suffix [12,13] matches prefix [12,13], overlap=2.
// No new blocks to extend. Priorities {12->80, 13->10} stored in HashMap.
slot.append_mutable_device_blocks(&[13]).unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 4);
slot.apply_scheduler_output(&[], &[12, 13], 96, 32, Some(&[80, 10]))
.unwrap();
// Candidate is block 13 (index 3, evaluated_blocks=3).
// Priority for 13 = 10 (< threshold 30), so no offload.
let offloads_2 = drain_offload_block_ids(&mut rx);
assert!(
offloads_2.is_empty(),
"block 13 has priority 10 < threshold 30"
);
assert_eq!(slot.device_blocks_snapshot(), &[10, 11, 12, 13]);
}
// ---------------------------------------------------------------
// Test 11: Invalid overlap panics — block present but not at tail
// [10,11,12] + apply([11,14]) → panic (contract violation)
// ---------------------------------------------------------------
#[test]
#[should_panic(expected = "contract violation")]
fn test_invalid_overlap_panics() {
let num_tokens = 128;
let (mut slot, _rx) = create_test_slot(num_tokens, 0);
slot.append_mutable_device_blocks(&[10, 11, 12]).unwrap();
// block_ids[0]=11 is found at device_blocks[1], but device_blocks[1..] = [11,12]
// does NOT match block_ids[..2] = [11,14]. Contract violation.
slot.apply_scheduler_output(&[], &[11, 14], 0, 128, None)
.unwrap();
}
// ---------------------------------------------------------------
// Test 12: Non-contiguous duplicate panics (Case 3)
// [10,11,12,13,14] + apply([13,14,10]) → block 10 already
// exists but is not part of the suffix/prefix overlap.
// ---------------------------------------------------------------
#[test]
#[should_panic(expected = "contract violation")]
fn test_non_contiguous_duplicate_panics() {
let num_tokens = 192; // 6 blocks
let (mut slot, _rx) = create_test_slot(num_tokens, 0);
slot.append_mutable_device_blocks(&[10, 11, 12, 13, 14])
.unwrap();
// Overlap: suffix [13,14] matches prefix [13,14], overlap=2.
// new_ids = [10]. But 10 ∈ device_blocks → contract violation.
slot.apply_scheduler_output(&[], &[13, 14, 10], 0, 192, None)
.unwrap();
}
// ---------------------------------------------------------------
// Test 13: Over-provision — caller provides all blocks including
// those already registered. Full prefix overlap.
// [10,11,12] + apply([10,11,12,13,14]) → [10,11,12,13,14]
// ---------------------------------------------------------------
#[test]
fn test_over_provision_dedup() {
let num_tokens = 160; // 5 blocks
let (mut slot, _rx) = create_test_slot(num_tokens, 0);
slot.append_mutable_device_blocks(&[10, 11, 12]).unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 3);
// block_ids[0]=10 found at device_blocks[0], suffix_len=3.
// device_blocks[0..3]=[10,11,12] == block_ids[0..3]=[10,11,12] → overlap=3.
// new_ids=[13,14], both genuinely new → extend.
slot.apply_scheduler_output(&[], &[10, 11, 12, 13, 14], 0, 160, None)
.unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 5);
assert_eq!(slot.device_blocks_snapshot(), &[10, 11, 12, 13, 14]);
}
// ---------------------------------------------------------------
// Test 14: Chunked re-provision — all blocks re-sent in chunk 2
// with additional new blocks appended.
// [10,11,12,13,14] + apply([10,11,12,13,14,15,16])
// → [10,11,12,13,14,15,16]
// ---------------------------------------------------------------
#[test]
fn test_chunked_reprovision_dedup() {
let num_tokens = 224; // 7 blocks
let (mut slot, _rx) = create_test_slot(num_tokens, 0);
slot.append_mutable_device_blocks(&[10, 11, 12, 13, 14])
.unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 5);
// Full overlap of all 5 existing blocks, plus 2 new.
slot.apply_scheduler_output(&[], &[10, 11, 12, 13, 14, 15, 16], 0, 224, None)
.unwrap();
assert_eq!(slot.num_device_blocks_allocated(), 7);
assert_eq!(slot.device_blocks_snapshot(), &[10, 11, 12, 13, 14, 15, 16]);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment