Unverified Commit ee0218b5 authored by Yan Ru Pei, committed by GitHub
Browse files

feat: plumb request priority through sglang and vllm handlers (#6348)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
parent d2a57839
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import asyncio import asyncio
import inspect
import logging import logging
import random import random
import socket import socket
...@@ -139,6 +140,15 @@ class BaseWorkerHandler(BaseGenerativeHandler): ...@@ -139,6 +140,15 @@ class BaseWorkerHandler(BaseGenerativeHandler):
else None else None
) )
self._engine_supports_priority = (
"priority" in inspect.signature(engine.async_generate).parameters
)
def _priority_kwargs(self, priority: Any) -> Dict[str, Any]:
if priority is not None and self._engine_supports_priority:
return {"priority": priority}
return {}
async def release_memory_occupation(self, body: dict) -> dict: async def release_memory_occupation(self, body: dict) -> dict:
"""Release GPU memory occupation and unregister from discovery. """Release GPU memory occupation and unregister from discovery.
......
...@@ -109,6 +109,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -109,6 +109,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
trace_id = context.trace_id trace_id = context.trace_id
sampling_params = self._build_sampling_params(request) sampling_params = self._build_sampling_params(request)
input_param = self._get_input_param(request) input_param = self._get_input_param(request)
priority = (request.get("routing") or {}).get("priority")
if self.serving_mode == DisaggregationMode.DECODE: if self.serving_mode == DisaggregationMode.DECODE:
# Check if bootstrap_info is pre-computed in the request (from frontend) # Check if bootstrap_info is pre-computed in the request (from frontend)
...@@ -144,6 +145,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -144,6 +145,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
external_trace_header=trace_header, external_trace_header=trace_header,
rid=trace_id, rid=trace_id,
data_parallel_rank=dp_rank, data_parallel_rank=dp_rank,
**self._priority_kwargs(priority),
) )
if self.skip_tokenizer_init: if self.skip_tokenizer_init:
...@@ -182,6 +184,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -182,6 +184,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
external_trace_header=trace_header, external_trace_header=trace_header,
rid=trace_id, rid=trace_id,
data_parallel_rank=dp_rank, data_parallel_rank=dp_rank,
**self._priority_kwargs(priority),
) )
if self.skip_tokenizer_init: if self.skip_tokenizer_init:
async for out in self._process_token_stream(agg, context): async for out in self._process_token_stream(agg, context):
......
...@@ -118,6 +118,7 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -118,6 +118,7 @@ class PrefillWorkerHandler(BaseWorkerHandler):
} }
input_param = self._get_input_param(inner_request) input_param = self._get_input_param(inner_request)
priority = (inner_request.get("routing") or {}).get("priority")
trace_header = self._get_trace_header(context) if self.enable_trace else None trace_header = self._get_trace_header(context) if self.enable_trace else None
...@@ -130,6 +131,7 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -130,6 +131,7 @@ class PrefillWorkerHandler(BaseWorkerHandler):
bootstrap_room=bootstrap_room, bootstrap_room=bootstrap_room,
external_trace_header=trace_header, external_trace_header=trace_header,
rid=trace_id, rid=trace_id,
**self._priority_kwargs(priority),
) )
task = asyncio.create_task(self._consume_results(results, context)) task = asyncio.create_task(self._consume_results(results, context))
......
...@@ -1188,6 +1188,7 @@ class BaseWorkerHandler(ABC): ...@@ -1188,6 +1188,7 @@ class BaseWorkerHandler(ABC):
lora_request=None, lora_request=None,
embedding_sequence_length=None, embedding_sequence_length=None,
trace_headers=None, trace_headers=None,
priority=0,
): ):
try: try:
# Log LoRA usage for this generation (debug level to avoid log spam) # Log LoRA usage for this generation (debug level to avoid log spam)
...@@ -1203,6 +1204,7 @@ class BaseWorkerHandler(ABC): ...@@ -1203,6 +1204,7 @@ class BaseWorkerHandler(ABC):
lora_request=lora_request, lora_request=lora_request,
data_parallel_rank=data_parallel_rank, data_parallel_rank=data_parallel_rank,
trace_headers=trace_headers, trace_headers=trace_headers,
priority=priority,
) )
num_output_tokens_so_far = 0 num_output_tokens_so_far = 0
...@@ -1362,7 +1364,9 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -1362,7 +1364,9 @@ class DecodeWorkerHandler(BaseWorkerHandler):
logger.debug( logger.debug(
f"Decode request {request_id} has no LoRA specified (model: {model_name})" f"Decode request {request_id} has no LoRA specified (model: {model_name})"
) )
dp_rank = request.get("routing", {}).get("dp_rank") routing = request.get("routing") or {}
dp_rank = routing.get("dp_rank")
priority = routing.get("priority", 0)
trace_headers = build_trace_headers(context) trace_headers = build_trace_headers(context)
...@@ -1376,6 +1380,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -1376,6 +1380,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
lora_request=lora_request, lora_request=lora_request,
embedding_sequence_length=embedding_sequence_length, embedding_sequence_length=embedding_sequence_length,
trace_headers=trace_headers, trace_headers=trace_headers,
priority=priority,
): ):
if prefill_result is not None and "completion_usage" in tok: if prefill_result is not None and "completion_usage" in tok:
tok["completion_usage"][ tok["completion_usage"][
...@@ -1406,7 +1411,9 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -1406,7 +1411,9 @@ class DecodeWorkerHandler(BaseWorkerHandler):
request, self.default_sampling_params request, self.default_sampling_params
) )
dp_rank = request.get("routing", {}).get("dp_rank") routing = request.get("routing") or {}
dp_rank = routing.get("dp_rank")
priority = routing.get("priority", 0)
openai_request_id = request.get("id") or request.get("request_id", request_id) openai_request_id = request.get("id") or request.get("request_id", request_id)
previous_text = "" previous_text = ""
...@@ -1420,6 +1427,7 @@ class DecodeWorkerHandler(BaseWorkerHandler): ...@@ -1420,6 +1427,7 @@ class DecodeWorkerHandler(BaseWorkerHandler):
request_id, request_id,
data_parallel_rank=dp_rank, data_parallel_rank=dp_rank,
trace_headers=trace_headers, trace_headers=trace_headers,
priority=priority,
) )
async for res in gen: async for res in gen:
...@@ -1574,7 +1582,9 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -1574,7 +1582,9 @@ class PrefillWorkerHandler(BaseWorkerHandler):
f"Prefill request {request_id} has no LoRA specified (model: {model_name})" f"Prefill request {request_id} has no LoRA specified (model: {model_name})"
) )
dp_rank = request.get("routing", {}).get("dp_rank") routing = request.get("routing") or {}
dp_rank = routing.get("dp_rank")
priority = routing.get("priority", 0)
trace_headers = build_trace_headers(context) trace_headers = build_trace_headers(context)
...@@ -1587,6 +1597,7 @@ class PrefillWorkerHandler(BaseWorkerHandler): ...@@ -1587,6 +1597,7 @@ class PrefillWorkerHandler(BaseWorkerHandler):
data_parallel_rank=dp_rank, data_parallel_rank=dp_rank,
lora_request=lora_request, lora_request=lora_request,
trace_headers=trace_headers, trace_headers=trace_headers,
priority=priority,
) )
except EngineDeadError as e: except EngineDeadError as e:
logger.error(f"vLLM EngineDeadError: {e}") logger.error(f"vLLM EngineDeadError: {e}")
......
...@@ -164,4 +164,5 @@ Backends auto-register with the frontend when they call `register_model()`. Supp ...@@ -164,4 +164,5 @@ Backends auto-register with the frontend when they call `register_model()`. Supp
| Document | Description | | Document | Description |
|----------|-------------| |----------|-------------|
| [Frontend Overview](README.md) | Quick start and feature matrix | | [Frontend Overview](README.md) | Quick start and feature matrix |
| [NVIDIA Request Extensions (`nvext`)](nvext.md) | Routing, preprocessing, response metadata, and engine priority extensions |
| [Router Documentation](../router/README.md) | KV-aware routing configuration | | [Router Documentation](../router/README.md) | KV-aware routing configuration |
---
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
title: NVIDIA Request Extensions (nvext)
---
# NVIDIA Request Extensions (`nvext`)
`nvext` is a top-level JSON object on the request body that provides NVIDIA-specific extensions to the OpenAI-compatible API. `nvext` fields are consumed by the Dynamo frontend, preprocessor, router, and backend workers to control routing, preprocessing, response metadata, scheduling, and engine-level priority.
## Usage
Include `nvext` as a top-level field alongside standard OpenAI-compatible fields:
```json
{
"model": "my-model",
"messages": [{"role": "user", "content": "Hello"}],
"nvext": {
"greed_sampling": true,
"extra_fields": ["worker_id", "timing"],
"agent_hints": {
"latency_sensitivity": 5.0,
"osl": 1024,
"priority": 5
}
}
}
```
## Field Reference
| Field | Type | Default | Consumed By | Description |
|-------|------|---------|-------------|-------------|
| `greed_sampling` | `bool` | `None` | Preprocessor | Forces greedy sampling regardless of other sampling parameters. |
| `use_raw_prompt` | `bool` | `None` | Preprocessor | Bypasses the prompt template and passes the prompt directly to the tokenizer. |
| `annotations` | `string[]` | `None` | Preprocessor | Triggers out-of-band information in the SSE stream via the `event:` field. |
| `backend_instance_id` | `u64` | `None` | Router | Routes the request to a specific backend instance. |
| `token_data` | `u32[]` | `None` | Preprocessor | Pre-tokenized prompt tokens. When provided with `backend_instance_id`, tokenization is skipped. |
| `max_thinking_tokens` | `u32` | `None` | Backend | Maximum thinking tokens allowed (passed through to backends). |
| `extra_fields` | `string[]` | `None` | Response builder | Fields to include in the response `nvext`. Supported: `"worker_id"`, `"timing"`. |
| `prefill_worker_id` | `u64` | `None` | Router | Routes the request to a specific prefill worker (disaggregated serving). |
| `decode_worker_id` | `u64` | `None` | Router | Routes the request to a specific decode worker (disaggregated serving). |
| `agent_hints` | object | `None` | Router, Backend | Per-request hints for router scheduling and load balancing, plus backend engine priority. See [Agent Hints](#agent-hints). |
### Header Overrides
Routing fields can also be set via HTTP headers, which take precedence over `nvext` values:
| Header | Overrides |
|--------|-----------|
| `x-worker-instance-id` | `backend_instance_id` and `decode_worker_id` |
| `x-prefill-instance-id` | `prefill_worker_id` |
## Agent Hints
The `agent_hints` sub-object carries per-request hints. Most are used by the router for scheduling, load balancing, and KV cache optimization; `priority` is forwarded to the backend engine.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `latency_sensitivity` | `f64` | `None` | Priority scheduling hint in seconds. Shifts the request's effective arrival time earlier in the router queue. Requires `--router-queue-threshold`. |
| `osl` | `u32` | `None` | Expected output sequence length (tokens). Used for output block tracking and resource estimation. |
| `speculative_prefill` | `bool` | `false` | When `true`, speculatively prefills the predicted next-turn prompt after the current turn completes to warm the KV cache. |
| `priority` | `i32` | `None` | Backend engine scheduling priority. Forwarded to the engine's generate call for queue ordering, preemption, and KV cache eviction. |
### `latency_sensitivity`
When `--router-queue-threshold` is set and the queue is active, this value shifts the request's effective arrival time earlier in the queue, giving it priority over requests with lower (or no) `latency_sensitivity`. A value of `5.0` means the request is treated as if it arrived 5 seconds earlier than it actually did. Has no effect when queueing is disabled.
```json
{
"nvext": {
"agent_hints": {
"latency_sensitivity": 5.0
}
}
}
```
### `osl`
Expected output sequence length — the estimated number of output tokens the request will generate. The router uses this hint in two ways:
1. **Output block tracking**: When `--track-output-blocks` is enabled, the router adds placeholder blocks during generation and applies fractional decay based on progress toward `osl`.
2. **Resource estimation**: Helps the router estimate total resource requirements when making routing decisions.
```json
{
"nvext": {
"agent_hints": {
"osl": 1024
}
}
}
```
### `speculative_prefill`
When set to `true`, the system speculatively prefills the predicted next-turn prompt after the current assistant turn completes. This is designed for multi-turn agentic workloads where the next request's prefix is predictable.
How it works:
1. As the assistant response streams, the system accumulates the full response text.
2. Once the response finishes, a background task constructs the next-turn prompt by appending the assistant response to the conversation history (with thinking content stripped for non-last turns).
3. The constructed prompt is tokenized and sent as a `max_tokens=1` request to warm the KV cache on a worker.
4. When the actual next request arrives, it benefits from the already-warm KV cache, reducing TTFT.
```json
{
"nvext": {
"agent_hints": {
"speculative_prefill": true
}
}
}
```
### `priority`
Backend engine scheduling priority forwarded to the engine's `generate` call. Influences queue ordering, KV cache eviction under memory pressure, and preemption of running requests.
The semantics of the priority value differ between backends:
- **vLLM**: Smaller values = higher priority. A request with `priority: 0` is scheduled before `priority: 10`. Ties are broken by arrival time. Requires `--scheduling-policy priority` on the engine.
- **SGLang**: By default, larger values = higher priority. This can be inverted with `--schedule-low-priority-values-first` to match vLLM's convention. Requires `--enable-priority-scheduling` on the engine.
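When omitted, the vLLM handler passes `priority=0`; the SGLang handler omits the argument entirely, so the engine's own default applies. The SGLang handler also drops a supplied value when the engine's `async_generate` does not accept a `priority` parameter. TensorRT-LLM does not currently support per-request priority.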
```json
{
"nvext": {
"agent_hints": {
"priority": 5
}
}
}
```
## Response Extensions
When the client requests response metadata via `extra_fields`, the response includes an `nvext` object with the requested fields:
| Field | Requested Via | Description |
|-------|---------------|-------------|
| `worker_id` | `extra_fields: ["worker_id"]` | Prefill/decode worker IDs and data parallel ranks that processed the request. |
| `timing` | `extra_fields: ["timing"]` | Per-request timing information (TTFT, ITL, queue time, etc.). |
| `token_ids` | Automatic (GAIE Stage 1) | Tokenized prompt for reuse in Stage 2 query-only mode. |
### Example response `nvext`
```json
{
"nvext": {
"worker_id": {
"prefill_worker_id": 1,
"prefill_dp_rank": 0,
"decode_worker_id": 2,
"decode_dp_rank": 0
},
"timing": {
"ttft_ms": 45.2,
"itl_ms": 12.1
}
}
}
```
## See Also
| Document | Description |
|----------|-------------|
| [Frontend Guide](frontend-guide.md) | KServe gRPC configuration and integration |
| [Router Guide](../router/router-guide.md) | Full router configuration and CLI arguments |
...@@ -283,6 +283,7 @@ impl OpenAIPreprocessor { ...@@ -283,6 +283,7 @@ impl OpenAIPreprocessor {
dp_rank: None, // dp_rank is set later in the pipeline dp_rank: None, // dp_rank is set later in the pipeline
expected_output_tokens: hints.and_then(|h| h.osl), expected_output_tokens: hints.and_then(|h| h.osl),
priority_jump: hints.and_then(|h| h.latency_sensitivity), priority_jump: hints.and_then(|h| h.latency_sensitivity),
priority: hints.and_then(|h| h.priority),
lora_name, lora_name,
}; };
builder.routing(Some(routing)); builder.routing(Some(routing));
......
...@@ -50,6 +50,10 @@ pub struct RoutingHints { ...@@ -50,6 +50,10 @@ pub struct RoutingHints {
/// ahead in the scheduler queue. /// ahead in the scheduler queue.
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub priority_jump: Option<f64>, pub priority_jump: Option<f64>,
/// Backend engine scheduling priority forwarded to the generate call.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub priority: Option<i32>,
} }
#[derive(Serialize, Deserialize, Debug, Clone, Default)] #[derive(Serialize, Deserialize, Debug, Clone, Default)]
......
...@@ -180,6 +180,14 @@ pub struct AgentHints { ...@@ -180,6 +180,14 @@ pub struct AgentHints {
#[builder(default, setter(strip_option))] #[builder(default, setter(strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub speculative_prefill: Option<bool>, pub speculative_prefill: Option<bool>,
/// Backend engine scheduling priority.
/// Forwarded to the engine's generate call for queue ordering, KV cache eviction,
/// and preemption decisions. Interpretation is backend-specific:
/// vLLM uses lower-is-higher, SGLang uses higher-is-higher (configurable).
#[builder(default, setter(strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub priority: Option<i32>,
} }
impl Default for NvExt { impl Default for NvExt {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment