Unverified Commit 0844f8ee authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: allow shutdown of orphaned kv consumers on Router startup (#3516)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 44012a24
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
//! Background processes for the KV Router including event consumption and snapshot uploads. //! Background processes for the KV Router including event consumption and snapshot uploads.
use std::time::Duration; use std::{collections::HashSet, time::Duration};
use anyhow::Result; use anyhow::Result;
use dynamo_runtime::{ use dynamo_runtime::{
...@@ -11,7 +11,7 @@ use dynamo_runtime::{ ...@@ -11,7 +11,7 @@ use dynamo_runtime::{
prelude::*, prelude::*,
traits::events::EventPublisher, traits::events::EventPublisher,
transports::{ transports::{
etcd::WatchEvent, etcd::{Client as EtcdClient, WatchEvent},
nats::{NatsQueue, Slug}, nats::{NatsQueue, Slug},
}, },
}; };
...@@ -32,7 +32,7 @@ use crate::{ ...@@ -32,7 +32,7 @@ use crate::{
struct SnapshotResources { struct SnapshotResources {
nats_client: dynamo_runtime::transports::nats::Client, nats_client: dynamo_runtime::transports::nats::Client,
bucket_name: String, bucket_name: String,
etcd_client: dynamo_runtime::transports::etcd::Client, etcd_client: EtcdClient,
lock_name: String, lock_name: String,
} }
...@@ -91,7 +91,7 @@ pub async fn start_kv_router_background( ...@@ -91,7 +91,7 @@ pub async fn start_kv_router_background(
stream_name.clone(), stream_name.clone(),
nats_server.clone(), nats_server.clone(),
std::time::Duration::from_secs(60), // 1 minute timeout std::time::Duration::from_secs(60), // 1 minute timeout
consumer_uuid, consumer_uuid.clone(),
); );
nats_queue.connect_with_reset(router_reset_states).await?; nats_queue.connect_with_reset(router_reset_states).await?;
...@@ -151,6 +151,9 @@ pub async fn start_kv_router_background( ...@@ -151,6 +151,9 @@ pub async fn start_kv_router_background(
.etcd_client() .etcd_client()
.ok_or_else(|| anyhow::anyhow!("etcd client not available"))?; .ok_or_else(|| anyhow::anyhow!("etcd client not available"))?;
// Cleanup orphaned consumers on startup
cleanup_orphaned_consumers(&mut nats_queue, &etcd_client, &component, &consumer_uuid).await;
// Watch for router deletions to clean up orphaned consumers // Watch for router deletions to clean up orphaned consumers
let (_prefix_str, _watcher, mut router_replicas_rx) = etcd_client let (_prefix_str, _watcher, mut router_replicas_rx) = etcd_client
.kv_get_and_watch_prefix(&format!("{}/", KV_ROUTERS_ROOT_PATH)) .kv_get_and_watch_prefix(&format!("{}/", KV_ROUTERS_ROOT_PATH))
...@@ -300,7 +303,7 @@ pub async fn start_kv_router_background( ...@@ -300,7 +303,7 @@ pub async fn start_kv_router_background(
}; };
let key = String::from_utf8_lossy(kv.key()); let key = String::from_utf8_lossy(kv.key());
tracing::info!("Router deleted: {}", key); tracing::info!("Detected router replica deletion: {}", key);
// Only process deletions for routers on the same component // Only process deletions for routers on the same component
if !key.contains(component.path().as_str()) { if !key.contains(component.path().as_str()) {
...@@ -365,6 +368,44 @@ pub async fn start_kv_router_background( ...@@ -365,6 +368,44 @@ pub async fn start_kv_router_background(
Ok(()) Ok(())
} }
/// Cleanup orphaned NATS consumers that no longer have corresponding etcd router entries
async fn cleanup_orphaned_consumers(
nats_queue: &mut NatsQueue,
etcd_client: &EtcdClient,
component: &Component,
consumer_uuid: &str,
) {
let Ok(consumers) = nats_queue.list_consumers().await else {
return;
};
let router_prefix = format!("{}/{}/", KV_ROUTERS_ROOT_PATH, component.path());
let Ok(router_entries) = etcd_client.kv_get_prefix(&router_prefix).await else {
return;
};
let active_uuids: HashSet<String> = router_entries
.iter()
.filter_map(|kv| {
String::from_utf8_lossy(kv.key())
.split('/')
.next_back()
.map(str::to_string)
})
.collect();
for consumer in consumers {
if consumer == consumer_uuid {
// Never delete myself (extra/redundant safeguard)
continue;
}
if !active_uuids.contains(&consumer) {
tracing::info!("Cleaning up orphaned consumer: {}", consumer);
let _ = nats_queue.shutdown(Some(consumer)).await;
}
}
}
/// Perform snapshot upload and purge operations /// Perform snapshot upload and purge operations
async fn purge_then_snapshot( async fn purge_then_snapshot(
nats_queue: &mut NatsQueue, nats_queue: &mut NatsQueue,
......
...@@ -649,6 +649,17 @@ impl NatsQueue { ...@@ -649,6 +649,17 @@ impl NatsQueue {
} }
} }
/// List all consumer names for the stream
pub async fn list_consumers(&mut self) -> Result<Vec<String>> {
self.ensure_connection().await?;
if let Some(client) = &self.client {
client.list_consumers(&self.stream_name).await
} else {
Err(anyhow::anyhow!("Client not connected"))
}
}
/// Enqueue a task using the provided data /// Enqueue a task using the provided data
pub async fn enqueue_task(&mut self, task_data: Bytes) -> Result<()> { pub async fn enqueue_task(&mut self, task_data: Bytes) -> Result<()> {
self.ensure_connection().await?; self.ensure_connection().await?;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment