Unverified Commit 269b0549 authored by Alara Dirik's avatar Alara Dirik Committed by GitHub
Browse files

Add ALIGN to transformers (#21741)

Adds the ALIGN model to transformers. ALIGN is introduced in "Scaling Up Visual and Vision-Language Representation Learning With Noisy Text Supervision" by Chao Jia, Yinfei Yang, Ye Xia, Yi-Ting Chen, Zarana Parekh, Hieu Pham, Quoc V. Le, Yunhsuan Sung, Zhen Li, Tom Duerig.
parent f7c618e3
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Image/Text processor class for ALIGN
"""
from ...processing_utils import ProcessorMixin
from ...tokenization_utils_base import BatchEncoding
class AlignProcessor(ProcessorMixin):
r"""
Constructs an ALIGN processor which wraps [`EfficientNetImageProcessor`] and
[`BertTokenizer`]/[`BertTokenizerFast`] into a single processor that interits both the image processor and
tokenizer functionalities. See the [`~AlignProcessor.__call__`] and [`~OwlViTProcessor.decode`] for more
information.
Args:
image_processor ([`EfficientNetImageProcessor`]):
The image processor is a required input.
tokenizer ([`BERTTokenizer`, `BertTokenizerFast`]):
The tokenizer is a required input.
"""
attributes = ["image_processor", "tokenizer"]
image_processor_class = "EfficientNetImageProcessor"
tokenizer_class = ("BertTokenizer", "BertTokenizerFast")
def __init__(self, image_processor, tokenizer):
super().__init__(image_processor, tokenizer)
def __call__(self, text=None, images=None, padding="max_length", max_length=64, return_tensors=None, **kwargs):
"""
Main method to prepare text(s) and image(s) to be fed as input to the model. This method forwards the `text`
and `kwargs` arguments to BertTokenizerFast's [`~BertTokenizerFast.__call__`] if `text` is not `None` to encode
the text. To prepare the image(s), this method forwards the `images` and `kwargs` arguments to
EfficientNetImageProcessor's [`~EfficientNetImageProcessor.__call__`] if `images` is not `None`. Please refer
to the doctsring of the above two methods for more information.
Args:
text (`str`, `List[str]`):
The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
(pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
`is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`):
The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
tensor. In case of a NumPy array/PyTorch tensor, each image should be of shape (C, H, W), where C is a
number of channels, H and W are image height and width.
padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `max_length`):
Activates and controls padding for tokenization of input text. Choose between [`True` or `'longest'`,
`'max_length'`, `False` or `'do_not_pad'`]
max_length (`int`, *optional*, defaults to `max_length`):
Maximum padding value to use to pad the input text during tokenization.
return_tensors (`str` or [`~utils.TensorType`], *optional*):
If set, will return tensors of a particular framework. Acceptable values are:
- `'tf'`: Return TensorFlow `tf.constant` objects.
- `'pt'`: Return PyTorch `torch.Tensor` objects.
- `'np'`: Return NumPy `np.ndarray` objects.
- `'jax'`: Return JAX `jnp.ndarray` objects.
Returns:
[`BatchEncoding`]: A [`BatchEncoding`] with the following fields:
- **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
- **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
`return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
`None`).
- **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
"""
if text is None and images is None:
raise ValueError("You have to specify either text or images. Both cannot be none.")
if text is not None:
encoding = self.tokenizer(
text, padding=padding, max_length=max_length, return_tensors=return_tensors, **kwargs
)
if images is not None:
image_features = self.image_processor(images, return_tensors=return_tensors, **kwargs)
if text is not None and images is not None:
encoding["pixel_values"] = image_features.pixel_values
return encoding
elif text is not None:
return encoding
else:
return BatchEncoding(data=dict(**image_features), tensor_type=return_tensors)
def batch_decode(self, *args, **kwargs):
"""
This method forwards all its arguments to BertTokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
refer to the docstring of this method for more information.
"""
return self.tokenizer.batch_decode(*args, **kwargs)
def decode(self, *args, **kwargs):
"""
This method forwards all its arguments to BertTokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
the docstring of this method for more information.
"""
return self.tokenizer.decode(*args, **kwargs)
@property
def model_input_names(self):
tokenizer_input_names = self.tokenizer.model_input_names
image_processor_input_names = self.image_processor.model_input_names
return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))
......@@ -30,6 +30,7 @@ CONFIG_MAPPING_NAMES = OrderedDict(
[
# Add configs here
("albert", "AlbertConfig"),
("align", "AlignConfig"),
("altclip", "AltCLIPConfig"),
("audio-spectrogram-transformer", "ASTConfig"),
("bart", "BartConfig"),
......@@ -207,6 +208,7 @@ CONFIG_ARCHIVE_MAP_MAPPING_NAMES = OrderedDict(
[
# Add archive maps here)
("albert", "ALBERT_PRETRAINED_CONFIG_ARCHIVE_MAP"),
("align", "ALIGN_PRETRAINED_CONFIG_ARCHIVE_MAP"),
("altclip", "ALTCLIP_PRETRAINED_CONFIG_ARCHIVE_MAP"),
("audio-spectrogram-transformer", "AUDIO_SPECTROGRAM_TRANSFORMER_PRETRAINED_CONFIG_ARCHIVE_MAP"),
("bart", "BART_PRETRAINED_CONFIG_ARCHIVE_MAP"),
......@@ -366,6 +368,7 @@ MODEL_NAMES_MAPPING = OrderedDict(
[
# Add full (and cased) model names here
("albert", "ALBERT"),
("align", "ALIGN"),
("altclip", "AltCLIP"),
("audio-spectrogram-transformer", "Audio Spectrogram Transformer"),
("bart", "BART"),
......
......@@ -37,6 +37,7 @@ logger = logging.get_logger(__name__)
IMAGE_PROCESSOR_MAPPING_NAMES = OrderedDict(
[
("align", "EfficientNetImageProcessor"),
("beit", "BeitImageProcessor"),
("bit", "BitImageProcessor"),
("blip", "BlipImageProcessor"),
......
......@@ -29,6 +29,7 @@ MODEL_MAPPING_NAMES = OrderedDict(
[
# Base model mapping
("albert", "AlbertModel"),
("align", "AlignModel"),
("altclip", "AltCLIPModel"),
("audio-spectrogram-transformer", "ASTModel"),
("bart", "BartModel"),
......@@ -922,6 +923,7 @@ MODEL_FOR_AUDIO_XVECTOR_MAPPING_NAMES = OrderedDict(
_MODEL_FOR_ZERO_SHOT_IMAGE_CLASSIFICATION_MAPPING_NAMES = OrderedDict(
[
# Model for Zero Shot Image Classification mapping
("align", "AlignModel"),
("altclip", "AltCLIPModel"),
("blip", "BlipModel"),
("chinese_clip", "ChineseCLIPModel"),
......
......@@ -41,6 +41,7 @@ logger = logging.get_logger(__name__)
PROCESSOR_MAPPING_NAMES = OrderedDict(
[
("align", "AlignProcessor"),
("altclip", "AltCLIPProcessor"),
("blip", "BlipProcessor"),
("blip-2", "Blip2Processor"),
......
......@@ -53,6 +53,7 @@ else:
"AlbertTokenizerFast" if is_tokenizers_available() else None,
),
),
("align", ("BertTokenizer", "BertTokenizerFast" if is_tokenizers_available() else None)),
("bart", ("BartTokenizer", "BartTokenizerFast")),
(
"barthez",
......
......@@ -199,7 +199,7 @@ class EfficientNetImageProcessor(BaseImageProcessor):
rescaled_image = rescaled_image.astype(np.float32)
else:
rescaled_image = rescale(image, scale=scale, data_format=data_format, **kwargs)
return rescale(image, scale=scale, data_format=data_format, **kwargs)
return rescaled_image
def normalize(
self,
......
......@@ -126,12 +126,12 @@ class EfficientNetEmbeddings(nn.Module):
def __init__(self, config: EfficientNetConfig):
super().__init__()
out_channels = round_filters(config, 32)
self.out_dim = round_filters(config, 32)
self.padding = nn.ZeroPad2d(padding=(0, 1, 0, 1))
self.convolution = nn.Conv2d(
config.num_channels, out_channels, kernel_size=3, stride=2, padding="valid", bias=False
config.num_channels, self.out_dim, kernel_size=3, stride=2, padding="valid", bias=False
)
self.batchnorm = nn.BatchNorm2d(out_channels, eps=config.batch_norm_eps, momentum=config.batch_norm_momentum)
self.batchnorm = nn.BatchNorm2d(self.out_dim, eps=config.batch_norm_eps, momentum=config.batch_norm_momentum)
self.activation = ACT2FN[config.hidden_act]
def forward(self, pixel_values: torch.Tensor) -> torch.Tensor:
......@@ -174,13 +174,7 @@ class EfficientNetExpansionLayer(nn.Module):
This corresponds to the expansion phase of each block in the original implementation.
"""
def __init__(
self,
config: EfficientNetConfig,
in_dim: int,
out_dim: int,
stride: int,
):
def __init__(self, config: EfficientNetConfig, in_dim: int, out_dim: int, stride: int):
super().__init__()
self.expand_conv = nn.Conv2d(
in_channels=in_dim,
......@@ -189,7 +183,7 @@ class EfficientNetExpansionLayer(nn.Module):
padding="same",
bias=False,
)
self.expand_bn = nn.BatchNorm2d(num_features=out_dim)
self.expand_bn = nn.BatchNorm2d(num_features=out_dim, eps=config.batch_norm_eps)
self.expand_act = ACT2FN[config.hidden_act]
def forward(self, hidden_states: torch.FloatTensor) -> torch.Tensor:
......
......@@ -356,6 +356,37 @@ def load_tf_weights_in_albert(*args, **kwargs):
requires_backends(load_tf_weights_in_albert, ["torch"])
ALIGN_PRETRAINED_MODEL_ARCHIVE_LIST = None
class AlignModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class AlignPreTrainedModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class AlignTextModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class AlignVisionModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
ALTCLIP_PRETRAINED_MODEL_ARCHIVE_LIST = None
......
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Testing suite for the PyTorch ALIGN model. """
import inspect
import os
import tempfile
import unittest
import requests
from transformers import AlignConfig, AlignProcessor, AlignTextConfig, AlignVisionConfig
from transformers.testing_utils import (
is_flax_available,
require_torch,
require_vision,
slow,
torch_device,
)
from transformers.utils import is_torch_available, is_vision_available
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import (
ModelTesterMixin,
_config_zero_init,
floats_tensor,
ids_tensor,
random_attention_mask,
)
if is_torch_available():
import torch
from transformers import (
AlignModel,
AlignTextModel,
AlignVisionModel,
)
from transformers.models.align.modeling_align import ALIGN_PRETRAINED_MODEL_ARCHIVE_LIST
if is_vision_available():
from PIL import Image
if is_flax_available():
pass
class AlignVisionModelTester:
def __init__(
self,
parent,
batch_size=13,
image_size=32,
num_channels=3,
kernel_sizes=[3, 3, 5],
in_channels=[32, 16, 24],
out_channels=[16, 24, 30],
hidden_dim=64,
strides=[1, 1, 2],
num_block_repeats=[1, 1, 2],
expand_ratios=[1, 6, 6],
is_training=True,
hidden_act="gelu",
):
self.parent = parent
self.batch_size = batch_size
self.image_size = image_size
self.num_channels = num_channels
self.kernel_sizes = kernel_sizes
self.in_channels = in_channels
self.out_channels = out_channels
self.hidden_dim = hidden_dim
self.strides = strides
self.num_block_repeats = num_block_repeats
self.expand_ratios = expand_ratios
self.is_training = is_training
self.hidden_act = hidden_act
def prepare_config_and_inputs(self):
pixel_values = floats_tensor([self.batch_size, self.num_channels, self.image_size, self.image_size])
config = self.get_config()
return config, pixel_values
def get_config(self):
return AlignVisionConfig(
num_channels=self.num_channels,
kernel_sizes=self.kernel_sizes,
in_channels=self.in_channels,
out_channels=self.out_channels,
hidden_dim=self.hidden_dim,
strides=self.strides,
num_block_repeats=self.num_block_repeats,
expand_ratios=self.expand_ratios,
hidden_act=self.hidden_act,
)
def create_and_check_model(self, config, pixel_values):
model = AlignVisionModel(config=config)
model.to(torch_device)
model.eval()
with torch.no_grad():
result = model(pixel_values)
patch_size = self.image_size // 4
self.parent.assertEqual(
result.last_hidden_state.shape, (self.batch_size, config.hidden_dim, patch_size, patch_size)
)
self.parent.assertEqual(result.pooler_output.shape, (self.batch_size, config.hidden_dim))
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
config, pixel_values = config_and_inputs
inputs_dict = {"pixel_values": pixel_values}
return config, inputs_dict
@require_torch
class AlignVisionModelTest(ModelTesterMixin, unittest.TestCase):
"""
Here we also overwrite some of the tests of test_modeling_common.py, as ALIGN does not use input_ids, inputs_embeds,
attention_mask and seq_length.
"""
all_model_classes = (AlignVisionModel,) if is_torch_available() else ()
fx_compatible = False
test_pruning = False
test_resize_embeddings = False
test_head_masking = False
has_attentions = False
def setUp(self):
self.model_tester = AlignVisionModelTester(self)
self.config_tester = ConfigTester(
self, config_class=AlignVisionConfig, has_text_modality=False, hidden_size=37
)
def test_config(self):
self.create_and_test_config_common_properties()
self.config_tester.create_and_test_config_to_json_string()
self.config_tester.create_and_test_config_to_json_file()
self.config_tester.create_and_test_config_from_and_save_pretrained()
self.config_tester.create_and_test_config_with_num_labels()
self.config_tester.check_config_can_be_init_without_params()
self.config_tester.check_config_arguments_init()
def create_and_test_config_common_properties(self):
return
@unittest.skip(reason="AlignVisionModel does not use inputs_embeds")
def test_inputs_embeds(self):
pass
@unittest.skip(reason="AlignVisionModel does not support input and output embeddings")
def test_model_common_attributes(self):
pass
def test_forward_signature(self):
config, _ = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
signature = inspect.signature(model.forward)
# signature.parameters is an OrderedDict => so arg_names order is deterministic
arg_names = [*signature.parameters.keys()]
expected_arg_names = ["pixel_values"]
self.assertListEqual(arg_names[:1], expected_arg_names)
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
def test_hidden_states_output(self):
def check_hidden_states_output(inputs_dict, config, model_class):
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
hidden_states = outputs.encoder_hidden_states if config.is_encoder_decoder else outputs.hidden_states
num_blocks = sum(config.num_block_repeats) * 4
self.assertEqual(len(hidden_states), num_blocks)
self.assertListEqual(
list(hidden_states[0].shape[-2:]),
[self.model_tester.image_size // 2, self.model_tester.image_size // 2],
)
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
inputs_dict["output_hidden_states"] = True
check_hidden_states_output(inputs_dict, config, model_class)
# check that output_hidden_states also work using config
del inputs_dict["output_hidden_states"]
config.output_hidden_states = True
check_hidden_states_output(inputs_dict, config, model_class)
def test_training(self):
pass
def test_training_gradient_checkpointing(self):
pass
@slow
def test_model_from_pretrained(self):
for model_name in ALIGN_PRETRAINED_MODEL_ARCHIVE_LIST[:1]:
model = AlignVisionModel.from_pretrained(model_name)
self.assertIsNotNone(model)
class AlignTextModelTester:
def __init__(
self,
parent,
batch_size=13,
seq_length=7,
is_training=True,
use_input_mask=True,
use_token_type_ids=True,
vocab_size=99,
hidden_size=32,
num_hidden_layers=5,
num_attention_heads=4,
intermediate_size=37,
hidden_act="gelu",
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
max_position_embeddings=512,
type_vocab_size=16,
type_sequence_label_size=2,
initializer_range=0.02,
scope=None,
):
self.parent = parent
self.batch_size = batch_size
self.seq_length = seq_length
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_token_type_ids = use_token_type_ids
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.hidden_act = hidden_act
self.hidden_dropout_prob = hidden_dropout_prob
self.attention_probs_dropout_prob = attention_probs_dropout_prob
self.max_position_embeddings = max_position_embeddings
self.type_vocab_size = type_vocab_size
self.type_sequence_label_size = type_sequence_label_size
self.initializer_range = initializer_range
self.scope = scope
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
input_mask = None
if self.use_input_mask:
input_mask = random_attention_mask([self.batch_size, self.seq_length])
token_type_ids = None
if self.use_token_type_ids:
token_type_ids = ids_tensor([self.batch_size, self.seq_length], self.type_vocab_size)
config = self.get_config()
return config, input_ids, token_type_ids, input_mask
def get_config(self):
return AlignTextConfig(
vocab_size=self.vocab_size,
hidden_size=self.hidden_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
intermediate_size=self.intermediate_size,
hidden_act=self.hidden_act,
hidden_dropout_prob=self.hidden_dropout_prob,
attention_probs_dropout_prob=self.attention_probs_dropout_prob,
max_position_embeddings=self.max_position_embeddings,
type_vocab_size=self.type_vocab_size,
is_decoder=False,
initializer_range=self.initializer_range,
)
def create_and_check_model(self, config, input_ids, token_type_ids, input_mask):
model = AlignTextModel(config=config)
model.to(torch_device)
model.eval()
with torch.no_grad():
result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids)
result = model(input_ids, token_type_ids=token_type_ids)
result = model(input_ids)
self.parent.assertEqual(result.last_hidden_state.shape, (self.batch_size, self.seq_length, self.hidden_size))
self.parent.assertEqual(result.pooler_output.shape, (self.batch_size, self.hidden_size))
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
(
config,
input_ids,
token_type_ids,
input_mask,
) = config_and_inputs
inputs_dict = {"input_ids": input_ids, "token_type_ids": token_type_ids, "attention_mask": input_mask}
return config, inputs_dict
@require_torch
class AlignTextModelTest(ModelTesterMixin, unittest.TestCase):
all_model_classes = (AlignTextModel,) if is_torch_available() else ()
fx_compatible = False
test_pruning = False
test_head_masking = False
def setUp(self):
self.model_tester = AlignTextModelTester(self)
self.config_tester = ConfigTester(self, config_class=AlignTextConfig, hidden_size=37)
def test_config(self):
self.config_tester.run_common_tests()
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
def test_training(self):
pass
def test_training_gradient_checkpointing(self):
pass
@unittest.skip(reason="ALIGN does not use inputs_embeds")
def test_inputs_embeds(self):
pass
@unittest.skip(reason="AlignTextModel has no base class and is not available in MODEL_MAPPING")
def test_save_load_fast_init_from_base(self):
pass
@unittest.skip(reason="AlignTextModel has no base class and is not available in MODEL_MAPPING")
def test_save_load_fast_init_to_base(self):
pass
@slow
def test_model_from_pretrained(self):
for model_name in ALIGN_PRETRAINED_MODEL_ARCHIVE_LIST[:1]:
model = AlignTextModel.from_pretrained(model_name)
self.assertIsNotNone(model)
class AlignModelTester:
def __init__(self, parent, text_kwargs=None, vision_kwargs=None, is_training=True):
if text_kwargs is None:
text_kwargs = {}
if vision_kwargs is None:
vision_kwargs = {}
self.parent = parent
self.text_model_tester = AlignTextModelTester(parent, **text_kwargs)
self.vision_model_tester = AlignVisionModelTester(parent, **vision_kwargs)
self.is_training = is_training
def prepare_config_and_inputs(self):
test_config, input_ids, token_type_ids, input_mask = self.text_model_tester.prepare_config_and_inputs()
vision_config, pixel_values = self.vision_model_tester.prepare_config_and_inputs()
config = self.get_config()
return config, input_ids, token_type_ids, input_mask, pixel_values
def get_config(self):
return AlignConfig.from_text_vision_configs(
self.text_model_tester.get_config(), self.vision_model_tester.get_config(), projection_dim=64
)
def create_and_check_model(self, config, input_ids, token_type_ids, attention_mask, pixel_values):
model = AlignModel(config).to(torch_device).eval()
with torch.no_grad():
result = model(input_ids, pixel_values, attention_mask, token_type_ids)
self.parent.assertEqual(
result.logits_per_image.shape, (self.vision_model_tester.batch_size, self.text_model_tester.batch_size)
)
self.parent.assertEqual(
result.logits_per_text.shape, (self.text_model_tester.batch_size, self.vision_model_tester.batch_size)
)
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
config, input_ids, token_type_ids, input_mask, pixel_values = config_and_inputs
inputs_dict = {
"input_ids": input_ids,
"token_type_ids": token_type_ids,
"attention_mask": input_mask,
"pixel_values": pixel_values,
"return_loss": True,
}
return config, inputs_dict
@require_torch
class AlignModelTest(ModelTesterMixin, unittest.TestCase):
all_model_classes = (AlignModel,) if is_torch_available() else ()
fx_compatible = False
test_head_masking = False
test_pruning = False
test_resize_embeddings = False
test_attention_outputs = False
def setUp(self):
self.model_tester = AlignModelTester(self)
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
@unittest.skip(reason="Hidden_states is tested in individual model tests")
def test_hidden_states_output(self):
pass
@unittest.skip(reason="Inputs_embeds is tested in individual model tests")
def test_inputs_embeds(self):
pass
@unittest.skip(reason="Retain_grad is tested in individual model tests")
def test_retain_grad_hidden_states_attentions(self):
pass
@unittest.skip(reason="AlignModel does not have input/output embeddings")
def test_model_common_attributes(self):
pass
# override as the `temperature` parameter initilization is different for ALIGN
def test_initialization(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
configs_no_init = _config_zero_init(config)
for model_class in self.all_model_classes:
model = model_class(config=configs_no_init)
for name, param in model.named_parameters():
if param.requires_grad:
# check if `temperature` is initilized as per the original implementation
if name == "temperature":
self.assertAlmostEqual(
param.data.item(),
1.0,
delta=1e-3,
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
)
elif name == "text_projection.weight":
self.assertTrue(
-1.0 <= ((param.data.mean() * 1e9).round() / 1e9).item() <= 1.0,
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
)
else:
self.assertIn(
((param.data.mean() * 1e9).round() / 1e9).item(),
[0.0, 1.0],
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
)
def _create_and_check_torchscript(self, config, inputs_dict):
if not self.test_torchscript:
return
configs_no_init = _config_zero_init(config) # To be sure we have no Nan
configs_no_init.torchscript = True
configs_no_init.return_dict = False
for model_class in self.all_model_classes:
model = model_class(config=configs_no_init)
model.to(torch_device)
model.eval()
try:
input_ids = inputs_dict["input_ids"]
pixel_values = inputs_dict["pixel_values"] # ALIGN needs pixel_values
traced_model = torch.jit.trace(model, (input_ids, pixel_values))
except RuntimeError:
self.fail("Couldn't trace module.")
with tempfile.TemporaryDirectory() as tmp_dir_name:
pt_file_name = os.path.join(tmp_dir_name, "traced_model.pt")
try:
torch.jit.save(traced_model, pt_file_name)
except Exception:
self.fail("Couldn't save module.")
try:
loaded_model = torch.jit.load(pt_file_name)
except Exception:
self.fail("Couldn't load module.")
model.to(torch_device)
model.eval()
loaded_model.to(torch_device)
loaded_model.eval()
model_state_dict = model.state_dict()
loaded_model_state_dict = loaded_model.state_dict()
self.assertEqual(set(model_state_dict.keys()), set(loaded_model_state_dict.keys()))
models_equal = True
for layer_name, p1 in model_state_dict.items():
p2 = loaded_model_state_dict[layer_name]
if p1.data.ne(p2.data).sum() > 0:
models_equal = False
self.assertTrue(models_equal)
def test_load_vision_text_config(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
# Save AlignConfig and check if we can load AlignVisionConfig from it
with tempfile.TemporaryDirectory() as tmp_dir_name:
config.save_pretrained(tmp_dir_name)
vision_config = AlignVisionConfig.from_pretrained(tmp_dir_name)
self.assertDictEqual(config.vision_config.to_dict(), vision_config.to_dict())
# Save AlignConfig and check if we can load AlignTextConfig from it
with tempfile.TemporaryDirectory() as tmp_dir_name:
config.save_pretrained(tmp_dir_name)
text_config = AlignTextConfig.from_pretrained(tmp_dir_name)
self.assertDictEqual(config.text_config.to_dict(), text_config.to_dict())
@slow
def test_model_from_pretrained(self):
for model_name in ALIGN_PRETRAINED_MODEL_ARCHIVE_LIST[:1]:
model = AlignModel.from_pretrained(model_name)
self.assertIsNotNone(model)
# We will verify our results on an image of cute cats
def prepare_img():
url = "http://images.cocodataset.org/val2017/000000039769.jpg"
im = Image.open(requests.get(url, stream=True).raw)
return im
@require_vision
@require_torch
class AlignModelIntegrationTest(unittest.TestCase):
@slow
def test_inference(self):
model_name = "kakaobrain/align-base"
model = AlignModel.from_pretrained(model_name).to(torch_device)
processor = AlignProcessor.from_pretrained(model_name)
image = prepare_img()
texts = ["a photo of a cat", "a photo of a dog"]
inputs = processor(text=texts, images=image, return_tensors="pt").to(torch_device)
# forward pass
with torch.no_grad():
outputs = model(**inputs)
# verify the logits
self.assertEqual(
outputs.logits_per_image.shape,
torch.Size((inputs.pixel_values.shape[0], inputs.input_ids.shape[0])),
)
self.assertEqual(
outputs.logits_per_text.shape,
torch.Size((inputs.input_ids.shape[0], inputs.pixel_values.shape[0])),
)
expected_logits = torch.tensor([[9.7093, 3.4679]], device=torch_device)
self.assertTrue(torch.allclose(outputs.logits_per_image, expected_logits, atol=1e-3))
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import shutil
import tempfile
import unittest
import numpy as np
import pytest
from transformers import BertTokenizer, BertTokenizerFast
from transformers.models.bert.tokenization_bert import VOCAB_FILES_NAMES
from transformers.testing_utils import require_vision
from transformers.utils import IMAGE_PROCESSOR_NAME, is_vision_available
if is_vision_available():
from PIL import Image
from transformers import AlignProcessor, EfficientNetImageProcessor
@require_vision
class AlignProcessorTest(unittest.TestCase):
def setUp(self):
self.tmpdirname = tempfile.mkdtemp()
vocab_tokens = [
"[UNK]",
"[CLS]",
"[SEP]",
"[PAD]",
"[MASK]",
"want",
"##want",
"##ed",
"wa",
"un",
"runn",
"##ing",
",",
"low",
"lowest",
]
self.vocab_file = os.path.join(self.tmpdirname, VOCAB_FILES_NAMES["vocab_file"])
with open(self.vocab_file, "w", encoding="utf-8") as vocab_writer:
vocab_writer.write("".join([x + "\n" for x in vocab_tokens]))
image_processor_map = {
"do_resize": True,
"size": 20,
"do_center_crop": True,
"crop_size": 18,
"do_normalize": True,
"image_mean": [0.48145466, 0.4578275, 0.40821073],
"image_std": [0.26862954, 0.26130258, 0.27577711],
}
self.image_processor_file = os.path.join(self.tmpdirname, IMAGE_PROCESSOR_NAME)
with open(self.image_processor_file, "w", encoding="utf-8") as fp:
json.dump(image_processor_map, fp)
def get_tokenizer(self, **kwargs):
return BertTokenizer.from_pretrained(self.tmpdirname, **kwargs)
def get_rust_tokenizer(self, **kwargs):
return BertTokenizerFast.from_pretrained(self.tmpdirname, **kwargs)
def get_image_processor(self, **kwargs):
return EfficientNetImageProcessor.from_pretrained(self.tmpdirname, **kwargs)
def tearDown(self):
shutil.rmtree(self.tmpdirname)
def prepare_image_inputs(self):
"""This function prepares a list of PIL images, or a list of numpy arrays if one specifies numpify=True,
or a list of PyTorch tensors if one specifies torchify=True.
"""
image_inputs = [np.random.randint(255, size=(3, 30, 400), dtype=np.uint8)]
image_inputs = [Image.fromarray(np.moveaxis(x, 0, -1)) for x in image_inputs]
return image_inputs
def test_save_load_pretrained_default(self):
tokenizer_slow = self.get_tokenizer()
tokenizer_fast = self.get_rust_tokenizer()
image_processor = self.get_image_processor()
processor_slow = AlignProcessor(tokenizer=tokenizer_slow, image_processor=image_processor)
processor_slow.save_pretrained(self.tmpdirname)
processor_slow = AlignProcessor.from_pretrained(self.tmpdirname, use_fast=False)
processor_fast = AlignProcessor(tokenizer=tokenizer_fast, image_processor=image_processor)
processor_fast.save_pretrained(self.tmpdirname)
processor_fast = AlignProcessor.from_pretrained(self.tmpdirname)
self.assertEqual(processor_slow.tokenizer.get_vocab(), tokenizer_slow.get_vocab())
self.assertEqual(processor_fast.tokenizer.get_vocab(), tokenizer_fast.get_vocab())
self.assertEqual(tokenizer_slow.get_vocab(), tokenizer_fast.get_vocab())
self.assertIsInstance(processor_slow.tokenizer, BertTokenizer)
self.assertIsInstance(processor_fast.tokenizer, BertTokenizerFast)
self.assertEqual(processor_slow.image_processor.to_json_string(), image_processor.to_json_string())
self.assertEqual(processor_fast.image_processor.to_json_string(), image_processor.to_json_string())
self.assertIsInstance(processor_slow.image_processor, EfficientNetImageProcessor)
self.assertIsInstance(processor_fast.image_processor, EfficientNetImageProcessor)
def test_save_load_pretrained_additional_features(self):
processor = AlignProcessor(tokenizer=self.get_tokenizer(), image_processor=self.get_image_processor())
processor.save_pretrained(self.tmpdirname)
tokenizer_add_kwargs = self.get_tokenizer(bos_token="(BOS)", eos_token="(EOS)")
image_processor_add_kwargs = self.get_image_processor(do_normalize=False, padding_value=1.0)
processor = AlignProcessor.from_pretrained(
self.tmpdirname, bos_token="(BOS)", eos_token="(EOS)", do_normalize=False, padding_value=1.0
)
self.assertEqual(processor.tokenizer.get_vocab(), tokenizer_add_kwargs.get_vocab())
self.assertIsInstance(processor.tokenizer, BertTokenizerFast)
self.assertEqual(processor.image_processor.to_json_string(), image_processor_add_kwargs.to_json_string())
self.assertIsInstance(processor.image_processor, EfficientNetImageProcessor)
def test_image_processor(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = AlignProcessor(tokenizer=tokenizer, image_processor=image_processor)
image_input = self.prepare_image_inputs()
input_image_proc = image_processor(image_input, return_tensors="np")
input_processor = processor(images=image_input, return_tensors="np")
for key in input_image_proc.keys():
self.assertAlmostEqual(input_image_proc[key].sum(), input_processor[key].sum(), delta=1e-2)
def test_tokenizer(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = AlignProcessor(tokenizer=tokenizer, image_processor=image_processor)
input_str = "lower newer"
encoded_processor = processor(text=input_str)
encoded_tok = tokenizer(input_str, padding="max_length", max_length=64)
for key in encoded_tok.keys():
self.assertListEqual(encoded_tok[key], encoded_processor[key])
def test_processor(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = AlignProcessor(tokenizer=tokenizer, image_processor=image_processor)
input_str = "lower newer"
image_input = self.prepare_image_inputs()
inputs = processor(text=input_str, images=image_input)
self.assertListEqual(list(inputs.keys()), ["input_ids", "token_type_ids", "attention_mask", "pixel_values"])
# test if it raises when no input is passed
with pytest.raises(ValueError):
processor()
def test_tokenizer_decode(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = AlignProcessor(tokenizer=tokenizer, image_processor=image_processor)
predicted_ids = [[1, 4, 5, 8, 1, 0, 8], [3, 4, 3, 1, 1, 8, 9]]
decoded_processor = processor.batch_decode(predicted_ids)
decoded_tok = tokenizer.batch_decode(predicted_ids)
self.assertListEqual(decoded_tok, decoded_processor)
def test_model_input_names(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = AlignProcessor(tokenizer=tokenizer, image_processor=image_processor)
input_str = "lower newer"
image_input = self.prepare_image_inputs()
inputs = processor(text=input_str, images=image_input)
self.assertListEqual(list(inputs.keys()), processor.model_input_names)
......@@ -182,6 +182,8 @@ TEST_FILES_WITH_NO_COMMON_TESTS = [
# should **not** be the rule.
IGNORE_NON_AUTO_CONFIGURED = PRIVATE_MODELS.copy() + [
# models to ignore for model xxx mapping
"AlignTextModel",
"AlignVisionModel",
"ClapTextModel",
"ClapTextModelWithProjection",
"ClapAudioModel",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment