transport.rs 6.28 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use ::velo::Messenger;
use anyhow::{Context, Result};
use bytes::Bytes;
use dashmap::DashMap;

use std::sync::Arc;

use crate::InstanceId;
use kvbm_physical::manager::SerializedLayout;

use super::{
    OnboardSessionTx, SessionId, SessionMessageTx, dispatch_onboard_message,
    messages::{OnboardMessage, SessionMessage},
};

/// Pluggable transport that sessions use to deliver onboarding traffic.
///
/// Dispatch is a plain `enum` + `match` instead of a trait object, so no boxed
/// futures are needed:
/// - [`MessageTransport::Velo`]: distributed delivery over Velo active messages.
/// - [`MessageTransport::Local`]: direct dispatch into in-process session channels (testing).
pub enum MessageTransport {
    /// Remote delivery through a Velo [`Messenger`].
    Velo(VeloTransport),
    /// In-process delivery through the shared session channel maps.
    Local(LocalTransport),
}

impl MessageTransport {
    /// Builds a Velo-backed transport around `messenger`.
    pub fn velo(messenger: Arc<Messenger>) -> Self {
        let inner = VeloTransport::new(messenger);
        Self::Velo(inner)
    }

    /// Builds an in-process transport that dispatches into the given session maps.
    pub fn local(
        sessions: Arc<DashMap<SessionId, OnboardSessionTx>>,
        session_sessions: Arc<DashMap<SessionId, SessionMessageTx>>,
    ) -> Self {
        let inner = LocalTransport::new(sessions, session_sessions);
        Self::Local(inner)
    }

    /// Delivers an [`OnboardMessage`] to `target` through the active backend.
    ///
    /// # Errors
    /// Propagates whatever error the selected backend reports.
    pub async fn send(&self, target: InstanceId, message: OnboardMessage) -> Result<()> {
        match self {
            Self::Velo(inner) => inner.send(target, message).await,
            Self::Local(inner) => inner.send(target, message).await,
        }
    }

    /// Fetches the remote leader's worker layouts, as needed for RDMA transfers.
    ///
    /// On the Velo backend this is a request/response call to the remote
    /// leader's export_metadata handler, yielding one `SerializedLayout` per
    /// entry the remote returns.
    ///
    /// # Errors
    /// Always fails on the local backend, which does not support this request;
    /// otherwise propagates the Velo backend's error.
    pub async fn request_metadata(&self, target: InstanceId) -> Result<Vec<SerializedLayout>> {
        // Exhaustive match so a new variant forces a decision here.
        let inner = match self {
            Self::Velo(inner) => inner,
            Self::Local(_) => {
                return Err(anyhow::anyhow!(
                    "request_metadata not supported for local transport"
                ));
            }
        };
        inner.request_metadata(target).await
    }

    /// Delivers a [`SessionMessage`] to `target` using the unified session protocol,
    /// which carries all session communication.
    ///
    /// # Errors
    /// Propagates whatever error the selected backend reports.
    pub async fn send_session(&self, target: InstanceId, message: SessionMessage) -> Result<()> {
        match self {
            Self::Velo(inner) => inner.send_session(target, message).await,
            Self::Local(inner) => inner.send_session(target, message).await,
        }
    }
}

/// Transport backed by a Velo [`Messenger`].
///
/// Onboarding and session messages are sent as one-way active messages: no reply
/// payload is read back. [`VeloTransport::request_metadata`] is the exception. It
/// is a request/response (unary) call whose reply is decoded from JSON.
pub struct VeloTransport {
    messenger: Arc<Messenger>,
}

impl VeloTransport {
    /// Remote handler receiving [`OnboardMessage`]s.
    const ONBOARD_HANDLER: &'static str = "kvbm.leader.onboard";
    /// Remote handler receiving unified [`SessionMessage`]s.
    const SESSION_HANDLER: &'static str = "kvbm.leader.session";
    /// Remote unary handler returning worker layout metadata.
    const EXPORT_METADATA_HANDLER: &'static str = "kvbm.leader.export_metadata";

    /// Creates a transport that sends through `messenger`.
    pub fn new(messenger: Arc<Messenger>) -> Self {
        Self { messenger }
    }

    /// Sends an [`OnboardMessage`] to `target` as a JSON-encoded active message
    /// on the `kvbm.leader.onboard` handler.
    ///
    /// # Errors
    /// Fails if the message cannot be serialized to JSON, or if the Velo handler
    /// lookup or send fails.
    pub async fn send(&self, target: InstanceId, message: OnboardMessage) -> Result<()> {
        tracing::debug!(
            msg = message.variant_name(),
            target = %target,
            "Sending message"
        );

        let bytes = Bytes::from(serde_json::to_vec(&message)?);
        self.am_send_raw(Self::ONBOARD_HANDLER, target, bytes).await?;

        tracing::debug!(target = %target, "Successfully sent");

        Ok(())
    }

    /// Requests worker metadata from a remote leader for RDMA transfers.
    ///
    /// Makes a unary RPC call to the remote `kvbm.leader.export_metadata` handler
    /// and decodes its JSON reply as `Vec<SerializedLayout>`.
    ///
    /// # Errors
    /// Fails if the RPC fails, or if the reply is not valid JSON for
    /// `Vec<SerializedLayout>`. Decode errors name the target instance.
    pub async fn request_metadata(&self, target: InstanceId) -> Result<Vec<SerializedLayout>> {
        tracing::debug!(target = %target, "Requesting metadata from instance");

        let response: Bytes = self
            .messenger
            .unary(Self::EXPORT_METADATA_HANDLER)?
            .instance(target)
            .send()
            .await?;

        // A bare serde error gives no hint which peer sent the malformed payload,
        // so attach the handler and target at this network boundary.
        let metadata: Vec<SerializedLayout> = serde_json::from_slice(&response)
            .with_context(|| format!("failed to decode export_metadata response from {target}"))?;

        tracing::debug!(
            count = metadata.len(),
            target = %target,
            "Received metadata entries"
        );

        Ok(metadata)
    }

    /// Sends a [`SessionMessage`] to `target` as a JSON-encoded active message on
    /// the unified `kvbm.leader.session` handler.
    ///
    /// # Errors
    /// Fails if the message cannot be serialized to JSON, or if the Velo handler
    /// lookup or send fails.
    pub async fn send_session(&self, target: InstanceId, message: SessionMessage) -> Result<()> {
        tracing::debug!(
            msg = message.variant_name(),
            target = %target,
            "Sending Session"
        );

        let bytes = Bytes::from(serde_json::to_vec(&message)?);
        self.am_send_raw(Self::SESSION_HANDLER, target, bytes).await?;

        tracing::debug!(target = %target, "Successfully sent session msg");

        Ok(())
    }

    /// Sends an already-encoded `payload` to `handler` on `target` as a raw active
    /// message. This is the shared send path for [`Self::send`] and [`Self::send_session`].
    async fn am_send_raw(
        &self,
        handler: &'static str,
        target: InstanceId,
        payload: Bytes,
    ) -> Result<()> {
        self.messenger
            .am_send(handler)?
            .raw_payload(payload)
            .instance(target)
            .send()
            .await?;
        Ok(())
    }
}

/// Local transport for testing or same-instance communication.
///
/// Directly dispatches messages to session channels without network overhead.
pub struct LocalTransport {
    sessions: Arc<DashMap<SessionId, OnboardSessionTx>>,
    /// Unified session message receivers.
    session_sessions: Arc<DashMap<SessionId, SessionMessageTx>>,
}

impl LocalTransport {
    pub fn new(
        sessions: Arc<DashMap<SessionId, OnboardSessionTx>>,
        session_sessions: Arc<DashMap<SessionId, SessionMessageTx>>,
    ) -> Self {
        Self {
            sessions,
            session_sessions,
        }
    }

    pub async fn send(&self, _target: InstanceId, message: OnboardMessage) -> Result<()> {
        dispatch_onboard_message(&self.sessions, message).await
    }

    /// Send a SessionMessage (unified protocol).
    ///
    /// Routes to session_sessions by session ID.
    pub async fn send_session(&self, _target: InstanceId, message: SessionMessage) -> Result<()> {
        let session_id = message.session_id();

        let sender = self
            .session_sessions
            .get(&session_id)
            .map(|entry| entry.value().clone());
        if let Some(sender) = sender {
            sender
                .send(message)
                .await
                .map_err(|e| anyhow::anyhow!("failed to send to session {session_id}: {e}"))?;
            return Ok(());
        }

        anyhow::bail!("no session registered for session {session_id}");
    }
}