Unverified commit e66f3267, authored by Yan Ru Pei and committed by GitHub
Browse files

fix: derive kv-router discovery name from target worker component (#6475)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 3097adb2
......@@ -419,8 +419,9 @@ impl ModelManager {
let instance_id = discovery.instance_id();
// Build transport for router endpoint based on request plane mode
// Use KV_ROUTER_COMPONENT as the component name to distinguish from the generate endpoint's component
let router_endpoint_id = router_endpoint_id(endpoint.id().namespace);
// Use the worker's component name so each target pool gets its own router discovery group
let router_endpoint_id =
router_endpoint_id(endpoint.id().namespace, endpoint.id().component);
let transport = build_transport_type(endpoint, &router_endpoint_id, instance_id).await?;
let discovery_spec = DiscoverySpec::Endpoint {
......
......@@ -88,23 +88,22 @@ pub fn worker_kv_indexer_query_endpoint(dp_rank: DpRank) -> String {
}
// for router discovery registration
pub const KV_ROUTER_COMPONENT: &str = "kv-router";
pub const KV_ROUTER_ENDPOINT: &str = "generate";
pub const KV_ROUTER_ENDPOINT: &str = "router-discovery";
/// Creates an EndpointId for the KV router in the given namespace and component.
pub fn router_endpoint_id(namespace: String) -> EndpointId {
pub fn router_endpoint_id(namespace: String, component: String) -> EndpointId {
EndpointId {
namespace,
component: KV_ROUTER_COMPONENT.to_string(),
component,
name: KV_ROUTER_ENDPOINT.to_string(),
}
}
/// Creates a DiscoveryQuery for the KV router in the given namespace and component.
pub fn router_discovery_query(namespace: String) -> DiscoveryQuery {
pub fn router_discovery_query(namespace: String, component: String) -> DiscoveryQuery {
DiscoveryQuery::Endpoint {
namespace,
component: KV_ROUTER_COMPONENT.to_string(),
component,
endpoint: KV_ROUTER_ENDPOINT.to_string(),
}
}
......
......@@ -282,7 +282,8 @@ pub async fn start_kv_router_background(
// Watch for router deletions to clean up orphaned consumers via discovery
let generate_endpoint = component.endpoint("generate");
let discovery_client = component.drt().discovery();
let router_discovery_key = router_discovery_query(component.namespace().name());
let router_discovery_key =
router_discovery_query(component.namespace().name(), component.name().to_string());
let mut router_event_stream = discovery_client
.list_and_watch(router_discovery_key, Some(cancellation_token.clone()))
.await?;
......@@ -572,7 +573,10 @@ async fn cleanup_orphaned_consumers(
// Get active routers from discovery
let discovery = component.drt().discovery();
let Ok(router_instances) = discovery
.list(router_discovery_query(component.namespace().name()))
.list(router_discovery_query(
component.namespace().name(),
component.name().to_string(),
))
.await
else {
tracing::debug!("Failed to list router instances from discovery, skipping cleanup");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment