onboarding.rs 5.02 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use tokio::sync::mpsc;

use super::session::SessionId;
use super::types::StagingMode;

/// Status of an onboarding operation.
///
/// Each variant describes one phase of the operation and carries the block
/// counters relevant to that phase. The `StagingMode` named in a variant's
/// documentation is the mode that phase belongs to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OnboardingStatus {
    /// Searching for blocks (local or remote).
    Searching,

    /// Holding blocks without staging (`StagingMode::Hold`).
    ///
    /// Provides a location breakdown for cost analysis:
    /// - `local_g2`: number of blocks in local G2 (ready to use)
    /// - `local_g3`: number of blocks in local G3 (needs local staging)
    /// - `remote_g2`: number of blocks in remote G2 (needs RDMA pull)
    /// - `remote_g3`: number of blocks in remote G3 (needs remote staging + RDMA)
    /// - `pending_g4`: number of blocks with a G4 load in progress
    /// - `loaded_g4`: number of blocks successfully loaded from G4
    ///   (these are also counted in `local_g2`)
    /// - `failed_g4`: number of blocks that failed to load from G4
    Holding {
        local_g2: usize,
        local_g3: usize,
        remote_g2: usize,
        remote_g3: usize,
        pending_g4: usize,
        loaded_g4: usize,
        failed_g4: usize,
    },

    /// Preparing: staging G3→G2 (`StagingMode::Prepare` or `StagingMode::Full`).
    ///
    /// - `matched`: total number of blocks matched during search
    /// - `staging_local`: number of local G3→G2 transfers in progress
    /// - `staging_remote`: number of remote G3→G2 transfers in progress
    Preparing {
        matched: usize,
        staging_local: usize,
        staging_remote: usize,
    },

    /// Prepared: all blocks are in G2 and the session is still alive
    /// (`StagingMode::Prepare`).
    ///
    /// - `local_g2`: number of blocks in local G2
    /// - `remote_g2`: number of blocks in remote G2 instances
    Prepared { local_g2: usize, remote_g2: usize },

    /// Staging: full mode with RDMA pulls (`StagingMode::Full`).
    ///
    /// - `matched`: total number of blocks matched
    /// - `staging_local`: number of local G3→G2 transfers in progress
    /// - `staging_remote`: number of remote G3→G2 transfers in progress
    /// - `pulling`: number of remote G2→local G2 (RDMA) transfers in progress
    Staging {
        matched: usize,
        staging_local: usize,
        staging_remote: usize,
        pulling: usize,
    },

    /// Operation complete: all blocks are in the initiator's G2
    /// (`StagingMode::Full`). Also used as the terminal state for the
    /// Hold and Prepare modes.
    ///
    /// - `matched_blocks`: total number of blocks in local G2
    Complete { matched_blocks: usize },
}

/// Control commands for managing live sessions.
///
/// `SessionHandle` sends these over its `mpsc` control channel. The code that
/// receives and acts on them is not part of this section of the file.
#[derive(Debug)]
pub(crate) enum SessionControl {
    /// Trigger the prepare operation (Hold → Prepare): stage all G3→G2.
    Prepare,

    /// Trigger the pull operation (Prepare → Full): RDMA pull remote G2→local G2.
    Pull,

    /// Cancel the session and release all blocks.
    Cancel,

    /// Shut down the session (normal completion).
    Shutdown,
}

/// Handle to a live onboarding session for deferred operations.
///
/// Only available for `StagingMode::Hold` and `StagingMode::Prepare`.
///
/// Every operation on the handle is delivered as a `SessionControl` command
/// through `control_tx`.
#[derive(Debug)]
pub struct SessionHandle {
    /// Identifier of the session this handle controls.
    session_id: SessionId,
    /// Staging mode supplied when the handle was constructed.
    mode: StagingMode,
    /// Sending half of the channel that carries `SessionControl` commands.
    /// NOTE(review): the receiver is presumably owned by the session task
    /// (the send-failure message says "session task has exited") — confirm.
    control_tx: mpsc::Sender<SessionControl>,
}

impl SessionHandle {
    /// Create a handle for the session identified by `session_id`.
    ///
    /// `mode` is stored unchanged, and `control_tx` is the channel on which
    /// every `SessionControl` command issued through this handle is sent.
    pub(crate) fn new(
        session_id: SessionId,
        mode: StagingMode,
        control_tx: mpsc::Sender<SessionControl>,
    ) -> Self {
        Self {
            session_id,
            mode,
            control_tx,
        }
    }

    /// Get the session ID.
    pub fn session_id(&self) -> SessionId {
        self.session_id
    }

    /// Get the staging mode this handle was created with.
    ///
    /// The value is captured in [`new`](Self::new) and nothing in this `impl`
    /// block updates it afterwards, so it may lag behind transitions that
    /// were requested through [`prepare`](Self::prepare) or
    /// [`pull`](Self::pull).
    pub fn mode(&self) -> StagingMode {
        self.mode
    }

    /// Trigger G3→G2 staging on all instances (Hold → Prepare).
    ///
    /// The server validates that the session is in Hold mode before
    /// processing. Once the staging finishes, the session transitions to
    /// Prepare mode internally.
    ///
    /// This method only queues the request: it returns as soon as the command
    /// has been accepted by the control channel and does not wait for the
    /// staging itself.
    ///
    /// # Errors
    ///
    /// Returns an error if the control channel is closed (the session task
    /// has exited).
    pub async fn prepare(&self) -> Result<()> {
        self.send_control(SessionControl::Prepare).await
    }

    /// Trigger RDMA pull from remote G2→local G2 (Prepare → Complete).
    ///
    /// The server validates that the session is in Prepare mode before
    /// processing. Once the pull finishes, the session transitions to
    /// Complete status.
    ///
    /// This method only queues the request: it returns as soon as the command
    /// has been accepted by the control channel and does not wait for the
    /// pull itself.
    ///
    /// # Errors
    ///
    /// Returns an error if the control channel is closed (the session task
    /// has exited).
    pub async fn pull(&self) -> Result<()> {
        self.send_control(SessionControl::Pull).await
    }

    /// Cancel session and release all held blocks.
    ///
    /// Returns once the cancel command has been queued.
    ///
    /// # Errors
    ///
    /// Returns an error if the control channel is closed (the session task
    /// has exited).
    pub async fn cancel(&self) -> Result<()> {
        self.send_control(SessionControl::Cancel).await
    }

    /// Shutdown session (used internally).
    ///
    /// # Errors
    ///
    /// Returns an error if the control channel is closed (the session task
    /// has exited).
    #[expect(dead_code)]
    pub(crate) async fn shutdown(&self) -> Result<()> {
        self.send_control(SessionControl::Shutdown).await
    }

    /// Queue `command` on the control channel.
    ///
    /// Single place for the send-and-translate-error logic shared by all
    /// public operations. The send error is discarded on purpose: the only
    /// failure mode of `mpsc::Sender::send` is a closed channel, which is
    /// reported with a fixed message.
    async fn send_control(&self, command: SessionControl) -> Result<()> {
        self.control_tx
            .send(command)
            .await
            .map_err(|_| anyhow::anyhow!("session task has exited"))
    }
}