Unverified Commit 6f2c71be authored by Jaseel Muhammad's avatar Jaseel Muhammad Committed by GitHub
Browse files

[Multimodal] Add PyAV video backend for concurrent video decoding (#39986)


Signed-off-by: default avatarJaseel Muhammad <jaseel.muhammad@mbzuai.ac.ae>
Signed-off-by: default avatarIsotr0py <2037008807@qq.com>
Co-authored-by: default avatarIsotr0py <2037008807@qq.com>
Co-authored-by: default avatarIsotr0py <mozf@mail2.sysu.edu.cn>
parent 2463f00f
......@@ -6,7 +6,7 @@ import pytest
from vllm.assets.video import VideoAsset
from vllm.multimodal import MULTIMODAL_REGISTRY
from vllm.multimodal.inputs import batched_tensors_equal
from vllm.multimodal.video import OpenCVDynamicVideoBackend, OpenCVVideoBackend
from vllm.multimodal.video import DynamicVideoBackend, VideoBackend
from ...utils import build_model_context
......@@ -70,9 +70,11 @@ def test_processor_override(
@pytest.mark.parametrize("model_id", ["zai-org/GLM-4.1V-9B-Thinking"])
@pytest.mark.parametrize("fps", [2])
@pytest.mark.parametrize("backend", ["opencv", "pyav"])
def test_video_loader_consistency(
model_id: str,
fps: int,
backend: str,
):
"""
Ensure dynamic video loader (pre-sampled by loader) and normal video
......@@ -93,9 +95,11 @@ def test_video_loader_consistency(
with open(video_path, "rb") as f:
video_bytes = f.read()
static_video, static_metadata = OpenCVVideoBackend.load_bytes(video_bytes)
dynamic_video, dynamic_metadata = OpenCVDynamicVideoBackend.load_bytes(
video_bytes, fps=fps
static_video, static_metadata = VideoBackend.load_bytes(
video_bytes, backend=backend
)
dynamic_video, dynamic_metadata = DynamicVideoBackend.load_bytes(
video_bytes, fps=fps, backend=backend
)
# pre-sampled loader shouldn't read all frames
......
......@@ -71,7 +71,9 @@ def test_video_backend_handles_broken_frames(monkeypatch: pytest.MonkeyPatch):
video_data = f.read()
loader = VIDEO_LOADER_REGISTRY.load("opencv")
frames, metadata = loader.load_bytes(video_data, num_frames=-1)
frames, metadata = loader.load_bytes(
video_data, num_frames=-1, backend="opencv"
)
# Verify metadata consistency:
# frames_indices must match actual loaded frames
......@@ -158,12 +160,12 @@ def test_video_recovery_simulated_failures(monkeypatch: pytest.MonkeyPatch):
# Test WITHOUT recovery - should have fewer frames due to failures
frames_no_recovery, meta_no = loader.load_bytes(
video_data, num_frames=8, frame_recovery=False
video_data, num_frames=8, frame_recovery=False, backend="opencv"
)
# Test WITH recovery - should recover using next valid frames
frames_with_recovery, meta_yes = loader.load_bytes(
video_data, num_frames=8, frame_recovery=True
video_data, num_frames=8, frame_recovery=True, backend="opencv"
)
# With recovery should have MORE frames than without
......@@ -214,12 +216,12 @@ def test_video_recovery_with_corrupted_file(monkeypatch: pytest.MonkeyPatch):
# Test without recovery - frame 17 will be skipped
frames_no_recovery, meta_no_recovery = loader.load_bytes(
video_data, num_frames=8, frame_recovery=False
video_data, num_frames=8, frame_recovery=False, backend="opencv"
)
# Test with recovery - frame 18 should fill in for frame 17
frames_with_recovery, meta_with_recovery = loader.load_bytes(
video_data, num_frames=8, frame_recovery=True
video_data, num_frames=8, frame_recovery=True, backend="opencv"
)
# Verify metadata consistency for both modes
......@@ -271,12 +273,16 @@ def test_video_recovery_dynamic_backend(monkeypatch: pytest.MonkeyPatch):
# Test without recovery
frames_no_recovery, meta_no = loader.load_bytes(
video_data, fps=2, max_duration=10, frame_recovery=False
video_data,
fps=2,
max_duration=10,
frame_recovery=False,
backend="opencv",
)
# Test with frame_recovery enabled
frames_with_recovery, meta_with = loader.load_bytes(
video_data, fps=2, max_duration=10, frame_recovery=True
video_data, fps=2, max_duration=10, frame_recovery=True, backend="opencv"
)
# Verify basic properties
......@@ -310,27 +316,81 @@ def dummy_video_path(tmp_path):
return video_path
# ============================================================================
# PyAV Backend Tests
# ============================================================================
def test_pyav_backend_loads_frames(dummy_video_path, monkeypatch: pytest.MonkeyPatch):
"""Test that the pyav codec backend can load frames from a valid video."""
with monkeypatch.context() as m:
m.setenv("VLLM_VIDEO_LOADER_BACKEND", "opencv")
with open(dummy_video_path, "rb") as f:
video_data = f.read()
loader = VIDEO_LOADER_REGISTRY.load("opencv")
frames, metadata = loader.load_bytes(video_data, num_frames=8, backend="pyav")
assert frames.ndim == 4
assert frames.shape[3] == 3 # RGB
assert frames.shape[0] == 8
assert frames.shape[0] == len(metadata["frames_indices"])
assert metadata["video_backend"] == "pyav"
assert "total_num_frames" in metadata
assert "fps" in metadata
assert "duration" in metadata
def test_pyav_dynamic_backend_loads_frames(
dummy_video_path, monkeypatch: pytest.MonkeyPatch
):
"""Test that the pyav codec with dynamic sampling can load frames."""
with monkeypatch.context() as m:
m.setenv("VLLM_VIDEO_LOADER_BACKEND", "opencv_dynamic")
with open(dummy_video_path, "rb") as f:
video_data = f.read()
loader = VIDEO_LOADER_REGISTRY.load("opencv_dynamic")
frames, metadata = loader.load_bytes(
video_data, fps=2, max_duration=10, backend="pyav"
)
assert frames.ndim == 4
assert frames.shape[3] == 3 # RGB
assert frames.shape[0] > 0
assert frames.shape[0] == len(metadata["frames_indices"])
assert metadata["video_backend"] == "pyav_dynamic"
@pytest.mark.parametrize(
"backend, kwargs, expected_num_frames",
"loader_key, kwargs, expected_num_frames",
[
# opencv: num_frames directly controls count
pytest.param("opencv", {"num_frames": 32}, 32, id="opencv-num_frames"),
pytest.param("opencv", {"fps": 2}, 120, id="opencv-fps"),
# uniform sampling + opencv codec
pytest.param(
"opencv",
{"num_frames": 500, "fps": 2},
{"num_frames": 32, "backend": "opencv"},
32,
id="opencv-num_frames",
),
pytest.param("opencv", {"fps": 2, "backend": "opencv"}, 120, id="opencv-fps"),
pytest.param(
"opencv",
{"num_frames": 500, "fps": 2, "backend": "opencv"},
120,
id="opencv-num_frames_wins_fps",
),
# dynamic sampling + opencv codec
pytest.param(
"opencv_dynamic",
{"fps": 1, "max_duration": 60},
{"fps": 1, "max_duration": 60, "backend": "opencv"},
60,
id="opencv_dynamic-within_max_duration",
),
pytest.param(
"opencv_dynamic",
{"fps": 2, "max_duration": 30},
{"fps": 2, "max_duration": 30, "backend": "opencv"},
60,
id="opencv_dynamic-exceeds_max_duration",
),
......@@ -349,18 +409,45 @@ def dummy_video_path(tmp_path):
119,
id="molmo2-fps",
),
# uniform sampling + pyav codec (same frame counts as opencv)
pytest.param(
"opencv",
{"num_frames": 32, "backend": "pyav"},
32,
id="pyav-num_frames",
),
pytest.param("opencv", {"fps": 2, "backend": "pyav"}, 120, id="pyav-fps"),
pytest.param(
"opencv",
{"num_frames": 500, "fps": 2, "backend": "pyav"},
120,
id="pyav-num_frames_wins_fps",
),
# dynamic sampling + pyav codec
pytest.param(
"opencv_dynamic",
{"fps": 1, "max_duration": 60, "backend": "pyav"},
60,
id="pyav_dynamic-within_max_duration",
),
pytest.param(
"opencv_dynamic",
{"fps": 2, "max_duration": 30, "backend": "pyav"},
60,
id="pyav_dynamic-exceeds_max_duration",
),
],
)
def test_video_loader_frames_sampling(
dummy_video_path,
monkeypatch: pytest.MonkeyPatch,
backend: str,
loader_key: str,
kwargs: dict,
expected_num_frames: int,
):
"""Test video loader frames sampling functionality."""
monkeypatch.setenv("VLLM_VIDEO_LOADER_BACKEND", backend)
loader = VIDEO_LOADER_REGISTRY.load(backend)
monkeypatch.setenv("VLLM_VIDEO_LOADER_BACKEND", loader_key)
loader = VIDEO_LOADER_REGISTRY.load(loader_key)
with open(dummy_video_path, "rb") as f:
long_video_bytes = f.read()
......
......@@ -829,9 +829,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
"VLLM_MAX_AUDIO_CLIP_FILESIZE_MB": lambda: int(
os.getenv("VLLM_MAX_AUDIO_CLIP_FILESIZE_MB", "25")
),
# Backend for Video IO
# - "opencv": Default backend that uses OpenCV stream buffered backend.
# - "identity": Returns raw video bytes for model processor to handle.
# Backend for Video IO — selects the frame-sampling algorithm.
# - "opencv": uniform sampling.
# - "opencv_dynamic": duration-aware dynamic sampling.
# - "identity": returns raw video bytes for model processor to handle.
#
# Custom backend implementations can be registered
# via `@VIDEO_LOADER_REGISTRY.register("my_custom_video_loader")` and
......
......@@ -3,7 +3,7 @@
import math
from abc import abstractmethod
from io import BytesIO
from typing import Any, NamedTuple, cast
from typing import Any, ClassVar, Literal, NamedTuple, cast
import numpy as np
import numpy.typing as npt
......@@ -19,6 +19,11 @@ except ImportError:
cv2 = PlaceholderModule("cv2")
vr = PlaceholderModule("cv2").placeholder_attr("videoio_registry")
try:
import av
except ImportError:
av = PlaceholderModule("av") # type: ignore[assignment]
logger = init_logger(__name__)
......@@ -355,8 +360,75 @@ class OpenCVVideoBackendMixin:
return frames, valid_frame_indices
class PyAVVideoBackendMixin:
"""PyAV (in-process FFmpeg bindings) codec utilities.
Reads stream metadata and decodes target frames via per-frame
``container.seek()``. The seek releases the GIL between frames and
scales with the number of sampled frames rather than the video
length, enabling concurrent decoding under serving load.
"""
@staticmethod
def get_metadata(
container: "av.container.InputContainer",
) -> VideoSourceMetadata:
if not container.streams.video:
raise ValueError("No video streams found in container")
stream = container.streams.video[0]
total_frames = stream.frames or 0
fps = float(stream.average_rate) if stream.average_rate else 0.0
duration = float(stream.duration * stream.time_base) if stream.duration else 0.0
if total_frames == 0 and duration > 0 and fps > 0:
total_frames = int(duration * fps)
return VideoSourceMetadata(total_frames, fps, duration)
@staticmethod
def decode_frames(
container: "av.container.InputContainer",
frame_indices: list[int],
fps: float,
duration: float,
) -> tuple[npt.NDArray, list[int]]:
"""Decode target frames via per-frame seek + keyframe decode."""
stream = container.streams.video[0]
# SLICE parallelizes within a single frame without the
# one-frame-per-thread latency penalty of FRAME threading.
stream.thread_type = "SLICE"
time_base = stream.time_base
frames_list: list[npt.NDArray] = []
valid_indices: list[int] = []
frame_interval = 1.0 / fps if fps > 0 else 0.1
max_ts = max(0.0, duration - frame_interval) if duration > 0 else float("inf")
for idx in frame_indices:
ts = min(idx / fps, max_ts) if fps > 0 else 0.0
pts = int(ts / time_base)
container.seek(pts, stream=stream)
frame = next(container.decode(video=0), None)
if frame is not None:
frames_list.append(frame.to_ndarray(format="rgb24"))
valid_indices.append(idx)
if not frames_list:
return np.empty((0,), dtype=np.uint8), valid_indices
return np.stack(frames_list), valid_indices
@VIDEO_LOADER_REGISTRY.register("opencv")
class OpenCVVideoBackend(VideoLoader, OpenCVVideoBackendMixin):
class VideoBackend(VideoLoader, OpenCVVideoBackendMixin, PyAVVideoBackendMixin):
"""Uniform-sampling video backend.
Samples ``num_frames`` uniformly across the video (or one frame every
``1/fps`` seconds, whichever produces fewer frames). The decoding codec
is selected via the ``backend`` kwarg (``"opencv"`` or ``"pyav"``),
which can be passed through ``--media-io-kwargs``. Defaults to
``"pyav"`` for concurrent decoding.
"""
_sampling_suffix: ClassVar[str] = ""
@classmethod
def compute_frames_index_to_sample(
cls,
......@@ -366,7 +438,6 @@ class OpenCVVideoBackend(VideoLoader, OpenCVVideoBackendMixin):
) -> list[int]:
total_frames_num = source.total_frames_num
duration = source.duration
num_frames = target.num_frames
fps = target.fps
# resample video to target num_frames and fps
......@@ -376,16 +447,18 @@ class OpenCVVideoBackend(VideoLoader, OpenCVVideoBackendMixin):
num_frames_to_sample = min(num_frames, total_frames_num)
if fps > 0:
num_frames_to_sample = min(num_frames_to_sample, math.floor(duration * fps))
num_frames_to_sample = max(1, num_frames_to_sample) # at least one sample
num_frames_to_sample = max(1, num_frames_to_sample)
if num_frames_to_sample == total_frames_num:
frame_idx = list(range(0, num_frames_to_sample))
else:
uniform_sampled_frames = np.linspace(
return list(range(num_frames_to_sample))
return np.linspace(
0, total_frames_num - 1, num_frames_to_sample, dtype=int
)
frame_idx = uniform_sampled_frames.tolist()
return frame_idx
).tolist()
@classmethod
def _prepare_source(cls, source: VideoSourceMetadata) -> VideoSourceMetadata:
"""Sampling-algorithm-specific metadata adjustment hook."""
return source
@classmethod
def load_bytes(
......@@ -395,55 +468,101 @@ class OpenCVVideoBackend(VideoLoader, OpenCVVideoBackendMixin):
fps: int = -1,
max_duration: int = 300,
frame_recovery: bool = False,
*,
backend: Literal["opencv", "pyav"] = "opencv",
**kwargs,
) -> tuple[npt.NDArray, dict[str, Any]]:
"""
Load video frames from bytes.
"""Load sampled frames from raw video bytes.
Args:
data: Raw video bytes
num_frames: Target number of frames to sample (-1 for all)
fps: Target FPS for sampling (-1 for original)
max_duration: Maximum duration (unused in base backend)
frame_recovery: Enable forward-scan recovery for failed frames
data: Raw video bytes.
num_frames: Target number of frames to sample (``-1`` for all).
fps: Target FPS for sampling (``-1`` for original).
max_duration: Maximum duration in seconds — only used by the
dynamic subclass; ignored here.
frame_recovery: Enable forward-scan recovery for failed frames.
Only honored by the OpenCV codec.
backend: Decoding codec — ``"opencv"`` or ``"pyav"`` .
Returns:
Tuple of (frames_array, metadata_dict)
Tuple of ``(frames_array, metadata_dict)``.
"""
cap = cls.open_video_capture(data)
source = OpenCVVideoBackendMixin.get_video_metadata(cap)
target = VideoTargetMetadata(
num_frames=num_frames,
fps=fps,
max_duration=max_duration,
num_frames=num_frames, fps=fps, max_duration=max_duration
)
# resample video to target num_frames and fps
# - the minimum of the two will be used
if backend == "opencv":
cap = cls.open_video_capture(data)
source = cls._prepare_source(cls.get_video_metadata(cap))
frame_idx = cls.compute_frames_index_to_sample(
source=source,
target=target,
source=source, target=target, **kwargs
)
frames, valid_frame_indices = cls.read_frames(
frames, valid = cls.read_frames(
cap,
frame_idx,
total_frames_num=source.total_frames_num,
frame_recovery=frame_recovery,
)
elif backend == "pyav":
assert not frame_recovery, (
"frame_recovery is only available for `opencv` backend"
)
with av.open(BytesIO(data)) as container:
source = cls._prepare_source(cls.get_metadata(container))
frame_idx = cls.compute_frames_index_to_sample(
source=source, target=target, **kwargs
)
frames, valid = cls.decode_frames(
container, frame_idx, source.original_fps, source.duration
)
else:
raise ValueError(
f"Unknown video codec backend {backend!r}; "
"valid options: 'opencv', 'pyav'."
)
metadata = cls.create_hf_metadata(
source=source,
video_backend="opencv",
valid_frame_indices=valid_frame_indices,
if len(valid) < len(frame_idx):
logger.warning(
"%s video loading: expected %d frames but got %d.",
backend,
len(frame_idx),
len(valid),
)
return frames, metadata
return frames, cls.create_hf_metadata(
source=source,
video_backend=f"{backend}{cls._sampling_suffix}",
valid_frame_indices=valid,
)
@VIDEO_LOADER_REGISTRY.register("opencv_dynamic")
class OpenCVDynamicVideoBackend(VideoLoader, OpenCVVideoBackendMixin):
class DynamicVideoBackend(VideoBackend):
"""Duration-aware dynamic-sampling video backend.
Samples at ``fps`` up to ``max_duration`` seconds, falling back to
uniform sampling across the full duration when the video is longer
than ``max_duration``. Codec is selectable the same way as
:class:`VideoBackend`.
"""
_sampling_suffix: ClassVar[str] = "_dynamic"
@classmethod
def _prepare_source(cls, source: VideoSourceMetadata) -> VideoSourceMetadata:
# Estimate duration from frame count and fps when the container
# does not report it (common for WebM/streaming inputs).
if source.duration:
return source
if source.original_fps > 0:
max_frame_idx = source.total_frames_num - 1
duration = round(max_frame_idx / source.original_fps) + 1
else:
duration = 0
return VideoSourceMetadata(
source.total_frames_num, source.original_fps, duration
)
@classmethod
def compute_frames_index_to_sample(
cls,
......@@ -456,8 +575,8 @@ class OpenCVDynamicVideoBackend(VideoLoader, OpenCVVideoBackendMixin):
original_fps = source.original_fps
max_duration = target.max_duration
fps = target.fps
max_frame_idx = source.total_frames_num - 1
# Refer to:
# https://github.com/huggingface/transformers/blob/v4.55.4/src/transformers/models/glm4v/video_processing_glm4v.py#L103-L140
frame_indices_list: list[int]
......@@ -491,62 +610,20 @@ class OpenCVDynamicVideoBackend(VideoLoader, OpenCVVideoBackendMixin):
fps: int = 2,
max_duration: int = 300,
frame_recovery: bool = False,
*,
backend: Literal["opencv", "pyav"] = "opencv",
**kwargs,
) -> tuple[npt.NDArray, dict[str, Any]]:
"""
Load video frames with dynamic sampling based on duration.
Args:
data: Raw video bytes
num_frames: Not used in dynamic backend
fps: Target FPS for sampling (default: 2)
max_duration: Maximum video duration to process (default: 300s)
frame_recovery: Enable forward-scan recovery for failed frames
Returns:
Tuple of (frames_array, metadata_dict)
"""
cap = cls.open_video_capture(data)
orig_source = OpenCVVideoBackendMixin.get_video_metadata(cap)
max_frame_idx = orig_source.total_frames_num - 1
duration = (
orig_source.duration or round(max_frame_idx / orig_source.original_fps) + 1
)
# recompute source metadata with adjusted duration to ensure correct
# sampling indices computation
source = VideoSourceMetadata(
total_frames_num=orig_source.total_frames_num,
original_fps=orig_source.original_fps,
duration=duration,
)
target = VideoTargetMetadata(
return super().load_bytes(
data,
num_frames=num_frames,
fps=fps,
max_duration=max_duration,
)
frame_indices_list = cls.compute_frames_index_to_sample(
source=source,
target=target,
)
frames, valid_frame_indices = cls.read_frames(
cap,
frame_indices_list,
total_frames_num=source.total_frames_num,
frame_recovery=frame_recovery,
backend=backend,
**kwargs,
)
metadata = cls.create_hf_metadata(
source=source,
video_backend="opencv_dynamic",
valid_frame_indices=valid_frame_indices,
)
return frames, metadata
@VIDEO_LOADER_REGISTRY.register("molmo2")
class Molmo2VideoBackend(VideoLoader, OpenCVVideoBackendMixin):
......@@ -835,7 +912,7 @@ class Molmo2VideoBackend(VideoLoader, OpenCVVideoBackendMixin):
@VIDEO_LOADER_REGISTRY.register("nemotron_vl")
class NemotronVLVideoBackend(OpenCVVideoBackend):
class NemotronVLVideoBackend(VideoBackend):
@classmethod
def load_bytes(
cls,
......@@ -844,14 +921,17 @@ class NemotronVLVideoBackend(OpenCVVideoBackend):
fps: int = -1,
max_duration: int = 300,
frame_recovery: bool = False,
*,
backend: Literal["opencv", "pyav"] = "opencv",
**kwargs,
) -> tuple[npt.NDArray, dict[str, Any]]:
frames, metadata = OpenCVVideoBackend.load_bytes(
frames, metadata = super().load_bytes(
data,
num_frames=num_frames,
fps=fps,
max_duration=max_duration,
frame_recovery=frame_recovery,
backend=backend,
**kwargs,
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment