Unverified commit a564d10a authored by amyeroberts, committed by GitHub

Deprecate low use models (#30781)

* Deprecate models
- graphormer
- time_series_transformer
- xlm_prophetnet
- qdqbert
- nat
- ernie_m
- tvlt
- nezha
- mega
- jukebox
- vit_hybrid
- x_clip
- deta
- speech_to_text_2
- efficientformer
- realm
- gptsan_japanese

* Fix up

* Fix speech2text2 imports

* Make sure message isn't indented

* Fix docstrings

* Correctly map for deprecated models from model_type

* Uncomment out

* Add back time series transformer and x-clip

* Import fix and fix-up

* Fix up with updated ruff
parent 7f08817b
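In practice, deprecation here freezes these models rather than deleting them. A minimal sketch of continued use (assuming, as with earlier deprecations, that the classes stay importable from the top-level transformers namespace until they are removed in a later release; pin your transformers version if you depend on one of these models):

from transformers import QDQBertConfig, QDQBertModel  # deprecated, but still exposed at the top level

config = QDQBertConfig(vocab_size=99, hidden_size=32, num_hidden_layers=2, num_attention_heads=4, intermediate_size=37)
print(config.model_type)  # "qdqbert"

# Instantiating the model additionally requires the optional pytorch_quantization
# dependency, exactly as in the test suite reproduced below.
model = QDQBertModel(config)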
# coding=utf-8
# Copyright 2021 The HuggingFace Inc. team. All rights reserved.
# Copyright 2021 NVIDIA Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch QDQBERT model."""
import unittest
from transformers import QDQBertConfig, is_torch_available
from transformers.testing_utils import require_pytorch_quantization, require_torch, slow, torch_device
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, floats_tensor, ids_tensor, random_attention_mask
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
from transformers import (
QDQBertForMaskedLM,
QDQBertForMultipleChoice,
QDQBertForNextSentencePrediction,
QDQBertForQuestionAnswering,
QDQBertForSequenceClassification,
QDQBertForTokenClassification,
QDQBertLMHeadModel,
QDQBertModel,
)
class QDQBertModelTester:
def __init__(
self,
parent,
batch_size=13,
seq_length=7,
is_training=True,
use_input_mask=True,
use_token_type_ids=True,
use_labels=True,
vocab_size=99,
hidden_size=32,
num_hidden_layers=2,
num_attention_heads=4,
intermediate_size=37,
hidden_act="gelu",
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
max_position_embeddings=512,
type_vocab_size=16,
type_sequence_label_size=2,
initializer_range=0.02,
num_labels=3,
num_choices=4,
scope=None,
):
self.parent = parent
self.batch_size = batch_size
self.seq_length = seq_length
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_token_type_ids = use_token_type_ids
self.use_labels = use_labels
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.hidden_act = hidden_act
self.hidden_dropout_prob = hidden_dropout_prob
self.attention_probs_dropout_prob = attention_probs_dropout_prob
self.max_position_embeddings = max_position_embeddings
self.type_vocab_size = type_vocab_size
self.type_sequence_label_size = type_sequence_label_size
self.initializer_range = initializer_range
self.num_labels = num_labels
self.num_choices = num_choices
self.scope = scope
def prepare_config_and_inputs(self):
# Set default quantizers before creating the model.
import pytorch_quantization.nn as quant_nn
from pytorch_quantization.tensor_quant import QuantDescriptor
# The default tensor quantizer is set to use Max calibration method
input_desc = QuantDescriptor(num_bits=8, calib_method="max")
# The default tensor quantizer is set to be per-channel quantization for weights
weight_desc = QuantDescriptor(num_bits=8, axis=((0,)))
quant_nn.QuantLinear.set_default_quant_desc_input(input_desc)
quant_nn.QuantLinear.set_default_quant_desc_weight(weight_desc)
# Since the QDQBert model is tested in a single run without calibration, the quantizers are set to fake-quantize, so the forward pass still produces float tensors.
quant_nn.TensorQuantizer.use_fb_fake_quant = True
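# Editorial note: axis=(0,) gives the weight quantizer one scale per output channel,
# while the input quantizer uses a single per-tensor scale with max calibration; with
# use_fb_fake_quant enabled both are only simulated in float, so no separate
# calibration pass is needed before the tests run.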
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
input_mask = None
if self.use_input_mask:
input_mask = random_attention_mask([self.batch_size, self.seq_length])
token_type_ids = None
if self.use_token_type_ids:
token_type_ids = ids_tensor([self.batch_size, self.seq_length], self.type_vocab_size)
sequence_labels = None
token_labels = None
choice_labels = None
if self.use_labels:
sequence_labels = ids_tensor([self.batch_size], self.type_sequence_label_size)
token_labels = ids_tensor([self.batch_size, self.seq_length], self.num_labels)
choice_labels = ids_tensor([self.batch_size], self.num_choices)
config = self.get_config()
return config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels, choice_labels
def get_config(self):
return QDQBertConfig(
vocab_size=self.vocab_size,
hidden_size=self.hidden_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
intermediate_size=self.intermediate_size,
hidden_act=self.hidden_act,
hidden_dropout_prob=self.hidden_dropout_prob,
attention_probs_dropout_prob=self.attention_probs_dropout_prob,
max_position_embeddings=self.max_position_embeddings,
type_vocab_size=self.type_vocab_size,
is_decoder=False,
initializer_range=self.initializer_range,
)
def prepare_config_and_inputs_for_decoder(self):
(
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
) = self.prepare_config_and_inputs()
config.is_decoder = True
encoder_hidden_states = floats_tensor([self.batch_size, self.seq_length, self.hidden_size])
encoder_attention_mask = ids_tensor([self.batch_size, self.seq_length], vocab_size=2)
return (
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
encoder_hidden_states,
encoder_attention_mask,
)
def create_and_check_model(
self, config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels, choice_labels
):
model = QDQBertModel(config=config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids)
result = model(input_ids, token_type_ids=token_type_ids)
result = model(input_ids)
self.parent.assertEqual(result.last_hidden_state.shape, (self.batch_size, self.seq_length, self.hidden_size))
def create_and_check_model_as_decoder(
self,
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
encoder_hidden_states,
encoder_attention_mask,
):
config.add_cross_attention = True
model = QDQBertModel(config)
model.to(torch_device)
model.eval()
result = model(
input_ids,
attention_mask=input_mask,
token_type_ids=token_type_ids,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
)
result = model(
input_ids,
attention_mask=input_mask,
token_type_ids=token_type_ids,
encoder_hidden_states=encoder_hidden_states,
)
result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids)
self.parent.assertEqual(result.last_hidden_state.shape, (self.batch_size, self.seq_length, self.hidden_size))
def create_and_check_for_causal_lm(
self,
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
encoder_hidden_states,
encoder_attention_mask,
):
model = QDQBertLMHeadModel(config=config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids, labels=token_labels)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.seq_length, self.vocab_size))
def create_and_check_for_masked_lm(
self, config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels, choice_labels
):
model = QDQBertForMaskedLM(config=config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids, labels=token_labels)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.seq_length, self.vocab_size))
def create_and_check_model_for_causal_lm_as_decoder(
self,
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
encoder_hidden_states,
encoder_attention_mask,
):
config.add_cross_attention = True
model = QDQBertLMHeadModel(config=config)
model.to(torch_device)
model.eval()
result = model(
input_ids,
attention_mask=input_mask,
token_type_ids=token_type_ids,
labels=token_labels,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
)
result = model(
input_ids,
attention_mask=input_mask,
token_type_ids=token_type_ids,
labels=token_labels,
encoder_hidden_states=encoder_hidden_states,
)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.seq_length, self.vocab_size))
def create_and_check_decoder_model_past_large_inputs(
self,
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
encoder_hidden_states,
encoder_attention_mask,
):
config.is_decoder = True
config.add_cross_attention = True
model = QDQBertLMHeadModel(config=config)
model.to(torch_device)
model.eval()
# first forward pass
outputs = model(
input_ids,
attention_mask=input_mask,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
use_cache=True,
)
past_key_values = outputs.past_key_values
# create hypothetical multiple next tokens and extend to next_input_ids
next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
next_mask = ids_tensor((self.batch_size, 3), vocab_size=2)
# append to next input_ids and next attention mask
next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
next_attention_mask = torch.cat([input_mask, next_mask], dim=-1)
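# The check below runs the extended sequence twice: once from scratch without a
# cache, and once feeding only the new tokens plus the cached past_key_values;
# the hidden states at the new positions must match for the cache to be correct.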
output_from_no_past = model(
next_input_ids,
attention_mask=next_attention_mask,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
output_hidden_states=True,
)["hidden_states"][0]
output_from_past = model(
next_tokens,
attention_mask=next_attention_mask,
encoder_hidden_states=encoder_hidden_states,
encoder_attention_mask=encoder_attention_mask,
past_key_values=past_key_values,
output_hidden_states=True,
)["hidden_states"][0]
# select random slice
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()
self.parent.assertTrue(output_from_past_slice.shape[1] == next_tokens.shape[1])
# test that outputs are equal for slice
self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))
def create_and_check_for_next_sequence_prediction(
self, config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels, choice_labels
):
model = QDQBertForNextSentencePrediction(config=config)
model.to(torch_device)
model.eval()
result = model(
input_ids,
attention_mask=input_mask,
token_type_ids=token_type_ids,
labels=sequence_labels,
)
self.parent.assertEqual(result.logits.shape, (self.batch_size, 2))
def create_and_check_for_question_answering(
self, config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels, choice_labels
):
model = QDQBertForQuestionAnswering(config=config)
model.to(torch_device)
model.eval()
result = model(
input_ids,
attention_mask=input_mask,
token_type_ids=token_type_ids,
start_positions=sequence_labels,
end_positions=sequence_labels,
)
self.parent.assertEqual(result.start_logits.shape, (self.batch_size, self.seq_length))
self.parent.assertEqual(result.end_logits.shape, (self.batch_size, self.seq_length))
def create_and_check_for_sequence_classification(
self, config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels, choice_labels
):
config.num_labels = self.num_labels
model = QDQBertForSequenceClassification(config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids, labels=sequence_labels)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.num_labels))
def create_and_check_for_token_classification(
self, config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels, choice_labels
):
config.num_labels = self.num_labels
model = QDQBertForTokenClassification(config=config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids, labels=token_labels)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.seq_length, self.num_labels))
def create_and_check_for_multiple_choice(
self, config, input_ids, token_type_ids, input_mask, sequence_labels, token_labels, choice_labels
):
config.num_choices = self.num_choices
model = QDQBertForMultipleChoice(config=config)
model.to(torch_device)
model.eval()
multiple_choice_inputs_ids = input_ids.unsqueeze(1).expand(-1, self.num_choices, -1).contiguous()
multiple_choice_token_type_ids = token_type_ids.unsqueeze(1).expand(-1, self.num_choices, -1).contiguous()
multiple_choice_input_mask = input_mask.unsqueeze(1).expand(-1, self.num_choices, -1).contiguous()
result = model(
multiple_choice_inputs_ids,
attention_mask=multiple_choice_input_mask,
token_type_ids=multiple_choice_token_type_ids,
labels=choice_labels,
)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.num_choices))
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
(
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
) = config_and_inputs
inputs_dict = {"input_ids": input_ids, "token_type_ids": token_type_ids, "attention_mask": input_mask}
return config, inputs_dict
@require_torch
@require_pytorch_quantization
class QDQBertModelTest(ModelTesterMixin, PipelineTesterMixin, unittest.TestCase):
all_model_classes = (
(
QDQBertModel,
QDQBertForMaskedLM,
QDQBertForMultipleChoice,
QDQBertForNextSentencePrediction,
QDQBertForQuestionAnswering,
QDQBertForSequenceClassification,
QDQBertForTokenClassification,
QDQBertLMHeadModel,
)
if is_torch_available()
else ()
)
all_generative_model_classes = (QDQBertLMHeadModel,) if is_torch_available() else ()
pipeline_model_mapping = (
{
"feature-extraction": QDQBertModel,
"fill-mask": QDQBertForMaskedLM,
"question-answering": QDQBertForQuestionAnswering,
"text-classification": QDQBertForSequenceClassification,
"text-generation": QDQBertLMHeadModel,
"token-classification": QDQBertForTokenClassification,
"zero-shot": QDQBertForSequenceClassification,
}
if is_torch_available()
else {}
)
def setUp(self):
self.model_tester = QDQBertModelTester(self)
self.config_tester = ConfigTester(self, config_class=QDQBertConfig, hidden_size=37)
def test_config(self):
self.config_tester.run_common_tests()
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
def test_model_various_embeddings(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
for type in ["absolute", "relative_key", "relative_key_query"]:
config_and_inputs[0].position_embedding_type = type
self.model_tester.create_and_check_model(*config_and_inputs)
def test_model_as_decoder(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs_for_decoder()
self.model_tester.create_and_check_model_as_decoder(*config_and_inputs)
def test_model_as_decoder_with_default_input_mask(self):
# This regression test was failing with PyTorch < 1.3
(
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
encoder_hidden_states,
encoder_attention_mask,
) = self.model_tester.prepare_config_and_inputs_for_decoder()
input_mask = None
self.model_tester.create_and_check_model_as_decoder(
config,
input_ids,
token_type_ids,
input_mask,
sequence_labels,
token_labels,
choice_labels,
encoder_hidden_states,
encoder_attention_mask,
)
def test_for_causal_lm(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs_for_decoder()
self.model_tester.create_and_check_for_causal_lm(*config_and_inputs)
def test_for_masked_lm(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_for_masked_lm(*config_and_inputs)
def test_for_causal_lm_decoder(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs_for_decoder()
self.model_tester.create_and_check_model_for_causal_lm_as_decoder(*config_and_inputs)
def test_decoder_model_past_with_large_inputs(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs_for_decoder()
self.model_tester.create_and_check_decoder_model_past_large_inputs(*config_and_inputs)
def test_for_multiple_choice(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_for_multiple_choice(*config_and_inputs)
def test_for_next_sequence_prediction(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_for_next_sequence_prediction(*config_and_inputs)
def test_for_question_answering(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_for_question_answering(*config_and_inputs)
def test_for_sequence_classification(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_for_sequence_classification(*config_and_inputs)
def test_for_token_classification(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_for_token_classification(*config_and_inputs)
@slow
def test_model_from_pretrained(self):
model_name = "google-bert/bert-base-uncased"
model = QDQBertModel.from_pretrained(model_name)
self.assertIsNotNone(model)
# Override
def test_feed_forward_chunking(self):
# feed forward chunking is not supported in QDQBert
pass
@require_torch
@require_pytorch_quantization
class QDQBertModelIntegrationTest(unittest.TestCase):
@slow
def test_inference_no_head_absolute_embedding(self):
# Set default quantizers before creating the model.
import pytorch_quantization.nn as quant_nn
from pytorch_quantization.tensor_quant import QuantDescriptor
# The default tensor quantizer is set to use Max calibration method
input_desc = QuantDescriptor(num_bits=8, calib_method="max")
# The default tensor quantizer is set to be per-channel quantization for weights
weight_desc = QuantDescriptor(num_bits=8, axis=((0,)))
quant_nn.QuantLinear.set_default_quant_desc_input(input_desc)
quant_nn.QuantLinear.set_default_quant_desc_weight(weight_desc)
model = QDQBertModel.from_pretrained("google-bert/bert-base-uncased")
input_ids = torch.tensor([[0, 345, 232, 328, 740, 140, 1695, 69, 6078, 1588, 2]])
attention_mask = torch.tensor([[0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])
output = model(input_ids, attention_mask=attention_mask)[0]
expected_shape = torch.Size((1, 11, 768))
self.assertEqual(output.shape, expected_shape)
expected_slice = torch.tensor(
[[[0.4571, -0.0735, 0.8594], [0.2774, -0.0278, 0.8794], [0.3548, -0.0473, 0.7593]]]
)
self.assertTrue(torch.allclose(output[:, 1:4, 1:4], expected_slice, atol=1e-4))
# coding=utf-8
# Copyright 2022 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch REALM model."""
import copy
import unittest
import numpy as np
from transformers import RealmConfig, is_torch_available
from transformers.testing_utils import require_torch, slow, torch_device
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, floats_tensor, ids_tensor, random_attention_mask
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
from transformers import (
RealmEmbedder,
RealmForOpenQA,
RealmKnowledgeAugEncoder,
RealmReader,
RealmRetriever,
RealmScorer,
RealmTokenizer,
)
class RealmModelTester:
def __init__(
self,
parent,
batch_size=13,
retriever_proj_size=128,
seq_length=7,
is_training=True,
use_input_mask=True,
use_token_type_ids=True,
use_labels=True,
vocab_size=99,
hidden_size=32,
num_hidden_layers=2,
num_attention_heads=4,
intermediate_size=37,
hidden_act="gelu",
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
max_position_embeddings=512,
type_vocab_size=16,
type_sequence_label_size=2,
initializer_range=0.02,
layer_norm_eps=1e-12,
span_hidden_size=50,
max_span_width=10,
reader_layer_norm_eps=1e-3,
reader_beam_size=4,
reader_seq_len=288 + 32,
num_block_records=13353718,
searcher_beam_size=8,
searcher_seq_len=64,
num_labels=3,
num_choices=4,
num_candidates=10,
scope=None,
):
# General config
self.parent = parent
self.batch_size = batch_size
self.retriever_proj_size = retriever_proj_size
self.seq_length = seq_length
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_token_type_ids = use_token_type_ids
self.use_labels = use_labels
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.hidden_act = hidden_act
self.hidden_dropout_prob = hidden_dropout_prob
self.attention_probs_dropout_prob = attention_probs_dropout_prob
self.max_position_embeddings = max_position_embeddings
self.type_vocab_size = type_vocab_size
self.type_sequence_label_size = type_sequence_label_size
self.initializer_range = initializer_range
self.layer_norm_eps = layer_norm_eps
# Reader config
self.span_hidden_size = span_hidden_size
self.max_span_width = max_span_width
self.reader_layer_norm_eps = reader_layer_norm_eps
self.reader_beam_size = reader_beam_size
self.reader_seq_len = reader_seq_len
# Searcher config
self.num_block_records = num_block_records
self.searcher_beam_size = searcher_beam_size
self.searcher_seq_len = searcher_seq_len
self.num_labels = num_labels
self.num_choices = num_choices
self.num_candidates = num_candidates
self.scope = scope
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
candidate_input_ids = ids_tensor([self.batch_size, self.num_candidates, self.seq_length], self.vocab_size)
reader_input_ids = ids_tensor([self.reader_beam_size, self.reader_seq_len], self.vocab_size)
input_mask = None
candidate_input_mask = None
reader_input_mask = None
if self.use_input_mask:
input_mask = random_attention_mask([self.batch_size, self.seq_length])
candidate_input_mask = random_attention_mask([self.batch_size, self.num_candidates, self.seq_length])
reader_input_mask = random_attention_mask([self.reader_beam_size, self.reader_seq_len])
token_type_ids = None
candidate_token_type_ids = None
reader_token_type_ids = None
if self.use_token_type_ids:
token_type_ids = ids_tensor([self.batch_size, self.seq_length], self.type_vocab_size)
candidate_token_type_ids = ids_tensor(
[self.batch_size, self.num_candidates, self.seq_length], self.type_vocab_size
)
reader_token_type_ids = ids_tensor([self.reader_beam_size, self.reader_seq_len], self.type_vocab_size)
sequence_labels = None
token_labels = None
choice_labels = None
if self.use_labels:
sequence_labels = ids_tensor([self.batch_size], self.type_sequence_label_size)
token_labels = ids_tensor([self.batch_size, self.seq_length], self.num_labels)
choice_labels = ids_tensor([self.batch_size], self.num_choices)
config = self.get_config()
# inputs with additional num_candidates axis.
scorer_encoder_inputs = (candidate_input_ids, candidate_input_mask, candidate_token_type_ids)
# reader inputs
reader_inputs = (reader_input_ids, reader_input_mask, reader_token_type_ids)
return (
config,
input_ids,
token_type_ids,
input_mask,
scorer_encoder_inputs,
reader_inputs,
sequence_labels,
token_labels,
choice_labels,
)
def get_config(self):
return RealmConfig(
vocab_size=self.vocab_size,
hidden_size=self.hidden_size,
retriever_proj_size=self.retriever_proj_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
num_candidates=self.num_candidates,
intermediate_size=self.intermediate_size,
hidden_act=self.hidden_act,
hidden_dropout_prob=self.hidden_dropout_prob,
attention_probs_dropout_prob=self.attention_probs_dropout_prob,
max_position_embeddings=self.max_position_embeddings,
type_vocab_size=self.type_vocab_size,
initializer_range=self.initializer_range,
)
def create_and_check_embedder(
self,
config,
input_ids,
token_type_ids,
input_mask,
scorer_encoder_inputs,
reader_inputs,
sequence_labels,
token_labels,
choice_labels,
):
model = RealmEmbedder(config=config)
model.to(torch_device)
model.eval()
result = model(input_ids, attention_mask=input_mask, token_type_ids=token_type_ids)
self.parent.assertEqual(result.projected_score.shape, (self.batch_size, self.retriever_proj_size))
def create_and_check_encoder(
self,
config,
input_ids,
token_type_ids,
input_mask,
scorer_encoder_inputs,
reader_inputs,
sequence_labels,
token_labels,
choice_labels,
):
model = RealmKnowledgeAugEncoder(config=config)
model.to(torch_device)
model.eval()
relevance_score = floats_tensor([self.batch_size, self.num_candidates])
result = model(
scorer_encoder_inputs[0],
attention_mask=scorer_encoder_inputs[1],
token_type_ids=scorer_encoder_inputs[2],
relevance_score=relevance_score,
labels=token_labels,
)
self.parent.assertEqual(
result.logits.shape, (self.batch_size * self.num_candidates, self.seq_length, self.vocab_size)
)
def create_and_check_reader(
self,
config,
input_ids,
token_type_ids,
input_mask,
scorer_encoder_inputs,
reader_inputs,
sequence_labels,
token_labels,
choice_labels,
):
model = RealmReader(config=config)
model.to(torch_device)
model.eval()
relevance_score = floats_tensor([self.reader_beam_size])
result = model(
reader_inputs[0],
attention_mask=reader_inputs[1],
token_type_ids=reader_inputs[2],
relevance_score=relevance_score,
)
self.parent.assertEqual(result.block_idx.shape, ())
self.parent.assertEqual(result.candidate.shape, ())
self.parent.assertEqual(result.start_pos.shape, ())
self.parent.assertEqual(result.end_pos.shape, ())
def create_and_check_scorer(
self,
config,
input_ids,
token_type_ids,
input_mask,
scorer_encoder_inputs,
reader_inputs,
sequence_labels,
token_labels,
choice_labels,
):
model = RealmScorer(config=config)
model.to(torch_device)
model.eval()
result = model(
input_ids,
attention_mask=input_mask,
token_type_ids=token_type_ids,
candidate_input_ids=scorer_encoder_inputs[0],
candidate_attention_mask=scorer_encoder_inputs[1],
candidate_token_type_ids=scorer_encoder_inputs[2],
)
self.parent.assertEqual(result.relevance_score.shape, (self.batch_size, self.num_candidates))
self.parent.assertEqual(result.query_score.shape, (self.batch_size, self.retriever_proj_size))
self.parent.assertEqual(
result.candidate_score.shape, (self.batch_size, self.num_candidates, self.retriever_proj_size)
)
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
(
config,
input_ids,
token_type_ids,
input_mask,
scorer_encoder_inputs,
reader_inputs,
sequence_labels,
token_labels,
choice_labels,
) = config_and_inputs
inputs_dict = {"input_ids": input_ids, "token_type_ids": token_type_ids, "attention_mask": input_mask}
return config, inputs_dict
@require_torch
class RealmModelTest(ModelTesterMixin, PipelineTesterMixin, unittest.TestCase):
all_model_classes = (
(
RealmEmbedder,
RealmKnowledgeAugEncoder,
# RealmScorer is excluded from common tests as it is a container model
# consisting of two RealmEmbedders & a simple inner product calculation.
# RealmScorer
)
if is_torch_available()
else ()
)
all_generative_model_classes = ()
pipeline_model_mapping = {} if is_torch_available() else {}
# disable these tests because there is no base_model in Realm
test_save_load_fast_init_from_base = False
test_save_load_fast_init_to_base = False
def setUp(self):
self.test_pruning = False
self.model_tester = RealmModelTester(self)
self.config_tester = ConfigTester(self, config_class=RealmConfig)
def test_config(self):
self.config_tester.run_common_tests()
def test_embedder(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_embedder(*config_and_inputs)
def test_encoder(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_encoder(*config_and_inputs)
def test_model_various_embeddings(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
for type in ["absolute", "relative_key", "relative_key_query"]:
config_and_inputs[0].position_embedding_type = type
self.model_tester.create_and_check_embedder(*config_and_inputs)
self.model_tester.create_and_check_encoder(*config_and_inputs)
def test_scorer(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_scorer(*config_and_inputs)
def test_training(self):
if not self.model_tester.is_training:
return
config, *inputs = self.model_tester.prepare_config_and_inputs()
input_ids, token_type_ids, input_mask, scorer_encoder_inputs = inputs[0:4]
config.return_dict = True
tokenizer = RealmTokenizer.from_pretrained("google/realm-orqa-nq-openqa")
# RealmKnowledgeAugEncoder training
model = RealmKnowledgeAugEncoder(config)
model.to(torch_device)
model.train()
inputs_dict = {
"input_ids": scorer_encoder_inputs[0].to(torch_device),
"attention_mask": scorer_encoder_inputs[1].to(torch_device),
"token_type_ids": scorer_encoder_inputs[2].to(torch_device),
"relevance_score": floats_tensor([self.model_tester.batch_size, self.model_tester.num_candidates]),
}
inputs_dict["labels"] = torch.zeros(
(self.model_tester.batch_size, self.model_tester.seq_length), dtype=torch.long, device=torch_device
)
inputs = inputs_dict
loss = model(**inputs).loss
loss.backward()
# RealmForOpenQA training
openqa_config = copy.deepcopy(config)
openqa_config.vocab_size = 30522  # the retrieved texts will inevitably contain tokens outside the 99-token test vocabulary
openqa_config.num_block_records = 5
openqa_config.searcher_beam_size = 2
block_records = np.array(
[
b"This is the first record.",
b"This is the second record.",
b"This is the third record.",
b"This is the fourth record.",
b"This is the fifth record.",
],
dtype=object,
)
retriever = RealmRetriever(block_records, tokenizer)
model = RealmForOpenQA(openqa_config, retriever)
model.to(torch_device)
model.train()
inputs_dict = {
"input_ids": input_ids[:1].to(torch_device),
"attention_mask": input_mask[:1].to(torch_device),
"token_type_ids": token_type_ids[:1].to(torch_device),
"answer_ids": input_ids[:1].tolist(),
}
inputs = self._prepare_for_class(inputs_dict, RealmForOpenQA)
loss = model(**inputs).reader_output.loss
loss.backward()
# Test model.block_embedding_to
device = torch.device("cpu")
model.block_embedding_to(device)
loss = model(**inputs).reader_output.loss
loss.backward()
self.assertEqual(model.block_emb.device.type, device.type)
@slow
def test_embedder_from_pretrained(self):
model = RealmEmbedder.from_pretrained("google/realm-cc-news-pretrained-embedder")
self.assertIsNotNone(model)
@slow
def test_encoder_from_pretrained(self):
model = RealmKnowledgeAugEncoder.from_pretrained("google/realm-cc-news-pretrained-encoder")
self.assertIsNotNone(model)
@slow
def test_open_qa_from_pretrained(self):
model = RealmForOpenQA.from_pretrained("google/realm-orqa-nq-openqa")
self.assertIsNotNone(model)
@slow
def test_reader_from_pretrained(self):
model = RealmReader.from_pretrained("google/realm-orqa-nq-reader")
self.assertIsNotNone(model)
@slow
def test_scorer_from_pretrained(self):
model = RealmScorer.from_pretrained("google/realm-cc-news-pretrained-scorer")
self.assertIsNotNone(model)
@require_torch
class RealmModelIntegrationTest(unittest.TestCase):
@slow
def test_inference_embedder(self):
retriever_projected_size = 128
model = RealmEmbedder.from_pretrained("google/realm-cc-news-pretrained-embedder")
input_ids = torch.tensor([[0, 1, 2, 3, 4, 5]])
output = model(input_ids)[0]
expected_shape = torch.Size((1, retriever_projected_size))
self.assertEqual(output.shape, expected_shape)
expected_slice = torch.tensor([[-0.0714, -0.0837, -0.1314]])
self.assertTrue(torch.allclose(output[:, :3], expected_slice, atol=1e-4))
@slow
def test_inference_encoder(self):
num_candidates = 2
vocab_size = 30522
model = RealmKnowledgeAugEncoder.from_pretrained(
"google/realm-cc-news-pretrained-encoder", num_candidates=num_candidates
)
input_ids = torch.tensor([[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]])
relevance_score = torch.tensor([[0.3, 0.7]], dtype=torch.float32)
output = model(input_ids, relevance_score=relevance_score)[0]
expected_shape = torch.Size((2, 6, vocab_size))
self.assertEqual(output.shape, expected_shape)
expected_slice = torch.tensor([[[-11.0888, -11.2544], [-10.2170, -10.3874]]])
self.assertTrue(torch.allclose(output[1, :2, :2], expected_slice, atol=1e-4))
@slow
def test_inference_open_qa(self):
from transformers.models.realm.retrieval_realm import RealmRetriever
tokenizer = RealmTokenizer.from_pretrained("google/realm-orqa-nq-openqa")
retriever = RealmRetriever.from_pretrained("google/realm-orqa-nq-openqa")
model = RealmForOpenQA.from_pretrained(
"google/realm-orqa-nq-openqa",
retriever=retriever,
)
question = "Who is the pioneer in modern computer science?"
question = tokenizer(
[question],
padding=True,
truncation=True,
max_length=model.config.searcher_seq_len,
return_tensors="pt",
).to(model.device)
predicted_answer_ids = model(**question).predicted_answer_ids
predicted_answer = tokenizer.decode(predicted_answer_ids)
self.assertEqual(predicted_answer, "alan mathison turing")
@slow
def test_inference_reader(self):
config = RealmConfig(reader_beam_size=2, max_span_width=3)
model = RealmReader.from_pretrained("google/realm-orqa-nq-reader", config=config)
concat_input_ids = torch.arange(10).view((2, 5))
concat_token_type_ids = torch.tensor([[0, 0, 1, 1, 1], [0, 0, 1, 1, 1]], dtype=torch.int64)
concat_block_mask = torch.tensor([[0, 0, 1, 1, 0], [0, 0, 1, 1, 0]], dtype=torch.int64)
relevance_score = torch.tensor([0.3, 0.7], dtype=torch.float32)
output = model(
concat_input_ids,
token_type_ids=concat_token_type_ids,
relevance_score=relevance_score,
block_mask=concat_block_mask,
return_dict=True,
)
block_idx_expected_shape = torch.Size(())
start_pos_expected_shape = torch.Size((1,))
end_pos_expected_shape = torch.Size((1,))
self.assertEqual(output.block_idx.shape, block_idx_expected_shape)
self.assertEqual(output.start_pos.shape, start_pos_expected_shape)
self.assertEqual(output.end_pos.shape, end_pos_expected_shape)
expected_block_idx = torch.tensor(1)
expected_start_pos = torch.tensor(3)
expected_end_pos = torch.tensor(3)
self.assertTrue(torch.allclose(output.block_idx, expected_block_idx, atol=1e-4))
self.assertTrue(torch.allclose(output.start_pos, expected_start_pos, atol=1e-4))
self.assertTrue(torch.allclose(output.end_pos, expected_end_pos, atol=1e-4))
@slow
def test_inference_scorer(self):
num_candidates = 2
model = RealmScorer.from_pretrained("google/realm-cc-news-pretrained-scorer", num_candidates=num_candidates)
input_ids = torch.tensor([[0, 1, 2, 3, 4, 5]])
candidate_input_ids = torch.tensor([[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]])
output = model(input_ids, candidate_input_ids=candidate_input_ids)[0]
expected_shape = torch.Size((1, 2))
self.assertEqual(output.shape, expected_shape)
expected_slice = torch.tensor([[0.7410, 0.7170]])
self.assertTrue(torch.allclose(output, expected_slice, atol=1e-4))
# coding=utf-8
# Copyright 2022 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
from unittest import TestCase
from unittest.mock import patch
import numpy as np
from datasets import Dataset
from transformers.models.realm.configuration_realm import RealmConfig
from transformers.models.realm.retrieval_realm import _REALM_BLOCK_RECORDS_FILENAME, RealmRetriever
from transformers.models.realm.tokenization_realm import VOCAB_FILES_NAMES, RealmTokenizer
class RealmRetrieverTest(TestCase):
def setUp(self):
self.tmpdirname = tempfile.mkdtemp()
self.num_block_records = 5
# Realm tok
vocab_tokens = [
"[UNK]",
"[CLS]",
"[SEP]",
"[PAD]",
"[MASK]",
"test",
"question",
"this",
"is",
"the",
"first",
"second",
"third",
"fourth",
"fifth",
"record",
"want",
"##want",
"##ed",
"wa",
"un",
"runn",
"##ing",
",",
"low",
"lowest",
]
realm_tokenizer_path = os.path.join(self.tmpdirname, "realm_tokenizer")
os.makedirs(realm_tokenizer_path, exist_ok=True)
self.vocab_file = os.path.join(realm_tokenizer_path, VOCAB_FILES_NAMES["vocab_file"])
with open(self.vocab_file, "w", encoding="utf-8") as vocab_writer:
vocab_writer.write("".join([x + "\n" for x in vocab_tokens]))
realm_block_records_path = os.path.join(self.tmpdirname, "realm_block_records")
os.makedirs(realm_block_records_path, exist_ok=True)
def get_tokenizer(self) -> RealmTokenizer:
return RealmTokenizer.from_pretrained(os.path.join(self.tmpdirname, "realm_tokenizer"))
def tearDown(self):
shutil.rmtree(self.tmpdirname)
def get_config(self):
config = RealmConfig(num_block_records=self.num_block_records)
return config
def get_dummy_dataset(self):
dataset = Dataset.from_dict(
{
"id": ["0", "1"],
"question": ["foo", "bar"],
"answers": [["Foo", "Bar"], ["Bar"]],
}
)
return dataset
def get_dummy_block_records(self):
block_records = np.array(
[
b"This is the first record",
b"This is the second record",
b"This is the third record",
b"This is the fourth record",
b"This is the fifth record",
b"This is a longer longer longer record",
],
dtype=object,
)
return block_records
def get_dummy_retriever(self):
retriever = RealmRetriever(
block_records=self.get_dummy_block_records(),
tokenizer=self.get_tokenizer(),
)
return retriever
def test_retrieve(self):
config = self.get_config()
retriever = self.get_dummy_retriever()
tokenizer = retriever.tokenizer
retrieved_block_ids = np.array([0, 3], dtype="long")
question_input_ids = tokenizer(["Test question"]).input_ids
answer_ids = tokenizer(
["the fourth"],
add_special_tokens=False,
return_token_type_ids=False,
return_attention_mask=False,
).input_ids
max_length = config.reader_seq_len
has_answers, start_pos, end_pos, concat_inputs = retriever(
retrieved_block_ids, question_input_ids, answer_ids=answer_ids, max_length=max_length, return_tensors="np"
)
self.assertEqual(len(has_answers), 2)
self.assertEqual(len(start_pos), 2)
self.assertEqual(len(end_pos), 2)
self.assertEqual(concat_inputs.input_ids.shape, (2, 10))
self.assertEqual(concat_inputs.attention_mask.shape, (2, 10))
self.assertEqual(concat_inputs.token_type_ids.shape, (2, 10))
self.assertEqual(concat_inputs.special_tokens_mask.shape, (2, 10))
self.assertEqual(
tokenizer.convert_ids_to_tokens(concat_inputs.input_ids[0]),
["[CLS]", "test", "question", "[SEP]", "this", "is", "the", "first", "record", "[SEP]"],
)
self.assertEqual(
tokenizer.convert_ids_to_tokens(concat_inputs.input_ids[1]),
["[CLS]", "test", "question", "[SEP]", "this", "is", "the", "fourth", "record", "[SEP]"],
)
def test_block_has_answer(self):
config = self.get_config()
retriever = self.get_dummy_retriever()
tokenizer = retriever.tokenizer
retrieved_block_ids = np.array([0, 3, 5], dtype="long")
question_input_ids = tokenizer(["Test question"]).input_ids
answer_ids = tokenizer(
["the fourth", "longer longer"],
add_special_tokens=False,
return_token_type_ids=False,
return_attention_mask=False,
).input_ids
max_length = config.reader_seq_len
has_answers, start_pos, end_pos, _ = retriever(
retrieved_block_ids, question_input_ids, answer_ids=answer_ids, max_length=max_length, return_tensors="np"
)
self.assertEqual([False, True, True], has_answers)
self.assertEqual([[-1, -1, -1], [6, -1, -1], [6, 7, 8]], start_pos)
self.assertEqual([[-1, -1, -1], [7, -1, -1], [7, 8, 9]], end_pos)
def test_save_load_pretrained(self):
retriever = self.get_dummy_retriever()
retriever.save_pretrained(os.path.join(self.tmpdirname, "realm_block_records"))
# Test local path
retriever = retriever.from_pretrained(os.path.join(self.tmpdirname, "realm_block_records"))
self.assertEqual(retriever.block_records[0], b"This is the first record")
# Test mocked remote path
with patch("transformers.models.realm.retrieval_realm.hf_hub_download") as mock_hf_hub_download:
mock_hf_hub_download.return_value = os.path.join(
os.path.join(self.tmpdirname, "realm_block_records"), _REALM_BLOCK_RECORDS_FILENAME
)
retriever = RealmRetriever.from_pretrained("google/realm-cc-news-pretrained-openqa")
self.assertEqual(retriever.block_records[0], b"This is the first record")
# coding=utf-8
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from transformers import RealmTokenizerFast
from transformers.models.bert.tokenization_bert import (
VOCAB_FILES_NAMES,
BasicTokenizer,
WordpieceTokenizer,
_is_control,
_is_punctuation,
_is_whitespace,
)
from transformers.models.realm.tokenization_realm import RealmTokenizer
from transformers.testing_utils import require_tokenizers, slow
from ...test_tokenization_common import TokenizerTesterMixin, filter_non_english
@require_tokenizers
class RealmTokenizationTest(TokenizerTesterMixin, unittest.TestCase):
from_pretrained_id = "google/realm-cc-news-pretrained-embedder"
tokenizer_class = RealmTokenizer
rust_tokenizer_class = RealmTokenizerFast
test_rust_tokenizer = True
space_between_special_tokens = True
from_pretrained_filter = filter_non_english
def setUp(self):
super().setUp()
vocab_tokens = [
"[UNK]",
"[CLS]",
"[SEP]",
"[PAD]",
"[MASK]",
"want",
"##want",
"##ed",
"wa",
"un",
"runn",
"##ing",
",",
"low",
"lowest",
]
self.vocab_file = os.path.join(self.tmpdirname, VOCAB_FILES_NAMES["vocab_file"])
with open(self.vocab_file, "w", encoding="utf-8") as vocab_writer:
vocab_writer.write("".join([x + "\n" for x in vocab_tokens]))
def get_input_output_texts(self, tokenizer):
input_text = "UNwant\u00e9d,running"
output_text = "unwanted, running"
return input_text, output_text
def test_full_tokenizer(self):
tokenizer = self.tokenizer_class(self.vocab_file)
tokens = tokenizer.tokenize("UNwant\u00e9d,running")
self.assertListEqual(tokens, ["un", "##want", "##ed", ",", "runn", "##ing"])
self.assertListEqual(tokenizer.convert_tokens_to_ids(tokens), [9, 6, 7, 12, 10, 11])
def test_rust_and_python_full_tokenizers(self):
if not self.test_rust_tokenizer:
return
tokenizer = self.get_tokenizer()
rust_tokenizer = self.get_rust_tokenizer()
sequence = "UNwant\u00e9d,running"
tokens = tokenizer.tokenize(sequence)
rust_tokens = rust_tokenizer.tokenize(sequence)
self.assertListEqual(tokens, rust_tokens)
ids = tokenizer.encode(sequence, add_special_tokens=False)
rust_ids = rust_tokenizer.encode(sequence, add_special_tokens=False)
self.assertListEqual(ids, rust_ids)
rust_tokenizer = self.get_rust_tokenizer()
ids = tokenizer.encode(sequence)
rust_ids = rust_tokenizer.encode(sequence)
self.assertListEqual(ids, rust_ids)
# With lower casing
tokenizer = self.get_tokenizer(do_lower_case=True)
rust_tokenizer = self.get_rust_tokenizer(do_lower_case=True)
sequence = "UNwant\u00e9d,running"
tokens = tokenizer.tokenize(sequence)
rust_tokens = rust_tokenizer.tokenize(sequence)
self.assertListEqual(tokens, rust_tokens)
ids = tokenizer.encode(sequence, add_special_tokens=False)
rust_ids = rust_tokenizer.encode(sequence, add_special_tokens=False)
self.assertListEqual(ids, rust_ids)
rust_tokenizer = self.get_rust_tokenizer()
ids = tokenizer.encode(sequence)
rust_ids = rust_tokenizer.encode(sequence)
self.assertListEqual(ids, rust_ids)
def test_chinese(self):
tokenizer = BasicTokenizer()
self.assertListEqual(tokenizer.tokenize("ah\u535a\u63a8zz"), ["ah", "\u535a", "\u63a8", "zz"])
def test_basic_tokenizer_lower(self):
tokenizer = BasicTokenizer(do_lower_case=True)
self.assertListEqual(
tokenizer.tokenize(" \tHeLLo!how \n Are yoU? "), ["hello", "!", "how", "are", "you", "?"]
)
self.assertListEqual(tokenizer.tokenize("H\u00e9llo"), ["hello"])
def test_basic_tokenizer_lower_strip_accents_false(self):
tokenizer = BasicTokenizer(do_lower_case=True, strip_accents=False)
self.assertListEqual(
tokenizer.tokenize(" \tHäLLo!how \n Are yoU? "), ["hällo", "!", "how", "are", "you", "?"]
)
self.assertListEqual(tokenizer.tokenize("H\u00e9llo"), ["h\u00e9llo"])
def test_basic_tokenizer_lower_strip_accents_true(self):
tokenizer = BasicTokenizer(do_lower_case=True, strip_accents=True)
self.assertListEqual(
tokenizer.tokenize(" \tHäLLo!how \n Are yoU? "), ["hallo", "!", "how", "are", "you", "?"]
)
self.assertListEqual(tokenizer.tokenize("H\u00e9llo"), ["hello"])
def test_basic_tokenizer_lower_strip_accents_default(self):
tokenizer = BasicTokenizer(do_lower_case=True)
self.assertListEqual(
tokenizer.tokenize(" \tHäLLo!how \n Are yoU? "), ["hallo", "!", "how", "are", "you", "?"]
)
self.assertListEqual(tokenizer.tokenize("H\u00e9llo"), ["hello"])
def test_basic_tokenizer_no_lower(self):
tokenizer = BasicTokenizer(do_lower_case=False)
self.assertListEqual(
tokenizer.tokenize(" \tHeLLo!how \n Are yoU? "), ["HeLLo", "!", "how", "Are", "yoU", "?"]
)
def test_basic_tokenizer_no_lower_strip_accents_false(self):
tokenizer = BasicTokenizer(do_lower_case=False, strip_accents=False)
self.assertListEqual(
tokenizer.tokenize(" \tHäLLo!how \n Are yoU? "), ["HäLLo", "!", "how", "Are", "yoU", "?"]
)
def test_basic_tokenizer_no_lower_strip_accents_true(self):
tokenizer = BasicTokenizer(do_lower_case=False, strip_accents=True)
self.assertListEqual(
tokenizer.tokenize(" \tHäLLo!how \n Are yoU? "), ["HaLLo", "!", "how", "Are", "yoU", "?"]
)
def test_basic_tokenizer_respects_never_split_tokens(self):
tokenizer = BasicTokenizer(do_lower_case=False, never_split=["[UNK]"])
self.assertListEqual(
tokenizer.tokenize(" \tHeLLo!how \n Are yoU? [UNK]"), ["HeLLo", "!", "how", "Are", "yoU", "?", "[UNK]"]
)
def test_wordpiece_tokenizer(self):
vocab_tokens = ["[UNK]", "[CLS]", "[SEP]", "want", "##want", "##ed", "wa", "un", "runn", "##ing"]
vocab = {}
for i, token in enumerate(vocab_tokens):
vocab[token] = i
tokenizer = WordpieceTokenizer(vocab=vocab, unk_token="[UNK]")
self.assertListEqual(tokenizer.tokenize(""), [])
self.assertListEqual(tokenizer.tokenize("unwanted running"), ["un", "##want", "##ed", "runn", "##ing"])
self.assertListEqual(tokenizer.tokenize("unwantedX running"), ["[UNK]", "runn", "##ing"])
def test_is_whitespace(self):
self.assertTrue(_is_whitespace(" "))
self.assertTrue(_is_whitespace("\t"))
self.assertTrue(_is_whitespace("\r"))
self.assertTrue(_is_whitespace("\n"))
self.assertTrue(_is_whitespace("\u00a0"))
self.assertFalse(_is_whitespace("A"))
self.assertFalse(_is_whitespace("-"))
def test_is_control(self):
self.assertTrue(_is_control("\u0005"))
self.assertFalse(_is_control("A"))
self.assertFalse(_is_control(" "))
self.assertFalse(_is_control("\t"))
self.assertFalse(_is_control("\r"))
def test_is_punctuation(self):
self.assertTrue(_is_punctuation("-"))
self.assertTrue(_is_punctuation("$"))
self.assertTrue(_is_punctuation("`"))
self.assertTrue(_is_punctuation("."))
self.assertFalse(_is_punctuation("A"))
self.assertFalse(_is_punctuation(" "))
def test_clean_text(self):
tokenizer = self.get_tokenizer()
# Example taken from the issue https://github.com/huggingface/tokenizers/issues/340
self.assertListEqual([tokenizer.tokenize(t) for t in ["Test", "\xad", "test"]], [["[UNK]"], [], ["[UNK]"]])
if self.test_rust_tokenizer:
rust_tokenizer = self.get_rust_tokenizer()
self.assertListEqual(
[rust_tokenizer.tokenize(t) for t in ["Test", "\xad", "test"]], [["[UNK]"], [], ["[UNK]"]]
)
@slow
def test_sequence_builders(self):
tokenizer = self.tokenizer_class.from_pretrained("google-bert/bert-base-uncased")
text = tokenizer.encode("sequence builders", add_special_tokens=False)
text_2 = tokenizer.encode("multi-sequence build", add_special_tokens=False)
encoded_sentence = tokenizer.build_inputs_with_special_tokens(text)
encoded_pair = tokenizer.build_inputs_with_special_tokens(text, text_2)
assert encoded_sentence == [101] + text + [102]
assert encoded_pair == [101] + text + [102] + text_2 + [102]
def test_offsets_with_special_characters(self):
for tokenizer, pretrained_name, kwargs in self.tokenizers_list:
with self.subTest(f"{tokenizer.__class__.__name__} ({pretrained_name})"):
tokenizer_r = self.rust_tokenizer_class.from_pretrained(pretrained_name, **kwargs)
sentence = f"A, naïve {tokenizer_r.mask_token} AllenNLP sentence."
tokens = tokenizer_r.encode_plus(
sentence,
return_attention_mask=False,
return_token_type_ids=False,
return_offsets_mapping=True,
add_special_tokens=True,
)
do_lower_case = tokenizer_r.do_lower_case if hasattr(tokenizer_r, "do_lower_case") else False
expected_results = (
[
((0, 0), tokenizer_r.cls_token),
((0, 1), "A"),
((1, 2), ","),
((3, 5), "na"),
((5, 6), "##ï"),
((6, 8), "##ve"),
((9, 15), tokenizer_r.mask_token),
((16, 21), "Allen"),
((21, 23), "##NL"),
((23, 24), "##P"),
((25, 33), "sentence"),
((33, 34), "."),
((0, 0), tokenizer_r.sep_token),
]
if not do_lower_case
else [
((0, 0), tokenizer_r.cls_token),
((0, 1), "a"),
((1, 2), ","),
((3, 8), "naive"),
((9, 15), tokenizer_r.mask_token),
((16, 21), "allen"),
((21, 23), "##nl"),
((23, 24), "##p"),
((25, 33), "sentence"),
((33, 34), "."),
((0, 0), tokenizer_r.sep_token),
]
)
self.assertEqual(
[e[1] for e in expected_results], tokenizer_r.convert_ids_to_tokens(tokens["input_ids"])
)
self.assertEqual([e[0] for e in expected_results], tokens["offset_mapping"])
@slow
def test_batch_encode_candidates(self):
for tokenizer, pretrained_name, kwargs in self.tokenizers_list:
with self.subTest(f"{tokenizer.__class__.__name__} ({pretrained_name})"):
tokenizer_r = self.rust_tokenizer_class.from_pretrained(pretrained_name, **kwargs)
tokenizer_p = self.tokenizer_class.from_pretrained(pretrained_name, **kwargs)
text = [["Hello world!", "Nice to meet you!"], ["The cute cat.", "The adorable dog."]]
encoded_sentence_r = tokenizer_r.batch_encode_candidates(text, max_length=10, return_tensors="np")
encoded_sentence_p = tokenizer_p.batch_encode_candidates(text, max_length=10, return_tensors="np")
expected_shape = (2, 2, 10)
self.assertEqual(encoded_sentence_r["input_ids"].shape, expected_shape)
self.assertEqual(encoded_sentence_r["attention_mask"].shape, expected_shape)
self.assertEqual(encoded_sentence_r["token_type_ids"].shape, expected_shape)
self.assertEqual(encoded_sentence_p["input_ids"].shape, expected_shape)
self.assertEqual(encoded_sentence_p["attention_mask"].shape, expected_shape)
self.assertEqual(encoded_sentence_p["token_type_ids"].shape, expected_shape)
@@ -23,7 +23,6 @@ from transformers.testing_utils import require_deterministic_for_xpu, require_to
from ...test_modeling_common import floats_tensor, ids_tensor, random_attention_mask
from ..bert.test_modeling_bert import BertModelTester
from ..speech_to_text.test_modeling_speech_to_text import Speech2TextModelTester
from ..speech_to_text_2.test_modeling_speech_to_text_2 import Speech2Text2StandaloneDecoderModelTester
from ..wav2vec2.test_modeling_wav2vec2 import Wav2Vec2ModelTester
@@ -33,7 +32,6 @@ if is_torch_available():
from transformers import (
BertLMHeadModel,
Speech2Text2ForCausalLM,
SpeechEncoderDecoderConfig,
SpeechEncoderDecoderModel,
Wav2Vec2Model,
@@ -583,43 +581,3 @@ class Speech2TextBertModelTest(EncoderDecoderMixin, unittest.TestCase):
# all published pretrained models are Speech2TextModel != Speech2TextEncoder
def test_real_model_save_load_from_pretrained(self):
pass
@require_torch
class Wav2Vec2Speech2Text2(EncoderDecoderMixin, unittest.TestCase):
def get_encoder_decoder_model(self, config, decoder_config):
encoder_model = Wav2Vec2Model(config).eval()
decoder_model = Speech2Text2ForCausalLM(decoder_config).eval()
return encoder_model, decoder_model
def prepare_config_and_inputs(self):
model_tester_encoder = Wav2Vec2ModelTester(self, batch_size=13)
model_tester_decoder = Speech2Text2StandaloneDecoderModelTester(
self, batch_size=13, d_model=32, max_position_embeddings=512
)
encoder_config_and_inputs = model_tester_encoder.prepare_config_and_inputs()
decoder_config_and_inputs = model_tester_decoder.prepare_config_and_inputs()
(
config,
input_values,
input_mask,
) = encoder_config_and_inputs
(decoder_config, decoder_input_ids, decoder_attention_mask, _) = decoder_config_and_inputs
# make sure that cross attention layers are added
decoder_config.add_cross_attention = True
# disable cache for now
decoder_config.use_cache = False
return {
"config": config,
"input_values": input_values,
"attention_mask": input_mask,
"decoder_config": decoder_config,
"decoder_input_ids": decoder_input_ids,
"decoder_attention_mask": decoder_attention_mask,
"labels": decoder_input_ids,
}
# there are no published pretrained Speech2Text2ForCausalLM for now
def test_real_model_save_load_from_pretrained(self):
pass
# coding=utf-8
# Copyright 2021 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch Speech2Text model."""
import unittest
from transformers import Speech2Text2Config
from transformers.testing_utils import is_torch_available, require_torch, torch_device
from ...generation.test_utils import GenerationTesterMixin
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, ids_tensor
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
from transformers.models.speech_to_text_2.modeling_speech_to_text_2 import (
Speech2Text2Decoder,
Speech2Text2ForCausalLM,
)
@require_torch
class Speech2Text2StandaloneDecoderModelTester:
def __init__(
self,
parent,
vocab_size=99,
batch_size=13,
d_model=16,
decoder_seq_length=7,
is_training=True,
is_decoder=True,
use_attention_mask=True,
use_cache=False,
use_labels=True,
decoder_start_token_id=2,
decoder_ffn_dim=32,
decoder_layers=2,
decoder_attention_heads=4,
max_position_embeddings=30,
pad_token_id=0,
bos_token_id=1,
eos_token_id=2,
scope=None,
):
self.parent = parent
self.batch_size = batch_size
self.decoder_seq_length = decoder_seq_length
# For common tests
self.seq_length = self.decoder_seq_length
self.is_training = is_training
self.use_attention_mask = use_attention_mask
self.use_labels = use_labels
self.vocab_size = vocab_size
self.d_model = d_model
self.hidden_size = d_model
self.num_hidden_layers = decoder_layers
self.decoder_layers = decoder_layers
self.decoder_ffn_dim = decoder_ffn_dim
self.decoder_attention_heads = decoder_attention_heads
self.num_attention_heads = decoder_attention_heads
self.eos_token_id = eos_token_id
self.bos_token_id = bos_token_id
self.pad_token_id = pad_token_id
self.decoder_start_token_id = decoder_start_token_id
self.use_cache = use_cache
self.max_position_embeddings = max_position_embeddings
self.scope = None
self.decoder_key_length = decoder_seq_length
self.base_model_out_len = 2
self.decoder_attention_idx = 1
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.decoder_seq_length], self.vocab_size)
attention_mask = None
if self.use_attention_mask:
attention_mask = ids_tensor([self.batch_size, self.decoder_seq_length], vocab_size=2)
lm_labels = None
if self.use_labels:
lm_labels = ids_tensor([self.batch_size, self.decoder_seq_length], self.vocab_size)
config = Speech2Text2Config(
vocab_size=self.vocab_size,
d_model=self.d_model,
decoder_layers=self.decoder_layers,
decoder_ffn_dim=self.decoder_ffn_dim,
decoder_attention_heads=self.decoder_attention_heads,
eos_token_id=self.eos_token_id,
bos_token_id=self.bos_token_id,
use_cache=self.use_cache,
pad_token_id=self.pad_token_id,
decoder_start_token_id=self.decoder_start_token_id,
max_position_embeddings=self.max_position_embeddings,
)
return (
config,
input_ids,
attention_mask,
lm_labels,
)
def create_and_check_decoder_model_past(
self,
config,
input_ids,
attention_mask,
lm_labels,
):
config.use_cache = True
model = Speech2Text2Decoder(config=config).to(torch_device).eval()
input_ids = input_ids[:2]
input_ids[input_ids == 0] += 1
# first forward pass
outputs = model(input_ids, use_cache=True)
outputs_use_cache_conf = model(input_ids)
outputs_no_past = model(input_ids, use_cache=False)
self.parent.assertTrue(len(outputs) == len(outputs_use_cache_conf))
self.parent.assertTrue(len(outputs) == len(outputs_no_past) + 1)
past_key_values = outputs["past_key_values"]
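# `past_key_values` caches the per-layer key/value states of the tokens seen so far; passing it back in
# below lets the decoder process only the newly appended token while still attending to the full prefix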
# create hypothetical next token and extend to next_input_ids
next_tokens = ids_tensor((2, 1), config.vocab_size - 1) + 1
# append to next input_ids
next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
print(next_input_ids)
output_from_no_past = model(next_input_ids)["last_hidden_state"]
output_from_past = model(next_tokens, past_key_values=past_key_values)["last_hidden_state"]
# select random slice
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, next_input_ids.shape[-1] - 1, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, 0, random_slice_idx].detach()
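# position 0 of the cached pass corresponds to the last position of the full (no-cache) pass, because only
# the single new token was fed to the model when `past_key_values` was supplied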
# test that outputs are equal for slice
assert torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3)
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
(
config,
input_ids,
attention_mask,
lm_labels,
) = config_and_inputs
inputs_dict = {
"input_ids": input_ids,
"attention_mask": attention_mask,
}
return config, inputs_dict
@require_torch
class Speech2Text2StandaloneDecoderModelTest(
ModelTesterMixin, GenerationTesterMixin, PipelineTesterMixin, unittest.TestCase
):
all_model_classes = (Speech2Text2Decoder, Speech2Text2ForCausalLM) if is_torch_available() else ()
all_generative_model_classes = (Speech2Text2ForCausalLM,) if is_torch_available() else ()
pipeline_model_mapping = {"text-generation": Speech2Text2ForCausalLM} if is_torch_available() else {}
fx_compatible = True
test_pruning = False
def setUp(
self,
):
self.model_tester = Speech2Text2StandaloneDecoderModelTester(self, is_training=False)
self.config_tester = ConfigTester(self, config_class=Speech2Text2Config)
# not implemented currently
def test_inputs_embeds(self):
pass
# speech2text2 has no base model
def test_save_load_fast_init_from_base(self):
pass
# speech2text2 has no base model
def test_save_load_fast_init_to_base(self):
pass
def test_config(self):
self.config_tester.run_common_tests()
def test_decoder_model_past(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_decoder_model_past(*config_and_inputs)
# decoder cannot keep gradients
def test_retain_grad_hidden_states_attentions(self):
return
# Copyright 2021 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import json
import os
import tempfile
import unittest
from transformers.models.speech_to_text_2 import Speech2Text2Tokenizer
from transformers.models.speech_to_text_2.tokenization_speech_to_text_2 import VOCAB_FILES_NAMES
from ...test_tokenization_common import TokenizerTesterMixin
class SpeechToTextTokenizerTest(TokenizerTesterMixin, unittest.TestCase):
from_pretrained_id = "facebook/s2t-wav2vec2-large-en-de"
tokenizer_class = Speech2Text2Tokenizer
test_rust_tokenizer = False
def setUp(self):
super().setUp()
vocab = "<s> <pad> </s> <unk> here@@ a couple of@@ words for the he@@ re@@ vocab".split(" ")
merges = ["he re</w> 123", "here a 1456"]
vocab_tokens = dict(zip(vocab, range(len(vocab))))
self.special_tokens_map = {"pad_token": "<pad>", "unk_token": "<unk>", "bos_token": "<s>", "eos_token": "</s>"}
self.tmpdirname = tempfile.mkdtemp()
self.vocab_file = os.path.join(self.tmpdirname, VOCAB_FILES_NAMES["vocab_file"])
self.merges_file = os.path.join(self.tmpdirname, VOCAB_FILES_NAMES["merges_file"])
with open(self.vocab_file, "w", encoding="utf-8") as fp:
fp.write(json.dumps(vocab_tokens) + "\n")
with open(self.merges_file, "w") as fp:
fp.write("\n".join(merges))
def test_get_vocab(self):
vocab_keys = list(self.get_tokenizer().get_vocab().keys())
self.assertEqual(vocab_keys[0], "<s>")
self.assertEqual(vocab_keys[1], "<pad>")
self.assertEqual(vocab_keys[-1], "vocab")
self.assertEqual(len(vocab_keys), 14)
def test_vocab_size(self):
self.assertEqual(self.get_tokenizer().vocab_size, 14)
def test_tokenizer_decode(self):
tokenizer = Speech2Text2Tokenizer.from_pretrained(self.tmpdirname)
# make sure @@ is correctly concatenated
token_ids = [4, 6, 8, 7, 10] # ["here@@", "couple", "words", "of@@", "the"]
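# "here@@" and "of@@" are continuation pieces, so decoding joins them to the next token:
# "here@@" + "couple" -> "herecouple", "of@@" + "the" -> "ofthe"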
output_string = tokenizer.decode(token_ids)
self.assertTrue(output_string == "herecouple words ofthe")
def test_load_no_merges_file(self):
tokenizer = Speech2Text2Tokenizer.from_pretrained(self.tmpdirname)
with tempfile.TemporaryDirectory() as tmp_dirname:
tokenizer.save_pretrained(tmp_dirname)
os.remove(os.path.join(tmp_dirname, "merges.txt"))
# loading the tokenizer without a merges file should not throw an error
tokenizer = Speech2Text2Tokenizer.from_pretrained(tmp_dirname)
with tempfile.TemporaryDirectory() as tmp_dirname:
# save tokenizer and load again
tokenizer.save_pretrained(tmp_dirname)
tokenizer = Speech2Text2Tokenizer.from_pretrained(tmp_dirname)
self.assertIsNotNone(tokenizer)
# overwrite since merges_file is optional
def test_tokenizer_slow_store_full_signature(self):
if not self.test_slow_tokenizer:
return
signature = inspect.signature(self.tokenizer_class.__init__)
tokenizer = self.get_tokenizer()
for parameter_name, parameter in signature.parameters.items():
if parameter.default != inspect.Parameter.empty and parameter_name != "merges_file":
self.assertIn(parameter_name, tokenizer.init_kwargs)
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the TVLT feature extraction."""
import itertools
import random
import unittest
import numpy as np
from transformers import TvltFeatureExtractor, is_datasets_available
from transformers.testing_utils import require_torch, require_torchaudio
from transformers.utils.import_utils import is_torch_available
from ...test_sequence_feature_extraction_common import SequenceFeatureExtractionTestMixin
if is_torch_available():
import torch
if is_datasets_available():
from datasets import load_dataset
global_rng = random.Random()
# Copied from tests.models.whisper.test_feature_extraction_whisper.floats_list
def floats_list(shape, scale=1.0, rng=None, name=None):
"""Creates a random float32 tensor"""
if rng is None:
rng = global_rng
values = []
for batch_idx in range(shape[0]):
values.append([])
for _ in range(shape[1]):
values[-1].append(rng.random() * scale)
return values
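# e.g. floats_list((2, 3)) returns a 2x3 nested Python list of floats drawn uniformly from [0, scale)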
class TvltFeatureExtractionTester(unittest.TestCase):
def __init__(
self,
parent,
batch_size=7,
min_seq_length=400,
max_seq_length=2000,
spectrogram_length=2048,
feature_size=128,
num_audio_channels=1,
hop_length=512,
chunk_length=30,
sampling_rate=44100,
):
self.parent = parent
self.batch_size = batch_size
self.min_seq_length = min_seq_length
self.max_seq_length = max_seq_length
self.seq_length_diff = (self.max_seq_length - self.min_seq_length) // (self.batch_size - 1)
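# with the defaults above this is (2000 - 400) // (7 - 1) = 266, so prepare_inputs_for_common yields
# batch_size inputs whose lengths step from 400 towards 2000 in increments of 266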
self.spectrogram_length = spectrogram_length
self.feature_size = feature_size
self.num_audio_channels = num_audio_channels
self.hop_length = hop_length
self.chunk_length = chunk_length
self.sampling_rate = sampling_rate
def prepare_feat_extract_dict(self):
return {
"spectrogram_length": self.spectrogram_length,
"feature_size": self.feature_size,
"num_audio_channels": self.num_audio_channels,
"hop_length": self.hop_length,
"chunk_length": self.chunk_length,
"sampling_rate": self.sampling_rate,
}
def prepare_inputs_for_common(self, equal_length=False, numpify=False):
def _flatten(list_of_lists):
return list(itertools.chain(*list_of_lists))
if equal_length:
speech_inputs = [floats_list((self.max_seq_length, self.feature_size)) for _ in range(self.batch_size)]
else:
# make sure that inputs increase in size
speech_inputs = [
floats_list((x, self.feature_size))
for x in range(self.min_seq_length, self.max_seq_length, self.seq_length_diff)
]
if numpify:
speech_inputs = [np.asarray(x) for x in speech_inputs]
return speech_inputs
@require_torch
@require_torchaudio
class TvltFeatureExtractionTest(SequenceFeatureExtractionTestMixin, unittest.TestCase):
feature_extraction_class = TvltFeatureExtractor
def setUp(self):
self.feat_extract_tester = TvltFeatureExtractionTester(self)
def test_feat_extract_properties(self):
feature_extractor = self.feature_extraction_class(**self.feat_extract_dict)
self.assertTrue(hasattr(feature_extractor, "spectrogram_length"))
self.assertTrue(hasattr(feature_extractor, "feature_size"))
self.assertTrue(hasattr(feature_extractor, "num_audio_channels"))
self.assertTrue(hasattr(feature_extractor, "hop_length"))
self.assertTrue(hasattr(feature_extractor, "chunk_length"))
self.assertTrue(hasattr(feature_extractor, "sampling_rate"))
def test_call(self):
# Initialize feature_extractor
feature_extractor = self.feature_extraction_class(**self.feat_extract_dict)
# create three inputs of length 800, 1000, and 1200
speech_inputs = [floats_list((1, x))[0] for x in range(800, 1400, 200)]
np_speech_inputs = [np.asarray(speech_input) for speech_input in speech_inputs]
# Test not batched input
encoded_audios = feature_extractor(np_speech_inputs[0], return_tensors="np", sampling_rate=44100).audio_values
self.assertTrue(encoded_audios.ndim == 4)
self.assertTrue(encoded_audios.shape[-1] == feature_extractor.feature_size)
self.assertTrue(encoded_audios.shape[-2] <= feature_extractor.spectrogram_length)
self.assertTrue(encoded_audios.shape[-3] == feature_extractor.num_channels)
# Test batched
encoded_audios = feature_extractor(np_speech_inputs, return_tensors="np", sampling_rate=44100).audio_values
self.assertTrue(encoded_audios.ndim == 4)
self.assertTrue(encoded_audios.shape[-1] == feature_extractor.feature_size)
self.assertTrue(encoded_audios.shape[-2] <= feature_extractor.spectrogram_length)
self.assertTrue(encoded_audios.shape[-3] == feature_extractor.num_channels)
# Test audio masking
encoded_audios = feature_extractor(
np_speech_inputs, return_tensors="np", sampling_rate=44100, mask_audio=True
).audio_values
self.assertTrue(encoded_audios.ndim == 4)
self.assertTrue(encoded_audios.shape[-1] == feature_extractor.feature_size)
self.assertTrue(encoded_audios.shape[-2] <= feature_extractor.spectrogram_length)
self.assertTrue(encoded_audios.shape[-3] == feature_extractor.num_channels)
# Test 2-D numpy arrays are batched.
speech_inputs = [floats_list((1, x))[0] for x in (800, 800, 800)]
np_speech_inputs = np.asarray(speech_inputs)
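# a single 2-D array of shape (3, 800) should be interpreted as a batch of three 1-D waveforms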
encoded_audios = feature_extractor(np_speech_inputs, return_tensors="np", sampling_rate=44100).audio_values
self.assertTrue(encoded_audios.ndim == 4)
self.assertTrue(encoded_audios.shape[-1] == feature_extractor.feature_size)
self.assertTrue(encoded_audios.shape[-2] <= feature_extractor.spectrogram_length)
self.assertTrue(encoded_audios.shape[-3] == feature_extractor.num_channels)
def _load_datasamples(self, num_samples):
ds = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
# automatic decoding with librispeech
speech_samples = ds.sort("id").select(range(num_samples))[:num_samples]["audio"]
return [x["array"] for x in speech_samples]
def test_integration(self):
input_speech = self._load_datasamples(1)
feature_extractor = TvltFeatureExtractor()
audio_values = feature_extractor(input_speech, return_tensors="pt").audio_values
self.assertEqual(audio_values.shape, (1, 1, 192, 128))
expected_slice = torch.tensor([[-0.3032, -0.2708], [-0.4434, -0.4007]])
self.assertTrue(torch.allclose(audio_values[0, 0, :2, :2], expected_slice, atol=1e-4))
# coding=utf-8
# Copyright 2023 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the TVLT image processor."""
import unittest
import numpy as np
from transformers.testing_utils import require_torch, require_vision
from transformers.utils import is_torch_available, is_vision_available
from ...test_image_processing_common import ImageProcessingTestMixin
if is_torch_available():
import torch
if is_vision_available():
from PIL import Image
from transformers import TvltImageProcessor
def prepare_video(image_processor_tester, width=10, height=10, numpify=False, torchify=False):
"""This function prepares a video as a list of PIL images/NumPy arrays/PyTorch tensors."""
video = []
for i in range(image_processor_tester.num_frames):
video.append(np.random.randint(255, size=(image_processor_tester.num_channels, width, height), dtype=np.uint8))
if not numpify and not torchify:
# PIL expects the channel dimension as last dimension
video = [Image.fromarray(np.moveaxis(frame, 0, -1)) for frame in video]
if torchify:
video = [torch.from_numpy(frame) for frame in video]
return video
def prepare_video_inputs(image_processor_tester, equal_resolution=False, numpify=False, torchify=False):
"""This function prepares a batch of videos: a list of list of PIL images, or a list of list of numpy arrays if
one specifies numpify=True, or a list of list of PyTorch tensors if one specifies torchify=True.
One can specify whether the videos are of the same resolution or not.
"""
assert not (numpify and torchify), "You cannot specify both numpy and PyTorch tensors at the same time"
video_inputs = []
for i in range(image_processor_tester.batch_size):
if equal_resolution:
width = height = image_processor_tester.max_resolution
else:
width, height = np.random.choice(
np.arange(image_processor_tester.min_resolution, image_processor_tester.max_resolution), 2
)
video = prepare_video(
image_processor_tester=image_processor_tester,
width=width,
height=height,
numpify=numpify,
torchify=torchify,
)
video_inputs.append(video)
return video_inputs
class TvltImageProcessorTester(unittest.TestCase):
def __init__(
self,
parent,
batch_size=7,
num_channels=3,
num_frames=4,
image_size=18,
min_resolution=30,
max_resolution=400,
do_resize=True,
size=None,
do_normalize=True,
image_mean=[0.5, 0.5, 0.5],
image_std=[0.5, 0.5, 0.5],
do_center_crop=True,
crop_size=None,
):
size = size if size is not None else {"shortest_edge": 18}
crop_size = crop_size if crop_size is not None else {"height": 18, "width": 18}
self.parent = parent
self.batch_size = batch_size
self.num_channels = num_channels
self.num_frames = num_frames
self.image_size = image_size
self.min_resolution = min_resolution
self.max_resolution = max_resolution
self.do_resize = do_resize
self.size = size
self.do_normalize = do_normalize
self.image_mean = image_mean
self.image_std = image_std
self.do_center_crop = do_center_crop
self.crop_size = crop_size
def prepare_image_processor_dict(self):
return {
"image_mean": self.image_mean,
"image_std": self.image_std,
"do_normalize": self.do_normalize,
"do_resize": self.do_resize,
"size": self.size,
"do_center_crop": self.do_center_crop,
"crop_size": self.crop_size,
}
@require_torch
@require_vision
class TvltImageProcessorTest(ImageProcessingTestMixin, unittest.TestCase):
image_processing_class = TvltImageProcessor if is_vision_available() else None
def setUp(self):
self.image_processor_tester = TvltImageProcessorTester(self)
@property
def image_processor_dict(self):
return self.image_processor_tester.prepare_image_processor_dict()
def test_image_processor_properties(self):
image_processor = self.image_processing_class(**self.image_processor_dict)
self.assertTrue(hasattr(image_processor, "image_mean"))
self.assertTrue(hasattr(image_processor, "image_std"))
self.assertTrue(hasattr(image_processor, "do_normalize"))
self.assertTrue(hasattr(image_processor, "do_resize"))
self.assertTrue(hasattr(image_processor, "do_center_crop"))
self.assertTrue(hasattr(image_processor, "size"))
def test_call_pil(self):
# Initialize image_processor
image_processor = self.image_processing_class(**self.image_processor_dict)
# create random PIL videos
video_inputs = prepare_video_inputs(self.image_processor_tester, equal_resolution=False)
for video in video_inputs:
self.assertIsInstance(video, list)
self.assertIsInstance(video[0], Image.Image)
# Test not batched input
encoded_videos = image_processor(video_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
1,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
self.image_processor_tester.crop_size["height"],
self.image_processor_tester.crop_size["width"],
),
)
# Test batched
encoded_videos = image_processor(video_inputs, return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
self.image_processor_tester.batch_size,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
self.image_processor_tester.crop_size["height"],
self.image_processor_tester.crop_size["width"],
),
)
def test_call_numpy(self):
# Initialize image_processor
image_processor = self.image_processing_class(**self.image_processor_dict)
# create random numpy tensors
video_inputs = prepare_video_inputs(self.image_processor_tester, equal_resolution=False, numpify=True)
for video in video_inputs:
self.assertIsInstance(video, list)
self.assertIsInstance(video[0], np.ndarray)
# Test not batched input
encoded_videos = image_processor(video_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
1,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
self.image_processor_tester.crop_size["height"],
self.image_processor_tester.crop_size["width"],
),
)
# Test batched
encoded_videos = image_processor(video_inputs, return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
self.image_processor_tester.batch_size,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
self.image_processor_tester.crop_size["height"],
self.image_processor_tester.crop_size["width"],
),
)
def test_call_numpy_4_channels(self):
# Initialize image_processor
image_processor = self.image_processing_class(**self.image_processor_dict)
# create random numpy tensors
self.image_processor_tester.num_channels = 4
video_inputs = prepare_video_inputs(self.image_processor_tester, equal_resolution=False, numpify=True)
for video in video_inputs:
self.assertIsInstance(video, list)
self.assertIsInstance(video[0], np.ndarray)
# Test not batched input
encoded_videos = image_processor(
video_inputs[0], return_tensors="pt", input_data_format="channels_first", image_mean=0, image_std=1
).pixel_values
self.assertEqual(
encoded_videos.shape,
(
1,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
self.image_processor_tester.crop_size["height"],
self.image_processor_tester.crop_size["width"],
),
)
# Test batched
encoded_videos = image_processor(
video_inputs, return_tensors="pt", input_data_format="channels_first", image_mean=0, image_std=1
).pixel_values
self.assertEqual(
encoded_videos.shape,
(
self.image_processor_tester.batch_size,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
self.image_processor_tester.crop_size["height"],
self.image_processor_tester.crop_size["width"],
),
)
self.image_processor_tester.num_channels = 3
def test_call_pytorch(self):
# Initialize image_processor
image_processor = self.image_processing_class(**self.image_processor_dict)
# create random PyTorch tensors
video_inputs = prepare_video_inputs(self.image_processor_tester, equal_resolution=False, torchify=True)
for video in video_inputs:
self.assertIsInstance(video, list)
self.assertIsInstance(video[0], torch.Tensor)
# Test not batched input
encoded_videos = image_processor(video_inputs[0], return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
1,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
self.image_processor_tester.crop_size["height"],
self.image_processor_tester.crop_size["width"],
),
)
# Test batched
encoded_videos = image_processor(video_inputs, return_tensors="pt").pixel_values
self.assertEqual(
encoded_videos.shape,
(
self.image_processor_tester.batch_size,
self.image_processor_tester.num_frames,
self.image_processor_tester.num_channels,
self.image_processor_tester.crop_size["height"],
self.image_processor_tester.crop_size["width"],
),
)
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch TVLT model."""
import copy
import inspect
import unittest
import numpy as np
from huggingface_hub import hf_hub_download
from transformers import (
TvltConfig,
is_datasets_available,
is_speech_available,
is_torch_available,
is_vision_available,
)
from transformers.testing_utils import require_torch, require_vision, slow, torch_device
from transformers.utils import cached_property
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, floats_tensor
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
import torch.nn as nn
from transformers import TvltForAudioVisualClassification, TvltForPreTraining, TvltModel
if is_datasets_available():
from datasets import load_dataset
if is_vision_available():
from transformers import TvltImageProcessor
if is_speech_available():
from transformers import TvltFeatureExtractor
class TvltModelTester:
def __init__(
self,
parent,
batch_size=2,
image_size=32,
spectrogram_length=32,
frequency_length=16,
image_patch_size=[2, 2],
audio_patch_size=[2, 2],
num_image_channels=3,
num_audio_channels=1,
num_frames=2,
hidden_size=32,
num_hidden_layers=2,
num_attention_heads=4,
intermediate_size=128,
hidden_act="gelu",
hidden_dropout_prob=0.0,
attention_probs_dropout_prob=0.0,
initializer_range=0.02,
layer_norm_eps=1e-12,
qkv_bias=True,
use_mean_pooling=True,
decoder_num_attention_heads=4,
decoder_hidden_size=32,
decoder_num_hidden_layers=2,
decoder_intermediate_size=128,
image_mask_ratio=0.75,
audio_mask_ratio=0.15,
audio_mask_type="frame-level",
task_matching=True,
task_mae=True,
num_labels=1,
is_training=True,
):
self.parent = parent
self.batch_size = batch_size
self.image_size = image_size
self.spectrogram_length = spectrogram_length
self.frequency_length = frequency_length
self.image_patch_size = image_patch_size
self.audio_patch_size = audio_patch_size
self.num_image_channels = num_image_channels
self.num_audio_channels = num_audio_channels
self.num_frames = num_frames
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.hidden_act = hidden_act
self.hidden_dropout_prob = hidden_dropout_prob
self.attention_probs_dropout_prob = attention_probs_dropout_prob
self.initializer_range = initializer_range
self.layer_norm_eps = layer_norm_eps
self.qkv_bias = qkv_bias
self.use_mean_pooling = use_mean_pooling
self.decoder_num_attention_heads = decoder_num_attention_heads
self.decoder_hidden_size = decoder_hidden_size
self.decoder_num_hidden_layers = decoder_num_hidden_layers
self.decoder_intermediate_size = decoder_intermediate_size
self.image_mask_ratio = image_mask_ratio
self.audio_mask_ratio = audio_mask_ratio
self.task_matching = task_matching
self.task_mae = task_mae
self.num_labels = num_labels
self.expected_pixel_seq_len = (self.image_size // self.image_patch_size[0]) ** 2 * self.num_frames
self.expected_audio_seq_len = (self.spectrogram_length // self.audio_patch_size[0]) * (
self.frequency_length // self.audio_patch_size[1]
)
# we set the expected sequence length (which is used in several tests)
# it equals the number of image/video patches plus the number of audio patches, plus 1 for the [CLS] token
self.expected_seq_len = self.expected_pixel_seq_len + self.expected_audio_seq_len + 1
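# with the defaults above: pixel patches = (32 // 2) ** 2 * 2 frames = 512, audio patches =
# (32 // 2) * (16 // 2) = 128, so expected_seq_len = 512 + 128 + 1 ([CLS]) = 641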
self.image_mae_output_dim = image_patch_size[0] ** 2 * num_image_channels
self.audio_mae_output_dim = audio_patch_size[0] * audio_patch_size[1] * num_audio_channels
self.is_training = is_training
def prepare_config_and_inputs(self):
pixel_values = floats_tensor(
[self.batch_size, self.num_frames, self.num_image_channels, self.image_size, self.image_size]
)
audio_values = floats_tensor(
[self.batch_size, self.num_audio_channels, self.spectrogram_length, self.frequency_length]
)
pixel_mask = floats_tensor([self.batch_size, self.expected_pixel_seq_len])
audio_mask = floats_tensor([self.batch_size, self.expected_audio_seq_len])
config = self.get_config()
return (config, pixel_values, audio_values, pixel_mask, audio_mask)
def prepare_config_and_inputs_for_pretraining(self):
pixel_values = floats_tensor(
[self.batch_size, self.num_frames, self.num_image_channels, self.image_size, self.image_size]
)
audio_values = floats_tensor(
[self.batch_size, self.num_audio_channels, self.spectrogram_length, self.frequency_length]
)
pixel_mask = floats_tensor([self.batch_size, self.expected_pixel_seq_len])
audio_mask = floats_tensor([self.batch_size, self.expected_audio_seq_len])
pixel_values_mixed = floats_tensor(
[self.batch_size, self.num_frames, self.num_image_channels, self.image_size, self.image_size]
)
pixel_mask_mixed = floats_tensor([self.batch_size, self.expected_pixel_seq_len])
labels = floats_tensor([self.batch_size])
config = self.get_config()
return (
config,
pixel_values,
audio_values,
pixel_mask,
audio_mask,
pixel_values_mixed,
pixel_mask_mixed,
labels,
)
def get_config(self):
return TvltConfig(
image_size=self.image_size,
spectrogram_length=self.spectrogram_length,
frequency_length=self.frequency_length,
image_patch_size=self.image_patch_size,
audio_patch_size=self.audio_patch_size,
num_image_channels=self.num_image_channels,
num_audio_channels=self.num_audio_channels,
num_frames=self.num_frames,
hidden_size=self.hidden_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
intermediate_size=self.intermediate_size,
hidden_act=self.hidden_act,
hidden_dropout_prob=self.hidden_dropout_prob,
attention_probs_dropout_prob=self.attention_probs_dropout_prob,
initializer_range=self.initializer_range,
layer_norm_eps=self.layer_norm_eps,
qkv_bias=self.qkv_bias,
use_mean_pooling=self.use_mean_pooling,
decoder_num_attention_heads=self.decoder_num_attention_heads,
decoder_hidden_size=self.decoder_hidden_size,
decoder_num_hidden_layers=self.decoder_num_hidden_layers,
decoder_intermediate_size=self.decoder_intermediate_size,
image_mask_ratio=self.image_mask_ratio,
audio_mask_ratio=self.audio_mask_ratio,
task_matching=self.task_matching,
task_mae=self.task_mae,
num_labels=self.num_labels,
)
def create_and_check_model(self, config, pixel_values, audio_values, pixel_mask, audio_mask):
model = TvltModel(config=config)
model.to(torch_device)
model.eval()
result = model(pixel_values, audio_values, pixel_mask=pixel_mask, audio_mask=audio_mask)
result = model(pixel_values, audio_values)
self.parent.assertEqual(
result.last_hidden_state.shape, (self.batch_size, self.expected_seq_len, self.hidden_size)
)
def create_and_check_for_audiovisual_classification(
self, config, pixel_values, audio_values, pixel_mask, audio_mask
):
model = TvltForAudioVisualClassification(config=config)
model.to(torch_device)
model.eval()
result = model(pixel_values, audio_values, pixel_mask=pixel_mask, audio_mask=audio_mask)
result = model(pixel_values, audio_values)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.num_labels))
def create_and_check_for_pretraining(
self,
config,
pixel_values,
audio_values,
pixel_mask,
audio_mask,
pixel_values_mixed,
pixel_mask_mixed,
labels,
):
model = TvltForPreTraining(config=config)
model.to(torch_device)
model.train()
result = model(
pixel_values,
audio_values,
pixel_mask,
audio_mask,
pixel_values_mixed=pixel_values_mixed,
pixel_mask_mixed=pixel_mask_mixed,
labels=labels,
)
self.parent.assertEqual(
result.pixel_logits.shape, (self.batch_size, self.expected_pixel_seq_len, self.image_mae_output_dim)
)
self.parent.assertEqual(
result.audio_logits.shape, (self.batch_size, self.expected_audio_seq_len, self.audio_mae_output_dim)
)
self.parent.assertEqual(result.matching_logits.shape, (self.batch_size, self.num_labels))
def create_and_check_for_pretraining_inference(
self,
config,
pixel_values,
audio_values,
pixel_mask,
audio_mask,
pixel_values_mixed,
pixel_mask_mixed,
labels,
):
model = TvltForPreTraining(config=config)
model.to(torch_device)
model.eval()
result = model(
pixel_values,
audio_values,
pixel_mask,
audio_mask,
pixel_values_mixed=pixel_values_mixed,
pixel_mask_mixed=pixel_mask_mixed,
labels=labels,
)
if result.pixel_logits is not None:
self.parent.assertEqual(
result.pixel_logits.shape, (self.batch_size, self.expected_pixel_seq_len, self.image_mae_output_dim)
)
if result.audio_logits is not None:
self.parent.assertEqual(
result.audio_logits.shape, (self.batch_size, self.expected_audio_seq_len, self.audio_mae_output_dim)
)
self.parent.assertEqual(result.matching_logits.shape, (self.batch_size, self.num_labels))
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
(config, pixel_values, audio_values, pixel_mask, audio_mask) = config_and_inputs
inputs_dict = {
"pixel_values": pixel_values,
"audio_values": audio_values,
"pixel_mask": pixel_mask,
"audio_mask": audio_mask,
}
return config, inputs_dict
def prepare_pixel_values(self):
return floats_tensor(
[self.batch_size, self.num_frames, self.num_image_channels, self.image_size, self.image_size]
)
def prepare_audio_values(self):
return floats_tensor(
[self.batch_size, self.num_audio_channels, self.spectrogram_length, self.frequency_length]
)
@require_torch
class TvltModelTest(ModelTesterMixin, PipelineTesterMixin, unittest.TestCase):
all_model_classes = (
(TvltModel, TvltForPreTraining, TvltForAudioVisualClassification) if is_torch_available() else ()
)
pipeline_model_mapping = {"feature-extraction": TvltModel} if is_torch_available() else {}
fx_compatible = False
test_pruning = False
test_headmasking = False
test_torchscript = False
test_resize_embeddings = False
main_input_name = "pixel_values"
# TvltForAudioVisualClassification and TvltForPreTraining require special treatment
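# the pre-training matching objective pairs a (possibly mismatched) "mixed" video with the audio, so
# TvltForPreTraining also needs pixel_values_mixed, pixel_mask_mixed and a float matching label (prepared below)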
def _prepare_for_class(self, inputs_dict, model_class, return_labels=True):
inputs_dict = copy.deepcopy(inputs_dict)
if return_labels:
if model_class.__name__ == "TvltForAudioVisualClassification":
inputs_dict["labels"] = torch.zeros(
(self.model_tester.batch_size,), dtype=torch.long, device=torch_device
)
elif model_class.__name__ == "TvltForPreTraining":
inputs_dict["labels"] = torch.zeros(
(self.model_tester.batch_size,), dtype=torch.float, device=torch_device
)
inputs_dict["pixel_values_mixed"] = torch.zeros(
(
self.model_tester.batch_size,
self.model_tester.num_frames,
self.model_tester.num_image_channels,
self.model_tester.image_size,
self.model_tester.image_size,
),
dtype=torch.float,
device=torch_device,
)
inputs_dict["pixel_mask_mixed"] = torch.zeros(
(self.model_tester.batch_size, self.model_tester.expected_pixel_seq_len),
dtype=torch.float,
device=torch_device,
)
return inputs_dict
def setUp(self):
self.model_tester = TvltModelTester(self)
self.config_tester = ConfigTester(self, config_class=TvltConfig, has_text_modality=False, hidden_size=37)
def test_config(self):
self.config_tester.run_common_tests()
@unittest.skip(reason="TVLT does not use inputs_embeds")
def test_inputs_embeds(self):
pass
def test_model_common_attributes(self):
config, _ = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
input_embeddings = model.get_input_embeddings()
self.assertIsInstance(input_embeddings, (tuple))
for embedding in input_embeddings:
self.assertIsInstance(embedding, (nn.Module))
x = model.get_output_embeddings()
self.assertTrue(x is None or isinstance(x, nn.Linear))
def test_forward_signature(self):
config, _ = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
signature = inspect.signature(model.forward)
# signature.parameters is an OrderedDict => so arg_names order is deterministic
arg_names = [*signature.parameters.keys()]
expected_arg_names = ["pixel_values", "audio_values"]
self.assertListEqual(arg_names[:2], expected_arg_names)
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
def test_for_audiovisual_classification(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_for_audiovisual_classification(*config_and_inputs)
def test_for_pretraining(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs_for_pretraining()
self.model_tester.create_and_check_for_pretraining(*config_and_inputs)
self.model_tester.create_and_check_for_pretraining_inference(*config_and_inputs)
@slow
def test_model_from_pretrained(self):
model_name = "ZinengTang/tvlt-base"
model = TvltModel.from_pretrained(model_name)
self.assertIsNotNone(model)
def test_training(self):
if not self.model_tester.is_training:
return
for model_class in self.all_model_classes[1:]:
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.return_dict = True
model = model_class(config)
model.to(torch_device)
model.train()
inputs = self._prepare_for_class(inputs_dict, model_class)
for k, v in inputs.items():
print(k, v.shape)
loss = model(**inputs).loss
loss.backward()
def test_training_gradient_checkpointing(self):
if not self.model_tester.is_training:
return
for model_class in self.all_model_classes[1:]:
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.use_cache = False
config.return_dict = True
model = model_class(config)
model.to(torch_device)
model.gradient_checkpointing_enable()
model.train()
inputs = self._prepare_for_class(inputs_dict, model_class)
loss = model(**inputs).loss
loss.backward()
def test_attention_outputs(self):
if not self.has_attentions:
pass
else:
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
config.return_dict = True
for model_class in self.all_model_classes[2:]:
seq_len = self.model_tester.expected_seq_len
inputs_dict["output_attentions"] = True
inputs_dict["output_hidden_states"] = False
config.return_dict = True
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
attentions = outputs.attentions
self.assertEqual(len(attentions), self.model_tester.num_hidden_layers)
# check that output_attentions also works when set via the config
del inputs_dict["output_attentions"]
config.output_attentions = True
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
attentions = outputs.attentions
self.assertEqual(len(attentions), self.model_tester.num_hidden_layers)
self.assertListEqual(
list(attentions[0].shape[-3:]),
[self.model_tester.num_attention_heads, seq_len, seq_len],
)
out_len = len(outputs)
# Check attention is always last and order is fine
inputs_dict["output_attentions"] = True
inputs_dict["output_hidden_states"] = True
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
self.assertEqual(out_len + 1, len(outputs))
self_attentions = outputs.attentions
self.assertEqual(len(self_attentions), self.model_tester.num_hidden_layers)
self.assertListEqual(
list(self_attentions[0].shape[-3:]),
[self.model_tester.num_attention_heads, seq_len, seq_len],
)
def test_hidden_states_output(self):
def check_hidden_states_output(inputs_dict, config, model_class):
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
hidden_states = outputs.hidden_states
expected_num_layers = self.model_tester.num_hidden_layers + 1
self.assertEqual(len(hidden_states), expected_num_layers)
seq_length = self.model_tester.expected_seq_len
self.assertListEqual(
list(hidden_states[0].shape[-2:]),
[seq_length, self.model_tester.hidden_size],
)
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes[2:]:
inputs_dict["output_hidden_states"] = True
check_hidden_states_output(inputs_dict, config, model_class)
# check that output_hidden_states also works when set via the config
del inputs_dict["output_hidden_states"]
config.output_hidden_states = True
check_hidden_states_output(inputs_dict, config, model_class)
# We will verify our results on a video of eating spaghetti
# Frame indices used: [164 168 172 176 181 185 189 193 198 202 206 210 215 219 223 227]
def prepare_video(num_frames=8):
file = hf_hub_download(
repo_id="hf-internal-testing/spaghetti-video", filename="eating_spaghetti.npy", repo_type="dataset"
)
video = np.load(file)[:num_frames]
return list(video)
def prepare_audio(num_samples=1):
ds = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
# automatic decoding with librispeech
speech_samples = ds.sort("id").select(range(num_samples))[:num_samples]["audio"]
return [x["array"] for x in speech_samples]
@require_torch
@require_vision
class TvltModelIntegrationTest(unittest.TestCase):
@cached_property
def default_processors(self):
# logits were tested with a different mean and std, so we use the same here
return (
TvltImageProcessor() if is_vision_available() else None,
TvltFeatureExtractor(),
)
def test_inference_for_base_model(self):
model = TvltModel.from_pretrained("ZinengTang/tvlt-base").to(torch_device)
image_processor, audio_feature_extractor = self.default_processors
video = prepare_video()
audio = prepare_audio()
video_inputs = image_processor(video, return_tensors="pt").to(torch_device)
audio_inputs = audio_feature_extractor(audio, return_tensors="pt").to(torch_device)
inputs = {}
inputs.update(video_inputs)
inputs.update(audio_inputs)
# forward pass
with torch.no_grad():
outputs = model(**inputs)
# verify the logits
expected_last_hidden_state_slice = torch.tensor([[-0.0186, -0.0691], [0.0242, -0.0398]], device=torch_device)
self.assertTrue(
torch.allclose(outputs.last_hidden_state[:, :2, :2], expected_last_hidden_state_slice, atol=1e-4)
)
def test_inference_for_pretraining(self):
model = TvltForPreTraining.from_pretrained("ZinengTang/tvlt-base").to(torch_device)
image_processor, audio_feature_extractor = self.default_processors
video = prepare_video()
video_mixed = prepare_video()
audio = prepare_audio()
video_inputs = image_processor(video, return_tensors="pt", mask_pixel=True).to(torch_device)
video_mixed_inputs = image_processor(video_mixed, is_mixed=True, return_tensors="pt").to(torch_device)
audio_inputs = audio_feature_extractor(audio, return_tensors="pt", mask_audio=True).to(torch_device)
labels = torch.tensor([[0.0]], device=torch_device)
inputs = {}
inputs.update(video_inputs)
inputs.update(video_mixed_inputs)
inputs.update(audio_inputs)
inputs.update({"labels": labels})
# forward pass
with torch.no_grad():
outputs = model(**inputs)
# verify the logits
expected_pixel_logits_shape = torch.Size([1, 1568, 768])
expected_audio_logits_shape = torch.Size([1, 96, 256])
expected_matching_logits_shape = torch.Size([1, 1])
if outputs.pixel_logits is not None:
self.assertEqual(outputs.pixel_logits.shape, expected_pixel_logits_shape)
if outputs.audio_logits is not None:
self.assertEqual(outputs.audio_logits.shape, expected_audio_logits_shape)
self.assertEqual(outputs.matching_logits.shape, expected_matching_logits_shape)
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import shutil
import tempfile
import unittest
import numpy as np
import pytest
from transformers import is_speech_available, is_vision_available
from transformers.testing_utils import require_torch
if is_vision_available():
from transformers import TvltImageProcessor
if is_speech_available():
from transformers import TvltFeatureExtractor
from transformers import TvltProcessor
@require_torch
class TvltProcessorTest(unittest.TestCase):
def setUp(self):
self.checkpoint = "ZinengTang/tvlt-base"
self.tmpdirname = tempfile.mkdtemp()
def get_image_processor(self, **kwargs):
return TvltImageProcessor.from_pretrained(self.checkpoint, **kwargs)
def get_feature_extractor(self, **kwargs):
return TvltFeatureExtractor.from_pretrained(self.checkpoint, **kwargs)
def tearDown(self):
shutil.rmtree(self.tmpdirname)
def test_save_load_pretrained_default(self):
image_processor = self.get_image_processor()
feature_extractor = self.get_feature_extractor()
processor = TvltProcessor(image_processor=image_processor, feature_extractor=feature_extractor)
processor.save_pretrained(self.tmpdirname)
processor = TvltProcessor.from_pretrained(self.tmpdirname)
self.assertIsInstance(processor.feature_extractor, TvltFeatureExtractor)
self.assertIsInstance(processor.image_processor, TvltImageProcessor)
def test_feature_extractor(self):
image_processor = self.get_image_processor()
feature_extractor = self.get_feature_extractor()
processor = TvltProcessor(image_processor=image_processor, feature_extractor=feature_extractor)
audio = np.ones([12000])
audio_dict = feature_extractor(audio, return_tensors="np")
input_processor = processor(audio=audio, return_tensors="np")
for key in audio_dict.keys():
self.assertAlmostEqual(audio_dict[key].sum(), input_processor[key].sum(), delta=1e-2)
def test_image_processor(self):
image_processor = self.get_image_processor()
feature_extractor = self.get_feature_extractor()
processor = TvltProcessor(image_processor=image_processor, feature_extractor=feature_extractor)
images = np.ones([3, 224, 224])
image_dict = image_processor(images, return_tensors="np")
input_processor = processor(images=images, return_tensors="np")
for key in image_dict.keys():
self.assertAlmostEqual(image_dict[key].sum(), input_processor[key].sum(), delta=1e-2)
def test_processor(self):
image_processor = self.get_image_processor()
feature_extractor = self.get_feature_extractor()
processor = TvltProcessor(image_processor=image_processor, feature_extractor=feature_extractor)
audio = np.ones([12000])
images = np.ones([3, 224, 224])
inputs = processor(audio=audio, images=images)
self.assertListEqual(list(inputs.keys()), ["audio_values", "audio_mask", "pixel_values", "pixel_mask"])
# test if it raises when no input is passed
with pytest.raises(ValueError):
processor()
def test_model_input_names(self):
image_processor = self.get_image_processor()
feature_extractor = self.get_feature_extractor()
processor = TvltProcessor(image_processor=image_processor, feature_extractor=feature_extractor)
self.assertListEqual(
processor.model_input_names,
image_processor.model_input_names + feature_extractor.model_input_names,
msg="`processor` and `image_processor`+`feature_extractor` model input names do not match",
)
# coding=utf-8
# Copyright 2022 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch ViT Hybrid model."""
import unittest
from transformers import ViTHybridConfig
from transformers.testing_utils import is_flaky, require_accelerate, require_torch, require_vision, slow, torch_device
from transformers.utils import cached_property, is_torch_available, is_vision_available
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, _config_zero_init, floats_tensor, ids_tensor
from ...test_pipeline_mixin import PipelineTesterMixin
if is_torch_available():
import torch
from torch import nn
from transformers import ViTHybridForImageClassification, ViTHybridImageProcessor, ViTHybridModel
if is_vision_available():
from PIL import Image
class ViTHybridModelTester:
def __init__(
self,
parent,
batch_size=13,
image_size=64,
patch_size=2,
num_channels=3,
is_training=True,
use_labels=True,
hidden_size=32,
num_hidden_layers=2,
num_attention_heads=4,
intermediate_size=37,
hidden_act="gelu",
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
type_sequence_label_size=10,
initializer_range=0.02,
backbone_featmap_shape=[1, 16, 4, 4],
scope=None,
attn_implementation="eager",
):
self.parent = parent
self.batch_size = batch_size
self.image_size = image_size
self.patch_size = patch_size
self.num_channels = num_channels
self.is_training = is_training
self.use_labels = use_labels
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.hidden_act = hidden_act
self.hidden_dropout_prob = hidden_dropout_prob
self.attention_probs_dropout_prob = attention_probs_dropout_prob
self.type_sequence_label_size = type_sequence_label_size
self.initializer_range = initializer_range
self.scope = scope
self.backbone_featmap_shape = backbone_featmap_shape
self.attn_implementation = attn_implementation
# in ViT hybrid, the seq length equals the number of patches + 1 (we add 1 for the [CLS] token)
# the number of patches is based on the feature map of the backbone, which by default uses an output stride
# of 32, which means that the feature map has a spatial resolution of 1/32 of the input image size
num_patches = (self.image_size // 32) ** 2
self.seq_length = num_patches + 1
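# e.g. with the default image_size=64 this gives (64 // 32) ** 2 = 4 patches and seq_length = 5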
def prepare_config_and_inputs(self):
pixel_values = floats_tensor([self.batch_size, self.num_channels, self.image_size, self.image_size])
labels = None
if self.use_labels:
labels = ids_tensor([self.batch_size], self.type_sequence_label_size)
config = self.get_config()
return config, pixel_values, labels
def get_config(self):
backbone_config = {
"global_padding": "same",
"layer_type": "bottleneck",
"depths": [3, 4, 9],
"out_features": ["stage1", "stage2", "stage3"],
"embedding_dynamic_padding": True,
"hidden_sizes": [4, 8, 16, 32],
"num_groups": 2,
}
return ViTHybridConfig(
image_size=self.image_size,
patch_size=self.patch_size,
num_channels=self.num_channels,
hidden_size=self.hidden_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
intermediate_size=self.intermediate_size,
hidden_act=self.hidden_act,
hidden_dropout_prob=self.hidden_dropout_prob,
attention_probs_dropout_prob=self.attention_probs_dropout_prob,
is_decoder=False,
initializer_range=self.initializer_range,
backbone_featmap_shape=self.backbone_featmap_shape,
backbone_config=backbone_config,
backbone=None,
attn_implementation=self.attn_implementation,
)
def create_and_check_model(self, config, pixel_values, labels):
model = ViTHybridModel(config=config)
model.to(torch_device)
model.eval()
result = model(pixel_values)
self.parent.assertEqual(result.last_hidden_state.shape, (self.batch_size, self.seq_length, self.hidden_size))
def create_and_check_for_image_classification(self, config, pixel_values, labels):
config.num_labels = self.type_sequence_label_size
model = ViTHybridForImageClassification(config)
model.to(torch_device)
model.eval()
result = model(pixel_values, labels=labels)
self.parent.assertEqual(result.logits.shape, (self.batch_size, self.type_sequence_label_size))
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
config, pixel_values, labels = config_and_inputs
inputs_dict = {"pixel_values": pixel_values}
return config, inputs_dict
@require_torch
class ViTHybridModelTest(ModelTesterMixin, PipelineTesterMixin, unittest.TestCase):
"""
Here we also overwrite some of the tests of test_modeling_common.py, as ViT does not use input_ids, inputs_embeds,
attention_mask and seq_length.
"""
all_model_classes = (ViTHybridModel, ViTHybridForImageClassification) if is_torch_available() else ()
pipeline_model_mapping = (
{"image-feature-extraction": ViTHybridModel, "image-classification": ViTHybridForImageClassification}
if is_torch_available()
else {}
)
test_pruning = False
test_resize_embeddings = False
test_head_masking = False
model_split_percents = [0.5, 0.9]
def setUp(self):
self.model_tester = ViTHybridModelTester(self)
self.config_tester = ConfigTester(self, config_class=ViTHybridConfig, has_text_modality=False, hidden_size=37)
def test_config(self):
self.config_tester.run_common_tests()
@unittest.skip(reason="ViT does not use inputs_embeds")
def test_inputs_embeds(self):
pass
def test_model_common_attributes(self):
config, _ = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
self.assertIsInstance(model.get_input_embeddings(), (nn.Module))
x = model.get_output_embeddings()
self.assertTrue(x is None or isinstance(x, nn.Linear))
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
def test_for_image_classification(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_for_image_classification(*config_and_inputs)
def test_initialization(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
configs_no_init = _config_zero_init(config)
for model_class in self.all_model_classes:
model = model_class(config=configs_no_init)
# Skip the check for the backbone
for name, module in model.named_modules():
if module.__class__.__name__ == "ViTHybridPatchEmbeddings":
backbone_params = [f"{name}.{key}" for key in module.state_dict().keys()]
break
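# the convolutional backbone presumably follows its own initialization scheme rather than the
# `initializer_range`-based init checked below, hence its parameters are skipped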
for name, param in model.named_parameters():
if param.requires_grad:
if name in backbone_params:
continue
self.assertIn(
((param.data.mean() * 1e9).round() / 1e9).item(),
[0.0, 1.0],
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
)
@slow
def test_model_from_pretrained(self):
model_name = "google/vit-hybrid-base-bit-384"
model = ViTHybridModel.from_pretrained(model_name)
self.assertIsNotNone(model)
@is_flaky(description="is_flaky https://github.com/huggingface/transformers/issues/29516")
def test_batching_equivalence(self):
super().test_batching_equivalence()
# We will verify our results on an image of cute cats
def prepare_img():
image = Image.open("./tests/fixtures/tests_samples/COCO/000000039769.png")
return image
@require_torch
@require_vision
class ViTModelIntegrationTest(unittest.TestCase):
@cached_property
def default_image_processor(self):
return (
ViTHybridImageProcessor.from_pretrained("google/vit-hybrid-base-bit-384")
if is_vision_available()
else None
)
@slow
def test_inference_image_classification_head(self):
model = ViTHybridForImageClassification.from_pretrained("google/vit-hybrid-base-bit-384").to(torch_device)
image_processor = self.default_image_processor
image = prepare_img()
inputs = image_processor(images=image, return_tensors="pt").to(torch_device)
# forward pass
with torch.no_grad():
outputs = model(**inputs)
# verify the logits
expected_shape = torch.Size((1, 1000))
self.assertEqual(outputs.logits.shape, expected_shape)
expected_slice = torch.tensor([-1.9090, -0.4993, -0.2389]).to(torch_device)
self.assertTrue(torch.allclose(outputs.logits[0, :3], expected_slice, atol=1e-4))
@slow
@require_accelerate
def test_accelerate_inference(self):
image_processor = ViTHybridImageProcessor.from_pretrained("google/vit-hybrid-base-bit-384")
model = ViTHybridForImageClassification.from_pretrained("google/vit-hybrid-base-bit-384", device_map="auto")
image = prepare_img()
inputs = image_processor(images=image, return_tensors="pt").to(torch_device)
outputs = model(**inputs)
logits = outputs.logits
# model predicts one of the 1000 ImageNet classes
predicted_class_idx = logits.argmax(-1).item()
self.assertEqual(model.config.id2label[predicted_class_idx], "tabby, tabby cat")
# coding=utf-8
# Copyright 2020 The HuggingFace Inc. team, The Microsoft Research team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from transformers import is_torch_available
from transformers.testing_utils import require_torch, slow, torch_device
if is_torch_available():
import torch
from transformers import XLMProphetNetForConditionalGeneration, XLMProphetNetTokenizer
@require_torch
class XLMProphetNetModelIntegrationTest(unittest.TestCase):
@slow
def test_pretrained_checkpoint_hidden_states(self):
model = XLMProphetNetForConditionalGeneration.from_pretrained("microsoft/xprophetnet-large-wiki100-cased")
model.to(torch_device)
# encoder-decoder outputs
encoder_ids = torch.tensor([[17, 96208, 103471, 2]]).to(torch_device)
decoder_prev_ids = torch.tensor(
[[2, 250, 9953, 34, 69489, 1620, 32, 118424, 624, 210, 105, 2913, 1032, 351]]
).to(torch_device)
output = model(
input_ids=encoder_ids, attention_mask=None, encoder_outputs=None, decoder_input_ids=decoder_prev_ids
)
output_predicted_logits = output[0]
expected_shape = torch.Size((1, 14, 250012))
self.assertEqual(output_predicted_logits.shape, expected_shape)
expected_slice = torch.tensor(
[[[-6.3986, -8.2391, 12.5189], [-6.3289, -8.0864, 12.6211], [-6.2418, -8.0445, 12.7968]]]
).to(torch_device)
self.assertTrue(torch.allclose(output_predicted_logits[:, :3, :3], expected_slice, atol=1e-4))
# encoder outputs
encoder_outputs = model.prophetnet.encoder(encoder_ids)[0]
expected_encoder_outputs_slice = torch.tensor(
[[[-1.4260, -0.7628, 0.8453], [-1.4719, -0.1391, 0.7807], [-1.7678, 0.0114, 0.4646]]]
).to(torch_device)
expected_shape_encoder = torch.Size((1, 4, 1024))
self.assertEqual(encoder_outputs.shape, expected_shape_encoder)
self.assertTrue(torch.allclose(encoder_outputs[:, :3, :3], expected_encoder_outputs_slice, atol=1e-4))
# decoder outputs
decoder_outputs = model.prophetnet.decoder(
decoder_prev_ids,
encoder_hidden_states=encoder_outputs,
)
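# ProphetNet predicts several future tokens at once: split the raw decoder output into its n-gram prediction streams and check that the first (next-token) stream matches the full-model logits verified above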
predicting_streams = decoder_outputs[1].view(1, model.config.ngram, 14, -1)
predicting_streams_logits = model.lm_head(predicting_streams)
next_first_stream_logits = predicting_streams_logits[:, 0]
self.assertTrue(torch.allclose(next_first_stream_logits[:, :3, :3], expected_slice, atol=1e-4))
@slow
def test_ntg_hidden_states(self):
model = XLMProphetNetForConditionalGeneration.from_pretrained(
"microsoft/xprophetnet-large-wiki100-cased-xglue-ntg"
)
model.to(torch_device)
encoder_ids = torch.tensor([[17, 96208, 103471, 2]]).to(torch_device)
decoder_prev_ids = torch.tensor(
[[2, 250, 9953, 34, 69489, 1620, 32, 118424, 624, 210, 105, 2913, 1032, 351]]
).to(torch_device)
output = model(
input_ids=encoder_ids, attention_mask=None, encoder_outputs=None, decoder_input_ids=decoder_prev_ids
)
output_predicted_logits = output[0]
expected_shape = torch.Size((1, 14, 250012))
self.assertEqual(output_predicted_logits.shape, expected_shape)
# compare the actual values for a slice.
expected_slice = torch.tensor(
[[[-9.2253, -9.7173, -6.3529], [-7.6701, -9.0145, -1.9382], [-8.0195, -7.0004, -0.1523]]]
).to(torch_device)
self.assertTrue(torch.allclose(output_predicted_logits[:, :3, :3], expected_slice, atol=1e-4))
@slow
def test_xprophetnet_ntg_inference(self):
model = XLMProphetNetForConditionalGeneration.from_pretrained(
"microsoft/xprophetnet-large-wiki100-cased-xglue-ntg"
)
model.to(torch_device)
model.config.max_length = 512
tokenizer = XLMProphetNetTokenizer.from_pretrained("microsoft/xprophetnet-large-wiki100-cased-xglue-ntg")
EN_SENTENCE = (
"Microsoft Corporation intends to officially end free support for the Windows 7 operating system after"
" January 14, 2020, according to the official portal of the organization. From that day, users of this"
" system will not be able to receive security updates, which could make their computers vulnerable to"
" cyber attacks."
)
RU_SENTENCE = (
"орпорация Microsoft намерена официально прекратить бесплатную поддержку операционной системы Windows 7"
" после 14 января 2020 года, сообщается на официальном портале организации . С указанного дня пользователи"
" этой системы не смогут получать обновления безопасности, из-за чего их компьютеры могут стать уязвимыми"
" к кибератакам."
)
ZH_SENTENCE = "根据该组织的官方门户网站,微软公司打算在2020年1月14日之后正式终止对Windows 7操作系统的免费支持。从那时起,该系统的用户将无法接收安全更新,这可能会使他们的计算机容易受到网络攻击。"
input_ids = tokenizer(
[EN_SENTENCE, RU_SENTENCE, ZH_SENTENCE], padding=True, max_length=255, return_tensors="pt"
).input_ids
input_ids = input_ids.to(torch_device)
summary_ids = model.generate(
input_ids, num_beams=10, length_penalty=1.0, no_repeat_ngram_size=3, early_stopping=True
)
generated_titles = [tokenizer.decode(g, skip_special_tokens=True) for g in summary_ids]
EXPECTED_TITLE_EN = "Microsoft to end Windows 7 free support after January 14, 2020"
EXPECTED_TITLE_RU = "Microsoft намерена прекратить бесплатную поддержку Windows 7 после 14 января 2020 года"
EXPECTED_TITLE_ZH = "微软打算终止对Windows 7操作系统的免费支持"
self.assertListEqual(
[EXPECTED_TITLE_EN, EXPECTED_TITLE_RU, EXPECTED_TITLE_ZH],
generated_titles,
)
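# a second pass with num_beams=1 (i.e. greedy search) is compared at the token level below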
summary_ids_beam1 = model.generate(
input_ids, num_beams=1, length_penalty=1.0, no_repeat_ngram_size=3, early_stopping=True
)
generated_titles_beam1_tok = [
tokenizer.convert_ids_to_tokens(g, skip_special_tokens=True) for g in summary_ids_beam1
]
EXPECTED_TITLE_EN_BEAM1_TOK = "▁Microsoft ▁to ▁end ▁free ▁support ▁for ▁Windows ▁7".split(" ")
EXPECTED_TITLE_RU_BEAM1_TOK = "▁Microsoft ▁намерен а ▁прекрати ть ▁бес плат ную ▁поддержку ▁Windows ▁7 ▁после ▁14 ▁января ▁2020 ▁года".split(
" "
)
EXPECTED_TITLE_ZH_BEAM1_TOK = "微软 公司 打算 终止 对 Windows ▁7 操作 系统的 免费 支持".split(" ")
self.assertListEqual(
[EXPECTED_TITLE_EN_BEAM1_TOK, EXPECTED_TITLE_RU_BEAM1_TOK, EXPECTED_TITLE_ZH_BEAM1_TOK],
generated_titles_beam1_tok,
)
# coding=utf-8
# Copyright 2020 The HuggingFace Inc. team, The Microsoft Research team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from transformers.models.xlm_prophetnet.tokenization_xlm_prophetnet import SPIECE_UNDERLINE, XLMProphetNetTokenizer
from transformers.testing_utils import get_tests_dir, require_sentencepiece, slow
from transformers.utils import cached_property
from ...test_tokenization_common import TokenizerTesterMixin
SAMPLE_VOCAB = get_tests_dir("fixtures/test_sentencepiece.model")
@require_sentencepiece
class XLMProphetNetTokenizationTest(TokenizerTesterMixin, unittest.TestCase):
from_pretrained_id = "microsoft/xprophetnet-large-wiki100-cased"
tokenizer_class = XLMProphetNetTokenizer
test_rust_tokenizer = False
test_sentencepiece = True
def setUp(self):
super().setUp()
# We have a SentencePiece fixture for testing
tokenizer = XLMProphetNetTokenizer(SAMPLE_VOCAB, keep_accents=True)
tokenizer.save_pretrained(self.tmpdirname)
def test_convert_token_and_id(self):
"""Test ``_convert_token_to_id`` and ``_convert_id_to_token``."""
token = "[PAD]"
token_id = 0
self.assertEqual(self.get_tokenizer()._convert_token_to_id(token), token_id)
self.assertEqual(self.get_tokenizer()._convert_id_to_token(token_id), token)
def test_get_vocab(self):
vocab_keys = list(self.get_tokenizer().get_vocab().keys())
self.assertEqual(vocab_keys[0], "[PAD]")
self.assertEqual(vocab_keys[1], "[CLS]")
self.assertEqual(vocab_keys[-1], "j")
self.assertEqual(len(vocab_keys), 1_012)
def test_vocab_size(self):
self.assertEqual(self.get_tokenizer().vocab_size, 1_012)
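# 1_012 is presumably the SentencePiece fixture vocabulary plus the tokenizer's fairseq-style special tokens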
def test_full_tokenizer(self):
tokenizer = XLMProphetNetTokenizer(SAMPLE_VOCAB, keep_accents=True)
tokens = tokenizer.tokenize("This is a test")
self.assertListEqual(tokens, ["▁This", "▁is", "▁a", "▁t", "est"])
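# raw SentencePiece ids are shifted by the tokenizer's fairseq_offset, which reserves the lowest ids for the special tokens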
self.assertListEqual(
tokenizer.convert_tokens_to_ids(tokens),
[value + tokenizer.fairseq_offset for value in [285, 46, 10, 170, 382]],
)
tokens = tokenizer.tokenize("I was born in 92000, and this is falsé.")
self.assertListEqual(
tokens,
[
SPIECE_UNDERLINE + "I",
SPIECE_UNDERLINE + "was",
SPIECE_UNDERLINE + "b",
"or",
"n",
SPIECE_UNDERLINE + "in",
SPIECE_UNDERLINE + "",
"9",
"2",
"0",
"0",
"0",
",",
SPIECE_UNDERLINE + "and",
SPIECE_UNDERLINE + "this",
SPIECE_UNDERLINE + "is",
SPIECE_UNDERLINE + "f",
"al",
"s",
"é",
".",
],
)
ids = tokenizer.convert_tokens_to_ids(tokens)
self.assertListEqual(
ids,
[
value + tokenizer.fairseq_offset
for value in [8, 21, 84, 55, 24, 19, 7, -9, 602, 347, 347, 347, 3, 12, 66, 46, 72, 80, 6, -9, 4]
],
)
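# round-tripping the ids: pieces outside the small fixture vocab ("9" and "é") come back as "[UNK]"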
back_tokens = tokenizer.convert_ids_to_tokens(ids)
self.assertListEqual(
back_tokens,
[
SPIECE_UNDERLINE + "I",
SPIECE_UNDERLINE + "was",
SPIECE_UNDERLINE + "b",
"or",
"n",
SPIECE_UNDERLINE + "in",
SPIECE_UNDERLINE + "",
"[UNK]",
"2",
"0",
"0",
"0",
",",
SPIECE_UNDERLINE + "and",
SPIECE_UNDERLINE + "this",
SPIECE_UNDERLINE + "is",
SPIECE_UNDERLINE + "f",
"al",
"s",
"[UNK]",
".",
],
)
@cached_property
def big_tokenizer(self):
return XLMProphetNetTokenizer.from_pretrained("microsoft/xprophetnet-large-wiki100-cased")
@slow
def test_tokenization_base_easy_symbols(self):
symbols = "Hello World!"
original_tokenizer_encodings = [35389, 6672, 49, 2]
self.assertListEqual(original_tokenizer_encodings, self.big_tokenizer.encode(symbols))
@slow
def test_tokenizer_integration(self):
expected_encoding = {'input_ids': [[11073, 82783, 18, 26, 82783, 549, 51540, 248, 17209, 1301, 217, 20, 215186, 1325, 147, 17209, 1301, 217, 20, 56370, 53, 122020, 20, 16477, 27, 87355, 4548, 20, 4728, 78392, 17, 159969, 18, 26, 24491, 629, 15, 538, 22704, 5439, 15, 2788, 24491, 9885, 15, 43534, 605, 15, 814, 18403, 33200, 29, 15, 43534, 24458, 12410, 111, 24966, 83669, 9637, 144068, 26, 850, 22346, 27, 147, 24966, 83669, 83490, 26, 39113, 735, 27, 689, 656, 2800, 1339, 4600, 53, 122020, 115785, 34, 816, 1339, 46887, 18, 147, 53905, 1951, 42238, 41170, 17732, 834, 436, 15, 27523, 98733, 217, 147, 5542, 4981, 930, 17347, 16, 2], [20091, 629, 94, 82786, 58, 490, 20, 1528, 84, 53905, 344, 80592, 110128, 18822, 5267, 1306, 62, 152537, 308, 7997, 401, 124427, 549, 35442, 225, 109, 15055, 25748, 147, 7119, 43712, 34, 767, 135366, 18, 16, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [592, 63784, 119466, 17, 147808, 88214, 18, 656, 81, 32, 3296, 10280, 16, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], 'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]} # fmt: skip
self.tokenizer_integration_test_util(
expected_encoding=expected_encoding,
model_name="microsoft/xprophetnet-large-wiki100-cased",
revision="1acad1643ddd54a44df6a1b797ada8373685d90e",
)
@@ -61,8 +61,6 @@ SPECIAL_CASES_TO_ALLOW = {
# `ignore_value` used during training (despite we don't have training script for these models yet)
# `norm` used in conversion script (despite not using in the modeling file)
"OneFormerConfig": ["ignore_value", "norm"],
-# used during preprocessing and collation, see `collating_graphormer.py`
-"GraphormerConfig": ["spatial_pos_max"],
# used internally in the configuration class file
"T5Config": ["feed_forward_proj"],
# used internally in the configuration class file
@@ -134,20 +132,16 @@ SPECIAL_CASES_TO_ALLOW.update(
{
"CLIPSegConfig": True,
"DeformableDetrConfig": True,
-"DetaConfig": True,
"DinatConfig": True,
"DonutSwinConfig": True,
-"EfficientFormerConfig": True,
"FastSpeech2ConformerConfig": True,
"FSMTConfig": True,
-"JukeboxConfig": True,
"LayoutLMv2Config": True,
"MaskFormerSwinConfig": True,
"MT5Config": True,
# For backward compatibility with trust remote code models
"MptConfig": True,
"MptAttentionConfig": True,
-"NatConfig": True,
"OneFormerConfig": True,
"PerceiverConfig": True,
"RagConfig": True,