Unverified Commit 602ce0ed authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: use u64 as router consumer id instead of uuid (#5478)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent d5befaab
...@@ -374,9 +374,6 @@ impl ModelManager { ...@@ -374,9 +374,6 @@ impl ModelManager {
discovery.register(discovery_spec).await?; discovery.register(discovery_spec).await?;
// Use instance_id (hex) as the consumer ID for NATS consumer coordination
let consumer_id = instance_id.to_string();
// Get or create runtime config watcher for this endpoint // Get or create runtime config watcher for this endpoint
let workers_with_configs = self.get_or_create_runtime_config_watcher(endpoint).await?; let workers_with_configs = self.get_or_create_runtime_config_watcher(endpoint).await?;
...@@ -388,7 +385,7 @@ impl ModelManager { ...@@ -388,7 +385,7 @@ impl ModelManager {
kv_cache_block_size, kv_cache_block_size,
Some(selector), Some(selector),
kv_router_config, kv_router_config,
consumer_id, instance_id,
) )
.await?; .await?;
let new_kv_chooser = Arc::new(chooser); let new_kv_chooser = Arc::new(chooser);
......
...@@ -334,7 +334,7 @@ impl KvRouter { ...@@ -334,7 +334,7 @@ impl KvRouter {
block_size: u32, block_size: u32,
selector: Option<Box<dyn WorkerSelector + Send + Sync>>, selector: Option<Box<dyn WorkerSelector + Send + Sync>>,
kv_router_config: Option<KvRouterConfig>, kv_router_config: Option<KvRouterConfig>,
consumer_id: String, router_id: u64,
) -> Result<Self> { ) -> Result<Self> {
let kv_router_config = kv_router_config.unwrap_or_default(); let kv_router_config = kv_router_config.unwrap_or_default();
let component = endpoint.component(); let component = endpoint.component();
...@@ -389,7 +389,7 @@ impl KvRouter { ...@@ -389,7 +389,7 @@ impl KvRouter {
workers_with_configs.clone(), workers_with_configs.clone(),
selector, selector,
kv_router_config.router_replica_sync, kv_router_config.router_replica_sync,
consumer_id.clone(), router_id,
) )
.await?; .await?;
...@@ -442,6 +442,8 @@ impl KvRouter { ...@@ -442,6 +442,8 @@ impl KvRouter {
"Not all workers have local_indexer enabled, using JetStream subscription" "Not all workers have local_indexer enabled, using JetStream subscription"
); );
// Convert router_id to string for NATS consumer naming
let consumer_id = router_id.to_string();
start_kv_router_background( start_kv_router_background(
component.clone(), component.clone(),
consumer_id, consumer_id,
......
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
use crate::tokens::{SequenceHash, Token}; use crate::tokens::{SequenceHash, Token};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid;
use xxhash_rust::xxh3; use xxhash_rust::xxh3;
/// Seed for XXH3 hashing, consistent with indexer.rs /// Seed for XXH3 hashing, consistent with indexer.rs
...@@ -265,7 +264,7 @@ pub struct PrefillEvent { ...@@ -265,7 +264,7 @@ pub struct PrefillEvent {
pub request_id: String, pub request_id: String,
pub worker_id: WorkerId, pub worker_id: WorkerId,
pub data: PrefillEventData, pub data: PrefillEventData,
pub router_id: Uuid, pub router_id: u64,
} }
/// Represents the different stages of prefilling tokens for a request. /// Represents the different stages of prefilling tokens for a request.
...@@ -284,7 +283,7 @@ pub struct ActiveSequenceEvent { ...@@ -284,7 +283,7 @@ pub struct ActiveSequenceEvent {
pub request_id: String, pub request_id: String,
pub worker: WorkerWithDpRank, pub worker: WorkerWithDpRank,
pub data: ActiveSequenceEventData, pub data: ActiveSequenceEventData,
pub router_id: Uuid, pub router_id: u64,
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
......
...@@ -99,7 +99,7 @@ impl KvScheduler { ...@@ -99,7 +99,7 @@ impl KvScheduler {
workers_with_configs: Arc<RuntimeConfigsWithNotify>, workers_with_configs: Arc<RuntimeConfigsWithNotify>,
selector: Option<Box<dyn WorkerSelector + Send + Sync>>, selector: Option<Box<dyn WorkerSelector + Send + Sync>>,
replica_sync: bool, replica_sync: bool,
router_uuid: String, router_id: u64,
) -> Result<Self, KvSchedulerError> { ) -> Result<Self, KvSchedulerError> {
let selector = selector.unwrap_or(Box::new(DefaultWorkerSelector::default())); let selector = selector.unwrap_or(Box::new(DefaultWorkerSelector::default()));
...@@ -116,7 +116,7 @@ impl KvScheduler { ...@@ -116,7 +116,7 @@ impl KvScheduler {
block_size as usize, block_size as usize,
initial_workers, initial_workers,
replica_sync, replica_sync,
router_uuid, router_id,
)); ));
// Spawn background task to sync slots with DashMap when notified of changes. // Spawn background task to sync slots with DashMap when notified of changes.
......
...@@ -409,7 +409,7 @@ pub struct ActiveSequencesMultiWorker { ...@@ -409,7 +409,7 @@ pub struct ActiveSequencesMultiWorker {
handles: Arc<DashMap<WorkerWithDpRank, std::thread::JoinHandle<()>>>, handles: Arc<DashMap<WorkerWithDpRank, std::thread::JoinHandle<()>>>,
block_size: usize, block_size: usize,
component: Component, component: Component,
router_id: Uuid, router_id: u64,
replica_sync: bool, replica_sync: bool,
} }
...@@ -419,21 +419,13 @@ impl ActiveSequencesMultiWorker { ...@@ -419,21 +419,13 @@ impl ActiveSequencesMultiWorker {
block_size: usize, block_size: usize,
workers_with_configs: HashMap<u64, Option<ModelRuntimeConfig>>, workers_with_configs: HashMap<u64, Option<ModelRuntimeConfig>>,
replica_sync: bool, replica_sync: bool,
router_uuid: String, router_id: u64,
) -> Self { ) -> Self {
assert!(block_size > 1, "block_size must be greater than 1"); assert!(block_size > 1, "block_size must be greater than 1");
let senders = Arc::new(DashMap::new()); let senders = Arc::new(DashMap::new());
let handles = Arc::new(DashMap::new()); let handles = Arc::new(DashMap::new());
let request_to_worker = Arc::new(DashMap::new()); let request_to_worker = Arc::new(DashMap::new());
let router_id = Uuid::parse_str(&router_uuid).unwrap_or_else(|e| {
tracing::warn!(
"Failed to parse router UUID '{}': {}, using new UUID",
router_uuid,
e
);
Uuid::new_v4()
});
// Expand workers by their dp_rank // Expand workers by their dp_rank
for (worker_id, config) in workers_with_configs { for (worker_id, config) in workers_with_configs {
...@@ -602,7 +594,7 @@ impl ActiveSequencesMultiWorker { ...@@ -602,7 +594,7 @@ impl ActiveSequencesMultiWorker {
>, >,
request_to_worker: Arc<DashMap<RequestId, WorkerWithDpRank>>, request_to_worker: Arc<DashMap<RequestId, WorkerWithDpRank>>,
component: Component, component: Component,
router_id: Uuid, router_id: u64,
cancel_token: CancellationToken, cancel_token: CancellationToken,
) -> Result<()> { ) -> Result<()> {
let mut subscriber = component let mut subscriber = component
...@@ -1249,14 +1241,14 @@ mod tests { ...@@ -1249,14 +1241,14 @@ mod tests {
block_size, block_size,
workers_with_configs.clone(), workers_with_configs.clone(),
true, true,
Uuid::new_v4().to_string(), 1,
)); ));
let seq_manager_2 = Arc::new(ActiveSequencesMultiWorker::new( let seq_manager_2 = Arc::new(ActiveSequencesMultiWorker::new(
component, component,
block_size, block_size,
workers_with_configs, workers_with_configs,
true, true,
Uuid::new_v4().to_string(), 2,
)); ));
// Give some time for the subscription loops to start // Give some time for the subscription loops to start
...@@ -1408,14 +1400,14 @@ mod tests { ...@@ -1408,14 +1400,14 @@ mod tests {
block_size, block_size,
workers_with_configs.clone(), workers_with_configs.clone(),
true, true,
Uuid::new_v4().to_string(), 1,
)); ));
let seq_manager_2 = Arc::new(ActiveSequencesMultiWorker::new( let seq_manager_2 = Arc::new(ActiveSequencesMultiWorker::new(
component, component,
block_size, block_size,
workers_with_configs, workers_with_configs,
true, true,
Uuid::new_v4().to_string(), 2,
)); ));
// Give some time for the subscription loops to start // Give some time for the subscription loops to start
......
...@@ -654,7 +654,7 @@ pub async fn start_kv_router_background( ...@@ -654,7 +654,7 @@ pub async fn start_kv_router_background(
let router_instance_id = id.instance_id(); let router_instance_id = id.instance_id();
// The consumer UUID is the instance_id in hex format // The consumer ID is the instance_id as a string
let consumer_to_delete = router_instance_id.to_string(); let consumer_to_delete = router_instance_id.to_string();
tracing::info!( tracing::info!(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment