Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
e93170d6
Unverified
Commit
e93170d6
authored
Oct 16, 2025
by
Yan Ru Pei
Committed by
GitHub
Oct 16, 2025
Browse files
feat: use non-blocking lock for radix uploading + a read lock for radix downloading (#3655)
Signed-off-by:
PeaBrane
<
yanrpei@gmail.com
>
parent
9ca2211e
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
483 additions
and
109 deletions
+483
-109
lib/llm/src/kv_router/subscriber.rs
lib/llm/src/kv_router/subscriber.rs
+85
-109
lib/runtime/src/transports/etcd.rs
lib/runtime/src/transports/etcd.rs
+2
-0
lib/runtime/src/transports/etcd/lock.rs
lib/runtime/src/transports/etcd/lock.rs
+396
-0
No files found.
lib/llm/src/kv_router/subscriber.rs
View file @
e93170d6
...
...
@@ -11,7 +11,7 @@ use dynamo_runtime::{
prelude
::
*
,
traits
::
events
::
EventPublisher
,
transports
::{
etcd
::{
Client
as
EtcdClient
,
WatchEvent
},
etcd
::{
Client
as
EtcdClient
,
DistributedRWLock
,
WatchEvent
},
nats
::{
NatsQueue
,
Slug
},
},
};
...
...
@@ -33,47 +33,27 @@ use crate::{
struct
SnapshotResources
{
nats_client
:
dynamo_runtime
::
transports
::
nats
::
Client
,
bucket_name
:
String
,
lock
_name
:
String
,
rw
lock
:
DistributedRWLock
,
instances_rx
:
tokio
::
sync
::
watch
::
Receiver
<
Vec
<
dynamo_runtime
::
component
::
Instance
>>
,
get_workers_tx
:
mpsc
::
Sender
<
GetWorkersRequest
>
,
snapshot_tx
:
mpsc
::
Sender
<
DumpRequest
>
,
}
impl
SnapshotResources
{
/// Try to acquire distributed lock for snapshot operations
/// Returns Some(lock_response) if lock acquired, None if another instance holds it
async
fn
lock
(
&
self
,
etcd_client
:
&
EtcdClient
)
->
Option
<
etcd_client
::
LockResponse
>
{
match
etcd_client
.lock
(
self
.lock_name
.clone
(),
Some
(
etcd_client
.lease_id
()))
.await
{
Ok
(
response
)
=>
{
tracing
::
debug!
(
"Successfully acquired snapshot lock with key: {:?}"
,
response
.key
()
);
Some
(
response
)
}
Err
(
e
)
=>
{
tracing
::
debug!
(
"Another instance already holds the snapshot lock: {e:?}"
);
None
}
}
}
/// Release the distributed lock
async
fn
unlock
(
&
self
,
etcd_client
:
&
EtcdClient
,
lock_response
:
etcd_client
::
LockResponse
)
{
if
let
Err
(
e
)
=
etcd_client
.unlock
(
lock_response
.key
())
.await
{
tracing
::
warn!
(
"Failed to release snapshot lock: {e:?}"
);
}
}
/// Perform snapshot upload and purge operations
/// Perform snapshot upload and purge operations with write lock
async
fn
purge_then_snapshot
(
&
self
,
etcd_client
:
&
EtcdClient
,
nats_queue
:
&
mut
NatsQueue
,
remove_worker_tx
:
&
mpsc
::
Sender
<
WorkerId
>
,
)
->
anyhow
::
Result
<
()
>
{
// Try to acquire write lock (non-blocking)
let
Some
(
_
write_guard
)
=
self
.rwlock
.try_write_lock
(
etcd_client
)
.await
else
{
tracing
::
debug!
(
"Could not acquire write lock for snapshot (readers active or lock held)"
);
anyhow
::
bail!
(
"Write lock unavailable"
);
};
// Purge before snapshot ensures new/warm-restarted routers won't replay already-acknowledged messages.
// Since KV events are idempotent, this ordering reduces unnecessary reprocessing while maintaining
// at-least-once delivery guarantees. The snapshot will capture the clean state after purge.
...
...
@@ -101,13 +81,11 @@ impl SnapshotResources {
for
worker_id
in
indexer_worker_ids
{
if
!
current_worker_ids
.contains
(
&
worker_id
)
{
tracing
::
info!
(
"Removing stale worker {} from indexer during snapshot"
,
worker_id
"Removing stale worker {worker_id} from indexer during snapshot"
);
if
let
Err
(
e
)
=
remove_worker_tx
.send
(
worker_id
)
.await
{
tracing
::
warn!
(
"Failed to send remove_worker for stale worker {}: {e:?}"
,
worker_id
"Failed to send remove_worker for stale worker {worker_id}: {e:?}"
);
}
}
...
...
@@ -194,11 +172,21 @@ pub async fn start_kv_router_background(
.build
()
?
;
let
nats_client
=
client_options
.connect
()
.await
?
;
// Get etcd client (needed for both snapshots and router watching)
let
etcd_client
=
component
.drt
()
.etcd_client
()
.ok_or_else
(||
anyhow
::
anyhow!
(
"etcd client not available"
))
?
;
// Create bucket name for snapshots/state
let
bucket_name
=
Slug
::
slugify
(
&
format!
(
"{}-{RADIX_STATE_BUCKET}"
,
component
.subject
()))
.to_string
()
.replace
(
"_"
,
"-"
);
// Create RWLock for snapshot coordination
let
lock_prefix
=
format!
(
"{}/{}"
,
ROUTER_SNAPSHOT_LOCK
,
component
.subject
());
let
snapshot_rwlock
=
DistributedRWLock
::
new
(
lock_prefix
);
// Handle initial state based on router_reset_states flag
if
router_reset_states
{
// Delete the bucket to reset state
...
...
@@ -207,43 +195,48 @@ pub async fn start_kv_router_background(
tracing
::
warn!
(
"Failed to delete bucket (may not exist): {e:?}"
);
}
}
else
{
// Try to download initial state from object store
// Try to download initial state from object store
with read lock
let
url
=
url
::
Url
::
parse
(
&
format!
(
"nats://{}/{bucket_name}/{RADIX_STATE_FILE}"
,
nats_client
.addr
()
))
?
;
match
nats_client
.object_store_download_data
::
<
Vec
<
RouterEvent
>>
(
&
url
)
// Acquire read lock with default timeout
if
let
Ok
(
_
read_guard
)
=
snapshot_rwlock
.read_lock_with_wait
(
&
etcd_client
,
&
consumer_uuid
,
None
)
.await
{
Ok
(
events
)
=>
{
tracing
::
info!
(
"Successfully downloaded {} events from object store"
,
events
.len
()
);
// Send all events to the indexer
for
event
in
events
{
if
let
Err
(
e
)
=
kv_events_tx
.send
(
event
)
.await
{
tracing
::
warn!
(
"Failed to send initial event to indexer: {e:?}"
);
tracing
::
debug!
(
"Acquired read lock for snapshot download"
);
// Download snapshot while holding read lock
match
nats_client
.object_store_download_data
::
<
Vec
<
RouterEvent
>>
(
&
url
)
.await
{
Ok
(
events
)
=>
{
tracing
::
info!
(
"Successfully downloaded {} events from object store"
,
events
.len
()
);
// Send all events to the indexer
for
event
in
events
{
if
let
Err
(
e
)
=
kv_events_tx
.send
(
event
)
.await
{
tracing
::
warn!
(
"Failed to send initial event to indexer: {e:?}"
);
}
}
tracing
::
info!
(
"Successfully sent all initial events to indexer"
);
}
Err
(
e
)
=>
{
tracing
::
info!
(
"Did not initialize radix state from NATS object store (likely no snapshots yet): {e:?}"
);
}
tracing
::
info!
(
"Successfully sent all initial events to indexer"
);
}
Err
(
e
)
=>
{
tracing
::
info!
(
"Did not initialize radix state from NATs object store (likely no snapshots yet): {e:?}"
);
}
}
else
{
tracing
::
warn!
(
"Could not acquire read lock for snapshot download (timeout or error)"
);
}
}
// Get etcd client (needed for both snapshots and router watching)
let
etcd_client
=
component
.drt
()
.etcd_client
()
.ok_or_else
(||
anyhow
::
anyhow!
(
"etcd client not available"
))
?
;
// Cleanup orphaned consumers on startup
cleanup_orphaned_consumers
(
&
mut
nats_queue
,
&
etcd_client
,
&
component
,
&
consumer_uuid
)
.await
;
...
...
@@ -252,7 +245,6 @@ pub async fn start_kv_router_background(
.kv_get_and_watch_prefix
(
&
format!
(
"{}/"
,
KV_ROUTERS_ROOT_PATH
))
.await
?
.dissolve
();
let
cleanup_lock_name
=
format!
(
"{}/{}"
,
ROUTER_CLEANUP_LOCK
,
component
.subject
());
// Get the generate endpoint and watch for instance deletions
let
generate_endpoint
=
component
.endpoint
(
"generate"
);
...
...
@@ -276,12 +268,10 @@ pub async fn start_kv_router_background(
maybe_snapshot_tx
,
router_snapshot_threshold
,
)
{
let
lock_name
=
format!
(
"{}/{}"
,
ROUTER_SNAPSHOT_LOCK
,
component
.subject
());
Some
(
SnapshotResources
{
nats_client
,
bucket_name
,
lock
_name
,
rw
lock
:
snapshot_rwlock
.clone
()
,
instances_rx
,
get_workers_tx
,
snapshot_tx
,
...
...
@@ -318,19 +308,19 @@ pub async fn start_kv_router_background(
// Extract the hex worker ID after the colon (e.g., "generate:694d99badb9f7c07" -> "694d99badb9f7c07")
let
Some
(
worker_id_str
)
=
key
.split
(
':'
)
.next_back
()
else
{
tracing
::
warn!
(
"Could not extract worker ID from instance key: {
}"
,
key
);
tracing
::
warn!
(
"Could not extract worker ID from instance key: {key
}"
);
continue
;
};
// Parse as hexadecimal (base 16)
let
Ok
(
worker_id
)
=
i64
::
from_str_radix
(
worker_id_str
,
16
)
else
{
tracing
::
warn!
(
"Could not parse worker ID from instance key: {
}"
,
key
);
tracing
::
warn!
(
"Could not parse worker ID from instance key: {key
}"
);
continue
;
};
tracing
::
info!
(
"Generate endpoint instance deleted, removing worker {
}"
,
worker_id
);
tracing
::
info!
(
"Generate endpoint instance deleted, removing worker {worker_id
}"
);
if
let
Err
(
e
)
=
remove_worker_tx
.send
(
worker_id
)
.await
{
tracing
::
warn!
(
"Failed to send worker removal for worker {
}: {}"
,
worker_id
,
e
);
tracing
::
warn!
(
"Failed to send worker removal for worker {worker_id
}: {e}"
);
}
}
...
...
@@ -382,24 +372,17 @@ pub async fn start_kv_router_background(
continue
;
}
tracing
::
info!
(
"Stream has {message_count} messages, attempting to acquire lock for purge and snapshot"
);
tracing
::
info!
(
"Stream has {message_count} messages, attempting to acquire
write
lock for purge and snapshot"
);
// Try to acquire distributed lock
let
Some
(
lock_response
)
=
resources
.lock
(
&
etcd_client
)
.await
else
{
continue
;
};
// Perform snapshot upload and purge
// Perform snapshot upload and purge (acquires write lock internally)
match
resources
.purge_then_snapshot
(
&
etcd_client
,
&
mut
nats_queue
,
&
remove_worker_tx
,
)
.await
{
Ok
(
_
)
=>
tracing
::
info!
(
"Successfully performed purge and snapshot"
),
Err
(
e
)
=>
tracing
::
error!
(
"Failed to
perform purge and snapshot: {e:?}"
),
Err
(
e
)
=>
tracing
::
debug!
(
"Could not
perform purge and snapshot: {e:?}"
),
}
// Release the lock
resources
.unlock
(
&
etcd_client
,
lock_response
)
.await
;
}
// Handle router deletion events
...
...
@@ -410,7 +393,7 @@ pub async fn start_kv_router_background(
};
let
key
=
String
::
from_utf8_lossy
(
kv
.key
());
tracing
::
info!
(
"Detected router replica deletion: {
}"
,
key
);
tracing
::
info!
(
"Detected router replica deletion: {key
}"
);
// Only process deletions for routers on the same component
if
!
key
.contains
(
component
.path
()
.as_str
())
{
...
...
@@ -423,44 +406,37 @@ pub async fn start_kv_router_background(
// Extract the router UUID from the key
let
Some
(
router_uuid
)
=
key
.split
(
'/'
)
.next_back
()
else
{
tracing
::
warn!
(
"Could not extract UUID from router key: {
}"
,
key
);
tracing
::
warn!
(
"Could not extract UUID from router key: {key
}"
);
continue
;
};
// The consumer UUID is the router UUID
let
consumer_to_delete
=
router_uuid
.to_string
();
tracing
::
info!
(
"Attempting to delete orphaned consumer: {}"
,
consumer_to_delete
);
// Try to acquire cleanup lock before deleting consumer
match
etcd_client
.lock
(
cleanup_lock_name
.clone
(),
Some
(
etcd_client
.lease_id
()))
.await
{
Ok
(
lock_response
)
=>
{
tracing
::
debug!
(
"Acquired cleanup lock for deleting consumer: {}"
,
consumer_to_delete
);
tracing
::
info!
(
"Attempting to delete orphaned consumer: {consumer_to_delete}"
);
// Delete the consumer
if
let
Err
(
e
)
=
nats_queue
.shutdown
(
Some
(
consumer_to_delete
.clone
()))
.await
{
tracing
::
warn!
(
"Failed to delete consumer {}: {}"
,
consumer_to_delete
,
e
);
}
else
{
tracing
::
info!
(
"Successfully deleted orphaned consumer: {}"
,
consumer_to_delete
);
}
// Create a unique cleanup lock for this specific consumer
let
cleanup_lock_name
=
format!
(
"{}/{}/{}"
,
ROUTER_CLEANUP_LOCK
,
component
.subject
(),
consumer_to_delete
);
let
cleanup_rwlock
=
DistributedRWLock
::
new
(
cleanup_lock_name
);
// Release the lock
if
let
Err
(
e
)
=
etcd_client
.unlock
(
lock_response
.key
())
.await
{
tracing
::
warn!
(
"Failed to release cleanup lock: {e:?}"
);
}
}
Err
(
e
)
=>
{
tracing
::
debug!
(
"Could not acquire cleanup lock for consumer {}: {e:?}"
,
consumer_to_delete
);
// Try to acquire cleanup write lock (non-blocking) before deleting consumer
if
let
Some
(
_
cleanup_guard
)
=
cleanup_rwlock
.try_write_lock
(
&
etcd_client
)
.await
{
tracing
::
debug!
(
"Acquired cleanup lock for deleting consumer: {consumer_to_delete}"
);
// Delete the consumer
if
let
Err
(
e
)
=
nats_queue
.shutdown
(
Some
(
consumer_to_delete
.clone
()))
.await
{
tracing
::
warn!
(
"Failed to delete consumer {consumer_to_delete}: {e}"
);
}
else
{
tracing
::
info!
(
"Successfully deleted orphaned consumer: {consumer_to_delete}"
);
}
// Cleanup lock is automatically released when _cleanup_guard goes out of scope
}
else
{
tracing
::
debug!
(
"Could not acquire cleanup lock for consumer {consumer_to_delete}"
);
}
}
}
...
...
@@ -507,7 +483,7 @@ async fn cleanup_orphaned_consumers(
continue
;
}
if
!
active_uuids
.contains
(
&
consumer
)
{
tracing
::
info!
(
"Cleaning up orphaned consumer: {
}"
,
consumer
);
tracing
::
info!
(
"Cleaning up orphaned consumer: {consumer
}"
);
let
_
=
nats_queue
.shutdown
(
Some
(
consumer
))
.await
;
}
}
...
...
lib/runtime/src/transports/etcd.rs
View file @
e93170d6
...
...
@@ -21,9 +21,11 @@ pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
use
tokio
::
time
::{
Duration
,
interval
};
mod
lease
;
mod
lock
;
mod
path
;
use
lease
::
*
;
pub
use
lock
::
*
;
pub
use
path
::
*
;
use
super
::
utils
::
build_in_runtime
;
...
...
lib/runtime/src/transports/etcd/lock.rs
0 → 100644
View file @
e93170d6
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Distributed read-write lock implementation using etcd atomic transactions
use
std
::
time
::
Duration
;
use
etcd_client
::{
Compare
,
CompareOp
,
PutOptions
,
Txn
,
TxnOp
};
use
crate
::
Result
;
use
super
::
Client
;
/// Timeout for acquiring read lock when downloading snapshots
const
DEFAULT_READ_LOCK_TIMEOUT_SECS
:
u64
=
30
;
/// Distributed read-write lock for coordinating operations across multiple processes
///
/// This implementation uses etcd atomic transactions to prevent race conditions:
/// - Write locks are exclusive (no readers or writers can coexist)
/// - Read locks are shared (multiple readers allowed, but no writers)
/// - All lock operations use atomic compare-and-set to ensure correctness
/// - Locks are bound to leases for automatic cleanup on client failure
#[derive(Clone)]
pub
struct
DistributedRWLock
{
lock_prefix
:
String
,
}
pub
struct
WriteLockGuard
<
'a
>
{
rwlock
:
&
'a
DistributedRWLock
,
etcd_client
:
&
'a
Client
,
}
impl
Drop
for
WriteLockGuard
<
'_
>
{
fn
drop
(
&
mut
self
)
{
match
tokio
::
runtime
::
Handle
::
try_current
()
{
Ok
(
handle
)
=>
{
let
rwlock
=
self
.rwlock
.clone
();
let
etcd_client
=
self
.etcd_client
.clone
();
handle
.spawn
(
async
move
{
let
write_key
=
format!
(
"v1/{}/writer"
,
rwlock
.lock_prefix
);
if
let
Err
(
e
)
=
etcd_client
.kv_delete
(
write_key
.as_str
(),
None
)
.await
{
tracing
::
warn!
(
"Failed to release write lock in drop: {e:?}"
);
}
});
}
Err
(
_
)
=>
{
tracing
::
error!
(
"WriteLockGuard dropped outside tokio runtime - lock not released!
\
Lock will be cleaned up when etcd lease expires."
);
}
}
}
}
pub
struct
ReadLockGuard
<
'a
>
{
rwlock
:
&
'a
DistributedRWLock
,
etcd_client
:
&
'a
Client
,
reader_id
:
String
,
}
impl
Drop
for
ReadLockGuard
<
'_
>
{
fn
drop
(
&
mut
self
)
{
match
tokio
::
runtime
::
Handle
::
try_current
()
{
Ok
(
handle
)
=>
{
let
rwlock
=
self
.rwlock
.clone
();
let
etcd_client
=
self
.etcd_client
.clone
();
let
reader_id
=
self
.reader_id
.clone
();
handle
.spawn
(
async
move
{
let
reader_key
=
format!
(
"v1/{}/readers/{reader_id}"
,
rwlock
.lock_prefix
);
if
let
Err
(
e
)
=
etcd_client
.kv_delete
(
reader_key
.as_str
(),
None
)
.await
{
tracing
::
warn!
(
"Failed to release read lock in drop: {e:?}"
);
}
});
}
Err
(
_
)
=>
{
tracing
::
error!
(
"ReadLockGuard dropped outside tokio runtime - lock not released!
\
Lock will be cleaned up when etcd lease expires."
);
}
}
}
}
impl
DistributedRWLock
{
/// Create a new distributed RWLock with the given prefix
///
/// The lock will create keys under:
/// - `v1/{prefix}/writer` for the write lock
/// - `v1/{prefix}/readers/{reader_id}` for read locks
pub
fn
new
(
lock_prefix
:
String
)
->
Self
{
Self
{
lock_prefix
}
}
/// Try to acquire exclusive write lock (non-blocking)
///
/// Returns `Some(WriteLockGuard)` if acquired, `None` if readers exist or lock unavailable.
/// The guard automatically releases the lock when dropped.
///
/// Implementation strategy:
/// 1. Atomically create writer key if it doesn't exist
/// 2. Immediately check if any readers exist
/// 3. If readers found, rollback (delete writer key) and return None
///
/// Note: There is still a small race window (sub-millisecond) where a reader could acquire
/// a lock between steps 2-3.
pub
async
fn
try_write_lock
<
'a
>
(
&
'a
self
,
etcd_client
:
&
'a
Client
,
)
->
Option
<
WriteLockGuard
<
'a
>>
{
let
write_key
=
format!
(
"v1/{}/writer"
,
self
.lock_prefix
);
let
lease_id
=
etcd_client
.lease_id
();
let
put_options
=
PutOptions
::
new
()
.with_lease
(
lease_id
);
// Step 1: Atomically create write lock only if it doesn't exist
let
txn
=
Txn
::
new
()
.when
(
vec!
[
Compare
::
version
(
write_key
.as_str
(),
CompareOp
::
Equal
,
0
,
)])
.and_then
(
vec!
[
TxnOp
::
put
(
write_key
.as_str
(),
b
"writing"
,
Some
(
put_options
),
)]);
// Execute the atomic transaction
match
etcd_client
.etcd_client
()
.kv_client
()
.txn
(
txn
)
.await
{
Ok
(
response
)
if
response
.succeeded
()
=>
{
// Step 2: Immediately check if any readers exist
let
reader_prefix
=
format!
(
"v1/{}/readers/"
,
self
.lock_prefix
);
match
etcd_client
.kv_get_prefix
(
&
reader_prefix
)
.await
{
Ok
(
readers
)
if
!
readers
.is_empty
()
=>
{
// Readers exist! Rollback - delete our writer key
tracing
::
debug!
(
"Found {} reader(s) after acquiring write lock, rolling back"
,
readers
.len
()
);
if
let
Err
(
e
)
=
etcd_client
.kv_delete
(
write_key
.as_str
(),
None
)
.await
{
tracing
::
warn!
(
"Failed to rollback write lock: {e:?}"
);
}
None
}
Ok
(
_
)
=>
{
// No readers, we successfully hold the write lock
tracing
::
debug!
(
"Successfully acquired write lock with no readers"
);
Some
(
WriteLockGuard
{
rwlock
:
self
,
etcd_client
,
})
}
Err
(
e
)
=>
{
// Error checking for readers - rollback to be safe
tracing
::
warn!
(
"Failed to check for readers, rolling back write lock: {e:?}"
);
let
_
=
etcd_client
.kv_delete
(
write_key
.as_str
(),
None
)
.await
;
None
}
}
}
Ok
(
_
)
=>
{
tracing
::
debug!
(
"Write lock already exists, transaction failed"
);
None
}
Err
(
e
)
=>
{
tracing
::
warn!
(
"Failed to execute write lock transaction: {e:?}"
);
None
}
}
}
/// Acquire shared read lock with polling retry
///
/// Polls every 100ms until write lock is released, then atomically acquires read lock.
/// The guard automatically releases the lock when dropped.
/// Uses atomic transaction to prevent race with writer - the check for no write lock
/// and creation of read lock happen in a single atomic operation.
///
/// # Arguments
/// * `etcd_client` - The etcd client
/// * `reader_id` - Unique identifier for this reader
/// * `timeout` - Optional timeout, defaults to 5 seconds
pub
async
fn
read_lock_with_wait
<
'a
>
(
&
'a
self
,
etcd_client
:
&
'a
Client
,
reader_id
:
&
str
,
timeout
:
Option
<
Duration
>
,
)
->
Result
<
ReadLockGuard
<
'a
>>
{
let
timeout
=
timeout
.unwrap_or
(
Duration
::
from_secs
(
DEFAULT_READ_LOCK_TIMEOUT_SECS
));
let
write_key
=
format!
(
"v1/{}/writer"
,
self
.lock_prefix
);
let
reader_key
=
format!
(
"v1/{}/readers/{reader_id}"
,
self
.lock_prefix
);
let
deadline
=
tokio
::
time
::
Instant
::
now
()
+
timeout
;
let
lease_id
=
etcd_client
.lease_id
();
loop
{
// Check if timeout exceeded
if
tokio
::
time
::
Instant
::
now
()
>
deadline
{
anyhow
::
bail!
(
"Timeout waiting for read lock after {:?}"
,
timeout
);
}
// Try to atomically acquire read lock
// The transaction checks that no writer exists and creates reader key atomically
let
put_options
=
PutOptions
::
new
()
.with_lease
(
lease_id
);
// Build atomic transaction: create reader key only if write_key doesn't exist
let
txn
=
Txn
::
new
()
.when
(
vec!
[
Compare
::
version
(
write_key
.as_str
(),
CompareOp
::
Equal
,
0
,
)])
.and_then
(
vec!
[
TxnOp
::
put
(
reader_key
.as_str
(),
b
"reading"
,
Some
(
put_options
),
)]);
// Execute the atomic transaction
match
etcd_client
.etcd_client
()
.kv_client
()
.txn
(
txn
)
.await
{
Ok
(
response
)
if
response
.succeeded
()
=>
{
tracing
::
debug!
(
"Acquired read lock for reader {}"
,
reader_id
);
return
Ok
(
ReadLockGuard
{
rwlock
:
self
,
etcd_client
,
reader_id
:
reader_id
.to_string
(),
});
}
Ok
(
_
)
=>
{
tracing
::
trace!
(
"Write lock exists or was created, retrying after delay"
);
}
Err
(
e
)
=>
{
tracing
::
warn!
(
"Failed to execute read lock transaction: {e:?}"
);
}
}
// Wait before next retry
tokio
::
time
::
sleep
(
Duration
::
from_millis
(
100
))
.await
;
}
}
}
#[cfg(feature
=
"testing-etcd"
)]
#[cfg(test)]
mod
tests
{
use
super
::
*
;
use
crate
::
Runtime
;
use
std
::
sync
::
Arc
;
use
tokio
::
sync
::
Barrier
;
/// Test the DistributedRWLock behavior
///
/// This test verifies:
/// 1. Multiple readers can acquire read locks simultaneously
/// 2. Write lock fails when readers are active
/// 3. Write lock succeeds when no locks are held
/// 4. Read lock waits for write lock to be released
#[tokio::test]
async
fn
test_distributed_rwlock
()
{
// Setup: Create etcd client
let
runtime
=
Runtime
::
from_settings
()
.unwrap
();
let
etcd_client
=
Client
::
builder
()
.etcd_url
(
vec!
[
"http://localhost:2379"
.to_string
()])
.build
()
.unwrap
();
let
etcd_client
=
Client
::
new
(
etcd_client
,
runtime
)
.await
.unwrap
();
// Prevent runtime from being dropped in async context at end of test
let
etcd_client
=
std
::
mem
::
ManuallyDrop
::
new
(
etcd_client
);
// Create RWLock with unique prefix for this test
let
test_id
=
uuid
::
Uuid
::
new_v4
();
let
lock_prefix
=
format!
(
"/test/rwlock/{}"
,
test_id
);
let
rwlock
=
DistributedRWLock
::
new
(
lock_prefix
.clone
());
// Step 1: Acquire first read lock
let
_
reader1_guard
=
rwlock
.read_lock_with_wait
(
&
etcd_client
,
"reader1"
,
Some
(
Duration
::
from_secs
(
5
)))
.await
.expect
(
"First read lock should succeed"
);
println!
(
"✓ Acquired first read lock"
);
// Step 2: Acquire second read lock (should succeed - multiple readers allowed)
let
_
reader2_guard
=
rwlock
.read_lock_with_wait
(
&
etcd_client
,
"reader2"
,
Some
(
Duration
::
from_secs
(
5
)))
.await
.expect
(
"Second read lock should succeed"
);
println!
(
"✓ Acquired second read lock"
);
// Step 3: Try to acquire write lock (should fail - readers are active)
let
write_result
=
rwlock
.try_write_lock
(
&
etcd_client
)
.await
;
assert
!
(
write_result
.is_none
(),
"Write lock should fail when readers are active"
);
println!
(
"✓ Write lock correctly failed with active readers"
);
// Step 4: Drop first read lock
drop
(
_
reader1_guard
);
tokio
::
time
::
sleep
(
Duration
::
from_millis
(
50
))
.await
;
// Give time for async drop
println!
(
"✓ Released first read lock"
);
// Verify write lock still fails with one reader active
let
write_result_with_one_reader
=
rwlock
.try_write_lock
(
&
etcd_client
)
.await
;
assert
!
(
write_result_with_one_reader
.is_none
(),
"Write lock should still fail when one reader is active"
);
println!
(
"✓ Write lock correctly failed with one reader still active"
);
drop
(
_
reader2_guard
);
tokio
::
time
::
sleep
(
Duration
::
from_millis
(
50
))
.await
;
// Give time for async drop
println!
(
"✓ Released second read lock"
);
// Give etcd a moment to process the deletions
tokio
::
time
::
sleep
(
Duration
::
from_millis
(
100
))
.await
;
// Step 5: Acquire write lock (should succeed now - no locks held)
let
_
write_guard
=
rwlock
.try_write_lock
(
&
etcd_client
)
.await
.expect
(
"Write lock should succeed with no readers"
);
println!
(
"✓ Acquired write lock"
);
// Step 5a: Try to acquire write lock again (should fail immediately - already held)
let
write_result_already_held
=
rwlock
.try_write_lock
(
&
etcd_client
)
.await
;
assert
!
(
write_result_already_held
.is_none
(),
"Write lock should fail when another write lock is already held"
);
println!
(
"✓ Write lock correctly failed when already held"
);
// Step 6: Spawn background task to acquire read lock
// It should wait because write lock is held
let
barrier
=
Arc
::
new
(
Barrier
::
new
(
2
));
let
barrier_clone
=
barrier
.clone
();
let
rwlock_clone
=
rwlock
.clone
();
let
etcd_client_clone
=
etcd_client
.clone
();
let
read_task
=
tokio
::
spawn
(
async
move
{
println!
(
"→ Background: Attempting to acquire read lock (should wait)..."
);
barrier_clone
.wait
()
.await
;
// Signal that we've started
let
start
=
std
::
time
::
Instant
::
now
();
let
_
guard
=
rwlock_clone
.read_lock_with_wait
(
&
etcd_client_clone
,
"reader3"
,
Some
(
Duration
::
from_secs
(
10
)))
.await
.expect
(
"Read lock should eventually succeed"
);
let
elapsed
=
start
.elapsed
();
println!
(
"✓ Background: Acquired read lock after {:?}"
,
elapsed
);
// Verify it actually waited (should be > 100ms since we sleep before releasing write lock)
assert
!
(
elapsed
>
Duration
::
from_millis
(
50
),
"Read lock should have waited for write lock to be released"
);
// Guard will be dropped here, releasing the lock
});
// Wait for background task to start
barrier
.wait
()
.await
;
// Give the background task a moment to start polling
tokio
::
time
::
sleep
(
Duration
::
from_millis
(
200
))
.await
;
// Step 7: Release write lock by dropping guard
println!
(
"→ Releasing write lock..."
);
drop
(
_
write_guard
);
tokio
::
time
::
sleep
(
Duration
::
from_millis
(
50
))
.await
;
// Give time for async drop
println!
(
"✓ Released write lock"
);
// Step 8: Background task should now succeed
read_task
.await
.expect
(
"Background task should complete successfully"
);
// Final cleanup: verify all locks are released
tokio
::
time
::
sleep
(
Duration
::
from_millis
(
100
))
.await
;
let
remaining_locks
=
etcd_client
.kv_get_prefix
(
&
format!
(
"v1/{lock_prefix}"
))
.await
.expect
(
"Should be able to check remaining locks"
);
assert
!
(
remaining_locks
.is_empty
(),
"All locks should be released at end of test"
);
println!
(
"✓ All locks cleaned up successfully"
);
println!
(
"
\n
🎉 All DistributedRWLock tests passed!"
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment