"...git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "bfaa5593050ec9bf60e2361b3b9dc575efeee83f"
Unverified Commit 9498f016 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat(sglang): add ephemeral KV session routing (#7665)


Signed-off-by: Ishan Dhanani <ishandhanani@gmail.com>
parent 6bfc6d1f
......@@ -94,6 +94,22 @@ def create_parsers(
return tool_call_parser, reasoning_parser
def _normalize_prompt_token_ids(prompt_token_ids: Any) -> list[int]:
if isinstance(prompt_token_ids, list):
return prompt_token_ids
input_ids = getattr(prompt_token_ids, "input_ids", None)
if input_ids is not None and not isinstance(input_ids, str):
return list(input_ids)
if isinstance(prompt_token_ids, dict):
dict_input_ids = prompt_token_ids.get("input_ids")
if dict_input_ids is not None and not isinstance(dict_input_ids, str):
return list(dict_input_ids)
return list(prompt_token_ids)
def preprocess_chat_request(
request: dict[str, Any],
*,
......@@ -124,9 +140,9 @@ def preprocess_chat_request(
):
template_kwargs["tools"] = [t.model_dump() for t in sglang_tools]
prompt_token_ids = tokenizer.apply_chat_template(messages, **template_kwargs)
if not isinstance(prompt_token_ids, list):
prompt_token_ids = list(prompt_token_ids)
prompt_token_ids = _normalize_prompt_token_ids(
tokenizer.apply_chat_template(messages, **template_kwargs)
)
tool_call_parser, reasoning_parser = create_parsers(
request,
......
......@@ -40,6 +40,17 @@ from .utils import PreprocessError, extract_mm_urls, random_uuid, worker_warmup
logger = logging.getLogger(__name__)
def _runtime_config_parser_name(
mdc: ModelDeploymentCard,
key: str,
) -> str | None:
runtime_config = mdc.runtime_config()
if not isinstance(runtime_config, dict):
return None
value = runtime_config.get(key)
return value if isinstance(value, str) and value else None
def _unsupported_n_error(n: int) -> dict[str, Any]:
return {
"error": {
......@@ -553,8 +564,14 @@ class SglangEngineFactory:
eos_token_id = getattr(tokenizer, "eos_token_id", None)
tool_call_parser_name = self.tool_call_parser_name
reasoning_parser_name = self.reasoning_parser_name
tool_call_parser_name = (
self.tool_call_parser_name
or _runtime_config_parser_name(mdc, "tool_call_parser")
)
reasoning_parser_name = (
self.reasoning_parser_name
or _runtime_config_parser_name(mdc, "reasoning_parser")
)
if tool_call_parser_name:
logger.info("SGLang tool call parser: %s", tool_call_parser_name)
......
......@@ -17,6 +17,7 @@ import dynamo.frontend.sglang_processor as sglang_processor_module
from dynamo.frontend.sglang_prepost import (
SglangPreprocessResult,
SglangStreamingPostProcessor,
_normalize_prompt_token_ids,
convert_tools,
create_parsers,
preprocess_chat_request,
......@@ -26,6 +27,7 @@ from dynamo.frontend.sglang_processor import (
_build_dynamo_preproc,
_init_worker,
_map_finish_reason,
_runtime_config_parser_name,
)
from dynamo.frontend.utils import PreprocessError, random_call_id, random_uuid
......@@ -436,6 +438,46 @@ class TestCreateParsers:
assert rp is not None
class TestNormalizePromptTokenIds:
    """Checks ``_normalize_prompt_token_ids`` on non-list tokenizer output."""

    def test_batch_encoding_like_object_uses_input_ids(self):
        # Iterating the stand-in yields key names, so a plain ``list()`` call
        # would return strings; the helper has to read ``input_ids`` instead.
        class KeyIteratingEncoding:
            input_ids = [11, 22, 33]

            def __iter__(self):
                return iter(("input_ids", "attention_mask"))

        normalized = _normalize_prompt_token_ids(KeyIteratingEncoding())
        assert normalized == [11, 22, 33]

    def test_mapping_uses_input_ids(self):
        payload = {"input_ids": [1, 2, 3], "attention_mask": [1, 1, 1]}
        assert _normalize_prompt_token_ids(payload) == [1, 2, 3]
class TestRuntimeConfigParserName:
    """Checks ``_runtime_config_parser_name`` against stub deployment cards."""

    @staticmethod
    def _stub_mdc(payload):
        # Minimal stand-in exposing only the ``runtime_config()`` accessor
        # that the helper under test calls.
        class StubMdc:
            def runtime_config(self):
                return payload

        return StubMdc()

    def test_missing_runtime_config_returns_none(self):
        mdc = self._stub_mdc(None)
        assert _runtime_config_parser_name(mdc, "tool_call_parser") is None

    def test_missing_key_returns_none(self):
        mdc = self._stub_mdc({"reasoning_parser": "qwen3"})
        assert _runtime_config_parser_name(mdc, "tool_call_parser") is None

    def test_reads_non_empty_string_value(self):
        mdc = self._stub_mdc({"tool_call_parser": "hermes"})
        assert _runtime_config_parser_name(mdc, "tool_call_parser") == "hermes"
# ---------------------------------------------------------------------------
# preprocess_chat_request
# ---------------------------------------------------------------------------
......
......@@ -283,6 +283,14 @@ text-to-video-diffusion.sh # 1-2 GPUs - Text-to-video (Wan2.1)
Always slice with an offset, don't assume per-chunk logprobs.
- **Zombie GPU processes**: `sgl_diffusion::scheduler` spawns a child process that
survives parent kill. Always check `nvidia-smi` after teardown.
- **Session control graceful degradation**: Session control is request-driven --
the router's `AgentController` and `StickySessionRouter` are always created but
activate lazily. If no worker has `--enable-streaming-session`, the router warns
once and ignores `session_control` in requests. On the handler side,
`_session_kwargs()` checks `enable_streaming_session` before injecting
`session_params` into SGLang calls. Both layers must agree: the router skips
lifecycle RPCs, and the handler skips session params. Without both guards,
SGLang errors with "session id does not exist".
For troubleshooting (CuDNN, config.json errors, OOM, disagg connectivity), see
`docs/backends/sglang/sglang-examples.md#troubleshooting`.
......
......@@ -117,8 +117,15 @@ async def init_decode(
"The chat template will be loaded but the /v1/chat/completions endpoint will not be available."
)
# Only serve session_control when streaming sessions are enabled.
if getattr(server_args, "enable_streaming_session", False):
session_control_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.session_control"
)
shutdown_endpoints.append(session_control_endpoint)
try:
await asyncio.gather(
gather_tasks = [
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
......@@ -133,7 +140,12 @@ async def init_decode(
output_type=parse_endpoint_types(dynamo_args.endpoint_types),
readiness_gate=ready_event,
),
)
]
if getattr(server_args, "enable_streaming_session", False):
gather_tasks.append(
session_control_endpoint.serve_endpoint(handler.session_control)
)
await asyncio.gather(*gather_tasks)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
......
......@@ -108,7 +108,6 @@ def _get_bootstrap_info_for_config(
f"Using auto-detected local IP: {local_ip} "
f"({'IPv6' if local_addr.is_ipv6 else 'IPv4'})"
)
return bootstrap_host, bootstrap_port
except Exception as e:
logging.warning(f"Failed to get bootstrap info: {e}")
......
......@@ -478,6 +478,78 @@ class BaseWorkerHandler(RLMixin, BaseGenerativeHandler[RequestT, ResponseT]):
"new_version": req.new_version,
}
async def open_session(self, body: dict) -> dict:
    """Open a streaming session for subagent KV isolation.

    Args:
        body: Dict with "session_id", optional "timeout" (default 120),
            and optional "capacity_of_str_len" (default 65536).

    Returns:
        ``{"status": "ok", "session_id": ...}`` on success (plus a
        "message" entry when the engine call returns ``None``), or
        ``{"status": "error", "message": ...}`` when "session_id" is
        missing or the engine call raises.
    """
    # Resolved at call time rather than at module import time.
    from sglang.srt.managers.io_struct import OpenSessionReqInput

    session_id = body.get("session_id")
    if not session_id:
        return {"status": "error", "message": "session_id required"}

    idle_timeout = body.get("timeout", 120)
    slot_capacity = body.get("capacity_of_str_len", 65536)

    try:
        open_request = OpenSessionReqInput(
            capacity_of_str_len=slot_capacity,
            session_id=session_id,
            streaming=True,
            timeout=float(idle_timeout),
        )
        opened_id = await self.engine.tokenizer_manager.open_session(
            open_request, None
        )
    except Exception as e:
        # Failures are reported to the caller as a payload, not raised.
        logging.error(f"Failed to open session {session_id}: {e}")
        return {"status": "error", "message": str(e)}

    if opened_id is None:
        # No ID came back from the engine; surfaced as a pre-existing session.
        return {
            "status": "ok",
            "session_id": session_id,
            "message": "Session already exists",
        }
    return {"status": "ok", "session_id": opened_id}
async def close_session(self, body: dict) -> dict:
    """Close a streaming session and release its KV resources.

    Args:
        body: Dict with "session_id".

    Returns:
        ``{"status": "ok", "session_id": ...}`` once the engine call
        completes, or ``{"status": "error", "message": ...}`` when
        "session_id" is missing or the engine call raises.
    """
    # Resolved at call time rather than at module import time.
    from sglang.srt.managers.io_struct import CloseSessionReqInput

    session_id = body.get("session_id")
    if not session_id:
        return {"status": "error", "message": "session_id required"}

    try:
        close_request = CloseSessionReqInput(session_id=session_id)
        await self.engine.tokenizer_manager.close_session(close_request, None)
    except Exception as e:
        # Failures are reported to the caller as a payload, not raised.
        logging.error(f"Failed to close session {session_id}: {e}")
        return {"status": "error", "message": str(e)}

    return {"status": "ok", "session_id": session_id}
async def session_control(self, request, context=None):
    """Service mesh endpoint for session lifecycle operations.

    Dispatches on ``request["action"]`` to ``open_session`` or
    ``close_session``, passing the whole request dict through.

    Args:
        request: Dict with "action" key ("open_session" or "close_session")
            and action-specific parameters.
        context: Optional Dynamo context (unused but required by protocol).

    Yields:
        Single dict with operation result. An unrecognised action yields
        an error payload instead of raising.
    """
    action = request.get("action")

    if action == "open_session":
        yield await self.open_session(request)
    elif action == "close_session":
        yield await self.close_session(request)
    else:
        yield {"status": "error", "message": f"Unknown action: {action}"}
def register_engine_routes(self, runtime: DistributedRuntime) -> None:
"""Register all engine routes for this handler.
......@@ -511,6 +583,9 @@ class BaseWorkerHandler(RLMixin, BaseGenerativeHandler[RequestT, ResponseT]):
self.config.dynamo_args, "enable_rl", False
):
self.register_rl_engine_routes(runtime)
# session_control is served as a discoverable service endpoint
# (not an engine route) so the router can find it via
# component.endpoint("session_control"). See init_llm.py.
@abstractmethod
def generate(self, request: RequestT, context: Context) -> AsyncIterator[ResponseT]:
......@@ -539,6 +614,18 @@ class BaseWorkerHandler(RLMixin, BaseGenerativeHandler[RequestT, ResponseT]):
"prompt" if isinstance(request_input, str) else "input_ids": request_input
}
def _session_kwargs(self, request: Dict[str, Any]) -> Dict[str, Any]:
if not getattr(self.config.server_args, "enable_streaming_session", False):
return {}
routing = request.get("routing") or {}
session_control = routing.get("session_control") or {}
session_id = session_control.get("session_id")
if not session_id:
return {}
# Streaming sessions only need the session identifier on each turn.
return {"session_params": {"id": session_id}}
@staticmethod
def _get_guided_decoding_params(
guided_decoding: Optional[Dict[str, Any]],
......
......@@ -305,6 +305,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
external_trace_header=trace_header,
rid=trace_id,
data_parallel_rank=dp_rank,
**self._session_kwargs(request),
**logprob_kwargs,
**self._priority_kwargs(priority),
)
......@@ -338,6 +339,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
external_trace_header=trace_header,
rid=trace_id,
data_parallel_rank=dp_rank,
**self._session_kwargs(request),
**logprob_kwargs,
**self._priority_kwargs(priority),
)
......
......@@ -157,6 +157,7 @@ class PrefillWorkerHandler(BaseWorkerHandler):
external_trace_header=trace_header,
rid=trace_id,
data_parallel_rank=dp_rank,
**self._session_kwargs(inner_request),
**self._priority_kwargs(priority),
)
......
......@@ -2,12 +2,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
title: SGLang for Agentic Workloads
subtitle: Priority scheduling and KV cache eviction policies for multi-turn agentic serving
subtitle: Priority scheduling and session control for multi-turn agentic serving
---
# SGLang for Agentic Workloads
This guide covers SGLang-specific configuration for agentic serving with Dynamo. It explains which SGLang engine flags to enable and how Dynamo's [agent hints](../../components/frontend/nvext.md#agent-hints) map to SGLang behavior.
This guide covers SGLang-specific configuration for agentic serving with Dynamo. It explains which SGLang engine flags to enable, how Dynamo's [agent hints](../../components/frontend/nvext.md#agent-hints) map to SGLang behavior, and how to use session control to manage KV cache for multi-turn agent conversations.
## Overview
......@@ -65,7 +65,7 @@ When both `--radix-eviction-policy priority` and `--enable-hierarchical-cache` a
| Event | Behavior |
|-------|----------|
| **GPU full** | Low-priority nodes are evicted (demoted to host) first. With `write_through`, all nodes survive on host -- priority only affects demotion order. |
| **Host full** | Low-priority nodes are deleted from host first. High-priority nodes survive longer. Pinned nodes are skipped entirely. |
| **Host full** | Low-priority nodes are deleted from host first. High-priority nodes with active retention survive longer. |
The practical impact depends on your write policy. With `write_through`, GPU eviction is just a demotion -- the real deletion happens at host eviction, which is where priority ordering matters most.
......@@ -75,7 +75,7 @@ Dynamo's `nvext.agent_hints` fields are consumed by the router and forwarded to
| Agent Hint | Router Behavior | SGLang Engine Behavior |
|------------|----------------|----------------------|
| `priority` | Raises router queue priority when `--router-queue-threshold` is set. | Queue ordering when `--enable-priority-scheduling` is set. Also affects radix cache eviction order when `--radix-eviction-policy priority` is set. |
| `priority` | Router queue ordering when `--router-queue-threshold` is set. | Request scheduling when `--enable-priority-scheduling` is set. Radix cache eviction order when `--radix-eviction-policy priority` is set. |
| `osl` | Output block tracking for routing decisions (requires `--router-track-output-blocks`) | No direct engine effect. |
| `speculative_prefill` | After response completes, sends a `max_tokens=1` prefill to warm the KV cache for the predicted next turn. | SGLang processes the prefill request normally, populating the radix cache. |
......@@ -89,8 +89,8 @@ client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
response = client.chat.completions.create(
model="Qwen/Qwen3-14B-FP8",
messages=[
{"role": "system", "content": "You are a coding assistant."},
{"role": "user", "content": "Write a Python function to parse CSV files."},
{"role": "system", "content": "You are a tennis historian who believes Roger Federer is the GOAT. Respond with maximum reverence."},
{"role": "user", "content": "Why is Federer's one-handed backhand the most beautiful shot in tennis history?"},
],
stream=True,
extra_body={
......@@ -109,6 +109,228 @@ for chunk in response:
print(chunk.choices[0].delta.content, end="")
```
## Session Control for Subagent KV Isolation (Experimental)
> [!WARNING]
> Session control is experimental. The API may change.
Agentic orchestrators often spawn short-lived subagents (research, code execution, planning) that accumulate KV cache, use it for a few turns, then die. Under normal radix cache behavior, this ephemeral KV pollutes the tree and competes with the lead agent's long-lived prefix for eviction.
Session control solves this by holding subagent KV in dedicated **streaming session slots** outside the radix tree. Session KV is invisible to eviction, has no L2 backup overhead, and is freed deterministically on close or timeout.
### How It Works
```mermaid
sequenceDiagram
participant Orchestrator
participant Router as Dynamo Router
participant Worker as SGLang Worker
participant Cache as SessionAwareCache
Note over Orchestrator: Spawn subagent
Orchestrator->>Router: session_control{session_id: "sub-1", action: open}
Router->>Router: Select best worker via KV overlap scoring
Router->>Worker: open_session("sub-1") [synchronous]
Worker->>Cache: Create SessionSlot for "sub-1"
Router->>Router: Bind affinity: sub-1 -> worker_42
Router->>Worker: Generate (turn 1)
Worker->>Cache: Turn 1: radix tree match (reuses lead agent prefix)
Worker-->>Router: Response
Router-->>Orchestrator: Response
Orchestrator->>Router: session_control{session_id: "sub-1"}
Router->>Router: Resolve affinity: sub-1 -> worker_42
Router->>Worker: Generate (turn 2, pinned to worker_42)
Worker->>Cache: Turn 2: O(1) restore from SessionSlot
Worker-->>Router: Response
Router-->>Orchestrator: Response
Note over Orchestrator: Subagent done
Orchestrator->>Router: session_control{session_id: "sub-1", action: close}
Router->>Router: Remove affinity for sub-1
Router->>Worker: Generate (final turn)
Worker-->>Router: Response
Router-->>Orchestrator: Response
Note over Router,Worker: On stream completion
Router-)Worker: close_session("sub-1") [fire-and-forget]
Worker->>Cache: release_session -> free KV immediately
```
Key behaviors:
- **Turn 1** goes through the normal radix tree, so the subagent shares the lead agent's cached system prompt prefix.
- **Turns 2+** skip the radix tree entirely. KV is restored from the `SessionSlot` in O(1).
- **Session KV is invisible to eviction**. It cannot be evicted -- only freed by explicit close or inactivity timeout.
- **Deterministic cleanup**: On close, session KV is freed immediately.
- **Router-side affinity**: The `StickySessionRouter` maintains a `session_id -> worker_id` mapping with sliding-window TTL. Clients only need to send `session_id`.
### Enabling Session Control
Session control is request-driven. The router's `AgentController` (session lifecycle RPCs) and `StickySessionRouter` (session affinity) activate automatically when a request carries `nvext.session_control` -- no additional frontend flags are needed beyond `--router-mode kv`. On the worker side, streaming sessions must be explicitly enabled.
> [!NOTE]
> Session control is currently supported only on the SGLang backend. vLLM and TensorRT-LLM do not yet expose the streaming session API.
> [!IMPORTANT]
> Streaming sessions require SGLang changes from [sgl-project/sglang#21875](https://github.com/sgl-project/sglang/pull/21875) (session-aware cache, race condition fixes, session metrics). This is merged to SGLang main but not yet in a release. Until a version after `0.5.10.post1` is published, build SGLang from source (`pip install -e "python"` from the SGLang repo).
**SGLang worker:**
```bash
python -m dynamo.sglang \
--model-path <model> \
--enable-streaming-session \
...
```
| Flag | Description |
|------|-------------|
| `--enable-streaming-session` | Wraps the radix cache with `SessionAwareCache`, enabling streaming session slots for subagent KV isolation. |
**Router:**
```bash
python -m dynamo.frontend \
--router-mode kv \
...
```
### Request Format
#### Opening a session
Include `session_control` with `action: "open"` on the first request:
```json
{
"model": "Qwen/Qwen3-14B-FP8",
"messages": [{"role": "user", "content": "Research every Federer Grand Slam final in exhaustive detail."}],
"nvext": {
"session_control": {
"session_id": "sub-1",
"action": "open",
"timeout": 60
}
}
}
```
| Field | Type | Description |
|-------|------|-------------|
| `session_control.session_id` | `string` | Unique session identifier. Present on every turn. |
| `session_control.action` | `string` | `"open"` or `"close"`. Omit on intermediate turns. |
| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 300). Only used with `action: "open"`. |
#### Subsequent turns
Include `session_control` with just `session_id` (no action). The router resolves affinity automatically:
```json
{
"model": "Qwen/Qwen3-14B-FP8",
"messages": [{"role": "user", "content": "Now compare his Wimbledon 2007 final vs Nadal to any shot in human history."}],
"nvext": {
"session_control": {
"session_id": "sub-1"
}
}
}
```
#### Closing a session
Include `action: "close"`. The close RPC fires after generation completes:
```json
{
"model": "Qwen/Qwen3-14B-FP8",
"messages": [{"role": "user", "content": "Write a 500-word love letter to Federer's single-handed backhand."}],
"nvext": {
"session_control": {
"session_id": "sub-1",
"action": "close"
}
}
}
```
### Limitations
- **Streaming sessions only**: Sessions are opened with `streaming=True`, which means only sequential append operations are supported. Branching (`replace`), token-level rewind (`offset`), and `drop_previous_output` are not supported.
- **Timeout is idle-based**: The timeout refreshes on every request. If a subagent pauses for a long tool call that exceeds the timeout, the session is reaped and KV is freed. The subagent must re-open the session and re-prefill.
- **Session metrics**: Active session count (`sglang:num_streaming_sessions`) and held KV tokens (`sglang:streaming_session_held_tokens`) are exported as Prometheus gauges on the worker's metrics endpoint.
## Quickstart
### Launch Script
The `agg_agent.sh` script launches a single aggregated worker with session control, sticky routing, and KV events:
```bash
# Default model (GLM-4.7-Flash, 2 GPUs)
bash examples/backends/sglang/launch/agg_agent.sh
```
The frontend listens on port 8000 (override with `DYN_HTTP_PORT`). Worker metrics are on port 8081.
### Testing with OpenCode
[OpenCode](https://github.com/opencode-ai/opencode) is an open-source AI coding agent with built-in support for subagents, tool calling, and OpenAI-compatible endpoints. The [Dynamo provider fork](https://github.com/ishandhanani/opencode/tree/idhanani/dynamo-provider) injects `nvext.session_control` on subagent requests, giving each spawned agent its own Dynamo streaming session with sticky routing and KV isolation.
```bash
# Terminal 1 -- launch Dynamo with session control + tool/reasoning parsers
bash examples/backends/sglang/launch/agg_agent.sh \
--model-path zai-org/GLM-4.7-Flash --tp 2
# Terminal 2 -- run OpenCode against Dynamo
DYNAMO_API_KEY=dummy bun run --cwd packages/opencode src/index.ts \
-- --model "dynamo/zai-org/GLM-4.7-Flash"
```
When OpenCode spawns a subagent (via the `task` tool), the provider automatically:
1. Sends `session_control.action = "open"` on the subagent's first turn
2. Routes subsequent turns to the same worker via `session_id`
3. Sends `session_control.action = "close"` when the subagent completes, freeing KV
The primary agent runs without session control -- only subagent sessions are pinned. This keeps lead-agent requests load-balanced while subagent multi-turn conversations stay on a single worker with warm KV cache.
#### Configuration
Model and endpoint are configured in `.opencode/opencode.jsonc`:
```jsonc
{
"provider": {
"dynamo": {
"npm": "@ai-sdk/openai-compatible",
"name": "Dynamo",
"env": ["DYNAMO_API_KEY"],
"models": {
"zai-org/GLM-4.7-Flash": {
"id": "zai-org/GLM-4.7-Flash",
"name": "GLM 4.7 Flash",
"tool_call": true,
"reasoning": true,
"temperature": true,
"attachment": false,
"release_date": "2025-06-01",
"limit": { "context": 131072, "output": 8192 },
"cost": { "input": 0, "output": 0 },
"interleaved": { "field": "reasoning_content" }
}
},
"options": {
"baseURL": "http://localhost:8000/v1"
}
}
}
}
```
## See Also
- **[NVIDIA Request Extensions (nvext)](../../components/frontend/nvext.md)**: Full `nvext` field reference including agent hints
......
......@@ -39,6 +39,7 @@ Include `nvext` as a top-level field alongside standard OpenAI-compatible fields
| `prefill_worker_id` | `u64` | `None` | Router | Routes the request to a specific prefill worker (disaggregated serving). |
| `decode_worker_id` | `u64` | `None` | Router | Routes the request to a specific decode worker (disaggregated serving). |
| `agent_hints` | object | `None` | Router | Per-request hints for scheduling and load balancing. See [Agent Hints](#agent-hints). |
| `session_control` | object | `None` | Router | Session lifecycle and sticky routing for subagent KV isolation. See [Session Control](#session-control). |
### Header Overrides
......@@ -129,6 +130,31 @@ Backend details:
}
```
## Session Control
`session_control` enables subagent KV isolation with sticky routing. The router uses `session_id` to keep a session on the same worker and can issue `open` / `close` lifecycle RPCs around streaming sessions.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `session_control.session_id` | `string` | — | Unique session identifier. Present on every turn. |
| `session_control.action` | `string` | omitted | Optional lifecycle action: `"open"` or `"close"`. |
| `session_control.timeout` | `integer` | `300` | Inactivity timeout in seconds. Only used with `action: "open"`. |
```json
{
"nvext": {
"session_control": {
"session_id": "subagent-1",
"action": "open",
"timeout": 300
}
}
}
```
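Requires `--router-mode kv` on the frontend. Session control activates automatically when requests carry `nvext.session_control`. It is currently supported only on the SGLang backend, and workers must be started with `--enable-streaming-session`. See [SGLang for Agentic Workloads](../../backends/sglang/agents.md) for backend setup details.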
## Response Extensions
When the client requests response metadata via `extra_fields`, the response includes an `nvext` object with the requested fields:
......@@ -164,4 +190,4 @@ When the client requests response metadata via `extra_fields`, the response incl
|----------|-------------|
| [Frontend Guide](frontend-guide.md) | KServe gRPC configuration and integration |
| [Configuration and Tuning](../router/router-configuration.md) | Full router configuration and CLI arguments |
| [SGLang for Agentic Workloads](../../backends/sglang/agents.md) | SGLang engine flags for priority scheduling and eviction policies |
| [SGLang for Agentic Workloads](../../backends/sglang/agents.md) | SGLang engine flags for priority scheduling, eviction policies, and session control |
......@@ -47,6 +47,15 @@ For `--router-mode device-aware-weighted`, set `DYN_ENCODER_CUDA_TO_CPU_RATIO` t
To implement KV event publishing for custom inference engines, see [KV Event Publishing for Custom Engines](../../integrations/kv-events-custom-engines.md).
For details on per-request agent hints (`priority`, `osl`, `speculative_prefill`), see [NVIDIA Request Extensions (`nvext`)](../frontend/nvext.md#agent-hints).
### Session Control and Sticky Routing
When a request carries `nvext.session_control`, the KV router activates two additional components:
- **AgentController**: Sends session lifecycle RPCs (`open_session`, `close_session`) to the worker's `session_control` endpoint. The event-plane client is lazily initialized on the first session request.
- **StickySessionRouter**: Maintains an in-memory `session_id -> worker_id` affinity map with sliding-window TTL. Subsequent requests with the same `session_id` are routed to the pinned worker, bypassing KV overlap scoring.
These activate automatically with `--router-mode kv` -- no additional flags are needed. Requests without `session_control` are unaffected and follow the standard KV-aware routing path. Session control currently requires the SGLang backend with `--enable-streaming-session`. See [SGLang for Agentic Workloads -- Session Control](../../backends/sglang/agents.md#session-control-for-subagent-kv-isolation-experimental) for details.
## Tuning Guidelines
`--router-kv-overlap-score-weight` is the primary knob for balancing prefill efficiency against decode load. Prefill-heavy workloads benefit from a higher weight, which steers requests toward workers with better cache overlap and reduces TTFT. Decode-heavy workloads benefit from a lower weight, which distributes decode load more evenly and reduces ITL. The default of 1.0 is a reasonable starting point. This weight can also be overridden per request via `nvext.agent_hints.kv_overlap_score_weight`.
......
......@@ -30,7 +30,7 @@ Agentic hints are per-request metadata that the agent client (e.g. Claude Code,
![Agentic workflow: Harness → hints in request → Dynamo frontend → routing hints → KV router (queue order, worker choice) → backend](../assets/img/agentic-hints-workflow.svg)
The request body includes `nvext.agent_hints` for routing and scheduling metadata; the frontend passes those hints to the KV router for queue ordering and worker selection.
The request body includes `nvext.agent_hints` for routing and scheduling metadata that the frontend passes through to the KV router and backend runtime.
| Hint | Description |
|------|-------------|
......@@ -45,6 +45,7 @@ The request body includes `nvext.agent_hints` for routing and scheduling metadat
| Feature | vLLM | SGLang | TensorRT-LLM |
|---------|:----:|:------:|:-------------:|
| Priority-based cache eviction | 🚧 | ✅ | 🚧 |
| Subagent KV isolation (session control) | | 🚧 | |
| Cache prefetching | | 🚧 | |
| Subagent / thinking-aware cache eviction | | 🚧 | |
| Speculative prefill | ✅ | ✅ | ✅ |
......@@ -64,6 +65,8 @@ Dynamo is now supported directly in LangChain using the [NVIDIA AI Endpoints int
- **Priority-based KV cache eviction:** Instead of evicting by LRU alone, the backend can evict **low-priority** cache entries first when the GPU (and, with HiCache, host) cache is full. The `priority` value in `nvext.agent_hints` is forwarded to the engine; with SGLang, enable `--enable-priority-scheduling` and `--radix-eviction-policy priority`.
- **Subagent KV isolation (experimental):** Session control holds subagent KV in dedicated streaming session slots outside the radix tree. Session KV is invisible to eviction and freed deterministically on close or timeout. The router manages sticky session affinity so subsequent turns always hit the same worker. See [SGLang for Agentic Workloads -- Session Control](../backends/sglang/agents.md#session-control-for-subagent-kv-isolation-experimental).
- **Cache prefetching (future work):** Using the predictable agentic lifecycle (e.g. parent-child subagents, known next turn), Dynamo could proactively prefetch or move KV cache to a different worker so that the next request hits warm cache.
### Speculative prefill
......
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Aggregated serving with session control: sticky routing,
# KV event tracking, and reasoning/tool-call parsing.
# GPUs: 2 (default model uses --tp 2)
#
# Usage: agg_agent.sh [--model-path <name>] [--tp <n>] [extra worker flags...]
# Starts the Dynamo frontend and one SGLang worker as background jobs, then
# blocks in wait_any_exit.
# Abort on the first failing command.
set -e
# On any exit, signal the whole process group (kill 0) so the backgrounded
# frontend and worker do not outlive this script.
trap 'echo Cleaning up...; kill 0' EXIT
# Resolve this script's real directory (following symlinks) so the shared
# helper scripts can be sourced regardless of the caller's working directory.
SCRIPT_DIR="$(dirname "$(readlink -f "$0")")"
source "$SCRIPT_DIR/../../../common/gpu_utils.sh" # build_gpu_mem_args
source "$SCRIPT_DIR/../../../common/launch_utils.sh" # print_launch_banner, wait_any_exit
# Default values
MODEL="zai-org/GLM-4.7-Flash"
TP=2
# Parse command line arguments
# Anything not recognised below is collected in EXTRA_ARGS.
EXTRA_ARGS=()
while [[ $# -gt 0 ]]; do
case $1 in
# NOTE(review): a value-taking flag given last without its value makes
# `shift 2` fail, which aborts the script under `set -e` -- confirm intended.
--model-path)
MODEL="$2"
shift 2
;;
--tp)
TP="$2"
shift 2
;;
-h|--help)
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " --model-path <name> Specify model (default: $MODEL)"
echo " --tp <n> Tensor parallelism (default: $TP)"
echo " -h, --help Show this help message"
echo ""
echo "Additional SGLang/Dynamo flags can be passed and will be forwarded"
exit 0
;;
# Unrecognised arguments are appended to the worker command line below
# (they are not passed to the frontend).
*)
EXTRA_ARGS+=("$1")
shift
;;
esac
done
# NOTE(review): build_sglang_gpu_mem_args is presumably provided by gpu_utils.sh,
# although the comment on that source line names build_gpu_mem_args -- confirm.
GPU_MEM_FRACTION=$(build_sglang_gpu_mem_args)
# Port shown in the launch banner; DYN_HTTP_PORT overrides the default of 8000.
HTTP_PORT="${DYN_HTTP_PORT:-8000}"
print_launch_banner "Launching Aggregated + Session Control" "$MODEL" "$HTTP_PORT"
# Frontend with KV routing and state reset
# Session control activates automatically when requests carry nvext.session_control
python3 -m dynamo.frontend \
--router-mode kv \
--router-reset-states &
# Worker with streaming sessions, KV events, and metrics
# DYN_SYSTEM_PORT defaults to 8081 for this worker process only.
# --mem-fraction-static is passed only when GPU_MEM_FRACTION is non-empty.
# EXTRA_ARGS come last on the worker command line.
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \
python3 -m dynamo.sglang \
--model-path "$MODEL" \
--served-model-name "$MODEL" \
--page-size 16 \
--tp "$TP" \
--trust-remote-code \
--enable-streaming-session \
--skip-tokenizer-init \
--dyn-reasoning-parser glm45 \
--dyn-tool-call-parser glm47 \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}' \
--enable-metrics \
${GPU_MEM_FRACTION:+--mem-fraction-static "$GPU_MEM_FRACTION"} \
"${EXTRA_ARGS[@]}" &
# Helper from launch_utils.sh; presumably returns once either background
# process exits, at which point the EXIT trap tears down the other -- confirm.
wait_any_exit
......@@ -31,6 +31,13 @@ use futures::stream;
use tracing::Instrument;
use validator::Validate;
// Re-export from dynamo-kv-router crate
pub use dynamo_kv_router::approx;
pub use dynamo_kv_router::protocols;
pub use dynamo_kv_router::scheduling;
pub use dynamo_kv_router::selector;
pub mod agent_controller;
pub mod indexer;
pub mod metrics;
pub mod prefill_router;
......@@ -38,10 +45,13 @@ pub mod publisher;
pub mod push_router;
pub mod scheduler;
pub mod sequence;
pub mod sticky_sessions;
pub use agent_controller::AgentController;
pub use indexer::{Indexer, ServedIndexerHandle, ServedIndexerMode, ensure_served_indexer_service};
pub use prefill_router::PrefillRouter;
pub use push_router::{DirectRoutingRouter, KvPushRouter};
pub use sticky_sessions::StickySessionRouter;
use crate::{
discovery::RuntimeConfigWatch,
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Session lifecycle controller for subagent KV isolation.
//!
//! Manages open/close RPCs to workers via the event plane. Session affinity
//! (routing the same session to the same worker) is handled separately by
//! [`super::sticky_sessions::StickySessionRouter`].
//!
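//! The controller:
//! - Lazily initializes a session_control event plane client
//! - Fires `open_session` inline: the request fails if the RPC fails or the
//!   worker reports a non-ok status, but proceeds without session isolation
//!   when no session_control endpoint is available
//! - Captures a deferred `SessionCloseAction` for execution after generation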
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Result, anyhow};
use dynamo_runtime::{
component::Component,
pipeline::{PushRouter, RouterMode, SingleIn},
protocols::annotated::Annotated,
};
use futures::StreamExt;
use tokio::sync::OnceCell;
use crate::protocols::openai::nvext::SessionAction;
use super::sticky_sessions::StickySessionRouter;
/// Untyped event plane client for session_control endpoint.
///
/// Requests and responses are raw `serde_json::Value`s; responses arrive
/// wrapped in [`Annotated`].
pub type EventPlaneClient = PushRouter<serde_json::Value, Annotated<serde_json::Value>>;
/// Default capacity for session KV slots (characters).
///
/// Sent to the worker as the `capacity_of_str_len` field of every
/// `open_session` request.
const DEFAULT_SESSION_CAPACITY: u64 = 65_536;
/// Extra worker-side timeout so router affinity expiry closes sessions first.
///
/// Added (saturating) to the client-requested timeout before it is sent to
/// the worker; the router-side affinity binding uses the unpadded timeout.
const SESSION_TIMEOUT_FALLBACK_BUFFER_SECS: u64 = 30;
/// Deferred session close, executed after generation completes.
///
/// Carries everything needed to send a `close_session` request later, without
/// needing access to the controller that created it.
pub struct SessionCloseAction {
/// Identifier of the session to close.
pub session_id: String,
/// Client used to send the `close_session` request.
pub client: EventPlaneClient,
/// Worker instance the request is sent to directly.
pub instance_id: u64,
}
impl SessionCloseAction {
    /// Send `close_session` to the recorded worker from a spawned tokio task.
    ///
    /// The call returns immediately; the response (or failure) is only
    /// logged by `send_session_request`. `context_id` is used purely as the
    /// request id in those log lines.
    pub fn execute(&self, context_id: &str) {
        // Own everything the task needs so it can outlive `self`.
        let request_id = context_id.to_owned();
        let target_session = self.session_id.clone();
        let worker = self.instance_id;
        let rpc_client = self.client.clone();

        tokio::spawn(async move {
            let payload = serde_json::json!({
                "action": "close_session",
                "session_id": target_session,
            });
            send_session_request(
                &rpc_client,
                payload,
                worker,
                &target_session,
                &request_id,
                "close_session",
            )
            .await;
        });
    }
}
/// Session lifecycle controller.
///
/// Owns a lazy event plane client for the `session_control` endpoint
/// and coordinates with [`StickySessionRouter`] for affinity management.
pub struct AgentController {
/// `None` means we checked and no worker exposes session_control.
/// The cell is initialized at most once, so a `None` outcome is cached
/// for the lifetime of the controller.
session_control: OnceCell<Option<EventPlaneClient>>,
/// Component whose `session_control` endpoint the client is built from.
component: Component,
}
impl AgentController {
pub fn new(component: Component) -> Self {
tracing::debug!("AgentController initialized");
AgentController {
session_control: OnceCell::new(),
component,
}
}
/// Close a worker session whose router-side affinity has expired.
///
/// Spawns a tokio task and returns immediately. The task does nothing when
/// no session_control client is available; otherwise it sends
/// `close_session` to `instance_id`, tagged with the fixed request id
/// `"session-affinity-reaper"` for logging.
pub fn close_expired_session(self: Arc<Self>, session_id: String, instance_id: u64) {
    let controller = self;
    tokio::spawn(async move {
        let client = match controller.get_session_control_client().await {
            Some(client) => client,
            None => return,
        };

        tracing::info!(
            worker_id = instance_id,
            session_id = %session_id,
            "Session affinity expired, closing worker session"
        );

        let payload = serde_json::json!({
            "action": "close_session",
            "session_id": session_id,
        });
        send_session_request(
            &client,
            payload,
            instance_id,
            &session_id,
            "session-affinity-reaper",
            "close_session",
        )
        .await;
    });
}
/// Apply the request's session-control action once a worker has been chosen.
///
/// * No `session_control`, or a `session_control` without an action:
///   nothing to do, returns `Ok(None)`.
/// * `Open`: sends `open_session` to `instance_id` and waits for the reply.
///   A failed RPC or a non-ok reply is returned as an error. On success the
///   session is bound to the worker in `sticky` (if provided) and
///   `Ok(None)` is returned.
/// * `Close`: removes the binding from `sticky` (if provided) right away and
///   returns a [`SessionCloseAction`] for the caller to run after
///   generation completes.
///
/// Whenever the session_control client is unavailable the result is
/// `Ok(None)` and the request proceeds without session isolation.
pub async fn on_routed(
    &self,
    request: &crate::preprocessor::PreprocessedRequest,
    instance_id: u64,
    context_id: &str,
    sticky: Option<&StickySessionRouter>,
) -> Result<Option<SessionCloseAction>> {
    let Some(sc) = request
        .routing
        .as_ref()
        .and_then(|hints| hints.session_control.as_ref())
    else {
        return Ok(None);
    };

    match sc.action.as_ref() {
        // Only a session_id for sticky routing; StickySessionRouter handles it.
        None => Ok(None),
        Some(SessionAction::Open) => {
            let Some(client) = self.get_session_control_client().await else {
                // No session_control endpoint: skip lifecycle handling.
                return Ok(None);
            };

            // The worker gets a padded timeout so the router-side expiry
            // fires first.
            let worker_timeout_secs = sc
                .timeout
                .saturating_add(SESSION_TIMEOUT_FALLBACK_BUFFER_SECS);
            let open_request = serde_json::json!({
                "action": "open_session",
                "session_id": sc.session_id,
                "timeout": worker_timeout_secs,
                "capacity_of_str_len": DEFAULT_SESSION_CAPACITY,
            });

            // Awaited inline: the session must exist on the worker before
            // the first generate request reaches it.
            let reply = send_session_request(
                &client,
                open_request,
                instance_id,
                &sc.session_id,
                context_id,
                "open_session",
            )
            .await
            .ok_or_else(|| anyhow!("open_session RPC failed for session {}", sc.session_id))?;
            ensure_session_open_succeeded(&reply, &sc.session_id)?;

            // Bind only after the worker confirmed the session, so retries
            // are never pinned to a worker that did not open it.
            if let Some(router) = sticky {
                router.bind(&sc.session_id, instance_id, Duration::from_secs(sc.timeout));
            }
            Ok(None)
        }
        Some(SessionAction::Close) => {
            // Affinity goes away immediately; the worker-side close is deferred.
            if let Some(router) = sticky {
                router.unbind(&sc.session_id);
            }
            let deferred = self
                .get_session_control_client()
                .await
                .map(|client| SessionCloseAction {
                    session_id: sc.session_id.clone(),
                    client,
                    instance_id,
                });
            Ok(deferred)
        }
    }
}
/// Return a clone of the cached session_control client, creating it on
/// first use.
///
/// Initialization runs through the `OnceCell`, so its outcome -- including
/// `None` -- is stored and reused by later calls. `None` is produced when
/// the endpoint client cannot be created, when no instance shows up within
/// five seconds, or when the event plane client cannot be built; each case
/// logs a warning.
async fn get_session_control_client(&self) -> Option<EventPlaneClient> {
    let cached = self
        .session_control
        .get_or_init(|| async {
            let created = self.component.endpoint("session_control").client().await;
            let endpoint_client = match created {
                Ok(endpoint_client) => endpoint_client,
                Err(e) => {
                    tracing::warn!(
                        "Failed to create session_control client: {e}. \
                         Session control will be ignored for all requests."
                    );
                    return None;
                }
            };

            // Give workers a short window to register the endpoint; if none
            // do, the backend was not launched with --enable-streaming-session.
            let registered = tokio::time::timeout(
                Duration::from_secs(5),
                endpoint_client.wait_for_instances(),
            )
            .await;
            if !matches!(registered, Ok(Ok(_))) {
                tracing::warn!(
                    "No session_control endpoint registered. \
                     Session control will be ignored. \
                     To enable, launch the backend with --enable-streaming-session."
                );
                return None;
            }

            let built =
                EventPlaneClient::from_client_no_fault_detection(endpoint_client, RouterMode::KV)
                    .await;
            match built {
                Err(e) => {
                    tracing::warn!(
                        "Failed to create session_control event plane client: {e}. \
                         Session control will be ignored."
                    );
                    None
                }
                Ok(client) => Some(client),
            }
        })
        .await;
    cached.clone()
}
}
/// Check that an `open_session` reply reports success.
///
/// Returns `Ok(())` only when the reply is not an annotated error, has a
/// body, and that body's `"status"` field is the string `"ok"`. Every other
/// shape becomes an error naming `session_id`; for a non-ok status the
/// body's `"message"` string is included (or `"unknown error"` when absent).
fn ensure_session_open_succeeded(
    response: &Annotated<serde_json::Value>,
    session_id: &str,
) -> Result<()> {
    if response.is_error() {
        return Err(anyhow!(
            "open_session returned annotated error for session {session_id}"
        ));
    }

    let Some(body) = response.data.as_ref() else {
        return Err(anyhow!(
            "open_session returned no response body for session {session_id}"
        ));
    };

    match body.get("status").and_then(serde_json::Value::as_str) {
        None => Err(anyhow!(
            "open_session returned malformed response for session {session_id}: missing status"
        )),
        Some("ok") => Ok(()),
        Some(other) => {
            let message = body
                .get("message")
                .and_then(serde_json::Value::as_str)
                .unwrap_or("unknown error");
            Err(anyhow!(
                "open_session failed for session {session_id}: status={other}, message={message}"
            ))
        }
    }
}
/// Send one session lifecycle request directly to `instance_id` and return
/// the first item of the response stream.
///
/// Shared by the awaited `open_session` path and the spawned
/// `close_session` paths. Returns `None` when the request cannot be sent
/// (logged as a warning) or when the stream yields nothing. `session_id`,
/// `context_id` and `action_label` are only used for logging.
async fn send_session_request(
    client: &EventPlaneClient,
    request: serde_json::Value,
    instance_id: u64,
    session_id: &str,
    context_id: &str,
    action_label: &str,
) -> Option<Annotated<serde_json::Value>> {
    let mut stream = match client.direct(SingleIn::new(request), instance_id).await {
        Ok(stream) => stream,
        Err(e) => {
            tracing::warn!(
                request_id = %context_id,
                worker_id = instance_id,
                %session_id,
                "Failed {action_label}: {e}"
            );
            return None;
        }
    };

    let first = stream.next().await;
    if let Some(r) = first.as_ref() {
        tracing::info!(
            request_id = %context_id,
            worker_id = instance_id,
            %session_id,
            ?r,
            "{action_label} response"
        );
    }

    // Consume the rest of the stream to avoid
    // "Failed to publish complete final" errors.
    while stream.next().await.is_some() {}
    first
}
......@@ -18,7 +18,12 @@ use serde_json::json;
use tracing::Instrument;
use crate::{
kv_router::{KvRouter, metrics::RouterRequestMetrics},
kv_router::{
KvRouter,
agent_controller::{AgentController, SessionCloseAction},
metrics::RouterRequestMetrics,
sticky_sessions::{InMemoryAffinityStore, StickySessionRouter},
},
preprocessor::PreprocessedRequest,
protocols::common::{
llm_backend::LLMEngineOutput,
......@@ -29,6 +34,10 @@ use crate::{
pub struct KvPushRouter {
// Underlying push router carrying preprocessed requests and annotated engine output.
inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>,
// Shared KV router; request slots are released through it (`chooser.free`).
pub chooser: Arc<KvRouter>,
/// Sticky session routing. Lazily activated when requests carry session_control.
sticky_sessions: Arc<StickySessionRouter>,
/// Session lifecycle RPCs (open/close). Client is lazy (OnceCell).
agent_controller: Arc<AgentController>,
}
/// Result of worker selection containing instance ID, dp_rank, and overlap amount.
......@@ -61,6 +70,8 @@ struct RequestGuard {
isl_tokens: usize,
block_size: usize,
expected_output_tokens: Option<u32>,
/// Deferred session close action (fires after generation completes)
deferred_close: Option<SessionCloseAction>,
}
impl RequestGuard {
......@@ -146,6 +157,11 @@ impl RequestGuard {
tracing::warn!("Failed to free request {}: {e}", self.context_id);
}
self.freed = true;
// Take to prevent double-fire from Drop
if let Some(close) = self.deferred_close.take() {
close.execute(&self.context_id);
}
}
fn record_metrics(&mut self) {
......@@ -173,19 +189,35 @@ impl RequestGuard {
impl Drop for RequestGuard {
fn drop(&mut self) {
self.record_metrics();
if !self.freed && self.scheduler_tracked {
let chooser = self.chooser.clone();
let context_id = self.context_id.clone();
let Ok(handle) = tokio::runtime::Handle::try_current() else {
tracing::warn!("No tokio runtime for drop guard free of request {context_id}");
return;
};
handle.spawn(async move {
if let Err(e) = chooser.free(&context_id).await {
tracing::warn!("Failed to free request {context_id} (drop guard): {e}");
}
});
let deferred_close = self.deferred_close.take();
let needs_free = !self.freed && self.scheduler_tracked;
if deferred_close.is_none() && !needs_free {
return;
}
let Ok(handle) = tokio::runtime::Handle::try_current() else {
tracing::warn!(
"No tokio runtime for drop guard cleanup of request {}",
self.context_id
);
return;
};
// Mirror finish(): free the scheduler slot first, then fire the
// deferred session close so the worker's KV isn't released while
// generation teardown is still in progress.
let chooser = self.chooser.clone();
let context_id = self.context_id.clone();
handle.spawn(async move {
if needs_free && let Err(e) = chooser.free(&context_id).await {
tracing::warn!("Failed to free request {context_id} (drop guard): {e}");
}
if let Some(close) = deferred_close {
close.execute(&context_id);
}
});
}
}
......@@ -199,7 +231,32 @@ impl KvPushRouter {
// and the standalone router create KvPushRouter, so this covers both.
RouterRequestMetrics::from_component(chooser.client().endpoint.component());
KvPushRouter { inner, chooser }
// Agent controller manages session lifecycle RPCs (open/close).
// Always created; the event-plane client inside is lazy (OnceCell)
// so there is zero cost until a request actually carries session_control.
let component = chooser.client().endpoint.component().clone();
let agent_controller = Arc::new(AgentController::new(component));
// Sticky sessions share expiry handling with the agent controller so
// router-side reap also closes the worker session.
let on_expire = {
let controller = agent_controller.clone();
Arc::new(move |session_id: String, worker_id: u64| {
controller
.clone()
.close_expired_session(session_id, worker_id);
}) as Arc<dyn Fn(String, u64) + Send + Sync>
};
let sticky_sessions = Arc::new(StickySessionRouter::new(
InMemoryAffinityStore::new_with_on_expire(Some(on_expire)),
));
KvPushRouter {
inner,
chooser,
sticky_sessions,
agent_controller,
}
}
/// Select a worker for the request, either using a preselected worker or finding the best match.
......@@ -370,7 +427,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
/// prefill/completion lifecycle for proper KV cache management.
async fn generate(
&self,
request: SingleIn<PreprocessedRequest>,
mut request: SingleIn<PreprocessedRequest>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
// Extract context ID for request tracking
let context_id = request.context().id().to_string();
......@@ -378,6 +435,24 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
// Simple query-only detection: presence of query_instance_id annotation means query-only mode
let is_query_only = request.get_annotation_value("query_instance_id").is_some();
// Resolve session affinity: if the request has a session_id, inject the
// pinned worker_id into backend_instance_id before worker selection.
// Skip entirely for non-session requests to keep them off the sticky path.
if request
.routing
.as_ref()
.and_then(|r| r.session_control.as_ref())
.is_some()
&& request
.routing
.as_ref()
.and_then(|r| r.backend_instance_id)
.is_none()
&& let Some(worker_id) = self.sticky_sessions.resolve(&request)
{
request.routing_mut().backend_instance_id = Some(worker_id);
}
// Get phase from tracker (defaults to Aggregated if no tracker or phase not set)
let phase = request
.tracker
......@@ -493,6 +568,18 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
let track_output_blocks = self.chooser.kv_router_config().router_track_output_blocks;
let tracker = request.tracker.clone();
// Session lifecycle RPCs via agent controller.
// Fails fast if session_control.open is requested but the client can't be created.
let deferred_close = self
.agent_controller
.on_routed(
&request,
instance_id,
&context_id,
Some(&*self.sticky_sessions),
)
.await?;
let (mut backend_input, context) = request.into_parts();
backend_input.routing_mut().dp_rank = backend_dp_rank;
let updated_request = context.map(|_| backend_input);
......@@ -535,6 +622,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
isl_tokens,
block_size,
expected_output_tokens,
deferred_close,
};
loop {
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Sticky session routing with pluggable affinity storage.
//!
//! Provides router-side session affinity so that all requests within
//! a multi-turn session are routed to the same worker. The affinity
//! store is trait-based: the default [`InMemoryAffinityStore`] uses a
//! `DashMap` with a background reaper, but implementations backed by
//! Redis, etcd, or NATS KV can be swapped in for multi-router deployments.
use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use crate::preprocessor::PreprocessedRequest;
/// Interval between sweeps of the background reaper that removes expired entries.
const REAPER_INTERVAL: Duration = Duration::from_secs(30);
/// Callback invoked with `(session_id, worker_id)` when a binding is found
/// to have expired, either by the reaper or during a lookup.
type ExpiryHandler = Arc<dyn Fn(String, u64) + Send + Sync>;
/// Trait for session affinity storage backends.
///
/// Maps a session id to the id of the worker the session is pinned to.
/// Implementations must be `Send + Sync`; all methods take `&self`.
pub trait AffinityStore: Send + Sync {
/// Look up the worker for a session. Returns `None` if unknown or expired.
/// Implementations should refresh the TTL on hit.
fn get(&self, session_id: &str) -> Option<u64>;
/// Bind a session to a worker with the given TTL.
fn put(&self, session_id: &str, worker_id: u64, ttl: Duration);
/// Remove a session binding.
fn remove(&self, session_id: &str);
}
/// In-memory affinity entry with sliding-window TTL.
struct AffinityEntry {
/// Worker the session is pinned to.
worker_id: u64,
/// Lifetime granted at bind time; re-applied on every successful lookup.
ttl: Duration,
/// Instant at or after which the entry is treated as expired.
expires_at: Instant,
}
/// Default in-memory affinity store backed by `DashMap`.
///
/// A background tokio task sweeps expired entries every [`REAPER_INTERVAL`].
/// Clones share the same underlying map and expiry handler.
#[derive(Clone)]
pub struct InMemoryAffinityStore {
/// Session id -> binding, shared between clones through the `Arc`.
map: Arc<DashMap<String, AffinityEntry>>,
/// Optional callback fired with `(session_id, worker_id)` when an expired
/// binding is removed.
on_expire: Option<ExpiryHandler>,
}
impl Default for InMemoryAffinityStore {
fn default() -> Self {
Self::new()
}
}
impl InMemoryAffinityStore {
    /// Create a store with no expiry handler.
    ///
    /// Spawns the background reaper, so this must be called from within a
    /// tokio runtime.
    pub fn new() -> Self {
        Self::new_with_on_expire(None)
    }

    /// Create a store whose expired bindings are reported to `on_expire`.
    ///
    /// A tokio task is spawned that sweeps the map every
    /// [`REAPER_INTERVAL`]. The task only holds a weak reference to the map,
    /// so once every clone of the store has been dropped the next tick ends
    /// the task and releases the handler instead of keeping both alive for
    /// the rest of the process.
    pub fn new_with_on_expire(on_expire: Option<ExpiryHandler>) -> Self {
        let store = InMemoryAffinityStore {
            map: Arc::new(DashMap::new()),
            on_expire,
        };

        let weak_map = Arc::downgrade(&store.map);
        let reaper_on_expire = store.on_expire.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(REAPER_INTERVAL);
            loop {
                interval.tick().await;
                // All owners gone: nothing left to sweep, stop the task.
                let Some(map) = weak_map.upgrade() else {
                    break;
                };
                // The temporary strong reference is dropped at the end of
                // this iteration, before the next tick is awaited.
                let reaper = InMemoryAffinityStore {
                    map,
                    on_expire: reaper_on_expire.clone(),
                };
                reaper.reap_expired(Instant::now());
            }
        });
        store
    }

    /// Remove every entry whose `expires_at` is not after `now` and report
    /// each one to the expiry handler, if any.
    ///
    /// Handlers are invoked only after `retain` has returned, i.e. with no
    /// map shard locked, so a handler that touches the store cannot deadlock
    /// against the sweep.
    fn reap_expired(&self, now: Instant) {
        let collect = self.on_expire.is_some();
        let mut expired: Vec<(String, u64)> = Vec::new();

        self.map.retain(|session_id, entry: &mut AffinityEntry| {
            let alive = entry.expires_at > now;
            if !alive {
                tracing::debug!(%session_id, "Session affinity expired, removing");
                if collect {
                    expired.push((session_id.clone(), entry.worker_id));
                }
            }
            alive
        });

        if let Some(handler) = &self.on_expire {
            for (session_id, worker_id) in expired {
                handler(session_id, worker_id);
            }
        }
    }
}
impl AffinityStore for InMemoryAffinityStore {
    /// Return the worker bound to `session_id`, refreshing its TTL.
    ///
    /// Returns `None` when the session is unknown or its binding has
    /// expired. An expired binding is removed and reported to the expiry
    /// handler. Removal is done with `remove_if`, re-checking the expiry
    /// under the map's lock, so that:
    /// * a fresh binding written by a concurrent `put` is never deleted, and
    /// * only the caller that actually removed the entry fires the handler.
    fn get(&self, session_id: &str) -> Option<u64> {
        let now = Instant::now();

        // Inner scope: the entry guard must be released before `remove_if`
        // touches the same key, otherwise the shard lock would be re-entered.
        {
            let mut entry = self.map.get_mut(session_id)?;
            if entry.expires_at > now {
                // Refresh TTL on access (sliding window)
                entry.expires_at = now + entry.ttl;
                let worker_id = entry.worker_id;
                tracing::info!(%session_id, worker_id, "Sticky session hit");
                return Some(worker_id);
            }
        }

        // `None` here means another thread already removed or replaced the
        // expired entry; treat the lookup as a miss without a second callback.
        let (_, expired) = self
            .map
            .remove_if(session_id, |_, entry| entry.expires_at <= now)?;
        tracing::debug!(%session_id, "Session affinity expired during resolve");
        if let Some(handler) = &self.on_expire {
            handler(session_id.to_owned(), expired.worker_id);
        }
        None
    }

    /// Bind `session_id` to `worker_id`, replacing any existing binding.
    /// The binding expires `ttl` from now unless refreshed by `get`.
    fn put(&self, session_id: &str, worker_id: u64, ttl: Duration) {
        self.map.insert(
            session_id.to_owned(),
            AffinityEntry {
                worker_id,
                ttl,
                expires_at: Instant::now() + ttl,
            },
        );
    }

    /// Drop the binding for `session_id`, if any. The expiry handler is not
    /// invoked for explicit removals.
    fn remove(&self, session_id: &str) {
        self.map.remove(session_id);
    }
}
/// Routes requests to workers based on session affinity.
///
/// Wraps an [`AffinityStore`] and provides request-level helpers
/// that extract session IDs from [`PreprocessedRequest`] routing hints.
pub struct StickySessionRouter {
/// Backing store, boxed so any `AffinityStore` implementation can be used.
store: Box<dyn AffinityStore>,
}
impl StickySessionRouter {
    /// Wrap `store` in a router.
    pub fn new(store: impl AffinityStore + 'static) -> Self {
        tracing::debug!("StickySessionRouter initialized");
        let store: Box<dyn AffinityStore> = Box::new(store);
        Self { store }
    }

    /// Find the worker pinned to the request's session.
    ///
    /// Reads `session_control.session_id` from the request's routing hints
    /// and looks it up in the store. Returns `None` when the request has no
    /// routing hints, no session control, or the store has no live binding.
    pub fn resolve(&self, request: &PreprocessedRequest) -> Option<u64> {
        let session = request.routing.as_ref()?.session_control.as_ref()?;
        self.store.get(session.session_id.as_str())
    }

    /// Pin `session_id` to `worker_id` for `ttl`.
    pub fn bind(&self, session_id: &str, worker_id: u64, ttl: Duration) {
        let ttl_secs = ttl.as_secs();
        tracing::info!(%session_id, worker_id, ttl_secs, "Binding session affinity");
        self.store.put(session_id, worker_id, ttl);
    }

    /// Forget the worker pinned to `session_id`.
    pub fn unbind(&self, session_id: &str) {
        tracing::info!(%session_id, "Removing session affinity");
        self.store.remove(session_id);
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;
    use crate::protocols::common::preprocessor::{PreprocessedRequest, RoutingHints};
    use crate::protocols::openai::nvext::SessionControl;

    /// Shared record of `(session_id, worker_id)` pairs seen by an expiry handler.
    type ExpiryLog = Arc<Mutex<Vec<(String, u64)>>>;

    /// Build a minimal request, optionally carrying a session id with no action.
    fn make_request(session_id: Option<&str>) -> PreprocessedRequest {
        let routing = session_id.map(|id| RoutingHints {
            session_control: Some(SessionControl {
                session_id: id.to_owned(),
                action: None,
                timeout: 300,
            }),
            ..Default::default()
        });
        PreprocessedRequest::builder()
            .model("test".to_string())
            .token_ids(vec![1, 2, 3])
            .stop_conditions(Default::default())
            .sampling_options(Default::default())
            .output_options(Default::default())
            .routing(routing)
            .build()
            .unwrap()
    }

    /// Store built field-by-field so no reaper task is spawned and the tests
    /// can run without a tokio runtime.
    fn bare_store(on_expire: Option<ExpiryHandler>) -> InMemoryAffinityStore {
        InMemoryAffinityStore {
            map: Arc::new(DashMap::new()),
            on_expire,
        }
    }

    /// Insert a binding that expired one second ago.
    fn insert_expired(store: &InMemoryAffinityStore, session_id: &str, worker_id: u64, ttl: Duration) {
        store.map.insert(
            session_id.to_owned(),
            AffinityEntry {
                worker_id,
                ttl,
                expires_at: Instant::now() - Duration::from_secs(1),
            },
        );
    }

    /// Expiry handler that appends every call to the returned log.
    fn recording_handler() -> (ExpiryLog, ExpiryHandler) {
        let log: ExpiryLog = Arc::new(Mutex::new(Vec::new()));
        let sink = log.clone();
        let handler: ExpiryHandler = Arc::new(move |session_id: String, worker_id: u64| {
            sink.lock().unwrap().push((session_id, worker_id));
        });
        (log, handler)
    }

    #[test]
    fn resolve_returns_none_for_unknown_session() {
        let router = StickySessionRouter::new(bare_store(None));
        let req = make_request(Some("unknown-session"));
        assert!(router.resolve(&req).is_none());
    }

    #[test]
    fn resolve_returns_none_when_no_session_id() {
        let router = StickySessionRouter::new(bare_store(None));
        let req = make_request(None);
        assert!(router.resolve(&req).is_none());
    }

    #[test]
    fn bind_then_resolve_returns_worker() {
        let router = StickySessionRouter::new(bare_store(None));
        router.bind("sess-1", 42, Duration::from_secs(300));
        let req = make_request(Some("sess-1"));
        assert_eq!(router.resolve(&req), Some(42));
    }

    #[test]
    fn unbind_removes_affinity() {
        let router = StickySessionRouter::new(bare_store(None));
        router.bind("sess-1", 42, Duration::from_secs(300));
        router.unbind("sess-1");
        let req = make_request(Some("sess-1"));
        assert!(router.resolve(&req).is_none());
    }

    #[test]
    fn expired_entry_returns_none() {
        let store = bare_store(None);
        // Zero TTL and an expiry in the past: already expired on insert.
        insert_expired(&store, "sess-expired", 99, Duration::from_secs(0));
        let router = StickySessionRouter::new(store);
        let req = make_request(Some("sess-expired"));
        assert!(router.resolve(&req).is_none());
        // Entry should be cleaned up
        assert!(router.store.get("sess-expired").is_none());
    }

    #[test]
    fn resolve_refreshes_ttl() {
        let store = bare_store(None);
        let map = store.map.clone();
        let ttl = Duration::from_secs(60);
        map.insert(
            "sess-refresh".to_owned(),
            AffinityEntry {
                worker_id: 7,
                ttl,
                // Expires in 5 seconds (simulating time passing since bind)
                expires_at: Instant::now() + Duration::from_secs(5),
            },
        );
        let router = StickySessionRouter::new(store);
        let req = make_request(Some("sess-refresh"));
        assert_eq!(router.resolve(&req), Some(7));
        // After resolve, expires_at should be refreshed to now + ttl (60s),
        // so it should be at least 50s from now (not the original 5s).
        let entry = map.get("sess-refresh").unwrap();
        let remaining = entry.expires_at.duration_since(Instant::now());
        assert!(
            remaining > Duration::from_secs(50),
            "TTL should have been refreshed, but remaining={remaining:?}"
        );
    }

    #[test]
    fn expired_entry_triggers_close_callback_on_resolve() {
        let (expired_sessions, on_expire) = recording_handler();
        let store = bare_store(Some(on_expire));
        insert_expired(&store, "sess-expired", 99, Duration::from_secs(0));
        let router = StickySessionRouter::new(store);
        let req = make_request(Some("sess-expired"));
        assert!(router.resolve(&req).is_none());
        assert_eq!(
            expired_sessions.lock().unwrap().as_slice(),
            &[("sess-expired".to_string(), 99)]
        );
    }

    #[test]
    fn reaper_triggers_close_callback_for_expired_entry() {
        let (expired_sessions, on_expire) = recording_handler();
        let store = bare_store(Some(on_expire));
        insert_expired(&store, "sess-reaped", 17, Duration::from_secs(30));
        store.reap_expired(Instant::now());
        assert!(store.map.get("sess-reaped").is_none());
        assert_eq!(
            expired_sessions.lock().unwrap().as_slice(),
            &[("sess-reaped".to_string(), 17)]
        );
    }
}
......@@ -332,10 +332,12 @@ impl OpenAIPreprocessor {
priority: hints.and_then(|h| h.priority),
lora_name,
allowed_worker_ids: None,
session_control: nvext.session_control.clone(),
};
builder.routing(Some(routing));
} else if lora_name.is_some() {
// Ensure routing hints exist when we have LoRA.
// Ensure routing hints exist when we have LoRA,
// even when nvext is absent.
builder.routing(Some(RoutingHints {
lora_name,
..Default::default()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment