discovery.rs 6.05 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Ryan Olson's avatar
Ryan Olson committed
16
use std::sync::Arc;
17

Ryan Olson's avatar
Ryan Olson committed
18
use serde::{Deserialize, Serialize};
19
use tokio::sync::mpsc::Receiver;
Ryan Olson's avatar
Ryan Olson committed
20

Neelay Shah's avatar
Neelay Shah committed
21
use dynamo_runtime::{
Ryan Olson's avatar
Ryan Olson committed
22
    protocols::{self, annotated::Annotated},
23
    raise,
Ryan Olson's avatar
Ryan Olson committed
24
    transports::etcd::{KeyValue, WatchEvent},
25
    DistributedRuntime,
26
};
Ryan Olson's avatar
Ryan Olson committed
27
28

use super::ModelManager;
29
use crate::model_type::ModelType;
Ryan Olson's avatar
Ryan Olson committed
30
use crate::protocols::openai::chat_completions::{
31
    NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
32
};
33
34
use crate::protocols::openai::completions::{CompletionRequest, CompletionResponse};
use tracing;
Ryan Olson's avatar
Ryan Olson committed
35
36
37
38
39
40
/// [ModelEntry] is a struct that contains the information for the HTTP service to discover models
/// from the etcd cluster.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ModelEntry {
    /// Public name of the model
    /// This will be used to identify the model in the HTTP service and the value used in an
41
    /// an [OAI ChatRequest][crate::protocols::openai::chat_completions::NvCreateChatCompletionRequest].
Ryan Olson's avatar
Ryan Olson committed
42
43
44
45
    pub name: String,

    /// Component of the endpoint.
    pub endpoint: protocols::Endpoint,
46
47
48

    /// Specifies whether the model is a chat or completion model.s
    pub model_type: ModelType,
Ryan Olson's avatar
Ryan Olson committed
49
50
51
52
}

pub struct ModelWatchState {
    pub prefix: String,
53
    pub model_type: ModelType,
Ryan Olson's avatar
Ryan Olson committed
54
55
56
57
    pub manager: ModelManager,
    pub drt: DistributedRuntime,
}

58
pub async fn model_watcher(state: Arc<ModelWatchState>, mut events_rx: Receiver<WatchEvent>) {
59
    tracing::debug!("model watcher started");
Ryan Olson's avatar
Ryan Olson committed
60
61
62

    while let Some(event) = events_rx.recv().await {
        match event {
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
            WatchEvent::Put(kv) => {
                let key = match kv.key_str() {
                    Ok(key) => key,
                    Err(err) => {
                        tracing::error!(%err, ?kv, "Invalid UTF8 in model key");
                        continue;
                    }
                };
                tracing::debug!(key, "adding model");

                // model_entry.name is the service name (e.g. "Llama-3.2-3B-Instruct")
                let model_entry = match serde_json::from_slice::<ModelEntry>(kv.value()) {
                    Ok(model_entry) => model_entry,
                    Err(err) => {
                        tracing::error!(%err, ?kv, "Invalid JSON in model entry");
                        continue;
                    }
                };
                if state.manager.has_model_any(&model_entry.name) {
                    tracing::trace!(
                        service_name = model_entry.name,
                        "New endpoint for existing model"
                    );
                    continue;
Ryan Olson's avatar
Ryan Olson committed
87
                }
88
89
90
91
92
93
94
95

                match handle_put(model_entry, state.clone()).await {
                    Ok((model_name, model_type)) => {
                        tracing::info!("added {} model: {}", model_type, model_name);
                    }
                    Err(e) => {
                        tracing::error!("error adding model: {}", e);
                    }
Ryan Olson's avatar
Ryan Olson committed
96
                }
97
            }
Ryan Olson's avatar
Ryan Olson committed
98
            WatchEvent::Delete(kv) => match handle_delete(&kv, state.clone()).await {
99
100
                Ok((model_name, model_type)) => {
                    tracing::info!("removed {} model: {}", model_type, model_name);
Ryan Olson's avatar
Ryan Olson committed
101
102
                }
                Err(e) => {
103
                    tracing::error!("error removing model: {}", e);
Ryan Olson's avatar
Ryan Olson committed
104
105
106
107
108
109
                }
            },
        }
    }
}

110
111
112
113
async fn handle_delete(
    kv: &KeyValue,
    state: Arc<ModelWatchState>,
) -> anyhow::Result<(&str, ModelType)> {
Ryan Olson's avatar
Ryan Olson committed
114
    let key = kv.key_str()?;
115
    tracing::debug!(key, "removing model");
Ryan Olson's avatar
Ryan Olson committed
116
117

    let model_name = key.trim_start_matches(&state.prefix);
118
119
120
121
122
123
124

    match state.model_type {
        ModelType::Chat => state.manager.remove_chat_completions_model(model_name)?,
        ModelType::Completion => state.manager.remove_completions_model(model_name)?,
    };

    Ok((model_name, state.model_type))
Ryan Olson's avatar
Ryan Olson committed
125
126
127
128
129
130
}

// Handles a PUT event from etcd, this usually means adding a new model to the list of served
// models.
//
// If this method errors, for the near term, we will delete the offending key.
131
132
133
134
async fn handle_put(
    model_entry: ModelEntry,
    state: Arc<ModelWatchState>,
) -> anyhow::Result<(String, ModelType)> {
135
136
137
138
139
140
141
    if model_entry.model_type != state.model_type {
        raise!(
            "model type mismatch: {} != {}",
            model_entry.model_type,
            state.model_type
        );
    }
Ryan Olson's avatar
Ryan Olson committed
142

143
144
145
146
147
148
149
    match state.model_type {
        ModelType::Chat => {
            let client = state
                .drt
                .namespace(model_entry.endpoint.namespace)?
                .component(model_entry.endpoint.component)?
                .endpoint(model_entry.endpoint.name)
150
                .client::<NvCreateChatCompletionRequest, Annotated<NvCreateChatCompletionStreamResponse>>()
151
152
153
                .await?;
            state
                .manager
154
                .add_chat_completions_model(&model_entry.name, Arc::new(client))?;
155
156
157
158
159
160
161
162
163
164
165
        }
        ModelType::Completion => {
            let client = state
                .drt
                .namespace(model_entry.endpoint.namespace)?
                .component(model_entry.endpoint.component)?
                .endpoint(model_entry.endpoint.name)
                .client::<CompletionRequest, Annotated<CompletionResponse>>()
                .await?;
            state
                .manager
166
                .add_completions_model(&model_entry.name, Arc::new(client))?;
167
168
        }
    }
Ryan Olson's avatar
Ryan Olson committed
169

170
    Ok((model_entry.name, state.model_type))
Ryan Olson's avatar
Ryan Olson committed
171
}