Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
f4245c99
Unverified
Commit
f4245c99
authored
Dec 19, 2025
by
Yan Ru Pei
Committed by
GitHub
Dec 19, 2025
Browse files
chore: clean up PreprocessedRequest (#5040)
Signed-off-by:
PeaBrane
<
yanrpei@gmail.com
>
parent
37cc1f3d
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
140 additions
and
114 deletions
+140
-114
components/src/dynamo/router/__main__.py
components/src/dynamo/router/__main__.py
+12
-3
lib/bindings/python/rust/llm/kv.rs
lib/bindings/python/rust/llm/kv.rs
+8
-4
lib/llm/src/kv_router.rs
lib/llm/src/kv_router.rs
+50
-49
lib/llm/src/kv_router/prefill_router.rs
lib/llm/src/kv_router/prefill_router.rs
+21
-11
lib/llm/src/kv_router/protocols.rs
lib/llm/src/kv_router/protocols.rs
+1
-7
lib/llm/src/mocker/engine.rs
lib/llm/src/mocker/engine.rs
+6
-2
lib/llm/src/preprocessor.rs
lib/llm/src/preprocessor.rs
+10
-7
lib/llm/src/protocols/common/preprocessor.rs
lib/llm/src/protocols/common/preprocessor.rs
+32
-31
No files found.
components/src/dynamo/router/__main__.py
View file @
f4245c99
...
...
@@ -90,6 +90,12 @@ class StandaloneRouterHandler:
# Wrap incoming request into PreprocessedRequest format for KvPushRouter
# The request should already have most fields, but we ensure it has the structure
# Build routing hints from request (supports both nested routing object and legacy dp_rank)
routing
=
request
.
get
(
"routing"
)
dp_rank
=
request
.
get
(
"dp_rank"
)
if
routing
is
None
and
dp_rank
is
not
None
:
routing
=
{
"dp_rank"
:
dp_rank
}
preprocessed_request
=
{
"model"
:
request
.
get
(
"model"
,
"unknown"
),
"token_ids"
:
request
[
"token_ids"
],
...
...
@@ -98,9 +104,11 @@ class StandaloneRouterHandler:
"output_options"
:
request
.
get
(
"output_options"
,
{}),
"eos_token_ids"
:
request
.
get
(
"eos_token_ids"
,
[]),
"annotations"
:
request
.
get
(
"annotations"
,
[]),
"disaggregated_params"
:
request
.
get
(
"disaggregated_params"
),
"dp_rank"
:
request
.
get
(
"dp_rank"
),
"extra_args"
:
request
.
get
(
"extra_args"
,
{}),
"routing"
:
routing
,
"router_config_override"
:
request
.
get
(
"router_config_override"
),
"prefill_result"
:
request
.
get
(
"prefill_result"
),
"bootstrap_info"
:
request
.
get
(
"bootstrap_info"
),
"extra_args"
:
request
.
get
(
"extra_args"
),
}
# Route and process through KvPushRouter
...
...
@@ -117,6 +125,7 @@ class StandaloneRouterHandler:
"log_probs"
:
worker_output
.
get
(
"log_probs"
),
"top_logprobs"
:
worker_output
.
get
(
"top_logprobs"
),
"finish_reason"
:
worker_output
.
get
(
"finish_reason"
),
"stop_reason"
:
worker_output
.
get
(
"stop_reason"
),
"index"
:
worker_output
.
get
(
"index"
),
"disaggregated_params"
:
worker_output
.
get
(
"disaggregated_params"
),
"extra_args"
:
worker_output
.
get
(
"extra_args"
),
...
...
lib/bindings/python/rust/llm/kv.rs
View file @
f4245c99
...
...
@@ -1279,13 +1279,17 @@ impl KvPushRouter {
.sampling_options
(
sampling_options
)
.output_options
(
output_options
)
.router_config_override
(
router_config_override
)
.dp_rank
(
dp_rank
)
.extra_args
(
extra_args
)
.tracker
(
Some
(
tracker
.clone
()));
// Set backend_instance_id if worker_id is provided
if
let
Some
(
worker_id
)
=
worker_id
{
request_builder
.backend_instance_id
(
Some
(
worker_id
));
// Set routing hints if worker_id or dp_rank is provided
if
worker_id
.is_some
()
||
dp_rank
.is_some
()
{
let
routing
=
llm_rs
::
protocols
::
common
::
preprocessor
::
RoutingHints
{
backend_instance_id
:
worker_id
,
dp_rank
,
..
Default
::
default
()
};
request_builder
.routing
(
Some
(
routing
));
}
let
request
=
request_builder
.build
()
.map_err
(
to_pyerr
)
?
;
...
...
lib/llm/src/kv_router.rs
View file @
f4245c99
...
...
@@ -661,10 +661,7 @@ impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Er
let
context_id
=
ctx
.context
()
.id
()
.to_string
();
// Handle different request types
let
response
=
match
request
{
RouterRequest
::
New
{
tokens
,
request_extra_info
:
_
,
}
=>
{
RouterRequest
::
New
{
tokens
}
=>
{
let
(
best_worker
,
overlap_blocks
)
=
self
.find_best_match
(
Some
(
&
context_id
),
&
tokens
,
None
,
true
)
.await
?
;
...
...
@@ -743,57 +740,61 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
.map
(|
t
|
t
.phase
())
.unwrap_or
(
RequestPhase
::
Aggregated
);
// Get pre-selected worker based on phase
// Get pre-selected worker based on phase, with backend_instance_id as fallback
let
routing
=
request
.routing
.as_ref
();
let
preselected
=
match
phase
{
RequestPhase
::
Prefill
=>
request
.target_prefill_worker_id
,
RequestPhase
::
Decode
=>
request
.target_decode_worker_id
,
RequestPhase
::
Aggregated
=>
None
,
RequestPhase
::
Prefill
=>
{
routing
.and_then
(|
r
|
r
.prefill_worker_id
.or
(
r
.backend_instance_id
))
}
RequestPhase
::
Decode
=>
{
routing
.and_then
(|
r
|
r
.decode_worker_id
.or
(
r
.backend_instance_id
))
}
RequestPhase
::
Aggregated
=>
routing
.and_then
(|
r
|
r
.backend_instance_id
),
};
let
block_size
=
self
.chooser
.block_size
()
as
usize
;
let
(
instance_id
,
dp_rank
,
overlap_amount
)
=
if
let
Some
(
id
)
=
preselected
.or
(
request
.backend_instance_id
)
{
// Route to pre-selected or explicitly specified worker
let
dp_rank
=
request
.dp_rank
.unwrap_or
(
0
);
tracing
::
debug!
(
worker_id
=
id
,
dp_rank
=
dp_rank
,
?
phase
,
"Routing to specified worker"
);
let
(
instance_id
,
dp_rank
,
overlap_amount
)
=
if
let
Some
(
id
)
=
preselected
{
// Route to pre-selected or explicitly specified worker
let
dp_rank
=
routing
.and_then
(|
r
|
r
.dp_rank
)
.unwrap_or
(
0
);
tracing
::
debug!
(
worker_id
=
id
,
dp_rank
=
dp_rank
,
?
phase
,
"Routing to specified worker"
);
// Compute actual overlap blocks by querying the indexer
let
block_hashes
=
compute_block_hash_for_seq
(
&
request
.token_ids
,
self
.chooser
.block_size
(),
None
);
let
overlap_scores
=
self
.chooser.indexer
.find_matches
(
block_hashes
)
.await
?
;
let
worker
=
WorkerWithDpRank
::
new
(
id
,
dp_rank
);
let
overlap_blocks
=
overlap_scores
.scores
.get
(
&
worker
)
.copied
()
.unwrap_or
(
0
);
if
!
is_query_only
{
self
.chooser
.add_request
(
context_id
.clone
(),
&
request
.token_ids
,
overlap_blocks
,
worker
,
)
.await
;
}
(
id
,
dp_rank
,
overlap_blocks
)
}
else
{
// Find the best worker match
// Don't update states if this is a query-only request
let
(
best_worker
,
overlap_amount
)
=
self
.chooser
.find_best_match
(
Some
(
&
context_id
),
// Compute actual overlap blocks by querying the indexer
let
block_hashes
=
compute_block_hash_for_seq
(
&
request
.token_ids
,
self
.chooser
.block_size
(),
None
);
let
overlap_scores
=
self
.chooser.indexer
.find_matches
(
block_hashes
)
.await
?
;
let
worker
=
WorkerWithDpRank
::
new
(
id
,
dp_rank
);
let
overlap_blocks
=
overlap_scores
.scores
.get
(
&
worker
)
.copied
()
.unwrap_or
(
0
);
if
!
is_query_only
{
self
.chooser
.add_request
(
context_id
.clone
(),
&
request
.token_ids
,
request
.router_config_override
.as_ref
()
,
!
is_query_only
,
overlap_blocks
,
worker
,
)
.await
?
;
(
best_worker
.worker_id
,
best_worker
.dp_rank
,
overlap_amount
)
};
.await
;
}
(
id
,
dp_rank
,
overlap_blocks
)
}
else
{
// Find the best worker match
// Don't update states if this is a query-only request
let
(
best_worker
,
overlap_amount
)
=
self
.chooser
.find_best_match
(
Some
(
&
context_id
),
&
request
.token_ids
,
request
.router_config_override
.as_ref
(),
!
is_query_only
,
)
.await
?
;
(
best_worker
.worker_id
,
best_worker
.dp_rank
,
overlap_amount
)
};
// Record metrics in tracker: KV hit rate and worker ID based on phase
if
let
Some
(
ref
tracker
)
=
request
.tracker
{
...
...
@@ -830,7 +831,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
// Route to worker
let
(
mut
backend_input
,
context
)
=
request
.into_parts
();
backend_input
.dp_rank
=
Some
(
dp_rank
);
backend_input
.
routing_mut
()
.
dp_rank
=
Some
(
dp_rank
);
let
updated_request
=
context
.map
(|
_
|
backend_input
);
let
mut
response_stream
=
self
.inner
.direct
(
updated_request
,
instance_id
)
.await
?
;
...
...
lib/llm/src/kv_router/prefill_router.rs
View file @
f4245c99
...
...
@@ -72,7 +72,7 @@ impl InnerPrefillRouter {
///
/// Supports regular Dynamo and GAIE integrated mode via query_instance_id state machine:
/// - GAIE Stage 1: query_instance_id transitions "" -> "prefill" -> "decode", returns only worker IDs
/// - GAIE Stage 2:
target_
prefill_worker_id/
target_
decode_worker_id are set, full execution with specified workers
/// - GAIE Stage 2:
routing.
prefill_worker_id/
routing.
decode_worker_id are set, full execution with specified workers
/// - Non-GAIE: like GAIE Stage 2 but the worker ids have to be determined.
pub
struct
PrefillRouter
{
prefill_router
:
OnceLock
<
InnerPrefillRouter
>
,
...
...
@@ -221,7 +221,7 @@ impl PrefillRouter {
// Use pre-selected worker (GAIE Stage 2) or query for best worker
let
(
worker_id
,
dp_rank
)
=
if
let
Some
(
id
)
=
preselected_worker
{
let
dp_rank
=
req
.dp_rank
.unwrap_or
(
0
);
let
dp_rank
=
req
.
routing
.as_ref
()
.and_then
(|
r
|
r
.
dp_rank
)
.unwrap_or
(
0
);
tracing
::
debug!
(
worker_id
=
id
,
dp_rank
=
dp_rank
,
...
...
@@ -377,13 +377,17 @@ impl PrefillRouter {
prefill_req
.annotations
.push
(
format!
(
"query_instance_id:{}"
,
RequestPhase
::
Prefill
));
}
else
if
let
Some
(
prefill_worker_id
)
=
prefill_req
.target_prefill_worker_id
{
}
else
if
let
Some
(
prefill_worker_id
)
=
prefill_req
.routing
.as_ref
()
.and_then
(|
r
|
r
.prefill_worker_id
)
{
// GAIE Stage 2: Route to pre-selected prefill worker from the stage 1
tracing
::
debug!
(
target_
prefill_worker_id
=
prefill_worker_id
,
prefill_worker_id
=
prefill_worker_id
,
"GAIE Stage 2: Routing prefill to pre-selected worker"
);
prefill_req
.backend_instance_id
=
Some
(
prefill_worker_id
);
prefill_req
.
routing_mut
()
.
backend_instance_id
=
Some
(
prefill_worker_id
);
}
}
...
...
@@ -456,8 +460,11 @@ impl
Self
::
prepare_prefill_for_gaie
(
&
mut
prefill_req
,
is_gaie_stage1
);
// Try build_bootstrap_info optimization (skip for GAIE Stage 1 which needs query-only flow)
// For GAIE Stage 2, use target_prefill_worker_id if provided
let
preselected_worker
=
prefill_req
.target_prefill_worker_id
;
// For GAIE Stage 2, use prefill_worker_id if provided
let
preselected_worker
=
prefill_req
.routing
.as_ref
()
.and_then
(|
r
|
r
.prefill_worker_id
);
let
prefill_result
=
if
!
is_gaie_stage1
{
if
let
Some
((
worker_id
,
dp_rank
,
bootstrap_info
))
=
self
.build_bootstrap_info
(
&
prefill_req
,
preselected_worker
)
...
...
@@ -466,8 +473,9 @@ impl
let
bootstrap_room
=
bootstrap_info
.bootstrap_room
;
// Prepare request with bootstrap_room and force routing to specific worker
prefill_req
.backend_instance_id
=
Some
(
worker_id
);
prefill_req
.dp_rank
=
Some
(
dp_rank
);
let
routing
=
prefill_req
.routing_mut
();
routing
.backend_instance_id
=
Some
(
worker_id
);
routing
.dp_rank
=
Some
(
dp_rank
);
let
extra_args
=
prefill_req
.extra_args
.get_or_insert_with
(||
serde_json
::
json!
({}));
...
...
@@ -578,8 +586,10 @@ impl
});
// GAIE Stage 2: Route to pre-selected decode worker if specified
if
let
Some
(
decode_worker_id
)
=
decode_req
.target_decode_worker_id
{
decode_req
.backend_instance_id
=
Some
(
decode_worker_id
);
if
let
Some
(
decode_worker_id
)
=
decode_req
.routing
.as_ref
()
.and_then
(|
r
|
r
.decode_worker_id
)
{
decode_req
.routing_mut
()
.backend_instance_id
=
Some
(
decode_worker_id
);
tracing
::
debug!
(
decode_worker_id
=
decode_worker_id
,
"GAIE Stage 2: Routing decode to pre-selected worker"
...
...
lib/llm/src/kv_router/protocols.rs
View file @
f4245c99
...
...
@@ -36,12 +36,9 @@ impl WorkerWithDpRank {
#[derive(Debug,
Clone,
Serialize,
Deserialize)]
#[serde(tag
=
"method"
,
rename_all
=
"snake_case"
)]
pub
enum
RouterRequest
{
// ini
#[serde(rename
=
"new"
)]
New
{
tokens
:
Vec
<
Token
>
,
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
request_extra_info
:
Option
<
RequestExtraInfo
>
,
},
MarkPrefill
,
MarkFree
,
...
...
@@ -49,10 +46,7 @@ pub enum RouterRequest {
impl
Default
for
RouterRequest
{
fn
default
()
->
Self
{
RouterRequest
::
New
{
tokens
:
vec!
[],
request_extra_info
:
None
,
}
RouterRequest
::
New
{
tokens
:
vec!
[]
}
}
}
...
...
lib/llm/src/mocker/engine.rs
View file @
f4245c99
...
...
@@ -238,8 +238,12 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error>
)
->
Result
<
ManyOut
<
LLMEngineOutput
>
,
Error
>
{
let
(
request
,
ctx
)
=
input
.into_parts
();
// Extract dp_rank from request field (defaults to 0 if not set)
let
dp_rank
=
request
.dp_rank
.unwrap_or
(
0
);
// Extract dp_rank from routing hints (defaults to 0 if not set)
let
dp_rank
=
request
.routing
.as_ref
()
.and_then
(|
r
|
r
.dp_rank
)
.unwrap_or
(
0
);
// Validate dp_rank
if
dp_rank
>=
self
.engine_args.dp_size
{
...
...
lib/llm/src/preprocessor.rs
View file @
f4245c99
...
...
@@ -31,7 +31,7 @@ use crate::model_card::{ModelDeploymentCard, ModelInfo};
use
crate
::
preprocessor
::
media
::
MediaLoader
;
use
crate
::
preprocessor
::
prompt
::
OAIChatLikeRequest
;
use
crate
::
protocols
::
common
::
preprocessor
::{
MultimodalData
,
MultimodalDataMap
,
PreprocessedRequestBuilder
,
MultimodalData
,
MultimodalDataMap
,
PreprocessedRequestBuilder
,
RoutingHints
,
};
use
crate
::
tokenizers
::
Encoding
;
...
...
@@ -237,13 +237,16 @@ impl OpenAIPreprocessor {
builder
.output_options
(
request
.extract_output_options
()
?
);
builder
.annotations
(
request
.annotations
()
.unwrap_or_default
());
builder
.mdc_sum
(
Some
(
self
.mdcsum
.clone
()));
// Extract
backend_instance_id, extra_fields, and worker ID
s from nvext if present
// Extract
routing hint
s from nvext if present
if
let
Some
(
nvext
)
=
request
.nvext
()
{
builder
.backend_instance_id
(
nvext
.backend_instance_id
);
builder
.extra_fields
(
nvext
.extra_fields
.clone
());
// GAIE Stage 2: Extract targeted worker IDs for disaggregated serving
builder
.target_prefill_worker_id
(
nvext
.prefill_worker_id
);
builder
.target_decode_worker_id
(
nvext
.decode_worker_id
);
// Build routing hints from nvext fields
let
routing
=
RoutingHints
{
backend_instance_id
:
nvext
.backend_instance_id
,
prefill_worker_id
:
nvext
.prefill_worker_id
,
decode_worker_id
:
nvext
.decode_worker_id
,
dp_rank
:
None
,
// dp_rank is set later in the pipeline
};
builder
.routing
(
Some
(
routing
));
}
Ok
(
builder
)
...
...
lib/llm/src/protocols/common/preprocessor.rs
View file @
f4245c99
...
...
@@ -8,11 +8,34 @@ use serde::{Deserialize, Serialize};
use
super
::
timing
::
RequestTracker
;
use
super
::{
OutputOptions
,
SamplingOptions
,
StopConditions
};
use
crate
::
kv_router
::
{
RouterConfigOverride
,
protocols
::
RequestExtraInfo
}
;
use
crate
::
kv_router
::
RouterConfigOverride
;
#[cfg(feature
=
"media-nixl"
)]
use
crate
::
preprocessor
::
media
::
RdmaMediaDataDescriptor
;
use
crate
::
protocols
::
TokenIdType
;
/// Routing hints for directing requests to specific workers.
/// These fields are extracted from nvext and used by the router to determine
/// which worker(s) should handle the request.
#[derive(Serialize,
Deserialize,
Debug,
Clone,
Default,
Builder)]
#[builder(default)]
pub
struct
RoutingHints
{
/// General backend instance ID for direct routing (aggregated mode)
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
backend_instance_id
:
Option
<
u64
>
,
/// Targeted prefill worker ID for disaggregated serving (GAIE Stage 2)
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
prefill_worker_id
:
Option
<
u64
>
,
/// Targeted decode worker ID for disaggregated serving (GAIE Stage 2)
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
decode_worker_id
:
Option
<
u64
>
,
/// Data parallel rank for the request
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
dp_rank
:
Option
<
u32
>
,
}
#[derive(Serialize,
Deserialize,
Debug,
Clone,
Default)]
pub
struct
BootstrapInfo
{
/// The host address for bootstrap connection
...
...
@@ -85,9 +108,10 @@ pub struct PreprocessedRequest {
#[builder(default)]
pub
annotations
:
Vec
<
String
>
,
///
Targeted
backend
instance
ID for the request
///
Routing hints for worker targeting (
backend
_
instance
_id, prefill/decode worker IDs, dp_rank)
#[builder(default)]
pub
backend_instance_id
:
Option
<
u64
>
,
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
routing
:
Option
<
RoutingHints
>
,
/// Router configuration overrides for this specific request
#[builder(default)]
...
...
@@ -103,41 +127,15 @@ pub struct PreprocessedRequest {
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
bootstrap_info
:
Option
<
BootstrapInfo
>
,
/// Data parallel rank for the request (used with data parallelism)
#[builder(default)]
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
dp_rank
:
Option
<
u32
>
,
/// Additional arguments for extensibility
#[builder(default)]
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
extra_args
:
Option
<
serde_json
::
Value
>
,
/// Extra fields requested to be included in the response's nvext
#[builder(default)]
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
extra_fields
:
Option
<
Vec
<
String
>>
,
/// Multimodal request-level metadata (mm_hash and token offsets)
#[builder(default)]
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
request_extra_info
:
Option
<
RequestExtraInfo
>
,
/// Optional request tracker for per-request metrics (shared with DeltaGenerator)
#[builder(default)]
#[serde(skip)]
pub
tracker
:
Option
<
Arc
<
RequestTracker
>>
,
/// Targeted prefill worker ID for disaggregated serving (GAIE Stage 2)
/// When set, the prefill request will be routed to this specific worker.
#[builder(default)]
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
target_prefill_worker_id
:
Option
<
u64
>
,
/// Targeted decode worker ID for disaggregated serving (GAIE Stage 2)
/// When set, the decode request will be routed to this specific worker.
#[builder(default)]
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
target_decode_worker_id
:
Option
<
u64
>
,
}
impl
PreprocessedRequest
{
...
...
@@ -154,12 +152,15 @@ impl PreprocessedRequest {
.find
(|
a
|
a
.starts_with
(
&
prefix
))
.map
(|
a
|
a
[
prefix
.len
()
..
]
.to_string
())
}
}
impl
PreprocessedRequest
{
pub
fn
builder
()
->
PreprocessedRequestBuilder
{
PreprocessedRequestBuilder
::
default
()
}
/// Get mutable access to routing hints, creating default if None
pub
fn
routing_mut
(
&
mut
self
)
->
&
mut
RoutingHints
{
self
.routing
.get_or_insert_with
(
RoutingHints
::
default
)
}
}
/// [`PreprocessedEmbeddingRequest`] is the internal representation of an embedding request
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment