Unverified Commit f4f82762 authored by jh-nv's avatar jh-nv Committed by GitHub
Browse files

feat: migrate SGLang configuration (#6280)


Signed-off-by: jh-nv <jihao@nvidia.com>
parent 359765d3
...@@ -15,6 +15,7 @@ class DynamoRuntimeConfig(ConfigBase): ...@@ -15,6 +15,7 @@ class DynamoRuntimeConfig(ConfigBase):
"""Configuration for Dynamo runtime (common across all backends).""" """Configuration for Dynamo runtime (common across all backends)."""
namespace: str namespace: str
endpoint: Optional[str] = None
discovery_backend: str discovery_backend: str
request_plane: str request_plane: str
event_plane: str event_plane: str
...@@ -52,6 +53,13 @@ class DynamoRuntimeArgGroup(ArgGroup): ...@@ -52,6 +53,13 @@ class DynamoRuntimeArgGroup(ArgGroup):
default="dynamo", default="dynamo",
help="Dynamo namespace", help="Dynamo namespace",
) )
add_argument(
g,
flag_name="--endpoint",
env_var="DYN_ENDPOINT",
default=None,
help="Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Example: dyn://dynamo.backend.generate. Currently used only by TRT-LLM and SGLang backends.",
)
add_argument( add_argument(
g, g,
flag_name="--discovery-backend", flag_name="--discovery-backend",
......
...@@ -9,206 +9,47 @@ import socket ...@@ -9,206 +9,47 @@ import socket
import sys import sys
import tempfile import tempfile
from argparse import Namespace from argparse import Namespace
from dataclasses import dataclass
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Generator, List, Optional from typing import Any, Dict, Generator, Optional
import yaml import yaml
from sglang.srt.server_args import ServerArgs from sglang.srt.server_args import ServerArgs
from sglang.srt.server_args_config_parser import ConfigArgumentMerger from sglang.srt.server_args_config_parser import ConfigArgumentMerger
from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
from dynamo.common.config_dump import register_encoder from dynamo.common.config_dump import register_encoder
from dynamo.common.configuration.groups import DynamoRuntimeConfig
from dynamo.common.configuration.groups.runtime_args import DynamoRuntimeArgGroup
from dynamo.common.utils.runtime import parse_endpoint from dynamo.common.utils.runtime import parse_endpoint
from dynamo.llm import fetch_model from dynamo.llm import fetch_model
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sglang import __version__ from dynamo.sglang.backend_args import DynamoSGLangArgGroup, DynamoSGLangConfig
configure_dynamo_logging() configure_dynamo_logging()
DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")
DEFAULT_ENDPOINT = f"dyn://{DYN_NAMESPACE}.backend.generate"
DYNAMO_ARGS: Dict[str, Dict[str, Any]] = {
"endpoint": {
"flags": ["--endpoint"],
"type": str,
"help": f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Example: {DEFAULT_ENDPOINT}",
},
"tool-call-parser": {
"flags": ["--dyn-tool-call-parser"],
"type": str,
"default": None,
"choices": get_tool_parser_names(),
"help": "Tool call parser name for the model.",
},
"reasoning-parser": {
"flags": ["--dyn-reasoning-parser"],
"type": str,
"default": None,
"choices": get_reasoning_parser_names(),
"help": "Reasoning parser name for the model. If not specified, no reasoning parsing is performed.",
},
"custom-jinja-template": {
"flags": ["--custom-jinja-template"],
"type": str,
"default": None,
"help": "Path to a custom Jinja template file to override the model's default chat template. This template will take precedence over any template found in the model repository. This template will be applied by Dynamo's preprocessor and cannot be used with --use-sglang-tokenizer.",
},
"endpoint-types": {
"flags": ["--dyn-endpoint-types"],
"type": str,
"default": "chat,completions",
"help": "Comma-separated list of endpoint types to enable. Options: 'chat', 'completions'. Default: 'chat,completions'. Use 'completions' for models without chat templates.",
},
"use-sglang-tokenizer": {
"flags": ["--use-sglang-tokenizer"],
"action": "store_true",
"default": False,
"help": "Use SGLang's tokenizer for pre and post processing. This bypasses Dynamo's preprocessor and only v1/chat/completions will be available through the Dynamo frontend. Cannot be used with --custom-jinja-template.",
},
"multimodal-processor": {
"flags": ["--multimodal-processor"],
"action": "store_true",
"default": False,
"help": "Run as multimodal processor component for handling multimodal requests",
},
"multimodal-encode-worker": {
"flags": ["--multimodal-encode-worker"],
"action": "store_true",
"default": False,
"help": "Run as multimodal encode worker component for processing images/videos",
},
"multimodal-worker": {
"flags": ["--multimodal-worker"],
"action": "store_true",
"default": False,
"help": "Run as multimodal worker component for LLM inference with multimodal data",
},
"embedding-worker": {
"flags": ["--embedding-worker"],
"action": "store_true",
"default": False,
"help": "Run as embedding worker component (Dynamo flag, also sets SGLang's --is-embedding)",
},
"dump-config-to": {
"flags": ["--dump-config-to"],
"type": str,
"default": None,
"help": "Dump debug config to the specified file path. If not specified, the config will be dumped to stdout at INFO level.",
},
"discovery-backend": {
"flags": ["--discovery-backend"],
"type": str,
"choices": ["kubernetes", "etcd", "file", "mem"],
"default": os.environ.get("DYN_DISCOVERY_BACKEND", "etcd"),
"help": "Discovery backend: kubernetes (K8s API), etcd (distributed KV), file (local filesystem), mem (in-memory). Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
},
"request-plane": {
"flags": ["--request-plane"],
"type": str,
"choices": ["nats", "http", "tcp"],
"default": os.environ.get("DYN_REQUEST_PLANE", "tcp"),
"help": "Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
},
"event-plane": {
"flags": ["--event-plane"],
"type": str,
"choices": ["nats", "zmq"],
"default": os.environ.get("DYN_EVENT_PLANE", "nats"),
"help": "Determines how events are published [nats|zmq]",
},
"durable-kv-events": {
"flags": ["--durable-kv-events"],
"action": "store_true",
"default": os.environ.get("DYN_DURABLE_KV_EVENTS", "false").lower() == "true",
"help": "Enable durable KV events using NATS JetStream instead of the local indexer. By default, local indexer is enabled for lower latency. Use this flag when you need durability and multi-replica router consistency. Requires NATS with JetStream enabled. Can also be set via DYN_DURABLE_KV_EVENTS=true env var.",
},
"image-diffusion-worker": {
"flags": ["--image-diffusion-worker"],
"action": "store_true",
"default": False,
"help": "Run as image diffusion worker for image generation",
},
"image-diffusion-fs-url": {
"flags": ["--image-diffusion-fs-url"],
"type": str,
"default": None,
"help": "Filesystem URL for storing generated images using fsspec (e.g., s3://bucket/path, gs://bucket/path, file:///local/path). Supports any fsspec-compatible filesystem.",
},
"video-generation-worker": {
"flags": ["--video-generation-worker"],
"action": "store_true",
"default": False,
"help": "Run as video generation worker for video generation (T2V/I2V)",
},
"video-generation-fs-url": {
"flags": ["--video-generation-fs-url"],
"type": str,
"default": None,
"help": "Filesystem URL for storing generated videos using fsspec (e.g., s3://bucket/path, gs://bucket/path, file:///local/path). Supports any fsspec-compatible filesystem.",
},
}
@dataclass
class DynamoArgs:
namespace: str
component: str
endpoint: str
discovery_backend: str
request_plane: str
event_plane: str
# tool and reasoning parser options
tool_call_parser: Optional[str] = None
reasoning_parser: Optional[str] = None
custom_jinja_template: Optional[str] = None
# endpoint types to enable
dyn_endpoint_types: str = "chat,completions"
# preprocessing options class DisaggregationMode(Enum):
use_sglang_tokenizer: bool = False AGGREGATED = "agg"
PREFILL = "prefill"
DECODE = "decode"
# multimodal options
multimodal_processor: bool = False
multimodal_encode_worker: bool = False
multimodal_worker: bool = False
# embedding options class DynamoConfig(DynamoRuntimeConfig, DynamoSGLangConfig):
embedding_worker: bool = False """Combined configuration container for SGLang server and Dynamo args."""
# diffusion language model options (derived from server_args.dllm_algorithm) component: str
diffusion_worker: bool = False diffusion_worker: bool = False
# config dump options
dump_config_to: Optional[str] = None
# local indexer option
enable_local_indexer: bool = True
# Whether to enable NATS for KV events (derived from server_args.kv_events_config)
use_kv_events: bool = False use_kv_events: bool = False
# image diffusion options def validate(self) -> None:
image_diffusion_worker: bool = False DynamoRuntimeConfig.validate(self)
image_diffusion_fs_url: Optional[str] = None DynamoSGLangConfig.validate(self)
# video generation options
video_generation_worker: bool = False
video_generation_fs_url: Optional[str] = None
class DisaggregationMode(Enum):
AGGREGATED = "agg"
PREFILL = "prefill"
DECODE = "decode"
class Config: class Config:
"""Combined configuration container for SGLang server and Dynamo args.""" """Combined configuration container for SGLang server and Dynamo args."""
def __init__(self, server_args: ServerArgs, dynamo_args: DynamoArgs) -> None: def __init__(self, server_args: ServerArgs, dynamo_args: DynamoConfig) -> None:
self.server_args = server_args self.server_args = server_args
self.dynamo_args = dynamo_args self.dynamo_args = dynamo_args
self.serving_mode = self._set_serving_strategy() self.serving_mode = self._set_serving_strategy()
...@@ -248,68 +89,77 @@ def _validate_parser_flags( ...@@ -248,68 +89,77 @@ def _validate_parser_flags(
sys.exit(1) sys.exit(1)
def _extract_config_section( def _has_cli_flag(args: list[str], flag: str) -> bool:
args: List[str], config_path: str, config_key: str """Return True when a CLI flag is present in '--flag val' or '--flag=val' form."""
) -> tuple[List[str], str]: return any(arg == flag or arg.startswith(f"{flag}=") for arg in args)
"""
Extract a section from nested YAML and create temp flat file.
Args:
args: CLI arguments list
config_path: Path to the YAML config file
config_key: Key to extract from nested YAML
Returns: def _remove_cli_flag_and_value(args: list[str], flag: str) -> list[str]:
tuple: (modified args with temp file path, temp file path for cleanup) """Remove a flag from CLI args, supporting '--flag val' and '--flag=val' forms."""
updated: list[str] = []
skip_next = False
for arg in args:
if skip_next:
skip_next = False
continue
if arg == flag:
skip_next = True
continue
if arg.startswith(f"{flag}="):
continue
updated.append(arg)
return updated
Raises:
ValueError: If config file not found, key missing, or invalid format def _load_disagg_config_section(config_path: str, config_key: str) -> dict[str, Any]:
"""
Load a disaggregated config section from YAML.
The selected section must exist and be a dictionary.
""" """
logging.info(f"Extracting config section '{config_key}' from {config_path}") logging.info(f"Loading disagg config section '{config_key}' from {config_path}")
path = Path(config_path) path = Path(config_path)
if not path.exists(): if not path.exists():
raise ValueError(f"Config file not found: {config_path}") raise ValueError(f"Disagg config file not found: {config_path}")
with open(config_path, "r") as f: with open(config_path, "r", encoding="utf-8") as f:
config_data = yaml.safe_load(f) config_data = yaml.safe_load(f)
if not isinstance(config_data, dict): if not isinstance(config_data, dict):
raise ValueError( raise ValueError(
f"Config file must contain a dictionary, got {type(config_data).__name__}" f"Disagg config file must contain a dictionary, got {type(config_data).__name__}"
) )
available_keys = list(config_data.keys()) available_keys = list(config_data.keys())
logging.info(f"Available config keys in {config_path}: {available_keys}")
if config_key not in config_data: if config_key not in config_data:
raise ValueError( raise ValueError(
f"Config key '{config_key}' not found in {config_path}. " f"Disagg config key '{config_key}' not found in {config_path}. "
f"Available keys: {available_keys}" f"Available keys: {available_keys}"
) )
section_data = config_data[config_key] section_data = config_data[config_key]
if not isinstance(section_data, dict): if not isinstance(section_data, dict):
raise ValueError( raise ValueError(
f"Config section '{config_key}' must be a dictionary, got {type(section_data).__name__}" f"Disagg config section '{config_key}' must be a dictionary, got {type(section_data).__name__}"
) )
return section_data
def _dump_disagg_config_section(disagg_config: dict[str, Any]) -> str:
"""Dump the disaggregation configuration section to a YAML file."""
temp_fd, temp_path = tempfile.mkstemp(suffix=".yaml", prefix="dynamo_config_") temp_fd, temp_path = tempfile.mkstemp(suffix=".yaml", prefix="dynamo_config_")
try: try:
with os.fdopen(temp_fd, "w") as f: with os.fdopen(temp_fd, "w") as f:
yaml.dump(section_data, f) yaml.dump(disagg_config, f)
logging.info(f"Successfully wrote config section '{config_key}' to temp file") logging.info("Successfully wrote config section to temp file")
except Exception: except Exception:
os.unlink(temp_path) os.unlink(temp_path)
raise raise
config_index = args.index("--config") return temp_path
args = list(args)
args[config_index + 1] = temp_path
return args, temp_path
async def parse_args(args: list[str]) -> Config: async def parse_args(args: list[str]) -> Config:
...@@ -325,69 +175,57 @@ async def parse_args(args: list[str]) -> Config: ...@@ -325,69 +175,57 @@ async def parse_args(args: list[str]) -> Config:
Raises: Raises:
SystemExit: If arguments are invalid or incompatible. SystemExit: If arguments are invalid or incompatible.
""" """
parser = argparse.ArgumentParser() runtime_argspec = DynamoRuntimeArgGroup()
dynamo_sglang_argspec = DynamoSGLangArgGroup()
parser.add_argument( parser = argparse.ArgumentParser(
"--version", action="version", version=f"Dynamo Backend SGLang {__version__}" description="Dynamo SGLang worker configuration",
formatter_class=argparse.RawTextHelpFormatter,
) )
# Dynamo args runtime_argspec.add_arguments(parser)
for info in DYNAMO_ARGS.values(): dynamo_sglang_argspec.add_arguments(parser)
kwargs = {
"default": info["default"] if "default" in info else None,
"help": info["help"],
}
if "type" in info:
kwargs["type"] = info["type"]
if "choices" in info:
kwargs["choices"] = info["choices"]
if "action" in info:
action = info["action"]
# Handle string "BooleanOptionalAction" for dict-based config
if action == "BooleanOptionalAction":
kwargs["action"] = argparse.BooleanOptionalAction
else:
kwargs["action"] = action
parser.add_argument(*info["flags"], **kwargs)
# Config key argument (for nested configs)
parser.add_argument(
"--config-key",
type=str,
default=None,
help="Key to select from nested config file (e.g., 'prefill', 'decode')",
)
# SGLang args sglang_only_parser = argparse.ArgumentParser(add_help=False)
bootstrap_port = _reserve_disaggregation_bootstrap_port() ServerArgs.add_cli_args(sglang_only_parser)
ServerArgs.add_cli_args(parser)
# Add "gms" to --load-format choices so it passes argparse validation. # Add "gms" to --load-format choices so it passes argparse validation.
# The actual loader class is set in main.py when load_format == "gms". # The actual loader class is set in main.py when load_format == "gms".
for action in parser._actions: for action in sglang_only_parser._actions:
if getattr(action, "dest", None) == "load_format" and action.choices: if getattr(action, "dest", None) == "load_format" and action.choices:
action.choices = list(action.choices) + ["gms"] action.choices = list(action.choices) + ["gms"]
break break
# Handle config file if present # trick to add sglang flags to a specific group without breaking the Dynamo groups.
sg = parser.add_argument_group(
"SGLang Engine Options. Please refer to SGLang documentation for more details."
)
for action in sglang_only_parser._actions:
if not action.option_strings:
continue
sg._group_actions.append(action)
dynamo_args, unknown = parser.parse_known_args(args)
dynamo_config = DynamoConfig.from_cli_args(dynamo_args)
dynamo_config.validate()
# Dealing with SGLang native configs
temp_config_file = None # Track temp file for cleanup temp_config_file = None # Track temp file for cleanup
if "--config" in args: if dynamo_config.disagg_config and dynamo_config.disagg_config_key:
# Check if --config-key is also present section_data = _load_disagg_config_section(
if "--config-key" in args: dynamo_config.disagg_config, dynamo_config.disagg_config_key
key_index = args.index("--config-key") )
config_key = args[key_index + 1]
config_index = args.index("--config")
config_path = args[config_index + 1]
# Extract nested section to temp file
args, temp_config_file = _extract_config_section(
args, config_path, config_key
)
# Remove --config-key from args (not recognized by SGLang) temp_config_file = _dump_disagg_config_section(section_data)
args = args[:key_index] + args[key_index + 2 :]
# Remove any existing --config (both '--config val' and '--config=val' forms)
unknown = _remove_cli_flag_and_value(unknown, "--config")
unknown.append("--config")
unknown.append(temp_config_file)
# Handle SGLang --config file merge if present.
if "--config" in unknown:
# Merge config file arguments with CLI arguments. # Merge config file arguments with CLI arguments.
# ConfigArgumentMerger API changed after SGLang v0.5.7: # ConfigArgumentMerger API changed after SGLang v0.5.7:
# - New API (post-v0.5.7): accepts parser= for proper store_true detection # - New API (post-v0.5.7): accepts parser= for proper store_true detection
...@@ -400,18 +238,18 @@ async def parse_args(args: list[str]) -> Config: ...@@ -400,18 +238,18 @@ async def parse_args(args: list[str]) -> Config:
sig = inspect.signature(ConfigArgumentMerger.__init__) sig = inspect.signature(ConfigArgumentMerger.__init__)
if "parser" in sig.parameters: if "parser" in sig.parameters:
config_merger = ConfigArgumentMerger(parser=parser) config_merger = ConfigArgumentMerger(parser=sglang_only_parser)
else: else:
# Legacy path: extract store_true actions manually # Legacy path: extract store_true actions manually
boolean_actions = [ boolean_actions = [
action.dest action.dest
for action in parser._actions for action in sglang_only_parser._actions
if isinstance(action, argparse._StoreTrueAction) if isinstance(action, argparse._StoreTrueAction)
] ]
config_merger = ConfigArgumentMerger(boolean_actions=boolean_actions) config_merger = ConfigArgumentMerger(boolean_actions=boolean_actions)
args = config_merger.merge_config_with_args(args) unknown = config_merger.merge_config_with_args(unknown)
parsed_args = parser.parse_args(args) parsed_args = sglang_only_parser.parse_args(unknown)
# Clean up temp file if created # Clean up temp file if created
if temp_config_file and os.path.exists(temp_config_file): if temp_config_file and os.path.exists(temp_config_file):
...@@ -420,8 +258,10 @@ async def parse_args(args: list[str]) -> Config: ...@@ -420,8 +258,10 @@ async def parse_args(args: list[str]) -> Config:
except Exception: except Exception:
logging.warning(f"Failed to clean up temp config file: {temp_config_file}") logging.warning(f"Failed to clean up temp config file: {temp_config_file}")
bootstrap_port = _reserve_disaggregation_bootstrap_port()
# Auto-set bootstrap port if not provided # Auto-set bootstrap port if not provided
if not any(arg.startswith("--disaggregation-bootstrap-port") for arg in args): if not any(arg.startswith("--disaggregation-bootstrap-port") for arg in unknown):
args_dict = vars(parsed_args) args_dict = vars(parsed_args)
args_dict["disaggregation_bootstrap_port"] = bootstrap_port args_dict["disaggregation_bootstrap_port"] = bootstrap_port
parsed_args = Namespace(**args_dict) parsed_args = Namespace(**args_dict)
...@@ -429,31 +269,31 @@ async def parse_args(args: list[str]) -> Config: ...@@ -429,31 +269,31 @@ async def parse_args(args: list[str]) -> Config:
# Dynamo argument processing # Dynamo argument processing
# If an endpoint is provided, validate and use it # If an endpoint is provided, validate and use it
# otherwise fall back to default endpoints # otherwise fall back to default endpoints
namespace = os.environ.get("DYN_NAMESPACE", "dynamo") namespace = dynamo_config.namespace
# If --embedding-worker is set, also set SGLang's --is-embedding flag # If --embedding-worker is set, also set SGLang's --is-embedding flag
if parsed_args.embedding_worker: if dynamo_config.embedding_worker:
parsed_args.is_embedding = True parsed_args.is_embedding = True
endpoint = parsed_args.endpoint endpoint = dynamo_config.endpoint
if endpoint is None: if endpoint is None:
if parsed_args.embedding_worker: if dynamo_config.embedding_worker:
endpoint = f"dyn://{namespace}.backend.generate" endpoint = f"dyn://{namespace}.backend.generate"
elif getattr(parsed_args, "image_diffusion_worker", False): elif dynamo_config.image_diffusion_worker:
endpoint = f"dyn://{namespace}.backend.generate" endpoint = f"dyn://{namespace}.backend.generate"
elif getattr(parsed_args, "video_generation_worker", False): elif dynamo_config.video_generation_worker:
endpoint = f"dyn://{namespace}.backend.generate" endpoint = f"dyn://{namespace}.backend.generate"
elif ( elif (
hasattr(parsed_args, "disaggregation_mode") hasattr(parsed_args, "disaggregation_mode")
and parsed_args.disaggregation_mode == "prefill" and parsed_args.disaggregation_mode == "prefill"
): ):
endpoint = f"dyn://{namespace}.prefill.generate" endpoint = f"dyn://{namespace}.prefill.generate"
elif parsed_args.multimodal_processor: elif dynamo_config.multimodal_processor:
endpoint = f"dyn://{namespace}.processor.generate" endpoint = f"dyn://{namespace}.processor.generate"
elif parsed_args.multimodal_encode_worker: elif dynamo_config.multimodal_encode_worker:
endpoint = f"dyn://{namespace}.encoder.generate" endpoint = f"dyn://{namespace}.encoder.generate"
elif ( elif (
parsed_args.multimodal_worker dynamo_config.multimodal_worker
and parsed_args.disaggregation_mode == "prefill" and parsed_args.disaggregation_mode == "prefill"
): ):
endpoint = f"dyn://{namespace}.prefill.generate" endpoint = f"dyn://{namespace}.prefill.generate"
...@@ -469,18 +309,16 @@ async def parse_args(args: list[str]) -> Config: ...@@ -469,18 +309,16 @@ async def parse_args(args: list[str]) -> Config:
# --dyn-{name} choices are validated by argparse; --{name} by SGLang. # --dyn-{name} choices are validated by argparse; --{name} by SGLang.
_validate_parser_flags( _validate_parser_flags(
parsed_args.tool_call_parser, parsed_args.tool_call_parser,
parsed_args.dyn_tool_call_parser, dynamo_config.dyn_tool_call_parser,
"tool-call-parser", "tool-call-parser",
) )
_validate_parser_flags( _validate_parser_flags(
parsed_args.reasoning_parser, parsed_args.reasoning_parser,
parsed_args.dyn_reasoning_parser, dynamo_config.dyn_reasoning_parser,
"reasoning-parser", "reasoning-parser",
) )
tool_call_parser = parsed_args.dyn_tool_call_parser
reasoning_parser = parsed_args.dyn_reasoning_parser
if parsed_args.custom_jinja_template and parsed_args.use_sglang_tokenizer: if dynamo_config.custom_jinja_template and dynamo_config.use_sglang_tokenizer:
logging.error( logging.error(
"Cannot use --custom-jinja-template and --use-sglang-tokenizer together. " "Cannot use --custom-jinja-template and --use-sglang-tokenizer together. "
"--custom-jinja-template requires Dynamo's preprocessor to apply the template, " "--custom-jinja-template requires Dynamo's preprocessor to apply the template, "
...@@ -492,9 +330,9 @@ async def parse_args(args: list[str]) -> Config: ...@@ -492,9 +330,9 @@ async def parse_args(args: list[str]) -> Config:
# Replaces any environment variables or home dir (~) to get absolute path # Replaces any environment variables or home dir (~) to get absolute path
expanded_template_path = None expanded_template_path = None
if parsed_args.custom_jinja_template: if dynamo_config.custom_jinja_template:
expanded_template_path = os.path.expandvars( expanded_template_path = os.path.expandvars(
os.path.expanduser(parsed_args.custom_jinja_template) os.path.expanduser(dynamo_config.custom_jinja_template)
) )
# Validate custom Jinja template file exists # Validate custom Jinja template file exists
if not os.path.isfile(expanded_template_path): if not os.path.isfile(expanded_template_path):
...@@ -522,8 +360,8 @@ async def parse_args(args: list[str]) -> Config: ...@@ -522,8 +360,8 @@ async def parse_args(args: list[str]) -> Config:
# For diffusion/video workers, create a minimal dummy ServerArgs since diffusion # For diffusion/video workers, create a minimal dummy ServerArgs since diffusion
# doesn't use transformer models or sglang Engine - it uses DiffGenerator directly # doesn't use transformer models or sglang Engine - it uses DiffGenerator directly
image_diffusion_worker = getattr(parsed_args, "image_diffusion_worker", False) image_diffusion_worker = dynamo_config.image_diffusion_worker
video_generation_worker = getattr(parsed_args, "video_generation_worker", False) video_generation_worker = dynamo_config.video_generation_worker
if image_diffusion_worker or video_generation_worker: if image_diffusion_worker or video_generation_worker:
worker_type = ( worker_type = (
...@@ -562,7 +400,7 @@ async def parse_args(args: list[str]) -> Config: ...@@ -562,7 +400,7 @@ async def parse_args(args: list[str]) -> Config:
# Force stream_output=True for optimal streaming performance. # Force stream_output=True for optimal streaming performance.
server_args.stream_output = True server_args.stream_output = True
if parsed_args.use_sglang_tokenizer: if dynamo_config.use_sglang_tokenizer:
logging.info( logging.info(
"Using SGLang's built in tokenizer. Setting skip_tokenizer_init to False" "Using SGLang's built in tokenizer. Setting skip_tokenizer_init to False"
) )
...@@ -591,34 +429,16 @@ async def parse_args(args: list[str]) -> Config: ...@@ -591,34 +429,16 @@ async def parse_args(args: list[str]) -> Config:
# Auto-detect diffusion worker mode if dllm_algorithm # Auto-detect diffusion worker mode if dllm_algorithm
diffusion_worker = server_args.dllm_algorithm is not None diffusion_worker = server_args.dllm_algorithm is not None
dynamo_args = DynamoArgs( dynamo_config.namespace = parsed_namespace
namespace=parsed_namespace, dynamo_config.component = parsed_component_name
component=parsed_component_name, dynamo_config.endpoint = parsed_endpoint_name
endpoint=parsed_endpoint_name, dynamo_config.custom_jinja_template = expanded_template_path
discovery_backend=parsed_args.discovery_backend, dynamo_config.diffusion_worker = diffusion_worker
request_plane=parsed_args.request_plane, dynamo_config.use_kv_events = use_kv_events
event_plane=parsed_args.event_plane,
tool_call_parser=tool_call_parser, logging.debug(f"Dynamo configs: {dynamo_config}")
reasoning_parser=reasoning_parser,
custom_jinja_template=expanded_template_path,
dyn_endpoint_types=parsed_args.dyn_endpoint_types,
use_sglang_tokenizer=parsed_args.use_sglang_tokenizer,
multimodal_processor=parsed_args.multimodal_processor,
multimodal_encode_worker=parsed_args.multimodal_encode_worker,
multimodal_worker=parsed_args.multimodal_worker,
embedding_worker=parsed_args.embedding_worker,
diffusion_worker=diffusion_worker,
image_diffusion_worker=getattr(parsed_args, "image_diffusion_worker", False),
image_diffusion_fs_url=getattr(parsed_args, "image_diffusion_fs_url", None),
video_generation_worker=getattr(parsed_args, "video_generation_worker", False),
video_generation_fs_url=getattr(parsed_args, "video_generation_fs_url", None),
dump_config_to=parsed_args.dump_config_to,
enable_local_indexer=not parsed_args.durable_kv_events,
use_kv_events=use_kv_events,
)
logging.debug(f"Dynamo args: {dynamo_args}")
return Config(server_args, dynamo_args) return Config(server_args, dynamo_config)
@contextlib.contextmanager @contextlib.contextmanager
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Dynamo SGLang wrapper configuration ArgGroup."""
from typing import Optional
from dynamo.common.configuration.arg_group import ArgGroup
from dynamo.common.configuration.config_base import ConfigBase
from dynamo.common.configuration.utils import add_argument, add_negatable_bool_argument
from . import __version__
class DynamoSGLangArgGroup(ArgGroup):
    """Argument group for the Dynamo-side SGLang wrapper options.

    These are the flags owned by the Dynamo wrapper itself; native SGLang
    engine arguments are not registered here.
    """

    name = "dynamo-sglang"

    def add_arguments(self, parser) -> None:
        """Register ``--version`` and the "Dynamo SGLang Options" group on *parser*.

        Every option is forwarded to ``add_argument`` or
        ``add_negatable_bool_argument`` together with its environment variable
        name, default and help text.
        NOTE(review): the env var is presumably consulted as a fallback when
        the flag is absent -- confirm against dynamo.common.configuration.utils.
        """
        version_banner = f"Dynamo Backend SGLang {__version__}"
        parser.add_argument("--version", action="version", version=version_banner)

        group = parser.add_argument_group("Dynamo SGLang Options")

        # One row per option: (registration helper, flag, env var, default, help).
        # Rows are kept in registration order, which is also the order in which
        # the options are added to the group.
        option_table = (
            (
                add_negatable_bool_argument,
                "--use-sglang-tokenizer",
                "DYN_SGL_USE_TOKENIZER",
                False,
                "Use SGLang's tokenizer for pre and post processing. This bypasses Dynamo's preprocessor and only v1/chat/completions will be available through the Dynamo frontend. Cannot be used with --custom-jinja-template.",
            ),
            (
                add_negatable_bool_argument,
                "--multimodal-processor",
                "DYN_SGL_MULTIMODAL_PROCESSOR",
                False,
                "Run as multimodal processor component for handling multimodal requests.",
            ),
            (
                add_negatable_bool_argument,
                "--multimodal-encode-worker",
                "DYN_SGL_MULTIMODAL_ENCODE_WORKER",
                False,
                "Run as multimodal encode worker component for processing images/videos.",
            ),
            (
                add_negatable_bool_argument,
                "--multimodal-worker",
                "DYN_SGL_MULTIMODAL_WORKER",
                False,
                "Run as multimodal worker component for LLM inference with multimodal data.",
            ),
            (
                add_negatable_bool_argument,
                "--embedding-worker",
                "DYN_SGL_EMBEDDING_WORKER",
                False,
                "Run as embedding worker component (Dynamo flag, also sets SGLang's --is-embedding).",
            ),
            (
                add_negatable_bool_argument,
                "--image-diffusion-worker",
                "DYN_SGL_IMAGE_DIFFUSION_WORKER",
                False,
                "Run as image diffusion worker for image generation.",
            ),
            (
                add_argument,
                "--image-diffusion-fs-url",
                "DYN_SGL_IMAGE_DIFFUSION_FS_URL",
                None,
                "Filesystem URL for storing generated images using fsspec (e.g., s3://bucket/path, gs://bucket/path, file:///local/path). Supports any fsspec-compatible filesystem.",
            ),
            (
                add_argument,
                "--image-diffusion-base-url",
                "DYN_SGL_IMAGE_DIFFUSION_BASE_URL",
                "http://localhost:8008/",
                "Base URL for rewriting image URLs in responses (e.g., http://localhost:8008/). When set, generated image URLs will use this base instead of filesystem URLs.",
            ),
            (
                add_argument,
                "--disagg-config",
                "DYN_SGL_DISAGG_CONFIG",
                None,
                "Disaggregation configuration file in YAML format.",
            ),
            (
                add_argument,
                "--disagg-config-key",
                "DYN_SGL_DISAGG_CONFIG_KEY",
                None,
                "Key to select from nested disaggregation configuration file (e.g., 'prefill', 'decode').",
            ),
            (
                add_negatable_bool_argument,
                "--video-generation-worker",
                "DYN_SGL_VIDEO_GENERATION_WORKER",
                False,
                "Run as video generation worker for video generation (T2V/I2V).",
            ),
            (
                add_argument,
                "--video-generation-fs-url",
                "DYN_SGL_VIDEO_GENERATION_FS_URL",
                None,
                "Filesystem URL for storing generated videos using fsspec (e.g., s3://bucket/path, gs://bucket/path, file:///local/path). Supports any fsspec-compatible filesystem.",
            ),
        )

        for register, flag, env_name, default_value, help_text in option_table:
            register(
                group,
                flag_name=flag,
                env_var=env_name,
                default=default_value,
                help=help_text,
            )
class DynamoSGLangConfig(ConfigBase):
    """Configuration for Dynamo SGLang wrapper (SGLang-specific only).

    Each attribute mirrors one of the Dynamo SGLang command-line flags
    (for example ``--embedding-worker`` maps to ``embedding_worker``).
    """

    use_sglang_tokenizer: bool
    multimodal_processor: bool
    multimodal_encode_worker: bool
    # Run as multimodal worker component for LLM inference with multimodal data.
    multimodal_worker: bool
    # Run as embedding worker component.
    embedding_worker: bool
    # Run as image diffusion worker for image generation.
    image_diffusion_worker: bool
    # fsspec-compatible filesystem URL where generated images are stored.
    image_diffusion_fs_url: Optional[str] = None
    # Base URL used when rewriting image URLs in responses.
    image_diffusion_base_url: Optional[str] = None
    # Path to a YAML disaggregation configuration file.
    disagg_config: Optional[str] = None
    # Key selecting one section of the disaggregation configuration file.
    disagg_config_key: Optional[str] = None
    # Run as video generation worker (T2V/I2V).
    video_generation_worker: bool
    # fsspec-compatible filesystem URL where generated videos are stored.
    video_generation_fs_url: Optional[str] = None

    def validate(self) -> None:
        """Check that the disaggregation options are supplied as a pair.

        Raises:
            ValueError: If exactly one of ``disagg_config`` and
                ``disagg_config_key`` is set.
        """
        config_missing = self.disagg_config is None
        key_missing = self.disagg_config_key is None
        # The two options are only meaningful together: either both are
        # given or both are omitted.
        if config_missing != key_missing:
            raise ValueError(
                "Both 'disagg_config' and 'disagg_config_key' must be provided together."
            )
...@@ -285,13 +285,8 @@ async def init( ...@@ -285,13 +285,8 @@ async def init(
engine, use_text_input=dynamo_args.use_sglang_tokenizer engine, use_text_input=dynamo_args.use_sglang_tokenizer
).to_dict() ).to_dict()
logging.info( logging.info(f"Registering model with endpoint types: {dynamo_args.endpoint_types}")
f"Registering model with endpoint types: {dynamo_args.dyn_endpoint_types}" if dynamo_args.custom_jinja_template and "chat" not in dynamo_args.endpoint_types:
)
if (
dynamo_args.custom_jinja_template
and "chat" not in dynamo_args.dyn_endpoint_types
):
logging.warning( logging.warning(
"Custom Jinja template provided (--custom-jinja-template) but 'chat' not in --dyn-endpoint-types. " "Custom Jinja template provided (--custom-jinja-template) but 'chat' not in --dyn-endpoint-types. "
"The chat template will be loaded but the /v1/chat/completions endpoint will not be available." "The chat template will be loaded but the /v1/chat/completions endpoint will not be available."
...@@ -312,7 +307,7 @@ async def init( ...@@ -312,7 +307,7 @@ async def init(
generate_endpoint, generate_endpoint,
server_args, server_args,
dynamo_args, dynamo_args,
output_type=parse_endpoint_types(dynamo_args.dyn_endpoint_types), output_type=parse_endpoint_types(dynamo_args.endpoint_types),
readiness_gate=ready_event, readiness_gate=ready_event,
), ),
) )
...@@ -462,7 +457,7 @@ async def init_diffusion( ...@@ -462,7 +457,7 @@ async def init_diffusion(
).to_dict() ).to_dict()
logging.info( logging.info(
f"Registering diffusion model with endpoint types: {dynamo_args.dyn_endpoint_types}" f"Registering diffusion model with endpoint types: {dynamo_args.endpoint_types}"
) )
try: try:
...@@ -479,7 +474,7 @@ async def init_diffusion( ...@@ -479,7 +474,7 @@ async def init_diffusion(
generate_endpoint, generate_endpoint,
server_args, server_args,
dynamo_args, dynamo_args,
output_type=parse_endpoint_types(dynamo_args.dyn_endpoint_types), output_type=parse_endpoint_types(dynamo_args.endpoint_types),
readiness_gate=ready_event, readiness_gate=ready_event,
), ),
) )
......
...@@ -12,14 +12,14 @@ from sglang.srt.utils import get_local_ip_auto ...@@ -12,14 +12,14 @@ from sglang.srt.utils import get_local_ip_auto
from dynamo._core import Endpoint from dynamo._core import Endpoint
from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_model from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_model
from dynamo.sglang.args import DynamoArgs from dynamo.sglang.args import DynamoConfig
async def _register_model_with_runtime_config( async def _register_model_with_runtime_config(
engine: sgl.Engine, engine: sgl.Engine,
endpoint: Endpoint, endpoint: Endpoint,
server_args: ServerArgs, server_args: ServerArgs,
dynamo_args: DynamoArgs, dynamo_args: DynamoConfig,
input_type: Optional[ModelInput] = ModelInput.Tokens, input_type: Optional[ModelInput] = ModelInput.Tokens,
output_type: Optional[ModelType] = ModelType.Chat | ModelType.Completions, output_type: Optional[ModelType] = ModelType.Chat | ModelType.Completions,
) -> bool: ) -> bool:
...@@ -144,7 +144,7 @@ def _get_bootstrap_info_for_config( ...@@ -144,7 +144,7 @@ def _get_bootstrap_info_for_config(
async def _get_runtime_config( async def _get_runtime_config(
engine: sgl.Engine, server_args: ServerArgs, dynamo_args: DynamoArgs engine: sgl.Engine, server_args: ServerArgs, dynamo_args: DynamoConfig
) -> Optional[ModelRuntimeConfig]: ) -> Optional[ModelRuntimeConfig]:
"""Extract runtime configuration from SGLang engine and args. """Extract runtime configuration from SGLang engine and args.
...@@ -158,8 +158,8 @@ async def _get_runtime_config( ...@@ -158,8 +158,8 @@ async def _get_runtime_config(
""" """
runtime_config = ModelRuntimeConfig() runtime_config = ModelRuntimeConfig()
# set reasoning parser and tool call parser # set reasoning parser and tool call parser
runtime_config.reasoning_parser = dynamo_args.reasoning_parser runtime_config.reasoning_parser = dynamo_args.dyn_reasoning_parser
runtime_config.tool_call_parser = dynamo_args.tool_call_parser runtime_config.tool_call_parser = dynamo_args.dyn_tool_call_parser
# Decode workers don't create the WorkerKvQuery endpoint, so don't advertise local indexer # Decode workers don't create the WorkerKvQuery endpoint, so don't advertise local indexer
is_decode_worker = server_args.disaggregation_mode == "decode" is_decode_worker = server_args.disaggregation_mode == "decode"
runtime_config.enable_local_indexer = ( runtime_config.enable_local_indexer = (
...@@ -235,7 +235,7 @@ async def register_model_with_readiness_gate( ...@@ -235,7 +235,7 @@ async def register_model_with_readiness_gate(
engine: sgl.Engine, engine: sgl.Engine,
generate_endpoint: Endpoint, generate_endpoint: Endpoint,
server_args: ServerArgs, server_args: ServerArgs,
dynamo_args: DynamoArgs, dynamo_args: DynamoConfig,
input_type: Optional[ModelInput] = ModelInput.Tokens, input_type: Optional[ModelInput] = ModelInput.Tokens,
output_type: Optional[ModelType] = ModelType.Chat | ModelType.Completions, output_type: Optional[ModelType] = ModelType.Chat | ModelType.Completions,
readiness_gate: Optional[asyncio.Event] = None, readiness_gate: Optional[asyncio.Event] = None,
......
...@@ -8,6 +8,7 @@ import sys ...@@ -8,6 +8,7 @@ import sys
from pathlib import Path from pathlib import Path
import pytest import pytest
import yaml
from dynamo.sglang.args import parse_args from dynamo.sglang.args import parse_args
from dynamo.sglang.tests.conftest import make_cli_args_fixture from dynamo.sglang.tests.conftest import make_cli_args_fixture
...@@ -92,7 +93,7 @@ async def test_tool_call_parser_valid_with_dynamo_tokenizer(mock_sglang_cli): ...@@ -92,7 +93,7 @@ async def test_tool_call_parser_valid_with_dynamo_tokenizer(mock_sglang_cli):
config = await parse_args(sys.argv[1:]) config = await parse_args(sys.argv[1:])
assert config.dynamo_args.tool_call_parser == "hermes" assert config.dynamo_args.dyn_tool_call_parser == "hermes"
@pytest.mark.asyncio @pytest.mark.asyncio
...@@ -120,3 +121,147 @@ async def test_tool_call_parser_both_flags_error(mock_sglang_cli): ...@@ -120,3 +121,147 @@ async def test_tool_call_parser_both_flags_error(mock_sglang_cli):
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
await parse_args(sys.argv[1:]) await parse_args(sys.argv[1:])
@pytest.mark.asyncio
async def test_namespace_flag_drives_default_endpoint_namespace(mock_sglang_cli):
    """The ``--namespace`` CLI value must be reflected on the parsed Dynamo args."""
    expected_namespace = "custom-ns"
    mock_sglang_cli("--model", "Qwen/Qwen3-0.6B", "--namespace", expected_namespace)

    parsed = await parse_args(sys.argv[1:])

    assert parsed.dynamo_args.namespace == expected_namespace
@pytest.mark.asyncio
async def test_obsolete_dyn_endpoint_types_flag_is_supported(mock_sglang_cli):
    """The legacy ``--dyn-endpoint-types`` flag must still populate ``endpoint_types``."""
    requested_types = "completions"
    mock_sglang_cli(
        "--model", "Qwen/Qwen3-0.6B", "--dyn-endpoint-types", requested_types
    )

    parsed = await parse_args(sys.argv[1:])

    assert parsed.dynamo_args.endpoint_types == requested_types
@pytest.mark.asyncio
async def test_disagg_config_requires_disagg_config_key(mock_sglang_cli):
    """Supplying ``--disagg-config`` without ``--disagg-config-key`` must fail."""
    cli_tokens = [
        "--model",
        "Qwen/Qwen3-0.6B",
        "--disagg-config",
        "/tmp/nonexistent.yaml",
    ]
    mock_sglang_cli(*cli_tokens)

    expected_message = "disagg_config.*disagg_config_key.*together"
    with pytest.raises(ValueError, match=expected_message):
        await parse_args(sys.argv[1:])
@pytest.mark.asyncio
async def test_disagg_config_key_requires_disagg_config(mock_sglang_cli):
    """Supplying ``--disagg-config-key`` without ``--disagg-config`` must fail."""
    cli_tokens = ["--model", "Qwen/Qwen3-0.6B", "--disagg-config-key", "prefill"]
    mock_sglang_cli(*cli_tokens)

    expected_message = "disagg_config.*disagg_config_key.*together"
    with pytest.raises(ValueError, match=expected_message):
        await parse_args(sys.argv[1:])
@pytest.mark.asyncio
async def test_disagg_config_key_not_found_error(tmp_path, mock_sglang_cli):
    """Selecting a section key that is absent from the YAML file raises ValueError."""
    # The file only defines a "prefill" section; the CLI asks for "decode".
    yaml_body = yaml.safe_dump({"prefill": {"tensor_parallel_size": 1}})
    config_path = tmp_path / "disagg.yaml"
    config_path.write_text(yaml_body, encoding="utf-8")

    cli_tokens = [
        "--model",
        "Qwen/Qwen3-0.6B",
        "--disagg-config",
        str(config_path),
        "--disagg-config-key",
        "decode",
    ]
    mock_sglang_cli(*cli_tokens)

    with pytest.raises(ValueError, match="Disagg config key 'decode' not found"):
        await parse_args(sys.argv[1:])
@pytest.mark.asyncio
async def test_disagg_config_section_must_be_dict(tmp_path, mock_sglang_cli):
    """A selected disagg section that is not a mapping raises ValueError."""
    # "prefill" maps to a plain string here instead of a dictionary.
    yaml_body = yaml.safe_dump({"prefill": "not-a-dict"})
    config_path = tmp_path / "disagg.yaml"
    config_path.write_text(yaml_body, encoding="utf-8")

    cli_tokens = [
        "--model",
        "Qwen/Qwen3-0.6B",
        "--disagg-config",
        str(config_path),
        "--disagg-config-key",
        "prefill",
    ]
    mock_sglang_cli(*cli_tokens)

    expected_message = "Disagg config section 'prefill' must be a dictionary"
    with pytest.raises(ValueError, match=expected_message):
        await parse_args(sys.argv[1:])
@pytest.mark.asyncio
async def test_disagg_config_preserves_bootstrap_port(tmp_path, mock_sglang_cli):
    """A bootstrap port set in the disagg section must survive argument parsing."""
    expected_port = 42345
    yaml_body = yaml.safe_dump(
        {"prefill": {"disaggregation-bootstrap-port": expected_port}}
    )
    config_path = tmp_path / "disagg.yaml"
    config_path.write_text(yaml_body, encoding="utf-8")

    cli_tokens = [
        "--model",
        "Qwen/Qwen3-0.6B",
        "--disagg-config",
        str(config_path),
        "--disagg-config-key",
        "prefill",
    ]
    mock_sglang_cli(*cli_tokens)

    parsed = await parse_args(sys.argv[1:])

    assert parsed.server_args.disaggregation_bootstrap_port == expected_port
@pytest.mark.asyncio
async def test_disagg_config_rejects_dynamo_keys(tmp_path, mock_sglang_cli, capfd):
    """A Dynamo-only key inside the disagg section is rejected as unrecognized."""
    # "store-kv" is given in the selected section; parsing is expected to exit.
    yaml_body = yaml.safe_dump({"prefill": {"store-kv": "mem"}})
    config_path = tmp_path / "disagg.yaml"
    config_path.write_text(yaml_body, encoding="utf-8")

    cli_tokens = [
        "--model",
        "Qwen/Qwen3-0.6B",
        "--disagg-config",
        str(config_path),
        "--disagg-config-key",
        "prefill",
    ]
    mock_sglang_cli(*cli_tokens)

    with pytest.raises(SystemExit):
        await parse_args(sys.argv[1:])

    # The rejection message is written to stderr before the exit.
    _, captured_err = capfd.readouterr()
    assert "unrecognized arguments: --store-kv mem" in captured_err
...@@ -33,7 +33,6 @@ VALID_CONNECTORS = {"nixl", "lmcache", "kvbm", "null", "none"} ...@@ -33,7 +33,6 @@ VALID_CONNECTORS = {"nixl", "lmcache", "kvbm", "null", "none"}
class Config(DynamoRuntimeConfig, DynamoVllmConfig): class Config(DynamoRuntimeConfig, DynamoVllmConfig):
component: str component: str
endpoint: str
is_prefill_worker: bool is_prefill_worker: bool
is_decode_worker: bool is_decode_worker: bool
custom_jinja_template: Optional[str] = None custom_jinja_template: Optional[str] = None
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment