builder.rs 4.41 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Builder for KvbmRuntime with optional pre-built components.

use std::sync::Arc;

use anyhow::Result;
use dynamo_memory::nixl::NixlAgent;
use kvbm_config::KvbmConfig;
use tokio::runtime::{Handle, Runtime};
use velo::Messenger;

/// Runtime handle - either owned or borrowed.
///
/// Wraps the two ways a tokio runtime can be made available: a shared
/// [`Runtime`] held through an [`Arc`], or a plain [`Handle`] to a runtime
/// that lives elsewhere.
///
/// `Debug` is derived so the handle can be logged and embedded in other
/// `Debug` types; both payload types implement `Debug`.
#[derive(Debug)]
pub enum RuntimeHandle {
    /// Owned runtime (created by the builder, or supplied as an `Arc<Runtime>`).
    Owned(Arc<Runtime>),
    /// Borrowed handle (external runtime).
    Handle(Handle),
}

impl RuntimeHandle {
    /// Get a handle to the runtime.
    pub fn handle(&self) -> Handle {
        match self {
            RuntimeHandle::Owned(rt) => rt.handle().clone(),
            RuntimeHandle::Handle(h) => h.clone(),
        }
    }
}

/// Builder for KvbmRuntime with optional pre-built components.
///
/// The builder allows injecting pre-built components or building them from config:
/// - If a component is provided, it's used directly
/// - If not provided, the component is built from the config
///
/// The NixL agent is the one exception to "always built": when none is
/// injected and the config has no NixL section, the resulting runtime has no
/// NixL agent at all.
pub struct KvbmRuntimeBuilder {
    /// Configuration used to build every component that was not injected;
    /// moved into the resulting runtime.
    config: KvbmConfig,
    /// Injected tokio runtime or handle; `None` means build one from `config.tokio`.
    runtime: Option<RuntimeHandle>,
    /// Injected messenger; `None` means build one from `config.messenger`.
    messenger: Option<Arc<Messenger>>,
    /// Injected NixL agent; `None` means build one only if `config.nixl` is set.
    nixl_agent: Option<NixlAgent>,
}

impl KvbmRuntimeBuilder {
    /// Start a builder from an already-loaded config.
    ///
    /// No components are injected yet; anything left unset is built from
    /// `config` when one of the `build_*` methods runs.
    pub fn new(config: KvbmConfig) -> Self {
        Self {
            runtime: None,
            messenger: None,
            nixl_agent: None,
            config,
        }
    }

    /// Start a builder whose config is loaded via [`KvbmConfig::from_env`].
    ///
    /// # Errors
    ///
    /// Propagates the error if loading the config fails.
    pub fn from_env() -> Result<Self, kvbm_config::ConfigError> {
        let config = KvbmConfig::from_env()?;
        Ok(Self::new(config))
    }

    /// Start a builder from a JSON config string (merged with env/files).
    ///
    /// JSON has highest priority - it overrides env vars, TOML files, and defaults.
    /// This is the primary entrypoint for vLLM's `kv_connector_extra_config` dict.
    ///
    /// # Errors
    ///
    /// Propagates the error if the merged config cannot be loaded.
    pub fn from_json(json: &str) -> Result<Self, kvbm_config::ConfigError> {
        let config = KvbmConfig::from_figment_with_json(json)?;
        Ok(Self::new(config))
    }

    /// Inject an existing tokio [`Runtime`], shared through the given `Arc`.
    ///
    /// Replaces any runtime or handle set earlier on this builder.
    pub fn with_runtime(self, runtime: Arc<Runtime>) -> Self {
        Self {
            runtime: Some(RuntimeHandle::Owned(runtime)),
            ..self
        }
    }

    /// Inject a [`Handle`] to an externally managed tokio runtime.
    ///
    /// Replaces any runtime or handle set earlier on this builder.
    pub fn with_runtime_handle(self, handle: Handle) -> Self {
        Self {
            runtime: Some(RuntimeHandle::Handle(handle)),
            ..self
        }
    }

    /// Inject an existing [`Messenger`] instead of building one from config.
    pub fn with_messenger(self, messenger: Arc<Messenger>) -> Self {
        Self {
            messenger: Some(messenger),
            ..self
        }
    }

    /// Inject an existing [`NixlAgent`] instead of building one from config.
    pub fn with_nixl_agent(self, agent: NixlAgent) -> Self {
        Self {
            nixl_agent: Some(agent),
            ..self
        }
    }

    /// Build the runtime for the leader role.
    ///
    /// Currently delegates straight to the shared build path.
    ///
    /// # Errors
    ///
    /// Fails if any component that has to be built from config cannot be built.
    pub async fn build_leader(self) -> Result<super::KvbmRuntime> {
        self.build_internal().await
    }

    /// Build the runtime for the worker role.
    ///
    /// Currently delegates straight to the shared build path.
    ///
    /// # Errors
    ///
    /// Fails if any component that has to be built from config cannot be built.
    pub async fn build_worker(self) -> Result<super::KvbmRuntime> {
        self.build_internal().await
    }

    /// Resolve every component (injected or built from config) and assemble
    /// the final runtime.
    ///
    /// Components are resolved in a fixed order: tokio runtime, messenger,
    /// then the NixL agent.
    async fn build_internal(self) -> Result<super::KvbmRuntime> {
        // Step 1: tokio runtime - prefer the injected one, otherwise build it
        // from the tokio section of the config.
        let runtime = if let Some(provided) = self.runtime {
            provided
        } else {
            let built = self.config.tokio.build_runtime()?;
            RuntimeHandle::Owned(Arc::new(built))
        };

        // Step 2: messenger - must be resolved before NixL because the agent
        // name below is derived from the messenger's instance id.
        let messenger = if let Some(provided) = self.messenger {
            provided
        } else {
            self.config.messenger.build_messenger().await?
        };

        // Step 3: NixL agent - an injected agent always wins; otherwise one is
        // built only when the config carries a NixL section.
        let nixl_agent = match (self.nixl_agent, &self.config.nixl) {
            (Some(provided), _) => Some(provided),
            (None, Some(nixl_config)) => {
                let agent_name = format!("nixl-{}", messenger.instance_id());
                let backend = nixl_config.clone().into();
                let agent = NixlAgent::from_nixl_backend_config(&agent_name, backend)?;
                Some(agent)
            }
            // No agent injected and no NixL config: NixL is disabled.
            (None, None) => None,
        };

        Ok(super::KvbmRuntime {
            config: self.config,
            runtime,
            messenger,
            nixl_agent,
        })
    }
}