Commit 81028572 authored by luopl

init
from multiprocessing import Pool
import os
from typing import Callable, Iterable, Sized
from rich.progress import (BarColumn, MofNCompleteColumn, Progress, Task,
TaskProgressColumn, TextColumn, TimeRemainingColumn)
from rich.text import Text
import os.path as osp
import portalocker
from ..smp import load, dump
class _Worker:
"""Function wrapper for ``track_progress_rich``"""
def __init__(self, func) -> None:
self.func = func
def __call__(self, inputs):
inputs, idx = inputs
if not isinstance(inputs, (tuple, list, dict)):
inputs = (inputs, )
if isinstance(inputs, dict):
return self.func(**inputs), idx
else:
return self.func(*inputs), idx
class _SkipFirstTimeRemainingColumn(TimeRemainingColumn):
"""Skip calculating remaining time for the first few times.
Args:
skip_times (int): The number of times to skip. Defaults to 0.
"""
def __init__(self, *args, skip_times=0, **kwargs):
super().__init__(*args, **kwargs)
self.skip_times = skip_times
def render(self, task: Task) -> Text:
"""Show time remaining."""
if task.completed <= self.skip_times:
return Text('-:--:--', style='progress.remaining')
return super().render(task)
def _tasks_with_index(tasks):
"""Add index to tasks."""
for idx, task in enumerate(tasks):
yield task, idx
def track_progress_rich(func: Callable,
tasks: Iterable = tuple(),
task_num: int = None,
nproc: int = 1,
chunksize: int = 1,
description: str = 'Processing',
save=None, keys=None,
color: str = 'blue') -> list:
"""Track the progress of parallel task execution with a progress bar. The
    built-in :mod:`multiprocessing` module is used for the process pool, and
    tasks are dispatched with :func:`Pool.imap_unordered` (or run sequentially
    in the current process when ``nproc`` is 1).
Args:
func (callable): The function to be applied to each task.
        tasks (Iterable or Sized): A tuple of tasks. The expected format
            depends on the signature of ``func``:
- When ``func`` accepts no arguments: tasks should be an empty
tuple, and ``task_num`` must be specified.
- When ``func`` accepts only one argument: tasks should be a tuple
containing the argument.
- When ``func`` accepts multiple arguments: tasks should be a
tuple, with each element representing a set of arguments.
If an element is a ``dict``, it will be parsed as a set of
keyword-only arguments.
Defaults to an empty tuple.
task_num (int, optional): If ``tasks`` is an iterator which does not
have length, the number of tasks can be provided by ``task_num``.
Defaults to None.
        nproc (int): Number of worker processes. If ``nproc`` is 1, tasks are
            run sequentially in the current process. Defaults to 1.
        chunksize (int): Refer to :class:`multiprocessing.Pool` for details.
            Defaults to 1.
        description (str): The description of the progress bar.
            Defaults to "Processing".
        save (str, optional): Path of a result cache file. If set, each
            finished result is merged into this file under its matching key.
            Defaults to None.
        keys (list, optional): The keys used to store results in ``save``.
            Must have the same length as ``tasks``. Defaults to None.
        color (str): The color of the progress bar. Defaults to "blue".
Examples:
>>> import time
>>> def func(x):
... time.sleep(1)
... return x**2
>>> track_progress_rich(func, range(10), nproc=2)
Returns:
list: The task results.
"""
if save is not None:
assert osp.exists(osp.dirname(save)) or osp.dirname(save) == ''
if not osp.exists(save):
dump({}, save)
if keys is not None:
assert len(keys) == len(tasks)
if not callable(func):
raise TypeError('func must be a callable object')
if not isinstance(tasks, Iterable):
raise TypeError(
f'tasks must be an iterable object, but got {type(tasks)}')
if isinstance(tasks, Sized):
if len(tasks) == 0:
if task_num is None:
raise ValueError('If tasks is an empty iterable, '
'task_num must be set')
else:
tasks = tuple(tuple() for _ in range(task_num))
else:
if task_num is not None and task_num != len(tasks):
raise ValueError('task_num does not match the length of tasks')
task_num = len(tasks)
if nproc <= 0:
raise ValueError('nproc must be a positive number')
skip_times = nproc * chunksize if nproc > 1 else 0
prog_bar = Progress(
TextColumn('{task.description}'),
BarColumn(),
_SkipFirstTimeRemainingColumn(skip_times=skip_times),
MofNCompleteColumn(),
TaskProgressColumn(show_speed=True),
)
worker = _Worker(func)
task_id = prog_bar.add_task(
total=task_num, color=color, description=description)
tasks = _tasks_with_index(tasks)
# Use single process when nproc is 1, else use multiprocess.
with prog_bar:
if nproc == 1:
results = []
for task in tasks:
result, idx = worker(task)
results.append(result)
if save is not None:
with portalocker.Lock(save, timeout=5) as fh:
ans = load(save)
ans[keys[idx]] = result
                        if os.environ.get('VERBOSE', False):
print(keys[idx], result, flush=True)
dump(ans, save)
fh.flush()
os.fsync(fh.fileno())
prog_bar.update(task_id, advance=1, refresh=True)
else:
with Pool(nproc) as pool:
results = []
unordered_results = []
gen = pool.imap_unordered(worker, tasks, chunksize)
try:
for result in gen:
result, idx = result
unordered_results.append((result, idx))
if save is not None:
with portalocker.Lock(save, timeout=5) as fh:
ans = load(save)
ans[keys[idx]] = result
if os.environ.get('VERBOSE', False):
print(keys[idx], result, flush=True)
dump(ans, save)
fh.flush()
os.fsync(fh.fileno())
results.append(None)
prog_bar.update(task_id, advance=1, refresh=True)
except Exception as e:
prog_bar.stop()
raise e
for result, idx in unordered_results:
results[idx] = result
return results
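

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the library itself). The worker
# function, key list and cache path below are hypothetical; they only show the
# expected call signature of ``track_progress_rich`` with a result cache.
def _demo_square(x):
    # Defined at module level so it stays picklable when nproc > 1.
    return x ** 2


def _demo_track_progress(cache_path='demo_cache.json'):
    keys = [str(i) for i in range(8)]
    # Each finished result is also merged into ``cache_path`` under its key,
    # so an interrupted run can be resumed from the cache file.
    return track_progress_rich(
        _demo_square,
        tasks=[(i, ) for i in range(8)],
        nproc=2,
        save=cache_path,
        keys=keys)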
from ..smp import *
from ..dataset.utils.judge_util import build_judge
from ..dataset.utils.multiple_choice import extract_answer_from_item
from .matching_util import can_infer
from .mp_util import track_progress_rich
def MMMU_result_transfer(result_path):
res = {}
result_data = load(result_path)
mcq = result_data['A'].notna()
lt = len(result_data)
for i in range(lt):
line = result_data.iloc[i]
if mcq[i]:
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
prediction = line['prediction']
infer_prediction = can_infer(prediction, options)
res[line['id']] = infer_prediction
else:
res[line['id']] = line['prediction']
result_json = result_path.replace('.xlsx', '.json')
dump(res, result_json)
return result_json
def MMTBench_result_transfer(eval_file, dataset='default', **judge_kwargs):
logger = get_logger('Evaluation')
nproc = judge_kwargs.pop('nproc', 4)
rd.seed(2680)
suffix = eval_file.split('.')[-1]
model = judge_kwargs['model']
assert model in ['chatgpt-0125', 'exact_matching', 'gpt-4-0125']
name_str_map = {
'chatgpt-0125': 'openai',
'gpt-4-0125': 'gpt4'
}
name_str = name_str_map[model] if model in name_str_map else model
if model == 'exact_matching':
model = None
elif gpt_key_set():
model = build_judge(**judge_kwargs)
if not model.working():
logger.error('The OPENAI API is not working properly, will use exact matching for evaluation')
model = None
else:
logger.error('OPENAI_API_KEY is not set properly, will use exact matching for evaluation')
model = None
logger.info(f'Evaluating {eval_file}')
result_file = eval_file.replace(f'.{suffix}', f'_{name_str}_option.pkl')
result = {}
if osp.exists(result_file):
result = load(result_file)
data = load(eval_file)
    assert 'index' in data, 'Essential columns missing in the eval_file.'
data = data.sort_values(by='index')
data['prediction'] = [str(x) for x in data['prediction']]
for k in data.keys():
data[k.lower() if k not in list(string.ascii_uppercase) else k] = data.pop(k)
idx2lines = {data.iloc[i]['index']: data.iloc[i] for i in range(len(data))}
idx2lines = {k: v for k, v in idx2lines.items() if k not in result}
indices = list(idx2lines.keys())
lines = [idx2lines[i] for i in indices]
tups = [(model, line) for line in lines]
res = track_progress_rich(
extract_answer_from_item,
tups,
nproc=nproc,
chunksize=nproc,
save=result_file,
keys=indices)
for i, r in zip(indices, res):
if i in result:
assert result[i]['opt'] == r['opt'] and result[i]['log'] == r['log']
else:
result[i] = r
indices = list(data['index'])
data['opt'] = [result[i]['opt'] for i in data['index']]
data['log'] = [result[i]['log'] for i in data['index']]
    # dump the submission file
    output_path = eval_file.replace(f'.{suffix}', f'_{name_str}_submission.tsv')
    dump(data, output_path)
return output_path
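

# Illustrative call sketch (the .xlsx file name below is hypothetical): it only
# shows how the transfer function is meant to be invoked. Passing
# model='exact_matching' skips the LLM judge and relies on exact matching.
def _demo_mmtbench_transfer(eval_file='MMT-Bench_VAL_example.xlsx'):
    return MMTBench_result_transfer(eval_file, model='exact_matching', nproc=4)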
import torch
torch.set_grad_enabled(False)
torch.manual_seed(1234)
from .base import BaseModel
from .cogvlm import CogVlm, GLM4v
from .emu import Emu
from .eagle_x import Eagle
from .idefics import IDEFICS, IDEFICS2
from .instructblip import InstructBLIP
from .llava import LLaVA, LLaVA_Next, LLaVA_XTuner, LLaVA_Next2, LLaVA_OneVision
from .minicpm_v import MiniCPM_V, MiniCPM_Llama3_V, MiniCPM_V_2_6
from .minigpt4 import MiniGPT4
from .mmalaya import MMAlaya, MMAlaya2
from .monkey import Monkey, MonkeyChat
from .moondream import Moondream1, Moondream2
from .minimonkey import MiniMonkey
from .mplug_owl2 import mPLUG_Owl2
from .omnilmm import OmniLMM12B
from .open_flamingo import OpenFlamingo
from .pandagpt import PandaGPT
from .qwen_vl import QwenVL, QwenVLChat
from .qwen2_vl import Qwen2VLChat
from .transcore_m import TransCoreM
from .visualglm import VisualGLM
from .xcomposer import ShareCaptioner, XComposer, XComposer2, XComposer2_4KHD, XComposer2d5
from .yi_vl import Yi_VL
from .internvl_chat import InternVLChat
from .deepseek_vl import DeepSeekVL
from .mgm import Mini_Gemini
from .bunnyllama3 import BunnyLLama3
from .vxverse import VXVERSE
from .paligemma import PaliGemma
from .qh_360vl import QH_360VL
from .phi3_vision import Phi3Vision, Phi3_5Vision
from .wemm import WeMM
from .cambrian import Cambrian
from .chameleon import Chameleon
from .video_llm import VideoLLaVA, VideoLLaVA_HF, Chatunivi, VideoChatGPT, LLaMAVID, VideoChat2_HD, PLLaVA
from .vila import VILA
from .ovis import Ovis, Ovis1_6
from .mantis import Mantis
from .mixsense import LLama3Mixsense
from .parrot import Parrot
from .omchat import OmChat
from .rbdash import RBDash
from .xgen_mm import XGenMM
from .slime import SliME
from .mplug_owl3 import mPLUG_Owl3
from .pixtral import Pixtral
from ..smp import *
from ..dataset import img_root_map
from abc import abstractmethod
class BaseModel:
INTERLEAVE = False
allowed_types = ['text', 'image', 'video']
def __init__(self):
self.dump_image_func = None
def use_custom_prompt(self, dataset):
"""Whether to use custom prompt for the given dataset.
Args:
dataset (str): The name of the dataset.
Returns:
            bool: Whether to use a custom prompt. If True, `build_prompt` of the VLM
                will be called to build the prompt. Defaults to False.
"""
return False
@abstractmethod
def build_prompt(self, line, dataset):
"""Build custom prompts for a specific dataset. Called only if `use_custom_prompt` returns True.
Args:
line (line of pd.DataFrame): The raw input line.
dataset (str): The name of the dataset.
Returns:
str: The built message.
"""
raise NotImplementedError
def set_dump_image(self, dump_image_func):
self.dump_image_func = dump_image_func
def dump_image(self, line, dataset):
return self.dump_image_func(line)
@abstractmethod
def generate_inner(self, message, dataset=None):
raise NotImplementedError
def check_content(self, msgs):
"""Check the content type of the input. Four types are allowed: str, dict, liststr, listdict.
"""
if isinstance(msgs, str):
return 'str'
if isinstance(msgs, dict):
return 'dict'
if isinstance(msgs, list):
types = [self.check_content(m) for m in msgs]
if all(t == 'str' for t in types):
return 'liststr'
if all(t == 'dict' for t in types):
return 'listdict'
return 'unknown'
def preproc_content(self, inputs):
"""Convert the raw input messages to a list of dicts.
Args:
inputs: raw input messages.
Returns:
            list(dict): The preprocessed input messages. Returns None if the input cannot be preprocessed.
"""
if self.check_content(inputs) == 'str':
return [dict(type='text', value=inputs)]
elif self.check_content(inputs) == 'dict':
assert 'type' in inputs and 'value' in inputs
return [inputs]
elif self.check_content(inputs) == 'liststr':
res = []
for s in inputs:
mime, pth = parse_file(s)
if mime is None or mime == 'unknown':
res.append(dict(type='text', value=s))
else:
res.append(dict(type=mime.split('/')[0], value=pth))
return res
elif self.check_content(inputs) == 'listdict':
for item in inputs:
assert 'type' in item and 'value' in item
mime, s = parse_file(item['value'])
if mime is None:
assert item['type'] == 'text'
else:
assert mime.split('/')[0] == item['type']
item['value'] = s
return inputs
else:
return None
def generate(self, message, dataset=None):
"""Generate the output message.
Args:
message (list[dict]): The input message.
dataset (str, optional): The name of the dataset. Defaults to None.
Returns:
str: The generated message.
"""
assert self.check_content(message) in ['str', 'dict', 'liststr', 'listdict'], f'Invalid input type: {message}'
message = self.preproc_content(message)
assert message is not None and self.check_content(message) == 'listdict'
for item in message:
assert item['type'] in self.allowed_types, f'Invalid input type: {item["type"]}'
return self.generate_inner(message, dataset)
def chat(self, messages, dataset=None):
"""The main function for multi-turn chatting. Will call `chat_inner` with the preprocessed input messages."""
        assert hasattr(self, 'chat_inner'), 'The API model should have the `chat_inner` method. '
for msg in messages:
assert isinstance(msg, dict) and 'role' in msg and 'content' in msg, msg
assert self.check_content(msg['content']) in ['str', 'dict', 'liststr', 'listdict'], msg
msg['content'] = self.preproc_content(msg['content'])
while len(messages):
try:
return self.chat_inner(messages, dataset=dataset)
            except Exception:
messages = messages[1:]
while len(messages) and messages[0]['role'] != 'user':
messages = messages[1:]
continue
return 'Chat Mode: Failed with all possible conversation turns.'
def message_to_promptimg(self, message, dataset=None):
assert not self.INTERLEAVE
model_name = self.__class__.__name__
warnings.warn(
f'Model {model_name} does not support interleaved input. '
'Will use the first image and aggregated texts as prompt. ')
num_images = len([x for x in message if x['type'] == 'image'])
if num_images == 0:
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
image = None
else:
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
images = [x['value'] for x in message if x['type'] == 'image']
if 'BLINK' == dataset:
image = concat_images_vlmeval(images, target_size=512)
else:
image = images[0]
return prompt, image
def message_to_promptvideo(self, message):
        if getattr(self, 'VIDEO_LLM', False):
num_videos = len([x for x in message if x['type'] == 'video'])
if num_videos == 0:
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
video = None
else:
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
video = [x['value'] for x in message if x['type'] == 'video'][0]
return prompt, video
else:
import sys
warnings.warn('Model does not support video input.')
sys.exit(-1)
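

# ---------------------------------------------------------------------------
# Minimal subclass sketch (illustrative only, not a real VLM wrapper): it shows
# the contract a concrete model has to fulfil -- implement `generate_inner`,
# which receives the preprocessed list of {'type': ..., 'value': ...} dicts.
class _EchoModel(BaseModel):

    INTERLEAVE = True

    def generate_inner(self, message, dataset=None):
        # Echo the text parts back and report how many images were passed in.
        texts = [x['value'] for x in message if x['type'] == 'text']
        num_images = len([x for x in message if x['type'] == 'image'])
        return f'[{num_images} image(s)] ' + '\n'.join(texts)
# e.g. _EchoModel().generate('Hello') -> '[0 image(s)] Hello'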
import torch
import transformers
from transformers import AutoModelForCausalLM, AutoTokenizer
from PIL import Image
import warnings
import re
from .base import BaseModel
from ..smp import *
from ..dataset import DATASET_TYPE
class BunnyLLama3(BaseModel):
INSTALL_REQ = False
INTERLEAVE = False
def __init__(self, model_path='BAAI/Bunny-v1_1-Llama-3-8B-V', **kwargs):
assert model_path is not None
transformers.logging.set_verbosity_error()
transformers.logging.disable_progress_bar()
warnings.filterwarnings('ignore')
self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
self.model = AutoModelForCausalLM.from_pretrained(model_path, device_map='auto', trust_remote_code=True)
self.kwargs = kwargs
def use_custom_prompt(self, dataset):
if listinstr(['MCQ', 'Y/N'], DATASET_TYPE(dataset)) or listinstr(['mathvista'], dataset.lower()):
return True
else:
return False
def build_prompt(self, line, dataset):
if dataset is None:
dataset = self.dataset
if isinstance(line, int):
line = self.data.iloc[line]
tgt_path = self.dump_image(line, dataset)
prompt = line['question']
if DATASET_TYPE(dataset) == 'MCQ':
if listinstr(['mmmu'], dataset.lower()):
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
assert hint is None
question = line['question']
question = re.sub(r'<image (\d+)>', lambda x: x.group(0)[1:-1], question)
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = '\n'
for key, item in options.items():
options_prompt += f'({key}) {item}\n'
prompt = question
if len(options):
prompt += options_prompt
prompt += "\nAnswer with the option's letter from the given choices directly."
else:
prompt += '\nAnswer the question using a single word or phrase.'
else:
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
prompt = ''
if hint is not None:
prompt += f'{hint}\n'
question = line['question']
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = '\n'
for key, item in options.items():
options_prompt += f'{key}. {item}\n'
prompt += question + options_prompt
if listinstr(['cn', 'ccbench'], dataset.lower()):
prompt += '请直接回答选项字母。'
else:
prompt += "Answer with the option's letter from the given choices directly."
elif DATASET_TYPE(dataset) == 'Y/N':
if listinstr(['mme'], dataset.lower()):
if not listinstr(
['code_reasoning', 'commonsense_reasoning', 'numerical_calculation', 'text_translation'],
line['category']):
prompt = prompt.replace(' Please answer yes or no.',
'\nAnswer the question using a single word or phrase.')
elif listinstr(['pope'], dataset.lower()):
prompt = prompt.replace(' Please answer yes or no.',
'\nAnswer the question using a single word or phrase.')
elif listinstr(['mathvista'], dataset.lower()):
match = re.search(r'Hint: (.*?)\nQuestion: (.*?)\n(Choices:\n(.*))?', prompt + '\n', re.DOTALL)
prompt = match.group(2)
if match.group(4) is not None:
prompt += '\n' + match.group(4).rstrip('\n')
prompt += '\n' + match.group(1)
else:
raise ValueError(
f"Bunny doesn't implement a custom prompt for {dataset}. It should use the default prompt, but didn't.")
msgs = []
if isinstance(tgt_path, list):
msgs.extend([dict(type='image', value=p) for p in tgt_path])
else:
msgs = [dict(type='image', value=tgt_path)]
msgs.append(dict(type='text', value=prompt))
return msgs
def generate_inner(self, message, dataset=None):
prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
text = (f'A chat between a curious user and an artificial intelligence assistant. '
f"The assistant gives helpful, detailed, and polite answers to the user's questions. "
f'USER: <image>\n{prompt} ASSISTANT:')
text_chunks = [self.tokenizer(chunk).input_ids for chunk in text.split('<image>')]
input_ids = torch.tensor(text_chunks[0] + [-200] + text_chunks[1][1:], dtype=torch.long).unsqueeze(0)
image = Image.open(image_path).convert('RGB')
image_tensor = self.model.process_images([image], self.model.config).to(dtype=self.model.dtype)
output_ids = self.model.generate(input_ids, images=image_tensor, max_new_tokens=128, use_cache=True)[0]
response = self.tokenizer.decode(output_ids[input_ids.shape[1]:], skip_special_tokens=True)
return response
import torch
from PIL import Image
from .base import BaseModel
from ..smp import *
import warnings
IMAGE_TOKEN_INDEX = -200
DEFAULT_IMAGE_TOKEN = '<image>'
DEFAULT_IM_START_TOKEN = '<im_start>'
DEFAULT_IM_END_TOKEN = '<im_end>'
class Cambrian(BaseModel):
INSTALL_REQ = True
INTERLEAVE = False
def __init__(self, model_path='nyu-visionx/cambrian-8b', **kwargs):
assert model_path is not None
try:
from cambrian.conversation import conv_templates, SeparatorStyle
from cambrian.model.builder import load_pretrained_model
from cambrian.mm_utils import tokenizer_image_token, process_images, get_model_name_from_path
        except ImportError as err:
            warnings.warn('Please install cambrian from https://github.com/cambrian-mllm/cambrian.')
            raise err
model_name = get_model_name_from_path(model_path)
tokenizer, model, image_processor, context_len = load_pretrained_model(
model_path,
None,
model_name,
device_map=None
)
if '8b' in model_path:
self.conv_mode = 'llama_3'
elif '13b' in model_path:
self.conv_mode = 'vicuna_v1'
else:
self.conv_mode = 'chatml_direct'
self.model_config = model.config
self.conv_templates = conv_templates
self.tokenizer_image_token = tokenizer_image_token
self.process_images = process_images
self.tokenizer = tokenizer
self.image_processor = image_processor
self.model = model.to('cuda')
def process(self, image, question):
if self.model_config.mm_use_im_start_end:
question = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + question
else:
question = DEFAULT_IMAGE_TOKEN + '\n' + question
conv = self.conv_templates[self.conv_mode].copy()
conv.append_message(conv.roles[0], question)
conv.append_message(conv.roles[1], None)
prompt = conv.get_prompt()
image_size = [image.size]
image_tensor = self.process_images([image], self.image_processor, self.model_config)
input_ids = self.tokenizer_image_token(prompt, self.tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt')
input_ids = input_ids.unsqueeze(0).cuda()
return input_ids, image_tensor, image_size, prompt
def generate_inner(self, message, dataset=None):
prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
image = Image.open(image_path).convert('RGB')
input_ids, image_tensor, image_sizes, prompt = self.process(image, prompt)
input_ids = input_ids.to(device='cuda', non_blocking=True)
with torch.inference_mode():
output_ids = self.model.generate(
input_ids,
images=image_tensor,
image_sizes=image_sizes,
do_sample=False,
temperature=0,
num_beams=1,
max_new_tokens=512,
use_cache=True
)
outputs = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0].strip()
return outputs
import os.path as osp
import warnings
from .base import BaseModel
from ..smp import *
from PIL import Image
import torch
class Chameleon(BaseModel):
INSTALL_REQ = False
INTERLEAVE = True
def __init__(self, model_path='facebook/chameleon-7b', **kwargs):
try:
from transformers import ChameleonProcessor, ChameleonForConditionalGeneration
        except ImportError as err:
            warnings.warn('Please install the latest transformers.')
            raise err
processor = ChameleonProcessor.from_pretrained(model_path)
model = ChameleonForConditionalGeneration.from_pretrained(model_path, torch_dtype=torch.bfloat16)
self.model = model.cuda().eval()
self.processor = processor
def generate_inner(self, message, dataset=None):
content, images = '', []
for x in message:
if x['type'] == 'text':
content += x['value']
elif x['type'] == 'image':
content += '<image>\n'
images.append(Image.open(x['value']))
inputs = self.processor(
text=[content],
images=images,
padding=True,
return_tensors='pt'
).to(device='cuda', dtype=torch.bfloat16)
generate_ids = self.model.generate(**inputs, max_new_tokens=512)
input_token_len = inputs.input_ids.shape[1]
text = self.processor.batch_decode(
generate_ids[:, input_token_len:],
skip_special_tokens=True,
clean_up_tokenization_spaces=False
)[0]
return text
import torch
from PIL import Image
from .base import BaseModel
from ..smp import *
from ..dataset import DATASET_TYPE
from transformers import AutoModelForCausalLM, LlamaTokenizer, AutoTokenizer
class GLM4v(BaseModel):
INSTALL_REQ = False
INTERLEAVE = False
def __init__(self, model_path='THUDM/glm-4v-9b', **kwargs):
assert model_path is not None
self.model_path = model_path
self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
self.model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
low_cpu_mem_usage=True,
trust_remote_code=True
).to('cuda').eval()
gen_kwargs = {'max_length': 2048, 'do_sample': False}
gen_kwargs.update(kwargs)
self.kwargs = gen_kwargs
self.end_text_token = '<|endoftext|>'
def generate_inner(self, message, dataset=None):
prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
image = Image.open(image_path).convert('RGB')
if dataset is not None and DATASET_TYPE(dataset) in ['MCQ', 'Y/N']:
prompt += '\nShort Answer.'
inputs = self.tokenizer.apply_chat_template(
[{'role': 'user', 'image': image, 'content': prompt}],
add_generation_prompt=True, tokenize=True, return_tensors='pt', return_dict=True
)
inputs = inputs.to('cuda')
with torch.no_grad():
outputs = self.model.generate(**inputs, **self.kwargs)
outputs = outputs[:, inputs['input_ids'].shape[1]:]
response = self.tokenizer.decode(outputs[0])
return response.split(self.end_text_token)[0]
class CogVlm(BaseModel):
INSTALL_REQ = False
INTERLEAVE = False
def __init__(self, model_path='THUDM/cogvlm2-llama3-chat-19B', tokenizer_name=None, **kwargs):
assert model_path is not None
model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
trust_remote_code=True,
).to('cuda').eval()
self.kwargs = kwargs
if tokenizer_name:
tokenizer = LlamaTokenizer.from_pretrained(tokenizer_name)
gen_kwargs = {'max_length': 2048, 'do_sample': False}
self.end_text_token = '</s>'
else:
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
gen_kwargs = {'max_new_tokens': 2048, 'pad_token_id': 128002}
self.end_text_token = '<|end_of_text|>'
self.kwargs.update(gen_kwargs)
self.tokenizer = tokenizer
self.model = model
def use_custom_prompt(self, dataset):
assert dataset is not None
if DATASET_TYPE(dataset) == 'MCQ':
return True
return False
def build_prompt(self, line, dataset=None):
assert dataset is None or isinstance(dataset, str)
assert self.use_custom_prompt(dataset)
tgt_path = self.dump_image(line, dataset)
if dataset is not None and DATASET_TYPE(dataset) == 'MCQ':
question = line['question']
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
if hint is not None:
question = hint + '\n' + question
option_candidate = string.ascii_uppercase
options = {
cand: line[cand]
for cand in option_candidate
if cand in line and not pd.isna(line[cand])
}
for key, item in options.items():
question += f'\n{key}. {item}'
prompt = question
if not cn_string(prompt):
prompt = prompt + '\n' + "Answer with the option's letter from the given choices directly."
else:
prompt = prompt + '\n' + '请直接回答选项字母。'
else:
prompt = line['question']
message = [dict(type='text', value=prompt)]
message.extend([dict(type='image', value=p) for p in tgt_path])
return message
def generate_inner(self, message, dataset=None):
prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
if dataset is not None and DATASET_TYPE(dataset) in ['MCQ', 'Y/N']:
prompt += '\nShort Answer.'
image = Image.open(image_path).convert('RGB')
inputs = self.model.build_conversation_input_ids(
self.tokenizer, query=prompt, history=[], images=[image]) # chat mode
inputs = {
'input_ids': inputs['input_ids'].unsqueeze(0).to('cuda'),
'token_type_ids': inputs['token_type_ids'].unsqueeze(0).to('cuda'),
'attention_mask': inputs['attention_mask'].unsqueeze(0).to('cuda'),
'images': [[inputs['images'][0].to('cuda').to(torch.bfloat16)]],
}
with torch.no_grad():
outputs = self.model.generate(**inputs, **self.kwargs)
outputs = outputs[:, inputs['input_ids'].shape[1]:]
response = self.tokenizer.decode(outputs[0])
response = response.split(self.end_text_token)[0].strip()
return response
import sys
import torch
from transformers import AutoModelForCausalLM
import warnings
from .base import BaseModel
class DeepSeekVL(BaseModel):
INSTALL_REQ = True
INTERLEAVE = True
def check_install(self):
try:
import deepseek_vl
except ImportError:
warnings.warn(
'Please first install deepseek_vl from source codes in: https://github.com/deepseek-ai/DeepSeek-VL')
sys.exit(-1)
def __init__(self, model_path='deepseek-ai/deepseek-vl-1.3b-chat', **kwargs):
self.check_install()
assert model_path is not None
self.model_path = model_path
from deepseek_vl.models import VLChatProcessor
self.vl_chat_processor = VLChatProcessor.from_pretrained(model_path)
self.tokenizer = self.vl_chat_processor.tokenizer
model = AutoModelForCausalLM.from_pretrained(model_path, trust_remote_code=True)
self.model = model.to(torch.bfloat16).cuda().eval()
torch.cuda.empty_cache()
default_kwargs = dict(max_new_tokens=512, do_sample=False, use_cache=True)
default_kwargs.update(kwargs)
self.kwargs = default_kwargs
warnings.warn(f'Following kwargs received: {self.kwargs}, will use as generation config. ')
def prepare_inputs(self, message):
def prepare_itlist(msgs):
content, images = '', []
for s in msgs:
if s['type'] == 'image':
images.append(s['value'])
content += '<image_placeholder>'
elif s['type'] == 'text':
content += s['value']
return content, images
conversation = []
if 'role' not in message[0]:
content, images = prepare_itlist(message)
conversation.append(dict(role='User', content=content, images=images))
else:
role_map = {'user': 'User', 'assistant': 'Assistant'}
for msgs in message:
role = role_map[msgs['role']]
content, images = prepare_itlist(msgs['content'])
conversation.append(dict(role=role, content=content, images=images))
conversation.append(dict(role='Assistant', content=''))
return conversation
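    # The conversation built above follows the DeepSeek-VL chat format: a list
    # of dicts such as
    #   [{'role': 'User', 'content': '<image_placeholder>What is this?',
    #     'images': ['/path/to/img.jpg']},
    #    {'role': 'Assistant', 'content': ''}]
    # with one '<image_placeholder>' token per image in the content string
    # (the question text and image path above are placeholders).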
def generate_inner(self, message, dataset=None):
conversation = self.prepare_inputs(message)
from deepseek_vl.utils.io import load_pil_images
pil_images = load_pil_images(conversation)
prepare_inputs = self.vl_chat_processor(conversations=conversation, images=pil_images, force_batchify=True)
prepare_inputs = prepare_inputs.to(self.model.device)
inputs_embeds = self.model.prepare_inputs_embeds(**prepare_inputs)
outputs = self.model.language_model.generate(
inputs_embeds=inputs_embeds,
attention_mask=prepare_inputs.attention_mask,
pad_token_id=self.tokenizer.eos_token_id,
bos_token_id=self.tokenizer.bos_token_id,
eos_token_id=self.tokenizer.eos_token_id,
**self.kwargs)
answer = self.tokenizer.decode(outputs[0].cpu().tolist(), skip_special_tokens=True)
return answer
def chat_inner(self, message, dataset=None):
return self.generate_inner(message, dataset=dataset)
import torch
from PIL import Image
from abc import abstractproperty
import sys
import os.path as osp
from .base import BaseModel
from ..smp import *
from ..dataset import DATASET_TYPE
import copy
class Eagle(BaseModel):
INSTALL_REQ = True
INTERLEAVE = True
def __init__(self,
model_path='NVEagle/Eagle-X5-7B',
**kwargs):
try:
from eagle.model.builder import load_pretrained_model
from eagle.utils import disable_torch_init
from eagle.mm_utils import get_model_name_from_path
        except ImportError:
warnings.warn('''Please install eagle before using Eagle,
you can install it from "https://github.com/NVlabs/EAGLE.git"''')
sys.exit(-1)
warnings.warn('Please install the latest version of eagle from github before you evaluate the Eagle model.')
assert osp.exists(model_path) or splitlen(model_path) == 2
model_name = get_model_name_from_path(model_path)
self.tokenizer, self.model, self.image_processor, self.context_len = (
load_pretrained_model(model_path, None, model_name, False, False, device='cpu')
)
self.model.cuda().eval()
self.conv_mode = 'vicuna_v1'
default_kwargs = dict(
do_sample=True,
temperature=0.2,
top_p=0.5,
num_beams=1,
max_new_tokens=512,
use_cache=True
)
default_kwargs.update(kwargs)
self.kwargs = default_kwargs
warnings.warn(f'Following kwargs received: {self.kwargs}, will use as generation config. ')
torch.cuda.empty_cache()
def generate_inner(self, message, dataset=None):
try:
from eagle import conversation as conversation_lib
from eagle.constants import (IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN,
DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN)
from eagle.conversation import conv_templates, SeparatorStyle
from eagle.mm_utils import tokenizer_image_token, process_images, KeywordsStoppingCriteria
        except ImportError:
warnings.warn('''Please install eagle before using Eagle,
you can install it from "https://github.com/NVlabs/EAGLE.git"''')
sys.exit(-1)
kwargs = {}
if dataset is not None:
kwargs = self.kwargs
images = []
prompt = ''
for s in message:
if s['type'] == 'image':
images.append(s['value'])
elif s['type'] == 'text':
prompt += s['value']
DEFAULT_IMAGE_TOKEN = DEFAULT_IMAGE_TOKEN * len(images)
if self.model.config.mm_use_im_start_end:
prompt = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + prompt
else:
prompt = DEFAULT_IMAGE_TOKEN + '\n' + prompt
conv = conv_templates[self.conv_mode].copy()
conv.append_message(conv.roles[0], prompt)
conv.append_message(conv.roles[1], None)
prompt = conv.get_prompt()
images = [Image.open(s).convert('RGB') for s in images]
image_tensor = process_images(images, self.image_processor, self.model.config)
input_ids = tokenizer_image_token(prompt, self.tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt')
input_ids = input_ids.to(device='cuda', non_blocking=True)
image_tensor = image_tensor.to(dtype=torch.float16, device='cuda', non_blocking=True)
with torch.inference_mode():
output_ids = self.model.generate(
input_ids.unsqueeze(0),
images=image_tensor,
image_sizes=[img.size for img in images],
**kwargs
)
outputs = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0].strip()
return outputs
def use_custom_prompt(self, dataset):
assert dataset is not None
if listinstr(['MMMU'], dataset):
return False
if DATASET_TYPE(dataset) == 'MCQ' or dataset == 'MMVet':
return True
return False
def build_prompt(self, line, dataset=None):
assert dataset is None or isinstance(dataset, str)
assert self.use_custom_prompt(dataset)
tgt_path = self.dump_image(line, dataset)
question = line['question']
if dataset == 'MMVet':
prompt = question + '\nAnswer the question directly. '
elif DATASET_TYPE(dataset) == 'MCQ':
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = ''
for key, item in options.items():
options_prompt += f'{key}. {item}\n'
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
prompt = f'Hint: {hint}\n' if hint is not None else ''
prompt += f'{question}\n'
prompt += (
f'{options_prompt}\nAnswer with the option’s letter from the given choices directly. '
if len(options) else 'Answer the question directly. '
)
else:
raise NotImplementedError
message = [dict(type='text', value=prompt)]
message.extend([dict(type='image', value=s) for s in tgt_path])
return message
import os
import torch
from PIL import Image
import os.path as osp
from .base import BaseModel
from ..smp import *
class Emu(BaseModel):
INSTALL_REQ = False
INTERLEAVE = True
def __init__(self,
model_path='BAAI/Emu2-Chat',
**kwargs):
self.model_path = model_path
assert osp.exists(model_path) or splitlen(model_path) == 2
from transformers import AutoModelForCausalLM, AutoTokenizer
from accelerate import init_empty_weights, infer_auto_device_map, dispatch_model
        local_rank = int(os.environ.get('LOCAL_RANK', 0))
device_num = torch.cuda.device_count()
assert local_rank * 2 <= device_num, 'The number of devices does not match the world size'
assert device_num >= 2, 'You need at least 2 GPUs to use EMU'
device_1 = local_rank
device_2 = local_rank + device_num // 2
torch.cuda.set_device(device_1)
torch.cuda.set_device(device_2)
tokenizer = AutoTokenizer.from_pretrained(model_path) # "BAAI/Emu2-Chat"
self.tokenizer = tokenizer
with init_empty_weights():
model = AutoModelForCausalLM.from_pretrained(
model_path, # "BAAI/Emu2-Chat"
torch_dtype=torch.bfloat16,
low_cpu_mem_usage=True,
trust_remote_code=True)
device_map = infer_auto_device_map(
model,
max_memory={
device_1: '38GiB',
device_2: '38GiB'
},
no_split_module_classes=['Block', 'LlamaDecoderLayer'])
# input and output logits should be on same device
device_map['model.decoder.lm.lm_head'] = device_1
model = dispatch_model(
model,
device_map=device_map).eval()
self.model = model
kwargs_default = dict(max_new_tokens=512, length_penalty=-1)
kwargs_default.update(kwargs)
self.kwargs = kwargs_default
warnings.warn(f'Following kwargs received: {self.kwargs}, will use as generation config. ')
def generate_inner(self, message, dataset=None):
query, images = '', []
for item in message:
if item['type'] == 'image':
images.append(Image.open(item['value']).convert('RGB'))
query += '[<IMG_PLH>]'
elif item['type'] == 'text':
query += item['value']
inputs = self.model.build_input_ids(
text=[query],
tokenizer=self.tokenizer,
image=images
)
with torch.no_grad():
outputs = self.model.generate(
input_ids=inputs['input_ids'],
attention_mask=inputs['attention_mask'],
image=inputs['image'].to(torch.bfloat16),
**self.kwargs)
output_text = self.tokenizer.batch_decode(outputs, skip_special_tokens=True)
return output_text[0]
import torch
import os.path as osp
import warnings
from .base import BaseModel
from ..smp import splitlen
from PIL import Image
from transformers import AutoProcessor, AutoModelForVision2Seq
from transformers.image_utils import load_image
class IDEFICS(BaseModel):
INSTALL_REQ = False
INTERLEAVE = True
def __init__(self, model_path='HuggingFaceM4/idefics-9b-instruct', **kwargs):
assert osp.exists(model_path) or splitlen(model_path) == 2
from transformers import IdeficsForVisionText2Text, AutoProcessor
self.model = IdeficsForVisionText2Text.from_pretrained(
model_path, torch_dtype=torch.bfloat16, device_map='auto'
)
self.processor = AutoProcessor.from_pretrained(model_path)
kwargs_default = {'max_new_tokens': 512}
kwargs_default.update(kwargs)
self.kwargs = kwargs_default
self.file_root = osp.dirname(__file__)
warnings.warn(
f'Following kwargs received: {self.kwargs}, will use as generation config. '
)
def generate_inner(self, message, dataset=None):
prompts = (
['Users:']
+ [msg['value'] if msg['type'] == 'text' else Image.open(msg['value']) for msg in message]
+ ['<end_of_utterance>', '\nAssistant: ']
)
inputs = self.processor(
prompts, add_end_of_utterance_token=False, return_tensors='pt'
).to('cuda')
exit_condition = self.processor.tokenizer(
'<end_of_utterance>', add_special_tokens=False
).input_ids
bad_words_ids = self.processor.tokenizer(
['<image>', '<fake_token_around_image>'], add_special_tokens=False
).input_ids
generated_ids = self.model.generate(
**inputs,
eos_token_id=exit_condition,
bad_words_ids=bad_words_ids,
**self.kwargs,
)
generated_text = self.processor.batch_decode(
generated_ids, skip_special_tokens=True
)
text = generated_text[0].split('\nAssistant: ')[-1]
return text
class IDEFICS2(BaseModel):
INSTALL_REQ = True
INTERLEAVE = True
def __init__(self, model_path='HuggingFaceM4/idefics2-8b', **kwargs):
assert model_path is not None
self.model_path = model_path
        if 'idefics3' in self.model_path.lower():
            warnings.warn('Install transformers from source: PR https://github.com/open-compass/VLMEvalKit/pull/379')
            warnings.warn('Reference: https://huggingface.co/HuggingFaceM4/Idefics3-8B-Llama3')
self.processor = AutoProcessor.from_pretrained(model_path)
model = AutoModelForVision2Seq.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
_attn_implementation='flash_attention_2',
device_map='cpu')
self.model = model.to('cuda')
kwargs_default = {'max_new_tokens': 1024}
kwargs_default.update(kwargs)
self.kwargs = kwargs_default
warnings.warn(
f'Following kwargs received: {self.kwargs}, will use as generation config. '
)
torch.cuda.empty_cache()
def _process(self, formatted_messages, formatted_images):
inputs = self.processor(
text=formatted_messages, images=formatted_images, return_tensors='pt'
)
inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
return inputs
def build_prompt_default(self, message, add_brief=False, add_yes_or_no=False):
prompt, images = 'User:', []
for msg in message:
if msg['type'] == 'image':
img = load_image(msg['value'])
images.append(img)
prompt += '<image>'
elif msg['type'] == 'text':
prompt += msg['value'].strip()
if add_brief:
prompt += '\nGive a very brief answer.'
if add_yes_or_no:
prompt += '\nAnswer yes or no.'
prompt += '<end_of_utterance>\nAssistant:'
return prompt, images
def build_prompt_puremcq(self, message):
replace_mapping = {
'\nOptions:': '\nChoices:',
'Please select the correct answer from the options above.': 'Answer with the letter.',
}
prompt, images = 'User:', []
for msg in message:
if msg['type'] == 'image':
img = load_image(msg['value'])
images.append(img)
prompt += '<image>'
elif msg['type'] == 'text':
instruction = msg['value'].strip()
for k, v in replace_mapping.items():
instruction = instruction.replace(k, v)
prompt += instruction
prompt += '<end_of_utterance>\nAssistant: Answer:'
return prompt, images
def build_prompt_mt(self, message):
prompt, images = '', []
for msg in message:
if msg['role'] == 'user':
prompt += 'User: '
elif msg['role'] == 'assistant':
prompt += 'Assistant: '
for item in msg['content']:
if item['type'] == 'image':
img = load_image(item['value'])
images.append(img)
prompt += '<image>'
elif item['type'] == 'text':
prompt += item['value'].strip()
prompt += '<end_of_utterance>\n'
        return prompt + 'Assistant: ', images
def build_prompt_mmbench(self, message):
replace_mapping = {
'\nOptions:': '\nChoices:',
'Please select the correct answer from the options above.': 'Answer with a letter.',
}
prompt, images = 'User:', []
for msg in message:
if msg['type'] == 'image':
img = load_image(msg['value'])
images.append(img)
prompt += '<image>'
elif msg['type'] == 'text':
instruction = msg['value'].strip()
for k, v in replace_mapping.items():
instruction = instruction.replace(k, v)
# Swap hint and question
if instruction.startswith('Hint:'):
hint, question = instruction.split('\nQuestion:')
question, choices = question.split('\nChoices:')
instruction = (
'Question:' + question + '\n' + hint + '\nChoices:' + choices
)
prompt += instruction
prompt += '<end_of_utterance>\nAssistant: Answer:'
return prompt, images
def build_prompt_mmmu(self, message):
replace_mapping = {
'Question:': '',
'Please select the correct answer from the options above.': 'Answer with the letter.',
'\nOptions:': '\nChoices:',
}
prompt, images, img_counter = 'User: Question: ', [], 1
for msg in message:
if msg['type'] == 'image':
prompt += f'<image {img_counter}>:<image>\n'
img_counter += 1
img_counter = 1
for msg in message:
if msg['type'] == 'image':
img = load_image(msg['value'])
images.append(img)
prompt += f' <image {img_counter}> '
img_counter += 1
elif msg['type'] == 'text':
instruction = msg['value'].strip()
for k, v in replace_mapping.items():
instruction = instruction.replace(k, v)
prompt += instruction.strip()
prompt += '<end_of_utterance>\nAssistant:'
if 'A.' in prompt and 'B.' in prompt:
prompt += ' Answer:'
return prompt, images
def build_prompt_mathvista(self, message):
replace_mapping = {
'(A) ': 'A. ',
'(B) ': 'B. ',
'(C) ': 'C. ',
'(D) ': 'D. ',
'(E) ': 'E. ',
'(F) ': 'F. ',
'(G) ': 'G. ',
'(H) ': 'H. ',
'\nOptions:': '\nChoices:',
'Hint: ': '',
}
prompt, images = 'User:', []
for msg in message:
if msg['type'] == 'image':
img = load_image(msg['value'])
images.append(img)
prompt += '<image>'
elif msg['type'] == 'text':
instruction = msg['value'].strip()
for k, v in replace_mapping.items():
instruction = instruction.replace(k, v)
prompt += instruction.strip()
if 'A.' in prompt and 'B.' in prompt:
prompt += '\nAnswer with the letter.'
prompt += '<end_of_utterance>\nAssistant:'
if 'A.' in prompt and 'B.' in prompt:
prompt += ' Answer:'
return prompt, images
def chat_inner(self, message, dataset=None):
formatted_messages, formatted_images = self.build_prompt_mt(message)
inputs = self._process(formatted_messages, formatted_images)
generated_ids = self.model.generate(**inputs, **self.kwargs)
generated_text = self.processor.batch_decode(
generated_ids[:, inputs['input_ids'].size(1):], skip_special_tokens=True
)[0]
response = generated_text.strip()
# print(dataset, " | ", formatted_messages.replace("\n", "\\n"), " | ", response.replace("\n", "\\n"))
return response
def generate_inner(self, message, dataset=None):
if dataset in [
'MMBench_DEV_EN', 'MMBench_DEV_EN_V11',
'MMBench_TEST_EN', 'MMBench_TEST_EN_V11',
'MMBench_DEV_CN', 'MMBench_DEV_CN_V11',
'MMBench_TEST_CN', 'MMBench_TEST_CN_V11',
'MMBench', 'MMBench_V11', 'MMBench_CN', 'MMBench_CN_V11'
]:
formatted_messages, formatted_images = self.build_prompt_mmbench(message)
elif dataset in ['MMMU_DEV_VAL', 'MMMU_TEST']:
formatted_messages, formatted_images = self.build_prompt_mmmu(message)
elif dataset in ['MathVista_MINI']:
formatted_messages, formatted_images = self.build_prompt_mathvista(message)
elif dataset in [
'MME',
'MMVet',
'OCRVQA_TEST',
'OCRVQA_TESTCORE',
'TextVQA_VAL',
'ChartQA_TEST',
'DocVQA_VAL',
'DocVQA_TEST',
'InfoVQA_VAL',
'InfoVQA_TEST',
]:
formatted_messages, formatted_images = self.build_prompt_default(
message, add_brief=True
)
elif dataset == 'HallusionBench':
formatted_messages, formatted_images = self.build_prompt_default(
message, add_yes_or_no=True
)
elif dataset in [
'MMStar',
'SEEDBench_IMG',
'AI2D_TEST',
'ScienceQA_VAL',
'ScienceQA_TEST',
]:
formatted_messages, formatted_images = self.build_prompt_puremcq(message)
else:
formatted_messages, formatted_images = self.build_prompt_default(message)
inputs = self._process(formatted_messages, formatted_images)
generated_ids = self.model.generate(**inputs, **self.kwargs)
generated_text = self.processor.batch_decode(
generated_ids[:, inputs['input_ids'].size(1):], skip_special_tokens=True
)[0]
response = generated_text.strip()
# print(dataset, " | ", formatted_messages.replace("\n", "\\n"), " | ", response.replace("\n", "\\n"))
return response
import torch
from PIL import Image
import os.path as osp
import sys
from .base import BaseModel
from ..smp import *
class InstructBLIP(BaseModel):
INSTALL_REQ = True
INTERLEAVE = False
def __init__(self, name):
self.config_map = {
'instructblip_7b': 'misc/blip2_instruct_vicuna7b.yaml',
'instructblip_13b': 'misc/blip2_instruct_vicuna13b.yaml',
}
self.file_path = __file__
config_root = osp.dirname(self.file_path)
try:
from lavis.models import load_preprocess
from omegaconf import OmegaConf
from lavis.common.registry import registry
        except ImportError:
warnings.warn('Please install lavis before using InstructBLIP. ')
sys.exit(-1)
assert name in self.config_map
cfg_path = osp.join(config_root, self.config_map[name])
cfg = OmegaConf.load(cfg_path)
model_cfg = cfg.model
assert osp.exists(model_cfg.llm_model) or splitlen(model_cfg.llm_model) == 2
model_cls = registry.get_model_class(name='blip2_vicuna_instruct')
model = model_cls.from_config(model_cfg)
model.eval()
self.device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'
device = self.device
model.to(device)
self.model = model
self.kwargs = {'max_length': 512}
preprocess_cfg = cfg.preprocess
vis_processors, _ = load_preprocess(preprocess_cfg)
self.vis_processors = vis_processors
def generate_inner(self, message, dataset=None):
prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
vis_processors = self.vis_processors
raw_image = Image.open(image_path).convert('RGB')
image_tensor = vis_processors['eval'](raw_image).unsqueeze(0).to(self.device)
outputs = self.model.generate(dict(image=image_tensor, prompt=prompt))
return outputs[0]
import torch
from transformers import AutoTokenizer, AutoConfig, AutoModel, CLIPImageProcessor
import warnings
from PIL import Image
from .base import BaseModel
from ..smp import *
from ..dataset import DATASET_TYPE
import pandas as pd
import string
import torch.distributed as dist
import torchvision.transforms as T
import transformers
from torchvision.transforms.functional import InterpolationMode
import re
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)
def build_transform(input_size):
MEAN, STD = IMAGENET_MEAN, IMAGENET_STD
transform = T.Compose([
T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
T.ToTensor(),
T.Normalize(mean=MEAN, std=STD)
])
return transform
def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
best_ratio_diff = float('inf')
best_ratio = (1, 1)
area = width * height
for ratio in target_ratios:
target_aspect_ratio = ratio[0] / ratio[1]
ratio_diff = abs(aspect_ratio - target_aspect_ratio)
if ratio_diff < best_ratio_diff:
best_ratio_diff = ratio_diff
best_ratio = ratio
elif ratio_diff == best_ratio_diff:
if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
best_ratio = ratio
return best_ratio
def dynamic_preprocess(image, min_num=1, max_num=6, image_size=448, use_thumbnail=False):
orig_width, orig_height = image.size
aspect_ratio = orig_width / orig_height
# calculate the existing image aspect ratio
target_ratios = set(
(i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if
i * j <= max_num and i * j >= min_num)
target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])
# find the closest aspect ratio to the target
target_aspect_ratio = find_closest_aspect_ratio(
aspect_ratio, target_ratios, orig_width, orig_height, image_size)
# calculate the target width and height
target_width = image_size * target_aspect_ratio[0]
target_height = image_size * target_aspect_ratio[1]
blocks = target_aspect_ratio[0] * target_aspect_ratio[1]
# resize the image
resized_img = image.resize((target_width, target_height))
processed_images = []
for i in range(blocks):
box = (
(i % (target_width // image_size)) * image_size,
(i // (target_width // image_size)) * image_size,
((i % (target_width // image_size)) + 1) * image_size,
((i // (target_width // image_size)) + 1) * image_size
)
# split the image
split_img = resized_img.crop(box)
processed_images.append(split_img)
assert len(processed_images) == blocks
if use_thumbnail and len(processed_images) != 1:
thumbnail_img = image.resize((image_size, image_size))
processed_images.append(thumbnail_img)
return processed_images
def load_image(image_file, input_size=448, max_num=6, upscale=False):
image = Image.open(image_file).convert('RGB')
if upscale:
image = image.resize((image.width * 2, image.height * 2), Image.BILINEAR)
transform = build_transform(input_size=input_size)
images = dynamic_preprocess(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
pixel_values = [transform(image) for image in images]
pixel_values = torch.stack(pixel_values)
return pixel_values
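

# ---------------------------------------------------------------------------
# Illustrative sketch only (the image path below is hypothetical): `load_image`
# tiles an image into up to `max_num` 448x448 patches via `dynamic_preprocess`
# and appends a global thumbnail, yielding a (num_patches, 3, 448, 448) tensor.
def _demo_load_image(image_file='demo.jpg'):
    pixel_values = load_image(image_file, input_size=448, max_num=6)
    # pixel_values.size(0) equals the number of tiles plus one thumbnail
    # (or exactly 1 when the whole image fits in a single tile).
    return pixel_values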
# This function is used to split large InternVL2 models (e.g. InternVL2-Llama3-76B) across multiple GPUs
def split_model(model_name):
import math
device_map = {}
num_gpus = torch.cuda.device_count()
rank, world_size = get_rank_and_world_size()
num_gpus = num_gpus // world_size
num_layers = {'InternVL2-8B': 32, 'InternVL2-26B': 48,
'InternVL2-40B': 60, 'InternVL2-Llama3-76B': 80}[model_name]
# Since the first GPU will be used for ViT, treat it as 0.8 GPU.
num_layers_per_gpu = math.ceil(num_layers / (num_gpus - 0.2))
num_layers_per_gpu = [num_layers_per_gpu] * num_gpus
num_layers_per_gpu[0] = math.ceil(num_layers_per_gpu[0] * 0.8)
layer_cnt = 0
for i, num_layer in enumerate(num_layers_per_gpu):
for j in range(num_layer):
device_map[f'language_model.model.layers.{layer_cnt}'] = rank + world_size * i
layer_cnt += 1
device_map['vision_model'] = rank
device_map['mlp1'] = rank
device_map['language_model.model.tok_embeddings'] = rank
device_map['language_model.model.embed_tokens'] = rank
device_map['language_model.output'] = rank
device_map['language_model.model.norm'] = rank
device_map['language_model.lm_head'] = rank
device_map[f'language_model.model.layers.{num_layers - 1}'] = rank
return device_map
class InternVLChat(BaseModel):
INSTALL_REQ = False
INTERLEAVE = True
def __init__(self, model_path='OpenGVLab/InternVL-Chat-V1-5', load_in_8bit=False, version='V1.0', **kwargs):
assert model_path is not None
assert version_cmp(transformers.__version__, '4.36.2', 'ge')
self.model_path = model_path
self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True, use_fast=False)
# Regular expression to match the pattern 'Image' followed by a number, e.g. Image1
self.pattern = r'Image(\d+)'
# Replacement pattern to insert a hyphen between 'Image' and the number, e.g. Image-1
self.replacement = r'Image-\1'
# Convert InternVL2 response to dataset format
# e.g. Image1 -> Image-1
# Regular expression to match the pattern 'Image-' followed by a number
self.reverse_pattern = r'Image-(\d+)'
# Replacement pattern to remove the hyphen (Image-1 -> Image1)
self.reverse_replacement = r'Image\1'
if listinstr(['InternVL2-Llama3-76B'], model_path):
device_map = split_model(model_path.split('/')[-1])
self.device = 'cuda'
self.model = AutoModel.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
load_in_8bit=load_in_8bit,
trust_remote_code=True,
low_cpu_mem_usage=True,
device_map=device_map).eval()
else:
device = torch.cuda.current_device()
self.device = device
self.model = AutoModel.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
trust_remote_code=True,
load_in_8bit=load_in_8bit).eval()
if not load_in_8bit:
self.model = self.model.to(device)
self.image_size = self.model.config.vision_config.image_size
self.version = version
kwargs_default = dict(do_sample=False, max_new_tokens=1024, top_p=None, num_beams=1)
kwargs_default.update(kwargs)
self.kwargs = kwargs_default
warnings.warn(f'Following kwargs received: {self.kwargs}, will use as generation config. ')
def use_custom_prompt(self, dataset):
assert dataset is not None
if listinstr(['MMDU', 'MME-RealWorld', 'MME-RealWorld-CN'], dataset):
# For Multi-Turn we don't have custom prompt
return False
if listinstr(['MMBench-Video', 'Video-MME', 'MVBench', 'Video'], dataset):
            # For video benchmarks we don't have a custom prompt here
return False
else:
return True
def build_multi_choice_prompt(self, line, dataset=None):
question = line['question']
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
if hint is not None:
question = hint + '\n' + question
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
for key, item in options.items():
question += f'\n{key}. {item}'
prompt = question
if len(options):
prompt += '\n请直接回答选项字母。' if cn_string(
prompt) else "\nAnswer with the option's letter from the given choices directly."
else:
prompt += '\n请直接回答问题。' if cn_string(prompt) else '\nAnswer the question directly.'
return prompt
def build_video_prompt(self, prompt, dataset=None, max_frames=64):
for start in range(0, max_frames, 8):
images_to_remove = ''.join([f'<Image-{i}>' for i in range(start + 1, start + 9)])
prompt = prompt.replace(images_to_remove, '')
for i in range(max_frames):
prompt = prompt.replace(f'Image-{i + 1}', f'Frame-{i + 1}')
if listinstr(['MMBench-Video'], dataset):
prompt = prompt.replace('\nAnswer:', '')
elif listinstr(['Video-MME'], dataset):
prompt = prompt.replace('\nAnswer:', '')
prompt += "\nAnswer with the option's letter from the given choices directly."
elif listinstr(['MVBench'], dataset):
prompt = prompt.replace('Best option:(', '')
return prompt
def build_prompt(self, line, dataset=None):
assert self.use_custom_prompt(dataset)
assert dataset is None or isinstance(dataset, str)
tgt_path = self.dump_image(line, dataset)
if self.version == 'V1.1':
kwargs_default = dict(do_sample=False, max_new_tokens=1024, top_p=None, num_beams=5)
else:
kwargs_default = dict(do_sample=False, max_new_tokens=1024, top_p=None, num_beams=1)
self.kwargs = kwargs_default
if dataset is not None and DATASET_TYPE(dataset) == 'Y/N':
question = line['question']
if listinstr(['MME'], dataset):
prompt = question + ' Answer the question using a single word or phrase.'
            elif listinstr(['HallusionBench'], dataset):
                prompt = question + ' Please answer yes or no. Answer the question using a single word or phrase.'
            else:
                # Fallback so that `prompt` is always defined for other Y/N datasets.
                prompt = question + ' Answer the question using a single word or phrase.'
elif dataset is not None and DATASET_TYPE(dataset) == 'MCQ':
prompt = self.build_multi_choice_prompt(line, dataset)
elif dataset is not None and DATASET_TYPE(dataset) == 'VQA':
question = line['question']
if listinstr(['MathVista', 'MathVision', 'VCR', 'MTVQA', 'MMVet', 'MathVerse'], dataset):
prompt = question
elif listinstr(['LLaVABench'], dataset):
prompt = question + '\nAnswer this question in detail.'
else:
prompt = question + '\nAnswer the question using a single word or phrase.'
else:
prompt = line['question']
message = [dict(type='text', value=prompt)]
message.extend([dict(type='image', value=s) for s in tgt_path])
return message
def set_max_num(self, dataset):
assert dataset is not None
res_1_datasets = ['MMBench-Video', 'Video-MME', 'MVBench', 'Video']
        res_12_datasets = ['ChartQA_TEST', 'MMMU_DEV_VAL', 'MMMU_TEST',
                           'MME-RealWorld', 'VCR_EN', 'VCR_ZH']
res_18_datasets = ['DocVQA_VAL', 'DocVQA_TEST']
res_24_datasets = ['InfoVQA_VAL', 'InfoVQA_TEST', 'OCRBench', 'HRBench4K', 'HRBench8K']
if listinstr(res_1_datasets, dataset):
self.max_num = 1
elif listinstr(res_12_datasets, dataset):
self.max_num = 12
elif listinstr(res_18_datasets, dataset):
self.max_num = 18
elif listinstr(res_24_datasets, dataset):
self.max_num = 24
else:
self.max_num = 6
def generate_v1_2(self, message, dataset=None):
self.INTERLEAVE = False
prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
image = Image.open(image_path).convert('RGB')
image = image.resize((self.image_size, self.image_size))
image_processor = CLIPImageProcessor.from_pretrained(self.model_path)
pixel_values = image_processor(images=image, return_tensors='pt').pixel_values
pixel_values = pixel_values.to(torch.bfloat16).to(self.device)
with torch.no_grad():
response = self.model.chat(self.tokenizer, pixel_values=pixel_values,
question=prompt, generation_config=self.kwargs)
return response
def generate_v1_5(self, message, dataset=None):
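        # V1.5 path: each image is tiled by load_image (up to self.max_num patches),
        # the tiles are concatenated along the batch dimension, and model.chat is called
        # with the joined text prompt.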
image_num = len([x for x in message if x['type'] == 'image'])
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
if listinstr(['Video'], dataset):
prompt = self.build_video_prompt(prompt, dataset)
if image_num > 1:
image_path = [x['value'] for x in message if x['type'] == 'image']
pixel_values_list = []
for file_name in image_path:
pixel_values_list.append(load_image(file_name, max_num=self.max_num).to(self.device).to(torch.bfloat16))
pixel_values = torch.cat(pixel_values_list, dim=0)
elif image_num == 1:
image_path = [x['value'] for x in message if x['type'] == 'image'][0]
pixel_values = load_image(image_path, max_num=self.max_num).to(self.device).to(torch.bfloat16)
else:
pixel_values = None
with torch.no_grad():
response = self.model.chat(
self.tokenizer,
pixel_values=pixel_values,
question=prompt,
generation_config=self.kwargs,
verbose=False)
return response
def generate_v2(self, message, dataset=None):
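        # V2.0 path: interleaved text/images. Multi-image prompts get an 'Image-i: <image>'
        # header plus an inline <Image-i> placeholder per image, and num_patches_list records
        # how many tiles load_image produced for each image before calling model.chat.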
image_num = len([x for x in message if x['type'] == 'image'])
if image_num == 1:
prompt = '<image>\n' + '\n'.join([x['value'] for x in message if x['type'] == 'text'])
else:
prompt, image_idx = '', 1
for x in message:
if x['type'] == 'text':
prompt += x['value']
elif x['type'] == 'image':
prompt += f'<Image-{image_idx}>'
image_idx += 1
prompt = '\n'.join([f'Image-{i + 1}: <image>' for i in range(image_num)]) + '\n' + prompt
if listinstr(['Video', 'MVBench'], dataset):
prompt = self.build_video_prompt(prompt, dataset)
if image_num > 1:
image_path = [x['value'] for x in message if x['type'] == 'image']
num_patches_list = []
pixel_values_list = []
for image_idx, file_name in enumerate(image_path):
upscale_flag = image_idx == 0 and dataset is not None and listinstr(['MMMU_DEV_VAL'], dataset)
curr_pixel_values = load_image(
file_name, max_num=self.max_num, upscale=upscale_flag).to(self.device).to(torch.bfloat16)
num_patches_list.append(curr_pixel_values.size(0))
pixel_values_list.append(curr_pixel_values)
pixel_values = torch.cat(pixel_values_list, dim=0)
elif image_num == 1:
image_path = [x['value'] for x in message if x['type'] == 'image'][0]
upscale_flag = listinstr(['MMMU_DEV_VAL'], dataset)
pixel_values = load_image(
image_path, max_num=self.max_num, upscale=upscale_flag).to(self.device).to(torch.bfloat16)
num_patches_list = [pixel_values.size(0)]
else:
pixel_values = None
num_patches_list = []
with torch.no_grad():
response = self.model.chat(
self.tokenizer,
pixel_values=pixel_values,
num_patches_list=num_patches_list,
question=prompt,
generation_config=self.kwargs,
verbose=False
)
return response
def generate_inner(self, message, dataset=None):
self.set_max_num(dataset)
print(f'InternVL model version: {self.version}')
if self.version in ['V1.1', 'V1.2']:
return self.generate_v1_2(message, dataset)
elif self.version == 'V1.5':
return self.generate_v1_5(message, dataset)
elif self.version == 'V2.0':
return self.generate_v2(message, dataset)
else:
raise ValueError(f'Unsupported version: {self.version}')
def build_history(self, message):
        # State shared with the closure below
image_path = []
image_cnt = 0
def concat_tilist(tilist):
nonlocal image_cnt # Declare image_cnt as nonlocal to modify it
prompt = ''
for item in tilist:
# Substitute the pattern in the text
if item['type'] == 'text':
prompt += re.sub(self.pattern, self.replacement, item['value'])
elif item['type'] == 'image':
image_cnt += 1
prompt += '<image>\n'
image_path.append(item['value'])
return prompt
# Only previous messages
assert len(message) % 2 == 0
history = []
for i in range(len(message) // 2):
m1, m2 = message[2 * i], message[2 * i + 1]
assert m1['role'] == 'user' and m2['role'] == 'assistant'
history.append((concat_tilist(m1['content']), concat_tilist(m2['content'])))
return history, image_path, image_cnt
def chat_inner_v2(self, message, dataset=None):
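        # Multi-turn path for V2.0: earlier turns are folded into `history` via build_history,
        # image tags in the last user turn are normalised with self.pattern/self.replacement,
        # and model.chat is called with return_history=True.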
image_cnt = 0
if len(message) > 1:
history, image_path, image_cnt = self.build_history(message[:-1])
else:
history, image_path, image_cnt = None, [], 1
current_msg = message[-1]
question = ''
# If message is just text in the conversation
if len(current_msg['content']) == 1 and current_msg['content'][0]['type'] == 'text':
question = current_msg['content'][0]['value']
question = re.sub(self.pattern, self.replacement, question) # Fix pattern as per InternVL
else:
for msg in current_msg['content']:
if msg['type'] == 'text':
question += re.sub(self.pattern, self.replacement, msg['value'])
elif msg['type'] == 'image':
image_cnt += 1
question += '<image>\n'
image_path.append(msg['value'])
if image_cnt > 1:
num_patches_list = []
pixel_values_list = []
for image_idx, file_name in enumerate(image_path):
upscale_flag = image_idx == 0 and dataset is not None and listinstr(['MMMU_DEV_VAL'], dataset)
curr_pixel_values = load_image(
file_name, max_num=self.max_num, upscale=upscale_flag).to(self.device).to(torch.bfloat16)
num_patches_list.append(curr_pixel_values.size(0))
pixel_values_list.append(curr_pixel_values)
pixel_values = torch.cat(pixel_values_list, dim=0)
elif image_cnt == 1:
upscale_flag = listinstr(['MMMU_DEV_VAL'], dataset)
            pixel_values = load_image(
                image_path[0], max_num=self.max_num, upscale=upscale_flag).to(self.device).to(torch.bfloat16)
num_patches_list = [pixel_values.size(0)]
else:
pixel_values = None
num_patches_list = []
response, history = self.model.chat(
self.tokenizer,
pixel_values=pixel_values,
num_patches_list=num_patches_list,
question=question,
generation_config=self.kwargs,
history=history,
return_history=True
)
response = re.sub(self.reverse_pattern, self.reverse_replacement, response)
return response
def chat_inner(self, message, dataset=None):
self.set_max_num(dataset)
if self.version in ['V1.1', 'V1.2']:
raise ValueError(f'Unsupported version for Multi-Turn: {self.version}')
elif self.version == 'V1.5':
raise ValueError(f'Unsupported version for Multi-Turn: {self.version}')
elif self.version == 'V2.0':
kwargs_default = dict(do_sample=False, max_new_tokens=512, top_p=None, num_beams=1)
self.kwargs = kwargs_default
return self.chat_inner_v2(message, dataset)
else:
raise ValueError(f'Unsupported version for Multi-Turn: {self.version}')
from .llava import LLaVA, LLaVA_Next, LLaVA_Next2, LLaVA_OneVision
from .llava_xtuner import LLaVA_XTuner
__all__ = ['LLaVA', 'LLaVA_Next', 'LLaVA_XTuner', 'LLaVA_Next2', 'LLaVA_OneVision']
import torch
from PIL import Image
from abc import abstractproperty
import sys
import os.path as osp
from ..base import BaseModel
from ...smp import *
from ...dataset import DATASET_TYPE
import copy
class LLaVA(BaseModel):
INSTALL_REQ = True
INTERLEAVE = True
def __init__(self,
model_path='liuhaotian/llava_v1.5_7b',
**kwargs):
try:
from llava.model.builder import load_pretrained_model
from llava.mm_utils import get_model_name_from_path
        except ImportError:
warnings.warn('Please install llava before using LLaVA')
sys.exit(-1)
warnings.warn('Please install the latest version of llava from github before you evaluate the LLaVA model. ')
assert osp.exists(model_path) or splitlen(model_path) == 2
self.system_prompt = (
'A chat between a curious human and an artificial intelligence assistant. '
"The assistant gives helpful, detailed, and polite answers to the human's questions. "
)
self.stop_str = '</s>'
if model_path == 'Lin-Chen/ShareGPT4V-7B':
model_name = 'llava-v1.5-7b'
elif model_path == 'Lin-Chen/ShareGPT4V-13B':
model_name = 'llava-v1.5-13b'
else:
model_name = get_model_name_from_path(model_path)
try:
self.tokenizer, self.model, self.image_processor, self.context_len = load_pretrained_model(
model_path=model_path,
model_base=None,
model_name=model_name,
device='cpu',
device_map='cpu'
)
        except Exception:
if 'ShareGPT4V' in model_path:
import llava
warnings.warn(
'Please manually remove the encoder type check in '
f'{llava.__path__[0]}/model/multimodal_encoder/builder.py '
'Line 8 to use the ShareGPT4V model. ')
else:
warnings.warn('Unknown error when loading LLaVA model.')
exit(-1)
self.model = self.model.cuda()
self.conv_mode = 'llava_v1'
kwargs_default = dict(do_sample=False, temperature=0, max_new_tokens=512, top_p=None, num_beams=1, use_cache=True) # noqa E501
kwargs_default.update(kwargs)
self.kwargs = kwargs_default
warnings.warn(f'Following kwargs received: {self.kwargs}, will use as generation config. ')
def use_custom_prompt(self, dataset):
assert dataset is not None
if DATASET_TYPE(dataset) == 'MCQ':
return True
return False
def build_prompt(self, line, dataset=None):
assert self.use_custom_prompt(dataset)
assert dataset is None or isinstance(dataset, str)
tgt_path = self.dump_image(line, dataset)
question = line['question']
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
if hint is not None:
question = hint + '\n' + question
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
for key, item in options.items():
question += f'\n{key}. {item}'
prompt = question
if len(options):
prompt += (
'\n请直接回答选项字母。' if cn_string(prompt) else
"\nAnswer with the option's letter from the given choices directly."
)
else:
prompt += '\n请直接回答问题。' if cn_string(prompt) else '\nAnswer the question directly.'
message = [dict(type='image', value=s) for s in tgt_path]
message.append(dict(type='text', value=prompt))
return message
def concat_tilist(self, message):
text, images = '', []
for item in message:
if item['type'] == 'text':
text += item['value']
elif item['type'] == 'image':
text += ' <image> '
images.append(item['value'])
return text, images
def chat_inner(self, message, dataset=None):
from llava.mm_utils import process_images, tokenizer_image_token, KeywordsStoppingCriteria
from llava.constants import IMAGE_TOKEN_INDEX
prompt = self.system_prompt
images = []
for utter in message:
prompt += 'USER: ' if utter['role'] == 'user' else 'ASSISTANT: '
content, images_sub = self.concat_tilist(utter['content'])
prompt += content
images.extend(images_sub)
prompt += ' ' if utter['role'] == 'user' else self.stop_str
assert message[-1]['role'] == 'user', message
prompt += 'ASSISTANT: '
images = [Image.open(s).convert('RGB') for s in images]
args = abstractproperty()
args.image_aspect_ratio = 'pad'
image_tensor = process_images(images, self.image_processor, args).to('cuda', dtype=torch.float16)
input_ids = tokenizer_image_token(
prompt, self.tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt').unsqueeze(0).cuda()
keywords = [self.stop_str]
stopping_criteria = KeywordsStoppingCriteria(keywords, self.tokenizer, input_ids)
with torch.inference_mode():
output_ids = self.model.generate(
input_ids, images=image_tensor, stopping_criteria=[stopping_criteria], **self.kwargs)
output = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0].strip()
return output
def generate_inner(self, message, dataset=None):
from llava.mm_utils import process_images, tokenizer_image_token, KeywordsStoppingCriteria
from llava.constants import IMAGE_TOKEN_INDEX
# Support interleave text and image
content, images = self.concat_tilist(message)
images = [Image.open(s).convert('RGB') for s in images]
args = abstractproperty()
args.image_aspect_ratio = 'pad'
if images:
image_tensor = process_images(images, self.image_processor, args).to('cuda', dtype=torch.float16)
else:
image_tensor = None
prompt = self.system_prompt + 'USER: ' + content + ' ASSISTANT: '
input_ids = tokenizer_image_token(
prompt, self.tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt').unsqueeze(0).cuda()
keywords = [self.stop_str]
stopping_criteria = KeywordsStoppingCriteria(keywords, self.tokenizer, input_ids)
with torch.inference_mode():
output_ids = self.model.generate(
input_ids, images=image_tensor, stopping_criteria=[stopping_criteria], **self.kwargs)
output = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0].strip()
return output
class LLaVA_Next(BaseModel):
INSTALL_REQ = False
INTERLEAVE = True
def __init__(self, model_path='llava-hf/llava-v1.6-vicuna-7b-hf', **kwargs):
import transformers
from transformers import LlavaNextProcessor, LlavaNextForConditionalGeneration, \
AutoProcessor, LlavaForConditionalGeneration
self.model_path = model_path
if '34b' in model_path.lower():
self.processor = LlavaNextProcessor.from_pretrained(self.model_path, use_fast=False)
elif 'interleave' in model_path.lower():
self.processor = AutoProcessor.from_pretrained(self.model_path)
else:
self.processor = LlavaNextProcessor.from_pretrained(self.model_path)
flash_attn_flag = False
try:
import flash_attn
flash_attn_flag = True
except ImportError:
pass
if flash_attn_flag:
if 'interleave' in model_path.lower():
model = LlavaForConditionalGeneration.from_pretrained(
self.model_path, torch_dtype=torch.float16, low_cpu_mem_usage=True, use_flash_attention_2=True)
else:
model = LlavaNextForConditionalGeneration.from_pretrained(
self.model_path, torch_dtype=torch.float16, low_cpu_mem_usage=True, use_flash_attention_2=True)
else:
if 'interleave' in model_path.lower():
model = LlavaForConditionalGeneration.from_pretrained(
self.model_path, torch_dtype=torch.float16, low_cpu_mem_usage=True)
else:
model = LlavaNextForConditionalGeneration.from_pretrained(
self.model_path, torch_dtype=torch.float16, low_cpu_mem_usage=True)
model = model.eval()
self.model = model.cuda()
kwargs_default = dict(do_sample=False, temperature=0, max_new_tokens=512, top_p=None, num_beams=1)
kwargs_default.update(kwargs)
self.kwargs = kwargs_default
warnings.warn(f'Following kwargs received: {self.kwargs}, will use as generation config. ')
def apply_prompt_template(self, prompt):
model_path = self.model_path.lower()
if 'mistral' in model_path:
template = '[INST] PLACEHOLDER [/INST]'
elif 'vicuna' in model_path:
template = (
'A chat between a curious human and an artificial intelligence assistant. '
"The assistant gives helpful, detailed, and polite answers to the human's questions. "
'USER: PLACEHOLDER ASSISTANT:'
)
elif '34b' in model_path:
template = (
'<|im_start|>system\nAnswer the questions.<|im_end|><|im_start|>user\nPLACEHOLDER<|im_end|>'
'<|im_start|>assistant\n'
)
else:
raise NotImplementedError(f'Prompt template for {model_path} not implemented.')
prompt = template.replace('PLACEHOLDER', f'<image>\n{prompt}')
return prompt
def output_process(self, answer):
if '<s>' in answer:
answer = answer.replace('<s>', '').strip()
if '[/INST]' in answer:
answer = answer.split('[/INST]')[1].strip()
elif 'ASSISTANT:' in answer:
answer = answer.split('ASSISTANT:')[1].strip()
elif 'assistant\n' in answer:
answer = answer.split('assistant\n')[1].strip()
elif '<|end_header_id|>\n\n' in answer:
answer = answer.split('<|end_header_id|>\n\n')[2].strip()
if '</s>' in answer:
answer = answer.split('</s>')[0].strip()
elif '<|im_end|>' in answer:
answer = answer.split('<|im_end|>')[0].strip()
elif '<|eot_id|>' in answer:
answer = answer.split('<|eot_id|>')[0].strip()
return answer
def use_custom_prompt(self, dataset):
assert dataset is not None
if DATASET_TYPE(dataset) == 'MCQ':
return True
return False
def build_prompt(self, line, dataset=None):
assert self.use_custom_prompt(dataset)
assert dataset is None or isinstance(dataset, str)
tgt_path = self.dump_image(line, dataset)
question = line['question']
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
if hint is not None:
question = hint + '\n' + question
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
for key, item in options.items():
question += f'\n{key}. {item}'
prompt = question
if len(options):
prompt += (
'\n请直接回答选项字母。' if cn_string(prompt) else
"\nAnswer with the option's letter from the given choices directly."
)
else:
prompt += '\n请直接回答问题。' if cn_string(prompt) else '\nAnswer the question directly.'
message = [dict(type='image', value=s) for s in tgt_path]
message.append(dict(type='text', value=prompt))
return message
def generate_inner(self, message, dataset=None):
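        # Build a single-turn chat with interleaved text and image placeholders, apply the
        # processor's chat template, generate, and strip role markers from the decoded
        # output with output_process.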
content, images = [], []
for msg in message:
if msg['type'] == 'text':
content.append({'type': msg['type'], 'text': msg['value']})
else:
content.append({'type': 'image'})
images.append(Image.open(msg['value']).convert('RGB'))
conversation = [
{
'role': 'user',
'content': content,
}
]
prompt = self.processor.apply_chat_template(conversation, add_generation_prompt=True)
inputs = self.processor(prompt, images, return_tensors='pt').to('cuda', torch.float16)
output = self.model.generate(**inputs, **self.kwargs)
        answer = self.processor.decode(output[0], skip_special_tokens=True)
answer = self.output_process(answer)
return answer
class LLaVA_Next2(BaseModel):
INSTALL_REQ = True
INTERLEAVE = True
DEFAULT_IMAGE_TOKEN = '<image>'
IMAGE_TOKEN_INDEX = -200
def __init__(self, model_path='lmms-lab/llama3-llava-next-8b', **kwargs):
assert model_path is not None
try:
from llava.model.builder import load_pretrained_model
from llava.conversation import conv_templates
from llava.mm_utils import get_model_name_from_path, tokenizer_image_token
        except ImportError:
warnings.warn('Please `pip install git+https://github.com/LLaVA-VL/LLaVA-NeXT.git`')
model_name = get_model_name_from_path(model_path)
tokenizer, model, image_processor, _ = load_pretrained_model(model_path, None, model_name, device_map=None)
model.cuda().eval()
model.tie_weights()
if 'llama3' in model_path.lower():
conv_mode = 'llava_llama_3'
elif 'qwen' in model_path.lower():
conv_mode = 'qwen_1_5'
self.conv_template = conv_mode
self.conv_templates = conv_templates
self.tokenizer = tokenizer
self.model = model
self.image_processor = image_processor
self.tokenizer_image_token = tokenizer_image_token
def generate_inner(self, message, dataset=None):
content, images = '', []
for msg in message:
if msg['type'] == 'text':
content += msg['value']
else:
images.append(Image.open(msg['value']).convert('RGB'))
content += (self.DEFAULT_IMAGE_TOKEN + '\n')
preprocess = self.image_processor.preprocess
image_tokenizer = self.tokenizer_image_token
image_tensor = [
preprocess(f, return_tensors='pt')['pixel_values'][0].half().cuda() for f in images
]
image_tensor = torch.stack(image_tensor)
conv = copy.deepcopy(self.conv_templates[self.conv_template])
conv.append_message(conv.roles[0], content)
conv.append_message(conv.roles[1], None)
prompt_question = conv.get_prompt()
input_ids = image_tokenizer(prompt_question, self.tokenizer, self.IMAGE_TOKEN_INDEX, return_tensors='pt')
input_ids = input_ids.unsqueeze(0).cuda()
cont = self.model.generate(
input_ids,
images=image_tensor,
do_sample=False,
temperature=0,
max_new_tokens=512,
)
text_outputs = self.tokenizer.batch_decode(cont, skip_special_tokens=True)[0]
return text_outputs
class LLaVA_OneVision(BaseModel):
INSTALL_REQ = True
INTERLEAVE = True
VIDEO_LLM = True
DEFAULT_IMAGE_TOKEN = '<image>'
IMAGE_TOKEN_INDEX = -200
    # Build a device map that splits the 72B checkpoint across the GPUs visible to this rank
def split_model(self, model_path):
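        # Returns a transformers-style device_map that spreads the decoder layers of the 72B
        # checkpoint over the GPUs assigned to this rank; other checkpoints return None and
        # are loaded on a single GPU.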
import math
device_map = {}
num_gpus = torch.cuda.device_count()
rank, world_size = get_rank_and_world_size()
num_gpus = num_gpus // world_size
if '72b' not in model_path.lower():
return None
        # Besides the 80 decoder layers, non-decoder modules (embed_tokens, vision_tower,
        # mm_projector, lm_head, ...) are budgeted as 8 extra layers
num_layers = 80 + 8
num_layers_per_gpu = math.ceil(num_layers / num_gpus)
num_layers_per_gpu = [num_layers_per_gpu] * num_gpus
num_layers_per_gpu[0] -= 6
num_layers_per_gpu[-1] -= 2
layer_cnt = 0
for i, num_layer in enumerate(num_layers_per_gpu):
for j in range(num_layer):
device_map[f'model.layers.{layer_cnt}'] = rank + world_size * i
layer_cnt += 1
last_gpu = rank + world_size * (num_gpus - 1)
device_map['model.image_newline'] = rank
device_map['model.embed_tokens'] = rank
device_map['model.norm'] = rank
device_map['model.vision_tower'] = rank
device_map['model.vision_resampler'] = rank
device_map['model.mm_projector'] = rank
device_map['lm_head'] = last_gpu
return device_map
def __init__(self, model_path='lmms-lab/llava-onevision-qwen2-7b-si', **kwargs):
assert model_path is not None
try:
from llava.model.builder import load_pretrained_model
from llava.conversation import conv_templates
from llava.mm_utils import get_model_name_from_path, process_images, tokenizer_image_token
except ImportError:
warnings.warn('Please `pip install git+https://github.com/LLaVA-VL/LLaVA-NeXT.git`')
model_name = get_model_name_from_path(model_path)
device_map = self.split_model(model_path)
if device_map is None:
tokenizer, model, image_processor, _ = load_pretrained_model(model_path, None, model_name, device_map='cpu')
model.cuda()
else:
tokenizer, model, image_processor, _ = load_pretrained_model(
model_path, None, model_name, device_map=device_map
)
model.eval()
model.tie_weights()
if 'llava' in model_path.lower():
conv_mode = 'qwen_1_5'
self.nframe = 16
if '72b' in model_path.lower():
self.nframe = 32
self.conv_template = conv_mode
self.conv_templates = conv_templates
self.tokenizer = tokenizer
self.model = model
self.image_processor = image_processor
self.tokenizer_image_token = tokenizer_image_token
self.process_images = process_images # Store process_images as a class attribute
def generate_inner_image(self, message, dataset=None):
content, images = '', []
image_sizes = [] # Store image sizes
for msg in message:
if msg['type'] == 'text':
content += msg['value']
else:
img = Image.open(msg['value']).convert('RGB')
images.append(img)
image_sizes.append(img.size) # Store the size of each image
content += (self.DEFAULT_IMAGE_TOKEN + '\n')
# Process images using the class attribute self.process_images
image_tensor = self.process_images(images, self.image_processor, self.model.config)
image_tensor = [_image.to(dtype=torch.float16, device='cuda') for _image in image_tensor]
conv = copy.deepcopy(self.conv_templates[self.conv_template])
conv.append_message(conv.roles[0], content)
conv.append_message(conv.roles[1], None)
prompt_question = conv.get_prompt()
input_ids = self.tokenizer_image_token(prompt_question,
self.tokenizer,
self.IMAGE_TOKEN_INDEX,
return_tensors='pt')
input_ids = input_ids.unsqueeze(0).cuda()
# Pass image sizes along with other parameters
cont = self.model.generate(
input_ids,
images=image_tensor,
image_sizes=image_sizes, # Pass the image sizes here
do_sample=False,
temperature=0,
max_new_tokens=512,
)
text_outputs = self.tokenizer.batch_decode(cont, skip_special_tokens=True)[0]
return text_outputs
def generate_inner_video(self, message, dataset=None):
content, videos = '', []
for msg in message:
if msg['type'] == 'text':
content += msg['value']
else:
videos.append(msg['value'])
content += (self.DEFAULT_IMAGE_TOKEN + '\n')
if len(videos) > 1:
raise ValueError('LLaVA-OneVision does not support multiple videos as input.')
video_frames = self.load_video(videos[0], self.nframe)
image_tensors = []
frames = self.image_processor.preprocess(video_frames, return_tensors='pt')['pixel_values'].half().cuda()
image_tensors.append(frames)
conv = copy.deepcopy(self.conv_templates[self.conv_template])
conv.append_message(conv.roles[0], content)
conv.append_message(conv.roles[1], None)
prompt_question = conv.get_prompt()
input_ids = self.tokenizer_image_token(prompt_question,
self.tokenizer,
self.IMAGE_TOKEN_INDEX,
return_tensors='pt')
input_ids = input_ids.unsqueeze(0).cuda()
image_sizes = [frame.size for frame in video_frames]
modalities = ['video'] * len(video_frames)
# Pass image sizes along with other parameters
cont = self.model.generate(
input_ids,
images=image_tensors,
image_sizes=image_sizes, # Pass the image sizes here
do_sample=False,
temperature=0,
max_new_tokens=512,
modalities=modalities
)
text_outputs = self.tokenizer.batch_decode(cont, skip_special_tokens=True)[0]
return text_outputs
def load_video(self, video_path, max_frames_num):
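        # Uniformly sample `max_frames_num` frames from the video with decord and return them
        # as a numpy array of shape (frames, height, width, channels).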
from decord import VideoReader, cpu
        if isinstance(video_path, str):
vr = VideoReader(video_path, ctx=cpu(0))
else:
vr = VideoReader(video_path[0], ctx=cpu(0))
total_frame_num = len(vr)
uniform_sampled_frames = np.linspace(0, total_frame_num - 1, max_frames_num, dtype=int)
frame_idx = uniform_sampled_frames.tolist()
spare_frames = vr.get_batch(frame_idx).asnumpy()
return spare_frames # (frames, height, width, channels)
def generate_inner(self, message, dataset=None):
if dataset in ['MMBench-Video', 'Video-MME', 'MVBench', 'MVBench_MP4']:
return self.generate_inner_video(message, dataset)
else:
return self.generate_inner_image(message, dataset)
import os
import os.path as osp
import string
import sys
import warnings
import pandas as pd
import torch
from huggingface_hub import snapshot_download
from PIL import Image
from transformers import (AutoModel, AutoModelForCausalLM, AutoTokenizer,
CLIPImageProcessor, CLIPVisionModel,
GenerationConfig, StoppingCriteriaList)
from ..base import BaseModel
from ...smp import cn_string, get_cache_path
from ...dataset import DATASET_TYPE
class LLaVA_XTuner(BaseModel):
INSTALL_REQ = True
INTERLEAVE = False
def __init__(self,
llava_path,
llm_path=None,
visual_encoder_path='openai/clip-vit-large-patch14-336',
visual_select_layer=-2,
prompt_template=None,
stop_words=[],
torch_dtype=torch.float16):
try:
from peft import PeftModel
from xtuner.utils import PROMPT_TEMPLATE, StopWordStoppingCriteria
except Exception:
warnings.warn(
'Please install xtuner with `pip install -U xtuner` before '
'using LLaVA_XTuner')
sys.exit(-1)
if not osp.isdir(llava_path):
cache_path = get_cache_path(llava_path)
if cache_path is not None:
llava_path = cache_path
else:
llava_path = snapshot_download(repo_id=llava_path)
assert osp.exists(llava_path) and osp.isdir(llava_path)
        # build llm
if 'llm' in os.listdir(llava_path):
assert llm_path is None, (
"Please don't specify the `llm_path` since passed "
'`llava_path` contains a LLM!')
llm_path = osp.join(llava_path, 'llm')
else:
assert llm_path is not None, 'Please specify the `llm_path`!'
llm = AutoModelForCausalLM.from_pretrained(llm_path,
trust_remote_code=True,
torch_dtype=torch_dtype,
device_map='cpu')
tokenizer = AutoTokenizer.from_pretrained(llm_path,
trust_remote_code=True,
encode_special_tokens=True)
print(f'Load LLM from {llm_path}')
# build visual_encoder
if 'visual_encoder' in os.listdir(llava_path):
assert visual_encoder_path is None, (
"Please don't specify the `visual_encoder_path` since passed "
'`llava_path` contains a visual encoder!')
visual_encoder_path = osp.join(llava_path, 'visual_encoder')
else:
assert visual_encoder_path is not None, (
'Please specify the `visual_encoder_path`!')
visual_encoder = CLIPVisionModel.from_pretrained(
visual_encoder_path, torch_dtype=torch_dtype, device_map='cpu')
image_processor = CLIPImageProcessor.from_pretrained(
visual_encoder_path)
print(f'Load visual_encoder from {visual_encoder_path}')
# load adapter
if 'llm_adapter' in os.listdir(llava_path):
adapter_path = osp.join(llava_path, 'llm_adapter')
llm = PeftModel.from_pretrained(llm,
adapter_path,
trust_remote_code=True,
device_map='cpu')
print(f'Load LLM adapter from {llava_path}')
if 'visual_encoder_adapter' in os.listdir(llava_path):
adapter_path = osp.join(llava_path, 'visual_encoder_adapter')
visual_encoder = PeftModel.from_pretrained(visual_encoder,
adapter_path,
trust_remote_code=True,
device_map='cpu')
print(f'Load visual_encoder adapter from {llava_path}')
# build projector
projector_path = osp.join(llava_path, 'projector')
projector = AutoModel.from_pretrained(projector_path,
trust_remote_code=True,
torch_dtype=torch_dtype,
device_map='cpu')
print(f'Load projector from {llava_path}')
llm.eval()
visual_encoder.eval()
projector.eval()
self.llm = llm.cuda()
self.tokenizer = tokenizer
self.visual_encoder = visual_encoder.cuda()
self.image_processor = image_processor
self.projector = projector.cuda()
self.visual_select_layer = visual_select_layer
if prompt_template is not None:
# modified prompt template
if prompt_template == 'llama3_chat':
self.prompt_template = dict(
SYSTEM=('<|start_header_id|>system<|end_header_id|>\n\n'
'{system}<|eot_id|>'),
INSTRUCTION=(
'<|start_header_id|>user<|end_header_id|>\n\n{input}<|eot_id|>'
'<|start_header_id|>assistant<|end_header_id|>\n\n'),
SUFFIX='<|eot_id|>',
SUFFIX_AS_EOS=True,
STOP_WORDS=['<|eot_id|>'])
else:
self.prompt_template = PROMPT_TEMPLATE[prompt_template]
stop_words += self.prompt_template.get('STOP_WORDS', [])
else:
self.prompt_template = None
self.stop_criteria = StoppingCriteriaList()
for word in stop_words:
self.stop_criteria.append(
StopWordStoppingCriteria(self.tokenizer, word))
def build_gen_config(self, dataset):
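        # Default config: sampling with 5 beams and up to 512 new tokens; MCQ and Y/N datasets
        # are overridden to greedy decoding with at most 5 new tokens.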
gen_kwargs = dict(max_new_tokens=512,
do_sample=True,
temperature=1,
num_beams=5,
eos_token_id=self.tokenizer.eos_token_id,
pad_token_id=self.tokenizer.pad_token_id
if self.tokenizer.pad_token_id is not None else
self.tokenizer.eos_token_id)
# For single word generation
if (dataset is not None
and DATASET_TYPE(dataset) in ['MCQ', 'Y/N']):
gen_kwargs.update(
dict(max_new_tokens=5, do_sample=False, num_beams=1))
return GenerationConfig(**gen_kwargs)
def use_custom_prompt(self, dataset):
assert dataset is not None
if DATASET_TYPE(dataset) == 'MCQ':
return True
return False
def build_prompt(self, line, dataset=None):
assert self.use_custom_prompt(dataset)
assert dataset is None or isinstance(dataset, str)
tgt_path = self.dump_image(line, dataset)
question = line['question']
hint = line['hint'] if ('hint' in line
and not pd.isna(line['hint'])) else None
if hint is not None:
question = hint + '\n' + question
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
for key, item in options.items():
question += f'\n{key}. {item}'
if not cn_string(question):
prompt = question + '\n' + ("Answer with the option's letter "
'from the given choices directly.')
else:
prompt = question + '\n' + '请直接回答选项字母。'
message = [dict(type='text', value=prompt)]
message.extend([dict(type='image', value=s) for s in tgt_path])
return message
def generate_inner(self, message, dataset=None):
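        # Pipeline: pad the image to a square, encode it with the CLIP visual encoder, project
        # the selected hidden layer into the LLM embedding space, then splice the visual tokens
        # into the prompt at the IMAGE_TOKEN_INDEX placeholder before generating.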
from xtuner.dataset.utils import expand2square
from xtuner.model.utils import prepare_inputs_labels_for_multimodal
from xtuner.utils import DEFAULT_IMAGE_TOKEN, IMAGE_TOKEN_INDEX
prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
prompt = prompt.replace('<image>', '')
image = Image.open(image_path).convert('RGB')
image = expand2square(
image,
tuple(int(x * 255) for x in self.image_processor.image_mean))
image = self.image_processor.preprocess(
image, return_tensors='pt')['pixel_values'][0]
image = image.cuda().unsqueeze(0)
visual_outputs = self.visual_encoder(image, output_hidden_states=True)
pixel_values = self.projector(
visual_outputs.hidden_states[self.visual_select_layer][:, 1:])
inputs = DEFAULT_IMAGE_TOKEN + '\n' + prompt
if self.prompt_template:
inputs = self.prompt_template['INSTRUCTION'].format(input=inputs)
chunk_encode = []
for idx, chunk in enumerate(inputs.split(DEFAULT_IMAGE_TOKEN)):
if idx == 0:
cur_encode = self.tokenizer(chunk)
else:
cur_encode = self.tokenizer(chunk, add_special_tokens=False)
chunk_encode.append(cur_encode)
assert len(chunk_encode) == 2
ids = []
for idx, cur_chunk_encode in enumerate(chunk_encode):
ids.extend(cur_chunk_encode['input_ids'])
if idx != len(chunk_encode) - 1:
ids.append(IMAGE_TOKEN_INDEX)
ids = torch.tensor(ids).cuda().unsqueeze(0)
mm_inputs = prepare_inputs_labels_for_multimodal(
llm=self.llm, input_ids=ids, pixel_values=pixel_values)
gen_config = self.build_gen_config(dataset)
generate_output = self.llm.generate(
**mm_inputs,
generation_config=gen_config,
streamer=None,
bos_token_id=self.tokenizer.bos_token_id,
stopping_criteria=self.stop_criteria)
predict = self.tokenizer.decode(generate_output[0],
skip_special_tokens=True).strip()
return predict
import torch
from PIL import Image
from abc import abstractproperty
from .base import BaseModel
from ..smp import *
from ..dataset import DATASET_TYPE
import warnings
class Mantis(BaseModel):
"""
Mantis Model
    This implementation is adapted from the LLaVA model in llava.py and the Idefics model in idefics.py.
"""
INSTALL_REQ = True
INTERLEAVE = True
DEFAULT_IMAGE_TOKEN = '<image>'
IMAGE_TOKEN_INDEX = -200
def __init__(self, model_path='TIGER-Lab/Mantis-8B-siglip-llama3', **kwargs):
assert model_path is not None
try:
from mantis.models.mllava import LlavaForConditionalGeneration, MLlavaProcessor
from mantis.models.mfuyu import MFuyuForCausalLM, MFuyuProcessor
from mantis.models.conversation import conv_mllava_v1 as default_conv, conv_templates
        except ImportError:
            warnings.warn(
                "Mantis is not installed. Please install Mantis to use this model. Please use 'pip install "
                "git+https://github.com/TIGER-AI-Lab/Mantis.git' to install."
            )
        try:
            from transformers import AutoModelForVision2Seq, AutoProcessor
        except Exception as e:
            warnings.warn("Upgrade transformers to use Mantis's idefics model.\nError: %s" % e)
# inference implementation for attention, can be "sdpa", "eager", "flash_attention_2".
# Seems FA2 is not effective during inference:
# https://discuss.huggingface.co/t/flash-attention-has-no-effect-on-inference/73453/5
# if is_flash_attn_2_available:
# best_fit_attn_implementation = "flash_attention_2"
# flash_attn has a bug that says: ERROR Error query and key must have the same dtype in generating
try:
import flash_attn
best_fit_attn_implementation = 'flash_attention_2'
except ImportError:
best_fit_attn_implementation = 'eager'
self.model_path = model_path
attn_implementation = best_fit_attn_implementation
self._is_idefics = 'idefics' in model_path.lower()
# Here load the "non-idefics" Mantis model.
if not self._is_idefics:
if 'fuyu' in model_path.lower():
self.processor = MFuyuProcessor.from_pretrained(self.model_path)
model = MFuyuForCausalLM.from_pretrained(
self.model_path,
device_map='cuda',
attn_implementation=attn_implementation,
torch_dtype=torch.float16
)
else:
self.processor = MLlavaProcessor.from_pretrained(self.model_path)
model = LlavaForConditionalGeneration.from_pretrained(
self.model_path,
device_map='cuda',
attn_implementation=attn_implementation,
torch_dtype=torch.float16
)
else:
self.processor = AutoProcessor.from_pretrained(self.model_path)
model = AutoModelForVision2Seq.from_pretrained(
self.model_path,
device_map='cuda',
torch_dtype=torch.float16
)
model = model.eval()
self.model = model.cuda()
kwargs_default = dict(do_sample=False, temperature=0, max_new_tokens=1024, top_p=None, num_beams=1)
kwargs_default.update(kwargs)
self.kwargs = kwargs_default
warnings.warn(f'Following kwargs received: {self.kwargs}, will use as generation config. ')
self.tokenizer = self.processor.tokenizer
self.default_conv = default_conv
self.conv_templates = conv_templates
def use_custom_prompt(self, dataset):
assert dataset is not None
if DATASET_TYPE(dataset) == 'MCQ':
return True
return False
def build_prompt(self, line, dataset=None):
assert self.use_custom_prompt(dataset)
assert dataset is None or isinstance(dataset, str)
tgt_path = self.dump_image(line, dataset)
question = line['question']
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
if hint is not None:
question = hint + '\n' + question
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
for key, item in options.items():
question += f'\n{key}. {item}'
prompt = question
if len(options):
prompt += (
'\n请直接回答选项字母。' if cn_string(prompt) else
"\nAnswer with the option's letter from the given choices directly."
)
else:
prompt += '\n请直接回答问题。' if cn_string(prompt) else '\nAnswer the question directly.'
message = [dict(type='image', value=s) for s in tgt_path]
message.append(dict(type='text', value=prompt))
return message
def output_process(self, answer):
if '<s>' in answer:
answer = answer.replace('<s>', '').strip()
if '[/INST]' in answer:
answer = answer.split('[/INST]')[1].strip()
elif 'ASSISTANT:' in answer:
answer = answer.split('ASSISTANT:')[1].strip()
elif 'assistant\n' in answer:
answer = answer.split('assistant\n')[1].strip()
elif '<|end_header_id|>\n\n' in answer:
answer = answer.split('<|end_header_id|>\n\n')[2].strip()
if '</s>' in answer:
answer = answer.split('</s>')[0].strip()
elif '<|im_end|>' in answer:
answer = answer.split('<|im_end|>')[0].strip()
elif '<|eot_id|>' in answer:
answer = answer.split('<|eot_id|>')[0].strip()
elif '<end_of_utterance>' in answer:
answer = answer.split('<end_of_utterance>')[0].strip()
elif '|ENDOFTEXT|' in answer:
answer = answer.split('|ENDOFTEXT|')[0].strip()
return answer
def generate_inner(self, message, dataset=None):
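        # Two code paths: idefics checkpoints use the processor's chat template directly,
        # while Mantis/Fuyu checkpoints build the prompt through the Mantis conversation
        # templates (llama_3 or the default) with per-image <image> placeholders.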
content, images = '', []
ide_content, question = [], ''
for msg in message:
if msg['type'] == 'text':
content += msg['value']
question += msg['value']
else:
images.append(Image.open(msg['value']).convert('RGB'))
content += (self.DEFAULT_IMAGE_TOKEN + '\n')
ide_content.append({'type': 'image'})
if self._is_idefics:
# Follow the idefics implementation:
ide_content.append({'type': 'text', 'text': question})
prompt = [{'role': 'user', 'content': ide_content}]
prompt = self.processor.apply_chat_template(prompt, add_generation_prompt=True)
else:
# Follow the Mantis code base to make sure they are consistent:
# https://github.com/TIGER-AI-Lab/Mantis/blob/main/mantis/models/mllava/utils.py#L33
# Users don't need to define chat template as it is done here
if 'llama-3' in self.model.language_model.name_or_path.lower():
conv = self.conv_templates['llama_3']
terminators = [
self.processor.tokenizer.eos_token_id,
self.processor.tokenizer.convert_tokens_to_ids('<|eot_id|>')
]
else:
conv = self.default_conv
terminators = [self.processor.tokenizer.eos_token_id]
# Using EOT because end of *text* is more accurate for what we're doing than end of *sentence*
if 'eos_token_id' not in self.kwargs:
self.kwargs['eos_token_id'] = terminators
conv = conv.copy()
conv.append_message(conv.roles[0], content)
conv.append_message(conv.roles[1], '')
assert conv.messages[-1][0] == conv.roles[1] and conv.messages[-1][1] == '', 'Format check'
prompt = conv.get_prompt()
inputs = self.processor(prompt, images, return_tensors='pt', truncation=True)
# FIXME: Fuyu model would return a list instead of a pytorch tensor. This weird behavior needs fixing.
if 'image_patches' in inputs.keys():
inputs['image_patches'] = inputs['image_patches'][0]
inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
output = self.model.generate(**inputs, **self.kwargs)
output = output[0]
generated_ids = output[inputs['input_ids'].shape[-1]:]
        answer = self.processor.decode(generated_ids, skip_special_tokens=True)
answer = self.output_process(answer)
return answer
import sys
import torch
import os.path as osp
import os
import warnings
from .base import BaseModel
from PIL import Image
'''
Please follow the instructions to download ckpt.
https://github.com/dvlab-research/MGM?tab=readme-ov-file#pretrained-weights
'''
class Mini_Gemini(BaseModel):
INSTALL_REQ = True
INTERLEAVE = False
def __init__(self, model_path, root=None, conv_mode='llava_v1', **kwargs):
if root is None:
            warnings.warn('Please set `root` to the Mini_Gemini code directory, which is cloned from '
                          'https://github.com/dvlab-research/MGM?tab=readme-ov-file')
sys.exit(-1)
        warnings.warn('Please follow the instructions of Mini_Gemini to put the ckpt file in the right place, '
                      'which can be found at https://github.com/dvlab-research/MGM?tab=readme-ov-file#structure')
assert model_path == 'YanweiLi/MGM-7B-HD', 'We only support MGM-7B-HD for now'
self.model_path = model_path
sys.path.append(root)
try:
from mgm.model.builder import load_pretrained_model
from mgm.mm_utils import get_model_name_from_path
        except ImportError:
raise ImportError(
'Please first install Mini_Gemini and set the root path to use Mini_Gemini, '
'which is cloned from here: "https://github.com/dvlab-research/MGM?tab=readme-ov-file" '
)
VLMEvalKit_path = os.getcwd()
os.chdir(root)
        warnings.warn('Please set `root` to the Mini_Gemini code directory, which is cloned from '
                      'https://github.com/dvlab-research/MGM?tab=readme-ov-file')
model_path = osp.join(root, 'work_dirs', 'MGM', 'MGM-7B-HD')
try:
model_name = get_model_name_from_path(model_path)
        except Exception:
raise ImportError(
'Please follow the instructions of Mini_Gemini to put the ckpt file in the right place, '
'which can be found at https://github.com/dvlab-research/MGM?tab=readme-ov-file#structure'
)
tokenizer, model, image_processor, context_len = load_pretrained_model(model_path, None, model_name)
os.chdir(VLMEvalKit_path)
self.model = model
self.tokenizer = tokenizer
self.image_processor = image_processor
self.conv_mode = conv_mode
kwargs_default = dict(temperature=float(0), num_beams=1, top_p=None, max_new_tokens=1024, use_cache=True)
kwargs_default.update(kwargs)
do_sample = kwargs_default['temperature'] > 0
kwargs_default.update({'do_sample': do_sample})
self.kwargs = kwargs_default
def generate_inner(self, message, dataset=None):
try:
from mgm.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN, \
DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN
from mgm.conversation import conv_templates
from mgm.mm_utils import tokenizer_image_token, process_images
        except ImportError:
raise ImportError(
'Please first install Mini_Gemini and set the root path to use Mini_Gemini, '
'which is cloned from here: "https://github.com/dvlab-research/MGM?tab=readme-ov-file" '
)
prompt, image = self.message_to_promptimg(message, dataset=dataset)
image = Image.open(image)
prompt = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + prompt
conv = conv_templates[self.conv_mode].copy()
conv.append_message(conv.roles[0], prompt)
conv.append_message(conv.roles[1], None)
prompt = conv.get_prompt()
input_ids = tokenizer_image_token(prompt, self.tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt')
input_ids = input_ids.unsqueeze(0).cuda()
if hasattr(self.model.config, 'image_size_aux'):
if not hasattr(self.image_processor, 'image_size_raw'):
self.image_processor.image_size_raw = self.image_processor.crop_size.copy()
self.image_processor.crop_size['height'] = self.model.config.image_size_aux
self.image_processor.crop_size['width'] = self.model.config.image_size_aux
self.image_processor.size['shortest_edge'] = self.model.config.image_size_aux
image_tensor = process_images([image], self.image_processor, self.model.config)[0]
image_grid = getattr(self.model.config, 'image_grid', 1)
if hasattr(self.model.config, 'image_size_aux'):
raw_shape = [
self.image_processor.image_size_raw['height'] * image_grid,
self.image_processor.image_size_raw['width'] * image_grid
]
image_tensor_aux = image_tensor
image_tensor = torch.nn.functional.interpolate(
image_tensor[None],
size=raw_shape,
mode='bilinear',
align_corners=False
)[0]
else:
image_tensor_aux = []
if image_grid >= 2:
raw_image = image_tensor.reshape(
3, image_grid, self.image_processor.image_size_raw['height'],
image_grid, self.image_processor.image_size_raw['width']
)
raw_image = raw_image.permute(1, 3, 0, 2, 4)
raw_image = raw_image.reshape(
-1, 3, self.image_processor.image_size_raw['height'], self.image_processor.image_size_raw['width']
)
if getattr(self.model.config, 'image_global', False):
global_image = image_tensor
if len(global_image.shape) == 3:
global_image = global_image[None]
global_image = torch.nn.functional.interpolate(
global_image,
size=[
self.image_processor.image_size_raw['height'],
self.image_processor.image_size_raw['width']
],
mode='bilinear',
align_corners=False
)
# [image_crops, image_global]
raw_image = torch.cat([raw_image, global_image], dim=0)
image_tensor = raw_image.contiguous()
images = image_tensor[None].to(dtype=self.model.dtype, device='cuda', non_blocking=True)
if len(image_tensor_aux) > 0:
images_aux = image_tensor_aux[None].to(dtype=self.model.dtype, device='cuda', non_blocking=True)
else:
images_aux = None
with torch.inference_mode():
output_ids = self.model.generate(
input_ids,
images=images,
images_aux=images_aux,
# no_repeat_ngram_size=3,
bos_token_id=self.tokenizer.bos_token_id, # Begin of sequence token
eos_token_id=self.tokenizer.eos_token_id, # End of sequence token
pad_token_id=self.tokenizer.pad_token_id, # Pad token
**self.kwargs
)
outputs = self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0].strip()
return outputs
import math
import torch
import random
import numpy as np
from PIL import Image
from transformers import AutoModel, AutoTokenizer
from .base import BaseModel
from ..smp import *
from ..dataset import DATASET_TYPE
class MiniCPM_V(BaseModel):
INSTALL_REQ = False
INTERLEAVE = False
def __init__(self, model_path='openbmb/MiniCPM-V', **kwargs):
assert model_path is not None
self.model_path = model_path
print(f'load from {self.model_path}')
self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True)
self.model = self.model.to(dtype=torch.bfloat16)
self.model.eval().cuda()
self.kwargs = kwargs
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
torch.cuda.empty_cache()
self.num_beams = 1 if self.model_path == 'openbmb/MiniCPM-V' else 3
def use_custom_prompt(self, dataset):
assert dataset is not None
if listinstr(['MMDU', 'MME-RealWorld', 'MME-RealWorld-CN'], dataset):
# For Multi-Turn we don't have custom prompt
return False
        return True
def build_prompt(self, line, dataset=None):
assert dataset is None or isinstance(dataset, str)
assert self.use_custom_prompt(dataset)
tgt_path = self.dump_image(line, dataset)
question = line['question']
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = 'Options:\n'
for key, item in options.items():
options_prompt += f'{key}. {item}\n'
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
prompt = ''
if hint is not None:
prompt += f'Hint: {hint}\n'
prompt += f'{question}\n'
if len(options):
prompt += options_prompt
            prompt = ('Study the image carefully and pick the option associated with the correct answer. '
                      'Focus solely on selecting the option and avoid including any other content.\n') + prompt
message = [dict(type='text', value=prompt)]
message.extend([dict(type='image', value=p) for p in tgt_path])
return message
def generate_inner(self, message, dataset=None):
prompt, image_path = self.message_to_promptimg(message, dataset=dataset)
image = Image.open(image_path).convert('RGB')
msgs = [{'role': 'user', 'content': prompt}]
if DATASET_TYPE(dataset) == 'MCQ':
max_new_tokens = 20
elif DATASET_TYPE(dataset) == 'Y/N':
max_new_tokens = 100
else:
max_new_tokens = 1024
default_kwargs = dict(
max_new_tokens=max_new_tokens,
sampling=False,
num_beams=self.num_beams
)
default_kwargs.update(self.kwargs)
res, _, _ = self.model.chat(
image=image,
msgs=msgs,
context=None,
tokenizer=self.tokenizer,
**default_kwargs
)
return res
class MiniCPM_Llama3_V(BaseModel):
INSTALL_REQ = False
INTERLEAVE = True
def __init__(self, model_path='openbmb/MiniCPM-Llama3-V-2_5', **kwargs):
assert model_path is not None
self.model_path = model_path
print(f'load from {self.model_path}')
self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True)
self.model = self.model.to(dtype=torch.float16)
self.model.eval().cuda()
self.kwargs = kwargs
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
torch.cuda.empty_cache()
self.num_beams = 1 if self.model_path == 'openbmb/MiniCPM-V' else 3
self.options_system_prompt = ('Carefully read the following question and select the letter corresponding '
'to the correct answer. Highlight the applicable choices without giving '
'explanations.')
        self.wo_options_system_prompt = 'Carefully read the following question. Answer the question directly.'
self.detail_system_prompt = 'Answer this question in detail.'
self.vqa_prompt = 'Answer the question using a single word or phrase.'
def use_custom_prompt(self, dataset):
if listinstr(['MCQ', 'VQA'], DATASET_TYPE(dataset)):
return True
elif dataset is not None and listinstr(['HallusionBench'], dataset):
return True
return False
def build_prompt(self, line, dataset=None):
if isinstance(line, int):
line = self.data.iloc[line]
tgt_path = self.dump_image(line, dataset)
system_prompt = ''
question = line['question']
if DATASET_TYPE(dataset) == 'MCQ':
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = 'Options:\n'
for key, item in options.items():
options_prompt += f'{key}. {item}\n'
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
prompt = ''
if hint is not None:
prompt += f'Hint: {hint}\n'
prompt += f'Question: {question}\n'
if len(options):
prompt += options_prompt
system_prompt = self.options_system_prompt + '\nPlease just indicate your choice.'
else:
system_prompt = self.wo_options_system_prompt
if 'MMMU' in dataset: # Corner Case
prompt = system_prompt + '\n' + prompt
system_prompt = ''
elif dataset is not None and listinstr(['HallusionBench'], dataset):
question = line['question'] + ' Yes or No?'
prompt = question
elif dataset is not None and listinstr(['MME'], dataset):
question = line['question'] + ' Yes or No?'
prompt = question
elif dataset is not None and listinstr(['OCRBench'], dataset):
system_prompt = self.vqa_prompt
question = line['question']
prompt = question
elif DATASET_TYPE(dataset) == 'VQA':
if listinstr(['LLaVABench', 'MMLongBench_DOC'], dataset):
system_prompt = ''
prompt = question
elif listinstr(['MMVet'], dataset):
system_prompt = self.detail_system_prompt
prompt = question
else:
system_prompt = self.vqa_prompt
prompt = question
msgs = []
if system_prompt:
msgs.append(dict(type='text', value=system_prompt))
if isinstance(tgt_path, list):
msgs.extend([dict(type='image', value=p) for p in tgt_path])
else:
            msgs.append(dict(type='image', value=tgt_path))  # keep any system prompt already in msgs
msgs.append(dict(type='text', value=prompt))
return msgs
def generate_inner(self, message, dataset=None):
if DATASET_TYPE(dataset) == 'MCQ':
max_new_tokens = 200
elif DATASET_TYPE(dataset) == 'Y/N':
max_new_tokens = 3
else:
max_new_tokens = 1024
default_kwargs = dict(
max_new_tokens=max_new_tokens,
sampling=False,
num_beams=self.num_beams,
)
default_kwargs.update(self.kwargs)
content = []
for x in message:
if x['type'] == 'text':
content.append(x['value'])
elif x['type'] == 'image':
image = Image.open(x['value']).convert('RGB')
content.append(image)
msgs = [{'role': 'user', 'content': content}]
res = self.model.chat(
msgs=msgs,
context=None,
image=None,
tokenizer=self.tokenizer,
**default_kwargs
)
if isinstance(res, tuple) and len(res) > 0:
res = res[0]
return res
def chat_inner(self, message, dataset=None):
max_new_tokens = 1024
default_kwargs = dict(
max_new_tokens=max_new_tokens,
sampling=False,
num_beams=self.num_beams,
)
default_kwargs.update(self.kwargs)
msgs = []
for msg in message:
content = []
if len(msg['content']) == 1 and msg['content'][0]['type'] == 'text':
msg_new = {'role': msg['role'], 'content': msg['content'][0]['value']}
msgs.append(msg_new)
continue
for x in msg['content']:
if x['type'] == 'text':
content.append(x['value'])
elif x['type'] == 'image':
image = Image.open(x['value']).convert('RGB')
content.append(image)
msg_new = {'role': msg['role'], 'content': content}
msgs.append(msg_new)
res = self.model.chat(
msgs=msgs,
context=None,
image=None,
tokenizer=self.tokenizer,
**default_kwargs)
if isinstance(res, tuple) and len(res) > 0:
res = res[0]
return res
class MiniCPM_V_2_6(BaseModel):
INSTALL_REQ = False
INTERLEAVE = True
def __init__(self, model_path='openbmb/MiniCPM-V', **kwargs):
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
torch.cuda.manual_seed_all(0)
assert model_path is not None
self.model_path = model_path
print(f'load from path {self.model_path}')
self.model = AutoModel.from_pretrained(self.model_path, trust_remote_code=True)
self.model = self.model.to(dtype=torch.bfloat16)
self.model.eval().cuda()
self.kwargs = kwargs
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path, trust_remote_code=True)
torch.cuda.empty_cache()
self.num_beams = 1 if self.model_path == 'openbmb/MiniCPM-V' else 3
self.options_suffix_prompt = '''\nAnswer with the option's letter from the given choices directly.'''
        self.wo_options_system_prompt = 'Carefully read the following question. Answer the question directly.'
self.detail_system_prompt = 'Answer this question in detail.'
self.vqa_prompt = 'Answer the question using a single word or phrase.'
self.multi_choice_cot_prompt = ('''Carefully read the following multichoice question, solve it step '''
'''by step and finally pick the option associated with the correct '''
                                        '''answer in the format of "Answer: selected option".\n\n''')
self.short_ans_cot_prompt = ('''Read the following question carefully, solve it step by step, and '''
'''then output the final answer in the format of "Answer: single number '''
'''or single word or phrase".\n\n''')
def use_custom_prompt(self, dataset=None):
if dataset is None:
return False
if DATASET_TYPE(dataset) in ['MCQ', 'VQA', 'Y/N']:
return True
return False
def use_cot(self, dataset=None):
if dataset is None:
return False
if listinstr(['MMMU', 'HallusionBench', 'OCRBench', 'ChartQA'], dataset):
return True
elif listinstr(['MathVista', 'MMVet', 'MMBench', 'MMStar', 'AI2D', 'RealWorldQA',
'POPE', 'ScienceQA', 'TextVQA', 'DocVQA'], dataset):
return False
else:
return False
def use_upsize(self, dataset=None):
if dataset is None:
return False
if listinstr(['MMVet', 'MMBench', 'MMStar', 'AI2D', 'OCRBench'], dataset):
return True
else:
return False
def build_prompt(self, line, dataset=None):
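        # Two prompt styles: the direct-answer style used when use_cot(dataset) is False,
        # and a chain-of-thought style that prefixes multi_choice_cot_prompt or
        # short_ans_cot_prompt and asks for an explicit "Answer: ..." line.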
if isinstance(line, int):
line = self.data.iloc[line]
tgt_path = self.dump_image(line, dataset)
system_prompt, prompt = '', ''
question = line['question']
if not self.use_cot(dataset):
if DATASET_TYPE(dataset) == 'MCQ':
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = 'Options:\n'
for key, item in options.items():
options_prompt += f'{key}. {item}\n'
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
if hint is not None:
prompt += f'Hint: {hint}\n'
prompt += f'Question: {question}\n'
if len(options):
prompt += options_prompt
prompt += self.options_suffix_prompt
else:
system_prompt = self.wo_options_system_prompt
if 'MMMU' in dataset:
if len(system_prompt) > 0:
prompt = system_prompt + '\n' + prompt
system_prompt = ''
elif dataset is not None and listinstr(['HallusionBench'], dataset):
question += ' Yes or No?'
prompt = question
elif dataset is not None and listinstr(['OCRBench'], dataset):
system_prompt = self.vqa_prompt
prompt = question
elif DATASET_TYPE(dataset) == 'VQA':
if listinstr(['LLaVABench'], dataset):
system_prompt = ''
elif listinstr(['MMVet'], dataset):
system_prompt = self.detail_system_prompt
else:
system_prompt = self.vqa_prompt
prompt = question
else:
prompt = question
else:
has_options = True
if DATASET_TYPE(dataset) == 'MCQ':
options = {
cand: line[cand]
for cand in string.ascii_uppercase
if cand in line and not pd.isna(line[cand])
}
options_prompt = ''
for key, item in options.items():
options_prompt += f'{key}. {item}\n'
hint = line['hint'] if ('hint' in line and not pd.isna(line['hint'])) else None
if hint is not None:
prompt += f'Hint: {hint}\n'
prompt += f'{question}\n'
if len(options):
prompt += options_prompt
else:
has_options = False
if 'MMMU' in dataset:
if len(system_prompt) > 0:
prompt = system_prompt + '\n' + prompt
system_prompt = ''
else:
prompt = question
if DATASET_TYPE(dataset) in ['MCQ', 'Y/N', 'VQA']:
if DATASET_TYPE(dataset) == 'MCQ':
if has_options:
prompt = self.multi_choice_cot_prompt + prompt
else:
prompt = self.short_ans_cot_prompt + prompt
elif DATASET_TYPE(dataset) == 'Y/N':
prompt = self.short_ans_cot_prompt + prompt
else:
prompt = self.short_ans_cot_prompt + prompt
msgs = []
if system_prompt:
msgs.append(dict(type='text', value=system_prompt))
if isinstance(tgt_path, list):
msgs.extend([dict(type='image', value=p) for p in tgt_path])
else:
            msgs.append(dict(type='image', value=tgt_path))  # keep any system prompt already in msgs
msgs.append(dict(type='text', value=prompt))
return msgs
def generate_inner(self, message, dataset=None):
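        # Video-style datasets use a single slice per image and a larger input window;
        # for the benchmarks selected by use_upsize, small images are upsampled so that
        # their area reaches roughly 1344 x 1344 before being passed to model.chat.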
if listinstr(['Video', 'MVBench'], dataset):
max_slice_nums = 1
use_image_id = False
max_inp_length = 2048 * 10
else:
max_slice_nums = None
use_image_id = True
max_inp_length = 8192
max_new_tokens = 2048
default_kwargs = dict(
max_new_tokens=max_new_tokens,
sampling=False,
num_beams=self.num_beams,
)
default_kwargs.update(self.kwargs)
content = []
for x in message:
if x['type'] == 'text':
content.append(x['value'])
elif x['type'] == 'image':
image = Image.open(x['value']).convert('RGB')
if not self.use_upsize(dataset):
content.append(image)
else:
img_width, img_height = image.width, image.height
if (img_width * img_height) >= (1344 * 1344):
content.append(image)
else:
ratio = math.sqrt((1344 * 1344) / (img_width * img_height))
max_img_width = int(img_width * ratio)
new_img_width = random.randint(img_width, max_img_width)
new_img_height = int(new_img_width / img_width * img_height)
resized_image = image.resize((new_img_width, new_img_height))
content.append(resized_image)
msgs = [{'role': 'user', 'content': content}]
res = self.model.chat(
image=None,
msgs=msgs,
context=None,
tokenizer=self.tokenizer,
max_inp_length=max_inp_length,
use_image_id=use_image_id,
max_slice_nums=max_slice_nums,
**default_kwargs
)
if isinstance(res, tuple) and len(res) > 0:
res = res[0]
return res