# SPDX-License-Identifier: Apache-2.0

import os
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING, Optional, TypeVar, Union
from urllib.parse import ParseResult, urlparse
from urllib.request import url2pathname

import numpy as np
import numpy.typing as npt
import torch
from PIL import Image

import vllm.envs as envs
from vllm.connections import HTTPConnection, global_http_connection

from .audio import AudioMediaIO
from .base import MediaIO
from .image import ImageEmbeddingMediaIO, ImageMediaIO
from .inputs import PlaceholderRange
from .video import VideoMediaIO

_M = TypeVar("_M")

if TYPE_CHECKING:
    from .hasher import MultiModalHashDict
    from .inputs import MultiModalKwargs, MultiModalPlaceholderDict


class MediaConnector:
    """Load media (audio, images, video, embeddings) from URLs.

    Supported URL schemes: HTTP-family schemes (fetched through
    ``self.connection``), ``data`` (base64 payloads only) and ``file``
    (only below ``allowed_local_media_path``).
    """

    def __init__(
        self,
        connection: HTTPConnection = global_http_connection,
        *,
        allowed_local_media_path: str = "",
    ) -> None:
        """
        Args:
            connection: Connection used to download HTTP(S) URLs.
            allowed_local_media_path: Directory from which ``file`` URLs may
                be loaded. An empty string (the default) disables ``file``
                URLs entirely.

        Raises:
            ValueError: If ``allowed_local_media_path`` is given but does not
                exist or is not a directory.
        """
        super().__init__()

        self.connection = connection

        if allowed_local_media_path:
            allowed_local_media_path_ = Path(allowed_local_media_path)

            if not allowed_local_media_path_.exists():
                raise ValueError(
                    "Invalid `--allowed-local-media-path`: The path "
                    f"{allowed_local_media_path_} does not exist.")
            if not allowed_local_media_path_.is_dir():
                raise ValueError(
                    "Invalid `--allowed-local-media-path`: The path "
                    f"{allowed_local_media_path_} must be a directory.")

            # `_load_file_url` compares against the *resolved* parents of the
            # requested file, so the root must be absolute and symlink-free
            # as well; otherwise a relative or symlinked root never matches.
            allowed_local_media_path_ = allowed_local_media_path_.resolve()
        else:
            allowed_local_media_path_ = None

        self.allowed_local_media_path = allowed_local_media_path_

    def _load_data_url(
        self,
        url_spec: ParseResult,
        media_io: MediaIO[_M],
    ) -> _M:
        """Decode a ``data:`` URL whose payload is base64-encoded.

        Media-type parameters are tolerated (RFC 2397), e.g.
        ``data:image/png;name=x.png;base64,...``; only the final
        ``;base64`` token selects the encoding, and the bare media type
        (without parameters) is passed to ``media_io``.

        Raises:
            ValueError: If the URL has no ``,`` separating header and data.
            NotImplementedError: If the payload is not base64-encoded.
        """
        data_spec, data = url_spec.path.split(",", 1)
        media_type, *params = data_spec.split(";")

        if not params or params[-1] != "base64":
            msg = "Only base64 data URLs are supported for now."
            raise NotImplementedError(msg)

        return media_io.load_base64(media_type, data)

    def _load_file_url(
        self,
        url_spec: ParseResult,
        media_io: MediaIO[_M],
    ) -> _M:
        """Load a ``file:`` URL that points inside the allowed media root.

        Raises:
            RuntimeError: If no ``allowed_local_media_path`` was configured.
            ValueError: If the resolved file lies outside that directory.
        """
        allowed_local_media_path = self.allowed_local_media_path
        if allowed_local_media_path is None:
            raise RuntimeError("Cannot load local files without "
                               "`--allowed-local-media-path`.")

        # The path component of a URL is percent-encoded (e.g. a space is
        # `%20`); url2pathname decodes it into a native filesystem path.
        filepath = Path(url2pathname(url_spec.path))

        # Resolve `..` segments and symlinks before the containment check so
        # they cannot escape the allowed directory, and load exactly the
        # path that was checked.
        resolved_filepath = filepath.resolve()
        if allowed_local_media_path not in resolved_filepath.parents:
            raise ValueError(
                f"The file path {filepath} must be a subpath "
                f"of `--allowed-local-media-path` {allowed_local_media_path}.")

        return media_io.load_file(resolved_filepath)

    def load_from_url(
        self,
        url: str,
        media_io: MediaIO[_M],
        *,
        fetch_timeout: Optional[int] = None,
    ) -> _M:
        """Fetch ``url`` and decode its payload with ``media_io``.

        Args:
            url: An HTTP(S), ``data:`` or ``file:`` URL.
            media_io: Decoder that turns the raw payload into media.
            fetch_timeout: Timeout forwarded to the HTTP connection; only
                used for HTTP-family URLs.

        Raises:
            ValueError: If the URL scheme is not supported.
        """
        url_spec = urlparse(url)

        if url_spec.scheme.startswith("http"):
            connection = self.connection
            data = connection.get_bytes(url, timeout=fetch_timeout)

            return media_io.load_bytes(data)

        if url_spec.scheme == "data":
            return self._load_data_url(url_spec, media_io)

        if url_spec.scheme == "file":
            return self._load_file_url(url_spec, media_io)

        msg = "The URL must be either a HTTP, data or file URL."
        raise ValueError(msg)

    async def load_from_url_async(
        self,
        url: str,
        media_io: MediaIO[_M],
        *,
        fetch_timeout: Optional[int] = None,
    ) -> _M:
        """Asynchronous counterpart of :meth:`load_from_url`.

        Only the HTTP download is awaited; ``data:`` and ``file:`` URLs are
        decoded synchronously by the same helpers as the sync path.
        """
        url_spec = urlparse(url)

        if url_spec.scheme.startswith("http"):
            connection = self.connection
            data = await connection.async_get_bytes(url, timeout=fetch_timeout)

            return media_io.load_bytes(data)

        if url_spec.scheme == "data":
            return self._load_data_url(url_spec, media_io)

        if url_spec.scheme == "file":
            return self._load_file_url(url_spec, media_io)

        msg = "The URL must be either a HTTP, data or file URL."
        raise ValueError(msg)

    def fetch_audio(
        self,
        audio_url: str,
    ) -> tuple[np.ndarray, Union[int, float]]:
        """
        Load audio from a HTTP, data, or file URL.

        HTTP downloads use ``VLLM_AUDIO_FETCH_TIMEOUT`` as the timeout.
        """
        audio_io = AudioMediaIO()

        return self.load_from_url(
            audio_url,
            audio_io,
            fetch_timeout=envs.VLLM_AUDIO_FETCH_TIMEOUT,
        )

    async def fetch_audio_async(
        self,
        audio_url: str,
    ) -> tuple[np.ndarray, Union[int, float]]:
        """
        Asynchronously load audio from a HTTP, data, or file URL.

        HTTP downloads use ``VLLM_AUDIO_FETCH_TIMEOUT`` as the timeout.
        """
        audio_io = AudioMediaIO()

        return await self.load_from_url_async(
            audio_url,
            audio_io,
            fetch_timeout=envs.VLLM_AUDIO_FETCH_TIMEOUT,
        )

    def fetch_image(
        self,
        image_url: str,
        *,
        image_mode: str = "RGB",
    ) -> Image.Image:
        """
        Load a PIL image from a HTTP, base64 data, or file URL.

        By default, the image is converted into RGB format.
        """
        image_io = ImageMediaIO(image_mode=image_mode)

        return self.load_from_url(
            image_url,
            image_io,
            fetch_timeout=envs.VLLM_IMAGE_FETCH_TIMEOUT,
        )

    async def fetch_image_async(
        self,
        image_url: str,
        *,
        image_mode: str = "RGB",
    ) -> Image.Image:
        """
        Asynchronously load a PIL image from a HTTP, base64 data, or file URL.

        By default, the image is converted into RGB format.
        """
        image_io = ImageMediaIO(image_mode=image_mode)

        return await self.load_from_url_async(
            image_url,
            image_io,
            fetch_timeout=envs.VLLM_IMAGE_FETCH_TIMEOUT,
        )

    def fetch_video(
        self,
        video_url: str,
        *,
        image_mode: str = "RGB",
        num_frames: int = 32,
    ) -> npt.NDArray:
        """
        Load video from a HTTP, base64 data, or file URL.

        ``image_mode`` configures the per-frame ImageMediaIO and
        ``num_frames`` is forwarded to VideoMediaIO.
        """
        image_io = ImageMediaIO(image_mode=image_mode)
        video_io = VideoMediaIO(image_io, num_frames=num_frames)

        return self.load_from_url(
            video_url,
            video_io,
            fetch_timeout=envs.VLLM_VIDEO_FETCH_TIMEOUT,
        )

    async def fetch_video_async(
        self,
        video_url: str,
        *,
        image_mode: str = "RGB",
        num_frames: int = 32,
    ) -> npt.NDArray:
        """
        Asynchronously load video from a HTTP, base64 data, or file URL.

        By default, the image is converted into RGB format.
        """
        image_io = ImageMediaIO(image_mode=image_mode)
        video_io = VideoMediaIO(image_io, num_frames=num_frames)

        return await self.load_from_url_async(
            video_url,
            video_io,
            fetch_timeout=envs.VLLM_VIDEO_FETCH_TIMEOUT,
        )

    def fetch_image_embedding(
        self,
        data: str,
    ) -> torch.Tensor:
        """
        Decode an image embedding from a base64-encoded string.

        Note that ``data`` is the raw base64 payload, not a URL.
        """
        image_embedding_io = ImageEmbeddingMediaIO()

        return image_embedding_io.load_base64("", data)


global_media_connector = MediaConnector()
"""The global :class:`MediaConnector` instance used by vLLM."""

# Module-level shortcuts: bound methods of the shared connector, so callers
# can fetch media without constructing their own MediaConnector.
fetch_audio, fetch_image, fetch_video = (
    global_media_connector.fetch_audio,
    global_media_connector.fetch_image,
    global_media_connector.fetch_video,
)


def encode_audio_base64(
    audio: np.ndarray,
    sampling_rate: int,
) -> str:
    """Encode audio as base64.

    The waveform and its sampling rate are handed to ``AudioMediaIO`` as a
    single ``(audio, sampling_rate)`` tuple.
    """
    payload = (audio, sampling_rate)
    return AudioMediaIO().encode_base64(payload)


def encode_image_base64(
    image: Image.Image,
    *,
    image_mode: str = "RGB",
    format: str = "JPEG",
) -> str:
    """
    Encode a pillow image to base64 format.

    By default, the image is converted into RGB format before being encoded.

    Args:
        image: The image to encode.
        image_mode: Mode passed to ``ImageMediaIO`` (default ``"RGB"``).
        format: Image container format, forwarded as ``image_format``
            (default ``"JPEG"``).
    """
    encoder = ImageMediaIO(image_mode=image_mode)
    encoded = encoder.encode_base64(image, image_format=format)
    return encoded


def encode_video_base64(frames: npt.NDArray) -> str:
    """Encode video frames as a base64 string.

    Frames are serialized by a ``VideoMediaIO`` that wraps a
    default-configured ``ImageMediaIO``.
    """
    return VideoMediaIO(ImageMediaIO()).encode_base64(frames)


def merge_and_sort_multimodal_metadata(
    mm_positions: "MultiModalPlaceholderDict",
    mm_hashes: Optional["MultiModalHashDict"],
) -> tuple[list[str], list["PlaceholderRange"], Optional[list[str]]]:
    """Merge the placeholder ranges of all modalities, sorted by offset.

    Given a MultiModalPlaceholderDict, merge all PlaceholderRange objects
    from all available modalities into a single list of PlaceholderRange,
    sorted by their offset (starting index in the input sequence) in
    ascending order.

    Optionally, if a MultiModalHashDict is given, the same ordering is
    applied to its hashes. Hashes are returned only when ``mm_hashes`` is
    non-empty and has an entry for every modality in ``mm_positions``;
    otherwise ``None`` is returned instead of inventing values for the
    missing items.

    Args:
        mm_positions: Placeholder ranges per modality; must be non-empty.
            Ranges within one modality are assumed to be sorted already.
        mm_hashes: Optional hashes per modality, aligned index-by-index
            with the ranges in ``mm_positions``.

    Returns:
        list[str]: List of item modalities in order of their positions in
            the input sequence.
        list[PlaceholderRange]: Sorted list of all PlaceholderRanges from
            mm_positions.
        Optional[list[str]]: Sorted list of all hashes from mm_hashes if
            available for every modality, None otherwise.
    """
    modalities = list(mm_positions.keys())

    assert len(modalities) > 0, "No modalities found in the mm_positions."

    # Use the hashes only if every modality has them. Filling the gaps with
    # stand-ins (previously the literal string "None") would give unrelated
    # items identical hashes, which hash-keyed caches would treat as equal.
    hashes = mm_hashes if mm_hashes and all(
        modality in mm_hashes for modality in modalities) else None

    # For single modality, placeholder ranges and hashes are already sorted
    # so we can return the lists directly.
    if len(modalities) == 1:
        modality = modalities[0]
        placeholder_list = list(mm_positions[modality])
        hash_list = None if hashes is None else list(hashes[modality])

        return [modality] * len(placeholder_list), placeholder_list, hash_list

    # Create a list of (modality, placeholder, hash) tuples for all
    # placeholders. NOTE(review): zip() stops at the shorter list, so a
    # modality with fewer hashes than placeholders drops placeholders.
    all_items = []
    for modality in modalities:
        placeholder_list = list(mm_positions[modality])
        hash_list: list[Optional[str]] = (list(hashes[modality])
                                          if hashes is not None else
                                          [None] * len(placeholder_list))

        for placeholder, hash_value in zip(placeholder_list, hash_list):
            all_items.append((modality, placeholder, hash_value))

    # Sort all items by offset (stable, so ties keep modality order).
    all_items.sort(key=lambda item: item[1]["offset"])

    # Split into separate lists
    sorted_modalities = [item[0] for item in all_items]
    merged_placeholders = [item[1] for item in all_items]
    merged_hashes = ([str(item[2]) for item in all_items]
                     if hashes is not None else None)

    return sorted_modalities, merged_placeholders, merged_hashes


def group_mm_inputs_by_modality(
        mm_inputs: list["MultiModalKwargs"]) -> list[list["MultiModalKwargs"]]:
    """Group consecutive MultiModalKwargs that share a modality.

    Runs of adjacent inputs with the same single modality are batched into
    one inner list. An input that carries multiple modalities always forms
    a list of its own.

    Args:
        mm_inputs: List of MultiModalKwargs.

    Returns:
        list[list[MultiModalKwargs]]: List of list of MultiModalKwargs, each
        inner list contains consecutive MultiModalKwargs with same modality.
    """
    if not mm_inputs:
        return []

    def _group_key(mm_input: "MultiModalKwargs") -> Union[str, int]:
        modalities = mm_input.modalities
        count = len(modalities)

        if count == 1:
            # Single-modality input: group by the modality name itself.
            return next(iter(modalities))

        if count == 0:
            # FIXME(Isotr0py): Modality of mm_input from legacy pipeline is
            # empty; this keeps InternVL with the legacy pipeline working
            # with v1.
            return ""

        # Mixed-modality input: key on object identity so it never merges
        # with a neighbouring input.
        return id(mm_input)

    grouped: list[list["MultiModalKwargs"]] = []
    for _, run in groupby(mm_inputs, key=_group_key):
        grouped.append(list(run))
    return grouped