lib.rs 11 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::{future::Future, pin::Pin};
5
use std::{io::Read, sync::Arc, time::Duration};
6

7
use anyhow::Context;
8
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, local_model::LocalModel};
9
use dynamo_runtime::{CancellationToken, DistributedRuntime};
10

11
12
mod flags;
pub use flags::Flags;
13
14
mod input;
mod opt;
15
pub use dynamo_llm::request_template::RequestTemplate;
16
pub use opt::{Input, Output};
17
mod subprocess;
18

19
20
/// How long `stopper` waits for the engine sub-process to exit after sending it SIGTERM
/// before it force-kills the process.
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

21
22
23
24
/// How we identify a python string endpoint: the scheme prefix `pystr:`.
/// Only compiled when the `python` feature is enabled.
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

25
26
27
/// Where we will attach the vllm/sglang subprocess. Invisible to users.
///
/// `run` falls back to this endpoint whenever the input is not itself an endpoint,
/// so the sub-process always has somewhere to listen.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";

28
pub enum EngineConfig {
29
30
    /// Remote networked engines
    Dynamic,
31

32
33
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
34
        engine: Arc<dyn StreamingEngine>,
35
        model: Box<LocalModel>,
36
    },
37
38
39
40

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
41
        model: Box<LocalModel>,
42
    },
43
44
}

45
pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
46
    runtime: dynamo_runtime::Runtime,
47
    in_opt: Input,
48
49
50
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
51
    if matches!(&in_opt, Input::Endpoint(_)) && matches!(&out_opt, Output::Dynamic) {
52
53
54
        anyhow::bail!("Cannot use endpoint for both in and out");
    }

55
    let cancel_token = runtime.primary_token();
56
    let maybe_path = flags
57
        .model_path_pos
58
        .clone()
59
        .or(flags.model_path_flag.clone());
60

61
    let mut local_model: LocalModel = match out_opt {
62
        // If output is dynamic we are ingress and don't have a local model, but making an
63
        // empty one cleans up the code.
64
        Output::Dynamic => Default::default(),
65
66

        // All other output types have a local model
67
68
69
70
71
72
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
73
                        flags.model_name.clone(),
74
75
                    )
                    .await?
76
                }
77
78
                None => {
                    // echo_full engine doesn't need a path
79
80
81
82
                    match &flags.model_name {
                        Some(name) => LocalModel::with_name_only(name),
                        None => Default::default(),
                    }
83
84
                }
            }
Graham King's avatar
Graham King committed
85
        }
86
    };
87
    local_model.context_length = flags.context_length;
88

89
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
90

91
92
93
94
95
96
97
98
    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

99
100
101
    // We may need it later
    let card = local_model.card().clone();

102
103
    // Create the engine matching `out`
    let engine_config = match out_opt {
104
        Output::Dynamic => EngineConfig::Dynamic,
105
106
107
108
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
109
        Output::EchoCore => {
110
111
            let card = local_model.card();
            if !card.has_tokenizer() {
112
113
114
115
116
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
117
                engine: dynamo_llm::engines::make_engine_core(),
118
                model: Box::new(local_model),
119
120
            }
        }
121
        #[cfg(feature = "mistralrs")]
122
        Output::MistralRs => EngineConfig::StaticFull {
123
            engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
124
125
            model: Box::new(local_model),
        },
126
        Output::SgLang => {
127
128
129
130
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
131
132
133
134
135
136
137
138

            // If `in=dyn` we want the sglang subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

139
140
141
142
143
144
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let (py_script, child) = match subprocess::start(
145
                subprocess::sglang::PY,
146
                &local_model,
147
                &endpoint,
148
                flags.tensor_parallel_size,
149
                flags.context_length,
150
151
152
153
154
                if flags.base_gpu_id == 0 {
                    None
                } else {
                    Some(flags.base_gpu_id)
                },
155
156
157
158
159
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
175
            EngineConfig::Dynamic
176
177
178
179
180
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
181
182
183
184
185
186
187
188

            // If `in=dyn` we want the vllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

189
            let (py_script, child) = match subprocess::start(
190
                subprocess::vllm::PY,
191
                &local_model,
192
                &endpoint,
193
                flags.tensor_parallel_size,
194
                flags.context_length,
195
                None, // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
196
                None, // multi-node config. vllm uses `ray`, see guide
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
212
            EngineConfig::Dynamic
213
214
        }

215
216
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
217
            if !local_model.path().is_file() {
218
219
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
220
            let engine =
221
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), &local_model).await?;
222
            EngineConfig::StaticCore {
223
                engine,
224
                model: Box::new(local_model),
Graham King's avatar
Graham King committed
225
226
            }
        }
227
228
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
229
230
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
231
            let p = std::path::PathBuf::from(path_str);
232
233
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
234
235
            EngineConfig::StaticFull {
                engine,
236
                model: Box::new(local_model),
237
238
            }
        }
239
240
241
242
    };

    match in_opt {
        Input::Http => {
243
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
244
245
        }
        Input::Text => {
246
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
247
248
249
250
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
251
252
253
254
255
256
257
258
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
259
        }
260
        Input::Batch(path) => {
261
262
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
263
        }
264
        Input::Endpoint(path) => {
265
266
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
267
268
269
270
        }
    }

    // Allow engines to ask main thread to wait on an extra future.
271
    // We use this to stop the vllm and sglang sub-process
272
    if let Some(extra) = extra {
273
        extra.await;
274
275
276
277
    }

    Ok(())
}
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("vllm sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("vllm sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("vllm sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing vllm subprocess");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}