Unverified commit 6bc6d400, authored by atchernych, committed by GitHub
Browse files

feat: skip router when worker id is pre-determined (#2450)


Co-authored-by: Biswa Panda <biswa.panda@gmail.com>
parent 57d4fa05
...@@ -350,10 +350,16 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -350,10 +350,16 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
InstanceSource::Dynamic(_) => { InstanceSource::Dynamic(_) => {
// Extract context ID for request tracking // Extract context ID for request tracking
let context_id = request.context().id().to_string(); let context_id = request.context().id().to_string();
let (instance_id, overlap_amount) = self let (instance_id, overlap_amount) = if let Some(id) = request.backend_instance_id {
.chooser // If instance_id is set, use it
.find_best_match(&context_id, &request.token_ids) (id, 0)
.await?; } else {
// Otherwise, find the best match
self.chooser
.find_best_match(&context_id, &request.token_ids)
.await?
};
let query_instance_id = request.has_annotation("query_instance_id"); let query_instance_id = request.has_annotation("query_instance_id");
// Extract context information before moving the request // Extract context information before moving the request
let stream_context = request.context().clone(); let stream_context = request.context().clone();
......
...@@ -188,6 +188,7 @@ mod tests { ...@@ -188,6 +188,7 @@ mod tests {
mdc_sum: None, mdc_sum: None,
annotations: vec![], annotations: vec![],
estimated_prefix_hit_num_blocks: None, estimated_prefix_hit_num_blocks: None,
backend_instance_id: None,
} }
} }
......
...@@ -646,6 +646,7 @@ mod integration_tests { ...@@ -646,6 +646,7 @@ mod integration_tests {
mdc_sum: None, mdc_sum: None,
annotations: vec![format!("dp_rank:{dp_rank}")], annotations: vec![format!("dp_rank:{dp_rank}")],
estimated_prefix_hit_num_blocks: None, estimated_prefix_hit_num_blocks: None,
backend_instance_id: None,
}; };
let requests = vec![ let requests = vec![
......
...@@ -254,6 +254,10 @@ impl OpenAIPreprocessor { ...@@ -254,6 +254,10 @@ impl OpenAIPreprocessor {
builder.annotations(request.annotations().unwrap_or_default()); builder.annotations(request.annotations().unwrap_or_default());
builder.mdc_sum(Some(self.mdcsum.clone())); builder.mdc_sum(Some(self.mdcsum.clone()));
builder.estimated_prefix_hit_num_blocks(None); builder.estimated_prefix_hit_num_blocks(None);
// Extract backend_instance_id from nvext if present
if let Some(nvext) = request.nvext() {
builder.backend_instance_id(nvext.backend_instance_id);
}
Ok((builder.build()?, annotations)) Ok((builder.build()?, annotations))
} }
......
...@@ -50,6 +50,10 @@ pub struct PreprocessedRequest { ...@@ -50,6 +50,10 @@ pub struct PreprocessedRequest {
/// Estimated number of prefix hit tokens (only used in kv aware routing) /// Estimated number of prefix hit tokens (only used in kv aware routing)
#[builder(default)] #[builder(default)]
pub estimated_prefix_hit_num_blocks: Option<u32>, pub estimated_prefix_hit_num_blocks: Option<u32>,
/// Targeted backend instance ID for the request
#[builder(default)]
pub backend_instance_id: Option<i64>,
} }
impl PreprocessedRequest { impl PreprocessedRequest {
......
...@@ -62,6 +62,12 @@ pub struct NvExt { ...@@ -62,6 +62,12 @@ pub struct NvExt {
#[builder(default, setter(strip_option))] #[builder(default, setter(strip_option))]
pub annotations: Option<Vec<String>>, pub annotations: Option<Vec<String>>,
/// Targeted backend instance ID for the request
/// If set, the request will be routed to backend instance with the given ID.
/// If not set, the request will be routed to the best matching instance.
#[builder(default, setter(strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub backend_instance_id: Option<i64>,
/// Guided Decoding Options /// Guided Decoding Options
/// If specified, the output will be a JSON object. Can be a string, an object, or null. /// If specified, the output will be a JSON object. Can be a string, an object, or null.
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment