Unverified Commit a09ca3ec authored by Jacky's avatar Jacky Committed by GitHub
Browse files

feat: FT downed worker instance tracking and skipping (#1424)

parent 648740e8
......@@ -55,8 +55,10 @@ enum EndpointEvent {
pub struct Client {
// This is me
pub endpoint: Endpoint,
// These are the remotes I know about
// These are the remotes I know about from watching etcd
pub instance_source: Arc<InstanceSource>,
// These are the instances that are reported as down from sending rpc
instance_inhibited: Arc<Mutex<HashMap<i64, u64>>>,
}
#[derive(Clone, Debug)]
......@@ -65,12 +67,15 @@ pub enum InstanceSource {
Dynamic(tokio::sync::watch::Receiver<Vec<Instance>>),
}
// TODO: Avoid returning a full clone of `Vec<Instance>` everytime from Client
// See instances() and instances_avail() methods
impl Client {
// Client will only talk to a single static endpoint
pub(crate) async fn new_static(endpoint: Endpoint) -> Result<Self> {
Ok(Client {
endpoint,
instance_source: Arc::new(InstanceSource::Static),
instance_inhibited: Arc::new(Mutex::new(HashMap::new())),
})
}
......@@ -87,6 +92,7 @@ impl Client {
Ok(Client {
endpoint,
instance_source,
instance_inhibited: Arc::new(Mutex::new(HashMap::new())),
})
}
......@@ -99,6 +105,7 @@ impl Client {
self.endpoint.etcd_root()
}
/// Instances available from watching etcd
pub fn instances(&self) -> Vec<Instance> {
match self.instance_source.as_ref() {
InstanceSource::Static => vec![],
......@@ -127,6 +134,53 @@ impl Client {
Ok(instances)
}
/// Instances available from watching etcd minus those reported as down
pub async fn instances_avail(&self) -> Vec<Instance> {
// TODO: Can we get the remaining TTL from the lease for the instance?
const ETCD_LEASE_TTL: u64 = 10; // seconds
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let instances = self.instances();
let mut inhibited = self.instance_inhibited.lock().await;
instances
.into_iter()
.filter_map(|instance| {
let id = instance.id();
if let Some(&timestamp) = inhibited.get(&id) {
// If the inhibition is stale, remove it and include the instance
if now.saturating_sub(timestamp) > ETCD_LEASE_TTL {
tracing::debug!("instance {id} stale inhibition");
inhibited.remove(&id);
Some(instance)
} else {
tracing::debug!("instance {id} is inhibited");
None
}
} else {
tracing::debug!("instance {id} not inhibited");
Some(instance)
}
})
.collect()
}
/// Mark an instance as down/unavailable
pub async fn report_instance_down(&self, instance_id: i64) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut inhibited = self.instance_inhibited.lock().await;
inhibited.insert(instance_id, now);
tracing::debug!("inhibiting instance {instance_id}");
}
/// Is this component know at startup and not discovered via etcd?
pub fn is_static(&self) -> bool {
matches!(self.instance_source.as_ref(), InstanceSource::Static)
......
......@@ -13,10 +13,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_nats::client::{
RequestError as NatsRequestError, RequestErrorKind::NoResponders as NatsNoResponders,
};
use async_trait::async_trait;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::{
future::Future,
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering},
......@@ -27,7 +31,9 @@ use std::{
use crate::{
component::{Client, Endpoint, InstanceSource},
engine::{AsyncEngine, Data},
pipeline::{AddressedPushRouter, AddressedRequest, Error, ManyOut, SingleIn},
pipeline::{
error::PipelineErrorExt, AddressedPushRouter, AddressedRequest, Error, ManyOut, SingleIn,
},
traits::DistributedRuntimeProvider,
};
......@@ -103,15 +109,17 @@ where
/// Issue a request to the next available instance in a round-robin fashion
pub async fn round_robin(&self, request: SingleIn<T>) -> anyhow::Result<ManyOut<U>> {
let counter = self.round_robin_counter.fetch_add(1, Ordering::Relaxed);
let slf = self;
let routing_algorithm = move || async move {
let counter = slf.round_robin_counter.fetch_add(1, Ordering::Relaxed);
let instance_id = {
let instances = self.client.instances();
let instances = slf.client.instances_avail().await;
let count = instances.len();
if count == 0 {
return Err(anyhow::anyhow!(
"no instances found for endpoint {:?}",
self.client.endpoint.etcd_root()
slf.client.endpoint.etcd_root()
));
}
let offset = counter % count as u64;
......@@ -119,21 +127,23 @@ where
};
tracing::trace!("round robin router selected {instance_id}");
let subject = self.client.endpoint.subject_to(instance_id);
let request = request.map(|req| AddressedRequest::new(req, subject));
self.addressed.generate(request).await
Ok(instance_id)
};
self.generate_with_fault_tolerance(routing_algorithm, request)
.await
}
/// Issue a request to a random endpoint
pub async fn random(&self, request: SingleIn<T>) -> anyhow::Result<ManyOut<U>> {
let slf = self;
let routing_algorithm = move || async move {
let instance_id = {
let instances = self.client.instances();
let instances = slf.client.instances_avail().await;
let count = instances.len();
if count == 0 {
return Err(anyhow::anyhow!(
"no instances found for endpoint {:?}",
self.client.endpoint.etcd_root()
slf.client.endpoint.etcd_root()
));
}
let counter = rand::rng().random::<u64>();
......@@ -141,11 +151,10 @@ where
instances[offset as usize].id()
};
tracing::trace!("random router selected {instance_id}");
let subject = self.client.endpoint.subject_to(instance_id);
let request = request.map(|req| AddressedRequest::new(req, subject));
self.addressed.generate(request).await
Ok(instance_id)
};
self.generate_with_fault_tolerance(routing_algorithm, request)
.await
}
/// Issue a request to a specific endpoint
......@@ -154,22 +163,23 @@ where
request: SingleIn<T>,
instance_id: i64,
) -> anyhow::Result<ManyOut<U>> {
let slf = self;
let routing_algorithm = move || async move {
let found = {
let instances = self.client.instances();
let instances = slf.client.instances_avail().await;
instances.iter().any(|ep| ep.id() == instance_id)
};
if !found {
return Err(anyhow::anyhow!(
"instance_id={instance_id} not found for endpoint {:?}",
self.client.endpoint.etcd_root()
slf.client.endpoint.etcd_root()
));
}
let subject = self.client.endpoint.subject_to(instance_id);
let request = request.map(|req| AddressedRequest::new(req, subject));
self.addressed.generate(request).await
Ok(instance_id)
};
self.generate_with_fault_tolerance(routing_algorithm, request)
.await
}
pub async fn r#static(&self, request: SingleIn<T>) -> anyhow::Result<ManyOut<U>> {
......@@ -179,6 +189,31 @@ where
tracing::debug!("router generate");
self.addressed.generate(request).await
}
async fn generate_with_fault_tolerance<F, R>(
&self,
routing_algorithm: F,
request: SingleIn<T>,
) -> anyhow::Result<ManyOut<U>>
where
F: FnOnce() -> R,
R: Future<Output = anyhow::Result<i64>>,
{
let instance_id = routing_algorithm().await?;
let subject = self.client.endpoint.subject_to(instance_id);
let request = request.map(|req| AddressedRequest::new(req, subject));
let stream = self.addressed.generate(request).await;
if let Some(err) = stream.as_ref().err() {
if let Some(req_err) = err.downcast_ref::<NatsRequestError>() {
if matches!(req_err.kind(), NatsNoResponders) {
self.client.report_instance_down(instance_id).await;
}
}
}
stream
}
}
#[async_trait]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment