// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::Result;
use tokio_util::sync::CancellationToken;

use crate::ConcurrentRadixTreeCompressed;
use crate::ThreadPoolIndexer;
use crate::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics};
use crate::protocols::{LocalBlockHash, OverlapScores, RouterEvent, WorkerId};

#[derive(Clone)]
pub enum Indexer {
    Single(KvIndexer),
17
    Concurrent(Arc<ThreadPoolIndexer<ConcurrentRadixTreeCompressed>>),
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
}

impl Indexer {
    pub async fn apply_event(&self, event: RouterEvent) {
        match self {
            Indexer::Single(idx) => idx.apply_event(event).await,
            Indexer::Concurrent(idx) => idx.apply_event(event).await,
        }
    }

    pub async fn remove_worker(&self, worker_id: WorkerId) {
        match self {
            Indexer::Single(idx) => idx.remove_worker(worker_id).await,
            Indexer::Concurrent(idx) => idx.remove_worker(worker_id).await,
        }
    }

35
36
37
38
39
40
41
    pub async fn remove_worker_dp_rank(&self, worker_id: WorkerId, dp_rank: u32) {
        match self {
            Indexer::Single(idx) => idx.remove_worker_dp_rank(worker_id, dp_rank).await,
            Indexer::Concurrent(idx) => idx.remove_worker_dp_rank(worker_id, dp_rank).await,
        }
    }

42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    pub async fn find_matches(&self, hashes: Vec<LocalBlockHash>) -> Result<OverlapScores> {
        match self {
            Indexer::Single(idx) => idx.find_matches(hashes).await.map_err(Into::into),
            Indexer::Concurrent(idx) => idx.find_matches(hashes).await.map_err(Into::into),
        }
    }

    pub async fn dump_events(&self) -> Result<Vec<RouterEvent>> {
        match self {
            Indexer::Single(idx) => idx.dump_events().await.map_err(Into::into),
            Indexer::Concurrent(idx) => idx.dump_events().await.map_err(Into::into),
        }
    }
}

/// Builds an [`Indexer`], choosing the backend from `num_threads`.
///
/// * `num_threads > 1` yields [`Indexer::Concurrent`]: a [`ThreadPoolIndexer`]
///   constructed from a new [`ConcurrentRadixTreeCompressed`], `num_threads`
///   and `block_size`.
/// * Otherwise (`num_threads` of 0 or 1) yields [`Indexer::Single`]: a
///   [`KvIndexer`] constructed with `block_size` and unregistered metrics.
pub fn create_indexer(block_size: u32, num_threads: usize) -> Indexer {
    if num_threads > 1 {
        Indexer::Concurrent(Arc::new(ThreadPoolIndexer::new(
            ConcurrentRadixTreeCompressed::new(),
            num_threads,
            block_size,
        )))
    } else {
        Indexer::Single(KvIndexer::new_with_frequency(
            // A fresh token that is not kept by this function, so the caller
            // receives no handle through which to cancel it.
            CancellationToken::new(),
            // NOTE(review): the two `None` arguments leave optional settings of
            // `new_with_frequency` unset; confirm their meaning against its signature.
            None,
            block_size,
            Arc::new(KvIndexerMetrics::new_unregistered()),
            None,
        ))
    }
}