Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
8bdf18e5
Unverified
Commit
8bdf18e5
authored
May 08, 2025
by
Yan Ru Pei
Committed by
GitHub
May 08, 2025
Browse files
fix: should route based on waiting requests, not active (#989)
parent
5c98f8d1
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
15 additions
and
11 deletions
+15
-11
lib/llm/src/kv_router/scheduler.rs
lib/llm/src/kv_router/scheduler.rs
+15
-11
No files found.
lib/llm/src/kv_router/scheduler.rs
View file @
8bdf18e5
...
@@ -215,8 +215,10 @@ pub fn process_worker_selection(
...
@@ -215,8 +215,10 @@ pub fn process_worker_selection(
.get_mut
(
&
selection
.worker_id
)
.get_mut
(
&
selection
.worker_id
)
.expect
(
"worker not found"
);
.expect
(
"worker not found"
);
// Update worker state
// Update worker state predictively
worker
.data.request_active_slots
+=
1
;
// Will be overwritten on next polling of metrics
worker
.data.num_requests_waiting
+=
1
;
// Assumes radix attention so KV load is only incremented by uncached blocks
worker
.data.kv_active_blocks
+=
selection
.required_blocks
-
selection
.overlap_blocks
as
u64
;
worker
.data.kv_active_blocks
+=
selection
.required_blocks
-
selection
.overlap_blocks
as
u64
;
// Emit event
// Emit event
...
@@ -245,7 +247,7 @@ impl WorkerSelector for DefaultWorkerSelector {
...
@@ -245,7 +247,7 @@ impl WorkerSelector for DefaultWorkerSelector {
assert
!
(
request
.isl_tokens
>
0
);
assert
!
(
request
.isl_tokens
>
0
);
let
mut
worker_scores
=
HashMap
::
new
();
let
mut
worker_scores
=
HashMap
::
new
();
let
mut
max_
active
=
0.0
;
let
mut
max_
waiting
=
0.0
;
// Calculate worker scores and find max waiting requests
// Calculate worker scores and find max waiting requests
for
(
worker_id
,
ep
)
in
workers
.endpoints
.iter
()
{
for
(
worker_id
,
ep
)
in
workers
.endpoints
.iter
()
{
...
@@ -256,16 +258,16 @@ impl WorkerSelector for DefaultWorkerSelector {
...
@@ -256,16 +258,16 @@ impl WorkerSelector for DefaultWorkerSelector {
}
}
// Track max waiting requests
// Track max waiting requests
max_
active
=
f64
::
max
(
max_
active
,
ep
.data.request
_active_slots
as
f64
);
max_
waiting
=
f64
::
max
(
max_
waiting
,
ep
.data.
num_
request
s_waiting
as
f64
);
}
}
if
max_
active
==
0.0
{
if
max_
waiting
==
0.0
{
return
Err
(
KvSchedulerError
::
NoEndpoints
);
return
Err
(
KvSchedulerError
::
NoEndpoints
);
}
}
// make immutable
// make immutable
let
worker_scores
=
worker_scores
;
let
worker_scores
=
worker_scores
;
let
max_
active
=
max_active
;
let
max_
waiting
=
max_waiting
;
// Calculate logits for each worker
// Calculate logits for each worker
let
mut
best_logit
=
f64
::
NEG_INFINITY
;
let
mut
best_logit
=
f64
::
NEG_INFINITY
;
...
@@ -280,14 +282,14 @@ impl WorkerSelector for DefaultWorkerSelector {
...
@@ -280,14 +282,14 @@ impl WorkerSelector for DefaultWorkerSelector {
// Calculate normalized metrics
// Calculate normalized metrics
assert
!
(
ep
.data.kv_total_blocks
>
0
);
assert
!
(
ep
.data.kv_total_blocks
>
0
);
let
gpu_cache_usage
=
ep
.data.kv_active_blocks
as
f64
/
ep
.data.kv_total_blocks
as
f64
;
let
gpu_cache_usage
=
ep
.data.kv_active_blocks
as
f64
/
ep
.data.kv_total_blocks
as
f64
;
let
normalized_
active
=
if
max_
active
>
0.0
{
let
normalized_
waiting
=
if
max_
waiting
>
0.0
{
ep
.data.request
_active_slots
as
f64
/
max_
active
ep
.data.
num_
request
s_waiting
as
f64
/
max_
waiting
}
else
{
}
else
{
0.0
0.0
};
};
// Calculate logit using same formula as Python
// Calculate logit using same formula as Python
let
logit
=
2.0
*
score
-
gpu_cache_usage
-
normalized_
active
;
let
logit
=
2.0
*
score
-
gpu_cache_usage
-
normalized_
waiting
;
tracing
::
info!
(
tracing
::
info!
(
"Formula for {}: {:.3} = 2.0 * {:.3} - {:.3} - {:.3}"
,
"Formula for {}: {:.3} = 2.0 * {:.3} - {:.3} - {:.3}"
,
...
@@ -295,7 +297,7 @@ impl WorkerSelector for DefaultWorkerSelector {
...
@@ -295,7 +297,7 @@ impl WorkerSelector for DefaultWorkerSelector {
logit
,
logit
,
score
,
score
,
gpu_cache_usage
,
gpu_cache_usage
,
normalized_
active
normalized_
waiting
);
);
// Track best workers
// Track best workers
...
@@ -313,8 +315,10 @@ impl WorkerSelector for DefaultWorkerSelector {
...
@@ -313,8 +315,10 @@ impl WorkerSelector for DefaultWorkerSelector {
}
}
// Return early if no valid workers found
// Return early if no valid workers found
if
best_workers
.is_empty
()
||
best_logit
==
0.0
{
if
best_workers
.is_empty
()
{
return
Err
(
KvSchedulerError
::
NoEndpoints
);
return
Err
(
KvSchedulerError
::
NoEndpoints
);
}
else
if
best_logit
==
0.0
{
tracing
::
warn!
(
"best worker logit is 0"
);
}
}
let
worker_id
=
if
best_workers
.len
()
==
1
{
let
worker_id
=
if
best_workers
.len
()
==
1
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment