testing.rs 12.7 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Round-trip testing infrastructure for transfer verification.
//!
//! This module provides utilities for testing data integrity across transfers
//! by comparing checksums after round-trip operations:
//! 1. Source blocks (host) → Intermediate (device/disk/remote)
//! 2. Intermediate → Destination blocks (host, different IDs)
//! 3. Verify checksums match between source and destination

use super::context::TransferContext;
use super::{
    BlockChecksum, FillPattern, PhysicalLayout, StorageKind, compute_block_checksums, fill_blocks,
    transfer_blocks,
};
use anyhow::{Result, anyhow};
use std::collections::HashMap;

/// Result of a round-trip test.
///
/// Built by `RoundTripTest::run` once both transfers have completed and the
/// source and destination checksums have been compared pair by pair.
#[derive(Debug)]
pub struct RoundTripTestResult {
    /// Checksums of the source blocks, keyed by source block ID.
    /// Computed after the source blocks are filled and before any transfer.
    pub source_checksums: HashMap<usize, BlockChecksum>,

    /// Checksums of the destination blocks, keyed by destination block ID.
    /// Computed after the intermediate → destination transfer has completed.
    pub dest_checksums: HashMap<usize, BlockChecksum>,

    /// The `(src_id, dst_id)` pairs that were tested, in the order the
    /// mappings were added. The intermediate block ID is not retained here.
    pub block_mapping: Vec<(usize, usize)>,

    /// `true` when every source/destination checksum pair matched,
    /// i.e. when `mismatches` is empty.
    pub success: bool,

    /// The `(src_id, dst_id)` pairs whose checksums did not match
    /// (empty when the test passed).
    pub mismatches: Vec<(usize, usize)>,
}

impl RoundTripTestResult {
    /// Returns `true` when every source/destination checksum pair matched.
    pub fn is_success(&self) -> bool {
        self.success
    }

    /// Returns how many `(source, destination)` block pairs were tested.
    pub fn num_blocks(&self) -> usize {
        self.block_mapping.len()
    }

    /// Renders a human-readable summary of the outcome.
    ///
    /// A passing result reports the number of verified blocks; a failing
    /// result reports how many pairs mismatched out of the total and lists
    /// the offending `(src_id, dst_id)` pairs.
    pub fn report(&self) -> String {
        let total = self.num_blocks();

        // Failure path first: it is the only one that needs the mismatch list.
        if !self.success {
            let failed = self.mismatches.len();
            return format!(
                "Round-trip test FAILED: {}/{} blocks mismatched\nMismatches: {:?}",
                failed, total, self.mismatches
            );
        }

        format!(
            "Round-trip test PASSED: {}/{} blocks verified successfully",
            total, total
        )
    }
}

/// Builder for round-trip tests.
///
/// This allows configuring a test that transfers data from source blocks
/// to intermediate storage and back to different destination blocks,
/// verifying data integrity via checksums.
///
/// Construct with `new`, add mappings with `add_block_mapping` /
/// `with_block_mappings`, optionally pick a fill pattern with
/// `with_fill_pattern`, then consume the builder with `run`.
pub struct RoundTripTest {
    /// Source physical layout. `new` rejects layouts that report `is_remote()`.
    source: PhysicalLayout,

    /// Intermediate physical layout the data is staged through.
    /// Not checked for locality, so it may be remote/device/disk.
    intermediate: PhysicalLayout,

    /// Destination physical layout. `new` rejects layouts that report `is_remote()`.
    destination: PhysicalLayout,

    /// Block mapping as `(src_id, intermediate_id, dst_id)` triples,
    /// kept in insertion order. Empty until mappings are added.
    block_mapping: Vec<(usize, usize, usize)>,

    /// Pattern written into the source blocks before checksumming.
    /// Defaults to `FillPattern::Sequential`.
    fill_pattern: FillPattern,
}

impl RoundTripTest {
    /// Create a new round-trip test with no block mappings and the
    /// `FillPattern::Sequential` fill pattern.
    ///
    /// # Arguments
    /// * `source` - Source physical layout (must be local)
    /// * `intermediate` - Intermediate physical layout (not checked for locality)
    /// * `destination` - Destination physical layout (must be local)
    ///
    /// # Errors
    /// Returns an error if `source` or `destination` reports `is_remote()`.
    pub fn new(
        source: PhysicalLayout,
        intermediate: PhysicalLayout,
        destination: PhysicalLayout,
    ) -> Result<Self> {
        if source.is_remote() {
            return Err(anyhow!("Source layout must be local"));
        }
        if destination.is_remote() {
            return Err(anyhow!("Destination layout must be local"));
        }

        Ok(Self {
            source,
            intermediate,
            destination,
            block_mapping: Vec::new(),
            fill_pattern: FillPattern::Sequential,
        })
    }

    /// Set the fill pattern for source blocks, replacing the previous one.
    pub fn with_fill_pattern(mut self, pattern: FillPattern) -> Self {
        self.fill_pattern = pattern;
        self
    }

    /// Add a block mapping for the round-trip test.
    ///
    /// Mappings are appended in call order; no check for duplicate or
    /// out-of-range IDs is performed here.
    ///
    /// # Arguments
    /// * `src_id` - Source block ID
    /// * `intermediate_id` - Intermediate block ID
    /// * `dst_id` - Destination block ID
    pub fn add_block_mapping(
        mut self,
        src_id: usize,
        intermediate_id: usize,
        dst_id: usize,
    ) -> Self {
        self.block_mapping.push((src_id, intermediate_id, dst_id));
        self
    }

    /// Add multiple block mappings at once.
    ///
    /// Each element is a `(src_id, intermediate_id, dst_id)` triple. The
    /// triples are appended after any mappings that were already added.
    pub fn with_block_mappings(mut self, mappings: &[(usize, usize, usize)]) -> Self {
        self.block_mapping.extend_from_slice(mappings);
        self
    }

    /// Run the round-trip test.
    ///
    /// # Workflow
    /// 1. Fill source blocks with the specified pattern
    /// 2. Compute source checksums
    /// 3. Transfer source → intermediate
    /// 4. Transfer intermediate → destination
    /// 5. Compute destination checksums
    /// 6. Compare checksums
    ///
    /// Destination blocks are not cleared before step 4, and the mapping is
    /// not checked for duplicate IDs.
    /// NOTE(review): stale destination data that already matches the fill
    /// pattern could hide a transfer that silently did nothing — confirm
    /// whether callers should pre-fill the destination with a different
    /// pattern.
    ///
    /// # Arguments
    /// * `ctx` - Transfer context forwarded to both `transfer_blocks` calls
    ///
    /// # Errors
    /// Returns an error if no block mappings were specified, if filling,
    /// checksumming or either transfer fails, or if a checksum is missing
    /// for a mapped source or destination block.
    pub async fn run(self, ctx: &TransferContext) -> Result<RoundTripTestResult> {
        if self.block_mapping.is_empty() {
            return Err(anyhow!("No block mappings specified"));
        }

        // Project each column of the mapping exactly once; the same ID lists
        // are reused by every step below.
        let src_ids: Vec<usize> = self.block_mapping.iter().map(|&(src, _, _)| src).collect();
        let inter_ids: Vec<usize> = self
            .block_mapping
            .iter()
            .map(|&(_, inter, _)| inter)
            .collect();
        let dst_ids: Vec<usize> = self.block_mapping.iter().map(|&(_, _, dst)| dst).collect();

        // Step 1: Fill source blocks
        fill_blocks(&self.source, &src_ids, self.fill_pattern)?;

        // Step 2: Compute source checksums
        let source_checksums = compute_block_checksums(&self.source, &src_ids)?;

        // Step 3: Transfer source → intermediate
        let notification =
            transfer_blocks(&self.source, &self.intermediate, &src_ids, &inter_ids, ctx)?;
        notification.await?;

        // Step 4: Transfer intermediate → destination
        let notification = transfer_blocks(
            &self.intermediate,
            &self.destination,
            &inter_ids,
            &dst_ids,
            ctx,
        )?;
        notification.await?;

        // Step 5: Compute destination checksums
        let dest_checksums = compute_block_checksums(&self.destination, &dst_ids)?;

        // Step 6: Compare checksums.
        // Use checked lookups: indexing a HashMap with a missing key panics,
        // and this function already reports failures through `Result`.
        let mut mismatches = Vec::new();
        for &(src_id, _, dst_id) in &self.block_mapping {
            let src_checksum = source_checksums
                .get(&src_id)
                .ok_or_else(|| anyhow!("No checksum was computed for source block {}", src_id))?;
            let dst_checksum = dest_checksums.get(&dst_id).ok_or_else(|| {
                anyhow!("No checksum was computed for destination block {}", dst_id)
            })?;

            if src_checksum != dst_checksum {
                mismatches.push((src_id, dst_id));
            }
        }

        let success = mismatches.is_empty();
        // The result only exposes the (src, dst) pairs; the intermediate ID is dropped.
        let block_mapping: Vec<(usize, usize)> = self
            .block_mapping
            .iter()
            .map(|&(src, _, dst)| (src, dst))
            .collect();

        Ok(RoundTripTestResult {
            source_checksums,
            dest_checksums,
            block_mapping,
            success,
            mismatches,
        })
    }
}

// Unit tests for the round-trip builder. Only compiled for test builds with
// the `testing-kvbm` feature enabled.
//
// NOTE(review): `create_test_layout` is called throughout this module but no
// definition is visible in this file, and the layout/`Arc` imports below are
// otherwise unused. Presumably a helper returning `(layout, memory)` is
// missing — confirm it exists, or this module will not compile when the
// feature is turned on.
#[cfg(all(test, feature = "testing-kvbm"))]
mod tests {
    use super::*;
    use crate::v2::layout::{
        FullyContiguousLayout, Layout, LayoutConfig, MemoryRegion, OwnedMemoryRegion,
    };
    use std::sync::Arc;

    // Placeholder for building a transfer context.
    // Currently unimplemented: calling it panics via `todo!()`.
    fn create_test_context() -> TransferContext {
        // The tests that call this are marked `#[ignore]`, so the panic is not
        // hit in a default test run.
        // In the future, we can mock TransferContext or use conditional compilation
        todo!("Create test context - requires CUDA/NIXL setup")
    }

    // Full round trip where source, intermediate and destination use the same
    // block IDs in three separately created layouts.
    #[tokio::test]
    #[ignore = "Requires CUDA/NIXL setup"]
    async fn test_round_trip_host_to_host() {
        // Create three layouts: source, intermediate, destination.
        // The second tuple element is bound to a named variable so it stays
        // alive until the end of the test.
        let (src_layout, _src_mem) = create_test_layout(4);
        let (inter_layout, _inter_mem) = create_test_layout(4);
        let (dst_layout, _dst_mem) = create_test_layout(4);

        let source = PhysicalLayout::new_local(src_layout, StorageKind::System);
        let intermediate = PhysicalLayout::new_local(inter_layout, StorageKind::Pinned);
        let destination = PhysicalLayout::new_local(dst_layout, StorageKind::System);

        // Build round-trip test using the same block IDs in every layout
        // Source: blocks [0, 1, 2, 3]
        // Intermediate: blocks [0, 1, 2, 3]
        // Destination: blocks [0, 1, 2, 3] (a separately created layout from the source)
        let test = RoundTripTest::new(source, intermediate, destination)
            .unwrap()
            .with_fill_pattern(FillPattern::Sequential)
            .add_block_mapping(0, 0, 0)
            .add_block_mapping(1, 1, 1)
            .add_block_mapping(2, 2, 2)
            .add_block_mapping(3, 3, 3);

        // Create a transfer context (requires actual CUDA/NIXL setup)
        let ctx = create_test_context();

        // Run the test
        let result = test.run(&ctx).await.unwrap();

        // On failure the report (including the mismatched pairs) is the assertion message
        assert!(result.is_success(), "{}", result.report());
        assert_eq!(result.num_blocks(), 4);
    }

    // Full round trip where every stage uses a different block ID for the same data.
    #[tokio::test]
    #[ignore = "Requires CUDA/NIXL setup"]
    async fn test_round_trip_different_block_ids() {
        // Create layouts with enough blocks
        let (src_layout, _src_mem) = create_test_layout(8);
        let (inter_layout, _inter_mem) = create_test_layout(8);
        let (dst_layout, _dst_mem) = create_test_layout(8);

        let source = PhysicalLayout::new_local(src_layout, StorageKind::System);
        let intermediate = PhysicalLayout::new_local(inter_layout, StorageKind::Pinned);
        let destination = PhysicalLayout::new_local(dst_layout, StorageKind::System);

        // Test with block IDs shifted by two at each stage
        // Source: blocks [0, 1, 2, 3]
        // Intermediate: blocks [2, 3, 4, 5]
        // Destination: blocks [4, 5, 6, 7]
        let test = RoundTripTest::new(source, intermediate, destination)
            .unwrap()
            .with_fill_pattern(FillPattern::BlockBased)
            .with_block_mappings(&[(0, 2, 4), (1, 3, 5), (2, 4, 6), (3, 5, 7)]);

        let ctx = create_test_context();
        let result = test.run(&ctx).await.unwrap();

        assert!(result.is_success(), "{}", result.report());
        assert_eq!(result.num_blocks(), 4);
    }

    // Builder-only check: no transfer is run, so no context is needed.
    #[test]
    fn test_round_trip_builder() {
        let (src_layout, _) = create_test_layout(4);
        let (inter_layout, _) = create_test_layout(4);
        let (dst_layout, _) = create_test_layout(4);

        let source = PhysicalLayout::new_local(src_layout, StorageKind::System);
        let intermediate = PhysicalLayout::new_local(inter_layout, StorageKind::Pinned);
        let destination = PhysicalLayout::new_local(dst_layout, StorageKind::System);

        let test = RoundTripTest::new(source, intermediate, destination)
            .unwrap()
            .with_fill_pattern(FillPattern::Constant(42))
            .add_block_mapping(0, 0, 1)
            .add_block_mapping(1, 1, 2);

        // Each `add_block_mapping` call appends exactly one mapping
        assert_eq!(test.block_mapping.len(), 2);
    }

    // `RoundTripTest::new` must reject a remote source layout.
    #[test]
    fn test_round_trip_requires_local_source() {
        let (src_layout, _) = create_test_layout(1);
        let (inter_layout, _) = create_test_layout(1);
        let (dst_layout, _) = create_test_layout(1);

        let source =
            PhysicalLayout::new_remote(src_layout, StorageKind::System, "remote".to_string());
        let intermediate = PhysicalLayout::new_local(inter_layout, StorageKind::Pinned);
        let destination = PhysicalLayout::new_local(dst_layout, StorageKind::System);

        let result = RoundTripTest::new(source, intermediate, destination);
        assert!(result.is_err());
    }

    // `RoundTripTest::new` must reject a remote destination layout.
    #[test]
    fn test_round_trip_requires_local_destination() {
        let (src_layout, _) = create_test_layout(1);
        let (inter_layout, _) = create_test_layout(1);
        let (dst_layout, _) = create_test_layout(1);

        let source = PhysicalLayout::new_local(src_layout, StorageKind::System);
        let intermediate = PhysicalLayout::new_local(inter_layout, StorageKind::Pinned);
        let destination =
            PhysicalLayout::new_remote(dst_layout, StorageKind::System, "remote".to_string());

        let result = RoundTripTest::new(source, intermediate, destination);
        assert!(result.is_err());
    }
}