// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::PathBuf;

use triton_distributed_runtime::{component::Client, DistributedRuntime};
use triton_distributed_llm::types::{
    openai::chat_completions::{
        ChatCompletionRequest, ChatCompletionResponseDelta, OpenAIChatCompletionsStreamingEngine,
    },
    Annotated,
};

// Input frontends: `http`, `text`, and `endpoint` (see `run` below).
mod input;
// CLI choices for `--in`/`--out`; re-exported as `Input`/`Output`.
mod opt;
// Output engines, e.g. `echo_full` (see `run` below).
mod output;
pub use opt::{Input, Output};

/// How we identify a namespace/component/endpoint URL.
/// Technically the '://' is not part of the scheme but it eliminates several string
/// concatenations.
/// Full format: `tdr://namespace/component/endpoint`.
const ENDPOINT_SCHEME: &str = "tdr://";

/// Required options depend on the in and out choices
#[derive(clap::Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Flags {
40
41
42
43
44
45
46
47
48
    /// Full path to the model, which can be either a GGUF file or a checked out HF repository.
    /// For the `echo_full` engine omit the flag.
    #[arg(index = 1)]
    pub model_path_pos: Option<PathBuf>,

    // `--model-path`. The one above is `tio <positional-model-path>`
    #[arg(long = "model-path")]
    pub model_path_flag: Option<PathBuf>,

49
50
51
52
53
54
    /// HTTP port. `in=http` only
    #[arg(long, default_value = "8080")]
    pub http_port: u16,

    /// The name of the model we are serving
    #[arg(long)]
55
    pub model_name: Option<String>,
56
57
58
}

pub enum EngineConfig {
59
60
61
62
    /// An remote networked engine we don't know about yet
    /// We don't have the pre-processor yet so this is only text requests. Type will change later.
    Dynamic(Client<ChatCompletionRequest, Annotated<ChatCompletionResponseDelta>>),

63
64
65
66
67
68
69
70
    /// A Full service engine does it's own tokenization and prompt formatting.
    StaticFull {
        service_name: String,
        engine: OpenAIChatCompletionsStreamingEngine,
    },
}

pub async fn run(
Neelay Shah's avatar
Neelay Shah committed
71
    runtime: triton_distributed_runtime::Runtime,
72
73
74
75
    in_opt: Input,
    out_opt: Output,
    flags: Flags,
) -> anyhow::Result<()> {
76
77
    let cancel_token = runtime.primary_token();

78
    // Turn relative paths into absolute paths
79
80
81
82
    let model_path = flags
        .model_path_pos
        .or(flags.model_path_flag)
        .and_then(|p| p.canonicalize().ok());
83
    // Serve the model under the name provided, or the name of the GGUF file.
84
85
86
87
88
89
    let model_name = flags.model_name.or_else(|| {
        model_path
            .as_ref()
            .and_then(|p| p.iter().last())
            .map(|n| n.to_string_lossy().into_owned())
    });
90

91
92
    // Create the engine matching `out`
    let engine_config = match out_opt {
93
94
95
96
97
98
99
100
101
102
103
        Output::EchoFull => {
            let Some(model_name) = model_name else {
                anyhow::bail!(
                    "Pass --model-name or --model-path so we know which model to imitate"
                );
            };
            EngineConfig::StaticFull {
                service_name: model_name,
                engine: output::echo_full::make_engine_full(),
            }
        }
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
        Output::Endpoint(path) => {
            let elements: Vec<&str> = path.split('/').collect();
            if elements.len() != 3 {
                anyhow::bail!("An endpoint URL must have format {ENDPOINT_SCHEME}namespace/component/endpoint");
            }
            // This will attempt to connect to NATS and etcd
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;

            let client = distributed_runtime
                .namespace(elements[0])?
                .component(elements[1])?
                .endpoint(elements[2])
                .client::<ChatCompletionRequest, Annotated<ChatCompletionResponseDelta>>()
                .await?;

            tracing::info!("Waiting for remote {}...", client.path());
            tokio::select! {
                _ = cancel_token.cancelled() => {
                    return Ok(());
                }
                r = client.wait_for_endpoints() => {
                    r?;
                }
            }

            EngineConfig::Dynamic(client)
        }
131
132
133
134
135
136
137
138
139
140
        #[cfg(feature = "mistralrs")]
        Output::MistralRs => {
            let Some(model_path) = model_path else {
                anyhow::bail!("out=mistralrs requires flag --model-path=<full-path-to-model-gguf>");
            };
            let Some(model_name) = model_name else {
                unreachable!("We checked model_path earlier, and set model_name from model_path");
            };
            EngineConfig::StaticFull {
                service_name: model_name,
Neelay Shah's avatar
Neelay Shah committed
141
                engine: triton_distributed_llm::engines::mistralrs::make_engine(&model_path).await?,
142
143
            }
        }
144
145
146
147
    };

    match in_opt {
        Input::Http => {
148
            crate::input::http::run(runtime.clone(), flags.http_port, engine_config).await?;
149
150
151
152
        }
        Input::Text => {
            crate::input::text::run(cancel_token.clone(), engine_config).await?;
        }
153
154
155
        Input::Endpoint(path) => {
            crate::input::endpoint::run(runtime.clone(), path, engine_config).await?;
        }
156
157
158
159
    }

    Ok(())
}