Unverified commit 1549c338, authored by Yan Ru Pei, committed by GitHub
Browse files

chore: derive router_id and cancellation_token from component instead of plumbing (#6419)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
parent dad4237d
...@@ -443,7 +443,6 @@ impl ModelManager { ...@@ -443,7 +443,6 @@ impl ModelManager {
kv_cache_block_size, kv_cache_block_size,
Some(selector), Some(selector),
kv_router_config, kv_router_config,
instance_id,
worker_type, worker_type,
) )
.await?; .await?;
......
...@@ -141,7 +141,6 @@ impl Indexer { ...@@ -141,7 +141,6 @@ impl Indexer {
component: &dynamo_runtime::component::Component, component: &dynamo_runtime::component::Component,
kv_router_config: &KvRouterConfig, kv_router_config: &KvRouterConfig,
block_size: u32, block_size: u32,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Self { ) -> Self {
if kv_router_config.overlap_score_weight == 0.0 { if kv_router_config.overlap_score_weight == 0.0 {
return Indexer::None; return Indexer::None;
...@@ -156,6 +155,7 @@ impl Indexer { ...@@ -156,6 +155,7 @@ impl Indexer {
} }
let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(component); let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(component);
let cancellation_token = component.drt().primary_token();
// If use_kv_events is false, enable TTL and pruning for approximate behavior // If use_kv_events is false, enable TTL and pruning for approximate behavior
let prune_config = if !kv_router_config.use_kv_events { let prune_config = if !kv_router_config.use_kv_events {
...@@ -274,7 +274,6 @@ pub struct KvRouter { ...@@ -274,7 +274,6 @@ pub struct KvRouter {
} }
impl KvRouter { impl KvRouter {
#[allow(clippy::too_many_arguments)]
pub async fn new( pub async fn new(
endpoint: Endpoint, endpoint: Endpoint,
client: Client, client: Client,
...@@ -282,7 +281,6 @@ impl KvRouter { ...@@ -282,7 +281,6 @@ impl KvRouter {
block_size: u32, block_size: u32,
selector: Option<Box<dyn WorkerSelector + Send + Sync>>, selector: Option<Box<dyn WorkerSelector + Send + Sync>>,
kv_router_config: Option<KvRouterConfig>, kv_router_config: Option<KvRouterConfig>,
router_id: u64,
worker_type: &'static str, worker_type: &'static str,
) -> Result<Self> { ) -> Result<Self> {
let kv_router_config = kv_router_config.unwrap_or_default(); let kv_router_config = kv_router_config.unwrap_or_default();
...@@ -290,12 +288,7 @@ impl KvRouter { ...@@ -290,12 +288,7 @@ impl KvRouter {
let component = endpoint.component(); let component = endpoint.component();
let cancellation_token = component.drt().primary_token(); let cancellation_token = component.drt().primary_token();
let indexer = Indexer::new( let indexer = Indexer::new(component, &kv_router_config, block_size);
component,
&kv_router_config,
block_size,
cancellation_token.clone(),
);
// Wait for at least one worker with a known runtime config before starting scheduler // Wait for at least one worker with a known runtime config before starting scheduler
let _ = workers_with_configs let _ = workers_with_configs
...@@ -310,22 +303,14 @@ impl KvRouter { ...@@ -310,22 +303,14 @@ impl KvRouter {
block_size, block_size,
workers_with_configs.clone(), workers_with_configs.clone(),
selector, selector,
kv_router_config.router_replica_sync, &kv_router_config,
router_id,
worker_type, worker_type,
kv_router_config.router_queue_threshold,
) )
.await?; .await?;
// Start KV event subscription if needed (use_kv_events=true and overlap_score_weight>0) // Start KV event subscription if needed (use_kv_events=true and overlap_score_weight>0)
if kv_router_config.should_subscribe_to_kv_events() { if kv_router_config.should_subscribe_to_kv_events() {
subscriber::start_subscriber( subscriber::start_subscriber(component.clone(), &kv_router_config, indexer.clone())
component.clone(),
&kv_router_config,
router_id,
indexer.clone(),
cancellation_token.clone(),
)
.await?; .await?;
} else { } else {
tracing::info!( tracing::info!(
......
...@@ -90,16 +90,13 @@ pub struct KvScheduler { ...@@ -90,16 +90,13 @@ pub struct KvScheduler {
} }
impl KvScheduler { impl KvScheduler {
#[allow(clippy::too_many_arguments)]
pub async fn start( pub async fn start(
component: Component, component: Component,
block_size: u32, block_size: u32,
workers_with_configs: RuntimeConfigWatch, workers_with_configs: RuntimeConfigWatch,
selector: Option<Box<dyn WorkerSelector + Send + Sync>>, selector: Option<Box<dyn WorkerSelector + Send + Sync>>,
replica_sync: bool, kv_router_config: &KvRouterConfig,
router_id: u64,
worker_type: &'static str, worker_type: &'static str,
queue_threshold: Option<f64>,
) -> Result<Self, KvSchedulerError> { ) -> Result<Self, KvSchedulerError> {
let selector = selector.unwrap_or(Box::new(DefaultWorkerSelector::default())); let selector = selector.unwrap_or(Box::new(DefaultWorkerSelector::default()));
...@@ -108,12 +105,13 @@ impl KvScheduler { ...@@ -108,12 +105,13 @@ impl KvScheduler {
let initial_workers: HashMap<WorkerId, ModelRuntimeConfig> = let initial_workers: HashMap<WorkerId, ModelRuntimeConfig> =
workers_with_configs.borrow().clone(); workers_with_configs.borrow().clone();
let router_id = component.drt().discovery().instance_id();
let slots = Arc::new( let slots = Arc::new(
ActiveSequencesMultiWorker::new( ActiveSequencesMultiWorker::new(
component.clone(), component.clone(),
block_size as usize, block_size as usize,
initial_workers, initial_workers,
replica_sync, kv_router_config.router_replica_sync,
router_id, router_id,
worker_type, worker_type,
) )
...@@ -163,7 +161,7 @@ impl KvScheduler { ...@@ -163,7 +161,7 @@ impl KvScheduler {
slots.clone(), slots.clone(),
workers_with_configs.clone(), workers_with_configs.clone(),
ready_notify.clone(), ready_notify.clone(),
queue_threshold, kv_router_config.router_queue_threshold,
)); ));
let queue_clone = queue.clone(); let queue_clone = queue.clone();
......
...@@ -225,10 +225,11 @@ pub async fn start_kv_router_background( ...@@ -225,10 +225,11 @@ pub async fn start_kv_router_background(
component: Component, component: Component,
consumer_id: String, consumer_id: String,
indexer: Indexer, indexer: Indexer,
cancellation_token: CancellationToken, kv_router_config: &KvRouterConfig,
router_snapshot_threshold: Option<u32>,
router_reset_states: bool,
) -> Result<()> { ) -> Result<()> {
let cancellation_token = component.drt().primary_token();
let router_snapshot_threshold = kv_router_config.router_snapshot_threshold;
let router_reset_states = kv_router_config.router_reset_states;
// Set up NATS connections // Set up NATS connections
let stream_name = create_kv_stream_name(&component, KV_EVENT_SUBJECT); let stream_name = create_kv_stream_name(&component, KV_EVENT_SUBJECT);
let nats_server = std::env::var(env_nats::NATS_SERVER) let nats_server = std::env::var(env_nats::NATS_SERVER)
...@@ -449,9 +450,9 @@ pub async fn start_kv_router_background( ...@@ -449,9 +450,9 @@ pub async fn start_kv_router_background(
pub async fn start_kv_router_background_event_plane( pub async fn start_kv_router_background_event_plane(
component: Component, component: Component,
indexer: Indexer, indexer: Indexer,
cancellation_token: CancellationToken,
transport_kind: EventTransportKind, transport_kind: EventTransportKind,
) -> Result<()> { ) -> Result<()> {
let cancellation_token = component.drt().primary_token();
// WorkerQueryClient handles its own discovery loop for lifecycle + initial recovery. // WorkerQueryClient handles its own discovery loop for lifecycle + initial recovery.
// No blocking wait — recovery happens asynchronously as endpoints are discovered. // No blocking wait — recovery happens asynchronously as endpoints are discovered.
let worker_query_client = WorkerQueryClient::spawn(component.clone(), indexer.clone()).await?; let worker_query_client = WorkerQueryClient::spawn(component.clone(), indexer.clone()).await?;
...@@ -600,9 +601,7 @@ async fn cleanup_orphaned_consumers( ...@@ -600,9 +601,7 @@ async fn cleanup_orphaned_consumers(
pub async fn start_subscriber( pub async fn start_subscriber(
component: Component, component: Component,
kv_router_config: &KvRouterConfig, kv_router_config: &KvRouterConfig,
router_id: u64,
indexer: Indexer, indexer: Indexer,
cancellation_token: CancellationToken,
) -> Result<()> { ) -> Result<()> {
let transport_kind = EventTransportKind::from_env_or_default(); let transport_kind = EventTransportKind::from_env_or_default();
...@@ -617,16 +616,8 @@ pub async fn start_subscriber( ...@@ -617,16 +616,8 @@ pub async fn start_subscriber(
} }
tracing::info!("Using JetStream subscription (--durable-kv-events enabled)"); tracing::info!("Using JetStream subscription (--durable-kv-events enabled)");
let consumer_id = router_id.to_string(); let consumer_id = component.drt().discovery().instance_id().to_string();
start_kv_router_background( start_kv_router_background(component, consumer_id, indexer, kv_router_config).await
component,
consumer_id,
indexer,
cancellation_token,
kv_router_config.router_snapshot_threshold,
kv_router_config.router_reset_states,
)
.await
} else { } else {
if transport_kind == EventTransportKind::Zmq { if transport_kind == EventTransportKind::Zmq {
if kv_router_config.router_snapshot_threshold.is_some() if kv_router_config.router_snapshot_threshold.is_some()
...@@ -641,12 +632,6 @@ pub async fn start_subscriber( ...@@ -641,12 +632,6 @@ pub async fn start_subscriber(
tracing::info!("Using NATS Core subscription (local_indexer mode)"); tracing::info!("Using NATS Core subscription (local_indexer mode)");
} }
start_kv_router_background_event_plane( start_kv_router_background_event_plane(component, indexer, transport_kind).await
component.clone(),
indexer,
cancellation_token,
transport_kind,
)
.await
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment