Unverified Commit a441aaf8 authored by atchernych's avatar atchernych Committed by GitHub
Browse files

feat: Enable EPP worker discovery mode (#6592)


Signed-off-by: default avatarAnna Tchernych <atchernych@nvidia.com>
parent b77f9765
......@@ -7,6 +7,7 @@
*.[Pp][Nn][Gg] binary
*.[Zz][Ii][Pp] binary
*.[Tt][Gg][Zz] binary
*.[Gg][Zz] binary
*.fatbin binary
# Exclude test data files from linguist language detection
......
......@@ -167,10 +167,14 @@ func (s *DynDecodeScorer) Score(ctx context.Context, cycleState *schedtypes.Cycl
if isDisaggregated {
req.Headers[RoutingModeHeader] = "disaggregated"
// In disagg mode, the prefill worker was selected by the prefill scorer profile.
// The prefill worker ID would need to be communicated from the prefill profile result.
// For now we set the mode header; the prefill worker header will be set
// when the framework processes the prefill profile result.
// The prefill worker ID header was already set by DynPrefillScorer
// directly on req.Headers during the prefill profile run.
if prefillID, ok := req.Headers[PrefillWorkerIDHeader]; ok {
logger.V(logutil.DEFAULT).Info("DynDecodeScorer: prefill worker header present",
"prefillWorkerID", prefillID)
} else {
logger.V(logutil.DEFAULT).Error(nil, "DynDecodeScorer: x-prefill-instance-id header missing — DynPrefillScorer did not set it")
}
} else {
req.Headers[RoutingModeHeader] = "aggregated"
}
......
......@@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
log "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
......@@ -113,10 +114,23 @@ func (s *DynPrefillScorer) Score(ctx context.Context, cycleState *schedtypes.Cyc
return uniformScores(pods, 0)
}
prefillWorkerID := strconv.FormatUint(result.WorkerID, 10)
logger.V(logutil.DEFAULT).Info("DynPrefillScorer: prefill worker selected",
"prefillWorkerID", fmt.Sprintf("%d", result.WorkerID),
"prefillWorkerID", prefillWorkerID,
"tokenCount", len(result.TokenData))
// Set the prefill worker ID header directly on the request.
// The request object is shared across all profile runs in the scheduling
// cycle, so the decode scorer (which runs in the next profile) will see it.
// This is more reliable than CycleState which may be scoped per profile.
if req.Headers == nil {
req.Headers = map[string]string{}
}
req.Headers[PrefillWorkerIDHeader] = prefillWorkerID
// Also write to CycleState for any plugin that needs it via the standard API.
cycleState.Write(PrefillWorkerIDStateKey, &PrefillWorkerIDState{WorkerID: prefillWorkerID})
// Score: 1.0 for all pods. The label-filter has already restricted to prefill workers,
// and the FFI router's internal selection is authoritative.
// In the future, we could match worker IDs to pod names for precise scoring.
......
......@@ -43,6 +43,10 @@ const (
// PrefillEnabledStateKey is used to communicate prefill-enabled status
// from the DisaggProfileHandler to the scorer plugins via CycleState.
PrefillEnabledStateKey = plugins.StateKey("disagg-prefill-enabled")
// PrefillWorkerIDStateKey communicates the prefill worker ID selected by
// DynPrefillScorer to DynDecodeScorer so it can set the x-prefill-instance-id header.
PrefillWorkerIDStateKey = plugins.StateKey("disagg-prefill-worker-id")
)
// PrefillEnabledState stores whether prefill is enabled for the current scheduling cycle.
......@@ -56,6 +60,17 @@ func (s *PrefillEnabledState) Clone() plugins.StateData {
return &PrefillEnabledState{Enabled: s.Enabled}
}
// PrefillWorkerIDState stores the prefill worker ID selected by DynPrefillScorer.
// Written by DynPrefillScorer, read by DynDecodeScorer to set the header.
type PrefillWorkerIDState struct {
WorkerID string
}
// Clone implements plugins.StateData.
func (s *PrefillWorkerIDState) Clone() plugins.StateData {
return &PrefillWorkerIDState{WorkerID: s.WorkerID}
}
// readPrefillEnabled reads the PrefillEnabledState from CycleState.
// Returns false if the state is not found or not set.
func readPrefillEnabled(cycleState *schedtypes.CycleState) bool {
......
......@@ -101,9 +101,12 @@ import (
"time"
"unsafe"
ctrl "sigs.k8s.io/controller-runtime"
schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)
var logger = ctrl.Log.WithName("dynamo-kv-scorer")
var (
ffiOnce sync.Once
ffiErr error
......@@ -120,12 +123,16 @@ var (
)
func loadDynamoConfig() {
ffiNamespace = getEnvOrDefault("DYN_NAMESPACE", "vllm-agg")
ffiNamespace = getEnvOrDefault("DYN_NAMESPACE_PREFIX", getEnvOrDefault("DYN_NAMESPACE", "vllm-agg"))
ffiComponent = "backend" // This is not the same as DYN_COMPONENT=epp (in this case)
ffiEnforceDisagg = getEnvBoolOrDefault("DYN_ENFORCE_DISAGG", false)
// Note: model name and kv_cache_block_size are now auto-discovered from the model card
fmt.Printf("Dynamo KV Scorer: namespace=%s, component=%s, enforce_disagg=%v\n",
ffiNamespace, ffiComponent, ffiEnforceDisagg)
logger.Info("Dynamo KV Scorer config loaded",
"namespace", ffiNamespace,
"component", ffiComponent,
"enforce_disagg", ffiEnforceDisagg,
"kvCacheBlockSize", getEnvOrDefault("DYN_KV_CACHE_BLOCK_SIZE", "(from discovery)"),
"modelName", getEnvOrDefault("DYN_MODEL_NAME", "(from discovery)"))
}
func getEnvOrDefault(key, def string) string {
......@@ -168,7 +175,16 @@ func initFFI() error {
&routerHandles,
)
if rc != C.QUERY_ROUTER_OK {
ffiErr = fmt.Errorf("create_routers failed with code %d", rc)
switch rc {
case C.QUERY_ROUTER_ERR_DISAGG_ENFORCED:
ffiErr = fmt.Errorf(
"create_routers failed: no prefill workers found. "+
"If running in aggregated mode, set DYN_DECODE_FALLBACK=true to allow decode-only routing. "+
"If running in disaggregated mode, ensure prefill workers are deployed and discoverable in namespace %q",
ffiNamespace)
default:
ffiErr = fmt.Errorf("create_routers failed with code %d", rc)
}
return
}
routerInitialized = true
......
......@@ -95,6 +95,14 @@ func (e *EPPDefaults) GetBaseContainer(context ComponentContext) (corev1.Contain
Name: "RUST_LOG",
Value: "debug,dynamo_llm::kv_router=trace",
},
{
Name: "DYN_ENFORCE_DISAGG",
Value: "false",
},
{
Name: commonconsts.DynamoNamespacePrefixEnvVar,
Value: context.DynamoNamespace,
},
}...)
// EPP default args
......
......@@ -71,10 +71,6 @@ kubectl create secret generic hf-token-secret \
-n ${NAMESPACE}
```
Create a model configuration file similar to the vllm_agg_qwen.yaml for your model.
This file demonstrates the values needed for the vLLM aggregated setup in [agg.yaml](https://github.com/ai-dynamo/dynamo/blob/main/examples/backends/vllm/deploy/agg.yaml)
Take a note of the model's block size provided in the model card.
### 4. Build EPP image (Optional)
You can either use the provided Dynamo FrontEnd image for the EPP image or you need to build your own Dynamo EPP custom image following the steps below.
......@@ -124,6 +120,7 @@ For the HttpRoute service make sure to specify the namespace where your gateway
```bash
cd <dynamo-source-root>
# kubectl get httproutes -n my-model # Make sure you do not have an incompatible HttpRoute running, delete if so.
kubectl apply -f examples/backends/vllm/deploy/gaie/agg.yaml -n my-model
kubectl apply -f examples/backends/vllm/deploy/gaie/http-route.yaml -n my-model
```
......@@ -136,6 +133,11 @@ kubectl apply -f recipes/llama-3-70b/model-cache/model-cache.yaml -n ${NAMESPAC
kubectl apply -f recipes/llama-3-70b/model-cache/model-download.yaml -n ${NAMESPACE}
```
We provide examples for llama-3-70b vLLM under the `recipes/llama-3-70b/vllm/agg/gaie/` for aggregated and `recipes/llama-3-70b/vllm/disagg-single-node/gaie/` for disaggregated serving.
Note for the aggregated serving you need to disable DYN_ENFORCE_DISAGG in epp config.
```bash
- name: DYN_ENFORCE_DISAGG
value: "false"
```
Use the proper folder in commands below.
```bash
......@@ -143,7 +145,7 @@ Use the proper folder in commands below.
# agg
kubectl apply -f recipes/llama-3-70b/vllm/agg/gaie/deploy.yaml -n ${NAMESPACE}
# Deploy the GAIE http-route CR.
# Deploy the GAIE http-route CR. Adjust parentRefs.namespace in this file first to point where your gateway is.
kubectl apply -f recipes/llama-3-70b/vllm/agg/gaie/http-route.yaml -n ${NAMESPACE}
# or disagg
......@@ -180,7 +182,7 @@ extraPodSpec:
**Gateway Namespace**
Note that this assumes your gateway is installed into `NAMESPACE=my-model` (examples' default)
If you installed it into a different namespace, you need to adjust the HttpRoute entry in http-route.yaml.
If you installed it into a different namespace, you need to adjust the HttpRoute entry in `http-route.yaml`.
#### 5.b. Deploy as a standalone pod
......@@ -197,6 +199,7 @@ cd deploy/inference-gateway/standalone
# Export the EPP image - use the Dynamo FrontEnd image or build your own EPP image (see section 4)
export EPP_IMAGE=<the-epp-image>
```
Create a model configuration file similar to the vllm_agg_qwen.yaml for your model.
```bash
helm upgrade --install dynamo-gaie ./helm/dynamo-gaie -n my-model -f ./vllm_agg_qwen.yaml --set-string extension.image=$EPP_IMAGE
......@@ -221,11 +224,31 @@ Key configurations include:
You can configure the plugin by setting environment variables in the EPP component of your DGD in case of the operator-managed installation or in your [values.yaml](https://github.com/ai-dynamo/dynamo/blob/main/deploy/inference-gateway/standalone/helm/dynamo-gaie/values.yaml).
Common Vars for Routing Configuration:
**Enabling KV-Aware Routing (most precise)**
KV-aware routing uses live KV cache block events from workers so the EPP can route requests to the worker with the best prefix cache overlap. To enable it (default):
1. **Workers — enable prefix caching and KV event publishing.** Each worker must publish KV cache events to event plane (NATS/ZMQ) so the EPP's router can track per-worker cache state.
- **vLLM:** Pass `--enable-prefix-caching` and `--kv-events-config '{"enable_kv_cache_events":true}'`.
- **SGLang:** Pass `--kv-events-config` with the appropriate endpoint.
- **TRT-LLM:** Pass `--publish-events-and-metrics`.
- **Disaggregated vLLM (prefill/decode separation):** Do **not** pass `--disaggregation-mode decode` on decode workers — this flag hardcodes KV event publishing to off. Instead, omit the flag (defaults to aggregated mode) so decode workers also publish their cache state.
2. **EPP — leave `DYN_USE_KV_EVENTS` at its default (`true`).** The EPP subscribes to worker KV events via event plane (NATS/ZMQ) and uses them for prefix-overlap scoring.
3. **Block size — must be consistent.** The `--block-size` on all workers must match `DYN_KV_CACHE_BLOCK_SIZE` on the EPP (default: 128). Mismatched block sizes cause incorrect block hash computation.
**Disabling KV-Aware Routing**
To disable the EPP from listening for KV events (e.g., when prefix caching is off on workers, or for simpler load-balanced routing):
1. **EPP:** Set `DYN_USE_KV_EVENTS=false`. The router falls back to approximate mode (routing decisions are tracked locally with TTL decay instead of live KV events from workers).
2. **Workers:** Pass `--no-enable-prefix-caching` to disable prefix caching entirely. Without prefix caching, no KV events are generated regardless of other flags.
3. **Optionally** set `DYN_OVERLAP_SCORE_WEIGHT=0` on the EPP to skip prefix-overlap scoring altogether, making the router select workers based on load only.
- Set `DYN_BUSY_THRESHOLD` to configure the upper bound on how "full" a worker can be (often derived from kv_active_blocks or other load metrics) before the router skips it. If the selected worker exceeds this value, routing falls back to the next best candidate. By default the value is negative meaning this is not enabled.
- Set `DYN_ENFORCE_DISAGG=true` to strictly enforce disaggregated mode. When enabled, requests fail if prefill workers have not registered yet. Without this, requests arriving before prefill workers are discovered fall through to decode-only routing. Prefill errors always fail requests regardless of this setting.
- Set `DYN_OVERLAP_SCORE_WEIGHT` to weigh how heavily the score uses token overlap (predicted KV cache hits) versus other factors (load, historical hit rate). Higher weight biases toward reusing workers with similar cached prefixes. (default: 1)
- Set `DYN_ROUTER_TEMPERATURE` to soften or sharpen the selection curve when combining scores. Low temperature makes the router pick the top candidate deterministically; higher temperature lets lower-scoring workers through more often (exploration).
- Set `DYN_USE_KV_EVENTS=false` if you want to disable the router listening for KV events while using kv-routing (default: true). SGLang workers require `--kv-events-config` and TRT-LLM workers require `--publish-events-and-metrics` to publish KV events. For vLLM, KV events are auto-configured when prefix caching is active (deprecated — use `--kv-events-config` explicitly)
- `DYN_ROUTER_TEMPERATURE` — Temperature for worker sampling via softmax (default: 0.0)
- `DYN_ROUTER_REPLICA_SYNC` — Enable replica synchronization (default: false)
- `DYN_ROUTER_TRACK_ACTIVE_BLOCKS` — Track active blocks (default: true)
......@@ -264,9 +287,8 @@ The Inference Gateway provides HTTP endpoints for model inference.
#### 1: Populate gateway URL for your k8s cluster ####
To test the gateway in minikube, use the following command:
a. User minikube tunnel to expose the gateway to the host
This requires `sudo` access to the host machine. alternatively, you can use port-forward to expose the gateway to the host as shown in alternative (b).
a. To test the integration in minikube, proceed as below:
Use minikube tunnel to expose the gateway to the host. This requires `sudo` access to the host machine. Alternatively, you can use port-forward to expose the gateway to the host as shown in alternative (b).
```bash
# in first terminal
......@@ -277,11 +299,13 @@ minikube tunnel # start the tunnel
GATEWAY_URL=$(kubectl get svc inference-gateway -n my-model -o jsonpath='{.spec.clusterIP}') && echo $GATEWAY_URL
```
b. use port-forward to expose the gateway to the host
b. To test on a cluster use commands below:
use port-forward to expose the gateway to the host
```bash
# in first terminal
kubectl port-forward svc/inference-gateway 8000:80 -n {NAMESPACE} # for NAMESPACE put wherever you installed thee gateway i.e. kgateway-system
kubectl port-forward svc/inference-gateway 8000:80 -n ${NAMESPACE} # for NAMESPACE put wherever you installed the gateway i.e. kgateway-system or my-model8
# in second terminal where you want to send inference requests
GATEWAY_URL=http://localhost:8000
......@@ -332,6 +356,25 @@ curl $GATEWAY_URL/v1/chat/completions \
"temperature": 0.0
}'
```
or
```bash
MODEL_NAME="RedHatAI/Llama-3.3-70B-Instruct-FP8-dynamic"
curl -H "Host: llama3-70b-disagg.example.com" http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "'"${MODEL_NAME}"'",
"messages": [
{
"role": "user",
"content": "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden."
}
],
"stream":false,
"max_tokens": 30,
"temperature": 0.0
}'
```
Sample inference output:
......@@ -369,7 +412,9 @@ Sample inference output:
```
***If you have more than one HttpRoute running on the cluster***
Add the host to your HttpRoute.yaml and add the header `curl -H "Host: llama3-70b-agg.example.com" ...` to every request.
Add the host to your HttpRoute.yaml and add the header
`curl -H "Host: llama3-70b-agg.example.com" ...` or `curl -H "Host: llama3-70b-disagg.example.com" http://localhost:8000/v1/models`
```bash
spec:
hostnames:
......
......@@ -14,6 +14,13 @@ spec:
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/epp-image:my-tag
env:
- name: DYN_KV_CACHE_BLOCK_SIZE
value: "128"
- name: DYN_MODEL_NAME
value: "Qwen/Qwen3-0.6B"
- name: DYN_ENFORCE_DISAGG
value: "false"
eppConfig:
config:
plugins:
......
......@@ -57,9 +57,9 @@ fn initialize_tracing() {
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
tracing::debug!("Tracing initialized");
if tracing::subscriber::set_global_default(subscriber).is_ok() {
tracing::debug!("Tracing initialized");
}
}
#[repr(u32)]
......@@ -439,6 +439,10 @@ impl RouterHandles {
priority_jump: f64,
allowed_worker_ids: Option<HashSet<WorkerId>>,
) -> Result<u64, QueryRouterResult> {
if let Some(ref ids) = allowed_worker_ids {
self.prefill_router.register_workers(ids);
}
self.prefill_router
.query_prefill_worker(
tokens,
......@@ -473,6 +477,10 @@ impl RouterHandles {
is_disaggregated: bool,
allowed_worker_ids: Option<HashSet<WorkerId>>,
) -> Result<(WorkerWithDpRank, u32), QueryRouterResult> {
if let Some(ref ids) = allowed_worker_ids {
self.decode_router.register_workers(ids);
}
// For decode phase in disaggregated mode, use overlap_score_weight=0
// This matches prefill_router.rs
let config_override = if is_disaggregated {
......@@ -594,6 +602,8 @@ pub unsafe extern "C" fn create_routers(
enforce_disagg: bool,
out_handle: *mut RouterHandlesPtr,
) -> QueryRouterResult {
initialize_tracing();
if namespace.is_null() || out_handle.is_null() {
return QueryRouterResult::ErrInvalidParam;
}
......@@ -633,24 +643,29 @@ pub unsafe extern "C" fn create_routers(
}
};
// Wait for at least one worker to be discovered before proceeding
// This ensures the decode router can be created successfully
let instance_count = wait_for_discovery_sync(&drt).await;
if instance_count == 0 {
tracing::error!(
"Discovery sync failed: no worker instances found. Is the backend running?"
let (preprocessor, block_size, model_name, actual_namespace) =
match init_preprocessor(&drt, &namespace_str).await {
Ok(result) => result,
Err(e) => {
tracing::error!(error = %e, "Failed to initialize preprocessor");
return Err(QueryRouterResult::ErrInitFailed);
}
};
if actual_namespace != namespace_str {
tracing::info!(
base_namespace = namespace_str,
actual_namespace = actual_namespace,
"Worker namespace has rolling-update suffix"
);
return Err(QueryRouterResult::ErrInitFailed);
}
tracing::info!(
"Discovery sync complete, {} worker(s) found",
instance_count
);
let kv_router_config = kv_router_config_from_env();
let mut kv_router_config = kv_router_config_from_env();
kv_router_config.skip_initial_worker_wait = true;
// Get component and endpoint
let component_handle = match drt.namespace(&namespace_str) {
// Build endpoint using the actual namespace discovered from workers,
// which may include a rolling-update hash suffix.
let component_handle = match drt.namespace(&actual_namespace) {
Ok(ns) => match ns.component(&component_str) {
Ok(c) => c,
Err(e) => {
......@@ -667,25 +682,6 @@ pub unsafe extern "C" fn create_routers(
let model_manager = Arc::new(ModelManager::new());
// Fetch model card via discovery and create preprocessor + get block_size
let (preprocessor, block_size, model_name) =
match fetch_preprocessor_from_discovery(&drt, &namespace_str).await {
Ok((prep, bs, name)) => {
tracing::info!(
kv_cache_block_size = bs,
"Preprocessor created from discovery"
);
(Some(prep), bs, name)
}
Err(e) => {
tracing::error!(
error = %e,
"Failed to fetch model card from discovery - cannot determine block_size"
);
return Err(QueryRouterResult::ErrInitFailed);
}
};
// Create decode router
let decode_router = match model_manager
.kv_chooser_for(
......@@ -704,6 +700,44 @@ pub unsafe extern "C" fn create_routers(
}
};
// Wait for the runtime config watch to be populated with at least one
// decode worker's ModelRuntimeConfig. skip_initial_worker_wait=true
// skips this inside KvRouter::new, but the selector needs workers in
// workers_with_configs to avoid NoEndpoints on the first request.
// discovery sync already confirmed workers exist; this just waits for
// the async join of instance IDs + configs to complete in the watch.
{
let mut config_watch = model_manager
.get_or_create_runtime_config_watcher(&endpoint)
.await
.map_err(|e| {
tracing::error!(error = ?e, "Failed to get runtime config watcher");
QueryRouterResult::ErrInitFailed
})?;
tracing::info!(
"Waiting for decode workers to register ModelRuntimeConfig \
(no timeout - controlled by K8s StartupProbe)..."
);
let wait_result = config_watch.wait_for(|m| !m.is_empty()).await.map(|_| ());
match wait_result {
Ok(()) => {
let count = config_watch.borrow().len();
tracing::info!(
worker_count = count,
"Runtime config watch populated with decode workers"
);
}
Err(_) => {
tracing::error!(
"Runtime config watch closed before any workers appeared. \
Decode routing will fail. \
Verify workers are running and publishing to discovery."
);
return Err(QueryRouterResult::ErrInitFailed);
}
}
}
// Create PrefillRouter based on one-time discovery of prefill workers
// Auto-detects disaggregated mode by checking if prefill workers are present
// The prefill workers have to be created before the epp is created.
......@@ -1243,6 +1277,51 @@ pub unsafe extern "C" fn route_decode_request(
}
}
/// Initialize the preprocessor, block size, and model name.
///
/// Waits for discovery to sync (model card must be available for tokenization),
/// then creates the preprocessor from the model card. The `kv_cache_block_size`
/// and `model_name` are taken from the model card to ensure consistency with
/// the worker configuration.
async fn init_preprocessor(
drt: &DistributedRuntime,
target_namespace: &str,
) -> anyhow::Result<(Option<Arc<OpenAIPreprocessor>>, u32, String, String)> {
let instance_count = wait_for_discovery_sync(drt).await;
if instance_count == 0 {
anyhow::bail!("Discovery sync failed: no worker instances found. Is the backend running?");
}
tracing::info!(
"Discovery sync complete, {} worker(s) found",
instance_count
);
// Retry fetching the preprocessor: model card metadata may arrive after
// worker endpoints are registered.
let (prep, block_size, model_name, actual_namespace) = loop {
match fetch_preprocessor_from_discovery(drt, target_namespace).await {
Ok(result) => break result,
Err(e) => {
tracing::warn!(
error = %e,
target_namespace,
"Model card not available yet, retrying in 5s..."
);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
};
tracing::info!(
kv_cache_block_size = block_size,
model_name = model_name,
actual_namespace = actual_namespace,
"Preprocessor initialized from model card"
);
Ok((Some(prep), block_size, model_name, actual_namespace))
}
/// Fetch model card via discovery and create preprocessor.
///
/// This function:
......@@ -1254,7 +1333,7 @@ pub unsafe extern "C" fn route_decode_request(
async fn fetch_preprocessor_from_discovery(
drt: &DistributedRuntime,
target_namespace: &str,
) -> anyhow::Result<(Arc<OpenAIPreprocessor>, u32, String)> {
) -> anyhow::Result<(Arc<OpenAIPreprocessor>, u32, String, String)> {
use dynamo_llm::model_card::ModelDeploymentCard;
use dynamo_runtime::discovery::DiscoveryInstance;
......@@ -1263,16 +1342,18 @@ async fn fetch_preprocessor_from_discovery(
// List all models
let instances = discovery.list(DiscoveryQuery::AllModels).await?;
// Find first model card in the target namespace (decode workers only)
let mut model_card: Option<ModelDeploymentCard> = None;
// Find first model card in the target namespace (decode workers only).
// Use prefix matching because workers may append a rolling-update hash
// suffix to the base namespace (e.g. "ns-dgd-58908edc" vs "ns-dgd").
let mut model_card: Option<(ModelDeploymentCard, String)> = None;
for instance in instances {
if let DiscoveryInstance::Model { namespace, .. } = &instance {
// Filter by namespace
if namespace != target_namespace {
if !namespace.starts_with(target_namespace) {
continue;
}
let actual_namespace = namespace.clone();
match instance.deserialize_model::<ModelDeploymentCard>() {
Ok(card) => {
// Skip prefill-only workers, we want decode workers for routing
......@@ -1282,7 +1363,7 @@ async fn fetch_preprocessor_from_discovery(
{
continue;
}
model_card = Some(card);
model_card = Some((card, actual_namespace));
break;
}
Err(e) => {
......@@ -1293,7 +1374,7 @@ async fn fetch_preprocessor_from_discovery(
}
}
let mut card = model_card.ok_or_else(|| {
let (mut card, actual_namespace) = model_card.ok_or_else(|| {
anyhow::anyhow!(
"No model found in namespace '{}' via discovery",
target_namespace
......@@ -1305,6 +1386,7 @@ async fn fetch_preprocessor_from_discovery(
tracing::info!(
model_name = model_name,
kv_cache_block_size = kv_cache_block_size,
actual_namespace = actual_namespace,
"Found model card via discovery"
);
......@@ -1313,7 +1395,12 @@ async fn fetch_preprocessor_from_discovery(
// Create preprocessor
let preprocessor = OpenAIPreprocessor::new(card)?;
Ok((preprocessor, kv_cache_block_size, model_name))
Ok((
preprocessor,
kv_cache_block_size,
model_name,
actual_namespace,
))
}
/// Find a prefill endpoint from already-discovered instances (one-time filter).
......@@ -1342,8 +1429,7 @@ async fn find_prefill_endpoint(
..
} = &instance
{
// Filter by namespace
if namespace != target_namespace {
if !namespace.starts_with(target_namespace) {
continue;
}
......
......@@ -94,6 +94,7 @@ impl KvRouterConfig {
router_queue_threshold,
router_event_threads,
router_enable_cache_control,
skip_initial_worker_wait: false,
router_queue_policy: router_queue_policy.parse().unwrap_or_else(|_| {
panic!("invalid router_queue_policy: {router_queue_policy:?}")
}),
......
......@@ -126,6 +126,10 @@ pub struct KvRouterConfig {
/// When false (default), cache_control is ignored and no cache_control client is created.
pub router_enable_cache_control: bool,
/// Skip blocking for workers at init time (default: false).
/// When true, the router starts immediately without waiting for discovery-based
/// workers and workers are provided externally per-request (e.g., EPP).
pub skip_initial_worker_wait: bool,
/// Scheduling policy for the router queue.
/// "fcfs" (default): first-come first-served with priority bumps — optimizes tail TTFT.
/// "wspt": weighted shortest processing time (Smith's rule) — optimizes average TTFT.
......@@ -158,6 +162,7 @@ impl Default for KvRouterConfig {
router_queue_threshold: Some(2.0),
router_event_threads: 4,
router_enable_cache_control: false,
skip_initial_worker_wait: false,
router_queue_policy: RouterQueuePolicy::default(),
remote_indexer_component: None,
}
......
......@@ -96,15 +96,47 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike, S: SchedulingPolicy>
}
}
/// Register externally-provided workers in the slot tracker.
///
/// Looks up DP rank/size from the discovery watch channel; defaults to
/// `(0, 1)` for workers not yet known to discovery.
pub fn register_workers(&self, worker_ids: &std::collections::HashSet<u64>) {
let discovery_workers = self.workers_with_configs.borrow();
let dp_range: std::collections::HashMap<u64, (u32, u32)> = worker_ids
.iter()
.map(|&id| {
let (dp_start, dp_size) = discovery_workers
.get(&id)
.map(|runtime_config| {
(
runtime_config.data_parallel_start_rank(),
runtime_config.data_parallel_size(),
)
})
.unwrap_or((0, 1));
(id, (dp_start, dp_size))
})
.collect();
self.slots.register_external_workers(&dp_range);
}
/// Enqueue a new request.
/// If queueing is disabled or workers have capacity, schedule immediately.
/// Otherwise park in the pending heap.
///
/// When `allowed_worker_ids` is set on the request (external routing), the
/// capacity check is skipped.
pub async fn enqueue(&self, request: SchedulingRequest) {
let Some(threshold) = self.threshold_frac else {
self.schedule(request).await;
return;
};
if request.allowed_worker_ids.is_some() {
self.schedule(request).await;
return;
}
if self.all_workers_busy(threshold, request.allowed_worker_ids.as_ref()) {
tracing::debug!("all workers busy, queueing request");
let arrival_offset = self.start_time.elapsed();
......
......@@ -274,6 +274,33 @@ impl<P: SequencePublisher + 'static> ActiveSequencesMultiWorker<P> {
Ok(())
}
/// Register externally-provided workers (e.g. from EPP) in the slot tracker,
/// adding any that are missing.
///
/// Unlike [`update_workers`], this does not remove workers absent from the
/// input — it only adds new ones. This is intentional: the EPP may send
/// different subsets of workers on different requests, and one routing call
/// must not evict workers registered by another.
///
/// Worker removal in External mode will be handled separately via GAIE
/// lifecycle events (not yet implemented). TODO (atchernych) once we upgrade to GAIE latest.
pub fn register_external_workers(&self, dp_range: &HashMap<u64, (u32, u32)>) {
let mut table = self.workers.write();
for (&worker_id, &(dp_start, dp_size)) in dp_range {
for dp_rank in dp_start..(dp_start + dp_size) {
let worker = WorkerWithDpRank::new(worker_id, dp_rank);
if !table.index.contains_key(&worker) {
tracing::debug!("Lazily registering external worker {:?}", worker);
let idx = table.slots.len();
table
.slots
.push((worker, RwLock::new(ActiveSequences::new(self.block_size))));
table.index.insert(worker, idx);
}
}
}
}
/// Update the set of workers, adding and removing as needed.
///
/// `new_dp_range` maps worker IDs to their data-parallel range (start, size).
......
......@@ -335,13 +335,14 @@ impl KvRouter {
let indexer = Indexer::new(component, &kv_router_config, block_size, model_name).await?;
// Wait for at least one worker with a known runtime config before starting scheduler
let _ = workers_with_configs
.wait_for(|m| !m.is_empty())
.await
.map_err(|_| {
anyhow::anyhow!("runtime config watch closed before any workers appeared")
})?;
if !kv_router_config.skip_initial_worker_wait {
let _ = workers_with_configs
.wait_for(|m| !m.is_empty())
.await
.map_err(|_| {
anyhow::anyhow!("runtime config watch closed before any workers appeared")
})?;
}
let scheduler = KvScheduler::start(
component.clone(),
......@@ -487,6 +488,11 @@ impl KvRouter {
Ok((response.best_worker, response.overlap_blocks))
}
/// Register externally-provided workers in the slot tracker.
pub fn register_workers(&self, worker_ids: &HashSet<WorkerId>) {
self.scheduler.register_workers(worker_ids);
}
#[allow(clippy::too_many_arguments)]
pub async fn add_request(
&self,
......
......@@ -499,6 +499,13 @@ impl PrefillRouter {
///
/// This is the shared worker selection logic used by both `resolve_prefill_worker`
/// and `query_route`.
/// Register externally-provided workers in the prefill router's slot tracker.
pub fn register_workers(&self, worker_ids: &HashSet<WorkerId>) {
if let Some(InnerPrefillRouter::KvRouter(r)) = self.prefill_router.get() {
r.chooser.register_workers(worker_ids);
}
}
pub async fn query_prefill_worker(
&self,
token_ids: &[u32],
......@@ -607,6 +614,13 @@ impl
.as_ref()
.and_then(|r| r.prefill_worker_id);
if self.router_mode.is_direct_routing() && preselected_worker.is_none() {
return Err(anyhow::anyhow!(
"Prefill worker ID required in Direct routing mode but none found in request. \
Expected prefill_worker_id to be set via x-prefill-instance-id header by external router (e.g., EPP)."
));
}
let prefill_result = async {
if let Some((worker_id, dp_rank, bootstrap_info)) = self
.resolve_prefill_worker(&prefill_req, preselected_worker)
......
......@@ -47,7 +47,9 @@ impl KvScheduler {
let selector = selector.unwrap_or(Box::new(DefaultWorkerSelector::new(None, worker_type)));
// Get initial workers from watch receiver.
// Caller must ensure at least one worker is present (via wait_for).
// When skip_initial_worker_wait is false, the caller ensures at least one
// worker is present (via wait_for). When true the map may be empty;
// workers will be lazily registered via allowed_worker_ids per-request.
let initial_workers: HashMap<WorkerId, ModelRuntimeConfig> =
workers_with_configs.borrow().clone();
......@@ -64,39 +66,51 @@ impl KvScheduler {
.map_err(|e| KvSchedulerError::InitFailed(e.to_string()))?;
// Spawn background task to sync slots when the watch value changes.
let slots_monitor = slots.clone();
let mut monitor_rx = workers_with_configs.clone();
let monitor_cancel_token = component.drt().child_token();
tokio::spawn(async move {
tracing::trace!("KvScheduler workers monitoring task started");
let mut last_workers: HashMap<WorkerId, ModelRuntimeConfig> = HashMap::new();
loop {
tokio::select! {
_ = monitor_cancel_token.cancelled() => {
tracing::trace!("KvScheduler workers monitoring task shutting down");
break;
}
result = monitor_rx.changed() => {
if result.is_err() {
tracing::warn!("KvScheduler: config watch sender dropped, shutting down");
//
// In EPP mode (skip_initial_worker_wait=true) we skip the monitoring task:
// the per-request allowed_worker_ids is the source of truth, workers are
// lazily registered via register_external_workers() from the C bindings,
// and update_workers() would impose discovery-based lifecycle (add/remove)
// on the slot tracker, conflicting with EPP ownership.
if kv_router_config.skip_initial_worker_wait {
tracing::info!("skipping discovery-based worker monitoring");
} else {
let slots_monitor = slots.clone();
let mut monitor_rx = workers_with_configs.clone();
let monitor_cancel_token = component.drt().child_token();
tokio::spawn(async move {
tracing::trace!("KvScheduler workers monitoring task started");
let mut last_workers: HashMap<WorkerId, ModelRuntimeConfig> = HashMap::new();
loop {
tokio::select! {
_ = monitor_cancel_token.cancelled() => {
tracing::trace!("KvScheduler workers monitoring task shutting down");
break;
}
result = monitor_rx.changed() => {
if result.is_err() {
tracing::warn!("KvScheduler: config watch sender dropped, shutting down");
break;
}
}
}
}
let current_workers = monitor_rx.borrow_and_update().clone();
if current_workers != last_workers {
let dp_range: HashMap<u64, (u32, u32)> = current_workers
.iter()
.map(|(&id, c)| (id, (c.data_parallel_start_rank, c.data_parallel_size)))
.collect();
slots_monitor.update_workers(&dp_range);
last_workers = current_workers;
let current_workers = monitor_rx.borrow_and_update().clone();
if current_workers != last_workers {
let dp_range: HashMap<u64, (u32, u32)> = current_workers
.iter()
.map(|(&id, c)| {
(id, (c.data_parallel_start_rank, c.data_parallel_size))
})
.collect();
slots_monitor.update_workers(&dp_range);
last_workers = current_workers;
}
}
}
});
});
}
let (request_tx, request_rx) = tokio::sync::mpsc::channel::<SchedulingRequest>(1024);
let scheduler_cancel_token = component.drt().primary_token();
......@@ -215,6 +229,11 @@ impl KvScheduler {
Ok(response)
}
/// Register externally-provided workers in the slot tracker.
pub fn register_workers(&self, worker_ids: &HashSet<WorkerId>) {
self.queue.register_workers(worker_ids);
}
pub async fn add_request(&self, req: SequenceRequest) -> Result<(), SequenceError> {
self.slots.add_request(req).await
}
......
......@@ -60,7 +60,7 @@ pub struct RoutingHints {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cache_control_ttl: Option<u64>,
/// Optional set of allowed worker IDs to restrict routing decisions (EPP).
/// Worker IDs provided externally and not discovered by the router.
/// When set, only workers in this set are considered during scoring.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub allowed_worker_ids: Option<HashSet<WorkerId>>,
......
......@@ -249,7 +249,8 @@ where
/// Select the next worker according to the routing mode.
/// Increments round-robin counter if applicable.
/// Panics if called on Direct or KV mode - those have their own selection mechanisms.
/// Returns None for Direct mode - requires explicit worker IDs via routing hints
/// Panics for KV mode which has its own selection via find_best_match.
pub fn select_next_worker(&self) -> Option<u64> {
let instance_ids = self.client.instance_ids_avail();
let count = instance_ids.len();
......@@ -266,6 +267,7 @@ where
let counter = rand::rng().random::<u64>() as usize;
Some(instance_ids[counter % count])
}
RouterMode::Direct => None,
_ => {
panic!(
"select_next_worker should not be called for {:?} routing mode",
......@@ -277,6 +279,7 @@ where
/// Peek the next worker according to the routing mode without incrementing the counter.
/// Useful for checking if a worker is suitable before committing to it.
/// Returns None for Direct mode - requires explicit worker IDs via routing hints.
pub fn peek_next_worker(&self) -> Option<u64> {
let instance_ids = self.client.instance_ids_avail();
let count = instance_ids.len();
......@@ -296,6 +299,7 @@ where
let counter = rand::rng().random::<u64>() as usize;
Some(instance_ids[counter % count])
}
RouterMode::Direct => None,
_ => {
panic!(
"peek_next_worker should not be called for {:?} routing mode",
......
......@@ -17,6 +17,13 @@ spec:
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/frontend:my-tag
env:
- name: DYN_KV_CACHE_BLOCK_SIZE
value: "128"
- name: DYN_MODEL_NAME
value: "RedHatAI/Llama-3.3-70B-Instruct-FP8-dynamic"
- name: DYN_DECODE_FALLBACK
value: "true"
eppConfig:
# This config uses the same disagg-profile-handler as disaggregated deployments.
# The handler's graceful degradation feature makes this possible:
......
......@@ -16,7 +16,12 @@ spec:
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/epp-image:my-tag
image: nvcr.io/nvidia/ai-dynamo/frontend:my-tag
env:
- name: DYN_KV_CACHE_BLOCK_SIZE
value: "128"
- name: DYN_MODEL_NAME
value: "RedHatAI/Llama-3.3-70B-Instruct-FP8-dynamic"
eppConfig:
config:
plugins:
......@@ -68,7 +73,7 @@ spec:
sharedMemory:
size: 80Gi
frontendSidecar:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
image: nvcr.io/nvidia/ai-dynamo/frontend:my-tag
args:
- -m
- dynamo.frontend
......@@ -97,7 +102,7 @@ spec:
- name: HF_HOME
value: /opt/models
args:
- "python3 -m dynamo.vllm --model $MODEL_PATH --served-model-name $SERVED_MODEL_NAME --tensor-parallel-size 2 --data-parallel-size 1 --is-prefill-worker --gpu-memory-utilization 0.90 --no-enable-prefix-caching --block-size 128"
- "python3 -m dynamo.vllm --model $MODEL_PATH --served-model-name $SERVED_MODEL_NAME --tensor-parallel-size 2 --data-parallel-size 1 --disaggregation-mode prefill --kv-transfer-config '{\"kv_connector\":\"NixlConnector\",\"kv_role\":\"kv_both\"}' --gpu-memory-utilization 0.90 --enable-prefix-caching --block-size 128 --kv-events-config '{\"enable_kv_cache_events\":true}'"
command:
- /bin/sh
- -c
......@@ -119,7 +124,7 @@ spec:
sharedMemory:
size: 80Gi
frontendSidecar:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
image: nvcr.io/nvidia/ai-dynamo/frontend:my-tag
args:
- -m
- dynamo.frontend
......@@ -148,7 +153,7 @@ spec:
- name: HF_HOME
value: /opt/models
args:
- "python3 -m dynamo.vllm --model $MODEL_PATH --served-model-name $SERVED_MODEL_NAME --tensor-parallel-size 4 --data-parallel-size 1 --gpu-memory-utilization 0.90 --no-enable-prefix-caching --block-size 128"
- "python3 -m dynamo.vllm --model $MODEL_PATH --served-model-name $SERVED_MODEL_NAME --tensor-parallel-size 4 --data-parallel-size 1 --kv-transfer-config '{\"kv_connector\":\"NixlConnector\",\"kv_role\":\"kv_both\"}' --gpu-memory-utilization 0.90 --enable-prefix-caching --block-size 128 --kv-events-config '{\"enable_kv_cache_events\":true}'"
command:
- /bin/sh
- -c
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment