// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::{future::Future, pin::Pin};
use std::{io::Read, sync::Arc, time::Duration};

use anyhow::Context;
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, LocalModel};
use dynamo_runtime::{protocols::Endpoint, CancellationToken, DistributedRuntime};

mod flags;
pub use flags::Flags;
mod input;
mod opt;
pub use dynamo_llm::request_template::RequestTemplate;
pub use opt::{Input, Output};
mod subprocess;

/// When `in=text` the user doesn't need to know the model name, and doesn't need to provide it on
/// the command line. Hence it's optional, and defaults to this.
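/// Illustrative invocation (assumed CLI shape, not spelled out in this file):
/// `dynamo-run in=text out=echo_full` then needs no model name.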
const INVISIBLE_MODEL_NAME: &str = "dynamo-run";

const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";
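// Presumably stripped from the `out=` value to recover the Python file path, e.g.
// `out=pystr:my_engine.py` (illustrative; the parsing happens elsewhere in this crate).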

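// How `run` (below) produces these variants: `Dynamic` for `Output::Endpoint` targets and the
// sglang/vllm sub-processes, `StaticFull` for in-process engines that accept text
// (echo_full, mistralrs, python), and `StaticCore` for in-process engines that expect
// pre-tokenized input (echo_core, llamacpp).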
pub enum EngineConfig {
    /// A remote networked engine we don't know about yet
    Dynamic(Endpoint),

    /// A full-service engine does its own tokenization and prompt formatting.
    StaticFull {
        engine: Arc<dyn StreamingEngine>,
        model: Box<LocalModel>,
    },

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
        model: Box<LocalModel>,
    },
}

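/// Build the local model and engine selected by `out_opt`, then serve it on the input
/// selected by `in_opt`.
///
/// Illustrative call site (assumes a `clap`-style `Flags::parse()` and an already-built
/// runtime; neither is defined in this file):
///
/// ```ignore
/// let flags = Flags::parse();
/// run(runtime, Input::Http, Output::EchoFull, flags).await?;
/// ```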
pub async fn run(
    runtime: dynamo_runtime::Runtime,
    in_opt: Input,
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
    let cancel_token = runtime.primary_token();
    let maybe_path = flags
        .model_path_pos
        .clone()
        .or(flags.model_path_flag.clone());

    let local_model: LocalModel = match out_opt {
        // If output is an endpoint we are ingress and don't have a local model, but making an
        // empty one cleans up the code.
        Output::Endpoint(_) => Default::default(),
        // All other output types have a local model
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    let maybe_model_name = if in_opt == Input::Text {
                        Some(INVISIBLE_MODEL_NAME.to_string())
                    } else {
                        flags.model_name.clone()
                    };
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
                        maybe_model_name.as_deref(),
                    )
                    .await?
                }
                None => {
                    // echo_full engine doesn't need a path
                    Default::default()
                }
            }
        }
    };

    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process

    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

    // We may need it later
    let card = local_model.card().clone();

    // Create the engine matching `out`
    let engine_config = match out_opt {
        Output::Endpoint(path) => {
            let endpoint: Endpoint = path.parse()?;
            EngineConfig::Dynamic(endpoint)
        }
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
        Output::EchoCore => {
            let card = local_model.card();
            if !card.has_tokenizer() {
                anyhow::bail!(
                    "out=echo_core needs to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
                engine: dynamo_llm::engines::make_engine_core(),
                model: Box::new(local_model),
            }
        }
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => EngineConfig::StaticFull {
            engine: dynamo_engine_mistralrs::make_engine(local_model.path()).await?,
            model: Box::new(local_model),
        },
        Output::SgLang => {
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path` should point at a HuggingFace repo checkout");
            }
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let (py_script, child) = match subprocess::start(
                subprocess::sglang::PY,
                local_model.path(),
                flags.tensor_parallel_size,
                if flags.base_gpu_id == 0 {
                    None
                } else {
                    Some(flags.base_gpu_id)
                },
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            let endpoint: Endpoint = subprocess::ENDPOINT.parse()?;
            EngineConfig::Dynamic(endpoint)
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
            let (py_script, child) = match subprocess::start(
                subprocess::vllm::PY,
                local_model.path(),
                flags.tensor_parallel_size,
                None, // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
                None, // multi-node config. vllm uses `ray`, see guide
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            let endpoint: Endpoint = subprocess::ENDPOINT.parse()?;
            EngineConfig::Dynamic(endpoint)
        }

        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
            if !local_model.path().is_file() {
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
            let engine =
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), local_model.path())
                    .await?;
            EngineConfig::StaticCore {
                engine,
                model: Box::new(local_model),
            }
        }
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
            let p = std::path::PathBuf::from(path_str);
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
            EngineConfig::StaticFull {
                engine,
                model: Box::new(local_model),
            }
        }
    };

    match in_opt {
        Input::Http => {
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
        }
        Input::Text => {
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin()
                .read_to_string(&mut prompt)
                .context("Failed reading prompt from stdin")?;
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
        }
        Input::Batch(path) => {
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
        }
        Input::Endpoint(path) => {
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
        }
    }

    // Allow engines to ask the main thread to wait on an extra future.
    // We use this to stop the vllm and sglang sub-processes.
    if let Some(extra) = extra {
        extra.await;
    }

    Ok(())
}

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

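    // Give the child CHILD_STOP_TIMEOUT to exit after the SIGTERM above; if it doesn't,
    // fall through to a hard kill.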
    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("engine sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("engine sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("engine sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing engine sub-process");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}