"vscode:/vscode.git/clone" did not exist on "f56483cbac97db1919b97e6a490cbc22ab3edb8a"
Commit 8ebfd1e4 authored by Hongkuan Zhou, committed by GitHub

feat: global router for hierarchical planner (#5697)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent 903f8184
@@ -102,6 +102,7 @@ deploy:
 planner:
 - 'components/src/dynamo/planner/**'
 - 'tests/planner/**'
+- 'components/src/dynamo/global_router/**'
 vllm:
 - 'container/Dockerfile.vllm'
@@ -26,6 +26,8 @@ CODEOWNERS @ai-dynamo/Devops
 # Planner
 /components/src/dynamo/planner/ @ai-dynamo/python-codeowners @ai-dynamo/Devops
+/components/src/dynamo/global_router/ @ai-dynamo/python-codeowners @ai-dynamo/Devops
+/examples/hierarchical_planner/ @ai-dynamo/python-codeowners @ai-dynamo/Devops
 /benchmarks/profiler/ @ai-dynamo/python-codeowners @ai-dynamo/Devops
 /tests/planner/ @ai-dynamo/python-codeowners @ai-dynamo/Devops
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
-->
# Global Router
A hierarchical routing service that sits between the Dynamo frontend and local routers in different pool namespaces. The global router enables disaggregated serving with flexible pool selection based on request characteristics.
## Overview
The Global Router acts as both a prefill and decode worker from the frontend's perspective:
- Registers with `ModelType.Prefill` for prefill requests
- Registers with `ModelType.Chat | ModelType.Completions` for decode requests
Internally, it routes requests to local routers in different namespaces based on a configurable grid-based selection strategy.
## Supported Backends
- **vLLM** - Uses synchronous prefill path (frontend waits for prefill to complete)
- **Mocker** - Uses same synchronous path as vLLM
**Not supported:**
- **SGLang** - Bootstrap path (async KV transfer) not implemented
- **TensorRT-LLM** - Bootstrap path not implemented
## Architecture
```
Frontend
    |
    v
Global Router (registers as both prefill + decode)
    |
    +---> Prefill Pool 0 (namespace: prefill_pool_0)
    |         |
    |         +---> Local Router ---> Prefill Worker 0
    |                            |
    |                            +---> Prefill Worker 1
    |                            |
    |                            +---> ...
    +---> Prefill Pool ...
    |
    +---> Decode Pool 0 (namespace: decode_pool_0)
    |         |
    |         +---> Local Router ---> Decode Worker 0
    |                            |
    |                            +---> Decode Worker 1
    |                            |
    |                            +---> ...
    +---> Decode Pool ...
```
## Usage
```bash
python -m dynamo.global_router \
--config path/to/global_router_config.json \
--model-name Qwen/Qwen3-0.6B \
--namespace dynamo
```
### Arguments
| Argument | Required | Default | Description |
|----------|----------|---------|-------------|
| `--config` | Yes | - | Path to JSON configuration file |
| `--model-name` | Yes | - | Model name for registration (must match workers) |
| `--namespace` | No | `DYN_NAMESPACE` env var or "dynamo" | Namespace for global router |
| `--component-name` | No | "global_router" | Component name |
| `--default-ttft-target` | No | None | Default TTFT target (ms) for prefill pool selection |
| `--default-itl-target` | No | None | Default ITL target (ms) for decode pool selection |
## Configuration
The configuration file defines pool namespaces and selection strategies:
```jsonc
{
    "num_prefill_pools": <int>,               // Number of prefill pools
    "num_decode_pools": <int>,                // Number of decode pools
    "prefill_pool_dynamo_namespaces": [],     // List of Dynamo namespaces for each prefill pool
    "decode_pool_dynamo_namespaces": [],      // List of Dynamo namespaces for each decode pool
    "prefill_pool_selection_strategy": {
        "isl_min": <int>,                     // Minimum input sequence length (tokens)
        "isl_max": <int>,                     // Maximum input sequence length (tokens)
        "isl_resolution": <int>,              // Number of grid rows for ISL dimension
        "ttft_min": <float>,                  // Minimum TTFT target (ms)
        "ttft_max": <float>,                  // Maximum TTFT target (ms)
        "ttft_resolution": <int>,             // Number of grid columns for TTFT dimension
        "prefill_pool_mapping": [[]]          // 2D array [isl_resolution][ttft_resolution] -> pool index
    },
    "decode_pool_selection_strategy": {
        "context_length_min": <int>,          // Minimum context length (tokens)
        "context_length_max": <int>,          // Maximum context length (tokens)
        "context_length_resolution": <int>,   // Number of grid rows for context length
        "itl_min": <float>,                   // Minimum ITL target (ms)
        "itl_max": <float>,                   // Maximum ITL target (ms)
        "itl_resolution": <int>,              // Number of grid columns for ITL dimension
        "decode_pool_mapping": [[]]           // 2D array [context_length_resolution][itl_resolution] -> pool index
    }
}
```
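The shape constraints implied by this schema (one namespace per pool, a mapping with `isl_resolution` rows and `ttft_resolution` columns, pool indices within range) can be checked before the router is started. The following is a minimal sketch of such a check for the prefill half only; `check_prefill_section` and `example_config` are hypothetical names used for illustration, and this is not the router's own validation code.

```python
from typing import Any, Dict, List


def check_prefill_section(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in the prefill part of a config dict."""
    problems: List[str] = []
    num_pools = config["num_prefill_pools"]
    namespaces = config["prefill_pool_dynamo_namespaces"]
    strategy = config["prefill_pool_selection_strategy"]
    mapping = strategy["prefill_pool_mapping"]

    # One namespace is expected per pool.
    if len(namespaces) != num_pools:
        problems.append("namespace count does not match num_prefill_pools")
    # Rows follow the ISL dimension, columns follow the TTFT dimension.
    if len(mapping) != strategy["isl_resolution"]:
        problems.append("mapping row count does not match isl_resolution")
    for i, row in enumerate(mapping):
        if len(row) != strategy["ttft_resolution"]:
            problems.append(f"row {i} length does not match ttft_resolution")
        for pool_idx in row:
            if not 0 <= pool_idx < num_pools:
                problems.append(f"row {i} references unknown pool {pool_idx}")
    return problems


# Illustrative values only.
example_config = {
    "num_prefill_pools": 2,
    "prefill_pool_dynamo_namespaces": ["prefill_pool_0", "prefill_pool_1"],
    "prefill_pool_selection_strategy": {
        "isl_min": 0,
        "isl_max": 32000,
        "isl_resolution": 2,
        "ttft_min": 10,
        "ttft_max": 1000,
        "ttft_resolution": 2,
        "prefill_pool_mapping": [[0, 1], [0, 1]],
    },
}

print(check_prefill_section(example_config))
```

An empty list means the prefill section is internally consistent; the decode section could be checked the same way with the `context_length_*` and `itl_*` keys.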
### Pool Selection
The pool selection uses a 2D grid lookup. Each dimension is divided into buckets based on the resolution.
**Prefill Pool Selection** (based on ISL and TTFT target):
1. Compute `isl_step = (isl_max - isl_min) / isl_resolution`
2. Compute `ttft_step = (ttft_max - ttft_min) / ttft_resolution`
3. For a request with input sequence length `ISL` and TTFT target `ttft_target`, truncate each quotient to an integer and clamp it to the grid:
   - `isl_idx = clamp(int((ISL - isl_min) / isl_step), 0, isl_resolution - 1)`
   - `ttft_idx = clamp(int((ttft_target - ttft_min) / ttft_step), 0, ttft_resolution - 1)`
4. Look up the pool: `pool_index = prefill_pool_mapping[isl_idx][ttft_idx]`
**Decode Pool Selection** (based on context length and ITL target):
Same logic but using `context_length` and `itl_target` with `decode_pool_mapping`.
**Example**: With `isl_min=0`, `isl_max=32000`, `isl_resolution=2`:
- ISL in [0, 16000) → `isl_idx = 0`
- ISL in [16000, 32000] → `isl_idx = 1`
If `prefill_pool_mapping = [[0, 1], [0, 1]]` and `ttft_resolution=2`:
- Low ISL + Low TTFT target → pool 0
- Low ISL + High TTFT target → pool 1
- High ISL + Low TTFT target → pool 0
- High ISL + High TTFT target → pool 1
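The lookup described above can be sketched in a few lines of Python. This is an illustrative standalone version, not the module shipped in this change; the function names are hypothetical, and the TTFT range (`ttft_min=10`, `ttft_max=1000`) is an assumption borrowed from the example configuration, since the example above only fixes the ISL range.

```python
from typing import List


def clamp_index(value: float, resolution: int) -> int:
    """Truncate to an integer and clamp into [0, resolution - 1]."""
    return max(0, min(int(value), resolution - 1))


def select_prefill_pool(
    isl: int,
    ttft_target: float,
    *,
    isl_min: int,
    isl_max: int,
    isl_resolution: int,
    ttft_min: float,
    ttft_max: float,
    ttft_resolution: int,
    mapping: List[List[int]],
) -> int:
    """Map (ISL, TTFT target) to a pool index through the 2D grid."""
    isl_step = (isl_max - isl_min) / isl_resolution
    ttft_step = (ttft_max - ttft_min) / ttft_resolution
    isl_idx = clamp_index((isl - isl_min) / isl_step, isl_resolution)
    ttft_idx = clamp_index((ttft_target - ttft_min) / ttft_step, ttft_resolution)
    return mapping[isl_idx][ttft_idx]


grid = dict(
    isl_min=0, isl_max=32000, isl_resolution=2,
    ttft_min=10, ttft_max=1000, ttft_resolution=2,
    mapping=[[0, 1], [0, 1]],
)

print(select_prefill_pool(1000, 100, **grid))    # low ISL, low TTFT target
print(select_prefill_pool(20000, 800, **grid))   # high ISL, high TTFT target
print(select_prefill_pool(50000, 5, **grid))     # both values outside the range
```

Values outside the configured range are not rejected; clamping sends them to the nearest edge bucket, so the third call lands in the last ISL row and the first TTFT column.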
### Passing SLA Targets
Clients can pass TTFT and ITL targets via `extra_args` in the request:
```jsonc
{
    "messages": [...],
    "extra_args": {
        "ttft_target": 100,  // Target TTFT in ms for prefill pool selection
        "itl_target": 20     // Target ITL in ms for decode pool selection
    }
}
```
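If a request does not provide a target, the router falls back to `--default-ttft-target` or `--default-itl-target`. If that default is also unset, the middle of the configured range is used.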
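The resolution order for a target (request `extra_args`, then the command-line default, then the midpoint of the configured range) can be sketched as follows. `resolve_target` is a hypothetical helper written for illustration; the handler in this change combines the first two steps with `or`, so a falsy request value such as `0` also falls through to the default, and the sketch mirrors that.

```python
from typing import Any, Dict, Optional


def resolve_target(
    request: Dict[str, Any],
    key: str,
    cli_default: Optional[float],
    range_min: float,
    range_max: float,
) -> float:
    """Pick the SLA target used for pool selection."""
    # `extra_args` may be missing or explicitly null in the request.
    extra_args = request.get("extra_args") or {}
    # A missing or falsy request value falls back to the CLI default.
    target = extra_args.get(key) or cli_default
    if target is None:
        # Neither the request nor the CLI supplied a value.
        target = (range_min + range_max) / 2
    return target


print(resolve_target({"extra_args": {"ttft_target": 100}}, "ttft_target", 200, 10, 1000))
print(resolve_target({}, "ttft_target", 200, 10, 1000))
print(resolve_target({"extra_args": None}, "itl_target", None, 10, 100))
```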
## Request Flow
1. Frontend receives request and sends to Global Router (registered as prefill)
2. Global Router selects prefill pool based on (ISL, TTFT_target)
3. Request is forwarded to local router in the selected prefill pool namespace
4. Local router forwards to a prefill worker
5. Prefill response returns with `disaggregated_params`
6. Frontend sends decode request to Global Router (registered as decode)
7. Global Router selects decode pool based on (context_length, ITL_target)
8. Request is forwarded to local router in the selected decode pool namespace
9. Tokens stream back through the chain
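The forwarding steps in this flow amount to picking a client by pool namespace and re-yielding whatever that client streams back. The sketch below illustrates the pattern with a stand-in client; `FakeLocalRouter` and `forward` are hypothetical names, and the fake client's `generate` is a plain async generator, which simplifies the real client interface.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List


class FakeLocalRouter:
    """Stand-in for a client connected to one pool's local router (hypothetical)."""

    def __init__(self, namespace: str) -> None:
        self.namespace = namespace

    async def generate(
        self, request: Dict[str, Any]
    ) -> AsyncGenerator[Dict[str, Any], None]:
        # Echo one output per input token so the stream is observable.
        for token_id in request["token_ids"]:
            yield {"namespace": self.namespace, "token_id": token_id}


async def forward(
    clients: Dict[str, FakeLocalRouter],
    namespaces: List[str],
    pool_idx: int,
    request: Dict[str, Any],
) -> AsyncGenerator[Dict[str, Any], None]:
    """Forward a request to the selected pool and stream its outputs back."""
    namespace = namespaces[pool_idx]
    async for output in clients[namespace].generate(request):
        yield output


async def run_once() -> List[Dict[str, Any]]:
    namespaces = ["prefill_pool_0", "prefill_pool_1"]
    clients = {ns: FakeLocalRouter(ns) for ns in namespaces}
    request = {"token_ids": [7, 8]}
    return [out async for out in forward(clients, namespaces, 1, request)]


outputs = asyncio.run(run_once())
print(outputs)
```

Because `forward` is itself an async generator, outputs are passed along as they arrive rather than being buffered until the pool finishes.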
## Example
See `examples/hierarchical_planner/` for a complete example with:
- Global router configuration
- Local router setup for each pool
- Mocker workers for testing
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
try:
    from ._version import __version__
except Exception:
    try:
        from importlib.metadata import version as _pkg_version

        __version__ = _pkg_version("ai-dynamo")
    except Exception:
        __version__ = "0.0.0+unknown"
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Global Router Service for Hierarchical Routing
Usage: python -m dynamo.global_router --config <config.json> --model-name <model>
This service acts as both a prefill and decode worker from the frontend's perspective,
but internally routes requests to local routers in different namespaces based on
a grid-based pool selection strategy.
Key features:
- Registers as BOTH prefill AND decode worker via register_llm()
- Routes prefill requests based on (ISL, TTFT) to prefill pools
- Routes decode requests based on (context_length, ITL) to decode pools
- Connects to local routers in each pool's namespace
"""
import argparse
import asyncio
import logging
import os

import uvloop

from dynamo.llm import ModelInput, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging

from .handler import GlobalRouterHandler

configure_dynamo_logging()
logger = logging.getLogger(__name__)

DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")


def parse_args():
    """Parse command-line arguments for the Global Router service."""
    parser = argparse.ArgumentParser(
        description="Dynamo Global Router Service: Hierarchical routing to prefill/decode pools",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--config",
        type=str,
        required=True,
        help="Path to the JSON configuration file defining pool namespaces and selection strategy",
    )
    parser.add_argument(
        "--model-name",
        type=str,
        required=True,
        help="Model name for registration (must match workers)",
    )
    parser.add_argument(
        "--namespace",
        type=str,
        default=DYN_NAMESPACE,
        help=f"Dynamo namespace for the global router (default: {DYN_NAMESPACE})",
    )
    parser.add_argument(
        "--component-name",
        type=str,
        default="global_router",
        help="Component name for the global router (default: global_router)",
    )
    parser.add_argument(
        "--default-ttft-target",
        type=float,
        default=None,
        help="Default TTFT target (ms) for prefill pool selection when SLA not present in request",
    )
    parser.add_argument(
        "--default-itl-target",
        type=float,
        default=None,
        help="Default ITL target (ms) for decode pool selection when SLA not present in request",
    )
    return parser.parse_args()


@dynamo_worker()
async def worker(runtime: DistributedRuntime):
    """Main worker function for the Global Router service."""
    args = parse_args()
    logger.info("Starting Global Router Service")
    logger.info(f"Config: {args.config}")
    logger.info(f"Model name: {args.model_name}")
    logger.info(f"Namespace: {args.namespace}")

    # Create handler
    handler = GlobalRouterHandler(
        runtime=runtime,
        config_path=args.config,
        model_name=args.model_name,
        default_ttft_target=args.default_ttft_target,
        default_itl_target=args.default_itl_target,
    )

    # Initialize connections to local routers
    await handler.initialize()

    # Create component in the global router namespace
    component = runtime.namespace(args.namespace).component(args.component_name)

    # Create endpoints for prefill and decode
    # Note: We use separate endpoints so we can register them with different ModelTypes
    prefill_endpoint = component.endpoint("prefill_generate")
    decode_endpoint = component.endpoint("decode_generate")

    logger.info("Registering as prefill worker...")
    # Register as prefill worker - frontend will send prefill requests here
    # Use model_name as model_path since we don't need tokenizer/model files
    await register_llm(
        model_input=ModelInput.Tokens,
        model_type=ModelType.Prefill,
        endpoint=prefill_endpoint,
        model_path=args.model_name,
        model_name=args.model_name,
    )
    logger.info(
        f"Registered prefill endpoint: {args.namespace}.{args.component_name}.prefill_generate"
    )

    logger.info("Registering as decode worker...")
    # Register as decode worker - frontend will send decode requests here
    await register_llm(
        model_input=ModelInput.Tokens,
        model_type=ModelType.Chat | ModelType.Completions,
        endpoint=decode_endpoint,
        model_path=args.model_name,
        model_name=args.model_name,
    )
    logger.info(
        f"Registered decode endpoint: {args.namespace}.{args.component_name}.decode_generate"
    )

    logger.info("Global Router ready - serving endpoints...")
    logger.info(f"Pool info: {handler.get_pool_info()}")

    # Serve both endpoints concurrently
    try:
        await asyncio.gather(
            prefill_endpoint.serve_endpoint(
                handler.handle_prefill,
                graceful_shutdown=True,
                metrics_labels=[("service", "global_router"), ("type", "prefill")],
            ),
            decode_endpoint.serve_endpoint(
                handler.handle_decode,
                graceful_shutdown=True,
                metrics_labels=[("service", "global_router"), ("type", "decode")],
            ),
        )
    except Exception as e:
        logger.error(f"Failed to serve endpoints: {e}")
        raise
    finally:
        logger.info("Global Router Service shutting down")


def main():
    """Entry point for the Global Router service."""
    uvloop.run(worker())


if __name__ == "__main__":
    main()
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Global Router Handler for hierarchical routing to prefill/decode pools.
This handler:
1. Receives requests from the frontend (acts as both prefill and decode worker)
2. Selects the appropriate pool based on config-driven grid selection
3. Forwards requests to local routers in the selected pool's namespace
"""
import logging
from typing import Any, AsyncGenerator, Dict, Optional

from dynamo.runtime import Client, DistributedRuntime

from .pool_selection import load_config

logger = logging.getLogger(__name__)


class GlobalRouterHandler:
    """
    Handler for the Global Router that routes requests to prefill/decode pools.

    The global router sits between the frontend and local routers. It:
    - Receives prefill requests and routes to appropriate prefill pool
    - Receives decode requests and routes to appropriate decode pool
    - Uses grid-based selection strategy from config to choose pools
    """

    def __init__(
        self,
        runtime: DistributedRuntime,
        config_path: str,
        model_name: str,
        default_ttft_target: Optional[float] = None,
        default_itl_target: Optional[float] = None,
    ):
        """
        Initialize the Global Router Handler.

        Args:
            runtime: Dynamo distributed runtime for creating clients
            config_path: Path to the JSON configuration file
            model_name: Model name for logging/debugging
            default_ttft_target: Default TTFT target (ms) when not in request
            default_itl_target: Default ITL target (ms) when not in request
        """
        self.runtime = runtime
        self.config = load_config(config_path)
        self.model_name = model_name
        self.default_ttft_target = default_ttft_target
        self.default_itl_target = default_itl_target

        # Clients to local routers in each pool namespace
        # Will be populated in initialize()
        self.prefill_clients: Dict[str, Client] = {}
        self.decode_clients: Dict[str, Client] = {}

        # Keep track of namespace -> pool index mapping for easy access
        self.prefill_namespace_to_idx: Dict[str, int] = {
            ns: idx for idx, ns in enumerate(self.config.prefill_pool_dynamo_namespaces)
        }
        self.decode_namespace_to_idx: Dict[str, int] = {
            ns: idx for idx, ns in enumerate(self.config.decode_pool_dynamo_namespaces)
        }

    async def initialize(self) -> None:
        """
        Initialize clients to all local routers.

        This connects to the local router in each pool's namespace.
        Local routers are expected at: {namespace}.router.generate
        """
        logger.info("Initializing Global Router Handler...")

        # Connect to prefill pool local routers
        for idx, namespace in enumerate(self.config.prefill_pool_dynamo_namespaces):
            try:
                endpoint = (
                    self.runtime.namespace(namespace)
                    .component("router")
                    .endpoint("generate")
                )
                client = await endpoint.client()
                self.prefill_clients[namespace] = client
                logger.info(
                    f"Connected to prefill pool {idx}: {namespace}.router.generate"
                )
            except Exception as e:
                logger.error(
                    f"Failed to connect to prefill pool {idx} ({namespace}): {e}"
                )
                raise

        # Connect to decode pool local routers
        for idx, namespace in enumerate(self.config.decode_pool_dynamo_namespaces):
            try:
                endpoint = (
                    self.runtime.namespace(namespace)
                    .component("router")
                    .endpoint("generate")
                )
                client = await endpoint.client()
                self.decode_clients[namespace] = client
                logger.info(
                    f"Connected to decode pool {idx}: {namespace}.router.generate"
                )
            except Exception as e:
                logger.error(
                    f"Failed to connect to decode pool {idx} ({namespace}): {e}"
                )
                raise

        logger.info(
            f"Global Router initialized: {len(self.prefill_clients)} prefill pools, "
            f"{len(self.decode_clients)} decode pools"
        )

    async def handle_prefill(
        self, request: Dict[str, Any]
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Handle prefill requests from the frontend.

        Selects the appropriate prefill pool based on ISL and TTFT target,
        then forwards the request to the local router in that pool.

        Args:
            request: PreprocessedRequest dict with token_ids, etc.

        Yields:
            LLMEngineOutput dicts from the prefill worker
        """
        # Extract ISL (input sequence length)
        token_ids = request.get("token_ids", [])
        isl = len(token_ids)

        # Extract TTFT target from extra_args if provided, fallback to CLI default
        extra_args = request.get("extra_args") or {}
        ttft_target = extra_args.get("ttft_target") or self.default_ttft_target

        # Select prefill pool
        pool_idx = self.config.prefill_pool_selection_strategy.select_pool(
            isl=isl, ttft_target=ttft_target
        )
        namespace = self.config.prefill_pool_dynamo_namespaces[pool_idx]
        client = self.prefill_clients[namespace]
        logger.info(
            f"Routing prefill request: ISL={isl}, TTFT_target={ttft_target} -> "
            f"pool {pool_idx} ({namespace})"
        )

        # Forward request to local router and stream back responses
        try:
            stream = await client.generate(request)
            async for output in stream:
                # Extract data from stream response object
                data = output.data() if hasattr(output, "data") else output
                yield data
        except Exception as e:
            logger.error(f"Error forwarding prefill request to {namespace}: {e}")
            raise

    async def handle_decode(
        self, request: Dict[str, Any]
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Handle decode requests from the frontend.

        Selects the appropriate decode pool based on context length and ITL target,
        then forwards the request to the local router in that pool.

        Args:
            request: PreprocessedRequest dict with token_ids, prefill_result, etc.

        Yields:
            LLMEngineOutput dicts from the decode worker
        """
        # Use the number of tokens in the request as the context length for now.
        # Ideally this would be the average context length over the decode phase,
        # i.e. ISL + OSL // 2.
        # TODO: predict OSL based on ISL
        token_ids = request.get("token_ids", [])
        context_length = len(token_ids)

        # Extract ITL target from extra_args if provided, fallback to CLI default
        extra_args = request.get("extra_args") or {}
        itl_target = extra_args.get("itl_target") or self.default_itl_target

        # Select decode pool
        pool_idx = self.config.decode_pool_selection_strategy.select_pool(
            context_length=context_length, itl_target=itl_target
        )
        namespace = self.config.decode_pool_dynamo_namespaces[pool_idx]
        client = self.decode_clients[namespace]
        logger.info(
            f"Routing decode request: context_length={context_length}, ITL_target={itl_target} -> "
            f"pool {pool_idx} ({namespace})"
        )

        # Forward request to local router and stream back responses
        try:
            stream = await client.generate(request)
            async for output in stream:
                # Extract data from stream response object
                data = output.data() if hasattr(output, "data") else output
                yield data
        except Exception as e:
            logger.error(f"Error forwarding decode request to {namespace}: {e}")
            raise

    def get_pool_info(self) -> Dict[str, Any]:
        """
        Get information about connected pools for debugging/monitoring.

        Returns:
            Dict with pool information
        """
        return {
            "model_name": self.model_name,
            "num_prefill_pools": self.config.num_prefill_pools,
            "num_decode_pools": self.config.num_decode_pools,
            "prefill_pools": self.config.prefill_pool_dynamo_namespaces,
            "decode_pools": self.config.decode_pool_dynamo_namespaces,
            "prefill_connected": list(self.prefill_clients.keys()),
            "decode_connected": list(self.decode_clients.keys()),
        }
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Configuration loading and pool selection logic for the Global Router.
The config file defines:
- Prefill and decode pool namespaces
- Grid-based pool selection strategies mapping (ISL, TTFT) -> prefill pool
and (context_length, ITL) -> decode pool
"""
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional

logger = logging.getLogger(__name__)


@dataclass
class PrefillPoolSelectionStrategy:
    """Strategy for selecting prefill pools based on ISL and TTFT target."""

    ttft_min: float
    ttft_max: float
    ttft_resolution: int
    isl_min: int
    isl_max: int
    isl_resolution: int
    prefill_pool_mapping: List[List[int]]

    @property
    def ttft_step(self) -> float:
        """Step size for TTFT grid."""
        return (self.ttft_max - self.ttft_min) / self.ttft_resolution

    @property
    def isl_step(self) -> float:
        """Step size for ISL grid."""
        return (self.isl_max - self.isl_min) / self.isl_resolution

    def select_pool(self, isl: int, ttft_target: Optional[float] = None) -> int:
        """
        Select prefill pool based on ISL and TTFT target.

        Args:
            isl: Input sequence length (number of tokens)
            ttft_target: Target time to first token in ms. If None, uses middle of range.

        Returns:
            Pool index from prefill_pool_mapping
        """
        if ttft_target is None:
            ttft_target = (self.ttft_min + self.ttft_max) / 2

        # Compute grid indices with clamping
        isl_idx = self._clamp_index(
            (isl - self.isl_min) / self.isl_step, self.isl_resolution
        )
        ttft_idx = self._clamp_index(
            (ttft_target - self.ttft_min) / self.ttft_step, self.ttft_resolution
        )
        pool_idx = self.prefill_pool_mapping[isl_idx][ttft_idx]
        logger.debug(
            f"Prefill pool selection: ISL={isl}, TTFT={ttft_target} -> "
            f"grid[{isl_idx}][{ttft_idx}] -> pool {pool_idx}"
        )
        return pool_idx

    @staticmethod
    def _clamp_index(value: float, resolution: int) -> int:
        """Clamp index to valid grid range."""
        return max(0, min(int(value), resolution - 1))


@dataclass
class DecodePoolSelectionStrategy:
    """Strategy for selecting decode pools based on context length and ITL target."""

    itl_min: float
    itl_max: float
    itl_resolution: int
    context_length_min: int
    context_length_max: int
    context_length_resolution: int
    decode_pool_mapping: List[List[int]]

    @property
    def itl_step(self) -> float:
        """Step size for ITL grid."""
        return (self.itl_max - self.itl_min) / self.itl_resolution

    @property
    def context_length_step(self) -> float:
        """Step size for context length grid."""
        return (
            self.context_length_max - self.context_length_min
        ) / self.context_length_resolution

    def select_pool(
        self, context_length: int, itl_target: Optional[float] = None
    ) -> int:
        """
        Select decode pool based on context length and ITL target.

        Args:
            context_length: Total context length (prompt + generated tokens so far)
            itl_target: Target inter-token latency in ms. If None, uses middle of range.

        Returns:
            Pool index from decode_pool_mapping
        """
        if itl_target is None:
            itl_target = (self.itl_min + self.itl_max) / 2

        # Compute grid indices with clamping
        ctx_idx = self._clamp_index(
            (context_length - self.context_length_min) / self.context_length_step,
            self.context_length_resolution,
        )
        itl_idx = self._clamp_index(
            (itl_target - self.itl_min) / self.itl_step, self.itl_resolution
        )
        pool_idx = self.decode_pool_mapping[ctx_idx][itl_idx]
        logger.debug(
            f"Decode pool selection: context_length={context_length}, ITL={itl_target} -> "
            f"grid[{ctx_idx}][{itl_idx}] -> pool {pool_idx}"
        )
        return pool_idx

    @staticmethod
    def _clamp_index(value: float, resolution: int) -> int:
        """Clamp index to valid grid range."""
        return max(0, min(int(value), resolution - 1))


@dataclass
class GlobalRouterConfig:
    """Configuration for the Global Router."""

    num_prefill_pools: int
    num_decode_pools: int
    prefill_pool_dynamo_namespaces: List[str]
    decode_pool_dynamo_namespaces: List[str]
    prefill_pool_selection_strategy: PrefillPoolSelectionStrategy
    decode_pool_selection_strategy: DecodePoolSelectionStrategy

    def validate(self) -> None:
        """Validate configuration consistency."""
        if len(self.prefill_pool_dynamo_namespaces) != self.num_prefill_pools:
            raise ValueError(
                f"num_prefill_pools ({self.num_prefill_pools}) does not match "
                f"prefill_pool_dynamo_namespaces length ({len(self.prefill_pool_dynamo_namespaces)})"
            )
        if len(self.decode_pool_dynamo_namespaces) != self.num_decode_pools:
            raise ValueError(
                f"num_decode_pools ({self.num_decode_pools}) does not match "
                f"decode_pool_dynamo_namespaces length ({len(self.decode_pool_dynamo_namespaces)})"
            )

        # Validate prefill strategy ranges and resolutions
        prefill_strategy = self.prefill_pool_selection_strategy
        if prefill_strategy.isl_resolution <= 0:
            raise ValueError(
                f"isl_resolution must be positive, got {prefill_strategy.isl_resolution}"
            )
        if prefill_strategy.ttft_resolution <= 0:
            raise ValueError(
                f"ttft_resolution must be positive, got {prefill_strategy.ttft_resolution}"
            )
        if prefill_strategy.isl_min >= prefill_strategy.isl_max:
            raise ValueError(
                f"isl_min ({prefill_strategy.isl_min}) must be less than "
                f"isl_max ({prefill_strategy.isl_max})"
            )
        if prefill_strategy.ttft_min >= prefill_strategy.ttft_max:
            raise ValueError(
                f"ttft_min ({prefill_strategy.ttft_min}) must be less than "
                f"ttft_max ({prefill_strategy.ttft_max})"
            )

        # Validate decode strategy ranges and resolutions
        decode_strategy = self.decode_pool_selection_strategy
        if decode_strategy.context_length_resolution <= 0:
            raise ValueError(
                f"context_length_resolution must be positive, got {decode_strategy.context_length_resolution}"
            )
        if decode_strategy.itl_resolution <= 0:
            raise ValueError(
                f"itl_resolution must be positive, got {decode_strategy.itl_resolution}"
            )
        if decode_strategy.context_length_min >= decode_strategy.context_length_max:
            raise ValueError(
                f"context_length_min ({decode_strategy.context_length_min}) must be less than "
                f"context_length_max ({decode_strategy.context_length_max})"
            )
        if decode_strategy.itl_min >= decode_strategy.itl_max:
            raise ValueError(
                f"itl_min ({decode_strategy.itl_min}) must be less than "
                f"itl_max ({decode_strategy.itl_max})"
            )

        # Validate mapping dimensions match resolution
        if (
            len(prefill_strategy.prefill_pool_mapping)
            != prefill_strategy.isl_resolution
        ):
            raise ValueError(
                f"prefill_pool_mapping rows ({len(prefill_strategy.prefill_pool_mapping)}) "
                f"does not match isl_resolution ({prefill_strategy.isl_resolution})"
            )
        for i, row in enumerate(prefill_strategy.prefill_pool_mapping):
            if len(row) != prefill_strategy.ttft_resolution:
                raise ValueError(
                    f"prefill_pool_mapping row {i} length ({len(row)}) "
                    f"does not match ttft_resolution ({prefill_strategy.ttft_resolution})"
                )
            for pool_idx in row:
                if pool_idx < 0 or pool_idx >= self.num_prefill_pools:
                    raise ValueError(
                        f"Invalid prefill pool index {pool_idx} in mapping "
                        f"(must be 0 to {self.num_prefill_pools - 1})"
                    )

        if (
            len(decode_strategy.decode_pool_mapping)
            != decode_strategy.context_length_resolution
        ):
            raise ValueError(
                f"decode_pool_mapping rows ({len(decode_strategy.decode_pool_mapping)}) "
                f"does not match context_length_resolution ({decode_strategy.context_length_resolution})"
            )
        for i, row in enumerate(decode_strategy.decode_pool_mapping):
            if len(row) != decode_strategy.itl_resolution:
                raise ValueError(
                    f"decode_pool_mapping row {i} length ({len(row)}) "
                    f"does not match itl_resolution ({decode_strategy.itl_resolution})"
                )
            for pool_idx in row:
                if pool_idx < 0 or pool_idx >= self.num_decode_pools:
                    raise ValueError(
                        f"Invalid decode pool index {pool_idx} in mapping "
                        f"(must be 0 to {self.num_decode_pools - 1})"
                    )


def load_config(config_path: str | Path) -> GlobalRouterConfig:
    """
    Load Global Router configuration from JSON file.

    Args:
        config_path: Path to the JSON configuration file

    Returns:
        GlobalRouterConfig instance

    Raises:
        FileNotFoundError: If config file doesn't exist
        ValueError: If config is invalid
    """
    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Config file not found: {config_path}")

    with open(config_path) as f:
        data = json.load(f)
    logger.info(f"Loading global router config from {config_path}")

    # Parse prefill selection strategy
    prefill_strategy_data = data["prefill_pool_selection_strategy"]
    prefill_strategy = PrefillPoolSelectionStrategy(
        ttft_min=prefill_strategy_data["ttft_min"],
        ttft_max=prefill_strategy_data["ttft_max"],
        ttft_resolution=prefill_strategy_data["ttft_resolution"],
        isl_min=prefill_strategy_data["isl_min"],
        isl_max=prefill_strategy_data["isl_max"],
        isl_resolution=prefill_strategy_data["isl_resolution"],
        prefill_pool_mapping=prefill_strategy_data["prefill_pool_mapping"],
    )

    # Parse decode selection strategy
    decode_strategy_data = data["decode_pool_selection_strategy"]
    decode_strategy = DecodePoolSelectionStrategy(
        itl_min=decode_strategy_data["itl_min"],
        itl_max=decode_strategy_data["itl_max"],
        itl_resolution=decode_strategy_data["itl_resolution"],
        context_length_min=decode_strategy_data["context_length_min"],
        context_length_max=decode_strategy_data["context_length_max"],
        context_length_resolution=decode_strategy_data["context_length_resolution"],
        decode_pool_mapping=decode_strategy_data["decode_pool_mapping"],
    )

    config = GlobalRouterConfig(
        num_prefill_pools=data["num_prefill_pools"],
        num_decode_pools=data["num_decode_pools"],
        prefill_pool_dynamo_namespaces=data["prefill_pool_dynamo_namespaces"],
        decode_pool_dynamo_namespaces=data["decode_pool_dynamo_namespaces"],
        prefill_pool_selection_strategy=prefill_strategy,
        decode_pool_selection_strategy=decode_strategy,
    )
    config.validate()
    logger.info(
        f"Loaded config: {config.num_prefill_pools} prefill pools, "
        f"{config.num_decode_pools} decode pools"
    )
    return config
<!--
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
-->
# Hierarchical Planner Example
This example demonstrates a hierarchical routing setup with:
- A **Global Router** that routes to different pools based on request characteristics
- **Local Routers** in each pool namespace
- **Mocker Workers** simulating prefill and decode backends
## Architecture
```
                Frontend (round-robin routing)
                              |
                              v
                        Global Router
             (registers as both prefill + decode)
                              |
           +------------------+------------------+
           |                  |                  |
           v                  v                  v
    Prefill Pool 0     Prefill Pool 1      Decode Pool 0
   (prefill_pool_0)   (prefill_pool_1)    (decode_pool_0)
           |                  |                  |
           v                  v                  v
     Local Router       Local Router       Local Router
           |                  |                  |
           v                  v                  v
     Mocker Worker      Mocker Worker      Mocker Worker
       (prefill)          (prefill)          (decode)
```
## Configuration
The `global_router_config.json` defines:
- 2 prefill pools (`prefill_pool_0`, `prefill_pool_1`)
- 1 decode pool (`decode_pool_0`)
- Grid-based pool selection strategy
Pool selection is based on a 2x2 grid:
- **Prefill**: (ISL, TTFT_target) maps to prefill pool index
- **Decode**: (context_length, ITL_target) maps to decode pool index
## Running the Example
```bash
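# Run from the repository root: the script passes the config as
# examples/hierarchical_planner/global_router_config.json
./examples/hierarchical_planner/run_example.sh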
```
This starts all components in the background and provides instructions for testing.
## Testing
Once all components are running, send a request to the frontend:
```bash
curl -X POST http://localhost:8000/v1/chat/completions \
    -H "Content-Type: application/json" \
    -d '{
        "model": "Qwen/Qwen3-0.6B",
        "messages": [{"role": "user", "content": "Hello, how are you?"}],
        "max_tokens": 50,
        "stream": true
    }'
```
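The same request can be built from Python with only the standard library, which makes it easier to vary the SLA targets between calls. This is a sketch under assumptions: `build_chat_request` is a hypothetical helper, the `extra_args` keys follow the Global Router README in this change, and whether the frontend forwards `extra_args` unchanged has not been verified here. The request is only constructed, not sent.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_chat_request(
    prompt: str,
    *,
    base_url: str = "http://localhost:8000",
    model: str = "Qwen/Qwen3-0.6B",
    ttft_target: Optional[float] = None,
    itl_target: Optional[float] = None,
) -> urllib.request.Request:
    """Build (but do not send) a streaming chat completion request."""
    body: Dict[str, Any] = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 50,
        "stream": True,
    }
    # Only attach extra_args when at least one target is given.
    extra_args: Dict[str, float] = {}
    if ttft_target is not None:
        extra_args["ttft_target"] = ttft_target
    if itl_target is not None:
        extra_args["itl_target"] = itl_target
    if extra_args:
        body["extra_args"] = extra_args

    return urllib.request.Request(
        f"{base_url}/v1/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_chat_request("Hello, how are you?", ttft_target=100, itl_target=20)
print(request.full_url)
print(request.data.decode("utf-8"))
```

Once all components are up, the request could be sent with `urllib.request.urlopen(request)` and the streamed response read line by line.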
## Request Flow
1. Request arrives at **Frontend**
2. Frontend's `PrefillRouter` detects both prefill and decode registered for the model
3. Frontend sends prefill request to **Global Router** (registered as prefill)
4. Global Router selects prefill pool based on (ISL, TTFT_target) grid
5. Request forwarded to **Local Router** in selected prefill pool namespace
6. Local Router forwards to **Mocker Worker** (prefill mode)
7. Prefill response returns with `disaggregated_params`
8. Frontend sends decode request to **Global Router** (registered as decode)
9. Global Router selects decode pool based on (context_length, ITL_target) grid
10. Tokens stream back through the chain
## Customizing Pool Selection
Edit `global_router_config.json` to change:
- **Number of pools**: Adjust `num_prefill_pools`, `num_decode_pools` and corresponding namespace lists
- **Selection grid**: Modify `isl_resolution`, `ttft_resolution` etc. to change grid granularity
- **Pool mapping**: Edit `prefill_pool_mapping` and `decode_pool_mapping` matrices
Example: To always route to pool 0 regardless of request characteristics:
```json
"prefill_pool_mapping": [[0, 0], [0, 0]]
```
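For larger grids, writing the mapping matrices by hand becomes error-prone, so it can help to generate them. The helpers below are a hypothetical sketch (neither function exists in this change): one fills the grid with a single pool, the other splits the grid by column, which corresponds to choosing a pool from the latency target alone.

```python
from typing import List


def uniform_mapping(rows: int, cols: int, pool_idx: int) -> List[List[int]]:
    """Every grid cell routes to the same pool."""
    return [[pool_idx] * cols for _ in range(rows)]


def split_by_column(
    rows: int, cols: int, boundary: int, low_pool: int, high_pool: int
) -> List[List[int]]:
    """Columns below `boundary` route to low_pool, the remaining columns to high_pool."""
    return [
        [low_pool if col < boundary else high_pool for col in range(cols)]
        for _ in range(rows)
    ]


print(uniform_mapping(2, 2, 0))
print(split_by_column(2, 2, 1, 0, 1))
```

With a 2x2 grid, the first call reproduces the always-pool-0 mapping above, and the second reproduces the `[[0, 1], [0, 1]]` prefill mapping used by the example configuration.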
{
    "num_prefill_pools": 2,
    "num_decode_pools": 1,
    "prefill_pool_dynamo_namespaces": ["prefill_pool_0", "prefill_pool_1"],
    "decode_pool_dynamo_namespaces": ["decode_pool_0"],
    "prefill_pool_selection_strategy": {
        "ttft_min": 10,
        "ttft_max": 1000,
        "ttft_resolution": 2,
        "isl_min": 0,
        "isl_max": 32000,
        "isl_resolution": 2,
        "prefill_pool_mapping": [[0, 1], [0, 1]]
    },
    "decode_pool_selection_strategy": {
        "itl_min": 10,
        "itl_max": 100,
        "itl_resolution": 2,
        "context_length_min": 0,
        "context_length_max": 32000,
        "context_length_resolution": 2,
        "decode_pool_mapping": [[0, 0], [0, 0]]
    }
}
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Hierarchical Planner Example
# Run each command in a separate terminal, in order from bottom to top.
# Wait a few seconds between starting each component.
# ============================================================================
# frontend + global_router
# ============================================================================
# A dedicated namespace is required so that the mockers are not registered with
# the frontend. "dynamo" cannot be used because it is reserved to mean all namespaces.
python -m dynamo.frontend \
    --router-mode round-robin \
    --namespace hierarchical &
python -m dynamo.global_router \
    --config examples/hierarchical_planner/global_router_config.json \
    --model-name Qwen/Qwen3-0.6B \
    --default-ttft-target 100 \
    --default-itl-target 10 \
    --namespace hierarchical &
# ============================================================================
# prefill_pool_0 - local router + mocker worker (prefill)
# ============================================================================
# The prefill router does not need to track active blocks.
# The trailing "&" must follow the last flag; otherwise the flag is run as a separate command.
DYN_NAMESPACE=prefill_pool_0 python -m dynamo.router \
    --endpoint prefill_pool_0.worker.generate \
    --block-size 16 \
    --no-track-active-blocks &
python -m dynamo.mocker \
    --model-path Qwen/Qwen3-0.6B \
    --endpoint dyn://prefill_pool_0.worker.generate \
    --is-prefill-worker \
    --block-size 16 &
# ============================================================================
# prefill_pool_1 - local router + mocker worker (prefill)
# ============================================================================
# The prefill router does not need to track active blocks.
DYN_NAMESPACE=prefill_pool_1 python -m dynamo.router \
    --endpoint prefill_pool_1.worker.generate \
    --block-size 16 \
    --no-track-active-blocks &
python -m dynamo.mocker \
    --model-path Qwen/Qwen3-0.6B \
    --endpoint dyn://prefill_pool_1.worker.generate \
    --is-prefill-worker \
    --block-size 16 &
# ============================================================================
# decode_pool_0 - local router + mocker worker (decode)
# ============================================================================
DYN_NAMESPACE=decode_pool_0 python -m dynamo.router \
    --endpoint decode_pool_0.worker.generate \
    --block-size 16 \
    --kv-overlap-score-weight 0 &
python -m dynamo.mocker \
    --model-path Qwen/Qwen3-0.6B \
    --endpoint dyn://decode_pool_0.worker.generate \
    --is-decode-worker \
    --block-size 16 &
# ============================================================================
# test request
# ============================================================================
# wait for all components to start
# curl -X POST http://localhost:8000/v1/chat/completions \
# -H "Content-Type: application/json" \
# -d '{
# "model": "Qwen/Qwen3-0.6B",
# "messages": [{"role": "user", "content": "Hello!"}],
# "max_tokens": 50,
# "stream": true
# }'