"lib/llm/vscode:/vscode.git/clone" did not exist on "d58a68819cb73668e8f6cd74b59dfd3fa9ff63af"
Unverified Commit 7731b024 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore: Use KeyValueStoreManager instead of etcd::Client (#3822)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 6f9be594
...@@ -2,7 +2,9 @@ ...@@ -2,7 +2,9 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
pub use crate::component::Component; pub use crate::component::Component;
use crate::storage::key_value_store::{EtcdStore, KeyValueStore, MemoryStore}; use crate::storage::key_value_store::{
EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, MemoryStore,
};
use crate::transports::nats::DRTNatsClientPrometheusMetrics; use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{ use crate::{
ErrorContext, PrometheusUpdateCallback, ErrorContext, PrometheusUpdateCallback,
...@@ -46,12 +48,10 @@ impl DistributedRuntime { ...@@ -46,12 +48,10 @@ impl DistributedRuntime {
let runtime_clone = runtime.clone(); let runtime_clone = runtime.clone();
let (etcd_client, store) = if is_static { let (etcd_client, store) = if is_static {
let store: Arc<dyn KeyValueStore> = Arc::new(MemoryStore::new()); (None, KeyValueStoreManager::memory())
(None, store)
} else { } else {
let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?; let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?;
let store: Arc<dyn KeyValueStore> = Arc::new(EtcdStore::new(etcd_client.clone())); let store = KeyValueStoreManager::etcd(etcd_client.clone());
(Some(etcd_client), store) (Some(etcd_client), store)
}; };
...@@ -278,10 +278,16 @@ impl DistributedRuntime { ...@@ -278,10 +278,16 @@ impl DistributedRuntime {
self.etcd_client.clone() self.etcd_client.clone()
} }
// Deprecated but our CI blocks us using the feature currently.
//#[deprecated(note = "Use KeyValueStoreManager via store(); this will be removed")]
pub fn deprecated_etcd_client(&self) -> Option<etcd::Client> {
self.etcd_client.clone()
}
/// An interface to store things. Will eventually replace `etcd_client`. /// An interface to store things. Will eventually replace `etcd_client`.
/// Currently does key-value, but will grow to include whatever we need to store. /// Currently does key-value, but will grow to include whatever we need to store.
pub fn store(&self) -> Arc<dyn KeyValueStore> { pub fn store(&self) -> &KeyValueStoreManager {
self.store.clone() &self.store
} }
pub fn child_token(&self) -> CancellationToken { pub fn child_token(&self) -> CancellationToken {
......
...@@ -484,7 +484,7 @@ mod integration_tests { ...@@ -484,7 +484,7 @@ mod integration_tests {
component: "test_component".to_string(), component: "test_component".to_string(),
endpoint: format!("test_endpoint_{}", i), endpoint: format!("test_endpoint_{}", i),
namespace: "test_namespace".to_string(), namespace: "test_namespace".to_string(),
instance_id: i as i64, instance_id: i,
transport: crate::component::TransportType::NatsTcp(endpoint.clone()), transport: crate::component::TransportType::NatsTcp(endpoint.clone()),
}, },
payload, payload,
......
...@@ -10,10 +10,10 @@ ...@@ -10,10 +10,10 @@
use std::sync::Arc; use std::sync::Arc;
use crate::component::{INSTANCE_ROOT_PATH, Instance}; use crate::component::{INSTANCE_ROOT_PATH, Instance};
use crate::storage::key_value_store::KeyValueStore; use crate::storage::key_value_store::{KeyValueStore, KeyValueStoreManager};
use crate::transports::etcd::Client as EtcdClient; use crate::transports::etcd::Client as EtcdClient;
pub async fn list_all_instances(client: Arc<dyn KeyValueStore>) -> anyhow::Result<Vec<Instance>> { pub async fn list_all_instances(client: &KeyValueStoreManager) -> anyhow::Result<Vec<Instance>> {
let Some(bucket) = client.get_bucket(INSTANCE_ROOT_PATH).await? else { let Some(bucket) = client.get_bucket(INSTANCE_ROOT_PATH).await? else {
return Ok(vec![]); return Ok(vec![]);
}; };
......
...@@ -52,7 +52,8 @@ pub use tokio_util::sync::CancellationToken; ...@@ -52,7 +52,8 @@ pub use tokio_util::sync::CancellationToken;
pub use worker::Worker; pub use worker::Worker;
use crate::{ use crate::{
metrics::prometheus_names::distributed_runtime, storage::key_value_store::KeyValueStore, metrics::prometheus_names::distributed_runtime,
storage::key_value_store::{KeyValueStore, KeyValueStoreManager},
}; };
use component::{Endpoint, InstanceSource}; use component::{Endpoint, InstanceSource};
...@@ -188,7 +189,7 @@ pub struct DistributedRuntime { ...@@ -188,7 +189,7 @@ pub struct DistributedRuntime {
// we might consider a unifed transport manager here // we might consider a unifed transport manager here
etcd_client: Option<transports::etcd::Client>, etcd_client: Option<transports::etcd::Client>,
nats_client: Option<transports::nats::Client>, nats_client: Option<transports::nats::Client>,
store: Arc<dyn KeyValueStore>, store: KeyValueStoreManager,
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>, tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>, system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
......
...@@ -310,7 +310,7 @@ pub fn make_handle_payload_span( ...@@ -310,7 +310,7 @@ pub fn make_handle_payload_span(
component: &str, component: &str,
endpoint: &str, endpoint: &str,
namespace: &str, namespace: &str,
instance_id: i64, instance_id: u64,
) -> Span { ) -> Span {
let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers); let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
let trace_parent = TraceParent::from_headers(headers); let trace_parent = TraceParent::from_headers(headers);
......
...@@ -77,7 +77,7 @@ pub enum RouterMode { ...@@ -77,7 +77,7 @@ pub enum RouterMode {
#[default] #[default]
RoundRobin, RoundRobin,
Random, Random,
Direct(i64), Direct(u64),
// Marker value, KV routing itself is in dynamo-llm // Marker value, KV routing itself is in dynamo-llm
KV, KV,
} }
...@@ -181,7 +181,7 @@ where ...@@ -181,7 +181,7 @@ where
pub async fn direct( pub async fn direct(
&self, &self,
request: SingleIn<T>, request: SingleIn<T>,
instance_id: i64, instance_id: u64,
) -> anyhow::Result<ManyOut<U>> { ) -> anyhow::Result<ManyOut<U>> {
let found = self.client.instance_ids_avail().contains(&instance_id); let found = self.client.instance_ids_avail().contains(&instance_id);
...@@ -206,7 +206,7 @@ where ...@@ -206,7 +206,7 @@ where
async fn generate_with_fault_detection( async fn generate_with_fault_detection(
&self, &self,
instance_id: i64, instance_id: u64,
request: SingleIn<T>, request: SingleIn<T>,
) -> anyhow::Result<ManyOut<U>> { ) -> anyhow::Result<ManyOut<U>> {
// Check if all workers are busy (only if busy threshold is set) // Check if all workers are busy (only if busy threshold is set)
......
...@@ -39,7 +39,7 @@ impl PushEndpoint { ...@@ -39,7 +39,7 @@ impl PushEndpoint {
namespace: String, namespace: String,
component_name: String, component_name: String,
endpoint_name: String, endpoint_name: String,
instance_id: i64, instance_id: u64,
system_health: Arc<Mutex<SystemHealth>>, system_health: Arc<Mutex<SystemHealth>>,
) -> Result<()> { ) -> Result<()> {
let mut endpoint = endpoint; let mut endpoint = endpoint;
......
...@@ -64,26 +64,115 @@ impl From<&Key> for String { ...@@ -64,26 +64,115 @@ impl From<&Key> for String {
#[async_trait] #[async_trait]
pub trait KeyValueStore: Send + Sync { pub trait KeyValueStore: Send + Sync {
type Bucket: KeyValueBucket + Send + Sync + 'static;
async fn get_or_create_bucket( async fn get_or_create_bucket(
&self, &self,
bucket_name: &str, bucket_name: &str,
// auto-delete items older than this // auto-delete items older than this
ttl: Option<Duration>, ttl: Option<Duration>,
) -> Result<Box<dyn KeyValueBucket>, StoreError>; ) -> Result<Self::Bucket, StoreError>;
async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError>;
fn connection_id(&self) -> u64;
}
#[allow(clippy::large_enum_variant)]
pub enum KeyValueStoreEnum {
Memory(MemoryStore),
Nats(NATSStore),
Etcd(EtcdStore),
}
impl KeyValueStoreEnum {
async fn get_or_create_bucket(
&self,
bucket_name: &str,
// auto-delete items older than this
ttl: Option<Duration>,
) -> Result<Box<dyn KeyValueBucket>, StoreError> {
use KeyValueStoreEnum::*;
Ok(match self {
Memory(x) => Box::new(x.get_or_create_bucket(bucket_name, ttl).await?),
Nats(x) => Box::new(x.get_or_create_bucket(bucket_name, ttl).await?),
Etcd(x) => Box::new(x.get_or_create_bucket(bucket_name, ttl).await?),
})
}
async fn get_bucket( async fn get_bucket(
&self, &self,
bucket_name: &str, bucket_name: &str,
) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError>; ) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError> {
use KeyValueStoreEnum::*;
let maybe_bucket: Option<Box<dyn KeyValueBucket>> = match self {
Memory(x) => x
.get_bucket(bucket_name)
.await?
.map(|b| Box::new(b) as Box<dyn KeyValueBucket>),
Nats(x) => x
.get_bucket(bucket_name)
.await?
.map(|b| Box::new(b) as Box<dyn KeyValueBucket>),
Etcd(x) => x
.get_bucket(bucket_name)
.await?
.map(|b| Box::new(b) as Box<dyn KeyValueBucket>),
};
Ok(maybe_bucket)
}
fn connection_id(&self) -> u64; fn connection_id(&self) -> u64 {
use KeyValueStoreEnum::*;
match self {
Memory(x) => x.connection_id(),
Etcd(x) => x.connection_id(),
Nats(x) => x.connection_id(),
}
}
} }
pub struct KeyValueStoreManager(Box<dyn KeyValueStore>); #[derive(Clone)]
pub struct KeyValueStoreManager(Arc<KeyValueStoreEnum>);
impl Default for KeyValueStoreManager {
fn default() -> Self {
KeyValueStoreManager::memory()
}
}
impl KeyValueStoreManager { impl KeyValueStoreManager {
pub fn new(s: Box<dyn KeyValueStore>) -> KeyValueStoreManager { /// In-memory KeyValueStoreManager for testing
KeyValueStoreManager(s) pub fn memory() -> Self {
Self::new(KeyValueStoreEnum::Memory(MemoryStore::new()))
}
pub fn etcd(etcd_client: crate::transports::etcd::Client) -> Self {
Self::new(KeyValueStoreEnum::Etcd(EtcdStore::new(etcd_client)))
}
fn new(s: KeyValueStoreEnum) -> KeyValueStoreManager {
KeyValueStoreManager(Arc::new(s))
}
pub async fn get_or_create_bucket(
&self,
bucket_name: &str,
// auto-delete items older than this
ttl: Option<Duration>,
) -> Result<Box<dyn KeyValueBucket>, StoreError> {
self.0.get_or_create_bucket(bucket_name, ttl).await
}
pub async fn get_bucket(
&self,
bucket_name: &str,
) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError> {
self.0.get_bucket(bucket_name).await
}
pub fn connection_id(&self) -> u64 {
self.0.connection_id()
} }
pub async fn load<T: for<'a> Deserialize<'a>>( pub async fn load<T: for<'a> Deserialize<'a>>(
...@@ -95,17 +184,13 @@ impl KeyValueStoreManager { ...@@ -95,17 +184,13 @@ impl KeyValueStoreManager {
// No bucket means no cards // No bucket means no cards
return Ok(None); return Ok(None);
}; };
match bucket.get(key).await { Ok(match bucket.get(key).await? {
Ok(Some(card_bytes)) => { Some(card_bytes) => {
let card: T = serde_json::from_slice(card_bytes.as_ref())?; let card: T = serde_json::from_slice(card_bytes.as_ref())?;
Ok(Some(card)) Some(card)
} }
Ok(None) => Ok(None), None => None,
Err(err) => { })
// TODO look at what errors NATS can give us and make more specific wrappers
Err(StoreError::NATSError(err.to_string()))
}
}
} }
/// Returns a receiver that will receive all the existing keys, and /// Returns a receiver that will receive all the existing keys, and
...@@ -115,6 +200,7 @@ impl KeyValueStoreManager { ...@@ -115,6 +200,7 @@ impl KeyValueStoreManager {
self: Arc<Self>, self: Arc<Self>,
bucket_name: &str, bucket_name: &str,
bucket_ttl: Option<Duration>, bucket_ttl: Option<Duration>,
cancel_token: CancellationToken,
) -> ( ) -> (
tokio::task::JoinHandle<Result<(), StoreError>>, tokio::task::JoinHandle<Result<(), StoreError>>,
tokio::sync::mpsc::UnboundedReceiver<T>, tokio::sync::mpsc::UnboundedReceiver<T>,
...@@ -136,7 +222,14 @@ impl KeyValueStoreManager { ...@@ -136,7 +222,14 @@ impl KeyValueStoreManager {
} }
// Now block waiting for new entries // Now block waiting for new entries
while let Some(card_bytes) = stream.next().await { loop {
let card_bytes = tokio::select! {
_ = cancel_token.cancelled() => break,
result = stream.next() => match result {
Some(bytes) => bytes,
None => break,
}
};
let card: T = serde_json::from_slice(card_bytes.as_ref())?; let card: T = serde_json::from_slice(card_bytes.as_ref())?;
let _ = tx.send(card); let _ = tx.send(card);
} }
...@@ -170,7 +263,7 @@ impl KeyValueStoreManager { ...@@ -170,7 +263,7 @@ impl KeyValueStoreManager {
/// An online storage for key-value config values. /// An online storage for key-value config values.
/// Usually backed by `nats-server`. /// Usually backed by `nats-server`.
#[async_trait] #[async_trait]
pub trait KeyValueBucket: Send { pub trait KeyValueBucket: Send + Sync {
/// A bucket is a collection of key/value pairs. /// A bucket is a collection of key/value pairs.
/// Insert a value into a bucket, if it doesn't exist already /// Insert a value into a bucket, if it doesn't exist already
async fn insert( async fn insert(
...@@ -191,7 +284,7 @@ pub trait KeyValueBucket: Send { ...@@ -191,7 +284,7 @@ pub trait KeyValueBucket: Send {
/// such time. /// such time.
async fn watch( async fn watch(
&self, &self,
) -> Result<Pin<Box<dyn futures::Stream<Item = bytes::Bytes> + Send + 'life0>>, StoreError>; ) -> Result<Pin<Box<dyn futures::Stream<Item = bytes::Bytes> + Send + '_>>, StoreError>;
async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StoreError>; async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StoreError>;
} }
...@@ -230,7 +323,7 @@ pub enum StoreError { ...@@ -230,7 +323,7 @@ pub enum StoreError {
#[error("Internal etcd error: {0}")] #[error("Internal etcd error: {0}")]
EtcdError(String), EtcdError(String),
#[error("Key Value Error: {0} for bucket '{1}")] #[error("Key Value Error: {0} for bucket '{1}'")]
KeyValueError(String, String), KeyValueError(String, String),
#[error("Error decoding bytes: {0}")] #[error("Error decoding bytes: {0}")]
......
...@@ -25,31 +25,31 @@ impl EtcdStore { ...@@ -25,31 +25,31 @@ impl EtcdStore {
#[async_trait] #[async_trait]
impl KeyValueStore for EtcdStore { impl KeyValueStore for EtcdStore {
type Bucket = EtcdBucket;
/// A "bucket" in etcd is a path prefix /// A "bucket" in etcd is a path prefix
async fn get_or_create_bucket( async fn get_or_create_bucket(
&self, &self,
bucket_name: &str, bucket_name: &str,
_ttl: Option<Duration>, // TODO ttl not used yet _ttl: Option<Duration>, // TODO ttl not used yet
) -> Result<Box<dyn KeyValueBucket>, StoreError> { ) -> Result<Self::Bucket, StoreError> {
Ok(self.get_bucket(bucket_name).await?.unwrap()) Ok(EtcdBucket {
client: self.client.clone(),
bucket_name: bucket_name.to_string(),
})
} }
/// A "bucket" in etcd is a path prefix. This creates an EtcdBucket object without doing /// A "bucket" in etcd is a path prefix. This creates an EtcdBucket object without doing
/// any network calls. /// any network calls.
async fn get_bucket( async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
&self, Ok(Some(EtcdBucket {
bucket_name: &str,
) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError> {
Ok(Some(Box::new(EtcdBucket {
client: self.client.clone(), client: self.client.clone(),
bucket_name: bucket_name.to_string(), bucket_name: bucket_name.to_string(),
}))) }))
} }
fn connection_id(&self) -> u64 { fn connection_id(&self) -> u64 {
// This conversion from i64 to u64 is safe because etcd lease IDs are u64 internally. self.client.lease_id()
// They present as i64 because of the limitations of the etcd grpc/HTTP JSON API.
self.client.lease_id() as u64
} }
} }
...@@ -108,7 +108,7 @@ impl KeyValueBucket for EtcdBucket { ...@@ -108,7 +108,7 @@ impl KeyValueBucket for EtcdBucket {
{ {
let k = make_key(&self.bucket_name, &"".into()); let k = make_key(&self.bucket_name, &"".into());
tracing::trace!("etcd watch: {k}"); tracing::trace!("etcd watch: {k}");
let (_watcher, mut watch_stream) = self let (watcher, mut watch_stream) = self
.client .client
.etcd_client() .etcd_client()
.clone() .clone()
...@@ -116,6 +116,7 @@ impl KeyValueBucket for EtcdBucket { ...@@ -116,6 +116,7 @@ impl KeyValueBucket for EtcdBucket {
.await .await
.map_err(|e| StoreError::EtcdError(e.to_string()))?; .map_err(|e| StoreError::EtcdError(e.to_string()))?;
let output = stream! { let output = stream! {
let _watcher = watcher; // Keep it alive. Not sure if necessary.
while let Ok(Some(resp)) = watch_stream.message().await { while let Ok(Some(resp)) = watch_stream.message().await {
for e in resp.events() { for e in resp.events() {
if matches!(e.event_type(), EventType::Put) && e.kv().is_some() { if matches!(e.event_type(), EventType::Put) && e.kv().is_some() {
...@@ -155,7 +156,7 @@ impl EtcdBucket { ...@@ -155,7 +156,7 @@ impl EtcdBucket {
tracing::trace!("etcd create: {k}"); tracing::trace!("etcd create: {k}");
// Use atomic transaction to check and create in one operation // Use atomic transaction to check and create in one operation
let put_options = PutOptions::new().with_lease(self.client.primary_lease().id()); let put_options = PutOptions::new().with_lease(self.client.primary_lease().id() as i64);
// Build transaction that creates key only if it doesn't exist // Build transaction that creates key only if it doesn't exist
let txn = Txn::new() let txn = Txn::new()
...@@ -224,7 +225,7 @@ impl EtcdBucket { ...@@ -224,7 +225,7 @@ impl EtcdBucket {
} }
let put_options = PutOptions::new() let put_options = PutOptions::new()
.with_lease(self.client.primary_lease().id()) .with_lease(self.client.primary_lease().id() as i64)
.with_prev_key(); .with_prev_key();
let mut put_resp = self let mut put_resp = self
.client .client
......
...@@ -67,35 +67,34 @@ impl MemoryStore { ...@@ -67,35 +67,34 @@ impl MemoryStore {
#[async_trait] #[async_trait]
impl KeyValueStore for MemoryStore { impl KeyValueStore for MemoryStore {
type Bucket = MemoryBucketRef;
async fn get_or_create_bucket( async fn get_or_create_bucket(
&self, &self,
bucket_name: &str, bucket_name: &str,
// MemoryStore doesn't respect TTL yet // MemoryStore doesn't respect TTL yet
_ttl: Option<Duration>, _ttl: Option<Duration>,
) -> Result<Box<dyn KeyValueBucket>, StoreError> { ) -> Result<Self::Bucket, StoreError> {
let mut locked_data = self.inner.data.lock().await; let mut locked_data = self.inner.data.lock().await;
// Ensure the bucket exists // Ensure the bucket exists
locked_data locked_data
.entry(bucket_name.to_string()) .entry(bucket_name.to_string())
.or_insert_with(MemoryBucket::new); .or_insert_with(MemoryBucket::new);
// Return an object able to access it // Return an object able to access it
Ok(Box::new(MemoryBucketRef { Ok(MemoryBucketRef {
name: bucket_name.to_string(), name: bucket_name.to_string(),
inner: self.inner.clone(), inner: self.inner.clone(),
})) })
} }
/// This operation cannot fail on MemoryStore. Always returns Ok. /// This operation cannot fail on MemoryStore. Always returns Ok.
async fn get_bucket( async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
&self,
bucket_name: &str,
) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError> {
let locked_data = self.inner.data.lock().await; let locked_data = self.inner.data.lock().await;
match locked_data.get(bucket_name) { match locked_data.get(bucket_name) {
Some(_) => Ok(Some(Box::new(MemoryBucketRef { Some(_) => Ok(Some(MemoryBucketRef {
name: bucket_name.to_string(), name: bucket_name.to_string(),
inner: self.inner.clone(), inner: self.inner.clone(),
}))), })),
None => Ok(None), None => Ok(None),
} }
} }
......
...@@ -23,25 +23,24 @@ pub struct NATSBucket { ...@@ -23,25 +23,24 @@ pub struct NATSBucket {
#[async_trait] #[async_trait]
impl KeyValueStore for NATSStore { impl KeyValueStore for NATSStore {
type Bucket = NATSBucket;
async fn get_or_create_bucket( async fn get_or_create_bucket(
&self, &self,
bucket_name: &str, bucket_name: &str,
ttl: Option<Duration>, ttl: Option<Duration>,
) -> Result<Box<dyn KeyValueBucket>, StoreError> { ) -> Result<Self::Bucket, StoreError> {
let name = Slug::slugify(bucket_name); let name = Slug::slugify(bucket_name);
let nats_store = self let nats_store = self
.get_or_create_key_value(&self.endpoint.namespace, &name, ttl) .get_or_create_key_value(&self.endpoint.namespace, &name, ttl)
.await?; .await?;
Ok(Box::new(NATSBucket { nats_store })) Ok(NATSBucket { nats_store })
} }
async fn get_bucket( async fn get_bucket(&self, bucket_name: &str) -> Result<Option<Self::Bucket>, StoreError> {
&self,
bucket_name: &str,
) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError> {
let name = Slug::slugify(bucket_name); let name = Slug::slugify(bucket_name);
match self.get_key_value(&self.endpoint.namespace, &name).await? { match self.get_key_value(&self.endpoint.namespace, &name).await? {
Some(nats_store) => Ok(Some(Box::new(NATSBucket { nats_store }))), Some(nats_store) => Ok(Some(NATSBucket { nats_store })),
None => Ok(None), None => Ok(None),
} }
} }
......
...@@ -34,15 +34,22 @@ use super::utils::build_in_runtime; ...@@ -34,15 +34,22 @@ use super::utils::build_in_runtime;
#[derive(Clone)] #[derive(Clone)]
pub struct Client { pub struct Client {
client: etcd_client::Client, client: etcd_client::Client,
primary_lease: i64, primary_lease: u64,
runtime: Runtime, runtime: Runtime,
rt: Arc<tokio::runtime::Runtime>, rt: Arc<tokio::runtime::Runtime>,
} }
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "etcd::Client primary_lease={}", self.primary_lease)
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Lease { pub struct Lease {
/// ETCD lease ID /// ETCD lease ID
id: i64, /// Delivered as i64 by etcd because of documented gRPC limitations.
id: u64,
/// [`CancellationToken`] associated with the lease /// [`CancellationToken`] associated with the lease
cancel_token: CancellationToken, cancel_token: CancellationToken,
...@@ -50,7 +57,7 @@ pub struct Lease { ...@@ -50,7 +57,7 @@ pub struct Lease {
impl Lease { impl Lease {
/// Get the lease ID /// Get the lease ID
pub fn id(&self) -> i64 { pub fn id(&self) -> u64 {
self.id self.id
} }
...@@ -146,7 +153,7 @@ impl Client { ...@@ -146,7 +153,7 @@ impl Client {
} }
/// Get the primary lease ID. /// Get the primary lease ID.
pub fn lease_id(&self) -> i64 { pub fn lease_id(&self) -> u64 {
self.primary_lease self.primary_lease
} }
...@@ -160,7 +167,7 @@ impl Client { ...@@ -160,7 +167,7 @@ impl Client {
/// Create a [`Lease`] with a given time-to-live (TTL). /// Create a [`Lease`] with a given time-to-live (TTL).
/// This [`Lease`] will be tied to the [`Runtime`], specifically a child [`CancellationToken`]. /// This [`Lease`] will be tied to the [`Runtime`], specifically a child [`CancellationToken`].
pub async fn create_lease(&self, ttl: i64) -> Result<Lease> { pub async fn create_lease(&self, ttl: u64) -> Result<Lease> {
let token = self.runtime.child_token(); let token = self.runtime.child_token();
let lease_client = self.client.lease_client(); let lease_client = self.client.lease_client();
self.rt self.rt
...@@ -169,14 +176,14 @@ impl Client { ...@@ -169,14 +176,14 @@ impl Client {
} }
// Revoke an etcd lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke // Revoke an etcd lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke
pub async fn revoke_lease(&self, lease_id: i64) -> Result<()> { pub async fn revoke_lease(&self, lease_id: u64) -> Result<()> {
let lease_client = self.client.lease_client(); let lease_client = self.client.lease_client();
self.rt.spawn(revoke_lease(lease_client, lease_id)).await? self.rt.spawn(revoke_lease(lease_client, lease_id)).await?
} }
pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> { pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
let id = lease_id.unwrap_or(self.lease_id()); let id = lease_id.unwrap_or(self.lease_id());
let put_options = PutOptions::new().with_lease(id); let put_options = PutOptions::new().with_lease(id as i64);
// Build the transaction // Build the transaction
let txn = Txn::new() let txn = Txn::new()
...@@ -203,10 +210,10 @@ impl Client { ...@@ -203,10 +210,10 @@ impl Client {
&self, &self,
key: String, key: String,
value: Vec<u8>, value: Vec<u8>,
lease_id: Option<i64>, lease_id: Option<u64>,
) -> Result<()> { ) -> Result<()> {
let id = lease_id.unwrap_or(self.lease_id()); let id = lease_id.unwrap_or(self.lease_id());
let put_options = PutOptions::new().with_lease(id); let put_options = PutOptions::new().with_lease(id as i64);
// Build the transaction that either creates the key if it doesn't exist, // Build the transaction that either creates the key if it doesn't exist,
// or validates the existing value matches what we expect // or validates the existing value matches what we expect
...@@ -254,10 +261,10 @@ impl Client { ...@@ -254,10 +261,10 @@ impl Client {
&self, &self,
key: impl AsRef<str>, key: impl AsRef<str>,
value: impl AsRef<[u8]>, value: impl AsRef<[u8]>,
lease_id: Option<i64>, lease_id: Option<u64>,
) -> Result<()> { ) -> Result<()> {
let id = lease_id.unwrap_or(self.lease_id()); let id = lease_id.unwrap_or(self.lease_id());
let put_options = PutOptions::new().with_lease(id); let put_options = PutOptions::new().with_lease(id as i64);
let _ = self let _ = self
.client .client
.kv_client() .kv_client()
...@@ -274,7 +281,7 @@ impl Client { ...@@ -274,7 +281,7 @@ impl Client {
) -> Result<PutResponse> { ) -> Result<PutResponse> {
let options = options let options = options
.unwrap_or_default() .unwrap_or_default()
.with_lease(self.primary_lease().id()); .with_lease(self.primary_lease().id() as i64);
self.client self.client
.kv_client() .kv_client()
.put(key.as_ref(), value.as_ref(), Some(options)) .put(key.as_ref(), value.as_ref(), Some(options))
...@@ -295,12 +302,12 @@ impl Client { ...@@ -295,12 +302,12 @@ impl Client {
&self, &self,
key: impl Into<Vec<u8>>, key: impl Into<Vec<u8>>,
options: Option<DeleteOptions>, options: Option<DeleteOptions>,
) -> Result<i64> { ) -> Result<u64> {
self.client self.client
.kv_client() .kv_client()
.delete(key, options) .delete(key, options)
.await .await
.map(|del_response| del_response.deleted()) .map(|del_response| del_response.deleted() as u64)
.map_err(|err| err.into()) .map_err(|err| err.into())
} }
...@@ -319,11 +326,11 @@ impl Client { ...@@ -319,11 +326,11 @@ impl Client {
pub async fn lock( pub async fn lock(
&self, &self,
key: impl Into<Vec<u8>>, key: impl Into<Vec<u8>>,
lease_id: Option<i64>, lease_id: Option<u64>,
) -> Result<LockResponse> { ) -> Result<LockResponse> {
let mut lock_client = self.client.lock_client(); let mut lock_client = self.client.lock_client();
let id = lease_id.unwrap_or(self.lease_id()); let id = lease_id.unwrap_or(self.lease_id());
let options = LockOptions::new().with_lease(id); let options = LockOptions::new().with_lease(id as i64);
lock_client lock_client
.lock(key, Some(options)) .lock(key, Some(options))
.await .await
...@@ -629,7 +636,7 @@ impl KvCache { ...@@ -629,7 +636,7 @@ impl KvCache {
} }
/// Update a value in both the cache and etcd /// Update a value in both the cache and etcd
pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> { pub async fn put(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
let full_key = format!("{}{}", self.prefix, key); let full_key = format!("{}{}", self.prefix, key);
// Update etcd first // Update etcd first
......
...@@ -6,13 +6,13 @@ use super::*; ...@@ -6,13 +6,13 @@ use super::*;
/// Create a [`Lease`] with a given time-to-live (TTL) attached to the [`CancellationToken`]. /// Create a [`Lease`] with a given time-to-live (TTL) attached to the [`CancellationToken`].
pub async fn create_lease( pub async fn create_lease(
mut lease_client: LeaseClient, mut lease_client: LeaseClient,
ttl: i64, ttl: u64,
token: CancellationToken, token: CancellationToken,
) -> Result<Lease> { ) -> Result<Lease> {
let lease = lease_client.grant(ttl, None).await?; let lease = lease_client.grant(ttl as i64, None).await?;
let id = lease.id(); let id = lease.id() as u64;
let ttl = lease.ttl(); let ttl = lease.ttl() as u64;
let child = token.child_token(); let child = token.child_token();
let clone = token.clone(); let clone = token.clone();
...@@ -36,8 +36,8 @@ pub async fn create_lease( ...@@ -36,8 +36,8 @@ pub async fn create_lease(
} }
/// Revoke a lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke /// Revoke a lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke
pub async fn revoke_lease(mut lease_client: LeaseClient, lease_id: i64) -> Result<()> { pub async fn revoke_lease(mut lease_client: LeaseClient, lease_id: u64) -> Result<()> {
match lease_client.revoke(lease_id).await { match lease_client.revoke(lease_id as i64).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => { Err(e) => {
tracing::warn!("failed to revoke lease: {:?}", e); tracing::warn!("failed to revoke lease: {:?}", e);
...@@ -52,15 +52,15 @@ pub async fn revoke_lease(mut lease_client: LeaseClient, lease_id: i64) -> Resul ...@@ -52,15 +52,15 @@ pub async fn revoke_lease(mut lease_client: LeaseClient, lease_id: i64) -> Resul
/// If /// If
pub async fn keep_alive( pub async fn keep_alive(
client: LeaseClient, client: LeaseClient,
lease_id: i64, lease_id: u64,
ttl: i64, ttl: u64,
token: CancellationToken, token: CancellationToken,
) -> Result<()> { ) -> Result<()> {
let mut ttl = ttl; let mut ttl = ttl;
let mut deadline = create_deadline(ttl)?; let mut deadline = create_deadline(ttl)?;
let mut client = client; let mut client = client;
let (mut heartbeat_sender, mut heartbeat_receiver) = client.keep_alive(lease_id).await?; let (mut heartbeat_sender, mut heartbeat_receiver) = client.keep_alive(lease_id as i64).await?;
loop { loop {
// if the deadline is exceeded, then we have failed to issue a heartbeat in time // if the deadline is exceeded, then we have failed to issue a heartbeat in time
...@@ -79,7 +79,7 @@ pub async fn keep_alive( ...@@ -79,7 +79,7 @@ pub async fn keep_alive(
tracing::trace!(lease_id, "keep alive response received: {:?}", resp); tracing::trace!(lease_id, "keep alive response received: {:?}", resp);
// update ttl and deadline // update ttl and deadline
ttl = resp.ttl(); ttl = resp.ttl() as u64;
deadline = create_deadline(ttl)?; deadline = create_deadline(ttl)?;
if resp.ttl() == 0 { if resp.ttl() == 0 {
...@@ -91,11 +91,11 @@ pub async fn keep_alive( ...@@ -91,11 +91,11 @@ pub async fn keep_alive(
_ = token.cancelled() => { _ = token.cancelled() => {
tracing::trace!(lease_id, "cancellation token triggered; revoking lease"); tracing::trace!(lease_id, "cancellation token triggered; revoking lease");
let _ = client.revoke(lease_id).await?; let _ = client.revoke(lease_id as i64).await?;
return Ok(()); return Ok(());
} }
_ = tokio::time::sleep(tokio::time::Duration::from_secs(ttl as u64 / 2)) => { _ = tokio::time::sleep(tokio::time::Duration::from_secs(ttl / 2)) => {
tracing::trace!(lease_id, "sending keep alive"); tracing::trace!(lease_id, "sending keep alive");
// if we get a error issuing the heartbeat, set the ttl to 0 // if we get a error issuing the heartbeat, set the ttl to 0
...@@ -117,9 +117,6 @@ pub async fn keep_alive( ...@@ -117,9 +117,6 @@ pub async fn keep_alive(
} }
/// Create a deadline for a given time-to-live (TTL). /// Create a deadline for a given time-to-live (TTL).
fn create_deadline(ttl: i64) -> Result<std::time::Instant> { fn create_deadline(ttl: u64) -> Result<std::time::Instant> {
if ttl <= 0 { Ok(std::time::Instant::now() + std::time::Duration::from_secs(ttl))
return Err(error!("invalid ttl: {}", ttl));
}
Ok(std::time::Instant::now() + std::time::Duration::from_secs(ttl as u64))
} }
...@@ -112,7 +112,7 @@ impl DistributedRWLock { ...@@ -112,7 +112,7 @@ impl DistributedRWLock {
) -> Option<WriteLockGuard<'a>> { ) -> Option<WriteLockGuard<'a>> {
let write_key = format!("v1/{}/writer", self.lock_prefix); let write_key = format!("v1/{}/writer", self.lock_prefix);
let lease_id = etcd_client.lease_id(); let lease_id = etcd_client.lease_id();
let put_options = PutOptions::new().with_lease(lease_id); let put_options = PutOptions::new().with_lease(lease_id as i64);
// Step 1: Atomically create write lock only if it doesn't exist // Step 1: Atomically create write lock only if it doesn't exist
let txn = Txn::new() let txn = Txn::new()
...@@ -204,7 +204,7 @@ impl DistributedRWLock { ...@@ -204,7 +204,7 @@ impl DistributedRWLock {
// Try to atomically acquire read lock // Try to atomically acquire read lock
// The transaction checks that no writer exists and creates reader key atomically // The transaction checks that no writer exists and creates reader key atomically
let put_options = PutOptions::new().with_lease(lease_id); let put_options = PutOptions::new().with_lease(lease_id as i64);
// Build atomic transaction: create reader key only if write_key doesn't exist // Build atomic transaction: create reader key only if write_key doesn't exist
let txn = Txn::new() let txn = Txn::new()
......
...@@ -85,7 +85,7 @@ async fn create_barrier_key<T: Serialize>( ...@@ -85,7 +85,7 @@ async fn create_barrier_key<T: Serialize>(
client: &Client, client: &Client,
key: &str, key: &str,
data: T, data: T,
lease_id: Option<i64>, lease_id: Option<u64>,
) -> Result<(), LeaderWorkerBarrierError> { ) -> Result<(), LeaderWorkerBarrierError> {
let serialized_data = let serialized_data =
serde_json::to_vec(&data).map_err(LeaderWorkerBarrierError::SerdeError)?; serde_json::to_vec(&data).map_err(LeaderWorkerBarrierError::SerdeError)?;
...@@ -151,7 +151,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali ...@@ -151,7 +151,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali
data: &LeaderData, data: &LeaderData,
) -> anyhow::Result<HashMap<String, WorkerData>, LeaderWorkerBarrierError> { ) -> anyhow::Result<HashMap<String, WorkerData>, LeaderWorkerBarrierError> {
let etcd_client = rt let etcd_client = rt
.etcd_client() .deprecated_etcd_client()
.ok_or(LeaderWorkerBarrierError::EtcdClientNotFound)?; .ok_or(LeaderWorkerBarrierError::EtcdClientNotFound)?;
let lease_id = etcd_client.lease_id(); let lease_id = etcd_client.lease_id();
...@@ -178,7 +178,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali ...@@ -178,7 +178,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali
&self, &self,
client: &Client, client: &Client,
data: &LeaderData, data: &LeaderData,
lease_id: i64, lease_id: u64,
) -> Result<(), LeaderWorkerBarrierError> { ) -> Result<(), LeaderWorkerBarrierError> {
let key = barrier_key(&self.barrier_id, BARRIER_DATA); let key = barrier_key(&self.barrier_id, BARRIER_DATA);
create_barrier_key(client, &key, data, Some(lease_id)).await create_barrier_key(client, &key, data, Some(lease_id)).await
...@@ -197,7 +197,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali ...@@ -197,7 +197,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali
&self, &self,
client: &Client, client: &Client,
worker_result: &Result<HashMap<String, WorkerData>, LeaderWorkerBarrierError>, worker_result: &Result<HashMap<String, WorkerData>, LeaderWorkerBarrierError>,
lease_id: i64, lease_id: u64,
) -> Result<(), LeaderWorkerBarrierError> { ) -> Result<(), LeaderWorkerBarrierError> {
if let Ok(worker_result) = worker_result { if let Ok(worker_result) = worker_result {
let key = barrier_key(&self.barrier_id, BARRIER_COMPLETE); let key = barrier_key(&self.barrier_id, BARRIER_COMPLETE);
...@@ -245,7 +245,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali ...@@ -245,7 +245,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali
data: &WorkerData, data: &WorkerData,
) -> anyhow::Result<LeaderData, LeaderWorkerBarrierError> { ) -> anyhow::Result<LeaderData, LeaderWorkerBarrierError> {
let etcd_client = rt let etcd_client = rt
.etcd_client() .deprecated_etcd_client()
.ok_or(LeaderWorkerBarrierError::EtcdClientNotFound)?; .ok_or(LeaderWorkerBarrierError::EtcdClientNotFound)?;
let lease_id = etcd_client.lease_id(); let lease_id = etcd_client.lease_id();
...@@ -284,7 +284,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali ...@@ -284,7 +284,7 @@ impl<LeaderData: Serialize + DeserializeOwned, WorkerData: Serialize + Deseriali
&self, &self,
client: &Client, client: &Client,
data: &WorkerData, data: &WorkerData,
lease_id: i64, lease_id: u64,
) -> Result<String, LeaderWorkerBarrierError> { ) -> Result<String, LeaderWorkerBarrierError> {
let key = barrier_key( let key = barrier_key(
&self.barrier_id, &self.barrier_id,
......
...@@ -208,8 +208,8 @@ pub mod key_extractors { ...@@ -208,8 +208,8 @@ pub mod key_extractors {
use etcd_client::KeyValue; use etcd_client::KeyValue;
/// Extract the lease ID as the key /// Extract the lease ID as the key
pub fn lease_id(kv: &KeyValue) -> Option<i64> { pub fn lease_id(kv: &KeyValue) -> Option<u64> {
Some(kv.lease()) Some(kv.lease() as u64)
} }
/// Extract the key as a string (without prefix) /// Extract the key as a string (without prefix)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment