Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
4cf04338
Unverified
Commit
4cf04338
authored
Apr 04, 2026
by
doujiang24
Committed by
GitHub
Apr 03, 2026
Browse files
fix(kv-router): clean up watermarks on worker deregister (#7863)
Signed-off-by:
zhudejiang.pt
<
zhudejiang.pt@antgroup.com
>
parent
dacb2980
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
180 additions
and
0 deletions
+180
-0
lib/kv-router/src/standalone_indexer/registry.rs
lib/kv-router/src/standalone_indexer/registry.rs
+180
-0
No files found.
lib/kv-router/src/standalone_indexer/registry.rs
View file @
4cf04338
...
@@ -519,6 +519,9 @@ impl WorkerRegistry {
...
@@ -519,6 +519,9 @@ impl WorkerRegistry {
cancel_token
.cancel
();
cancel_token
.cancel
();
}
}
}
}
for
&
dp_rank
in
entry
.listeners
.keys
()
{
self
.watermarks
.remove
(
&
(
instance_id
,
dp_rank
));
}
}
else
{
}
else
{
#[cfg(feature
=
"indexer-runtime"
)]
#[cfg(feature
=
"indexer-runtime"
)]
{
{
...
@@ -569,6 +572,7 @@ impl WorkerRegistry {
...
@@ -569,6 +572,7 @@ impl WorkerRegistry {
if
let
Some
(
cancel_token
)
=
record
.take_cancel
()
{
if
let
Some
(
cancel_token
)
=
record
.take_cancel
()
{
cancel_token
.cancel
();
cancel_token
.cancel
();
}
}
self
.watermarks
.remove
(
&
(
instance_id
,
dp_rank
));
if
remove_worker
{
if
remove_worker
{
self
.workers
.remove
(
&
instance_id
);
self
.workers
.remove
(
&
instance_id
);
...
@@ -622,6 +626,9 @@ impl WorkerRegistry {
...
@@ -622,6 +626,9 @@ impl WorkerRegistry {
cancel_token
.cancel
();
cancel_token
.cancel
();
}
}
}
}
for
&
dp_rank
in
entry
.listeners
.keys
()
{
self
.watermarks
.remove
(
&
(
instance_id
,
dp_rank
));
}
}
else
{
}
else
{
#[cfg(feature
=
"indexer-runtime"
)]
#[cfg(feature
=
"indexer-runtime"
)]
{
{
...
@@ -902,3 +909,176 @@ impl WorkerRegistry {
...
@@ -902,3 +909,176 @@ impl WorkerRegistry {
self
.indexers
.remove
(
key
);
self
.indexers
.remove
(
key
);
}
}
}
}
#[cfg(test)]
mod
tests
{
use
super
::
*
;
use
std
::
sync
::
atomic
::
Ordering
;
fn
test_registry
()
->
WorkerRegistry
{
WorkerRegistry
::
new
(
1
)
}
#[tokio::test]
async
fn
deregister_removes_watermark
()
{
let
registry
=
test_registry
();
registry
.signal_ready
();
registry
.register
(
1
,
"tcp://127.0.0.1:15557"
.to_string
(),
0
,
"test-model"
.to_string
(),
"default"
.to_string
(),
1
,
None
,
)
.await
.unwrap
();
assert
!
(
registry
.watermarks
.contains_key
(
&
(
1
,
0
)));
registry
.deregister
(
1
,
"test-model"
,
"default"
)
.await
.unwrap
();
assert
!
(
!
registry
.watermarks
.contains_key
(
&
(
1
,
0
)),
"watermark should be removed after deregister"
);
}
#[tokio::test]
async
fn
deregister_dp_rank_removes_watermark
()
{
let
registry
=
test_registry
();
registry
.signal_ready
();
registry
.register
(
1
,
"tcp://127.0.0.1:15558"
.to_string
(),
0
,
"test-model"
.to_string
(),
"default"
.to_string
(),
1
,
None
,
)
.await
.unwrap
();
registry
.register
(
1
,
"tcp://127.0.0.1:15559"
.to_string
(),
1
,
"test-model"
.to_string
(),
"default"
.to_string
(),
1
,
None
,
)
.await
.unwrap
();
assert
!
(
registry
.watermarks
.contains_key
(
&
(
1
,
0
)));
assert
!
(
registry
.watermarks
.contains_key
(
&
(
1
,
1
)));
registry
.deregister_dp_rank
(
1
,
1
,
"test-model"
,
"default"
)
.await
.unwrap
();
assert
!
(
registry
.watermarks
.contains_key
(
&
(
1
,
0
)),
"watermark for dp_rank 0 should remain"
);
assert
!
(
!
registry
.watermarks
.contains_key
(
&
(
1
,
1
)),
"watermark for dp_rank 1 should be removed"
);
}
#[tokio::test]
async
fn
re_register_gets_fresh_watermark
()
{
let
registry
=
test_registry
();
registry
.signal_ready
();
registry
.register
(
1
,
"tcp://127.0.0.1:15560"
.to_string
(),
0
,
"test-model"
.to_string
(),
"default"
.to_string
(),
1
,
None
,
)
.await
.unwrap
();
// Simulate that the listener advanced the watermark.
if
let
Some
(
wm
)
=
registry
.watermarks
.get
(
&
(
1
,
0
))
{
wm
.store
(
42
,
Ordering
::
Release
);
}
registry
.deregister
(
1
,
"test-model"
,
"default"
)
.await
.unwrap
();
registry
.register
(
1
,
"tcp://127.0.0.1:15561"
.to_string
(),
0
,
"test-model"
.to_string
(),
"default"
.to_string
(),
1
,
None
,
)
.await
.unwrap
();
let
wm
=
registry
.watermarks
.get
(
&
(
1
,
0
))
.expect
(
"watermark should exist after re-register"
);
assert_eq!
(
wm
.load
(
Ordering
::
Acquire
),
u64
::
MAX
,
"re-registered watermark should be fresh (u64::MAX)"
);
}
#[tokio::test]
async
fn
deregister_all_tenants_removes_watermarks
()
{
let
registry
=
test_registry
();
registry
.signal_ready
();
registry
.register
(
1
,
"tcp://127.0.0.1:15562"
.to_string
(),
0
,
"test-model"
.to_string
(),
"default"
.to_string
(),
1
,
None
,
)
.await
.unwrap
();
assert
!
(
registry
.watermarks
.contains_key
(
&
(
1
,
0
)));
registry
.deregister_all_tenants
(
1
,
"test-model"
)
.await
.unwrap
();
assert
!
(
!
registry
.watermarks
.contains_key
(
&
(
1
,
0
)),
"watermark should be removed after deregister_all_tenants"
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment