// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{future::Future, io::Read, pin::Pin, sync::Arc, time::Duration};

use anyhow::Context;
use dynamo_llm::{
    backend::ExecutionContext, engines::StreamingEngine, kv_router::publisher::KvMetricsPublisher,
    LocalModel,
};
use dynamo_runtime::{protocols::Endpoint, CancellationToken, DistributedRuntime};

mod flags;
pub use flags::Flags;
mod input;
#[cfg(any(feature = "vllm", feature = "sglang"))]
mod net;
mod opt;
pub use dynamo_llm::request_template::RequestTemplate;
pub use opt::{Input, Output};
mod subprocess;

/// When `in=text` the user doesn't need to know the model name, and doesn't need to provide it on
/// the command line. Hence it's optional, and defaults to this.
const INVISIBLE_MODEL_NAME: &str = "dynamo-run";

/// The component name for the KV publisher, if used
const KV_PUBLISHER_COMPONENT: &str = "kvpublisher";

/// How long to wait after asking an engine sub-process to stop before killing it
const CHILD_STOP_TIMEOUT: Duration = Duration::from_secs(2);

/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

/// How we identify a python token endpoint
#[cfg(feature = "python")]
const PYTHON_TOK_SCHEME: &str = "pytok:";
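
// For example, a hypothetical invocation `dynamo-run in=http out=pystr:my_engine.py`
// loads `my_engine.py` as a string engine (`StaticFull`-style: it does its own
// tokenization and prompt formatting), while `out=pytok:my_engine.py` loads it as a
// token engine (`StaticCore`-style, wrapped with the pre/post processors).
// `my_engine.py` is a made-up file name.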

pub enum EngineConfig {
    /// A remote networked engine we don't know about yet
    Dynamic(Endpoint),

    /// A full-service engine does its own tokenization and prompt formatting.
    StaticFull {
        engine: Arc<dyn StreamingEngine>,
        model: Box<LocalModel>,
    },

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
        model: Box<LocalModel>,
    },

    /// vllm multi-node doesn't run an engine on nodes other than 0; 'ray' does all the work.
    None,
}
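
// A minimal sketch of selecting a dynamic engine, mirroring what `run` does below
// for `out=<endpoint-path>` (the endpoint triple here is hypothetical):
//
//     let endpoint: Endpoint = "my_namespace.my_component.generate".parse()?;
//     let engine_config = EngineConfig::Dynamic(endpoint);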

/// Handles into the distributed system, needed when the input is a dynamic endpoint
struct DynInput {
    endpoint_id: Endpoint,
    distributed_runtime: DistributedRuntime,
}

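/// Connect the chosen input (`in_opt`) to the engine selected by `out_opt`, and run
/// until the runtime's primary cancellation token fires.
///
/// A minimal calling sketch (hypothetical; assumes you already hold a `Runtime` and
/// a parsed `Flags`):
///
/// ```ignore
/// run(runtime, Input::Text, Output::EchoFull, flags, None).await?;
/// ```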
#[allow(unused_mut)]
pub async fn run(
    runtime: dynamo_runtime::Runtime,
    mut in_opt: Input, // mut because vllm and sglang multi-node can change it
    out_opt: Output,
    flags: Flags,
    #[allow(unused_variables)] zmq_socket_prefix: Option<String>,
) -> anyhow::Result<()> {
    let cancel_token = runtime.primary_token();
    let maybe_path = flags
        .model_path_pos
        .clone()
        .or(flags.model_path_flag.clone());

    let local_model: LocalModel = match out_opt {
        // If output is an endpoint we are ingress and don't have a local model, but making an
        // empty one cleans up the code.
        Output::Endpoint(_) => Default::default(),

        // All other output types have a local model
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    let maybe_model_name = if in_opt == Input::Text {
                        Some(INVISIBLE_MODEL_NAME.to_string())
                    } else {
                        flags.model_name.clone()
                    };
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
                        maybe_model_name.as_deref(),
                    )
                    .await?
                }
                None => {
                    // echo_full engine doesn't need a path
                    Default::default()
                }
            }
        }
    };

    let dyn_input = match &in_opt {
        Input::Endpoint(endpoint_path) => {
            if maybe_path.as_ref().map(|mp| mp.is_file()).unwrap_or(false)
                && flags.model_config.is_none()
            {
                // TODO We need to convert the tokenizer extracted from the GGUF file into
                // something we can publish to NATS. Ideally `tokenizer.json` directly, but
                // otherwise an intermediate format.
                tracing::error!("Serving GGUF files in a distributed system requires `--model-config <hf-repo-dir>` so that we can find the tokenizer config");
                return Ok(());
            }

            // If we are in a distributed system, we need to know our component upfront
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            let endpoint_id: Endpoint = endpoint_path.parse()?;
            Some(DynInput {
                endpoint_id,
                distributed_runtime,
            })
        }
        _ => None,
    };

    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process

    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };
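
    // A request template supplies default fields for requests. A hypothetical
    // template file might look like this (the field names are an assumption, not a
    // confirmed schema; see `RequestTemplate` in dynamo-llm):
    //
    //     {"model": "Qwen2.5-7B-Instruct", "temperature": 0.7, "max_completion_tokens": 4096}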

    // Clone the card now; `local_model` is moved into the engine config below, and
    // the batch input path still needs the card.
    let card = local_model.card().clone();

    // Create the engine matching `out`
    let engine_config = match out_opt {
        Output::Endpoint(path) => {
            let endpoint: Endpoint = path.parse()?;
            EngineConfig::Dynamic(endpoint)
        }
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
        Output::EchoCore => {
            let card = local_model.card();
            if !card.has_tokenizer() {
                anyhow::bail!(
                    "out=echo_core needs to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
                engine: dynamo_llm::engines::make_engine_core(),
                model: Box::new(local_model),
            }
        }
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => EngineConfig::StaticFull {
            engine: dynamo_engine_mistralrs::make_engine(local_model.path()).await?,
            model: Box::new(local_model),
        },

        Output::SgLang => {
            if !local_model.path().is_dir() {
                // TODO Does sglang support GGUF? Can we make it work?
                anyhow::bail!("`--model-path` should point at a HuggingFace repo checkout");
            }
            let (py_script, mut child) = match subprocess::start(
                subprocess::sglang::PY,
                local_model.path(),
                flags.tensor_parallel_size,
                if flags.base_gpu_id == 0 {
                    None
                } else {
                    Some(flags.base_gpu_id)
                },
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting sglang sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            let endpoint: Endpoint = subprocess::ENDPOINT.parse()?;
            EngineConfig::Dynamic(endpoint)
        }

        #[cfg(feature = "sglang")]
        Output::SgLangLegacy => {
            if !local_model.path().is_dir() {
                anyhow::bail!("`--model-path` should point at a HuggingFace repo checkout");
            }
            let Some(sock_prefix) = zmq_socket_prefix else {
                anyhow::bail!("sglang requires zmq_socket_prefix");
            };
            let node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            if node_conf.num_nodes > 1 {
                if let Ok(Some(if_name)) = net::get_primary_interface().await {
                    tracing::info!("If you see 'gloo' errors from sglang try setting these environment variables:");
                    tracing::info!("export GLOO_SOCKET_IFNAME={if_name}");
                    tracing::info!("export NCCL_SOCKET_IFNAME={if_name}");
                }
                if node_conf.node_rank != 0 {
                    // Follower nodes take input from leader node over pytorch distributed, not
                    // from user.
                    in_opt = Input::None;
                }
            }
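
            // A hypothetical two-node launch (flag spellings assumed from the
            // `Flags` field names, not verified against the real CLI):
            //   node 0: dynamo-run out=<engine> --num-nodes 2 --node-rank 0 --leader-addr 10.0.0.1:5000
            //   node 1: dynamo-run out=<engine> --num-nodes 2 --node-rank 1 --leader-addr 10.0.0.1:5000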

            let (engine, sglang_process) = dynamo_engine_sglang::make_engine(
                cancel_token.clone(),
                local_model.path(),
                &sock_prefix,
                node_conf,
                flags.tensor_parallel_size,
                flags.base_gpu_id,
                flags.extra_engine_args.clone(),
            )
            .await?;
            extra = Some(Box::pin(async move {
                let _ = sglang_process.await;
            }));
            EngineConfig::StaticCore {
                engine,
                model: Box::new(local_model),
            }
        }
        #[cfg(feature = "vllm")]
        Output::Vllm0_7 => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
            let Some(sock_prefix) = zmq_socket_prefix else {
                anyhow::bail!("vllm requires zmq_socket_prefix");
            };
            let node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            if node_conf.num_nodes > 1 {
                if let Ok(Some(if_name)) = net::get_primary_interface().await {
                    tracing::info!("If you see network errors from vllm try setting this environment variable:");
                    tracing::info!("export NCCL_SOCKET_IFNAME={if_name}");
                }
                if node_conf.node_rank != 0 {
                    // Only node 0 runs vllm, the others communicate over ray
                    in_opt = Input::None;
                }
            }
            if node_conf.node_rank == 0 {
                let kv_metrics_publisher = if let Some(dyn_input) = &dyn_input {
                    let kvp_component = dyn_input
                        .distributed_runtime
                        .namespace(dyn_input.endpoint_id.namespace.clone())?
                        .component(KV_PUBLISHER_COMPONENT)?;
                    let kvp = Arc::new(KvMetricsPublisher::new()?);
                    let kvp_inner = kvp.clone();
                    tokio::spawn(
                        async move { kvp_inner.create_endpoint(kvp_component, None).await },
                    );
                    Some(kvp)
                } else {
                    None
                };

                // In vllm multi-node, only the leader runs vllm
                let (engine, vllm_future) = dynamo_engine_vllm0_7::make_leader_engine(
                    cancel_token.clone(),
                    local_model.path(),
                    &sock_prefix,
                    node_conf,
                    flags.tensor_parallel_size,
                    flags.extra_engine_args.clone(),
                    kv_metrics_publisher,
                )
                .await?;
                extra = Some(Box::pin(async move {
                    let _ = vllm_future.await;
                }));
                EngineConfig::StaticCore {
                    engine,
                    model: Box::new(local_model),
                }
            } else {
                // Nodes with rank > 0 only run 'ray'
                let stop_future =
                    dynamo_engine_vllm0_7::start_follower(cancel_token.clone(), node_conf).await?;
                extra = Some(Box::pin(stop_future));
                EngineConfig::None
            }
        }

        #[cfg(feature = "vllm")]
        Output::Vllm0_8 => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
            let node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let engine = dynamo_engine_vllm0_8::make_engine(
                cancel_token.clone(),
                local_model.path(),
                node_conf,
                flags.tensor_parallel_size,
                flags.extra_engine_args.clone(),
            )
            .await?;
            EngineConfig::StaticCore {
                engine,
                model: Box::new(local_model),
            }
        }

        // No feature flag because it uses a sub-process; it's very cheap to include
        Output::Vllm => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
            let (py_script, mut child) = match subprocess::start(
                subprocess::vllm::PY,
                local_model.path(),
                flags.tensor_parallel_size,
                None, // base_gpu_id. vllm uses CUDA_VISIBLE_DEVICES instead
                flags.extra_engine_args.as_deref(),
            )
            .await
            {
                Ok(x) => x,
                Err(err) => {
                    anyhow::bail!("Failed starting vllm sub-process: {err}");
                }
            };
            let cancel_token = cancel_token.clone();

            // Sub-process cleanup
            extra = Some(Box::pin(async move {
                stopper(cancel_token, child, py_script).await;
            }));
            let endpoint: Endpoint = subprocess::ENDPOINT.parse()?;
            EngineConfig::Dynamic(endpoint)
        }

        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
            if !local_model.path().is_file() {
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
            let engine =
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), local_model.path())
                    .await?;
            EngineConfig::StaticCore {
                engine,
                model: Box::new(local_model),
            }
        }
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
            let p = std::path::PathBuf::from(path_str);
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
            EngineConfig::StaticFull {
                engine,
                model: Box::new(local_model),
            }
        }
        #[cfg(feature = "python")]
        Output::PythonTok(path_str) => {
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
            let p = std::path::PathBuf::from(path_str);
            let engine =
                dynamo_engine_python::make_token_engine(cancel_token.clone(), &p, py_args).await?;
            EngineConfig::StaticCore {
                engine,
                model: Box::new(local_model),
            }
        }
    };

    match in_opt {
        Input::Http => {
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
        }
        Input::Text => {
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin()
                .read_to_string(&mut prompt)
                .context("reading prompt from stdin")?;
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
        }
        Input::Batch(path) => {
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
        }
        Input::Endpoint(path) => {
            let Some(dyn_input) = dyn_input else {
                unreachable!("We set dyn_input earlier");
            };
            crate::input::endpoint::run(dyn_input.distributed_runtime, path, engine_config).await?;
        }
        Input::None => {
            // Multi-node setup. The engine sub-process has been started and is talking
            // to its node_rank 0 controller. We do nothing.
            // TODO: Acquire an etcd lease, we are running
            cancel_token.cancelled().await;
        }
    }

    // Allow engines to ask the main thread to wait on an extra future.
    // We use this to stop the vllm and sglang sub-processes.
    if let Some(extra) = extra {
        extra.await;
    }

    Ok(())
}

/// Wait for cancel_token to be cancelled, then stop the child as gracefully as possible.
/// Keeps the TempPath alive until the child is stopped.
async fn stopper(
    cancel_token: CancellationToken,
    mut child: tokio::process::Child,
    py_script: tempfile::TempPath,
) {
    cancel_token.cancelled().await;

    // Ask subprocess to stop gracefully
    if let Some(pid) = child.id() {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) };
    }

    tokio::select! {
        exit = child.wait() => {
            tracing::trace!("engine sub-process graceful exit");
            match exit {
                Ok(exit_status) if exit_status.success() => {}
                Ok(exit_status) => {
                    // This is nearly always 15 (SIGTERM)
                    tracing::trace!("engine sub-process non-zero exit: {exit_status}");
                }
                Err(err) => {
                    tracing::warn!("engine sub-process error getting exit status: {err}");
                }
            }
        }
        _ = tokio::time::sleep(CHILD_STOP_TIMEOUT) => {
            // It didn't stop in time, kill it
            child.kill().await.expect("Failed killing engine subprocess");
            let _ = child.wait().await;
        }
    }
    // This temporary file contains the python script running the engine. It deletes on drop.
    // Keep it alive until the engine has stopped.
    drop(py_script);
}