Unverified Commit 2f1778c1 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix(storage): File store shouldn't notify on metdata changes (#4434)


Signed-off-by: default avatarGraham King <grahamk@nvidia.com>
parent 08146531
...@@ -199,13 +199,13 @@ dependencies = [ ...@@ -199,13 +199,13 @@ dependencies = [
[[package]] [[package]]
name = "async-nats" name = "async-nats"
version = "0.40.0" version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e23419d455dc57d3ae60a2f4278cf561fc74fe866e548e14d2b0ad3e1b8ca0b2" checksum = "86dde77d8a733a9dbaf865a9eb65c72e09c88f3d14d3dd0d2aecf511920ee4fe"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
"futures", "futures-util",
"memchr", "memchr",
"nkeys", "nkeys",
"nuid", "nuid",
...@@ -226,6 +226,7 @@ dependencies = [ ...@@ -226,6 +226,7 @@ dependencies = [
"time", "time",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tokio-stream",
"tokio-util", "tokio-util",
"tokio-websockets", "tokio-websockets",
"tracing", "tracing",
...@@ -1607,7 +1608,7 @@ dependencies = [ ...@@ -1607,7 +1608,7 @@ dependencies = [
"toktrie", "toktrie",
"toktrie_hf_tokenizers", "toktrie_hf_tokenizers",
"tonic 0.13.1", "tonic 0.13.1",
"tonic-build", "tonic-build 0.13.1",
"tower", "tower",
"tower-http", "tower-http",
"tracing", "tracing",
...@@ -1888,16 +1889,18 @@ dependencies = [ ...@@ -1888,16 +1889,18 @@ dependencies = [
[[package]] [[package]]
name = "etcd-client" name = "etcd-client"
version = "0.16.1" version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88365f1a5671eb2f7fc240adb216786bc6494b38ce15f1d26ad6eaa303d5e822" checksum = "8acfe553027cd07fc5fafa81a84f19a7a87eaffaccd2162b6db05e8d6ce98084"
dependencies = [ dependencies = [
"http", "http",
"prost 0.13.5", "prost 0.14.1",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic 0.13.1", "tonic 0.14.2",
"tonic-build", "tonic-build 0.14.2",
"tonic-prost",
"tonic-prost-build",
"tower", "tower",
"tower-service", "tower-service",
] ]
...@@ -3878,7 +3881,7 @@ dependencies = [ ...@@ -3878,7 +3881,7 @@ dependencies = [
"thiserror 2.0.17", "thiserror 2.0.17",
"tokio", "tokio",
"tonic 0.13.1", "tonic 0.13.1",
"tonic-build", "tonic-build 0.13.1",
"tracing", "tracing",
] ]
...@@ -4847,7 +4850,29 @@ dependencies = [ ...@@ -4847,7 +4850,29 @@ dependencies = [
"petgraph", "petgraph",
"prettyplease", "prettyplease",
"prost 0.13.5", "prost 0.13.5",
"prost-types", "prost-types 0.13.5",
"regex",
"syn 2.0.110",
"tempfile",
]
[[package]]
name = "prost-build"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1"
dependencies = [
"heck",
"itertools 0.14.0",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost 0.14.1",
"prost-types 0.14.1",
"pulldown-cmark",
"pulldown-cmark-to-cmark",
"regex", "regex",
"syn 2.0.110", "syn 2.0.110",
"tempfile", "tempfile",
...@@ -4888,6 +4913,15 @@ dependencies = [ ...@@ -4888,6 +4913,15 @@ dependencies = [
"prost 0.13.5", "prost 0.13.5",
] ]
[[package]]
name = "prost-types"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72"
dependencies = [
"prost 0.14.1",
]
[[package]] [[package]]
name = "protobuf" name = "protobuf"
version = "3.7.2" version = "3.7.2"
...@@ -4908,6 +4942,26 @@ dependencies = [ ...@@ -4908,6 +4942,26 @@ dependencies = [
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
[[package]]
name = "pulldown-cmark"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0"
dependencies = [
"bitflags 2.10.0",
"memchr",
"unicase",
]
[[package]]
name = "pulldown-cmark-to-cmark"
version = "21.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8246feae3db61428fd0bb94285c690b460e4517d83152377543ca802357785f1"
dependencies = [
"pulldown-cmark",
]
[[package]] [[package]]
name = "pulp" name = "pulp"
version = "0.18.22" version = "0.18.22"
...@@ -6833,7 +6887,6 @@ dependencies = [ ...@@ -6833,7 +6887,6 @@ dependencies = [
"prost 0.13.5", "prost 0.13.5",
"socket2 0.5.10", "socket2 0.5.10",
"tokio", "tokio",
"tokio-rustls",
"tokio-stream", "tokio-stream",
"tower", "tower",
"tower-layer", "tower-layer",
...@@ -6848,8 +6901,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" ...@@ -6848,8 +6901,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"axum",
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
"h2",
"http", "http",
"http-body", "http-body",
"http-body-util", "http-body-util",
...@@ -6858,8 +6913,10 @@ dependencies = [ ...@@ -6858,8 +6913,10 @@ dependencies = [
"hyper-util", "hyper-util",
"percent-encoding", "percent-encoding",
"pin-project", "pin-project",
"socket2 0.6.1",
"sync_wrapper", "sync_wrapper",
"tokio", "tokio",
"tokio-rustls",
"tokio-stream", "tokio-stream",
"tower", "tower",
"tower-layer", "tower-layer",
...@@ -6875,8 +6932,20 @@ checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847" ...@@ -6875,8 +6932,20 @@ checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847"
dependencies = [ dependencies = [
"prettyplease", "prettyplease",
"proc-macro2", "proc-macro2",
"prost-build", "prost-build 0.13.5",
"prost-types", "prost-types 0.13.5",
"quote",
"syn 2.0.110",
]
[[package]]
name = "tonic-build"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3"
dependencies = [
"prettyplease",
"proc-macro2",
"quote", "quote",
"syn 2.0.110", "syn 2.0.110",
] ]
...@@ -6892,6 +6961,22 @@ dependencies = [ ...@@ -6892,6 +6961,22 @@ dependencies = [
"tonic 0.14.2", "tonic 0.14.2",
] ]
[[package]]
name = "tonic-prost-build"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build 0.14.1",
"prost-types 0.14.1",
"quote",
"syn 2.0.110",
"tempfile",
"tonic-build 0.14.2",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.5.2" version = "0.5.2"
......
...@@ -9,6 +9,7 @@ use arc_swap::ArcSwap; ...@@ -9,6 +9,7 @@ use arc_swap::ArcSwap;
use futures::StreamExt; use futures::StreamExt;
use tokio::net::unix::pipe::Receiver; use tokio::net::unix::pipe::Receiver;
use crate::discovery::{DiscoveryEvent, DiscoveryInstance};
use crate::{ use crate::{
component::{Endpoint, Instance}, component::{Endpoint, Instance},
pipeline::async_trait, pipeline::async_trait,
...@@ -40,15 +41,11 @@ pub struct Client { ...@@ -40,15 +41,11 @@ pub struct Client {
impl Client { impl Client {
// Client with auto-discover instances using key-value store // Client with auto-discover instances using key-value store
pub(crate) async fn new(endpoint: Endpoint) -> Result<Self> { pub(crate) async fn new(endpoint: Endpoint) -> Result<Self> {
tracing::debug!( tracing::trace!(
"Client::new_dynamic: Creating dynamic client for endpoint: {}", "Client::new_dynamic: Creating dynamic client for endpoint: {}",
endpoint.path() endpoint.path()
); );
let instance_source = Self::get_or_create_dynamic_instance_source(&endpoint).await?; let instance_source = Self::get_or_create_dynamic_instance_source(&endpoint).await?;
tracing::debug!(
"Client::new_dynamic: Got instance source for endpoint: {}",
endpoint.path()
);
let (avail_tx, avail_rx) = tokio::sync::watch::channel(vec![]); let (avail_tx, avail_rx) = tokio::sync::watch::channel(vec![]);
let client = Client { let client = Client {
...@@ -59,15 +56,7 @@ impl Client { ...@@ -59,15 +56,7 @@ impl Client {
instance_avail_tx: Arc::new(avail_tx), instance_avail_tx: Arc::new(avail_tx),
instance_avail_rx: avail_rx, instance_avail_rx: avail_rx,
}; };
tracing::debug!(
"Client::new_dynamic: Starting instance source monitor for endpoint: {}",
endpoint.path()
);
client.monitor_instance_source(); client.monitor_instance_source();
tracing::debug!(
"Client::new_dynamic: Successfully created dynamic client for endpoint: {}",
endpoint.path()
);
Ok(client) Ok(client)
} }
...@@ -104,32 +93,17 @@ impl Client { ...@@ -104,32 +93,17 @@ impl Client {
/// Wait for at least one Instance to be available for this Endpoint /// Wait for at least one Instance to be available for this Endpoint
pub async fn wait_for_instances(&self) -> Result<Vec<Instance>> { pub async fn wait_for_instances(&self) -> Result<Vec<Instance>> {
tracing::debug!( tracing::trace!(
"wait_for_instances: Starting wait for endpoint: {}", "wait_for_instances: Starting wait for endpoint: {}",
self.endpoint.path() self.endpoint.path()
); );
let mut rx = self.instance_source.as_ref().clone(); let mut rx = self.instance_source.as_ref().clone();
// wait for there to be 1 or more endpoints // wait for there to be 1 or more endpoints
let mut iteration = 0;
let mut instances: Vec<Instance>; let mut instances: Vec<Instance>;
loop { loop {
instances = rx.borrow_and_update().to_vec(); instances = rx.borrow_and_update().to_vec();
tracing::debug!(
"wait_for_instances: iteration={}, current_instance_count={}, endpoint={}",
iteration,
instances.len(),
self.endpoint.path()
);
if instances.is_empty() { if instances.is_empty() {
tracing::debug!(
"wait_for_instances: No instances yet, waiting for change notification for endpoint: {}",
self.endpoint.path()
);
rx.changed().await?; rx.changed().await?;
tracing::debug!(
"wait_for_instances: Change notification received for endpoint: {}",
self.endpoint.path()
);
} else { } else {
tracing::info!( tracing::info!(
"wait_for_instances: Found {} instance(s) for endpoint: {}", "wait_for_instances: Found {} instance(s) for endpoint: {}",
...@@ -138,7 +112,6 @@ impl Client { ...@@ -138,7 +112,6 @@ impl Client {
); );
break; break;
} }
iteration += 1;
} }
Ok(instances) Ok(instances)
} }
...@@ -173,13 +146,8 @@ impl Client { ...@@ -173,13 +146,8 @@ impl Client {
let cancel_token = self.endpoint.drt().primary_token(); let cancel_token = self.endpoint.drt().primary_token();
let client = self.clone(); let client = self.clone();
let endpoint_path = self.endpoint.path(); let endpoint_path = self.endpoint.path();
tracing::debug!(
"monitor_instance_source: Starting monitor for endpoint: {}",
endpoint_path
);
tokio::task::spawn(async move { tokio::task::spawn(async move {
let mut rx = client.instance_source.as_ref().clone(); let mut rx = client.instance_source.as_ref().clone();
let mut iteration = 0;
while !cancel_token.is_cancelled() { while !cancel_token.is_cancelled() {
let instance_ids: Vec<u64> = rx let instance_ids: Vec<u64> = rx
.borrow_and_update() .borrow_and_update()
...@@ -187,14 +155,6 @@ impl Client { ...@@ -187,14 +155,6 @@ impl Client {
.map(|instance| instance.id()) .map(|instance| instance.id())
.collect(); .collect();
tracing::debug!(
"monitor_instance_source: iteration={}, instance_count={}, instance_ids={:?}, endpoint={}",
iteration,
instance_ids.len(),
instance_ids,
endpoint_path
);
// TODO: this resets both tracked available and free instances // TODO: this resets both tracked available and free instances
client.instance_avail.store(Arc::new(instance_ids.clone())); client.instance_avail.store(Arc::new(instance_ids.clone()));
client.instance_free.store(Arc::new(instance_ids.clone())); client.instance_free.store(Arc::new(instance_ids.clone()));
...@@ -202,11 +162,6 @@ impl Client { ...@@ -202,11 +162,6 @@ impl Client {
// Send update to watch channel subscribers // Send update to watch channel subscribers
let _ = client.instance_avail_tx.send(instance_ids); let _ = client.instance_avail_tx.send(instance_ids);
tracing::debug!(
"monitor_instance_source: instance source updated, endpoint={}",
endpoint_path
);
if let Err(err) = rx.changed().await { if let Err(err) = rx.changed().await {
tracing::error!( tracing::error!(
"monitor_instance_source: The Sender is dropped: {}, endpoint={}", "monitor_instance_source: The Sender is dropped: {}, endpoint={}",
...@@ -215,12 +170,7 @@ impl Client { ...@@ -215,12 +170,7 @@ impl Client {
); );
cancel_token.cancel(); cancel_token.cancel();
} }
iteration += 1;
} }
tracing::debug!(
"monitor_instance_source: Monitor loop exiting for endpoint: {}",
endpoint_path
);
}); });
} }
...@@ -231,32 +181,14 @@ impl Client { ...@@ -231,32 +181,14 @@ impl Client {
let instance_sources = drt.instance_sources(); let instance_sources = drt.instance_sources();
let mut instance_sources = instance_sources.lock().await; let mut instance_sources = instance_sources.lock().await;
tracing::debug!(
"get_or_create_dynamic_instance_source: Checking cache for endpoint: {}",
endpoint.path()
);
if let Some(instance_source) = instance_sources.get(endpoint) { if let Some(instance_source) = instance_sources.get(endpoint) {
if let Some(instance_source) = instance_source.upgrade() { if let Some(instance_source) = instance_source.upgrade() {
tracing::debug!(
"get_or_create_dynamic_instance_source: Found cached instance source for endpoint: {}",
endpoint.path()
);
return Ok(instance_source); return Ok(instance_source);
} else { } else {
tracing::debug!(
"get_or_create_dynamic_instance_source: Cached instance source was dropped, removing for endpoint: {}",
endpoint.path()
);
instance_sources.remove(endpoint); instance_sources.remove(endpoint);
} }
} }
tracing::debug!(
"get_or_create_dynamic_instance_source: Creating new instance source for endpoint: {}",
endpoint.path()
);
let discovery = drt.discovery(); let discovery = drt.discovery();
let discovery_query = crate::discovery::DiscoveryQuery::Endpoint { let discovery_query = crate::discovery::DiscoveryQuery::Endpoint {
namespace: endpoint.component.namespace.name.clone(), namespace: endpoint.component.namespace.name.clone(),
...@@ -264,40 +196,25 @@ impl Client { ...@@ -264,40 +196,25 @@ impl Client {
endpoint: endpoint.name.clone(), endpoint: endpoint.name.clone(),
}; };
tracing::debug!(
"get_or_create_dynamic_instance_source: Calling discovery.list_and_watch for query: {:?}",
discovery_query
);
let mut discovery_stream = discovery let mut discovery_stream = discovery
.list_and_watch(discovery_query.clone(), None) .list_and_watch(discovery_query.clone(), None)
.await?; .await?;
tracing::debug!(
"get_or_create_dynamic_instance_source: Got discovery stream for query: {:?}",
discovery_query
);
let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]); let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]);
let secondary = endpoint.component.drt.runtime().secondary().clone(); let secondary = endpoint.component.drt.runtime().secondary().clone();
secondary.spawn(async move { secondary.spawn(async move {
tracing::debug!("endpoint_watcher: Starting for discovery query: {:?}", discovery_query); tracing::trace!("endpoint_watcher: Starting for discovery query: {:?}", discovery_query);
let mut map: HashMap<u64, Instance> = HashMap::new(); let mut map: HashMap<u64, Instance> = HashMap::new();
let mut event_count = 0;
loop { loop {
let discovery_event = tokio::select! { let discovery_event = tokio::select! {
_ = watch_tx.closed() => { _ = watch_tx.closed() => {
tracing::debug!("endpoint_watcher: all watchers have closed; shutting down for discovery query: {:?}", discovery_query);
break; break;
} }
discovery_event = discovery_stream.next() => { discovery_event = discovery_stream.next() => {
tracing::debug!("endpoint_watcher: Received stream event for discovery query: {:?}", discovery_query);
match discovery_event { match discovery_event {
Some(Ok(event)) => { Some(Ok(event)) => {
tracing::debug!("endpoint_watcher: Got Ok event: {:?}", event);
event event
}, },
Some(Err(e)) => { Some(Err(e)) => {
...@@ -305,67 +222,34 @@ impl Client { ...@@ -305,67 +222,34 @@ impl Client {
break; break;
} }
None => { None => {
tracing::debug!("endpoint_watcher: watch stream has closed; shutting down for discovery query: {:?}", discovery_query);
break; break;
} }
} }
} }
}; };
event_count += 1;
tracing::debug!("endpoint_watcher: Processing event #{} for discovery query: {:?}", event_count, discovery_query);
match discovery_event { match discovery_event {
crate::discovery::DiscoveryEvent::Added(discovery_instance) => { DiscoveryEvent::Added(discovery_instance) => {
match discovery_instance { if let DiscoveryInstance::Endpoint(instance) = discovery_instance {
crate::discovery::DiscoveryInstance::Endpoint(instance) => {
tracing::debug!(
"endpoint_watcher: Added endpoint instance_id={}, namespace={}, component={}, endpoint={}",
instance.instance_id,
instance.namespace,
instance.component,
instance.endpoint
);
map.insert(instance.instance_id, instance); map.insert(instance.instance_id, instance);
} }
_ => {
tracing::debug!("endpoint_watcher: Ignoring non-endpoint instance (Model, etc.) for discovery query: {:?}", discovery_query);
} }
} DiscoveryEvent::Removed(instance_id) => {
}
crate::discovery::DiscoveryEvent::Removed(instance_id) => {
tracing::debug!(
"endpoint_watcher: Removed instance_id={} for discovery query: {:?}",
instance_id,
discovery_query
);
map.remove(&instance_id); map.remove(&instance_id);
} }
} }
let instances: Vec<Instance> = map.values().cloned().collect(); let instances: Vec<Instance> = map.values().cloned().collect();
tracing::debug!(
"endpoint_watcher: Current map size={}, sending update for discovery query: {:?}",
instances.len(),
discovery_query
);
if watch_tx.send(instances).is_err() { if watch_tx.send(instances).is_err() {
tracing::debug!("endpoint_watcher: Unable to send watch updates; shutting down for discovery query: {:?}", discovery_query);
break; break;
} }
} }
tracing::debug!("endpoint_watcher: Completed for discovery query: {:?}, total events processed: {}", discovery_query, event_count);
let _ = watch_tx.send(vec![]); let _ = watch_tx.send(vec![]);
}); });
let instance_source = Arc::new(watch_rx); let instance_source = Arc::new(watch_rx);
instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source)); instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source));
tracing::debug!(
"get_or_create_dynamic_instance_source: Successfully created and cached instance source for endpoint: {}",
endpoint.path()
);
Ok(instance_source) Ok(instance_source)
} }
} }
...@@ -247,7 +247,7 @@ impl Discovery for KVStoreDiscovery { ...@@ -247,7 +247,7 @@ impl Discovery for KVStoreDiscovery {
MODELS_BUCKET MODELS_BUCKET
}; };
tracing::debug!( tracing::trace!(
"KVStoreDiscovery::list_and_watch: Starting watch for query={:?}, prefix={}, bucket={}", "KVStoreDiscovery::list_and_watch: Starting watch for query={:?}, prefix={}, bucket={}",
query, query,
prefix, prefix,
...@@ -264,47 +264,18 @@ impl Discovery for KVStoreDiscovery { ...@@ -264,47 +264,18 @@ impl Discovery for KVStoreDiscovery {
cancel_token, cancel_token,
); );
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Got watch receiver for bucket={}",
bucket_name
);
// Create a stream that filters and transforms WatchEvents to DiscoveryEvents // Create a stream that filters and transforms WatchEvents to DiscoveryEvents
let stream = async_stream::stream! { let stream = async_stream::stream! {
let mut event_count = 0;
tracing::debug!("KVStoreDiscovery::list_and_watch: Stream started, waiting for events on prefix={}", prefix);
while let Some(event) = rx.recv().await { while let Some(event) = rx.recv().await {
event_count += 1;
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Received event #{} for prefix={}",
event_count,
prefix
);
let discovery_event = match event { let discovery_event = match event {
WatchEvent::Put(kv) => { WatchEvent::Put(kv) => {
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Put event, key={}, prefix={}, matches={}",
kv.key_str(),
prefix,
Self::matches_prefix(kv.key_str(), &prefix, bucket_name)
);
// Check if this key matches our prefix // Check if this key matches our prefix
if !Self::matches_prefix(kv.key_str(), &prefix, bucket_name) { if !Self::matches_prefix(kv.key_str(), &prefix, bucket_name) {
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Skipping key {} (doesn't match prefix {})",
kv.key_str(),
prefix
);
continue; continue;
} }
match Self::parse_instance(kv.value()) { match Self::parse_instance(kv.value()) {
Ok(instance) => { Ok(instance) => {
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Emitting Added event for instance_id={}, key={}",
instance.instance_id(),
kv.key_str()
);
Some(DiscoveryEvent::Added(instance)) Some(DiscoveryEvent::Added(instance))
}, },
Err(e) => { Err(e) => {
...@@ -319,18 +290,8 @@ impl Discovery for KVStoreDiscovery { ...@@ -319,18 +290,8 @@ impl Discovery for KVStoreDiscovery {
} }
WatchEvent::Delete(kv) => { WatchEvent::Delete(kv) => {
let key_str = kv.as_ref(); let key_str = kv.as_ref();
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Delete event, key={}, prefix={}",
key_str,
prefix
);
// Check if this key matches our prefix // Check if this key matches our prefix
if !Self::matches_prefix(key_str, &prefix, bucket_name) { if !Self::matches_prefix(key_str, &prefix, bucket_name) {
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Skipping deleted key {} (doesn't match prefix {})",
key_str,
prefix
);
continue; continue;
} }
...@@ -342,11 +303,6 @@ impl Discovery for KVStoreDiscovery { ...@@ -342,11 +303,6 @@ impl Discovery for KVStoreDiscovery {
Some(instance_id_hex) => { Some(instance_id_hex) => {
match u64::from_str_radix(instance_id_hex, 16) { match u64::from_str_radix(instance_id_hex, 16) {
Ok(instance_id) => { Ok(instance_id) => {
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Emitting Removed event for instance_id={}, key={}",
instance_id,
key_str
);
Some(DiscoveryEvent::Removed(instance_id)) Some(DiscoveryEvent::Removed(instance_id))
} }
Err(e) => { Err(e) => {
...@@ -371,19 +327,10 @@ impl Discovery for KVStoreDiscovery { ...@@ -371,19 +327,10 @@ impl Discovery for KVStoreDiscovery {
}; };
if let Some(event) = discovery_event { if let Some(event) = discovery_event {
tracing::debug!("KVStoreDiscovery::list_and_watch: Yielding event: {:?}", event);
yield Ok(event); yield Ok(event);
} else {
tracing::debug!("KVStoreDiscovery::list_and_watch: Event was filtered out (None)");
} }
} }
tracing::debug!("KVStoreDiscovery::list_and_watch: Stream ended after {} events for prefix={}", event_count, prefix);
}; };
tracing::debug!(
"KVStoreDiscovery::list_and_watch: Returning stream for query={:?}",
query
);
Ok(Box::pin(stream)) Ok(Box::pin(stream))
} }
} }
......
...@@ -18,7 +18,7 @@ use std::{collections::HashMap, pin::Pin}; ...@@ -18,7 +18,7 @@ use std::{collections::HashMap, pin::Pin};
use anyhow::Context as _; use anyhow::Context as _;
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, event};
use parking_lot::Mutex; use parking_lot::Mutex;
use crate::storage::key_value_store::KeyValue; use crate::storage::key_value_store::KeyValue;
...@@ -370,13 +370,8 @@ impl KeyValueBucket for Directory { ...@@ -370,13 +370,8 @@ impl KeyValueBucket for Directory {
} }
// Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS) // Canonicalize paths to handle symlinks (e.g., /var -> /private/var on macOS)
let canonical_item_path = match item_path.canonicalize() { // The unwrap_or_else path is for Remove case.
Ok(p) => p, let canonical_item_path = item_path.canonicalize().unwrap_or_else(|_| item_path.clone());
Err(err) => {
tracing::warn!(error = %err, item = %item_path.display(), "Failed to canonicalize path. Using original path.");
item_path.clone()
}
};
let key = match canonical_item_path.strip_prefix(&root) { let key = match canonical_item_path.strip_prefix(&root) {
Ok(stripped) => stripped.display().to_string().replace("_", "/"), Ok(stripped) => stripped.display().to_string().replace("_", "/"),
...@@ -393,7 +388,7 @@ impl KeyValueBucket for Directory { ...@@ -393,7 +388,7 @@ impl KeyValueBucket for Directory {
}; };
match event.kind { match event.kind {
EventKind::Create(_) | EventKind::Modify(_) => { EventKind::Create(event::CreateKind::File) | EventKind::Modify(event::ModifyKind::Data(event::DataChange::Content)) => {
let data: bytes::Bytes = match fs::read(&item_path) { let data: bytes::Bytes = match fs::read(&item_path) {
Ok(data) => data.into(), Ok(data) => data.into(),
Err(err) => { Err(err) => {
...@@ -404,11 +399,11 @@ impl KeyValueBucket for Directory { ...@@ -404,11 +399,11 @@ impl KeyValueBucket for Directory {
let item = KeyValue::new(key, data); let item = KeyValue::new(key, data);
yield WatchEvent::Put(item); yield WatchEvent::Put(item);
} }
EventKind::Remove(_) => { EventKind::Remove(event::RemoveKind::File) => {
yield WatchEvent::Delete(Key::from_raw(key)); yield WatchEvent::Delete(Key::from_raw(key));
} }
event_type => { _ => {
tracing::debug!(?event_type, dir = %dir.display(), "Ignoring event type"); // These happen every time the keep-alive updates last modified time
continue; continue;
} }
} }
......
...@@ -18,7 +18,7 @@ mod integration { ...@@ -18,7 +18,7 @@ mod integration {
pub const DEFAULT_NAMESPACE: &str = "dynamo"; pub const DEFAULT_NAMESPACE: &str = "dynamo";
use dynamo_runtime::{ use dynamo_runtime::{
DistributedRuntime, ErrorContext, Result, Runtime, Worker, DistributedRuntime, Runtime, Worker,
config::environment_names::testing as env_testing, config::environment_names::testing as env_testing,
logging, logging,
pipeline::{ pipeline::{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment