Commit 6d62fc74 authored by atchernych, committed by GitHub

feat: Deployment for EPP as a static library (#3314)


Signed-off-by: Anna Tchernych <atchernych@nvidia.com>
parent af7a41c3
@@ -5,7 +5,7 @@ This guide demonstrates two setups.
- The basic setup treats each Dynamo deployment as a black box and routes traffic randomly among the deployments.
- The EPP-aware setup uses a custom Dynamo plugin `dyn-kv` to pick the best worker.
-EPP’s default approach is token-aware only `by approximation` because it relies on the non-tokenized text in the prompt. But the Dynamo plugin uses a token-aware KV algorithm. It employs the dynamo router which implements kv routing by running your model’s tokenizer inline. The EPP plugin configuration lives in [`helm/dynamo-gaie/epp-config-dynamo.yaml`](helm/dynamo-gaie/epp-config-dynamo.yaml) per EPP [convention](https://gateway-api-inference-extension.sigs.k8s.io/guides/epp-configuration/config-text/).
+EPP’s default kv-routing approach is token-aware only `by approximation` because the prompt is tokenized with a generic tokenizer unaware of the model deployed. But the Dynamo plugin uses a token-aware KV algorithm. It employs the dynamo router which implements kv routing by running your model’s tokenizer inline. The EPP plugin configuration lives in [`helm/dynamo-gaie/epp-config-dynamo.yaml`](helm/dynamo-gaie/epp-config-dynamo.yaml) per EPP [convention](https://gateway-api-inference-extension.sigs.k8s.io/guides/epp-configuration/config-text/).
Currently, these setups are only supported with the kGateway based Inference Gateway.
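To make the idea of token-aware KV routing more concrete, here is a minimal Go sketch of block-level prefix matching. It is an illustration only and not the Dynamo router's implementation: the names `blockHashes`, `overlapBlocks`, and `toSet`, the choice of FNV hashing, and the block size of 4 are all assumptions made for this example.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// blockHashes splits tokens into full blocks of blockSize and returns one
// chained hash per block. Each hash also covers the previous block's hash,
// so two blocks match only if every token before them matches too.
// (Hypothetical helper; the real router may hash differently.)
func blockHashes(tokens []uint32, blockSize int) []uint64 {
	if blockSize <= 0 {
		return nil
	}
	var hashes []uint64
	var parent uint64
	buf := make([]byte, 8)
	for start := 0; start+blockSize <= len(tokens); start += blockSize {
		h := fnv.New64a()
		binary.LittleEndian.PutUint64(buf, parent)
		h.Write(buf)
		for _, t := range tokens[start : start+blockSize] {
			binary.LittleEndian.PutUint32(buf[:4], t)
			h.Write(buf[:4])
		}
		parent = h.Sum64()
		hashes = append(hashes, parent)
	}
	return hashes
}

// overlapBlocks counts how many leading request blocks a worker already caches.
func overlapBlocks(request []uint64, cached map[uint64]struct{}) int {
	n := 0
	for _, h := range request {
		if _, ok := cached[h]; !ok {
			break
		}
		n++
	}
	return n
}

// toSet turns a slice of block hashes into a lookup set.
func toSet(hashes []uint64) map[uint64]struct{} {
	set := make(map[uint64]struct{}, len(hashes))
	for _, h := range hashes {
		set[h] = struct{}{}
	}
	return set
}

func main() {
	const blockSize = 4 // assumed value for the example only
	request := blockHashes([]uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, blockSize)
	workerA := toSet(blockHashes([]uint32{0, 1, 2, 3, 4, 5, 6, 7}, blockSize))
	workerB := toSet(blockHashes([]uint32{9, 9, 9, 9, 4, 5, 6, 7}, blockSize))
	fmt.Println("worker A overlap blocks:", overlapBlocks(request, workerA))
	fmt.Println("worker B overlap blocks:", overlapBlocks(request, workerB))
}
```

Because the match is computed on token IDs, it only reflects real cache reuse when the tokenizer and block size agree with those of the deployed model, which is why the guide stresses both.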
@@ -87,6 +87,28 @@ kubectl apply -f agg.yaml -n my-model
```
Take a note of or change the DYNAMO_IMAGE in the model deployment file.
Do not forget to create the Docker registry secret if needed.
```bash
kubectl create secret docker-registry docker-imagepullsecret \
  --docker-server=$DOCKER_SERVER \
  --docker-username=$DOCKER_USERNAME \
  --docker-password=$DOCKER_PASSWORD \
  --namespace=$NAMESPACE
```
Do not forget to include the HuggingFace token if required.
```bash
export HF_TOKEN=your_hf_token
kubectl create secret generic hf-token-secret \
  --from-literal=HF_TOKEN=${HF_TOKEN} \
  -n ${NAMESPACE}
```
Create a model configuration file similar to `vllm_agg_qwen.yaml` for your model.
This file demonstrates the values needed for the vLLM Agg setup in [agg.yaml](../../components/backends/vllm/deploy/agg.yaml).
Take note of the model's block size provided in the model card.
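The custom EPP plugin added in this change reads the block size from `DYNAMO_KV_BLOCK_SIZE` and refuses to start when the value is empty, not an integer, or outside [16, 8192]. The Go sketch below mirrors those checks as a standalone function so you can sanity-check a value before deploying. `validateKVBlockSize` is a hypothetical name, and rejecting values that are not a power of two is an assumption about how the plugin treats that case.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// validateKVBlockSize applies the same kind of checks the plugin performs on
// DYNAMO_KV_BLOCK_SIZE. It returns an error instead of panicking.
// (Hypothetical helper written for this guide.)
func validateKVBlockSize(raw string) (uint32, error) {
	if raw == "" {
		return 0, errors.New("DYNAMO_KV_BLOCK_SIZE is required")
	}
	n, err := strconv.ParseUint(raw, 10, 32)
	if err != nil {
		return 0, fmt.Errorf("DYNAMO_KV_BLOCK_SIZE=%q is not a valid unsigned integer: %w", raw, err)
	}
	if n < 16 || n > 8192 {
		return 0, fmt.Errorf("DYNAMO_KV_BLOCK_SIZE=%d outside [16,8192]", n)
	}
	// Assumption: a value that is not a power of two is rejected.
	if n&(n-1) != 0 {
		return 0, fmt.Errorf("DYNAMO_KV_BLOCK_SIZE=%d is not a power of two", n)
	}
	return uint32(n), nil
}

func main() {
	for _, raw := range []string{"16", "48", "", "abc"} {
		size, err := validateKVBlockSize(raw)
		fmt.Printf("input=%q size=%d err=%v\n", raw, size, err)
	}
}
```

Parsing with `strconv.ParseUint` rejects negative numbers up front, so a bad value cannot wrap around when it is converted to `uint32`.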
### 4. Install Dynamo GAIE helm chart ###
The Inference Gateway is configured through the `inference-gateway-resources.yaml` file.
@@ -95,7 +117,7 @@ Deploy the Inference Gateway resources to your Kubernetes cluster by running one
#### Basic Black Box Integration ####
-For the basic black box integration run:
+The basic black box integration uses a standard EPP image `us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v0.4.0`. For the basic black box integration run:
```bash
cd deploy/inference-gateway
@@ -104,9 +126,13 @@ helm install dynamo-gaie ./helm/dynamo-gaie -n my-model -f ./vllm_agg_qwen.yaml
#### EPP-aware Integration with the custom Dynamo Plugin ####
Dynamo provides a custom routing plugin `pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go` to perform efficient kv routing.
The Dynamo router is built as a static library that the EPP router calls to provide fast inference.
You can either use the image `nvcr.io/nvstaging/ai-dynamo/epp-inference-extension-dynamo:v0.6.0-1` for the EPP_IMAGE in the Helm deployment command and proceed to step 2, or build the image yourself following the steps below.
##### 1. Build the custom EPP image #####
We provide git patches for you to use. If you choose to build your own image, follow the steps below; otherwise, proceed to step 2 to deploy with Helm.
##### 1.1 Clone the official GAIE repo in a separate folder #####
@@ -116,44 +142,74 @@ cd gateway-api-inference-extension
git checkout v0.5.1
```
-##### 1.2 Apply patch(es) #####
+##### 1.2 Build the Dynamo Custom EPP #####
###### 1.2.1 Clone the official EPP repo ######
```bash
# Clone the official GAIE repo in a separate folder
git clone git@github.com:kubernetes-sigs/gateway-api-inference-extension.git
# Enter the cloned repo before checking out the release tag
cd gateway-api-inference-extension
git checkout v0.5.1
```
###### 1.2.2 Run the script to build the EPP image ######
The script applies a custom patch to the code in your GAIE repo and builds the image for you to use.
```bash ```bash
git apply <dynamo-folder>/deploy/inference-gateway/epp-patches/v0.5.1-1/epp-v0.5.1-dyn1.patch # Use your custom paths
export DYNAMO_DIR=/path/to/dynamo
export EPP_DIR=/path/to/gateway-api-inference-extension
# Run the script
cd deploy/inference-gateway
./build-epp-dynamo.sh
``` ```
-##### 1.3 Build the custom EPP image #####
+Under the hood, the script builds the Dynamo Router static library, copies it and its C header into the EPP code base, applies the Dynamo patch, and builds a custom EPP image with it.
Re-tag the freshly built image and push it to your registry.
```bash ```bash
# Build the image <your-docker-registry/dynamo-custom-epp:<your-tag> and then manually push docker images
make image-local-load \ docker tag <your-new-id> <your-image-tag>
IMAGE_REGISTRY=<your-docker-registry> \ docker push <your-image-tag>
IMAGE_NAME=dynamo-custom-epp \
EXTRA_TAG=<your-tag>
# Or run the command below to build push to your registry
make image-local-push \
IMAGE_REGISTRY=<your-docker-registry> \
IMAGE_NAME=dynamo-custom-epp \
EXTRA_TAG=<your-tag>
``` ```
-##### 2. Install through helm #####
+##### 2. Deploy through helm #####
```bash
cd deploy/inference-gateway
# Export the Dynamo image you have used when deploying your model in Step 3.
export DYNAMO_IMAGE=<the-dynamo-image-you-have-used-when-deploying-the-model>
-export EPP_IMAGE=<the-epp-image-you-built> # i.e. docker.io/lambda108/epp-inference-extension-dynamo:v0.5.1-1
+# Export the image tag you have used when building the EPP i.e. docker.io/lambda108/epp-inference-extension-dynamo:v0.5.1-2
export EPP_IMAGE=<the-epp-image-you-built>
```
**Configuration**
You can configure the plugin by setting environment variables in your [values-epp-aware.yaml](./values-epp-aware.yaml).
- Overwrite the `DYNAMO_NAMESPACE` env var if needed to match your model's dynamo namespace.
- Set `DYNAMO_BUSY_THRESHOLD` to configure the upper bound on how “full” a worker can be (often derived from kv_active_blocks or other load metrics) before the router skips it. If the selected worker exceeds this value, routing falls back to the next best candidate. By default the value is negative, meaning this check is not enabled.
- Set `DYNAMO_ROUTER_REPLICA_SYNC=true` to enable a background watcher to keep multiple router instances in sync (important if you run more than one KV router per component).
- By default the Dynamo plugin uses KV routing. Set `DYNAMO_USE_KV_ROUTING=false` in your [values-epp-aware.yaml](./values-epp-aware.yaml) if you prefer round-robin routing.
- If using kv-routing:
  - Overwrite the `DYNAMO_KV_BLOCK_SIZE` in your [values-epp-aware.yaml](./values-epp-aware.yaml) to match your model's block size. The `DYNAMO_KV_BLOCK_SIZE` env var is ***MANDATORY*** to prevent silent KV routing failures.
  - Set `DYNAMO_OVERLAP_SCORE_WEIGHT` to weigh how heavily the score uses token overlap (predicted KV cache hits) versus other factors (load, historical hit rate). Higher weight biases toward reusing workers with similar cached prefixes.
  - Set `DYNAMO_ROUTER_TEMPERATURE` to soften or sharpen the selection curve when combining scores. Low temperature makes the router pick the top candidate deterministically; higher temperature lets lower-scoring workers through more often (exploration).
  - Set `DYNAMO_USE_KV_EVENTS=false` if you want to disable KV event tracking while using kv-routing.
  - See the [KV cache routing design](../../docs/architecture/kv_cache_routing.md) for details.
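The effect of `DYNAMO_ROUTER_TEMPERATURE` described above can be pictured with a temperature-scaled softmax over worker scores. The Go sketch below is a generic illustration and makes no claim about the router's actual formula: `selectionProbs` is a hypothetical function, and treating a non-positive temperature as "always pick the best score" is an assumption of this example.

```go
package main

import (
	"fmt"
	"math"
)

// selectionProbs converts worker scores into selection probabilities.
// A temperature <= 0 puts all probability on the best score (greedy);
// larger temperatures flatten the distribution toward uniform.
// (Hypothetical helper; not taken from the Dynamo router.)
func selectionProbs(scores []float64, temperature float64) []float64 {
	probs := make([]float64, len(scores))
	if len(scores) == 0 {
		return probs
	}
	best := 0
	for i, s := range scores {
		if s > scores[best] {
			best = i
		}
	}
	if temperature <= 0 {
		probs[best] = 1
		return probs
	}
	var sum float64
	for i, s := range scores {
		// Subtract the best score first for numerical stability.
		probs[i] = math.Exp((s - scores[best]) / temperature)
		sum += probs[i]
	}
	for i := range probs {
		probs[i] /= sum
	}
	return probs
}

func main() {
	scores := []float64{1.0, 2.0, 4.0} // made-up scores for three workers
	for _, t := range []float64{0, 0.5, 5} {
		fmt.Printf("temperature=%.1f probs=%.3f\n", t, selectionProbs(scores, t))
	}
}
```

In this picture a low temperature concentrates traffic on the worker with the best cache overlap, while a higher one spreads load at the cost of fewer cache hits.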
```bash
helm upgrade --install dynamo-gaie ./helm/dynamo-gaie \
  -n my-model \
  -f ./vllm_agg_qwen.yaml \
  -f ./values-epp-aware.yaml \
  --set eppAware.enabled=true \
-  --set-string eppAware.eppImage=$EPP_IMAGE \
-  --set-string eppAware.sidecar.image=$DYNAMO_IMAGE
+  --set-string eppAware.eppImage=$EPP_IMAGE
```
@@ -162,6 +218,7 @@ Key configurations include:
- A service for the inference gateway
- Required RBAC roles and bindings
- RBAC permissions
- `values-epp-aware.yaml` sets `eppAware.dynamoNamespace=vllm-agg` for the bundled example. Point it at your actual Dynamo namespace by editing that file or adding `--set eppAware.dynamoNamespace=<namespace>` (and likewise for `dynamoComponent` and `dynamoKvBlockSize` if they differ).
### 5. Verify Installation ###
......
#!/usr/bin/env bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e # Exit on any error
# Configuration - Set these environment variables before running
if [[ -z "${DYNAMO_DIR}" ]]; then
    echo "DYNAMO_DIR environment variable must be set"
    echo " Example: export DYNAMO_DIR=/path/to/dynamo"
    exit 1
fi
if [[ -z "${EPP_DIR}" ]]; then
    echo "EPP_DIR environment variable must be set"
    echo " Example: export EPP_DIR=/path/to/gateway-api-inference-extension-dynamo"
    exit 1
fi
DYNAMO_LIB_DIR="${EPP_DIR}/pkg/epp/scheduling/plugins/dynamo_kv_scorer/lib"
DYNAMO_INCLUDE_DIR="${EPP_DIR}/pkg/epp/scheduling/plugins/dynamo_kv_scorer/include"
echo "🏗️ Building Dynamo KV Router C Library..."
# Step 1: Build the static library
echo "📦 Building static library..."
cd "${DYNAMO_DIR}"
cargo build --release -p libdynamo_llm
# Step 2: Generate header file (with fallback)
echo "📝 Generating C header..."
HEADER_OUTPUT="${DYNAMO_DIR}/lib/bindings/c/include/nvidia/dynamo_llm/llm_engine.h"
if ! cbindgen --config lib/bindings/c/cbindgen.toml --crate libdynamo_llm --output "${HEADER_OUTPUT}"; then
    echo "cbindgen failed, using fallback header..."
    cp "${DYNAMO_DIR}/lib/bindings/c/src/fallback_header.h" "${HEADER_OUTPUT}"
fi
# Step 3: Ensure EPP directories exist
echo "Preparing EPP directories..."
mkdir -p "${DYNAMO_LIB_DIR}"
mkdir -p "${DYNAMO_INCLUDE_DIR}"
# Step 4: Copy files to EPP
echo "Copying files to EPP..."
cp "${HEADER_OUTPUT}" "${DYNAMO_INCLUDE_DIR}/"
cp "${DYNAMO_DIR}/target/release/libdynamo_llm_capi.a" "${DYNAMO_LIB_DIR}/"
# Verify files were copied
if [[ ! -f "${DYNAMO_INCLUDE_DIR}/llm_engine.h" ]]; then
    echo "Header file copy failed!"
    exit 1
fi
if [[ ! -f "${DYNAMO_LIB_DIR}/libdynamo_llm_capi.a" ]]; then
    echo "Library file copy failed!"
    exit 1
fi
echo "Files copied successfully:"
echo " Header: ${DYNAMO_INCLUDE_DIR}/llm_engine.h"
echo " Library: ${DYNAMO_LIB_DIR}/libdynamo_llm_capi.a"
# Step 5: Apply Dynamo patch (if it exists)
echo "🔧 Applying Dynamo patch..."
cd "${EPP_DIR}"
PATCH_FILE="${DYNAMO_DIR}/deploy/inference-gateway/epp-patches/v0.5.1-2/epp-v0.5.1-dyn2.patch"
if [[ -f "${PATCH_FILE}" ]]; then
    if git apply --check "${PATCH_FILE}" 2>/dev/null; then
        git apply "${PATCH_FILE}"
        echo "Patch applied successfully"
    else
        echo "Patch doesn't apply cleanly - may already be applied or need manual resolution"
    fi
else
    echo "No patch file found at ${PATCH_FILE}"
fi
# Step 6: Build the EPP image
echo "Building the EPP image..."
make dynamo-image-local-load
echo "EPP with Dynamo KV routing built"
diff --git a/Dockerfile.dynamo b/Dockerfile.dynamo
new file mode 100644
index 0000000..3f0e0a0
--- /dev/null
+++ b/Dockerfile.dynamo
@@ -0,0 +1,66 @@
+# Dockerfile.dynamo - Custom Dockerfile for Dynamo FFI plugin
+ARG BUILDER_IMAGE=golang:1.24
+ARG BASE_IMAGE=ubuntu:22.04
+
+############################
+# Builder
+############################
+FROM ${BUILDER_IMAGE} AS builder
+
+ENV CGO_ENABLED=1
+ENV GOOS=linux
+ENV GOARCH=amd64
+# be explicit; helps cgo when linking libstdc++
+ENV CC=gcc
+ENV CXX=g++
+
+# C/C++ toolchain for cgo, and libstdc++ for link-time
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ build-essential \
+ gcc g++ \
+ libc6-dev \
+ ca-certificates \
+ && rm -rf /var/lib/apt/lists/*
+
+ARG COMMIT_SHA=unknown
+ARG BUILD_REF
+
+WORKDIR /src
+
+# deps first (cache)
+COPY go.mod go.sum ./
+RUN go mod download
+
+# source
+COPY cmd/epp ./cmd/epp
+COPY pkg/epp ./pkg/epp
+COPY internal ./internal
+COPY api ./api
+
+# sanity (optional)
+RUN ls -la pkg/epp/scheduling/plugins/dynamo_kv_scorer/include/ || echo "Headers not found"
+RUN ls -la pkg/epp/scheduling/plugins/dynamo_kv_scorer/lib/ || echo "Library not found"
+
+# build
+WORKDIR /src/cmd/epp
+RUN go build \
+ -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics.BuildRef=${BUILD_REF}" \
+ -o /epp
+
+############################
+# Runtime
+############################
+FROM ${BASE_IMAGE} AS runtime
+
+# Minimal runtime deps; include libstdc++ runtime for -lstdc++
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ ca-certificates \
+ libstdc++6 \
+ && rm -rf /var/lib/apt/lists/* \
+ && groupadd -r nonroot && useradd -r -g nonroot nonroot
+
+WORKDIR /
+COPY --from=builder /epp /epp
+
+USER nonroot:nonroot
+ENTRYPOINT ["/epp"]
diff --git a/Makefile b/Makefile
index dee7e99..4679ce2 100644
--- a/Makefile
+++ b/Makefile
@@ -170,6 +170,48 @@ verify-all:
##@ Build
+##@ Dynamo EPP with FFI
+
+# Build the Dynamo EPP image with CGO static library support
+.PHONY: dynamo-image-local-build
+dynamo-image-local-build: ## Build the Dynamo EPP image using Docker Buildx for local development.
+ BUILDER=$(shell $(DOCKER_BUILDX_CMD) create --use)
+ $(MAKE) dynamo-image-build PUSH=$(PUSH)
+ $(MAKE) dynamo-image-build LOAD=$(LOAD)
+ $(DOCKER_BUILDX_CMD) rm $$BUILDER
+
+.PHONY: dynamo-image-local-push
+dynamo-image-local-push: PUSH=--push ## Build the Dynamo EPP image for local development and push it to $IMAGE_REPO.
+dynamo-image-local-push: dynamo-image-local-build
+
+.PHONY: dynamo-image-local-load
+dynamo-image-local-load: LOAD=--load ## Build the Dynamo EPP image for local development and load it in the local Docker registry.
+dynamo-image-local-load: dynamo-image-local-build
+
+.PHONY: dynamo-image-build
+dynamo-image-build: ## Build the Dynamo EPP image using Docker Buildx with CGO support.
+ $(IMAGE_BUILD_CMD) -f Dockerfile.dynamo -t $(IMAGE_TAG) \
+ --platform=$(PLATFORMS) \
+ --build-arg BASE_IMAGE=ubuntu:22.04 \
+ --build-arg BUILDER_IMAGE=$(BUILDER_IMAGE) \
+ --build-arg COMMIT_SHA=${GIT_COMMIT_SHA} \
+ --build-arg BUILD_REF=${BUILD_REF} \
+ $(PUSH) \
+ $(LOAD) \
+ $(IMAGE_BUILD_EXTRA_OPTS) ./
+
+.PHONY: dynamo-image-push
+dynamo-image-push: PUSH=--push ## Build the Dynamo EPP image and push it to $IMAGE_REPO.
+dynamo-image-push: dynamo-image-build
+
+.PHONY: dynamo-image-load
+dynamo-image-load: LOAD=--load ## Build the Dynamo EPP image and load it in the local Docker registry.
+dynamo-image-load: dynamo-image-build
+
+.PHONY: dynamo-image-kind
+dynamo-image-kind: dynamo-image-build ## Build the Dynamo EPP image and load it to kind cluster $KIND_CLUSTER ("kind" by default).
+ kind load docker-image $(IMAGE_TAG) --name $(KIND_CLUSTER)
+
# Build the container image
.PHONY: image-local-build
image-local-build: ## Build the EPP image using Docker Buildx for local development.
diff --git a/cmd/epp/main.go b/cmd/epp/main.go
index b5e0617..8592735 100644
--- a/cmd/epp/main.go
+++ b/cmd/epp/main.go
@@ -22,6 +22,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner"
+ eppplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
+
+ // Dynamo plugins
+ dynprereq "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid"
+ dynscorer "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/dynamo_kv_scorer"
)
func main() {
@@ -30,6 +35,9 @@ func main() {
// For adding out-of-tree plugins to the plugins registry, use the following:
// plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function)
+ eppplugins.Register("dynamo-inject-workerid", dynprereq.InjectWorkerIDPreRequestFactory)
+ eppplugins.Register("kv-aware-scorer", dynscorer.KVAwareScorerFactory)
+
if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
os.Exit(1)
}
diff --git a/pkg/bbr/handlers/request.go b/pkg/bbr/handlers/request.go
index 32fffc0..1aa1b85 100644
--- a/pkg/bbr/handlers/request.go
+++ b/pkg/bbr/handlers/request.go
@@ -18,8 +18,10 @@ package handlers
import (
"context"
+ "encoding/base64"
"encoding/json"
"fmt"
+ "strings"
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -31,11 +33,49 @@ import (
const modelHeader = "X-Gateway-Model-Name"
+// Dynamo-related
+const (
+ workerIDHeader = "x-worker-instance-id"
+ injectHintHeader = "x-epp-inject-nvext-worker-instance-id"
+ tokenDataHeader = "x-epp-inject-nvext-token-data"
+)
+
// HandleRequestBody handles request bodies.
func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]*eppb.ProcessingResponse, error) {
logger := log.FromContext(ctx)
var ret []*eppb.ProcessingResponse
+ // If we captured a worker id hint in the headers phase, inject it into body JSON:
+ // nvext.backend_instance_id = <workerID>
+ if wid := strings.TrimSpace(s.workerIDHint); wid != "" {
+ // ensure nvext is a map[string]any
+ if nv, ok := data["nvext"]; !ok || nv == nil {
+ data["nvext"] = map[string]any{"backend_instance_id": wid}
+ } else if m, ok := nv.(map[string]any); ok {
+ m["backend_instance_id"] = wid
+ } else {
+ // if nvext was some other type, replace with a clean map
+ data["nvext"] = map[string]any{"backend_instance_id": wid}
+ }
+ }
+
+ // If we captured token_data in headers, decode and inject as nvext.token_data
+ if td := strings.TrimSpace(s.tokenDataHint); td != "" {
+ // header value is base64(JSON array)
+ if raw, err := base64.StdEncoding.DecodeString(td); err == nil {
+ var arr []int64
+ if err := json.Unmarshal(raw, &arr); err == nil && len(arr) > 0 {
+ // ensure nvext map exists
+ nv, ok := data["nvext"].(map[string]any)
+ if !ok || nv == nil {
+ nv = map[string]any{}
+ data["nvext"] = nv
+ }
+ nv["token_data"] = arr
+ }
+ }
+ }
+
requestBodyBytes, err := json.Marshal(data)
if err != nil {
return nil, err
@@ -46,6 +86,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
metrics.RecordModelNotInBodyCounter()
logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter")
if s.streaming {
+ // still stream the possibly mutated body
ret = append(ret, &eppb.ProcessingResponse{
Response: &eppb.ProcessingResponse_RequestHeaders{
RequestHeaders: &eppb.HeadersResponse{},
@@ -53,14 +94,24 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
})
ret = addStreamedBodyResponse(ret, requestBodyBytes)
return ret, nil
- } else {
- ret = append(ret, &eppb.ProcessingResponse{
+ }
+
+ // non-streaming: return a body response with the (possibly) mutated body
+ return []*eppb.ProcessingResponse{
+ {
Response: &eppb.ProcessingResponse_RequestBody{
- RequestBody: &eppb.BodyResponse{},
+ RequestBody: &eppb.BodyResponse{
+ Response: &eppb.CommonResponse{
+ BodyMutation: &eppb.BodyMutation{
+ Mutation: &eppb.BodyMutation_Body{
+ Body: requestBodyBytes,
+ },
+ },
+ },
+ },
},
- })
- }
- return ret, nil
+ },
+ }, nil
}
modelStr, ok := modelVal.(string)
@@ -73,6 +124,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
metrics.RecordSuccessCounter()
if s.streaming {
+ // set the model header, then stream the (possibly) mutated body
ret = append(ret, &eppb.ProcessingResponse{
Response: &eppb.ProcessingResponse_RequestHeaders{
RequestHeaders: &eppb.HeadersResponse{
@@ -86,16 +138,42 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
RawValue: []byte(modelStr),
},
},
+ // also keep the worker id header if we have one
+ func() *basepb.HeaderValueOption {
+ if strings.TrimSpace(s.workerIDHint) == "" {
+ return nil
+ }
+ return &basepb.HeaderValueOption{
+ Header: &basepb.HeaderValue{
+ Key: workerIDHeader,
+ RawValue: []byte(s.workerIDHint),
+ },
+ }
+ }(),
},
},
},
},
},
})
+
+ // prune nil entries if worker id not present
+ hm := ret[len(ret)-1].GetRequestHeaders().GetResponse().GetHeaderMutation()
+ if hm != nil && hm.SetHeaders != nil {
+ out := hm.SetHeaders[:0]
+ for _, h := range hm.SetHeaders {
+ if h != nil {
+ out = append(out, h)
+ }
+ }
+ hm.SetHeaders = out
+ }
+
ret = addStreamedBodyResponse(ret, requestBodyBytes)
return ret, nil
}
+ // Non-streaming: set model header and replace the body with our mutated JSON
return []*eppb.ProcessingResponse{
{
Response: &eppb.ProcessingResponse_RequestBody{
@@ -111,6 +189,22 @@ func (s *Server) HandleRequestBody(ctx context.Context, data map[string]any) ([]
RawValue: []byte(modelStr),
},
},
+ func() *basepb.HeaderValueOption {
+ if strings.TrimSpace(s.workerIDHint) == "" {
+ return nil
+ }
+ return &basepb.HeaderValueOption{
+ Header: &basepb.HeaderValue{
+ Key: workerIDHeader,
+ RawValue: []byte(s.workerIDHint),
+ },
+ }
+ }(),
+ },
+ },
+ BodyMutation: &eppb.BodyMutation{
+ Mutation: &eppb.BodyMutation_Body{
+ Body: requestBodyBytes,
},
},
},
@@ -141,6 +235,32 @@ func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBy
// HandleRequestHeaders handles request headers.
func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) {
+ // reset per-request
+ s.workerIDHint = ""
+ s.tokenDataHint = ""
+
+ if m := headers.GetHeaders(); m != nil {
+ for _, h := range m.GetHeaders() {
+ k := strings.ToLower(h.GetKey())
+
+ switch k {
+ case injectHintHeader, workerIDHeader:
+ if rv := h.GetRawValue(); len(rv) > 0 {
+ s.workerIDHint = strings.TrimSpace(string(rv))
+ } else {
+ s.workerIDHint = strings.TrimSpace(h.GetValue())
+ }
+ case tokenDataHeader:
+ if rv := h.GetRawValue(); len(rv) > 0 {
+ s.tokenDataHint = strings.TrimSpace(string(rv))
+ } else {
+ s.tokenDataHint = strings.TrimSpace(h.GetValue())
+ }
+ }
+ }
+ }
+
+ // No header mutations needed here; body phase will do the JSON injection.
return []*eppb.ProcessingResponse{
{
Response: &eppb.ProcessingResponse_RequestHeaders{
diff --git a/pkg/bbr/handlers/server.go b/pkg/bbr/handlers/server.go
index a580380..eb2893f 100644
--- a/pkg/bbr/handlers/server.go
+++ b/pkg/bbr/handlers/server.go
@@ -38,7 +38,9 @@ func NewServer(streaming bool) *Server {
// Server implements the Envoy external processing server.
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
type Server struct {
- streaming bool
+ streaming bool
+ workerIDHint string
+ tokenDataHint string
}
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
diff --git a/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go
new file mode 100644
index 0000000..b6708fa
--- /dev/null
+++ b/pkg/epp/requestcontrol/plugins/dynamo_inject_workerid/plugin.go
@@ -0,0 +1,69 @@
+package dynamo_inject_workerid
+
+import (
+ "context"
+ "encoding/json"
+ "strings"
+
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
+ rc "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
+ schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
+)
+
+const (
+ typeString = "dynamo-inject-workerid"
+ pluginName = "dynamo-inject-workerid"
+ WorkerIDHeader = "x-worker-instance-id"
+ injectHintHeader = "x-epp-inject-nvext-worker-instance-id"
+ TokenDataHeader = "x-epp-inject-nvext-token-data"
+)
+
+var _ plugins.Plugin = (*InjectWorkerIDPreRequest)(nil)
+var _ rc.PreRequest = (*InjectWorkerIDPreRequest)(nil)
+
+type InjectWorkerIDPreRequest struct {
+ typedName plugins.TypedName
+}
+
+func NewInjectWorkerIDPreRequest() *InjectWorkerIDPreRequest {
+ return &InjectWorkerIDPreRequest{
+ typedName: plugins.TypedName{Type: typeString, Name: pluginName},
+ }
+}
+
+func (p *InjectWorkerIDPreRequest) WithName(name string) *InjectWorkerIDPreRequest {
+ p.typedName.Name = name
+ return p
+}
+
+func InjectWorkerIDPreRequestFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
+ return NewInjectWorkerIDPreRequest().WithName(name), nil
+}
+
+func (p *InjectWorkerIDPreRequest) TypedName() plugins.TypedName { return p.typedName }
+
+func (p *InjectWorkerIDPreRequest) PreRequest(
+ _ context.Context,
+ req *schedtypes.LLMRequest,
+ _ *schedtypes.SchedulingResult,
+ _ int,
+) {
+ if req == nil {
+ return
+ }
+ if req.Headers == nil {
+ req.Headers = map[string]string{}
+ }
+ wid := strings.TrimSpace(req.Headers[WorkerIDHeader])
+ if wid == "" {
+ return
+ }
+ req.Headers[WorkerIDHeader] = wid
+ req.Headers[injectHintHeader] = wid
+
+ // Pass through token-data header if scorer set it
+ if td := strings.TrimSpace(req.Headers[TokenDataHeader]); td != "" {
+ req.Headers[TokenDataHeader] = td
+ }
+
+}
diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml
new file mode 100644
index 0000000..b689c00
--- /dev/null
+++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/epp-config-dynamo.yaml
@@ -0,0 +1,21 @@
+# This is an example for configuring the EPP to use the dynamo token-aware kv router for scoring the pods
+apiVersion: inference.networking.x-k8s.io/v1alpha1
+kind: EndpointPickerConfig
+plugins:
+ # Required: tells EPP which profile to use (even if you only have one)
+ - type: single-profile-handler
+
+ # Picker: chooses the final endpoint after scoring
+ - name: picker
+ type: max-score-picker
+ - name: dyn-pre
+ type: dynamo-inject-workerid
+ parameters: {}
+ - name: dyn-kv
+ type: kv-aware-scorer
+schedulingProfiles:
+ - name: default
+ plugins:
+ - pluginRef: dyn-kv
+ weight: 1
+ - pluginRef: picker
diff --git a/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go
new file mode 100644
index 0000000..1f6a41f
--- /dev/null
+++ b/pkg/epp/scheduling/plugins/dynamo_kv_scorer/plugin.go
@@ -0,0 +1,428 @@
+package dynamo_kv_scorer
+
+/*
+#cgo CPPFLAGS: -I${SRCDIR}/include
+#cgo CXXFLAGS: -std=c++17
+#cgo LDFLAGS: ${SRCDIR}/lib/libdynamo_llm_capi.a -lstdc++ -ldl -lpthread -lm
+
+#include <stdint.h>
+#include <stddef.h>
+#include <stdlib.h> // for free
+#include <stdbool.h>
+
+// enum underlying type is uint32_t; matches cbindgen output
+typedef uint32_t dynamo_llm_result_t;
+enum { DYNAMO_OK = 0, DYNAMO_ERR = 1 };
+
+// opaque handle forward-decl
+struct WorkerSelectionPipeline;
+typedef struct WorkerSelectionPipeline WorkerSelectionPipeline;
+
+// Prototypes (C-compatible)
+dynamo_llm_result_t dynamo_llm_init(const char *namespace_c_str,
+ const char *component_c_str,
+ int64_t worker_id,
+ uint32_t kv_block_size);
+
+dynamo_llm_result_t dynamo_llm_shutdown(void);
+dynamo_llm_result_t dynamo_llm_load_publisher_create(void);
+
+dynamo_llm_result_t dynamo_kv_event_publish_stored(uint64_t event_id,
+ const uint32_t *token_ids,
+ const uintptr_t *num_block_tokens,
+ const uint64_t *block_ids,
+ size_t num_blocks,
+ const uint64_t *parent_hash,
+ uint64_t lora_id);
+
+dynamo_llm_result_t dynamo_kv_event_publish_removed(uint64_t event_id,
+ const uint64_t *block_ids,
+ size_t num_blocks);
+
+dynamo_llm_result_t dynamo_create_worker_selection_pipeline(const char *namespace_c_str,
+ const char *component_c_str,
+ const char *model_name_c_str,
+ bool use_kv_routing,
+ double busy_threshold,
+ double overlap_score_weight,
+ double router_temperature,
+ bool use_kv_events,
+ bool router_replica_sync,
+ WorkerSelectionPipeline **pipeline_out);
+
+dynamo_llm_result_t dynamo_destroy_worker_selection_pipeline(WorkerSelectionPipeline *pipeline);
+
+dynamo_llm_result_t dynamo_query_worker_selection_and_annotate(WorkerSelectionPipeline *pipeline,
+ const char *request_json_c_str,
+ int64_t *worker_instance_id_out,
+ uint32_t **token_ids_out,
+ size_t *token_count_out,
+ char **annotated_request_json_out);
+
+dynamo_llm_result_t dynamo_free_worker_selection_result(uint32_t *token_ids,
+ size_t token_count,
+ char *annotated_request_json);
+*/
+import "C"
+
+import (
+ "context"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "os"
+ "strings"
+ "sync"
+ "unsafe"
+
+ log "sigs.k8s.io/controller-runtime/pkg/log"
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
+ schedtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
+ logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+)
+
+const (
+ PluginName = "dynamo-kv-scorer"
+ KVAwareScorerType = "kv-aware-scorer"
+ StateKeyWorkerInstanceID = schedtypes.StateKey("dynamo/worker-instance-id")
+ WorkerIDHeader = "x-worker-instance-id"
+ TokenDataHeader = "x-epp-inject-nvext-token-data"
+)
+
+// --------------------------- config / env ---------------------------
+
+var warmupOnce sync.Once
+var warmupErr error
+
+type stateString string
+type params struct {
+}
+
+func (s stateString) Clone() schedtypes.StateData { return s }
+
+type KVAwareScorer struct {
+ typedName plugins.TypedName
+}
+
+var _ plugins.Plugin = (*KVAwareScorer)(nil)
+var _ framework.Scorer = (*KVAwareScorer)(nil)
+
+func NewKVAwareScorer() *KVAwareScorer {
+ return &KVAwareScorer{
+ typedName: plugins.TypedName{Type: KVAwareScorerType, Name: PluginName},
+ }
+}
+
+func (k *KVAwareScorer) WithName(name string) *KVAwareScorer { k.typedName.Name = name; return k }
+
+func KVAwareScorerFactory(name string, raw json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
+ p := params{}
+ _ = json.Unmarshal(raw, &p)
+
+ s := NewKVAwareScorer().WithName(name)
+
+ // one-time FFI init (runtime + persistent pipeline)
+ warmupOnce.Do(func() {
+ defer func() {
+ if r := recover(); r != nil {
+ warmupErr = fmt.Errorf("Dynamo configuration error: %v", r)
+ }
+ }()
+ warmupErr = initFFI()
+ })
+ if warmupErr != nil {
+ return nil, fmt.Errorf("Dynamo FFI init for the Router failed: %w", warmupErr)
+ }
+
+ return s, nil
+}
+
+func (k *KVAwareScorer) TypedName() plugins.TypedName { return k.typedName }
+
+// --------------------------- FFI integration ---------------------------
+
+var (
+ ffiOnce sync.Once
+ ffiErr error
+
+ ffiNamespace string
+ ffiComponent string
+ ffiModel string
+ ffiOverlapScoreWeight float64
+ ffiRouterTemperature float64
+ ffiKvBlockSize uint32
+ ffiWorkerID int64
+
+ runtimeInitialized bool
+
+ // Boxed pipeline handle (owned on the Rust side, opaque here)
+ pipeline *C.struct_WorkerSelectionPipeline
+ pipelineMutex sync.RWMutex
+)
+
+func loadDynamoConfig() {
+ ffiNamespace = getEnvOrDefault("DYNAMO_NAMESPACE", "vllm-agg")
+ ffiComponent = getEnvOrDefault("DYNAMO_COMPONENT", "backend")
+ ffiModel = getEnvOrDefault("DYNAMO_MODEL", "Qwen/Qwen3-0.6B")
+ ffiWorkerID = getEnvInt64OrDefault("DYNAMO_WORKER_ID", 1)
+
+ ffiOverlapScoreWeight = getEnvFloatOrDefault("DYNAMO_OVERLAP_SCORE_WEIGHT", -1.0)
+ ffiRouterTemperature = getEnvFloatOrDefault("DYNAMO_ROUTER_TEMPERATURE", -1.0)
+
+ kvBlockSizeStr := os.Getenv("DYNAMO_KV_BLOCK_SIZE")
+ if kvBlockSizeStr == "" {
+ panic("DYNAMO_KV_BLOCK_SIZE is required and must match the model card's kv_cache_block_size")
+ }
+ var tmp int64
+ if n, err := fmt.Sscanf(kvBlockSizeStr, "%d", &tmp); err != nil || n != 1 {
+ panic(fmt.Sprintf("DYNAMO_KV_BLOCK_SIZE='%s' is not a valid integer", kvBlockSizeStr))
+ }
+ // Check the range on the int64 before narrowing, so negative or oversized values cannot wrap into [16,8192].
+ if tmp < 16 || tmp > 8192 {
+ panic(fmt.Sprintf("DYNAMO_KV_BLOCK_SIZE=%d outside [16,8192]", tmp))
+ }
+ ffiKvBlockSize = uint32(tmp)
+ if (ffiKvBlockSize & (ffiKvBlockSize - 1)) != 0 {
+ panic(fmt.Sprintf("DYNAMO_KV_BLOCK_SIZE=%d must be a power of 2", ffiKvBlockSize))
+ }
+ fmt.Printf("Dynamo KV Scorer: Loaded DYNAMO_KV_BLOCK_SIZE=%d\n", ffiKvBlockSize)
+}
+
+func getEnvOrDefault(key, def string) string {
+ if v := os.Getenv(key); v != "" {
+ return v
+ }
+ return def
+}
+func getEnvInt64OrDefault(key string, def int64) int64 {
+ if v := os.Getenv(key); v != "" {
+ var p int64
+ if n, err := fmt.Sscanf(v, "%d", &p); err == nil && n == 1 {
+ return p
+ }
+ }
+ return def
+}
+func getEnvFloatOrDefault(key string, def float64) float64 {
+ if v := os.Getenv(key); v != "" {
+ var p float64
+ if n, err := fmt.Sscanf(v, "%f", &p); err == nil && n == 1 {
+ return p
+ }
+ }
+ return def
+}
+func getEnvBoolOrDefault(key string, def bool) bool {
+ if v := os.Getenv(key); v != "" {
+ switch strings.ToLower(v) {
+ case "true", "1", "yes", "on":
+ return true
+ case "false", "0", "no", "off":
+ return false
+ }
+ }
+ return def
+}
+
+// initFFI: initialize runtime and create a persistent boxed pipeline.
+func initFFI() error {
+ ffiOnce.Do(func() {
+ loadDynamoConfig()
+
+ ns := C.CString(ffiNamespace)
+ cm := C.CString(ffiComponent)
+ model := C.CString(ffiModel)
+ defer C.free(unsafe.Pointer(ns))
+ defer C.free(unsafe.Pointer(cm))
+ defer C.free(unsafe.Pointer(model))
+
+ // Init Dynamo runtime
+ if rc := C.dynamo_llm_init(ns, cm, C.int64_t(ffiWorkerID), C.uint32_t(ffiKvBlockSize)); rc != C.DYNAMO_OK {
+ ffiErr = fmt.Errorf("dynamo_llm_init failed")
+ return
+ }
+ runtimeInitialized = true
+
+ // Create persistent pipeline
+ pipelineMutex.Lock()
+ defer pipelineMutex.Unlock()
+
+ rc := C.dynamo_create_worker_selection_pipeline(
+ ns,
+ cm,
+ model,
+ C.bool(getEnvBoolOrDefault("DYNAMO_USE_KV_ROUTING", true)),
+ C.double(getEnvFloatOrDefault("DYNAMO_BUSY_THRESHOLD", -1.0)),
+ C.double(ffiOverlapScoreWeight),
+ C.double(ffiRouterTemperature),
+ C.bool(getEnvBoolOrDefault("DYNAMO_USE_KV_EVENTS", true)),
+ C.bool(getEnvBoolOrDefault("DYNAMO_ROUTER_REPLICA_SYNC", false)),
+ &pipeline,
+ )
+ if rc != C.DYNAMO_OK {
+ ffiErr = fmt.Errorf("dynamo_create_worker_selection_pipeline failed")
+ return
+ }
+ })
+ return ffiErr
+}
+
+// --------------------------- scoring ---------------------------
+
+func encodeTokenData(tokens []int64) string {
+ b, _ := json.Marshal(tokens)
+ return base64.StdEncoding.EncodeToString(b)
+}
+
+func (k *KVAwareScorer) Score(
+ ctx context.Context,
+ cycle *schedtypes.CycleState,
+ req *schedtypes.LLMRequest,
+ pods []schedtypes.Pod,
+) map[schedtypes.Pod]float64 {
+ logger := log.FromContext(ctx)
+
+ workerID, tokenData, err := k.callDynamoRouter(ctx, req)
+ if err != nil {
+ logger.V(logutil.DEFAULT).Error(err, "Dynamo call failed; proceeding without worker id")
+ } else if workerID != "" {
+ logger.V(logutil.DEFAULT).Info(
+ "Dynamo router selected worker",
+ "workerID", workerID,
+ "tokenDataCount", len(tokenData),
+ "tokenData", tokenData,
+ )
+ cycle.Write(StateKeyWorkerInstanceID, stateString(workerID))
+ if req.Headers == nil {
+ req.Headers = map[string]string{}
+ }
+ req.Headers[WorkerIDHeader] = workerID
+ if len(tokenData) > 0 {
+ req.Headers[TokenDataHeader] = encodeTokenData(tokenData)
+ }
+ }
+
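+ // All candidates get the same score: the Dynamo router's choice, when available, travels in the headers set above, not in the scores.
+ out := make(map[schedtypes.Pod]float64, len(pods))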
+ for _, p := range pods {
+ out[p] = 1.0
+ }
+ return out
+}
+
+// --------------------------- router call (persistent only) ---------------------------
+
+func (k *KVAwareScorer) callDynamoRouter(
+ ctx context.Context,
+ req *schedtypes.LLMRequest,
+) (string, []int64, error) {
+ logger := log.FromContext(ctx)
+
+ if err := initFFI(); err != nil {
+ logger.V(logutil.DEFAULT).Error(err, "FFI init failed")
+ return "", nil, err
+ }
+ if !runtimeInitialized {
+ return "", nil, fmt.Errorf("dynamo runtime not initialized")
+ }
+
+ pipelineMutex.RLock()
+ currentPipeline := pipeline
+ pipelineMutex.RUnlock()
+
+ if currentPipeline == nil {
+ return "", nil, fmt.Errorf("dynamo worker selection pipeline not created")
+ }
+
+ // Build OpenAI-compatible JSON request
+ requestBody := buildOpenAIRequest(req)
+ requestJSON, err := json.Marshal(requestBody)
+ if err != nil {
+ logger.V(logutil.DEFAULT).Error(err, "Failed to marshal OpenAI request")
+ return "", nil, fmt.Errorf("marshal OpenAI request: %w", err)
+ }
+ cRequestJSON := C.CString(string(requestJSON))
+ defer C.free(unsafe.Pointer(cRequestJSON))
+
+ // Output variables
+ var cWorkerID C.int64_t
+ var cTokens *C.uint32_t
+ var cTokenCount C.size_t
+ var cAnnotatedJSON *C.char
+
+ // Call the worker selection pipeline
+ rc := C.dynamo_query_worker_selection_and_annotate(
+ currentPipeline,
+ cRequestJSON,
+ &cWorkerID,
+ &cTokens,
+ &cTokenCount,
+ &cAnnotatedJSON,
+ )
+ if rc != C.DYNAMO_OK {
+ return "", nil, fmt.Errorf("dynamo_query_worker_selection_and_annotate failed")
+ }
+
+ // Copy tokens into Go memory and free C memory
+ count := int(uintptr(cTokenCount))
+ var tokens64 []int64
+ if count > 0 && cTokens != nil {
+ src := unsafe.Slice((*uint32)(unsafe.Pointer(cTokens)), count)
+ tokens64 = make([]int64, count)
+ for i := 0; i < count; i++ {
+ tokens64[i] = int64(src[i])
+ }
+ }
+ C.dynamo_free_worker_selection_result(cTokens, cTokenCount, cAnnotatedJSON)
+
+ workerID := fmt.Sprintf("%d", int64(cWorkerID))
+ logger.V(logutil.DEFAULT).Info("Worker selection completed",
+ "workerID", workerID, "tokenCount", count)
+
+ return workerID, tokens64, nil
+}
+
+func buildOpenAIRequest(req *schedtypes.LLMRequest) map[string]any {
+ requestBody := make(map[string]any)
+ userText := "default prompt"
+ if req != nil && strings.TrimSpace(req.Prompt) != "" {
+ userText = req.Prompt
+ }
+ requestBody["messages"] = []map[string]any{{"role": "user", "content": userText}}
+ if req != nil && strings.TrimSpace(req.TargetModel) != "" {
+ requestBody["model"] = req.TargetModel
+ } else {
+ requestBody["model"] = ffiModel
+ }
+ requestBody["max_tokens"] = 1
+ requestBody["temperature"] = 0.0
+ requestBody["stream"] = true
+ requestBody["nvext"] = map[string]any{
+ "annotations": []string{"query_instance_id"},
+ }
+ return requestBody
+}
+
+// --------------------------- shutdown ---------------------------
+
+func cleanupDynamo() error {
+ pipelineMutex.Lock()
+ defer pipelineMutex.Unlock()
+
+ if pipeline != nil {
+ if rc := C.dynamo_destroy_worker_selection_pipeline(pipeline); rc != C.DYNAMO_OK {
+ fmt.Printf("Warning: dynamo_destroy_worker_selection_pipeline failed\n")
+ }
+ pipeline = nil
+ }
+
+ if runtimeInitialized {
+ if rc := C.dynamo_llm_shutdown(); rc != C.DYNAMO_OK {
+ return fmt.Errorf("dynamo_llm_shutdown failed")
+ }
+ runtimeInitialized = false
+ }
+ return nil
+}
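
The scorer above passes token IDs downstream in the `x-epp-inject-nvext-token-data` header as a base64-encoded JSON array. A minimal sketch of that round trip follows. `decodeTokenData` is a hypothetical consumer-side helper that is not part of this commit, and how the real consumer parses the header is not shown in this diff.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// encodeTokenData mirrors the scorer's encoding: a JSON array of token IDs,
// then standard (padded) base64. Unlike the original it returns the error.
func encodeTokenData(tokens []int64) (string, error) {
	b, err := json.Marshal(tokens)
	if err != nil {
		return "", fmt.Errorf("json encode: %w", err)
	}
	return base64.StdEncoding.EncodeToString(b), nil
}

// decodeTokenData is a hypothetical inverse for whoever reads the header.
func decodeTokenData(header string) ([]int64, error) {
	raw, err := base64.StdEncoding.DecodeString(header)
	if err != nil {
		return nil, fmt.Errorf("base64 decode: %w", err)
	}
	var tokens []int64
	if err := json.Unmarshal(raw, &tokens); err != nil {
		return nil, fmt.Errorf("json decode: %w", err)
	}
	return tokens, nil
}

func main() {
	header, err := encodeTokenData([]int64{1, 2, 3})
	if err != nil {
		panic(err)
	}
	tokens, err := decodeTokenData(header)
	if err != nil {
		panic(err)
	}
	fmt.Println(header, tokens)
}
```

Because the header value grows with prompt length, a consumer may also want to cap the accepted size before decoding; that limit is a deployment decision and is not specified here.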
@@ -41,8 +41,8 @@ spec:
 containers:
 - name: epp
-image: {{ if .Values.eppAware.enabled }}{{ default .Values.extension.image .Values.eppAware.eppImage }}{{ else }}{{ .Values.extension.image }}{{ end }}
+image: "{{ if .Values.eppAware.enabled }}{{ default .Values.extension.image .Values.eppAware.eppImage }}{{ else }}{{ .Values.extension.image }}{{ end }}"
-imagePullPolicy: {{ .Values.epp.imagePullPolicy | default "IfNotPresent" }}
+imagePullPolicy: {{ default "IfNotPresent" .Values.epp.imagePullPolicy }}
 args:
 {{- if .Values.epp.argsOverride }}
 {{- toYaml .Values.epp.argsOverride | nindent 8 }}
@@ -64,17 +64,38 @@ spec:
 - "/etc/epp/epp-config-dynamo.yaml"
 {{- end }}
 {{- end }}
+{{- $platformNs := default .Release.Namespace .Values.platformNamespace -}}
+{{- $platformName := default "dynamo-platform" .Values.platformReleaseName -}}
+{{- $ns := required "set eppAware.dynamoNamespace via values" .Values.eppAware.dynamoNamespace -}}
+{{- $comp := default "backend" .Values.eppAware.dynamoComponent -}}
+{{- $kv := default "16" .Values.eppAware.dynamoKvBlockSize -}}
 {{- if .Values.eppAware.enabled }}
 volumeMounts:
 - name: epp-config
 mountPath: /etc/epp
 readOnly: true
 {{- end }}
 env:
+{{- if .Values.eppAware.enabled }}
+- name: ETCD_ENDPOINTS
+value: "{{ $platformName }}-etcd.{{ $platformNs }}:2379"
+- name: NATS_SERVER
+value: "nats://{{ $platformName }}-nats.{{ $platformNs }}:4222"
+- name: DYNAMO_NAMESPACE
+value: "{{ $ns }}"
+- name: DYNAMO_COMPONENT
+value: "{{ $comp }}"
+- name: DYNAMO_KV_BLOCK_SIZE
+value: "{{ $kv }}"
+{{- end }}
 {{- range .Values.epp.extraEnv }}
 - name: {{ .name }}
 value: {{ .value | quote }}
 {{- end }}
 ports:
 - containerPort: 9002
 - containerPort: 9003
@@ -93,27 +114,6 @@ spec:
 initialDelaySeconds: 5
 periodSeconds: 10
-{{- if .Values.eppAware.enabled }}
-- name: {{ .Values.eppAware.sidecar.name }}
-image: {{ .Values.eppAware.sidecar.image }}
-imagePullPolicy: {{ .Values.eppAware.sidecar.imagePullPolicy | default "IfNotPresent" }}
-command: {{- toYaml .Values.eppAware.sidecar.command | nindent 8 }}
-args: {{- toYaml .Values.eppAware.sidecar.args | nindent 8 }}
-env:
-{{- range .Values.eppAware.sidecar.env }}
-{{- if .valueFromDynamoNamespace }}
-- name: {{ .name }}
-value: "{{ $.Values.dynamoNamespace }}"
-{{- else }}
-- name: {{ .name }}
-value: {{ .value | quote }}
-{{- end }}
-{{- end }}
-ports:
-{{- toYaml .Values.eppAware.sidecar.ports | nindent 8 }}
-resources:
-{{- toYaml .Values.eppAware.sidecar.resources | nindent 10 }}
-{{- end }}
 {{- if .Values.eppAware.enabled }}
 volumes:
 - name: epp-config
......
@@ -66,32 +66,7 @@ epp:
 eppAware:
 enabled: false
 # Optional: override EPP image when epp-aware=true
-eppImage: docker.io/lambda108/epp-inference-extension-dynamo:v0.5.1-1
-# Sidecar (frontend-router)
-sidecar:
-# Container name for the sidecar
-name: frontend-router
-# Sidecar image
-image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
-# Image pull policy for the sidecar
-imagePullPolicy: IfNotPresent
-# Command and args for running the frontend in router mode.
-command: ["/bin/sh", "-c"]
-args: ["python3 -m dynamo.frontend --http-port 8000 --router-mode kv"]
-# Environment variables for the sidecar.
-env:
-- name: DYNAMO_NAMESPACE
-valueFromDynamoNamespace: true
-- name: ETCD_ENDPOINTS
-value: "http://dynamo-platform-etcd:2379"
-- name: NATS_SERVER
-value: "nats://dynamo-platform-nats:4222"
-# Resource requests/limits for the sidecar container.
-resources:
-requests:
-cpu: "1"
-memory: "2Gi"
-# Ports exposed by the sidecar container.
-ports:
-- containerPort: 8000
+eppImage: nvcr.io/nvstaging/ai-dynamo/gaie-epp-dynamo:v0.6.0-1
+dynamoNamespace: ""
+dynamoComponent: ""
+dynamoKvBlockSize: ""
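
The `dynamoKvBlockSize` value reaches the scorer as `DYNAMO_KV_BLOCK_SIZE`, which `loadDynamoConfig` requires to be a power of two in [16, 8192]. A minimal sketch of those checks as an error-returning helper is shown here. `parseKvBlockSize` is a hypothetical name that is not part of this commit, and using `strconv.ParseUint` in place of the commit's `fmt.Sscanf` is an assumption made for stricter parsing.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseKvBlockSize is a hypothetical helper that applies the same rules as
// loadDynamoConfig but returns an error instead of panicking.
func parseKvBlockSize(s string) (uint32, error) {
	if s == "" {
		return 0, fmt.Errorf("DYNAMO_KV_BLOCK_SIZE is required")
	}
	// bitSize 32 makes ParseUint reject negatives and anything above MaxUint32,
	// so the later conversion to uint32 cannot wrap.
	n, err := strconv.ParseUint(s, 10, 32)
	if err != nil {
		return 0, fmt.Errorf("DYNAMO_KV_BLOCK_SIZE=%q is not a valid integer: %w", s, err)
	}
	if n < 16 || n > 8192 {
		return 0, fmt.Errorf("DYNAMO_KV_BLOCK_SIZE=%d outside [16,8192]", n)
	}
	// A power of two has exactly one bit set, so n & (n-1) clears it to zero.
	if n&(n-1) != 0 {
		return 0, fmt.Errorf("DYNAMO_KV_BLOCK_SIZE=%d must be a power of 2", n)
	}
	return uint32(n), nil
}

func main() {
	for _, s := range []string{"16", "24", "4294967312", ""} {
		v, err := parseKvBlockSize(s)
		fmt.Printf("%q -> %d, err=%v\n", s, v, err)
	}
}
```

Returning an error lets the plugin factory report a bad value through its normal error path, without relying on `recover` around a panic.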
@@ -15,11 +15,17 @@
 eppAware:
 enabled: true
-eppImage: docker.io/lambda108/epp-inference-extension-dynamo:v0.5.1-1
+eppImage: nvcr.io/nvstaging/ai-dynamo/gaie-epp-dynamo:v0.6.0-1
+dynamoNamespace: vllm-agg
+dynamoComponent: backend
+dynamoKvBlockSize: "16"
 imagePullSecrets:
 - docker-imagepullsecret
+platformReleaseName: dynamo-platform
+platformNamespace: "my-model"
 epp:
 extraEnv:
 - name: USE_STREAMING
......
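
To make the effect of `platformReleaseName` and `platformNamespace` concrete, the sketch below reproduces in plain Go how the deployment template composes `ETCD_ENDPOINTS` and `NATS_SERVER`. The helper names `orDefault` and `platformEndpoints` are hypothetical, and the fallback behaviour is an approximation of Helm's `default` function for string values only.

```go
package main

import "fmt"

// orDefault approximates Helm's `default` for strings: an empty value
// falls back to def.
func orDefault(def, v string) string {
	if v == "" {
		return def
	}
	return v
}

// platformEndpoints mirrors the template: the namespace falls back to the
// release namespace and the release name falls back to "dynamo-platform".
func platformEndpoints(releaseNamespace, platformNamespace, platformReleaseName string) (etcd, nats string) {
	ns := orDefault(releaseNamespace, platformNamespace)
	name := orDefault("dynamo-platform", platformReleaseName)
	etcd = fmt.Sprintf("%s-etcd.%s:2379", name, ns)
	nats = fmt.Sprintf("nats://%s-nats.%s:4222", name, ns)
	return etcd, nats
}

func main() {
	// Values from the EPP-aware example: release "dynamo-platform" in "my-model".
	etcd, nats := platformEndpoints("default", "my-model", "dynamo-platform")
	fmt.Println(etcd) // dynamo-platform-etcd.my-model:2379
	fmt.Println(nats) // nats://dynamo-platform-nats.my-model:4222
}
```

With both values left unset, the endpoints resolve against the namespace the chart is installed into, so the platform and the EPP would need to share that namespace.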