pool.rs 10.2 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

Ryan Olson's avatar
Ryan Olson committed
4
5
6
pub mod managed;
pub use managed::ManagedBlockPool;

7
use anyhow::Result;
Ryan Olson's avatar
Ryan Olson committed
8
9
use derive_builder::Builder;
use derive_getters::Dissolve;
Ryan Olson's avatar
Ryan Olson committed
10
use serde::{Deserialize, Serialize};
Ryan Olson's avatar
Ryan Olson committed
11
12
13
14

pub use super::block::{ImmutableBlock, MutableBlock};

use super::block::{
15
16
    Block, BlockError, BlockMetadata, GlobalRegistry, MaybeReturnableBlock, nixl::short_type_name,
    private, registry::BlockRegistry,
Ryan Olson's avatar
Ryan Olson committed
17
18
19
20
};
use super::events::{EventManager, NullEventManager};
use super::storage::Storage;

Ryan Olson's avatar
Ryan Olson committed
21
use crate::block_manager::CacheLevel;
22
use crate::block_manager::block::locality::LocalityProvider;
Ryan Olson's avatar
Ryan Olson committed
23
24
use crate::tokens::{SequenceHash, TokenBlock};

Ryan Olson's avatar
Ryan Olson committed
25
26
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, Ordering};
Ryan Olson's avatar
Ryan Olson committed
27
28
29
30
use std::{
    collections::{BTreeSet, HashMap, VecDeque},
    sync::{Arc, Weak},
};
31
use tokio::runtime::Handle;
Ryan Olson's avatar
Ryan Olson committed
32
use tokio::sync::oneshot;
Ryan Olson's avatar
Ryan Olson committed
33
34
use tokio_util::sync::CancellationToken;

Ryan Olson's avatar
Ryan Olson committed
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Type aliases to reduce complexity across the module

/// Result of a pool operation: `Ok(T)` on success, otherwise a [`BlockPoolError`].
type BlockPoolResult<T> = Result<T, BlockPoolError>;
/// Result that, on success, yields a [`oneshot::Receiver`] for a `T` rather than the `T` itself;
/// on failure it yields a [`BlockPoolError`].
type AsyncResponse<T> = Result<oneshot::Receiver<T>, BlockPoolError>;

// Collection type aliases

/// A vector of [`MutableBlock`]s sharing the same storage, locality and metadata types.
pub type MutableBlocks<S, L, M> = Vec<MutableBlock<S, L, M>>;
/// A vector of [`ImmutableBlock`]s sharing the same storage, locality and metadata types.
pub type ImmutableBlocks<S, L, M> = Vec<ImmutableBlock<S, L, M>>;

/// Enum representing either a mutable or immutable block that can be returned to the pool.
///
/// Both block flavours can be converted into this type via the `From` impls in this module,
/// and the enum implements [`MaybeReturnableBlock`] by delegating to the wrapped block.
#[derive(Debug)]
pub enum OwnedBlock<S: Storage, L: LocalityProvider, M: BlockMetadata> {
    /// Wraps a [`MutableBlock`].
    Mutable(MutableBlock<S, L, M>),
    /// Wraps an [`ImmutableBlock`].
    Immutable(ImmutableBlock<S, L, M>),
}

/// [`MaybeReturnableBlock`] for [`OwnedBlock`]: every call is forwarded to the block held by
/// the active variant, so the enum behaves exactly like the block it wraps.
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> MaybeReturnableBlock<S, L, M>
    for OwnedBlock<S, L, M>
{
    /// Returns whatever the wrapped block reports from its own `is_returnable`.
    fn is_returnable(&self) -> bool {
        match self {
            Self::Mutable(inner) => inner.is_returnable(),
            Self::Immutable(inner) => inner.is_returnable(),
        }
    }

    /// Consumes the wrapper and forwards `token` to the wrapped block's `try_take_block`,
    /// returning its result unchanged.
    fn try_take_block(self, token: private::PrivateToken) -> Option<Vec<Block<S, L, M>>> {
        match self {
            Self::Mutable(inner) => inner.try_take_block(token),
            Self::Immutable(inner) => inner.try_take_block(token),
        }
    }
}

/// Allows a [`MutableBlock`] to be turned into an [`OwnedBlock`] with `.into()`.
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> From<MutableBlock<S, L, M>>
    for OwnedBlock<S, L, M>
{
    /// Wraps `block` in the `Mutable` variant.
    fn from(block: MutableBlock<S, L, M>) -> Self {
        Self::Mutable(block)
    }
}

/// Allows an [`ImmutableBlock`] to be turned into an [`OwnedBlock`] with `.into()`.
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> From<ImmutableBlock<S, L, M>>
    for OwnedBlock<S, L, M>
{
    /// Wraps `block` in the `Immutable` variant.
    fn from(block: ImmutableBlock<S, L, M>) -> Self {
        Self::Immutable(block)
    }
}

Ryan Olson's avatar
Ryan Olson committed
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#[derive(Debug, thiserror::Error)]
pub enum BlockPoolError {
    #[error("Block is not complete")]
    BlockNotComplete,

    #[error("Not enough blocks available, requested: {0}, available: {1}")]
    NotEnoughBlocksAvailable(usize, usize),

    #[error("Invalid MutableBlock: {0}")]
    InvalidMutableBlock(String),

    #[error("Failed to register block: {0}")]
    FailedToRegisterBlock(String),

    #[error("Progress engine shutdown")]
    ProgressEngineShutdown,

    #[error(transparent)]
    BlockError(#[from] BlockError),

Ryan Olson's avatar
Ryan Olson committed
104
105
    #[error("Reset error: {0}")]
    ResetError(String),
Ryan Olson's avatar
Ryan Olson committed
106

Ryan Olson's avatar
Ryan Olson committed
107
108
    #[error("Block is not returnable")]
    NotReturnable,
109

Ryan Olson's avatar
Ryan Olson committed
110
111
    #[error("Unsupported cache level: {0:?}")]
    UnsupportedCacheLevel(CacheLevel),
112

Ryan Olson's avatar
Ryan Olson committed
113
114
    #[error("No blocks to register")]
    NoBlocksToRegister,
Ryan Olson's avatar
Ryan Olson committed
115
116
}

Ryan Olson's avatar
Ryan Olson committed
117
118
119
120
121
122
/// Policy for registering a block whose hash duplicates an already-registered block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockRegistrationDuplicationSetting {
    /// On registration, if duplication is allowed, blocks with duplicate hashes cannot be registered
    /// directly, but instead can be held live with a strong arc to the primary block. This maintains
    /// the lifetime of the duplicate block.
    Allowed,

    /// On registration, if duplication is disabled, blocks with duplicate hashes will be returned
    /// immediately to the inactive pool and the primary block, the one first registered, will be
    /// returned to the caller, replacing the duplicate block.
    ///
    /// Note: If block duplication is disabled, then the implementation must always respect the fact
    /// that the mutable block that was registered may not be the same block returned by the
    /// registration function, and thus be able to update any references that wish to use the block
    /// after registration.
    Disabled,
}

Ryan Olson's avatar
Ryan Olson committed
134
/// Generic request-response pattern for background task communication
Ryan Olson's avatar
Ryan Olson committed
135
#[derive(Dissolve)]
Ryan Olson's avatar
Ryan Olson committed
136
137
138
pub struct RequestResponse<Req, Resp> {
    pub request: Req,
    pub response_tx: oneshot::Sender<Resp>,
Ryan Olson's avatar
Ryan Olson committed
139
140
}

Ryan Olson's avatar
Ryan Olson committed
141
142
143
impl<Req, Resp> RequestResponse<Req, Resp> {
    /// Create a new request-response pair.
    ///
    /// A fresh oneshot channel is created; its sender is stored in the returned
    /// `RequestResponse` next to `request`.
    ///
    /// # Returns
    ///
    /// A tuple of the `RequestResponse` and the [`oneshot::Receiver`] on which the
    /// response will arrive.
    pub fn new(request: Req) -> (Self, oneshot::Receiver<Resp>) {
        let (response_tx, response_rx) = oneshot::channel();
        (
            Self {
                request,
                response_tx,
            },
            response_rx,
        )
    }
}

Ryan Olson's avatar
Ryan Olson committed
155
156
157
158
159
#[async_trait]
pub trait BlockPool<S: Storage, L: LocalityProvider, M: BlockMetadata>:
    BlockPoolController + AsyncBlockPoolController + Send + Sync
{
    /// Add a vector of [`Block`]s to the pool.
Ryan Olson's avatar
Ryan Olson committed
160
161
162
163
164
165
166
167
    ///
    /// These blocks are typically created from a [`super::block::Blocks`]
    /// and represent the initial set of available cache blocks.
    /// Blocks added this way are initially reset.
    ///
    /// # Arguments
    ///
    /// * `blocks` - A [`Vec<Block<S, M>>`] to add to the inactive pool.
Ryan Olson's avatar
Ryan Olson committed
168
    async fn add_blocks(&self, blocks: Vec<Block<S, L, M>>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
169
170

    /// Blocking version of [`BlockPool::add_blocks`].
Ryan Olson's avatar
Ryan Olson committed
171
    fn add_blocks_blocking(&self, blocks: Vec<Block<S, L, M>>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
172

Ryan Olson's avatar
Ryan Olson committed
173
    /// Allocate a specified number of free blocks from the pool.
Ryan Olson's avatar
Ryan Olson committed
174
175
176
177
178
179
180
181
182
183
    ///
    /// # Arguments
    ///
    /// * `count` - The number of blocks to allocate.
    ///
    /// # Returns
    ///
    /// A [`Result`] containing:
    /// - `Ok(Vec<MutableBlock<S, M>>)`: If successful, a vector of allocated mutable blocks.
    /// - `Err(BlockPoolError)`: If not enough blocks are available in the inactive pool.
Ryan Olson's avatar
Ryan Olson committed
184
    async fn allocate_blocks(&self, count: usize) -> BlockPoolResult<MutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
185
186

    /// Blocking version of [`BlockPool::allocate_blocks`].
Ryan Olson's avatar
Ryan Olson committed
187
    fn allocate_blocks_blocking(&self, count: usize) -> BlockPoolResult<MutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
188

Ryan Olson's avatar
Ryan Olson committed
189
190
    /// Register a vector of [`MutableBlock`]s with the pool.
    async fn register_blocks(
Ryan Olson's avatar
Ryan Olson committed
191
        &self,
Ryan Olson's avatar
Ryan Olson committed
192
193
        blocks: Vec<MutableBlock<S, L, M>>,
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
194
195

    /// Blocking version of [`BlockPool::register_blocks`].
Ryan Olson's avatar
Ryan Olson committed
196
    fn register_blocks_blocking(
Ryan Olson's avatar
Ryan Olson committed
197
        &self,
Ryan Olson's avatar
Ryan Olson committed
198
199
        blocks: Vec<MutableBlock<S, L, M>>,
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
200

Ryan Olson's avatar
Ryan Olson committed
201
    /// Match a set of [`SequenceHash`]s to existing blocks in the pool.
Ryan Olson's avatar
Ryan Olson committed
202
203
204
    ///
    /// # Arguments
    ///
Ryan Olson's avatar
Ryan Olson committed
205
    /// * `sequence_hashes` - A [`Vec<SequenceHash>`] to match.
Ryan Olson's avatar
Ryan Olson committed
206
207
208
209
    ///
    /// # Returns
    ///
    /// An [`Option<ImmutableBlock<S, M>>`] containing the shared block if found, otherwise `None`.
Ryan Olson's avatar
Ryan Olson committed
210
    async fn match_sequence_hashes(
Ryan Olson's avatar
Ryan Olson committed
211
212
        &self,
        sequence_hashes: &[SequenceHash],
Ryan Olson's avatar
Ryan Olson committed
213
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
214
215

    /// Blocking version of [`BlockPool::match_sequence_hashes`].
Ryan Olson's avatar
Ryan Olson committed
216
    fn match_sequence_hashes_blocking(
Ryan Olson's avatar
Ryan Olson committed
217
218
        &self,
        sequence_hashes: &[SequenceHash],
Ryan Olson's avatar
Ryan Olson committed
219
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
220

Ryan Olson's avatar
Ryan Olson committed
221
222
    /// Touch a set of blocks. Equivalent to registering and then immediately dropping.
    async fn touch_blocks(&self, sequence_hashes: &[SequenceHash]) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
223

Ryan Olson's avatar
Ryan Olson committed
224
225
    /// Blocking version of [`BlockPool::touch_blocks`].
    fn touch_blocks_blocking(&self, sequence_hashes: &[SequenceHash]) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
226

Ryan Olson's avatar
Ryan Olson committed
227
228
229
230
    /// Attempt to return a block to the pool. Blocks will naturally be returned to the pool when they are dropped
    /// and their reference count drops to 0; however, for testing and to synchronize the block returning to the
    /// pool, this function can be used.
    async fn try_return_block(&self, block: OwnedBlock<S, L, M>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
231

Ryan Olson's avatar
Ryan Olson committed
232
233
    /// Blocking version of [`BlockPool::try_return_block`].
    fn try_return_block_blocking(&self, block: OwnedBlock<S, L, M>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
234

Ryan Olson's avatar
Ryan Olson committed
235
    fn total_blocks(&self) -> u64;
Ryan Olson's avatar
Ryan Olson committed
236

Ryan Olson's avatar
Ryan Olson committed
237
238
    fn available_blocks(&self) -> u64;
}
239

Ryan Olson's avatar
Ryan Olson committed
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
/// State of the pool when queried.
///
/// Provides a snapshot of the pool's current state as three counts:
/// - Number of active blocks currently in use
/// - Number of inactive blocks
/// - Number of empty blocks
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct BlockPoolStatus {
    /// Number of active blocks currently in use
    pub active_blocks: usize,

    /// Number of inactive blocks.
    ///
    /// NOTE(review): an earlier comment described this as a list ordered by reuse priority,
    /// but the field is only a count; any ordering is not visible through this struct.
    pub inactive_blocks: usize,

    /// Number of empty blocks
    pub empty_blocks: usize,
}
258

Ryan Olson's avatar
Ryan Olson committed
259
260
261
262
/// Outcome of a request to reset specific blocks, with the requested sequence hashes
/// sorted into three groups.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct ResetBlocksResponse {
    /// Sequence hashes of blocks that were reset
    pub reset_blocks: Vec<SequenceHash>,

    /// Sequence hashes of blocks that were not found in the pool
    pub not_found: Vec<SequenceHash>,

    /// Sequence hashes of blocks that were not reset
    // NOTE(review): presumably blocks that were found but could not be reset; confirm
    // against the implementation.
    pub not_reset: Vec<SequenceHash>,
}
270

Ryan Olson's avatar
Ryan Olson committed
271
272
273
pub trait BlockPoolController: Send + Sync {
    /// Returns the [`BlockPoolStatus`] of the pool.
    fn status_blocking(&self) -> Result<BlockPoolStatus, BlockPoolError>;
274

Ryan Olson's avatar
Ryan Olson committed
275
    /// Resets the pool to its initial state.
276
    ///
Ryan Olson's avatar
Ryan Olson committed
277
278
    /// This function will error unless all blocks have returned to the inactive pool.
    fn reset_blocking(&self) -> Result<(), BlockPoolError>;
279

Ryan Olson's avatar
Ryan Olson committed
280
281
282
283
284
285
    /// Attempt to reset a set of blocks.
    fn reset_blocks_blocking(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ResetBlocksResponse, BlockPoolError>;
}
286

Ryan Olson's avatar
Ryan Olson committed
287
288
289
290
#[async_trait::async_trait]
pub trait AsyncBlockPoolController: Send + Sync {
    /// Returns the [`BlockPoolStatus`] of the pool.
    async fn status(&self) -> Result<BlockPoolStatus, BlockPoolError>;
291

Ryan Olson's avatar
Ryan Olson committed
292
293
294
295
    /// Resets the pool to its initial state.
    ///
    /// This function will error unless all blocks have returned to the inactive pool.
    async fn reset(&self) -> Result<(), BlockPoolError>;
296

Ryan Olson's avatar
Ryan Olson committed
297
298
299
300
301
    /// Attempt to reset a set of blocks.
    async fn reset_blocks(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ResetBlocksResponse, BlockPoolError>;
Ryan Olson's avatar
Ryan Olson committed
302
}