Unverified commit f3d784f3, authored by Biswa Panda and committed by GitHub
Browse files

feat: query instance_id based on routing strategy (#1787)

parent 13560ab2
......@@ -313,69 +313,81 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
InstanceSource::Dynamic(_) => {
// Extract context ID for request tracking
let context_id = request.context().id().to_string();
let (instance_id, overlap_amount) = self
.chooser
.find_best_match(&context_id, &request.token_ids)
.await?;
let query_instance_id = request.has_annotation("query_instance_id");
// Extract context information before moving the request
let stream_context = request.context().clone();
// Update the request with the estimated prefix hit blocks
let (mut backend_input, context) = request.into_parts();
let isl = backend_input.token_ids.len();
backend_input.estimated_prefix_hit_num_blocks = Some(overlap_amount);
let updated_request = context.map(|_| backend_input);
// if request has the annotation "query_instance_id", for example
// curl -d '{... ,"nvext": { "annotations": ["query_instance_id"]}}'
// request will not be routed to worker immediately
if query_instance_id {
let instance_id_str = instance_id.to_string();
let response =
Annotated::from_annotation("worker_instance_id", &instance_id_str)?;
let stream = stream::iter(vec![response]);
Ok(ResponseStream::new(Box::pin(stream), stream_context))
} else {
// Get the response stream from the worker
let mut response_stream =
self.inner.direct(updated_request, instance_id).await?;
// Wrap the stream to track tokens
let stream_context = response_stream.context();
let chooser = self.chooser.clone();
let request_id = context_id.clone();
let block_size = chooser.block_size() as usize;
let wrapped_stream = Box::pin(async_stream::stream! {
let mut accumulated_tokens = Vec::new();
let mut total_output_length = 0usize;
let mut last_block_index = (isl.saturating_sub(1)) / block_size;
let mut first_push_done = false;
while let Some(item) = response_stream.next().await {
// Track tokens if they exist in the response
let Some(ref output) = item.data else {
yield item;
continue;
};
if output.token_ids.is_empty() {
yield item;
continue;
}
// Get the response stream from the worker
let mut response_stream = self.inner.direct(updated_request, instance_id).await?;
// Wrap the stream to track tokens
let stream_context = response_stream.context();
let chooser = self.chooser.clone();
let request_id = context_id.clone();
let block_size = chooser.block_size() as usize;
let wrapped_stream = Box::pin(async_stream::stream! {
let mut accumulated_tokens = Vec::new();
let mut total_output_length = 0usize;
let mut last_block_index = (isl.saturating_sub(1)) / block_size;
let mut first_push_done = false;
// Add tokens to accumulator
accumulated_tokens.extend_from_slice(&output.token_ids);
total_output_length += output.token_ids.len();
// Always push for the first generated token (to mark prefill done)
// or when we've moved to a new block
let current_block_index = (isl + total_output_length).saturating_sub(1) / block_size;
let should_push = (!first_push_done && total_output_length >= 1) ||
(first_push_done && current_block_index > last_block_index);
if should_push {
chooser.push(&request_id, &accumulated_tokens).await;
accumulated_tokens.clear();
last_block_index = current_block_index;
if !first_push_done {
first_push_done = true;
}
}
while let Some(item) = response_stream.next().await {
// Track tokens if they exist in the response
let Some(ref output) = item.data else {
yield item;
continue;
};
if output.token_ids.is_empty() {
yield item;
continue;
}
// Add tokens to accumulator
accumulated_tokens.extend_from_slice(&output.token_ids);
total_output_length += output.token_ids.len();
// Always push for the first generated token (to mark prefill done)
// or when we've moved to a new block
let current_block_index = (isl + total_output_length).saturating_sub(1) / block_size;
let should_push = (!first_push_done && total_output_length >= 1) ||
(first_push_done && current_block_index > last_block_index);
if should_push {
chooser.push(&request_id, &accumulated_tokens).await;
accumulated_tokens.clear();
last_block_index = current_block_index;
if !first_push_done {
first_push_done = true;
}
}
yield item;
}
chooser.free(&request_id).await;
});
Ok(ResponseStream::new(wrapped_stream, stream_context))
chooser.free(&request_id).await;
});
Ok(ResponseStream::new(wrapped_stream, stream_context))
}
}
}
}
......
......@@ -397,8 +397,8 @@ impl OpenAIPreprocessor {
// Only set event if not already set to avoid overriding existing events (like errors)
if response.event.is_none() {
response.event = metrics_annotated.event;
response.comment = metrics_annotated.comment;
}
response.comment = metrics_annotated.comment;
}
tracing::trace!(
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment