Unverified Commit 2e1f5076 authored by William Arnold's avatar William Arnold Committed by GitHub
Browse files

feat: add flag for dumping dynamo engine config and environment (#3286)


Signed-off-by: William Arnold <7565007+Aphoh@users.noreply.github.com>
parent 04aafa9a
......@@ -100,4 +100,7 @@ TensorRT-LLM
/CLAUDE.md.bak
# Benchmarks
benchmarks/results
\ No newline at end of file
benchmarks/results
# Direnv
.envrc
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Dynamo Common Module
This module contains shared utilities and components used across multiple
Dynamo backends and components.
Main submodules:
- config_dump: Configuration dumping and system diagnostics utilities
"""
from dynamo.common import config_dump
from dynamo.common._version import __version__
__all__ = ["__version__", "config_dump"]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Configuration Dumping Utilities
This module provides utilities for dumping configuration and system information
for debugging and diagnostics purposes.
"""
from dynamo.common.config_dump.config_dumper import (
add_config_dump_args,
dump_config,
get_config_dump,
register_encoder,
)
from dynamo.common.config_dump.environment import get_environment_vars
from dynamo.common.config_dump.system_info import (
get_gpu_info,
get_runtime_info,
get_system_info,
)
__all__ = [
"add_config_dump_args",
"dump_config",
"get_config_dump",
"get_environment_vars",
"get_gpu_info",
"get_runtime_info",
"get_system_info",
"register_encoder",
]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import dataclasses
import functools
import json
import logging
import pathlib
from enum import Enum
from typing import Any, Dict, Optional
from dynamo.common._version import __version__
from .environment import get_environment_vars
from .system_info import (
get_gpu_info,
get_package_info,
get_runtime_info,
get_system_info,
)
logger = logging.getLogger(__name__)
def _get_sglang_version() -> Optional[str]:
    """Report the installed SGLang version, if any.

    Returns:
        The value of ``sglang.__version__``, or None when SGLang cannot be
        imported or does not expose a version attribute.
    """
    version: Optional[str] = None
    try:
        import sglang

        version = sglang.__version__
    except ImportError:
        # Absence of the package is only reported at debug level.
        logger.debug("SGLang not available")
    except AttributeError:
        logger.warning("SGLang installed but version not available")
    return version
def _get_trtllm_version() -> Optional[str]:
    """Report the installed TensorRT-LLM version, if any.

    Returns:
        The value of ``tensorrt_llm.__version__``, or None when TensorRT-LLM
        cannot be imported or does not expose a version attribute.
    """
    version: Optional[str] = None
    try:
        import tensorrt_llm as trtllm

        version = trtllm.__version__
    except ImportError:
        # Absence of the package is only reported at debug level.
        logger.debug("TensorRT-LLM not available")
    except AttributeError:
        logger.warning("TensorRT-LLM installed but version not available")
    return version
def _get_vllm_version() -> Optional[str]:
    """Report the installed vLLM version, if any.

    Returns:
        The value of ``vllm.__version__``, or None when vLLM cannot be
        imported or does not expose a version attribute.
    """
    version: Optional[str] = None
    try:
        import vllm as vllm_pkg

        version = vllm_pkg.__version__
    except ImportError:
        # Absence of the package is only reported at debug level.
        logger.debug("vLLM not available")
    except AttributeError:
        logger.warning("vLLM installed but version not available")
    return version
def dump_config(dump_config_to: Optional[str], config: Any) -> None:
    """
    Write the config dump to a file, or log it when no path is given.

    The dump text is produced by get_config_dump(config). With no destination
    path it is logged at INFO level behind a "CONFIG_DUMP: " prefix. With a
    destination path, missing parent directories are created and the dump is
    written as UTF-8 text.

    Args:
        dump_config_to: Path of the file to write. None or an empty string
            sends the dump to the log instead.
        config: The configuration object to include in the dump.

    Note:
        Failures while writing the file are logged with their traceback and
        the dump is then logged at INFO level, so the information is not lost
        and the caller is not interrupted.
    """
    payload = get_config_dump(config)

    if not dump_config_to:
        logger.info(f"CONFIG_DUMP: {payload}")
        return

    try:
        target = pathlib.Path(dump_config_to)
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target.resolve(), "w", encoding="utf-8") as out_file:
            out_file.write(payload)
        logger.info(f"Dumped config to {target.resolve()}")
    except OSError:
        # Filesystem-level failure (permissions, missing device, ...).
        logger.exception(f"Failed to dump config to {dump_config_to}")
        logger.info(f"CONFIG_DUMP: {payload}")
    except Exception:
        # Anything else, e.g. a path value pathlib cannot interpret.
        logger.exception("Unexpected error dumping config")
        logger.info(f"CONFIG_DUMP: {payload}")
def get_config_dump(config: Any, extra_info: Optional[Dict[str, Any]] = None) -> str:
    """
    Build a JSON document describing a backend instance and its environment.

    The document combines system, runtime, GPU, environment-variable and
    installed-package information with the given config, the Dynamo version,
    and the version of each backend (SGLang, TensorRT-LLM, vLLM) that reports
    one.

    Args:
        config: The backend configuration; it is encoded with
            canonical_json_encoder.
        extra_info: Optional extra top-level entries. They are merged last, so
            they replace collected entries that share a key.

    Returns:
        The JSON text produced by canonical_json_encoder.

    Note:
        If collecting or encoding fails, the result is a smaller JSON document
        holding an "error" message and the system information.
    """
    try:
        report: Dict[str, Any] = {}
        report["system_info"] = get_system_info()
        report["environment"] = get_environment_vars()
        report["config"] = config
        report["runtime_info"] = get_runtime_info()
        report["dynamo_version"] = __version__
        report["gpu_info"] = get_gpu_info()
        report["installed_packages"] = get_package_info()

        # Backend versions are included only when the backend reports one.
        backend_probes = (
            ("sglang_version", _get_sglang_version),
            ("trtllm_version", _get_trtllm_version),
            ("vllm_version", _get_vllm_version),
        )
        for key, probe in backend_probes:
            backend_version = probe()
            if backend_version:
                report[key] = backend_version

        # Caller-supplied entries go last so they win on key collisions.
        if extra_info:
            report.update(extra_info)

        return canonical_json_encoder.encode(report)
    except Exception as e:
        logger.error(f"Error collecting config dump: {e}")
        # Fall back to a minimal document that still identifies the machine.
        fallback = {
            "error": f"Failed to collect config dump: {str(e)}",
            "system_info": get_system_info(),
        }
        return canonical_json_encoder.encode(fallback)
def add_config_dump_args(parser: argparse.ArgumentParser):
    """
    Register the ``--dump-config-to`` option on an argument parser.

    The option takes a string and defaults to None; the parsed value is
    available as ``dump_config_to`` on the resulting namespace.

    Args:
        parser: The parser that receives the option.
    """
    option_settings = {
        "type": str,
        "default": None,
        "help": "Dump config to the specified file path. If not specified, the config will be dumped to stdout at INFO level.",
    }
    parser.add_argument("--dump-config-to", **option_settings)
@functools.singledispatch
def _preprocess_for_encode(obj: object) -> object:
    """
    Convert an object the JSON encoder cannot handle into something it can.

    This is the fallback of a single-dispatch function; type-specific
    conversions are added with the @register_encoder decorator.

    Fallback rules, in order:
    - a dataclass instance becomes the dict from dataclasses.asdict();
    - any other object with a ``__dict__`` is represented by that dict;
    - everything else becomes ``str(obj)``.
    The last two cases log a warning because the type is unregistered.
    """
    is_dataclass_instance = not isinstance(obj, type) and dataclasses.is_dataclass(obj)
    if is_dataclass_instance:
        return dataclasses.asdict(obj)

    logger.warning(f"Unknown type {type(obj)}, using __dict__ or str(obj)")
    return obj.__dict__ if hasattr(obj, "__dict__") else str(obj)
def register_encoder(type_class):
    """
    Return a decorator that registers a JSON pre-encoding function for a type.

    The decorated function is registered on _preprocess_for_encode for
    ``type_class``.

    Usage:
        @register_encoder(MyClass)
        def encode_my_class(obj: MyClass):
            return {"field": obj.field}
    """
    logger.debug(f"Registering encoder for {type_class}")
    decorator = _preprocess_for_encode.register(type_class)
    return decorator
@register_encoder(set)
def _preprocess_for_encode_set(
    obj: set,
) -> list:  # pyright: ignore[reportUnusedFunction]
    """
    Convert a set into an ordered list so it can be JSON-encoded.

    Elements are returned in their natural sort order when they are mutually
    comparable. Sets whose elements cannot be compared with each other (mixed
    types, Enum members, ...) are ordered by type name and repr() instead of
    raising, so one such set does not abort the whole config dump.

    Args:
        obj: The set to convert.

    Returns:
        A list holding the same elements as ``obj``.
    """
    try:
        return sorted(obj)
    except TypeError:
        # No natural ordering between the elements: fall back to a textual
        # key that every object supports.
        return sorted(obj, key=lambda item: (type(item).__qualname__, repr(item)))
@register_encoder(Enum)
def _preprocess_for_encode_enum(
    obj: Enum,
) -> str:  # pyright: ignore[reportUnusedFunction]
    """Represent an Enum member by its ``str()`` form for JSON encoding."""
    text = str(obj)
    return text
# Shared JSON encoder used for every config dump.
# - sort_keys plus compact separators and no indentation give one stable,
#   single-line rendering for a given input.
# - ensure_ascii=False writes non-ASCII characters as-is instead of escaping them.
# - allow_nan=False makes NaN/Infinity an encoding error rather than emitting
#   non-standard JSON.
# - Objects the encoder cannot handle natively are passed to
#   _preprocess_for_encode.
canonical_json_encoder = json.JSONEncoder(
ensure_ascii=False,
separators=(",", ":"),
allow_nan=False,
sort_keys=True,
indent=None,
default=_preprocess_for_encode,
)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import os
from typing import Dict, List, Optional, Set
# Environment variable name prefixes captured by get_environment_vars() when
# the caller does not pass its own list. They cover common ML/GPU/Dynamo
# settings. Names are compared with str.startswith(), so matching is
# case-sensitive.
DEFAULT_ENV_PREFIXES = [
"DYN_", # Dynamo-specific variables
"CUDA_", # CUDA configuration
"NCCL_", # NVIDIA Collective Communications Library
"HF_", # HuggingFace
"TRANSFORMERS_", # Transformers library
"SGLANG_", # SGLang
"SGL_", # SGLang (short prefix)
"MC_", # Mooncake
"VLLM_", # vLLM
"TENSORRT_", # TensorRT
"TORCH_", # PyTorch
"UCX_", # UCX
"NIXL_", # NIXL
"OMPI_", # OpenMPI
]
# Substrings that mark an environment variable as sensitive. _is_sensitive()
# upper-cases the variable name and checks whether any of these occurs
# anywhere in it, so the match is case-insensitive and not anchored to the
# start or end of the name.
SENSITIVE_PATTERNS = [
"TOKEN",
"API_KEY",
"SECRET",
"PASSWORD",
"CREDENTIAL",
"AUTH",
]
def get_environment_vars(
    prefixes: Optional[List[str]] = None,
    include_sensitive: bool = False,
    additional_vars: Optional[Set[str]] = None,
) -> Dict[str, str]:
    """
    Collect the environment variables selected by prefix or by exact name.

    Args:
        prefixes: Name prefixes to select. None means DEFAULT_ENV_PREFIXES.
        include_sensitive: When False (the default), the value of any selected
            variable whose name looks sensitive is replaced by "<REDACTED>".
        additional_vars: Exact variable names to select even when no prefix
            matches.

    Returns:
        A dict mapping each selected variable name to its value, or to
        "<REDACTED>" when the value was withheld.

    Examples:
        get_environment_vars()                                  # default prefixes
        get_environment_vars(prefixes=["MY_APP_"])              # custom prefixes only
        get_environment_vars(additional_vars={"PATH", "HOME"})  # plus specific names
    """
    wanted_prefixes = tuple(DEFAULT_ENV_PREFIXES if prefixes is None else prefixes)
    extra_names = set() if additional_vars is None else additional_vars

    return {
        name: (
            "<REDACTED>"
            if not include_sensitive and _is_sensitive(name)
            else raw_value
        )
        for name, raw_value in os.environ.items()
        # str.startswith accepts a tuple and is True if any prefix matches.
        if name.startswith(wanted_prefixes) or name in extra_names
    }
def _is_sensitive(var_name: str) -> bool:
    """
    Tell whether a variable name looks like it holds sensitive data.

    Args:
        var_name: The environment variable name to inspect.

    Returns:
        True when the upper-cased name contains any entry of
        SENSITIVE_PATTERNS, False otherwise.
    """
    normalized = var_name.upper()
    for pattern in SENSITIVE_PATTERNS:
        if pattern in normalized:
            return True
    return False
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import importlib.metadata
import logging
import platform
import sys
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
def get_system_info() -> Dict[str, Any]:
    """
    Collect basic facts about the host from the ``platform`` module.

    Returns:
        A dict with the keys "platform", "architecture", "processor",
        "hostname", "os_name", "os_release" and "os_version".

    Note:
        Each lookup is guarded separately. A failed lookup is logged as a
        warning and replaced by an "unknown" placeholder; the three OS fields
        are guarded together and have no placeholder, so a failure there
        leaves the remaining OS keys out.
    """
    # (result key, lookup, placeholder stored when the lookup fails)
    guarded_lookups = (
        ("platform", platform.platform, "unknown"),
        ("architecture", platform.architecture, ("unknown", "unknown")),
        ("processor", lambda: platform.processor() or "unknown", "unknown"),
        ("hostname", platform.node, "unknown"),
    )

    info: Dict[str, Any] = {}
    for key, lookup, placeholder in guarded_lookups:
        try:
            info[key] = lookup()
        except Exception as e:
            logger.warning(f"Failed to get {key}: {e}")
            info[key] = placeholder

    os_lookups = (
        ("os_name", platform.system),
        ("os_release", platform.release),
        ("os_version", platform.version),
    )
    try:
        for key, lookup in os_lookups:
            info[key] = lookup()
    except Exception as e:
        logger.warning(f"Failed to get OS details: {e}")

    return info
def get_runtime_info() -> Dict[str, Any]:
    """
    Collect facts about the running Python interpreter from ``sys``.

    Returns:
        A dict with "python_version" (the full version string),
        "python_version_info" (major/minor/micro numbers),
        "python_executable" and "command_line_args" (``sys.argv``).

    Note:
        Each group of lookups is guarded separately; a failure is logged as a
        warning and a placeholder is stored instead.
    """
    info: Dict[str, Any] = {}

    try:
        info["python_version"] = sys.version
        info["python_version_info"] = dict(
            zip(("major", "minor", "micro"), sys.version_info[:3])
        )
    except Exception as e:
        logger.warning(f"Failed to get Python version: {e}")
        info["python_version"] = "unknown"

    # (result key, wording used in the warning, lookup, placeholder on failure)
    guarded_lookups = (
        ("python_executable", "Python executable", lambda: sys.executable, "unknown"),
        ("command_line_args", "command-line args", lambda: sys.argv, []),
    )
    for key, label, lookup, placeholder in guarded_lookups:
        try:
            info[key] = lookup()
        except Exception as e:
            logger.warning(f"Failed to get {label}: {e}")
            info[key] = placeholder

    return info
def get_gpu_info() -> Optional[Dict[str, Any]]:
    """
    Query ``nvidia-smi`` for the GPUs visible on this host.

    The command is run with a 5 second timeout and asked for one CSV line per
    GPU containing its name, driver version and total memory.

    Returns:
        ``{"gpus": [...], "count": N}`` where each list entry has the keys
        "name", "driver_version" and "memory_total" (strings as printed by
        nvidia-smi), or None when no GPU information could be obtained.

    Note:
        Best effort: a missing executable, a timeout, a non-zero exit status
        or any other failure is logged at debug level and yields None.
    """
    # Imported before the try block so the module name is always bound when
    # the handler runs.
    import subprocess

    query = [
        "nvidia-smi",
        "--query-gpu=name,driver_version,memory.total",
        "--format=csv,noheader",
    ]
    try:
        result = subprocess.run(query, capture_output=True, text=True, timeout=5)
        if result.returncode != 0:
            logger.debug(
                f"Failed to get GPU info: nvidia-smi exited with code {result.returncode}"
            )
            return None

        gpus = []
        for line in result.stdout.strip().split("\n"):
            if not line:
                continue
            fields = [field.strip() for field in line.split(",")]
            # Lines that do not carry all three requested columns are ignored.
            if len(fields) < 3:
                continue
            gpus.append(
                {
                    "name": fields[0],
                    "driver_version": fields[1],
                    "memory_total": fields[2],
                }
            )
        return {"gpus": gpus, "count": len(gpus)} if gpus else None
    except Exception as e:
        # Covers FileNotFoundError (no nvidia-smi) and subprocess.TimeoutExpired
        # as well as anything unexpected.
        logger.debug(f"Failed to get GPU info: {e}")
        return None
def get_package_info() -> Optional[Dict[str, Any]]:
"""
Get package information.
Returns:
Dictionary containing installed packages and their versions.
"""
packages = {}
for package in importlib.metadata.distributions():
packages[package.name] = package.version
return packages
......@@ -30,6 +30,8 @@ import re
import uvloop
from dynamo.common.config_dump import dump_config
from dynamo.common.config_dump.config_dumper import add_config_dump_args
from dynamo.llm import (
EngineType,
EntrypointArgs,
......@@ -207,6 +209,7 @@ def parse_args():
default=False,
help="Start KServe gRPC server.",
)
add_config_dump_args(parser)
flags = parser.parse_args()
......@@ -220,6 +223,7 @@ def parse_args():
async def async_main():
flags = parse_args()
dump_config(flags.dump_config_to, flags)
is_static = bool(flags.static_endpoint) # true if the string has a value
# Configure Dynamo frontend HTTP service metrics prefix
......
......@@ -15,6 +15,7 @@ from typing import Any, Dict, Generator, List, Optional
from sglang.srt.server_args import ServerArgs
from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
from dynamo.common.config_dump import register_encoder
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sglang import __version__
......@@ -85,6 +86,12 @@ DYNAMO_ARGS: Dict[str, Dict[str, Any]] = {
"default": False,
"help": "Run as embedding worker component (Dynamo flag, also sets SGLang's --is-embedding)",
},
"dump-config-to": {
"flags": ["--dump-config-to"],
"type": str,
"default": None,
"help": "Dump debug config to the specified file path. If not specified, the config will be dumped to stdout at INFO level.",
},
}
......@@ -110,6 +117,8 @@ class DynamoArgs:
# embedding options
embedding_worker: bool = False
# config dump options
dump_config_to: Optional[str] = None
class DisaggregationMode(Enum):
......@@ -137,6 +146,20 @@ class Config:
return DisaggregationMode.AGGREGATED
# Register SGLang-specific encoders with the shared system
@register_encoder(Config)
def _preprocess_for_encode_config(
    config: Config,
) -> Dict[str, Any]:  # pyright: ignore[reportUnusedFunction]
    """
    Reduce a Config to a plain dict for the config dump.

    Returns:
        A dict holding the config's server_args and dynamo_args unchanged,
        and "serving_mode" as the mode's ``.value`` (or the string "None"
        when no serving mode is set).
    """
    encoded: Dict[str, Any] = {
        "server_args": config.server_args,
        "dynamo_args": config.dynamo_args,
    }
    if config.serving_mode is None:
        encoded["serving_mode"] = "None"
    else:
        encoded["serving_mode"] = config.serving_mode.value
    return encoded
def _set_parser(
sglang_str: Optional[str],
dynamo_str: Optional[str],
......@@ -307,6 +330,7 @@ def parse_args(args: list[str]) -> Config:
multimodal_encode_worker=parsed_args.multimodal_encode_worker,
multimodal_worker=parsed_args.multimodal_worker,
embedding_worker=parsed_args.embedding_worker,
dump_config_to=parsed_args.dump_config_to,
)
logging.debug(f"Dynamo args: {dynamo_args}")
......
......@@ -9,6 +9,7 @@ import sys
import sglang as sgl
import uvloop
from dynamo.common.config_dump import dump_config
from dynamo.llm import ModelInput, ModelType
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
......@@ -45,6 +46,7 @@ async def worker(runtime: DistributedRuntime):
logging.info("Signal handlers will trigger a graceful shutdown of the runtime")
config = parse_args(sys.argv[1:])
dump_config(config.dynamo_args.dump_config_to, config)
if config.dynamo_args.embedding_worker:
await init_embedding(runtime, config)
elif config.dynamo_args.multimodal_processor:
......
......@@ -23,6 +23,7 @@ from torch.cuda import device_count
from transformers import AutoConfig
import dynamo.nixl_connect as nixl_connect
from dynamo.common.config_dump import dump_config
from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
......@@ -270,6 +271,10 @@ async def init(runtime: DistributedRuntime, config: Config):
connector = nixl_connect.Connector()
await connector.initialize()
dump_config(
config.dump_config_to, {"engine_args": engine_args, "dynamo_args": config}
)
async with get_llm_engine(engine_args) as engine:
endpoint = component.endpoint(config.endpoint)
......
......@@ -8,6 +8,7 @@ from typing import Optional
from tensorrt_llm.llmapi import BuildConfig
from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
from dynamo.common.config_dump import add_config_dump_args
from dynamo.trtllm import __version__
from dynamo.trtllm.request_handlers.handler_base import (
DisaggregationMode,
......@@ -59,6 +60,7 @@ class Config:
self.max_file_size_mb: int = 50
self.reasoning_parser: Optional[str] = None
self.tool_call_parser: Optional[str] = None
self.dump_config_to: Optional[str] = None
self.custom_jinja_template: Optional[str] = None
def __str__(self) -> str:
......@@ -91,6 +93,7 @@ class Config:
f"max_file_size_mb={self.max_file_size_mb}, "
f"reasoning_parser={self.reasoning_parser}, "
f"tool_call_parser={self.tool_call_parser}, "
f"dump_config_to={self.dump_config_to},"
f"custom_jinja_template={self.custom_jinja_template}"
)
......@@ -298,6 +301,7 @@ def cmd_line_args():
choices=get_reasoning_parser_names(),
help="Reasoning parser name for the model. If not specified, no reasoning parsing is performed.",
)
add_config_dump_args(parser)
parser.add_argument(
"--custom-jinja-template",
type=str,
......@@ -374,6 +378,7 @@ def cmd_line_args():
config.reasoning_parser = args.dyn_reasoning_parser
config.tool_call_parser = args.dyn_tool_call_parser
config.dump_config_to = args.dump_config_to
# Handle custom jinja template path expansion (environment variables and home directory)
if args.custom_jinja_template:
......
......@@ -4,7 +4,7 @@
import logging
import os
from typing import Optional
from typing import Any, Dict, Optional
from vllm.config import KVTransferConfig
from vllm.distributed.kv_events import KVEventsConfig
......@@ -12,6 +12,7 @@ from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.utils import FlexibleArgumentParser
from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
from dynamo.common.config_dump import add_config_dump_args, register_encoder
from dynamo.runtime import DistributedRuntime
from . import __version__
......@@ -63,6 +64,14 @@ class Config:
tool_call_parser: Optional[str] = None
reasoning_parser: Optional[str] = None
# dump config to file
dump_config_to: Optional[str] = None
@register_encoder(Config)
def _preprocess_for_encode_config(config: Config) -> Dict[str, Any]:
    """Expose a Config's instance attributes as a dict for the config dump."""
    attributes = vars(config)
    return attributes
def parse_args() -> Config:
parser = FlexibleArgumentParser(
......@@ -122,6 +131,7 @@ def parse_args() -> Config:
default=None,
help="Path to a custom Jinja template file to override the model's default chat template. This template will take precedence over any template found in the model repository.",
)
add_config_dump_args(parser)
parser = AsyncEngineArgs.add_cli_args(parser)
args = parser.parse_args()
......@@ -206,6 +216,8 @@ def parse_args() -> Config:
f"Setting reasonable default of {config.engine_args.block_size} for block_size"
)
config.dump_config_to = args.dump_config_to
return config
......
......@@ -12,6 +12,7 @@ from vllm.distributed.kv_events import ZmqEventPublisher
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.async_llm import AsyncLLM
from dynamo.common.config_dump import dump_config
from dynamo.llm import (
ModelInput,
ModelRuntimeConfig,
......@@ -78,6 +79,7 @@ async def worker(runtime: DistributedRuntime):
logging.debug("Signal handlers set up for graceful shutdown")
dump_config(config.dump_config_to, config)
if config.is_prefill_worker:
await init_prefill(runtime, config)
logger.debug("init_prefill completed")
......
......@@ -7,6 +7,7 @@ import subprocess
from hatchling.builders.hooks.plugin.interface import BuildHookInterface
COMPONENTS = [
"common",
"frontend",
"vllm",
"sglang",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment