Unverified Commit 6c811a32 authored by Stas Bekman's avatar Stas Bekman Committed by GitHub
Browse files

new model: IDEFICS via HuggingFaceM4 (#24796)



* rename

* restore

* mappings

* unedited tests+docs

* docs

* fixes

* fix auto-sync breakage

* cleanup

* wip

* wip

* add fetch_images

* remove einops dependency

* update

* fix

* fix

* fix

* fix

* fix

* re-add

* add batching

* rework

* fix

* improve

* add Leo as I am extending his work

* cleanup

* fix

* cleanup

* slow-test

* fix

* fix

* fixes

* deal with warning

* rename modified llama classes

* rework fetch_images

* alternative implementation

* cleanup

* strict version

* cleanup

* [`IDEFICS`] Fix idefics ci (#25056)

* Fix IDEFICS CI

* fix test file

* fixup

* some changes to make tests pass

* fix

* fixup

* Update src/transformers/models/idefics/configuration_idefics.py
Co-authored-by: default avatarStas Bekman <stas00@users.noreply.github.com>

---------
Co-authored-by: default avatarStas Bekman <stas00@users.noreply.github.com>

* remove compat checks

* style

* explain that Idefics is not for training from scratch

* require pt>=2.0

* fix idefics vision config (#25092)

* fix idefics vision config

* fixup

* clean

* Update src/transformers/models/idefics/configuration_idefics.py

---------
Co-authored-by: default avatarStas Bekman <stas00@users.noreply.github.com>

* cleanup

* style

* cleanup

* Apply suggestions from code review
Co-authored-by: default avatarSylvain Gugger <35901082+sgugger@users.noreply.github.com>

* upcase

* sequence of images

* handle the case with no images

* Update src/transformers/image_processing_utils.py
Co-authored-by: default avatarVictor SANH <victorsanh@gmail.com>

* support pure lm take 2

* support tokenizer options

* parameterize num_channels

* fix upcase

* s|IdeficsForCausalLM|IdeficsForVisionText2Text|g

* manual to one line

* addressing review

* unbreak

* remove clip dependency

* fix test

* consistency

* PIL import

* Idefics prefix

* Idefics prefix

* hack to make tests work

* style

* fix

* fix

* revert

* try/finally

* cleanup

* clean up

* move

* [`IDEFICS`] Fix idefics config refactor (#25149)

* refactor config

* nuke init weights

* more refactor

* oops

* remove visual question answering pipeline support

* Update src/transformers/models/idefics/clip.py
Co-authored-by: default avatarStas Bekman <stas00@users.noreply.github.com>

* Update src/transformers/models/idefics/modeling_idefics.py

* cleanup

* mv clip.py vision.py

* tidyup

---------
Co-authored-by: default avatarStas Bekman <stas00@users.noreply.github.com>
Co-authored-by: default avatarStas Bekman <stas@stason.org>

* fix

* license

* condition on pt

* fix

* style

* fix

* rm torchvision dependency, allow custom transforms

* address review

* rework device arg

* add_eos_token

* s/transforms/transform/

* fix top level imports

* fix return value

* cleanup

* cleanup

* fix

* style

* license

* license

* Update src/transformers/models/idefics/image_processing_idefics.py
Co-authored-by: default avatarSylvain Gugger <35901082+sgugger@users.noreply.github.com>

* add a wrapper to freeze vision layears

* tidyup

* use the correct std/mean settings

* parameterize values from config

* add tests/models/idefics/test_image_processing_idefics.py

* add test_processor_idefics.py

* cleanup

* cleanups

* fix

* fix

* move to the right group

* style

* Apply suggestions from code review
Co-authored-by: default avatarSylvain Gugger <35901082+sgugger@users.noreply.github.com>

* add perceiver config

* reset

* missing arg docs

* Apply suggestions from code review
Co-authored-by: default avatarLeo Tronchon <leo.tronchon@gmail.com>

* address review comments

* inject automatic end of utterance tokens (#25218)

* inject automatic end of utterance tokens

* fix

* fix

* fix

* rework to not use the config

* not end_of_utterance_token at the end

* Update src/transformers/models/idefics/processing_idefics.py
Co-authored-by: default avatarSylvain Gugger <35901082+sgugger@users.noreply.github.com>

* address review

* Apply suggestions from code review
Co-authored-by: default avatarJoao Gante <joaofranciscocardosogante@gmail.com>

* Update src/transformers/image_processing_utils.py
Co-authored-by: default avatarNicolas Patry <patry.nicolas@protonmail.com>

* [`Idefics`] add image_embeddings option in generate-related methods (#25442)

* add image_embeddings option in generate-related methods

* style

* rename image_embeddings and allow perceiver embeddings precomputation

* compute embeddings within generate

* make is_encoder_decoder= True the default in config

* nested if else fix

* better triple check

* switch if elif order for pixel values / img embeds

* update model_kwargs perceiver only at the end

* use _prepare_model_inputs instead of encoder_decoder logic

* fix comment typo

* fix config default for is_encoder_decoder

* style

* add typehints

* precompute in forward

* doc builder

* style

* pop instead of get image hidden states

* Trigger CI

* Update src/transformers/models/idefics/modeling_idefics.py
Co-authored-by: default avatarArthur <48595927+ArthurZucker@users.noreply.github.com>

* Update src/transformers/models/idefics/modeling_idefics.py
Co-authored-by: default avatarArthur <48595927+ArthurZucker@users.noreply.github.com>

* fix * + indentation + style

* simplify a bit the use_resampler logic using comments

* update diocstrings

* Trigger CI

---------
Co-authored-by: default avatarArthur <48595927+ArthurZucker@users.noreply.github.com>

* fix rebase changes

* unbreak #25237 - to be fixed in follow up PRs

* is_composition = False

* no longer needed

---------
Co-authored-by: default avatarleot13 <leo.tronchon@gmail.com>
Co-authored-by: default avatarYounes Belkada <49240599+younesbelkada@users.noreply.github.com>
Co-authored-by: default avatarSylvain Gugger <35901082+sgugger@users.noreply.github.com>
Co-authored-by: default avatarVictor SANH <victorsanh@gmail.com>
Co-authored-by: default avatarJoao Gante <joaofranciscocardosogante@gmail.com>
Co-authored-by: default avatarNicolas Patry <patry.nicolas@protonmail.com>
Co-authored-by: default avatarArthur <48595927+ArthurZucker@users.noreply.github.com>
parent 4d64157e
# coding=utf-8
# Copyright 2022 EleutherAI and the HuggingFace Inc. team. All rights reserved.
#
# This code is based on EleutherAI's GPT-NeoX library and the GPT-NeoX
# and OPT implementations in this library. It has been modified from its
# original forms to accommodate minor architectural differences compared
# to GPT-NeoX and OPT used by the Meta AI team that trained the model.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Idefics model configuration"""
from ...configuration_utils import PretrainedConfig
from ...utils import logging
logger = logging.get_logger(__name__)
IDEFICS_PRETRAINED_CONFIG_ARCHIVE_MAP = {
"HuggingFaceM4/idefics-9b": "https://huggingface.co/HuggingFaceM4/idefics-9b/blob/main/config.json",
"HuggingFaceM4/idefics-80b": "https://huggingface.co/HuggingFaceM4/idefics-80b/blob/main/config.json",
}
class IdeficsVisionConfig(PretrainedConfig):
r"""
This is the configuration class to store the configuration of a [`IdeficsModel`]. It is used to instantiate an
Idefics model according to the specified arguments, defining the model architecture. Instantiating a configuration
with the defaults will yield a similar configuration to that of the Idefics-9B.
e.g. [HuggingFaceM4/idefics-9b](https://huggingface.co/HuggingFaceM4/idefics-9b)
Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
documentation from [`PretrainedConfig`] for more information.
Args:
hidden_size (`int`, *optional*, defaults to 768):
Dimensionality of the encoder layers and the pooler layer. (elsewhere referred to as `hidden_size`)
image_size (`int`, *optional*, defaults to 224):
The size (resolution) of each image.
intermediate_size (`int`, *optional*, defaults to 5120):
Dimensionality of the "intermediate" (i.e., feed-forward) layer in the Transformer encoder.
patch_size (`int`, *optional*, defaults to 14):
The size (resolution) of each patch.
num_hidden_layers (`int`, *optional*, defaults to 32):
Number of hidden layers in the Transformer encoder.
num_attention_heads (`int`, *optional*, defaults to 16):
Number of attention heads for each attention layer in the Transformer encoder.
image_num_channels (`int`, *optional*, defaults to `3`):
Number of image channels.
hidden_act (`str` or `function`, *optional*, defaults to `"quick_gelu"`):
The non-linear activation function (function or string) in the encoder and pooler. If string, `"gelu"`,
`"relu"`, `"selu"` and `"gelu_new"` ``"quick_gelu"` are supported.
layer_norm_eps (`float`, *optional*, defaults to 1e-5):
The epsilon used by the layer normalization layers.
attention_dropout (`float`, *optional*, defaults to 0.0):
The dropout ratio for the attention probabilities.
initializer_range (`float`, *optional*, defaults to 0.02):
The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
initializer_factor (`float`, *optional*, defaults to 1.0):
A factor for initializing all weight matrices (should be kept to 1.0, used internally for initialization
testing).
initializer_range (`float`, *optional*, defaults to 0.02):
The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
"""
model_type = "idefics"
attribute_map = {
"hidden_size": "embed_dim",
}
def __init__(
self,
embed_dim=768,
image_size=224,
intermediate_size=5120,
patch_size=14,
num_hidden_layers=32,
num_attention_heads=16,
num_channels=3,
hidden_act="quick_gelu",
layer_norm_eps=1e-5,
attention_dropout=0.0,
initializer_range=0.02,
initializer_factor=1.0,
**kwargs,
):
self.embed_dim = embed_dim
self.image_size = image_size
self.intermediate_size = intermediate_size
self.patch_size = patch_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.num_channels = num_channels
self.layer_norm_eps = layer_norm_eps
self.attention_dropout = attention_dropout
self.initializer_range = initializer_range
self.initializer_factor = initializer_factor
self.hidden_act = hidden_act
super().__init__(**kwargs)
class IdeficsPerceiverConfig(PretrainedConfig):
r"""
This is the configuration class to store the configuration of a [`IdeficsModel`]. It is used to instantiate an
Idefics model according to the specified arguments, defining the model architecture. Instantiating a configuration
with the defaults will yield a similar configuration to that of the Idefics-9B.
e.g. [HuggingFaceM4/idefics-9b](https://huggingface.co/HuggingFaceM4/idefics-9b)
Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
documentation from [`PretrainedConfig`] for more information.
Args:
use_resampler (`bool`, *optional*, defaults to `False`):
Whether or not to use the resampler
resampler_n_latents (`int`, *optional*, defaults to ):
Number of latent embeddings to resample ("compress") the input sequence to (usually < 128).
resampler_depth (`int`, *optional*, defaults to 6):
Depth of the Perceiver Resampler (Transformer w/ cross attention). Should be shallow (< 3).
resampler_n_heads (`int`, *optional*, defaults to 16):
Number of heads in each Transformer block (for multi-headed self-attention).
resampler_head_dim (`int`, *optional*, defaults to 96):
Dimensionality of each head projection in the Transformer block.
qk_layer_norms_perceiver (`bool`, *optional*, defaults to `False`):
Whether or not to use qk layer norms in perceiver
"""
model_type = "idefics"
def __init__(
self,
use_resampler=False,
resampler_n_latents=64,
resampler_depth=6,
resampler_n_heads=16,
resampler_head_dim=96,
qk_layer_norms_perceiver=False,
**kwargs,
):
self.use_resampler = use_resampler
self.resampler_n_latents = resampler_n_latents
self.resampler_depth = resampler_depth
self.resampler_n_heads = resampler_n_heads
self.resampler_head_dim = resampler_head_dim
self.qk_layer_norms_perceiver = qk_layer_norms_perceiver
super().__init__(**kwargs)
class IdeficsConfig(PretrainedConfig):
r"""
This is the configuration class to store the configuration of a [`IdeficsModel`]. It is used to instantiate an
Idefics model according to the specified arguments, defining the model architecture. Instantiating a configuration
with the defaults will yield a similar configuration to that of the Idefics-9B.
e.g. [HuggingFaceM4/idefics-9b](https://huggingface.co/HuggingFaceM4/idefics-9b)
Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
documentation from [`PretrainedConfig`] for more information.
Args:
additional_vocab_size (`int`, *optional`, defaults to 0):
Additional vocabulary size of the model, typically for the special "<img>" token. Additional vocab tokens
are always trainable whereas regular vocab tokens can be frozen or not.
vocab_size (`int`, *optional*, defaults to 32000):
Vocabulary size of the Idefics model. Defines the number of different tokens that can be represented by the
`inputs_ids` passed when calling [`~IdeficsModel`]
hidden_size (`int`, *optional*, defaults to 4096):
Dimension of the hidden representations.
intermediate_size (`int`, *optional*, defaults to 11008):
Dimension of the MLP representations.
num_hidden_layers (`int`, *optional*, defaults to 32):
Number of hidden layers in the Transformer encoder.
num_attention_heads (`int`, *optional*, defaults to 32):
Number of attention heads for each attention layer in the Transformer encoder.
dropout (`float`, *optional*, defaults to 0.0):
The dropout probability for all fully connected layers in the embeddings, encoder, and pooler.
hidden_act (`str` or `function`, *optional*, defaults to `"silu"`):
The non-linear activation function (function or string) in the decoder.
initializer_range (`float`, *optional*, defaults to 0.02):
The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
alpha_initializer (`str`, *optional*, defaults to `"zeros"`):
Initialization type for the alphas.
alphas_initializer_range (`float`, *optional*, defaults to 0.0):
The standard deviation of the truncated_normal_initializer for initializing the alphas in the Gated Cross
Attention.
alpha_type (`str`, *optional*, defaults to `"float"`):
Whether the gating alphas should be vectors or single floats.
rms_norm_eps (`float`, *optional*, defaults to 1e-6):
The epsilon used by the rms normalization layers.
use_cache (`bool`, *optional*, defaults to `True`):
Whether or not the model should return the last key/values attentions (not used by all models). Only
relevant if `config.is_decoder=True`.
pad_token_id (`int`, *optional*, defaults to 0)
Padding token id.
bos_token_id (`int`, *optional*, defaults to 1)
Beginning of stream token id.
eos_token_id (`int`, *optional*, defaults to 2)
End of stream token id.
tie_word_embeddings(`bool`, *optional*, defaults to `False`):
Whether to tie weight embeddings
cross_layer_interval (`int`, *optional*, default to 1)
Interval for cross attention (from text to image) layers.
qk_layer_norms (`bool`, *optional*, defaults to `False`): Whether to add layer norm after q and k
freeze_text_layers (`bool`, *optional*, defaults to `True`): Whether to freeze text layers
freeze_text_module_exceptions (`bool`, *optional*, defaults to `[]`):
Exceptions to freezing text layers when `freeze_text_layers` is `True`
freeze_lm_head (`bool`, *optional*, defaults to `False`): Whether to freeze lm head
freeze_vision_layers (`bool`, *optional*, defaults to `True`): Whether to freeze vision layers
freeze_vision_module_exceptions (`bool`, *optional*, defaults to `[]`):
Exceptions to freezing vision layers when `freeze_vision_layers` is `True`
use_resampler (`bool`, *optional*, defaults to `False`): Whether to use the Resampler
vision_config (`IdeficsVisionConfig`, *optional*): Custom vision config or dict
perceiver_config (`IdeficsPerceiverConfig`, *optional*): Custom perceiver config or dict
Example:
```python
>>> from transformers import IdeficsModel, IdeficsConfig
>>> # Initializing a Idefics idefics-9b style configuration
>>> configuration = IdeficsConfig()
>>> # Initializing a model from the idefics-9b style configuration
>>> model = IdeficsModel(configuration)
>>> # Accessing the model configuration
>>> configuration = model.config
```"""
model_type = "idefics"
is_composition = False
def __init__(
self,
vocab_size=32000,
additional_vocab_size=0,
hidden_size=4096,
intermediate_size=11008,
num_hidden_layers=32,
num_attention_heads=32,
dropout=0.0,
hidden_act="silu",
initializer_range=0.02,
alpha_initializer="zeros",
alphas_initializer_range=0.0,
alpha_type="float",
rms_norm_eps=1e-6,
use_cache=True,
pad_token_id=0,
bos_token_id=1,
eos_token_id=2,
tie_word_embeddings=False,
cross_layer_interval=1,
qk_layer_norms=False,
freeze_text_layers=True,
freeze_text_module_exceptions=[],
freeze_lm_head=False,
freeze_vision_layers=True,
freeze_vision_module_exceptions=[],
use_resampler=False,
vision_config=None,
perceiver_config=None,
**kwargs,
):
self.vocab_size = vocab_size
self.additional_vocab_size = additional_vocab_size
self.hidden_size = hidden_size
self.intermediate_size = intermediate_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.dropout = dropout
self.hidden_act = hidden_act
self.initializer_range = initializer_range
self.alpha_initializer = alpha_initializer
self.alphas_initializer_range = alphas_initializer_range
self.alpha_type = alpha_type
self.rms_norm_eps = rms_norm_eps
self.use_cache = use_cache
self.cross_layer_interval = cross_layer_interval
self.qk_layer_norms = qk_layer_norms
self.freeze_vision_layers = freeze_vision_layers
self.freeze_text_layers = freeze_text_layers
self.freeze_text_module_exceptions = freeze_text_module_exceptions
self.freeze_vision_module_exceptions = freeze_vision_module_exceptions
self.freeze_lm_head = freeze_lm_head
self.use_resampler = use_resampler
if perceiver_config is None:
self.perceiver_config = IdeficsPerceiverConfig()
elif isinstance(perceiver_config, dict):
self.perceiver_config = IdeficsPerceiverConfig(**perceiver_config)
elif isinstance(perceiver_config, IdeficsPerceiverConfig):
self.perceiver_config = perceiver_config
if vision_config is None:
self.vision_config = IdeficsVisionConfig()
elif isinstance(vision_config, dict):
self.vision_config = IdeficsVisionConfig(**vision_config)
elif isinstance(vision_config, IdeficsVisionConfig):
self.vision_config = vision_config
super().__init__(
pad_token_id=pad_token_id,
bos_token_id=bos_token_id,
eos_token_id=eos_token_id,
tie_word_embeddings=tie_word_embeddings,
**kwargs,
)
# IMPORTANT: Do not do any __init__ args-based checks in the constructor, since
# PretrainedConfig.from_dict first instantiates the class with the config dict and only then
# updates the config object with `kwargs` from from_pretrained, so during the instantiation
# of this object many attributes have default values and haven't yet been overridden.
# Do any required checks inside `from_pretrained` once the superclass' `from_pretrained` was run.
# coding=utf-8
# Copyright 2022 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Image processor class for Idefics."""
from typing import Callable, Dict, List, Optional, Union
from PIL import Image
from ...image_processing_utils import BaseImageProcessor, BatchFeature
from ...image_transforms import resize, to_channel_dimension_format
from ...image_utils import (
ChannelDimension,
ImageInput,
PILImageResampling,
make_list_of_images,
to_numpy_array,
valid_images,
)
from ...utils import TensorType, is_torch_available
IDEFICS_STANDARD_MEAN = [0.48145466, 0.4578275, 0.40821073]
IDEFICS_STANDARD_STD = [0.26862954, 0.26130258, 0.27577711]
def convert_to_rgb(image):
# `image.convert("RGB")` would only work for .jpg images, as it creates a wrong background
# for transparent images. The call to `alpha_composite` handles this case
if image.mode == "RGB":
return image
image_rgba = image.convert("RGBA")
background = Image.new("RGBA", image_rgba.size, (255, 255, 255))
alpha_composite = Image.alpha_composite(background, image_rgba)
alpha_composite = alpha_composite.convert("RGB")
return alpha_composite
class IdeficsImageProcessor(BaseImageProcessor):
r"""
Constructs a Idefics image processor.
Args:
image_size (`int`, *optional*, defaults to `224`):
Resize to image size
image_num_channels (`int`, *optional*, defaults to `3`):
Number of image channels.
image_mean (`float` or `List[float]`, *optional*, defaults to `IDEFICS_STANDARD_MEAN`):
Mean to use if normalizing the image. This is a float or list of floats the length of the number of
channels in the image. Can be overridden by the `image_mean` parameter in the `preprocess` method. Can be
overridden by the `image_mean` parameter in the `preprocess` method.
image_std (`float` or `List[float]`, *optional*, defaults to `IDEFICS_STANDARD_STD`):
Standard deviation to use if normalizing the image. This is a float or list of floats the length of the
number of channels in the image. Can be overridden by the `image_std` parameter in the `preprocess` method.
Can be overridden by the `image_std` parameter in the `preprocess` method.
"""
model_input_names = ["pixel_values"]
def __init__(
self,
image_size: int = 224,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
image_num_channels: Optional[int] = 3,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.image_size = image_size
self.image_num_channels = image_num_channels
self.image_mean = image_mean
self.image_std = image_std
def preprocess(
self,
images: ImageInput,
image_num_channels: Optional[int] = 3,
image_size: Optional[Dict[str, int]] = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
transform: Callable = None,
**kwargs,
) -> TensorType.PYTORCH:
"""
Preprocess a batch of images.
Args:
images (`ImageInput`):
A list of images to preprocess.
image_size (`int`, *optional*, defaults to `self.image_size`):
Resize to image size
image_num_channels (`int`, *optional*, defaults to `self.image_num_channels`):
Number of image channels.
image_mean (`float` or `List[float]`, *optional*, defaults to `IDEFICS_STANDARD_MEAN`):
Mean to use if normalizing the image. This is a float or list of floats the length of the number of
channels in the image. Can be overridden by the `image_mean` parameter in the `preprocess` method. Can
be overridden by the `image_mean` parameter in the `preprocess` method.
image_std (`float` or `List[float]`, *optional*, defaults to `IDEFICS_STANDARD_STD`):
Standard deviation to use if normalizing the image. This is a float or list of floats the length of the
number of channels in the image. Can be overridden by the `image_std` parameter in the `preprocess`
method. Can be overridden by the `image_std` parameter in the `preprocess` method.
transform (`Callable`, *optional*, defaults to `None`):
A custom transform function that accepts a single image can be passed for training. For example,
`torchvision.Compose` can be used to compose multiple transforms. If `None` - an inference mode is
assumed - and then a preset of inference-specific transforms will be applied to the images
Returns:
a PyTorch tensor of the processed images
"""
image_size = image_size if image_size is not None else self.image_size
image_num_channels = image_num_channels if image_num_channels is not None else self.image_num_channels
image_mean = image_mean if image_mean is not None else self.image_mean
image_std = image_std if image_std is not None else self.image_std
size = (image_size, image_size)
if isinstance(images, list) and len(images) == 0:
return []
images = make_list_of_images(images)
if not valid_images(images):
raise ValueError(
"Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
"torch.Tensor, tf.Tensor or jax.ndarray."
)
# For training a user needs to pass their own set of transforms as a Callable.
# For reference this is what was used in the original IDEFICS training:
# transform = transforms.Compose([
# convert_to_rgb,
# transforms.RandomResizedCrop((size, size), scale=(0.9, 1.0), interpolation=transforms.InterpolationMode.BICUBIC),
# transforms.ToTensor(),
# transforms.Normalize(mean=image_mean, std=image_std),
# ])
if transform is not None:
if not is_torch_available():
raise ImportError("To pass in `transform` torch must be installed")
import torch
images = [transform(x) for x in images]
return torch.stack(images)
# for inference we do the exact transforms that were used to train IDEFICS
images = [convert_to_rgb(x) for x in images]
# further transforms expect numpy arrays
images = [to_numpy_array(x) for x in images]
images = [resize(x, size, resample=PILImageResampling.BICUBIC) for x in images]
images = [self.rescale(image=image, scale=1 / 255) for image in images]
images = [self.normalize(x, mean=image_mean, std=image_std) for x in images]
images = [to_channel_dimension_format(x, ChannelDimension.FIRST) for x in images]
# TODO: this converts to torch tensors - switch to convert_to_tensors once it becomes available
images = BatchFeature(data={"pixel_values": images}, tensor_type=TensorType.PYTORCH)["pixel_values"]
return images
# coding=utf-8
# Copyright 2022 EleutherAI and the HuggingFace Inc. team. All rights reserved.
#
# This code is based on EleutherAI's GPT-NeoX library and the GPT-NeoX
# and OPT implementations in this library. It has been modified from its
# original forms to accommodate minor architectural differences compared
# to GPT-NeoX and OPT used by the Meta AI team that trained the model.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" PyTorch Idefics model."""
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union
import torch
import torch.nn.functional as F
import torch.utils.checkpoint
from torch import nn
from torch.nn import CrossEntropyLoss
from ... import PreTrainedModel
from ...activations import ACT2FN
from ...modeling_outputs import ModelOutput
from ...modeling_utils import PretrainedConfig
from ...utils import (
add_start_docstrings,
add_start_docstrings_to_model_forward,
logging,
replace_return_docstrings,
)
from .configuration_idefics import IdeficsConfig
from .perceiver import IdeficsPerceiverResampler
from .vision import IdeficsVisionTransformer
logger = logging.get_logger(__name__)
_CONFIG_FOR_DOC = "IdeficsConfig"
IDEFICS_PRETRAINED_MODEL_ARCHIVE_LIST = [
"HuggingFaceM4/idefics-9b",
"HuggingFaceM4/idefics-80b",
# See all Idefics models at https://huggingface.co/models?filter=idefics
]
@dataclass
class IdeficsBaseModelOutputWithPast(ModelOutput):
"""
Base class for Idefics model's outputs that may also contain a past key/values (to speed up sequential decoding).
Args:
last_hidden_state (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
Sequence of hidden-states at the output of the last layer of the model.
If `past_key_values` is used only the last hidden-state of the sequences of shape `(batch_size, 1,
hidden_size)` is output.
past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
`(batch_size, num_heads, sequence_length, embed_size_per_head)`) and optionally if
`config.is_encoder_decoder=True` 2 additional tensors of shape `(batch_size, num_heads,
encoder_sequence_length, embed_size_per_head)`.
Contains pre-computed hidden-states (key and values in the self-attention blocks and optionally if
`config.is_encoder_decoder=True` in the cross-attention blocks) that can be used (see `past_key_values`
input) to speed up sequential decoding.
hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.
Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
sequence_length)`.
Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
heads.
image_hidden_states (`tuple(torch.FloatTensor)`, *optional*):
Tuple of `torch.FloatTensor` (one for the output of the image embeddings, `(batch_size, num_images,
sequence_length, hidden_size)`.
image_hidden_states of the model produced by the vision encoder, and optionally by the perceiver
"""
last_hidden_state: torch.FloatTensor = None
past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
hidden_states: Optional[Tuple[torch.FloatTensor]] = None
attentions: Optional[Tuple[torch.FloatTensor]] = None
image_hidden_states: Optional[Tuple[torch.FloatTensor]] = None
@dataclass
class IdeficsCausalLMOutputWithPast(ModelOutput):
"""
Base class for Idefics causal language model (or autoregressive) outputs.
Args:
loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
Language modeling loss (for next-token prediction).
logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
`(batch_size, num_heads, sequence_length, embed_size_per_head)`)
Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
`past_key_values` input) to speed up sequential decoding.
hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.
Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
sequence_length)`.
Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
heads.
image_hidden_states (`tuple(torch.FloatTensor)`, *optional*):
Tuple of `torch.FloatTensor` (one for the output of the image embeddings, `(batch_size, num_images,
sequence_length, hidden_size)`.
image_hidden_states of the model produced by the vision encoder, and optionally by the perceiver
"""
loss: Optional[torch.FloatTensor] = None
logits: torch.FloatTensor = None
past_key_values: Optional[List[torch.FloatTensor]] = None
hidden_states: Optional[Tuple[torch.FloatTensor]] = None
attentions: Optional[Tuple[torch.FloatTensor]] = None
image_hidden_states: Optional[Tuple[torch.FloatTensor]] = None
def expand_inputs_for_generation(
input_ids,
expand_size=1,
is_encoder_decoder=False,
attention_mask=None,
encoder_outputs=None,
**model_kwargs,
):
expanded_return_idx = (
torch.arange(input_ids.shape[0]).view(-1, 1).repeat(1, expand_size).view(-1).to(input_ids.device)
)
input_ids = input_ids.index_select(0, expanded_return_idx)
model_kwargs["pixel_values"] = model_kwargs.get("pixel_values", None)
model_kwargs["image_encoder_embeddings"] = model_kwargs.get("image_encoder_embeddings", None)
model_kwargs["perceiver_embeddings"] = model_kwargs.get("perceiver_embeddings", None)
model_kwargs["image_attention_mask"] = model_kwargs.get("image_attention_mask", None)
if "token_type_ids" in model_kwargs:
token_type_ids = model_kwargs["token_type_ids"]
model_kwargs["token_type_ids"] = token_type_ids.index_select(0, expanded_return_idx)
if attention_mask is not None:
model_kwargs["attention_mask"] = attention_mask.index_select(0, expanded_return_idx)
if model_kwargs["image_attention_mask"] is not None:
model_kwargs["image_attention_mask"] = model_kwargs["image_attention_mask"].index_select(
0, expanded_return_idx
)
if model_kwargs["pixel_values"] is not None:
model_kwargs["pixel_values"] = model_kwargs["pixel_values"].index_select(0, expanded_return_idx)
elif model_kwargs["image_encoder_embeddings"] is not None:
model_kwargs["image_encoder_embeddings"] = model_kwargs["image_encoder_embeddings"].index_select(
0, expanded_return_idx
)
elif model_kwargs["perceiver_embeddings"] is not None:
model_kwargs["perceiver_embeddings"] = model_kwargs["perceiver_embeddings"].index_select(
0, expanded_return_idx
)
return input_ids, model_kwargs
def update_model_kwargs_for_generation(outputs, model_kwargs):
# must have this key set to at least None
if "past_key_values" in outputs:
model_kwargs["past_key_values"] = outputs.past_key_values
else:
model_kwargs["past_key_values"] = None
# update token_type_ids with last value
if "token_type_ids" in model_kwargs:
token_type_ids = model_kwargs["token_type_ids"]
model_kwargs["token_type_ids"] = torch.cat([token_type_ids, token_type_ids[:, -1].unsqueeze(-1)], dim=-1)
# update attention masks
if "attention_mask" in model_kwargs:
attention_mask = model_kwargs["attention_mask"]
model_kwargs["attention_mask"] = torch.cat(
[attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))], dim=-1
)
if "image_attention_mask" in model_kwargs:
image_attention_mask = model_kwargs["image_attention_mask"]
last_mask = image_attention_mask[:, -1, :].unsqueeze(1)
model_kwargs["image_attention_mask"] = last_mask
# Get the precomputed image_hidden_states
model_kwargs["image_hidden_states"] = outputs.image_hidden_states
return model_kwargs
def prepare_inputs_for_generation(input_ids, past_key_values=None, **kwargs):
token_type_ids = kwargs.get("token_type_ids", None)
# only last token for inputs_ids if past is defined in kwargs
if past_key_values:
input_ids = input_ids[:, -1].unsqueeze(-1)
if token_type_ids is not None:
token_type_ids = token_type_ids[:, -1].unsqueeze(-1)
attention_mask = kwargs.get("attention_mask", None)
position_ids = kwargs.get("position_ids", None)
if attention_mask is not None and position_ids is None:
# create position_ids on the fly for batch generation
position_ids = attention_mask.long().cumsum(-1) - 1
position_ids.masked_fill_(attention_mask == 0, 1)
if past_key_values:
position_ids = position_ids[:, -1].unsqueeze(-1)
pixel_values = kwargs.get("pixel_values", None)
image_encoder_embeddings = kwargs.get("image_encoder_embeddings", None)
perceiver_embeddings = kwargs.get("perceiver_embeddings", None)
image_attention_mask = kwargs.get("image_attention_mask", None)
return {
"input_ids": input_ids,
"past_key_values": past_key_values,
"use_cache": kwargs.get("use_cache"),
"position_ids": position_ids,
"attention_mask": attention_mask,
"token_type_ids": token_type_ids,
"pixel_values": pixel_values,
"image_encoder_embeddings": image_encoder_embeddings,
"perceiver_embeddings": perceiver_embeddings,
"image_attention_mask": image_attention_mask,
}
def freeze_model(model, module_exceptions=[]):
mapping = {
"LayerNorm": nn.LayerNorm,
"Linear": nn.Linear,
"Embedding": nn.Embedding,
}
module_exceptions_mapped = [mapping[m] for m in module_exceptions]
for module in model.modules():
if module_exceptions and any([isinstance(module, t) for t in module_exceptions_mapped]):
module.requires_grad_(True) # Explicitely setting it to true to avoid any mistakes
else:
module.requires_grad_(False)
return model
class IdeficsDecoupledEmbedding(nn.Embedding):
# Derived from https://pytorch.org/docs/stable/_modules/torch/nn/modules/sparse.html#Embedding
"""
Implements a decoupling of parameters to allow freezing (or not) a subset of the embeddings. In practise, the
regular `weight` can be trained or frozen (i.e. `partially_freeze=True`), and if `num_additional_embeddings` > 0,
then it will create `num_additional_embeddings` additional parameters that are always trained. If
`num_additional_embeddings=0`, then the module defaults back to the regular behavior of `nn.Embedding`.
"""
def __init__(
self,
num_embeddings,
num_additional_embeddings,
embedding_dim,
partially_freeze: Optional[bool] = False,
device=None,
dtype=None,
padding_idx=None,
**kwargs,
) -> None:
"""
Args:
num_embeddings (`int`):
Size of the dictionary of embeddings
num_additional_embeddings (`int`):
Number of additional embeddings. Only useful when you `partially_freeze=True`.
embedding_dim (`int`):
The size of each embedding vector
partially_freeze: (`bool`, *optional*, defaults to `False`):
If `True`, the regular `weight` will be frozen. `additional_weight` is never frozen.
padding_idx (`int`, *optional*):
The padding index (needs to be less than num_embeddings)
Note: there are a lot of other parameters to initialize a standard `nn.Embedding` such as `padding_idx`,
`max_norm` or `norm_type`. We are not supporting these.
"""
if padding_idx is not None and padding_idx > num_embeddings:
raise ValueError(f"padding_idx must be within num_embeddings. Got {padding_idx} and {num_embeddings}")
super().__init__(
num_embeddings=num_embeddings,
embedding_dim=embedding_dim,
device=device,
dtype=dtype,
padding_idx=padding_idx,
**kwargs,
)
self.num_embeddings = num_embeddings
self.padding_idx = padding_idx
self.num_additional_embeddings = num_additional_embeddings
self.partially_freeze = partially_freeze
if partially_freeze:
self.weight.requires_grad_(False)
if self.num_additional_embeddings > 0:
self.additional_embedding = nn.Embedding(
num_embeddings=self.num_additional_embeddings,
embedding_dim=embedding_dim,
device=device,
dtype=dtype,
)
def forward(self, input_ids):
"""
we have 2 embeddings, with different indices - one pretrained self.weight and another
self.additional_embedding.weight that is being trained.
in order to make a lookup of the input ids, we:
1. find out the indices of the entries belonging to the 2nd embedding
2. extract those values while subtracting the size of the first embedding (num_embeddings), since the 2nd
embedding starts from 0 and not num_embeddings
3. perform the 2nd embedding lookup
4. now we handle the 1st embedding, we overwrite indices belonging to the 2nd embedding with a padding index
5. perform the 1st embedding lookup
6. now we overwrite the values in the 1st embedding lookup with the values of the 2nd embedding lookup
note: for the 1st embedding lookup we could have looked up only the low indices and not do the padding, but
then we have to create a new tensor and populate it with 2 tensors that are spread out across various indices -
i.e. not a simple concat - I haven't benchmarked the complex case if it's any faster, given that seqlens are
usually relatively short it's probably not faster or if faster not by much - but might be a good idea to
measure.
"""
if self.num_additional_embeddings == 0:
return F.embedding(input_ids, self.weight)
# Clone so that we don't modify the original input_ids later on
input_ids = input_ids.clone()
additional_vocab_indices = torch.where(input_ids >= self.num_embeddings)
input_ids_additional_vocab = input_ids[additional_vocab_indices]
additional_embeddings = self.additional_embedding(input_ids_additional_vocab - self.num_embeddings)
# for successful lookup replace input_ids with 0, the results of these will be discarded anyway
input_ids[additional_vocab_indices] = 0
full_vector = F.embedding(input_ids, self.weight)
# overwrite the records with high indices
full_vector[additional_vocab_indices] = additional_embeddings
return full_vector
def extra_repr(self) -> str:
return "num_embeddings={}, num_additional_embeddings={}, embedding_dim={}, partially_freeze={}".format(
self.num_embeddings,
self.num_additional_embeddings,
self.embedding_dim,
self.partially_freeze,
)
class IdeficsDecoupledLinear(nn.Linear):
# Derived from https://pytorch.org/docs/stable/_modules/torch/nn/modules/linear.html#Linear
"""
Implements a decoupling of parameters to allow freezing (or not) a subset of the parameters. In practise, the
regular `weight` can be trained or frozen (i.e. `partially_freeze=True`), and if `out_additional_features` > 0,
then it will create `out_additional_features * in_features` additional parameters that are always trained. If
`out_additional_features=0`, then the module defaults back to the regular behavior of `nn.Linear`.
"""
def __init__(
self,
in_features: int,
out_features: int,
out_additional_features: int = 0,
bias: bool = True,
partially_freeze: bool = True,
device=None,
dtype=None,
) -> None:
"""
out_additional_features: int. Number of additional trainable dimensions. Only makes sense when
`partially_freeze=True`. partially_freeze: bool. If True, the regular `weight` will be frozen and extra
parameters (if any) will be trainable. If False, default to the regular behavior of nn.Linear.
"""
super().__init__(in_features, out_features, bias, device, dtype)
self.out_additional_features = out_additional_features
self.partially_freeze = partially_freeze
self.in_features = in_features
self.out_features = out_features
if partially_freeze:
self.weight.requires_grad_(False)
if bias:
self.bias.requires_grad_(False)
if out_additional_features > 0:
self.additional_fc = nn.Linear(
in_features=in_features,
out_features=out_additional_features,
bias=bias,
device=device,
dtype=dtype,
)
def forward(self, input: torch.Tensor) -> torch.Tensor:
output = F.linear(input, self.weight, self.bias)
if self.out_additional_features > 0:
additional_features = F.linear(input, self.additional_fc.weight, self.additional_fc.bias)
output = torch.cat((output, additional_features), -1)
return output
def extra_repr(self) -> str:
"""Overwriting `nn.Linear.extra_repr` to include new parameters."""
return "in_features={}, out_features={}, out_additional_features={}, bias={}, partially_freeze={}".format(
self.in_features,
self.out_features,
self.out_additional_features,
self.bias is not None,
self.partially_freeze,
)
# Copied from transformers.models.bart.modeling_bart._make_causal_mask
def _make_causal_mask(
input_ids_shape: torch.Size, dtype: torch.dtype, device: torch.device, past_key_values_length: int = 0
):
"""
Make causal mask used for bi-directional self-attention.
"""
bsz, tgt_len = input_ids_shape
mask = torch.full((tgt_len, tgt_len), torch.finfo(dtype).min, device=device)
mask_cond = torch.arange(mask.size(-1), device=device)
mask.masked_fill_(mask_cond < (mask_cond + 1).view(mask.size(-1), 1), 0)
mask = mask.to(dtype)
if past_key_values_length > 0:
mask = torch.cat([torch.zeros(tgt_len, past_key_values_length, dtype=dtype, device=device), mask], dim=-1)
return mask[None, None, :, :].expand(bsz, 1, tgt_len, tgt_len + past_key_values_length)
def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int] = None):
"""
Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`.
"""
bsz, src_len = mask.size()
tgt_len = tgt_len if tgt_len is not None else src_len
expanded_mask = mask[:, None, None, :].expand(bsz, 1, tgt_len, src_len).to(dtype)
inverted_mask = 1.0 - expanded_mask
return inverted_mask.masked_fill(inverted_mask.to(torch.bool), torch.finfo(dtype).min)
# this was adapted from LlamaRMSNorm
class IdeficsRMSNorm(nn.Module):
def __init__(self, hidden_size, eps=1e-6):
"""
IdeficsRMSNorm is equivalent to T5LayerNorm
"""
super().__init__()
self.weight = nn.Parameter(torch.ones(hidden_size))
self.variance_epsilon = eps
def forward(self, hidden_states):
variance = hidden_states.to(torch.float32).pow(2).mean(-1, keepdim=True)
hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
# convert into half-precision if necessary
if self.weight.dtype in [torch.float16, torch.bfloat16]:
hidden_states = hidden_states.to(self.weight.dtype)
return self.weight * hidden_states
# this was adapted from LlamaRotaryEmbedding
class IdeficsEmbedding(torch.nn.Module):
def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
super().__init__()
inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2).float().to(device) / dim))
self.register_buffer("inv_freq", inv_freq)
# Build here to make `torch.jit.trace` work.
self.max_seq_len_cached = max_position_embeddings
t = torch.arange(self.max_seq_len_cached, device=self.inv_freq.device, dtype=self.inv_freq.dtype)
freqs = torch.einsum("i,j->ij", t, self.inv_freq)
# Different from paper, but it uses a different permutation in order to obtain the same calculation
emb = torch.cat((freqs, freqs), dim=-1)
self.register_buffer("cos_cached", emb.cos()[None, None, :, :], persistent=False)
self.register_buffer("sin_cached", emb.sin()[None, None, :, :], persistent=False)
def forward(self, x, seq_len=None):
# x: [bs, num_attention_heads, seq_len, head_size]
# This `if` block is unlikely to be run after we build sin/cos in `__init__`. Keep the logic here just in case.
if seq_len > self.max_seq_len_cached:
self.max_seq_len_cached = seq_len
t = torch.arange(self.max_seq_len_cached, device=x.device, dtype=self.inv_freq.dtype)
freqs = torch.einsum("i,j->ij", t, self.inv_freq)
# Different from paper, but it uses a different permutation in order to obtain the same calculation
emb = torch.cat((freqs, freqs), dim=-1).to(x.device)
self.register_buffer("cos_cached", emb.cos()[None, None, :, :], persistent=False)
self.register_buffer("sin_cached", emb.sin()[None, None, :, :], persistent=False)
return (
self.cos_cached[:, :, :seq_len, ...].to(dtype=x.dtype),
self.sin_cached[:, :, :seq_len, ...].to(dtype=x.dtype),
)
def rotate_half(x):
"""Rotates half the hidden dims of the input."""
x1 = x[..., : x.shape[-1] // 2]
x2 = x[..., x.shape[-1] // 2 :]
return torch.cat((-x2, x1), dim=-1)
def apply_rotary_pos_emb(q, k, cos, sin, position_ids):
gather_indices = position_ids[:, None, :, None] # [bs, 1, seq_len, 1]
gather_indices = gather_indices.repeat(1, cos.shape[1], 1, cos.shape[3])
cos = torch.gather(cos.repeat(gather_indices.shape[0], 1, 1, 1), 2, gather_indices)
sin = torch.gather(sin.repeat(gather_indices.shape[0], 1, 1, 1), 2, gather_indices)
q_embed = (q * cos) + (rotate_half(q) * sin)
k_embed = (k * cos) + (rotate_half(k) * sin)
return q_embed, k_embed
# this was adapted from LlamaMLP
class IdeficsMLP(nn.Module):
def __init__(
self,
hidden_size: int,
intermediate_size: int,
hidden_act: str,
):
super().__init__()
self.gate_proj = nn.Linear(hidden_size, intermediate_size, bias=False)
self.down_proj = nn.Linear(intermediate_size, hidden_size, bias=False)
self.up_proj = nn.Linear(hidden_size, intermediate_size, bias=False)
self.act_fn = ACT2FN[hidden_act]
def forward(self, x):
return self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x))
# this was adapted from LlamaAttention
class IdeficsAttention(nn.Module):
"""Multi-headed attention from 'Attention Is All You Need' paper"""
def __init__(
self,
hidden_size: int,
num_heads: int,
dropout: float = 0.0,
is_cross_attention: bool = False,
config: PretrainedConfig = None,
qk_layer_norms: bool = False,
):
super().__init__()
self.hidden_size = hidden_size
self.num_heads = num_heads
self.head_dim = hidden_size // num_heads
self.dropout = dropout
if (self.head_dim * num_heads) != self.hidden_size:
raise ValueError(
f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}"
f" and `num_heads`: {num_heads})."
)
self.is_cross_attention = is_cross_attention
if not hasattr(nn.functional, "scaled_dot_product_attention"):
raise ValueError("this model requires pytorch 2.0 or higher")
if self.is_cross_attention:
kv_input_dim = (
self.hidden_size if not hasattr(config.vision_config, "embed_dim") else config.vision_config.embed_dim
)
self.q_proj = nn.Linear(
self.hidden_size,
num_heads * self.head_dim,
bias=False,
)
self.k_proj = nn.Linear(kv_input_dim, num_heads * self.head_dim, bias=False)
self.v_proj = nn.Linear(
kv_input_dim,
num_heads * self.head_dim,
bias=False,
)
else:
self.q_proj = nn.Linear(
self.hidden_size,
num_heads * self.head_dim,
bias=False,
)
self.k_proj = nn.Linear(
self.hidden_size,
num_heads * self.head_dim,
bias=False,
)
self.v_proj = nn.Linear(
self.hidden_size,
num_heads * self.head_dim,
bias=False,
)
self.o_proj = nn.Linear(
num_heads * self.head_dim,
hidden_size,
bias=False,
)
self.rotary_emb = IdeficsEmbedding(self.head_dim)
self.qk_layer_norms = qk_layer_norms
if self.qk_layer_norms:
self.q_layer_norm = IdeficsRMSNorm(self.head_dim, eps=config.rms_norm_eps)
self.k_layer_norm = IdeficsRMSNorm(self.head_dim, eps=config.rms_norm_eps)
def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous()
def forward(
self,
hidden_states: torch.Tensor,
key_value_states: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_value: Optional[Tuple[torch.Tensor]] = None,
output_attentions: bool = False,
use_cache: bool = False,
) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
# if key_value_states are provided this layer is used as a cross-attention layer
is_cross_attention = self.is_cross_attention or key_value_states is not None
bsz, q_len, _ = hidden_states.size()
query_states = self.q_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
if not is_cross_attention:
key_states = self.k_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
value_states = self.v_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
else:
_, kv_len, _ = key_value_states.size() # Note that, in this case, `kv_len` == `kv_seq_len`
key_states = self.k_proj(key_value_states).view(bsz, kv_len, self.num_heads, self.head_dim).transpose(1, 2)
value_states = (
self.v_proj(key_value_states).view(bsz, kv_len, self.num_heads, self.head_dim).transpose(1, 2)
)
kv_seq_len = key_states.shape[-2]
if past_key_value is not None:
kv_seq_len += past_key_value[0].shape[-2]
if not is_cross_attention:
cos, sin = self.rotary_emb(value_states, seq_len=max(kv_seq_len, q_len))
query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)
# [bsz, nh, t, hd]
if past_key_value is not None:
# reuse k, v, self_attention
key_states = torch.cat([past_key_value[0], key_states], dim=2)
value_states = torch.cat([past_key_value[1], value_states], dim=2)
past_key_value = (key_states, value_states) if use_cache else None
if self.qk_layer_norms:
query_states = self.q_layer_norm(query_states)
key_states = self.k_layer_norm(key_states)
if attention_mask is not None:
if attention_mask.size() != (bsz, 1, q_len, kv_seq_len):
raise ValueError(
f"Attention mask should be of size {(bsz, 1, q_len, kv_seq_len)}, but is {attention_mask.size()}"
)
attn_output = nn.functional.scaled_dot_product_attention(
query_states,
key_states,
value_states,
attn_mask=attention_mask,
dropout_p=self.dropout,
)
if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim):
raise ValueError(
f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is"
f" {attn_output.size()}"
)
attn_output = attn_output.transpose(1, 2)
attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)
attn_output = self.o_proj(attn_output)
attn_weights = None
if output_attentions:
logger.warning_once(
"attn_weights are not extracted in scaled_dot_product_attention. The model returns None instead"
)
return attn_output, attn_weights, past_key_value
# this was adapted from LlamaDecoderLayer
class IdeficsDecoderLayer(nn.Module):
def __init__(self, config: IdeficsConfig):
super().__init__()
self.hidden_size = config.hidden_size
self.self_attn = IdeficsAttention(
hidden_size=self.hidden_size,
num_heads=config.num_attention_heads,
dropout=config.dropout,
config=config,
)
self.mlp = IdeficsMLP(
hidden_size=self.hidden_size,
intermediate_size=config.intermediate_size,
hidden_act=config.hidden_act,
)
self.input_layernorm = IdeficsRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.post_attention_layernorm = IdeficsRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.dropout = config.dropout
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_value: Optional[Tuple[torch.Tensor]] = None,
output_attentions: Optional[bool] = False,
use_cache: Optional[bool] = False,
) -> Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]:
"""
Args:
hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
attention_mask (`torch.FloatTensor`, *optional*): attention mask of size
`(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under
returned tensors for more detail.
use_cache (`bool`, *optional*):
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding
(see `past_key_values`).
past_key_value (`Tuple(torch.FloatTensor)`, *optional*): cached past key and value projection states
"""
residual = hidden_states
hidden_states = self.input_layernorm(hidden_states)
# Self Attention
hidden_states, self_attn_weights, present_key_value = self.self_attn(
hidden_states=hidden_states,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_value=past_key_value,
output_attentions=output_attentions,
use_cache=use_cache,
)
hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
hidden_states = residual + hidden_states
# Fully Connected
residual = hidden_states
hidden_states = self.post_attention_layernorm(hidden_states)
hidden_states = self.mlp(hidden_states)
hidden_states = nn.functional.dropout(hidden_states, p=self.dropout, training=self.training)
hidden_states = residual + hidden_states
outputs = (hidden_states,)
if output_attentions:
outputs += (self_attn_weights,)
if use_cache:
outputs += (present_key_value,)
return outputs
class IdeficsGatedCrossAttentionLayer(nn.Module):
def __init__(self, config: IdeficsConfig):
super().__init__()
self.hidden_size = config.hidden_size
self.cross_attn = IdeficsAttention(
hidden_size=self.hidden_size,
num_heads=config.num_attention_heads,
is_cross_attention=True,
dropout=config.dropout,
config=config,
qk_layer_norms=config.qk_layer_norms,
)
self.mlp = IdeficsMLP(
hidden_size=self.hidden_size,
intermediate_size=config.intermediate_size,
hidden_act=config.hidden_act,
)
self.input_layernorm = IdeficsRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.post_attention_layernorm = IdeficsRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.config = config.dropout
self.act_cross_attn = nn.Tanh()
self.act_dense = nn.Tanh()
if config.alpha_initializer == "zeros":
if config.alpha_type == "vector":
self.alpha_cross_attn = nn.Parameter(torch.zeros(1, 1, self.hidden_size))
self.alpha_dense = nn.Parameter(torch.zeros(1, 1, self.hidden_size))
elif config.alpha_type == "float":
self.alpha_cross_attn = nn.Parameter(torch.zeros(1))
self.alpha_dense = nn.Parameter(torch.zeros(1))
else:
raise ValueError(f"Unknown value for `alpha_type` ({config.alpha_type})")
elif config.alpha_initializer == "ones":
if config.alpha_type == "vector":
self.alpha_cross_attn = nn.Parameter(torch.ones(1, 1, self.hidden_size))
self.alpha_dense = nn.Parameter(torch.ones(1, 1, self.hidden_size))
elif config.alpha_type == "float":
self.alpha_cross_attn = nn.Parameter(torch.ones(1))
self.alpha_dense = nn.Parameter(torch.ones(1))
else:
raise ValueError(f"Unknown value for `alpha_type` ({config.alpha_type})")
elif config.alpha_initializer in {"normal", "gaussian", "random"}:
if config.alpha_type == "vector":
self.alpha_cross_attn = nn.Parameter(
torch.normal(mean=0.0, std=config.alphas_initializer_range, size=(1, 1, self.hidden_size))
)
self.alpha_dense = nn.Parameter(
torch.normal(mean=0.0, std=config.alphas_initializer_range, size=(1, 1, self.hidden_size))
)
elif config.alpha_type == "float":
self.alpha_cross_attn = nn.Parameter(
torch.normal(mean=0.0, std=config.alphas_initializer_range, size=(1))
)
self.alpha_dense = nn.Parameter(torch.normal(mean=0.0, std=config.alphas_initializer_range, size=(1)))
else:
raise ValueError(f"Unknown value for `alpha_type` ({config.alpha_type})")
else:
raise NotImplementedError(f"Alpha initialization scheme {config.alpha_initializer} not yet implemented!")
if not (hasattr(self, "alpha_cross_attn") and hasattr(self, "alpha_dense")):
raise ValueError("Alpha parameters not initialized correctly!")
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
image_hidden_states: Optional[torch.Tensor] = None,
image_attention_mask: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = False,
use_cache: Optional[bool] = False,
past_key_value: Optional[Tuple[torch.Tensor]] = None,
no_images: Optional[bool] = False,
) -> Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]:
"""
Args:
hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
attention_mask (`torch.FloatTensor`, *optional*): attention mask of size
`(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under
returned tensors for more detail.
use_cache (`bool`, *optional*):
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding
(see `past_key_values`).
past_key_value (`Tuple(torch.FloatTensor)`, *optional*): cached past key and value projection states
no_images (`bool`, *optional*, defaults to `False`): If `True` the vision part is ignored
"""
if image_hidden_states is None:
raise ValueError(
"`image_hidden_states` is required for Idefics cross attention module which are visual features to be"
" conditioned on."
)
if past_key_value is not None:
raise NotImplementedError("Past key value states are not implemented for Idefics cross attention module.")
residual = hidden_states
hidden_states = self.input_layernorm(hidden_states)
# Self Attention
hidden_states, self_attn_weights, present_key_value = self.cross_attn(
hidden_states=hidden_states,
key_value_states=image_hidden_states,
attention_mask=image_attention_mask,
output_attentions=output_attentions,
)
hidden_states = nn.functional.dropout(hidden_states, p=self.config, training=self.training)
# when there are no images the model is used in pure language mode
gate = 0 if no_images else 1
hidden_states = residual + gate * self.act_cross_attn(self.alpha_cross_attn) * hidden_states
# Fully Connected
residual = hidden_states
hidden_states = self.post_attention_layernorm(hidden_states)
hidden_states = self.mlp(hidden_states)
hidden_states = nn.functional.dropout(hidden_states, p=self.config, training=self.training)
hidden_states = residual + self.act_dense(self.alpha_dense) * hidden_states
outputs = (hidden_states,)
if output_attentions:
outputs += (self_attn_weights,)
if use_cache:
outputs += (present_key_value,)
return outputs
LLAMA_START_DOCSTRING = r"""
This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
library implements for all its model (such as downloading or saving, resizing the input embeddings, pruning heads
etc.)
This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage
and behavior.
Parameters:
config ([`IdeficsConfig`]):
Model configuration class with all the parameters of the model. Initializing with a config file does not
load the weights associated with the model, only the configuration. Check out the
[`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
@add_start_docstrings(
"The bare LLaMA Model outputting raw hidden-states without any specific head on top.",
LLAMA_START_DOCSTRING,
)
class IdeficsPreTrainedModel(PreTrainedModel):
config_class = IdeficsConfig
base_model_prefix = "model"
supports_gradient_checkpointing = True
_no_split_modules = ["IdeficsDecoderLayer", "IdeficsGatedCrossAttentionLayer"]
def _init_weights(self, module):
# important: this ported version of Idefics isn't meant for training from scratch - only
# inference and fine-tuning - so the proper init weights code has been removed - the m4 code
# base should be used for training from scratch and it contains the correct code.
std = self.config.initializer_range
if isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=std)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=std)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
def _set_gradient_checkpointing(self, module, value=False):
if isinstance(module, IdeficsModel):
module.gradient_checkpointing = value
LLAMA_INPUTS_DOCSTRING = r"""
Args:
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide
it.
Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
[`PreTrainedTokenizer.__call__`] for details.
[What are input IDs?](../glossary#input-ids)
attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
[`PreTrainedTokenizer.__call__`] for details.
If `past_key_values` is used, optionally only the last `decoder_input_ids` have to be input (see
`past_key_values`).
If you want to change padding behavior, you should read [`modeling_opt._prepare_decoder_attention_mask`]
and modify to your needs. See diagram 1 in [the paper](https://arxiv.org/abs/1910.13461) for more
information on the default strategy.
- 1 indicates the head is **not masked**,
- 0 indicates the head is **masked**.
position_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0,
config.n_positions - 1]`. [What are position IDs?](../glossary#position-ids)
past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
`(batch_size, num_heads, sequence_length, embed_size_per_head)`) and 2 additional tensors of shape
`(batch_size, num_heads, encoder_sequence_length, embed_size_per_head)`.
Contains pre-computed hidden-states (key and values in the self-attention blocks and in the cross-attention
blocks) that can be used (see `past_key_values` input) to speed up sequential decoding.
If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that
don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all
`decoder_input_ids` of shape `(batch_size, sequence_length)`.
inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
model's internal embedding lookup matrix.
use_cache (`bool`, *optional*):
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see
`past_key_values`).
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
@add_start_docstrings(
"The bare LLaMA Model outputting raw hidden-states without any specific head on top.",
LLAMA_START_DOCSTRING,
)
class IdeficsModel(IdeficsPreTrainedModel):
"""
Transformer decoder consisting of `config.num_hidden_layers` layers. Each layer is a [`IdeficsDecoderLayer`]
Args:
config: IdeficsConfig
"""
def __init__(self, config: IdeficsConfig):
super().__init__(config)
self.config = config
self.padding_idx = config.pad_token_id
self.vocab_size = config.vocab_size
self.embed_tokens = IdeficsDecoupledEmbedding(
num_embeddings=config.vocab_size,
num_additional_embeddings=config.additional_vocab_size,
embedding_dim=config.hidden_size,
partially_freeze=config.freeze_text_layers,
padding_idx=self.padding_idx,
)
self.image_size = config.vision_config.image_size
self.vision_config = config.vision_config
self.vision_model = IdeficsVisionTransformer(config.vision_config)
# Perceiver Resampler
if config.use_resampler:
perceiver_config = config.perceiver_config
self.perceiver_resampler = IdeficsPerceiverResampler(
config,
config.vision_config.embed_dim,
perceiver_config.resampler_depth,
perceiver_config.resampler_n_heads,
perceiver_config.resampler_head_dim,
perceiver_config.resampler_n_latents,
)
self.layers = nn.ModuleList([IdeficsDecoderLayer(config) for _ in range(config.num_hidden_layers)])
self.cross_layer_interval = config.cross_layer_interval
num_cross_layers = config.num_hidden_layers // self.cross_layer_interval
self.gated_cross_attn_layers = nn.ModuleList(
[IdeficsGatedCrossAttentionLayer(config) for _ in range(num_cross_layers)]
)
self.gradient_checkpointing = False
self.norm = IdeficsRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.gradient_checkpointing = False
# Initialize weights and apply final processing
self.post_init()
self.freeze_relevant_params(config)
def freeze_relevant_params(self, config=None):
if config is None:
config = self.config
if config.freeze_text_layers:
self.freeze_text_layers(config.freeze_text_module_exceptions)
if config.freeze_vision_layers:
freeze_model(self.vision_model, module_exceptions=config.freeze_vision_module_exceptions)
def freeze_text_layers(self, module_exceptions=[]):
for module in [self.layers, self.norm]:
freeze_model(module, module_exceptions=module_exceptions)
def freeze_vision_layers(self, module_exceptions=[]):
freeze_model(self.vision_model, module_exceptions=module_exceptions)
def get_input_embeddings(self):
return self.embed_tokens
def set_input_embeddings(self, value):
self.embed_tokens = value
# Copied from transformers.models.bart.modeling_bart.BartDecoder._prepare_decoder_attention_mask
def _prepare_decoder_attention_mask(self, attention_mask, input_shape, inputs_embeds, past_key_values_length):
# create causal mask
# [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
combined_attention_mask = None
if input_shape[-1] > 1:
combined_attention_mask = _make_causal_mask(
input_shape,
inputs_embeds.dtype,
device=inputs_embeds.device,
past_key_values_length=past_key_values_length,
)
if attention_mask is not None:
# [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
expanded_attn_mask = _expand_mask(attention_mask, inputs_embeds.dtype, tgt_len=input_shape[-1]).to(
inputs_embeds.device
)
combined_attention_mask = (
expanded_attn_mask if combined_attention_mask is None else expanded_attn_mask + combined_attention_mask
)
return combined_attention_mask
@add_start_docstrings_to_model_forward(LLAMA_INPUTS_DOCSTRING)
def forward(
self,
input_ids: torch.LongTensor = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[List[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
pixel_values: Optional[torch.FloatTensor] = None,
image_encoder_embeddings: Optional[torch.FloatTensor] = None,
perceiver_embeddings: Optional[torch.FloatTensor] = None,
image_attention_mask: Optional[torch.Tensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, IdeficsBaseModelOutputWithPast]:
device = input_ids.device if input_ids is not None else inputs_embeds.device
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
use_cache = use_cache if use_cache is not None else self.config.use_cache
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# retrieve input_ids and inputs_embeds
if input_ids is not None and inputs_embeds is not None:
raise ValueError("You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time")
elif input_ids is not None:
batch_size, seq_length = input_ids.shape
elif inputs_embeds is not None:
batch_size, seq_length, _ = inputs_embeds.shape
else:
raise ValueError("You have to specify either decoder_input_ids or decoder_inputs_embeds")
seq_length_with_past = seq_length
past_key_values_length = 0
if past_key_values is not None:
past_key_values_length = past_key_values[0][0].shape[2]
seq_length_with_past = seq_length_with_past + past_key_values_length
if attention_mask is not None and position_ids is None:
# create position_ids on the fly for batch generation
position_ids = attention_mask.long().cumsum(-1) - 1
position_ids.masked_fill_(attention_mask == 0, 1)
elif position_ids is None:
device = input_ids.device if input_ids is not None else inputs_embeds.device
position_ids = torch.arange(
past_key_values_length, seq_length + past_key_values_length, dtype=torch.long, device=device
)
position_ids = position_ids.unsqueeze(0).view(-1, seq_length)
else:
position_ids = position_ids.view(-1, seq_length).long()
no_images = False
if (pixel_values, image_encoder_embeddings, perceiver_embeddings).count(None) != 2:
raise ValueError(
"Exactly 1 of pixel_values, image_encoder_embeddings or perceiver_embeddings has to be not-None."
)
elif pixel_values is not None:
no_images = len(torch.nonzero(pixel_values)) == 0
pixel_values = pixel_values.to(dtype=self.dtype, device=device) # fp16 compatibility
batch_size, num_images = pixel_values.shape[:2]
pixel_values = pixel_values.contiguous().view(batch_size * num_images, *pixel_values.shape[2:])
# Get sequence from the vision encoder
image_hidden_states = self.vision_model(pixel_values=pixel_values).last_hidden_state
elif image_encoder_embeddings is not None:
batch_size, num_images, image_seq_len, image_hidden_size = image_encoder_embeddings.size()
image_hidden_states = image_encoder_embeddings.to(dtype=self.dtype, device=input_ids.device)
image_hidden_states = image_hidden_states.view(batch_size * num_images, image_seq_len, image_hidden_size)
if self.config.use_resampler:
if perceiver_embeddings is None:
perceiver_embeddings = self.perceiver_resampler(image_hidden_states)
image_seq_len, image_hidden_size = perceiver_embeddings.size(1), perceiver_embeddings.size(2)
else:
batch_size, num_images, image_seq_len, image_hidden_size = perceiver_embeddings.size()
image_hidden_states = perceiver_embeddings
elif perceiver_embeddings is None:
image_seq_len, image_hidden_size = image_hidden_states.size(1), image_hidden_states.size(2)
else:
raise ValueError("If `perceiver_embeddings` are passed, use_resampler should be True")
image_hidden_states = image_hidden_states.view(batch_size, num_images * image_seq_len, image_hidden_size)
# # Hack to use the model in full language modeling mode
# image_attention_mask = torch.zeros(batch_size, seq_length, 1, dtype=torch.long, device=image_hidden_states.device)
# Make image_attention_mask compatible with hidden states
text_seq_len = image_attention_mask.size(1)
image_attention_mask = image_attention_mask.unsqueeze(-1)
image_attention_mask = image_attention_mask.repeat(1, 1, 1, image_seq_len)
image_attention_mask = image_attention_mask.view(batch_size, text_seq_len, num_images * image_seq_len)
if image_hidden_states is not None:
image_batch_size, image_sequence_length, _ = image_hidden_states.size()
image_hidden_shape = (image_batch_size, image_sequence_length)
if image_attention_mask is None:
image_attention_mask = torch.ones(image_hidden_shape, device=device)
image_attention_mask = self.invert_attention_mask(image_attention_mask)
else:
image_attention_mask = None
if inputs_embeds is None:
inputs_embeds = self.embed_tokens(input_ids)
# embed positions
if attention_mask is None:
attention_mask = torch.ones(
(batch_size, seq_length_with_past), dtype=torch.bool, device=inputs_embeds.device
)
attention_mask = self._prepare_decoder_attention_mask(
attention_mask, (batch_size, seq_length), inputs_embeds, past_key_values_length
)
hidden_states = inputs_embeds
if self.gradient_checkpointing and self.training:
if use_cache:
logger.warning_once(
"`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
)
use_cache = False
# decoder layers
all_hidden_states = () if output_hidden_states else None
all_self_attns = () if output_attentions else None
next_decoder_cache = () if use_cache else None
for idx, decoder_layer in enumerate(self.layers):
if output_hidden_states:
all_hidden_states += (hidden_states,)
past_key_value = past_key_values[idx] if past_key_values is not None else None
def vblock(
main_block,
hidden_states,
attention_mask,
position_ids,
past_key_value,
image_hidden_states,
image_attention_mask,
output_attentions,
use_cache,
no_images,
layer_idx,
cross_layer_interval,
gated_cross_attn_layers,
):
# TODO(ls): Add cross attention values to respective lists
if layer_idx % cross_layer_interval == 0:
xblock = gated_cross_attn_layers[layer_idx // cross_layer_interval]
outputs = xblock(
hidden_states,
attention_mask=attention_mask,
image_hidden_states=image_hidden_states,
image_attention_mask=image_attention_mask,
output_attentions=output_attentions,
use_cache=use_cache,
past_key_value=None, # not implemented
no_images=no_images,
)
hidden_states = outputs[0]
layer_outputs = main_block(
hidden_states,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_value=past_key_value,
output_attentions=output_attentions,
use_cache=use_cache,
)
return layer_outputs
if self.gradient_checkpointing and self.training:
past_key_value = None
if use_cache:
logger.warning_once(
"`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
)
use_cache = False
layer_outputs = torch.utils.checkpoint.checkpoint(
vblock,
decoder_layer,
hidden_states,
attention_mask,
position_ids,
past_key_value,
image_hidden_states,
image_attention_mask,
output_attentions,
use_cache,
no_images,
idx,
self.cross_layer_interval,
self.gated_cross_attn_layers,
)
else:
layer_outputs = vblock(
decoder_layer,
hidden_states,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_value=past_key_value,
image_hidden_states=image_hidden_states,
image_attention_mask=image_attention_mask,
output_attentions=output_attentions,
use_cache=use_cache,
no_images=no_images,
layer_idx=idx,
cross_layer_interval=self.cross_layer_interval,
gated_cross_attn_layers=self.gated_cross_attn_layers,
)
hidden_states = layer_outputs[0]
if use_cache:
next_decoder_cache += (layer_outputs[2 if output_attentions else 1],)
if output_attentions:
all_self_attns += (layer_outputs[1],)
hidden_states = self.norm(hidden_states)
# add hidden states from the last decoder layer
if output_hidden_states:
all_hidden_states += (hidden_states,)
next_cache = next_decoder_cache if use_cache else None
image_hidden_states = image_hidden_states.view(batch_size, num_images, image_seq_len, image_hidden_size)
if not return_dict:
return tuple(
v
for v in [hidden_states, next_cache, all_hidden_states, all_self_attns, image_hidden_states]
if v is not None
)
return IdeficsBaseModelOutputWithPast(
last_hidden_state=hidden_states,
past_key_values=next_cache,
hidden_states=all_hidden_states,
attentions=all_self_attns,
image_hidden_states=image_hidden_states,
)
class IdeficsForVisionText2Text(IdeficsPreTrainedModel):
_keys_to_ignore_on_load_missing = [r"lm_head.weight"]
_tied_weights_keys = ["model.embed_tokens.weight", "lm_head.weight"]
def __init__(self, config, vision_model=None):
super().__init__(config)
self.model = IdeficsModel(config)
self.lm_head = IdeficsDecoupledLinear(
in_features=config.hidden_size,
out_features=config.vocab_size,
out_additional_features=config.additional_vocab_size,
bias=False,
partially_freeze=config.freeze_lm_head,
)
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
return self.model.embed_tokens
def set_input_embeddings(self, value):
self.model.embed_tokens = value
def get_output_embeddings(self):
return self.lm_head
def set_output_embeddings(self, new_embeddings):
self.lm_head = new_embeddings
def set_decoder(self, decoder):
self.model = decoder
def get_decoder(self):
return self.model
def tie_weights(self):
"""
Overwrite `transformers.modeling_utils.PreTrainedModel.tie_weights` to handle the case of
IdeficsDecoupledLinear and IdeficsDecoupledEmbedding.
"""
output_embeddings = self.get_output_embeddings()
input_embeddings = self.get_input_embeddings()
if getattr(self.config, "tie_word_embeddings", True):
output_embeddings.weight = input_embeddings.weight
if input_embeddings.num_additional_embeddings > 0:
assert output_embeddings.out_additional_features == input_embeddings.num_additional_embeddings
output_embeddings.additional_fc.weight = input_embeddings.additional_embedding.weight
if hasattr(output_embeddings, "out_features") and hasattr(input_embeddings, "num_embeddings"):
output_embeddings.out_features = input_embeddings.num_embeddings
if hasattr(output_embeddings, "out_additional_features") and hasattr(
input_embeddings, "num_additional_embeddings"
):
output_embeddings.out_additional_features = input_embeddings.num_additional_embeddings
@add_start_docstrings_to_model_forward(LLAMA_INPUTS_DOCSTRING)
@replace_return_docstrings(output_type=IdeficsCausalLMOutputWithPast, config_class=_CONFIG_FOR_DOC)
def forward(
self,
input_ids: torch.LongTensor = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[List[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
pixel_values: Optional[torch.FloatTensor] = None,
image_encoder_embeddings: Optional[torch.FloatTensor] = None,
perceiver_embeddings: Optional[torch.FloatTensor] = None,
image_attention_mask: Optional[torch.Tensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, IdeficsCausalLMOutputWithPast]:
r"""
Args:
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
(masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
Returns:
Example:
```python
>>> from transformers import AutoTokenizer, LlamaForCausalLM
>>> model = LlamaForCausalLM.from_pretrained(PATH_TO_CONVERTED_WEIGHTS)
>>> tokenizer = AutoTokenizer.from_pretrained(PATH_TO_CONVERTED_TOKENIZER)
>>> prompt = "Hey, are you consciours? Can you talk to me?"
>>> inputs = tokenizer(prompt, return_tensors="pt")
>>> # Generate
>>> generate_ids = model.generate(inputs.input_ids, max_length=30)
>>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
"Hey, are you consciours? Can you talk to me?\nI'm not consciours, but I can talk to you."
```"""
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn)
outputs = self.model(
input_ids=input_ids,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
pixel_values=pixel_values,
image_encoder_embeddings=image_encoder_embeddings,
perceiver_embeddings=perceiver_embeddings,
image_attention_mask=image_attention_mask,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
hidden_states = outputs[0]
logits = self.lm_head(hidden_states)
loss = None
if labels is not None:
# Shift so that tokens < n predict n
if attention_mask is not None:
shift_attention_mask = attention_mask[..., 1:]
shift_logits = logits[..., :-1, :][shift_attention_mask != 0].contiguous()
shift_labels = labels[..., 1:][shift_attention_mask != 0].contiguous()
else:
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
# Flatten the tokens
loss_fct = CrossEntropyLoss()
loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
if not return_dict:
output = (logits,) + outputs[1:]
return (loss,) + output if loss is not None else output
return IdeficsCausalLMOutputWithPast(
loss=loss,
logits=logits,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
image_hidden_states=outputs.image_hidden_states,
)
def prepare_inputs_for_generation(self, input_ids, past=None, **kwargs):
image_hidden_states = kwargs.pop("image_hidden_states", None)
if image_hidden_states is not None:
if self.config.use_resampler:
kwargs["perceiver_embeddings"] = image_hidden_states
else:
kwargs["image_encoder_embeddings"] = image_hidden_states
kwargs["pixel_values"] = None
inputs = prepare_inputs_for_generation(input_ids, past=past, **kwargs)
unwanted_kwargs = ["token_type_ids"]
for kwarg in unwanted_kwargs:
inputs.pop(kwarg, None)
return inputs
@staticmethod
def _expand_inputs_for_generation(
*args,
**model_kwargs,
):
return expand_inputs_for_generation(*args, **model_kwargs)
@staticmethod
def _update_model_kwargs_for_generation(outputs, model_kwargs, is_encoder_decoder):
return update_model_kwargs_for_generation(outputs, model_kwargs)
@staticmethod
def _reorder_cache(past, beam_idx):
reordered_past = ()
for layer_past in past:
reordered_past += (tuple(past_state.index_select(0, beam_idx) for past_state in layer_past),)
return reordered_past
# This code was adapted from https://github.com/lucidrains/flamingo-pytorch licensed under the MIT License.
#
# MIT License
#
# Copyright (c) 2020 The Google AI Language Team Authors, The HuggingFace Inc. team and github/lonePatient
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Generic interface to various configurations of the Perceiver Resampler, that simply takes in a series of (potentially
time-indexed) contextual embeddings, and "resamples" (compresses) them down to a pre-specified number of latents! Note
that the Perceiver in general resamples based solely off the *long-range* context; there's a nice opportunity here to
prime the Perceiver Resampler with say a single layer's worth of language embeddings (the target domain), and use that
to softly "retrieve & compress" what we need --> this would be a novel contribution we should explore.
References:
- DeepMind's Flamingo: https://www.deepmind.com/blog/tackling-multiple-tasks-with-a-single-visual-language-model
- Code borrowed w/ love from: https://github.com/lucidrains/flamingo-pytorch
"""
from typing import Optional, Tuple
import torch
import torch.nn as nn
from .configuration_idefics import IdeficsConfig
class IdeficsPerceiverResampler(nn.Module):
def __init__(
self, config: IdeficsConfig, embed_dim: int, depth: int, n_heads: int, head_dim: int, n_latents: int
) -> None:
"""
Instantiates a Perceiver Resampler that operates over a sequence of embeddings (say from a ResNet or ViT or
MAE) of a given dimension, performs `depth` blocks of cross-attention with a fixed `n_latents` inputs, then
returns a Tensor of shape [bsz, n_latents, embed_dim]. :param embed_dim: Dimensionality of embeddings being fed
to the Perceiver Resampler (also dimensionality of latent embeddings *returned* by the Perceiver Resampler.
Could be e.g., VIT embed_dim, ResNet pool dim, and so on.
Args:
config (`IdeficsConfig`): config object
embed_dim (`int`): The size of each embedding vector
depth (`int`): Depth of the Perceiver Resampler (Transformer w/ cross attention). Should be shallow (< 3).
n_heads (`int`): Number of heads in each Transformer block (for multi-headed self-attention).
head_dim (`int`): Dimensionality of each head projection in the Transformer block.
n_latents (`int`):
Number of latent embeddings to resample ("compress") the input sequence to (usually < 128).
"""
super().__init__()
self.embed_dim, self.n_heads, self.head_dim, self.n_latents = embed_dim, n_heads, head_dim, n_latents
self.qk_layer_norms = config.perceiver_config.qk_layer_norms_perceiver
# Create Latents for Perceiver
self.latents = nn.Parameter(torch.randn(self.n_latents, self.embed_dim), requires_grad=True)
self.intermediate_dim = (
self.embed_dim * 4
if not hasattr(config.vision_config, "embed_dim")
else config.vision_config.embed_dim * 4
)
# Create Transformer Blocks
self.blocks = nn.ModuleList(
[
nn.ModuleList(
[
IdeficsPerceiverAttention(self.embed_dim, self.n_heads, self.head_dim, self.qk_layer_norms),
IdeficsMLP(self.intermediate_dim, config),
]
)
for _ in range(depth)
]
)
self.layer_norm = nn.LayerNorm(self.embed_dim)
def forward(self, context: torch.Tensor) -> torch.Tensor:
"""Resample arbitrary length context & *compress* down to self.n_latents latent embeddings"""
# einsum.repeat(self.latents, "seq embed -> bsz seq embed", bsz=context.shape[0])
latents = self.latents.repeat(context.shape[0], 1, 1)
# Feed through Perceiver Attention blocks...
for attn, ff in self.blocks:
latents = attn(context, latents) + latents
latents = ff(latents) + latents
return self.layer_norm(latents)
class IdeficsPerceiverAttention(nn.Module):
def __init__(self, embed_dim: int, n_heads: int, head_dim: int, qk_layer_norms: bool) -> None:
"""Perceiver Cross-Attention Module --> let long-form inputs be `context`, resampled embeddings be `latents`"""
super().__init__()
self.embed_dim, self.n_heads, self.head_dim = embed_dim, n_heads, head_dim
self.qk_layer_norms = qk_layer_norms
# Normalization & Scaling
self.context_layer_norm = nn.LayerNorm(self.embed_dim)
self.latents_layer_norm = nn.LayerNorm(self.embed_dim)
if self.qk_layer_norms:
self.q_layer_norm = nn.LayerNorm(self.head_dim)
self.k_layer_norm = nn.LayerNorm(self.head_dim)
self.qk_scale = self.head_dim**-0.5
# Q, K, V Projection (no bias -- detail from Perceiver/Flamingo Papers).
self.q_proj = nn.Linear(self.embed_dim, self.n_heads * self.head_dim, bias=False)
self.k_proj = nn.Linear(self.embed_dim, self.n_heads * self.head_dim, bias=False)
self.v_proj = nn.Linear(self.embed_dim, self.n_heads * self.head_dim, bias=False)
self.output_proj = nn.Linear(self.n_heads * self.head_dim, embed_dim, bias=False)
def forward(self, context: torch.Tensor, latents: torch.Tensor) -> torch.Tensor:
"""
Runs Perceiver Self-Attention, with special (context, latents) appended along the `seq` dimension!
Args:
context (`torch.Tensor`):
Tensor of shape `[bsz, seq, embed_dim]` representing long-form context to resample.
latents (`torch.Tensor`):
Tensor of shape `[bsz, n_latents, embed_dim]` representing fixed length latents to compress to.
Returns:
`torch.Tensor`: Tensor of shape `[bsz, n_latents, embed_dim]` representing attention over latents w/ cross
from context.
"""
context = self.context_layer_norm(context)
latents = self.latents_layer_norm(latents)
batch_size, seq_length, embed_dim = context.shape[:3]
# Query, Key, Value Projections --> Note that in Flamingo, latents are *concatenated* with context prior to attn!
# Note: This results in queries w/ `seq = n_latents`, and keys, values with `seq = len(context) + n_latents`
q = self.q_proj(latents)
k = self.k_proj(torch.cat([context, latents], dim=-2))
v = self.v_proj(torch.cat([context, latents], dim=-2))
# Multiheaded Self-Attention w/ stable softmax (subtract per-row max -- `amax` -- before softmax call)
# =>> `attn` should be a 2D matrix of shape [n_latents x (context + n_latents)]
# einsum.rearrange(x, "bsz seq (heads embed) -> bsz heads seq embed", heads=self.n_heads)
q, k, v = [x.reshape(batch_size, x.shape[1], self.n_heads, self.head_dim).transpose(1, 2) for x in (q, k, v)]
if self.qk_layer_norms:
q = self.q_layer_norm(q)
k = self.k_layer_norm(k)
scores = torch.einsum("... i d, ... j d -> ... i j", q * self.qk_scale, k)
stabilized_scores = scores - (scores.amax(dim=-1, keepdim=True).detach())
attn = stabilized_scores.softmax(dim=-1)
# Attend & project back to output...
resampled = torch.einsum("... i j, ... j d -> ... i d", attn, v)
# einsum.rearrange(resampled, "bsz heads seq embed -> bsz seq (heads embed)", heads=self.n_heads)
return self.output_proj(resampled.transpose(1, 2).flatten(-2))
class IdeficsMLP(nn.Module):
def __init__(self, intermediate_size, config: IdeficsConfig):
"""Simple MLP block with intermediate_size and embedding size"""
super().__init__()
self.embed_dim = config.vision_config.embed_dim
self.ln = nn.LayerNorm(self.embed_dim)
self.fc = nn.Linear(self.embed_dim, intermediate_size, bias=False)
self.act = nn.ReLU()
self.c_proj = nn.Linear(intermediate_size, self.embed_dim, bias=False)
def forward(self, hidden_states: Optional[Tuple[torch.FloatTensor]]) -> torch.FloatTensor:
hidden_states = self.ln(hidden_states)
hidden_states = self.fc(hidden_states)
hidden_states = self.act(hidden_states)
hidden_states = self.c_proj(hidden_states)
return hidden_states
# coding=utf-8
# Copyright 2022 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor class for IDEFICS.
"""
from typing import Callable, List, Optional, Union
from urllib.parse import urlparse
from ...feature_extraction_utils import BatchFeature
from ...processing_utils import ProcessorMixin
from ...tokenization_utils_base import BatchEncoding, PaddingStrategy, TextInput, TruncationStrategy
from ...utils import TensorType, is_torch_available
if is_torch_available():
import torch
IMAGE_TOKEN = "<image>"
# copied from m4.training.packing
def incremental_to_binary_attention_mask(incremental_mask, num_classes=-1):
# This function converts: [-1, 0, 1] => [[0, 0], [1, 0], [0, 1]]
# If any of images index are more than num_classes, set them to -1.
# Words after the max number of images allowed have been seen don't attend on anything
if num_classes != -1:
incremental_mask[incremental_mask >= num_classes] = -1
negatives = incremental_mask == -1
incremental_mask[negatives] = 0
attn_mask = torch.nn.functional.one_hot(incremental_mask, num_classes=num_classes)
attn_mask[negatives, :] = 0
return attn_mask
# copied from m4.training.packing
def image_attention_mask_for_packed_input_ids(input_ids, tokenizer):
image_attention_mask = torch.full_like(input_ids, fill_value=-1)
next_image_attention_mask = torch.full_like(input_ids, fill_value=-1)
image_token_id = tokenizer.convert_tokens_to_ids(IMAGE_TOKEN)
eod_token_id = tokenizer.eos_token_id
for batch_idx in range(input_ids.size(0)):
count = -1
seen_eod = False
for idx, token_id in enumerate(input_ids[batch_idx]):
if token_id == image_token_id:
count += 1
image_attention_mask[batch_idx][idx] = count
seen_eod = False
else:
image_attention_mask[batch_idx][idx] = count
if seen_eod:
image_attention_mask[batch_idx][idx] = -1
if token_id == eod_token_id:
seen_eod = True
for batch_idx in range(input_ids.size(0)):
count = -1
seen_eod = False
for idx in range(input_ids[batch_idx].size(0) - 1, -1, -1):
token_id = input_ids[batch_idx][idx]
if token_id == image_token_id:
count += 1
next_image_attention_mask[batch_idx][idx] = count
seen_eod = False
else:
next_image_attention_mask[batch_idx][idx] = count
if token_id == eod_token_id:
seen_eod = True
if seen_eod:
next_image_attention_mask[batch_idx][idx] = -1
non_negative_indices = next_image_attention_mask[batch_idx] != -1
next_image_attention_mask[batch_idx][non_negative_indices] -= count
next_image_attention_mask[batch_idx][non_negative_indices] *= -1
return image_attention_mask, next_image_attention_mask
def is_url(string):
"""Checks if the passed string contains a valid url and nothing else. e.g. if space is included it's immediately
invalidated the url"""
if " " in string:
return False
result = urlparse(string)
return all([result.scheme, result.netloc])
class IdeficsProcessor(ProcessorMixin):
r"""
Constructs a IDEFICS processor which wraps a LLama tokenizer and IDEFICS image processor into a single processor.
[`IdeficsProcessor`] offers all the functionalities of [`IdeficsImageProcessor`] and [`LlamaTokenizerFast`]. See
the docstring of [`~IdeficsProcessor.__call__`] and [`~IdeficsProcessor.decode`] for more information.
Args:
image_processor (`IdeficsImageProcessor`):
An instance of [`IdeficsImageProcessor`]. The image processor is a required input.
tokenizer (`LlamaTokenizerFast`):
An instance of [`LlamaTokenizerFast`]. The tokenizer is a required input.
image_size (`int`, *optional*, defaults to 224): Image size (assuming a square image)
"""
attributes = ["image_processor", "tokenizer"]
image_processor_class = "IdeficsImageProcessor"
tokenizer_class = "LlamaTokenizerFast"
def __init__(self, image_processor, tokenizer=None, image_size=224, add_end_of_utterance_token=None, **kwargs):
if image_processor is None:
raise ValueError("You need to specify an `image_processor`.")
if tokenizer is None:
raise ValueError("You need to specify a `tokenizer`.")
super().__init__(image_processor, tokenizer)
self.current_processor = self.image_processor
self.image_token_id = tokenizer.convert_tokens_to_ids(IMAGE_TOKEN)
self.default_image_dims = (
self.image_processor.image_num_channels,
self.image_processor.image_size,
self.image_processor.image_size,
)
self.tokenizer_was_trained_with_end_of_utterance_token = (
True
if "<end_of_utterance>" in self.tokenizer.special_tokens_map.get("additional_special_tokens", [])
else False
)
def __call__(
self,
prompts: Union[List[TextInput], List[List[TextInput]]],
padding: Union[bool, str, PaddingStrategy] = False,
truncation: Union[bool, str, TruncationStrategy] = None,
max_length: Optional[int] = None,
transform: Callable = None,
add_eos_token=False,
add_end_of_utterance_token=None,
debug=False,
return_tensors: Optional[Union[str, TensorType]] = TensorType.PYTORCH,
) -> BatchEncoding:
"""This method takes batched or non-batched prompts made of text and images and converts them into prompts that
the model was trained on and prepares the image pixel values for the model to process.
Args:
prompts (`Union[List[TextInput], [List[List[TextInput]]]]`):
either a single prompt or a batched list of prompts - see the detailed description immediately after
the end of the arguments doc section.
padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `False`):
Select a strategy to pad the returned sequences (according to the model's padding side and padding
index) among:
- `True` or `'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
sequence if provided).
- `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
acceptable input length for the model if that argument is not provided.
- `False` or `'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of different
lengths).
max_length (`int`, *optional*):
Maximum length of the returned list and optionally padding length (see above).
truncation (`bool`, *optional*):
Activates truncation to cut input sequences longer than `max_length` to `max_length`.
transform (`Callable`, *optional*):
A custom transform function that accepts a single image can be passed for training. For example,
`torchvision.Compose` can be used to compose multiple functions. If `None` a preset inference-specific
set of transforms will be applied to the images
add_eos_token (`bool`, *optional*, defaults to `False`):
Adds `eos_token` at the end of the final prompt if True`
add_end_of_utterance_token (`bool`, *optional*)
Whether to automatically add `<end_of_utterance>` after each prompt's text input (unless followed by an
image). If `None` the tokenizer will be checked instead and if this token is found in
`additional_special_tokens` then the value will be `True`.
debug (`bool`, *optional*, defaults to `False`):
`True` value will help debug prompt generation by dumping useful information
return_tensors (`str` or `TensorType`, *optional*, defaults to `TensorType.PYTORCH`):
The type of tensors to return. Can be one of:
- `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`.
Returns:
a dict with entries: `input_ids`, `attention_mask`, `pixel_values`, `image_attention_mask` which can be
directly passed to `model.generate`
Detailed explanation:
Each entry in `prompts` is either a text to be passed as is or an image that will be processed.
An image can be either an image object (`PIL.Image`) or a url from which the image can be retrieved.
When the processor encounters an image it'll inject `<fake_token_around_image><image><fake_token_around_image>`
entry into the prompt.
Example:
```python
checkpoint = "HuggingFaceM4/idefics-9b"
processor = AutoProcessor.from_pretrained(checkpoint)
url = "https://hips.hearstapps.com/hmg-prod/images/cute-photos-of-cats-in-grass-1593184777.jpg"
img = processor.image_processor.fetch_images([url])[0]
prompts = [
"User:",
img,
"Describe this image.\nAssistant: An image of two kittens in grass.\n",
"User:",
"https://hips.hearstapps.com/hmg-prod/images/dog-puns-1581708208.jpg",
"Describe this image.\nAssistant:",
]
inputs = processor(prompts, return_tensors="pt")
generated_ids = model.generate(**inputs, max_length=100)
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
```
In this example the `prompts` will be converted into:
```
<s>User:<fake_token_around_image><image><fake_token_around_image>Describe this image.
Assistant: An image of two kittens in grass.
User:<fake_token_around_image><image><fake_token_around_image>Describe this image.
Assistant:'
```
and the two images will be massaged using [`IdeficsImageProcessor.__call__`] method and placed inside the
`pixel_values` dict entry of the return value.
This example also examplifies that images can be passed as objects or as text urls. It can be seen that the
first image is passed as object and the second one as a url.
To do training do:
```python
image_transform = transforms.Compose(
[
transforms.RandomResizedCrop(
(w, h), scale=(0.9, 1.0), interpolation=transforms.InterpolationMode.BICUBIC
),
transforms.ToTensor(),
transforms.Normalize(mean=self.image_mean, std=self.image_std),
]
)
inputs = processor(prompts, transform=image_transform, return_tensors="pt")
```
In order to help debug prompt generation enable `debug=True` which will show you what's happening.
"""
# if the value isn't overriden by the user, check if the tokenizer was trained with this token and then use it
if add_end_of_utterance_token is None:
add_end_of_utterance_token = self.tokenizer_was_trained_with_end_of_utterance_token
# turn non-batched prompts into batched
if not any(isinstance(i, list) for i in prompts):
prompts = [prompts]
fake_token = "<fake_token_around_image>"
image_token = "<image>"
end_of_utterance_token = "<end_of_utterance>"
def image_tokens(last_was_image):
if last_was_image:
return image_token + fake_token
else:
return fake_token + image_token + fake_token
all_texts = []
all_images = []
for sample in prompts:
# the model was trained on samples starting with <s>
full_text = f"{self.tokenizer.bos_token}"
# an image can either be an image object in the item or the url, everything else is a verbatim prompt text
image_objects = []
last_was_image = False
last_was_text = False
for i, item in enumerate(sample):
if i > 0:
last_was_text = True if not last_was_image else False
if isinstance(item, str):
item = item.strip(" ")
if is_url(item):
image = self.image_processor.fetch_images(item)
full_text += image_tokens(last_was_image)
image_objects.append(image)
last_was_image = True
else:
# we add end_of_utterance_token between each subsequent text prompts (but not at the last one!)
if add_end_of_utterance_token and last_was_text:
full_text += end_of_utterance_token
full_text += item
last_was_image = False
else:
# must be an image obj
full_text += image_tokens(last_was_image)
image_objects.append(item)
last_was_image = True
if add_eos_token:
full_text += self.tokenizer.eos_token
if debug is True:
print(f"{full_text=}")
image_objects = self.image_processor(image_objects, transform=transform)
text_encoding = self.tokenizer(
text=full_text,
add_special_tokens=False,
padding=padding,
truncation=truncation,
max_length=max_length,
)
all_texts.append(text_encoding["input_ids"])
all_images.append(image_objects)
max_seq_len = max(len(x) for x in all_texts)
# max_num_images has to be at least 1 even when there are no images
max_num_images = max(len(x) for x in all_images)
max_num_images = max(1, max_num_images)
at_least_one_image = sum(len(x) for x in all_images) > 0
output_input_ids = []
output_images = []
output_attention_masks = []
for text, images in zip(all_texts, all_images):
padded_input_ids = [self.tokenizer.pad_token_id] * max_seq_len
unpadded_seq_len = len(text)
start = max_seq_len - unpadded_seq_len
padded_input_ids[start:] = text[:max_seq_len]
attention_mask = torch.zeros((max_seq_len,), dtype=torch.long)
attention_mask[start:] = 1
image_count = padded_input_ids.count(self.image_token_id)
local_max_num_images = min(image_count, max_num_images)
current_images = images[:local_max_num_images]
if len(current_images) > 0:
padded_image_tensor = torch.zeros(max_num_images, *current_images.size()[1:])
padded_image_tensor[: current_images.size(0)] = current_images
else:
padded_image_tensor = torch.zeros(max_num_images, *self.default_image_dims)
output_images.append(padded_image_tensor)
output_input_ids.append(torch.tensor(padded_input_ids))
output_attention_masks.append(attention_mask)
output_input_ids = torch.stack(output_input_ids)
output_images = torch.stack(output_images)
output_attention_masks = torch.stack(output_attention_masks)
if at_least_one_image:
image_attention_mask, _ = image_attention_mask_for_packed_input_ids(output_input_ids, self.tokenizer)
image_attention_mask = incremental_to_binary_attention_mask(
image_attention_mask, num_classes=max_num_images
)
else:
# in full language mode we set the image mask to all-0s
image_attention_mask = torch.zeros(
output_input_ids.shape[0], output_input_ids.shape[1], 1, dtype=torch.bool
)
return BatchFeature(
data={
"input_ids": output_input_ids,
"attention_mask": output_attention_masks,
"pixel_values": output_images,
"image_attention_mask": image_attention_mask,
}
)
def batch_decode(self, *args, **kwargs):
"""
This method forwards all its arguments to LlamaTokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
refer to the docstring of this method for more information.
"""
return self.tokenizer.batch_decode(*args, **kwargs)
def decode(self, *args, **kwargs):
"""
This method forwards all its arguments to LlamaTokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
the docstring of this method for more information.
"""
return self.tokenizer.decode(*args, **kwargs)
@property
def model_input_names(self):
tokenizer_input_names = self.tokenizer.model_input_names
image_processor_input_names = self.image_processor.model_input_names
return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))
# coding=utf-8
# Copyright 2021 The OpenAI Team Authors and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" PyTorch IdeficsVision model: a copy of CLIPVisionModel using a simpler config object"""
from dataclasses import dataclass
from typing import Optional, Tuple, Union
import torch
import torch.utils.checkpoint
from torch import nn
from ...activations import ACT2FN
from ...modeling_outputs import BaseModelOutput, BaseModelOutputWithPooling
from ...utils import (
ModelOutput,
logging,
)
from .configuration_idefics import IdeficsVisionConfig
logger = logging.get_logger(__name__)
@dataclass
class IdeficsVisionModelOutput(ModelOutput):
"""
Base class for vision model's outputs that also contains image embeddings of the pooling of the last hidden states.
Args:
image_embeds (`torch.FloatTensor` of shape `(batch_size, output_dim)` *optional* returned when model is initialized with `with_projection=True`):
The image embeddings obtained by applying the projection layer to the pooler_output.
last_hidden_state (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
Sequence of hidden-states at the output of the last layer of the model.
hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.
Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
sequence_length)`.
Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
heads.
"""
image_embeds: Optional[torch.FloatTensor] = None
last_hidden_state: torch.FloatTensor = None
hidden_states: Optional[Tuple[torch.FloatTensor]] = None
attentions: Optional[Tuple[torch.FloatTensor]] = None
# Copied from transformers.models.clip.modeling_clip.CLIPVisionEmbeddings with CLIP->Idefics
class IdeficsVisionEmbeddings(nn.Module):
def __init__(self, config: IdeficsVisionConfig):
super().__init__()
self.config = config
self.embed_dim = config.hidden_size
self.image_size = config.image_size
self.patch_size = config.patch_size
self.class_embedding = nn.Parameter(torch.randn(self.embed_dim))
self.patch_embedding = nn.Conv2d(
in_channels=config.num_channels,
out_channels=self.embed_dim,
kernel_size=self.patch_size,
stride=self.patch_size,
bias=False,
)
self.num_patches = (self.image_size // self.patch_size) ** 2
self.num_positions = self.num_patches + 1
self.position_embedding = nn.Embedding(self.num_positions, self.embed_dim)
self.register_buffer("position_ids", torch.arange(self.num_positions).expand((1, -1)), persistent=False)
def forward(self, pixel_values: torch.FloatTensor) -> torch.Tensor:
batch_size = pixel_values.shape[0]
target_dtype = self.patch_embedding.weight.dtype
patch_embeds = self.patch_embedding(pixel_values.to(dtype=target_dtype)) # shape = [*, width, grid, grid]
patch_embeds = patch_embeds.flatten(2).transpose(1, 2)
class_embeds = self.class_embedding.expand(batch_size, 1, -1)
embeddings = torch.cat([class_embeds, patch_embeds], dim=1)
embeddings = embeddings + self.position_embedding(self.position_ids)
return embeddings
# Copied from transformers.models.clip.modeling_clip.CLIPAttention with CLIP->IdeficsVision
class IdeficsVisionAttention(nn.Module):
"""Multi-headed attention from 'Attention Is All You Need' paper"""
def __init__(self, config):
super().__init__()
self.config = config
self.embed_dim = config.hidden_size
self.num_heads = config.num_attention_heads
self.head_dim = self.embed_dim // self.num_heads
if self.head_dim * self.num_heads != self.embed_dim:
raise ValueError(
f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:"
f" {self.num_heads})."
)
self.scale = self.head_dim**-0.5
self.dropout = config.attention_dropout
self.k_proj = nn.Linear(self.embed_dim, self.embed_dim)
self.v_proj = nn.Linear(self.embed_dim, self.embed_dim)
self.q_proj = nn.Linear(self.embed_dim, self.embed_dim)
self.out_proj = nn.Linear(self.embed_dim, self.embed_dim)
def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous()
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
causal_attention_mask: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = False,
) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
"""Input shape: Batch x Time x Channel"""
bsz, tgt_len, embed_dim = hidden_states.size()
# get query proj
query_states = self.q_proj(hidden_states) * self.scale
key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
value_states = self._shape(self.v_proj(hidden_states), -1, bsz)
proj_shape = (bsz * self.num_heads, -1, self.head_dim)
query_states = self._shape(query_states, tgt_len, bsz).view(*proj_shape)
key_states = key_states.view(*proj_shape)
value_states = value_states.view(*proj_shape)
src_len = key_states.size(1)
attn_weights = torch.bmm(query_states, key_states.transpose(1, 2))
if attn_weights.size() != (bsz * self.num_heads, tgt_len, src_len):
raise ValueError(
f"Attention weights should be of size {(bsz * self.num_heads, tgt_len, src_len)}, but is"
f" {attn_weights.size()}"
)
# apply the causal_attention_mask first
if causal_attention_mask is not None:
if causal_attention_mask.size() != (bsz, 1, tgt_len, src_len):
raise ValueError(
f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is"
f" {causal_attention_mask.size()}"
)
attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + causal_attention_mask
attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)
if attention_mask is not None:
if attention_mask.size() != (bsz, 1, tgt_len, src_len):
raise ValueError(
f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is {attention_mask.size()}"
)
attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + attention_mask
attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)
attn_weights = nn.functional.softmax(attn_weights, dim=-1)
if output_attentions:
# this operation is a bit akward, but it's required to
# make sure that attn_weights keeps its gradient.
# In order to do so, attn_weights have to reshaped
# twice and have to be reused in the following
attn_weights_reshaped = attn_weights.view(bsz, self.num_heads, tgt_len, src_len)
attn_weights = attn_weights_reshaped.view(bsz * self.num_heads, tgt_len, src_len)
else:
attn_weights_reshaped = None
attn_probs = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training)
attn_output = torch.bmm(attn_probs, value_states)
if attn_output.size() != (bsz * self.num_heads, tgt_len, self.head_dim):
raise ValueError(
f"`attn_output` should be of size {(bsz, self.num_heads, tgt_len, self.head_dim)}, but is"
f" {attn_output.size()}"
)
attn_output = attn_output.view(bsz, self.num_heads, tgt_len, self.head_dim)
attn_output = attn_output.transpose(1, 2)
attn_output = attn_output.reshape(bsz, tgt_len, embed_dim)
attn_output = self.out_proj(attn_output)
return attn_output, attn_weights_reshaped
# Copied from transformers.models.clip.modeling_clip.CLIPMLP with CLIP->IdeficsVision
class IdeficsVisionMLP(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.activation_fn = ACT2FN[config.hidden_act]
self.fc1 = nn.Linear(config.hidden_size, config.intermediate_size)
self.fc2 = nn.Linear(config.intermediate_size, config.hidden_size)
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
hidden_states = self.fc1(hidden_states)
hidden_states = self.activation_fn(hidden_states)
hidden_states = self.fc2(hidden_states)
return hidden_states
# Copied from transformers.models.clip.modeling_clip.CLIPEncoderLayer with CLIP->IdeficsVision
class IdeficsVisionEncoderLayer(nn.Module):
def __init__(self, config: IdeficsVisionConfig):
super().__init__()
self.embed_dim = config.hidden_size
self.self_attn = IdeficsVisionAttention(config)
self.layer_norm1 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
self.mlp = IdeficsVisionMLP(config)
self.layer_norm2 = nn.LayerNorm(self.embed_dim, eps=config.layer_norm_eps)
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: torch.Tensor,
causal_attention_mask: torch.Tensor,
output_attentions: Optional[bool] = False,
) -> Tuple[torch.FloatTensor]:
"""
Args:
hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
attention_mask (`torch.FloatTensor`): attention mask of size
`(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
`(config.encoder_attention_heads,)`.
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under
returned tensors for more detail.
"""
residual = hidden_states
hidden_states = self.layer_norm1(hidden_states)
hidden_states, attn_weights = self.self_attn(
hidden_states=hidden_states,
attention_mask=attention_mask,
causal_attention_mask=causal_attention_mask,
output_attentions=output_attentions,
)
hidden_states = residual + hidden_states
residual = hidden_states
hidden_states = self.layer_norm2(hidden_states)
hidden_states = self.mlp(hidden_states)
hidden_states = residual + hidden_states
outputs = (hidden_states,)
if output_attentions:
outputs += (attn_weights,)
return outputs
# Copied from transformers.models.clip.modeling_clip.CLIPEncoder with CLIP->IdeficsVision
class IdeficsVisionEncoder(nn.Module):
"""
Transformer encoder consisting of `config.num_hidden_layers` self attention layers. Each layer is a
[`IdeficsVisionEncoderLayer`].
Args:
config: IdeficsVisionConfig
"""
def __init__(self, config: IdeficsVisionConfig):
super().__init__()
self.config = config
self.layers = nn.ModuleList([IdeficsVisionEncoderLayer(config) for _ in range(config.num_hidden_layers)])
self.gradient_checkpointing = False
def forward(
self,
inputs_embeds,
attention_mask: Optional[torch.Tensor] = None,
causal_attention_mask: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, BaseModelOutput]:
r"""
Args:
inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation.
This is useful if you want more control over how to convert `input_ids` indices into associated vectors
than the model's internal embedding lookup matrix.
attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
causal_attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
Causal mask for the text model. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under
returned tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors
for more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
encoder_states = () if output_hidden_states else None
all_attentions = () if output_attentions else None
hidden_states = inputs_embeds
for idx, encoder_layer in enumerate(self.layers):
if output_hidden_states:
encoder_states = encoder_states + (hidden_states,)
if self.gradient_checkpointing and self.training:
def create_custom_forward(module):
def custom_forward(*inputs):
return module(*inputs, output_attentions)
return custom_forward
layer_outputs = torch.utils.checkpoint.checkpoint(
create_custom_forward(encoder_layer),
hidden_states,
attention_mask,
causal_attention_mask,
)
else:
layer_outputs = encoder_layer(
hidden_states,
attention_mask,
causal_attention_mask,
output_attentions=output_attentions,
)
hidden_states = layer_outputs[0]
if output_attentions:
all_attentions = all_attentions + (layer_outputs[1],)
if output_hidden_states:
encoder_states = encoder_states + (hidden_states,)
if not return_dict:
return tuple(v for v in [hidden_states, encoder_states, all_attentions] if v is not None)
return BaseModelOutput(
last_hidden_state=hidden_states, hidden_states=encoder_states, attentions=all_attentions
)
# Adapted from transformers.models.clip.modeling_clip.CLIPVisionTransformer
class IdeficsVisionTransformer(nn.Module):
def __init__(self, config: IdeficsVisionConfig):
super().__init__()
self.config = config
embed_dim = config.hidden_size
self.embeddings = IdeficsVisionEmbeddings(config)
self.pre_layrnorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
self.encoder = IdeficsVisionEncoder(config)
self.post_layernorm = nn.LayerNorm(embed_dim, eps=config.layer_norm_eps)
# copied from transformers.models.clip.modeling_clip.CLIPVisionTransformer.forward
def forward(
self,
pixel_values: Optional[torch.FloatTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, BaseModelOutputWithPooling]:
r"""
Returns:
"""
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
if pixel_values is None:
raise ValueError("You have to specify pixel_values")
hidden_states = self.embeddings(pixel_values)
hidden_states = self.pre_layrnorm(hidden_states)
encoder_outputs = self.encoder(
inputs_embeds=hidden_states,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
last_hidden_state = encoder_outputs[0]
pooled_output = last_hidden_state[:, 0, :]
pooled_output = self.post_layernorm(pooled_output)
if not return_dict:
return (last_hidden_state, pooled_output) + encoder_outputs[1:]
return BaseModelOutputWithPooling(
last_hidden_state=last_hidden_state,
pooler_output=pooled_output,
hidden_states=encoder_outputs.hidden_states,
attentions=encoder_outputs.attentions,
)
...@@ -3942,6 +3942,37 @@ class IBertPreTrainedModel(metaclass=DummyObject): ...@@ -3942,6 +3942,37 @@ class IBertPreTrainedModel(metaclass=DummyObject):
requires_backends(self, ["torch"]) requires_backends(self, ["torch"])
IDEFICS_PRETRAINED_MODEL_ARCHIVE_LIST = None
class IdeficsForVisionText2Text(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class IdeficsModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class IdeficsPreTrainedModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class IdeficsProcessor(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
IMAGEGPT_PRETRAINED_MODEL_ARCHIVE_LIST = None IMAGEGPT_PRETRAINED_MODEL_ARCHIVE_LIST = None
......
...@@ -233,6 +233,13 @@ class GLPNImageProcessor(metaclass=DummyObject): ...@@ -233,6 +233,13 @@ class GLPNImageProcessor(metaclass=DummyObject):
requires_backends(self, ["vision"]) requires_backends(self, ["vision"])
class IdeficsImageProcessor(metaclass=DummyObject):
_backends = ["vision"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["vision"])
class ImageGPTFeatureExtractor(metaclass=DummyObject): class ImageGPTFeatureExtractor(metaclass=DummyObject):
_backends = ["vision"] _backends = ["vision"]
......
# coding=utf-8
# Copyright 2021 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from transformers.testing_utils import require_torch, require_torchvision, require_vision
from transformers.utils import is_torch_available, is_torchvision_available, is_vision_available
from ...test_image_processing_common import ImageProcessingTestMixin, prepare_image_inputs
if is_torch_available():
import torch
if is_torchvision_available():
import torchvision.transforms as transforms
if is_vision_available():
from PIL import Image
from transformers import IdeficsImageProcessor
class IdeficsImageProcessingTester(unittest.TestCase):
def __init__(
self,
parent,
batch_size=7,
num_channels=3,
image_size=18,
min_resolution=30,
max_resolution=400,
size=None,
image_mean=[0.48145466, 0.4578275, 0.40821073],
image_std=[0.26862954, 0.26130258, 0.27577711],
):
size = size if size is not None else {"shortest_edge": 30}
self.parent = parent
self.batch_size = batch_size
self.num_channels = num_channels
self.image_size = image_size
self.min_resolution = min_resolution
self.max_resolution = max_resolution
# self.size = size
self.image_mean = image_mean
self.image_std = image_std
def prepare_image_processor_dict(self):
return {
"image_mean": self.image_mean,
"image_std": self.image_std,
"image_size": self.image_size,
}
def get_expected_values(self, image_inputs, batched=False):
"""
This function computes the expected height and width when providing images to IdeficsImageProcessor,
assuming do_resize is set to True with a scalar size and size_divisor.
"""
if not batched:
size = self.image_size
image = image_inputs[0]
if isinstance(image, Image.Image):
w, h = image.size
else:
h, w = image.shape[1], image.shape[2]
scale = size / min(w, h)
if h < w:
newh, neww = size, scale * w
else:
newh, neww = scale * h, size
max_size = int((1333 / 800) * size)
if max(newh, neww) > max_size:
scale = max_size / max(newh, neww)
newh = newh * scale
neww = neww * scale
newh, neww = int(newh + 0.5), int(neww + 0.5)
expected_height, expected_width = (
newh // self.size_divisor * self.size_divisor,
neww // self.size_divisor * self.size_divisor,
)
else:
expected_values = []
for image in image_inputs:
expected_height, expected_width = self.get_expected_values([image])
expected_values.append((expected_height, expected_width))
expected_height = max(expected_values, key=lambda item: item[0])[0]
expected_width = max(expected_values, key=lambda item: item[1])[1]
return expected_height, expected_width
def expected_output_image_shape(self, images):
height, width = self.get_expected_values(images, batched=True)
return (self.num_channels, height, width)
def prepare_image_inputs(self, equal_resolution=False, numpify=False, torchify=False):
return prepare_image_inputs(
batch_size=self.batch_size,
num_channels=self.num_channels,
min_resolution=self.min_resolution,
max_resolution=self.max_resolution,
equal_resolution=equal_resolution,
numpify=numpify,
torchify=torchify,
)
@require_torch
@require_vision
class IdeficsImageProcessingTest(ImageProcessingTestMixin, unittest.TestCase):
image_processing_class = IdeficsImageProcessor if is_vision_available() else None
def setUp(self):
self.image_processor_tester = IdeficsImageProcessingTester(self)
@property
def image_processor_dict(self):
return self.image_processor_tester.prepare_image_processor_dict()
def test_image_processor_properties(self):
image_processing = self.image_processing_class(**self.image_processor_dict)
self.assertTrue(hasattr(image_processing, "image_mean"))
self.assertTrue(hasattr(image_processing, "image_std"))
self.assertTrue(hasattr(image_processing, "image_size"))
def test_image_processor_from_dict_with_kwargs(self):
image_processor = self.image_processing_class.from_dict(self.image_processor_dict)
self.assertNotEqual(image_processor.image_size, 30)
image_processor = self.image_processing_class.from_dict(self.image_processor_dict, image_size=42)
self.assertEqual(image_processor.image_size, 42)
@require_torchvision
def test_torchvision_numpy_transforms_equivalency(self):
# as we had to reimplement the torchvision transforms using transformers utils we must check
# they both do the same
image_inputs = self.image_processor_tester.prepare_image_inputs(equal_resolution=False)
image_processor = self.image_processing_class(**self.image_processor_dict)
print(image_inputs)
def convert_to_rgb(image):
# `image.convert("RGB")` would only work for .jpg images, as it creates a wrong background
# for transparent images. The call to `alpha_composite` handles this case
if image.mode == "RGB":
return image
image_rgba = image.convert("RGBA")
background = Image.new("RGBA", image_rgba.size, (255, 255, 255))
alpha_composite = Image.alpha_composite(background, image_rgba)
alpha_composite = alpha_composite.convert("RGB")
return alpha_composite
image_size = image_processor.image_size
image_mean = image_processor.image_mean
image_std = image_processor.image_std
transform = transforms.Compose(
[
convert_to_rgb,
transforms.Resize((image_size, image_size), interpolation=transforms.InterpolationMode.BICUBIC),
transforms.ToTensor(),
transforms.Normalize(mean=image_mean, std=image_std),
]
)
pixel_values_transform_implied = image_processor(image_inputs, transform=None)
pixel_values_transform_supplied = image_processor(image_inputs, transform=transform)
torch.testing.assert_close(pixel_values_transform_implied, pixel_values_transform_supplied, rtol=0.0, atol=0.0)
@unittest.skip("not supported")
def test_call_numpy(self):
pass
@unittest.skip("not supported")
def test_call_numpy_4_channels(self):
pass
@unittest.skip("not supported")
def test_call_pil(self):
pass
@unittest.skip("not supported")
def test_call_pytorch(self):
pass
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Testing suite for the PyTorch Idefics model. """
import unittest
from transformers import IdeficsConfig, is_torch_available, is_vision_available
from transformers.testing_utils import TestCasePlus, require_torch, require_vision, slow, torch_device
from transformers.utils import cached_property
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, floats_tensor, ids_tensor, random_attention_mask
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
from transformers import IdeficsForVisionText2Text, IdeficsModel, IdeficsProcessor
from transformers.models.idefics.configuration_idefics import IdeficsPerceiverConfig, IdeficsVisionConfig
from transformers.models.idefics.modeling_idefics import IDEFICS_PRETRAINED_MODEL_ARCHIVE_LIST
from transformers.pytorch_utils import is_torch_greater_or_equal_than_2_0
else:
is_torch_greater_or_equal_than_2_0 = False
if is_vision_available():
from PIL import Image
class IdeficsModelTester:
def __init__(
self,
parent,
batch_size=1,
seq_length=7,
image_size=30,
patch_size=2,
num_channels=3,
is_training=True,
use_input_mask=True,
use_token_type_ids=True,
use_labels=True,
vocab_size=99,
hidden_size=32,
num_hidden_layers=5,
num_attention_heads=4,
intermediate_size=37,
hidden_act="gelu",
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
max_position_embeddings=512,
type_vocab_size=16,
type_sequence_label_size=2,
initializer_range=0.02,
num_labels=3,
scope=None,
modality_type_vocab_size=2,
add_multiple_images=False,
num_images=-1,
vision_embed_dim=32,
vision_patch_size=2,
vision_image_size=30,
vision_num_attention_heads=4,
vision_num_hidden_layers=5,
vision_intermediate_size=37,
perceiver_qk_layer_norms_perceiver=False,
perceiver_resampler_depth=2,
perceiver_resampler_head_dim=8,
perceiver_resampler_n_heads=2,
perceiver_resampler_n_latents=16,
):
self.parent = parent
self.batch_size = batch_size
self.seq_length = seq_length
self.image_size = image_size
self.patch_size = patch_size
self.num_channels = num_channels
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_token_type_ids = use_token_type_ids
self.use_labels = use_labels
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.hidden_act = hidden_act
self.hidden_dropout_prob = hidden_dropout_prob
self.attention_probs_dropout_prob = attention_probs_dropout_prob
self.max_position_embeddings = max_position_embeddings
self.type_vocab_size = type_vocab_size
self.type_sequence_label_size = type_sequence_label_size
self.initializer_range = initializer_range
self.num_labels = num_labels
self.scope = scope
self.modality_type_vocab_size = modality_type_vocab_size
self.add_multiple_images = add_multiple_images
self.num_images = num_images
self.vision_embed_dim = vision_embed_dim
self.vision_patch_size = vision_patch_size
self.vision_image_size = vision_image_size
self.vision_num_attention_heads = vision_num_attention_heads
self.vision_num_hidden_layers = vision_num_hidden_layers
self.vision_intermediate_size = vision_intermediate_size
self.vision_config = IdeficsVisionConfig(
embed_dim=self.vision_embed_dim,
patch_size=self.vision_patch_size,
image_size=self.vision_image_size,
num_attention_heads=self.vision_num_attention_heads,
num_hidden_layers=self.vision_num_hidden_layers,
intermediate_size=self.vision_intermediate_size,
)
self.perceiver_qk_layer_norms_perceiver = perceiver_qk_layer_norms_perceiver
self.perceiver_resampler_depth = perceiver_resampler_depth
self.perceiver_resampler_head_dim = perceiver_resampler_head_dim
self.perceiver_resampler_n_heads = perceiver_resampler_n_heads
self.perceiver_resampler_n_latents = perceiver_resampler_n_latents
self.perceiver_config = IdeficsPerceiverConfig(
qk_layer_norms_perceiver=self.perceiver_qk_layer_norms_perceiver,
resampler_depth=self.perceiver_resampler_depth,
resampler_head_dim=self.perceiver_resampler_head_dim,
resampler_n_heads=self.perceiver_resampler_n_heads,
resampler_n_latents=self.perceiver_resampler_n_latents,
)
# we set the expected sequence length (which is used in several tests)
# this is equal to the seq length of the text tokens + number of image patches + 1 for the CLS token
self.expected_seq_len = self.seq_length + (self.image_size // self.patch_size) ** 2 + 1
def prepare_config_and_inputs(self):
self.seq_length = 42
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
num_images = 2 if self.add_multiple_images else 1
pixel_values = floats_tensor(
[self.batch_size, num_images, self.num_channels, self.image_size, self.image_size]
)
input_mask = None
if self.use_input_mask:
input_mask = random_attention_mask([self.batch_size, self.seq_length])
image_attention_mask = random_attention_mask([self.batch_size, self.seq_length, num_images])
config = self.get_config()
return (config, input_ids, input_mask, pixel_values, image_attention_mask)
def get_config(self):
return IdeficsConfig(
image_size=self.image_size,
patch_size=self.patch_size,
num_channels=self.num_channels,
vocab_size=self.vocab_size,
hidden_size=self.hidden_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
intermediate_size=self.intermediate_size,
hidden_act=self.hidden_act,
hidden_dropout_prob=self.hidden_dropout_prob,
attention_probs_dropout_prob=self.attention_probs_dropout_prob,
max_position_embeddings=self.max_position_embeddings,
type_vocab_size=self.type_vocab_size,
is_decoder=False,
initializer_range=self.initializer_range,
num_labels=self.num_labels,
modality_type_vocab_size=self.modality_type_vocab_size,
num_images=self.num_images,
vision_config=self.vision_config,
)
def create_and_check_model(
self,
config,
input_ids,
input_mask,
pixel_values,
image_attention_mask,
):
model = IdeficsModel(config=config)
model.to(torch_device)
model.eval()
result = model(
input_ids, attention_mask=input_mask, pixel_values=pixel_values, image_attention_mask=image_attention_mask
)
self.parent.assertEqual(
result.last_hidden_state.shape, (self.batch_size, input_ids.shape[1], self.hidden_size)
)
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
(
config,
input_ids,
input_mask,
pixel_values,
image_attention_mask,
) = config_and_inputs
inputs_dict = {
"input_ids": input_ids,
"attention_mask": input_mask,
"pixel_values": pixel_values,
"image_attention_mask": image_attention_mask,
}
return config, inputs_dict
def prepare_pixel_values(self):
return floats_tensor([self.batch_size, self.num_channels, self.image_size, self.image_size])
@unittest.skipIf(not is_torch_greater_or_equal_than_2_0, reason="pytorch 2.0 or higher is required")
@require_torch
class IdeficsModelTest(ModelTesterMixin, PipelineTesterMixin, unittest.TestCase):
all_model_classes = (IdeficsModel, IdeficsForVisionText2Text) if is_torch_available() else ()
pipeline_model_mapping = {}
test_pruning = False
test_headmasking = False
test_torchscript = False
def _prepare_for_class(self, inputs_dict, model_class, return_labels=False):
inputs_dict = super()._prepare_for_class(inputs_dict, model_class, return_labels=return_labels)
# XXX: IdeficsForVisionText2TextTest has no MODEL_FOR group yet, but it should be the same
# as MODEL_FOR_CAUSAL_LM_MAPPING_NAMES, so for now manually changing to do the right thing
# as super won't do it
if return_labels:
inputs_dict["labels"] = torch.zeros(
(self.model_tester.batch_size, self.model_tester.seq_length), dtype=torch.long, device=torch_device
)
return inputs_dict
def test_model_outputs_equivalence(self):
try:
orig = self.all_model_classes
# IdeficsModel.forward doesn't have labels input arg - only IdeficsForVisionText2Text does
self.all_model_classes = (IdeficsForVisionText2Text,) if is_torch_available() else ()
super().test_model_outputs_equivalence()
finally:
self.all_model_classes = orig
def setUp(self):
self.model_tester = IdeficsModelTester(self)
self.config_tester = ConfigTester(self, config_class=IdeficsConfig, hidden_size=37)
def test_config(self):
self.config_tester.run_common_tests()
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
def test_training(self):
if not self.model_tester.is_training:
return
for model_class in self.all_model_classes:
# IdeficsModel does not support training, users should use
# IdeficsForVisionText2Text for this purpose
if model_class == IdeficsModel:
return
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.return_dict = True
model = model_class(config)
model.to(torch_device)
model.train()
inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
for k, v in inputs.items():
print(k, v.shape)
loss = model(**inputs).loss
loss.backward()
def test_training_gradient_checkpointing(self):
if not self.model_tester.is_training:
return
for model_class in self.all_model_classes:
# IdeficsModel does not support training, users should use
# IdeficsForVisionText2Text for this purpose
if model_class == IdeficsModel:
return
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.use_cache = False
config.return_dict = True
model = model_class(config)
model.to(torch_device)
model.gradient_checkpointing_enable()
model.train()
inputs = self._prepare_for_class(inputs_dict, model_class, return_labels=True)
loss = model(**inputs).loss
loss.backward()
@unittest.skip(reason="""IDEFICS does not support retaining the gradients of the hidden states and attention""")
def test_retain_grad_hidden_states_attentions(self):
return
def test_attention_outputs(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.return_dict = True
for model_class in self.all_model_classes:
inputs_dict["output_attentions"] = True
inputs_dict["output_hidden_states"] = False
config.return_dict = True
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
attentions = outputs.attentions
self.assertEqual(len(attentions), self.model_tester.num_hidden_layers)
# check that output_attentions also work using config
del inputs_dict["output_attentions"]
config.output_attentions = True
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
attentions = outputs.attentions
# IDEFICS does not support outputting attention score becuase it uses SDPA under the hood
self.assertTrue(attentions[0] is None)
out_len = len(outputs)
# Check attention is always last and order is fine
inputs_dict["output_attentions"] = True
inputs_dict["output_hidden_states"] = True
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
self.assertEqual(out_len + 1, len(outputs))
self_attentions = outputs.encoder_attentions if config.is_encoder_decoder else outputs.attentions
self.assertEqual(len(self_attentions), self.model_tester.num_hidden_layers)
# IDEFICS does not support outputting attention score becuase it uses SDPA under the hood
self.assertTrue(self_attentions[0] is None)
def test_hidden_states_output(self):
def check_hidden_states_output(inputs_dict, config, model_class):
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
hidden_states = outputs.encoder_hidden_states if config.is_encoder_decoder else outputs.hidden_states
expected_num_layers = getattr(
self.model_tester, "expected_num_hidden_layers", self.model_tester.num_hidden_layers + 1
)
self.assertEqual(len(hidden_states), expected_num_layers)
seq_length = self.model_tester.seq_length
self.assertListEqual(
list(hidden_states[0].shape[-2:]),
[seq_length, self.model_tester.hidden_size],
)
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
inputs_dict["output_hidden_states"] = True
check_hidden_states_output(inputs_dict, config, model_class)
# check that output_hidden_states also work using config
del inputs_dict["output_hidden_states"]
config.output_hidden_states = True
check_hidden_states_output(inputs_dict, config, model_class)
@slow
def test_model_from_pretrained(self):
for model_name in IDEFICS_PRETRAINED_MODEL_ARCHIVE_LIST[:1]:
model = IdeficsModel.from_pretrained(model_name)
self.assertIsNotNone(model)
@unittest.skipIf(not is_torch_greater_or_equal_than_2_0, reason="pytorch 2.0 or higher is required")
@require_torch
class IdeficsForVisionText2TextTest(IdeficsModelTest, unittest.TestCase):
all_model_classes = (IdeficsForVisionText2Text,) if is_torch_available() else ()
def setUp(self):
self.model_tester = IdeficsModelTester(
self, modality_type_vocab_size=3, add_multiple_images=True, num_images=2
)
self.config_tester = ConfigTester(self, config_class=IdeficsConfig, hidden_size=37)
@unittest.skip("We only test the model that takes in multiple images")
def test_model(self):
pass
@unittest.skip("We only test the model that takes in multiple images")
def test_for_token_classification(self):
pass
@unittest.skip(reason="""IDEFICS does not support retaining the gradients of the hidden states and attention""")
def test_retain_grad_hidden_states_attentions(self):
pass
@unittest.skipIf(not is_torch_greater_or_equal_than_2_0, reason="pytorch 2.0 or higher is required")
@require_torch
@require_vision
class IdeficsModelIntegrationTest(TestCasePlus):
@cached_property
def default_processor(self):
return IdeficsProcessor.from_pretrained("HuggingFaceM4/idefics-9b") if is_vision_available() else None
@slow
def test_inference_natural_language_visual_reasoning(self):
cat_image_path = self.tests_dir / "fixtures/tests_samples/COCO/000000039769.png"
cats_image_obj = Image.open(cat_image_path) # 2 cats
dogs_image_url = "https://huggingface.co/datasets/hf-internal-testing/fixtures_nlvr2/raw/main/image1.jpeg"
prompts = [
[
"User:",
dogs_image_url,
"Describe this image.\nAssistant: An image of two dogs.\n",
"User:",
cats_image_obj,
"Describe this image.\nAssistant:",
],
[
"User:",
cats_image_obj,
"Describe this image.\nAssistant: An image of two kittens.\n",
"User:",
dogs_image_url,
"Describe this image.\nAssistant:",
],
]
model = IdeficsForVisionText2Text.from_pretrained("HuggingFaceM4/idefics-9b").to(torch_device)
processor = self.default_processor
inputs = processor(prompts, return_tensors="pt").to(torch_device)
generated_ids = model.generate(**inputs, max_length=100)
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)
# keep for debugging
for i, t in enumerate(generated_text):
t = bytes(t, "utf-8").decode("unicode_escape")
print(f"{i}:\n{t}\n")
self.assertIn("image of two cats", generated_text[0])
self.assertIn("image of two dogs", generated_text[1])
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from transformers.testing_utils import TestCasePlus, require_torch, require_vision
from transformers.utils import is_torch_available, is_vision_available
if is_torch_available():
import torch
if is_vision_available():
from PIL import Image
from transformers import (
AutoProcessor,
IdeficsImageProcessor,
IdeficsProcessor,
LlamaTokenizerFast,
PreTrainedTokenizerFast,
)
@require_torch
@require_vision
class IdeficsProcessorTest(TestCasePlus):
def setUp(self):
super().setUp()
self.checkpoint_path = self.get_auto_remove_tmp_dir()
image_processor = IdeficsImageProcessor()
tokenizer = LlamaTokenizerFast.from_pretrained("HuggingFaceM4/tiny-random-idefics")
processor = IdeficsProcessor(image_processor, tokenizer)
processor.save_pretrained(self.checkpoint_path)
self.input_keys = ["pixel_values", "input_ids", "attention_mask", "image_attention_mask"]
def get_tokenizer(self, **kwargs):
return AutoProcessor.from_pretrained(self.checkpoint_path, **kwargs).tokenizer
def get_image_processor(self, **kwargs):
return AutoProcessor.from_pretrained(self.checkpoint_path, **kwargs).image_processor
def prepare_prompts(self):
"""This function prepares a list of PIL images"""
num_images = 2
images = [np.random.randint(255, size=(3, 30, 400), dtype=np.uint8) for x in range(num_images)]
images = [Image.fromarray(np.moveaxis(x, 0, -1)) for x in images]
# print([type(x) for x in images])
# die
prompts = [
# text and 1 image
[
"User:",
images[0],
"Describe this image.\nAssistant:",
],
# text and images
[
"User:",
images[0],
"Describe this image.\nAssistant: An image of two dogs.\n",
"User:",
images[1],
"Describe this image.\nAssistant:",
],
# only text
[
"User:",
"Describe this image.\nAssistant: An image of two kittens.\n",
"User:",
"Describe this image.\nAssistant:",
],
# only images
[
images[0],
images[1],
],
]
return prompts
def test_save_load_pretrained_additional_features(self):
processor = IdeficsProcessor(tokenizer=self.get_tokenizer(), image_processor=self.get_image_processor())
processor.save_pretrained(self.checkpoint_path)
tokenizer_add_kwargs = self.get_tokenizer(bos_token="(BOS)", eos_token="(EOS)")
image_processor_add_kwargs = self.get_image_processor(do_normalize=False, padding_value=1.0)
processor = IdeficsProcessor.from_pretrained(
self.checkpoint_path, bos_token="(BOS)", eos_token="(EOS)", do_normalize=False, padding_value=1.0
)
self.assertEqual(processor.tokenizer.get_vocab(), tokenizer_add_kwargs.get_vocab())
self.assertIsInstance(processor.tokenizer, PreTrainedTokenizerFast)
self.assertEqual(processor.image_processor.to_json_string(), image_processor_add_kwargs.to_json_string())
self.assertIsInstance(processor.image_processor, IdeficsImageProcessor)
def test_processor(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = IdeficsProcessor(tokenizer=tokenizer, image_processor=image_processor)
prompts = self.prepare_prompts()
# test that all prompts succeeded
input_processor = processor(prompts, return_tensors="pt")
for key in self.input_keys:
assert torch.is_tensor(input_processor[key])
def test_tokenizer_decode(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = IdeficsProcessor(tokenizer=tokenizer, image_processor=image_processor)
predicted_ids = [[1, 4, 5, 8, 1, 0, 8], [3, 4, 3, 1, 1, 8, 9]]
decoded_processor = processor.batch_decode(predicted_ids)
decoded_tok = tokenizer.batch_decode(predicted_ids)
self.assertListEqual(decoded_tok, decoded_processor)
def test_model_input_names(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = IdeficsProcessor(tokenizer=tokenizer, image_processor=image_processor)
prompts = self.prepare_prompts()
inputs = processor(prompts)
# For now the processor supports only ['pixel_values', 'input_ids', 'attention_mask']
self.assertSetEqual(set(inputs.keys()), set(self.input_keys))
...@@ -121,6 +121,9 @@ SPECIAL_CASES_TO_ALLOW.update( ...@@ -121,6 +121,9 @@ SPECIAL_CASES_TO_ALLOW.update(
"JukeboxPriorConfig": True, "JukeboxPriorConfig": True,
# TODO: @Younes (for `is_decoder`) # TODO: @Younes (for `is_decoder`)
"Pix2StructTextConfig": True, "Pix2StructTextConfig": True,
"IdeficsConfig": True,
"IdeficsVisionConfig": True,
"IdeficsPerceiverConfig": True,
} }
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment