mod.rs 5.52 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Offload Engine for asynchronous block transfers between storage tiers.
//!
//! The offload engine provides policy-based, cancellable pipelines for moving
//! blocks down the storage hierarchy (G1→G2, G2→G3, and G2→G4), i.e. from
//! higher-performance tiers toward lower-cost tiers.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                        OffloadEngine                            │
//! │                                                                 │
//! │  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐    │
//! │  │G1→G2 Pipeline │────│ G2→G3 Pipeline│    │ G2→G4 Pipeline│    │
//! │  └───────────────┘    └───────────────┘    └───────────────┘    │
//! │         │                     │                     │           │
//! │         └─────────auto_chain──┘                     │           │
//! │                                                                 │
//! └─────────────────────────────────────────────────────────────────┘
//!
//! Pipeline stages:
//! ┌─────────────┐    ┌────────────────┐    ┌──────────────────┐
//! │   Policy    │───▶│     Batch      │───▶│    Transfer      │
//! │  Evaluator  │    │   Collector    │    │    Executor      │
//! └─────────────┘    └────────────────┘    └──────────────────┘
//!       │                   │                      │
//!       ▼                   ▼                      ▼
//!   cancel check       cancel check          wait for in-flight
//! ```
//!
//! # Features
//!
//! - **Policy-based filtering**: Blocks pass through configurable policies
//!   (presence checks, LFU thresholds) before transfer
//! - **Batched transfers**: Blocks are accumulated into batches for efficient
//!   bulk transfers
//! - **Cancellation**: Clean cancellation with confirmation that all blocks
//!   are released and no outstanding operations remain
//! - **Pipeline chaining**: G1→G2 completions can automatically feed G2→G3
//!
//! See also: [Developer Guide](../../docs/offload-developer.md) for implementation
//! details and extension rules.
//!
//! # Example
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use kvbm::v2::distributed::offload::{
//!     OffloadEngine, PipelineBuilder, PresenceFilter, PresenceAndLFUFilter,
//! };
//!
//! // Build engine with pipelines
//! let engine = OffloadEngine::builder(leader.clone())
//!     .with_registry(registry.clone())
//!     .with_g2_manager(g2_manager.clone())
//!     .with_g3_manager(g3_manager.clone())
//!     .with_g2_to_g3_pipeline(
//!         PipelineBuilder::<G2, G3>::new()
//!             .policy(Arc::new(PresenceAndLFUFilter::with_default_threshold(registry.clone())))
//!             .batch_size(64)
//!             .build()
//!     )
//!     .build()?;
//!
//! // Enqueue blocks for offload
//! let handle = engine.enqueue_g2_to_g3(blocks)?;
//!
//! // Wait for completion or cancel
//! tokio::select! {
//!     result = handle.wait() => {
//!         println!("Completed: {:?}", result?.completed_blocks);
//!     }
//!     _ = shutdown_signal => {
//!         handle.cancel().wait().await;
//!         println!("Cancelled");
//!     }
//! }
//! ```
//!
//! See also: [Developer Guide](../../docs/offload-developer.md)

/// Opens an NVTX range labelled `$name` and evaluates to its guard.
///
/// This definition is selected when the `nvtx` feature is enabled. The range
/// ends when the guard it evaluates to is dropped, so bind the result to a
/// local for as long as the range should stay open.
#[cfg(feature = "nvtx")]
macro_rules! nvtx_range {
    ($name:expr) => {
        nvtx::range!($name)
    };
}

/// Stand-in for the NVTX range macro when the `nvtx` feature is disabled.
///
/// Evaluates to `()` and does not evaluate `$name`, so call sites compile
/// unchanged regardless of the feature setting.
#[cfg(not(feature = "nvtx"))]
macro_rules! nvtx_range {
    ($name:expr) => {
        ()
    };
}

// Submodules are declared without `pub`; the items intended for use outside
// this module are surfaced through the `pub use` re-exports further down.
mod batch;
mod cancel;
mod engine;
mod handle;
mod pending;
mod pipeline;
mod policy;
mod queue;
mod source;

// Compiled only in test builds.
#[cfg(test)]
mod cancel_tests;

// Re-export public API
pub use cancel::{CancelConfirmation, CancelState, CancellationToken};
pub use engine::{OffloadEngine, OffloadEngineBuilder};
pub use handle::{TransferHandle, TransferId, TransferResult, TransferStatus};
pub use pending::{PendingGuard, PendingTracker};
pub use pipeline::{
    ObjectPipeline, ObjectPipelineBuilder, ObjectPipelineConfig, Pipeline, PipelineBuilder,
    PipelineConfig, ResolvedBatch, ResolvedBlock, upgrade_batch,
};
pub use policy::{
    AllOfPolicy, AnyOfPolicy, BoxFuture, EvalContext, ObjectLockPresenceFilter,
    ObjectPresenceFilter, OffloadPolicy, PassAllPolicy, PolicyBatchFuture, PolicyFuture,
    PresenceAndLFUFilter, PresenceChecker, PresenceFilter, S3PresenceChecker, async_batch_result,
    async_result, create_policy_from_config, sync_batch_result, sync_result,
};
pub use queue::CancellableQueue;
pub use source::{ExternalBlock, SourceBlock, SourceBlocks};

// Re-export batch config for advanced users
pub use batch::{BatchConfig, TimingTrace};