subscriber.rs 7.77 KB
Newer Older
1
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
// SPDX-License-Identifier: Apache-2.0

4
use std::collections::HashMap;
5

6
use crate::kv_router::{
7
    Indexer, KV_EVENT_SUBJECT, KvRouterConfig,
8
    protocols::{DpRank, RouterEvent, WorkerId},
9
    worker_query::WorkerQueryClient,
10
};
11
12
13
14
15
use anyhow::Result;
use dynamo_runtime::{
    component::Component, discovery::EventTransportKind, prelude::*,
    transports::event_plane::EventSubscriber,
};
16

17
/// Start a simplified background task for event consumption using the event plane.
18
19
20
///
/// This is used when local indexer mode is enabled. Unlike `start_kv_router_background`,
/// this function:
21
/// - Uses the event plane (NATS Core or ZMQ) instead of JetStream
22
23
24
25
/// - Does not support snapshots, purging, or durable consumers
/// - On worker Added: dumps worker's local indexer into router
/// - On worker Removed: removes worker from router indexer
///
26
27
28
/// This function first recovers state from all currently registered workers before
/// spawning the background task, ensuring the router is ready before returning.
///
29
/// This is appropriate when workers have local indexers enabled.
30
async fn start_kv_router_background_event_plane(
31
    component: Component,
Yan Ru Pei's avatar
Yan Ru Pei committed
32
    indexer: Indexer,
33
    transport_kind: EventTransportKind,
34
) -> Result<()> {
35
    let cancellation_token = component.drt().primary_token();
36
37
    // WorkerQueryClient handles its own discovery loop for lifecycle + initial recovery.
    // No blocking wait — recovery happens asynchronously as endpoints are discovered.
Yan Ru Pei's avatar
Yan Ru Pei committed
38
    let worker_query_client = WorkerQueryClient::spawn(component.clone(), indexer.clone()).await?;
39

40
41
42
43
44
45
46
47
48
49
    // Subscribe to KV events using the selected event plane transport
    let mut subscriber =
        EventSubscriber::for_component_with_transport(&component, KV_EVENT_SUBJECT, transport_kind)
            .await?
            .typed::<RouterEvent>();
    let kv_event_subject = format!(
        "namespace.{}.component.{}.{}",
        component.namespace().name(),
        component.name(),
        KV_EVENT_SUBJECT
50
51
    );

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
    match transport_kind {
        EventTransportKind::Nats => {
            tracing::info!(
                subject = %kv_event_subject,
                "KV Router using NATS Core subscription (local_indexer mode)"
            );
        }
        EventTransportKind::Zmq => {
            tracing::info!(
                subject = %kv_event_subject,
                "KV Router using ZMQ event plane subscription (local_indexer mode)"
            );
        }
    }

67
    tokio::spawn(async move {
68
69
70
        // Track last received event ID per (worker, dp_rank) for gap detection
        // Each dp_rank has its own monotonic event ID sequence
        let mut last_event_ids: HashMap<(WorkerId, DpRank), u64> = HashMap::new();
71
72
73
74
75
76

        loop {
            tokio::select! {
                biased;

                _ = cancellation_token.cancelled() => {
77
                    tracing::debug!("KV Router event plane background task received cancellation signal");
78
79
80
                    break;
                }

81
82
83
84
                // Handle event consumption from event plane subscription
                Some(result) = subscriber.next() => {
                    let (envelope, event) = match result {
                        Ok((envelope, event)) => (envelope, event),
85
                        Err(e) => {
86
                            tracing::warn!("Failed to receive RouterEvent from event plane: {e:?}");
87
88
89
90
91
                            continue;
                        }
                    };

                    let worker_id = event.worker_id;
92
                    let dp_rank = event.event.dp_rank;
93
                    let event_id = event.event.event_id;
94
                    let event_key = (worker_id, dp_rank);
95

96
97
98
99
100
101
                    tracing::trace!(
                        "Received event from publisher {} (seq {})",
                        envelope.publisher_id,
                        envelope.sequence
                    );

102
                    // Gap detection: check if event ID is monotonically increasing per (worker, dp_rank)
103
                    // Note: event_id <= last_id is duplicate/out-of-order, apply anyway (idempotent)
104
                    if let Some(&last_id) = last_event_ids.get(&event_key)
105
106
107
108
                        && event_id > last_id + 1
                    {
                        let gap_start = last_id + 1;
                        let gap_end = event_id - 1;
109
                        let gap_size = gap_end - gap_start + 1;
110
                        tracing::warn!(
111
                            "Event ID gap detected for worker {worker_id} dp_rank {dp_rank}, recovering events [{gap_start}, {gap_end}], gap_size: {gap_size}"
112
113
                        );

114
                        if let Err(e) = worker_query_client
115
                            .recover_from_worker(worker_id, dp_rank, Some(gap_start), Some(gap_end))
116
117
                            .await
                        {
118
                            tracing::error!(
119
                                "Failed to recover gap events for worker {worker_id} dp_rank {dp_rank} (gap_start: {gap_start}, gap_end: {gap_end}); proceeding with current event anyway: {e}"
120
121
122
123
124
125
                            );
                        }
                    }

                    // Update last seen event ID (use max to handle out-of-order)
                    last_event_ids
126
                        .entry(event_key)
127
128
129
130
                        .and_modify(|id| *id = (*id).max(event_id))
                        .or_insert(event_id);

                    // Forward the RouterEvent to the indexer
Yan Ru Pei's avatar
Yan Ru Pei committed
131
                    indexer.apply_event(event).await;
132
133
134
135
                }
            }
        }

136
        tracing::debug!("KV Router event plane background task exiting");
137
138
139
140
141
    });

    Ok(())
}

142
143
144
145
/// Helper to decide which subscriber (JetStream or Event Plane) to start based on config
pub async fn start_subscriber(
    component: Component,
    kv_router_config: &KvRouterConfig,
Yan Ru Pei's avatar
Yan Ru Pei committed
146
    indexer: Indexer,
147
148
149
150
151
152
153
) -> Result<()> {
    let transport_kind = EventTransportKind::from_env_or_default();

    // Start subscriber - durable_kv_events flag determines the mode:
    // - durable_kv_events=false (default): Use NATS Core / generic event plane (requires workers to have local_indexer enabled)
    // - durable_kv_events=true: Use JetStream for durability and multi-replica consistency
    if kv_router_config.durable_kv_events {
154
155
156
157
        tracing::warn!(
            "--durable-kv-events is deprecated and will be removed in a future release. \
             The event-plane subscriber (local_indexer mode) is now the recommended path."
        );
158
159
160
161
162
163
164
        if transport_kind == EventTransportKind::Zmq {
            tracing::warn!(
                "--durable-kv-events requires NATS, but ZMQ event plane is configured; falling back to JetStream anyway"
            );
        }
        tracing::info!("Using JetStream subscription (--durable-kv-events enabled)");

165
        let consumer_id = component.drt().discovery().instance_id().to_string();
166
167
168
169
170
171
172
        super::jetstream::start_kv_router_background(
            component,
            consumer_id,
            indexer,
            kv_router_config,
        )
        .await
173
174
175
176
177
178
179
180
181
182
183
184
185
186
    } else {
        if transport_kind == EventTransportKind::Zmq {
            if kv_router_config.router_snapshot_threshold.is_some()
                || kv_router_config.router_reset_states
            {
                tracing::warn!(
                    "ZMQ event plane does not support KV snapshots or state reset; ignoring snapshot/reset settings"
                );
            }
            tracing::info!("Using ZMQ event plane subscription (local_indexer mode)");
        } else {
            tracing::info!("Using NATS Core subscription (local_indexer mode)");
        }

187
        start_kv_router_background_event_plane(component, indexer, transport_kind).await
188
189
    }
}