Commit 39d645e5 (unverified), authored by Jonathan Tong, committed by GitHub

docs: migrate Fern docs from fern/ into docs/ (#6206)


Signed-off-by: Jont828 <jt572@cornell.edu>
parent d381e6ff
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
-->
# Router Design
This document describes the internal architecture of the Dynamo KV Router, including block tracking mechanisms, the KV cache optimization system, event handling, and transport modes.
## KV Router Architecture
The KV Router tracks two key metrics for each worker:
1. **Potential Active Blocks**: The number of blocks that would be used for decoding if a request is routed to a worker. This includes both existing active blocks and new blocks from the incoming request.
2. **Potential New Prefill Blocks**: The number of blocks whose tokens would need to be computed from scratch on a worker, calculated as:
   - New prefill tokens = Total input tokens - (Overlap blocks × Block size)
   - Potential prefill blocks = New prefill tokens / Block size
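As a worked illustration of the two formulas above, here is a minimal Python sketch. The function name and the clamp to zero are assumptions made for the example and are not taken from the router's code.

```python
def potential_prefill_blocks(total_input_tokens: int, overlap_blocks: int, block_size: int) -> float:
    """Blocks that would still need prefill if the request went to this worker."""
    # New prefill tokens = total input tokens - (overlap blocks x block size).
    # Clamping at zero is an assumption to keep the sketch safe for odd inputs.
    new_prefill_tokens = max(total_input_tokens - overlap_blocks * block_size, 0)
    # Potential prefill blocks = new prefill tokens / block size.
    return new_prefill_tokens / block_size


# 160 input tokens, 5 cached blocks of 16 tokens: 80 tokens (5 blocks) left to prefill.
print(potential_prefill_blocks(160, 5, 16))  # 5.0
```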
### Block Tracking Mechanisms
The router maintains block information through two complementary systems:
- **Active Decoding Blocks**: Tracked locally by the router throughout the request lifecycle:
  - Incremented when adding a new request
  - Updated during token generation
  - Decremented upon request completion
- **Cached Blocks**: Maintained globally by the KvIndexer using a prefix tree built from worker-reported KV events. This provides accurate overlap information for routing decisions.
## KV Cache Router
Today's leading Large Language Models (LLMs) are auto-regressive and based on the [transformer architecture](https://proceedings.neurips.cc/paper_files/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf). One key inference optimization is to cache the already computed keys and values and reuse them for future tokens. This is called the [KV Cache](https://developer.nvidia.com/blog/mastering-llm-techniques-inference-optimization/#key-value_caching).
### KV Cache Routing and Load Balancing
```mermaid
graph TD
T[Tokens] --> R[KV Aware Router]
R -.-> W1["Worker 1<br/>Cached: 2 blocks<br/>Prefill: 8 blks<br/>Decode: 10 blks"]
R ==>|Selected| W2["Worker 2<br/>Cached: 5 blocks<br/>Prefill: 5 blks<br/>Decode: 5 blks"]
R -.-> W3["Worker 3<br/>Cached: 8 blocks<br/>Prefill: 2 blks<br/>Decode: 9 blks"]
style T fill:#fff3e0,stroke:#333,color:#333
style R fill:#2e8b57,stroke:#333,color:#fff
style W1 fill:#f3e5f5,stroke:#333,color:#333
style W2 fill:#c8e6c9,stroke:#333,color:#333
style W3 fill:#f3e5f5,stroke:#333,color:#333
linkStyle 0,1,2,3 stroke:#8b4513,stroke-width:2px
```
The router uses a cost function that considers both the prefill cost (influenced by cached blocks) and the decode load to make optimal routing decisions.
#### Cost Calculation
1. **Prefill blocks**: Calculated by dividing the number of tokens requiring prefill processing by the block size. The system predicts this based on input tokens and available cached blocks per worker, updating the count when the first output token signals prefill completion.
2. **Decode blocks**: Estimated from the request's input tokens and each worker's active sequences. The count updates when requests complete and their blocks are freed.
3. **Cost formula**: `cost = overlap_score_weight * prefill_blocks + decode_blocks`
   - Lower costs indicate better routing choices
   - `overlap_score_weight` balances cache hit optimization against load distribution
   - Higher weights favor cache reuse (improving TTFT), while lower weights prioritize even load distribution (improving ITL)
#### Worker Selection
The router selects the worker with the lowest cost. When `router_temperature` is set to a non-zero value, the router uses softmax sampling on the normalized cost logits to introduce randomness in the selection, which can help with load distribution.
Example calculation with `overlap_score_weight = 1.0`:
- Worker 1: cost = 1.0 * 8 + 10 = 18
- **Worker 2: cost = 1.0 * 5 + 5 = 10** (selected - lowest cost)
- Worker 3: cost = 1.0 * 2 + 9 = 11
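The example calculation can be reproduced with a short Python sketch. The cost formula is the one given above; the function names, the min-max normalization, and the way temperature scales the logits are assumptions for illustration, since the exact normalization the router applies is not specified here.

```python
import math
import random


def routing_cost(prefill_blocks, decode_blocks, overlap_score_weight=1.0):
    # cost = overlap_score_weight * prefill_blocks + decode_blocks
    return overlap_score_weight * prefill_blocks + decode_blocks


def select_worker(costs, temperature=0.0, rng=None):
    """Pick the lowest-cost worker, or sample via softmax when temperature > 0."""
    if temperature == 0.0:
        return min(costs, key=costs.get)
    # Assumed normalization: min-max scale costs to [0, 1], negate so that
    # lower cost means a higher logit, then divide by the temperature.
    lo, hi = min(costs.values()), max(costs.values())
    span = (hi - lo) or 1.0
    logits = {w: -((c - lo) / span) / temperature for w, c in costs.items()}
    peak = max(logits.values())
    weights = {w: math.exp(l - peak) for w, l in logits.items()}
    rng = rng or random
    names = list(weights)
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


# (prefill blocks, decode blocks) per worker, as in the example above.
workers = {"worker-1": (8, 10), "worker-2": (5, 5), "worker-3": (2, 9)}
costs = {name: routing_cost(p, d) for name, (p, d) in workers.items()}
print(costs)                 # {'worker-1': 18.0, 'worker-2': 10.0, 'worker-3': 11.0}
print(select_worker(costs))  # worker-2
```

With a non-zero temperature, workers 2 and 3 would both be sampled often because their costs are close, while worker 1 would rarely be chosen.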
### KV Cache Optimizations
Every inference framework maintains a KV Cache for each worker. [vLLM](https://github.com/vllm-project/vllm), a popular inference framework, introduced [PagedAttention](https://arxiv.org/abs/2309.06180), which manages the KV Cache efficiently by chunking requests into blocks.
Another popular inference framework, [SGLang](https://github.com/sgl-project/sglang), contributed [RadixAttention](https://arxiv.org/abs/2312.07104), which introduced a prefix tree that allows efficient matching, insertion, and eviction of KV Cache blocks. The prefix tree structure popularized KV Cache reuse.
In Dynamo, we introduce a KVPublisher which emits KV Cache events that occur at each worker and a KVIndexer which keeps track of these events globally.
### KV Block Management Flow
To get a feel for how KV Cache management works on a single worker with KV Cache reuse turned on and where the KVPublisher gets plugged in, we can walk through the KV Block management flow:
1. **Request tokenization**: The incoming prompt is converted into tokens
2. **Block partitioning**: The token sequence is divided into fixed-size blocks (e.g., 16 or 64 tokens per block)
3. **Block hashing**: Each block of tokens is hashed to create a unique identifier
4. **Cache lookup**:
   - For each block, the system checks if a matching block already exists in the KV cache
   - If a match is found, the existing KV cache block is reused
   - If no match is found, the system proceeds to the next step
5. **Resource allocation**:
   - For blocks without matches, the system attempts to allocate new memory space
   - If sufficient memory is available, allocate memory space and proceed to step 7
   - If memory is constrained, proceed to step 6
6. **Cache eviction** (when necessary):
   - The system applies an eviction policy (e.g., LRU, LFU) to identify blocks for removal
   - Selected blocks are evicted from the cache
   - **KVPublisher emits a KV removed event notifying KVIndexer about the removed block.**
   - Alternatively, some systems may offload less-frequently used blocks to CPU memory.
7. **KV computation**:
   - For new blocks, the model computes key and value tensors
   - These tensors are stored in the newly allocated cache blocks
   - **KVPublisher emits a KV stored event notifying KVIndexer about newly stored blocks.**
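The flow above can be sketched in a few lines of Python. This is a toy model, not how any particular engine implements it: the `BlockCache` class, the chained SHA-256 block hash, dropping the trailing partial block, and the plain LRU policy are all assumptions, and the `events` list merely stands in for the KVPublisher.

```python
import hashlib
from collections import OrderedDict


class BlockCache:
    """Toy single-worker KV block cache with LRU eviction (illustrative only)."""

    def __init__(self, capacity_blocks, block_size=16):
        self.capacity = capacity_blocks
        self.block_size = block_size
        self.blocks = OrderedDict()  # block hash -> placeholder for KV tensors
        self.events = []             # stand-in for events a KVPublisher would emit

    def _hash_blocks(self, tokens):
        # Steps 2-3: split into full blocks and hash each one. Chaining in the
        # parent hash (an assumption) makes a block's id depend on its prefix.
        hashes, parent = [], b""
        full = len(tokens) - len(tokens) % self.block_size
        for i in range(0, full, self.block_size):
            chunk = tokens[i:i + self.block_size]
            parent = hashlib.sha256(parent + repr(chunk).encode()).digest()
            hashes.append(parent.hex()[:16])
        return hashes

    def admit(self, tokens):
        """Run lookup, allocation, eviction, and store; return reused block count."""
        reused = 0
        for block_hash in self._hash_blocks(tokens):
            if block_hash in self.blocks:          # step 4: cache hit, reuse
                self.blocks.move_to_end(block_hash)
                reused += 1
                continue
            if len(self.blocks) >= self.capacity:  # steps 5-6: evict LRU block
                evicted, _ = self.blocks.popitem(last=False)
                self.events.append(("removed", evicted))
            self.blocks[block_hash] = "kv"         # step 7: compute and store
            self.events.append(("stored", block_hash))
        return reused


cache = BlockCache(capacity_blocks=2, block_size=4)
print(cache.admit(list(range(8))))  # 0 (cold cache, two blocks stored)
print(cache.admit(list(range(8))))  # 2 (both blocks reused)
```

A production cache would typically evict leaf blocks before their prefixes; the plain LRU here ignores that to stay short.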
Further details can be found for: [TRT-LLM](https://developer.nvidia.com/blog/introducing-new-kv-cache-reuse-optimizations-in-nvidia-tensorrt-llm/), [vLLM](https://docs.vllm.ai/en/latest/design/automatic_prefix_caching.html#design-automatic-prefix-caching) and [SGLang](https://lmsys.org/blog/2024-01-17-sglang/).
## Events
### KVPublisher
The KVPublisher can be initialized and then called in the inference framework where blocks are allocated and removed.
The two types of events are:
- KV stored event
- KV removed event
The publisher can be initialized and used through C bindings or Python bindings.
### Deterministic Event IDs
Engines do not need to emit deterministic block identifiers in KV events, as the router uses local block hashes (computed from token content) for tracking and matching blocks across workers. However, it is strongly preferred that engines do emit deterministic block identifiers, as this keeps the KvIndexer's internal lookup table smaller and more efficient. To ensure deterministic behavior, all workers should use identical engine versions/configuration. If your engine relies on Python's built-in `hash()` for any event IDs, set `PYTHONHASHSEED=0`; otherwise this setting has no effect.
### KVIndexer
The KVIndexer builds and maintains a global view of cached blocks in a prefix tree. We modify the original prefix tree by also storing the worker id on each node. This is so we can return the number of matched blocks for each worker.
The KVIndexer has a method `find_matches_for_request`, which takes in tokens and returns a dictionary with keys of worker id and values of the number of matched KV Blocks.
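To make the per-worker matching concrete, here is a minimal Python sketch of a prefix tree whose nodes record which workers hold each block. The class and method names are hypothetical, and unlike `find_matches_for_request` it takes pre-computed block hashes rather than tokens.

```python
class RadixNode:
    def __init__(self):
        self.children = {}    # block hash -> RadixNode
        self.workers = set()  # ids of workers that hold this block


class PrefixIndex:
    """Toy global index of cached blocks (illustrative, not the KVIndexer)."""

    def __init__(self):
        self.root = RadixNode()

    def apply_stored(self, worker_id, block_hashes):
        # Walk (and extend) the path for this block sequence, tagging each node.
        node = self.root
        for block_hash in block_hashes:
            node = node.children.setdefault(block_hash, RadixNode())
            node.workers.add(worker_id)

    def find_matches(self, block_hashes):
        """Return {worker_id: number of matched prefix blocks}."""
        matches = {}
        node = self.root
        for block_hash in block_hashes:
            node = node.children.get(block_hash)
            if node is None:
                break  # no worker has this block, so no longer prefix can match
            for worker_id in node.workers:
                matches[worker_id] = matches.get(worker_id, 0) + 1
        return matches


index = PrefixIndex()
index.apply_stored(1, ["a", "b", "c"])
index.apply_stored(2, ["a", "b"])
print(index.find_matches(["a", "b", "c", "d"]))  # {1: 3, 2: 2}
```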
### Inter-Router Communication
In distributed deployments with multiple routers, each router maintains visibility over only a portion of the total requests. To ensure consistent routing decisions, routers synchronize their states through three event types:
1. **AddRequest**: Notifies other routers when a request is assigned to a worker. Includes request ID, worker ID, token sequence blocks, and overlap score to track block usage across the system.
2. **MarkPrefillCompleted**: Signals when a request moves from prefill to decode phase, allowing routers to update their worker load calculations by excluding completed prefill tokens.
3. **Free**: Indicates request completion and resource release, enabling accurate block reference counting across all routers.
Each event carries a unique router ID to prevent self-event processing. This asynchronous communication system ensures optimal routing decisions by maintaining consistent KV cache state across all routers, even as they handle different request streams.
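A rough Python sketch of how a replica might apply these three events follows. The field names, the `ReplicaState` class, and the way load is summarized are assumptions for illustration; only the event names and the router-ID self-filter come from the description above.

```python
from dataclasses import dataclass


@dataclass
class SyncEvent:
    router_id: str   # sender, used to skip self-originated events
    kind: str        # "AddRequest", "MarkPrefillCompleted", or "Free"
    request_id: str
    worker_id: str = ""
    num_blocks: int = 0      # hypothetical: blocks in the request's token sequence
    overlap_blocks: int = 0  # hypothetical: blocks already cached on the worker


class ReplicaState:
    """Toy per-router view of requests routed by other replicas."""

    def __init__(self, router_id):
        self.router_id = router_id
        self.requests = {}  # request_id -> {"worker", "blocks", "prefill"}

    def apply(self, event):
        if event.router_id == self.router_id:
            return False  # ignore our own events
        if event.kind == "AddRequest":
            self.requests[event.request_id] = {
                "worker": event.worker_id,
                "blocks": event.num_blocks,
                "prefill": event.num_blocks - event.overlap_blocks,
            }
        elif event.kind == "MarkPrefillCompleted":
            if event.request_id in self.requests:
                self.requests[event.request_id]["prefill"] = 0
        elif event.kind == "Free":
            self.requests.pop(event.request_id, None)
        return True

    def load(self, worker_id):
        """Return (pending prefill blocks, active blocks) for a worker."""
        reqs = [r for r in self.requests.values() if r["worker"] == worker_id]
        return sum(r["prefill"] for r in reqs), sum(r["blocks"] for r in reqs)


r2 = ReplicaState("router-2")
r2.apply(SyncEvent("router-1", "AddRequest", "A", "w1", num_blocks=10, overlap_blocks=4))
print(r2.load("w1"))  # (6, 10)
```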
## Event Transport Modes
The router supports two event transport modes for KV cache state synchronization:
- **NATS Core / Event Plane with Local Indexer (default)**: Fire-and-forget pub/sub where workers maintain local radix trees (enabled by default). Router rebuilds state by querying workers on startup. Lower latency, simpler setup. Works with both NATS Core and ZMQ event planes.
- **JetStream** (`--durable-kv-events` on **both** frontend **and** workers): Persistent event stream with durable consumers. State persists across router restarts via snapshots in NATS object store. Best for production with multi-replica consistency. **Important:** Both the frontend and all workers must specify `--durable-kv-events` for JetStream mode to work correctly.
### JetStream Mode (Opt-in)
KV events are sent to a persistent NATS JetStream. Each KV router/indexer replica acts as a durable consumer, pulling messages from this shared stream. This architecture ensures consistency across router replicas and persistence across restarts.
- **Best for**: Production deployments requiring durability and multi-replica router consistency
- **Tradeoffs**: Requires JetStream setup; slightly higher latency due to persistence guarantees
- **Enable with**: `--durable-kv-events` flag on **both** the frontend **and** all workers
> [!Note]
> **Both frontend and workers must specify `--durable-kv-events`** for JetStream mode to work correctly. The frontend uses this flag to consume from JetStream, while workers use it to publish to JetStream instead of the local indexer.
```mermaid
graph TD
subgraph Engines
E1[Engine 1<br/>KVPublisher]
E2[Engine 2<br/>KVPublisher]
E3[Engine 3<br/>KVPublisher]
end
subgraph "NATS JetStream"
JS[(Persistent KV Events Stream<br/>- Block created<br/>- Block removed)]
end
subgraph "NATS Object Store"
OS[(Radix Tree<br/>State Snapshot)]
end
subgraph "Router Replicas"
R1[Router 1<br/>KVIndexer]
R2[Router 2<br/>KVIndexer]
end
E1 -->|Publish Events| JS
E2 -->|Publish Events| JS
E3 -->|Publish Events| JS
JS -->|Consume as Durable Consumer| R1
JS -->|Consume as Durable Consumer| R2
JS -->|Periodic Snapshot| OS
style JS fill:#e1f5fe,stroke:#333,color:#333
style OS fill:#e1f5fe,stroke:#333,color:#333
style E1 fill:#f3e5f5,stroke:#333,color:#333
style E2 fill:#f3e5f5,stroke:#333,color:#333
style E3 fill:#f3e5f5,stroke:#333,color:#333
style R1 fill:#2e8b57,stroke:#333,color:#fff
style R2 fill:#2e8b57,stroke:#333,color:#fff
linkStyle 0,1,2,3,4,5 stroke:#2196f3,stroke-width:2px
```
### NATS Core / Event Plane with Local Indexer (Default)
By default, workers have local indexer enabled. Each worker maintains its own local radix tree (local indexer) and publishes events over the generic event plane (NATS Core or ZMQ, depending on `--event-plane`). Each worker assigns monotonically increasing event IDs to its events. The router detects gaps in event sequences and recovers missed events by querying the worker's local indexer directly.
- **Best for**: Lower-latency setups; simpler deployments without JetStream; single-router scenarios; deployments without NATS (using ZMQ event plane)
- **Tradeoffs**: State persists on workers (not centralized); recovery depends on workers being available
- **Switch to JetStream**: Use `--durable-kv-events` flag on **both** workers (vLLM, SGLang, TRT-LLM, mocker) **and** frontend
```mermaid
graph TD
subgraph Engines
E1[Engine 1<br/>LocalKvIndexer]
E2[Engine 2<br/>LocalKvIndexer]
E3[Engine 3<br/>LocalKvIndexer]
end
subgraph "NATS Core"
NC[KV Events Pub/Sub<br/>- Block created<br/>- Block removed]
end
subgraph "Router Replicas"
R1[Router 1<br/>KVIndexer]
R2[Router 2<br/>KVIndexer]
end
E1 -->|Publish Events| NC
E2 -->|Publish Events| NC
E3 -->|Publish Events| NC
NC -->|Subscribe| R1
NC -->|Subscribe| R2
style NC fill:#e1f5fe,stroke:#333,color:#333
style E1 fill:#f3e5f5,stroke:#333,color:#333
style E2 fill:#f3e5f5,stroke:#333,color:#333
style E3 fill:#f3e5f5,stroke:#333,color:#333
style R1 fill:#2e8b57,stroke:#333,color:#fff
style R2 fill:#2e8b57,stroke:#333,color:#fff
linkStyle 0,1,2,3,4 stroke:#2196f3,stroke-width:2px
```
**How gap detection works:**
1. Each worker assigns monotonically increasing event IDs starting from 0
2. The router tracks the last received event ID per worker
3. If an event arrives with `event_id > last_id + 1`, the router detects a gap
4. The router queries the worker's local indexer for the missing event range `[last_id+1, event_id-1]`
5. On worker discovery (Added event), the router dumps the worker's entire local indexer state
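The gap check in steps 2 to 4 can be sketched as follows. The `GapTracker` class is hypothetical, and treating the first observed event as the baseline (because the full state is assumed to have been dumped at discovery) is an assumption of this sketch.

```python
class GapTracker:
    """Track the last event ID per worker and report missing ranges."""

    def __init__(self):
        self.last_id = {}  # worker_id -> last received event ID

    def observe(self, worker_id, event_id):
        """Return the inclusive (start, end) range to re-fetch, or None."""
        last = self.last_id.get(worker_id)
        gap = None
        if last is not None and event_id > last + 1:
            # Missing range is [last_id + 1, event_id - 1].
            gap = (last + 1, event_id - 1)
        if last is None or event_id > last:
            self.last_id[worker_id] = event_id  # never move backwards
        return gap


tracker = GapTracker()
tracker.observe("worker-1", 0)
tracker.observe("worker-1", 1)
print(tracker.observe("worker-1", 4))  # (2, 3)
```

The returned range is what the router would request from the worker's local indexer before applying later events.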
**Startup behavior:**
- When a worker is discovered, the router queries and ingests its full local indexer state
- When a worker is removed, the router removes all its blocks from the global radix tree
> [!Note]
> By default, all workers have `enable_local_indexer=true`, so the router uses NATS Core / Event Plane mode with local indexer. To use JetStream mode instead, specify `--durable-kv-events` on **both** the frontend and all workers.
### Local Active Block Management with Replica Sync
In addition to cached blocks, each router replica needs to track active blocks (blocks being used for ongoing generation) as load metrics. Since this information is highly time-sensitive, it should be predicted immediately when:
- The router receives and routes a request
- The first token is generated (prefill complete)
- The response ends (request freed)
This is managed locally in each router via a "slot manager". To maintain consistency across the system, router replicas synchronize these local predictions with each other through NATS core messaging.
```mermaid
sequenceDiagram
participant C1 as Client 1
participant R1 as Router 1<br/>(Slot Manager)
participant R2 as Router 2<br/>(Slot Manager)
participant C2 as Client 2
Note over R1,R2: Router Replica Sync Enabled
C1->>R1: Request A
activate R1
R1->>R1: Predict blocks & route to worker
R1-->>R2: Sync: AddRequest(A)
C2->>R2: Request B
activate R2
R2->>R2: Predict blocks & route to worker
R2-->>R1: Sync: AddRequest(B)
R1->>R1: First token received<br/>(prefill complete)
R1-->>R2: Sync: MarkPrefillCompleted(A)
R1->>C1: Stream response
R2->>R2: First token received<br/>(prefill complete)
R2-->>R1: Sync: MarkPrefillCompleted(B)
R2->>C2: Stream response
R1->>R1: Response complete<br/>(free blocks)
R1-->>R2: Sync: Free(A)
deactivate R1
R2->>R2: Response complete<br/>(free blocks)
R2-->>R1: Sync: Free(B)
deactivate R2
Note over R1,R2: Both routers have consistent<br/>view of active blocks
```
This dual-layer approach, combining global KV cache state built from worker KV events (persisted via JetStream when durable mode is enabled) with ephemeral active block synchronization between router replicas, enables the system to make routing decisions that balance cache reuse with load distribution.
## See Also
- **[Router README](../components/router/README.md)**: Quick start guide for the KV Router
- **[Router Guide](../components/router/router_guide.md)**: Configuration, tuning, and production setup
- **[Router Examples](../components/router/router_examples.md)**: Python API usage and custom routing patterns
- **[KV Event Publishing for Custom Engines](../integrations/kv_events_custom_engines.md)**: Integrate custom inference engines with KV-aware routing
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES.
All rights reserved.
SPDX-License-Identifier: Apache-2.0
-->
# Writing Python Workers in Dynamo
This guide explains how to create your own Python worker in Dynamo.
The [dynamo](https://pypi.org/project/ai-dynamo/) Python library allows you to build your own engine and attach it to Dynamo.
The Python file must do three things:
1. Decorate a function to get the runtime
2. Register on the network
3. Attach a request handler
```python
import asyncio

import uvloop

from dynamo.llm import ModelInput, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker


# 1. Decorate a function to get the runtime
#
@dynamo_worker()
async def worker(runtime: DistributedRuntime):
    # 2. Register ourselves on the network
    #
    component = runtime.namespace("namespace").component("component")
    model_path = "Qwen/Qwen3-0.6B"  # or "/data/models/Qwen3-0.6B"
    model_input = ModelInput.Tokens  # or ModelInput.Text if engine handles pre-processing
    # or ModelType.Chat | ModelType.Completions if the model can be deployed
    # on both the chat and completions endpoints
    model_type = ModelType.Chat
    endpoint = component.endpoint("endpoint")
    # Optional last param to register_llm is model_name. If not present derives it from model_path
    await register_llm(model_input, model_type, endpoint, model_path)

    # Initialize your engine here
    engine = ...  # placeholder; replace with your engine instance

    # 3. Attach request handler
    #
    await endpoint.serve_endpoint(RequestHandler(engine).generate)


class RequestHandler:
    def __init__(self, engine):
        ...

    async def generate(self, request):
        # Call the engine
        # yield result dict
        ...


if __name__ == "__main__":
    uvloop.install()
    asyncio.run(worker())
```
The `model_path` can be:
- A HuggingFace repo ID, optionally prefixed with `hf://`. It is downloaded and cached locally.
- The path to a checkout of a HuggingFace repo - any folder containing safetensor files as well as `config.json`, `tokenizer.json` and `tokenizer_config.json`.
The `model_input` can be:
- ModelInput.Tokens. Your engine expects pre-processed input (token IDs). Dynamo handles tokenization and pre-processing.
- ModelInput.Text. Your engine expects raw text input and handles its own tokenization and pre-processing.
The `model_type` can be:
- ModelType.Chat. Your `generate` method receives a `request` and must return a response dict of type [OpenAI Chat Completion](https://platform.openai.com/docs/api-reference/chat).
- ModelType.Completions. Your `generate` method receives a `request` and must return a response dict of the older [Completions](https://platform.openai.com/docs/api-reference/completions).
`register_llm` can also take the following kwargs:
- `model_name`: The name to serve the model under. The model name in incoming HTTP requests must match this. Defaults to the Hugging Face repo name or the folder name.
- `context_length`: Max model length in tokens. Defaults to the model's set max. Only set this if you need to reduce KV cache allocation to fit into VRAM.
- `kv_cache_block_size`: Size of a KV block for the engine, in tokens. Defaults to 16.
- `user_data`: Optional dictionary containing custom metadata for worker behavior (e.g., LoRA configuration). Defaults to None.
See `examples/backends` for full code examples.
## Component names
A worker needs three names to register itself: namespace.component.endpoint
* *Namespace*: A pipeline. Usually a model, e.g. "llama_8b". Just a name.
* *Component*: A load balanced service needed to run that pipeline. "backend", "prefill", "decode", "preprocessor", "draft", etc. This typically has some configuration (which model to use, for example).
* *Endpoint*: Like a URL. "generate", "load_metrics".
* *Instance*: A process. Unique. Dynamo assigns each one a unique instance_id. The thing that is running is always an instance. Namespace/component/endpoint can refer to multiple instances.
If you run two models, that is two pipelines. An exception would be if doing speculative decoding. The draft model is part of the pipeline of a bigger model.
If you run two instances of the same model ("data parallel") they are the same namespace+component+endpoint but different instances. The router will spread traffic over all the instances of a namespace+component+endpoint. If you have four prefill workers in a pipeline, they all have the same namespace+component+endpoint and are automatically assigned unique instance_ids.
Example 1: Data parallel load balanced, one model one pipeline two instances.
```
Node 1: namespace: qwen3-32b, component: backend, endpoint: generate, model: /data/Qwen3-32B --tensor-parallel-size 2 --base-gpu-id 0
Node 2: namespace: qwen3-32b, component: backend, endpoint: generate, model: /data/Qwen3-32B --tensor-parallel-size 2 --base-gpu-id 2
```
Example 2: Two models, two pipelines.
```
Node 1: namespace: qwen3-32b, component: backend, endpoint: generate, model: /data/Qwen3-32B
Node 2: namespace: llama3-1-8b, component: backend, endpoint: generate, model: /data/Llama-3.1-8B-Instruct/
```
Example 3: Different endpoints.
The KV metrics publisher in vLLM adds a `load_metrics` endpoint to the current component. If the `llama3-1-8b.backend` component above is using patched vLLM, it will also expose `llama3-1-8b.backend.load_metrics`.
Example 4: Multiple components in a pipeline.
In the P/D disaggregated setup you would have `deepseek-distill-llama8b.prefill.generate` (possibly multiple instances of this) and `deepseek-distill-llama8b.decode.generate`.
## Migrate Ongoing Requests
A Python worker may need to be shut down promptly, for example when the node running the worker is to be reclaimed and there isn't enough time to complete all ongoing requests before the shutdown deadline.
In such cases, you can signal incomplete responses by raising a `GeneratorExit` exception in your generate loop. This will immediately close the response stream, signaling to the frontend that the stream is incomplete. With request migration enabled (see the [`migration_limit`](../fault_tolerance/request_migration.md) parameter), the frontend will automatically migrate the partially completed request to another worker instance, if available, to be completed.
> [!WARNING]
> We will replace the `GeneratorExit` exception with a new Dynamo exception. Expect a minor breaking code change in the near future.
Here's an example of how to implement this in your `RequestHandler`:
```python
class RequestHandler:
    async def generate(self, request):
        """Generate response, with support for request migration"""
        for result in self.engine.generate_streaming(request):
            # Check if we need to migrate before yielding each token
            if is_shutting_down():
                # Raising GeneratorExit closes the stream and triggers migration
                raise GeneratorExit("Worker shutting down, migrating request")
            yield result
```
When `GeneratorExit` is raised, the frontend receives the incomplete response and can seamlessly continue generation on another available worker instance, preserving the user experience even during worker shutdowns.
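The example above calls `is_shutting_down()` without defining it. One possible way to provide it is a process-wide flag set from a signal handler, sketched below. All three helper names are hypothetical, and whether a handler installed this way interacts cleanly with the signal handling the Dynamo runtime does on its own is not verified here.

```python
import signal
import threading

# Process-wide flag; threading.Event is safe to set from a signal handler
# and to read from the asyncio event loop thread.
_shutdown_requested = threading.Event()


def is_shutting_down() -> bool:
    """Return True once a shutdown has been requested."""
    return _shutdown_requested.is_set()


def request_shutdown(signum=None, frame=None) -> None:
    """Mark the worker as shutting down (usable as a signal handler)."""
    _shutdown_requested.set()


def install_shutdown_handler() -> None:
    """Hypothetical helper: flag shutdown on SIGTERM. Call from the main thread."""
    signal.signal(signal.SIGTERM, request_shutdown)
```

Calling `install_shutdown_handler()` once at startup would let the generate loop observe the flag between tokens.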
For more information about how request migration works, see the [Request Migration Architecture](../fault_tolerance/request_migration.md) documentation.
## Request Cancellation
Your Python worker's request handler can optionally support request cancellation by accepting a `context` argument after the `request` argument. This context object allows you to check for cancellation signals and respond appropriately:
```python
class RequestHandler:
    async def generate(self, request, context):
        """Generate response with cancellation support"""
        for result in self.engine.generate_streaming(request):
            # Check if the request has been cancelled
            if context.is_stopped():
                # Stop processing and clean up
                break
            yield result
```
The context parameter is optional - if your generate method doesn't include it in its signature, Dynamo will call your method without the context argument.
For detailed information about request cancellation, including async cancellation monitoring and context propagation patterns, see the [Request Cancellation Architecture](../fault_tolerance/request_cancellation.md) documentation.
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# JailedStream Implementation
## Overview
The `JailedStream` is a standalone implementation for handling "jail" detection in token streams. It provides a clean, builder-based API for accumulating tokens when certain sequences are detected, then releasing them as a single chunk when the jail ends.
## Key Features
- **Builder Pattern**: Clean configuration API using the builder pattern
- **Configurable Sequences**: Support for multiple start/end jail sequences
- **Tool Call Parsing**: Integrated tool call detection and parsing
- **Stream Macro**: Uses `async-stream::stream!` for clean async implementation
- **Standalone**: Completely independent of existing code
- **Annotations**: Preserves annotations for observability
## Implementation
### Location
- Main implementation: `lib/llm/src/protocols/openai/chat_completions/jail.rs`
- Examples: `lib/llm/src/protocols/openai/chat_completions/jail_example.rs`
### Usage
```rust
use std::pin::Pin;

use crate::protocols::openai::chat_completions::jail::JailedStream;
use dynamo_runtime::engine::{AsyncEngineContextProvider, ResponseStream};

// Get your ResponseStream with context
let response_stream: Pin<Box<ResponseStream<_>>> = get_stream_from_engine();

// Extract context BEFORE passing to apply
let context = response_stream.context();

// Apply jail transformation (ResponseStream implements Stream)
let jail = JailedStream::builder()
    .tool_call_parser("nemotron_deci")
    .build();
let jailed_stream = jail.apply(response_stream);

// Re-wrap with context when needed for engine consumption
let final_stream = ResponseStream::new(Box::pin(jailed_stream), context);
```
### Advanced Configuration
```rust
// With custom jail sequences
let jail = JailedStream::builder()
    .jail_start_sequence("<TOOLCALL>")
    .jail_end_sequence("</TOOLCALL>")
    .tool_call_parser("nemotron_deci")
    .build();

// With multiple sequences
let jail = JailedStream::builder()
    .jail_start_sequences(vec!["<TOOLCALL>", "<FUNCTION>"])
    .jail_end_sequences(vec!["</TOOLCALL>", "</FUNCTION>"])
    .tool_call_parser("harmony")
    .build();
```
## How It Works
1. **Detection**: When a jail start sequence (or tool call start) is detected, the stream enters "jail" mode
2. **Accumulation**: While jailed, tokens are accumulated in memory instead of being yielded
3. **Annotations**: Empty chunks with annotations are sent downstream for observability
4. **Release**: When a jail end sequence is detected OR the stream ends:
   - Accumulated content is parsed for tool calls
   - A single chunk with the parsed content is yielded
5. **Pass-through**: Non-jailed content passes through unchanged
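The detect, accumulate, and release steps can be illustrated with a small Python generator. This is a language-neutral sketch of the idea and not a port of the Rust implementation: it works on plain strings, releases the raw jailed text instead of parsed tool calls, omits annotations, and only detects a start sequence that arrives within a single chunk.

```python
def jailed(chunks, start="<TOOLCALL>", end="</TOOLCALL>"):
    """Yield chunks, holding back text between `start` and `end` as one chunk."""
    held = None  # None means "not jailed"; a string means "accumulating"
    for chunk in chunks:
        if held is None:
            if start not in chunk:
                yield chunk              # pass-through
                continue
            before, _, rest = chunk.partition(start)
            if before:
                yield before             # text ahead of the jail is not held
            held = start + rest          # detection: enter jail mode
        else:
            held += chunk                # accumulation
        if end in held:
            body, _, after = held.partition(end)
            yield body + end             # release as a single chunk
            held = None
            if after:
                yield after
    if held is not None:
        yield held                       # stream ended while jailed


stream = ["Hi ", "<TOOLCALL>", '[{"name"', ': "f"}]', "</TOOLCALL>", " done"]
print(list(jailed(stream)))
# ['Hi ', '<TOOLCALL>[{"name": "f"}]</TOOLCALL>', ' done']
```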
## Testing
The implementation includes comprehensive tests:
- `test_jailed_stream_with_start_end_sequences`: Tests explicit jail sequences
- `test_jailed_stream_with_tool_calls`: Tests tool call detection and parsing
- `test_jailed_stream_no_jailing`: Tests normal pass-through behavior
Run tests with:
```bash
cargo test -p dynamo-llm jail --lib
```
## Benefits
1. **Standalone**: No modifications to existing code required
2. **Clean API**: Builder pattern makes configuration intuitive
3. **Flexible**: Supports multiple jail detection strategies
4. **Maintainable**: Uses `stream!` macro for cleaner async code
5. **Testable**: Comprehensive test suite with shared utilities
6. **Efficient**: No unnecessary boxing or context handling in the library
7. **Composable**: Can chain multiple stream transformers before re-adding context
## Performance Optimizations
- **No Boxing in Library**: Returns `impl Stream` instead of `Pin<Box<ResponseStream>>`
- **Stack Pinning**: Uses `tokio::pin!()` instead of `Box::pin()` for better performance
- **No Context Overhead**: JailedStream doesn't manage AsyncEngineContext
- **Lazy Evaluation**: Only processes what's needed
- **Efficient State Management**: Minimal cloning, only when entering jail state
## Integration Options
To replace the existing `apply_tool_calling_jail_internal` function:
```rust
// In preprocessor.rs
pub fn apply_tool_calling_jail_with_parser(
    &self,
    stream: ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
) -> ManyOut<Annotated<NvCreateChatCompletionStreamResponse>> {
    let jail = JailedStream::builder()
        .tool_call_parser(self.tool_call_parser.clone())
        .build();
    jail.apply(stream)
}
```
## Future Enhancements
- Add support for regex patterns for jail sequences
- Add metrics/telemetry for jail detection
- Support for partial sequence matching across chunk boundaries
- Configurable accumulation limits
- Support for nested jails
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Dynamo Runtime
<h4>A Datacenter Scale Distributed Inference Serving Framework</h4>
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
Rust implementation of the Dynamo runtime system, enabling distributed computing capabilities for machine learning workloads.
## Prerequisites
### Install Rust and Cargo using [rustup](https://rustup.rs/):
```bash
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
```
### Build
```
cargo build
cargo test
```
### Start Dependencies
#### Docker Compose
The simplest way to deploy the pre-requisite services is using
[docker-compose](https://docs.docker.com/compose/install/linux/),
defined in [deploy/docker-compose.yml](../../deploy/docker-compose.yml).
```
# At the root of the repository:
docker compose -f deploy/docker-compose.yml up -d
```
This will deploy a [NATS.io](https://nats.io/) server and an [etcd](https://etcd.io/)
server used to communicate between and discover components at runtime.
#### Local (alternate)
To deploy the pre-requisite services locally instead of using `docker-compose`
above, you can manually launch each:
- [NATS.io](https://docs.nats.io/running-a-nats-service/introduction/installation) server with [JetStream](https://docs.nats.io/nats-concepts/jetstream)
  - example: `nats-server -js --trace`
- [etcd](https://etcd.io) server
  - follow instructions in [etcd installation](https://etcd.io/docs/v3.5/install/) to start an `etcd-server` locally
### Run Examples
When developing or running examples, any process or user that shares your core services (`etcd` and `nats.io`) will
be operating within your distributed runtime.
The current examples use a hard-coded `namespace`. We will address the `namespace` collisions later.
Most examples require `etcd` for service discovery. `nats.io` is required for KV-aware routing with event tracking; for approximate mode (`--no-kv-events`), NATS is optional.
#### Rust `hello_world`
With two terminals open, in one window:
```
cd examples/hello_world
cargo run --bin server
```
In the second terminal, execute:
```
cd examples/hello_world
cargo run --bin client
```
which should yield some output similar to:
```
Finished `dev` profile [unoptimized + debuginfo] target(s) in 6.25s
Running `target/debug/client`
Annotated { data: Some("h"), id: None, event: None, comment: None }
Annotated { data: Some("e"), id: None, event: None, comment: None }
Annotated { data: Some("l"), id: None, event: None, comment: None }
Annotated { data: Some("l"), id: None, event: None, comment: None }
Annotated { data: Some("o"), id: None, event: None, comment: None }
Annotated { data: Some(" "), id: None, event: None, comment: None }
Annotated { data: Some("w"), id: None, event: None, comment: None }
Annotated { data: Some("o"), id: None, event: None, comment: None }
Annotated { data: Some("r"), id: None, event: None, comment: None }
Annotated { data: Some("l"), id: None, event: None, comment: None }
Annotated { data: Some("d"), id: None, event: None, comment: None }
```
#### Python
See the [README.md](../../lib/runtime/lib/bindings/python/README.md) for details.
The Python and Rust `hello_world` client and server examples are interchangeable,
so you can start the Python `server.py` and talk to it from the Rust `client`.
direction: right
aggregated: Aggregated {
width: 600
height: 450
frontend: Frontend {
width: 180
height: 60
style.font-size: 20
}
router: Router {
width: 180
height: 60
style.font-size: 20
}
w1: "W1 (TP2)" {
width: 180
height: 60
style.font-size: 20
}
w2: "W2 (TP2)" {
width: 180
height: 60
style.font-size: 20
}
w3: "W3 (TP2)" {
width: 180
height: 60
style.font-size: 20
}
w4: "W4 (TP2)" {
width: 180
height: 60
style.font-size: 20
}
frontend -> router
router -> w1
router -> w2
router -> w3
router -> w4
note: |md
Each worker handles both prefill and decode.
|
note.style.font-size: 18
}
disaggregated: Disaggregated {
width: 600
height: 450
frontend: Frontend {
width: 180
height: 60
style.font-size: 20
}
router: Router {
width: 180
height: 60
style.font-size: 20
}
p1: "Prefill 1 (TP2)" {
width: 220
height: 60
style.font-size: 20
}
p2: "Prefill 2 (TP2)" {
width: 220
height: 60
style.font-size: 20
}
decode: "Decode (TP4)" {
width: 220
height: 60
style.font-size: 20
}
frontend -> router
router -> p1
router -> p2
p1 -> decode: "KV Cache via RDMA"
p2 -> decode: "KV Cache via RDMA"
note: |md
Prefill and decode on separate workers.
|
note.style.font-size: 18
}
aggregated.style.font-size: 24
disaggregated.style.font-size: 24
direction: down
q1: "AIC shows disagg > agg\nthroughput?" {
shape: diamond
}
q2: "RDMA available\nin cluster?" {
shape: diamond
}
q3: "ISL/OSL ratio > 8:1?" {
shape: diamond
}
q4: "Disagg > 20%\nfaster?" {
shape: diamond
}
agg: "AGGREGATED\nSimpler, no RDMA needed" {
shape: rectangle
width: 380
height: 80
}
disagg: "DISAGGREGATED\nHigher throughput, needs RDMA" {
shape: rectangle
width: 380
height: 80
}
q1 -> q2: Yes
q1 -> agg: No
q2 -> q3: Yes
q2 -> agg: No
q3 -> disagg: Yes
q3 -> q4: No
q4 -> disagg: Yes
q4 -> agg: No
direction: right
install: {
label: "Install"
shape: rectangle
install_detail: |md
pip3 install
aiconfigurator
|
}
configure: {
label: "Configure"
shape: rectangle
configure_detail: |md
Model, GPUs,
SLA targets
|
}
compare: {
label: "Compare"
shape: rectangle
compare_detail: |md
Agg vs disagg
rankings
|
}
deploy: {
label: "Deploy"
shape: rectangle
deploy_detail: |md
Apply DGD
manifest
|
}
validate: {
label: "Validate"
shape: rectangle
validate_detail: |md
Benchmark
with AIPerf
|
}
install -> configure
configure -> compare
compare -> deploy
deploy -> validate
direction: down
coord: "Coordination Layer" {
style.font-size: 32
direction: right
sd: "Service Discovery" {
style.font-size: 28
k8s: "K8s: CRDs + API" {
style.font-size: 22
shape: rectangle
}
bm: "Bare metal: etcd" {
style.font-size: 22
shape: rectangle
}
}
nats: "NATS (Optional)" {
style.font-size: 28
kv: "KV Cache Events" {
style.font-size: 22
shape: rectangle
}
rs: "Router Replica Sync" {
style.font-size: 22
shape: rectangle
}
js: "JetStream Persistence" {
style.font-size: 22
shape: rectangle
}
}
}
frontend: Frontend {
style.font-size: 28
shape: rectangle
}
plan: Planner {
style.font-size: 28
shape: rectangle
}
worker: Worker {
style.font-size: 28
shape: rectangle
}
coord -> frontend
coord -> plan
coord -> worker
direction: right
dr: "DistributedRuntime" {
style.font-size: 28
ns: "• Namespace" {
style.font-size: 22
shape: text
}
comp: "• Components" {
style.font-size: 22
shape: text
}
ep: "• Endpoints" {
style.font-size: 22
shape: text
}
}
lease: "Primary Lease\nTTL: 10s" {
style.font-size: 24
shape: rectangle
style.bold: true
}
etcd: etcd {
style.font-size: 28
shape: cylinder
}
dr -> lease
lease -> etcd: "Keep-Alive\nHeartbeat" {
style.font-size: 22
}
direction: right
aic: "AIC Output" {
shape: rectangle
c1: "concurrency: 56 (=14x4)" {
width: 280
}
c2: "ISL: 4000, OSL: 500" {
width: 280
}
c3: "Model: Qwen3-32B-FP8" {
width: 280
}
c4: "concurrency x ~14" {
width: 280
}
c5: "(best practice)" {
width: 280
}
}
aiperf: "AIPerf Argument" {
shape: rectangle
a1: "--concurrency 56" {
width: 320
}
a2: "--isl 4000 --osl 500" {
width: 320
}
a3: "-m Qwen/Qwen3-32B-FP8" {
width: 320
}
a4: "--num-requests 800" {
width: 320
}
a5: "--extra-inputs \"ignore_eos:true\"" {
width: 320
}
}
aic.c1 -> aiperf.a1
aic.c2 -> aiperf.a2
aic.c3 -> aiperf.a3
aic.c4 -> aiperf.a4
aic.c5 -> aiperf.a5
direction: down
planner: "Planner Component" {
style.font-size: 32
inputs: {
direction: right
style.border-radius: 8
mc: "Metric Collector\n(Prometheus)" {
style.font-size: 24
shape: rectangle
}
lp: "Load Predictor\n(ARIMA / Kalman / Prophet)" {
style.font-size: 24
shape: rectangle
}
pi: "Performance Interpolator\n(NPZ profiling data)" {
style.font-size: 24
shape: rectangle
}
}
sa: "Scaling Algorithm" {
style.font-size: 28
shape: rectangle
style.bold: true
}
connector: "Connector Layer" {
style.font-size: 28
direction: right
kc: "KubernetesConnector\n(PATCH DGD)" {
style.font-size: 24
shape: rectangle
}
vc: "VirtualConnector\n(Runtime bridge)" {
style.font-size: 24
shape: rectangle
}
}
inputs.mc -> sa
inputs.lp -> sa
inputs.pi -> sa
sa -> connector
}
../../examples/README.md
\ No newline at end of file
../../../../examples/custom_backend/hello_world/README.md
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2023-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
README.md
\ No newline at end of file
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Fault Tolerance
Dynamo provides comprehensive fault tolerance mechanisms to ensure reliable LLM inference in production deployments. This section covers the various strategies and features that enable Dynamo to handle failures gracefully and maintain service availability.
## Overview
Fault tolerance in Dynamo operates at multiple levels:
| Layer | Mechanism | Purpose |
|-------|-----------|---------|
| **Request** | Migration, Cancellation | Handle in-flight request failures |
| **Worker** | Health Checks, Graceful Shutdown | Detect and recover from worker failures |
| **System** | Load Shedding, Request Rejection | Prevent system overload |
| **Infrastructure** | etcd HA, NATS resilience | Handle infrastructure component failures |
## Key Features
### Request Migration
When a worker fails during request processing, Dynamo can migrate in-progress requests to healthy workers. The migration system:
- Preserves partial generation state (accumulated tokens)
- Transparently continues generation on a new worker
- Maintains seamless token flow to clients
See [Request Migration](request_migration.md) for details.
### Request Cancellation
Dynamo supports canceling in-flight requests to free computational resources:
- Graceful stop signals for clean termination
- Kill signals for immediate termination
- Hierarchical cancellation propagation through request chains
See [Request Cancellation](request_cancellation.md) for details.
### Graceful Shutdown
Workers handle shutdown signals (SIGTERM/SIGINT) gracefully:
- Immediately stop accepting new requests
- Optionally drain in-flight requests before terminating
- Clean up resources (engines, connections, temp files)
See [Graceful Shutdown](graceful_shutdown.md) for details.
### Request Rejection (Load Shedding)
When workers are overloaded, Dynamo rejects new requests to prevent cascading failures:
- Configurable busy thresholds based on KV cache utilization
- Real-time worker load monitoring
- HTTP 503 responses with retry guidance
See [Request Rejection](request_rejection.md) for details.
### Health Checks
Dynamo provides multiple health check mechanisms:
- **HTTP Endpoints**: `/health` and `/live` endpoints for orchestration
- **Canary Health Checks**: Active monitoring via periodic test requests
- **Engine Monitoring**: Automatic shutdown on engine failure detection
See [Health Checks](../observability/health-checks.md) for details.
## Configuration Quick Reference
| Feature | Environment Variable | Default |
|---------|---------------------|---------|
| Worker health port | `DYN_SYSTEM_PORT` | `9090` |
| Canary health checks | `DYN_HEALTH_CHECK_ENABLED` | `false` (K8s: `true`) |
| Canary wait time | `DYN_CANARY_WAIT_TIME` | `10` seconds |
| Health check timeout | `DYN_HEALTH_CHECK_REQUEST_TIMEOUT` | `3` seconds |
| Decode blocks threshold | `--active-decode-blocks-threshold` | None (disabled) |
| Prefill tokens threshold | `--active-prefill-tokens-threshold` | None (disabled) |
## Failure Scenarios and Recovery
### Worker Pod Restart
1. Worker receives SIGTERM from Kubernetes
2. Endpoints are immediately invalidated (no new requests)
3. In-flight requests complete or migrate (based on configuration)
4. Resources are cleaned up
5. Pod restarts with fresh state
### Worker Crash (Unexpected)
1. etcd lease expires (TTL-based detection)
2. Client discovers endpoint removal via etcd watch
3. New requests route to remaining healthy workers
4. In-flight requests on crashed worker are migrated (if enabled)
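The TTL-based detection in the steps above can be illustrated with a small model. This is a sketch under stated assumptions, not the etcd client API: `SketchLease` and `live_workers` are hypothetical names, time is passed in explicitly, and the 10-second TTL used in the usage note is only an illustrative value.

```python
from dataclasses import dataclass


@dataclass
class SketchLease:
    """Hypothetical lease model; a real deployment relies on etcd leases."""

    ttl: float       # seconds the lease survives without a keep-alive
    deadline: float  # absolute time at which the lease expires

    @classmethod
    def grant(cls, ttl: float, now: float) -> "SketchLease":
        return cls(ttl=ttl, deadline=now + ttl)

    def keep_alive(self, now: float) -> bool:
        """Extend the lease if it is still valid; return False once expired."""
        if now > self.deadline:
            return False
        self.deadline = now + self.ttl
        return True


def live_workers(leases: dict[str, SketchLease], now: float) -> list[str]:
    """Return the workers whose lease has not expired yet."""
    return sorted(name for name, lease in leases.items() if now <= lease.deadline)
```

A worker that crashes stops calling `keep_alive`, so it drops out of `live_workers(leases, now)` once its deadline passes. For example, with `SketchLease.grant(10.0, now=0.0)` and no keep-alive, the worker is no longer listed at `now=12.0`.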
### Network Partition
1. Worker loses connectivity to etcd/NATS
2. Lease keep-alive fails, lease eventually expires
3. Worker is removed from service discovery
4. Traffic reroutes to reachable workers
### GPU Failure
1. Engine health check detects GPU error (XID, OOM, etc.)
2. Worker initiates graceful shutdown
3. Runtime is shut down, engine cleaned up
4. Process exits with code 1 for pod restart
## Testing Fault Tolerance
Dynamo includes a comprehensive testing framework for validating fault tolerance:
- Request cancellation tests
- Migration tests with worker failures
- etcd HA failover tests
- Hardware fault injection (GPU XID, network partitions)
See [Fault Tolerance Testing](testing.md) for details.
## Related Documentation
- [Observability](../observability/README.md) - Metrics and monitoring
- [Distributed Runtime](../design_docs/distributed_runtime.md) - Service discovery architecture
- [Event Plane](../design_docs/event_plane.md) - etcd and NATS coordination
```{toctree}
:hidden:
Request Migration <request_migration>
Request Cancellation <request_cancellation>
Graceful Shutdown <graceful_shutdown>
Request Rejection <request_rejection>
Testing <testing>
```
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Graceful Shutdown
This document describes how Dynamo components handle shutdown signals to ensure in-flight requests complete successfully and resources are properly cleaned up.
## Overview
Graceful shutdown in Dynamo ensures that:
1. **No new requests are accepted** - Endpoints are immediately invalidated
2. **In-flight requests complete** - Existing requests finish processing (configurable)
3. **Resources are cleaned up** - Engines, connections, and temporary files are released
4. **Pods restart cleanly** - Exit codes signal Kubernetes for proper restart behavior
## Signal Handling
All Dynamo components handle Unix signals for graceful shutdown:
| Signal | Trigger | Behavior |
|--------|---------|----------|
| `SIGTERM` | Kubernetes pod termination | Graceful shutdown initiated |
| `SIGINT` | Ctrl+C / manual interrupt | Graceful shutdown initiated |
### Implementation
Each component registers signal handlers at startup:
```python
def signal_handler():
    asyncio.create_task(graceful_shutdown(runtime))

for sig in (signal.SIGTERM, signal.SIGINT):
    loop.add_signal_handler(sig, signal_handler)
```
The `graceful_shutdown()` function:
1. Logs the shutdown signal
2. Calls `runtime.shutdown()` to invalidate endpoints
3. Waits for in-flight requests (based on configuration)
4. Returns to allow cleanup to proceed
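The four steps can be sketched as follows. This is a minimal illustration rather than Dynamo's implementation: `FakeRuntime` and the `in_flight` parameter are hypothetical, the log messages are illustrative, and only the call to `runtime.shutdown()` comes from the description above.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FakeRuntime:
    """Hypothetical stand-in for the runtime; only shutdown() is modeled."""

    def __init__(self) -> None:
        self.shutdown_called = False

    def shutdown(self) -> None:
        # The real runtime invalidates endpoints here; the sketch records the call.
        self.shutdown_called = True


async def graceful_shutdown(runtime, in_flight=()) -> None:
    # Step 1: log the shutdown signal.
    logger.info("Received shutdown signal")
    # Step 2: invalidate endpoints so no new requests are accepted.
    runtime.shutdown()
    # Step 3: wait for in-flight work; failures are collected, not raised.
    if in_flight:
        await asyncio.gather(*in_flight, return_exceptions=True)
    # Step 4: return so the caller can run its cleanup code.
    logger.info("Shutdown complete")
```

Passing `return_exceptions=True` keeps one failed request from interrupting the wait for the others, which is one reasonable choice for a shutdown path.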
## Endpoint Draining
When `runtime.shutdown()` is called, endpoints are immediately invalidated so no new requests are accepted. The behavior for in-flight requests depends on the `graceful_shutdown` parameter when serving the endpoint.
### Configuration
When registering an endpoint, the `graceful_shutdown` parameter controls draining behavior:
```python
generate_endpoint.serve_endpoint(
    handler.generate,
    graceful_shutdown=True,  # Wait for all requests to finish
    metrics_labels=[("model", model_name)],
    health_check_payload=health_check_payload,
)
```
| `graceful_shutdown` | Behavior |
|---------------------|----------|
| `True` | Wait for all in-flight requests to complete before returning |
| `False` | Return immediately without waiting for requests |
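The difference between the two settings can be modeled with a toy endpoint. This sketch is an assumption-laden illustration, not the `serve_endpoint` implementation: `SketchEndpoint`, `submit`, and the return value of `shutdown` are all hypothetical.

```python
import asyncio


class SketchEndpoint:
    """Hypothetical endpoint model that tracks in-flight requests."""

    def __init__(self, graceful_shutdown: bool = True) -> None:
        self.graceful_shutdown = graceful_shutdown
        self.accepting = True
        self._in_flight: set = set()

    def submit(self, coro) -> asyncio.Task:
        if not self.accepting:
            coro.close()  # avoid a "never awaited" warning
            raise RuntimeError("endpoint invalidated")
        task = asyncio.ensure_future(coro)
        self._in_flight.add(task)
        task.add_done_callback(self._in_flight.discard)
        return task

    async def shutdown(self) -> int:
        """Invalidate the endpoint; return how many requests are still running."""
        self.accepting = False  # no new requests from this point on
        if self.graceful_shutdown and self._in_flight:
            # graceful_shutdown=True: drain before returning
            await asyncio.gather(*self._in_flight, return_exceptions=True)
        # graceful_shutdown=False: return immediately, requests may be unfinished
        return sum(1 for task in self._in_flight if not task.done())
```

With `graceful_shutdown=True` the sketch returns `0` because every request has finished; with `False` it reports the requests that were still running when `shutdown` returned.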
### Component-Specific Behavior
| Component | Default Behavior | Rationale |
|-----------|------------------|-----------|
| **Frontend** | N/A (HTTP server) | HTTP server handles its own shutdown |
| **Prefill Workers** | `graceful_shutdown=True` | Prefill operations must complete to avoid wasted computation |
| **Decode Workers** | `graceful_shutdown=True` | Decode operations should complete to avoid wasted computation |
| **Router** | `graceful_shutdown=True` | Ensure routing decisions complete |
### Migration Integration
Backend workers always use `graceful_shutdown=True`, meaning they wait for in-flight requests to complete until the engine is stopped. Request migration is configured at the **frontend** level via `--migration-limit`:
- When migration is enabled at the frontend, disconnected streams from failed workers are automatically retried on healthy workers
- Workers don't need to know about migration configuration - they simply complete their work or signal incomplete streams
- See [Request Migration Architecture](./request_migration.md) for details on how migration works
## Resource Cleanup
After endpoint draining, components clean up their resources in `finally` blocks:
### vLLM Worker Cleanup
```python
finally:
    logger.debug("Cleaning up worker")
    handler.cleanup()
```
The handler's `cleanup()` method:
- Removes temporary directories (LoRA adapters, etc.)
- Releases engine resources
### SGLang Worker Cleanup
```python
def cleanup(self) -> None:
    # Cancel pending consume tasks
    for task in self._consume_tasks:
        if not task.done():
            task.cancel()
    self._consume_tasks.clear()

    # Shutdown engine
    self.engine.shutdown()
```
### TensorRT-LLM Worker Cleanup
```python
async def cleanup(self):
    if self._llm:
        try:
            self._llm.shutdown()
        except Exception as e:
            logging.error(f"Error during cleanup: {e}")
        finally:
            self._llm = None
```
## Error-Initiated Shutdown
Workers can initiate graceful shutdown when fatal errors occur:
### Engine Health Monitoring (vLLM)
The `VllmEngineMonitor` continuously checks engine health:
```python
async def _check_engine_health(self):
    while True:
        try:
            await self.engine_client.check_health()
            await asyncio.sleep(HEALTH_CHECK_INTERVAL)  # 2 seconds
        except EngineDeadError as e:
            logger.error(f"Health check failed: {e}")
            self._shutdown_engine()
            self.runtime.shutdown()
            os._exit(1)
```
Configuration:
- `HEALTH_CHECK_INTERVAL`: 2 seconds between checks
- `ENGINE_SHUTDOWN_TIMEOUT`: 30 seconds max for engine shutdown
### Fatal Error Handling (TensorRT-LLM)
```python
async def _initiate_shutdown(self, error: Exception):
    logging.warning(f"Initiating graceful shutdown due to: {error}")

    try:
        if self.runtime:
            self.runtime.shutdown()
        if self.engine:
            await self.engine.cleanup()
    except Exception as cleanup_error:
        logging.error(f"Error during graceful shutdown: {cleanup_error}")
    finally:
        logging.critical("Forcing process exit for restart")
        os._exit(1)
```
## Kubernetes Integration
### Pod Termination Flow
1. Kubernetes sends `SIGTERM` to the pod
2. Dynamo initiates graceful shutdown
3. Pod has `terminationGracePeriodSeconds` to complete (default: 30s)
4. If not terminated, Kubernetes sends `SIGKILL`
### Recommended Configuration
For production deployments, configure adequate termination grace period:
```yaml
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
spec:
  services:
    VllmWorker:
      extraPodSpec:
        terminationGracePeriodSeconds: 60  # Allow time for request draining
```
### Health Check Integration
Kubernetes uses health endpoints to determine pod readiness:
- **During shutdown**: Endpoints become unavailable
- **Readiness probe fails**: Traffic stops routing to the pod
- **Graceful draining**: Existing requests complete
## Best Practices
### 1. Set Appropriate Grace Periods
Match `terminationGracePeriodSeconds` to your expected request completion time:
- Short requests (< 10s): 30s grace period
- Long generation (> 30s): 120s+ grace period
### 2. Enable Request Migration
Enable migration at the frontend to allow request recovery when workers shut down:
```bash
python3 -m dynamo.frontend ... --migration-limit 3 # Allow up to 3 migration attempts
```
This allows the frontend to automatically retry disconnected streams on healthy workers.
### 3. Monitor Shutdown Metrics
Track shutdown behavior via logs:
```
INFO Received shutdown signal, shutting down DistributedRuntime
INFO DistributedRuntime shutdown complete
DEBUG Cleaning up worker
```
### 4. Handle Cleanup Errors
Ensure cleanup methods handle errors gracefully:
```python
def cleanup(self):
    for resource in self.resources:
        try:
            resource.cleanup()
        except Exception as e:
            logger.warning(f"Cleanup failed: {e}")
            # Continue with other resources
```
## Related Documentation
- [Request Migration](request_migration.md) - How requests migrate during shutdown
- [Request Cancellation](request_cancellation.md) - Canceling in-flight requests
- [Health Checks](../observability/health-checks.md) - Liveness and readiness probes
# Request Cancellation Architecture
This document describes how Dynamo cancels in-flight requests between Dynamo workers. Cancellation lets in-flight requests terminate early, saving computational resources that would otherwise be spent on responses that are no longer needed.
## AsyncEngineContext Trait
At the core of Dynamo's request cancellation system is the `AsyncEngineContext` trait. This trait is associated with every request stream and provides lifecycle management for async operations, including stream identification, graceful shutdown capabilities, and immediate termination capabilities.
### Key Methods
#### Identification
- **`id()`**: Returns the unique identifier for the stream. This ID is set by the user for request identification, and the same ID can be used for sub-requests to associate them with the original user request.
#### Status Checking
- **`is_stopped()`**: Returns `true` if graceful cancellation has been requested via `stop_generating()`. This represents a signal to the worker that the request has been cancelled and it should return early.
- **`is_killed()`**: Returns `true` if a hard stop has been issued via `kill()`. This typically indicates that the network connection between client and server has been cut or an immediate termination is required.
#### Async Status Monitoring
- **`stopped()`**: An async method that completes when the context becomes stopped. If already stopped, returns immediately.
- **`killed()`**: An async method that completes when the context becomes killed. If already killed, returns immediately.
#### Cancellation Control
- **`stop_generating()`**: The recommended method for cancelling a request. This informs the engine to stop producing results for the stream gracefully. This method is idempotent and does not invalidate results currently in the stream.
- **`stop()`**: Alias for `stop_generating()`.
- **`kill()`**: Extends `stop_generating()` but also indicates a preference to terminate without draining remaining items in the stream. This is implementation-specific and may not be supported by all engines.
#### Child Request Management
- **`link_child(child: Arc<dyn AsyncEngineContext>)`**: Links a child `AsyncEngineContext` to this context. When `stop_generating()`, `stop()`, or `kill()` is called on the parent context, the same method is automatically called on all linked child contexts in the order they were linked. This is especially useful in disaggregated serving scenarios where a frontend receives cancellation notification and needs to cancel requests to workers, and the worker can then cancel its sub-requests (e.g., remote prefill operations).
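The propagation rule for linked children can be modeled in a few lines. The sketch below is a Python model of the semantics described above, not the Rust trait or the Python `Context` binding: `SketchContext` is a hypothetical class, and it assumes that a killed context also reports `is_stopped()` because `kill()` is described as extending `stop_generating()`.

```python
class SketchContext:
    """Hypothetical model of stop/kill propagation to linked children."""

    def __init__(self, request_id: str) -> None:
        self._id = request_id
        self._stopped = False
        self._killed = False
        self._children: list = []

    def id(self) -> str:
        return self._id

    def is_stopped(self) -> bool:
        return self._stopped

    def is_killed(self) -> bool:
        return self._killed

    def link_child(self, child: "SketchContext") -> None:
        self._children.append(child)  # order of linking is preserved

    def stop_generating(self) -> None:
        self._stopped = True  # idempotent: repeated calls change nothing
        for child in self._children:
            child.stop_generating()

    def kill(self) -> None:
        self._stopped = True  # assumption: kill implies stopped
        self._killed = True
        for child in self._children:
            child.kill()
```

Linking a frontend context to a worker context, and the worker context to a remote prefill context, means a single `stop_generating()` on the frontend context reaches all three in link order.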
### Thread Safety
The `AsyncEngineContext` trait ensures thread-safety with `Send + Sync` bounds, allowing safe concurrent access across multiple threads and async tasks.
## Python Bindings
The `AsyncEngineContext` functionality is exposed to Python through the `Context` class, which provides a largely one-to-one mapping from Rust methods to Python methods.
### Python Context Class
The Python `Context` class wraps the Rust `AsyncEngineContext` and exposes the following methods:
- **`id()`**: Returns the unique identifier for the context
- **`is_stopped()`**: Synchronous method equivalent to the Rust `is_stopped()`
- **`is_killed()`**: Synchronous method equivalent to the Rust `is_killed()`
- **`stop_generating()`**: Issues a stop generating signal, equivalent to the Rust method
- **`async_killed_or_stopped()`**: An async method that completes when the context becomes either killed or stopped, whichever happens first. This combines the functionality of the Rust `killed()` and `stopped()` async methods using `tokio::select!`.
For a working example of request cancellation, see the [cancellation demo](../../examples/custom_backend/cancellation/README.md).
### Context Usage in Python
The context is optionally available for both incoming and outgoing requests:
#### Incoming Requests
For incoming requests, the generate method may optionally accept a `context` argument after the `request` argument. If the `context` parameter is specified in the method signature, it will receive the context object of the incoming request. Request handlers can:
- Check for cancellation synchronously using `context.is_stopped()` before beginning expensive operations
- Listen for cancellation asynchronously using `await context.async_killed_or_stopped()`
Example:
```python
async def generate(self, request, context):
    for i in range(1000):
        # Check for cancellation before expensive work
        if context.is_stopped():
            raise asyncio.CancelledError

        # Perform work...
        result = await expensive_computation()
        yield result
```
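The asynchronous variant, listening with `await context.async_killed_or_stopped()`, can be sketched by racing the work against the cancellation signal. This is one possible pattern, not a prescribed one: `FakeContext` and `run_unless_cancelled` are hypothetical helpers, and only the method names `stop_generating`, `is_stopped`, and `async_killed_or_stopped` are taken from the list above.

```python
import asyncio


class FakeContext:
    """Hypothetical stand-in for the Context object, backed by an asyncio.Event."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def stop_generating(self) -> None:
        self._event.set()

    def is_stopped(self) -> bool:
        return self._event.is_set()

    async def async_killed_or_stopped(self) -> None:
        await self._event.wait()


async def run_unless_cancelled(context, coro):
    """Await coro, but abandon it as soon as the context is stopped or killed."""
    work = asyncio.ensure_future(coro)
    cancelled = asyncio.ensure_future(context.async_killed_or_stopped())
    done, _pending = await asyncio.wait(
        {work, cancelled}, return_when=asyncio.FIRST_COMPLETED
    )
    if work in done:
        cancelled.cancel()  # the listener is no longer needed
        return work.result()
    work.cancel()  # stop the expensive work early
    raise asyncio.CancelledError
```

Compared with polling `is_stopped()` between steps, this pattern can interrupt a single long `await` instead of waiting for it to finish.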
#### Outgoing Requests
For outgoing requests, Python scripts may optionally pass a context object as a keyword argument to the runtime endpoint client's routing methods (such as `generate`, `round_robin`, `random`, and `direct`). The script can then cancel the outgoing request through that context object.
This is especially useful when child outgoing requests need to be cancelled when the parent incoming request is cancelled. In such cases, the script can simply pass the incoming context object to the outgoing request, automatically linking the cancellation behavior.
Example:
```python
async def generate(self, request, context):
    # Forward the incoming context to outgoing request
    # If the incoming request is cancelled, the outgoing request will be too
    stream = await self.client.generate(request, context=context)
    async for response in stream:
        yield response
```
This design enables seamless cancellation propagation through multi-tier request chains, ensuring that when a client cancels a request, all associated sub-requests are automatically cancelled, saving computational resources across the entire request pipeline.
# Request Migration Architecture
This document describes how Dynamo implements request migration to handle worker failures gracefully during LLM text generation. Request migration allows in-progress requests to continue on different workers when the original worker becomes unavailable, providing fault tolerance and improved user experience.
## Overview
Request migration is implemented through a Migration operator that sits in the LLM processing pipeline between the Backend operator and the service backend. When a worker fails during request processing, the migration system preserves the partial generation state and recreates the request on a new worker to continue from where the previous worker left off.
## Architecture Components
### Migrator
The migration system is integrated into the LLM processing pipeline between the frontend preprocessing and the actual service backends. This positioning allows it to intercept all communication flows and manage failure scenarios transparently.
Key responsibilities:
- Intercepts all requests and responses flowing through the pipeline
- Detects worker failure scenarios through error pattern matching
- Manages retry logic with configurable migration limits
- Tracks partial response state for seamless continuation
### Migration Limit Configuration
The migration limit is configured at the **frontend** level and applies globally to all models served by that frontend. This parameter specifies the maximum number of times a request can be migrated to another worker:
- Default behavior: no migration allowed (migration_limit=0)
- Set via `--migration-limit` flag on the frontend
- Applies to all models served by the frontend
## Token State Tracking and Request Migration
The core of the migration system is the ability to preserve and continue partial generations through token state management. This ensures that when a worker fails mid-generation, the new worker can seamlessly continue from the exact point of failure.
### Token Accumulation Process
When a request is being processed and responses are flowing back from a worker, the migration system tracks every token that has been successfully generated:
1. **Initial Request State**: The system starts with the original preprocessed request containing the initial prompt tokens.
2. **Response Tracking**: As each response arrives from the worker, the migration system extracts the newly generated tokens and appends them to the request's token sequence. The sequence therefore accumulates every token generated so far.
3. **Token Count Management**: The system also updates the remaining token budget to reflect the number of tokens already generated, ensuring that the total generation stays within the originally requested limits.
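The accumulation steps can be illustrated with a small model. This is a sketch under assumptions, not the Migration operator's code: `SketchRequest`, its field names, and `track_response` are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SketchRequest:
    """Hypothetical request state: prompt plus generated tokens, and a budget."""

    token_ids: List[int]  # starts as the prompt tokens
    max_tokens: int       # remaining generation budget


def track_response(request: SketchRequest, new_token_ids: List[int]) -> None:
    """Fold one worker response into the request state."""
    # Response tracking: append the newly generated tokens.
    request.token_ids.extend(new_token_ids)
    # Token count management: shrink the budget by what was already produced.
    request.max_tokens = max(0, request.max_tokens - len(new_token_ids))
```

After a prompt of three tokens with a budget of ten, tracking a two-token response leaves five tokens in `token_ids` and a budget of eight, so a retried request cannot exceed the originally requested limit.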
### Migration Trigger Scenarios
The migration system handles two distinct failure scenarios:
#### 1. New Request Migration (Initial Connection Failure)
**Scenario**: Worker is unreachable when creating the initial connection.
**Error Pattern**: Communication system reports chosen worker instance is unavailable.
**Migration Process**:
- Detects connection failure during initial stream setup
- Decrements migration retry count
- Attempts to create a new stream with the original request
- No partial state to preserve since generation hasn't started
#### 2. Ongoing Request Migration (Mid-Stream Disconnection)
**Scenario**: Connection lost during active generation after partial responses have been received.
**Error Pattern**: Stream termination detected before generation completion.
**Migration Process**:
1. **Failure Detection**: The system detects the stream disconnection through error monitoring.
2. **State Preservation**: At this point, the request's token sequence contains both the original prompt tokens and all successfully generated tokens from the failed worker.
3. **New Stream Creation**: A fresh stream is created with the accumulated request state, ensuring the new worker has complete context.
4. **Continuation**: The new worker receives the request with the full token context and continues generation from the exact point where the previous worker left off.
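The mid-stream migration loop can be sketched with toy workers. Everything here is a hypothetical model rather than Dynamo's implementation: `WorkerDisconnected`, `make_worker`, and `generate_with_migration` are illustrative names, and workers are plain Python generators instead of network streams.

```python
from itertools import cycle
from typing import Callable, Iterator, List


class WorkerDisconnected(Exception):
    """Hypothetical error standing in for a mid-stream disconnection."""


Worker = Callable[[List[int], int], Iterator[int]]


def make_worker(fail_after=None) -> Worker:
    """Build a toy worker that emits len(context) + i as its i-th token."""

    def worker(context: List[int], budget: int) -> Iterator[int]:
        for i in range(budget):
            if fail_after is not None and i == fail_after:
                raise WorkerDisconnected("stream ended before completion")
            yield len(context) + i

    return worker


def generate_with_migration(prompt, max_tokens, workers, migration_limit):
    tokens = list(prompt)    # prompt plus every token generated so far
    remaining = max_tokens   # token budget still available
    migrations_left = migration_limit
    for worker in cycle(workers):
        try:
            # The worker receives the accumulated sequence, not just the prompt.
            for token in worker(list(tokens), remaining):
                tokens.append(token)
                remaining -= 1
                yield token
            return  # stream completed normally
        except WorkerDisconnected:
            if migrations_left == 0:
                raise  # limit exhausted: surface the failure to the caller
            migrations_left -= 1
```

With a first worker that fails after two tokens and a healthy second worker, the caller sees one continuous token stream; with `migration_limit=0` the same failure is raised to the caller instead.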
### Seamless Token Flow and Request State Evolution
From the client's perspective, the token stream appears continuous and uninterrupted. The client receives tokens from the first worker until failure occurs, then seamlessly continues receiving tokens from the backup worker without any indication of the underlying migration.
The request state evolves dynamically during processing. Initially, the request contains only the original prompt tokens. As generation proceeds, each successfully generated token is appended to the request's token sequence, creating a growing record of the complete conversation context.
When a migration occurs, this accumulated state is transferred to the new worker, which uses it to reconstruct the complete context. The new worker then continues generation as if it had been processing the request from the beginning, but starting from the current position in the sequence.
The migration is transparent because:
1. No tokens are lost or duplicated during the transition
2. The new worker has complete context via the accumulated token sequence
3. Generation continues from the exact failure point
4. Response streaming maintains consistent format and timing
This token accumulation mechanism ensures that migrations are truly seamless, preserving all computational work and maintaining generation quality across worker transitions.
## Benefits
1. **Fault Tolerance**: System continues operating during individual worker failures
2. **Resource Efficiency**: Partial generations are preserved rather than restarted
3. **Seamless User Experience**: Users experience no interruption during worker failures
4. **Configurable Behavior**: Migration limits allow tuning based on deployment requirements
5. **No Token Loss**: Complete preservation of generation state across migrations
## Design Considerations
The migration system is designed with several important architectural considerations:
**Multi-Model Support**: Since a frontend may serve multiple models simultaneously, the migration limit is configured at the frontend level and applies uniformly to all models, simplifying operational management.
**State Management**: The system carefully tracks not only token sequences but also metadata such as remaining token budgets, stop conditions, and sampling parameters to ensure complete state preservation.
**Error Handling**: The migration system distinguishes between different types of failures and applies appropriate recovery strategies for each scenario.
## Monitoring and Metrics
The migration system exposes Prometheus metrics to monitor migration activity. These metrics are available on the frontend's `/metrics` endpoint (default port 8000):
- `dynamo_frontend_model_migration_total`: Counter tracking the total number of request migrations
  - Labels:
    - `model`: The model name being served
    - `migration_type`: Either `new_request` (initial connection failure) or `ongoing_request` (mid-stream disconnection)
**Example metrics output:**
```
dynamo_frontend_model_migration_total{migration_type="ongoing_request",model="Qwen/Qwen3-0.6B"} 3
dynamo_frontend_model_migration_total{migration_type="new_request",model="Qwen/Qwen3-0.6B"} 1
```
These metrics can be used to:
- Monitor worker reliability and failure patterns
- Alert on excessive migration rates indicating infrastructure issues
- Track the effectiveness of fault tolerance mechanisms
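As one way to act on these metrics outside Prometheus, the sketch below parses text scraped from `/metrics` and totals migrations per model and migration type. The function name `migration_counts` is hypothetical, and the parser is deliberately simplified: it assumes label values contain no `}` or escaped quotes, which holds for the example output above but not for every valid exposition.

```python
import re
from collections import defaultdict
from typing import Dict, Tuple

METRIC_NAME = "dynamo_frontend_model_migration_total"

# Simplified patterns for `name{label="value",...} number` sample lines.
SAMPLE_RE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\{([^}]*)\}\s+(\S+)$')
LABEL_RE = re.compile(r'(\w+)="([^"]*)"')


def migration_counts(metrics_text: str) -> Dict[Tuple[str, str], float]:
    """Return {(model, migration_type): count} for the migration counter."""
    counts: Dict[Tuple[str, str], float] = defaultdict(float)
    for raw_line in metrics_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = SAMPLE_RE.match(line)
        if match is None or match.group(1) != METRIC_NAME:
            continue
        labels = dict(LABEL_RE.findall(match.group(2)))
        key = (labels.get("model", ""), labels.get("migration_type", ""))
        counts[key] += float(match.group(3))
    return dict(counts)
```

A large `ongoing_request` count relative to `new_request` suggests that workers are failing mid-stream rather than at connection time.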
For more information on Dynamo metrics, see the [Metrics documentation](../observability/metrics.md).
## Operational Impact
Request migration fundamentally changes how the system handles failures, moving from a "fail-fast" approach to a "graceful degradation" model. This architectural shift enables higher availability and better resource utilization while maintaining the same external API contract for clients.
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Request Rejection (Load Shedding)
This document describes how Dynamo implements request rejection to prevent system overload and maintain service stability under high load conditions.
## Overview
Request rejection (also known as load shedding) is a fault tolerance mechanism that proactively rejects new requests when workers are overloaded. This prevents:
- Cascading failures from resource exhaustion
- Degraded latency for all requests
- Out-of-memory conditions on GPU workers
When all workers exceed their configured busy thresholds, new requests receive an HTTP 503 (Service Unavailable) response, signaling clients to retry later.
## Architecture
```
                                  ┌─────────────────┐
                                  │ Worker Monitor  │
                                  │  (Background)   │
                                  └────────┬────────┘
                                           │ Updates busy list
┌──────────┐    ┌──────────┐    ┌─────────────────────┐    ┌──────────┐
│  Client  │───▶│ Frontend │───▶│     Push Router     │───▶│  Worker  │
└──────────┘    └──────────┘    │ (checks busy list)  │    └──────────┘
                                └─────────────────────┘
                                           │ If all workers busy
                                ┌─────────────────────┐
                                │   HTTP 503 Error    │
                                │ "All workers busy"  │
                                └─────────────────────┘
```
## Configuration
### Frontend Arguments
Configure busy thresholds when starting the frontend:
```bash
python -m dynamo.frontend \
  --active-decode-blocks-threshold 0.85 \
  --active-prefill-tokens-threshold 10000
```
| Argument | Type | Description |
|----------|------|-------------|
| `--active-decode-blocks-threshold` | float (0.0-1.0) | KV cache block utilization threshold |
| `--active-prefill-tokens-threshold` | int | Prefill token count threshold |
### Dynamic Configuration via API
Thresholds can be adjusted at runtime via the `/busy_threshold` endpoint:
#### Set Thresholds
```bash
curl -X POST http://localhost:8000/busy_threshold \
  -H "Content-Type: application/json" \
  -d '{
    "model": "Qwen/Qwen3-0.6B",
    "active_decode_blocks_threshold": 0.85,
    "active_prefill_tokens_threshold": 10000
  }'
```
#### Get Current Thresholds
```bash
curl http://localhost:8000/busy_threshold
```
Response:
```json
{
  "thresholds": [
    {
      "model": "Qwen/Qwen3-0.6B",
      "active_decode_blocks_threshold": 0.85,
      "active_prefill_tokens_threshold": 10000
    }
  ]
}
```
## Busy Detection Logic
Workers are marked as "busy" based on a dual-threshold system. A worker is considered busy when **either** threshold is exceeded.
### KV Cache Block Threshold
Monitors the percentage of KV cache blocks in use:
```
busy = active_decode_blocks / kv_total_blocks > threshold
```
Example: With `active_decode_blocks_threshold=0.85`, a worker using 87% of its KV cache blocks is marked busy.
### Prefill Token Threshold
Monitors the number of tokens currently being prefilled:
```
busy = active_prefill_tokens > threshold
```
Example: With `active_prefill_tokens_threshold=10000`, a worker prefilling 12,000 tokens is marked busy.
### Data-Parallel Rank Aggregation
For workers with multiple data-parallel (DP) ranks, the worker is marked busy only if **ALL** ranks are busy:
```python
def is_busy(worker):
    # A worker is busy only when every data-parallel rank reports busy
    return all(rank.is_busy() for rank in worker.dp_ranks)
```
This prevents false positives when only some ranks are temporarily loaded.
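Putting the two thresholds and the rank aggregation together, the following is a minimal sketch of the busy check. `RankLoad`, `Thresholds`, `rank_is_busy`, and `worker_is_busy` are hypothetical names, not Dynamo's API. The sketch also assumes that an unset threshold (`None`) disables that check, and that a worker reporting no ranks is not treated as busy.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RankLoad:
    active_decode_blocks: int
    kv_total_blocks: int
    active_prefill_tokens: int


@dataclass
class Thresholds:
    # Assumption: None means the corresponding check is disabled.
    active_decode_blocks_threshold: Optional[float] = None
    active_prefill_tokens_threshold: Optional[int] = None


def rank_is_busy(load: RankLoad, thresholds: Thresholds) -> bool:
    """A rank is busy when EITHER configured threshold is exceeded."""
    blocks_limit = thresholds.active_decode_blocks_threshold
    blocks_busy = (
        blocks_limit is not None
        and load.kv_total_blocks > 0  # avoid dividing by zero
        and load.active_decode_blocks / load.kv_total_blocks > blocks_limit
    )
    prefill_limit = thresholds.active_prefill_tokens_threshold
    prefill_busy = (
        prefill_limit is not None
        and load.active_prefill_tokens > prefill_limit
    )
    return blocks_busy or prefill_busy


def worker_is_busy(ranks: List[RankLoad], thresholds: Thresholds) -> bool:
    """A worker is busy only when ALL of its data-parallel ranks are busy."""
    return bool(ranks) and all(rank_is_busy(r, thresholds) for r in ranks)
```

With thresholds of 0.85 and 10000, a rank using 87 of 100 blocks is busy, as is a rank prefilling 12,000 tokens. A worker with one such rank and one idle rank is still available for routing.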
## Worker Load Monitoring
The `KvWorkerMonitor` runs as a background task that:
1. Subscribes to KV cache metrics events from workers
2. Maintains load state for each worker instance
3. Recalculates busy instances when metrics change
4. Updates the router with the current busy list
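The four steps above can be approximated with the following sketch. It is not the actual `KvWorkerMonitor`: the `BusyListTracker` class and its `publish` callback are hypothetical, event subscription is replaced by direct calls to `on_metrics`, and each worker is treated as having a single rank for brevity.

```python
from typing import Callable, Dict, List


class BusyListTracker:
    """Keeps the latest load per worker and publishes the busy list on change."""

    def __init__(self, decode_threshold: float, prefill_threshold: int,
                 publish: Callable[[List[str]], None]) -> None:
        self._decode_threshold = decode_threshold
        self._prefill_threshold = prefill_threshold
        self._publish = publish  # stands in for "update the router"
        self._loads: Dict[str, Dict[str, int]] = {}
        self._busy: List[str] = []

    def _is_busy(self, load: Dict[str, int]) -> bool:
        total = load["kv_total_blocks"]
        blocks_busy = (
            total > 0
            and load["active_decode_blocks"] / total > self._decode_threshold
        )
        prefill_busy = load["active_prefill_tokens"] > self._prefill_threshold
        return blocks_busy or prefill_busy

    def on_metrics(self, worker_id: str, load: Dict[str, int]) -> None:
        """Handle one metrics event: store it, recompute, publish if changed."""
        self._loads[worker_id] = load
        busy = sorted(w for w, l in self._loads.items() if self._is_busy(l))
        if busy != self._busy:
            self._busy = busy
            self._publish(list(busy))
```

Publishing only when the busy list changes keeps router updates proportional to state transitions rather than to the metrics event rate. Whether the real monitor does the same is not stated in this document.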
### Metrics Collected
Workers publish these metrics for monitoring:
| Metric | Description |
|--------|-------------|
| `active_decode_blocks` | Number of KV cache blocks currently in use |
| `kv_total_blocks` | Total KV cache blocks available |
| `active_prefill_tokens` | Number of tokens currently being prefilled |
## Rejection Behavior
### Request Flow
1. Request arrives at frontend
2. Push router checks if busy threshold is configured
3. If configured, router retrieves list of free (non-busy) instances
4. If no free instances exist (but instances are registered):
   - Request is rejected with `PipelineError::ServiceOverloaded`
   - HTTP 503 response is returned to client
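The decision in this flow can be sketched in Python as follows. The `ServiceOverloaded` exception and `select_instances` function are hypothetical stand-ins for the router's behavior (the real router raises `PipelineError::ServiceOverloaded`). How the router behaves when no instances are registered at all is not described here, so the sketch simply returns an empty list in that case.

```python
from typing import Iterable, List, Set


class ServiceOverloaded(Exception):
    """Hypothetical stand-in for PipelineError::ServiceOverloaded."""


def select_instances(instances: Iterable[str], busy: Set[str],
                     threshold_configured: bool) -> List[str]:
    """Return the instances eligible for routing, or reject the request."""
    registered = list(instances)
    if not threshold_configured:
        # No busy threshold configured: every instance stays eligible.
        return registered
    free = [instance for instance in registered if instance not in busy]
    if registered and not free:
        # Instances exist but all are busy: reject (surfaced as HTTP 503).
        raise ServiceOverloaded("All workers are busy, please retry later")
    # Assumption: with no registered instances, return an empty list and
    # leave the handling to the caller.
    return free
```

For example, `select_instances(["a", "b"], {"a"}, True)` returns `["b"]`, while passing `{"a", "b"}` as the busy set raises `ServiceOverloaded`.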
### Error Response
When requests are rejected, clients receive:
```http
HTTP/1.1 503 Service Unavailable
Content-Type: application/json

{
  "message": "Service temporarily unavailable: All workers are busy, please retry later",
  "type": "service_unavailable",
  "code": 503
}
```
### Client Retry Strategy
Clients should implement exponential backoff when receiving 503 responses:
```python
import random
import time


def send_with_retry(request, max_retries=5):
    """Retry on HTTP 503; `send_request` is a placeholder for your HTTP client call."""
    for attempt in range(max_retries):
        response = send_request(request)
        if response.status_code != 503:
            return response
        if attempt < max_retries - 1:
            # Exponential backoff with jitter, capped at 60 seconds
            wait_time = min(60, (2 ** attempt) + random.uniform(0, 1))
            time.sleep(wait_time)
    raise RuntimeError("Max retries exceeded")
```
## Monitoring
### Prometheus Metrics
Track rejection behavior with these metrics:
| Metric | Type | Description |
|--------|------|-------------|
| `dynamo_tasks_rejected_total` | Counter | Total number of rejected tasks |
| `dynamo_queued_requests` | Gauge | Requests waiting in HTTP queue |
### Example Prometheus Queries
```promql
# Rejection rate over 5 minutes
rate(dynamo_tasks_rejected_total[5m])
# Percentage of requests rejected
sum(rate(dynamo_tasks_rejected_total[5m])) /
sum(rate(dynamo_tasks_issued_total[5m])) * 100
```
### Grafana Alerting
Example alert for a high rejection rate, written in Prometheus alerting-rule syntax:
```yaml
alert: HighRequestRejectionRate
expr: |
  sum(rate(dynamo_tasks_rejected_total[5m])) /
  sum(rate(dynamo_tasks_issued_total[5m])) > 0.1
for: 5m
labels:
  severity: warning
annotations:
  summary: "High request rejection rate"
  description: "More than 10% of requests are being rejected"
```
## Tuning Thresholds
### Conservative Settings (Latency-Focused)
For applications prioritizing low latency:
```bash
--active-decode-blocks-threshold 0.70
--active-prefill-tokens-threshold 5000
```
- Rejects earlier, before workers become fully loaded
- Maintains lower queue depths
- Better tail latencies
### Aggressive Settings (Throughput-Focused)
For applications prioritizing throughput:
```bash
--active-decode-blocks-threshold 0.95
--active-prefill-tokens-threshold 20000
```
- Allows higher worker utilization
- May increase latency variability
- Better overall throughput
### Disabled (No Rejection)
To disable request rejection entirely:
```bash
# Simply don't set the threshold arguments
python -m dynamo.frontend
```
Without thresholds configured, all requests are accepted regardless of worker load.
## Best Practices
### 1. Start Conservative, Then Tune
Begin with conservative thresholds and increase based on observed behavior:
```bash
# Start here
--active-decode-blocks-threshold 0.75
# Increase if rejection rate is too high
--active-decode-blocks-threshold 0.85
```
### 2. Monitor Before Enabling
Observe worker load patterns before setting thresholds:
```bash
# Watch KV cache utilization
watch -n 1 'curl -s localhost:8000/metrics | grep kv_blocks'
```
### 3. Use Both Thresholds for Disaggregated Serving
In disaggregated deployments:
- Use `active_prefill_tokens_threshold` for prefill workers
- Use `active_decode_blocks_threshold` for decode workers
### 4. Coordinate with Autoscaling
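If using the Kubernetes Horizontal Pod Autoscaler (HPA), set the rejection thresholds above the utilization at which autoscaling triggers, so that scale-out begins before requests are rejected: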
```yaml
# HPA triggers at 70% utilization
# Rejection at 85% provides buffer
--active-decode-blocks-threshold 0.85
```
## Related Documentation
- [Health Checks](../observability/health-checks.md) - Worker health monitoring
- [Metrics](../observability/metrics.md) - Available Prometheus metrics
- [Request Migration](request_migration.md) - Handling failed requests