discovery.rs 4.75 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Ryan Olson's avatar
Ryan Olson committed
16
use std::sync::Arc;
17

Ryan Olson's avatar
Ryan Olson committed
18
use serde::{Deserialize, Serialize};
19
use tokio::sync::mpsc::Receiver;
Ryan Olson's avatar
Ryan Olson committed
20

Neelay Shah's avatar
Neelay Shah committed
21
use triton_distributed_runtime::{
Ryan Olson's avatar
Ryan Olson committed
22
23
24
    protocols::{self, annotated::Annotated},
    transports::etcd::{KeyValue, WatchEvent},
    DistributedRuntime, Result,
25
};
Ryan Olson's avatar
Ryan Olson committed
26
27
28
29

use super::ModelManager;
use crate::protocols::openai::chat_completions::{
    ChatCompletionRequest, ChatCompletionResponseDelta,
30
};
Ryan Olson's avatar
Ryan Olson committed
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

/// [ModelEntry] is a struct that contains the information for the HTTP service to discover models
/// from the etcd cluster.
///
/// In this file, the value of an etcd PUT event is deserialized from JSON into a [ModelEntry].
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ModelEntry {
    /// Public name of the model.
    ///
    /// This is used to identify the model in the HTTP service and is the value used in an
    /// [OAI ChatRequest][crate::protocols::openai::chat_completions::ChatCompletionRequest].
    pub name: String,

    /// Endpoint serving the model. Its `namespace`, `component` and `name` fields are used to
    /// build the client that gets registered for this model.
    pub endpoint: protocols::Endpoint,
}

/// State shared by the model watcher and its PUT/DELETE event handlers.
pub struct ModelWatchState {
    /// Key prefix that is removed from the start of a deleted etcd key to obtain the model name.
    pub prefix: String,
    /// Manager that chat completions models are added to and removed from.
    pub manager: ModelManager,
    /// Runtime used to resolve a model's namespace/component/endpoint into a client.
    pub drt: DistributedRuntime,
}

pub async fn model_watcher(state: Arc<ModelWatchState>, events_rx: Receiver<WatchEvent>) {
52
    tracing::debug!("model watcher started");
Ryan Olson's avatar
Ryan Olson committed
53
54
55
56
57
58
59

    let mut events_rx = events_rx;

    while let Some(event) = events_rx.recv().await {
        match event {
            WatchEvent::Put(kv) => match handle_put(&kv, state.clone()).await {
                Ok(model_name) => {
60
                    tracing::info!("added chat model: {}", model_name);
Ryan Olson's avatar
Ryan Olson committed
61
62
                }
                Err(e) => {
63
64
                    tracing::error!("error adding chat model: {}", e);
                    // tracing::warn!(
Ryan Olson's avatar
Ryan Olson committed
65
66
67
68
                    //     "deleting offending key: {}",
                    //     kv.key_str().unwrap_or_default()
                    // );
                    // if let Err(e) = kv_client.delete(kv.key(), None).await {
69
                    //     tracing::error!("failed to delete offending key: {}", e);
Ryan Olson's avatar
Ryan Olson committed
70
71
72
73
74
                    // }
                }
            },
            WatchEvent::Delete(kv) => match handle_delete(&kv, state.clone()).await {
                Ok(model_name) => {
75
                    tracing::info!("removed chat model: {}", model_name);
Ryan Olson's avatar
Ryan Olson committed
76
77
                }
                Err(e) => {
78
                    tracing::error!("error removing chat model: {}", e);
Ryan Olson's avatar
Ryan Olson committed
79
80
81
82
83
                }
            },
        }
    }

84
    tracing::debug!("model watcher stopped");
Ryan Olson's avatar
Ryan Olson committed
85
86
87
}

/// Handles a DELETE event from etcd by removing the matching chat completions model.
///
/// The model name is the event key with `state.prefix` removed from its start. If the key does
/// not begin with the prefix, the whole key is used as the model name.
///
/// Returns the name of the removed model.
///
/// # Errors
///
/// Propagates the error from `KeyValue::key_str` and from
/// `ModelManager::remove_chat_completions_model`.
async fn handle_delete(kv: &KeyValue, state: Arc<ModelWatchState>) -> Result<String> {
    tracing::debug!("removing model");

    let key = kv.key_str()?;
    tracing::debug!("key: {}", key);

    // Remove the prefix exactly once. `trim_start_matches` would strip it repeatedly, so a model
    // whose name itself begins with the prefix text would be mangled and the wrong model removed.
    let model_name = key.strip_prefix(state.prefix.as_str()).unwrap_or(key);
    state.manager.remove_chat_completions_model(model_name)?;
    Ok(model_name.to_string())
}

// Handles a PUT event from etcd, this usually means adding a new model to the list of served
// models.
//
// If this method errors, for the near term, we will delete the offending key.
async fn handle_put(kv: &KeyValue, state: Arc<ModelWatchState>) -> Result<String> {
103
    tracing::debug!("adding model");
Ryan Olson's avatar
Ryan Olson committed
104
105

    let key = kv.key_str()?;
106
    tracing::debug!("key: {}", key);
Ryan Olson's avatar
Ryan Olson committed
107

108
    //let model_name = key.trim_start_matches(&state.prefix);
Ryan Olson's avatar
Ryan Olson committed
109
110
    let model_entry = serde_json::from_slice::<ModelEntry>(kv.value())?;

111
    /*
Ryan Olson's avatar
Ryan Olson committed
112
113
114
115
116
117
118
119
120
    // this means there is an entry in etcd that breaks the contract that the key
    // in the models path must match the model name in the entry.
    if model_entry.name != model_name {
        raise!(
            "model name mismatch: {} != {}",
            model_entry.name,
            model_name
        );
    }
121
    */
Ryan Olson's avatar
Ryan Olson committed
122
123
124
125
126
127
128
129
130
131
132

    let client = state
        .drt
        .namespace(model_entry.endpoint.namespace)?
        .component(model_entry.endpoint.component)?
        .endpoint(model_entry.endpoint.name)
        .client::<ChatCompletionRequest, Annotated<ChatCompletionResponseDelta>>()
        .await?;

    let client = Arc::new(client);

133
134
    let model_name = model_entry.name.clone();
    tracing::info!("New model registered: {model_name}");
Ryan Olson's avatar
Ryan Olson committed
135
136
    state
        .manager
137
        .add_chat_completions_model(&model_name, client)?;
Ryan Olson's avatar
Ryan Olson committed
138
139
140

    Ok(model_name.to_string())
}