Unverified Commit 691fd8fd authored by Yih-Dar, committed by GitHub

Add `Kosmos-2` model (#24709)



* Add KOSMOS-2 model

* update

* update

* update

* address review comment - 001

* address review comment - 002

* address review comment - 003

* style

* Apply suggestions from code review
Co-authored-by: amyeroberts <22614925+amyeroberts@users.noreply.github.com>

* fix

* address review comment - 004

* address review comment - 005

* address review comment - 006

* address review comment - 007

* address review comment - 008

* address review comment - 009

* address review comment - 010

* address review comment - 011

* update readme

* fix

* fix

* fix

* [skip ci] fix

* revert the change in _decode

* fix docstring

* fix docstring

* Update docs/source/en/model_doc/kosmos-2.md
Co-authored-by: NielsRogge <48327001+NielsRogge@users.noreply.github.com>

* no more Kosmos2Tokenizer

* style

* remove "returned when being computed by the model"

* Apply suggestions from code review
Co-authored-by: Arthur <48595927+ArthurZucker@users.noreply.github.com>

* UTM5 Atten

* fix attn mask

* use present_key_value_states instead of next_decoder_cache

* style

* conversion scripts

* conversion scripts

* conversion scripts

* Add _reorder_cache

* fix doctest and copies

* rename 1

* rename 2

* rename 3

* make fixup

* fix table

* fix docstring

* rename 4

* change repo_id

* remove tip

* update md file

* make style

* update md file

* put docs/source/en/model_doc/kosmos-2.md to slow

* update conversion script

* Use CLIPImageProcessor in Kosmos2Processor

* Remove Kosmos2ImageProcessor

* Remove to_dict in Kosmos2Config

* Remove files

* fix import

* Update conversion

* normalized=False

* Not using hardcoded values like <image>

* elt --> element

* Apply suggestion

* Not using hardcoded values like </image>

* No assert

* No nested functions

* Fix md file

* copy

* update doc

* fix docstring

* fix name

* Remove _add_remove_spaces_around_tag_tokens

* Remove dummy docstring of _preprocess_single_example

* Use `BatchEncoding`

* temp

* temp

* temp

* Update

* Update

* Make Kosmos2ProcessorTest a bit pretty

* Update gradient checkpointing

* Fix gradient checkpointing test

* Remove one liner remove_special_fields

* Simplify conversion script

* fix add_eos_token

* update readme

* update tests

* Change to microsoft/kosmos-2-patch14-224

* style

* Fix doc

---------
Co-authored-by: ydshieh <ydshieh@users.noreply.github.com>
Co-authored-by: amyeroberts <22614925+amyeroberts@users.noreply.github.com>
Co-authored-by: NielsRogge <48327001+NielsRogge@users.noreply.github.com>
Co-authored-by: Arthur <48595927+ArthurZucker@users.noreply.github.com>
parent d751dbec
# coding=utf-8
# Copyright 2023 Microsoft Research and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Processor class for KOSMOS-2."""
import copy
import math
import re
from typing import List, Optional, Tuple, Union
from ...image_processing_utils import BatchFeature
from ...image_utils import ImageInput, is_batched
from ...processing_utils import ProcessorMixin
from ...tokenization_utils import AddedToken
from ...tokenization_utils_base import BatchEncoding, PaddingStrategy, TextInput, TruncationStrategy
from ...utils import TensorType
BboxInput = Union[
List[Tuple[int, int]],
List[Tuple[float, float, float, float]],
List[List[Tuple[int, int]]],
List[List[Tuple[float, float, float, float]]],
]
class Kosmos2Processor(ProcessorMixin):
r"""
Constructs a KOSMOS-2 processor which wraps a KOSMOS-2 image processor and a KOSMOS-2 tokenizer into a single
processor.
[`Kosmos2Processor`] offers all the functionalities of [`CLIPImageProcessor`] and some functionalities of
[`XLMRobertaTokenizerFast`]. See the docstring of [`~Kosmos2Processor.__call__`] and [`~Kosmos2Processor.decode`]
for more information.
Args:
image_processor (`CLIPImageProcessor`):
An instance of [`CLIPImageProcessor`]. The image processor is a required input.
tokenizer (`XLMRobertaTokenizerFast`):
An instance of [`XLMRobertaTokenizerFast`]. The tokenizer is a required input.
num_patch_index_tokens (`int`, *optional*, defaults to 1024):
The number of tokens that represent patch indices.
"""
attributes = ["image_processor", "tokenizer"]
image_processor_class = "CLIPImageProcessor"
tokenizer_class = ("XLMRobertaTokenizer", "XLMRobertaTokenizerFast")
def __init__(self, image_processor, tokenizer, num_patch_index_tokens=1024):
tokenizer.return_token_type_ids = False
self.eod_token = "</doc>"
self.boi_token = "<image>"
self.eoi_token = "</image>"
self.eoc_token = "</chunk>"
self.eol_token = "</line>"
self.bop_token = "<phrase>"
self.eop_token = "</phrase>"
self.boo_token = "<object>"
self.eoo_token = "</object>"
self.dom_token = "</delimiter_of_multi_objects/>"
self.grd_token = "<grounding>"
self.tag_tokens = [
self.eod_token,
self.boi_token,
self.eoi_token,
self.eoc_token,
self.eol_token,
self.bop_token,
self.eop_token,
self.boo_token,
self.eoo_token,
self.dom_token,
self.grd_token,
]
self.num_patch_index_tokens = num_patch_index_tokens
patch_index_tokens = [f"<patch_index_{str(x).zfill(4)}>" for x in range(self.num_patch_index_tokens)]
tokens_to_add = []
for token in self.tag_tokens + patch_index_tokens:
tokens_to_add.append(AddedToken(token, lstrip=True, rstrip=False, normalized=False))
tokenizer.add_tokens(tokens_to_add)
super().__init__(image_processor, tokenizer)
def __call__(
self,
images: ImageInput = None,
text: Union[TextInput, List[TextInput]] = None,
bboxes: BboxInput = None,
num_image_tokens: Optional[int] = 64,
first_image_token_id: Optional[int] = None,
add_special_tokens: bool = True,
add_eos_token: bool = False,
padding: Union[bool, str, PaddingStrategy] = False,
truncation: Union[bool, str, TruncationStrategy] = None,
max_length: Optional[int] = None,
pad_to_multiple_of: Optional[int] = None,
return_attention_mask: Optional[bool] = None,
return_length: bool = False,
verbose: bool = True,
return_tensors: Optional[Union[str, TensorType]] = None,
**kwargs,
) -> BatchFeature:
"""
This method uses [`CLIPImageProcessor.__call__`] to prepare image(s) for the model, and
[`XLMRobertaTokenizerFast.__call__`] to prepare text for the model.
Please refer to the docstring of the above two methods for more information.
The rest of this documentation shows the arguments specific to `Kosmos2Processor`.
Args:
bboxes (`Union[List[Tuple[int]], List[Tuple[float]], List[List[Tuple[int]]], List[List[Tuple[float]]]]`, *optional*):
The bounding boxes associated with `texts`.
num_image_tokens (`int`, defaults to 64):
The number of (consecutive) placeholder positions used to store image information.
This should be the same as `latent_query_num` in the instance of `Kosmos2Config` you are using.
first_image_token_id (`int`, *optional*):
The token id used for the first position of the subsequence reserved to store image
information. If unset, defaults to `self.tokenizer.unk_token_id + 1`.
add_eos_token (`bool`, defaults to `False`):
Whether or not to include the `EOS` token id in the encoding when `add_special_tokens=True`.
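Example:

A minimal usage sketch (assumes the `microsoft/kosmos-2-patch14-224` checkpoint is available):

```python
>>> import requests
>>> from PIL import Image
>>> from transformers import AutoProcessor

>>> processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224")
>>> url = "https://huggingface.co/microsoft/kosmos-2-patch14-224/resolve/main/snowman.png"
>>> image = Image.open(requests.get(url, stream=True).raw)
>>> inputs = processor(text="<grounding>An image of", images=image, return_tensors="pt")
>>> list(inputs.keys())
['pixel_values', 'input_ids', 'attention_mask', 'image_embeds_position_mask']
```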
"""
if images is None and text is None:
raise ValueError("You have to specify either images or text.")
encoding = BatchFeature()
if images is not None:
image_encoding = self.image_processor(images, return_tensors=return_tensors)
encoding.update(image_encoding)
if text is not None:
text = self.preprocess_examples(text, images, bboxes, num_image_tokens=num_image_tokens)
if add_special_tokens and not add_eos_token:
if isinstance(text, str):
text = f"{self.tokenizer.bos_token}{text}"
elif isinstance(text, list):
text = [f"{self.tokenizer.bos_token}{s}" for s in text]
text_encoding = self.tokenizer(
text=text,
add_special_tokens=(add_special_tokens and add_eos_token),
padding=padding and images is None,
truncation=truncation,
max_length=max_length,
pad_to_multiple_of=pad_to_multiple_of if images is None else pad_to_multiple_of,
return_attention_mask=return_attention_mask,
verbose=verbose,
return_tensors=return_tensors if images is None else None,
**kwargs,
)
encoding.update(text_encoding)
if text is not None and images is not None:
# Use the id of the first token after <unk>
if first_image_token_id is None:
first_image_token_id = self.tokenizer.unk_token_id + 1
# Check whether we need one more `0` (for `<s>`) at the beginning of `image_embeds_position_mask`.
with_bos = add_special_tokens
# The first (actual) `<image>` token is always at the 1st or 2nd place (after `<s>` if any). Here we look
# for the second `<image>` token (which indicates the first image token).
start_index = int(with_bos) + 1
# Add `image_embeds_position_mask`: the leading and trailing `0` are for the `boi` and `eoi` tokens, and the
# `1` values mark the positions of the image tokens.
image_token_ids = list(range(first_image_token_id, first_image_token_id + num_image_tokens))
base_image_embeds_position_mask = [0] + [1] * num_image_tokens + [0]
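# For example (illustrative), with `num_image_tokens=2` and `add_special_tokens=True`, an encoding like
# `[<s>, <image>, <image_token>, <image_token>, </image>, ...text tokens...]` gets the position mask
# `[0, 0, 1, 1, 0, 0, ...]`.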
# loop over `encoding["input_ids"]`
input_ids = []
image_embeds_position_mask = []
all_input_ids = encoding["input_ids"]
# not batched -> convert to a batch of size 1
if isinstance(text, str):
all_input_ids = [all_input_ids]
encoding["attention_mask"] = [encoding["attention_mask"]]
for text_ids in all_input_ids:
# change the ids for the fake `<image>` tokens in `input_ids`
text_ids = text_ids[:start_index] + image_token_ids + text_ids[start_index + num_image_tokens :]
input_ids.append(text_ids)
mask = copy.copy(base_image_embeds_position_mask)
if with_bos:
# for `<s>`
mask = [0] + mask
# trailing part (which is not related to the image)
mask += [0] * (len(text_ids) - len(mask))
image_embeds_position_mask.append(mask)
if isinstance(text, list):
sorted_length = sorted([(idx, len(x)) for idx, x in enumerate(text_encoding.input_ids)])
_, min_len_not_padded = sorted_length[0]
idx, _ = sorted_length[-1]
text_encoding = self.tokenizer(
text=[text[idx]],
add_special_tokens=(add_special_tokens and add_eos_token),
padding=padding,
truncation=truncation,
max_length=max_length,
pad_to_multiple_of=pad_to_multiple_of,
verbose=verbose,
return_tensors=None,
**kwargs,
)
max_len_padded = len(text_encoding.input_ids[0])
if min_len_not_padded != max_len_padded:
if self.tokenizer.padding_side == "right":
input_ids = [x + [self.tokenizer.pad_token_id] * (max_len_padded - len(x)) for x in input_ids]
image_embeds_position_mask = [
x + [0] * (max_len_padded - len(x)) for x in image_embeds_position_mask
]
encoding["attention_mask"] = [
x + [0] * (max_len_padded - len(x)) for x in encoding["attention_mask"]
]
elif self.tokenizer.padding_side == "left":
input_ids = [[self.tokenizer.pad_token_id] * (max_len_padded - len(x)) + x for x in input_ids]
image_embeds_position_mask = [
[0] * (max_len_padded - len(x)) + x for x in image_embeds_position_mask
]
encoding["attention_mask"] = [
[0] * (max_len_padded - len(x)) + x for x in encoding["attention_mask"]
]
# un-batch if necessary
if isinstance(text, str) and return_tensors is None:
input_ids = input_ids[0]
encoding["attention_mask"] = encoding["attention_mask"][0]
image_embeds_position_mask = image_embeds_position_mask[0]
# update (with the target tensor type if specified)
encoding.update(
BatchEncoding(
data={
"input_ids": input_ids,
"attention_mask": encoding["attention_mask"],
"image_embeds_position_mask": image_embeds_position_mask,
},
tensor_type=return_tensors,
)
)
return encoding
def _check_bboxes_for_single_text(self, bboxes):
"""
Check `bboxes` for a single text example. It could be
- `None`: no bounding box associated with the text.
- A list with each element being the bounding boxes associated with one `<phrase> ... </phrase>` pair found
in the text. Each element could be:
- `None`: no bounding box associated with a `<phrase> ... </phrase>` pair.
- A tuple of 2 integers: a single bounding box specified by patch indices.
- A tuple of 4 floating point numbers: a single bounding box specified by (normalized) coordinates.
- A list containing the above 2 tuple types: multiple bounding boxes for a
`<phrase> ... </phrase>` pair.
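For example (illustrative values), a valid `bboxes` for a text containing three
`<phrase> ... </phrase>` pairs could look like:

```python
bboxes = [
    None,                                  # 1st phrase: no bounding box
    (44, 863),                             # 2nd phrase: a single box given as patch indices
    [(0.25, 0.25, 0.75, 0.75), (5, 911)],  # 3rd phrase: multiple boxes
]
```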
"""
if bboxes is None:
return
elif not isinstance(bboxes, list):
raise ValueError("`bboxes` (for a single text example) should be `None` or a list.")
# `bbox` is the bounding boxes for a single <phrase> </phrase> pair
for bbox in bboxes:
if bbox is None:
continue
elif not isinstance(bbox, list):
bbox = [bbox]
for element in bbox:
if not isinstance(element, tuple) or not (
(len(element) == 2 and all(isinstance(x, int) for x in element))
or (len(element) == 4 and all(isinstance(x, float) for x in element))
):
raise ValueError(
"Each element in `bboxes` (for a single text example) should be either `None`, a tuple containing "
"2 integers or 4 float point numbers, or a list containing such tuples. Also "
"make sure the arguments `texts` and `bboxes` passed to `preprocess_text` are both in "
"batches or both for a single example."
)
def _preprocess_single_example(self, text, image, bboxes, img_info_tokens):
text = text.strip()
if image is not None:
# Add `<image> ... (fake) image tokens ... </image>`
text = f"{img_info_tokens} {text}"
# Add `<object> <patch_index_xxxx> <patch_index_yyyy> </object>` after `<phrase> phrase text </phrase>`
text = self._insert_patch_index_tokens(text, bboxes)
return text
def preprocess_examples(
self,
texts: Union[TextInput, List[TextInput]],
images: ImageInput = None,
bboxes: BboxInput = None,
num_image_tokens: Optional[int] = 64,
) -> Union[str, List[str]]:
"""Add image and bounding box information to `texts` as image and patch index tokens.
Args:
texts (`Union[TextInput, List[TextInput]]`): The texts to be processed.
images (`ImageInput`, *optional*): The images associated to `texts`.
bboxes (`Union[List[Tuple[int]], List[Tuple[float]], List[List[Tuple[int]]], List[List[Tuple[float]]]]`, *optional*):
The bounding boxes associated with `texts`.
num_image_tokens (`int`, *optional*, defaults to 64):
The number of image tokens (used as latent queries). This should correspond to the `latent_query_num`
attribute in `Kosmos2Config`.
Returns:
`Union[TextInput, List[TextInput]]`: The processed texts with image and patch index tokens.
"""
# These are fake `<image>` tokens enclosed between (the actual) `<image>` token and `</image>`.
img_tokens = [self.boi_token] * num_image_tokens
img_info_tokens = " ".join([self.boi_token] + img_tokens + [self.eoi_token])
# make batch to simplify processing logic
batched = True
if isinstance(texts, str):
batched = False
texts = [texts]
if images is None:
images = [None] * len(texts)
elif not is_batched(images):
images = [images]
if len(texts) != len(images):
raise ValueError(
f"The number of examples in `texts` and `images` should be the same. Got {len(texts)} v.s. {len(images)} instead."
)
if not batched:
self._check_bboxes_for_single_text(bboxes)
bboxes = [bboxes]
elif bboxes is not None:
if not isinstance(bboxes, list):
raise ValueError("`bboxes` should be `None` or a list (as a batch) when `texts` is passed as a batch.")
for x in bboxes:
self._check_bboxes_for_single_text(x)
else:
bboxes = [None] * len(texts)
if len(bboxes) != len(texts):
raise ValueError(
f"The number of examples in `texts` and `bboxes` should be the same. Got {len(texts)} v.s. {len(bboxes)} instead."
)
result = [
self._preprocess_single_example(text, image, bbox, img_info_tokens)
for text, image, bbox in zip(texts, images, bboxes)
]
# un-batch if necessary
if not batched:
result = result[0]
return result
# Copied from transformers.models.blip.processing_blip.BlipProcessor.batch_decode with BertTokenizerFast->PreTrainedTokenizer
def batch_decode(self, *args, **kwargs):
"""
This method forwards all its arguments to PreTrainedTokenizer's [`~PreTrainedTokenizer.batch_decode`]. Please
refer to the docstring of this method for more information.
"""
return self.tokenizer.batch_decode(*args, **kwargs)
# Copied from transformers.models.blip.processing_blip.BlipProcessor.decode with BertTokenizerFast->PreTrainedTokenizer
def decode(self, *args, **kwargs):
"""
This method forwards all its arguments to PreTrainedTokenizer's [`~PreTrainedTokenizer.decode`]. Please refer
to the docstring of this method for more information.
"""
return self.tokenizer.decode(*args, **kwargs)
def post_process_generation(self, text, cleanup_and_extract=True):
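"""Post-process generated text: keep only the part of `text` after the last `</image>` token and, if
`cleanup_and_extract=True`, remove the remaining tag tokens and extract the grounded entities together
with their bounding boxes (see `clean_text_and_extract_entities_with_bboxes`).
"""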
caption = text.split(self.eoi_token)[-1]
if cleanup_and_extract:
return clean_text_and_extract_entities_with_bboxes(caption)
return caption
@property
# Copied from transformers.models.blip.processing_blip.BlipProcessor.model_input_names
def model_input_names(self):
tokenizer_input_names = self.tokenizer.model_input_names
image_processor_input_names = self.image_processor.model_input_names
return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))
def _insert_patch_index_tokens(self, text: str, bboxes: Union[List[Tuple[int]], List[Tuple[float]]]) -> str:
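"""Insert `<object> ... </object>` blocks containing patch index tokens after each
`<phrase> ... </phrase>` pair in `text`, using the corresponding entry of `bboxes`.

For example (illustrative), `text="<phrase> a snowman</phrase>"` with `bboxes=[(44, 863)]` gives
`"<phrase> a snowman</phrase><object> <patch_index_0044> <patch_index_0863> </object>"`.
"""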
if bboxes is None or len(bboxes) == 0:
return text
matched_phrases = list(re.finditer(r"<phrase>.+?</phrase>", string=text))
if len(matched_phrases) != len(bboxes):
raise ValueError(
f"The number of elements in `bboxes` should be the same as the number of `<phrase> ... </phrase>` pairs in `text`. Got {len(matched_phrases)} v.s. {len(bboxes)} instead."
)
# insert the object's patch index tokens right after the corresponding `<phrase> ... </phrase>` pairs
curr_pos = 0
buffer = []
for matched, bbox in zip(matched_phrases, bboxes):
_, end = matched.span()
buffer.append(text[curr_pos:end])
curr_pos = end
# A phrase without bbox
if bbox is None:
continue
# A phrase with a single bbox
if isinstance(bbox, tuple):
bbox = [bbox]
patch_index_strings = []
# A phrase could have multiple bboxes
if not all(box is not None for box in bbox):
raise ValueError(
"The multiple bounding boxes for a single phrase should not contain any `None` value."
)
for box in bbox:
patch_index_1, patch_index_2 = self._convert_bbox_to_patch_index_tokens(box)
patch_index_strings.append(f"{patch_index_1} {patch_index_2}")
# `bbox` being an empty list
if len(patch_index_strings) == 0:
continue
position_str = " </delimiter_of_multi_objects/> ".join(patch_index_strings)
buffer.append(f"<object> {position_str} </object>")
# remaining
if curr_pos < len(text):
buffer.append(text[curr_pos:])
text = "".join(buffer)
return text
def _convert_bbox_to_patch_index_tokens(
self, bbox: Union[Tuple[int, int], Tuple[float, float, float, float]]
) -> Tuple[str, str]:
# already computed patch indices
if len(bbox) == 2:
idx_1, idx_2 = bbox
# bbox specified with (normalized) coordinates
else:
# derive `num_patches_per_side` from the number of patch index tokens
num_patches_per_side = int(math.sqrt(self.num_patch_index_tokens))
idx_1, idx_2 = coordinate_to_patch_index(bbox, num_patches_per_side)
token_1 = f"<patch_index_{str(idx_1).zfill(4)}>"
token_2 = f"<patch_index_{str(idx_2).zfill(4)}>"
return token_1, token_2
def coordinate_to_patch_index(bbox: Tuple[float, float, float, float], num_patches_per_side: int) -> Tuple[int, int]:
"""Convert a bounding box to a pair of patch indices.
Args:
bbox (`Tuple[float, float, float, float]`):
The 4 coordinates of the bounding box, with the format being (x1, y1, x2, y2) specifying the upper-left and
lower-right corners of the box. It should have x2 > x1 and y2 > y1.
num_patches_per_side (`int`): the number of patches along each side.
Returns:
`Tuple[int, int]`: A pair of patch indices representing the upper-left patch and lower-right patch.
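Example:

A small worked example, using the (normalized) snowman bounding box from the examples below on a 32 x 32 patch grid:

```python
>>> coordinate_to_patch_index((0.390625, 0.046875, 0.984375, 0.828125), num_patches_per_side=32)
(44, 863)
```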
"""
(x1, y1, x2, y2) = bbox
if not (x2 > x1 and y2 > y1):
raise ValueError("The coordinates in `bbox` should be `(x1, y1, x2, y2)` with `x2 > x1` and `y2 > y1`.")
ul_x = math.floor(x1 * num_patches_per_side)
ul_y = math.floor(y1 * num_patches_per_side)
lr_x = math.ceil(x2 * num_patches_per_side - 1)
lr_y = math.ceil(y2 * num_patches_per_side - 1)
ul_idx = ul_y * num_patches_per_side + ul_x
lr_idx = lr_y * num_patches_per_side + lr_x
return ul_idx, lr_idx
# copied from https://github.com/microsoft/unilm/blob/97e4923e97d3ee10b57e97013556e3fd0d207a9b/kosmos-2/demo/decode_string.py#L35C1-L75C38
# (with format modifications)
def patch_index_to_coordinate(ul_idx: int, lr_idx: int, num_patches_per_side: int):
"""
Given a grid of length `num_patches_per_side` and the indices of the upper-left and lower-right corners of a
bounding box, returns the normalized coordinates of the bounding box, in the form (x1, y1, x2, y2).
Args:
ul_idx (`int`): the index of the grid cell that corresponds to the upper-left corner of the bounding box.
lr_idx (`int`): the index of the grid cell that corresponds to the lower-right corner of the bounding box.
num_patches_per_side (`int`): the number of patches along each side.
Returns:
`Tuple[float]`: the normalized coordinates of the bounding box, in the form (x1, y1, x2, y2).
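Example:

The inverse of the `coordinate_to_patch_index` example above:

```python
>>> patch_index_to_coordinate(44, 863, num_patches_per_side=32)
(0.390625, 0.046875, 0.984375, 0.828125)
```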
"""
# Compute the size of each cell in the grid
cell_size = 1.0 / num_patches_per_side
# Compute the x and y indices of the upper-left and lower-right corners of the bounding box
ul_x = ul_idx % num_patches_per_side
ul_y = ul_idx // num_patches_per_side
lr_x = lr_idx % num_patches_per_side
lr_y = lr_idx // num_patches_per_side
# Compute the normalized coordinates of the bounding box
if ul_idx == lr_idx:
x1 = ul_x * cell_size
y1 = ul_y * cell_size
x2 = lr_x * cell_size + cell_size
y2 = lr_y * cell_size + cell_size
elif ul_x == lr_x or ul_y == lr_y:
x1 = ul_x * cell_size
y1 = ul_y * cell_size
x2 = lr_x * cell_size + cell_size
y2 = lr_y * cell_size + cell_size
else:
x1 = ul_x * cell_size + cell_size / 2
y1 = ul_y * cell_size + cell_size / 2
x2 = lr_x * cell_size + cell_size / 2
y2 = lr_y * cell_size + cell_size / 2
return x1, y1, x2, y2
# copied from https://github.com/microsoft/unilm/blob/97e4923e97d3ee10b57e97013556e3fd0d207a9b/kosmos-2/demo/decode_string.py#L4-L33
# (with format modifications)
def extract_entities_with_patch_indices(text):
"""Extract entities contained in `text`. The bounding bboxes is given in the form of patch indices.
This functioin is only intended to be used within `clean_text_and_extract_entities_with_bboxes` where further
processing happens, including converting to normalized coordinates and whitespace character cleaning up.
Examples:
```python
>>> text = "<grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>."
>>> entities = extract_entities_with_patch_indices(text)
>>> entities
[(' a snowman', (31, 41), [(44, 863)]), (' a fire', (130, 137), [(5, 911)])]
```"""
# The regular expression pattern for matching the required formats
pattern = r"(?:(<phrase>([^<]+)</phrase>))?<object>((?:<patch_index_\d+><patch_index_\d+></delimiter_of_multi_objects/>)*<patch_index_\d+><patch_index_\d+>)</object>"
# Find all matches in the given string
matches = re.finditer(pattern, text)
# Initialize an empty list to store the valid patch_index combinations
entities_with_patch_indices = []
for match in matches:
# span of a `phrase` that is between <phrase> and </phrase>
span = match.span(2)
phrase_tag, phrase, match_content = match.groups()
if not phrase_tag:
phrase = None
# We take the starting position of `<object>`
span = (match.span(0)[0], match.span(0)[0])
# Split the match_content by the delimiter to get individual patch_index pairs
patch_index_pairs = match_content.split("</delimiter_of_multi_objects/>")
entity_bboxes = []
for pair in patch_index_pairs:
# Extract the xxxx and yyyy values from the patch_index pair
x = re.search(r"<patch_index_(\d+)>", pair)
y = re.search(r"<patch_index_(\d+)>", pair[1:])
if x and y:
entity_bboxes.append((int(x.group(1)), int(y.group(1))))
if phrase:
entities_with_patch_indices.append((phrase, span, entity_bboxes))
else:
for bbox in entity_bboxes:
# fake entity name
entity = f"<patch_index_{bbox[0]}><patch_index_{bbox[1]}>"
entities_with_patch_indices.append((entity, span, [bbox]))
return entities_with_patch_indices
def adjust_entity_positions(entity, text):
"""Adjust the positions of the entities in `text` to be relative to the text with special fields removed."""
entity_name, (start, end) = entity
# compute the length of the strings with special fields (tag tokens, patch index tokens, etc.) removed
adjusted_start = len(re.sub("<.*?>", "", text[:start]))
adjusted_end = len(re.sub("<.*?>", "", text[:end]))
adjusted_entity = (entity_name, (adjusted_start, adjusted_end))
return adjusted_entity
def _cleanup_spaces(text, entities):
"""Remove the spaces around the text and the entities in it."""
new_text = text.strip()
leading_spaces = len(text) - len(text.lstrip())
new_entities = []
for entity_name, (start, end), bboxes in entities:
entity_name_leading_spaces = len(entity_name) - len(entity_name.lstrip())
entity_name_trailing_spaces = len(entity_name) - len(entity_name.rstrip())
start = start - leading_spaces + entity_name_leading_spaces
end = end - leading_spaces - entity_name_trailing_spaces
entity_name = entity_name.strip()
new_entities.append((entity_name, (start, end), bboxes))
return new_text, new_entities
# copied from https://github.com/microsoft/unilm/blob/97e4923e97d3ee10b57e97013556e3fd0d207a9b/kosmos-2/demo/decode_string.py#L77-L87
# (with format modifications)
def clean_text_and_extract_entities_with_bboxes(text, num_patches_per_side=32):
"""Remove the tag tokens from `text`, extract entities in it with some cleaning up of white characters.
Examples:
```python
>>> text = "<grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>."
>>> clean_text, entities = clean_text_and_extract_entities_with_bboxes(text)
>>> clean_text
'An image of a snowman warming himself by a fire.'
>>> entities
[('a snowman', (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]), ('a fire', (41, 47), [(0.171875, 0.015625, 0.484375, 0.890625)])]
```"""
# remove special fields (tag tokens, patch index tokens, etc.)
processed_text = re.sub("<.*?>", "", text)
entities_with_patch_indices = extract_entities_with_patch_indices(text)
entities = []
for item in entities_with_patch_indices:
entity, bboxes = item[0:2], item[2]
adjusted_entity = adjust_entity_positions(entity, text)
bboxes_in_coords = [patch_index_to_coordinate(bbox[0], bbox[1], num_patches_per_side) for bbox in bboxes]
entities.append(adjusted_entity + (bboxes_in_coords,))
return _cleanup_spaces(processed_text, entities)
@@ -4261,6 +4261,30 @@ class JukeboxVQVAE(metaclass=DummyObject):
requires_backends(self, ["torch"])
KOSMOS2_PRETRAINED_MODEL_ARCHIVE_LIST = None
class Kosmos2ForConditionalGeneration(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class Kosmos2Model(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class Kosmos2PreTrainedModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
LAYOUTLM_PRETRAINED_MODEL_ARCHIVE_LIST = None
# coding=utf-8
# Copyright 2023 Microsoft Research and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Testing suite for the PyTorch KOSMOS-2 model. """
import copy
import inspect
import os
import tempfile
import unittest
import numpy as np
import requests
from transformers import AutoModelForVision2Seq, AutoProcessor, Kosmos2Config
from transformers.models.kosmos2.configuration_kosmos2 import Kosmos2TextConfig, Kosmos2VisionConfig
from transformers.testing_utils import require_torch, require_vision, slow, torch_device
from transformers.utils import is_torch_available, is_vision_available
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import (
ModelTesterMixin,
_config_zero_init,
floats_tensor,
ids_tensor,
random_attention_mask,
)
if is_torch_available():
import torch
from transformers import Kosmos2ForConditionalGeneration, Kosmos2Model
from transformers.models.kosmos2.modeling_kosmos2 import KOSMOS2_PRETRAINED_MODEL_ARCHIVE_LIST
if is_vision_available():
from PIL import Image
class Kosmos2VisionModelTester:
def __init__(
self,
parent,
batch_size=12,
image_size=32,
patch_size=4,
num_channels=3,
is_training=True,
hidden_size=32,
num_hidden_layers=2,
num_attention_heads=4,
intermediate_size=37,
dropout=0.1,
attention_dropout=0.1,
initializer_range=1e-10,
scope=None,
):
self.parent = parent
self.batch_size = batch_size
self.image_size = image_size
self.patch_size = patch_size
self.num_channels = num_channels
self.is_training = is_training
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.dropout = dropout
self.attention_dropout = attention_dropout
self.initializer_range = initializer_range
self.scope = scope
# in ViT, the seq length equals the number of patches + 1 (we add 1 for the [CLS] token)
num_patches = (image_size // patch_size) ** 2
self.seq_length = num_patches + 1
def prepare_config_and_inputs(self):
pixel_values = floats_tensor([self.batch_size, self.num_channels, self.image_size, self.image_size])
config = self.get_config()
return config, pixel_values
def get_config(self):
return Kosmos2VisionConfig(
image_size=self.image_size,
patch_size=self.patch_size,
num_channels=self.num_channels,
hidden_size=self.hidden_size,
num_hidden_layers=self.num_hidden_layers,
num_attention_heads=self.num_attention_heads,
intermediate_size=self.intermediate_size,
dropout=self.dropout,
attention_dropout=self.attention_dropout,
initializer_range=self.initializer_range,
)
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
config, pixel_values = config_and_inputs
inputs_dict = {"pixel_values": pixel_values}
return config, inputs_dict
class Kosmos2TextModelTester:
def __init__(
self,
parent,
batch_size=12,
seq_length=7,
is_training=True,
use_input_mask=True,
use_labels=True,
vocab_size=99,
hidden_size=32,
num_hidden_layers=2,
num_attention_heads=4,
intermediate_size=37,
dropout=0.1,
attention_dropout=0.1,
max_position_embeddings=512,
scope=None,
):
self.parent = parent
self.batch_size = batch_size
self.seq_length = seq_length
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_labels = use_labels
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.dropout = dropout
self.attention_dropout = attention_dropout
self.max_position_embeddings = max_position_embeddings
self.scope = scope
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
input_mask = None
if self.use_input_mask:
input_mask = random_attention_mask([self.batch_size, self.seq_length])
if input_mask is not None:
batch_size, seq_length = input_mask.shape
rnd_start_indices = np.random.randint(1, seq_length - 1, size=(batch_size,))
for batch_idx, start_index in enumerate(rnd_start_indices):
input_mask[batch_idx, :start_index] = 1
input_mask[batch_idx, start_index:] = 0
config = self.get_config()
return config, input_ids, input_mask
def get_config(self):
return Kosmos2TextConfig(
vocab_size=self.vocab_size,
embed_dim=self.hidden_size,
layers=self.num_hidden_layers,
attention_heads=self.num_attention_heads,
ffn_dim=self.intermediate_size,
dropout=self.dropout,
attention_dropout=self.attention_dropout,
max_position_embeddings=self.max_position_embeddings,
)
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
config, input_ids, input_mask = config_and_inputs
inputs_dict = {"input_ids": input_ids, "attention_mask": input_mask}
return config, inputs_dict
class Kosmos2ModelTester:
def __init__(self, parent, text_kwargs=None, vision_kwargs=None, latent_query_num=3, is_training=True):
if text_kwargs is None:
text_kwargs = {}
if vision_kwargs is None:
vision_kwargs = {}
self.parent = parent
self.text_model_tester = Kosmos2TextModelTester(parent, **text_kwargs)
self.vision_model_tester = Kosmos2VisionModelTester(parent, **vision_kwargs)
self.latent_query_num = latent_query_num
self.is_training = is_training
def prepare_config_and_inputs(self):
text_config, input_ids, attention_mask = self.text_model_tester.prepare_config_and_inputs()
vision_config, pixel_values = self.vision_model_tester.prepare_config_and_inputs()
# build `image_embeds_position_mask`
image_embeds_position_mask = torch.zeros_like(input_ids)
image_embeds_position_mask[:, 1 : 1 + self.latent_query_num :] = 1
config = self.get_config()
return config, input_ids, attention_mask, image_embeds_position_mask, pixel_values
def get_config(self):
return Kosmos2Config(
self.text_model_tester.get_config().to_dict(),
self.vision_model_tester.get_config().to_dict(),
latent_query_num=self.latent_query_num,
)
def create_and_check_model(self, config, input_ids, attention_mask, image_embeds_position_mask, pixel_values):
model = Kosmos2Model(config).to(torch_device).eval()
with torch.no_grad():
result = model(pixel_values, input_ids, image_embeds_position_mask, attention_mask)
self.parent.assertEqual(
result.last_hidden_state.shape,
(self.text_model_tester.batch_size, self.text_model_tester.seq_length, self.text_model_tester.hidden_size),
)
self.parent.assertEqual(
result.image_embeds.shape,
(self.text_model_tester.batch_size, self.latent_query_num, self.text_model_tester.hidden_size),
)
def prepare_config_and_inputs_for_common(self):
config_and_inputs = self.prepare_config_and_inputs()
config, input_ids, attention_mask, image_embeds_position_mask, pixel_values = config_and_inputs
inputs_dict = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"image_embeds_position_mask": image_embeds_position_mask,
"pixel_values": pixel_values,
}
return config, inputs_dict
@require_torch
class Kosmos2ModelTest(ModelTesterMixin, unittest.TestCase):
all_model_classes = (Kosmos2Model, Kosmos2ForConditionalGeneration) if is_torch_available() else ()
all_generative_model_classes = (Kosmos2ForConditionalGeneration,) if is_torch_available() else ()
fx_compatible = False
test_head_masking = False
test_pruning = False
test_resize_embeddings = False
test_attention_outputs = False
def _prepare_for_class(self, inputs_dict, model_class, return_labels=False):
inputs_dict = copy.deepcopy(inputs_dict)
if return_labels:
if model_class.__name__ == "Kosmos2ForConditionalGeneration":
inputs_dict["labels"] = torch.zeros(
(self.model_tester.text_model_tester.batch_size, self.model_tester.text_model_tester.seq_length),
dtype=torch.long,
device=torch_device,
)
return inputs_dict
def setUp(self):
self.model_tester = Kosmos2ModelTester(self)
self.config_tester = ConfigTester(self, config_class=Kosmos2Config, hidden_size=37)
# overwrite from common to skip `image_to_text_projection.latent_query`
def test_initialization(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
configs_no_init = _config_zero_init(config)
for model_class in self.all_model_classes:
model = model_class(config=configs_no_init)
for name, param in model.named_parameters():
if param.requires_grad:
if name == "image_to_text_projection.latent_query":
# The original code uses `nn.Parameter(torch.randn(...))`, for which this test won't pass.
continue
self.assertIn(
((param.data.mean() * 1e9).round() / 1e9).item(),
[0.0, 1.0],
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
)
def test_model(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_model(*config_and_inputs)
def test_forward_signature(self):
config, _ = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
signature = inspect.signature(model.forward)
# signature.parameters is an OrderedDict => so arg_names order is deterministic
arg_names = [*signature.parameters.keys()]
expected_arg_names = ["pixel_values"]
self.assertListEqual(arg_names[:1], expected_arg_names)
# overwrite from common in order to use `self.model_tester.text_model_tester.num_hidden_layers`
def test_hidden_states_output(self):
def check_hidden_states_output(inputs_dict, config, model_class):
model = model_class(config)
model.to(torch_device)
model.eval()
with torch.no_grad():
outputs = model(**self._prepare_for_class(inputs_dict, model_class))
hidden_states = outputs.hidden_states
expected_num_layers = getattr(
self.model_tester,
"expected_num_hidden_layers",
self.model_tester.text_model_tester.num_hidden_layers + 1,
)
self.assertEqual(len(hidden_states), expected_num_layers)
seq_length = self.model_tester.text_model_tester.seq_length
self.assertListEqual(
list(hidden_states[0].shape[-2:]),
[seq_length, self.model_tester.text_model_tester.hidden_size],
)
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
inputs_dict["output_hidden_states"] = True
check_hidden_states_output(inputs_dict, config, model_class)
# check that output_hidden_states also works using config
del inputs_dict["output_hidden_states"]
config.output_hidden_states = True
check_hidden_states_output(inputs_dict, config, model_class)
# overwrite from common in order to use `config.text_config.vocab_size` instead of `config.vocab_size`
def test_tie_model_weights(self):
if not self.test_torchscript:
return
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
def check_same_values(layer_1, layer_2):
equal = True
for p1, p2 in zip(layer_1.weight, layer_2.weight):
if p1.data.ne(p2.data).sum() > 0:
equal = False
return equal
for model_class in self.all_model_classes:
config.torchscript = True
model_not_tied = model_class(config)
if model_not_tied.get_output_embeddings() is None:
continue
config_tied = copy.deepcopy(config)
config_tied.torchscript = False
model_tied = model_class(config_tied)
params_tied = list(model_tied.parameters())
# Check that the embedding layer and decoding layer are the same in size and in value
# self.assertTrue(check_same_values(embeddings, decoding))
# # Check that after modification, they remain the same.
# embeddings.weight.data.div_(2)
# # Check that the embedding layer and decoding layer are the same in size and in value
# self.assertTrue(embeddings.weight.shape, decoding.weight.shape)
# self.assertTrue(check_same_values(embeddings, decoding))
# # Check that after modification, they remain the same.
# decoding.weight.data.div_(4)
# # Check that the embedding layer and decoding layer are the same in size and in value
# self.assertTrue(embeddings.weight.shape, decoding.weight.shape)
# self.assertTrue(check_same_values(embeddings, decoding))
# Check that after resize they remain tied.
model_tied.resize_token_embeddings(config.text_config.vocab_size + 10)
params_tied_2 = list(model_tied.parameters())
self.assertEqual(len(params_tied_2), len(params_tied))
# decoding.weight.data.mul_(20)
# # Check that the embedding layer and decoding layer are the same in size and in value
# self.assertTrue(model.transformer.wte.weight.shape, model.lm_head.weight.shape)
# self.assertTrue(check_same_values(model.transformer.wte, model.lm_head))
@slow
def test_model_from_pretrained(self):
for model_name in KOSMOS2_PRETRAINED_MODEL_ARCHIVE_LIST[:1]:
model = Kosmos2Model.from_pretrained(model_name)
self.assertIsNotNone(model)
def _create_and_check_torchscript(self, config, inputs_dict):
if not self.test_torchscript:
return
configs_no_init = _config_zero_init(config) # To be sure we have no Nan
configs_no_init.torchscript = True
for model_class in self.all_model_classes:
model = model_class(config=configs_no_init)
model.to(torch_device)
model.eval()
inputs = self._prepare_for_class(inputs_dict, model_class)
main_input_name = model_class.main_input_name
try:
main_input = inputs[main_input_name]
model(main_input, inputs["input_ids"], inputs["image_embeds_position_mask"])
traced_model = torch.jit.trace(
model, (main_input, inputs["input_ids"], inputs["image_embeds_position_mask"])
)
except RuntimeError:
self.fail("Couldn't trace module.")
with tempfile.TemporaryDirectory() as tmp_dir_name:
pt_file_name = os.path.join(tmp_dir_name, "traced_model.pt")
try:
torch.jit.save(traced_model, pt_file_name)
except Exception:
self.fail("Couldn't save module.")
try:
loaded_model = torch.jit.load(pt_file_name)
except Exception:
self.fail("Couldn't load module.")
model.to(torch_device)
model.eval()
loaded_model.to(torch_device)
loaded_model.eval()
model_state_dict = model.state_dict()
loaded_model_state_dict = loaded_model.state_dict()
non_persistent_buffers = {}
for key in loaded_model_state_dict.keys():
if key not in model_state_dict.keys():
non_persistent_buffers[key] = loaded_model_state_dict[key]
loaded_model_state_dict = {
key: value for key, value in loaded_model_state_dict.items() if key not in non_persistent_buffers
}
self.assertEqual(set(model_state_dict.keys()), set(loaded_model_state_dict.keys()))
model_buffers = list(model.buffers())
for non_persistent_buffer in non_persistent_buffers.values():
found_buffer = False
for i, model_buffer in enumerate(model_buffers):
if torch.equal(non_persistent_buffer, model_buffer):
found_buffer = True
break
self.assertTrue(found_buffer)
model_buffers.pop(i)
models_equal = True
for layer_name, p1 in model_state_dict.items():
if layer_name in loaded_model_state_dict:
p2 = loaded_model_state_dict[layer_name]
if p1.data.ne(p2.data).sum() > 0:
models_equal = False
self.assertTrue(models_equal)
# Avoid memory leak. Without this, each call increases RAM usage by ~20MB.
# (Even with this call, there is still a memory leak of ~0.04MB.)
self.clear_torch_jit_class_registry()
# We will verify our results on a demo image
def prepare_img():
url = "https://huggingface.co/hf-internal-testing/Kosmos2-test-image/resolve/main/demo.jpg"
im = Image.open(requests.get(url, stream=True).raw)
return im
@require_vision
@require_torch
@slow
class Kosmos2ModelIntegrationTest(unittest.TestCase):
def run_example(self, prompt, image, model, processor):
inputs = processor(text=prompt, images=image, return_tensors="pt", padding=True).to(torch_device)
generation_outputs = model.generate(
pixel_values=inputs["pixel_values"],
input_ids=inputs["input_ids"],
attention_mask=inputs["attention_mask"],
image_embeds=None,
image_embeds_position_mask=inputs["image_embeds_position_mask"],
use_cache=True,
max_new_tokens=128,
output_scores=True,
return_dict_in_generate=True,
)
scores = generation_outputs.scores
generated_ids = generation_outputs.sequences
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)
# Specify `cleanup_and_extract=False` in order to see the raw model generation.
processed_text = [processor.post_process_generation(x, cleanup_and_extract=False) for x in generated_text]
# By default, the generated text is cleaned up and the entities are extracted.
final_text_with_entities = [processor.post_process_generation(x) for x in generated_text]
return scores, generated_ids, generated_text, processed_text, final_text_with_entities
def test_snowman_image_captioning(self):
url = "https://huggingface.co/microsoft/kosmos-2-patch14-224/resolve/main/snowman.png"
image = Image.open(requests.get(url, stream=True).raw)
image.save("new_image.jpg")
image = Image.open("new_image.jpg")
model = AutoModelForVision2Seq.from_pretrained("microsoft/kosmos-2-patch14-224").to(torch_device)
processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224")
prompt = "<grounding>An image of"
scores, generated_ids, generated_text, processed_text, final_text_with_entities = self.run_example(
prompt, image, model, processor
)
processed_text = processed_text[0]
final_text, entities = final_text_with_entities[0]
np.testing.assert_allclose(
torch.concat(scores[1:4])[:3, :3].to("cpu").numpy(),
np.array(
[
[-1.5672581195831299, -5.007406711578369, 4.36448860168457],
[-2.147017002105713, -4.966302871704102, 4.592559337615967],
[-0.9352350831031799, -4.688288688659668, 6.240612983703613],
]
),
atol=1e-5,
)
np.testing.assert_allclose(
torch.concat(scores[-3:])[-3:, -3:].to("cpu").numpy(),
np.array(
[
[2.9916205406188965, 2.481820583343506, 4.646594524383545],
[-2.8381078243255615, -2.9687185287475586, -2.6926779747009277],
[-2.8909168243408203, -3.2228589057922363, -1.7056822776794434],
]
),
atol=1e-5,
)
# fmt: off
EXPECTED_IDS = [
[
0, 64003, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28,
29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54,
55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 64004, 64012, 712, 1648, 9, 64007, 10, 43867, 64008,
64009, 64057, 64876, 64010, 5950, 597, 32, 64007, 10, 646, 64008, 64009, 64018, 64924, 64010, 4, 2
]
]
# fmt: on
self.assertListEqual(generated_ids.to("cpu").numpy().tolist(), EXPECTED_IDS)
EXPECTED_PROCESSED_TEXT = (
"<grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> "
"warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>."
)
self.assertEqual(processed_text, EXPECTED_PROCESSED_TEXT)
self.assertEqual(final_text, "An image of a snowman warming himself by a fire.")
EXPECTED_ENTITIES = [
("a snowman", (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]),
("a fire", (41, 47), [(0.171875, 0.015625, 0.484375, 0.890625)]),
]
self.assertListEqual(entities, EXPECTED_ENTITIES)
# test with detailed caption generation
prompt = "<grounding>Describe this image in detail:"
scores, generated_ids, generated_text, processed_text, final_text_with_entities = self.run_example(
prompt, image, model, processor
)
processed_text = processed_text[0]
final_text, entities = final_text_with_entities[0]
np.testing.assert_allclose(
torch.concat(scores[1:4])[:3, :3].to("cpu").numpy(),
np.array(
[
[-0.9093570113182068, -4.578373908996582, 5.96360969543457],
[2.452126979827881, -4.090598106384277, 8.738677024841309],
[-0.7624598741531372, -4.771658897399902, 6.576295852661133],
]
),
atol=1e-5,
)
np.testing.assert_allclose(
torch.concat(scores[-3:])[-3:, -3:].to("cpu").numpy(),
np.array(
[
[-1.673659086227417, -2.162452220916748, -1.95430588722229],
[-2.006824493408203, -2.2038745880126953, -1.24686861038208],
[-3.2783470153808594, -2.814181089401245, -1.390632152557373],
]
),
atol=1e-5,
)
# fmt: off
EXPECTED_IDS_LONG = [
[
0, 64003, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28,
29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54,
55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 64004, 64012, 34645, 247, 38, 1648, 12, 3391, 55,
24, 1648, 1338, 10, 43867, 1280, 32, 64007, 10, 30879, 64008, 64009, 64018, 65020, 64010, 12, 5, 1842,
4, 71, 17, 1679, 64007, 10, 3958, 64008, 64009, 64061, 64263, 64010, 6, 64007, 15719, 64008, 64009,
64253, 64617, 64010, 6, 8, 64007, 9626, 64008, 64009, 64413, 64545, 64010, 6, 23, 64007, 10, 4363,
64008, 64009, 64623, 64885, 64010, 2255, 8, 64007, 10, 3486, 64008, 64009, 64809, 65036, 64010, 1560,
2255, 4, 24, 43867, 1684, 7, 27, 3774, 5, 10356, 9, 5, 646, 6, 8, 22, 1684, 7, 30, 10, 2007, 8, 16239,
4337, 4, 2
]
]
# fmt: on
self.assertListEqual(generated_ids.to("cpu").numpy().tolist(), EXPECTED_IDS_LONG)
EXPECTED_PROCESSED_TEXT_LONG = (
"<grounding> Describe this image in detail: The image features a snowman sitting by<phrase> a campfire"
"</phrase><object><patch_index_0005><patch_index_1007></object> in the snow. He is wearing<phrase> a hat"
"</phrase><object><patch_index_0048><patch_index_0250></object>,<phrase> scarf</phrase><object>"
"<patch_index_0240><patch_index_0604></object>, and<phrase> gloves</phrase><object><patch_index_0400>"
"<patch_index_0532></object>, with<phrase> a pot</phrase><object><patch_index_0610><patch_index_0872>"
"</object> nearby and<phrase> a cup</phrase><object><patch_index_0796><patch_index_1023></object> placed "
"nearby. The snowman appears to be enjoying the warmth of the fire, and it appears to have a warm and cozy "
"atmosphere."
)
self.assertEqual(processed_text, EXPECTED_PROCESSED_TEXT_LONG)
EXPECTED_FINAL_TEXT_LONG = (
"Describe this image in detail: The image features a snowman sitting by a campfire in the snow. He is "
"wearing a hat, scarf, and gloves, with a pot nearby and a cup placed nearby. The snowman appears to be "
"enjoying the warmth of the fire, and it appears to have a warm and cozy atmosphere."
)
self.assertEqual(final_text, EXPECTED_FINAL_TEXT_LONG)
EXPECTED_ENTITIES_LONG = [
("a campfire", (71, 81), [(0.171875, 0.015625, 0.484375, 0.984375)]),
("a hat", (109, 114), [(0.515625, 0.046875, 0.828125, 0.234375)]),
("scarf", (116, 121), [(0.515625, 0.234375, 0.890625, 0.578125)]),
("gloves", (127, 133), [(0.515625, 0.390625, 0.640625, 0.515625)]),
("a pot", (140, 145), [(0.078125, 0.609375, 0.265625, 0.859375)]),
("a cup", (157, 162), [(0.890625, 0.765625, 0.984375, 0.984375)]),
]
self.assertListEqual(entities, EXPECTED_ENTITIES_LONG)
def test_snowman_image_captioning_batch(self):
url = "https://huggingface.co/microsoft/kosmos-2-patch14-224/resolve/main/snowman.png"
image = Image.open(requests.get(url, stream=True).raw)
image.save("new_image.jpg")
image = Image.open("new_image.jpg")
model = AutoModelForVision2Seq.from_pretrained("microsoft/kosmos-2-patch14-224").to(torch_device)
prompt = ["<grounding>An image of", "<grounding>Describe this image in detail:"]
# left padding
processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224", padding_side="left")
scores, generated_ids, generated_text, processed_text, final_text_with_entities = self.run_example(
prompt, [image] * len(prompt), model, processor
)
all_final_text = [x[0] for x in final_text_with_entities]
all_entities = [x[1] for x in final_text_with_entities]
# left padding gives identical results to non-padding
EXPECTED_PROCESSED_TEXT_0 = (
"<grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> "
"warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>."
)
EXPECTED_PROCESSED_TEXT_1 = (
"<grounding> Describe this image in detail: The image features a snowman sitting by<phrase> a campfire"
"</phrase><object><patch_index_0005><patch_index_1007></object> in the snow. He is wearing<phrase> a hat"
"</phrase><object><patch_index_0048><patch_index_0250></object>,<phrase> scarf</phrase><object>"
"<patch_index_0240><patch_index_0604></object>, and<phrase> gloves</phrase><object><patch_index_0400>"
"<patch_index_0532></object>, with<phrase> a pot</phrase><object><patch_index_0610><patch_index_0872>"
"</object> nearby and<phrase> a cup</phrase><object><patch_index_0796><patch_index_1023></object> placed "
"nearby. The snowman appears to be enjoying the warmth of the fire, and it appears to have a warm and cozy "
"atmosphere."
)
self.assertListEqual(processed_text, [EXPECTED_PROCESSED_TEXT_0, EXPECTED_PROCESSED_TEXT_1])
EXPECTED_FINAL_TEXT_0 = "An image of a snowman warming himself by a fire."
EXPECTED_FINAL_TEXT_1 = (
"Describe this image in detail: The image features a snowman sitting by a campfire in the snow. He is "
"wearing a hat, scarf, and gloves, with a pot nearby and a cup placed nearby. The snowman appears to be "
"enjoying the warmth of the fire, and it appears to have a warm and cozy atmosphere."
)
self.assertListEqual(all_final_text, [EXPECTED_FINAL_TEXT_0, EXPECTED_FINAL_TEXT_1])
EXPECTED_ENTITIES_0 = [
("a snowman", (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]),
("a fire", (41, 47), [(0.171875, 0.015625, 0.484375, 0.890625)]),
]
EXPECTED_ENTITIES_1 = [
("a campfire", (71, 81), [(0.171875, 0.015625, 0.484375, 0.984375)]),
("a hat", (109, 114), [(0.515625, 0.046875, 0.828125, 0.234375)]),
("scarf", (116, 121), [(0.515625, 0.234375, 0.890625, 0.578125)]),
("gloves", (127, 133), [(0.515625, 0.390625, 0.640625, 0.515625)]),
("a pot", (140, 145), [(0.078125, 0.609375, 0.265625, 0.859375)]),
("a cup", (157, 162), [(0.890625, 0.765625, 0.984375, 0.984375)]),
]
self.assertListEqual(all_entities, [EXPECTED_ENTITIES_0, EXPECTED_ENTITIES_1])
# right padding
processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224")
scores, generated_ids, generated_text, processed_text, final_text_with_entities = self.run_example(
prompt, [image] * len(prompt), model, processor
)
all_final_text = [x[0] for x in final_text_with_entities]
all_entities = [x[1] for x in final_text_with_entities]
# For right padding, only the non-padded sequences will give the same results as non-padding
self.assertEqual(processed_text[1], EXPECTED_PROCESSED_TEXT_1)
self.assertEqual(all_final_text[1], EXPECTED_FINAL_TEXT_1)
self.assertListEqual(all_entities[1], EXPECTED_ENTITIES_1)
# coding=utf-8
# Copyright 2023 Microsoft Research and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import unittest
import numpy as np
import pytest
import requests
from transformers.testing_utils import (
get_tests_dir,
require_sentencepiece,
require_tokenizers,
require_torch,
require_vision,
)
from transformers.utils import is_vision_available
if is_vision_available():
from PIL import Image
from transformers import (
AutoProcessor,
CLIPImageProcessor,
Kosmos2Processor,
PreTrainedTokenizerFast,
XLMRobertaTokenizer,
XLMRobertaTokenizerFast,
)
SAMPLE_VOCAB = get_tests_dir("fixtures/test_sentencepiece.model")
@require_sentencepiece
@require_tokenizers
@require_vision
class Kosmos2ProcessorTest(unittest.TestCase):
def setUp(self):
self.tmpdirname = tempfile.mkdtemp()
image_processor = CLIPImageProcessor(use_square_size=True)
# We have a SentencePiece fixture for testing
slow_tokenizer = XLMRobertaTokenizer(SAMPLE_VOCAB)
fast_tokenizer = XLMRobertaTokenizerFast(__slow_tokenizer=slow_tokenizer)
processor = Kosmos2Processor(image_processor, fast_tokenizer)
processor.save_pretrained(self.tmpdirname)
def get_tokenizer(self, **kwargs):
return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs).tokenizer
def get_image_processor(self, **kwargs):
return AutoProcessor.from_pretrained(self.tmpdirname, **kwargs).image_processor
def tearDown(self):
shutil.rmtree(self.tmpdirname)
def prepare_image_inputs(self):
"""This function prepares a list of PIL images, or a list of numpy arrays if one specifies numpify=True,
or a list of PyTorch tensors if one specifies torchify=True.
"""
image_inputs = [np.random.randint(255, size=(3, 30, 400), dtype=np.uint8)]
image_inputs = [Image.fromarray(np.moveaxis(x, 0, -1)) for x in image_inputs]
return image_inputs
def test_save_load_pretrained_additional_features(self):
processor = Kosmos2Processor(tokenizer=self.get_tokenizer(), image_processor=self.get_image_processor())
processor.save_pretrained(self.tmpdirname)
tokenizer_add_kwargs = self.get_tokenizer(bos_token="(BOS)", eos_token="(EOS)")
image_processor_add_kwargs = self.get_image_processor(do_normalize=False, padding_value=1.0)
processor = Kosmos2Processor.from_pretrained(
self.tmpdirname, bos_token="(BOS)", eos_token="(EOS)", do_normalize=False, padding_value=1.0
)
self.assertEqual(processor.tokenizer.get_vocab(), tokenizer_add_kwargs.get_vocab())
self.assertIsInstance(processor.tokenizer, PreTrainedTokenizerFast)
self.assertEqual(processor.image_processor.to_json_string(), image_processor_add_kwargs.to_json_string())
self.assertIsInstance(processor.image_processor, CLIPImageProcessor)
def test_image_processor(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = Kosmos2Processor(tokenizer=tokenizer, image_processor=image_processor)
image_input = self.prepare_image_inputs()
input_image_processor = image_processor(image_input, return_tensors="np")
input_processor = processor(images=image_input, return_tensors="np")
for key in input_image_processor.keys():
self.assertAlmostEqual(input_image_processor[key].sum(), input_processor[key].sum(), delta=1e-2)
def test_tokenizer(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = Kosmos2Processor(tokenizer=tokenizer, image_processor=image_processor)
input_str = "This is a test"
encoded_processor = processor(text=input_str, add_eos_token=True)
encoded_tok = tokenizer(input_str, return_token_type_ids=False)
for key in encoded_tok.keys():
self.assertListEqual(encoded_tok[key], encoded_processor[key])
def test_processor(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = Kosmos2Processor(tokenizer=tokenizer, image_processor=image_processor)
input_str = "This is a test"
image_input = self.prepare_image_inputs()
inputs = processor(text=input_str, images=image_input)
self.assertListEqual(
list(inputs.keys()), ["pixel_values", "input_ids", "attention_mask", "image_embeds_position_mask"]
)
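        # Besides the usual text/vision keys, the processor returns `image_embeds_position_mask`, which marks
        # the positions in `input_ids` that the model fills with the projected image embeddings.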
        # test that an error is raised when no input is passed
with pytest.raises(ValueError):
processor()
def test_tokenizer_decode(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = Kosmos2Processor(tokenizer=tokenizer, image_processor=image_processor)
predicted_ids = [[1, 4, 5, 8, 1, 0, 8], [3, 4, 3, 1, 1, 8, 9]]
decoded_processor = processor.batch_decode(predicted_ids)
decoded_tok = tokenizer.batch_decode(predicted_ids)
self.assertListEqual(decoded_tok, decoded_processor)
def test_model_input_names(self):
image_processor = self.get_image_processor()
tokenizer = self.get_tokenizer()
processor = Kosmos2Processor(tokenizer=tokenizer, image_processor=image_processor)
input_str = "This is a test"
image_input = self.prepare_image_inputs()
# both image and text
inputs = processor(text=input_str, images=image_input)
self.assertListEqual(
list(inputs.keys()), ["pixel_values", "input_ids", "attention_mask", "image_embeds_position_mask"]
)
# only text
inputs = processor(text=input_str)
self.assertListEqual(list(inputs.keys()), ["input_ids", "attention_mask"])
# only image
inputs = processor(images=image_input)
self.assertListEqual(list(inputs.keys()), ["pixel_values"])
@require_torch
def test_full_processor(self):
url = "https://huggingface.co/microsoft/kosmos-2-patch14-224/resolve/main/two_dogs.jpg"
processor = Kosmos2Processor.from_pretrained("microsoft/kosmos-2-patch14-224")
# test with different input formats.
# fmt: off
texts = [
# no phrase
"<grounding> Two puppies sit in a field of grass.",
# 1 phrase
"<grounding> <phrase> Two puppies </phrase> sit in a field of grass.",
# 2 phrases
"<grounding> <phrase> Two puppies </phrase> sit in a field of <phrase> grass </phrase>.",
# 2 phrases: bboxes already specified for the 1st phrase
"<grounding> <phrase> Two puppies </phrase> <object> <patch_index_0079> <patch_index_1016> </delimiter_of_multi_objects/> <patch_index_0135> <patch_index_1008> </object> sit in a field of <phrase> grass </phrase>.",
]
# fmt: on
image = Image.open(requests.get(url, stream=True).raw)
        # Round-trip the image through a saved JPEG file to match the official (Microsoft) Kosmos-2 demo, from which the expected values below were obtained
image_path = os.path.join(self.tmpdirname, "image.jpg")
image.save(image_path)
image = Image.open(image_path)
# fmt: off
bboxes = [
[None, []],
[[None], [[]], [(79, 1016)], [[(79, 1016)]], [[(79, 1016), (135, 1008)]]],
[[[(79, 1016), (135, 1008)], None], [[(79, 1016), (135, 1008)], []], [[(79, 1016), (135, 1008)], (480, 1023)], [[(79, 1016), (135, 1008)], [(480, 1023)]]],
[[None, [(480, 1023)]]],
]
# fmt: on
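        # Each sub-list above enumerates equivalent ways of passing boxes for the matching entry in `texts`:
        # `None` or `[]` for "no bbox", a single (upper-left patch index, lower-right patch index) tuple for
        # one box, or a list of such tuples for several boxes per phrase.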
batch_image = [image] * 4
batch_text = [texts[0], texts[1], texts[1], texts[2]]
batch_bboxes = [
None, # no phrase
[[]], # 1 phrase: no bbox
[(79, 1016)], # 1 phrase: 1 bbox
            [[(79, 1016), (135, 1008)], (480, 1023)],  # 2 phrases: 2 bboxes + 1 bbox
]
# fmt: off
expected_input_ids = [
[0, 64012, 1264, 17772, 1357, 12, 10, 770, 9, 4464, 4, 2],
[0, 64012, 64007, 1264, 17772, 64008, 1357, 12, 10, 770, 9, 4464, 4, 2],
[0, 64012, 64007, 1264, 17772, 64008, 64009, 64092, 65029, 64010, 1357, 12, 10, 770, 9, 4464, 4, 2],
[0, 64012, 64007, 1264, 17772, 64008, 64009, 64092, 65029, 64011, 64148, 65021, 64010, 1357, 12, 10, 770, 9, 4464, 4, 2],
[0, 64012, 64007, 1264, 17772, 64008, 64009, 64092, 65029, 64011, 64148, 65021, 64010, 1357, 12, 10, 770, 9, 64007, 4464, 64008, 106, 4, 2],
[0, 64012, 64007, 1264, 17772, 64008, 64009, 64092, 65029, 64011, 64148, 65021, 64010, 1357, 12, 10, 770, 9, 64007, 4464, 64008, 64009, 64493, 65036, 64010, 106, 4, 2],
]
# fmt: on
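        # The special ids above come from this checkpoint's tokenizer: 0/2 are BOS/EOS, 64012 is `<grounding>`,
        # 64007/64008 are `<phrase>`/`</phrase>`, 64009/64010 are `<object>`/`</object>`, 64011 is
        # `</delimiter_of_multi_objects/>`, and a patch index p maps to `<patch_index_p>` with id 64013 + p
        # (e.g. (79, 1016) -> 64092, 65029).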
EXPECTED_PIXEL_VALUES_1 = np.array(
[
[
[-0.6535852551460266, -0.6389868259429932, -0.6243883967399597],
[-0.6535852551460266, -0.6389868259429932, -0.6243883967399597],
[-0.6243883967399597, -0.6243883967399597, -0.5951915383338928],
],
[
[-0.20629698038101196, -0.19128920137882233, -0.19128920137882233],
[-0.20629698038101196, -0.19128920137882233, -0.17628143727779388],
[-0.2213047444820404, -0.20629698038101196, -0.16127367317676544],
],
[
[-0.5843556523323059, -0.5701355338096619, -0.5701355338096619],
[-0.5843556523323059, -0.5701355338096619, -0.5559154152870178],
[-0.5843556523323059, -0.5559154152870178, -0.5416953563690186],
],
]
)
EXPECTED_PIXEL_VALUES_2 = np.array(
[
[
[-0.4346088469028473, -0.47840413451194763, -0.7849710583686829],
[-0.5221993923187256, -0.5076009631156921, -0.755774199962616],
[-0.5221993923187256, -0.5076009631156921, -0.7411757707595825],
],
[
[-0.2813358008861542, -0.2963435649871826, -0.431413471698761],
[-0.26632803678512573, -0.2963435649871826, -0.4764367938041687],
[-0.2213047444820404, -0.2813358008861542, -0.49144455790519714],
],
[
[-0.5701355338096619, -0.641235888004303, -0.7549964189529419],
[-0.5843556523323059, -0.641235888004303, -0.7834365367889404],
[-0.5559154152870178, -0.641235888004303, -0.7834365367889404],
],
]
)
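        # The two arrays above are the top-left and bottom-right 3x3 corners (over the 3 channels) of the
        # normalized 224x224 pixel values; they are compared against slices of `pixel_values` further below.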
def check(texts, bboxes, expected_input_ids):
outputs = processor(images=None, text=texts, bboxes=bboxes, add_eos_token=True)
self.assertListEqual(outputs.input_ids, expected_input_ids)
# no phrase
check(texts[0], bboxes[0][0], expected_input_ids[0])
# no phrase
check(texts[0], bboxes[0][1], expected_input_ids[0])
# 1 phrase: no bbox
check(texts[1], bboxes[1][0], expected_input_ids[1])
# 1 phrase: no bbox
check(texts[1], bboxes[1][1], expected_input_ids[1])
# 1 phrase: 1 bbox
check(texts[1], bboxes[1][2], expected_input_ids[2])
# 1 phrase: 1 bbox
check(texts[1], bboxes[1][3], expected_input_ids[2])
# 1 phrase: 2 bboxes
check(texts[1], bboxes[1][4], expected_input_ids[3])
        # `bboxes` cannot contain `[None]`
with pytest.raises(ValueError):
_ = processor.preprocess_examples(images=None, texts=texts[1], bboxes=[[None]])
        # 2 phrases: 2 bboxes + no bbox
check(texts[2], bboxes[2][0], expected_input_ids[4])
        # 2 phrases: 2 bboxes + no bbox
check(texts[2], bboxes[2][1], expected_input_ids[4])
        # 2 phrases: 2 bboxes + 1 bbox
check(texts[2], bboxes[2][2], expected_input_ids[5])
        # 2 phrases: 2 bboxes + 1 bbox
check(texts[2], bboxes[2][3], expected_input_ids[5])
        # 2 phrases: no bbox (already specified in the text) + 1 bbox
check(texts[3], bboxes[3][0], expected_input_ids[5])
        # `bboxes` cannot contain `[None]`
with pytest.raises(ValueError):
_ = processor.preprocess_examples(images=None, texts=texts[2], bboxes=[[(79, 1016), (135, 1008)], [None]])
# test batch
outputs = processor(
images=None,
text=batch_text,
bboxes=batch_bboxes,
add_eos_token=True,
)
self.assertListEqual(
outputs.input_ids,
[expected_input_ids[0], expected_input_ids[1], expected_input_ids[2], expected_input_ids[5]],
)
# test batch with padding (without `return_tensors`)
outputs = processor(
images=None,
text=batch_text,
bboxes=batch_bboxes,
padding=True,
add_eos_token=True,
)
# padding on the right
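        # (the tokenizer pads with its pad token id, 1, hence the trailing `[1] * ...` in the expected ids)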
self.assertListEqual(
outputs.input_ids[0],
expected_input_ids[0] + [1] * (len(expected_input_ids[5]) - len(expected_input_ids[0])),
)
self.assertListEqual(
outputs.attention_mask[0],
[1] * len(expected_input_ids[0]) + [0] * (len(expected_input_ids[5]) - len(expected_input_ids[0])),
)
# no padding for the longest sequence
self.assertListEqual(outputs.input_ids[-1], expected_input_ids[5])
self.assertListEqual(outputs.attention_mask[-1], [1] * len(expected_input_ids[5]))
# test batch with padding (with `return_tensors`)
outputs = processor(
images=None,
text=batch_text,
bboxes=batch_bboxes,
return_tensors="pt",
padding=True,
add_eos_token=True,
)
# padding on the right
self.assertListEqual(
outputs.input_ids.numpy().tolist()[0],
expected_input_ids[0] + [1] * (len(expected_input_ids[5]) - len(expected_input_ids[0])),
)
self.assertListEqual(
outputs.attention_mask.numpy().tolist()[0],
[1] * len(expected_input_ids[0]) + [0] * (len(expected_input_ids[5]) - len(expected_input_ids[0])),
)
# no padding for the longest sequence
self.assertListEqual(outputs.input_ids.numpy().tolist()[-1], expected_input_ids[5])
self.assertListEqual(outputs.attention_mask.numpy().tolist()[-1], [1] * len(expected_input_ids[5]))
# test with image
num_image_tokens = 64
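        # With an image, the processor prepends `<image>` (id 64003), 64 placeholder ids (4 ... 67) that the
        # model replaces with the projected image embeddings, and `</image>` (id 64004);
        # `image_embeds_position_mask` is 1 exactly at those placeholder positions.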
outputs = processor(images=image, text=texts[0], bboxes=None, add_eos_token=True)
self.assertTupleEqual(outputs.pixel_values[0].shape, (3, 224, 224))
self.assertListEqual(
outputs.input_ids,
[0, 64003] + list(range(4, 4 + num_image_tokens)) + [64004] + expected_input_ids[0][1:],
)
self.assertListEqual(
outputs.image_embeds_position_mask,
[0] * 2 + [1] * num_image_tokens + [0] + [0] * (len(expected_input_ids[0]) - 1),
)
np.testing.assert_allclose(outputs.pixel_values[0][:3, :3, :3], EXPECTED_PIXEL_VALUES_1, atol=1e-9)
np.testing.assert_allclose(outputs.pixel_values[0][:3, -3:, -3:], EXPECTED_PIXEL_VALUES_2, atol=1e-9)
# test with image in batch (right padding)
outputs = processor(
images=batch_image,
text=batch_text,
bboxes=batch_bboxes,
return_tensors="pt",
padding=True,
add_eos_token=True,
)
self.assertTupleEqual(outputs.pixel_values.shape, (4, 3, 224, 224))
np.testing.assert_allclose(
outputs.pixel_values[:, :3, :3, :3].numpy(), [EXPECTED_PIXEL_VALUES_1] * len(batch_image), atol=1e-9
)
np.testing.assert_allclose(
outputs.pixel_values[:, :3, -3:, -3:].numpy(), [EXPECTED_PIXEL_VALUES_2] * len(batch_image), atol=1e-9
)
        # padding on the right: the `[1:]` below drops the `BOS` id, which is already added at the beginning of each (dynamically computed) expected value  # noqa
# fmt: off
EXPECTED_IDS_BATCH_RIGHT_PADDING = [
[0, 64003] + list(range(4, 4 + num_image_tokens)) + [64004] + expected_input_ids[0][1:] + [1] * (len(expected_input_ids[5]) - len(expected_input_ids[0])),
[0, 64003] + list(range(4, 4 + num_image_tokens)) + [64004] + expected_input_ids[5][1:],
]
EXPECTED_MASK_BATCH_RIGHT_PADDING = [
[1, 1] + [1] * num_image_tokens + [1] + [1] * len(expected_input_ids[0][1:]) + [0] * (len(expected_input_ids[5]) - len(expected_input_ids[0])),
[1] * (2 + num_image_tokens + len(expected_input_ids[5])),
]
# fmt: on
self.assertListEqual(outputs.input_ids.numpy().tolist()[0], EXPECTED_IDS_BATCH_RIGHT_PADDING[0])
self.assertListEqual(outputs.attention_mask.numpy().tolist()[0], EXPECTED_MASK_BATCH_RIGHT_PADDING[0])
self.assertListEqual(outputs.input_ids.numpy().tolist()[-1], EXPECTED_IDS_BATCH_RIGHT_PADDING[-1])
self.assertListEqual(outputs.attention_mask.numpy().tolist()[-1], EXPECTED_MASK_BATCH_RIGHT_PADDING[-1])
self.assertListEqual(
outputs.image_embeds_position_mask.numpy().tolist(),
[[0, 0] + [1] * num_image_tokens + [0] + [0] * (len(expected_input_ids[5]) - 1)] * len(batch_image),
)
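        # With right padding, the `<image>` block starts at the same offset in every row, so the
        # `image_embeds_position_mask` is identical across the batch.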
processor = Kosmos2Processor.from_pretrained("microsoft/kosmos-2-patch14-224", padding_side="left")
# test with image in batch (left padding)
outputs = processor(
images=batch_image,
text=batch_text,
bboxes=batch_bboxes,
return_tensors="pt",
padding=True,
add_eos_token=True,
)
        # padding on the left: the `[1:]` below drops the `BOS` id, which is already added at the beginning of each (dynamically computed) expected value  # noqa
# fmt: off
EXPECTED_IDS_BATCH = [
[1] * (len(expected_input_ids[5]) - len(expected_input_ids[0])) + [0, 64003] + list(range(4, 4 + num_image_tokens)) + [64004] + expected_input_ids[0][1:],
[0, 64003] + list(range(4, 4 + num_image_tokens)) + [64004] + expected_input_ids[5][1:],
]
        EXPECTED_MASK_BATCH = [
[0] * (len(expected_input_ids[5]) - len(expected_input_ids[0])) + [1, 1] + [1] * num_image_tokens + [1] + [1] * len(expected_input_ids[0][1:]),
[1] * (2 + num_image_tokens + len(expected_input_ids[5])),
]
EXPECTED_IMG_POS_MASK_BATCH = [
[0] * (len(expected_input_ids[5]) - len(expected_input_ids[0])) + [0, 0] + [1] * num_image_tokens + [0] + [0] * len(expected_input_ids[0][1:]),
[0, 0] + [1] * num_image_tokens + [0] + [0] * (len(expected_input_ids[5]) - 1),
]
# fmt: on
self.assertListEqual(outputs.input_ids.numpy().tolist()[0], EXPECTED_IDS_BATCH[0])
self.assertListEqual(outputs.attention_mask.numpy().tolist()[0], EXPECTED_MASK_BATCH[0])
self.assertListEqual(outputs.image_embeds_position_mask.numpy().tolist()[0], EXPECTED_IMG_POS_MASK_BATCH[0])
# no padding for the longest sequence
self.assertListEqual(outputs.input_ids.numpy().tolist()[-1], EXPECTED_IDS_BATCH[-1])
self.assertListEqual(outputs.attention_mask.numpy().tolist()[-1], EXPECTED_MASK_BATCH[-1])
self.assertListEqual(outputs.image_embeds_position_mask.numpy().tolist()[-1], EXPECTED_IMG_POS_MASK_BATCH[-1])
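# A minimal, illustrative end-to-end sketch of how the processor pairs with the model, assuming network
# access to download `microsoft/kosmos-2-patch14-224`; it mirrors the grounded-generation flow exercised by
# the integration tests above and is only a sketch, not a definitive recipe.
if __name__ == "__main__":
    import torch

    from transformers import AutoModelForVision2Seq

    processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224")
    model = AutoModelForVision2Seq.from_pretrained("microsoft/kosmos-2-patch14-224")

    url = "https://huggingface.co/microsoft/kosmos-2-patch14-224/resolve/main/two_dogs.jpg"
    image = Image.open(requests.get(url, stream=True).raw)
    prompt = "<grounding> Two puppies sit in a field of grass."

    inputs = processor(text=prompt, images=image, return_tensors="pt")
    with torch.no_grad():
        generated_ids = model.generate(
            pixel_values=inputs["pixel_values"],
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            image_embeds_position_mask=inputs["image_embeds_position_mask"],
            max_new_tokens=64,
        )
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
    # `post_process_generation` cleans up the grounding tags and extracts (phrase, span, bboxes) entities
    final_text, entities = processor.post_process_generation(generated_text)
    print(final_text)
    print(entities)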
@@ -73,6 +73,9 @@ PRIVATE_MODELS = [
"MaskFormerSwinPreTrainedModel",
"BridgeTowerTextModel",
"BridgeTowerVisionModel",
"Kosmos2TextModel",
"Kosmos2TextForCausalLM",
"Kosmos2VisionModel",
]
# Update this list for models that are not tested, with a comment explaining the reason they should not be.
@@ -618,6 +618,7 @@ src/transformers/models/instructblip/processing_instructblip.py
src/transformers/models/jukebox/configuration_jukebox.py
src/transformers/models/jukebox/convert_jukebox.py
src/transformers/models/jukebox/modeling_jukebox.py
src/transformers/models/kosmos2/convert_kosmos2_original_pytorch_checkpoint_to_pytorch.py
src/transformers/models/led/configuration_led.py
src/transformers/models/led/modeling_led.py
src/transformers/models/led/modeling_tf_led.py
docs/source/en/generation_strategies.md
docs/source/en/model_doc/ctrl.md
docs/source/en/model_doc/kosmos-2.md
docs/source/en/model_doc/seamless_m4t.md
docs/source/en/task_summary.md
docs/source/en/tasks/prompting.md
src/transformers/models/blip_2/modeling_blip_2.py
src/transformers/models/ctrl/modeling_ctrl.py
src/transformers/models/kosmos2/modeling_kosmos2.py