Unverified Commit 4b3cd459 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat(kv-router): add start positions to flat KV events (#8426)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent e8ecf6ff
...@@ -632,6 +632,7 @@ impl SequenceData { ...@@ -632,6 +632,7 @@ impl SequenceData {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: self blocks: self
.local_hashes .local_hashes
.iter() .iter()
......
...@@ -286,6 +286,7 @@ fn kv_event_create_stored_from_parts( ...@@ -286,6 +286,7 @@ fn kv_event_create_stored_from_parts(
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
blocks, blocks,
parent_hash: kv_params.parent_hash.map(ExternalSequenceBlockHash), parent_hash: kv_params.parent_hash.map(ExternalSequenceBlockHash),
start_position: None,
}), }),
event_id: kv_params.event_id, event_id: kv_params.event_id,
dp_rank: 0, dp_rank: 0,
......
...@@ -306,6 +306,7 @@ impl KvEventPublisher { ...@@ -306,6 +306,7 @@ impl KvEventPublisher {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: parent_hash.map(ExternalSequenceBlockHash::from), parent_hash: parent_hash.map(ExternalSequenceBlockHash::from),
start_position: None,
blocks: create_stored_blocks( blocks: create_stored_blocks(
kv_block_size, kv_block_size,
&token_ids, &token_ids,
......
...@@ -255,7 +255,10 @@ worker_blocks (DashMap<Worker, RwLock<HashMap>>): ...@@ -255,7 +255,10 @@ worker_blocks (DashMap<Worker, RwLock<HashMap>>):
### How Operations Work ### How Operations Work
**store_blocks(worker, parent_hash, blocks)**: **store_blocks(worker, parent_hash, blocks)**:
1. Find starting position: `pos = worker_blocks[worker][parent_hash].position + 1` 1. Find starting position:
- `start_position` if the batch provides one
- otherwise `worker_blocks[worker][parent_hash].position + 1`
- otherwise `0`
2. For each block at position `i`: 2. For each block at position `i`:
- Insert into `index[(pos+i, local_hash)]` → add worker to SeqEntry - Insert into `index[(pos+i, local_hash)]` → add worker to SeqEntry
- Insert into `worker_blocks[worker][seq_hash] = (pos+i, local_hash)` - Insert into `worker_blocks[worker][seq_hash] = (pos+i, local_hash)`
...@@ -291,9 +294,9 @@ Query: [b0, b1, b2, ..., b63, b64, ..., b127, ...] ...@@ -291,9 +294,9 @@ Query: [b0, b1, b2, ..., b63, b64, ..., b127, ...]
**dump_events()**: **dump_events()**:
1. Iterate `worker_blocks`, collecting all blocks per worker 1. Iterate `worker_blocks`, collecting all blocks per worker
2. Sort each worker's blocks by position (parents before children) 2. Sort each worker's blocks by position
3. Emit one single-block `RouterEvent::Stored` per block, synthesizing 3. Emit one single-block `RouterEvent::Stored` per block with
`parent_hash` from any seq_hash at the prior position `start_position = Some(position)` and `parent_hash = None`
4. Events can be replayed into a fresh `PositionalIndexer` to reconstruct 4. Events can be replayed into a fresh `PositionalIndexer` to reconstruct
the same index state the same index state
......
...@@ -608,6 +608,7 @@ impl ConcurrentRadixTree { ...@@ -608,6 +608,7 @@ impl ConcurrentRadixTree {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash, block_hash,
mm_extra_info: None, mm_extra_info: None,
......
...@@ -1191,6 +1191,7 @@ impl ConcurrentRadixTreeCompressed { ...@@ -1191,6 +1191,7 @@ impl ConcurrentRadixTreeCompressed {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position: None,
blocks: full_blocks.clone(), blocks: full_blocks.clone(),
}), }),
dp_rank: worker.dp_rank, dp_rank: worker.dp_rank,
...@@ -1205,6 +1206,7 @@ impl ConcurrentRadixTreeCompressed { ...@@ -1205,6 +1206,7 @@ impl ConcurrentRadixTreeCompressed {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position: None,
blocks: full_blocks[..k].to_vec(), blocks: full_blocks[..k].to_vec(),
}), }),
dp_rank: worker.dp_rank, dp_rank: worker.dp_rank,
......
...@@ -255,6 +255,7 @@ impl KvIndexer { ...@@ -255,6 +255,7 @@ impl KvIndexer {
let hashes = routing_req.local_hashes.iter().zip(routing_req.sequence_hashes.iter()); let hashes = routing_req.local_hashes.iter().zip(routing_req.sequence_hashes.iter());
let stored_event = KvCacheEventData::Stored(KvCacheStoreData { let stored_event = KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: hashes.map(|(local_hash, sequence_hash)| KvCacheStoredBlockData { blocks: hashes.map(|(local_hash, sequence_hash)| KvCacheStoredBlockData {
tokens_hash: *local_hash, tokens_hash: *local_hash,
block_hash: ExternalSequenceBlockHash(*sequence_hash), block_hash: ExternalSequenceBlockHash(*sequence_hash),
......
...@@ -237,30 +237,37 @@ impl PositionalIndexer { ...@@ -237,30 +237,37 @@ impl PositionalIndexer {
store_data: KvCacheStoreData, store_data: KvCacheStoreData,
event_id: u64, event_id: u64,
) -> Result<(), KvCacheEventError> { ) -> Result<(), KvCacheEventError> {
let KvCacheStoreData {
parent_hash,
start_position,
blocks,
} = store_data;
let worker_map = worker_blocks.entry(worker).or_default(); let worker_map = worker_blocks.entry(worker).or_default();
// Determine starting position based on parent_hash let start_pos = match start_position {
let start_pos = match store_data.parent_hash { Some(start_position) => start_position as usize,
Some(parent_hash) => { None => match parent_hash {
let Some(entry) = worker_map.get(&parent_hash) else { Some(parent_hash) => {
tracing::warn!( let Some(entry) = worker_map.get(&parent_hash) else {
worker_id = worker.worker_id.to_string(), tracing::warn!(
dp_rank = worker.dp_rank, worker_id = worker.worker_id.to_string(),
event_id, dp_rank = worker.dp_rank,
parent_hash = ?parent_hash, event_id,
); parent_hash = ?parent_hash,
return Err(KvCacheEventError::ParentBlockNotFound); );
}; return Err(KvCacheEventError::ParentBlockNotFound);
};
entry.0 + 1 // parent position + 1
} entry.0 + 1 // parent position + 1
None => 0, // Start from position 0 }
None => 0, // Start from position 0
},
}; };
let worker_blocks_entry = worker_blocks.entry(worker).or_default(); let worker_blocks_entry = worker_blocks.entry(worker).or_default();
let num_stored_blocks = store_data.blocks.len(); let num_stored_blocks = blocks.len();
for (i, block_data) in store_data.blocks.into_iter().enumerate() { for (i, block_data) in blocks.into_iter().enumerate() {
let position = start_pos + i; let position = start_pos + i;
let local_hash = block_data.tokens_hash; let local_hash = block_data.tokens_hash;
let seq_hash = block_data.block_hash; let seq_hash = block_data.block_hash;
...@@ -411,47 +418,22 @@ impl PositionalIndexer { ...@@ -411,47 +418,22 @@ impl PositionalIndexer {
let mut event_id = 0u64; let mut event_id = 0u64;
for (worker, worker_map) in worker_blocks.iter() { for (worker, worker_map) in worker_blocks.iter() {
// Collect (position, local_hash, seq_hash) and sort by position // Collect (position, local_hash, seq_hash) and sort by position.
// so parents are emitted before children during replay.
let mut blocks: Vec<_> = worker_map let mut blocks: Vec<_> = worker_map
.iter() .iter()
.map(|(seq_hash, (pos, local_hash))| (*pos, *local_hash, *seq_hash)) .map(|(seq_hash, (pos, local_hash))| (*pos, *local_hash, *seq_hash))
.collect(); .collect();
blocks.sort_unstable_by_key(|(pos, _, _)| *pos); blocks.sort_unstable_by_key(|(pos, _, _)| *pos);
// Track one valid seq_hash per position for parent_hash synthesis.
// Note: The synthesized parent_hash doesn't need to be the true logical
// parent — during replay it's only used to derive `start_pos = parent.position + 1`,
// so any seq_hash at the previous position is sufficient. The PositionalIndexer
// is position-based, not tree-topology-based.
let mut last_at_position: FxHashMap<usize, ExternalSequenceBlockHash> =
FxHashMap::default();
for (pos, local_hash, seq_hash) in blocks { for (pos, local_hash, seq_hash) in blocks {
let parent_hash = if pos == 0 {
None
} else {
match last_at_position.get(&(pos - 1)) {
Some(&parent) => Some(parent),
None => {
tracing::warn!(
worker_id = worker.worker_id.to_string(),
dp_rank = worker.dp_rank,
position = pos,
"Orphaned block at position with no parent; skipping in dump"
);
continue;
}
}
};
events.push(RouterEvent { events.push(RouterEvent {
worker_id: worker.worker_id, worker_id: worker.worker_id,
storage_tier: crate::protocols::StorageTier::Device, storage_tier: crate::protocols::StorageTier::Device,
event: KvCacheEvent { event: KvCacheEvent {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash: None,
start_position: Some(pos as u32),
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: seq_hash, block_hash: seq_hash,
tokens_hash: local_hash, tokens_hash: local_hash,
...@@ -462,7 +444,6 @@ impl PositionalIndexer { ...@@ -462,7 +444,6 @@ impl PositionalIndexer {
}, },
}); });
event_id += 1; event_id += 1;
last_at_position.insert(pos, seq_hash);
} }
} }
......
...@@ -542,6 +542,7 @@ impl RadixTree { ...@@ -542,6 +542,7 @@ impl RadixTree {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash, block_hash,
mm_extra_info: None, mm_extra_info: None,
......
...@@ -32,7 +32,7 @@ fn make_store_event_with_dp_rank( ...@@ -32,7 +32,7 @@ fn make_store_event_with_dp_rank(
local_hashes: &[u64], local_hashes: &[u64],
dp_rank: u32, dp_rank: u32,
) -> RouterEvent { ) -> RouterEvent {
make_store_event_full(worker_id, local_hashes, dp_rank, None) make_store_event_full(worker_id, local_hashes, dp_rank, None, None)
} }
/// Create a store event with parent hash for continuation sequences. /// Create a store event with parent hash for continuation sequences.
...@@ -72,17 +72,27 @@ fn make_store_event_with_parent( ...@@ -72,17 +72,27 @@ fn make_store_event_with_parent(
0, 0,
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position: None,
blocks: stored_blocks_with_sequence_hashes(&new_block_hashes, new_seq_hashes), blocks: stored_blocks_with_sequence_hashes(&new_block_hashes, new_seq_hashes),
}), }),
) )
} }
fn make_store_event_with_start_position(
worker_id: u64,
local_hashes: &[u64],
start_position: u32,
) -> RouterEvent {
make_store_event_full(worker_id, local_hashes, 0, None, Some(start_position))
}
/// Create a store event with all options. /// Create a store event with all options.
fn make_store_event_full( fn make_store_event_full(
worker_id: u64, worker_id: u64,
local_hashes: &[u64], local_hashes: &[u64],
dp_rank: u32, dp_rank: u32,
parent_hash: Option<ExternalSequenceBlockHash>, parent_hash: Option<ExternalSequenceBlockHash>,
start_position: Option<u32>,
) -> RouterEvent { ) -> RouterEvent {
let local_block_hashes: Vec<LocalBlockHash> = let local_block_hashes: Vec<LocalBlockHash> =
local_hashes.iter().map(|&h| LocalBlockHash(h)).collect(); local_hashes.iter().map(|&h| LocalBlockHash(h)).collect();
...@@ -94,6 +104,7 @@ fn make_store_event_full( ...@@ -94,6 +104,7 @@ fn make_store_event_full(
dp_rank, dp_rank,
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position,
blocks: stored_blocks_with_sequence_hashes(&local_block_hashes, &seq_hashes), blocks: stored_blocks_with_sequence_hashes(&local_block_hashes, &seq_hashes),
}), }),
) )
...@@ -781,6 +792,46 @@ mod interface_tests { ...@@ -781,6 +792,46 @@ mod interface_tests {
assert_score(index.as_ref(), &[1, 2, 3], WorkerWithDpRank::new(0, 0), 3).await; assert_score(index.as_ref(), &[1, 2, 3], WorkerWithDpRank::new(0, 0), 3).await;
} }
#[tokio::test]
async fn test_flat_dump_replay_preserves_start_positions() {
let index = make_indexer("flat");
index
.apply_event(make_store_event_with_start_position(0, &[11, 12], 10))
.await;
flush_and_settle(index.as_ref()).await;
let dumped = index.dump_events().await.unwrap();
let stored = dumped
.iter()
.filter_map(|event| match &event.event.data {
KvCacheEventData::Stored(data) => Some(data),
_ => None,
})
.collect::<Vec<_>>();
assert_eq!(stored.len(), 2);
assert_eq!(
stored
.iter()
.map(|data| data.start_position)
.collect::<Vec<_>>(),
vec![Some(10), Some(11)]
);
assert!(stored.iter().all(|data| data.parent_hash.is_none()));
let replay = make_indexer("flat");
for event in dumped {
replay.apply_event(event).await;
}
flush_and_settle(replay.as_ref()).await;
assert_eq!(
snapshot_tree(index.as_ref()).await,
snapshot_tree(replay.as_ref()).await
);
}
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_multiple_dp_ranks(variant: &str) { async fn test_multiple_dp_ranks(variant: &str) {
...@@ -1081,6 +1132,7 @@ mod lora_tests { ...@@ -1081,6 +1132,7 @@ mod lora_tests {
0, 0,
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: stored_blocks_with_sequence_hashes(&base_hashes, &base_seq), blocks: stored_blocks_with_sequence_hashes(&base_hashes, &base_seq),
}), }),
); );
...@@ -1093,6 +1145,7 @@ mod lora_tests { ...@@ -1093,6 +1145,7 @@ mod lora_tests {
0, 0,
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: stored_blocks_with_sequence_hashes(&lora_hashes, &lora_seq), blocks: stored_blocks_with_sequence_hashes(&lora_hashes, &lora_seq),
}), }),
); );
...@@ -1177,6 +1230,7 @@ mod lora_tests { ...@@ -1177,6 +1230,7 @@ mod lora_tests {
0, 0,
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: stored_blocks_with_sequence_hashes(&base_local, &base_seq), blocks: stored_blocks_with_sequence_hashes(&base_local, &base_seq),
}), }),
)) ))
...@@ -1191,6 +1245,7 @@ mod lora_tests { ...@@ -1191,6 +1245,7 @@ mod lora_tests {
0, 0,
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: stored_blocks_with_sequence_hashes(&lora_local, &lora_seq), blocks: stored_blocks_with_sequence_hashes(&lora_local, &lora_seq),
}), }),
)) ))
...@@ -1262,6 +1317,7 @@ mod lora_tests { ...@@ -1262,6 +1317,7 @@ mod lora_tests {
0, 0,
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: stored_blocks_with_sequence_hashes(&hashes_a, &seq_a), blocks: stored_blocks_with_sequence_hashes(&hashes_a, &seq_a),
}), }),
)) ))
...@@ -1275,6 +1331,7 @@ mod lora_tests { ...@@ -1275,6 +1331,7 @@ mod lora_tests {
0, 0,
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: stored_blocks_with_sequence_hashes(&hashes_b, &seq_b), blocks: stored_blocks_with_sequence_hashes(&hashes_b, &seq_b),
}), }),
)) ))
...@@ -2126,6 +2183,7 @@ mod local_indexer_tests { ...@@ -2126,6 +2183,7 @@ mod local_indexer_tests {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(block_hash), block_hash: ExternalSequenceBlockHash(block_hash),
tokens_hash: LocalBlockHash(block_hash), tokens_hash: LocalBlockHash(block_hash),
...@@ -2234,6 +2292,7 @@ mod local_indexer_tests { ...@@ -2234,6 +2292,7 @@ mod local_indexer_tests {
event_id: id, event_id: id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(id * 100), block_hash: ExternalSequenceBlockHash(id * 100),
tokens_hash: LocalBlockHash(id * 200), tokens_hash: LocalBlockHash(id * 200),
...@@ -2335,6 +2394,7 @@ mod local_indexer_tests { ...@@ -2335,6 +2394,7 @@ mod local_indexer_tests {
event_id: id, event_id: id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(id * 100), block_hash: ExternalSequenceBlockHash(id * 100),
tokens_hash: LocalBlockHash(id * 200), tokens_hash: LocalBlockHash(id * 200),
...@@ -2423,6 +2483,7 @@ mod local_indexer_tests { ...@@ -2423,6 +2483,7 @@ mod local_indexer_tests {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), block_hash: ExternalSequenceBlockHash(100),
tokens_hash: LocalBlockHash(200), tokens_hash: LocalBlockHash(200),
...@@ -2479,6 +2540,7 @@ mod local_indexer_tests { ...@@ -2479,6 +2540,7 @@ mod local_indexer_tests {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), block_hash: ExternalSequenceBlockHash(100),
tokens_hash: LocalBlockHash(200), tokens_hash: LocalBlockHash(200),
......
...@@ -505,6 +505,9 @@ pub enum KvCacheEventData { ...@@ -505,6 +505,9 @@ pub enum KvCacheEventData {
pub struct KvCacheStoreData { pub struct KvCacheStoreData {
/// The optional hash of the parent block. /// The optional hash of the parent block.
pub parent_hash: Option<ExternalSequenceBlockHash>, pub parent_hash: Option<ExternalSequenceBlockHash>,
/// Absolute position of the first block in this batch for positional replay.
#[serde(default)]
pub start_position: Option<u32>,
/// A list of stored blocked data. /// A list of stored blocked data.
pub blocks: Vec<KvCacheStoredBlockData>, pub blocks: Vec<KvCacheStoredBlockData>,
} }
...@@ -932,6 +935,7 @@ mod tests { ...@@ -932,6 +935,7 @@ mod tests {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(0), block_hash: ExternalSequenceBlockHash(0),
mm_extra_info: None, mm_extra_info: None,
...@@ -1135,6 +1139,7 @@ mod tests { ...@@ -1135,6 +1139,7 @@ mod tests {
fn test_kv_cache_events_serialization() { fn test_kv_cache_events_serialization() {
let event_data = KvCacheEventData::Stored(KvCacheStoreData { let event_data = KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(1)), parent_hash: Some(ExternalSequenceBlockHash(1)),
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(2), block_hash: ExternalSequenceBlockHash(2),
tokens_hash: LocalBlockHash(3), tokens_hash: LocalBlockHash(3),
......
...@@ -72,9 +72,18 @@ pub fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> { ...@@ -72,9 +72,18 @@ pub fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> {
pub fn add_blocks( pub fn add_blocks(
hashes: Vec<u64>, hashes: Vec<u64>,
parent_hash: Option<ExternalSequenceBlockHash>, parent_hash: Option<ExternalSequenceBlockHash>,
) -> KvCacheEventData {
add_blocks_with_start_position(hashes, parent_hash, None)
}
pub fn add_blocks_with_start_position(
hashes: Vec<u64>,
parent_hash: Option<ExternalSequenceBlockHash>,
start_position: Option<u32>,
) -> KvCacheEventData { ) -> KvCacheEventData {
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position,
blocks: make_blocks(hashes), blocks: make_blocks(hashes),
}) })
} }
...@@ -88,6 +97,21 @@ pub fn create_store_event( ...@@ -88,6 +97,21 @@ pub fn create_store_event(
router_event(worker_id, event_id, 0, add_blocks(hashes, parent)) router_event(worker_id, event_id, 0, add_blocks(hashes, parent))
} }
pub fn create_store_event_with_start_position(
worker_id: WorkerId,
event_id: u64,
hashes: Vec<u64>,
parent: Option<ExternalSequenceBlockHash>,
start_position: Option<u32>,
) -> RouterEvent {
router_event(
worker_id,
event_id,
0,
add_blocks_with_start_position(hashes, parent, start_position),
)
}
pub fn create_remove_event(worker_id: WorkerId, event_id: u64, hashes: Vec<u64>) -> RouterEvent { pub fn create_remove_event(worker_id: WorkerId, event_id: u64, hashes: Vec<u64>) -> RouterEvent {
remove_event( remove_event(
worker_id, worker_id,
......
...@@ -464,6 +464,7 @@ pub fn convert_event( ...@@ -464,6 +464,7 @@ pub fn convert_event(
parent_hash: parent_block_hash parent_hash: parent_block_hash
.map(BlockHashValue::into_u64) .map(BlockHashValue::into_u64)
.map(ExternalSequenceBlockHash::from), .map(ExternalSequenceBlockHash::from),
start_position: None,
blocks: create_stored_blocks( blocks: create_stored_blocks(
kv_block_size, kv_block_size,
&token_ids, &token_ids,
......
...@@ -557,6 +557,7 @@ fn sequence_to_router_event(sequence: &SequenceData, event_id: u64) -> RouterEve ...@@ -557,6 +557,7 @@ fn sequence_to_router_event(sequence: &SequenceData, event_id: u64) -> RouterEve
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: sequence blocks: sequence
.local_hashes .local_hashes
.iter() .iter()
......
...@@ -947,6 +947,7 @@ mod tests { ...@@ -947,6 +947,7 @@ mod tests {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(event_id), block_hash: ExternalSequenceBlockHash(event_id),
tokens_hash: LocalBlockHash(event_id), tokens_hash: LocalBlockHash(event_id),
......
...@@ -606,6 +606,7 @@ mod tests_startup_helpers { ...@@ -606,6 +606,7 @@ mod tests_startup_helpers {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![ blocks: vec![
KvCacheStoredBlockData { KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), block_hash: ExternalSequenceBlockHash(100),
...@@ -698,6 +699,7 @@ mod tests_startup_helpers { ...@@ -698,6 +699,7 @@ mod tests_startup_helpers {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), block_hash: ExternalSequenceBlockHash(100),
tokens_hash: LocalBlockHash(200), tokens_hash: LocalBlockHash(200),
...@@ -780,6 +782,7 @@ mod tests_startup_helpers { ...@@ -780,6 +782,7 @@ mod tests_startup_helpers {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), block_hash: ExternalSequenceBlockHash(100),
tokens_hash: LocalBlockHash(200), tokens_hash: LocalBlockHash(200),
...@@ -968,6 +971,7 @@ mod tests_startup_helpers { ...@@ -968,6 +971,7 @@ mod tests_startup_helpers {
let KvCacheEventData::Stored(KvCacheStoreData { let KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position,
blocks, blocks,
}) = event.data }) = event.data
else { else {
...@@ -975,6 +979,7 @@ mod tests_startup_helpers { ...@@ -975,6 +979,7 @@ mod tests_startup_helpers {
}; };
assert!(parent_hash.is_none()); assert!(parent_hash.is_none());
assert!(start_position.is_none());
assert_eq!(blocks.len(), 1); assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0].block_hash.0, 42); assert_eq!(blocks[0].block_hash.0, 42);
...@@ -1094,6 +1099,7 @@ mod tests_startup_helpers { ...@@ -1094,6 +1099,7 @@ mod tests_startup_helpers {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![ blocks: vec![
KvCacheStoredBlockData { KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), block_hash: ExternalSequenceBlockHash(100),
...@@ -1166,6 +1172,7 @@ mod tests_startup_helpers { ...@@ -1166,6 +1172,7 @@ mod tests_startup_helpers {
event_id: 2, event_id: 2,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![ blocks: vec![
KvCacheStoredBlockData { KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), // Shared prefix block_hash: ExternalSequenceBlockHash(100), // Shared prefix
...@@ -1280,6 +1287,7 @@ mod test_event_dedup_filter { ...@@ -1280,6 +1287,7 @@ mod test_event_dedup_filter {
fn store_data(hashes: &[u64]) -> KvCacheStoreData { fn store_data(hashes: &[u64]) -> KvCacheStoreData {
KvCacheStoreData { KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: hashes blocks: hashes
.iter() .iter()
.map(|&h| KvCacheStoredBlockData { .map(|&h| KvCacheStoredBlockData {
...@@ -1556,6 +1564,7 @@ mod batching_state_tests { ...@@ -1556,6 +1564,7 @@ mod batching_state_tests {
state.pending_stored = Some(KvCacheStoreData { state.pending_stored = Some(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: None,
blocks: vec![], blocks: vec![],
}); });
assert!( assert!(
...@@ -1659,6 +1668,7 @@ mod batching_state_tests { ...@@ -1659,6 +1668,7 @@ mod batching_state_tests {
}; };
let first = KvCacheStoreData { let first = KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(0)), parent_hash: Some(ExternalSequenceBlockHash(0)),
start_position: None,
blocks: vec![block1], blocks: vec![block1],
}; };
...@@ -1865,6 +1875,7 @@ mod event_processor_tests { ...@@ -1865,6 +1875,7 @@ mod event_processor_tests {
event_id: i as u64, event_id: i as u64,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(i as u64), block_hash: ExternalSequenceBlockHash(i as u64),
tokens_hash: LocalBlockHash(i as u64 * 100), tokens_hash: LocalBlockHash(i as u64 * 100),
...@@ -1957,6 +1968,7 @@ mod event_processor_tests { ...@@ -1957,6 +1968,7 @@ mod event_processor_tests {
event_id: i as u64, event_id: i as u64,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash((i + 1) as u64 * 100)), parent_hash: Some(ExternalSequenceBlockHash((i + 1) as u64 * 100)),
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(i as u64), block_hash: ExternalSequenceBlockHash(i as u64),
tokens_hash: LocalBlockHash(i as u64 * 100), tokens_hash: LocalBlockHash(i as u64 * 100),
...@@ -2023,6 +2035,7 @@ mod event_processor_tests { ...@@ -2023,6 +2035,7 @@ mod event_processor_tests {
event_id: 0, event_id: 0,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, parent_hash: None,
start_position: Some(10),
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(1), block_hash: ExternalSequenceBlockHash(1),
tokens_hash: LocalBlockHash(100), tokens_hash: LocalBlockHash(100),
...@@ -2038,6 +2051,7 @@ mod event_processor_tests { ...@@ -2038,6 +2051,7 @@ mod event_processor_tests {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(1)), parent_hash: Some(ExternalSequenceBlockHash(1)),
start_position: Some(11_111),
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(2), block_hash: ExternalSequenceBlockHash(2),
tokens_hash: LocalBlockHash(200), tokens_hash: LocalBlockHash(200),
...@@ -2053,6 +2067,7 @@ mod event_processor_tests { ...@@ -2053,6 +2067,7 @@ mod event_processor_tests {
event_id: 2, event_id: 2,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(1)), parent_hash: Some(ExternalSequenceBlockHash(1)),
start_position: Some(20),
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(3), block_hash: ExternalSequenceBlockHash(3),
tokens_hash: LocalBlockHash(300), tokens_hash: LocalBlockHash(300),
...@@ -2086,6 +2101,11 @@ mod event_processor_tests { ...@@ -2086,6 +2101,11 @@ mod event_processor_tests {
data.parent_hash, None, data.parent_hash, None,
"First batch should preserve the original root parent" "First batch should preserve the original root parent"
); );
assert_eq!(
data.start_position,
Some(10),
"First batch should preserve the original start position"
);
} else { } else {
panic!("Expected first event to be Stored"); panic!("Expected first event to be Stored");
} }
...@@ -2101,6 +2121,11 @@ mod event_processor_tests { ...@@ -2101,6 +2121,11 @@ mod event_processor_tests {
Some(ExternalSequenceBlockHash(1)), Some(ExternalSequenceBlockHash(1)),
"Second batch should preserve the reused parent hash" "Second batch should preserve the reused parent hash"
); );
assert_eq!(
data.start_position,
Some(20),
"Second batch should keep the new root's start position"
);
} else { } else {
panic!("Expected second event to be Stored"); panic!("Expected second event to be Stored");
} }
...@@ -2235,6 +2260,7 @@ mod event_processor_tests { ...@@ -2235,6 +2260,7 @@ mod event_processor_tests {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(0)), parent_hash: Some(ExternalSequenceBlockHash(0)),
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(1), block_hash: ExternalSequenceBlockHash(1),
tokens_hash: LocalBlockHash(100), tokens_hash: LocalBlockHash(100),
...@@ -2537,6 +2563,7 @@ mod event_processor_tests { ...@@ -2537,6 +2563,7 @@ mod event_processor_tests {
event_id: 100, event_id: 100,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(0)), parent_hash: Some(ExternalSequenceBlockHash(0)),
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(1), block_hash: ExternalSequenceBlockHash(1),
tokens_hash: LocalBlockHash(100), tokens_hash: LocalBlockHash(100),
...@@ -2552,6 +2579,7 @@ mod event_processor_tests { ...@@ -2552,6 +2579,7 @@ mod event_processor_tests {
event_id: 101, event_id: 101,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(1)), parent_hash: Some(ExternalSequenceBlockHash(1)),
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(2), block_hash: ExternalSequenceBlockHash(2),
tokens_hash: LocalBlockHash(200), tokens_hash: LocalBlockHash(200),
...@@ -2569,6 +2597,7 @@ mod event_processor_tests { ...@@ -2569,6 +2597,7 @@ mod event_processor_tests {
event_id: 200, event_id: 200,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(0)), parent_hash: Some(ExternalSequenceBlockHash(0)),
start_position: None,
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), block_hash: ExternalSequenceBlockHash(100),
tokens_hash: LocalBlockHash(1000), tokens_hash: LocalBlockHash(1000),
...@@ -2654,6 +2683,7 @@ mod event_processor_tests { ...@@ -2654,6 +2683,7 @@ mod event_processor_tests {
event_id: 0, event_id: 0,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None, // Root block with no parent parent_hash: None, // Root block with no parent
start_position: Some(10),
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(1), block_hash: ExternalSequenceBlockHash(1),
tokens_hash: LocalBlockHash(100), tokens_hash: LocalBlockHash(100),
...@@ -2670,6 +2700,7 @@ mod event_processor_tests { ...@@ -2670,6 +2700,7 @@ mod event_processor_tests {
event_id: 1, event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(1)), // Points to previous block parent_hash: Some(ExternalSequenceBlockHash(1)), // Points to previous block
start_position: Some(999),
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(2), block_hash: ExternalSequenceBlockHash(2),
tokens_hash: LocalBlockHash(200), tokens_hash: LocalBlockHash(200),
...@@ -2686,6 +2717,7 @@ mod event_processor_tests { ...@@ -2686,6 +2717,7 @@ mod event_processor_tests {
event_id: 2, event_id: 2,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: Some(ExternalSequenceBlockHash(2)), parent_hash: Some(ExternalSequenceBlockHash(2)),
start_position: Some(1_234),
blocks: vec![KvCacheStoredBlockData { blocks: vec![KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(3), block_hash: ExternalSequenceBlockHash(3),
tokens_hash: LocalBlockHash(300), tokens_hash: LocalBlockHash(300),
...@@ -2716,6 +2748,11 @@ mod event_processor_tests { ...@@ -2716,6 +2748,11 @@ mod event_processor_tests {
data.parent_hash, None, data.parent_hash, None,
"Batch parent_hash should remain None (from first event), NOT overwritten by subsequent events" "Batch parent_hash should remain None (from first event), NOT overwritten by subsequent events"
); );
assert_eq!(
data.start_position,
Some(10),
"Batch start_position should remain anchored to the first event"
);
} else { } else {
panic!("Expected Stored event"); panic!("Expected Stored event");
} }
......
...@@ -310,6 +310,7 @@ impl SglangKvManager { ...@@ -310,6 +310,7 @@ impl SglangKvManager {
event_id: self.next_event_id, event_id: self.next_event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash, parent_hash,
start_position: None,
blocks, blocks,
}), }),
dp_rank: self.dp_rank, dp_rank: self.dp_rank,
......
...@@ -116,6 +116,7 @@ impl KvManager { ...@@ -116,6 +116,7 @@ impl KvManager {
KvCacheEventData::Stored(KvCacheStoreData { KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: parent_hash.map(ExternalSequenceBlockHash), parent_hash: parent_hash.map(ExternalSequenceBlockHash),
start_position: None,
blocks: full_blocks blocks: full_blocks
.into_iter() .into_iter()
.zip(local_hashes_slice.iter()) .zip(local_hashes_slice.iter())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment