Unverified Commit 116880a5 authored by Cyrus Leung's avatar Cyrus Leung Committed by GitHub
Browse files

[Bugfix] Make MM batching more robust (#33817)


Signed-off-by: default avatarDarkLight1337 <tlleungac@connect.ust.hk>
parent 4145e50d
......@@ -863,10 +863,11 @@ steps:
torch_nightly: true
source_file_dependencies:
- vllm/
- tests/models/test_terratorch.py
- tests/models/test_transformers.py
- tests/models/test_registry.py
commands:
- pytest -v -s models/test_transformers.py models/test_registry.py
- pytest -v -s models/test_terratorch.py models/test_transformers.py models/test_registry.py
- label: Basic Models Test (Other CPU) # 5min
mirror_hardwares: [amdexperimental, amdproduction]
......
......@@ -804,10 +804,11 @@ steps:
torch_nightly: true
source_file_dependencies:
- vllm/
- tests/models/test_terratorch.py
- tests/models/test_transformers.py
- tests/models/test_registry.py
commands:
- pytest -v -s models/test_transformers.py models/test_registry.py
- pytest -v -s models/test_terratorch.py models/test_transformers.py models/test_registry.py
- label: Basic Models Test (Other CPU) # 5min
timeout_in_minutes: 10
......
......@@ -33,10 +33,11 @@ steps:
timeout_in_minutes: 45
source_file_dependencies:
- vllm/
- tests/models/test_terratorch.py
- tests/models/test_transformers.py
- tests/models/test_registry.py
commands:
- pytest -v -s models/test_transformers.py models/test_registry.py
- pytest -v -s models/test_terratorch.py models/test_transformers.py models/test_registry.py
- label: Basic Models Test (Other CPU) # 5min
depends_on:
......
......@@ -5,8 +5,10 @@ import pytest
import torch
from tests.conftest import VllmRunner
from tests.utils import create_new_process_for_each_test
@create_new_process_for_each_test() # Memory is not cleaned up properly otherwise
@pytest.mark.parametrize(
"model",
[
......@@ -22,10 +24,14 @@ def test_inference(
location_coords = torch.full((1, 2), 1.0, dtype=torch.float16)
prompt = dict(
prompt_token_ids=[1],
multi_modal_data=dict(
pixel_values=pixel_values, location_coords=location_coords
),
multi_modal_data={
"image": {
"pixel_values": pixel_values,
"location_coords": location_coords,
}
},
)
with vllm_runner(
model,
runner="pooling",
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import base64
import mimetypes
import os
from tempfile import NamedTemporaryFile, TemporaryDirectory
import numpy as np
import pytest
import torch
from PIL import Image, ImageChops
from vllm.multimodal.image import convert_image_mode
from vllm.multimodal.inputs import PlaceholderRange
from vllm.multimodal.media import MediaConnector
# Test different image extensions (JPG/PNG) and formats (gray/RGB/RGBA)
TEST_IMAGE_ASSETS = [
"2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg", # "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
"Grayscale_8bits_palette_sample_image.png", # "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/Grayscale_8bits_palette_sample_image.png",
"1280px-Venn_diagram_rgb.svg.png", # "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/1280px-Venn_diagram_rgb.svg.png",
"RGBA_comp.png", # "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/RGBA_comp.png",
]
TEST_VIDEO_URLS = [
"https://www.bogotobogo.com/python/OpenCV_Python/images/mean_shift_tracking/slow_traffic_small.mp4",
"https://github.com/opencv/opencv/raw/refs/tags/4.12.0/samples/data/vtest.avi",
]
@pytest.fixture(scope="module")
def url_images(local_asset_server) -> dict[str, Image.Image]:
return {
image_url: local_asset_server.get_image_asset(image_url)
for image_url in TEST_IMAGE_ASSETS
}
def get_supported_suffixes() -> tuple[str, ...]:
# We should at least test the file types mentioned in GPT-4 with Vision
OPENAI_SUPPORTED_SUFFIXES = (".png", ".jpeg", ".jpg", ".webp", ".gif")
# Additional file types that are supported by us
EXTRA_SUPPORTED_SUFFIXES = (".bmp", ".tiff")
return OPENAI_SUPPORTED_SUFFIXES + EXTRA_SUPPORTED_SUFFIXES
def _image_equals(a: Image.Image, b: Image.Image) -> bool:
return (np.asarray(a) == np.asarray(convert_image_mode(b, a.mode))).all()
@pytest.mark.asyncio
@pytest.mark.parametrize("image_url", TEST_IMAGE_ASSETS, indirect=True)
async def test_fetch_image_http(image_url: str):
connector = MediaConnector()
image_sync = connector.fetch_image(image_url)
image_async = await connector.fetch_image_async(image_url)
assert _image_equals(image_sync, image_async)
@pytest.mark.asyncio
@pytest.mark.parametrize("raw_image_url", TEST_IMAGE_ASSETS)
@pytest.mark.parametrize("suffix", get_supported_suffixes())
async def test_fetch_image_base64(
url_images: dict[str, Image.Image], raw_image_url: str, suffix: str
):
connector = MediaConnector(
# Domain restriction should not apply to data URLs.
allowed_media_domains=[
"www.bogotobogo.com",
"github.com",
]
)
url_image = url_images[raw_image_url]
try:
mime_type = Image.MIME[Image.registered_extensions()[suffix]]
except KeyError:
try:
mime_type = mimetypes.types_map[suffix]
except KeyError:
pytest.skip("No MIME type")
with NamedTemporaryFile(suffix=suffix) as f:
try:
url_image.save(f.name)
except Exception as e:
if e.args[0] == "cannot write mode RGBA as JPEG":
pytest.skip("Conversion not supported")
raise
base64_image = base64.b64encode(f.read()).decode("utf-8")
data_url = f"data:{mime_type};base64,{base64_image}"
data_image_sync = connector.fetch_image(data_url)
if _image_equals(url_image, Image.open(f)):
assert _image_equals(url_image, data_image_sync)
else:
pass # Lossy format; only check that image can be opened
data_image_async = await connector.fetch_image_async(data_url)
assert _image_equals(data_image_sync, data_image_async)
@pytest.mark.asyncio
@pytest.mark.parametrize("image_url", TEST_IMAGE_ASSETS, indirect=True)
async def test_fetch_image_local_files(image_url: str):
connector = MediaConnector()
with TemporaryDirectory() as temp_dir:
local_connector = MediaConnector(allowed_local_media_path=temp_dir)
origin_image = connector.fetch_image(image_url)
origin_image.save(
os.path.join(temp_dir, os.path.basename(image_url)),
quality=100,
icc_profile=origin_image.info.get("icc_profile"),
)
image_async = await local_connector.fetch_image_async(
f"file://{temp_dir}/{os.path.basename(image_url)}"
)
image_sync = local_connector.fetch_image(
f"file://{temp_dir}/{os.path.basename(image_url)}"
)
# Check that the images are equal
assert not ImageChops.difference(image_sync, image_async).getbbox()
with pytest.raises(ValueError, match="must be a subpath"):
await local_connector.fetch_image_async(
f"file://{temp_dir}/../{os.path.basename(image_url)}"
)
with pytest.raises(RuntimeError, match="Cannot load local files"):
await connector.fetch_image_async(
f"file://{temp_dir}/../{os.path.basename(image_url)}"
)
with pytest.raises(ValueError, match="must be a subpath"):
local_connector.fetch_image(
f"file://{temp_dir}/../{os.path.basename(image_url)}"
)
with pytest.raises(RuntimeError, match="Cannot load local files"):
connector.fetch_image(f"file://{temp_dir}/../{os.path.basename(image_url)}")
@pytest.mark.asyncio
@pytest.mark.parametrize("image_url", [TEST_IMAGE_ASSETS[0]], indirect=True)
async def test_fetch_image_local_files_with_space_in_name(image_url: str):
connector = MediaConnector()
with TemporaryDirectory() as temp_dir:
local_connector = MediaConnector(allowed_local_media_path=temp_dir)
origin_image = connector.fetch_image(image_url)
filename = "file name with space.jpg"
origin_image.save(
os.path.join(temp_dir, filename),
quality=100,
icc_profile=origin_image.info.get("icc_profile"),
)
try:
image_async = await local_connector.fetch_image_async(
f"file://{temp_dir}/{filename}"
)
image_sync = local_connector.fetch_image(f"file://{temp_dir}/{filename}")
except FileNotFoundError as e:
pytest.fail("Failed to fetch image with space in name: {}".format(e))
# Check that the images are equal
assert not ImageChops.difference(image_sync, image_async).getbbox()
@pytest.mark.asyncio
async def test_fetch_image_error_conversion():
connector = MediaConnector()
broken_img = "data:image/png;base64,aGVsbG9fdmxsbV9jb21tdW5pdHkK"
# PIL.UnidentifiedImageError should be converted to ValueError
with pytest.raises(ValueError):
await connector.fetch_image_async(broken_img)
with pytest.raises(ValueError):
connector.fetch_image(broken_img)
@pytest.mark.flaky(reruns=3, reruns_delay=5)
@pytest.mark.asyncio
@pytest.mark.parametrize("video_url", TEST_VIDEO_URLS)
@pytest.mark.parametrize("num_frames", [-1, 32, 1800])
async def test_fetch_video_http(video_url: str, num_frames: int):
connector = MediaConnector(
media_io_kwargs={
"video": {
"num_frames": num_frames,
}
}
)
try:
video_sync, metadata_sync = connector.fetch_video(video_url)
video_async, metadata_async = await connector.fetch_video_async(video_url)
except (TimeoutError, asyncio.TimeoutError) as e:
pytest.skip(f"Timeout fetching video (CI network flakiness): {e}")
assert np.array_equal(video_sync, video_async)
assert metadata_sync == metadata_async
@pytest.mark.asyncio
@pytest.mark.parametrize("video_url", TEST_VIDEO_URLS)
@pytest.mark.parametrize("max_duration", [1, 60, 1800])
@pytest.mark.parametrize("requested_fps", [2, 24])
async def test_fetch_video_http_with_dynamic_loader(
video_url: str,
max_duration: int,
requested_fps: int,
monkeypatch: pytest.MonkeyPatch,
):
with monkeypatch.context() as m:
m.setenv("VLLM_VIDEO_LOADER_BACKEND", "opencv_dynamic")
connector = MediaConnector(
media_io_kwargs={
"video": {
"max_duration": max_duration,
"requested_fps": requested_fps,
}
}
)
video_sync, metadata_sync = connector.fetch_video(video_url)
video_async, metadata_async = await connector.fetch_video_async(video_url)
assert np.array_equal(video_sync, video_async)
assert metadata_sync == metadata_async
assert metadata_sync["video_backend"] == "opencv_dynamic"
@pytest.mark.parametrize(
"is_embed,start_idx,end_idx,expected",
[
(None, 2, 4, (2, 4)),
(
torch.tensor([False, True, False, True, True]),
3,
5,
(1, 3),
),
(
torch.tensor([False, True, False, True, True]),
0,
2,
(0, 1),
),
(
torch.tensor([True, False, True, False]),
2,
2,
(1, 1),
),
],
)
def test_placeholder_range_get_embeds_indices_in_range(
is_embed, start_idx, end_idx, expected
):
length = len(is_embed) if is_embed is not None else 5
pr = PlaceholderRange(offset=0, length=length, is_embed=is_embed)
assert pr.get_embeds_indices_in_range(start_idx, end_idx) == expected
@pytest.mark.parametrize(
"offset,is_embed,expected",
[
(0, None, [(0, 4)]),
(
2,
torch.tensor([False, True, False, True, True]),
[(3, 3), (5, 6)],
),
(0, torch.tensor([True, True, True, True]), [(0, 3)]),
(0, torch.tensor([False, False, False, False]), []),
],
)
def test_placeholder_range_extract_embeds_range(offset, is_embed, expected):
length = len(is_embed) if is_embed is not None else 5
pr = PlaceholderRange(offset=offset, length=length, is_embed=is_embed)
assert pr.extract_embeds_range() == expected
@pytest.mark.asyncio
@pytest.mark.parametrize("video_url", TEST_VIDEO_URLS)
@pytest.mark.parametrize("num_frames", [-1, 32, 1800])
async def test_allowed_media_domains(video_url: str, num_frames: int):
connector = MediaConnector(
media_io_kwargs={
"video": {
"num_frames": num_frames,
}
},
allowed_media_domains=[
"www.bogotobogo.com",
"github.com",
],
)
video_sync, metadata_sync = connector.fetch_video(video_url)
video_async, metadata_async = await connector.fetch_video_async(video_url)
assert np.array_equal(video_sync, video_async)
assert metadata_sync == metadata_async
disallowed_url = "https://upload.wikimedia.org/wikipedia/commons/4/47/PNG_transparency_demonstration_1.png"
with pytest.raises(ValueError):
_, _ = connector.fetch_video(disallowed_url)
with pytest.raises(ValueError):
_, _ = await connector.fetch_video_async(disallowed_url)
......@@ -16,6 +16,22 @@ ASSETS_DIR = Path(__file__).parent / "assets"
assert ASSETS_DIR.exists()
def test_hash_single_item_different_shape():
x1 = torch.zeros(())
x2 = torch.zeros((1,))
hasher = MultiModalHasher
assert hasher.hash_kwargs(x=x1) != hasher.hash_kwargs(x=x2)
def test_hash_key_order_invariant():
x = torch.zeros((5, 10))
y = torch.ones((5, 10))
hasher = MultiModalHasher
assert hasher.hash_kwargs(x=x, y=y) == hasher.hash_kwargs(y=y, x=x)
# NOTE: Images that are the same visually are allowed to have the same hash
@pytest.mark.parametrize("mode_pair", [("1", "L"), ("RGBA", "CMYK")])
def test_hash_collision_image_mode(mode_pair):
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
import torch
from vllm.multimodal.inputs import PlaceholderRange
@pytest.mark.parametrize(
"is_embed,expected",
[
(None, 5),
(torch.tensor([True, True, True, True, True]), 5),
(torch.tensor([False, False, False, False, False]), 0),
(torch.tensor([True, False, True, False, True]), 3),
(torch.tensor([True]), 1),
],
)
def test_placeholder_range_get_num_embeds(is_embed, expected):
length = len(is_embed) if is_embed is not None else 5
pr = PlaceholderRange(offset=0, length=length, is_embed=is_embed)
assert pr.get_num_embeds == expected
@pytest.mark.parametrize(
"is_embed,expected",
[
(None, None),
(
torch.tensor([False, True, False, True, True]),
torch.tensor([0, 1, 1, 2, 3]),
),
(torch.tensor([True, True, True]), torch.tensor([1, 2, 3])),
],
)
def test_placeholder_range_embeds_cumsum(is_embed, expected):
length = len(is_embed) if is_embed is not None else 5
pr = PlaceholderRange(offset=0, length=length, is_embed=is_embed)
if expected is None:
assert pr.embeds_cumsum is None
return
assert torch.equal(pr.embeds_cumsum, expected)
# cached_property should return the same object on repeated access
assert pr.embeds_cumsum is pr.embeds_cumsum
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import base64
import mimetypes
import os
from tempfile import NamedTemporaryFile, TemporaryDirectory
import numpy as np
import pytest
import torch
from PIL import Image, ImageChops
from vllm.multimodal.image import convert_image_mode
from vllm.multimodal.inputs import PlaceholderRange
from vllm.multimodal.media import MediaConnector
from vllm.multimodal.utils import argsort_mm_positions
# Test different image extensions (JPG/PNG) and formats (gray/RGB/RGBA)
TEST_IMAGE_ASSETS = [
"2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg", # "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
"Grayscale_8bits_palette_sample_image.png", # "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/Grayscale_8bits_palette_sample_image.png",
"1280px-Venn_diagram_rgb.svg.png", # "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/1280px-Venn_diagram_rgb.svg.png",
"RGBA_comp.png", # "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/RGBA_comp.png",
]
TEST_VIDEO_URLS = [
"https://www.bogotobogo.com/python/OpenCV_Python/images/mean_shift_tracking/slow_traffic_small.mp4",
"https://github.com/opencv/opencv/raw/refs/tags/4.12.0/samples/data/vtest.avi",
]
@pytest.fixture(scope="module")
def url_images(local_asset_server) -> dict[str, Image.Image]:
return {
image_url: local_asset_server.get_image_asset(image_url)
for image_url in TEST_IMAGE_ASSETS
}
def get_supported_suffixes() -> tuple[str, ...]:
# We should at least test the file types mentioned in GPT-4 with Vision
OPENAI_SUPPORTED_SUFFIXES = (".png", ".jpeg", ".jpg", ".webp", ".gif")
# Additional file types that are supported by us
EXTRA_SUPPORTED_SUFFIXES = (".bmp", ".tiff")
return OPENAI_SUPPORTED_SUFFIXES + EXTRA_SUPPORTED_SUFFIXES
def _image_equals(a: Image.Image, b: Image.Image) -> bool:
return (np.asarray(a) == np.asarray(convert_image_mode(b, a.mode))).all()
@pytest.mark.asyncio
@pytest.mark.parametrize("image_url", TEST_IMAGE_ASSETS, indirect=True)
async def test_fetch_image_http(image_url: str):
connector = MediaConnector()
image_sync = connector.fetch_image(image_url)
image_async = await connector.fetch_image_async(image_url)
assert _image_equals(image_sync, image_async)
@pytest.mark.asyncio
@pytest.mark.parametrize("raw_image_url", TEST_IMAGE_ASSETS)
@pytest.mark.parametrize("suffix", get_supported_suffixes())
async def test_fetch_image_base64(
url_images: dict[str, Image.Image], raw_image_url: str, suffix: str
):
connector = MediaConnector(
# Domain restriction should not apply to data URLs.
allowed_media_domains=[
"www.bogotobogo.com",
"github.com",
]
)
url_image = url_images[raw_image_url]
try:
mime_type = Image.MIME[Image.registered_extensions()[suffix]]
except KeyError:
try:
mime_type = mimetypes.types_map[suffix]
except KeyError:
pytest.skip("No MIME type")
with NamedTemporaryFile(suffix=suffix) as f:
try:
url_image.save(f.name)
except Exception as e:
if e.args[0] == "cannot write mode RGBA as JPEG":
pytest.skip("Conversion not supported")
raise
base64_image = base64.b64encode(f.read()).decode("utf-8")
data_url = f"data:{mime_type};base64,{base64_image}"
data_image_sync = connector.fetch_image(data_url)
if _image_equals(url_image, Image.open(f)):
assert _image_equals(url_image, data_image_sync)
else:
pass # Lossy format; only check that image can be opened
data_image_async = await connector.fetch_image_async(data_url)
assert _image_equals(data_image_sync, data_image_async)
@pytest.mark.asyncio
@pytest.mark.parametrize("image_url", TEST_IMAGE_ASSETS, indirect=True)
async def test_fetch_image_local_files(image_url: str):
connector = MediaConnector()
with TemporaryDirectory() as temp_dir:
local_connector = MediaConnector(allowed_local_media_path=temp_dir)
origin_image = connector.fetch_image(image_url)
origin_image.save(
os.path.join(temp_dir, os.path.basename(image_url)),
quality=100,
icc_profile=origin_image.info.get("icc_profile"),
)
image_async = await local_connector.fetch_image_async(
f"file://{temp_dir}/{os.path.basename(image_url)}"
)
image_sync = local_connector.fetch_image(
f"file://{temp_dir}/{os.path.basename(image_url)}"
)
# Check that the images are equal
assert not ImageChops.difference(image_sync, image_async).getbbox()
with pytest.raises(ValueError, match="must be a subpath"):
await local_connector.fetch_image_async(
f"file://{temp_dir}/../{os.path.basename(image_url)}"
)
with pytest.raises(RuntimeError, match="Cannot load local files"):
await connector.fetch_image_async(
f"file://{temp_dir}/../{os.path.basename(image_url)}"
)
with pytest.raises(ValueError, match="must be a subpath"):
local_connector.fetch_image(
f"file://{temp_dir}/../{os.path.basename(image_url)}"
)
with pytest.raises(RuntimeError, match="Cannot load local files"):
connector.fetch_image(f"file://{temp_dir}/../{os.path.basename(image_url)}")
@pytest.mark.asyncio
@pytest.mark.parametrize("image_url", [TEST_IMAGE_ASSETS[0]], indirect=True)
async def test_fetch_image_local_files_with_space_in_name(image_url: str):
connector = MediaConnector()
with TemporaryDirectory() as temp_dir:
local_connector = MediaConnector(allowed_local_media_path=temp_dir)
origin_image = connector.fetch_image(image_url)
filename = "file name with space.jpg"
origin_image.save(
os.path.join(temp_dir, filename),
quality=100,
icc_profile=origin_image.info.get("icc_profile"),
)
try:
image_async = await local_connector.fetch_image_async(
f"file://{temp_dir}/{filename}"
)
image_sync = local_connector.fetch_image(f"file://{temp_dir}/{filename}")
except FileNotFoundError as e:
pytest.fail("Failed to fetch image with space in name: {}".format(e))
# Check that the images are equal
assert not ImageChops.difference(image_sync, image_async).getbbox()
@pytest.mark.asyncio
async def test_fetch_image_error_conversion():
connector = MediaConnector()
broken_img = "data:image/png;base64,aGVsbG9fdmxsbV9jb21tdW5pdHkK"
# PIL.UnidentifiedImageError should be converted to ValueError
with pytest.raises(ValueError):
await connector.fetch_image_async(broken_img)
with pytest.raises(ValueError):
connector.fetch_image(broken_img)
@pytest.mark.flaky(reruns=3, reruns_delay=5)
@pytest.mark.asyncio
@pytest.mark.parametrize("video_url", TEST_VIDEO_URLS)
@pytest.mark.parametrize("num_frames", [-1, 32, 1800])
async def test_fetch_video_http(video_url: str, num_frames: int):
connector = MediaConnector(
media_io_kwargs={
"video": {
"num_frames": num_frames,
}
}
)
try:
video_sync, metadata_sync = connector.fetch_video(video_url)
video_async, metadata_async = await connector.fetch_video_async(video_url)
except (TimeoutError, asyncio.TimeoutError) as e:
pytest.skip(f"Timeout fetching video (CI network flakiness): {e}")
assert np.array_equal(video_sync, video_async)
assert metadata_sync == metadata_async
@pytest.mark.asyncio
@pytest.mark.parametrize("video_url", TEST_VIDEO_URLS)
@pytest.mark.parametrize("max_duration", [1, 60, 1800])
@pytest.mark.parametrize("requested_fps", [2, 24])
async def test_fetch_video_http_with_dynamic_loader(
video_url: str,
max_duration: int,
requested_fps: int,
monkeypatch: pytest.MonkeyPatch,
):
with monkeypatch.context() as m:
m.setenv("VLLM_VIDEO_LOADER_BACKEND", "opencv_dynamic")
connector = MediaConnector(
media_io_kwargs={
"video": {
"max_duration": max_duration,
"requested_fps": requested_fps,
}
}
)
video_sync, metadata_sync = connector.fetch_video(video_url)
video_async, metadata_async = await connector.fetch_video_async(video_url)
assert np.array_equal(video_sync, video_async)
assert metadata_sync == metadata_async
assert metadata_sync["video_backend"] == "opencv_dynamic"
from vllm.multimodal.inputs import (
MultiModalBatchedField,
MultiModalFieldElem,
MultiModalKwargsItem,
MultiModalSharedField,
PlaceholderRange,
)
from vllm.multimodal.utils import argsort_mm_positions, group_and_batch_mm_items
@pytest.mark.parametrize(
......@@ -412,121 +184,35 @@ def test_argsort_mm_positions(case):
assert modality_idxs == expected_modality_idxs
@pytest.mark.parametrize(
"is_embed,expected",
[
(None, 5),
(torch.tensor([True, True, True, True, True]), 5),
(torch.tensor([False, False, False, False, False]), 0),
(torch.tensor([True, False, True, False, True]), 3),
(torch.tensor([True]), 1),
],
)
def test_placeholder_range_get_num_embeds(is_embed, expected):
length = len(is_embed) if is_embed is not None else 5
pr = PlaceholderRange(offset=0, length=length, is_embed=is_embed)
assert pr.get_num_embeds == expected
@pytest.mark.parametrize(
"is_embed,expected",
[
(None, None),
(
torch.tensor([False, True, False, True, True]),
torch.tensor([0, 1, 1, 2, 3]),
),
(torch.tensor([True, True, True]), torch.tensor([1, 2, 3])),
],
)
def test_placeholder_range_embeds_cumsum(is_embed, expected):
length = len(is_embed) if is_embed is not None else 5
pr = PlaceholderRange(offset=0, length=length, is_embed=is_embed)
if expected is None:
assert pr.embeds_cumsum is None
return
assert torch.equal(pr.embeds_cumsum, expected)
# cached_property should return the same object on repeated access
assert pr.embeds_cumsum is pr.embeds_cumsum
@pytest.mark.parametrize(
"is_embed,start_idx,end_idx,expected",
[
(None, 2, 4, (2, 4)),
(
torch.tensor([False, True, False, True, True]),
3,
5,
(1, 3),
),
(
torch.tensor([False, True, False, True, True]),
0,
2,
(0, 1),
),
(
torch.tensor([True, False, True, False]),
2,
2,
(1, 1),
),
],
)
def test_placeholder_range_get_embeds_indices_in_range(
is_embed, start_idx, end_idx, expected
):
length = len(is_embed) if is_embed is not None else 5
pr = PlaceholderRange(offset=0, length=length, is_embed=is_embed)
assert pr.get_embeds_indices_in_range(start_idx, end_idx) == expected
@pytest.mark.parametrize(
"offset,is_embed,expected",
[
(0, None, [(0, 4)]),
(
2,
torch.tensor([False, True, False, True, True]),
[(3, 3), (5, 6)],
),
(0, torch.tensor([True, True, True, True]), [(0, 3)]),
(0, torch.tensor([False, False, False, False]), []),
],
)
def test_placeholder_range_extract_embeds_range(offset, is_embed, expected):
length = len(is_embed) if is_embed is not None else 5
pr = PlaceholderRange(offset=offset, length=length, is_embed=is_embed)
assert pr.extract_embeds_range() == expected
@pytest.mark.asyncio
@pytest.mark.parametrize("video_url", TEST_VIDEO_URLS)
@pytest.mark.parametrize("num_frames", [-1, 32, 1800])
async def test_allowed_media_domains(video_url: str, num_frames: int):
connector = MediaConnector(
media_io_kwargs={
"video": {
"num_frames": num_frames,
}
},
allowed_media_domains=[
"www.bogotobogo.com",
"github.com",
],
def test_group_and_batch_mm_items_split_by_fieldset():
elem = MultiModalFieldElem(
data=torch.empty(1, dtype=torch.uint8),
field=MultiModalBatchedField(),
)
item1 = MultiModalKwargsItem({"x": elem, "y": elem})
item2 = MultiModalKwargsItem({"y": elem, "x": elem})
item3 = MultiModalKwargsItem({"x": elem, "y": elem, "z": elem})
item4 = MultiModalKwargsItem({"x": elem})
item5 = MultiModalKwargsItem({"x": elem, "y": elem})
video_sync, metadata_sync = connector.fetch_video(video_url)
video_async, metadata_async = await connector.fetch_video_async(video_url)
assert np.array_equal(video_sync, video_async)
assert metadata_sync == metadata_async
res = group_and_batch_mm_items([item1, item2, item3, item4, item5])
assert [num_items for num_items, _ in res] == [2, 1, 1, 1]
disallowed_url = "https://upload.wikimedia.org/wikipedia/commons/4/47/PNG_transparency_demonstration_1.png"
with pytest.raises(ValueError):
_, _ = connector.fetch_video(disallowed_url)
with pytest.raises(ValueError):
_, _ = await connector.fetch_video_async(disallowed_url)
def test_group_and_batch_mm_items_split_by_shared_data():
elem1 = MultiModalFieldElem(
data=torch.zeros(1, dtype=torch.uint8),
field=MultiModalSharedField(batch_size=1),
)
elem2 = MultiModalFieldElem(
data=torch.zeros(2, dtype=torch.uint8),
field=MultiModalSharedField(batch_size=1),
)
item1 = MultiModalKwargsItem({"x": elem1})
item2 = MultiModalKwargsItem({"x": elem1})
item3 = MultiModalKwargsItem({"x": elem2})
item4 = MultiModalKwargsItem({"x": elem1})
item5 = MultiModalKwargsItem({"x": elem2})
res = group_and_batch_mm_items([item1, item2, item3, item4, item5])
assert [num_items for num_items, _ in res] == [2, 1, 1, 1]
......@@ -71,9 +71,7 @@ class Step3VLImagePixelInputs(TensorSchema):
type: Literal["pixel_values"]
pixel_values: Annotated[torch.Tensor, TensorShape("bn", 3, "h", "w")]
patch_pixel_values: Annotated[
torch.Tensor | None, TensorShape("bnp", 3, "hp", "wp")
]
patch_pixel_values: Annotated[torch.Tensor, TensorShape("bnp", 3, "hp", "wp")]
num_patches: Annotated[torch.Tensor, TensorShape("bn")]
......@@ -91,7 +89,7 @@ class Step3VLImageEmbeddingInputs(TensorSchema):
Step3VLImageInputs: TypeAlias = Step3VLImagePixelInputs | Step3VLImageEmbeddingInputs
ImageWithPatches = tuple[Image.Image, list[Image.Image], list[int] | None]
ImageWithPatches = tuple[Image.Image, list[Image.Image], list[bool] | None]
MAX_IMAGE_SIZE: int = 3024
......@@ -432,7 +430,7 @@ class Step3VLProcessor:
if len(parts) - 1 != len(repls):
raise ValueError(
"The number of placeholders does not match the number of replacements." # noqa: E501
"The number of placeholders does not match the number of replacements."
)
result = [parts[0]]
......@@ -468,7 +466,7 @@ class Step3VLProcessor:
image_repl_str_lst = []
image_repl_ids_lst = []
num_patches = []
for raw_img, img_patches, patch_newline_mask in splitted_images_data: # noqa: E501
for raw_img, img_patches, patch_newline_mask in splitted_images_data:
pixel_values_lst.extend(self._convert_images_to_pixel_values([raw_img]))
if len(img_patches) > 0:
......@@ -486,16 +484,20 @@ class Step3VLProcessor:
if patch_newline_mask is not None:
patch_newline_mask_lst.extend(patch_newline_mask)
pixel_values = torch.cat(pixel_values_lst)
patch_size = self.patch_size
image_inputs = {
"pixel_values": torch.cat(pixel_values_lst),
"pixel_values": pixel_values,
"num_patches": num_patches,
}
if patch_pixel_values_lst:
image_inputs["patch_pixel_values"] = torch.cat(patch_pixel_values_lst)
if patch_newline_mask_lst:
image_inputs["patch_newline_mask"] = torch.tensor(
"patch_pixel_values": (
torch.cat(patch_pixel_values_lst)
if patch_pixel_values_lst
else pixel_values.new_empty((0, 3, patch_size, patch_size))
),
"patch_newline_mask": torch.tensor(
patch_newline_mask_lst, dtype=torch.bool
)
),
}
text = [
self.replace_placeholder(t, self.image_token, image_repl_str_lst)
......@@ -998,13 +1000,11 @@ class Step3VLForConditionalGeneration(nn.Module, SupportsMultiModal, SupportsPP)
if pixel_values is None and image_embeds is None:
return None
if pixel_values is not None:
if pixel_values is not None and patch_pixel_values is not None:
return Step3VLImagePixelInputs(
type="pixel_values",
pixel_values=pixel_values.to(self.dtype),
patch_pixel_values=patch_pixel_values.to(self.dtype)
if patch_pixel_values is not None
else None,
patch_pixel_values=patch_pixel_values.to(self.dtype),
num_patches=num_patches,
)
......@@ -1039,7 +1039,7 @@ class Step3VLForConditionalGeneration(nn.Module, SupportsMultiModal, SupportsPP)
image_features = self._get_vision_model_output(image_input["pixel_values"])
patch_image_features = (
self._get_vision_model_output(image_input["patch_pixel_values"])
if image_input["patch_pixel_values"] is not None
if len(image_input["patch_pixel_values"]) > 0
else None
)
num_patches = image_input["num_patches"]
......
......@@ -62,7 +62,6 @@ from vllm.multimodal.processing import (
PromptUpdate,
)
from vllm.sequence import IntermediateTensors
from vllm.utils import length_from_prompt_token_ids_or_embeds
from .interfaces import IsAttentionFree, MultiModalEmbeddings, SupportsMultiModal
from .interfaces_base import attn_type
......@@ -74,7 +73,11 @@ def _terratorch_field_names(input_definition: InputDefinition):
return set(input_definition.data.keys())
def _terratorch_field_factory(input_definition: InputDefinition):
def _terratorch_field_factory(
input_definition: InputDefinition,
*,
is_shared: bool = True, # True for unprocessed data, False for processed data
):
def _terratorch_field_config(
hf_inputs: Mapping[str, torch.Tensor],
) -> Mapping[str, MultiModalFieldConfig]:
......@@ -82,7 +85,11 @@ def _terratorch_field_factory(input_definition: InputDefinition):
for name, input in input_definition.data.items():
modality = "image"
if input.type == InputTypeEnum.tensor:
fields[name] = MultiModalFieldConfig.shared(modality, batch_size=1)
fields[name] = (
MultiModalFieldConfig.shared(modality, batch_size=1)
if is_shared
else MultiModalFieldConfig.batched(modality)
)
return fields
......@@ -166,8 +173,14 @@ class TerratorchMultiModalProcessor(BaseMultiModalProcessor[TerratorchProcessing
self,
hf_inputs: BatchFeature,
hf_processor_mm_kwargs: Mapping[str, object],
*,
is_shared: bool = True,
) -> Mapping[str, MultiModalFieldConfig]:
return _terratorch_field_factory(self.info.input_definition)(hf_inputs)
factory = _terratorch_field_factory(
self.info.input_definition,
is_shared=is_shared,
)
return factory(hf_inputs)
def _get_prompt_updates(
self,
......@@ -193,12 +206,19 @@ class TerratorchMultiModalProcessor(BaseMultiModalProcessor[TerratorchProcessing
)
_, passthrough_data = self._get_hf_mm_data(mm_items)
mm_processed_data = BatchFeature(dict(passthrough_data), tensor_type="pt")
mm_processed_data = BatchFeature(
{k: torch.tensor(v).unsqueeze(0) for k, v in passthrough_data.items()},
tensor_type="pt",
)
mm_placeholders = {"image": [PlaceholderRange(offset=0, length=0)]}
mm_kwargs = MultiModalKwargsItems.from_hf_inputs(
mm_processed_data,
self._get_mm_fields_config(mm_processed_data, hf_processor_mm_kwargs),
self._get_mm_fields_config(
mm_processed_data,
hf_processor_mm_kwargs,
is_shared=False,
),
)
return MultiModalInputs(
......@@ -235,9 +255,6 @@ class Terratorch(nn.Module, IsAttentionFree, SupportsMultiModal):
self.inference_runner = InferenceRunner(config)
self.model = self.inference_runner.model
pooler_config = vllm_config.model_config.pooler_config
assert pooler_config is not None
self.pooler = IdentityPooler()
def embed_input_ids(
......@@ -262,15 +279,8 @@ class Terratorch(nn.Module, IsAttentionFree, SupportsMultiModal):
inputs_embeds: torch.Tensor | None = None,
**kwargs: object,
):
input_len = length_from_prompt_token_ids_or_embeds(input_ids, inputs_embeds)
batched_kwargs = {k: v.unsqueeze(0) for k, v in kwargs.items()}
model_output = self.inference_runner.forward(**batched_kwargs).output
# The leading dimension of hidden states needs to equal input length
return model_output.expand(
input_len, *(-1 for _ in range(model_output.ndim - 1))
)
model_output = self.inference_runner.forward(**kwargs)
return model_output.output
def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]:
params_list = []
......
......@@ -102,12 +102,19 @@ class MultiModalHasher:
"data": tensor_obj.numpy(),
},
)
return cls.iter_item_to_bytes("tensor", tensor_obj.numpy())
if isinstance(obj, np.ndarray):
if obj.ndim == 0:
arr_data = obj.item()
elif obj.flags.c_contiguous:
# Not valid for 0-D arrays
arr_data = obj.view(np.uint8).data
else:
# If the array is non-contiguous, we need to copy it first
arr_data = (
obj.view(np.uint8).data if obj.flags.c_contiguous else obj.tobytes()
)
arr_data = obj.tobytes()
return cls.iter_item_to_bytes(
"ndarray",
{
......@@ -116,6 +123,7 @@ class MultiModalHasher:
"data": arr_data,
},
)
logger.warning(
"No serialization method found for %s. Falling back to pickle.", type(obj)
)
......@@ -147,7 +155,7 @@ class MultiModalHasher:
hasher_factory = _get_hasher_factory(envs.VLLM_MM_HASHER_ALGORITHM)
hasher = hasher_factory()
for k, v in kwargs.items():
for k, v in sorted(kwargs.items(), key=lambda kv: kv[0]):
for bytes_ in cls.iter_item_to_bytes(k, v):
hasher.update(bytes_)
......
......@@ -424,8 +424,9 @@ class BaseMultiModalField(ABC):
keep_on_cpu: bool = False
"""
If `True`, then this field is excluded from being moved to the accelerator
when `MultiModalKwargsItems.get_data()` is called to batch the data.
If `True`, then this field is excluded from being moved to the accelerator when
[`group_and_batch_mm_items`][vllm.multimodal.utils.group_and_batch_mm_items]
is called to batch the data.
"""
def _field_factory(self):
......@@ -1006,27 +1007,38 @@ class MultiModalKwargsItems(UserDict[str, Sequence[_I]]):
pin_memory: bool = False,
) -> BatchedTensorInputs:
"""Construct a dictionary of keyword arguments to pass to the model."""
elems_by_key = defaultdict[str, list[MultiModalFieldElem]](list)
for modality, items in self.items():
for i, item in enumerate(items):
if item is None:
raise RuntimeError(
f"Cannot build data from empty mm_items[{modality}][{i}]"
)
for key, elem in item.items():
elems_by_key[key].append(elem)
data = {
key: elems[0].field.reduce_data(
elems,
from .utils import group_and_batch_mm_items
items_by_modality = self.require_data()
batches_by_modality = {
modality: [
data
for _, data in group_and_batch_mm_items(
items,
device=device,
pin_memory=pin_memory,
)
for key, elems in elems_by_key.items()
]
for modality, items in items_by_modality.items()
if len(items) > 0
}
out_data: BatchedTensorInputs = {}
for _, batches in batches_by_modality.items():
if len(batches) != 1:
num_batches_by_modality = {
modality: len(batches)
for modality, batches in batches_by_modality.items()
}
return data
raise RuntimeError(
f"Some modalities cannot be merged into a single batch "
f"({num_batches_by_modality=})"
)
out_data.update(batches[0])
return out_data
MultiModalKwargsOptionalItems: TypeAlias = (
......
......@@ -3,7 +3,8 @@
import mimetypes
import warnings
from collections.abc import Generator
from collections import defaultdict
from collections.abc import Generator, Sequence
from itertools import groupby
from typing import TYPE_CHECKING, Any
......@@ -13,11 +14,13 @@ from PIL import Image
from vllm.utils.import_utils import LazyLoader
from .hasher import MultiModalHasher
from .inputs import (
BatchedTensorInputs,
MultiModalFieldElem,
MultiModalKwargsItem,
MultiModalKwargsItems,
MultiModalPlaceholderDict,
MultiModalSharedField,
)
from .media import AudioMediaIO, ImageMediaIO, MediaConnector, VideoMediaIO
......@@ -146,32 +149,119 @@ def argsort_mm_positions(
return [(modality, idx) for modality, idx, _ in sorted_flat_items]
def _get_group_hash(elem: MultiModalFieldElem):
if not isinstance(elem.field, MultiModalSharedField):
return None
return MultiModalHasher.hash_kwargs(data=elem.data)
def _batch_mm_items(
items: Sequence[MultiModalKwargsItem],
*,
device: torch.types.Device = None,
pin_memory: bool = False,
):
elems = defaultdict[str, list[MultiModalFieldElem]](list)
for item in items:
for key, elem in item.items():
elems[key].append(elem)
return {
key: elems[0].field.reduce_data(
elems,
device=device,
pin_memory=pin_memory,
)
for key, elems in elems.items()
}
def group_and_batch_mm_items(
items: Sequence[MultiModalKwargsItem],
*,
device: torch.types.Device = None,
pin_memory: bool = False,
) -> Generator[tuple[int, BatchedTensorInputs]]:
"""
Group consecutive items (possibly from different requests) into batches.
Items must be split across groups if any of the following occurs,
as the batch would otherwise be invalid:
- They have different fields (e.g. mixed image and embedding inputs).
- They have different values in `MultiModalSharedField`.
Args:
items: List of `MultiModalKwargsItem`.
device: The device to place the grouped tensors on.
pin_memory: Whether to pin memory for faster host-to-device transfer.
Yields:
A tuple `(num_items, grouped_kwargs)`, where:
- `kwargs` is a dictionary of keyword arguments to pass to the model;
- `num_items` is the corresponding number of items.
"""
group_ids = [
tuple(
(key, _get_group_hash(elem))
for key, elem in sorted(item.items(), key=lambda kv: kv[0])
)
for item in items
]
group_sizes = [sum(1 for _ in group) for _, group in groupby(group_ids)]
start_idx = 0
for group_size in group_sizes:
group_data = _batch_mm_items(
items[start_idx : start_idx + group_size],
device=device,
pin_memory=pin_memory,
)
yield group_size, group_data
start_idx += group_size
assert start_idx == len(items)
def group_mm_kwargs_by_modality(
mm_kwargs: list[tuple[str, MultiModalKwargsItem]],
*,
device: torch.types.Device = None,
pin_memory: bool = False,
) -> Generator[tuple[str, int, BatchedTensorInputs], None, None]:
"""Group consecutive `MultiModalKwargsItem`s from `mm_kwargs` with the same
modality together into the same `MultiModalKwargs` instance.
"""
Group consecutive items (possibly from different requests) into batches.
Items must be split across groups if any of the following occurs,
as the batch would otherwise be invalid:
- They have different fields (e.g. mixed image and embedding inputs).
- They have different values in `MultiModalSharedField`.
To simplify the implementation of `embed_multimodal`, we add another
restriction that the items in a batch must belong to the same modality.
Args:
mm_kwargs: List of `MultiModalKwargsItem`.
mm_kwargs: List of `(modality, item)`.
device: The device to place the grouped tensors on.
pin_memory: Whether to pin memory for faster host-to-device transfer.
Yields:
A tuple `(modality, num_items, grouped_kwargs)`.
A tuple `(modality, num_items, grouped_kwargs)`, where:
- `modality` is the modality of the batch;
- `kwargs` is a dictionary of keyword arguments to pass to the model;
- `num_items` is the corresponding number of items.
"""
for modality, group in groupby(mm_kwargs, key=lambda x: x[0]):
items_lst = [item for _, item in group]
mm_kwargs_items = MultiModalKwargsItems({modality: items_lst})
mm_kwargs_data = mm_kwargs_items.get_data(
for num_items, mm_kwargs_batch in group_and_batch_mm_items(
items_lst,
device=device,
pin_memory=pin_memory,
)
yield modality, len(items_lst), mm_kwargs_data
):
yield modality, num_items, mm_kwargs_batch
def fetch_audio(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment