"vllm/vscode:/vscode.git/clone" did not exist on "0a74e9d0f2367cc121547aa8e21e13b04d4cad30"
remote_indexer.rs 2.33 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use futures::StreamExt;

use dynamo_runtime::{
    component::Component,
    pipeline::{ManyOut, RouterMode, SingleIn, network::egress::push_router::PushRouter},
};

use dynamo_kv_router::{
    indexer::{IndexerQueryRequest, IndexerQueryResponse, KV_INDEXER_QUERY_ENDPOINT},
    protocols::{LocalBlockHash, OverlapScores},
};

/// Client-side stand-in for a local KV indexer that delegates every lookup to a
/// standalone KV indexer service over the request plane.
///
/// Used by the frontend when `remote_indexer_component` is configured: rather than
/// keeping its own radix tree, it sends each `find_matches` query across the Dynamo
/// request plane to the standalone indexer and relays the answer.
pub struct RemoteIndexer {
    /// Model name stamped onto every outgoing [`IndexerQueryRequest`].
    model_name: String,
    /// Namespace name stamped onto every outgoing [`IndexerQueryRequest`].
    namespace: String,
    /// Router used to dispatch queries to the indexer's query endpoint.
    router: PushRouter<IndexerQueryRequest, IndexerQueryResponse>,
}

impl RemoteIndexer {
    pub async fn new(
        component: &Component,
        indexer_component_name: &str,
        model_name: String,
    ) -> Result<Self> {
        let namespace = component.namespace().name();
        let indexer_ns = component.namespace();
        let indexer_component = indexer_ns.component(indexer_component_name)?;
        let endpoint = indexer_component.endpoint(KV_INDEXER_QUERY_ENDPOINT);
        let client = endpoint.client().await?;
        let router =
            PushRouter::from_client_no_fault_detection(client, RouterMode::RoundRobin).await?;
        Ok(Self {
            router,
            model_name,
            namespace,
        })
    }

    pub async fn find_matches(&self, block_hashes: Vec<LocalBlockHash>) -> Result<OverlapScores> {
        let request = IndexerQueryRequest {
            model_name: self.model_name.clone(),
            namespace: self.namespace.clone(),
            block_hashes,
        };
        let mut stream: ManyOut<IndexerQueryResponse> =
            self.router.round_robin(SingleIn::new(request)).await?;

        match stream.next().await {
            Some(IndexerQueryResponse::Scores(scores)) => Ok(scores.into()),
            Some(IndexerQueryResponse::Error(msg)) => {
                Err(anyhow::anyhow!("Remote indexer error: {}", msg))
            }
            None => Err(anyhow::anyhow!("Remote indexer returned empty response")),
        }
    }
}