lib.rs 14.2 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::{future::Future, pin::Pin};
5
use std::{io::Read, sync::Arc, time::Duration};
6

7
use anyhow::Context;
8
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, local_model::LocalModel};
9
use dynamo_runtime::{CancellationToken, DistributedRuntime};
10

11
12
mod flags;
pub use flags::Flags;
13
14
mod input;
mod opt;
15
pub use dynamo_llm::request_template::RequestTemplate;
16
pub use opt::{Input, Output};
17
mod subprocess;
18

19
20
/// How long `stopper` waits for the engine sub-process to exit after sending it SIGTERM
/// before killing it.
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

21
22
23
/// Where we will attach the vllm/sglang/trtllm subprocess when the input is not itself an
/// endpoint. Invisible to users.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";

24
25
26
/// Default size of a KV cache block. Override with --kv-cache-block-size
const DEFAULT_KV_CACHE_BLOCK_SIZE: usize = 16;

27
pub enum EngineConfig {
28
29
    /// Remote networked engines
    Dynamic,
30

31
32
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
33
        engine: Arc<dyn StreamingEngine>,
34
        model: Box<LocalModel>,
35
    },
36
37
38
39

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
40
        model: Box<LocalModel>,
41
    },
42
43
}

44
45
46
47
48
49
50
51
/// True when the input side is a dynamo endpoint (`Input::Endpoint`).
fn is_in_dynamic(in_opt: &Input) -> bool {
    match in_opt {
        Input::Endpoint(_) => true,
        _ => false,
    }
}

/// True when an output was given and it is `Output::Dynamic`.
fn is_out_dynamic(out_opt: &Option<Output>) -> bool {
    match out_opt {
        Some(Output::Dynamic) => true,
        _ => false,
    }
}

52
pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
53
    runtime: dynamo_runtime::Runtime,
54
    in_opt: Input,
55
    out_opt: Option<Output>,
56
57
    flags: Flags,
) -> anyhow::Result<()> {
58
    if is_in_dynamic(&in_opt) && is_out_dynamic(&out_opt) {
59
60
61
        anyhow::bail!("Cannot use endpoint for both in and out");
    }

62
    let cancel_token = runtime.primary_token();
63
    let maybe_path = flags
64
        .model_path_pos
65
        .clone()
66
        .or(flags.model_path_flag.clone());
67

68
    let mut local_model: LocalModel = if is_out_dynamic(&out_opt) {
69
        // If output is dynamic we are ingress and don't have a local model, but making an
70
        // empty one cleans up the code.
71
72
        Default::default()
    } else {
73
        // All other output types have a local model
74
75
76
77
78
79
80
81
82
83
84
85
86
87
        match &maybe_path {
            Some(model_path) => {
                LocalModel::prepare(
                    model_path.to_str().context("Invalid UTF-8 in model path")?,
                    flags.model_config.as_deref(),
                    flags.model_name.clone(),
                )
                .await?
            }
            None => {
                // echo_full engine doesn't need a path
                match &flags.model_name {
                    Some(name) => LocalModel::with_name_only(name),
                    None => Default::default(),
88
89
                }
            }
Graham King's avatar
Graham King committed
90
        }
91
    };
92
93
94
95
96
97
98
99
100
101
102

    // Only set if user provides. Usually loaded from tokenizer_config.json
    if let Some(context_length) = flags.context_length {
        local_model.set_context_length(context_length);
    }
    // Always set, there is no engine provided default
    local_model.set_kv_cache_block_size(
        flags
            .kv_cache_block_size
            .unwrap_or(DEFAULT_KV_CACHE_BLOCK_SIZE),
    );
103

104
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
105

106
107
108
109
110
111
112
113
    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

114
115
116
    // We may need it later
    let card = local_model.card().clone();

117
118
    let out_opt = out_opt.unwrap_or_else(|| {
        let default_engine = if card.is_gguf() {
119
            gguf_default()
120
        } else {
121
            safetensors_default()
122
123
124
125
126
127
128
129
130
        };
        tracing::info!(
            "Using default engine: {default_engine}. Use out=<engine> to specify one of {}",
            Output::available_engines().join(", ")
        );
        default_engine
    });
    print_cuda(&out_opt);

131
132
    // Create the engine matching `out`
    let engine_config = match out_opt {
133
134
135
136
137
138
139
140
141
142
        Output::Dynamic => {
            // Sanity check - TODO probably make a general sanity check at start of method
            if flags.context_length.is_some() {
                anyhow::bail!("'--content-length' flag should only be used on the worker node, not on the ingress");
            }
            if flags.kv_cache_block_size.is_some() {
                anyhow::bail!("'--kv-cache-block-size' flag should only be used on the worker node, not on the ingress");
            }
            EngineConfig::Dynamic
        }
143
144
145
146
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
147
        Output::EchoCore => {
148
149
            let card = local_model.card();
            if !card.has_tokenizer() {
150
151
152
153
154
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
155
                engine: dynamo_llm::engines::make_engine_core(),
156
                model: Box::new(local_model),
157
158
            }
        }
159
        #[cfg(feature = "mistralrs")]
160
        Output::MistralRs => EngineConfig::StaticFull {
161
            engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
162
163
            model: Box::new(local_model),
        },
164
        Output::SgLang => {
165
166
167
168
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
169
170
171
172
173
174
175
176

            // If `in=dyn` we want the sglang subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

177
178
179
180
181
182
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let (py_script, child) = match subprocess::start(
183
                subprocess::sglang::PY,
184
                &local_model,
185
                &endpoint,
186
                flags.clone(),
187
188
189
190
191
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
192
193
194
195
196
197
198
199
200
201
202
203
204
205
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
206
            EngineConfig::Dynamic
207
208
209
210
211
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
212
213
214
215
216
217
218
219

            // If `in=dyn` we want the vllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

220
            let (py_script, child) = match subprocess::start(
221
                subprocess::vllm::PY,
222
                &local_model,
223
                &endpoint,
224
                flags.clone(),
225
                None, // multi-node config. vllm uses `ray`, see guide
226
227
228
229
230
231
232
233
234
235
236
237
238
239
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
240
            EngineConfig::Dynamic
241
        }
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
        Output::Trtllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("TRTLLM does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }

            // If `in=dyn` we want the trtllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

            let (py_script, child) = match subprocess::start(
                subprocess::trtllm::PY,
                &local_model,
                &endpoint,
                flags.clone(),
                None, // multi-node config. trtlllm uses `mpi`, see guide
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting trtllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            EngineConfig::Dynamic
        }
276

277
278
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
279
            if !local_model.path().is_file() {
280
281
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
282
            let engine =
283
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), &local_model).await?;
284
            EngineConfig::StaticCore {
285
                engine,
286
                model: Box::new(local_model),
Graham King's avatar
Graham King committed
287
288
            }
        }
289
290
291
292
    };

    match in_opt {
        Input::Http => {
293
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
294
295
        }
        Input::Text => {
296
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
297
298
299
300
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
301
302
303
304
305
306
307
308
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
309
        }
310
        Input::Batch(path) => {
311
312
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
313
        }
314
        Input::Endpoint(path) => {
315
316
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
317
318
319
320
        }
    }

    // Allow engines to ask main thread to wait on an extra future.
321
    // We use this to stop the vllm and sglang sub-process
322
    if let Some(extra) = extra {
323
        extra.await;
324
325
326
327
    }

    Ok(())
}
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("vllm sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("vllm sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("vllm sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing vllm subprocess");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402

/// Log which accelerator backend this binary was built with, or a hint to rebuild with one.
///
/// Nothing is logged unless `output` is one of the engines compiled into this binary
/// (mistralrs or llamacpp).
// Only mistralrs and llamacpp need to be built with CUDA.
// The Python engines only need it at runtime.
#[cfg(any(feature = "mistralrs", feature = "llamacpp"))]
fn print_cuda(output: &Output) {
    // An engine may be compiled in without being the one selected for this run.
    let selected_is_compiled_in = match output {
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => true,
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => true,
        _ => false,
    };
    if !selected_is_compiled_in {
        return;
    }

    #[cfg(feature = "cuda")]
    tracing::info!("CUDA on");
    #[cfg(feature = "metal")]
    tracing::info!("Metal on");
    #[cfg(feature = "vulkan")]
    tracing::info!("Vulkan on");
    #[cfg(not(any(feature = "cuda", feature = "metal", feature = "vulkan")))]
    tracing::info!("CPU mode. Rebuild with `--features cuda|metal|vulkan` for better performance");
}

/// No-op variant used when neither mistralrs nor llamacpp is compiled in.
#[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))]
fn print_cuda(_: &Output) {}
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421

/// Default engine for a GGUF model: llamacpp if compiled in, otherwise mistralrs if compiled
/// in, otherwise the echo_full engine.
fn gguf_default() -> Output {
    #[cfg(feature = "llamacpp")]
    {
        Output::LlamaCpp
    }
    #[cfg(all(feature = "mistralrs", not(feature = "llamacpp")))]
    {
        Output::MistralRs
    }
    #[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))]
    {
        Output::EchoFull
    }
}

/// Default engine for a non-GGUF model: mistralrs if compiled in, otherwise the echo_full
/// engine.
fn safetensors_default() -> Output {
    #[cfg(feature = "mistralrs")]
    {
        Output::MistralRs
    }
    #[cfg(not(feature = "mistralrs"))]
    {
        Output::EchoFull
    }
}