Unverified Commit f1d5c95a authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: wire nvext.cache_control TTL-based pinning through Dynamo router (#6213)


Co-authored-by: default avatarClaude <noreply@anthropic.com>
parent d9657b34
...@@ -64,6 +64,7 @@ class FrontendConfig(ConfigBase): ...@@ -64,6 +64,7 @@ class FrontendConfig(ConfigBase):
router_track_output_blocks: bool router_track_output_blocks: bool
router_event_threads: int router_event_threads: int
router_queue_threshold: Optional[float] router_queue_threshold: Optional[float]
router_enable_cache_control: bool
decode_fallback: bool decode_fallback: bool
migration_limit: int migration_limit: int
...@@ -95,6 +96,8 @@ class FrontendConfig(ConfigBase): ...@@ -95,6 +96,8 @@ class FrontendConfig(ConfigBase):
raise ValueError( raise ValueError(
"--migration-limit must be between 0 and 4294967295 (0=disabled)" "--migration-limit must be between 0 and 4294967295 (0=disabled)"
) )
if self.router_enable_cache_control and self.router_mode != "kv":
raise ValueError("--enable-cache-control requires --router-mode=kv")
@register_encoder(FrontendConfig) @register_encoder(FrontendConfig)
...@@ -373,6 +376,18 @@ class FrontendArgGroup(ArgGroup): ...@@ -373,6 +376,18 @@ class FrontendArgGroup(ArgGroup):
), ),
arg_type=float, arg_type=float,
) )
add_negatable_bool_argument(
g,
flag_name="--enable-cache-control",
env_var="DYN_ENABLE_CACHE_CONTROL",
default=False,
dest="router_enable_cache_control",
help=(
"KV Router: Enable cache control (PIN with TTL). When set, the router creates "
"a cache_control service mesh client and fires pin_prefix after generation for "
"requests with nvext.cache_control. Requires --router-mode=kv."
),
)
add_negatable_bool_argument( add_negatable_bool_argument(
g, g,
flag_name="--decode-fallback", flag_name="--decode-fallback",
......
...@@ -193,6 +193,7 @@ async def async_main(): ...@@ -193,6 +193,7 @@ async def async_main():
router_prune_target_ratio=config.router_prune_target_ratio, router_prune_target_ratio=config.router_prune_target_ratio,
router_queue_threshold=config.router_queue_threshold, router_queue_threshold=config.router_queue_threshold,
router_event_threads=config.router_event_threads, router_event_threads=config.router_event_threads,
router_enable_cache_control=config.router_enable_cache_control,
) )
elif config.router_mode == "random": elif config.router_mode == "random":
router_mode = RouterMode.Random router_mode = RouterMode.Random
......
...@@ -330,6 +330,47 @@ class BaseWorkerHandler(BaseGenerativeHandler): ...@@ -330,6 +330,47 @@ class BaseWorkerHandler(BaseGenerativeHandler):
"new_version": req.new_version, "new_version": req.new_version,
} }
async def pin_prefix(self, body: dict) -> dict:
"""Pin a prefix by token_ids to resist eviction.
Args:
body: Dict with "token_ids" list of token IDs and optional
"ttl_seconds" (default 300).
"""
token_ids = body.get("token_ids", [])
ttl_seconds = body.get("ttl_seconds", 300)
if not token_ids:
return {"status": "error", "message": "token_ids required"}
try:
result = await self.engine.tokenizer_manager.pin_prefix(
token_ids, ttl_seconds
)
return {
"status": "ok" if result.success else "error",
"nodes_pinned": result.nodes_pinned,
"message": result.message,
}
except Exception as e:
logging.error(f"Failed to pin prefix: {e}")
return {"status": "error", "message": str(e)}
async def cache_control(self, request, context=None):
"""Service mesh endpoint for cache control operations.
Args:
request: Dict with "action" key and action-specific parameters.
context: Optional Dynamo context (unused but required by protocol).
Yields:
Single dict with operation result.
"""
action = request.get("action")
if action == "pin_prefix":
result = await self.pin_prefix(request)
else:
result = {"status": "error", "message": f"Unknown action: {action}"}
yield result
def register_engine_routes(self, runtime) -> None: def register_engine_routes(self, runtime) -> None:
"""Register all engine routes for this handler. """Register all engine routes for this handler.
...@@ -344,6 +385,7 @@ class BaseWorkerHandler(BaseGenerativeHandler): ...@@ -344,6 +385,7 @@ class BaseWorkerHandler(BaseGenerativeHandler):
runtime.register_engine_route( runtime.register_engine_route(
"resume_memory_occupation", self.resume_memory_occupation "resume_memory_occupation", self.resume_memory_occupation
) )
runtime.register_engine_route("pin_prefix", self.pin_prefix)
runtime.register_engine_route( runtime.register_engine_route(
"update_weights_from_disk", self.update_weights_from_disk "update_weights_from_disk", self.update_weights_from_disk
) )
......
...@@ -669,7 +669,7 @@ ...@@ -669,7 +669,7 @@
}, },
"id": 9, "id": 9,
"panels": [], "panels": [],
"title": "Cache & PIN", "title": "Cache",
"type": "row" "type": "row"
}, },
{ {
...@@ -756,89 +756,6 @@ ...@@ -756,89 +756,6 @@
} }
] ]
}, },
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 8,
"x": 8,
"y": 19
},
"id": 11,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"title": "Active PIN Count",
"type": "timeseries",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"editorMode": "code",
"expr": "sglang:hicache_active_pin_count",
"legendFormat": "PINs",
"range": true,
"refId": "A"
}
]
},
{ {
"datasource": { "datasource": {
"type": "prometheus", "type": "prometheus",
......
---
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
title: SGLang for Agentic Workloads
subtitle: Priority scheduling, KV cache eviction policies, and cache pinning for multi-turn agentic serving
---
# SGLang for Agentic Workloads
This guide covers SGLang-specific configuration for agentic serving with Dynamo. It explains which SGLang engine flags to enable, how Dynamo's [agent hints](../../components/router/agent-hints.md) map to SGLang behavior, and how to use experimental cache pinning to protect KV cache for high-value conversations.
## Overview
Agentic workloads (tool-calling loops, multi-turn reasoning, code generation pipelines) have different performance characteristics than batch inference:
- **Prefix-heavy**: Successive turns share a growing conversation prefix. KV cache reuse is critical for low TTFT.
- **Priority-sensitive**: Some requests (user-facing agent turns) matter more than background tasks.
- **Long-lived**: Conversations span minutes to hours. Cache eviction under memory pressure can destroy accumulated KV state.
Dynamo's agent hints give the router per-request metadata. SGLang's engine flags control how that metadata affects scheduling and eviction on the worker.
## SGLang Engine Flags
### Priority Scheduling
Enable priority-based scheduling so the engine respects the `priority` value from `nvext.agent_hints.priority`:
```bash
python -m dynamo.sglang \
--model-path <model> \
--enable-priority-scheduling \
--schedule-low-priority-values-first \
...
```
| Flag | Description |
|------|-------------|
| `--enable-priority-scheduling` | Enables priority-based request scheduling instead of FCFS. |
| `--schedule-low-priority-values-first` | Inverts priority ordering so lower values are scheduled first (matches vLLM convention). Without this flag, higher values = higher priority. |
When priority scheduling is enabled, the engine uses the `priority` field from `nvext.agent_hints` to order requests in its internal queue. Requests with higher effective priority are scheduled before lower-priority ones. Ties are broken by arrival time.
### Priority-Based KV Cache Eviction
By default, SGLang evicts radix tree nodes using LRU. You can switch to priority-based eviction so that low-priority cache entries are evicted before high-priority ones:
```bash
python -m dynamo.sglang \
--model-path <model> \
--radix-eviction-policy priority \
...
```
| Flag | Values | Default | Description |
|------|--------|---------|-------------|
| `--radix-eviction-policy` | `lru`, `priority` | `lru` | Eviction strategy for the GPU radix cache. `priority` uses a heap ordered by the request's priority value. |
This does **not** require HiCache. It controls GPU-only radix tree eviction. When the GPU KV cache is full:
- **`lru`**: Evicts the least recently used leaf nodes first.
- **`priority`**: Evicts lowest-priority leaf nodes first. Nodes with equal priority fall back to LRU ordering.
#### Interaction with HiCache
When both `--radix-eviction-policy priority` and `--enable-hierarchical-cache` are enabled, priority affects eviction at both tiers:
| Event | Behavior |
|-------|----------|
| **GPU full** | Low-priority nodes are evicted (demoted to host) first. With `write_through`, all nodes survive on host -- priority only affects demotion order. |
| **Host full** | Low-priority nodes are deleted from host first. High-priority nodes survive longer. Pinned nodes are skipped entirely. |
The practical impact depends on your write policy. With `write_through`, GPU eviction is just a demotion -- the real deletion happens at host eviction, which is where priority ordering matters most.
## How Agent Hints Map to SGLang
Dynamo's `nvext.agent_hints` fields are consumed by the router and forwarded to SGLang workers. Here is how each hint interacts with the SGLang engine:
| Agent Hint | Router Behavior | SGLang Engine Behavior |
|------------|----------------|----------------------|
| `priority` | No routing effect (forwarded to engine) | Queue ordering when `--enable-priority-scheduling` is set. Also affects radix cache eviction order when `--radix-eviction-policy priority` is set. |
| `latency_sensitivity` | Shifts request earlier in router queue (requires `--router-queue-threshold`) | No direct engine effect. |
| `osl` | Output block tracking for routing decisions (requires `--router-track-output-blocks`) | No direct engine effect. |
| `speculative_prefill` | After response completes, sends a `max_tokens=1` prefill to warm the KV cache for the predicted next turn. | SGLang processes the prefill request normally, populating the radix cache. |
### Example: Agentic Request with Hints
```python
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
response = client.chat.completions.create(
model="Qwen/Qwen3-14B-FP8",
messages=[
{"role": "system", "content": "You are a coding assistant."},
{"role": "user", "content": "Write a Python function to parse CSV files."},
],
stream=True,
extra_body={
"nvext": {
"agent_hints": {
"priority": 10,
"latency_sensitivity": 2.0,
"speculative_prefill": True,
"osl": 512
}
}
}
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
```
## Cache Pinning (Experimental)
> [!WARNING]
> Cache pinning is experimental and available on development branches only. The API may change.
**Required PRs:**
- SGLang: [feat: TTL-based prefix pinning with refresh-on-hit for HiRadixCache](https://github.com/sgl-project/sglang/pull/18941)
- Dynamo: [feat: wire nvext.cache_control TTL-based pinning through Dynamo router](https://github.com/ai-dynamo/dynamo/pull/6213)
Cache pinning lets you explicitly protect KV cache for high-value conversation prefixes. When a request includes `nvext.cache_control`, the router fires a `pin_prefix` call to the SGLang worker after generation completes. Pinned nodes resist eviction for the specified TTL -- even under memory pressure, they are retained (demoted to host memory with HiCache rather than deleted).
### How It Works
```mermaid
sequenceDiagram
participant Client
participant Preprocessor
participant Router
participant Worker as SGLang Worker
participant Cache as Radix Cache
Client->>Preprocessor: chat/completions + nvext.cache_control{ttl}
Preprocessor->>Preprocessor: Extract TTL, attach to RoutingHints
Preprocessor->>Router: PreprocessedRequest (cache_control_ttl=N)
Router->>Router: Select worker, record token_ids + TTL in PinState
Router->>Worker: Generate request
Worker-->>Router: Stream response tokens
Router-->>Client: Stream response tokens
Note over Router,Worker: On stream completion
Router-)Worker: pin_prefix(token_ids, ttl) [fire-and-forget]
Worker->>Cache: Walk radix tree along token sequence
Cache->>Cache: Set pin_expiry, acquire host_ref_counter hold
Worker--)Router: {status: ok, nodes_pinned: N}
Note over Cache: TTL expires
Cache->>Cache: Clear pin_expiry, release host_ref_counter
Note over Cache: Node now eligible for normal eviction
```
1. The client includes `nvext.cache_control` with a TTL in the request.
2. The Dynamo preprocessor extracts the TTL and attaches it to routing hints.
3. The router routes the request normally and records the token IDs in a `PinState`.
4. After the response stream completes, the router spawns a fire-and-forget `pin_prefix` RPC to the worker that served the request.
5. The worker walks the radix tree along the token sequence and pins each node, setting `pin_expiry` and acquiring a `host_ref_counter` hold that prevents eviction.
6. When TTL expires, the pin is cleared and the node becomes eligible for normal eviction.
### Enabling Cache Pinning
**Frontend flag:**
```bash
python -m dynamo.frontend \
--router-mode kv \
--enable-cache-control \
...
```
| Flag | Description |
|------|-------------|
| `--enable-cache-control` | Enables cache control (PIN with TTL). Creates a `cache_control` service mesh client and fires `pin_prefix` after generation for requests with `nvext.cache_control`. Requires `--router-mode=kv`. |
**SGLang worker:** The worker receives PIN requests via its `cache_control` service mesh endpoint. You **must** set the `SGLANG_HICACHE_MAX_PINNED_RATIO` environment variable to a non-zero value -- pinning is disabled by default.
| Environment Variable | Type | Default | Description |
|---------------------|------|---------|-------------|
| `SGLANG_HICACHE_MAX_PINNED_RATIO` | `float` | `0.0` | Max fraction of cache tokens that can be pinned. Must be in `[0, 1)`. `0` disables pinning entirely. |
HiCache is required (`--enable-hierarchical-cache`). Without it, the scheduler rejects PIN requests. For best results, use `write_through` so that pinned nodes demote to host memory instead of being deleted when GPU memory fills:
```bash
SGLANG_HICACHE_MAX_PINNED_RATIO=0.1 python -m dynamo.sglang \
--model-path Qwen/Qwen3-14B-FP8 \
--enable-hierarchical-cache \
--hicache-ratio 2.0 \
--hicache-write-policy write_through \
...
```
### Request Format
Include `cache_control` as a top-level field in `nvext`:
```json
{
"model": "Qwen/Qwen3-14B-FP8",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain quantum computing."}
],
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
}
}
}
```
| Field | Type | Description |
|-------|------|-------------|
| `cache_control.type` | `string` | Currently only `"ephemeral"` is supported. |
| `cache_control.ttl` | `string` | TTL as integer seconds (`"600"`) or shorthand (`"5m"`, `"1h"`). Clamped to [300, 3600] seconds. Unrecognized strings default to 300s. |
### Python Example
```python
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
# First turn -- pin the conversation prefix for 1 hour
response = client.chat.completions.create(
model="Qwen/Qwen3-14B-FP8",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Analyze this codebase and suggest improvements."},
],
stream=True,
extra_body={
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
}
}
}
)
# Collect the assistant reply
assistant_response = ""
for chunk in response:
if chunk.choices[0].delta.content:
assistant_response += chunk.choices[0].delta.content
# Later turns reuse the pinned prefix -- even after heavy load from
# other requests, the KV cache for this conversation is preserved.
response = client.chat.completions.create(
model="Qwen/Qwen3-14B-FP8",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Analyze this codebase and suggest improvements."},
{"role": "assistant", "content": assistant_response},
{"role": "user", "content": "Now focus on the database layer."},
],
stream=True,
extra_body={
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
}
}
}
)
```
### Verifying Cache Hits
The response includes `prompt_tokens_details.cached_tokens` in the `usage` object when `--enable-cache-report` is set on the SGLang worker:
```json
{
"usage": {
"prompt_tokens": 2048,
"completion_tokens": 150,
"prompt_tokens_details": {
"cached_tokens": 1920
}
}
}
```
A high `cached_tokens / prompt_tokens` ratio on subsequent turns confirms that the pinned prefix was preserved.
### Limitations
- **Pinning disabled by default**: `SGLANG_HICACHE_MAX_PINNED_RATIO` defaults to `0.0`. You must set it to a non-zero value (e.g., `0.1`) or all PIN requests will be rejected.
- **HiCache required**: The scheduler rejects PIN requests unless `--enable-hierarchical-cache` is set.
- **TTL clamping**: Values are clamped to [300, 3600] seconds. You cannot pin for less than 5 minutes or more than 1 hour.
- **Pin budget**: Pinned tokens consume a budget controlled by `SGLANG_HICACHE_MAX_PINNED_RATIO` (fraction of host pool capacity). Requests exceeding this budget are rejected.
- **No priority on pinned nodes**: `pin_prefix` does not set a priority on the radix tree nodes. All pinned nodes have equal eviction priority and fall back to LRU ordering among themselves when host memory fills.
- **Requires stack restart for A/B testing**: Pins persist in cache across benchmark runs. When comparing pinned vs. unpinned performance, restart the full stack between phases to avoid false cache hits.
## See Also
- **[Agent Hints](../../components/router/agent-hints.md)**: Per-request hint reference
- **[NVIDIA Request Extensions (nvext)](../../components/frontend/nvext.md)**: Full `nvext` field reference
- **[Router Guide](../../components/router/router-guide.md)**: Router configuration and CLI arguments
- **[SGLang HiCache](../../integrations/sglang-hicache.md)**: Enabling hierarchical KV cache
...@@ -40,6 +40,7 @@ Include `nvext` as a top-level field alongside standard OpenAI-compatible fields ...@@ -40,6 +40,7 @@ Include `nvext` as a top-level field alongside standard OpenAI-compatible fields
| `prefill_worker_id` | `u64` | `None` | Router | Routes the request to a specific prefill worker (disaggregated serving). | | `prefill_worker_id` | `u64` | `None` | Router | Routes the request to a specific prefill worker (disaggregated serving). |
| `decode_worker_id` | `u64` | `None` | Router | Routes the request to a specific decode worker (disaggregated serving). | | `decode_worker_id` | `u64` | `None` | Router | Routes the request to a specific decode worker (disaggregated serving). |
| `agent_hints` | object | `None` | Router | Per-request hints for scheduling and load balancing. See [Agent Hints](#agent-hints). | | `agent_hints` | object | `None` | Router | Per-request hints for scheduling and load balancing. See [Agent Hints](#agent-hints). |
| `cache_control` | object | `None` | Router | KV cache pinning hint with TTL. See [Cache Control](#cache-control). |
### Header Overrides ### Header Overrides
...@@ -134,6 +135,31 @@ When omitted, SGLang defaults to `None` (engine default); vLLM defaults to `0`. ...@@ -134,6 +135,31 @@ When omitted, SGLang defaults to `None` (engine default); vLLM defaults to `0`.
} }
``` ```
## Cache Control
> [!WARNING]
> Cache control is experimental and available on development branches only. The API may change.
The `cache_control` object enables explicit KV cache pinning with a TTL. When set, the router fires a `pin_prefix` call to the backend worker after generation completes, protecting the conversation's KV cache from eviction for the specified duration.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `cache_control.type` | `string` | — | Cache control type. Currently only `"ephemeral"` is supported. |
| `cache_control.ttl` | `string` | `"300"` | TTL as integer seconds (`"600"`) or shorthand (`"5m"`, `"1h"`). Clamped to [300, 3600] seconds. |
```json
{
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
}
}
}
```
Requires `--enable-cache-control` and `--router-mode=kv` on the frontend. See [SGLang for Agentic Workloads](../../backends/sglang/agents.md#cache-pinning-experimental) for full setup and usage details.
## Response Extensions ## Response Extensions
When the client requests response metadata via `extra_fields`, the response includes an `nvext` object with the requested fields: When the client requests response metadata via `extra_fields`, the response includes an `nvext` object with the requested fields:
......
...@@ -97,5 +97,7 @@ This is most effective for reasoning models in agentic loops, where the conversa ...@@ -97,5 +97,7 @@ This is most effective for reasoning models in agentic loops, where the conversa
## See Also ## See Also
- **[SGLang for Agentic Workloads](../../backends/sglang/agents.md)**: SGLang engine flags for priority scheduling, eviction policies, and cache pinning
- **[NVIDIA Request Extensions (nvext)](../frontend/nvext.md)**: Full `nvext` field reference including `cache_control`
- **[Router Guide](router-guide.md)**: Full router configuration and CLI arguments - **[Router Guide](router-guide.md)**: Full router configuration and CLI arguments
- **[Router Examples](router-examples.md)**: Usage patterns and benchmarking - **[Router Examples](router-examples.md)**: Usage patterns and benchmarking
...@@ -95,6 +95,8 @@ navigation: ...@@ -95,6 +95,8 @@ navigation:
path: ../pages/features/multimodal/multimodal-sglang.md path: ../pages/features/multimodal/multimodal-sglang.md
- page: Tool Calling - page: Tool Calling
path: ../pages/agents/tool-calling.md path: ../pages/agents/tool-calling.md
- page: SGLang for Agentic Workloads
path: ../pages/backends/sglang/agents.md
- page: LoRA Adapters - page: LoRA Adapters
path: ../pages/features/lora/README.md path: ../pages/features/lora/README.md
- section: Observability (Local) - section: Observability (Local)
......
...@@ -52,7 +52,7 @@ impl KvRouterConfig { ...@@ -52,7 +52,7 @@ impl KvRouterConfig {
#[pymethods] #[pymethods]
impl KvRouterConfig { impl KvRouterConfig {
#[new] #[new]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=None, router_event_threads=1))] #[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=None, router_event_threads=1, router_enable_cache_control=false))]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn new( fn new(
overlap_score_weight: f64, overlap_score_weight: f64,
...@@ -70,6 +70,7 @@ impl KvRouterConfig { ...@@ -70,6 +70,7 @@ impl KvRouterConfig {
router_prune_target_ratio: f64, router_prune_target_ratio: f64,
router_queue_threshold: Option<f64>, router_queue_threshold: Option<f64>,
router_event_threads: u32, router_event_threads: u32,
router_enable_cache_control: bool,
) -> Self { ) -> Self {
KvRouterConfig { KvRouterConfig {
inner: RsKvRouterConfig { inner: RsKvRouterConfig {
...@@ -88,6 +89,7 @@ impl KvRouterConfig { ...@@ -88,6 +89,7 @@ impl KvRouterConfig {
router_prune_target_ratio, router_prune_target_ratio,
router_queue_threshold, router_queue_threshold,
router_event_threads, router_event_threads,
router_enable_cache_control,
}, },
} }
} }
......
...@@ -28,6 +28,7 @@ pub use dynamo_kv_router::approx; ...@@ -28,6 +28,7 @@ pub use dynamo_kv_router::approx;
pub use dynamo_kv_router::indexer; pub use dynamo_kv_router::indexer;
pub use dynamo_kv_router::protocols; pub use dynamo_kv_router::protocols;
pub mod cache_control;
pub mod config; pub mod config;
pub mod indexer_standalone; pub mod indexer_standalone;
mod jetstream; mod jetstream;
...@@ -42,6 +43,7 @@ pub mod sequence; ...@@ -42,6 +43,7 @@ pub mod sequence;
pub mod subscriber; pub mod subscriber;
pub mod worker_query; pub mod worker_query;
pub use cache_control::{CacheControlClient, spawn_pin_prefix};
pub use config::{KvRouterConfig, RouterConfigOverride}; pub use config::{KvRouterConfig, RouterConfigOverride};
pub use indexer_standalone::start_kv_block_indexer; pub use indexer_standalone::start_kv_block_indexer;
pub use prefill_router::PrefillRouter; pub use prefill_router::PrefillRouter;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use dynamo_runtime::{
component::Component,
pipeline::{PushRouter, RouterMode, SingleIn},
protocols::annotated::Annotated,
};
use futures::StreamExt;
use crate::protocols::TokenIdType;
/// State captured at routing time for a deferred PIN after generation completes.
pub(crate) struct PinState {
pub token_ids: Vec<TokenIdType>,
pub cc_client: CacheControlClient,
pub instance_id: u64,
pub ttl_seconds: u64,
}
/// A PushRouter client typed for cache_control requests/responses.
///
/// Both request and response are untyped JSON. The worker's cache_control
/// endpoint returns {"status": "ok"/"error", ...} but the router treats
/// PIN as fire-and-forget and only logs the response at debug level.
pub type CacheControlClient = PushRouter<serde_json::Value, Annotated<serde_json::Value>>;
/// Create a cache_control client from a component.
///
/// Connects to the "cache_control" endpoint on the given component and returns
/// a PushRouter client for sending cache control operations (pin_prefix,
/// unpin_prefix) to workers.
pub(crate) async fn create_cache_control_client(
component: &Component,
) -> Result<CacheControlClient> {
let client = component.endpoint("cache_control").client().await?;
CacheControlClient::from_client(client, RouterMode::KV).await
}
/// Fire-and-forget pin_prefix to the worker that served this request.
///
/// Spawns a detached task that sends the pin request and logs the outcome.
/// Does nothing if `client` is `None` (logs a warning).
pub fn spawn_pin_prefix(
client: Option<&CacheControlClient>,
token_ids: &[TokenIdType],
instance_id: u64,
context_id: &str,
ttl_seconds: u64,
) {
let Some(cc) = client else {
tracing::warn!(
request_id = %context_id,
"cache_control set but no cache_control_client configured"
);
return;
};
let cc = cc.clone();
let token_ids = token_ids.to_vec();
let context_id = context_id.to_owned();
tokio::spawn(async move {
let pin_request = serde_json::json!({
"action": "pin_prefix",
"token_ids": token_ids,
"ttl_seconds": ttl_seconds,
});
match cc.direct(SingleIn::new(pin_request), instance_id).await {
Ok(mut stream) => {
if let Some(resp) = stream.next().await {
tracing::info!(
request_id = %context_id,
worker_id = instance_id,
?resp,
"pin_prefix response"
);
}
// Drain remaining stream to avoid "Failed to publish
// complete final" errors from the push handler.
while stream.next().await.is_some() {}
}
Err(e) => {
tracing::warn!(
request_id = %context_id,
worker_id = instance_id,
"Failed to pin prefix: {e}"
);
}
}
});
}
...@@ -85,6 +85,12 @@ pub struct KvRouterConfig { ...@@ -85,6 +85,12 @@ pub struct KvRouterConfig {
/// single-threaded RadixTree. Default: 1. /// single-threaded RadixTree. Default: 1.
#[validate(range(min = 1))] #[validate(range(min = 1))]
pub router_event_threads: u32, pub router_event_threads: u32,
/// Enable cache control (PIN with TTL) via the worker's cache_control service mesh endpoint.
/// When true, the router creates a cache_control client and honors nvext.cache_control on
/// requests, firing a pin_prefix call (with TTL) to the worker after generation completes.
/// When false (default), cache_control is ignored and no cache_control client is created.
pub router_enable_cache_control: bool,
} }
impl Default for KvRouterConfig { impl Default for KvRouterConfig {
...@@ -105,6 +111,7 @@ impl Default for KvRouterConfig { ...@@ -105,6 +111,7 @@ impl Default for KvRouterConfig {
router_prune_target_ratio: 0.8, router_prune_target_ratio: 0.8,
router_queue_threshold: None, router_queue_threshold: None,
router_event_threads: 1, router_event_threads: 1,
router_enable_cache_control: false,
} }
} }
} }
......
...@@ -13,11 +13,13 @@ use dynamo_runtime::{ ...@@ -13,11 +13,13 @@ use dynamo_runtime::{
}; };
use futures::stream::{self, StreamExt}; use futures::stream::{self, StreamExt};
use serde_json::json; use serde_json::json;
use tokio::sync::OnceCell;
use tracing::Instrument; use tracing::Instrument;
use crate::{ use crate::{
kv_router::{ kv_router::{
KvRouter, CacheControlClient, KvRouter,
cache_control::{PinState, create_cache_control_client, spawn_pin_prefix},
metrics::RouterRequestMetrics, metrics::RouterRequestMetrics,
protocols::{TokensWithHashes, WorkerWithDpRank}, protocols::{TokensWithHashes, WorkerWithDpRank},
}, },
...@@ -31,6 +33,8 @@ use crate::{ ...@@ -31,6 +33,8 @@ use crate::{
pub struct KvPushRouter { pub struct KvPushRouter {
inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>, inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>,
pub chooser: Arc<KvRouter>, pub chooser: Arc<KvRouter>,
/// Lazily initialized on first PIN request. `None` when cache_control is disabled.
cache_control_cell: Option<OnceCell<CacheControlClient>>,
} }
/// Result of worker selection containing instance ID, dp_rank, and overlap amount. /// Result of worker selection containing instance ID, dp_rank, and overlap amount.
...@@ -61,6 +65,8 @@ struct RequestGuard { ...@@ -61,6 +65,8 @@ struct RequestGuard {
isl_tokens: usize, isl_tokens: usize,
block_size: usize, block_size: usize,
expected_output_tokens: Option<u32>, expected_output_tokens: Option<u32>,
// PIN state: set when cache_control TTL is present and a cc_client exists
pin_state: Option<PinState>,
} }
impl RequestGuard { impl RequestGuard {
...@@ -136,6 +142,16 @@ impl RequestGuard { ...@@ -136,6 +142,16 @@ impl RequestGuard {
tracing::warn!("Failed to free request {}: {e}", self.context_id); tracing::warn!("Failed to free request {}: {e}", self.context_id);
} }
self.freed = true; self.freed = true;
if let Some(ref pin) = self.pin_state {
spawn_pin_prefix(
Some(&pin.cc_client),
&pin.token_ids,
pin.instance_id,
&self.context_id,
pin.ttl_seconds,
);
}
} }
fn record_metrics(&mut self) { fn record_metrics(&mut self) {
...@@ -182,7 +198,18 @@ impl KvPushRouter { ...@@ -182,7 +198,18 @@ impl KvPushRouter {
// scrapeable before any requests arrive. Both the frontend pipeline // scrapeable before any requests arrive. Both the frontend pipeline
// and the standalone router create KvPushRouter, so this covers both. // and the standalone router create KvPushRouter, so this covers both.
RouterRequestMetrics::from_component(chooser.client().endpoint.component()); RouterRequestMetrics::from_component(chooser.client().endpoint.component());
KvPushRouter { inner, chooser }
let cache_control_cell = if chooser.kv_router_config().router_enable_cache_control {
tracing::info!("Cache control enabled for PIN operations (lazy init)");
Some(OnceCell::new())
} else {
None
};
KvPushRouter {
inner,
chooser,
cache_control_cell,
}
} }
/// Select a worker for the request, either using a preselected worker or finding the best match. /// Select a worker for the request, either using a preselected worker or finding the best match.
...@@ -426,6 +453,26 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -426,6 +453,26 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
let track_output_blocks = self.chooser.kv_router_config().router_track_output_blocks; let track_output_blocks = self.chooser.kv_router_config().router_track_output_blocks;
let tracker = request.tracker.clone(); let tracker = request.tracker.clone();
// Extract pin state: lazily init cache_control client on first PIN request
let pin_state: Option<PinState> = async {
let ttl = request.routing.as_ref().and_then(|r| r.cache_control_ttl)?;
let cell = self.cache_control_cell.as_ref()?;
let component = self.chooser.client().endpoint.component().clone();
let client = cell
.get_or_try_init(|| create_cache_control_client(&component))
.await
.inspect_err(|e| tracing::warn!("Failed to create cache_control client: {e}"))
.ok()?
.clone();
Some(PinState {
token_ids: request.token_ids.clone(),
cc_client: client,
instance_id,
ttl_seconds: ttl,
})
}
.await;
let (mut backend_input, context) = request.into_parts(); let (mut backend_input, context) = request.into_parts();
backend_input.routing_mut().dp_rank = Some(dp_rank); backend_input.routing_mut().dp_rank = Some(dp_rank);
let updated_request = context.map(|_| backend_input); let updated_request = context.map(|_| backend_input);
...@@ -467,6 +514,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -467,6 +514,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
isl_tokens, isl_tokens,
block_size, block_size,
expected_output_tokens, expected_output_tokens,
pin_state,
}; };
loop { loop {
......
...@@ -285,6 +285,7 @@ impl OpenAIPreprocessor { ...@@ -285,6 +285,7 @@ impl OpenAIPreprocessor {
priority_jump: hints.and_then(|h| h.latency_sensitivity), priority_jump: hints.and_then(|h| h.latency_sensitivity),
priority: hints.and_then(|h| h.priority), priority: hints.and_then(|h| h.priority),
lora_name, lora_name,
cache_control_ttl: nvext.cache_control.as_ref().map(|cc| cc.ttl_seconds()),
allowed_worker_ids: None, allowed_worker_ids: None,
}; };
builder.routing(Some(routing)); builder.routing(Some(routing));
......
...@@ -56,6 +56,10 @@ pub struct RoutingHints { ...@@ -56,6 +56,10 @@ pub struct RoutingHints {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub priority: Option<i32>, pub priority: Option<i32>,
/// TTL in seconds for cache control pinning. None = no pinning.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cache_control_ttl: Option<u64>,
/// Optional set of allowed worker IDs to restrict routing decisions (EPP). /// Optional set of allowed worker IDs to restrict routing decisions (EPP).
/// When set, only workers in this set are considered during scoring. /// When set, only workers in this set are considered during scoring.
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
......
...@@ -161,6 +161,12 @@ pub struct NvExt { ...@@ -161,6 +161,12 @@ pub struct NvExt {
#[builder(default, setter(strip_option))] #[builder(default, setter(strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_hints: Option<AgentHints>, pub agent_hints: Option<AgentHints>,
/// Cache control hint (Anthropic-style). When present, the router pins
/// the prefix on the selected worker with the given TTL.
#[builder(default, setter(strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cache_control: Option<CacheControl>,
} }
/// Hints from the agent/caller about request characteristics. /// Hints from the agent/caller about request characteristics.
...@@ -195,6 +201,51 @@ pub struct AgentHints { ...@@ -195,6 +201,51 @@ pub struct AgentHints {
pub priority: Option<i32>, pub priority: Option<i32>,
} }
/// Anthropic-style cache control hint for prefix pinning with TTL.
#[derive(ToSchema, Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
pub struct CacheControl {
#[serde(rename = "type")]
pub control_type: CacheControlType,
/// TTL as seconds (integer) or shorthand ("5m" = 300s, "1h" = 3600s). Clamped to [300, 3600].
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ttl: Option<String>,
}
#[derive(ToSchema, Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CacheControlType {
#[default]
Ephemeral,
#[serde(other)]
Unknown,
}
const MIN_TTL_SECONDS: u64 = 300;
const MAX_TTL_SECONDS: u64 = 3600;
impl CacheControl {
/// Parse TTL string to seconds, clamped to [300, 3600].
///
/// Accepts integer seconds ("120", "600") or shorthand ("5m", "1h").
/// Values below 300 are clamped to 300; values above 3600 are clamped to 3600.
/// Unrecognized strings default to 300s.
pub fn ttl_seconds(&self) -> u64 {
let raw = match self.ttl.as_deref() {
None => return MIN_TTL_SECONDS,
Some("5m") => 300,
Some("1h") => 3600,
Some(other) => match other.parse::<u64>() {
Ok(secs) => secs,
Err(_) => {
tracing::warn!("Unrecognized TTL '{}', defaulting to 300s", other);
return MIN_TTL_SECONDS;
}
},
};
raw.clamp(MIN_TTL_SECONDS, MAX_TTL_SECONDS)
}
}
impl Default for NvExt { impl Default for NvExt {
fn default() -> Self { fn default() -> Self {
NvExt::builder().build().unwrap() NvExt::builder().build().unwrap()
...@@ -242,6 +293,74 @@ mod tests { ...@@ -242,6 +293,74 @@ mod tests {
assert_eq!(nv_ext.prefill_worker_id, None); assert_eq!(nv_ext.prefill_worker_id, None);
assert_eq!(nv_ext.decode_worker_id, None); assert_eq!(nv_ext.decode_worker_id, None);
assert_eq!(nv_ext.agent_hints, None); assert_eq!(nv_ext.agent_hints, None);
assert_eq!(nv_ext.cache_control, None);
}
// Test CacheControl serde roundtrip and TTL parsing
#[test]
fn test_cache_control_serde_and_ttl() {
// Default (ephemeral, no TTL)
let cc = CacheControl::default();
assert_eq!(cc.control_type, CacheControlType::Ephemeral);
assert_eq!(cc.ttl, None);
assert_eq!(cc.ttl_seconds(), 300);
// Shorthand values
let cc_5m = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("5m".to_string()),
};
assert_eq!(cc_5m.ttl_seconds(), 300);
let cc_1h = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("1h".to_string()),
};
assert_eq!(cc_1h.ttl_seconds(), 3600);
// Integer seconds -- within range
let cc_600 = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("600".to_string()),
};
assert_eq!(cc_600.ttl_seconds(), 600);
// Integer seconds -- clamped to min (300)
let cc_low = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("10".to_string()),
};
assert_eq!(cc_low.ttl_seconds(), 300);
// Integer seconds -- clamped to max (3600)
let cc_high = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("7200".to_string()),
};
assert_eq!(cc_high.ttl_seconds(), 3600);
// Unrecognized string defaults to 300
let cc_bad = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("forever".to_string()),
};
assert_eq!(cc_bad.ttl_seconds(), 300);
// Serde roundtrip
let json = serde_json::to_string(&cc_5m).unwrap();
let deser: CacheControl = serde_json::from_str(&json).unwrap();
assert_eq!(deser, cc_5m);
// Deserialize from API-style JSON
let api_json = r#"{"type": "ephemeral", "ttl": "1h"}"#;
let from_api: CacheControl = serde_json::from_str(api_json).unwrap();
assert_eq!(from_api.ttl_seconds(), 3600);
// NvExt with cache_control
let nvext_json = r#"{"cache_control": {"type": "ephemeral", "ttl": "5m"}}"#;
let nvext: NvExt = serde_json::from_str(nvext_json).unwrap();
assert!(nvext.cache_control.is_some());
assert_eq!(nvext.cache_control.unwrap().ttl_seconds(), 300);
} }
// Test valid builder configurations // Test valid builder configurations
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment