// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use std::collections::HashMap; use ::velo::Messenger; use anyhow::Result; use dashmap::DashMap; use tokio::sync::{Mutex, mpsc, watch}; use uuid::Uuid; use std::sync::Arc; use crate::{ BlockId, G2, G3, InstanceId, SequenceHash, object::ObjectBlockOps, worker::RemoteDescriptor, }; use kvbm_common::LogicalLayoutHandle; use kvbm_logical::{ blocks::{BlockRegistry, ImmutableBlock}, manager::BlockManager, }; use kvbm_physical::transfer::{TransferCompleteNotification, TransferOptions}; use kvbm_physical::manager::{LayoutHandle, SerializedLayout}; use super::{ super::worker::Worker, super::worker::group::{ParallelWorkers, SpmdParallelWorkers}, AsyncSessionResult, FindMatchesOptions, FindMatchesResult, Leader, OnboardingStatus, ReadyResult, // Legacy SessionHandle for deferred operations SessionHandle as LegacySessionHandle, SessionId, StagingMode, accessor::{BlockAccessor, PolicyContext}, session::{ BlockHolder, ControlRole, ControllableSessionOptions, ControllableSessionResult, InitiatorSession, MessageTransport, OnboardMessage, OnboardSessionTx, ResponderSession, ServerSession, ServerSessionHandle, ServerSessionOptions, SessionHandle, SessionMessage, SessionMessageTx, SessionPhase, create_server_session, session_handle_state_channel, session_message_channel, }, velo::{ExportMetadataCallback, VeloLeaderService}, }; /// Primary leader implementation for the distributed KVBM system. /// /// `InstanceLeader` coordinates block onboarding across local and remote /// instances. It owns a G2 (host memory) `BlockManager` and an optional G3 /// (disk) `BlockManager`, a set of workers for executing physical transfers, /// and a parallel worker abstraction for multi-rank RDMA operations. /// /// Key responsibilities: /// - **Block matching**: finding which requested sequence hashes are already /// cached locally (via `BlockAccessor` policies). /// - **Session management**: creating, attaching, and driving onboard sessions /// between endpoint (source) and controller (destination) roles. /// - **Remote connectivity**: exchanging serialized layout metadata with peer /// instances so workers can perform RDMA transfers. /// - **Velo RPC**: registering handlers via `VeloLeaderService` so remote /// leaders can initiate sessions and exchange metadata. #[derive(Clone)] pub struct InstanceLeader { /// Nova instance for distributed communication. messenger: Arc, /// Block registry for deduplication. #[allow(dead_code)] pub(crate) registry: BlockRegistry, /// G2 (host memory) block manager (wrapped in Arc since BlockManager doesn't implement Clone). pub(crate) g2_manager: Arc>, /// Optional G3 (disk) block manager pub(crate) g3_manager: Option>>, /// Workers for executing transfers (at least 1 required). /// Multiple workers enable parallel transfers and redundancy. workers: Vec>, /// Parallel worker abstraction wrapping the workers. /// Used for RDMA transfers with proper handle mapping storage. parallel_worker: Option>, /// Map of active sessions (session_id -> message channel). sessions: Arc>, /// Cached worker metadata (avoids querying workers repeatedly). cached_worker_metadata: Option>, /// Map of session states for holding blocks alive (RAII). session_states: Arc>, /// List of remote leader instance IDs (mutable for post-construction configuration). remote_leaders: Arc>>, /// Message transport for session communication. transport: Arc, // ======================================================================== // Unified Session Protocol // ======================================================================== /// Map of session message receivers. /// Used by SessionHandle/SessionEndpoint/ControllableSession. session_sessions: Arc>, // ======================================================================== // G4/Object Storage // ======================================================================== /// Object storage client for G4 search and load operations. /// Leader calls has_blocks on S3 directly, coordinates workers for get_blocks. object_client: Option>, } /// Builder for InstanceLeader. #[derive(Default)] pub struct InstanceLeaderBuilder { messenger: Option>, registry: Option, g2_manager: Option>>, g3_manager: Option>>, workers: Vec>, sessions: Option>>, remote_leaders: Option>, cached_worker_metadata: Option>, object_client: Option>, } impl InstanceLeaderBuilder { /// Initialize builder with components from KvbmRuntime. /// /// This extracts Nova from the runtime. Use this when the runtime /// has already been constructed and you want the leader to share /// the same Nova instance for distributed communication. /// /// # Example /// ```ignore /// let runtime = KvbmRuntime::from_env_leader().await?; /// let leader = InstanceLeaderBuilder::default() /// .with_runtime(&runtime) /// .g2_manager(g2_manager) /// .build()?; /// ``` pub fn with_runtime(self, runtime: &crate::KvbmRuntime) -> Self { self.messenger(runtime.messenger().clone()) } pub fn messenger(mut self, messenger: Arc) -> Self { self.messenger = Some(messenger); self } pub fn registry(mut self, registry: BlockRegistry) -> Self { self.registry = Some(registry); self } pub fn with_g2_manager(mut self, manager: Option>) -> Self { self.g2_manager = manager.map(Arc::new); self } pub fn with_g3_manager(mut self, manager: Option>) -> Self { self.g3_manager = manager.map(Arc::new); self } pub fn g2_manager(mut self, manager: Arc>) -> Self { self.g2_manager = Some(manager); self } pub fn g3_manager(mut self, manager: Arc>) -> Self { self.g3_manager = Some(manager); self } /// Add a single worker (convenience method). pub fn worker(mut self, worker: Arc) -> Self { self.workers.push(worker); self } /// Set all workers at once. pub fn workers(mut self, workers: Vec>) -> Self { self.workers = workers; self } pub fn remote_leaders(mut self, leaders: Vec) -> Self { self.remote_leaders = Some(leaders); self } /// Cache worker metadata upfront to avoid querying workers later. /// /// This is useful when workers have already exported metadata during initialization /// (e.g., in the connector pattern where workers return metadata in their init response). pub fn with_cached_worker_metadata(mut self, metadata: Vec) -> Self { self.cached_worker_metadata = Some(metadata); self } /// Set the object storage client for G4 search and load operations. /// /// The leader uses this client to: /// - Query S3 for block presence via `has_blocks` /// - Coordinate workers to load blocks from S3 via `get_blocks` pub fn object_client(mut self, client: Arc) -> Self { self.object_client = Some(client); self } pub fn build(self) -> Result { let messenger = self .messenger .ok_or_else(|| anyhow::anyhow!("Nova instance required"))?; let transport = Arc::new(MessageTransport::velo(messenger.clone())); // Create event system for notification aggregation let events = Arc::new(messenger.event_manager()); // Get current tokio runtime handle let runtime = tokio::runtime::Handle::current(); // // Validate at least one worker // if self.workers.is_empty() { // anyhow::bail!("At least one worker required"); // } // todo: we will need a common builder pattern for creating "general" parallel workers // - we could also use an enum and match as the number of types will be limited // Create parallel worker if workers are provided let parallel_worker: Option> = if !self.workers.is_empty() { Some(Arc::new(SpmdParallelWorkers::new( self.workers.to_vec(), events.clone(), runtime.clone(), ))) } else { None }; Ok(InstanceLeader { messenger, registry: self .registry .ok_or_else(|| anyhow::anyhow!("block registry required"))?, g2_manager: self .g2_manager .ok_or_else(|| anyhow::anyhow!("g2_manager required"))?, g3_manager: self.g3_manager, workers: self.workers, parallel_worker, cached_worker_metadata: self.cached_worker_metadata, sessions: self.sessions.unwrap_or_else(|| Arc::new(DashMap::new())), session_states: Arc::new(DashMap::new()), remote_leaders: Arc::new(std::sync::RwLock::new( self.remote_leaders.unwrap_or_default(), )), transport, session_sessions: Arc::new(DashMap::new()), object_client: self.object_client, }) } } /// Internal session state for holding matched blocks. #[allow(dead_code)] // Used for RAII block lifetime management struct SessionState { session_id: SessionId, matched_g2_blocks: Vec>, matched_g3_blocks: Vec>, status_tx: watch::Sender, } /// Result of scanning for blocks across tiers. /// /// Unlike `FindMatchesResult`, this scans all given hashes without stopping on first miss. /// Returns blocks found in each tier along with their sorted positions. pub struct ScanBlocksResult { /// Blocks found in G2 (host memory). pub g2_blocks: HashMap>, /// Blocks found in G3 (disk). pub g3_blocks: HashMap>, /// All found blocks sorted by position (lowest to highest). /// Each entry indicates which tier (G2/G3) the block was found in. pub sorted_matches: Vec<(SequenceHash, LogicalLayoutHandle)>, } impl InstanceLeader { /// Get a reference to the G2 BlockManager. pub fn g2_manager(&self) -> &Arc> { &self.g2_manager } /// Get a reference to the optional G3 BlockManager. pub fn g3_manager(&self) -> Option<&Arc>> { self.g3_manager.as_ref() } /// Get the block registry. pub fn registry(&self) -> &BlockRegistry { &self.registry } /// Get a reference to the Nova instance. /// /// This provides access to the Nova distributed system for features /// like event coordination and cross-instance communication. pub fn messenger(&self) -> &Arc { &self.messenger } /// Get the tokio runtime handle from Nova. /// /// This handle should be used for spawning background tasks that need to /// run on the KVBM runtime's executor (e.g., offload engine pipelines). pub fn runtime(&self) -> tokio::runtime::Handle { self.messenger.runtime().clone() } /// Check if a parallel_worker is configured. /// /// The parallel_worker is required for local transfer operations /// (e.g., offloading blocks between tiers). pub fn has_parallel_worker(&self) -> bool { self.parallel_worker.is_some() } /// Get the parallel worker for distributed operations. /// /// The parallel worker fans out operations to all workers and aggregates results. /// It implements `ObjectBlockOps` for coordinated object storage uploads. pub fn parallel_worker(&self) -> Option> { self.parallel_worker.clone() } /// Get the object storage client for G4 operations. /// /// Returns `Some` if object storage is configured, `None` otherwise. /// The client is used by InitiatorSession for G4 parallel search. pub fn object_client(&self) -> Option> { self.object_client.clone() } /// Add a remote leader to the search list. /// /// Remote leaders are queried during `find_matches_with_options` when /// `search_remote == true`. This method allows adding remote leaders /// after construction (e.g., when instance IDs are only known after /// cluster setup). pub fn add_remote_leader(&self, instance_id: InstanceId) { let mut remote_leaders = self.remote_leaders.write().unwrap(); if !remote_leaders.contains(&instance_id) { remote_leaders.push(instance_id); } } /// Set all remote leaders at once. pub fn set_remote_leaders(&self, instance_ids: Vec) { let mut remote_leaders = self.remote_leaders.write().unwrap(); *remote_leaders = instance_ids; } /// Get the list of remote leader instance IDs. pub fn remote_leaders(&self) -> Vec { self.remote_leaders.read().unwrap().clone() } /// Scan for all blocks matching any of the given sequence hashes. /// /// Unlike `find_matches`, this: /// - Does NOT stop on first miss /// - Returns blocks from both G2 and G3 tiers separately /// - Acquires blocks from pools (caller owns until dropped via RAII) /// - Returns `sorted_matches` ordered by `SequenceHash::position()` /// /// # Arguments /// * `sequence_hashes` - Hashes to scan for /// * `touch` - Whether to update frequency tracking (for MultiLRU eviction policy) /// /// # Algorithm /// 1. Scan G2 manager for candidates /// 2. Scan G3 manager for remaining candidates /// 3. Build sorted_matches from both, sorted by position (lowest to highest) pub fn scan_blocks(&self, sequence_hashes: &[SequenceHash], touch: bool) -> ScanBlocksResult { // Step 1: Scan G2 for all candidates let g2_blocks = self.g2_manager.scan_matches(sequence_hashes, touch); // Step 2: Find remaining hashes not in G2 let remaining: Vec = sequence_hashes .iter() .filter(|h| !g2_blocks.contains_key(h)) .copied() .collect(); // Step 3: Scan G3 for remaining (if G3 exists) let g3_blocks = if let Some(ref g3_manager) = self.g3_manager { if !remaining.is_empty() { g3_manager.scan_matches(&remaining, touch) } else { HashMap::new() } } else { HashMap::new() }; // Step 4: Build sorted_matches from both tiers let mut sorted_matches: Vec<(SequenceHash, LogicalLayoutHandle)> = Vec::with_capacity(g2_blocks.len() + g3_blocks.len()); // Add G2 matches for hash in g2_blocks.keys() { sorted_matches.push((*hash, LogicalLayoutHandle::G2)); } // Add G3 matches for hash in g3_blocks.keys() { sorted_matches.push((*hash, LogicalLayoutHandle::G3)); } // Sort by SequenceHash position (lowest to highest) sorted_matches.sort_by_key(|(hash, _)| hash.position()); ScanBlocksResult { g2_blocks, g3_blocks, sorted_matches, } } /// Scan blocks using a custom policy that controls iteration and yields results. /// /// This provides maximum flexibility for implementing custom scanning strategies. /// The policy receives access to a `BlockAccessor` for acquiring blocks and a /// `PolicyContext` for yielding results incrementally. /// /// # Arguments /// * `hashes` - Sequence hashes to scan /// * `touch` - Whether to update frequency tracking on block access /// * `policy` - Function that implements the scanning strategy /// /// # Design /// /// The accessor does NOT hold locks between calls. Each `.find()` call is /// independent. This enables: /// - Custom iteration patterns (sorted, BTree scan, binary search, etc.) /// - Yielding results incrementally (e.g., contiguous subsequences) /// - Future parallel execution (accessor is Send + Sync) /// /// # Example: Simple linear scan /// ```ignore /// let blocks = leader.scan_with_policy(&hashes, true, |hashes, ctx| { /// for hash in hashes { /// if let Some(block) = ctx.accessor().find(*hash) { /// ctx.yield_item(block); /// } /// } /// }); /// ``` /// /// # Example: Find contiguous subsequences /// ```ignore /// let runs: Vec> = leader.scan_with_policy(&hashes, true, |hashes, ctx| { /// let mut run = Vec::new(); /// let mut last_pos: Option = None; /// /// for hash in hashes.iter().sorted_by_key(|h| h.position()) { /// if let Some(block) = ctx.accessor().find(*hash) { /// let pos = block.position(); /// if last_pos.map_or(true, |p| pos == p + 1) { /// run.push(block); /// } else { /// if !run.is_empty() { ctx.yield_item(std::mem::take(&mut run)); } /// run.push(block); /// } /// last_pos = Some(pos); /// } else if !run.is_empty() { /// ctx.yield_item(std::mem::take(&mut run)); /// last_pos = None; /// } /// } /// if !run.is_empty() { ctx.yield_item(run); } /// }); /// ``` pub fn scan_with_policy(&self, hashes: &[SequenceHash], touch: bool, policy: F) -> Vec where F: FnOnce(&[SequenceHash], &mut PolicyContext), { let accessor = BlockAccessor::new(self, touch); let mut ctx = PolicyContext { accessor, results: Vec::new(), }; policy(hashes, &mut ctx); ctx.results } pub fn builder() -> InstanceLeaderBuilder { InstanceLeaderBuilder::default() } /// Register Nova handlers for leader-to-leader communication. /// /// This must be called after construction to enable distributed onboarding. pub fn register_handlers(&self) -> Result<()> { let instance_id = self.messenger.instance_id(); let g2_manager = self.g2_manager.clone(); let g3_manager = self.g3_manager.clone(); let parallel_worker = self.parallel_worker.clone(); let transport = self.transport.clone(); let sessions = self.sessions.clone(); let spawn_responder = move |msg: OnboardMessage| -> Result<()> { if let OnboardMessage::CreateSession { requester, session_id, sequence_hashes, } = msg { let (tx, rx) = mpsc::channel(100); sessions.insert(session_id, tx); let session = ResponderSession::new( session_id, instance_id, requester, g2_manager.clone(), g3_manager.clone(), parallel_worker.clone(), transport.clone(), ); tokio::spawn(async move { if let Err(e) = session.run(rx, sequence_hashes).await { tracing::warn!(error = %e, "ResponderSession error"); } }); Ok(()) } else { anyhow::bail!("spawn_responder called with non-CreateSession message") } }; // Create export_metadata callback if we have workers or cached metadata let export_metadata_callback: Option = if !self.workers.is_empty() || self.cached_worker_metadata.is_some() { let workers = self.workers.clone(); let cached_metadata = self.cached_worker_metadata.clone(); Some(Arc::new(move || { let workers = workers.clone(); let cached_metadata = cached_metadata.clone(); Box::pin(async move { // Return cached metadata if available if let Some(cached) = cached_metadata { return Ok(cached); } // Otherwise, query workers let mut metadata = Vec::with_capacity(workers.len()); for worker in &workers { let serialized = worker.export_metadata()?.await?; metadata.push(serialized); } Ok(metadata) }) })) } else { None }; let mut service = VeloLeaderService::new(self.messenger.clone(), self.sessions.clone()) .with_spawn_responder(spawn_responder) .with_session_sessions(self.session_sessions.clone()); if let Some(callback) = export_metadata_callback { service = service.with_export_metadata(callback); } service.register_handlers()?; Ok(()) } /// Store session state (held blocks and status channel). /// /// Blocks are kept alive via RAII until the session is removed from storage. fn store_session_state(&self, state: SessionState) { self.session_states.insert(state.session_id, state); } /// Release a completed session, dropping any held blocks. /// /// This is optional - sessions will naturally be cleaned up when the InstanceLeader /// is dropped. Call this explicitly if you need to release blocks earlier. pub fn release_session(&self, session_id: SessionId) { self.session_states.remove(&session_id); self.sessions.remove(&session_id); self.session_sessions.remove(&session_id); } // ======================================================================== // Inverted Control Pattern (Prefill-Decode) Methods // ======================================================================== /// Create a controllable session for local blocks. /// /// This is the "Decode side" of the inverted control pattern: /// 1. Search local G2 and G3 for matches /// 2. Create a ControllableSession that holds the blocks /// 3. Return session_id to be sent to Prefill out-of-band /// /// By default, G3→G2 staging starts immediately (auto_stage=true). pub fn create_controllable_session( &self, sequence_hashes: &[SequenceHash], ) -> Result { self.create_controllable_session_with_options( sequence_hashes, ControllableSessionOptions::default(), ) } /// Create a controllable session with custom options. /// /// Use this when you need to control auto-staging behavior. pub fn create_controllable_session_with_options( &self, sequence_hashes: &[SequenceHash], options: ControllableSessionOptions, ) -> Result { let session_id = SessionId::from(Uuid::new_v4()); // Local search only let matched_g2_blocks = self.g2_manager.match_blocks(sequence_hashes); // Find remaining hashes not in G2 let remaining_hashes: Vec<_> = sequence_hashes .iter() .filter(|h| !matched_g2_blocks.iter().any(|b| b.sequence_hash() == **h)) .copied() .collect(); // Search G3 for remaining hashes let matched_g3_blocks = if let Some(ref g3_manager) = self.g3_manager { g3_manager.match_blocks(&remaining_hashes) } else { Vec::new() }; let local_g2_count = matched_g2_blocks.len(); let local_g3_count = matched_g3_blocks.len(); // Create session channel using unified SessionMessage protocol let (tx, rx) = session_message_channel(100); self.session_sessions.insert(session_id, tx); // Collect G2 layout handles from workers for round-robin block allocation let worker_g2_handles: Vec = self .parallel_worker .as_ref() .map(|pw| pw.workers().iter().filter_map(|w| w.g2_handle()).collect()) .unwrap_or_default(); let endpoint = super::session::SessionEndpoint::new( session_id, self.messenger.instance_id(), self.transport.clone(), rx, ); let (cmd_tx, cmd_rx) = mpsc::channel(16); let session = ServerSession::new_with_staging( endpoint, BlockHolder::new(matched_g2_blocks), BlockHolder::new(matched_g3_blocks), worker_g2_handles, self.g2_manager.clone(), self.parallel_worker.clone(), cmd_rx, ServerSessionOptions { auto_stage: options.auto_stage, }, ); // Keep handle alive to prevent cmd channel from closing let _handle = ServerSessionHandle::new(session_id, self.messenger.instance_id(), cmd_tx); // Spawn session task let session_sessions = self.session_sessions.clone(); tokio::spawn(async move { let _handle = _handle; // move handle into task to keep cmd channel open if let Err(e) = session.run().await { tracing::warn!(error = %e, "ServerSession error"); } // Clean up when session completes session_sessions.remove(&session_id); }); Ok(ControllableSessionResult { session_id, local_g2_count, local_g3_count, }) } // ======================================================================== // Unified Session Protocol // ======================================================================== /// Attach to a remote session. /// Returns a `SessionHandle` that uses `SessionMessage` for communication. /// /// # Arguments /// * `remote_instance` - The instance hosting the session /// * `session_id` - The session to attach to /// /// # Example /// ```ignore /// let handle = leader.attach_session(remote_id, session_id).await?; /// let state = handle.wait_for_ready().await?; /// handle.trigger_staging().await?; /// ``` pub async fn attach_session( &self, remote_instance: InstanceId, session_id: SessionId, ) -> Result { // Create local channel for receiving state updates let (state_tx, state_rx) = session_handle_state_channel(); // Register handler for this session's messages let (msg_tx, msg_rx) = session_message_channel(100); self.session_sessions.insert(session_id, msg_tx); // Spawn receiver task to update state tokio::spawn(Self::run_session_receiver(msg_rx, state_tx)); // Send attach message using new protocol let msg = SessionMessage::Attach { peer: self.messenger.instance_id(), session_id, as_role: ControlRole::Controller, }; self.transport.send_session(remote_instance, msg).await?; let mut handle = SessionHandle::new( session_id, remote_instance, self.messenger.instance_id(), self.transport.clone(), state_rx, ); // Add RDMA support if parallel worker is configured if let Some(parallel_worker) = &self.parallel_worker { handle = handle.with_rdma_support(parallel_worker.clone()); } Ok(handle) } // ======================================================================== // Endpoint Session Creation (Server-Side) // ======================================================================== /// Create an endpoint session that a remote peer can attach to. /// /// This searches local G2/G3 for blocks matching the given sequence hashes /// and creates a session that exposes them for remote RDMA pull. /// /// Returns `(session_id, handle)` where: /// - `session_id` - Send to remote peer for attachment /// - `handle` - Use to control the session (send layer notifications, close) /// /// # Example /// ```ignore /// // Create session for sequence hashes /// let (session_id, handle) = leader.create_endpoint_session(&hashes)?; /// /// // Send session_id to remote peer out-of-band /// // Remote attaches via: remote_leader.attach_session(local_id, session_id) /// /// // For layerwise transfer, notify when layers are ready /// handle.notify_layers_ready(0..1).await?; /// ``` pub fn create_endpoint_session( &self, sequence_hashes: &[SequenceHash], ) -> Result<(SessionId, ServerSessionHandle)> { let session_id = SessionId::from(uuid::Uuid::new_v4()); // Local search let matched_g2_blocks = self.g2_manager.match_blocks(sequence_hashes); // Collect layout handles from workers // Note: For single-worker setups, all blocks use the same handle // For multi-worker (SPMD), each block gets the handle from its assigned worker let worker_g2_handles: Vec = self .parallel_worker .as_ref() .map(|pw| pw.workers().iter().filter_map(|w| w.g2_handle()).collect()) .unwrap_or_default(); // Assign layout handle to each matched block // For now, use the first worker's handle for all blocks (single-worker assumption) // TODO: For SPMD, map blocks to worker handles based on block assignment let layout_handle = worker_g2_handles .first() .copied() .ok_or_else(|| anyhow::anyhow!("No G2 layout handle available from workers"))?; let layout_handles: Vec = vec![layout_handle; matched_g2_blocks.len()]; // Get sequence hashes from matched blocks let matched_hashes: Vec = matched_g2_blocks .iter() .map(|b| b.sequence_hash()) .collect(); // Create the session channel let (msg_tx, msg_rx) = session_message_channel(100); self.session_sessions.insert(session_id, msg_tx); // Create BlockHolder from matched blocks let block_holder = BlockHolder::new(matched_g2_blocks); // Create the session and handle let (session, handle) = create_server_session( session_id, self.messenger.instance_id(), block_holder, layout_handles, matched_hashes, self.transport.clone(), msg_rx, ); // Spawn the session task let session_sessions = self.session_sessions.clone(); tokio::spawn(async move { if let Err(e) = session.run().await { tracing::warn!(error = %e, "ServerSession error"); } // Clean up when session completes session_sessions.remove(&session_id); }); Ok((session_id, handle)) } /// Create an endpoint session for specific pre-allocated blocks. /// /// Unlike `create_endpoint_session`, this doesn't search - it uses the /// provided blocks directly. Useful when the caller already has blocks /// to expose (e.g., after prefill computation). /// /// # Arguments /// * `blocks` - Blocks to expose for RDMA pull /// * `sequence_hashes` - Sequence hashes for the blocks (must match block count) /// * `layout_handles` - Layout handles for the blocks (must match block count) /// /// # Example /// ```ignore /// // After prefill computation, expose blocks for Decode to pull /// let (session_id, handle) = leader.create_endpoint_session_for_blocks( /// prefill_blocks, /// &hashes, /// &layout_handles, /// )?; /// ``` pub fn create_endpoint_session_for_blocks( &self, blocks: BlockHolder, sequence_hashes: &[SequenceHash], layout_handles: &[LayoutHandle], ) -> Result<(SessionId, ServerSessionHandle)> { let session_id = SessionId::from(uuid::Uuid::new_v4()); // Create the session channel let (msg_tx, msg_rx) = session_message_channel(100); self.session_sessions.insert(session_id, msg_tx); // Create the session and handle let (session, handle) = create_server_session( session_id, self.messenger.instance_id(), blocks, layout_handles.to_vec(), sequence_hashes.to_vec(), self.transport.clone(), msg_rx, ); // Spawn the session task let session_sessions = self.session_sessions.clone(); tokio::spawn(async move { if let Err(e) = session.run().await { tracing::warn!(error = %e, "ServerSession error"); } // Clean up when session completes session_sessions.remove(&session_id); }); Ok((session_id, handle)) } /// Internal: Process incoming SessionMessage for a session. async fn run_session_receiver( mut rx: mpsc::Receiver, state_tx: super::session::SessionHandleStateTx, ) { while let Some(msg) = rx.recv().await { match msg { SessionMessage::StateResponse { state, .. } => { state_tx.update(state); } SessionMessage::BlocksStaged { staged_blocks, remaining, layer_range, .. } => { state_tx.add_staged_blocks(staged_blocks, remaining, layer_range); } SessionMessage::Error { message, .. } => { tracing::warn!(%message, "Session error"); state_tx.set_failed(); break; } SessionMessage::Close { .. } => { state_tx.set_phase(SessionPhase::Complete); break; } _ => { // Ignore control commands (sent by controller, not received) } } } } /// Get the session sessions map (for Nova handler registration). #[expect(dead_code)] pub(crate) fn session_sessions(&self) -> Arc> { self.session_sessions.clone() } // ======================================================================== // RDMA Metadata Management // These methods handle layout metadata export/import for remote RDMA transfers. // ======================================================================== /// Check if metadata for a remote instance has been loaded. /// /// Returns true if `import_remote_metadata` has been successfully called /// for the given instance. pub fn has_remote_metadata(&self, instance: InstanceId) -> bool { self.parallel_worker .as_ref() .map(|pw| pw.has_remote_metadata(instance)) .unwrap_or(false) } /// Get the number of workers attached to this leader. pub fn worker_count(&self) -> usize { self.workers.len() } /// Export metadata from all workers. /// /// Returns a `Vec` where each element corresponds to a worker /// in rank order. This metadata can be sent to remote instances to enable /// RDMA transfers. /// /// # Returns /// Vector of serialized layouts, one per worker pub async fn export_worker_metadata(&self) -> Result> { // Return cached metadata if available if let Some(cached) = &self.cached_worker_metadata { return Ok(cached.clone()); } // Otherwise, query workers let mut metadata = Vec::with_capacity(self.workers.len()); for worker in &self.workers { let serialized = worker.export_metadata()?.await?; metadata.push(serialized); } Ok(metadata) } /// Import metadata from a remote instance's workers. /// /// This imports layout metadata from a remote instance, enabling RDMA transfers /// to pull data from that instance. Metadata is imported rank-by-rank: /// - local worker 0 imports remote worker 0's metadata /// - local worker 1 imports remote worker 1's metadata /// - etc. /// /// # Arguments /// * `remote_instance` - The instance ID of the remote leader /// * `metadata` - Vector of SerializedLayout from remote workers (one per worker) /// /// # Errors /// Returns an error if: /// - No parallel worker configured /// - Metadata was already imported for this instance /// - Worker count mismatch between local and remote /// - Individual worker metadata import fails pub async fn import_remote_metadata( &self, remote_instance: InstanceId, metadata: Vec, ) -> Result<()> { let parallel_worker = self .parallel_worker .as_ref() .ok_or_else(|| anyhow::anyhow!("No parallel worker configured"))?; // Check if already loaded if parallel_worker.has_remote_metadata(remote_instance) { anyhow::bail!("Metadata already imported for instance {}", remote_instance); } // Connect to remote - this imports metadata and stores handle mappings parallel_worker .connect_remote(remote_instance, metadata)? .await?; Ok(()) } // ======================================================================== // Private Worker Mirror Methods // These methods execute operations across all workers and aggregate results. // ======================================================================== /// Execute local transfer across all workers, returning aggregated notification. /// /// Delegates to the parallel_worker which fans out to all workers and /// aggregates their notifications into a single composite notification. #[allow(dead_code)] pub(crate) fn execute_local_transfer( &self, src: LogicalLayoutHandle, dst: LogicalLayoutHandle, src_block_ids: Vec, dst_block_ids: Vec, options: TransferOptions, ) -> Result { let parallel_worker = self .parallel_worker .as_ref() .ok_or_else(|| anyhow::anyhow!("No parallel worker configured"))?; parallel_worker.execute_local_transfer( src, dst, Arc::from(src_block_ids), Arc::from(dst_block_ids), options, ) } /// Execute remote onboard across all workers, returning aggregated notification. /// /// Delegates to the parallel_worker which fans out to all workers and /// aggregates their notifications into a single composite notification. #[allow(dead_code)] pub(crate) fn execute_remote_onboard( &self, src: RemoteDescriptor, dst: LogicalLayoutHandle, dst_block_ids: Vec, options: TransferOptions, ) -> Result { let parallel_worker = self .parallel_worker .as_ref() .ok_or_else(|| anyhow::anyhow!("No parallel worker configured"))?; parallel_worker.execute_remote_onboard(src, dst, Arc::from(dst_block_ids), options) } /// Execute remote offload across all workers, returning aggregated notification. /// /// Delegates to the parallel_worker which fans out to all workers and /// aggregates their notifications into a single composite notification. #[allow(dead_code)] pub(crate) fn execute_remote_offload( &self, src: LogicalLayoutHandle, dst: RemoteDescriptor, src_block_ids: Vec, options: TransferOptions, ) -> Result { let parallel_worker = self .parallel_worker .as_ref() .ok_or_else(|| anyhow::anyhow!("No parallel worker configured"))?; parallel_worker.execute_remote_offload(src, Arc::from(src_block_ids), dst, options) } } impl Leader for InstanceLeader { fn find_matches_with_options( &self, sequence_hashes: &[SequenceHash], options: FindMatchesOptions, ) -> Result { // Search G2 (host memory) for matches // Uses match_blocks which stops at first miss (implements "first hole" policy). // This ensures we only find contiguous blocks from the start of the sequence. // For distributed search, remote instances use scan_matches for broad coverage, // then first-hole filtering is applied in InitiatorSession after aggregation. // todo: add explicit timing tracing here // let start_time = Instant::now(); let matched_g2_blocks = self.g2_manager.match_blocks(sequence_hashes); //let g2_search_time = Instant::now().duration_since(start_time); // Search G3 (disk) for remaining hashes if G3 is available let remaining_hashes: Vec<_> = sequence_hashes .iter() .filter(|h| !matched_g2_blocks.iter().any(|b| b.sequence_hash() == **h)) .copied() .collect(); let matched_g3_blocks = if let Some(ref g3_manager) = self.g3_manager { // Uses match_blocks on remaining hashes (those not found in G2). // Since G2 already applied first-hole policy, G3 search continues from where G2 stopped. g3_manager.match_blocks(&remaining_hashes) } else { Vec::new() }; // Determine if we can return immediately (Ready) or need async session // Ready if: // - g3 blocks is empty // - AND NOT (search_remote AND has_remote_leaders) // - AND NOT (search_remote AND has_object_client) // // AsyncSession (is_ready=false) if: // - g3 is not empty, or // - search_remote is true AND (has_remote_leaders OR has_object_client) let has_remote_leaders = !self.remote_leaders.read().unwrap().is_empty(); let has_object_client = self.object_client.is_some(); let needs_remote_search = options.search_remote && (has_remote_leaders || has_object_client); let is_ready = matched_g3_blocks.is_empty() && !needs_remote_search; if is_ready { // No session needed - blocks owned directly by ReadyResult (RAII) return Ok(FindMatchesResult::Ready(ReadyResult::new( matched_g2_blocks, ))); } // AsyncSession path: G3 blocks found or remote search enabled let session_id = SessionId::from(Uuid::new_v4()); let local_g2_count = matched_g2_blocks.len(); let local_g3_count = matched_g3_blocks.len(); // AsyncSession: staging locally and/or remote searching let (status_tx, status_rx) = watch::channel(OnboardingStatus::Searching); let all_g2_blocks = Arc::new(Mutex::new(None)); // Store session state to keep blocks alive let state = SessionState { session_id, matched_g2_blocks, matched_g3_blocks, status_tx: status_tx.clone(), }; self.store_session_state(state); // If no remote search, handle local-only staging if !options.search_remote { // Local-only staging (Prepare or Full mode) // TODO: Implement local G3→G2 staging let total_matched = local_g2_count + local_g3_count; status_tx .send(OnboardingStatus::Complete { matched_blocks: total_matched, }) .ok(); return Ok(FindMatchesResult::AsyncSession(AsyncSessionResult::new( session_id, status_rx, all_g2_blocks, None, // No session handle for local-only staging (yet) ))); } // Remote search path let (tx, rx) = mpsc::channel(100); self.sessions.insert(session_id, tx); // Create control channel for Hold/Prepare modes let (session_handle, control_rx) = if matches!( options.staging_mode, StagingMode::Hold | StagingMode::Prepare ) { let (control_tx, control_rx) = mpsc::channel(10); let handle = LegacySessionHandle::new(session_id, options.staging_mode, control_tx); (Some(handle), Some(control_rx)) } else { (None, None) }; let session = InitiatorSession::new( session_id, self.messenger.instance_id(), options.staging_mode, self.g2_manager.clone(), self.g3_manager.clone(), self.parallel_worker.clone(), self.transport.clone(), status_tx.clone(), all_g2_blocks.clone(), control_rx.unwrap_or_else(|| { let (_, rx) = mpsc::channel(1); rx }), self.object_client.clone(), ); let remote_leaders = self.remote_leaders.read().unwrap().clone(); let sequence_hashes = sequence_hashes.to_vec(); let handle = self.messenger.runtime(); handle.spawn(async move { if let Err(e) = session.run(rx, remote_leaders, sequence_hashes).await { tracing::warn!(error = %e, "InitiatorSession error"); // Try to update status to indicate error status_tx .send(OnboardingStatus::Complete { matched_blocks: 0 }) .ok(); } }); Ok(FindMatchesResult::AsyncSession(AsyncSessionResult::new( session_id, status_rx, all_g2_blocks, session_handle, ))) } }