".devcontainer/vscode:/vscode.git/clone" did not exist on "2c747d641e9fdc980f6ccbf1f6e40cf73ccd1e3d"
Unverified Commit a96f5d73 authored by jh-nv's avatar jh-nv Committed by GitHub
Browse files

chore: add mypy for router (#6864)

parent b94cb543
......@@ -109,23 +109,23 @@ class StandaloneRouterHandler:
}
async for worker_output in await self.kv_router.generate_from_request(
preprocessed_request
preprocessed_request # type: ignore[arg-type]
):
# Wrap worker output into LLMEngineOutput format
# Worker should return dict with at minimum kv_transfer_params in extra_args
llm_engine_output = {
"token_ids": worker_output.get("token_ids", []),
"tokens": worker_output.get("tokens"),
"text": worker_output.get("text"),
"cum_log_probs": worker_output.get("cum_log_probs"),
"log_probs": worker_output.get("log_probs"),
"top_logprobs": worker_output.get("top_logprobs"),
"finish_reason": worker_output.get("finish_reason"),
"stop_reason": worker_output.get("stop_reason"),
"index": worker_output.get("index"),
"disaggregated_params": worker_output.get("disaggregated_params"),
"extra_args": worker_output.get("extra_args"),
"completion_usage": worker_output.get("completion_usage"),
"token_ids": worker_output.get("token_ids", []), # type: ignore[attr-defined]
"tokens": worker_output.get("tokens"), # type: ignore[attr-defined]
"text": worker_output.get("text"), # type: ignore[attr-defined]
"cum_log_probs": worker_output.get("cum_log_probs"), # type: ignore[attr-defined]
"log_probs": worker_output.get("log_probs"), # type: ignore[attr-defined]
"top_logprobs": worker_output.get("top_logprobs"), # type: ignore[attr-defined]
"finish_reason": worker_output.get("finish_reason"), # type: ignore[attr-defined]
"stop_reason": worker_output.get("stop_reason"), # type: ignore[attr-defined]
"index": worker_output.get("index"), # type: ignore[attr-defined]
"disaggregated_params": worker_output.get("disaggregated_params"), # type: ignore[attr-defined]
"extra_args": worker_output.get("extra_args"), # type: ignore[attr-defined]
"completion_usage": worker_output.get("completion_usage"), # type: ignore[attr-defined]
}
yield llm_engine_output
......
......@@ -4,6 +4,7 @@
"""Router CLI parsing, config, and assembly for the standalone router."""
import argparse
from typing import Optional
from dynamo.common.configuration.arg_group import ArgGroup
from dynamo.common.configuration.groups.kv_router_args import (
......@@ -74,7 +75,7 @@ def build_kv_router_config(router_config: DynamoRouterConfig) -> KvRouterConfig:
return KvRouterConfig(**router_config.kv_router_kwargs())
def parse_args(argv=None) -> DynamoRouterConfig:
def parse_args(argv: Optional[list[str]] = None) -> DynamoRouterConfig:
"""Parse command-line arguments for the standalone router.
Returns:
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Dynamo standalone router configuration ArgGroup."""
import argparse
from dynamo.common.configuration.arg_group import ArgGroup
from dynamo.common.configuration.config_base import ConfigBase
from dynamo.common.configuration.utils import add_argument, add_negatable_bool_argument
class DynamoRouterConfig(ConfigBase):
"""Typed configuration for the standalone KV router (router-owned options only)."""
namespace: str
endpoint: str
router_block_size: int
router_kv_overlap_score_weight: float
router_temperature: float
router_use_kv_events: bool
router_replica_sync: bool
router_snapshot_threshold: int
router_reset_states: bool
router_durable_kv_events: bool
router_track_active_blocks: bool
router_assume_kv_reuse: bool
router_track_output_blocks: bool
router_ttl_secs: float
router_max_tree_size: int
router_prune_target_ratio: float
router_event_threads: int
def validate(self) -> None:
"""Validate config invariants (aligned with Rust KvRouterConfig where applicable)."""
if not self.endpoint:
raise ValueError(
"endpoint is required (set --endpoint or DYN_ROUTER_ENDPOINT)"
)
parts = self.endpoint.split(".")
if len(parts) != 3:
raise ValueError(
f"Invalid endpoint format: {self.endpoint!r}. "
"Expected format: namespace.component.endpoint"
)
self.namespace = parts[0]
class DynamoRouterArgGroup(ArgGroup):
"""CLI argument group for standalone router options."""
name = "dynamo-router"
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
"""Add router-owned arguments to parser."""
g = parser.add_argument_group("Dynamo Router Options")
add_argument(
g,
flag_name="--endpoint",
env_var="DYN_ROUTER_ENDPOINT",
default=None,
help="Full endpoint path for workers in the format namespace.component.endpoint (e.g., dynamo.prefill.generate for prefill workers)",
arg_type=str,
)
add_argument(
g,
flag_name="--router-block-size",
env_var="DYN_ROUTER_BLOCK_SIZE",
default=128,
help="KV cache block size for routing decisions",
arg_type=int,
obsolete_flag="--block-size",
)
add_argument(
g,
flag_name="--router-kv-overlap-score-weight",
env_var="DYN_ROUTER_KV_OVERLAP_SCORE_WEIGHT",
default=1.0,
help="KV Router: Weight for overlap score in worker selection. Higher values prioritize KV cache reuse",
arg_type=float,
obsolete_flag="--kv-overlap-score-weight",
)
add_argument(
g,
flag_name="--router-temperature",
env_var="DYN_ROUTER_TEMPERATURE",
default=0.0,
help="KV Router: Temperature for worker sampling via softmax. Higher values promote more randomness, and 0 fallbacks to deterministic.",
arg_type=float,
)
add_negatable_bool_argument(
g,
flag_name="--router-kv-events",
env_var="DYN_ROUTER_USE_KV_EVENTS",
default=True,
help="KV Router: Enable KV events from workers. When disabled (--no-router-kv-events), the router predicts cache state based on routing decisions with TTL-based expiration and pruning, rather than receiving events from workers.",
dest="router_use_kv_events",
obsolete_flag="--kv-events",
)
add_negatable_bool_argument(
g,
flag_name="--router-replica-sync",
env_var="DYN_ROUTER_REPLICA_SYNC",
default=False,
help="KV Router: Enable replica synchronization across multiple router instances. When true, routers will publish and subscribe to events to maintain consistent state.",
)
add_argument(
g,
flag_name="--router-snapshot-threshold",
env_var="DYN_ROUTER_SNAPSHOT_THRESHOLD",
default=1000000,
help="KV Router: Number of messages in stream before triggering a snapshot",
arg_type=int,
)
add_negatable_bool_argument(
g,
flag_name="--router-reset-states",
env_var="DYN_ROUTER_RESET_STATES",
default=False,
help="KV Router: Reset router state on startup, purging stream and object store. WARNING: Can affect existing router replicas.",
)
add_negatable_bool_argument(
g,
flag_name="--router-durable-kv-events",
env_var="DYN_ROUTER_DURABLE_KV_EVENTS",
default=False,
help="[Deprecated] KV Router: Enable durable KV events using NATS JetStream. This option will be removed in a future release. The event-plane subscriber (local_indexer mode) is now the recommended path.",
obsolete_flag="--durable-kv-events",
)
add_negatable_bool_argument(
g,
flag_name="--router-track-active-blocks",
env_var="DYN_ROUTER_TRACK_ACTIVE_BLOCKS",
default=True,
help="KV Router: Track active blocks for load balancing. Use --no-router-track-active-blocks to disable",
obsolete_flag="--track-active-blocks",
)
add_negatable_bool_argument(
g,
flag_name="--router-assume-kv-reuse",
env_var="DYN_ROUTER_ASSUME_KV_REUSE",
default=True,
help="KV Router: When tracking active blocks, assume KV cache reuse. Use --no-router-assume-kv-reuse to use random hashes, useful when KV cache reuse is not expected.",
obsolete_flag="--assume-kv-reuse",
)
add_negatable_bool_argument(
g,
flag_name="--router-track-output-blocks",
env_var="DYN_ROUTER_TRACK_OUTPUT_BLOCKS",
default=False,
help="KV Router: Track output blocks during generation. When enabled, the router adds placeholder blocks as tokens are generated and applies fractional decay based on progress toward expected output sequence length (agent_hints.osl in nvext).",
obsolete_flag="--track-output-blocks",
)
add_argument(
g,
flag_name="--router-ttl-secs",
env_var="DYN_ROUTER_TTL_SECS",
default=120.0,
help="KV Router: TTL for blocks in seconds. Only used when --no-router-kv-events is set. Controls how long cached blocks are considered valid without explicit events.",
arg_type=float,
)
add_argument(
g,
flag_name="--router-max-tree-size",
env_var="DYN_ROUTER_MAX_TREE_SIZE",
default=2**20,
help="KV Router: Maximum tree size before pruning. Only used when --no-router-kv-events is set. When the indexer tree exceeds this size, pruning is triggered.",
arg_type=int,
)
add_argument(
g,
flag_name="--router-prune-target-ratio",
env_var="DYN_ROUTER_PRUNE_TARGET_RATIO",
default=0.8,
help="KV Router: Target size ratio after pruning (0.0-1.0). Only used when --no-router-kv-events is set. Determines how aggressively to prune the tree.",
arg_type=float,
)
add_argument(
g,
flag_name="--router-event-threads",
env_var="DYN_ROUTER_EVENT_THREADS",
default=4,
help="KV Router: Number of event processing threads. >1 uses concurrent radix tree and thread pool for higher throughput. Ignored when --no-router-kv-events is set (approximate mode always uses single-threaded indexer with TTL/pruning).",
arg_type=int,
)
......@@ -1378,6 +1378,18 @@ class KvRouter:
"""
...
async def generate_from_request(
self,
request: JsonLike,
) -> AsyncIterator[JsonLike]:
"""
Generate from a preprocessed request dict (PreprocessedRequest format).
Accepts a full request dict with token_ids, model, stop_conditions, etc.
Returns an async iterator yielding generation responses.
"""
...
async def best_worker(
self,
token_ids: List[int],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment