Unverified commit f56483cb, authored by mohammedabdulwahhab, committed by GitHub
Browse files

fix: prevent deadlock in etcd when flushing initial keys (#5091)


Signed-off-by: mohammedabdulwahhab <furkhan324@berkeley.edu>
parent 651569ff
...@@ -348,17 +348,23 @@ impl Client { ...@@ -348,17 +348,23 @@ impl Client {
prefix: impl AsRef<str> + std::fmt::Display, prefix: impl AsRef<str> + std::fmt::Display,
include_existing: bool, include_existing: bool,
) -> Result<PrefixWatcher> { ) -> Result<PrefixWatcher> {
let (tx, rx) = mpsc::channel(32); let (mut start_revision, existing_kvs) = self
.get_start_revision(prefix.as_ref(), include_existing)
// Get start revision and send existing KVs
let mut start_revision = self
.get_start_revision(
prefix.as_ref(),
if include_existing { Some(&tx) } else { None },
)
.await?; .await?;
// Resilience watch stream in background // Size channel to fit all existing KVs (avoids deadlock when sending before return)
let existing_count = existing_kvs.as_ref().map_or(0, |kvs| kvs.len());
let (tx, rx) = mpsc::channel(existing_count + 32);
// Send existing KVs before returning so they're immediately available to consumers
if let Some(kvs) = existing_kvs {
tracing::trace!("sending {} existing kvs", kvs.len());
for kv in kvs {
tx.send(WatchEvent::Put(kv)).await?;
}
}
// Watch for new events in background
let connector = self.connector.clone(); let connector = self.connector.clone();
let prefix_str = prefix.as_ref().to_string(); let prefix_str = prefix.as_ref().to_string();
self.rt.spawn(async move { self.rt.spawn(async move {
...@@ -384,15 +390,12 @@ impl Client { ...@@ -384,15 +390,12 @@ impl Client {
}) })
} }
/// Fetch the initial revision for watching and optionally send existing key-values. /// Fetch the start revision and optionally return existing key-values.
///
/// Returns the next revision to watch from. If `existing_kvs_tx` is provided,
/// all existing keys with the prefix are sent through the channel first.
async fn get_start_revision( async fn get_start_revision(
&self, &self,
prefix: impl AsRef<str> + std::fmt::Display, prefix: impl AsRef<str> + std::fmt::Display,
existing_kvs_tx: Option<&mpsc::Sender<WatchEvent>>, include_existing: bool,
) -> Result<i64> { ) -> Result<(i64, Option<Vec<KeyValue>>)> {
let mut kv_client = self.connector.get_client().kv_client(); let mut kv_client = self.connector.get_client().kv_client();
let mut get_response = kv_client let mut get_response = kv_client
.get(prefix.as_ref(), Some(GetOptions::new().with_prefix())) .get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
...@@ -406,16 +409,14 @@ impl Client { ...@@ -406,16 +409,14 @@ impl Client {
tracing::trace!("{prefix}: start_revision: {start_revision}"); tracing::trace!("{prefix}: start_revision: {start_revision}");
start_revision += 1; start_revision += 1;
// Send existing KVs from response if requested // Return existing KVs if requested
if let Some(tx) = existing_kvs_tx { let existing_kvs = include_existing.then(|| {
let kvs = get_response.take_kvs(); let kvs = get_response.take_kvs();
tracing::trace!("initial kv count: {:?}", kvs.len()); tracing::trace!("initial kv count: {:?}", kvs.len());
for kv in kvs.into_iter() { kvs
tx.send(WatchEvent::Put(kv)).await?; });
}
}
Ok(start_revision) Ok((start_revision, existing_kvs))
} }
/// Establish a new watch stream with automatic retry and reconnection. /// Establish a new watch stream with automatic retry and reconnection.
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment