Unverified Commit f81f5277 authored by Ayush Agarwal's avatar Ayush Agarwal Committed by GitHub
Browse files

feat: vllm omni cli (#6788)


Signed-off-by: default avatarayushag <ayushag@nvidia.com>
parent c5ad4a87
...@@ -158,9 +158,6 @@ def update_dynamo_config_with_engine( ...@@ -158,9 +158,6 @@ def update_dynamo_config_with_engine(
): ):
dynamo_config.component = "backend" dynamo_config.component = "backend"
dynamo_config.endpoint = "generate" dynamo_config.endpoint = "generate"
elif dynamo_config.omni:
dynamo_config.component = "backend"
dynamo_config.endpoint = "generate"
elif dynamo_config.disaggregation_mode == DisaggregationMode.PREFILL: elif dynamo_config.disaggregation_mode == DisaggregationMode.PREFILL:
dynamo_config.component = "prefill" dynamo_config.component = "prefill"
dynamo_config.endpoint = "generate" dynamo_config.endpoint = "generate"
......
...@@ -146,153 +146,6 @@ class DynamoVllmArgGroup(ArgGroup): ...@@ -146,153 +146,6 @@ class DynamoVllmArgGroup(ArgGroup):
choices=[m.value for m in EmbeddingTransferMode], choices=[m.value for m in EmbeddingTransferMode],
) )
# vLLM-Omni
add_negatable_bool_argument(
g,
flag_name="--omni",
env_var="DYN_VLLM_OMNI",
default=False,
help="Run as vLLM-Omni worker for multi-stage pipelines (supports text-to-text, text-to-image, etc.).",
)
add_argument(
g,
flag_name="--stage-configs-path",
env_var="DYN_VLLM_STAGE_CONFIGS_PATH",
default=None,
help="Path to vLLM-Omni stage configuration YAML file for --omni mode (optional).",
)
# Video encoding
add_argument(
g,
flag_name="--default-video-fps",
env_var="DYN_VLLM_DEFAULT_VIDEO_FPS",
default=16,
arg_type=int,
help="Default frames per second for generated videos.",
)
# Diffusion engine-level args (passed to AsyncOmni constructor).
# All flags use the --omni- prefix to avoid collisions with vLLM's
# native engine flags (e.g. --enforce-eager), which are parsed by a
# separate argparse pass and would otherwise be silently consumed here.
add_negatable_bool_argument(
g,
flag_name="--omni-enable-layerwise-offload",
env_var="DYN_VLLM_ENABLE_LAYERWISE_OFFLOAD",
default=False,
help="Enable layerwise (blockwise) offloading on DiT modules to reduce GPU memory.",
)
add_argument(
g,
flag_name="--omni-layerwise-num-gpu-layers",
env_var="DYN_VLLM_LAYERWISE_NUM_GPU_LAYERS",
default=1,
arg_type=int,
help="Number of ready layers (blocks) to keep on GPU during generation.",
)
add_negatable_bool_argument(
g,
flag_name="--omni-vae-use-slicing",
env_var="DYN_VLLM_VAE_USE_SLICING",
default=False,
help="Enable VAE slicing for memory optimization in diffusion models.",
)
add_negatable_bool_argument(
g,
flag_name="--omni-vae-use-tiling",
env_var="DYN_VLLM_VAE_USE_TILING",
default=False,
help="Enable VAE tiling for memory optimization in diffusion models.",
)
add_argument(
g,
flag_name="--omni-boundary-ratio",
env_var="DYN_VLLM_BOUNDARY_RATIO",
default=0.875,
arg_type=float,
help=(
"Boundary split ratio for low/high DiT transformers. "
"Default 0.875 uses both transformers for best quality. "
"Set to 1.0 to load only the low-noise transformer (saves memory). "
"Only used with --omni."
),
)
add_argument(
g,
flag_name="--omni-flow-shift",
env_var="DYN_VLLM_FLOW_SHIFT",
default=None,
arg_type=float,
help="Scheduler flow_shift parameter (5.0 for 720p, 12.0 for 480p). Only used with --omni.",
)
add_argument(
g,
flag_name="--omni-diffusion-cache-backend",
env_var="DYN_VLLM_DIFFUSION_CACHE_BACKEND",
default=None,
choices=["cache_dit", "tea_cache"],
help=(
"Cache backend for diffusion acceleration. "
"'cache_dit' enables DBCache + SCM + TaylorSeer. "
"'tea_cache' enables TeaCache. Only used with --omni."
),
)
add_argument(
g,
flag_name="--omni-diffusion-cache-config",
env_var="DYN_VLLM_DIFFUSION_CACHE_CONFIG",
default=None,
help="Cache configuration as JSON string (overrides defaults). Only used with --omni.",
)
add_negatable_bool_argument(
g,
flag_name="--omni-enable-cache-dit-summary",
env_var="DYN_VLLM_ENABLE_CACHE_DIT_SUMMARY",
default=False,
help="Enable cache-dit summary logging after diffusion forward passes.",
)
add_negatable_bool_argument(
g,
flag_name="--omni-enable-cpu-offload",
env_var="DYN_VLLM_ENABLE_CPU_OFFLOAD",
default=False,
help="Enable CPU offloading for diffusion models to reduce GPU memory usage.",
)
add_negatable_bool_argument(
g,
flag_name="--omni-enforce-eager",
env_var="DYN_VLLM_ENFORCE_EAGER",
default=False,
help="Disable torch.compile and force eager execution for diffusion models.",
)
# Diffusion parallel configuration
add_argument(
g,
flag_name="--omni-ulysses-degree",
env_var="DYN_VLLM_ULYSSES_DEGREE",
default=1,
arg_type=int,
help="Number of GPUs used for Ulysses sequence parallelism in diffusion.",
)
add_argument(
g,
flag_name="--omni-ring-degree",
env_var="DYN_VLLM_RING_DEGREE",
default=1,
arg_type=int,
help="Number of GPUs used for ring sequence parallelism in diffusion.",
)
add_argument(
g,
flag_name="--omni-cfg-parallel-size",
env_var="DYN_VLLM_CFG_PARALLEL_SIZE",
default=1,
arg_type=int,
choices=[1, 2],
help="Number of GPUs used for classifier free guidance parallelism.",
)
# Headless mode for multi-node TP/PP # Headless mode for multi-node TP/PP
add_negatable_bool_argument( add_negatable_bool_argument(
g, g,
...@@ -339,33 +192,6 @@ class DynamoVllmConfig(ConfigBase): ...@@ -339,33 +192,6 @@ class DynamoVllmConfig(ConfigBase):
str, EmbeddingTransferMode str, EmbeddingTransferMode
] # resolved to enum in validate() ] # resolved to enum in validate()
# vLLM-Omni
omni: bool
stage_configs_path: Optional[str] = None
# Video encoding
default_video_fps: int = 16
# Diffusion engine-level parameters (passed to AsyncOmni constructor).
# Field names use omni_ prefix to match the --omni-* CLI flags and avoid
# collisions with vLLM's native engine args (e.g. enforce_eager).
omni_enable_layerwise_offload: bool = False
omni_layerwise_num_gpu_layers: int = 1
omni_vae_use_slicing: bool = False
omni_vae_use_tiling: bool = False
omni_boundary_ratio: float = 0.875
omni_flow_shift: Optional[float] = None
omni_diffusion_cache_backend: Optional[str] = None
omni_diffusion_cache_config: Optional[str] = None
omni_enable_cache_dit_summary: bool = False
omni_enable_cpu_offload: bool = False
omni_enforce_eager: bool = False
# Diffusion parallel configuration
omni_ulysses_degree: int = 1
omni_ring_degree: int = 1
omni_cfg_parallel_size: int = 1
# Headless mode for multi-node TP/PP # Headless mode for multi-node TP/PP
headless: bool = False headless: bool = False
...@@ -378,7 +204,6 @@ class DynamoVllmConfig(ConfigBase): ...@@ -378,7 +204,6 @@ class DynamoVllmConfig(ConfigBase):
self._resolve_embedding_transfer_mode() self._resolve_embedding_transfer_mode()
self._validate_multimodal_role_exclusivity() self._validate_multimodal_role_exclusivity()
self._validate_multimodal_requires_flag() self._validate_multimodal_requires_flag()
self._validate_omni_stage_config()
def _resolve_embedding_transfer_mode(self) -> None: def _resolve_embedding_transfer_mode(self) -> None:
"""Resolve embedding_transfer_mode from string to enum.""" """Resolve embedding_transfer_mode from string to enum."""
...@@ -467,11 +292,3 @@ class DynamoVllmConfig(ConfigBase): ...@@ -467,11 +292,3 @@ class DynamoVllmConfig(ConfigBase):
raise ValueError( raise ValueError(
"Use --enable-multimodal when enabling any multimodal component" "Use --enable-multimodal when enabling any multimodal component"
) )
def _validate_omni_stage_config(self) -> None:
"""Require stage_configs_path when using --omni."""
if self.stage_configs_path and not self.omni:
raise ValueError(
"--stage-configs-path is only allowed when using --omni. "
"Specify a YAML file containing stage configurations for the multi-stage pipeline."
)
...@@ -19,10 +19,8 @@ from vllm.v1.metrics.prometheus import setup_multiprocess_prometheus ...@@ -19,10 +19,8 @@ from vllm.v1.metrics.prometheus import setup_multiprocess_prometheus
from dynamo import prometheus_names from dynamo import prometheus_names
from dynamo.common.config_dump import dump_config from dynamo.common.config_dump import dump_config
from dynamo.common.storage import get_fs
from dynamo.common.utils.endpoint_types import parse_endpoint_types from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.graceful_shutdown import install_signal_handlers from dynamo.common.utils.graceful_shutdown import install_signal_handlers
from dynamo.common.utils.output_modalities import get_output_modalities
from dynamo.common.utils.prometheus import ( from dynamo.common.utils.prometheus import (
LLMBackendMetrics, LLMBackendMetrics,
register_engine_metrics_callback, register_engine_metrics_callback,
...@@ -44,11 +42,7 @@ from . import envs ...@@ -44,11 +42,7 @@ from . import envs
from .args import Config, _uses_dynamo_connector, parse_args from .args import Config, _uses_dynamo_connector, parse_args
from .constants import DisaggregationMode from .constants import DisaggregationMode
from .handlers import DecodeWorkerHandler, PrefillWorkerHandler, get_dp_range_for_worker from .handlers import DecodeWorkerHandler, PrefillWorkerHandler, get_dp_range_for_worker
from .health_check import ( from .health_check import VllmHealthCheckPayload, VllmPrefillHealthCheckPayload
VllmHealthCheckPayload,
VllmOmniHealthCheckPayload,
VllmPrefillHealthCheckPayload,
)
from .publisher import DYNAMO_COMPONENT_REGISTRY, StatLoggerFactory from .publisher import DYNAMO_COMPONENT_REGISTRY, StatLoggerFactory
from .snapshot import get_checkpoint_config from .snapshot import get_checkpoint_config
...@@ -180,9 +174,6 @@ async def worker() -> None: ...@@ -180,9 +174,6 @@ async def worker() -> None:
snapshot_engine=snapshot_engine, snapshot_engine=snapshot_engine,
) )
logger.debug("multimodal worker completed") logger.debug("multimodal worker completed")
elif config.omni:
await init_omni(runtime, config, shutdown_event)
logger.debug("init_omni completed")
elif config.disaggregation_mode == DisaggregationMode.PREFILL: elif config.disaggregation_mode == DisaggregationMode.PREFILL:
await init_prefill( await init_prefill(
runtime, runtime,
...@@ -1048,86 +1039,6 @@ def get_engine_cache_info(engine: AsyncLLM) -> dict[str, Any]: ...@@ -1048,86 +1039,6 @@ def get_engine_cache_info(engine: AsyncLLM) -> dict[str, Any]:
raise raise
async def init_omni(
runtime: DistributedRuntime, config: Config, shutdown_event: asyncio.Event
) -> None:
"""Initialize Omni worker for multi-stage pipeline generation using vLLM-Omni.
Supports text-to-text, text-to-image, and text-to-video generation
through a single unified OmniHandler.
"""
from dynamo.vllm.omni import OmniHandler
generate_endpoint = runtime.endpoint(
f"{config.namespace}.{config.component}.{config.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
# Initialize media filesystem for storing generated images/videos
media_fs = (
get_fs(config.media_output_fs_url) if config.media_output_fs_url else None
)
# Initialize unified OmniHandler
handler = OmniHandler(
runtime=runtime,
config=config,
default_sampling_params={},
shutdown_event=shutdown_event,
media_output_fs=media_fs,
media_output_http_url=config.media_output_http_url,
)
logger.info(f"Omni worker initialized for model: {config.model}")
# Set up metrics collection for vLLM and LMCache metrics
setup_metrics_collection(config, generate_endpoint, logger)
# TODO: extend for multi-stage pipelines
model_type = get_output_modalities(config.output_modalities, config.model)
if model_type is None:
# Default to Images
model_type = ModelType.Images
await register_model(
ModelInput.Text,
model_type,
generate_endpoint,
config.model,
config.served_model_name,
kv_cache_block_size=config.engine_args.block_size,
)
logger.info("Starting to serve Omni worker endpoint...")
health_check_payload = (
await VllmOmniHealthCheckPayload.create(handler.engine_client)
).to_dict()
try:
await generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
health_check_payload=health_check_payload,
)
except Exception as e:
logger.error(f"Failed to serve Omni endpoint: {e}")
raise
finally:
logger.debug("Cleaning up Omni worker")
handler.cleanup()
def main() -> None: def main() -> None:
uvloop.run(worker()) uvloop.run(worker())
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import os
if "PYTHONHASHSEED" not in os.environ:
os.environ["PYTHONHASHSEED"] = "0"
from dynamo.vllm.omni.main import main
if __name__ == "__main__":
main()
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Omni-specific argument parsing for python -m dynamo.vllm.omni."""
import argparse
import logging
from typing import Optional
from vllm_omni.engine.arg_utils import AsyncOmniEngineArgs
try:
from vllm.utils import FlexibleArgumentParser
except ImportError:
from vllm.utils.argparse_utils import FlexibleArgumentParser
from dynamo.common.configuration.arg_group import ArgGroup
from dynamo.common.configuration.groups.runtime_args import (
DynamoRuntimeArgGroup,
DynamoRuntimeConfig,
)
from dynamo.common.configuration.utils import add_argument, add_negatable_bool_argument
logger = logging.getLogger(__name__)
class OmniArgGroup(ArgGroup):
"""Diffusion pipeline kwargs passed through to AsyncOmni() constructor.
These are NOT part of AsyncOmniEngineArgs (which handles vLLM engine-level
args like model, tp, max_model_len). Instead they are direct constructor
kwargs for AsyncOmni and need Dynamo-side env-var (DYN_OMNI_*) support,
so we define them here rather than relying on the upstream arg parser.
"""
name = "dynamo-omni"
def add_arguments(self, parser) -> None:
g = parser.add_argument_group(
"Omni Diffusion Options",
"Diffusion pipeline parameters for vLLM-Omni multi-stage generation.",
)
add_argument(
g,
flag_name="--stage-configs-path",
env_var="DYN_OMNI_STAGE_CONFIGS_PATH",
default=None,
help="Path to vLLM-Omni stage configuration YAML file (optional).",
)
# Video encoding
add_argument(
g,
flag_name="--default-video-fps",
env_var="DYN_OMNI_DEFAULT_VIDEO_FPS",
default=16,
arg_type=int,
help="Default frames per second for generated videos.",
)
# Layerwise offloading
add_negatable_bool_argument(
g,
flag_name="--enable-layerwise-offload",
env_var="DYN_OMNI_ENABLE_LAYERWISE_OFFLOAD",
default=False,
help="Enable layerwise (blockwise) offloading on DiT modules to reduce GPU memory.",
)
add_argument(
g,
flag_name="--layerwise-num-gpu-layers",
env_var="DYN_OMNI_LAYERWISE_NUM_GPU_LAYERS",
default=1,
arg_type=int,
help="Number of ready layers (blocks) to keep on GPU during generation.",
)
# VAE optimization
add_negatable_bool_argument(
g,
flag_name="--vae-use-slicing",
env_var="DYN_OMNI_VAE_USE_SLICING",
default=False,
help="Enable VAE slicing for memory optimization in diffusion models.",
)
add_negatable_bool_argument(
g,
flag_name="--vae-use-tiling",
env_var="DYN_OMNI_VAE_USE_TILING",
default=False,
help="Enable VAE tiling for memory optimization in diffusion models.",
)
# Diffusion scheduling
add_argument(
g,
flag_name="--boundary-ratio",
env_var="DYN_OMNI_BOUNDARY_RATIO",
default=0.875,
arg_type=float,
help=(
"Boundary split ratio for low/high DiT transformers. "
"Default 0.875 uses both transformers for best quality. "
"Set to 1.0 to load only the low-noise transformer (saves memory)."
),
)
add_argument(
g,
flag_name="--flow-shift",
env_var="DYN_OMNI_FLOW_SHIFT",
default=None,
arg_type=float,
help="Scheduler flow_shift parameter (5.0 for 720p, 12.0 for 480p).",
)
# Cache acceleration
add_argument(
g,
flag_name="--cache-backend",
env_var="DYN_OMNI_CACHE_BACKEND",
default=None,
choices=["cache_dit", "tea_cache"],
help=(
"Cache backend for diffusion acceleration. "
"'cache_dit' enables DBCache + SCM + TaylorSeer. "
"'tea_cache' enables TeaCache."
),
)
add_argument(
g,
flag_name="--cache-config",
env_var="DYN_OMNI_CACHE_CONFIG",
default=None,
help="Cache configuration as JSON string (overrides defaults).",
)
add_negatable_bool_argument(
g,
flag_name="--enable-cache-dit-summary",
env_var="DYN_OMNI_ENABLE_CACHE_DIT_SUMMARY",
default=False,
help="Enable cache-dit summary logging after diffusion forward passes.",
)
# Execution mode
add_negatable_bool_argument(
g,
flag_name="--enable-cpu-offload",
env_var="DYN_OMNI_ENABLE_CPU_OFFLOAD",
default=False,
help="Enable CPU offloading for diffusion models to reduce GPU memory usage.",
)
add_negatable_bool_argument(
g,
flag_name="--enforce-eager",
env_var="DYN_OMNI_ENFORCE_EAGER",
default=False,
help="Disable torch.compile and force eager execution for diffusion models.",
)
# Diffusion parallel configuration
add_argument(
g,
flag_name="--ulysses-degree",
env_var="DYN_OMNI_ULYSSES_DEGREE",
default=1,
arg_type=int,
help="Number of GPUs used for Ulysses sequence parallelism in diffusion.",
)
add_argument(
g,
flag_name="--ring-degree",
env_var="DYN_OMNI_RING_DEGREE",
default=1,
arg_type=int,
help="Number of GPUs used for ring sequence parallelism in diffusion.",
)
add_argument(
g,
flag_name="--cfg-parallel-size",
env_var="DYN_OMNI_CFG_PARALLEL_SIZE",
default=1,
arg_type=int,
choices=[1, 2],
help="Number of GPUs used for classifier free guidance parallelism.",
)
class OmniConfig(DynamoRuntimeConfig):
"""Configuration for Dynamo vLLM-Omni worker."""
component: str = "backend"
endpoint: Optional[str] = None
# mirror vLLM
model: str
served_model_name: Optional[str] = None
# vLLM-Omni engine args
engine_args: AsyncOmniEngineArgs
# OmniArgGroup fields (populated by from_cli_args)
stage_configs_path: Optional[str] = None
default_video_fps: int = 16
enable_layerwise_offload: bool = False
layerwise_num_gpu_layers: int = 1
vae_use_slicing: bool = False
vae_use_tiling: bool = False
boundary_ratio: float = 0.875
flow_shift: Optional[float] = None
cache_backend: Optional[str] = None
cache_config: Optional[str] = None
enable_cache_dit_summary: bool = False
enable_cpu_offload: bool = False
enforce_eager: bool = False
ulysses_degree: int = 1
ring_degree: int = 1
cfg_parallel_size: int = 1
def validate(self) -> None:
DynamoRuntimeConfig.validate(self)
if self.default_video_fps <= 0:
raise ValueError("--default-video-fps must be > 0")
if self.ulysses_degree <= 0:
raise ValueError("--ulysses-degree must be > 0")
if self.ring_degree <= 0:
raise ValueError("--ring-degree must be > 0")
if not (0 < self.boundary_ratio <= 1):
raise ValueError("--boundary-ratio must be in (0, 1]")
def parse_omni_args() -> OmniConfig:
"""Parse command-line arguments for the vLLM-Omni backend."""
dynamo_runtime_argspec = DynamoRuntimeArgGroup()
omni_argspec = OmniArgGroup()
parser = argparse.ArgumentParser(
description="Dynamo vLLM-Omni worker",
formatter_class=argparse.RawTextHelpFormatter,
allow_abbrev=False,
)
dynamo_runtime_argspec.add_arguments(parser)
omni_argspec.add_arguments(parser)
# Add vLLM-Omni engine args
vg = parser.add_argument_group(
"vLLM-Omni Engine Options. Please refer to vLLM-Omni documentation for more details."
)
vllm_parser = FlexibleArgumentParser(add_help=False)
AsyncOmniEngineArgs.add_cli_args(vllm_parser, async_args_only=False)
for action in vllm_parser._actions:
if not action.option_strings:
continue
vg._group_actions.append(action)
args, unknown = parser.parse_known_args()
config = OmniConfig.from_cli_args(args)
vllm_args = vllm_parser.parse_args(unknown)
config.model = vllm_args.model
engine_args = AsyncOmniEngineArgs.from_cli_args(vllm_args)
if getattr(engine_args, "served_model_name", None) is not None:
served = engine_args.served_model_name
if len(served) > 1:
raise ValueError("We do not support multiple model names.")
config.served_model_name = served[0]
config.engine_args = engine_args
config.validate()
return config
...@@ -58,22 +58,11 @@ class BaseOmniHandler(BaseWorkerHandler): ...@@ -58,22 +58,11 @@ class BaseOmniHandler(BaseWorkerHandler):
self.config = config self.config = config
self.model_max_len = config.engine_args.max_model_len self.model_max_len = config.engine_args.max_model_len
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.use_vllm_tokenizer = config.use_vllm_tokenizer
logger.info(f"{self.__class__.__name__} initialized successfully") logger.info(f"{self.__class__.__name__} initialized successfully")
def _build_omni_kwargs(self, config) -> Dict[str, Any]: def _build_omni_kwargs(self, config) -> Dict[str, Any]:
"""Build keyword arguments for AsyncOmni constructor. """Build keyword arguments for AsyncOmni constructor."""
Constructs the full kwargs dict including engine-level diffusion
parameters and parallel configuration when available.
Args:
config: Parsed Config object.
Returns:
Dictionary of keyword arguments for AsyncOmni.
"""
omni_kwargs: Dict[str, Any] = { omni_kwargs: Dict[str, Any] = {
"model": config.model, "model": config.model,
"trust_remote_code": config.engine_args.trust_remote_code, "trust_remote_code": config.engine_args.trust_remote_code,
...@@ -82,39 +71,34 @@ class BaseOmniHandler(BaseWorkerHandler): ...@@ -82,39 +71,34 @@ class BaseOmniHandler(BaseWorkerHandler):
if config.stage_configs_path: if config.stage_configs_path:
omni_kwargs["stage_configs_path"] = config.stage_configs_path omni_kwargs["stage_configs_path"] = config.stage_configs_path
# Add diffusion engine-level params if present on config. # Diffusion engine-level params — read directly from config namespace
# Config fields use the omni_ prefix; map them to AsyncOmni kwarg names. diffusion_fields = [
diffusion_params = { "enable_layerwise_offload",
# config attr → AsyncOmni kwarg "layerwise_num_gpu_layers",
"omni_enable_layerwise_offload": "enable_layerwise_offload", "vae_use_slicing",
"omni_layerwise_num_gpu_layers": "layerwise_num_gpu_layers", "vae_use_tiling",
"omni_vae_use_slicing": "vae_use_slicing", "boundary_ratio",
"omni_vae_use_tiling": "vae_use_tiling", "flow_shift",
"omni_boundary_ratio": "boundary_ratio", "cache_backend",
"omni_flow_shift": "flow_shift", "cache_config",
"omni_diffusion_cache_backend": "cache_backend", "enable_cache_dit_summary",
"omni_diffusion_cache_config": "cache_config", "enable_cpu_offload",
"omni_enable_cache_dit_summary": "enable_cache_dit_summary", "enforce_eager",
"omni_enable_cpu_offload": "enable_cpu_offload", ]
"omni_enforce_eager": "enforce_eager", for field in diffusion_fields:
} value = getattr(config, field, None)
for config_attr, kwarg_name in diffusion_params.items(): if value is not None:
if hasattr(config, config_attr): omni_kwargs[field] = value
value = getattr(config, config_attr)
if value is not None: # Build DiffusionParallelConfig if available
omni_kwargs[kwarg_name] = value if DiffusionParallelConfig is not None:
# Build DiffusionParallelConfig if parallel params are present
if DiffusionParallelConfig is not None and hasattr(
config, "omni_ulysses_degree"
):
parallel_config = DiffusionParallelConfig( parallel_config = DiffusionParallelConfig(
ulysses_degree=getattr(config, "omni_ulysses_degree", 1), ulysses_degree=getattr(config, "ulysses_degree", 1),
ring_degree=getattr(config, "omni_ring_degree", 1), ring_degree=getattr(config, "ring_degree", 1),
cfg_parallel_size=getattr(config, "omni_cfg_parallel_size", 1), cfg_parallel_size=getattr(config, "cfg_parallel_size", 1),
) )
omni_kwargs["parallel_config"] = parallel_config omni_kwargs["parallel_config"] = parallel_config
elif DiffusionParallelConfig is None: else:
logger.warning( logger.warning(
"DiffusionParallelConfig not available; " "DiffusionParallelConfig not available; "
"skipping parallel config for AsyncOmni" "skipping parallel config for AsyncOmni"
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Omni worker entrypoint for python -m dynamo.vllm.omni."""
import asyncio
import logging
import os
import uvloop
from dynamo import prometheus_names
from dynamo.common.config_dump import dump_config
from dynamo.common.storage import get_fs
from dynamo.common.utils.graceful_shutdown import install_signal_handlers
from dynamo.common.utils.output_modalities import get_output_modalities
from dynamo.common.utils.runtime import create_runtime
from dynamo.llm import ModelInput, ModelType, fetch_model, register_model
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.vllm.health_check import VllmOmniHealthCheckPayload
from dynamo.vllm.main import setup_metrics_collection
from .args import OmniConfig, parse_omni_args
configure_dynamo_logging()
logger = logging.getLogger(__name__)
shutdown_endpoints: list = []
async def init_omni(
runtime: DistributedRuntime, config: OmniConfig, shutdown_event: asyncio.Event
):
"""Initialize Omni worker for multi-stage pipeline generation."""
from dynamo.vllm.omni import OmniHandler
generate_endpoint = runtime.endpoint(
f"{config.namespace}.{config.component}.{config.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
media_fs = (
get_fs(config.media_output_fs_url) if config.media_output_fs_url else None
)
handler = OmniHandler(
runtime=runtime,
config=config,
default_sampling_params={},
shutdown_event=shutdown_event,
media_output_fs=media_fs,
media_output_http_url=config.media_output_http_url,
)
logger.info("Omni worker initialized for model: %s", config.model)
setup_metrics_collection(config, generate_endpoint, logger)
if config.engine_args.data_parallel_rank:
logger.info(
"Non-leader DP rank %d; skipping endpoint registration",
config.engine_args.data_parallel_rank,
)
await shutdown_event.wait()
return
model_type = get_output_modalities(config.output_modalities, config.model)
if model_type is None:
model_type = ModelType.Images
await register_model(
ModelInput.Text,
model_type,
generate_endpoint,
config.model,
config.served_model_name,
kv_cache_block_size=config.engine_args.block_size,
)
logger.info("Starting to serve Omni worker endpoint...")
health_check_payload = (
await VllmOmniHealthCheckPayload.create(handler.engine_client)
).to_dict()
try:
await generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[
(
prometheus_names.labels.MODEL,
config.served_model_name or config.model,
),
(
prometheus_names.labels.MODEL_NAME,
config.served_model_name or config.model,
),
],
health_check_payload=health_check_payload,
)
except Exception as e:
logger.error("Failed to serve Omni endpoint: %s", e)
raise
finally:
logger.debug("Cleaning up Omni worker")
handler.cleanup()
async def worker():
config = parse_omni_args()
dump_config(config.dump_config_to, config)
if not config.served_model_name:
config.served_model_name = config.engine_args.served_model_name = config.model
if not os.path.exists(config.model):
await fetch_model(config.model)
shutdown_event = asyncio.Event()
runtime, loop = create_runtime(
discovery_backend=config.discovery_backend,
request_plane=config.request_plane,
event_plane=config.event_plane,
use_kv_events=False,
)
install_signal_handlers(loop, runtime, shutdown_endpoints, shutdown_event)
await init_omni(runtime, config, shutdown_event)
logger.debug("Omni worker completed, exiting...")
def main():
uvloop.run(worker())
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Unit tests for OmniConfig validation."""
from types import SimpleNamespace
import pytest
try:
from dynamo.vllm.omni.args import OmniConfig
except ImportError:
pytest.skip("vLLM omni dependencies not available", allow_module_level=True)
pytestmark = [
pytest.mark.unit,
pytest.mark.vllm,
pytest.mark.gpu_1,
pytest.mark.pre_merge,
]
def _make_omni_config(**overrides) -> OmniConfig:
"""Build a minimal OmniConfig with valid defaults, applying overrides."""
defaults = {
# DynamoRuntimeConfig fields
"namespace": "dynamo",
"component": "backend",
"endpoint": None,
"discovery_backend": "etcd",
"request_plane": "tcp",
"event_plane": "nats",
"connector": [],
"enable_local_indexer": True,
"durable_kv_events": False,
"dyn_tool_call_parser": None,
"dyn_reasoning_parser": None,
"custom_jinja_template": None,
"endpoint_types": "chat,completions",
"dump_config_to": None,
"multimodal_embedding_cache_capacity_gb": 0,
"output_modalities": None,
"media_output_fs_url": "file:///tmp/dynamo_media",
"media_output_http_url": None,
# OmniConfig fields
"model": "test-model",
"served_model_name": None,
"engine_args": SimpleNamespace(),
"stage_configs_path": None,
"default_video_fps": 16,
"enable_layerwise_offload": False,
"layerwise_num_gpu_layers": 1,
"vae_use_slicing": False,
"vae_use_tiling": False,
"boundary_ratio": 0.875,
"flow_shift": None,
"cache_backend": None,
"cache_config": None,
"enable_cache_dit_summary": False,
"enable_cpu_offload": False,
"enforce_eager": False,
"ulysses_degree": 1,
"ring_degree": 1,
"cfg_parallel_size": 1,
}
defaults.update(overrides)
obj = OmniConfig.__new__(OmniConfig)
for k, v in defaults.items():
setattr(obj, k, v)
return obj
def test_omni_config_valid_defaults():
"""Config with valid defaults passes validation."""
config = _make_omni_config()
config.validate() # should not raise
@pytest.mark.parametrize("fps", [0, -1, -100])
def test_omni_config_invalid_video_fps(fps):
"""Non-positive FPS must be rejected."""
config = _make_omni_config(default_video_fps=fps)
with pytest.raises(ValueError, match="--default-video-fps must be > 0"):
config.validate()
@pytest.mark.parametrize("degree", [0, -1])
def test_omni_config_invalid_ulysses_degree(degree):
"""Non-positive ulysses_degree must be rejected."""
config = _make_omni_config(ulysses_degree=degree)
with pytest.raises(ValueError, match="--ulysses-degree must be > 0"):
config.validate()
@pytest.mark.parametrize("degree", [0, -1])
def test_omni_config_invalid_ring_degree(degree):
"""Non-positive ring_degree must be rejected."""
config = _make_omni_config(ring_degree=degree)
with pytest.raises(ValueError, match="--ring-degree must be > 0"):
config.validate()
@pytest.mark.parametrize("ratio", [0, -0.1, 1.01, 2.0])
def test_omni_config_invalid_boundary_ratio(ratio):
"""boundary_ratio outside (0, 1] must be rejected."""
config = _make_omni_config(boundary_ratio=ratio)
with pytest.raises(ValueError, match=r"--boundary-ratio must be in \(0, 1\]"):
config.validate()
@pytest.mark.parametrize("ratio", [0.001, 0.5, 0.875, 1.0])
def test_omni_config_valid_boundary_ratio(ratio):
"""boundary_ratio within (0, 1] should pass."""
config = _make_omni_config(boundary_ratio=ratio)
config.validate() # should not raise
...@@ -205,13 +205,29 @@ The I2V-specific `nvext` fields (`boundary_ratio`, `guidance_scale_2`) control t ...@@ -205,13 +205,29 @@ The I2V-specific `nvext` fields (`boundary_ratio`, `guidance_scale_2`) control t
## CLI Reference ## CLI Reference
For the full list of Omni-related flags (including `--omni`, `--output-modalities`, `--stage-configs-path`, `--media-output-fs-url`, `--media-output-http-url`, and the `--omni-*` diffusion flags), run: The omni backend uses a dedicated entrypoint: `python -m dynamo.vllm.omni`.
```bash | Flag | Description |
python -m dynamo.vllm --help |---|---|
``` | `--output-modalities <modality>` | Output modality: `text`, `image`, or `video` |
| `--stage-configs-path <path>` | Path to stage config YAML (optional; vLLM-Omni uses model defaults if omitted) |
See also the [Argument Reference](vllm-reference-guide.md#argument-reference) in the Reference Guide. | `--boundary-ratio <float>` | MoE expert switching boundary (default: 0.875) |
| `--flow-shift <float>` | Scheduler flow_shift (5.0 for 720p, 12.0 for 480p) |
| `--vae-use-slicing` | Enable VAE slicing for memory optimization |
| `--vae-use-tiling` | Enable VAE tiling for memory optimization |
| `--default-video-fps <int>` | Default frames per second for generated videos (default: 16) |
| `--enable-layerwise-offload` | Enable layerwise offloading on DiT modules to reduce GPU memory |
| `--layerwise-num-gpu-layers <int>` | Number of ready layers to keep on GPU during generation (default: 1) |
| `--cache-backend <backend>` | Diffusion cache: `cache_dit` or `tea_cache` |
| `--cache-config <json>` | Cache configuration as JSON string (overrides defaults) |
| `--enable-cache-dit-summary` | Enable cache-dit summary logging after diffusion forward passes |
| `--enforce-eager` | Disable torch.compile for diffusion models |
| `--enable-cpu-offload` | Enable CPU offloading for diffusion models |
| `--ulysses-degree <int>` | GPUs for Ulysses sequence parallelism in diffusion (default: 1) |
| `--ring-degree <int>` | GPUs for ring sequence parallelism in diffusion (default: 1) |
| `--cfg-parallel-size <int>` | GPUs for classifier-free guidance parallelism (1 or 2, default: 1) |
| `--media-output-fs-url <url>` | Filesystem URL for storing generated media (default: `file:///tmp/dynamo_media`) |
| `--media-output-http-url <url>` | Base URL for rewriting media paths in responses (optional) |
## Storage Configuration ## Storage Configuration
......
...@@ -33,6 +33,8 @@ done ...@@ -33,6 +33,8 @@ done
HTTP_PORT="${DYN_HTTP_PORT:-8000}" HTTP_PORT="${DYN_HTTP_PORT:-8000}"
print_launch_banner "Launching vLLM-Omni Text-to-Text (1 GPU)" "$MODEL" "$HTTP_PORT" print_launch_banner "Launching vLLM-Omni Text-to-Text (1 GPU)" "$MODEL" "$HTTP_PORT"
# Disable version check for flashinfer
export FLASHINFER_DISABLE_VERSION_CHECK=1
# Run ingress (frontend) # Run ingress (frontend)
# dynamo.frontend accepts either --http-port flag or DYN_HTTP_PORT env var (defaults to 8000) # dynamo.frontend accepts either --http-port flag or DYN_HTTP_PORT env var (defaults to 8000)
...@@ -44,9 +46,8 @@ sleep 2 ...@@ -44,9 +46,8 @@ sleep 2
echo "Starting Omni worker..." echo "Starting Omni worker..."
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \ DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \
python -m dynamo.vllm \ python -m dynamo.vllm.omni \
--model "$MODEL" \ --model "$MODEL" \
--omni \
--stage-configs-path "$STAGE_CONFIG" \ --stage-configs-path "$STAGE_CONFIG" \
"${EXTRA_ARGS[@]}" & "${EXTRA_ARGS[@]}" &
......
...@@ -49,9 +49,8 @@ sleep 2 ...@@ -49,9 +49,8 @@ sleep 2
echo "Starting Omni worker..." echo "Starting Omni worker..."
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \ DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \
python -m dynamo.vllm \ python -m dynamo.vllm.omni \
--model "$MODEL" \ --model "$MODEL" \
--omni \
--output-modalities video \ --output-modalities video \
--media-output-fs-url file:///tmp/dynamo_media \ --media-output-fs-url file:///tmp/dynamo_media \
"${EXTRA_ARGS[@]}" "${EXTRA_ARGS[@]}"
...@@ -36,9 +36,8 @@ sleep 2 ...@@ -36,9 +36,8 @@ sleep 2
echo "Starting Omni worker..." echo "Starting Omni worker..."
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \ DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \
python -m dynamo.vllm \ python -m dynamo.vllm.omni \
--model "$MODEL" \ --model "$MODEL" \
--omni \
--output-modalities image \ --output-modalities image \
--media-output-fs-url file:///tmp/dynamo_media \ --media-output-fs-url file:///tmp/dynamo_media \
"${EXTRA_ARGS[@]}" & "${EXTRA_ARGS[@]}" &
......
...@@ -36,9 +36,8 @@ sleep 2 ...@@ -36,9 +36,8 @@ sleep 2
echo "Starting Omni worker..." echo "Starting Omni worker..."
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \ DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \
python -m dynamo.vllm \ python -m dynamo.vllm.omni \
--model "$MODEL" \ --model "$MODEL" \
--omni \
--output-modalities video \ --output-modalities video \
--media-output-fs-url file:///tmp/dynamo_media \ --media-output-fs-url file:///tmp/dynamo_media \
"${EXTRA_ARGS[@]}" & "${EXTRA_ARGS[@]}" &
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment