mod.rs 14.9 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Object storage module for distributed block management.
//!
//! This module provides traits and implementations for storing KV cache blocks
//! in object storage systems like S3/MinIO.
//!
//! # Architecture
//!
//! Traits are defined here; backend implementations live in feature-gated
//! submodules (currently `s3`, behind the `s3` feature). The main traits are:
//! - [`ObjectBlockOps`](crate::object::ObjectBlockOps) - High-level block operations (put, get, has)
//! - [`ObjectLockManager`](crate::object::ObjectLockManager) - Distributed locking for coordinated offloads
//!
//! Consumers should use factory functions to obtain trait objects without
//! depending on specific feature flags.

use std::sync::Arc;

use anyhow::Result;
use futures::future::BoxFuture;

use crate::{BlockId, SequenceHash};
use kvbm_common::LogicalLayoutHandle;
use kvbm_physical::layout::LayoutConfig;
use kvbm_physical::transfer::PhysicalLayout;

#[cfg(feature = "s3")]
pub mod s3;

// ============================================================================
// Key Formatting
// ============================================================================

/// Trait for converting SequenceHash to object storage keys.
///
/// Implementations can embed rank, namespace, or any other prefix/suffix
/// to ensure key uniqueness across SPMD workers or other contexts.
///
/// The `Send + Sync` supertraits allow a formatter to be shared across threads,
/// for example behind an `Arc<dyn KeyFormatter>`.
pub trait KeyFormatter: Send + Sync {
    /// Convert a sequence hash to an object storage key string.
    ///
    /// # Arguments
    /// * `hash` - Sequence hash identifying the block
    ///
    /// # Returns
    /// The object storage key for `hash` as an owned `String`.
    fn format_key(&self, hash: &SequenceHash) -> String;
}

/// Key formatter that emits the hash's `Display` text unchanged.
///
/// Keys look like `0:abc123` or `5:abc123:def456` (position:current\[:parent\]),
/// with hash fragments base58-encoded. No rank or namespace is added, so this
/// formatter is meant for single-worker scenarios or testing.
#[derive(Debug, Clone, Default)]
pub struct DefaultKeyFormatter;

impl KeyFormatter for DefaultKeyFormatter {
    /// Render `hash` through its `Display` implementation and return the text as the key.
    fn format_key(&self, hash: &SequenceHash) -> String {
        ToString::to_string(hash)
    }
}

/// Key formatter that namespaces every key by SPMD worker rank.
///
/// Keys take the form `{rank}/{display_hash}` (e.g., `0/5:abc123:def456`), where
/// the hash part is its Display representation. The rank prefix keeps keys
/// unique when several workers write the same logical blocks.
#[derive(Debug, Clone)]
pub struct RankPrefixedKeyFormatter {
    /// Worker rank placed in front of every key.
    rank: usize,
}

impl RankPrefixedKeyFormatter {
    /// Build a formatter that prefixes keys with `rank`.
    pub fn new(rank: usize) -> Self {
        RankPrefixedKeyFormatter { rank }
    }

    /// Return the rank this formatter was created with.
    pub fn rank(&self) -> usize {
        self.rank
    }
}

impl KeyFormatter for RankPrefixedKeyFormatter {
    /// Build the key as `{rank}/{hash}`, with the hash rendered via `Display`.
    fn format_key(&self, hash: &SequenceHash) -> String {
        let rank = self.rank;
        format!("{rank}/{hash}")
    }
}

/// Pick the key formatter that matches an optional worker rank.
///
/// * `Some(rank)` yields a `RankPrefixedKeyFormatter` for that rank.
/// * `None` yields a `DefaultKeyFormatter`.
///
/// Either way the formatter is handed back as a shared trait object.
pub fn create_key_formatter(rank: Option<usize>) -> Arc<dyn KeyFormatter> {
    if let Some(worker_rank) = rank {
        Arc::new(RankPrefixedKeyFormatter::new(worker_rank))
    } else {
        Arc::new(DefaultKeyFormatter)
    }
}

/// Extension methods for LayoutConfig to support object storage operations.
///
/// Both methods return byte sizes derived from the layout's dimension fields.
pub trait LayoutConfigExt {
    /// Compute the size of a single block in bytes.
    ///
    /// The implementation for `LayoutConfig` multiplies `num_layers`, `outer_dim`,
    /// `page_size`, `inner_dim`, and `dtype_width_bytes` with saturating
    /// arithmetic, so an oversized product clamps to `usize::MAX` instead of
    /// wrapping.
    fn block_size_bytes(&self) -> usize;

    /// Compute the size of a single memory region in bytes.
    ///
    /// The implementation for `LayoutConfig` multiplies `page_size`, `inner_dim`,
    /// and `dtype_width_bytes` with saturating arithmetic. Barring saturation,
    /// the block size is therefore `num_layers * outer_dim` times this value.
    fn region_size(&self) -> usize;
}

impl LayoutConfigExt for LayoutConfig {
    /// Bytes in one block: the saturating product of all five layout dimensions.
    fn block_size_bytes(&self) -> usize {
        // Factors are folded left to right, clamping at `usize::MAX`.
        [
            self.num_layers,
            self.outer_dim,
            self.page_size,
            self.inner_dim,
            self.dtype_width_bytes,
        ]
        .iter()
        .copied()
        .fold(1, usize::saturating_mul)
    }

    /// Bytes in one memory region: the saturating product of page size, inner
    /// dimension, and element width.
    fn region_size(&self) -> usize {
        [self.page_size, self.inner_dim, self.dtype_width_bytes]
            .iter()
            .copied()
            .fold(1, usize::saturating_mul)
    }
}

/// Low-level object storage client trait.
///
/// Keys are raw byte strings. All methods are plain (non-async) functions that
/// return `anyhow::Result`. Payloads are passed as a slice of byte buffers
/// rather than as one contiguous buffer.
pub trait ObjectClient: Send + Sync {
    /// Check if an object exists.
    ///
    /// Returns `Ok(true)` / `Ok(false)` for presence, or an error if the check
    /// itself could not be carried out.
    fn has_object(&self, key: &[u8]) -> anyhow::Result<bool>;

    /// Put an object.
    ///
    /// `data` is a list of read-only byte buffers.
    /// NOTE(review): presumably the buffers are written back-to-back, in order,
    /// to form the object body — confirm against an implementation.
    fn put_object(&self, key: &[u8], data: &[&[u8]]) -> anyhow::Result<()>;

    /// Get an object.
    ///
    /// `data` is a list of caller-provided mutable byte buffers that receive the
    /// object contents.
    /// NOTE(review): presumably the buffers are filled in order and their total
    /// length must match the object size — confirm against an implementation.
    fn get_object(&self, key: &[u8], data: &mut [&mut [u8]]) -> anyhow::Result<()>;
}

/// Unified object block operations trait.
///
/// This trait provides high-level operations for storing and retrieving
/// KV cache blocks in object storage (e.g., S3, MinIO).
///
/// Uses `LogicalLayoutHandle` to identify source/destination layouts. In distributed
/// mode, workers resolve the logical handle to their own physical layouts. This allows
/// the leader (which doesn't have physical layouts) to use the same trait.
///
/// Uses `'static` BoxFuture for runtime flexibility - implementations clone/Arc
/// what they need from self. Takes owned Vecs for simplicity; keys are returned
/// in results so callers can correlate success/failure.
///
/// Outcomes are reported per block rather than through a single `Result`:
/// a failed block appears as `None` (for `has_blocks`) or `Err(hash)` (for the
/// put/get methods) in the returned vector.
///
/// Implemented by:
/// - `S3ObjectBlockClient` - direct S3 operations (has_blocks only; put/get require physical layout)
/// - `DirectWorker` - resolves logical handle to physical layout, then delegates
/// - `CoordinatedWorker` - delegates to inner worker
/// - `LeaderObjectClient` - coordinates workers for distributed uploads
pub trait ObjectBlockOps: Send + Sync {
    /// Check if blocks exist in object storage.
    ///
    /// # Arguments
    /// * `keys` - Sequence hashes identifying the blocks to look up
    ///
    /// Returns a vector of (hash, size_option) pairs where:
    /// - Some(size) indicates the block exists with the given size in bytes
    /// - None indicates the block does not exist or an error occurred
    ///
    /// Note that a missing block and a failed lookup are not distinguishable
    /// from the return value.
    fn has_blocks(
        &self,
        keys: Vec<SequenceHash>,
    ) -> BoxFuture<'static, Vec<(SequenceHash, Option<usize>)>>;

    /// Put blocks to object storage.
    ///
    /// # Arguments
    /// * `keys` - Sequence hashes identifying each block
    /// * `src_layout` - Logical layout handle identifying the source (workers resolve to physical)
    /// * `block_ids` - Block IDs within the layout to upload
    ///
    /// NOTE(review): `keys` and `block_ids` are presumably parallel vectors of
    /// equal length (one block ID per hash) — confirm how implementations treat
    /// a length mismatch.
    ///
    /// Returns a vector of results for each block:
    /// - Ok(hash) indicates the block was successfully stored
    /// - Err(hash) indicates the block failed to store
    ///
    /// # Note
    /// For `S3ObjectBlockClient`, this will error - use `put_blocks_with_layout` instead.
    /// Workers should resolve the logical handle to their physical layout first.
    fn put_blocks(
        &self,
        keys: Vec<SequenceHash>,
        src_layout: LogicalLayoutHandle,
        block_ids: Vec<BlockId>,
    ) -> BoxFuture<'static, Vec<Result<SequenceHash, SequenceHash>>>;

    /// Get blocks from object storage.
    ///
    /// # Arguments
    /// * `keys` - Sequence hashes identifying each block
    /// * `dst_layout` - Logical layout handle identifying the destination (workers resolve to physical)
    /// * `block_ids` - Block IDs within the layout to download into
    ///
    /// NOTE(review): `keys` and `block_ids` are presumably parallel vectors of
    /// equal length (one block ID per hash) — confirm how implementations treat
    /// a length mismatch.
    ///
    /// Returns a vector of results for each block:
    /// - Ok(hash) indicates the block was successfully retrieved
    /// - Err(hash) indicates the block failed to retrieve
    ///
    /// # Note
    /// For `S3ObjectBlockClient`, this will error - use `get_blocks_with_layout` instead.
    /// Workers should resolve the logical handle to their physical layout first.
    fn get_blocks(
        &self,
        keys: Vec<SequenceHash>,
        dst_layout: LogicalLayoutHandle,
        block_ids: Vec<BlockId>,
    ) -> BoxFuture<'static, Vec<Result<SequenceHash, SequenceHash>>>;

    // =========================================================================
    // Physical Layout Methods (for workers that can resolve handles)
    // =========================================================================

    /// Put blocks to object storage using a resolved physical layout.
    ///
    /// This method is called by workers after resolving a logical handle to
    /// their physical layout. The default implementation errors; storage backends
    /// like `S3ObjectBlockClient` override this with actual upload logic.
    ///
    /// Concretely, the default ignores `_layout` and `_block_ids`, performs no
    /// I/O, and resolves to `Err(hash)` for every entry of `keys`, in input order.
    ///
    /// # Arguments
    /// * `keys` - Sequence hashes identifying each block
    /// * `layout` - Physical layout containing the block data
    /// * `block_ids` - Block IDs within the layout to upload
    fn put_blocks_with_layout(
        &self,
        keys: Vec<SequenceHash>,
        _layout: PhysicalLayout,
        _block_ids: Vec<BlockId>,
    ) -> BoxFuture<'static, Vec<Result<SequenceHash, SequenceHash>>> {
        // Mark every requested block as failed; nothing is uploaded.
        Box::pin(async move { keys.into_iter().map(Err).collect() })
    }

    /// Get blocks from object storage into a resolved physical layout.
    ///
    /// This method is called by workers after resolving a logical handle to
    /// their physical layout. The default implementation errors; storage backends
    /// like `S3ObjectBlockClient` override this with actual download logic.
    ///
    /// Concretely, the default ignores `_layout` and `_block_ids`, performs no
    /// I/O, and resolves to `Err(hash)` for every entry of `keys`, in input order.
    ///
    /// # Arguments
    /// * `keys` - Sequence hashes identifying each block
    /// * `layout` - Physical layout to write the block data into
    /// * `block_ids` - Block IDs within the layout to download into
    fn get_blocks_with_layout(
        &self,
        keys: Vec<SequenceHash>,
        _layout: PhysicalLayout,
        _block_ids: Vec<BlockId>,
    ) -> BoxFuture<'static, Vec<Result<SequenceHash, SequenceHash>>> {
        // Mark every requested block as failed; nothing is downloaded.
        Box::pin(async move { keys.into_iter().map(Err).collect() })
    }
}

// ============================================================================
// Object Lock Manager Trait
// ============================================================================

/// Lock file content structure for distributed locking.
///
/// The lock file is stored as JSON in object storage at `{sequence_hash}.lock`.
///
/// Serialization uses the derived serde implementations. No rename attributes
/// are applied, so the field names below are the serialized keys. All fields
/// are plain strings; this type does not parse or validate the timestamps.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LockFileContent {
    /// Unique identifier of the instance that holds the lock
    pub instance_id: String,
    /// When the lock was acquired (ISO 8601 timestamp)
    pub acquired_at: String,
    /// When the lock expires (ISO 8601 timestamp)
    pub deadline: String,
}

/// Object lock manager trait for distributed locking in object storage.
///
/// This trait provides the locking semantics for the object offload pipeline:
/// 1. Check `.meta` file to see if block is already offloaded
/// 2. Try to acquire `.lock` file with conditional PUT
/// 3. Create `.meta` file after successful offload
/// 4. Release `.lock` file after completion
///
/// All methods take the hash by value and return a `'static` boxed future, so
/// the returned future does not borrow from the manager.
///
/// # Locking Flow
///
/// ```text
/// has_meta() -> false -> try_acquire_lock() -> true -> execute transfer -> create_meta() -> release_lock()
///                                           -> false -> skip (another instance owns it)
///            -> true -> skip (already offloaded)
/// ```
pub trait ObjectLockManager: Send + Sync {
    /// Check if meta file exists (block already offloaded).
    ///
    /// Returns `true` if `{hash}.meta` exists, meaning the block has been
    /// successfully offloaded and should be skipped.
    /// An `Err` means the check itself failed, not that the file is missing.
    fn has_meta(&self, hash: SequenceHash) -> BoxFuture<'static, Result<bool>>;

    /// Try to acquire a lock for the given block.
    ///
    /// This method:
    /// 1. Attempts conditional PUT of `{hash}.lock` with `If-None-Match: *`
    /// 2. If lock exists, reads it to check deadline
    /// 3. If deadline is breached (> timeout), overwrites the lock
    ///
    /// Returns:
    /// - `Ok(true)` if lock was acquired or overwritten
    /// - `Ok(false)` if another instance owns a valid lock
    /// - `Err(...)` for other errors
    ///
    /// NOTE(review): whether the overwrite in step 3 is itself conditional (and
    /// so safe when two instances race on an expired lock) is not visible from
    /// this trait — confirm against the implementation.
    fn try_acquire_lock(&self, hash: SequenceHash) -> BoxFuture<'static, Result<bool>>;

    /// Create the meta file after successful offload.
    ///
    /// This marks the block as successfully offloaded by creating `{hash}.meta`.
    /// In the locking flow this is called before `release_lock`.
    fn create_meta(&self, hash: SequenceHash) -> BoxFuture<'static, Result<()>>;

    /// Release the lock by deleting the lock file.
    ///
    /// Deletes `{hash}.lock` after the transfer is complete.
    ///
    /// NOTE(review): whether the implementation verifies that this instance
    /// still owns the lock before deleting it is not visible here — confirm.
    fn release_lock(&self, hash: SequenceHash) -> BoxFuture<'static, Result<()>>;
}

// ============================================================================
// Factory Functions
// ============================================================================

/// Build an object block client for the configured storage backend.
///
/// The client is returned as a trait object so consumers don't need to depend
/// on the `s3` feature. The backend is chosen from `config.client`.
///
/// # Arguments
/// * `config` - Object storage configuration
/// * `rank` - Optional worker rank for key prefixing (None for leader)
///
/// # Errors
/// Returns an error if the S3 client cannot be initialized, or if the
/// configuration selects the Nixl backend, which is not implemented yet.
#[cfg(feature = "s3")]
pub async fn create_object_client(
    config: &kvbm_config::ObjectConfig,
    rank: Option<usize>,
) -> Result<Arc<dyn ObjectBlockOps>> {
    use kvbm_config::ObjectClientConfig;
    use s3::{S3Config, S3ObjectBlockClient};

    // A rank (if any) becomes a key prefix; the leader passes `None`.
    let formatter = create_key_formatter(rank);

    // Only the S3 backend is supported; reject everything else up front.
    let s3_settings = match &config.client {
        ObjectClientConfig::S3(settings) => settings,
        ObjectClientConfig::Nixl(_) => {
            anyhow::bail!("Nixl object storage backend not yet implemented")
        }
    };

    let s3_config = S3Config::from_object_config(s3_settings);
    let block_client = S3ObjectBlockClient::with_key_formatter(s3_config, formatter).await?;
    Ok(Arc::new(block_client))
}

/// Stand-in for `create_object_client` when the crate is built without the `s3` feature.
///
/// Both arguments are ignored.
///
/// # Errors
/// Always returns an error stating that the `s3` feature is required.
#[cfg(not(feature = "s3"))]
pub async fn create_object_client(
    _config: &kvbm_config::ObjectConfig,
    _rank: Option<usize>,
) -> Result<Arc<dyn ObjectBlockOps>> {
    Err(anyhow::anyhow!(
        "Object storage requires the 's3' feature to be enabled"
    ))
}

/// Build a distributed lock manager for the configured storage backend.
///
/// The manager is returned as a trait object so consumers don't need to depend
/// on the `s3` feature.
///
/// # Arguments
/// * `config` - Object storage configuration
/// * `instance_id` - Unique identifier for this instance (used in lock files)
///
/// # Errors
/// Returns an error if the underlying S3 client cannot be initialized, or if
/// the configuration selects the Nixl backend, which is not implemented yet.
#[cfg(feature = "s3")]
pub async fn create_lock_manager(
    config: &kvbm_config::ObjectConfig,
    instance_id: String,
) -> Result<Arc<dyn ObjectLockManager>> {
    use kvbm_config::ObjectClientConfig;
    use s3::{S3Config, S3LockManager, S3ObjectBlockClient};

    // Only the S3 backend is supported; reject everything else up front.
    let s3_settings = match &config.client {
        ObjectClientConfig::S3(settings) => settings,
        ObjectClientConfig::Nixl(_) => {
            anyhow::bail!("Nixl object storage backend not yet implemented")
        }
    };

    let s3_config = S3Config::from_object_config(s3_settings);
    // Lock manager uses default key formatter (no rank prefix for lock/meta files)
    let block_client = S3ObjectBlockClient::new(s3_config).await?;
    let shared_client = Arc::new(block_client);
    let lock_manager = S3LockManager::new(shared_client, instance_id);
    Ok(Arc::new(lock_manager))
}

/// Stand-in for `create_lock_manager` when the crate is built without the `s3` feature.
///
/// Both arguments are ignored.
///
/// # Errors
/// Always returns an error stating that the `s3` feature is required.
#[cfg(not(feature = "s3"))]
pub async fn create_lock_manager(
    _config: &kvbm_config::ObjectConfig,
    _instance_id: String,
) -> Result<Arc<dyn ObjectLockManager>> {
    Err(anyhow::anyhow!(
        "Object storage requires the 's3' feature to be enabled"
    ))
}