"runtime/rust/vscode:/vscode.git/clone" did not exist on "7f85dcc3f616d5d4a77ccb89bf7e5f1abe9888c9"
Commit 8f741f14 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

feat: adding etcd method kv_create_or_validate (#249)

What does the PR do?
 - adds etcd method to atomic create or validate a kv entry.
 - adds integration tests to validate the behavior
parent 4f6f63cd
......@@ -23,7 +23,9 @@ use tokio::sync::mpsc;
use tracing as log;
use validator::Validate;
use etcd_client::{Compare, CompareOp, GetOptions, PutOptions, Txn, TxnOp, WatchOptions, Watcher};
use etcd_client::{
Compare, CompareOp, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, WatchOptions, Watcher,
};
pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
......@@ -163,7 +165,74 @@ impl Client {
]);
// Execute the transaction
let _ = self.client.kv_client().txn(txn).await?;
let result = self.client.kv_client().txn(txn).await?;
match result.succeeded() {
true => Ok(()),
false => Err(error!("failed to create key")),
}
}
/// Atomically create a key if it does not exist, or validate the values are identical if the key exists.
pub async fn kv_create_or_validate(
&self,
key: String,
value: Vec<u8>,
lease_id: Option<i64>,
) -> Result<()> {
let put_options = lease_id.map(|id| PutOptions::new().with_lease(id));
// Build the transaction that either creates the key if it doesn't exist,
// or validates the existing value matches what we expect
let txn = Txn::new()
.when(vec![Compare::version(key.as_str(), CompareOp::Equal, 0)]) // Key doesn't exist
.and_then(vec![
TxnOp::put(key.as_str(), value.clone(), put_options), // Create it
])
.or_else(vec![
// If key exists but values don't match, this will fail the transaction
TxnOp::txn(Txn::new().when(vec![Compare::value(
key.as_str(),
CompareOp::Equal,
value.clone(),
)])),
]);
// Execute the transaction
let result = self.client.kv_client().txn(txn).await?;
// We have to enumerate the response paths to determine if the transaction succeeded
if result.succeeded() {
Ok(())
} else {
match result.op_responses().first() {
Some(response) => match response {
TxnOpResponse::Txn(response) => match response.succeeded() {
true => Ok(()),
false => Err(error!("failed to create or validate key")),
},
_ => Err(error!("unexpected response type")),
},
None => Err(error!("failed to create or validate key")),
}
}
}
pub async fn kv_put(
&self,
key: impl AsRef<str>,
value: impl AsRef<[u8]>,
lease_id: Option<i64>,
) -> Result<()> {
let _ = self
.client
.kv_client()
.put(
key.as_ref(),
value.as_ref(),
lease_id.map(|id| PutOptions::new().with_lease(id)),
)
.await?;
Ok(())
}
......@@ -200,7 +269,8 @@ impl Client {
Some(
WatchOptions::new()
.with_prefix()
.with_start_revision(start_revision),
.with_start_revision(start_revision)
.with_prev_key(),
),
)
.await?;
......@@ -295,3 +365,58 @@ fn default_servers() -> Vec<String> {
Err(_) => vec!["http://localhost:2379".to_string()],
}
}
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
use crate::{distributed::DistributedConfig, DistributedRuntime};
use super::*;
#[test]
fn test_ectd_client() {
let rt = Runtime::from_settings().unwrap();
let rt_clone = rt.clone();
let config = DistributedConfig::from_settings();
rt_clone.primary().block_on(async move {
let drt = DistributedRuntime::new(rt, config).await.unwrap();
test_kv_create_or_validate(drt).await.unwrap();
});
}
async fn test_kv_create_or_validate(drt: DistributedRuntime) -> Result<()> {
let key = "__integration_test_key";
let value = b"test_value";
let client = drt.etcd_client();
let lease_id = drt.primary_lease().id();
// Create the key
let result = client
.kv_create(key.to_string(), value.to_vec(), Some(lease_id))
.await;
assert!(result.is_ok(), "");
// Try to create the key again - this should fail
let result = client
.kv_create(key.to_string(), value.to_vec(), Some(lease_id))
.await;
assert!(result.is_err());
// Create or validate should succeed as the values match
let result = client
.kv_create_or_validate(key.to_string(), value.to_vec(), Some(lease_id))
.await;
assert!(result.is_ok());
// Try to create the key with a different value
let different_value = b"different_value";
let result = client
.kv_create_or_validate(key.to_string(), different_value.to_vec(), Some(lease_id))
.await;
assert!(result.is_err(), "");
Ok(())
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment