// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::kv_router::{
5
    Indexer, KV_EVENT_SUBJECT, KvRouterConfig, protocols::RouterEvent,
6
    worker_query::WorkerQueryClient,
7
};
8
9
10
11
12
use anyhow::Result;
use dynamo_runtime::{
    component::Component, discovery::EventTransportKind, prelude::*,
    transports::event_plane::EventSubscriber,
};
13

14
/// Start a simplified background task for event consumption using the event plane.
15
16
17
///
/// This is used when local indexer mode is enabled. Unlike `start_kv_router_background`,
/// this function:
18
/// - Uses the event plane (NATS Core or ZMQ) instead of JetStream
19
20
21
22
23
/// - Does not support snapshots, purging, or durable consumers
/// - On worker Added: dumps worker's local indexer into router
/// - On worker Removed: removes worker from router indexer
///
/// This is appropriate when workers have local indexers enabled.
24
async fn start_kv_router_background_event_plane(
25
    component: Component,
Yan Ru Pei's avatar
Yan Ru Pei committed
26
    indexer: Indexer,
27
    transport_kind: EventTransportKind,
28
) -> Result<()> {
29
    let cancellation_token = component.drt().primary_token();
30

31
32
33
34
    // Subscribe to KV events BEFORE spawning the discovery/recovery loop.
    // This ensures no events are lost between the initial dump fetch and the
    // subscription becoming active — the tree state at fetch time is guaranteed
    // to be a subset of what the subscription will deliver.
35
36
37
38
    let mut subscriber =
        EventSubscriber::for_component_with_transport(&component, KV_EVENT_SUBJECT, transport_kind)
            .await?
            .typed::<RouterEvent>();
39
40
41
42
43

    // Brief delay to let the subscription fully establish with the NATS server
    // before recovery fetches the initial dump from workers.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

44
45
46
    // WorkerQueryClient handles its own discovery loop for lifecycle + initial recovery.
    // No blocking wait — recovery happens asynchronously as endpoints are discovered.
    let worker_query_client = WorkerQueryClient::spawn(component.clone(), indexer).await?;
47
48
49
50
51
    let kv_event_subject = format!(
        "namespace.{}.component.{}.{}",
        component.namespace().name(),
        component.name(),
        KV_EVENT_SUBJECT
52
53
    );

54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
    match transport_kind {
        EventTransportKind::Nats => {
            tracing::info!(
                subject = %kv_event_subject,
                "KV Router using NATS Core subscription (local_indexer mode)"
            );
        }
        EventTransportKind::Zmq => {
            tracing::info!(
                subject = %kv_event_subject,
                "KV Router using ZMQ event plane subscription (local_indexer mode)"
            );
        }
    }

69
70
71
72
73
74
    tokio::spawn(async move {
        loop {
            tokio::select! {
                biased;

                _ = cancellation_token.cancelled() => {
75
                    tracing::debug!("KV Router event plane background task received cancellation signal");
76
77
78
                    break;
                }

79
80
81
82
                // Handle event consumption from event plane subscription
                Some(result) = subscriber.next() => {
                    let (envelope, event) = match result {
                        Ok((envelope, event)) => (envelope, event),
83
                        Err(e) => {
84
                            tracing::warn!("Failed to receive RouterEvent from event plane: {e:?}");
85
86
87
88
                            continue;
                        }
                    };

89
90
91
92
93
94
                    tracing::trace!(
                        "Received event from publisher {} (seq {})",
                        envelope.publisher_id,
                        envelope.sequence
                    );

95
96
97
98
99
100
101
                    tracing::trace!(
                        "Forwarding live event to recovery coordinator for worker {} dp_rank {} event_id {}",
                        event.worker_id,
                        event.event.dp_rank,
                        event.event.event_id
                    );
                    worker_query_client.handle_live_event(event).await;
102
103
104
105
                }
            }
        }

106
        tracing::debug!("KV Router event plane background task exiting");
107
108
109
110
111
    });

    Ok(())
}

112
113
114
115
/// Helper to decide which subscriber (JetStream or Event Plane) to start based on config
pub async fn start_subscriber(
    component: Component,
    kv_router_config: &KvRouterConfig,
Yan Ru Pei's avatar
Yan Ru Pei committed
116
    indexer: Indexer,
117
118
119
120
121
122
123
) -> Result<()> {
    let transport_kind = EventTransportKind::from_env_or_default();

    // Start subscriber - durable_kv_events flag determines the mode:
    // - durable_kv_events=false (default): Use NATS Core / generic event plane (requires workers to have local_indexer enabled)
    // - durable_kv_events=true: Use JetStream for durability and multi-replica consistency
    if kv_router_config.durable_kv_events {
124
125
126
127
        tracing::warn!(
            "--durable-kv-events is deprecated and will be removed in a future release. \
             The event-plane subscriber (local_indexer mode) is now the recommended path."
        );
128
129
130
131
132
133
134
        if transport_kind == EventTransportKind::Zmq {
            tracing::warn!(
                "--durable-kv-events requires NATS, but ZMQ event plane is configured; falling back to JetStream anyway"
            );
        }
        tracing::info!("Using JetStream subscription (--durable-kv-events enabled)");

135
        let consumer_id = component.drt().discovery().instance_id().to_string();
136
137
138
139
140
141
142
        super::jetstream::start_kv_router_background(
            component,
            consumer_id,
            indexer,
            kv_router_config,
        )
        .await
143
144
145
146
147
148
149
150
151
152
153
154
155
156
    } else {
        if transport_kind == EventTransportKind::Zmq {
            if kv_router_config.router_snapshot_threshold.is_some()
                || kv_router_config.router_reset_states
            {
                tracing::warn!(
                    "ZMQ event plane does not support KV snapshots or state reset; ignoring snapshot/reset settings"
                );
            }
            tracing::info!("Using ZMQ event plane subscription (local_indexer mode)");
        } else {
            tracing::info!("Using NATS Core subscription (local_indexer mode)");
        }

157
        start_kv_router_background_event_plane(component, indexer, transport_kind).await
158
159
    }
}