"pcdet/ops/vscode:/vscode.git/clone" did not exist on "43baf787bcc3ceefc007c0bafb3b122a56911cc9"
lib.rs 15.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16
17
#[cfg(any(feature = "vllm", feature = "sglang"))]
use std::{future::Future, pin::Pin};
18
use std::{io::Read, sync::Arc};
19

20
use anyhow::Context;
Neelay Shah's avatar
Neelay Shah committed
21
use dynamo_llm::{
22
    backend::ExecutionContext, engines::StreamingEngine, kv_router::publisher::KvMetricsPublisher,
23
    LocalModel,
24
};
25
use dynamo_runtime::{protocols::Endpoint, DistributedRuntime};
26

27
28
mod flags;
pub use flags::Flags;
29
mod input;
30
#[cfg(any(feature = "vllm", feature = "sglang"))]
31
mod net;
32
mod opt;
33
pub use dynamo_llm::request_template::RequestTemplate;
34
35
pub use opt::{Input, Output};

36
37
38
/// How we identify a namespace/component/endpoint URL.
/// Technically the '://' is not part of the scheme but it eliminates several string
/// concatenations.
const ENDPOINT_SCHEME: &str = "dyn://";

/// When `in=text` the user doesn't need to know the model name, and doesn't need to provide it on
/// the command line. Hence it's optional, and defaults to this.
const INVISIBLE_MODEL_NAME: &str = "dynamo-run";

/// The component name for the KV publisher, if used
const KV_PUBLISHER_COMPONENT: &str = "kvpublisher";

/// How we identify a python string endpoint
#[cfg(feature = "python")]
const PYTHON_STR_SCHEME: &str = "pystr:";

/// How we identify a python token endpoint
#[cfg(feature = "python")]
const PYTHON_TOK_SCHEME: &str = "pytok:";

56
/// How the selected output engine is supplied to the input side: a remote
/// endpoint to discover, a locally constructed engine (with or without its own
/// tokenization), or no engine at all (multi-node follower case).
pub enum EngineConfig {
    /// A remote networked engine we don't know about yet
    Dynamic(Endpoint),

    /// A full service engine does its own tokenization and prompt formatting.
    StaticFull {
        engine: Arc<dyn StreamingEngine>,
        model: Box<LocalModel>,
    },

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        engine: ExecutionContext,
        model: Box<LocalModel>,
    },

    /// vllm multi-node doesn't run an engine on nodes other than 0. 'ray' does all the work.
    None,
}

76
77
78
79
80
81
/// Distributed system values, resolved up front when `in` is an endpoint:
/// the endpoint we were asked to attach to, and the distributed runtime
/// created from settings to serve it.
struct DynInput {
    // Parsed namespace/component/endpoint triple from the `dyn://` path.
    endpoint_id: Endpoint,
    // Runtime handle connecting this process to the distributed system.
    distributed_runtime: DistributedRuntime,
}

82
#[allow(unused_mut)]
83
pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
84
    runtime: dynamo_runtime::Runtime,
85
    mut in_opt: Input, // mut because vllm and sglang multi-node can change it
86
87
    out_opt: Output,
    flags: Flags,
88
    #[allow(unused_variables)] zmq_socket_prefix: Option<String>,
89
) -> anyhow::Result<()> {
90
    let cancel_token = runtime.primary_token();
91
    let maybe_path = flags
92
        .model_path_pos
93
        .clone()
94
        .or(flags.model_path_flag.clone());
95

96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
    let local_model: LocalModel = match out_opt {
        // If output is an endpoint we are ingress and don't have a local model, but making an
        // empty one cleans up the code.
        Output::Endpoint(_) => Default::default(),
        _ => {
            match &maybe_path {
                Some(model_path) => {
                    let maybe_model_name = if in_opt == Input::Text {
                        Some(INVISIBLE_MODEL_NAME.to_string())
                    } else {
                        flags.model_name.clone()
                    };
                    LocalModel::prepare(
                        model_path.to_str().context("Invalid UTF-8 in model path")?,
                        flags.model_config.as_deref(),
                        maybe_model_name.as_deref(),
                    )
                    .await?
114
                }
115
116
117
                None => {
                    // echo_full engine doesn't need a path
                    Default::default()
118
119
                }
            }
Graham King's avatar
Graham King committed
120
        }
121
    };
122

123
124
    let dyn_input = match &in_opt {
        Input::Endpoint(endpoint_path) => {
125
            if maybe_path.as_ref().map(|mp| mp.is_file()).unwrap_or(false)
126
127
128
129
130
131
132
133
134
135
                && flags.model_config.is_none()
            {
                // TODO We need to convert tokenizer extract from GGUF file into something we can
                // publish to NATS. Ideally `tokenizer.json` directly, but otherwise an
                // intermediate format.
                tracing::error!("Serving GGUF files in a distributed system requires `--model-config <hf-repo-dir>` so that we can find the tokenzier config");
                return Ok(());
            }

            // If we are in a distributed system, we need to know our component upfront
136
137
138
139
140
141
142
143
144
145
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
            let endpoint_id: Endpoint = endpoint_path.parse()?;
            Some(DynInput {
                endpoint_id,
                distributed_runtime,
            })
        }
        _ => None,
    };

Graham King's avatar
Graham King committed
146
    #[cfg(any(feature = "vllm", feature = "sglang"))]
147
    let mut extra: Option<Pin<Box<dyn Future<Output = ()> + Send>>> = None; // vllm and sglang sub-process
148

149
150
151
152
153
154
155
156
    let template = if let Some(path) = flags.request_template.as_ref() {
        let template = RequestTemplate::load(path)?;
        tracing::debug!("Using request template: {template:?}");
        Some(template)
    } else {
        None
    };

157
158
159
    // We may need it later
    let card = local_model.card().clone();

160
161
    // Create the engine matching `out`
    let engine_config = match out_opt {
162
163
164
        Output::Endpoint(path) => {
            let endpoint: Endpoint = path.parse()?;
            EngineConfig::Dynamic(endpoint)
165
        }
166
167
168
169
        Output::EchoFull => EngineConfig::StaticFull {
            model: Box::new(local_model),
            engine: dynamo_llm::engines::make_engine_full(),
        },
170
        Output::EchoCore => {
171
172
            let card = local_model.card();
            if !card.has_tokenizer() {
173
174
175
176
177
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            EngineConfig::StaticCore {
178
                engine: dynamo_llm::engines::make_engine_core(),
179
                model: Box::new(local_model),
180
181
            }
        }
182
        #[cfg(feature = "mistralrs")]
183
184
185
186
        Output::MistralRs => EngineConfig::StaticFull {
            engine: dynamo_engine_mistralrs::make_engine(local_model.path()).await?,
            model: Box::new(local_model),
        },
187
188
        #[cfg(feature = "sglang")]
        Output::SgLang => {
189
            if !local_model.path().is_dir() {
190
191
192
193
194
                anyhow::bail!("`--model-path should point at a HuggingFace repo checkout");
            }
            let Some(sock_prefix) = zmq_socket_prefix else {
                anyhow::bail!("sglang requires zmq_socket_prefix");
            };
Neelay Shah's avatar
Neelay Shah committed
195
            let node_conf = dynamo_llm::engines::MultiNodeConfig {
196
197
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
198
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
199
200
201
202
203
204
205
            };
            if node_conf.num_nodes > 1 {
                if let Ok(Some(if_name)) = net::get_primary_interface().await {
                    tracing::info!("If you see 'gloo' errors from sglang try setting these environment variables:");
                    tracing::info!("export GLOO_SOCKET_IFNAME={if_name}");
                    tracing::info!("export NCCL_SOCKET_IFNAME={if_name}");
                }
206
207
208
209
210
                if node_conf.node_rank != 0 {
                    // Follower nodes take input from leader node over pytorch distributed, not
                    // from user.
                    in_opt = Input::None;
                }
211
212
            }

213
            let (engine, sglang_process) = dynamo_engine_sglang::make_engine(
214
                cancel_token.clone(),
215
                local_model.path(),
216
217
218
219
                &sock_prefix,
                node_conf,
                flags.tensor_parallel_size,
                flags.base_gpu_id,
220
                flags.extra_engine_args.clone(),
221
222
            )
            .await?;
223
224
225
            extra = Some(Box::pin(async move {
                let _ = sglang_process.await;
            }));
226
            EngineConfig::StaticCore {
227
                engine,
228
                model: Box::new(local_model),
229
230
            }
        }
Graham King's avatar
Graham King committed
231
        #[cfg(feature = "vllm")]
232
        Output::Vllm0_7 => {
233
234
235
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
Graham King's avatar
Graham King committed
236
237
238
            let Some(sock_prefix) = zmq_socket_prefix else {
                anyhow::bail!("vllm requires zmq_socket_prefix");
            };
Neelay Shah's avatar
Neelay Shah committed
239
            let node_conf = dynamo_llm::engines::MultiNodeConfig {
240
241
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
242
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
243
244
245
246
247
248
249
250
251
252
253
254
            };
            if node_conf.num_nodes > 1 {
                if let Ok(Some(if_name)) = net::get_primary_interface().await {
                    tracing::info!("If you see network errors from vllm try setting this environment variable:");
                    tracing::info!("export NCCL_SOCKET_IFNAME={if_name}");
                }
                if node_conf.node_rank != 0 {
                    // Only node 0 runs vllm, the others communicate over ray
                    in_opt = Input::None;
                }
            }
            if node_conf.node_rank == 0 {
255
256
257
258
259
260
261
                let kv_metrics_publisher = if let Some(dyn_input) = &dyn_input {
                    let kvp_component = dyn_input
                        .distributed_runtime
                        .namespace(dyn_input.endpoint_id.namespace.clone())?
                        .component(KV_PUBLISHER_COMPONENT)?;
                    let kvp = Arc::new(KvMetricsPublisher::new()?);
                    let kvp_inner = kvp.clone();
262
263
264
                    tokio::spawn(
                        async move { kvp_inner.create_endpoint(kvp_component, None).await },
                    );
265
266
267
268
269
                    Some(kvp)
                } else {
                    None
                };

270
                // vllm multi-node only the leader runs vllm
271
                let (engine, vllm_future) = dynamo_engine_vllm0_7::make_leader_engine(
272
                    cancel_token.clone(),
273
                    local_model.path(),
274
275
276
                    &sock_prefix,
                    node_conf,
                    flags.tensor_parallel_size,
277
278
                    flags.extra_engine_args.clone(),
                    kv_metrics_publisher,
279
280
281
282
283
284
285
                )
                .await?;
                extra = Some(Box::pin(async move {
                    let _ = vllm_future.await;
                }));
                EngineConfig::StaticCore {
                    engine,
286
                    model: Box::new(local_model),
287
288
289
                }
            } else {
                // Nodes rank > 0 only run 'ray'
290
                let stop_future =
291
                    dynamo_engine_vllm0_7::start_follower(cancel_token.clone(), node_conf).await?;
292
293
                extra = Some(Box::pin(stop_future));
                EngineConfig::None
Graham King's avatar
Graham King committed
294
295
            }
        }
296
297
298
299
300
301
302
303
304
305
306
307
308

        #[cfg(feature = "vllm")]
        Output::Vllm | Output::Vllm0_8 => {
            if flags.base_gpu_id != 0 {
                anyhow::bail!("vllm does not support base_gpu_id. Set environment variable CUDA_VISIBLE_DEVICES instead.");
            }
            let node_conf = dynamo_llm::engines::MultiNodeConfig {
                num_nodes: flags.num_nodes,
                node_rank: flags.node_rank,
                leader_addr: flags.leader_addr.clone().unwrap_or_default(),
            };
            let engine = dynamo_engine_vllm0_8::make_engine(
                cancel_token.clone(),
309
                local_model.path(),
310
311
312
313
314
315
316
                node_conf,
                flags.tensor_parallel_size,
                flags.extra_engine_args.clone(),
            )
            .await?;
            EngineConfig::StaticCore {
                engine,
317
                model: Box::new(local_model),
318
319
320
            }
        }

321
322
        #[cfg(feature = "llamacpp")]
        Output::LlamaCpp => {
323
            if !local_model.path().is_file() {
324
325
                anyhow::bail!("--model-path should refer to a GGUF file. llama_cpp does not support safetensors.");
            }
326
            let engine =
327
328
                dynamo_engine_llamacpp::make_engine(cancel_token.clone(), local_model.path())
                    .await?;
329
            EngineConfig::StaticCore {
330
                engine,
331
                model: Box::new(local_model),
Graham King's avatar
Graham King committed
332
333
            }
        }
334
335
        #[cfg(feature = "python")]
        Output::PythonStr(path_str) => {
336
337
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
338
            let p = std::path::PathBuf::from(path_str);
339
340
            let engine =
                dynamo_engine_python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
341
342
            EngineConfig::StaticFull {
                engine,
343
                model: Box::new(local_model),
344
345
            }
        }
346
347
        #[cfg(feature = "python")]
        Output::PythonTok(path_str) => {
348
349
            let card = local_model.card();
            let py_args = flags.as_vec(&path_str, &card.service_name);
350
            let p = std::path::PathBuf::from(path_str);
351
352
            let engine =
                dynamo_engine_python::make_token_engine(cancel_token.clone(), &p, py_args).await?;
353
354
            EngineConfig::StaticCore {
                engine,
355
                model: Box::new(local_model),
356
357
            }
        }
358
359
360
361
    };

    match in_opt {
        Input::Http => {
362
            crate::input::http::run(runtime.clone(), flags, engine_config, template).await?;
363
364
        }
        Input::Text => {
365
            crate::input::text::run(runtime.clone(), flags, None, engine_config, template).await?;
366
367
368
369
        }
        Input::Stdin => {
            let mut prompt = String::new();
            std::io::stdin().read_to_string(&mut prompt).unwrap();
370
371
372
373
374
375
376
377
            crate::input::text::run(
                runtime.clone(),
                flags,
                Some(prompt),
                engine_config,
                template,
            )
            .await?;
378
        }
379
        Input::Batch(path) => {
380
381
            crate::input::batch::run(runtime.clone(), flags, card, path, engine_config, template)
                .await?;
382
        }
383
        Input::Endpoint(path) => {
384
385
386
387
            let Some(dyn_input) = dyn_input else {
                unreachable!("We set dyn_input earlier");
            };
            crate::input::endpoint::run(dyn_input.distributed_runtime, path, engine_config).await?;
388
        }
389
390
391
392
393
394
395
396
        Input::None => {
            // Multi-node setup. The engine sub-process has been started and is talking
            // to it's node_rank 0 controller. We do nothing.
            // TODO: Acquire an etcd lease, we are running
            cancel_token.cancelled().await;
        }
    }

Graham King's avatar
Graham King committed
397
    #[cfg(any(feature = "vllm", feature = "sglang"))]
398
399
    // Allow engines to ask main thread to wait on an extra future.
    if let Some(extra) = extra {
400
        extra.await;
401
402
403
404
    }

    Ok(())
}