"examples/backends/mocker/vscode:/vscode.git/clone" did not exist on "ab5a31b5a4fe7074585e3359309e1a5b1b81186c"
pool.rs 10.3 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

Ryan Olson's avatar
Ryan Olson committed
4
5
6
pub mod managed;
pub use managed::ManagedBlockPool;

Ryan Olson's avatar
Ryan Olson committed
7
8
use derive_builder::Builder;
use derive_getters::Dissolve;
Ryan Olson's avatar
Ryan Olson committed
9
use serde::{Deserialize, Serialize};
Ryan Olson's avatar
Ryan Olson committed
10
11
12
13

pub use super::block::{ImmutableBlock, MutableBlock};

use super::block::{
14
15
    Block, BlockError, BlockMetadata, GlobalRegistry, MaybeReturnableBlock, nixl::short_type_name,
    private, registry::BlockRegistry,
Ryan Olson's avatar
Ryan Olson committed
16
17
};
use super::events::{EventManager, NullEventManager};
18
use super::metrics::{BlockManagerMetrics, PoolMetrics};
Ryan Olson's avatar
Ryan Olson committed
19
20
use super::storage::Storage;

Ryan Olson's avatar
Ryan Olson committed
21
use crate::block_manager::CacheLevel;
22
use crate::block_manager::block::locality::LocalityProvider;
Ryan Olson's avatar
Ryan Olson committed
23
24
use crate::tokens::{SequenceHash, TokenBlock};

Ryan Olson's avatar
Ryan Olson committed
25
use async_trait::async_trait;
26
use prometheus::Registry;
Ryan Olson's avatar
Ryan Olson committed
27
use std::sync::atomic::{AtomicU64, Ordering};
Ryan Olson's avatar
Ryan Olson committed
28
29
30
31
use std::{
    collections::{BTreeSet, HashMap, VecDeque},
    sync::{Arc, Weak},
};
32
use tokio::runtime::Handle;
Ryan Olson's avatar
Ryan Olson committed
33
use tokio::sync::oneshot;
Ryan Olson's avatar
Ryan Olson committed
34
35
36
37
use tokio_util::sync::CancellationToken;

use dynamo_runtime::Result;

Ryan Olson's avatar
Ryan Olson committed
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/// Shorthand for `Result<T, BlockPoolError>`; used throughout the [`BlockPool`] trait.
type BlockPoolResult<T> = Result<T, BlockPoolError>;

/// Either a receiver on which a value of type `T` will later arrive, or an
/// immediate [`BlockPoolError`].
type AsyncResponse<T> = BlockPoolResult<oneshot::Receiver<T>>;

/// Owned collection of [`MutableBlock`]s (e.g. the result of [`BlockPool::allocate_blocks`]).
pub type MutableBlocks<S, L, M> = Vec<MutableBlock<S, L, M>>;

/// Owned collection of [`ImmutableBlock`]s (e.g. the result of
/// [`BlockPool::register_blocks`] or [`BlockPool::match_sequence_hashes`]).
pub type ImmutableBlocks<S, L, M> = Vec<ImmutableBlock<S, L, M>>;

/// Either a [`MutableBlock`] or an [`ImmutableBlock`], wrapped so that both kinds can be
/// handed back to the pool through a single entry point such as
/// [`BlockPool::try_return_block`].
#[derive(Debug)]
pub enum OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Wraps a [`MutableBlock`].
    Mutable(MutableBlock<S, L, M>),
    /// Wraps an [`ImmutableBlock`].
    Immutable(ImmutableBlock<S, L, M>),
}

impl<S, L, M> MaybeReturnableBlock<S, L, M> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Forwards to the `is_returnable` check of whichever block is wrapped.
    fn is_returnable(&self) -> bool {
        match self {
            Self::Mutable(inner) => inner.is_returnable(),
            Self::Immutable(inner) => inner.is_returnable(),
        }
    }

    /// Consumes the wrapper and forwards to the wrapped block's `try_take_block`,
    /// passing the private token through unchanged.
    fn try_take_block(self, token: private::PrivateToken) -> Option<Vec<Block<S, L, M>>> {
        match self {
            Self::Mutable(inner) => inner.try_take_block(token),
            Self::Immutable(inner) => inner.try_take_block(token),
        }
    }
}

impl<S, L, M> From<MutableBlock<S, L, M>> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Wraps the block as [`OwnedBlock::Mutable`].
    fn from(inner: MutableBlock<S, L, M>) -> Self {
        Self::Mutable(inner)
    }
}

impl<S, L, M> From<ImmutableBlock<S, L, M>> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Wraps the block as [`OwnedBlock::Immutable`].
    fn from(inner: ImmutableBlock<S, L, M>) -> Self {
        Self::Immutable(inner)
    }
}

Ryan Olson's avatar
Ryan Olson committed
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#[derive(Debug, thiserror::Error)]
pub enum BlockPoolError {
    #[error("Block is not complete")]
    BlockNotComplete,

    #[error("Not enough blocks available, requested: {0}, available: {1}")]
    NotEnoughBlocksAvailable(usize, usize),

    #[error("Invalid MutableBlock: {0}")]
    InvalidMutableBlock(String),

    #[error("Failed to register block: {0}")]
    FailedToRegisterBlock(String),

    #[error("Progress engine shutdown")]
    ProgressEngineShutdown,

    #[error(transparent)]
    BlockError(#[from] BlockError),

Ryan Olson's avatar
Ryan Olson committed
107
108
    #[error("Reset error: {0}")]
    ResetError(String),
Ryan Olson's avatar
Ryan Olson committed
109

Ryan Olson's avatar
Ryan Olson committed
110
111
    #[error("Block is not returnable")]
    NotReturnable,
112

Ryan Olson's avatar
Ryan Olson committed
113
114
    #[error("Unsupported cache level: {0:?}")]
    UnsupportedCacheLevel(CacheLevel),
115

Ryan Olson's avatar
Ryan Olson committed
116
117
    #[error("No blocks to register")]
    NoBlocksToRegister,
Ryan Olson's avatar
Ryan Olson committed
118
119
}

Ryan Olson's avatar
Ryan Olson committed
120
121
122
123
124
125
/// Policy for how block registration treats a block whose hash duplicates that of an
/// already-registered (primary) block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockRegistrationDuplicationSetting {
    /// On registration, if duplication is allowed, blocks with duplicate hashes cannot be
    /// registered directly, but instead can be held live with a strong arc to the primary
    /// block. This maintains the lifetime of the duplicate block.
    Allowed,

    /// On registration, if duplication is disabled, blocks with duplicate hashes will be
    /// returned immediately to the inactive pool, and the primary block (the one first
    /// registered) will be returned to the caller in place of the duplicate block.
    ///
    /// Note: when block duplication is disabled, the implementation must account for the
    /// fact that the block returned by the registration function may not be the same
    /// mutable block that was submitted, and must therefore be able to update any
    /// references that wish to use the block after registration.
    Disabled,
}

Ryan Olson's avatar
Ryan Olson committed
137
/// Generic request-response pattern for background task communication
Ryan Olson's avatar
Ryan Olson committed
138
#[derive(Dissolve)]
Ryan Olson's avatar
Ryan Olson committed
139
140
141
pub struct RequestResponse<Req, Resp> {
    pub request: Req,
    pub response_tx: oneshot::Sender<Resp>,
Ryan Olson's avatar
Ryan Olson committed
142
143
}

Ryan Olson's avatar
Ryan Olson committed
144
145
146
impl<Req, Resp> RequestResponse<Req, Resp> {
    /// Create a new request-response pair
    pub fn new(request: Req) -> (Self, oneshot::Receiver<Resp>) {
Ryan Olson's avatar
Ryan Olson committed
147
148
149
150
151
152
153
154
155
156
157
        let (response_tx, response_rx) = oneshot::channel();
        (
            Self {
                request,
                response_tx,
            },
            response_rx,
        )
    }
}

Ryan Olson's avatar
Ryan Olson committed
158
159
160
161
162
#[async_trait]
pub trait BlockPool<S: Storage, L: LocalityProvider, M: BlockMetadata>:
    BlockPoolController + AsyncBlockPoolController + Send + Sync
{
    /// Add a vector of [`Block`]s to the pool.
Ryan Olson's avatar
Ryan Olson committed
163
164
165
166
167
168
169
170
    ///
    /// These blocks are typically created from a [`super::block::Blocks`]
    /// and represent the initial set of available cache blocks.
    /// Blocks added this way are initially reset.
    ///
    /// # Arguments
    ///
    /// * `blocks` - A [`Vec<Block<S, M>>`] to add to the inactive pool.
Ryan Olson's avatar
Ryan Olson committed
171
    async fn add_blocks(&self, blocks: Vec<Block<S, L, M>>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
172
173

    /// Blocking version of [`BlockPool::add_blocks`].
Ryan Olson's avatar
Ryan Olson committed
174
    fn add_blocks_blocking(&self, blocks: Vec<Block<S, L, M>>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
175

Ryan Olson's avatar
Ryan Olson committed
176
    /// Allocate a specified number of free blocks from the pool.
Ryan Olson's avatar
Ryan Olson committed
177
178
179
180
181
182
183
184
185
186
    ///
    /// # Arguments
    ///
    /// * `count` - The number of blocks to allocate.
    ///
    /// # Returns
    ///
    /// A [`Result`] containing:
    /// - `Ok(Vec<MutableBlock<S, M>>)`: If successful, a vector of allocated mutable blocks.
    /// - `Err(BlockPoolError)`: If not enough blocks are available in the inactive pool.
Ryan Olson's avatar
Ryan Olson committed
187
    async fn allocate_blocks(&self, count: usize) -> BlockPoolResult<MutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
188
189

    /// Blocking version of [`BlockPool::allocate_blocks`].
Ryan Olson's avatar
Ryan Olson committed
190
    fn allocate_blocks_blocking(&self, count: usize) -> BlockPoolResult<MutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
191

Ryan Olson's avatar
Ryan Olson committed
192
193
    /// Register a vector of [`MutableBlock`]s with the pool.
    async fn register_blocks(
Ryan Olson's avatar
Ryan Olson committed
194
        &self,
Ryan Olson's avatar
Ryan Olson committed
195
196
        blocks: Vec<MutableBlock<S, L, M>>,
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
197
198

    /// Blocking version of [`BlockPool::register_blocks`].
Ryan Olson's avatar
Ryan Olson committed
199
    fn register_blocks_blocking(
Ryan Olson's avatar
Ryan Olson committed
200
        &self,
Ryan Olson's avatar
Ryan Olson committed
201
202
        blocks: Vec<MutableBlock<S, L, M>>,
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
203

Ryan Olson's avatar
Ryan Olson committed
204
    /// Match a set of [`SequenceHash`]s to existing blocks in the pool.
Ryan Olson's avatar
Ryan Olson committed
205
206
207
    ///
    /// # Arguments
    ///
Ryan Olson's avatar
Ryan Olson committed
208
    /// * `sequence_hashes` - A [`Vec<SequenceHash>`] to match.
Ryan Olson's avatar
Ryan Olson committed
209
210
211
212
    ///
    /// # Returns
    ///
    /// An [`Option<ImmutableBlock<S, M>>`] containing the shared block if found, otherwise `None`.
Ryan Olson's avatar
Ryan Olson committed
213
    async fn match_sequence_hashes(
Ryan Olson's avatar
Ryan Olson committed
214
215
        &self,
        sequence_hashes: &[SequenceHash],
Ryan Olson's avatar
Ryan Olson committed
216
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
217
218

    /// Blocking version of [`BlockPool::match_sequence_hashes`].
Ryan Olson's avatar
Ryan Olson committed
219
    fn match_sequence_hashes_blocking(
Ryan Olson's avatar
Ryan Olson committed
220
221
        &self,
        sequence_hashes: &[SequenceHash],
Ryan Olson's avatar
Ryan Olson committed
222
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
223

Ryan Olson's avatar
Ryan Olson committed
224
225
    /// Touch a set of blocks. Equivalent to registering and then immediately dropping.
    async fn touch_blocks(&self, sequence_hashes: &[SequenceHash]) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
226

Ryan Olson's avatar
Ryan Olson committed
227
228
    /// Blocking version of [`BlockPool::touch_blocks`].
    fn touch_blocks_blocking(&self, sequence_hashes: &[SequenceHash]) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
229

Ryan Olson's avatar
Ryan Olson committed
230
231
232
233
    /// Attempt to return a block to the pool. Blocks will naturally be returned to the pool when they are dropped
    /// and their reference count drops to 0; however, for testing and to synchronize the block returning to the
    /// pool, this function can be used.
    async fn try_return_block(&self, block: OwnedBlock<S, L, M>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
234

Ryan Olson's avatar
Ryan Olson committed
235
236
    /// Blocking version of [`BlockPool::try_return_block`].
    fn try_return_block_blocking(&self, block: OwnedBlock<S, L, M>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
237

Ryan Olson's avatar
Ryan Olson committed
238
    fn total_blocks(&self) -> u64;
Ryan Olson's avatar
Ryan Olson committed
239

Ryan Olson's avatar
Ryan Olson committed
240
241
    fn available_blocks(&self) -> u64;
}
242

Ryan Olson's avatar
Ryan Olson committed
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
/// State of the pool when queried.
///
/// Provides a snapshot of the pool's current block counts:
/// - Active blocks currently in use
/// - Inactive blocks
/// - Empty blocks
///
/// Only counts are reported; no per-block detail or ordering is included.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct BlockPoolStatus {
    /// Number of active blocks currently in use.
    pub active_blocks: usize,

    /// Number of inactive blocks.
    ///
    /// The pool orders inactive blocks by reuse priority internally, but this field is a
    /// plain count and does not expose that ordering.
    pub inactive_blocks: usize,

    /// Number of empty blocks.
    pub empty_blocks: usize,
}
261

Ryan Olson's avatar
Ryan Olson committed
262
263
264
265
/// Outcome of a [`BlockPoolController::reset_blocks_blocking`] or
/// [`AsyncBlockPoolController::reset_blocks`] call, listing the requested sequence hashes
/// under the outcome that applied to them.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct ResetBlocksResponse {
    /// Blocks that were reset.
    pub reset_blocks: Vec<SequenceHash>,

    /// Blocks that were not found in the pool.
    pub not_found: Vec<SequenceHash>,

    /// Blocks that were not reset.
    ///
    /// NOTE(review): presumably blocks that were found but could not be reset (e.g. still
    /// in use) — confirm against implementations.
    pub not_reset: Vec<SequenceHash>,
}
273

Ryan Olson's avatar
Ryan Olson committed
274
275
276
pub trait BlockPoolController: Send + Sync {
    /// Returns the [`BlockPoolStatus`] of the pool.
    fn status_blocking(&self) -> Result<BlockPoolStatus, BlockPoolError>;
277

Ryan Olson's avatar
Ryan Olson committed
278
    /// Resets the pool to its initial state.
279
    ///
Ryan Olson's avatar
Ryan Olson committed
280
281
    /// This function will error unless all blocks have returned to the inactive pool.
    fn reset_blocking(&self) -> Result<(), BlockPoolError>;
282

Ryan Olson's avatar
Ryan Olson committed
283
284
285
286
287
288
    /// Attempt to reset a set of blocks.
    fn reset_blocks_blocking(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ResetBlocksResponse, BlockPoolError>;
}
289

Ryan Olson's avatar
Ryan Olson committed
290
291
292
293
#[async_trait::async_trait]
pub trait AsyncBlockPoolController: Send + Sync {
    /// Returns the [`BlockPoolStatus`] of the pool.
    async fn status(&self) -> Result<BlockPoolStatus, BlockPoolError>;
294

Ryan Olson's avatar
Ryan Olson committed
295
296
297
298
    /// Resets the pool to its initial state.
    ///
    /// This function will error unless all blocks have returned to the inactive pool.
    async fn reset(&self) -> Result<(), BlockPoolError>;
299

Ryan Olson's avatar
Ryan Olson committed
300
301
302
303
304
    /// Attempt to reset a set of blocks.
    async fn reset_blocks(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ResetBlocksResponse, BlockPoolError>;
Ryan Olson's avatar
Ryan Olson committed
305
}