"benchmarks/vscode:/vscode.git/clone" did not exist on "5139f688cb2697d652d6bdac996fefb2a05d7f70"
local_model.rs 15.4 KB
Newer Older
1
2
3
4
5
6
7
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;

8
use anyhow::Context as _;
9
use dynamo_runtime::protocols::EndpointId;
10
use dynamo_runtime::slug::Slug;
11
use dynamo_runtime::traits::DistributedRuntimeProvider;
12
use dynamo_runtime::{
13
    component::Endpoint,
14
15
    storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
};
16

17
use crate::discovery::ModelEntry;
18
use crate::entrypoint::RouterConfig;
19
use crate::mocker::protocols::MockEngineArgs;
20
use crate::model_card::{self, ModelDeploymentCard};
21
use crate::model_type::{ModelInput, ModelType};
22
use crate::request_template::RequestTemplate;
23

24
25
mod network_name;
pub use network_name::ModelNetworkName;
26
27
28
pub mod runtime_config;

use runtime_config::ModelRuntimeConfig;
29

30
31
32
33
34
35
36
/// Scheme prefix that explicitly marks a model path as a Hugging Face repository.
const HF_SCHEME: &str = "hf://";

/// Name given to a model when the user did not supply one. Usually this means the
/// name is never shown, for example in a text chat.
const DEFAULT_NAME: &str = "dynamo";

/// KV cache block size used when none is supplied; engines don't usually provide a
/// default, so we do.
const DEFAULT_KV_CACHE_BLOCK_SIZE: u32 = 16;

/// Default HTTP port. It can't default to 0, so pick something.
/// `pub` because the bindings use it for consistency.
pub const DEFAULT_HTTP_PORT: u16 = 8080;
43
44
45
46
47
48
49
50
51
52

pub struct LocalModelBuilder {
    model_path: Option<PathBuf>,
    model_name: Option<String>,
    model_config: Option<PathBuf>,
    endpoint_id: Option<EndpointId>,
    context_length: Option<u32>,
    template_file: Option<PathBuf>,
    router_config: Option<RouterConfig>,
    kv_cache_block_size: u32,
53
    http_host: Option<String>,
54
    http_port: u16,
Graham King's avatar
Graham King committed
55
56
    tls_cert_path: Option<PathBuf>,
    tls_key_path: Option<PathBuf>,
57
    migration_limit: u32,
58
    is_mocker: bool,
59
60
    extra_engine_args: Option<PathBuf>,
    runtime_config: ModelRuntimeConfig,
61
    user_data: Option<serde_json::Value>,
62
    custom_template_path: Option<PathBuf>,
63
    namespace: Option<String>,
64
65
}

66
impl Default for LocalModelBuilder {
67
    fn default() -> Self {
68
69
        LocalModelBuilder {
            kv_cache_block_size: DEFAULT_KV_CACHE_BLOCK_SIZE,
70
            http_host: Default::default(),
71
            http_port: DEFAULT_HTTP_PORT,
Graham King's avatar
Graham King committed
72
73
            tls_cert_path: Default::default(),
            tls_key_path: Default::default(),
74
75
76
77
78
79
80
            model_path: Default::default(),
            model_name: Default::default(),
            model_config: Default::default(),
            endpoint_id: Default::default(),
            context_length: Default::default(),
            template_file: Default::default(),
            router_config: Default::default(),
81
            migration_limit: Default::default(),
82
            is_mocker: Default::default(),
83
84
            extra_engine_args: Default::default(),
            runtime_config: Default::default(),
85
            user_data: Default::default(),
86
            custom_template_path: Default::default(),
87
            namespace: Default::default(),
88
89
90
91
        }
    }
}

92
93
94
95
impl LocalModelBuilder {
    pub fn model_path(&mut self, model_path: Option<PathBuf>) -> &mut Self {
        self.model_path = model_path;
        self
96
97
    }

98
99
100
    pub fn model_name(&mut self, model_name: Option<String>) -> &mut Self {
        self.model_name = model_name;
        self
101
102
    }

103
104
105
    pub fn model_config(&mut self, model_config: Option<PathBuf>) -> &mut Self {
        self.model_config = model_config;
        self
106
107
    }

108
109
    pub fn endpoint_id(&mut self, endpoint_id: Option<EndpointId>) -> &mut Self {
        self.endpoint_id = endpoint_id;
110
        self
111
112
    }

113
114
115
    pub fn context_length(&mut self, context_length: Option<u32>) -> &mut Self {
        self.context_length = context_length;
        self
116
117
    }

118
119
120
121
    /// Passing None resets it to default
    pub fn kv_cache_block_size(&mut self, kv_cache_block_size: Option<u32>) -> &mut Self {
        self.kv_cache_block_size = kv_cache_block_size.unwrap_or(DEFAULT_KV_CACHE_BLOCK_SIZE);
        self
122
123
    }

124
125
126
127
128
    pub fn http_host(&mut self, host: Option<String>) -> &mut Self {
        self.http_host = host;
        self
    }

Graham King's avatar
Graham King committed
129
130
131
132
133
134
135
136
137
138
139
140
    pub fn http_port(&mut self, port: u16) -> &mut Self {
        self.http_port = port;
        self
    }

    pub fn tls_cert_path(&mut self, p: Option<PathBuf>) -> &mut Self {
        self.tls_cert_path = p;
        self
    }

    pub fn tls_key_path(&mut self, p: Option<PathBuf>) -> &mut Self {
        self.tls_key_path = p;
141
        self
142
143
    }

144
145
    pub fn router_config(&mut self, router_config: Option<RouterConfig>) -> &mut Self {
        self.router_config = router_config;
146
147
148
        self
    }

149
150
151
152
153
    pub fn namespace(&mut self, namespace: Option<String>) -> &mut Self {
        self.namespace = namespace;
        self
    }

154
155
156
    pub fn request_template(&mut self, template_file: Option<PathBuf>) -> &mut Self {
        self.template_file = template_file;
        self
157
158
    }

159
160
161
162
163
    pub fn custom_template_path(&mut self, custom_template_path: Option<PathBuf>) -> &mut Self {
        self.custom_template_path = custom_template_path;
        self
    }

164
165
166
167
168
    pub fn migration_limit(&mut self, migration_limit: Option<u32>) -> &mut Self {
        self.migration_limit = migration_limit.unwrap_or(0);
        self
    }

169
170
171
172
173
    pub fn is_mocker(&mut self, is_mocker: bool) -> &mut Self {
        self.is_mocker = is_mocker;
        self
    }

174
175
176
177
178
179
180
181
182
183
    pub fn extra_engine_args(&mut self, extra_engine_args: Option<PathBuf>) -> &mut Self {
        self.extra_engine_args = extra_engine_args;
        self
    }

    pub fn runtime_config(&mut self, runtime_config: ModelRuntimeConfig) -> &mut Self {
        self.runtime_config = runtime_config;
        self
    }

184
185
186
187
188
    pub fn user_data(&mut self, user_data: Option<serde_json::Value>) -> &mut Self {
        self.user_data = user_data;
        self
    }

189
190
191
192
193
194
195
196
197
    /// Make an LLM ready for use:
    /// - Download it from Hugging Face (and NGC in future) if necessary
    /// - Resolve the path
    /// - Load it's ModelDeploymentCard card
    /// - Name it correctly
    ///
    /// The model name will depend on what "model_path" is:
    /// - A folder: The last part of the folder name: "/data/llms/Qwen2.5-3B-Instruct" -> "Qwen2.5-3B-Instruct"
    /// - A file: The GGUF filename: "/data/llms/Qwen2.5-3B-Instruct-Q6_K.gguf" -> "Qwen2.5-3B-Instruct-Q6_K.gguf"
198
199
200
201
202
203
204
205
    /// - An HF repo: The HF repo name: "Qwen/Qwen3-0.6B" stays the same
    pub async fn build(&mut self) -> anyhow::Result<LocalModel> {
        // Generate an endpoint ID for this model if the user didn't provide one.
        // The user only provides one if exposing the model.
        let endpoint_id = self
            .endpoint_id
            .take()
            .unwrap_or_else(|| internal_endpoint("local_model"));
206

207
208
209
210
211
212
213
214
        let template = self
            .template_file
            .as_deref()
            .map(RequestTemplate::load)
            .transpose()?;

        // echo_full engine doesn't need a path. It's an edge case, move it out of the way.
        if self.model_path.is_none() {
215
216
217
218
            let mut card = ModelDeploymentCard::with_name_only(
                self.model_name.as_deref().unwrap_or(DEFAULT_NAME),
            );
            card.migration_limit = self.migration_limit;
219
            card.user_data = self.user_data.take();
220
            card.runtime_config = self.runtime_config.clone();
221

222
            return Ok(LocalModel {
223
                card,
224
225
226
                full_path: PathBuf::new(),
                endpoint_id,
                template,
227
                http_host: self.http_host.take(),
228
                http_port: self.http_port,
Graham King's avatar
Graham King committed
229
230
                tls_cert_path: self.tls_cert_path.take(),
                tls_key_path: self.tls_key_path.take(),
231
                router_config: self.router_config.take().unwrap_or_default(),
232
                runtime_config: self.runtime_config.clone(),
233
                namespace: self.namespace.clone(),
234
235
236
237
238
239
            });
        }

        // Main logic. We are running a model.
        let model_path = self.model_path.take().unwrap();
        let model_path = model_path.to_str().context("Invalid UTF-8 in model path")?;
240
241
242
243
244
245
246
247

        // Check for hf:// prefix first, in case we really want an HF repo but it conflicts
        // with a relative path.
        let is_hf_repo =
            model_path.starts_with(HF_SCHEME) || !fs::exists(model_path).unwrap_or(false);
        let relative_path = model_path.trim_start_matches(HF_SCHEME);
        let full_path = if is_hf_repo {
            // HF download if necessary
248
            super::hub::from_hf(relative_path, self.is_mocker).await?
249
250
251
        } else {
            fs::canonicalize(relative_path)?
        };
252
253
        // --model-config takes precedence over --model-path
        let model_config_path = self.model_config.as_ref().unwrap_or(&full_path);
254

255
        let mut card =
256
            ModelDeploymentCard::load(model_config_path, self.custom_template_path.as_deref())?;
257
258
259

        // Usually we infer from the path, self.model_name is user override
        let model_name = self.model_name.take().unwrap_or_else(|| {
260
261
262
263
264
265
266
267
268
269
270
271
            if is_hf_repo {
                // HF repos use their full name ("org/name") not the folder name
                relative_path.to_string()
            } else {
                full_path
                    .iter()
                    .next_back()
                    .map(|n| n.to_string_lossy().into_owned())
                    .unwrap_or_else(|| {
                        // Panic because we can't do anything without a model
                        panic!("Invalid model path, too short: '{}'", full_path.display())
                    })
272
            }
273
        });
274
        card.set_name(&model_name);
275

276
        card.kv_cache_block_size = self.kv_cache_block_size;
277

278
279
280
281
        // Override max number of tokens in context. We usually only do this to limit kv cache allocation.
        if let Some(context_length) = self.context_length {
            card.context_length = context_length;
        }
282

283
        // Override runtime configs with mocker engine args
284
285
286
287
288
289
290
291
292
        if self.is_mocker
            && let Some(path) = &self.extra_engine_args
        {
            let mocker_engine_args = MockEngineArgs::from_json_file(path)
                .expect("Failed to load mocker engine args for runtime config overriding.");
            self.runtime_config.total_kv_blocks = Some(mocker_engine_args.num_gpu_blocks as u64);
            self.runtime_config.max_num_seqs = mocker_engine_args.max_num_seqs.map(|v| v as u64);
            self.runtime_config.max_num_batched_tokens =
                mocker_engine_args.max_num_batched_tokens.map(|v| v as u64);
293
294
        }

295
        card.migration_limit = self.migration_limit;
296
        card.user_data = self.user_data.take();
297
        card.runtime_config = self.runtime_config.clone();
298

299
300
301
302
303
        Ok(LocalModel {
            card,
            full_path,
            endpoint_id,
            template,
304
            http_host: self.http_host.take(),
305
            http_port: self.http_port,
Graham King's avatar
Graham King committed
306
307
            tls_cert_path: self.tls_cert_path.take(),
            tls_key_path: self.tls_key_path.take(),
308
            router_config: self.router_config.take().unwrap_or_default(),
309
            runtime_config: self.runtime_config.clone(),
310
            namespace: self.namespace.clone(),
311
312
313
314
315
316
317
318
319
320
        })
    }
}

#[derive(Debug, Clone)]
pub struct LocalModel {
    full_path: PathBuf,
    card: ModelDeploymentCard,
    endpoint_id: EndpointId,
    template: Option<RequestTemplate>,
321
    http_host: Option<String>,
Graham King's avatar
Graham King committed
322
323
324
    http_port: u16,
    tls_cert_path: Option<PathBuf>,
    tls_key_path: Option<PathBuf>,
325
    router_config: RouterConfig,
326
    runtime_config: ModelRuntimeConfig,
327
    namespace: Option<String>,
328
329
330
331
332
333
334
335
336
337
338
}

impl LocalModel {
    pub fn card(&self) -> &ModelDeploymentCard {
        &self.card
    }

    pub fn path(&self) -> &Path {
        &self.full_path
    }

339
    /// Human friendly model name. This is the correct name.
340
341
342
343
    pub fn display_name(&self) -> &str {
        &self.card.display_name
    }

344
345
    /// The name under which we make this model available over HTTP.
    /// A slugified version of the model's name, for use in NATS, etcd, etc.
346
    pub fn service_name(&self) -> &str {
347
        self.card.slug().as_ref()
348
349
350
351
352
353
    }

    pub fn request_template(&self) -> Option<RequestTemplate> {
        self.template.clone()
    }

354
355
356
357
    pub fn http_host(&self) -> Option<String> {
        self.http_host.clone()
    }

358
359
360
361
    pub fn http_port(&self) -> u16 {
        self.http_port
    }

Graham King's avatar
Graham King committed
362
363
364
365
366
367
368
369
    pub fn tls_cert_path(&self) -> Option<&Path> {
        self.tls_cert_path.as_deref()
    }

    pub fn tls_key_path(&self) -> Option<&Path> {
        self.tls_key_path.as_deref()
    }

370
371
372
373
    pub fn router_config(&self) -> &RouterConfig {
        &self.router_config
    }

374
375
376
377
    pub fn runtime_config(&self) -> &ModelRuntimeConfig {
        &self.runtime_config
    }

378
379
380
381
    pub fn namespace(&self) -> Option<&str> {
        self.namespace.as_deref()
    }

382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
    pub fn is_gguf(&self) -> bool {
        // GGUF is the only file (not-folder) we accept, so we don't need to check the extension
        // We will error when we come to parse it
        self.full_path.is_file()
    }

    /// An endpoint to identify this model by.
    pub fn endpoint_id(&self) -> &EndpointId {
        &self.endpoint_id
    }

    /// Drop the LocalModel returning it's ModelDeploymentCard.
    /// For the case where we only need the card and don't want to clone it.
    pub fn into_card(self) -> ModelDeploymentCard {
        self.card
397
398
399
400
401
402
403
404
    }

    /// Attach this model the endpoint. This registers it on the network
    /// allowing ingress to discover it.
    pub async fn attach(
        &mut self,
        endpoint: &Endpoint,
        model_type: ModelType,
405
        model_input: ModelInput,
406
407
408
409
410
    ) -> anyhow::Result<()> {
        // A static component doesn't have an etcd_client because it doesn't need to register
        let Some(etcd_client) = endpoint.drt().etcd_client() else {
            anyhow::bail!("Cannot attach to static endpoint");
        };
411

412
413
414
415
416
        // Store model config files in NATS object store
        let nats_client = endpoint.drt().nats_client();
        self.card.move_to_nats(nats_client.clone()).await?;

        // Publish the Model Deployment Card to etcd
417
        let kvstore: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client.clone()));
418
419
        let card_store = Arc::new(KeyValueStoreManager::new(kvstore));
        let key = self.card.slug().to_string();
420

421
        card_store
422
            .publish(model_card::ROOT_PATH, None, &key, &mut self.card)
423
424
425
426
            .await?;

        // Publish our ModelEntry to etcd. This allows ingress to find the model card.
        // (Why don't we put the model card directly under this key?)
427
        let network_name = ModelNetworkName::new();
428
429
        tracing::debug!("Registering with etcd as {network_name}");
        let model_registration = ModelEntry {
430
            name: self.display_name().to_string(),
431
            endpoint_id: endpoint.id(),
432
            model_type,
433
            runtime_config: Some(self.runtime_config.clone()),
434
            model_input,
435
436
437
        };
        etcd_client
            .kv_create(
438
                &network_name,
439
440
441
442
443
444
                serde_json::to_vec_pretty(&model_registration)?,
                None, // use primary lease
            )
            .await
    }
}
445
446
447
448
449
450
451
452
453
454

/// Build an endpoint for internal communication whose namespace is a freshly generated,
/// slugified UUID. The component is `engine` and the name is always "generate".
/// We can't hard code the namespace because we may be running several on the same
/// machine (GPUs 0-3 and 4-7).
fn internal_endpoint(engine: &str) -> EndpointId {
    let random_id = uuid::Uuid::new_v4().to_string();
    let namespace = Slug::slugify(&random_id).to_string();
    let component = engine.to_owned();
    let name = String::from("generate");

    EndpointId {
        namespace,
        component,
        name,
    }
}