Unverified Commit c09ac697 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

refactor(protocols): deprecate cache control (#7790)

parent abd0ba5d
...@@ -34,7 +34,6 @@ _KV_ROUTER_FIELDS: tuple[str, ...] = ( ...@@ -34,7 +34,6 @@ _KV_ROUTER_FIELDS: tuple[str, ...] = (
"router_prune_target_ratio", "router_prune_target_ratio",
"router_queue_threshold", "router_queue_threshold",
"router_event_threads", "router_event_threads",
"router_enable_cache_control",
"router_queue_policy", "router_queue_policy",
"remote_indexer_component", "remote_indexer_component",
) )
...@@ -59,7 +58,6 @@ class KvRouterConfigBase(ConfigBase): ...@@ -59,7 +58,6 @@ class KvRouterConfigBase(ConfigBase):
router_prune_target_ratio: float router_prune_target_ratio: float
router_queue_threshold: Optional[float] router_queue_threshold: Optional[float]
router_event_threads: int router_event_threads: int
router_enable_cache_control: bool
router_queue_policy: str router_queue_policy: str
remote_indexer_component: Optional[str] remote_indexer_component: Optional[str]
...@@ -260,18 +258,6 @@ class KvRouterArgGroup(ArgGroup): ...@@ -260,18 +258,6 @@ class KvRouterArgGroup(ArgGroup):
), ),
arg_type=int, arg_type=int,
) )
add_negatable_bool_argument(
g,
flag_name="--enable-cache-control",
env_var="DYN_ENABLE_CACHE_CONTROL",
default=False,
dest="router_enable_cache_control",
help=(
"KV Router: Enable cache control (PIN with TTL). When set, the router creates "
"a cache_control service mesh client and fires pin_prefix after generation for "
"requests with nvext.cache_control."
),
)
add_argument( add_argument(
g, g,
flag_name="--router-queue-policy", flag_name="--router-queue-policy",
......
...@@ -93,8 +93,6 @@ class FrontendConfig(KvRouterConfigBase): ...@@ -93,8 +93,6 @@ class FrontendConfig(KvRouterConfigBase):
) )
if self.min_initial_workers < 0: if self.min_initial_workers < 0:
raise ValueError("--router-min-initial-workers must be >= 0") raise ValueError("--router-min-initial-workers must be >= 0")
if self.router_enable_cache_control and self.router_mode != "kv":
raise ValueError("--enable-cache-control requires --router-mode=kv")
if self.tokenizer_backend not in self._VALID_TOKENIZER_BACKENDS: if self.tokenizer_backend not in self._VALID_TOKENIZER_BACKENDS:
raise ValueError( raise ValueError(
f"--tokenizer: invalid value '{self.tokenizer_backend}' " f"--tokenizer: invalid value '{self.tokenizer_backend}' "
......
...@@ -380,47 +380,6 @@ class BaseWorkerHandler(BaseGenerativeHandler[RequestT, ResponseT]): ...@@ -380,47 +380,6 @@ class BaseWorkerHandler(BaseGenerativeHandler[RequestT, ResponseT]):
"new_version": req.new_version, "new_version": req.new_version,
} }
async def pin_prefix(self, body: dict) -> dict:
"""Pin a prefix by token_ids to resist eviction.
Args:
body: Dict with "token_ids" list of token IDs and optional
"ttl_seconds" (default 300).
"""
token_ids = body.get("token_ids", [])
ttl_seconds = body.get("ttl_seconds", 300)
if not token_ids:
return {"status": "error", "message": "token_ids required"}
try:
result = await self.engine.tokenizer_manager.pin_prefix(
token_ids, ttl_seconds
)
return {
"status": "ok" if result.success else "error",
"nodes_pinned": result.nodes_pinned,
"message": result.message,
}
except Exception as e:
logging.error(f"Failed to pin prefix: {e}")
return {"status": "error", "message": str(e)}
async def cache_control(self, request, context=None):
"""Service mesh endpoint for cache control operations.
Args:
request: Dict with "action" key and action-specific parameters.
context: Optional Dynamo context (unused but required by protocol).
Yields:
Single dict with operation result.
"""
action = request.get("action")
if action == "pin_prefix":
result = await self.pin_prefix(request)
else:
result = {"status": "error", "message": f"Unknown action: {action}"}
yield result
def register_engine_routes(self, runtime: DistributedRuntime) -> None: def register_engine_routes(self, runtime: DistributedRuntime) -> None:
"""Register all engine routes for this handler. """Register all engine routes for this handler.
...@@ -435,7 +394,6 @@ class BaseWorkerHandler(BaseGenerativeHandler[RequestT, ResponseT]): ...@@ -435,7 +394,6 @@ class BaseWorkerHandler(BaseGenerativeHandler[RequestT, ResponseT]):
runtime.register_engine_route( runtime.register_engine_route(
"resume_memory_occupation", self.resume_memory_occupation "resume_memory_occupation", self.resume_memory_occupation
) )
runtime.register_engine_route("pin_prefix", self.pin_prefix)
runtime.register_engine_route( runtime.register_engine_route(
"update_weights_from_disk", self.update_weights_from_disk "update_weights_from_disk", self.update_weights_from_disk
) )
......
...@@ -2,12 +2,12 @@ ...@@ -2,12 +2,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
title: SGLang for Agentic Workloads title: SGLang for Agentic Workloads
subtitle: Priority scheduling, KV cache eviction policies, and cache pinning for multi-turn agentic serving subtitle: Priority scheduling and KV cache eviction policies for multi-turn agentic serving
--- ---
# SGLang for Agentic Workloads # SGLang for Agentic Workloads
This guide covers SGLang-specific configuration for agentic serving with Dynamo. It explains which SGLang engine flags to enable, how Dynamo's [agent hints](../../components/frontend/nvext.md#agent-hints) map to SGLang behavior, and how to use experimental cache pinning to protect KV cache for high-value conversations. This guide covers SGLang-specific configuration for agentic serving with Dynamo. It explains which SGLang engine flags to enable and how Dynamo's [agent hints](../../components/frontend/nvext.md#agent-hints) map to SGLang behavior.
## Overview ## Overview
...@@ -109,192 +109,6 @@ for chunk in response: ...@@ -109,192 +109,6 @@ for chunk in response:
print(chunk.choices[0].delta.content, end="") print(chunk.choices[0].delta.content, end="")
``` ```
## Cache Pinning (Experimental)
> [!WARNING]
> Cache pinning is experimental and available on development branches only. The API may change.
**Required PRs:**
- SGLang: [feat: TTL-based prefix pinning with refresh-on-hit for HiRadixCache](https://github.com/sgl-project/sglang/pull/18941)
- Dynamo: [feat: wire nvext.cache_control TTL-based pinning through Dynamo router](https://github.com/ai-dynamo/dynamo/pull/6213)
Cache pinning lets you explicitly protect KV cache for high-value conversation prefixes. When a request includes `nvext.cache_control`, the router fires a `pin_prefix` call to the SGLang worker after generation completes. Pinned nodes resist eviction for the specified TTL -- even under memory pressure, they are retained (demoted to host memory with HiCache rather than deleted).
### How It Works
```mermaid
sequenceDiagram
participant Client
participant Preprocessor
participant Router
participant Worker as SGLang Worker
participant Cache as Radix Cache
Client->>Preprocessor: chat/completions + nvext.cache_control{ttl}
Preprocessor->>Preprocessor: Extract TTL, attach to RoutingHints
Preprocessor->>Router: PreprocessedRequest (cache_control_ttl=N)
Router->>Router: Select worker, record token_ids + TTL in PinState
Router->>Worker: Generate request
Worker-->>Router: Stream response tokens
Router-->>Client: Stream response tokens
Note over Router,Worker: On stream completion
Router-)Worker: pin_prefix(token_ids, ttl) [fire-and-forget]
Worker->>Cache: Walk radix tree along token sequence
Cache->>Cache: Set pin_expiry, acquire host_ref_counter hold
Worker--)Router: {status: ok, nodes_pinned: N}
Note over Cache: TTL expires
Cache->>Cache: Clear pin_expiry, release host_ref_counter
Note over Cache: Node now eligible for normal eviction
```
1. The client includes `nvext.cache_control` with a TTL in the request.
2. The Dynamo preprocessor extracts the TTL and attaches it to routing hints.
3. The router routes the request normally and records the token IDs in a `PinState`.
4. After the response stream completes, the router spawns a fire-and-forget `pin_prefix` RPC to the worker that served the request.
5. The worker walks the radix tree along the token sequence and pins each node, setting `pin_expiry` and acquiring a `host_ref_counter` hold that prevents eviction.
6. When TTL expires, the pin is cleared and the node becomes eligible for normal eviction.
### Enabling Cache Pinning
**Frontend flag:**
```bash
python -m dynamo.frontend \
--router-mode kv \
--enable-cache-control \
...
```
| Flag | Description |
|------|-------------|
| `--enable-cache-control` | Enables cache control (PIN with TTL). Creates a `cache_control` service mesh client and fires `pin_prefix` after generation for requests with `nvext.cache_control`. Requires `--router-mode=kv`. |
**SGLang worker:** The worker receives PIN requests via its `cache_control` service mesh endpoint. You **must** set the `SGLANG_HICACHE_MAX_PINNED_RATIO` environment variable to a non-zero value -- pinning is disabled by default.
| Environment Variable | Type | Default | Description |
|---------------------|------|---------|-------------|
| `SGLANG_HICACHE_MAX_PINNED_RATIO` | `float` | `0.0` | Max fraction of cache tokens that can be pinned. Must be in `[0, 1)`. `0` disables pinning entirely. |
HiCache is required (`--enable-hierarchical-cache`). Without it, the scheduler rejects PIN requests. For best results, use `write_through` so that pinned nodes demote to host memory instead of being deleted when GPU memory fills:
```bash
SGLANG_HICACHE_MAX_PINNED_RATIO=0.1 python -m dynamo.sglang \
--model-path Qwen/Qwen3-14B-FP8 \
--enable-hierarchical-cache \
--hicache-ratio 2.0 \
--hicache-write-policy write_through \
...
```
### Request Format
Include `cache_control` as a top-level field in `nvext`:
```json
{
"model": "Qwen/Qwen3-14B-FP8",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain quantum computing."}
],
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
}
}
}
```
| Field | Type | Description |
|-------|------|-------------|
| `cache_control.type` | `string` | Currently only `"ephemeral"` is supported. |
| `cache_control.ttl` | `string` | TTL as integer seconds (`"600"`) or shorthand (`"5m"`, `"1h"`). Clamped to [300, 3600] seconds. Unrecognized strings default to 300s. |
### Python Example
```python
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
# First turn -- pin the conversation prefix for 1 hour
response = client.chat.completions.create(
model="Qwen/Qwen3-14B-FP8",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Analyze this codebase and suggest improvements."},
],
stream=True,
extra_body={
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
}
}
}
)
# Collect the assistant reply
assistant_response = ""
for chunk in response:
if chunk.choices[0].delta.content:
assistant_response += chunk.choices[0].delta.content
# Later turns reuse the pinned prefix -- even after heavy load from
# other requests, the KV cache for this conversation is preserved.
response = client.chat.completions.create(
model="Qwen/Qwen3-14B-FP8",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Analyze this codebase and suggest improvements."},
{"role": "assistant", "content": assistant_response},
{"role": "user", "content": "Now focus on the database layer."},
],
stream=True,
extra_body={
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
}
}
}
)
```
### Verifying Cache Hits
The response includes `prompt_tokens_details.cached_tokens` in the `usage` object when `--enable-cache-report` is set on the SGLang worker:
```json
{
"usage": {
"prompt_tokens": 2048,
"completion_tokens": 150,
"prompt_tokens_details": {
"cached_tokens": 1920
}
}
}
```
A high `cached_tokens / prompt_tokens` ratio on subsequent turns confirms that the pinned prefix was preserved.
### Limitations
- **Pinning disabled by default**: `SGLANG_HICACHE_MAX_PINNED_RATIO` defaults to `0.0`. You must set it to a non-zero value (e.g., `0.1`) or all PIN requests will be rejected.
- **HiCache required**: The scheduler rejects PIN requests unless `--enable-hierarchical-cache` is set.
- **TTL clamping**: Values are clamped to [300, 3600] seconds. You cannot pin for less than 5 minutes or more than 1 hour.
- **Pin budget**: Pinned tokens consume a budget controlled by `SGLANG_HICACHE_MAX_PINNED_RATIO` (fraction of host pool capacity). Requests exceeding this budget are rejected.
- **No priority on pinned nodes**: `pin_prefix` does not set a priority on the radix tree nodes. All pinned nodes have equal eviction priority and fall back to LRU ordering among themselves when host memory fills.
- **Requires stack restart for A/B testing**: Pins persist in cache across benchmark runs. When comparing pinned vs. unpinned performance, restart the full stack between phases to avoid false cache hits.
## See Also ## See Also
- **[NVIDIA Request Extensions (nvext)](../../components/frontend/nvext.md)**: Full `nvext` field reference including agent hints - **[NVIDIA Request Extensions (nvext)](../../components/frontend/nvext.md)**: Full `nvext` field reference including agent hints
......
...@@ -45,7 +45,6 @@ The Rust HTTP server also reads these environment variables (not exposed as CLI ...@@ -45,7 +45,6 @@ The Rust HTTP server also reads these environment variables (not exposed as CLI
| `--router-event-threads` | `DYN_ROUTER_EVENT_THREADS` | `4` | Event processing threads. >1 enables concurrent radix tree | | `--router-event-threads` | `DYN_ROUTER_EVENT_THREADS` | `4` | Event processing threads. >1 enables concurrent radix tree |
| `--router-queue-threshold` | `DYN_ROUTER_QUEUE_THRESHOLD` | `4.0` | Queue threshold fraction of prefill capacity. Enables priority scheduling | | `--router-queue-threshold` | `DYN_ROUTER_QUEUE_THRESHOLD` | `4.0` | Queue threshold fraction of prefill capacity. Enables priority scheduling |
| `--router-queue-policy` | `DYN_ROUTER_QUEUE_POLICY` | `fcfs` | Queue scheduling policy: `fcfs` (tail TTFT), `wspt` (avg TTFT), or `lcfs` (comparison-only reverse ordering) | | `--router-queue-policy` | `DYN_ROUTER_QUEUE_POLICY` | `fcfs` | Queue scheduling policy: `fcfs` (tail TTFT), `wspt` (avg TTFT), or `lcfs` (comparison-only reverse ordering) |
| `--enable-cache-control` / `--no-enable-cache-control` | `DYN_ENABLE_CACHE_CONTROL` | `false` | Enable TTL-based cache pinning (requires `--router-mode=kv`) |
| `--decode-fallback` / `--no-decode-fallback` | `DYN_DECODE_FALLBACK` | `false` | Fall back to aggregated mode when prefill workers unavailable | | `--decode-fallback` / `--no-decode-fallback` | `DYN_DECODE_FALLBACK` | `false` | Fall back to aggregated mode when prefill workers unavailable |
## Fault Tolerance ## Fault Tolerance
......
...@@ -39,7 +39,6 @@ Include `nvext` as a top-level field alongside standard OpenAI-compatible fields ...@@ -39,7 +39,6 @@ Include `nvext` as a top-level field alongside standard OpenAI-compatible fields
| `prefill_worker_id` | `u64` | `None` | Router | Routes the request to a specific prefill worker (disaggregated serving). | | `prefill_worker_id` | `u64` | `None` | Router | Routes the request to a specific prefill worker (disaggregated serving). |
| `decode_worker_id` | `u64` | `None` | Router | Routes the request to a specific decode worker (disaggregated serving). | | `decode_worker_id` | `u64` | `None` | Router | Routes the request to a specific decode worker (disaggregated serving). |
| `agent_hints` | object | `None` | Router | Per-request hints for scheduling and load balancing. See [Agent Hints](#agent-hints). | | `agent_hints` | object | `None` | Router | Per-request hints for scheduling and load balancing. See [Agent Hints](#agent-hints). |
| `cache_control` | object | `None` | Router | KV cache pinning hint with TTL. See [Cache Control](#cache-control). |
### Header Overrides ### Header Overrides
...@@ -130,31 +129,6 @@ Backend details: ...@@ -130,31 +129,6 @@ Backend details:
} }
``` ```
## Cache Control
> [!WARNING]
> Cache control is experimental and available on development branches only. The API may change.
The `cache_control` object enables explicit KV cache pinning with a TTL. When set, the router fires a `pin_prefix` call to the backend worker after generation completes, protecting the conversation's KV cache from eviction for the specified duration.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `cache_control.type` | `string` | — | Cache control type. Currently only `"ephemeral"` is supported. |
| `cache_control.ttl` | `string` | `"300"` | TTL as integer seconds (`"600"`) or shorthand (`"5m"`, `"1h"`). Clamped to [300, 3600] seconds. |
```json
{
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
}
}
}
```
Requires `--enable-cache-control` and `--router-mode=kv` on the frontend. See [SGLang for Agentic Workloads](../../backends/sglang/agents.md#cache-pinning-experimental) for full setup and usage details.
## Response Extensions ## Response Extensions
When the client requests response metadata via `extra_fields`, the response includes an `nvext` object with the requested fields: When the client requests response metadata via `extra_fields`, the response includes an `nvext` object with the requested fields:
...@@ -190,4 +164,4 @@ When the client requests response metadata via `extra_fields`, the response incl ...@@ -190,4 +164,4 @@ When the client requests response metadata via `extra_fields`, the response incl
|----------|-------------| |----------|-------------|
| [Frontend Guide](frontend-guide.md) | KServe gRPC configuration and integration | | [Frontend Guide](frontend-guide.md) | KServe gRPC configuration and integration |
| [Router Guide](../router/router-guide.md) | Full router configuration and CLI arguments | | [Router Guide](../router/router-guide.md) | Full router configuration and CLI arguments |
| [SGLang for Agentic Workloads](../../backends/sglang/agents.md) | SGLang engine flags for priority scheduling, eviction policies, and cache pinning | | [SGLang for Agentic Workloads](../../backends/sglang/agents.md) | SGLang engine flags for priority scheduling and eviction policies |
...@@ -20,17 +20,17 @@ Three gaps stand out with current workflows: ...@@ -20,17 +20,17 @@ Three gaps stand out with current workflows:
## Dynamo as an Agentic Runtime ## Dynamo as an Agentic Runtime
Dynamo exposes **agentic hints** and uses them at three layers: frontend API, router, and KV cache management. Together, these enable workload-aware inference instead of generic, state-of-the-moment optimization. Dynamo exposes **agentic hints** and uses them at the frontend API, router, and backend scheduling layers. Together, these enable workload-aware inference instead of generic, state-of-the-moment optimization.
### Agentic Hints ### Agentic Hints
Agentic hints are per-request metadata that the agent client (e.g. Claude Code, Codex, [NeMo Agent Toolkit](https://github.com/NVIDIA/NeMo-Agent-Toolkit)) sends to Dynamo's frontend. They are carried in the request body under [**nvext**](../components/frontend/nvext.md#agent-hints) on chat completions. The frontend parses them and passes them to the KV router and, where applicable, to the KV cache manager and backends. Agentic hints are per-request metadata that the agent client (e.g. Claude Code, Codex, [NeMo Agent Toolkit](https://github.com/NVIDIA/NeMo-Agent-Toolkit)) sends to Dynamo's frontend. They are carried in the request body under [**nvext**](../components/frontend/nvext.md#agent-hints) on chat completions. The frontend parses them and passes them to the KV router and, where applicable, to backends.
- **Flow:** Harness sets hints in the request → Dynamo frontend parses `nvext` into routing hints → KV router uses them for queue ordering and worker selection → backends use them for priority scheduling and cache eviction. - **Flow:** Harness sets hints in the request → Dynamo frontend parses `nvext` into routing hints → KV router uses them for queue ordering and worker selection → backends use them for priority scheduling and cache eviction.
![Agentic workflow: Harness → hints in request → Dynamo frontend → routing hints → KV router (queue order, worker choice) → backend](../assets/img/agentic-hints-workflow.svg) ![Agentic workflow: Harness → hints in request → Dynamo frontend → routing hints → KV router (queue order, worker choice) → backend](../assets/img/agentic-hints-workflow.svg)
The request body includes `nvext.agent_hints` (routing, scheduling) and `nvext.cache_control` (TTL-based pinning); the frontend passes the former to the KV router and the latter to the KV block manager for cache pinning, prefetching, and eviction. The request body includes `nvext.agent_hints` for routing and scheduling metadata; the frontend passes those hints to the KV router for queue ordering and worker selection.
| Hint | Description | | Hint | Description |
|------|-------------| |------|-------------|
...@@ -40,15 +40,11 @@ The request body includes `nvext.agent_hints` (routing, scheduling) and `nvext.c ...@@ -40,15 +40,11 @@ The request body includes `nvext.agent_hints` (routing, scheduling) and `nvext.c
| `program_id` | (Planned) Identifies the agentic program for program-level metrics and cache affinity. | | `program_id` | (Planned) Identifies the agentic program for program-level metrics and cache affinity. |
| `context_type` | (Planned) Semantic type (e.g. system prompt, tool definition, reasoning branch) for context-aware eviction. | | `context_type` | (Planned) Semantic type (e.g. system prompt, tool definition, reasoning branch) for context-aware eviction. |
**`nvext.cache_control`** (sibling of `agent_hints`, not inside it) provides TTL-based KV cache pinning. Pinned prefixes resist eviction for the specified duration. See [SGLang for Agentic Workloads — Cache Pinning](../backends/sglang/agents.md#cache-pinning-experimental).
## Feature matrix ## Feature matrix
| Feature | vLLM | SGLang | TensorRT-LLM | | Feature | vLLM | SGLang | TensorRT-LLM |
|---------|:----:|:------:|:-------------:| |---------|:----:|:------:|:-------------:|
| Priority-based cache eviction | 🚧 | ✅ | 🚧 | | Priority-based cache eviction | 🚧 | ✅ | 🚧 |
| Cache pinning | | ✅ | 🚧 |
| Cache prefetching | | 🚧 | | | Cache prefetching | | 🚧 | |
| Subagent / thinking-aware cache eviction | | 🚧 | | | Subagent / thinking-aware cache eviction | | 🚧 | |
| Speculative prefill | ✅ | ✅ | ✅ | | Speculative prefill | ✅ | ✅ | ✅ |
...@@ -68,11 +64,6 @@ Dynamo is now supported directly in LangChain using the [NVIDIA AI Endpoints int ...@@ -68,11 +64,6 @@ Dynamo is now supported directly in LangChain using the [NVIDIA AI Endpoints int
- **Priority-based KV cache eviction:** Instead of evicting by LRU alone, the backend can evict **low-priority** cache entries first when the GPU (and, with HiCache, host) cache is full. The `priority` value in `nvext.agent_hints` is forwarded to the engine; with SGLang, enable `--enable-priority-scheduling` and `--radix-eviction-policy priority`. - **Priority-based KV cache eviction:** Instead of evicting by LRU alone, the backend can evict **low-priority** cache entries first when the GPU (and, with HiCache, host) cache is full. The `priority` value in `nvext.agent_hints` is forwarded to the engine; with SGLang, enable `--enable-priority-scheduling` and `--radix-eviction-policy priority`.
- **Cache pinning (experimental):** [Anthropic's v1/messages](https://docs.anthropic.com/en/docs/build-with-claude/caching) includes a `cache_control` field that tells servers how long to keep KV cache for specific blocks. Dynamo implements an OSS version with SGLang's HiCache: users can set `cache_control` via the same API as Anthropic or as an `nvext` field on chat completions. When set, the Dynamo router calls a hook in HiCache after the request completes to **pin** the blocks created by those tokens for the user-specified TTL. Pinned nodes resist eviction (demoting to host memory rather than being deleted).
In the Nemo Agentic toolkit and Dynamo integration, TTL is dynamically computed as the product of how many times a block is expected to be reused and the time between those requests; the NAT profiler pre-computes these expectations during agent evaluations and stores them in a data structure per agent, then injects `nvext.cache_control` with the derived TTL (see [dynamo_llm.py](https://github.com/NVIDIA/NeMo-Agent-Toolkit/blob/develop/packages/nvidia_nat_core/src/nat/llm/dynamo_llm.py)).
**Future work:** TTL could be determined dynamically by context type—e.g. think tokens or scratchpad content could use a lower TTL than system prompt or tool definitions, so high-value static context is retained longer while ephemeral context expires sooner.
- **Cache prefetching (future work):** Using the predictable agentic lifecycle (e.g. parent-child subagents, known next turn), Dynamo could proactively prefetch or move KV cache to a different worker so that the next request hits warm cache. - **Cache prefetching (future work):** Using the predictable agentic lifecycle (e.g. parent-child subagents, known next turn), Dynamo could proactively prefetch or move KV cache to a different worker so that the next request hits warm cache.
### Speculative prefill ### Speculative prefill
......
...@@ -58,7 +58,7 @@ impl KvRouterConfig { ...@@ -58,7 +58,7 @@ impl KvRouterConfig {
#[pymethods] #[pymethods]
impl KvRouterConfig { impl KvRouterConfig {
#[new] #[new]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_track_prefill_tokens=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(4.0), router_event_threads=4, router_enable_cache_control=false, router_queue_policy="fcfs", remote_indexer_component=None))] #[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_track_prefill_tokens=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=Some(4.0), router_event_threads=4, router_queue_policy="fcfs", remote_indexer_component=None))]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn new( fn new(
overlap_score_weight: f64, overlap_score_weight: f64,
...@@ -77,7 +77,6 @@ impl KvRouterConfig { ...@@ -77,7 +77,6 @@ impl KvRouterConfig {
router_prune_target_ratio: f64, router_prune_target_ratio: f64,
router_queue_threshold: Option<f64>, router_queue_threshold: Option<f64>,
router_event_threads: u32, router_event_threads: u32,
router_enable_cache_control: bool,
router_queue_policy: &str, router_queue_policy: &str,
remote_indexer_component: Option<String>, remote_indexer_component: Option<String>,
) -> Self { ) -> Self {
...@@ -99,7 +98,6 @@ impl KvRouterConfig { ...@@ -99,7 +98,6 @@ impl KvRouterConfig {
router_prune_target_ratio, router_prune_target_ratio,
router_queue_threshold, router_queue_threshold,
router_event_threads, router_event_threads,
router_enable_cache_control,
skip_initial_worker_wait: false, skip_initial_worker_wait: false,
router_queue_policy: router_queue_policy.parse().unwrap_or_else(|_| { router_queue_policy: router_queue_policy.parse().unwrap_or_else(|_| {
panic!("invalid router_queue_policy: {router_queue_policy:?}") panic!("invalid router_queue_policy: {router_queue_policy:?}")
......
...@@ -1179,7 +1179,6 @@ class KvRouterConfig: ...@@ -1179,7 +1179,6 @@ class KvRouterConfig:
router_prune_target_ratio: float = 0.8, router_prune_target_ratio: float = 0.8,
router_queue_threshold: Optional[float] = 4.0, router_queue_threshold: Optional[float] = 4.0,
router_event_threads: int = 4, router_event_threads: int = 4,
router_enable_cache_control: bool = False,
router_queue_policy: str = "fcfs", router_queue_policy: str = "fcfs",
) -> None: ) -> None:
""" """
...@@ -1211,8 +1210,6 @@ class KvRouterConfig: ...@@ -1211,8 +1210,6 @@ class KvRouterConfig:
Set to None to disable queueing (all requests go directly to the scheduler). Set to None to disable queueing (all requests go directly to the scheduler).
router_event_threads: Number of event processing threads (default: 4). router_event_threads: Number of event processing threads (default: 4).
When > 1, uses a concurrent radix tree with a thread pool. When > 1, uses a concurrent radix tree with a thread pool.
router_enable_cache_control: Enable cache control (PIN with TTL) via the worker's
cache_control service mesh endpoint (default: False).
router_queue_policy: Scheduling policy for the router queue (default: "fcfs"). router_queue_policy: Scheduling policy for the router queue (default: "fcfs").
"fcfs": first-come first-served with priority bumps — optimizes tail TTFT. "fcfs": first-come first-served with priority bumps — optimizes tail TTFT.
"lcfs": last-come first-served with priority bumps — intentionally worsens tail behavior for policy comparisons. "lcfs": last-come first-served with priority bumps — intentionally worsens tail behavior for policy comparisons.
...@@ -2035,4 +2032,3 @@ class StreamIncomplete(DynamoException): ...@@ -2035,4 +2032,3 @@ class StreamIncomplete(DynamoException):
"""The response stream was terminated before completion.""" """The response stream was terminated before completion."""
... ...
...@@ -156,12 +156,6 @@ pub struct KvRouterConfig { ...@@ -156,12 +156,6 @@ pub struct KvRouterConfig {
#[validate(range(min = 1))] #[validate(range(min = 1))]
pub router_event_threads: u32, pub router_event_threads: u32,
/// Enable cache control (PIN with TTL) via the worker's cache_control service mesh endpoint.
/// When true, the router creates a cache_control client and honors nvext.cache_control on
/// requests, firing a pin_prefix call (with TTL) to the worker after generation completes.
/// When false (default), cache_control is ignored and no cache_control client is created.
pub router_enable_cache_control: bool,
pub skip_initial_worker_wait: bool, pub skip_initial_worker_wait: bool,
/// Scheduling policy for the router queue. /// Scheduling policy for the router queue.
...@@ -196,7 +190,6 @@ impl Default for KvRouterConfig { ...@@ -196,7 +190,6 @@ impl Default for KvRouterConfig {
router_prune_target_ratio: 0.8, router_prune_target_ratio: 0.8,
router_queue_threshold: Some(4.0), router_queue_threshold: Some(4.0),
router_event_threads: 4, router_event_threads: 4,
router_enable_cache_control: false,
skip_initial_worker_wait: false, skip_initial_worker_wait: false,
router_queue_policy: RouterQueuePolicy::default(), router_queue_policy: RouterQueuePolicy::default(),
remote_indexer_component: None, remote_indexer_component: None,
......
...@@ -28,7 +28,6 @@ use futures::stream; ...@@ -28,7 +28,6 @@ use futures::stream;
use tracing::Instrument; use tracing::Instrument;
use validator::Validate; use validator::Validate;
pub mod cache_control;
pub mod indexer; pub mod indexer;
mod jetstream; mod jetstream;
pub mod metrics; pub mod metrics;
...@@ -40,7 +39,6 @@ pub mod sequence; ...@@ -40,7 +39,6 @@ pub mod sequence;
pub mod subscriber; pub mod subscriber;
pub mod worker_query; pub mod worker_query;
pub use cache_control::{CacheControlClient, spawn_pin_prefix};
pub use indexer::Indexer; pub use indexer::Indexer;
pub use prefill_router::PrefillRouter; pub use prefill_router::PrefillRouter;
pub use push_router::{DirectRoutingRouter, KvPushRouter}; pub use push_router::{DirectRoutingRouter, KvPushRouter};
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use dynamo_runtime::{
component::Component,
pipeline::{PushRouter, RouterMode, SingleIn},
protocols::annotated::Annotated,
};
use futures::StreamExt;
use crate::protocols::TokenIdType;
/// State captured at routing time for a deferred PIN after generation completes.
pub(crate) struct PinState {
pub token_ids: Vec<TokenIdType>,
pub cc_client: CacheControlClient,
pub instance_id: u64,
pub ttl_seconds: u64,
}
/// A PushRouter client typed for cache_control requests/responses.
///
/// Both request and response are untyped JSON. The worker's cache_control
/// endpoint returns {"status": "ok"/"error", ...} but the router treats
/// PIN as fire-and-forget and only logs the response at debug level.
pub type CacheControlClient = PushRouter<serde_json::Value, Annotated<serde_json::Value>>;
/// Create a cache_control client from a component.
///
/// Connects to the "cache_control" endpoint on the given component and returns
/// a PushRouter client for sending cache control operations (pin_prefix,
/// unpin_prefix) to workers.
pub(crate) async fn create_cache_control_client(
component: &Component,
) -> Result<CacheControlClient> {
let client = component.endpoint("cache_control").client().await?;
CacheControlClient::from_client(client, RouterMode::KV).await
}
/// Fire-and-forget pin_prefix to the worker that served this request.
///
/// Spawns a detached task that sends the pin request and logs the outcome.
/// Does nothing if `client` is `None` (logs a warning).
pub fn spawn_pin_prefix(
client: Option<&CacheControlClient>,
token_ids: &[TokenIdType],
instance_id: u64,
context_id: &str,
ttl_seconds: u64,
) {
let Some(cc) = client else {
tracing::warn!(
request_id = %context_id,
"cache_control set but no cache_control_client configured"
);
return;
};
let cc = cc.clone();
let token_ids = token_ids.to_vec();
let context_id = context_id.to_owned();
tokio::spawn(async move {
let pin_request = serde_json::json!({
"action": "pin_prefix",
"token_ids": token_ids,
"ttl_seconds": ttl_seconds,
});
match cc.direct(SingleIn::new(pin_request), instance_id).await {
Ok(mut stream) => {
if let Some(resp) = stream.next().await {
tracing::info!(
request_id = %context_id,
worker_id = instance_id,
?resp,
"pin_prefix response"
);
}
// Drain remaining stream to avoid "Failed to publish
// complete final" errors from the push handler.
while stream.next().await.is_some() {}
}
Err(e) => {
tracing::warn!(
request_id = %context_id,
worker_id = instance_id,
"Failed to pin prefix: {e}"
);
}
}
});
}
...@@ -15,15 +15,10 @@ use dynamo_runtime::{ ...@@ -15,15 +15,10 @@ use dynamo_runtime::{
}; };
use futures::stream::{self, StreamExt}; use futures::stream::{self, StreamExt};
use serde_json::json; use serde_json::json;
use tokio::sync::OnceCell;
use tracing::Instrument; use tracing::Instrument;
use crate::{ use crate::{
kv_router::{ kv_router::{KvRouter, metrics::RouterRequestMetrics},
CacheControlClient, KvRouter,
cache_control::{PinState, create_cache_control_client, spawn_pin_prefix},
metrics::RouterRequestMetrics,
},
preprocessor::PreprocessedRequest, preprocessor::PreprocessedRequest,
protocols::common::{ protocols::common::{
llm_backend::LLMEngineOutput, llm_backend::LLMEngineOutput,
...@@ -34,8 +29,6 @@ use crate::{ ...@@ -34,8 +29,6 @@ use crate::{
pub struct KvPushRouter { pub struct KvPushRouter {
inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>, inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>,
pub chooser: Arc<KvRouter>, pub chooser: Arc<KvRouter>,
/// Lazily initialized on first PIN request. `None` when cache_control is disabled.
cache_control_cell: Option<OnceCell<CacheControlClient>>,
} }
/// Result of worker selection containing instance ID, dp_rank, and overlap amount. /// Result of worker selection containing instance ID, dp_rank, and overlap amount.
...@@ -66,8 +59,6 @@ struct RequestGuard { ...@@ -66,8 +59,6 @@ struct RequestGuard {
isl_tokens: usize, isl_tokens: usize,
block_size: usize, block_size: usize,
expected_output_tokens: Option<u32>, expected_output_tokens: Option<u32>,
// PIN state: set when cache_control TTL is present and a cc_client exists
pin_state: Option<PinState>,
} }
impl RequestGuard { impl RequestGuard {
...@@ -143,16 +134,6 @@ impl RequestGuard { ...@@ -143,16 +134,6 @@ impl RequestGuard {
tracing::warn!("Failed to free request {}: {e}", self.context_id); tracing::warn!("Failed to free request {}: {e}", self.context_id);
} }
self.freed = true; self.freed = true;
if let Some(ref pin) = self.pin_state {
spawn_pin_prefix(
Some(&pin.cc_client),
&pin.token_ids,
pin.instance_id,
&self.context_id,
pin.ttl_seconds,
);
}
} }
fn record_metrics(&mut self) { fn record_metrics(&mut self) {
...@@ -200,17 +181,7 @@ impl KvPushRouter { ...@@ -200,17 +181,7 @@ impl KvPushRouter {
// and the standalone router create KvPushRouter, so this covers both. // and the standalone router create KvPushRouter, so this covers both.
RouterRequestMetrics::from_component(chooser.client().endpoint.component()); RouterRequestMetrics::from_component(chooser.client().endpoint.component());
let cache_control_cell = if chooser.kv_router_config().router_enable_cache_control { KvPushRouter { inner, chooser }
tracing::info!("Cache control enabled for PIN operations (lazy init)");
Some(OnceCell::new())
} else {
None
};
KvPushRouter {
inner,
chooser,
cache_control_cell,
}
} }
/// Select a worker for the request, either using a preselected worker or finding the best match. /// Select a worker for the request, either using a preselected worker or finding the best match.
...@@ -472,26 +443,6 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -472,26 +443,6 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
let track_output_blocks = self.chooser.kv_router_config().router_track_output_blocks; let track_output_blocks = self.chooser.kv_router_config().router_track_output_blocks;
let tracker = request.tracker.clone(); let tracker = request.tracker.clone();
// Extract pin state: lazily init cache_control client on first PIN request
let pin_state: Option<PinState> = async {
let ttl = request.routing.as_ref().and_then(|r| r.cache_control_ttl)?;
let cell = self.cache_control_cell.as_ref()?;
let component = self.chooser.client().endpoint.component().clone();
let client = cell
.get_or_try_init(|| create_cache_control_client(&component))
.await
.inspect_err(|e| tracing::warn!("Failed to create cache_control client: {e}"))
.ok()?
.clone();
Some(PinState {
token_ids: request.token_ids.clone(),
cc_client: client,
instance_id,
ttl_seconds: ttl,
})
}
.await;
let (mut backend_input, context) = request.into_parts(); let (mut backend_input, context) = request.into_parts();
backend_input.routing_mut().dp_rank = Some(dp_rank); backend_input.routing_mut().dp_rank = Some(dp_rank);
let updated_request = context.map(|_| backend_input); let updated_request = context.map(|_| backend_input);
...@@ -533,7 +484,6 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -533,7 +484,6 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
isl_tokens, isl_tokens,
block_size, block_size,
expected_output_tokens, expected_output_tokens,
pin_state,
}; };
loop { loop {
......
...@@ -312,9 +312,6 @@ impl OpenAIPreprocessor { ...@@ -312,9 +312,6 @@ impl OpenAIPreprocessor {
builder.mdc_sum(Some(self.mdcsum.clone())); builder.mdc_sum(Some(self.mdcsum.clone()));
let lora_name = self.lora_name.clone(); let lora_name = self.lora_name.clone();
// Extract cache_control TTL from either nvext or top-level field
let cache_control_ttl = request.effective_cache_control().map(|cc| cc.ttl_seconds());
// Extract routing hints from nvext if present // Extract routing hints from nvext if present
if let Some(nvext) = request.nvext() { if let Some(nvext) = request.nvext() {
// Build routing hints from nvext fields // Build routing hints from nvext fields
...@@ -333,16 +330,13 @@ impl OpenAIPreprocessor { ...@@ -333,16 +330,13 @@ impl OpenAIPreprocessor {
}), }),
priority: hints.and_then(|h| h.priority), priority: hints.and_then(|h| h.priority),
lora_name, lora_name,
cache_control_ttl: nvext.cache_control.as_ref().map(|cc| cc.ttl_seconds()),
allowed_worker_ids: None, allowed_worker_ids: None,
}; };
builder.routing(Some(routing)); builder.routing(Some(routing));
} else if lora_name.is_some() || cache_control_ttl.is_some() { } else if lora_name.is_some() {
// Ensure routing hints exist when we have LoRA or cache_control, // Ensure routing hints exist when we have LoRA.
// even when nvext is absent (e.g. Anthropic endpoint requests).
builder.routing(Some(RoutingHints { builder.routing(Some(RoutingHints {
lora_name, lora_name,
cache_control_ttl,
..Default::default() ..Default::default()
})); }));
} }
......
...@@ -28,7 +28,7 @@ use crate::protocols::openai::chat_completions::{ ...@@ -28,7 +28,7 @@ use crate::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionResponse, NvCreateChatCompletionRequest, NvCreateChatCompletionResponse,
}; };
use crate::protocols::openai::common_ext::CommonExt; use crate::protocols::openai::common_ext::CommonExt;
use crate::protocols::openai::nvext::NvExt;
impl TryFrom<AnthropicCreateMessageRequest> for NvCreateChatCompletionRequest { impl TryFrom<AnthropicCreateMessageRequest> for NvCreateChatCompletionRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
...@@ -119,44 +119,7 @@ impl TryFrom<AnthropicCreateMessageRequest> for NvCreateChatCompletionRequest { ...@@ -119,44 +119,7 @@ impl TryFrom<AnthropicCreateMessageRequest> for NvCreateChatCompletionRequest {
top_k: req.top_k.map(|k| k as i32), top_k: req.top_k.map(|k| k as i32),
..Default::default() ..Default::default()
}, },
nvext: { nvext: None,
// Lossy: collapse all per-block cache_control into a single
// last-one-wins value. Sufficient for backends with a single
// prefix cache boundary. Full per-block breakpoints are
// preserved in AnthropicContext::cache_breakpoints via UnifiedRequest.
let mut last_block_cc: Option<CacheControl> = None;
for msg in &req.messages {
if let AnthropicMessageContent::Blocks { content } = &msg.content {
for block in content {
let block_cc = match block {
AnthropicContentBlock::Text { cache_control, .. } => {
cache_control.as_ref()
}
AnthropicContentBlock::ToolUse { cache_control, .. } => {
cache_control.as_ref()
}
AnthropicContentBlock::ToolResult { cache_control, .. } => {
cache_control.as_ref()
}
AnthropicContentBlock::Thinking { cache_control, .. } => {
cache_control.as_ref()
}
_ => None,
};
if let Some(cc) = block_cc {
last_block_cc = Some(cc.clone());
}
}
}
}
// Merge: top-level > per-block > system block cache_control
let system_cc = req.system.as_ref().and_then(|s| s.cache_control.clone());
let effective_cc = req.cache_control.clone().or(last_block_cc).or(system_cc);
effective_cc.map(|cc| NvExt {
cache_control: Some(cc),
..Default::default()
})
},
// chat_template_args may be augmented by the Anthropic handler // chat_template_args may be augmented by the Anthropic handler
// (anthropic.rs) after conversion — e.g., setting enable_thinking=true // (anthropic.rs) after conversion — e.g., setting enable_thinking=true
// when a reasoning parser is configured. The conversion layer only // when a reasoning parser is configured. The conversion layer only
...@@ -1419,7 +1382,7 @@ mod tests { ...@@ -1419,7 +1382,7 @@ mod tests {
#[test] #[test]
fn test_cache_control_passthrough() { fn test_cache_control_passthrough() {
use crate::protocols::openai::nvext::{CacheControl, CacheControlType}; use dynamo_protocols::types::anthropic::{CacheControl, CacheControlType};
let req = AnthropicCreateMessageRequest { let req = AnthropicCreateMessageRequest {
model: "test-model".into(), model: "test-model".into(),
...@@ -1450,18 +1413,11 @@ mod tests { ...@@ -1450,18 +1413,11 @@ mod tests {
}; };
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
let nvext = chat_req.nvext.expect("nvext should be set"); assert!(chat_req.nvext.is_none());
let cc = nvext
.cache_control
.expect("nvext.cache_control should be set");
assert_eq!(cc.control_type, CacheControlType::Ephemeral);
assert_eq!(cc.ttl_seconds(), 300);
} }
#[test] #[test]
fn test_cache_control_1h_ttl_passthrough() { fn test_cache_control_1h_ttl_passthrough() {
use crate::protocols::openai::nvext::CacheControlType;
let json = r#"{ let json = r#"{
"model": "test", "model": "test",
"max_tokens": 100, "max_tokens": 100,
...@@ -1472,12 +1428,7 @@ mod tests { ...@@ -1472,12 +1428,7 @@ mod tests {
assert!(req.cache_control.is_some()); assert!(req.cache_control.is_some());
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
let nvext = chat_req.nvext.expect("nvext should be set"); assert!(chat_req.nvext.is_none());
let cc = nvext
.cache_control
.expect("nvext.cache_control should be set");
assert_eq!(cc.control_type, CacheControlType::Ephemeral);
assert_eq!(cc.ttl_seconds(), 3600);
} }
#[test] #[test]
...@@ -1546,8 +1497,6 @@ mod tests { ...@@ -1546,8 +1497,6 @@ mod tests {
#[test] #[test]
fn test_per_block_cache_control_last_wins() { fn test_per_block_cache_control_last_wins() {
use crate::protocols::openai::nvext::CacheControlType;
let json = r#"{ let json = r#"{
"model": "test", "model": "test",
"max_tokens": 100, "max_tokens": 100,
...@@ -1563,10 +1512,7 @@ mod tests { ...@@ -1563,10 +1512,7 @@ mod tests {
}"#; }"#;
let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap(); let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap();
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
let nvext = chat_req.nvext.expect("nvext should be set"); assert!(chat_req.nvext.is_none());
let cc = nvext.cache_control.expect("cache_control should be set");
assert_eq!(cc.control_type, CacheControlType::Ephemeral);
assert_eq!(cc.ttl_seconds(), 3600); // Last block's 1h TTL wins
} }
#[test] #[test]
...@@ -1586,16 +1532,11 @@ mod tests { ...@@ -1586,16 +1532,11 @@ mod tests {
}"#; }"#;
let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap(); let req: AnthropicCreateMessageRequest = serde_json::from_str(json).unwrap();
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
let nvext = chat_req.nvext.expect("nvext should be set"); assert!(chat_req.nvext.is_none());
let cc = nvext.cache_control.expect("cache_control should be set");
// Top-level (no TTL = 300s default) takes precedence over per-block (1h)
assert_eq!(cc.ttl_seconds(), 300);
} }
#[test] #[test]
fn test_system_block_array_with_cache_control() { fn test_system_block_array_with_cache_control() {
use crate::protocols::openai::nvext::CacheControlType;
let json = r#"{ let json = r#"{
"model": "test", "model": "test",
"max_tokens": 100, "max_tokens": 100,
...@@ -1612,11 +1553,7 @@ mod tests { ...@@ -1612,11 +1553,7 @@ mod tests {
assert!(system.cache_control.is_some()); assert!(system.cache_control.is_some());
let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap(); let chat_req: NvCreateChatCompletionRequest = req.try_into().unwrap();
let nvext = chat_req assert!(chat_req.nvext.is_none());
.nvext
.expect("nvext should be set from system cache_control");
let cc = nvext.cache_control.expect("cache_control should be set");
assert_eq!(cc.control_type, CacheControlType::Ephemeral);
} }
#[test] #[test]
......
...@@ -58,10 +58,6 @@ pub struct RoutingHints { ...@@ -58,10 +58,6 @@ pub struct RoutingHints {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub priority: Option<i32>, pub priority: Option<i32>,
/// TTL in seconds for cache control pinning. None = no pinning.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cache_control_ttl: Option<u64>,
/// Worker IDs provided externally and not discovered by the router. /// Worker IDs provided externally and not discovered by the router.
/// When set, only workers in this set are considered during scoring. /// When set, only workers in this set are considered during scoring.
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
......
...@@ -95,10 +95,6 @@ impl NvExtProvider for NvCreateChatCompletionRequest { ...@@ -95,10 +95,6 @@ impl NvExtProvider for NvCreateChatCompletionRequest {
fn raw_prompt(&self) -> Option<String> { fn raw_prompt(&self) -> Option<String> {
None None
} }
fn effective_cache_control(&self) -> Option<&crate::protocols::openai::nvext::CacheControl> {
NvExtProvider::nvext(self).and_then(|ext| ext.cache_control.as_ref())
}
} }
/// Implements `AnnotationsProvider` for `NvCreateChatCompletionRequest`, /// Implements `AnnotationsProvider` for `NvCreateChatCompletionRequest`,
......
...@@ -49,13 +49,6 @@ pub fn apply_header_routing_overrides(nvext: Option<NvExt>, headers: &HeaderMap) ...@@ -49,13 +49,6 @@ pub fn apply_header_routing_overrides(nvext: Option<NvExt>, headers: &HeaderMap)
pub trait NvExtProvider { pub trait NvExtProvider {
fn nvext(&self) -> Option<&NvExt>; fn nvext(&self) -> Option<&NvExt>;
fn raw_prompt(&self) -> Option<String>; fn raw_prompt(&self) -> Option<String>;
/// Return the effective cache control for this request.
/// Default: delegates to `nvext.cache_control`. Implementations may override
/// to also check a top-level `cache_control` field (see `NvCreateChatCompletionRequest`).
fn effective_cache_control(&self) -> Option<&CacheControl> {
self.nvext().and_then(|ext| ext.cache_control.as_ref())
}
} }
/// Worker ID information for disaggregated serving /// Worker ID information for disaggregated serving
...@@ -169,12 +162,6 @@ pub struct NvExt { ...@@ -169,12 +162,6 @@ pub struct NvExt {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_hints: Option<AgentHints>, pub agent_hints: Option<AgentHints>,
/// Cache control hint (Anthropic-style). When present, the router pins
/// the prefix on the selected worker with the given TTL.
#[builder(default, setter(strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cache_control: Option<CacheControl>,
/// Optional request timestamp in milliseconds for trace replay / virtual-time simulation. /// Optional request timestamp in milliseconds for trace replay / virtual-time simulation.
#[builder(default, setter(strip_option))] #[builder(default, setter(strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
...@@ -214,10 +201,6 @@ pub struct AgentHints { ...@@ -214,10 +201,6 @@ pub struct AgentHints {
pub latency_sensitivity: Option<f64>, pub latency_sensitivity: Option<f64>,
} }
// Re-export CacheControl types from dynamo-async-openai where they are canonically defined
// alongside the Anthropic protocol types they originate from.
pub use dynamo_protocols::types::anthropic::{CacheControl, CacheControlType};
impl Default for NvExt { impl Default for NvExt {
fn default() -> Self { fn default() -> Self {
NvExt::builder().build().unwrap() NvExt::builder().build().unwrap()
...@@ -265,74 +248,7 @@ mod tests { ...@@ -265,74 +248,7 @@ mod tests {
assert_eq!(nv_ext.prefill_worker_id, None); assert_eq!(nv_ext.prefill_worker_id, None);
assert_eq!(nv_ext.decode_worker_id, None); assert_eq!(nv_ext.decode_worker_id, None);
assert_eq!(nv_ext.agent_hints, None); assert_eq!(nv_ext.agent_hints, None);
assert_eq!(nv_ext.cache_control, None); assert_eq!(nv_ext.request_timestamp_ms, None);
}
// Test CacheControl serde roundtrip and TTL parsing
#[test]
fn test_cache_control_serde_and_ttl() {
// Default (ephemeral, no TTL)
let cc = CacheControl::default();
assert_eq!(cc.control_type, CacheControlType::Ephemeral);
assert_eq!(cc.ttl, None);
assert_eq!(cc.ttl_seconds(), 300);
// Shorthand values
let cc_5m = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("5m".to_string()),
};
assert_eq!(cc_5m.ttl_seconds(), 300);
let cc_1h = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("1h".to_string()),
};
assert_eq!(cc_1h.ttl_seconds(), 3600);
// Integer seconds -- within range
let cc_600 = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("600".to_string()),
};
assert_eq!(cc_600.ttl_seconds(), 600);
// Integer seconds -- clamped to min (300)
let cc_low = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("10".to_string()),
};
assert_eq!(cc_low.ttl_seconds(), 300);
// Integer seconds -- clamped to max (3600)
let cc_high = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("7200".to_string()),
};
assert_eq!(cc_high.ttl_seconds(), 3600);
// Unrecognized string defaults to 300
let cc_bad = CacheControl {
control_type: CacheControlType::Ephemeral,
ttl: Some("forever".to_string()),
};
assert_eq!(cc_bad.ttl_seconds(), 300);
// Serde roundtrip
let json = serde_json::to_string(&cc_5m).unwrap();
let deser: CacheControl = serde_json::from_str(&json).unwrap();
assert_eq!(deser, cc_5m);
// Deserialize from API-style JSON
let api_json = r#"{"type": "ephemeral", "ttl": "1h"}"#;
let from_api: CacheControl = serde_json::from_str(api_json).unwrap();
assert_eq!(from_api.ttl_seconds(), 3600);
// NvExt with cache_control
let nvext_json = r#"{"cache_control": {"type": "ephemeral", "ttl": "5m"}}"#;
let nvext: NvExt = serde_json::from_str(nvext_json).unwrap();
assert!(nvext.cache_control.is_some());
assert_eq!(nvext.cache_control.unwrap().ttl_seconds(), 300);
} }
// Test valid builder configurations // Test valid builder configurations
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use dynamo_protocols::types::anthropic::CacheControl;
use dynamo_runtime::protocols::annotated::AnnotationsProvider; use dynamo_runtime::protocols::annotated::AnnotationsProvider;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
...@@ -41,7 +42,7 @@ use crate::preprocessor::prompt::{OAIChatLikeRequest, TextInput}; ...@@ -41,7 +42,7 @@ use crate::preprocessor::prompt::{OAIChatLikeRequest, TextInput};
use crate::protocols::openai::chat_completions::NvCreateChatCompletionRequest; use crate::protocols::openai::chat_completions::NvCreateChatCompletionRequest;
use crate::protocols::openai::common_ext::{CommonExt, CommonExtProvider}; use crate::protocols::openai::common_ext::{CommonExt, CommonExtProvider};
use crate::protocols::openai::nvext::{CacheControl, NvExt, NvExtProvider}; use crate::protocols::openai::nvext::{NvExt, NvExtProvider};
use crate::protocols::openai::{ use crate::protocols::openai::{
OpenAIOutputOptionsProvider, OpenAISamplingOptionsProvider, OpenAIStopConditionsProvider, OpenAIOutputOptionsProvider, OpenAISamplingOptionsProvider, OpenAIStopConditionsProvider,
}; };
...@@ -77,12 +78,8 @@ pub struct AnthropicContext { ...@@ -77,12 +78,8 @@ pub struct AnthropicContext {
pub thinking: Option<ThinkingConfig>, pub thinking: Option<ThinkingConfig>,
/// Per-block cache control breakpoints with their position in the /// Per-block cache control breakpoints with their position in the
/// message array. The existing Anthropic→Chat Completions conversion /// message array. These remain available in the API sidecar even when
/// collapses all per-block `cache_control` annotations into a single /// the request conversion does not forward cache control into `nvext`.
/// last-one-wins `nvext.cache_control` field. This preserves the full
/// per-block granularity for future use (e.g., multi-breakpoint prefix
/// caching, or faithfully reporting per-breakpoint `cache_creation_input_tokens`
/// / `cache_read_input_tokens` in the response).
#[serde(default, skip_serializing_if = "Vec::is_empty")] #[serde(default, skip_serializing_if = "Vec::is_empty")]
pub cache_breakpoints: Vec<CacheBreakpoint>, pub cache_breakpoints: Vec<CacheBreakpoint>,
...@@ -287,15 +284,6 @@ impl NvExtProvider for UnifiedRequest { ...@@ -287,15 +284,6 @@ impl NvExtProvider for UnifiedRequest {
fn raw_prompt(&self) -> Option<String> { fn raw_prompt(&self) -> Option<String> {
None None
} }
/// Returns the single collapsed cache control from `nvext`. This is the
/// last-one-wins value produced by the Anthropic→Chat Completions conversion
/// and is sufficient for backends that support a single prefix cache boundary
/// (SGLang, vLLM). For per-block granularity, consult
/// `AnthropicContext::cache_breakpoints` via the `ApiContext` sidecar.
fn effective_cache_control(&self) -> Option<&CacheControl> {
NvExtProvider::nvext(self).and_then(|ext| ext.cache_control.as_ref())
}
} }
impl AnnotationsProvider for UnifiedRequest { impl AnnotationsProvider for UnifiedRequest {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment