Unverified Commit 2a2e63a0 authored by atchernych's avatar atchernych Committed by GitHub
Browse files

feat: handle PrefillComplete in the Dynamo EPP Scorer Plugin [DEP-728] (#5592)


Signed-off-by: default avatarAnna Tchernych <atchernych@nvidia.com>
parent 24807345
...@@ -95,7 +95,7 @@ spec: ...@@ -95,7 +95,7 @@ spec:
{{- end }} {{- end }}
{{- if .Values.etcdAddr }} {{- if .Values.etcdAddr }}
- --etcdAddr={{ .Values.etcdAddr }} - --etcdAddr={{ .Values.etcdAddr }}
{{- else }} {{- else if and .Values.etcd .Values.etcd.enabled }}
- --etcdAddr={{ .Release.Name }}-etcd.{{ .Release.Namespace }}.svc.cluster.local:2379 - --etcdAddr={{ .Release.Name }}-etcd.{{ .Release.Namespace }}.svc.cluster.local:2379
{{- end }} {{- end }}
{{- if and .Values.dynamo.istio.enabled .Values.dynamo.istio.gateway }} {{- if and .Values.dynamo.istio.enabled .Values.dynamo.istio.gateway }}
......
...@@ -28,9 +28,8 @@ Currently, these setups are only supported with the kGateway based Inference Gat ...@@ -28,9 +28,8 @@ Currently, these setups are only supported with the kGateway based Inference Gat
- [7. Usage](#7-usage) - [7. Usage](#7-usage)
- [8. Deleting the installation](#8-deleting-the-installation) - [8. Deleting the installation](#8-deleting-the-installation)
- [Gateway API Inference Extension Details](#gateway-api-inference-extension-integration) - [Gateway API Inference Extension Details](#gateway-api-inference-extension-integration)
- [v1.2.1 API Changes](#v121-api-changes) - [Router bookkeeping operations](#router-bookkeeping-operations)
- [Building for v1.2.1](#building-for-v121) - [Header Routing Hints](#header-routing-hints)
- [Header-Only Routing for v1.2.1](#header-only-routing-for-v121)
## Prerequisites ## Prerequisites
...@@ -68,6 +67,14 @@ kubectl get gateway inference-gateway ...@@ -68,6 +67,14 @@ kubectl get gateway inference-gateway
### 3. Deploy Your Model ### ### 3. Deploy Your Model ###
Follow the steps in [model deployment](../../examples/backends/vllm/deploy/README.md) to deploy `Qwen/Qwen3-0.6B` model in aggregate mode using [agg.yaml](../../examples/backends/vllm/deploy/agg.yaml) in `my-model` kubernetes namespace. Follow the steps in [model deployment](../../examples/backends/vllm/deploy/README.md) to deploy `Qwen/Qwen3-0.6B` model in aggregate mode using [agg.yaml](../../examples/backends/vllm/deploy/agg.yaml) in `my-model` kubernetes namespace.
Make sure to enable KV routing by setting the `DYN_ROUTER_MODE` env var to `kv` on the Frontend:
```yaml
mainContainer:
image: ...
env:
- name: DYN_ROUTER_MODE
value: "kv"
```
Sample commands to deploy model: Sample commands to deploy model:
...@@ -228,7 +235,7 @@ a. Use minikube tunnel to expose the gateway to the host ...@@ -228,7 +235,7 @@ a. Use minikube tunnel to expose the gateway to the host
```bash ```bash
# in first terminal # in first terminal
ps aux | grep "minikube tunnel" | grep -v grep # make sure minikube tunnel is not already running. ps aux | grep "minikube tunnel" | grep -v grep # make sure minikube tunnel is not already running.
minikube tunnel & # start the tunnel minikube tunnel # start the tunnel
# in second terminal where you want to send inference requests # in second terminal where you want to send inference requests
GATEWAY_URL=$(kubectl get svc inference-gateway -n my-model -o jsonpath='{.spec.clusterIP}') GATEWAY_URL=$(kubectl get svc inference-gateway -n my-model -o jsonpath='{.spec.clusterIP}')
...@@ -359,18 +366,14 @@ kubectl delete -f https://github.com/kubernetes-sigs/gateway-api/releases/downlo ...@@ -359,18 +366,14 @@ kubectl delete -f https://github.com/kubernetes-sigs/gateway-api/releases/downlo
This section documents the updated plugin implementation for Gateway API Inference Extension **v1.2.1**. This section documents the updated plugin implementation for Gateway API Inference Extension **v1.2.1**.
### v1.2.1 API Changes ### Router bookkeeping operations
### Building for v1.2.1
The plugin code for v1.2.1 is in: EPP performs Dynamo router bookkeeping operations so the FrontEnd's Router does not have to sync its state.
- `pkg/plugins/dynamo_kv_scorer/plugin.go`
### Header-Only Routing for v1.2.1 ### Header Routing Hints
In v1.2.1, the EPP uses a **header-only approach** for communicating routing decisions. Since v1.2.1, the EPP uses a **header-only approach** for communicating routing decisions.
The plugins set HTTP headers that are forwarded to the backend workers. The plugins set HTTP headers that are forwarded to the backend workers.
#### Headers Set by Dynamo Plugins #### Headers Set by Dynamo Plugins
......
...@@ -46,7 +46,7 @@ import ( ...@@ -46,7 +46,7 @@ import (
func main() { func main() {
// Register Dynamo custom plugins: // Register Dynamo custom plugins:
// - kv-aware-scorer: Implements Scorer, PreRequest, and ResponseComplete interfaces // - kv-aware-scorer: Implements Scorer, PreRequest, ResponseStreaming, and ResponseComplete interfaces
// - Score: Calls Dynamo router to select workers based on KV cache, sets routing headers // - Score: Calls Dynamo router to select workers based on KV cache, sets routing headers
// - PreRequest: Registers request with router bookkeeping after scheduling is finalized // - PreRequest: Registers request with router bookkeeping after scheduling is finalized
// - ResponseComplete: Cleans up router bookkeeping when response completes // - ResponseComplete: Cleans up router bookkeeping when response completes
......
...@@ -121,6 +121,9 @@ const ( ...@@ -121,6 +121,9 @@ const (
WorkerIDHeader = "x-worker-instance-id" WorkerIDHeader = "x-worker-instance-id"
PrefillWorkerIDHeader = "x-prefill-instance-id" PrefillWorkerIDHeader = "x-prefill-instance-id"
RoutingModeHeader = "x-dynamo-routing-mode" RoutingModeHeader = "x-dynamo-routing-mode"
// EnableLocalUpdatesHeader controls router bookkeeping in the Dynamo frontend.
// Set to "false" for GAIE Stage 2 so the EPP handles bookkeeping via C FFI.
EnableLocalUpdatesHeader = "x-enable-local-updates"
// stateKey is the key used to store routing state in PluginState // stateKey is the key used to store routing state in PluginState
stateKey = "dynamo-routing-state" stateKey = "dynamo-routing-state"
...@@ -161,13 +164,15 @@ func (s *DynamoRoutingState) Clone() plugins.StateData { ...@@ -161,13 +164,15 @@ func (s *DynamoRoutingState) Clone() plugins.StateData {
} }
type KVAwareScorer struct { type KVAwareScorer struct {
typedName plugins.TypedName typedName plugins.TypedName
pluginState *plugins.PluginState pluginState *plugins.PluginState
firstTokenSeen sync.Map // map[requestID]bool - tracks which requests have received first token
} }
var _ plugins.Plugin = (*KVAwareScorer)(nil) var _ plugins.Plugin = (*KVAwareScorer)(nil)
var _ framework.Scorer = (*KVAwareScorer)(nil) var _ framework.Scorer = (*KVAwareScorer)(nil)
var _ rc.PreRequest = (*KVAwareScorer)(nil) var _ rc.PreRequest = (*KVAwareScorer)(nil)
var _ rc.ResponseStreaming = (*KVAwareScorer)(nil)
var _ rc.ResponseComplete = (*KVAwareScorer)(nil) var _ rc.ResponseComplete = (*KVAwareScorer)(nil)
func NewKVAwareScorer(ctx context.Context) *KVAwareScorer { func NewKVAwareScorer(ctx context.Context) *KVAwareScorer {
...@@ -324,7 +329,7 @@ func initFFI() error { ...@@ -324,7 +329,7 @@ func initFFI() error {
C.double(ffiOverlapScoreWeight), C.double(ffiOverlapScoreWeight),
C.double(ffiRouterTemperature), C.double(ffiRouterTemperature),
C.bool(getEnvBoolOrDefault("DYNAMO_USE_KV_EVENTS", true)), C.bool(getEnvBoolOrDefault("DYNAMO_USE_KV_EVENTS", true)),
C.bool(getEnvBoolOrDefault("DYNAMO_ROUTER_REPLICA_SYNC", true)), C.bool(getEnvBoolOrDefault("DYNAMO_ROUTER_REPLICA_SYNC", false)), // no need as long as we call the Router Book keeping operations from the EPP.
C.bool(ffiEnforceDisagg), C.bool(ffiEnforceDisagg),
&pipeline, &pipeline,
) )
...@@ -363,6 +368,10 @@ func (k *KVAwareScorer) Score( ...@@ -363,6 +368,10 @@ func (k *KVAwareScorer) Score(
} }
req.Headers[WorkerIDHeader] = workerID req.Headers[WorkerIDHeader] = workerID
// Disable local updates in the Dynamo frontend router.
// EPP handles bookkeeping via C FFI (add_request, mark_prefill_complete, free_request).
req.Headers[EnableLocalUpdatesHeader] = "false"
// Set routing mode and prefill worker ID based on disaggregated vs aggregated // Set routing mode and prefill worker ID based on disaggregated vs aggregated
if prefillWorkerID != "" && prefillWorkerID != workerID { if prefillWorkerID != "" && prefillWorkerID != workerID {
// Disaggregated mode: separate prefill and decode workers // Disaggregated mode: separate prefill and decode workers
...@@ -438,6 +447,33 @@ func (k *KVAwareScorer) PreRequest( ...@@ -438,6 +447,33 @@ func (k *KVAwareScorer) PreRequest(
) )
} }
// ResponseStreaming is invoked for every chunk of a streaming response.
//
// The first chunk observed for a given request ID is treated as the first
// token: at that point the plugin calls CallMarkPrefillComplete so the Dynamo
// router's bookkeeping records prefill as finished. Later chunks for the same
// request ID are ignored. Requests that are nil or carry an empty RequestId
// are skipped entirely.
func (k *KVAwareScorer) ResponseStreaming(
	ctx context.Context,
	request *schedtypes.LLMRequest,
	response *rc.Response,
	targetPod *backend.Pod,
) {
	if request == nil {
		return
	}
	requestID := request.RequestId
	if requestID == "" {
		return
	}

	// LoadOrStore reports loaded == true when an entry already existed, i.e.
	// an earlier chunk of this request has been handled; nothing more to do.
	if _, seenBefore := k.firstTokenSeen.LoadOrStore(requestID, true); seenBefore {
		return
	}

	// First chunk for this request: tell the router that prefill is done.
	logger := log.FromContext(ctx)
	err := CallMarkPrefillComplete(requestID)
	if err != nil {
		logger.V(logutil.DEFAULT).Error(err, "ResponseStreaming: failed to mark prefill complete",
			"requestID", requestID)
		return
	}
	logger.V(logutil.VERBOSE).Info("ResponseStreaming: marked prefill complete (first token received)",
		"requestID", requestID)
}
// ResponseComplete is called after the complete response is sent to the client. // ResponseComplete is called after the complete response is sent to the client.
// It cleans up the router bookkeeping state for the completed request by calling // It cleans up the router bookkeeping state for the completed request by calling
// dynamo_router_free_request to release resources associated with the request. // dynamo_router_free_request to release resources associated with the request.
...@@ -460,6 +496,9 @@ func (k *KVAwareScorer) ResponseComplete( ...@@ -460,6 +496,9 @@ func (k *KVAwareScorer) ResponseComplete(
return return
} }
// Clean up the first token tracking map
k.firstTokenSeen.Delete(requestID)
// Call the dynamo router to free the request bookkeeping // Call the dynamo router to free the request bookkeeping
if err := callFreeRequestInternal(requestID); err != nil { if err := callFreeRequestInternal(requestID); err != nil {
logger.V(logutil.DEFAULT).Error(err, "ResponseComplete: failed to free request", logger.V(logutil.DEFAULT).Error(err, "ResponseComplete: failed to free request",
......
...@@ -14,12 +14,6 @@ ...@@ -14,12 +14,6 @@
# limitations under the License. # limitations under the License.
# Dynamo EPP Configuration # Dynamo EPP Configuration
#
# The KV scorer sets routing headers that the Lua filter at the gateway
# reads to inject nvext into the request body:
# - x-worker-instance-id: Selected worker ID
# - x-prefiller-host-port: Prefill worker (disaggregated mode)
# - x-dynamo-routing-mode: "aggregated" or "disaggregated"
apiVersion: inference.networking.x-k8s.io/v1alpha1 apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig kind: EndpointPickerConfig
......
...@@ -17,10 +17,11 @@ apiVersion: rbac.authorization.k8s.io/v1 ...@@ -17,10 +17,11 @@ apiVersion: rbac.authorization.k8s.io/v1
metadata: metadata:
name: pod-read name: pod-read
rules: rules:
# Gateway API inference resources # Gateway API inference resources (experimental API)
- apiGroups: ["inference.networking.x-k8s.io"] - apiGroups: ["inference.networking.x-k8s.io"]
resources: ["inferencepools", "inferenceobjectives", "inferencemodelrewrites"] resources: ["inferencepools", "inferenceobjectives", "inferencemodelrewrites"]
verbs: ["get", "watch", "list"] verbs: ["get", "watch", "list"]
# Gateway API inference resources (stable API)
- apiGroups: ["inference.networking.k8s.io"] - apiGroups: ["inference.networking.k8s.io"]
resources: ["inferencepools"] resources: ["inferencepools"]
verbs: ["get", "watch", "list"] verbs: ["get", "watch", "list"]
......
...@@ -67,7 +67,7 @@ spec: ...@@ -67,7 +67,7 @@ spec:
- -pool-namespace - -pool-namespace
- "{{ .Release.Namespace }}" - "{{ .Release.Namespace }}"
- -pool-group - -pool-group
- "inference.networking.x-k8s.io" - "inference.networking.k8s.io"
- -v - -v
- "4" - "4"
- --zap-encoder - --zap-encoder
...@@ -118,8 +118,6 @@ spec: ...@@ -118,8 +118,6 @@ spec:
value: "{{ $ns }}" value: "{{ $ns }}"
- name: DYNAMO_KV_BLOCK_SIZE - name: DYNAMO_KV_BLOCK_SIZE
value: "{{ $kv }}" value: "{{ $kv }}"
- name: DYNAMO_ROUTER_REPLICA_SYNC
value: "true"
- name: USE_STREAMING - name: USE_STREAMING
value: "true" value: "true"
# HuggingFace token for downloading model config files # HuggingFace token for downloading model config files
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# InferencePool for kGateway (stable API - inference.networking.k8s.io/v1) # InferencePool (stable API - inference.networking.k8s.io/v1)
# This is used by kGateway to resolve HTTPRoute backends # Used by both kGateway (to resolve HTTPRoute backends) and EPP (to discover backend pods)
apiVersion: inference.networking.k8s.io/v1 apiVersion: inference.networking.k8s.io/v1
kind: InferencePool kind: InferencePool
metadata: metadata:
...@@ -32,22 +32,4 @@ spec: ...@@ -32,22 +32,4 @@ spec:
kind: Service kind: Service
name: {{ .Values.model.shortName }}-epp name: {{ .Values.model.shortName }}-epp
port: port:
number: 9002 number: 9002
--- \ No newline at end of file
# InferencePool for EPP (experimental API - inference.networking.x-k8s.io/v1alpha2)
# This is used by the EPP to discover and route to backend pods
apiVersion: inference.networking.x-k8s.io/v1alpha2
kind: InferencePool
metadata:
name: {{ .Values.model.shortName }}-pool
namespace: {{ .Release.Namespace }}
spec:
targetPortNumber: {{ .Values.inferencePool.port }}
selector:
nvidia.com/dynamo-component: Frontend
nvidia.com/dynamo-namespace: {{ include "dynamo-gaie.dynamoNamespace" . }}
extensionRef:
failureMode: FailOpen
group: ""
kind: Service
name: {{ .Values.model.shortName }}-epp
\ No newline at end of file
...@@ -195,7 +195,9 @@ helm install dynamo-platform ./platform/ \ ...@@ -195,7 +195,9 @@ helm install dynamo-platform ./platform/ \
--namespace ${NAMESPACE} \ --namespace ${NAMESPACE} \
--create-namespace \ --create-namespace \
--set "dynamo-operator.controllerManager.manager.image.repository=${DOCKER_SERVER}/dynamo-operator" \ --set "dynamo-operator.controllerManager.manager.image.repository=${DOCKER_SERVER}/dynamo-operator" \
--set "dynamo-operator.controllerManager.manager.image.tag=${IMAGE_TAG}" --set "dynamo-operator.controllerManager.manager.image.tag=${IMAGE_TAG}" \
--set etcd.enabled=false \
--set dynamo-operator.imagePullSecrets[0].name=docker-imagepullsecret
``` ```
For detailed installation options, see the [Installation Guide](./installation_guide.md) For detailed installation options, see the [Installation Guide](./installation_guide.md)
......
...@@ -952,7 +952,7 @@ pub unsafe extern "C" fn dynamo_router_add_request( ...@@ -952,7 +952,7 @@ pub unsafe extern "C" fn dynamo_router_add_request(
} }
/// Mark prefill as completed for a request. /// Mark prefill as completed for a request.
/// Call this from GAIE hook when the first token is generated. /// Call this from the EPP extension point when the first token is generated.
/// ///
/// # Safety /// # Safety
/// - `pipeline` must be a valid, non-null pointer from `dynamo_create_worker_selection_pipeline` /// - `pipeline` must be a valid, non-null pointer from `dynamo_create_worker_selection_pipeline`
...@@ -1132,76 +1132,14 @@ pub fn add_query_instance_id( ...@@ -1132,76 +1132,14 @@ pub fn add_query_instance_id(
set_kv_annotation(request, "query_instance_id".to_string(), "") set_kv_annotation(request, "query_instance_id".to_string(), "")
} }
/// Set worker IDs directly on the NvExt fields for GAIE Stage 2 // Note: set_worker_ids_for_stage2 and set_token_data_for_stage2 have been removed.
/// // The EPP now handles routing configuration via HTTP headers:
/// For disaggregated mode: sets `prefill_worker_id` and `decode_worker_id` // - `x-worker-instance-id`: decode worker ID
/// For aggregated mode: sets `backend_instance_id` (when both IDs are the same) // - `x-prefill-instance-id`: prefill worker ID (disaggregated mode only)
/// // - `x-enable-local-updates`: set to "false" to disable router bookkeeping
/// Also sets `enable_local_updates: false` since the external caller (EPP/GAIE) //
/// will handle bookkeeping via C FFI functions. // Body modifications are NOT sent to the inference engine (only headers are forwarded),
pub fn set_worker_ids_for_stage2( // so these functions were ineffective.
request: &mut NvCreateChatCompletionRequest,
decode_worker_id: Option<i64>,
prefill_worker_id: Option<i64>,
) -> &mut NvCreateChatCompletionRequest {
let nvext = request.nvext.get_or_insert_with(|| {
NvExt::builder()
.build()
.expect("NvExt builder should not fail")
});
// Disable local updates - external caller handles bookkeeping via C FFI
nvext.enable_local_updates = Some(false);
// Check if this is aggregated mode (same worker for both)
let is_aggregated = prefill_worker_id == decode_worker_id;
if is_aggregated {
// Aggregated: use backend_instance_id for direct routing
if let Some(id) = decode_worker_id {
nvext.backend_instance_id = Some(id as u64);
tracing::debug!(
backend_instance_id = id,
"GAIE Stage 2 Aggregated: Setting backend_instance_id"
);
}
} else {
// Disaggregated: use separate prefill and decode worker IDs
if let Some(id) = prefill_worker_id {
nvext.prefill_worker_id = Some(id as u64);
}
if let Some(id) = decode_worker_id {
nvext.decode_worker_id = Some(id as u64);
}
tracing::debug!(
prefill_worker_id = ?prefill_worker_id,
decode_worker_id = ?decode_worker_id,
"GAIE Stage 2 Disaggregated: Setting prefill and decode worker IDs"
);
}
request
}
/// Set token_data directly on the NvExt field for GAIE Stage 2
pub fn set_token_data_for_stage2<'a>(
request: &'a mut NvCreateChatCompletionRequest,
tokens: &[u32],
) -> &'a mut NvCreateChatCompletionRequest {
let nvext = request.nvext.get_or_insert_with(|| {
NvExt::builder()
.build()
.expect("NvExt builder should not fail")
});
nvext.token_data = Some(tokens.to_vec());
tracing::debug!(
token_count = tokens.len(),
"GAIE Stage 2: Setting token_data"
);
request
}
/// Ensure `nvext` exists and return a mutable slice of annotations. /// Ensure `nvext` exists and return a mutable slice of annotations.
fn ensure_annotations(request: &mut NvCreateChatCompletionRequest) -> &mut Vec<String> { fn ensure_annotations(request: &mut NvCreateChatCompletionRequest) -> &mut Vec<String> {
...@@ -1227,30 +1165,34 @@ fn set_kv_annotation( ...@@ -1227,30 +1165,34 @@ fn set_kv_annotation(
request request
} }
/// Wrapper function that queries worker selection and prepares the request for GAIE Stage 2 /// Wrapper function that queries worker selection for GAIE Stage 1
/// ///
/// This function performs the complete GAIE Stage 1 flow: /// This function performs the complete GAIE Stage 1 flow:
/// 1. Clones the original request and adds "query_instance_id:" (empty) annotation /// 1. Clones the original request and adds "query_instance_id:" (empty) annotation
/// 2. Calls engine.generate() with the modified request /// 2. Calls engine.generate() with the modified request
/// 3. Extracts worker_id info and tokens from the response stream /// 3. Extracts worker_id info and tokens from the response stream
/// 4. Sets the appropriate NvExt fields on the original request for Stage 2: /// 4. Returns WorkerSelectionResult and the original request
/// - Disaggregated: prefill_worker_id, decode_worker_id, token_data ///
/// - Aggregated: backend_instance_id, token_data /// Note: The EPP (caller) is responsible for setting HTTP headers for Stage 2:
/// 5. Returns WorkerSelectionResult and the modified request ready for Stage 2 /// - `x-worker-instance-id`: decode worker ID
/// - `x-prefill-instance-id`: prefill worker ID (disaggregated mode only)
/// - `x-enable-local-updates`: "false" to disable router bookkeeping
///
/// Body modifications are NOT forwarded to the inference engine, so this function
/// does not modify the request body.
/// ///
/// # Parameters /// # Parameters
/// - `engine`: The worker selection pipeline engine /// - `engine`: The worker selection pipeline engine
/// - `original_request`: The original OpenAI request to process /// - `original_request`: The original OpenAI request to process
/// ///
/// # Returns /// # Returns
/// A tuple containing (WorkerSelectionResult, modified_original_request) /// A tuple containing (WorkerSelectionResult, original_request)
/// where the modified_original_request is ready for GAIE Stage 2 execution
pub async fn query_worker_selection_and_annotate( pub async fn query_worker_selection_and_annotate(
engine: &ServiceEngine< engine: &ServiceEngine<
SingleIn<NvCreateChatCompletionRequest>, SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
>, >,
mut original_request: NvCreateChatCompletionRequest, original_request: NvCreateChatCompletionRequest,
) -> anyhow::Result<(WorkerSelectionResult, NvCreateChatCompletionRequest)> { ) -> anyhow::Result<(WorkerSelectionResult, NvCreateChatCompletionRequest)> {
// GAIE Stage 1: Query for worker selection // GAIE Stage 1: Query for worker selection
let mut query_request = original_request.clone(); let mut query_request = original_request.clone();
...@@ -1259,14 +1201,9 @@ pub async fn query_worker_selection_and_annotate( ...@@ -1259,14 +1201,9 @@ pub async fn query_worker_selection_and_annotate(
let response_stream = engine.generate(single_in).await?; let response_stream = engine.generate(single_in).await?;
let result = extract_worker_selection_from_stream(response_stream).await?; let result = extract_worker_selection_from_stream(response_stream).await?;
// Prepare request for GAIE Stage 2: Set NvExt fields directly // Return the original request unchanged.
set_worker_ids_for_stage2( // The EPP sets routing headers (worker IDs, enable_local_updates) which the
&mut original_request, // Dynamo frontend reads via apply_header_routing_overrides().
result.decode_worker_id,
result.prefill_worker_id,
);
set_token_data_for_stage2(&mut original_request, &result.tokens);
Ok((result, original_request)) Ok((result, original_request))
} }
......
...@@ -11,12 +11,16 @@ pub use crate::protocols::common::timing::TimingInfo; ...@@ -11,12 +11,16 @@ pub use crate::protocols::common::timing::TimingInfo;
pub const HEADER_WORKER_INSTANCE_ID: &str = "x-worker-instance-id"; pub const HEADER_WORKER_INSTANCE_ID: &str = "x-worker-instance-id";
pub const HEADER_PREFILL_INSTANCE_ID: &str = "x-prefill-instance-id"; pub const HEADER_PREFILL_INSTANCE_ID: &str = "x-prefill-instance-id";
/// Header to disable local bookkeeping updates (for GAIE Stage 2)
/// When set to "false", the router skips add_request, mark_prefill_completed, and free calls.
pub const HEADER_ENABLE_LOCAL_UPDATES: &str = "x-enable-local-updates";
/// Apply routing overrides from HTTP headers to nvext. /// Apply routing overrides from HTTP headers to nvext.
/// ///
/// Header mappings: /// Header mappings:
/// - `x-worker-instance-id` -> `backend_instance_id` and `decode_worker_id` /// - `x-worker-instance-id` -> `backend_instance_id` and `decode_worker_id`
/// - `x-prefill-instance-id` -> `prefill_worker_id` /// - `x-prefill-instance-id` -> `prefill_worker_id`
/// - `x-enable-local-updates` -> `enable_local_updates` (set to false to disable router bookkeeping)
/// ///
/// Headers take priority over existing nvext values when present. /// Headers take priority over existing nvext values when present.
/// If no headers are present, returns the original nvext unchanged. /// If no headers are present, returns the original nvext unchanged.
...@@ -31,7 +35,17 @@ pub fn apply_header_routing_overrides(nvext: Option<NvExt>, headers: &HeaderMap) ...@@ -31,7 +35,17 @@ pub fn apply_header_routing_overrides(nvext: Option<NvExt>, headers: &HeaderMap)
.and_then(|v| v.to_str().ok()) .and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok()); .and_then(|s| s.parse::<u64>().ok());
if worker_id.is_none() && prefill_id.is_none() { // Parse enable_local_updates header (case-insensitive): "true"/"1" or "false"/"0"
let enable_local_updates = headers
.get(HEADER_ENABLE_LOCAL_UPDATES)
.and_then(|v| v.to_str().ok())
.and_then(|s| match s.to_lowercase().as_str() {
"true" | "1" => Some(true),
"false" | "0" => Some(false),
_ => None,
});
if worker_id.is_none() && prefill_id.is_none() && enable_local_updates.is_none() {
return nvext; return nvext;
} }
...@@ -43,6 +57,9 @@ pub fn apply_header_routing_overrides(nvext: Option<NvExt>, headers: &HeaderMap) ...@@ -43,6 +57,9 @@ pub fn apply_header_routing_overrides(nvext: Option<NvExt>, headers: &HeaderMap)
if let Some(id) = prefill_id { if let Some(id) = prefill_id {
ext.prefill_worker_id = Some(id); ext.prefill_worker_id = Some(id);
} }
if let Some(enabled) = enable_local_updates {
ext.enable_local_updates = Some(enabled);
}
Some(ext) Some(ext)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment