Unverified Commit 8e4d81f3 authored by Paul Hendricks's avatar Paul Hendricks Committed by GitHub
Browse files

refactor: Switch ModelManager locks from `std::sync::Mutex` to `parking_lot::Mutex` (#2696)

parent a24221d4
...@@ -1999,6 +1999,7 @@ dependencies = [ ...@@ -1999,6 +1999,7 @@ dependencies = [
"nixl-sys", "nixl-sys",
"offset-allocator", "offset-allocator",
"oneshot", "oneshot",
"parking_lot",
"prometheus", "prometheus",
"proptest", "proptest",
"rand 0.9.2", "rand 0.9.2",
......
...@@ -64,6 +64,7 @@ hf-hub = { workspace = true } ...@@ -64,6 +64,7 @@ hf-hub = { workspace = true }
humantime = { workspace = true } # input/batch humantime = { workspace = true } # input/batch
rand = { workspace = true } rand = { workspace = true }
oneshot = { workspace = true } oneshot = { workspace = true }
parking_lot = "0.12.4"
prometheus = { workspace = true } prometheus = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
};
use parking_lot::Mutex;
use dynamo_runtime::component::Component; use dynamo_runtime::component::Component;
use dynamo_runtime::prelude::DistributedRuntimeProvider; use dynamo_runtime::prelude::DistributedRuntimeProvider;
use dynamo_runtime::slug::Slug; use dynamo_runtime::slug::Slug;
use crate::discovery::ModelEntry; use crate::discovery::ModelEntry;
use crate::kv_router::{KvRouterConfig, scheduler::DefaultWorkerSelector}; use crate::kv_router::{KvRouterConfig, scheduler::DefaultWorkerSelector};
use crate::{ use crate::{
kv_router::KvRouter, kv_router::KvRouter,
...@@ -15,12 +21,6 @@ use crate::{ ...@@ -15,12 +21,6 @@ use crate::{
completions::OpenAICompletionsStreamingEngine, embeddings::OpenAIEmbeddingsStreamingEngine, completions::OpenAICompletionsStreamingEngine, embeddings::OpenAIEmbeddingsStreamingEngine,
}, },
}; };
use std::collections::HashSet;
use std::sync::RwLock;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ModelManagerError { pub enum ModelManagerError {
...@@ -61,7 +61,7 @@ impl ModelManager { ...@@ -61,7 +61,7 @@ impl ModelManager {
} }
pub fn get_model_entries(&self) -> Vec<ModelEntry> { pub fn get_model_entries(&self) -> Vec<ModelEntry> {
self.entries.lock().unwrap().values().cloned().collect() self.entries.lock().values().cloned().collect()
} }
pub fn has_model_any(&self, model: &str) -> bool { pub fn has_model_any(&self, model: &str) -> bool {
...@@ -170,12 +170,12 @@ impl ModelManager { ...@@ -170,12 +170,12 @@ impl ModelManager {
/// Save a ModelEntry under an instance's etcd `models/` key so we can fetch it later when the key is /// Save a ModelEntry under an instance's etcd `models/` key so we can fetch it later when the key is
/// deleted from etcd. /// deleted from etcd.
pub fn save_model_entry(&self, key: &str, entry: ModelEntry) { pub fn save_model_entry(&self, key: &str, entry: ModelEntry) {
self.entries.lock().unwrap().insert(key.to_string(), entry); self.entries.lock().insert(key.to_string(), entry);
} }
/// Remove and return model entry for this instance's etcd key. We do this when the instance stops. /// Remove and return model entry for this instance's etcd key. We do this when the instance stops.
pub fn remove_model_entry(&self, key: &str) -> Option<ModelEntry> { pub fn remove_model_entry(&self, key: &str) -> Option<ModelEntry> {
self.entries.lock().unwrap().remove(key) self.entries.lock().remove(key)
} }
pub async fn kv_chooser_for( pub async fn kv_chooser_for(
...@@ -203,7 +203,7 @@ impl ModelManager { ...@@ -203,7 +203,7 @@ impl ModelManager {
} }
fn get_kv_chooser(&self, model_name: &str) -> Option<Arc<KvRouter>> { fn get_kv_chooser(&self, model_name: &str) -> Option<Arc<KvRouter>> {
self.kv_choosers.lock().unwrap().get(model_name).cloned() self.kv_choosers.lock().get(model_name).cloned()
} }
/// Create and return a KV chooser for this component and model /// Create and return a KV chooser for this component and model
...@@ -242,21 +242,18 @@ impl ModelManager { ...@@ -242,21 +242,18 @@ impl ModelManager {
let new_kv_chooser = Arc::new(chooser); let new_kv_chooser = Arc::new(chooser);
self.kv_choosers self.kv_choosers
.lock() .lock()
.unwrap()
.insert(model_name.to_string(), new_kv_chooser.clone()); .insert(model_name.to_string(), new_kv_chooser.clone());
Ok(new_kv_chooser) Ok(new_kv_chooser)
} }
pub fn get_model_tool_call_parser(&self, model: &str) -> Option<String> { pub fn get_model_tool_call_parser(&self, model: &str) -> Option<String> {
match self.entries.lock() { self.entries
Ok(entries) => entries .lock()
.values() .values()
.find(|entry| entry.name == model) .find(|entry| entry.name == model)
.and_then(|entry| entry.runtime_config.as_ref()) .and_then(|entry| entry.runtime_config.as_ref())
.and_then(|config| config.tool_call_parser.clone()) .and_then(|config| config.tool_call_parser.clone())
.map(|parser| parser.to_string()), .map(|parser| parser.to_string())
Err(_) => None,
}
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment