block_manager.rs 15.5 KB
Newer Older
1
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Ryan Olson's avatar
Ryan Olson committed
2
3
4
5
6
7
8
9
// SPDX-License-Identifier: Apache-2.0

//! Block Manager for LLM KV Cache
//!
//! This module provides functionality for managing KV blocks in LLM attention
//! mechanisms. It handles storage allocation, block management, and safe access
//! patterns for both system memory and remote (NIXL) storage.

10
pub mod config;
Ryan Olson's avatar
Ryan Olson committed
11
12
13
mod state;

pub mod block;
Ryan Olson's avatar
Ryan Olson committed
14
15
pub mod connector;
pub mod distributed;
Ryan Olson's avatar
Ryan Olson committed
16
pub mod events;
17
pub mod kv_consolidator;
Ryan Olson's avatar
Ryan Olson committed
18
pub mod layout;
19
pub mod metrics_kvbm;
20
pub mod numa_allocator;
21
pub mod offload;
Ryan Olson's avatar
Ryan Olson committed
22
23
pub mod pool;
pub mod storage;
24
pub mod v2;
Ryan Olson's avatar
Ryan Olson committed
25

Ryan Olson's avatar
Ryan Olson committed
26
27
28
// dynamo rt integration
pub mod controller;

Ryan Olson's avatar
Ryan Olson committed
29
30
pub use crate::common::dtype::DType;
pub use block::{
31
    BasicMetadata, BlockMetadata, Blocks, ImmutableBlock, MutableBlock,
Ryan Olson's avatar
Ryan Olson committed
32
33
    locality::{self, LocalityProvider, LogicalResources},
    nixl::{BlockDescriptorList, IsImmutable, IsMutable, MutabilityKind, RemoteBlock},
Ryan Olson's avatar
Ryan Olson committed
34
35
};
pub use config::*;
Ryan Olson's avatar
Ryan Olson committed
36

37
pub use layout::{LayoutConfig, LayoutConfigBuilder, LayoutError, LayoutType, nixl::NixlLayout};
38
pub use offload::{filter::OffloadFilter, request::BlockResult};
Ryan Olson's avatar
Ryan Olson committed
39
pub use pool::{BlockPool, ManagedBlockPool};
Ryan Olson's avatar
Ryan Olson committed
40
pub use storage::{
41
42
    DeviceStorage, DiskStorage, PinnedStorage, Storage, StorageAllocator,
    nixl::NixlRegisterableStorage,
Ryan Olson's avatar
Ryan Olson committed
43
44
45
46
47
48
49
};
pub use tokio_util::sync::CancellationToken;

use anyhow::{Context, Result};
use block::nixl::{BlockMutability, NixlBlockSet, RemoteBlocks, SerializedNixlBlockSet};
use derive_builder::Builder;
use nixl_sys::Agent as NixlAgent;
Ryan Olson's avatar
Ryan Olson committed
50
use serde::{Deserialize, Serialize};
Ryan Olson's avatar
Ryan Olson committed
51
52
53
54
55
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};
use storage::nixl::MemType;
Ryan Olson's avatar
Ryan Olson committed
56
use tokio::sync::oneshot;
Ryan Olson's avatar
Ryan Olson committed
57
58
59
60
use validator::Validate;

/// Numeric identifier of a worker, as returned by [`KvBlockManager::worker_id`].
pub type WorkerID = u64;

Ryan Olson's avatar
Ryan Olson committed
61
/// [`KvBlockManager`] specialised to [`locality::Local`] blocks carrying [`BasicMetadata`].
pub type ReferenceBlockManager = KvBlockManager<locality::Local, BasicMetadata>;
Ryan Olson's avatar
Ryan Olson committed
62
63

/// Represents the different cache levels for KV blocks
Ryan Olson's avatar
Ryan Olson committed
64
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
Ryan Olson's avatar
Ryan Olson committed
65
66
67
68
69
70
71
72
73
74
75
76
77
78
pub enum CacheLevel {
    /// Represents KV blocks in GPU memory
    // NOTE(review): presumably the tier served by the device pool — confirm.
    G1,

    /// Represents KV blocks in CPU memory
    // NOTE(review): presumably the tier served by the host pool — confirm.
    G2,

    /// Represents KV blocks in Local NVMe storage
    // NOTE(review): presumably the tier served by the disk pool — confirm.
    G3,

    /// Represents KV blocks in Remote NVMe storage
    G4,
}

Ryan Olson's avatar
Ryan Olson committed
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/// Receiving half of a Tokio broadcast channel carrying [`CacheLevel`] values; used to
/// reset the block manager to a specific cache level.
pub type BlockResetChannel = tokio::sync::broadcast::Receiver<CacheLevel>;

/// Guard that cancels the wrapped [`CancellationToken`] when it is dropped.
///
/// [`KvBlockManager`] keeps this guard behind an [`Arc`], so the token is cancelled
/// only once the last handle sharing the guard goes away.
#[derive(Debug)]
struct CancelOnLastDrop {
    /// Token that is cancelled from [`Drop::drop`].
    cancellation_token: CancellationToken,
}

impl Drop for CancelOnLastDrop {
    /// Cancels the held token.
    fn drop(&mut self) {
        let Self { cancellation_token } = self;
        cancellation_token.cancel();
    }
}

Ryan Olson's avatar
Ryan Olson committed
93
94
95
96
97
98
99
// When we construct the pool:
// 1. instantiate the runtime,
// 2. build layout::LayoutConfigs for each of the requested storage types
// 3. register the layouts with the NIXL agent if enabled
// 4. construct a Blocks object for each layout providing a unique block_set_idx
//    for each layout type.
// 5. initialize the pools for each set of blocks
Ryan Olson's avatar
Ryan Olson committed
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/// Frontend handle to the KV block manager.
///
/// The handle is cheap to clone: clones share the same internal state and the same
/// cancel-on-drop guard.
#[derive(Debug)]
pub struct KvBlockManager<Locality: LocalityProvider, Metadata: BlockMetadata> {
    /// Shared internal state that every public method delegates to.
    state: Arc<state::KvBlockManagerState<Locality, Metadata>>,
    /// Shared guard; when the last `Arc` reference is dropped the guard's `Drop` impl
    /// cancels the token it holds. Never read directly, hence the leading underscore.
    _cancellation_token: Arc<CancelOnLastDrop>,
    /// Block size captured from `config.model.page_size` when the manager is created.
    block_size: usize,
}

impl<Locality: LocalityProvider, Metadata: BlockMetadata> Clone
    for KvBlockManager<Locality, Metadata>
{
    fn clone(&self) -> Self {
        Self {
            state: self.state.clone(),
            _cancellation_token: self._cancellation_token.clone(),
            block_size: self.block_size,
        }
    }
}

impl<Locality, Metadata> KvBlockManager<Locality, Metadata>
where
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
{
    /// Returns the block size recorded when this manager was created.
    pub fn block_size(&self) -> usize {
        self.block_size
    }

    /// Returns the disk block pool, if the internal state provides one.
    pub fn disk(&self) -> Option<&dyn BlockPool<DiskStorage, Locality, Metadata>> {
        let state = &self.state;
        state.disk()
    }

    /// Returns the host (pinned memory) block pool, if the internal state provides one.
    pub fn host(&self) -> Option<&dyn BlockPool<PinnedStorage, Locality, Metadata>> {
        let state = &self.state;
        state.host()
    }

    /// Returns the device block pool, if the internal state provides one.
    pub fn device(&self) -> Option<&dyn BlockPool<DeviceStorage, Locality, Metadata>> {
        let state = &self.state;
        state.device()
    }

    /// Returns the worker ID reported by the internal state.
    pub fn worker_id(&self) -> WorkerID {
        let state = &self.state;
        state.worker_id()
    }

    /// Requests that `blocks` be onboarded into the device pool.
    ///
    /// The request is forwarded unchanged to the internal state. The outcome is
    /// delivered through the returned [`oneshot::Receiver`].
    // NOTE(review): `targets` presumably lets the caller supply pre-allocated destination
    // blocks, with `None` leaving allocation to the manager — confirm against the state impl.
    pub fn onboard_blocks<S: Storage>(
        &self,
        blocks: Vec<ImmutableBlock<S, Locality, Metadata>>,
        targets: Option<Vec<MutableBlock<DeviceStorage, Locality, Metadata>>>,
    ) -> oneshot::Receiver<BlockResult<DeviceStorage, Locality, Metadata>> {
        let state = &self.state;
        state.onboard_blocks(blocks, targets)
    }
}

Ryan Olson's avatar
Ryan Olson committed
155
156
157
158
159
160
161
162
163
164
165
166
/// Splits the configured cancellation token between the frontend and the internal state.
///
/// The token currently stored in `config.runtime` is handed to the returned
/// [`CancelOnLastDrop`] guard, and `config.runtime.cancellation_token` is replaced with a
/// child of it. Dropping the guard therefore cancels the token the internal state will
/// be built with.
fn build_cancel_token(config: &mut KvBlockManagerConfig) -> Arc<CancelOnLastDrop> {
    let runtime = &mut config.runtime;

    // Parent token: owned by the frontend guard.
    let frontend_token = runtime.cancellation_token.clone();

    // Child token: what the internal state sees from now on.
    runtime.cancellation_token = frontend_token.child_token();

    Arc::new(CancelOnLastDrop {
        cancellation_token: frontend_token,
    })
}

impl<Metadata: BlockMetadata> KvBlockManager<locality::Local, Metadata> {
Ryan Olson's avatar
Ryan Olson committed
167
168
169
170
171
    /// Create a new [KvBlockManager]
    ///
    /// The returned object is a frontend to the [KvBlockManager] which owns the cancellation
    /// tokens. When this object gets drop, the cancellation token will be cancelled and begin
    /// the gracefully shutdown of the block managers internal state.
Ryan Olson's avatar
Ryan Olson committed
172
173
    pub async fn new(mut config: KvBlockManagerConfig) -> Result<Self> {
        let _cancellation_token = build_cancel_token(&mut config);
Ryan Olson's avatar
Ryan Olson committed
174

Ryan Olson's avatar
Ryan Olson committed
175
        let block_size = config.model.page_size;
Ryan Olson's avatar
Ryan Olson committed
176
177

        // Create the internal state
Ryan Olson's avatar
Ryan Olson committed
178
        let state = state::KvBlockManagerState::<locality::Local, Metadata>::new(config).await?;
Ryan Olson's avatar
Ryan Olson committed
179
180
181

        Ok(Self {
            state,
Ryan Olson's avatar
Ryan Olson committed
182
183
            _cancellation_token,
            block_size,
Ryan Olson's avatar
Ryan Olson committed
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
        })
    }

    /// Exports the local blockset configuration as a serialized object.
    pub fn export_local_blockset(&self) -> Result<SerializedNixlBlockSet> {
        self.state.export_local_blockset()
    }

    /// Imports a remote blockset configuration from a serialized object.
    pub fn import_remote_blockset(
        &self,
        serialized_blockset: SerializedNixlBlockSet,
    ) -> Result<()> {
        self.state.import_remote_blockset(serialized_blockset)
    }

    /// Get a [`Vec<RemoteBlock<IsImmutable>>`] from a [`BlockDescriptorList`]
    pub fn get_remote_blocks_immutable(
        &self,
        bds: &BlockDescriptorList,
    ) -> Result<Vec<RemoteBlock<IsImmutable>>> {
        self.state.get_remote_blocks_immutable(bds)
    }

    /// Get a [`Vec<RemoteBlock<IsMutable>>`] from a [`BlockDescriptorList`]
    pub fn get_remote_blocks_mutable(
        &self,
        bds: &BlockDescriptorList,
    ) -> Result<Vec<RemoteBlock<IsMutable>>> {
        self.state.get_remote_blocks_mutable(bds)
    }
Ryan Olson's avatar
Ryan Olson committed
215
}
Ryan Olson's avatar
Ryan Olson committed
216

Ryan Olson's avatar
Ryan Olson committed
217
218
219
impl<R: LogicalResources, Metadata: BlockMetadata> KvBlockManager<locality::Logical<R>, Metadata> {
    pub async fn new(mut config: KvBlockManagerConfig, logical_resources: R) -> Result<Self> {
        let block_size = config.model.page_size;
Ryan Olson's avatar
Ryan Olson committed
220

Ryan Olson's avatar
Ryan Olson committed
221
        let _cancellation_token = build_cancel_token(&mut config);
Ryan Olson's avatar
Ryan Olson committed
222

Ryan Olson's avatar
Ryan Olson committed
223
224
225
226
227
        let state = state::KvBlockManagerState::<locality::Logical<R>, Metadata>::new(
            config,
            logical_resources,
        )
        .await?;
228

Ryan Olson's avatar
Ryan Olson committed
229
230
231
232
233
        Ok(Self {
            state,
            _cancellation_token,
            block_size,
        })
Ryan Olson's avatar
Ryan Olson committed
234
235
236
237
238
    }
}

#[cfg(all(test, feature = "testing-full"))]
mod tests {
Ryan Olson's avatar
Ryan Olson committed
239

Ryan Olson's avatar
Ryan Olson committed
240
241
    use super::*;

242
    use crate::tokens::Tokens;
Ryan Olson's avatar
Ryan Olson committed
243
244
245
246
247
    use std::sync::atomic::{AtomicU64, Ordering};

    // Atomic Counter for Worker ID
    static WORKER_ID: AtomicU64 = AtomicU64::new(1337);

Ryan Olson's avatar
Ryan Olson committed
248
249
250
251
252
    pub fn create_reference_block_manager_config_with_counts(
        device: usize,
        host: usize,
        disk: usize,
    ) -> KvBlockManagerConfig {
Ryan Olson's avatar
Ryan Olson committed
253
        let worker_id = WORKER_ID.fetch_add(1, Ordering::SeqCst);
254
255
256
257
258
259
260
261
262

        // Check if we're already in a Tokio runtime context
        let async_runtime = if tokio::runtime::Handle::try_current().is_ok() {
            None // If we're already in a runtime, don't create a new one
        } else {
            // Only create a new runtime if not already in one
            Some(Arc::new(tokio::runtime::Runtime::new().unwrap()))
        };

Ryan Olson's avatar
Ryan Olson committed
263
        let builder = KvBlockManagerConfig::builder()
Ryan Olson's avatar
Ryan Olson committed
264
265
266
            .runtime(
                KvManagerRuntimeConfig::builder()
                    .worker_id(worker_id)
267
                    .enable_nixl()
268
                    .async_runtime(async_runtime)
Ryan Olson's avatar
Ryan Olson committed
269
270
271
272
273
274
                    .build()
                    .unwrap(),
            )
            .model(
                KvManagerModelConfig::builder()
                    .num_layers(3)
275
                    .outer_dim(2)
Ryan Olson's avatar
Ryan Olson committed
276
277
278
279
                    .page_size(4)
                    .inner_dim(16)
                    .build()
                    .unwrap(),
Ryan Olson's avatar
Ryan Olson committed
280
281
282
283
            );

        let builder = if disk > 0 {
            builder.disk_layout(
284
                KvManagerLayoutConfig::builder()
Ryan Olson's avatar
Ryan Olson committed
285
                    .num_blocks(disk)
286
                    .allocator(storage::DiskAllocator::default())
287
288
289
                    .build()
                    .unwrap(),
            )
Ryan Olson's avatar
Ryan Olson committed
290
291
292
293
294
295
        } else {
            builder
        };

        let builder = if host > 0 {
            builder.host_layout(
Ryan Olson's avatar
Ryan Olson committed
296
                KvManagerLayoutConfig::builder()
Ryan Olson's avatar
Ryan Olson committed
297
                    .num_blocks(host)
Ryan Olson's avatar
Ryan Olson committed
298
299
300
301
                    .allocator(storage::PinnedAllocator::default())
                    .build()
                    .unwrap(),
            )
Ryan Olson's avatar
Ryan Olson committed
302
303
304
305
306
307
        } else {
            builder
        };

        let builder = if device > 0 {
            builder.device_layout(
Ryan Olson's avatar
Ryan Olson committed
308
                KvManagerLayoutConfig::builder()
Ryan Olson's avatar
Ryan Olson committed
309
                    .num_blocks(device)
Ryan Olson's avatar
Ryan Olson committed
310
311
312
313
                    .allocator(storage::DeviceAllocator::new(0).unwrap())
                    .build()
                    .unwrap(),
            )
Ryan Olson's avatar
Ryan Olson committed
314
315
316
        } else {
            builder
        };
Ryan Olson's avatar
Ryan Olson committed
317

Ryan Olson's avatar
Ryan Olson committed
318
        builder.build().unwrap()
Ryan Olson's avatar
Ryan Olson committed
319
320
    }

Ryan Olson's avatar
Ryan Olson committed
321
322
    pub fn create_reference_block_manager_config() -> KvBlockManagerConfig {
        create_reference_block_manager_config_with_counts(8, 16, 16)
Ryan Olson's avatar
Ryan Olson committed
323
324
    }

Ryan Olson's avatar
Ryan Olson committed
325
326
327
328
329
330
331
332
    /// Creates a [`ReferenceBlockManager`] from the default test configuration.
    ///
    /// Panics if the manager cannot be constructed.
    pub async fn create_reference_block_manager() -> ReferenceBlockManager {
        let config = create_reference_block_manager_config();
        ReferenceBlockManager::new(config).await.unwrap()
    }

    #[tokio::test]
    async fn test_reference_block_manager_inherited_async_runtime() {
Ryan Olson's avatar
Ryan Olson committed
333
        dynamo_runtime::logging::init();
Ryan Olson's avatar
Ryan Olson committed
334
        let _block_manager = create_reference_block_manager().await;
Ryan Olson's avatar
Ryan Olson committed
335
336
337
338
339
340
341
342
    }

    // This tests mimics the behavior of two unique kvbm workers exchanging blocksets
    // Each KvBlockManager is a unique worker in this test, each has its resources including
    // it's own worker_ids, nixl_agent, and block pools.
    //
    // This test is meant to mimic the behavior of the basic nixl integration test found here:
    // https://github.com/ai-dynamo/nixl/blob/main/src/bindings/rust/src/tests.rs
Ryan Olson's avatar
Ryan Olson committed
343
344
    // TODO: This test doesn't work because NIXL doesn't support partial metadata in the rust bindings.
    #[ignore]
Ryan Olson's avatar
Ryan Olson committed
345
346
347
348
349
    #[tokio::test]
    async fn test_reference_block_managers() {
        dynamo_runtime::logging::init();

        // create two block managers - mimics two unique dynamo workers
Ryan Olson's avatar
Ryan Olson committed
350
351
        let kvbm_0 = create_reference_block_manager().await;
        let kvbm_1 = create_reference_block_manager().await;
Ryan Olson's avatar
Ryan Olson committed
352
353
354
355
356
357
358
359
360
361
362
363
364

        assert_ne!(kvbm_0.worker_id(), kvbm_1.worker_id());

        // in dynamo, we would exchange the blocksets via the discovery plane
        let blockset_0 = kvbm_0.export_local_blockset().unwrap();
        let blockset_1 = kvbm_1.export_local_blockset().unwrap();

        // in dynamo, we would be watching the discovery plane for remote blocksets
        kvbm_0.import_remote_blockset(blockset_1).unwrap();
        kvbm_1.import_remote_blockset(blockset_0).unwrap();

        // Worker 0
        // Allocate 4 mutable blocks on the host
Ryan Olson's avatar
Ryan Olson committed
365
        let _blocks_0 = kvbm_0.host().unwrap().allocate_blocks(4).await.unwrap();
Ryan Olson's avatar
Ryan Olson committed
366

Ryan Olson's avatar
Ryan Olson committed
367
368
369
        // // Create a BlockDescriptorList for the mutable blocks
        // // let blockset_0 = BlockDescriptorList::from_mutable_blocks(&blocks_0).unwrap();
        // let blockset_0 = blocks_0.as_block_descriptor_set().unwrap();
Ryan Olson's avatar
Ryan Olson committed
370

Ryan Olson's avatar
Ryan Olson committed
371
372
373
374
        // // Worker 1
        // // Create a RemoteBlock list from blockset_0
        // let _blocks_1 = kvbm_1.host().unwrap().allocate_blocks(4).await.unwrap();
        // let mut _remote_blocks_0 = kvbm_1.get_remote_blocks_mutable(&blockset_0).unwrap();
Ryan Olson's avatar
Ryan Olson committed
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411

        // TODO(#967) - Enable with TransferEngine

        // // Create a TransferRequestPut for the mutable blocks
        // let transfer_request = TransferRequestPut::new(&blocks_0, &mut remote_blocks_0).unwrap();

        // // Validate blocks - this could be an expensive operation
        // // TODO: Create an ENV trigger debug flag which will call this on every transfer request
        // // In this case, we expect an error because we have overlapping blocks as we are sending to/from the same blocks
        // // because we are using the wrong target (artifact of the test setup allowing variable to cross what woudl be
        // // worker boundaries)
        // assert!(transfer_request.validate_blocks().is_err());

        // // This is proper request - PUT from worker 1 (local) to worker 0 (remote)
        // let transfer_request = TransferRequestPut::new(&blocks_1, &mut remote_blocks_0).unwrap();
        // assert!(transfer_request.validate_blocks().is_ok());

        // // Execute the transfer request
        // transfer_request.execute().unwrap();

        // let mut put_request = PutRequestBuilder::<_, _>::builder();

        // put_request.from(&blocks_1).to(&mut remote_blocks_0);

        // // Create a Put request direct between two local blocks
        // // split the blocks into two vecs each with 2 blocks
        // let mut blocks_1 = blocks_1;

        // let slice_0 = blocks_1.split_off(2);
        // let mut slice_1 = blocks_1;

        // let transfer_request = TransferRequestPut::new(&slice_0, &mut slice_1).unwrap();
        // assert!(transfer_request.validate_blocks().is_ok());

        // // Execute the transfer request
        // transfer_request.execute().unwrap();
    }
412
413
414
415
416

    #[tokio::test]
    async fn test_offload() -> Result<()> {
        dynamo_runtime::logging::init();

Ryan Olson's avatar
Ryan Olson committed
417
        let block_manager = create_reference_block_manager().await;
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436

        let device = block_manager.device().unwrap();

        let tokens = Tokens::from(vec![1, 2, 3, 4]);
        let token_sequence = tokens.into_sequence(4, Some(0));
        let token_block = token_sequence.blocks().first().unwrap();

        let mut device_block = device.allocate_blocks(1).await?.into_iter().next().unwrap();
        device_block.apply_token_block(token_block.clone())?;

        let immutable_device_blocks = device.register_blocks(vec![device_block]).await.unwrap();
        assert_eq!(immutable_device_blocks.len(), 1);

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // It should now be on host and disk.
        let host_blocks = block_manager
            .host()
            .unwrap()
Ryan Olson's avatar
Ryan Olson committed
437
            .match_sequence_hashes(vec![immutable_device_blocks[0].sequence_hash()].as_slice())
438
439
440
441
442
443
444
            .await
            .unwrap();
        assert_eq!(host_blocks.len(), 1);

        let disk_blocks = block_manager
            .disk()
            .unwrap()
Ryan Olson's avatar
Ryan Olson committed
445
            .match_sequence_hashes(vec![immutable_device_blocks[0].sequence_hash()].as_slice())
446
447
448
449
450
451
            .await
            .unwrap();
        assert_eq!(disk_blocks.len(), 1);

        Ok(())
    }
Ryan Olson's avatar
Ryan Olson committed
452
}