Unverified Commit f05cabac authored by MatejKosec's avatar MatejKosec Committed by GitHub
Browse files

perf(discovery): replace polling with Notify in concurrent registration wait (#8291)


Signed-off-by: default avatarMatej Kosec <mkosec@nvidia.com>
parent 0d5bf022
...@@ -82,6 +82,9 @@ pub struct ModelWatcher { ...@@ -82,6 +82,9 @@ pub struct ModelWatcher {
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
/// Guards against concurrent pipeline construction for the same (model, namespace). /// Guards against concurrent pipeline construction for the same (model, namespace).
registering_worker_sets: DashSet<String>, registering_worker_sets: DashSet<String>,
/// Wakes tasks blocked in `recover_concurrent_registration` when a
/// `RegistrationGuard` drops (i.e. a registration completes or panics).
registration_notify: Notify,
/// Tracks in-flight `handle_put` tasks by instance path so that `handle_delete` /// Tracks in-flight `handle_put` tasks by instance path so that `handle_delete`
/// can await a racing put before proceeding with cleanup. /// can await a racing put before proceeding with cleanup.
pending_puts: DashMap<String, JoinHandle<()>>, pending_puts: DashMap<String, JoinHandle<()>>,
...@@ -121,17 +124,20 @@ fn is_model_type_list_empty(manager: &ModelManager, model_type: ModelType) -> bo ...@@ -121,17 +124,20 @@ fn is_model_type_list_empty(manager: &ModelManager, model_type: ModelType) -> bo
} }
} }
/// RAII guard that removes a key from a `DashSet` on drop. /// RAII guard that removes a key from a `DashSet` on drop and wakes any tasks
/// waiting for the registration to finish via the shared [`Notify`].
/// Ensures `registering_worker_sets` is cleaned up even if the registration /// Ensures `registering_worker_sets` is cleaned up even if the registration
/// task panics, preventing permanent poisoning of the registration key. /// task panics, preventing permanent poisoning of the registration key.
struct RegistrationGuard<'a> { struct RegistrationGuard<'a> {
set: &'a DashSet<String>, set: &'a DashSet<String>,
key: String, key: String,
notify: &'a Notify,
} }
impl Drop for RegistrationGuard<'_> { impl Drop for RegistrationGuard<'_> {
fn drop(&mut self) { fn drop(&mut self) {
self.set.remove(&self.key); self.set.remove(&self.key);
self.notify.notify_waiters();
} }
} }
...@@ -159,6 +165,7 @@ impl ModelWatcher { ...@@ -159,6 +165,7 @@ impl ModelWatcher {
prefill_load_estimator, prefill_load_estimator,
metrics, metrics,
registering_worker_sets: DashSet::new(), registering_worker_sets: DashSet::new(),
registration_notify: Notify::new(),
pending_puts: DashMap::new(), pending_puts: DashMap::new(),
} }
} }
...@@ -478,9 +485,11 @@ impl ModelWatcher { ...@@ -478,9 +485,11 @@ impl ModelWatcher {
// RAII guard ensures the registration key is removed even if // RAII guard ensures the registration key is removed even if
// do_worker_set_registration panics, preventing permanent poisoning. // do_worker_set_registration panics, preventing permanent poisoning.
// It also wakes any waiters in recover_concurrent_registration.
let _guard = RegistrationGuard { let _guard = RegistrationGuard {
set: &self.registering_worker_sets, set: &self.registering_worker_sets,
key: registration_key, key: registration_key,
notify: &self.registration_notify,
}; };
self.do_worker_set_registration(mcid, card).await self.do_worker_set_registration(mcid, card).await
...@@ -504,10 +513,31 @@ impl ModelWatcher { ...@@ -504,10 +513,31 @@ impl ModelWatcher {
// Wait for the in-flight registration to complete so we can validate // Wait for the in-flight registration to complete so we can validate
// the new worker's checksum. Without this, a concurrent worker with a // the new worker's checksum. Without this, a concurrent worker with a
// mismatched checksum could sneak past the early check in `watch`. // mismatched checksum could sneak past the early check in `watch`.
let mut attempts = 0; //
while self.registering_worker_sets.contains(registration_key) && attempts < 300 { // Uses a Notify + enable() loop instead of polling to wake up
tokio::time::sleep(Duration::from_millis(100)).await; // immediately when the RegistrationGuard drops, avoiding up to 100ms
attempts += 1; // of unnecessary latency and wasted CPU cycles.
// An absolute deadline ensures spurious wakeups (from unrelated
// registrations sharing the same Notify) cannot extend the wait
// beyond 30 seconds.
let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
loop {
let notified = self.registration_notify.notified();
tokio::pin!(notified);
// Register interest in the notification BEFORE checking the
// condition to avoid a race where the guard drops between
// our check and the .await.
notified.as_mut().enable();
if !self.registering_worker_sets.contains(registration_key) {
break;
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
if tokio::time::timeout(remaining, notified).await.is_err() {
break;
}
} }
// Validate checksum against the registered model // Validate checksum against the registered model
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment