lib.rs 6.61 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16
17
use std::path::PathBuf;

18
19
20
21
22
23
24
25
26
use triton_distributed_llm::{
    backend::ExecutionContext,
    model_card::model::ModelDeploymentCard,
    types::{
        openai::chat_completions::{
            ChatCompletionRequest, ChatCompletionResponseDelta,
            OpenAIChatCompletionsStreamingEngine,
        },
        Annotated,
27
28
    },
};
29
use triton_distributed_runtime::{component::Client, protocols::Endpoint, DistributedRuntime};
30
31
32
33
34
35

mod input;
mod opt;
mod output;
pub use opt::{Input, Output};

36
37
38
39
40
/// How we identify a namespace/component/endpoint URL.
/// Technically the '://' is not part of the scheme but it eliminates several string
/// concatenations.
const ENDPOINT_SCHEME: &str = "tdr://";

41
42
43
44
/// Required options depend on the in and out choices
#[derive(clap::Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Flags {
45
46
47
48
49
50
51
52
53
    /// Full path to the model, which can be either a GGUF file or a checked out HF repository.
    /// For the `echo_full` engine omit the flag.
    #[arg(index = 1)]
    pub model_path_pos: Option<PathBuf>,

    // `--model-path`. The one above is `tio <positional-model-path>`
    #[arg(long = "model-path")]
    pub model_path_flag: Option<PathBuf>,

54
55
56
57
58
59
    /// HTTP port. `in=http` only
    #[arg(long, default_value = "8080")]
    pub http_port: u16,

    /// The name of the model we are serving
    #[arg(long)]
60
    pub model_name: Option<String>,
61
62
63
}

pub enum EngineConfig {
64
65
66
67
    /// An remote networked engine we don't know about yet
    /// We don't have the pre-processor yet so this is only text requests. Type will change later.
    Dynamic(Client<ChatCompletionRequest, Annotated<ChatCompletionResponseDelta>>),

68
69
70
71
72
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
        service_name: String,
        engine: OpenAIChatCompletionsStreamingEngine,
    },
73
74
75
76
77
78
79

    /// A core engine expects to be wrapped with pre/post processors that handle tokenization.
    StaticCore {
        service_name: String,
        engine: ExecutionContext,
        card: Box<ModelDeploymentCard>,
    },
80
81
82
}

pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
83
    runtime: triton_distributed_runtime::Runtime,
84
85
86
87
    in_opt: Input,
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
88
89
    let cancel_token = runtime.primary_token();

90
    // Turn relative paths into absolute paths
91
92
93
94
    let model_path = flags
        .model_path_pos
        .or(flags.model_path_flag)
        .and_then(|p| p.canonicalize().ok());
95
    // Serve the model under the name provided, or the name of the GGUF file.
96
97
98
99
100
101
    let model_name = flags.model_name.or_else(|| {
        model_path
            .as_ref()
            .and_then(|p| p.iter().last())
            .map(|n| n.to_string_lossy().into_owned())
    });
102
103
104
105
106
107
108
109
110
    // If model path is a directory we can build a model deployment card from it
    let maybe_card = match &model_path {
        Some(model_path) if model_path.is_dir() => {
            ModelDeploymentCard::from_local_path(model_path, model_name.as_deref())
                .await
                .ok()
        }
        Some(_) | None => None,
    };
111

112
113
    // Create the engine matching `out`
    let engine_config = match out_opt {
114
115
116
117
118
119
120
121
122
123
124
        Output::EchoFull => {
            let Some(model_name) = model_name else {
                anyhow::bail!(
                    "Pass --model-name or --model-path so we know which model to imitate"
                );
            };
            EngineConfig::StaticFull {
                service_name: model_name,
                engine: output::echo_full::make_engine_full(),
            }
        }
125
126
127
128
129
130
131
132
133
134
135
136
137
        Output::EchoCore => {
            let Some(mut card) = maybe_card.clone() else {
                anyhow::bail!(
                    "out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
                );
            };
            card.requires_preprocessing = true;
            EngineConfig::StaticCore {
                service_name: card.service_name.clone(),
                engine: output::echo_core::make_engine_core(),
                card: Box::new(card),
            }
        }
138
        Output::Endpoint(path) => {
139
140
            let endpoint: Endpoint = path.parse()?;

141
142
143
144
            // This will attempt to connect to NATS and etcd
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;

            let client = distributed_runtime
145
146
147
                .namespace(endpoint.namespace)?
                .component(endpoint.component)?
                .endpoint(endpoint.name)
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
                .client::<ChatCompletionRequest, Annotated<ChatCompletionResponseDelta>>()
                .await?;

            tracing::info!("Waiting for remote {}...", client.path());
            tokio::select! {
                _ = cancel_token.cancelled() => {
                    return Ok(());
                }
                r = client.wait_for_endpoints() => {
                    r?;
                }
            }

            EngineConfig::Dynamic(client)
        }
163
164
165
166
167
168
169
170
171
172
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => {
            let Some(model_path) = model_path else {
                anyhow::bail!("out=mistralrs requires flag --model-path=<full-path-to-model-gguf>");
            };
            let Some(model_name) = model_name else {
                unreachable!("We checked model_path earlier, and set model_name from model_path");
            };
            EngineConfig::StaticFull {
                service_name: model_name,
173
174
                engine: triton_distributed_llm::engines::mistralrs::make_engine(&model_path)
                    .await?,
175
176
            }
        }
177
178
179
180
    };

    match in_opt {
        Input::Http => {
181
            crate::input::http::run(runtime.clone(), flags.http_port, engine_config).await?;
182
183
184
185
        }
        Input::Text => {
            crate::input::text::run(cancel_token.clone(), engine_config).await?;
        }
186
187
188
        Input::Endpoint(path) => {
            crate::input::endpoint::run(runtime.clone(), path, engine_config).await?;
        }
189
190
191
192
    }

    Ok(())
}