Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
0c0336e6
Unverified
Commit
0c0336e6
authored
Feb 04, 2026
by
Hongkuan Zhou
Committed by
GitHub
Feb 04, 2026
Browse files
feat: Add per-worker Prometheus metrics for router load monitoring (#5842)
Signed-off-by:
hongkuanz
<
hongkuanz@nvidia.com
>
parent
eff08aed
Changes
22
Hide whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
1023 additions
and
85 deletions
+1023
-85
examples/hierarchical_planner/run_example.sh
examples/hierarchical_planner/run_example.sh
+6
-6
lib/bindings/c/src/lib.rs
lib/bindings/c/src/lib.rs
+8
-1
lib/bindings/python/rust/llm/kv.rs
lib/bindings/python/rust/llm/kv.rs
+36
-2
lib/bindings/python/src/dynamo/prometheus_names.py
lib/bindings/python/src/dynamo/prometheus_names.py
+16
-0
lib/llm/src/discovery.rs
lib/llm/src/discovery.rs
+5
-1
lib/llm/src/discovery/model_manager.rs
lib/llm/src/discovery/model_manager.rs
+7
-0
lib/llm/src/discovery/watcher.rs
lib/llm/src/discovery/watcher.rs
+14
-13
lib/llm/src/discovery/worker_monitor.rs
lib/llm/src/discovery/worker_monitor.rs
+298
-9
lib/llm/src/entrypoint/input/common.rs
lib/llm/src/entrypoint/input/common.rs
+3
-0
lib/llm/src/http/service/metrics.rs
lib/llm/src/http/service/metrics.rs
+185
-2
lib/llm/src/http/service/service_v2.rs
lib/llm/src/http/service/service_v2.rs
+14
-1
lib/llm/src/kv_router.rs
lib/llm/src/kv_router.rs
+13
-2
lib/llm/src/kv_router/prefill_router.rs
lib/llm/src/kv_router/prefill_router.rs
+73
-18
lib/llm/src/kv_router/scheduler.rs
lib/llm/src/kv_router/scheduler.rs
+8
-0
lib/llm/src/kv_router/sequence.rs
lib/llm/src/kv_router/sequence.rs
+52
-6
lib/llm/src/preprocessor.rs
lib/llm/src/preprocessor.rs
+61
-1
lib/llm/src/protocols/common/timing.rs
lib/llm/src/protocols/common/timing.rs
+205
-11
lib/llm/src/protocols/openai.rs
lib/llm/src/protocols/openai.rs
+5
-0
lib/llm/src/protocols/openai/chat_completions/delta.rs
lib/llm/src/protocols/openai/chat_completions/delta.rs
+7
-6
lib/llm/src/protocols/openai/completions/delta.rs
lib/llm/src/protocols/openai/completions/delta.rs
+7
-6
No files found.
examples/hierarchical_planner/run_example.sh
View file @
0c0336e6
...
@@ -27,8 +27,8 @@ python -m dynamo.global_router \
...
@@ -27,8 +27,8 @@ python -m dynamo.global_router \
# ============================================================================
# ============================================================================
DYN_NAMESPACE
=
prefill_pool_0 python
-m
dynamo.router
\
DYN_NAMESPACE
=
prefill_pool_0 python
-m
dynamo.router
\
--endpoint
prefill_pool_0.worker.generate
\
--endpoint
prefill_pool_0.worker.generate
\
--block-size
16
&
\
--block-size
16
\
--no-track-active-blocks
# prefill router does not need to track active blocks
--no-track-active-blocks
&
# prefill router does not need to track active blocks
python
-m
dynamo.mocker
\
python
-m
dynamo.mocker
\
--model-path
Qwen/Qwen3-0.6B
\
--model-path
Qwen/Qwen3-0.6B
\
...
@@ -41,8 +41,8 @@ python -m dynamo.mocker \
...
@@ -41,8 +41,8 @@ python -m dynamo.mocker \
# ============================================================================
# ============================================================================
DYN_NAMESPACE
=
prefill_pool_1 python
-m
dynamo.router
\
DYN_NAMESPACE
=
prefill_pool_1 python
-m
dynamo.router
\
--endpoint
prefill_pool_1.worker.generate
\
--endpoint
prefill_pool_1.worker.generate
\
--block-size
16
&
\
--block-size
16
\
--no-track-active-blocks
# prefill router does not need to track active blocks
--no-track-active-blocks
&
# prefill router does not need to track active blocks
python
-m
dynamo.mocker
\
python
-m
dynamo.mocker
\
--model-path
Qwen/Qwen3-0.6B
\
--model-path
Qwen/Qwen3-0.6B
\
...
@@ -55,8 +55,8 @@ python -m dynamo.mocker \
...
@@ -55,8 +55,8 @@ python -m dynamo.mocker \
# ============================================================================
# ============================================================================
DYN_NAMESPACE
=
decode_pool_0 python
-m
dynamo.router
\
DYN_NAMESPACE
=
decode_pool_0 python
-m
dynamo.router
\
--endpoint
decode_pool_0.worker.generate
\
--endpoint
decode_pool_0.worker.generate
\
--block-size
16
&
\
--block-size
16
\
--kv-overlap-score-weight
0
--kv-overlap-score-weight
0
&
python
-m
dynamo.mocker
\
python
-m
dynamo.mocker
\
--model-path
Qwen/Qwen3-0.6B
\
--model-path
Qwen/Qwen3-0.6B
\
...
...
lib/bindings/c/src/lib.rs
View file @
0c0336e6
...
@@ -1335,6 +1335,7 @@ pub async fn create_worker_selection_pipeline_chat(
...
@@ -1335,6 +1335,7 @@ pub async fn create_worker_selection_pipeline_chat(
>
,
>
,
Option
<
Arc
<
dynamo_llm
::
kv_router
::
KvRouter
>>
,
Option
<
Arc
<
dynamo_llm
::
kv_router
::
KvRouter
>>
,
)
>
{
)
>
{
use
dynamo_llm
::
discovery
::
WORKER_TYPE_DECODE
;
use
dynamo_llm
::
kv_router
::
PrefillRouter
;
use
dynamo_llm
::
kv_router
::
PrefillRouter
;
// Use the global DRT singleton - initialize if not already done
// Use the global DRT singleton - initialize if not already done
...
@@ -1401,7 +1402,12 @@ pub async fn create_worker_selection_pipeline_chat(
...
@@ -1401,7 +1402,12 @@ pub async fn create_worker_selection_pipeline_chat(
let
chooser
=
if
router_mode
==
RouterMode
::
KV
{
let
chooser
=
if
router_mode
==
RouterMode
::
KV
{
Some
(
Some
(
model_manager
model_manager
.kv_chooser_for
(
&
endpoint
,
card
.kv_cache_block_size
,
kv_router_config
)
.kv_chooser_for
(
&
endpoint
,
card
.kv_cache_block_size
,
kv_router_config
,
WORKER_TYPE_DECODE
,
)
.await
?
,
.await
?
,
)
)
}
else
{
}
else
{
...
@@ -1425,6 +1431,7 @@ pub async fn create_worker_selection_pipeline_chat(
...
@@ -1425,6 +1431,7 @@ pub async fn create_worker_selection_pipeline_chat(
card
.kv_cache_block_size
,
card
.kv_cache_block_size
,
Some
(
prefill_config
),
Some
(
prefill_config
),
enforce_disagg
,
enforce_disagg
,
model_name
.to_string
(),
)
)
});
});
...
...
lib/bindings/python/rust/llm/kv.rs
View file @
0c0336e6
...
@@ -991,7 +991,11 @@ impl KvRecorder {
...
@@ -991,7 +991,11 @@ impl KvRecorder {
}
}
/// Helper function to create a KV router from an endpoint using the ModelManager
/// Helper function to create a KV router from an endpoint using the ModelManager
/// to ensure proper etcd registration
/// to ensure proper etcd registration.
/// Infers worker type using endpoint naming and router config:
/// - If endpoint name/component contains "prefill", treat as prefill
/// - If router_track_active_blocks is disabled, treat as prefill
/// - Otherwise, default to decode
async
fn
create_kv_router_from_endpoint
(
async
fn
create_kv_router_from_endpoint
(
endpoint
:
&
Endpoint
,
endpoint
:
&
Endpoint
,
block_size
:
usize
,
block_size
:
usize
,
...
@@ -999,8 +1003,28 @@ async fn create_kv_router_from_endpoint(
...
@@ -999,8 +1003,28 @@ async fn create_kv_router_from_endpoint(
)
->
Result
<
Arc
<
llm_rs
::
kv_router
::
KvRouter
>
,
PyErr
>
{
)
->
Result
<
Arc
<
llm_rs
::
kv_router
::
KvRouter
>
,
PyErr
>
{
// Create ModelManager and use it to create KvRouter (ensures registration)
// Create ModelManager and use it to create KvRouter (ensures registration)
let
model_manager
=
Arc
::
new
(
llm_rs
::
discovery
::
ModelManager
::
new
());
let
model_manager
=
Arc
::
new
(
llm_rs
::
discovery
::
ModelManager
::
new
());
let
endpoint_id
=
endpoint
.inner
.id
();
let
namespace
=
endpoint_id
.namespace
.to_lowercase
();
let
component
=
endpoint_id
.component
.to_lowercase
();
let
name
=
endpoint_id
.name
.to_lowercase
();
let
endpoint_is_prefill
=
namespace
.contains
(
"prefill"
)
||
component
.contains
(
"prefill"
)
||
name
.contains
(
"prefill"
);
let
track_active_blocks
=
kv_router_config
.as_ref
()
.map
(|
cfg
|
cfg
.router_track_active_blocks
)
.unwrap_or
(
true
);
let
worker_type
=
if
endpoint_is_prefill
||
!
track_active_blocks
{
llm_rs
::
discovery
::
WORKER_TYPE_PREFILL
}
else
{
llm_rs
::
discovery
::
WORKER_TYPE_DECODE
};
let
kv_router
=
model_manager
let
kv_router
=
model_manager
.kv_chooser_for
(
&
endpoint
.inner
,
block_size
as
u32
,
kv_router_config
)
.kv_chooser_for
(
&
endpoint
.inner
,
block_size
as
u32
,
kv_router_config
,
worker_type
,
)
.await
.await
.map_err
(
to_pyerr
)
?
;
.map_err
(
to_pyerr
)
?
;
...
@@ -1096,7 +1120,17 @@ impl KvPushRouter {
...
@@ -1096,7 +1120,17 @@ impl KvPushRouter {
#[pymethods]
#[pymethods]
impl
KvPushRouter
{
impl
KvPushRouter
{
/// Create a new KvPushRouter for KV-aware routing to workers.
///
/// # Arguments
/// * `endpoint` - The endpoint to route requests to
/// * `block_size` - KV cache block size for routing decisions
/// * `kv_router_config` - Configuration for the KV router
///
/// Note: Worker type for Prometheus metrics is inferred from the endpoint name/component
/// (contains "prefill") or by `router_track_active_blocks` being disabled.
#[new]
#[new]
#[pyo3(signature
=
(endpoint,
block_size,
kv_router_config))]
fn
new
(
fn
new
(
endpoint
:
&
Endpoint
,
endpoint
:
&
Endpoint
,
block_size
:
usize
,
block_size
:
usize
,
...
...
lib/bindings/python/src/dynamo/prometheus_names.py
View file @
0c0336e6
...
@@ -80,6 +80,22 @@ class frontend_service:
...
@@ -80,6 +80,22 @@ class frontend_service:
MODEL_MIGRATION_LIMIT
=
"model_migration_limit"
MODEL_MIGRATION_LIMIT
=
"model_migration_limit"
# Total number of request migrations due to worker unavailability
# Total number of request migrations due to worker unavailability
MODEL_MIGRATION_TOTAL
=
"model_migration_total"
MODEL_MIGRATION_TOTAL
=
"model_migration_total"
# Active decode blocks (KV cache blocks) per worker
# Gauge metric tracking current KV cache block utilization for each worker
WORKER_ACTIVE_DECODE_BLOCKS
=
"worker_active_decode_blocks"
# Active prefill tokens per worker
# Gauge metric tracking current queued prefill tokens for each worker
WORKER_ACTIVE_PREFILL_TOKENS
=
"worker_active_prefill_tokens"
# Last observed time to first token per worker (in seconds)
# Gauge metric tracking the most recent TTFT for each worker
WORKER_LAST_TIME_TO_FIRST_TOKEN_SECONDS
=
"worker_last_time_to_first_token_seconds"
# Last observed input sequence tokens per worker
# Gauge metric tracking the input token count from the same request as WORKER_LAST_TIME_TO_FIRST_TOKEN_SECONDS
# Updated atomically with TTFT to correlate latency with input size
WORKER_LAST_INPUT_SEQUENCE_TOKENS
=
"worker_last_input_sequence_tokens"
# Last observed inter-token latency per worker (in seconds)
# Gauge metric tracking the most recent ITL for each worker
WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS
=
"worker_last_inter_token_latency_seconds"
# Label name for the type of migration
# Label name for the type of migration
MIGRATION_TYPE_LABEL
=
"migration_type"
MIGRATION_TYPE_LABEL
=
"migration_type"
...
...
lib/llm/src/discovery.rs
View file @
0c0336e6
...
@@ -11,4 +11,8 @@ mod watcher;
...
@@ -11,4 +11,8 @@ mod watcher;
pub
use
watcher
::{
ModelUpdate
,
ModelWatcher
};
pub
use
watcher
::{
ModelUpdate
,
ModelWatcher
};
mod
worker_monitor
;
mod
worker_monitor
;
pub
use
worker_monitor
::{
KvWorkerMonitor
,
LoadThresholdConfig
,
WorkerLoadState
};
pub
use
worker_monitor
::{
KvWorkerMonitor
,
LoadThresholdConfig
,
WORKER_ACTIVE_DECODE_BLOCKS_GAUGE
,
WORKER_ACTIVE_PREFILL_TOKENS_GAUGE
,
WORKER_TYPE_DECODE
,
WORKER_TYPE_PREFILL
,
WorkerLoadState
,
register_worker_load_metrics
,
};
lib/llm/src/discovery/model_manager.rs
View file @
0c0336e6
...
@@ -354,6 +354,7 @@ impl ModelManager {
...
@@ -354,6 +354,7 @@ impl ModelManager {
endpoint
:
&
Endpoint
,
endpoint
:
&
Endpoint
,
kv_cache_block_size
:
u32
,
kv_cache_block_size
:
u32
,
kv_router_config
:
Option
<
KvRouterConfig
>
,
kv_router_config
:
Option
<
KvRouterConfig
>
,
worker_type
:
&
'static
str
,
)
->
anyhow
::
Result
<
Arc
<
KvRouter
>>
{
)
->
anyhow
::
Result
<
Arc
<
KvRouter
>>
{
let
endpoint_id
=
endpoint
.id
();
let
endpoint_id
=
endpoint
.id
();
...
@@ -403,6 +404,7 @@ impl ModelManager {
...
@@ -403,6 +404,7 @@ impl ModelManager {
Some
(
selector
),
Some
(
selector
),
kv_router_config
,
kv_router_config
,
instance_id
,
instance_id
,
worker_type
,
)
)
.await
?
;
.await
?
;
let
new_kv_chooser
=
Arc
::
new
(
chooser
);
let
new_kv_chooser
=
Arc
::
new
(
chooser
);
...
@@ -538,6 +540,11 @@ impl ModelManager {
...
@@ -538,6 +540,11 @@ impl ModelManager {
Some
(
monitor
.load_threshold_config
())
Some
(
monitor
.load_threshold_config
())
}
}
/// Gets an existing worker monitor for a model, if one exists.
pub
fn
get_worker_monitor
(
&
self
,
model
:
&
str
)
->
Option
<
KvWorkerMonitor
>
{
self
.worker_monitors
.get
(
model
)
.map
(|
m
|
m
.clone
())
}
/// Gets or creates a worker monitor for a model. Updates thresholds if monitor exists.
/// Gets or creates a worker monitor for a model. Updates thresholds if monitor exists.
pub
fn
get_or_create_worker_monitor
(
pub
fn
get_or_create_worker_monitor
(
&
self
,
&
self
,
...
...
lib/llm/src/discovery/watcher.rs
View file @
0c0336e6
...
@@ -24,6 +24,7 @@ use dynamo_runtime::{
...
@@ -24,6 +24,7 @@ use dynamo_runtime::{
use
crate
::{
use
crate
::{
backend
::
Backend
,
backend
::
Backend
,
discovery
::
WORKER_TYPE_DECODE
,
entrypoint
::{
self
,
EngineFactoryCallback
,
RouterConfig
},
entrypoint
::{
self
,
EngineFactoryCallback
,
RouterConfig
},
http
::
service
::
metrics
::
Metrics
,
http
::
service
::
metrics
::
Metrics
,
kv_router
::
PrefillRouter
,
kv_router
::
PrefillRouter
,
...
@@ -429,6 +430,7 @@ impl ModelWatcher {
...
@@ -429,6 +430,7 @@ impl ModelWatcher {
&
endpoint
,
&
endpoint
,
card
.kv_cache_block_size
,
card
.kv_cache_block_size
,
Some
(
self
.router_config.kv_router_config
),
Some
(
self
.router_config.kv_router_config
),
WORKER_TYPE_DECODE
,
// This is the decode router
)
)
.await
?
,
.await
?
,
)
)
...
@@ -441,9 +443,10 @@ impl ModelWatcher {
...
@@ -441,9 +443,10 @@ impl ModelWatcher {
// Create prefill chooser once if we're building pipelines
// Create prefill chooser once if we're building pipelines
// Both chat and completions will share the same prefill chooser instance
// Both chat and completions will share the same prefill chooser instance
let
model_name
=
card
.name
()
.to_string
();
let
prefill_chooser
=
self
let
prefill_chooser
=
self
.manager
.manager
.register_prefill_router
(
card
.name
()
.to_string
())
.register_prefill_router
(
model_name
.clone
())
.map
(|
rx
|
{
.map
(|
rx
|
{
// Create prefill-specific config with track_active_blocks disabled
// Create prefill-specific config with track_active_blocks disabled
let
mut
prefill_config
=
self
.router_config.kv_router_config
;
let
mut
prefill_config
=
self
.router_config.kv_router_config
;
...
@@ -456,21 +459,19 @@ impl ModelWatcher {
...
@@ -456,21 +459,19 @@ impl ModelWatcher {
card
.kv_cache_block_size
,
card
.kv_cache_block_size
,
Some
(
prefill_config
),
Some
(
prefill_config
),
self
.router_config.enforce_disagg
,
self
.router_config.enforce_disagg
,
model_name
.clone
(),
// Pass model name for worker monitor lookup
)
)
});
});
// Get or create the worker monitor for this model
// Get or create the worker monitor for this model.
// This allows dynamic threshold updates via the ModelManager
// Always create the monitor for Prometheus metrics (active_decode_blocks, active_prefill_tokens,
// Create monitor if any threshold is configured
// worker TTFT/ITL cleanup). The thresholds control busy detection behavior only.
let
worker_monitor
=
if
self
.router_config.load_threshold_config
.is_configured
()
{
// LoadThresholdConfig allows dynamic threshold updates via the ModelManager.
Some
(
self
.manager
.get_or_create_worker_monitor
(
let
worker_monitor
=
Some
(
self
.manager
.get_or_create_worker_monitor
(
card
.name
(),
card
.name
(),
client
.clone
(),
client
.clone
(),
self
.router_config.load_threshold_config
.clone
(),
self
.router_config.load_threshold_config
.clone
(),
))
));
}
else
{
None
};
// Add chat engine only if the model supports chat
// Add chat engine only if the model supports chat
if
card
.model_type
.supports_chat
()
{
if
card
.model_type
.supports_chat
()
{
...
...
lib/llm/src/discovery/worker_monitor.rs
View file @
0c0336e6
...
@@ -4,19 +4,93 @@
...
@@ -4,19 +4,93 @@
use
std
::
collections
::
HashMap
;
use
std
::
collections
::
HashMap
;
use
std
::
sync
::
Arc
;
use
std
::
sync
::
Arc
;
use
std
::
sync
::
atomic
::{
AtomicBool
,
AtomicU32
,
AtomicU64
,
Ordering
};
use
std
::
sync
::
atomic
::{
AtomicBool
,
AtomicU32
,
AtomicU64
,
Ordering
};
use
std
::
sync
::{
LazyLock
,
RwLock
};
use
tokio
::
sync
::
Notify
;
use
dashmap
::
DashMap
;
use
dashmap
::
DashMap
;
use
prometheus
::{
IntGaugeVec
,
Opts
,
Registry
};
use
serde
::{
Deserialize
,
Serialize
};
use
serde
::{
Deserialize
,
Serialize
};
use
crate
::
http
::
service
::
metrics
::{
WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE
,
WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE
,
WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE
,
};
use
crate
::
kv_router
::
KV_METRICS_SUBJECT
;
use
crate
::
kv_router
::
KV_METRICS_SUBJECT
;
use
crate
::
kv_router
::
protocols
::
ActiveLoad
;
use
crate
::
kv_router
::
protocols
::
ActiveLoad
;
use
crate
::
model_card
::
ModelDeploymentCard
;
use
crate
::
model_card
::
ModelDeploymentCard
;
use
dynamo_runtime
::
component
::
Client
;
use
dynamo_runtime
::
component
::
Client
;
use
dynamo_runtime
::
discovery
::{
DiscoveryQuery
,
watch_and_extract_field
};
use
dynamo_runtime
::
discovery
::{
DiscoveryQuery
,
watch_and_extract_field
};
use
dynamo_runtime
::
metrics
::
prometheus_names
::
frontend_service
;
use
dynamo_runtime
::
pipeline
::{
WorkerLoadMonitor
,
async_trait
};
use
dynamo_runtime
::
pipeline
::{
WorkerLoadMonitor
,
async_trait
};
use
dynamo_runtime
::
traits
::
DistributedRuntimeProvider
;
use
dynamo_runtime
::
traits
::
DistributedRuntimeProvider
;
use
dynamo_runtime
::
transports
::
event_plane
::
EventSubscriber
;
use
dynamo_runtime
::
transports
::
event_plane
::
EventSubscriber
;
// Re-export worker type constants from timing.rs (single source of truth)
pub
use
crate
::
protocols
::
common
::
timing
::{
WORKER_TYPE_DECODE
,
WORKER_TYPE_PREFILL
};
/// Global Prometheus gauge for active decode blocks per worker (labels: worker_id, dp_rank, worker_type)
/// This is shared across all KvWorkerMonitor instances.
pub
static
WORKER_ACTIVE_DECODE_BLOCKS_GAUGE
:
LazyLock
<
IntGaugeVec
>
=
LazyLock
::
new
(||
{
IntGaugeVec
::
new
(
Opts
::
new
(
format!
(
"dynamo_frontend_{}"
,
frontend_service
::
WORKER_ACTIVE_DECODE_BLOCKS
),
"Active KV cache decode blocks per worker"
,
),
&
[
"worker_id"
,
"dp_rank"
,
"worker_type"
],
)
.expect
(
"Failed to create worker_active_decode_blocks gauge"
)
});
/// Global Prometheus gauge for active prefill tokens per worker (labels: worker_id, dp_rank, worker_type)
/// This is shared across all KvWorkerMonitor instances.
pub
static
WORKER_ACTIVE_PREFILL_TOKENS_GAUGE
:
LazyLock
<
IntGaugeVec
>
=
LazyLock
::
new
(||
{
IntGaugeVec
::
new
(
Opts
::
new
(
format!
(
"dynamo_frontend_{}"
,
frontend_service
::
WORKER_ACTIVE_PREFILL_TOKENS
),
"Active prefill tokens queued per worker"
,
),
&
[
"worker_id"
,
"dp_rank"
,
"worker_type"
],
)
.expect
(
"Failed to create worker_active_prefill_tokens gauge"
)
});
/// Register the global worker load Prometheus metrics with the given registry.
///
/// This should be called once during HTTP service setup to expose the worker load
/// metrics via the `/metrics` endpoint.
///
/// # Errors
/// Returns an error if the metrics are already registered with the registry.
pub
fn
register_worker_load_metrics
(
registry
:
&
Registry
)
->
Result
<
(),
prometheus
::
Error
>
{
registry
.register
(
Box
::
new
(
WORKER_ACTIVE_DECODE_BLOCKS_GAUGE
.clone
()))
?
;
registry
.register
(
Box
::
new
(
WORKER_ACTIVE_PREFILL_TOKENS_GAUGE
.clone
()))
?
;
Ok
(())
}
/// Clean up all Prometheus metrics for a worker across the specified dp_ranks.
///
/// This removes metrics with the given worker_id, dp_rank, and worker_type label combination.
/// Called when workers are removed to prevent stale metrics from accumulating.
fn
cleanup_worker_metrics
(
worker_id
:
u64
,
dp_ranks
:
&
[
u32
],
worker_type
:
&
str
)
{
let
worker_id_str
=
worker_id
.to_string
();
for
dp_rank
in
dp_ranks
{
let
dp_rank_str
=
dp_rank
.to_string
();
let
labels
=
&
[
worker_id_str
.as_str
(),
dp_rank_str
.as_str
(),
worker_type
];
let
_
=
WORKER_ACTIVE_DECODE_BLOCKS_GAUGE
.remove_label_values
(
labels
);
let
_
=
WORKER_ACTIVE_PREFILL_TOKENS_GAUGE
.remove_label_values
(
labels
);
let
_
=
WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE
.remove_label_values
(
labels
);
let
_
=
WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE
.remove_label_values
(
labels
);
let
_
=
WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE
.remove_label_values
(
labels
);
}
}
/// Scale factor for storing f64 thresholds as u32 (10000 = 4 decimal places)
/// Scale factor for storing f64 thresholds as u32 (10000 = 4 decimal places)
const
THRESHOLD_SCALE
:
u32
=
10000
;
const
THRESHOLD_SCALE
:
u32
=
10000
;
...
@@ -140,9 +214,21 @@ impl WorkerLoadState {
...
@@ -140,9 +214,21 @@ impl WorkerLoadState {
///
///
/// Cloning shares state via internal Arc-wrapped fields. This allows multiple pipelines
/// Cloning shares state via internal Arc-wrapped fields. This allows multiple pipelines
/// (e.g., chat and completions) to share the same monitor instance.
/// (e.g., chat and completions) to share the same monitor instance.
///
/// Prometheus metrics are exposed via the global gauges [`WORKER_ACTIVE_DECODE_BLOCKS_GAUGE`]
/// and [`WORKER_ACTIVE_PREFILL_TOKENS_GAUGE`], which should be registered with the HTTP
/// service's Prometheus registry using [`register_worker_load_metrics`].
///
/// In disaggregated mode, use `set_prefill_client` to register the prefill endpoint for
/// proper TTFT metric cleanup when prefill workers are removed.
#[derive(Clone)]
#[derive(Clone)]
pub
struct
KvWorkerMonitor
{
pub
struct
KvWorkerMonitor
{
/// Decode endpoint client (used for ITL cleanup and busy detection)
client
:
Client
,
client
:
Client
,
/// Optional prefill endpoint client (used for TTFT cleanup in disaggregated mode)
prefill_client
:
Arc
<
RwLock
<
Option
<
Client
>>>
,
/// Notifies the monitoring task when a prefill client is registered
prefill_client_notify
:
Arc
<
Notify
>
,
worker_load_states
:
Arc
<
DashMap
<
u64
,
WorkerLoadState
>>
,
worker_load_states
:
Arc
<
DashMap
<
u64
,
WorkerLoadState
>>
,
/// Active decode blocks threshold stored as parts-per-10000 (e.g., 8500 = 0.85)
/// Active decode blocks threshold stored as parts-per-10000 (e.g., 8500 = 0.85)
active_decode_blocks_threshold
:
Arc
<
AtomicU32
>
,
active_decode_blocks_threshold
:
Arc
<
AtomicU32
>
,
...
@@ -164,6 +250,12 @@ impl KvWorkerMonitor {
...
@@ -164,6 +250,12 @@ impl KvWorkerMonitor {
/// - `active_decode_blocks_threshold`: 1.0 (effectively disabled)
/// - `active_decode_blocks_threshold`: 1.0 (effectively disabled)
/// - `active_prefill_tokens_threshold`: DEFAULT_MAX_TOKENS (effectively disabled)
/// - `active_prefill_tokens_threshold`: DEFAULT_MAX_TOKENS (effectively disabled)
/// - `active_prefill_tokens_threshold_frac`: 1.5 (effectively disabled)
/// - `active_prefill_tokens_threshold_frac`: 1.5 (effectively disabled)
///
/// Prometheus metrics are exposed via the global gauges and should be registered
/// using [`register_worker_load_metrics`] during HTTP service setup.
///
/// For disaggregated mode, call `set_prefill_client` after creation to enable
/// proper TTFT metric cleanup when prefill workers are removed.
pub
fn
new
(
client
:
Client
,
config
:
LoadThresholdConfig
)
->
Self
{
pub
fn
new
(
client
:
Client
,
config
:
LoadThresholdConfig
)
->
Self
{
let
active_decode_blocks
=
config
.active_decode_blocks_threshold
.unwrap_or
(
1.0
);
let
active_decode_blocks
=
config
.active_decode_blocks_threshold
.unwrap_or
(
1.0
);
let
active_prefill_tokens
=
config
let
active_prefill_tokens
=
config
...
@@ -173,6 +265,8 @@ impl KvWorkerMonitor {
...
@@ -173,6 +265,8 @@ impl KvWorkerMonitor {
Self
{
Self
{
client
,
client
,
prefill_client
:
Arc
::
new
(
RwLock
::
new
(
None
)),
prefill_client_notify
:
Arc
::
new
(
Notify
::
new
()),
worker_load_states
:
Arc
::
new
(
DashMap
::
new
()),
worker_load_states
:
Arc
::
new
(
DashMap
::
new
()),
active_decode_blocks_threshold
:
Arc
::
new
(
AtomicU32
::
new
(
Self
::
f64_to_scaled
(
active_decode_blocks_threshold
:
Arc
::
new
(
AtomicU32
::
new
(
Self
::
f64_to_scaled
(
active_decode_blocks
,
active_decode_blocks
,
...
@@ -185,6 +279,22 @@ impl KvWorkerMonitor {
...
@@ -185,6 +279,22 @@ impl KvWorkerMonitor {
}
}
}
}
/// Set the prefill client for disaggregated mode.
///
/// This enables monitoring of prefill endpoint instances for TTFT metric cleanup.
/// In disaggregated mode, TTFT metrics are attributed to prefill workers, so we need
/// to watch the prefill endpoint to clean up TTFT gauges when prefill workers disappear.
///
/// This method can be called after `start_monitoring` - the monitoring loop will
/// be immediately notified and start watching the prefill endpoint.
pub
fn
set_prefill_client
(
&
self
,
prefill_client
:
Client
)
{
let
mut
guard
=
self
.prefill_client
.write
()
.unwrap
();
*
guard
=
Some
(
prefill_client
);
// Notify the monitoring task that prefill client is now available
self
.prefill_client_notify
.notify_one
();
tracing
::
debug!
(
"KvWorkerMonitor: prefill client registered for TTFT cleanup"
);
}
/// Convert a f64 threshold to scaled u32 for atomic storage.
/// Convert a f64 threshold to scaled u32 for atomic storage.
#[inline]
#[inline]
fn
f64_to_scaled
(
threshold
:
f64
)
->
u32
{
fn
f64_to_scaled
(
threshold
:
f64
)
->
u32
{
...
@@ -277,22 +387,48 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
...
@@ -277,22 +387,48 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
// Watch for runtime config updates from model deployment cards via discovery interface
// Watch for runtime config updates from model deployment cards via discovery interface
let
discovery
=
component
.drt
()
.discovery
();
let
discovery
=
component
.drt
()
.discovery
();
let
discovery_stream
=
discovery
let
discovery_stream
=
match
discovery
.list_and_watch
(
DiscoveryQuery
::
AllModels
,
Some
(
cancellation_token
.clone
()))
.list_and_watch
(
DiscoveryQuery
::
AllModels
,
Some
(
cancellation_token
.clone
()))
.await
?
;
.await
{
Ok
(
stream
)
=>
stream
,
Err
(
e
)
=>
{
tracing
::
error!
(
"KvWorkerMonitor: failed to create discovery stream: {}"
,
e
);
// Reset started flag so retry can work
self
.started
.store
(
false
,
Ordering
::
SeqCst
);
return
Err
(
e
);
}
};
let
mut
config_events_rx
=
let
mut
config_events_rx
=
watch_and_extract_field
(
discovery_stream
,
|
card
:
ModelDeploymentCard
|
{
watch_and_extract_field
(
discovery_stream
,
|
card
:
ModelDeploymentCard
|
{
card
.runtime_config
card
.runtime_config
});
});
// Subscribe to KV metrics events using EventSubscriber (Msgpack payloads)
// Subscribe to KV metrics events using EventSubscriber (Msgpack payloads)
let
mut
kv_metrics_rx
=
// This is optional - if NATS isn't available, we skip KV metrics but still do TTFT/ITL cleanup
EventSubscriber
::
for_namespace
(
component
.namespace
(),
KV_METRICS_SUBJECT
)
let
kv_metrics_rx
=
match
EventSubscriber
::
for_namespace
(
.await
?
component
.namespace
(),
.typed
::
<
ActiveLoad
>
();
KV_METRICS_SUBJECT
,
)
.await
{
Ok
(
sub
)
=>
Some
(
sub
.typed
::
<
ActiveLoad
>
()),
Err
(
e
)
=>
{
tracing
::
warn!
(
"KvWorkerMonitor: KV metrics subscriber not available ({}), skipping load metrics."
,
e
);
None
}
};
// Watch decode endpoint instances for cleanup (ITL metrics)
let
mut
decode_instances_rx
=
self
.client
.instance_avail_watcher
();
let
worker_load_states
=
self
.worker_load_states
.clone
();
let
worker_load_states
=
self
.worker_load_states
.clone
();
let
client
=
self
.client
.clone
();
let
client
=
self
.client
.clone
();
let
prefill_client_holder
=
self
.prefill_client
.clone
();
let
prefill_client_notify
=
self
.prefill_client_notify
.clone
();
let
active_decode_blocks_threshold
=
self
.active_decode_blocks_threshold
.clone
();
let
active_decode_blocks_threshold
=
self
.active_decode_blocks_threshold
.clone
();
let
active_prefill_tokens_threshold
=
self
.active_prefill_tokens_threshold
.clone
();
let
active_prefill_tokens_threshold
=
self
.active_prefill_tokens_threshold
.clone
();
let
active_prefill_tokens_threshold_frac
=
let
active_prefill_tokens_threshold_frac
=
...
@@ -300,9 +436,32 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
...
@@ -300,9 +436,32 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
// Spawn background monitoring task
// Spawn background monitoring task
tokio
::
spawn
(
async
move
{
tokio
::
spawn
(
async
move
{
let
mut
kv_metrics_rx
=
kv_metrics_rx
;
// Move into async block
let
mut
previous_busy_instances
=
Vec
::
new
();
// Track previous state
let
mut
previous_busy_instances
=
Vec
::
new
();
// Track previous state
// Track decode worker IDs (for ITL cleanup)
let
mut
known_decode_workers
:
std
::
collections
::
HashSet
<
u64
>
=
decode_instances_rx
.borrow
()
.iter
()
.copied
()
.collect
();
// Track prefill worker IDs (for TTFT cleanup in disaggregated mode)
let
mut
known_prefill_workers
:
std
::
collections
::
HashSet
<
u64
>
=
std
::
collections
::
HashSet
::
new
();
let
mut
prefill_instances_rx
:
Option
<
tokio
::
sync
::
watch
::
Receiver
<
Vec
<
u64
>>>
=
None
;
let
mut
known_worker_dp_ranks
:
HashMap
<
u64
,
std
::
collections
::
HashSet
<
u32
>>
=
HashMap
::
new
();
loop
{
loop
{
// Create a future that either reads from kv_metrics or pends forever if unavailable
let
kv_event_future
=
async
{
if
let
Some
(
ref
mut
rx
)
=
kv_metrics_rx
{
rx
.next
()
.await
}
else
{
// If no subscriber, pend forever (this branch is effectively disabled)
std
::
future
::
pending
()
.await
}
};
tokio
::
select!
{
tokio
::
select!
{
_
=
cancellation_token
.cancelled
()
=>
{
_
=
cancellation_token
.cancelled
()
=>
{
tracing
::
debug!
(
"Worker monitoring cancelled"
);
tracing
::
debug!
(
"Worker monitoring cancelled"
);
...
@@ -313,12 +472,40 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
...
@@ -313,12 +472,40 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
_
=
config_events_rx
.changed
()
=>
{
_
=
config_events_rx
.changed
()
=>
{
let
runtime_configs
=
config_events_rx
.borrow
()
.clone
();
let
runtime_configs
=
config_events_rx
.borrow
()
.clone
();
// Find workers that are being removed (not in runtime_configs anymore)
let
removed_workers
:
Vec
<
u64
>
=
known_worker_dp_ranks
.keys
()
.filter
(|
id
|
!
runtime_configs
.contains_key
(
id
))
.copied
()
.collect
();
// Clean up Prometheus metrics for removed workers
for
worker_id
in
&
removed_workers
{
if
let
Some
(
dp_ranks
)
=
known_worker_dp_ranks
.remove
(
worker_id
)
{
let
dp_ranks_vec
:
Vec
<
u32
>
=
dp_ranks
.into_iter
()
.collect
();
// Clean up metrics for both worker types since we don't know which type this worker was
cleanup_worker_metrics
(
*
worker_id
,
&
dp_ranks_vec
,
WORKER_TYPE_DECODE
);
cleanup_worker_metrics
(
*
worker_id
,
&
dp_ranks_vec
,
WORKER_TYPE_PREFILL
);
tracing
::
debug!
(
"Removed Prometheus metrics for worker {}"
,
worker_id
);
}
}
worker_load_states
.retain
(|
lease_id
,
_
|
runtime_configs
.contains_key
(
lease_id
));
worker_load_states
.retain
(|
lease_id
,
_
|
runtime_configs
.contains_key
(
lease_id
));
// Update worker load states with runtime config values for all dp_ranks
// Update worker load states with runtime config values for all dp_ranks
// This ensures we track workers from MDCs even if they don't publish ActiveLoad
for
(
lease_id
,
runtime_config
)
in
runtime_configs
.iter
()
{
for
(
lease_id
,
runtime_config
)
in
runtime_configs
.iter
()
{
let
mut
state
=
worker_load_states
.entry
(
*
lease_id
)
.or_default
();
let
mut
state
=
worker_load_states
.entry
(
*
lease_id
)
.or_default
();
// Track dp_ranks for this worker (for cleanup when worker disappears)
let
dp_ranks_set
=
known_worker_dp_ranks
.entry
(
*
lease_id
)
.or_default
();
for
dp_rank
in
0
..
runtime_config
.data_parallel_size
{
dp_ranks_set
.insert
(
dp_rank
);
}
// Populate total_blocks for all dp_ranks (they share the same total)
// Populate total_blocks for all dp_ranks (they share the same total)
if
let
Some
(
total_blocks
)
=
runtime_config
.total_kv_blocks
{
if
let
Some
(
total_blocks
)
=
runtime_config
.total_kv_blocks
{
for
dp_rank
in
0
..
runtime_config
.data_parallel_size
{
for
dp_rank
in
0
..
runtime_config
.data_parallel_size
{
...
@@ -335,8 +522,10 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
...
@@ -335,8 +522,10 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
}
}
}
}
// Handle KV metrics updates (ActiveLoad)
// Handle KV metrics updates (ActiveLoad) - only if subscriber is available
kv_event
=
kv_metrics_rx
.next
()
=>
{
// Note: Prometheus gauges are updated directly by sequence.rs (router's own bookkeeping)
// This branch only updates WorkerLoadState for busy detection thresholds
kv_event
=
kv_event_future
=>
{
let
Some
(
event_result
)
=
kv_event
else
{
let
Some
(
event_result
)
=
kv_event
else
{
tracing
::
debug!
(
"KV metrics stream closed"
);
tracing
::
debug!
(
"KV metrics stream closed"
);
break
;
break
;
...
@@ -350,7 +539,14 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
...
@@ -350,7 +539,14 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
let
worker_id
=
active_load
.worker_id
;
let
worker_id
=
active_load
.worker_id
;
let
dp_rank
=
active_load
.dp_rank
;
let
dp_rank
=
active_load
.dp_rank
;
// Update worker load state per dp_rank
// Track known worker/dp_rank combinations for cleanup
known_worker_dp_ranks
.entry
(
worker_id
)
.or_default
()
.insert
(
dp_rank
);
// Update worker load state per dp_rank (for busy detection only)
// Note: Prometheus gauges are updated directly by sequence.rs
{
{
let
mut
state
=
worker_load_states
.entry
(
worker_id
)
.or_default
();
let
mut
state
=
worker_load_states
.entry
(
worker_id
)
.or_default
();
if
let
Some
(
active_blocks
)
=
active_load
.active_decode_blocks
{
if
let
Some
(
active_blocks
)
=
active_load
.active_decode_blocks
{
...
@@ -391,6 +587,99 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
...
@@ -391,6 +587,99 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
previous_busy_instances
=
busy_instances
;
previous_busy_instances
=
busy_instances
;
}
}
}
}
// Handle decode endpoint instance changes (for ITL and decode metrics cleanup)
_
=
decode_instances_rx
.changed
()
=>
{
let
current_instances
:
std
::
collections
::
HashSet
<
u64
>
=
decode_instances_rx
.borrow
()
.iter
()
.copied
()
.collect
();
// Find decode workers that disappeared
let
removed_workers
:
Vec
<
u64
>
=
known_decode_workers
.difference
(
&
current_instances
)
.copied
()
.collect
();
if
!
removed_workers
.is_empty
()
{
// Clean up metrics for removed decode workers (with worker_type=decode label)
for
worker_id
in
&
removed_workers
{
// Get dp_ranks from known_worker_dp_ranks if available, otherwise use [0]
let
dp_ranks
:
Vec
<
u32
>
=
known_worker_dp_ranks
.get
(
worker_id
)
.map
(|
ranks
|
ranks
.iter
()
.copied
()
.collect
())
.unwrap_or_else
(||
vec!
[
0
]);
cleanup_worker_metrics
(
*
worker_id
,
&
dp_ranks
,
WORKER_TYPE_DECODE
);
tracing
::
debug!
(
"Cleaned up metrics for removed decode worker {}"
,
worker_id
);
}
}
known_decode_workers
=
current_instances
;
}
// Handle prefill endpoint instance changes (for TTFT and prefill metrics cleanup in disaggregated mode)
result
=
async
{
if
let
Some
(
ref
mut
rx
)
=
prefill_instances_rx
{
rx
.changed
()
.await
}
else
{
// No prefill watcher yet, pend forever
std
::
future
::
pending
()
.await
}
}
=>
{
// Handle channel closure (e.g., all prefill workers went down)
let
Ok
(())
=
result
else
{
// Prefill endpoint closed - stop watching to avoid busy loop
prefill_instances_rx
=
None
;
tracing
::
info!
(
"Prefill endpoint watcher closed, will re-activate when client is set"
);
continue
;
};
let
Some
(
ref
rx
)
=
prefill_instances_rx
else
{
continue
;
};
let
current_instances
:
std
::
collections
::
HashSet
<
u64
>
=
rx
.borrow
()
.iter
()
.copied
()
.collect
();
// Find prefill workers that disappeared
let
removed_workers
:
Vec
<
u64
>
=
known_prefill_workers
.difference
(
&
current_instances
)
.copied
()
.collect
();
if
!
removed_workers
.is_empty
()
{
// Clean up metrics for removed prefill workers (with worker_type=prefill label)
for
worker_id
in
&
removed_workers
{
// Get dp_ranks from known_worker_dp_ranks if available, otherwise use [0]
let
dp_ranks
:
Vec
<
u32
>
=
known_worker_dp_ranks
.get
(
worker_id
)
.map
(|
ranks
|
ranks
.iter
()
.copied
()
.collect
())
.unwrap_or_else
(||
vec!
[
0
]);
cleanup_worker_metrics
(
*
worker_id
,
&
dp_ranks
,
WORKER_TYPE_PREFILL
);
tracing
::
debug!
(
"Cleaned up metrics for removed prefill worker {}"
,
worker_id
);
}
}
known_prefill_workers
=
current_instances
;
}
// Wait for prefill client to be registered (push-based notification)
_
=
prefill_client_notify
.notified
(),
if
prefill_instances_rx
.is_none
()
=>
{
let
guard
=
prefill_client_holder
.read
()
.unwrap
();
if
let
Some
(
ref
prefill_client
)
=
*
guard
{
let
rx
=
prefill_client
.instance_avail_watcher
();
known_prefill_workers
=
rx
.borrow
()
.iter
()
.copied
()
.collect
();
prefill_instances_rx
=
Some
(
rx
);
tracing
::
info!
(
"KvWorkerMonitor: prefill endpoint watcher activated, tracking {} workers"
,
known_prefill_workers
.len
()
);
}
}
}
}
}
}
...
...
lib/llm/src/entrypoint/input/common.rs
View file @
0c0336e6
...
@@ -271,6 +271,9 @@ where
...
@@ -271,6 +271,9 @@ where
let
service_backend
=
match
router_mode
{
let
service_backend
=
match
router_mode
{
RouterMode
::
Random
|
RouterMode
::
RoundRobin
|
RouterMode
::
Direct
(
_
)
=>
{
RouterMode
::
Random
|
RouterMode
::
RoundRobin
|
RouterMode
::
Direct
(
_
)
=>
{
// Non-KV routing: use PushRouter directly.
// Note: Per-worker metrics (active_prefill_tokens, active_decode_blocks) are only
// available in KV routing mode where the router has actual bookkeeping.
ServiceBackend
::
from_engine
(
Arc
::
new
(
router
))
ServiceBackend
::
from_engine
(
Arc
::
new
(
router
))
}
}
RouterMode
::
KV
=>
{
RouterMode
::
KV
=>
{
...
...
lib/llm/src/http/service/metrics.rs
View file @
0c0336e6
...
@@ -14,10 +14,12 @@ use dynamo_runtime::{
...
@@ -14,10 +14,12 @@ use dynamo_runtime::{
frontend_service
,
name_prefix
,
sanitize_frontend_prometheus_prefix
,
frontend_service
,
name_prefix
,
sanitize_frontend_prometheus_prefix
,
},
},
};
};
use
prometheus
::{
Encoder
,
HistogramOpts
,
HistogramVec
,
IntCounterVec
,
IntGaugeVec
,
Opts
};
use
prometheus
::{
Encoder
,
GaugeVec
,
HistogramOpts
,
HistogramVec
,
IntCounterVec
,
IntGaugeVec
,
Opts
,
};
use
serde
::
Serialize
;
use
serde
::
Serialize
;
use
std
::{
use
std
::{
sync
::
Arc
,
sync
::
{
Arc
,
LazyLock
},
time
::{
Duration
,
Instant
},
time
::{
Duration
,
Instant
},
};
};
...
@@ -29,6 +31,72 @@ pub use prometheus::Registry;
...
@@ -29,6 +31,72 @@ pub use prometheus::Registry;
use
super
::
RouteDoc
;
use
super
::
RouteDoc
;
/// Worker type label values for Prometheus timing metrics
pub
use
crate
::
discovery
::{
WORKER_TYPE_DECODE
,
WORKER_TYPE_PREFILL
};
/// Process-wide gauge family holding the most recently observed time to first
/// token (TTFT) for each worker, expressed in seconds.
///
/// Label names, in order: `worker_id`, `dp_rank`, `worker_type`.
///
/// The family is built lazily on first access. Construction failure is treated
/// as a programming error and panics with the `expect` message below.
pub static WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE: LazyLock<GaugeVec> = LazyLock::new(|| {
    // Metric name = fixed frontend prefix + the shared name constant.
    let metric_name = format!(
        "dynamo_frontend_{}",
        frontend_service::WORKER_LAST_TIME_TO_FIRST_TOKEN_SECONDS
    );
    let opts = Opts::new(
        metric_name,
        "Last observed time to first token per worker (seconds)",
    );
    let label_names = ["worker_id", "dp_rank", "worker_type"];

    GaugeVec::new(opts, &label_names)
        .expect("Failed to create worker_last_time_to_first_token gauge")
});
/// Process-wide integer gauge family holding the most recently observed input
/// sequence length (in tokens) for each worker.
///
/// Label names, in order: `worker_id`, `dp_rank`, `worker_type`.
///
/// It is meant to be written right next to the per-worker TTFT gauge, using the
/// same label values, so that both series describe the same request.
///
/// The family is built lazily on first access. Construction failure is treated
/// as a programming error and panics with the `expect` message below.
pub static WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
    // Metric name = fixed frontend prefix + the shared name constant.
    let metric_name = format!(
        "dynamo_frontend_{}",
        frontend_service::WORKER_LAST_INPUT_SEQUENCE_TOKENS
    );
    let opts = Opts::new(
        metric_name,
        "Last observed input sequence tokens per worker",
    );
    let label_names = ["worker_id", "dp_rank", "worker_type"];

    IntGaugeVec::new(opts, &label_names)
        .expect("Failed to create worker_last_input_sequence_tokens gauge")
});
/// Process-wide gauge family holding the most recently observed inter-token
/// latency (ITL) for each worker, expressed in seconds.
///
/// Label names, in order: `worker_id`, `dp_rank`, `worker_type`.
///
/// The family is built lazily on first access. Construction failure is treated
/// as a programming error and panics with the `expect` message below.
pub static WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE: LazyLock<GaugeVec> = LazyLock::new(|| {
    // Metric name = fixed frontend prefix + the shared name constant.
    let metric_name = format!(
        "dynamo_frontend_{}",
        frontend_service::WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS
    );
    let opts = Opts::new(
        metric_name,
        "Last observed inter-token latency per worker (seconds)",
    );
    let label_names = ["worker_id", "dp_rank", "worker_type"];

    GaugeVec::new(opts, &label_names)
        .expect("Failed to create worker_last_inter_token_latency gauge")
});
/// Register the global per-worker TTFT/ITL/input-tokens Prometheus metrics with the given registry.
///
/// This should be called once during HTTP service setup to expose the metrics
/// via the `/metrics` endpoint.
///
/// # Errors
/// Returns an error if the metrics are already registered with the registry.
pub
fn
register_worker_timing_metrics
(
registry
:
&
Registry
)
->
Result
<
(),
prometheus
::
Error
>
{
registry
.register
(
Box
::
new
(
WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE
.clone
()))
?
;
registry
.register
(
Box
::
new
(
WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE
.clone
()))
?
;
registry
.register
(
Box
::
new
(
WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE
.clone
()))
?
;
Ok
(())
}
/// Generate log-spaced histogram buckets with values rounded to 2 significant figures.
/// Generate log-spaced histogram buckets with values rounded to 2 significant figures.
///
///
/// # Arguments
/// # Arguments
...
@@ -259,6 +327,16 @@ pub struct ResponseMetricCollector {
...
@@ -259,6 +327,16 @@ pub struct ResponseMetricCollector {
osl
:
usize
,
osl
:
usize
,
// we track if cached_tokens has been observed to ensure we only increment once per request
// we track if cached_tokens has been observed to ensure we only increment once per request
cached_tokens_observed
:
bool
,
cached_tokens_observed
:
bool
,
// Prefill worker info for TTFT attribution (set from LLMMetricAnnotation)
prefill_worker_id
:
Option
<
u64
>
,
prefill_dp_rank
:
Option
<
u32
>
,
// Prefill worker type for Prometheus labeling - stored at routing time to avoid MDC lookup
prefill_worker_type
:
Option
<
String
>
,
// Decode worker info for ITL attribution (set from LLMMetricAnnotation)
decode_worker_id
:
Option
<
u64
>
,
decode_dp_rank
:
Option
<
u32
>
,
// Decode worker type for Prometheus labeling - stored at routing time to avoid MDC lookup
decode_worker_type
:
Option
<
String
>
,
}
}
impl
Default
for
Metrics
{
impl
Default
for
Metrics
{
...
@@ -891,6 +969,44 @@ impl ResponseMetricCollector {
...
@@ -891,6 +969,44 @@ impl ResponseMetricCollector {
start_time
:
Instant
::
now
(),
start_time
:
Instant
::
now
(),
osl
:
0
,
osl
:
0
,
cached_tokens_observed
:
false
,
cached_tokens_observed
:
false
,
prefill_worker_id
:
None
,
prefill_dp_rank
:
None
,
prefill_worker_type
:
None
,
decode_worker_id
:
None
,
decode_dp_rank
:
None
,
decode_worker_type
:
None
,
}
}
/// Record which workers served this request so that per-worker TTFT/ITL gauges
/// can be labelled correctly.
///
/// In disaggregated mode TTFT is attributed to the prefill worker and ITL to
/// the decode worker. The worker types are carried along with the ids so that
/// no model-deployment-card lookup is needed when the gauges are updated.
///
/// Every field is "first write wins", independently of the others: a field
/// that already holds a value is left untouched, and a field that is still
/// `None` takes whatever the corresponding argument carries (possibly `None`
/// again). Calling this repeatedly is therefore safe.
pub fn set_worker_info(
    &mut self,
    prefill_worker_id: Option<u64>,
    prefill_dp_rank: Option<u32>,
    prefill_worker_type: Option<String>,
    decode_worker_id: Option<u64>,
    decode_dp_rank: Option<u32>,
    decode_worker_type: Option<String>,
) {
    // Prefill side: keep the stored value if present, otherwise adopt the argument.
    self.prefill_worker_id = self.prefill_worker_id.or(prefill_worker_id);
    self.prefill_dp_rank = self.prefill_dp_rank.or(prefill_dp_rank);
    // `take()` moves the stored String out so it can be put back without cloning.
    self.prefill_worker_type = self.prefill_worker_type.take().or(prefill_worker_type);

    // Decode side: same rule.
    self.decode_worker_id = self.decode_worker_id.or(decode_worker_id);
    self.decode_dp_rank = self.decode_dp_rank.or(decode_dp_rank);
    self.decode_worker_type = self.decode_worker_type.take().or(decode_worker_type);
}
}
}
...
@@ -941,6 +1057,28 @@ impl ResponseMetricCollector {
...
@@ -941,6 +1057,28 @@ impl ResponseMetricCollector {
.with_label_values
(
&
[
&
self
.model
])
.with_label_values
(
&
[
&
self
.model
])
.observe
(
ttft
);
.observe
(
ttft
);
// Update per-worker TTFT and input sequence tokens gauges - attributed to prefill worker.
// Both gauges are set back-to-back from the same request (two separate writes, not a single atomic update) so latency can be correlated with input size.
// Use stored worker_type (from routing time) to avoid MDC lookup.
// Falls back to WORKER_TYPE_PREFILL if not available.
if
let
Some
(
worker_id
)
=
self
.prefill_worker_id
{
let
worker_id_str
=
worker_id
.to_string
();
let
dp_rank_str
=
self
.prefill_dp_rank
.map_or
(
"0"
.to_string
(),
|
r
|
r
.to_string
());
let
worker_type
=
self
.prefill_worker_type
.as_deref
()
.unwrap_or
(
WORKER_TYPE_PREFILL
);
let
labels
=
&
[
worker_id_str
.as_str
(),
dp_rank_str
.as_str
(),
worker_type
];
WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE
.with_label_values
(
labels
)
.set
(
ttft
);
WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE
.with_label_values
(
labels
)
.set
(
isl
as
i64
);
}
// Publish ISL
// Publish ISL
// TODO: publish ISL as soon as the tokenization process completes
// TODO: publish ISL as soon as the tokenization process completes
self
.metrics
self
.metrics
...
@@ -960,6 +1098,23 @@ impl ResponseMetricCollector {
...
@@ -960,6 +1098,23 @@ impl ResponseMetricCollector {
.with_label_values
(
&
[
&
self
.model
])
.with_label_values
(
&
[
&
self
.model
])
.observe
(
itl
);
.observe
(
itl
);
}
}
// Update per-worker ITL gauge - attributed to decode worker.
// Use stored worker_type (from routing time) to avoid MDC lookup.
// Falls back to WORKER_TYPE_DECODE if not available.
if
let
Some
(
worker_id
)
=
self
.decode_worker_id
{
let
worker_id_str
=
worker_id
.to_string
();
let
dp_rank_str
=
self
.decode_dp_rank
.map_or
(
"0"
.to_string
(),
|
r
|
r
.to_string
());
let
worker_type
=
self
.decode_worker_type
.as_deref
()
.unwrap_or
(
WORKER_TYPE_DECODE
);
WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE
.with_label_values
(
&
[
worker_id_str
.as_str
(),
dp_rank_str
.as_str
(),
worker_type
])
.set
(
itl
);
}
}
}
self
.last_response_time
=
Some
(
current_duration
);
self
.last_response_time
=
Some
(
current_duration
);
...
@@ -992,6 +1147,14 @@ pub fn process_response_and_observe_metrics<T>(
...
@@ -992,6 +1147,14 @@ pub fn process_response_and_observe_metrics<T>(
if
let
Ok
(
Some
(
metrics
))
=
LLMMetricAnnotation
::
from_annotation
(
annotated
)
{
if
let
Ok
(
Some
(
metrics
))
=
LLMMetricAnnotation
::
from_annotation
(
annotated
)
{
response_collector
.observe_current_osl
(
metrics
.output_tokens
);
response_collector
.observe_current_osl
(
metrics
.output_tokens
);
response_collector
.observe_cached_tokens
(
metrics
.cached_tokens
);
response_collector
.observe_cached_tokens
(
metrics
.cached_tokens
);
response_collector
.set_worker_info
(
metrics
.prefill_worker_id
,
metrics
.prefill_dp_rank
,
metrics
.prefill_worker_type
,
metrics
.decode_worker_id
,
metrics
.decode_dp_rank
,
metrics
.decode_worker_type
,
);
// Drop http_queue_guard on first token for non-streaming (same as streaming)
// Drop http_queue_guard on first token for non-streaming (same as streaming)
if
response_collector
.is_first_token
()
if
response_collector
.is_first_token
()
...
@@ -1033,6 +1196,14 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
...
@@ -1033,6 +1196,14 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
if
let
Ok
(
Some
(
metrics
))
=
LLMMetricAnnotation
::
from_annotation
(
&
annotated
)
{
if
let
Ok
(
Some
(
metrics
))
=
LLMMetricAnnotation
::
from_annotation
(
&
annotated
)
{
response_collector
.observe_current_osl
(
metrics
.output_tokens
);
response_collector
.observe_current_osl
(
metrics
.output_tokens
);
response_collector
.observe_cached_tokens
(
metrics
.cached_tokens
);
response_collector
.observe_cached_tokens
(
metrics
.cached_tokens
);
response_collector
.set_worker_info
(
metrics
.prefill_worker_id
,
metrics
.prefill_dp_rank
,
metrics
.prefill_worker_type
,
metrics
.decode_worker_id
,
metrics
.decode_dp_rank
,
metrics
.decode_worker_type
,
);
// Drop http_queue_guard on first token for streaming
// Drop http_queue_guard on first token for streaming
if
response_collector
.is_first_token
()
if
response_collector
.is_first_token
()
...
@@ -1526,6 +1697,12 @@ mod tests {
...
@@ -1526,6 +1697,12 @@ mod tests {
output_tokens
:
20
,
output_tokens
:
20
,
chunk_tokens
:
5
,
chunk_tokens
:
5
,
cached_tokens
:
Some
(
15
),
cached_tokens
:
Some
(
15
),
prefill_worker_id
:
None
,
prefill_dp_rank
:
None
,
prefill_worker_type
:
None
,
decode_worker_id
:
None
,
decode_dp_rank
:
None
,
decode_worker_type
:
None
,
};
};
let
annotation
=
llm_metrics
.to_annotation
::
<
()
>
()
.unwrap
();
let
annotation
=
llm_metrics
.to_annotation
::
<
()
>
()
.unwrap
();
...
@@ -1585,6 +1762,12 @@ mod tests {
...
@@ -1585,6 +1762,12 @@ mod tests {
output_tokens
:
20
,
output_tokens
:
20
,
chunk_tokens
:
5
,
chunk_tokens
:
5
,
cached_tokens
:
Some
(
15
),
cached_tokens
:
Some
(
15
),
prefill_worker_id
:
None
,
prefill_dp_rank
:
None
,
prefill_worker_type
:
None
,
decode_worker_id
:
None
,
decode_dp_rank
:
None
,
decode_worker_type
:
None
,
};
};
let
annotation
=
llm_metrics
.to_annotation
::
<
()
>
()
.unwrap
();
let
annotation
=
llm_metrics
.to_annotation
::
<
()
>
()
.unwrap
();
...
...
lib/llm/src/http/service/service_v2.rs
View file @
0c0336e6
...
@@ -15,7 +15,8 @@ use axum::http::Response;
...
@@ -15,7 +15,8 @@ use axum::http::Response;
use
super
::
Metrics
;
use
super
::
Metrics
;
use
super
::
RouteDoc
;
use
super
::
RouteDoc
;
use
super
::
metrics
;
use
super
::
metrics
;
use
crate
::
discovery
::
ModelManager
;
use
super
::
metrics
::
register_worker_timing_metrics
;
use
crate
::
discovery
::{
ModelManager
,
register_worker_load_metrics
};
use
crate
::
endpoint_type
::
EndpointType
;
use
crate
::
endpoint_type
::
EndpointType
;
use
crate
::
request_template
::
RequestTemplate
;
use
crate
::
request_template
::
RequestTemplate
;
use
anyhow
::
Result
;
use
anyhow
::
Result
;
...
@@ -392,6 +393,18 @@ impl HttpServiceConfigBuilder {
...
@@ -392,6 +393,18 @@ impl HttpServiceConfigBuilder {
let
registry
=
metrics
::
Registry
::
new
();
let
registry
=
metrics
::
Registry
::
new
();
state
.metrics_clone
()
.register
(
&
registry
)
?
;
state
.metrics_clone
()
.register
(
&
registry
)
?
;
// Register worker load metrics (active_decode_blocks, active_prefill_tokens per worker)
// The gauges are written by the KV router's own bookkeeping (sequence.rs);
// KvWorkerMonitor only removes a worker's series once the worker disappears.
if
let
Err
(
e
)
=
register_worker_load_metrics
(
&
registry
)
{
tracing
::
warn!
(
"Failed to register worker load metrics: {}"
,
e
);
}
// Register worker timing metrics (last_ttft, last_itl per worker)
// These are updated by ResponseMetricCollector when observing TTFT/ITL
if
let
Err
(
e
)
=
register_worker_timing_metrics
(
&
registry
)
{
tracing
::
warn!
(
"Failed to register worker timing metrics: {}"
,
e
);
}
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
// Setup custom backend metrics if configured
// Setup custom backend metrics if configured
let
custom_backend_registry
=
let
custom_backend_registry
=
...
...
lib/llm/src/kv_router.rs
View file @
0c0336e6
...
@@ -333,6 +333,7 @@ pub struct KvRouter {
...
@@ -333,6 +333,7 @@ pub struct KvRouter {
}
}
impl
KvRouter
{
impl
KvRouter
{
#[allow(clippy::too_many_arguments)]
pub
async
fn
new
(
pub
async
fn
new
(
endpoint
:
Endpoint
,
endpoint
:
Endpoint
,
client
:
Client
,
client
:
Client
,
...
@@ -341,6 +342,7 @@ impl KvRouter {
...
@@ -341,6 +342,7 @@ impl KvRouter {
selector
:
Option
<
Box
<
dyn
WorkerSelector
+
Send
+
Sync
>>
,
selector
:
Option
<
Box
<
dyn
WorkerSelector
+
Send
+
Sync
>>
,
kv_router_config
:
Option
<
KvRouterConfig
>
,
kv_router_config
:
Option
<
KvRouterConfig
>
,
router_id
:
u64
,
router_id
:
u64
,
worker_type
:
&
'static
str
,
)
->
Result
<
Self
>
{
)
->
Result
<
Self
>
{
let
kv_router_config
=
kv_router_config
.unwrap_or_default
();
let
kv_router_config
=
kv_router_config
.unwrap_or_default
();
let
component
=
endpoint
.component
();
let
component
=
endpoint
.component
();
...
@@ -382,6 +384,7 @@ impl KvRouter {
...
@@ -382,6 +384,7 @@ impl KvRouter {
selector
,
selector
,
kv_router_config
.router_replica_sync
,
kv_router_config
.router_replica_sync
,
router_id
,
router_id
,
worker_type
,
)
)
.await
?
;
.await
?
;
...
@@ -581,6 +584,12 @@ impl KvRouter {
...
@@ -581,6 +584,12 @@ impl KvRouter {
self
.scheduler
.free
(
request_id
)
.await
self
.scheduler
.free
(
request_id
)
.await
}
}
/// Return the worker-type label of this router ("prefill" or "decode").
///
/// The value is obtained by delegating to the scheduler and is a `'static`
/// string, so callers can keep it without copying. It is used as the
/// `worker_type` label when Prometheus metrics are recorded.
pub
fn
worker_type
(
&
self
)
->
&
'static
str
{
self
.scheduler
.worker_type
()
}
pub
async
fn
add_output_block
(
pub
async
fn
add_output_block
(
&
self
,
&
self
,
request_id
:
&
str
,
request_id
:
&
str
,
...
@@ -926,11 +935,13 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
...
@@ -926,11 +935,13 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
}
}
}
}
// Record metrics in tracker: KV hit rate and worker ID based on phase
// Record metrics in tracker: KV hit rate, worker ID, and worker type based on phase.
// Worker type is stored at routing time to avoid expensive MDC lookups when
// updating Prometheus metrics (TTFT/ITL) later in the response stream.
if
let
Some
(
ref
tracker
)
=
request
.tracker
{
if
let
Some
(
ref
tracker
)
=
request
.tracker
{
let
isl_blocks
=
request
.token_ids
.len
()
.div_ceil
(
block_size
);
let
isl_blocks
=
request
.token_ids
.len
()
.div_ceil
(
block_size
);
tracker
.record_kv_hit
(
overlap_amount
,
isl_blocks
);
tracker
.record_kv_hit
(
overlap_amount
,
isl_blocks
);
tracker
.record_worker
(
instance_id
);
tracker
.record_worker
_full
(
instance_id
,
dp_rank
,
self
.chooser
.worker_type
()
);
}
}
// Handle query-only requests: early return with worker info
// Handle query-only requests: early return with worker info
...
...
lib/llm/src/kv_router/prefill_router.rs
View file @
0c0336e6
...
@@ -24,7 +24,7 @@ use crate::{
...
@@ -24,7 +24,7 @@ use crate::{
kv_router
::{
KvPushRouter
,
KvRouterConfig
,
RouterConfigOverride
},
kv_router
::{
KvPushRouter
,
KvRouterConfig
,
RouterConfigOverride
},
protocols
::
common
::
llm_backend
::{
LLMEngineOutput
,
PreprocessedRequest
},
protocols
::
common
::
llm_backend
::{
LLMEngineOutput
,
PreprocessedRequest
},
protocols
::
common
::
preprocessor
::{
BootstrapInfo
,
PrefillResult
},
protocols
::
common
::
preprocessor
::{
BootstrapInfo
,
PrefillResult
},
protocols
::
common
::
timing
::{
RequestPhase
,
RequestTracker
},
protocols
::
common
::
timing
::{
RequestPhase
,
RequestTracker
,
WORKER_TYPE_PREFILL
},
};
};
/// Errors that can occur during prefill routing
/// Errors that can occur during prefill routing
...
@@ -50,6 +50,8 @@ enum InnerPrefillRouter {
...
@@ -50,6 +50,8 @@ enum InnerPrefillRouter {
/// KV-aware routing using KvPushRouter
/// KV-aware routing using KvPushRouter
KvRouter
(
Arc
<
KvPushRouter
>
),
KvRouter
(
Arc
<
KvPushRouter
>
),
/// Simple routing (RoundRobin, Random, Direct)
/// Simple routing (RoundRobin, Random, Direct)
/// Note: Per-worker metrics (active_prefill_tokens, active_decode_blocks) are only
/// available in KV routing mode where the router has actual bookkeeping.
SimpleRouter
(
Arc
<
PushRouter
<
PreprocessedRequest
,
Annotated
<
LLMEngineOutput
>>>
),
SimpleRouter
(
Arc
<
PushRouter
<
PreprocessedRequest
,
Annotated
<
LLMEngineOutput
>>>
),
}
}
...
@@ -104,6 +106,8 @@ pub struct PrefillRouter {
...
@@ -104,6 +106,8 @@ pub struct PrefillRouter {
cancel_token
:
CancellationToken
,
cancel_token
:
CancellationToken
,
router_mode
:
RouterMode
,
router_mode
:
RouterMode
,
enforce_disagg
:
bool
,
enforce_disagg
:
bool
,
/// Model name used to look up the worker monitor for prefill client registration
model_name
:
String
,
}
}
impl
PrefillRouter
{
impl
PrefillRouter
{
...
@@ -120,6 +124,7 @@ impl PrefillRouter {
...
@@ -120,6 +124,7 @@ impl PrefillRouter {
cancel_token
:
CancellationToken
::
new
(),
cancel_token
:
CancellationToken
::
new
(),
router_mode
,
router_mode
,
enforce_disagg
,
enforce_disagg
,
model_name
:
String
::
new
(),
// Not used for disabled router
})
})
}
}
...
@@ -130,6 +135,7 @@ impl PrefillRouter {
...
@@ -130,6 +135,7 @@ impl PrefillRouter {
kv_cache_block_size
:
u32
,
kv_cache_block_size
:
u32
,
kv_router_config
:
Option
<
KvRouterConfig
>
,
kv_router_config
:
Option
<
KvRouterConfig
>
,
enforce_disagg
:
bool
,
enforce_disagg
:
bool
,
model_name
:
String
,
)
->
Arc
<
Self
>
{
)
->
Arc
<
Self
>
{
let
prefill_router
=
OnceLock
::
new
();
let
prefill_router
=
OnceLock
::
new
();
let
cancel_token
=
CancellationToken
::
new
();
let
cancel_token
=
CancellationToken
::
new
();
...
@@ -141,6 +147,7 @@ impl PrefillRouter {
...
@@ -141,6 +147,7 @@ impl PrefillRouter {
cancel_token
:
cancel_token
.clone
(),
cancel_token
:
cancel_token
.clone
(),
router_mode
,
router_mode
,
enforce_disagg
,
enforce_disagg
,
model_name
,
});
});
// Spawn background task to wait for activation
// Spawn background task to wait for activation
...
@@ -194,14 +201,24 @@ impl PrefillRouter {
...
@@ -194,14 +201,24 @@ impl PrefillRouter {
.await
?
;
.await
?
;
let
inner_router
=
if
self
.router_mode
.is_kv_routing
()
{
let
inner_router
=
if
self
.router_mode
.is_kv_routing
()
{
// Create KV chooser using the endpoint
// Create KV chooser using the endpoint
(this is a prefill router)
let
kv_chooser
=
model_manager
let
kv_chooser
=
model_manager
.kv_chooser_for
(
&
endpoint
,
kv_cache_block_size
,
kv_router_config
)
.kv_chooser_for
(
&
endpoint
,
kv_cache_block_size
,
kv_router_config
,
WORKER_TYPE_PREFILL
,
)
.await
?
;
.await
?
;
// Extract client from kv_chooser to ensure shared state
// Extract client from kv_chooser to ensure shared state
let
client
=
kv_chooser
.client
()
.clone
();
let
client
=
kv_chooser
.client
()
.clone
();
// Register prefill client with worker monitor for TTFT metric cleanup in disaggregated mode
if
let
Some
(
monitor
)
=
model_manager
.get_worker_monitor
(
&
self
.model_name
)
{
monitor
.set_prefill_client
(
client
.clone
());
}
// Build the PushRouter for prefill with KV mode using the shared client
// Build the PushRouter for prefill with KV mode using the shared client
let
push_router
=
PushRouter
::
<
PreprocessedRequest
,
Annotated
<
LLMEngineOutput
>>
::
from_client_with_threshold
(
let
push_router
=
PushRouter
::
<
PreprocessedRequest
,
Annotated
<
LLMEngineOutput
>>
::
from_client_with_threshold
(
client
,
client
,
...
@@ -217,7 +234,14 @@ impl PrefillRouter {
...
@@ -217,7 +234,14 @@ impl PrefillRouter {
// Create client for simple router
// Create client for simple router
let
client
=
endpoint
.client
()
.await
?
;
let
client
=
endpoint
.client
()
.await
?
;
// Register prefill client with worker monitor for TTFT metric cleanup in disaggregated mode
if
let
Some
(
monitor
)
=
model_manager
.get_worker_monitor
(
&
self
.model_name
)
{
monitor
.set_prefill_client
(
client
.clone
());
}
// Create simple push router with the frontend's router mode
// Create simple push router with the frontend's router mode
// Note: Per-worker metrics (active_prefill_tokens, active_decode_blocks) are only
// available in KV routing mode where the router has actual bookkeeping.
let
push_router
=
PushRouter
::
<
PreprocessedRequest
,
Annotated
<
LLMEngineOutput
>>
::
from_client_with_threshold
(
let
push_router
=
PushRouter
::
<
PreprocessedRequest
,
Annotated
<
LLMEngineOutput
>>
::
from_client_with_threshold
(
client
,
client
,
self
.router_mode
,
self
.router_mode
,
...
@@ -325,12 +349,14 @@ impl PrefillRouter {
...
@@ -325,12 +349,14 @@ impl PrefillRouter {
/// If `phase_permit` is provided, it is dropped after the first output is received,
/// If `phase_permit` is provided, it is dropped after the first output is received,
/// allowing subsequent `set_phase` calls to proceed. This is used in the bootstrap
/// allowing subsequent `set_phase` calls to proceed. This is used in the bootstrap
/// optimization path to ensure `record_worker` completes before the phase changes.
/// optimization path to ensure `record_worker` completes before the phase changes.
///
/// Returns (PrefillResult, Option<(worker_id, dp_rank)>).
async
fn
execute_prefill
(
async
fn
execute_prefill
(
router
:
Option
<
InnerPrefillRouter
>
,
router
:
Option
<
InnerPrefillRouter
>
,
request
:
SingleIn
<
PreprocessedRequest
>
,
request
:
SingleIn
<
PreprocessedRequest
>
,
target_worker
:
Option
<
u64
>
,
target_worker
:
Option
<
u64
>
,
phase_permit
:
Option
<
OwnedSemaphorePermit
>
,
phase_permit
:
Option
<
OwnedSemaphorePermit
>
,
)
->
Result
<
(
PrefillResult
,
Option
<
u64
>
),
PrefillError
>
{
)
->
Result
<
(
PrefillResult
,
Option
<
(
u64
,
u32
)
>
),
PrefillError
>
{
let
router
=
router
.ok_or
(
PrefillError
::
NotActivated
)
?
;
let
router
=
router
.ok_or
(
PrefillError
::
NotActivated
)
?
;
let
mut
prefill_response
=
router
let
mut
prefill_response
=
router
.generate_to_worker
(
request
,
target_worker
)
.generate_to_worker
(
request
,
target_worker
)
...
@@ -382,20 +408,27 @@ impl PrefillRouter {
...
@@ -382,20 +408,27 @@ impl PrefillRouter {
));
));
};
};
// Extract prefill worker ID from disaggregated_params
// Extract prefill worker ID and dp_rank from disaggregated_params
let
prefill_worker_id
=
disaggregated_params
let
prefill_worker_info
=
.get
(
"worker_id"
)
disaggregated_params
.and_then
(|
worker_id_json
|
{
.get
(
"worker_id"
)
worker_id_json
.and_then
(|
worker_id_json
|
{
.get
(
"prefill_worker_id"
)
let
worker_id
=
worker_id_json
.and_then
(|
v
|
v
.as_u64
())
.get
(
"prefill_worker_id"
)
});
.and_then
(|
v
|
v
.as_u64
())
?
;
let
dp_rank
=
worker_id_json
.get
(
"prefill_dp_rank"
)
.and_then
(|
v
|
v
.as_u64
())
.map
(|
r
|
r
as
u32
)
.unwrap_or
(
0
);
Some
((
worker_id
,
dp_rank
))
});
Ok
((
Ok
((
PrefillResult
{
PrefillResult
{
disaggregated_params
,
disaggregated_params
,
prompt_tokens_details
,
prompt_tokens_details
,
},
},
prefill_worker_i
d
,
prefill_worker_i
nfo
,
))
))
}
}
...
@@ -437,14 +470,16 @@ impl PrefillRouter {
...
@@ -437,14 +470,16 @@ impl PrefillRouter {
);
);
}
}
/// Call the prefill router and extract structured prefill result
and
worker ID.
/// Call the prefill router and extract structured prefill result
,
worker ID
, and dp_rank
.
///
///
/// This is the synchronous prefill path - we wait for prefill to complete before proceeding.
/// This is the synchronous prefill path - we wait for prefill to complete before proceeding.
/// No phase permit is needed since `record_worker` completes before we return.
/// No phase permit is needed since `record_worker` completes before we return.
///
/// Returns (PrefillResult, Option<(worker_id, dp_rank)>).
async
fn
call_prefill
(
async
fn
call_prefill
(
&
self
,
&
self
,
request
:
SingleIn
<
PreprocessedRequest
>
,
request
:
SingleIn
<
PreprocessedRequest
>
,
)
->
Result
<
(
PrefillResult
,
Option
<
u64
>
),
PrefillError
>
{
)
->
Result
<
(
PrefillResult
,
Option
<
(
u64
,
u32
)
>
),
PrefillError
>
{
// For call_prefill path, routing is handled by the router itself (no direct routing needed)
// For call_prefill path, routing is handled by the router itself (no direct routing needed)
// No phase permit needed - we wait for completion before changing phase
// No phase permit needed - we wait for completion before changing phase
Self
::
execute_prefill
(
self
.prefill_router
.get
()
.cloned
(),
request
,
None
,
None
)
.await
Self
::
execute_prefill
(
self
.prefill_router
.get
()
.cloned
(),
request
,
None
,
None
)
.await
...
@@ -522,6 +557,14 @@ impl
...
@@ -522,6 +557,14 @@ impl
router
.select_next_worker
();
router
.select_next_worker
();
}
}
// Record prefill worker on the main request's tracker for metrics.
// (The cloned prefill_req has its own tracker, so we need to record here)
// Worker type is stored at routing time to avoid expensive MDC lookups when
// updating Prometheus TTFT metrics later in the response stream.
if
let
Some
(
ref
tracker
)
=
req
.tracker
{
tracker
.record_prefill_worker_full
(
worker_id
,
dp_rank
,
WORKER_TYPE_PREFILL
);
}
let
routing
=
prefill_req
.routing_mut
();
let
routing
=
prefill_req
.routing_mut
();
routing
.prefill_worker_id
=
Some
(
worker_id
);
routing
.prefill_worker_id
=
Some
(
worker_id
);
routing
.dp_rank
=
Some
(
dp_rank
);
routing
.dp_rank
=
Some
(
dp_rank
);
...
@@ -546,9 +589,21 @@ impl
...
@@ -546,9 +589,21 @@ impl
let
prefill_context
=
Context
::
with_id
(
prefill_req
,
request_id
.clone
());
let
prefill_context
=
Context
::
with_id
(
prefill_req
,
request_id
.clone
());
engine_ctx
.link_child
(
prefill_context
.context
());
engine_ctx
.link_child
(
prefill_context
.context
());
self
.call_prefill
(
prefill_context
)
let
result
=
self
.call_prefill
(
prefill_context
)
.await
;
.await
.map
(|(
result
,
worker_id
)|
(
Some
(
result
),
worker_id
,
None
))
// Record prefill worker on the main request's tracker for metrics.
// (call_prefill returns the worker_id and dp_rank from the prefill routing)
// Worker type is stored at routing time to avoid expensive MDC lookups when
// updating Prometheus TTFT metrics later in the response stream.
if
let
Ok
((
_
,
Some
((
worker_id
,
dp_rank
))))
=
&
result
&&
let
Some
(
ref
tracker
)
=
req
.tracker
{
tracker
.record_prefill_worker_full
(
*
worker_id
,
*
dp_rank
,
WORKER_TYPE_PREFILL
);
}
result
.map
(|(
result
,
worker_info
)|
{
(
Some
(
result
),
worker_info
.map
(|(
id
,
_
)|
id
),
None
)
})
}
}
}
}
.instrument
(
tracing
::
info_span!
(
"prefill_routing"
))
.instrument
(
tracing
::
info_span!
(
"prefill_routing"
))
...
...
lib/llm/src/kv_router/scheduler.rs
View file @
0c0336e6
...
@@ -101,6 +101,7 @@ impl KvScheduler {
...
@@ -101,6 +101,7 @@ impl KvScheduler {
selector
:
Option
<
Box
<
dyn
WorkerSelector
+
Send
+
Sync
>>
,
selector
:
Option
<
Box
<
dyn
WorkerSelector
+
Send
+
Sync
>>
,
replica_sync
:
bool
,
replica_sync
:
bool
,
router_id
:
u64
,
router_id
:
u64
,
worker_type
:
&
'static
str
,
)
->
Result
<
Self
,
KvSchedulerError
>
{
)
->
Result
<
Self
,
KvSchedulerError
>
{
let
selector
=
selector
.unwrap_or
(
Box
::
new
(
DefaultWorkerSelector
::
default
()));
let
selector
=
selector
.unwrap_or
(
Box
::
new
(
DefaultWorkerSelector
::
default
()));
...
@@ -119,6 +120,7 @@ impl KvScheduler {
...
@@ -119,6 +120,7 @@ impl KvScheduler {
initial_workers
,
initial_workers
,
replica_sync
,
replica_sync
,
router_id
,
router_id
,
worker_type
,
)
)
.await
.await
.map_err
(|
e
|
KvSchedulerError
::
InitFailed
(
e
.to_string
()))
?
,
.map_err
(|
e
|
KvSchedulerError
::
InitFailed
(
e
.to_string
()))
?
,
...
@@ -345,6 +347,12 @@ impl KvScheduler {
...
@@ -345,6 +347,12 @@ impl KvScheduler {
self
.slots
.free
(
&
request_id
.to_string
())
.await
self
.slots
.free
(
&
request_id
.to_string
())
.await
}
}
/// Returns the worker-type label ("prefill" or "decode") for this scheduler.
///
/// The value is obtained by delegating to `self.slots`; it is the string used
/// as the worker-type label on Prometheus metrics.
pub fn worker_type(&self) -> &'static str {
    let slots = &self.slots;
    slots.worker_type()
}
pub
async
fn
add_output_block
(
pub
async
fn
add_output_block
(
&
self
,
&
self
,
request_id
:
&
str
,
request_id
:
&
str
,
...
...
lib/llm/src/kv_router/sequence.rs
View file @
0c0336e6
...
@@ -40,6 +40,7 @@ use uuid::Uuid;
...
@@ -40,6 +40,7 @@ use uuid::Uuid;
use
super
::
protocols
::{
use
super
::
protocols
::{
ActiveLoad
,
ActiveSequenceEvent
,
ActiveSequenceEventData
,
WorkerWithDpRank
,
ActiveLoad
,
ActiveSequenceEvent
,
ActiveSequenceEventData
,
WorkerWithDpRank
,
};
};
use
crate
::
discovery
::{
WORKER_ACTIVE_DECODE_BLOCKS_GAUGE
,
WORKER_ACTIVE_PREFILL_TOKENS_GAUGE
};
use
crate
::
kv_router
::{
ACTIVE_SEQUENCES_SUBJECT
,
KV_METRICS_SUBJECT
};
use
crate
::
kv_router
::{
ACTIVE_SEQUENCES_SUBJECT
,
KV_METRICS_SUBJECT
};
use
crate
::
local_model
::
runtime_config
::
ModelRuntimeConfig
;
use
crate
::
local_model
::
runtime_config
::
ModelRuntimeConfig
;
use
dynamo_runtime
::
CancellationToken
;
use
dynamo_runtime
::
CancellationToken
;
...
@@ -415,6 +416,8 @@ pub struct ActiveSequencesMultiWorker {
...
@@ -415,6 +416,8 @@ pub struct ActiveSequencesMultiWorker {
/// Publisher for metrics (namespace-scoped)
/// Publisher for metrics (namespace-scoped)
metrics_publisher
:
EventPublisher
,
metrics_publisher
:
EventPublisher
,
replica_sync
:
bool
,
replica_sync
:
bool
,
/// Worker type for Prometheus metrics labeling ("prefill" or "decode")
worker_type
:
&
'static
str
,
}
}
impl
ActiveSequencesMultiWorker
{
impl
ActiveSequencesMultiWorker
{
...
@@ -424,6 +427,7 @@ impl ActiveSequencesMultiWorker {
...
@@ -424,6 +427,7 @@ impl ActiveSequencesMultiWorker {
workers_with_configs
:
HashMap
<
u64
,
Option
<
ModelRuntimeConfig
>>
,
workers_with_configs
:
HashMap
<
u64
,
Option
<
ModelRuntimeConfig
>>
,
replica_sync
:
bool
,
replica_sync
:
bool
,
router_id
:
u64
,
router_id
:
u64
,
worker_type
:
&
'static
str
,
)
->
Result
<
Self
>
{
)
->
Result
<
Self
>
{
assert
!
(
block_size
>
1
,
"block_size must be greater than 1"
);
assert
!
(
block_size
>
1
,
"block_size must be greater than 1"
);
...
@@ -462,6 +466,7 @@ impl ActiveSequencesMultiWorker {
...
@@ -462,6 +466,7 @@ impl ActiveSequencesMultiWorker {
metrics_publisher
,
metrics_publisher
,
router_id
,
router_id
,
replica_sync
,
replica_sync
,
worker_type
,
};
};
// Start the subscription loop only if replica_sync is enabled
// Start the subscription loop only if replica_sync is enabled
...
@@ -1045,7 +1050,25 @@ impl ActiveSequencesMultiWorker {
...
@@ -1045,7 +1050,25 @@ impl ActiveSequencesMultiWorker {
}
}
};
};
// Publish ActiveLoad
// Update Prometheus gauges directly (router's own bookkeeping)
let
worker_id_str
=
worker
.worker_id
.to_string
();
let
dp_rank_str
=
worker
.dp_rank
.to_string
();
WORKER_ACTIVE_DECODE_BLOCKS_GAUGE
.with_label_values
(
&
[
worker_id_str
.as_str
(),
dp_rank_str
.as_str
(),
self
.worker_type
,
])
.set
(
active_blocks
as
i64
);
WORKER_ACTIVE_PREFILL_TOKENS_GAUGE
.with_label_values
(
&
[
worker_id_str
.as_str
(),
dp_rank_str
.as_str
(),
self
.worker_type
,
])
.set
(
active_tokens
as
i64
);
// Also publish ActiveLoad to NATS for other subscribers (if NATS is available)
let
active_load
=
ActiveLoad
{
let
active_load
=
ActiveLoad
{
worker_id
:
worker
.worker_id
,
worker_id
:
worker
.worker_id
,
dp_rank
:
worker
.dp_rank
,
dp_rank
:
worker
.dp_rank
,
...
@@ -1054,7 +1077,8 @@ impl ActiveSequencesMultiWorker {
...
@@ -1054,7 +1077,8 @@ impl ActiveSequencesMultiWorker {
};
};
if
let
Err
(
e
)
=
self
.metrics_publisher
.publish
(
&
active_load
)
.await
{
if
let
Err
(
e
)
=
self
.metrics_publisher
.publish
(
&
active_load
)
.await
{
tracing
::
warn!
(
"Failed to publish ActiveLoad for worker {worker:?}: {e:?}"
);
// This is expected if NATS is not available - the local gauge update above already succeeded
tracing
::
trace!
(
"Failed to publish ActiveLoad to NATS for worker {worker:?}: {e:?}"
);
}
}
}
}
...
@@ -1063,6 +1087,12 @@ impl ActiveSequencesMultiWorker {
...
@@ -1063,6 +1087,12 @@ impl ActiveSequencesMultiWorker {
self
.senders
.len
()
self
.senders
.len
()
}
}
/// Get the worker type for this router ("prefill" or "decode").
/// Used for Prometheus metric labeling.
pub
fn
worker_type
(
&
self
)
->
&
'static
str
{
self
.worker_type
}
/// Generic method to query all workers with a given command
/// Generic method to query all workers with a given command
async
fn
query_workers
<
T
:
Send
+
'static
>
(
async
fn
query_workers
<
T
:
Send
+
'static
>
(
&
self
,
&
self
,
...
@@ -1301,12 +1331,20 @@ mod tests {
...
@@ -1301,12 +1331,20 @@ mod tests {
workers_with_configs
.clone
(),
workers_with_configs
.clone
(),
true
,
true
,
1
,
1
,
crate
::
discovery
::
WORKER_TYPE_DECODE
,
)
)
.await
?
,
.await
?
,
);
);
let
seq_manager_2
=
Arc
::
new
(
let
seq_manager_2
=
Arc
::
new
(
ActiveSequencesMultiWorker
::
new
(
component
,
block_size
,
workers_with_configs
,
true
,
2
)
ActiveSequencesMultiWorker
::
new
(
.await
?
,
component
,
block_size
,
workers_with_configs
,
true
,
2
,
crate
::
discovery
::
WORKER_TYPE_DECODE
,
)
.await
?
,
);
);
// Give some time for the subscription loops to start
// Give some time for the subscription loops to start
...
@@ -1463,12 +1501,20 @@ mod tests {
...
@@ -1463,12 +1501,20 @@ mod tests {
workers_with_configs
.clone
(),
workers_with_configs
.clone
(),
true
,
true
,
1
,
1
,
crate
::
discovery
::
WORKER_TYPE_DECODE
,
)
)
.await
?
,
.await
?
,
);
);
let
seq_manager_2
=
Arc
::
new
(
let
seq_manager_2
=
Arc
::
new
(
ActiveSequencesMultiWorker
::
new
(
component
,
block_size
,
workers_with_configs
,
true
,
2
)
ActiveSequencesMultiWorker
::
new
(
.await
?
,
component
,
block_size
,
workers_with_configs
,
true
,
2
,
crate
::
discovery
::
WORKER_TYPE_DECODE
,
)
.await
?
,
);
);
// Give some time for the subscription loops to start
// Give some time for the subscription loops to start
...
...
lib/llm/src/preprocessor.rs
View file @
0c0336e6
...
@@ -72,6 +72,26 @@ pub struct LLMMetricAnnotation {
...
@@ -72,6 +72,26 @@ pub struct LLMMetricAnnotation {
pub
output_tokens
:
usize
,
pub
output_tokens
:
usize
,
pub
chunk_tokens
:
usize
,
pub
chunk_tokens
:
usize
,
pub
cached_tokens
:
Option
<
usize
>
,
pub
cached_tokens
:
Option
<
usize
>
,
/// Prefill worker ID (for TTFT attribution in disaggregated mode)
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
prefill_worker_id
:
Option
<
u64
>
,
/// Prefill worker DP rank
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
prefill_dp_rank
:
Option
<
u32
>
,
/// Prefill worker type ("prefill" or "decode") for Prometheus metric labeling.
/// Stored at routing time to avoid expensive MDC lookup when updating TTFT metrics.
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
prefill_worker_type
:
Option
<
String
>
,
/// Decode worker ID (for ITL attribution in disaggregated mode)
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
decode_worker_id
:
Option
<
u64
>
,
/// Decode worker DP rank
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
decode_dp_rank
:
Option
<
u32
>
,
/// Decode worker type ("prefill" or "decode") for Prometheus metric labeling.
/// Stored at routing time to avoid expensive MDC lookup when updating ITL metrics.
#[serde(default,
skip_serializing_if
=
"Option::is_none"
)]
pub
decode_worker_type
:
Option
<
String
>
,
}
}
impl
LLMMetricAnnotation
{
impl
LLMMetricAnnotation
{
...
@@ -657,12 +677,32 @@ impl OpenAIPreprocessor {
...
@@ -657,12 +677,32 @@ impl OpenAIPreprocessor {
.map_err
(|
e
|
e
.to_string
())
.map_err
(|
e
|
e
.to_string
())
});
});
// Create LLM metrics annotation
// Create LLM metrics annotation with prefill/decode worker info from tracker.
// Worker types are stored at routing time to avoid expensive MDC lookup.
let
tracker
=
inner
.response_generator
.tracker
();
let
prefill_worker_id
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.prefill_worker_id
());
let
prefill_dp_rank
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.prefill_dp_rank
());
let
prefill_worker_type
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.prefill_worker_type
())
.map
(
String
::
from
);
let
decode_worker_id
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.decode_worker_id
());
let
decode_dp_rank
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.decode_dp_rank
());
let
decode_worker_type
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.decode_worker_type
())
.map
(
String
::
from
);
let
llm_metrics
=
LLMMetricAnnotation
{
let
llm_metrics
=
LLMMetricAnnotation
{
input_tokens
:
isl
,
input_tokens
:
isl
,
output_tokens
:
current_osl
,
output_tokens
:
current_osl
,
chunk_tokens
,
chunk_tokens
,
cached_tokens
:
None
,
cached_tokens
:
None
,
prefill_worker_id
,
prefill_dp_rank
,
prefill_worker_type
,
decode_worker_id
,
decode_dp_rank
,
decode_worker_type
,
};
};
if
let
Ok
(
metrics_annotated
)
=
llm_metrics
.to_annotation
::
<
()
>
()
{
if
let
Ok
(
metrics_annotated
)
=
llm_metrics
.to_annotation
::
<
()
>
()
{
...
@@ -695,6 +735,20 @@ impl OpenAIPreprocessor {
...
@@ -695,6 +735,20 @@ impl OpenAIPreprocessor {
let
usage_chunk
=
inner
.response_generator
.create_usage_chunk
();
let
usage_chunk
=
inner
.response_generator
.create_usage_chunk
();
let
usage
=
inner
.response_generator
.get_usage
();
let
usage
=
inner
.response_generator
.get_usage
();
let
tracker
=
inner
.response_generator
.tracker
();
let
prefill_worker_id
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.prefill_worker_id
());
let
prefill_dp_rank
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.prefill_dp_rank
());
let
prefill_worker_type
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.prefill_worker_type
())
.map
(
String
::
from
);
let
decode_worker_id
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.decode_worker_id
());
let
decode_dp_rank
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.decode_dp_rank
());
let
decode_worker_type
=
tracker
.as_ref
()
.and_then
(|
t
|
t
.decode_worker_type
())
.map
(
String
::
from
);
let
llm_metrics
=
LLMMetricAnnotation
{
let
llm_metrics
=
LLMMetricAnnotation
{
input_tokens
:
usage
.prompt_tokens
as
usize
,
input_tokens
:
usage
.prompt_tokens
as
usize
,
output_tokens
:
usage
.completion_tokens
as
usize
,
output_tokens
:
usage
.completion_tokens
as
usize
,
...
@@ -703,6 +757,12 @@ impl OpenAIPreprocessor {
...
@@ -703,6 +757,12 @@ impl OpenAIPreprocessor {
.prompt_tokens_details
.prompt_tokens_details
.as_ref
()
.as_ref
()
.and_then
(|
d
|
d
.cached_tokens
.map
(|
c
|
c
as
usize
)),
.and_then
(|
d
|
d
.cached_tokens
.map
(|
c
|
c
as
usize
)),
prefill_worker_id
,
prefill_dp_rank
,
prefill_worker_type
,
decode_worker_id
,
decode_dp_rank
,
decode_worker_type
,
};
};
// Create annotation string
// Create annotation string
...
...
lib/llm/src/protocols/common/timing.rs
View file @
0c0336e6
...
@@ -6,7 +6,10 @@
...
@@ -6,7 +6,10 @@
//! This module provides [`RequestTracker`] for tracking timing and routing information
//! This module provides [`RequestTracker`] for tracking timing and routing information
//! that can be returned to clients via the `nvext` response field.
//! that can be returned to clients via the `nvext` response field.
use
std
::
sync
::{
Arc
,
OnceLock
};
use
std
::
sync
::{
Arc
,
OnceLock
,
atomic
::{
AtomicU32
,
AtomicU64
,
Ordering
},
};
use
std
::
time
::{
Instant
,
SystemTime
,
UNIX_EPOCH
};
use
std
::
time
::{
Instant
,
SystemTime
,
UNIX_EPOCH
};
use
parking_lot
::
Mutex
;
use
parking_lot
::
Mutex
;
...
@@ -16,6 +19,17 @@ use utoipa::ToSchema;
...
@@ -16,6 +19,17 @@ use utoipa::ToSchema;
use
crate
::
protocols
::
openai
::
nvext
::
WorkerIdInfo
;
use
crate
::
protocols
::
openai
::
nvext
::
WorkerIdInfo
;
/// Sentinel value indicating no worker ID has been set.
/// We use 0 as the sentinel since valid worker IDs are non-zero lease IDs from etcd.
const
NO_WORKER_ID
:
u64
=
0
;
/// Sentinel value indicating no DP rank has been set (`u32::MAX`).
const
NO_DP_RANK
:
u32
=
u32
::
MAX
;
/// Worker type constants for Prometheus metric labels.
/// These are stored in RequestTracker at routing time to avoid costly MDC lookups
/// when updating per-worker metrics (TTFT, ITL).
///
/// Label value for prefill workers.
pub
const
WORKER_TYPE_PREFILL
:
&
str
=
"prefill"
;
/// Label value for decode workers.
pub
const
WORKER_TYPE_DECODE
:
&
str
=
"decode"
;
/// Phase of the request in disaggregated serving.
/// Phase of the request in disaggregated serving.
///
///
/// Used to determine which worker ID field to record when routing.
/// Used to determine which worker ID field to record when routing.
...
@@ -48,10 +62,15 @@ impl std::fmt::Display for RequestPhase {
...
@@ -48,10 +62,15 @@ impl std::fmt::Display for RequestPhase {
/// - `first_token_time`: When the first token was generated (set once via OnceLock)
/// - `first_token_time`: When the first token was generated (set once via OnceLock)
/// - `request_finish_time`: When the request finished (set once via OnceLock)
/// - `request_finish_time`: When the request finished (set once via OnceLock)
/// - KV cache hit rate information
/// - KV cache hit rate information
/// - Worker IDs and types for per-worker Prometheus metrics
///
///
/// The `OnceLock` fields ensure that values are set exactly once,
/// The `OnceLock` fields ensure that values are set exactly once,
/// which is important for disaggregated serving where the "first token"
/// which is important for disaggregated serving where the "first token"
/// might appear multiple times.
/// might appear multiple times.
///
/// Worker IDs use `AtomicU64` instead of `OnceLock<u64>` for lower overhead since
/// the tracker is created for every request. The sentinel value `NO_WORKER_ID` (0)
/// indicates no worker has been recorded yet.
#[derive(Debug)]
#[derive(Debug)]
pub
struct
RequestTracker
{
pub
struct
RequestTracker
{
/// When the request was received (monotonic clock for duration calculations)
/// When the request was received (monotonic clock for duration calculations)
...
@@ -75,11 +94,30 @@ pub struct RequestTracker {
...
@@ -75,11 +94,30 @@ pub struct RequestTracker {
/// Input sequence length in blocks (for hit rate calculation) - set once via OnceLock
/// Input sequence length in blocks (for hit rate calculation) - set once via OnceLock
isl_blocks
:
OnceLock
<
usize
>
,
isl_blocks
:
OnceLock
<
usize
>
,
/// Prefill worker ID (for disaggregated serving) - set once via OnceLock
/// Prefill worker ID (for disaggregated serving).
prefill_worker_id
:
OnceLock
<
u64
>
,
/// Uses atomic with compare-exchange for set-once semantics.
/// Value of 0 (NO_WORKER_ID) means not yet set.
prefill_worker_id
:
AtomicU64
,
/// Prefill DP rank. Value of u32::MAX (NO_DP_RANK) means not yet set.
prefill_dp_rank
:
AtomicU32
,
/// Decode worker ID. Value of 0 (NO_WORKER_ID) means not yet set.
decode_worker_id
:
AtomicU64
,
/// Decode DP rank. Value of u32::MAX (NO_DP_RANK) means not yet set.
decode_dp_rank
:
AtomicU32
,
/// Worker type for the prefill worker ("prefill" or "decode").
/// Stored at routing time to avoid MDC lookup when updating Prometheus metrics.
/// In aggregated mode, this will be "decode" since the same worker handles both.
/// This is necessary because TTFT metrics need to know the worker type label,
/// and looking up MDC by worker_id would require iterating all cards (O(n)).
prefill_worker_type
:
OnceLock
<&
'static
str
>
,
/// Decode worker ID - set once via OnceLock
/// Worker type for the decode worker (always "decode").
decode_worker_id
:
OnceLock
<
u64
>
,
/// Stored for symmetry with prefill_worker_type, though decode is always "decode".
decode_worker_type
:
OnceLock
<&
'static
str
>
,
/// Request phase (Prefill/Decode/Aggregated)
/// Request phase (Prefill/Decode/Aggregated)
phase
:
Mutex
<
RequestPhase
>
,
phase
:
Mutex
<
RequestPhase
>
,
...
@@ -108,8 +146,12 @@ impl RequestTracker {
...
@@ -108,8 +146,12 @@ impl RequestTracker {
request_finish_time
:
OnceLock
::
new
(),
request_finish_time
:
OnceLock
::
new
(),
kv_overlap_blocks
:
OnceLock
::
new
(),
kv_overlap_blocks
:
OnceLock
::
new
(),
isl_blocks
:
OnceLock
::
new
(),
isl_blocks
:
OnceLock
::
new
(),
prefill_worker_id
:
OnceLock
::
new
(),
prefill_worker_id
:
AtomicU64
::
new
(
NO_WORKER_ID
),
decode_worker_id
:
OnceLock
::
new
(),
prefill_dp_rank
:
AtomicU32
::
new
(
NO_DP_RANK
),
decode_worker_id
:
AtomicU64
::
new
(
NO_WORKER_ID
),
decode_dp_rank
:
AtomicU32
::
new
(
NO_DP_RANK
),
prefill_worker_type
:
OnceLock
::
new
(),
decode_worker_type
:
OnceLock
::
new
(),
phase
:
Mutex
::
new
(
RequestPhase
::
Aggregated
),
phase
:
Mutex
::
new
(
RequestPhase
::
Aggregated
),
phase_semaphore
:
Arc
::
new
(
Semaphore
::
new
(
1
)),
phase_semaphore
:
Arc
::
new
(
Semaphore
::
new
(
1
)),
}
}
...
@@ -177,12 +219,82 @@ impl RequestTracker {
...
@@ -177,12 +219,82 @@ impl RequestTracker {
/// Record the prefill worker ID. Returns true if this was the first call.
/// Record the prefill worker ID. Returns true if this was the first call.
pub
fn
record_prefill_worker
(
&
self
,
id
:
u64
)
->
bool
{
pub
fn
record_prefill_worker
(
&
self
,
id
:
u64
)
->
bool
{
self
.prefill_worker_id
.set
(
id
)
.is_ok
()
self
.prefill_worker_id
.compare_exchange
(
NO_WORKER_ID
,
id
,
Ordering
::
SeqCst
,
Ordering
::
SeqCst
)
.is_ok
()
}
/// Claim the prefill worker slot for `id` and, on success, remember its DP rank.
///
/// The worker ID is written with a compare-exchange against `NO_WORKER_ID`, so only the
/// first caller wins. The DP rank is stored only by that winning call, so a later call
/// cannot pair its rank with an earlier caller's worker ID.
///
/// Returns `true` when this call recorded the worker ID, `false` when one was already set.
///
/// Note: an `id` equal to `NO_WORKER_ID` makes the compare-exchange succeed while the
/// slot keeps its "unset" value.
pub fn record_prefill_worker_with_rank(&self, id: u64, dp_rank: u32) -> bool {
    let claim = self.prefill_worker_id.compare_exchange(
        NO_WORKER_ID,
        id,
        Ordering::SeqCst,
        Ordering::SeqCst,
    );
    match claim {
        Ok(_) => {
            self.prefill_dp_rank.store(dp_rank, Ordering::SeqCst);
            true
        }
        Err(_) => false,
    }
}
/// Claim the prefill worker slot for `id`, then store its DP rank and worker type.
///
/// The worker ID is set with a compare-exchange against `NO_WORKER_ID`. Only the call
/// that wins that exchange goes on to store `dp_rank` and to set `worker_type`; the
/// result of setting the worker type is deliberately ignored. Keeping the worker type
/// here lets Prometheus metric updates use it later without an MDC lookup.
///
/// Returns `true` when this call recorded the worker ID, `false` when one was already set.
///
/// Note: an `id` equal to `NO_WORKER_ID` makes the compare-exchange succeed while the
/// slot keeps its "unset" value.
pub fn record_prefill_worker_full(
    &self,
    id: u64,
    dp_rank: u32,
    worker_type: &'static str,
) -> bool {
    let lost_race = self
        .prefill_worker_id
        .compare_exchange(NO_WORKER_ID, id, Ordering::SeqCst, Ordering::SeqCst)
        .is_err();
    if lost_race {
        return false;
    }

    self.prefill_dp_rank.store(dp_rank, Ordering::SeqCst);
    let _ = self.prefill_worker_type.set(worker_type);
    true
}
}
/// Record the decode worker ID. Returns true if this was the first call.
/// Record the decode worker ID. Returns true if this was the first call.
pub
fn
record_decode_worker
(
&
self
,
id
:
u64
)
->
bool
{
pub
fn
record_decode_worker
(
&
self
,
id
:
u64
)
->
bool
{
self
.decode_worker_id
.set
(
id
)
.is_ok
()
self
.decode_worker_id
.compare_exchange
(
NO_WORKER_ID
,
id
,
Ordering
::
SeqCst
,
Ordering
::
SeqCst
)
.is_ok
()
}
/// Claim the decode worker slot for `id` and, on success, remember its DP rank.
///
/// The worker ID is written with a compare-exchange against `NO_WORKER_ID`, so only the
/// first caller wins. The DP rank is stored only by that winning call, so a later call
/// cannot pair its rank with an earlier caller's worker ID.
///
/// Returns `true` when this call recorded the worker ID, `false` when one was already set.
///
/// Note: an `id` equal to `NO_WORKER_ID` makes the compare-exchange succeed while the
/// slot keeps its "unset" value.
pub fn record_decode_worker_with_rank(&self, id: u64, dp_rank: u32) -> bool {
    let claim = self.decode_worker_id.compare_exchange(
        NO_WORKER_ID,
        id,
        Ordering::SeqCst,
        Ordering::SeqCst,
    );
    match claim {
        Ok(_) => {
            self.decode_dp_rank.store(dp_rank, Ordering::SeqCst);
            true
        }
        Err(_) => false,
    }
}
/// Claim the decode worker slot for `id`, then store its DP rank and worker type.
///
/// The worker ID is set with a compare-exchange against `NO_WORKER_ID`. Only the call
/// that wins that exchange goes on to store `dp_rank` and to set `worker_type`; the
/// result of setting the worker type is deliberately ignored. Keeping the worker type
/// here lets Prometheus metric updates use it later without an MDC lookup.
///
/// Returns `true` when this call recorded the worker ID, `false` when one was already set.
///
/// Note: an `id` equal to `NO_WORKER_ID` makes the compare-exchange succeed while the
/// slot keeps its "unset" value.
pub fn record_decode_worker_full(
    &self,
    id: u64,
    dp_rank: u32,
    worker_type: &'static str,
) -> bool {
    let lost_race = self
        .decode_worker_id
        .compare_exchange(NO_WORKER_ID, id, Ordering::SeqCst, Ordering::SeqCst)
        .is_err();
    if lost_race {
        return false;
    }

    self.decode_dp_rank.store(dp_rank, Ordering::SeqCst);
    let _ = self.decode_worker_type.set(worker_type);
    true
}
}
/// Set the request phase and return a permit that blocks subsequent phase changes.
/// Set the request phase and return a permit that blocks subsequent phase changes.
...
@@ -230,10 +342,56 @@ impl RequestTracker {
...
@@ -230,10 +342,56 @@ impl RequestTracker {
}
}
}
}
/// Record `instance_id` and `dp_rank` against whichever role(s) the current phase implies.
///
/// - `Prefill`: stored as the prefill worker/rank.
/// - `Decode`: stored as the decode worker/rank.
/// - `Aggregated`: stored as both, prefill first and then decode.
///
/// The return values of the underlying record calls are not used.
pub fn record_worker_with_rank(&self, instance_id: u64, dp_rank: u32) {
    // Exhaustive match so a new phase variant forces this mapping to be revisited.
    let (as_prefill, as_decode) = match self.phase() {
        RequestPhase::Prefill => (true, false),
        RequestPhase::Decode => (false, true),
        RequestPhase::Aggregated => (true, true),
    };

    if as_prefill {
        self.record_prefill_worker_with_rank(instance_id, dp_rank);
    }
    if as_decode {
        self.record_decode_worker_with_rank(instance_id, dp_rank);
    }
}
/// Record `instance_id`, `dp_rank`, and `worker_type` against the role(s) implied by the
/// current phase.
///
/// Prefer this over the rank-only variant when the worker type is already known (from
/// MDC or router config): the type is kept on the tracker so later Prometheus metric
/// updates do not need an expensive MDC lookup.
///
/// - `Prefill`: stored as the prefill worker with the given `worker_type`.
/// - `Decode`: stored as the decode worker with the given `worker_type`.
/// - `Aggregated`: stored as both (prefill first, then decode) with the same
///   `worker_type`, since one worker handles both stages.
///
/// The return values of the underlying record calls are not used.
pub fn record_worker_full(&self, instance_id: u64, dp_rank: u32, worker_type: &'static str) {
    // Exhaustive match so a new phase variant forces this mapping to be revisited.
    let (as_prefill, as_decode) = match self.phase() {
        RequestPhase::Prefill => (true, false),
        RequestPhase::Decode => (false, true),
        RequestPhase::Aggregated => (true, true),
    };

    if as_prefill {
        self.record_prefill_worker_full(instance_id, dp_rank, worker_type);
    }
    if as_decode {
        self.record_decode_worker_full(instance_id, dp_rank, worker_type);
    }
}
/// Get worker ID information if any worker IDs have been recorded.
/// Get worker ID information if any worker IDs have been recorded.
pub
fn
get_worker_info
(
&
self
)
->
Option
<
WorkerIdInfo
>
{
pub
fn
get_worker_info
(
&
self
)
->
Option
<
WorkerIdInfo
>
{
let
prefill
=
self
.prefill_worker_id
.get
()
.copied
();
let
prefill
=
self
.prefill_worker_id
();
let
decode
=
self
.decode_worker_id
.get
()
.copied
();
let
decode
=
self
.decode_worker_id
();
if
prefill
.is_none
()
&&
decode
.is_none
()
{
if
prefill
.is_none
()
&&
decode
.is_none
()
{
return
None
;
return
None
;
...
@@ -241,10 +399,46 @@ impl RequestTracker {
...
@@ -241,10 +399,46 @@ impl RequestTracker {
Some
(
WorkerIdInfo
{
Some
(
WorkerIdInfo
{
prefill_worker_id
:
prefill
,
prefill_worker_id
:
prefill
,
prefill_dp_rank
:
self
.prefill_dp_rank
(),
decode_worker_id
:
decode
,
decode_worker_id
:
decode
,
decode_dp_rank
:
self
.decode_dp_rank
(),
})
})
}
}
/// Get the decode worker ID if recorded.
pub
fn
decode_worker_id
(
&
self
)
->
Option
<
u64
>
{
let
id
=
self
.decode_worker_id
.load
(
Ordering
::
SeqCst
);
if
id
==
NO_WORKER_ID
{
None
}
else
{
Some
(
id
)
}
}
/// Get the decode DP rank if recorded.
pub
fn
decode_dp_rank
(
&
self
)
->
Option
<
u32
>
{
let
rank
=
self
.decode_dp_rank
.load
(
Ordering
::
SeqCst
);
if
rank
==
NO_DP_RANK
{
None
}
else
{
Some
(
rank
)
}
}
/// Get the prefill worker ID if recorded.
pub
fn
prefill_worker_id
(
&
self
)
->
Option
<
u64
>
{
let
id
=
self
.prefill_worker_id
.load
(
Ordering
::
SeqCst
);
if
id
==
NO_WORKER_ID
{
None
}
else
{
Some
(
id
)
}
}
/// Get the prefill DP rank if recorded.
pub
fn
prefill_dp_rank
(
&
self
)
->
Option
<
u32
>
{
let
rank
=
self
.prefill_dp_rank
.load
(
Ordering
::
SeqCst
);
if
rank
==
NO_DP_RANK
{
None
}
else
{
Some
(
rank
)
}
}
/// Returns the prefill worker-type label, or `None` if it has not been set yet.
pub fn prefill_worker_type(&self) -> Option<&'static str> {
    let label = self.prefill_worker_type.get()?;
    Some(*label)
}
/// Returns the decode worker-type label, or `None` if it has not been set yet.
pub fn decode_worker_type(&self) -> Option<&'static str> {
    let label = self.decode_worker_type.get()?;
    Some(*label)
}
pub
fn
get_timing_info
(
&
self
)
->
TimingInfo
{
pub
fn
get_timing_info
(
&
self
)
->
TimingInfo
{
TimingInfo
{
TimingInfo
{
request_received_ms
:
self
.request_received_epoch_ms
,
request_received_ms
:
self
.request_received_epoch_ms
,
...
...
lib/llm/src/protocols/openai.rs
View file @
0c0336e6
...
@@ -232,6 +232,11 @@ pub trait DeltaGeneratorExt<ResponseType: Send + 'static + std::fmt::Debug>:
...
@@ -232,6 +232,11 @@ pub trait DeltaGeneratorExt<ResponseType: Send + 'static + std::fmt::Debug>:
/// Get the current usage statistics with properly calculated total_tokens.
/// Get the current usage statistics with properly calculated total_tokens.
fn
get_usage
(
&
self
)
->
dynamo_async_openai
::
types
::
CompletionUsage
;
fn
get_usage
(
&
self
)
->
dynamo_async_openai
::
types
::
CompletionUsage
;
/// Hands out the generator's request tracker, when it has one, so callers can read
/// worker and timing information recorded for the request.
///
/// This default implementation reports no tracker.
fn tracker(&self) -> Option<std::sync::Arc<common::timing::RequestTracker>> {
    None
}
}
}
#[derive(Clone,
Debug,
Serialize,
Deserialize,
Default)]
#[derive(Clone,
Debug,
Serialize,
Deserialize,
Default)]
...
...
lib/llm/src/protocols/openai/chat_completions/delta.rs
View file @
0c0336e6
...
@@ -153,12 +153,9 @@ impl DeltaGenerator {
...
@@ -153,12 +153,9 @@ impl DeltaGenerator {
let
chatcmpl_id
=
format!
(
"chatcmpl-{request_id}"
);
let
chatcmpl_id
=
format!
(
"chatcmpl-{request_id}"
);
// Create request tracker if tracking is enabled
// Always create request tracker for per-worker metrics (TTFT, ITL per worker_id).
let
tracker
=
if
options
.enable_tracking
{
// The enable_tracking option only controls whether timing info is included in the response.
Some
(
Arc
::
new
(
RequestTracker
::
new
()))
let
tracker
=
Some
(
Arc
::
new
(
RequestTracker
::
new
()));
}
else
{
None
};
Self
{
Self
{
id
:
chatcmpl_id
,
id
:
chatcmpl_id
,
...
@@ -504,6 +501,10 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamRes
...
@@ -504,6 +501,10 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamRes
fn
get_usage
(
&
self
)
->
dynamo_async_openai
::
types
::
CompletionUsage
{
fn
get_usage
(
&
self
)
->
dynamo_async_openai
::
types
::
CompletionUsage
{
DeltaGenerator
::
get_usage
(
self
)
DeltaGenerator
::
get_usage
(
self
)
}
}
/// Returns a new handle to this generator's request tracker, if one is present.
fn tracker(&self) -> Option<std::sync::Arc<crate::protocols::common::timing::RequestTracker>> {
    // Cloning the `Arc` only bumps the reference count; the tracker itself is shared.
    self.tracker.as_ref().map(std::sync::Arc::clone)
}
}
}
#[cfg(test)]
#[cfg(test)]
...
...
lib/llm/src/protocols/openai/completions/delta.rs
View file @
0c0336e6
...
@@ -120,12 +120,9 @@ impl DeltaGenerator {
...
@@ -120,12 +120,9 @@ impl DeltaGenerator {
let
completion_id
=
format!
(
"cmpl-{request_id}"
);
let
completion_id
=
format!
(
"cmpl-{request_id}"
);
// Create request tracker if tracking is enabled
// Always create request tracker for per-worker metrics (TTFT, ITL per worker_id).
let
tracker
=
if
options
.enable_tracking
{
// The enable_tracking option only controls whether timing info is included in the response.
Some
(
Arc
::
new
(
RequestTracker
::
new
()))
let
tracker
=
Some
(
Arc
::
new
(
RequestTracker
::
new
()));
}
else
{
None
};
Self
{
Self
{
id
:
completion_id
,
id
:
completion_id
,
...
@@ -398,4 +395,8 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateCompletionResponse> for
...
@@ -398,4 +395,8 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateCompletionResponse> for
fn
get_usage
(
&
self
)
->
dynamo_async_openai
::
types
::
CompletionUsage
{
fn
get_usage
(
&
self
)
->
dynamo_async_openai
::
types
::
CompletionUsage
{
DeltaGenerator
::
get_usage
(
self
)
DeltaGenerator
::
get_usage
(
self
)
}
}
/// Returns a new handle to this generator's request tracker, if one is present.
fn tracker(&self) -> Option<std::sync::Arc<crate::protocols::common::timing::RequestTracker>> {
    // Cloning the `Arc` only bumps the reference count; the tracker itself is shared.
    self.tracker.as_ref().map(std::sync::Arc::clone)
}
}
}
Prev
1
2
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment