# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Pytest configuration and fixtures for vllm-omni tests.
"""

from typing import Any

import pytest
from vllm import TextPrompt
from vllm.distributed.parallel_state import cleanup_dist_env_and_memory

from tests.conftest import _run_post_test_cleanup, _run_pre_test_cleanup
from vllm_omni.entrypoints.omni import Omni
from vllm_omni.inputs.data import OmniSamplingParams
from vllm_omni.outputs import OmniRequestOutput

# Per-call multimodal inputs. Each may be None (no data), a single item that
# is reused for every prompt, or a list with one entry per prompt.
PromptAudioInput = list[tuple[Any, int]] | tuple[Any, int] | None
PromptImageInput = list[Any] | Any | None
PromptVideoInput = list[Any] | Any | None


class OmniRunner:
    """
    Test runner for Omni models.

    Wraps an ``Omni`` instance, builds chat-formatted prompts with multimodal
    placeholders, and runs the shared test cleanup hooks around the engine's
    lifetime. Usable as a context manager.
    """

    def __init__(
        self,
        model_name: str,
        seed: int = 42,
        stage_init_timeout: int = 300,
        batch_timeout: int = 10,
        init_timeout: int = 300,
        shm_threshold_bytes: int = 65536,
        log_stats: bool = False,
        stage_configs_path: str | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize an OmniRunner for testing.

        Args:
            model_name: The model name or path
            seed: Random seed for reproducibility (stored on the runner; it is
                not forwarded to Omni by this constructor)
            stage_init_timeout: Timeout for initializing a single stage in seconds
            batch_timeout: Timeout for batching in seconds
            init_timeout: Timeout for initializing stages in seconds
            shm_threshold_bytes: Threshold for using shared memory
            log_stats: Enable detailed statistics logging
            stage_configs_path: Optional path to YAML stage config file
            **kwargs: Additional arguments passed to Omni
        """
        # Run the cleanup hooks before building the engine so it does not
        # start on top of state left behind by a previous test.
        cleanup_dist_env_and_memory()
        _run_pre_test_cleanup(enable_force=True)
        _run_post_test_cleanup(enable_force=True)

        self.model_name = model_name
        self.seed = seed

        self.omni = Omni(
            model=model_name,
            log_stats=log_stats,
            stage_init_timeout=stage_init_timeout,
            batch_timeout=batch_timeout,
            init_timeout=init_timeout,
            shm_threshold_bytes=shm_threshold_bytes,
            stage_configs_path=stage_configs_path,
            **kwargs,
        )

    def get_default_sampling_params_list(self) -> list[OmniSamplingParams]:
        """
        Get a list of default sampling parameters for all stages.

        Returns:
            The ``default_sampling_params`` of every entry in
            ``self.omni.stage_list``, in stage order
        """
        return [st.default_sampling_params for st in self.omni.stage_list]

    def get_omni_inputs(
        self,
        prompts: list[str] | str,
        system_prompt: str | None = None,
        audios: PromptAudioInput = None,
        images: PromptImageInput = None,
        videos: PromptVideoInput = None,
        mm_processor_kwargs: dict[str, Any] | None = None,
        modalities: list[str] | None = None,
    ) -> list[TextPrompt]:
        """
        Construct Omni input format from prompts and multimodal data.

        For each prompt the user turn is built as: audio placeholders, then
        image placeholders, then video placeholders, then the prompt text.

        Args:
            prompts: Text prompt(s) - either a single string or list of strings
            system_prompt: Optional system prompt (defaults to Qwen system prompt)
            audios: Audio input(s) - tuple of (audio_array, sample_rate) or list of tuples
            images: Image input(s) - PIL Image or list of PIL Images
            videos: Video input(s) - numpy array or list of numpy arrays
            mm_processor_kwargs: Optional processor kwargs (e.g., use_audio_in_video);
                attached to every prompt when non-empty
            modalities: Optional list stored under the "modalities" key of
                every prompt when non-empty

        Returns:
            List of prompt dictionaries suitable for Omni.generate()

        Raises:
            ValueError: If a multimodal argument is a list whose length differs
                from the number of prompts
        """
        if system_prompt is None:
            system_prompt = (
                "You are Qwen, a virtual human developed by the Qwen Team, Alibaba "
                "Group, capable of perceiving auditory and visual inputs, as well as "
                "generating text and speech."
            )

        video_padding_token = "<|VIDEO|>"
        image_padding_token = "<|IMAGE|>"
        audio_padding_token = "<|AUDIO|>"

        # This exact model name selects a different set of padding tokens.
        if self.model_name == "Qwen/Qwen3-Omni-30B-A3B-Instruct":
            video_padding_token = "<|video_pad|>"
            image_padding_token = "<|image_pad|>"
            audio_padding_token = "<|audio_pad|>"

        # One complete placeholder per multimodal item. Images and videos
        # share the vision begin/end markers and differ only in the pad token.
        audio_placeholder = f"<|audio_bos|>{audio_padding_token}<|audio_eos|>"
        image_placeholder = f"<|vision_bos|>{image_padding_token}<|vision_eos|>"
        video_placeholder = f"<|vision_bos|>{video_padding_token}<|vision_eos|>"

        if isinstance(prompts, str):
            prompts = [prompts]

        def _normalize_mm_input(mm_input, num_prompts):
            # Expand a None / single item / per-prompt list to one entry per prompt.
            if mm_input is None:
                return [None] * num_prompts
            if isinstance(mm_input, list):
                if len(mm_input) != num_prompts:
                    raise ValueError(
                        f"Multimodal input list length ({len(mm_input)}) must match prompts length ({num_prompts})"
                    )
                return mm_input
            return [mm_input] * num_prompts

        num_prompts = len(prompts)
        audios_list = _normalize_mm_input(audios, num_prompts)
        images_list = _normalize_mm_input(images, num_prompts)
        videos_list = _normalize_mm_input(videos, num_prompts)

        omni_inputs = []
        for prompt_text, audio, image, video in zip(prompts, audios_list, images_list, videos_list):
            content_parts: list[str] = []
            multi_modal_data: dict[str, Any] = {}

            # The tuple order fixes both the placeholder order in the prompt
            # and the key insertion order of multi_modal_data.
            for key, data, placeholder in (
                ("audio", audio, audio_placeholder),
                ("image", image, image_placeholder),
                ("video", video, video_placeholder),
            ):
                if data is None:
                    continue
                # A per-prompt list carries several items of one modality and
                # needs one placeholder per item; anything else is one item.
                count = len(data) if isinstance(data, list) else 1
                content_parts.append(placeholder * count)
                multi_modal_data[key] = data

            content_parts.append(prompt_text)
            user_content = "".join(content_parts)

            full_prompt = (
                f"<|im_start|>system\n{system_prompt}<|im_end|>\n"
                f"<|im_start|>user\n{user_content}<|im_end|>\n"
                f"<|im_start|>assistant\n"
            )

            input_dict: TextPrompt = {"prompt": full_prompt}
            if multi_modal_data:
                input_dict["multi_modal_data"] = multi_modal_data
            if modalities:
                input_dict["modalities"] = modalities
            if mm_processor_kwargs:
                input_dict["mm_processor_kwargs"] = mm_processor_kwargs

            omni_inputs.append(input_dict)

        return omni_inputs

    def generate(
        self,
        prompts: list[TextPrompt],
        sampling_params_list: list[OmniSamplingParams] | None = None,
    ) -> list[OmniRequestOutput]:
        """
        Generate outputs for the given prompts.

        Args:
            prompts: List of prompt dictionaries with 'prompt' and optionally
                'multi_modal_data' keys
            sampling_params_list: List of sampling parameters for each stage.
                If None, uses default parameters.

        Returns:
            List of OmniRequestOutput objects from stages with final_output=True
        """
        if sampling_params_list is None:
            sampling_params_list = self.get_default_sampling_params_list()

        return self.omni.generate(prompts, sampling_params_list)

    def generate_multimodal(
        self,
        prompts: list[str] | str,
        sampling_params_list: list[OmniSamplingParams] | None = None,
        system_prompt: str | None = None,
        audios: PromptAudioInput = None,
        images: PromptImageInput = None,
        videos: PromptVideoInput = None,
        mm_processor_kwargs: dict[str, Any] | None = None,
        modalities: list[str] | None = None,
    ) -> list[OmniRequestOutput]:
        """
        Convenience method to generate with multimodal inputs.

        Args:
            prompts: Text prompt(s)
            sampling_params_list: List of sampling parameters for each stage
            system_prompt: Optional system prompt
            audios: Audio input(s)
            images: Image input(s)
            videos: Video input(s)
            mm_processor_kwargs: Optional processor kwargs
            modalities: Optional list forwarded to get_omni_inputs

        Returns:
            List of OmniRequestOutput objects from stages with final_output=True
        """
        omni_inputs = self.get_omni_inputs(
            prompts=prompts,
            system_prompt=system_prompt,
            audios=audios,
            images=images,
            videos=videos,
            mm_processor_kwargs=mm_processor_kwargs,
            modalities=modalities,
        )
        return self.generate(omni_inputs, sampling_params_list)

    def generate_audio(
        self,
        prompts: list[str] | str,
        sampling_params_list: list[OmniSamplingParams] | None = None,
        system_prompt: str | None = None,
        audios: PromptAudioInput = None,
        mm_processor_kwargs: dict[str, Any] | None = None,
    ) -> list[OmniRequestOutput]:
        """
        Convenience method to generate with audio inputs only.

        Args:
            prompts: Text prompt(s)
            sampling_params_list: List of sampling parameters for each stage
            system_prompt: Optional system prompt
            audios: Audio input(s)
            mm_processor_kwargs: Optional processor kwargs

        Returns:
            List of OmniRequestOutput objects from stages with final_output=True
        """
        omni_inputs = self.get_omni_inputs(
            prompts=prompts,
            system_prompt=system_prompt,
            audios=audios,
            mm_processor_kwargs=mm_processor_kwargs,
        )
        return self.generate(omni_inputs, sampling_params_list)

    def generate_video(
        self,
        prompts: list[str] | str,
        sampling_params_list: list[OmniSamplingParams] | None = None,
        system_prompt: str | None = None,
        videos: PromptVideoInput = None,
        mm_processor_kwargs: dict[str, Any] | None = None,
    ) -> list[OmniRequestOutput]:
        """
        Convenience method to generate with video inputs only.

        Args:
            prompts: Text prompt(s)
            sampling_params_list: List of sampling parameters for each stage
            system_prompt: Optional system prompt
            videos: Video input(s)
            mm_processor_kwargs: Optional processor kwargs

        Returns:
            List of OmniRequestOutput objects from stages with final_output=True
        """
        omni_inputs = self.get_omni_inputs(
            prompts=prompts,
            system_prompt=system_prompt,
            videos=videos,
            mm_processor_kwargs=mm_processor_kwargs,
        )
        return self.generate(omni_inputs, sampling_params_list)

    def generate_image(
        self,
        prompts: list[str] | str,
        sampling_params_list: list[OmniSamplingParams] | None = None,
        system_prompt: str | None = None,
        images: PromptImageInput = None,
        mm_processor_kwargs: dict[str, Any] | None = None,
    ) -> list[OmniRequestOutput]:
        """
        Convenience method to generate with image inputs only.

        Args:
            prompts: Text prompt(s)
            sampling_params_list: List of sampling parameters for each stage
            system_prompt: Optional system prompt
            images: Image input(s)
            mm_processor_kwargs: Optional processor kwargs

        Returns:
            List of OmniRequestOutput objects from stages with final_output=True
        """
        omni_inputs = self.get_omni_inputs(
            prompts=prompts,
            system_prompt=system_prompt,
            images=images,
            mm_processor_kwargs=mm_processor_kwargs,
        )
        return self.generate(omni_inputs, sampling_params_list)

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - cleanup resources.

        Returns None, so an exception raised inside the ``with`` body (or by
        ``close()``) still propagates to the caller.
        """
        try:
            self.close()
        finally:
            # Run the cleanup hooks even when close() raises, so a failed
            # shutdown does not skip them for the tests that follow.
            if hasattr(self, "omni"):
                del self.omni
            cleanup_dist_env_and_memory()
            _run_post_test_cleanup(enable_force=True)

    def close(self):
        """Close and cleanup the Omni instance.

        Safe to call more than once, including after ``__exit__`` has already
        dropped the ``omni`` attribute.
        """
        omni = getattr(self, "omni", None)
        if omni is not None and hasattr(omni, "close"):
            omni.close()


@pytest.fixture(scope="session")
def omni_runner() -> type[OmniRunner]:
    """Provide the OmniRunner class itself, so each test constructs its own runner."""
    return OmniRunner