multimodal.py 6.48 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional, Type

import pytest

from dynamo.common.utils.paths import WORKSPACE_DIR
from tests.serve.conftest import MULTIMODAL_IMG_URL
from tests.utils.engine_process import EngineConfig
from tests.utils.payload_builder import chat_payload
from tests.utils.payloads import BasePayload, ChatPayload

# Absolute location of the sample clip, resolved relative to WORKSPACE_DIR.
LOCAL_VIDEO_TEST_PATH = (
    Path(WORKSPACE_DIR) / "lib/llm/tests/data/media/240p_10.mp4"
).resolve()
# The same clip expressed as a file:// URI so it can be placed in a "video_url" field.
LOCAL_VIDEO_TEST_URI = LOCAL_VIDEO_TEST_PATH.as_uri()

# Remote WAV sample referenced by the audio payload.
AUDIO_TEST_URL = (
    "https://raw.githubusercontent.com/yuekaizhang/Triton-ASR-Client"
    "/main/datasets/mini_en/wav/1221-135766-0002.wav"
)


# ---------------------------------------------------------------------------
# Payload factories
# ---------------------------------------------------------------------------


def make_image_payload(expected_response: list[str]) -> ChatPayload:
    """Build the chat request that asks which colors appear in ``MULTIMODAL_IMG_URL``.

    The request carries one text part and one ``image_url`` part, is sent once
    (``repeat_count=1``), and pins sampling to ``temperature=0.0`` with at most
    100 generated tokens. *expected_response* is forwarded unchanged to
    ``chat_payload``.
    """
    question_part = {
        "type": "text",
        "text": "What colors are in the following image? "
        "Respond only with the colors.",
    }
    image_part = {
        "type": "image_url",
        "image_url": {"url": MULTIMODAL_IMG_URL},
    }
    return chat_payload(
        [question_part, image_part],
        repeat_count=1,
        expected_response=expected_response,
        temperature=0.0,
        max_tokens=100,
    )


def make_video_payload(expected_response: list[str]) -> ChatPayload:
    """Build the chat request that asks for a description of the local test clip.

    The clip is referenced through ``LOCAL_VIDEO_TEST_URI``. The request is sent
    once (``repeat_count=1``) with ``temperature=0.0`` and at most 100 generated
    tokens. *expected_response* is forwarded unchanged to ``chat_payload``.
    """
    instruction_part = {"type": "text", "text": "Describe the video in detail"}
    clip_part = {
        "type": "video_url",
        "video_url": {"url": LOCAL_VIDEO_TEST_URI},
    }
    return chat_payload(
        [instruction_part, clip_part],
        repeat_count=1,
        expected_response=expected_response,
        temperature=0.0,
        max_tokens=100,
    )


def make_audio_payload(expected_response: list[str]) -> ChatPayload:
    """Build the chat request that asks what is recited in the remote test WAV.

    The recording is referenced through ``AUDIO_TEST_URL``. The request is sent
    once (``repeat_count=1``) with ``temperature=0.0`` and at most 100 generated
    tokens. *expected_response* is forwarded unchanged to ``chat_payload``.
    """
    question_part = {"type": "text", "text": "What is recited in the audio?"}
    recording_part = {
        "type": "audio_url",
        "audio_url": {"url": AUDIO_TEST_URL},
    }
    return chat_payload(
        [question_part, recording_part],
        repeat_count=1,
        expected_response=expected_response,
        temperature=0.0,
        max_tokens=100,
    )


# ---------------------------------------------------------------------------
# Config dataclasses
# ---------------------------------------------------------------------------


@dataclass
class TopologyConfig:
    """Per-topology overrides for marks, timeout, and VRAM profiling.

    One instance describes how a single deployment topology of a model profile
    is turned into an engine config by ``make_multimodal_configs``. Every field
    has a default, so ``TopologyConfig()`` is a valid "no overrides" entry.
    """

    # Extra pytest marks, added after the GPU and timeout marks.
    marks: list[Any] = field(default_factory=list)
    # Value handed to ``pytest.mark.timeout`` for this topology.
    timeout_s: int = 300
    # When set, emitted as ``pytest.mark.profiled_vram_gib(<value>)``.
    profiled_vram_gib: Optional[float] = None
    # When set, emitted as ``pytest.mark.requested_vllm_kv_cache_bytes(<value>)``.
    requested_vllm_kv_cache_bytes: Optional[int] = None
    # Forwarded as-is to the engine config's ``delayed_start`` argument.
    delayed_start: int = 0
    # Overrides the directory passed to ``make_multimodal_configs`` when truthy.
    directory: Optional[str] = None
    # Overrides the profile-level ``gpu_marker`` when truthy.
    gpu_marker: Optional[str] = None
    # When True, ``--single-gpu`` is appended to the script arguments.
    single_gpu: bool = False
    # Extra environment variables for the subprocess; a fresh dict per instance.
    env: dict[str, str] = field(default_factory=dict)
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190


