Unverified Commit 6b75d6b0 authored by Michael Feil's avatar Michael Feil Committed by GitHub
Browse files

fix: recovery etc order, full recovery, thundering herd on same worker ids in same order. (#8016)


Signed-off-by: default avatarmichaelfeil <63565275+michaelfeil@users.noreply.github.com>
parent 6c877e4d
...@@ -18,6 +18,7 @@ use dynamo_runtime::protocols::maybe_error::MaybeError; ...@@ -18,6 +18,7 @@ use dynamo_runtime::protocols::maybe_error::MaybeError;
use dynamo_runtime::stream; use dynamo_runtime::stream;
use dynamo_runtime::traits::DistributedRuntimeProvider; use dynamo_runtime::traits::DistributedRuntimeProvider;
use futures::StreamExt; use futures::StreamExt;
use rand::Rng;
use tokio::sync::{Mutex, Semaphore}; use tokio::sync::{Mutex, Semaphore};
use super::Indexer; use super::Indexer;
...@@ -459,6 +460,14 @@ impl WorkerQueryClient { ...@@ -459,6 +460,14 @@ impl WorkerQueryClient {
let client = self.clone(); let client = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
// Add jitter only for full-restore (start_event_id is None)
// to permute semaphore acquisition order and reduce thundering herd risk on initial discovery.
// This distributes load when multiple routers start simultaneously.
if start_event_id.is_none() {
let jitter_us = rand::rng().random_range(0..3000u64);
tokio::time::sleep(Duration::from_micros(jitter_us)).await;
}
let Ok(_permit) = client.recovery_semaphore.clone().acquire_owned().await else { let Ok(_permit) = client.recovery_semaphore.clone().acquire_owned().await else {
return; return;
}; };
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment