lib.rs 11.1 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::{future::Future, pin::Pin};
5
use std::{io::Read, sync::Arc, time::Duration};
6

7
use anyhow::Context;
8
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, LocalModel};
9
use dynamo_runtime::{protocols::Endpoint, CancellationToken, DistributedRuntime};
10

11
12
mod flags;
pub use flags::Flags;
13
14
mod input;
mod opt;
15
pub use dynamo_llm::request_template::RequestTemplate;
16
pub use opt::{Input, Output};
17
mod subprocess;
18

19
20
/// Grace period `stopper` allows an engine sub-process to exit after `SIGTERM`
/// before it is forcibly killed.
const CHILD_STOP_TIMEOUT: Duration = Duration::from_millis(2_000);

/// Prefix marking an output as a Python string engine.
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

/// Endpoint the vllm/sglang sub-process attaches to when the user did not ask for one
/// with `in=dyn://...`. It is internal and never shown to users.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";

pub enum EngineConfig {
29
    /// An remote networked engine we don't know about yet
30
    Dynamic(Endpoint),
31

32
33
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
34
        engine: Arc<dyn StreamingEngine>,
35
        model: Box<LocalModel>,
36
    },
37
38
39
40

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
41
        model: Box<LocalModel>,
42
    },
43
44
}

45
pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
46
    runtime: dynamo_runtime::Runtime,
47
    in_opt: Input,
48
49
50
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
51
52
53
54
    if matches!(&in_opt, Input::Endpoint(_)) && matches!(&out_opt, Output::Endpoint(_)) {
        anyhow::bail!("Cannot use endpoint for both in and out");
    }

55
    let cancel_token = runtime.primary_token();
56
    let maybe_path = flags
57
        .model_path_pos
58
        .clone()
59
        .or(flags.model_path_flag.clone());
60

61
62
63
64
    let local_model: LocalModel = match out_opt {
        // If output is an endpoint we are ingress and don't have a local model, but making an
        // empty one cleans up the code.
        Output::Endpoint(_) => Default::default(),
65
66

        // All other output types have a local model
67
68
69
70
71
72
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
73
                        flags.model_name.clone(),
74
75
                    )
                    .await?
76
                }
77
78
                None => {
                    // echo_full engine doesn't need a path
79
80
81
82
                    match &flags.model_name {
                        Some(name) => LocalModel::with_name_only(name),
                        None => Default::default(),
                    }
83
84
                }
            }
Graham King's avatar
Graham King committed
85
        }
86
    };
87

88
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
89

90
91
92
93
94
95
96
97
    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

98
99
100
    // We may need it later
    let card = local_model.card().clone();

101
102
    // Create the engine matching `out`
    let engine_config = match out_opt {
103
104
105
        Output::Endpoint(path) => {
            let endpoint: Endpoint = path.parse()?;
            EngineConfig::Dynamic(endpoint)
106
        }
107
108
109
110
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
111
        Output::EchoCore => {
112
113
            let card = local_model.card();
            if !card.has_tokenizer() {
114
115
116
117
118
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
119
                engine: dynamo_llm::engines::make_engine_core(),
120
                model: Box::new(local_model),
121
122
            }
        }
123
        #[cfg(feature = "mistralrs")]
124
        Output::MistralRs => EngineConfig::StaticFull {
125
            engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
126
127
            model: Box::new(local_model),
        },
128
        Output::SgLang => {
129
130
131
132
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
133
134
135
136
137
138
139
140

            // If `in=dyn` we want the sglang subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

141
142
143
144
145
146
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let (py_script, child) = match subprocess::start(
147
                subprocess::sglang::PY,
148
                &local_model,
149
                &endpoint,
150
151
152
153
154
155
                flags.tensor_parallel_size,
                if flags.base_gpu_id == 0 {
                    None
                } else {
                    Some(flags.base_gpu_id)
                },
156
157
158
159
160
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            EngineConfig::Dynamic(endpoint)
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
182
183
184
185
186
187
188
189

            // If `in=dyn` we want the vllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

190
            let (py_script, child) = match subprocess::start(
191
                subprocess::vllm::PY,
192
                &local_model,
193
                &endpoint,
194
195
                flags.tensor_parallel_size,
                None, // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
196
                None, // multi-node config. vllm uses `ray`, see guide
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            EngineConfig::Dynamic(endpoint)
        }

215
216
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
217
            if !local_model.path().is_file() {
218
219
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
220
            let engine =
221
222
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), local_model.path())
                    .await?;
223
            EngineConfig::StaticCore {
224
                engine,
225
                model: Box::new(local_model),
Graham King's avatar
Graham King committed
226
227
            }
        }
228
229
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
230
231
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
232
            let p = std::path::PathBuf::from(path_str);
233
234
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
235
236
            EngineConfig::StaticFull {
                engine,
237
                model: Box::new(local_model),
238
239
            }
        }
240
241
242
243
    };

    match in_opt {
        Input::Http => {
244
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
245
246
        }
        Input::Text => {
247
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
248
249
250
251
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
252
253
254
255
256
257
258
259
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
260
        }
261
        Input::Batch(path) => {
262
263
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
264
        }
265
        Input::Endpoint(path) => {
266
267
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
268
269
270
271
        }
    }

    // Allow engines to ask main thread to wait on an extra future.
272
    // We use this to stop the vllm and sglang sub-process
273
    if let Some(extra) = extra {
274
        extra.await;
275
276
277
278
    }

    Ok(())
}
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("vllm sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("vllm sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("vllm sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing vllm subprocess");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}