"lib/kvbm/vscode:/vscode.git/clone" did not exist on "827b8c3e511295a53351b44b6af596c85f189454"
query_engine.rs 1.65 KB
Newer Older
1
2
3
4
5
6
7
8
9
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::Result;
use dynamo_runtime::pipeline::{
    AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, async_trait,
};
10
use dynamo_runtime::stream;
11

12
13
use crate::indexer::{IndexerQueryRequest, IndexerQueryResponse};
use crate::standalone_indexer::registry::{IndexerKey, WorkerRegistry};
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

pub struct IndexerQueryEngine {
    pub registry: Arc<WorkerRegistry>,
}

#[async_trait]
impl AsyncEngine<SingleIn<IndexerQueryRequest>, ManyOut<IndexerQueryResponse>, anyhow::Error>
    for IndexerQueryEngine
{
    async fn generate(
        &self,
        request: SingleIn<IndexerQueryRequest>,
    ) -> Result<ManyOut<IndexerQueryResponse>> {
        let (req, ctx) = request.into_parts();
        let key = IndexerKey {
            model_name: req.model_name.clone(),
            tenant_id: req.namespace.clone(),
        };

        let response = match self.registry.get_indexer(&key) {
34
            Some(entry) => match entry.indexer.find_matches(req.block_hashes).await {
35
                Ok(scores) => IndexerQueryResponse::Scores(scores.into()),
36
                Err(err) => IndexerQueryResponse::Error(err.to_string()),
37
38
39
40
41
42
43
            },
            None => IndexerQueryResponse::Error(format!(
                "no indexer for model={} namespace={}",
                req.model_name, req.namespace
            )),
        };

44
45
46
47
48
        let response_stream = stream::iter(vec![response]);
        Ok(ResponseStream::new(
            Box::pin(response_stream),
            ctx.context(),
        ))
49
50
    }
}