// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::{future::Future, pin::Pin};
use std::{io::Read, sync::Arc, time::Duration};

use anyhow::Context;
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, LocalModel};
use dynamo_runtime::{protocols::Endpoint, CancellationToken, DistributedRuntime};

mod flags;
pub use flags::Flags;
mod input;
mod opt;
pub use dynamo_llm::request_template::RequestTemplate;
pub use opt::{Input, Output};
mod subprocess;

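/// How long to wait, after asking an engine sub-process to stop, before killing it.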
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

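/// Where the engine comes from: a remote endpoint we attach to, or an engine we run in-process.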
pub enum EngineConfig {
    /// A remote networked engine we don't know about yet
    Dynamic(Endpoint),

    /// A full-service engine does its own tokenization and prompt formatting.
    StaticFull {
        engine: Arc<dyn StreamingEngine>,
        model: Box<LocalModel>,
    },

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
        model: Box<LocalModel>,
    },
}

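/// Build the engine selected by `out_opt`, connect it to the input selected by
/// `in_opt`, and run until the input is exhausted or the runtime is cancelled.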
pub async fn run(
    runtime: dynamo_runtime::Runtime,
    in_opt: Input,
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
    let cancel_token = runtime.primary_token();
    let maybe_path = flags
        .model_path_pos
        .clone()
        .or(flags.model_path_flag.clone());

    let local_model: LocalModel = match out_opt {
        // If the output is an endpoint we are the ingress and have no local model,
        // but constructing an empty one keeps the code simple.
        Output::Endpoint(_) => Default::default(),

        // All other output types have a local model
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
                        flags.model_name.clone(),
                    )
                    .await?
                }
                None => {
                    // echo_full engine doesn't need a path
                    Default::default()
                }
            }
        }
    };

    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // cleanup future for the vllm/sglang sub-process

    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

    // Clone the model card now; `local_model` is moved into the engine config below.
    let card = local_model.card().clone();

    // Create the engine matching `out_opt`
    let engine_config = match out_opt {
        Output::Endpoint(path) => {
            let endpoint: Endpoint = path.parse()?;
            EngineConfig::Dynamic(endpoint)
        }
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
        Output::EchoCore => {
            let card = local_model.card();
            if !card.has_tokenizer() {
                anyhow::bail!(
                    "out=echo_core needs to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
                engine: dynamo_llm::engines::make_engine_core(),
                model: Box::new(local_model),
            }
        }
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => EngineConfig::StaticFull {
            engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
            model: Box::new(local_model),
        },
        Output::SgLang => {
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path` should point at a HuggingFace repo checkout");
            }
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
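            // `py_script` is the temporary file containing the Python script that runs
            // the engine; it must stay alive until the sub-process has stopped.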
            let (py_script, child) = match subprocess::start(
                subprocess::sglang::PY,
                &local_model,
                flags.tensor_parallel_size,
                if flags.base_gpu_id == 0 {
                    None
                } else {
                    Some(flags.base_gpu_id)
                },
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            let endpoint: Endpoint = subprocess::ENDPOINT.parse()?;
            EngineConfig::Dynamic(endpoint)
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
            let (py_script, child) = match subprocess::start(
                subprocess::vllm::PY,
                &local_model,
                flags.tensor_parallel_size,
                None, // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
                None, // multi-node config. vllm uses `ray`, see guide
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            let endpoint: Endpoint = subprocess::ENDPOINT.parse()?;
            EngineConfig::Dynamic(endpoint)
        }

        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
            if !local_model.path().is_file() {
                anyhow::bail!("`--model-path` should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
            let engine =
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), local_model.path())
                    .await?;
            EngineConfig::StaticCore {
                engine,
                model: Box::new(local_model),
            }
        }
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
            let p = std::path::PathBuf::from(path_str);
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
            EngineConfig::StaticFull {
                engine,
                model: Box::new(local_model),
            }
        }
    };

    match in_opt {
        Input::Http => {
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
        }
        Input::Text => {
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin()
                .read_to_string(&mut prompt)
                .context("Failed reading prompt from stdin")?;
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
        }
        Input::Batch(path) => {
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
        }
        Input::Endpoint(path) => {
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
        }
    }

    // Allow engines to ask the main task to wait on an extra future.
    // We use this to stop the vllm or sglang sub-process.
    if let Some(extra) = extra {
        extra.await;
    }

    Ok(())
}

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("engine sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("engine sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("engine sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing engine subprocess");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}