// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use std::ops::Range; use std::sync::Arc; use serde::{Deserialize, Serialize}; use crate::{BlockId, G2, InstanceId, SequenceHash}; use kvbm_logical::blocks::ImmutableBlock; use kvbm_physical::manager::LayoutHandle; use super::SessionId; /// Messages exchanged between leaders during onboarding sessions. /// /// Phase 2 protocol (G2-only): /// 1. Initiator sends CreateSession to multiple responders /// 2. Each responder searches local G2 and sends G2Results back /// 3. Initiator applies first-responder-wins and sends HoldBlocks to each /// 4. Responders send Acknowledged after releasing unwanted blocks /// /// Phase 3 protocol (G3 staging): /// 5. Responders search G3 and send G3Results /// 6. Initiator sends StageBlocks with blocks to stage G3->G2 /// 7. Responders stage blocks and send BlocksReady when complete #[derive(Debug, Clone, Serialize, Deserialize)] pub enum OnboardMessage { /// Initiator creates a new onboarding session. CreateSession { requester: InstanceId, session_id: SessionId, sequence_hashes: Vec, }, /// Responder signals local search (G2 and G3) is complete. SearchComplete { responder: InstanceId, session_id: SessionId, }, /// Responder reports G2 search results. /// - sequence_hashes: ordered list of matched sequence hashes /// - block_ids: parallel list of block IDs (can be zipped with sequence_hashes) G2Results { responder: InstanceId, session_id: SessionId, sequence_hashes: Vec, block_ids: Vec, }, /// Responder reports G3 search results. /// - sequence_hashes: ordered list of matched sequence hashes (no block IDs) G3Results { responder: InstanceId, session_id: SessionId, sequence_hashes: Vec, }, /// Initiator tells responder which sequence hashes to hold/drop. /// Works across G2 and G3 tiers. HoldBlocks { requester: InstanceId, session_id: SessionId, hold_hashes: Vec, drop_hashes: Vec, }, /// Initiator tells responder which G3 sequence hashes to stage to G2. /// Any G3 blocks with these hashes should be staged to G2. StageBlocks { requester: InstanceId, session_id: SessionId, stage_hashes: Vec, }, /// Responder reports newly staged blocks are ready in G2 (after G3->G2 staging). /// Only reports blocks that were just staged, not all G2 blocks. /// - sequence_hashes: newly staged blocks /// - block_ids: parallel to sequence_hashes BlocksReady { responder: InstanceId, session_id: SessionId, sequence_hashes: Vec, block_ids: Vec, }, /// Responder acknowledges hold/drop request. Acknowledged { responder: InstanceId, session_id: SessionId, }, /// Initiator tells responder to release specific sequence hashes that weren't selected. /// Works across G2 and G3 tiers. ReleaseBlocks { requester: InstanceId, session_id: SessionId, release_hashes: Vec, }, /// Initiator tells responder session is complete, responder can cleanup. CloseSession { requester: InstanceId, session_id: SessionId, }, // ========================================================================= // G4/Object Storage Messages (Internal - not sent over network) // ========================================================================= /// G4 search results from object storage `has_blocks`. /// /// Internal message sent via mpsc channel from the G4 search task /// to the initiator session. Contains hashes found in object storage /// with their sizes. G4Results { session_id: SessionId, /// Hashes found in G4 with their sizes in bytes found_hashes: Vec<(SequenceHash, usize)>, }, /// G4 load completion results from object storage `get_blocks`. /// /// Internal message sent via mpsc channel from the G4 load task /// to the initiator session. Contains per-block success/failure. G4LoadComplete { session_id: SessionId, /// Successfully loaded hashes success: Vec, /// Failed hashes with error messages failures: Vec<(SequenceHash, String)>, /// Successfully loaded and registered G2 blocks. /// These are ready to be added to local_g2_blocks. /// Wrapped in Arc for Clone derivation (internal message only). #[serde(skip)] blocks: Arc>>, }, // TODO: Add heartbeat/TTL mechanism for handling unresponsive initiators // Heartbeat { // requester: InstanceId, // session_id: SessionId, // timestamp: u64, // }, // TTL resets with each heartbeat. If TTL expires: // - Responder releases all held blocks // - Responder cleans up session state // - Session task exits } /// Represents a block match found during search. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BlockMatch { pub sequence_hash: SequenceHash, pub block_id: BlockId, } impl OnboardMessage { /// Extract the session ID from any message variant. pub fn session_id(&self) -> SessionId { match self { OnboardMessage::CreateSession { session_id, .. } | OnboardMessage::SearchComplete { session_id, .. } | OnboardMessage::G2Results { session_id, .. } | OnboardMessage::G3Results { session_id, .. } | OnboardMessage::HoldBlocks { session_id, .. } | OnboardMessage::StageBlocks { session_id, .. } | OnboardMessage::BlocksReady { session_id, .. } | OnboardMessage::Acknowledged { session_id, .. } | OnboardMessage::ReleaseBlocks { session_id, .. } | OnboardMessage::CloseSession { session_id, .. } | OnboardMessage::G4Results { session_id, .. } | OnboardMessage::G4LoadComplete { session_id, .. } => *session_id, } } /// Extract the requester/responder instance ID from the message. /// /// # Panics /// Panics if called on G4 messages (internal only, no instance ID). pub fn instance_id(&self) -> InstanceId { match self { OnboardMessage::CreateSession { requester, .. } | OnboardMessage::HoldBlocks { requester, .. } | OnboardMessage::StageBlocks { requester, .. } | OnboardMessage::ReleaseBlocks { requester, .. } | OnboardMessage::CloseSession { requester, .. } => *requester, OnboardMessage::SearchComplete { responder, .. } | OnboardMessage::G2Results { responder, .. } | OnboardMessage::G3Results { responder, .. } | OnboardMessage::BlocksReady { responder, .. } | OnboardMessage::Acknowledged { responder, .. } => *responder, OnboardMessage::G4Results { .. } | OnboardMessage::G4LoadComplete { .. } => { panic!("G4 messages are internal and do not have an instance ID") } } } /// Get the variant name as a string for logging. pub fn variant_name(&self) -> &'static str { match self { OnboardMessage::CreateSession { .. } => "CreateSession", OnboardMessage::SearchComplete { .. } => "SearchComplete", OnboardMessage::G2Results { .. } => "G2Results", OnboardMessage::G3Results { .. } => "G3Results", OnboardMessage::HoldBlocks { .. } => "HoldBlocks", OnboardMessage::StageBlocks { .. } => "StageBlocks", OnboardMessage::BlocksReady { .. } => "BlocksReady", OnboardMessage::Acknowledged { .. } => "Acknowledged", OnboardMessage::ReleaseBlocks { .. } => "ReleaseBlocks", OnboardMessage::CloseSession { .. } => "CloseSession", OnboardMessage::G4Results { .. } => "G4Results", OnboardMessage::G4LoadComplete { .. } => "G4LoadComplete", } } } // ============================================================================= // Unified Session Protocol // ============================================================================= // // These types support the unified session model where sessions can dynamically // transition between control roles. use super::state::{ControlRole, SessionPhase}; /// Unified session message protocol. /// /// Unified, bidirectional protocol that supports dynamic control transfer. /// /// # Protocol Overview /// /// 1. **Connection**: `Attach`/`Detach` for peer management /// 2. **Control Transfer**: `YieldControl`/`AcquireControl` for bidirectional role changes /// 3. **Block Operations**: Commands from controller to controllee /// 4. **State Sync**: Responses from controllee to controller /// 5. **Lifecycle**: `Close`/`Error` for termination #[derive(Debug, Clone, Serialize, Deserialize)] pub enum SessionMessage { // ========================================================================= // Connection Management // ========================================================================= /// Attach to a session as a specific role. /// /// Sent by a peer to establish the session relationship. Attach { /// The instance ID of the attaching peer. peer: InstanceId, /// The session to attach to. session_id: SessionId, /// The role this peer will assume (typically `Controllee` or `Controller`). as_role: ControlRole, }, /// Detach from a session. /// /// Graceful disconnection. The session may continue if other peers are attached. Detach { /// The instance ID of the detaching peer. peer: InstanceId, /// The session to detach from. session_id: SessionId, }, // ========================================================================= // Control Transfer (Bidirectional) // ========================================================================= /// Yield control to peer. /// /// The sender transitions from `Controller` to `Neutral`. /// The receiver (if in `Controllee`) can then `AcquireControl` or remain passive. YieldControl { /// The instance ID of the yielding peer. peer: InstanceId, /// The session. session_id: SessionId, }, /// Acquire control from peer. /// /// The sender attempts to become `Controller`. /// Valid when sender is `Neutral` or `Controllee` and peer is `Neutral`. AcquireControl { /// The instance ID of the peer acquiring control. peer: InstanceId, /// The session. session_id: SessionId, }, // ========================================================================= // Block Operations (Controller → Controllee) // ========================================================================= /// Trigger staging of blocks (e.g., G3→G2). TriggerStaging { /// The session. session_id: SessionId, }, /// Request that specific blocks be held (kept alive). HoldBlocks { /// The session. session_id: SessionId, /// Sequence hashes of blocks to hold. hold_hashes: Vec, }, /// Release specific blocks (they can now be evicted). ReleaseBlocks { /// The session. session_id: SessionId, /// Sequence hashes of blocks to release. release_hashes: Vec, }, /// Notify that blocks have been pulled via RDMA. /// /// The controllee can release these blocks from its hold. BlocksPulled { /// The session. session_id: SessionId, /// Sequence hashes of blocks that were pulled. pulled_hashes: Vec, }, // ========================================================================= // State Synchronization (Controllee → Controller) // ========================================================================= /// Full state snapshot. /// /// Sent after attachment and periodically on state changes. StateResponse { /// The session. session_id: SessionId, /// Complete state snapshot. state: SessionStateSnapshot, }, /// Notification that blocks have been staged. /// /// This message supports layerwise transfer by optionally specifying /// which layer range is ready. When `layer_range` is `None`, all layers /// of the staged blocks are ready for transfer. BlocksStaged { /// The session. session_id: SessionId, /// Newly staged blocks (now in target tier). staged_blocks: Vec, /// Count of blocks remaining to stage. remaining: usize, /// Layer range that is ready for transfer. /// /// - `None`: All layers are ready (default behavior) /// - `Some(0..1)`: Only layer 0 is ready /// - `Some(0..60)`: Layers 0-59 are ready /// /// This enables layerwise streaming where the sender computes /// layer-by-layer and notifies the receiver as each layer completes. layer_range: Option>, }, // ========================================================================= // Lifecycle // ========================================================================= /// Close the session gracefully. Close { /// The session. session_id: SessionId, }, /// Report an error. Error { /// The session. session_id: SessionId, /// Error description. message: String, }, } impl SessionMessage { /// Extract the session ID from any message variant. pub fn session_id(&self) -> SessionId { match self { SessionMessage::Attach { session_id, .. } | SessionMessage::Detach { session_id, .. } | SessionMessage::YieldControl { session_id, .. } | SessionMessage::AcquireControl { session_id, .. } | SessionMessage::TriggerStaging { session_id, .. } | SessionMessage::HoldBlocks { session_id, .. } | SessionMessage::ReleaseBlocks { session_id, .. } | SessionMessage::BlocksPulled { session_id, .. } | SessionMessage::StateResponse { session_id, .. } | SessionMessage::BlocksStaged { session_id, .. } | SessionMessage::Close { session_id, .. } | SessionMessage::Error { session_id, .. } => *session_id, } } /// Extract the peer instance ID if present. pub fn peer(&self) -> Option { match self { SessionMessage::Attach { peer, .. } | SessionMessage::Detach { peer, .. } | SessionMessage::YieldControl { peer, .. } | SessionMessage::AcquireControl { peer, .. } => Some(*peer), _ => None, } } /// Check if this is a control command (sent by controller). pub fn is_control_command(&self) -> bool { matches!( self, SessionMessage::TriggerStaging { .. } | SessionMessage::HoldBlocks { .. } | SessionMessage::ReleaseBlocks { .. } | SessionMessage::BlocksPulled { .. } ) } /// Check if this is a state response (sent by controllee). pub fn is_state_response(&self) -> bool { matches!( self, SessionMessage::StateResponse { .. } | SessionMessage::BlocksStaged { .. } ) } /// Get the variant name as a string for logging. pub fn variant_name(&self) -> &'static str { match self { SessionMessage::Attach { .. } => "Attach", SessionMessage::Detach { .. } => "Detach", SessionMessage::YieldControl { .. } => "YieldControl", SessionMessage::AcquireControl { .. } => "AcquireControl", SessionMessage::TriggerStaging { .. } => "TriggerStaging", SessionMessage::HoldBlocks { .. } => "HoldBlocks", SessionMessage::ReleaseBlocks { .. } => "ReleaseBlocks", SessionMessage::BlocksPulled { .. } => "BlocksPulled", SessionMessage::StateResponse { .. } => "StateResponse", SessionMessage::BlocksStaged { .. } => "BlocksStaged", SessionMessage::Close { .. } => "Close", SessionMessage::Error { .. } => "Error", } } } /// Complete session state snapshot. /// /// Sent in `SessionMessage::StateResponse` to provide the controller /// with full visibility into the controllee's state. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SessionStateSnapshot { /// Current session phase. pub phase: SessionPhase, /// Current control role of the sender. pub control_role: ControlRole, /// Blocks currently in G2 (ready for RDMA pull). pub g2_blocks: Vec, /// Count of blocks pending staging to G2. pub g3_pending: usize, /// Layer range that is ready for transfer. /// /// - `None`: All layers are ready (or not applicable) /// - `Some(0..1)`: Only layer 0 is ready /// - `Some(0..60)`: Layers 0-59 are ready /// /// This is updated when receiving `BlocksStaged` messages with `layer_range`. /// The controller can use this to know which layers can be pulled. #[serde(default)] pub ready_layer_range: Option>, } /// Block information for session messages. /// /// Contains the metadata needed to identify and transfer a block. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BlockInfo { /// Physical block ID in the layout. pub block_id: BlockId, /// Logical sequence hash. pub sequence_hash: SequenceHash, /// Layout handle for RDMA operations. pub layout_handle: LayoutHandle, }