Unverified Commit e17b0460 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix(etcd): KeyValueStore etcd uses our client not the lib directly (#4199)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 560bb2fc
...@@ -7,7 +7,7 @@ use std::time::Duration; ...@@ -7,7 +7,7 @@ use std::time::Duration;
use crate::{ use crate::{
storage::key_value_store::{Key, KeyValue, WatchEvent}, storage::key_value_store::{Key, KeyValue, WatchEvent},
transports::etcd::Client, transports::etcd,
}; };
use async_stream::stream; use async_stream::stream;
use async_trait::async_trait; use async_trait::async_trait;
...@@ -17,11 +17,11 @@ use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome}; ...@@ -17,11 +17,11 @@ use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome};
#[derive(Clone)] #[derive(Clone)]
pub struct EtcdStore { pub struct EtcdStore {
client: Client, client: etcd::Client,
} }
impl EtcdStore { impl EtcdStore {
pub fn new(client: Client) -> Self { pub fn new(client: etcd::Client) -> Self {
Self { client } Self { client }
} }
} }
...@@ -61,7 +61,7 @@ impl KeyValueStore for EtcdStore { ...@@ -61,7 +61,7 @@ impl KeyValueStore for EtcdStore {
} }
pub struct EtcdBucket { pub struct EtcdBucket {
client: Client, client: etcd::Client,
bucket_name: String, bucket_name: String,
} }
...@@ -114,36 +114,37 @@ impl KeyValueBucket for EtcdBucket { ...@@ -114,36 +114,37 @@ impl KeyValueBucket for EtcdBucket {
) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> { ) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + 'life0>>, StoreError> {
let prefix = make_key(&self.bucket_name, &"".into()); let prefix = make_key(&self.bucket_name, &"".into());
tracing::trace!("etcd watch: {prefix}"); tracing::trace!("etcd watch: {prefix}");
let (watcher, mut watch_stream) = self let watcher = self
.client .client
.etcd_client() .kv_watch_prefix(&prefix)
.clone()
.watch(prefix.as_bytes(), Some(WatchOptions::new().with_prefix()))
.await .await
.map_err(|e| StoreError::EtcdError(e.to_string()))?; .map_err(|e| StoreError::EtcdError(e.to_string()))?;
let (_, mut watch_stream) = watcher.dissolve();
let output = stream! { let output = stream! {
let _watcher = watcher; // Keep it alive. Not sure if necessary. while let Some(event) = watch_stream.recv().await {
while let Ok(Some(resp)) = watch_stream.message().await { match event {
for e in resp.events() { etcd::WatchEvent::Put(kv) => {
let Some(kv) = e.kv() else { let (k, v) = kv.into_key_value();
continue; let key = match String::from_utf8(k) {
};
let (k_bytes, v_bytes) = kv.clone().into_key_value();
let key = match String::from_utf8(k_bytes) {
Ok(k) => k, Ok(k) => k,
Err(err) => { Err(err) => {
tracing::error!(%err, prefix, "Invalid UTF8 in etcd key"); tracing::error!(%err, prefix, "Invalid UTF8 in etcd key");
continue; continue;
} }
}; };
match e.event_type() { let item = KeyValue::new(key, v.into());
EventType::Put => {
let item = KeyValue::new(key, v_bytes.into());
yield WatchEvent::Put(item); yield WatchEvent::Put(item);
} }
EventType::Delete => { etcd::WatchEvent::Delete(kv) => {
yield WatchEvent::Delete(Key::from_raw(key)); let (k, _) = kv.into_key_value();
let key = match String::from_utf8(k) {
Ok(k) => k,
Err(err) => {
tracing::error!(%err, prefix, "Invalid UTF8 in etcd key");
continue;
} }
};
yield WatchEvent::Delete(Key::from_raw(key));
} }
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment