pool.rs 10.8 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Ryan Olson's avatar
Ryan Olson committed
16
17
18
pub mod managed;
pub use managed::ManagedBlockPool;

Ryan Olson's avatar
Ryan Olson committed
19
20
use derive_builder::Builder;
use derive_getters::Dissolve;
Ryan Olson's avatar
Ryan Olson committed
21
use serde::{Deserialize, Serialize};
Ryan Olson's avatar
Ryan Olson committed
22
23
24
25

pub use super::block::{ImmutableBlock, MutableBlock};

use super::block::{
Ryan Olson's avatar
Ryan Olson committed
26
27
    nixl::short_type_name, private, registry::BlockRegistry, Block, BlockError, BlockMetadata,
    GlobalRegistry, MaybeReturnableBlock,
Ryan Olson's avatar
Ryan Olson committed
28
29
};
use super::events::{EventManager, NullEventManager};
30
use super::metrics::{BlockManagerMetrics, PoolMetrics};
Ryan Olson's avatar
Ryan Olson committed
31
32
use super::storage::Storage;

Ryan Olson's avatar
Ryan Olson committed
33
34
use crate::block_manager::block::locality::LocalityProvider;
use crate::block_manager::CacheLevel;
Ryan Olson's avatar
Ryan Olson committed
35
36
use crate::tokens::{SequenceHash, TokenBlock};

Ryan Olson's avatar
Ryan Olson committed
37
use async_trait::async_trait;
38
use prometheus::Registry;
Ryan Olson's avatar
Ryan Olson committed
39
use std::sync::atomic::{AtomicU64, Ordering};
Ryan Olson's avatar
Ryan Olson committed
40
41
42
43
use std::{
    collections::{BTreeSet, HashMap, VecDeque},
    sync::{Arc, Weak},
};
44
use tokio::runtime::Handle;
Ryan Olson's avatar
Ryan Olson committed
45
use tokio::sync::oneshot;
Ryan Olson's avatar
Ryan Olson committed
46
47
48
49
use tokio_util::sync::CancellationToken;

use dynamo_runtime::Result;

Ryan Olson's avatar
Ryan Olson committed
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Type aliases to reduce complexity across the module
/// Result whose error type is fixed to [`BlockPoolError`].
type BlockPoolResult<T> = Result<T, BlockPoolError>;
/// Result that, on success, carries a [`oneshot::Receiver`] yielding a `T`;
/// on failure, a [`BlockPoolError`].
type AsyncResponse<T> = Result<oneshot::Receiver<T>, BlockPoolError>;

// Collection type aliases
/// A vector of [`MutableBlock`]s.
pub type MutableBlocks<S, L, M> = Vec<MutableBlock<S, L, M>>;
/// A vector of [`ImmutableBlock`]s.
pub type ImmutableBlocks<S, L, M> = Vec<ImmutableBlock<S, L, M>>;

/// A block handle that holds either a [`MutableBlock`] or an [`ImmutableBlock`],
/// so that both kinds can be handed back to the pool through a single type.
#[derive(Debug)]
pub enum OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Holds a [`MutableBlock`].
    Mutable(MutableBlock<S, L, M>),
    /// Holds an [`ImmutableBlock`].
    Immutable(ImmutableBlock<S, L, M>),
}

impl<S, L, M> MaybeReturnableBlock<S, L, M> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Reports whether the wrapped block is returnable, by delegating to
    /// whichever variant is held.
    fn is_returnable(&self) -> bool {
        match self {
            Self::Mutable(inner) => inner.is_returnable(),
            Self::Immutable(inner) => inner.is_returnable(),
        }
    }

    /// Consumes the wrapper and forwards `token` to the wrapped block's own
    /// `try_take_block`, returning whatever that call yields.
    fn try_take_block(self, token: private::PrivateToken) -> Option<Vec<Block<S, L, M>>> {
        match self {
            Self::Mutable(inner) => inner.try_take_block(token),
            Self::Immutable(inner) => inner.try_take_block(token),
        }
    }
}

impl<S, L, M> From<MutableBlock<S, L, M>> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Wraps a [`MutableBlock`] in the [`OwnedBlock::Mutable`] variant.
    fn from(block: MutableBlock<S, L, M>) -> Self {
        Self::Mutable(block)
    }
}

impl<S, L, M> From<ImmutableBlock<S, L, M>> for OwnedBlock<S, L, M>
where
    S: Storage,
    L: LocalityProvider,
    M: BlockMetadata,
{
    /// Wraps an [`ImmutableBlock`] in the [`OwnedBlock::Immutable`] variant.
    fn from(block: ImmutableBlock<S, L, M>) -> Self {
        Self::Immutable(block)
    }
}

Ryan Olson's avatar
Ryan Olson committed
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#[derive(Debug, thiserror::Error)]
pub enum BlockPoolError {
    #[error("Block is not complete")]
    BlockNotComplete,

    #[error("Not enough blocks available, requested: {0}, available: {1}")]
    NotEnoughBlocksAvailable(usize, usize),

    #[error("Invalid MutableBlock: {0}")]
    InvalidMutableBlock(String),

    #[error("Failed to register block: {0}")]
    FailedToRegisterBlock(String),

    #[error("Progress engine shutdown")]
    ProgressEngineShutdown,

    #[error(transparent)]
    BlockError(#[from] BlockError),

Ryan Olson's avatar
Ryan Olson committed
119
120
    #[error("Reset error: {0}")]
    ResetError(String),
Ryan Olson's avatar
Ryan Olson committed
121

Ryan Olson's avatar
Ryan Olson committed
122
123
    #[error("Block is not returnable")]
    NotReturnable,
124

Ryan Olson's avatar
Ryan Olson committed
125
126
    #[error("Unsupported cache level: {0:?}")]
    UnsupportedCacheLevel(CacheLevel),
127

Ryan Olson's avatar
Ryan Olson committed
128
129
    #[error("No blocks to register")]
    NoBlocksToRegister,
Ryan Olson's avatar
Ryan Olson committed
130
131
}

Ryan Olson's avatar
Ryan Olson committed
132
133
134
135
136
137
/// Selects how block registration treats a block whose hash duplicates one
/// that is already registered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockRegistrationDuplicationSetting {
    /// On registration, if duplication is allowed, blocks with duplicate hashes cannot be registered directly,
    /// but instead can be held live with a strong arc to the primary block. This maintains the lifetime of
    /// the duplicate block.
    Allowed,

    /// On registration, if duplication is disabled, blocks with duplicate hashes will be returned immediately
    /// to the inactive pool and the primary block, the one first registered, will be returned to the caller,
    /// replacing the duplicate block.
    ///
    /// Note: If block duplication is disabled, then the implementation must always respect the fact that the
    /// mutable block that was registered may not be the same block returned by the registration function, and
    /// thus be able to update any references that wish to use the block after registration.
    Disabled,
}

Ryan Olson's avatar
Ryan Olson committed
149
/// Generic request-response pattern for background task communication
Ryan Olson's avatar
Ryan Olson committed
150
#[derive(Dissolve)]
Ryan Olson's avatar
Ryan Olson committed
151
152
153
pub struct RequestResponse<Req, Resp> {
    pub request: Req,
    pub response_tx: oneshot::Sender<Resp>,
Ryan Olson's avatar
Ryan Olson committed
154
155
}

Ryan Olson's avatar
Ryan Olson committed
156
157
158
impl<Req, Resp> RequestResponse<Req, Resp> {
    /// Create a new request-response pair.
    ///
    /// A fresh one-shot channel is created; its sender is stored in the
    /// returned [`RequestResponse`] next to `request`, and its receiver is
    /// returned as the second tuple element so the caller can obtain the
    /// response.
    pub fn new(request: Req) -> (Self, oneshot::Receiver<Resp>) {
        let (response_tx, response_rx) = oneshot::channel();
        (
            Self {
                request,
                response_tx,
            },
            response_rx,
        )
    }
}

Ryan Olson's avatar
Ryan Olson committed
170
171
172
173
174
#[async_trait]
pub trait BlockPool<S: Storage, L: LocalityProvider, M: BlockMetadata>:
    BlockPoolController + AsyncBlockPoolController + Send + Sync
{
    /// Add a vector of [`Block`]s to the pool.
Ryan Olson's avatar
Ryan Olson committed
175
176
177
178
179
180
181
182
    ///
    /// These blocks are typically created from a [`super::block::Blocks`]
    /// and represent the initial set of available cache blocks.
    /// Blocks added this way are initially reset.
    ///
    /// # Arguments
    ///
    /// * `blocks` - A [`Vec<Block<S, M>>`] to add to the inactive pool.
Ryan Olson's avatar
Ryan Olson committed
183
    async fn add_blocks(&self, blocks: Vec<Block<S, L, M>>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
184
185

    /// Blocking version of [`BlockPool::add_blocks`].
Ryan Olson's avatar
Ryan Olson committed
186
    fn add_blocks_blocking(&self, blocks: Vec<Block<S, L, M>>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
187

Ryan Olson's avatar
Ryan Olson committed
188
    /// Allocate a specified number of free blocks from the pool.
Ryan Olson's avatar
Ryan Olson committed
189
190
191
192
193
194
195
196
197
198
    ///
    /// # Arguments
    ///
    /// * `count` - The number of blocks to allocate.
    ///
    /// # Returns
    ///
    /// A [`Result`] containing:
    /// - `Ok(Vec<MutableBlock<S, M>>)`: If successful, a vector of allocated mutable blocks.
    /// - `Err(BlockPoolError)`: If not enough blocks are available in the inactive pool.
Ryan Olson's avatar
Ryan Olson committed
199
    async fn allocate_blocks(&self, count: usize) -> BlockPoolResult<MutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
200
201

    /// Blocking version of [`BlockPool::allocate_blocks`].
Ryan Olson's avatar
Ryan Olson committed
202
    fn allocate_blocks_blocking(&self, count: usize) -> BlockPoolResult<MutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
203

Ryan Olson's avatar
Ryan Olson committed
204
205
    /// Register a vector of [`MutableBlock`]s with the pool.
    async fn register_blocks(
Ryan Olson's avatar
Ryan Olson committed
206
        &self,
Ryan Olson's avatar
Ryan Olson committed
207
208
        blocks: Vec<MutableBlock<S, L, M>>,
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
209
210

    /// Blocking version of [`BlockPool::register_blocks`].
Ryan Olson's avatar
Ryan Olson committed
211
    fn register_blocks_blocking(
Ryan Olson's avatar
Ryan Olson committed
212
        &self,
Ryan Olson's avatar
Ryan Olson committed
213
214
        blocks: Vec<MutableBlock<S, L, M>>,
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
215

Ryan Olson's avatar
Ryan Olson committed
216
    /// Match a set of [`SequenceHash`]s to existing blocks in the pool.
Ryan Olson's avatar
Ryan Olson committed
217
218
219
    ///
    /// # Arguments
    ///
Ryan Olson's avatar
Ryan Olson committed
220
    /// * `sequence_hashes` - A [`Vec<SequenceHash>`] to match.
Ryan Olson's avatar
Ryan Olson committed
221
222
223
224
    ///
    /// # Returns
    ///
    /// An [`Option<ImmutableBlock<S, M>>`] containing the shared block if found, otherwise `None`.
Ryan Olson's avatar
Ryan Olson committed
225
    async fn match_sequence_hashes(
Ryan Olson's avatar
Ryan Olson committed
226
227
        &self,
        sequence_hashes: &[SequenceHash],
Ryan Olson's avatar
Ryan Olson committed
228
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
229
230

    /// Blocking version of [`BlockPool::match_sequence_hashes`].
Ryan Olson's avatar
Ryan Olson committed
231
    fn match_sequence_hashes_blocking(
Ryan Olson's avatar
Ryan Olson committed
232
233
        &self,
        sequence_hashes: &[SequenceHash],
Ryan Olson's avatar
Ryan Olson committed
234
    ) -> BlockPoolResult<ImmutableBlocks<S, L, M>>;
Ryan Olson's avatar
Ryan Olson committed
235

Ryan Olson's avatar
Ryan Olson committed
236
237
    /// Touch a set of blocks. Equivalent to registering and then immediately dropping.
    async fn touch_blocks(&self, sequence_hashes: &[SequenceHash]) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
238

Ryan Olson's avatar
Ryan Olson committed
239
240
    /// Blocking version of [`BlockPool::touch_blocks`].
    fn touch_blocks_blocking(&self, sequence_hashes: &[SequenceHash]) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
241

Ryan Olson's avatar
Ryan Olson committed
242
243
244
245
    /// Attempt to return a block to the pool. Blocks will naturally be returned to the pool when they are dropped
    /// and their reference count drops to 0; however, for testing and to synchronize the block returning to the
    /// pool, this function can be used.
    async fn try_return_block(&self, block: OwnedBlock<S, L, M>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
246

Ryan Olson's avatar
Ryan Olson committed
247
248
    /// Blocking version of [`BlockPool::try_return_block`].
    fn try_return_block_blocking(&self, block: OwnedBlock<S, L, M>) -> BlockPoolResult<()>;
Ryan Olson's avatar
Ryan Olson committed
249

Ryan Olson's avatar
Ryan Olson committed
250
    fn total_blocks(&self) -> u64;
Ryan Olson's avatar
Ryan Olson committed
251

Ryan Olson's avatar
Ryan Olson committed
252
253
    fn available_blocks(&self) -> u64;
}
254

Ryan Olson's avatar
Ryan Olson committed
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
/// State of the pool when queried.
///
/// Provides a snapshot of the pool's block counts:
/// - Number of active blocks currently in use
/// - Number of inactive blocks
/// - Number of empty blocks
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct BlockPoolStatus {
    /// Number of active blocks currently in use
    pub active_blocks: usize,

    /// Number of inactive blocks.
    ///
    /// This is a count only; it carries no information about the order in
    /// which inactive blocks would be reused.
    pub inactive_blocks: usize,

    /// Number of empty blocks
    pub empty_blocks: usize,
}
273

Ryan Olson's avatar
Ryan Olson committed
274
275
276
277
/// Outcome of a request to reset a set of blocks, reported as three lists of
/// [`SequenceHash`]es.
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct ResetBlocksResponse {
    /// Blocks that were reset
    pub reset_blocks: Vec<SequenceHash>,

    /// Blocks that were not found in the pool
    pub not_found: Vec<SequenceHash>,

    /// Blocks that were not reset
    pub not_reset: Vec<SequenceHash>,
}
285

Ryan Olson's avatar
Ryan Olson committed
286
287
288
pub trait BlockPoolController: Send + Sync {
    /// Returns the [`BlockPoolStatus`] of the pool.
    fn status_blocking(&self) -> Result<BlockPoolStatus, BlockPoolError>;
289

Ryan Olson's avatar
Ryan Olson committed
290
    /// Resets the pool to its initial state.
291
    ///
Ryan Olson's avatar
Ryan Olson committed
292
293
    /// This function will error unless all blocks have returned to the inactive pool.
    fn reset_blocking(&self) -> Result<(), BlockPoolError>;
294

Ryan Olson's avatar
Ryan Olson committed
295
296
297
298
299
300
    /// Attempt to reset a set of blocks.
    fn reset_blocks_blocking(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ResetBlocksResponse, BlockPoolError>;
}
301

Ryan Olson's avatar
Ryan Olson committed
302
303
304
305
#[async_trait::async_trait]
pub trait AsyncBlockPoolController: Send + Sync {
    /// Returns the [`BlockPoolStatus`] of the pool.
    async fn status(&self) -> Result<BlockPoolStatus, BlockPoolError>;
306

Ryan Olson's avatar
Ryan Olson committed
307
308
309
310
    /// Resets the pool to its initial state.
    ///
    /// This function will error unless all blocks have returned to the inactive pool.
    async fn reset(&self) -> Result<(), BlockPoolError>;
311

Ryan Olson's avatar
Ryan Olson committed
312
313
314
315
316
    /// Attempt to reset a set of blocks.
    async fn reset_blocks(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ResetBlocksResponse, BlockPoolError>;
Ryan Olson's avatar
Ryan Olson committed
317
}