Unverified commit f30d76ce, authored by Graham King, committed by GitHub
Browse files

fix: KeyValueStore etcd impl should use internal etcd client (#4212)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent 23660bc5
......@@ -182,43 +182,18 @@ impl EtcdBucket {
let k = make_key(&self.bucket_name, key);
tracing::trace!("etcd create: {k}");
// Use atomic transaction to check and create in one operation
let put_options = PutOptions::new().with_lease(self.client.lease_id() as i64);
// Build transaction that creates key only if it doesn't exist
let txn = Txn::new()
.when(vec![Compare::version(k.as_str(), CompareOp::Equal, 0)]) // Atomic check
.and_then(vec![TxnOp::put(k.as_str(), value, Some(put_options))]) // Only if check passes
.or_else(vec![
TxnOp::get(k.as_str(), None), // Key exists, get its info
]);
// Execute the transaction
let result = self
match self
.client
.etcd_client()
.kv_client()
.txn(txn)
.kv_create(k.as_str(), value.into(), None)
.await
.map_err(|e| StoreError::EtcdError(e.to_string()))?;
if result.succeeded() {
// Key was created successfully
return Ok(StoreOutcome::Created(1)); // version of new key is always 1
}
// Key already existed, get its version
if let Some(etcd_client::TxnOpResponse::Get(get_resp)) =
result.op_responses().into_iter().next()
&& let Some(kv) = get_resp.kvs().first()
.map_err(|e| StoreError::EtcdError(e.to_string()))?
{
let version = kv.version() as u64;
return Ok(StoreOutcome::Exists(version));
None => {
// Key was created successfully
Ok(StoreOutcome::Created(1)) // version of new key is always 1
}
Some(revision) => Ok(StoreOutcome::Exists(revision)),
}
// Shouldn't happen, but handle edge case
Err(StoreError::EtcdError(
"Unexpected transaction response".to_string(),
))
}
async fn update(
......
......@@ -112,7 +112,7 @@ impl Client {
/// Get a clone of the underlying [`etcd_client::Client`] instance.
/// This returns a clone since the client is behind an RwLock.
pub fn etcd_client(&self) -> etcd_client::Client {
fn etcd_client(&self) -> etcd_client::Client {
self.connector.get_client()
}
......@@ -121,28 +121,48 @@ impl Client {
self.primary_lease
}
pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
/// Returns Ok(None) if value was created, Ok(Some(revision)) if the value already exists.
pub async fn kv_create(
&self,
key: &str,
value: Vec<u8>,
lease_id: Option<u64>,
) -> Result<Option<u64>> {
let id = lease_id.unwrap_or(self.lease_id());
let put_options = PutOptions::new().with_lease(id as i64);
// Build the transaction
// Build transaction that creates key only if it doesn't exist
let txn = Txn::new()
.when(vec![Compare::version(key, CompareOp::Equal, 0)]) // Ensure the lock does not exist
.and_then(vec![
TxnOp::put(key, value, Some(put_options)), // Create the object
])
.or_else(vec![
TxnOp::get(key, None), // Key exists, get its info
]);
// Execute the transaction
let result = self.connector.get_client().kv_client().txn(txn).await?;
// Created
if result.succeeded() {
Ok(())
} else {
for resp in result.op_responses() {
tracing::warn!(response = ?resp, "kv_create etcd op response");
}
anyhow::bail!("Unable to create key. Check etcd server status")
return Ok(None);
}
// Already exists
if let Some(etcd_client::TxnOpResponse::Get(get_resp)) =
result.op_responses().into_iter().next()
&& let Some(kv) = get_resp.kvs().first()
{
let version = kv.version() as u64;
return Ok(Some(version));
}
// Error
for resp in result.op_responses() {
tracing::warn!(response = ?resp, "kv_create etcd op response");
}
anyhow::bail!("Unable to create key. Check etcd server status")
}
/// Atomically create a key if it does not exist, or validate the values are identical if the key exists.
......
......@@ -90,15 +90,11 @@ async fn create_barrier_key<T: Serialize>(
let serialized_data =
serde_json::to_vec(&data).map_err(LeaderWorkerBarrierError::SerdeError)?;
// TODO: This can fail for many reasons, the most common of which is that the key already exists.
// Currently, the ETCD client returns a very generic error, so we can't distinguish between the them.
// For now, just assume it's because the key already exists.
client
.kv_create(key, serialized_data, lease_id)
.await
.map_err(|_| LeaderWorkerBarrierError::IdNotUnique)?;
Ok(())
match client.kv_create(key, serialized_data, lease_id).await {
Ok(None) => Ok(()),
Ok(Some(_)) => Err(LeaderWorkerBarrierError::IdNotUnique),
Err(err) => Err(LeaderWorkerBarrierError::EtcdError(err)),
}
}
/// Waits for a single key to appear (used for completion/abort signals)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment