Unverified commit 6b452a14, authored by Yan Ru Pei, committed by GitHub
Browse files

chore(mocker): drop the Destroy variant (#8588)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 241b254d
...@@ -163,7 +163,6 @@ pub enum MoveBlock { ...@@ -163,7 +163,6 @@ pub enum MoveBlock {
Option<Vec<Vec<u32>>>, Option<Vec<Vec<u32>>>,
Option<UniqueBlock>, Option<UniqueBlock>,
), ),
Destroy(Vec<UniqueBlock>),
Deref(Vec<UniqueBlock>), Deref(Vec<UniqueBlock>),
Promote( Promote(
Uuid, Uuid,
......
...@@ -325,7 +325,7 @@ impl ActiveSequence { ...@@ -325,7 +325,7 @@ impl ActiveSequence {
.rev() .rev()
.map(|block| match block { .map(|block| match block {
UniqueBlock::PartialBlock(uuid) => { UniqueBlock::PartialBlock(uuid) => {
MoveBlock::Destroy(vec![UniqueBlock::PartialBlock(*uuid)]) MoveBlock::Deref(vec![UniqueBlock::PartialBlock(*uuid)])
} }
UniqueBlock::FullBlock(hash) => { UniqueBlock::FullBlock(hash) => {
MoveBlock::Deref(vec![UniqueBlock::FullBlock(*hash)]) MoveBlock::Deref(vec![UniqueBlock::FullBlock(*hash)])
...@@ -425,13 +425,13 @@ mod tests { ...@@ -425,13 +425,13 @@ mod tests {
} }
} }
fn assert_destroy_partial(signal: &MoveBlock) { fn assert_deref_partial(signal: &MoveBlock) {
match signal { match signal {
MoveBlock::Destroy(blocks) => { MoveBlock::Deref(blocks) => {
assert_eq!(blocks.len(), 1); assert_eq!(blocks.len(), 1);
assert!(matches!(blocks[0], UniqueBlock::PartialBlock(_))); assert!(matches!(blocks[0], UniqueBlock::PartialBlock(_)));
} }
_ => panic!("Expected MoveBlock::Destroy for partial block"), _ => panic!("Expected MoveBlock::Deref for partial block"),
} }
} }
...@@ -587,12 +587,12 @@ mod tests { ...@@ -587,12 +587,12 @@ mod tests {
let signals_third = seq.generate(); let signals_third = seq.generate();
assert_eq!(signals_third.len(), 0); assert_eq!(signals_third.len(), 0);
// Generate last token - we reach max_output_tokens, should trigger Destroy and Deref signals // Generate last token - we reach max_output_tokens, should trigger Deref signals
let signals_last = seq.generate(); let signals_last = seq.generate();
assert_eq!(signals_last.len(), 2); assert_eq!(signals_last.len(), 2);
// First signal should be Destroy for the partial block // First signal should be Deref for the partial block
assert_destroy_partial(&signals_last[0]); assert_deref_partial(&signals_last[0]);
// Second signal should be Deref for the full block // Second signal should be Deref for the full block
assert_deref_full(&signals_last[1]); assert_deref_full(&signals_last[1]);
......
...@@ -14,10 +14,10 @@ ...@@ -14,10 +14,10 @@
//! new `MutableBlock`, stage with PLH, and register. On capacity exhaustion //! new `MutableBlock`, stage with PLH, and register. On capacity exhaustion
//! returns partial count so the scheduler can preempt the oldest running //! returns partial count so the scheduler can preempt the oldest running
//! request. //! request.
//! - **Destroy**: drop all RAII handles for the block. Emits a `Removed` KV //! - **Deref**: release one request-owned handle. For `PartialBlock` this drops
//! event to match the mocker's existing router protocol. //! the unique `MutableBlock` and returns it to the reset pool. For
//! - **Deref**: pop one `ImmutableBlock` clone; when the vec empties, the block //! `FullBlock` this pops one `ImmutableBlock` clone; when the vec empties,
//! transitions to kvbm-logical's inactive pool (RAII return). //! the block transitions to kvbm-logical's inactive pool (RAII return).
//! - **Promote**: PartialBlock (`MutableBlock`) → FullBlock (`ImmutableBlock`). //! - **Promote**: PartialBlock (`MutableBlock`) → FullBlock (`ImmutableBlock`).
//! Collapses onto an existing registered handle if the PLH / SequenceHash is //! Collapses onto an existing registered handle if the PLH / SequenceHash is
//! already present; otherwise stages + registers a new block. //! already present; otherwise stages + registers a new block.
...@@ -245,7 +245,7 @@ impl KvManager { ...@@ -245,7 +245,7 @@ impl KvManager {
/// be allocated (capacity exhausted); the scheduler uses this to trigger /// be allocated (capacity exhausted); the scheduler uses this to trigger
/// preemption. /// preemption.
/// ///
/// For `Destroy` / `Deref` / `Promote`, returns 1 on success and panics on /// For `Deref` / `Promote`, returns 1 on success and panics on
/// invalid state (consistent with the old `vllm_backend` semantics). /// invalid state (consistent with the old `vllm_backend` semantics).
pub fn process(&mut self, event: &MoveBlock) -> usize { pub fn process(&mut self, event: &MoveBlock) -> usize {
match event { match event {
...@@ -256,10 +256,6 @@ impl KvManager { ...@@ -256,10 +256,6 @@ impl KvManager {
token_ids.as_deref(), token_ids.as_deref(),
parent.as_ref(), parent.as_ref(),
), ),
MoveBlock::Destroy(hashes) => {
self.process_destroy(hashes);
1
}
MoveBlock::Deref(hashes) => { MoveBlock::Deref(hashes) => {
self.process_deref(hashes); self.process_deref(hashes);
1 1
...@@ -430,47 +426,13 @@ impl KvManager { ...@@ -430,47 +426,13 @@ impl KvManager {
allocated allocated
} }
/// Process a `MoveBlock::Destroy` instruction. fn process_deref(&mut self, blocks: &[UniqueBlock]) {
///
/// Contract difference vs. the legacy `vllm_backend`: in the old manual
/// backend `Destroy(FullBlock)` physically removed the block from the
/// cache surface. Here we only drop all active RAII handles for the
/// block; kvbm-logical keeps the `Registered` state alive and the block
/// transitions to the **inactive pool**, where it remains matchable via
/// `match_blocks(plh)` until an allocation forces its eviction. We still
/// emit a router `Removed` event for protocol parity, but callers should
/// be aware that a subsequent `Use` on the same PLH will reactivate the
/// same physical block from inactive (an `InactiveHit`) rather than
/// allocating fresh.
///
/// In the current scheduler flow `Destroy(FullBlock)` is not exercised
/// (preemption goes through `Deref` + `reset_with_signal`), so this
/// divergence is effectively dormant — see `test_destroy_full_block`.
fn process_destroy(&mut self, blocks: &[UniqueBlock]) {
let mut destroyed = Vec::<SequenceHash>::new();
for block in blocks { for block in blocks {
match block { match block {
UniqueBlock::PartialBlock(uuid) => { UniqueBlock::PartialBlock(uuid) => {
self.active_partial self.active_partial
.remove(uuid) .remove(uuid)
.expect("Destroy: partial block not in active pool"); .expect("Deref: partial block not in active pool");
}
UniqueBlock::FullBlock(seq_hash) => {
self.active_full
.remove(seq_hash)
.expect("Destroy: full block not in active pool");
destroyed.push(*seq_hash);
}
}
}
self.publish_kv_event(destroyed, &[], None, false, None);
}
fn process_deref(&mut self, blocks: &[UniqueBlock]) {
for block in blocks {
match block {
UniqueBlock::PartialBlock(_) => {
panic!("Deref on PartialBlock is not valid");
} }
UniqueBlock::FullBlock(seq_hash) => { UniqueBlock::FullBlock(seq_hash) => {
let vec = self let vec = self
...@@ -657,10 +619,43 @@ mod tests { ...@@ -657,10 +619,43 @@ mod tests {
) )
} }
fn make_mgr_capturing_with_backend(
capacity: usize,
block_size: usize,
backend: MockerEvictionBackend,
) -> (KvManager, Arc<CapturingSink>) {
let sink = Arc::new(CapturingSink::default());
let publishers = KvEventPublishers::new(Some(sink.clone() as _), None);
(
KvManager::new_with_eviction_backend(capacity, block_size, publishers, 0, backend),
sink,
)
}
fn plh(v: u64) -> PositionalLineageHash { fn plh(v: u64) -> PositionalLineageHash {
PositionalLineageHash::new(v, None, 0) PositionalLineageHash::new(v, None, 0)
} }
fn lineage_plh(id: u64) -> PositionalLineageHash {
match id {
0 => PositionalLineageHash::new(0, None, 0),
1 => PositionalLineageHash::new(1, Some(0), 1),
2 => PositionalLineageHash::new(2, Some(1), 2),
3 => PositionalLineageHash::new(3, Some(2), 3),
4 => PositionalLineageHash::new(4, Some(3), 4),
5 => PositionalLineageHash::new(5, Some(1), 2),
6 => PositionalLineageHash::new(6, Some(5), 3),
7 => PositionalLineageHash::new(7, Some(2), 3),
8 => PositionalLineageHash::new(8, Some(7), 4),
9 => PositionalLineageHash::new(9, Some(8), 5),
10 => PositionalLineageHash::new(10, None, 0),
11 => PositionalLineageHash::new(11, Some(10), 1),
12 => PositionalLineageHash::new(12, Some(11), 2),
13 => PositionalLineageHash::new(13, None, 0),
_ => plh(id),
}
}
fn use_full(mgr: &mut KvManager, seq_hash: u64, p: PositionalLineageHash) -> usize { fn use_full(mgr: &mut KvManager, seq_hash: u64, p: PositionalLineageHash) -> usize {
mgr.process(&MoveBlock::Use( mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(seq_hash)], vec![UniqueBlock::FullBlock(seq_hash)],
...@@ -685,8 +680,8 @@ mod tests { ...@@ -685,8 +680,8 @@ mod tests {
mgr.process(&MoveBlock::Deref(vec![UniqueBlock::FullBlock(seq_hash)])); mgr.process(&MoveBlock::Deref(vec![UniqueBlock::FullBlock(seq_hash)]));
} }
fn destroy_full(mgr: &mut KvManager, seq_hash: u64) { fn deref_partial(mgr: &mut KvManager, uuid: Uuid) {
mgr.process(&MoveBlock::Destroy(vec![UniqueBlock::FullBlock(seq_hash)])); mgr.process(&MoveBlock::Deref(vec![UniqueBlock::PartialBlock(uuid)]));
} }
#[test] #[test]
...@@ -776,42 +771,14 @@ mod tests { ...@@ -776,42 +771,14 @@ mod tests {
} }
#[test] #[test]
#[should_panic(expected = "Deref on PartialBlock is not valid")] fn test_deref_partial_returns_to_reset() {
fn test_deref_on_partial_panics() {
let mut mgr = make_mgr(10, 16); let mut mgr = make_mgr(10, 16);
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
use_partial(&mut mgr, uuid); use_partial(&mut mgr, uuid);
mgr.process(&MoveBlock::Deref(vec![UniqueBlock::PartialBlock(uuid)])); assert_eq!(mgr.active_partial.len(), 1);
} deref_partial(&mut mgr, uuid);
assert!(mgr.active_partial.is_empty());
#[test] assert_eq!(mgr.num_active_block_refs(), 0);
fn test_destroy_full_block() {
let (mut mgr, sink) = make_mgr_capturing(10, 16);
use_full(&mut mgr, 1, plh(100));
assert_eq!(mgr.num_active_blocks(), 1);
assert_eq!(mgr.num_inactive_blocks(), 0);
destroy_full(&mut mgr, 1);
// Active handles released — but kvbm-logical keeps the Registered
// state alive and the block lands in the *inactive* pool, still
// matchable via `match_blocks(plh)`.
assert_eq!(mgr.num_active_blocks(), 0);
assert_eq!(
mgr.num_inactive_blocks(),
1,
"Destroy must leave the block in kvbm-logical's inactive pool"
);
// Router-visible protocol parity: we still emit a `Removed` event
// for the caller.
let events = sink.events.lock().unwrap();
let removed_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Removed(_)))
.count();
assert_eq!(removed_count, 1, "Destroy must emit one Removed event");
} }
#[test] #[test]
...@@ -876,16 +843,10 @@ mod tests { ...@@ -876,16 +843,10 @@ mod tests {
#[test] #[test]
fn test_block_lifecycle_stringent() { fn test_block_lifecycle_stringent() {
// Batch helpers local to this test. Each FullBlock gets a unique PLH fn use_blocks(mgr: &mut KvManager, ids: &[u64]) -> usize {
// derived from its id so the PLH->SequenceHash mapping stays 1:1.
fn use_blocks(mgr: &mut KvManager, ids: &[u64]) {
let blocks: Vec<_> = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect(); let blocks: Vec<_> = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
let plhs: Vec<_> = ids.iter().map(|&id| plh(id)).collect(); let plhs: Vec<_> = ids.iter().map(|&id| lineage_plh(id)).collect();
mgr.process(&MoveBlock::Use(blocks, vec![], plhs, None, None)); mgr.process(&MoveBlock::Use(blocks, vec![], plhs, None, None))
}
fn destroy_blocks(mgr: &mut KvManager, ids: &[u64]) {
let blocks = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
mgr.process(&MoveBlock::Destroy(blocks));
} }
fn deref_blocks(mgr: &mut KvManager, ids: &[u64]) { fn deref_blocks(mgr: &mut KvManager, ids: &[u64]) {
let blocks = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect(); let blocks = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
...@@ -916,18 +877,16 @@ mod tests { ...@@ -916,18 +877,16 @@ mod tests {
// kvbm-logical AND absent from `active_full`. Also checks total count // kvbm-logical AND absent from `active_full`. Also checks total count
// matches so we catch stray inactive entries too. // matches so we catch stray inactive entries too.
// //
// NOTE: under kvbm-logical, `Destroy` removes the block from // NOTE: under kvbm-logical, once the last `ImmutableBlock` handle is
// `active_full` but the `ImmutableBlock` drop returns it to the // dropped, the block returns to the inactive pool and remains matchable
// inactive pool — so a destroyed block appears as inactive here until // until eviction.
// evicted. This differs from the old `HashCache` where `Destroy`
// removed the block entirely.
fn assert_inactive_blocks(mgr: &KvManager, expected_ids: &[u64]) { fn assert_inactive_blocks(mgr: &KvManager, expected_ids: &[u64]) {
assert_eq!( assert_eq!(
mgr.num_inactive_blocks(), mgr.num_inactive_blocks(),
expected_ids.len(), expected_ids.len(),
"inactive count mismatch; expected={expected_ids:?}" "inactive count mismatch; expected={expected_ids:?}"
); );
let plhs: Vec<_> = expected_ids.iter().map(|&id| plh(id)).collect(); let plhs: Vec<_> = expected_ids.iter().map(|&id| lineage_plh(id)).collect();
let presence = mgr let presence = mgr
.block_manager .block_manager
.block_registry() .block_registry()
...@@ -943,47 +902,123 @@ mod tests { ...@@ -943,47 +902,123 @@ mod tests {
); );
} }
} }
fn drain_events(sink: &Arc<CapturingSink>) -> Vec<KvCacheEvent> {
std::mem::take(&mut *sink.events.lock().unwrap())
}
fn assert_stored_event(
event: &KvCacheEvent,
expected_blocks: &[u64],
expected_parent: Option<u64>,
) {
let KvCacheEventData::Stored(data) = &event.data else {
panic!("expected Stored event, got {:?}", event.data);
};
let actual_blocks: Vec<u64> =
data.blocks.iter().map(|block| block.block_hash.0).collect();
assert_eq!(actual_blocks, expected_blocks, "stored blocks mismatch");
assert_eq!(
data.parent_hash.map(|hash| hash.0),
expected_parent,
"stored parent_hash mismatch"
);
}
fn assert_removed_event(event: &KvCacheEvent, expected_blocks: &[u64]) {
let KvCacheEventData::Removed(data) = &event.data else {
panic!("expected Removed event, got {:?}", event.data);
};
let actual_blocks: Vec<u64> = data.block_hashes.iter().map(|hash| hash.0).collect();
assert_eq!(actual_blocks, expected_blocks, "removed blocks mismatch");
}
let mut mgr = make_mgr(10, 16); let (mut mgr, sink) =
make_mgr_capturing_with_backend(10, 16, MockerEvictionBackend::Lineage);
// Use blocks 0..=4, then 0, 1, 5, 6 — 0 and 1 bump refcount to 2. // Use blocks 0..=4, then 0, 1, 5, 6 — 0 and 1 bump refcount to 2.
use_blocks(&mut mgr, &[0, 1, 2, 3, 4]); assert_eq!(use_blocks(&mut mgr, &[0, 1, 2, 3, 4]), 5);
use_blocks(&mut mgr, &[0, 1, 5, 6]); let events = drain_events(&sink);
assert_eq!(events.len(), 1, "expected one Stored event for [0..=4]");
assert_stored_event(&events[0], &[0, 1, 2, 3, 4], None);
assert_eq!(use_blocks(&mut mgr, &[0, 1, 5, 6]), 4);
let events = drain_events(&sink);
assert_eq!(events.len(), 1, "expected one Stored event for [5, 6]");
assert_stored_event(&events[0], &[5, 6], Some(1));
assert_active( assert_active(
&mgr, &mgr,
&[(0, 2), (1, 2), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)], &[(0, 2), (1, 2), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)],
); );
// Destroy block 4; deref 0, 1, 2, 3. 2 and 3 drop to inactive via // Leaf-to-root release order is what makes the resulting inactive set
// RAII; 4 is destroyed (removed from active_full + Removed emitted) // deterministic under the Lineage backend.
// but kvbm-logical still has it in the inactive pool. deref_blocks(&mut mgr, &[4, 3, 2, 1, 0]);
destroy_blocks(&mut mgr, &[4]); let events = drain_events(&sink);
deref_blocks(&mut mgr, &[0, 1, 2, 3]); assert!(events.is_empty(), "Deref should not emit KV events");
assert_active(&mgr, &[(0, 1), (1, 1), (5, 1), (6, 1)]); assert_active(&mgr, &[(0, 1), (1, 1), (5, 1), (6, 1)]);
assert_inactive_blocks(&mgr, &[2, 3, 4]); assert_inactive_blocks(&mgr, &[2, 3, 4]);
// Destroy block 6; deref 0, 1, 5. Active drains; inactive = {0..=6}. // Release the second branch leaf-to-root too. Active drains; inactive = {0..=6}.
destroy_blocks(&mut mgr, &[6]); deref_blocks(&mut mgr, &[6, 5, 1, 0]);
deref_blocks(&mut mgr, &[0, 1, 5]); let events = drain_events(&sink);
assert!(events.is_empty(), "Deref should not emit KV events");
assert_active(&mgr, &[]); assert_active(&mgr, &[]);
assert_inactive_blocks(&mgr, &[0, 1, 2, 3, 4, 5, 6]); assert_inactive_blocks(&mgr, &[0, 1, 2, 3, 4, 5, 6]);
// Re-use 0, 1, 2 (reactivates from inactive) + 7, 8, 9 (new, 3 free // Re-use 0, 1, 2 (reactivates from inactive) + 7, 8, 9 (new, 3 free
// slots). No eviction needed — inactive shrinks to {3, 4, 5, 6}. // slots). No eviction needed — inactive shrinks to {3, 4, 5, 6}.
use_blocks(&mut mgr, &[0, 1, 2, 7, 8, 9]); assert_eq!(use_blocks(&mut mgr, &[0, 1, 2, 7, 8, 9]), 6);
let events = drain_events(&sink);
assert_eq!(events.len(), 1, "expected one Stored event for [7, 8, 9]");
assert_stored_event(&events[0], &[7, 8, 9], Some(2));
assert_active(&mgr, &[(0, 1), (1, 1), (2, 1), (7, 1), (8, 1), (9, 1)]); assert_active(&mgr, &[(0, 1), (1, 1), (2, 1), (7, 1), (8, 1), (9, 1)]);
assert_inactive_blocks(&mgr, &[3, 4, 5, 6]); assert_inactive_blocks(&mgr, &[3, 4, 5, 6]);
// Allocate through capacity: 10, 11, 12 force eviction of 3 inactive // Capacity pressure now forces exact leaf-first evictions: 4, then 3,
// entries. Exact survivor depends on eviction order (Lineage/LRU), so // then 6. The sole inactive survivor is 5.
// only assert count. assert_eq!(use_blocks(&mut mgr, &[10, 11, 12]), 3);
use_blocks(&mut mgr, &[10, 11, 12]); let events = drain_events(&sink);
assert_eq!(mgr.num_active_blocks(), 9); assert_eq!(
assert_eq!(mgr.num_inactive_blocks(), 1); events.len(),
2,
"expected Stored + Removed for [10, 11, 12]"
);
assert_stored_event(&events[0], &[10, 11, 12], None);
assert_removed_event(&events[1], &[4, 3, 6]);
assert_active(
&mgr,
&[
(0, 1),
(1, 1),
(2, 1),
(7, 1),
(8, 1),
(9, 1),
(10, 1),
(11, 1),
(12, 1),
],
);
assert_inactive_blocks(&mgr, &[5]);
// One more block keeps us at full capacity without panicking. assert_eq!(use_blocks(&mut mgr, &[13]), 1);
use_blocks(&mut mgr, &[13]); let events = drain_events(&sink);
assert_eq!(mgr.num_active_blocks(), 10); assert_eq!(events.len(), 2, "expected Stored + Removed for [13]");
assert_stored_event(&events[0], &[13], None);
assert_removed_event(&events[1], &[5]);
assert_active(
&mgr,
&[
(0, 1),
(1, 1),
(2, 1),
(7, 1),
(8, 1),
(9, 1),
(10, 1),
(11, 1),
(12, 1),
(13, 1),
],
);
assert_eq!(mgr.num_inactive_blocks(), 0); assert_eq!(mgr.num_inactive_blocks(), 0);
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment