tests.rs 105 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
use std::time::{Duration, Instant};

use rstest::rstest;
use rstest_reuse::{self, *};
use tokio::time;
use tokio_util::sync::CancellationToken;

use super::concurrent_radix_tree::ConcurrentRadixTree;
13
use super::concurrent_radix_tree_compressed::ConcurrentRadixTreeCompressed;
14
15
use super::positional::PositionalIndexer;
use super::*;
16
use crate::indexer::pruning::PruneConfig;
17
use crate::protocols::*;
18
use crate::test_utils::{remove_event, router_event, stored_blocks_with_sequence_hashes};
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

// ============================================================================
// Helper functions
// ============================================================================

/// Build a `Stored` event for `worker_id` on dp_rank 0.
///
/// Sequence hashes are derived from `local_hashes`; no parent hash or start position is set.
fn make_store_event(worker_id: u64, local_hashes: &[u64]) -> RouterEvent {
    let default_dp_rank = 0;
    make_store_event_with_dp_rank(worker_id, local_hashes, default_dp_rank)
}

/// Create a store event with a specific dp_rank.
fn make_store_event_with_dp_rank(
    worker_id: u64,
    local_hashes: &[u64],
    dp_rank: u32,
) -> RouterEvent {
35
    make_store_event_full(worker_id, local_hashes, dp_rank, None, None)
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
}

/// Create a store event with parent hash for continuation sequences.
/// `prefix_hashes` are the hashes of the prefix (to compute parent_hash).
/// `local_hashes` are the new blocks being stored.
fn make_store_event_with_parent(
    worker_id: u64,
    prefix_hashes: &[u64],
    local_hashes: &[u64],
) -> RouterEvent {
    // Compute the parent hash from the prefix
    let prefix_block_hashes: Vec<LocalBlockHash> =
        prefix_hashes.iter().map(|&h| LocalBlockHash(h)).collect();
    let prefix_seq_hashes = compute_seq_hash_for_block(&prefix_block_hashes);
    let parent_hash = prefix_seq_hashes
        .last()
        .map(|&h| ExternalSequenceBlockHash(h));

    // Compute the full sequence including prefix for proper seq_hash calculation
    let full_hashes: Vec<u64> = prefix_hashes
        .iter()
        .chain(local_hashes.iter())
        .copied()
        .collect();
    let full_block_hashes: Vec<LocalBlockHash> =
        full_hashes.iter().map(|&h| LocalBlockHash(h)).collect();
    let full_seq_hashes = compute_seq_hash_for_block(&full_block_hashes);

    // Only include the new blocks (skip prefix)
    let new_block_hashes: Vec<LocalBlockHash> =
        local_hashes.iter().map(|&h| LocalBlockHash(h)).collect();
    let new_seq_hashes = &full_seq_hashes[prefix_hashes.len()..];

69
    router_event(
70
        worker_id,
71
72
73
74
        0,
        0,
        KvCacheEventData::Stored(KvCacheStoreData {
            parent_hash,
75
            start_position: None,
76
77
78
            blocks: stored_blocks_with_sequence_hashes(&new_block_hashes, new_seq_hashes),
        }),
    )
79
80
}

81
82
83
84
85
86
87
88
fn make_store_event_with_start_position(
    worker_id: u64,
    local_hashes: &[u64],
    start_position: u32,
) -> RouterEvent {
    make_store_event_full(worker_id, local_hashes, 0, None, Some(start_position))
}

89
90
91
92
93
94
/// Create a store event with all options.
fn make_store_event_full(
    worker_id: u64,
    local_hashes: &[u64],
    dp_rank: u32,
    parent_hash: Option<ExternalSequenceBlockHash>,
95
    start_position: Option<u32>,
96
97
98
99
100
) -> RouterEvent {
    let local_block_hashes: Vec<LocalBlockHash> =
        local_hashes.iter().map(|&h| LocalBlockHash(h)).collect();
    let seq_hashes = compute_seq_hash_for_block(&local_block_hashes);

101
    router_event(
102
        worker_id,
103
104
105
106
        0,
        dp_rank,
        KvCacheEventData::Stored(KvCacheStoreData {
            parent_hash,
107
            start_position,
108
109
110
            blocks: stored_blocks_with_sequence_hashes(&local_block_hashes, &seq_hashes),
        }),
    )
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
}

/// Build a `Removed` event on dp_rank 0 for the blocks named by `local_hashes`.
fn make_remove_event(worker_id: u64, local_hashes: &[u64]) -> RouterEvent {
    let default_dp_rank = 0;
    make_remove_event_with_dp_rank(worker_id, local_hashes, default_dp_rank)
}

/// Create a remove event with a specific dp_rank.
fn make_remove_event_with_dp_rank(
    worker_id: u64,
    local_hashes: &[u64],
    dp_rank: u32,
) -> RouterEvent {
    let local_block_hashes: Vec<LocalBlockHash> =
        local_hashes.iter().map(|&h| LocalBlockHash(h)).collect();
    let seq_hashes = compute_seq_hash_for_block(&local_block_hashes);

128
    remove_event(
129
        worker_id,
130
131
132
133
134
135
136
        0,
        dp_rank,
        seq_hashes
            .iter()
            .map(|&h| ExternalSequenceBlockHash(h))
            .collect(),
    )
137
138
}

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/// Create a remove event with parent hash for continuation sequences.
/// `prefix_hashes` are the hashes of the prefix (to compute parent_hash and full seq context).
/// `local_hashes` are the blocks being removed.
fn make_remove_event_with_parent(
    worker_id: u64,
    prefix_hashes: &[u64],
    local_hashes: &[u64],
) -> RouterEvent {
    let full_hashes: Vec<u64> = prefix_hashes
        .iter()
        .chain(local_hashes.iter())
        .copied()
        .collect();
    let full_block_hashes: Vec<LocalBlockHash> =
        full_hashes.iter().map(|&h| LocalBlockHash(h)).collect();
    let full_seq_hashes = compute_seq_hash_for_block(&full_block_hashes);

    let suffix_seq_hashes = &full_seq_hashes[prefix_hashes.len()..];

158
    remove_event(
159
        worker_id,
160
161
162
163
164
165
166
        0,
        0,
        suffix_seq_hashes
            .iter()
            .map(|&h| ExternalSequenceBlockHash(h))
            .collect(),
    )
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
}

/// Snapshot the tree state for deterministic comparison.
/// Dumps all events, zeros out `event_id`, and sorts by `(worker_id, dp_rank, block_hash)`.
async fn snapshot_tree(index: &dyn KvIndexerInterface) -> Vec<RouterEvent> {
    let mut events = index.dump_events().await.unwrap();
    for ev in &mut events {
        ev.event.event_id = 0;
    }
    events.sort_by(|a, b| {
        a.worker_id.cmp(&b.worker_id).then_with(|| {
            a.event.dp_rank.cmp(&b.event.dp_rank).then_with(|| {
                let hash_a = match &a.event.data {
                    KvCacheEventData::Stored(s) => {
                        s.blocks.first().map(|b| b.block_hash.0).unwrap_or(0)
                    }
                    KvCacheEventData::Removed(r) => {
                        r.block_hashes.first().map(|h| h.0).unwrap_or(0)
                    }
                    KvCacheEventData::Cleared => 0,
                };
                let hash_b = match &b.event.data {
                    KvCacheEventData::Stored(s) => {
                        s.blocks.first().map(|b| b.block_hash.0).unwrap_or(0)
                    }
                    KvCacheEventData::Removed(r) => {
                        r.block_hashes.first().map(|h| h.0).unwrap_or(0)
                    }
                    KvCacheEventData::Cleared => 0,
                };
                hash_a.cmp(&hash_b)
            })
        })
    });
    events
}

204
205
206
207
208
209
210
/// Create a clear event for a worker.
fn make_clear_event(worker_id: u64) -> RouterEvent {
    make_clear_event_with_dp_rank(worker_id, 0)
}

/// Create a clear event with a specific dp_rank.
fn make_clear_event_with_dp_rank(worker_id: u64, dp_rank: u32) -> RouterEvent {
211
    router_event(worker_id, 0, dp_rank, KvCacheEventData::Cleared)
212
213
214
215
216
217
218
219
}

// ============================================================================
// KvIndexerInterface tests - parametrized over all implementations
// ============================================================================

#[template]
#[rstest]
220
fn indexer_template(
221
    #[values("single", "flat", "concurrent", "concurrent_compressed")] variant: &str,
222
223
) {
}
224

225
226
227
#[template]
#[rstest]
fn tree_size_indexer_template(
228
    #[values("single", "concurrent", "concurrent_compressed")] variant: &str,
229
230
231
) {
}

232
233
fn make_indexer(variant: &str) -> Box<dyn KvIndexerInterface> {
    let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
234
235
236
237
238
239
240
241
    make_indexer_with_metrics(variant, metrics).0
}

fn make_indexer_with_metrics(
    variant: &str,
    metrics: Arc<KvIndexerMetrics>,
) -> (Box<dyn KvIndexerInterface>, Arc<KvIndexerMetrics>) {
    let token = CancellationToken::new();
242
243
    let kv_block_size = 32;

244
245
246
    let indexer: Box<dyn KvIndexerInterface> = match variant {
        "single" => Box::new(KvIndexer::new(token, kv_block_size, metrics.clone())),
        "flat" => Box::new(ThreadPoolIndexer::new_with_metrics(
247
248
249
            PositionalIndexer::new(32),
            4,
            kv_block_size,
250
            Some(metrics.clone()),
251
        )),
252
        "concurrent" => Box::new(ThreadPoolIndexer::new_with_metrics(
253
254
255
            ConcurrentRadixTree::new(),
            4,
            kv_block_size,
256
            Some(metrics.clone()),
257
        )),
258
        "concurrent_compressed" => Box::new(ThreadPoolIndexer::new_with_metrics(
259
260
261
            ConcurrentRadixTreeCompressed::new(),
            4,
            kv_block_size,
262
            Some(metrics.clone()),
263
        )),
264
        _ => panic!("Unknown variant: {}", variant),
265
266
267
    };

    (indexer, metrics)
268
269
}

270
271
272
273
274
275
276
277
/// Ensure queued indexer work is drained, then give a short settle window.
/// This is intentionally conservative for tests that assert immediately
/// after asynchronous event ingestion.
async fn flush_and_settle(index: &dyn KvIndexerInterface) {
    index.flush().await;
    tokio::time::sleep(Duration::from_millis(100)).await;
}

278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async fn query_scores(index: &dyn KvIndexerInterface, query: &[u64]) -> OverlapScores {
    index
        .find_matches(query.iter().copied().map(LocalBlockHash).collect())
        .await
        .unwrap()
}

/// Assert that `worker` has an overlap score of exactly `expected_score` for `query`.
async fn assert_score(
    index: &dyn KvIndexerInterface,
    query: &[u64],
    worker: WorkerWithDpRank,
    expected_score: u32,
) {
    let overlap = query_scores(index, query).await;
    let actual = overlap.scores.get(&worker);
    assert_eq!(actual, Some(&expected_score));
}

295
296
297
298
299
300
301
async fn assert_query_score_and_tree_size(
    index: &dyn KvIndexerInterface,
    query: &[u64],
    worker: WorkerWithDpRank,
    expected_score: u32,
    expected_tree_size: usize,
) {
302
    let scores = query_scores(index, query).await;
303
304
305
306
    assert_eq!(scores.scores.get(&worker), Some(&expected_score));
    assert_eq!(scores.tree_sizes.get(&worker), Some(&expected_tree_size));
}

307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
async fn assert_no_scores(index: &dyn KvIndexerInterface, query: &[u64]) {
    let scores = query_scores(index, query).await;
    assert!(scores.scores.is_empty());
}

/// Assert that `query` matches exactly the workers in `expected_scores`, each with the
/// listed score, and no other worker.
async fn assert_exact_scores(
    index: &dyn KvIndexerInterface,
    query: &[u64],
    expected_scores: &[(WorkerWithDpRank, u32)],
) {
    let overlap = query_scores(index, query).await;
    let actual = &overlap.scores;
    assert_eq!(actual.len(), expected_scores.len());
    expected_scores
        .iter()
        .for_each(|(worker, score)| assert_eq!(actual.get(worker), Some(score)));
}

/// Read the `kv_cache_events_applied` counter for one `(event_type, status)` label pair.
///
/// Panics if the label lookup fails.
#[cfg(feature = "metrics")]
fn event_metric_value(
    metrics: &KvIndexerMetrics,
    event_type: &'static str,
    status: &'static str,
) -> u64 {
    let labels = [event_type, status];
    let counter = metrics
        .kv_cache_events_applied
        .get_metric_with_label_values(&labels)
        .unwrap();
    counter.get()
}

/// Read the `kv_cache_event_warnings` counter for a single `warning_kind` label.
///
/// Panics if the label lookup fails.
#[cfg(feature = "metrics")]
fn warning_metric_value(metrics: &KvIndexerMetrics, warning_kind: &'static str) -> u64 {
    let labels = [warning_kind];
    let counter = metrics
        .kv_cache_event_warnings
        .get_metric_with_label_values(&labels)
        .unwrap();
    counter.get()
}

/// Assert that no stored or removed event was counted under an error status
/// (parent not found, block not found, or invalid block).
#[cfg(feature = "metrics")]
fn assert_no_event_errors(metrics: &KvIndexerMetrics) {
    let mut invalid_count: u64 = 0;
    for event_type in [METRIC_EVENT_STORED, METRIC_EVENT_REMOVED] {
        for status in [
            METRIC_STATUS_PARENT_NOT_FOUND,
            METRIC_STATUS_BLOCK_NOT_FOUND,
            METRIC_STATUS_INVALID_BLOCK,
        ] {
            invalid_count += event_metric_value(metrics, event_type, status);
        }
    }
    assert_eq!(
        invalid_count, 0,
        "router indexer reported invalid KV events"
    );
}

/// Assert that the indexer never recorded a duplicate-store warning.
#[cfg(feature = "metrics")]
fn assert_no_event_warnings(metrics: &KvIndexerMetrics) {
    let duplicate_stores = warning_metric_value(metrics, METRIC_WARNING_DUPLICATE_STORE);
    assert_eq!(
        duplicate_stores, 0,
        "router indexer reported suspicious KV events",
    );
}

374
375
376
377
mod interface_tests {
    use super::*;
    use rstest_reuse::apply;

378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
    // Replaying an identical store must leave the tree untouched, raise exactly one
    // duplicate-store warning, and record no event errors.
    #[cfg(feature = "metrics")]
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_duplicate_store_replay_warns_without_error(variant: &str) {
        let (index, metrics) =
            make_indexer_with_metrics(variant, Arc::new(KvIndexerMetrics::new_unregistered()));
        let store = make_store_event(0, &[1, 2, 3]);

        // First application populates the tree.
        index.apply_event(store.clone()).await;
        flush_and_settle(index.as_ref()).await;
        let before_replay = snapshot_tree(index.as_ref()).await;

        // Second application is a pure replay.
        index.apply_event(store).await;
        flush_and_settle(index.as_ref()).await;
        let after_replay = snapshot_tree(index.as_ref()).await;

        assert_eq!(
            before_replay,
            after_replay,
            "replaying the same store event should not change the tree structure"
        );
        assert_score(index.as_ref(), &[1, 2, 3], WorkerWithDpRank::new(0, 0), 3).await;
        assert_no_event_errors(metrics.as_ref());
        assert_eq!(
            warning_metric_value(metrics.as_ref(), METRIC_WARNING_DUPLICATE_STORE),
            1
        );
    }

    // A genuine parent-linked continuation is neither an error nor a duplicate store.
    #[cfg(feature = "metrics")]
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_continuation_store_does_not_warn(variant: &str) {
        let (index, metrics) =
            make_indexer_with_metrics(variant, Arc::new(KvIndexerMetrics::new_unregistered()));

        // Store the prefix and let it land before extending it.
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        flush_and_settle(index.as_ref()).await;

        let continuation = make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]);
        index.apply_event(continuation).await;
        flush_and_settle(index.as_ref()).await;

        assert_score(index.as_ref(), &[1, 2, 3, 4, 5], WorkerWithDpRank::new(0, 0), 5).await;
        assert_no_event_errors(metrics.as_ref());
        assert_no_event_warnings(metrics.as_ref());
    }

428
429
430
431
432
433
434
435
436
437
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_store_and_find(variant: &str) {
        let index = make_indexer(variant);

        // Store a sequence for worker 0
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;

        flush_and_settle(index.as_ref()).await;

438
        assert_score(index.as_ref(), &[1, 2, 3], WorkerWithDpRank::new(0, 0), 3).await;
439
    }
440

441
    #[tokio::test]
442
443
444
    #[apply(tree_size_indexer_template)]
    async fn test_tree_size_accounting_stays_stable(variant: &str) {
        let index = make_indexer(variant);
445
        let worker = WorkerWithDpRank::new(0, 0);
446
447
448
449
450
        let prefix_event = make_store_event(0, &[1, 2, 3]);
        let continuation_event = make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]);
        let continuation_remove = make_remove_event_with_parent(0, &[1, 2, 3], &[4, 5]);
        let prefix_remove = make_remove_event(0, &[1, 2, 3]);

451
452
453
454
        // TODO: The non-compressed radix-family implementations still have a broader
        // tree-size accounting gap after mid-chain removes because descendant
        // lookup entries are cleaned up lazily. That means "store -> partial
        // remove -> restore continuation" can still miscount restored coverage
455
        // in single and concurrent. This test is intentionally scoped
456
457
        // to duplicate store/remove replay so all tree-size variants share the
        // same stable baseline.
458
459
460

        index.apply_event(prefix_event.clone()).await;
        flush_and_settle(index.as_ref()).await;
461

462
463
464
465
        assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3], worker, 3, 3).await;
        let prefix_snapshot = snapshot_tree(index.as_ref()).await;

        index.apply_event(prefix_event).await;
466
467
468
        flush_and_settle(index.as_ref()).await;

        assert_eq!(
469
470
471
            prefix_snapshot,
            snapshot_tree(index.as_ref()).await,
            "replaying the same store event should not change the tree structure"
472
        );
473
        assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3], worker, 3, 3).await;
474

475
        index.apply_event(continuation_event.clone()).await;
476
477
        flush_and_settle(index.as_ref()).await;

478
479
480
481
482
        assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3, 4, 5], worker, 5, 5).await;
        let full_snapshot = snapshot_tree(index.as_ref()).await;

        index.apply_event(continuation_event).await;
        flush_and_settle(index.as_ref()).await;
483
484

        assert_eq!(
485
486
487
            full_snapshot,
            snapshot_tree(index.as_ref()).await,
            "replaying the same continuation store should not change the tree structure"
488
        );
489
490
491
492
493
494
495
496
497
498
499
        assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3, 4, 5], worker, 5, 5).await;

        index.apply_event(continuation_remove.clone()).await;
        flush_and_settle(index.as_ref()).await;

        assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3, 4, 5], worker, 3, 3).await;
        let trimmed_snapshot = snapshot_tree(index.as_ref()).await;

        index.apply_event(continuation_remove).await;
        flush_and_settle(index.as_ref()).await;

500
        assert_eq!(
501
502
503
            trimmed_snapshot,
            snapshot_tree(index.as_ref()).await,
            "replaying the same remove event should not change the tree structure"
504
        );
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
        assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3, 4, 5], worker, 3, 3).await;

        index.apply_event(prefix_remove.clone()).await;
        flush_and_settle(index.as_ref()).await;

        let empty_scores = index
            .find_matches(vec![
                LocalBlockHash(1),
                LocalBlockHash(2),
                LocalBlockHash(3),
                LocalBlockHash(4),
                LocalBlockHash(5),
            ])
            .await
            .unwrap();
        assert!(empty_scores.scores.is_empty());
        assert!(snapshot_tree(index.as_ref()).await.is_empty());

        index.apply_event(prefix_remove).await;
        flush_and_settle(index.as_ref()).await;

        let duplicate_empty_scores = index
            .find_matches(vec![
                LocalBlockHash(1),
                LocalBlockHash(2),
                LocalBlockHash(3),
                LocalBlockHash(4),
                LocalBlockHash(5),
            ])
            .await
            .unwrap();
        assert!(duplicate_empty_scores.scores.is_empty());
        assert!(snapshot_tree(index.as_ref()).await.is_empty());
538
539
    }

540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
    // Removing a mid-chain block and then re-storing it must restore both the score and
    // the tree size.
    #[tokio::test]
    async fn test_concurrent_compressed_restore_after_mid_chain_remove_updates_tree_size() {
        let index = make_indexer("concurrent_compressed");
        let idx = index.as_ref();
        let worker = WorkerWithDpRank::new(0, 0);
        let chain = [1, 2, 3];

        // Full chain stored.
        idx.apply_event(make_store_event(0, &chain)).await;
        flush_and_settle(idx).await;
        assert_query_score_and_tree_size(idx, &chain, worker, 3, 3).await;

        // Drop block 2 (under prefix [1]); only block 1 still matches.
        idx.apply_event(make_remove_event_with_parent(0, &[1], &[2]))
            .await;
        flush_and_settle(idx).await;
        assert_query_score_and_tree_size(idx, &chain, worker, 1, 1).await;

        // Re-store [2, 3] under [1]; the full chain comes back.
        idx.apply_event(make_store_event_with_parent(0, &[1], &[2, 3]))
            .await;
        flush_and_settle(idx).await;
        assert_query_score_and_tree_size(idx, &chain, worker, 3, 3).await;
    }

    // Removing a block in the middle of a chain leaves only the blocks before it.
    #[tokio::test]
    async fn test_concurrent_compressed_partial_node_drops_unreachable_descendants() {
        let index = make_indexer("concurrent_compressed");
        let idx = index.as_ref();

        // Build [1, 2, 3] and extend it with [4, 5].
        idx.apply_event(make_store_event(0, &[1, 2, 3])).await;
        idx.apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]))
            .await;
        flush_and_settle(idx).await;

        // Cut the chain at block 2.
        idx.apply_event(make_remove_event_with_parent(0, &[1], &[2]))
            .await;
        flush_and_settle(idx).await;

        let snapshot = snapshot_tree(idx).await;
        assert_eq!(snapshot, vec![make_store_event(0, &[1])]);
    }

586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
    // After both continuation branches are removed, cleanup must prune their dead child
    // edges while the live prefix keeps serving matches.
    #[tokio::test]
    async fn test_concurrent_compressed_cleanup_prunes_dead_children_under_live_prefix() {
        let index = ThreadPoolIndexer::new(ConcurrentRadixTreeCompressed::new(), 1, 32);
        let prefix = [1, 2, 3];
        let tails = [[4, 5], [6, 7]];

        // Live prefix with two continuation branches hanging off it.
        index.apply_event(make_store_event(0, &prefix)).await;
        for tail in &tails {
            index
                .apply_event(make_store_event_with_parent(0, &prefix, tail))
                .await;
        }
        flush_and_settle(&index).await;

        // Remove both branches; the prefix itself stays.
        for tail in &tails {
            index
                .apply_event(make_remove_event_with_parent(0, &prefix, tail))
                .await;
        }
        flush_and_settle(&index).await;

        let expected = vec![make_store_event(0, &prefix)];
        assert_eq!(snapshot_tree(&index).await, expected);
        // Before cleanup the raw edge count is still 3...
        assert_eq!(index.backend().raw_child_edge_count(), 3);

        index.backend().run_cleanup_for_test();

        // ...and afterwards only one edge remains, with the prefix still intact.
        assert_eq!(index.backend().raw_child_edge_count(), 1);
        assert_eq!(snapshot_tree(&index).await, expected);
        assert_score(&index, &prefix, WorkerWithDpRank::new(0, 0), 3).await;
    }

621
622
623
624
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_partial_match(variant: &str) {
        let index = make_indexer(variant);
625

626
627
        // Store [1, 2, 3] for worker 0
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
628

629
        flush_and_settle(index.as_ref()).await;
630

631
        assert_score(index.as_ref(), &[1, 2, 999], WorkerWithDpRank::new(0, 0), 2).await;
632
    }
633

634
635
636
637
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_remove(variant: &str) {
        let index = make_indexer(variant);
638

639
640
        // Store sequence for worker 0
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
641

642
643
        // Remove all blocks
        index.apply_event(make_remove_event(0, &[1, 2, 3])).await;
644

645
        flush_and_settle(index.as_ref()).await;
646

647
        assert_no_scores(index.as_ref(), &[1, 2, 3]).await;
648
    }
649

650
651
652
653
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_multiple_workers_shared_prefix(variant: &str) {
        let index = make_indexer(variant);
654

655
656
657
658
659
        // Worker 0 has [1, 2], Worker 1 has [1, 3]
        // Since sequence hashes are cumulative, [1] has same hash for both,
        // but [1, 2] and [1, 3] have different hashes.
        index.apply_event(make_store_event(0, &[1, 2])).await;
        index.apply_event(make_store_event(1, &[1, 3])).await;
660

661
        flush_and_settle(index.as_ref()).await;
662

663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
        assert_exact_scores(
            index.as_ref(),
            &[1],
            &[
                (WorkerWithDpRank::new(0, 0), 1),
                (WorkerWithDpRank::new(1, 0), 1),
            ],
        )
        .await;

        assert_exact_scores(
            index.as_ref(),
            &[1, 2],
            &[
                (WorkerWithDpRank::new(0, 0), 2),
                (WorkerWithDpRank::new(1, 0), 1),
            ],
        )
        .await;
682
    }
683

684
685
686
687
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_remove_worker(variant: &str) {
        let index = make_indexer(variant);
688

689
690
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        index.apply_event(make_store_event(1, &[1, 2, 3])).await;
691

692
693
        // Allow time for async event processing
        flush_and_settle(index.as_ref()).await;
694

695
        index.remove_worker(0).await;
696

697
698
        // Allow time for async remove_worker processing
        flush_and_settle(index.as_ref()).await;
699

700
701
702
703
704
705
        assert_exact_scores(
            index.as_ref(),
            &[1, 2, 3],
            &[(WorkerWithDpRank::new(1, 0), 3)],
        )
        .await;
706
707
    }

708
709
710
711
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_large_stores(variant: &str) {
        let index = make_indexer(variant);
712

713
714
715
716
717
718
719
720
721
        // Test sequences of increasing sizes
        for i in 0..10u64 {
            let len = 1 << i; // 1, 2, 4, 8, ..., 512
            let worker_id = i;
            let sequence: Vec<u64> = (1..=len).map(|x| x + (i * 10000)).collect();
            index
                .apply_event(make_store_event(worker_id, &sequence))
                .await;
        }
722

723
        flush_and_settle(index.as_ref()).await;
724

725
726
727
728
729
730
        // Verify we can find matches for the last stored sequence
        let last_seq: Vec<LocalBlockHash> = (1..=512u64)
            .map(|x| LocalBlockHash(x + (9 * 10000)))
            .collect();
        let scores = index.find_matches(last_seq).await.unwrap();
        assert!(!scores.scores.is_empty());
731
732
    }

733
734
735
736
    // Dumping an index as events and replaying them must reproduce the same tree.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_dump_and_restore(variant: &str) {
        let original = make_indexer(variant);
        original.apply_event(make_store_event(0, &[1, 2, 3])).await;
        original.apply_event(make_store_event(1, &[1, 2, 4])).await;
        // Allow background worker threads to process events.
        flush_and_settle(original.as_ref()).await;

        // Serialize the tree as events...
        let events = original.dump_events().await.unwrap();
        assert!(!events.is_empty());

        // ...and replay them into a brand-new index of the same variant.
        let restored = make_indexer(variant);
        for event in events {
            restored.apply_event(event).await;
        }
        flush_and_settle(restored.as_ref()).await;

        let original_snapshot = snapshot_tree(original.as_ref()).await;
        let restored_snapshot = snapshot_tree(restored.as_ref()).await;
        assert_eq!(original_snapshot, restored_snapshot);
    }
761

762
763
764
765
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_clear_all_blocks(variant: &str) {
        let index = make_indexer(variant);
766

767
768
769
        // Store some data for two workers
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        index.apply_event(make_store_event(1, &[1, 2, 3])).await;
770

771
772
        // Clear worker 0's blocks using the Cleared event
        index.apply_event(make_clear_event(0)).await;
773

774
        flush_and_settle(index.as_ref()).await;
775

776
777
778
779
780
781
782
783
784
785
786
787
        // Worker 0's blocks should be gone, worker 1's remain
        let scores = index
            .find_matches(vec![
                LocalBlockHash(1),
                LocalBlockHash(2),
                LocalBlockHash(3),
            ])
            .await
            .unwrap();
        assert_eq!(scores.scores.len(), 1);
        assert!(scores.scores.contains_key(&WorkerWithDpRank::new(1, 0)));
    }
788

789
790
791
792
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_empty_query(variant: &str) {
        let index = make_indexer(variant);
793

794
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
795

796
        flush_and_settle(index.as_ref()).await;
797

798
        assert_no_scores(index.as_ref(), &[]).await;
799
    }
800

801
802
803
804
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_miss_query(variant: &str) {
        let index = make_indexer(variant);
805

806
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
807

808
        flush_and_settle(index.as_ref()).await;
809

810
        assert_no_scores(index.as_ref(), &[999, 998]).await;
811
    }
812

813
814
815
816
817
818
    // Shutting down a freshly built, never-used indexer must not panic.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_shutdown(variant: &str) {
        make_indexer(variant).shutdown();
    }
819

820
821
822
823
824
825
826
827
828
    // Calling shutdown twice on a populated indexer must not panic.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_shutdown_idempotent(variant: &str) {
        let index = make_indexer(variant);
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        flush_and_settle(index.as_ref()).await;

        for _ in 0..2 {
            index.shutdown();
        }
    }
829

830
831
832
833
834
835
836
    // `find_matches_for_request` works on raw tokens, both on an empty index and after
    // unrelated synthetic blocks have been stored.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_find_matches_for_request(variant: &str) {
        let index = make_indexer(variant);
        let tokens = vec![1, 2, 3, 4];

        // Empty index should return no matches.
        let scores = index
            .find_matches_for_request(&tokens, None, None)
            .await
            .unwrap();
        assert!(scores.scores.is_empty());

        // Store some synthetic blocks.
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        flush_and_settle(index.as_ref()).await;

        // `find_matches_for_request` derives block hashes from the raw tokens, so they do
        // not coincide with the synthetic `LocalBlockHash(1..=3)` values stored above.
        // Presumably four tokens do not even fill one 32-token block, which is the size
        // `make_indexer` configures. The call must succeed and must not report a match.
        // The previous assertion (`is_empty() || !is_empty()`) was a tautology that could
        // never fail.
        let scores = index
            .find_matches_for_request(&tokens, None, None)
            .await
            .unwrap();
        assert!(
            scores.scores.is_empty(),
            "raw-token hashes unexpectedly matched synthetic local hashes ({} worker(s))",
            scores.scores.len()
        );
    }
860

861
862
863
864
    /// Recording a routing decision for a tokenized request must not error.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_process_routing_decision(variant: &str) {
        let index = make_indexer(variant);

        // Eight tokens, wrapped with a block size of 32.
        let tokens: Vec<u32> = (1..=8).collect();
        let mut request = TokensWithHashes::new(tokens, 32);

        let outcome = index
            .process_routing_decision_for_request(&mut request, WorkerWithDpRank::new(0, 0))
            .await;
        assert!(outcome.is_ok());
    }
878

879
880
881
882
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_parent_hash_chains(variant: &str) {
        let index = make_indexer(variant);
883

884
885
        // Store initial sequence [1, 2, 3]
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
886

887
888
889
890
        // Store continuation [4, 5] with parent pointing to block 3
        index
            .apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]))
            .await;
891

892
        flush_and_settle(index.as_ref()).await;
893

894
        // Query for full sequence [1, 2, 3, 4, 5] should match all 5 blocks
895
896
897
898
899
900
901
        assert_score(
            index.as_ref(),
            &[1, 2, 3, 4, 5],
            WorkerWithDpRank::new(0, 0),
            5,
        )
        .await;
902

903
        // Query for just [1, 2, 3] should match 3 blocks
904
        assert_score(index.as_ref(), &[1, 2, 3], WorkerWithDpRank::new(0, 0), 3).await;
905
    }
906

907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
    /// Dumping the flat indexer must preserve block positions and replay cleanly.
    ///
    /// For a two-block store at start position 10, the dump must contain two stored
    /// events at positions 10 and 11, none with a parent hash. Replaying the dump
    /// into a fresh flat indexer must reproduce an identical tree snapshot.
    #[tokio::test]
    async fn test_flat_dump_replay_preserves_start_positions() {
        let source = make_indexer("flat");
        source
            .apply_event(make_store_event_with_start_position(0, &[11, 12], 10))
            .await;
        flush_and_settle(source.as_ref()).await;

        let dumped = source.dump_events().await.unwrap();

        // Keep only the Stored payloads from the dump.
        let stored: Vec<_> = dumped
            .iter()
            .filter_map(|event| {
                if let KvCacheEventData::Stored(data) = &event.event.data {
                    Some(data)
                } else {
                    None
                }
            })
            .collect();
        assert_eq!(stored.len(), 2);
        let positions: Vec<_> = stored.iter().map(|data| data.start_position).collect();
        assert_eq!(positions, vec![Some(10), Some(11)]);
        assert!(stored.iter().all(|data| data.parent_hash.is_none()));

        let replayed = make_indexer("flat");
        for event in dumped {
            replayed.apply_event(event).await;
        }
        flush_and_settle(replayed.as_ref()).await;

        let expected = snapshot_tree(source.as_ref()).await;
        let actual = snapshot_tree(replayed.as_ref()).await;
        assert_eq!(expected, actual);
    }

947
948
949
950
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_multiple_dp_ranks(variant: &str) {
        let index = make_indexer(variant);
951

952
953
954
955
956
957
958
959
960
961
        // Same worker_id but different dp_ranks should be tracked separately
        index
            .apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], 0))
            .await;
        index
            .apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], 1))
            .await;
        index
            .apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], 2))
            .await;
962

963
        flush_and_settle(index.as_ref()).await;
964

965
966
967
        // Query should return all 3 dp_ranks as separate entries
        let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
        let scores = index.find_matches(seq).await.unwrap();
968

969
970
971
972
        assert_eq!(scores.scores.len(), 3);
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 1)).unwrap(), 3);
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 2)).unwrap(), 3);
973
974
    }

975
976
977
978
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_partial_block_removal(variant: &str) {
        let index = make_indexer(variant);
979

980
981
        // Store [1, 2, 3]
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
982

983
        flush_and_settle(index.as_ref()).await;
984

985
986
987
988
        // Verify all 3 blocks match
        let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
        let scores = index.find_matches(seq.clone()).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
989

990
991
992
993
994
995
        // Remove only the last block (block 3)
        // To do this correctly, we need to compute the seq_hash for block 3 specifically,
        // which requires the full sequence context [1,2,3].
        let full_hashes: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
        let seq_hashes = compute_seq_hash_for_block(&full_hashes);
        let block_3_seq_hash = ExternalSequenceBlockHash(seq_hashes[2]); // Last block's hash
996

997
998
        let remove_event = remove_event(0, 0, 0, vec![block_3_seq_hash]);
        index.apply_event(remove_event).await;
999

1000
        flush_and_settle(index.as_ref()).await;
1001

1002
1003
1004
        // Query [1, 2, 3] - should only match 2 blocks now (block 3 is removed)
        let scores = index.find_matches(seq).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2);
1005

1006
1007
1008
1009
1010
        // Query [1, 2] - should still match 2 blocks
        let partial_seq: Vec<LocalBlockHash> = (1..=2).map(LocalBlockHash).collect();
        let scores = index.find_matches(partial_seq).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2);
    }
1011

1012
1013
1014
1015
1016
1017
1018
1019
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_remove_mid_chain_block(variant: &str) {
        // TODO: positional indexer has no parent-child structure, so mid-chain removal
        // doesn't invalidate later positions — jump search skips over the gap and over-counts.
        if variant == "flat" {
            return;
        }
1020

1021
        let index = make_indexer(variant);
1022

1023
1024
1025
1026
        // Store [1, 2, 3, 4, 5]
        index
            .apply_event(make_store_event(0, &[1, 2, 3, 4, 5]))
            .await;
1027

1028
        flush_and_settle(index.as_ref()).await;
1029

1030
1031
1032
1033
        // Verify all 5 blocks match
        let seq: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect();
        let scores = index.find_matches(seq.clone()).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 5);
1034

1035
1036
1037
1038
        // Remove only block 3 (index 2) — the middle of the chain
        let full_hashes: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect();
        let seq_hashes = compute_seq_hash_for_block(&full_hashes);
        let block_3_seq_hash = ExternalSequenceBlockHash(seq_hashes[2]);
1039

1040
1041
        let remove_event = remove_event(0, 0, 0, vec![block_3_seq_hash]);
        index.apply_event(remove_event).await;
1042

1043
        flush_and_settle(index.as_ref()).await;
1044

1045
1046
1047
        // Query [1, 2, 3, 4, 5] — only first 2 positions reachable (block 3 removed, orphaning 4 & 5)
        let scores = index.find_matches(seq.clone()).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2);
1048

1049
1050
1051
1052
        // Query [1, 2] — prefix before the gap is still intact
        let prefix_seq: Vec<LocalBlockHash> = (1..=2).map(LocalBlockHash).collect();
        let scores = index.find_matches(prefix_seq).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2);
1053

1054
1055
1056
1057
        // Re-store block 3 as a continuation of [1, 2]
        index
            .apply_event(make_store_event_with_parent(0, &[1, 2], &[3]))
            .await;
1058

1059
        flush_and_settle(index.as_ref()).await;
1060

1061
1062
1063
1064
        // Query [1, 2, 3, 4, 5] — block 3 is back but 4 & 5 were orphaned, so score = 3
        let scores = index.find_matches(seq).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
    }
1065

1066
1067
1068
1069
    /// Removing an unknown worker id must leave the existing workers' data untouched.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_remove_nonexistent_worker(variant: &str) {
        let index = make_indexer(variant);
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        flush_and_settle(index.as_ref()).await;

        // Worker 999 never stored anything.
        index.remove_worker(999).await;
        flush_and_settle(index.as_ref()).await;

        let query: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
        let result = index.find_matches(query).await.unwrap();
        assert_eq!(result.scores.len(), 1);
        assert!(result.scores.contains_key(&WorkerWithDpRank::new(0, 0)));
    }
1088

1089
1090
1091
1092
    /// A remove event naming never-stored blocks must leave existing data intact.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_remove_nonexistent_blocks(variant: &str) {
        let index = make_indexer(variant);
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;

        // Neither 999 nor 998 was ever stored for worker 0.
        index.apply_event(make_remove_event(0, &[999, 998])).await;
        flush_and_settle(index.as_ref()).await;

        let query: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
        let result = index.find_matches(query).await.unwrap();
        assert_eq!(*result.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
    }
1107

1108
1109
1110
1111
    /// After a worker is cleared, storing the same chain again must make it
    /// queryable once more.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_clear_then_reuse(variant: &str) {
        let index = make_indexer(variant);
        let query: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();

        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        index.apply_event(make_clear_event(0)).await;
        flush_and_settle(index.as_ref()).await;

        // The clear removed everything the worker had stored.
        let cleared = index.find_matches(query.clone()).await.unwrap();
        assert!(cleared.scores.is_empty());

        // Storing the identical chain again makes it matchable.
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        flush_and_settle(index.as_ref()).await;

        let restored = index.find_matches(query).await.unwrap();
        assert_eq!(restored.scores.len(), 1);
        assert_eq!(*restored.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
    }
1136

1137
1138
1139
1140
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_multiple_sequences_per_worker(variant: &str) {
        let index = make_indexer(variant);
1141

1142
1143
1144
1145
1146
1147
1148
        // Store two disjoint sequences for the same worker
        // Sequence 1: [1, 2, 3]
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        // Sequence 2: [100, 101, 102] (completely different, no parent)
        index
            .apply_event(make_store_event(0, &[100, 101, 102]))
            .await;
1149

1150
        flush_and_settle(index.as_ref()).await;
1151

1152
1153
1154
1155
        // Query first sequence
        let seq1: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
        let scores = index.find_matches(seq1).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
1156

1157
1158
1159
1160
        // Query second sequence
        let seq2: Vec<LocalBlockHash> = (100..=102).map(LocalBlockHash).collect();
        let scores = index.find_matches(seq2).await.unwrap();
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
1161

1162
1163
1164
1165
1166
1167
        // Query a mix that doesn't exist as a sequence - should only match first block
        let mixed: Vec<LocalBlockHash> = vec![LocalBlockHash(1), LocalBlockHash(100)];
        let scores = index.find_matches(mixed).await.unwrap();
        // Only block 1 matches because [1, 100] is not a valid prefix
        assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 1);
    }
1168

1169
1170
1171
1172
    /// A clear event for a worker must remove its blocks on every dp_rank, not just
    /// the rank named in the event.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_clear_clears_all_dp_ranks(variant: &str) {
        let index = make_indexer(variant);
        let query: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();

        for dp_rank in [0, 1] {
            index
                .apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], dp_rank))
                .await;
        }
        flush_and_settle(index.as_ref()).await;

        let before = index.find_matches(query.clone()).await.unwrap();
        assert_eq!(before.scores.len(), 2);

        // A single clear event for worker 0 must remove both dp_ranks' blocks.
        index.apply_event(make_clear_event_with_dp_rank(0, 0)).await;
        flush_and_settle(index.as_ref()).await;

        let after = index.find_matches(query).await.unwrap();
        assert!(
            after.scores.is_empty(),
            "Cleared event should clear all dp_ranks for a worker"
        );
    }
1201
1202
}

1203
1204
1205
// ============================================================================
// LoRA isolation tests
// ============================================================================
1206

1207
1208
1209
mod lora_tests {
    use super::*;
    use rstest_reuse::apply;
1210

1211
1212
1213
1214
1215
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_lora_and_base_model_blocks_do_not_conflict(variant: &str) {
        let index = make_indexer(variant);
        let kv_block_size: u32 = 32;
1216

1217
1218
        // Same token sequence for both base model and LoRA adapter
        let tokens: Vec<u32> = (0..kv_block_size * 3).collect();
1219

1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
        let base_hashes =
            compute_block_hash_for_seq(&tokens, kv_block_size, BlockHashOptions::default());
        let lora_hashes = compute_block_hash_for_seq(
            &tokens,
            kv_block_size,
            BlockHashOptions {
                lora_name: Some("my-adapter"),
                ..Default::default()
            },
        );
1230

1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
        // Hashes must differ despite identical tokens
        assert_ne!(
            base_hashes, lora_hashes,
            "Base and LoRA hashes must differ for the same tokens"
        );

        let base_seq = compute_seq_hash_for_block(&base_hashes);
        let lora_seq = compute_seq_hash_for_block(&lora_hashes);

        // Store base-model blocks on worker 0
        let base_event = router_event(
1242
1243
1244
1245
1246
            0,
            0,
            0,
            KvCacheEventData::Stored(KvCacheStoreData {
                parent_hash: None,
1247
                start_position: None,
1248
                blocks: stored_blocks_with_sequence_hashes(&base_hashes, &base_seq),
1249
            }),
1250
1251
        );
        index.apply_event(base_event).await;
1252

1253
1254
        // Store LoRA blocks on worker 1
        let lora_event = router_event(
1255
1256
1257
1258
1259
            1,
            0,
            0,
            KvCacheEventData::Stored(KvCacheStoreData {
                parent_hash: None,
1260
                start_position: None,
1261
                blocks: stored_blocks_with_sequence_hashes(&lora_hashes, &lora_seq),
1262
            }),
1263
1264
        );
        index.apply_event(lora_event).await;
1265

1266
        flush_and_settle(index.as_ref()).await;
1267

1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
        // Query with base-model hashes → only worker 0
        let base_scores = index.find_matches(base_hashes.clone()).await.unwrap();
        assert_eq!(
            base_scores.scores.len(),
            1,
            "Only base-model worker should match"
        );
        assert_eq!(
            *base_scores
                .scores
                .get(&WorkerWithDpRank::new(0, 0))
                .unwrap(),
            3
        );
1282

1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
        // Query with LoRA hashes → only worker 1
        let lora_scores = index.find_matches(lora_hashes.clone()).await.unwrap();
        assert_eq!(lora_scores.scores.len(), 1, "Only LoRA worker should match");
        assert_eq!(
            *lora_scores
                .scores
                .get(&WorkerWithDpRank::new(1, 0))
                .unwrap(),
            3
        );
    }
1294

1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
    /// Reproduces the "block_hash mismatch: sequence hashes should be uniform
    /// across workers" warning seen when the same prompt is sent to both a base
    /// model worker and a LoRA worker.
    ///
    /// On main (without LoRA-aware hashing), both workers compute the same
    /// LocalBlockHash for identical tokens.  But vLLM's engine includes the
    /// adapter in its rolling ExternalSequenceBlockHash, so the radix tree
    /// sees conflicting sequence hashes at the same tree node.
    ///
    /// With LoRA-aware hashing, compute_block_hash_for_seq produces distinct
    /// LocalBlockHash values for different adapters, so the blocks land on
    /// separate tree paths and no mismatch occurs.
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_lora_base_same_tokens_no_seq_hash_mismatch(variant: &str) {
        let index = make_indexer(variant);
        let kv_block_size: u32 = 32;

        let tokens: Vec<u32> = (0..kv_block_size * 3).collect();

        // With LoRA-aware hashing, base and adapter produce different LocalBlockHash
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
        let base_local =
            compute_block_hash_for_seq(&tokens, kv_block_size, BlockHashOptions::default());
        let lora_local = compute_block_hash_for_seq(
            &tokens,
            kv_block_size,
            BlockHashOptions {
                lora_name: Some("my-adapter"),
                ..Default::default()
            },
        );
1326
1327
1328
1329
1330

        assert_ne!(
            base_local, lora_local,
            "LoRA-aware hashing must produce different LocalBlockHash values"
        );
1331

1332
1333
1334
1335
        // Simulate what vLLM does: same tokens, different rolling seq hashes
        // because the engine accounts for the adapter internally.
        let base_seq = compute_seq_hash_for_block(&base_local);
        let lora_seq = compute_seq_hash_for_block(&lora_local);
1336

1337
1338
1339
1340
1341
1342
1343
1344
        // Worker 0: base model
        index
            .apply_event(router_event(
                0,
                0,
                0,
                KvCacheEventData::Stored(KvCacheStoreData {
                    parent_hash: None,
1345
                    start_position: None,
1346
1347
1348
1349
                    blocks: stored_blocks_with_sequence_hashes(&base_local, &base_seq),
                }),
            ))
            .await;
1350

1351
1352
1353
1354
1355
1356
1357
1358
1359
        // Worker 1: LoRA adapter — different LocalBlockHash, so this goes to
        // a separate tree path instead of colliding with worker 0's node.
        index
            .apply_event(router_event(
                1,
                0,
                0,
                KvCacheEventData::Stored(KvCacheStoreData {
                    parent_hash: None,
1360
                    start_position: None,
1361
1362
1363
1364
                    blocks: stored_blocks_with_sequence_hashes(&lora_local, &lora_seq),
                }),
            ))
            .await;
1365

1366
        flush_and_settle(index.as_ref()).await;
1367

1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
        // Base query finds only worker 0
        let base_scores = index.find_matches(base_local.clone()).await.unwrap();
        assert_eq!(base_scores.scores.len(), 1);
        assert_eq!(
            *base_scores
                .scores
                .get(&WorkerWithDpRank::new(0, 0))
                .unwrap(),
            3
        );
1378

1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
        // LoRA query finds only worker 1
        let lora_scores = index.find_matches(lora_local.clone()).await.unwrap();
        assert_eq!(lora_scores.scores.len(), 1);
        assert_eq!(
            *lora_scores
                .scores
                .get(&WorkerWithDpRank::new(1, 0))
                .unwrap(),
            3
        );
    }
1390

1391
1392
1393
1394
1395
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_different_lora_adapters_do_not_conflict(variant: &str) {
        let index = make_indexer(variant);
        let kv_block_size: u32 = 32;
1396

1397
        let tokens: Vec<u32> = (0..kv_block_size * 2).collect();
1398

1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
        let hashes_a = compute_block_hash_for_seq(
            &tokens,
            kv_block_size,
            BlockHashOptions {
                lora_name: Some("adapter-a"),
                ..Default::default()
            },
        );
        let hashes_b = compute_block_hash_for_seq(
            &tokens,
            kv_block_size,
            BlockHashOptions {
                lora_name: Some("adapter-b"),
                ..Default::default()
            },
        );
1415

1416
1417
1418
1419
        assert_ne!(
            hashes_a, hashes_b,
            "Different adapters must produce different hashes"
        );
1420

1421
1422
        let seq_a = compute_seq_hash_for_block(&hashes_a);
        let seq_b = compute_seq_hash_for_block(&hashes_b);
1423

1424
1425
1426
1427
1428
1429
1430
1431
        // Store adapter-a blocks on worker 0
        index
            .apply_event(router_event(
                0,
                0,
                0,
                KvCacheEventData::Stored(KvCacheStoreData {
                    parent_hash: None,
1432
                    start_position: None,
1433
1434
1435
1436
                    blocks: stored_blocks_with_sequence_hashes(&hashes_a, &seq_a),
                }),
            ))
            .await;
1437

1438
1439
1440
1441
1442
1443
1444
1445
        // Store adapter-b blocks on worker 1
        index
            .apply_event(router_event(
                1,
                0,
                0,
                KvCacheEventData::Stored(KvCacheStoreData {
                    parent_hash: None,
1446
                    start_position: None,
1447
1448
1449
1450
                    blocks: stored_blocks_with_sequence_hashes(&hashes_b, &seq_b),
                }),
            ))
            .await;
1451

1452
        flush_and_settle(index.as_ref()).await;
1453

1454
1455
1456
1457
1458
        // Query adapter-a → only worker 0
        let scores_a = index.find_matches(hashes_a.clone()).await.unwrap();
        assert_eq!(scores_a.scores.len(), 1);
        assert!(scores_a.scores.contains_key(&WorkerWithDpRank::new(0, 0)));
        assert!(!scores_a.scores.contains_key(&WorkerWithDpRank::new(1, 0)));
1459

1460
1461
1462
1463
1464
1465
1466
        // Query adapter-b → only worker 1
        let scores_b = index.find_matches(hashes_b.clone()).await.unwrap();
        assert_eq!(scores_b.scores.len(), 1);
        assert!(scores_b.scores.contains_key(&WorkerWithDpRank::new(1, 0)));
        assert!(!scores_b.scores.contains_key(&WorkerWithDpRank::new(0, 0)));
    }
}

// ============================================================================
// Long-sequence tests - especially important for NestedMap/PositionalIndexer
// ============================================================================
1471

1472
1473
1474
mod long_sequence_tests {
    use super::*;
    use rstest_reuse::apply;
1475

1476
1477
1478
1479
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_single_store(variant: &str) {
        let index = make_indexer(variant);
1480

1481
1482
1483
1484
        // Store a long sequence (128 blocks) in a single event
        let seq_len = 128;
        let sequence: Vec<u64> = (1..=seq_len).collect();
        index.apply_event(make_store_event(0, &sequence)).await;
1485

1486
        flush_and_settle(index.as_ref()).await;
1487

1488
1489
1490
1491
1492
1493
1494
1495
        // Query full sequence - should match all blocks
        let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(full_query).await.unwrap();
        assert_eq!(scores.scores.len(), 1);
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            seq_len as u32
        );
1496

1497
1498
1499
1500
1501
1502
1503
        // Query prefix (first 64 blocks)
        let prefix_query: Vec<LocalBlockHash> = (1..=64).map(LocalBlockHash).collect();
        let scores = index.find_matches(prefix_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            64
        );
1504

1505
1506
1507
1508
1509
1510
1511
1512
1513
        // Query with divergence at position 50
        let mut divergent_query: Vec<LocalBlockHash> = (1..=100).map(LocalBlockHash).collect();
        divergent_query[49] = LocalBlockHash(99999); // Position 49 (0-indexed) diverges
        let scores = index.find_matches(divergent_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            49
        );
    }
1514

1515
1516
1517
1518
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_multiple_continuations(variant: &str) {
        let index = make_indexer(variant);
1519

1520
1521
1522
1523
        // Build a long sequence through multiple continuations
        // First store: blocks 1-50
        let first_chunk: Vec<u64> = (1..=50).collect();
        index.apply_event(make_store_event(0, &first_chunk)).await;
1524

1525
1526
1527
1528
1529
        // Second store: blocks 51-100 (continuation of first)
        let second_chunk: Vec<u64> = (51..=100).collect();
        index
            .apply_event(make_store_event_with_parent(0, &first_chunk, &second_chunk))
            .await;
1530

1531
1532
1533
1534
1535
1536
        // Third store: blocks 101-150 (continuation of second)
        let prefix_1_2: Vec<u64> = (1..=100).collect();
        let third_chunk: Vec<u64> = (101..=150).collect();
        index
            .apply_event(make_store_event_with_parent(0, &prefix_1_2, &third_chunk))
            .await;
1537

1538
        flush_and_settle(index.as_ref()).await;
1539

1540
1541
1542
1543
        // Query full sequence - should match all 150 blocks
        let full_query: Vec<LocalBlockHash> = (1..=150).map(LocalBlockHash).collect();
        let scores = index.find_matches(full_query).await.unwrap();
        assert_eq!(scores.scores.len(), 1);
1544
1545
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
1546
            150
1547
1548
        );

1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
        // Query crossing continuation boundaries
        let cross_boundary_query: Vec<LocalBlockHash> = (45..=105).map(LocalBlockHash).collect();
        let scores = index.find_matches(cross_boundary_query).await.unwrap();
        // Query starts at block 45, but stored sequence starts at 1, so this won't match
        // because the sequence hash at position 0 of our query (block 45) won't match
        // the stored sequence hash at position 0 (block 1)
        assert!(
            scores.scores.is_empty() || !scores.scores.contains_key(&WorkerWithDpRank::new(0, 0))
        );
    }
1559

1560
1561
1562
1563
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_branching_continuations(variant: &str) {
        let index = make_indexer(variant);
1564

1565
1566
1567
        // Common prefix: blocks 1-30
        let common_prefix: Vec<u64> = (1..=30).collect();
        index.apply_event(make_store_event(0, &common_prefix)).await;
1568

1569
1570
1571
1572
1573
        // Branch A: blocks 31-60 on worker 0
        let branch_a: Vec<u64> = (31..=60).collect();
        index
            .apply_event(make_store_event_with_parent(0, &common_prefix, &branch_a))
            .await;
1574

1575
1576
1577
1578
1579
1580
1581
1582
1583
        // Branch B: blocks 131-160 (different content) on worker 1
        // First store the common prefix for worker 1
        index.apply_event(make_store_event(1, &common_prefix)).await;
        let branch_b: Vec<u64> = (131..=160).collect();
        index
            .apply_event(make_store_event_with_parent(1, &common_prefix, &branch_b))
            .await;

        flush_and_settle(index.as_ref()).await;
1584

1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
        // Query common prefix - both workers should match
        let prefix_query: Vec<LocalBlockHash> = (1..=30).map(LocalBlockHash).collect();
        let scores = index.find_matches(prefix_query).await.unwrap();
        assert_eq!(scores.scores.len(), 2);
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            30
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(),
            30
        );

        // Query branch A path - only worker 0 should match fully
        let branch_a_query: Vec<LocalBlockHash> = (1..=60).map(LocalBlockHash).collect();
        let scores = index.find_matches(branch_a_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            60
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(),
            30
        );
1609
1610
    }

1611
1612
1613
1614
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_partial_removal(variant: &str) {
        let index = make_indexer(variant);
1615

1616
1617
1618
        // Store a long sequence
        let sequence: Vec<u64> = (1..=100).collect();
        index.apply_event(make_store_event(0, &sequence)).await;
1619

1620
        flush_and_settle(index.as_ref()).await;
1621

1622
1623
1624
1625
1626
1627
1628
        // Verify full match
        let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(full_query.clone()).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            100
        );
1629

1630
1631
1632
1633
1634
1635
1636
        // Remove blocks 80-100 (the tail)
        let tail_hashes: Vec<LocalBlockHash> = (1..=100).map(LocalBlockHash).collect();
        let seq_hashes = compute_seq_hash_for_block(&tail_hashes);
        let remove_hashes: Vec<ExternalSequenceBlockHash> = seq_hashes[79..100]
            .iter()
            .map(|&h| ExternalSequenceBlockHash(h))
            .collect();
1637

1638
1639
        let remove_event = remove_event(0, 0, 0, remove_hashes);
        index.apply_event(remove_event).await;
1640

1641
        flush_and_settle(index.as_ref()).await;
1642

1643
1644
1645
1646
1647
1648
1649
        // Query should now only match first 79 blocks
        let scores = index.find_matches(full_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            79
        );
    }
1650

1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_interleaved_workers(variant: &str) {
        let index = make_indexer(variant);

        // Multiple workers storing overlapping long sequences concurrently
        // Worker 0: blocks 1-100
        // Worker 1: blocks 1-75
        // Worker 2: blocks 1-50
        // Worker 3: blocks 1-25

        let seq_100: Vec<u64> = (1..=100).collect();
        let seq_75: Vec<u64> = (1..=75).collect();
        let seq_50: Vec<u64> = (1..=50).collect();
        let seq_25: Vec<u64> = (1..=25).collect();

        index.apply_event(make_store_event(0, &seq_100)).await;
        index.apply_event(make_store_event(1, &seq_75)).await;
        index.apply_event(make_store_event(2, &seq_50)).await;
        index.apply_event(make_store_event(3, &seq_25)).await;

        flush_and_settle(index.as_ref()).await;

        // Query for 60 blocks - workers 0,1 match 60, worker 2 matches 50, worker 3 matches 25
        let query_60: Vec<LocalBlockHash> = (1..=60).map(LocalBlockHash).collect();
        let scores = index.find_matches(query_60).await.unwrap();
        assert_eq!(scores.scores.len(), 4);
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            60
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(),
            60
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(2, 0)).unwrap(),
            50
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(3, 0)).unwrap(),
            25
        );
    }
1695

1696
1697
1698
1699
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_exact_jump_size_boundaries(variant: &str) {
        let index = make_indexer(variant);
1700

1701
1702
        // Test sequences that align exactly with jump_size boundaries (32 for PositionalIndexer)
        // This tests edge cases in the jump search algorithm
1703

1704
1705
1706
        // Store sequence of exactly 32 blocks
        let seq_32: Vec<u64> = (1..=32).collect();
        index.apply_event(make_store_event(0, &seq_32)).await;
1707

1708
1709
1710
        // Store sequence of exactly 64 blocks (2x jump_size)
        let seq_64: Vec<u64> = (1001..=1064).collect();
        index.apply_event(make_store_event(1, &seq_64)).await;
1711

1712
1713
1714
        // Store sequence of exactly 96 blocks (3x jump_size)
        let seq_96: Vec<u64> = (2001..=2096).collect();
        index.apply_event(make_store_event(2, &seq_96)).await;
1715

1716
        flush_and_settle(index.as_ref()).await;
1717

1718
1719
1720
1721
1722
1723
1724
        // Verify all sequences match correctly
        let query_32: Vec<LocalBlockHash> = seq_32.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query_32).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            32
        );
1725

1726
1727
1728
1729
1730
1731
        let query_64: Vec<LocalBlockHash> = seq_64.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query_64).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(),
            64
        );
1732

1733
1734
1735
1736
1737
1738
1739
        let query_96: Vec<LocalBlockHash> = seq_96.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query_96).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(2, 0)).unwrap(),
            96
        );
    }
1740

1741
1742
1743
1744
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_off_by_one_jump_boundaries(variant: &str) {
        let index = make_indexer(variant);
1745

1746
1747
1748
1749
1750
        // Test sequences at jump_size +/- 1 boundaries to catch off-by-one errors
        let seq_31: Vec<u64> = (1..=31).collect();
        let seq_33: Vec<u64> = (101..=133).collect();
        let seq_63: Vec<u64> = (201..=263).collect();
        let seq_65: Vec<u64> = (301..=365).collect();
1751

1752
1753
1754
1755
        index.apply_event(make_store_event(0, &seq_31)).await;
        index.apply_event(make_store_event(1, &seq_33)).await;
        index.apply_event(make_store_event(2, &seq_63)).await;
        index.apply_event(make_store_event(3, &seq_65)).await;
1756

1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
        flush_and_settle(index.as_ref()).await;

        // Verify all sequences match correctly
        let query_31: Vec<LocalBlockHash> = seq_31.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query_31).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            31
        );

        let query_33: Vec<LocalBlockHash> = seq_33.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query_33).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(),
            33
        );

        let query_63: Vec<LocalBlockHash> = seq_63.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query_63).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(2, 0)).unwrap(),
            63
        );

        let query_65: Vec<LocalBlockHash> = seq_65.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query_65).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(3, 0)).unwrap(),
            65
        );
    }

    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_divergence_at_jump_boundaries(variant: &str) {
        let index = make_indexer(variant);

        // Store a long sequence
        let sequence: Vec<u64> = (1..=128).collect();
        index.apply_event(make_store_event(0, &sequence)).await;

        flush_and_settle(index.as_ref()).await;

        // Test divergence exactly at jump boundaries (position 31, 32, 33, 63, 64, 65)
        for diverge_pos in [31usize, 32, 33, 63, 64, 65, 95, 96, 97] {
            let mut query: Vec<LocalBlockHash> = (1..=128).map(LocalBlockHash).collect();
            query[diverge_pos] = LocalBlockHash(99999);

            let scores = index.find_matches(query).await.unwrap();
            assert_eq!(
                *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
                diverge_pos as u32,
                "Divergence at position {} should match {} blocks",
                diverge_pos,
                diverge_pos
            );
        }
    }

    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_deep_continuation_chain(variant: &str) {
        let index = make_indexer(variant);

        // Build a very long sequence through many small continuations
        // This tests the parent_hash chain handling
        let chunk_size = 10;
        let num_chunks = 20; // Total 200 blocks

        let mut full_prefix: Vec<u64> = Vec::new();

        for chunk_idx in 0..num_chunks {
            let chunk_start = chunk_idx * chunk_size + 1;
            let chunk: Vec<u64> = (chunk_start..chunk_start + chunk_size)
                .map(|x| x as u64)
                .collect();

            if chunk_idx == 0 {
                index.apply_event(make_store_event(0, &chunk)).await;
            } else {
                index
                    .apply_event(make_store_event_with_parent(0, &full_prefix, &chunk))
                    .await;
            }

            full_prefix.extend(&chunk);
        }

        flush_and_settle(index.as_ref()).await;

        // Query full sequence
        let full_query: Vec<LocalBlockHash> = (1..=200).map(LocalBlockHash).collect();
        let scores = index.find_matches(full_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            200
        );

        // Query partial prefix crossing multiple chunk boundaries
        let partial_query: Vec<LocalBlockHash> = (1..=75).map(LocalBlockHash).collect();
        let scores = index.find_matches(partial_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            75
        );
    }

    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_clear_and_rebuild(variant: &str) {
        let index = make_indexer(variant);

        // Store a long sequence
        let sequence: Vec<u64> = (1..=100).collect();
        index.apply_event(make_store_event(0, &sequence)).await;

        flush_and_settle(index.as_ref()).await;

        // Verify it's stored
        let query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query.clone()).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            100
        );

        // Clear the worker
        index.apply_event(make_clear_event(0)).await;

        flush_and_settle(index.as_ref()).await;

        // Verify it's cleared
        let scores = index.find_matches(query.clone()).await.unwrap();
        assert!(scores.scores.is_empty());

        // Rebuild with a different sequence
        let new_sequence: Vec<u64> = (1001..=1100).collect();
        index.apply_event(make_store_event(0, &new_sequence)).await;

        flush_and_settle(index.as_ref()).await;

        // Verify new sequence works
        let new_query: Vec<LocalBlockHash> =
            new_sequence.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(new_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            100
        );

        // Verify old sequence no longer matches
        let scores = index.find_matches(query).await.unwrap();
        assert!(scores.scores.is_empty());
    }

    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_multiple_workers_diverging(variant: &str) {
        let index = make_indexer(variant);

        // Multiple workers with long sequences that share a prefix then diverge
        // This tests precise drain point tracking across workers

        // All workers share prefix 1-40
        let shared_prefix: Vec<u64> = (1..=40).collect();

        // Worker 0: prefix + 41-100 (stores full sequence 1-100)
        let worker_0_full: Vec<u64> = (1..=100).collect();

        // Worker 1: prefix + 141-180 (diverges at block 41)
        let worker_1_suffix: Vec<u64> = (141..=180).collect();
1928

1929
1930
        // Worker 2: prefix + 241-300 (diverges at block 41)
        let worker_2_suffix: Vec<u64> = (241..=300).collect();
1931

1932
1933
        // Store for all workers
        index.apply_event(make_store_event(0, &worker_0_full)).await;
1934

1935
        index.apply_event(make_store_event(1, &shared_prefix)).await;
1936
        index
1937
1938
1939
1940
1941
            .apply_event(make_store_event_with_parent(
                1,
                &shared_prefix,
                &worker_1_suffix,
            ))
1942
            .await;
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970

        index.apply_event(make_store_event(2, &shared_prefix)).await;
        index
            .apply_event(make_store_event_with_parent(
                2,
                &shared_prefix,
                &worker_2_suffix,
            ))
            .await;

        flush_and_settle(index.as_ref()).await;

        // Query 1-100 - worker 0 matches 100, workers 1&2 match 40
        let query: Vec<LocalBlockHash> = worker_0_full.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(query).await.unwrap();

        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            100
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(),
            40
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(2, 0)).unwrap(),
            40
        );
1971
1972
    }

1973
1974
1975
1976
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_long_sequence_staggered_lengths(variant: &str) {
        let index = make_indexer(variant);
1977

1978
1979
1980
1981
1982
1983
        // Workers with sequences of staggered lengths to test drain tracking
        // Worker 0: 10 blocks
        // Worker 1: 20 blocks
        // Worker 2: 35 blocks (just past first jump)
        // Worker 3: 64 blocks (exactly 2 jumps)
        // Worker 4: 100 blocks
1984

1985
1986
1987
1988
1989
1990
        for (worker_id, len) in [(0, 10), (1, 20), (2, 35), (3, 64), (4, 100)] {
            let sequence: Vec<u64> = (1..=len).collect();
            index
                .apply_event(make_store_event(worker_id, &sequence))
                .await;
        }
1991

1992
        flush_and_settle(index.as_ref()).await;
1993

1994
1995
1996
        // Query for 100 blocks - each worker should match their stored length
        let query: Vec<LocalBlockHash> = (1..=100).map(LocalBlockHash).collect();
        let scores = index.find_matches(query).await.unwrap();
1997

1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            10
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(),
            20
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(2, 0)).unwrap(),
            35
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(3, 0)).unwrap(),
            64
        );
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(4, 0)).unwrap(),
            100
        );
    }
2019

2020
2021
2022
2023
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_very_long_sequence(variant: &str) {
        let index = make_indexer(variant);
2024

2025
2026
2027
2028
        // Test with a very long sequence (1000 blocks)
        let seq_len = 1000u64;
        let sequence: Vec<u64> = (1..=seq_len).collect();
        index.apply_event(make_store_event(0, &sequence)).await;
2029

2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
        flush_and_settle(index.as_ref()).await;

        // Full match
        let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect();
        let scores = index.find_matches(full_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            seq_len as u32
        );

        // Partial match (first 500)
        let partial_query: Vec<LocalBlockHash> = (1..=500).map(LocalBlockHash).collect();
        let scores = index.find_matches(partial_query).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            500
        );

        // Divergence in the middle
        let mut mid_diverge: Vec<LocalBlockHash> = (1..=1000).map(LocalBlockHash).collect();
        mid_diverge[499] = LocalBlockHash(99999);
        let scores = index.find_matches(mid_diverge).await.unwrap();
        assert_eq!(
            *scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(),
            499
        );
    }
2057
2058
2059
}

// ============================================================================
// Tests specific to tree-based implementations with frequency/pruning support.
// These use features not available in PositionalIndexer.
// ============================================================================

#[template]
#[rstest]
2066
fn tree_indexer_template(#[values("single")] variant: &str) {}
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087

/// Builds the tree-based indexer named by `variant` with frequency tracking on.
///
/// `expiration` is passed through as the frequency expiration window. The
/// indexer uses a KV block size of 32, fresh unregistered metrics, and no
/// pruning config. Panics for any variant other than `"single"`.
fn make_tree_indexer_with_frequency(
    variant: &str,
    expiration: Duration,
) -> Box<dyn KvIndexerInterface> {
    if variant != "single" {
        panic!("Unknown variant: {}", variant);
    }

    let block_size = 32;
    Box::new(KvIndexer::new_with_frequency(
        CancellationToken::new(),
        Some(expiration),
        block_size,
        Arc::new(KvIndexerMetrics::new_unregistered()),
        None,
    ))
}

2088
#[tokio::test]
2089
async fn test_routing_decision_assigns_first_seen_worker() {
2090
2091
    let token = CancellationToken::new();
    let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
2092
    let index = KvIndexer::new_with_frequency(
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
        token,
        Some(Duration::from_secs(60)),
        32,
        metrics,
        Some(PruneConfig::default()),
    );
    let worker = WorkerWithDpRank::new(42, 0);
    let local_hashes = vec![LocalBlockHash(11), LocalBlockHash(22)];
    let sequence_hashes = compute_seq_hash_for_block(&local_hashes);

    index
        .process_routing_decision_with_hashes(worker, local_hashes.clone(), sequence_hashes)
        .await
        .unwrap();
    flush_and_settle(&index).await;

    assert_score(&index, &[11, 22], worker, 2).await;

    index.remove_worker(worker.worker_id).await;
    flush_and_settle(&index).await;

    let scores = query_scores(&index, &[11, 22]).await;
    assert!(!scores.scores.contains_key(&worker));
}

2118
2119
2120
mod tree_specific_tests {
    use super::*;
    use rstest_reuse::apply;
2121

2122
2123
2124
2125
    #[tokio::test]
    #[apply(tree_indexer_template)]
    async fn test_frequency(variant: &str) {
        const ONE_MILLIS: Duration = Duration::from_millis(1);
2126

2127
2128
        let expiration = Duration::from_millis(50);
        let kv_indexer = make_tree_indexer_with_frequency(variant, expiration);
2129

2130
2131
2132
2133
2134
2135
2136
        // The blocks
        let block_hashes = vec![
            LocalBlockHash(1),
            LocalBlockHash(2),
            LocalBlockHash(3),
            LocalBlockHash(4),
        ];
2137

2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
        let overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
        assert_eq!(
            overlap.frequencies.len(),
            0,
            "Should be no cached blocks yet"
        );

        // Blocks go in cache
        let event = make_store_event(0, &[1, 2, 3, 4]);
        kv_indexer.apply_event(event).await;

        // First access - poll briefly since store event is applied async
        let mut overlap = OverlapScores::default();
        let timeout = Duration::from_millis(10);
        let start = Instant::now();
        while overlap.scores.is_empty() && Instant::now().duration_since(start) < timeout {
            time::sleep(ONE_MILLIS).await;
            overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
        }
        assert_eq!(
            overlap.scores.len(),
            1,
            "One worker has these blocks cached"
        );
        assert_eq!(
            overlap.frequencies.len(),
            0,
            "Blocks have not previously been accessed"
        );
2167

2168
2169
2170
2171
2172
2173
2174
2175
        // Second access
        let overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
        assert_eq!(overlap.scores.len(), 1, "Still one worker matches");
        assert_eq!(
            overlap.frequencies,
            vec![1, 1, 1, 1],
            "We should see the first access now"
        );
2176

2177
2178
        // Let those two accesses expire
        time::sleep(expiration + Duration::from_millis(10)).await;
2179

2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
        // New first access
        let overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
        assert_eq!(
            overlap.frequencies.len(),
            0,
            "Blocks were accessed too long ago"
        );

        // New second access
        let _ = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();

        // Access only the first three blocks
        let overlap = kv_indexer
            .find_matches(block_hashes[0..3].to_vec())
            .await
            .unwrap();
        // We see the previous two new accesses
        assert_eq!(overlap.frequencies, vec![2, 2, 2]);

        // The third access did not touch the last block
        let overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
        assert_eq!(overlap.frequencies, vec![3, 3, 3, 2]);
    }
2203
2204
2205
2206
2207
2208
}

// ============================================================================
// KvIndexerMetrics tests
// ============================================================================

#[cfg(feature = "metrics")]
mod metrics_tests {
    use super::*;

    // Each increment on a fresh, unregistered metrics instance should bump exactly
    // the counter labeled with its event type and outcome (or warning kind) to 1.
    #[test]
    fn test_increment_event_applied() {
        let metrics = KvIndexerMetrics::new_unregistered();

        metrics.increment_event_applied(METRIC_EVENT_STORED, Ok(()));
        assert_eq!(
            metrics
                .kv_cache_events_applied
                .get_metric_with_label_values(&[METRIC_EVENT_STORED, METRIC_STATUS_OK])
                .unwrap()
                .get(),
            1
        );

        metrics.increment_event_applied(
            METRIC_EVENT_STORED,
            Err(KvCacheEventError::ParentBlockNotFound),
        );
        assert_eq!(
            metrics
                .kv_cache_events_applied
                .get_metric_with_label_values(&[
                    METRIC_EVENT_STORED,
                    METRIC_STATUS_PARENT_NOT_FOUND
                ])
                .unwrap()
                .get(),
            1
        );

        metrics
            .increment_event_applied(METRIC_EVENT_REMOVED, Err(KvCacheEventError::BlockNotFound));
        assert_eq!(
            metrics
                .kv_cache_events_applied
                .get_metric_with_label_values(&[
                    METRIC_EVENT_REMOVED,
                    METRIC_STATUS_BLOCK_NOT_FOUND
                ])
                .unwrap()
                .get(),
            1
        );

        metrics.increment_event_warning(METRIC_WARNING_DUPLICATE_STORE);
        assert_eq!(
            metrics
                .kv_cache_event_warnings
                .get_metric_with_label_values(&[METRIC_WARNING_DUPLICATE_STORE])
                .unwrap()
                .get(),
            1
        );
    }
}

// ============================================================================
// LocalKvIndexer tests
// ============================================================================

/// Create a `LocalKvIndexer` (event buffer capacity 32) whose event buffer is
/// pre-filled with one `Cleared` event per entry of `ids`, in order.
///
/// Every seeded event targets worker 0 at dp_rank 0. The events are pushed directly
/// into `event_buffer`; they are not sent through `apply_event_with_buffer`.
fn make_local_indexer_with_events(ids: &[u64]) -> LocalKvIndexer {
    let indexer = LocalKvIndexer::new(
        CancellationToken::new(),
        4,
        Arc::new(KvIndexerMetrics::new_unregistered()),
        32,
    );

    let cleared_event = |event_id: u64| {
        RouterEvent::new(
            0,
            KvCacheEvent {
                event_id,
                data: KvCacheEventData::Cleared,
                dp_rank: 0,
            },
        )
    };

    let mut guard = indexer.event_buffer.lock().unwrap();
    for event_id in ids.iter().copied() {
        guard.push_back(cleared_event(event_id));
    }
    // Release the buffer lock before handing the indexer back to the caller.
    drop(guard);

    indexer
}

2297
2298
2299
2300
mod local_indexer_tests {
    use super::*;
    use rstest_reuse::apply;

2301
2302
2303
2304
2305
2306
2307
    fn make_local_store_event(event_id: u64, block_hash: u64) -> RouterEvent {
        RouterEvent::new(
            0,
            KvCacheEvent {
                event_id,
                data: KvCacheEventData::Stored(KvCacheStoreData {
                    parent_hash: None,
2308
                    start_position: None,
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
                    blocks: vec![KvCacheStoredBlockData {
                        block_hash: ExternalSequenceBlockHash(block_hash),
                        tokens_hash: LocalBlockHash(block_hash),
                        mm_extra_info: None,
                    }],
                }),
                dp_rank: 0,
            },
        )
    }

    fn make_local_remove_event(event_id: u64, block_hashes: &[u64]) -> RouterEvent {
        RouterEvent::new(
            0,
            KvCacheEvent {
                event_id,
                data: KvCacheEventData::Removed(KvCacheRemoveData {
                    block_hashes: block_hashes
                        .iter()
                        .copied()
                        .map(ExternalSequenceBlockHash)
                        .collect(),
                }),
                dp_rank: 0,
            },
        )
    }

    fn make_local_clear_event(event_id: u64) -> RouterEvent {
        RouterEvent::new(
            0,
            KvCacheEvent {
                event_id,
                data: KvCacheEventData::Cleared,
                dp_rank: 0,
            },
        )
    }

2348
2349
2350
2351
2352
2353
2354
    #[tokio::test]
    async fn test_local_indexer_slice_within_range() {
        let indexer = make_local_indexer_with_events(&[1, 2, 3, 4, 5]);

        // Helper to extract events from response
        let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> {
            match resp {
2355
                WorkerKvQueryResponse::Events { events: e, .. } => e,
2356
2357
2358
2359
2360
                WorkerKvQueryResponse::TreeDump { events: e, .. } => e,
                _ => panic!("Unexpected response type"),
            }
        };

2361
2362
2363
2364
2365
2366
2367
2368
        let extract_last_event_id = |resp: &WorkerKvQueryResponse| -> Option<u64> {
            match resp {
                WorkerKvQueryResponse::Events { last_event_id, .. } => Some(*last_event_id),
                WorkerKvQueryResponse::TreeDump { last_event_id, .. } => Some(*last_event_id),
                _ => None,
            }
        };

2369
2370
2371
2372
2373
        let get_ids = |events: Vec<RouterEvent>| -> Vec<u64> {
            events.iter().map(|e| e.event.event_id).collect()
        };

        // Test get_events_in_id_range (buffer queries)
2374
        // Buffer hits now return the contiguous suffix through the buffered tail.
2375
        let result = indexer.get_events_in_id_range(Some(2), Some(4)).await;
2376
2377
2378
        let ids = get_ids(extract_events(result.clone()));
        assert_eq!(ids, vec![2, 3, 4, 5]);
        assert_eq!(extract_last_event_id(&result), Some(5));
2379
2380

        let result = indexer.get_events_in_id_range(Some(2), Some(6)).await;
2381
        let ids = get_ids(extract_events(result.clone()));
2382
        assert_eq!(ids, vec![2, 3, 4, 5]); // clamp end to buffer max
2383
        assert_eq!(extract_last_event_id(&result), Some(5));
2384
2385
2386
2387
2388
2389

        // start_id=0 is before buffer (first is 1), so should trigger tree dump
        let result = indexer.get_events_in_id_range(Some(0), Some(4)).await;
        assert!(matches!(result, WorkerKvQueryResponse::TreeDump { .. }));

        let result = indexer.get_events_in_id_range(Some(3), Some(3)).await;
2390
2391
2392
        let ids = get_ids(extract_events(result.clone()));
        assert_eq!(ids, vec![3, 4, 5]);
        assert_eq!(extract_last_event_id(&result), Some(5));
2393
2394
2395
2396
2397

        // Invalid range: end < start
        let result = indexer.get_events_in_id_range(Some(5), Some(2)).await;
        assert!(matches!(result, WorkerKvQueryResponse::InvalidRange { .. }));
    }
2398

2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
    #[tokio::test]
    async fn test_local_indexer_get_events_in_id_range_all_cases() {
        // Create indexer with small buffer (5 events max)
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );

        // Helper to create a test event
        let make_event = |id: u64| {
            RouterEvent::new(
                0,
                KvCacheEvent {
                    event_id: id,
                    data: KvCacheEventData::Stored(KvCacheStoreData {
                        parent_hash: None,
2417
                        start_position: None,
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
                        blocks: vec![KvCacheStoredBlockData {
                            block_hash: ExternalSequenceBlockHash(id * 100),
                            tokens_hash: LocalBlockHash(id * 200),
                            mm_extra_info: None,
                        }],
                    }),
                    dp_rank: 0,
                },
            )
        };

        // Add 10 events (IDs 5-14), buffer keeps last 5: events 10-14
        for id in 5..15 {
            indexer
                .apply_event_with_buffer(make_event(id))
                .await
                .unwrap();
2435
2436
        }

2437
2438
        // Wait for events to be processed
        indexer.flush().await;
2439

2440
2441
        let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> {
            match resp {
2442
                WorkerKvQueryResponse::Events { events: e, .. } => e,
2443
2444
2445
2446
                WorkerKvQueryResponse::TreeDump { events: e, .. } => e,
                _ => panic!("Unexpected response type: {:?}", resp),
            }
        };
2447

2448
2449
2450
2451
2452
2453
2454
2455
        let extract_last_event_id = |resp: &WorkerKvQueryResponse| -> Option<u64> {
            match resp {
                WorkerKvQueryResponse::Events { last_event_id, .. } => Some(*last_event_id),
                WorkerKvQueryResponse::TreeDump { last_event_id, .. } => Some(*last_event_id),
                _ => None,
            }
        };

2456
2457
2458
        let get_ids = |events: Vec<RouterEvent>| -> Vec<u64> {
            events.iter().map(|e| e.event.event_id).collect()
        };
2459

2460
2461
2462
        // Verify buffer state
        let buffer_events = indexer.get_all_events_in_buffer();
        assert_eq!(get_ids(buffer_events), vec![10, 11, 12, 13, 14]);
2463

2464
2465
        // Buffer path tests
        let result = indexer.get_events_in_id_range(Some(11), None).await;
2466
2467
2468
2469
2470
        assert_eq!(
            get_ids(extract_events(result.clone())),
            vec![11, 12, 13, 14]
        );
        assert_eq!(extract_last_event_id(&result), Some(14));
2471

2472
        let result = indexer.get_events_in_id_range(Some(10), Some(14)).await;
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
        assert_eq!(
            get_ids(extract_events(result.clone())),
            vec![10, 11, 12, 13, 14]
        );
        assert_eq!(extract_last_event_id(&result), Some(14));

        let result = indexer.get_events_in_id_range(Some(11), Some(12)).await;
        assert_eq!(
            get_ids(extract_events(result.clone())),
            vec![11, 12, 13, 14]
        );
        assert_eq!(extract_last_event_id(&result), Some(14));
2485

2486
2487
2488
2489
        // Tree dump path tests
        let result = indexer.get_events_in_id_range(None, None).await;
        assert!(matches!(&result, WorkerKvQueryResponse::TreeDump { .. }));
        assert_eq!(extract_events(result).len(), 10);
2490

2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
        let result = indexer.get_events_in_id_range(Some(7), None).await;
        assert!(matches!(result, WorkerKvQueryResponse::TreeDump { .. }));

        // Edge cases
        let result = indexer.get_events_in_id_range(Some(15), Some(10)).await;
        assert!(matches!(result, WorkerKvQueryResponse::InvalidRange { .. }));

        let result = indexer.get_events_in_id_range(Some(100), Some(200)).await;
        assert!(matches!(result, WorkerKvQueryResponse::TooNew { .. }));
    }

    #[tokio::test]
    async fn test_tree_dump_includes_last_event_id() {
        // Create indexer with small buffer (5 events max)
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );

        let make_event = |id: u64| {
            RouterEvent::new(
                0,
                KvCacheEvent {
                    event_id: id,
                    data: KvCacheEventData::Stored(KvCacheStoreData {
                        parent_hash: None,
2519
                        start_position: None,
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
                        blocks: vec![KvCacheStoredBlockData {
                            block_hash: ExternalSequenceBlockHash(id * 100),
                            tokens_hash: LocalBlockHash(id * 200),
                            mm_extra_info: None,
                        }],
                    }),
                    dp_rank: 0,
                },
            )
        };

        // Add 10 events (IDs 5-14), buffer keeps last 5: events 10-14
        for id in 5..15 {
            indexer
                .apply_event_with_buffer(make_event(id))
                .await
                .unwrap();
        }
        indexer.flush().await;

        // Request with start_id=None -> tree dump should include last_event_id=14
        let result = indexer.get_events_in_id_range(None, None).await;
        match result {
            WorkerKvQueryResponse::TreeDump {
                last_event_id,
                events,
            } => {
                assert_eq!(
                    last_event_id, 14,
                    "last_event_id should be the buffer's newest event ID"
                );
                assert!(!events.is_empty(), "tree dump should contain events");
            }
            other => panic!("Expected TreeDump, got: {other:?}"),
        }

        // Request with start_id older than buffer -> tree dump should include last_event_id=14
        let result = indexer.get_events_in_id_range(Some(7), None).await;
        match result {
            WorkerKvQueryResponse::TreeDump {
                last_event_id,
                events,
            } => {
                assert_eq!(
                    last_event_id, 14,
                    "last_event_id should be the buffer's newest event ID"
                );
                assert!(!events.is_empty(), "tree dump should contain events");
            }
            other => panic!("Expected TreeDump, got: {other:?}"),
        }

        // Empty buffer case: create a fresh indexer with no events
        let empty_indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );
        let result = empty_indexer.get_events_in_id_range(None, None).await;
        match result {
            WorkerKvQueryResponse::TreeDump {
                last_event_id,
                events,
            } => {
                assert_eq!(
                    last_event_id, 0,
                    "empty buffer should return last_event_id=0"
                );
                assert!(events.is_empty(), "empty indexer should have no events");
            }
            other => panic!("Expected TreeDump, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_local_indexer_buffer_and_serialization() {
        let worker_id = 42u64;
        let token = CancellationToken::new();
        let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
        let local_indexer = Arc::new(LocalKvIndexer::new(token, 4, metrics, 100));

        let test_event = RouterEvent::new(
            worker_id,
2604
            KvCacheEvent {
2605
                event_id: 1,
2606
2607
                data: KvCacheEventData::Stored(KvCacheStoreData {
                    parent_hash: None,
2608
                    start_position: None,
2609
                    blocks: vec![KvCacheStoredBlockData {
2610
2611
                        block_hash: ExternalSequenceBlockHash(100),
                        tokens_hash: LocalBlockHash(200),
2612
2613
2614
2615
2616
                        mm_extra_info: None,
                    }],
                }),
                dp_rank: 0,
            },
2617
        );
2618

2619
2620
        local_indexer
            .apply_event_with_buffer(test_event)
2621
2622
2623
            .await
            .unwrap();

2624
        local_indexer.flush().await;
2625

2626
2627
2628
        let buffered_events = local_indexer.get_all_events_in_buffer();
        assert_eq!(buffered_events.len(), 1);
        assert_eq!(buffered_events[0].worker_id, worker_id);
2629

2630
        // Test serialization round-trip
2631
2632
2633
2634
        let response = WorkerKvQueryResponse::Events {
            events: buffered_events,
            last_event_id: 1,
        };
2635
2636
        let serialized = serde_json::to_vec(&response).unwrap();
        let deserialized: WorkerKvQueryResponse = serde_json::from_slice(&serialized).unwrap();
2637

2638
2639
2640
2641
2642
        let (events, last_event_id) = match deserialized {
            WorkerKvQueryResponse::Events {
                events,
                last_event_id,
            } => (events, last_event_id),
2643
2644
2645
2646
            _ => panic!("Expected Events variant"),
        };
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].worker_id, worker_id);
2647
        assert_eq!(last_event_id, 1);
2648
    }
2649

2650
2651
2652
2653
2654
2655
2656
2657
    #[tokio::test]
    async fn test_local_indexer_does_not_buffer_failed_send() {
        let local_indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );
2658

2659
2660
        let test_event = RouterEvent::new(
            7,
2661
            KvCacheEvent {
2662
                event_id: 1,
2663
2664
                data: KvCacheEventData::Stored(KvCacheStoreData {
                    parent_hash: None,
2665
                    start_position: None,
2666
                    blocks: vec![KvCacheStoredBlockData {
2667
2668
                        block_hash: ExternalSequenceBlockHash(100),
                        tokens_hash: LocalBlockHash(200),
2669
2670
2671
2672
2673
                        mm_extra_info: None,
                    }],
                }),
                dp_rank: 0,
            },
2674
        );
2675

2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
        let event_tx = local_indexer.event_sender();
        local_indexer.shutdown();
        event_tx.closed().await;

        let result = local_indexer.apply_event_with_buffer(test_event).await;
        assert!(matches!(result, Err(KvRouterError::IndexerOffline)));
        assert_eq!(local_indexer.buffer_len(), 0);

        match local_indexer.get_events_in_id_range(None, None).await {
            WorkerKvQueryResponse::TreeDump {
                events,
                last_event_id,
            } => {
                assert!(events.is_empty());
                assert_eq!(last_event_id, 0);
            }
            other => panic!("Expected TreeDump, got: {other:?}"),
2693
2694
        }
    }
2695

2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
    #[tokio::test]
    async fn test_local_indexer_remove_worker_dp_rank_only_clears_target_rank() {
        let local_indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );

        local_indexer
            .apply_event_with_buffer(make_store_event_with_dp_rank(7, &[101], 0))
            .await
            .unwrap();
        local_indexer
            .apply_event_with_buffer(make_store_event_with_dp_rank(7, &[202], 1))
            .await
            .unwrap();
        local_indexer.flush().await;

        local_indexer.remove_worker_dp_rank(7, 0).await;
        local_indexer.flush().await;

        let events = local_indexer.dump_events().await.unwrap();
        let mut rank0 = events
            .iter()
            .filter(|event| event.worker_id == 7 && event.event.dp_rank == 0)
            .collect::<Vec<_>>();
        let mut rank1 = events
            .iter()
            .filter(|event| event.worker_id == 7 && event.event.dp_rank == 1)
            .collect::<Vec<_>>();
        rank0.sort_by_key(|event| event.event.event_id);
        rank1.sort_by_key(|event| event.event.event_id);

        assert!(rank0.is_empty());
        assert_eq!(rank1.len(), 1);
        assert!(matches!(
            &rank1[0].event.data,
            KvCacheEventData::Stored(data)
                if data.blocks.first().map(|block| block.block_hash.0) == Some(202)
        ));
    }

    #[tokio::test]
    async fn test_local_indexer_coalesces_concurrent_tree_dumps() {
        let indexer = Arc::new(LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        ));
        indexer.set_dump_build_delay(Some(Duration::from_millis(50)));

        let first = {
            let indexer = indexer.clone();
            tokio::spawn(async move { indexer.get_events_in_id_range(None, None).await })
        };
        tokio::time::sleep(Duration::from_millis(10)).await;
        let second = {
            let indexer = indexer.clone();
            tokio::spawn(async move { indexer.get_events_in_id_range(None, None).await })
        };

        let first = first.await.unwrap();
        let second = second.await.unwrap();

        assert!(matches!(first, WorkerKvQueryResponse::TreeDump { .. }));
        assert!(matches!(second, WorkerKvQueryResponse::TreeDump { .. }));
        assert_eq!(indexer.dump_build_count(), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn test_local_indexer_reuses_cached_tree_dump_without_time_expiry() {
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );
        indexer
            .apply_event_with_buffer(make_local_store_event(1, 101))
            .await
            .unwrap();
        indexer.flush().await;

        let first = indexer.get_events_in_id_range(None, None).await;
        time::advance(Duration::from_secs(60)).await;
        let second = indexer.get_events_in_id_range(None, None).await;

        assert!(matches!(first, WorkerKvQueryResponse::TreeDump { .. }));
        assert!(matches!(second, WorkerKvQueryResponse::TreeDump { .. }));
        assert_eq!(indexer.dump_build_count(), 1);
    }

    #[tokio::test]
    async fn test_local_indexer_rebuilds_when_cumulative_append_budget_exceeded() {
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );
        indexer
            .apply_event_with_buffer(make_local_store_event(1, 101))
            .await
            .unwrap();
        indexer.flush().await;

        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 1);

        indexer
            .apply_event_with_buffer(make_local_store_event(2, 202))
            .await
            .unwrap();
        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 1);

        indexer
            .apply_event_with_buffer(make_local_store_event(3, 303))
            .await
            .unwrap();
        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 1);

        indexer
            .apply_event_with_buffer(make_local_store_event(4, 404))
            .await
            .unwrap();
        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 2);
    }

    #[tokio::test]
    async fn test_local_indexer_appends_safe_tail_to_cached_dump() {
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );
        indexer
            .apply_event_with_buffer(make_local_store_event(1, 101))
            .await
            .unwrap();
        indexer.flush().await;

        let first = indexer.get_events_in_id_range(None, None).await;
        assert!(matches!(first, WorkerKvQueryResponse::TreeDump { .. }));
        assert_eq!(indexer.dump_build_count(), 1);

        indexer
            .apply_event_with_buffer(make_local_remove_event(2, &[101]))
            .await
            .unwrap();

        match indexer.get_events_in_id_range(None, None).await {
            WorkerKvQueryResponse::TreeDump {
                events,
                last_event_id,
            } => {
                assert_eq!(last_event_id, 2);
                assert!(events.iter().any(|event| event.event.event_id == 2));
                assert!(
                    events
                        .iter()
                        .any(|event| matches!(event.event.data, KvCacheEventData::Removed(_)))
                );
            }
            other => panic!("Expected TreeDump, got: {other:?}"),
        }
        assert_eq!(indexer.dump_build_count(), 1);
    }

    #[tokio::test]
    async fn test_local_indexer_invalidates_cache_on_clear() {
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );
        indexer
            .apply_event_with_buffer(make_local_store_event(1, 101))
            .await
            .unwrap();
        indexer.flush().await;

        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 1);

        indexer
            .apply_event_with_buffer(make_local_clear_event(2))
            .await
            .unwrap();

        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 2);
    }

    #[tokio::test]
    async fn test_local_indexer_invalidates_cache_on_event_gap() {
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );
        indexer
            .apply_event_with_buffer(make_local_store_event(1, 101))
            .await
            .unwrap();
        indexer.flush().await;

        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 1);

        indexer
            .apply_event_with_buffer(make_local_store_event(3, 303))
            .await
            .unwrap();

        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 2);
    }

    #[tokio::test]
    async fn test_local_indexer_invalidates_cache_on_missing_tail_coverage() {
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            1,
        );
        indexer
            .apply_event_with_buffer(make_local_store_event(1, 101))
            .await
            .unwrap();
        indexer.flush().await;

        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 1);

        indexer
            .apply_event_with_buffer(make_local_store_event(2, 202))
            .await
            .unwrap();
        indexer
            .apply_event_with_buffer(make_local_store_event(3, 303))
            .await
            .unwrap();

        let _ = indexer.get_events_in_id_range(None, None).await;
        assert_eq!(indexer.dump_build_count(), 2);
    }

    #[tokio::test]
    async fn test_local_indexer_failed_dump_is_not_cached() {
        let indexer = LocalKvIndexer::new(
            CancellationToken::new(),
            4,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            5,
        );

        let dump_tx = indexer.snapshot_event_sender();
        indexer.shutdown();
        dump_tx.closed().await;

        let _ = indexer.get_events_in_id_range(None, None).await;
        let _ = indexer.get_events_in_id_range(None, None).await;

        assert_eq!(indexer.dump_build_count(), 2);
    }

2971
2972
2973
2974
    #[tokio::test]
    #[apply(indexer_template)]
    async fn test_apply_events_idempotent(variant: &str) {
        let index = make_indexer(variant);
2975

2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
        // Setup: build initial tree
        index.apply_event(make_store_event(0, &[1, 2, 3])).await;
        index.apply_event(make_store_event(1, &[4, 5, 6])).await;
        index
            .apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[7, 8]))
            .await;
        flush_and_settle(index.as_ref()).await;
        let s0 = snapshot_tree(index.as_ref()).await;

        // Mutation events: each add paired with its remove
        let adds = [
            make_store_event(2, &[1, 2, 9]),
            make_store_event_with_parent(1, &[4, 5, 6], &[10, 11, 12]),
        ];
        let removes = [
            make_remove_event(2, &[1, 2, 9]),
            make_remove_event_with_parent(1, &[4, 5, 6], &[10, 11, 12]),
        ];

        // Phase 1: interleaved add/remove
        index.apply_event(adds[0].clone()).await;
        index.apply_event(removes[0].clone()).await;
        index.apply_event(adds[1].clone()).await;
        index.apply_event(removes[1].clone()).await;
        flush_and_settle(index.as_ref()).await;
        let s1 = snapshot_tree(index.as_ref()).await;
        assert_eq!(
            s0, s1,
            "Phase 1: interleaved add/remove should restore tree"
        );
3006

3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
        // Phase 2: same interleaved again (idempotence of the full cycle)
        index.apply_event(adds[0].clone()).await;
        index.apply_event(removes[0].clone()).await;
        index.apply_event(adds[1].clone()).await;
        index.apply_event(removes[1].clone()).await;
        flush_and_settle(index.as_ref()).await;
        let s2 = snapshot_tree(index.as_ref()).await;
        assert_eq!(s1, s2, "Phase 2: repeated cycle should be idempotent");

        // Phase 3: non-interleaved (all adds then all removes)
        index.apply_event(adds[0].clone()).await;
        index.apply_event(adds[1].clone()).await;
        index.apply_event(removes[0].clone()).await;
        index.apply_event(removes[1].clone()).await;
        flush_and_settle(index.as_ref()).await;
        let s3 = snapshot_tree(index.as_ref()).await;
        assert_eq!(
            s2, s3,
            "Phase 3: non-interleaved ordering should restore tree"
        );
3027
3028
    }
}