lib.rs 12.8 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::{future::Future, pin::Pin};
5
use std::{io::Read, sync::Arc, time::Duration};
6

7
use anyhow::Context;
8
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, local_model::LocalModel};
9
use dynamo_runtime::{CancellationToken, DistributedRuntime};
10

11
12
mod flags;
pub use flags::Flags;
13
14
mod input;
mod opt;
15
pub use dynamo_llm::request_template::RequestTemplate;
16
pub use opt::{Input, Output};
17
mod subprocess;
18

/// How long an engine sub-process gets to exit after SIGTERM before it is killed.
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

/// Where we attach the vllm / sglang / trtllm subprocess when the user did not ask for an
/// endpoint input (`in=dyn`). Invisible to users.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";

/// Default size of a KV cache block. Override with --kv-cache-block-size
const DEFAULT_KV_CACHE_BLOCK_SIZE: usize = 16;

31
pub enum EngineConfig {
32
33
    /// Remote networked engines
    Dynamic,
34

35
36
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
37
        engine: Arc<dyn StreamingEngine>,
38
        model: Box<LocalModel>,
39
    },
40
41
42
43

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
44
        model: Box<LocalModel>,
45
    },
46
47
}

48
pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
49
    runtime: dynamo_runtime::Runtime,
50
    in_opt: Input,
51
52
53
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
54
    if matches!(&in_opt, Input::Endpoint(_)) && matches!(&out_opt, Output::Dynamic) {
55
56
57
        anyhow::bail!("Cannot use endpoint for both in and out");
    }

58
    let cancel_token = runtime.primary_token();
59
    let maybe_path = flags
60
        .model_path_pos
61
        .clone()
62
        .or(flags.model_path_flag.clone());
63

64
    let mut local_model: LocalModel = match out_opt {
65
        // If output is dynamic we are ingress and don't have a local model, but making an
66
        // empty one cleans up the code.
67
        Output::Dynamic => Default::default(),
68
69

        // All other output types have a local model
70
71
72
73
74
75
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
76
                        flags.model_name.clone(),
77
78
                    )
                    .await?
79
                }
80
81
                None => {
                    // echo_full engine doesn't need a path
82
83
84
85
                    match &flags.model_name {
                        Some(name) => LocalModel::with_name_only(name),
                        None => Default::default(),
                    }
86
87
                }
            }
Graham King's avatar
Graham King committed
88
        }
89
    };
90
91
92
93
94
95
96
97
98
99
100

    // Only set if user provides. Usually loaded from tokenizer_config.json
    if let Some(context_length) = flags.context_length {
        local_model.set_context_length(context_length);
    }
    // Always set, there is no engine provided default
    local_model.set_kv_cache_block_size(
        flags
            .kv_cache_block_size
            .unwrap_or(DEFAULT_KV_CACHE_BLOCK_SIZE),
    );
101

102
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
103

104
105
106
107
108
109
110
111
    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

112
113
114
    // We may need it later
    let card = local_model.card().clone();

115
116
    // Create the engine matching `out`
    let engine_config = match out_opt {
117
118
119
120
121
122
123
124
125
126
        Output::Dynamic => {
            // Sanity check - TODO probably make a general sanity check at start of method
            if flags.context_length.is_some() {
                anyhow::bail!("'--content-length' flag should only be used on the worker node, not on the ingress");
            }
            if flags.kv_cache_block_size.is_some() {
                anyhow::bail!("'--kv-cache-block-size' flag should only be used on the worker node, not on the ingress");
            }
            EngineConfig::Dynamic
        }
127
128
129
130
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
131
        Output::EchoCore => {
132
133
            let card = local_model.card();
            if !card.has_tokenizer() {
134
135
136
137
138
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
139
                engine: dynamo_llm::engines::make_engine_core(),
140
                model: Box::new(local_model),
141
142
            }
        }
143
        #[cfg(feature = "mistralrs")]
144
        Output::MistralRs => EngineConfig::StaticFull {
145
            engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
146
147
            model: Box::new(local_model),
        },
148
        Output::SgLang => {
149
150
151
152
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
153
154
155
156
157
158
159
160

            // If `in=dyn` we want the sglang subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

161
162
163
164
165
166
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let (py_script, child) = match subprocess::start(
167
                subprocess::sglang::PY,
168
                &local_model,
169
                &endpoint,
170
                flags.clone(),
171
172
173
174
175
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
176
177
178
179
180
181
182
183
184
185
186
187
188
189
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
190
            EngineConfig::Dynamic
191
192
193
194
195
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
196
197
198
199
200
201
202
203

            // If `in=dyn` we want the vllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

204
            let (py_script, child) = match subprocess::start(
205
                subprocess::vllm::PY,
206
                &local_model,
207
                &endpoint,
208
                flags.clone(),
209
                None, // multi-node config. vllm uses `ray`, see guide
210
211
212
213
214
215
216
217
218
219
220
221
222
223
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
224
            EngineConfig::Dynamic
225
        }
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
        Output::Trtllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("TRTLLM does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }

            // If `in=dyn` we want the trtllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

            let (py_script, child) = match subprocess::start(
                subprocess::trtllm::PY,
                &local_model,
                &endpoint,
                flags.clone(),
                None, // multi-node config. trtlllm uses `mpi`, see guide
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting trtllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            EngineConfig::Dynamic
        }
260

261
262
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
263
            if !local_model.path().is_file() {
264
265
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
266
            let engine =
267
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), &local_model).await?;
268
            EngineConfig::StaticCore {
269
                engine,
270
                model: Box::new(local_model),
Graham King's avatar
Graham King committed
271
272
            }
        }
273
274
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
275
276
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
277
            let p = std::path::PathBuf::from(path_str);
278
279
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
280
281
            EngineConfig::StaticFull {
                engine,
282
                model: Box::new(local_model),
283
284
            }
        }
285
286
287
288
    };

    match in_opt {
        Input::Http => {
289
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
290
291
        }
        Input::Text => {
292
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
293
294
295
296
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
297
298
299
300
301
302
303
304
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
305
        }
306
        Input::Batch(path) => {
307
308
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
309
        }
310
        Input::Endpoint(path) => {
311
312
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
313
314
315
316
        }
    }

    // Allow engines to ask main thread to wait on an extra future.
317
    // We use this to stop the vllm and sglang sub-process
318
    if let Some(extra) = extra {
319
        extra.await;
320
321
322
323
    }

    Ok(())
}
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("vllm sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("vllm sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("vllm sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing vllm subprocess");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}