lib.rs 11.5 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::{future::Future, pin::Pin};
5
use std::{io::Read, sync::Arc, time::Duration};
6

7
use anyhow::Context;
8
use dynamo_llm::{backend::ExecutionContext, engines::StreamingEngine, local_model::LocalModel};
9
use dynamo_runtime::{CancellationToken, DistributedRuntime};
10

11
12
mod flags;
pub use flags::Flags;
13
14
mod input;
mod opt;
15
pub use dynamo_llm::request_template::RequestTemplate;
16
pub use opt::{Input, Output};
17
mod subprocess;
18

19
20
/// How long `stopper` waits for a sub-process to exit after SIGTERM before killing it.
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

/// Where we will attach the vllm/sglang subprocess. Invisible to users.
pub const INTERNAL_ENDPOINT: &str = "dyn://dynamo.internal.worker";

/// Default size of a KV cache block. Override with --kv-cache-block-size
const DEFAULT_KV_CACHE_BLOCK_SIZE: usize = 16;

31
pub enum EngineConfig {
32
33
    /// Remote networked engines
    Dynamic,
34

35
36
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
37
        engine: Arc<dyn StreamingEngine>,
38
        model: Box<LocalModel>,
39
    },
40
41
42
43

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
44
        model: Box<LocalModel>,
45
    },
46
47
}

48
pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
49
    runtime: dynamo_runtime::Runtime,
50
    in_opt: Input,
51
52
53
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
54
    if matches!(&in_opt, Input::Endpoint(_)) && matches!(&out_opt, Output::Dynamic) {
55
56
57
        anyhow::bail!("Cannot use endpoint for both in and out");
    }

58
    let cancel_token = runtime.primary_token();
59
    let maybe_path = flags
60
        .model_path_pos
61
        .clone()
62
        .or(flags.model_path_flag.clone());
63

64
    let mut local_model: LocalModel = match out_opt {
65
        // If output is dynamic we are ingress and don't have a local model, but making an
66
        // empty one cleans up the code.
67
        Output::Dynamic => Default::default(),
68
69

        // All other output types have a local model
70
71
72
73
74
75
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
76
                        flags.model_name.clone(),
77
78
                    )
                    .await?
79
                }
80
81
                None => {
                    // echo_full engine doesn't need a path
82
83
84
85
                    match &flags.model_name {
                        Some(name) => LocalModel::with_name_only(name),
                        None => Default::default(),
                    }
86
87
                }
            }
Graham King's avatar
Graham King committed
88
        }
89
    };
90
91
92
93
94
95
96
97
98
99
100

    // Only set if user provides. Usually loaded from tokenizer_config.json
    if let Some(context_length) = flags.context_length {
        local_model.set_context_length(context_length);
    }
    // Always set, there is no engine provided default
    local_model.set_kv_cache_block_size(
        flags
            .kv_cache_block_size
            .unwrap_or(DEFAULT_KV_CACHE_BLOCK_SIZE),
    );
101

102
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
103

104
105
106
107
108
109
110
111
    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

112
113
114
    // We may need it later
    let card = local_model.card().clone();

115
116
    // Create the engine matching `out`
    let engine_config = match out_opt {
117
118
119
120
121
122
123
124
125
126
        Output::Dynamic => {
            // Sanity check - TODO probably make a general sanity check at start of method
            if flags.context_length.is_some() {
                anyhow::bail!("'--content-length' flag should only be used on the worker node, not on the ingress");
            }
            if flags.kv_cache_block_size.is_some() {
                anyhow::bail!("'--kv-cache-block-size' flag should only be used on the worker node, not on the ingress");
            }
            EngineConfig::Dynamic
        }
127
128
129
130
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
131
        Output::EchoCore => {
132
133
            let card = local_model.card();
            if !card.has_tokenizer() {
134
135
136
137
138
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
139
                engine: dynamo_llm::engines::make_engine_core(),
140
                model: Box::new(local_model),
141
142
            }
        }
143
        #[cfg(feature = "mistralrs")]
144
        Output::MistralRs => EngineConfig::StaticFull {
145
            engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
146
147
            model: Box::new(local_model),
        },
148
        Output::SgLang => {
149
150
151
152
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
153
154
155
156
157
158
159
160

            // If `in=dyn` we want the sglang subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

161
162
163
164
165
166
            let multi_node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let (py_script, child) = match subprocess::start(
167
                subprocess::sglang::PY,
168
                &local_model,
169
                &endpoint,
170
                flags.clone(),
171
172
173
174
175
                if flags.num_nodes <= 1 {
                    None
                } else {
                    Some(multi_node_conf)
                },
176
177
178
179
180
181
182
183
184
185
186
187
188
189
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
190
            EngineConfig::Dynamic
191
192
193
194
195
        }
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
196
197
198
199
200
201
202
203

            // If `in=dyn` we want the vllm subprocess to listen on that endpoint.
            // If not, then the endpoint isn't exposed so we invent an internal one.
            let endpoint = match &in_opt {
                Input::Endpoint(path) => path.parse()?,
                _ => INTERNAL_ENDPOINT.parse()?,
            };

204
            let (py_script, child) = match subprocess::start(
205
                subprocess::vllm::PY,
206
                &local_model,
207
                &endpoint,
208
                flags.clone(),
209
                None, // multi-node config. vllm uses `ray`, see guide
210
211
212
213
214
215
216
217
218
219
220
221
222
223
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
224
            EngineConfig::Dynamic
225
226
        }

227
228
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
229
            if !local_model.path().is_file() {
230
231
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
232
            let engine =
233
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), &local_model).await?;
234
            EngineConfig::StaticCore {
235
                engine,
236
                model: Box::new(local_model),
Graham King's avatar
Graham King committed
237
238
            }
        }
239
240
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
241
242
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
243
            let p = std::path::PathBuf::from(path_str);
244
245
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
246
247
            EngineConfig::StaticFull {
                engine,
248
                model: Box::new(local_model),
249
250
            }
        }
251
252
253
254
    };

    match in_opt {
        Input::Http => {
255
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
256
257
        }
        Input::Text => {
258
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
259
260
261
262
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
263
264
265
266
267
268
269
270
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
271
        }
272
        Input::Batch(path) => {
273
274
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
275
        }
276
        Input::Endpoint(path) => {
277
278
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            crate::input::endpoint::run(distributed_runtime, path, engine_config).await?;
279
280
281
282
        }
    }

    // Allow engines to ask main thread to wait on an extra future.
283
    // We use this to stop the vllm and sglang sub-process
284
    if let Some(extra) = extra {
285
        extra.await;
286
287
288
289
    }

    Ok(())
}
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("vllm sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("vllm sub-process non-0 exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("vllm sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing vllm subprocess");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}