pool.rs 10.2 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

Ryan Olson's avatar
Ryan Olson committed
4
5
6
pub mod managed;
pub use managed::ManagedBlockPool;

Ryan Olson's avatar
Ryan Olson committed
7
8
use derive_builder::Builder;
use derive_getters::Dissolve;
Ryan Olson's avatar
Ryan Olson committed
9
use serde::{Deserialize, Serialize};
Ryan Olson's avatar
Ryan Olson committed
10
11
12
13

pub use super::block::{ImmutableBlock, MutableBlock};

use super::block::{
14
15
    Block, BlockError, BlockMetadata, GlobalRegistry, MaybeReturnableBlock, nixl::short_type_name,
    private, registry::BlockRegistry,
Ryan Olson's avatar
Ryan Olson committed
16
17
18
19
};
use super::events::{EventManager, NullEventManager};
use super::storage::Storage;

Ryan Olson's avatar
Ryan Olson committed
20
use crate::block_manager::CacheLevel;
21
use crate::block_manager::block::locality::LocalityProvider;
Ryan Olson's avatar
Ryan Olson committed
22
23
use crate::tokens::{SequenceHash, TokenBlock};

Ryan Olson's avatar
Ryan Olson committed
24
25
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, Ordering};
Ryan Olson's avatar
Ryan Olson committed
26
27
28
29
use std::{
    collections::{BTreeSet, HashMap, VecDeque},
    sync::{Arc, Weak},
};
30
use tokio::runtime::Handle;
Ryan Olson's avatar
Ryan Olson committed
31
use tokio::sync::oneshot;
Ryan Olson's avatar
Ryan Olson committed
32
33
34
35
use tokio_util::sync::CancellationToken;

use dynamo_runtime::Result;

Ryan Olson's avatar
Ryan Olson committed
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Shared type aliases that keep signatures across this module short.

/// Result of a block-pool operation; the error side is always [`BlockPoolError`].
type BlockPoolResult<T> = Result<T, BlockPoolError>;

/// A block-pool result whose success value is a `oneshot::Receiver` for the eventual `T`.
type AsyncResponse<T> = BlockPoolResult<oneshot::Receiver<T>>;

/// An owned batch of [`MutableBlock`]s.
pub type MutableBlocks<S, L, M> = Vec<MutableBlock<S, L, M>>;

/// An owned batch of [`ImmutableBlock`]s.
pub type ImmutableBlocks<S, L, M> = Vec<ImmutableBlock<S, L, M>>;

/// A block handle of either kind, mutable or immutable, that can be handed back to the pool.
///
/// Wrapping both handle kinds in one type lets [`BlockPool::try_return_block`] accept
/// whichever handle the caller holds.
#[derive(Debug)]
pub enum OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// A [`MutableBlock`] handle.
    Mutable(MutableBlock<S, L, M>),
    /// An [`ImmutableBlock`] handle.
    Immutable(ImmutableBlock<S, L, M>),
}

impl<S, L, M> MaybeReturnableBlock<S, L, M> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Delegates to the wrapped handle's own returnability check.
    fn is_returnable(&self) -> bool {
        match self {
            Self::Mutable(inner) => inner.is_returnable(),
            Self::Immutable(inner) => inner.is_returnable(),
        }
    }

    /// Delegates to the wrapped handle, forwarding the private token unchanged.
    fn try_take_block(self, token: private::PrivateToken) -> Option<Vec<Block<S, L, M>>> {
        match self {
            Self::Mutable(inner) => inner.try_take_block(token),
            Self::Immutable(inner) => inner.try_take_block(token),
        }
    }
}

impl<S, L, M> From<MutableBlock<S, L, M>> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Wraps a mutable handle as [`OwnedBlock::Mutable`].
    fn from(mutable: MutableBlock<S, L, M>) -> Self {
        Self::Mutable(mutable)
    }
}

impl<S, L, M> From<ImmutableBlock<S, L, M>> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Wraps an immutable handle as [`OwnedBlock::Immutable`].
    fn from(immutable: ImmutableBlock<S, L, M>) -> Self {
        Self::Immutable(immutable)
    }
}

Ryan Olson's avatar
Ryan Olson committed
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#[derive(Debug, thiserror::Error)]
pub enum BlockPoolError {
    #[error("Block is not complete")]
    BlockNotComplete,

    #[error("Not enough blocks available, requested: {0}, available: {1}")]
    NotEnoughBlocksAvailable(usize, usize),

    #[error("Invalid MutableBlock: {0}")]
    InvalidMutableBlock(String),

    #[error("Failed to register block: {0}")]
    FailedToRegisterBlock(String),

    #[error("Progress engine shutdown")]
    ProgressEngineShutdown,

    #[error(transparent)]
    BlockError(#[from] BlockError),

Ryan Olson's avatar
Ryan Olson committed
105
106
    #[error("Reset error: {0}")]
    ResetError(String),
Ryan Olson's avatar
Ryan Olson committed
107

Ryan Olson's avatar
Ryan Olson committed
108
109
    #[error("Block is not returnable")]
    NotReturnable,
110

Ryan Olson's avatar
Ryan Olson committed
111
112
    #[error("Unsupported cache level: {0:?}")]
    UnsupportedCacheLevel(CacheLevel),
113

Ryan Olson's avatar
Ryan Olson committed
114
115
    #[error("No blocks to register")]
    NoBlocksToRegister,
Ryan Olson's avatar
Ryan Olson committed
116
117
}

Ryan Olson's avatar
Ryan Olson committed
118
119
120
121
122
123
/// Policy for registering a block whose sequence hash duplicates an already-registered block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockRegistrationDuplicationSetting {
    /// Duplication is allowed. On registration, a block with a duplicate hash cannot be
    /// registered directly, but it can instead be held live with a strong `Arc` to the
    /// primary block. This maintains the lifetime of the duplicate block.
    Allowed,

    /// Duplication is disabled. On registration, a block with a duplicate hash is returned
    /// immediately to the inactive pool. The primary block (the one registered first) is
    /// returned to the caller in place of the duplicate.
    ///
    /// Note: when duplication is disabled, implementations must not assume that the block
    /// returned by the registration function is the same mutable block that was registered.
    /// They must be able to update any references that intend to use the block after
    /// registration.
    Disabled,
}

Ryan Olson's avatar
Ryan Olson committed
135
/// Generic request-response pattern for background task communication
Ryan Olson's avatar
Ryan Olson committed
136
#[derive(Dissolve)]
Ryan Olson's avatar
Ryan Olson committed
137
138
139
pub struct RequestResponse<Req, Resp> {
    pub request: Req,
    pub response_tx: oneshot::Sender<Resp>,
Ryan Olson's avatar
Ryan Olson committed
140
141
}

Ryan Olson's avatar
Ryan Olson committed
142
143
144
impl<Req, Resp> RequestResponse<Req, Resp> {
    /// Create a new request-response pair.
    ///
    /// Opens a fresh [`oneshot`] channel and stores its sender alongside `request`. Returns
    /// the receiver so the caller can await the response.
    pub fn new(request: Req) -> (Self, oneshot::Receiver<Resp>) {
        let (response_tx, response_rx) = oneshot::channel();
        (
            Self {
                request,
                response_tx,
            },
            response_rx,
        )
    }
}

Ryan Olson's avatar
Ryan Olson committed
156
157
158
159
160
#[async_trait]
pub trait BlockPool<S: Storage, L: LocalityProvider, M: BlockMetadata>:
    BlockPoolController + AsyncBlockPoolController + Send + Sync
{
    /// Add a vector of [`Block`]s to the pool.
Ryan Olson's avatar
Ryan Olson committed
161
162
163
164
165
166
167
168
    ///
    /// These blocks are typically created from a [`super::block::Blocks`]
    /// and represent the initial set of available cache blocks.
    /// Blocks added this way are initially reset.
    ///
    /// # Arguments
    ///
    /// * `blocks` - A [`Vec<Block<S, M>>`] to add to the inactive pool.
Ryan Olson's avatar
Ryan Olson committed
169
    async fn add_blocks(&self, blocks: Vec<Block<S, L, M>>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
170
171

    /// Blocking version of [`BlockPool::add_blocks`].
Ryan Olson's avatar
Ryan Olson committed
172
    fn add_blocks_blocking(&self, blocks: Vec<Block<S, L, M>>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
173

Ryan Olson's avatar
Ryan Olson committed
174
    /// Allocate a specified number of free blocks from the pool.
Ryan Olson's avatar
Ryan Olson committed
175
176
177
178
179
180
181
182
183
184
    ///
    /// # Arguments
    ///
    /// * `count` - The number of blocks to allocate.
    ///
    /// # Returns
    ///
    /// A [`Result`] containing:
    /// - `Ok(Vec<MutableBlock<S, M>>)`: If successful, a vector of allocated mutable blocks.
    /// - `Err(BlockPoolError)`: If not enough blocks are available in the inactive pool.
Ryan Olson's avatar
Ryan Olson committed
185
    async fn allocate_blocks(&self, count: usize) -> BlockPoolResult<MutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
186
187

    /// Blocking version of [`BlockPool::allocate_blocks`].
Ryan Olson's avatar
Ryan Olson committed
188
    fn allocate_blocks_blocking(&self, count: usize) -> BlockPoolResult<MutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
189

Ryan Olson's avatar
Ryan Olson committed
190
191
    /// Register a vector of [`MutableBlock`]s with the pool.
    async fn register_blocks(
Ryan Olson's avatar
Ryan Olson committed
192
        &self,
Ryan Olson's avatar
Ryan Olson committed
193
194
        blocks: Vec<MutableBlock<S, L, M>>,
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
195
196

    /// Blocking version of [`BlockPool::register_blocks`].
Ryan Olson's avatar
Ryan Olson committed
197
    fn register_blocks_blocking(
Ryan Olson's avatar
Ryan Olson committed
198
        &self,
Ryan Olson's avatar
Ryan Olson committed
199
200
        blocks: Vec<MutableBlock<S, L, M>>,
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
201

Ryan Olson's avatar
Ryan Olson committed
202
    /// Match a set of [`SequenceHash`]s to existing blocks in the pool.
Ryan Olson's avatar
Ryan Olson committed
203
204
205
    ///
    /// # Arguments
    ///
Ryan Olson's avatar
Ryan Olson committed
206
    /// * `sequence_hashes` - A [`Vec<SequenceHash>`] to match.
Ryan Olson's avatar
Ryan Olson committed
207
208
209
210
    ///
    /// # Returns
    ///
    /// An [`Option<ImmutableBlock<S, M>>`] containing the shared block if found, otherwise `None`.
Ryan Olson's avatar
Ryan Olson committed
211
    async fn match_sequence_hashes(
Ryan Olson's avatar
Ryan Olson committed
212
213
        &self,
        sequence_hashes: &[SequenceHash],
Ryan Olson's avatar
Ryan Olson committed
214
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
215
216

    /// Blocking version of [`BlockPool::match_sequence_hashes`].
Ryan Olson's avatar
Ryan Olson committed
217
    fn match_sequence_hashes_blocking(
Ryan Olson's avatar
Ryan Olson committed
218
219
        &self,
        sequence_hashes: &[SequenceHash],
Ryan Olson's avatar
Ryan Olson committed
220
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
221

Ryan Olson's avatar
Ryan Olson committed
222
223
    /// Touch a set of blocks. Equivalent to registering and then immediately dropping.
    async fn touch_blocks(&self, sequence_hashes: &[SequenceHash]) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
224

Ryan Olson's avatar
Ryan Olson committed
225
226
    /// Blocking version of [`BlockPool::touch_blocks`].
    fn touch_blocks_blocking(&self, sequence_hashes: &[SequenceHash]) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
227

Ryan Olson's avatar
Ryan Olson committed
228
229
230
231
    /// Attempt to return a block to the pool. Blocks will naturally be returned to the pool when they are dropped
    /// and their reference count drops to 0; however, for testing and to synchronize the block returning to the
    /// pool, this function can be used.
    async fn try_return_block(&self, block: OwnedBlock<S, L, M>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
232

Ryan Olson's avatar
Ryan Olson committed
233
234
    /// Blocking version of [`BlockPool::try_return_block`].
    fn try_return_block_blocking(&self, block: OwnedBlock<S, L, M>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
235

Ryan Olson's avatar
Ryan Olson committed
236
    fn total_blocks(&self) -> u64;
Ryan Olson's avatar
Ryan Olson committed
237

Ryan Olson's avatar
Ryan Olson committed
238
239
    fn available_blocks(&self) -> u64;
}
240

Ryan Olson's avatar
Ryan Olson committed
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
/// State of the pool when queried.
///
/// A snapshot of the pool's current state, expressed as block counts:
/// - the number of active blocks currently in use
/// - the number of blocks in the inactive pool
/// - the number of empty blocks
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct BlockPoolStatus {
    /// Number of active blocks currently in use.
    pub active_blocks: usize,

    /// Number of blocks in the inactive pool.
    ///
    /// This is only a count; it carries no information about reuse order.
    pub inactive_blocks: usize,

    /// Number of empty blocks.
    pub empty_blocks: usize,
}
259

Ryan Olson's avatar
Ryan Olson committed
260
261
262
263
/// Result of an attempt to reset a set of blocks, grouped by sequence hash.
///
/// Returned by [`BlockPoolController::reset_blocks_blocking`] and
/// [`AsyncBlockPoolController::reset_blocks`].
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct ResetBlocksResponse {
    /// Blocks that were reset.
    pub reset_blocks: Vec<SequenceHash>,

    /// Blocks that were not found in the pool.
    pub not_found: Vec<SequenceHash>,

    /// Blocks that were not reset.
    pub not_reset: Vec<SequenceHash>,
}
271

Ryan Olson's avatar
Ryan Olson committed
272
273
274
pub trait BlockPoolController: Send + Sync {
    /// Returns the [`BlockPoolStatus`] of the pool.
    fn status_blocking(&self) -> Result<BlockPoolStatus, BlockPoolError>;
275

Ryan Olson's avatar
Ryan Olson committed
276
    /// Resets the pool to its initial state.
277
    ///
Ryan Olson's avatar
Ryan Olson committed
278
279
    /// This function will error unless all blocks have returned to the inactive pool.
    fn reset_blocking(&self) -> Result<(), BlockPoolError>;
280

Ryan Olson's avatar
Ryan Olson committed
281
282
283
284
285
286
    /// Attempt to reset a set of blocks.
    fn reset_blocks_blocking(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ResetBlocksResponse, BlockPoolError>;
}
287

Ryan Olson's avatar
Ryan Olson committed
288
289
290
291
#[async_trait::async_trait]
pub trait AsyncBlockPoolController: Send + Sync {
    /// Returns the [`BlockPoolStatus`] of the pool.
    async fn status(&self) -> Result<BlockPoolStatus, BlockPoolError>;
292

Ryan Olson's avatar
Ryan Olson committed
293
294
295
296
    /// Resets the pool to its initial state.
    ///
    /// This function will error unless all blocks have returned to the inactive pool.
    async fn reset(&self) -> Result<(), BlockPoolError>;
297

Ryan Olson's avatar
Ryan Olson committed
298
299
300
301
302
    /// Attempt to reset a set of blocks.
    async fn reset_blocks(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ResetBlocksResponse, BlockPoolError>;
Ryan Olson's avatar
Ryan Olson committed
303
}