discovery.rs 5.18 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Ryan Olson's avatar
Ryan Olson committed
16
use std::sync::Arc;
17

Ryan Olson's avatar
Ryan Olson committed
18
use serde::{Deserialize, Serialize};
19
use tokio::sync::mpsc::Receiver;
Ryan Olson's avatar
Ryan Olson committed
20

Neelay Shah's avatar
Neelay Shah committed
21
use dynamo_runtime::{
Ryan Olson's avatar
Ryan Olson committed
22
    protocols::{self, annotated::Annotated},
23
    raise,
Ryan Olson's avatar
Ryan Olson committed
24
25
    transports::etcd::{KeyValue, WatchEvent},
    DistributedRuntime, Result,
26
};
Ryan Olson's avatar
Ryan Olson committed
27
28

use super::ModelManager;
29
use crate::model_type::ModelType;
Ryan Olson's avatar
Ryan Olson committed
30
use crate::protocols::openai::chat_completions::{
31
    NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
32
};
33
34
use crate::protocols::openai::completions::{CompletionRequest, CompletionResponse};
use tracing;
Ryan Olson's avatar
Ryan Olson committed
35
36
37
38
39
40
41
42
43
44
45
/// [ModelEntry] is a struct that contains the information for the HTTP service to discover models
/// from the etcd cluster.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ModelEntry {
    /// Public name of the model
    /// This will be used to identify the model in the HTTP service and the value used in an
    /// an [OAI ChatRequest][crate::protocols::openai::chat_completions::ChatCompletionRequest].
    pub name: String,

    /// Component of the endpoint.
    pub endpoint: protocols::Endpoint,
46
47
48

    /// Specifies whether the model is a chat or completion model.s
    pub model_type: ModelType,
Ryan Olson's avatar
Ryan Olson committed
49
50
51
52
}

pub struct ModelWatchState {
    pub prefix: String,
53
    pub model_type: ModelType,
Ryan Olson's avatar
Ryan Olson committed
54
55
56
57
    pub manager: ModelManager,
    pub drt: DistributedRuntime,
}

58
pub async fn model_watcher(state: Arc<ModelWatchState>, mut events_rx: Receiver<WatchEvent>) {
59
    tracing::debug!("model watcher started");
Ryan Olson's avatar
Ryan Olson committed
60
61
62
63

    while let Some(event) = events_rx.recv().await {
        match event {
            WatchEvent::Put(kv) => match handle_put(&kv, state.clone()).await {
64
65
                Ok((model_name, model_type)) => {
                    tracing::info!("added {} model: {}", model_type, model_name);
Ryan Olson's avatar
Ryan Olson committed
66
67
                }
                Err(e) => {
68
                    tracing::error!("error adding model: {}", e);
Ryan Olson's avatar
Ryan Olson committed
69
70
71
                }
            },
            WatchEvent::Delete(kv) => match handle_delete(&kv, state.clone()).await {
72
73
                Ok((model_name, model_type)) => {
                    tracing::info!("removed {} model: {}", model_type, model_name);
Ryan Olson's avatar
Ryan Olson committed
74
75
                }
                Err(e) => {
76
                    tracing::error!("error removing model: {}", e);
Ryan Olson's avatar
Ryan Olson committed
77
78
79
80
81
82
                }
            },
        }
    }
}

83
async fn handle_delete(kv: &KeyValue, state: Arc<ModelWatchState>) -> Result<(&str, ModelType)> {
Ryan Olson's avatar
Ryan Olson committed
84
    let key = kv.key_str()?;
85
    tracing::debug!(key, "removing model");
Ryan Olson's avatar
Ryan Olson committed
86
87

    let model_name = key.trim_start_matches(&state.prefix);
88
89
90
91
92
93
94

    match state.model_type {
        ModelType::Chat => state.manager.remove_chat_completions_model(model_name)?,
        ModelType::Completion => state.manager.remove_completions_model(model_name)?,
    };

    Ok((model_name, state.model_type))
Ryan Olson's avatar
Ryan Olson committed
95
96
97
98
99
100
}

// Handles a PUT event from etcd, this usually means adding a new model to the list of served
// models.
//
// If this method errors, for the near term, we will delete the offending key.
101
async fn handle_put(kv: &KeyValue, state: Arc<ModelWatchState>) -> Result<(String, ModelType)> {
Ryan Olson's avatar
Ryan Olson committed
102
    let key = kv.key_str()?;
103
    tracing::debug!(key, "adding model");
Ryan Olson's avatar
Ryan Olson committed
104

105
    // model_entry.name is the service name (e.g. "Llama-3.2-3B-Instruct")
Ryan Olson's avatar
Ryan Olson committed
106
    let model_entry = serde_json::from_slice::<ModelEntry>(kv.value())?;
107
    let service_name = model_entry.name.clone();
Ryan Olson's avatar
Ryan Olson committed
108

109
110
111
112
113
114
115
    if model_entry.model_type != state.model_type {
        raise!(
            "model type mismatch: {} != {}",
            model_entry.model_type,
            state.model_type
        );
    }
Ryan Olson's avatar
Ryan Olson committed
116

117
118
119
120
121
122
123
    match state.model_type {
        ModelType::Chat => {
            let client = state
                .drt
                .namespace(model_entry.endpoint.namespace)?
                .component(model_entry.endpoint.component)?
                .endpoint(model_entry.endpoint.name)
124
                .client::<NvCreateChatCompletionRequest, Annotated<NvCreateChatCompletionStreamResponse>>()
125
126
127
                .await?;
            state
                .manager
128
                .add_chat_completions_model(&service_name, Arc::new(client))?;
129
130
131
132
133
134
135
136
137
138
139
        }
        ModelType::Completion => {
            let client = state
                .drt
                .namespace(model_entry.endpoint.namespace)?
                .component(model_entry.endpoint.component)?
                .endpoint(model_entry.endpoint.name)
                .client::<CompletionRequest, Annotated<CompletionResponse>>()
                .await?;
            state
                .manager
140
                .add_completions_model(&service_name, Arc::new(client))?;
141
142
        }
    }
Ryan Olson's avatar
Ryan Olson committed
143

144
    Ok((service_name, state.model_type))
Ryan Olson's avatar
Ryan Olson committed
145
}