Unverified Commit adfc02d5 authored by Karen Chung's avatar Karen Chung Committed by GitHub
Browse files

feat: (Router) Guard threshold-based rejection logic when CLI thresholds are unset (#8333)

parent 8fd7de9a
...@@ -45,6 +45,20 @@ def validate_model_path(value: str) -> str: ...@@ -45,6 +45,20 @@ def validate_model_path(value: str) -> str:
return value return value
def _nullable_float(value: str) -> Optional[float]:
"""Parse a float, or return None for the literal 'None'."""
if value is None or value == "None":
return None
return float(value)
def _nullable_int(value: str) -> Optional[int]:
"""Parse an int, or return None for the literal 'None'."""
if value is None or value == "None":
return None
return int(value)
class FrontendConfig(KvRouterConfigBase, AicPerfConfigBase): class FrontendConfig(KvRouterConfigBase, AicPerfConfigBase):
"""Configuration for the Dynamo frontend.""" """Configuration for the Dynamo frontend."""
...@@ -331,36 +345,37 @@ class FrontendArgGroup(ArgGroup): ...@@ -331,36 +345,37 @@ class FrontendArgGroup(ArgGroup):
g, g,
flag_name="--active-decode-blocks-threshold", flag_name="--active-decode-blocks-threshold",
env_var="DYN_ACTIVE_DECODE_BLOCKS_THRESHOLD", env_var="DYN_ACTIVE_DECODE_BLOCKS_THRESHOLD",
default=None, default=1.0,
help=( help=(
"Threshold percentage (0.0-1.0) for determining when a worker is considered busy " "Threshold fraction (0.0-1.0) of KV cache block utilization above which a worker "
"based on KV cache block utilization. If not set, blocks-based busy detection is disabled." "is considered busy. Pass 'None' on the CLI to disable this check. Default: 1.0."
), ),
arg_type=float, arg_type=_nullable_float,
) )
add_argument( add_argument(
g, g,
flag_name="--active-prefill-tokens-threshold", flag_name="--active-prefill-tokens-threshold",
env_var="DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD", env_var="DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD",
default=None, default=10_000_000,
help=( help=(
"Literal token count threshold for determining when a worker is considered busy " "Literal token count threshold for determining when a worker is considered busy "
"based on prefill token utilization. When active prefill tokens exceed this " "based on prefill token utilization. When active prefill tokens exceed this "
"threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled." "threshold, the worker is marked as busy. Pass 'None' on the CLI to disable this "
"check. Uses OR logic with --active-prefill-tokens-threshold-frac. Default: 10000000."
), ),
arg_type=int, arg_type=_nullable_int,
) )
add_argument( add_argument(
g, g,
flag_name="--active-prefill-tokens-threshold-frac", flag_name="--active-prefill-tokens-threshold-frac",
env_var="DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD_FRAC", env_var="DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD_FRAC",
default=None, default=10.0,
help=( help=(
"Fraction of max_num_batched_tokens for busy detection. Worker is busy when " "Fraction of max_num_batched_tokens for busy detection. Worker is busy when "
"active_prefill_tokens > frac * max_num_batched_tokens. Default 1.5 (disabled). " "active_prefill_tokens > frac * max_num_batched_tokens. Pass 'None' on the CLI to "
"Uses OR logic with --active-prefill-tokens-threshold." "disable this check. Uses OR logic with --active-prefill-tokens-threshold. Default: 10.0."
), ),
arg_type=float, arg_type=_nullable_float,
) )
add_argument( add_argument(
g, g,
......
...@@ -68,9 +68,9 @@ When enabled, the frontend's embedded KV router predicts one expected prefill du ...@@ -68,9 +68,9 @@ When enabled, the frontend's embedded KV router predicts one expected prefill du
| CLI Argument | Env Var | Default | Description | | CLI Argument | Env Var | Default | Description |
|-------------|---------|---------|-------------| |-------------|---------|---------|-------------|
| `--migration-limit` | `DYN_MIGRATION_LIMIT` | `0` | Max request migrations per worker disconnect. 0 = disabled | | `--migration-limit` | `DYN_MIGRATION_LIMIT` | `0` | Max request migrations per worker disconnect. 0 = disabled |
| `--active-decode-blocks-threshold` | `DYN_ACTIVE_DECODE_BLOCKS_THRESHOLD` | | KV cache utilization fraction (0.0–1.0) for busy detection | | `--active-decode-blocks-threshold` | `DYN_ACTIVE_DECODE_BLOCKS_THRESHOLD` | `1.0` | KV cache utilization fraction (0.0–1.0) for busy detection. Pass `None` to disable |
| `--active-prefill-tokens-threshold` | `DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD` | | Absolute token count for prefill busy detection | | `--active-prefill-tokens-threshold` | `DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD` | `10000000` | Absolute token count for prefill busy detection. Pass `None` to disable |
| `--active-prefill-tokens-threshold-frac` | `DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD_FRAC` | | Fraction of `max_num_batched_tokens` for prefill busy detection. OR logic with absolute threshold | | `--active-prefill-tokens-threshold-frac` | `DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD_FRAC` | `10.0` | Fraction of `max_num_batched_tokens` for prefill busy detection. OR logic with absolute threshold. Pass `None` to disable |
## Model Discovery ## Model Discovery
......
...@@ -78,8 +78,9 @@ See [Health Checks](../observability/health-checks.md) for details. ...@@ -78,8 +78,9 @@ See [Health Checks](../observability/health-checks.md) for details.
| Canary health checks | `DYN_HEALTH_CHECK_ENABLED` | `false` | | Canary health checks | `DYN_HEALTH_CHECK_ENABLED` | `false` |
| Canary wait time | `DYN_CANARY_WAIT_TIME` | `10` seconds | | Canary wait time | `DYN_CANARY_WAIT_TIME` | `10` seconds |
| Health check timeout | `DYN_HEALTH_CHECK_REQUEST_TIMEOUT` | `3` seconds | | Health check timeout | `DYN_HEALTH_CHECK_REQUEST_TIMEOUT` | `3` seconds |
| Decode blocks threshold | `--active-decode-blocks-threshold` | None (disabled) | | Decode blocks threshold | `DYN_ACTIVE_DECODE_BLOCKS_THRESHOLD` | `1.0` |
| Prefill tokens threshold | `--active-prefill-tokens-threshold` | None (disabled) | | Prefill tokens threshold | `DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD` | `10000000` |
## Failure Scenarios and Recovery ## Failure Scenarios and Recovery
......
...@@ -876,8 +876,8 @@ impl ModelWatcher { ...@@ -876,8 +876,8 @@ impl ModelWatcher {
let push_router = PushRouter::< let push_router = PushRouter::<
NvCreateEmbeddingRequest, NvCreateEmbeddingRequest,
Annotated<NvCreateEmbeddingResponse>, Annotated<NvCreateEmbeddingResponse>,
>::from_client_with_threshold( >::from_client_with_monitor(
client, self.router_config.router_mode, None, None client, self.router_config.router_mode, None
) )
.await?; .await?;
worker_set.embeddings_engine = Some(Arc::new(push_router)); worker_set.embeddings_engine = Some(Arc::new(push_router));
...@@ -896,11 +896,8 @@ impl ModelWatcher { ...@@ -896,11 +896,8 @@ impl ModelWatcher {
let chat_router = PushRouter::< let chat_router = PushRouter::<
NvCreateChatCompletionRequest, NvCreateChatCompletionRequest,
Annotated<NvCreateChatCompletionStreamResponse>, Annotated<NvCreateChatCompletionStreamResponse>,
>::from_client_with_threshold( >::from_client_with_monitor(
client.clone(), client.clone(), self.router_config.router_mode, None
self.router_config.router_mode,
None,
None,
) )
.await?; .await?;
worker_set.chat_engine = Some(Arc::new(chat_router)); worker_set.chat_engine = Some(Arc::new(chat_router));
...@@ -910,8 +907,8 @@ impl ModelWatcher { ...@@ -910,8 +907,8 @@ impl ModelWatcher {
let images_router = PushRouter::< let images_router = PushRouter::<
NvCreateImageRequest, NvCreateImageRequest,
Annotated<NvImagesResponse>, Annotated<NvImagesResponse>,
>::from_client_with_threshold( >::from_client_with_monitor(
client.clone(), self.router_config.router_mode, None, None client.clone(), self.router_config.router_mode, None
) )
.await?; .await?;
worker_set.images_engine = Some(Arc::new(images_router)); worker_set.images_engine = Some(Arc::new(images_router));
...@@ -921,8 +918,8 @@ impl ModelWatcher { ...@@ -921,8 +918,8 @@ impl ModelWatcher {
let videos_router = PushRouter::< let videos_router = PushRouter::<
NvCreateVideoRequest, NvCreateVideoRequest,
Annotated<NvVideosResponse>, Annotated<NvVideosResponse>,
>::from_client_with_threshold( >::from_client_with_monitor(
client.clone(), self.router_config.router_mode, None, None client.clone(), self.router_config.router_mode, None
) )
.await?; .await?;
worker_set.videos_engine = Some(Arc::new(videos_router)); worker_set.videos_engine = Some(Arc::new(videos_router));
...@@ -932,11 +929,8 @@ impl ModelWatcher { ...@@ -932,11 +929,8 @@ impl ModelWatcher {
let audios_router = PushRouter::< let audios_router = PushRouter::<
NvCreateAudioSpeechRequest, NvCreateAudioSpeechRequest,
Annotated<NvAudioSpeechResponse>, Annotated<NvAudioSpeechResponse>,
>::from_client_with_threshold( >::from_client_with_monitor(
client.clone(), client.clone(), self.router_config.router_mode, None
self.router_config.router_mode,
None,
None,
) )
.await?; .await?;
worker_set.audios_engine = Some(Arc::new(audios_router)); worker_set.audios_engine = Some(Arc::new(audios_router));
...@@ -946,8 +940,8 @@ impl ModelWatcher { ...@@ -946,8 +940,8 @@ impl ModelWatcher {
let push_router = PushRouter::< let push_router = PushRouter::<
NvCreateChatCompletionRequest, NvCreateChatCompletionRequest,
Annotated<NvCreateChatCompletionStreamResponse>, Annotated<NvCreateChatCompletionStreamResponse>,
>::from_client_with_threshold( >::from_client_with_monitor(
client, self.router_config.router_mode, None, None client, self.router_config.router_mode, None
) )
.await?; .await?;
worker_set.chat_engine = Some(Arc::new(push_router)); worker_set.chat_engine = Some(Arc::new(push_router));
...@@ -956,8 +950,8 @@ impl ModelWatcher { ...@@ -956,8 +950,8 @@ impl ModelWatcher {
let push_router = PushRouter::< let push_router = PushRouter::<
NvCreateCompletionRequest, NvCreateCompletionRequest,
Annotated<NvCreateCompletionResponse>, Annotated<NvCreateCompletionResponse>,
>::from_client_with_threshold( >::from_client_with_monitor(
client, self.router_config.router_mode, None, None client, self.router_config.router_mode, None
) )
.await?; .await?;
worker_set.completions_engine = Some(Arc::new(push_router)); worker_set.completions_engine = Some(Arc::new(push_router));
...@@ -975,8 +969,8 @@ impl ModelWatcher { ...@@ -975,8 +969,8 @@ impl ModelWatcher {
let router = PushRouter::< let router = PushRouter::<
PreprocessedEmbeddingRequest, PreprocessedEmbeddingRequest,
Annotated<EmbeddingsEngineOutput>, Annotated<EmbeddingsEngineOutput>,
>::from_client_with_threshold( >::from_client_with_monitor(
client, self.router_config.router_mode, None, None client, self.router_config.router_mode, None
) )
.await?; .await?;
...@@ -999,8 +993,8 @@ impl ModelWatcher { ...@@ -999,8 +993,8 @@ impl ModelWatcher {
let push_router = PushRouter::< let push_router = PushRouter::<
NvCreateTensorRequest, NvCreateTensorRequest,
Annotated<NvCreateTensorResponse>, Annotated<NvCreateTensorResponse>,
>::from_client_with_threshold( >::from_client_with_monitor(
client, self.router_config.router_mode, None, None client, self.router_config.router_mode, None
) )
.await?; .await?;
worker_set.tensor_engine = Some(Arc::new(push_router)); worker_set.tensor_engine = Some(Arc::new(push_router));
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify; use tokio::sync::Notify;
...@@ -52,20 +52,16 @@ fn cleanup_worker_metrics(worker_id: u64, dp_ranks: &[u32], worker_type: &str) { ...@@ -52,20 +52,16 @@ fn cleanup_worker_metrics(worker_id: u64, dp_ranks: &[u32], worker_type: &str) {
let _ = WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE.remove_label_values(unset_labels); let _ = WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE.remove_label_values(unset_labels);
} }
/// Scale factor for storing f64 thresholds as u32 (10000 = 4 decimal places) /// Default value for `max_num_batched_tokens` when the runtime config does not
const THRESHOLD_SCALE: u32 = 10000; /// report it. Set high enough that the frac-based busy check (which multiplies
/// this value by the threshold fraction) can never fire with realistic loads.
/// Default value for max_num_batched_tokens and active_prefill_tokens_threshold
/// when not configured. Set high enough to effectively disable busy detection.
const DEFAULT_MAX_TOKENS: u64 = 10_000_000; const DEFAULT_MAX_TOKENS: u64 = 10_000_000;
/// Configuration for worker load thresholds used in busy detection. /// Configuration for worker load thresholds used in busy detection.
/// ///
/// All thresholds are optional. When not set, defaults are applied: /// All thresholds are opt-in. An unset (`None`) field means the corresponding
/// - `active_decode_blocks_threshold`: 1.0 (effectively disabled) /// check is skipped entirely — it never contributes to a worker being marked
/// - `active_prefill_tokens_threshold`: 10,000,000 (effectively disabled) /// busy. If all three are `None`, busy-based rejection is fully disabled.
/// - `active_prefill_tokens_threshold_frac`: 1.5 (effectively disabled)
/// - `max_num_batched_tokens` (from runtime config): 10,000,000 if not reported
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct LoadThresholdConfig { pub struct LoadThresholdConfig {
/// KV cache block utilization threshold (0.0-1.0). /// KV cache block utilization threshold (0.0-1.0).
...@@ -78,7 +74,7 @@ pub struct LoadThresholdConfig { ...@@ -78,7 +74,7 @@ pub struct LoadThresholdConfig {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub active_prefill_tokens_threshold: Option<u64>, pub active_prefill_tokens_threshold: Option<u64>,
/// Fraction of max_num_batched_tokens (0.0-1.5+). /// Fraction of max_num_batched_tokens.
/// Worker is busy when `active_prefill_tokens > frac * max_num_batched_tokens`. /// Worker is busy when `active_prefill_tokens > frac * max_num_batched_tokens`.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub active_prefill_tokens_threshold_frac: Option<f64>, pub active_prefill_tokens_threshold_frac: Option<f64>,
...@@ -200,7 +196,7 @@ impl WorkerLoadState { ...@@ -200,7 +196,7 @@ impl WorkerLoadState {
fn update_from_active_load( fn update_from_active_load(
&mut self, &mut self,
active_load: &ActiveLoad, active_load: &ActiveLoad,
active_decode_blocks_threshold: f64, active_decode_blocks_threshold: Option<f64>,
) { ) {
let dp_rank = active_load.dp_rank; let dp_rank = active_load.dp_rank;
if let Some(active_blocks) = active_load.active_decode_blocks { if let Some(active_blocks) = active_load.active_decode_blocks {
...@@ -212,30 +208,42 @@ impl WorkerLoadState { ...@@ -212,30 +208,42 @@ impl WorkerLoadState {
if let Some(active_tokens) = active_load.active_prefill_tokens { if let Some(active_tokens) = active_load.active_prefill_tokens {
self.active_prefill_tokens.insert(dp_rank, active_tokens); self.active_prefill_tokens.insert(dp_rank, active_tokens);
} }
self.update_decode_busy_latch( if let Some(threshold) = active_decode_blocks_threshold {
dp_rank, self.update_decode_busy_latch(
active_load.active_decode_blocks, dp_rank,
active_load.kv_used_blocks, active_load.active_decode_blocks,
active_decode_blocks_threshold, active_load.kv_used_blocks,
); threshold,
);
}
} }
/// Returns true if ALL dp_ranks are considered busy based on the threshold logic. /// Returns true if ALL dp_ranks are considered busy based on the threshold logic.
/// ///
/// For each dp_rank, a dp_rank is busy if ANY of these conditions is met (OR logic): /// Each threshold is `Option<T>`. A `None` threshold means that check is
/// 1. `active_prefill_tokens > active_prefill_tokens_threshold` (absolute threshold) /// skipped entirely — it cannot contribute to a dp_rank being busy. If all
/// 2. `active_prefill_tokens > frac * max_num_batched_tokens` (fraction-based threshold) /// three thresholds are `None`, no dp_rank is ever busy.
/// 3. decode busy latch set by either `kv_used_blocks` or `active_decode_blocks`
/// ///
/// If none of these checks can be performed (missing data), that dp_rank is considered free. /// For each dp_rank, a dp_rank is busy if ANY of these conditions is met (OR logic):
/// 1. `active_prefill_tokens > active_prefill_tokens_threshold` (absolute, if set)
/// 2. `active_prefill_tokens > frac * max_num_batched_tokens` (fractional, if set)
/// 3. decode busy latch set by either `kv_used_blocks` or `active_decode_blocks` (if set)
/// ///
/// The worker is busy only if ALL dp_ranks are busy. /// The worker is busy only if ALL dp_ranks are busy.
pub fn is_busy( pub fn is_busy(
&self, &self,
active_decode_blocks_threshold: f64, active_decode_blocks_threshold: Option<f64>,
active_prefill_tokens_threshold: u64, active_prefill_tokens_threshold: Option<u64>,
active_prefill_tokens_threshold_frac: f64, active_prefill_tokens_threshold_frac: Option<f64>,
) -> bool { ) -> bool {
// Short-circuit if all thresholds are unset (i.e. no busy check can fire)
if active_decode_blocks_threshold.is_none()
&& active_prefill_tokens_threshold.is_none()
&& active_prefill_tokens_threshold_frac.is_none()
{
return false;
}
// Get all dp_ranks we know about // Get all dp_ranks we know about
let all_dp_ranks: std::collections::HashSet<_> = self let all_dp_ranks: std::collections::HashSet<_> = self
.active_decode_blocks .active_decode_blocks
...@@ -255,30 +263,36 @@ impl WorkerLoadState { ...@@ -255,30 +263,36 @@ impl WorkerLoadState {
all_dp_ranks.iter().all(|&dp_rank| { all_dp_ranks.iter().all(|&dp_rank| {
// Check 1: prefill tokens threshold (absolute token count) // Check 1: prefill tokens threshold (absolute token count)
if let Some(&active_tokens) = self.active_prefill_tokens.get(&dp_rank) { if let Some(&active_tokens) = self.active_prefill_tokens.get(&dp_rank) {
if active_tokens > active_prefill_tokens_threshold { if let Some(abs_threshold) = active_prefill_tokens_threshold
&& active_tokens > abs_threshold
{
return true; // This dp_rank is busy due to absolute token threshold return true; // This dp_rank is busy due to absolute token threshold
} }
// Check 2: prefill tokens threshold (fraction of max_num_batched_tokens) // Check 2: prefill tokens threshold (fraction of max_num_batched_tokens)
let max_batched = self if let Some(frac) = active_prefill_tokens_threshold_frac {
.max_num_batched_tokens let max_batched = self
.get(&dp_rank) .max_num_batched_tokens
.copied() .get(&dp_rank)
.unwrap_or(DEFAULT_MAX_TOKENS); .copied()
let frac_threshold = .unwrap_or(DEFAULT_MAX_TOKENS);
(active_prefill_tokens_threshold_frac * max_batched as f64) as u64; let frac_threshold = (frac * max_batched as f64) as u64;
if active_tokens > frac_threshold { if active_tokens > frac_threshold {
return true; // This dp_rank is busy due to frac-based token threshold return true;
}
} }
} }
// Check 3: decode busy latch // Check 3: decode busy latch (OR-ed from kv_used_blocks and active_decode_blocks)
if let Some(latch) = self.decode_busy_latches.get(&dp_rank) { if let Some(decode_threshold) = active_decode_blocks_threshold {
if latch.latched_busy { let is_busy = self
.decode_busy_latches
.get(&dp_rank)
.map(|latch| latch.latched_busy)
.unwrap_or_else(|| self.current_decode_busy(dp_rank, decode_threshold));
if is_busy {
return true; return true;
} }
} else if self.current_decode_busy(dp_rank, active_decode_blocks_threshold) {
return true;
} }
// If we can't perform any check or no threshold exceeded, this dp_rank is free // If we can't perform any check or no threshold exceeded, this dp_rank is free
...@@ -307,12 +321,10 @@ pub struct KvWorkerMonitor { ...@@ -307,12 +321,10 @@ pub struct KvWorkerMonitor {
/// Notifies the monitoring task when a prefill client is registered /// Notifies the monitoring task when a prefill client is registered
prefill_client_notify: Arc<Notify>, prefill_client_notify: Arc<Notify>,
worker_load_states: Arc<DashMap<u64, WorkerLoadState>>, worker_load_states: Arc<DashMap<u64, WorkerLoadState>>,
/// Active decode blocks threshold stored as parts-per-10000 (e.g., 8500 = 0.85) /// Load thresholds for busy detection. Each field is `Option<T>` — unset
active_decode_blocks_threshold: Arc<AtomicU32>, /// means the corresponding check in `is_busy` is skipped. If all three are
/// Active prefill tokens threshold stored as literal token count (u64) /// `None`, rejection is fully disabled.
active_prefill_tokens_threshold: Arc<AtomicU64>, thresholds: Arc<RwLock<LoadThresholdConfig>>,
/// Active prefill tokens threshold as fraction of max_num_batched_tokens, stored scaled
active_prefill_tokens_threshold_frac: Arc<AtomicU32>,
/// Guard to ensure start_monitoring() only runs once across clones /// Guard to ensure start_monitoring() only runs once across clones
started: Arc<AtomicBool>, started: Arc<AtomicBool>,
} }
...@@ -320,13 +332,10 @@ pub struct KvWorkerMonitor { ...@@ -320,13 +332,10 @@ pub struct KvWorkerMonitor {
impl KvWorkerMonitor { impl KvWorkerMonitor {
/// Create a new worker monitor with the given threshold configuration. /// Create a new worker monitor with the given threshold configuration.
/// ///
/// All thresholds can be dynamically updated via setter methods or /// Unset thresholds (`None`) remain unset and their corresponding checks
/// `set_load_threshold_config()`. /// in `is_busy` are skipped. Thresholds can be updated at runtime via
/// /// [`set_load_threshold_config`](Self::set_load_threshold_config) or the
/// Defaults are applied for any threshold not specified in the config: /// individual setters.
/// - `active_decode_blocks_threshold`: 1.0 (effectively disabled)
/// - `active_prefill_tokens_threshold`: DEFAULT_MAX_TOKENS (effectively disabled)
/// - `active_prefill_tokens_threshold_frac`: 1.5 (effectively disabled)
/// ///
/// Prometheus metrics are exposed via [`WORKER_LOAD_METRICS`] and should be registered /// Prometheus metrics are exposed via [`WORKER_LOAD_METRICS`] and should be registered
/// using [`register_worker_load_metrics`](crate::kv_router::metrics::register_worker_load_metrics) /// using [`register_worker_load_metrics`](crate::kv_router::metrics::register_worker_load_metrics)
...@@ -335,28 +344,25 @@ impl KvWorkerMonitor { ...@@ -335,28 +344,25 @@ impl KvWorkerMonitor {
/// For disaggregated mode, call `set_prefill_client` after creation to enable /// For disaggregated mode, call `set_prefill_client` after creation to enable
/// proper TTFT metric cleanup when prefill workers are removed. /// proper TTFT metric cleanup when prefill workers are removed.
pub fn new(client: Client, config: LoadThresholdConfig) -> Self { pub fn new(client: Client, config: LoadThresholdConfig) -> Self {
let active_decode_blocks = config.active_decode_blocks_threshold.unwrap_or(1.0);
let active_prefill_tokens = config
.active_prefill_tokens_threshold
.unwrap_or(DEFAULT_MAX_TOKENS);
let active_prefill_tokens_frac = config.active_prefill_tokens_threshold_frac.unwrap_or(1.5);
Self { Self {
client, client,
prefill_client: Arc::new(RwLock::new(None)), prefill_client: Arc::new(RwLock::new(None)),
prefill_client_notify: Arc::new(Notify::new()), prefill_client_notify: Arc::new(Notify::new()),
worker_load_states: Arc::new(DashMap::new()), worker_load_states: Arc::new(DashMap::new()),
active_decode_blocks_threshold: Arc::new(AtomicU32::new(Self::f64_to_scaled( thresholds: Arc::new(RwLock::new(config)),
active_decode_blocks,
))),
active_prefill_tokens_threshold: Arc::new(AtomicU64::new(active_prefill_tokens)),
active_prefill_tokens_threshold_frac: Arc::new(AtomicU32::new(Self::f64_to_scaled(
active_prefill_tokens_frac,
))),
started: Arc::new(AtomicBool::new(false)), started: Arc::new(AtomicBool::new(false)),
} }
} }
/// Returns true iff the user explicitly configured at least one threshold.
///
/// When false, all three per-field checks are skipped in `is_busy` and
/// rejection is fully disabled. Callers that gate 503 responses on busy
/// detection should check this before enabling the gate.
pub fn is_configured(&self) -> bool {
self.thresholds.read().unwrap().is_configured()
}
/// Set the prefill client for disaggregated mode. /// Set the prefill client for disaggregated mode.
/// ///
/// This enables monitoring of prefill endpoint instances for TTFT metric cleanup. /// This enables monitoring of prefill endpoint instances for TTFT metric cleanup.
...@@ -368,79 +374,77 @@ impl KvWorkerMonitor { ...@@ -368,79 +374,77 @@ impl KvWorkerMonitor {
pub fn set_prefill_client(&self, prefill_client: Client) { pub fn set_prefill_client(&self, prefill_client: Client) {
let mut guard = self.prefill_client.write().unwrap(); let mut guard = self.prefill_client.write().unwrap();
*guard = Some(prefill_client); *guard = Some(prefill_client);
// Notify the monitoring task that prefill client is now available
self.prefill_client_notify.notify_one(); self.prefill_client_notify.notify_one();
tracing::debug!("KvWorkerMonitor: prefill client registered for TTFT cleanup"); tracing::debug!("KvWorkerMonitor: prefill client registered for TTFT cleanup");
} }
/// Convert a f64 threshold to scaled u32 for atomic storage. /// Get the current active decode blocks threshold, if configured.
#[inline] pub fn active_decode_blocks_threshold(&self) -> Option<f64> {
fn f64_to_scaled(threshold: f64) -> u32 { self.thresholds
(threshold * THRESHOLD_SCALE as f64) as u32 .read()
} .unwrap()
.active_decode_blocks_threshold
/// Convert a scaled u32 back to f64 threshold.
#[inline]
fn scaled_to_f64(scaled: u32) -> f64 {
scaled as f64 / THRESHOLD_SCALE as f64
}
/// Get the current active decode blocks threshold value as f64.
pub fn active_decode_blocks_threshold(&self) -> f64 {
Self::scaled_to_f64(self.active_decode_blocks_threshold.load(Ordering::Relaxed))
} }
/// Set the active decode blocks threshold value from f64. /// Set the active decode blocks threshold.
pub fn set_active_decode_blocks_threshold(&self, threshold: f64) { pub fn set_active_decode_blocks_threshold(&self, threshold: f64) {
self.active_decode_blocks_threshold self.thresholds
.store(Self::f64_to_scaled(threshold), Ordering::Relaxed); .write()
.unwrap()
.active_decode_blocks_threshold = Some(threshold);
} }
/// Get the current active prefill tokens threshold value as u64. /// Get the current active prefill tokens threshold, if configured.
pub fn active_prefill_tokens_threshold(&self) -> u64 { pub fn active_prefill_tokens_threshold(&self) -> Option<u64> {
self.active_prefill_tokens_threshold.load(Ordering::Relaxed) self.thresholds
.read()
.unwrap()
.active_prefill_tokens_threshold
} }
/// Set the active prefill tokens threshold value from u64. /// Set the active prefill tokens threshold.
pub fn set_active_prefill_tokens_threshold(&self, threshold: u64) { pub fn set_active_prefill_tokens_threshold(&self, threshold: u64) {
self.active_prefill_tokens_threshold self.thresholds
.store(threshold, Ordering::Relaxed); .write()
.unwrap()
.active_prefill_tokens_threshold = Some(threshold);
} }
/// Get the current active prefill tokens threshold frac value as f64. /// Get the current active prefill tokens threshold frac, if configured.
pub fn active_prefill_tokens_threshold_frac(&self) -> f64 { pub fn active_prefill_tokens_threshold_frac(&self) -> Option<f64> {
Self::scaled_to_f64( self.thresholds
self.active_prefill_tokens_threshold_frac .read()
.load(Ordering::Relaxed), .unwrap()
) .active_prefill_tokens_threshold_frac
} }
/// Set the active prefill tokens threshold frac value from f64. /// Set the active prefill tokens threshold frac.
pub fn set_active_prefill_tokens_threshold_frac(&self, frac: f64) { pub fn set_active_prefill_tokens_threshold_frac(&self, frac: f64) {
self.active_prefill_tokens_threshold_frac self.thresholds
.store(Self::f64_to_scaled(frac), Ordering::Relaxed); .write()
.unwrap()
.active_prefill_tokens_threshold_frac = Some(frac);
} }
/// Get the current load threshold configuration. /// Get the current load threshold configuration. Unset fields are returned
/// as `None` (no spurious fallback values).
pub fn load_threshold_config(&self) -> LoadThresholdConfig { pub fn load_threshold_config(&self) -> LoadThresholdConfig {
LoadThresholdConfig { self.thresholds.read().unwrap().clone()
active_decode_blocks_threshold: Some(self.active_decode_blocks_threshold()),
active_prefill_tokens_threshold: Some(self.active_prefill_tokens_threshold()),
active_prefill_tokens_threshold_frac: Some(self.active_prefill_tokens_threshold_frac()),
}
} }
/// Update all thresholds from a LoadThresholdConfig. /// Update thresholds from a `LoadThresholdConfig`. Only fields that are
/// Only updates fields that are Some in the config. /// `Some` in the input overwrite their counterparts; `None` fields leave
/// the existing value untouched.
pub fn set_load_threshold_config(&self, config: &LoadThresholdConfig) { pub fn set_load_threshold_config(&self, config: &LoadThresholdConfig) {
if let Some(threshold) = config.active_decode_blocks_threshold { let mut guard = self.thresholds.write().unwrap();
self.set_active_decode_blocks_threshold(threshold); if let Some(v) = config.active_decode_blocks_threshold {
guard.active_decode_blocks_threshold = Some(v);
} }
if let Some(threshold) = config.active_prefill_tokens_threshold { if let Some(v) = config.active_prefill_tokens_threshold {
self.set_active_prefill_tokens_threshold(threshold); guard.active_prefill_tokens_threshold = Some(v);
} }
if let Some(frac) = config.active_prefill_tokens_threshold_frac { if let Some(v) = config.active_prefill_tokens_threshold_frac {
self.set_active_prefill_tokens_threshold_frac(frac); guard.active_prefill_tokens_threshold_frac = Some(v);
} }
} }
} }
...@@ -507,10 +511,7 @@ impl WorkerLoadMonitor for KvWorkerMonitor { ...@@ -507,10 +511,7 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
let client = self.client.clone(); let client = self.client.clone();
let prefill_client_holder = self.prefill_client.clone(); let prefill_client_holder = self.prefill_client.clone();
let prefill_client_notify = self.prefill_client_notify.clone(); let prefill_client_notify = self.prefill_client_notify.clone();
let active_decode_blocks_threshold = self.active_decode_blocks_threshold.clone(); let thresholds = self.thresholds.clone();
let active_prefill_tokens_threshold = self.active_prefill_tokens_threshold.clone();
let active_prefill_tokens_threshold_frac =
self.active_prefill_tokens_threshold_frac.clone();
// Spawn background monitoring task // Spawn background monitoring task
tokio::spawn(async move { tokio::spawn(async move {
...@@ -626,13 +627,9 @@ impl WorkerLoadMonitor for KvWorkerMonitor { ...@@ -626,13 +627,9 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
.or_default() .or_default()
.insert(dp_rank); .insert(dp_rank);
// Load thresholds dynamically - allows runtime updates // Snapshot thresholds once per event — rare writes (HTTP endpoint)
let current_active_decode_blocks_threshold = // mean RwLock contention is effectively zero.
Self::scaled_to_f64(active_decode_blocks_threshold.load(Ordering::Relaxed)); let cfg = thresholds.read().unwrap().clone();
let current_active_prefill_tokens_threshold =
active_prefill_tokens_threshold.load(Ordering::Relaxed);
let current_active_prefill_tokens_threshold_frac =
Self::scaled_to_f64(active_prefill_tokens_threshold_frac.load(Ordering::Relaxed));
// Update worker load state per dp_rank (for busy detection only) // Update worker load state per dp_rank (for busy detection only)
// Note: Prometheus gauges are updated directly by sequence.rs // Note: Prometheus gauges are updated directly by sequence.rs
...@@ -640,7 +637,7 @@ impl WorkerLoadMonitor for KvWorkerMonitor { ...@@ -640,7 +637,7 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
let mut state = worker_load_states.entry(worker_id).or_default(); let mut state = worker_load_states.entry(worker_id).or_default();
state.update_from_active_load( state.update_from_active_load(
&active_load, &active_load,
current_active_decode_blocks_threshold, cfg.active_decode_blocks_threshold,
); );
} }
...@@ -651,9 +648,9 @@ impl WorkerLoadMonitor for KvWorkerMonitor { ...@@ -651,9 +648,9 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
entry entry
.value() .value()
.is_busy( .is_busy(
current_active_decode_blocks_threshold, cfg.active_decode_blocks_threshold,
current_active_prefill_tokens_threshold, cfg.active_prefill_tokens_threshold,
current_active_prefill_tokens_threshold_frac, cfg.active_prefill_tokens_threshold_frac,
) )
.then_some(*entry.key()) .then_some(*entry.key())
}) })
...@@ -771,9 +768,51 @@ impl WorkerLoadMonitor for KvWorkerMonitor { ...@@ -771,9 +768,51 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::WorkerLoadState; use super::{LoadThresholdConfig, WorkerLoadState};
use dynamo_kv_router::protocols::ActiveLoad; use dynamo_kv_router::protocols::ActiveLoad;
#[test]
fn load_threshold_config_default_is_not_configured() {
assert!(!LoadThresholdConfig::default().is_configured());
}
#[test]
fn load_threshold_config_decode_only_is_configured() {
let config = LoadThresholdConfig {
active_decode_blocks_threshold: Some(0.85),
..Default::default()
};
assert!(config.is_configured());
}
#[test]
fn load_threshold_config_prefill_tokens_only_is_configured() {
let config = LoadThresholdConfig {
active_prefill_tokens_threshold: Some(10_000),
..Default::default()
};
assert!(config.is_configured());
}
#[test]
fn load_threshold_config_prefill_frac_only_is_configured() {
let config = LoadThresholdConfig {
active_prefill_tokens_threshold_frac: Some(0.9),
..Default::default()
};
assert!(config.is_configured());
}
#[test]
fn load_threshold_config_all_set_is_configured() {
let config = LoadThresholdConfig {
active_decode_blocks_threshold: Some(0.85),
active_prefill_tokens_threshold: Some(10_000),
active_prefill_tokens_threshold_frac: Some(0.9),
};
assert!(config.is_configured());
}
#[test] #[test]
fn is_busy_prefers_kv_used_blocks_over_active_decode_blocks() { fn is_busy_prefers_kv_used_blocks_over_active_decode_blocks() {
let mut state = WorkerLoadState::default(); let mut state = WorkerLoadState::default();
...@@ -781,7 +820,7 @@ mod tests { ...@@ -781,7 +820,7 @@ mod tests {
state.kv_used_blocks.insert(0, 90); state.kv_used_blocks.insert(0, 90);
state.kv_total_blocks.insert(0, 100); state.kv_total_blocks.insert(0, 100);
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
} }
#[test] #[test]
...@@ -790,7 +829,7 @@ mod tests { ...@@ -790,7 +829,7 @@ mod tests {
state.active_decode_blocks.insert(0, 90); state.active_decode_blocks.insert(0, 90);
state.kv_total_blocks.insert(0, 100); state.kv_total_blocks.insert(0, 100);
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
} }
#[test] #[test]
...@@ -799,7 +838,7 @@ mod tests { ...@@ -799,7 +838,7 @@ mod tests {
state.kv_used_blocks.insert(0, 90); state.kv_used_blocks.insert(0, 90);
state.kv_total_blocks.insert(0, 100); state.kv_total_blocks.insert(0, 100);
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
} }
#[test] #[test]
...@@ -814,10 +853,10 @@ mod tests { ...@@ -814,10 +853,10 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: Some(90), kv_used_blocks: Some(90),
}, },
0.6, Some(0.6),
); );
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
} }
#[test] #[test]
...@@ -833,9 +872,9 @@ mod tests { ...@@ -833,9 +872,9 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: Some(90), kv_used_blocks: Some(90),
}, },
0.6, Some(0.6),
); );
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
state.update_from_active_load( state.update_from_active_load(
&ActiveLoad { &ActiveLoad {
...@@ -845,9 +884,9 @@ mod tests { ...@@ -845,9 +884,9 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: None, kv_used_blocks: None,
}, },
0.6, Some(0.6),
); );
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
state.update_from_active_load( state.update_from_active_load(
&ActiveLoad { &ActiveLoad {
...@@ -857,9 +896,9 @@ mod tests { ...@@ -857,9 +896,9 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: Some(10), kv_used_blocks: Some(10),
}, },
0.6, Some(0.6),
); );
assert!(!state.is_busy(0.6, u64::MAX, 2.0)); assert!(!state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
} }
#[test] #[test]
...@@ -875,9 +914,9 @@ mod tests { ...@@ -875,9 +914,9 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: Some(90), kv_used_blocks: Some(90),
}, },
0.6, Some(0.6),
); );
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
state.update_from_active_load( state.update_from_active_load(
&ActiveLoad { &ActiveLoad {
...@@ -887,9 +926,9 @@ mod tests { ...@@ -887,9 +926,9 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: Some(10), kv_used_blocks: Some(10),
}, },
0.6, Some(0.6),
); );
assert!(!state.is_busy(0.6, u64::MAX, 2.0)); assert!(!state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
} }
#[test] #[test]
...@@ -905,9 +944,9 @@ mod tests { ...@@ -905,9 +944,9 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: None, kv_used_blocks: None,
}, },
0.6, Some(0.6),
); );
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
state.update_from_active_load( state.update_from_active_load(
&ActiveLoad { &ActiveLoad {
...@@ -917,9 +956,9 @@ mod tests { ...@@ -917,9 +956,9 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: None, kv_used_blocks: None,
}, },
0.6, Some(0.6),
); );
assert!(!state.is_busy(0.6, u64::MAX, 2.0)); assert!(!state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
} }
#[test] #[test]
...@@ -935,9 +974,9 @@ mod tests { ...@@ -935,9 +974,9 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: None, kv_used_blocks: None,
}, },
0.6, Some(0.6),
); );
assert!(state.is_busy(0.6, u64::MAX, 2.0)); assert!(state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
state.update_from_active_load( state.update_from_active_load(
&ActiveLoad { &ActiveLoad {
...@@ -947,8 +986,82 @@ mod tests { ...@@ -947,8 +986,82 @@ mod tests {
active_prefill_tokens: None, active_prefill_tokens: None,
kv_used_blocks: Some(10), kv_used_blocks: Some(10),
}, },
0.6, Some(0.6),
);
assert!(!state.is_busy(Some(0.6), Some(u64::MAX), Some(2.0)));
}
#[test]
fn is_busy_returns_false_when_all_thresholds_are_none() {
let mut state = WorkerLoadState::default();
state.kv_total_blocks.insert(0, 100);
state.active_decode_blocks.insert(0, 99);
state.kv_used_blocks.insert(0, 99);
state.active_prefill_tokens.insert(0, u64::MAX / 2);
state.max_num_batched_tokens.insert(0, 1_000);
assert!(!state.is_busy(None, None, None));
}
#[test]
fn is_busy_with_only_decode_threshold_ignores_prefill_signals() {
let mut state = WorkerLoadState::default();
state.max_num_batched_tokens.insert(0, 1_000);
state.active_prefill_tokens.insert(0, 5_000);
assert!(!state.is_busy(Some(0.6), None, None));
}
#[test]
fn is_busy_with_only_prefill_abs_ignores_decode_latch() {
let mut state = WorkerLoadState::default();
state.kv_total_blocks.insert(0, 100);
state.update_from_active_load(
&ActiveLoad {
worker_id: 1,
dp_rank: 0,
active_decode_blocks: Some(90),
active_prefill_tokens: None,
kv_used_blocks: Some(90),
},
Some(0.6),
); );
assert!(!state.is_busy(0.6, u64::MAX, 2.0));
assert!(!state.is_busy(None, Some(u64::MAX), None));
}
#[test]
fn is_busy_with_only_prefill_frac_ignores_decode_latch() {
let mut state = WorkerLoadState::default();
state.kv_total_blocks.insert(0, 100);
state.update_from_active_load(
&ActiveLoad {
worker_id: 1,
dp_rank: 0,
active_decode_blocks: Some(90),
active_prefill_tokens: None,
kv_used_blocks: Some(90),
},
Some(0.6),
);
assert!(!state.is_busy(None, None, Some(2.0)));
}
#[test]
fn is_busy_with_only_prefill_abs_fires_when_tokens_exceed_threshold() {
let mut state = WorkerLoadState::default();
state.active_prefill_tokens.insert(0, 5_000);
assert!(state.is_busy(None, Some(1_000), None));
}
#[test]
fn is_busy_with_only_prefill_frac_fires_when_fraction_exceeded() {
let mut state = WorkerLoadState::default();
state.max_num_batched_tokens.insert(0, 1_000);
state.active_prefill_tokens.insert(0, 2_500);
assert!(state.is_busy(None, None, Some(2.0)));
} }
} }
...@@ -322,19 +322,13 @@ where ...@@ -322,19 +322,13 @@ where
wait_for_min_initial_workers(&router_client, min_initial_workers).await?; wait_for_min_initial_workers(&router_client, min_initial_workers).await?;
// Get threshold value and wrap monitor for PushRouter
// Note: PushRouter uses active_decode_blocks_threshold for its internal logic
let threshold_value = worker_monitor
.as_ref()
.map(|m| m.active_decode_blocks_threshold());
let monitor_arc = let monitor_arc =
worker_monitor.map(|m| Arc::new(m) as Arc<dyn dynamo_runtime::pipeline::WorkerLoadMonitor>); worker_monitor.map(|m| Arc::new(m) as Arc<dyn dynamo_runtime::pipeline::WorkerLoadMonitor>);
let router = let router =
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold( PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_monitor(
router_client, router_client,
router_mode, router_mode,
threshold_value,
monitor_arc, monitor_arc,
) )
.await?; .await?;
......
...@@ -149,10 +149,9 @@ impl PrefillRouter { ...@@ -149,10 +149,9 @@ impl PrefillRouter {
self.register_prefill_client(model_manager.as_ref(), &client); self.register_prefill_client(model_manager.as_ref(), &client);
// Build the PushRouter for prefill with KV mode using the shared client // Build the PushRouter for prefill with KV mode using the shared client
let push_router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold( let push_router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_monitor(
client, client,
RouterMode::KV, RouterMode::KV,
None, // busy_threshold
None, // worker_monitor None, // worker_monitor
) )
.await?; .await?;
...@@ -167,10 +166,9 @@ impl PrefillRouter { ...@@ -167,10 +166,9 @@ impl PrefillRouter {
// Create simple push router with the frontend's router mode // Create simple push router with the frontend's router mode
// Note: Per-worker metrics (active_prefill_tokens, active_decode_blocks) are only // Note: Per-worker metrics (active_prefill_tokens, active_decode_blocks) are only
// available in KV routing mode where the router has actual bookkeeping. // available in KV routing mode where the router has actual bookkeeping.
let push_router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold( let push_router = PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_monitor(
client, client,
self.router_mode, self.router_mode,
None, // busy_threshold
None, // worker_monitor None, // worker_monitor
) )
.await?; .await?;
......
...@@ -134,10 +134,6 @@ where ...@@ -134,10 +134,6 @@ where
/// addresses it, then passes it to AddressedPushRouter which does the network traffic. /// addresses it, then passes it to AddressedPushRouter which does the network traffic.
addressed: Arc<AddressedPushRouter>, addressed: Arc<AddressedPushRouter>,
/// Threshold for determining when a worker is busy (0.0 to 1.0)
/// If None, busy detection is disabled
busy_threshold: Option<f64>,
/// When false, `generate_with_fault_detection` skips fault detection logic: /// When false, `generate_with_fault_detection` skips fault detection logic:
/// it won't call `report_instance_down` on errors, and it uses the raw discovery /// it won't call `report_instance_down` on errors, and it uses the raw discovery
/// instance list instead of the filtered avail list. Use for recovery/query paths /// instance list instead of the filtered avail list. Use for recovery/query paths
...@@ -275,9 +271,9 @@ where ...@@ -275,9 +271,9 @@ where
T: Data + Serialize, T: Data + Serialize,
U: Data + for<'de> Deserialize<'de> + MaybeError, U: Data + for<'de> Deserialize<'de> + MaybeError,
{ {
/// Create a new PushRouter without busy threshold (no busy detection) /// Create a new PushRouter without a worker load monitor (no busy detection)
pub async fn from_client(client: Client, router_mode: RouterMode) -> anyhow::Result<Self> { pub async fn from_client(client: Client, router_mode: RouterMode) -> anyhow::Result<Self> {
Self::from_client_with_threshold(client, router_mode, None, None).await Self::from_client_with_monitor(client, router_mode, None).await
} }
/// Create a new PushRouter with fault detection disabled. /// Create a new PushRouter with fault detection disabled.
...@@ -307,7 +303,6 @@ where ...@@ -307,7 +303,6 @@ where
addressed, addressed,
router_mode, router_mode,
round_robin_counter: Arc::new(AtomicU64::new(0)), round_robin_counter: Arc::new(AtomicU64::new(0)),
busy_threshold: None,
fault_detection_enabled: false, fault_detection_enabled: false,
response_timeout: response_inactivity_timeout(), response_timeout: response_inactivity_timeout(),
occupancy_state, occupancy_state,
...@@ -315,11 +310,15 @@ where ...@@ -315,11 +310,15 @@ where
}) })
} }
/// Create a new PushRouter with optional busy threshold and worker load monitor /// Create a new PushRouter with an optional worker load monitor.
pub async fn from_client_with_threshold( ///
/// The rejection path is gated by `fault_detection_enabled` (true here);
/// busy detection itself is driven by the monitor via `client.update_free_instances(...)`.
/// If no thresholds are configured on the monitor (or no monitor is provided),
/// `client.instance_ids_free()` returns all instances and the gate never rejects.
pub async fn from_client_with_monitor(
client: Client, client: Client,
router_mode: RouterMode, router_mode: RouterMode,
busy_threshold: Option<f64>,
worker_monitor: Option<Arc<dyn WorkerLoadMonitor>>, worker_monitor: Option<Arc<dyn WorkerLoadMonitor>>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let addressed = addressed_router(&client.endpoint).await?; let addressed = addressed_router(&client.endpoint).await?;
...@@ -345,7 +344,6 @@ where ...@@ -345,7 +344,6 @@ where
addressed, addressed,
router_mode, router_mode,
round_robin_counter: Arc::new(AtomicU64::new(0)), round_robin_counter: Arc::new(AtomicU64::new(0)),
busy_threshold,
fault_detection_enabled: true, fault_detection_enabled: true,
response_timeout: response_inactivity_timeout(), response_timeout: response_inactivity_timeout(),
occupancy_state, occupancy_state,
...@@ -668,8 +666,8 @@ where ...@@ -668,8 +666,8 @@ where
) )
}; };
// Check if all workers are busy (only if busy threshold is set and fault detection enabled) // Check if all workers are busy (when fault detection is enabled).
if self.fault_detection_enabled && self.busy_threshold.is_some() { if self.fault_detection_enabled {
let free_instances = self.client.instance_ids_free(); let free_instances = self.client.instance_ids_free();
if free_instances.is_empty() { if free_instances.is_empty() {
// Check if we actually have any instances at all // Check if we actually have any instances at all
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment