Unverified commit 6a1391eb, authored by Yan Ru Pei and committed by GitHub
Browse files

feat: router frees request from slot manager on stopped requests (#3623)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 400dceae
......@@ -569,18 +569,35 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
let mut response_stream = self.inner.direct(updated_request, instance_id).await?;
let stream_context = response_stream.context();
let chooser = self.chooser.clone();
let context_for_monitoring = stream_context.clone();
let wrapped_stream = Box::pin(async_stream::stream! {
if let Some(first_item) = response_stream.next().await {
let mut prefill_marked = false;
loop {
tokio::select! {
biased;
_ = context_for_monitoring.stopped() => {
tracing::debug!("Request {context_id} cancelled, ending stream");
break;
}
item = response_stream.next() => {
let Some(item) = item else {
break;
};
if !prefill_marked {
if let Err(e) = chooser.mark_prefill_completed(&context_id).await {
tracing::warn!("Failed to mark prefill completed for request {context_id}: {e:?}");
}
yield first_item;
prefill_marked = true;
}
while let Some(item) = response_stream.next().await {
yield item;
}
}
}
if let Err(e) = chooser.free(&context_id).await {
tracing::warn!("Failed to free request {context_id}: {e:?}");
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment