Unverified Commit 43ba25bc authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix: block on KV subscriber initialization (#5149)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 6bfb41de
......@@ -388,60 +388,44 @@ impl KvRouter {
};
tracing::info!("Found {count} worker runtime config(s), starting KV event subscriber");
// Clone everything needed for the background subscriber task
let component_clone = component.clone();
let kv_indexer_clone = kv_indexer.clone();
let cancellation_token_clone = cancellation_token.clone();
let worker_query_client_clone =
worker_query::WorkerQueryClient::new(component.clone(), runtime_configs_rx.clone());
// Spawn subscriber as background task (long-running)
// Start subscriber - setup runs synchronously, then spawns background loop internally
if all_local_indexer {
// All workers have local_indexer enabled - use NATS Core
tracing::info!(
"All {count} workers have local_indexer enabled, using NATS Core subscription"
);
tokio::spawn(async move {
if let Err(e) = start_kv_router_background_nats_core(
component_clone,
kv_indexer_clone.event_sender(),
kv_indexer_clone.remove_worker_sender(),
cancellation_token_clone,
worker_query_client_clone,
)
.await
{
tracing::error!("Failed to start NATS Core subscriber: {e}");
}
});
start_kv_router_background_nats_core(
component.clone(),
kv_indexer.event_sender(),
kv_indexer.remove_worker_sender(),
cancellation_token.clone(),
worker_query::WorkerQueryClient::new(
component.clone(),
runtime_configs_rx.clone(),
),
)
.await?;
} else {
// Not all workers have local_indexer - use JetStream
tracing::info!(
"Not all workers have local_indexer enabled, using JetStream subscription"
);
tokio::spawn(async move {
if let Err(e) = start_kv_router_background(
component_clone,
consumer_id,
kv_indexer_clone.event_sender(),
kv_indexer_clone.remove_worker_sender(),
kv_router_config
.router_snapshot_threshold
.map(|_| kv_indexer_clone.get_workers_sender()),
kv_router_config
.router_snapshot_threshold
.map(|_| kv_indexer_clone.snapshot_event_sender()),
cancellation_token_clone,
kv_router_config.router_snapshot_threshold,
kv_router_config.router_reset_states,
)
.await
{
tracing::error!("Failed to start JetStream subscriber: {e}");
}
});
start_kv_router_background(
component.clone(),
consumer_id,
kv_indexer.event_sender(),
kv_indexer.remove_worker_sender(),
kv_router_config
.router_snapshot_threshold
.map(|_| kv_indexer.get_workers_sender()),
kv_router_config
.router_snapshot_threshold
.map(|_| kv_indexer.snapshot_event_sender()),
cancellation_token.clone(),
kv_router_config.router_snapshot_threshold,
kv_router_config.router_reset_states,
)
.await?;
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment