Unverified Commit 13ce09ea authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore: KV Router start using Store instead of etcd (#3945)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent eb3a486d
...@@ -9,8 +9,11 @@ use std::{ ...@@ -9,8 +9,11 @@ use std::{
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use dynamo_runtime::component::{Component, Endpoint};
use dynamo_runtime::prelude::DistributedRuntimeProvider; use dynamo_runtime::prelude::DistributedRuntimeProvider;
use dynamo_runtime::{
component::{Component, Endpoint},
storage::key_value_store::Key,
};
use crate::{ use crate::{
discovery::KV_ROUTERS_ROOT_PATH, discovery::KV_ROUTERS_ROOT_PATH,
...@@ -309,24 +312,15 @@ impl ModelManager { ...@@ -309,24 +312,15 @@ impl ModelManager {
return Ok(kv_chooser); return Ok(kv_chooser);
} }
// Create new KV router with etcd registration let store = component.drt().store();
let etcd_client = component let router_bucket = store
.drt() .get_or_create_bucket(KV_ROUTERS_ROOT_PATH, None)
.etcd_client() .await?;
.ok_or_else(|| anyhow::anyhow!("KV routing requires etcd (dynamic mode)"))?;
let router_uuid = uuid::Uuid::new_v4(); let router_uuid = uuid::Uuid::new_v4();
let router_key = format!( let router_key = Key::from_raw(format!("{}/{router_uuid}", component.path()));
"{}/{}/{}", let json_router_config = serde_json::to_vec_pretty(&kv_router_config.unwrap_or_default())?;
KV_ROUTERS_ROOT_PATH, router_bucket
component.path(), .insert(&router_key, json_router_config.into(), 0)
router_uuid
);
etcd_client
.kv_create(
&router_key,
serde_json::to_vec_pretty(&kv_router_config.unwrap_or_default())?,
None, // use primary lease
)
.await?; .await?;
let selector = Box::new(DefaultWorkerSelector::new(kv_router_config)); let selector = Box::new(DefaultWorkerSelector::new(kv_router_config));
......
...@@ -290,10 +290,10 @@ impl KeyValueStoreManager { ...@@ -290,10 +290,10 @@ impl KeyValueStoreManager {
key: &Key, key: &Key,
obj: &mut T, obj: &mut T,
) -> anyhow::Result<StoreOutcome> { ) -> anyhow::Result<StoreOutcome> {
let obj_json = serde_json::to_string(obj)?; let obj_json = serde_json::to_vec(obj)?;
let bucket = self.0.get_or_create_bucket(bucket_name, bucket_ttl).await?; let bucket = self.0.get_or_create_bucket(bucket_name, bucket_ttl).await?;
let outcome = bucket.insert(key, &obj_json, obj.revision()).await?; let outcome = bucket.insert(key, obj_json.into(), obj.revision()).await?;
match outcome { match outcome {
StoreOutcome::Created(revision) | StoreOutcome::Exists(revision) => { StoreOutcome::Created(revision) | StoreOutcome::Exists(revision) => {
...@@ -313,7 +313,7 @@ pub trait KeyValueBucket: Send + Sync { ...@@ -313,7 +313,7 @@ pub trait KeyValueBucket: Send + Sync {
async fn insert( async fn insert(
&self, &self,
key: &Key, key: &Key,
value: &str, value: bytes::Bytes,
revision: u64, revision: u64,
) -> Result<StoreOutcome, StoreError>; ) -> Result<StoreOutcome, StoreError>;
...@@ -434,14 +434,14 @@ mod tests { ...@@ -434,14 +434,14 @@ mod tests {
let s2 = Arc::clone(&s); let s2 = Arc::clone(&s);
let bucket = s.get_or_create_bucket(BUCKET_NAME, None).await?; let bucket = s.get_or_create_bucket(BUCKET_NAME, None).await?;
let res = bucket.insert(&"test1".into(), "value1", 0).await?; let res = bucket.insert(&"test1".into(), "value1".into(), 0).await?;
assert_eq!(res, StoreOutcome::Created(0)); assert_eq!(res, StoreOutcome::Created(0));
let mut expected = Vec::with_capacity(3); let mut expected = Vec::with_capacity(3);
for i in 1..=3 { for i in 1..=3 {
let item = WatchEvent::Put(KeyValue::new( let item = WatchEvent::Put(KeyValue::new(
format!("test{i}"), format!("test{i}"),
bytes::Bytes::from(format!("value{i}").into_bytes()), format!("value{i}").into(),
)); ));
expected.push(item); expected.push(item);
} }
...@@ -472,18 +472,18 @@ mod tests { ...@@ -472,18 +472,18 @@ mod tests {
// wouldn't be testing the watch behavior. // wouldn't be testing the watch behavior.
got_first_rx.await?; got_first_rx.await?;
let res = bucket.insert(&"test2".into(), "value2", 0).await?; let res = bucket.insert(&"test2".into(), "value2".into(), 0).await?;
assert_eq!(res, StoreOutcome::Created(0)); assert_eq!(res, StoreOutcome::Created(0));
// Repeat a key and revision. Ignored. // Repeat a key and revision. Ignored.
let res = bucket.insert(&"test2".into(), "value2", 0).await?; let res = bucket.insert(&"test2".into(), "value2".into(), 0).await?;
assert_eq!(res, StoreOutcome::Exists(0)); assert_eq!(res, StoreOutcome::Exists(0));
// Increment revision // Increment revision
let res = bucket.insert(&"test2".into(), "value2", 1).await?; let res = bucket.insert(&"test2".into(), "value2".into(), 1).await?;
assert_eq!(res, StoreOutcome::Created(1)); assert_eq!(res, StoreOutcome::Created(1));
let res = bucket.insert(&"test3".into(), "value3", 0).await?; let res = bucket.insert(&"test3".into(), "value3".into(), 0).await?;
assert_eq!(res, StoreOutcome::Created(0)); assert_eq!(res, StoreOutcome::Created(0));
// ingress exits once it has received all values // ingress exits once it has received all values
...@@ -500,7 +500,7 @@ mod tests { ...@@ -500,7 +500,7 @@ mod tests {
let bucket: &'static _ = let bucket: &'static _ =
Box::leak(Box::new(s.get_or_create_bucket(BUCKET_NAME, None).await?)); Box::leak(Box::new(s.get_or_create_bucket(BUCKET_NAME, None).await?));
let res = bucket.insert(&"test1".into(), "value1", 0).await?; let res = bucket.insert(&"test1".into(), "value1".into(), 0).await?;
assert_eq!(res, StoreOutcome::Created(0)); assert_eq!(res, StoreOutcome::Created(0));
let stream = bucket.watch().await?; let stream = bucket.watch().await?;
...@@ -509,10 +509,7 @@ mod tests { ...@@ -509,10 +509,7 @@ mod tests {
let mut rx1 = tap.subscribe(); let mut rx1 = tap.subscribe();
let mut rx2 = tap.subscribe(); let mut rx2 = tap.subscribe();
let item = WatchEvent::Put(KeyValue::new( let item = WatchEvent::Put(KeyValue::new("test1".to_string(), "GK".into()));
"test1".to_string(),
bytes::Bytes::from(b"GK".as_slice()),
));
let item_clone = item.clone(); let item_clone = item.clone();
let handle1 = tokio::spawn(async move { let handle1 = tokio::spawn(async move {
let b = rx1.recv().await.unwrap(); let b = rx1.recv().await.unwrap();
...@@ -523,7 +520,7 @@ mod tests { ...@@ -523,7 +520,7 @@ mod tests {
assert_eq!(b, item); assert_eq!(b, item);
}); });
bucket.insert(&"test1".into(), "GK", 1).await?; bucket.insert(&"test1".into(), "GK".into(), 1).await?;
let _ = futures::join!(handle1, handle2); let _ = futures::join!(handle1, handle2);
Ok(()) Ok(())
......
...@@ -66,7 +66,7 @@ impl KeyValueBucket for EtcdBucket { ...@@ -66,7 +66,7 @@ impl KeyValueBucket for EtcdBucket {
async fn insert( async fn insert(
&self, &self,
key: &Key, key: &Key,
value: &str, value: bytes::Bytes,
// "version" in etcd speak. revision is a global cluster-wide value // "version" in etcd speak. revision is a global cluster-wide value
revision: u64, revision: u64,
) -> Result<StoreOutcome, StoreError> { ) -> Result<StoreOutcome, StoreError> {
...@@ -169,7 +169,11 @@ impl KeyValueBucket for EtcdBucket { ...@@ -169,7 +169,11 @@ impl KeyValueBucket for EtcdBucket {
} }
impl EtcdBucket { impl EtcdBucket {
async fn create(&self, key: &Key, value: &str) -> Result<StoreOutcome, StoreError> { async fn create(
&self,
key: &Key,
value: impl Into<Vec<u8>>,
) -> Result<StoreOutcome, StoreError> {
let k = make_key(&self.bucket_name, key); let k = make_key(&self.bucket_name, key);
tracing::trace!("etcd create: {k}"); tracing::trace!("etcd create: {k}");
...@@ -215,7 +219,7 @@ impl EtcdBucket { ...@@ -215,7 +219,7 @@ impl EtcdBucket {
async fn update( async fn update(
&self, &self,
key: &Key, key: &Key,
value: &str, value: impl AsRef<[u8]>,
revision: u64, revision: u64,
) -> Result<StoreOutcome, StoreError> { ) -> Result<StoreOutcome, StoreError> {
let version = revision; let version = revision;
...@@ -328,7 +332,7 @@ mod concurrent_create_tests { ...@@ -328,7 +332,7 @@ mod concurrent_create_tests {
let result = bucket_clone let result = bucket_clone
.lock() .lock()
.await .await
.insert(&key_clone, &value_clone, 0) .insert(&key_clone, value_clone.into(), 0)
.await; .await;
match result { match result {
......
...@@ -17,7 +17,7 @@ use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome}; ...@@ -17,7 +17,7 @@ use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
enum MemoryEvent { enum MemoryEvent {
Put { key: String, value: String }, Put { key: String, value: bytes::Bytes },
Delete { key: String }, Delete { key: String },
} }
...@@ -45,7 +45,7 @@ pub struct MemoryBucketRef { ...@@ -45,7 +45,7 @@ pub struct MemoryBucketRef {
} }
struct MemoryBucket { struct MemoryBucket {
data: HashMap<String, (u64, String)>, data: HashMap<String, (u64, bytes::Bytes)>,
} }
impl MemoryBucket { impl MemoryBucket {
...@@ -114,7 +114,7 @@ impl KeyValueBucket for MemoryBucketRef { ...@@ -114,7 +114,7 @@ impl KeyValueBucket for MemoryBucketRef {
async fn insert( async fn insert(
&self, &self,
key: &Key, key: &Key,
value: &str, value: bytes::Bytes,
revision: u64, revision: u64,
) -> Result<StoreOutcome, StoreError> { ) -> Result<StoreOutcome, StoreError> {
let mut locked_data = self.inner.data.lock(); let mut locked_data = self.inner.data.lock();
...@@ -124,10 +124,10 @@ impl KeyValueBucket for MemoryBucketRef { ...@@ -124,10 +124,10 @@ impl KeyValueBucket for MemoryBucketRef {
}; };
let outcome = match bucket.data.entry(key.to_string()) { let outcome = match bucket.data.entry(key.to_string()) {
Entry::Vacant(e) => { Entry::Vacant(e) => {
e.insert((revision, value.to_string())); e.insert((revision, value.clone()));
let _ = self.inner.change_sender.send(MemoryEvent::Put { let _ = self.inner.change_sender.send(MemoryEvent::Put {
key: key.to_string(), key: key.to_string(),
value: value.to_string(), value,
}); });
StoreOutcome::Created(revision) StoreOutcome::Created(revision)
} }
...@@ -136,7 +136,7 @@ impl KeyValueBucket for MemoryBucketRef { ...@@ -136,7 +136,7 @@ impl KeyValueBucket for MemoryBucketRef {
if *rev == revision { if *rev == revision {
StoreOutcome::Exists(revision) StoreOutcome::Exists(revision)
} else { } else {
entry.insert((revision, value.to_string())); entry.insert((revision, value));
StoreOutcome::Created(revision) StoreOutcome::Created(revision)
} }
} }
...@@ -149,10 +149,7 @@ impl KeyValueBucket for MemoryBucketRef { ...@@ -149,10 +149,7 @@ impl KeyValueBucket for MemoryBucketRef {
let Some(bucket) = locked_data.get(&self.name) else { let Some(bucket) = locked_data.get(&self.name) else {
return Ok(None); return Ok(None);
}; };
Ok(bucket Ok(bucket.data.get(&key.0).map(|(_, v)| v.clone()))
.data
.get(&key.0)
.map(|(_, v)| bytes::Bytes::from(v.clone())))
} }
async fn delete(&self, key: &Key) -> Result<(), StoreError> { async fn delete(&self, key: &Key) -> Result<(), StoreError> {
...@@ -183,7 +180,7 @@ impl KeyValueBucket for MemoryBucketRef { ...@@ -183,7 +180,7 @@ impl KeyValueBucket for MemoryBucketRef {
}; };
for (key, (_rev, v)) in &bucket.data { for (key, (_rev, v)) in &bucket.data {
seen_keys.insert(key.clone()); seen_keys.insert(key.clone());
let item = KeyValue::new(key.clone(), bytes::Bytes::from(v.clone().into_bytes())); let item = KeyValue::new(key.clone(), v.clone());
existing_items.push(WatchEvent::Put(item)); existing_items.push(WatchEvent::Put(item));
} }
drop(data_lock); drop(data_lock);
...@@ -204,7 +201,7 @@ impl KeyValueBucket for MemoryBucketRef { ...@@ -204,7 +201,7 @@ impl KeyValueBucket for MemoryBucketRef {
if seen_keys.contains(&key) { if seen_keys.contains(&key) {
continue; continue;
} }
let item = KeyValue::new(key, bytes::Bytes::from(value)); let item = KeyValue::new(key, value);
yield WatchEvent::Put(item); yield WatchEvent::Put(item);
}, },
Some(MemoryEvent::Delete { key }) => { Some(MemoryEvent::Delete { key }) => {
...@@ -222,7 +219,7 @@ impl KeyValueBucket for MemoryBucketRef { ...@@ -222,7 +219,7 @@ impl KeyValueBucket for MemoryBucketRef {
Some(bucket) => Ok(bucket Some(bucket) => Ok(bucket
.data .data
.iter() .iter()
.map(|(k, (_rev, v))| (k.to_string(), bytes::Bytes::from(v.clone()))) .map(|(k, (_rev, v))| (k.to_string(), v.clone()))
.collect()), .collect()),
None => Err(StoreError::MissingBucket(self.name.clone())), None => Err(StoreError::MissingBucket(self.name.clone())),
} }
......
...@@ -119,7 +119,7 @@ impl KeyValueBucket for NATSBucket { ...@@ -119,7 +119,7 @@ impl KeyValueBucket for NATSBucket {
async fn insert( async fn insert(
&self, &self,
key: &Key, key: &Key,
value: &str, value: bytes::Bytes,
revision: u64, revision: u64,
) -> Result<StoreOutcome, StoreError> { ) -> Result<StoreOutcome, StoreError> {
if revision == 0 { if revision == 0 {
...@@ -195,8 +195,8 @@ impl KeyValueBucket for NATSBucket { ...@@ -195,8 +195,8 @@ impl KeyValueBucket for NATSBucket {
} }
impl NATSBucket { impl NATSBucket {
async fn create(&self, key: &Key, value: &str) -> Result<StoreOutcome, StoreError> { async fn create(&self, key: &Key, value: bytes::Bytes) -> Result<StoreOutcome, StoreError> {
match self.nats_store.create(&key, value.to_string().into()).await { match self.nats_store.create(&key, value).await {
Ok(revision) => Ok(StoreOutcome::Created(revision)), Ok(revision) => Ok(StoreOutcome::Created(revision)),
Err(err) if err.kind() == async_nats::jetstream::kv::CreateErrorKind::AlreadyExists => { Err(err) if err.kind() == async_nats::jetstream::kv::CreateErrorKind::AlreadyExists => {
// key exists, get the revsion // key exists, get the revsion
...@@ -219,14 +219,10 @@ impl NATSBucket { ...@@ -219,14 +219,10 @@ impl NATSBucket {
async fn update( async fn update(
&self, &self,
key: &Key, key: &Key,
value: &str, value: bytes::Bytes,
revision: u64, revision: u64,
) -> Result<StoreOutcome, StoreError> { ) -> Result<StoreOutcome, StoreError> {
match self match self.nats_store.update(key, value.clone(), revision).await {
.nats_store
.update(key, value.to_string().into(), revision)
.await
{
Ok(revision) => Ok(StoreOutcome::Created(revision)), Ok(revision) => Ok(StoreOutcome::Created(revision)),
Err(err) Err(err)
if err.kind() == async_nats::jetstream::kv::UpdateErrorKind::WrongLastRevision => if err.kind() == async_nats::jetstream::kv::UpdateErrorKind::WrongLastRevision =>
...@@ -240,16 +236,16 @@ impl NATSBucket { ...@@ -240,16 +236,16 @@ impl NATSBucket {
/// We have the wrong revision for a key. Fetch it's entry to get the correct revision, /// We have the wrong revision for a key. Fetch it's entry to get the correct revision,
/// and try the update again. /// and try the update again.
async fn resync_update(&self, key: &Key, value: &str) -> Result<StoreOutcome, StoreError> { async fn resync_update(
&self,
key: &Key,
value: bytes::Bytes,
) -> Result<StoreOutcome, StoreError> {
match self.nats_store.entry(key).await { match self.nats_store.entry(key).await {
Ok(Some(entry)) => { Ok(Some(entry)) => {
// Re-try the update with new version number // Re-try the update with new version number
let next_rev = entry.revision + 1; let next_rev = entry.revision + 1;
match self match self.nats_store.update(key, value, next_rev).await {
.nats_store
.update(key, value.to_string().into(), next_rev)
.await
{
Ok(correct_revision) => Ok(StoreOutcome::Created(correct_revision)), Ok(correct_revision) => Ok(StoreOutcome::Created(correct_revision)),
Err(err) => Err(StoreError::NATSError(format!( Err(err) => Err(StoreError::NATSError(format!(
"Error during update of key {key} after resync: {err}" "Error during update of key {key} after resync: {err}"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment