Unverified Commit 1ab2fe1b authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: remove distributed locks dependency from Router (#4113)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 0b284b63
...@@ -70,8 +70,6 @@ pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events"; ...@@ -70,8 +70,6 @@ pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events";
// for radix tree snapshot storage // for radix tree snapshot storage
pub const RADIX_STATE_BUCKET: &str = "radix-bucket"; pub const RADIX_STATE_BUCKET: &str = "radix-bucket";
pub const RADIX_STATE_FILE: &str = "radix-state"; pub const RADIX_STATE_FILE: &str = "radix-state";
pub const ROUTER_SNAPSHOT_LOCK: &str = "router-snapshot-lock";
pub const ROUTER_CLEANUP_LOCK: &str = "router-cleanup-lock";
/// A trait that users can implement to define custom selection logic /// A trait that users can implement to define custom selection logic
pub trait WorkerSelector { pub trait WorkerSelector {
......
...@@ -168,7 +168,7 @@ pub fn compute_seq_hash_for_block(block_hashes: &[LocalBlockHash]) -> Vec<Sequen ...@@ -168,7 +168,7 @@ pub fn compute_seq_hash_for_block(block_hashes: &[LocalBlockHash]) -> Vec<Sequen
} }
/// A [`KvCacheEvent`] on a specific LLM worker denoted by [`WorkerId`]. /// A [`KvCacheEvent`] on a specific LLM worker denoted by [`WorkerId`].
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RouterEvent { pub struct RouterEvent {
/// The ID of the worker emitting the event. /// The ID of the worker emitting the event.
worker_id: WorkerId, worker_id: WorkerId,
......
...@@ -230,7 +230,7 @@ pub struct KvCacheEvents { ...@@ -230,7 +230,7 @@ pub struct KvCacheEvents {
} }
/// Represents a single cache event with an ID and associated data. /// Represents a single cache event with an ID and associated data.
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct KvCacheEvent { pub struct KvCacheEvent {
/// The unique identifier of the event. /// The unique identifier of the event.
pub event_id: u64, pub event_id: u64,
...@@ -244,7 +244,7 @@ pub struct KvCacheEvent { ...@@ -244,7 +244,7 @@ pub struct KvCacheEvent {
/// Represents the data associated with a cache event. /// Represents the data associated with a cache event.
/// ///
/// Data is either stored or removed. /// Data is either stored or removed.
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum KvCacheEventData { pub enum KvCacheEventData {
Stored(KvCacheStoreData), Stored(KvCacheStoreData),
...@@ -253,7 +253,7 @@ pub enum KvCacheEventData { ...@@ -253,7 +253,7 @@ pub enum KvCacheEventData {
} }
/// Represents the data associated with a stored cache event. /// Represents the data associated with a stored cache event.
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct KvCacheStoreData { pub struct KvCacheStoreData {
/// The optional hash of the parent block. /// The optional hash of the parent block.
pub parent_hash: Option<ExternalSequenceBlockHash>, pub parent_hash: Option<ExternalSequenceBlockHash>,
...@@ -262,7 +262,7 @@ pub struct KvCacheStoreData { ...@@ -262,7 +262,7 @@ pub struct KvCacheStoreData {
} }
/// Represents data for a stored block. /// Represents data for a stored block.
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct KvCacheStoredBlockData { pub struct KvCacheStoredBlockData {
/// The hash of the block. /// The hash of the block.
pub block_hash: ExternalSequenceBlockHash, pub block_hash: ExternalSequenceBlockHash,
...@@ -271,7 +271,7 @@ pub struct KvCacheStoredBlockData { ...@@ -271,7 +271,7 @@ pub struct KvCacheStoredBlockData {
} }
/// Represents the data associated with a removed cache event. /// Represents the data associated with a removed cache event.
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct KvCacheRemoveData { pub struct KvCacheRemoveData {
/// A list of block hashes to remove. /// A list of block hashes to remove.
pub block_hashes: Vec<ExternalSequenceBlockHash>, pub block_hashes: Vec<ExternalSequenceBlockHash>,
......
...@@ -11,49 +11,124 @@ use dynamo_runtime::{ ...@@ -11,49 +11,124 @@ use dynamo_runtime::{
prelude::*, prelude::*,
traits::events::EventPublisher, traits::events::EventPublisher,
transports::{ transports::{
etcd::{Client as EtcdClient, DistributedRWLock, WatchEvent}, etcd::{Client as EtcdClient, WatchEvent},
nats::{NatsQueue, Slug}, nats::{NatsQueue, Slug},
}, },
}; };
use rand::Rng;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{ use crate::{
discovery::KV_ROUTERS_ROOT_PATH, discovery::KV_ROUTERS_ROOT_PATH,
kv_router::{ kv_router::{
KV_EVENT_SUBJECT, RADIX_STATE_BUCKET, RADIX_STATE_FILE, ROUTER_CLEANUP_LOCK, KV_EVENT_SUBJECT, RADIX_STATE_BUCKET, RADIX_STATE_FILE,
ROUTER_SNAPSHOT_LOCK,
indexer::{DumpRequest, GetWorkersRequest, RouterEvent}, indexer::{DumpRequest, GetWorkersRequest, RouterEvent},
protocols::WorkerId, protocols::WorkerId,
}, },
}; };
/// Delay between snapshot reads to verify stability
const SNAPSHOT_STABILITY_DELAY: Duration = Duration::from_millis(100);
const MAX_SNAPSHOT_STABILITY_ATTEMPTS: usize = 10;
const CHECK_INTERVAL_BASE: Duration = Duration::from_secs(1);
const CHECK_INTERVAL_JITTER_MS: i64 = 100;
/// Download a stable snapshot from object store and send events to the indexer.
/// Retries until two consecutive reads match or max attempts is reached.
async fn download_stable_snapshot(
nats_client: &dynamo_runtime::transports::nats::Client,
bucket_name: &str,
kv_events_tx: &mpsc::Sender<RouterEvent>,
) -> Result<()> {
let url = url::Url::parse(&format!(
"nats://{}/{bucket_name}/{RADIX_STATE_FILE}",
nats_client.addr()
))?;
// Try to get initial snapshot
let Ok(mut prev_events) = nats_client
.object_store_download_data::<Vec<RouterEvent>>(&url)
.await
else {
tracing::debug!(
"Failed to download snapshots. This is normal for freshly started Router replicas."
);
return Ok(());
};
// Keep trying until we get two consecutive stable reads
for attempt in 1..=MAX_SNAPSHOT_STABILITY_ATTEMPTS {
tokio::time::sleep(SNAPSHOT_STABILITY_DELAY).await;
let curr_events = match nats_client
.object_store_download_data::<Vec<RouterEvent>>(&url)
.await
{
Ok(events) => events,
Err(e) => {
tracing::warn!(
"Snapshot read failed on attempt {attempt}, using previous snapshot with {} events: {e:?}",
prev_events.len()
);
break;
}
};
// Check if snapshot is stable (two consecutive reads match)
if prev_events == curr_events {
tracing::info!(
"Successfully downloaded stable snapshot with {} events from object store (stable after {attempt} attempts)",
curr_events.len()
);
prev_events = curr_events;
break;
}
tracing::debug!(
"Snapshot changed between reads on attempt {attempt} ({} -> {} events), retrying",
prev_events.len(),
curr_events.len()
);
prev_events = curr_events;
if attempt == MAX_SNAPSHOT_STABILITY_ATTEMPTS {
tracing::warn!(
"Max stability attempts reached, using latest snapshot with {} events",
prev_events.len()
);
}
}
// Send all events to the indexer
for event in prev_events {
if let Err(e) = kv_events_tx.send(event).await {
tracing::warn!("Failed to send initial event to indexer: {e:?}");
}
}
tracing::info!("Successfully sent all initial events to indexer");
Ok(())
}
/// Resources required for snapshot operations /// Resources required for snapshot operations
#[derive(Clone)] #[derive(Clone)]
struct SnapshotResources { struct SnapshotResources {
nats_client: dynamo_runtime::transports::nats::Client, nats_client: dynamo_runtime::transports::nats::Client,
bucket_name: String, bucket_name: String,
rwlock: DistributedRWLock,
instances_rx: tokio::sync::watch::Receiver<Vec<dynamo_runtime::component::Instance>>, instances_rx: tokio::sync::watch::Receiver<Vec<dynamo_runtime::component::Instance>>,
get_workers_tx: mpsc::Sender<GetWorkersRequest>, get_workers_tx: mpsc::Sender<GetWorkersRequest>,
snapshot_tx: mpsc::Sender<DumpRequest>, snapshot_tx: mpsc::Sender<DumpRequest>,
} }
impl SnapshotResources { impl SnapshotResources {
/// Perform snapshot upload and purge operations with write lock /// Perform snapshot upload and purge operations
async fn purge_then_snapshot( async fn purge_then_snapshot(
&self, &self,
etcd_client: &EtcdClient,
nats_queue: &mut NatsQueue, nats_queue: &mut NatsQueue,
remove_worker_tx: &mpsc::Sender<WorkerId>, remove_worker_tx: &mpsc::Sender<WorkerId>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Try to acquire write lock (non-blocking)
let Some(_write_guard) = self.rwlock.try_write_lock(etcd_client).await else {
tracing::debug!(
"Could not acquire write lock for snapshot (readers active or lock held)"
);
anyhow::bail!("Write lock unavailable");
};
// Purge before snapshot ensures new/warm-restarted routers won't replay already-acknowledged messages. // Purge before snapshot ensures new/warm-restarted routers won't replay already-acknowledged messages.
// Since KV events are idempotent, this ordering reduces unnecessary reprocessing while maintaining // Since KV events are idempotent, this ordering reduces unnecessary reprocessing while maintaining
// at-least-once delivery guarantees. The snapshot will capture the clean state after purge. // at-least-once delivery guarantees. The snapshot will capture the clean state after purge.
...@@ -183,58 +258,16 @@ pub async fn start_kv_router_background( ...@@ -183,58 +258,16 @@ pub async fn start_kv_router_background(
.to_string() .to_string()
.replace("_", "-"); .replace("_", "-");
// Create RWLock for snapshot coordination
let lock_prefix = format!("{}/{}", ROUTER_SNAPSHOT_LOCK, component.subject());
let snapshot_rwlock = DistributedRWLock::new(lock_prefix);
// Handle initial state based on router_reset_states flag // Handle initial state based on router_reset_states flag
if router_reset_states { if !router_reset_states {
// Try to download initial state from object store with stability check
download_stable_snapshot(&nats_client, &bucket_name, &kv_events_tx).await?;
} else {
// Delete the bucket to reset state // Delete the bucket to reset state
tracing::info!("Resetting router state, deleting bucket: {bucket_name}"); tracing::info!("Resetting router state, deleting bucket: {bucket_name}");
if let Err(e) = nats_client.object_store_delete_bucket(&bucket_name).await { if let Err(e) = nats_client.object_store_delete_bucket(&bucket_name).await {
tracing::warn!("Failed to delete bucket (may not exist): {e:?}"); tracing::warn!("Failed to delete bucket (may not exist): {e:?}");
} }
} else {
// Try to download initial state from object store with read lock
let url = url::Url::parse(&format!(
"nats://{}/{bucket_name}/{RADIX_STATE_FILE}",
nats_client.addr()
))?;
// Acquire read lock with default timeout
if let Ok(_read_guard) = snapshot_rwlock
.read_lock_with_wait(&etcd_client, &consumer_uuid, None)
.await
{
tracing::debug!("Acquired read lock for snapshot download");
// Download snapshot while holding read lock
match nats_client
.object_store_download_data::<Vec<RouterEvent>>(&url)
.await
{
Ok(events) => {
tracing::info!(
"Successfully downloaded {} events from object store",
events.len()
);
// Send all events to the indexer
for event in events {
if let Err(e) = kv_events_tx.send(event).await {
tracing::warn!("Failed to send initial event to indexer: {e:?}");
}
}
tracing::info!("Successfully sent all initial events to indexer");
}
Err(_) => {
tracing::debug!(
"Failed to download snapshots. This is normal for freshly started Router replicas."
);
}
}
} else {
tracing::warn!("Could not acquire read lock for snapshot download (timeout or error)");
}
} }
// Cleanup orphaned consumers on startup // Cleanup orphaned consumers on startup
...@@ -271,7 +304,6 @@ pub async fn start_kv_router_background( ...@@ -271,7 +304,6 @@ pub async fn start_kv_router_background(
Some(SnapshotResources { Some(SnapshotResources {
nats_client, nats_client,
bucket_name, bucket_name,
rwlock: snapshot_rwlock.clone(),
instances_rx, instances_rx,
get_workers_tx, get_workers_tx,
snapshot_tx, snapshot_tx,
...@@ -281,7 +313,13 @@ pub async fn start_kv_router_background( ...@@ -281,7 +313,13 @@ pub async fn start_kv_router_background(
}; };
tokio::spawn(async move { tokio::spawn(async move {
let mut check_interval = tokio::time::interval(Duration::from_secs(1)); // Create interval with jitter
let jitter_ms =
rand::rng().random_range(-CHECK_INTERVAL_JITTER_MS..=CHECK_INTERVAL_JITTER_MS);
let interval_duration = Duration::from_millis(
(CHECK_INTERVAL_BASE.as_millis() as i64 + jitter_ms).max(1) as u64,
);
let mut check_interval = tokio::time::interval(interval_duration);
check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
...@@ -365,17 +403,15 @@ pub async fn start_kv_router_background( ...@@ -365,17 +403,15 @@ pub async fn start_kv_router_background(
continue; continue;
}; };
// Guard clause: skip if message count is too low
let threshold = router_snapshot_threshold.unwrap_or(u32::MAX) as u64; let threshold = router_snapshot_threshold.unwrap_or(u32::MAX) as u64;
if message_count <= threshold { if message_count <= threshold {
continue; continue;
} }
tracing::info!("Stream has {message_count} messages, attempting to acquire write lock for purge and snapshot"); tracing::info!("Stream has {message_count} messages (threshold: {threshold}), performing purge and snapshot");
// Perform snapshot upload and purge (acquires write lock internally)
match resources.purge_then_snapshot( match resources.purge_then_snapshot(
&etcd_client,
&mut nats_queue, &mut nats_queue,
&remove_worker_tx, &remove_worker_tx,
).await { ).await {
...@@ -414,28 +450,11 @@ pub async fn start_kv_router_background( ...@@ -414,28 +450,11 @@ pub async fn start_kv_router_background(
tracing::info!("Attempting to delete orphaned consumer: {consumer_to_delete}"); tracing::info!("Attempting to delete orphaned consumer: {consumer_to_delete}");
// Create a unique cleanup lock for this specific consumer // Delete the consumer (allow race condition if multiple routers try to delete)
let cleanup_lock_name = format!("{}/{}/{}", ROUTER_CLEANUP_LOCK, component.subject(), consumer_to_delete); if let Err(e) = nats_queue.shutdown(Some(consumer_to_delete.clone())).await {
let cleanup_rwlock = DistributedRWLock::new(cleanup_lock_name); tracing::warn!("Failed to delete consumer {consumer_to_delete}: {e}");
// Try to acquire cleanup write lock (non-blocking) before deleting consumer
if let Some(_cleanup_guard) = cleanup_rwlock.try_write_lock(&etcd_client).await {
tracing::debug!(
"Acquired cleanup lock for deleting consumer: {consumer_to_delete}"
);
// Delete the consumer
if let Err(e) = nats_queue.shutdown(Some(consumer_to_delete.clone())).await {
tracing::warn!("Failed to delete consumer {consumer_to_delete}: {e}");
} else {
tracing::info!("Successfully deleted orphaned consumer: {consumer_to_delete}");
}
// Cleanup lock is automatically released when _cleanup_guard goes out of scope
} else { } else {
tracing::debug!( tracing::info!("Successfully deleted orphaned consumer: {consumer_to_delete}");
"Could not acquire cleanup lock for consumer {consumer_to_delete}"
);
} }
} }
} }
......
...@@ -1034,7 +1034,7 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers): ...@@ -1034,7 +1034,7 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
# Wait for a second before creating the second router # Wait for a second before creating the second router
logger.info("Waiting for 1 second before creating second router") logger.info("Waiting for 1 second before creating second router")
await asyncio.sleep(1) await asyncio.sleep(2)
# Launch second router - will automatically sync with the first router's state # Launch second router - will automatically sync with the first router's state
logger.info("Creating second KV router") logger.info("Creating second KV router")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment