lib.rs 18.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16
17
#[cfg(any(feature = "vllm", feature = "sglang"))]
use std::{future::Future, pin::Pin};
18
use std::{io::Read, sync::Arc};
19

Neelay Shah's avatar
Neelay Shah committed
20
use dynamo_llm::{
21
22
    backend::ExecutionContext, kv_router::publisher::KvMetricsPublisher,
    model_card::model::ModelDeploymentCard,
23
    types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine,
24
};
25
use dynamo_runtime::{protocols::Endpoint, DistributedRuntime};
26

27
28
mod flags;
pub use flags::Flags;
29
mod hub;
30
mod input;
31
#[cfg(any(feature = "vllm", feature = "sglang"))]
32
mod net;
33
34
35
mod opt;
pub use opt::{Input, Output};

36
37
38
/// How we identify a namespace/component/endpoint URL.
/// Technically the '://' is not part of the scheme but it eliminates several string
/// concatenations.
const ENDPOINT_SCHEME: &str = "dyn://";

/// When `in=text` the user doesn't need to know the model name, and doesn't need to provide it on
/// the command line. Hence it's optional, and defaults to this placeholder name.
const INVISIBLE_MODEL_NAME: &str = "dynamo-run";

/// The component name under which the KV metrics publisher registers its endpoint,
/// if KV routing is in use (see the vllm leader setup in `run`).
const KV_PUBLISHER_COMPONENT: &str = "kvpublisher";

/// How we identify a python string endpoint
/// (NOTE(review): not referenced in this chunk — presumably consumed by the `Output` parser).
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

/// How we identify a python token endpoint
/// (NOTE(review): not referenced in this chunk — presumably consumed by the `Output` parser).
#[cfg(feature = "python")]
const PYTHON_TOK_SCHEME: &str = "pytok:";

56
/// The engine (model backend) selected for this run, plus everything needed to
/// start serving it. Produced by `run` from the `--out` option.
pub enum EngineConfig {
    /// A remote networked engine we don't know about yet; requests are routed to
    /// the given endpoint at runtime.
    Dynamic(Endpoint),

    /// A full-service engine does its own tokenization and prompt formatting,
    /// so it is served as-is under `service_name`.
    StaticFull {
        service_name: String,
        engine: OpenAIChatCompletionsStreamingEngine,
    },

    /// A core engine expects to be wrapped with pre/post processors that handle
    /// tokenization; the model deployment card supplies the tokenizer details.
    StaticCore {
        service_name: String,
        engine: ExecutionContext,
        card: Box<ModelDeploymentCard>,
    },

    /// vllm multi-node doesn't run an engine on nodes other than 0. 'ray' does all the work.
    None,
}

77
78
79
80
81
82
/// Distributed system values, resolved upfront when the input is a `dyn://` endpoint.
struct DynInput {
    /// Parsed namespace/component/endpoint identifier we will serve on.
    endpoint_id: Endpoint,
    /// Distributed runtime built from environment settings
    /// (`DistributedRuntime::from_settings`).
    distributed_runtime: DistributedRuntime,
}

83
/// Wire the chosen input (`in_opt`) to the chosen engine backend (`out_opt`) and
/// run until completion or until the runtime's primary cancellation token fires.
///
/// * `runtime` - owns the primary cancellation token and drives the input frontends.
/// * `in_opt` - where requests come from (HTTP, interactive text, stdin, batch file,
///   `dyn://` endpoint, or none). `mut` because vllm and sglang multi-node follower
///   nodes are switched to `Input::None` below.
/// * `out_opt` - which engine backend to start; most variants are feature-gated.
/// * `flags` - parsed command-line flags (model path/name/config, multi-node, etc.).
/// * `zmq_socket_prefix` - ZMQ socket prefix; required by the vllm and sglang engines.
///
/// # Errors
/// Fails if a required flag is missing for the selected engine, if the model card
/// or model download fails, or if engine startup / endpoint parsing fails.
#[allow(unused_mut)]
pub async fn run(
    runtime: dynamo_runtime::Runtime,
    mut in_opt: Input, // mut because vllm and sglang multi-node can change it
    out_opt: Output,
    flags: Flags,
    #[allow(unused_variables)] zmq_socket_prefix: Option<String>,
) -> anyhow::Result<()> {
    let cancel_token = runtime.primary_token();

    // Turn relative paths into absolute paths.
    // --model-path may be given positionally or as a flag; positional wins.
    // A non-existent path is kept as-is: it may be an HF repo name, handled below.
    let mut model_path = flags
        .model_path_pos
        .clone()
        .or(flags.model_path_flag.clone())
        .and_then(|p| {
            if p.exists() {
                p.canonicalize().ok()
            } else {
                Some(p)
            }
        });

    // Serve the model under the name provided, or the name of the GGUF file or HF repo
    // (the last component of the path). For in=text fall back to a placeholder name.
    let mut model_name = flags
        .model_name
        .clone()
        .or_else(|| {
            model_path
                .as_ref()
                .and_then(|p| p.iter().next_back())
                .map(|n| n.to_string_lossy().into_owned())
        })
        .or_else(|| {
            if in_opt == Input::Text {
                Some(INVISIBLE_MODEL_NAME.to_string())
            } else {
                None
            }
        });

    // If it's an HF repo (path does not exist locally) download it and point
    // model_path at the downloaded checkout; the repo name becomes the model name.
    if let Some(inner_model_path) = model_path.as_ref() {
        if !inner_model_path.exists() {
            model_name = inner_model_path
                .iter()
                .next_back()
                .map(|s| s.to_string_lossy().to_string());
            model_path = Some(hub::from_hf(inner_model_path).await?);
        }
    }

    // Load the model deployment card, if any.
    // Only used by some engines, so without those feature flags it's unused.
    // A load failure is logged but non-fatal here; engines that need the card
    // bail later when maybe_card is None.
    #[allow(unused_variables)]
    let maybe_card = match (&model_path, &flags.model_config) {
        // --model-config takes precedence
        (_, Some(model_config)) => {
            match ModelDeploymentCard::from_local_path(model_config, model_name.as_deref()).await {
                Ok(card) => Some(card),
                Err(e) => {
                    tracing::error!(
                        "Failed to load model card from --model-config path {}: {e}",
                        model_config.display(),
                    );
                    None
                }
            }
        }
        // If --model-path is an HF repo use that
        (Some(model_path), _) if model_path.is_dir() => {
            match ModelDeploymentCard::from_local_path(model_path, model_name.as_deref()).await {
                Ok(card) => Some(card),
                Err(e) => {
                    tracing::error!(
                        "Failed to load model card from --model-path {}: {e}",
                        model_path.display(),
                    );
                    None
                }
            }
        }
        // A single file is assumed to be a GGUF model
        (Some(model_path), _) if model_path.is_file() => {
            match ModelDeploymentCard::from_gguf(model_path, model_name.as_deref()).await {
                Ok(card) => Some(card),
                Err(e) => {
                    tracing::error!(
                        "Failed to load model card from GGUF {}: {e}",
                        model_path.display(),
                    );
                    None
                }
            }
        }
        // Otherwise we don't have one, but we only need it if we're tokenizing
        _ => {
            tracing::debug!("No model card path provided (neither --model-config nor a directory in --model-path)");
            None
        }
    };

    // If we are in a distributed system, we need to know our component upfront
    // (also consumed by the vllm KV metrics publisher setup below).
    let dyn_input = match &in_opt {
        Input::Endpoint(endpoint_path) => {
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            let endpoint_id: Endpoint = endpoint_path.parse()?;
            Some(DynInput {
                endpoint_id,
                distributed_runtime,
            })
        }
        _ => None,
    };

    #[cfg(any(feature = "vllm", feature = "sglang"))]
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process

    // Create the engine matching `out`
    let engine_config = match out_opt {
        // Echo engine that needs no model files; useful for testing the full path.
        Output::EchoFull => {
            let Some(model_name) = model_name else {
                anyhow::bail!(
                    "Pass --model-name or --model-path so we know which model to imitate"
                );
            };
            EngineConfig::StaticFull {
                service_name: model_name,
                engine: dynamo_llm::engines::make_engine_full(),
            }
        }
        // Echo engine behind the pre/post processors; needs a card for the tokenizer.
        Output::EchoCore => {
            let Some(mut card) = maybe_card.clone() else {
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            card.requires_preprocessing = true;
            EngineConfig::StaticCore {
                service_name: card.service_name.clone(),
                engine: dynamo_llm::engines::make_engine_core(),
                card: Box::new(card),
            }
        }
        // Forward to a remote engine at a dyn:// endpoint.
        Output::Endpoint(path) => {
            let endpoint: Endpoint = path.parse()?;
            EngineConfig::Dynamic(endpoint)
        }
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => {
            let Some(model_path) = model_path else {
                anyhow::bail!("out=mistralrs requires flag --model-path=<full-path-to-model-gguf>");
            };
            let Some(model_name) = model_name else {
                unreachable!("We checked model_path earlier, and set model_name from model_path");
            };
            EngineConfig::StaticFull {
                service_name: model_name,
                engine: dynamo_engine_mistralrs::make_engine(&model_path).await?,
            }
        }
        #[cfg(feature = "sglang")]
        Output::SgLang => {
            let Some(model_path) = model_path else {
                anyhow::bail!("out=sglang requires flag --model-path=<full-path-to-model-dir>");
            };
            if !model_path.is_dir() {
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
            // Safety: Earlier we build maybe_card from model_path, which we checked right above
            let card = maybe_card.clone().unwrap();
            let Some(sock_prefix) = zmq_socket_prefix else {
                anyhow::bail!("sglang requires zmq_socket_prefix");
            };
            let node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            if node_conf.num_nodes > 1 {
                // Best-effort hint: gloo/NCCL often need the interface name set explicitly.
                if let Ok(Some(if_name)) = net::get_primary_interface().await {
                    tracing::info!("If you see 'gloo' errors from sglang try setting these environment variables:");
                    tracing::info!("export GLOO_SOCKET_IFNAME={if_name}");
                    tracing::info!("export NCCL_SOCKET_IFNAME={if_name}");
                }
                if node_conf.node_rank != 0 {
                    // Follower nodes take input from leader node over pytorch distributed, not
                    // from user.
                    in_opt = Input::None;
                }
            }

            let (engine, sglang_process) = dynamo_engine_sglang::make_engine(
                cancel_token.clone(),
                &model_path,
                &sock_prefix,
                node_conf,
                flags.tensor_parallel_size,
                flags.base_gpu_id,
                flags.extra_engine_args.clone(),
            )
            .await?;
            // Keep the sglang sub-process future alive; awaited after the input loop ends.
            extra = Some(Box::pin(async move {
                let _ = sglang_process.await;
            }));
            EngineConfig::StaticCore {
                service_name: card.service_name.clone(),
                engine,
                card: Box::new(card),
            }
        }
        #[cfg(feature = "vllm")]
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
            let Some(model_path) = model_path else {
                anyhow::bail!(
                    "out=vllm requires flag --model-path=<full-path-to-hf-repo-or-model-gguf>"
                );
            };
            let Some(card) = maybe_card.clone() else {
                anyhow::bail!(
                    "Unable to build tokenizer. out=vllm requires --model-path to be an HF repo with fast tokenizer (tokenizer.json) or a GGUF file"
                );
            };
            let Some(sock_prefix) = zmq_socket_prefix else {
                anyhow::bail!("vllm requires zmq_socket_prefix");
            };
            let node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            if node_conf.num_nodes > 1 {
                if let Ok(Some(if_name)) = net::get_primary_interface().await {
                    tracing::info!("If you see network errors from vllm try setting this environment variable:");
                    tracing::info!("export NCCL_SOCKET_IFNAME={if_name}");
                }
                if node_conf.node_rank != 0 {
                    // Only node 0 runs vllm, the others communicate over ray
                    in_opt = Input::None;
                }
            }
            if node_conf.node_rank == 0 {
                // If we are serving a dyn:// endpoint, publish KV metrics on a
                // sibling component in the same namespace. The endpoint task is
                // spawned detached; its result is intentionally ignored.
                let kv_metrics_publisher = if let Some(dyn_input) = &dyn_input {
                    let kvp_component = dyn_input
                        .distributed_runtime
                        .namespace(dyn_input.endpoint_id.namespace.clone())?
                        .component(KV_PUBLISHER_COMPONENT)?;
                    let kvp = Arc::new(KvMetricsPublisher::new()?);
                    let kvp_inner = kvp.clone();
                    tokio::spawn(async move { kvp_inner.create_endpoint(kvp_component).await });
                    Some(kvp)
                } else {
                    None
                };

                // vllm multi-node only the leader runs vllm
                let (engine, vllm_future) = dynamo_engine_vllm::make_leader_engine(
                    cancel_token.clone(),
                    &model_path,
                    &sock_prefix,
                    node_conf,
                    flags.tensor_parallel_size,
                    flags.extra_engine_args.clone(),
                    kv_metrics_publisher,
                )
                .await?;
                extra = Some(Box::pin(async move {
                    let _ = vllm_future.await;
                }));
                EngineConfig::StaticCore {
                    service_name: card.service_name.clone(),
                    engine,
                    card: Box::new(card),
                }
            } else {
                // Nodes rank > 0 only run 'ray'
                let stop_future =
                    dynamo_engine_vllm::start_follower(cancel_token.clone(), node_conf).await?;
                extra = Some(Box::pin(stop_future));
                EngineConfig::None
            }
        }
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
            let Some(model_path) = model_path else {
                anyhow::bail!("out=llamacpp requires flag --model-path=<full-path-to-model-gguf>");
            };
            if !model_path.is_file() {
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
            // A GGUF alone doesn't yield a card here, so the tokenizer must come
            // from --model-config (an HF checkout).
            let Some(card) = maybe_card.clone() else {
                anyhow::bail!(
                    "Pass --model-config so we can find the tokenizer, should be an HF checkout."
                );
            };
            let engine =
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), &model_path).await?;
            EngineConfig::StaticCore {
                service_name: card.service_name.clone(),
                engine,
                card: Box::new(card),
            }
        }
        #[cfg(feature = "trtllm")]
        Output::TrtLLM => {
            let Some(model_path) = model_path else {
                anyhow::bail!("out=trtllm requires flag --model-path=<full-path-to-model-dir>");
            };
            if !model_path.is_dir() {
                anyhow::bail!(
                    "--model-path should point at a directory containing `.engine` files."
                );
            }
            // Safety: Earlier we build maybe_card from model_path, which we checked right above
            let card = maybe_card.clone().unwrap();
            // NOTE(review): passes the path's `Display` adapter rather than a
            // `&Path` — confirm that's the type `make_engine` expects.
            let engine = dynamo_engine_trtllm::make_engine(
                model_path.display(),
                flags.tensor_parallel_size,
            )?;
            EngineConfig::StaticCore {
                service_name: card.service_name.clone(),
                engine,
                card: Box::new(card),
            }
        }
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
            let Some(model_name) = model_name else {
                anyhow::bail!("Provide model service name as `--model-name <this>`");
            };
            let py_args = flags.as_vec(&path_str, &model_name);
            let p = std::path::PathBuf::from(path_str);
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
            EngineConfig::StaticFull {
                service_name: model_name,
                engine,
            }
        }
        #[cfg(feature = "python")]
        Output::PythonTok(path_str) => {
            let Some(card) = maybe_card.clone() else {
                anyhow::bail!("Could not find tokenizer. Pass flag --model-path <path>");
            };
            let Some(model_name) = model_name else {
                unreachable!("If we have a card we must have a model name");
            };
            let py_args = flags.as_vec(&path_str, &model_name);
            let p = std::path::PathBuf::from(path_str);
            let engine =
                dynamo_engine_python::make_token_engine(cancel_token.clone(), &p, py_args).await?;
            EngineConfig::StaticCore {
                service_name: model_name.clone(),
                engine,
                card: Box::new(card),
            }
        }
    };

    // Start the chosen input frontend; each of these runs until done or cancelled.
    match in_opt {
        Input::Http => {
            crate::input::http::run(runtime.clone(), flags, engine_config).await?;
        }
        Input::Text => {
            crate::input::text::run(runtime.clone(), flags, None, engine_config).await?;
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
            crate::input::text::run(runtime.clone(), flags, Some(prompt), engine_config).await?;
        }
        Input::Batch(path) => {
            crate::input::batch::run(runtime.clone(), flags, maybe_card, path, engine_config)
                .await?;
        }
        Input::Endpoint(path) => {
            let Some(dyn_input) = dyn_input else {
                unreachable!("We set dyn_input earlier");
            };
            crate::input::endpoint::run(dyn_input.distributed_runtime, path, engine_config).await?;
        }
        Input::None => {
            // Multi-node setup. The engine sub-process has been started and is talking
            // to its node_rank 0 controller. We do nothing.
            // TODO: Acquire an etcd lease, we are running
            cancel_token.cancelled().await;
        }
    }

    #[cfg(any(feature = "vllm", feature = "sglang"))]
    // Allow engines to ask main thread to wait on an extra future.
    if let Some(extra) = extra {
        extra.await;
    }

    Ok(())
}