@dataclass
class MultimodalModelProfile:
    """Test-relevant description of one multimodal model.

    :func:`make_multimodal_configs` expands a profile into one engine config
    for every entry of ``topologies``.
    """

    # HuggingFace model ID.
    name: str
    # Kebab-case slug used when building the config key.
    short_name: str
    # Topology name -> per-topology overrides.
    topologies: dict[str, TopologyConfig]
    # Request payloads attached to every generated config.
    request_payloads: list[BasePayload]
    # Name of the pytest GPU mark used when a topology does not override it.
    gpu_marker: str = "gpu_1"
    # Additional script arguments placed after ``--model <name>``.
    extra_vllm_args: list[str] = field(default_factory=list)
    # Marks shared across all topologies of this profile.
    marks: list[Any] = field(default_factory=list)
    # If True, skip unless DYN_HF_GATED_MODELS_ENABLED=1.
    gated: bool = False


# ---------------------------------------------------------------------------
# Config generator
# ---------------------------------------------------------------------------


def make_multimodal_configs(
    profile: MultimodalModelProfile,
    config_cls: Type[EngineConfig],
    directory: str,
    topology_scripts: dict[str, str],
) -> dict[str, EngineConfig]:
    """Generate config entries for each topology in *profile*.

    Parameters
    ----------
    profile:
        The model profile to expand; one config is produced per entry of
        ``profile.topologies``.
    config_cls:
        The concrete config class to instantiate (e.g. ``VLLMConfig``).
    directory:
        Default directory; overridden by ``TopologyConfig.directory`` if set.
    topology_scripts:
        Mapping from topology key to shell script filename.

    Returns
    -------
    dict
        Configs keyed by ``mm_<topology>_<profile.short_name>``; the same key
        is used as each config's ``name``.

    Raises
    ------
    KeyError
        If a topology of *profile* has no entry in *topology_scripts*.
    """
    configs: dict[str, EngineConfig] = {}
    for topology, topo_cfg in profile.topologies.items():
        script_name = topology_scripts[topology]

        # A fresh list per topology, so the flag below never leaks into the profile.
        script_args = ["--model", profile.name, *profile.extra_vllm_args]
        if topo_cfg.single_gpu:
            script_args.append("--single-gpu")

        # Mark order: GPU, timeout, topology marks, optional resource marks,
        # optional gating skip, then the profile-wide marks.
        gpu = topo_cfg.gpu_marker or profile.gpu_marker
        marks: list[Any] = [
            getattr(pytest.mark, gpu),
            pytest.mark.timeout(topo_cfg.timeout_s),
        ]
        marks.extend(topo_cfg.marks)
        if topo_cfg.profiled_vram_gib is not None:
            marks.append(pytest.mark.profiled_vram_gib(topo_cfg.profiled_vram_gib))
        if topo_cfg.requested_vllm_kv_cache_bytes is not None:
            marks.append(
                pytest.mark.requested_vllm_kv_cache_bytes(
                    topo_cfg.requested_vllm_kv_cache_bytes
                )
            )
        if profile.gated:
            # The environment is read here, when the configs are generated.
            # Any non-empty value enables gated models, not only "1".
            marks.append(
                pytest.mark.skipif(
                    not os.environ.get("DYN_HF_GATED_MODELS_ENABLED"),
                    reason=(
                        f"{profile.name} is gated; set DYN_HF_GATED_MODELS_ENABLED=1 "
                        "with an HF_TOKEN that has accepted the license"
                    ),
                )
            )
        marks.extend(profile.marks)

        key = f"mm_{topology}_{profile.short_name}"
        configs[key] = config_cls(
            name=key,
            directory=topo_cfg.directory or directory,
            script_name=script_name,
            model=profile.name,
            script_args=script_args,
            marks=marks,
            delayed_start=topo_cfg.delayed_start,
            request_payloads=profile.request_payloads,
            env=topo_cfg.env,
        )
    return configs