leader.rs 3.39 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;
Richard Huo's avatar
Richard Huo committed
5

Ryan Olson's avatar
Ryan Olson committed
6
use derive_getters::Dissolve;
7
8
9
use llm_rs::block_manager::distributed::{
    KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig, KvbmLeaderNumBlocksConfig,
};
10
use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
Ryan Olson's avatar
Ryan Olson committed
11
12
13
14
15
16
17
18
19
20

const CPU_CACHE: &str = "DYN_KVBM_CPU_CACHE_GB";
const CPU_CACHE_OVERRIDE: &str = "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS";

const DISK_CACHE: &str = "DYN_KVBM_DISK_CACHE_GB";
const DISK_CACHE_OVERRIDE: &str = "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS";

const LEADER_WORKER_INIT_TIMEOUT_SECS: &str = "DYN_KVBM_LEADER_WORKER_INIT_TIMEOUT_SECS";
const DEFAULT_INIT_TIMEOUT_SECS: u64 = 120;

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
fn read_env_usize(key: &str) -> Option<usize> {
    std::env::var(key).ok()?.trim().parse::<usize>().ok()
}

fn read_cache_size_float(key: &str) -> f64 {
    std::env::var(key)
        .unwrap_or_default()
        .parse::<f64>()
        .unwrap_or(0.0)
}

fn get_blocks_config(cache_size_key: &str, override_key: &str) -> KvbmLeaderNumBlocksConfig {
    if let Some(nblocks) = read_env_usize(override_key) {
        // Optional: still read cache size for observability, but override takes precedence.
        let cache_gb: f64 = read_cache_size_float(cache_size_key);
        return KvbmLeaderNumBlocksConfig {
            cache_size_in_gb: cache_gb,
            num_blocks_overriden: nblocks,
        };
    }

    // No override -> compute from cache size (in GB)
    let cache_gb: f64 = read_cache_size_float(cache_size_key);
    KvbmLeaderNumBlocksConfig {
        cache_size_in_gb: cache_gb,
        num_blocks_overriden: 0,
Ryan Olson's avatar
Ryan Olson committed
47
48
49
50
51
52
53
54
55
56
57
58
59
60
    }
}

fn get_leader_init_timeout_secs(override_key: &str) -> u64 {
    std::env::var(override_key)
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(DEFAULT_INIT_TIMEOUT_SECS)
}

#[pyclass]
#[derive(Clone, Dissolve)]
pub struct KvbmLeader {
    leader: Arc<KvbmLeaderImpl>,
Richard Huo's avatar
Richard Huo committed
61
    drt: Option<Arc<rs::DistributedRuntime>>,
Ryan Olson's avatar
Ryan Olson committed
62
63
64
65
66
67
68
69
70
71
72
}

impl KvbmLeader {
    pub fn get_inner(&self) -> Arc<KvbmLeaderImpl> {
        self.leader.clone()
    }
}

#[pymethods]
impl KvbmLeader {
    #[new]
Richard Huo's avatar
Richard Huo committed
73
74
75
76
77
78
79
80
81
82
    #[pyo3(signature = (world_size, drt=None))]
    fn new(world_size: usize, drt: Option<PyObject>) -> PyResult<Self> {
        let drt: Option<Arc<rs::DistributedRuntime>> = Python::with_gil(|py| {
            if let Some(obj) = drt {
                extract_distributed_runtime_from_obj(py, obj)
            } else {
                Ok(None)
            }
        })?;

Ryan Olson's avatar
Ryan Olson committed
83
84
85
86
87
88
        let leader_init_timeout_sec: u64 =
            get_leader_init_timeout_secs(LEADER_WORKER_INIT_TIMEOUT_SECS);

        let config = KvbmLeaderConfig::builder()
            .world_size(world_size)
            .leader_init_timeout_secs(leader_init_timeout_sec)
89
90
            .host_blocks_config(get_blocks_config(CPU_CACHE, CPU_CACHE_OVERRIDE))
            .disk_blocks_config(get_blocks_config(DISK_CACHE, DISK_CACHE_OVERRIDE))
91
92
            .leader_pub_url(get_leader_zmq_pub_url())
            .leader_ack_url(get_leader_zmq_ack_url())
Ryan Olson's avatar
Ryan Olson committed
93
94
95
            .build()
            .map_err(to_pyerr)?;

96
97
        config.sanity_check().map_err(to_pyerr)?;

Richard Huo's avatar
Richard Huo committed
98
        let rt = get_current_tokio_handle();
Ryan Olson's avatar
Ryan Olson committed
99
100
101
102
103
104
105
106
107
108

        let leader =
            rt.block_on(async move { KvbmLeaderImpl::new(config).await.map_err(to_pyerr) })?;

        Ok(Self {
            leader: Arc::new(leader),
            drt,
        })
    }
}