Unverified Commit c770600f authored by jiqing-feng's avatar jiqing-feng Committed by GitHub
Browse files

TVP model (#25856)

* tvp model for video grounding

add tokenizer auto

fix param in TVPProcessor

add docs

clear comments and enable different torch dtype

add image processor test and model test and fix code style

* fix conflict

* fix model doc

* fix image processing tests

* fix tvp tests

* remove torch in processor

* fix grammar error

* add more details on tvp.md

* fix model arch for loss, grammar, and processor

* add docstring and do not regard TvpTransformer, TvpVisionModel as individual model

* use pad_image

* update copyright

* control first downsample stride

* reduce first only works for ResNetBottleNeckLayer

* fix param name

* fix style

* add testing

* fix style

* rm init_weight

* fix style

* add post init

* fix comments

* do not test TvpTransformer

* fix warning

* fix style

* fix example

* fix config map

* add link in config

* fix comments

* fix style

* rm useless param

* change attention

* change test

* add notes

* fix comments

* fix tvp

* import checkpointing

* fix gradient checkpointing

* Use a more accurate example in readme

* update

* fix copy

* fix style

* update readme

* delete print

* remove tvp test_forward_signature

* remove TvpTransformer

* fix test init model

* merge main and make style

* fix tests and others

* fix image processor

* fix style and model_input_names

* fix tests
parent f5c9738f
# coding=utf-8
# Copyright 2023 The Intel AIA Team Authors, and HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License=, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing=, software
# distributed under the License is distributed on an "AS IS" BASIS=,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND=, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Image processor class for TVP."""
from typing import Dict, Iterable, List, Optional, Tuple, Union
import numpy as np
from ...image_processing_utils import BaseImageProcessor, BatchFeature, get_size_dict
from ...image_transforms import (
PaddingMode,
flip_channel_order,
pad,
resize,
to_channel_dimension_format,
)
from ...image_utils import (
IMAGENET_STANDARD_MEAN,
IMAGENET_STANDARD_STD,
ChannelDimension,
ImageInput,
PILImageResampling,
get_image_size,
is_valid_image,
to_numpy_array,
valid_images,
)
from ...utils import TensorType, is_vision_available, logging
if is_vision_available():
import PIL
logger = logging.get_logger(__name__)
# Copied from transformers.models.vivit.image_processing_vivit.make_batched
def make_batched(videos) -> List[List[ImageInput]]:
if isinstance(videos, (list, tuple)) and isinstance(videos[0], (list, tuple)) and is_valid_image(videos[0][0]):
return videos
elif isinstance(videos, (list, tuple)) and is_valid_image(videos[0]):
return [videos]
elif is_valid_image(videos):
return [[videos]]
raise ValueError(f"Could not make batched video from {videos}")
def get_resize_output_image_size(
input_image: np.ndarray,
max_size: int = 448,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
) -> Tuple[int, int]:
height, width = get_image_size(input_image, input_data_format)
if height >= width:
ratio = width * 1.0 / height
new_height = max_size
new_width = new_height * ratio
else:
ratio = height * 1.0 / width
new_width = max_size
new_height = new_width * ratio
size = (int(new_height), int(new_width))
return size
class TvpImageProcessor(BaseImageProcessor):
r"""
Constructs a Tvp image processor.
Args:
do_resize (`bool`, *optional*, defaults to `True`):
Whether to resize the image's (height, width) dimensions to the specified `size`. Can be overridden by the
`do_resize` parameter in the `preprocess` method.
size (`Dict[str, int]` *optional*, defaults to `{"longest_edge": 448}`):
Size of the output image after resizing. The longest edge of the image will be resized to
`size["longest_edge"]` while maintaining the aspect ratio of the original image. Can be overriden by
`size` in the `preprocess` method.
resample (`PILImageResampling`, *optional*, defaults to `Resampling.BILINEAR`):
Resampling filter to use if resizing the image. Can be overridden by the `resample` parameter in the
`preprocess` method.
do_center_crop (`bool`, *optional*, defaults to `True`):
Whether to center crop the image to the specified `crop_size`. Can be overridden by the `do_center_crop`
parameter in the `preprocess` method.
crop_size (`Dict[str, int]`, *optional*, defaults to `{"height": 448, "width": 448}`):
Size of the image after applying the center crop. Can be overridden by the `crop_size` parameter in the
`preprocess` method.
do_rescale (`bool`, *optional*, defaults to `True`):
Whether to rescale the image by the specified scale `rescale_factor`. Can be overridden by the `do_rescale`
parameter in the `preprocess` method.
rescale_factor (`int` or `float`, *optional*, defaults to `1/255`):
Defines the scale factor to use if rescaling the image. Can be overridden by the `rescale_factor` parameter
in the `preprocess` method.
do_pad (`bool`, *optional*, defaults to `True`):
Whether to pad the image. Can be overridden by the `do_pad` parameter in the `preprocess` method.
pad_size (`Dict[str, int]`, *optional*, defaults to `{"height": 448, "width": 448}`):
Size of the image after applying the padding. Can be overridden by the `pad_size` parameter in the
`preprocess` method.
constant_values (`Union[float, Iterable[float]]`, *optional*, defaults to 0):
The fill value to use when padding the image.
pad_mode (`PaddingMode`, *optional*, defaults to `PaddingMode.CONSTANT`):
Use what kind of mode in padding.
do_normalize (`bool`, *optional*, defaults to `True`):
Whether to normalize the image. Can be overridden by the `do_normalize` parameter in the `preprocess`
method.
do_flip_channel_order (`bool`, *optional*, defaults to `True`):
Whether to flip the color channels from RGB to BGR. Can be overridden by the `do_flip_channel_order`
parameter in the `preprocess` method.
image_mean (`float` or `List[float]`, *optional*, defaults to `IMAGENET_STANDARD_MEAN`):
Mean to use if normalizing the image. This is a float or list of floats the length of the number of
channels in the image. Can be overridden by the `image_mean` parameter in the `preprocess` method.
image_std (`float` or `List[float]`, *optional*, defaults to `IMAGENET_STANDARD_STD`):
Standard deviation to use if normalizing the image. This is a float or list of floats the length of the
number of channels in the image. Can be overridden by the `image_std` parameter in the `preprocess` method.
"""
model_input_names = ["pixel_values"]
def __init__(
self,
do_resize: bool = True,
size: Dict[str, int] = None,
resample: PILImageResampling = PILImageResampling.BILINEAR,
do_center_crop: bool = True,
crop_size: Dict[str, int] = None,
do_rescale: bool = True,
rescale_factor: Union[int, float] = 1 / 255,
do_pad: bool = True,
pad_size: Dict[str, int] = None,
constant_values: Union[float, Iterable[float]] = 0,
pad_mode: PaddingMode = PaddingMode.CONSTANT,
do_normalize: bool = True,
do_flip_channel_order: bool = True,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
size = size if size is not None else {"longest_edge": 448}
crop_size = crop_size if crop_size is not None else {"height": 448, "width": 448}
pad_size = pad_size if pad_size is not None else {"height": 448, "width": 448}
self.do_resize = do_resize
self.size = size
self.do_center_crop = do_center_crop
self.crop_size = crop_size
self.resample = resample
self.do_rescale = do_rescale
self.rescale_factor = rescale_factor
self.do_pad = do_pad
self.pad_size = pad_size
self.constant_values = constant_values
self.pad_mode = pad_mode
self.do_normalize = do_normalize
self.do_flip_channel_order = do_flip_channel_order
self.image_mean = image_mean if image_mean is not None else IMAGENET_STANDARD_MEAN
self.image_std = image_std if image_std is not None else IMAGENET_STANDARD_STD
def resize(
self,
image: np.ndarray,
size: Dict[str, int],
resample: PILImageResampling = PILImageResampling.BILINEAR,
data_format: Optional[Union[str, ChannelDimension]] = None,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
**kwargs,
) -> np.ndarray:
"""
Resize an image.
Args:
image (`np.ndarray`):
Image to resize.
size (`Dict[str, int]`):
Size of the output image. If `size` is of the form `{"height": h, "width": w}`, the output image will
have the size `(h, w)`. If `size` is of the form `{"longest_edge": s}`, the output image will have its
longest edge of length `s` while keeping the aspect ratio of the original image.
resample (`PILImageResampling`, *optional*, defaults to `PILImageResampling.BILINEAR`):
Resampling filter to use when resiizing the image.
data_format (`str` or `ChannelDimension`, *optional*):
The channel dimension format of the image. If not provided, it will be the same as the input image.
input_data_format (`str` or `ChannelDimension`, *optional*):
The channel dimension format of the input image. If not provided, it will be inferred.
"""
size = get_size_dict(size, default_to_square=False)
if "height" in size and "width" in size:
output_size = (size["height"], size["width"])
elif "longest_edge" in size:
output_size = get_resize_output_image_size(image, size["longest_edge"], input_data_format)
else:
raise ValueError(f"Size must have 'height' and 'width' or 'longest_edge' as keys. Got {size.keys()}")
return resize(
image,
size=output_size,
resample=resample,
data_format=data_format,
input_data_format=input_data_format,
**kwargs,
)
def pad_image(
self,
image: np.ndarray,
pad_size: Dict[str, int] = None,
constant_values: Union[float, Iterable[float]] = 0,
pad_mode: PaddingMode = PaddingMode.CONSTANT,
data_format: Optional[Union[str, ChannelDimension]] = None,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
**kwargs,
):
"""
Pad an image with zeros to the given size.
Args:
image (`np.ndarray`):
Image to pad.
pad_size (`Dict[str, int]`)
Size of the output image with pad.
constant_values (`Union[float, Iterable[float]]`)
The fill value to use when padding the image.
pad_mode (`PaddingMode`)
The pad mode, default to PaddingMode.CONSTANT
data_format (`ChannelDimension` or `str`, *optional*)
The channel dimension format of the image. If not provided, it will be the same as the input image.
input_data_format (`ChannelDimension` or `str`, *optional*):
The channel dimension format of the input image. If not provided, it will be inferred.
"""
height, width = get_image_size(image, channel_dim=input_data_format)
max_height = pad_size.get("height", height)
max_width = pad_size.get("width", width)
pad_right, pad_bottom = max_width - width, max_height - height
if pad_right < 0 or pad_bottom < 0:
raise ValueError("The padding size must be greater than image size")
padding = ((0, pad_bottom), (0, pad_right))
padded_image = pad(
image,
padding,
mode=pad_mode,
constant_values=constant_values,
data_format=data_format,
input_data_format=input_data_format,
)
return padded_image
def _preprocess_image(
self,
image: ImageInput,
do_resize: bool = None,
size: Dict[str, int] = None,
resample: PILImageResampling = None,
do_center_crop: bool = None,
crop_size: Dict[str, int] = None,
do_rescale: bool = None,
rescale_factor: float = None,
do_pad: bool = True,
pad_size: Dict[str, int] = None,
constant_values: Union[float, Iterable[float]] = None,
pad_mode: PaddingMode = None,
do_normalize: bool = None,
do_flip_channel_order: bool = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
**kwargs,
) -> np.ndarray:
"""Preprocesses a single image."""
if do_resize and size is None or resample is None:
raise ValueError("Size and resample must be specified if do_resize is True.")
if do_center_crop and crop_size is None:
raise ValueError("Crop size must be specified if do_center_crop is True.")
if do_rescale and rescale_factor is None:
raise ValueError("Rescale factor must be specified if do_rescale is True.")
if do_pad and pad_size is None:
raise ValueError("Padding size must be specified if do_pad is True.")
if do_normalize and (image_mean is None or image_std is None):
raise ValueError("Image mean and std must be specified if do_normalize is True.")
# All transformations expect numpy arrays.
image = to_numpy_array(image)
if do_resize:
image = self.resize(image=image, size=size, resample=resample, input_data_format=input_data_format)
if do_center_crop:
image = self.center_crop(image, size=crop_size, input_data_format=input_data_format)
if do_rescale:
image = self.rescale(image=image, scale=rescale_factor, input_data_format=input_data_format)
if do_normalize:
image = self.normalize(
image=image.astype(np.float32), mean=image_mean, std=image_std, input_data_format=input_data_format
)
if do_pad:
image = self.pad_image(
image=image,
pad_size=pad_size,
constant_values=constant_values,
pad_mode=pad_mode,
input_data_format=input_data_format,
)
# the pretrained checkpoints assume images are BGR, not RGB
if do_flip_channel_order:
image = flip_channel_order(image=image, input_data_format=input_data_format)
image = to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format)
return image
def preprocess(
self,
videos: Union[ImageInput, List[ImageInput], List[List[ImageInput]]],
do_resize: bool = None,
size: Dict[str, int] = None,
resample: PILImageResampling = None,
do_center_crop: bool = None,
crop_size: Dict[str, int] = None,
do_rescale: bool = None,
rescale_factor: float = None,
do_pad: bool = None,
pad_size: Dict[str, int] = None,
constant_values: Union[float, Iterable[float]] = None,
pad_mode: PaddingMode = None,
do_normalize: bool = None,
do_flip_channel_order: bool = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
return_tensors: Optional[Union[str, TensorType]] = None,
data_format: ChannelDimension = ChannelDimension.FIRST,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
**kwargs,
) -> PIL.Image.Image:
"""
Preprocess an image or batch of images.
Args:
videos (`ImageInput` or `List[ImageInput]` or `List[List[ImageInput]]`):
Frames to preprocess.
do_resize (`bool`, *optional*, defaults to `self.do_resize`):
Whether to resize the image.
size (`Dict[str, int]`, *optional*, defaults to `self.size`):
Size of the image after applying resize.
resample (`PILImageResampling`, *optional*, defaults to `self.resample`):
Resampling filter to use if resizing the image. This can be one of the enum `PILImageResampling`, Only
has an effect if `do_resize` is set to `True`.
do_center_crop (`bool`, *optional*, defaults to `self.do_centre_crop`):
Whether to centre crop the image.
crop_size (`Dict[str, int]`, *optional*, defaults to `self.crop_size`):
Size of the image after applying the centre crop.
do_rescale (`bool`, *optional*, defaults to `self.do_rescale`):
Whether to rescale the image values between [0 - 1].
rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`):
Rescale factor to rescale the image by if `do_rescale` is set to `True`.
do_pad (`bool`, *optional*, defaults to `True`):
Whether to pad the image. Can be overridden by the `do_pad` parameter in the `preprocess` method.
pad_size (`Dict[str, int]`, *optional*, defaults to `{"height": 448, "width": 448}`):
Size of the image after applying the padding. Can be overridden by the `pad_size` parameter in the
`preprocess` method.
constant_values (`Union[float, Iterable[float]]`, *optional*, defaults to 0):
The fill value to use when padding the image.
pad_mode (`PaddingMode`, *optional*, defaults to "PaddingMode.CONSTANT"):
Use what kind of mode in padding.
do_normalize (`bool`, *optional*, defaults to `self.do_normalize`):
Whether to normalize the image.
do_flip_channel_order (`bool`, *optional*, defaults to `self.do_flip_channel_order`):
Whether to flip the channel order of the image.
image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`):
Image mean.
image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`):
Image standard deviation.
return_tensors (`str` or `TensorType`, *optional*):
The type of tensors to return. Can be one of:
- Unset: Return a list of `np.ndarray`.
- `TensorType.TENSORFLOW` or `'tf'`: Return a batch of type `tf.Tensor`.
- `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`.
- `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`.
- `TensorType.JAX` or `'jax'`: Return a batch of type `jax.numpy.ndarray`.
data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`):
The channel dimension format for the output image. Can be one of:
- `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
- `ChannelDimension.LAST`: image in (height, width, num_channels) format.
- Unset: Use the inferred channel dimension format of the input image.
input_data_format (`ChannelDimension` or `str`, *optional*):
The channel dimension format for the input image. If unset, the channel dimension format is inferred
from the input image. Can be one of:
- `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
- `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
- `"none"` or `ChannelDimension.NONE`: image in (height, width) format.
"""
do_resize = do_resize if do_resize is not None else self.do_resize
resample = resample if resample is not None else self.resample
do_center_crop = do_center_crop if do_center_crop is not None else self.do_center_crop
do_rescale = do_rescale if do_rescale is not None else self.do_rescale
rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor
do_pad = do_pad if do_pad is not None else self.do_pad
pad_size = pad_size if pad_size is not None else self.pad_size
constant_values = constant_values if constant_values is not None else self.constant_values
pad_mode = pad_mode if pad_mode else self.pad_mode
do_normalize = do_normalize if do_normalize is not None else self.do_normalize
do_flip_channel_order = (
do_flip_channel_order if do_flip_channel_order is not None else self.do_flip_channel_order
)
image_mean = image_mean if image_mean is not None else self.image_mean
image_std = image_std if image_std is not None else self.image_std
size = size if size is not None else self.size
size = get_size_dict(size, default_to_square=False)
crop_size = crop_size if crop_size is not None else self.crop_size
crop_size = get_size_dict(crop_size, param_name="crop_size")
if not valid_images(videos):
raise ValueError(
"Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
"torch.Tensor, tf.Tensor or jax.ndarray."
)
videos = make_batched(videos)
videos = [
np.array(
[
self._preprocess_image(
image=img,
do_resize=do_resize,
size=size,
resample=resample,
do_center_crop=do_center_crop,
crop_size=crop_size,
do_rescale=do_rescale,
rescale_factor=rescale_factor,
do_pad=do_pad,
pad_size=pad_size,
constant_values=constant_values,
pad_mode=pad_mode,
do_normalize=do_normalize,
do_flip_channel_order=do_flip_channel_order,
image_mean=image_mean,
image_std=image_std,
data_format=data_format,
input_data_format=input_data_format,
)
for img in video
]
)
for video in videos
]
data = {"pixel_values": videos}
return BatchFeature(data=data, tensor_type=return_tensors)
# coding=utf-8
# Copyright 2023 The Intel AIA Team Authors, and HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License=, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing=, software
# distributed under the License is distributed on an "AS IS" BASIS=,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND=, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""PyTorch TVP Model"""
import math
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
import torch.utils.checkpoint
from torch import nn
from ...activations import ACT2FN
from ...file_utils import add_start_docstrings, add_start_docstrings_to_model_forward, replace_return_docstrings
from ...modeling_outputs import BaseModelOutput, BaseModelOutputWithPooling, ModelOutput
from ...modeling_utils import PreTrainedModel
from ...pytorch_utils import prune_linear_layer
from ...utils import logging
from ..auto import AutoBackbone
from .configuration_tvp import TvpConfig
logger = logging.get_logger(__name__)
TVP_PRETRAINED_MODEL_ARCHIVE_LIST = [
"Intel/tvp-base",
"Intel/tvp-base-ANet",
# See all Tvp models at https://huggingface.co/models?filter=tvp
]
@dataclass
class TvpVideoGroundingOutput(ModelOutput):
"""
Args:
loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `return_loss` is `True`):
Temporal-Distance IoU loss for video grounding.
logits (`torch.FloatTensor` of shape `(batch_size, 2)`):
Contains start_time/duration and end_time/duration. It is the time slot of the videos corresponding to the
input texts.
hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of
the model at the output of each layer plus the optional initial embedding outputs.
attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
sequence_length)`.
"""
loss: Optional[torch.FloatTensor] = None
logits: torch.FloatTensor = None
hidden_states: Optional[Tuple[torch.FloatTensor]] = None
attentions: Optional[Tuple[torch.FloatTensor]] = None
class TvpLoss(nn.Module):
"""
This class computes the losses for `TvpForVideoGrounding`. The process happens in two steps: 1) we compute
hungarian assignment between ground truth boxes and the outputs of the model 2) we supervise each pair of matched
ground-truth / prediction (supervise class and box).
Args:
losses (`List[str]`):
List of all the losses to be applied.
"""
def __init__(self, losses):
super().__init__()
self.loss_map = {
"iou": self.loss_iou,
"distance": self.loss_distance,
"duration": self.loss_duration,
}
for loss in losses:
if loss not in self.loss_map:
raise ValueError(f"Loss {loss} not supported")
self.losses = losses
def loss_iou(self, start_time, end_time, candidates_start_time, candidates_end_time, duration):
"""
Measure the intersection over union.
"""
inter = torch.min(candidates_end_time, end_time) - torch.max(candidates_start_time, start_time)
union = torch.max(candidates_end_time, end_time) - torch.min(candidates_start_time, start_time)
iou = 1 - inter.clamp(min=0) / union
return iou
def loss_distance(self, start_time, end_time, candidates_start_time, candidates_end_time, duration):
"""
Measure the distance of mid points.
"""
mid_candidates = torch.div(torch.add(candidates_start_time, candidates_end_time), 2.0)
mid_groundtruth = torch.div(torch.add(start_time, end_time), 2.0)
distance_diff = torch.div(
torch.max(mid_candidates, mid_groundtruth) - torch.min(mid_candidates, mid_groundtruth), duration
).clamp(min=0.2)
return distance_diff
def loss_duration(self, start_time, end_time, candidates_start_time, candidates_end_time, duration):
"""
Measure the difference of duration.
"""
duration_candidates = torch.sub(candidates_end_time, candidates_start_time)
duration_groundtruth = torch.sub(end_time, start_time)
duration_diff = torch.square(torch.div(torch.sub(duration_candidates, duration_groundtruth), duration))
duration_diff = duration_diff.clamp(min=0.4)
return duration_diff
def forward(self, logits, labels):
"""
This performs the loss computation.
Args:
logits (`torch.FloatTensor`):
The output logits of head module.
labels (`List[torch.FloatTensor]`):
List of tensors ([start, end, duration]), which contains start time, end time of the video corresponding to the text, and also the duration.
"""
duration, start_time, end_time = labels
candidates = torch.mul(logits, duration)
candidates_start_time, candidates_end_time = candidates[:, 0].float(), candidates[:, 1].float()
losses_dict = {}
for loss in self.losses:
losses_dict.update(
{loss: self.loss_map[loss](start_time, end_time, candidates_start_time, candidates_end_time, duration)}
)
return losses_dict
class TvpVisionModel(nn.Module):
def __init__(self, config):
super().__init__()
self.backbone = AutoBackbone.from_config(config.backbone_config)
self.grid_encoder_conv = nn.Conv2d(
config.backbone_config.hidden_sizes[-1],
config.hidden_size,
kernel_size=3,
stride=1,
padding=1,
groups=1,
bias=False,
)
def forward(self, pixel_values):
batch_size, num_frames, num_channels, height, width = pixel_values.shape
# (batch_size * num_frames, num_channels, height, width)
pixel_values = pixel_values.view(batch_size * num_frames, num_channels, height, width)
grid_feat_outputs = self.backbone(pixel_values)["feature_maps"][0]
grid = self.grid_encoder_conv(grid_feat_outputs)
grid = nn.functional.max_pool2d(grid, kernel_size=2, stride=2)
grid = nn.functional.relu(grid, inplace=True)
new_channel, new_height, new_width = grid.shape[-3:]
# (batch_size, num_frames, num_channels, height, width)
grid = grid.view(batch_size, num_frames, new_channel, new_height, new_width)
# (batch_size, num_frames, height, width, num_channels)
grid = grid.permute(0, 1, 3, 4, 2)
return grid
class TvpVisualInputEmbedding(nn.Module):
"""
Takes input of both image and video (multi-frame)
"""
def __init__(self, config):
super().__init__()
# sequence embedding
self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
self.row_position_embeddings = nn.Embedding(config.max_grid_row_position_embeddings, config.hidden_size)
self.col_position_embeddings = nn.Embedding(config.max_grid_col_position_embeddings, config.hidden_size)
self.token_type_embeddings = nn.Embedding(1, config.hidden_size)
self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
def add_2d_positional_embeddings(self, grid):
"""
Args:
grid: (batch_size, height, width, hidden_dim)
Returns:
grid + col_position_embeddings.view(*col_shape): (batch_size, *, height, width, hidden_dim)
"""
batch_size, height, width, hidden_dim = grid.shape
# add row-wise position embeddings
row_position_ids = torch.arange(height, dtype=torch.long, device=grid.device) # (height, )
row_position_embeddings = self.row_position_embeddings(row_position_ids) # (height, hidden_dim)
row_shape = (1,) * (len(grid.shape) - 3) + (height, 1, hidden_dim) # (1, height, 1, hidden_dim)
grid = grid + row_position_embeddings.view(*row_shape) # broadcast automatically
# add column-wise position embeddings
col_position_ids = torch.arange(width, dtype=torch.long, device=grid.device) # (width, )
col_position_embeddings = self.col_position_embeddings(col_position_ids) # (width, hidden_dim)
col_shape = (batch_size, 1, width, hidden_dim) # (1, 1, width, hidden_dim)
return grid + col_position_embeddings.view(*col_shape) # broadcast automatically
def forward(self, grid):
"""
Args:
grid: Array of shape (batch_size, num_frames, height, width, num_channels).
It contains processed frames extracted from videos, and is generated by Tvp image preprocessor. Note,
num_frames can be 1
Returns:
embeddings: The embedding of grid with size (batch_size, height*width, num_channels)
"""
batch_size, num_frames, height, width, num_channels = grid.shape
# temporal mean pooling, (batch_size, height, width, hidden_size)
grid = grid.mean(1)
grid = self.add_2d_positional_embeddings(grid)
# image token sequence, (batch_size, height*width, num_channels)
visual_tokens = grid.view(batch_size, -1, num_channels)
visual_tokens_shape = visual_tokens.shape[:-1]
device = visual_tokens.device
# image token type embeddings.
token_type_ids = torch.zeros(visual_tokens_shape, dtype=torch.long, device=device)
token_type_embeddings = self.token_type_embeddings(token_type_ids)
embeddings = visual_tokens + token_type_embeddings
embeddings = self.layer_norm(embeddings)
embeddings = self.dropout(embeddings)
return embeddings
class TvpTextInputEmbeddings(nn.Module):
"""Construct the embeddings from word, position and token_type embeddings."""
def __init__(self, config):
super().__init__()
self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id)
self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size)
self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
def forward(self, input_ids=None, token_type_ids=None, position_ids=None, inputs_embeds=None):
if input_ids is not None:
input_shape = input_ids.size()
else:
input_shape = inputs_embeds.size()[:-1]
seq_length = input_shape[1]
device = input_ids.device if input_ids is not None else inputs_embeds.device
if position_ids is None:
position_ids = torch.arange(seq_length, dtype=torch.long, device=device)
position_ids = position_ids.unsqueeze(0).expand(input_shape)
if token_type_ids is None:
token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=device)
if inputs_embeds is None:
inputs_embeds = self.word_embeddings(input_ids)
position_embeddings = self.position_embeddings(position_ids)
token_type_embeddings = self.token_type_embeddings(token_type_ids)
embeddings = inputs_embeds + position_embeddings + token_type_embeddings
embeddings = self.layer_norm(embeddings)
embeddings = self.dropout(embeddings)
return embeddings
class TvpAttention(nn.Module):
def __init__(self, config):
super().__init__()
if config.hidden_size % config.num_attention_heads != 0 and not hasattr(config, "embedding_size"):
raise ValueError(
f"The hidden size {config.hidden_size} is not a multiple of the number of attention heads {config.num_attention_heads}"
)
self.num_attention_heads = config.num_attention_heads
self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
self.all_head_size = self.num_attention_heads * self.attention_head_size
self.query = nn.Linear(config.hidden_size, self.all_head_size)
self.key = nn.Linear(config.hidden_size, self.all_head_size)
self.value = nn.Linear(config.hidden_size, self.all_head_size)
self.attn_dropout = nn.Dropout(config.attention_probs_dropout_prob)
self.dense = nn.Linear(config.hidden_size, config.hidden_size)
self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
self.pruned_heads = set()
def prune_heads(self, heads):
if len(heads) == 0:
return
mask = torch.ones(self.num_attention_heads, self.attention_head_size)
heads = set(heads) - self.pruned_heads # Convert to set and remove already pruned heads
for head in heads:
# Compute how many pruned heads are before the head and move the index accordingly
head = head - sum(1 if h < head else 0 for h in self.pruned_heads)
mask[head] = 0
mask = mask.view(-1).contiguous().eq(1)
index = torch.arange(len(mask))[mask].long()
# Prune linear layers
self.query = prune_linear_layer(self.query, index)
self.key = prune_linear_layer(self.key, index)
self.value = prune_linear_layer(self.value, index)
self.dense = prune_linear_layer(self.dense, index, dim=1)
# Update hyper params and store pruned heads
self.num_attention_heads = self.num_attention_heads - len(heads)
self.all_head_size = self.attention_head_size * self.num_attention_heads
self.pruned_heads = self.pruned_heads.union(heads)
def _reshape(self, tensor: torch.Tensor, sequence_length: int, batch_size: int):
return (
tensor.view(batch_size, sequence_length, self.num_attention_heads, self.attention_head_size)
.transpose(1, 2)
.contiguous()
)
def forward(
self,
hidden_states,
attention_mask=None,
head_mask=None,
output_attentions: Optional[bool] = None,
):
batch_size, sequence_length = hidden_states.shape[:2]
mixed_query_layer = self.query(hidden_states)
mixed_key_layer = self.key(hidden_states)
mixed_value_layer = self.value(hidden_states)
query_layer = self._reshape(mixed_query_layer, sequence_length, batch_size)
key_layer = self._reshape(mixed_key_layer, sequence_length, batch_size)
value_layer = self._reshape(mixed_value_layer, sequence_length, batch_size)
# Take the dot product between "query" and "key" to get the raw attention scores.
attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
attention_scores = attention_scores / math.sqrt(self.attention_head_size)
if attention_mask is not None:
attention_scores = attention_scores + attention_mask
# Normalize the attention scores to probabilities.
attention_probs = nn.functional.softmax(attention_scores, dim=-1)
# This is actually dropping out entire tokens to attend to, which might
# seem a bit unusual, but is taken from the original Transformer paper.
attention_probs = self.attn_dropout(attention_probs)
# Mask heads if we want to
if head_mask is not None:
attention_probs = attention_probs * head_mask
attn_output = torch.matmul(attention_probs, value_layer)
attn_output = attn_output.transpose(1, 2).contiguous()
attn_output = attn_output.reshape(batch_size, sequence_length, self.all_head_size)
attn_output = self.dense(attn_output)
attn_output = self.dropout(attn_output)
attn_output = self.layer_norm(attn_output + hidden_states)
# add attentions if we output them
outputs = (attn_output, attention_probs) if output_attentions else (attn_output,)
return outputs
# Copied from transformers.models.bert.modeling_bert.BertIntermediate with Bert->Tvp
class TvpIntermediate(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
if isinstance(config.hidden_act, str):
self.intermediate_act_fn = ACT2FN[config.hidden_act]
else:
self.intermediate_act_fn = config.hidden_act
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
hidden_states = self.dense(hidden_states)
hidden_states = self.intermediate_act_fn(hidden_states)
return hidden_states
class TvpOutputLayer(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
hidden_states = self.dense(hidden_states)
hidden_states = self.dropout(hidden_states)
hidden_states = self.layer_norm(hidden_states + input_tensor)
return hidden_states
class TvpEncodeLayer(nn.Module):
def __init__(self, config):
super().__init__()
self.attention = TvpAttention(config)
self.intermediate = TvpIntermediate(config)
self.output = TvpOutputLayer(config)
def forward(
self,
hidden_states,
attention_mask=None,
head_mask=None,
output_attentions: Optional[bool] = None,
):
self_attention_outputs = self.attention(
hidden_states,
attention_mask,
head_mask,
output_attentions=output_attentions,
)
attention_output = self_attention_outputs[0]
outputs = self_attention_outputs[1:] # add self attentions if we output attention weights
intermediate_output = self.intermediate(attention_output)
layer_output = self.output(intermediate_output, attention_output)
outputs = (layer_output,) + outputs
return outputs
class TvpEncoder(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.layer = nn.ModuleList([TvpEncodeLayer(config) for _ in range(config.num_hidden_layers)])
self.gradient_checkpointing = False
def forward(
self,
hidden_states,
attention_mask=None,
head_mask: Optional[torch.FloatTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
):
return_dict = return_dict if return_dict is not None else self.config.return_dict
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
all_hidden_states = ()
all_attentions = ()
for i, layer_module in enumerate(self.layer):
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
if self.gradient_checkpointing and self.training:
layer_outputs = self._gradient_checkpointing_func(
layer_module.__call__,
hidden_states,
attention_mask,
(head_mask[i] if head_mask is not None else None),
output_attentions,
)
else:
layer_outputs = layer_module(hidden_states, attention_mask, head_mask[i], output_attentions)
hidden_states = layer_outputs[0]
if output_attentions:
all_attentions = all_attentions + (layer_outputs[1],)
# Add last layer
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
if not return_dict:
outputs = (hidden_states,)
if output_hidden_states:
outputs = outputs + (all_hidden_states,)
if output_attentions:
outputs = outputs + (all_attentions,)
return outputs # last-layer hidden state, (all hidden states), (all attentions)
return BaseModelOutput(
last_hidden_state=hidden_states,
hidden_states=all_hidden_states if output_hidden_states else None,
attentions=all_attentions if output_attentions else None,
)
# Copied from transformers.models.bert.modeling_bert.BertPooler with Bert->Tvp
class TvpPooler(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.hidden_size, config.hidden_size)
self.activation = nn.Tanh()
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
# We "pool" the model by simply taking the hidden state corresponding
# to the first token.
first_token_tensor = hidden_states[:, 0]
pooled_output = self.dense(first_token_tensor)
pooled_output = self.activation(pooled_output)
return pooled_output
class TvpPreTrainedModel(PreTrainedModel):
"""An abstract class to handle weights initialization and
a simple interface for downloading and loading pretrained models.
"""
config_class = TvpConfig
base_model_prefix = "model"
supports_gradient_checkpointing = True
def _init_weights(self, module):
"""Initialize the weights"""
if isinstance(module, (nn.Linear, nn.Embedding)):
# Slightly different from the TF version which uses truncated_normal for initialization
# cf https://github.com/pytorch/pytorch/pull/5617
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
if isinstance(module, nn.Linear) and module.bias is not None:
module.bias.data.zero_()
if isinstance(module, nn.Conv2d):
nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu")
if module.bias is not None:
nn.init.constant_(module.bias, 0)
TVP_START_DOCSTRING = r"""
This model is a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass. Use it
as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage and
behavior.
Parameters:
config ([`TvpConfig`]): Model configuration class with all the parameters of the model.
Initializing with a config file does not load the weights associated with the model, only the
configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
TVP_INPUTS_DOCSTRING = r"""
Args:
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
Indices of input sequence tokens in the vocabulary. Indices can be obtained using [`AutoTokenizer`]. See
[`PreTrainedTokenizer.encode`] and [`PreTrainedTokenizer.__call__`] for details. [What are input
IDs?](../glossary#input-ids)
pixel_values (`torch.FloatTensor` of shape `(batch_size, num_frames, num_channels, height, width)`):
Pixel values. Pixel values can be obtained using [`TvpImageProcessor`]. See [`TvpImageProcessor.__call__`]
for details.
attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
head_mask (`torch.FloatTensor` of shape `(num_heads,)` or `(num_layers, num_heads)`, *optional*):
Mask to nullify selected heads of the self-attention modules. Mask values selected in `[0, 1]`:
- 1 indicates the head is **not masked**,
- 0 indicates the head is **masked**.
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
class TvpFrameDownPadPrompter(nn.Module):
"""
Pad frames extracted from videos only at the bottom.
"""
def __init__(self, config):
if config.visual_prompter_apply not in ("add", "replace", "remove"):
raise ValueError("`visual_prompter_apply` must be in (add, replace, remove)")
super().__init__()
self.visual_prompt_size = config.visual_prompt_size
self.frame_num = config.frame_num
self.max_img_size = config.max_img_size
self.visual_prompter_apply = config.visual_prompter_apply
self.pad_down = nn.Parameter(
torch.randn([1, config.frame_num, 3, config.visual_prompt_size, config.max_img_size])
)
def forward(self, pixel_values):
if self.visual_prompter_apply != "add":
visual_prompt_mask = torch.ones([self.max_img_size, self.max_img_size], dtype=pixel_values.dtype)
visual_prompt_mask[self.max_img_size - self.visual_prompt_size : self.max_img_size, :] = 0.0
pixel_values *= visual_prompt_mask
if self.visual_prompter_apply != "remove":
prompt = torch.zeros(
[pixel_values.shape[0], pixel_values.shape[1], 3, self.max_img_size, self.max_img_size]
)
start_point = self.max_img_size - self.visual_prompt_size
prompt[:, :, :, start_point : self.max_img_size, :] = self.pad_down
pixel_values += prompt.to(pixel_values.dtype)
return pixel_values
class TvpFramePadPrompter(nn.Module):
"""
Pad frames extracted from videos in the surroundings.
"""
def __init__(self, config):
if config.visual_prompter_apply not in ("add", "replace", "remove"):
raise ValueError("`visual_prompter_apply` must be in (add, replace, remove)")
super().__init__()
self.num_frames = config.num_frames
self.max_img_size = config.max_img_size
self.visual_prompter_apply = config.visual_prompter_apply
self.base_size = config.max_img_size - config.visual_prompt_size * 2
self.pad_up = nn.Parameter(
torch.randn([1, config.num_frames, 3, config.visual_prompt_size, config.max_img_size])
)
self.pad_down = nn.Parameter(
torch.randn([1, config.num_frames, 3, config.visual_prompt_size, config.max_img_size])
)
self.pad_left = nn.Parameter(
torch.randn(
[
1,
config.num_frames,
3,
config.max_img_size - config.visual_prompt_size * 2,
config.visual_prompt_size,
]
)
)
self.pad_right = nn.Parameter(
torch.randn(
[
1,
config.num_frames,
3,
config.max_img_size - config.visual_prompt_size * 2,
config.visual_prompt_size,
]
)
)
def forward(self, pixel_values):
if self.visual_prompter_apply not in ("add", "remove", "replace"):
raise ValueError(f"Invalid visual_prompter_apply value {self.visual_prompter_apply}")
if self.visual_prompter_apply in ("replace", "remove"):
visual_prompt_mask = torch.ones([self.max_img_size, self.max_img_size], dtype=pixel_values.dtype)
pixel_values *= visual_prompt_mask
if self.visual_prompter_apply in ("replace", "add"):
base = torch.zeros(1, self.num_frames, 3, self.base_size, self.base_size)
prompt = torch.cat([self.pad_left, base, self.pad_right], dim=4)
prompt = torch.cat([self.pad_up, prompt, self.pad_down], dim=3)
prompt = torch.cat(pixel_values.size(0) * [prompt])
pixel_values += prompt.to(pixel_values.dtype)
return pixel_values
TVP_PROMPTER_CLASSES_MAPPING = {
"framedownpad": TvpFrameDownPadPrompter,
"framepad": TvpFramePadPrompter,
}
@add_start_docstrings(
"The bare Tvp Model transformer outputting BaseModelOutputWithPooling object without any specific head on" " top.",
TVP_START_DOCSTRING,
)
class TvpModel(TvpPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.config = config
self.vision_model = TvpVisionModel(config)
self.embeddings = TvpTextInputEmbeddings(config)
self.visual_embeddings = TvpVisualInputEmbedding(config)
self.encoder = TvpEncoder(config)
self.pooler = TvpPooler(config)
self.text_prompt = nn.Parameter(torch.randn([1, 10, config.hidden_size]))
self.dropout = nn.Dropout(config.hidden_dropout_prob)
if config.visual_prompter_type not in TVP_PROMPTER_CLASSES_MAPPING:
raise ValueError("`visual_prompter_type` must be in (framedownpad, framepad)")
self.visual_prompter = TVP_PROMPTER_CLASSES_MAPPING[config.visual_prompter_type](config)
self.post_init()
def get_input_embeddings(self):
return self.embeddings.word_embeddings
def set_input_embeddings(self, value):
self.embeddings.word_embeddings = value
def _prune_heads(self, heads_to_prune):
"""Prunes heads of the model.
heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base class PreTrainedModel
"""
for layer, heads in heads_to_prune.items():
self.encoder.layer[layer].attention.prune_heads(heads)
@add_start_docstrings_to_model_forward(TVP_INPUTS_DOCSTRING)
@replace_return_docstrings(output_type=BaseModelOutputWithPooling, config_class=TvpConfig)
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
pixel_values: Optional[torch.FloatTensor] = None,
attention_mask: Optional[torch.LongTensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
):
r"""
Returns:
Examples:
```python
>>> import torch
>>> from transformers import AutoConfig, AutoTokenizer, TvpModel
>>> model = TvpModel.from_pretrained("Jiqing/tiny-random-tvp")
>>> tokenizer = AutoTokenizer.from_pretrained("Jiqing/tiny-random-tvp")
>>> pixel_values = torch.rand(1, 1, 3, 448, 448)
>>> text_inputs = tokenizer("This is an example input", return_tensors="pt")
>>> output = model(text_inputs.input_ids, pixel_values, text_inputs.attention_mask)
```"""
return_dict = return_dict if return_dict is not None else self.config.return_dict
# Add visual prompt, it compensates for the spatiotemporal information loss in 2D visual features.
pixel_values = self.vision_model(self.visual_prompter(pixel_values))
# (batch_size, sequence_length, hidden_size)
text_embedding_output = self.embeddings(input_ids=input_ids)
# (batch_size, visual_sequence_length, hidden_size)
visual_embedding_output = self.visual_embeddings(pixel_values)
if attention_mask is not None:
# (batch_size, visual_sequence_length)
visual_attention_mask = attention_mask.new_ones(visual_embedding_output.shape[:2])
pt_mask = torch.ones(attention_mask.shape[0], 10).to(
device=attention_mask.device, dtype=attention_mask.dtype
)
attention_mask = torch.cat([pt_mask, attention_mask, visual_attention_mask], dim=-1)
# We can provide a self-attention mask of dimensions [batch_size, from_seq_length, to_seq_length]
# ourselves in which case we just need to make it broadcastable to all heads.
attention_mask = self.get_extended_attention_mask(attention_mask, input_ids.size()).to(input_ids.device)
text_prompt = self.text_prompt.expand(text_embedding_output.shape[0], -1, -1)
# (batch_size, sequence_length + visual_sequence_length, hidden_size)
embedding_output = torch.cat([text_prompt, text_embedding_output, visual_embedding_output], dim=1)
encoder_outputs = self.encoder(
embedding_output,
attention_mask=attention_mask,
head_mask=self.get_head_mask(head_mask, self.config.num_hidden_layers),
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
last_hidden_state = encoder_outputs.last_hidden_state if return_dict else encoder_outputs[0]
pooled_output = self.pooler(last_hidden_state)
last_hidden_state = self.dropout(last_hidden_state)
pooled_output = self.dropout(pooled_output)
if not return_dict:
return (last_hidden_state, pooled_output) + encoder_outputs[1:]
return BaseModelOutputWithPooling(
last_hidden_state=last_hidden_state,
pooler_output=pooled_output,
hidden_states=encoder_outputs.hidden_states,
attentions=encoder_outputs.attentions,
)
class TvpVideoGroundingHead(nn.Module):
def __init__(self, config):
super().__init__()
self.layer_0 = nn.Linear(config.hidden_size, config.hidden_size * 2)
self.layer_1 = nn.Linear(config.hidden_size * 2, 2)
self.activation_0 = nn.ReLU()
self.activation_1 = nn.Sigmoid()
def forward(self, pooler_output):
logits = self.activation_0(self.layer_0(pooler_output))
logits = self.activation_1(self.layer_1(logits))
return logits
@add_start_docstrings(
"""
Tvp Model with a video grounding head on top computing IoU, distance, and duration loss.
""",
TVP_START_DOCSTRING,
)
class TvpForVideoGrounding(TvpPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.config = config
self.model = TvpModel(config)
self.video_grounding_head = TvpVideoGroundingHead(config)
self.post_init()
@add_start_docstrings_to_model_forward(TVP_INPUTS_DOCSTRING)
@replace_return_docstrings(output_type=TvpVideoGroundingOutput, config_class=TvpConfig)
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
pixel_values: Optional[torch.FloatTensor] = None,
attention_mask: Optional[torch.LongTensor] = None,
labels: Tuple[torch.Tensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
):
r"""
labels (`torch.FloatTensor` of shape `(batch_size, 3)`, *optional*):
The labels contains duration, start time, and end time of the video corresponding to the text.
Returns:
Examples:
```python
>>> import torch
>>> from transformers import AutoConfig, AutoTokenizer, TvpForVideoGrounding
>>> model = TvpForVideoGrounding.from_pretrained("Jiqing/tiny-random-tvp")
>>> tokenizer = AutoTokenizer.from_pretrained("Jiqing/tiny-random-tvp")
>>> pixel_values = torch.rand(1, 1, 3, 448, 448)
>>> text_inputs = tokenizer("This is an example input", return_tensors="pt")
>>> output = model(text_inputs.input_ids, pixel_values, text_inputs.attention_mask)
```"""
return_dict = return_dict if return_dict is not None else self.config.return_dict
outputs = self.model(
input_ids,
pixel_values,
attention_mask,
head_mask=head_mask,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
pooler_output = outputs[1]
logits = self.video_grounding_head(pooler_output)
loss = None
if labels is not None:
criterion = TvpLoss(["iou", "distance", "duration"])
criterion.to(self.device)
loss_dict = criterion(logits, labels)
loss = (
loss_dict["iou"]
+ self.config.distance_loss_weight * loss_dict["distance"]
+ self.config.duration_loss_weight * loss_dict["duration"]
)
if not return_dict:
outputs = (logits,) + outputs[2:]
if loss is not None:
outputs = (loss,) + outputs
return outputs
return TvpVideoGroundingOutput(
loss=loss,
logits=logits,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
# coding=utf-8
# Copyright 2023 The Intel AIA Team Authors, and HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License=, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing=, software
# distributed under the License is distributed on an "AS IS" BASIS=,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND=, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor class for TVP.
"""
from ...processing_utils import ProcessorMixin
from ...tokenization_utils_base import BatchEncoding
class TvpProcessor(ProcessorMixin):
r"""
Constructs an TVP processor which wraps a TVP image processor and a Bert tokenizer into a single processor.
[`TvpProcessor`] offers all the functionalities of [`TvpImageProcessor`] and [`BertTokenizerFast`]. See the
[`~TvpProcessor.__call__`] and [`~TvpProcessor.decode`] for more information.
Args:
image_processor ([`TvpImageProcessor`], *optional*):
The image processor is a required input.
tokenizer ([`BertTokenizerFast`], *optional*):
The tokenizer is a required input.
"""
attributes = ["image_processor", "tokenizer"]
image_processor_class = "TvpImageProcessor"
tokenizer_class = ("BertTokenizer", "BertTokenizerFast")
def __init__(self, image_processor=None, tokenizer=None, **kwargs):
if image_processor is None:
raise ValueError("You need to specify an `image_processor`.")
if tokenizer is None:
raise ValueError("You need to specify a `tokenizer`.")
super().__init__(image_processor, tokenizer)
def __call__(self, text=None, videos=None, return_tensors=None, **kwargs):
"""
Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text`
and `kwargs` arguments to BertTokenizerFast's [`~BertTokenizerFast.__call__`] if `text` is not `None` to encode
the text. To prepare the image(s), this method forwards the `videos` and `kwargs` arguments to
TvpImageProcessor's [`~TvpImageProcessor.__call__`] if `videos` is not `None`. Please refer to the doctsring of
the above two methods for more information.
Args:
text (`str`, `List[str]`, `List[List[str]]`):
The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
(pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
`is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
videos (`List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`, `List[List[PIL.Image.Image]]`, `List[List[np.ndarrray]]`,:
`List[List[torch.Tensor]]`): The video or batch of videos to be prepared. Each video should be a list
of frames, which can be either PIL images or NumPy arrays. In case of NumPy arrays/PyTorch tensors,
each frame should be of shape (H, W, C), where H and W are frame height and width, and C is a number of
channels.
return_tensors (`str` or [`~utils.TensorType`], *optional*):
If set, will return tensors of a particular framework. Acceptable values are:
- `'tf'`: Return TensorFlow `tf.constant` objects.
- `'pt'`: Return PyTorch `torch.Tensor` objects.
- `'np'`: Return NumPy `np.ndarray` objects.
- `'jax'`: Return JAX `jnp.ndarray` objects.
Returns:
[`BatchEncoding`]: A [`BatchEncoding`] with the following fields:
- **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
- **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
`return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
`None`).
- **pixel_values** -- Pixel values to be fed to a model. Returned when `videos` is not `None`.
"""
max_text_length = kwargs.pop("max_text_length", None)
if text is None and videos is None:
raise ValueError("You have to specify either text or videos. Both cannot be none.")
encoding = {}
if text is not None:
textual_input = self.tokenizer.batch_encode_plus(
text,
truncation=True,
padding="max_length",
max_length=max_text_length,
pad_to_max_length=True,
return_tensors=return_tensors,
return_token_type_ids=False,
**kwargs,
)
encoding.update(textual_input)
if videos is not None:
image_features = self.image_processor(videos, return_tensors=return_tensors, **kwargs)
encoding.update(image_features)
return BatchEncoding(data=encoding, tensor_type=return_tensors)
def batch_decode(self, *args, **kwargs):
"""
This method forwards all its arguments to BertTokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
refer to the docstring of this method for more information.
"""
return self.tokenizer.batch_decode(*args, **kwargs)
def decode(self, *args, **kwargs):
"""
This method forwards all its arguments to BertTokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
the docstring of this method for more information.
"""
return self.tokenizer.decode(*args, **kwargs)
def post_process_video_grounding(self, logits, video_durations):
"""
Compute the time of the video.
Args:
logits (`torch.Tensor`):
The logits output of TvpForVideoGrounding.
video_durations (`float`):
The video's duration.
Returns:
start (`float`):
The start time of the video.
end (`float`):
The end time of the video.
"""
start, end = (
round(logits.tolist()[0][0] * video_durations, 1),
round(logits.tolist()[0][1] * video_durations, 1),
)
return start, end
@property
# Copied from transformers.models.blip.processing_blip.BlipProcessor.model_input_names
def model_input_names(self):
tokenizer_input_names = self.tokenizer.model_input_names
image_processor_input_names = self.image_processor.model_input_names
return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))
......@@ -7829,6 +7829,30 @@ class TvltPreTrainedModel(metaclass=DummyObject):
requires_backends(self, ["torch"])
TVP_PRETRAINED_MODEL_ARCHIVE_LIST = None
class TvpForVideoGrounding(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class TvpModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class TvpPreTrainedModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class UMT5EncoderModel(metaclass=DummyObject):
_backends = ["torch"]
......
......@@ -485,6 +485,13 @@ class TvltImageProcessor(metaclass=DummyObject):
requires_backends(self, ["vision"])
class TvpImageProcessor(metaclass=DummyObject):
_backends = ["vision"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["vision"])
class VideoMAEFeatureExtractor(metaclass=DummyObject):
_backends = ["vision"]
......
# coding=utf-8
# Copyright 2023 The Intel Team Authors, The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from typing import Dict, List, Optional, Union
import numpy as np
from transformers.image_transforms import PaddingMode
from transformers.testing_utils import require_torch, require_vision
from transformers.utils import is_torch_available, is_vision_available
from ...test_image_processing_common import ImageProcessingTestMixin, prepare_video_inputs
if is_torch_available():
import torch
if is_vision_available():
from PIL import Image
from transformers import TvpImageProcessor
class TvpImageProcessingTester(unittest.TestCase):
def __init__(
self,
parent,
do_resize: bool = True,
size: Dict[str, int] = {"longest_edge": 40},
do_center_crop: bool = False,
crop_size: Dict[str, int] = None,
do_rescale: bool = False,
rescale_factor: Union[int, float] = 1 / 255,
do_pad: bool = True,
pad_size: Dict[str, int] = {"height": 80, "width": 80},
fill: int = None,
pad_mode: PaddingMode = None,
do_normalize: bool = True,
image_mean: Optional[Union[float, List[float]]] = [0.48145466, 0.4578275, 0.40821073],
image_std: Optional[Union[float, List[float]]] = [0.26862954, 0.26130258, 0.27577711],
batch_size=2,
min_resolution=40,
max_resolution=80,
num_channels=3,
num_frames=2,
):
self.do_resize = do_resize
self.size = size
self.do_center_crop = do_center_crop
self.crop_size = crop_size
self.do_rescale = do_rescale
self.rescale_factor = rescale_factor
self.do_pad = do_pad
self.pad_size = pad_size
self.fill = fill
self.pad_mode = pad_mode
self.do_normalize = do_normalize
self.image_mean = image_mean
self.image_std = image_std
self.batch_size = batch_size
self.num_channels = num_channels
self.min_resolution = min_resolution
self.max_resolution = max_resolution
self.num_frames = num_frames
def prepare_image_processor_dict(self):
return {
"image_mean": self.image_mean,
"image_std": self.image_std,
"do_normalize": self.do_normalize,
"do_resize": self.do_resize,
"size": self.size,
"do_rescale": self.do_rescale,
"do_center_crop": self.do_center_crop,
"do_pad": self.do_pad,
"pad_size": self.pad_size,
}
def get_expected_values(self, image_inputs, batched=False):
"""
This function computes the expected height and width when providing images to TvpImageProcessor,
assuming do_resize is set to True with a scalar size.
"""
if not batched:
return (int(self.pad_size["height"]), int(self.pad_size["width"]))
else:
expected_values = []
for image in image_inputs:
expected_height, expected_width = self.get_expected_values([image])
expected_values.append((expected_height, expected_width))
expected_height = max(expected_values, key=lambda item: item[0])[0]
expected_width = max(expected_values, key=lambda item: item[1])[1]
return expected_height, expected_width
def prepare_video_inputs(self, equal_resolution=False, numpify=False, torchify=False):
return prepare_video_inputs(
batch_size=self.batch_size,
num_frames=self.num_frames,
num_channels=self.num_channels,
min_resolution=self.min_resolution,
max_resolution=self.max_resolution,
equal_resolution=equal_resolution,
numpify=numpify,
torchify=torchify,
)
@require_torch
@require_vision
class TvpImageProcessingTest(ImageProcessingTestMixin, unittest.TestCase):
image_processing_class = TvpImageProcessor if is_vision_available() else None
def setUp(self):
self.image_processor_tester = TvpImageProcessingTester(self)
@property
def image_processor_dict(self):
return self.image_processor_tester.prepare_image_processor_dict()
def test_image_processor_properties(self):
image_processing = self.image_processing_class(**self.image_processor_dict)
self.assertTrue(hasattr(image_processing, "image_mean"))
self.assertTrue(hasattr(image_processing, "image_std"))
self.assertTrue(hasattr(image_processing, "do_normalize"))
self.assertTrue(hasattr(image_processing, "do_resize"))
self.assertTrue(hasattr(image_processing, "do_center_crop"))
self.assertTrue(hasattr(image_processing, "size"))
self.assertTrue(hasattr(image_processing, "do_rescale"))
self.assertTrue(hasattr(image_processing, "do_pad"))
self.assertTrue(hasattr(image_processing, "pad_size"))
def test_image_processor_from_dict_with_kwargs(self):
image_processor = self.image_processing_class.from_dict(self.image_processor_dict)
self.assertEqual(image_processor.size, {"longest_edge": 40})
image_processor = self.image_processing_class.from_dict(self.image_processor_dict, size={"longest_edge": 12})
self.assertEqual(image_processor.size, {"longest_edge": 12})
def test_call_pil(self):
# Initialize image_processing
image_processing = self.image_processing_class(**self.image_processor_dict)
# create random PIL videos
video_inputs = self.image_processor_tester.prepare_video_inputs(equal_resolution=False)
for video in video_inputs:
self.assertIsInstance(video, list)
self.assertIsInstance(video[0], Image.Image)
# Test not batched input
expected_height, expected_width = self.image_processor_tester.get_expected_values(video_inputs)
encoded_videos = image_processing(video_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
1,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
expected_height,
expected_width,
),
)
# Test batched
expected_height, expected_width = self.image_processor_tester.get_expected_values(video_inputs, batched=True)
encoded_videos = image_processing(video_inputs, return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
self.image_processor_tester.batch_size,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
expected_height,
expected_width,
),
)
def test_call_numpy(self):
# Initialize image_processing
image_processing = self.image_processing_class(**self.image_processor_dict)
# create random numpy tensors
video_inputs = self.image_processor_tester.prepare_video_inputs(equal_resolution=False, numpify=True)
for video in video_inputs:
self.assertIsInstance(video, list)
self.assertIsInstance(video[0], np.ndarray)
# Test not batched input
expected_height, expected_width = self.image_processor_tester.get_expected_values(video_inputs)
encoded_videos = image_processing(video_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
1,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
expected_height,
expected_width,
),
)
# Test batched
expected_height, expected_width = self.image_processor_tester.get_expected_values(video_inputs, batched=True)
encoded_videos = image_processing(video_inputs, return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
self.image_processor_tester.batch_size,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
expected_height,
expected_width,
),
)
def test_call_numpy_4_channels(self):
# Initialize image_processing
image_processing = self.image_processing_class(**self.image_processor_dict)
# create random numpy tensors
video_inputs = self.image_processor_tester.prepare_video_inputs(equal_resolution=False, numpify=True)
for video in video_inputs:
self.assertIsInstance(video, list)
self.assertIsInstance(video[0], np.ndarray)
# Test not batched input
expected_height, expected_width = self.image_processor_tester.get_expected_values(video_inputs)
encoded_videos = image_processing(
video_inputs[0], return_tensors="pt", image_mean=0, image_std=1, input_data_format="channels_first"
).pixel_values
self.assertEqual(
encoded_videos.shape,
(
1,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
expected_height,
expected_width,
),
)
# Test batched
expected_height, expected_width = self.image_processor_tester.get_expected_values(video_inputs, batched=True)
encoded_videos = image_processing(
video_inputs, return_tensors="pt", image_mean=0, image_std=1, input_data_format="channels_first"
).pixel_values
self.assertEqual(
encoded_videos.shape,
(
self.image_processor_tester.batch_size,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
expected_height,
expected_width,
),
)
self.image_processor_tester.num_channels = 3
def test_call_pytorch(self):
# Initialize image_processing
image_processing = self.image_processing_class(**self.image_processor_dict)
# create random PyTorch tensors
video_inputs = self.image_processor_tester.prepare_video_inputs(equal_resolution=False, torchify=True)
for video in video_inputs:
self.assertIsInstance(video, list)
self.assertIsInstance(video[0], torch.Tensor)
# Test not batched input
expected_height, expected_width = self.image_processor_tester.get_expected_values(video_inputs)
encoded_videos = image_processing(video_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
1,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
expected_height,
expected_width,
),
)
# Test batched
expected_height, expected_width = self.image_processor_tester.get_expected_values(video_inputs, batched=True)
encoded_videos = image_processing(video_inputs, return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
self.image_processor_tester.batch_size,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
expected_height,
expected_width,
),
)
# coding=utf-8
# Copyright 2023 The Intel Team Authors, The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Testing suite for the PyTorch TVP model. """
import unittest
from transformers import ResNetConfig, TvpConfig
from transformers.testing_utils import require_torch, require_vision, torch_device
from transformers.utils import cached_property, is_torch_available, is_vision_available
from ...test_modeling_common import (
ModelTesterMixin,
_config_zero_init,
floats_tensor,
ids_tensor,
random_attention_mask,
)
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
from transformers import TvpForVideoGrounding, TvpModel
if is_vision_available():
from PIL import Image
from transformers import TvpImageProcessor
# Copied from test.models.videomae.test_modeling_videomae.VideoMAEModelTester with VideoMAE->TVP
class TVPModelTester:
def __init__(
self,
parent,
batch_size=1,
seq_length=2,
alpha=1.0,
beta=0.1,
visual_prompter_type="framepad",
visual_prompter_apply="replace",
num_frames=2,
max_img_size=448,
visual_prompt_size=96,
vocab_size=100,
hidden_size=32,
intermediate_size=32,
num_hidden_layers=2,
num_attention_heads=4,
max_position_embeddings=30,
max_grid_col_position_embeddings=30,
max_grid_row_position_embeddings=30,
hidden_dropout_prob=0.1,
hidden_act="gelu",
layer_norm_eps=1e-12,
initializer_range=0.02,
pad_token_id=0,
type_vocab_size=2,
attention_probs_dropout_prob=0.1,
):
self.parent = parent
self.batch_size = batch_size
self.input_id_length = seq_length
self.seq_length = seq_length + 10 + 784 # include text prompt length and visual input length
self.alpha = alpha
self.beta = beta
self.visual_prompter_type = visual_prompter_type
self.visual_prompter_apply = visual_prompter_apply
self.num_frames = num_frames
self.max_img_size = max_img_size
self.visual_prompt_size = visual_prompt_size
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.hidden_act = hidden_act
self.hidden_dropout_prob = hidden_dropout_prob
self.attention_probs_dropout_prob = attention_probs_dropout_prob
self.max_position_embeddings = max_position_embeddings
self.max_grid_col_position_embeddings = max_grid_col_position_embeddings
self.max_grid_row_position_embeddings = max_grid_row_position_embeddings
self.layer_norm_eps = layer_norm_eps
self.initializer_range = initializer_range
self.pad_token_id = pad_token_id
self.type_vocab_size = type_vocab_size
self.is_training = False
self.num_channels = 3
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.input_id_length], self.vocab_size)
attention_mask = random_attention_mask([self.batch_size, self.input_id_length])
pixel_values = floats_tensor(
[self.batch_size, self.num_frames, self.num_channels, self.max_img_size, self.max_img_size]
)
config = self.get_config()
return (config, input_ids, pixel_values, attention_mask)
def get_config(self):
resnet_config = ResNetConfig(
num_channels=3,
embeddings_size=64,
hidden_sizes=[64, 128],
depths=[2, 2],
hidden_act="relu",
out_features=["stage2"],
out_indices=[2],
)
return TvpConfig(
backbone_config=resnet_config,
alpha=self.alpha,
beta=self.beta,
visual_prompter_type=self.visual_prompter_type,
visual_prompter_apply=self.visual_prompter_apply,
num_frames=self.num_frames,
max_img_size=self.max_img_size,
visual_prompt_size=self.visual_prompt_size,
vocab_size=self.vocab_size,
hidden_size=self.hidden_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
intermediate_size=self.intermediate_size,
hidden_act=self.hidden_act,
hidden_dropout_prob=self.hidden_dropout_prob,
attention_probs_dropout_prob=self.attention_probs_dropout_prob,
max_position_embeddings=self.max_position_embeddings,
max_grid_col_position_embeddings=self.max_grid_col_position_embeddings,
max_grid_row_position_embeddings=self.max_grid_row_position_embeddings,
layer_norm_eps=self.layer_norm_eps,
initializer_range=self.initializer_range,
pad_token_id=self.pad_token_id,
type_vocab_size=self.type_vocab_size,
)
def create_and_check_model(self, config, input_ids, pixel_values, attention_mask):
model = TvpModel(config=config)
model.to(torch_device)
model.eval()
result = model(input_ids, pixel_values, attention_mask)
self.parent.assertEqual(result.last_hidden_state.shape, (self.batch_size, self.seq_length, self.hidden_size))
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
config, input_ids, pixel_values, attention_mask = config_and_inputs
inputs_dict = {"input_ids": input_ids, "pixel_values": pixel_values, "attention_mask": attention_mask}
return config, inputs_dict
@require_torch
class TVPModelTest(ModelTesterMixin, PipelineTesterMixin, unittest.TestCase):
"""
Here we also overwrite some of the tests of test_modeling_common.py, as TVP does not use, inputs_embeds.
The seq_length in TVP contain textual and visual inputs, and prompt.
"""
all_model_classes = (TvpModel, TvpForVideoGrounding) if is_torch_available() else ()
pipeline_model_mapping = (
{"feature-extraction": TvpModel, "temporal-video-grounding": TvpForVideoGrounding}
if is_torch_available()
else {}
)
def setUp(self):
self.model_tester = TVPModelTester(self)
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
@unittest.skip(reason="TVP does not use inputs_embeds")
def test_inputs_embeds(self):
pass
@unittest.skip(reason="TVPModel does not have input/output embeddings")
def test_model_common_attributes(self):
pass
# override as the `logit_scale` parameter initilization is different for TVP
def test_initialization(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
configs_no_init = _config_zero_init(config)
for model_class in self.all_model_classes:
model = model_class(config=configs_no_init)
for name, param in model.named_parameters():
if param.requires_grad:
# params are randomly initialized.
self.assertAlmostEqual(
param.data.mean().item(),
0.0,
delta=1.0,
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
)
# We will verify our results on an image of cute cats
def prepare_img():
image = Image.open("./tests/fixtures/tests_samples/COCO/000000039769.png")
return image
@require_vision
@require_torch
class TvpModelIntegrationTests(unittest.TestCase):
@cached_property
def default_image_processor(self):
return TvpImageProcessor.from_pretrained("Jiqing/tiny-random-tvp") if is_vision_available() else None
def test_inference_no_head(self):
model = TvpModel.from_pretrained("Jiqing/tiny-random-tvp").to(torch_device)
image_processor = self.default_image_processor
image = prepare_img()
encoding = image_processor(images=image, return_tensors="pt").to(torch_device)
input_ids = torch.tensor([[1, 2]])
attention_mask = torch.tensor([[1, 1]])
encoding.update({"input_ids": input_ids, "attention_mask": attention_mask})
with torch.no_grad():
outputs = model(**encoding)
expected_shape = torch.Size((1, 796, 128))
assert outputs.last_hidden_state.shape == expected_shape
expected_slice = torch.tensor(
[[-0.4902, -0.4121, -1.7872], [-0.2184, 2.1211, -0.9371], [0.1180, 0.5003, -0.1727]]
).to(torch_device)
self.assertTrue(torch.allclose(outputs.last_hidden_state[0, :3, :3], expected_slice, atol=1e-4))
def test_inference_with_head(self):
model = TvpForVideoGrounding.from_pretrained("Jiqing/tiny-random-tvp").to(torch_device)
image_processor = self.default_image_processor
image = prepare_img()
encoding = image_processor(images=image, return_tensors="pt").to(torch_device)
input_ids = torch.tensor([[1, 2]])
attention_mask = torch.tensor([[1, 1]])
encoding.update({"input_ids": input_ids, "attention_mask": attention_mask})
with torch.no_grad():
outputs = model(**encoding)
expected_shape = torch.Size((1, 2))
assert outputs.logits.shape == expected_shape
expected_slice = torch.tensor([[0.5061, 0.4988]]).to(torch_device)
self.assertTrue(torch.allclose(outputs.logits, expected_slice, atol=1e-4))
......@@ -114,7 +114,7 @@ IGNORE_NON_TESTED = PRIVATE_MODELS.copy() + [
"BridgeTowerTextModel", # No need to test it as it is tested by BridgeTowerModel model.
"BridgeTowerVisionModel", # No need to test it as it is tested by BridgeTowerModel model.
"BarkCausalModel", # Building part of bigger (tested) model.
"BarkModel", # Does not have a forward signature - generation tested with integration tests
"BarkModel", # Does not have a forward signature - generation tested with integration tests.
"SeamlessM4TTextToUnitModel", # Building part of bigger (tested) model.
"SeamlessM4TCodeHifiGan", # Building part of bigger (tested) model.
"SeamlessM4TTextToUnitForConditionalGeneration", # Building part of bigger (tested) model.
......@@ -293,6 +293,7 @@ IGNORE_NON_AUTO_CONFIGURED = PRIVATE_MODELS.copy() + [
"SeamlessM4TTextToUnitForConditionalGeneration",
"SeamlessM4TCodeHifiGan",
"SeamlessM4TForSpeechToSpeech", # no auto class for speech-to-speech
"TvpForVideoGrounding",
]
# DO NOT edit this list!
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment