Unverified Commit 4abab20f authored by Jacky's avatar Jacky Committed by GitHub
Browse files

refactor: Update inhibited instance removal logic (#1548)

parent 250ed733
...@@ -58,7 +58,7 @@ pub struct Client { ...@@ -58,7 +58,7 @@ pub struct Client {
// These are the remotes I know about from watching etcd // These are the remotes I know about from watching etcd
pub instance_source: Arc<InstanceSource>, pub instance_source: Arc<InstanceSource>,
// These are the instances that are reported as down from sending rpc // These are the instances that are reported as down from sending rpc
instance_inhibited: Arc<Mutex<HashMap<i64, u64>>>, instance_inhibited: Arc<Mutex<HashMap<i64, std::time::Instant>>>,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
...@@ -138,26 +138,26 @@ impl Client { ...@@ -138,26 +138,26 @@ impl Client {
pub async fn instances_avail(&self) -> Vec<Instance> { pub async fn instances_avail(&self) -> Vec<Instance> {
// TODO: Can we get the remaining TTL from the lease for the instance? // TODO: Can we get the remaining TTL from the lease for the instance?
const ETCD_LEASE_TTL: u64 = 10; // seconds const ETCD_LEASE_TTL: u64 = 10; // seconds
let now = std::time::SystemTime::now() let now = std::time::Instant::now();
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let instances = self.instances(); let instances = self.instances();
let mut inhibited = self.instance_inhibited.lock().await; let mut inhibited = self.instance_inhibited.lock().await;
instances // 1. Remove inhibited instances that are no longer in `self.instances()`
// 2. Remove inhibited instances that have expired
// 3. Only return instances that are not inhibited after removals
let mut new_inhibited = HashMap::<i64, std::time::Instant>::new();
let filtered = instances
.into_iter() .into_iter()
.filter_map(|instance| { .filter_map(|instance| {
let id = instance.id(); let id = instance.id();
if let Some(&timestamp) = inhibited.get(&id) { if let Some(&timestamp) = inhibited.get(&id) {
// If the inhibition is stale, remove it and include the instance if now.duration_since(timestamp).as_secs() > ETCD_LEASE_TTL {
if now.saturating_sub(timestamp) > ETCD_LEASE_TTL {
tracing::debug!("instance {id} stale inhibition"); tracing::debug!("instance {id} stale inhibition");
inhibited.remove(&id);
Some(instance) Some(instance)
} else { } else {
tracing::debug!("instance {id} is inhibited"); tracing::debug!("instance {id} is inhibited");
new_inhibited.insert(id, timestamp);
None None
} }
} else { } else {
...@@ -165,15 +165,15 @@ impl Client { ...@@ -165,15 +165,15 @@ impl Client {
Some(instance) Some(instance)
} }
}) })
.collect() .collect();
*inhibited = new_inhibited;
filtered
} }
/// Mark an instance as down/unavailable /// Mark an instance as down/unavailable
pub async fn report_instance_down(&self, instance_id: i64) { pub async fn report_instance_down(&self, instance_id: i64) {
let now = std::time::SystemTime::now() let now = std::time::Instant::now();
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut inhibited = self.instance_inhibited.lock().await; let mut inhibited = self.instance_inhibited.lock().await;
inhibited.insert(instance_id, now); inhibited.insert(instance_id, now);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment