Unverified commit 78a3feda, authored by Yan Ru Pei, committed by GitHub
Browse files

fix: hook up worker removals for indexer (#3095)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 26889b09
...@@ -419,6 +419,7 @@ impl KvIndexer { ...@@ -419,6 +419,7 @@ impl KvIndexer {
component.inner.clone(), component.inner.clone(),
consumer_uuid.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), consumer_uuid.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
inner.event_sender(), inner.event_sender(),
inner.remove_worker_sender(),
None, None,
cancellation_token, cancellation_token,
None, None,
......
...@@ -266,6 +266,7 @@ impl KvRouter { ...@@ -266,6 +266,7 @@ impl KvRouter {
component.clone(), component.clone(),
consumer_uuid, consumer_uuid,
kv_indexer.event_sender(), kv_indexer.event_sender(),
kv_indexer.remove_worker_sender(),
kv_router_config kv_router_config
.router_snapshot_threshold .router_snapshot_threshold
.map(|_| kv_indexer.snapshot_event_sender()), .map(|_| kv_indexer.snapshot_event_sender()),
......
...@@ -874,6 +874,15 @@ impl KvIndexer { ...@@ -874,6 +874,15 @@ impl KvIndexer {
pub fn snapshot_event_sender(&self) -> mpsc::Sender<DumpRequest> { pub fn snapshot_event_sender(&self) -> mpsc::Sender<DumpRequest> {
self.dump_tx.clone() self.dump_tx.clone()
} }
/// Hand out a sender on which worker removal requests can be submitted.
///
/// ### Returns
///
/// A clone of the indexer's `mpsc::Sender` that accepts `WorkerId`s.
pub fn remove_worker_sender(&self) -> mpsc::Sender<WorkerId> {
    // Each caller receives its own clone of the stored sender.
    Clone::clone(&self.remove_worker_tx)
}
} }
#[async_trait] #[async_trait]
......
...@@ -68,10 +68,12 @@ impl SnapshotResources { ...@@ -68,10 +68,12 @@ impl SnapshotResources {
} }
/// Start a unified background task for event consumption and optional snapshot management /// Start a unified background task for event consumption and optional snapshot management
#[allow(clippy::too_many_arguments)]
pub async fn start_kv_router_background( pub async fn start_kv_router_background(
component: Component, component: Component,
consumer_uuid: String, consumer_uuid: String,
kv_events_tx: mpsc::Sender<RouterEvent>, kv_events_tx: mpsc::Sender<RouterEvent>,
remove_worker_tx: mpsc::Sender<crate::kv_router::indexer::WorkerId>,
snapshot_tx: Option<mpsc::Sender<DumpRequest>>, snapshot_tx: Option<mpsc::Sender<DumpRequest>>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
router_snapshot_threshold: Option<u32>, router_snapshot_threshold: Option<u32>,
...@@ -156,6 +158,13 @@ pub async fn start_kv_router_background( ...@@ -156,6 +158,13 @@ pub async fn start_kv_router_background(
.dissolve(); .dissolve();
let cleanup_lock_name = format!("{}/{}", ROUTER_CLEANUP_LOCK, component.subject()); let cleanup_lock_name = format!("{}/{}", ROUTER_CLEANUP_LOCK, component.subject());
// Get the generate endpoint and watch for instance deletions
let generate_endpoint = component.endpoint("generate");
let (_instance_prefix, _instance_watcher, mut instance_event_rx) = etcd_client
.kv_get_and_watch_prefix(generate_endpoint.etcd_root())
.await?
.dissolve();
// Only set up snapshot-related resources if snapshot_tx is provided and threshold is set // Only set up snapshot-related resources if snapshot_tx is provided and threshold is set
let snapshot_resources = if snapshot_tx.is_some() && router_snapshot_threshold.is_some() { let snapshot_resources = if snapshot_tx.is_some() && router_snapshot_threshold.is_some() {
let lock_name = format!("{}/{}", ROUTER_SNAPSHOT_LOCK, component.subject()); let lock_name = format!("{}/{}", ROUTER_SNAPSHOT_LOCK, component.subject());
...@@ -188,6 +197,30 @@ pub async fn start_kv_router_background( ...@@ -188,6 +197,30 @@ pub async fn start_kv_router_background(
break; break;
} }
// Handle generate endpoint instance deletion events
Some(event) = instance_event_rx.recv() => {
let WatchEvent::Delete(kv) = event else {
continue;
};
let key = String::from_utf8_lossy(kv.key());
let Some(worker_id_str) = key.split('/').next_back() else {
tracing::warn!("Could not extract worker ID from instance key: {}", key);
continue;
};
let Ok(worker_id) = worker_id_str.parse::<i64>() else {
tracing::warn!("Could not parse worker ID from instance key: {}", key);
continue;
};
tracing::info!("Generate endpoint instance deleted, removing worker {}", worker_id);
if let Err(e) = remove_worker_tx.send(worker_id).await {
tracing::warn!("Failed to send worker removal for worker {}: {}", worker_id, e);
}
}
// Handle event consumption // Handle event consumption
result = nats_queue.dequeue_task(None) => { result = nats_queue.dequeue_task(None) => {
match result { match result {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment