Unverified Commit a82acfa0 authored by jh-nv's avatar jh-nv Committed by GitHub
Browse files

feat: Refactor frontend CLI configuration (#6201)

parent 948d6d85
......@@ -5,12 +5,16 @@
import argparse
import os
from typing import Any, Optional, TypeVar
from typing import Any, Callable, Optional, TypeVar, Union
T = TypeVar("T")
def env_or_default(env_var: str, default: T) -> T:
def env_or_default(
env_var: str,
default: T,
value_type: Optional[Union[type, Callable[..., Any]]] = None,
) -> T:
"""
Get value from environment variable or return default.
......@@ -19,32 +23,35 @@ def env_or_default(env_var: str, default: T) -> T:
Args:
env_var: Environment variable name (e.g., "DYN_NAMESPACE")
default: Default value if env var not set
value_type: If provided, use this type to convert the env value. If None, the type
is taken from type(default). Use value_type when default is None but you still
want the env value coerced (e.g. env_or_default("DYN_FOO", None, value_type=int)).
Returns:
Environment variable value (type-converted) or default
Examples:
>>> env_or_default("DYN_NAMESPACE", "test")
"test" # if DYN_NAMESPACE not set
>>> env_or_default("DYN_MIGRATION_LIMIT", 0)
5 # if DYN_MIGRATION_LIMIT="5"
"""
value = os.environ.get(env_var)
if value is None:
return default
# Type conversion based on default type
if isinstance(default, bool):
# No type info available: default=None and no explicit value_type.
if value_type is None and default is None:
return value # type: ignore[return-value]
# Prefer the explicit type if provided; otherwise derive from default
target_type = value_type if value_type is not None else type(default)
if target_type is bool:
return value.lower() in ("true", "1", "yes", "on") # type: ignore
elif isinstance(default, int):
if target_type is int:
return int(value) # type: ignore
elif isinstance(default, float):
if target_type is float:
return float(value) # type: ignore
elif isinstance(default, list):
# Env vars for list options (e.g. DYN_CONNECTOR) are space-separated; downstream expects a list.
if target_type is list:
return [x.strip() for x in value.split() if x.strip()] # type: ignore
else:
return value # type: ignore
# Fall back to calling the type/callable for custom validators (e.g., pathlib.Path)
return target_type(value) if callable(target_type) else value # type: ignore
def add_argument(
......@@ -55,7 +62,7 @@ def add_argument(
default: Any,
help: str,
obsolete_flag: Optional[str] = None,
arg_type: Optional[type] = str,
arg_type: Optional[Union[type, Callable[..., Any]]] = str,
**kwargs: Any,
) -> None:
"""
......@@ -74,7 +81,12 @@ def add_argument(
arg_type: Type for the argument (default: str)
"""
arg_dest = _get_dest_name(flag_name, kwargs.get("dest"))
default_with_env = env_or_default(env_var, default)
value_type_for_env: Optional[Union[type, Callable[..., Any]]] = None
if arg_type is not None and isinstance(arg_type, type):
value_type_for_env = arg_type
if isinstance(default, list) and (arg_type is None or arg_type is str):
value_type_for_env = None
default_with_env = env_or_default(env_var, default, value_type=value_type_for_env)
names = [flag_name]
......@@ -88,8 +100,9 @@ def add_argument(
"dest": arg_dest,
"default": default_with_env,
"help": env_help,
"type": arg_type,
}
if arg_type is not None:
add_arg_opts["type"] = arg_type
kwargs.update(add_arg_opts)
parser.add_argument(*names, **kwargs)
......@@ -114,15 +127,15 @@ def add_negatable_bool_argument(
default: Default value
help: Help text
"""
arg_dest = _get_dest_name(flag_name, dest)
default_with_env = env_or_default(env_var, default)
parser.add_argument(
flag_name,
dest=arg_dest,
add_argument(
parser,
flag_name=flag_name,
env_var=env_var,
default=default,
help=help,
dest=dest,
arg_type=None,
action=argparse.BooleanOptionalAction,
default=default_with_env,
help=_build_help_message(help, env_var, default),
)
......
......@@ -7,12 +7,14 @@ import argparse
import pytest
from dynamo.common.configuration.utils import (
add_argument,
add_negatable_bool_argument,
env_or_default,
)
pytestmark = [
pytest.mark.unit,
pytest.mark.gpu_0,
pytest.mark.pre_merge,
]
......@@ -88,6 +90,86 @@ class TestEnvOrDefault:
# Bool
assert isinstance(env_or_default("TEST_VAR", True), bool)
def test_none_default_with_no_value_type_returns_raw_env_value(self, monkeypatch):
"""Test env value is returned as string when no type info is available."""
monkeypatch.setenv("TEST_VAR", "env_value")
result = env_or_default("TEST_VAR", None, value_type=None)
assert result == "env_value"
assert isinstance(result, str)
class TestAddArgument:
"""Test add_argument function."""
def test_callable_type_with_none_default_uses_env_and_validates(self, monkeypatch):
"""Test callable arg_type works when default is None and env var is set."""
monkeypatch.setenv("TEST_MODEL_NAME", " model-A ")
parser = argparse.ArgumentParser()
def validate_model_name(value: str) -> str:
if len(value.strip()) == 0:
raise argparse.ArgumentTypeError("model-name must be non-empty")
return value.strip()
add_argument(
parser,
flag_name="--model-name",
env_var="TEST_MODEL_NAME",
default=None,
help="Model name",
arg_type=validate_model_name,
)
args = parser.parse_args([])
assert args.model_name == "model-A"
def test_callable_type_with_none_default_uses_none_when_env_unset(
self, monkeypatch
):
"""Test callable arg_type keeps None default when env var is not set."""
monkeypatch.delenv("TEST_MODEL_NAME", raising=False)
parser = argparse.ArgumentParser()
def validate_model_name(value: str) -> str:
if len(value.strip()) == 0:
raise argparse.ArgumentTypeError("model-name must be non-empty")
return value.strip()
add_argument(
parser,
flag_name="--model-name",
env_var="TEST_MODEL_NAME",
default=None,
help="Model name",
arg_type=validate_model_name,
)
args = parser.parse_args([])
assert args.model_name is None
def test_callable_type_with_invalid_env_value_fails_parse(self, monkeypatch):
"""Test invalid env value still fails validation via argparse type callable."""
monkeypatch.setenv("TEST_MODEL_NAME", " ")
parser = argparse.ArgumentParser()
def validate_model_name(value: str) -> str:
if len(value.strip()) == 0:
raise argparse.ArgumentTypeError("model-name must be non-empty")
return value.strip()
add_argument(
parser,
flag_name="--model-name",
env_var="TEST_MODEL_NAME",
default=None,
help="Model name",
arg_type=validate_model_name,
)
with pytest.raises(SystemExit):
parser.parse_args([])
class TestAddNegatableBool:
"""Test add_negatable_bool function."""
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import pathlib
from typing import Any, Dict, Optional
from dynamo.common.config_dump import register_encoder
from dynamo.common.configuration.arg_group import ArgGroup
from dynamo.common.configuration.config_base import ConfigBase
from dynamo.common.configuration.utils import (
add_argument,
add_negatable_bool_argument,
env_or_default,
)
from . import __version__
def validate_model_name(value: str) -> str:
"""Validate that model-name is a non-empty string."""
if not value or not isinstance(value, str) or len(value.strip()) == 0:
raise argparse.ArgumentTypeError(
f"model-name must be a non-empty string, got: {value}"
)
return value.strip()
def validate_model_path(value: str) -> str:
"""Validate that model-path is a valid directory on disk."""
if not os.path.isdir(value):
raise argparse.ArgumentTypeError(
f"model-path must be a valid directory on disk, got: {value}"
)
return value
class FrontendConfig(ConfigBase):
"""Configuration for the Dynamo frontend."""
interactive: bool
kv_cache_block_size: Optional[int]
http_host: str
http_port: int
tls_cert_path: Optional[pathlib.Path]
tls_key_path: Optional[pathlib.Path]
router_mode: str
kv_overlap_score_weight: float
router_temperature: float
use_kv_events: bool
router_ttl: float
router_max_tree_size: int
router_prune_target_ratio: float
namespace: Optional[str] = None
router_replica_sync: bool
router_snapshot_threshold: int
router_reset_states: bool
durable_kv_events: bool
router_track_active_blocks: bool
router_assume_kv_reuse: bool
router_track_output_blocks: bool
router_event_threads: int
enforce_disagg: bool
migration_limit: int
active_decode_blocks_threshold: Optional[float]
active_prefill_tokens_threshold: Optional[int]
active_prefill_tokens_threshold_frac: Optional[float]
model_name: Optional[str]
model_path: Optional[str]
metrics_prefix: Optional[str] = None
kserve_grpc_server: bool
grpc_metrics_port: int
dump_config_to: Optional[str]
store_kv: str
request_plane: str
event_plane: str
chat_processor: str
exp_python_factory: bool
def validate(self) -> None:
if bool(self.tls_cert_path) ^ bool(self.tls_key_path): # ^ is XOR
raise ValueError(
"--tls-cert-path and --tls-key-path must be provided together"
)
if self.migration_limit < 0 or self.migration_limit > 4294967295:
raise ValueError(
"--migration-limit must be between 0 and 4294967295 (0=disabled)"
)
@register_encoder(FrontendConfig)
def _preprocess_for_encode_config(config: FrontendConfig) -> Dict[str, Any]:
"""Convert FrontendConfig object to dictionary for encoding."""
return config.__dict__
class FrontendArgGroup(ArgGroup):
"""Frontend configuration parameters."""
def add_arguments(self, parser) -> None:
parser.add_argument(
"--version", action="version", version=f"Dynamo Frontend {__version__}"
)
g = parser.add_argument_group("Dynamo Frontend Options")
# Interactive needs -i short option; use raw add_argument with BooleanOptionalAction
g.add_argument(
"-i",
"--interactive",
dest="interactive",
action=argparse.BooleanOptionalAction,
default=env_or_default("DYN_INTERACTIVE", False),
help="Interactive text chat.\nenv var: DYN_INTERACTIVE",
)
add_argument(
g,
flag_name="--kv-cache-block-size",
env_var="DYN_KV_CACHE_BLOCK_SIZE",
default=None,
help="KV cache block size (u32).",
arg_type=int,
)
add_argument(
g,
flag_name="--http-host",
env_var="DYN_HTTP_HOST",
default="0.0.0.0",
help="HTTP host for the engine (str).",
)
add_argument(
g,
flag_name="--http-port",
env_var="DYN_HTTP_PORT",
default=8000,
help="HTTP port for the engine (u16).",
arg_type=int,
)
add_argument(
g,
flag_name="--tls-cert-path",
env_var="DYN_TLS_CERT_PATH",
default=None,
help="TLS certificate path, PEM format.",
arg_type=pathlib.Path,
)
add_argument(
g,
flag_name="--tls-key-path",
env_var="DYN_TLS_KEY_PATH",
default=None,
help="TLS certificate key path, PEM format.",
arg_type=pathlib.Path,
)
add_argument(
g,
flag_name="--router-mode",
env_var="DYN_ROUTER_MODE",
default="round-robin",
help="How to route the request.",
choices=["round-robin", "random", "kv"],
)
add_argument(
g,
flag_name="--kv-overlap-score-weight",
env_var="DYN_KV_OVERLAP_SCORE_WEIGHT",
default=1.0,
help=(
"KV Router: Weight for overlap score in worker selection. "
"Higher values prioritize KV cache reuse."
),
arg_type=float,
)
add_argument(
g,
flag_name="--router-temperature",
env_var="DYN_ROUTER_TEMPERATURE",
default=0.0,
help=(
"KV Router: Temperature for worker sampling via softmax. Higher values "
"promote more randomness, and 0 fallbacks to deterministic."
),
arg_type=float,
)
add_negatable_bool_argument(
g,
flag_name="--kv-events",
env_var="DYN_KV_EVENTS",
default=True,
help=(
"KV Router: Enable/disable KV events. Use --kv-events to enable "
"(default, router receives cache state events from workers) or --no-kv-events "
"to disable (router predicts cache state based on routing decisions)."
),
dest="use_kv_events",
)
add_argument(
g,
flag_name="--router-ttl",
env_var="DYN_ROUTER_TTL",
default=120.0,
help=(
"KV Router: Time-to-live in seconds for blocks when KV events are disabled. "
"Only used when --no-kv-events is set."
),
arg_type=float,
)
add_argument(
g,
flag_name="--router-max-tree-size",
env_var="DYN_ROUTER_MAX_TREE_SIZE",
default=2**20,
help=(
"KV Router: Maximum tree size before pruning when KV events are disabled. "
"Only used when --no-kv-events is set."
),
arg_type=int,
)
add_argument(
g,
flag_name="--router-prune-target-ratio",
env_var="DYN_ROUTER_PRUNE_TARGET_RATIO",
default=0.8,
help=(
"KV Router: Target size ratio after pruning when KV events are disabled. "
"Only used when --no-kv-events is set."
),
arg_type=float,
)
add_argument(
g,
flag_name="--namespace",
env_var="DYN_NAMESPACE",
default=None,
help=(
"Dynamo namespace for model discovery scoping. If specified, models will "
"only be discovered from this namespace. If not specified, discovers models "
"from all namespaces (global discovery)."
),
)
add_negatable_bool_argument(
g,
flag_name="--router-replica-sync",
env_var="DYN_ROUTER_REPLICA_SYNC",
default=False,
help=(
"KV Router: Enable replica synchronization across multiple router instances. "
"When true, routers will publish and subscribe to events to maintain "
"consistent state."
),
)
add_argument(
g,
flag_name="--router-snapshot-threshold",
env_var="DYN_ROUTER_SNAPSHOT_THRESHOLD",
default=1000000,
help=(
"KV Router: Number of messages in stream before triggering a snapshot. "
),
arg_type=int,
)
add_negatable_bool_argument(
g,
flag_name="--router-reset-states",
env_var="DYN_ROUTER_RESET_STATES",
default=False,
help=(
"KV Router: Reset router state on startup, purging stream and object store. "
"By default, states are persisted. WARNING: This can affect existing router "
"replicas."
),
)
add_negatable_bool_argument(
g,
flag_name="--durable-kv-events",
env_var="DYN_DURABLE_KV_EVENTS",
default=False,
help=(
"KV Router: Enable durable KV events using NATS JetStream instead of NATS Core. "
"By default, the router uses the generic event plane (NATS Core or ZMQ) with "
"local_indexer mode. Use this flag when you need durability and multi-replica "
"consistency. Requires NATS with JetStream enabled."
),
)
add_negatable_bool_argument(
g,
flag_name="--track-active-blocks",
env_var="DYN_TRACK_ACTIVE_BLOCKS",
default=True,
dest="router_track_active_blocks",
help=(
"KV Router: Track active blocks (blocks being used for ongoing generation). "
"By default, active blocks are tracked for load balancing. "
),
)
add_negatable_bool_argument(
g,
flag_name="--assume-kv-reuse",
env_var="DYN_ASSUME_KV_REUSE",
default=True,
dest="router_assume_kv_reuse",
help=(
"KV Router: When tracking active blocks, assume KV cache reuse. "
"Use --no-assume-kv-reuse to generate random hashes instead (when KV cache reuse is not expected)."
),
)
add_negatable_bool_argument(
g,
flag_name="--track-output-blocks",
env_var="DYN_ROUTER_TRACK_OUTPUT_BLOCKS",
default=False,
dest="router_track_output_blocks",
help=(
"KV Router: Track output blocks during generation. When enabled, the router adds "
"placeholder blocks as tokens are generated and applies fractional decay based on "
"progress toward expected_output_tokens."
),
)
add_argument(
g,
flag_name="--router-event-threads",
env_var="DYN_ROUTER_EVENT_THREADS",
default=1,
help=(
"KV Router: Number of event processing threads. When > 1, uses a concurrent radix tree with a thread pool for higher throughput."
),
arg_type=int,
)
add_negatable_bool_argument(
g,
flag_name="--enforce-disagg",
env_var="DYN_ENFORCE_DISAGG",
default=False,
help=(
"Enforce disaggregated prefill-decode. When set, unactivated prefill router will "
"return an error instead of falling back to decode-only mode."
),
)
add_argument(
g,
flag_name="--migration-limit",
env_var="DYN_MIGRATION_LIMIT",
default=0,
help=(
"Maximum number of times a request may be migrated to a different engine worker. "
"When > 0, enables request migration on worker disconnect."
),
arg_type=int,
)
add_argument(
g,
flag_name="--active-decode-blocks-threshold",
env_var="DYN_ACTIVE_DECODE_BLOCKS_THRESHOLD",
default=None,
help=(
"Threshold percentage (0.0-1.0) for determining when a worker is considered busy "
"based on KV cache block utilization. If not set, blocks-based busy detection is disabled."
),
arg_type=float,
)
add_argument(
g,
flag_name="--active-prefill-tokens-threshold",
env_var="DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD",
default=None,
help=(
"Literal token count threshold for determining when a worker is considered busy "
"based on prefill token utilization. When active prefill tokens exceed this "
"threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled."
),
arg_type=int,
)
add_argument(
g,
flag_name="--active-prefill-tokens-threshold-frac",
env_var="DYN_ACTIVE_PREFILL_TOKENS_THRESHOLD_FRAC",
default=None,
help=(
"Fraction of max_num_batched_tokens for busy detection. Worker is busy when "
"active_prefill_tokens > frac * max_num_batched_tokens. Default 1.5 (disabled). "
"Uses OR logic with --active-prefill-tokens-threshold."
),
arg_type=float,
)
add_argument(
g,
flag_name="--model-name",
env_var="DYN_MODEL_NAME",
default=None,
help="Model name as a string (e.g., 'Llama-3.2-1B-Instruct')",
arg_type=validate_model_name,
)
add_argument(
g,
flag_name="--model-path",
env_var="DYN_MODEL_PATH",
default=None,
help="Path to model directory on disk (e.g., /tmp/model_cache/llama3.2_1B/)",
arg_type=validate_model_path,
)
add_argument(
g,
flag_name="--metrics-prefix",
env_var="DYN_METRICS_PREFIX",
default=None,
help=(
"Prefix for Dynamo frontend metrics. If unset, uses DYN_METRICS_PREFIX env var "
"or 'dynamo_frontend'."
),
)
add_negatable_bool_argument(
g,
flag_name="--kserve-grpc-server",
env_var="DYN_KSERVE_GRPC_SERVER",
default=False,
help="Start KServe gRPC server.",
)
add_argument(
g,
flag_name="--grpc-metrics-port",
env_var="DYN_GRPC_METRICS_PORT",
default=8788,
help=(
"HTTP metrics port for gRPC service (u16). Only used with --kserve-grpc-server. "
"Defaults to 8788."
),
arg_type=int,
)
add_argument(
g,
flag_name="--dump-config-to",
env_var="DYN_DUMP_CONFIG_TO",
default=None,
help="Dump config to the specified file path.",
)
add_argument(
g,
flag_name="--store-kv",
env_var="DYN_STORE_KV",
default="etcd",
help=(
"Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars "
"(e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var "
"DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv."
),
choices=["etcd", "file", "mem"],
)
add_argument(
g,
flag_name="--request-plane",
env_var="DYN_REQUEST_PLANE",
default="tcp",
help=(
"Determines how requests are distributed from routers to workers. "
"'tcp' is fastest [nats|http|tcp]"
),
choices=["nats", "http", "tcp"],
)
add_argument(
g,
flag_name="--event-plane",
env_var="DYN_EVENT_PLANE",
default="nats",
help="Determines how events are published [nats|zmq]",
choices=["nats", "zmq"],
)
add_argument(
g,
flag_name="--chat-processor",
env_var="DYN_CHAT_PROCESSOR",
default="dynamo",
help=(
"[EXPERIMENTAL] When set to 'vllm', use local vllm for the pre and post "
"processor."
),
choices=["dynamo", "vllm"],
)
add_negatable_bool_argument(
g,
flag_name="--exp-python-factory",
env_var="DYN_EXP_PYTHON_FACTORY",
default=False,
help=(
"[EXPERIMENTAL] Enable Python-based engine factory. When set, engines will be "
"created via a Python callback instead of the default Rust pipeline."
),
)
......@@ -19,14 +19,14 @@ import argparse
import asyncio
import logging
import os
import pathlib
import signal
import sys
from argparse import Namespace
from typing import Optional
import uvloop
from dynamo.common.config_dump import dump_config
from dynamo.common.config_dump.config_dumper import add_config_dump_args
from dynamo.llm import (
EngineType,
EntrypointArgs,
......@@ -39,9 +39,7 @@ from dynamo.llm import (
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from . import __version__
DYN_NAMESPACE_ENV_VAR = "DYN_NAMESPACE"
from .frontend_args import FrontendArgGroup, FrontendConfig
configure_dynamo_logging()
logger = logging.getLogger(__name__)
......@@ -50,56 +48,41 @@ logger = logging.getLogger(__name__)
def setup_engine_factory(
runtime: DistributedRuntime,
router_config: RouterConfig,
flags: argparse.Namespace,
): # Returns EngineFactory:
config: FrontendConfig,
vllm_flags: Namespace,
):
"""
When using vllm pre and post processor, create the EngineFactory that
creates the engines that run requests.
"""
from .vllm_processor import EngineFactory
return EngineFactory(runtime, router_config, flags)
def validate_model_name(value):
"""Validate that model-name is a non-empty string."""
if not value or not isinstance(value, str) or len(value.strip()) == 0:
raise argparse.ArgumentTypeError(
f"model-name must be a non-empty string, got: {value}"
)
return value.strip()
def validate_model_path(value):
"""Validate that model-path is a valid directory on disk."""
if not os.path.isdir(value):
raise argparse.ArgumentTypeError(
f"model-path must be a valid directory on disk, got: {value}"
)
return value
return EngineFactory(runtime, router_config, config, vllm_flags)
def parse_args():
def parse_args() -> tuple[FrontendConfig, Optional[Namespace]]:
"""Parse command-line arguments for the Dynamo frontend.
Returns:
argparse.Namespace: Parsed command-line arguments.
FrontendConfig: Parsed configuration object.
"""
# We need to know before we parse the arguments
full_args = " ".join(sys.argv)
is_vllm = (
"--chat-processor vllm" in full_args or "--chat-processor=vllm" in full_args
parser = argparse.ArgumentParser(
description="Dynamo Frontend: HTTP+Pre-processor+Router",
formatter_class=argparse.RawTextHelpFormatter, # To preserve multi-line help formatting
)
if not is_vllm:
# Normal case, Dynamo processor
parser = argparse.ArgumentParser(
description="Dynamo Frontend: HTTP+Pre-processor+Router",
formatter_class=argparse.RawTextHelpFormatter, # To preserve multi-line help formatting
)
else:
# vllm processor
FrontendArgGroup().add_arguments(parser)
args, unknown = parser.parse_known_args()
config = FrontendConfig.from_cli_args(args)
config.validate()
vllm_flags = None
# parse extra vllm flags using vllm native parser.
if config.chat_processor == "vllm":
try:
from vllm.utils import FlexibleArgumentParser
except ImportError:
......@@ -110,259 +93,23 @@ def parse_args():
"Flag '--chat-processor vllm' requires vllm be installed."
)
sys.exit(1)
parser = FlexibleArgumentParser(
description="Dynamo Frontend: HTTP+Pre-processor+Router",
)
parser.add_argument(
"--version", action="version", version=f"Dynamo Frontend {__version__}"
)
parser.add_argument(
"-i", "--interactive", action="store_true", help="Interactive text chat"
)
parser.add_argument(
"--kv-cache-block-size",
type=int,
default=os.environ.get("DYN_KV_CACHE_BLOCK_SIZE"),
help="KV cache block size (u32). Can be set via DYN_KV_CACHE_BLOCK_SIZE env var.",
)
parser.add_argument(
"--http-host",
type=str,
default=os.environ.get("DYN_HTTP_HOST", "0.0.0.0"),
help="HTTP host for the engine (str). Can be set via DYN_HTTP_HOST env var.",
)
parser.add_argument(
"--http-port",
type=int,
default=int(os.environ.get("DYN_HTTP_PORT", "8000")),
help="HTTP port for the engine (u16). Can be set via DYN_HTTP_PORT env var.",
)
parser.add_argument(
"--tls-cert-path",
type=pathlib.Path,
default=None,
help="TLS certificate path, PEM format.",
)
parser.add_argument(
"--tls-key-path",
type=pathlib.Path,
default=None,
help="TLS certificate key path, PEM format.",
)
parser.add_argument(
"--router-mode",
type=str,
choices=["round-robin", "random", "kv"],
default=os.environ.get("DYN_ROUTER_MODE", "round-robin"),
help="How to route the request. Can be set via DYN_ROUTER_MODE env var.",
)
parser.add_argument(
"--kv-overlap-score-weight",
type=float,
default=float(os.environ.get("DYN_KV_OVERLAP_SCORE_WEIGHT", "1.0")),
help="KV Router: Weight for overlap score in worker selection. Higher values prioritize KV cache reuse.",
)
parser.add_argument(
"--router-temperature",
type=float,
default=float(os.environ.get("DYN_ROUTER_TEMPERATURE", "0.0")),
help="KV Router: Temperature for worker sampling via softmax. Higher values promote more randomness, and 0 fallbacks to deterministic.",
)
parser.add_argument(
"--kv-events",
action=argparse.BooleanOptionalAction,
dest="use_kv_events",
default=(
os.environ.get("DYN_KV_EVENTS", "true").lower() == "true"
), # default is true
help="KV Router: Enable/disable KV events. Use --kv-events to enable (default, router receives cache state events from workers) or --no-kv-events to disable (router predicts cache state based on routing decisions).",
)
parser.add_argument(
"--router-ttl",
type=float,
default=float(os.environ.get("DYN_ROUTER_TTL", "120.0")),
help="KV Router: Time-to-live in seconds for blocks when KV events are disabled. Only used when --no-kv-events is set. Can be set via DYN_ROUTER_TTL env var (default: 120.0).",
)
parser.add_argument(
"--router-max-tree-size",
type=int,
default=int(os.environ.get("DYN_ROUTER_MAX_TREE_SIZE", str(2**20))),
help="KV Router: Maximum tree size before pruning when KV events are disabled. Only used when --no-kv-events is set. Can be set via DYN_ROUTER_MAX_TREE_SIZE env var (default: 1048576, which is 2^20).",
)
parser.add_argument(
"--router-prune-target-ratio",
type=float,
default=float(os.environ.get("DYN_ROUTER_PRUNE_TARGET_RATIO", "0.8")),
help="KV Router: Target size ratio after pruning when KV events are disabled. Only used when --no-kv-events is set. Can be set via DYN_ROUTER_PRUNE_TARGET_RATIO env var (default: 0.8).",
)
parser.add_argument(
"--namespace",
type=str,
default=os.environ.get(DYN_NAMESPACE_ENV_VAR),
help="Dynamo namespace for model discovery scoping. If specified, models will only be discovered from this namespace. If not specified, discovers models from all namespaces (global discovery).",
)
parser.add_argument(
"--router-replica-sync",
action="store_true",
default=False,
help="KV Router: Enable replica synchronization across multiple router instances. When true, routers will publish and subscribe to events to maintain consistent state.",
)
parser.add_argument(
"--router-snapshot-threshold",
type=int,
default=1000000,
help="KV Router: Number of messages in stream before triggering a snapshot. Defaults to 1000000.",
)
parser.add_argument(
"--router-reset-states",
action="store_true",
dest="router_reset_states",
default=False,
help="KV Router: Reset router state on startup, purging stream and object store. By default, states are persisted. WARNING: This can affect existing router replicas.",
)
parser.add_argument(
"--durable-kv-events",
action="store_true",
dest="durable_kv_events",
default=False,
help="KV Router: Enable durable KV events using NATS JetStream instead of NATS Core. By default, the router uses the generic event plane (NATS Core or ZMQ) with local_indexer mode. Use this flag when you need durability and multi-replica consistency. Requires NATS with JetStream enabled.",
)
parser.add_argument(
"--no-track-active-blocks",
action="store_false",
dest="router_track_active_blocks",
default=True,
help="KV Router: Disable tracking of active blocks (blocks being used for ongoing generation). By default, active blocks are tracked for load balancing.",
)
parser.add_argument(
"--no-assume-kv-reuse",
action="store_false",
dest="router_assume_kv_reuse",
default=True,
help="KV Router: When tracking active blocks, do not assume KV cache reuse (generate random hashes instead of computing actual block hashes). Useful when KV cache reuse is not expected. By default, KV cache reuse is assumed.",
)
parser.add_argument(
"--track-output-blocks",
action="store_true",
dest="router_track_output_blocks",
default=False,
help="KV Router: Track output blocks during generation. When enabled, the router adds placeholder blocks as tokens are generated and applies fractional decay based on progress toward expected_output_tokens. By default, output blocks are not tracked.",
)
parser.add_argument(
"--router-event-threads",
type=int,
default=int(os.environ.get("DYN_ROUTER_EVENT_THREADS", "1")),
help="KV Router: Number of event processing threads. When > 1, uses a concurrent radix tree with a thread pool for higher throughput. Can be set via DYN_ROUTER_EVENT_THREADS env var (default: 1).",
)
parser.add_argument(
"--enforce-disagg",
action="store_true",
default=False,
help="Enforce disaggregated prefill-decode. When set, unactivated prefill router will return an error instead of falling back to decode-only mode.",
)
parser.add_argument(
"--migration-limit",
type=int,
default=0,
help="Maximum number of times a request may be migrated to a different engine worker. When > 0, enables request migration on worker disconnect (default: 0).",
)
parser.add_argument(
"--active-decode-blocks-threshold",
type=float,
default=None,
help="Threshold percentage (0.0-1.0) for determining when a worker is considered busy based on KV cache block utilization. If not set, blocks-based busy detection is disabled.",
)
parser.add_argument(
"--active-prefill-tokens-threshold",
type=int,
default=None,
help="Literal token count threshold for determining when a worker is considered busy based on prefill token utilization. When active prefill tokens exceed this threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled.",
)
parser.add_argument(
"--active-prefill-tokens-threshold-frac",
type=float,
default=None,
help="Fraction of max_num_batched_tokens for busy detection. Worker is busy when active_prefill_tokens > frac * max_num_batched_tokens. Default 1.5 (disabled). Uses OR logic with --active-prefill-tokens-threshold.",
)
parser.add_argument(
"--model-name",
type=validate_model_name,
help="Model name as a string (e.g., 'Llama-3.2-1B-Instruct')",
)
parser.add_argument(
"--model-path",
type=validate_model_path,
help="Path to model directory on disk (e.g., /tmp/model_cache/llama3.2_1B/)",
)
parser.add_argument(
"--metrics-prefix",
type=str,
default=None,
help="Prefix for Dynamo frontend metrics. If unset, uses DYN_METRICS_PREFIX env var or 'dynamo_frontend'.",
)
parser.add_argument(
"--kserve-grpc-server",
action="store_true",
default=False,
help="Start KServe gRPC server.",
)
parser.add_argument(
"--grpc-metrics-port",
type=int,
default=8788,
help="HTTP metrics port for gRPC service (u16). Only used with --kserve-grpc-server. Defaults to 8788.",
)
add_config_dump_args(parser)
parser.add_argument(
"--store-kv",
type=str,
choices=["etcd", "file", "mem"],
default=os.environ.get("DYN_STORE_KV", "etcd"),
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
)
parser.add_argument(
"--request-plane",
type=str,
choices=["nats", "http", "tcp"],
default=os.environ.get("DYN_REQUEST_PLANE", "tcp"),
help="Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
)
parser.add_argument(
"--event-plane",
type=str,
choices=["nats", "zmq"],
default=os.environ.get("DYN_EVENT_PLANE", "nats"),
help="Determines how events are published [nats|zmq]",
)
parser.add_argument(
"--chat-processor",
dest="chat_processor",
type=str,
choices=["dynamo", "vllm"],
default="dynamo",
help="[EXPERIMENTAL] When set to 'vllm', use local vllm for the pre and post processor.",
)
if is_vllm:
try:
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.entrypoints.openai.cli_args import FrontendArgs
parser = FrontendArgs.add_cli_args(parser)
parser = AsyncEngineArgs.add_cli_args(parser)
except ModuleNotFoundError:
logger.exception("Flag '--chat-processor vllm' requires vllm be installed.")
sys.exit(1)
flags = parser.parse_args()
if bool(flags.tls_cert_path) ^ bool(flags.tls_key_path): # ^ is XOR
parser.error("--tls-cert-path and --tls-key-path must be provided together")
if flags.migration_limit < 0 or flags.migration_limit > 4294967295:
parser.error("--migration-limit must be between 0 and 4294967295 (0=disabled)")
return flags
vllm_parser = FlexibleArgumentParser(add_help=False)
vllm_parser = FrontendArgs.add_cli_args(vllm_parser)
vllm_parser = AsyncEngineArgs.add_cli_args(vllm_parser)
# the result is returned as Namespace object rather than AsyncEngineArgs object to avoid import error for non-vllm users.
vllm_flags = vllm_parser.parse_args(unknown)
else:
if unknown:
logger.error(f"Unknown arguments specified: {unknown}")
sys.exit(1)
return config, vllm_flags
async def async_main():
......@@ -378,12 +125,12 @@ async def async_main():
# bind that port before the worker, causing port conflicts and/or scraping the
# wrong metrics endpoint.
os.environ.pop("DYN_SYSTEM_PORT", None)
flags = parse_args()
dump_config(flags.dump_config_to, flags)
os.environ["DYN_EVENT_PLANE"] = flags.event_plane
config, vllm_flags = parse_args()
dump_config(config.dump_config_to, config)
os.environ["DYN_EVENT_PLANE"] = config.event_plane
logger.info(
f"Request migration {'enabled' if flags.migration_limit > 0 else 'disabled'} "
f"(limit: {flags.migration_limit})"
f"Request migration {'enabled' if config.migration_limit > 0 else 'disabled'} "
f"(limit: {config.migration_limit})"
)
# Warn if DYN_SYSTEM_PORT is set (frontend doesn't use system metrics server)
if os.environ.get("DYN_SYSTEM_PORT"):
......@@ -396,29 +143,31 @@ async def async_main():
)
# Configure Dynamo frontend HTTP service metrics prefix
if flags.metrics_prefix is not None:
prefix = flags.metrics_prefix.strip()
if config.metrics_prefix is not None:
prefix = config.metrics_prefix.strip()
if prefix:
os.environ["DYN_METRICS_PREFIX"] = flags.metrics_prefix
os.environ["DYN_METRICS_PREFIX"] = config.metrics_prefix
# NATS is needed when:
# 1. Request plane is NATS, OR
# 2. Durable KV events (JetStream) is explicitly requested, OR
# 3. Event plane is NATS AND KV router mode AND (KV events OR replica sync enabled)
# Note: NATS Core (without JetStream) is the default for KV events when durable_kv_events=False
enable_nats = flags.request_plane == "nats" or (
flags.router_mode == "kv"
enable_nats = config.request_plane == "nats" or (
config.router_mode == "kv"
and (
flags.durable_kv_events
config.durable_kv_events
or (
flags.event_plane == "nats"
and (flags.use_kv_events or flags.router_replica_sync)
config.event_plane == "nats"
and (config.use_kv_events or config.router_replica_sync)
)
)
)
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, flags.store_kv, flags.request_plane, enable_nats)
runtime = DistributedRuntime(
loop, config.store_kv, config.request_plane, enable_nats
)
def signal_handler():
asyncio.create_task(graceful_shutdown(runtime))
......@@ -426,25 +175,25 @@ async def async_main():
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, signal_handler)
if flags.router_mode == "kv":
if config.router_mode == "kv":
router_mode = RouterMode.KV
kv_router_config = KvRouterConfig(
overlap_score_weight=flags.kv_overlap_score_weight,
router_temperature=flags.router_temperature,
use_kv_events=flags.use_kv_events,
durable_kv_events=flags.durable_kv_events,
router_replica_sync=flags.router_replica_sync,
router_track_active_blocks=flags.router_track_active_blocks,
router_track_output_blocks=flags.router_track_output_blocks,
router_assume_kv_reuse=flags.router_assume_kv_reuse,
router_snapshot_threshold=flags.router_snapshot_threshold,
router_reset_states=flags.router_reset_states,
router_ttl_secs=flags.router_ttl,
router_max_tree_size=flags.router_max_tree_size,
router_prune_target_ratio=flags.router_prune_target_ratio,
router_event_threads=flags.router_event_threads,
overlap_score_weight=config.kv_overlap_score_weight,
router_temperature=config.router_temperature,
use_kv_events=config.use_kv_events,
durable_kv_events=config.durable_kv_events,
router_replica_sync=config.router_replica_sync,
router_track_active_blocks=config.router_track_active_blocks,
router_track_output_blocks=config.router_track_output_blocks,
router_assume_kv_reuse=config.router_assume_kv_reuse,
router_snapshot_threshold=config.router_snapshot_threshold,
router_reset_states=config.router_reset_states,
router_ttl_secs=config.router_ttl,
router_max_tree_size=config.router_max_tree_size,
router_prune_target_ratio=config.router_prune_target_ratio,
router_event_threads=config.router_event_threads,
)
elif flags.router_mode == "random":
elif config.router_mode == "random":
router_mode = RouterMode.Random
kv_router_config = None
else:
......@@ -454,34 +203,38 @@ async def async_main():
router_config = RouterConfig(
router_mode,
kv_router_config,
active_decode_blocks_threshold=flags.active_decode_blocks_threshold,
active_prefill_tokens_threshold=flags.active_prefill_tokens_threshold,
active_prefill_tokens_threshold_frac=flags.active_prefill_tokens_threshold_frac,
enforce_disagg=flags.enforce_disagg,
active_decode_blocks_threshold=config.active_decode_blocks_threshold,
active_prefill_tokens_threshold=config.active_prefill_tokens_threshold,
active_prefill_tokens_threshold_frac=config.active_prefill_tokens_threshold_frac,
enforce_disagg=config.enforce_disagg,
)
kwargs = {
"http_host": flags.http_host,
"http_port": flags.http_port,
"kv_cache_block_size": flags.kv_cache_block_size,
"http_host": config.http_host,
"http_port": config.http_port,
"kv_cache_block_size": config.kv_cache_block_size,
"router_config": router_config,
"migration_limit": config.migration_limit,
}
if flags.model_name:
kwargs["model_name"] = flags.model_name
if flags.model_path:
kwargs["model_path"] = flags.model_path
if flags.tls_cert_path:
kwargs["tls_cert_path"] = flags.tls_cert_path
if flags.tls_key_path:
kwargs["tls_key_path"] = flags.tls_key_path
if flags.namespace:
kwargs["namespace"] = flags.namespace
if flags.kserve_grpc_server and flags.grpc_metrics_port:
kwargs["http_metrics_port"] = flags.grpc_metrics_port
if flags.chat_processor == "vllm":
if config.model_name:
kwargs["model_name"] = config.model_name
if config.model_path:
kwargs["model_path"] = config.model_path
if config.tls_cert_path:
kwargs["tls_cert_path"] = config.tls_cert_path
if config.tls_key_path:
kwargs["tls_key_path"] = config.tls_key_path
if config.namespace:
kwargs["namespace"] = config.namespace
if config.kserve_grpc_server and config.grpc_metrics_port:
kwargs["http_metrics_port"] = config.grpc_metrics_port
if config.chat_processor == "vllm":
assert (
vllm_flags is not None
), "vllm_flags is required when chat_processor is vllm"
chat_engine_factory = setup_engine_factory(
runtime, router_config, flags
runtime, router_config, config, vllm_flags
).chat_engine_factory
kwargs["chat_engine_factory"] = chat_engine_factory
......@@ -489,9 +242,9 @@ async def async_main():
engine = await make_engine(runtime, e)
try:
if flags.interactive:
if config.interactive:
await run_input(runtime, "text", engine)
elif flags.kserve_grpc_server:
elif config.kserve_grpc_server:
await run_input(runtime, "grpc", engine)
else:
await run_input(runtime, "http", engine)
......
......@@ -24,6 +24,7 @@ from vllm.v1.engine import EngineCoreOutput, EngineCoreRequest, FinishReason
from vllm.v1.engine.input_processor import InputProcessor
from vllm.v1.engine.output_processor import OutputProcessor, OutputProcessorOutput
from dynamo.frontend.frontend_args import FrontendConfig
from dynamo.llm import (
KvPushRouter,
ModelCardInstanceId,
......@@ -367,10 +368,12 @@ class EngineFactory:
self,
runtime: DistributedRuntime,
router_config: RouterConfig,
config: FrontendConfig,
flags: Namespace,
):
self.runtime = runtime
self.router_config = router_config
self.config = config
self.flags = flags
async def chat_engine_factory(
......@@ -444,7 +447,7 @@ class EngineFactory:
if self.router_config.router_mode == RouterMode.KV:
router = KvPushRouter(
endpoint=generate_endpoint,
block_size=self.flags.kv_cache_block_size or 16,
block_size=self.config.kv_cache_block_size or 16,
kv_router_config=self.router_config.kv_router_config,
)
else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment