lib.rs 12.2 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::{future::Future, pin::Pin};
5
use std::{io::Read, sync::Arc, time::Duration};
6

7
use anyhow::Context;
8
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, local_model::LocalModel};
9
use dynamo_runtime::{CancellationToken, DistributedRuntime};
10

11
12
mod flags;
pub use flags::Flags;
13
14
mod input;
mod opt;
15
pub use dynamo_llm::request_template::RequestTemplate;
16
pub use opt::{Input, Output};
17
mod subprocess;
18

19
20
/// How long an engine sub-process is given to exit after SIGTERM before it is force-killed.
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

/// Where we attach the vllm/sglang/trtllm subprocess when the input is not itself an
/// endpoint (`in=dyn://...`). Invisible to users.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";

/// Default size of a KV cache block. Override with --kv-cache-block-size
const DEFAULT_KV_CACHE_BLOCK_SIZE: usize = 16;

pub enum EngineConfig {
28
29
    /// Remote networked engines
    Dynamic,
30

31
32
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
33
        engine: Arc<dyn StreamingEngine>,
34
        model: Box<LocalModel>,
35
    },
36
37
38
39

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
40
        model: Box<LocalModel>,
41
    },
42
43
}

44
pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
45
    runtime: dynamo_runtime::Runtime,
46
    in_opt: Input,
47
48
49
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
50
    if matches!(&in_opt, Input::Endpoint(_)) && matches!(&out_opt, Output::Dynamic) {
51
52
53
        anyhow::bail!("Cannot use endpoint for both in and out");
    }

54
    let cancel_token = runtime.primary_token();
55
    let maybe_path = flags
56
        .model_path_pos
57
        .clone()
58
        .or(flags.model_path_flag.clone());
59

60
    let mut local_model: LocalModel = match out_opt {
61
        // If output is dynamic we are ingress and don't have a local model, but making an
62
        // empty one cleans up the code.
63
        Output::Dynamic => Default::default(),
64
65

        // All other output types have a local model
66
67
68
69
70
71
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
72
                        flags.model_name.clone(),
73
74
                    )
                    .await?
75
                }
76
77
                None => {
                    // echo_full engine doesn't need a path
78
79
80
81
                    match &flags.model_name {
                        Some(name) => LocalModel::with_name_only(name),
                        None => Default::default(),
                    }
82
83
                }
            }
Graham King's avatar
Graham King committed
84
        }
85
    };
86
87
88
89
90
91
92
93
94
95
96

    // Only set if user provides. Usually loaded from tokenizer_config.json
    if let Some(context_length) = flags.context_length {
        local_model.set_context_length(context_length);
    }
    // Always set, there is no engine provided default
    local_model.set_kv_cache_block_size(
        flags
            .kv_cache_block_size
            .unwrap_or(DEFAULT_KV_CACHE_BLOCK_SIZE),
    );
97

98
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
99

100
101
102
103
104
105
106
107
    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

108
109
110
    // We may need it later
    let card = local_model.card().clone();

111
112
    // Create the engine matching `out`
    let engine_config = match out_opt {
113
114
115
116
117
118
119
120
121
122
        Output::Dynamic => {
            // Sanity check - TODO probably make a general sanity check at start of method
            if flags.context_length.is_some() {
                anyhow::bail!("'--content-length' flag should only be used on the worker node, not on the ingress");
            }
            if flags.kv_cache_block_size.is_some() {
                anyhow::bail!("'--kv-cache-block-size' flag should only be used on the worker node, not on the ingress");
            }
            EngineConfig::Dynamic
        }
123
124
125
126
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
127
        Output::EchoCore => {
128
129
            let card = local_model.card();
            if !card.has_tokenizer() {
130
131
132
133
134
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
135
                engine: dynamo_llm::engines::make_engine_core(),
136
                model: Box::new(local_model),
137
138
            }
        }
139
        #[cfg(feature = "mistralrs")]
140
        Output::MistralRs => EngineConfig::StaticFull {
141
            engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
142
143
            model: Box::new(local_model),
        },
144
        Output::SgLang => {
145
146
147
148
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
149
150
151
152
153
154
155
156

            // If `in=dyn` we want the sglang subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

157
158
159
160
161
162
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let (py_script, child) = match subprocess::start(
163
                subprocess::sglang::PY,
164
                &local_model,
165
                &endpoint,
166
                flags.clone(),
167
168
169
170
171
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
172
173
174
175
176
177
178
179
180
181
182
183
184
185
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
186
            EngineConfig::Dynamic
187
188
189
190
191
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
192
193
194
195
196
197
198
199

            // If `in=dyn` we want the vllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

200
            let (py_script, child) = match subprocess::start(
201
                subprocess::vllm::PY,
202
                &local_model,
203
                &endpoint,
204
                flags.clone(),
205
                None, // multi-node config. vllm uses `ray`, see guide
206
207
208
209
210
211
212
213
214
215
216
217
218
219
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
220
            EngineConfig::Dynamic
221
        }
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
        Output::Trtllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("TRTLLM does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }

            // If `in=dyn` we want the trtllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

            let (py_script, child) = match subprocess::start(
                subprocess::trtllm::PY,
                &local_model,
                &endpoint,
                flags.clone(),
                None, // multi-node config. trtlllm uses `mpi`, see guide
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting trtllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            EngineConfig::Dynamic
        }
256

257
258
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
259
            if !local_model.path().is_file() {
260
261
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
262
            let engine =
263
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), &local_model).await?;
264
            EngineConfig::StaticCore {
265
                engine,
266
                model: Box::new(local_model),
Graham King's avatar
Graham King committed
267
268
            }
        }
269
270
271
272
    };

    match in_opt {
        Input::Http => {
273
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
274
275
        }
        Input::Text => {
276
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
277
278
279
280
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
281
282
283
284
285
286
287
288
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
289
        }
290
        Input::Batch(path) => {
291
292
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
293
        }
294
        Input::Endpoint(path) => {
295
296
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
297
298
299
300
        }
    }

    // Allow engines to ask main thread to wait on an extra future.
301
    // We use this to stop the vllm and sglang sub-process
302
    if let Some(extra) = extra {
303
        extra.await;
304
305
306
307
    }

    Ok(())
}
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346

/// Wait for `cancel_token` to be cancelled, then stop the child as gracefully as possible.
///
/// The child is first sent SIGTERM. If it has not exited within [`CHILD_STOP_TIMEOUT`], it is
/// force-killed with `tokio::process::Child::kill`, which sends SIGKILL and then waits for the
/// child. Failures are logged, not panicked on, because this runs during shutdown.
///
/// Keeps the `TempPath` (the Python script the child runs) alive until the child is stopped.
/// This is shared by every Python engine sub-process (vllm, sglang, trtllm), so log messages
/// say "engine sub-process".
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully. `id()` is `None` once the child has been reaped. If the
    // id somehow doesn't fit in `pid_t`, skip SIGTERM and rely on the kill after the timeout.
    if let Some(pid) = child.id().and_then(|id| libc::pid_t::try_from(id).ok()) {
        // SAFETY: `kill(2)` only sends a signal and touches no memory of ours. `pid` belongs to
        // our own child, which has not been reaped yet (`id()` returned Some), so the pid cannot
        // have been reused by an unrelated process.
        unsafe { libc::kill(pid, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            match exit {
                Ok(exit_status) if exit_status.success() => {
                    tracing::trace!("engine sub-process graceful exit");
                }
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("engine sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("engine sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it. `kill` also waits for the child to exit.
            if let Err(err) = child.kill().await {
                tracing::warn!("Failed killing engine sub-process: {err}");
            }
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}