lib.rs 10.9 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::{future::Future, pin::Pin};
5
use std::{io::Read, sync::Arc, time::Duration};
6

7
use anyhow::Context;
8
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, LocalModel};
9
use dynamo_runtime::{CancellationToken, DistributedRuntime};
10

11
12
mod flags;
pub use flags::Flags;
13
14
mod input;
mod opt;
15
pub use dynamo_llm::request_template::RequestTemplate;
16
pub use opt::{Input, Output};
17
mod subprocess;
18

19
20
/// How long `stopper` waits for an engine sub-process to exit after SIGTERM before killing it.
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

/// Endpoint the vllm/sglang sub-process attaches to when the input is not itself an endpoint.
/// It is never exposed to users.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";

28
pub enum EngineConfig {
29
30
    /// Remote networked engines
    Dynamic,
31

32
33
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
34
        engine: Arc<dyn StreamingEngine>,
35
        model: Box<LocalModel>,
36
    },
37
38
39
40

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
41
        model: Box<LocalModel>,
42
    },
43
44
}

45
pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
46
    runtime: dynamo_runtime::Runtime,
47
    in_opt: Input,
48
49
50
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
51
    if matches!(&in_opt, Input::Endpoint(_)) && matches!(&out_opt, Output::Dynamic) {
52
53
54
        anyhow::bail!("Cannot use endpoint for both in and out");
    }

55
    let cancel_token = runtime.primary_token();
56
    let maybe_path = flags
57
        .model_path_pos
58
        .clone()
59
        .or(flags.model_path_flag.clone());
60

61
    let local_model: LocalModel = match out_opt {
62
        // If output is dynamic we are ingress and don't have a local model, but making an
63
        // empty one cleans up the code.
64
        Output::Dynamic => Default::default(),
65
66

        // All other output types have a local model
67
68
69
70
71
72
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
73
                        flags.model_name.clone(),
74
75
                    )
                    .await?
76
                }
77
78
                None => {
                    // echo_full engine doesn't need a path
79
80
81
82
                    match &flags.model_name {
                        Some(name) => LocalModel::with_name_only(name),
                        None => Default::default(),
                    }
83
84
                }
            }
Graham King's avatar
Graham King committed
85
        }
86
    };
87

88
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
89

90
91
92
93
94
95
96
97
    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

98
99
100
    // We may need it later
    let card = local_model.card().clone();

101
102
    // Create the engine matching `out`
    let engine_config = match out_opt {
103
        Output::Dynamic => EngineConfig::Dynamic,
104
105
106
107
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
108
        Output::EchoCore => {
109
110
            let card = local_model.card();
            if !card.has_tokenizer() {
111
112
113
114
115
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
116
                engine: dynamo_llm::engines::make_engine_core(),
117
                model: Box::new(local_model),
118
119
            }
        }
120
        #[cfg(feature = "mistralrs")]
121
        Output::MistralRs => EngineConfig::StaticFull {
122
            engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
123
124
            model: Box::new(local_model),
        },
125
        Output::SgLang => {
126
127
128
129
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
130
131
132
133
134
135
136
137

            // If `in=dyn` we want the sglang subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

138
139
140
141
142
143
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let (py_script, child) = match subprocess::start(
144
                subprocess::sglang::PY,
145
                &local_model,
146
                &endpoint,
147
148
149
150
151
152
                flags.tensor_parallel_size,
                if flags.base_gpu_id == 0 {
                    None
                } else {
                    Some(flags.base_gpu_id)
                },
153
154
155
156
157
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
173
            EngineConfig::Dynamic
174
175
176
177
178
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
179
180
181
182
183
184
185
186

            // If `in=dyn` we want the vllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

187
            let (py_script, child) = match subprocess::start(
188
                subprocess::vllm::PY,
189
                &local_model,
190
                &endpoint,
191
192
                flags.tensor_parallel_size,
                None, // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
193
                None, // multi-node config. vllm uses `ray`, see guide
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
209
            EngineConfig::Dynamic
210
211
        }

212
213
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
214
            if !local_model.path().is_file() {
215
216
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
217
            let engine =
218
219
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), local_model.path())
                    .await?;
220
            EngineConfig::StaticCore {
221
                engine,
222
                model: Box::new(local_model),
Graham King's avatar
Graham King committed
223
224
            }
        }
225
226
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
227
228
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
229
            let p = std::path::PathBuf::from(path_str);
230
231
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
232
233
            EngineConfig::StaticFull {
                engine,
234
                model: Box::new(local_model),
235
236
            }
        }
237
238
239
240
    };

    match in_opt {
        Input::Http => {
241
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
242
243
        }
        Input::Text => {
244
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
245
246
247
248
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
249
250
251
252
253
254
255
256
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
257
        }
258
        Input::Batch(path) => {
259
260
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
261
        }
262
        Input::Endpoint(path) => {
263
264
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
265
266
267
268
        }
    }

    // Allow engines to ask main thread to wait on an extra future.
269
    // We use this to stop the vllm and sglang sub-process
270
    if let Some(extra) = extra {
271
        extra.await;
272
273
274
275
    }

    Ok(())
}
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("vllm sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("vllm sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("vllm sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing vllm subprocess");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}