Unverified commit c9ff6235, authored by Yan Ru Pei and committed by GitHub
Browse files

chore: replace --enforce-disagg with --decode-fallback, default to enforcing disagg (#6515)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 0432aad8
......@@ -64,7 +64,7 @@ class FrontendConfig(ConfigBase):
router_track_output_blocks: bool
router_event_threads: int
router_queue_threshold: Optional[float]
enforce_disagg: bool
decode_fallback: bool
migration_limit: int
active_decode_blocks_threshold: Optional[float]
......@@ -376,12 +376,14 @@ class FrontendArgGroup(ArgGroup):
)
add_negatable_bool_argument(
g,
flag_name="--enforce-disagg",
env_var="DYN_ENFORCE_DISAGG",
flag_name="--decode-fallback",
env_var="DYN_DECODE_FALLBACK",
default=False,
dest="decode_fallback",
help=(
"Enforce disaggregated prefill-decode. When set, unactivated prefill router will "
"return an error instead of falling back to decode-only mode."
"Allow falling back to decode-only (aggregated) mode when prefill workers are "
"unavailable. By default, disaggregated prefill-decode is enforced and requests "
"fail if no prefill workers are found."
),
)
......
......@@ -210,7 +210,7 @@ async def async_main():
active_decode_blocks_threshold=config.active_decode_blocks_threshold,
active_prefill_tokens_threshold=config.active_prefill_tokens_threshold,
active_prefill_tokens_threshold_frac=config.active_prefill_tokens_threshold_frac,
enforce_disagg=config.enforce_disagg,
decode_fallback=config.decode_fallback,
)
kwargs = {
"http_host": config.http_host,
......
......@@ -54,7 +54,7 @@ typedef struct {
// Router bindings API (replaces Pipeline API)
query_router_result_t create_routers(const char *namespace_c_str,
const char *component_c_str,
bool enforce_disagg,
bool decode_fallback,
RouterHandles **out_handle);
query_router_result_t route_request(RouterHandles *handle,
......@@ -195,7 +195,7 @@ var (
ffiNamespace string
ffiComponent string
ffiEnforceDisagg bool
ffiDecodeFallback bool
routerInitialized bool
......@@ -207,10 +207,10 @@ var (
func loadDynamoConfig() {
ffiNamespace = getEnvOrDefault("DYN_NAMESPACE", "vllm-agg")
ffiComponent = "backend" // This is not the same as DYN_COMPONENT=epp (in this case)
ffiEnforceDisagg = getEnvBoolOrDefault("DYN_ENFORCE_DISAGG", false)
ffiDecodeFallback = getEnvBoolOrDefault("DYN_DECODE_FALLBACK", false)
// Note: model name and kv_cache_block_size are now auto-discovered from the model card
fmt.Printf("Dynamo KV Scorer: namespace=%s, component=%s, enforce_disagg=%v\n",
ffiNamespace, ffiComponent, ffiEnforceDisagg)
fmt.Printf("Dynamo KV Scorer: namespace=%s, component=%s, decode_fallback=%v\n",
ffiNamespace, ffiComponent, ffiDecodeFallback)
}
func getEnvOrDefault(key, def string) string {
......@@ -249,7 +249,7 @@ func initFFI() error {
rc := C.create_routers(
ns,
cm,
C.bool(ffiEnforceDisagg),
C.bool(ffiDecodeFallback),
&routerHandles,
)
if rc != C.QUERY_ROUTER_OK {
......
......@@ -225,7 +225,7 @@ You can configure the plugin by setting environment variables in the EPP compone
Common Vars for Routing Configuration:
- Set `DYN_BUSY_THRESHOLD` to configure the upper bound on how "full" a worker can be (often derived from kv_active_blocks or other load metrics) before the router skips it. If the selected worker exceeds this value, routing falls back to the next best candidate. By default the value is negative meaning this is not enabled.
- Set `DYN_ENFORCE_DISAGG=true` if you want to enforce that every request is served in the disaggregated manner. By default it is false, meaning that if the prefill worker is not available the request will be served in the aggregated manner.
- Set `DYN_DECODE_FALLBACK=true` to allow falling back to aggregated (decode-only) mode when prefill workers are unavailable. By default, disaggregated prefill-decode is enforced and requests fail if no prefill workers are found.
- Set `DYN_OVERLAP_SCORE_WEIGHT` to weigh how heavily the score uses token overlap (predicted KV cache hits) versus other factors (load, historical hit rate). Higher weight biases toward reusing workers with similar cached prefixes. (default: 1)
- Set `DYN_ROUTER_TEMPERATURE` to soften or sharpen the selection curve when combining scores. Low temperature makes the router pick the top candidate deterministically; higher temperature lets lower-scoring workers through more often (exploration).
- Set `DYN_USE_KV_EVENTS=false` if you want to disable the router listening for KV events while using kv-routing (default: true). SGLang workers require `--kv-events-config` and TRT-LLM workers require `--publish-events-and-metrics` to publish KV events. For vLLM, KV events are auto-configured when prefix caching is active (deprecated — use `--kv-events-config` explicitly)
......
......@@ -560,7 +560,7 @@ fn kv_router_config_from_env() -> KvRouterConfig {
/// # Arguments
/// - `namespace`: Namespace for the model
/// - `component`: Component name (defaults to "backend" if NULL or empty)
/// - `enforce_disagg`: If true, disaggregated mode is required (fails if no prefill workers found)
/// - `decode_fallback`: If true, allows falling back to decode-only mode when no prefill workers are found
/// - `out_handle`: Output handle
///
/// # Safety
......@@ -570,7 +570,7 @@ fn kv_router_config_from_env() -> KvRouterConfig {
pub unsafe extern "C" fn create_routers(
namespace: *const c_char,
component: *const c_char,
enforce_disagg: bool,
decode_fallback: bool,
out_handle: *mut RouterHandlesPtr,
) -> QueryRouterResult {
if namespace.is_null() || out_handle.is_null() {
......@@ -702,18 +702,20 @@ pub unsafe extern "C" fn create_routers(
RouterMode::KV,
block_size,
Some(prefill_config),
enforce_disagg,
decode_fallback,
model_name.clone(),
namespace_str.clone(),
)
}
None if enforce_disagg => {
tracing::error!("Prefill workers required (enforce_disagg=true) but none found");
None if !decode_fallback => {
tracing::error!(
"Prefill workers required but none found and decode fallback is disabled"
);
return Err(QueryRouterResult::ErrDisaggEnforced);
}
None => {
tracing::info!("No prefill workers found, running in aggregated mode");
PrefillRouter::disabled(model_manager.clone(), RouterMode::KV, enforce_disagg)
PrefillRouter::disabled(model_manager.clone(), RouterMode::KV, decode_fallback)
}
};
......
......@@ -108,20 +108,20 @@ pub struct RouterConfig {
active_prefill_tokens_threshold: Option<u64>,
/// Threshold for active prefill tokens as fraction of max_num_batched_tokens
active_prefill_tokens_threshold_frac: Option<f64>,
enforce_disagg: bool,
decode_fallback: bool,
}
#[pymethods]
impl RouterConfig {
#[new]
#[pyo3(signature = (mode, config=None, active_decode_blocks_threshold=None, active_prefill_tokens_threshold=None, active_prefill_tokens_threshold_frac=None, enforce_disagg=false))]
#[pyo3(signature = (mode, config=None, active_decode_blocks_threshold=None, active_prefill_tokens_threshold=None, active_prefill_tokens_threshold_frac=None, decode_fallback=false))]
pub fn new(
mode: RouterMode,
config: Option<KvRouterConfig>,
active_decode_blocks_threshold: Option<f64>,
active_prefill_tokens_threshold: Option<u64>,
active_prefill_tokens_threshold_frac: Option<f64>,
enforce_disagg: bool,
decode_fallback: bool,
) -> Self {
Self {
router_mode: mode,
......@@ -129,7 +129,7 @@ impl RouterConfig {
active_decode_blocks_threshold,
active_prefill_tokens_threshold,
active_prefill_tokens_threshold_frac,
enforce_disagg,
decode_fallback,
}
}
}
......@@ -144,7 +144,7 @@ impl From<RouterConfig> for RsRouterConfig {
active_prefill_tokens_threshold: rc.active_prefill_tokens_threshold,
active_prefill_tokens_threshold_frac: rc.active_prefill_tokens_threshold_frac,
},
enforce_disagg: rc.enforce_disagg,
decode_fallback: rc.decode_fallback,
}
}
}
......
......@@ -945,7 +945,7 @@ class RouterConfig:
active_decode_blocks_threshold: Optional[float] = None,
active_prefill_tokens_threshold: Optional[int] = None,
active_prefill_tokens_threshold_frac: Optional[float] = None,
enforce_disagg: bool = False,
decode_fallback: bool = False,
) -> None:
"""
Create a RouterConfig.
......@@ -956,7 +956,7 @@ class RouterConfig:
active_decode_blocks_threshold: Threshold percentage (0.0-1.0) for decode blocks busy detection
active_prefill_tokens_threshold: Literal token count threshold for prefill busy detection
active_prefill_tokens_threshold_frac: Fraction of max_num_batched_tokens for busy detection
enforce_disagg: Enforce disaggregated prefill-decode mode
decode_fallback: Allow falling back to decode-only mode when prefill workers are unavailable
"""
...
......
......@@ -491,7 +491,7 @@ impl ModelWatcher {
self.router_config.router_mode,
card.kv_cache_block_size,
Some(prefill_config),
self.router_config.enforce_disagg,
self.router_config.decode_fallback,
model_name.clone(),
namespace.clone(),
)
......@@ -536,7 +536,7 @@ impl ModelWatcher {
kv_chooser.clone(),
tokenizer_hf.clone(),
prefill_chooser.clone(),
self.router_config.enforce_disagg,
self.router_config.decode_fallback,
self.migration_limit,
self.metrics.clone(),
)
......@@ -570,7 +570,7 @@ impl ModelWatcher {
preprocessor,
tokenizer_hf,
prefill_chooser,
self.router_config.enforce_disagg,
self.router_config.decode_fallback,
self.migration_limit,
self.metrics.clone(),
)
......
......@@ -37,7 +37,7 @@ pub struct RouterConfig {
pub kv_router_config: KvRouterConfig,
/// Load threshold configuration for busy detection
pub load_threshold_config: LoadThresholdConfig,
pub enforce_disagg: bool,
pub decode_fallback: bool,
}
impl RouterConfig {
......@@ -46,7 +46,7 @@ impl RouterConfig {
router_mode,
kv_router_config,
load_threshold_config: LoadThresholdConfig::default(),
enforce_disagg: false,
decode_fallback: false,
}
}
......@@ -55,8 +55,8 @@ impl RouterConfig {
self
}
pub fn with_enforce_disagg(mut self, enforce_disagg: bool) -> Self {
self.enforce_disagg = enforce_disagg;
pub fn with_decode_fallback(mut self, decode_fallback: bool) -> Self {
self.decode_fallback = decode_fallback;
self
}
}
......
......@@ -187,7 +187,7 @@ pub async fn build_routed_pipeline<Req, Resp>(
chooser: Option<Arc<KvRouter>>,
hf_tokenizer: tokenizers::Tokenizer,
prefill_chooser: Option<Arc<PrefillRouter>>,
enforce_disagg: bool,
decode_fallback: bool,
migration_limit: u32,
metrics: Arc<Metrics>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
......@@ -216,7 +216,7 @@ where
preprocessor,
hf_tokenizer,
prefill_chooser,
enforce_disagg,
decode_fallback,
migration_limit,
metrics,
)
......@@ -234,7 +234,7 @@ pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>(
preprocessor: Arc<OpenAIPreprocessor>,
hf_tokenizer: tokenizers::Tokenizer,
prefill_chooser: Option<Arc<PrefillRouter>>,
enforce_disagg: bool,
decode_fallback: bool,
migration_limit: u32,
metrics: Arc<Metrics>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
......@@ -298,7 +298,7 @@ where
// Use the provided prefill chooser, or create a disabled one if not provided
let prefill_chooser = prefill_chooser
.unwrap_or_else(|| PrefillRouter::disabled(model_manager, router_mode, enforce_disagg));
.unwrap_or_else(|| PrefillRouter::disabled(model_manager, router_mode, decode_fallback));
let prefill_op = prefill_chooser.into_operator();
// Link with prefill chooser including backward edge for response flow
......
......@@ -100,7 +100,7 @@ pub struct PrefillRouter {
endpoint_id: OnceLock<EndpointId>,
cancel_token: CancellationToken,
router_mode: RouterMode,
enforce_disagg: bool,
decode_fallback: bool,
/// Model name used to look up the worker monitor for prefill client registration
model_name: String,
/// Namespace used to look up the correct WorkerSet's worker monitor
......@@ -112,7 +112,7 @@ impl PrefillRouter {
pub fn disabled(
model_manager: Arc<ModelManager>,
router_mode: RouterMode,
enforce_disagg: bool,
decode_fallback: bool,
) -> Arc<Self> {
Arc::new(Self {
prefill_router: OnceLock::new(),
......@@ -120,7 +120,7 @@ impl PrefillRouter {
endpoint_id: OnceLock::new(),
cancel_token: CancellationToken::new(),
router_mode,
enforce_disagg,
decode_fallback,
model_name: String::new(), // Not used for disabled router
namespace: String::new(), // Not used for disabled router
})
......@@ -133,7 +133,7 @@ impl PrefillRouter {
router_mode: RouterMode,
kv_cache_block_size: u32,
kv_router_config: Option<KvRouterConfig>,
enforce_disagg: bool,
decode_fallback: bool,
model_name: String,
namespace: String,
) -> Arc<Self> {
......@@ -146,7 +146,7 @@ impl PrefillRouter {
endpoint_id: OnceLock::new(),
cancel_token: cancel_token.clone(),
router_mode,
enforce_disagg,
decode_fallback,
model_name,
namespace,
});
......@@ -575,11 +575,9 @@ impl
// Save original max_tokens for decode
let original_max_tokens = req.stop_conditions.max_tokens;
// If prefill router is not activated, skip directly to decode
// If prefill router is not activated (no prefill workers discovered),
// this is aggregated mode — route directly to decode.
if self.prefill_router.get().is_none() {
if self.enforce_disagg {
return Err(anyhow::anyhow!(PrefillError::NotActivated));
}
return next.generate(context.map(|_| req)).await;
}
......@@ -702,20 +700,20 @@ impl
next.generate(decode_request).await
}
Err(PrefillError::NotActivated) => {
if self.enforce_disagg {
if !self.decode_fallback {
tracing::error!(
"Prefill router not activated, but disaggregated mode is enforced. Failing request."
"No prefill workers discovered yet and decode fallback is disabled. Failing request."
);
return Err(anyhow::anyhow!(PrefillError::NotActivated));
}
tracing::debug!("Prefill router not activated, falling back to decode-only");
tracing::debug!("No prefill workers discovered yet, falling back to decode-only");
next.generate(context.map(|_| req)).await
}
Err(e) => {
if self.enforce_disagg {
if !self.decode_fallback {
tracing::error!(
error = %e,
"Remote prefill failed, but disaggregated mode is enforced. Failing request."
"Remote prefill failed and decode fallback is disabled. Failing request."
);
return Err(anyhow::anyhow!(e));
}
......
......@@ -285,9 +285,7 @@ def test_request_migration_sglang_prefill(
"""
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start decode worker first (required for prefill workers to connect)
......@@ -355,9 +353,7 @@ def test_request_migration_sglang_kv_transfer(
"""
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start prefill worker first
......@@ -428,9 +424,7 @@ def test_request_migration_sglang_decode(
)
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start prefill worker first
......
......@@ -272,9 +272,7 @@ def test_request_migration_trtllm_prefill(
"""
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start decode worker first (required for prefill workers to connect)
......@@ -342,9 +340,7 @@ def test_request_migration_trtllm_kv_transfer(
"""
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start prefill worker first
......@@ -415,9 +411,7 @@ def test_request_migration_trtllm_decode(
)
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start prefill worker first
......
......@@ -281,9 +281,7 @@ def test_request_migration_vllm_prefill(
"""
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start decode worker first (required for prefill workers to connect)
......@@ -360,9 +358,7 @@ def test_request_migration_vllm_kv_transfer(
"""
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start prefill worker first
......@@ -443,9 +439,7 @@ def test_request_migration_vllm_decode(
)
# Step 1: Start the frontend
with DynamoFrontendProcess(
request, migration_limit=migration_limit, enforce_disagg=True
) as frontend:
with DynamoFrontendProcess(request, migration_limit=migration_limit) as frontend:
logger.info("Frontend started successfully")
# Step 2: Start prefill worker first
......
......@@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
class DynamoFrontendProcess(BaseDynamoFrontendProcess):
"""Fault-tolerance frontend wrapper (keeps env settings from the historical helper)."""
def __init__(self, request, migration_limit: int, enforce_disagg: bool = False):
def __init__(self, request, migration_limit: int, decode_fallback: bool = False):
extra_env = {
"DYN_REQUEST_PLANE": request.getfixturevalue("request_plane"),
# These tests expect full control over requests sent to workers. The canary
......@@ -29,8 +29,8 @@ class DynamoFrontendProcess(BaseDynamoFrontendProcess):
"DYN_HEALTH_CHECK_ENABLED": "false",
}
extra_args = []
if enforce_disagg:
extra_args.append("--enforce-disagg")
if decode_fallback:
extra_args.append("--decode-fallback")
super().__init__(
request,
frontend_port=0, # allocate a free port (xdist-safe)
......
......@@ -47,7 +47,7 @@ class KVRouterProcess(ManagedProcess):
frontend_port: int,
namespace: str,
store_backend: str = "etcd",
enforce_disagg: bool = False,
decode_fallback: bool = False,
blocks_threshold: float | None = None,
tokens_threshold: float | None = None,
tokens_threshold_frac: float | None = None,
......@@ -70,8 +70,8 @@ class KVRouterProcess(ManagedProcess):
namespace,
]
if enforce_disagg:
command.append("--enforce-disagg")
if decode_fallback:
command.append("--decode-fallback")
if blocks_threshold is not None:
command.extend(["--active-decode-blocks-threshold", str(blocks_threshold)])
......@@ -1730,7 +1730,6 @@ def _test_router_decisions_disagg(
frontend_port,
decode_workers.namespace,
store_backend,
enforce_disagg=True,
request_plane=request_plane,
durable_kv_events=durable_kv_events,
):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment