Unverified commit f934744f, authored by Yan Ru Pei and committed by GitHub
Browse files

fix: restore --enforce-disagg to reject requests before prefill router activates (#6957)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 723676b4
...@@ -52,7 +52,7 @@ class FrontendConfig(KvRouterConfigBase): ...@@ -52,7 +52,7 @@ class FrontendConfig(KvRouterConfigBase):
router_mode: str router_mode: str
namespace: Optional[str] = None namespace: Optional[str] = None
namespace_prefix: Optional[str] = None namespace_prefix: Optional[str] = None
decode_fallback: bool enforce_disagg: bool
migration_limit: int migration_limit: int
active_decode_blocks_threshold: Optional[float] active_decode_blocks_threshold: Optional[float]
...@@ -191,14 +191,15 @@ class FrontendArgGroup(ArgGroup): ...@@ -191,14 +191,15 @@ class FrontendArgGroup(ArgGroup):
add_negatable_bool_argument( add_negatable_bool_argument(
g, g,
flag_name="--decode-fallback", flag_name="--enforce-disagg",
env_var="DYN_DECODE_FALLBACK", env_var="DYN_ENFORCE_DISAGG",
default=False, default=False,
dest="decode_fallback", dest="enforce_disagg",
help=( help=(
"Allow falling back to decode-only (aggregated) mode when prefill workers are " "Strictly enforce disaggregated mode. Requests will fail if the prefill router "
"unavailable. By default, disaggregated prefill-decode is enforced and requests " "has not activated yet (e.g., prefill workers still registering). This is stricter "
"fail if no prefill workers are found." "than the default: without this flag, requests arriving before prefill workers are "
"discovered fall through to aggregated decode-only routing."
), ),
) )
......
...@@ -197,7 +197,7 @@ async def async_main(): ...@@ -197,7 +197,7 @@ async def async_main():
active_decode_blocks_threshold=config.active_decode_blocks_threshold, active_decode_blocks_threshold=config.active_decode_blocks_threshold,
active_prefill_tokens_threshold=config.active_prefill_tokens_threshold, active_prefill_tokens_threshold=config.active_prefill_tokens_threshold,
active_prefill_tokens_threshold_frac=config.active_prefill_tokens_threshold_frac, active_prefill_tokens_threshold_frac=config.active_prefill_tokens_threshold_frac,
decode_fallback=config.decode_fallback, enforce_disagg=config.enforce_disagg,
) )
kwargs: dict[str, Any] = { kwargs: dict[str, Any] = {
"http_host": config.http_host, "http_host": config.http_host,
......
...@@ -59,7 +59,7 @@ typedef struct { ...@@ -59,7 +59,7 @@ typedef struct {
// Router bindings API // Router bindings API
query_router_result_t create_routers(const char *namespace_c_str, query_router_result_t create_routers(const char *namespace_c_str,
const char *component_c_str, const char *component_c_str,
bool decode_fallback, bool enforce_disagg,
RouterHandles **out_handle); RouterHandles **out_handle);
query_router_result_t route_prefill_request(RouterHandles *handle, query_router_result_t route_prefill_request(RouterHandles *handle,
...@@ -110,7 +110,7 @@ var ( ...@@ -110,7 +110,7 @@ var (
ffiNamespace string ffiNamespace string
ffiComponent string ffiComponent string
ffiDecodeFallback bool ffiEnforceDisagg bool
routerInitialized bool routerInitialized bool
...@@ -122,10 +122,10 @@ var ( ...@@ -122,10 +122,10 @@ var (
func loadDynamoConfig() { func loadDynamoConfig() {
ffiNamespace = getEnvOrDefault("DYN_NAMESPACE", "vllm-agg") ffiNamespace = getEnvOrDefault("DYN_NAMESPACE", "vllm-agg")
ffiComponent = "backend" // This is not the same as DYN_COMPONENT=epp (in this case) ffiComponent = "backend" // This is not the same as DYN_COMPONENT=epp (in this case)
ffiDecodeFallback = getEnvBoolOrDefault("DYN_DECODE_FALLBACK", false) ffiEnforceDisagg = getEnvBoolOrDefault("DYN_ENFORCE_DISAGG", false)
// Note: model name and kv_cache_block_size are now auto-discovered from the model card // Note: model name and kv_cache_block_size are now auto-discovered from the model card
fmt.Printf("Dynamo KV Scorer: namespace=%s, component=%s, decode_fallback=%v\n", fmt.Printf("Dynamo KV Scorer: namespace=%s, component=%s, enforce_disagg=%v\n",
ffiNamespace, ffiComponent, ffiDecodeFallback) ffiNamespace, ffiComponent, ffiEnforceDisagg)
} }
func getEnvOrDefault(key, def string) string { func getEnvOrDefault(key, def string) string {
...@@ -164,7 +164,7 @@ func initFFI() error { ...@@ -164,7 +164,7 @@ func initFFI() error {
rc := C.create_routers( rc := C.create_routers(
ns, ns,
cm, cm,
C.bool(ffiDecodeFallback), C.bool(ffiEnforceDisagg),
&routerHandles, &routerHandles,
) )
if rc != C.QUERY_ROUTER_OK { if rc != C.QUERY_ROUTER_OK {
......
...@@ -239,7 +239,7 @@ You can configure the plugin by setting environment variables in the EPP compone ...@@ -239,7 +239,7 @@ You can configure the plugin by setting environment variables in the EPP compone
Common Vars for Routing Configuration: Common Vars for Routing Configuration:
- Set `DYN_BUSY_THRESHOLD` to configure the upper bound on how "full" a worker can be (often derived from kv_active_blocks or other load metrics) before the router skips it. If the selected worker exceeds this value, routing falls back to the next best candidate. By default the value is negative meaning this is not enabled. - Set `DYN_BUSY_THRESHOLD` to configure the upper bound on how "full" a worker can be (often derived from kv_active_blocks or other load metrics) before the router skips it. If the selected worker exceeds this value, routing falls back to the next best candidate. By default the value is negative meaning this is not enabled.
- Set `DYN_DECODE_FALLBACK=true` to allow falling back to aggregated (decode-only) mode when prefill workers are unavailable. By default, disaggregated prefill-decode is enforced and requests fail if no prefill workers are found. - Set `DYN_ENFORCE_DISAGG=true` to strictly enforce disaggregated mode. When enabled, requests fail if prefill workers have not registered yet. Without this, requests arriving before prefill workers are discovered fall through to decode-only routing. Prefill errors always fail requests regardless of this setting.
- Set `DYN_OVERLAP_SCORE_WEIGHT` to weigh how heavily the score uses token overlap (predicted KV cache hits) versus other factors (load, historical hit rate). Higher weight biases toward reusing workers with similar cached prefixes. (default: 1) - Set `DYN_OVERLAP_SCORE_WEIGHT` to weigh how heavily the score uses token overlap (predicted KV cache hits) versus other factors (load, historical hit rate). Higher weight biases toward reusing workers with similar cached prefixes. (default: 1)
- Set `DYN_ROUTER_TEMPERATURE` to soften or sharpen the selection curve when combining scores. Low temperature makes the router pick the top candidate deterministically; higher temperature lets lower-scoring workers through more often (exploration). - Set `DYN_ROUTER_TEMPERATURE` to soften or sharpen the selection curve when combining scores. Low temperature makes the router pick the top candidate deterministically; higher temperature lets lower-scoring workers through more often (exploration).
- Set `DYN_USE_KV_EVENTS=false` if you want to disable the router listening for KV events while using kv-routing (default: true). SGLang workers require `--kv-events-config` and TRT-LLM workers require `--publish-events-and-metrics` to publish KV events. For vLLM, KV events are auto-configured when prefix caching is active (deprecated — use `--kv-events-config` explicitly) - Set `DYN_USE_KV_EVENTS=false` if you want to disable the router listening for KV events while using kv-routing (default: true). SGLang workers require `--kv-events-config` and TRT-LLM workers require `--publish-events-and-metrics` to publish KV events. For vLLM, KV events are auto-configured when prefix caching is active (deprecated — use `--kv-events-config` explicitly)
......
...@@ -14,9 +14,6 @@ spec: ...@@ -14,9 +14,6 @@ spec:
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidia/ai-dynamo/epp-image:my-tag image: nvcr.io/nvidia/ai-dynamo/epp-image:my-tag
env:
- name: DYN_DECODE_FALLBACK
value: "true"
eppConfig: eppConfig:
config: config:
plugins: plugins:
......
...@@ -577,7 +577,7 @@ fn kv_router_config_from_env() -> KvRouterConfig { ...@@ -577,7 +577,7 @@ fn kv_router_config_from_env() -> KvRouterConfig {
/// # Arguments /// # Arguments
/// - `namespace`: Namespace for the model /// - `namespace`: Namespace for the model
/// - `component`: Component name (defaults to "backend" if NULL or empty) /// - `component`: Component name (defaults to "backend" if NULL or empty)
/// - `decode_fallback`: If true, allows falling back to decode-only mode when no prefill workers are found /// - `enforce_disagg`: If true, requires prefill workers to be present at init time
/// - `out_handle`: Output handle /// - `out_handle`: Output handle
/// ///
/// # Safety /// # Safety
...@@ -587,7 +587,7 @@ fn kv_router_config_from_env() -> KvRouterConfig { ...@@ -587,7 +587,7 @@ fn kv_router_config_from_env() -> KvRouterConfig {
pub unsafe extern "C" fn create_routers( pub unsafe extern "C" fn create_routers(
namespace: *const c_char, namespace: *const c_char,
component: *const c_char, component: *const c_char,
decode_fallback: bool, enforce_disagg: bool,
out_handle: *mut RouterHandlesPtr, out_handle: *mut RouterHandlesPtr,
) -> QueryRouterResult { ) -> QueryRouterResult {
if namespace.is_null() || out_handle.is_null() { if namespace.is_null() || out_handle.is_null() {
...@@ -719,20 +719,20 @@ pub unsafe extern "C" fn create_routers( ...@@ -719,20 +719,20 @@ pub unsafe extern "C" fn create_routers(
RouterMode::KV, RouterMode::KV,
block_size, block_size,
Some(prefill_config), Some(prefill_config),
decode_fallback, enforce_disagg,
model_name.clone(), model_name.clone(),
namespace_str.clone(), namespace_str.clone(),
) )
} }
None if !decode_fallback => { None if enforce_disagg => {
tracing::error!( tracing::error!(
"Prefill workers required but none found and decode fallback is disabled" "Prefill workers required but none found (enforce_disagg is enabled)"
); );
return Err(QueryRouterResult::ErrDisaggEnforced); return Err(QueryRouterResult::ErrDisaggEnforced);
} }
None => { None => {
tracing::info!("No prefill workers found, running in aggregated mode"); tracing::info!("No prefill workers found, running in aggregated mode");
PrefillRouter::disabled(model_manager.clone(), RouterMode::KV, decode_fallback) PrefillRouter::disabled(model_manager.clone(), RouterMode::KV, enforce_disagg)
} }
}; };
......
...@@ -110,20 +110,20 @@ pub struct RouterConfig { ...@@ -110,20 +110,20 @@ pub struct RouterConfig {
active_prefill_tokens_threshold: Option<u64>, active_prefill_tokens_threshold: Option<u64>,
/// Threshold for active prefill tokens as fraction of max_num_batched_tokens /// Threshold for active prefill tokens as fraction of max_num_batched_tokens
active_prefill_tokens_threshold_frac: Option<f64>, active_prefill_tokens_threshold_frac: Option<f64>,
decode_fallback: bool, enforce_disagg: bool,
} }
#[pymethods] #[pymethods]
impl RouterConfig { impl RouterConfig {
#[new] #[new]
#[pyo3(signature = (mode, config=None, active_decode_blocks_threshold=None, active_prefill_tokens_threshold=None, active_prefill_tokens_threshold_frac=None, decode_fallback=false))] #[pyo3(signature = (mode, config=None, active_decode_blocks_threshold=None, active_prefill_tokens_threshold=None, active_prefill_tokens_threshold_frac=None, enforce_disagg=false))]
pub fn new( pub fn new(
mode: RouterMode, mode: RouterMode,
config: Option<KvRouterConfig>, config: Option<KvRouterConfig>,
active_decode_blocks_threshold: Option<f64>, active_decode_blocks_threshold: Option<f64>,
active_prefill_tokens_threshold: Option<u64>, active_prefill_tokens_threshold: Option<u64>,
active_prefill_tokens_threshold_frac: Option<f64>, active_prefill_tokens_threshold_frac: Option<f64>,
decode_fallback: bool, enforce_disagg: bool,
) -> Self { ) -> Self {
Self { Self {
router_mode: mode, router_mode: mode,
...@@ -131,7 +131,7 @@ impl RouterConfig { ...@@ -131,7 +131,7 @@ impl RouterConfig {
active_decode_blocks_threshold, active_decode_blocks_threshold,
active_prefill_tokens_threshold, active_prefill_tokens_threshold,
active_prefill_tokens_threshold_frac, active_prefill_tokens_threshold_frac,
decode_fallback, enforce_disagg,
} }
} }
} }
...@@ -146,7 +146,7 @@ impl From<RouterConfig> for RsRouterConfig { ...@@ -146,7 +146,7 @@ impl From<RouterConfig> for RsRouterConfig {
active_prefill_tokens_threshold: rc.active_prefill_tokens_threshold, active_prefill_tokens_threshold: rc.active_prefill_tokens_threshold,
active_prefill_tokens_threshold_frac: rc.active_prefill_tokens_threshold_frac, active_prefill_tokens_threshold_frac: rc.active_prefill_tokens_threshold_frac,
}, },
decode_fallback: rc.decode_fallback, enforce_disagg: rc.enforce_disagg,
} }
} }
} }
......
...@@ -996,7 +996,7 @@ class RouterConfig: ...@@ -996,7 +996,7 @@ class RouterConfig:
active_decode_blocks_threshold: Optional[float] = None, active_decode_blocks_threshold: Optional[float] = None,
active_prefill_tokens_threshold: Optional[int] = None, active_prefill_tokens_threshold: Optional[int] = None,
active_prefill_tokens_threshold_frac: Optional[float] = None, active_prefill_tokens_threshold_frac: Optional[float] = None,
decode_fallback: bool = False, enforce_disagg: bool = False,
) -> None: ) -> None:
""" """
Create a RouterConfig. Create a RouterConfig.
...@@ -1007,7 +1007,7 @@ class RouterConfig: ...@@ -1007,7 +1007,7 @@ class RouterConfig:
active_decode_blocks_threshold: Threshold percentage (0.0-1.0) for decode blocks busy detection active_decode_blocks_threshold: Threshold percentage (0.0-1.0) for decode blocks busy detection
active_prefill_tokens_threshold: Literal token count threshold for prefill busy detection active_prefill_tokens_threshold: Literal token count threshold for prefill busy detection
active_prefill_tokens_threshold_frac: Fraction of max_num_batched_tokens for busy detection active_prefill_tokens_threshold_frac: Fraction of max_num_batched_tokens for busy detection
decode_fallback: Allow falling back to decode-only mode when prefill workers are unavailable enforce_disagg: Strictly enforce disaggregated mode, failing requests if no prefill workers are available
""" """
... ...
......
...@@ -491,7 +491,7 @@ impl ModelWatcher { ...@@ -491,7 +491,7 @@ impl ModelWatcher {
self.router_config.router_mode, self.router_config.router_mode,
card.kv_cache_block_size, card.kv_cache_block_size,
Some(prefill_config), Some(prefill_config),
self.router_config.decode_fallback, self.router_config.enforce_disagg,
model_name.clone(), model_name.clone(),
namespace.clone(), namespace.clone(),
) )
...@@ -536,7 +536,7 @@ impl ModelWatcher { ...@@ -536,7 +536,7 @@ impl ModelWatcher {
kv_chooser.clone(), kv_chooser.clone(),
tokenizer.clone(), tokenizer.clone(),
prefill_chooser.clone(), prefill_chooser.clone(),
self.router_config.decode_fallback, self.router_config.enforce_disagg,
self.migration_limit, self.migration_limit,
self.metrics.clone(), self.metrics.clone(),
) )
...@@ -567,7 +567,7 @@ impl ModelWatcher { ...@@ -567,7 +567,7 @@ impl ModelWatcher {
preprocessor, preprocessor,
tokenizer, tokenizer,
prefill_chooser, prefill_chooser,
self.router_config.decode_fallback, self.router_config.enforce_disagg,
self.migration_limit, self.migration_limit,
self.metrics.clone(), self.metrics.clone(),
) )
......
...@@ -37,7 +37,7 @@ pub struct RouterConfig { ...@@ -37,7 +37,7 @@ pub struct RouterConfig {
pub kv_router_config: KvRouterConfig, pub kv_router_config: KvRouterConfig,
/// Load threshold configuration for busy detection /// Load threshold configuration for busy detection
pub load_threshold_config: LoadThresholdConfig, pub load_threshold_config: LoadThresholdConfig,
pub decode_fallback: bool, pub enforce_disagg: bool,
} }
impl RouterConfig { impl RouterConfig {
...@@ -46,7 +46,7 @@ impl RouterConfig { ...@@ -46,7 +46,7 @@ impl RouterConfig {
router_mode, router_mode,
kv_router_config, kv_router_config,
load_threshold_config: LoadThresholdConfig::default(), load_threshold_config: LoadThresholdConfig::default(),
decode_fallback: false, enforce_disagg: false,
} }
} }
...@@ -55,8 +55,8 @@ impl RouterConfig { ...@@ -55,8 +55,8 @@ impl RouterConfig {
self self
} }
pub fn with_decode_fallback(mut self, decode_fallback: bool) -> Self { pub fn with_enforce_disagg(mut self, enforce_disagg: bool) -> Self {
self.decode_fallback = decode_fallback; self.enforce_disagg = enforce_disagg;
self self
} }
} }
......
...@@ -189,7 +189,7 @@ pub async fn build_routed_pipeline<Req, Resp>( ...@@ -189,7 +189,7 @@ pub async fn build_routed_pipeline<Req, Resp>(
chooser: Option<Arc<KvRouter>>, chooser: Option<Arc<KvRouter>>,
tokenizer: crate::tokenizers::Tokenizer, tokenizer: crate::tokenizers::Tokenizer,
prefill_chooser: Option<Arc<PrefillRouter>>, prefill_chooser: Option<Arc<PrefillRouter>>,
decode_fallback: bool, enforce_disagg: bool,
migration_limit: u32, migration_limit: u32,
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>> ) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
...@@ -218,7 +218,7 @@ where ...@@ -218,7 +218,7 @@ where
preprocessor, preprocessor,
tokenizer, tokenizer,
prefill_chooser, prefill_chooser,
decode_fallback, enforce_disagg,
migration_limit, migration_limit,
metrics, metrics,
) )
...@@ -236,7 +236,7 @@ pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>( ...@@ -236,7 +236,7 @@ pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>(
preprocessor: Arc<OpenAIPreprocessor>, preprocessor: Arc<OpenAIPreprocessor>,
tokenizer: crate::tokenizers::Tokenizer, tokenizer: crate::tokenizers::Tokenizer,
prefill_chooser: Option<Arc<PrefillRouter>>, prefill_chooser: Option<Arc<PrefillRouter>>,
decode_fallback: bool, enforce_disagg: bool,
migration_limit: u32, migration_limit: u32,
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>> ) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
...@@ -305,7 +305,7 @@ where ...@@ -305,7 +305,7 @@ where
// Use the provided prefill chooser, or create a disabled one if not provided // Use the provided prefill chooser, or create a disabled one if not provided
let prefill_chooser = prefill_chooser let prefill_chooser = prefill_chooser
.unwrap_or_else(|| PrefillRouter::disabled(model_manager, router_mode, decode_fallback)); .unwrap_or_else(|| PrefillRouter::disabled(model_manager, router_mode, enforce_disagg));
let prefill_op = prefill_chooser.into_operator(); let prefill_op = prefill_chooser.into_operator();
// Link with prefill chooser including backward edge for response flow // Link with prefill chooser including backward edge for response flow
......
...@@ -109,7 +109,7 @@ pub struct PrefillRouter { ...@@ -109,7 +109,7 @@ pub struct PrefillRouter {
endpoint_id: OnceLock<EndpointId>, endpoint_id: OnceLock<EndpointId>,
cancel_token: CancellationToken, cancel_token: CancellationToken,
router_mode: RouterMode, router_mode: RouterMode,
decode_fallback: bool, enforce_disagg: bool,
/// Model name used to look up the worker monitor for prefill client registration /// Model name used to look up the worker monitor for prefill client registration
model_name: String, model_name: String,
/// Namespace used to look up the correct WorkerSet's worker monitor /// Namespace used to look up the correct WorkerSet's worker monitor
...@@ -121,7 +121,7 @@ impl PrefillRouter { ...@@ -121,7 +121,7 @@ impl PrefillRouter {
pub fn disabled( pub fn disabled(
model_manager: Arc<ModelManager>, model_manager: Arc<ModelManager>,
router_mode: RouterMode, router_mode: RouterMode,
decode_fallback: bool, enforce_disagg: bool,
) -> Arc<Self> { ) -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
prefill_router: OnceLock::new(), prefill_router: OnceLock::new(),
...@@ -129,7 +129,7 @@ impl PrefillRouter { ...@@ -129,7 +129,7 @@ impl PrefillRouter {
endpoint_id: OnceLock::new(), endpoint_id: OnceLock::new(),
cancel_token: CancellationToken::new(), cancel_token: CancellationToken::new(),
router_mode, router_mode,
decode_fallback, enforce_disagg,
model_name: String::new(), // Not used for disabled router model_name: String::new(), // Not used for disabled router
namespace: String::new(), // Not used for disabled router namespace: String::new(), // Not used for disabled router
}) })
...@@ -142,7 +142,7 @@ impl PrefillRouter { ...@@ -142,7 +142,7 @@ impl PrefillRouter {
router_mode: RouterMode, router_mode: RouterMode,
kv_cache_block_size: u32, kv_cache_block_size: u32,
kv_router_config: Option<KvRouterConfig>, kv_router_config: Option<KvRouterConfig>,
decode_fallback: bool, enforce_disagg: bool,
model_name: String, model_name: String,
namespace: String, namespace: String,
) -> Arc<Self> { ) -> Arc<Self> {
...@@ -155,7 +155,7 @@ impl PrefillRouter { ...@@ -155,7 +155,7 @@ impl PrefillRouter {
endpoint_id: OnceLock::new(), endpoint_id: OnceLock::new(),
cancel_token: cancel_token.clone(), cancel_token: cancel_token.clone(),
router_mode, router_mode,
decode_fallback, enforce_disagg,
model_name, model_name,
namespace, namespace,
}); });
...@@ -594,7 +594,11 @@ impl ...@@ -594,7 +594,11 @@ impl
// If prefill router is not activated (no prefill workers discovered), // If prefill router is not activated (no prefill workers discovered),
// this is aggregated mode — route directly to decode. // this is aggregated mode — route directly to decode.
// With --enforce-disagg, fail instead of falling back.
if self.prefill_router.get().is_none() { if self.prefill_router.get().is_none() {
if self.enforce_disagg {
return Err(anyhow::anyhow!(PrefillError::NotActivated));
}
return next.generate(context.map(|_| req)).await; return next.generate(context.map(|_| req)).await;
} }
...@@ -714,28 +718,12 @@ impl ...@@ -714,28 +718,12 @@ impl
next.generate(decode_request).await next.generate(decode_request).await
} }
Err(PrefillError::NotActivated) => { Err(PrefillError::NotActivated) => {
if !self.decode_fallback { tracing::error!("Prefill router not activated, failing request");
tracing::error!( Err(anyhow::anyhow!(PrefillError::NotActivated))
"No prefill workers discovered yet and decode fallback is disabled. Failing request."
);
return Err(anyhow::anyhow!(PrefillError::NotActivated));
}
tracing::debug!("No prefill workers discovered yet, falling back to decode-only");
next.generate(context.map(|_| req)).await
} }
Err(e) => { Err(e) => {
if !self.decode_fallback { tracing::error!(error = %e, "Remote prefill failed, failing request");
tracing::error!( Err(anyhow::anyhow!(e))
error = %e,
"Remote prefill failed and decode fallback is disabled. Failing request."
);
return Err(anyhow::anyhow!(e));
}
tracing::warn!(
error = %e,
"Remote prefill failed, falling back to decode-only. This may impact performance in disaggregated deployments. Verify prefill workers are healthy and accessible."
);
next.generate(context.map(|_| req)).await
} }
} }
} }
......
...@@ -21,22 +21,18 @@ logger = logging.getLogger(__name__) ...@@ -21,22 +21,18 @@ logger = logging.getLogger(__name__)
class DynamoFrontendProcess(BaseDynamoFrontendProcess): class DynamoFrontendProcess(BaseDynamoFrontendProcess):
"""Fault-tolerance frontend wrapper (keeps env settings from the historical helper).""" """Fault-tolerance frontend wrapper (keeps env settings from the historical helper)."""
def __init__(self, request, migration_limit: int, decode_fallback: bool = False): def __init__(self, request, migration_limit: int):
extra_env = { extra_env = {
"DYN_REQUEST_PLANE": request.getfixturevalue("request_plane"), "DYN_REQUEST_PLANE": request.getfixturevalue("request_plane"),
# These tests expect full control over requests sent to workers. The canary # These tests expect full control over requests sent to workers. The canary
# health check can inject extra requests and cause intermittent failures. # health check can inject extra requests and cause intermittent failures.
"DYN_HEALTH_CHECK_ENABLED": "false", "DYN_HEALTH_CHECK_ENABLED": "false",
} }
extra_args = []
if decode_fallback:
extra_args.append("--decode-fallback")
super().__init__( super().__init__(
request, request,
frontend_port=0, # allocate a free port (xdist-safe) frontend_port=0, # allocate a free port (xdist-safe)
router_mode="round-robin", router_mode="round-robin",
migration_limit=migration_limit, migration_limit=migration_limit,
extra_args=extra_args if extra_args else None,
extra_env=extra_env, extra_env=extra_env,
terminate_all_matching_process_names=False, terminate_all_matching_process_names=False,
display_name="frontend", display_name="frontend",
......
...@@ -46,7 +46,7 @@ class KVRouterProcess(ManagedProcess): ...@@ -46,7 +46,7 @@ class KVRouterProcess(ManagedProcess):
frontend_port: int, frontend_port: int,
namespace: str, namespace: str,
store_backend: str = "etcd", store_backend: str = "etcd",
decode_fallback: bool = False, enforce_disagg: bool = False,
blocks_threshold: float | None = None, blocks_threshold: float | None = None,
tokens_threshold: float | None = None, tokens_threshold: float | None = None,
tokens_threshold_frac: float | None = None, tokens_threshold_frac: float | None = None,
...@@ -69,8 +69,8 @@ class KVRouterProcess(ManagedProcess): ...@@ -69,8 +69,8 @@ class KVRouterProcess(ManagedProcess):
namespace, namespace,
] ]
if decode_fallback: if enforce_disagg:
command.append("--decode-fallback") command.append("--enforce-disagg")
if blocks_threshold is not None: if blocks_threshold is not None:
command.extend(["--active-decode-blocks-threshold", str(blocks_threshold)]) command.extend(["--active-decode-blocks-threshold", str(blocks_threshold)])
...@@ -1774,6 +1774,7 @@ def _test_router_decisions_disagg( ...@@ -1774,6 +1774,7 @@ def _test_router_decisions_disagg(
frontend_port, frontend_port,
decode_workers.namespace, decode_workers.namespace,
store_backend, store_backend,
enforce_disagg=True,
request_plane=request_plane, request_plane=request_plane,
durable_kv_events=durable_kv_events, durable_kv_events=durable_kv_events,
): ):
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment