stub.rs 3.26 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Stub collective operations implementation.
//!
//! This module provides a no-op implementation of [`CollectiveOps`] for testing
//! and single-worker scenarios where no actual collective communication is needed.

use std::ops::Range;

use anyhow::Result;
use velo::EventManager;

use crate::BlockId;
use kvbm_common::LogicalLayoutHandle;
use kvbm_physical::transfer::TransferCompleteNotification;

use super::CollectiveOps;

/// Stub collective operations implementation.
///
/// This implementation completes immediately without actually performing any
/// collective communication. Use for testing or when collective operations
/// are not yet implemented (e.g., before NCCL integration).
///
/// Every call to `broadcast` logs a warning through `tracing` and returns a
/// notification backed by an event that was triggered before being handed out.
///
/// # Warning
///
/// This stub does NOT perform actual data transfer. Using it in production
/// with `ReplicatedDataWorker` will result in incorrect behavior where
/// non-rank-0 workers have uninitialized data.
///
/// # Example
///
/// ```rust,ignore
/// use kvbm::v2::distributed::collectives::StubCollectiveOps;
///
/// let collective = StubCollectiveOps::new(events, 0, 1);
///
/// // Operations complete immediately without data transfer
/// let notification = collective.broadcast(
///     LogicalLayoutHandle::G1,
///     LogicalLayoutHandle::G1,
///     &src_block_ids,
///     &dst_block_ids,
///     None,
/// )?;
/// ```
pub struct StubCollectiveOps {
    // Event manager used to create (and immediately trigger) the completion
    // event behind each notification returned by `broadcast`.
    events: EventManager,
    // Rank reported by `CollectiveOps::rank` and attached to the warning log.
    rank: usize,
    // Group size reported by `CollectiveOps::world_size` and attached to the
    // warning log.
    world_size: usize,
}

impl StubCollectiveOps {
    /// Build a stub from an event manager and this worker's position in the group.
    ///
    /// The values are stored exactly as given; no validation is performed on
    /// `rank` or `world_size`.
    ///
    /// # Arguments
    /// * `events` - Event system used to create completion notifications
    /// * `rank` - Rank of this worker within the collective group
    /// * `world_size` - Total number of workers in the collective group
    pub fn new(events: EventManager, rank: usize, world_size: usize) -> Self {
        Self {
            events,
            rank,
            world_size,
        }
    }

    /// Build a stub for the single-worker case: rank 0 in a group of size 1.
    pub fn single_worker(events: EventManager) -> Self {
        Self {
            events,
            rank: 0,
            world_size: 1,
        }
    }
}

impl StubCollectiveOps {
    /// Produce a completion notification whose backing event has already been
    /// triggered.
    ///
    /// The event is triggered *before* the awaiter is requested, so the
    /// notification is handed out in an already-completed state.
    ///
    /// # Errors
    ///
    /// Propagates any error from creating the event, triggering it, or
    /// obtaining its awaiter.
    fn completed_notification(&self) -> Result<TransferCompleteNotification> {
        let done = self.events.new_event()?;
        let done_handle = done.handle();
        done.trigger()?;

        Ok(TransferCompleteNotification::from_awaiter(
            self.events.awaiter(done_handle)?,
        ))
    }
}

impl CollectiveOps for StubCollectiveOps {
    /// No-op broadcast: logs a warning describing the request and returns a
    /// notification that is already complete. No data is moved.
    ///
    /// The arguments are only used for logging; the block-id slices are not
    /// inspected beyond their lengths.
    ///
    /// # Errors
    ///
    /// Returns an error if the completion event cannot be created, triggered,
    /// or awaited.
    fn broadcast(
        &self,
        src: LogicalLayoutHandle,
        dst: LogicalLayoutHandle,
        src_block_ids: &[BlockId],
        dst_block_ids: &[BlockId],
        layer_range: Option<Range<usize>>,
    ) -> Result<TransferCompleteNotification> {
        // Warn (rather than debug/trace) so accidental use of the stub is visible.
        tracing::warn!(
            rank = self.rank,
            world_size = self.world_size,
            ?src,
            ?dst,
            num_src_blocks = src_block_ids.len(),
            num_dst_blocks = dst_block_ids.len(),
            ?layer_range,
            "StubCollectiveOps::broadcast called - completing immediately without actual transfer"
        );

        self.completed_notification()
    }

    /// Rank supplied at construction time.
    fn rank(&self) -> usize {
        self.rank
    }

    /// Group size supplied at construction time.
    fn world_size(&self) -> usize {
        self.world_size
    }
}