"docs/api/vscode:/vscode.git/clone" did not exist on "326a702d5c6f9883b1e2eb31ae83e2bc27206ea4"
Unverified Commit 3efc733f authored by Yan Ru Pei, committed by GitHub
Browse files

fix(llm): close push-router cleanup gap (#8181)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 89d67172
...@@ -92,6 +92,46 @@ struct RequestGuard { ...@@ -92,6 +92,46 @@ struct RequestGuard {
deferred_close: Option<SessionCloseAction>, deferred_close: Option<SessionCloseAction>,
} }
/// Guard that owns a request's cleanup obligations while its dispatch is
/// still pending.
///
/// If the guard is dropped without having been disarmed, its `Drop` impl hands
/// the fields below to `spawn_cleanup_task` (logged as "dispatch guard").
/// Calling `disarm()` marks the guard as disarmed, which makes `Drop` a no-op,
/// and returns the deferred close action to the caller.
struct PendingDispatchGuard {
// Router passed to `spawn_cleanup_task`, which calls `free` on it.
chooser: Arc<KvRouter>,
// Whether cleanup should call `chooser.free(..)` for this request.
scheduler_tracked: bool,
// Request identifier passed to `free` / `execute` and used in log messages.
context_id: String,
// Optional close action; taken out of the guard by either `disarm()` or `Drop`.
deferred_close: Option<SessionCloseAction>,
// Set to `true` by `disarm()`; when set, `Drop` returns without cleaning up.
disarmed: bool,
}
/// Schedules best-effort cleanup for a request on the current tokio runtime.
///
/// The spawned task first calls `chooser.free(context_id)` when
/// `scheduler_tracked` is true (a failure is only logged), and then runs
/// `deferred_close`, if any. Freeing the scheduler slot before firing the
/// deferred session close keeps the two steps in the same order as the regular
/// finish path.
///
/// Nothing is spawned when there is no work to do. When no tokio runtime is
/// available on the calling thread, a warning is logged and the function
/// returns without performing any cleanup.
///
/// `log_context` names the caller (for example "drop guard") and only appears
/// in log messages.
fn spawn_cleanup_task(
    chooser: &Arc<KvRouter>,
    scheduler_tracked: bool,
    context_id: &str,
    deferred_close: Option<SessionCloseAction>,
    log_context: &'static str,
) {
    // Bail out early unless there is a slot to free or a close action to run.
    let has_work = scheduler_tracked || deferred_close.is_some();
    if !has_work {
        return;
    }

    // This may be called from a `Drop` impl, so a runtime is not guaranteed.
    let runtime = match tokio::runtime::Handle::try_current() {
        Ok(current) => current,
        Err(_) => {
            tracing::warn!(
                "No tokio runtime for {log_context} cleanup of request {}",
                context_id
            );
            return;
        }
    };

    // The task must own everything it touches; it is detached once spawned.
    let router = Arc::clone(chooser);
    let context_id = context_id.to_owned();
    runtime.spawn(async move {
        if scheduler_tracked {
            if let Err(e) = router.free(&context_id).await {
                tracing::warn!("Failed to free request {context_id} ({log_context}): {e}");
            }
        }
        if let Some(action) = deferred_close {
            action.execute(&context_id);
        }
    });
}
impl RequestGuard { impl RequestGuard {
async fn on_item(&mut self, item: &Annotated<LLMEngineOutput>) { async fn on_item(&mut self, item: &Annotated<LLMEngineOutput>) {
if !self.prefill_marked { if !self.prefill_marked {
...@@ -208,34 +248,51 @@ impl Drop for RequestGuard { ...@@ -208,34 +248,51 @@ impl Drop for RequestGuard {
fn drop(&mut self) { fn drop(&mut self) {
self.record_metrics(); self.record_metrics();
let deferred_close = self.deferred_close.take(); spawn_cleanup_task(
let needs_free = !self.freed && self.scheduler_tracked; &self.chooser,
!self.freed && self.scheduler_tracked,
if deferred_close.is_none() && !needs_free { &self.context_id,
return; self.deferred_close.take(),
"drop guard",
);
} }
}
let Ok(handle) = tokio::runtime::Handle::try_current() else { impl PendingDispatchGuard {
tracing::warn!( fn new(
"No tokio runtime for drop guard cleanup of request {}", chooser: Arc<KvRouter>,
self.context_id scheduler_tracked: bool,
); context_id: String,
return; deferred_close: Option<SessionCloseAction>,
}; ) -> Self {
Self {
chooser,
scheduler_tracked,
context_id,
deferred_close,
disarmed: false,
}
}
// Mirror finish(): free the scheduler slot first, then fire the fn disarm(mut self) -> Option<SessionCloseAction> {
// deferred session close so the worker's KV isn't released while self.disarmed = true;
// generation teardown is still in progress. self.deferred_close.take()
let chooser = self.chooser.clone();
let context_id = self.context_id.clone();
handle.spawn(async move {
if needs_free && let Err(e) = chooser.free(&context_id).await {
tracing::warn!("Failed to free request {context_id} (drop guard): {e}");
} }
if let Some(close) = deferred_close { }
close.execute(&context_id);
impl Drop for PendingDispatchGuard {
fn drop(&mut self) {
if self.disarmed {
return;
} }
});
spawn_cleanup_task(
&self.chooser,
self.scheduler_tracked,
&self.context_id,
self.deferred_close.take(),
"dispatch guard",
);
} }
} }
...@@ -620,6 +677,12 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -620,6 +677,12 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
} }
let chooser = self.chooser.clone(); let chooser = self.chooser.clone();
let dispatch_guard = PendingDispatchGuard::new(
chooser.clone(),
scheduler_tracked,
context_id.clone(),
deferred_close,
);
let mut response_stream = self let mut response_stream = self
.inner .inner
.direct(updated_request, instance_id) .direct(updated_request, instance_id)
...@@ -632,11 +695,12 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -632,11 +695,12 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
phase = ?phase, phase = ?phase,
)) ))
.await?; .await?;
let deferred_close = dispatch_guard.disarm();
let stream_context = response_stream.context(); let stream_context = response_stream.context();
let context_for_monitoring = stream_context.clone(); let context_for_monitoring = stream_context.clone();
// Build the guard before returning the stream so a drop-before-first-poll
let wrapped_stream = Box::pin(async_stream::stream! { // still frees booked scheduler state.
let mut guard = RequestGuard { let guard = RequestGuard {
chooser: chooser.clone(), chooser: chooser.clone(),
scheduler_tracked, scheduler_tracked,
context_id: context_id.clone(), context_id: context_id.clone(),
...@@ -655,6 +719,9 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -655,6 +719,9 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
deferred_close, deferred_close,
}; };
let wrapped_stream = Box::pin(async_stream::stream! {
let mut guard = guard;
loop { loop {
tokio::select! { tokio::select! {
biased; biased;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment