Unverified Commit 8055b7dd authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix(kv-router): plumb expected_osl through scheduler queue path (#6812)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 06c7d6f5
...@@ -493,6 +493,7 @@ impl RouterHandles { ...@@ -493,6 +493,7 @@ impl RouterHandles {
false, false,
None, None,
0.0, 0.0,
None,
allowed_worker_ids, allowed_worker_ids,
) )
.await .await
......
...@@ -918,6 +918,7 @@ impl KvRouter { ...@@ -918,6 +918,7 @@ impl KvRouter {
update_states, update_states,
lora_name, lora_name,
0.0, 0.0,
None,
None, // allowed_worker_ids: pass via RoutingHints in PreprocessedRequest path None, // allowed_worker_ids: pass via RoutingHints in PreprocessedRequest path
) )
.await .await
......
...@@ -186,7 +186,7 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> { ...@@ -186,7 +186,7 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> {
token_sequence: request.token_seq, token_sequence: request.token_seq,
isl: request.isl_tokens, isl: request.isl_tokens,
overlap: selection.overlap_blocks, overlap: selection.overlap_blocks,
expected_output_tokens: None, expected_output_tokens: request.expected_output_tokens,
worker: selection.worker, worker: selection.worker,
lora_name: request.lora_name.clone(), lora_name: request.lora_name.clone(),
}) })
...@@ -303,6 +303,7 @@ mod tests { ...@@ -303,6 +303,7 @@ mod tests {
update_states: true, update_states: true,
lora_name: None, lora_name: None,
priority_jump: 0.0, priority_jump: 0.0,
expected_output_tokens: None,
allowed_worker_ids: None, allowed_worker_ids: None,
resp_tx: Some(tx), resp_tx: Some(tx),
}; };
......
...@@ -47,6 +47,9 @@ pub struct SchedulingRequest { ...@@ -47,6 +47,9 @@ pub struct SchedulingRequest {
pub lora_name: Option<String>, pub lora_name: Option<String>,
/// Priority jump in seconds; decreases effective arrival time in the queue. /// Priority jump in seconds; decreases effective arrival time in the queue.
pub priority_jump: f64, pub priority_jump: f64,
/// Expected output tokens from agent_hints.osl, forwarded to the slot tracker
/// for output block decay estimation.
pub expected_output_tokens: Option<u32>,
/// Optional set of allowed worker IDs to restrict routing decisions (EPP). /// Optional set of allowed worker IDs to restrict routing decisions (EPP).
pub allowed_worker_ids: Option<HashSet<WorkerId>>, pub allowed_worker_ids: Option<HashSet<WorkerId>>,
pub resp_tx: Option<tokio::sync::oneshot::Sender<Result<SchedulingResponse, KvSchedulerError>>>, pub resp_tx: Option<tokio::sync::oneshot::Sender<Result<SchedulingResponse, KvSchedulerError>>>,
......
...@@ -370,6 +370,7 @@ impl KvRouter { ...@@ -370,6 +370,7 @@ impl KvRouter {
update_states: bool, update_states: bool,
lora_name: Option<String>, lora_name: Option<String>,
priority_jump: f64, priority_jump: f64,
expected_output_tokens: Option<u32>,
allowed_worker_ids: Option<HashSet<WorkerId>>, allowed_worker_ids: Option<HashSet<WorkerId>>,
) -> anyhow::Result<(WorkerWithDpRank, u32)> { ) -> anyhow::Result<(WorkerWithDpRank, u32)> {
let start = Instant::now(); let start = Instant::now();
...@@ -419,6 +420,7 @@ impl KvRouter { ...@@ -419,6 +420,7 @@ impl KvRouter {
update_states, update_states,
lora_name, lora_name,
priority_jump, priority_jump,
expected_output_tokens,
allowed_worker_ids, allowed_worker_ids,
) )
.instrument(tracing::info_span!("kv_router.schedule")) .instrument(tracing::info_span!("kv_router.schedule"))
...@@ -584,6 +586,7 @@ impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Er ...@@ -584,6 +586,7 @@ impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Er
None, None,
0.0, 0.0,
None, None,
None,
) )
.await?; .await?;
......
...@@ -539,6 +539,7 @@ impl PrefillRouter { ...@@ -539,6 +539,7 @@ impl PrefillRouter {
update_states, update_states,
lora_name, lora_name,
priority_jump, priority_jump,
None,
allowed_worker_ids, allowed_worker_ids,
) )
.await?; .await?;
......
...@@ -252,6 +252,7 @@ impl KvPushRouter { ...@@ -252,6 +252,7 @@ impl KvPushRouter {
!is_query_only, !is_query_only,
lora_name, lora_name,
priority_jump, priority_jump,
expected_output_tokens,
allowed_worker_ids, allowed_worker_ids,
) )
.await?; .await?;
......
...@@ -158,6 +158,7 @@ impl KvScheduler { ...@@ -158,6 +158,7 @@ impl KvScheduler {
update_states: bool, update_states: bool,
lora_name: Option<String>, lora_name: Option<String>,
priority_jump: f64, priority_jump: f64,
expected_output_tokens: Option<u32>,
allowed_worker_ids: Option<HashSet<WorkerId>>, allowed_worker_ids: Option<HashSet<WorkerId>>,
) -> Result<SchedulingResponse, KvSchedulerError> { ) -> Result<SchedulingResponse, KvSchedulerError> {
#[cfg(feature = "bench")] #[cfg(feature = "bench")]
...@@ -175,6 +176,7 @@ impl KvScheduler { ...@@ -175,6 +176,7 @@ impl KvScheduler {
update_states, update_states,
lora_name, lora_name,
priority_jump, priority_jump,
expected_output_tokens,
allowed_worker_ids, allowed_worker_ids,
resp_tx: Some(resp_tx), resp_tx: Some(resp_tx),
}; };
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment