// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::collections::HashMap;
5

6
use crate::kv_router::{
7
    Indexer, KV_EVENT_SUBJECT, KvRouterConfig,
8
    protocols::{DpRank, RouterEvent, WorkerId},
9
    worker_query::WorkerQueryClient,
10
};
11
12
13
14
15
use anyhow::Result;
use dynamo_runtime::{
    component::Component, discovery::EventTransportKind, prelude::*,
    transports::event_plane::EventSubscriber,
};
16

17
/// Start a simplified background task for event consumption using the event plane.
18
19
20
///
/// This is used when local indexer mode is enabled. Unlike `start_kv_router_background`,
/// this function:
21
/// - Uses the event plane (NATS Core or ZMQ) instead of JetStream
22
23
24
25
/// - Does not support snapshots, purging, or durable consumers
/// - On worker Added: dumps worker's local indexer into router
/// - On worker Removed: removes worker from router indexer
///
26
27
28
/// This function first recovers state from all currently registered workers before
/// spawning the background task, ensuring the router is ready before returning.
///
29
/// This is appropriate when workers have local indexers enabled.
30
async fn start_kv_router_background_event_plane(
31
    component: Component,
Yan Ru Pei's avatar
Yan Ru Pei committed
32
    indexer: Indexer,
33
    transport_kind: EventTransportKind,
34
) -> Result<()> {
35
    let cancellation_token = component.drt().primary_token();
36

37
38
39
40
    // Subscribe to KV events BEFORE spawning the discovery/recovery loop.
    // This ensures no events are lost between the initial dump fetch and the
    // subscription becoming active — the tree state at fetch time is guaranteed
    // to be a subset of what the subscription will deliver.
41
42
43
44
    let mut subscriber =
        EventSubscriber::for_component_with_transport(&component, KV_EVENT_SUBJECT, transport_kind)
            .await?
            .typed::<RouterEvent>();
45
46
47
48
49
50

    // Brief delay to let the subscription fully establish with the NATS server
    // before recovery fetches the initial dump from workers.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    let worker_query_client = WorkerQueryClient::spawn(component.clone(), indexer.clone()).await?;
51
52
53
54
55
    let kv_event_subject = format!(
        "namespace.{}.component.{}.{}",
        component.namespace().name(),
        component.name(),
        KV_EVENT_SUBJECT
56
57
    );

58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
    match transport_kind {
        EventTransportKind::Nats => {
            tracing::info!(
                subject = %kv_event_subject,
                "KV Router using NATS Core subscription (local_indexer mode)"
            );
        }
        EventTransportKind::Zmq => {
            tracing::info!(
                subject = %kv_event_subject,
                "KV Router using ZMQ event plane subscription (local_indexer mode)"
            );
        }
    }

73
    tokio::spawn(async move {
74
75
76
        // Track last received event ID per (worker, dp_rank) for gap detection
        // Each dp_rank has its own monotonic event ID sequence
        let mut last_event_ids: HashMap<(WorkerId, DpRank), u64> = HashMap::new();
77
78
79
80
81
82

        loop {
            tokio::select! {
                biased;

                _ = cancellation_token.cancelled() => {
83
                    tracing::debug!("KV Router event plane background task received cancellation signal");
84
85
86
                    break;
                }

87
88
89
90
                // Handle event consumption from event plane subscription
                Some(result) = subscriber.next() => {
                    let (envelope, event) = match result {
                        Ok((envelope, event)) => (envelope, event),
91
                        Err(e) => {
92
                            tracing::warn!("Failed to receive RouterEvent from event plane: {e:?}");
93
94
95
96
97
                            continue;
                        }
                    };

                    let worker_id = event.worker_id;
98
                    let dp_rank = event.event.dp_rank;
99
                    let event_id = event.event.event_id;
100
                    let event_key = (worker_id, dp_rank);
101

102
103
104
105
106
107
                    tracing::trace!(
                        "Received event from publisher {} (seq {})",
                        envelope.publisher_id,
                        envelope.sequence
                    );

108
                    // Gap detection: check if event ID is monotonically increasing per (worker, dp_rank)
109
                    // Note: event_id <= last_id is duplicate/out-of-order, apply anyway (idempotent)
110
                    if let Some(&last_id) = last_event_ids.get(&event_key)
111
112
113
114
                        && event_id > last_id + 1
                    {
                        let gap_start = last_id + 1;
                        let gap_end = event_id - 1;
115
                        let gap_size = gap_end - gap_start + 1;
116
                        tracing::warn!(
117
                            "Event ID gap detected for worker {worker_id} dp_rank {dp_rank}, recovering events [{gap_start}, {gap_end}], gap_size: {gap_size}"
118
119
                        );

120
                        if let Err(e) = worker_query_client
121
                            .recover_from_worker(worker_id, dp_rank, Some(gap_start), Some(gap_end))
122
123
                            .await
                        {
124
                            tracing::error!(
125
                                "Failed to recover gap events for worker {worker_id} dp_rank {dp_rank} (gap_start: {gap_start}, gap_end: {gap_end}); proceeding with current event anyway: {e}"
126
127
128
129
130
131
                            );
                        }
                    }

                    // Update last seen event ID (use max to handle out-of-order)
                    last_event_ids
132
                        .entry(event_key)
133
134
135
136
                        .and_modify(|id| *id = (*id).max(event_id))
                        .or_insert(event_id);

                    // Forward the RouterEvent to the indexer
Yan Ru Pei's avatar
Yan Ru Pei committed
137
                    indexer.apply_event(event).await;
138
139
140
141
                }
            }
        }

142
        tracing::debug!("KV Router event plane background task exiting");
143
144
145
146
147
    });

    Ok(())
}

148
149
150
151
/// Helper to decide which subscriber (JetStream or Event Plane) to start based on config
pub async fn start_subscriber(
    component: Component,
    kv_router_config: &KvRouterConfig,
Yan Ru Pei's avatar
Yan Ru Pei committed
152
    indexer: Indexer,
153
154
155
156
157
158
159
) -> Result<()> {
    let transport_kind = EventTransportKind::from_env_or_default();

    // Start subscriber - durable_kv_events flag determines the mode:
    // - durable_kv_events=false (default): Use NATS Core / generic event plane (requires workers to have local_indexer enabled)
    // - durable_kv_events=true: Use JetStream for durability and multi-replica consistency
    if kv_router_config.durable_kv_events {
160
161
162
163
        tracing::warn!(
            "--durable-kv-events is deprecated and will be removed in a future release. \
             The event-plane subscriber (local_indexer mode) is now the recommended path."
        );
164
165
166
167
168
169
170
        if transport_kind == EventTransportKind::Zmq {
            tracing::warn!(
                "--durable-kv-events requires NATS, but ZMQ event plane is configured; falling back to JetStream anyway"
            );
        }
        tracing::info!("Using JetStream subscription (--durable-kv-events enabled)");

171
        let consumer_id = component.drt().discovery().instance_id().to_string();
172
173
174
175
176
177
178
        super::jetstream::start_kv_router_background(
            component,
            consumer_id,
            indexer,
            kv_router_config,
        )
        .await
179
180
181
182
183
184
185
186
187
188
189
190
191
192
    } else {
        if transport_kind == EventTransportKind::Zmq {
            if kv_router_config.router_snapshot_threshold.is_some()
                || kv_router_config.router_reset_states
            {
                tracing::warn!(
                    "ZMQ event plane does not support KV snapshots or state reset; ignoring snapshot/reset settings"
                );
            }
            tracing::info!("Using ZMQ event plane subscription (local_indexer mode)");
        } else {
            tracing::info!("Using NATS Core subscription (local_indexer mode)");
        }

193
        start_kv_router_background_event_plane(component, indexer, transport_kind).await
194
195
    }
}