lib.rs 10 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::time::Duration;
5
use std::{future::Future, pin::Pin};
6

7
8
9
10
11
use anyhow::Context as _;
use dynamo_llm::entrypoint::input::Input;
use dynamo_llm::entrypoint::EngineConfig;
use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::CancellationToken;
12
use dynamo_runtime::{DistributedRuntime, Runtime};
13

14
mod flags;
15
use either::Either;
16
pub use flags::Flags;
17
mod opt;
18
pub use dynamo_llm::request_template::RequestTemplate;
19
pub use opt::Output;
20
mod subprocess;
21

22
23
/// How long `stopper` waits for the engine sub-process to exit after asking it to stop,
/// before it kills the sub-process instead.
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

24
pub async fn run(
25
    runtime: Runtime,
26
    in_opt: Input,
27
    out_opt: Option<Output>,
28
29
    flags: Flags,
) -> anyhow::Result<()> {
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
    //
    // Configure
    //

    let mut builder = LocalModelBuilder::default();
    builder
        .model_path(
            flags
                .model_path_pos
                .clone()
                .or(flags.model_path_flag.clone()),
        )
        .model_name(flags.model_name.clone())
        .kv_cache_block_size(flags.kv_cache_block_size)
        // Only set if user provides. Usually loaded from tokenizer_config.json
        .context_length(flags.context_length)
46
        .http_port(Some(flags.http_port))
47
        .router_config(Some(flags.router_config()))
48
49
50
51
        .request_template(flags.request_template.clone());

    // If `in=dyn` we want the trtllm/sglang/vllm subprocess to listen on that endpoint.
    // If not, then the endpoint isn't exposed so we let LocalModel invent one.
52
    let mut rt = Either::Left(runtime.clone());
53
    if let Input::Endpoint(path) = &in_opt {
54
55
        builder.endpoint_id(Some(path.parse().with_context(|| path.clone())?));

56
        let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
57
        rt = Either::Right(distributed_runtime);
58
    };
59

60
    let local_model = builder.build().await?;
61

62
63
64
    //
    // Create an engine
    //
65

66
    let out_opt = out_opt.unwrap_or_else(|| default_engine_for(&local_model));
67
68
    print_cuda(&out_opt);

69
70
    // Now that we know the output we're targeting, check if we expect it to work
    flags.validate(&local_model, &out_opt)?;
71

72
    // Make an engine from the local_model, flags and output.
73
74
75
76
77
78
79
80
    let (engine_config, extra) = engine_for(
        runtime.primary_token(),
        out_opt,
        flags.clone(),
        local_model,
        rt.clone(),
    )
    .await?;
81

82
83
84
    //
    // Run in from an input
    //
85
    dynamo_llm::entrypoint::input::run_input(rt, in_opt, engine_config).await?;
86

87
88
89
90
91
    // Allow engines to ask main thread to wait on an extra future.
    // We use this to stop the vllm and sglang sub-process
    if let Some(extra) = extra {
        extra.await;
    }
92

93
94
    Ok(())
}
95

96
/// Boxed, sendable future with no output that an engine can hand back alongside its
/// `EngineConfig`; `run` awaits it after the input has finished.
type ExtraFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
97

98
99
100
101
102
103
104
/// Create the engine matching `out_opt`
/// Note validation happens in Flags::validate. In here assume everything is going to work.
async fn engine_for(
    cancel_token: CancellationToken,
    out_opt: Output,
    flags: Flags,
    local_model: LocalModel,
105
    rt: Either<Runtime, DistributedRuntime>,
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
) -> anyhow::Result<(EngineConfig, Option<ExtraFuture>)> {
    match out_opt {
        Output::Dynamic => Ok((EngineConfig::Dynamic(Box::new(local_model)), None)),
        Output::EchoFull => Ok((
            EngineConfig::StaticFull {
                model: Box::new(local_model),
                engine: dynamo_llm::engines::make_engine_full(),
            },
            None,
        )),
        Output::EchoCore => Ok((
            EngineConfig::StaticCore {
                engine: dynamo_llm::engines::make_engine_core(),
                model: Box::new(local_model),
            },
            None,
        )),
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => Ok((
            EngineConfig::StaticFull {
                engine: dynamo_engine_mistralrs::make_engine(&local_model).await?,
                model: Box::new(local_model),
            },
            None,
        )),
131
        #[cfg(feature = "llamacpp")]
132
        Output::LlamaCpp => Ok((
133
            EngineConfig::StaticCore {
134
                engine: dynamo_engine_llamacpp::make_engine(cancel_token, &local_model).await?,
135
                model: Box::new(local_model),
136
137
138
139
140
141
142
143
144
145
146
            },
            None,
        )),
        // For multi-node config. vllm uses `ray`, see guide
        Output::Vllm => shell(subprocess::vllm::PY, cancel_token, local_model, flags, None).await,
        // For multi-node config. trtlllm uses `mpi`, see guide
        Output::Trtllm => {
            shell(
                subprocess::trtllm::PY,
                cancel_token,
                local_model,
147
                flags,
148
                None,
149
            )
150
            .await
151
        }
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
        Output::SgLang => {
            let multi_node_config = if flags.num_nodes > 1 {
                Some(dynamo_llm::engines::MultiNodeConfig {
                    num_nodes: flags.num_nodes,
                    node_rank: flags.node_rank,
                    leader_addr: flags.leader_addr.clone().unwrap_or_default(),
                })
            } else {
                None
            };
            shell(
                subprocess::sglang::PY,
                cancel_token,
                local_model,
                flags,
                multi_node_config,
            )
            .await
170
        }
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
        Output::Mocker => {
            let Either::Right(drt) = rt else {
                panic!("Mocker requires a distributed runtime to run.");
            };

            let args = flags.mocker_config();
            let endpoint = local_model.endpoint_id().clone();

            let engine =
                dynamo_llm::mocker::engine::make_mocker_engine(drt, endpoint, args).await?;

            Ok((
                EngineConfig::StaticCore {
                    engine,
                    model: Box::new(local_model),
                },
                None,
            ))
        }
190
    }
191
}
192

193
194
195
196
197
198
199
200
201
202
203
204
205
206
async fn shell(
    py_script: &'static str,
    cancel_token: CancellationToken,
    local_model: LocalModel,
    flags: Flags,
    multi_node_config: Option<dynamo_llm::engines::MultiNodeConfig>,
) -> anyhow::Result<(EngineConfig, Option<ExtraFuture>)> {
    let (py_script, child) =
        match subprocess::start(py_script, &local_model, flags.clone(), multi_node_config).await {
            Ok(x) => x,
            Err(err) => {
                anyhow::bail!("Failed starting engine sub-process: {err}");
            }
        };
207

208
209
210
211
212
    // Sub-process cleanup
    let extra: ExtraFuture = Box::pin(async move {
        stopper(cancel_token, child, py_script).await;
    });
    Ok((EngineConfig::Dynamic(Box::new(local_model)), Some(extra)))
213
}
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
231
            tracing::trace!("engine sub-process graceful exit");
232
233
234
235
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
236
                    tracing::trace!("engine sub-process non-0 exit: {exit_status}");
237
238
                }
                Err(err) => {
239
                    tracing::warn!("engine sub-process error getting exit status: {err}");
240
241
242
243
244
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
245
            child.kill().await.expect("Failed killing engine subprocess");
246
247
248
249
250
251
252
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288

/// Log which acceleration backend (CUDA, Metal or Vulkan) this binary was built with, or a
/// hint to rebuild with one when none is enabled.
///
/// Nothing is logged unless `output` is one of the engines compiled into this binary
/// (mistralrs / llamacpp); the Python engines only need the accelerator at runtime.
#[cfg(any(feature = "mistralrs", feature = "llamacpp"))]
fn print_cuda(output: &Output) {
    // The engine may be compiled in without being the one the user selected.
    let is_compiled_in_engine = match output {
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => true,
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => true,
        _ => false,
    };
    if !is_compiled_in_engine {
        return;
    }

    #[cfg(feature = "cuda")]
    tracing::info!("CUDA on");
    #[cfg(feature = "metal")]
    tracing::info!("Metal on");
    #[cfg(feature = "vulkan")]
    tracing::info!("Vulkan on");
    #[cfg(not(any(feature = "cuda", feature = "metal", feature = "vulkan")))]
    tracing::info!("CPU mode. Rebuild with `--features cuda|metal|vulkan` for better performance");
}

/// No-op variant used when neither mistralrs nor llamacpp is compiled in.
#[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))]
fn print_cuda(_output: &Output) {}
289

290
291
292
293
294
295
296
297
298
299
300
301
302
fn default_engine_for(local_model: &LocalModel) -> Output {
    let default_engine = if local_model.card().is_gguf() {
        gguf_default()
    } else {
        safetensors_default()
    };
    tracing::info!(
        "Using default engine: {default_engine}. Use out=<engine> to specify one of {}",
        Output::available_engines().join(", ")
    );
    default_engine
}

303
304
/// Default engine for GGUF models: llamacpp when compiled in, otherwise mistralrs when
/// compiled in, otherwise `Output::EchoFull`.
fn gguf_default() -> Output {
    // Exactly one of these constants exists in any given feature combination.
    #[cfg(feature = "llamacpp")]
    const ENGINE: Output = Output::LlamaCpp;

    #[cfg(all(feature = "mistralrs", not(feature = "llamacpp")))]
    const ENGINE: Output = Output::MistralRs;

    #[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))]
    const ENGINE: Output = Output::EchoFull;

    ENGINE
}

/// Default engine for non-GGUF models: mistralrs when compiled in, otherwise
/// `Output::EchoFull`.
fn safetensors_default() -> Output {
    // Exactly one of these constants exists in any given feature combination.
    #[cfg(feature = "mistralrs")]
    const ENGINE: Output = Output::MistralRs;

    #[cfg(not(feature = "mistralrs"))]
    const ENGINE: Output = Output::EchoFull;

    ENGINE
}