Unverified Commit 9ae98ed7 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

refactor(runtime): Replace std::sync::Mutex with parking_lot::Mutex (#3740)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 2a24e4aa
......@@ -2297,6 +2297,7 @@ dependencies = [
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry_sdk",
"parking_lot",
"prometheus",
"rand 0.9.2",
"rayon",
......@@ -4295,11 +4296,10 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.13"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765"
checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
dependencies = [
"autocfg",
"scopeguard",
]
......@@ -5615,9 +5615,9 @@ dependencies = [
[[package]]
name = "parking_lot"
version = "0.12.4"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13"
checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a"
dependencies = [
"lock_api",
"parking_lot_core",
......@@ -5625,15 +5625,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.11"
version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if 1.0.3",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.52.6",
"windows-link 0.2.0",
]
[[package]]
......
......@@ -81,6 +81,7 @@ modelexpress-common = { version = "0.2.0" }
humantime = { version = "2.2.0" }
libc = { version = "0.2" }
oneshot = { version = "0.1.11", features = ["std", "async"] }
parking_lot = "0.12.5"
prometheus = { version = "0.14" }
rand = { version = "0.9.0" }
reqwest = { version = "0.12.22", default-features = false, features = [
......
......@@ -805,7 +805,7 @@ dependencies = [
"num-traits",
"serde",
"wasm-bindgen",
"windows-link",
"windows-link 0.1.3",
]
[[package]]
......@@ -1622,6 +1622,7 @@ dependencies = [
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry_sdk",
"parking_lot",
"prometheus",
"rand 0.9.2",
"rayon",
......@@ -3203,11 +3204,10 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.13"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765"
checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
dependencies = [
"autocfg",
"scopeguard",
]
......@@ -4060,9 +4060,9 @@ checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "parking_lot"
version = "0.12.4"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13"
checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a"
dependencies = [
"lock_api",
"parking_lot_core",
......@@ -4070,15 +4070,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.11"
version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if 1.0.3",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.52.6",
"windows-link 0.2.1",
]
[[package]]
......@@ -7205,7 +7205,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-link 0.1.3",
"windows-result",
"windows-strings",
]
......@@ -7238,13 +7238,19 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-registry"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e"
dependencies = [
"windows-link",
"windows-link 0.1.3",
"windows-result",
"windows-strings",
]
......@@ -7255,7 +7261,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
dependencies = [
"windows-link",
"windows-link 0.1.3",
]
[[package]]
......@@ -7264,7 +7270,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
dependencies = [
"windows-link",
"windows-link 0.1.3",
]
[[package]]
......@@ -7316,7 +7322,7 @@ version = "0.53.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91"
dependencies = [
"windows-link",
"windows-link 0.1.3",
"windows_aarch64_gnullvm 0.53.0",
"windows_aarch64_msvc 0.53.0",
"windows_i686_gnu 0.53.0",
......
......@@ -57,7 +57,7 @@ hf-hub = { workspace = true }
humantime = { workspace = true } # input/batch
rand = { workspace = true }
oneshot = { workspace = true }
parking_lot = "0.12.4"
parking_lot = { workspace = true }
prometheus = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
......
......@@ -36,6 +36,7 @@ either = { workspace = true }
etcd-client = { workspace = true }
futures = { workspace = true }
humantime = { workspace = true }
parking_lot = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
......@@ -86,4 +87,4 @@ tempfile = { workspace = true }
[[bench]]
name = "compute_pool_overhead"
harness = false
\ No newline at end of file
harness = false
......@@ -387,7 +387,7 @@ dependencies = [
"iana-time-zone",
"num-traits",
"serde",
"windows-link",
"windows-link 0.1.1",
]
[[package]]
......@@ -688,6 +688,7 @@ dependencies = [
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry_sdk",
"parking_lot",
"prometheus",
"rand 0.9.1",
"rayon",
......@@ -1471,11 +1472,10 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.12"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
dependencies = [
"autocfg",
"scopeguard",
]
......@@ -1797,9 +1797,9 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.3"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a"
dependencies = [
"lock_api",
"parking_lot_core",
......@@ -1807,15 +1807,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.10"
version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall",
"smallvec",
"windows-targets",
"windows-link 0.2.1",
]
[[package]]
......@@ -3636,7 +3636,7 @@ checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-link 0.1.1",
"windows-result",
"windows-strings",
]
......@@ -3669,13 +3669,19 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-result"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252"
dependencies = [
"windows-link",
"windows-link 0.1.1",
]
[[package]]
......@@ -3684,7 +3690,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97"
dependencies = [
"windows-link",
"windows-link 0.1.1",
]
[[package]]
......
......@@ -574,8 +574,8 @@ fn main() -> Result<()> {
let permits = Arc::new(tokio::sync::Semaphore::new(1)); // 2 workers - 1
// Detect number of worker threads
use parking_lot::Mutex;
use std::collections::HashSet;
use std::sync::Mutex;
let thread_ids = Arc::new(Mutex::new(HashSet::new()));
let mut handles = Vec::new();
......@@ -585,7 +585,7 @@ fn main() -> Result<()> {
let ids = Arc::clone(&thread_ids);
let handle = tokio::task::spawn_blocking(move || {
let thread_id = std::thread::current().id();
ids.lock().unwrap().insert(thread_id);
ids.lock().insert(thread_id);
});
handles.push(handle);
}
......@@ -594,7 +594,7 @@ fn main() -> Result<()> {
let _ = handle.await;
}
let num_workers = thread_ids.lock().unwrap().len();
let num_workers = thread_ids.lock().len();
println!(" Detected {} worker threads", num_workers);
// Now initialize thread-local on all workers using a barrier
......
......@@ -85,7 +85,7 @@ pub enum TransportType {
#[derive(Default)]
pub struct RegistryInner {
services: HashMap<String, Service>,
stats_handlers: HashMap<String, Arc<std::sync::Mutex<HashMap<String, EndpointStatsHandler>>>>,
stats_handlers: HashMap<String, Arc<parking_lot::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}
#[derive(Clone)]
......
......@@ -107,7 +107,6 @@ impl EndpointConfigBuilder {
if let Some(stats_handler) = stats_handler {
handler_map
.lock()
.unwrap()
.insert(endpoint.subject_to(lease_id), stats_handler);
}
......@@ -139,7 +138,7 @@ impl EndpointConfigBuilder {
transport: TransportType::NatsTcp(subject.clone()),
};
tracing::debug!(endpoint_name = %endpoint_name, "Registering endpoint health check target");
let guard = system_health.lock().unwrap();
let guard = system_health.lock();
guard.register_health_check_target(
&endpoint_name,
instance,
......
......@@ -5,8 +5,9 @@ use async_nats::service::Service as NatsService;
use async_nats::service::ServiceExt as _;
use derive_builder::Builder;
use derive_getters::Dissolve;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use crate::component::Component;
......@@ -44,7 +45,7 @@ pub async fn build_nats_service(
.description(description)
.stats_handler(move |name, stats| {
tracing::trace!("stats_handler: {name}, {stats:?}");
let mut guard = stats_handler_registry.lock().unwrap();
let mut guard = stats_handler_registry.lock();
match guard.get_mut(&name) {
Some(handler) => handler(stats),
None => serde_json::Value::Null,
......
......@@ -367,7 +367,7 @@ impl ComputePoolExt for ComputePool {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use parking_lot::Mutex;
#[tokio::test]
async fn test_compute_pool_execute() {
......
......@@ -70,7 +70,7 @@ impl DistributedRuntime {
let use_endpoint_health_status = config.use_endpoint_health_status.clone();
let health_endpoint_path = config.system_health_path.clone();
let live_endpoint_path = config.system_live_path.clone();
let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new(
let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
starting_health_status,
use_endpoint_health_status,
health_endpoint_path,
......@@ -119,7 +119,6 @@ impl DistributedRuntime {
distributed_runtime
.system_health
.lock()
.unwrap()
.initialize_uptime_gauge(&distributed_runtime)?;
// Handle system status server initialization
......@@ -430,7 +429,7 @@ mod tests {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// Check that uptime is 50+ ms
let uptime = drt.system_health.lock().unwrap().uptime();
let uptime = drt.system_health.lock().uptime();
assert!(
uptime >= std::time::Duration::from_millis(50),
"Expected uptime to be at least 50ms, but got {:?}",
......@@ -456,7 +455,7 @@ mod tests {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
// Check that uptime is 50+ ms
let uptime = drt.system_health.lock().unwrap().uptime();
let uptime = drt.system_health.lock().uptime();
assert!(
uptime >= std::time::Duration::from_millis(50),
"Expected uptime to be at least 50ms, but got {:?}",
......
......@@ -8,9 +8,10 @@ use crate::protocols::annotated::Annotated;
use crate::protocols::maybe_error::MaybeError;
use crate::{DistributedRuntime, HealthStatus, SystemHealth};
use futures::StreamExt;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time::{MissedTickBehavior, interval};
......@@ -71,7 +72,7 @@ impl HealthCheckManager {
// Check cache first
{
let cache = self.router_cache.lock().unwrap();
let cache = self.router_cache.lock();
if let Some(router) = cache.get(&cache_key) {
return Ok(router.clone());
}
......@@ -90,10 +91,7 @@ impl HealthCheckManager {
);
// Cache it
self.router_cache
.lock()
.unwrap()
.insert(cache_key, router.clone());
self.router_cache.lock().insert(cache_key, router.clone());
Ok(router)
}
......@@ -102,7 +100,7 @@ impl HealthCheckManager {
pub async fn start(self: Arc<Self>) -> anyhow::Result<()> {
// Get all registered endpoints at startup
let targets = {
let system_health = self.drt.system_health.lock().unwrap();
let system_health = self.drt.system_health.lock();
system_health.get_health_check_targets()
};
......@@ -134,7 +132,7 @@ impl HealthCheckManager {
// Get the endpoint-specific notifier
let notifier = {
let system_health = self.drt.system_health.lock().unwrap();
let system_health = self.drt.system_health.lock();
system_health
.get_endpoint_health_check_notifier(&endpoint_subject)
.expect("Notifier should exist for registered endpoint")
......@@ -153,7 +151,7 @@ impl HealthCheckManager {
// Get the health check payload for this endpoint
let target = {
let system_health = manager.drt.system_health.lock().unwrap();
let system_health = manager.drt.system_health.lock();
system_health.get_health_check_target(&endpoint_subject)
};
......@@ -185,7 +183,6 @@ impl HealthCheckManager {
// Store the task handle
self.endpoint_tasks
.lock()
.unwrap()
.insert(endpoint_subject.clone(), task);
info!(
......@@ -201,7 +198,7 @@ impl HealthCheckManager {
// Get the receiver (can only be taken once)
let mut rx = {
let system_health = manager.drt.system_health.lock().unwrap();
let system_health = manager.drt.system_health.lock();
system_health.take_new_endpoint_receiver().ok_or_else(|| {
anyhow::anyhow!("Endpoint receiver already taken - this should only be called once")
})?
......@@ -217,7 +214,7 @@ impl HealthCheckManager {
);
let already_exists = {
let tasks = manager.endpoint_tasks.lock().unwrap();
let tasks = manager.endpoint_tasks.lock();
tasks.contains_key(&endpoint_subject)
};
......@@ -250,7 +247,7 @@ impl HealthCheckManager {
payload: &serde_json::Value,
) -> anyhow::Result<()> {
let target = {
let system_health = self.drt.system_health.lock().unwrap();
let system_health = self.drt.system_health.lock();
system_health
.get_health_check_target(endpoint_subject)
.ok_or_else(|| {
......@@ -310,7 +307,7 @@ impl HealthCheckManager {
};
// Update health status based on response
system_health.lock().unwrap().set_endpoint_health_status(
system_health.lock().set_endpoint_health_status(
&endpoint_subject_owned,
if is_healthy {
HealthStatus::Ready
......@@ -324,7 +321,7 @@ impl HealthCheckManager {
"Health check request failed for {}: {}",
endpoint_subject_owned, e
);
system_health.lock().unwrap().set_endpoint_health_status(
system_health.lock().set_endpoint_health_status(
&endpoint_subject_owned,
HealthStatus::NotReady,
);
......@@ -338,7 +335,6 @@ impl HealthCheckManager {
warn!("Health check timeout for {}", endpoint_subject_owned);
system_health
.lock()
.unwrap()
.set_endpoint_health_status(&endpoint_subject_owned, HealthStatus::NotReady);
}
......@@ -369,7 +365,7 @@ pub async fn get_health_check_status(
) -> anyhow::Result<serde_json::Value> {
// Get endpoints list from SystemHealth
let endpoint_subjects: Vec<String> = {
let system_health = drt.system_health.lock().unwrap();
let system_health = drt.system_health.lock();
system_health.get_health_check_endpoints()
};
......@@ -377,7 +373,7 @@ pub async fn get_health_check_status(
// Check each endpoint's health status
{
let system_health = drt.system_health.lock().unwrap();
let system_health = drt.system_health.lock();
for endpoint_subject in &endpoint_subjects {
let health_status = system_health
.get_endpoint_health_status(endpoint_subject)
......@@ -447,36 +443,28 @@ mod integration_tests {
"_health_check": true
});
drt.system_health
.lock()
.unwrap()
.register_health_check_target(
endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: "test_endpoint".to_string(),
namespace: "test_namespace".to_string(),
instance_id: 12345,
transport: crate::component::TransportType::NatsTcp(endpoint.to_string()),
},
payload.clone(),
);
drt.system_health.lock().register_health_check_target(
endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: "test_endpoint".to_string(),
namespace: "test_namespace".to_string(),
instance_id: 12345,
transport: crate::component::TransportType::NatsTcp(endpoint.to_string()),
},
payload.clone(),
);
let retrieved = drt
.system_health
.lock()
.unwrap()
.get_health_check_target(endpoint)
.map(|t| t.payload);
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap(), payload);
// Verify endpoint appears in the list
let endpoints = drt
.system_health
.lock()
.unwrap()
.get_health_check_endpoints();
let endpoints = drt.system_health.lock().get_health_check_endpoints();
assert!(endpoints.contains(&endpoint.to_string()));
}
......@@ -490,20 +478,17 @@ mod integration_tests {
"prompt": format!("test{}", i),
"_health_check": true
});
drt.system_health
.lock()
.unwrap()
.register_health_check_target(
&endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: format!("test_endpoint_{}", i),
namespace: "test_namespace".to_string(),
instance_id: i as i64,
transport: crate::component::TransportType::NatsTcp(endpoint.clone()),
},
payload,
);
drt.system_health.lock().register_health_check_target(
&endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: format!("test_endpoint_{}", i),
namespace: "test_namespace".to_string(),
instance_id: i as i64,
transport: crate::component::TransportType::NatsTcp(endpoint.clone()),
},
payload,
);
}
let config = HealthCheckConfig {
......@@ -515,7 +500,7 @@ mod integration_tests {
manager.clone().start().await.unwrap();
// Verify all endpoints have their own health check tasks
let tasks = manager.endpoint_tasks.lock().unwrap();
let tasks = manager.endpoint_tasks.lock();
// Should have 3 tasks (one for each endpoint)
assert_eq!(tasks.len(), 3);
// Check that all endpoints are represented in tasks
......@@ -536,26 +521,22 @@ mod integration_tests {
});
// Register the endpoint
drt.system_health
.lock()
.unwrap()
.register_health_check_target(
endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: "test_endpoint_notifier".to_string(),
namespace: "test_namespace".to_string(),
instance_id: 999,
transport: crate::component::TransportType::NatsTcp(endpoint.to_string()),
},
payload.clone(),
);
drt.system_health.lock().register_health_check_target(
endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: "test_endpoint_notifier".to_string(),
namespace: "test_namespace".to_string(),
instance_id: 999,
transport: crate::component::TransportType::NatsTcp(endpoint.to_string()),
},
payload.clone(),
);
// Verify that a notifier was created for this endpoint
let notifier = drt
.system_health
.lock()
.unwrap()
.get_endpoint_health_check_notifier(endpoint);
assert!(
......@@ -572,7 +553,6 @@ mod integration_tests {
let status = drt
.system_health
.lock()
.unwrap()
.get_endpoint_health_status(endpoint);
assert_eq!(status, Some(HealthStatus::Ready));
}
......
......@@ -206,7 +206,7 @@ pub struct DistributedRuntime {
instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
// Health Status
system_health: Arc<std::sync::Mutex<SystemHealth>>,
system_health: Arc<parking_lot::Mutex<SystemHealth>>,
// This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
// Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
......
......@@ -8,9 +8,9 @@
pub mod prometheus_names;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex;
use crate::component::ComponentBuilder;
use anyhow;
......
......@@ -11,8 +11,8 @@ use crate::protocols::LeaseId;
use anyhow::Result;
use async_nats::service::endpoint::Endpoint;
use derive_builder::Builder;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
......@@ -52,7 +52,6 @@ impl PushEndpoint {
system_health
.lock()
.unwrap()
.set_endpoint_health_status(endpoint_name_local.as_str(), HealthStatus::Ready);
loop {
......@@ -132,7 +131,6 @@ impl PushEndpoint {
system_health
.lock()
.unwrap()
.set_endpoint_health_status(endpoint_name_local.as_str(), HealthStatus::NotReady);
// await for all inflight requests to complete if graceful shutdown
......
......@@ -179,8 +179,8 @@ impl Runtime {
/// Detect the number of worker threads in the runtime
async fn detect_worker_thread_count(&self) -> usize {
use parking_lot::Mutex;
use std::collections::HashSet;
use std::sync::Mutex;
let thread_ids = Arc::new(Mutex::new(HashSet::new()));
let mut handles = Vec::new();
......@@ -192,7 +192,7 @@ impl Runtime {
let ids = Arc::clone(&thread_ids);
let handle = tokio::task::spawn_blocking(move || {
let thread_id = std::thread::current().id();
ids.lock().unwrap().insert(thread_id);
ids.lock().insert(thread_id);
});
handles.push(handle);
}
......@@ -202,7 +202,7 @@ impl Runtime {
let _ = handle.await;
}
let count = thread_ids.lock().unwrap().len();
let count = thread_ids.lock().len();
tracing::debug!("Detected {} worker threads in runtime", count);
count
}
......
......@@ -49,7 +49,7 @@ pub struct SystemHealth {
/// This solves the race condition where HealthCheckManager starts before endpoints are registered
/// Using a channel ensures no registrations are lost.
new_endpoint_tx: mpsc::UnboundedSender<String>,
new_endpoint_rx: Arc<std::sync::Mutex<Option<mpsc::UnboundedReceiver<String>>>>,
new_endpoint_rx: Arc<parking_lot::Mutex<Option<mpsc::UnboundedReceiver<String>>>>,
use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
......@@ -78,7 +78,7 @@ impl SystemHealth {
health_check_targets: Arc::new(std::sync::RwLock::new(HashMap::new())),
health_check_notifiers: Arc::new(std::sync::RwLock::new(HashMap::new())),
new_endpoint_tx: tx,
new_endpoint_rx: Arc::new(std::sync::Mutex::new(Some(rx))),
new_endpoint_rx: Arc::new(parking_lot::Mutex::new(Some(rx))),
use_endpoint_health_status,
health_path,
live_path,
......@@ -238,7 +238,7 @@ impl SystemHealth {
/// Take the receiver for new endpoint registrations (can only be called once)
/// This is used by HealthCheckManager to receive notifications of new endpoints
pub fn take_new_endpoint_receiver(&self) -> Option<mpsc::UnboundedReceiver<String>> {
self.new_endpoint_rx.lock().unwrap().take()
self.new_endpoint_rx.lock().take()
}
/// Initialize the uptime gauge using the provided metrics registry
......
......@@ -83,14 +83,12 @@ pub async fn spawn_system_status_server(
.drt()
.system_health
.lock()
.unwrap()
.health_path()
.to_string();
let live_path = server_state
.drt()
.system_health
.lock()
.unwrap()
.live_path()
.to_string();
......@@ -160,7 +158,7 @@ pub async fn spawn_system_status_server(
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Get basic health status
let system_health = state.drt().system_health.lock().unwrap();
let system_health = state.drt().system_health.lock();
let (healthy, endpoints) = system_health.get_health_status();
let uptime = Some(system_health.uptime());
......@@ -186,12 +184,7 @@ async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
#[tracing::instrument(skip_all, level = "trace")]
async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Update the uptime gauge with current value
state
.drt()
.system_health
.lock()
.unwrap()
.update_uptime_gauge();
state.drt().system_health.lock().update_uptime_gauge();
// Execute all the callbacks for all registered hierarchies
let all_hierarchies: Vec<String> = {
......@@ -301,13 +294,13 @@ mod integration_tests {
let drt = create_test_drt_async().await;
// Get uptime from SystemHealth
let uptime = drt.system_health.lock().unwrap().uptime();
let uptime = drt.system_health.lock().uptime();
// Uptime should exist (even if close to zero)
assert!(uptime.as_nanos() > 0 || uptime.is_zero());
// Sleep briefly and check uptime increases
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let uptime_after = drt.system_health.lock().unwrap().uptime();
let uptime_after = drt.system_health.lock().uptime();
assert!(uptime_after > uptime);
})
.await;
......@@ -358,19 +351,19 @@ mod integration_tests {
let drt = create_test_drt_async().await;
// Get initial uptime
let initial_uptime = drt.system_health.lock().unwrap().uptime();
let initial_uptime = drt.system_health.lock().uptime();
// Update the gauge with initial value
drt.system_health.lock().unwrap().update_uptime_gauge();
drt.system_health.lock().update_uptime_gauge();
// Sleep for 100ms
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Get uptime after sleep
let uptime_after_sleep = drt.system_health.lock().unwrap().uptime();
let uptime_after_sleep = drt.system_health.lock().uptime();
// Update the gauge again
drt.system_health.lock().unwrap().update_uptime_gauge();
drt.system_health.lock().update_uptime_gauge();
// Verify uptime increased by at least 100ms
let elapsed = uptime_after_sleep - initial_uptime;
......@@ -775,7 +768,7 @@ mod integration_tests {
// Register the endpoint and its health check payload
{
let system_health = drt.system_health.lock().unwrap();
let system_health = drt.system_health.lock();
system_health.register_health_check_target(
endpoint,
crate::component::Instance {
......@@ -804,7 +797,6 @@ mod integration_tests {
// Set endpoint to healthy state
drt.system_health
.lock()
.unwrap()
.set_endpoint_health_status(endpoint, HealthStatus::Ready);
// Check health again - should now be healthy
......@@ -822,7 +814,6 @@ mod integration_tests {
let endpoint_status = drt
.system_health
.lock()
.unwrap()
.get_endpoint_health_status(endpoint);
assert_eq!(
endpoint_status,
......
......@@ -24,7 +24,8 @@ use super::{CancellationToken, Result, Runtime, RuntimeConfig, error};
use futures::Future;
use once_cell::sync::OnceCell;
use std::{sync::Mutex, time::Duration};
use parking_lot::Mutex;
use std::time::Duration;
use tokio::{signal, task::JoinHandle};
static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
......@@ -190,7 +191,6 @@ impl Worker {
.get()
.expect("Application task not initialized")
.lock()
.unwrap()
.take()
.expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once")
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment