indexer_standalone.rs 4.58 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use serde::{Deserialize, Serialize};

use dynamo_runtime::{
    component::Component,
    pipeline::{
        AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, async_trait,
        network::Ingress,
    },
    protocols::{annotated::Annotated, maybe_error::MaybeError},
    stream,
};

use crate::kv_router::{
    Indexer, KV_INDEXER_QUERY_ENDPOINT, KvRouterConfig,
    protocols::{
        BlockExtraInfo, LocalBlockHash, OverlapScores, RouterEvent, compute_block_hash_for_seq,
    },
    subscriber,
};

/// A query sent to the standalone indexer's `KV_INDEXER_QUERY_ENDPOINT`.
///
/// Each request is answered with exactly one [`IndexerQueryResponse`].
#[derive(Debug, Serialize, Deserialize)]
pub enum IndexerQueryRequest {
    /// Overlap lookup for block hashes the caller has already computed.
    FindMatchesHashed { block_hashes: Vec<LocalBlockHash> },
    /// Overlap lookup for raw tokens; the indexer hashes them with its own block size.
    FindMatchesTokens {
        /// Token ids to split into blocks and hash.
        tokens: Vec<u32>,
        /// Optional per-block extra info forwarded to the block hasher
        /// (presumably multimodal metadata — confirm against `BlockExtraInfo`).
        block_mm_infos: Option<Vec<Option<BlockExtraInfo>>>,
    },
    /// Request the indexer's current contents as a list of router events.
    DumpTree,
}

/// Reply to an [`IndexerQueryRequest`].
#[derive(Debug, Serialize, Deserialize)]
pub enum IndexerQueryResponse {
    /// Overlap scores for a `FindMatchesHashed` / `FindMatchesTokens` query.
    Matches(OverlapScores),
    /// Events produced by the indexer for a `DumpTree` query.
    TreeDump(Vec<RouterEvent>),
    /// In-band failure description (the server fills this with a Debug-formatted error).
    Error(String),
}

impl MaybeError for IndexerQueryResponse {
    fn from_err(err: Box<dyn std::error::Error + Send + Sync>) -> Self {
        IndexerQueryResponse::Error(err.to_string())
    }

    fn err(&self) -> Option<anyhow::Error> {
        match self {
            IndexerQueryResponse::Error(msg) => Some(anyhow::Error::msg(msg.clone())),
            _ => None,
        }
    }
}

/// Engine that answers [`IndexerQueryRequest`]s using an [`Indexer`].
struct IndexerQueryEngine {
    /// Indexer consulted for overlap lookups and event dumps.
    indexer: Indexer,
    /// Block size used when hashing token-based (`FindMatchesTokens`) queries.
    block_size: u32,
}

#[async_trait]
impl
    AsyncEngine<
        SingleIn<IndexerQueryRequest>,
        ManyOut<Annotated<IndexerQueryResponse>>,
        anyhow::Error,
    > for IndexerQueryEngine
{
    async fn generate(
        &self,
        request: SingleIn<IndexerQueryRequest>,
    ) -> Result<ManyOut<Annotated<IndexerQueryResponse>>> {
        let (request, ctx) = request.into_parts();

        if matches!(request, IndexerQueryRequest::DumpTree) {
            let response = match self.indexer.dump_events().await {
                Ok(events) => IndexerQueryResponse::TreeDump(events),
                Err(e) => IndexerQueryResponse::Error(format!("{e:?}")),
            };
            return Ok(ResponseStream::new(
                Box::pin(stream::iter(vec![Annotated::from_data(response)])),
                ctx.context(),
            ));
        }

        let block_hashes = match request {
            IndexerQueryRequest::FindMatchesHashed { block_hashes } => block_hashes,
            IndexerQueryRequest::FindMatchesTokens {
                tokens,
                block_mm_infos,
            } => compute_block_hash_for_seq(&tokens, self.block_size, block_mm_infos.as_deref()),
            IndexerQueryRequest::DumpTree => unreachable!(),
        };

        let response = match self.indexer.find_matches(block_hashes).await {
            Ok(scores) => IndexerQueryResponse::Matches(scores),
            Err(e) => IndexerQueryResponse::Error(format!("{e:?}")),
        };

        Ok(ResponseStream::new(
            Box::pin(stream::iter(vec![Annotated::from_data(response)])),
            ctx.context(),
        ))
    }
}

/// Registers the indexer query endpoint (`KV_INDEXER_QUERY_ENDPOINT`) on `component`.
///
/// Requests are handled by an [`IndexerQueryEngine`] wrapping `indexer`. `block_size` is
/// used to hash token-based queries. The future returned by the endpoint builder's
/// `start()` is awaited on a detached `tokio` task. If it resolves to an error, that
/// error is logged, not returned to the caller.
///
/// # Errors
///
/// Returns an error if the ingress handler cannot be created for the engine.
async fn start_indexer_query_endpoint(
    component: Component,
    indexer: Indexer,
    block_size: u32,
) -> Result<()> {
    let handler = Ingress::for_engine(std::sync::Arc::new(IndexerQueryEngine {
        indexer,
        block_size,
    }))?;

    let serve = component
        .endpoint(KV_INDEXER_QUERY_ENDPOINT)
        .endpoint_builder()
        .handler(handler)
        .graceful_shutdown(true)
        .start();

    // Detached: the caller regains control immediately; endpoint failures only reach the log.
    tokio::spawn(async move {
        if let Err(e) = serve.await {
            tracing::error!("Indexer query endpoint failed: {e:?}");
        }
    });

    Ok(())
}

/// Starts a standalone KV block indexer for `component`.
///
/// Creates an [`Indexer`] from `kv_router_config` and `block_size`, starts the KV event
/// subscriber with a clone of it, and then registers the query endpoint
/// (`KV_INDEXER_QUERY_ENDPOINT`) with another clone. The indexer is returned to the caller.
///
/// # Errors
///
/// Fails in three cases:
/// - `kv_router_config.durable_kv_events` is set, which is unsupported in standalone mode.
/// - Starting the subscriber fails.
/// - Starting the query endpoint fails.
pub async fn start_kv_block_indexer(
    component: &Component,
    kv_router_config: &KvRouterConfig,
    block_size: u32,
) -> Result<Indexer> {
    // Reject JetStream-backed durable events before any background work is started.
    anyhow::ensure!(
        !kv_router_config.durable_kv_events,
        "standalone indexer does not support durable_kv_events (JetStream): \
         consumer ID collisions, orphan cleanup conflicts, and snapshot/purge races \
         make it incompatible with an independent indexer"
    );

    let indexer = Indexer::new(component, kv_router_config, block_size);

    // The subscriber is started before the endpoint is exposed.
    subscriber::start_subscriber(component.clone(), kv_router_config, indexer.clone()).await?;
    start_indexer_query_endpoint(component.clone(), indexer.clone(), block_size).await?;

    tracing::info!(
        "Standalone KV indexer started with query endpoint '{KV_INDEXER_QUERY_ENDPOINT}'"
    );

    Ok(indexer)
}